-
Notifications
You must be signed in to change notification settings - Fork 3.4k
Core: Load snapshot after it has been committed to prevent accidental cleanup of files #15511
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -44,7 +44,7 @@ | |
| import java.util.UUID; | ||
| import java.util.concurrent.ExecutorService; | ||
| import java.util.concurrent.atomic.AtomicInteger; | ||
| import java.util.concurrent.atomic.AtomicReference; | ||
| import java.util.concurrent.atomic.AtomicLong; | ||
| import java.util.concurrent.atomic.AtomicReferenceArray; | ||
| import java.util.function.Consumer; | ||
| import java.util.function.Function; | ||
|
|
@@ -455,8 +455,8 @@ protected TableMetadata refresh() { | |
| @Override | ||
| @SuppressWarnings("checkstyle:CyclomaticComplexity") | ||
| public void commit() { | ||
| // this is always set to the latest commit attempt's snapshot | ||
| AtomicReference<Snapshot> stagedSnapshot = new AtomicReference<>(); | ||
| // this is always set to the latest commit attempt's snapshot id. | ||
| AtomicLong newSnapshotId = new AtomicLong(-1L); | ||
| try (Timed ignore = commitMetrics().totalDuration().start()) { | ||
| try { | ||
| Tasks.foreach(ops) | ||
|
|
@@ -471,7 +471,7 @@ public void commit() { | |
| .run( | ||
| taskOps -> { | ||
| Snapshot newSnapshot = apply(); | ||
| stagedSnapshot.set(newSnapshot); | ||
| newSnapshotId.set(newSnapshot.snapshotId()); | ||
| TableMetadata.Builder update = TableMetadata.buildFrom(base); | ||
| if (base.snapshot(newSnapshot.snapshotId()) != null) { | ||
| // this is a rollback operation | ||
|
|
@@ -509,22 +509,29 @@ public void commit() { | |
| throw e; | ||
| } | ||
|
|
||
| // at this point, the commit must have succeeded so the stagedSnapshot is committed | ||
| Snapshot committedSnapshot = stagedSnapshot.get(); | ||
| try { | ||
| LOG.info( | ||
| "Committed snapshot {} ({})", | ||
| committedSnapshot.snapshotId(), | ||
| getClass().getSimpleName()); | ||
| LOG.info("Committed snapshot {} ({})", newSnapshotId.get(), getClass().getSimpleName()); | ||
|
|
||
| // at this point, the commit must have succeeded. after a refresh, the snapshot is loaded by | ||
| // id in case another commit was added between this commit and the refresh. | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Would be nice to mention the case we're fixing: that we may not know which attempt succeded in some cases and we will only clean up the one that actually did.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. makes sense, added some additional wording around this |
||
| // it might not be known which commit attempt succeeded in some cases, so this only cleans | ||
| // up the one that actually did succeed. | ||
| Snapshot saved = ops.refresh().snapshot(newSnapshotId.get()); | ||
| if (saved != null) { | ||
| if (cleanupAfterCommit()) { | ||
| cleanUncommitted(Sets.newHashSet(saved.allManifests(ops.io()))); | ||
| } | ||
|
|
||
| if (cleanupAfterCommit()) { | ||
| cleanUncommitted(Sets.newHashSet(committedSnapshot.allManifests(ops.io()))); | ||
| } | ||
| // also clean up unused manifest lists created by multiple attempts | ||
| for (String manifestList : manifestLists) { | ||
| if (!committedSnapshot.manifestListLocation().equals(manifestList)) { | ||
| deleteFile(manifestList); | ||
| // also clean up unused manifest lists created by multiple attempts | ||
| for (String manifestList : manifestLists) { | ||
| if (!saved.manifestListLocation().equals(manifestList)) { | ||
| deleteFile(manifestList); | ||
| } | ||
| } | ||
| } else { | ||
| // saved may not be present if the latest metadata couldn't be loaded due to eventual | ||
| // consistency problems in refresh. in that case, don't clean up. | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't know that I agree here. If we can't load the latest metadata, it should throw an exception. But anything could happen so unless we know exactly what was committed, it is safer to skip cleanup.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
| LOG.warn("Failed to load committed snapshot, skipping manifest clean-up"); | ||
| } | ||
| } catch (Throwable e) { | ||
| LOG.warn( | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -22,7 +22,9 @@ | |
| import static org.assertj.core.api.Assertions.assertThat; | ||
| import static org.assertj.core.api.Assertions.assertThatThrownBy; | ||
|
|
||
| import java.io.File; | ||
| import java.io.IOException; | ||
| import java.nio.file.Paths; | ||
| import java.util.List; | ||
| import java.util.stream.Collectors; | ||
| import javax.annotation.Nonnull; | ||
|
|
@@ -158,4 +160,68 @@ public void testCommitValidationWithCustomSummaryProperties() throws IOException | |
| // Verify the table wasn't updated | ||
| assertThat(table.snapshots()).hasSize(1); | ||
| } | ||
|
|
||
| @TestTemplate | ||
| public void manifestNotCleanedUpWhenSnapshotNotLoadableAfterCommit() { | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. the test mainly hits the |
||
| // Uses a custom TableOps that returns stale metadata (without the new snapshot) on the | ||
| // first refresh() after commit, simulating eventual consistency. Verifies that commit succeeds | ||
| // and that the committed data is visible once the table is refreshed again | ||
| String tableName = "stale-table-on-first-refresh"; | ||
| TestTables.TestTableOperations ops = opsWithStaleRefreshAfterCommit(tableName, tableDir); | ||
| TestTables.TestTable tableWithStaleRefresh = | ||
| TestTables.create( | ||
| tableDir, tableName, SCHEMA, SPEC, SortOrder.unsorted(), formatVersion, ops); | ||
|
|
||
| // the first refresh() after the commit will return stale metadata (without this snapshot), so | ||
| // SnapshotProducer will skip cleanup to avoid accidentally deleting files that are part of the | ||
| // committed snapshot but commit still succeeds | ||
| tableWithStaleRefresh.newAppend().appendFile(FILE_A).commit(); | ||
|
|
||
| // Refresh again to get the real metadata; the snapshot must be visible now | ||
| tableWithStaleRefresh.ops().refresh(); | ||
| Snapshot snapshot = tableWithStaleRefresh.currentSnapshot(); | ||
| assertThat(snapshot) | ||
| .as("Committed snapshot must be visible after refresh (eventual consistency resolved)") | ||
| .isNotNull(); | ||
|
|
||
| File metadata = Paths.get(tableDir.getPath(), "metadata").toFile(); | ||
| assertThat(snapshot.allManifests(tableWithStaleRefresh.io())) | ||
| .isNotEmpty() | ||
| .allSatisfy( | ||
| manifest -> assertThat(metadata.listFiles()).contains(new File(manifest.path()))); | ||
| } | ||
|
|
||
| /** | ||
| * Creates a TableOperations that returns stale metadata (without the newly committed snapshot) on | ||
| * the first refresh() after a commit. This simulates eventual consistency where the committed | ||
| * snapshot is not yet visible. Used to verify that when the snapshot cannot be loaded after | ||
| * commit, cleanup is skipped to avoid accidentally deleting files that are part of the committed | ||
| * snapshot. | ||
| */ | ||
| private static TestTables.TestTableOperations opsWithStaleRefreshAfterCommit( | ||
| String name, File location) { | ||
| return new TestTables.TestTableOperations(name, location) { | ||
| private TableMetadata metadataToReturnOnNextRefresh; | ||
|
|
||
| @Override | ||
| public void commit(TableMetadata base, TableMetadata updatedMetadata) { | ||
| super.commit(base, updatedMetadata); | ||
| if (base != null) { | ||
| // return stale metadata on the first refresh() call | ||
| this.metadataToReturnOnNextRefresh = base; | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
| public TableMetadata refresh() { | ||
| if (metadataToReturnOnNextRefresh != null) { | ||
| this.current = metadataToReturnOnNextRefresh; | ||
| this.metadataToReturnOnNextRefresh = null; | ||
| return current; | ||
| } | ||
|
|
||
| return super.refresh(); | ||
| } | ||
| }; | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -3659,7 +3659,7 @@ public void testNumLoadTableCallsForMergeAppend() { | |
| table.newAppend().appendFile(FILE_A).commit(); | ||
|
|
||
| // loadTable is executed once | ||
| Mockito.verify(adapter) | ||
| Mockito.verify(adapter, times(2)) | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The current loads are to refresh prior to commit and to refresh after commit?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
yes, exactly |
||
| .execute(matches(HTTPMethod.GET, RESOURCE_PATHS.table(TABLE)), any(), any(), any()); | ||
|
|
||
| // CommitReport reflects the table state after the commit | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need to set the snapshot ID here? It should be available everywhere using
snapshotId().There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is really just restoring the behavior to how it was prior to 39373d0 so I didn't do any other modifications to the code other than adding a test