Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 24 additions & 17 deletions core/src/main/java/org/apache/iceberg/SnapshotProducer.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.function.Consumer;
import java.util.function.Function;
Expand Down Expand Up @@ -455,8 +455,8 @@ protected TableMetadata refresh() {
@Override
@SuppressWarnings("checkstyle:CyclomaticComplexity")
public void commit() {
// this is always set to the latest commit attempt's snapshot
AtomicReference<Snapshot> stagedSnapshot = new AtomicReference<>();
// this is always set to the latest commit attempt's snapshot id.
AtomicLong newSnapshotId = new AtomicLong(-1L);
try (Timed ignore = commitMetrics().totalDuration().start()) {
try {
Tasks.foreach(ops)
Expand All @@ -471,7 +471,7 @@ public void commit() {
.run(
taskOps -> {
Snapshot newSnapshot = apply();
stagedSnapshot.set(newSnapshot);
newSnapshotId.set(newSnapshot.snapshotId());

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to set the snapshot ID here? It should be available everywhere using snapshotId().

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is really just restoring the behavior to how it was prior to 39373d0 so I didn't do any other modifications to the code other than adding a test

TableMetadata.Builder update = TableMetadata.buildFrom(base);
if (base.snapshot(newSnapshot.snapshotId()) != null) {
// this is a rollback operation
Expand Down Expand Up @@ -509,22 +509,29 @@ public void commit() {
throw e;
}

// at this point, the commit must have succeeded so the stagedSnapshot is committed
Snapshot committedSnapshot = stagedSnapshot.get();
try {
LOG.info(
"Committed snapshot {} ({})",
committedSnapshot.snapshotId(),
getClass().getSimpleName());
LOG.info("Committed snapshot {} ({})", newSnapshotId.get(), getClass().getSimpleName());

// at this point, the commit must have succeeded. after a refresh, the snapshot is loaded by
// id in case another commit was added between this commit and the refresh.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be nice to mention the case we're fixing: in some cases we may not know which attempt succeeded, and we will only clean up the one that actually did.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

makes sense, added some additional wording around this

// it might not be known which commit attempt succeeded in some cases, so this only cleans
// up the one that actually did succeed.
Snapshot saved = ops.refresh().snapshot(newSnapshotId.get());
if (saved != null) {
if (cleanupAfterCommit()) {
cleanUncommitted(Sets.newHashSet(saved.allManifests(ops.io())));
}

if (cleanupAfterCommit()) {
cleanUncommitted(Sets.newHashSet(committedSnapshot.allManifests(ops.io())));
}
// also clean up unused manifest lists created by multiple attempts
for (String manifestList : manifestLists) {
if (!committedSnapshot.manifestListLocation().equals(manifestList)) {
deleteFile(manifestList);
// also clean up unused manifest lists created by multiple attempts
for (String manifestList : manifestLists) {
if (!saved.manifestListLocation().equals(manifestList)) {
deleteFile(manifestList);
}
}
} else {
// saved may not be present if the latest metadata couldn't be loaded due to eventual
// consistency problems in refresh. in that case, don't clean up.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know that I agree here. If we can't load the latest metadata, it should throw an exception. But anything could happen so unless we know exactly what was committed, it is safer to skip cleanup.

@nastra nastra Mar 16, 2026

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is really just restoring the behavior + wording to how it was prior to 39373d0 so I didn't do any other modifications to the code other than adding a test.
The wording itself here was added a few years ago by 5300d27 and the commit details mention that no exception should be thrown

LOG.warn("Failed to load committed snapshot, skipping manifest clean-up");
}
} catch (Throwable e) {
LOG.warn(
Expand Down
66 changes: 66 additions & 0 deletions core/src/test/java/org/apache/iceberg/TestSnapshotProducer.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

import java.io.File;
import java.io.IOException;
import java.nio.file.Paths;
import java.util.List;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
Expand Down Expand Up @@ -158,4 +160,68 @@ public void testCommitValidationWithCustomSummaryProperties() throws IOException
// Verify the table wasn't updated
assertThat(table.snapshots()).hasSize(1);
}

@TestTemplate
public void manifestNotCleanedUpWhenSnapshotNotLoadableAfterCommit() {

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test mainly exercises the else branch in SnapshotProducer, which skips manifest cleanup. So far I haven't been able to reproduce the theoretical issue where files are accidentally cleaned up and the metadata ends up corrupt.

// Uses a custom TableOps that returns stale metadata (without the new snapshot) on the
// first refresh() after commit, simulating eventual consistency. Verifies that commit succeeds
// and that the committed data is visible once the table is refreshed again
String tableName = "stale-table-on-first-refresh";
TestTables.TestTableOperations ops = opsWithStaleRefreshAfterCommit(tableName, tableDir);
TestTables.TestTable tableWithStaleRefresh =
TestTables.create(
tableDir, tableName, SCHEMA, SPEC, SortOrder.unsorted(), formatVersion, ops);

// the first refresh() after the commit will return stale metadata (without this snapshot), so
// SnapshotProducer will skip cleanup to avoid accidentally deleting files that are part of the
// committed snapshot but commit still succeeds
tableWithStaleRefresh.newAppend().appendFile(FILE_A).commit();

// Refresh again to get the real metadata; the snapshot must be visible now
tableWithStaleRefresh.ops().refresh();
Snapshot snapshot = tableWithStaleRefresh.currentSnapshot();
assertThat(snapshot)
.as("Committed snapshot must be visible after refresh (eventual consistency resolved)")
.isNotNull();

File metadata = Paths.get(tableDir.getPath(), "metadata").toFile();
assertThat(snapshot.allManifests(tableWithStaleRefresh.io()))
.isNotEmpty()
.allSatisfy(
manifest -> assertThat(metadata.listFiles()).contains(new File(manifest.path())));
}

/**
 * Builds a {@link TestTables.TestTableOperations} that serves stale metadata exactly once after
 * a commit.
 *
 * <p>Whenever {@code commit} is called with a non-null base, that base (which does not contain
 * the newly committed snapshot) is remembered and handed back by the next {@code refresh()} call
 * instead of the real table state. Subsequent refreshes delegate to the parent implementation
 * again. This mimics an eventually consistent store in which a freshly committed snapshot is not
 * yet visible, and lets tests check that cleanup is skipped when the committed snapshot cannot
 * be loaded, so files belonging to that snapshot are not deleted by accident.
 *
 * @param name table name passed to the {@code TestTableOperations} constructor
 * @param location table directory passed to the {@code TestTableOperations} constructor
 * @return table operations whose first refresh after a commit returns the pre-commit metadata
 */
private static TestTables.TestTableOperations opsWithStaleRefreshAfterCommit(
    String name, File location) {
  return new TestTables.TestTableOperations(name, location) {
    // pre-commit metadata to serve once from refresh(); null when nothing is pending
    private TableMetadata pendingStaleMetadata;

    @Override
    public void commit(TableMetadata base, TableMetadata updatedMetadata) {
      super.commit(base, updatedMetadata);
      if (base == null) {
        // without a base there is no earlier state that could be replayed as stale
        return;
      }

      this.pendingStaleMetadata = base;
    }

    @Override
    public TableMetadata refresh() {
      if (pendingStaleMetadata == null) {
        return super.refresh();
      }

      // hand out the stale metadata a single time, then fall back to real refreshes
      TableMetadata stale = pendingStaleMetadata;
      this.pendingStaleMetadata = null;
      this.current = stale;
      return stale;
    }
  };
}
}
2 changes: 1 addition & 1 deletion core/src/test/java/org/apache/iceberg/TestTables.java
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ public static class TestTableOperations implements TableOperations {
private final String tableName;
private final File metadata;
private final FileIO fileIO;
private TableMetadata current = null;
protected TableMetadata current = null;
private long lastSnapshotId = 0;
private int failCommits = 0;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3659,7 +3659,7 @@ public void testNumLoadTableCallsForMergeAppend() {
table.newAppend().appendFile(FILE_A).commit();

// loadTable is executed twice: once to refresh prior to the commit and once to refresh after it
Mockito.verify(adapter)
Mockito.verify(adapter, times(2))

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've verified that this was called 3 times prior to c6b252e and prior to #10523. Given that we're adding back an additional table load this is now being called twice

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The current loads are to refresh prior to commit and to refresh after commit?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The current loads are to refresh prior to commit and to refresh after commit?

yes, exactly

.execute(matches(HTTPMethod.GET, RESOURCE_PATHS.table(TABLE)), any(), any(), any());

// CommitReport reflects the table state after the commit
Expand Down
Loading