-
Notifications
You must be signed in to change notification settings - Fork 3.4k
Flink: FlinkInputSplit extends LocatableInputSplit instead of InputSplit #3817
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||
|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -23,6 +23,7 @@ | |||||||||
| import java.io.UncheckedIOException; | ||||||||||
| import java.util.List; | ||||||||||
| import java.util.Map; | ||||||||||
| import java.util.Set; | ||||||||||
| import org.apache.flink.api.common.typeinfo.TypeInformation; | ||||||||||
| import org.apache.flink.configuration.Configuration; | ||||||||||
| import org.apache.flink.configuration.ReadableConfig; | ||||||||||
|
|
@@ -31,6 +32,7 @@ | |||||||||
| import org.apache.flink.table.api.TableSchema; | ||||||||||
| import org.apache.flink.table.api.config.ExecutionConfigOptions; | ||||||||||
| import org.apache.flink.table.data.RowData; | ||||||||||
| import org.apache.hadoop.fs.Path; | ||||||||||
| import org.apache.iceberg.Schema; | ||||||||||
| import org.apache.iceberg.Table; | ||||||||||
| import org.apache.iceberg.TableScan; | ||||||||||
|
|
@@ -40,10 +42,16 @@ | |||||||||
| import org.apache.iceberg.flink.FlinkSchemaUtil; | ||||||||||
| import org.apache.iceberg.flink.TableLoader; | ||||||||||
| import org.apache.iceberg.flink.util.FlinkCompatibilityUtil; | ||||||||||
| import org.apache.iceberg.hadoop.HadoopFileIO; | ||||||||||
| import org.apache.iceberg.io.FileIO; | ||||||||||
| import org.apache.iceberg.relocated.com.google.common.base.Preconditions; | ||||||||||
| import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; | ||||||||||
| import org.slf4j.Logger; | ||||||||||
| import org.slf4j.LoggerFactory; | ||||||||||
|
|
||||||||||
| public class FlinkSource { | ||||||||||
| private static final Logger LOG = LoggerFactory.getLogger(FlinkSource.class); | ||||||||||
|
|
||||||||||
| private FlinkSource() { | ||||||||||
| } | ||||||||||
|
|
||||||||||
|
|
@@ -70,12 +78,15 @@ public static Builder forRowData() { | |||||||||
| * Source builder to build {@link DataStream}. | ||||||||||
| */ | ||||||||||
| public static class Builder { | ||||||||||
| private static final Set<String> FILE_SYSTEM_SUPPORT_LOCALITY = ImmutableSet.of("hdfs"); | ||||||||||
|
|
||||||||||
| private StreamExecutionEnvironment env; | ||||||||||
| private Table table; | ||||||||||
| private TableLoader tableLoader; | ||||||||||
| private TableSchema projectedSchema; | ||||||||||
| private ReadableConfig readableConfig = new Configuration(); | ||||||||||
| private final ScanContext.Builder contextBuilder = ScanContext.builder(); | ||||||||||
| private Boolean exposeLocality; | ||||||||||
|
|
||||||||||
| public Builder tableLoader(TableLoader newLoader) { | ||||||||||
| this.tableLoader = newLoader; | ||||||||||
|
|
@@ -157,6 +168,11 @@ public Builder streaming(boolean streaming) { | |||||||||
| return this; | ||||||||||
| } | ||||||||||
|
|
||||||||||
| public Builder exposeLocality(boolean newExposeLocality) { | ||||||||||
| this.exposeLocality = newExposeLocality; | ||||||||||
| return this; | ||||||||||
| } | ||||||||||
|
|
||||||||||
| public Builder nameMapping(String nameMapping) { | ||||||||||
| contextBuilder.nameMapping(nameMapping); | ||||||||||
| return this; | ||||||||||
|
|
@@ -195,6 +211,7 @@ public FlinkInputFormat buildFormat() { | |||||||||
| } else { | ||||||||||
| contextBuilder.project(FlinkSchemaUtil.convert(icebergSchema, projectedSchema)); | ||||||||||
| } | ||||||||||
| contextBuilder.exposeLocality(localityEnabled()); | ||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think it should be possible to override
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please review whether this meets expectations, thanks.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Where is the
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. iceberg/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java Lines 269 to 272 in 52808f5
It is used here to determine whether to use flink config. |
||||||||||
|
|
||||||||||
| return new FlinkInputFormat(tableLoader, icebergSchema, io, encryption, contextBuilder.build()); | ||||||||||
| } | ||||||||||
|
|
@@ -225,7 +242,8 @@ int inferParallelism(FlinkInputFormat format, ScanContext context) { | |||||||||
| if (readableConfig.get(FlinkConfigOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM)) { | ||||||||||
| int maxInferParallelism = readableConfig.get(FlinkConfigOptions | ||||||||||
| .TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM_MAX); | ||||||||||
| Preconditions.checkState(maxInferParallelism >= 1, | ||||||||||
| Preconditions.checkState( | ||||||||||
| maxInferParallelism >= 1, | ||||||||||
|
hililiwei marked this conversation as resolved.
|
||||||||||
| FlinkConfigOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM_MAX.key() + " cannot be less than 1"); | ||||||||||
| int splitNum; | ||||||||||
| try { | ||||||||||
|
|
@@ -247,6 +265,29 @@ int inferParallelism(FlinkInputFormat format, ScanContext context) { | |||||||||
| parallelism = Math.max(1, parallelism); | ||||||||||
| return parallelism; | ||||||||||
| } | ||||||||||
|
|
||||||||||
| private boolean localityEnabled() { | ||||||||||
| Boolean localityEnabled = | ||||||||||
| this.exposeLocality != null ? this.exposeLocality : | ||||||||||
| readableConfig.get(FlinkConfigOptions.TABLE_EXEC_ICEBERG_EXPOSE_SPLIT_LOCALITY_INFO); | ||||||||||
|
|
||||||||||
| if (localityEnabled != null && !localityEnabled) { | ||||||||||
| return false; | ||||||||||
| } | ||||||||||
|
|
||||||||||
| FileIO fileIO = table.io(); | ||||||||||
|
rdblue marked this conversation as resolved.
|
||||||||||
| if (fileIO instanceof HadoopFileIO) { | ||||||||||
| HadoopFileIO hadoopFileIO = (HadoopFileIO) fileIO; | ||||||||||
| try { | ||||||||||
| String scheme = new Path(table.location()).getFileSystem(hadoopFileIO.getConf()).getScheme(); | ||||||||||
| return FILE_SYSTEM_SUPPORT_LOCALITY.contains(scheme); | ||||||||||
| } catch (IOException e) { | ||||||||||
| LOG.warn("Failed to determine whether the locality information can be exposed for table: {}", table, e); | ||||||||||
| } | ||||||||||
| } | ||||||||||
|
|
||||||||||
| return false; | ||||||||||
|
hililiwei marked this conversation as resolved.
|
||||||||||
| } | ||||||||||
| } | ||||||||||
|
|
||||||||||
| public static boolean isBounded(Map<String, String> properties) { | ||||||||||
|
|
||||||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -33,6 +33,7 @@ | |
| import org.apache.iceberg.Table; | ||
| import org.apache.iceberg.TestHelpers; | ||
| import org.apache.iceberg.catalog.TableIdentifier; | ||
| import org.apache.iceberg.common.DynMethods; | ||
| import org.apache.iceberg.data.GenericAppenderHelper; | ||
| import org.apache.iceberg.data.RandomGenericData; | ||
| import org.apache.iceberg.data.Record; | ||
|
|
@@ -42,6 +43,7 @@ | |
| import org.apache.iceberg.flink.TableLoader; | ||
| import org.apache.iceberg.flink.TestFixtures; | ||
| import org.apache.iceberg.relocated.com.google.common.collect.Lists; | ||
| import org.apache.iceberg.relocated.com.google.common.collect.Maps; | ||
| import org.junit.Assert; | ||
| import org.junit.Test; | ||
|
|
||
|
|
@@ -178,6 +180,39 @@ public void testInferedParallelism() throws IOException { | |
| Assert.assertEquals("Should produce the expected parallelism.", 1, parallelism); | ||
| } | ||
|
|
||
| @Test | ||
| public void testExposeLocality() throws Exception { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It only verifies that data can be read; this doesn't really verify locality-aware assignment. Ideally, we need 2 files stored on 2 hosts with HDFS and a cluster of TMs running on those two hosts. Then we can verify that the assigned files/splits are from the same host. But I am not sure if this can be done in a unit test setup.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, +1, ideally it would be, but I haven't found a way to achieve it. So here we only test whether it works properly when
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @hililiwei How does Flink code base test this feature?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It seems to be tested by manually specifying the hostname. For more details, refer to: https://github.com/apache/flink/blob/master/flink-core/src/test/java/org/apache/flink/core/io/LocatableSplitAssignerTest.java
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is a unit test for the assigner, not an e2e test of the whole thing. Except for this lack of an e2e test, the PR overall looks good to me. Have you tested this manually in a Hadoop cluster setup?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
| Table table = | ||
| catalog.createTable(TableIdentifier.of("default", "t"), TestFixtures.SCHEMA, TestFixtures.SPEC); | ||
|
|
||
| TableLoader tableLoader = TableLoader.fromHadoopTable(table.location()); | ||
| List<Record> expectedRecords = RandomGenericData.generate(TestFixtures.SCHEMA, 10, 0L); | ||
| expectedRecords.forEach(expectedRecord -> expectedRecord.set(2, "2020-03-20")); | ||
|
|
||
| GenericAppenderHelper helper = new GenericAppenderHelper(table, fileFormat, TEMPORARY_FOLDER); | ||
| DataFile dataFile = helper.writeFile(TestHelpers.Row.of("2020-03-20", 0), expectedRecords); | ||
| helper.appendToTable(dataFile); | ||
|
|
||
| // test sql api | ||
| Configuration tableConf = getTableEnv().getConfig().getConfiguration(); | ||
| tableConf.setBoolean(FlinkConfigOptions.TABLE_EXEC_ICEBERG_EXPOSE_SPLIT_LOCALITY_INFO.key(), false); | ||
|
|
||
| List<Row> results = sql("select * from t"); | ||
| org.apache.iceberg.flink.TestHelpers.assertRecords(results, expectedRecords, TestFixtures.SCHEMA); | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Conflict with org.apache.iceberg.TestHelpers. |
||
|
|
||
| // test table api | ||
| tableConf.setBoolean(FlinkConfigOptions.TABLE_EXEC_ICEBERG_EXPOSE_SPLIT_LOCALITY_INFO.key(), true); | ||
| FlinkSource.Builder builder = FlinkSource.forRowData().tableLoader(tableLoader).table(table); | ||
|
|
||
| Boolean localityEnabled = | ||
| DynMethods.builder("localityEnabled").hiddenImpl(builder.getClass()).build().invoke(builder); | ||
| // When running with CI or local, `localityEnabled` will be false even if this configuration is enabled | ||
| Assert.assertFalse("Expose split locality info should be false.", localityEnabled); | ||
|
|
||
| results = run(builder, Maps.newHashMap(), "where dt='2020-03-20'", "*"); | ||
| org.apache.iceberg.flink.TestHelpers.assertRecords(results, expectedRecords, TestFixtures.SCHEMA); | ||
| } | ||
|
|
||
| private List<Row> sql(String query, Object... args) { | ||
| TableResult tableResult = getTableEnv().executeSql(String.format(query, args)); | ||
| try (CloseableIterator<Row> iter = tableResult.collect()) { | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.