[Core] Add max files rewrite option for RewriteAction #12824

Merged
pvary merged 26 commits into apache:main from coderfender:add_option_to_write_max_files_overwrite
May 16, 2025

Conversation

@coderfender

@coderfender coderfender commented Apr 17, 2025

Contributor

This PR adds a new option for rewriting data files (Spark Actions) that lets users limit the number of files rewritten, potentially reducing file operations. The option, max-files-to-rewrite, takes a positive integer and truncates the file tasks once that value is reached. If the table has fewer files than the parameter value, all files are processed for rewrite. A property check rejects non-positive values so that invalid input fails early.

Implementation :

  1. The plan method in BinPackRewriteFilePlanner has been refactored to truncate the list of file scan tasks (and thereby the files to be processed).
  2. An atomic integer called fileCountRunner (atomic to ensure consistency in streams) keeps a running count as the StructLikeMap<List<List<FileScanTask>>> plan is processed in parallel.
  3. If a partition's fileScanTask list is larger than the remaining budget (maxFilesToRewrite - fileCountRunner), the list is truncated so that files are added only until maxFilesToRewrite is reached.
  4. selectedFileGroups holds the final file groups.
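
The steps above can be sketched roughly as follows. This is a simplified, standalone illustration and not the code from this PR: the class and method names are hypothetical, plain strings stand in for FileScanTask and for partition keys, and the groups are walked sequentially rather than in parallel. Only the names fileCountRunner, selectedFileGroups and maxFilesToRewrite are borrowed from the description.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the planner logic; not the Iceberg implementation.
final class MaxFilesTruncationSketch {

  // Returns the file groups selected for rewrite, capped at maxFilesToRewrite
  // files in total. A null maxFilesToRewrite means the option was not set.
  static List<List<String>> selectGroups(
      Map<String, List<List<String>>> groupsByPartition, Integer maxFilesToRewrite) {
    if (maxFilesToRewrite != null && maxFilesToRewrite <= 0) {
      // Fail early on invalid input, as the description's property check does.
      throw new IllegalArgumentException(
          "max-files-to-rewrite must be a positive integer, got " + maxFilesToRewrite);
    }
    List<List<String>> selectedFileGroups = new ArrayList<>();
    AtomicInteger fileCountRunner = new AtomicInteger();

    for (List<List<String>> groups : groupsByPartition.values()) {
      for (List<String> group : groups) {
        if (maxFilesToRewrite == null) {
          selectedFileGroups.add(group); // no limit: keep every group whole
          continue;
        }
        int remaining = maxFilesToRewrite - fileCountRunner.get();
        if (remaining <= 0) {
          return selectedFileGroups; // budget exhausted
        }
        // Truncate the group to whatever budget is left.
        int take = Math.min(group.size(), remaining);
        selectedFileGroups.add(new ArrayList<>(group.subList(0, take)));
        fileCountRunner.addAndGet(take);
      }
    }
    return selectedFileGroups;
  }

  public static void main(String[] args) {
    Map<String, List<List<String>>> plan = new LinkedHashMap<>();
    plan.put("p=1", List.of(List.of("a", "b", "c")));
    plan.put("p=2", List.of(List.of("d", "e"), List.of("f")));
    System.out.println(selectGroups(plan, 4)); // prints [[a, b, c], [d]]
  }
}
```

With a limit of 4, the first partition's three files are taken whole, the next group is cut down to one file, and the last group is skipped.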

Testing :

  1. TestBinPackRewriteFilePlanner::testRewriteMaxFilesOption covers the upper-bound case, where max-files-to-rewrite > total number of files in the table.
  2. TestBinPackRewriteFilePlanner::testRewriteMaxFilesOptionInequality covers the case where max-files-to-rewrite > total number of files in the table and the number of data files after the rewrite is less than max-files-to-rewrite.
  3. testMaxFilesRewriteToOnlyTruncateNeededPartitions ensures that only the needed partitions are truncated.
  4. testInvalidMaxFilesRewriteParam ensures that all validations (along with their error messages) work as expected.

@coderfender coderfender force-pushed the add_option_to_write_max_files_overwrite branch from 6bbeb57 to e8eb111 Compare April 17, 2025 06:28
@coderfender coderfender marked this pull request as ready for review April 17, 2025 14:13
@coderfender

coderfender commented Apr 17, 2025

Contributor Author

@manuzhang, I resolved the conflicts and updated the branch; the PR is ready for review.

@coderfender

Contributor Author

Issue : #12832

@coderfender coderfender force-pushed the add_option_to_write_max_files_overwrite branch from 5abd324 to f53b205 Compare April 17, 2025 15:05
@yogevyuval

Contributor

This is a new option for rewriting data files (Spark Actions) that lets users limit the number of files rewritten, potentially reducing file operations. The option, max-files-to-rewrite, takes a positive integer and truncates the file tasks once that value is reached. If the table has fewer files than the parameter value, all files are processed for rewrite. A property check rejects values less than 1 so that invalid input fails early.

Implementation :

  1. The toGroupStream method in RewriteDataFilesSparkAction has been refactored to truncate the list of file scan tasks (and thereby the files to be processed).
  2. An atomic integer called fileCountRunner (atomic to ensure consistency in parallel streams) keeps a running count as groupsByPartition is processed in parallel.
  3. If a partition's fileScanTask list is larger than the remaining budget (maxFilesToRewrite - fileCountRunner), the list is truncated so that files are added only until maxFilesToRewrite is reached.
  4. selectedFileGroups holds the final file groups.

Testing :

  1. TestRewriteDataFilesAction::testRewriteMaxFilesOption is written to handle upper bound use case where the value max-files-to-rewrite > total number of files in the table
  2. TestRewriteDataFilesAction::testRewriteMaxFilesOptionEquality is written to handle equality use case where the value max-files-to-rewrite < total number of files in the table and the resulting data files after rewrite are equal to max-files-to-rewrite

Could you elaborate on the use case this is trying to solve? Is it to limit the resources a single job needs, in order to reduce failures?

@sririshindra sririshindra left a comment

Contributor

If the concern of the PR is to keep the rewrite data files procedure from taking a long time, the MAX_FILES_TO_REWRITE option doesn't necessarily need to be applied across multiple partitions. Users can enable partial progress, and maybe the MAX_FILES_TO_REWRITE option should be applied per partition rather than to the whole table.

Also, maybe the size of the files, rather than the number of files, should be the criterion for the cutoff. That way the goal would be to compact as many small files into big files as possible without overwhelming the compute.

For instance, let's say MAX_FILES_TO_REWRITE is set to 1000 and there are 10000 small files overall that need to be compacted. If the 1000 selected files are all really small (say 100 KB each) and sit within one partition, rewriting all of them, as allowed by MAX_FILES_TO_REWRITE, will still only result in one larger file of about 100 MB.

But Iceberg recommends a target-file-size of 512 MB (I think it's the default). So we are basically missing an opportunity to compact an additional 412 MB worth of small files into that larger file.
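
The arithmetic in this example can be checked with a small sketch. It uses the comment's round decimal units (1 MB = 1000 KB), and every name in it is hypothetical; it only illustrates why a count-based cutoff can fall well short of one target-sized file.

```java
// Back-of-the-envelope check of the example above. Hypothetical names throughout.
final class CountCutoffShortfallSketch {

  // Total size, in KB, of the files admitted by a count-based cutoff.
  static long rewrittenKb(int maxFilesToRewrite, long fileSizeKb) {
    return (long) maxFilesToRewrite * fileSizeKb;
  }

  // How far, in KB, the rewritten data falls short of one target-sized file.
  static long shortfallKb(long rewrittenKb, long targetFileSizeKb) {
    return Math.max(0L, targetFileSizeKb - rewrittenKb);
  }

  public static void main(String[] args) {
    long rewritten = rewrittenKb(1000, 100); // 1000 files of 100 KB each
    long shortfall = shortfallKb(rewritten, 512_000); // 512 MB target
    // prints "100 MB rewritten, 412 MB short of target"
    System.out.println(
        rewritten / 1000 + " MB rewritten, " + shortfall / 1000 + " MB short of target");
  }
}
```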

If I remember correctly, the rewrite_data_files procedure returns the number of data files it has rewritten as part of its output. Maybe it should also return the number of data files that still need rewriting but were skipped because of the MAX_FILES_TO_REWRITE option, so that the user can make an informed decision about how to plan the next rewrite operation.

Stream<RewriteFileGroup> toGroupStream(
    RewriteExecutionContext ctx, Map<StructLike, List<List<FileScanTask>>> groupsByPartition) {
  return groupsByPartition.entrySet().stream()
  if (maxFilesToRewrite == null) {

Contributor

Actually, can we instead set this value to LONG.MAX_FILES_TO_REWRITE by default, so that you don't need two code blocks that do the same thing? If the user doesn't specify this option, MAX_FILES_TO_REWRITE is set to the default. So, as long as the number of files being processed doesn't exceed LONG.MAX_FILES_TO_REWRITE, your implementation for the case when MAX_FILES_TO_REWRITE is set to some value will suffice.

If you instead decide to make the criteria something like MAX_FILES_SIZE_TO_REWRITE then the same trick works there as well.
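
One way this suggestion could look is sketched below. It is an illustration only, not code from the PR: the class, the method names and the NO_LIMIT constant are hypothetical, and the sketch assumes the default is Long.MAX_VALUE. The point is that a sentinel default lets a single code path handle both the "option set" and "option unset" cases.

```java
import java.util.Map;

// Hypothetical illustration of a sentinel default; not the merged implementation.
final class SentinelDefaultSketch {

  // Assumed default when the option is absent: effectively "no limit".
  static final long NO_LIMIT = Long.MAX_VALUE;

  // Reads the option from a plain string map, falling back to the sentinel.
  static long resolveLimit(Map<String, String> options) {
    String raw = options.get("max-files-to-rewrite");
    return raw == null ? NO_LIMIT : Long.parseLong(raw);
  }

  // Number of files to take from a group, given how many were already selected.
  // The same arithmetic works whether or not the user set a limit.
  static int filesToTake(int groupSize, long alreadySelected, long limit) {
    long remaining = limit - alreadySelected;
    return (int) Math.max(0L, Math.min(groupSize, remaining));
  }

  public static void main(String[] args) {
    long unset = resolveLimit(Map.of());
    long capped = resolveLimit(Map.of("max-files-to-rewrite", "5"));
    System.out.println(filesToTake(3, 4, unset)); // whole group: no limit set
    System.out.println(filesToTake(3, 4, capped)); // one file left in the budget
  }
}
```

The trade-off is that the sentinel silently becomes a real upper bound, however large, on an otherwise unlimited operation.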

@coderfender coderfender Apr 17, 2025

Contributor Author

Good point! My motivation here is to keep the option as simple and straightforward as possible. Having an upper bound (I am assuming you meant setting the default value of the MAX_FILES_TO_REWRITE param to LONG.MAX_VALUE) would add a side effect to this functionality by limiting the number of files to 2^63-1. However unlikely that is, I don't believe optional parameters should interfere with the default behavior; they should be isolated for the sake of consistency and clarity.

@coderfender

Contributor Author

@yogevyuval, the goal here is to give users an option to limit the number of files to be rewritten (through compaction, data rewrite, etc.). In a use case like mine, where there are over 1 billion files in a lakehouse, the user might want to run compaction iteratively to reduce the file count to an acceptable level rather than going all in on the very first run. This option should help improve rewrite Spark jobs, and users can tune this param to optimize scale and reliability.

@coderfender

Contributor Author

@anuragmantri could you take a look whenever you get a chance please?

@coderfender

Contributor Author

@manuzhang, could you please review this? I believe I have addressed all the above issues and questions.

@anuragmantri

Collaborator

Also @RussellSpitzer - If you have a minute.

@manuzhang manuzhang left a comment

Member

@coderfender You might want to post an update on the dev list or Slack to reach a wider audience.

@coderfender

Contributor Author

@manuzhang, I already pinged the dev channel to get some feedback: https://apache-iceberg.slack.com/archives/C03LG1D563F/p1744871503652659 and will bump the message again.
Thank you for reviewing, and please let me know if you see any issues with the code that I might have missed.

Comment thread api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java Outdated
@coderfender coderfender force-pushed the add_option_to_write_max_files_overwrite branch from d3cce44 to 1c6ed65 Compare April 24, 2025 19:19
@coderfender coderfender force-pushed the add_option_to_write_max_files_overwrite branch from 650d730 to 984c8bc Compare May 15, 2025 02:21
@coderfender

Contributor Author

Rebased the branch with main

@coderfender

coderfender commented May 15, 2025

Contributor Author

@pvary, please take a look whenever you get a chance. I would love to make any other changes you recommend.
Thank you

Comment thread core/src/test/java/org/apache/iceberg/actions/TestBinPackRewriteFilePlanner.java Outdated

@pvary pvary left a comment

Contributor

+1 from my side
One small change and mostly a question, maybe a method name change or comment

@pvary pvary merged commit c247896 into apache:main May 16, 2025
42 checks passed
@pvary

pvary commented May 16, 2025

Contributor

Merged to main.
Thanks for all the work @coderfender on the PR, and @RussellSpitzer for the review!

@coderfender: Could you please create the backport PRs for Spark and Flink?

Thanks, Peter

@coderfender

Contributor Author

Thank you @pvary @RussellSpitzer @anuragmantri. I will start working on the documentation changes and raise a PR soon.

@RussellSpitzer

Member

Remember to "forward port" too now that we have a 4.0 Module :)

@coderfender

Contributor Author

Sure, I will create another PR to backport / forward port this functionality, @RussellSpitzer, @pvary.
Spark v3.4 and v3.5 are already done, so I will only have to make changes to support v4.0. Re: Flink, I will have to make changes to support v1.19 and v2.0, and I am hoping all these changes can go in one PR?

@RussellSpitzer

Member

I have no problem with doing all the other changes at once, I just don't like having them all in the original PR because it's harder to track changes

@coderfender

Contributor Author

Sure I will create a new PR just for the porting changes

8 participants