From 1b20250354b80a843469a36fff13b091d0268ab3 Mon Sep 17 00:00:00 2001 From: Luning Wang Date: Mon, 18 Jul 2022 10:46:52 +0800 Subject: [PATCH 1/5] Orc: Support row group bloom filters --- .../org/apache/iceberg/TableProperties.java | 4 + docs/configuration.md | 2 + .../main/java/org/apache/iceberg/orc/ORC.java | 53 ++++++++++++- .../apache/iceberg/orc/TestBloomFilter.java | 77 +++++++++++++++++++ 4 files changed, 133 insertions(+), 3 deletions(-) create mode 100644 orc/src/test/java/org/apache/iceberg/orc/TestBloomFilter.java diff --git a/core/src/main/java/org/apache/iceberg/TableProperties.java b/core/src/main/java/org/apache/iceberg/TableProperties.java index 771f53d52728..d3bfbd20fa36 100644 --- a/core/src/main/java/org/apache/iceberg/TableProperties.java +++ b/core/src/main/java/org/apache/iceberg/TableProperties.java @@ -177,6 +177,10 @@ private TableProperties() {} public static final String AVRO_COMPRESSION_LEVEL_DEFAULT = null; public static final String ORC_STRIPE_SIZE_BYTES = "write.orc.stripe-size-bytes"; + + public static final String ORC_BLOOM_FILTER_COLUMNS = "write.orc.bloom.filter.columns"; + public static final String ORC_BLOOM_FILTER_FPP = "write.orc.bloom.filter.fpp"; + public static final String DELETE_ORC_STRIPE_SIZE_BYTES = "write.delete.orc.stripe-size-bytes"; public static final long ORC_STRIPE_SIZE_BYTES_DEFAULT = 64L * 1024 * 1024; // 64 MB diff --git a/docs/configuration.md b/docs/configuration.md index f0f0462f94a2..93f2d0b70090 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -64,6 +64,8 @@ Iceberg tables support table properties to configure table behavior, like the de | write.orc.block-size-bytes | 268435456 (256 MB) | Define the default file system block size for ORC files | | write.orc.compression-codec | zlib | ORC compression codec: zstd, lz4, lzo, zlib, snappy, none | | write.orc.compression-strategy | speed | ORC compression strategy: speed, compression | +| write.orc.bloom.filter.columns | (not set) | Comma separated list of column names for which a Bloom filter must be created | +| write.orc.bloom.filter.fpp | 0.05 | False positive probability for Bloom filter (must > 0.0 and < 1.0) | | write.location-provider.impl | null | Optional custom implementation for LocationProvider | | write.metadata.compression-codec | none | Metadata compression codec; none or gzip | | write.metadata.metrics.default | truncate(16) | Default metrics mode for all columns in the table; none, counts, truncate(length), or full | diff --git a/orc/src/main/java/org/apache/iceberg/orc/ORC.java b/orc/src/main/java/org/apache/iceberg/orc/ORC.java index 79c83aec90b8..b895b3d20752 100644 --- a/orc/src/main/java/org/apache/iceberg/orc/ORC.java +++ b/orc/src/main/java/org/apache/iceberg/orc/ORC.java @@ -25,6 +25,8 @@ import static org.apache.iceberg.TableProperties.DELETE_ORC_WRITE_BATCH_SIZE; import static org.apache.iceberg.TableProperties.ORC_BLOCK_SIZE_BYTES; import static org.apache.iceberg.TableProperties.ORC_BLOCK_SIZE_BYTES_DEFAULT; +import static org.apache.iceberg.TableProperties.ORC_BLOOM_FILTER_COLUMNS; +import static org.apache.iceberg.TableProperties.ORC_BLOOM_FILTER_FPP; import static org.apache.iceberg.TableProperties.ORC_COMPRESSION; import static org.apache.iceberg.TableProperties.ORC_COMPRESSION_DEFAULT; import static org.apache.iceberg.TableProperties.ORC_COMPRESSION_STRATEGY; @@ -202,6 +204,8 @@ public FileAppender build() { OrcConf.COMPRESS.setString(conf, context.compressionKind().name()); OrcConf.COMPRESSION_STRATEGY.setString(conf, context.compressionStrategy().name()); OrcConf.OVERWRITE_OUTPUT_FILE.setBoolean(conf, overwrite); + OrcConf.BLOOM_FILTER_COLUMNS.setString(conf, context.bloomFilterColumns()); + OrcConf.BLOOM_FILTER_FPP.setDouble(conf, context.bloomFilterFpp()); return new OrcFileAppender<>( schema, @@ -220,6 +224,10 @@ private static class Context { private final CompressionKind compressionKind; private final CompressionStrategy compressionStrategy; + private final String bloomFilterColumns; + + private final double bloomFilterFpp; + public long stripeSize() { return stripeSize; } @@ -240,17 +248,29 @@ public CompressionStrategy compressionStrategy() { return compressionStrategy; } + public String bloomFilterColumns() { + return bloomFilterColumns; + } + + public double bloomFilterFpp() { + return bloomFilterFpp; + } + private Context( long stripeSize, long blockSize, int vectorizedRowBatchSize, CompressionKind compressionKind, - CompressionStrategy compressionStrategy) { + CompressionStrategy compressionStrategy, + String bloomFilterColumns, + double bloomFilterFpp) { this.stripeSize = stripeSize; this.blockSize = blockSize; this.vectorizedRowBatchSize = vectorizedRowBatchSize; this.compressionKind = compressionKind; this.compressionStrategy = compressionStrategy; + this.bloomFilterColumns = bloomFilterColumns; + this.bloomFilterFpp = bloomFilterFpp; } static Context dataContext(Map config) { @@ -285,9 +305,24 @@ static Context dataContext(Map config) { strategyAsString = PropertyUtil.propertyAsString(config, ORC_COMPRESSION_STRATEGY, strategyAsString); CompressionStrategy compressionStrategy = toCompressionStrategy(strategyAsString); + String bloomFilterColumns = + PropertyUtil.propertyAsString(config, OrcConf.BLOOM_FILTER_COLUMNS.getAttribute(), ""); + bloomFilterColumns = + PropertyUtil.propertyAsString(config, ORC_BLOOM_FILTER_COLUMNS, bloomFilterColumns); + + double bloomFilterFpp = + PropertyUtil.propertyAsDouble(config, OrcConf.BLOOM_FILTER_FPP.getAttribute(), 0.05); + bloomFilterFpp = + PropertyUtil.propertyAsDouble(config, ORC_BLOOM_FILTER_FPP, bloomFilterFpp); return new Context( - stripeSize, blockSize, vectorizedRowBatchSize, compressionKind, compressionStrategy); + stripeSize, + blockSize, + vectorizedRowBatchSize, + compressionKind, + compressionStrategy, + bloomFilterColumns, + bloomFilterFpp); } static Context deleteContext(Map config) { @@ -316,9 +351,21 @@ static Context deleteContext(Map config) { strategyAsString != null ? toCompressionStrategy(strategyAsString) : dataContext.compressionStrategy(); + String bloomFilterColumns = + PropertyUtil.propertyAsString(config, OrcConf.BLOOM_FILTER_COLUMNS.getAttribute(), ""); + double bloomFilterFpp = + PropertyUtil.propertyAsDouble(config, OrcConf.BLOOM_FILTER_FPP.getAttribute(), 0.05); + bloomFilterFpp = + PropertyUtil.propertyAsDouble(config, ORC_BLOOM_FILTER_FPP, bloomFilterFpp); return new Context( - stripeSize, blockSize, vectorizedRowBatchSize, compressionKind, compressionStrategy); + stripeSize, + blockSize, + vectorizedRowBatchSize, + compressionKind, + compressionStrategy, + bloomFilterColumns, + bloomFilterFpp); } private static CompressionKind toCompressionKind(String codecAsString) { diff --git a/orc/src/test/java/org/apache/iceberg/orc/TestBloomFilter.java b/orc/src/test/java/org/apache/iceberg/orc/TestBloomFilter.java new file mode 100644 index 000000000000..88ccdb979841 --- /dev/null +++ b/orc/src/test/java/org/apache/iceberg/orc/TestBloomFilter.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.orc; + +import static org.apache.iceberg.types.Types.NestedField.required; + +import java.io.File; +import java.lang.reflect.Field; +import org.apache.iceberg.Files; +import org.apache.iceberg.Schema; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.data.orc.GenericOrcWriter; +import org.apache.iceberg.io.FileAppender; +import org.apache.iceberg.types.Types; +import org.apache.orc.impl.WriterImpl; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +public class TestBloomFilter { + private static final Schema DATA_SCHEMA = + new Schema( + required(100, "id", Types.LongType.get()), + required(101, "name", Types.StringType.get()), + required(102, "price", Types.DoubleType.get())); + + @Rule public TemporaryFolder temp = new TemporaryFolder(); + + @Test + public void testWriteOption() throws Exception { + File testFile = temp.newFile(); + Assert.assertTrue("Delete should succeed", testFile.delete()); + + try (FileAppender writer = + ORC.write(Files.localOutput(testFile)) + .createWriterFunc(GenericOrcWriter::buildWriter) + .schema(DATA_SCHEMA) + .set("write.orc.bloom.filter.columns", "id,name") + .set("write.orc.bloom.filter.fpp", "0.04") + .build()) { + + Class clazzOrcFileAppender = Class.forName("org.apache.iceberg.orc.OrcFileAppender"); + Field writerField = clazzOrcFileAppender.getDeclaredField("writer"); + writerField.setAccessible(true); + WriterImpl orcWriter = (WriterImpl) writerField.get(writer); + + Class clazzWriterImpl = Class.forName("org.apache.orc.impl.WriterImpl"); + Field bloomFilterColumnsField = clazzWriterImpl.getDeclaredField("bloomFilterColumns"); + Field bloomFilterFppField = clazzWriterImpl.getDeclaredField("bloomFilterFpp"); + bloomFilterColumnsField.setAccessible(true); + bloomFilterFppField.setAccessible(true); + boolean[] bloomFilterColumns = (boolean[]) bloomFilterColumnsField.get(orcWriter); + double bloomFilterFpp = (double) bloomFilterFppField.get(orcWriter); + + Assert.assertTrue(bloomFilterColumns[1]); + Assert.assertTrue(bloomFilterColumns[2]); + Assert.assertEquals(0.04, bloomFilterFpp, 1e-15); + } + } +} From 994aa9eadec93e22375e26708a779da41e7d1d5e Mon Sep 17 00:00:00 2001 From: Luning Wang Date: Mon, 22 Aug 2022 10:27:53 +0800 Subject: [PATCH 2/5] Add and remove blank lines, define a constant --- .../org/apache/iceberg/TableProperties.java | 2 ++ .../main/java/org/apache/iceberg/orc/ORC.java | 17 ++++++----------- 2 files changed, 8 insertions(+), 11 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/TableProperties.java b/core/src/main/java/org/apache/iceberg/TableProperties.java index d3bfbd20fa36..11ac6ebabe28 100644 --- a/core/src/main/java/org/apache/iceberg/TableProperties.java +++ b/core/src/main/java/org/apache/iceberg/TableProperties.java @@ -179,7 +179,9 @@ private TableProperties() {} public static final String ORC_STRIPE_SIZE_BYTES = "write.orc.stripe-size-bytes"; public static final String ORC_BLOOM_FILTER_COLUMNS = "write.orc.bloom.filter.columns"; + public static final String ORC_BLOOM_FILTER_FPP = "write.orc.bloom.filter.fpp"; + public static final double BLOOM_FILTER_FPP_DEFAULT = 0.05; public static final String DELETE_ORC_STRIPE_SIZE_BYTES = "write.delete.orc.stripe-size-bytes"; public static final long ORC_STRIPE_SIZE_BYTES_DEFAULT = 64L * 1024 * 1024; // 64 MB diff --git a/orc/src/main/java/org/apache/iceberg/orc/ORC.java b/orc/src/main/java/org/apache/iceberg/orc/ORC.java index b895b3d20752..223864fbccd3 100644 --- a/orc/src/main/java/org/apache/iceberg/orc/ORC.java +++ b/orc/src/main/java/org/apache/iceberg/orc/ORC.java @@ -18,6 +18,7 @@ */ package org.apache.iceberg.orc; +import static org.apache.iceberg.TableProperties.BLOOM_FILTER_FPP_DEFAULT; import static org.apache.iceberg.TableProperties.DELETE_ORC_BLOCK_SIZE_BYTES; import static org.apache.iceberg.TableProperties.DELETE_ORC_COMPRESSION; import static org.apache.iceberg.TableProperties.DELETE_ORC_COMPRESSION_STRATEGY; @@ -223,9 +224,7 @@ private static class Context { private final int vectorizedRowBatchSize; private final CompressionKind compressionKind; private final CompressionStrategy compressionStrategy; - private final String bloomFilterColumns; - private final double bloomFilterFpp; public long stripeSize() { @@ -305,13 +304,15 @@ static Context dataContext(Map config) { strategyAsString = PropertyUtil.propertyAsString(config, ORC_COMPRESSION_STRATEGY, strategyAsString); CompressionStrategy compressionStrategy = toCompressionStrategy(strategyAsString); + String bloomFilterColumns = PropertyUtil.propertyAsString(config, OrcConf.BLOOM_FILTER_COLUMNS.getAttribute(), ""); bloomFilterColumns = PropertyUtil.propertyAsString(config, ORC_BLOOM_FILTER_COLUMNS, bloomFilterColumns); double bloomFilterFpp = - PropertyUtil.propertyAsDouble(config, OrcConf.BLOOM_FILTER_FPP.getAttribute(), 0.05); + PropertyUtil.propertyAsDouble( + config, OrcConf.BLOOM_FILTER_FPP.getAttribute(), BLOOM_FILTER_FPP_DEFAULT); bloomFilterFpp = PropertyUtil.propertyAsDouble(config, ORC_BLOOM_FILTER_FPP, bloomFilterFpp); @@ -351,12 +352,6 @@ static Context deleteContext(Map config) { strategyAsString != null ? toCompressionStrategy(strategyAsString) : dataContext.compressionStrategy(); - String bloomFilterColumns = - PropertyUtil.propertyAsString(config, OrcConf.BLOOM_FILTER_COLUMNS.getAttribute(), ""); - double bloomFilterFpp = - PropertyUtil.propertyAsDouble(config, OrcConf.BLOOM_FILTER_FPP.getAttribute(), 0.05); - bloomFilterFpp = - PropertyUtil.propertyAsDouble(config, ORC_BLOOM_FILTER_FPP, bloomFilterFpp); return new Context( stripeSize, @@ -364,8 +359,8 @@ static Context deleteContext(Map config) { vectorizedRowBatchSize, compressionKind, compressionStrategy, - bloomFilterColumns, - bloomFilterFpp); + dataContext.bloomFilterColumns(), + dataContext.bloomFilterFpp()); } private static CompressionKind toCompressionKind(String codecAsString) { From 02829dfacccdf63c95614a7a73c87b55ce78f681 Mon Sep 17 00:00:00 2001 From: Luning Wang Date: Thu, 25 Aug 2022 11:44:05 +0800 Subject: [PATCH 3/5] Add fpp check --- .../main/java/org/apache/iceberg/orc/ORC.java | 3 +++ .../apache/iceberg/orc/TestBloomFilter.java | 18 ++++++++++++++++++ 2 files changed, 21 insertions(+) diff --git a/orc/src/main/java/org/apache/iceberg/orc/ORC.java b/orc/src/main/java/org/apache/iceberg/orc/ORC.java index 223864fbccd3..727af79788e9 100644 --- a/orc/src/main/java/org/apache/iceberg/orc/ORC.java +++ b/orc/src/main/java/org/apache/iceberg/orc/ORC.java @@ -315,6 +315,9 @@ static Context dataContext(Map config) { config, OrcConf.BLOOM_FILTER_FPP.getAttribute(), BLOOM_FILTER_FPP_DEFAULT); bloomFilterFpp = PropertyUtil.propertyAsDouble(config, ORC_BLOOM_FILTER_FPP, bloomFilterFpp); + Preconditions.checkArgument( + bloomFilterFpp > 0.0 && bloomFilterFpp < 1.0, + "Bloom filter fpp must be > 0.0 and < 1.0"); return new Context( stripeSize, diff --git a/orc/src/test/java/org/apache/iceberg/orc/TestBloomFilter.java b/orc/src/test/java/org/apache/iceberg/orc/TestBloomFilter.java index 88ccdb979841..cd882210bd36 100644 --- a/orc/src/test/java/org/apache/iceberg/orc/TestBloomFilter.java +++ b/orc/src/test/java/org/apache/iceberg/orc/TestBloomFilter.java @@ -74,4 +74,22 @@ public void testWriteOption() throws Exception { Assert.assertEquals(0.04, bloomFilterFpp, 1e-15); } } + + @Test + public void testInvalidFppOption() throws Exception { + File testFile = temp.newFile(); + Assert.assertTrue("Delete should succeed", testFile.delete()); + + try (FileAppender writer = + ORC.write(Files.localOutput(testFile)) + .createWriterFunc(GenericOrcWriter::buildWriter) + .schema(DATA_SCHEMA) + .set("write.orc.bloom.filter.columns", "id,name") + .set("write.orc.bloom.filter.fpp", "-1") + .build()) { + Assert.fail("Expected exception"); + } catch (IllegalArgumentException e) { + Assert.assertTrue(e.getMessage().contains("Bloom filter fpp must be > 0.0 and < 1.0")); + } + } } From e2b1212699b870506e158722e481e2b911ed620c Mon Sep 17 00:00:00 2001 From: Luning Wang Date: Fri, 26 Aug 2022 14:30:41 +0800 Subject: [PATCH 4/5] Fix constants --- .../main/java/org/apache/iceberg/TableProperties.java | 3 ++- orc/src/main/java/org/apache/iceberg/orc/ORC.java | 10 +++++++--- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/TableProperties.java b/core/src/main/java/org/apache/iceberg/TableProperties.java index 11ac6ebabe28..027ed30b7dcd 100644 --- a/core/src/main/java/org/apache/iceberg/TableProperties.java +++ b/core/src/main/java/org/apache/iceberg/TableProperties.java @@ -179,9 +179,10 @@ private TableProperties() {} public static final String ORC_STRIPE_SIZE_BYTES = "write.orc.stripe-size-bytes"; public static final String ORC_BLOOM_FILTER_COLUMNS = "write.orc.bloom.filter.columns"; + public static final String ORC_BLOOM_FILTER_COLUMNS_DEFAULT = ""; public static final String ORC_BLOOM_FILTER_FPP = "write.orc.bloom.filter.fpp"; - public static final double BLOOM_FILTER_FPP_DEFAULT = 0.05; + public static final double ORC_BLOOM_FILTER_FPP_DEFAULT = 0.05; public static final String DELETE_ORC_STRIPE_SIZE_BYTES = "write.delete.orc.stripe-size-bytes"; public static final long ORC_STRIPE_SIZE_BYTES_DEFAULT = 64L * 1024 * 1024; // 64 MB diff --git a/orc/src/main/java/org/apache/iceberg/orc/ORC.java b/orc/src/main/java/org/apache/iceberg/orc/ORC.java index 727af79788e9..5b2b877a97d4 100644 --- a/orc/src/main/java/org/apache/iceberg/orc/ORC.java +++ b/orc/src/main/java/org/apache/iceberg/orc/ORC.java @@ -18,7 +18,6 @@ */ package org.apache.iceberg.orc; -import static org.apache.iceberg.TableProperties.BLOOM_FILTER_FPP_DEFAULT; import static org.apache.iceberg.TableProperties.DELETE_ORC_BLOCK_SIZE_BYTES; import static org.apache.iceberg.TableProperties.DELETE_ORC_COMPRESSION; import static org.apache.iceberg.TableProperties.DELETE_ORC_COMPRESSION_STRATEGY; @@ -27,7 +26,9 @@ import static org.apache.iceberg.TableProperties.ORC_BLOCK_SIZE_BYTES; import static org.apache.iceberg.TableProperties.ORC_BLOCK_SIZE_BYTES_DEFAULT; import static org.apache.iceberg.TableProperties.ORC_BLOOM_FILTER_COLUMNS; +import static org.apache.iceberg.TableProperties.ORC_BLOOM_FILTER_COLUMNS_DEFAULT; import static org.apache.iceberg.TableProperties.ORC_BLOOM_FILTER_FPP; +import static org.apache.iceberg.TableProperties.ORC_BLOOM_FILTER_FPP_DEFAULT; import static org.apache.iceberg.TableProperties.ORC_COMPRESSION; import static org.apache.iceberg.TableProperties.ORC_COMPRESSION_DEFAULT; import static org.apache.iceberg.TableProperties.ORC_COMPRESSION_STRATEGY; @@ -306,13 +307,16 @@ static Context dataContext(Map config) { CompressionStrategy compressionStrategy = toCompressionStrategy(strategyAsString); String bloomFilterColumns = - PropertyUtil.propertyAsString(config, OrcConf.BLOOM_FILTER_COLUMNS.getAttribute(), ""); + PropertyUtil.propertyAsString( + config, + OrcConf.BLOOM_FILTER_COLUMNS.getAttribute(), + ORC_BLOOM_FILTER_COLUMNS_DEFAULT); bloomFilterColumns = PropertyUtil.propertyAsString(config, ORC_BLOOM_FILTER_COLUMNS, bloomFilterColumns); double bloomFilterFpp = PropertyUtil.propertyAsDouble( - config, OrcConf.BLOOM_FILTER_FPP.getAttribute(), BLOOM_FILTER_FPP_DEFAULT); + config, OrcConf.BLOOM_FILTER_FPP.getAttribute(), ORC_BLOOM_FILTER_FPP_DEFAULT); bloomFilterFpp = PropertyUtil.propertyAsDouble(config, ORC_BLOOM_FILTER_FPP, bloomFilterFpp); Preconditions.checkArgument( From 097ef5c1267acff5454bbc2a682f84c03e6473cf Mon Sep 17 00:00:00 2001 From: Luning Wang Date: Wed, 19 Oct 2022 15:53:55 +0800 Subject: [PATCH 5/5] Add test case to validate bloom in files --- build.gradle | 1 + .../apache/iceberg/orc/TestBloomFilter.java | 56 ++++++++++++++++++- 2 files changed, 56 insertions(+), 1 deletion(-) diff --git a/build.gradle b/build.gradle index 2a413004cbb9..026bc3ddd0af 100644 --- a/build.gradle +++ b/build.gradle @@ -545,6 +545,7 @@ project(':iceberg-orc') { testImplementation project(path: ':iceberg-api', configuration: 'testArtifacts') testImplementation project(path: ':iceberg-core', configuration: 'testArtifacts') testImplementation project(':iceberg-common') + testImplementation 'org.apache.orc:orc-tools' } } diff --git a/orc/src/test/java/org/apache/iceberg/orc/TestBloomFilter.java b/orc/src/test/java/org/apache/iceberg/orc/TestBloomFilter.java index cd882210bd36..c27ce9b18a58 100644 --- a/orc/src/test/java/org/apache/iceberg/orc/TestBloomFilter.java +++ b/orc/src/test/java/org/apache/iceberg/orc/TestBloomFilter.java @@ -22,12 +22,24 @@ import java.io.File; import java.lang.reflect.Field; +import java.lang.reflect.Method; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; import org.apache.iceberg.Files; import org.apache.iceberg.Schema; +import org.apache.iceberg.data.GenericRecord; import org.apache.iceberg.data.Record; import org.apache.iceberg.data.orc.GenericOrcWriter; import org.apache.iceberg.io.FileAppender; +import org.apache.iceberg.io.OutputFile; import org.apache.iceberg.types.Types; +import org.apache.orc.OrcFile; +import org.apache.orc.OrcProto; +import org.apache.orc.Reader; +import org.apache.orc.StripeInformation; +import org.apache.orc.TypeDescription; +import org.apache.orc.impl.OrcIndex; +import org.apache.orc.impl.RecordReaderImpl; import org.apache.orc.impl.WriterImpl; import org.junit.Assert; import org.junit.Rule; @@ -48,8 +60,9 @@ public void testWriteOption() throws Exception { File testFile = temp.newFile(); Assert.assertTrue("Delete should succeed", testFile.delete()); + OutputFile outFile = Files.localOutput(testFile); try (FileAppender writer = - ORC.write(Files.localOutput(testFile)) + ORC.write(outFile) .createWriterFunc(GenericOrcWriter::buildWriter) .schema(DATA_SCHEMA) .set("write.orc.bloom.filter.columns", "id,name") @@ -69,9 +82,50 @@ public void testWriteOption() throws Exception { boolean[] bloomFilterColumns = (boolean[]) bloomFilterColumnsField.get(orcWriter); double bloomFilterFpp = (double) bloomFilterFppField.get(orcWriter); + // Validate whether the bloom filters are set in ORC SDK or not Assert.assertTrue(bloomFilterColumns[1]); Assert.assertTrue(bloomFilterColumns[2]); Assert.assertEquals(0.04, bloomFilterFpp, 1e-15); + + Record recordTemplate = GenericRecord.create(DATA_SCHEMA); + Record record1 = recordTemplate.copy("id", 1L, "name", "foo", "price", 1.0); + Record record2 = recordTemplate.copy("id", 2L, "name", "bar", "price", 2.0); + writer.add(record1); + writer.add(record2); + } + + Class clazzFileDump = Class.forName("org.apache.orc.tools.FileDump"); + Method getFormattedBloomFilters = + clazzFileDump.getDeclaredMethod( + "getFormattedBloomFilters", + int.class, + OrcIndex.class, + OrcFile.WriterVersion.class, + TypeDescription.Category.class, + OrcProto.ColumnEncoding.class); + getFormattedBloomFilters.setAccessible(true); + + try (Reader reader = + OrcFile.createReader( + new Path(outFile.location()), new OrcFile.ReaderOptions(new Configuration())); ) { + boolean[] readCols = new boolean[] {false, true, true, false}; + RecordReaderImpl rows = (RecordReaderImpl) reader.rows(); + OrcIndex indices = rows.readRowIndex(0, null, readCols); + StripeInformation stripe = reader.getStripes().get(0); + OrcProto.StripeFooter footer = rows.readStripeFooter(stripe); + + String bloomFilterString = + (String) + getFormattedBloomFilters.invoke( + null, + 1, + indices, + reader.getWriterVersion(), + reader.getSchema().findSubtype(1).getCategory(), + footer.getColumns(1)); + + // Validate whether the bloom filters are written ORC files or not + Assert.assertTrue(bloomFilterString.contains("Bloom filters for column")); } }