Core: Add readable metrics columns to files metadata tables #5376
| Transforms.identity(field.type()) | ||
| .toHumanString(Conversions.fromByteBuffer(field.type(), value))); | ||
| } catch (Exception e) { // Ignore | ||
| return Optional.empty(); |
This happens in some cases. I found it when importing external files into an Iceberg table, i.e. TestIcebergSourceHadoopTables.testFilesTableWithSnapshotIdInheritance, where I think the columns are out of order relative to the original schema and the metrics are corrupt (an underflow exception in this case).
I'm not sure we should error out the files tables in that case; I was leaning towards just returning null. The user still has the original column to see why the error happened.
|
All Spark tests are updated/fixed now. FYI @RussellSpitzer @aokolnychyi @rdblue if you guys have time to leave some feedback. New tests are added, but the file is a bit big to show in 'Files Changed': TestMetadataTableMetricsColumns.java |
| return Optional.of( | ||
| Transforms.identity(field.type()) | ||
| .toHumanString(Conversions.fromByteBuffer(field.type(), value))); | ||
| } catch (Exception e) { // Ignore |
Do you have examples when this throws an exception?
Yeah, I tried to put a comment, but unfortunately it got disassociated from this line after a rebase.
This happens in some cases. I found it when importing external files into an Iceberg table, i.e. TestIcebergSourceHadoopTables.testFilesTableWithSnapshotIdInheritance, where I think the columns are out of order relative to the original schema and the metrics are corrupt (an underflow exception in this case).
I'm not sure we should error out the files tables in that case; I was leaning towards just returning null. The user still has the original column to see why the error happened.
Following up on this: it is a non-issue, as the Spark procedures set the schema.name-mapping.default property and only this test does not. Fixed the test.
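For readers following the thread, here is a minimal sketch of the "return empty instead of failing" behavior discussed above. It is not the PR's code: `ReadableBoundSketch` and `readableLong` are hypothetical names, and it only assumes a bound serialized as an 8-byte little-endian long, so a truncated buffer triggers the same kind of underflow mentioned in the comment.

```java
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Optional;

// Hypothetical stand-in for the conversion helper; this is not Iceberg code.
final class ReadableBoundSketch {
  private ReadableBoundSketch() {}

  // Decodes a serialized bound as an 8-byte little-endian long.
  // Returns empty when the buffer is missing or too short to hold a long.
  static Optional<String> readableLong(ByteBuffer value) {
    if (value == null) {
      return Optional.empty();
    }
    try {
      // duplicate() leaves the caller's position untouched.
      long decoded = value.duplicate().order(ByteOrder.LITTLE_ENDIAN).getLong();
      return Optional.of(Long.toString(decoded));
    } catch (BufferUnderflowException e) {
      // Corrupt or mismatched metric: report "no readable value" instead of failing.
      return Optional.empty();
    }
  }

  public static void main(String[] args) {
    ByteBuffer complete =
        ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN).putLong(0, 42L);
    ByteBuffer truncated = ByteBuffer.allocate(4);
    System.out.println(readableLong(complete));
    System.out.println(readableLong(truncated));
  }
}
```

Catching only the underflow, rather than every `Exception` as in the diff, is a choice made for this sketch; the raw bound stays available to the caller either way.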
|
It seems like a great idea to add readable metrics. It is hard to make sense of them otherwise. @szehon-ho, what do you think about adding a single map column, let's say called We can then easily access them via SQL. I am okay with individual columns too but it seems a bit cleaner to just have one. |
|
Let me check in a bit. |
|
Let me take a look today. |
aokolnychyi
left a comment
Looks really close to me.
|
Let me take a look now. |
| return new Schema(joinedColumns); | ||
| } | ||
|
|
||
| public static Schema joinCommon(Schema left, Schema right) { |
What about simply adapting the existing join method? Are there any scenarios where we want to skip the validation and simply add all columns (old logic)?
| return CloseableIterable.transform(files(projection), file -> (StructLike) file); | ||
| } else { | ||
| Schema fileProjection = TypeUtil.selectNot(projection, READABLE_METRICS_FIELD_IDS); | ||
| Schema minProjection = |
I think this logic should be part of BaseFilesTableScan and BaseAllFilesTableScan.
Otherwise, our scans won't report the correct schema in Scan$schema().
I think putting it there will break the scan, right? It is not the projection the user requested.
Note, this is actually a bit subtle. Because we are doing the join (original projection + minimum metrics), the file's schema becomes
{any_projected_field_on_file} : {readable_metrics, because it is also projected} : {un-projected but required metrics fields}
So ContentFileWithMetrics works because it discards any of the "un-projected but required metrics fields", given they are outside the range it reads. For the remaining fields it uses the existing logic (delegate to the file for the first n-1 fields, then get the nth field from MetricsStruct).
I mean, we could add a select method to GenericDataFile to modify its internal 'fromProjectionPos' map to conform back to the original projection (dropping the "un-projected but required metrics fields"). But it would mainly be for clarity, and not strictly needed.
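To make the position arithmetic above concrete, here is a simplified model of the wrapper, not the actual ContentFileWithMetrics. `FileWithMetricsSketch` and its fields are hypothetical, and the file is reduced to a plain list of values in joined-projection order.

```java
import java.util.Arrays;
import java.util.List;

// Simplified model of the wrapper described above; all names are hypothetical.
final class FileWithMetricsSketch {
  private final List<Object> joinedFileValues; // values in joined-projection order
  private final int metricsPos;                // position of readable_metrics (n - 1)
  private final Object readableMetrics;        // stand-in for the MetricsStruct value

  FileWithMetricsSketch(List<Object> joinedFileValues, int metricsPos, Object readableMetrics) {
    this.joinedFileValues = joinedFileValues;
    this.metricsPos = metricsPos;
    this.readableMetrics = readableMetrics;
  }

  // Only the requested fields are visible: projected file fields plus readable_metrics.
  int size() {
    return metricsPos + 1;
  }

  Object get(int pos) {
    if (pos < 0 || pos > metricsPos) {
      // The extra metrics fields added by the join sit beyond this range and are never read.
      throw new IndexOutOfBoundsException("Position outside the requested projection: " + pos);
    }
    // First n-1 positions delegate to the file; the nth comes from the metrics struct.
    return pos < metricsPos ? joinedFileValues.get(pos) : readableMetrics;
  }

  public static void main(String[] args) {
    // Joined layout: two projected fields, the readable_metrics slot, two extra metrics fields.
    List<Object> values = Arrays.asList("file-a.parquet", 10L, null, "raw-lower", "raw-upper");
    FileWithMetricsSketch row = new FileWithMetricsSketch(values, 2, "readable");
    for (int pos = 0; pos < row.size(); pos++) {
      System.out.println(row.get(pos));
    }
  }
}
```

In this model the extra fields never need to be stripped from the underlying file, which matches the point that a `select` on GenericDataFile would be for clarity only.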
|
Added an additional test; it looks like it works even when the readable_metrics column is selected before other columns (Spark somehow reads the rows in their original order) |
|
|
||
| public static Schema join(Schema left, Schema right) { | ||
| List<Types.NestedField> joinedColumns = Lists.newArrayList(); | ||
| joinedColumns.addAll(left.columns()); |
nit: This changes the original behavior, why not add a new function?
@chenjunjiedada Yeah, that was my original version; I changed it after @aokolnychyi's comment: #5376 (comment)
Technically this is changing a public API which previously would have allowed these combos. This is ... maybe ok since it's a utility method but we may end up breaking users of the function at runtime. That said I think Anton is right and any schema with multiple columns with the same ID would always be wrong.
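As an illustration of the validation being debated, the sketch below joins two column lists and rejects any repeated field ID. It is not the TypeUtil implementation: `SchemaJoinSketch` and its `Field` class are hypothetical, reduced to an ID and a name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical model of a validating join; not the actual Iceberg utility.
final class SchemaJoinSketch {
  private SchemaJoinSketch() {}

  // Minimal stand-in for a schema column: just a field ID and a name.
  static final class Field {
    final int id;
    final String name;

    Field(int id, String name) {
      this.id = id;
      this.name = name;
    }
  }

  // Appends right's columns to left's, failing if any field ID appears twice.
  static List<Field> join(List<Field> left, List<Field> right) {
    List<Field> joined = new ArrayList<>();
    Set<Integer> seenIds = new HashSet<>();
    for (List<Field> side : Arrays.asList(left, right)) {
      for (Field field : side) {
        if (!seenIds.add(field.id)) {
          throw new IllegalArgumentException(
              "Duplicate field ID " + field.id + " (column " + field.name + ")");
        }
        joined.add(field);
      }
    }
    return joined;
  }

  public static void main(String[] args) {
    List<Field> left = Arrays.asList(new Field(1, "file_path"), new Field(2, "record_count"));
    List<Field> right = Arrays.asList(new Field(3, "readable_metrics"));
    System.out.println(join(left, right).size());
  }
}
```

A caller that previously relied on joining schemas with overlapping IDs would now fail at runtime, which is the compatibility concern raised in the comment.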
|
Really nice PR, thanks @szehon-ho and @aokolnychyi for the effort! When can we merge this? I think it is ready, and it has been two months since the last review; leaving it longer will lead to more conflicts. |
|
@RussellSpitzer addressed the comments, thanks! |
|
Actually hold on a second, I'm looking at a small refactor to make it more generic to add a readable_metric definition in the future |
…ion in READABLE_METRIC_COLS static array
|
@RussellSpitzer should be good now for another look when you get a chance, thanks! |
| Dataset<Row> inputDF = spark.createDataFrame(records, SimpleRecord.class); | ||
| inputDF.select("data", "id").write().mode("overwrite").insertInto("parquet_table"); | ||
|
|
||
| NameMapping mapping = MappingUtil.create(table.schema()); |
Without these lines, the test used to write the wrong metrics to the imported table.
| public static Schema readableMetricsSchema(Schema dataTableSchema, Schema metadataTableSchema) { | ||
| List<Types.NestedField> fields = Lists.newArrayList(); | ||
| Map<Integer, String> idToName = dataTableSchema.idToName(); | ||
| AtomicInteger nextId = |
I don't believe this has to be atomic, and metadataTableSchema should already have a method highestFieldId() which according to the doc includes nested fields
If you prefer incrementAndGet() to ++nextId though I think using the Atomic just for readability is probably fine
Ah, the AtomicInteger is there because of a lambda function (the map), and the compiler complains:
Variable used in lambda expression should be final or effectively final
Re: highestFieldId(), good to know! Done.
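A small sketch of why the counter ends up as an AtomicInteger: a local `int` incremented inside a lambda does not compile, while a final reference to a mutable holder does. `FieldIdAssignmentSketch` and `assignIds` are hypothetical names, and the `highestFieldId` parameter stands in for whatever the metadata table schema reports.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration of ID assignment inside a lambda; not the PR's code.
final class FieldIdAssignmentSketch {
  private FieldIdAssignmentSketch() {}

  // Assigns consecutive IDs, starting just above highestFieldId, in list order.
  static Map<String, Integer> assignIds(List<String> names, int highestFieldId) {
    // A plain "int nextId" with "++nextId" inside the lambda below would be rejected,
    // because captured locals must be final or effectively final.
    AtomicInteger nextId = new AtomicInteger(highestFieldId);
    Map<String, Integer> ids = new LinkedHashMap<>();
    names.forEach(name -> ids.put(name, nextId.incrementAndGet()));
    return ids;
  }

  public static void main(String[] args) {
    System.out.println(assignIds(Arrays.asList("lower_bound", "upper_bound"), 1000));
  }
}
```

The atomic type is used here only as a mutable box; nothing in the sketch is concurrent.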
|
|
||
| Table filesTable = new FilesTable(table.ops(), table); | ||
| Types.StructType actual = filesTable.newScan().schema().select("readable_metrics").asStruct(); | ||
|
|
Just to make this a little easier in the future you may just want to do something like
firstAssigned = (schema.highestId - 15)
Then do
1001 = firstAssigned +1; ....
not sure this really helps that much though
RussellSpitzer
left a comment
One remaining nit on the "highestId" call. I think overall we probably should do a refactoring of our tests for the files table in Spark, they have been really brittle to changes for a long time and I think we can do better. I think that can wait though or maybe be a task for a newcomer who wants to understand metadata tables better.
RussellSpitzer
left a comment
Actually wasn't there a set of tests checking that the projection was working correctly? I'm not sure I see those tests anymore but maybe I've looked at this for too long?
|
Would love to see a good way to simplify it without breaking the checks. Currently it compares every single field. |
|
Thanks @RussellSpitzer @aokolnychyi @chenjunjiedada for detailed reviews |
) (apache#861) Co-authored-by: Szehon Ho <szehon.apache@gmail.com>
|
@szehon-ho @RussellSpitzer Is there any documentation about these readable metrics? Are all these metrics exposed through the files metadata tables only? |
@szehon-ho Actual column names are without 's' at the end |
Closes #4362
This adds the following columns to all files tables:
These are then a map of column_name to value.