Spark: enable stream-results option for remove orphan files#14278
Conversation
|
Hi @RussellSpitzer @pvary @liziyan-lzy @huaxingao, if any of you have time to help review this that would be greatly appreciated! |
|
@arifazmidd: Could you fix the test please? |
|
Thanks for running the CI @pvary; I have fixed the formatting issues. |
| } | ||
| } | ||
|
|
||
| orphanFileDS.unpersist(); |
There was a problem hiding this comment.
I am unfamiliar with Spark, so this is just a question.
Do we need a try/finally to unpersist the DS?
There was a problem hiding this comment.
I have added the try/finally as you mentioned but it's not necessary as spark automatically monitors this and drops old data in a LRU fashion. We are just explicitly trying to do this right when we no longer need the data.
| Dataset<String> orphanFileDS, SetAccumulator<Pair<String, String>> conflicts) { | ||
| // Cache the dataset and force computation to populate the conflicts accumulator | ||
| // This allows us to validate conflicts before starting any deletions | ||
| orphanFileDS = orphanFileDS.cache(); |
There was a problem hiding this comment.
I'm not familiar with Spark, so this is just a question:
What is the cost of this? Do we lose what we gain with not reading every file at once?
There was a problem hiding this comment.
The data is being cached into the executors' memory so it's distributed and will spill to disk if needed. We also unpersist this data when it is no longer needed. This prevents us from having to double compute this dataset; once during the prefix validation on count() and then again on toLocalIterator or collectAsList(). Instead the dataset can be streamed or collected from cache.
| } | ||
|
|
||
| @TestTemplate | ||
| public void testStreamResultsBackwardsCompatibility() throws IOException, InterruptedException { |
There was a problem hiding this comment.
There is no actual difference between the tests with streaming on and off.
The only noticeable difference could be the sample/full result in the list.
Could we test that?
Shall we create a method, or change the parametrization of this single test to avoid code duplication?
There was a problem hiding this comment.
Yes there is no real difference between what the result is with streaming on or off other than the output containing a sample vs the full result.
I have removed testStreamResultsBackwardsCompatibility() as it is already covered in testDryRun. I also removed testStreamResultsWithDryRun() and added a check in testDryRun with streaming enabled.
Now we only have one new test method testStreamResultsDeletion which ensure files are correctly deleted with streaming enabled. This doesn't check that the output should only be 20k rows though. I'm not sure if that's something we want to test since it would require creating 20k+ files. The correctness of the streaming behavior is already validated by these tests.
There was a problem hiding this comment.
The correctness of the streaming behavior is already validated by these tests.
If I accidentally remove the whole streaming feature, but keep the config, the test will not fail. For me, this is concerning
There was a problem hiding this comment.
Hmm that's a good point. To address this I attempted to follow the ExpireSnapshotsAction.testUseLocalIterator() pattern by comparing Spark job counts, but it turns out both modes use the same number of jobs in our case because:
- Both modes cache and count the dataset for validation
- The difference (collectAsList vs toLocalIterator) happens on the cached data, which doesn't create different job patterns
Alternative approaches I can think of right now to verify streaming behavior:
- Test with enough files to hit the 20k sampling limit. I guess someone could still remove most of the streaming feature but just keep this sampling portion at the end and the test would still pass though.
- Test that verifies function calls (
toLocalIteratorvscollectAsList).
Do you think either of these suffice?
There was a problem hiding this comment.
Maybe make the sample list file sizes configurable for tests?
That's still lame, because the feature is streaming and not the collection 😄
On a second thought. Maybe it's ok. At least we can know that the result is not stored in memory.
Could we extend or inject or call directly the RemoveOrphanFilesProcedure? This is very Sparky question, and I don't have a good answer.
There was a problem hiding this comment.
Yes but the multiple batch deletion is not unique to streaming mode. We use an iterator and delete in batches for both
if (streamResults()) {
return deleteFiles(orphanFileDS.toLocalIterator(), orphanFileDS);
} else {
return deleteFiles(orphanFileDS.collectAsList().iterator(), orphanFileDS);
}
and then in deleteFiles:
Iterator<List<String>> fileGroups = Iterators.partition(orphanFiles, DELETE_GROUP_SIZE);
while (fileGroups.hasNext()) {
...
<call to deleteBulk or deleteNonBulk>
...
}
There was a problem hiding this comment.
Do I undestand the Spark behavior correctly, if I think that the Iterators.partition(orphanFiles, DELETE_GROUP_SIZE) will create a single group for the non-streaming path, and multiple groups for the streaming path?
If I am mistaken with the above assumption, then I don't understand why we have 2 parameters in the deleteFiles method. We can just have a single DS parameter, or we could even merge the deleteFile to the doExecute
There was a problem hiding this comment.
I believe the group number will be equivalent for both streaming and non-streaming.
We had two parameters because ExpireSnapshotsAction's deleteFiles method takes in an iterator so to keep consistent we refactored to pass the iterator and then we are passing the dataset for un-persisting. But you are correct, this can be improved. There are two options I see.
- Pass only the dataset and determine the iterator to use within the
deleteFilesmethod. - Pass only the iterator and don't unpersist the dataset from cache manually, allow Spark to handle it on its own.
There was a problem hiding this comment.
I will be out for a few days. Will come back next week. Sorry for the delay
There was a problem hiding this comment.
Nws, thanks for the review so far. I've updated the test to have a configurable output sample size parameter as we previously discussed and simplified deleteFiles to only take the dataset as a parameter (option 1 from above).
pvary
left a comment
There was a problem hiding this comment.
Generally looks good to me, but I would like to have someone with more Spark knowledge to review it too.
CC: @szehon-ho, @huaxingao, @RussellSpitzer
|
|
||
| // Cache and force computation to populate conflicts accumulator | ||
| orphanFileDS = orphanFileDS.cache(); | ||
| orphanFileDS.count(); |
There was a problem hiding this comment.
Is it possible to have an error here, and we don't unpersist the DS?
There was a problem hiding this comment.
Good catch, yes it is possible.
We can either go with the alternative option I had suggested above:
Dataset<String> orphanFileDS = null;
try {
orphanFileDS = findOrphanFiles(...);
return deleteFiles(orphanFileDS);
} finally {
if (orphanFileDS != null) {
orphanFileDS.unpersist();
}
}
or wrap the code block in findOrphanFiles with try-catch.
orphanFileDS = orphanFileDS.cache();
try {
orphanFileDS.count();
if (prefixMismatchMode == PrefixMismatchMode.ERROR && !conflicts.value().isEmpty()) {
throw new ValidationException(...);
}
return orphanFileDS;
} catch (Exception e) { // not sure if we want to be catching broad exception and re-throwing like this
orphanFileDS.unpersist();
throw e;
}
I think the first option is better.
There was a problem hiding this comment.
In case of an error we might already called the orphanFileDS.cache(), but not return a value, just throw an exception. So in this case the orphanFileDS will be null and we don't unpersist the cache with this code:
Dataset<String> orphanFileDS = null;
try {
orphanFileDS = findOrphanFiles(...);
return deleteFiles(orphanFileDS);
} finally {
if (orphanFileDS != null) {
orphanFileDS.unpersist();
}
}
So I think we need to do the wrapping in the findOrphanFiles.
| * | ||
| * <p>Streaming mode can be enabled via the {@value #STREAM_RESULTS} option to avoid loading all | ||
| * orphan file paths into driver memory. When enabled, the result will contain only a sample of file | ||
| * paths (up to {@value #MAX_ORPHAN_FILE_PATHS_TO_RETURN_WHEN_STREAMING}). The total count of |
There was a problem hiding this comment.
should this be @value #MAX_ORPHAN_FILE_SAMPLE_SIZE_DEFAULT
There was a problem hiding this comment.
Ah yes sorry the variable name was updated but didn't update the documentation.
| PREFIX_MISMATCH_MODE_PARAM, | ||
| PREFIX_LISTING_PARAM | ||
| PREFIX_LISTING_PARAM, | ||
| STREAM_RESULTS_PARAM |
There was a problem hiding this comment.
shall we update docs/docs/spark-procedures.md for remove_orphan_files to add the new parameter?
There was a problem hiding this comment.
Updated docs -- same content as expire_snapshots documentation with an added note about the output size max
|
@arifazmidd Thanks for the PR! LGTM overall. |
|
Merged to main. @arifazmidd: Please port the changes to Spark 3.4, and 4.0. This command could help: Please tell us on the new PR if the backport was clean (no manual changes are required), or you needed to do changes manually. In this case highlight the extra changes, so we have easier time to review. Thanks, |
|
Thanks for the reviews @pvary and @huaxingao! |
Backport of apache/iceberg#14278 to openhouse-1.5.2. Adds stream-results option to DeleteOrphanFilesSparkAction to prevent driver OOM when removing large numbers of orphan files. Instead of collecting all orphan file paths into driver memory, files are streamed partition-by-partition using toLocalIterator() and deleted in batches of 100K. When enabled, the result contains a sample of up to 20,000 file paths. The total count of deleted files is logged. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Backport of apache/iceberg#14278 to openhouse-1.5.2. Adds stream-results option to DeleteOrphanFilesSparkAction to prevent driver OOM when removing large numbers of orphan files. Instead of collecting all orphan file paths into driver memory, files are streamed partition-by-partition using toLocalIterator() and deleted in batches of 100K. When enabled, the result contains a sample of up to 20,000 file paths. The total count of deleted files is logged. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Backport of apache/iceberg#14278 to openhouse-1.5.2. Adds stream-results option to DeleteOrphanFilesSparkAction to prevent driver OOM when removing large numbers of orphan files. Instead of collecting all orphan file paths into driver memory, files are streamed partition-by-partition using toLocalIterator() and deleted in batches of 100K. When enabled, the result contains a sample of up to 20,000 file paths. The total count of deleted files is logged. Co-authored-by: Dushyant Kumar <dukumar@linkedin.biz> Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
Closes #3703
Description
This PR adds streaming support to the
remove_orphan_filesSpark procedure to prevent driver OOM issues when dealing with tables that have many orphan files.This mimics the existing behavior for
expire_snapshotswith thestream_resultsparameter that was added in #4152Changes
stream-resultsoption toDeleteOrphanFilesSparkActiondeleteFiles()method to take a dataset and process streaming deletion usingtoLocalIterator()STREAM_RESULTS_PARAMtoRemoveOrphanFilesProcedureReal-World Testing Results
Tested on AWS EMR with a production table containing ~3PB of orphaned data:
Key Findings: