Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -19,26 +19,33 @@
package org.apache.iceberg.spark;

import static org.apache.iceberg.expressions.Expressions.and;
import static org.apache.iceberg.expressions.Expressions.bucket;
import static org.apache.iceberg.expressions.Expressions.day;
import static org.apache.iceberg.expressions.Expressions.equal;
import static org.apache.iceberg.expressions.Expressions.greaterThan;
import static org.apache.iceberg.expressions.Expressions.greaterThanOrEqual;
import static org.apache.iceberg.expressions.Expressions.hour;
import static org.apache.iceberg.expressions.Expressions.in;
import static org.apache.iceberg.expressions.Expressions.isNaN;
import static org.apache.iceberg.expressions.Expressions.isNull;
import static org.apache.iceberg.expressions.Expressions.lessThan;
import static org.apache.iceberg.expressions.Expressions.lessThanOrEqual;
import static org.apache.iceberg.expressions.Expressions.month;
import static org.apache.iceberg.expressions.Expressions.not;
import static org.apache.iceberg.expressions.Expressions.notEqual;
import static org.apache.iceberg.expressions.Expressions.notIn;
import static org.apache.iceberg.expressions.Expressions.notNaN;
import static org.apache.iceberg.expressions.Expressions.notNull;
import static org.apache.iceberg.expressions.Expressions.or;
import static org.apache.iceberg.expressions.Expressions.ref;
import static org.apache.iceberg.expressions.Expressions.startsWith;
import static org.apache.iceberg.expressions.Expressions.truncate;
import static org.apache.iceberg.expressions.Expressions.year;

import java.util.Arrays;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expression.Operation;
Expand All @@ -47,10 +54,12 @@
import org.apache.iceberg.expressions.UnboundTerm;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.util.NaNUtil;
import org.apache.iceberg.util.Pair;
import org.apache.spark.sql.connector.expressions.Literal;
import org.apache.spark.sql.connector.expressions.NamedReference;
import org.apache.spark.sql.connector.expressions.UserDefinedScalarFunc;
import org.apache.spark.sql.connector.expressions.filter.And;
import org.apache.spark.sql.connector.expressions.filter.Not;
import org.apache.spark.sql.connector.expressions.filter.Or;
Expand All @@ -59,6 +68,9 @@

public class SparkV2Filters {

public static final Set<String> SUPPORTED_FUNCTIONS =
ImmutableSet.of("years", "months", "days", "hours", "bucket", "truncate");

private static final String TRUE = "ALWAYS_TRUE";
private static final String FALSE = "ALWAYS_FALSE";
private static final String EQ = "=";
Expand Down Expand Up @@ -98,6 +110,18 @@ public class SparkV2Filters {

private SparkV2Filters() {}

public static Expression convert(Predicate[] predicates) {
Expression expression = Expressions.alwaysTrue();
for (Predicate predicate : predicates) {
Expression converted = convert(predicate);
Preconditions.checkArgument(
converted != null, "Cannot convert Spark predicate to Iceberg expression: %s", predicate);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this the correct behavior?

I think the reason why we didn't have bulk conversion was so that the caller could handle predicates that couldn't be converted individually. In most cases, we want to push as many predicates as possible. Failing the entire conversion with an exception because one couldn't be converted isn't a good option because we then can't push the predicates that can be converted and let Spark or other engines handle the rest.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I kept the behavior of SparkFilters. You are right. It should not raise an exception.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, I think that the difference is that there are limited uses of SparkFilters and those cases would reject the operation if any filter wasn't convertible (e.g. when deleting or in an overwrite).

I think this is okay since it's following what was done elsewhere.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Even if it's following what was done elsewhere this means if there was non-iceberg UDF predicate that got pushed down it would fail to push down the iceberg expressions? Looking at the code this only seems to be used (currently) in the deleteWhere, but I think for the deleteWhere codepath we should not throw.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I checked the doc and implementation of canDeleteWhere/deleteWhere :

  /**
   * Checks whether it is possible to delete data from a data source table that matches filter
   * expressions.
   * <p>
   * Rows should be deleted from the data source iff all of the filter expressions match.
   * That is, the expressions must be interpreted as a set of filters that are ANDed together.
   * <p>
   * Spark will call this method at planning time to check whether {@link #deleteWhere(Predicate[])}
   * would reject the delete operation because it requires significant effort. If this method
   * returns false, Spark will not call {@link #deleteWhere(Predicate[])} and will try to rewrite
   * the delete operation and produce row-level changes if the data source table supports deleting
   * individual records.
   *
   * @param predicates V2 filter expressions, used to select rows to delete when all expressions
   *                  match
   * @return true if the delete operation can be performed
   *
   * @since 3.4.0
   */
  default boolean canDeleteWhere(Predicate[] predicates) {
    return true;
  }

  /**
   * Delete data from a data source table that matches filter expressions. Note that this method
   * will be invoked only if {@link #canDeleteWhere(Predicate[])} returns true.
   * <p>
   * Rows are deleted from the data source iff all of the filter expressions match. That is, the
   * expressions must be interpreted as a set of filters that are ANDed together.
   * <p>
   * Implementations may reject a delete operation if the delete isn't possible without significant
   * effort. For example, partitioned data sources may reject deletes that do not filter by
   * partition columns because the filter may require rewriting files without deleted records.
   * To reject a delete implementations should throw {@link IllegalArgumentException} with a clear
   * error message that identifies which expression was rejected.
   *
   * @param predicates predicate expressions, used to select rows to delete when all expressions
   *                  match
   * @throws IllegalArgumentException If the delete is rejected due to required effort
   */
  void deleteWhere(Predicate[] predicates);

And the implementation of canDelete:

  @Override
  public boolean canDeleteWhere(Predicate[] predicates) {
    Preconditions.checkArgument(
        snapshotId == null, "Cannot delete from table at a specific snapshot: %s", snapshotId);

    Expression deleteExpr = Expressions.alwaysTrue();

    for (Predicate predicate : predicates) {
      Expression expr = SparkV2Filters.convert(predicate);
      if (expr != null) {
        deleteExpr = Expressions.and(deleteExpr, expr);
      } else {
        return false;
      }
    }

    return canDeleteUsingMetadata(deleteExpr);
  }

So the Expression deleteExpr = SparkV2Filters.convert(predicates); should only be called when all the predicate could be translated into iceberg expressions.

expression = Expressions.and(expression, converted);
}

return expression;
}

@SuppressWarnings({"checkstyle:CyclomaticComplexity", "checkstyle:MethodLength"})
public static Expression convert(Predicate predicate) {
Operation op = FILTERS.get(predicate.name());
Expand All @@ -110,51 +134,69 @@ public static Expression convert(Predicate predicate) {
return Expressions.alwaysFalse();

case IS_NULL:
return isRef(child(predicate)) ? isNull(SparkUtil.toColumnName(child(predicate))) : null;
if (canConvertToTerm(child(predicate))) {
UnboundTerm<Object> term = toTerm(child(predicate));
return term != null ? isNull(term) : null;
}

return null;

case NOT_NULL:
return isRef(child(predicate)) ? notNull(SparkUtil.toColumnName(child(predicate))) : null;
if (canConvertToTerm(child(predicate))) {
UnboundTerm<Object> term = toTerm(child(predicate));
return term != null ? notNull(term) : null;
}

return null;

case LT:
if (isRef(leftChild(predicate)) && isLiteral(rightChild(predicate))) {
String columnName = SparkUtil.toColumnName(leftChild(predicate));
return lessThan(columnName, convertLiteral(rightChild(predicate)));
} else if (isRef(rightChild(predicate)) && isLiteral(leftChild(predicate))) {
String columnName = SparkUtil.toColumnName(rightChild(predicate));
return greaterThan(columnName, convertLiteral(leftChild(predicate)));
if (canConvertToTerm(leftChild(predicate)) && isLiteral(rightChild(predicate))) {
UnboundTerm<Object> term = toTerm(leftChild(predicate));
return term != null ? lessThan(term, convertLiteral(rightChild(predicate))) : null;
} else if (canConvertToTerm(rightChild(predicate)) && isLiteral(leftChild(predicate))) {
UnboundTerm<Object> term = toTerm(rightChild(predicate));
return term != null ? greaterThan(term, convertLiteral(leftChild(predicate))) : null;
} else {
return null;
}

case LT_EQ:
if (isRef(leftChild(predicate)) && isLiteral(rightChild(predicate))) {
String columnName = SparkUtil.toColumnName(leftChild(predicate));
return lessThanOrEqual(columnName, convertLiteral(rightChild(predicate)));
} else if (isRef(rightChild(predicate)) && isLiteral(leftChild(predicate))) {
String columnName = SparkUtil.toColumnName(rightChild(predicate));
return greaterThanOrEqual(columnName, convertLiteral(leftChild(predicate)));
if (canConvertToTerm(leftChild(predicate)) && isLiteral(rightChild(predicate))) {
UnboundTerm<Object> term = toTerm(leftChild(predicate));
return term != null
? lessThanOrEqual(term, convertLiteral(rightChild(predicate)))
: null;
} else if (canConvertToTerm(rightChild(predicate)) && isLiteral(leftChild(predicate))) {
UnboundTerm<Object> term = toTerm(rightChild(predicate));
return term != null
? greaterThanOrEqual(term, convertLiteral(leftChild(predicate)))
: null;
} else {
return null;
}

case GT:
if (isRef(leftChild(predicate)) && isLiteral(rightChild(predicate))) {
String columnName = SparkUtil.toColumnName(leftChild(predicate));
return greaterThan(columnName, convertLiteral(rightChild(predicate)));
} else if (isRef(rightChild(predicate)) && isLiteral(leftChild(predicate))) {
String columnName = SparkUtil.toColumnName(rightChild(predicate));
return lessThan(columnName, convertLiteral(leftChild(predicate)));
if (canConvertToTerm(leftChild(predicate)) && isLiteral(rightChild(predicate))) {
UnboundTerm<Object> term = toTerm(leftChild(predicate));
return term != null ? greaterThan(term, convertLiteral(rightChild(predicate))) : null;
} else if (canConvertToTerm(rightChild(predicate)) && isLiteral(leftChild(predicate))) {
UnboundTerm<Object> term = toTerm(rightChild(predicate));
return term != null ? lessThan(term, convertLiteral(leftChild(predicate))) : null;
} else {
return null;
}

case GT_EQ:
if (isRef(leftChild(predicate)) && isLiteral(rightChild(predicate))) {
String columnName = SparkUtil.toColumnName(leftChild(predicate));
return greaterThanOrEqual(columnName, convertLiteral(rightChild(predicate)));
} else if (isRef(rightChild(predicate)) && isLiteral(leftChild(predicate))) {
String columnName = SparkUtil.toColumnName(rightChild(predicate));
return lessThanOrEqual(columnName, convertLiteral(leftChild(predicate)));
if (canConvertToTerm(leftChild(predicate)) && isLiteral(rightChild(predicate))) {
UnboundTerm<Object> term = toTerm(leftChild(predicate));
return term != null
? greaterThanOrEqual(term, convertLiteral(rightChild(predicate)))
: null;
} else if (canConvertToTerm(rightChild(predicate)) && isLiteral(leftChild(predicate))) {
UnboundTerm<Object> term = toTerm(rightChild(predicate));
return term != null
? lessThanOrEqual(term, convertLiteral(leftChild(predicate)))
: null;
} else {
return null;
}
Expand Down Expand Up @@ -191,13 +233,17 @@ public static Expression convert(Predicate predicate) {

case IN:
if (isSupportedInPredicate(predicate)) {
return in(
SparkUtil.toColumnName(childAtIndex(predicate, 0)),
Arrays.stream(predicate.children())
.skip(1)
.map(val -> convertLiteral(((Literal<?>) val)))
.filter(Objects::nonNull)
.collect(Collectors.toList()));
UnboundTerm<Object> term = toTerm(childAtIndex(predicate, 0));

return term != null
? in(
term,
Arrays.stream(predicate.children())
.skip(1)
.map(val -> convertLiteral(((Literal<?>) val)))
.filter(Objects::nonNull)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

null is filtered out, so it will not fail or return null when null is in the value list.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this isn't correct. If there is a null here then at least one literal could not be converted right? That means we can't convert the entire expression.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, looking into this more I don't think this is a bug introduced by this PR, but a null should not be ignored. Any null value must return null for the entire IN expression. Any IN or NOT IN expression with a null in the set MUST be handled by Spark.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here the operations same as SparkFilters. I think the original comments are here: #749 (comment). Should we change it?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this is absolutely a bug. Iceberg should never accept an IN or NOT IN predicate containing a null value.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will do a follow-up for this.

.collect(Collectors.toList()))
: null;
} else {
return null;
}
Expand All @@ -206,18 +252,23 @@ public static Expression convert(Predicate predicate) {
Not notPredicate = (Not) predicate;
Predicate childPredicate = notPredicate.child();
if (childPredicate.name().equals(IN) && isSupportedInPredicate(childPredicate)) {
UnboundTerm<Object> term = toTerm(childAtIndex(childPredicate, 0));
if (term == null) {
return null;
}

// infer an extra notNull predicate for Spark NOT IN filters
// as Iceberg expressions don't follow the 3-value SQL boolean logic
// col NOT IN (1, 2) in Spark is equal to notNull(col) && notIn(col, 1, 2) in Iceberg
Expression notIn =
notIn(
SparkUtil.toColumnName(childAtIndex(childPredicate, 0)),
term,
Arrays.stream(childPredicate.children())
.skip(1)
.map(val -> convertLiteral(((Literal<?>) val)))
.filter(Objects::nonNull)
.collect(Collectors.toList()));
return and(notNull(SparkUtil.toColumnName(childAtIndex(childPredicate, 0))), notIn);
return and(notNull(term), notIn);
} else if (hasNoInFilter(childPredicate)) {
Expression child = convert(childPredicate);
if (child != null) {
Expand Down Expand Up @@ -258,15 +309,13 @@ public static Expression convert(Predicate predicate) {
}

private static Pair<UnboundTerm<Object>, Object> predicateChildren(Predicate predicate) {
if (isRef(leftChild(predicate)) && isLiteral(rightChild(predicate))) {
UnboundTerm<Object> term = ref(SparkUtil.toColumnName(leftChild(predicate)));
Object value = convertLiteral(rightChild(predicate));
return Pair.of(term, value);
if (canConvertToTerm(leftChild(predicate)) && isLiteral(rightChild(predicate))) {
UnboundTerm<Object> term = toTerm(leftChild(predicate));
return term != null ? Pair.of(term, convertLiteral(rightChild(predicate))) : null;

} else if (isRef(rightChild(predicate)) && isLiteral(leftChild(predicate))) {
UnboundTerm<Object> term = ref(SparkUtil.toColumnName(rightChild(predicate)));
Object value = convertLiteral(leftChild(predicate));
return Pair.of(term, value);
} else if (canConvertToTerm(rightChild(predicate)) && isLiteral(leftChild(predicate))) {
UnboundTerm<Object> term = toTerm(rightChild(predicate));
return term != null ? Pair.of(term, convertLiteral(leftChild(predicate))) : null;

} else {
return null;
Expand Down Expand Up @@ -302,10 +351,26 @@ private static <T> T childAtIndex(Predicate predicate, int index) {
return (T) predicate.children()[index];
}

private static boolean canConvertToTerm(
org.apache.spark.sql.connector.expressions.Expression expr) {
return isRef(expr) || isSystemFunc(expr);
}

private static boolean isRef(org.apache.spark.sql.connector.expressions.Expression expr) {
return expr instanceof NamedReference;
}

private static boolean isSystemFunc(org.apache.spark.sql.connector.expressions.Expression expr) {
if (expr instanceof UserDefinedScalarFunc) {
UserDefinedScalarFunc udf = (UserDefinedScalarFunc) expr;
return udf.canonicalName().startsWith("iceberg")

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need this string check? I feel like all of the functions should be listed in the list on 369?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There could be possible some other UDF functions like 'catalog.other.years'.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the problem is that this is using the UDF's canonicalName but the other check uses the function's name. Those can differ: BucketFunction.name() return bucket, what the user would call, but the canonical function name identifies the exact bound function no matter how it is loaded so BucketInt.canonicalName() returns iceberg.bucket(int).

If we want to limit to just the Iceberg-defined functions, then this is necessary. It may be better to have a set of supported functions that uses just the canonicalName.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It may be better to have a set of supported functions that uses just the canonicalName.

There are some canonicalName that are not constants, for example:

    @Override
    public String canonicalName() {
      return String.format("iceberg.bucket(%s)", sqlType.catalogString());
    }

@rdblue rdblue Jul 14, 2023

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While that's not a constant, there are a limited number of values that sqlType.catalogString() might return. I think it would be better to enumerate them, rather than just looking for iceberg and mapping the name.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have tried to do that, however TruncateDecimal needs precision and scale which are runtime values.

    @Override
    public String canonicalName() {
      return String.format("iceberg.truncate(decimal(%d,%d))", precision, scale);
    }

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can similarly enumerate them, but this isn't a big deal.

&& SUPPORTED_FUNCTIONS.contains(udf.name())
&& Arrays.stream(udf.children()).allMatch(child -> isLiteral(child) || isRef(child));
}

return false;
}

private static boolean isLiteral(org.apache.spark.sql.connector.expressions.Expression expr) {
return expr instanceof Literal;
}
Expand Down Expand Up @@ -360,10 +425,57 @@ private static boolean hasNoInFilter(Predicate predicate) {
}

private static boolean isSupportedInPredicate(Predicate predicate) {
if (!isRef(childAtIndex(predicate, 0))) {
if (!canConvertToTerm(childAtIndex(predicate, 0))) {
return false;
} else {
return Arrays.stream(predicate.children()).skip(1).allMatch(SparkV2Filters::isLiteral);
}
}

/** Should be called after {@link #canConvertToTerm} passed */
private static <T> UnboundTerm<Object> toTerm(T input) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be really nice if Spark passed the BoundFunction in through the expression!

if (input instanceof NamedReference) {
return Expressions.ref(SparkUtil.toColumnName((NamedReference) input));
} else if (input instanceof UserDefinedScalarFunc) {
return udfToTerm((UserDefinedScalarFunc) input);
} else {
return null;
}
}

@SuppressWarnings("checkstyle:CyclomaticComplexity")
private static UnboundTerm<Object> udfToTerm(UserDefinedScalarFunc udf) {
org.apache.spark.sql.connector.expressions.Expression[] children = udf.children();
String udfName = udf.name().toLowerCase(Locale.ROOT);
if (children.length == 1) {
org.apache.spark.sql.connector.expressions.Expression child = children[0];
if (isRef(child)) {
Comment thread
rdblue marked this conversation as resolved.
String column = SparkUtil.toColumnName((NamedReference) child);
switch (udfName) {
case "years":
return year(column);
case "months":
return month(column);
case "days":
return day(column);
case "hours":
return hour(column);
}
}
} else if (children.length == 2) {
if (isLiteral(children[0]) && isRef(children[1])) {
String column = SparkUtil.toColumnName((NamedReference) children[1]);
switch (udfName) {
case "bucket":
int numBuckets = (Integer) convertLiteral((Literal<?>) children[0]);
return bucket(column, numBuckets);
case "truncate":
int width = (Integer) convertLiteral((Literal<?>) children[0]);
return truncate(column, width);
}
}
}

return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,20 +45,20 @@
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.spark.Spark3Util;
import org.apache.iceberg.spark.SparkFilters;
import org.apache.iceberg.spark.SparkReadConf;
import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.spark.SparkV2Filters;
import org.apache.iceberg.util.SnapshotUtil;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.connector.expressions.NamedReference;
import org.apache.spark.sql.connector.expressions.filter.Predicate;
import org.apache.spark.sql.connector.read.Statistics;
import org.apache.spark.sql.connector.read.SupportsRuntimeFiltering;
import org.apache.spark.sql.sources.Filter;
import org.apache.spark.sql.connector.read.SupportsRuntimeV2Filtering;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class SparkBatchQueryScan extends SparkPartitioningAwareScan<PartitionScanTask>
implements SupportsRuntimeFiltering {
implements SupportsRuntimeV2Filtering {

private static final Logger LOG = LoggerFactory.getLogger(SparkBatchQueryScan.class);

Expand Down Expand Up @@ -119,8 +119,8 @@ public NamedReference[] filterAttributes() {
}

@Override
public void filter(Filter[] filters) {
Expression runtimeFilterExpr = convertRuntimeFilters(filters);
public void filter(Predicate[] predicates) {
Expression runtimeFilterExpr = convertRuntimeFilters(predicates);

if (runtimeFilterExpr != Expressions.alwaysTrue()) {
Map<Integer, Evaluator> evaluatorsBySpecId = Maps.newHashMap();
Expand Down Expand Up @@ -160,11 +160,11 @@ public void filter(Filter[] filters) {

// at this moment, Spark can only pass IN filters for a single attribute
// if there are multiple filter attributes, Spark will pass two separate IN filters
private Expression convertRuntimeFilters(Filter[] filters) {
private Expression convertRuntimeFilters(Predicate[] predicates) {
Expression runtimeFilterExpr = Expressions.alwaysTrue();

for (Filter filter : filters) {
Expression expr = SparkFilters.convert(filter);
for (Predicate predicate : predicates) {
Expression expr = SparkV2Filters.convert(predicate);
if (expr != null) {
try {
Binder.bind(expectedSchema().asStruct(), expr, caseSensitive());
Expand All @@ -173,7 +173,7 @@ private Expression convertRuntimeFilters(Filter[] filters) {
LOG.warn("Failed to bind {} to expected schema, skipping runtime filter", expr, e);
}
} else {
LOG.warn("Unsupported runtime filter {}", filter);
LOG.warn("Unsupported runtime filter {}", predicate);
}
}

Expand Down
Loading