Parquet: Implement Variant metrics #12496
    private final T lowerBound;
    private final T upperBound;

    public FieldMetrics(int id, long valueCount, long nullValueCount) {
These are just convenience constructors.
    private static <T> T visitVariant(
-       Types.VariantType variant, GroupType group, TypeWithSchemaVisitor<T> visitor) {
+       Types.VariantType variant, GroupType variantGroup, TypeWithSchemaVisitor<T> visitor) {
In order to call a visitor that has a different return type than the Parquet schema visitor (in this case, VariantMetrics instead of FieldMetrics), the Parquet type needs to be passed to the visit method. I think that not including this originally was an accident.
    if (writer != null) {
-     return ParquetUtil.footerMetrics(writer.getFooter(), model.metrics(), metricsConfig);
+     return ParquetMetrics.metrics(
+         schema, parquetSchema, metricsConfig, writer.getFooter(), model.metrics());
Schema is currently required because there is no annotation for variants in the Parquet schema.
f9dc85f to 77021c7
    Map<Integer, Long> columnSizes = Maps.newHashMap();
    Multimap<ColumnPath, ColumnChunkMetaData> columns =
        Multimaps.newMultimap(Maps.newHashMap(), Lists::newArrayList);
    for (BlockMetaData block : metadata.getBlocks()) {
This organizes the metadata for each column so that it is available when processing that leaf column in the builder.
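A minimal sketch of the grouping described in this comment, using plain Java collections in place of Guava's Multimap. `Chunk` and `byPath` are hypothetical stand-ins for illustration, not parquet-java or Iceberg types, and the dotted path string is an assumed representation:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch; Chunk stands in for per-row-group column chunk metadata.
final class ColumnIndexSketch {
  static final class Chunk {
    final String path; // dotted column path (assumed representation)
    final long valueCount;

    Chunk(String path, long valueCount) {
      this.path = path;
      this.valueCount = valueCount;
    }
  }

  // Collects the chunks of every row group under their column path, so that a
  // leaf column can look up all of its chunks in one call.
  static Map<String, List<Chunk>> byPath(List<List<Chunk>> rowGroups) {
    Map<String, List<Chunk>> columns = new LinkedHashMap<>();
    for (List<Chunk> rowGroup : rowGroups) {
      for (Chunk chunk : rowGroup) {
        columns.computeIfAbsent(chunk.path, p -> new ArrayList<>()).add(chunk);
      }
    }
    return columns;
  }
}
```

With two row groups that both contain an `id` column, `byPath` would return a two-element list under `"id"`, one entry per row group.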
    Map<Integer, FieldMetrics<?>> metricsById =
        fields.collect(Collectors.toMap(FieldMetrics::id, Function.identity()));

    Iterable<FieldMetrics<ByteBuffer>> results =
Metrics are returned from the builder as an iterable of FieldMetrics, one for each field. The lower and upper bounds are already serialized since the field type is known in the visitor.
    Map<Integer, ByteBuffer> lowerBounds = Maps.newHashMap();
    Map<Integer, ByteBuffer> upperBounds = Maps.newHashMap();

    for (FieldMetrics<ByteBuffer> metrics : results) {
This translates the metrics for each field into the top-level maps. (When we modify how we track metrics/stats we would probably change this.)
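As a rough illustration of that translation step, the sketch below copies per-field results into maps keyed by field ID. The `Field` and `FileMaps` classes are hypothetical and only mirror the shape discussed here; skipping null bounds is an assumption, not something the PR states:

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch; these classes are not Iceberg's FieldMetrics or Metrics.
final class MetricsMapsSketch {
  static final class Field {
    final int id;
    final long valueCount;
    final long nullValueCount;
    final ByteBuffer lowerBound; // null when no bound is kept (assumption)
    final ByteBuffer upperBound; // null when no bound is kept (assumption)

    Field(int id, long valueCount, long nullValueCount, ByteBuffer lower, ByteBuffer upper) {
      this.id = id;
      this.valueCount = valueCount;
      this.nullValueCount = nullValueCount;
      this.lowerBound = lower;
      this.upperBound = upper;
    }
  }

  static final class FileMaps {
    final Map<Integer, Long> valueCounts = new HashMap<>();
    final Map<Integer, Long> nullValueCounts = new HashMap<>();
    final Map<Integer, ByteBuffer> lowerBounds = new HashMap<>();
    final Map<Integer, ByteBuffer> upperBounds = new HashMap<>();
  }

  // Flattens one metrics object per field into top-level maps keyed by field ID.
  static FileMaps toMaps(Iterable<Field> results) {
    FileMaps maps = new FileMaps();
    for (Field field : results) {
      maps.valueCounts.put(field.id, field.valueCount);
      maps.nullValueCounts.put(field.id, field.nullValueCount);
      if (field.lowerBound != null) {
        maps.lowerBounds.put(field.id, field.lowerBound);
      }
      if (field.upperBound != null) {
        maps.upperBounds.put(field.id, field.upperBound);
      }
    }
    return maps;
  }
}
```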
    int length = truncateLength(mode);

    FieldMetrics<ByteBuffer> metrics = metricsFromFieldMetrics(fieldId, iPrimitive, length);
The main value of doing this in the visitor is to avoid processing the column metadata if there is already a FieldMetrics object available.
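The short-circuit described in this comment can be sketched as a lookup with a lazy fallback. The helper name `metricsFor` and its parameters are hypothetical; the point is only that the footer-processing supplier is never invoked when a metrics object already exists for the field:

```java
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical sketch of "use existing metrics first, read column metadata last".
final class PreferExistingSketch {
  // Returns the metrics already collected for the field, if any; otherwise
  // computes them lazily from column metadata.
  static <M> M metricsFor(
      int fieldId, Map<Integer, M> existingById, Supplier<M> fromColumnMetadata) {
    M existing = existingById.get(fieldId);
    return existing != null ? existing : fromColumnMetadata.get();
  }
}
```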
    T lowerBound = null;
    T upperBound = null;

    for (ColumnChunkMetaData column : columns.get(path)) {
This uses the column path so the same columns map can be used for fields with IDs (normal Iceberg fields) and for fields without IDs (shredded Variant fields).
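A sketch of how bounds might be merged across the chunks found for one path: take the smallest lower bound and the largest upper bound. The `Range` type is hypothetical, and returning no bounds when any chunk lacks statistics is an assumed conservative policy rather than something confirmed by the PR:

```java
import java.util.List;

// Hypothetical sketch; Range stands in for the min/max statistics of one chunk.
final class BoundsSketch {
  static final class Range<T> {
    final T lower;
    final T upper;

    Range(T lower, T upper) {
      this.lower = lower;
      this.upper = upper;
    }
  }

  // Merges per-chunk statistics into one range for the whole file.
  // Returns null if there are no chunks or if any chunk has no statistics
  // (assumption: a missing statistic makes the file-level bounds unknown).
  static <T extends Comparable<T>> Range<T> merge(List<Range<T>> chunkStats) {
    T lower = null;
    T upper = null;
    for (Range<T> stats : chunkStats) {
      if (stats == null || stats.lower == null || stats.upper == null) {
        return null;
      }
      if (lower == null || stats.lower.compareTo(lower) < 0) {
        lower = stats.lower;
      }
      if (upper == null || stats.upper.compareTo(upper) > 0) {
        upper = stats.upper;
      }
    }
    return lower == null ? null : new Range<>(lower, upper);
  }
}
```

Because the lookup key is the column path, the same merge would apply to fields with IDs and to shredded Variant fields without IDs.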
    }

    return ImmutableList.of(
        new FieldMetrics<>(
Currently, all fields are kept and truncated to 16. This seems reasonable, but we could also apply the truncation length to all sub-fields, or use it for the number of fields to keep bounds for?
I feel like truncate(16) for all the shredded fields is the most intuitive of the options. That said, would it make more sense to pass the MetricsMode to MetricsVariantVisitor, so that if the mode is truncate we use the user-set length instead of ignoring it? I don't think this is a hard blocker, though, since the default is already truncate(16) and it's probably a sane truncation for the shredded fields as well.
Right now, I just wanted to get the basics working so I went with the simplest implementation. We should definitely revisit this and discuss how to configure metrics collection.
That said, I think the mode isn't the right problem to solve. For the mode, we don't keep counts other than the top-level value and null count for the variant itself. That leaves only how to handle lower and upper bounds, where we know that truncate is the right config and 16 is a reasonable length default. At that point, the only question is whether we want to hard-code it, pass through the mode for the variant column to use a configurable length for all sub-fields, or if we want to have a truncate length for each field individually.
The bigger problem is which fields to collect metrics for. Restricting the fields to just the ones that are shredded is a good heuristic because we don't expect types to be uniform for other fields, and a value of another type will prevent the field's bounds from being stored. Even then, there could be quite a few fields and that will make the lower and upper bound payloads large. We may want to further restrict the number of fields, but for now I think the reasonable path forward is to use the shredded fields. Then we can see if we want to change it once we tackle the problem of how we determine the fields to shred.
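For readers unfamiliar with bound truncation, here is a minimal sketch of the general technique for binary values, with the length of 16 from this thread pulled into a constant. This is not Iceberg's `BinaryUtil` implementation; the class and method names are hypothetical, and it assumes values are compared as unsigned bytes in lexicographic order:

```java
import java.util.Arrays;

// Hypothetical sketch of truncating lower and upper bounds to a fixed length.
final class TruncateSketch {
  static final int DEFAULT_LENGTH = 16; // the truncate(16) default discussed above

  // A prefix never sorts after the full value, so it is a valid lower bound.
  static byte[] truncateMin(byte[] value, int length) {
    return value.length <= length ? value : Arrays.copyOf(value, length);
  }

  // A prefix may sort before the full value, so the upper bound increments the
  // last byte that is not 0xFF and drops everything after it.
  // Returns null when every byte of the prefix is 0xFF and no bound exists.
  static byte[] truncateMax(byte[] value, int length) {
    if (value.length <= length) {
      return value;
    }
    byte[] prefix = Arrays.copyOf(value, length);
    for (int i = length - 1; i >= 0; i--) {
      if (prefix[i] != (byte) 0xFF) {
        prefix[i]++;
        return Arrays.copyOf(prefix, i + 1);
      }
    }
    return null;
  }
}
```

Passing a configurable length instead of `DEFAULT_LENGTH` would correspond to the option of forwarding the variant column's mode to all sub-fields.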
77021c7 to 77d811c
    @@ -1172,6 +1172,11 @@ acceptedBreaks:
    \ java.util.function.Consumer<T>)"
    justification: "Removing deprecated code"
To help me understand the scope of this PR: we will collect the metrics for Variant subcolumns, but storing them as Variant in Avro files and retrieving them from Avro files will be separate, since that depends on the Avro Variant read/write implementation. Pruning with these metrics will also be separate.
Is that correct?
Yes, this PR translates Parquet metrics for shredded fields to a Variant object that is accepted by InclusiveMetricsEvaluator. For Avro, we would need to come up with a strategy for producing metrics. I'm not sure that it makes sense to because there are no shredded fields.
Got it. I was referring to writing the metrics (as a Variant object) into the Iceberg manifest file (an Avro file), not to collecting metrics for Avro data files. We don't shred fields for Avro files, so it doesn't make sense to produce metrics for them.
a8043fa to a041643
    private static final Map<Types.NestedField, Integer> FIELDS_WITH_NAN_COUNT_TO_ID =
-       ImmutableMap.of(
-           FLOAT_FIELD, 3, DOUBLE_FIELD, 4, FLOAT_LIST, 10, MAP_FIELD_1, 18, MAP_FIELD_2, 22);
+       ImmutableMap.of(FLOAT_FIELD, 3, DOUBLE_FIELD, 4);
Like ORC, Parquet will no longer produce metrics for repeated fields, like those in map keys or values or in list elements.
      }
    }

    private Type toParquetSchema(VariantValue value) {
Moved into ParquetVariantUtil.
0618811 to b337972
b004fb2 to 396c4b7
amogh-jahagirdar left a comment
Still going through the tests, but I took a look through the code changes and had some questions/comments.
amogh-jahagirdar left a comment
I've checked this out locally and stepped through the tests; this is looking great. Thanks @rdblue!
      String truncatedString =
          UnicodeUtil.truncateStringMax((String) value.asPrimitive().get(), 16);
      return truncatedString != null ? Variants.of(PhysicalType.STRING, truncatedString) : null;
    case BINARY:
      ByteBuffer truncatedBuffer =
          BinaryUtil.truncateBinaryMin((ByteBuffer) value.asPrimitive().get(), 16);
Nit: Do we want to move 16 to an internal constant?
Thanks for the reviews, @aihuaxu and @amogh-jahagirdar! I'll leave this open a bit longer because I think @danielcweeks also wanted to take a look. I'll follow up with him.
Merging this. Thanks for the reviews, @aihuaxu, @amogh-jahagirdar, and @danielcweeks!
This implements metrics for Variant types stored in Parquet files, using new visitors to produce the metrics.
This also refactors the existing metrics code to use a visitor. If I remember correctly, the metrics code predates the Parquet visitor. I think the visitor version is cleaner.