Spark 3.3: Add SparkChangelogTable#5740
Conversation
| } | ||
|
|
||
| private Pair<Table, Long> load(Identifier ident) { | ||
| private Table load(Identifier ident, String version) { |
There was a problem hiding this comment.
I am using existing options for configuring boundaries. This means we cannot use SQL right now. Only the DF API. Hopefully, we will have support for options in Spark 3.4.
An alternative option is to add a stored procedure to generate a changelog and register it as a view. We will need the procedure in any case to generate pre and pos images. I am reluctant to use table identifiers as it makes the logic tricky.
| } | ||
|
|
||
| SparkChangelogBatch that = (SparkChangelogBatch) o; | ||
| return scan.equals(that.scan); |
There was a problem hiding this comment.
I don't think it is very clean to implement both Scan and Batch in one class. I understand we had a performance regression but I think it was because our Batch implementation did not implement equals and hashCode.
Here is the code in Spark BatchScanExec.
override def equals(other: Any): Boolean = other match {
case other: BatchScanExec =>
this.batch == other.batch && this.runtimeFilters == other.runtimeFilters
case _ =>
false
}
There was a problem hiding this comment.
@bryanck, do you remember the details on that issue? Do you think my assumption is reasonable?
There was a problem hiding this comment.
Yes, that was what I found, the equals call returned false and the filters weren't pushed down. I had a workaround for that, but IIRC I ran into some other issues. Unfortunately I didn't delve deeper at that point and I went with reverting the change. It could be that implementing equals resolves the issue. I could run a benchmark test to confirm if interested.
There was a problem hiding this comment.
Great, I can submit a separate PR and it would be awesome if you could re-run the benchmark. I'll ping you.
| } | ||
|
|
||
| public static String[] blockLocations(FileIO io, CombinedScanTask task) { | ||
| public static String[] blockLocations(FileIO io, ScanTaskGroup<?> taskGroup) { |
There was a problem hiding this comment.
probably more of a question for my understanding. Iceberg only guarantee compatibility for classes from iceberg-api module, correct?
There was a problem hiding this comment.
Correct. Only iceberg-api has the API / ABI compatibility guarantees.
There was a problem hiding this comment.
Yep, but this is compatible as CombinedScanTask implements ScanTaskGroup. Existing user code should continue to work.
| } | ||
|
|
||
| @Test | ||
| public void testMetadataDeletes() { |
There was a problem hiding this comment.
why is this called metadata delete? is it because of the assertion of DataOperations.DELETE?
There was a problem hiding this comment.
I believe this is because the actual delete operation is issues against an entire partition, the partition of data = 'a'. This delete operation uses an optimized / "metadata only" operation; no data files need to be read or rewritten to perform the delete.
Thata's always been my understanding of "metadata deletes". That they are deletes which only require updating metadata, without having to inspect data files.
| long rowsCount = taskGroups().stream().mapToLong(ScanTaskGroup::estimatedRowsCount).sum(); | ||
| long sizeInBytes = SparkSchemaUtil.estimateSize(readSchema(), rowsCount); | ||
| return new Stats(sizeInBytes, rowsCount); | ||
| } |
There was a problem hiding this comment.
If I remember correctly, statistics were calculated multiple times during the same query in some other scenarios.
Would there be any benefit to caching this result? It was @bryanck I believe who found that we were spending extra time in statistics calculation before.
There was a problem hiding this comment.
I double checked and we have the same logic in our regular scans. I think it will be fairly cheap to call this method multiple times because taskGroups() caches the result and we will simply iterate over it in memory.
| boolean isChangelog = false; | ||
|
|
||
| for (String meta : parsed.second()) { | ||
| if (meta.equalsIgnoreCase(SparkChangelogTable.TABLE_NAME)) { |
There was a problem hiding this comment.
changelog should be the last element of the list, right? this may have false match.
There was a problem hiding this comment.
This is for path-based tables, which have a bit weird identifiers like location#meta1,meta2,meta3 so I am not sure whether changelog must be last. Let me think.
There was a problem hiding this comment.
I double checked this and I think we should follow the existing logic for path-based tables where the order of parts in a selector does not matter.
| return sparkTable.copyWithSnapshotId(Long.parseLong(version)); | ||
|
|
||
| } else if (table instanceof SparkChangelogTable) { | ||
| throw new UnsupportedOperationException("AS OF is not supported for changelogs"); |
There was a problem hiding this comment.
nit: maybe complete AS OF as AsOfTime
There was a problem hiding this comment.
Spark supports both timestamp and version based syntax.
temporalClause
: FOR? (SYSTEM_VERSION | VERSION) AS OF version=(INTEGER_VALUE | STRING)
| FOR? (SYSTEM_TIME | TIMESTAMP) AS OF timestamp=valueExpression
flyrain
left a comment
There was a problem hiding this comment.
Thanks for the PR @aokolnychyi. Looks good overall. I favor this solution over a view. Is there a plan to support specifying a snapshot range in SQL, e.g. select * from table.changes where start_snapshot = xxx and end_snapshot = xxx?
| import org.apache.spark.broadcast.Broadcast; | ||
| import org.apache.spark.sql.connector.read.InputPartition; | ||
|
|
||
| class SparkInputPartition implements InputPartition, Serializable { |
| if (isChangelog) { | ||
| return new SparkChangelogTable(table, !cacheEnabled); | ||
| } else if (snapshotId != null) { | ||
| return new SparkTable(table, snapshotId, !cacheEnabled); | ||
| } else if (asOfTimestamp != null) { | ||
| return Pair.of(table, SnapshotUtil.snapshotIdAsOfTime(table, asOfTimestamp)); | ||
| return new SparkTable( | ||
| table, SnapshotUtil.snapshotIdAsOfTime(table, asOfTimestamp), !cacheEnabled); | ||
| } else { | ||
| return Pair.of(table, null); | ||
| return new SparkTable(table, null, !cacheEnabled); |
There was a problem hiding this comment.
A refactor suggestion: we may use a builder here.
There was a problem hiding this comment.
I'll try it out if we decide to make changes in SparkCatalog.
There was a problem hiding this comment.
I tried but it seemed like an overkill as it is just a single place where it makes sense. However, I did refactor this part a bit so it should be slightly better now.
437fd3f to
b6b4da4
Compare
| long snapshotId = Long.parseLong(id.group(1)); | ||
| return Pair.of(table, snapshotId); | ||
| return new SparkTable(table, snapshotId, !cacheEnabled); | ||
| } |
There was a problem hiding this comment.
Not a blocker. It'd be more readable if we wrap the code within the catch clause. like this:
try {
org.apache.iceberg.Table table = icebergCatalog.loadTable(buildIdentifier(ident));
return new SparkTable(table, !cacheEnabled);
} catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
Table table = loadAlternativeTable(ident);
if (table != null) {
return table;
} else {
throw e;
}
}
There was a problem hiding this comment.
That's a good idea. Let me do that in a separate PR after this one.
|
Thanks for reviewing, @stevenzwu @flyrain @kbendick! |
This PR adds
SparkChangelogTablefor querying changelogs in Spark.