-
Notifications
You must be signed in to change notification settings - Fork 3.4k
Spark 3.4: Support pushing down system functions by V2 filters #7886
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
dc10ba2
b758cc5
985f3f5
222fb23
0a8a352
5e0628c
624fe54
7d67dda
997bed0
ee27567
c2efd43
69f08f2
ee81e6d
8671457
405cce5
27401ce
0f74daa
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -19,26 +19,33 @@ | |
| package org.apache.iceberg.spark; | ||
|
|
||
| import static org.apache.iceberg.expressions.Expressions.and; | ||
| import static org.apache.iceberg.expressions.Expressions.bucket; | ||
| import static org.apache.iceberg.expressions.Expressions.day; | ||
| import static org.apache.iceberg.expressions.Expressions.equal; | ||
| import static org.apache.iceberg.expressions.Expressions.greaterThan; | ||
| import static org.apache.iceberg.expressions.Expressions.greaterThanOrEqual; | ||
| import static org.apache.iceberg.expressions.Expressions.hour; | ||
| import static org.apache.iceberg.expressions.Expressions.in; | ||
| import static org.apache.iceberg.expressions.Expressions.isNaN; | ||
| import static org.apache.iceberg.expressions.Expressions.isNull; | ||
| import static org.apache.iceberg.expressions.Expressions.lessThan; | ||
| import static org.apache.iceberg.expressions.Expressions.lessThanOrEqual; | ||
| import static org.apache.iceberg.expressions.Expressions.month; | ||
| import static org.apache.iceberg.expressions.Expressions.not; | ||
| import static org.apache.iceberg.expressions.Expressions.notEqual; | ||
| import static org.apache.iceberg.expressions.Expressions.notIn; | ||
| import static org.apache.iceberg.expressions.Expressions.notNaN; | ||
| import static org.apache.iceberg.expressions.Expressions.notNull; | ||
| import static org.apache.iceberg.expressions.Expressions.or; | ||
| import static org.apache.iceberg.expressions.Expressions.ref; | ||
| import static org.apache.iceberg.expressions.Expressions.startsWith; | ||
| import static org.apache.iceberg.expressions.Expressions.truncate; | ||
| import static org.apache.iceberg.expressions.Expressions.year; | ||
|
|
||
| import java.util.Arrays; | ||
| import java.util.Locale; | ||
| import java.util.Map; | ||
| import java.util.Objects; | ||
| import java.util.Set; | ||
| import java.util.stream.Collectors; | ||
| import org.apache.iceberg.expressions.Expression; | ||
| import org.apache.iceberg.expressions.Expression.Operation; | ||
|
|
@@ -47,10 +54,12 @@ | |
| import org.apache.iceberg.expressions.UnboundTerm; | ||
| import org.apache.iceberg.relocated.com.google.common.base.Preconditions; | ||
| import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; | ||
| import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; | ||
| import org.apache.iceberg.util.NaNUtil; | ||
| import org.apache.iceberg.util.Pair; | ||
| import org.apache.spark.sql.connector.expressions.Literal; | ||
| import org.apache.spark.sql.connector.expressions.NamedReference; | ||
| import org.apache.spark.sql.connector.expressions.UserDefinedScalarFunc; | ||
| import org.apache.spark.sql.connector.expressions.filter.And; | ||
| import org.apache.spark.sql.connector.expressions.filter.Not; | ||
| import org.apache.spark.sql.connector.expressions.filter.Or; | ||
|
|
@@ -59,6 +68,9 @@ | |
|
|
||
| public class SparkV2Filters { | ||
|
|
||
| public static final Set<String> SUPPORTED_FUNCTIONS = | ||
| ImmutableSet.of("years", "months", "days", "hours", "bucket", "truncate"); | ||
|
|
||
| private static final String TRUE = "ALWAYS_TRUE"; | ||
| private static final String FALSE = "ALWAYS_FALSE"; | ||
| private static final String EQ = "="; | ||
|
|
@@ -98,6 +110,18 @@ public class SparkV2Filters { | |
|
|
||
| private SparkV2Filters() {} | ||
|
|
||
| /** Converts every Spark V2 predicate in the array and combines the results with AND, starting from alwaysTrue (so an empty array yields alwaysTrue); the Preconditions.checkArgument call rejects the whole batch if any single predicate converts to null. */ public static Expression convert(Predicate[] predicates) { | ||
| Expression expression = Expressions.alwaysTrue(); | ||
| for (Predicate predicate : predicates) { | ||
| Expression converted = convert(predicate); | ||
| /* a null result from the single-predicate convert means the predicate is unsupported */ Preconditions.checkArgument( | ||
| converted != null, "Cannot convert Spark predicate to Iceberg expression: %s", predicate); | ||
| expression = Expressions.and(expression, converted); | ||
| } | ||
|
|
||
| return expression; | ||
| } | ||
|
|
||
| @SuppressWarnings({"checkstyle:CyclomaticComplexity", "checkstyle:MethodLength"}) | ||
| public static Expression convert(Predicate predicate) { | ||
| Operation op = FILTERS.get(predicate.name()); | ||
|
|
@@ -110,51 +134,69 @@ public static Expression convert(Predicate predicate) { | |
| return Expressions.alwaysFalse(); | ||
|
|
||
| case IS_NULL: | ||
| return isRef(child(predicate)) ? isNull(SparkUtil.toColumnName(child(predicate))) : null; | ||
| if (canConvertToTerm(child(predicate))) { | ||
| UnboundTerm<Object> term = toTerm(child(predicate)); | ||
| return term != null ? isNull(term) : null; | ||
| } | ||
|
|
||
| return null; | ||
|
|
||
| case NOT_NULL: | ||
| return isRef(child(predicate)) ? notNull(SparkUtil.toColumnName(child(predicate))) : null; | ||
| if (canConvertToTerm(child(predicate))) { | ||
| UnboundTerm<Object> term = toTerm(child(predicate)); | ||
| return term != null ? notNull(term) : null; | ||
| } | ||
|
|
||
| return null; | ||
|
|
||
| case LT: | ||
| if (isRef(leftChild(predicate)) && isLiteral(rightChild(predicate))) { | ||
| String columnName = SparkUtil.toColumnName(leftChild(predicate)); | ||
| return lessThan(columnName, convertLiteral(rightChild(predicate))); | ||
| } else if (isRef(rightChild(predicate)) && isLiteral(leftChild(predicate))) { | ||
| String columnName = SparkUtil.toColumnName(rightChild(predicate)); | ||
| return greaterThan(columnName, convertLiteral(leftChild(predicate))); | ||
| if (canConvertToTerm(leftChild(predicate)) && isLiteral(rightChild(predicate))) { | ||
| UnboundTerm<Object> term = toTerm(leftChild(predicate)); | ||
| return term != null ? lessThan(term, convertLiteral(rightChild(predicate))) : null; | ||
| } else if (canConvertToTerm(rightChild(predicate)) && isLiteral(leftChild(predicate))) { | ||
| UnboundTerm<Object> term = toTerm(rightChild(predicate)); | ||
| return term != null ? greaterThan(term, convertLiteral(leftChild(predicate))) : null; | ||
| } else { | ||
| return null; | ||
| } | ||
|
|
||
| case LT_EQ: | ||
| if (isRef(leftChild(predicate)) && isLiteral(rightChild(predicate))) { | ||
| String columnName = SparkUtil.toColumnName(leftChild(predicate)); | ||
| return lessThanOrEqual(columnName, convertLiteral(rightChild(predicate))); | ||
| } else if (isRef(rightChild(predicate)) && isLiteral(leftChild(predicate))) { | ||
| String columnName = SparkUtil.toColumnName(rightChild(predicate)); | ||
| return greaterThanOrEqual(columnName, convertLiteral(leftChild(predicate))); | ||
| if (canConvertToTerm(leftChild(predicate)) && isLiteral(rightChild(predicate))) { | ||
| UnboundTerm<Object> term = toTerm(leftChild(predicate)); | ||
| return term != null | ||
| ? lessThanOrEqual(term, convertLiteral(rightChild(predicate))) | ||
| : null; | ||
| } else if (canConvertToTerm(rightChild(predicate)) && isLiteral(leftChild(predicate))) { | ||
| UnboundTerm<Object> term = toTerm(rightChild(predicate)); | ||
| return term != null | ||
| ? greaterThanOrEqual(term, convertLiteral(leftChild(predicate))) | ||
| : null; | ||
| } else { | ||
| return null; | ||
| } | ||
|
|
||
| case GT: | ||
| if (isRef(leftChild(predicate)) && isLiteral(rightChild(predicate))) { | ||
| String columnName = SparkUtil.toColumnName(leftChild(predicate)); | ||
| return greaterThan(columnName, convertLiteral(rightChild(predicate))); | ||
| } else if (isRef(rightChild(predicate)) && isLiteral(leftChild(predicate))) { | ||
| String columnName = SparkUtil.toColumnName(rightChild(predicate)); | ||
| return lessThan(columnName, convertLiteral(leftChild(predicate))); | ||
| if (canConvertToTerm(leftChild(predicate)) && isLiteral(rightChild(predicate))) { | ||
| UnboundTerm<Object> term = toTerm(leftChild(predicate)); | ||
| return term != null ? greaterThan(term, convertLiteral(rightChild(predicate))) : null; | ||
| } else if (canConvertToTerm(rightChild(predicate)) && isLiteral(leftChild(predicate))) { | ||
| UnboundTerm<Object> term = toTerm(rightChild(predicate)); | ||
| return term != null ? lessThan(term, convertLiteral(leftChild(predicate))) : null; | ||
| } else { | ||
| return null; | ||
| } | ||
|
|
||
| case GT_EQ: | ||
| if (isRef(leftChild(predicate)) && isLiteral(rightChild(predicate))) { | ||
| String columnName = SparkUtil.toColumnName(leftChild(predicate)); | ||
| return greaterThanOrEqual(columnName, convertLiteral(rightChild(predicate))); | ||
| } else if (isRef(rightChild(predicate)) && isLiteral(leftChild(predicate))) { | ||
| String columnName = SparkUtil.toColumnName(rightChild(predicate)); | ||
| return lessThanOrEqual(columnName, convertLiteral(leftChild(predicate))); | ||
| if (canConvertToTerm(leftChild(predicate)) && isLiteral(rightChild(predicate))) { | ||
| UnboundTerm<Object> term = toTerm(leftChild(predicate)); | ||
| return term != null | ||
| ? greaterThanOrEqual(term, convertLiteral(rightChild(predicate))) | ||
| : null; | ||
| } else if (canConvertToTerm(rightChild(predicate)) && isLiteral(leftChild(predicate))) { | ||
| UnboundTerm<Object> term = toTerm(rightChild(predicate)); | ||
| return term != null | ||
| ? lessThanOrEqual(term, convertLiteral(leftChild(predicate))) | ||
| : null; | ||
| } else { | ||
| return null; | ||
| } | ||
|
|
@@ -191,13 +233,17 @@ public static Expression convert(Predicate predicate) { | |
|
|
||
| case IN: | ||
| if (isSupportedInPredicate(predicate)) { | ||
| return in( | ||
| SparkUtil.toColumnName(childAtIndex(predicate, 0)), | ||
| Arrays.stream(predicate.children()) | ||
| .skip(1) | ||
| .map(val -> convertLiteral(((Literal<?>) val))) | ||
| .filter(Objects::nonNull) | ||
| .collect(Collectors.toList())); | ||
| UnboundTerm<Object> term = toTerm(childAtIndex(predicate, 0)); | ||
|
|
||
| return term != null | ||
| ? in( | ||
| term, | ||
| Arrays.stream(predicate.children()) | ||
| .skip(1) | ||
| .map(val -> convertLiteral(((Literal<?>) val))) | ||
| .filter(Objects::nonNull) | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this isn't correct. If there is a
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, looking into this more I don't think this is a bug introduced by this PR, but a null should not be ignored. Any null value must return
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Here the operations same as
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, this is absolutely a bug. Iceberg should never accept an IN or NOT IN predicate containing a null value.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I will do a follow-up for this. |
||
| .collect(Collectors.toList())) | ||
| : null; | ||
| } else { | ||
| return null; | ||
| } | ||
|
|
@@ -206,18 +252,23 @@ public static Expression convert(Predicate predicate) { | |
| Not notPredicate = (Not) predicate; | ||
| Predicate childPredicate = notPredicate.child(); | ||
| if (childPredicate.name().equals(IN) && isSupportedInPredicate(childPredicate)) { | ||
| UnboundTerm<Object> term = toTerm(childAtIndex(childPredicate, 0)); | ||
| if (term == null) { | ||
| return null; | ||
| } | ||
|
|
||
| // infer an extra notNull predicate for Spark NOT IN filters | ||
| // as Iceberg expressions don't follow the 3-value SQL boolean logic | ||
| // col NOT IN (1, 2) in Spark is equal to notNull(col) && notIn(col, 1, 2) in Iceberg | ||
| Expression notIn = | ||
| notIn( | ||
| SparkUtil.toColumnName(childAtIndex(childPredicate, 0)), | ||
| term, | ||
| Arrays.stream(childPredicate.children()) | ||
| .skip(1) | ||
| .map(val -> convertLiteral(((Literal<?>) val))) | ||
| .filter(Objects::nonNull) | ||
| .collect(Collectors.toList())); | ||
| return and(notNull(SparkUtil.toColumnName(childAtIndex(childPredicate, 0))), notIn); | ||
| return and(notNull(term), notIn); | ||
| } else if (hasNoInFilter(childPredicate)) { | ||
| Expression child = convert(childPredicate); | ||
| if (child != null) { | ||
|
|
@@ -258,15 +309,13 @@ public static Expression convert(Predicate predicate) { | |
| } | ||
|
|
||
| private static Pair<UnboundTerm<Object>, Object> predicateChildren(Predicate predicate) { | ||
| if (isRef(leftChild(predicate)) && isLiteral(rightChild(predicate))) { | ||
| UnboundTerm<Object> term = ref(SparkUtil.toColumnName(leftChild(predicate))); | ||
| Object value = convertLiteral(rightChild(predicate)); | ||
| return Pair.of(term, value); | ||
| if (canConvertToTerm(leftChild(predicate)) && isLiteral(rightChild(predicate))) { | ||
| UnboundTerm<Object> term = toTerm(leftChild(predicate)); | ||
| return term != null ? Pair.of(term, convertLiteral(rightChild(predicate))) : null; | ||
|
|
||
| } else if (isRef(rightChild(predicate)) && isLiteral(leftChild(predicate))) { | ||
| UnboundTerm<Object> term = ref(SparkUtil.toColumnName(rightChild(predicate))); | ||
| Object value = convertLiteral(leftChild(predicate)); | ||
| return Pair.of(term, value); | ||
| } else if (canConvertToTerm(rightChild(predicate)) && isLiteral(leftChild(predicate))) { | ||
| UnboundTerm<Object> term = toTerm(rightChild(predicate)); | ||
| return term != null ? Pair.of(term, convertLiteral(leftChild(predicate))) : null; | ||
|
|
||
| } else { | ||
| return null; | ||
|
|
@@ -302,10 +351,26 @@ private static <T> T childAtIndex(Predicate predicate, int index) { | |
| return (T) predicate.children()[index]; | ||
| } | ||
|
|
||
| /** Returns true when the Spark expression is either a named column reference (isRef) or a supported system function call (isSystemFunc); intended as the guard to check before calling toTerm. */ private static boolean canConvertToTerm( | ||
| org.apache.spark.sql.connector.expressions.Expression expr) { | ||
| return isRef(expr) || isSystemFunc(expr); | ||
| } | ||
|
|
||
| /** Returns true when the Spark expression is a {@link NamedReference}. */ private static boolean isRef(org.apache.spark.sql.connector.expressions.Expression expr) { | ||
| return expr instanceof NamedReference; | ||
| } | ||
|
|
||
| private static boolean isSystemFunc(org.apache.spark.sql.connector.expressions.Expression expr) { | ||
| if (expr instanceof UserDefinedScalarFunc) { | ||
| UserDefinedScalarFunc udf = (UserDefinedScalarFunc) expr; | ||
| return udf.canonicalName().startsWith("iceberg") | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we need this string check? I feel like all of the functions should be listed in the list on 369?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There could possibly be some other UDF functions, like 'catalog.other.years'.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think the problem is that this is using the UDF's If we want to limit to just the Iceberg-defined functions, then this is necessary. It may be better to have a set of supported functions that uses just the
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There are some
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. While that's not a constant, there are a limited number of values that
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I have tried to do that, however @Override
public String canonicalName() {
return String.format("iceberg.truncate(decimal(%d,%d))", precision, scale);
}
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You can similarly enumerate them, but this isn't a big deal. |
||
| && SUPPORTED_FUNCTIONS.contains(udf.name()) | ||
| && Arrays.stream(udf.children()).allMatch(child -> isLiteral(child) || isRef(child)); | ||
| } | ||
|
|
||
| return false; | ||
| } | ||
|
|
||
| /** Returns true when the Spark expression is a {@link Literal}. */ private static boolean isLiteral(org.apache.spark.sql.connector.expressions.Expression expr) { | ||
| return expr instanceof Literal; | ||
| } | ||
|
|
@@ -360,10 +425,57 @@ private static boolean hasNoInFilter(Predicate predicate) { | |
| } | ||
|
|
||
| private static boolean isSupportedInPredicate(Predicate predicate) { | ||
| if (!isRef(childAtIndex(predicate, 0))) { | ||
| if (!canConvertToTerm(childAtIndex(predicate, 0))) { | ||
| return false; | ||
| } else { | ||
| return Arrays.stream(predicate.children()).skip(1).allMatch(SparkV2Filters::isLiteral); | ||
| } | ||
| } | ||
|
|
||
| /** Should be called after {@link #canConvertToTerm} passed */ | ||
| private static <T> UnboundTerm<Object> toTerm(T input) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It would be really nice if Spark passed the |
||
| if (input instanceof NamedReference) { | ||
| return Expressions.ref(SparkUtil.toColumnName((NamedReference) input)); | ||
| } else if (input instanceof UserDefinedScalarFunc) { | ||
| return udfToTerm((UserDefinedScalarFunc) input); | ||
| } else { | ||
| return null; | ||
| } | ||
| } | ||
|
|
||
| /** Maps a Spark UserDefinedScalarFunc onto an Iceberg transform term. One-argument calls on a column reference map years, months, days and hours to year, month, day and hour; two-argument calls of the form (literal, column reference) map bucket and truncate. Returns null for any other name, arity or argument shape. */ @SuppressWarnings("checkstyle:CyclomaticComplexity") | ||
| private static UnboundTerm<Object> udfToTerm(UserDefinedScalarFunc udf) { | ||
| org.apache.spark.sql.connector.expressions.Expression[] children = udf.children(); | ||
| /* lower-cased with Locale.ROOT so the switch labels below match regardless of the case of the reported name */ String udfName = udf.name().toLowerCase(Locale.ROOT); | ||
| if (children.length == 1) { | ||
| org.apache.spark.sql.connector.expressions.Expression child = children[0]; | ||
| if (isRef(child)) { | ||
|
rdblue marked this conversation as resolved.
|
||
| String column = SparkUtil.toColumnName((NamedReference) child); | ||
| /* no default branch: an unmatched name leaves the switch and reaches the final return null */ switch (udfName) { | ||
| case "years": | ||
| return year(column); | ||
| case "months": | ||
| return month(column); | ||
| case "days": | ||
| return day(column); | ||
| case "hours": | ||
| return hour(column); | ||
| } | ||
| } | ||
| } else if (children.length == 2) { | ||
| /* the literal parameter (bucket count or truncate width) comes first, the column second */ if (isLiteral(children[0]) && isRef(children[1])) { | ||
| String column = SparkUtil.toColumnName((NamedReference) children[1]); | ||
| switch (udfName) { | ||
| case "bucket": | ||
| /* NOTE(review): the cast and unboxing assume convertLiteral yields a non-null Integer here; confirm */ int numBuckets = (Integer) convertLiteral((Literal<?>) children[0]); | ||
| return bucket(column, numBuckets); | ||
| case "truncate": | ||
| /* NOTE(review): same non-null Integer assumption as for bucket; confirm */ int width = (Integer) convertLiteral((Literal<?>) children[0]); | ||
| return truncate(column, width); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| return null; | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this the correct behavior?
I think the reason why we didn't have bulk conversion was so that the caller could handle predicates that couldn't be converted individually. In most cases, we want to push as many predicates as possible. Failing the entire conversion with an exception because one couldn't be converted isn't a good option because we then can't push the predicates that can be converted and let Spark or other engines handle the rest.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I kept the behavior of
SparkFilters. You are right. It should not raise an exception. There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Okay, I think that the difference is that there are limited uses of
SparkFilters and those cases would reject the operation if any filter wasn't convertible (e.g. when deleting or in an overwrite). I think this is okay since it's following what was done elsewhere.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Even if it's following what was done elsewhere, this means that if a non-Iceberg UDF predicate got pushed down, it would fail to push down the Iceberg expressions? Looking at the code, this only seems to be used (currently) in the deleteWhere, but I think for the deleteWhere codepath we should not throw.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I checked the doc and implementation of
canDeleteWhere/deleteWhere: And the implementation of
canDelete: So the
Expression deleteExpr = SparkV2Filters.convert(predicates); should only be called when all the predicates could be translated into Iceberg expressions.