
Flink: Support inspecting table #6222

Merged
stevenzwu merged 6 commits into apache:master from hililiwei:Inspecting
Jan 13, 2023

@hililiwei hililiwei commented Nov 19, 2022

Contributor

What is the purpose of the change

Support inspecting tables in Flink SQL.

select * from {tableName}${metadataTableName}

The '$' syntax is borrowed from Flink-Table-Store, which is also under the Flink umbrella.

List

  • History

To show table history:

SELECT * FROM prod.db.table$history;
  • Metadata Log Entries

To show table metadata log entries:

SELECT * FROM prod.db.table$metadata_log_entries;
  • Snapshots

To show the valid snapshots for a table:

SELECT * FROM prod.db.table$snapshots;
  • Files

To show a table's current data files:

SELECT * FROM prod.db.table$files;
  • Manifests

To show a table's current file manifests:

SELECT * FROM prod.db.table$manifests;
  • Partitions

To show a table's current partitions:

SELECT * FROM prod.db.table$partitions;
  • All Metadata Tables

These tables are unions of the metadata tables specific to the current snapshot, and return metadata across all snapshots.

    • All Data Files

To show all of the table's data files and each file's metadata:

SELECT * FROM prod.db.table$all_data_files;
    • All Manifests

To show all of the table's manifest files:

SELECT * FROM prod.db.table$all_manifests;
  • References

To show a table's known snapshot references:

SELECT * FROM prod.db.table$refs;
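
The naming scheme above can be sketched as a small helper that splits a `table$metadata` name and builds the matching query. This is a minimal illustration only: `MetadataTableNames`, `split`, and `selectAll` are hypothetical names, the list of known tables is copied from the description above, and how FlinkCatalog actually resolves these names is not shown in this conversation.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper for illustration; not part of Iceberg or Flink. */
final class MetadataTableNames {
  // Metadata table names listed in the PR description.
  static final List<String> KNOWN = Arrays.asList(
      "history", "metadata_log_entries", "snapshots", "files", "manifests",
      "partitions", "all_data_files", "all_manifests", "refs");

  private MetadataTableNames() {}

  /** Returns {baseTable, metadataTable}; metadataTable is null for a plain table name. */
  static String[] split(String name) {
    int idx = name.lastIndexOf('$');
    if (idx < 0) {
      return new String[] {name, null};
    }
    String base = name.substring(0, idx);
    String meta = name.substring(idx + 1);
    if (base.isEmpty() || !KNOWN.contains(meta)) {
      throw new IllegalArgumentException("Unknown metadata table: " + name);
    }
    return new String[] {base, meta};
  }

  /** Builds a SELECT statement using the '$' separator. */
  static String selectAll(String qualifiedTable, String metadataTable) {
    if (!KNOWN.contains(metadataTable)) {
      throw new IllegalArgumentException("Unknown metadata table: " + metadataTable);
    }
    return "SELECT * FROM " + qualifiedTable + "$" + metadataTable;
  }

  public static void main(String[] args) {
    System.out.println(selectAll("prod.db.table", "history"));
    System.out.println(Arrays.toString(split("table$snapshots")));
  }
}
```

Rejecting unknown suffixes keeps a typo such as `table$snapshot` from being treated as a regular table name; whether the real catalog behaves this way is an assumption.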

Comment thread flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java Outdated
@hililiwei

Contributor Author

@stevenzwu @openinx @rdblue @Fokko could you please take a look at it when you get a chance? thx.

@Fokko Fokko requested a review from stevenzwu November 23, 2022 09:52
@hililiwei

Contributor Author

also cc @pvary, thx.

Comment thread flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java Outdated
@pvary

pvary commented Dec 2, 2022

Contributor

@hililiwei: Hive/Spark/Impala(?) use a naming convention like SELECT * FROM prod.db.table.history for accessing metadata tables. See: https://iceberg.apache.org/docs/latest/spark-queries/#history

Do we want to introduce a different naming convention for Flink? How widespread is the usage of the SELECT * FROM prod.db.table$history like SQL?

@hililiwei

hililiwei commented Dec 2, 2022

Contributor Author

> Do we want to introduce a different naming convention for Flink? How widespread is the usage of the SELECT * FROM prod.db.table$history like SQL?

@pvary, thank you for your feedback.
I personally like SELECT * FROM prod.db.table.history too, but unfortunately Flink does not support this syntax: it always discards .history. I tried for a long time and couldn't get it to work properly. Eventually, I had to go the same way as flink-table-store.

@pvary

pvary commented Dec 2, 2022

Contributor

> > Do we want to introduce a different naming convention for Flink? How widespread is the usage of the SELECT * FROM prod.db.table$history like SQL?

> @pvary, thank you for your feedback. I personally like SELECT * FROM prod.db.table.history too, but unfortunately, flink does not support this syntax, and it always discards .history. I tried for a long time and couldn't get it to work properly. Eventually, I had to choose the same way as flink-table-store.

Thanks @hililiwei!

Comment thread flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java Outdated
Comment thread flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/data/StructRowData.java Outdated
Comment thread flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/data/StructRowData.java Outdated
Comment thread flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/data/StructRowData.java Outdated
@pvary pvary closed this Dec 3, 2022
@pvary pvary reopened this Dec 3, 2022
@pvary

pvary commented Dec 3, 2022

Contributor

Misclicked 😢

What I was trying to comment:
Sadly I don't have enough time to properly review the PR, but I remember that we had issues with the partition evolution in the past when @szlta tested the Hive metadata tables. IIRC the issues surfaced when we changed from (a, b) partitioning to (a, c).
See the relevant PRs: https://github.com/apache/iceberg/pulls?q=is%3Apr+szlta+partition

The issues are handled at the Core level, but it might be worth adding some checks for edge cases too. Like:

  • Removed columns
  • Re-added columns
  • Partition evolution
  • Schema evolution

@hililiwei hililiwei force-pushed the Inspecting branch 2 times, most recently from 40aeedd to ad1f420 Compare December 5, 2022 15:00
Comment thread docs/flink-getting-started.md Outdated
@stevenzwu

Contributor

@hililiwei we should add comprehensive unit tests for StructRowData.

I have some internal DataGenerators for unit test code with very comprehensive coverage of all field types (including complex nested types). Maybe I will submit a separate PR for that, which will cover Flink RowData and Iceberg GenericRecord. You can expand it with support for Iceberg StructLike. With that, we can write unit tests/assertions that compare the actual and expected values.

// make a defensive copy to ensure entries do not change
List<Map.Entry<?, ?>> entries = ImmutableList.copyOf(map.entrySet());

ArrayData keyArray =

Contributor

OK. I get that we need to run the conversion recursively (e.g. for a StringData key or RowData value). I'd like to propose a different structure for this class.

We can repurpose collectionToArrayData into a convertFromStructToRowData(Type type, Object value) method. For most primitive types, it should be a simple pass-through. For other types (like timestamp, struct, map, array) there are some conversions to Flink types (like TimestampData, RowData, MapData, ArrayData).

Then for the map, we don't have to convert to key and value arrays. I believe the reason you did it is to reuse collectionToArrayData.
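
A rough sketch of that recursive shape, using plain Java stand-ins so it runs without Flink or Iceberg on the classpath. Everything here is an assumption for illustration: `SketchType` and `Kind` stand in for Iceberg's `Type`, `Instant` stands in for `TimestampData`, and `List`/`Map` stand in for `ArrayData`/`RowData`/`MapData`. The timestamp input is assumed to be microseconds since the epoch stored as a `Long`.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Stand-in sketch; not the actual StructRowData code. */
final class StructConversionSketch {
  enum Kind { PRIMITIVE, TIMESTAMP, LIST, MAP, STRUCT }

  /** Hypothetical, simplified type descriptor (not Iceberg's Type). */
  static final class SketchType {
    final Kind kind;
    // LIST: [element]; MAP: [key, value]; STRUCT: one entry per field.
    final List<SketchType> children;

    SketchType(Kind kind, SketchType... children) {
      this.kind = kind;
      this.children = Arrays.asList(children);
    }
  }

  private StructConversionSketch() {}

  /** Converts one value by type, recursing into nested lists, maps, and structs. */
  static Object convert(SketchType type, Object value) {
    if (value == null) {
      return null;
    }
    switch (type.kind) {
      case TIMESTAMP: {
        // Assumed input: microseconds since epoch; Instant stands in for TimestampData.
        long micros = (Long) value;
        return Instant.ofEpochSecond(
            Math.floorDiv(micros, 1_000_000L), Math.floorMod(micros, 1_000_000L) * 1_000L);
      }
      case LIST: {
        List<Object> out = new ArrayList<>();
        for (Object element : (List<?>) value) {
          out.add(convert(type.children.get(0), element));
        }
        return out;
      }
      case MAP: {
        // Keys and values are converted directly, without intermediate key/value arrays.
        Map<Object, Object> out = new LinkedHashMap<>();
        for (Map.Entry<?, ?> entry : ((Map<?, ?>) value).entrySet()) {
          out.put(
              convert(type.children.get(0), entry.getKey()),
              convert(type.children.get(1), entry.getValue()));
        }
        return out;
      }
      case STRUCT: {
        // A struct value is modeled as a list of field values in field order.
        List<?> fields = (List<?>) value;
        List<Object> row = new ArrayList<>();
        for (int i = 0; i < fields.size(); i++) {
          row.add(convert(type.children.get(i), fields.get(i)));
        }
        return row;
      }
      default:
        // Primitive stand-ins pass through unchanged.
        return value;
    }
  }
}
```

With a single entry point that dispatches on type, the map case no longer needs to go through an array conversion just to reuse the recursion.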

Comment thread flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java Outdated
@hililiwei hililiwei force-pushed the Inspecting branch 2 times, most recently from f701467 to 6f40b74 Compare December 6, 2022 07:41
@hililiwei

Contributor Author

> @hililiwei we should add comprehensive unit test for StructRowData.
>
> I have some internal DataGenerators for unit test code with very comprehensive coverage all field types (including complex nested types). Maybe I will submit a separate PR for that, which will cover Flink RowData and Iceberg GenericRecord. You can expand it with support for Iceberg StructLike. With that, we can write unit test/assertions compare the actual and expected.

That would be great. I'm also thinking of adding unit tests for it, so I'll base them on your code.

@hililiwei

Contributor Author

UT failed because of #5376. I'm trying to fix it.

@hililiwei hililiwei force-pushed the Inspecting branch 3 times, most recently from d62a614 to 3e9267a Compare December 7, 2022 08:47
@hililiwei hililiwei force-pushed the Inspecting branch 6 times, most recently from f4a168c to 8dd0688 Compare December 20, 2022 13:31
@hililiwei

Contributor Author

Added test case for StructRowData. @stevenzwu please take a look, thx.

Comment thread flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/data/StructRowData.java Outdated
Comment thread flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/data/StructRowData.java Outdated
@hililiwei hililiwei force-pushed the Inspecting branch 2 times, most recently from 6fe3594 to d566549 Compare December 21, 2022 06:16
@hililiwei

Contributor Author

I'd like to get more feedback on adding a new public method to the api module. cc @rdblue @aokolnychyi @RussellSpitzer @szehon-ho

Here is the context for why we need this. The converter in FlinkSchemaUtil creates an Iceberg Schema without doc attributes. Hence @hililiwei implemented the util class and method to carry over the doc attribute from docSourceSchema to schema (converted from Flink TableSchema).

  public static Schema convert(TableSchema schema) {
  }

I suggested moving it into TypeUtil class in api module as it is very similar to the reassignIds method in the TypeUtil class.

cc @rdblue @aokolnychyi @RussellSpitzer @szehon-ho
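
To make the doc carry-over concrete, here is a minimal sketch with stand-in types. `Field` and `carryOverDocs` are hypothetical and do not reflect the Iceberg API; fields are flat and matched by name here, which is an assumption, and the method added in this PR may match fields differently.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Stand-in sketch; not the Iceberg TypeUtil code. */
final class DocCarryOverSketch {
  /** Hypothetical flat field: name, type name, and optional doc. */
  static final class Field {
    final String name;
    final String type;
    final String doc; // null when the field has no doc

    Field(String name, String type, String doc) {
      this.name = name;
      this.type = type;
      this.doc = doc;
    }
  }

  private DocCarryOverSketch() {}

  /** Returns a copy of schema whose fields take their doc from the same-named field in docSource. */
  static List<Field> carryOverDocs(List<Field> schema, List<Field> docSource) {
    Map<String, String> docsByName = new HashMap<>();
    for (Field field : docSource) {
      if (field.doc != null) {
        docsByName.put(field.name, field.doc);
      }
    }
    List<Field> result = new ArrayList<>();
    for (Field field : schema) {
      // Keep the existing doc when the source has none for this field.
      String doc = docsByName.getOrDefault(field.name, field.doc);
      result.add(new Field(field.name, field.type, doc));
    }
    return result;
  }

  public static void main(String[] args) {
    List<Field> converted = Arrays.asList(new Field("id", "long", null));
    List<Field> source = Arrays.asList(new Field("id", "long", "unique id"));
    System.out.println(carryOverDocs(converted, source).get(0).doc);
  }
}
```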

@hililiwei hililiwei force-pushed the Inspecting branch 2 times, most recently from 7b08642 to 09dca6b Compare December 21, 2022 06:59
Comment thread core/src/main/java/org/apache/iceberg/BaseFilesTable.java Outdated
Comment thread flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java Outdated
@stevenzwu stevenzwu merged commit 6ab1986 into apache:master Jan 13, 2023
@stevenzwu

Contributor

thanks @hililiwei for contributing this major feature

@hililiwei

Contributor Author

thanks @stevenzwu @pvary @szehon-ho


4 participants