Skip to content

Core: Fixes OOM caused by Avro decoder caching#7791

Merged
pvary merged 7 commits into
apache:masterfrom
ConeyLiu:fixes-memory-leak
Jun 26, 2023
Merged

Core: Fixes OOM caused by Avro decoder caching#7791
pvary merged 7 commits into
apache:masterfrom
ConeyLiu:fixes-memory-leak

Conversation

@ConeyLiu

@ConeyLiu ConeyLiu commented Jun 7, 2023

Copy link
Copy Markdown
Contributor

Closes #5652

This memory leak especially happens flink sink job which could lead there are many schema and decoders cached in memory and OOM in the end. Such as the following heap dumps:

image

Here are two problems at here:

  1. The guava map uses identity to compare the key when using the weak key. Here are the details: https://github.com/google/guava/blob/master/guava/src/com/google/common/collect/MapMaker.java#LL58C39-L58C39.
  2. Copied from DecoderResolver may lead to OOM of flink jobs writing to iceberg tables #5652

The DecoderResolver holds a ThreadLocal variable of a two-layer map. The outer map has a weak key while the inner map has a strong one. As the inner map holds a reference to a Schema object, the outer map holding the same weak reference to the Schema object will not release the weak key. That leads to the OOM.

@github-actions github-actions Bot added the core label Jun 7, 2023
@ConeyLiu ConeyLiu changed the title Core: Fixes memory leak caused by Avro decoder caching Core: Fixes OOM caused by Avro decoder caching Jun 7, 2023
@ConeyLiu

ConeyLiu commented Jun 7, 2023

Copy link
Copy Markdown
Contributor Author

Hi @stevenzwu @nastra @rdblue @Fokko @aokolnychyi, could you help to review this when you are free? Thanks in advance.

@Fokko

Fokko commented Jun 7, 2023

Copy link
Copy Markdown
Contributor

Thanks @ConeyLiu for raising this, and sorry for the long wait. Did you confirm with the new WeakHashMap that the problem has been resolved?

At Avro, we had a similar issue: https://lists.apache.org/thread/8q3g304thhjgsfk7d6l62w706y365616 And the fix: apache/avro#2090

@Fokko

Fokko commented Jun 7, 2023

Copy link
Copy Markdown
Contributor

Hmm, that looks unrelated since it caches the fields that are singleton, and not the schema itself.

@Fokko Fokko left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The fix looks reasonable to me. The schema checks on identity, so that looks good. @ConeyLiu Do you think it is possible to add a check to see if the HashMap actually removes objects that are garbage collected? It would be nice to check its behavior now, but also we make sure that we keep this behavior in the future (it was broken in Avro along the way).

@Fokko Fokko requested a review from stevenzwu June 7, 2023 12:53
@ConeyLiu

ConeyLiu commented Jun 8, 2023

Copy link
Copy Markdown
Contributor Author

Thanks @Fokko for the review.

Did you confirm with the new WeakHashMap that the problem has been resolved?

It needs to run several weeks or months to trigger the problem. I tested locally and the caching works as expected.

Do you think it is possible to add a check to see if the HashMap actually removes objects that are garbage collected? It would be nice to check its behavior now, but also we make sure that we keep this behavior in the future (it was broken in Avro along the way).

Add the new UTs to cover this. Please take another look when you are free.

Map<Schema, Map<Schema, ResolvingDecoder>> cache = DECODER_CACHES.get();
Map<Schema, ResolvingDecoder> fileSchemaToResolver =
cache.computeIfAbsent(readSchema, k -> Maps.newHashMap());
cache.computeIfAbsent(readSchema, k -> new WeakHashMap<>());

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Change to WeakHashMap to solve the problems mentioned in problem 2.

Comment thread core/src/test/java/org/apache/iceberg/data/avro/TestDecoderResolver.java Outdated
@pvary

pvary commented Jun 9, 2023

Copy link
Copy Markdown
Contributor

@ConeyLiu: One last question: Did we run any performance tests to check how the changed caching effects the performance?

@ConeyLiu ConeyLiu force-pushed the fixes-memory-leak branch from a3c39f6 to 69b7984 Compare June 10, 2023 04:52
@ConeyLiu

ConeyLiu commented Jun 10, 2023

Copy link
Copy Markdown
Contributor Author

@pvary Thanks for pointing that out. This is really needed and found the potential problems.

Previously, we use java.util.WeakHashMap to replace the MapMaker().weakKeys().makeMap(), while the key equally checking is really slow compared with identity. The result is as follows:

// java.util.WeakHashMap
Benchmark                               Mode  Cnt   Score   Error  Units
ManifestReadBenchmark.readManifestFile    ss    5  12.896 ± 1.155   s/op

// MapMaker().weakKeys().makeMap()
Benchmark                               Mode  Cnt  Score   Error  Units
ManifestReadBenchmark.readManifestFile    ss    5  6.180 ± 0.067   s/op

Because for each avro record, we need to call the DecoderResolver.resolveAndRead. So I keep the top map with MapMaker().weakKeys().makeMap() while the inner map changed to java.util.WeakHashMap.
Here is the benchmarks:

// Master
Benchmark                               Mode  Cnt  Score   Error  Units
ManifestReadBenchmark.readManifestFile    ss    5  6.180 ± 0.067   s/op

// This change
Benchmark                               Mode  Cnt  Score   Error  Units
ManifestReadBenchmark.readManifestFile    ss    5  6.190 ± 0.069   s/op

@ConeyLiu

Copy link
Copy Markdown
Contributor Author

Actually, I am not sure why we need DecoderResolver.resolveAndRead for each record reading. Maybe we could create a ResolvingDecoder for a given AvroReader instance because for each instance the readSchema and fileSchema are unchanged.

Comment thread core/src/test/java/org/apache/iceberg/data/avro/TestDecoderResolver.java Outdated
import org.junit.Before;
import org.junit.Test;

public class TestDecoderResolver {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice test

return identityKeys.size();
}

private void checkCachedSize(int expected) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wondering if this can be flaky on CI builds where servers are quite often overloaded. we can keep an eye on this. not a concern right now.

@pvary pvary merged commit 9f12b85 into apache:master Jun 26, 2023
@pvary

pvary commented Jun 26, 2023

Copy link
Copy Markdown
Contributor

Merged the change as there are 3 approvals, and no comments for quite a while.

Thanks @ConeyLiu for the PR and @Fokko and @stevenzwu for the reviews!

@ConeyLiu

Copy link
Copy Markdown
Contributor Author

Thanks @pvary for merging this, and also thanks @Fokko @stevenzwu @pvary for reviewing.

@ConeyLiu ConeyLiu deleted the fixes-memory-leak branch June 26, 2023 11:03
rodmeneses pushed a commit to rodmeneses/iceberg that referenced this pull request Feb 19, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

DecoderResolver may lead to OOM of flink jobs writing to iceberg tables

4 participants