Skip to content

Flink: support limit pushdown in FLIP-27 source#10748

Merged
stevenzwu merged 4 commits into
apache:mainfrom
stevenzwu:flip27-source-limit-pushdown
Jul 29, 2024
Merged

Flink: support limit pushdown in FLIP-27 source#10748
stevenzwu merged 4 commits into
apache:mainfrom
stevenzwu:flip27-source-limit-pushdown

Conversation

@stevenzwu

Copy link
Copy Markdown
Contributor

No description provided.

@stevenzwu stevenzwu requested review from nastra and pvary July 22, 2024 17:36
@github-actions github-actions Bot added the flink label Jul 22, 2024
@stevenzwu stevenzwu force-pushed the flip27-source-limit-pushdown branch from fb1cc36 to f8a67bc Compare July 22, 2024 17:59
@stevenzwu stevenzwu force-pushed the flip27-source-limit-pushdown branch from f8a67bc to 2061332 Compare July 22, 2024 18:44

@czy006 czy006 left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

this.counter = new AtomicLong(0);
}

public boolean reachLimit() {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
public boolean reachLimit() {
public boolean reachedLimit() {

private final AtomicLong counter;

private RecordLimiter(long limit) {
Preconditions.checkArgument(limit > 0, "Invalid limit: not a positive number");

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Preconditions.checkArgument(limit > 0, "Invalid limit: not a positive number");
Preconditions.checkArgument(limit > 0, "Invalid limit: %s must a positive number", limit);

import org.apache.iceberg.flink.source.FileScanTaskReader;
import org.apache.iceberg.io.FileIO;

class LimitableDataIterator<T> extends DataIterator<T> {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it might make sense to add a small unit test to make sure this works as expected


@Override
public T next() {
if (limiter != null) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe instead of having null checks everywhere we could have a NOOP limiter that just wouldn't do anything, wdyt? That way we wouldn't need null checks everywhere and you'd end up using either the Noop limiter or a normal one with a valid limit

@stevenzwu stevenzwu Jul 23, 2024

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have moved the check of the long limit value (non-positive for unlimited) inside the RecordLimiter. I tried to avoid another class of NoopRecordLimiter. please see if this is inline with what you are thinking.

@Override
public DataIterator<RowData> createDataIterator(IcebergSourceSplit split) {
return new DataIterator<>(
return new LimitableDataIterator<>(

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this limit applied after the residual filters?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

limit is applied after the residual filters. Residual filters are applied inside RowDataFileScanTaskReader, which is used and wrapped by the DataIterator

…e/reader/LimitableDataIterator.java

Co-authored-by: Eduard Tudenhoefner <etudenhoefner@gmail.com>
Comment on lines +53 to +55
// Note that this query doesn't have the limit clause in the SQL.
// This assertions works because limit is pushed down to the reader and
// reader parallelism is 1.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the limit applied for every reader? Would it mean that if we have 4 readers, and 4 splits, then we will have 4 records in the result instead of 1?

@stevenzwu stevenzwu Jul 26, 2024

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is correct. That is exactly what I am trying to clarify here, because the SQL query has no limit clause. if the source parallelism is 4, there could be 4 readers and each may emit 1 record. Note that limit pushdown is not guarantee that source only emit the limited number of record. Source only needs to try its best to break/stop early. The SQL limit clause and the SQL engine does the final result limit.

@stevenzwu stevenzwu merged commit f758593 into apache:main Jul 29, 2024
@stevenzwu

Copy link
Copy Markdown
Contributor Author

thanks @czy006 @nastra @pvary for the review

stevenzwu added a commit to stevenzwu/iceberg that referenced this pull request Jul 29, 2024
@stevenzwu stevenzwu deleted the flip27-source-limit-pushdown branch July 30, 2024 02:41
zachdisc pushed a commit to zachdisc/iceberg that referenced this pull request Dec 23, 2024
zachdisc pushed a commit to zachdisc/iceberg that referenced this pull request Dec 23, 2024
czy006 pushed a commit to czy006/iceberg that referenced this pull request Apr 2, 2025
czy006 pushed a commit to czy006/iceberg that referenced this pull request Apr 2, 2025
czy006 pushed a commit to czy006/iceberg that referenced this pull request Apr 2, 2025
czy006 pushed a commit to czy006/iceberg that referenced this pull request Apr 2, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants