diff --git a/build.gradle b/build.gradle
index 6bc052885fc..db2aca3a5ee 100644
--- a/build.gradle
+++ b/build.gradle
@@ -878,6 +878,13 @@ project(':iceberg-parquet') {
implementation project(':iceberg-core')
implementation project(':iceberg-common')
+ implementation("org.apache.datafusion:comet-spark-spark${sparkVersionsString}_${scalaVersion}:${libs.versions.comet.get()}") {
+ exclude group: 'org.apache.arrow'
+ exclude group: 'org.apache.parquet'
+ exclude group: 'org.apache.spark'
+ exclude group: 'org.apache.iceberg'
+ }
+
implementation(libs.parquet.avro) {
exclude group: 'org.apache.avro', module: 'avro'
// already shaded by Parquet
diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml
index eeabe54f5f0..867018058ee 100644
--- a/gradle/libs.versions.toml
+++ b/gradle/libs.versions.toml
@@ -37,7 +37,7 @@ awssdk-s3accessgrants = "2.3.0"
bson-ver = "4.11.5"
caffeine = "2.9.3"
calcite = "1.40.0"
-comet = "0.8.1"
+comet = "0.12.0"
datasketches = "6.2.0"
delta-standalone = "3.3.2"
delta-spark = "3.3.2"
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/CometTypeUtils.java b/parquet/src/main/java/org/apache/iceberg/parquet/CometTypeUtils.java
new file mode 100644
index 00000000000..ddf6c7de5ae
--- /dev/null
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/CometTypeUtils.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.parquet;
+
+import java.util.Map;
+import org.apache.comet.parquet.ParquetColumnSpec;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
+import org.apache.parquet.schema.Types;
+
+public class CometTypeUtils {
+
+ private CometTypeUtils() {}
+
+ public static ParquetColumnSpec descriptorToParquetColumnSpec(ColumnDescriptor descriptor) {
+
+ String[] path = descriptor.getPath();
+ PrimitiveType primitiveType = descriptor.getPrimitiveType();
+ String physicalType = primitiveType.getPrimitiveTypeName().name();
+
+ int typeLength =
+ primitiveType.getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY
+ ? primitiveType.getTypeLength()
+ : 0;
+
+ boolean isRepeated = primitiveType.getRepetition() == Type.Repetition.REPEATED;
+
+ // ToDo: extract this into a Util method
+ String logicalTypeName = null;
+ Map<String, String> logicalTypeParams = Maps.newHashMap();
+ LogicalTypeAnnotation logicalType = primitiveType.getLogicalTypeAnnotation();
+
+ if (logicalType != null) {
+ logicalTypeName = logicalType.getClass().getSimpleName();
+
+ // Handle specific logical types
+ if (logicalType instanceof LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) {
+ LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimal =
+ (LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) logicalType;
+ logicalTypeParams.put("precision", String.valueOf(decimal.getPrecision()));
+ logicalTypeParams.put("scale", String.valueOf(decimal.getScale()));
+ } else if (logicalType instanceof LogicalTypeAnnotation.TimestampLogicalTypeAnnotation) {
+ LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestamp =
+ (LogicalTypeAnnotation.TimestampLogicalTypeAnnotation) logicalType;
+ logicalTypeParams.put("isAdjustedToUTC", String.valueOf(timestamp.isAdjustedToUTC()));
+ logicalTypeParams.put("unit", timestamp.getUnit().name());
+ } else if (logicalType instanceof LogicalTypeAnnotation.TimeLogicalTypeAnnotation) {
+ LogicalTypeAnnotation.TimeLogicalTypeAnnotation time =
+ (LogicalTypeAnnotation.TimeLogicalTypeAnnotation) logicalType;
+ logicalTypeParams.put("isAdjustedToUTC", String.valueOf(time.isAdjustedToUTC()));
+ logicalTypeParams.put("unit", time.getUnit().name());
+ } else if (logicalType instanceof LogicalTypeAnnotation.IntLogicalTypeAnnotation) {
+ LogicalTypeAnnotation.IntLogicalTypeAnnotation intType =
+ (LogicalTypeAnnotation.IntLogicalTypeAnnotation) logicalType;
+ logicalTypeParams.put("isSigned", String.valueOf(intType.isSigned()));
+ logicalTypeParams.put("bitWidth", String.valueOf(intType.getBitWidth()));
+ }
+ }
+
+ return new ParquetColumnSpec(
+ 1, // ToDo: pass in the correct id
+ path,
+ physicalType,
+ typeLength,
+ isRepeated,
+ descriptor.getMaxDefinitionLevel(),
+ descriptor.getMaxRepetitionLevel(),
+ logicalTypeName,
+ logicalTypeParams);
+ }
+
+ public static ColumnDescriptor buildColumnDescriptor(ParquetColumnSpec columnSpec) {
+ PrimitiveType.PrimitiveTypeName primType =
+ PrimitiveType.PrimitiveTypeName.valueOf(columnSpec.getPhysicalType());
+
+ Type.Repetition repetition;
+ if (columnSpec.getMaxRepetitionLevel() > 0) {
+ repetition = Type.Repetition.REPEATED;
+ } else if (columnSpec.getMaxDefinitionLevel() > 0) {
+ repetition = Type.Repetition.OPTIONAL;
+ } else {
+ repetition = Type.Repetition.REQUIRED;
+ }
+
+ String name = columnSpec.getPath()[columnSpec.getPath().length - 1];
+ // Reconstruct the logical type from parameters
+ LogicalTypeAnnotation logicalType = null;
+ if (columnSpec.getLogicalTypeName() != null) {
+ logicalType =
+ reconstructLogicalType(
+ columnSpec.getLogicalTypeName(), columnSpec.getLogicalTypeParams());
+ }
+
+ PrimitiveType primitiveType;
+ if (primType == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY) {
+ primitiveType =
+ org.apache.parquet.schema.Types.primitive(primType, repetition)
+ .length(columnSpec.getTypeLength())
+ .as(logicalType)
+ .id(columnSpec.getFieldId())
+ .named(name);
+ } else {
+ primitiveType =
+ Types.primitive(primType, repetition)
+ .as(logicalType)
+ .id(columnSpec.getFieldId())
+ .named(name);
+ }
+
+ return new ColumnDescriptor(
+ columnSpec.getPath(),
+ primitiveType,
+ columnSpec.getMaxRepetitionLevel(),
+ columnSpec.getMaxDefinitionLevel());
+ }
+
+ private static LogicalTypeAnnotation reconstructLogicalType(
+ String logicalTypeName, java.util.Map<String, String> params) {
+
+ switch (logicalTypeName) {
+ // MAP
+ case "MapLogicalTypeAnnotation":
+ return LogicalTypeAnnotation.mapType();
+
+ // LIST
+ case "ListLogicalTypeAnnotation":
+ return LogicalTypeAnnotation.listType();
+
+ // STRING
+ case "StringLogicalTypeAnnotation":
+ return LogicalTypeAnnotation.stringType();
+
+ // MAP_KEY_VALUE
+ case "MapKeyValueLogicalTypeAnnotation":
+ return LogicalTypeAnnotation.MapKeyValueTypeAnnotation.getInstance();
+
+ // ENUM
+ case "EnumLogicalTypeAnnotation":
+ return LogicalTypeAnnotation.enumType();
+
+ // DECIMAL
+ case "DecimalLogicalTypeAnnotation":
+ if (!params.containsKey("scale") || !params.containsKey("precision")) {
+ throw new IllegalArgumentException(
+ "Missing required parameters for DecimalLogicalTypeAnnotation: " + params);
+ }
+ int scale = Integer.parseInt(params.get("scale"));
+ int precision = Integer.parseInt(params.get("precision"));
+ return LogicalTypeAnnotation.decimalType(scale, precision);
+
+ // DATE
+ case "DateLogicalTypeAnnotation":
+ return LogicalTypeAnnotation.dateType();
+
+ // TIME
+ case "TimeLogicalTypeAnnotation":
+ if (!params.containsKey("isAdjustedToUTC") || !params.containsKey("unit")) {
+ throw new IllegalArgumentException(
+ "Missing required parameters for TimeLogicalTypeAnnotation: " + params);
+ }
+
+ boolean isUTC = Boolean.parseBoolean(params.get("isAdjustedToUTC"));
+ String timeUnitStr = params.get("unit");
+
+ LogicalTypeAnnotation.TimeUnit timeUnit;
+ switch (timeUnitStr) {
+ case "MILLIS":
+ timeUnit = LogicalTypeAnnotation.TimeUnit.MILLIS;
+ break;
+ case "MICROS":
+ timeUnit = LogicalTypeAnnotation.TimeUnit.MICROS;
+ break;
+ case "NANOS":
+ timeUnit = LogicalTypeAnnotation.TimeUnit.NANOS;
+ break;
+ default:
+ throw new IllegalArgumentException("Unknown time unit: " + timeUnitStr);
+ }
+ return LogicalTypeAnnotation.timeType(isUTC, timeUnit);
+
+ // TIMESTAMP
+ case "TimestampLogicalTypeAnnotation":
+ if (!params.containsKey("isAdjustedToUTC") || !params.containsKey("unit")) {
+ throw new IllegalArgumentException(
+ "Missing required parameters for TimestampLogicalTypeAnnotation: " + params);
+ }
+ boolean isAdjustedToUTC = Boolean.parseBoolean(params.get("isAdjustedToUTC"));
+ String unitStr = params.get("unit");
+
+ LogicalTypeAnnotation.TimeUnit unit;
+ switch (unitStr) {
+ case "MILLIS":
+ unit = LogicalTypeAnnotation.TimeUnit.MILLIS;
+ break;
+ case "MICROS":
+ unit = LogicalTypeAnnotation.TimeUnit.MICROS;
+ break;
+ case "NANOS":
+ unit = LogicalTypeAnnotation.TimeUnit.NANOS;
+ break;
+ default:
+ throw new IllegalArgumentException("Unknown timestamp unit: " + unitStr);
+ }
+ return LogicalTypeAnnotation.timestampType(isAdjustedToUTC, unit);
+
+ // INTEGER
+ case "IntLogicalTypeAnnotation":
+ if (!params.containsKey("isSigned") || !params.containsKey("bitWidth")) {
+ throw new IllegalArgumentException(
+ "Missing required parameters for IntLogicalTypeAnnotation: " + params);
+ }
+ boolean isSigned = Boolean.parseBoolean(params.get("isSigned"));
+ int bitWidth = Integer.parseInt(params.get("bitWidth"));
+ return LogicalTypeAnnotation.intType(bitWidth, isSigned);
+
+ // JSON
+ case "JsonLogicalTypeAnnotation":
+ return LogicalTypeAnnotation.jsonType();
+
+ // BSON
+ case "BsonLogicalTypeAnnotation":
+ return LogicalTypeAnnotation.bsonType();
+
+ // UUID
+ case "UUIDLogicalTypeAnnotation":
+ return LogicalTypeAnnotation.uuidType();
+
+ // INTERVAL
+ case "IntervalLogicalTypeAnnotation":
+ return LogicalTypeAnnotation.IntervalLogicalTypeAnnotation.getInstance();
+
+ default:
+ throw new IllegalArgumentException("Unknown logical type: " + logicalTypeName);
+ }
+ }
+}
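The two helpers above are meant to round-trip between Parquet's ColumnDescriptor and Comet's ParquetColumnSpec. A minimal sketch of that round trip, assuming an INT32 column "price" annotated as DECIMAL(9, 2); the example class and column are illustrative only and not part of the patch (note that descriptorToParquetColumnSpec currently hardcodes the field id to 1, per the ToDo above):

import org.apache.comet.parquet.ParquetColumnSpec;
import org.apache.iceberg.parquet.CometTypeUtils;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;
import org.apache.parquet.schema.Types;

public class CometTypeUtilsRoundTripExample {
  public static void main(String[] args) {
    // A required INT32 column "price" annotated as DECIMAL(precision = 9, scale = 2).
    PrimitiveType price =
        Types.primitive(PrimitiveType.PrimitiveTypeName.INT32, Type.Repetition.REQUIRED)
            .as(LogicalTypeAnnotation.decimalType(2 /* scale */, 9 /* precision */))
            .id(1)
            .named("price");
    ColumnDescriptor descriptor =
        new ColumnDescriptor(new String[] {"price"}, price, 0 /* maxRep */, 0 /* maxDef */);

    // Descriptor -> Comet spec: physical type, levels, and the decimal logical type
    // (precision/scale) are carried through the spec's string parameters.
    ParquetColumnSpec spec = CometTypeUtils.descriptorToParquetColumnSpec(descriptor);

    // Spec -> descriptor: repetition is re-derived from the max levels and the
    // DecimalLogicalTypeAnnotation is reconstructed from the parameters.
    ColumnDescriptor rebuilt = CometTypeUtils.buildColumnDescriptor(spec);
    System.out.println(rebuilt.getPrimitiveType());
  }
}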
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/CometVectorizedParquetReader.java b/parquet/src/main/java/org/apache/iceberg/parquet/CometVectorizedParquetReader.java
new file mode 100644
index 00000000000..a3cba401827
--- /dev/null
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/CometVectorizedParquetReader.java
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.parquet;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.function.Function;
+import org.apache.comet.parquet.FileReader;
+import org.apache.comet.parquet.ParquetColumnSpec;
+import org.apache.comet.parquet.ReadOptions;
+import org.apache.comet.parquet.RowGroupReader;
+import org.apache.comet.parquet.WrappedInputFile;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.CloseableGroup;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.mapping.NameMapping;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.ByteBuffers;
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.schema.MessageType;
+
+public class CometVectorizedParquetReader<T> extends CloseableGroup
+ implements CloseableIterable<T> {
+ private final InputFile input;
+ private final ParquetReadOptions options;
+ private final Schema expectedSchema;
+ private final Function<MessageType, VectorizedReader<?>> batchReaderFunc;
+ private final Expression filter;
+ private final boolean reuseContainers;
+ private final boolean caseSensitive;
+ private final int batchSize;
+ private final NameMapping nameMapping;
+ private final Map<String, String> properties;
+ private Long start = null;
+ private Long length = null;
+ private ByteBuffer fileEncryptionKey = null;
+ private ByteBuffer fileAADPrefix = null;
+
+ public CometVectorizedParquetReader(
+ InputFile input,
+ Schema expectedSchema,
+ ParquetReadOptions options,
+ Function<MessageType, VectorizedReader<?>> readerFunc,
+ NameMapping nameMapping,
+ Expression filter,
+ boolean reuseContainers,
+ boolean caseSensitive,
+ int maxRecordsPerBatch,
+ Map<String, String> properties,
+ Long start,
+ Long length,
+ ByteBuffer fileEncryptionKey,
+ ByteBuffer fileAADPrefix) {
+ this.input = input;
+ this.expectedSchema = expectedSchema;
+ this.options = options;
+ this.batchReaderFunc = readerFunc;
+ // replace alwaysTrue with null to avoid extra work evaluating a trivial filter
+ this.filter = filter == Expressions.alwaysTrue() ? null : filter;
+ this.reuseContainers = reuseContainers;
+ this.caseSensitive = caseSensitive;
+ this.batchSize = maxRecordsPerBatch;
+ this.nameMapping = nameMapping;
+ this.properties = properties;
+ this.start = start;
+ this.length = length;
+ this.fileEncryptionKey = fileEncryptionKey;
+ this.fileAADPrefix = fileAADPrefix;
+ }
+
+ private ReadConf conf = null;
+
+ private ReadConf init() {
+ if (conf == null) {
+ ReadConf readConf =
+ new ReadConf(
+ input,
+ options,
+ expectedSchema,
+ filter,
+ null,
+ batchReaderFunc,
+ nameMapping,
+ reuseContainers,
+ caseSensitive,
+ batchSize);
+ this.conf = readConf.copy();
+ return readConf;
+ }
+ return conf;
+ }
+
+ @Override
+ public CloseableIterator<T> iterator() {
+ FileIterator<T> iter =
+ new FileIterator<>(init(), properties, start, length, fileEncryptionKey, fileAADPrefix);
+ addCloseable(iter);
+ return iter;
+ }
+
+ private static class FileIterator<T> implements CloseableIterator<T> {
+ // private final ParquetFileReader reader;
+ private final boolean[] shouldSkip;
+ private final VectorizedReader<T> model;
+ private final long totalValues;
+ private final int batchSize;
+ private final List<Map<ColumnPath, ColumnChunkMetaData>> columnChunkMetadata;
+ private final boolean reuseContainers;
+ private int nextRowGroup = 0;
+ private long nextRowGroupStart = 0;
+ private long valuesRead = 0;
+ private T last = null;
+ private final FileReader cometReader;
+ private ReadConf conf;
+
+ FileIterator(
+ ReadConf conf,
+ Map<String, String> properties,
+ Long start,
+ Long length,
+ ByteBuffer fileEncryptionKey,
+ ByteBuffer fileAADPrefix) {
+ this.shouldSkip = conf.shouldSkip();
+ this.totalValues = conf.totalValues();
+ this.reuseContainers = conf.reuseContainers();
+ this.model = conf.vectorizedModel();
+ this.batchSize = conf.batchSize();
+ this.model.setBatchSize(this.batchSize);
+ this.columnChunkMetadata = conf.columnChunkMetadataForRowGroups();
+ this.cometReader =
+ newCometReader(
+ conf.file(),
+ conf.projection(),
+ properties,
+ start,
+ length,
+ fileEncryptionKey,
+ fileAADPrefix);
+ this.conf = conf;
+ }
+
+ private FileReader newCometReader(
+ InputFile file,
+ MessageType projection,
+ Map<String, String> properties,
+ Long start,
+ Long length,
+ ByteBuffer fileEncryptionKey,
+ ByteBuffer fileAADPrefix) {
+ try {
+ ReadOptions cometOptions = ReadOptions.builder(new Configuration()).build();
+
+ FileReader fileReader =
+ new FileReader(
+ new WrappedInputFile(file),
+ cometOptions,
+ properties,
+ start,
+ length,
+ ByteBuffers.toByteArray(fileEncryptionKey),
+ ByteBuffers.toByteArray(fileAADPrefix));
+
+ List<ColumnDescriptor> columnDescriptors = projection.getColumns();
+
+ List<ParquetColumnSpec> specs = Lists.newArrayList();
+
+ for (ColumnDescriptor descriptor : columnDescriptors) {
+ ParquetColumnSpec spec = CometTypeUtils.descriptorToParquetColumnSpec(descriptor);
+ specs.add(spec);
+ }
+
+ fileReader.setRequestedSchemaFromSpecs(specs);
+ return fileReader;
+ } catch (IOException e) {
+ throw new UncheckedIOException("Failed to open Parquet file: " + file.location(), e);
+ }
+ }
+
+ @Override
+ public boolean hasNext() {
+ return valuesRead < totalValues;
+ }
+
+ @Override
+ public T next() {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+ if (valuesRead >= nextRowGroupStart) {
+ advance();
+ }
+
+ // batchSize is an integer, so casting to integer is safe
+ int numValuesToRead = (int) Math.min(nextRowGroupStart - valuesRead, batchSize);
+ if (reuseContainers) {
+ this.last = model.read(last, numValuesToRead);
+ } else {
+ this.last = model.read(null, numValuesToRead);
+ }
+ valuesRead += numValuesToRead;
+
+ return last;
+ }
+
+ private void advance() {
+ while (shouldSkip[nextRowGroup]) {
+ nextRowGroup += 1;
+ cometReader.skipNextRowGroup();
+ }
+ RowGroupReader pages;
+ try {
+ pages = cometReader.readNextRowGroup();
+ } catch (IOException e) {
+ throw new RuntimeIOException(e);
+ }
+
+ model.setRowGroupInfo(pages, columnChunkMetadata.get(nextRowGroup));
+ nextRowGroupStart += pages.getRowCount();
+ nextRowGroup += 1;
+ }
+
+ @Override
+ public void close() throws IOException {
+ model.close();
+ cometReader.close();
+ if (conf != null && conf.reader() != null) {
+ conf.reader().close();
+ }
+ }
+ }
+}
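In normal use this reader is constructed by Parquet.ReadBuilder#build() (see the Parquet.java change below). Purely to illustrate the constructor signature, a hedged sketch that opens and drains the reader; the wrapper class and all argument values are caller-supplied assumptions, not part of the patch:

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Map;
import java.util.function.Function;
import org.apache.iceberg.Schema;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.parquet.CometVectorizedParquetReader;
import org.apache.iceberg.parquet.VectorizedReader;
import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.schema.MessageType;

class CometReaderSketch {
  static <T> void drain(
      InputFile input,
      Schema expectedSchema,
      ParquetReadOptions options,
      Function<MessageType, VectorizedReader<?>> batchReaderFunc,
      Map<String, String> properties) {
    CloseableIterable<T> batches =
        new CometVectorizedParquetReader<>(
            input,
            expectedSchema,
            options,
            batchReaderFunc,
            null /* nameMapping */,
            Expressions.alwaysTrue(), // trivial filter; replaced with null internally
            true /* reuseContainers */,
            true /* caseSensitive */,
            4096 /* maxRecordsPerBatch */,
            properties,
            null /* start: read the whole file */,
            null /* length */,
            null /* fileEncryptionKey */,
            null /* fileAADPrefix */);
    try (CloseableIterable<T> closing = batches) {
      for (T batch : closing) {
        // consume each batch (e.g. a Spark ColumnarBatch in the Spark integration)
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}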
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
index 6f68fbe150f..b740543f3c9 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
@@ -1161,6 +1161,7 @@ public class Parquet {
private NameMapping nameMapping = null;
private ByteBuffer fileEncryptionKey = null;
private ByteBuffer fileAADPrefix = null;
+ private boolean isComet;
private ReadBuilder(InputFile file) {
this.file = file;
@@ -1205,6 +1206,11 @@ public class Parquet {
return this;
}
+ public ReadBuilder enableComet(boolean enableComet) {
+ this.isComet = enableComet;
+ return this;
+ }
+
/**
* @deprecated will be removed in 2.0.0; use {@link #createReaderFunc(Function)} instead
*/
@@ -1300,7 +1306,7 @@ public class Parquet {
}
@Override
- @SuppressWarnings({"unchecked", "checkstyle:CyclomaticComplexity"})
+ @SuppressWarnings({"unchecked", "checkstyle:CyclomaticComplexity", "MethodLength"})
public <D> CloseableIterable<D> build() {
FileDecryptionProperties fileDecryptionProperties = null;
if (fileEncryptionKey != null) {
@@ -1352,16 +1358,35 @@ public class Parquet {
}
if (batchedReaderFunc != null) {
- return new VectorizedParquetReader<>(
- file,
- schema,
- options,
- batchedReaderFunc,
- mapping,
- filter,
- reuseContainers,
- caseSensitive,
- maxRecordsPerBatch);
+ if (isComet) {
+ LOG.info("Comet enabled");
+ return new CometVectorizedParquetReader<>(
+ file,
+ schema,
+ options,
+ batchedReaderFunc,
+ mapping,
+ filter,
+ reuseContainers,
+ caseSensitive,
+ maxRecordsPerBatch,
+ properties,
+ start,
+ length,
+ fileEncryptionKey,
+ fileAADPrefix);
+ } else {
+ return new VectorizedParquetReader<>(
+ file,
+ schema,
+ options,
+ batchedReaderFunc,
+ mapping,
+ filter,
+ reuseContainers,
+ caseSensitive,
+ maxRecordsPerBatch);
+ }
} else {
Function<MessageType, ParquetValueReader<?>> readBuilder =
readerFuncWithSchema != null
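A hedged caller-side sketch of the new flag: with enableComet(true), build() returns the CometVectorizedParquetReader added above instead of VectorizedParquetReader. The wrapper class and helper name are assumptions for illustration; in the Spark module the flag is driven by ParquetReaderType.COMET via BaseBatchReader (see below):

import java.util.function.Function;
import org.apache.iceberg.Schema;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.parquet.VectorizedReader;
import org.apache.parquet.schema.MessageType;

class CometReadBuilderSketch {
  static <D> CloseableIterable<D> openBatchRead(
      InputFile file,
      Schema expectedSchema,
      Function<MessageType, VectorizedReader<?>> cometBatchReaderFunc,
      boolean cometEnabled) {
    return Parquet.read(file)
        .project(expectedSchema)
        .createBatchedReaderFunc(cometBatchReaderFunc)
        .reuseContainers()
        .enableComet(cometEnabled) // new: selects CometVectorizedParquetReader in build()
        .build();
  }
}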
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ReadConf.java b/parquet/src/main/java/org/apache/iceberg/parquet/ReadConf.java
index 1fb2372ba56..142e5fbadf1 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ReadConf.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ReadConf.java
@@ -157,6 +157,14 @@ class ReadConf<T> {
return newReader;
}
+ InputFile file() {
+ return file;
+ }
+
+ MessageType projection() {
+ return projection;
+ }
+
ParquetValueReader<T> model() {
return model;
}
diff --git a/spark/v3.5/build.gradle b/spark/v3.5/build.gradle
index 69700d84366..49ea338a458 100644
--- a/spark/v3.5/build.gradle
+++ b/spark/v3.5/build.gradle
@@ -264,6 +264,7 @@ project(":iceberg-spark:iceberg-spark-runtime-${sparkMajorVersion}_${scalaVersio
integrationImplementation project(path: ':iceberg-hive-metastore', configuration: 'testArtifacts')
integrationImplementation project(path: ":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}", configuration: 'testArtifacts')
integrationImplementation project(path: ":iceberg-spark:iceberg-spark-extensions-${sparkMajorVersion}_${scalaVersion}", configuration: 'testArtifacts')
+ integrationImplementation "org.apache.datafusion:comet-spark-spark${sparkMajorVersion}_${scalaVersion}:${libs.versions.comet.get()}"
// runtime dependencies for running Hive Catalog based integration test
integrationRuntimeOnly project(':iceberg-hive-metastore')
diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/ExtensionsTestBase.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/ExtensionsTestBase.java
index 4c1a5095916..964f196daad 100644
--- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/ExtensionsTestBase.java
+++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/ExtensionsTestBase.java
@@ -59,6 +59,16 @@ public abstract class ExtensionsTestBase extends CatalogTestBase {
.config("spark.sql.legacy.respectNullabilityInTextDatasetConversion", "true")
.config(
SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), String.valueOf(RANDOM.nextBoolean()))
+ .config("spark.plugins", "org.apache.spark.CometPlugin")
+ .config(
+ "spark.shuffle.manager",
+ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
+ .config("spark.comet.explainFallback.enabled", "true")
+ .config("spark.sql.iceberg.parquet.reader-type", "COMET")
+ .config("spark.memory.offHeap.enabled", "true")
+ .config("spark.memory.offHeap.size", "10g")
+ .config("spark.comet.use.lazyMaterialization", "false")
+ .config("spark.comet.schemaEvolution.enabled", "true")
.enableHiveSupport()
.getOrCreate();
diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCallStatementParser.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCallStatementParser.java
index ecf9e6f8a59..0f8cced69aa 100644
--- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCallStatementParser.java
+++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCallStatementParser.java
@@ -56,6 +56,16 @@ public class TestCallStatementParser {
.master("local[2]")
.config("spark.sql.extensions", IcebergSparkSessionExtensions.class.getName())
.config("spark.extra.prop", "value")
+ .config("spark.plugins", "org.apache.spark.CometPlugin")
+ .config(
+ "spark.shuffle.manager",
+ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
+ .config("spark.comet.explainFallback.enabled", "true")
+ .config("spark.sql.iceberg.parquet.reader-type", "COMET")
+ .config("spark.memory.offHeap.enabled", "true")
+ .config("spark.memory.offHeap.size", "10g")
+ .config("spark.comet.use.lazyMaterialization", "false")
+ .config("spark.comet.schemaEvolution.enabled", "true")
.getOrCreate();
TestCallStatementParser.parser = spark.sessionState().sqlParser();
}
diff --git a/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/action/DeleteOrphanFilesBenchmark.java b/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/action/DeleteOrphanFilesBenchmark.java
index 64edb1002e9..5bb449f1ac7 100644
--- a/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/action/DeleteOrphanFilesBenchmark.java
+++ b/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/action/DeleteOrphanFilesBenchmark.java
@@ -179,6 +179,16 @@ public class DeleteOrphanFilesBenchmark {
.config("spark.sql.catalog.spark_catalog", SparkSessionCatalog.class.getName())
.config("spark.sql.catalog.spark_catalog.type", "hadoop")
.config("spark.sql.catalog.spark_catalog.warehouse", catalogWarehouse())
+ .config("spark.plugins", "org.apache.spark.CometPlugin")
+ .config(
+ "spark.shuffle.manager",
+ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
+ .config("spark.comet.explainFallback.enabled", "true")
+ .config("spark.sql.iceberg.parquet.reader-type", "COMET")
+ .config("spark.memory.offHeap.enabled", "true")
+ .config("spark.memory.offHeap.size", "10g")
+ .config("spark.comet.use.lazyMaterialization", "false")
+ .config("spark.comet.schemaEvolution.enabled", "true")
.master("local");
spark = builder.getOrCreate();
}
diff --git a/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/action/IcebergSortCompactionBenchmark.java b/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/action/IcebergSortCompactionBenchmark.java
index 77b79384a6d..08f7de1c0de 100644
--- a/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/action/IcebergSortCompactionBenchmark.java
+++ b/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/action/IcebergSortCompactionBenchmark.java
@@ -392,6 +392,16 @@ public class IcebergSortCompactionBenchmark {
"spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog")
.config("spark.sql.catalog.spark_catalog.type", "hadoop")
.config("spark.sql.catalog.spark_catalog.warehouse", getCatalogWarehouse())
+ .config("spark.plugins", "org.apache.spark.CometPlugin")
+ .config(
+ "spark.shuffle.manager",
+ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
+ .config("spark.comet.explainFallback.enabled", "true")
+ .config("spark.sql.iceberg.parquet.reader-type", "COMET")
+ .config("spark.memory.offHeap.enabled", "true")
+ .config("spark.memory.offHeap.size", "10g")
+ .config("spark.comet.use.lazyMaterialization", "false")
+ .config("spark.comet.schemaEvolution.enabled", "true")
.master("local[*]");
spark = builder.getOrCreate();
Configuration sparkHadoopConf = spark.sessionState().newHadoopConf();
diff --git a/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/DVReaderBenchmark.java b/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/DVReaderBenchmark.java
index c6794e43c63..f7359197407 100644
--- a/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/DVReaderBenchmark.java
+++ b/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/DVReaderBenchmark.java
@@ -239,6 +239,16 @@ public class DVReaderBenchmark {
.config("spark.sql.catalog.spark_catalog", SparkSessionCatalog.class.getName())
.config("spark.sql.catalog.spark_catalog.type", "hadoop")
.config("spark.sql.catalog.spark_catalog.warehouse", newWarehouseDir())
+ .config("spark.plugins", "org.apache.spark.CometPlugin")
+ .config(
+ "spark.shuffle.manager",
+ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
+ .config("spark.comet.explainFallback.enabled", "true")
+ .config("spark.sql.iceberg.parquet.reader-type", "COMET")
+ .config("spark.memory.offHeap.enabled", "true")
+ .config("spark.memory.offHeap.size", "10g")
+ .config("spark.comet.use.lazyMaterialization", "false")
+ .config("spark.comet.schemaEvolution.enabled", "true")
.master("local[*]")
.getOrCreate();
}
diff --git a/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/DVWriterBenchmark.java b/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/DVWriterBenchmark.java
index ac74fb5a109..e011b8b2510 100644
--- a/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/DVWriterBenchmark.java
+++ b/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/DVWriterBenchmark.java
@@ -223,6 +223,16 @@ public class DVWriterBenchmark {
.config("spark.sql.catalog.spark_catalog", SparkSessionCatalog.class.getName())
.config("spark.sql.catalog.spark_catalog.type", "hadoop")
.config("spark.sql.catalog.spark_catalog.warehouse", newWarehouseDir())
+ .config("spark.plugins", "org.apache.spark.CometPlugin")
+ .config(
+ "spark.shuffle.manager",
+ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
+ .config("spark.comet.explainFallback.enabled", "true")
+ .config("spark.sql.iceberg.parquet.reader-type", "COMET")
+ .config("spark.memory.offHeap.enabled", "true")
+ .config("spark.memory.offHeap.size", "10g")
+ .config("spark.comet.use.lazyMaterialization", "false")
+ .config("spark.comet.schemaEvolution.enabled", "true")
.master("local[*]")
.getOrCreate();
}
diff --git a/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceBenchmark.java b/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceBenchmark.java
index 68c537e34a4..f66be2f3896 100644
--- a/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceBenchmark.java
+++ b/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceBenchmark.java
@@ -94,7 +94,19 @@ public abstract class IcebergSourceBenchmark {
}
protected void setupSpark(boolean enableDictionaryEncoding) {
- SparkSession.Builder builder = SparkSession.builder().config("spark.ui.enabled", false);
+ SparkSession.Builder builder =
+ SparkSession.builder()
+ .config("spark.ui.enabled", false)
+ .config("spark.plugins", "org.apache.spark.CometPlugin")
+ .config(
+ "spark.shuffle.manager",
+ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
+ .config("spark.comet.explainFallback.enabled", "true")
+ .config("spark.sql.iceberg.parquet.reader-type", "COMET")
+ .config("spark.memory.offHeap.enabled", "true")
+ .config("spark.memory.offHeap.size", "10g")
+ .config("spark.comet.use.lazyMaterialization", "false")
+ .config("spark.comet.schemaEvolution.enabled", "true");
if (!enableDictionaryEncoding) {
builder
.config("parquet.dictionary.page.size", "1")
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnReader.java
index 81b7d83a707..eba1a2a0fb1 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnReader.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnReader.java
@@ -19,18 +19,22 @@
package org.apache.iceberg.spark.data.vectorized;
import java.io.IOException;
+import org.apache.comet.CometConf;
import org.apache.comet.CometSchemaImporter;
import org.apache.comet.parquet.AbstractColumnReader;
import org.apache.comet.parquet.ColumnReader;
+import org.apache.comet.parquet.ParquetColumnSpec;
+import org.apache.comet.parquet.RowGroupReader;
import org.apache.comet.parquet.TypeUtil;
import org.apache.comet.parquet.Utils;
import org.apache.comet.shaded.arrow.memory.RootAllocator;
+import org.apache.iceberg.parquet.CometTypeUtils;
import org.apache.iceberg.parquet.VectorizedReader;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.types.Types;
import org.apache.parquet.column.ColumnDescriptor;
-import org.apache.parquet.column.page.PageReader;
+import org.apache.spark.sql.internal.SQLConf;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
@@ -42,23 +46,28 @@ class CometColumnReader implements VectorizedReader<ColumnVector> {
private final ColumnDescriptor descriptor;
private final DataType sparkType;
+ private final int fieldId;
// The delegated ColumnReader from Comet side
private AbstractColumnReader delegate;
private boolean initialized = false;
private int batchSize = DEFAULT_BATCH_SIZE;
private CometSchemaImporter importer;
+ private ParquetColumnSpec spec;
- CometColumnReader(DataType sparkType, ColumnDescriptor descriptor) {
+ CometColumnReader(DataType sparkType, ColumnDescriptor descriptor, int fieldId) {
this.sparkType = sparkType;
this.descriptor = descriptor;
+ this.fieldId = fieldId;
}
CometColumnReader(Types.NestedField field) {
DataType dataType = SparkSchemaUtil.convert(field.type());
StructField structField = new StructField(field.name(), dataType, false, Metadata.empty());
this.sparkType = dataType;
- this.descriptor = TypeUtil.convertToParquet(structField);
+ this.descriptor =
+ CometTypeUtils.buildColumnDescriptor(TypeUtil.convertToParquetSpec(structField));
+ this.fieldId = field.fieldId();
}
public AbstractColumnReader delegate() {
@@ -92,7 +101,26 @@ class CometColumnReader implements VectorizedReader<ColumnVector> {
}
this.importer = new CometSchemaImporter(new RootAllocator());
- this.delegate = Utils.getColumnReader(sparkType, descriptor, importer, batchSize, false, false);
+
+ spec = CometTypeUtils.descriptorToParquetColumnSpec(descriptor);
+
+ boolean useLegacyTime =
+ Boolean.parseBoolean(
+ SQLConf.get()
+ .getConfString(
+ CometConf.COMET_EXCEPTION_ON_LEGACY_DATE_TIMESTAMP().key(), "false"));
+ boolean useLazyMaterialization =
+ Boolean.parseBoolean(
+ SQLConf.get().getConfString(CometConf.COMET_USE_LAZY_MATERIALIZATION().key(), "false"));
+ this.delegate =
+ Utils.getColumnReader(
+ sparkType,
+ spec,
+ importer,
+ batchSize,
+ true, // Comet sets this to true for native execution
+ useLazyMaterialization,
+ useLegacyTime);
this.initialized = true;
}
@@ -111,9 +139,9 @@ class CometColumnReader implements VectorizedReader<ColumnVector> {
* <p>NOTE: this should be called before reading a new Parquet column chunk, and after {@link
* CometColumnReader#reset} is called.
*/
- public void setPageReader(PageReader pageReader) throws IOException {
+ public void setPageReader(RowGroupReader pageStore) throws IOException {
Preconditions.checkState(initialized, "Invalid state: 'reset' should be called first");
- ((ColumnReader) delegate).setPageReader(pageReader);
+ ((ColumnReader) delegate).setRowGroupReader(pageStore, spec);
}
@Override
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnarBatchReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnarBatchReader.java
index 04ac69476ad..916face2bf2 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnarBatchReader.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnarBatchReader.java
@@ -22,8 +22,12 @@ import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.Map;
+import org.apache.comet.CometRuntimeException;
import org.apache.comet.parquet.AbstractColumnReader;
-import org.apache.comet.parquet.BatchReader;
+import org.apache.comet.parquet.IcebergCometBatchReader;
+import org.apache.comet.parquet.RowGroupReader;
+import org.apache.comet.vector.CometSelectionVector;
+import org.apache.comet.vector.CometVector;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.DeleteFilter;
import org.apache.iceberg.parquet.VectorizedReader;
@@ -55,7 +59,7 @@ class CometColumnarBatchReader implements VectorizedReader<ColumnarBatch> {
// calling BatchReader.nextBatch, the isDeleted value is not yet available, so
// DeleteColumnReader.readBatch must be called explicitly later, after the isDeleted value is
// available.
- private final BatchReader delegate;
+ private final IcebergCometBatchReader delegate;
private DeleteFilter<InternalRow> deletes = null;
private long rowStartPosInBatch = 0;
@@ -65,9 +69,7 @@ class CometColumnarBatchReader implements VectorizedReader<ColumnarBatch> {
this.hasIsDeletedColumn =
readers.stream().anyMatch(reader -> reader instanceof CometDeleteColumnReader);
- AbstractColumnReader[] abstractColumnReaders = new AbstractColumnReader[readers.size()];
- this.delegate = new BatchReader(abstractColumnReaders);
- delegate.setSparkSchema(SparkSchemaUtil.convert(schema));
+ this.delegate = new IcebergCometBatchReader(readers.size(), SparkSchemaUtil.convert(schema));
}
@Override
@@ -79,19 +81,22 @@ class CometColumnarBatchReader implements VectorizedReader<ColumnarBatch> {
&& !(readers[i] instanceof CometPositionColumnReader)
&& !(readers[i] instanceof CometDeleteColumnReader)) {
readers[i].reset();
- readers[i].setPageReader(pageStore.getPageReader(readers[i].descriptor()));
+ readers[i].setPageReader((RowGroupReader) pageStore);
}
} catch (IOException e) {
throw new UncheckedIOException("Failed to setRowGroupInfo for Comet vectorization", e);
}
}
+ AbstractColumnReader[] delegateReaders = new AbstractColumnReader[readers.length];
for (int i = 0; i < readers.length; i++) {
- delegate.getColumnReaders()[i] = this.readers[i].delegate();
+ delegateReaders[i] = readers[i].delegate();
}
+ delegate.init(delegateReaders);
+
this.rowStartPosInBatch =
- pageStore
+ ((RowGroupReader) pageStore)
.getRowIndexOffset()
.orElseThrow(
() ->
@@ -148,9 +153,17 @@ class CometColumnarBatchReader implements VectorizedReader<ColumnarBatch> {
Pair<int[], Integer> pair = buildRowIdMapping(vectors);
if (pair != null) {
int[] rowIdMapping = pair.first();
- numLiveRows = pair.second();
- for (int i = 0; i < vectors.length; i++) {
- vectors[i] = new ColumnVectorWithFilter(vectors[i], rowIdMapping);
+ if (pair.second() != null) {
+ numLiveRows = pair.second();
+ for (int i = 0; i < vectors.length; i++) {
+ if (vectors[i] instanceof CometVector) {
+ vectors[i] =
+ new CometSelectionVector((CometVector) vectors[i], rowIdMapping, numLiveRows);
+ } else {
+ throw new CometRuntimeException(
+ "Unsupported column vector type: " + vectors[i].getClass());
+ }
+ }
}
}
}
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometConstantColumnReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometConstantColumnReader.java
index 047c96314b1..88d691a607a 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometConstantColumnReader.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometConstantColumnReader.java
@@ -21,6 +21,7 @@ package org.apache.iceberg.spark.data.vectorized;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import org.apache.comet.parquet.ConstantColumnReader;
+import org.apache.iceberg.parquet.CometTypeUtils;
import org.apache.iceberg.types.Types;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
@@ -34,7 +35,11 @@ class CometConstantColumnReader<T> extends CometColumnReader {
super(field);
// use delegate to set constant value on the native side to be consumed by native execution.
setDelegate(
- new ConstantColumnReader(sparkType(), descriptor(), convertToSparkValue(value), false));
+ new ConstantColumnReader(
+ sparkType(),
+ CometTypeUtils.descriptorToParquetColumnSpec(descriptor()),
+ convertToSparkValue(value),
+ false));
}
@Override
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometDeleteColumnReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometDeleteColumnReader.java
index 6235bfe4865..cba108e4326 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometDeleteColumnReader.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometDeleteColumnReader.java
@@ -51,10 +51,10 @@ class CometDeleteColumnReader<T> extends CometColumnReader {
DeleteColumnReader() {
super(
DataTypes.BooleanType,
- TypeUtil.convertToParquet(
+ TypeUtil.convertToParquetSpec(
new StructField("_deleted", DataTypes.BooleanType, false, Metadata.empty())),
false /* useDecimal128 = false */,
- false /* isConstant = false */);
+ false /* isConstant */);
this.isDeleted = new boolean[0];
}
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometPositionColumnReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometPositionColumnReader.java
index bcc0e514c28..98e80068c51 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometPositionColumnReader.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometPositionColumnReader.java
@@ -20,6 +20,7 @@ package org.apache.iceberg.spark.data.vectorized;
import org.apache.comet.parquet.MetadataColumnReader;
import org.apache.comet.parquet.Native;
+import org.apache.iceberg.parquet.CometTypeUtils;
import org.apache.iceberg.types.Types;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.spark.sql.types.DataTypes;
@@ -44,7 +45,7 @@ class CometPositionColumnReader extends CometColumnReader {
PositionColumnReader(ColumnDescriptor descriptor) {
super(
DataTypes.LongType,
- descriptor,
+ CometTypeUtils.descriptorToParquetColumnSpec(descriptor),
false /* useDecimal128 = false */,
false /* isConstant = false */);
}
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometVectorizedReaderBuilder.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometVectorizedReaderBuilder.java
index d36f1a72747..56f8c9bff93 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometVectorizedReaderBuilder.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometVectorizedReaderBuilder.java
@@ -142,6 +142,7 @@ class CometVectorizedReaderBuilder extends TypeWithSchemaVisitor<VectorizedReade
return null;
}
- return new CometColumnReader(SparkSchemaUtil.convert(icebergField.type()), desc);
+ return new CometColumnReader(
+ SparkSchemaUtil.convert(icebergField.type()), desc, parquetFieldId);
}
}
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java
index a0f45e7610a..473b34bb0f3 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java
@@ -111,6 +111,7 @@ abstract class BaseBatchReader<T extends ScanTask> extends BaseReader<ColumnarBa
// read performance as every batch read doesn't have to pay the cost of allocating memory.
.reuseContainers()
.withNameMapping(nameMapping())
+ .enableComet(parquetConf.readerType() == ParquetReaderType.COMET)
.build();
}
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java
index 0626d0b4398..9ec8f534669 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java
@@ -175,11 +175,11 @@ class SparkBatch implements Batch {
return field.type().isPrimitiveType() || MetadataColumns.isMetadataColumn(field.fieldId());
}
- private boolean useCometBatchReads() {
+ protected boolean useCometBatchReads() {
return readConf.parquetVectorizationEnabled()
&& readConf.parquetReaderType() == ParquetReaderType.COMET
&& expectedSchema.columns().stream().allMatch(this::supportsCometBatchReads)
- && taskGroups.stream().allMatch(this::supportsParquetBatchReads);
+ && taskGroups.stream().allMatch(this::supportsCometBatchReads);
}
private boolean supportsCometBatchReads(Types.NestedField field) {
@@ -189,6 +189,21 @@ class SparkBatch implements Batch {
&& field.fieldId() != MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.fieldId();
}
+ private boolean supportsCometBatchReads(ScanTask task) {
+ if (task instanceof ScanTaskGroup) {
+ ScanTaskGroup<?> taskGroup = (ScanTaskGroup<?>) task;
+ return taskGroup.tasks().stream().allMatch(this::supportsCometBatchReads);
+
+ } else if (task.isFileScanTask() && !task.isDataTask()) {
+ FileScanTask fileScanTask = task.asFileScanTask();
+ // Comet can't handle delete files for now
+ return fileScanTask.file().format() == FileFormat.PARQUET;
+
+ } else {
+ return false;
+ }
+ }
+
// conditions for using ORC batch reads:
// - ORC vectorization is enabled
// - all tasks are of type FileScanTask and read only ORC files with no delete files
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java
index 106b296de09..967b0d41d08 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java
@@ -24,6 +24,7 @@ import java.util.Map;
import java.util.Optional;
import java.util.function.Supplier;
import java.util.stream.Collectors;
+import org.apache.comet.parquet.SupportsComet;
import org.apache.iceberg.BlobMetadata;
import org.apache.iceberg.ScanTask;
import org.apache.iceberg.ScanTaskGroup;
@@ -95,7 +96,7 @@ import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-abstract class SparkScan implements Scan, SupportsReportStatistics {
+abstract class SparkScan implements Scan, SupportsReportStatistics, SupportsComet {
private static final Logger LOG = LoggerFactory.getLogger(SparkScan.class);
private static final String NDV_KEY = "ndv";
@@ -351,4 +352,10 @@ abstract class SparkScan implements Scan, SupportsReportStatistics {
return splitSize;
}
}
+
+ @Override
+ public boolean isCometEnabled() {
+ SparkBatch batch = (SparkBatch) this.toBatch();
+ return batch.useCometBatchReads();
+ }
}
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/SparkDistributedDataScanTestBase.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/SparkDistributedDataScanTestBase.java
index 404ba728460..26d6f9b613f 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/SparkDistributedDataScanTestBase.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/SparkDistributedDataScanTestBase.java
@@ -90,6 +90,16 @@ public abstract class SparkDistributedDataScanTestBase
.master("local[2]")
.config("spark.serializer", serializer)
.config(SQLConf.SHUFFLE_PARTITIONS().key(), "4")
+ .config("spark.plugins", "org.apache.spark.CometPlugin")
+ .config(
+ "spark.shuffle.manager",
+ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
+ .config("spark.comet.explainFallback.enabled", "true")
+ .config("spark.sql.iceberg.parquet.reader-type", "COMET")
+ .config("spark.memory.offHeap.enabled", "true")
+ .config("spark.memory.offHeap.size", "10g")
+ .config("spark.comet.use.lazyMaterialization", "false")
+ .config("spark.comet.schemaEvolution.enabled", "true")
.getOrCreate();
}
}
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanDeletes.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanDeletes.java
index 659507e4c5e..eb9cedc34c5 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanDeletes.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanDeletes.java
@@ -73,6 +73,16 @@ public class TestSparkDistributedDataScanDeletes
.master("local[2]")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.config(SQLConf.SHUFFLE_PARTITIONS().key(), "4")
+ .config("spark.plugins", "org.apache.spark.CometPlugin")
+ .config(
+ "spark.shuffle.manager",
+ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
+ .config("spark.comet.explainFallback.enabled", "true")
+ .config("spark.sql.iceberg.parquet.reader-type", "COMET")
+ .config("spark.memory.offHeap.enabled", "true")
+ .config("spark.memory.offHeap.size", "10g")
+ .config("spark.comet.use.lazyMaterialization", "false")
+ .config("spark.comet.schemaEvolution.enabled", "true")
.getOrCreate();
}
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanFilterFiles.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanFilterFiles.java
index a218f965ea6..395c02441e7 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanFilterFiles.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanFilterFiles.java
@@ -62,6 +62,16 @@ public class TestSparkDistributedDataScanFilterFiles
.master("local[2]")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.config(SQLConf.SHUFFLE_PARTITIONS().key(), "4")
+ .config("spark.plugins", "org.apache.spark.CometPlugin")
+ .config(
+ "spark.shuffle.manager",
+ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
+ .config("spark.comet.explainFallback.enabled", "true")
+ .config("spark.sql.iceberg.parquet.reader-type", "COMET")
+ .config("spark.memory.offHeap.enabled", "true")
+ .config("spark.memory.offHeap.size", "10g")
+ .config("spark.comet.use.lazyMaterialization", "false")
+ .config("spark.comet.schemaEvolution.enabled", "true")
.getOrCreate();
}
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanReporting.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanReporting.java
index 2665d7ba8d3..306e859ce1a 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanReporting.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanReporting.java
@@ -63,6 +63,16 @@ public class TestSparkDistributedDataScanReporting
.master("local[2]")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.config(SQLConf.SHUFFLE_PARTITIONS().key(), "4")
+ .config("spark.plugins", "org.apache.spark.CometPlugin")
+ .config(
+ "spark.shuffle.manager",
+ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
+ .config("spark.comet.explainFallback.enabled", "true")
+ .config("spark.sql.iceberg.parquet.reader-type", "COMET")
+ .config("spark.memory.offHeap.enabled", "true")
+ .config("spark.memory.offHeap.size", "10g")
+ .config("spark.comet.use.lazyMaterialization", "false")
+ .config("spark.comet.schemaEvolution.enabled", "true")
.getOrCreate();
}
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestBase.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestBase.java
index daf4e29ac07..fc9ee40c502 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestBase.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestBase.java
@@ -79,6 +79,17 @@ public abstract class TestBase extends SparkTestHelperBase {
.config(SQLConf.PARTITION_OVERWRITE_MODE().key(), "dynamic")
.config("spark.hadoop." + METASTOREURIS.varname, hiveConf.get(METASTOREURIS.varname))
.config("spark.sql.legacy.respectNullabilityInTextDatasetConversion", "true")
+ .config("spark.plugins", "org.apache.spark.CometPlugin")
+ .config(
+ "spark.shuffle.manager",
+ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
+ .config("spark.comet.explainFallback.enabled", "true")
+ .config("spark.sql.iceberg.parquet.reader-type", "COMET")
+ .config("spark.memory.offHeap.enabled", "true")
+ .config("spark.memory.offHeap.size", "10g")
+ .config("spark.comet.use.lazyMaterialization", "false")
+ .config("spark.comet.schemaEvolution.enabled", "true")
+ .config("spark.comet.exec.broadcastExchange.enabled", "false")
.enableHiveSupport()
.getOrCreate();
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetDictionaryEncodedVectorizedReads.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetDictionaryEncodedVectorizedReads.java
index 973a17c9a38..dd0fd5cc9aa 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetDictionaryEncodedVectorizedReads.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetDictionaryEncodedVectorizedReads.java
@@ -65,6 +65,16 @@ public class TestParquetDictionaryEncodedVectorizedReads extends TestParquetVect
SparkSession.builder()
.master("local[2]")
.config("spark.driver.host", InetAddress.getLoopbackAddress().getHostAddress())
+ .config("spark.plugins", "org.apache.spark.CometPlugin")
+ .config(
+ "spark.shuffle.manager",
+ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
+ .config("spark.comet.explainFallback.enabled", "true")
+ .config("spark.sql.iceberg.parquet.reader-type", "COMET")
+ .config("spark.memory.offHeap.enabled", "true")
+ .config("spark.memory.offHeap.size", "10g")
+ .config("spark.comet.use.lazyMaterialization", "false")
+ .config("spark.comet.schemaEvolution.enabled", "true")
.getOrCreate();
}
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/ScanTestBase.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/ScanTestBase.java
index 1c5905744a7..6db62e1f90d 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/ScanTestBase.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/ScanTestBase.java
@@ -61,6 +61,16 @@ public abstract class ScanTestBase extends AvroDataTestBase {
ScanTestBase.spark =
SparkSession.builder()
.config("spark.driver.host", InetAddress.getLoopbackAddress().getHostAddress())
+ .config("spark.plugins", "org.apache.spark.CometPlugin")
+ .config(
+ "spark.shuffle.manager",
+ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
+ .config("spark.comet.explainFallback.enabled", "true")
+ .config("spark.sql.iceberg.parquet.reader-type", "COMET")
+ .config("spark.memory.offHeap.enabled", "true")
+ .config("spark.memory.offHeap.size", "10g")
+ .config("spark.comet.use.lazyMaterialization", "false")
+ .config("spark.comet.schemaEvolution.enabled", "true")
.master("local[2]")
.getOrCreate();
ScanTestBase.sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java
index 19ec6d13dd5..bf7c837cf38 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java
@@ -144,7 +144,20 @@ public class TestCompressionSettings extends CatalogTestBase {
@BeforeAll
public static void startSpark() {
- TestCompressionSettings.spark = SparkSession.builder().master("local[2]").getOrCreate();
+ TestCompressionSettings.spark =
+ SparkSession.builder()
+ .master("local[2]")
+ .config("spark.plugins", "org.apache.spark.CometPlugin")
+ .config(
+ "spark.shuffle.manager",
+ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
+ .config("spark.comet.explainFallback.enabled", "true")
+ .config("spark.sql.iceberg.parquet.reader-type", "COMET")
+ .config("spark.memory.offHeap.enabled", "true")
+ .config("spark.memory.offHeap.size", "10g")
+ .config("spark.comet.use.lazyMaterialization", "false")
+ .config("spark.comet.schemaEvolution.enabled", "true")
+ .getOrCreate();
}
@BeforeEach
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
index a7702b169a6..bbb85f7d5c6 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
@@ -74,7 +74,20 @@ public class TestDataSourceOptions extends TestBaseWithCatalog {
@BeforeAll
public static void startSpark() {
- TestDataSourceOptions.spark = SparkSession.builder().master("local[2]").getOrCreate();
+ TestDataSourceOptions.spark =
+ SparkSession.builder()
+ .master("local[2]")
+ .config("spark.plugins", "org.apache.spark.CometPlugin")
+ .config(
+ "spark.shuffle.manager",
+ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
+ .config("spark.comet.explainFallback.enabled", "true")
+ .config("spark.sql.iceberg.parquet.reader-type", "COMET")
+ .config("spark.memory.offHeap.enabled", "true")
+ .config("spark.memory.offHeap.size", "10g")
+ .config("spark.comet.use.lazyMaterialization", "false")
+ .config("spark.comet.schemaEvolution.enabled", "true")
+ .getOrCreate();
}

@AfterAll
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java
index fd7d52178f2..929ebd405c5 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java
@@ -114,6 +114,16 @@ public class TestFilteredScan {
SparkSession.builder()
.master("local[2]")
.config("spark.driver.host", InetAddress.getLoopbackAddress().getHostAddress())
+ .config("spark.plugins", "org.apache.spark.CometPlugin")
+ .config(
+ "spark.shuffle.manager",
+ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
+ .config("spark.comet.explainFallback.enabled", "true")
+ .config("spark.sql.iceberg.parquet.reader-type", "COMET")
+ .config("spark.memory.offHeap.enabled", "true")
+ .config("spark.memory.offHeap.size", "10g")
+ .config("spark.comet.use.lazyMaterialization", "false")
+ .config("spark.comet.schemaEvolution.enabled", "true")
.getOrCreate();
}
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestForwardCompatibility.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestForwardCompatibility.java
index 153564f7d12..761c20f5d80 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestForwardCompatibility.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestForwardCompatibility.java
@@ -98,6 +98,16 @@ public class TestForwardCompatibility {
SparkSession.builder()
.master("local[2]")
.config("spark.driver.host", InetAddress.getLoopbackAddress().getHostAddress())
+ .config("spark.plugins", "org.apache.spark.CometPlugin")
+ .config(
+ "spark.shuffle.manager",
+ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
+ .config("spark.comet.explainFallback.enabled", "true")
+ .config("spark.sql.iceberg.parquet.reader-type", "COMET")
+ .config("spark.memory.offHeap.enabled", "true")
+ .config("spark.memory.offHeap.size", "10g")
+ .config("spark.comet.use.lazyMaterialization", "false")
+ .config("spark.comet.schemaEvolution.enabled", "true")
.getOrCreate();
}
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSpark.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSpark.java
index f4f57157e47..d1a7cc64179 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSpark.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSpark.java
@@ -51,6 +51,16 @@ public class TestIcebergSpark {
SparkSession.builder()
.master("local[2]")
.config("spark.driver.host", InetAddress.getLoopbackAddress().getHostAddress())
+ .config("spark.plugins", "org.apache.spark.CometPlugin")
+ .config(
+ "spark.shuffle.manager",
+ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
+ .config("spark.comet.explainFallback.enabled", "true")
+ .config("spark.sql.iceberg.parquet.reader-type", "COMET")
+ .config("spark.memory.offHeap.enabled", "true")
+ .config("spark.memory.offHeap.size", "10g")
+ .config("spark.comet.use.lazyMaterialization", "false")
+ .config("spark.comet.schemaEvolution.enabled", "true")
.getOrCreate();
}
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionPruning.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionPruning.java
index e1402396fa7..ca4212f52e6 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionPruning.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionPruning.java
@@ -118,6 +118,16 @@ public class TestPartitionPruning {
SparkSession.builder()
.master("local[2]")
.config("spark.driver.host", InetAddress.getLoopbackAddress().getHostAddress())
+ .config("spark.plugins", "org.apache.spark.CometPlugin")
+ .config(
+ "spark.shuffle.manager",
+ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
+ .config("spark.comet.explainFallback.enabled", "true")
+ .config("spark.sql.iceberg.parquet.reader-type", "COMET")
+ .config("spark.memory.offHeap.enabled", "true")
+ .config("spark.memory.offHeap.size", "10g")
+ .config("spark.comet.use.lazyMaterialization", "false")
+ .config("spark.comet.schemaEvolution.enabled", "true")
.getOrCreate();
TestPartitionPruning.sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext());
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionValues.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionValues.java
index 0b6ab2052b6..a8176332fb7 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionValues.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionValues.java
@@ -112,6 +112,16 @@ public class TestPartitionValues {
SparkSession.builder()
.master("local[2]")
.config("spark.driver.host", InetAddress.getLoopbackAddress().getHostAddress())
+ .config("spark.plugins", "org.apache.spark.CometPlugin")
+ .config(
+ "spark.shuffle.manager",
+ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
+ .config("spark.comet.explainFallback.enabled", "true")
+ .config("spark.sql.iceberg.parquet.reader-type", "COMET")
+ .config("spark.memory.offHeap.enabled", "true")
+ .config("spark.memory.offHeap.size", "10g")
+ .config("spark.comet.use.lazyMaterialization", "false")
+ .config("spark.comet.schemaEvolution.enabled", "true")
.getOrCreate();
}
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java
index 11865db7fce..8fe32e8300c 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java
@@ -91,6 +91,16 @@ public class TestSnapshotSelection {
SparkSession.builder()
.master("local[2]")
.config("spark.driver.host", InetAddress.getLoopbackAddress().getHostAddress())
+ .config("spark.plugins", "org.apache.spark.CometPlugin")
+ .config(
+ "spark.shuffle.manager",
+ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
+ .config("spark.comet.explainFallback.enabled", "true")
+ .config("spark.sql.iceberg.parquet.reader-type", "COMET")
+ .config("spark.memory.offHeap.enabled", "true")
+ .config("spark.memory.offHeap.size", "10g")
+ .config("spark.comet.use.lazyMaterialization", "false")
+ .config("spark.comet.schemaEvolution.enabled", "true")
.getOrCreate();
}
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataFile.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataFile.java
index 3051e27d720..6c39f76c286 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataFile.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataFile.java
@@ -125,6 +125,16 @@ public class TestSparkDataFile {
SparkSession.builder()
.master("local[2]")
.config("spark.driver.host", InetAddress.getLoopbackAddress().getHostAddress())
+ .config("spark.plugins", "org.apache.spark.CometPlugin")
+ .config(
+ "spark.shuffle.manager",
+ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
+ .config("spark.comet.explainFallback.enabled", "true")
+ .config("spark.sql.iceberg.parquet.reader-type", "COMET")
+ .config("spark.memory.offHeap.enabled", "true")
+ .config("spark.memory.offHeap.size", "10g")
+ .config("spark.comet.use.lazyMaterialization", "false")
+ .config("spark.comet.schemaEvolution.enabled", "true")
.getOrCreate();
TestSparkDataFile.sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext());
}
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
index 4ccbf86f125..40cff1f69a7 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
@@ -100,6 +100,16 @@ public class TestSparkDataWrite {
SparkSession.builder()
.master("local[2]")
.config("spark.driver.host", InetAddress.getLoopbackAddress().getHostAddress())
+ .config("spark.plugins", "org.apache.spark.CometPlugin")
+ .config(
+ "spark.shuffle.manager",
+ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
+ .config("spark.comet.explainFallback.enabled", "true")
+ .config("spark.sql.iceberg.parquet.reader-type", "COMET")
+ .config("spark.memory.offHeap.enabled", "true")
+ .config("spark.memory.offHeap.size", "10g")
+ .config("spark.comet.use.lazyMaterialization", "false")
+ .config("spark.comet.schemaEvolution.enabled", "true")
.getOrCreate();
}
@@ -144,7 +154,7 @@ public class TestSparkDataWrite {
Dataset<Row> result = spark.read().format("iceberg").load(targetLocation);
List<SimpleRecord> actual =
- result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
+ result.orderBy("id", "data").as(Encoders.bean(SimpleRecord.class)).collectAsList();
assertThat(actual).hasSameSizeAs(expected).isEqualTo(expected);
for (ManifestFile manifest :
SnapshotUtil.latestSnapshot(table, branch).allManifests(table.io())) {
@@ -213,7 +223,7 @@ public class TestSparkDataWrite {
Dataset<Row> result = spark.read().format("iceberg").load(targetLocation);
List<SimpleRecord> actual =
- result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
+ result.orderBy("id", "data").as(Encoders.bean(SimpleRecord.class)).collectAsList();
assertThat(actual).hasSameSizeAs(expected).isEqualTo(expected);
}
@@ -258,7 +268,7 @@ public class TestSparkDataWrite {
Dataset<Row> result = spark.read().format("iceberg").load(targetLocation);
List<SimpleRecord> actual =
- result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
+ result.orderBy("id", "data").as(Encoders.bean(SimpleRecord.class)).collectAsList();
assertThat(actual).hasSameSizeAs(expected).isEqualTo(expected);
}
@@ -310,7 +320,7 @@ public class TestSparkDataWrite {
Dataset<Row> result = spark.read().format("iceberg").load(targetLocation);
List<SimpleRecord> actual =
- result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
+ result.orderBy("id", "data").as(Encoders.bean(SimpleRecord.class)).collectAsList();
assertThat(actual).hasSameSizeAs(expected).isEqualTo(expected);
}
@@ -352,7 +362,7 @@ public class TestSparkDataWrite {
Dataset<Row> result = spark.read().format("iceberg").load(targetLocation);
List<SimpleRecord> actual =
- result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
+ result.orderBy("id", "data").as(Encoders.bean(SimpleRecord.class)).collectAsList();
assertThat(actual).hasSameSizeAs(expected).isEqualTo(expected);
}
@@ -391,7 +401,7 @@ public class TestSparkDataWrite {
Dataset<Row> result = spark.read().format("iceberg").load(targetLocation);
List<SimpleRecord> actual =
- result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
+ result.orderBy("id", "data").as(Encoders.bean(SimpleRecord.class)).collectAsList();
assertThat(actual).hasSameSizeAs(expected).isEqualTo(expected);
List<DataFile> files = Lists.newArrayList();
@@ -459,7 +469,7 @@ public class TestSparkDataWrite {
Dataset<Row> result = spark.read().format("iceberg").load(targetLocation);
List<SimpleRecord> actual =
- result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
+ result.orderBy("id", "data").as(Encoders.bean(SimpleRecord.class)).collectAsList();
assertThat(actual).hasSameSizeAs(expected).isEqualTo(expected);
}
@@ -706,7 +716,7 @@ public class TestSparkDataWrite {
// Since write and commit succeeded, the rows should be readable
Dataset<Row> result = spark.read().format("iceberg").load(targetLocation);
List<SimpleRecord> actual =
- result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
+ result.orderBy("id", "data").as(Encoders.bean(SimpleRecord.class)).collectAsList();
assertThat(actual)
.hasSize(records.size() + records2.size())
.containsExactlyInAnyOrder(
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReadProjection.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReadProjection.java
index 596d05d30b5..dc8563314c7 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReadProjection.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReadProjection.java
@@ -88,6 +88,16 @@ public class TestSparkReadProjection extends TestReadProjection {
SparkSession.builder()
.master("local[2]")
.config("spark.driver.host", InetAddress.getLoopbackAddress().getHostAddress())
+ .config("spark.plugins", "org.apache.spark.CometPlugin")
+ .config(
+ "spark.shuffle.manager",
+ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
+ .config("spark.comet.explainFallback.enabled", "true")
+ .config("spark.sql.iceberg.parquet.reader-type", "COMET")
+ .config("spark.memory.offHeap.enabled", "true")
+ .config("spark.memory.offHeap.size", "10g")
+ .config("spark.comet.use.lazyMaterialization", "false")
+ .config("spark.comet.schemaEvolution.enabled", "true")
.getOrCreate();
ImmutableMap<String, String> config =
ImmutableMap.of(
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
index 42699f46623..058c2d79b62 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
@@ -138,6 +138,16 @@ public class TestSparkReaderDeletes extends DeleteReadTests {
.config("spark.ui.liveUpdate.period", 0)
.config(SQLConf.PARTITION_OVERWRITE_MODE().key(), "dynamic")
.config("spark.hadoop." + METASTOREURIS.varname, hiveConf.get(METASTOREURIS.varname))
+ .config("spark.plugins", "org.apache.spark.CometPlugin")
+ .config(
+ "spark.shuffle.manager",
+ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
+ .config("spark.comet.explainFallback.enabled", "true")
+ .config("spark.sql.iceberg.parquet.reader-type", "COMET")
+ .config("spark.memory.offHeap.enabled", "true")
+ .config("spark.memory.offHeap.size", "10g")
+ .config("spark.comet.use.lazyMaterialization", "false")
+ .config("spark.comet.schemaEvolution.enabled", "true")
.enableHiveSupport()
.getOrCreate();
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderWithBloomFilter.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderWithBloomFilter.java
index baf7fa8f88a..509c5deba51 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderWithBloomFilter.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderWithBloomFilter.java
@@ -182,6 +182,16 @@ public class TestSparkReaderWithBloomFilter {
SparkSession.builder()
.master("local[2]")
.config("spark.hadoop." + METASTOREURIS.varname, hiveConf.get(METASTOREURIS.varname))
+ .config("spark.plugins", "org.apache.spark.CometPlugin")
+ .config(
+ "spark.shuffle.manager",
+ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
+ .config("spark.comet.explainFallback.enabled", "true")
+ .config("spark.sql.iceberg.parquet.reader-type", "COMET")
+ .config("spark.memory.offHeap.enabled", "true")
+ .config("spark.memory.offHeap.size", "10g")
+ .config("spark.comet.use.lazyMaterialization", "false")
+ .config("spark.comet.schemaEvolution.enabled", "true")
.enableHiveSupport()
.getOrCreate();
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreaming.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreaming.java
index 54048bbf218..b1a2ca92098 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreaming.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreaming.java
@@ -69,6 +69,16 @@ public class TestStructuredStreaming {
.master("local[2]")
.config("spark.driver.host", InetAddress.getLoopbackAddress().getHostAddress())
.config("spark.sql.shuffle.partitions", 4)
+ .config("spark.plugins", "org.apache.spark.CometPlugin")
+ .config(
+ "spark.shuffle.manager",
+ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
+ .config("spark.comet.explainFallback.enabled", "true")
+ .config("spark.sql.iceberg.parquet.reader-type", "COMET")
+ .config("spark.memory.offHeap.enabled", "true")
+ .config("spark.memory.offHeap.size", "10g")
+ .config("spark.comet.use.lazyMaterialization", "false")
+ .config("spark.comet.schemaEvolution.enabled", "true")
.getOrCreate();
}
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestTimestampWithoutZone.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestTimestampWithoutZone.java
index 8b1e3fbfc77..74936e2487e 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestTimestampWithoutZone.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestTimestampWithoutZone.java
@@ -75,7 +75,20 @@ public class TestTimestampWithoutZone extends TestBase {
@BeforeAll
public static void startSpark() {
- TestTimestampWithoutZone.spark = SparkSession.builder().master("local[2]").getOrCreate();
+ TestTimestampWithoutZone.spark =
+ SparkSession.builder()
+ .master("local[2]")
+ .config("spark.plugins", "org.apache.spark.CometPlugin")
+ .config(
+ "spark.shuffle.manager",
+ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
+ .config("spark.comet.explainFallback.enabled", "true")
+ .config("spark.sql.iceberg.parquet.reader-type", "COMET")
+ .config("spark.memory.offHeap.enabled", "true")
+ .config("spark.memory.offHeap.size", "10g")
+ .config("spark.comet.use.lazyMaterialization", "false")
+ .config("spark.comet.schemaEvolution.enabled", "true")
+ .getOrCreate();
}

@AfterAll
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestWriteMetricsConfig.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestWriteMetricsConfig.java
index c3fac70dd3f..b7f2431c119 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestWriteMetricsConfig.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestWriteMetricsConfig.java
@@ -84,6 +84,16 @@ public class TestWriteMetricsConfig {
SparkSession.builder()
.master("local[2]")
.config("spark.driver.host", InetAddress.getLoopbackAddress().getHostAddress())
+ .config("spark.plugins", "org.apache.spark.CometPlugin")
+ .config(
+ "spark.shuffle.manager",
+ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
+ .config("spark.comet.explainFallback.enabled", "true")
+ .config("spark.sql.iceberg.parquet.reader-type", "COMET")
+ .config("spark.memory.offHeap.enabled", "true")
+ .config("spark.memory.offHeap.size", "10g")
+ .config("spark.comet.use.lazyMaterialization", "false")
+ .config("spark.comet.schemaEvolution.enabled", "true")
.getOrCreate();
TestWriteMetricsConfig.sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
}
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestAggregatePushDown.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestAggregatePushDown.java
index 5ce56b4feca..0def2a156d4 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestAggregatePushDown.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestAggregatePushDown.java
@@ -63,6 +63,16 @@ public class TestAggregatePushDown extends CatalogTestBase {
SparkSession.builder()
.master("local[2]")
.config("spark.sql.iceberg.aggregate_pushdown", "true")
+ .config("spark.plugins", "org.apache.spark.CometPlugin")
+ .config(
+ "spark.shuffle.manager",
+ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
+ .config("spark.comet.explainFallback.enabled", "true")
+ .config("spark.sql.iceberg.parquet.reader-type", "COMET")
+ .config("spark.memory.offHeap.enabled", "true")
+ .config("spark.memory.offHeap.size", "10g")
+ .config("spark.comet.use.lazyMaterialization", "false")
+ .config("spark.comet.schemaEvolution.enabled", "true")
.enableHiveSupport()
.getOrCreate();
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestFilterPushDown.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestFilterPushDown.java
index 9d2ce2b388a..5e233688488 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestFilterPushDown.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestFilterPushDown.java
@@ -598,9 +598,7 @@ public class TestFilterPushDown extends TestBaseWithCatalog {
String planAsString = sparkPlan.toString().replaceAll("#(\\d+L?)", "");
if (sparkFilter != null) {
- assertThat(planAsString)
- .as("Post scan filter should match")
- .contains("Filter (" + sparkFilter + ")");
+ assertThat(planAsString).as("Post scan filter should match").contains("CometFilter");
} else {
assertThat(planAsString).as("Should be no post scan filter").doesNotContain("Filter (");
}