Arrow: Fix vectorized reads for Parquet TIMESTAMP_MILLIS types#14499
Conversation
| if (System.getProperty(ALLOCATION_MANAGER_TYPE_PROPERTY) == null) { | ||
| System.setProperty(ALLOCATION_MANAGER_TYPE_PROPERTY, "Netty"); | ||
| } |
There was a problem hiding this comment.
well the issue is, arrow's auto-detection of allocation manager type breaks when classes are shaded org.apache.iceberg.shaded.org.apache.arrow.*, because CheckAllocator.check() inspects JAR paths via ProtectionDomain.getCodeSource().getLocation() and doesn't recognize shaded package structures. Setting this property bypasses the broken-based detection, which is essential since our BigIntVector allocation for TIMESTAMP_MILLIS requires a working RootAllocator in shaded Spark runtime environments.
There was a problem hiding this comment.
Could we add a brief comment explaining why we set arrow.memory.allocation.manager.type to Netty, and consider logging at debug when we default it to Netty so it’s visible at runtime?
|
Is there any way to cover this with tests? One option I can think of is to add a test case to Spark where we write the file with |
|
|
||
| org.apache.arrow.vector.FieldVector eventTimeVector = root.getVector("event_time"); | ||
| assertThat(eventTimeVector).isNotNull(); | ||
| assertThat(eventTimeVector).isInstanceOf(org.apache.arrow.vector.BigIntVector.class); |
There was a problem hiding this comment.
can we also assert the value?
There was a problem hiding this comment.
done.
also, in VectorizedParquetDefinitionLevelReader.java, arrow validity buffer was not being properly set for non-null values. (bit should be 1 when Value is not null). Fixed that.
| try (ParquetWriter<InternalRow> writer = | ||
| new NativeSparkWriterBuilder(outputFile) | ||
| .set("org.apache.spark.sql.parquet.row.attributes", sparkSchema.json()) | ||
| .set("spark.sql.parquet.writeLegacyFormat", "false") | ||
| .set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MILLIS") | ||
| .set("spark.sql.parquet.fieldId.write.enabled", "true") | ||
| .build()) { | ||
| for (InternalRow row : rows) { | ||
| writer.write(row); | ||
| } | ||
| } |
There was a problem hiding this comment.
Do we have a native parquet writer in the base too?
Maybe we can create a test without spark
There was a problem hiding this comment.
Well, the issue is, iceberg’s native parquet writers explicitly reject TIMESTAMP_MILLIS (check here), as by design iceberg standardizes on microsecond precision.
TIMESTAMP_MILLS support exists only for reading externally-produced files (like from Spark)
To write a test without Spark will add a lot of extra code, and that too using Parquet API. IMO, it’s better to have the current end to end test with Spark.
There was a problem hiding this comment.
Could we use ExampleParquetWriter or something similar?
There was a problem hiding this comment.
Or maybe use AvroParquetWriter to create the Parquet file? Just an idea, maybe it is easier to write the file that way, and it supports millisec precision.
There was a problem hiding this comment.
@nandorKollar well, for
(a) ExampleParquetWriter : I fear, we have to manually construct Parquet group objects and manually define the schema with TIMESTAMP_MILLIS logical type, which is much more complex than our current approach. Moreover, Iceberg never used ExampleParquetWriter, using it would require writing low-level parquet code.
(b) AvroParquetWriter : Iceberg’s AvroSchemaUtil always uses timestampMicros , never timestampMillis check here. While avro supports timestamp-millis, you cannot use it through. We have to manually create the avro schema for it, and also handle the data conversion. Again, it will bring more unnecessary code.
IMO, we should stick to our current approach.
There was a problem hiding this comment.
If we plan to support reading Parquet files with TIMESTAMP_MILLIS logical type, we need to support it in all of the readers.
- Arrow
- Spark
- Flink
For this we need to find a way to test it.
There was a problem hiding this comment.
@shubham19may it would be better not to rely on Spark module to cover this case with a test, but rather keep the the arrow module 'self-contained', cover it's functionality with tests there. TestVariantReaders already creates a custom Parquet writer with Avro object modell, it doesn't look too complicated. I'll try to put together an example with ExampleParquetWriter, it might be a bit more complicated. As Iceberg's Parquet writer doesn't write timestamp millis, unfortunately we need to implement our test writer for these types. Actually this is not the only case which is not covered, for example Parquet files with unsigned integer types are not covered either, probably there we can't even use the trick to test it in Spark, as unsigned types are unknown in Spark too.
There was a problem hiding this comment.
Sure @pvary and @nandorKollar.
I have updated the test, moved it away from Spark to the arrow module, inside TestArrowReader.java. Please give it a review whenever you are free, and do tell me if any further changes are required.
| if (setArrowValidityVector) { | ||
| BitVectorHelper.setBit(vector.getValidityBuffer(), bufferIdx); | ||
| } | ||
|
|
There was a problem hiding this comment.
nit: newline here is not needed.
pvary
left a comment
There was a problem hiding this comment.
LGTM.
Thanks @shubham19may!
One small formal ask only.
Please @nandorKollar and @huaxingao review.
Thanks,
Peter
@shubham19may thanks for adding the test case, looks good! |
|
|
||
| Table table = tables.create(schema, tableLocation); | ||
|
|
||
| File testFile = new File(tempDir, "timestamp-millis-test.parquet"); |
There was a problem hiding this comment.
nit: would it be possible to use InMemoryOutputFile instead of temp files? We can maybe rewrite the Arrow tests in a subsequent followup PR.
|
Oh, one more thing: can we please make the title of this change more precise, we no longer fix any 'shaded JAR initialization' problem, that's misleading. |
done |
|
Merged to main. |
|
Thanks @pvary @huaxingao @nandorKollar for the reviews and help. |
Fixes: #14430 & #14046
Description
This PR fixes: Reading Parquet files with
TIMESTAMP_MILLISError:
Cause: when reading parquet files with
TIMESTAMP_MILLISlogical type annotation, the VectorizedArrowReader incorrectly allocated a TimeStampMicroTZVector based on iceberg schema (which expects microsecond precision), but the actual reader (TimeStampMicroTZVector) writes raw long values that require a BigIntVector.Fix:
FieldwithArrowType.Int(Long.SIZE, true)typeBigIntVectorthat matches what TimestampMillisReader expects and returnReadType.TIMESTAMP_MILLISto trigger millisecond-to-microsecond conversionTesting
spark.sql.parquet.outputTimestampType=TIMESTAMP_MILLISAddFile()API)spark.sql.iceberg.vectorization.enabled=true