Flink: Supports delete orphan files in TableMaintenance#13302
Conversation
| tableLoader.open(); | ||
| Table table = tableLoader.loadTable(); | ||
| Preconditions.checkArgument( | ||
| table.io() instanceof SupportsPrefixOperations, |
There was a problem hiding this comment.
Currently, in Flink I only support SupportsPrefixOperations IO. I believe we can add support for all IO types in a separate PR. If necessary, we can address it in this PR as well, but that would make things more complex.
There was a problem hiding this comment.
This could be problematic if we have really big tables. In this case all of the files are read into a big List, which could cause OOM.
I don't really have a nice solution for this, but I think it is important to check if we have a better solution for this.
| Table tableOri = tableLoader.loadTable(); | ||
| this.table = | ||
| MetadataTableUtils.createMetadataTableInstance(tableOri, MetadataTableType.ALL_FILES); |
There was a problem hiding this comment.
Maybe extract this from the operator, and provide this in the constructor.
It would be nice if this planner could remain independent of which table we read, and the query/table could be provided as a parameter
800da96 to
9c0c33d
Compare
| * @param pathFilter Filter to identify hidden paths | ||
| * @return List to collect matching file locations | ||
| */ | ||
| public static List<String> listDirRecursivelyWithFileIO( |
There was a problem hiding this comment.
Do we need these non-consumer based public APIs?
|
|
||
| public FileURI() {} | ||
|
|
||
| public String getScheme() { |
There was a problem hiding this comment.
we usually avoid get, set. See: https://iceberg.apache.org/contribute/#method-naming
There was a problem hiding this comment.
Hmm... so we use Flink Java serialization?
Any better suggestion @mxm?
There was a problem hiding this comment.
In order to follow the previous Spark approach, get and set methods are used here, and Java serialization is applied.
There was a problem hiding this comment.
Or should we implement our own FileURI encoder instead of using Encoders.bean?
There was a problem hiding this comment.
That's an interesting question to decide on.
We have metadata location Strings, and file system location Strings emitted from the downstream (FSList, MetadataList) operators. We need to serialize and shuffle them, so they matched by the path component, but need the scheme and the authority for matching, and uriAsString for deleting.
It is enough to have the uriAsString to travel on the wire. For the key, we need to get the path both on the emitter side, and on the AntiJoin side. It could be a fun exercise to check which performs better. OTOH the maintenance tasks are not the performance sensitive, so we can wait with the optimization until it is needed.
I would definitely opt for a Flink serializer for the FileURI class, and we can change it when it is needed. Also using Java serialization is one of the worst solution, so I would try to avoid it whenever it is not strictly needed. Especially that in this case FileURI should not change or the state will be corrupted.
There was a problem hiding this comment.
How's this using Java serialization when the class here is not Serializable? If nothing is specified, Flink will try to use its PoJo serializer, but there are certain requirements like a no-arg constructor (https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/serialization/types_serialization/#pojos). Otherwise, we will fall back to the Kryo serialization framework, which is not the worst but of course a dedicated serializer is always going to be better.
There was a problem hiding this comment.
My bad.. so Kryo, or own serializer then
| * @return for chained calls | ||
| */ | ||
| public Builder prefixMismatchMode( | ||
| org.apache.iceberg.actions.DeleteOrphanFiles.PrefixMismatchMode newPrefixMismatchMode) { |
There was a problem hiding this comment.
Using this will make it awkward for the users, as they always have to use fully qualified name, because of the name conflicts. No better idea yet, so just writing down for now.
There was a problem hiding this comment.
Why do we need to use fully qualified name here? Which other class clashes with PrefixMismatchMode?
There was a problem hiding this comment.
If the restriction is removed, it will be DeleteOrphanFiles.PrefixMismatchMode. The name DeleteOrphanFiles conflicts with a class in Flink.
There was a problem hiding this comment.
This compiles for me:
import org.apache.iceberg.actions.DeleteOrphanFiles.PrefixMismatchMode;
...
private PrefixMismatchMode prefixMismatchMode = PrefixMismatchMode.ERROR;
...
public Builder prefixMismatchMode(PrefixMismatchMode newPrefixMismatchMode) {
this.prefixMismatchMode = newPrefixMismatchMode;
return this;
}Then, from another class, this works as well:
import org.apache.iceberg.actions.DeleteOrphanFiles.PrefixMismatchMode;
...
DeleteOrphanFiles.builder().prefixMismatchMode(PrefixMismatchMode.IGNORE);There was a problem hiding this comment.
Thank you very much, this is indeed feasible.
| * @param newCaseSensitive case-sensitive or not | ||
| * @return for chained calls | ||
| */ | ||
| public Builder caseSensitive(boolean newCaseSensitive) { |
There was a problem hiding this comment.
Could you please check when would we need this? Seems strange, and I don't understand why would the user need to set this.
There was a problem hiding this comment.
To use IcebergSourceSplitSerializer, the constructor requires.So I have to exposed to user.
There was a problem hiding this comment.
Could you please dig a bit deeper so we understand why the IcebergSourceSplitSerializer needs the caseSensitive flat, and how it is used?
There was a problem hiding this comment.
In IcebergSourceSplit deserialization, caseSensitive needs to be passed down to the underlying layer. I’m not sure if this is to maintain compatibility with different source JSON systems or to support historical data formats. In toJson, caseSensitive is not required, but in fromJson, it needs to be passed.
IcebergSourceSplit.deserializeV3(serialized, caseSensitive);
FileScanTask task = ScanTaskParser.fromJson(taskJson, caseSensitive);
There was a problem hiding this comment.
You can follow the usage of the caseSensitive in the fromJson method, and you can find, that it is used when deserializing the FileScanTask. Specifically when creating the filter (ResidualEvaluator residualEvaluator = ResidualEvaluator.of(spec, filter, caseSensitive);)
ScanContext already contains caseSensitive. We can reuse it instead of requesting it again.
There was a problem hiding this comment.
You are right, I will do it !
| FileURI valid = foundInTable.value(); | ||
| FileURI actual = foundInFileSystem.value(); |
There was a problem hiding this comment.
How do we get here if the 2 authorities are not exactly the same?
There was a problem hiding this comment.
Ok.. so the keyBy is done on the getPath. What happens if we have multiple versions in the metadata for the given path? Shall we check that too?
There was a problem hiding this comment.
I understand that the scenario where multiple versions exist for a given path should only occur when the authority changes.
In HDFS, authority changes would happen when foundInFileSystem detects a change (like a namenode-host change), and in that case, those files should not be deleted.
In S3, if the authority changes, I understand it happens when switching between different S3 endpoints or regions, and those files should also not be deleted.
I am mainly following Spark’s logic here:
Please let me know if there are any gaps or misunderstandings in my reasoning. Thank you!
There was a problem hiding this comment.
Very edge case, but let's say that the authority changes, so we have multiple versions of the same path in the table metadata. Also let's say that the equalAuthorities is configured incorrectly. In this case the result will be different based on the order the files are received by the AntiJoin.
Maybe when we receive the 2nd element from the metadata, we could immediately check if the 2 values are matching or, not.
|
|
||
| @Override | ||
| public void processElement2(StreamRecord<Exception> element) throws Exception { | ||
| hasError.add(true); |
There was a problem hiding this comment.
Would it worth the complexity to start dropping filesToDelete immediately after we get an exception?
There was a problem hiding this comment.
The two streams are unordered. If an exception occurs in processElement2 and filesToDelete is cleared immediately, there is no guarantee that filesToDelete will be empty when processWatermark is called.
There was a problem hiding this comment.
What I meant:
- If there is an error, then
filesToDelete.empty(), and set a boolean flag, that new values are also dropped. - We still keep the exception in state, and handling the watermark is the same, or just emit all
filesToDelete, since they are empty anyway when there is an error
| iterator.forEachRemaining( | ||
| rowData -> { | ||
| if (rowData != null && rowData.getString(0) != null) { | ||
| out.collect(rowData.getString(0).toString()); | ||
| } | ||
| }); |
There was a problem hiding this comment.
This is basically removes the generality of the TableReader. We extract the value of the first column of the row, and emit it as a string.
Do we want to create a parent class as TableReader<R> which gets a parameter (rowData, Collector<R>)->{} method to extract the values, or call this class differently, like FileNameReader for now, and do this if we reuse it in the ManifestRewrite flow?
There was a problem hiding this comment.
I think we can first implement an initial version according to Plan One, and then make adjustments based on the specific situation when rewriting the manifest.
There was a problem hiding this comment.
What is Plan One in this case? 😄
There was a problem hiding this comment.
Create a parent class as TableReader which gets a parameter (rowData, Collector)->{} method to extract the values
There was a problem hiding this comment.
Ok. Sounds good. Go ahead and do this.
b167841 to
4e64a54
Compare
|
@mxm: One final check? |
|
@pvary Ack. Taking a look 👀 |
|
|
||
| public static class Builder extends MaintenanceTaskBuilder<DeleteOrphanFiles.Builder> { | ||
| private String location = null; | ||
| private Duration minAge = Duration.ofDays(3); |
There was a problem hiding this comment.
This is kind of arbitrary. Should we bump this to a week at least?
There was a problem hiding this comment.
The 3-day configuration is aligned with the Spark part. https://iceberg.incubator.apache.org/docs/nightly/spark-procedures/#remove_orphan_files. So I keep the same value.
| public static class Builder extends MaintenanceTaskBuilder<DeleteOrphanFiles.Builder> { | ||
| private String location = null; | ||
| private Duration minAge = Duration.ofDays(3); | ||
| private int planningWorkerPoolSize = 10; |
There was a problem hiding this comment.
Can we pull the default from an existing value for the default worker pool size?
There was a problem hiding this comment.
We can use ThreadPools.WORKER_THREAD_POOL_SIZE to install of use a number . Change it .
| private Duration minAge = Duration.ofDays(3); | ||
| private int planningWorkerPoolSize = 10; | ||
| private int deleteBatchSize = 1000; | ||
| private int maxListingDepth = 3; |
There was a problem hiding this comment.
Why is this set so low by default?
There was a problem hiding this comment.
This configuration is aligned with the Spark part. So I keep the same value here.
| private int planningWorkerPoolSize = 10; | ||
| private int deleteBatchSize = 1000; | ||
| private int maxListingDepth = 3; | ||
| private int maxListingDirectSubDirs = 10; |
There was a problem hiding this comment.
Would it make sense to limit the total number of files instead of the subdirectories?
There was a problem hiding this comment.
- Limiting
ListingDirectSubDirsandListingDepthis provided for distributed tasks. However, if we useMaxFilesCount, in my view, we can't further refine the tasks. - On the Spark side, the above approach is used to distribute the folders that cannot be processed from the driver to the workers for distributed searching. But currently, the Flink side implementation is only within a single operator, so I don't think there is much impact here for now.
WDYT?
There was a problem hiding this comment.
I don't understand (1). Why does a max file count limit the refinement? Doesn't the same apply to filtering by directory depth and number of subdirectories?
Concerning (2), wouldn't we benefit in Flink from processing a limited number of files per table maintenance? Perhaps, this should be a global option, rather than a DeleteOrphan files option.
There was a problem hiding this comment.
I understand what you mean. If we add a maximum file limit here, it indeed can shorten the execution time of a task, which is equivalent to checking whether some system files are orphan files. After thinking it through, I believe it is necessary. If we are to add this configuration item, I think it should be added at the core level.
Can we not support this feature in this PR, but instead open another PR to design and implement it?
There was a problem hiding this comment.
I would be a bit hesitant on limiting the number of files listed by the DeleteOrphanFiles.
The listing needs to list all of the files in the table directory. If we limit the number of files returned, then it would cause erratic behavior - the job runs and succeeds, but the orphan files are not removed. But some cases they might be removed depending on the listing.
If we fear that the listing is too big for a single task, we could use the Spark approach, and distribute the remaining dirs to a downstream operator. I believe that this is a worthwhile improvement, but probably worth to postpone to the next PR
There was a problem hiding this comment.
Sounds good @Guosmilesmile, but could you explain why we need the limits for listing depth and direct subdirectories? I understand that we want to have some safeguards, but why are these limits so low?
There was a problem hiding this comment.
@mxm Because these two configurations can be set by users, I aligned the default values with the Spark part when setting them.
There was a problem hiding this comment.
Can we add a comment to highlight where those defaults are coming from?
There was a problem hiding this comment.
Ok, I will do it now !
| @Override | ||
| public void open(OpenContext openContext) throws Exception { | ||
| tableLoader.open(); | ||
| Table tableOri = tableLoader.loadTable(); |
There was a problem hiding this comment.
nit: Is this originalTable?
It is usually better to not use abbreviations
| FileSystemWalker.listDirRecursivelyWithHadoop( | ||
| location, | ||
| specs, | ||
| predicate, | ||
| configuration, | ||
| maxListingDepth, | ||
| maxListingDirectSubDirs, | ||
| remainingSubDirs::add, | ||
| out::collect); | ||
| for (String remainingSubDir : remainingSubDirs) { | ||
| FileSystemWalker.listDirRecursivelyWithHadoop( | ||
| remainingSubDir, | ||
| specs, | ||
| predicate, | ||
| configuration, | ||
| Integer.MAX_VALUE, | ||
| Integer.MAX_VALUE, | ||
| dir -> {}, | ||
| out::collect); | ||
| } |
There was a problem hiding this comment.
Why not:
FileSystemWalker.listDirRecursivelyWithHadoop(
location,
specs,
predicate,
configuration,
Integer.MAX_VALUE,
Integer.MAX_VALUE,
dir -> {},
out::collect);
There was a problem hiding this comment.
In this case we don't need these configurations for now
There was a problem hiding this comment.
You are right. It was carried over when we reused it from Spark. The consideration was not thorough enough....I will remove it . Sorry.
| out.collect(new SplitInfo(splitSerializer.getVersion(), splitSerializer.serialize(split))); | ||
| } | ||
| } catch (Exception e) { | ||
| LOG.error("Exception planning scan for {} at {}", table, ctx.timestamp(), e); |
There was a problem hiding this comment.
This should be warn. The Job will not stop, but we would like to highlight this to the maintainer
There was a problem hiding this comment.
Ok, fix it now.
| } | ||
| } | ||
| } catch (Exception e) { | ||
| LOG.error("Exception listing files for {} at {}", location, ctx.timestamp(), e); |
There was a problem hiding this comment.
This should be warn. The Job will not stop, but we would like to highlight this to the maintainer
| FileURI fileUri = new FileURI(new Path(value).toUri(), equalSchemes, equalAuthorities); | ||
| return fileUri.getPath(); | ||
| } catch (Exception e) { | ||
| LOG.error("Uri convert to FileURI error! Uri is {}.", value, e); |
There was a problem hiding this comment.
This should be warn. The Job will not stop, but we would like to highlight this to the maintainer
| public void close() throws Exception { | ||
| super.close(); | ||
| tableLoader.close(); | ||
| workerPool.shutdown(); |
There was a problem hiding this comment.
Yes, I will add a check for it .
| try (DataIterator<RowData> iterator = rowDataReaderFunction.createDataIterator(split)) { | ||
| iterator.forEachRemaining(rowData -> extract(rowData, out)); | ||
| } catch (Exception e) { | ||
| LOG.error("Exception processing split {} at {}", split, ctx.timestamp(), e); |
| @Parameters(name = "usePrefixListing={0}") | ||
| public static List<Object[]> parameters() { | ||
| return Arrays.asList(new Object[] {true}, new Object[] {false}); | ||
| } |
There was a problem hiding this comment.
@Parameters(name = "usePrefixListing = {0}")
private static Object[][] parameters() {
return new Object[][] {{true}, {false}};
}
|
Merged to main. |
This PR aims to add support for deleting orphan files in TableMaintenance for Flink.
The relevant design document can be found at https://docs.google.com/document/d/16g3vR18mVBy8jbFaLjf2JwAANuYOmIwr15yDDxovdnA/edit?tab=t.0#heading=h.qirf2hu9bhwb