Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>

private static final Logger LOG = LoggerFactory.getLogger(IcebergFilesCommitter.class);
private static final String FLINK_JOB_ID = "flink.job-id";
private static final String OPERATOR_ID = "flink.operator-id";

// The max checkpoint id we've committed to iceberg table. As the flink's checkpoint is always
// increasing, so we could correctly commit all the data files whose checkpoint id is greater than
Expand Down Expand Up @@ -97,6 +98,7 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>

// It will have a unique identifier for one job.
private transient String flinkJobId;
private transient String operatorUniqueId;
private transient Table table;
private transient IcebergFilesCommitterMetrics committerMetrics;
private transient ManifestOutputFileFactory manifestOutputFileFactory;
Expand Down Expand Up @@ -134,6 +136,7 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
public void initializeState(StateInitializationContext context) throws Exception {
super.initializeState(context);
this.flinkJobId = getContainingTask().getEnvironment().getJobID().toString();
this.operatorUniqueId = getRuntimeContext().getOperatorUniqueID();

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be a problem. It gets the jobVertexId (a UUID-like string). I verified this by setting uidPrefix=iceberg-sink in TestFlinkIcebergSink.

    FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
        ...
        .uidPrefix("iceberg-sink")
        .append();

Then I can see that the operatorUniqueId value is the jobVertexId, 0e4eaa3db14ae3b6752b25172fd05c50. It can change when the job graph changes (e.g. when a stateless map operator is added), and then we won't be able to match the operator id properly.

Ideally, we want to use the operator uid from Flink, which can be set by users. See the code from FlinkSink.

      if (uidPrefix != null) {
        committerStream = committerStream.uid(uidPrefix + "-committer");
      }

If the uid is not provided by the application, Flink automatically generates a UUID by default. Setting a uid for a stateful operator (like the Iceberg committer) is a best practice. Flink uses the uid to match the state description; if the uid changes, the old state won't be matched and restored. The uid works perfectly for this purpose, but I am not sure what the best way is to retrieve it in the initializeState method.

I can see the operator name in the debugger, which is also set by FlinkSink based on uidPrefix. It will be unique, since uidPrefix should be unique. Nonetheless, it is still not an ideal replacement for the uid.
image

@stevenzwu stevenzwu Jan 5, 2023

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did some more experiments. If the uidPrefix is set in FlinkSink, then it seems that getOperatorUniqueID does return a stable ID even if the job DAG changes.

If uidPrefix is not set in FlinkSink, a DAG change leads to unstable/different values from getOperatorUniqueID. This is not really a problem, because the situation would be the same if we used the operator uid. We should always set uidPrefix for FlinkSink as a best practice for the stateful committer operator.

In summary, using getOperatorUniqueID seems acceptable to me.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cc @hililiwei @openinx and see if they have other suggestions.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

operatorUniqueId is deterministically generated based on the uid of the operator, which Flink already uses to assign state to this operator. This is completely correct.


// Open the table loader and load the table.
this.tableLoader.open();
Expand All @@ -147,7 +150,6 @@ public void initializeState(StateInitializationContext context) throws Exception

int subTaskId = getRuntimeContext().getIndexOfThisSubtask();
int attemptId = getRuntimeContext().getAttemptNumber();
String operatorUniqueId = getRuntimeContext().getOperatorUniqueID();
this.manifestOutputFileFactory =
FlinkManifestUtil.createOutputFileFactory(
table, flinkJobId, operatorUniqueId, subTaskId, attemptId);
Expand Down Expand Up @@ -176,15 +178,17 @@ public void initializeState(StateInitializationContext context) throws Exception
// flink job even if it's restored from a snapshot created by another different flink job, so
// it's safe to assign the max committed checkpoint id from restored flink job to the current
// flink job.
this.maxCommittedCheckpointId = getMaxCommittedCheckpointId(table, restoredFlinkJobId);
this.maxCommittedCheckpointId =
getMaxCommittedCheckpointId(table, restoredFlinkJobId, operatorUniqueId);

NavigableMap<Long, byte[]> uncommittedDataFiles =
Maps.newTreeMap(checkpointsState.get().iterator().next())
.tailMap(maxCommittedCheckpointId, false);
if (!uncommittedDataFiles.isEmpty()) {
// Committed all uncommitted data files from the old flink job to iceberg table.
long maxUncommittedCheckpointId = uncommittedDataFiles.lastKey();
commitUpToCheckpoint(uncommittedDataFiles, restoredFlinkJobId, maxUncommittedCheckpointId);
commitUpToCheckpoint(
uncommittedDataFiles, restoredFlinkJobId, operatorUniqueId, maxUncommittedCheckpointId);
}
}
}
Expand Down Expand Up @@ -226,7 +230,7 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception {
// the files,
// Besides, we need to maintain the max-committed-checkpoint-id to be increasing.
if (checkpointId > maxCommittedCheckpointId) {
commitUpToCheckpoint(dataFilesPerCheckpoint, flinkJobId, checkpointId);
commitUpToCheckpoint(dataFilesPerCheckpoint, flinkJobId, operatorUniqueId, checkpointId);
this.maxCommittedCheckpointId = checkpointId;
} else {
LOG.info(
Expand All @@ -237,7 +241,10 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception {
}

private void commitUpToCheckpoint(
NavigableMap<Long, byte[]> deltaManifestsMap, String newFlinkJobId, long checkpointId)
NavigableMap<Long, byte[]> deltaManifestsMap,
String newFlinkJobId,
String operatorId,
long checkpointId)
throws IOException {
NavigableMap<Long, byte[]> pendingMap = deltaManifestsMap.headMap(checkpointId, true);
List<ManifestFile> manifests = Lists.newArrayList();
Expand All @@ -257,7 +264,7 @@ private void commitUpToCheckpoint(
}

CommitSummary summary = new CommitSummary(pendingResults);
commitPendingResult(pendingResults, summary, newFlinkJobId, checkpointId);
commitPendingResult(pendingResults, summary, newFlinkJobId, operatorId, checkpointId);
committerMetrics.updateCommitSummary(summary);
pendingMap.clear();
deleteCommittedManifests(manifests, newFlinkJobId, checkpointId);
Expand All @@ -267,14 +274,15 @@ private void commitPendingResult(
NavigableMap<Long, WriteResult> pendingResults,
CommitSummary summary,
String newFlinkJobId,
String operatorId,
long checkpointId) {
long totalFiles = summary.dataFilesCount() + summary.deleteFilesCount();
continuousEmptyCheckpoints = totalFiles == 0 ? continuousEmptyCheckpoints + 1 : 0;
if (totalFiles != 0 || continuousEmptyCheckpoints % maxContinuousEmptyCommits == 0) {
if (replacePartitions) {
replacePartitions(pendingResults, summary, newFlinkJobId, checkpointId);
replacePartitions(pendingResults, summary, newFlinkJobId, operatorId, checkpointId);
} else {
commitDeltaTxn(pendingResults, summary, newFlinkJobId, checkpointId);
commitDeltaTxn(pendingResults, summary, newFlinkJobId, operatorId, checkpointId);
}
continuousEmptyCheckpoints = 0;
}
Expand Down Expand Up @@ -305,6 +313,7 @@ private void replacePartitions(
NavigableMap<Long, WriteResult> pendingResults,
CommitSummary summary,
String newFlinkJobId,
String operatorId,
long checkpointId) {
Preconditions.checkState(
summary.deleteFilesCount() == 0, "Cannot overwrite partitions with delete files.");
Expand All @@ -317,13 +326,19 @@ private void replacePartitions(
}

commitOperation(
dynamicOverwrite, summary, "dynamic partition overwrite", newFlinkJobId, checkpointId);
dynamicOverwrite,
summary,
"dynamic partition overwrite",
newFlinkJobId,
operatorId,
checkpointId);
}

private void commitDeltaTxn(
NavigableMap<Long, WriteResult> pendingResults,
CommitSummary summary,
String newFlinkJobId,
String operatorId,
long checkpointId) {
if (summary.deleteFilesCount() == 0) {
// To be compatible with iceberg format V1.
Expand All @@ -334,7 +349,7 @@ private void commitDeltaTxn(
"Should have no referenced data files for append.");
Arrays.stream(result.dataFiles()).forEach(appendFiles::appendFile);
}
commitOperation(appendFiles, summary, "append", newFlinkJobId, checkpointId);
commitOperation(appendFiles, summary, "append", newFlinkJobId, operatorId, checkpointId);
} else {
// To be compatible with iceberg format V2.
for (Map.Entry<Long, WriteResult> e : pendingResults.entrySet()) {
Expand All @@ -355,7 +370,7 @@ private void commitDeltaTxn(

Arrays.stream(result.dataFiles()).forEach(rowDelta::addRows);
Arrays.stream(result.deleteFiles()).forEach(rowDelta::addDeletes);
commitOperation(rowDelta, summary, "rowDelta", newFlinkJobId, e.getKey());
commitOperation(rowDelta, summary, "rowDelta", newFlinkJobId, operatorId, e.getKey());
}
}
}
Expand All @@ -365,6 +380,7 @@ private void commitOperation(
CommitSummary summary,
String description,
String newFlinkJobId,
String operatorId,
long checkpointId) {
LOG.info(
"Committing {} for checkpoint {} to table {} with summary: {}",
Expand All @@ -377,6 +393,7 @@ private void commitOperation(
// used by the sink.
operation.set(MAX_COMMITTED_CHECKPOINT_ID, Long.toString(checkpointId));
operation.set(FLINK_JOB_ID, newFlinkJobId);
operation.set(OPERATOR_ID, operatorId);

long startNano = System.nanoTime();
operation.commit(); // abort is automatically called if this fails.
Expand All @@ -402,7 +419,7 @@ public void endInput() throws IOException {
dataFilesPerCheckpoint.put(currentCheckpointId, writeToManifest(currentCheckpointId));
writeResultsOfCurrentCkpt.clear();

commitUpToCheckpoint(dataFilesPerCheckpoint, flinkJobId, currentCheckpointId);
commitUpToCheckpoint(dataFilesPerCheckpoint, flinkJobId, operatorUniqueId, currentCheckpointId);
}

/**
Expand Down Expand Up @@ -454,14 +471,16 @@ private static ListStateDescriptor<SortedMap<Long, byte[]>> buildStateDescriptor
return new ListStateDescriptor<>("iceberg-files-committer-state", sortedMapTypeInfo);
}

static long getMaxCommittedCheckpointId(Table table, String flinkJobId) {
static long getMaxCommittedCheckpointId(Table table, String flinkJobId, String operatorId) {
Snapshot snapshot = table.currentSnapshot();
long lastCommittedCheckpointId = INITIAL_CHECKPOINT_ID;

while (snapshot != null) {
Map<String, String> summary = snapshot.summary();
String snapshotFlinkJobId = summary.get(FLINK_JOB_ID);
if (flinkJobId.equals(snapshotFlinkJobId)) {
String snapshotOperatorId = summary.get(OPERATOR_ID);
if (flinkJobId.equals(snapshotFlinkJobId)
&& (snapshotOperatorId == null || snapshotOperatorId.equals(operatorId))) {
String value = summary.get(MAX_COMMITTED_CHECKPOINT_ID);
if (value != null) {
lastCommittedCheckpointId = Long.parseLong(value);
Expand Down
Loading