Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,10 @@ private FlinkConfigOptions() {
.intType()
.defaultValue(100)
.withDescription("Sets max infer parallelism for source operator.");

/**
 * Boolean option ({@code table.exec.iceberg.expose-split-locality-info}) that controls whether
 * split host information is exposed to Flink.
 *
 * <p>Declared without a default value; readers of this option should therefore be prepared
 * for an absent (unset) value rather than assuming {@code true} or {@code false}.
 */
public static final ConfigOption<Boolean> TABLE_EXEC_ICEBERG_EXPOSE_SPLIT_LOCALITY_INFO =
ConfigOptions.key("table.exec.iceberg.expose-split-locality-info")
.booleanType()
.noDefaultValue()
Comment thread
hililiwei marked this conversation as resolved.
.withDescription("Expose split host information to use Flink's locality aware split assigner.");
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.io.IOException;
import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.io.LocatableInputSplitAssigner;
import org.apache.flink.api.common.io.RichInputFormat;
import org.apache.flink.api.common.io.statistics.BaseStatistics;
import org.apache.flink.configuration.Configuration;
Expand Down Expand Up @@ -83,7 +84,9 @@ public FlinkInputSplit[] createInputSplits(int minNumSplits) throws IOException

@Override
public InputSplitAssigner getInputSplitAssigner(FlinkInputSplit[] inputSplits) {
return new DefaultInputSplitAssigner(inputSplits);
return context.exposeLocality() ?
new LocatableInputSplitAssigner(inputSplits) :
new DefaultInputSplitAssigner(inputSplits);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,38 +19,31 @@

package org.apache.iceberg.flink.source;

import org.apache.flink.core.io.InputSplit;
import java.util.Arrays;
import javax.annotation.Nullable;
import org.apache.flink.core.io.LocatableInputSplit;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;

/**
* TODO Implement {@link LocatableInputSplit}.
*/
public class FlinkInputSplit implements InputSplit {
public class FlinkInputSplit extends LocatableInputSplit {

private final int splitNumber;
private final CombinedScanTask task;

FlinkInputSplit(int splitNumber, CombinedScanTask task) {
this.splitNumber = splitNumber;
FlinkInputSplit(int splitNumber, CombinedScanTask task, @Nullable String[] hostnames) {
super(splitNumber, hostnames);
this.task = task;
}

@Override
public int getSplitNumber() {
return splitNumber;
}

CombinedScanTask getTask() {
return task;
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("splitNumber", splitNumber)
.add("splitNumber", getSplitNumber())
.add("task", task)
.add("hosts", Arrays.toString(getHostnames()))
.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.io.UncheckedIOException;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
Expand All @@ -31,6 +32,7 @@
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.data.RowData;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableScan;
Expand All @@ -40,10 +42,16 @@
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
import org.apache.iceberg.hadoop.HadoopFileIO;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FlinkSource {
private static final Logger LOG = LoggerFactory.getLogger(FlinkSource.class);

private FlinkSource() {
}

Expand All @@ -70,12 +78,15 @@ public static Builder forRowData() {
* Source builder to build {@link DataStream}.
*/
public static class Builder {
private static final Set<String> FILE_SYSTEM_SUPPORT_LOCALITY = ImmutableSet.of("hdfs");

private StreamExecutionEnvironment env;
private Table table;
private TableLoader tableLoader;
private TableSchema projectedSchema;
private ReadableConfig readableConfig = new Configuration();
private final ScanContext.Builder contextBuilder = ScanContext.builder();
private Boolean exposeLocality;

public Builder tableLoader(TableLoader newLoader) {
this.tableLoader = newLoader;
Expand Down Expand Up @@ -157,6 +168,11 @@ public Builder streaming(boolean streaming) {
return this;
}

/**
 * Sets an explicit, per-source locality preference on this builder.
 *
 * <p>When set, {@code localityEnabled()} uses this value instead of reading
 * {@link FlinkConfigOptions#TABLE_EXEC_ICEBERG_EXPOSE_SPLIT_LOCALITY_INFO} from the Flink config.
 * Passing {@code true} does not force locality on: {@code localityEnabled()} still checks the
 * table's file system before exposing it.
 *
 * @param newExposeLocality whether split locality information should be exposed for this source
 * @return this builder, for call chaining
 */
public Builder exposeLocality(boolean newExposeLocality) {
this.exposeLocality = newExposeLocality;
return this;
}

public Builder nameMapping(String nameMapping) {
contextBuilder.nameMapping(nameMapping);
return this;
Expand Down Expand Up @@ -195,6 +211,7 @@ public FlinkInputFormat buildFormat() {
} else {
contextBuilder.project(FlinkSchemaUtil.convert(icebergSchema, projectedSchema));
}
contextBuilder.exposeLocality(localityEnabled());

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it should be possible to override exposeLocality in this builder so that you can set it differently for different sources. Keeping a boolean in this builder and passing that as an override for the environment property in localityEnabled() should work.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please review whether this meets expectations, thanks.

@rdblue rdblue Jan 18, 2022

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where is the exposeLocality variable used? If an explicit value is passed to this builder, it should be passed into localityEnabled() so that method can use the setting, but only if the underlying file system is hdfs.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

boolean localityEnabled() {
Boolean localityConfig =
this.exposeLocality != null ? this.exposeLocality :
readableConfig.get(FlinkConfigOptions.TABLE_EXEC_ICEBERG_EXPOSE_SPLIT_LOCALITY_INFO);

It is used here to decide whether to fall back to the Flink config.


return new FlinkInputFormat(tableLoader, icebergSchema, io, encryption, contextBuilder.build());
}
Expand Down Expand Up @@ -225,7 +242,8 @@ int inferParallelism(FlinkInputFormat format, ScanContext context) {
if (readableConfig.get(FlinkConfigOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM)) {
int maxInferParallelism = readableConfig.get(FlinkConfigOptions
.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM_MAX);
Preconditions.checkState(maxInferParallelism >= 1,
Preconditions.checkState(
maxInferParallelism >= 1,
Comment thread
hililiwei marked this conversation as resolved.
FlinkConfigOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM_MAX.key() + " cannot be less than 1");
int splitNum;
try {
Expand All @@ -247,6 +265,29 @@ int inferParallelism(FlinkInputFormat format, ScanContext context) {
parallelism = Math.max(1, parallelism);
return parallelism;
}

/**
 * Decides whether split locality (host) information should be exposed for this source.
 *
 * <p>The explicit builder setting ({@code exposeLocality}) takes precedence; if it was never
 * set, the value of {@link FlinkConfigOptions#TABLE_EXEC_ICEBERG_EXPOSE_SPLIT_LOCALITY_INFO}
 * from {@code readableConfig} is used. An explicit {@code false} disables locality outright.
 * Otherwise (value is {@code true} or unset) locality is enabled only when the table's
 * {@link FileIO} is a {@link HadoopFileIO} and the scheme of the file system that holds the
 * table location is listed in {@code FILE_SYSTEM_SUPPORT_LOCALITY}.
 *
 * @return {@code true} if locality information should be exposed, {@code false} otherwise,
 *     including when the file system cannot be resolved
 */
private boolean localityEnabled() {
// Builder-level override wins; fall back to the Flink config option when it is not set.
Boolean localityEnabled =
this.exposeLocality != null ? this.exposeLocality :
readableConfig.get(FlinkConfigOptions.TABLE_EXEC_ICEBERG_EXPOSE_SPLIT_LOCALITY_INFO);

// Only an explicit false short-circuits; a null (unset) value falls through to the
// file system check below.
if (localityEnabled != null && !localityEnabled) {
return false;
}

FileIO fileIO = table.io();
Comment thread
rdblue marked this conversation as resolved.
// A Hadoop configuration is needed to resolve the file system, so other FileIO
// implementations are treated as not supporting locality.
if (fileIO instanceof HadoopFileIO) {
HadoopFileIO hadoopFileIO = (HadoopFileIO) fileIO;
try {
String scheme = new Path(table.location()).getFileSystem(hadoopFileIO.getConf()).getScheme();
return FILE_SYSTEM_SUPPORT_LOCALITY.contains(scheme);
} catch (IOException e) {
// Best effort: a failure to resolve the file system is logged and treated as
// "locality not available" instead of failing the source build.
LOG.warn("Failed to determine whether the locality information can be exposed for table: {}", table, e);
}
}

return false;
Comment thread
hililiwei marked this conversation as resolved.
}
}

public static boolean isBounded(Map<String, String> properties) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,11 @@
import org.apache.iceberg.TableScan;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
import org.apache.iceberg.hadoop.Util;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.util.Tasks;
import org.apache.iceberg.util.ThreadPools;

@Internal
public class FlinkSplitPlanner {
Expand All @@ -41,9 +44,19 @@ static FlinkInputSplit[] planInputSplits(Table table, ScanContext context) {
try (CloseableIterable<CombinedScanTask> tasksIterable = planTasks(table, context)) {
List<CombinedScanTask> tasks = Lists.newArrayList(tasksIterable);
FlinkInputSplit[] splits = new FlinkInputSplit[tasks.size()];
for (int i = 0; i < tasks.size(); i++) {
splits[i] = new FlinkInputSplit(i, tasks.get(i));
}
boolean exposeLocality = context.exposeLocality();

Tasks.range(tasks.size())
.stopOnFailure()
.executeWith(exposeLocality ? ThreadPools.getWorkerPool() : null)
.run(index -> {
CombinedScanTask task = tasks.get(index);
String[] hostnames = null;
if (exposeLocality) {
hostnames = Util.blockLocations(table.io(), task);
}
splits[index] = new FlinkInputSplit(index, task, hostnames);
});
return splits;
} catch (IOException e) {
throw new UncheckedIOException("Failed to process tasks iterable", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ class ScanContext implements Serializable {
ConfigOptions.key("include-column-stats").booleanType().defaultValue(false);

private final boolean caseSensitive;
private final boolean exposeLocality;
private final Long snapshotId;
private final Long startSnapshotId;
private final Long endSnapshotId;
Expand All @@ -90,8 +91,8 @@ class ScanContext implements Serializable {

private ScanContext(boolean caseSensitive, Long snapshotId, Long startSnapshotId, Long endSnapshotId,
Long asOfTimestamp, Long splitSize, Integer splitLookback, Long splitOpenFileCost,
boolean isStreaming, Duration monitorInterval, String nameMapping,
Schema schema, List<Expression> filters, long limit, boolean includeColumnStats) {
boolean isStreaming, Duration monitorInterval, String nameMapping, Schema schema,
List<Expression> filters, long limit, boolean includeColumnStats, boolean exposeLocality) {
this.caseSensitive = caseSensitive;
this.snapshotId = snapshotId;
this.startSnapshotId = startSnapshotId;
Expand All @@ -108,6 +109,7 @@ private ScanContext(boolean caseSensitive, Long snapshotId, Long startSnapshotId
this.filters = filters;
this.limit = limit;
this.includeColumnStats = includeColumnStats;
this.exposeLocality = exposeLocality;
}

boolean caseSensitive() {
Expand Down Expand Up @@ -170,6 +172,10 @@ boolean includeColumnStats() {
return includeColumnStats;
}

/**
 * Returns the locality flag this context was built with.
 *
 * @return the value of the {@code exposeLocality} field
 */
boolean exposeLocality() {
return exposeLocality;
}

ScanContext copyWithAppendsBetween(long newStartSnapshotId, long newEndSnapshotId) {
return ScanContext.builder()
.caseSensitive(caseSensitive)
Expand All @@ -186,6 +192,7 @@ ScanContext copyWithAppendsBetween(long newStartSnapshotId, long newEndSnapshotI
.project(schema)
.filters(filters)
.limit(limit)
.exposeLocality(exposeLocality)
.includeColumnStats(includeColumnStats)
.build();
}
Expand All @@ -207,6 +214,7 @@ ScanContext copyWithSnapshotId(long newSnapshotId) {
.filters(filters)
.limit(limit)
.includeColumnStats(includeColumnStats)
.exposeLocality(exposeLocality)
.build();
}

Expand All @@ -230,6 +238,7 @@ static class Builder {
private List<Expression> filters;
private long limit = -1L;
private boolean includeColumnStats = INCLUDE_COLUMN_STATS.defaultValue();
private boolean exposeLocality;

private Builder() {
}
Expand Down Expand Up @@ -309,6 +318,11 @@ Builder includeColumnStats(boolean newIncludeColumnStats) {
return this;
}

/**
 * Sets the locality flag that {@code build()} passes to the resulting {@link ScanContext}.
 *
 * @param newExposeLocality the locality flag to store
 * @return this builder, for call chaining
 */
Builder exposeLocality(boolean newExposeLocality) {
this.exposeLocality = newExposeLocality;
return this;
}

Builder fromProperties(Map<String, String> properties) {
Configuration config = new Configuration();
properties.forEach(config::setString);
Expand All @@ -331,7 +345,7 @@ public ScanContext build() {
return new ScanContext(caseSensitive, snapshotId, startSnapshotId,
endSnapshotId, asOfTimestamp, splitSize, splitLookback,
splitOpenFileCost, isStreaming, monitorInterval, nameMapping, projectedSchema,
filters, limit, includeColumnStats);
filters, limit, includeColumnStats, exposeLocality);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.iceberg.Table;
import org.apache.iceberg.TestHelpers;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.common.DynMethods;
import org.apache.iceberg.data.GenericAppenderHelper;
import org.apache.iceberg.data.RandomGenericData;
import org.apache.iceberg.data.Record;
Expand All @@ -42,6 +43,7 @@
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.TestFixtures;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.junit.Assert;
import org.junit.Test;

Expand Down Expand Up @@ -178,6 +180,39 @@ public void testInferedParallelism() throws IOException {
Assert.assertEquals("Should produce the expected parallelism.", 1, parallelism);
}

@Test
public void testExposeLocality() throws Exception {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This only verifies the data read; it doesn't really verify locality-aware assignment. Ideally, we would need 2 files stored on 2 hosts in HDFS and a cluster of TMs running on those two hosts. Then we could verify that the files/splits assigned to each TM are from the same host. But I am not sure if this can be done in a unit test setup.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, +1, ideally that is what we would do, but I haven't found a way to achieve it. So here I only test whether it works properly when table.exec.iceberg.expose-split-locality-info is set to false.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hililiwei How does Flink code base test this feature?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems to be tested by manually specifying the hostnames. For reference, see: https://github.com/apache/flink/blob/master/flink-core/src/test/java/org/apache/flink/core/io/LocatableSplitAssignerTest.java
Alternatively, we could try to test it by introducing miniDFS, but the project doesn't seem willing to introduce it.

@stevenzwu stevenzwu Jan 14, 2022

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is a unit test of the assigner, not an e2e test of the whole thing. Except for the lack of an e2e test, the PR overall looks good to me. Have you tested this manually in a Hadoop cluster setup?

@hililiwei hililiwei Jan 17, 2022

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, during the test phase, I printed some logs to see if it was working properly, such as this one:
code:
2022-1-17-2
Logs:
2022-1-17

Table table =
catalog.createTable(TableIdentifier.of("default", "t"), TestFixtures.SCHEMA, TestFixtures.SPEC);

TableLoader tableLoader = TableLoader.fromHadoopTable(table.location());
List<Record> expectedRecords = RandomGenericData.generate(TestFixtures.SCHEMA, 10, 0L);
expectedRecords.forEach(expectedRecord -> expectedRecord.set(2, "2020-03-20"));

GenericAppenderHelper helper = new GenericAppenderHelper(table, fileFormat, TEMPORARY_FOLDER);
DataFile dataFile = helper.writeFile(TestHelpers.Row.of("2020-03-20", 0), expectedRecords);
helper.appendToTable(dataFile);

// test sql api
Configuration tableConf = getTableEnv().getConfig().getConfiguration();
tableConf.setBoolean(FlinkConfigOptions.TABLE_EXEC_ICEBERG_EXPOSE_SPLIT_LOCALITY_INFO.key(), false);

List<Row> results = sql("select * from t");
org.apache.iceberg.flink.TestHelpers.assertRecords(results, expectedRecords, TestFixtures.SCHEMA);

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This conflicts with org.apache.iceberg.TestHelpers, so the fully qualified name is used here.


// test table api
tableConf.setBoolean(FlinkConfigOptions.TABLE_EXEC_ICEBERG_EXPOSE_SPLIT_LOCALITY_INFO.key(), true);
FlinkSource.Builder builder = FlinkSource.forRowData().tableLoader(tableLoader).table(table);

Boolean localityEnabled =
DynMethods.builder("localityEnabled").hiddenImpl(builder.getClass()).build().invoke(builder);
// When running in CI or locally, `localityEnabled` will be false even if this configuration is enabled
Assert.assertFalse("Expose split locality info should be false.", localityEnabled);

results = run(builder, Maps.newHashMap(), "where dt='2020-03-20'", "*");
org.apache.iceberg.flink.TestHelpers.assertRecords(results, expectedRecords, TestFixtures.SCHEMA);
}

private List<Row> sql(String query, Object... args) {
TableResult tableResult = getTableEnv().executeSql(String.format(query, args));
try (CloseableIterator<Row> iter = tableResult.collect()) {
Expand Down