From 2071268c256f1b5af236b6cef73e5835ce9c5245 Mon Sep 17 00:00:00 2001 From: Anton Okolnychyi Date: Thu, 16 Sep 2021 12:41:40 -0700 Subject: [PATCH] Core: Add table property for ORC batch size --- .../org/apache/iceberg/TableProperties.java | 3 ++ .../apache/iceberg/spark/source/Reader.java | 40 ++++++++++++++++++- .../org/apache/iceberg/spark/Spark3Util.java | 27 +++++++++++-- .../iceberg/spark/source/SparkBatchScan.java | 17 ++++++-- 4 files changed, 79 insertions(+), 8 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/TableProperties.java b/core/src/main/java/org/apache/iceberg/TableProperties.java index e2433db5fe69..8988c3aa2c9e 100644 --- a/core/src/main/java/org/apache/iceberg/TableProperties.java +++ b/core/src/main/java/org/apache/iceberg/TableProperties.java @@ -132,6 +132,9 @@ private TableProperties() { public static final String ORC_VECTORIZATION_ENABLED = "read.orc.vectorization.enabled"; public static final boolean ORC_VECTORIZATION_ENABLED_DEFAULT = false; + public static final String ORC_BATCH_SIZE = "read.orc.vectorization.batch-size"; + public static final int ORC_BATCH_SIZE_DEFAULT = 5000; + public static final String OBJECT_STORE_ENABLED = "write.object-storage.enabled"; public static final boolean OBJECT_STORE_ENABLED_DEFAULT = false; diff --git a/spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java b/spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java index edc03c3ef7e5..7d0c301f6547 100644 --- a/spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java +++ b/spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java @@ -98,7 +98,6 @@ class Reader implements DataSourceReader, SupportsScanColumnarBatch, SupportsPus private List filterExpressions = null; private Filter[] pushedFilters = NO_FILTERS; private final boolean localityPreferred; - private final int batchSize; private final boolean readTimestampWithoutZone; // lazy variables @@ -106,6 +105,7 @@ class Reader implements DataSourceReader, SupportsScanColumnarBatch, SupportsPus private StructType type = null; // cached because Spark accesses it multiple times private List tasks = null; // lazy cache of tasks private Boolean readUsingBatch = null; + private int batchSize = 0; Reader(SparkSession spark, Table table, boolean caseSensitive, DataSourceOptions options) { this.sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext()); @@ -346,6 +346,10 @@ public boolean enableBatchRead() { this.readUsingBatch = batchReadsEnabled && hasNoDeleteFiles && (allOrcFileScanTasks || (allParquetFileScanTasks && atLeastOneColumn && onlyPrimitives)); + + if (readUsingBatch) { + this.batchSize = batchSize(allParquetFileScanTasks, allOrcFileScanTasks); + } } return readUsingBatch; } @@ -388,6 +392,40 @@ public boolean isVectorizationEnabled(FileFormat fileFormat) { } } + private int batchSize(boolean isParquetOnly, boolean isOrcOnly) { + if (isParquetOnly) { + return batchSize(FileFormat.PARQUET); + } else if (isOrcOnly) { + return batchSize(FileFormat.ORC); + } else { + return 0; + } + } + + private int batchSize(FileFormat fileFormat) { + String readOptionValue = options.asMap().get(SparkReadOptions.VECTORIZATION_BATCH_SIZE); + if (readOptionValue != null) { + return Integer.parseInt(readOptionValue); + } + + switch (fileFormat) { + case PARQUET: + return PropertyUtil.propertyAsInt( + table.properties(), + TableProperties.PARQUET_BATCH_SIZE, + TableProperties.PARQUET_BATCH_SIZE_DEFAULT); + + case ORC: + return PropertyUtil.propertyAsInt( + table.properties(), + TableProperties.ORC_BATCH_SIZE, + TableProperties.ORC_BATCH_SIZE_DEFAULT); + + default: + throw new IllegalArgumentException("File format does not support batch reads: " + fileFormat); + } + } + private static void mergeIcebergHadoopConfs( Configuration baseConf, Map options) { options.keySet().stream() diff --git a/spark3/src/main/java/org/apache/iceberg/spark/Spark3Util.java b/spark3/src/main/java/org/apache/iceberg/spark/Spark3Util.java index a2b1ff41f42e..4bf7a64c968c 100644 --- a/spark3/src/main/java/org/apache/iceberg/spark/Spark3Util.java +++ b/spark3/src/main/java/org/apache/iceberg/spark/Spark3Util.java @@ -531,10 +531,29 @@ public static boolean isVectorizationEnabled(FileFormat fileFormat, } } - public static int batchSize(Map properties, CaseInsensitiveStringMap readOptions) { - return readOptions.getInt(SparkReadOptions.VECTORIZATION_BATCH_SIZE, - PropertyUtil.propertyAsInt(properties, - TableProperties.PARQUET_BATCH_SIZE, TableProperties.PARQUET_BATCH_SIZE_DEFAULT)); + public static int batchSize(FileFormat fileFormat, Map properties, + CaseInsensitiveStringMap readOptions) { + String readOptionValue = readOptions.get(SparkReadOptions.VECTORIZATION_BATCH_SIZE); + if (readOptionValue != null) { + return Integer.parseInt(readOptionValue); + } + + switch (fileFormat) { + case PARQUET: + return PropertyUtil.propertyAsInt( + properties, + TableProperties.PARQUET_BATCH_SIZE, + TableProperties.PARQUET_BATCH_SIZE_DEFAULT); + + case ORC: + return PropertyUtil.propertyAsInt( + properties, + TableProperties.ORC_BATCH_SIZE, + TableProperties.ORC_BATCH_SIZE_DEFAULT); + + default: + throw new IllegalArgumentException("File format does not support batch reads: " + fileFormat); + } } public static Long propertyAsLong(CaseInsensitiveStringMap options, String property, Long defaultValue) { diff --git a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java index e9eda0b29394..3c256bc49248 100644 --- a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java +++ b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java @@ -72,7 +72,6 @@ abstract class SparkBatchScan implements Scan, Batch, SupportsReportStatistics { private final boolean localityPreferred; private final Schema expectedSchema; private final List filterExpressions; - private final int batchSize; private final boolean readTimestampWithoutZone; private final CaseInsensitiveStringMap options; @@ -87,7 +86,6 @@ abstract class SparkBatchScan implements Scan, Batch, SupportsReportStatistics { this.expectedSchema = expectedSchema; this.filterExpressions = filters != null ? filters : Collections.emptyList(); this.localityPreferred = Spark3Util.isLocalityEnabled(table.io(), table.location(), options); - this.batchSize = Spark3Util.batchSize(table.properties(), options); this.options = options; RuntimeConfig sessionConf = SparkSession.active().conf(); @@ -180,7 +178,9 @@ public PartitionReaderFactory createReaderFactory() { boolean readUsingBatch = batchReadsEnabled && hasNoDeleteFiles && (allOrcFileScanTasks || (allParquetFileScanTasks && atLeastOneColumn && onlyPrimitives)); - return new ReaderFactory(readUsingBatch ? batchSize : 0); + int batchSize = readUsingBatch ? batchSize(allParquetFileScanTasks, allOrcFileScanTasks) : 0; + + return new ReaderFactory(batchSize); } private boolean batchReadsEnabled(boolean isParquetOnly, boolean isOrcOnly) { @@ -195,6 +195,17 @@ private boolean batchReadsEnabled(boolean isParquetOnly, boolean isOrcOnly) { } } + private int batchSize(boolean isParquetOnly, boolean isOrcOnly) { + Map properties = table.properties(); + if (isParquetOnly) { + return Spark3Util.batchSize(FileFormat.PARQUET, properties, options); + } else if (isOrcOnly) { + return Spark3Util.batchSize(FileFormat.ORC, properties, options); + } else { + return 0; + } + } + @Override public Statistics estimateStatistics() { // its a fresh table, no data