Spark: Make maxRecordPerMicrobatch a soft limit #12988

Merged
stevenzwu merged 7 commits into apache:main from singhpk234:fix/stream-stuck
Jun 27, 2025

@singhpk234

@singhpk234 singhpk234 commented May 7, 2025

Contributor

About the change

Make maxRecordsPerMicrobatch a soft limit. When the maximum number of records is less than the total record count of a file, honoring the limit strictly would expect us to read the file partially (current behavior). That cutoff can't be a row-group boundary or anything else we can incorporate in our scan tasks, such as splitting at the record-count cutoff, so the boundary is very difficult to define. It is better to make this a soft limit: when including a file, if it fits within the limit that's fine; otherwise include the whole file and be done with that particular micro-batch.
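To make the rule above concrete, here is a minimal sketch of the soft-limit decision. It is a simplified model and not the actual code in this PR: the class name `SoftLimitSketch`, the method `filesInNextBatch`, and the plain array of per-file row counts are all hypothetical stand-ins for the real scan-task planning.

```java
// Simplified model of the soft limit (hypothetical names, not Iceberg code).
class SoftLimitSketch {

  // Returns how many whole files, starting at index `start`, go into the next micro-batch.
  // A file is always added whole; the batch closes once the running row count reaches maxRows.
  static int filesInNextBatch(long[] fileRowCounts, int start, long maxRows) {
    long rows = 0;
    int taken = 0;
    for (int i = start; i < fileRowCounts.length; i++) {
      rows += fileRowCounts[i];
      taken++;
      if (rows >= maxRows) {
        break; // limit reached or exceeded: stop after this whole file
      }
    }
    return taken;
  }

  public static void main(String[] args) {
    long[] files = {5, 1, 1};
    // The first file alone exceeds the limit of 2, but it is still taken, so the stream advances.
    System.out.println(filesInNextBatch(files, 0, 2)); // prints 1
    // The remaining two files fit exactly within the limit.
    System.out.println(filesInNextBatch(files, 1, 2)); // prints 2
  }
}
```

The point of the sketch is that a non-empty input always yields at least one file per batch, so a single oversized file can no longer stall the stream.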

This change is motivated by two major factors:

  1. The stream currently gets stuck, leading to poor UX: Docs: Add documentation for Rate limiting in Spark Structured Streaming #12217 (comment)
  2. A soft limit is what other solutions enforce, for example the Delta docs:

maxBytesPerTrigger: How much data gets processed in each micro-batch. This option sets a “soft max”, meaning that a batch processes approximately this amount of data and may process more than the limit in order to make the streaming query move forward in cases when the smallest input unit is larger than this limit. This is not set by default.

Testing done

Modified the existing UT that mimics the stuck stream so that it now passes.

@singhpk234

Contributor Author

Seems like an unrelated failure:

TestRewriteDataFilesAction > testParallelPartialProgressWithMaxFailedCommitsLargerThanTotalFileGroup() > formatVersion = 2 FAILED
    java.lang.RuntimeException: partial-progress.enabled is true but 1 rewrite commits failed. This is more than the maximum allowed failures of 0. Check the logs to determine why the individual commits failed. If this is persistent it may help to increase partial-progress.max-commits which will split the rewrite operation into smaller commits.
        at org.apache.iceberg.spark.actions.RewriteDataFilesSparkAction.doExecuteWithPartialProgress(RewriteDataFilesSparkAction.java:400)
        at org.apache.iceberg.spark.actions.RewriteDataFilesSparkAction.execute(RewriteDataFilesSparkAction.java:187)

@singhpk234 singhpk234 closed this May 7, 2025
@singhpk234 singhpk234 reopened this May 7, 2025

@RussellSpitzer RussellSpitzer left a comment

Member

I have some nits about docs, but this looks good to me

Comment thread docs/docs/spark-configuration.md Outdated

@stevenzwu stevenzwu left a comment

Contributor

LGTM

Comment thread docs/docs/spark-configuration.md Outdated
    assertThat(
            microBatchCount(
                ImmutableMap.of(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, "2")))
        .isEqualTo(4);

Contributor

Why was the micro-batch count 4 previously?

Contributor Author

There are 6 files in total, with row counts 1, 2, 1, 1, 1, 1. Earlier the micro-batches were [1, 2, 2, 2]; now they are [3, 2, 2], because [1, 2] are merged into one micro-batch now that the number of rows is a soft limit.

That's a good callout. I refactored the test to check the exact micro-batch record sizes; hopefully this makes it easier to reason about!
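For anyone who wants to reproduce the arithmetic, the sketch below replays the six file sizes from the test under both policies with a limit of 2. It is a toy model only: `MicroBatchSizes`, `stopBeforeExceeding`, and `softLimit` are hypothetical names, and the "old" model deliberately ignores the stuck case (it places an oversized file alone instead of stalling), since that case does not occur with these inputs.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of micro-batch sizes before and after this change (hypothetical names).
class MicroBatchSizes {

  // Old-style model: close the batch before a file that would push it past maxRows.
  // Assumption: an oversized first file is placed alone here; the real stream got stuck instead.
  static List<Integer> stopBeforeExceeding(int[] fileRows, int maxRows) {
    List<Integer> batches = new ArrayList<>();
    int current = 0;
    for (int rows : fileRows) {
      if (current > 0 && current + rows > maxRows) {
        batches.add(current);
        current = 0;
      }
      current += rows;
    }
    if (current > 0) {
      batches.add(current);
    }
    return batches;
  }

  // Soft-limit model: always add the whole file, then close the batch once maxRows is reached.
  static List<Integer> softLimit(int[] fileRows, int maxRows) {
    List<Integer> batches = new ArrayList<>();
    int current = 0;
    for (int rows : fileRows) {
      current += rows;
      if (current >= maxRows) {
        batches.add(current);
        current = 0;
      }
    }
    if (current > 0) {
      batches.add(current);
    }
    return batches;
  }

  public static void main(String[] args) {
    int[] files = {1, 2, 1, 1, 1, 1};
    System.out.println(stopBeforeExceeding(files, 2)); // prints [1, 2, 2, 2]
    System.out.println(softLimit(files, 2));           // prints [3, 2, 2]
  }
}
```

The first two files (1 and 2 rows) land in separate batches under the old model and in a single 3-row batch under the soft limit, which is why the count drops from 4 to 3.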

@stevenzwu stevenzwu Jun 27, 2025

Contributor

There are 3 snapshots. Is it because the write parallelism is 2 for the Spark writer, hence there are 2 data files per commit? That is not obvious from the appendDataAsMultipleSnapshots method in this test class.

Contributor Author

I think so: either this, or the Spark defaults are set that way. I haven't taken a deeper look (let me double-check), but yes, appendDataAsMultipleSnapshots consistently produces this file layout across all test cases and Spark versions.

@stevenzwu stevenzwu Jun 27, 2025

Contributor

Discussed offline. Currently, the unit test depends on the Spark session being set up with a default parallelism of 2 in TestBase. We can follow up on this separately.

        SparkSession.builder()
            .master("local[2]")

If someone changes it to local[3], this test will break. Not saying someone will change it, just that there is an implicit dependency.

@singhpk234 singhpk234 force-pushed the fix/stream-stuck branch 3 times, most recently from c626273 to 76abae4 Compare June 27, 2025 08:05
@stevenzwu stevenzwu merged commit 6e80089 into apache:main Jun 27, 2025
27 checks passed
@stevenzwu stevenzwu added this to the Iceberg 1.10.0 milestone Jun 27, 2025