Skip to content

Spark: Provide Procedure for Catalog Register Procedure#4810

Merged
RussellSpitzer merged 7 commits into
apache:masterfrom
RussellSpitzer:registerTableProcedure
Jun 3, 2022
Merged

Spark: Provide Procedure for Catalog Register Procedure#4810
RussellSpitzer merged 7 commits into
apache:masterfrom
RussellSpitzer:registerTableProcedure

Conversation

@RussellSpitzer

Copy link
Copy Markdown
Member

Adds the ability to invoke a Catalog's register method via a Spark
procedure. This allows a user to create a catalog entry for a metadata.json
file which already exists but does not have a corrosponding catalog identifier.

Adds the ability to invoke a Catalog's register method via a Spark
procedure. This allows a user to create a catalog entry for a metadata.json
file which already exists but does not have a corrosponding catalog identifier.
@RussellSpitzer

Copy link
Copy Markdown
Member Author

@karuppayya Could you please take a look?
@flyrain This is the API we were discussing

* @param catalogName The name of the Spark Catalog being referenced
* @return the Iceberg catalog class being wrapped by the Spark Catalog
*/
public static Catalog loadIcebergCatalog(SparkSession spark, String catalogName) {

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This isn't strictly needed for this PR but is another API which is now possible because of this PR

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kbendick I think you would be interested in this as well

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for tagging me @RussellSpitzer. I do think this would be really beneficial. I have a number of awkward workarounds that I've either suggsted to people or implemented myself that this would get around. So emphatic +1.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We'd probably want to add the HasIcebergCatalog interface to other catalogs as well, though that can be handled in a follow up once the interface is in.

If / when we merge this, let's open an issue for adding HasIcebergCatalog to other catalogs. I've got a newer way of tracking opened issues I've been using so that's one way to ensure that it is at least not forgotten about =)

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One issue with that is this is a Spark Table Catalog implementing class in the current iteration, should we change that?

import org.apache.iceberg.catalog.Catalog;
import org.apache.spark.sql.connector.catalog.TableCatalog;

public interface HasIcebergCatalog extends TableCatalog {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 . This is something I get asked about often enough and I don't like having to ask end users to do multiple casts.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As a note, The reason I put this here as a public interface is that I didn't want to expose BaseCatalog

Preconditions.checkArgument(metadataFile != null && !metadataFile.isEmpty(),
"Cannot handle an empty argument metadata_file");

Catalog icebergCatalog = ((HasIcebergCatalog) tableCatalog()).icebergCatalog();

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we check if the tableCatalog is an instance of HasIcebergCatalog?


@Override
public String description() {
return "MigrateTableProcedure";

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MigrateTableProcedure -> RegisterTableProcedure ?

Comment on lines +82 to +83
long totalManifests = spark().table(tableCatalog().name() + "." + tableName + ".manifests").count();
long totalDataFiles = spark().table(tableCatalog().name() + "." + tableName + ".files").count();

@flyrain flyrain May 18, 2022

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this will scan 2 metadata tables, right? The concern is that the perf of the procedure may not be good if the table has a lot of data/metadata files. Can we use the ones from snapshot summary? e.g. TOTAL_DATA_FILES_PROP

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I kinda wanted to do a little sanity check here that the table actually worked which is why I had the full scans here

public static Catalog loadIcebergCatalog(SparkSession spark, String catalogName) {
CatalogPlugin catalogPlugin = spark.sessionState().catalogManager().catalog(catalogName);
Preconditions.checkArgument(catalogPlugin instanceof HasIcebergCatalog,
String.format("Cannot load Iceberg catalog from catalog %s because it is not a Spark Iceberg catalog. Actual " +

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

because it is not a Spark Iceberg catalog -> because it doesn't contain an Iceberg catalog?

@RussellSpitzer

Copy link
Copy Markdown
Member Author

Thanks @flyrain and @kbendick , Please take a look again when you have a chance


Catalog icebergCatalog = ((HasIcebergCatalog) tableCatalog()).icebergCatalog();
Table table = icebergCatalog.registerTable(tableName, metadataFile);
Snapshot currentSnapshot = table.currentSnapshot();

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need a null check here. There is an edge case, that current snapshot is null.

@flyrain flyrain left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR. LGTM overall. The only thing is the null check of the current snapshot.

@flyrain flyrain left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

@singhpk234 singhpk234 left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM as well, Thanks @RussellSpitzer :) !!!

assertEquals("Registered table rows should match original table rows", original, registered);
Assert.assertEquals("Should have the right row count in the procedure result",
numRows, result.get(0)[1]);
Assert.assertEquals("Should have the right datafile count the procedure result",

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit

Suggested change
Assert.assertEquals("Should have the right datafile count the procedure result",
Assert.assertEquals("Should have the right datafile count in the procedure result",

};

private static final StructType OUTPUT_TYPE = new StructType(new StructField[]{
new StructField("Current Snapshot", DataTypes.LongType, true, Metadata.empty()),

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[minor] should we call this Current SnapshotId

@kbendick kbendick left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 this LGTM as well. Are we still interested in merging this @RussellSpitzer?

@RussellSpitzer

Copy link
Copy Markdown
Member Author

Yes sorry the merge alignment pr I just merged in was high priority and this went on my back burner

@RussellSpitzer RussellSpitzer merged commit 82607f1 into apache:master Jun 3, 2022
@RussellSpitzer RussellSpitzer deleted the registerTableProcedure branch June 3, 2022 13:09
@RussellSpitzer

Copy link
Copy Markdown
Member Author

Thanks @flyrain, @kbendick and @singhpk234 !

namrathamyske pushed a commit to namrathamyske/iceberg that referenced this pull request Jul 10, 2022
* Spark: Provide Procedure for Catalog Register Procedure

Adds the ability to invoke a Catalog's register method via a Spark
procedure. This allows a user to create a catalog entry for a metadata.json
file which already exists but does not have a corresponding catalog identifier.
namrathamyske pushed a commit to namrathamyske/iceberg that referenced this pull request Jul 10, 2022
* Spark: Provide Procedure for Catalog Register Procedure

Adds the ability to invoke a Catalog's register method via a Spark
procedure. This allows a user to create a catalog entry for a metadata.json
file which already exists but does not have a corresponding catalog identifier.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants