Skip to content

Spark: Write DVs for V3 MoR tables#11561

Merged
amogh-jahagirdar merged 1 commit into
apache:mainfrom
amogh-jahagirdar:spark-write-dv-files
Dec 6, 2024
Merged

Spark: Write DVs for V3 MoR tables#11561
amogh-jahagirdar merged 1 commit into
apache:mainfrom
amogh-jahagirdar:spark-write-dv-files

Conversation

@amogh-jahagirdar

@amogh-jahagirdar amogh-jahagirdar commented Nov 15, 2024

Copy link
Copy Markdown
Contributor

This change adds support for writing DVs in SparkPositionDeltaWrite for V3 tables.

@amogh-jahagirdar

Copy link
Copy Markdown
Contributor Author

Still some failing tests, and figuring out a good pattern to extend the existing Delete/Merge/Update tests to run against DVs. Something also worth thinking about for V3 is preventing partition granularity from being set since it's at odds with DVs.
We could also take a stance that when an upgrade to V3 is performed, we invalidate the delete granularity property completely since in V3 we should only be writing DVs and not position deletes (the granularity property is really only relevant for V2 pos deletes)

Comment thread core/src/main/java/org/apache/iceberg/io/PartitioningDVWriter.java
@github-actions github-actions Bot removed the MR label Nov 20, 2024
@amogh-jahagirdar amogh-jahagirdar force-pushed the spark-write-dv-files branch 2 times, most recently from 5e8e727 to 0f00dfc Compare November 20, 2024 18:03
@amogh-jahagirdar amogh-jahagirdar changed the title (WIP) Write DVs in Spark for V3 tables Spark: Write DVs for V3 tables Nov 20, 2024
@amogh-jahagirdar amogh-jahagirdar changed the title Spark: Write DVs for V3 tables Spark: Write DVs for V3 MoR tables Nov 20, 2024
@amogh-jahagirdar amogh-jahagirdar marked this pull request as ready for review November 20, 2024 18:42
Comment thread core/src/main/java/org/apache/iceberg/io/PartitioningDVWriter.java Outdated

@Override
public void close() throws IOException {
fileWriter.close();

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know we usually check if result is null to see if the writer was closed and we are not re-assigning the result object. It probably works in this case (knowing the underlying implementation), but I wonder whether making this check explicit will make the behavior of this method clear even without checking how the delegate writer works.

if (result == null) {
  fileWriter.close();
  this.result = fileWriter.result();
}

Comment thread core/src/main/java/org/apache/iceberg/io/PartitioningDVWriter.java
FileScanTask fileScanTask = task.asFileScanTask();
for (DeleteFile deleteFile : fileScanTask.deletes()) {
if (ContentFileUtil.isFileScoped(deleteFile)) {
if (ContentFileUtil.isFileScoped(deleteFile) || ContentFileUtil.isDV(deleteFile)) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Aren't DVs considered file-scoped? I think isFileScoped will always be true for DVs (please check).

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, if DVs are enabled, we have to include all position deletes, not just file-scoped. The first ever produced DV must include all previous deletes. The only difference is that we can drop file-scoped from the table state (decided in the writer) and still have to keep partition-scoped deletes as they may apply to other data files.

Can we please test migration to DVs for tables that have a mix of file-scoped and partition-scoped position deletes?

@amogh-jahagirdar amogh-jahagirdar Nov 29, 2024

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great catch, yes to both points. the file scoped Util correctly handles the DV case by checking the referenced data file, and we for DVs we do need to merge all existing position deletes, regardless of scope. I've fixed that and added a test case for update/delete/merge which will create a mix of file scoped and partition scoped deletes, and then finally produce a DV which has the positions merged from these existing deletes.

Comment thread core/src/main/java/org/apache/iceberg/io/PartitioningDVWriter.java
@amogh-jahagirdar amogh-jahagirdar force-pushed the spark-write-dv-files branch 4 times, most recently from 0be3306 to a619d51 Compare November 30, 2024 00:16

@aokolnychyi aokolnychyi left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor comments. Feel free to merge whenever you are ready, @amogh-jahagirdar. Great work!

Comment thread core/src/main/java/org/apache/iceberg/io/PartitioningDVWriter.java Outdated
Comment thread core/src/main/java/org/apache/iceberg/io/PartitioningDVWriter.java Outdated
for (DeleteFile deleteFile : fileScanTask.deletes()) {
if (ContentFileUtil.isFileScoped(deleteFile)) {
// Both file-scoped and partition-scoped position deletes must be rewritten for DVs
if (forDVs || ContentFileUtil.isFileScoped(deleteFile)) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about equality deletes? Those should be ignored. If so, what about shouldRewrite method?

for (DeleteFile deleteFile : fileScanTask.deletes()) {
  if (shouldRewrite(deleteFile, forDVs)) {
    ...
  }
}
private boolean shouldRewrite(DeleteFile deleteFile, boolean dvsEnabled) {
  ...
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be nice to also cover this with tests.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah I fixed to avoid broadcasting eq. deletes unnecessarily but need to figure out how to add a test which would inspect the broadcast contents, will a publish a separate PR for that

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants