diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/FlinkConfigOptions.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/FlinkConfigOptions.java index 067abe8a6e41..49cef328e9ca 100644 --- a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/FlinkConfigOptions.java +++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/FlinkConfigOptions.java @@ -40,4 +40,10 @@ private FlinkConfigOptions() { .intType() .defaultValue(100) .withDescription("Sets max infer parallelism for source operator."); + + public static final ConfigOption TABLE_EXEC_ICEBERG_EXPOSE_SPLIT_LOCALITY_INFO = + ConfigOptions.key("table.exec.iceberg.expose-split-locality-info") + .booleanType() + .noDefaultValue() + .withDescription("Expose split host information to use Flink's locality aware split assigner."); } diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputFormat.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputFormat.java index a4cbab5c37e4..1027b5bf8d1c 100644 --- a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputFormat.java +++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputFormat.java @@ -22,6 +22,7 @@ import java.io.IOException; import org.apache.flink.api.common.io.DefaultInputSplitAssigner; import org.apache.flink.api.common.io.InputFormat; +import org.apache.flink.api.common.io.LocatableInputSplitAssigner; import org.apache.flink.api.common.io.RichInputFormat; import org.apache.flink.api.common.io.statistics.BaseStatistics; import org.apache.flink.configuration.Configuration; @@ -83,7 +84,9 @@ public FlinkInputSplit[] createInputSplits(int minNumSplits) throws IOException @Override public InputSplitAssigner getInputSplitAssigner(FlinkInputSplit[] inputSplits) { - return new DefaultInputSplitAssigner(inputSplits); + return context.exposeLocality() ? + new LocatableInputSplitAssigner(inputSplits) : + new DefaultInputSplitAssigner(inputSplits); } @Override diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputSplit.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputSplit.java index b59574f585ec..5bb85fe7162a 100644 --- a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputSplit.java +++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputSplit.java @@ -19,29 +19,21 @@ package org.apache.iceberg.flink.source; -import org.apache.flink.core.io.InputSplit; +import java.util.Arrays; +import javax.annotation.Nullable; import org.apache.flink.core.io.LocatableInputSplit; import org.apache.iceberg.CombinedScanTask; import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; -/** - * TODO Implement {@link LocatableInputSplit}. - */ -public class FlinkInputSplit implements InputSplit { +public class FlinkInputSplit extends LocatableInputSplit { - private final int splitNumber; private final CombinedScanTask task; - FlinkInputSplit(int splitNumber, CombinedScanTask task) { - this.splitNumber = splitNumber; + FlinkInputSplit(int splitNumber, CombinedScanTask task, @Nullable String[] hostnames) { + super(splitNumber, hostnames); this.task = task; } - @Override - public int getSplitNumber() { - return splitNumber; - } - CombinedScanTask getTask() { return task; } @@ -49,8 +41,9 @@ CombinedScanTask getTask() { @Override public String toString() { return MoreObjects.toStringHelper(this) - .add("splitNumber", splitNumber) + .add("splitNumber", getSplitNumber()) .add("task", task) + .add("hosts", Arrays.toString(getHostnames())) .toString(); } } diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java index a3263d284c0c..df8009ff553d 100644 --- a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java +++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java @@ -23,6 +23,7 @@ import java.io.UncheckedIOException; import java.util.List; import java.util.Map; +import java.util.Set; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.ReadableConfig; @@ -31,6 +32,7 @@ import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.api.config.ExecutionConfigOptions; import org.apache.flink.table.data.RowData; +import org.apache.hadoop.fs.Path; import org.apache.iceberg.Schema; import org.apache.iceberg.Table; import org.apache.iceberg.TableScan; @@ -40,10 +42,16 @@ import org.apache.iceberg.flink.FlinkSchemaUtil; import org.apache.iceberg.flink.TableLoader; import org.apache.iceberg.flink.util.FlinkCompatibilityUtil; +import org.apache.iceberg.hadoop.HadoopFileIO; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class FlinkSource { + private static final Logger LOG = LoggerFactory.getLogger(FlinkSource.class); + private FlinkSource() { } @@ -70,12 +78,15 @@ public static Builder forRowData() { * Source builder to build {@link DataStream}. */ public static class Builder { + private static final Set FILE_SYSTEM_SUPPORT_LOCALITY = ImmutableSet.of("hdfs"); + private StreamExecutionEnvironment env; private Table table; private TableLoader tableLoader; private TableSchema projectedSchema; private ReadableConfig readableConfig = new Configuration(); private final ScanContext.Builder contextBuilder = ScanContext.builder(); + private Boolean exposeLocality; public Builder tableLoader(TableLoader newLoader) { this.tableLoader = newLoader; @@ -157,6 +168,11 @@ public Builder streaming(boolean streaming) { return this; } + public Builder exposeLocality(boolean newExposeLocality) { + this.exposeLocality = newExposeLocality; + return this; + } + public Builder nameMapping(String nameMapping) { contextBuilder.nameMapping(nameMapping); return this; @@ -195,6 +211,7 @@ public FlinkInputFormat buildFormat() { } else { contextBuilder.project(FlinkSchemaUtil.convert(icebergSchema, projectedSchema)); } + contextBuilder.exposeLocality(localityEnabled()); return new FlinkInputFormat(tableLoader, icebergSchema, io, encryption, contextBuilder.build()); } @@ -225,7 +242,8 @@ int inferParallelism(FlinkInputFormat format, ScanContext context) { if (readableConfig.get(FlinkConfigOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM)) { int maxInferParallelism = readableConfig.get(FlinkConfigOptions .TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM_MAX); - Preconditions.checkState(maxInferParallelism >= 1, + Preconditions.checkState( + maxInferParallelism >= 1, FlinkConfigOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM_MAX.key() + " cannot be less than 1"); int splitNum; try { @@ -247,6 +265,29 @@ int inferParallelism(FlinkInputFormat format, ScanContext context) { parallelism = Math.max(1, parallelism); return parallelism; } + + private boolean localityEnabled() { + Boolean localityEnabled = + this.exposeLocality != null ? this.exposeLocality : + readableConfig.get(FlinkConfigOptions.TABLE_EXEC_ICEBERG_EXPOSE_SPLIT_LOCALITY_INFO); + + if (localityEnabled != null && !localityEnabled) { + return false; + } + + FileIO fileIO = table.io(); + if (fileIO instanceof HadoopFileIO) { + HadoopFileIO hadoopFileIO = (HadoopFileIO) fileIO; + try { + String scheme = new Path(table.location()).getFileSystem(hadoopFileIO.getConf()).getScheme(); + return FILE_SYSTEM_SUPPORT_LOCALITY.contains(scheme); + } catch (IOException e) { + LOG.warn("Failed to determine whether the locality information can be exposed for table: {}", table, e); + } + } + + return false; + } } public static boolean isBounded(Map properties) { diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitPlanner.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitPlanner.java index e0001146299e..213f9c95a2dd 100644 --- a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitPlanner.java +++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitPlanner.java @@ -29,8 +29,11 @@ import org.apache.iceberg.TableScan; import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.flink.source.split.IcebergSourceSplit; +import org.apache.iceberg.hadoop.Util; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.util.Tasks; +import org.apache.iceberg.util.ThreadPools; @Internal public class FlinkSplitPlanner { @@ -41,9 +44,19 @@ static FlinkInputSplit[] planInputSplits(Table table, ScanContext context) { try (CloseableIterable tasksIterable = planTasks(table, context)) { List tasks = Lists.newArrayList(tasksIterable); FlinkInputSplit[] splits = new FlinkInputSplit[tasks.size()]; - for (int i = 0; i < tasks.size(); i++) { - splits[i] = new FlinkInputSplit(i, tasks.get(i)); - } + boolean exposeLocality = context.exposeLocality(); + + Tasks.range(tasks.size()) + .stopOnFailure() + .executeWith(exposeLocality ? ThreadPools.getWorkerPool() : null) + .run(index -> { + CombinedScanTask task = tasks.get(index); + String[] hostnames = null; + if (exposeLocality) { + hostnames = Util.blockLocations(table.io(), task); + } + splits[index] = new FlinkInputSplit(index, task, hostnames); + }); return splits; } catch (IOException e) { throw new UncheckedIOException("Failed to process tasks iterable", e); diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java index d290a6478f90..25b3bd04a34c 100644 --- a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java +++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java @@ -72,6 +72,7 @@ class ScanContext implements Serializable { ConfigOptions.key("include-column-stats").booleanType().defaultValue(false); private final boolean caseSensitive; + private final boolean exposeLocality; private final Long snapshotId; private final Long startSnapshotId; private final Long endSnapshotId; @@ -90,8 +91,8 @@ class ScanContext implements Serializable { private ScanContext(boolean caseSensitive, Long snapshotId, Long startSnapshotId, Long endSnapshotId, Long asOfTimestamp, Long splitSize, Integer splitLookback, Long splitOpenFileCost, - boolean isStreaming, Duration monitorInterval, String nameMapping, - Schema schema, List filters, long limit, boolean includeColumnStats) { + boolean isStreaming, Duration monitorInterval, String nameMapping, Schema schema, + List filters, long limit, boolean includeColumnStats, boolean exposeLocality) { this.caseSensitive = caseSensitive; this.snapshotId = snapshotId; this.startSnapshotId = startSnapshotId; @@ -108,6 +109,7 @@ private ScanContext(boolean caseSensitive, Long snapshotId, Long startSnapshotId this.filters = filters; this.limit = limit; this.includeColumnStats = includeColumnStats; + this.exposeLocality = exposeLocality; } boolean caseSensitive() { @@ -170,6 +172,10 @@ boolean includeColumnStats() { return includeColumnStats; } + boolean exposeLocality() { + return exposeLocality; + } + ScanContext copyWithAppendsBetween(long newStartSnapshotId, long newEndSnapshotId) { return ScanContext.builder() .caseSensitive(caseSensitive) @@ -186,6 +192,7 @@ ScanContext copyWithAppendsBetween(long newStartSnapshotId, long newEndSnapshotI .project(schema) .filters(filters) .limit(limit) + .exposeLocality(exposeLocality) .includeColumnStats(includeColumnStats) .build(); } @@ -207,6 +214,7 @@ ScanContext copyWithSnapshotId(long newSnapshotId) { .filters(filters) .limit(limit) .includeColumnStats(includeColumnStats) + .exposeLocality(exposeLocality) .build(); } @@ -230,6 +238,7 @@ static class Builder { private List filters; private long limit = -1L; private boolean includeColumnStats = INCLUDE_COLUMN_STATS.defaultValue(); + private boolean exposeLocality; private Builder() { } @@ -309,6 +318,11 @@ Builder includeColumnStats(boolean newIncludeColumnStats) { return this; } + Builder exposeLocality(boolean newExposeLocality) { + this.exposeLocality = newExposeLocality; + return this; + } + Builder fromProperties(Map properties) { Configuration config = new Configuration(); properties.forEach(config::setString); @@ -331,7 +345,7 @@ public ScanContext build() { return new ScanContext(caseSensitive, snapshotId, startSnapshotId, endSnapshotId, asOfTimestamp, splitSize, splitLookback, splitOpenFileCost, isStreaming, monitorInterval, nameMapping, projectedSchema, - filters, limit, includeColumnStats); + filters, limit, includeColumnStats, exposeLocality); } } } diff --git a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScanSql.java b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScanSql.java index 9af1b7c65331..b4f463196dc5 100644 --- a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScanSql.java +++ b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScanSql.java @@ -33,6 +33,7 @@ import org.apache.iceberg.Table; import org.apache.iceberg.TestHelpers; import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.common.DynMethods; import org.apache.iceberg.data.GenericAppenderHelper; import org.apache.iceberg.data.RandomGenericData; import org.apache.iceberg.data.Record; @@ -42,6 +43,7 @@ import org.apache.iceberg.flink.TableLoader; import org.apache.iceberg.flink.TestFixtures; import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.junit.Assert; import org.junit.Test; @@ -178,6 +180,39 @@ public void testInferedParallelism() throws IOException { Assert.assertEquals("Should produce the expected parallelism.", 1, parallelism); } + @Test + public void testExposeLocality() throws Exception { + Table table = + catalog.createTable(TableIdentifier.of("default", "t"), TestFixtures.SCHEMA, TestFixtures.SPEC); + + TableLoader tableLoader = TableLoader.fromHadoopTable(table.location()); + List expectedRecords = RandomGenericData.generate(TestFixtures.SCHEMA, 10, 0L); + expectedRecords.forEach(expectedRecord -> expectedRecord.set(2, "2020-03-20")); + + GenericAppenderHelper helper = new GenericAppenderHelper(table, fileFormat, TEMPORARY_FOLDER); + DataFile dataFile = helper.writeFile(TestHelpers.Row.of("2020-03-20", 0), expectedRecords); + helper.appendToTable(dataFile); + + // test sql api + Configuration tableConf = getTableEnv().getConfig().getConfiguration(); + tableConf.setBoolean(FlinkConfigOptions.TABLE_EXEC_ICEBERG_EXPOSE_SPLIT_LOCALITY_INFO.key(), false); + + List results = sql("select * from t"); + org.apache.iceberg.flink.TestHelpers.assertRecords(results, expectedRecords, TestFixtures.SCHEMA); + + // test table api + tableConf.setBoolean(FlinkConfigOptions.TABLE_EXEC_ICEBERG_EXPOSE_SPLIT_LOCALITY_INFO.key(), true); + FlinkSource.Builder builder = FlinkSource.forRowData().tableLoader(tableLoader).table(table); + + Boolean localityEnabled = + DynMethods.builder("localityEnabled").hiddenImpl(builder.getClass()).build().invoke(builder); + // When running with CI or local, `localityEnabled` will be false even if this configuration is enabled + Assert.assertFalse("Expose split locality info should be false.", localityEnabled); + + results = run(builder, Maps.newHashMap(), "where dt='2020-03-20'", "*"); + org.apache.iceberg.flink.TestHelpers.assertRecords(results, expectedRecords, TestFixtures.SCHEMA); + } + private List sql(String query, Object... args) { TableResult tableResult = getTableEnv().executeSql(String.format(query, args)); try (CloseableIterator iter = tableResult.collect()) {