Flink: Enable multiple flink sinks for the same table in the same job#6528
Conversation
| public void initializeState(StateInitializationContext context) throws Exception { | ||
| super.initializeState(context); | ||
| this.flinkJobId = getContainingTask().getEnvironment().getJobID().toString(); | ||
| this.operatorUniqueId = getRuntimeContext().getOperatorUniqueID(); |
There was a problem hiding this comment.
This can be a problem. it gets the jobVertexId (a UUID like string). I verified it by setting the uidPrefix=iceberg-sink in TestFlinkIcebergSink.
FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
...
.uidPrefix("iceberg-sink")
.append();
Then I can see the operatorUniqueId value as jobVertexId of 0e4eaa3db14ae3b6752b25172fd05c50. It can change when the job graph changed (e.g. adding a stateless map operator). Then we won't be able to match the operator id properly.
Ideally, we want to use the operator uid from Flink, which can be set by users. See the code from FlinkSink.
if (uidPrefix != null) {
committerStream = committerStream.uid(uidPrefix + "-committer");
}
if uid is not provided by application, Flink automatically generates a UUID by default. Setting uid for stateful operator (like Iceberg committer) is a best practice. Flink uses uid to match state description. if uid changed, old state won't be matched and restored. uid works perfectly for this purpose. But I am not sure what is the best way to retrieve the uid in the initializeState method.
I can see the operator name from debugger, which is also set by FlinkSink based on uidPrefix. It will be unique as uidPrefix should be unique. Nonetheless, it is still not the ideal candidate/replacement for uid though.

There was a problem hiding this comment.
I did more experiment. if the uidPrefix is set in FlinkSink, then it seems that getOperatorUniqueID does return a stable ID even if the job DAG changed.
if uidPrefix is not set in FlinkSink, DAG change leads to unstable/different values from getOperatorUniqueID. This is not really a problem, because it is the same situation if we use operator uid. we should always set uidPrefix for FlinkSink as best practice for stateful committer operator.
In summary, using getOperatorUniqueID seems acceptable to me.
There was a problem hiding this comment.
cc @hililiwei @openinx and see if they have other suggestions.
There was a problem hiding this comment.
operatorUniqueId is deterministaclly generated based on the uid of the operator which is used already by Flink to assign the state to this operator. This is competely correct.
|
|
||
| private static final Logger LOG = LoggerFactory.getLogger(IcebergFilesCommitter.class); | ||
| private static final String FLINK_JOB_ID = "flink.job-id"; | ||
| private static final String OPERATOR_UNIQUE_ID = "flink.operator-unique-id"; |
There was a problem hiding this comment.
nit: would flink.operator-id be sufficient? it also matches the job-id style. id usually implies unique.
There was a problem hiding this comment.
similarly, the constant name can be FLINK_OPERATOR_ID
stevenzwu
left a comment
There was a problem hiding this comment.
LGTM. left a nit comment on the naming for consistency.
|
Merged the PR. |
|
there is a problem, since the key of checkpoint state is unique, but two sinks will have the same checkpoint id? |
|
@chenwyi2: 2 sinks have a different operatorid |
Currently the Flink Iceberg commits are identifier by the
tableand theJobID. This could become problematic when there are multiple sinks for the same table for the same job.Let's imagine the following sequence of events:
notifyCheckpointComplete(99)commits checkpoint the changes to the Iceberg table, and marks99as the latest checkpoint which was committednotifyCheckpointComplete(99)commits checkpoint the changes to the Iceberg table, and marks99as the latest checkpoint which was committed100notifyCheckpointComplete(100)commits checkpoint the changes to the Iceberg table, and marks100as the latest checkpoint which was committed100, so nothing to do100, so nothing to do - Which is wrong as Sink2 never committed the relevant changes to the tableTo fix this added another identifier
operatorUniqueIdis added to the snapshot summary when checking for the commits, and this operatorId is also checked on recovery.The PR contains 2 additional unit tests to check the scenario.