Fix variant type filtering in ParquetMetricsRowGroupFilter#14081
amogh-jahagirdar
left a comment
Thanks @geruh for taking this up! The fix looks right to me, but I think we need to clean up the tests in ParquetMetricsRowGroupFilter, since I don't think there's a reason we need to actually write files (take a look at how the other tests in the class work).
Can we also add some end to end tests via Spark 4.0 which are more like what was reported in the original issue?
    OutputFile outFile = Files.localOutput(parquetFile);
    try (FileAppender<GenericRecord> appender =
        Parquet.write(outFile)
            .schema(variantSchema)
            .createWriterFunc(GenericParquetWriter::create)
            .build()) {

      for (int i = 0; i < 10; i++) {
        GenericRecord record = GenericRecord.create(variantSchema);
        record.setField("id", i);

        if (i % 2 == 0) {
          VariantMetadata metadata = Variants.metadata("field");
          ShreddedObject obj = Variants.object(metadata);
          obj.put("field", Variants.of("value" + i));
          Variant variant = Variant.of(metadata, obj);
          record.setField("variant_field", variant);
        }

        appender.add(record);
      }
    }
Why do we need to write records in these tests? At this level of abstraction, I think we should just create the row group filter with notNull("variant_field") and assert that shouldRead is true.
+1. Seems we don't need to write to the files.
Thanks for the feedback Amogh!
Good point about avoiding file writing at this level. I initially wrote the tests this way because this test class shares a schema for writing out data files in both ORC and Parquet. We're now in a situation where ORC doesn't have full support for variant types while Parquet does, so adding variant fields to the shared schema would break the existing ORC tests.
That said, it probably makes sense to use separate schemas for Parquet and ORC given the different levels of support, and to write the tests to reflect that.
Talked with @geruh offline and I also poked around refactoring this class, and while I think we should (this mixture of orc/parquet both trying to test "row group filtering" is leading to weird tests), it's a big change especially for something going into a patch release.
Also while technically the implementation of filtering with variant doesn't depend on the actual contents of the file, after some more thought I concluded that it's better to write a more realistic test which does contain the records like @geruh was doing before.
    OutputFile outFile = Files.localOutput(parquetFile);
    try (FileAppender<GenericRecord> appender =
        Parquet.write(outFile)
            .schema(variantSchema)
            .createWriterFunc(GenericParquetWriter::create)
            .build()) {

      for (int i = 0; i < 10; i++) {
        GenericRecord record = GenericRecord.create(variantSchema);
        record.setField("id", i);
        record.setField("variant_field", null);
        appender.add(record);
      }
    }
Same as above, don't think we need to actually write parquet files in these tests
    // evaluated post scan. Variant types also need to be evaluated post scan to access
    // shredded statistics.
I'd remove the "variant types also need to be evaluated post scan to access shredded statistics". I'd just update to reflect the current state of things which is in that first sentence, "When filtering nested types or variant...." and the second sentence to be "Leave these type filters...".
For shredded stats pruning, the core library already contains BoundExtract. What we need is the translation/plumbing from engines to that extract, and then I think we can do pruning based on the shredded stats. For now though, I'd just leave it out of the comments since I think it's more confusing and a bit inaccurate.
I also think this is something that should go into a 1.10.1 patch release since it's unfortunately a correctness issue
cc @aihuaxu
amogh-jahagirdar
left a comment
Also can we double check some of the other cases like eq, in..? The spark tests would surface that as well
    // evaluated post scan. Variant types also need to be evaluated post scan to access
    // shredded statistics.
    Type type = schema.findType(id);
    if (type instanceof Type.NestedType || type.isVariantType()) {
RussellSpitzer
left a comment
The fix looks good to me, I agree with @amogh-jahagirdar that the tests should get changed to match up with the others in the file.
We will definitely have to revisit this once we push through variant shredded predicates
aihuaxu
left a comment
The change looks good to me. Thanks for fixing.
      assertThat(planAsString)
          .as("Post scan filter should match")
    -     .contains("Filter (" + sparkFilter + ")");
    +     .containsAnyOf("Filter (" + sparkFilter + ")", "Filter " + sparkFilter);
Are the filters on variant missing the parens?
No, this is specific to the Spark filters. I added this so we can also match a simple Spark filter, e.g. when Spark doesn't apply a NOT NULL check to a filter. This is observed with IN predicates, where Spark doesn't add the implicit IS NOT NULL check, resulting in simpler filter expressions without parentheses.
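To make the two accepted shapes concrete, here is a minimal, self-contained sketch of the same check using plain string matching instead of AssertJ. The class and method names are hypothetical, and the plan strings in the usage note are made up for illustration; they are not real Spark output.

```java
/** Hypothetical helper mirroring the containsAnyOf assertion; not part of the PR. */
final class PlanFilterMatchSketch {
  private PlanFilterMatchSketch() {}

  /**
   * Returns true if the plan text contains the filter either wrapped in
   * parentheses ("Filter (<expr>)") or bare ("Filter <expr>").
   */
  static boolean containsPostScanFilter(String planAsString, String sparkFilter) {
    return planAsString.contains("Filter (" + sparkFilter + ")")
        || planAsString.contains("Filter " + sparkFilter);
  }
}
```

With an illustrative plan line such as `Filter (isnotnull(v) AND (v = 1))` the first form matches the filter `isnotnull(v) AND (v = 1)`, while a line such as `Filter v IN (1,2)` only matches through the second, parenthesis-free form.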
amogh-jahagirdar
left a comment
OK, from my side I think this looks good now with just a minor non-blocking nit. Thank you @geruh for the fix. I'll leave it up for another day or so for others before merging.
    assertThat((actualValue).toString())
        .as("%s contents should match (VariantVal JSON)", context)
        .isEqualTo((expectedValue).toString());
Discussed offline, I think it's reasonable to compare the JSON representation to determine if the logical variant is equivalent.
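As a rough illustration of why comparing rendered JSON can stand in for logical equality, the sketch below uses a hypothetical `ToyVariant` class (not Spark's VariantVal or Iceberg's Variant). It assumes, for the sake of the example, that two values can differ in their physical encoding, here the order of the field-name dictionary, while still describing the same object.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

/** Hypothetical toy variant: a field-name dictionary plus (fieldId, value) pairs. */
final class ToyVariant {
  private final String[] dictionary; // field names, in encoding order
  private final int[] fieldIds;      // indexes into the dictionary
  private final String[] values;     // string values, parallel to fieldIds

  ToyVariant(String[] dictionary, int[] fieldIds, String[] values) {
    this.dictionary = dictionary;
    this.fieldIds = fieldIds;
    this.values = values;
  }

  /** Renders a JSON object with keys sorted by name (no string escaping in this sketch). */
  String toJson() {
    Map<String, String> sorted = new TreeMap<>();
    for (int i = 0; i < fieldIds.length; i++) {
      sorted.put(dictionary[fieldIds[i]], values[i]);
    }
    return sorted.entrySet().stream()
        .map(e -> "\"" + e.getKey() + "\":\"" + e.getValue() + "\"")
        .collect(Collectors.joining(",", "{", "}"));
  }

  /** Two toy variants are treated as logically equal when their JSON renderings match. */
  static boolean logicallyEqual(ToyVariant a, ToyVariant b) {
    return a.toJson().equals(b.toJson());
  }
}
```

Two instances built with dictionaries `{"a", "b"}` and `{"b", "a"}` but the same field-to-value mapping compare equal here, even though an array-by-array comparison of their fields would not.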
nastra
left a comment
LGTM with a few minor comments
Thanks @geruh! Thank you @aihuaxu @RussellSpitzer @nastra for reviewing.
…GroupFilter (apache#14081) (cherry picked from commit fb63af0)
Fixes #14071
Variant types were not handled correctly in Parquet row group filtering. ParquetMetricsRowGroupFilter.notNull() needs to account for variant types, which require post-scan evaluation to access shredded statistics for nested field filtering, similar to how structs are handled.
This change ensures variant columns are properly handled during Parquet metrics evaluation, allowing row groups with variant data to be correctly included for post-scan filtering rather than being incorrectly filtered out at the row group level.
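The decision described above can be modelled with a small, self-contained sketch. This is a toy model, not the Iceberg implementation: the class name, the `Kind` enum, and the name-keyed statistics maps are all hypothetical. It also assumes that a column with no recorded value count is treated as absent (all nulls), which is one plausible way a variant column could be skipped without the type check.

```java
import java.util.Map;

/** Toy model of a notNull row-group decision; names and layout are hypothetical. */
final class RowGroupNotNullSketch {
  enum Kind { PRIMITIVE, NESTED, VARIANT }

  static final boolean ROWS_MIGHT_MATCH = true;
  static final boolean ROWS_CANNOT_MATCH = false;

  private RowGroupNotNullSketch() {}

  /** Decides whether a row group may contain rows where the column is not null. */
  static boolean notNull(
      String column, Kind kind, Map<String, Long> valueCounts, Map<String, Long> nullCounts) {
    // Nested and variant columns are left for post-scan evaluation, so the
    // row group must always be read for them.
    if (kind == Kind.NESTED || kind == Kind.VARIANT) {
      return ROWS_MIGHT_MATCH;
    }

    Long values = valueCounts.get(column);
    if (values == null) {
      // Assumption in this sketch: no stats means the column is absent, i.e. all nulls.
      return ROWS_CANNOT_MATCH;
    }

    Long nulls = nullCounts.get(column);
    if (nulls != null && nulls.longValue() == values.longValue()) {
      // Every value in the row group is null.
      return ROWS_CANNOT_MATCH;
    }

    return ROWS_MIGHT_MATCH;
  }
}
```

In this model, a variant column with empty statistics maps still returns true, while a primitive column with the same empty statistics returns false.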
Testing