Flink: support limit pushdown in FLIP-27 source#10748
Conversation
fb1cc36 to
f8a67bc
Compare
f8a67bc to
2061332
Compare
| this.counter = new AtomicLong(0); | ||
| } | ||
|
|
||
| public boolean reachLimit() { |
There was a problem hiding this comment.
| public boolean reachLimit() { | |
| public boolean reachedLimit() { |
| private final AtomicLong counter; | ||
|
|
||
| private RecordLimiter(long limit) { | ||
| Preconditions.checkArgument(limit > 0, "Invalid limit: not a positive number"); |
There was a problem hiding this comment.
| Preconditions.checkArgument(limit > 0, "Invalid limit: not a positive number"); | |
| Preconditions.checkArgument(limit > 0, "Invalid limit: %s must a positive number", limit); |
| import org.apache.iceberg.flink.source.FileScanTaskReader; | ||
| import org.apache.iceberg.io.FileIO; | ||
|
|
||
| class LimitableDataIterator<T> extends DataIterator<T> { |
There was a problem hiding this comment.
it might make sense to add a small unit test to make sure this works as expected
|
|
||
| @Override | ||
| public T next() { | ||
| if (limiter != null) { |
There was a problem hiding this comment.
maybe instead of having null checks everywhere we could have a NOOP limiter that just wouldn't do anything, wdyt? That way we wouldn't need null checks everywhere and you'd end up using either the Noop limiter or a normal one with a valid limit
There was a problem hiding this comment.
I have moved the check of the long limit value (non-positive for unlimited) inside the RecordLimiter. I tried to avoid another class of NoopRecordLimiter. please see if this is inline with what you are thinking.
| @Override | ||
| public DataIterator<RowData> createDataIterator(IcebergSourceSplit split) { | ||
| return new DataIterator<>( | ||
| return new LimitableDataIterator<>( |
There was a problem hiding this comment.
Is this limit applied after the residual filters?
There was a problem hiding this comment.
limit is applied after the residual filters. Residual filters are applied inside RowDataFileScanTaskReader, which is used and wrapped by the DataIterator
…e/reader/LimitableDataIterator.java Co-authored-by: Eduard Tudenhoefner <etudenhoefner@gmail.com>
| // Note that this query doesn't have the limit clause in the SQL. | ||
| // This assertions works because limit is pushed down to the reader and | ||
| // reader parallelism is 1. |
There was a problem hiding this comment.
Is the limit applied for every reader? Would it mean that if we have 4 readers, and 4 splits, then we will have 4 records in the result instead of 1?
There was a problem hiding this comment.
That is correct. That is exactly what I am trying to clarify here, because the SQL query has no limit clause. if the source parallelism is 4, there could be 4 readers and each may emit 1 record. Note that limit pushdown is not guarantee that source only emit the limited number of record. Source only needs to try its best to break/stop early. The SQL limit clause and the SQL engine does the final result limit.
(cherry picked from commit f758593)
(cherry picked from commit 72b39ab)
(cherry picked from commit f758593)
(cherry picked from commit 72b39ab)
No description provided.