Skip to content

Spark: Support building against both Spark 3.0 and Spark 3.1.#2512

Merged
RussellSpitzer merged 7 commits into
apache:masterfrom
wypoon:spark3_1
Jun 24, 2021
Merged

Spark: Support building against both Spark 3.0 and Spark 3.1.#2512
RussellSpitzer merged 7 commits into
apache:masterfrom
wypoon:spark3_1

Conversation

@wypoon

@wypoon wypoon commented Apr 26, 2021

Copy link
Copy Markdown
Contributor

Code changes that allow spark3 and spark3-extensions to be built against both Spark 3.0 and Spark 3.1.

  • A method from org.apache.spark.sql.catalyst.util.DateTimeUtils that has changed its name is copied to org.apache.iceberg.util.DateTimeUtil and the new method is used instead.
  • The trait, org.apache.spark.sql.catalyst.plans.logical.V2WriteCommand, has 3 additional methods that need to be implemented. They are implemented in ReplaceData but without override.
  • The trait, org.apache.spark.sql.catalyst.parser.ParserInterface, no longer has the parseRawDataType method, so IcebergSparkSqlExtensionsParser implements it without override, by simply throwing UnsupportedOperationException, as the method is not used in Iceberg.
  • The main constructor for org.apache.spark.sql.catalyst.expressions.SortOrder has changed its signature. Use reflection to create instances of SortOrder.
  • The constructor for org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation has changed its signature. Use reflection to create instances of DataSourceV2ScanRelation.
  • org.apache.spark.sql.catalyst.SQLConfHelper was introduced in Spark 3.1 and a number of classes and traits now extend it, or, in the case of org.apache.spark.sql.catalyst.analysis.CastSupport, has it as a self type. This is the trickiest part. I move the mixin, CastSupport from AssignmentAlignmentSupport to the rule, AlignRowLevelOperations, since org.apache.spark.sql.catalyst.rules.Rule implements SQLConfHelper. I define the conf method in the traits AssignmentAlignmentSupport and RewriteRowLevelOperationHelper, so that it can be overridden in the classes that extend them, which also extend Rule[LogicalPlan] (and thus SQLConfHelper in Spark 3.1). When compiling with Spark 3.0, the conf in the Iceberg traits are overridden, and when compiling with Spark 3.1, the conf in SQLConfHelper is overridden.
  • The constructor for org.apache.spark.sql.catalyst.expressions.Alias has changed its signature to add another parameter to its second parameter list. Use reflection to create instances of Alias. This issue is only encountered at runtime when using a different version of Spark than built against.
  • The constructor for org.apache.spark.sql.catalyst.plans.logical.RepartitionByExpression has changed its signature. Use reflection to create instances of RepartitionByExpression. This issue is only encountered at runtime when using a different version of Spark than built against.

On the build side:

  • spark3 and spark3-extensions continue to build against Spark 3.0. New tasks are added to build the tests (using main classes built against Spark 3.0) and run them against Spark 3.1.
  • spark3-runtime builds against Spark 3.0 (since it depends on spark3 and spark3-extensions that build against 3.0), but a new integration test task is added to build and run the integration tests against Spark 3.1 using the runtime jar (built against 3.0).


@Test
public void testDeleteFromUnpartitionedTable() {
Assume.assumeFalse(Spark3VersionUtil.isSpark31());

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@aokolnychyi this test and the next fail in Spark 3.1. In https://github.com/apache/spark/blob/v3.1.1/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala#L250-L253, if canDeleteWhere returns false, there is no attempt to rewrite the query, an exception is just thrown. Is Spark supposed to rewrite the query if SupportsDelete#canDeleteWhere returns false? If so, this part hasn't been implemented in Spark yet.

@wypoon

wypoon commented Apr 26, 2021

Copy link
Copy Markdown
Contributor Author

@RussellSpitzer @rdblue can you please review and suggest what to do on the build side? I am not very familiar with gradle and am also not sure how we want to do this.

Comment thread spark3/src/main/java/org/apache/iceberg/spark/Spark3VersionUtil.java Outdated
@RussellSpitzer

RussellSpitzer commented Apr 26, 2021

Copy link
Copy Markdown
Member

Took a brief look but I think once the gradle fixes are in there will be more changes to make so I didn't go over the whole thing.

I think what we'll need to do is make a new configuration for Spark3.1 Testing (and maybe a new integration configuration for Spark3-Runtime)

something like

  configurations {
     spark30test extends from testRuntime
     spark31test extends from testRuntime
   }

Then in dependencies add the Spark dependency you want to each of those configurations.

That's at least how I would do it if we want 1 jar that runs against both versions of Spark even though it's only compiled against one.

@wypoon

wypoon commented Apr 26, 2021

Copy link
Copy Markdown
Contributor Author

@RussellSpitzer thanks for taking a look and the pointer. It doesn't make immediate sense to me (that's how little I know about gradle), but I'll do some research.

@wypoon

wypoon commented Apr 30, 2021

Copy link
Copy Markdown
Contributor Author

CI is green.
Building spark3 and spark3-extensions against both 3.0 and 3.1 ensures that the code can compile against both. spark3-runtime can be built against either, up to the user; it builds against 3.0 by default. I think this is a good compromise.
In principle, we can also build against 3.0 and run (test) against 3.1, but I think this approach is a good tradeoff and would rather not add one more combination to the CI.

@wypoon

wypoon commented Apr 30, 2021

Copy link
Copy Markdown
Contributor Author

@RussellSpitzer when you get the time, could you please review? Thanks!

@RussellSpitzer

Copy link
Copy Markdown
Member

I will definitely take a look soon.

Comment thread build.gradle Outdated
}
}

project(':iceberg-spark31') {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do the changes work when using the compiled artifact from iceberg-spark3 but testing in Spark 3.1? That's what we're ideally trying to achieve. It is nice to share source, but we would prefer not to have a separate module for every Spark version.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I found that a Spark3 runtime jar built against Spark 3.0 does not completely work with Spark 3.1.

Comment thread build.gradle Outdated
@RussellSpitzer

Copy link
Copy Markdown
Member

I think I would prefer 2 test configurations like I suggested before. That way we run the suite against both spark versions with the same compiled code. I can try throwing together an example if we get some of the Compaction stuff done this week.

@RussellSpitzer

Copy link
Copy Markdown
Member

@wypoon + @rdblue Wrote up a little example about we can do a similar multi-test version here
RussellSpitzer@7ee33d9

Basically what we do in this example is have 2 test tasks
The default which uses 3.0.2
And testSpark31 which uses 3.1

In both cases the test code itself is compiled against the respective Spark version but the library code is always based on the 3.0.2 code. So basically we make sure our library classes work regardless of which Spark version you use them with. I only implemented the Spark3 config for this but we could duplicate it for Extensions and Integration tests pretty easily. Let me know what you think.

@RussellSpitzer

Copy link
Copy Markdown
Member

Added another commit https://github.com/RussellSpitzer/iceberg/tree/spark3_1_RussDualConfigTest

To add test suites to the rest of the modules I think thats most of it, but I may have screwed up something with the "sql" command.

@wypoon

wypoon commented May 11, 2021

Copy link
Copy Markdown
Contributor Author

@RussellSpitzer thanks for providing the example! Let me try it out.
I can handle the java/scala changes, but the gradle changes are a struggle for me!

@wypoon

wypoon commented May 11, 2021

Copy link
Copy Markdown
Contributor Author

I pushed the code-side changes for now. I'll follow up with the build-side changes.

@wypoon

wypoon commented May 11, 2021

Copy link
Copy Markdown
Contributor Author

@RussellSpitzer I tried out your build.gradle. Unfortunately, it appears that an Iceberg Spark3 runtime jar built against Spark 3.0 does not completely work with Spark 3.1. The integration test, SmokeTest fails in testGettingStarted when running a MERGE INTO, due to incompatible signatures in the Spark case class Alias:

java.lang.NoSuchMethodError: org.apache.spark.sql.catalyst.expressions.Alias.<init>(Lorg/apache/spark/sql/catalyst/expressions/Expression;Ljava/lang/String;Lorg/apache/spark/sql/catalyst/expressions/ExprId;Lscala/collection/Seq;Lscala/Option;)V
	at org.apache.spark.sql.catalyst.optimizer.RewriteMergeInto$$anonfun$apply$1.applyOrElse(RewriteMergeInto.scala:156)

This is because the case class added an additional parameter in 3.1 vs 3.0:
https://github.com/apache/spark/blob/branch-3.0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala#L146-L150
https://github.com/apache/spark/blob/branch-3.1/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala#L149-L154

@rdblue I'm going to stick with my approach, introduce new modules to build and test spark3 and spark3-extensions against Spark 3.1, and also to introduce a new runtime module for Spark 3.1. We will produce two Spark3 runtime jars explicitly, one for 3.0 and one for 3.1. I won't use a build property.

@wypoon

wypoon commented May 11, 2021

Copy link
Copy Markdown
Contributor Author

Excerpt from my gradle output:

> Task :iceberg-spark3-runtime:integrationTest
[Thread-8] INFO org.apache.spark.util.ShutdownHookManager - Shutdown hook calle
[Thread-8] INFO org.apache.spark.util.ShutdownHookManager - Deleting directory /private/var/folders/7x/vgfk2h155v9gjtw55z6ncgc00000gp/T/spark-51d1d3aa-5b0a-48fa-8dec-70ef4725e0be

> Task :iceberg-spark3-runtime:spark31IntegrationTest

org.apache.iceberg.spark.SmokeTest > testGettingStarted[catalogName = testhive, implementation = org.apache.iceberg.spark.SparkCatalog, config = {type=hive, default-namespace=default}] FAILED
    java.lang.NoSuchMethodError at SmokeTest.java:67

org.apache.iceberg.spark.SmokeTest > testGettingStarted[catalogName = testhadoop, implementation = org.apache.iceberg.spark.SparkCatalog, config = {type=hadoop}] FAILED
    java.lang.NoSuchMethodError at SmokeTest.java:67

org.apache.iceberg.spark.SmokeTest > testGettingStarted[catalogName = spark_catalog, implementation = org.apache.iceberg.spark.SparkSessionCatalog, config = {type=hive, default-namespace=default, parquet-enabled=true, cache-enabled=false}]FAILED
    java.lang.NoSuchMethodError at SmokeTest.java:67
[shutdown-hook-0] INFO org.apache.spark.util.ShutdownHookManager - Shutdown hook called
[shutdown-hook-0] INFO org.apache.spark.util.ShutdownHookManager - Deleting directory /private/var/folders/7x/vgfk2h155v9gjtw55z6ncgc00000gp/T/spark-93a9a974-bcaf-4d43-99c8-2b4b97f6a9ed

9 tests completed, 3 failed

> Task :iceberg-spark3-runtime:spark31IntegrationTest FAILED

FAILURE: Build failed with an exception.

(@RussellSpitzer I renamed your testSpark31 task to spark31IntegrationTest. As you can see integrationTest passed, but spark31IntegrationTest failed.)

@wypoon

wypoon commented Jun 22, 2021

Copy link
Copy Markdown
Contributor Author

I rebased on master and resolved the conflicts. CI is green.
@RussellSpitzer @rdblue can you help move this forward?

@RussellSpitzer

Copy link
Copy Markdown
Member

@wypoon Sorry i was on vacation for a few weeks, i'll check this out soon. I think you addressed everything I was worried about. @rdblue should take a look too but I think it would be good if we had a compatible artifact for folks to start trying out at least.

@pan3793

pan3793 commented Jun 23, 2021

Copy link
Copy Markdown
Member

Thanks for the great works! I have deployed iceberg master code with this patch with Spark 3.1.2 on yarn cluster (cdh-6.3.1), and everything goes well until now.

if (Spark3VersionUtil.isSpark30) {
repartitionByExpressionCtor.newInstance(distribution.toSeq, query, new Integer(numShufflePartitions))
} else {
repartitionByExpressionCtor.newInstance(distribution.toSeq, query, Some(numShufflePartitions))

@pan3793 pan3793 Jun 23, 2021

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One thing that may out of the PR's scope. There is a feature introduced in SPARK-32056, iff numShufflePartitions is absent, partition coalesces for AQE will take effect, which will avoid writing small files when AQE is enabled. But I don't know if there are side effects when set numShufflePartitions to None.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indeed, SPARK-32056, introduced in Spark 3.1, is what changed the signature so that the last parameter is an Option[Int] instead of an Int. If you look at the code change for SPARK-32056, if the Option[Int] is None, then the number of partitions is obtained from the conf:

  val numPartitions = optNumPartitions.getOrElse(SQLConf.get.numShufflePartitions)

And here, in DistributionAndOrderingUtils.prepareQuery, numShufflePartitions is indeed conf.numShufflePartitions from the SQLConf that is passed in.
So there is no reason to change the code here.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@wypoon wypoon Jun 23, 2021

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right. There is a difference between passing Some(conf.numShufflePartitions) and passing None. In the latter case, ShuffleExchangeExec will have true for canChangeNumParts.
I can see that there will be opportunities to vary the logic depending on Spark 3 version to take advantage of new Spark 3 features. However, as you remarked, that is out of the scope of this PR. Here we just want to support building and running against both 3.0 and 3.1 and keep the logic the same.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm also +1 for keeping the logic the same in this PR, just wanna know if it will introduce side effects when set numShufflePartitions to None. @RussellSpitzer @aokolnychyi would you please take a look at it?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm +1, I think as a follow up we should be checking if we are in Spark3 and doing something like

val numPartitions = write.requiredNumPartitions()
        val finalNumPartitions = if (numPartitions > 0) {
          numPartitions
        } else {
          conf.numShufflePartitions
        }

Which is what we are doing, but I think that's a bit of a perf improvement and not neccessary for getting this compatibility in. Maybe @wypoon you want to make a followup?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@RussellSpitzer, what is the write in your code example?
I can look into a followup.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah sorry we pass through the whole write (instead of just the distribution and ordering) as a parameter.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@aokolnychyi is busy at the moment but I believe he'll want to put in some of the code changes to make this easier

@RussellSpitzer

RussellSpitzer commented Jun 23, 2021

Copy link
Copy Markdown
Member

I'll do some quick tests myself but I'm good to go with this, I have one tiny additional suggestion that we put 3.0.1 and 3.1.1 in constants so we can change around versions a bit easier. Just have a Spark30Version and Spark31Version constant?

Also, use Integer.valueOf instead of new Integer.
(Review feedback.)
@wypoon

wypoon commented Jun 23, 2021

Copy link
Copy Markdown
Contributor Author

@RussellSpitzer thanks for looking at this again. I have just updated the PR with your suggestion to put the Spark 3.0 and 3.1 versions in constants. I also adopted a change suggested by @pan3793 in his review.

@RussellSpitzer

Copy link
Copy Markdown
Member

Ok I think everything is good to go here! Thank you so much @wypoon for dealing with all of my comments :) This is going to be a huge benefit to the Iceberg community!

@RussellSpitzer RussellSpitzer merged commit 111fe81 into apache:master Jun 24, 2021
@RussellSpitzer

Copy link
Copy Markdown
Member

Thanks to @rdblue and @pan3793 For reviewing as well!

@wypoon

wypoon commented Jun 24, 2021

Copy link
Copy Markdown
Contributor Author

@RussellSpitzer, @rdblue and @pan3793, thanks for your reviews. @RussellSpitzer especially, thanks for your help and getting this in!

wypoon added a commit to wypoon/iceberg that referenced this pull request Aug 7, 2021
The constructor of RepartitionByExpression changed between Spark 3.0 and 3.1.
There was an instance of constructing RepartitionByExpression that was missed in the original commit (apache#2512).
rdblue pushed a commit that referenced this pull request Aug 9, 2021
…2954)

The constructor of RepartitionByExpression changed between Spark 3.0 and 3.1.
There was an instance of constructing RepartitionByExpression that was missed in the original commit (#2512).
@wypoon wypoon deleted the spark3_1 branch September 16, 2021 17:25
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants