Flink: Prevent recreation of ManifestOutputFileFactory during flushing #14358
DynamicWriteResultAggregator uses the ManifestOutputFileFactory class to write a temporary manifest. For the Dynamic Sink we want to support writing to a large number of tables, even during a single checkpoint, so we avoid storing all factories and use a cache with an eviction policy. The problem is that the factory for a given table can be evicted during a checkpoint flush while writes for that factory are still being processed. In that case the factory is recreated and the same output directory is generated again, which leads to overwriting already written manifest files. We must avoid recreating the output file factory during checkpoint flushing. It is fine to drop the factories due to cache eviction afterwards, as the output paths for factories are scoped by checkpoint id.
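To make the failure mode concrete, here is a minimal, self-contained sketch. It does not use the real Iceberg classes: `SketchManifestFactory`, `SketchLruCache`, and `FlushSketch` are hypothetical stand-ins, and the path layout is invented for illustration. It assumes a factory whose paths depend only on the table, the checkpoint id, and a zero-based per-instance counter. It also shows one possible way to avoid recreation (keeping factories in a map that lives for the whole flush), which is not necessarily how this PR implements it.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

/** Hypothetical stand-in: paths depend on table, checkpoint id, and a zero-based counter. */
class SketchManifestFactory {
  private final String table;
  private final long checkpointId;
  private int fileCount = 0;

  SketchManifestFactory(String table, long checkpointId) {
    this.table = table;
    this.checkpointId = checkpointId;
  }

  String nextPath() {
    // A freshly created instance always starts again at counter 0.
    return table + "/metadata/ckpt-" + checkpointId + "-" + (fileCount++) + ".avro";
  }
}

/** Tiny LRU cache: access-ordered LinkedHashMap that evicts the eldest entry when full. */
class SketchLruCache<K, V> extends LinkedHashMap<K, V> {
  private final int maxSize;

  SketchLruCache(int maxSize) {
    super(16, 0.75f, true);
    this.maxSize = maxSize;
  }

  @Override
  protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
    return size() > maxSize;
  }
}

class FlushSketch {
  /** Resolves the factory through the evicting cache on every write; counts repeated paths. */
  static int collisionsWithCacheOnly(String[] writes, long checkpointId, int cacheSize) {
    Map<String, SketchManifestFactory> cache = new SketchLruCache<>(cacheSize);
    return countCollisions(writes, checkpointId, cache);
  }

  /** Keeps every factory alive for the whole flush, so none is recreated mid-flush. */
  static int collisionsWithFlushScopedFactories(String[] writes, long checkpointId) {
    Map<String, SketchManifestFactory> perFlush = new HashMap<>();
    return countCollisions(writes, checkpointId, perFlush);
  }

  private static int countCollisions(
      String[] writes, long checkpointId, Map<String, SketchManifestFactory> factories) {
    Set<String> seen = new HashSet<>();
    int collisions = 0;
    for (String table : writes) {
      SketchManifestFactory factory = factories.get(table);
      if (factory == null) {
        // With the LRU map, this put may evict another table's factory.
        factory = new SketchManifestFactory(table, checkpointId);
        factories.put(table, factory);
      }
      if (!seen.add(factory.nextPath())) {
        collisions++; // same path generated twice: the earlier file would be overwritten
      }
    }
    return collisions;
  }
}
```

With the write sequence `a, b, a` and a cache of size 1, the factory for `a` is evicted when `b` is inserted, so the second write to `a` creates a new factory that starts at counter 0 again and repeats the first path. Holding the factories for the duration of the flush keeps the counter intact.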
|
Thanks for the fix @mxm! |
…e-creation during flushing
|
@mxm: Do we have the same issue for the writers? |
|
Maybe we could add a new parameter to the and, if the UUID is provided, add it to the end of the filename, like: This would mean that the path generated by the normal sink does not change, but we can change it for every dynamically generated file name. |
Yes. The cache for RowDataTaskWriterFactory suffers from the same issue, albeit not as severely, because it already uses the LRUCache rather than a time-based expiration. It uses an underlying OutputFileFactory, which also uses a zero-based integer for the file suffix. It can be configured to use a suffix, though, which is what we should do. |
Originally, I was a bit hesitant to change the file names, but we agreed on this approach going forward in #14358 (comment). I'll update the PR as suggested by you above to retain the same file names for the non-dynamic IcebergSink. |
Do we plan to fix that in another PR, or we fix the |
|
I would fix this here. I started drafting a solution for DynamicWriter, but I got stuck while testing. I need a bit more time. |
|
I conclude that a fix in DynamicWriter is not required. The reason is that the OutputFileFactory (in Iceberg core), in contrast to ManifestOutputFileFactory, is already scoped via a random UUID for the operation id: . We call it from here: I've pushed a test to verify that. |
The funny thing is that the unique part there is |
Yes, I discovered that when I tried using the suffix to insert a UUID, but the file path already contained a UUID which kept changing. I suppose we could replace the operator id from ManifestOutputFileFactory, but that's maybe something for another day. |
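For readers following the thread, the point about UUID scoping can be illustrated with a small sketch. `UuidScopedNames` is a hypothetical class written for this illustration, not the Iceberg `OutputFileFactory`, and the file name layout is an assumption. It only shows why a per-instance random UUID keeps a recreated factory from repeating earlier names even though the counter restarts.

```java
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical factory whose file names embed a random, per-instance operation id. */
class UuidScopedNames {
  private final int partitionId;
  private final long taskId;
  // Assumption for this sketch: a fresh UUID is drawn each time a factory is created.
  private final String operationId = UUID.randomUUID().toString();
  private final AtomicInteger fileCount = new AtomicInteger(0);

  UuidScopedNames(int partitionId, long taskId) {
    this.partitionId = partitionId;
    this.taskId = taskId;
  }

  String nextFileName() {
    // The counter restarts for every instance, but the operation id differs.
    return String.format(
        "%05d-%d-%s-%05d.parquet",
        partitionId, taskId, operationId, fileCount.incrementAndGet());
  }

  /** Simulates eviction followed by recreation with identical constructor arguments. */
  static boolean recreatedFactoryCollides() {
    UuidScopedNames first = new UuidScopedNames(0, 1L);
    UuidScopedNames recreated = new UuidScopedNames(0, 1L);
    return first.nextFileName().equals(recreated.nextFileName());
  }
}
```

Both instances produce a name ending in counter `00001`, yet the names differ because each instance carries its own operation id. A factory that derives its names only from stable inputs plus a zero-based counter has no such protection.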
|
Merged to main. |
|
@huaxingao Thanks for the reminder. Here it is: #14571 |
…g flushing (apache#14385) backports apache#14358