Skip to content

Flink: Prevent recreation of ManifestOutputFileFactory during flushing#14358

Merged
pvary merged 5 commits into
apache:mainfrom
mxm:deltamanifest-file-scoping
Oct 20, 2025
Merged

Flink: Prevent recreation of ManifestOutputFileFactory during flushing#14358
pvary merged 5 commits into
apache:mainfrom
mxm:deltamanifest-file-scoping

Conversation

@mxm

@mxm mxm commented Oct 17, 2025

Copy link
Copy Markdown
Contributor

DynamicWriteResultAggregator uses the ManifestOutputFileFactory class to write a temporary manifest. For the Dynamic Sink we want to support writing to a vast amount of tables, even during a single checkpoint. So we avoid storing all factories and use a cache with an eviction policy.

The problem is that if the factory for a given table is evicted during a checkpoint flush while there could still be writes for that factory being processed. In that case the same output directory will be generated again which leads to overwriting already written manifests files.

We must avoid recreating the output file factory during checkpoint flushing. It is fine to drop the factories due to cache eviction afterwards, as the output paths for factories are scoped by checkpoint id.

DynamicWriteResultAggregator uses the ManifestOutputFileFactory class to write a
temporary manifest. For the Dynamic Sink we want to support writing to a vast
amount of tables, even during a single checkpoint. So we avoid storing all
factories and use a cache with an eviction policy.

The problem is that if the factory for a given table is evicted during a
checkpoint flush while there could still be writes for that factory being processed. In
that case the same output directory will be generated again which leads to
overwriting already written manifests files.

We must avoid recreating the output file factory during checkpoint
flushing. It is fine to drop the factories due to cache eviction afterwards,
as the output paths for factories are scoped by checkpoint id.
@pvary

pvary commented Oct 17, 2025

Copy link
Copy Markdown
Contributor

Thanks for the fix @mxm!
Good catch, and good analysis!

@pvary pvary added this to the Iceberg 1.10.1 milestone Oct 17, 2025
@github-actions github-actions Bot added the build label Oct 17, 2025
@pvary

pvary commented Oct 20, 2025

Copy link
Copy Markdown
Contributor

@mxm: Do we have the same issue for the writers?

@pvary

pvary commented Oct 20, 2025

Copy link
Copy Markdown
Contributor

Maybe we could add a new parameter to the ManifestOutputFileFactory constructor, like:

  ManifestOutputFileFactory(
      Supplier<Table> tableSupplier,
      Map<String, String> props,
      String flinkJobId,
      String operatorUniqueId,
      int subTaskId,
      long attemptNumber,
      UUID uniqueId) {

and if the UUID is provided then we can add it to the end of the filename, like:

  private String generatePath(long checkpointId) {
    return FileFormat.AVRO.addExtension(
        String.format(
            Locale.ROOT,
            "%s-%s-%05d-%d-%d-%05d-%s",
            flinkJobId,
            operatorUniqueId,
            subTaskId,
            attemptNumber,
            checkpointId,
            fileCount.incrementAndGet(),
            uniqueId));
  }

This would mean that the path generated by the normal sink is not change, but we can change it for every dynamically generated file names.

@mxm

mxm commented Oct 20, 2025

Copy link
Copy Markdown
Contributor Author

@mxm: Do we have the same issue for the writers?

Yes. The cache for RowDataTaskWriterFactory suffers from the same issue, albeit not as severe due it already using the LRUCache and not a time-based expiration. It uses an underlying OutputFileFactory which also uses an zero-based integer for the file suffix. It can be configured though to use a suffix, which is what we should do.

@mxm

mxm commented Oct 20, 2025

Copy link
Copy Markdown
Contributor Author

Maybe we could add a new parameter to the ManifestOutputFileFactory constructor, like:

  ManifestOutputFileFactory(
      Supplier<Table> tableSupplier,
      Map<String, String> props,
      String flinkJobId,
      String operatorUniqueId,
      int subTaskId,
      long attemptNumber,
      UUID uniqueId) {

and if the UUID is provided then we can add it to the end of the filename, like:

  private String generatePath(long checkpointId) {
    return FileFormat.AVRO.addExtension(
        String.format(
            Locale.ROOT,
            "%s-%s-%05d-%d-%d-%05d-%s",
            flinkJobId,
            operatorUniqueId,
            subTaskId,
            attemptNumber,
            checkpointId,
            fileCount.incrementAndGet(),
            uniqueId));
  }

This would mean that the path generated by the normal sink is not change, but we can change it for every dynamically generated file names.

Originally, I was a bit hesitant to change the file names, but we agreed on this approach going forward in #14358 (comment). I'll update the PR as suggested by you above to retain the same file names for the non-dynamic IcebergSink.

@pvary

pvary commented Oct 20, 2025

Copy link
Copy Markdown
Contributor

@mxm: Do we have the same issue for the writers?

Yes. The cache for RowDataTaskWriterFactory suffers from the same issue, albeit not as severe due it already using the LRUCache and not a time-based expiration. It uses an underlying OutputFileFactory which also uses an zero-based integer for the file suffix. It can be configured though to use a suffix, which is what we should do.

Do we plan to fix that in another PR, or we fix the DynamicWriter in this PR?

@mxm

mxm commented Oct 20, 2025

Copy link
Copy Markdown
Contributor Author

I would fix this here. I started drafting a solution for DynamicWriter, but I got stuck while testing. I need a bit more time.

@mxm

mxm commented Oct 20, 2025

Copy link
Copy Markdown
Contributor Author

I conclude that a fix in DynamicWriter is not required. The reason is that the OutputFileFactory (in Iceberg core), in contrast to ManifestOutputFileFactory, is already scoped via a random uuid for the operation id:

this.operationId = UUID.randomUUID().toString();
. We call it from here:
OutputFileFactory.builderFor(table, taskId, attemptId)

I've pushed a test to verify that.

@pvary

pvary commented Oct 20, 2025

Copy link
Copy Markdown
Contributor

I conclude that a fix in DynamicWriter is not required. The reason is that the OutputFileFactory (in Iceberg core), in contrast to ManifestOutputFileFactory, is already scoped via a random uuid for the operation id:

this.operationId = UUID.randomUUID().toString();

. We call it from here:

OutputFileFactory.builderFor(table, taskId, attemptId)

I've pushed a test to verify that.

The funny thing is that the unique part there is operationId and the suffix is different (not used).

@pvary pvary left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 pending tests

@mxm

mxm commented Oct 20, 2025

Copy link
Copy Markdown
Contributor Author

The funny thing is that the unique part there is operationId and the suffix is different (not used).

Yes, I discovered that when I tried using the suffix to insert a UUID, but the file path already contained a UUID which kept changing. I suppose we could replace the operator id from ManifestOutputFileFactory, but that's maybe something for another day.

@pvary pvary merged commit 911a486 into apache:main Oct 20, 2025
18 checks passed
@pvary

pvary commented Oct 20, 2025

Copy link
Copy Markdown
Contributor

Merged to main.
Thanks for the fix @mxm, for @amogh-jahagirdar for the review and @zncleon for reporting it!

@mxm mxm deleted the deltamanifest-file-scoping branch October 21, 2025 08:47
mxm added a commit to mxm/iceberg that referenced this pull request Oct 21, 2025
mxm added a commit to mxm/iceberg that referenced this pull request Oct 21, 2025
pvary pushed a commit that referenced this pull request Oct 21, 2025
@huaxingao

Copy link
Copy Markdown
Contributor

@mxm Could you please back-port both this change and #14385 to 1.10.x? Thanks a lot!

@mxm

mxm commented Nov 12, 2025

Copy link
Copy Markdown
Contributor Author

@huaxingao Thanks for the reminder. Here it is: #14571

thomaschow pushed a commit to thomaschow/iceberg that referenced this pull request Jan 19, 2026
thomaschow pushed a commit to thomaschow/iceberg that referenced this pull request Jan 19, 2026
talatuyarer pushed a commit to talatuyarer/iceberg that referenced this pull request Apr 1, 2026
talatuyarer pushed a commit to talatuyarer/iceberg that referenced this pull request Apr 1, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants