Flink: Supports delete orphan files in TableMaintenance#13302

Merged
pvary merged 21 commits into apache:main from Guosmilesmile:deleteOrphanFiles
Aug 21, 2025
Conversation

@Guosmilesmile

Contributor

This PR aims to add support for deleting orphan files in TableMaintenance for Flink.

The relevant design document can be found at https://docs.google.com/document/d/16g3vR18mVBy8jbFaLjf2JwAANuYOmIwr15yDDxovdnA/edit?tab=t.0#heading=h.qirf2hu9bhwb

tableLoader.open();
Table table = tableLoader.loadTable();
Preconditions.checkArgument(
table.io() instanceof SupportsPrefixOperations,

Contributor Author

Currently, the Flink implementation only supports FileIO implementations that implement SupportsPrefixOperations. I believe we can add support for all IO types in a separate PR. If necessary, we can address it in this PR as well, but that would make things more complex.

Contributor

This could be problematic if we have really big tables. In this case all of the files are read into a big List, which could cause OOM.

I don't really have a nice solution for this, but I think it is important to check if we have a better solution for this.

Comment on lines +80 to +82
Table tableOri = tableLoader.loadTable();
this.table =
MetadataTableUtils.createMetadataTableInstance(tableOri, MetadataTableType.ALL_FILES);

Contributor

Maybe extract this from the operator, and provide this in the constructor.
It would be nice if this planner could remain independent of which table we read, and the query/table could be provided as a parameter

@Guosmilesmile Guosmilesmile force-pushed the deleteOrphanFiles branch 2 times, most recently from 800da96 to 9c0c33d Compare June 18, 2025 08:55
* @param pathFilter Filter to identify hidden paths
* @return List to collect matching file locations
*/
public static List<String> listDirRecursivelyWithFileIO(

Contributor

Do we need these non-consumer based public APIs?


public FileURI() {}

public String getScheme() {

Contributor

Hmm... so we use Flink Java serialization?
Any better suggestion @mxm?

Contributor Author

public static final Encoder<FileURI> ENCODER = Encoders.bean(FileURI.class);

In order to follow the previous Spark approach, get and set methods are used here, and Java serialization is applied.

Contributor Author

Or should we implement our own FileURI encoder instead of using Encoders.bean?

Contributor

That's an interesting question to decide on.
We have metadata location Strings and file system location Strings emitted from the downstream (FSList, MetadataList) operators. We need to serialize and shuffle them so that they are matched by the path component, but we also need the scheme and the authority for matching, and uriAsString for deleting.

It is enough to have the uriAsString travel on the wire. For the key, we need to get the path both on the emitter side and on the AntiJoin side. It could be a fun exercise to check which performs better. OTOH, the maintenance tasks are not performance sensitive, so we can wait with the optimization until it is needed.

I would definitely opt for a Flink serializer for the FileURI class, and we can change it when it is needed. Also, Java serialization is one of the worst solutions, so I would try to avoid it whenever it is not strictly needed, especially since in this case FileURI must not change or the state will be corrupted.
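
As a rough illustration of the keying described above, the following plain-Java sketch splits a location into a path (the shuffle key), a normalized scheme and authority (for matching), and the original string (for deleting). The class name FileUriSketch, its fields, and the sameFile method are assumptions made for this example; this is not the FileURI class from the PR, and it says nothing about which Flink serializer should carry it.

```java
import java.net.URI;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;

/** Hypothetical stand-in for the PR's FileURI class; all names here are assumptions. */
final class FileUriSketch {
  final String scheme;      // normalized scheme, used only for matching
  final String authority;   // normalized authority, used only for matching
  final String path;        // the shuffle key
  final String uriAsString; // original location, used for the actual delete

  FileUriSketch(
      String location, Map<String, String> equalSchemes, Map<String, String> equalAuthorities) {
    URI uri = URI.create(location);
    this.scheme = normalize(uri.getScheme(), equalSchemes);
    this.authority = normalize(uri.getAuthority(), equalAuthorities);
    this.path = uri.getPath();
    this.uriAsString = location;
  }

  // Maps equivalent values (for example s3a to s3) to one canonical value.
  private static String normalize(String value, Map<String, String> equivalents) {
    return value == null ? null : equivalents.getOrDefault(value, value);
  }

  // Two locations refer to the same file when path, scheme and authority all agree.
  boolean sameFile(FileUriSketch other) {
    return path.equals(other.path)
        && Objects.equals(scheme, other.scheme)
        && Objects.equals(authority, other.authority);
  }

  public static void main(String[] args) {
    Map<String, String> schemes = Collections.singletonMap("s3a", "s3");
    Map<String, String> authorities = Collections.emptyMap();
    FileUriSketch fromMetadata =
        new FileUriSketch("s3://bucket/db/t/data/f1.parquet", schemes, authorities);
    FileUriSketch fromListing =
        new FileUriSketch("s3a://bucket/db/t/data/f1.parquet", schemes, authorities);
    System.out.println("key: " + fromListing.path);
    System.out.println("same file: " + fromMetadata.sameFile(fromListing));
  }
}
```

Only uriAsString has to travel between operators in such a design; the other three fields can be recomputed on either side, which is the trade-off the comment above leaves open.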

Contributor

How's this using Java serialization when the class here is not Serializable? If nothing is specified, Flink will try to use its POJO serializer, but there are certain requirements like a no-arg constructor (https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/serialization/types_serialization/#pojos). Otherwise, we will fall back to the Kryo serialization framework, which is not the worst, but of course a dedicated serializer is always going to be better.

Contributor

My bad.. so Kryo, or own serializer then

* @return for chained calls
*/
public Builder prefixMismatchMode(
org.apache.iceberg.actions.DeleteOrphanFiles.PrefixMismatchMode newPrefixMismatchMode) {

Contributor

Using this will make it awkward for users, as they always have to use the fully qualified name because of the name conflicts. No better idea yet, so just writing it down for now.

Contributor

Why do we need to use fully qualified name here? Which other class clashes with PrefixMismatchMode?

Contributor Author

If the restriction is removed, it will be DeleteOrphanFiles.PrefixMismatchMode. The name DeleteOrphanFiles conflicts with a class in Flink.

Contributor

This compiles for me:

import org.apache.iceberg.actions.DeleteOrphanFiles.PrefixMismatchMode;
...
    private PrefixMismatchMode prefixMismatchMode = PrefixMismatchMode.ERROR;
...
    public Builder prefixMismatchMode(PrefixMismatchMode newPrefixMismatchMode) {
      this.prefixMismatchMode = newPrefixMismatchMode;
      return this;
    }

Then, from another class, this works as well:

import org.apache.iceberg.actions.DeleteOrphanFiles.PrefixMismatchMode;
...
      DeleteOrphanFiles.builder().prefixMismatchMode(PrefixMismatchMode.IGNORE);

Contributor Author

Thank you very much, this is indeed feasible.

* @param newCaseSensitive case-sensitive or not
* @return for chained calls
*/
public Builder caseSensitive(boolean newCaseSensitive) {

Contributor

Could you please check when we would need this? It seems strange, and I don't understand why the user would need to set this.

@Guosmilesmile Guosmilesmile Jun 21, 2025

Contributor Author

this.splitSerializer = new IcebergSourceSplitSerializer(caseSensitive);

The IcebergSourceSplitSerializer constructor requires it, so I had to expose it to the user.

Contributor

Could you please dig a bit deeper so we understand why the IcebergSourceSplitSerializer needs the caseSensitive flag, and how it is used?

Contributor Author

In IcebergSourceSplit deserialization, caseSensitive needs to be passed down to the underlying layer. I’m not sure if this is to maintain compatibility with different source JSON systems or to support historical data formats. In toJson, caseSensitive is not required, but in fromJson, it needs to be passed.

IcebergSourceSplit.deserializeV3(serialized, caseSensitive);
FileScanTask task = ScanTaskParser.fromJson(taskJson, caseSensitive);

Contributor

You can follow the usage of caseSensitive in the fromJson method, and you will find that it is used when deserializing the FileScanTask, specifically when creating the filter (ResidualEvaluator residualEvaluator = ResidualEvaluator.of(spec, filter, caseSensitive);).

ScanContext already contains caseSensitive. We can reuse it instead of requesting it again.

Contributor Author

You are right, I will do it!

Comment on lines +78 to +79
FileURI valid = foundInTable.value();
FileURI actual = foundInFileSystem.value();

Contributor

How do we get here if the 2 authorities are not exactly the same?

Contributor

Ok.. so the keyBy is done on the getPath. What happens if we have multiple versions in the metadata for the given path? Shall we check that too?

Contributor Author

I understand that the scenario where multiple versions exist for a given path should only occur when the authority changes.

In HDFS, authority changes would happen when foundInFileSystem detects a change (like a namenode-host change), and in that case, those files should not be deleted.
In S3, if the authority changes, I understand it happens when switching between different S3 endpoints or regions, and those files should also not be deleted.
I am mainly following Spark’s logic here:

Column joinCond = actualFileIdentDS.col("path").equalTo(validFileIdentDS.col("path"));
List<String> orphanFiles =
    actualFileIdentDS
        .joinWith(validFileIdentDS, joinCond, "leftouter")
        .mapPartitions(new FindOrphanFiles(prefixMismatchMode, conflicts), Encoders.STRING())
        .collectAsList();

Please let me know if there are any gaps or misunderstandings in my reasoning. Thank you!

Contributor

This is very much an edge case, but let's say that the authority changes, so we have multiple versions of the same path in the table metadata. Also, let's say that equalAuthorities is configured incorrectly. In this case, the result will differ based on the order in which the files are received by the AntiJoin.

Maybe when we receive the 2nd element from the metadata, we could immediately check whether the 2 values match or not.
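
One way to picture the order-dependence concern is a path-keyed anti-join that keeps every metadata entry for a path and compares a listed file against all of them, so the result does not depend on which entry arrived first. The sketch below is plain Java under that assumption. AntiJoinSketch, findOrphans and samePrefix are hypothetical names, the local MismatchMode enum only mirrors the ERROR, IGNORE and DELETE names of Iceberg's PrefixMismatchMode, and none of this is the AntiJoin operator from the PR.

```java
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

final class AntiJoinSketch {
  // Redefined locally to keep the sketch self-contained.
  enum MismatchMode { ERROR, IGNORE, DELETE }

  static List<String> findOrphans(
      List<String> inMetadata, List<String> inFileSystem, MismatchMode mode) {
    // Collect every metadata entry per path, so the outcome is independent of arrival order.
    Map<String, List<URI>> validByPath = new HashMap<>();
    for (String location : inMetadata) {
      URI uri = URI.create(location);
      validByPath.computeIfAbsent(uri.getPath(), key -> new ArrayList<>()).add(uri);
    }

    List<String> orphans = new ArrayList<>();
    for (String location : inFileSystem) {
      URI actual = URI.create(location);
      List<URI> valid = validByPath.get(actual.getPath());
      if (valid == null) {
        orphans.add(location); // no metadata entry for this path: plain orphan
      } else if (valid.stream().noneMatch(candidate -> samePrefix(candidate, actual))) {
        // The path matches, but scheme or authority differs from every metadata entry.
        if (mode == MismatchMode.ERROR) {
          throw new IllegalStateException("Prefix mismatch for " + location);
        } else if (mode == MismatchMode.DELETE) {
          orphans.add(location);
        }
        // IGNORE: keep the file.
      }
    }
    return orphans;
  }

  private static boolean samePrefix(URI left, URI right) {
    return Objects.equals(left.getScheme(), right.getScheme())
        && Objects.equals(left.getAuthority(), right.getAuthority());
  }

  public static void main(String[] args) {
    List<String> metadata = Arrays.asList("s3://bucket/t/data/a.parquet");
    List<String> listing =
        Arrays.asList(
            "s3://bucket/t/data/a.parquet",
            "s3://bucket/t/data/orphan.parquet",
            "s3://other/t/data/a.parquet");
    System.out.println(findOrphans(metadata, listing, MismatchMode.IGNORE));
  }
}
```

A streaming operator cannot hold all metadata entries up front the way this batch sketch does, so the equivalent there would be to keep the per-key entries in state until the watermark arrives.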


@Override
public void processElement2(StreamRecord<Exception> element) throws Exception {
hasError.add(true);

Contributor

Would it be worth the complexity to start dropping filesToDelete immediately after we get an exception?

Contributor Author

The two streams are unordered. If an exception occurs in processElement2 and filesToDelete is cleared immediately, there is no guarantee that filesToDelete will be empty when processWatermark is called.

Contributor

What I meant:

  • If there is an error, then filesToDelete.empty(), and set a boolean flag so that new values are also dropped.
  • We still keep the exception in state, and handling the watermark is the same, or we just emit all filesToDelete, since they are empty anyway when there is an error.
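
The two bullets above can be sketched in plain Java as a small buffer that clears itself on the first error and ignores later files until the watermark. DeleteBufferSketch and its method names are hypothetical, and ordinary fields stand in for Flink state here, so this only illustrates the control flow and not the operator itself.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical buffer; plain fields stand in for the operator's Flink state. */
final class DeleteBufferSketch {
  private final List<String> filesToDelete = new ArrayList<>();
  private final List<Exception> errors = new ArrayList<>();
  private boolean dropping = false;

  // Called for every candidate file; ignored once an error has been seen.
  void onFile(String file) {
    if (!dropping) {
      filesToDelete.add(file);
    }
  }

  // Called for every upstream error: remember it, free the buffer, drop later files.
  void onError(Exception error) {
    errors.add(error);
    filesToDelete.clear();
    dropping = true;
  }

  // Called on watermark: emit whatever is buffered (nothing after an error) and reset.
  List<String> onWatermark() {
    List<String> toEmit = new ArrayList<>(filesToDelete);
    filesToDelete.clear();
    errors.clear();
    dropping = false;
    return toEmit;
  }

  public static void main(String[] args) {
    DeleteBufferSketch buffer = new DeleteBufferSketch();
    buffer.onFile("s3://bucket/t/data/a.parquet");
    buffer.onError(new RuntimeException("listing failed"));
    buffer.onFile("s3://bucket/t/data/b.parquet"); // arrives after the error, dropped
    System.out.println(buffer.onWatermark());
  }
}
```

Because the flag is checked on every incoming file, the arrival order of the two streams no longer matters: the buffer is empty at the watermark whenever an error was seen in that cycle.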

Comment on lines +99 to +105
iterator.forEachRemaining(
    rowData -> {
      if (rowData != null && rowData.getString(0) != null) {
        out.collect(rowData.getString(0).toString());
      }
    });

Contributor

This basically removes the generality of the TableReader. We extract the value of the first column of the row, and emit it as a string.
Do we want to create a parent class as TableReader<R> which gets a parameter (rowData, Collector<R>)->{} method to extract the values, or call this class differently, like FileNameReader for now, and do this if we reuse it in the ManifestRewrite flow?

Contributor Author

I think we can first implement an initial version according to Plan One, and then make adjustments based on the specific situation when rewriting the manifest.

Contributor

What is Plan One in this case? 😄

Contributor Author

Create a parent class as TableReader which gets a parameter (rowData, Collector)->{} method to extract the values

Contributor

Ok. Sounds good. Go ahead and do this.
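
The parent-class approach agreed on here might look roughly like the following plain-Java sketch. Object[] stands in for Flink's RowData and java.util.function.Consumer for Flink's Collector, and the names TableReaderSketch and FileNameReaderSketch are assumptions made for illustration rather than the classes added by the PR.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

/** Generic reader: subclasses decide what to extract from each row. */
abstract class TableReaderSketch<R> {
  // Object[] is a stand-in for RowData, Consumer<R> for Collector<R>.
  abstract void extract(Object[] row, Consumer<R> out);

  final void readAll(Iterable<Object[]> rows, Consumer<R> out) {
    for (Object[] row : rows) {
      extract(row, out);
    }
  }
}

/** Emits the first column of each row as a String, skipping null rows and null values. */
final class FileNameReaderSketch extends TableReaderSketch<String> {
  @Override
  void extract(Object[] row, Consumer<String> out) {
    if (row != null && row.length > 0 && row[0] != null) {
      out.accept(row[0].toString());
    }
  }

  public static void main(String[] args) {
    List<Object[]> rows =
        Arrays.asList(new Object[] {"s3://bucket/t/data/a.parquet"}, null, new Object[] {null});
    List<String> fileNames = new ArrayList<>();
    new FileNameReaderSketch().readAll(rows, fileNames::add);
    System.out.println(fileNames);
  }
}
```

A later ManifestRewrite flow could then add its own subclass with a different type parameter without touching the row-iteration logic.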

@Guosmilesmile

Contributor Author

Rebased onto main. Since #13429 has been merged, I think we can move this PR forward.
@pvary If you have time, please help review it again, thanks!

GuoYu.

@pvary

pvary commented Aug 12, 2025

Contributor

@mxm: One final check?

@mxm

mxm commented Aug 12, 2025

Contributor

@pvary Ack. Taking a look 👀


public static class Builder extends MaintenanceTaskBuilder<DeleteOrphanFiles.Builder> {
private String location = null;
private Duration minAge = Duration.ofDays(3);

Contributor

This is kind of arbitrary. Should we bump this to a week at least?

Contributor Author

The 3-day default is aligned with the Spark procedure: https://iceberg.incubator.apache.org/docs/nightly/spark-procedures/#remove_orphan_files. So I kept the same value.

public static class Builder extends MaintenanceTaskBuilder<DeleteOrphanFiles.Builder> {
private String location = null;
private Duration minAge = Duration.ofDays(3);
private int planningWorkerPoolSize = 10;

Contributor

Can we pull the default from an existing value for the default worker pool size?

Contributor Author

We can use ThreadPools.WORKER_THREAD_POOL_SIZE instead of a hard-coded number. I will change it.

private Duration minAge = Duration.ofDays(3);
private int planningWorkerPoolSize = 10;
private int deleteBatchSize = 1000;
private int maxListingDepth = 3;

Contributor

Why is this set so low by default?

Contributor Author

This configuration is aligned with the Spark part. So I keep the same value here.

private int planningWorkerPoolSize = 10;
private int deleteBatchSize = 1000;
private int maxListingDepth = 3;
private int maxListingDirectSubDirs = 10;

Contributor

Would it make sense to limit the total number of files instead of the subdirectories?

Contributor Author

  1. Limiting ListingDirectSubDirs and ListingDepth is provided for distributed tasks. However, if we use MaxFilesCount, in my view, we can't further refine the tasks.
  2. On the Spark side, the above approach is used to distribute the folders that cannot be processed from the driver to the workers for distributed searching. But currently, the Flink side implementation is only within a single operator, so I don't think there is much impact here for now.
    WDYT?

Contributor

I don't understand (1). Why does a max file count limit the refinement? Doesn't the same apply to filtering by directory depth and number of subdirectories?

Concerning (2), wouldn't we benefit in Flink from processing a limited number of files per table maintenance? Perhaps this should be a global option, rather than a DeleteOrphanFiles option.

Contributor Author

I understand what you mean. If we add a maximum file limit here, it indeed can shorten the execution time of a task, which is equivalent to checking whether some system files are orphan files. After thinking it through, I believe it is necessary. If we are to add this configuration item, I think it should be added at the core level.

Can we not support this feature in this PR, but instead open another PR to design and implement it?

Contributor

I would be a bit hesitant about limiting the number of files listed by DeleteOrphanFiles.
The listing needs to list all of the files in the table directory. If we limit the number of files returned, it would cause erratic behavior: the job runs and succeeds, but the orphan files are not removed. In some cases, though, they might be removed, depending on the listing.

If we fear that the listing is too big for a single task, we could use the Spark approach and distribute the remaining dirs to a downstream operator. I believe that this is a worthwhile improvement, but it is probably worth postponing to the next PR.

Contributor Author

@pvary @mxm I think it's feasible to distribute the remaining directories to a downstream operator, and we can do that in the next PR.
Are there any other issues here that we need to address? Can we move forward with this PR?
Thanks!

Contributor

Sounds good @Guosmilesmile, but could you explain why we need the limits for listing depth and direct subdirectories? I understand that we want to have some safeguards, but why are these limits so low?

Contributor Author

@mxm Because these two configurations can be set by users, I aligned the default values with the Spark part when setting them.

private static final int MAX_DRIVER_LISTING_DEPTH = 3;
private static final int MAX_DRIVER_LISTING_DIRECT_SUB_DIRS = 10;
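
To make the role of these two limits concrete, here is a simplified plain-Java sketch of a bounded recursive listing: it stops descending once the depth budget is used up or a directory has too many direct sub-directories, and hands the unvisited directories to a callback so that something else can list them later. An in-memory map stands in for the file system, and BoundedListingSketch and its list method are hypothetical names; this is not the FileSystemWalker API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

final class BoundedListingSketch {
  // tree maps a directory path to its child names; a child is a directory if it is a key itself.
  static void list(
      Map<String, List<String>> tree,
      String dir,
      int maxDepth,
      int maxDirectSubDirs,
      Consumer<String> remainingSubDirs,
      Consumer<String> files) {
    if (maxDepth <= 0) {
      remainingSubDirs.accept(dir); // depth budget used up: defer this directory
      return;
    }
    List<String> subDirs = new ArrayList<>();
    for (String child : tree.getOrDefault(dir, Collections.emptyList())) {
      String fullPath = dir + "/" + child;
      if (tree.containsKey(fullPath)) {
        subDirs.add(fullPath);
      } else {
        files.accept(fullPath);
      }
    }
    if (subDirs.size() > maxDirectSubDirs) {
      subDirs.forEach(remainingSubDirs); // too wide: defer all sub-directories
      return;
    }
    for (String subDir : subDirs) {
      list(tree, subDir, maxDepth - 1, maxDirectSubDirs, remainingSubDirs, files);
    }
  }

  public static void main(String[] args) {
    Map<String, List<String>> tree = new HashMap<>();
    tree.put("root", Arrays.asList("f0", "a"));
    tree.put("root/a", Arrays.asList("f1", "b"));
    tree.put("root/a/b", Arrays.asList("f2"));

    List<String> files = new ArrayList<>();
    List<String> remaining = new ArrayList<>();
    list(tree, "root", 1, 10, remaining::add, files::add);
    System.out.println("files: " + files + ", deferred: " + remaining);
  }
}
```

The limits only pay off when the deferred directories are listed somewhere else in parallel. If the same task simply lists them afterwards without limits, the result is the same set of files as one unbounded pass.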

Contributor

Can we add a comment to highlight where those defaults are coming from?

Contributor Author

Ok, I will do it now!

@Override
public void open(OpenContext openContext) throws Exception {
tableLoader.open();
Table tableOri = tableLoader.loadTable();

Contributor

nit: Is this originalTable?
It is usually better to not use abbreviations

Contributor Author

Fixed.

Comment on lines +120 to +139
FileSystemWalker.listDirRecursivelyWithHadoop(
    location,
    specs,
    predicate,
    configuration,
    maxListingDepth,
    maxListingDirectSubDirs,
    remainingSubDirs::add,
    out::collect);
for (String remainingSubDir : remainingSubDirs) {
  FileSystemWalker.listDirRecursivelyWithHadoop(
      remainingSubDir,
      specs,
      predicate,
      configuration,
      Integer.MAX_VALUE,
      Integer.MAX_VALUE,
      dir -> {},
      out::collect);
}

Contributor

Why not:

          FileSystemWalker.listDirRecursivelyWithHadoop(
              location,
              specs,
              predicate,
              configuration,
              Integer.MAX_VALUE,
              Integer.MAX_VALUE,
              dir -> {},
              out::collect);

Contributor

In this case we don't need these configurations for now

Contributor Author

You are right. It was carried over when we reused the code from Spark, and I did not think it through thoroughly enough. I will remove it. Sorry.

out.collect(new SplitInfo(splitSerializer.getVersion(), splitSerializer.serialize(split)));
}
} catch (Exception e) {
LOG.error("Exception planning scan for {} at {}", table, ctx.timestamp(), e);

Contributor

This should be warn. The Job will not stop, but we would like to highlight this to the maintainer

Contributor Author

Ok, fix it now.

}
}
} catch (Exception e) {
LOG.error("Exception listing files for {} at {}", location, ctx.timestamp(), e);

Contributor

This should be warn. The Job will not stop, but we would like to highlight this to the maintainer

FileURI fileUri = new FileURI(new Path(value).toUri(), equalSchemes, equalAuthorities);
return fileUri.getPath();
} catch (Exception e) {
LOG.error("Uri convert to FileURI error! Uri is {}.", value, e);

Contributor

This should be warn. The Job will not stop, but we would like to highlight this to the maintainer

public void close() throws Exception {
super.close();
tableLoader.close();
workerPool.shutdown();

Contributor

should we add a null check?

Contributor Author

Yes, I will add a check for it.
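
A null-guarded close() along these lines would cover the case where close() is called before open() has created the pool. The sketch is plain Java with a hypothetical WorkerPoolHolderSketch class and a fixed pool size of 2 chosen only for the example; the real operator also closes its table loader, which is left out here.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical holder showing a close() that tolerates a pool that was never created. */
final class WorkerPoolHolderSketch implements AutoCloseable {
  private ExecutorService workerPool; // stays null until open() is called

  void open() {
    workerPool = Executors.newFixedThreadPool(2);
  }

  ExecutorService pool() {
    return workerPool;
  }

  @Override
  public void close() {
    if (workerPool != null) {
      workerPool.shutdown();
      workerPool = null; // makes a second close() a no-op
    }
  }

  public static void main(String[] args) {
    WorkerPoolHolderSketch neverOpened = new WorkerPoolHolderSketch();
    neverOpened.close(); // no NullPointerException

    WorkerPoolHolderSketch opened = new WorkerPoolHolderSketch();
    opened.open();
    ExecutorService pool = opened.pool();
    opened.close();
    System.out.println("pool shut down: " + pool.isShutdown());
  }
}
```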

try (DataIterator<RowData> iterator = rowDataReaderFunction.createDataIterator(split)) {
iterator.forEachRemaining(rowData -> extract(rowData, out));
} catch (Exception e) {
LOG.error("Exception processing split {} at {}", split, ctx.timestamp(), e);

Contributor

warn

Comment on lines +41 to +44
@Parameters(name = "usePrefixListing={0}")
public static List<Object[]> parameters() {
return Arrays.asList(new Object[] {true}, new Object[] {false});
}

Contributor

  @Parameters(name = "usePrefixListing = {0}")
  private static Object[][] parameters() {
    return new Object[][] {{true}, {false}};
  }

@pvary pvary merged commit 7d220eb into apache:main Aug 21, 2025
18 checks passed
@pvary

pvary commented Aug 21, 2025

Contributor

Merged to main.
Thanks for the PR and the many updates @Guosmilesmile and @mxm for the review!

@Guosmilesmile

Contributor Author

@pvary @mxm Thank you very much for your hard work in the review and your guidance!
