Skip to content

Spark: enable stream-results option for remove orphan files#14278

Merged
pvary merged 20 commits into
apache:mainfrom
arifazmidd:spark-stream-results-for-remove-orphan-files
Nov 6, 2025
Merged

Spark: enable stream-results option for remove orphan files#14278
pvary merged 20 commits into
apache:mainfrom
arifazmidd:spark-stream-results-for-remove-orphan-files

Conversation

@arifazmidd

@arifazmidd arifazmidd commented Oct 8, 2025

Copy link
Copy Markdown
Contributor

Closes #3703

Description

This PR adds streaming support to the remove_orphan_files Spark procedure to prevent driver OOM issues when dealing with tables that have many orphan files.

This mimics the existing behavior for expire_snapshots with the stream_results parameter that was added in #4152

Changes

  • Added stream-results option to DeleteOrphanFilesSparkAction
  • Modified deleteFiles() method to take a dataset and process streaming deletion using toLocalIterator()
  • Returns sample of up to 20,000 file paths
  • Added STREAM_RESULTS_PARAM to RemoveOrphanFilesProcedure

Real-World Testing Results

Tested on AWS EMR with a production table containing ~3PB of orphaned data:

Run Description Learnings
Dry Run using Original Implementation Driver crashes due to OOM.
Dry Run using Stream Results Completed successfully. Returned a sample of 20k file paths that will be deleted and the total count of files that will be deleted as ~45M.
Full Run using Original Implementation Driver crashes due to OOM.
Full Run using Stream Results Terminated after 4 hours because that was the timeout set; however, it successfully iterated through and deleted ~31M files.
Full Run using Original Implementation (after streaming run) Completed successfully. Deleted the remaining ~14M orphaned files.
Full Run using Stream Results on sps-eta (validation run) Completed successfully. Deleted ~1200 new orphan files.

Key Findings:

  • Original implementation consistently crashes with OOM on large-scale orphan file cleanup (~45M files)
  • Streaming implementation successfully handles massive workloads without memory issues
  • Successfully deleted ~31M files in a single streaming run (terminated by timeout, not failure)
  • Streaming approach enables incremental cleanup of large orphan file sets

@github-actions github-actions Bot added the spark label Oct 8, 2025
@arifazmidd

Copy link
Copy Markdown
Contributor Author

Hi @RussellSpitzer @pvary @liziyan-lzy @huaxingao, if any of you have time to help review this that would be greatly appreciated!

@pvary

pvary commented Oct 14, 2025

Copy link
Copy Markdown
Contributor

@arifazmidd: Could you fix the test please?

@arifazmidd

arifazmidd commented Oct 14, 2025

Copy link
Copy Markdown
Contributor Author

Thanks for running the CI @pvary; I have fixed the formatting issues.

}
}

orphanFileDS.unpersist();

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am unfamiliar with Spark, so this is just a question.

Do we need a try/finally to unpersist the DS?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have added the try/finally as you mentioned but it's not necessary as spark automatically monitors this and drops old data in a LRU fashion. We are just explicitly trying to do this right when we no longer need the data.

Dataset<String> orphanFileDS, SetAccumulator<Pair<String, String>> conflicts) {
// Cache the dataset and force computation to populate the conflicts accumulator
// This allows us to validate conflicts before starting any deletions
orphanFileDS = orphanFileDS.cache();

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not familiar with Spark, so this is just a question:
What is the cost of this? Do we lose what we gain with not reading every file at once?

@arifazmidd arifazmidd Oct 21, 2025

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The data is being cached into the executors' memory so it's distributed and will spill to disk if needed. We also unpersist this data when it is no longer needed. This prevents us from having to double compute this dataset; once during the prefix validation on count() and then again on toLocalIterator or collectAsList(). Instead the dataset can be streamed or collected from cache.

}

@TestTemplate
public void testStreamResultsBackwardsCompatibility() throws IOException, InterruptedException {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no actual difference between the tests with streaming on and off.
The only noticeable difference could be the sample/full result in the list.
Could we test that?
Shall we create a method, or change the parametrization of this single test to avoid code duplication?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes there is no real difference between what the result is with streaming on or off other than the output containing a sample vs the full result.

I have removed testStreamResultsBackwardsCompatibility() as it is already covered in testDryRun. I also removed testStreamResultsWithDryRun() and added a check in testDryRun with streaming enabled.

Now we only have one new test method testStreamResultsDeletion which ensure files are correctly deleted with streaming enabled. This doesn't check that the output should only be 20k rows though. I'm not sure if that's something we want to test since it would require creating 20k+ files. The correctness of the streaming behavior is already validated by these tests.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The correctness of the streaming behavior is already validated by these tests.

If I accidentally remove the whole streaming feature, but keep the config, the test will not fail. For me, this is concerning

@arifazmidd arifazmidd Oct 23, 2025

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm that's a good point. To address this I attempted to follow the ExpireSnapshotsAction.testUseLocalIterator() pattern by comparing Spark job counts, but it turns out both modes use the same number of jobs in our case because:

  1. Both modes cache and count the dataset for validation
  2. The difference (collectAsList vs toLocalIterator) happens on the cached data, which doesn't create different job patterns

Alternative approaches I can think of right now to verify streaming behavior:

  1. Test with enough files to hit the 20k sampling limit. I guess someone could still remove most of the streaming feature but just keep this sampling portion at the end and the test would still pass though.
  2. Test that verifies function calls (toLocalIterator vs collectAsList).

Do you think either of these suffice?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe make the sample list file sizes configurable for tests?
That's still lame, because the feature is streaming and not the collection 😄
On a second thought. Maybe it's ok. At least we can know that the result is not stored in memory.

Could we extend or inject or call directly the RemoveOrphanFilesProcedure? This is very Sparky question, and I don't have a good answer.

@arifazmidd arifazmidd Oct 24, 2025

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes but the multiple batch deletion is not unique to streaming mode. We use an iterator and delete in batches for both

if (streamResults()) {
      return deleteFiles(orphanFileDS.toLocalIterator(), orphanFileDS);
} else {
      return deleteFiles(orphanFileDS.collectAsList().iterator(), orphanFileDS);
}

and then in deleteFiles:

Iterator<List<String>> fileGroups = Iterators.partition(orphanFiles, DELETE_GROUP_SIZE);

while (fileGroups.hasNext()) {
  ...
  <call to deleteBulk or deleteNonBulk>
  ...
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do I undestand the Spark behavior correctly, if I think that the Iterators.partition(orphanFiles, DELETE_GROUP_SIZE) will create a single group for the non-streaming path, and multiple groups for the streaming path?

If I am mistaken with the above assumption, then I don't understand why we have 2 parameters in the deleteFiles method. We can just have a single DS parameter, or we could even merge the deleteFile to the doExecute

@arifazmidd arifazmidd Oct 28, 2025

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe the group number will be equivalent for both streaming and non-streaming.

We had two parameters because ExpireSnapshotsAction's deleteFiles method takes in an iterator so to keep consistent we refactored to pass the iterator and then we are passing the dataset for un-persisting. But you are correct, this can be improved. There are two options I see.

  1. Pass only the dataset and determine the iterator to use within the deleteFiles method.
  2. Pass only the iterator and don't unpersist the dataset from cache manually, allow Spark to handle it on its own.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will be out for a few days. Will come back next week. Sorry for the delay

@arifazmidd arifazmidd Oct 29, 2025

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nws, thanks for the review so far. I've updated the test to have a configurable output sample size parameter as we previously discussed and simplified deleteFiles to only take the dataset as a parameter (option 1 from above).

@pvary pvary left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Generally looks good to me, but I would like to have someone with more Spark knowledge to review it too.
CC: @szehon-ho, @huaxingao, @RussellSpitzer

@pvary pvary requested review from huaxingao and szehon-ho November 3, 2025 13:23

// Cache and force computation to populate conflicts accumulator
orphanFileDS = orphanFileDS.cache();
orphanFileDS.count();

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to have an error here, and we don't unpersist the DS?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch, yes it is possible.

We can either go with the alternative option I had suggested above:

Dataset<String> orphanFileDS = null;
try {
  orphanFileDS = findOrphanFiles(...);
  return deleteFiles(orphanFileDS);
} finally {
  if (orphanFileDS != null) { 
    orphanFileDS.unpersist();
  }
}

or wrap the code block in findOrphanFiles with try-catch.

orphanFileDS = orphanFileDS.cache();

try {
  orphanFileDS.count();

  if (prefixMismatchMode == PrefixMismatchMode.ERROR && !conflicts.value().isEmpty()) {
    throw new ValidationException(...); 
  }
  
  return orphanFileDS; 
} catch (Exception e) { // not sure if we want to be catching broad exception and re-throwing like this
  orphanFileDS.unpersist();
  throw e;
}

I think the first option is better.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In case of an error we might already called the orphanFileDS.cache(), but not return a value, just throw an exception. So in this case the orphanFileDS will be null and we don't unpersist the cache with this code:

Dataset<String> orphanFileDS = null;
try {
  orphanFileDS = findOrphanFiles(...);
  return deleteFiles(orphanFileDS);
} finally {
  if (orphanFileDS != null) { 
    orphanFileDS.unpersist();
  }
}

So I think we need to do the wrapping in the findOrphanFiles.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated

*
* <p>Streaming mode can be enabled via the {@value #STREAM_RESULTS} option to avoid loading all
* orphan file paths into driver memory. When enabled, the result will contain only a sample of file
* paths (up to {@value #MAX_ORPHAN_FILE_PATHS_TO_RETURN_WHEN_STREAMING}). The total count of

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this be @value #MAX_ORPHAN_FILE_SAMPLE_SIZE_DEFAULT

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah yes sorry the variable name was updated but didn't update the documentation.

PREFIX_MISMATCH_MODE_PARAM,
PREFIX_LISTING_PARAM
PREFIX_LISTING_PARAM,
STREAM_RESULTS_PARAM

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shall we update docs/docs/spark-procedures.md for remove_orphan_files to add the new parameter?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated docs -- same content as expire_snapshots documentation with an added note about the output size max

@huaxingao

Copy link
Copy Markdown
Contributor

@arifazmidd Thanks for the PR! LGTM overall.

@github-actions github-actions Bot added the docs label Nov 5, 2025
@pvary pvary merged commit 99ccfc4 into apache:main Nov 6, 2025
29 checks passed
@pvary

pvary commented Nov 6, 2025

Copy link
Copy Markdown
Contributor

Merged to main.
Thanks @arifazmidd for the PR and @huaxingao for the review!

@arifazmidd: Please port the changes to Spark 3.4, and 4.0. This command could help:

g diff HEAD^ spark/v3.5 |sed "s/v3.5/v4.0/g">/tmp/patch;g apply -3 -p1 /tmp/patch

Please tell us on the new PR if the backport was clean (no manual changes are required), or you needed to do changes manually. In this case highlight the extra changes, so we have easier time to review.

Thanks,
Peter

@arifazmidd

Copy link
Copy Markdown
Contributor Author

Thanks for the reviews @pvary and @huaxingao!

cccs-nik pushed a commit to CybercentreCanada/iceberg that referenced this pull request Nov 18, 2025
thomaschow pushed a commit to thomaschow/iceberg that referenced this pull request Jan 19, 2026
dushyantk1509 pushed a commit to dushyantk1509/iceberg that referenced this pull request Mar 11, 2026
Backport of apache/iceberg#14278 to openhouse-1.5.2. Adds stream-results
option to DeleteOrphanFilesSparkAction to prevent driver OOM when
removing large numbers of orphan files. Instead of collecting all orphan
file paths into driver memory, files are streamed partition-by-partition
using toLocalIterator() and deleted in batches of 100K.

When enabled, the result contains a sample of up to 20,000 file paths.
The total count of deleted files is logged.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
dushyantk1509 pushed a commit to dushyantk1509/iceberg that referenced this pull request Mar 17, 2026
Backport of apache/iceberg#14278 to openhouse-1.5.2. Adds stream-results
option to DeleteOrphanFilesSparkAction to prevent driver OOM when
removing large numbers of orphan files. Instead of collecting all orphan
file paths into driver memory, files are streamed partition-by-partition
using toLocalIterator() and deleted in batches of 100K.

When enabled, the result contains a sample of up to 20,000 file paths.
The total count of deleted files is logged.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
talatuyarer pushed a commit to talatuyarer/iceberg that referenced this pull request Apr 1, 2026
maluchari pushed a commit to linkedin/iceberg that referenced this pull request Apr 7, 2026
Backport of apache/iceberg#14278 to openhouse-1.5.2. Adds stream-results
option to DeleteOrphanFilesSparkAction to prevent driver OOM when
removing large numbers of orphan files. Instead of collecting all orphan
file paths into driver memory, files are streamed partition-by-partition
using toLocalIterator() and deleted in batches of 100K.

When enabled, the result contains a sample of up to 20,000 file paths.
The total count of deleted files is logged.

Co-authored-by: Dushyant Kumar <dukumar@linkedin.biz>
Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

DeleteOrphanFiles or ExpireSnapshots outofmemory

4 participants