Core: Data loss after compaction #2195#2196
Conversation
|
@aokolnychyi, can you take a look at this? |
|
I am sorry, this is a known bug,I had found the bug when I did the Rewrite Action,and I had open a PR #1762 , just not merged ,the purpose of this rewrite Action is to compaction small files, so I think it is more reasonable to exclude data files which size > the target size during table scan. |
|
@zhangjun0x01, next time please highlight that there is a correctness problem. I didn't know that #1762 fixed a correctness problem or we would have prioritized it and made sure it was in the 0.11.0 release. Thanks for pointing us to that issue, we will take a look at both alternative solutions. |
@zhangjun0x01 |
|
@Stephen-Robin I think there is no need to split the large file, because if the file size exceeds the target size, it will be automatically split into multiple |
I'm very sorry, this problem should be resolved before 0.11.0 release. When I submitted this pr, I requested openinx to help me to review, maybe he be busy, I forgot this pr when 0.11.0 was released, I must pay attention to this problem next time. |
|
You're welcome to subscribe dev@ mailing list and participate discussion, RC verification, etc. Github mention is easy to slip through, so once you find some urgent things like regression or correctness, dev@ mailing list (or Slack channel) is appropriate place to share. |
@zhangjun0x01
|
yes,I thought wrong
it sounds make sense |
33cee26 to
709b5b2
Compare
709b5b2 to
e04d456
Compare
|
I think this PR is pretty much ready to go other than a few nits |
|
My quick notes on this issue: |
|
lgtm, @aokolnychyi do you have any comments? |
|
|
||
| Actions actions = Actions.forTable(table); | ||
|
|
||
| long targetSizeInBytes = file.length() - 10; |
There was a problem hiding this comment.
It would be easier to understand if this were set up where file is used to create dataFile. Or, this could use the length of dataFile instead so we don't need to make sure dataFile and file have the same length.
There was a problem hiding this comment.
I just don’t understand this very well, I need to set splitTargetSize to a value smaller than the largest file
|
|
||
| CloseableIterable<FileScanTask> tasks = table.newScan().planFiles(); | ||
| List<DataFile> dataFiles = Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::file)); | ||
| Assert.assertEquals("Should have 3 scan tasks before rewrite", 3, dataFiles.size()); |
There was a problem hiding this comment.
Minor: I think we should refer to these in the context as "files" not "tasks" because tasks are usually what we get after splitting and combining.
There was a problem hiding this comment.
Ah yeah I think I was mistaken here, I think we may be generating multiple files via the writeRecords method which could potentially parallelize the write resulting in multiple files. (This is for the 2 record file)
Instead we would need to do a repartition(1) to insure a single file is written.
In my internal test I just did the repartition to make sure we had a single file writes
| fileAppender.add(record); | ||
| excepted.add(record); | ||
| } | ||
| } |
There was a problem hiding this comment.
Why does this create an appender instead of using writeRecords?
I think this could easily find the largest data file from planFiles and base the length on that instead.
There was a problem hiding this comment.
As I noted above, you may need a modified "writeRecords" which uses a single partition if you want to generate 1 file.
There was a problem hiding this comment.
Okay, thank you for your comments, I will make changes immediately
| Assert.assertEquals("Action should add 2 data files", 2, result.addedDataFiles().size()); | ||
|
|
||
| long postRewriteNumRecords = spark.read().format("iceberg").load(tableLocation).count(); | ||
| List<Object[]> rewrittenRecords = sql("SELECT * from rows sort by c2"); |
There was a problem hiding this comment.
I'm not very comfortable with using the same view to load original and rewritten records. Can you create a separate view for the rewritten data? That way we avoid any weird caching behavior.
|
|
||
| CloseableIterable<FileScanTask> tasks = table.newScan().planFiles(); | ||
| List<DataFile> dataFiles = Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::file)); | ||
| Assert.assertEquals("Should have 3 scan tasks before rewrite", 3, dataFiles.size()); |
There was a problem hiding this comment.
This appends one file directly (the big one, dataFile) and one using writeRecords. Why are there 3 files in the table at this point?
There was a problem hiding this comment.
yes. 1 maxSizeFile + 2 row data file
| .splitOpenFileCost(1) | ||
| .execute(); | ||
|
|
||
| Assert.assertEquals("Action should delete 4 data files", 4, result.deletedDataFiles().size()); |
There was a problem hiding this comment.
What are the 4 data files that this action deletes? Is one of the 3 from above duplicated?
There was a problem hiding this comment.
yeah, I'm wondering if I need to perform deduplication
There was a problem hiding this comment.
I wonder if this is an error in RewriteDatafile Actions, where it should use a set to collect the deleted files. Like we are getting 1 record of "delete" for each split in the large file.
|
|
||
| CloseableIterable<FileScanTask> tasks = table.newScan().planFiles(); | ||
| List<DataFile> dataFiles = Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::file)); | ||
| DataFile maxSizeFile = Collections.max(dataFiles, Comparator.comparingLong(DataFile::fileSizeInBytes)); |
|
Thank you for fixing this, @Stephen-Robin! I've tagged this for inclusion in the 0.11.1 patch release. We should probably do that soon since we have a correctness bug. |
For details, please refer to #2195