Spark: Provide Procedure for Catalog Register Procedure#4810
Conversation
Adds the ability to invoke a Catalog's register method via a Spark procedure. This allows a user to create a catalog entry for a metadata.json file which already exists but does not have a corrosponding catalog identifier.
|
@karuppayya Could you please take a look? |
| * @param catalogName The name of the Spark Catalog being referenced | ||
| * @return the Iceberg catalog class being wrapped by the Spark Catalog | ||
| */ | ||
| public static Catalog loadIcebergCatalog(SparkSession spark, String catalogName) { |
There was a problem hiding this comment.
This isn't strictly needed for this PR but is another API which is now possible because of this PR
There was a problem hiding this comment.
@kbendick I think you would be interested in this as well
There was a problem hiding this comment.
Thanks for tagging me @RussellSpitzer. I do think this would be really beneficial. I have a number of awkward workarounds that I've either suggsted to people or implemented myself that this would get around. So emphatic +1.
There was a problem hiding this comment.
We'd probably want to add the HasIcebergCatalog interface to other catalogs as well, though that can be handled in a follow up once the interface is in.
If / when we merge this, let's open an issue for adding HasIcebergCatalog to other catalogs. I've got a newer way of tracking opened issues I've been using so that's one way to ensure that it is at least not forgotten about =)
There was a problem hiding this comment.
One issue with that is this is a Spark Table Catalog implementing class in the current iteration, should we change that?
| import org.apache.iceberg.catalog.Catalog; | ||
| import org.apache.spark.sql.connector.catalog.TableCatalog; | ||
|
|
||
| public interface HasIcebergCatalog extends TableCatalog { |
There was a problem hiding this comment.
👍 . This is something I get asked about often enough and I don't like having to ask end users to do multiple casts.
There was a problem hiding this comment.
As a note, The reason I put this here as a public interface is that I didn't want to expose BaseCatalog
| Preconditions.checkArgument(metadataFile != null && !metadataFile.isEmpty(), | ||
| "Cannot handle an empty argument metadata_file"); | ||
|
|
||
| Catalog icebergCatalog = ((HasIcebergCatalog) tableCatalog()).icebergCatalog(); |
There was a problem hiding this comment.
Should we check if the tableCatalog is an instance of HasIcebergCatalog?
|
|
||
| @Override | ||
| public String description() { | ||
| return "MigrateTableProcedure"; |
There was a problem hiding this comment.
MigrateTableProcedure -> RegisterTableProcedure ?
| long totalManifests = spark().table(tableCatalog().name() + "." + tableName + ".manifests").count(); | ||
| long totalDataFiles = spark().table(tableCatalog().name() + "." + tableName + ".files").count(); |
There was a problem hiding this comment.
I think this will scan 2 metadata tables, right? The concern is that the perf of the procedure may not be good if the table has a lot of data/metadata files. Can we use the ones from snapshot summary? e.g. TOTAL_DATA_FILES_PROP
There was a problem hiding this comment.
I kinda wanted to do a little sanity check here that the table actually worked which is why I had the full scans here
| public static Catalog loadIcebergCatalog(SparkSession spark, String catalogName) { | ||
| CatalogPlugin catalogPlugin = spark.sessionState().catalogManager().catalog(catalogName); | ||
| Preconditions.checkArgument(catalogPlugin instanceof HasIcebergCatalog, | ||
| String.format("Cannot load Iceberg catalog from catalog %s because it is not a Spark Iceberg catalog. Actual " + |
There was a problem hiding this comment.
because it is not a Spark Iceberg catalog -> because it doesn't contain an Iceberg catalog?
|
|
||
| Catalog icebergCatalog = ((HasIcebergCatalog) tableCatalog()).icebergCatalog(); | ||
| Table table = icebergCatalog.registerTable(tableName, metadataFile); | ||
| Snapshot currentSnapshot = table.currentSnapshot(); |
There was a problem hiding this comment.
I think we need a null check here. There is an edge case, that current snapshot is null.
flyrain
left a comment
There was a problem hiding this comment.
Thanks for the PR. LGTM overall. The only thing is the null check of the current snapshot.
singhpk234
left a comment
There was a problem hiding this comment.
LGTM as well, Thanks @RussellSpitzer :) !!!
| assertEquals("Registered table rows should match original table rows", original, registered); | ||
| Assert.assertEquals("Should have the right row count in the procedure result", | ||
| numRows, result.get(0)[1]); | ||
| Assert.assertEquals("Should have the right datafile count the procedure result", |
There was a problem hiding this comment.
nit
| Assert.assertEquals("Should have the right datafile count the procedure result", | |
| Assert.assertEquals("Should have the right datafile count in the procedure result", |
| }; | ||
|
|
||
| private static final StructType OUTPUT_TYPE = new StructType(new StructField[]{ | ||
| new StructField("Current Snapshot", DataTypes.LongType, true, Metadata.empty()), |
There was a problem hiding this comment.
[minor] should we call this Current SnapshotId
kbendick
left a comment
There was a problem hiding this comment.
+1 this LGTM as well. Are we still interested in merging this @RussellSpitzer?
|
Yes sorry the merge alignment pr I just merged in was high priority and this went on my back burner |
|
Thanks @flyrain, @kbendick and @singhpk234 ! |
* Spark: Provide Procedure for Catalog Register Procedure Adds the ability to invoke a Catalog's register method via a Spark procedure. This allows a user to create a catalog entry for a metadata.json file which already exists but does not have a corresponding catalog identifier.
* Spark: Provide Procedure for Catalog Register Procedure Adds the ability to invoke a Catalog's register method via a Spark procedure. This allows a user to create a catalog entry for a metadata.json file which already exists but does not have a corresponding catalog identifier.
Adds the ability to invoke a Catalog's register method via a Spark
procedure. This allows a user to create a catalog entry for a metadata.json
file which already exists but does not have a corrosponding catalog identifier.