[Core] Add max files rewrite option for RewriteAction #12824
Conversation
6bbeb57 to
e8eb111
Compare
|
@manuzhang , resolved conflicts and update the branch and the PR is ready for review |
|
Issue : #12832 |
5abd324 to
f53b205
Compare
Could you elaborate on the use case that this is trying to solve? Is this to limit the resources a single job needed to reduce failures? |
sririshindra
left a comment
There was a problem hiding this comment.
If the concern of the PR is to avoid the rewrite data files procedure to take a long time,
The MAX_FILES_TO_REWRITE option doesn't necessarily need to be applied across multiple partitions. Users can enable partial progress and maybe MAX_FILES_TO_REWRITE option be should be applied to per partition rather than the whole table.
Also, maybe the size of the files should be considered instead of the number of files as a criteria for cutoff. That way the criteria would be to compress as many small files to big files as possible without overwhelming the compute.
For instance, let's say the value of MAX_FILES_TO_REWRITE is set to 1000. But all 1000 files are really small say (100 KB), and rewriting all 1000 (as allowed by MAX_FILES_TO_REWRITE) files (say within one partition. assume there are 10000 small files that needs to be compressed overall) selected will still only result in a larger file of size 100MB.
But Iceberg recommends (I think it's the default) the target-file-size to be 512 MB. So, basically we are missing an opportunity to compress an additional 412 MB worth of small files into a larger file.
If I remember correctly, the rewrite_data_files procedure will return the number of data files it has rewritten as part of the output. Maybe we should also return the number of data files that are yet to be rewritten (but not rewritten due to MAX_FILES_TO_REWRITE option) as part of the output as well, so that the user can make an informed decision as to how to plan their next rewrite operation.
| Stream<RewriteFileGroup> toGroupStream( | ||
| RewriteExecutionContext ctx, Map<StructLike, List<List<FileScanTask>>> groupsByPartition) { | ||
| return groupsByPartition.entrySet().stream() | ||
| if (maxFilesToRewrite == null) { |
There was a problem hiding this comment.
Actually can we instead set this value to LONG.MAX_FILES_TO_REWRITE by default, so that you don't have to have two code blocks to do the same thing. If the user doesn't specify this option, then MAX_FILES_TO_REWRITE is set to the default. So, along as the number of files being processed doesn't't exceed LONG.MAX_FILES_TO_REWRITE your implementation for the case when MAX_FILES_TO_REWRITE is set to some value will suffice.
If you instead decide to make the criteria something like MAX_FILES_SIZE_TO_REWRITE then the same trick works there as well.
There was a problem hiding this comment.
Good point ! My motivation here is to keep the option as simple and straightforward as possible. Having an upper bound ( I am assuming you meant setting default value of param MAX_FILES_TO_REWRITE as LONG.MAX_VALUE) which would add a side effect to this functionality by limiting the number of file to 2^^63-1 . However unlikely that is, I dont believe optional parameters should interfere in the default behavior and should be isolated for the sake of consistency and clarity.
|
@yogevyuval , The goal here is to provide user an option to limit the number of files to be rewritten (either through compaction , data rewrite etc) . In a use case (like mine) where there are 1 billion plus files in a lake house, the user might want to iteratively run compaction to reduce the file count to an acceptable level rather than going all in at the very first time. This option should help improve rewrite spark jobs and the users can tune this param to optimize scale and reliability |
|
@anuragmantri could you take a look whenever you get a chance please? |
|
@manuzhang , Could you please review this since I believe have addressed all the above issues / questions? |
|
Also @RussellSpitzer - If you have a minute. |
manuzhang
left a comment
There was a problem hiding this comment.
@coderfender You might want to update on the dev list or slack for wider audience.
|
@manuzhang , Already pinged in the dev channel to get some feedback : https://apache-iceberg.slack.com/archives/C03LG1D563F/p1744871503652659 . Let me bump the message again . |
d3cce44 to
1c6ed65
Compare
650d730 to
984c8bc
Compare
|
Rebased the branch with main |
|
@pvary , please take look whenever you get a chance and I would love to make any other changes your recommend |
pvary
left a comment
There was a problem hiding this comment.
+1 from my side
One small change and mostly a question, maybe a method name change or comment
|
Merged to main. @coderfender: Could you please create the backport PRs for Spark and Flink? Thanks, Peter |
|
Thank you @pvary @RussellSpitzer @anuragmantri . I will start working on the documentation changes and raise a PR soon |
|
Remember to "forward port" too now that we have a 4.0 Module :) |
|
Sure , I will create another PR to support backport / forward port this functionality. @RussellSpitzer , @pvary |
|
I have no problem with doing all the other changes at once, I just don't like having them all in the original PR because it's harder to track changes |
|
Sure I will create a new PR just for the porting changes |
This is a new option when re-writing data files (Spark Actions) to provide user the ability to limit the number of files re-written to potentially reduce file OPS . This option is named as
max-files-to-rewritewhich takes a positive integer as an input, truncates the file tasks until the value is reached. In case the table has fewer files than the parameter value, all the files are processed for re-write option. A property check to ensure that no value less than 0 has also been put in place to ensure early failure.Implementation :
planmethod inBinPackRewriteFilePlannerhas been refactored to truncate the list of file scan tasks (and there by files to be processed)fileCountRunneris used to update counter as theStructLikeMap<List<List<FileScanTask>>> planis processed in parallelselectedFileGroupsis leveraged to hold the final file groups.Testing :
TestBinPackRewriteFilePlanner::testRewriteMaxFilesOptionis written to handle upper bound use case where the valuemax-files-to-rewrite> total number of files in the tableTestBinPackRewriteFilePlanner::testRewriteMaxFilesOptionInequalityis written to handle equality use case where the valuemax-files-to-rewrite> total number of files in the table and the resulting data files after rewrite are less thanmax-files-to-rewritetextMaxFilesRewriteToOnlyTruncateNeededPartitionsis written to ensure that only needed partitions truncated.testInvalidMaxFilesRewriteParamis written to ensure that all validations (along with error messages) are working as expected