Adding new rewrite manifest spark action to accept custom partition order#12840
Conversation
| */ | ||
| RewriteManifests rewriteIf(Predicate<ManifestFile> predicate); | ||
|
|
||
| /** |
There was a problem hiding this comment.
I think there are still a few issues in this doc
- This should only produce a single manifest list.
- Unclear what this is supposed to produce, are we sorting or clustering here? Will manifests be separated based on values of B and D? Does the order matter?
Rewriting Manifests in this way
* will yield manifest_lists that point to manifest_files containing data files for common 'd' and
* 'b' partitions.
I think the core message here is
Rewrites the manifests and order (cluster?) their entries based on the specified transforms. Manifests can be filtered based on their min and max partition values. Choosing a frequently queried partition field can reduce planning time by skipping unnecessary manifests.
Or something along those lines
There was a problem hiding this comment.
This also shouldn't use the current spec but the one specified by specId(int specId)
There was a problem hiding this comment.
Can update documentation. Thanks!
| * bucket(N, data) partition * definition | ||
| * @return this for method chaining | ||
| */ | ||
| default RewriteManifests clusterBy(List<String> partitionFields) { |
There was a problem hiding this comment.
why cluster and not sort?
There was a problem hiding this comment.
In the original proposal we talked about updating the Spark api to match the Java API, where they have a clusterBy interface
Educate me - I see clustering and sorting as synonyms. Clustering is just the word for the spark technique to sort and repartitionByRange data into "clusters". I'm not married to either.
There was a problem hiding this comment.
Hmm that's a good question. In my eyes we are doing a hierarchical sort which feels different to me than a multi-dimensional clustering algo. So for example Cluster(a, b) might get me manifests with common tuples where A and B are correlated but we can't actually do that here.
So for example if I would expect cluster to make files like
{(1,1)(1,2)(2,1)(2,2)}
{(1,3)(1,4)(2,3)(2,4)}
{(3,1)(3,2)(4,1)(4,2)}
{(3,3)(3,4)(4,3)(4,4)}
I would consider that clustered
But our current algo can't do that, it can only do a hierarchical sort, each column is dependent on the one before it. Like in the above example if I cluster (a,b) I would produce
{(1,1)(1,2)(1,3)(1,4)}
{(2,1)(2,2)(2,3)(2,4)}
{(3,1)(3,2)(3,3)(3,4)}
{(4,1)(4,2)(4,3)(4,4)}
There was a problem hiding this comment.
Good illustration. I think the question in your illustration is - are the tuples globally sorted? And does that matter? I don't think the manifest list assumes sorting, so each manifest's lower/uppers are checked. In which case I'd call it "clustering".
Really no preference on naming. Like I said I just picked this to match the Java API. At the end of the day either of your examples is better than
{(1,4)(1,4)(1,4)(1,4)}
{(1,4)(1,4)(1,4)(1,4)}
{(1,4)(1,4)(1,4)(1,4)}
{(1,4)(1,4)(1,4)(1,4)}
:-)
There was a problem hiding this comment.
The original API "clusters" this api "sorts", I'm not sure they are comparable. I really think the naming should be "sort"
| // Check if these partition fields are included in the spec | ||
| Preconditions.checkArgument( | ||
| missingFields.isEmpty(), | ||
| "Cannot set manifest clustering because specified field(s) %s were not found in current partition spec %s.", |
There was a problem hiding this comment.
This shouldn't be the current spec, but the spec chosen for rewrite target
| List<ManifestFile> manifests = table.currentSnapshot().allManifests(table.io()); | ||
| assertThat(manifests).as("Should have 1 manifests before rewrite").hasSize(1); | ||
|
|
||
| // Capture the c3 partition's lower and upper bounds - used for later test assertions |
There was a problem hiding this comment.
The checks here are pretty complicated, I think it probably would be a lot simpler to just check that the rows within the manifest are ordered rather than going through all the manifest metadata. Our readers should already read through entries in order from the file I think ...
Then we could just check "contents before = contents after" - "row before unsorted on c3 != rows after sorted on c3 transform"
There was a problem hiding this comment.
I'll see what I can do. Contents can still be sorted locally, but not globally sorted without the clusterBy. I think you're suggesting to just say "sorted but different" is sufficient?
There was a problem hiding this comment.
I'm just trying to think how we can avoid re-implementing a check overlapping bounds function (we already have done this once in the spark rewrite datafiles test code - but for metrics) and avoid doing all the conversions from byte buffer. It's a lot of code to have in a single test case and I think it's a bit difficult to check.
Since you are using a Integer output here we probably could just use the MetadataTable - ManfiestsTable which reads the manifest list and serailzes all the partition min/maxs into strings. So you could take those flatten them into a list and check that the list is sorted
for (lower, upper <- partitionsummaries.field you want) {
bounds.add(Integer.of(lower))
bounds.add(Integer.of(upper))
}
assert that bounds is sorted
Mostly I'm looking here for a way to simplify this test so it's more clear what's doing without so much boiler plate code. If you have other suggestions I'm open those as well
I'd also like to apologize for taking so long to get back to this, please feel free to ping my on slack if I haven't noticed a github comment. My email is a bit packed so sometimes I lose important notices.
There was a problem hiding this comment.
Will simplify, thanks for staying plugged in!
…g column identifier for RewriteManifests with custom ordering
|
Made most of the changes asked for. Still undecided around clustering vs sorting naming, and I didn't change the test that verifies content ordering within manifests. |
|
This pull request has been marked as stale due to 30 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the dev@iceberg.apache.org list. Thank you for your contributions. |
|
@RussellSpitzer mind a review? |
| /** | ||
| * Rewrite manifests in a given order, based on partition field names | ||
| * | ||
| * <p>Supply an optional set of partition field names to cluster the rewritten manifests by. |
There was a problem hiding this comment.
Here and below we should be using "sort" and not "cluster"
There was a problem hiding this comment.
Updated documentation
|
Reverted signature back to sortBy at @RussellSpitzer's request |
|
This pull request has been marked as stale due to 30 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the dev@iceberg.apache.org list. Thank you for your contributions. |
|
@RussellSpitzer anything else you want here? |
|
@RussellSpitzer thanks for the tip on the unit test. Spot on with your simplification idea. Hope this is what you're looking for! |
|
Merged! Thanks @zachdisc for the contribution. You should now be able to get the integration tests to run automatically. If you have a chance please backwards and forwards port your changes to the other spark versions. I assume they will be clean merges so we can do a fast review on those and get them in as well. |
Note this is a fresh PR replacing #9731. It had too much accumulated conflicts and changes, I rebased and messed it up. This is a clean start with all previous feedback incorporated.
What
This adds a simple
sortmethod to theRewriteManifestsspark action which lets user specify the partition column order to consider when grouping manifests.Illustration:
Closes #9615
Why
Iceberg's metadata is organized into a forest of manifest_files which point to data files sharing common partitions. By default, and during
RewriteManifests, the partition grouping is determined by the defaultSpecpartition order. If the primary query pattern is more aligned with the last partition in the table's spec, manifests are poorly suited to quickly plan and prune around those partitions.EG
Will create manifests that first group by
region, whosemanifest_filecontents may span a wide range ofevent_timevalues. For a primary query pattern that doesn't care aboutregion,storeId, etc, this leads to inefficient queries.