Kafka Connect: Don't check that consumer group is stable for coordinator leader election#14395
Conversation
|
@kumarpritam863 @bryanck will appreciate a review! 🙇 |
| } | ||
|
|
||
| private boolean hasLeaderPartition(Collection<TopicPartition> currentAssignedPartitions) { | ||
| private boolean hasLeaderPartition( |
There was a problem hiding this comment.
Maybe change the parameter name to onlyIfStable to be a little bit more clear? Alternatively the method could return an enum indicating the state.
There was a problem hiding this comment.
Good idea. Updated. ✅
|
@kumarpritam863 Do you have a minute to take a look at this PR? |
|
I realized if we always close the coordinator when a partiion is revoked; during race conditions, we can also end up with no coordinators if we check for stability during open call for new partitions. @kumarpritam863 let me know your thoughts! Thank you. 🙇 |
|
@fenil25 Thank you so much for this thoughtful change — really appreciate your effort! |
| private CommitterImpl committer; | ||
| private List<MemberDescription> members; | ||
| private List<TopicPartition> leaderAssignments; | ||
| private List<TopicPartition> nonLeaderAssignments; |
There was a problem hiding this comment.
I don't think these need to be member variables, I feel it would be better to initialize these in the test.
There was a problem hiding this comment.
I use these variables in both the tests so to avoid code repetition, I added them in Before call. I can add them individually in each test.
Perfect. Thanks a lot for the insights! Really appreciate your eyes. I added some tests 🙇♂️ |
|
LGTM. Thanks @fenil25 for the contribution, and @kumarpritam863 for the review! |
…tor leader election (apache#14395)
…tor leader election (apache#14395)
This PR - changed the logic of how we close the coordinator in ICR mode.
Initially leaderPartition was only checked while assigning new partitions to the task during rebalancing.
While closing the partitions for a task, the coordinator was killed. This did not work during incremental co-operative rebalancing. This PR - #12372 fixed this issue.
However, we are being too conservative right now in checking whether there is leaderPartition. When, Kafka connect consumers are rebalancing, there can be a possibility that partition-0 is assigned to task A, and we start the coordinator. During a rebalance operation, partition 0 can move from task A to task B. When it calls close() on A, the group is rebalancing, and we never close the coordinator.
This removes the criteria that consumer needs to be stable while opening or closing partitions. Even during continuous rebalance, Kafka will always provide new partitions in opening and closing call. So, we don't have to check whether consumer group is stable while closing the coordinator.
Currently, without this, during rebalancing, there is a race condition and there is a possibility of having multiple coordinators around. This fixes the issue.
This PR #13756 addresses some of the scenarios where there can be multiple coordinators.
This PR handles the case while rebalancing.