Spark: Support building against both Spark 3.0 and Spark 3.1. #2512
@Test
public void testDeleteFromUnpartitionedTable() {
  Assume.assumeFalse(Spark3VersionUtil.isSpark31());
@aokolnychyi this test and the next fail in Spark 3.1. In https://github.com/apache/spark/blob/v3.1.1/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala#L250-L253, if canDeleteWhere returns false, there is no attempt to rewrite the query; an exception is simply thrown. Is Spark supposed to rewrite the query if SupportsDelete#canDeleteWhere returns false? If so, this part hasn't been implemented in Spark yet.
|
@RussellSpitzer @rdblue can you please review and suggest what to do on the build side? I am not very familiar with gradle and am also not sure how we want to do this. |
|
Took a brief look but I think once the gradle fixes are in there will be more changes to make so I didn't go over the whole thing. I think what we'll need to do is make a new configuration for Spark3.1 Testing (and maybe a new integration configuration for Spark3-Runtime) something like Then in dependencies add the Spark dependency you want to each of those configurations. That's at least how I would do it if we want 1 jar that runs against both versions of Spark even though it's only compiled against one. |
|
@RussellSpitzer thanks for taking a look and the pointer. It doesn't make immediate sense to me (that's how little I know about gradle), but I'll do some research. |
|
CI is green. |
|
@RussellSpitzer when you get the time, could you please review? Thanks! |
|
I will definitely take a look soon. |
  }
}
|
|
||
project(':iceberg-spark31') {
Do the changes work when using the compiled artifact from iceberg-spark3 but testing in Spark 3.1? That's what we're ideally trying to achieve. It is nice to share source, but we would prefer not to have a separate module for every Spark version.
I found that a Spark3 runtime jar built against Spark 3.0 does not completely work with Spark 3.1.
|
I think I would prefer 2 test configurations like I suggested before. That way we run the suite against both spark versions with the same compiled code. I can try throwing together an example if we get some of the Compaction stuff done this week. |
|
@wypoon + @rdblue Wrote up a little example of how we can do a similar multi-test version here. Basically, what we do in this example is have 2 test tasks. In both cases the test code itself is compiled against the respective Spark version, but the library code is always based on the 3.0.2 code. So basically we make sure our library classes work regardless of which Spark version you use them with. I only implemented the Spark3 config for this, but we could duplicate it for Extensions and Integration tests pretty easily. Let me know what you think.
|
Added another commit, https://github.com/RussellSpitzer/iceberg/tree/spark3_1_RussDualConfigTest, to add test suites to the rest of the modules. I think that's most of it, but I may have screwed up something with the "sql" command.
|
@RussellSpitzer thanks for providing the example! Let me try it out. |
|
I pushed the code-side changes for now. I'll follow up with the build-side changes. |
|
@RussellSpitzer I tried out your build.gradle. Unfortunately, it appears that an Iceberg Spark3 runtime jar built against Spark 3.0 does not completely work with Spark 3.1. The integration test, This is because the case class added an additional parameter in 3.1 vs 3.0:
@rdblue I'm going to stick with my approach: introduce new modules to build and test spark3 and spark3-extensions against Spark 3.1, and also introduce a new runtime module for Spark 3.1. We will produce two Spark3 runtime jars explicitly, one for 3.0 and one for 3.1. I won't use a build property.
|
Excerpt from my gradle output: (@RussellSpitzer I renamed your testSpark31 task to spark31IntegrationTest. As you can see integrationTest passed, but spark31IntegrationTest failed.) |
|
I rebased on master and resolved the conflicts. CI is green. |
|
Thanks for the great work! I have deployed Iceberg master with this patch on Spark 3.1.2 on a YARN cluster (cdh-6.3.1), and everything has worked well so far.
if (Spark3VersionUtil.isSpark30) {
  repartitionByExpressionCtor.newInstance(distribution.toSeq, query, new Integer(numShufflePartitions))
} else {
  repartitionByExpressionCtor.newInstance(distribution.toSeq, query, Some(numShufflePartitions))
One thing that may be out of this PR's scope: SPARK-32056 introduced a feature whereby, if and only if numShufflePartitions is absent, partition coalescing for AQE takes effect, which avoids writing small files when AQE is enabled. I don't know whether setting numShufflePartitions to None has side effects, though.
Indeed, SPARK-32056, introduced in Spark 3.1, is what changed the signature so that the last parameter is an Option[Int] instead of an Int. If you look at the code change for SPARK-32056, if the Option[Int] is None, then the number of partitions is obtained from the conf:
val numPartitions = optNumPartitions.getOrElse(SQLConf.get.numShufflePartitions)
And here, in DistributionAndOrderingUtils.prepareQuery, numShufflePartitions is indeed conf.numShufflePartitions from the SQLConf that is passed in.
So there is no reason to change the code here.
Things are more interesting here: the strategy also changed in SPARK-32056.
https://github.com/apache/spark/pull/28900/files#diff-21f071d73070b8257ad76e6e16ec5ed38a13d1278fe94bd42546c258a69f4410R687-R690
You're right. There is a difference between passing Some(conf.numShufflePartitions) and passing None. In the latter case, ShuffleExchangeExec will have true for canChangeNumParts.
I can see that there will be opportunities to vary the logic depending on Spark 3 version to take advantage of new Spark 3 features. However, as you remarked, that is out of the scope of this PR. Here we just want to support building and running against both 3.0 and 3.1 and keep the logic the same.
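To make the Some-versus-None distinction above concrete, here is a minimal Java sketch. It is only a model: ShufflePlan, plan and canChangeNumPartitions are hypothetical names made up for illustration, not Spark's ShuffleExchangeExec or any real Spark API. The assumption is simply that an absent count falls back to the session default and stays adjustable, while an explicit count is pinned.

```java
import java.util.Optional;

// Hypothetical stand-in for a planned shuffle; this is NOT a Spark class.
final class ShufflePlan {
  final int numPartitions;
  final boolean canChangeNumPartitions;

  ShufflePlan(int numPartitions, boolean canChangeNumPartitions) {
    this.numPartitions = numPartitions;
    this.canChangeNumPartitions = canChangeNumPartitions;
  }

  // requested: an explicit partition count, or empty to mean "use the session default".
  static ShufflePlan plan(Optional<Integer> requested, int confShufflePartitions) {
    // Same fallback shape as optNumPartitions.getOrElse(...) quoted in the thread.
    int n = requested.orElse(confShufflePartitions);
    // In this model only an absent count leaves room for later coalescing.
    return new ShufflePlan(n, !requested.isPresent());
  }
}
```

With a session default of 200, both plan(Optional.of(200), 200) and plan(Optional.empty(), 200) start with 200 partitions, but only the second is marked as adjustable, which is the difference being discussed.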
I'm also +1 for keeping the logic the same in this PR. I just want to know whether setting numShufflePartitions to None would introduce side effects. @RussellSpitzer @aokolnychyi would you please take a look?
I'm +1, I think as a follow up we should be checking if we are in Spark3 and doing something like
val numPartitions = write.requiredNumPartitions()
val finalNumPartitions = if (numPartitions > 0) {
  numPartitions
} else {
  conf.numShufflePartitions
}
Which is what we are doing, but I think that's a bit of a perf improvement and not necessary for getting this compatibility in. Maybe @wypoon you want to make a follow-up?
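For readers more at home in Java, the fallback in the snippet above can be sketched as a small helper. PartitionCounts and resolve are hypothetical names, and treating a non-positive required count as "no requirement" is an assumption read off the numPartitions > 0 check, not something confirmed elsewhere in this thread.

```java
// Hypothetical helper; mirrors only the if/else shown in the Scala snippet.
final class PartitionCounts {
  private PartitionCounts() {}

  // A non-positive required count is assumed to mean "the write has no requirement".
  static int resolve(int requiredNumPartitions, int confShufflePartitions) {
    return requiredNumPartitions > 0 ? requiredNumPartitions : confShufflePartitions;
  }
}
```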
@RussellSpitzer, what is the write in your code example?
I can look into a followup.
Ah sorry we pass through the whole write (instead of just the distribution and ordering) as a parameter.
@aokolnychyi is busy at the moment but I believe he'll want to put in some of the code changes to make this easier
|
I'll do some quick tests myself, but I'm good to go with this. I have one tiny additional suggestion: put 3.0.1 and 3.1.1 in constants so we can change versions around more easily. Just have a Spark30Version and a Spark31Version constant?
Also, use Integer.valueOf instead of new Integer. (Review feedback.)
|
@RussellSpitzer thanks for looking at this again. I have just updated the PR with your suggestion to put the Spark 3.0 and 3.1 versions in constants. I also adopted a change suggested by @pan3793 in his review. |
|
Ok I think everything is good to go here! Thank you so much @wypoon for dealing with all of my comments :) This is going to be a huge benefit to the Iceberg community! |
|
@RussellSpitzer, @rdblue and @pan3793, thanks for your reviews. @RussellSpitzer especially, thanks for your help and getting this in! |
The constructor of RepartitionByExpression changed between Spark 3.0 and 3.1. There was an instance of constructing RepartitionByExpression that was missed in the original commit (apache#2512).
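The reflection approach for a constructor whose last parameter changed type can be sketched roughly as follows. This is a self-contained Java illustration under stated assumptions: RepartitionV30, RepartitionV31 and ReflectiveFactory are hypothetical stand-ins for the two constructor shapes (an int versus an optional int as the last parameter) and are not Spark or Iceberg classes. The sketch also inspects the parameter type instead of checking the Spark version, which is a swapped-in variation on the version check used in this PR.

```java
import java.lang.reflect.Constructor;
import java.util.Optional;

// Hypothetical stand-in for the older shape: last parameter is a plain int.
final class RepartitionV30 {
  final String child;
  final int numPartitions;

  public RepartitionV30(String child, int numPartitions) {
    this.child = child;
    this.numPartitions = numPartitions;
  }
}

// Hypothetical stand-in for the newer shape: last parameter is optional.
final class RepartitionV31 {
  final String child;
  final Optional<Integer> numPartitions;

  public RepartitionV31(String child, Optional<Integer> numPartitions) {
    this.child = child;
    this.numPartitions = numPartitions;
  }
}

final class ReflectiveFactory {
  private ReflectiveFactory() {}

  // Looks up the single constructor at runtime and adapts the last argument to its type.
  static Object newRepartition(Class<?> cls, String child, int numPartitions) {
    try {
      Constructor<?> ctor = cls.getDeclaredConstructors()[0];
      Class<?> lastParam = ctor.getParameterTypes()[1];
      Object lastArg = Optional.class.equals(lastParam)
          ? Optional.of(numPartitions)        // newer shape: wrap the count
          : Integer.valueOf(numPartitions);   // older shape: pass the boxed int
      return ctor.newInstance(child, lastArg);
    } catch (ReflectiveOperationException e) {
      throw new IllegalStateException("Cannot construct " + cls.getName(), e);
    }
  }
}
```

The point of the design is that the calling code compiles against neither signature, so the same compiled artifact can construct whichever shape is on the classpath at runtime.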
Code changes that allow spark3 and spark3-extensions to be built against both Spark 3.0 and Spark 3.1.
A method in org.apache.spark.sql.catalyst.util.DateTimeUtils that has changed its name is copied to org.apache.iceberg.util.DateTimeUtil and the new method is used instead.
org.apache.spark.sql.catalyst.plans.logical.V2WriteCommand has 3 additional methods that need to be implemented. They are implemented in ReplaceData but without override.
org.apache.spark.sql.catalyst.parser.ParserInterface no longer has the parseRawDataType method, so IcebergSparkSqlExtensionsParser implements it without override, by simply throwing UnsupportedOperationException, as the method is not used in Iceberg.
org.apache.spark.sql.catalyst.expressions.SortOrder has changed its signature. Use reflection to create instances of SortOrder.
org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation has changed its signature. Use reflection to create instances of DataSourceV2ScanRelation.
org.apache.spark.sql.catalyst.SQLConfHelper was introduced in Spark 3.1 and a number of classes and traits now extend it, or, in the case of org.apache.spark.sql.catalyst.analysis.CastSupport, have it as a self type. This is the trickiest part. I move the mixin CastSupport from AssignmentAlignmentSupport to the rule AlignRowLevelOperations, since org.apache.spark.sql.catalyst.rules.Rule implements SQLConfHelper. I define the conf method in the traits AssignmentAlignmentSupport and RewriteRowLevelOperationHelper, so that it can be overridden in the classes that extend them, which also extend Rule[LogicalPlan] (and thus SQLConfHelper in Spark 3.1). When compiling with Spark 3.0, the conf in the Iceberg traits is overridden, and when compiling with Spark 3.1, the conf in SQLConfHelper is overridden.
org.apache.spark.sql.catalyst.expressions.Alias has changed its signature to add another parameter to its second parameter list. Use reflection to create instances of Alias. This issue is only encountered at runtime when using a different version of Spark than built against.
org.apache.spark.sql.catalyst.plans.logical.RepartitionByExpression has changed its signature. Use reflection to create instances of RepartitionByExpression. This issue is only encountered at runtime when using a different version of Spark than built against.
On the build side: