Flink: Enable multiple flink sinks for the same table in the same job#6528

Merged: pvary merged 3 commits into apache:master from pvary:multisink on Jan 6, 2023

@pvary pvary commented Jan 5, 2023


Currently the Flink Iceberg commits are identified by the table and the JobID. This becomes problematic when the same job has multiple sinks for the same table.

Let's imagine the following sequence of events:

  • Sink1 notifyCheckpointComplete(99) commits the checkpoint's changes to the Iceberg table, and marks 99 as the latest committed checkpoint
  • Sink2 notifyCheckpointComplete(99) commits the checkpoint's changes to the Iceberg table, and marks 99 as the latest committed checkpoint
  • New checkpoint is created with the id 100
  • Sink1 notifyCheckpointComplete(100) commits the checkpoint's changes to the Iceberg table, and marks 100 as the latest committed checkpoint
  • Job is cancelled
  • Job is restarted
  • Sink1 sees that the latest committed checkpoint is 100, so there is nothing to do
  • Sink2 sees that the latest committed checkpoint is 100, so it also does nothing, which is wrong because Sink2 never committed the relevant changes to the table

To fix this, another identifier, operatorUniqueId, is added to the snapshot summary of each commit, and this operator ID is also checked on recovery.
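The scenario and the fix can be illustrated with a minimal, self-contained sketch. It is not the Iceberg implementation: snapshot summaries are modeled as plain maps, the class and method names are hypothetical, the property name `flink.max-committed-checkpoint-id` and the job and operator IDs are assumptions made for illustration, and Sink2 is assumed to have committed checkpoint 99 but not 100.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: models snapshot summaries as maps, newest snapshot last.
public class MultiSinkRecoverySketch {
  static final String FLINK_JOB_ID = "flink.job-id";
  static final String OPERATOR_UNIQUE_ID = "flink.operator-unique-id";
  // Assumed property name, used here only for illustration.
  static final String MAX_COMMITTED_CHECKPOINT_ID = "flink.max-committed-checkpoint-id";

  // Builds the summary that one committer would attach to a snapshot.
  public static Map<String, String> summary(String jobId, String operatorId, long checkpointId) {
    Map<String, String> s = new HashMap<>();
    s.put(FLINK_JOB_ID, jobId);
    s.put(OPERATOR_UNIQUE_ID, operatorId);
    s.put(MAX_COMMITTED_CHECKPOINT_ID, Long.toString(checkpointId));
    return s;
  }

  // Scans from the newest snapshot backwards and returns the first matching checkpoint ID.
  // Passing operatorId == null reproduces the old behavior that matches on the job ID only.
  public static long maxCommittedCheckpointId(
      List<Map<String, String>> snapshots, String jobId, String operatorId) {
    for (int i = snapshots.size() - 1; i >= 0; i--) {
      Map<String, String> s = snapshots.get(i);
      boolean jobMatches = jobId.equals(s.get(FLINK_JOB_ID));
      boolean operatorMatches =
          operatorId == null || operatorId.equals(s.get(OPERATOR_UNIQUE_ID));
      if (jobMatches && operatorMatches) {
        return Long.parseLong(s.get(MAX_COMMITTED_CHECKPOINT_ID));
      }
    }
    return -1L; // nothing committed yet for this job/operator
  }

  // The sequence of commits from the description: both sinks commit 99, only Sink1 commits 100.
  public static List<Map<String, String>> scenario() {
    List<Map<String, String>> snapshots = new ArrayList<>();
    snapshots.add(summary("job-1", "sink1", 99));
    snapshots.add(summary("job-1", "sink2", 99));
    snapshots.add(summary("job-1", "sink1", 100));
    return snapshots;
  }

  public static void main(String[] args) {
    List<Map<String, String>> snapshots = scenario();
    // Job-only matching: Sink2 would wrongly conclude that 100 is already committed.
    System.out.println(maxCommittedCheckpointId(snapshots, "job-1", null));    // prints 100
    // Job + operator matching: each sink sees only its own commits.
    System.out.println(maxCommittedCheckpointId(snapshots, "job-1", "sink1")); // prints 100
    System.out.println(maxCommittedCheckpointId(snapshots, "job-1", "sink2")); // prints 99
  }
}
```

With the operator ID included in the match, Sink2 recovers with 99 as its latest committed checkpoint and knows that checkpoint 100 still has to be committed.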

The PR contains 2 additional unit tests to check the scenario.

@pvary pvary requested a review from stevenzwu January 5, 2023 16:51
@github-actions github-actions Bot added the flink label Jan 5, 2023
    public void initializeState(StateInitializationContext context) throws Exception {
      super.initializeState(context);
      this.flinkJobId = getContainingTask().getEnvironment().getJobID().toString();
      this.operatorUniqueId = getRuntimeContext().getOperatorUniqueID();


This can be a problem. It gets the jobVertexId (a UUID-like string). I verified it by setting uidPrefix=iceberg-sink in TestFlinkIcebergSink.

    FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
        ...
        .uidPrefix("iceberg-sink")
        .append();

Then I can see that the operatorUniqueId value is the jobVertexId, 0e4eaa3db14ae3b6752b25172fd05c50. It can change when the job graph changes (e.g. when a stateless map operator is added). Then we won't be able to match the operator ID properly.

Ideally, we want to use the operator uid from Flink, which can be set by users. See the code from FlinkSink.

      if (uidPrefix != null) {
        committerStream = committerStream.uid(uidPrefix + "-committer");
      }

If the application does not provide a uid, Flink automatically generates a UUID by default. Setting a uid for a stateful operator (like the Iceberg committer) is a best practice. Flink uses the uid to match state descriptions. If the uid changes, old state won't be matched and restored. The uid works perfectly for this purpose, but I am not sure what the best way is to retrieve the uid in the initializeState method.

I can see the operator name in the debugger, which is also set by FlinkSink based on uidPrefix. It will be unique, as uidPrefix should be unique. Nonetheless, it is still not an ideal replacement for the uid.

@stevenzwu stevenzwu Jan 5, 2023


I did more experiments. If the uidPrefix is set in FlinkSink, then it seems that getOperatorUniqueID does return a stable ID even if the job DAG changes.

If uidPrefix is not set in FlinkSink, a DAG change leads to unstable/different values from getOperatorUniqueID. This is not really a problem, because the situation would be the same if we used the operator uid. We should always set uidPrefix for FlinkSink as a best practice for the stateful committer operator.

In summary, using getOperatorUniqueID seems acceptable to me.


cc @hililiwei @openinx and see if they have other suggestions.


operatorUniqueId is deterministically generated based on the uid of the operator, which Flink already uses to assign the state to this operator. This is completely correct.
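A rough sketch of why a uid-derived ID stays stable across job graph changes while an auto-generated one does not. This is not Flink's hashing algorithm: MD5 is used only as a stand-in hash, and the class name, method names, and the idea of hashing a node position are assumptions made for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical sketch: a stand-in for deterministic operator ID generation.
public class OperatorIdSketch {

  // Hashes the input with MD5 (stand-in only) and returns a 32-character hex string.
  static String hash(String input) {
    try {
      byte[] digest =
          MessageDigest.getInstance("MD5").digest(input.getBytes(StandardCharsets.UTF_8));
      StringBuilder hex = new StringBuilder();
      for (byte b : digest) {
        hex.append(String.format("%02x", b));
      }
      return hex.toString();
    } catch (NoSuchAlgorithmException e) {
      throw new IllegalStateException(e);
    }
  }

  // With a user-provided uid, the ID depends only on the uid string.
  public static String idFromUid(String uid) {
    return hash("uid:" + uid);
  }

  // Without a uid, the ID is assumed to depend on the operator's position in the job graph.
  public static String idFromPosition(int nodeIndex) {
    return hash("position:" + nodeIndex);
  }

  public static void main(String[] args) {
    // Same uid before and after adding an operator upstream: the ID is unchanged.
    boolean sameWithUid =
        idFromUid("iceberg-sink-committer").equals(idFromUid("iceberg-sink-committer"));
    // No uid: adding an operator upstream shifts the position, so the ID changes.
    boolean sameWithoutUid = idFromPosition(3).equals(idFromPosition(4));
    System.out.println(sameWithUid);    // prints true
    System.out.println(sameWithoutUid); // prints false
  }
}
```

This mirrors the observation earlier in the thread: setting uidPrefix gives the committer a stable identity, whereas relying on auto-generated IDs ties the identity to the shape of the job graph.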

@gyfora gyfora left a comment


lgtm +1


    private static final Logger LOG = LoggerFactory.getLogger(IcebergFilesCommitter.class);
    private static final String FLINK_JOB_ID = "flink.job-id";
    private static final String OPERATOR_UNIQUE_ID = "flink.operator-unique-id";


nit: would flink.operator-id be sufficient? It also matches the job-id style, and "id" usually implies unique.


Similarly, the constant name can be FLINK_OPERATOR_ID.

@stevenzwu stevenzwu left a comment


LGTM. Left a nit comment on the naming for consistency.

@pvary pvary merged commit 8828ac1 into apache:master Jan 6, 2023
pvary commented Jan 6, 2023


Merged the PR.
Thanks for the review @stevenzwu and @gyfora!

chenwyi2 commented Sep 4, 2024


Isn't there a problem? The key of the checkpoint state is unique, but two sinks will have the same checkpoint ID.

pvary commented Sep 4, 2024


@chenwyi2: The two sinks have different operator IDs.
