Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.spark.functions;

import org.apache.iceberg.util.DateTimeUtil;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.catalog.functions.BoundFunction;
import org.apache.spark.sql.connector.catalog.functions.ScalarFunction;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.TimestampType;

/**
 * Spark SQL function that exposes Iceberg's hour transform.
 *
 * <p>The function binds only to timestamp input and returns an integer. Example usage: {@code
 * SELECT system.hours(TIMESTAMP '2017-12-01 10:12:55.038194 UTC+00:00')}.
 */
public class HoursFunction extends UnaryUnboundFunction {

  @Override
  public String name() {
    return "hours";
  }

  @Override
  public String description() {
    String summary = "(col) - Call Iceberg's hour transform\n";
    String argument = " col :: source column (must be timestamp)";
    return name() + summary + argument;
  }

  /**
   * Selects the bound implementation for the given argument type.
   *
   * @param valueType Spark type of the single argument
   * @return the timestamp-to-hours scalar function
   * @throws UnsupportedOperationException if the argument is not a timestamp
   */
  @Override
  protected BoundFunction doBind(DataType valueType) {
    // reject everything that is not a timestamp up front
    if (!(valueType instanceof TimestampType)) {
      throw new UnsupportedOperationException(
          "Expected value to be timestamp: " + valueType.catalogString());
    }

    return new TimestampToHoursFunction();
  }

  /** Bound scalar function that maps a timestamp in microseconds to an integer hour value. */
  public static class TimestampToHoursFunction implements ScalarFunction<Integer> {

    /**
     * Magic method used in codegen; delegates to {@link DateTimeUtil#microsToHours(long)}.
     *
     * @param micros timestamp value in microseconds
     * @return the hour value computed by {@code DateTimeUtil}
     */
    public static int invoke(long micros) {
      return DateTimeUtil.microsToHours(micros);
    }

    @Override
    public Integer produceResult(InternalRow input) {
      // null in, null out: mirrors what Spark does for null input in codegen
      if (input.isNullAt(0)) {
        return null;
      }

      return invoke(input.getLong(0));
    }

    @Override
    public String name() {
      return "hours";
    }

    @Override
    public String canonicalName() {
      return "iceberg.hours(timestamp)";
    }

    @Override
    public DataType[] inputTypes() {
      return new DataType[] {DataTypes.TimestampType};
    }

    @Override
    public DataType resultType() {
      return DataTypes.IntegerType;
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ private SparkFunctions() {}
"years", new YearsFunction(),
"months", new MonthsFunction(),
"days", new DaysFunction(),
"hours", new HoursFunction(),
"bucket", new BucketFunction(),
"truncate", new TruncateFunction());

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.spark.sql;

import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.spark.SparkTestBaseWithCatalog;
import org.apache.spark.sql.AnalysisException;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/** SQL-level tests for the {@code system.hours} function: results, arity and argument types. */
public class TestSparkHoursFunction extends SparkTestBaseWithCatalog {

  /** Switches the session to the test catalog so {@code system.hours} resolves against it. */
  @Before
  public void useCatalog() {
    sql("USE %s", catalogName);
  }

  /** Checks a value after the epoch, one just after it, one just before it, and null input. */
  @Test
  public void testTimestamps() {
    String afterEpochQuery =
        "SELECT system.hours(TIMESTAMP '2017-12-01 10:12:55.038194 UTC+00:00')";
    Assert.assertEquals(
        "Expected to produce 17501 * 24 + 10", 420034, scalarSql(afterEpochQuery));

    String firstHourQuery =
        "SELECT system.hours(TIMESTAMP '1970-01-01 00:00:01.000001 UTC+00:00')";
    Assert.assertEquals("Expected to produce 0 * 24 + 0 = 0", 0, scalarSql(firstHourQuery));

    // the last hour before the epoch is expected to map to -1
    String beforeEpochQuery =
        "SELECT system.hours(TIMESTAMP '1969-12-31 23:59:58.999999 UTC+00:00')";
    Assert.assertEquals("Expected to produce -1", -1, scalarSql(beforeEpochQuery));

    String nullQuery = "SELECT system.hours(CAST(null AS TIMESTAMP))";
    Assert.assertNull(scalarSql(nullQuery));
  }

  /** The function takes exactly one argument; zero or two must fail analysis. */
  @Test
  public void testWrongNumberOfArguments() {
    String noArgsError = "Function 'hours' cannot process input: (): Wrong number of inputs";
    AssertHelpers.assertThrows(
        "Function resolution should not work with zero arguments",
        AnalysisException.class,
        noArgsError,
        () -> scalarSql("SELECT system.hours()"));

    String twoArgsError =
        "Function 'hours' cannot process input: (date, date): Wrong number of inputs";
    AssertHelpers.assertThrows(
        "Function resolution should not work with more than one argument",
        AnalysisException.class,
        twoArgsError,
        () -> scalarSql("SELECT system.hours(date('1969-12-31'), date('1969-12-31'))"));
  }

  /** Integer and long arguments must be rejected rather than coerced to timestamp. */
  @Test
  public void testInvalidInputTypes() {
    String intError =
        "Function 'hours' cannot process input: (int): Expected value to be timestamp";
    AssertHelpers.assertThrows(
        "Int type should not be coercible to timestamp",
        AnalysisException.class,
        intError,
        () -> scalarSql("SELECT system.hours(1)"));

    String longError =
        "Function 'hours' cannot process input: (bigint): Expected value to be timestamp";
    AssertHelpers.assertThrows(
        "Long type should not be coercible to timestamp",
        AnalysisException.class,
        longError,
        () -> scalarSql("SELECT system.hours(1L)"));
  }
}