Core: Make sequence number conflicts retryable in concurrent commits#15126
Conversation
38e36ca to
59a899f
Compare
5f76559 to
b245667
Compare
|
Thanks @agnes-xinyi-lu will take a look with fresh eyes tomorrow. |
b245667 to
5ace567
Compare
|
@amogh-jahagirdar @singhpk234 any more comments? Could we get this in this week? |
|
@amogh-jahagirdar @singhpk234 can you take another look ? |
There was a problem hiding this comment.
Thanks @agnes-xinyi-lu , I think this looks good, I just have some suggestions on how to better improve the test so we can have some stronger assertions, but I think it's reasonable as is. I think we should get some other reviews as well
I've also went ahead and added it to the 1.11 milestone since I think it's an important improvement.
| import org.apache.iceberg.exceptions.ValidationException; | ||
|
|
||
| /** | ||
| * A {@link ValidationException} that indicates the client can retry with refreshed metadata. |
There was a problem hiding this comment.
This is a bit too specific to the REST context because it mentions the client.
I think that this should focus on what the problem is: a validation failed but the commit can be fixed and retried. This is specifically not a conflict.
| // Wrap in ValidationFailureException to skip server retry, return to client as | ||
| // CommitFailedException so the client can retry with refreshed metadata. | ||
| throw new ValidationFailureException( | ||
| new CommitFailedException(e, "Commit conflict: %s", e.getMessage())); |
There was a problem hiding this comment.
This should not rethrow a different exception. Instead, add RetryableValidationException to exceptions passed to Tasks.onlyRetryOn.
Also, this commit message is incorrect. We are not throwing CommitFailedException because there was not a commit conflict.
There was a problem hiding this comment.
@rdblue this exception is only retryable on the client side, server side retries will not help because snapshot is provided by the client side.
If we want to change SnapshotProducer to retry on this RetryableValidationException, then should we create a different status code in the SPEC other than 409 which is CommitFailedException? Or should we keep the wrapping of CommitFailedException(but update message to something more specific like Retryable validation failure) and avoid the spec change?
There was a problem hiding this comment.
@rdblue can you please take a look again? It's been a while and we really want to get the fix in. Thanks!
There was a problem hiding this comment.
I talked to @rdblue yesterday, I think we're now all on the same page that the current approach is best as is. We technically could explore other HTTP status codes (the best example I came up with is 412) but the problem is there's going to be all kinds of older clients that wouldn't be able to handle that but all those clients would be able to handle 409s. Kicking back a commit failed to a client with a 409 is probably the most general solution that's also not very intrusive.
If we could update the tests (commented below), I think we'd be good to go ahead on this change.
There was a problem hiding this comment.
Thanks so much, @amogh-jahagirdar! I've updated the PR with the tests from your other comment — really appreciate you taking the time to rewrite them.
24c66b4 to
69eeca3
Compare
| * (non-retryable) 3. The REST catalog properly handles concurrent modifications across branches | ||
| */ | ||
| @Test | ||
| public void testConcurrentAppendsOnMultipleBranches() { |
There was a problem hiding this comment.
I think that this test is much too complicated. Why can't the narrow scenario you're trying to test be exercised with specific request objects?
There was a problem hiding this comment.
This is my bad, I suggested this test originally because I wanted a more realistic test but after looking with fresh eyes it is more complex than it's worth. At it the essence, what we want to verify is that in the scenario of a conflicting sequence number, that the client gets back a commit failed exception and not a validation exception. We technically don't need N commits on different branches and assert how many retries really happened etc to prove that. The trick is in how do we actually simulate a single concurrent backend commit to another branch which advances the seq number which we can achieve through a spy on the adapter (which we do in the other tests anyways) which invokes the fake concurrent commit on an actual commit. Here is what I have
@Test
public void testSequenceNumberConflictThrowsCommitFailed() {
RESTCatalogAdapter adapter = Mockito.spy(new RESTCatalogAdapter(backendCatalog));
RESTCatalog catalog = catalog(adapter);
catalog.createNamespace(TABLE.namespace());
catalog.buildTable(TABLE, SCHEMA).withPartitionSpec(SPEC).create();
DataFile fileOnMain =
DataFiles.builder(SPEC)
.withPath("/path/commit-test-file1.parquet")
.withFileSizeInBytes(10)
.withPartitionPath("id_bucket=0")
.withRecordCount(1)
.build();
catalog.loadTable(TABLE).newFastAppend().appendFile(fileOnMain).commit();
DataFile fileOnAnotherBranch =
DataFiles.builder(SPEC)
.withPath("/path/commit-test-conflicting.parquet")
.withFileSizeInBytes(10)
.withPartitionPath("id_bucket=0")
.withRecordCount(1)
.build();
// Before the next commit is processed by the server, advance the server's lastSequenceNumber
// by committing to a different branch. This simulates a concurrent request to a different branch
// that "beats" the commit to main.
Mockito.doAnswer(
invocation -> {
backendCatalog
.loadTable(TABLE)
.newFastAppend()
.appendFile(fileOnAnotherBranch)
.toBranch("other")
.commit();
return invocation.callRealMethod();
})
.when(adapter)
.execute(matches(HTTPMethod.POST, RESOURCE_PATHS.table(TABLE)), any(), any(), any());
DataFile anotherFileOnMain =
DataFiles.builder(SPEC)
.withPath("/path/commit-test-file2.parquet")
.withFileSizeInBytes(10)
.withPartitionPath("id_bucket=0")
.withRecordCount(1)
.build();
assertThatThrownBy(
() -> catalog.loadTable(TABLE).newFastAppend().appendFile(anotherFileOnMain).commit())
.isInstanceOf(CommitFailedException.class)
.hasMessageContaining("Validation failed, please retry");
}
There was a problem hiding this comment.
I do think though we should have 2 unit tests in TestTableMetadata, 1 which asserts a RetryableValidationException is thrown in case of row ID assignment validation failure and the other for a sequence number case. The test in TestRestCatalog asserts what a real client would see in these cases (a CommitFailedException).
These are the tests I have for TestTableMetadata:
@Test
public void testAddSnapshotWithStaleSequenceNumberIsRetryable() {
TableMetadata base = TableMetadata.newTableMetadata(
TEST_SCHEMA, PartitionSpec.unpartitioned(), "location", ImmutableMap.of());
// Advance lastSequenceNumber to 1 by adding a root snapshot
Snapshot s1 = new BaseSnapshot(
1, 1L, null, System.currentTimeMillis(), null, null, null, "file:/s1.avro", null, null, null);
TableMetadata withS1 = TableMetadata.buildFrom(base).addSnapshot(s1).build();
// A snapshot with seqNum=1 and non-null parentId is stale (1 is not > lastSequenceNumber=1)
Snapshot staleSnapshot = new BaseSnapshot(
1, 2L, 1L, System.currentTimeMillis(), null, null, null, "file:/s2.avro", null, null, null);
assertThatThrownBy(() -> TableMetadata.buildFrom(withS1).addSnapshot(staleSnapshot))
.isInstanceOf(RetryableValidationException.class)
.hasMessageContaining("Cannot add snapshot with sequence number");
}
@Test
public void testAddSnapshotWithStaleFirstRowIdIsRetryable() {
TableMetadata base = TableMetadata.newTableMetadata(
TEST_SCHEMA, PartitionSpec.unpartitioned(), "location",
ImmutableMap.of(TableProperties.FORMAT_VERSION, "3"));
// Advance nextRowId to 5 by adding a snapshot that allocates 5 rows
Snapshot s1 = new BaseSnapshot(
1, 1L, null, System.currentTimeMillis(), null, null, null, "file:/s1.avro", 0L, 5L, null);
TableMetadata withS1 = TableMetadata.buildFrom(base).addSnapshot(s1).build();
// nextRowId is now 5
// A snapshot with firstRowId=0 is stale (0 < nextRowId=5)
Snapshot staleSnapshot = new BaseSnapshot(
2, 2L, 1L, System.currentTimeMillis(), null, null, null, "file:/s2.avro", 0L, 1L, null);
assertThatThrownBy(() -> TableMetadata.buildFrom(withS1).addSnapshot(staleSnapshot))
.isInstanceOf(RetryableValidationException.class)
.hasMessageContaining("Cannot add a snapshot, first-row-id is behind table next-row-id");
}
When multiple processes concurrently commit to different branches of the
same table through the REST catalog, sequence number validation failures
in TableMetadata.addSnapshot() were throwing non-retryable ValidationException
instead of retryable CommitFailedException.
This fix catches the sequence number validation error in CatalogHandlers.commit()
and wraps it in ValidationFailureException(CommitFailedException) to:
- Skip server-side retry (which won't help since sequence number is in the request)
- Return CommitFailedException to the client so it can retry with refreshed metadata
…ents for the new exception
69eeca3 to
675fe5a
Compare
|
Thanks for following through and your patience @agnes-xinyi-lu , I'll go ahead and merge. Thank you @Parth-Brahmbhatt @rdblue @singhpk234 for reviewing |
When multiple processes concurrently commit to different branches of the
same table through the REST catalog, sequence number validation failures
in TableMetadata.addSnapshot() were throwing non-retryable ValidationException
instead of retryable CommitFailedException.
This fix catches the sequence number validation error in CatalogHandlers.commit()
and wraps it in ValidationFailureException(CommitFailedException) to:
- Skip server-side retry (which won't help since sequence number is in the request)
- Return CommitFailedException to the client so it can retry with refreshed metadata
Issue #15001