Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions core/src/main/java/org/apache/iceberg/TableProperties.java
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,9 @@ private TableProperties() {
public static final String ORC_VECTORIZATION_ENABLED = "read.orc.vectorization.enabled";
public static final boolean ORC_VECTORIZATION_ENABLED_DEFAULT = false;

public static final String ORC_BATCH_SIZE = "read.orc.vectorization.batch-size";
public static final int ORC_BATCH_SIZE_DEFAULT = 5000;

public static final String OBJECT_STORE_ENABLED = "write.object-storage.enabled";
public static final boolean OBJECT_STORE_ENABLED_DEFAULT = false;

Expand Down
40 changes: 39 additions & 1 deletion spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java
Original file line number Diff line number Diff line change
Expand Up @@ -98,14 +98,14 @@ class Reader implements DataSourceReader, SupportsScanColumnarBatch, SupportsPus
private List<Expression> filterExpressions = null;
private Filter[] pushedFilters = NO_FILTERS;
private final boolean localityPreferred;
private final int batchSize;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How was this set before? Was it just ignored?

private final boolean readTimestampWithoutZone;

// lazy variables
private Schema schema = null;
private StructType type = null; // cached because Spark accesses it multiple times
private List<CombinedScanTask> tasks = null; // lazy cache of tasks
private Boolean readUsingBatch = null;
private int batchSize = 0;

Reader(SparkSession spark, Table table, boolean caseSensitive, DataSourceOptions options) {
this.sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext());
Expand Down Expand Up @@ -346,6 +346,10 @@ public boolean enableBatchRead() {

this.readUsingBatch = batchReadsEnabled && hasNoDeleteFiles && (allOrcFileScanTasks ||
(allParquetFileScanTasks && atLeastOneColumn && onlyPrimitives));

if (readUsingBatch) {
this.batchSize = batchSize(allParquetFileScanTasks, allOrcFileScanTasks);
}
}
return readUsingBatch;
}
Expand Down Expand Up @@ -388,6 +392,40 @@ public boolean isVectorizationEnabled(FileFormat fileFormat) {
}
}

private int batchSize(boolean isParquetOnly, boolean isOrcOnly) {
if (isParquetOnly) {
return batchSize(FileFormat.PARQUET);
} else if (isOrcOnly) {
return batchSize(FileFormat.ORC);
} else {
return 0;
}
}

private int batchSize(FileFormat fileFormat) {

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This PR is a temporary solution until we consume #3132 that will put this logic in one place for Spark 2 and 3.

String readOptionValue = options.asMap().get(SparkReadOptions.VECTORIZATION_BATCH_SIZE);
if (readOptionValue != null) {
return Integer.parseInt(readOptionValue);
}

switch (fileFormat) {
case PARQUET:
return PropertyUtil.propertyAsInt(
table.properties(),
TableProperties.PARQUET_BATCH_SIZE,
TableProperties.PARQUET_BATCH_SIZE_DEFAULT);

case ORC:
return PropertyUtil.propertyAsInt(
table.properties(),
TableProperties.ORC_BATCH_SIZE,
TableProperties.ORC_BATCH_SIZE_DEFAULT);

default:
throw new IllegalArgumentException("File format does not support batch reads: " + fileFormat);
}
}

private static void mergeIcebergHadoopConfs(
Configuration baseConf, Map<String, String> options) {
options.keySet().stream()
Expand Down
27 changes: 23 additions & 4 deletions spark3/src/main/java/org/apache/iceberg/spark/Spark3Util.java
Original file line number Diff line number Diff line change
Expand Up @@ -531,10 +531,29 @@ public static boolean isVectorizationEnabled(FileFormat fileFormat,
}
}

public static int batchSize(Map<String, String> properties, CaseInsensitiveStringMap readOptions) {
return readOptions.getInt(SparkReadOptions.VECTORIZATION_BATCH_SIZE,
PropertyUtil.propertyAsInt(properties,
TableProperties.PARQUET_BATCH_SIZE, TableProperties.PARQUET_BATCH_SIZE_DEFAULT));
public static int batchSize(FileFormat fileFormat, Map<String, String> properties,
CaseInsensitiveStringMap readOptions) {
String readOptionValue = readOptions.get(SparkReadOptions.VECTORIZATION_BATCH_SIZE);
if (readOptionValue != null) {
return Integer.parseInt(readOptionValue);
}

switch (fileFormat) {
case PARQUET:
return PropertyUtil.propertyAsInt(
properties,
TableProperties.PARQUET_BATCH_SIZE,
TableProperties.PARQUET_BATCH_SIZE_DEFAULT);

case ORC:
return PropertyUtil.propertyAsInt(
properties,
TableProperties.ORC_BATCH_SIZE,
TableProperties.ORC_BATCH_SIZE_DEFAULT);

default:
throw new IllegalArgumentException("File format does not support batch reads: " + fileFormat);
}
}

public static Long propertyAsLong(CaseInsensitiveStringMap options, String property, Long defaultValue) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ abstract class SparkBatchScan implements Scan, Batch, SupportsReportStatistics {
private final boolean localityPreferred;
private final Schema expectedSchema;
private final List<Expression> filterExpressions;
private final int batchSize;
private final boolean readTimestampWithoutZone;
private final CaseInsensitiveStringMap options;

Expand All @@ -87,7 +86,6 @@ abstract class SparkBatchScan implements Scan, Batch, SupportsReportStatistics {
this.expectedSchema = expectedSchema;
this.filterExpressions = filters != null ? filters : Collections.emptyList();
this.localityPreferred = Spark3Util.isLocalityEnabled(table.io(), table.location(), options);
this.batchSize = Spark3Util.batchSize(table.properties(), options);
this.options = options;

RuntimeConfig sessionConf = SparkSession.active().conf();
Expand Down Expand Up @@ -180,7 +178,9 @@ public PartitionReaderFactory createReaderFactory() {
boolean readUsingBatch = batchReadsEnabled && hasNoDeleteFiles && (allOrcFileScanTasks ||
(allParquetFileScanTasks && atLeastOneColumn && onlyPrimitives));

return new ReaderFactory(readUsingBatch ? batchSize : 0);
int batchSize = readUsingBatch ? batchSize(allParquetFileScanTasks, allOrcFileScanTasks) : 0;

return new ReaderFactory(batchSize);
}

private boolean batchReadsEnabled(boolean isParquetOnly, boolean isOrcOnly) {
Expand All @@ -195,6 +195,17 @@ private boolean batchReadsEnabled(boolean isParquetOnly, boolean isOrcOnly) {
}
}

private int batchSize(boolean isParquetOnly, boolean isOrcOnly) {
Map<String, String> properties = table.properties();
if (isParquetOnly) {
return Spark3Util.batchSize(FileFormat.PARQUET, properties, options);
} else if (isOrcOnly) {
return Spark3Util.batchSize(FileFormat.ORC, properties, options);
} else {
return 0;
}
}

@Override
public Statistics estimateStatistics() {
// its a fresh table, no data
Expand Down