Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.spark.functions;

import org.apache.iceberg.util.DateTimeUtil;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.catalog.functions.BoundFunction;
import org.apache.spark.sql.connector.catalog.functions.ScalarFunction;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.DateType;
import org.apache.spark.sql.types.TimestampType;

/**
* A Spark function implementation for the Iceberg day transform.
*
* <p>Example usage: {@code SELECT system.days(source_col)}, where {@code source_col} is a date or timestamp column.
*/
public class DaysFunction extends UnaryUnboundFunction {

/**
 * Selects the bound implementation that matches the given value type.
 *
 * @param valueType Spark type of the value passed to the function
 * @return a {@link DateToDaysFunction} for date input or a {@link TimestampToDaysFunction} for
 *     timestamp input
 * @throws UnsupportedOperationException if the type is neither a date nor a timestamp
 */
@Override
protected BoundFunction doBind(DataType valueType) {
  if (valueType instanceof DateType) {
    return new DateToDaysFunction();
  }

  if (valueType instanceof TimestampType) {
    return new TimestampToDaysFunction();
  }

  // anything else is rejected, reporting the offending type in its catalog form
  String typeName = valueType.catalogString();
  throw new UnsupportedOperationException("Expected value to be date or timestamp: " + typeName);
}

/**
 * Builds the usage text for this function: the function name followed by a one-line summary and
 * a description of the single {@code col} argument.
 *
 * @return the usage string
 */
@Override
public String description() {
  String summary = "(col) - Call Iceberg's day transform\n";
  String argumentHelp = " col :: source column (must be date or timestamp)";
  return name() + summary + argumentHelp;
}

/**
 * Returns the name of this function.
 *
 * @return the literal {@code "days"}
 */
@Override
public String name() {
  return "days";
}

private abstract static class BaseToDaysFunction implements ScalarFunction<Integer> {
@Override
public String name() {
return "days";
}

@Override
public DataType resultType() {
return DataTypes.DateType;

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The result type is Date to match what we do in core.

}
}

/**
 * Bound {@code days} implementation for date input.
 *
 * <p>Spark and Iceberg internal representations of dates match, so no transformation is
 * required and the input value is returned as is.
 */
public static class DateToDaysFunction extends BaseToDaysFunction {
  @Override
  public String canonicalName() {
    return "iceberg.days(date)";
  }

  @Override
  public DataType[] inputTypes() {
    DataType[] types = new DataType[1];
    types[0] = DataTypes.DateType;
    return types;
  }

  /**
   * Magic method used in codegen.
   *
   * @param days the date value in its internal int form
   * @return the same value, unchanged
   */
  public static int invoke(int days) {
    return days;
  }

  /**
   * Row-based evaluation path.
   *
   * @param input row whose ordinal 0 holds the date value
   * @return the value at ordinal 0, or {@code null} when that position is null
   */
  @Override
  public Integer produceResult(InternalRow input) {
    // null in, null out: this matches what Spark does in codegen
    if (input.isNullAt(0)) {
      return null;
    }

    return input.getInt(0);
  }
}

/**
 * Bound {@code days} implementation for timestamp input. The conversion is delegated to
 * {@link DateTimeUtil#microsToDays(long)}.
 */
public static class TimestampToDaysFunction extends BaseToDaysFunction {
  @Override
  public String canonicalName() {
    return "iceberg.days(timestamp)";
  }

  @Override
  public DataType[] inputTypes() {
    DataType[] types = new DataType[1];
    types[0] = DataTypes.TimestampType;
    return types;
  }

  /**
   * Magic method used in codegen.
   *
   * @param micros the timestamp value as a long, passed straight to
   *     {@link DateTimeUtil#microsToDays(long)}
   * @return the result of the conversion
   */
  public static int invoke(long micros) {
    return DateTimeUtil.microsToDays(micros);
  }

  /**
   * Row-based evaluation path.
   *
   * @param input row whose ordinal 0 holds the timestamp value
   * @return the converted value, or {@code null} when ordinal 0 is null
   */
  @Override
  public Integer produceResult(InternalRow input) {
    // null in, null out: this matches what Spark does in codegen
    if (input.isNullAt(0)) {
      return null;
    }

    return invoke(input.getLong(0));
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ private SparkFunctions() {}
"iceberg_version", new IcebergVersionFunction(),
"years", new YearsFunction(),
"months", new MonthsFunction(),
"days", new DaysFunction(),
"bucket", new BucketFunction(),
"truncate", new TruncateFunction());

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.spark.sql;

import java.sql.Date;
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.spark.SparkTestBaseWithCatalog;
import org.apache.spark.sql.AnalysisException;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class TestSparkDaysFunction extends SparkTestBaseWithCatalog {

/** Switches the session to the catalog under test before each test runs. */
@Before
public void useCatalog() {
  sql("USE %s", catalogName);
}

/** Checks {@code system.days} on date input: after, at and before 1970-01-01, plus null. */
@Test
public void testDates() {
  Date afterEpoch = Date.valueOf("2017-12-01");
  Assert.assertEquals(
      "Expected to produce 2017-12-01",
      afterEpoch,
      scalarSql("SELECT system.days(date('2017-12-01'))"));

  Date epoch = Date.valueOf("1970-01-01");
  Assert.assertEquals(
      "Expected to produce 1970-01-01",
      epoch,
      scalarSql("SELECT system.days(date('1970-01-01'))"));

  Date beforeEpoch = Date.valueOf("1969-12-31");
  Assert.assertEquals(
      "Expected to produce 1969-12-31",
      beforeEpoch,
      scalarSql("SELECT system.days(date('1969-12-31'))"));

  // a null date must yield a null result
  Object nullResult = scalarSql("SELECT system.days(CAST(null AS DATE))");
  Assert.assertNull(nullResult);
}

/**
 * Checks {@code system.days} on timestamp input: after, just after and just before
 * 1970-01-01T00:00:00 UTC, plus a null timestamp.
 */
@Test
public void testTimestamps() {
  Assert.assertEquals(
      "Expected to produce 2017-12-01",
      Date.valueOf("2017-12-01"),
      scalarSql("SELECT system.days(TIMESTAMP '2017-12-01 10:12:55.038194 UTC+00:00')"));
  Assert.assertEquals(
      "Expected to produce 1970-01-01",
      Date.valueOf("1970-01-01"),
      scalarSql("SELECT system.days(TIMESTAMP '1970-01-01 00:00:01.000001 UTC+00:00')"));
  Assert.assertEquals(
      "Expected to produce 1969-12-31",
      Date.valueOf("1969-12-31"),
      scalarSql("SELECT system.days(TIMESTAMP '1969-12-31 23:59:58.999999 UTC+00:00')"));
  // The null must be typed as TIMESTAMP so that the timestamp binding is the one exercised;
  // a DATE-typed null would only repeat the check already made in testDates.
  Assert.assertNull(scalarSql("SELECT system.days(CAST(null AS TIMESTAMP))"));
}

/** Checks that calling {@code system.days} with zero or two arguments fails analysis. */
@Test
public void testWrongNumberOfArguments() {
  String noArgsError = "Function 'days' cannot process input: (): Wrong number of inputs";
  AssertHelpers.assertThrows(
      "Function resolution should not work with zero arguments",
      AnalysisException.class,
      noArgsError,
      () -> scalarSql("SELECT system.days()"));

  String twoArgsError =
      "Function 'days' cannot process input: (date, date): Wrong number of inputs";
  AssertHelpers.assertThrows(
      "Function resolution should not work with more than one argument",
      AnalysisException.class,
      twoArgsError,
      () -> scalarSql("SELECT system.days(date('1969-12-31'), date('1969-12-31'))"));
}

/** Checks that int and bigint arguments are rejected rather than coerced to date/timestamp. */
@Test
public void testInvalidInputTypes() {
  String intError =
      "Function 'days' cannot process input: (int): Expected value to be date or timestamp";
  AssertHelpers.assertThrows(
      "Int type should not be coercible to date/timestamp",
      AnalysisException.class,
      intError,
      () -> scalarSql("SELECT system.days(1)"));

  String longError =
      "Function 'days' cannot process input: (bigint): Expected value to be date or timestamp";
  AssertHelpers.assertThrows(
      "Long type should not be coercible to date/timestamp",
      AnalysisException.class,
      longError,
      () -> scalarSql("SELECT system.days(1L)"));
}
}