Spark: Make maxRecordPerMicrobatch a soft limit #12988
Seems like an unrelated failure.
RussellSpitzer
left a comment
I have some nits about docs, but this looks good to me
assertThat(
        microBatchCount(
            ImmutableMap.of(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, "2")))
    .isEqualTo(4);
Why was the micro-batch count 4 previously?
There are 6 files in total, with row counts 1, 2, 1, 1, 1, 1. Earlier the micro-batches were [1, 2, 2, 2]; now they are [3, 2, 2], because the first two files (1 and 2 rows) are merged into one micro-batch now that the row count is a soft limit.
That's a good callout. I refactored the test to check the exact micro-batch record sizes; hopefully this makes it easier to reason about!
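The arithmetic in this reply can be reproduced with a small standalone sketch. It is not the Iceberg implementation: the class and method names below are hypothetical, files are modeled only by their row counts, and the hard-limit variant (including the exception used to model a stuck stream) is an assumption based on the behavior described in this thread.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; not code from the Iceberg repository.
final class MicroBatchPlanningSketch {

  // Assumed earlier behavior: a file that would push the batch past the
  // limit is deferred to the next batch. A single file larger than the
  // limit can never be scheduled, modeled here as an exception.
  static List<Integer> planWithHardLimit(List<Integer> rowsPerFile, int maxRows) {
    List<Integer> batches = new ArrayList<>();
    int current = 0;
    for (int rows : rowsPerFile) {
      if (rows > maxRows) {
        throw new IllegalStateException(
            "file with " + rows + " rows exceeds the limit of " + maxRows);
      }
      if (current + rows > maxRows) {
        batches.add(current); // close the batch before the file that does not fit
        current = 0;
      }
      current += rows;
    }
    if (current > 0) {
      batches.add(current);
    }
    return batches;
  }

  // Soft limit: always take the whole file; once the running total reaches
  // or passes the limit, close the batch.
  static List<Integer> planWithSoftLimit(List<Integer> rowsPerFile, int maxRows) {
    List<Integer> batches = new ArrayList<>();
    int current = 0;
    for (int rows : rowsPerFile) {
      current += rows;
      if (current >= maxRows) {
        batches.add(current);
        current = 0;
      }
    }
    if (current > 0) {
      batches.add(current);
    }
    return batches;
  }

  public static void main(String[] args) {
    List<Integer> files = List.of(1, 2, 1, 1, 1, 1);
    System.out.println(planWithHardLimit(files, 2)); // [1, 2, 2, 2]
    System.out.println(planWithSoftLimit(files, 2)); // [3, 2, 2]
  }
}
```

With a limit of 2, the soft variant also makes progress on a single 5-row file by emitting one batch of 5, which is the case the hard variant cannot handle.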
There are 3 snapshots. Is it because the write parallelism is 2 for the Spark writer, so there are 2 data files per commit? That is not obvious from the appendDataAsMultipleSnapshots method in this test class.
I think so: either this, or the Spark defaults are set that way. I haven't taken a deeper look (let me double check), but yes, appendDataAsMultipleSnapshots consistently produces this file layout across all test cases and Spark versions.
Discussed offline. Currently, the unit test depends on the Spark session being set up with a default parallelism of 2 in TestBase. We can follow up on this separately.
SparkSession.builder()
.master("local[2]")
If someone changes it to local[3], this test will break. I'm not saying someone will change it, just that there is an implicit dependency.
About the change
Make maxRecordsPerMicrobatch a soft limit. In cases where, for example, the maximum number of records is less than the total number of records in a file, a hard limit would expect us to read the file partially (current behavior). The cutoff can't simply be a row group boundary or anything else we can incorporate into our scan tasks, as if splitting the file at the record-count cutoff, so the boundary is very difficult to define. It is better to make this a soft limit: when including a file, if it fits within the limit, that is fine; otherwise, include the whole file and finish that particular micro-batch.
This change is motivated by two major factors:
maxBytesPerTrigger: How much data gets processed in each micro-batch. This option sets a “soft max”, meaning that a batch processes approximately this amount of data and may process more than the limit in order to make the streaming query move forward in cases when the smallest input unit is larger than this limit. This is not set by default.
Testing done
Modified the existing unit test, which mimics the stuck stream, so that it now passes.