Apache Iceberg version: 1.10.1 (also present in the default planner on 1.11.0 / main)
Query engine: Spark (3.5)
Describe the bug / improvement
With an unbounded ReadLimit (ReadLimit.allAvailable(), which Spark's Trigger.AvailableNow passes via prepareForTriggerAvailableNow to compute its target offset), SyncSparkMicroBatchPlanner.latestOffset walks every snapshot in (committed, head] on the driver. (Before 1.11.0 this logic was inline in SparkMicroBatchStream.) For each snapshot it reads the manifest list (MicroBatches.skippedManifestIndexesFromSnapshot → Snapshot.dataManifests → ManifestLists.read) and opens the added manifests to iterate every FileScanTask.
Because getMaxFiles(allAvailable) == Integer.MAX_VALUE (and likewise getMaxRows), the per-file/-row accumulators never trip a limit and are discarded. The returned offset is (lastValidSnapshot, positionInThatSnapshot), which depends only on the final valid snapshot — so the entire per-file traversal of the intermediate gap is wasted work: O(snapshots-in-gap × files), single-threaded on the driver.
Impact
On a high-commit-cadence table (e.g. a snapshot ~every 30s) consumed by a scheduled Trigger.AvailableNow job, the committed→head gap equals the snapshots committed during the previous run. A slow run grows the next gap, so latestOffset time ramps run-over-run, and a single run that fails to commit makes the gap permanent (it never recovers without a checkpoint reset). In a production case the per-run offset computation ramped from ~8 min to ~30+ min over a few days until the job exceeded its scheduling budget and stalled — while the source table was fully maintained (snapshots bounded by retention, data files bounded by compaction, manifests bounded by auto-merge). A continuously-running stream avoids this only because it never accumulates a large gap.
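The feedback loop described above can be illustrated with a toy model. This is a minimal sketch, not Iceberg code: the class GapModel, its parameters, and every numeric value are hypothetical. It assumes each run costs a fixed overhead plus a per-snapshot cost for the gap walk, and that the next gap equals the snapshots committed while that run was in progress.

```java
import java.util.Arrays;

// Hypothetical toy model of the run-over-run gap; not part of Iceberg or Spark.
final class GapModel {

    // Returns the gap (in snapshots) seen at the start of each run.
    // Assumption: run time = fixedSec + perSnapshotSec * gap, and the next
    // gap is the number of snapshots committed during that run time.
    static long[] simulate(int runs, double commitIntervalSec, double fixedSec,
                           double perSnapshotSec, long initialGap) {
        long[] gaps = new long[runs];
        long gap = initialGap;
        for (int i = 0; i < runs; i++) {
            gaps[i] = gap;
            double runtimeSec = fixedSec + perSnapshotSec * gap;
            gap = (long) Math.ceil(runtimeSec / commitIntervalSec);
        }
        return gaps;
    }

    public static void main(String[] args) {
        // Illustrative values only: a commit every 30s, 60s of fixed work per run.
        long[] costlyWalk = simulate(5, 30.0, 60.0, 40.0, 10);
        long[] cheapWalk = simulate(5, 30.0, 60.0, 5.0, 10);
        System.out.println("per-snapshot cost above commit interval: " + Arrays.toString(costlyWalk));
        System.out.println("per-snapshot cost below commit interval: " + Arrays.toString(cheapWalk));
    }
}
```

Under these assumptions the gap settles only when the per-snapshot cost of the walk stays below the commit interval; otherwise each run leaves a larger gap than it found.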
The async planner already does the right thing
AsyncSparkMicroBatchPlanner.latestOffset short-circuits ReadAllAvailable: it advances the snapshot chain via nextValidSnapshot to the latest valid snapshot and returns its offset — no manifest-list reads, no per-file iteration. But the async planner is opt-in (async-micro-batch-planning-enabled), and the default SyncSparkMicroBatchPlanner still performs the full per-file gap walk for the unbounded case.
Proposed fix
Mirror the async planner's ReadAllAvailable short-circuit into SyncSparkMicroBatchPlanner.latestOffset: when the limit is unbounded, advance the snapshot chain (metadata only, via nextValidSnapshot, preserving streaming-skip-overwrite-snapshots / streaming-skip-delete-snapshots semantics) to the last valid snapshot and set position from that one snapshot, skipping the intermediate per-file iteration. The change is localized to one method; add a unit test asserting the fast path returns the same offset as the full walk.
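The equivalence the proposed unit test would assert can be sketched on a toy model. This is a hedged illustration only: Snap, Offset, WalkResult, fullWalk, and fastPath are hypothetical stand-ins, not Iceberg classes or methods, and the sketch assumes that the position of a fully consumed snapshot is its added-file count.

```java
import java.util.List;

// Hypothetical toy model; none of these types exist in Iceberg.
final class LatestOffsetSketch {

    // A snapshot in the gap: id, whether streaming should process it
    // (false models a skipped overwrite/delete snapshot), and its added-file count.
    record Snap(long id, boolean valid, int addedFiles) {}

    record Offset(long snapshotId, long position) {}

    record WalkResult(Offset offset, long filesVisited) {}

    // Models the current behaviour: visit every file of every valid snapshot
    // and stop early only if the file limit would be exceeded.
    static WalkResult fullWalk(List<Snap> gap, Offset start, long maxFiles) {
        long filesSeen = 0;
        long filesVisited = 0;
        Offset latest = start;
        for (Snap snap : gap) {
            if (!snap.valid()) {
                continue;
            }
            for (int pos = 0; pos < snap.addedFiles(); pos++) {
                filesVisited++;
                if (filesSeen + 1 > maxFiles) {
                    return new WalkResult(new Offset(snap.id(), pos), filesVisited);
                }
                filesSeen++;
            }
            latest = new Offset(snap.id(), snap.addedFiles());
        }
        return new WalkResult(latest, filesVisited);
    }

    // Models the proposed short-circuit: follow the snapshot chain only
    // and take the position from the last valid snapshot.
    static Offset fastPath(List<Snap> gap, Offset start) {
        Offset latest = start;
        for (Snap snap : gap) {
            if (snap.valid()) {
                latest = new Offset(snap.id(), snap.addedFiles());
            }
        }
        return latest;
    }

    public static void main(String[] args) {
        List<Snap> gap = List.of(
            new Snap(1, true, 3), new Snap(2, false, 5),
            new Snap(3, true, 2), new Snap(4, false, 7));
        Offset start = new Offset(0, 0);
        WalkResult full = fullWalk(gap, start, Integer.MAX_VALUE);
        Offset fast = fastPath(gap, start);
        System.out.println("full walk: " + full.offset() + ", files visited: " + full.filesVisited());
        System.out.println("fast path: " + fast);
        System.out.println("same offset: " + full.offset().equals(fast));
    }
}
```

In this model the two paths agree whenever the limit is unbounded, while the bounded case still needs the per-file walk because the limit can trip in the middle of a snapshot.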
Related
Trigger.AvailableNow): added the AvailableNow support that makes per-micro-batch processing respect rate limits and caches the target offset via prepareForTriggerAvailableNow. This issue is a follow-up: the target-offset computation it relies on still calls latestOffset(..., allAvailable()), which in the default sync planner performs the full per-file gap walk described above.
latestOffset / skip path; precedent that this area accepts targeted perf fixes.
Willingness to contribute
Yes, happy to submit the PR.