Core: Optimize DeleteFileIndex#8157
Conversation
|
|
||
| // a delete file wrapper that caches the converted boundaries for faster boundary checks | ||
| // this class is not meant to be exposed beyond the delete file index | ||
| private static class IndexedDeleteFile { |
|
|
||
| DeleteFileIndex( | ||
| Map<Integer, PartitionSpec> specsById, | ||
| Map<Integer, PartitionSpec> specs, |
There was a problem hiding this comment.
Renamed specsById to stay on one line below.
There was a problem hiding this comment.
Do we need to care about style like this later?
There was a problem hiding this comment.
I feel like it is OK to do given that this PR already contains a few cosmetic changes.
| } | ||
|
|
||
| DeleteFile[] forDataFile(long sequenceNumber, DataFile file) { | ||
| if (isEmpty) { |
There was a problem hiding this comment.
No need to derive the partition and do anything with the streams if the index is empty.
| // read all of the matching delete manifests in parallel and accumulate the matching files in | ||
| // a queue | ||
| Queue<ManifestEntry<DeleteFile>> deleteEntries = new ConcurrentLinkedQueue<>(); | ||
| Queue<DeleteFile> files = new ConcurrentLinkedQueue<>(); |
There was a problem hiding this comment.
I will need this change for distributed planning as well.
|
|
||
| private static <T> boolean rangesOverlap( | ||
| Type.PrimitiveType type, | ||
| Types.NestedField field, |
There was a problem hiding this comment.
Passing field to stay on a single line when calling this method.
6b0b1ba to
745d9e3
Compare
| return Arrays.stream(files, start, files.length); | ||
| } | ||
|
|
||
| private static DeleteFileGroup index( |
There was a problem hiding this comment.
These index methods only exist because the old constructor is package private and is used in tests. I had to keep that for compatibility.
| if (convertedLowerBounds == null) { | ||
| synchronized (this) { | ||
| if (convertedLowerBounds == null) { | ||
| this.convertedLowerBounds = convertBounds(wrapped.lowerBounds()); |
There was a problem hiding this comment.
Is the idea here to convert all bounds at once to avoid a null check in lowerBound?
If so, I don't think this is a good idea. The delete index is only going to use a few bound values (overlap uses only the equality delete's equality field ID set), so converting all of them is probably unnecessary. Plus, calling lowerBounds() in lowerBound(int) already incurs a null check to see if the bounds are converted. So lazily converting each lower bound is probably no more cost.
There was a problem hiding this comment.
The underlying method convertBounds only converts file path for position deletes and equality field ids for equality deletes. You are right, they are converted at the same time. I did that to avoid using a concurrent hash map and computeIfAbsent (which has performance issues).
There was a problem hiding this comment.
Ah, I see. Sounds good then.
There was a problem hiding this comment.
I am still debating. We probably need a concurrent hash map to load each value one by one, right?
There was a problem hiding this comment.
Here is the problem I was talking about: https://bugs.openjdk.org/browse/JDK-8161372
There was a problem hiding this comment.
Here is the problem I was talking about: https://bugs.openjdk.org/browse/JDK-8161372
This should help then: apache/uniffle#766
There was a problem hiding this comment.
I spent a bit more time thinking about this and I don't think it would be worth the extra complexity. Let's keep this as is for now. We only index equality IDs and all columns must be checked to discard a file.
I also checked Caffeine caches and they have some workarounds but I don't think we need them here.
|
|
||
| } else { | ||
| for (int id : equalityFieldIds()) { | ||
| Type type = spec.schema().findField(id).type(); |
There was a problem hiding this comment.
I just checked and spec.schema() is used in both conversions so the type should always match.
rdblue
left a comment
There was a problem hiding this comment.
Looks correct overall. I had some comments about mostly minor things.
|
@rdblue @aokolnychyi you might be interested in revisit this PR again #5760 . |
| T deleteUpper) { | ||
| Type.PrimitiveType type = field.type().asPrimitiveType(); | ||
| Comparator<T> cmp = Comparators.forType(type); | ||
| T dataLower = Conversions.fromByteBuffer(type, dataLowerBuf); |
There was a problem hiding this comment.
So, the most performance improvement comes here.
|
|
||
| return comparator.compare(deleteLower, dataUpper) <= 0 | ||
| && comparator.compare(dataLower, deleteUpper) <= 0; | ||
| return cmp.compare(deleteLower, dataUpper) <= 0 && cmp.compare(dataLower, deleteUpper) <= 0; |
There was a problem hiding this comment.
Here, maybe we can avoid the conversion for the upper. But this is a little improvement.
There was a problem hiding this comment.
Updated to match the checks we have for the file path.
745d9e3 to
32a3d1b
Compare
|
@zinking, we have discussed it a bit during one of the community syncs. I'll take another look over the weekend. |
|
I'll merge this one to unblock the distributed planning effort. Thanks everyone for reviewing! |

This PR improves and refactors
DeleteFileIndex.dataSequenceNumberfromContentFileinstead ofManifestEntryto support distributed planning in the future.This change relies on existing tests and adds a new benchmark.
Results prior to this change:
Results after this change:
This would be even more important for equality deletes.