Skip to content

Core: Data loss after compaction #2195#2196

Merged
rdblue merged 6 commits into
apache:masterfrom
Stephen-Robin:rewriteDataFileFix
Feb 4, 2021
Merged

Core: Data loss after compaction #2195#2196
rdblue merged 6 commits into
apache:masterfrom
Stephen-Robin:rewriteDataFileFix

Conversation

@Stephen-Robin

Copy link
Copy Markdown
Contributor

For details, please refer to #2195

@github-actions github-actions Bot added the core label Feb 2, 2021
@rdblue

rdblue commented Feb 2, 2021

Copy link
Copy Markdown
Contributor

@aokolnychyi, can you take a look at this?

@rdblue rdblue requested a review from aokolnychyi February 2, 2021 23:54
@zhangjun0x01

Copy link
Copy Markdown
Contributor

I am sorry, this is a known bug,I had found the bug when I did the Rewrite Action,and I had open a PR #1762 , just not merged ,the purpose of this rewrite Action is to compaction small files, so I think it is more reasonable to exclude data files which size > the target size during table scan.

@rdblue

rdblue commented Feb 3, 2021

Copy link
Copy Markdown
Contributor

@zhangjun0x01, next time please highlight that there is a correctness problem. I didn't know that #1762 fixed a correctness problem or we would have prioritized it and made sure it was in the 0.11.0 release. Thanks for pointing us to that issue, we will take a look at both alternative solutions.

@Stephen-Robin

Stephen-Robin commented Feb 3, 2021

Copy link
Copy Markdown
Contributor Author

I am sorry, this is a known bug,I had found the bug when I did the Rewrite Action,and I had open a PR #1762 , just not merged ,the purpose of this rewrite Action is to compaction small files, so I think it is more reasonable to exclude data files which size > the target size during table scan.

@zhangjun0x01
Hi zhangjun, I found that large files exceeding the threshold were filtered out in PR.#1762 , Perhapsd the rewrite data file operation should not only includes small file merging, but also large files should be segmented and rewritten. This PR has already rewritten large files after segmentation. What do you think about this, and thanks rdblue for pushing this issue.

@zhangjun0x01

Copy link
Copy Markdown
Contributor

@Stephen-Robin I think there is no need to split the large file, because if the file size exceeds the target size, it will be automatically split into multiple CombinedScanTasks when reading, and read concurrently, instead of having a task to read the large file. Splitting a large file into multiple small files will make the Rewrite Action consume more resources, and too many small files are not friendly to hdfs. what do you think about ?

@zhangjun0x01

Copy link
Copy Markdown
Contributor

@zhangjun0x01, next time please highlight that there is a correctness problem. I didn't know that #1762 fixed a correctness problem or we would have prioritized it and made sure it was in the 0.11.0 release. Thanks for pointing us to that issue, we will take a look at both alternative solutions.

I'm very sorry, this problem should be resolved before 0.11.0 release. When I submitted this pr, I requested openinx to help me to review, maybe he be busy, I forgot this pr when 0.11.0 was released, I must pay attention to this problem next time.

@HeartSaVioR

Copy link
Copy Markdown
Contributor

You're welcome to subscribe dev@ mailing list and participate discussion, RC verification, etc. Github mention is easy to slip through, so once you find some urgent things like regression or correctness, dev@ mailing list (or Slack channel) is appropriate place to share.

@Stephen-Robin

Stephen-Robin commented Feb 3, 2021

Copy link
Copy Markdown
Contributor Author

@Stephen-Robin Splitting a large file into multiple small files will make the Rewrite Action consume more resources, and too many small files are not friendly to hdfs. what do you think about ?

@zhangjun0x01
I think it makes sense to split large files into smaller files.

  1. the user’s goal is to get the expected file size.
  2. When the split target size is set to such as 256M, rewriting large files after splitting will not significantly increase the load of hdfs.
  3. After splitting into small files, it is convenient for filtering by file metadata in manifest.
  4. We can also add an option to let users decide whether to split large files

@zhangjun0x01

Copy link
Copy Markdown
Contributor
  1. When the split target size is set to such as 256M, rewriting large files after splitting will not significantly increase the load of hdfs.

yes,I thought wrong

  1. After splitting into small files, it is convenient for filtering by file metadata in manifest.

it sounds make sense

@RussellSpitzer

Copy link
Copy Markdown
Member

I think this PR is pretty much ready to go other than a few nits

@RussellSpitzer

RussellSpitzer commented Feb 3, 2021

Copy link
Copy Markdown
Member

My quick notes on this issue:

Previously when computing the rewrite tasks for RewriteDataFiles the code
would ignore scan tasks which referred to a single file. This is an issue because
large files could be potentitally split into multiple read tasks. If one
slice of a large file was combined with a slice from another file, that
secition would be rewritten with the other file, but the other slices would be ignored.

For example given 2 files
File A - 100 Bytes
File B - 10 Bytes

If the target split size was 60 bytes we would end up with 3 tasks
A : 1 - 60
A : 61 - 100
B : 0 - 10

Which would be combined into

(A : 1 - 60)
(A : 61 -100, B : 0 -10)

The first task would be discarded since it only refered to one file. The
second task would be rewritten, which would end with deleting file A and B.

I believe the original intent was to ignore single file scan tasks as it was assumed these would
be unchanged files. But if a single file scan task only contains a partial scan of a file it must 
be rewritten since it represents a new smaller file that needs to be rewritten.

Normally this doesn't cause data loss since an ignored file won't be deleted, but if a split is
combined with another file, then that triggers the delete of the large file, even though several
splits of the large file will not have been written into new files.

@RussellSpitzer

Copy link
Copy Markdown
Member

lgtm, @aokolnychyi do you have any comments?


Actions actions = Actions.forTable(table);

long targetSizeInBytes = file.length() - 10;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be easier to understand if this were set up where file is used to create dataFile. Or, this could use the length of dataFile instead so we don't need to make sure dataFile and file have the same length.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just don’t understand this very well, I need to set splitTargetSize to a value smaller than the largest file


CloseableIterable<FileScanTask> tasks = table.newScan().planFiles();
List<DataFile> dataFiles = Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::file));
Assert.assertEquals("Should have 3 scan tasks before rewrite", 3, dataFiles.size());

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor: I think we should refer to these in the context as "files" not "tasks" because tasks are usually what we get after splitting and combining.

@RussellSpitzer RussellSpitzer Feb 3, 2021

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah yeah I think I was mistaken here, I think we may be generating multiple files via the writeRecords method which could potentially parallelize the write resulting in multiple files. (This is for the 2 record file)

Instead we would need to do a repartition(1) to insure a single file is written.

In my internal test I just did the repartition to make sure we had a single file writes

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

fileAppender.add(record);
excepted.add(record);
}
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why does this create an appender instead of using writeRecords?

I think this could easily find the largest data file from planFiles and base the length on that instead.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As I noted above, you may need a modified "writeRecords" which uses a single partition if you want to generate 1 file.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, thank you for your comments, I will make changes immediately

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

Assert.assertEquals("Action should add 2 data files", 2, result.addedDataFiles().size());

long postRewriteNumRecords = spark.read().format("iceberg").load(tableLocation).count();
List<Object[]> rewrittenRecords = sql("SELECT * from rows sort by c2");

@rdblue rdblue Feb 3, 2021

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not very comfortable with using the same view to load original and rewritten records. Can you create a separate view for the rewritten data? That way we avoid any weird caching behavior.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done


CloseableIterable<FileScanTask> tasks = table.newScan().planFiles();
List<DataFile> dataFiles = Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::file));
Assert.assertEquals("Should have 3 scan tasks before rewrite", 3, dataFiles.size());

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This appends one file directly (the big one, dataFile) and one using writeRecords. Why are there 3 files in the table at this point?

@Stephen-Robin Stephen-Robin Feb 3, 2021

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes. 1 maxSizeFile + 2 row data file

.splitOpenFileCost(1)
.execute();

Assert.assertEquals("Action should delete 4 data files", 4, result.deletedDataFiles().size());

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What are the 4 data files that this action deletes? Is one of the 3 from above duplicated?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, I'm wondering if I need to perform deduplication

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if this is an error in RewriteDatafile Actions, where it should use a set to collect the deleted files. Like we are getting 1 record of "delete" for each split in the large file.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's fix this one later.

@rdblue rdblue added this to the Java 0.11.1 Release milestone Feb 4, 2021

CloseableIterable<FileScanTask> tasks = table.newScan().planFiles();
List<DataFile> dataFiles = Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::file));
DataFile maxSizeFile = Collections.max(dataFiles, Comparator.comparingLong(DataFile::fileSizeInBytes));

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks good.

@rdblue rdblue merged commit e2a0ba4 into apache:master Feb 4, 2021
@rdblue

rdblue commented Feb 4, 2021

Copy link
Copy Markdown
Contributor

Thank you for fixing this, @Stephen-Robin! I've tagged this for inclusion in the 0.11.1 patch release. We should probably do that soon since we have a correctness bug.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants