Core: Fixes OOM caused by Avro decoder caching#7791
Conversation
|
Hi @stevenzwu @nastra @rdblue @Fokko @aokolnychyi, could you help to review this when you are free? Thanks in advance. |
|
Thanks @ConeyLiu for raising this, and sorry for the long wait. Did you confirm with the new At Avro, we had a similar issue: https://lists.apache.org/thread/8q3g304thhjgsfk7d6l62w706y365616 And the fix: apache/avro#2090 |
|
Hmm, that looks unrelated since it caches the fields that are singleton, and not the schema itself. |
Fokko
left a comment
There was a problem hiding this comment.
The fix looks reasonable to me. The schema checks on identity, so that looks good. @ConeyLiu Do you think it is possible to add a check to see if the HashMap actually removes objects that are garbage collected? It would be nice to check its behavior now, but also we make sure that we keep this behavior in the future (it was broken in Avro along the way).
|
Thanks @Fokko for the review.
It needs to run several weeks or months to trigger the problem. I tested locally and the caching works as expected.
Add the new UTs to cover this. Please take another look when you are free. |
| Map<Schema, Map<Schema, ResolvingDecoder>> cache = DECODER_CACHES.get(); | ||
| Map<Schema, ResolvingDecoder> fileSchemaToResolver = | ||
| cache.computeIfAbsent(readSchema, k -> Maps.newHashMap()); | ||
| cache.computeIfAbsent(readSchema, k -> new WeakHashMap<>()); |
There was a problem hiding this comment.
Change to WeakHashMap to solve the problems mentioned in problem 2.
|
@ConeyLiu: One last question: Did we run any performance tests to check how the changed caching effects the performance? |
a3c39f6 to
69b7984
Compare
|
@pvary Thanks for pointing that out. This is really needed and found the potential problems. Previously, we use Because for each avro record, we need to call the DecoderResolver.resolveAndRead. So I keep the top map with |
|
Actually, I am not sure why we need DecoderResolver.resolveAndRead for each record reading. Maybe we could create a |
| import org.junit.Before; | ||
| import org.junit.Test; | ||
|
|
||
| public class TestDecoderResolver { |
| return identityKeys.size(); | ||
| } | ||
|
|
||
| private void checkCachedSize(int expected) { |
There was a problem hiding this comment.
wondering if this can be flaky on CI builds where servers are quite often overloaded. we can keep an eye on this. not a concern right now.
|
Merged the change as there are 3 approvals, and no comments for quite a while. Thanks @ConeyLiu for the PR and @Fokko and @stevenzwu for the reviews! |
|
Thanks @pvary for merging this, and also thanks @Fokko @stevenzwu @pvary for reviewing. |
Closes #5652
This memory leak especially happens flink sink job which could lead there are many schema and decoders cached in memory and OOM in the end. Such as the following heap dumps:
Here are two problems at here: