diff --git a/build.gradle b/build.gradle
index 998f2ee9ea6..017e61be98c 100644
--- a/build.gradle
+++ b/build.gradle
@@ -814,6 +814,13 @@ project(':iceberg-parquet') {
implementation project(':iceberg-core')
implementation project(':iceberg-common')
+ implementation("org.apache.datafusion:comet-spark-spark${sparkVersionsString}_${scalaVersion}:${libs.versions.comet.get()}") {
+ exclude group: 'org.apache.arrow'
+ exclude group: 'org.apache.parquet'
+ exclude group: 'org.apache.spark'
+ exclude group: 'org.apache.iceberg'
+ }
+
implementation(libs.parquet.avro) {
exclude group: 'org.apache.avro', module: 'avro'
// already shaded by Parquet
diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml
index c50991c5fc6..f7ad00f0b78 100644
--- a/gradle/libs.versions.toml
+++ b/gradle/libs.versions.toml
@@ -36,6 +36,7 @@ awssdk-s3accessgrants = "2.3.0"
bson-ver = "4.11.5"
caffeine = "2.9.3"
calcite = "1.39.0"
+comet = "0.12.0"
datasketches = "6.2.0"
delta-standalone = "3.3.1"
delta-spark = "3.3.1"
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/CometTypeUtils.java b/parquet/src/main/java/org/apache/iceberg/parquet/CometTypeUtils.java
new file mode 100644
index 00000000000..ddf6c7de5ae
--- /dev/null
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/CometTypeUtils.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.parquet;
+
+import java.util.Map;
+import org.apache.comet.parquet.ParquetColumnSpec;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
+import org.apache.parquet.schema.Types;
+
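+/**
+ * Utilities for translating between Parquet {@link ColumnDescriptor}s and Comet
+ * {@link ParquetColumnSpec}s so that Iceberg can describe projected columns to Comet's native
+ * Parquet reader.
+ */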
+public class CometTypeUtils {
+
+ private CometTypeUtils() {}
+
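+ /**
+ * Converts a Parquet {@link ColumnDescriptor} into a Comet {@link ParquetColumnSpec}, carrying
+ * the physical type, repetition/definition levels, and any logical-type parameters.
+ */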
+ public static ParquetColumnSpec descriptorToParquetColumnSpec(ColumnDescriptor descriptor) {
+
+ String[] path = descriptor.getPath();
+ PrimitiveType primitiveType = descriptor.getPrimitiveType();
+ String physicalType = primitiveType.getPrimitiveTypeName().name();
+
+ int typeLength =
+ primitiveType.getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY
+ ? primitiveType.getTypeLength()
+ : 0;
+
+ boolean isRepeated = primitiveType.getRepetition() == Type.Repetition.REPEATED;
+
+ // TODO: move this logical-type translation into a shared util method
+ String logicalTypeName = null;
+ Map<String, String> logicalTypeParams = Maps.newHashMap();
+ LogicalTypeAnnotation logicalType = primitiveType.getLogicalTypeAnnotation();
+
+ if (logicalType != null) {
+ logicalTypeName = logicalType.getClass().getSimpleName();
+
+ // Handle specific logical types
+ if (logicalType instanceof LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) {
+ LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimal =
+ (LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) logicalType;
+ logicalTypeParams.put("precision", String.valueOf(decimal.getPrecision()));
+ logicalTypeParams.put("scale", String.valueOf(decimal.getScale()));
+ } else if (logicalType instanceof LogicalTypeAnnotation.TimestampLogicalTypeAnnotation) {
+ LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestamp =
+ (LogicalTypeAnnotation.TimestampLogicalTypeAnnotation) logicalType;
+ logicalTypeParams.put("isAdjustedToUTC", String.valueOf(timestamp.isAdjustedToUTC()));
+ logicalTypeParams.put("unit", timestamp.getUnit().name());
+ } else if (logicalType instanceof LogicalTypeAnnotation.TimeLogicalTypeAnnotation) {
+ LogicalTypeAnnotation.TimeLogicalTypeAnnotation time =
+ (LogicalTypeAnnotation.TimeLogicalTypeAnnotation) logicalType;
+ logicalTypeParams.put("isAdjustedToUTC", String.valueOf(time.isAdjustedToUTC()));
+ logicalTypeParams.put("unit", time.getUnit().name());
+ } else if (logicalType instanceof LogicalTypeAnnotation.IntLogicalTypeAnnotation) {
+ LogicalTypeAnnotation.IntLogicalTypeAnnotation intType =
+ (LogicalTypeAnnotation.IntLogicalTypeAnnotation) logicalType;
+ logicalTypeParams.put("isSigned", String.valueOf(intType.isSigned()));
+ logicalTypeParams.put("bitWidth", String.valueOf(intType.getBitWidth()));
+ }
+ }
+
+ return new ParquetColumnSpec(
+ 1, // TODO: pass in the correct field id instead of this placeholder
+ path,
+ physicalType,
+ typeLength,
+ isRepeated,
+ descriptor.getMaxDefinitionLevel(),
+ descriptor.getMaxRepetitionLevel(),
+ logicalTypeName,
+ logicalTypeParams);
+ }
+
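+ /**
+ * Rebuilds a Parquet {@link ColumnDescriptor} from a Comet {@link ParquetColumnSpec},
+ * reconstructing the logical type annotation from its serialized parameters.
+ */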
+ public static ColumnDescriptor buildColumnDescriptor(ParquetColumnSpec columnSpec) {
+ PrimitiveType.PrimitiveTypeName primType =
+ PrimitiveType.PrimitiveTypeName.valueOf(columnSpec.getPhysicalType());
+
+ Type.Repetition repetition;
+ if (columnSpec.getMaxRepetitionLevel() > 0) {
+ repetition = Type.Repetition.REPEATED;
+ } else if (columnSpec.getMaxDefinitionLevel() > 0) {
+ repetition = Type.Repetition.OPTIONAL;
+ } else {
+ repetition = Type.Repetition.REQUIRED;
+ }
+
+ String name = columnSpec.getPath()[columnSpec.getPath().length - 1];
+ // Reconstruct the logical type from parameters
+ LogicalTypeAnnotation logicalType = null;
+ if (columnSpec.getLogicalTypeName() != null) {
+ logicalType =
+ reconstructLogicalType(
+ columnSpec.getLogicalTypeName(), columnSpec.getLogicalTypeParams());
+ }
+
+ PrimitiveType primitiveType;
+ if (primType == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY) {
+ primitiveType =
+ Types.primitive(primType, repetition)
+ .length(columnSpec.getTypeLength())
+ .as(logicalType)
+ .id(columnSpec.getFieldId())
+ .named(name);
+ } else {
+ primitiveType =
+ Types.primitive(primType, repetition)
+ .as(logicalType)
+ .id(columnSpec.getFieldId())
+ .named(name);
+ }
+
+ return new ColumnDescriptor(
+ columnSpec.getPath(),
+ primitiveType,
+ columnSpec.getMaxRepetitionLevel(),
+ columnSpec.getMaxDefinitionLevel());
+ }
+
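+ /** Rebuilds a {@link LogicalTypeAnnotation} from its simple class name and parameter map. */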
+ private static LogicalTypeAnnotation reconstructLogicalType(
+ String logicalTypeName, Map<String, String> params) {
+
+ switch (logicalTypeName) {
+ // MAP
+ case "MapLogicalTypeAnnotation":
+ return LogicalTypeAnnotation.mapType();
+
+ // LIST
+ case "ListLogicalTypeAnnotation":
+ return LogicalTypeAnnotation.listType();
+
+ // STRING
+ case "StringLogicalTypeAnnotation":
+ return LogicalTypeAnnotation.stringType();
+
+ // MAP_KEY_VALUE
+ case "MapKeyValueLogicalTypeAnnotation":
+ return LogicalTypeAnnotation.MapKeyValueTypeAnnotation.getInstance();
+
+ // ENUM
+ case "EnumLogicalTypeAnnotation":
+ return LogicalTypeAnnotation.enumType();
+
+ // DECIMAL
+ case "DecimalLogicalTypeAnnotation":
+ if (!params.containsKey("scale") || !params.containsKey("precision")) {
+ throw new IllegalArgumentException(
+ "Missing required parameters for DecimalLogicalTypeAnnotation: " + params);
+ }
+ int scale = Integer.parseInt(params.get("scale"));
+ int precision = Integer.parseInt(params.get("precision"));
+ return LogicalTypeAnnotation.decimalType(scale, precision);
+
+ // DATE
+ case "DateLogicalTypeAnnotation":
+ return LogicalTypeAnnotation.dateType();
+
+ // TIME
+ case "TimeLogicalTypeAnnotation":
+ if (!params.containsKey("isAdjustedToUTC") || !params.containsKey("unit")) {
+ throw new IllegalArgumentException(
+ "Missing required parameters for TimeLogicalTypeAnnotation: " + params);
+ }
+
+ boolean isUTC = Boolean.parseBoolean(params.get("isAdjustedToUTC"));
+ String timeUnitStr = params.get("unit");
+
+ LogicalTypeAnnotation.TimeUnit timeUnit;
+ switch (timeUnitStr) {
+ case "MILLIS":
+ timeUnit = LogicalTypeAnnotation.TimeUnit.MILLIS;
+ break;
+ case "MICROS":
+ timeUnit = LogicalTypeAnnotation.TimeUnit.MICROS;
+ break;
+ case "NANOS":
+ timeUnit = LogicalTypeAnnotation.TimeUnit.NANOS;
+ break;
+ default:
+ throw new IllegalArgumentException("Unknown time unit: " + timeUnitStr);
+ }
+ return LogicalTypeAnnotation.timeType(isUTC, timeUnit);
+
+ // TIMESTAMP
+ case "TimestampLogicalTypeAnnotation":
+ if (!params.containsKey("isAdjustedToUTC") || !params.containsKey("unit")) {
+ throw new IllegalArgumentException(
+ "Missing required parameters for TimestampLogicalTypeAnnotation: " + params);
+ }
+ boolean isAdjustedToUTC = Boolean.parseBoolean(params.get("isAdjustedToUTC"));
+ String unitStr = params.get("unit");
+
+ LogicalTypeAnnotation.TimeUnit unit;
+ switch (unitStr) {
+ case "MILLIS":
+ unit = LogicalTypeAnnotation.TimeUnit.MILLIS;
+ break;
+ case "MICROS":
+ unit = LogicalTypeAnnotation.TimeUnit.MICROS;
+ break;
+ case "NANOS":
+ unit = LogicalTypeAnnotation.TimeUnit.NANOS;
+ break;
+ default:
+ throw new IllegalArgumentException("Unknown timestamp unit: " + unitStr);
+ }
+ return LogicalTypeAnnotation.timestampType(isAdjustedToUTC, unit);
+
+ // INTEGER
+ case "IntLogicalTypeAnnotation":
+ if (!params.containsKey("isSigned") || !params.containsKey("bitWidth")) {
+ throw new IllegalArgumentException(
+ "Missing required parameters for IntLogicalTypeAnnotation: " + params);
+ }
+ boolean isSigned = Boolean.parseBoolean(params.get("isSigned"));
+ int bitWidth = Integer.parseInt(params.get("bitWidth"));
+ return LogicalTypeAnnotation.intType(bitWidth, isSigned);
+
+ // JSON
+ case "JsonLogicalTypeAnnotation":
+ return LogicalTypeAnnotation.jsonType();
+
+ // BSON
+ case "BsonLogicalTypeAnnotation":
+ return LogicalTypeAnnotation.bsonType();
+
+ // UUID
+ case "UUIDLogicalTypeAnnotation":
+ return LogicalTypeAnnotation.uuidType();
+
+ // INTERVAL
+ case "IntervalLogicalTypeAnnotation":
+ return LogicalTypeAnnotation.IntervalLogicalTypeAnnotation.getInstance();
+
+ default:
+ throw new IllegalArgumentException("Unknown logical type: " + logicalTypeName);
+ }
+ }
+}
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/CometVectorizedParquetReader.java b/parquet/src/main/java/org/apache/iceberg/parquet/CometVectorizedParquetReader.java
new file mode 100644
index 00000000000..a3cba401827
--- /dev/null
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/CometVectorizedParquetReader.java
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.parquet;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.function.Function;
+import org.apache.comet.parquet.FileReader;
+import org.apache.comet.parquet.ParquetColumnSpec;
+import org.apache.comet.parquet.ReadOptions;
+import org.apache.comet.parquet.RowGroupReader;
+import org.apache.comet.parquet.WrappedInputFile;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.CloseableGroup;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.mapping.NameMapping;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.ByteBuffers;
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.schema.MessageType;
+
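+/**
+ * Vectorized Parquet reader that delegates file decoding to Comet's native {@link FileReader}
+ * while reusing Iceberg's {@link ReadConf} for row-group filtering, projection, and batch sizing.
+ */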
+public class CometVectorizedParquetReader<T> extends CloseableGroup
+ implements CloseableIterable<T> {
+ private final InputFile input;
+ private final ParquetReadOptions options;
+ private final Schema expectedSchema;
+ private final Function<MessageType, VectorizedReader<?>> batchReaderFunc;
+ private final Expression filter;
+ private final boolean reuseContainers;
+ private final boolean caseSensitive;
+ private final int batchSize;
+ private final NameMapping nameMapping;
+ private final Map<String, String> properties;
+ private Long start = null;
+ private Long length = null;
+ private ByteBuffer fileEncryptionKey = null;
+ private ByteBuffer fileAADPrefix = null;
+
+ public CometVectorizedParquetReader(
+ InputFile input,
+ Schema expectedSchema,
+ ParquetReadOptions options,
+ Function<MessageType, VectorizedReader<?>> readerFunc,
+ NameMapping nameMapping,
+ Expression filter,
+ boolean reuseContainers,
+ boolean caseSensitive,
+ int maxRecordsPerBatch,
+ Map<String, String> properties,
+ Long start,
+ Long length,
+ ByteBuffer fileEncryptionKey,
+ ByteBuffer fileAADPrefix) {
+ this.input = input;
+ this.expectedSchema = expectedSchema;
+ this.options = options;
+ this.batchReaderFunc = readerFunc;
+ // replace alwaysTrue with null to avoid extra work evaluating a trivial filter
+ this.filter = filter == Expressions.alwaysTrue() ? null : filter;
+ this.reuseContainers = reuseContainers;
+ this.caseSensitive = caseSensitive;
+ this.batchSize = maxRecordsPerBatch;
+ this.nameMapping = nameMapping;
+ this.properties = properties;
+ this.start = start;
+ this.length = length;
+ this.fileEncryptionKey = fileEncryptionKey;
+ this.fileAADPrefix = fileAADPrefix;
+ }
+
+ private ReadConf<T> conf = null;
+
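+ // Lazily creates the ReadConf on first use; subsequent iterators reuse a cached copy.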
+ private ReadConf<T> init() {
+ if (conf == null) {
+ ReadConf<T> readConf =
+ new ReadConf<>(
+ input,
+ options,
+ expectedSchema,
+ filter,
+ null,
+ batchReaderFunc,
+ nameMapping,
+ reuseContainers,
+ caseSensitive,
+ batchSize);
+ this.conf = readConf.copy();
+ return readConf;
+ }
+ return conf;
+ }
+
+ @Override
+ public CloseableIterator<T> iterator() {
+ FileIterator<T> iter =
+ new FileIterator<>(init(), properties, start, length, fileEncryptionKey, fileAADPrefix);
+ addCloseable(iter);
+ return iter;
+ }
+
+ private static class FileIterator<T> implements CloseableIterator<T> {
+ private final boolean[] shouldSkip;
+ private final VectorizedReader<T> model;
+ private final long totalValues;
+ private final int batchSize;
+ private final List<Map<ColumnPath, ColumnChunkMetaData>> columnChunkMetadata;
+ private final boolean reuseContainers;
+ private int nextRowGroup = 0;
+ private long nextRowGroupStart = 0;
+ private long valuesRead = 0;
+ private T last = null;
+ private final FileReader cometReader;
+ private final ReadConf<T> conf;
+
+ FileIterator(
+ ReadConf<T> conf,
+ Map<String, String> properties,
+ Long start,
+ Long length,
+ ByteBuffer fileEncryptionKey,
+ ByteBuffer fileAADPrefix) {
+ this.shouldSkip = conf.shouldSkip();
+ this.totalValues = conf.totalValues();
+ this.reuseContainers = conf.reuseContainers();
+ this.model = conf.vectorizedModel();
+ this.batchSize = conf.batchSize();
+ this.model.setBatchSize(this.batchSize);
+ this.columnChunkMetadata = conf.columnChunkMetadataForRowGroups();
+ this.cometReader =
+ newCometReader(
+ conf.file(),
+ conf.projection(),
+ properties,
+ start,
+ length,
+ fileEncryptionKey,
+ fileAADPrefix);
+ this.conf = conf;
+ }
+
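+ /**
+ * Opens a Comet {@link FileReader} for this split and pushes the projected columns down as
+ * {@link ParquetColumnSpec}s so Comet only decodes the requested schema.
+ */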
+ private FileReader newCometReader(
+ InputFile file,
+ MessageType projection,
+ Map<String, String> properties,
+ Long start,
+ Long length,
+ ByteBuffer fileEncryptionKey,
+ ByteBuffer fileAADPrefix) {
+ try {
+ ReadOptions cometOptions = ReadOptions.builder(new Configuration()).build();
+
+ FileReader fileReader =
+ new FileReader(
+ new WrappedInputFile(file),
+ cometOptions,
+ properties,
+ start,
+ length,
+ ByteBuffers.toByteArray(fileEncryptionKey),
+ ByteBuffers.toByteArray(fileAADPrefix));
+
+ List<ColumnDescriptor> columnDescriptors = projection.getColumns();
+
+ List<ParquetColumnSpec> specs = Lists.newArrayList();
+
+ for (ColumnDescriptor descriptor : columnDescriptors) {
+ ParquetColumnSpec spec = CometTypeUtils.descriptorToParquetColumnSpec(descriptor);
+ specs.add(spec);
+ }
+
+ fileReader.setRequestedSchemaFromSpecs(specs);
+ return fileReader;
+ } catch (IOException e) {
+ throw new UncheckedIOException("Failed to open Parquet file: " + file.location(), e);
+ }
+ }
+
+ @Override
+ public boolean hasNext() {
+ return valuesRead < totalValues;
+ }
+
+ @Override
+ public T next() {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+ if (valuesRead >= nextRowGroupStart) {
+ advance();
+ }
+
+ // batchSize is an integer, so casting to integer is safe
+ int numValuesToRead = (int) Math.min(nextRowGroupStart - valuesRead, batchSize);
+ if (reuseContainers) {
+ this.last = model.read(last, numValuesToRead);
+ } else {
+ this.last = model.read(null, numValuesToRead);
+ }
+ valuesRead += numValuesToRead;
+
+ return last;
+ }
+
+ private void advance() {
+ while (shouldSkip[nextRowGroup]) {
+ nextRowGroup += 1;
+ cometReader.skipNextRowGroup();
+ }
+ RowGroupReader pages;
+ try {
+ pages = cometReader.readNextRowGroup();
+ } catch (IOException e) {
+ throw new RuntimeIOException(e);
+ }
+
+ model.setRowGroupInfo(pages, columnChunkMetadata.get(nextRowGroup));
+ nextRowGroupStart += pages.getRowCount();
+ nextRowGroup += 1;
+ }
+
+ @Override
+ public void close() throws IOException {
+ model.close();
+ cometReader.close();
+ if (conf != null && conf.reader() != null) {
+ conf.reader().close();
+ }
+ }
+ }
+}
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
index 31f9e2a80a6..520f142c212 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
@@ -1124,6 +1124,7 @@ public class Parquet {
private NameMapping nameMapping = null;
private ByteBuffer fileEncryptionKey = null;
private ByteBuffer fileAADPrefix = null;
+ private boolean isComet;
private ReadBuilder(InputFile file) {
this.file = file;
@@ -1168,6 +1169,11 @@ public class Parquet {
return this;
}
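+ /**
+ * Enables the Comet native vectorized Parquet reader. Only takes effect when a batched reader
+ * function is configured.
+ */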
+ public ReadBuilder enableComet(boolean enableComet) {
+ this.isComet = enableComet;
+ return this;
+ }
+
/**
* @deprecated will be removed in 2.0.0; use {@link #createReaderFunc(Function)} instead
*/
@@ -1263,7 +1269,7 @@ public class Parquet {
}
@Override
- @SuppressWarnings({"unchecked", "checkstyle:CyclomaticComplexity"})
+ @SuppressWarnings({"unchecked", "checkstyle:CyclomaticComplexity", "MethodLength"})
public <D> CloseableIterable<D> build() {
FileDecryptionProperties fileDecryptionProperties = null;
if (fileEncryptionKey != null) {
@@ -1315,16 +1321,35 @@ public class Parquet {
}
if (batchedReaderFunc != null) {
- return new VectorizedParquetReader<>(
- file,
- schema,
- options,
- batchedReaderFunc,
- mapping,
- filter,
- reuseContainers,
- caseSensitive,
- maxRecordsPerBatch);
+ if (isComet) {
+ LOG.info("Comet enabled");
+ return new CometVectorizedParquetReader<>(
+ file,
+ schema,
+ options,
+ batchedReaderFunc,
+ mapping,
+ filter,
+ reuseContainers,
+ caseSensitive,
+ maxRecordsPerBatch,
+ properties,
+ start,
+ length,
+ fileEncryptionKey,
+ fileAADPrefix);
+ } else {
+ return new VectorizedParquetReader<>(
+ file,
+ schema,
+ options,
+ batchedReaderFunc,
+ mapping,
+ filter,
+ reuseContainers,
+ caseSensitive,
+ maxRecordsPerBatch);
+ }
} else {
Function<MessageType, ParquetValueReader<?>> readBuilder =
readerFuncWithSchema != null
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ReadConf.java b/parquet/src/main/java/org/apache/iceberg/parquet/ReadConf.java
index 1fb2372ba56..142e5fbadf1 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ReadConf.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ReadConf.java
@@ -157,6 +157,14 @@ class ReadConf<T> {
return newReader;
}
+ InputFile file() {
+ return file;
+ }
+
+ MessageType projection() {
+ return projection;
+ }
+
ParquetValueReader<T> model() {
return model;
}
diff --git a/spark/v3.5/build.gradle b/spark/v3.5/build.gradle
index 572c32f9292..d155f634a4d 100644
--- a/spark/v3.5/build.gradle
+++ b/spark/v3.5/build.gradle
@@ -75,7 +75,7 @@ project(":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}") {
exclude group: 'org.roaringbitmap'
}
- compileOnly "org.apache.datafusion:comet-spark-spark${sparkMajorVersion}_${scalaVersion}:0.5.0"
+ compileOnly "org.apache.datafusion:comet-spark-spark${sparkMajorVersion}_${scalaVersion}:${libs.versions.comet.get()}"
implementation libs.parquet.column
implementation libs.parquet.hadoop
@@ -184,7 +184,7 @@ project(":iceberg-spark:iceberg-spark-extensions-${sparkMajorVersion}_${scalaVer
testImplementation libs.avro.avro
testImplementation libs.parquet.hadoop
testImplementation libs.awaitility
- testImplementation "org.apache.datafusion:comet-spark-spark${sparkMajorVersion}_${scalaVersion}:0.5.0"
+ testImplementation "org.apache.datafusion:comet-spark-spark${sparkMajorVersion}_${scalaVersion}:${libs.versions.comet.get()}"
// Required because we remove antlr plugin dependencies from the compile configuration, see note above
runtimeOnly libs.antlr.runtime
@@ -265,6 +265,7 @@ project(":iceberg-spark:iceberg-spark-runtime-${sparkMajorVersion}_${scalaVersio
integrationImplementation project(path: ':iceberg-hive-metastore', configuration: 'testArtifacts')
integrationImplementation project(path: ":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}", configuration: 'testArtifacts')
integrationImplementation project(path: ":iceberg-spark:iceberg-spark-extensions-${sparkMajorVersion}_${scalaVersion}", configuration: 'testArtifacts')
+ integrationImplementation "org.apache.datafusion:comet-spark-spark${sparkMajorVersion}_${scalaVersion}:${libs.versions.comet.get()}"
// runtime dependencies for running Hive Catalog based integration test
integrationRuntimeOnly project(':iceberg-hive-metastore')
@@ -302,8 +303,8 @@ project(":iceberg-spark:iceberg-spark-runtime-${sparkMajorVersion}_${scalaVersio
relocate 'org.apache.avro', 'org.apache.iceberg.shaded.org.apache.avro'
relocate 'avro.shaded', 'org.apache.iceberg.shaded.org.apache.avro.shaded'
relocate 'com.thoughtworks.paranamer', 'org.apache.iceberg.shaded.com.thoughtworks.paranamer'
- relocate 'org.apache.parquet', 'org.apache.iceberg.shaded.org.apache.parquet'
- relocate 'shaded.parquet', 'org.apache.iceberg.shaded.org.apache.parquet.shaded'
+// relocate 'org.apache.parquet', 'org.apache.iceberg.shaded.org.apache.parquet'
+// relocate 'shaded.parquet', 'org.apache.iceberg.shaded.org.apache.parquet.shaded'
relocate 'org.apache.orc', 'org.apache.iceberg.shaded.org.apache.orc'
relocate 'io.airlift', 'org.apache.iceberg.shaded.io.airlift'
relocate 'org.apache.hc.client5', 'org.apache.iceberg.shaded.org.apache.hc.client5'
diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/ExtensionsTestBase.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/ExtensionsTestBase.java
index 578845e3da2..0118b30683d 100644
--- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/ExtensionsTestBase.java
+++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/ExtensionsTestBase.java
@@ -57,6 +57,16 @@ public abstract class ExtensionsTestBase extends CatalogTestBase {
.config("spark.sql.legacy.respectNullabilityInTextDatasetConversion", "true")
.config(
SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), String.valueOf(RANDOM.nextBoolean()))
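+ // Run tests with the Comet plugin (native shuffle, Comet Parquet reader, off-heap memory).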
+ .config("spark.plugins", "org.apache.spark.CometPlugin")
+ .config(
+ "spark.shuffle.manager",
+ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
+ .config("spark.comet.explainFallback.enabled", "true")
+ .config("spark.sql.iceberg.parquet.reader-type", "COMET")
+ .config("spark.memory.offHeap.enabled", "true")
+ .config("spark.memory.offHeap.size", "10g")
+ .config("spark.comet.use.lazyMaterialization", "false")
+ .config("spark.comet.schemaEvolution.enabled", "true")
.enableHiveSupport()
.getOrCreate();
diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCallStatementParser.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCallStatementParser.java
index ecf9e6f8a59..0f8cced69aa 100644
--- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCallStatementParser.java
+++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCallStatementParser.java
@@ -56,6 +56,16 @@ public class TestCallStatementParser {
.master("local[2]")
.config("spark.sql.extensions", IcebergSparkSessionExtensions.class.getName())
.config("spark.extra.prop", "value")
+ .config("spark.plugins", "org.apache.spark.CometPlugin")
+ .config(
+ "spark.shuffle.manager",
+ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
+ .config("spark.comet.explainFallback.enabled", "true")
+ .config("spark.sql.iceberg.parquet.reader-type", "COMET")
+ .config("spark.memory.offHeap.enabled", "true")
+ .config("spark.memory.offHeap.size", "10g")
+ .config("spark.comet.use.lazyMaterialization", "false")
+ .config("spark.comet.schemaEvolution.enabled", "true")
.getOrCreate();
TestCallStatementParser.parser = spark.sessionState().sqlParser();
}
diff --git a/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/action/DeleteOrphanFilesBenchmark.java b/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/action/DeleteOrphanFilesBenchmark.java
index 64edb1002e9..5bb449f1ac7 100644
--- a/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/action/DeleteOrphanFilesBenchmark.java
+++ b/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/action/DeleteOrphanFilesBenchmark.java
@@ -179,6 +179,16 @@ public class DeleteOrphanFilesBenchmark {
.config("spark.sql.catalog.spark_catalog", SparkSessionCatalog.class.getName())
.config("spark.sql.catalog.spark_catalog.type", "hadoop")
.config("spark.sql.catalog.spark_catalog.warehouse", catalogWarehouse())
+ .config("spark.plugins", "org.apache.spark.CometPlugin")
+ .config(
+ "spark.shuffle.manager",
+ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
+ .config("spark.comet.explainFallback.enabled", "true")
+ .config("spark.sql.iceberg.parquet.reader-type", "COMET")
+ .config("spark.memory.offHeap.enabled", "true")
+ .config("spark.memory.offHeap.size", "10g")
+ .config("spark.comet.use.lazyMaterialization", "false")
+ .config("spark.comet.schemaEvolution.enabled", "true")
.master("local");
spark = builder.getOrCreate();
}
diff --git a/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/action/IcebergSortCompactionBenchmark.java b/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/action/IcebergSortCompactionBenchmark.java
index a5d0456b0b2..4af408f4861 100644
--- a/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/action/IcebergSortCompactionBenchmark.java
+++ b/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/action/IcebergSortCompactionBenchmark.java
@@ -392,6 +392,16 @@ public class IcebergSortCompactionBenchmark {
"spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog")
.config("spark.sql.catalog.spark_catalog.type", "hadoop")
.config("spark.sql.catalog.spark_catalog.warehouse", getCatalogWarehouse())
+ .config("spark.plugins", "org.apache.spark.CometPlugin")
+ .config(
+ "spark.shuffle.manager",
+ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
+ .config("spark.comet.explainFallback.enabled", "true")
+ .config("spark.sql.iceberg.parquet.reader-type", "COMET")
+ .config("spark.memory.offHeap.enabled", "true")
+ .config("spark.memory.offHeap.size", "10g")
+ .config("spark.comet.use.lazyMaterialization", "false")
+ .config("spark.comet.schemaEvolution.enabled", "true")
.master("local[*]");
spark = builder.getOrCreate();
Configuration sparkHadoopConf = spark.sessionState().newHadoopConf();
diff --git a/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/DVReaderBenchmark.java b/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/DVReaderBenchmark.java
index c6794e43c63..f7359197407 100644
--- a/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/DVReaderBenchmark.java
+++ b/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/DVReaderBenchmark.java
@@ -239,6 +239,16 @@ public class DVReaderBenchmark {
.config("spark.sql.catalog.spark_catalog", SparkSessionCatalog.class.getName())
.config("spark.sql.catalog.spark_catalog.type", "hadoop")
.config("spark.sql.catalog.spark_catalog.warehouse", newWarehouseDir())
+ .config("spark.plugins", "org.apache.spark.CometPlugin")
+ .config(
+ "spark.shuffle.manager",
+ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
+ .config("spark.comet.explainFallback.enabled", "true")
+ .config("spark.sql.iceberg.parquet.reader-type", "COMET")
+ .config("spark.memory.offHeap.enabled", "true")
+ .config("spark.memory.offHeap.size", "10g")
+ .config("spark.comet.use.lazyMaterialization", "false")
+ .config("spark.comet.schemaEvolution.enabled", "true")
.master("local[*]")
.getOrCreate();
}
diff --git a/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/DVWriterBenchmark.java b/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/DVWriterBenchmark.java
index ac74fb5a109..e011b8b2510 100644
--- a/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/DVWriterBenchmark.java
+++ b/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/DVWriterBenchmark.java
@@ -223,6 +223,16 @@ public class DVWriterBenchmark {
.config("spark.sql.catalog.spark_catalog", SparkSessionCatalog.class.getName())
.config("spark.sql.catalog.spark_catalog.type", "hadoop")
.config("spark.sql.catalog.spark_catalog.warehouse", newWarehouseDir())
+ .config("spark.plugins", "org.apache.spark.CometPlugin")
+ .config(
+ "spark.shuffle.manager",
+ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
+ .config("spark.comet.explainFallback.enabled", "true")
+ .config("spark.sql.iceberg.parquet.reader-type", "COMET")
+ .config("spark.memory.offHeap.enabled", "true")
+ .config("spark.memory.offHeap.size", "10g")
+ .config("spark.comet.use.lazyMaterialization", "false")
+ .config("spark.comet.schemaEvolution.enabled", "true")
.master("local[*]")
.getOrCreate();
}
diff --git a/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceBenchmark.java b/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceBenchmark.java
index 68c537e34a4..f66be2f3896 100644
--- a/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceBenchmark.java
+++ b/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceBenchmark.java
@@ -94,7 +94,19 @@ public abstract class IcebergSourceBenchmark {
}
protected void setupSpark(boolean enableDictionaryEncoding) {
- SparkSession.Builder builder = SparkSession.builder().config("spark.ui.enabled", false);
+ SparkSession.Builder builder =
+ SparkSession.builder()
+ .config("spark.ui.enabled", false)
+ .config("spark.plugins", "org.apache.spark.CometPlugin")
+ .config(
+ "spark.shuffle.manager",
+ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
+ .config("spark.comet.explainFallback.enabled", "true")
+ .config("spark.sql.iceberg.parquet.reader-type", "COMET")
+ .config("spark.memory.offHeap.enabled", "true")
+ .config("spark.memory.offHeap.size", "10g")
+ .config("spark.comet.use.lazyMaterialization", "false")
+ .config("spark.comet.schemaEvolution.enabled", "true");
if (!enableDictionaryEncoding) {
builder
.config("parquet.dictionary.page.size", "1")
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnReader.java
index 16159dcbdff..eba1a2a0fb1 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnReader.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnReader.java
@@ -19,18 +19,22 @@
package org.apache.iceberg.spark.data.vectorized;
import java.io.IOException;
+import org.apache.comet.CometConf;
+import org.apache.comet.CometSchemaImporter;
import org.apache.comet.parquet.AbstractColumnReader;
import org.apache.comet.parquet.ColumnReader;
+import org.apache.comet.parquet.ParquetColumnSpec;
+import org.apache.comet.parquet.RowGroupReader;
import org.apache.comet.parquet.TypeUtil;
import org.apache.comet.parquet.Utils;
-import org.apache.comet.shaded.arrow.c.CometSchemaImporter;
import org.apache.comet.shaded.arrow.memory.RootAllocator;
+import org.apache.iceberg.parquet.CometTypeUtils;
import org.apache.iceberg.parquet.VectorizedReader;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.types.Types;
import org.apache.parquet.column.ColumnDescriptor;
-import org.apache.parquet.column.page.PageReader;
+import org.apache.spark.sql.internal.SQLConf;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
@@ -42,23 +46,28 @@ class CometColumnReader implements VectorizedReader<ColumnVector> {
private final ColumnDescriptor descriptor;
private final DataType sparkType;
+ private final int fieldId;
// The delegated ColumnReader from Comet side
private AbstractColumnReader delegate;
private boolean initialized = false;
private int batchSize = DEFAULT_BATCH_SIZE;
private CometSchemaImporter importer;
+ private ParquetColumnSpec spec;
- CometColumnReader(DataType sparkType, ColumnDescriptor descriptor) {
+ CometColumnReader(DataType sparkType, ColumnDescriptor descriptor, int fieldId) {
this.sparkType = sparkType;
this.descriptor = descriptor;
+ this.fieldId = fieldId;
}
CometColumnReader(Types.NestedField field) {
DataType dataType = SparkSchemaUtil.convert(field.type());
StructField structField = new StructField(field.name(), dataType, false, Metadata.empty());
this.sparkType = dataType;
- this.descriptor = TypeUtil.convertToParquet(structField);
+ this.descriptor =
+ CometTypeUtils.buildColumnDescriptor(TypeUtil.convertToParquetSpec(structField));
+ this.fieldId = field.fieldId();
}
public AbstractColumnReader delegate() {
@@ -92,7 +101,26 @@ class CometColumnReader implements VectorizedReader<ColumnVector> {
}
this.importer = new CometSchemaImporter(new RootAllocator());
- this.delegate = Utils.getColumnReader(sparkType, descriptor, importer, batchSize, false, false);
+
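+ // Describe the column to Comet as a ParquetColumnSpec for the native column reader.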
+ this.spec = CometTypeUtils.descriptorToParquetColumnSpec(descriptor);
+
+ boolean useLegacyTime =
+ Boolean.parseBoolean(
+ SQLConf.get()
+ .getConfString(
+ CometConf.COMET_EXCEPTION_ON_LEGACY_DATE_TIMESTAMP().key(), "false"));
+ boolean useLazyMaterialization =
+ Boolean.parseBoolean(
+ SQLConf.get().getConfString(CometConf.COMET_USE_LAZY_MATERIALIZATION().key(), "false"));
+ this.delegate =
+ Utils.getColumnReader(
+ sparkType,
+ spec,
+ importer,
+ batchSize,
+ true, // Comet sets this to true for native execution
+ useLazyMaterialization,
+ useLegacyTime);
this.initialized = true;
}
@@ -111,9 +139,9 @@ class CometColumnReader implements VectorizedReader<ColumnVector> {
* <p>NOTE: this should be called before reading a new Parquet column chunk, and after {@link
* CometColumnReader#reset} is called.
*/
- public void setPageReader(PageReader pageReader) throws IOException {
+ public void setPageReader(RowGroupReader pageStore) throws IOException {
Preconditions.checkState(initialized, "Invalid state: 'reset' should be called first");
- ((ColumnReader) delegate).setPageReader(pageReader);
+ ((ColumnReader) delegate).setRowGroupReader(pageStore, spec);
}
@Override
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnarBatchReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnarBatchReader.java
index 04ac69476ad..916face2bf2 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnarBatchReader.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnarBatchReader.java
@@ -22,8 +22,12 @@ import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.Map;
+import org.apache.comet.CometRuntimeException;
import org.apache.comet.parquet.AbstractColumnReader;
-import org.apache.comet.parquet.BatchReader;
+import org.apache.comet.parquet.IcebergCometBatchReader;
+import org.apache.comet.parquet.RowGroupReader;
+import org.apache.comet.vector.CometSelectionVector;
+import org.apache.comet.vector.CometVector;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.DeleteFilter;
import org.apache.iceberg.parquet.VectorizedReader;
@@ -55,7 +59,7 @@ class CometColumnarBatchReader implements VectorizedReader<ColumnarBatch> {
// calling BatchReader.nextBatch, the isDeleted value is not yet available, so
// DeleteColumnReader.readBatch must be called explicitly later, after the isDeleted value is
// available.
- private final BatchReader delegate;
+ private final IcebergCometBatchReader delegate;
private DeleteFilter<InternalRow> deletes = null;
private long rowStartPosInBatch = 0;
@@ -65,9 +69,7 @@ class CometColumnarBatchReader implements VectorizedReader<ColumnarBatch> {
this.hasIsDeletedColumn =
readers.stream().anyMatch(reader -> reader instanceof CometDeleteColumnReader);
- AbstractColumnReader[] abstractColumnReaders = new AbstractColumnReader[readers.size()];
- this.delegate = new BatchReader(abstractColumnReaders);
- delegate.setSparkSchema(SparkSchemaUtil.convert(schema));
+ this.delegate = new IcebergCometBatchReader(readers.size(), SparkSchemaUtil.convert(schema));
}
@Override
@@ -79,19 +81,22 @@ class CometColumnarBatchReader implements VectorizedReader<ColumnarBatch> {
&& !(readers[i] instanceof CometPositionColumnReader)
&& !(readers[i] instanceof CometDeleteColumnReader)) {
readers[i].reset();
- readers[i].setPageReader(pageStore.getPageReader(readers[i].descriptor()));
+ readers[i].setPageReader((RowGroupReader) pageStore);
}
} catch (IOException e) {
throw new UncheckedIOException("Failed to setRowGroupInfo for Comet vectorization", e);
}
}
+ AbstractColumnReader[] delegateReaders = new AbstractColumnReader[readers.length];
for (int i = 0; i < readers.length; i++) {
- delegate.getColumnReaders()[i] = this.readers[i].delegate();
+ delegateReaders[i] = readers[i].delegate();
}
+ delegate.init(delegateReaders);
+
this.rowStartPosInBatch =
- pageStore
+ ((RowGroupReader) pageStore)
.getRowIndexOffset()
.orElseThrow(
() ->
@@ -148,9 +153,17 @@ class CometColumnarBatchReader implements VectorizedReader<ColumnarBatch> {
Pair<int[], Integer> pair = buildRowIdMapping(vectors);
if (pair != null) {
int[] rowIdMapping = pair.first();
- numLiveRows = pair.second();
- for (int i = 0; i < vectors.length; i++) {
- vectors[i] = new ColumnVectorWithFilter(vectors[i], rowIdMapping);
+ if (pair.second() != null) {
+ numLiveRows = pair.second();
+ for (int i = 0; i < vectors.length; i++) {
+ if (vectors[i] instanceof CometVector) {
+ vectors[i] =
+ new CometSelectionVector((CometVector) vectors[i], rowIdMapping, numLiveRows);
+ } else {
+ throw new CometRuntimeException(
+ "Unsupported column vector type: " + vectors[i].getClass());
+ }
+ }
}
}
}
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometConstantColumnReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometConstantColumnReader.java
index 047c96314b1..88d691a607a 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometConstantColumnReader.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometConstantColumnReader.java
@@ -21,6 +21,7 @@ package org.apache.iceberg.spark.data.vectorized;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import org.apache.comet.parquet.ConstantColumnReader;
+import org.apache.iceberg.parquet.CometTypeUtils;
import org.apache.iceberg.types.Types;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
@@ -34,7 +35,11 @@ class CometConstantColumnReader<T> extends CometColumnReader {
super(field);
// use delegate to set constant value on the native side to be consumed by native execution.
setDelegate(
- new ConstantColumnReader(sparkType(), descriptor(), convertToSparkValue(value), false));
+ new ConstantColumnReader(
+ sparkType(),
+ CometTypeUtils.descriptorToParquetColumnSpec(descriptor()),
+ convertToSparkValue(value),
+ false));
}
@Override
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometDeleteColumnReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometDeleteColumnReader.java
index 6235bfe4865..cba108e4326 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometDeleteColumnReader.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometDeleteColumnReader.java
@@ -51,10 +51,10 @@ class CometDeleteColumnReader<T> extends CometColumnReader {
DeleteColumnReader() {
super(
DataTypes.BooleanType,
- TypeUtil.convertToParquet(
+ TypeUtil.convertToParquetSpec(
new StructField("_deleted", DataTypes.BooleanType, false, Metadata.empty())),
false /* useDecimal128 = false */,
- false /* isConstant = false */);
+ false /* isConstant */);
this.isDeleted = new boolean[0];
}
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometPositionColumnReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometPositionColumnReader.java
index bcc0e514c28..98e80068c51 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometPositionColumnReader.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometPositionColumnReader.java
@@ -20,6 +20,7 @@ package org.apache.iceberg.spark.data.vectorized;
import org.apache.comet.parquet.MetadataColumnReader;
import org.apache.comet.parquet.Native;
+import org.apache.iceberg.parquet.CometTypeUtils;
import org.apache.iceberg.types.Types;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.spark.sql.types.DataTypes;
@@ -44,7 +45,7 @@ class CometPositionColumnReader extends CometColumnReader {
PositionColumnReader(ColumnDescriptor descriptor) {
super(
DataTypes.LongType,
- descriptor,
+ CometTypeUtils.descriptorToParquetColumnSpec(descriptor),
false /* useDecimal128 = false */,
false /* isConstant = false */);
}
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometVectorizedReaderBuilder.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometVectorizedReaderBuilder.java
index d36f1a72747..56f8c9bff93 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometVectorizedReaderBuilder.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometVectorizedReaderBuilder.java
@@ -142,6 +142,7 @@ class CometVectorizedReaderBuilder extends TypeWithSchemaVisitor<VectorizedReade
return null;
}
- return new CometColumnReader(SparkSchemaUtil.convert(icebergField.type()), desc);
+ return new CometColumnReader(
+ SparkSchemaUtil.convert(icebergField.type()), desc, parquetFieldId);
}
}
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java
index 780e1750a52..57892ac4c59 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java
@@ -109,6 +109,7 @@ abstract class BaseBatchReader<T extends ScanTask> extends BaseReader<ColumnarBa
// read performance as every batch read doesn't have to pay the cost of allocating memory.
.reuseContainers()
.withNameMapping(nameMapping())
+ .enableComet(parquetConf.readerType() == ParquetReaderType.COMET)
.build();
}
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java
index 45bd48aea2e..5362578861c 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java
@@ -172,11 +172,11 @@ class SparkBatch implements Batch {
return field.type().isPrimitiveType() || MetadataColumns.isMetadataColumn(field.fieldId());
}
- private boolean useCometBatchReads() {
+ protected boolean useCometBatchReads() {
return readConf.parquetVectorizationEnabled()
&& readConf.parquetReaderType() == ParquetReaderType.COMET
&& expectedSchema.columns().stream().allMatch(this::supportsCometBatchReads)
- && taskGroups.stream().allMatch(this::supportsParquetBatchReads);
+ && taskGroups.stream().allMatch(this::supportsCometBatchReads);
}
private boolean supportsCometBatchReads(Types.NestedField field) {
@@ -186,6 +186,21 @@ class SparkBatch implements Batch {
&& field.fieldId() != MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.fieldId();
}
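+ // conditions for using Comet batch reads: all tasks are FileScanTasks over Parquet data files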
+ private boolean supportsCometBatchReads(ScanTask task) {
+ if (task instanceof ScanTaskGroup) {
+ ScanTaskGroup<?> taskGroup = (ScanTaskGroup<?>) task;
+ return taskGroup.tasks().stream().allMatch(this::supportsCometBatchReads);
+
+ } else if (task.isFileScanTask() && !task.isDataTask()) {
+ FileScanTask fileScanTask = task.asFileScanTask();
+ // Comet can't handle delete files for now
+ return fileScanTask.file().format() == FileFormat.PARQUET;
+
+ } else {
+ return false;
+ }
+ }
+
// conditions for using ORC batch reads:
// - ORC vectorization is enabled
// - all tasks are of type FileScanTask and read only ORC files with no delete files
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java
index 106b296de09..967b0d41d08 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java
@@ -24,6 +24,7 @@ import java.util.Map;
import java.util.Optional;
import java.util.function.Supplier;
import java.util.stream.Collectors;
+import org.apache.comet.parquet.SupportsComet;
import org.apache.iceberg.BlobMetadata;
import org.apache.iceberg.ScanTask;
import org.apache.iceberg.ScanTaskGroup;
@@ -95,7 +96,7 @@ import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-abstract class SparkScan implements Scan, SupportsReportStatistics {
+abstract class SparkScan implements Scan, SupportsReportStatistics, SupportsComet {
private static final Logger LOG = LoggerFactory.getLogger(SparkScan.class);
private static final String NDV_KEY = "ndv";
@@ -351,4 +352,10 @@ abstract class SparkScan implements Scan, SupportsReportStatistics {
return splitSize;
}
}
+
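+ // SupportsComet hook: reports whether this scan's batches are produced by the Comet reader.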
+ @Override
+ public boolean isCometEnabled() {
+ SparkBatch batch = (SparkBatch) this.toBatch();
+ return batch.useCometBatchReads();
+ }
}
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/SparkDistributedDataScanTestBase.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/SparkDistributedDataScanTestBase.java
index 404ba728460..26d6f9b613f 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/SparkDistributedDataScanTestBase.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/SparkDistributedDataScanTestBase.java
@@ -90,6 +90,16 @@ public abstract class SparkDistributedDataScanTestBase
.master("local[2]")
.config("spark.serializer", serializer)
.config(SQLConf.SHUFFLE_PARTITIONS().key(), "4")
+ .config("spark.plugins", "org.apache.spark.CometPlugin")
+ .config(
+ "spark.shuffle.manager",
+ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
+ .config("spark.comet.explainFallback.enabled", "true")
+ .config("spark.sql.iceberg.parquet.reader-type", "COMET")
+ .config("spark.memory.offHeap.enabled", "true")
+ .config("spark.memory.offHeap.size", "10g")
+ .config("spark.comet.use.lazyMaterialization", "false")
+ .config("spark.comet.schemaEvolution.enabled", "true")
.getOrCreate();
}
}
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanDeletes.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanDeletes.java
index 659507e4c5e..eb9cedc34c5 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanDeletes.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanDeletes.java
@@ -73,6 +73,16 @@ public class TestSparkDistributedDataScanDeletes
.master("local[2]")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.config(SQLConf.SHUFFLE_PARTITIONS().key(), "4")
+ .config("spark.plugins", "org.apache.spark.CometPlugin")
+ .config(
+ "spark.shuffle.manager",
+ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
+ .config("spark.comet.explainFallback.enabled", "true")
+ .config("spark.sql.iceberg.parquet.reader-type", "COMET")
+ .config("spark.memory.offHeap.enabled", "true")
+ .config("spark.memory.offHeap.size", "10g")
+ .config("spark.comet.use.lazyMaterialization", "false")
+ .config("spark.comet.schemaEvolution.enabled", "true")
.getOrCreate();
}
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanFilterFiles.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanFilterFiles.java
index a218f965ea6..395c02441e7 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanFilterFiles.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanFilterFiles.java
@@ -62,6 +62,16 @@ public class TestSparkDistributedDataScanFilterFiles
.master("local[2]")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.config(SQLConf.SHUFFLE_PARTITIONS().key(), "4")
+ .config("spark.plugins", "org.apache.spark.CometPlugin")
+ .config(
+ "spark.shuffle.manager",
+ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
+ .config("spark.comet.explainFallback.enabled", "true")
+ .config("spark.sql.iceberg.parquet.reader-type", "COMET")
+ .config("spark.memory.offHeap.enabled", "true")
+ .config("spark.memory.offHeap.size", "10g")
+ .config("spark.comet.use.lazyMaterialization", "false")
+ .config("spark.comet.schemaEvolution.enabled", "true")
.getOrCreate();
}
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanReporting.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanReporting.java
index 2665d7ba8d3..306e859ce1a 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanReporting.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanReporting.java
@@ -63,6 +63,16 @@ public class TestSparkDistributedDataScanReporting
.master("local[2]")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.config(SQLConf.SHUFFLE_PARTITIONS().key(), "4")
+ .config("spark.plugins", "org.apache.spark.CometPlugin")
+ .config(
+ "spark.shuffle.manager",
+ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
+ .config("spark.comet.explainFallback.enabled", "true")
+ .config("spark.sql.iceberg.parquet.reader-type", "COMET")
+ .config("spark.memory.offHeap.enabled", "true")
+ .config("spark.memory.offHeap.size", "10g")
+ .config("spark.comet.use.lazyMaterialization", "false")
+ .config("spark.comet.schemaEvolution.enabled", "true")
.getOrCreate();
}
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestBase.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestBase.java
index 3e9f3334ef6..ecf855986b7 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestBase.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestBase.java
@@ -77,6 +77,17 @@ public abstract class TestBase extends SparkTestHelperBase {
.config(SQLConf.PARTITION_OVERWRITE_MODE().key(), "dynamic")
.config("spark.hadoop." + METASTOREURIS.varname, hiveConf.get(METASTOREURIS.varname))
.config("spark.sql.legacy.respectNullabilityInTextDatasetConversion", "true")
+ .config("spark.plugins", "org.apache.spark.CometPlugin")
+ .config(
+ "spark.shuffle.manager",
+ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
+ .config("spark.comet.explainFallback.enabled", "true")
+ .config("spark.sql.iceberg.parquet.reader-type", "COMET")
+ .config("spark.memory.offHeap.enabled", "true")
+ .config("spark.memory.offHeap.size", "10g")
+ .config("spark.comet.use.lazyMaterialization", "false")
+ .config("spark.comet.schemaEvolution.enabled", "true")
+ .config("spark.comet.exec.broadcastExchange.enabled", "false")
.enableHiveSupport()
.getOrCreate();
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetDictionaryEncodedVectorizedReads.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetDictionaryEncodedVectorizedReads.java
index bc4e722bc86..1a40d8edcb3 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetDictionaryEncodedVectorizedReads.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetDictionaryEncodedVectorizedReads.java
@@ -59,7 +59,20 @@ public class TestParquetDictionaryEncodedVectorizedReads extends TestParquetVect
@BeforeAll
public static void startSpark() {
- spark = SparkSession.builder().master("local[2]").getOrCreate();
+ spark =
+ SparkSession.builder()
+ .master("local[2]")
+ .config("spark.plugins", "org.apache.spark.CometPlugin")
+ .config(
+ "spark.shuffle.manager",
+ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
+ .config("spark.comet.explainFallback.enabled", "true")
+ .config("spark.sql.iceberg.parquet.reader-type", "COMET")
+ .config("spark.memory.offHeap.enabled", "true")
+ .config("spark.memory.offHeap.size", "10g")
+ .config("spark.comet.use.lazyMaterialization", "false")
+ .config("spark.comet.schemaEvolution.enabled", "true")
+ .getOrCreate();
}
@AfterAll
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/ScanTestBase.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/ScanTestBase.java
index 0886df957db..ba2ee6b1c2c 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/ScanTestBase.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/ScanTestBase.java
@@ -57,7 +57,20 @@ public abstract class ScanTestBase extends AvroDataTest {
@BeforeAll
public static void startSpark() {
- ScanTestBase.spark = SparkSession.builder().master("local[2]").getOrCreate();
+ ScanTestBase.spark =
+ SparkSession.builder()
+ .master("local[2]")
+ .config("spark.plugins", "org.apache.spark.CometPlugin")
+ .config(
+ "spark.shuffle.manager",
+ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
+ .config("spark.comet.explainFallback.enabled", "true")
+ .config("spark.sql.iceberg.parquet.reader-type", "COMET")
+ .config("spark.memory.offHeap.enabled", "true")
+ .config("spark.memory.offHeap.size", "10g")
+ .config("spark.comet.use.lazyMaterialization", "false")
+ .config("spark.comet.schemaEvolution.enabled", "true")
+ .getOrCreate();
ScanTestBase.sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
}
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java
index 6b7d861364f..f9805fa8df5 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java
@@ -144,7 +144,20 @@ public class TestCompressionSettings extends CatalogTestBase {
@BeforeAll
public static void startSpark() {
- TestCompressionSettings.spark = SparkSession.builder().master("local[2]").getOrCreate();
+ TestCompressionSettings.spark =
+ SparkSession.builder()
+ .master("local[2]")
+ .config("spark.plugins", "org.apache.spark.CometPlugin")
+ .config(
+ "spark.shuffle.manager",
+ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
+ .config("spark.comet.explainFallback.enabled", "true")
+ .config("spark.sql.iceberg.parquet.reader-type", "COMET")
+ .config("spark.memory.offHeap.enabled", "true")
+ .config("spark.memory.offHeap.size", "10g")
+ .config("spark.comet.use.lazyMaterialization", "false")
+ .config("spark.comet.schemaEvolution.enabled", "true")
+ .getOrCreate();
}
@BeforeEach
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
index c4ba96e6340..a0e77db99b9 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
@@ -75,7 +75,20 @@ public class TestDataSourceOptions extends TestBaseWithCatalog {
@BeforeAll
public static void startSpark() {
- TestDataSourceOptions.spark = SparkSession.builder().master("local[2]").getOrCreate();
+ TestDataSourceOptions.spark =
+ SparkSession.builder()
+ .master("local[2]")
+ .config("spark.plugins", "org.apache.spark.CometPlugin")
+ .config(
+ "spark.shuffle.manager",
+ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
+ .config("spark.comet.explainFallback.enabled", "true")
+ .config("spark.sql.iceberg.parquet.reader-type", "COMET")
+ .config("spark.memory.offHeap.enabled", "true")
+ .config("spark.memory.offHeap.size", "10g")
+ .config("spark.comet.use.lazyMaterialization", "false")
+ .config("spark.comet.schemaEvolution.enabled", "true")
+ .getOrCreate();
}
@AfterAll
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java
index 348173596e4..bdd31528d3f 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java
@@ -110,7 +110,20 @@ public class TestFilteredScan {
@BeforeAll
public static void startSpark() {
- TestFilteredScan.spark = SparkSession.builder().master("local[2]").getOrCreate();
+ TestFilteredScan.spark =
+ SparkSession.builder()
+ .master("local[2]")
+ .config("spark.plugins", "org.apache.spark.CometPlugin")
+ .config(
+ "spark.shuffle.manager",
+ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
+ .config("spark.comet.explainFallback.enabled", "true")
+ .config("spark.sql.iceberg.parquet.reader-type", "COMET")
+ .config("spark.memory.offHeap.enabled", "true")
+ .config("spark.memory.offHeap.size", "10g")
+ .config("spark.comet.use.lazyMaterialization", "false")
+ .config("spark.comet.schemaEvolution.enabled", "true")
+ .getOrCreate();
}
@AfterAll
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestForwardCompatibility.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestForwardCompatibility.java
index 84c99a575c8..6f4b2af003b 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestForwardCompatibility.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestForwardCompatibility.java
@@ -93,7 +93,20 @@ public class TestForwardCompatibility {
@BeforeAll
public static void startSpark() {
- TestForwardCompatibility.spark = SparkSession.builder().master("local[2]").getOrCreate();
+ TestForwardCompatibility.spark =
+ SparkSession.builder()
+ .master("local[2]")
+ .config("spark.plugins", "org.apache.spark.CometPlugin")
+ .config(
+ "spark.shuffle.manager",
+ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
+ .config("spark.comet.explainFallback.enabled", "true")
+ .config("spark.sql.iceberg.parquet.reader-type", "COMET")
+ .config("spark.memory.offHeap.enabled", "true")
+ .config("spark.memory.offHeap.size", "10g")
+ .config("spark.comet.use.lazyMaterialization", "false")
+ .config("spark.comet.schemaEvolution.enabled", "true")
+ .getOrCreate();
}
@AfterAll
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSpark.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSpark.java
index 7eff93d204e..1774acd056b 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSpark.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSpark.java
@@ -46,7 +46,20 @@ public class TestIcebergSpark {
@BeforeAll
public static void startSpark() {
- TestIcebergSpark.spark = SparkSession.builder().master("local[2]").getOrCreate();
+ TestIcebergSpark.spark =
+ SparkSession.builder()
+ .master("local[2]")
+ .config("spark.plugins", "org.apache.spark.CometPlugin")
+ .config(
+ "spark.shuffle.manager",
+ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
+ .config("spark.comet.explainFallback.enabled", "true")
+ .config("spark.sql.iceberg.parquet.reader-type", "COMET")
+ .config("spark.memory.offHeap.enabled", "true")
+ .config("spark.memory.offHeap.size", "10g")
+ .config("spark.comet.use.lazyMaterialization", "false")
+ .config("spark.comet.schemaEvolution.enabled", "true")
+ .getOrCreate();
}
@AfterAll
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionPruning.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionPruning.java
index 9464f687b0e..b5dbb88dd5a 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionPruning.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionPruning.java
@@ -112,7 +112,20 @@ public class TestPartitionPruning {
@BeforeAll
public static void startSpark() {
- TestPartitionPruning.spark = SparkSession.builder().master("local[2]").getOrCreate();
+ TestPartitionPruning.spark =
+ SparkSession.builder()
+ .master("local[2]")
+ .config("spark.plugins", "org.apache.spark.CometPlugin")
+ .config(
+ "spark.shuffle.manager",
+ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
+ .config("spark.comet.explainFallback.enabled", "true")
+ .config("spark.sql.iceberg.parquet.reader-type", "COMET")
+ .config("spark.memory.offHeap.enabled", "true")
+ .config("spark.memory.offHeap.size", "10g")
+ .config("spark.comet.use.lazyMaterialization", "false")
+ .config("spark.comet.schemaEvolution.enabled", "true")
+ .getOrCreate();
TestPartitionPruning.sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext());
String optionKey = String.format("fs.%s.impl", CountOpenLocalFileSystem.scheme);
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionValues.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionValues.java
index 5c218f21c47..e48e6d121f0 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionValues.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionValues.java
@@ -107,7 +107,20 @@ public class TestPartitionValues {
@BeforeAll
public static void startSpark() {
- TestPartitionValues.spark = SparkSession.builder().master("local[2]").getOrCreate();
+ TestPartitionValues.spark =
+ SparkSession.builder()
+ .master("local[2]")
+ .config("spark.plugins", "org.apache.spark.CometPlugin")
+ .config(
+ "spark.shuffle.manager",
+ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
+ .config("spark.comet.explainFallback.enabled", "true")
+ .config("spark.sql.iceberg.parquet.reader-type", "COMET")
+ .config("spark.memory.offHeap.enabled", "true")
+ .config("spark.memory.offHeap.size", "10g")
+ .config("spark.comet.use.lazyMaterialization", "false")
+ .config("spark.comet.schemaEvolution.enabled", "true")
+ .getOrCreate();
}
@AfterAll
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java
index a7334a580ca..dca7fdd9c69 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java
@@ -87,7 +87,20 @@ public class TestSnapshotSelection {
@BeforeAll
public static void startSpark() {
- TestSnapshotSelection.spark = SparkSession.builder().master("local[2]").getOrCreate();
+ TestSnapshotSelection.spark =
+ SparkSession.builder()
+ .master("local[2]")
+ .config("spark.plugins", "org.apache.spark.CometPlugin")
+ .config(
+ "spark.shuffle.manager",
+ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
+ .config("spark.comet.explainFallback.enabled", "true")
+ .config("spark.sql.iceberg.parquet.reader-type", "COMET")
+ .config("spark.memory.offHeap.enabled", "true")
+ .config("spark.memory.offHeap.size", "10g")
+ .config("spark.comet.use.lazyMaterialization", "false")
+ .config("spark.comet.schemaEvolution.enabled", "true")
+ .getOrCreate();
}
@AfterAll
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataFile.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataFile.java
index 182b1ef8f5a..5da4a3c90aa 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataFile.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataFile.java
@@ -120,7 +120,20 @@ public class TestSparkDataFile {
@BeforeAll
public static void startSpark() {
- TestSparkDataFile.spark = SparkSession.builder().master("local[2]").getOrCreate();
+ TestSparkDataFile.spark =
+ SparkSession.builder()
+ .master("local[2]")
+ .config("spark.plugins", "org.apache.spark.CometPlugin")
+ .config(
+ "spark.shuffle.manager",
+ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
+ .config("spark.comet.explainFallback.enabled", "true")
+ .config("spark.sql.iceberg.parquet.reader-type", "COMET")
+ .config("spark.memory.offHeap.enabled", "true")
+ .config("spark.memory.offHeap.size", "10g")
+ .config("spark.comet.use.lazyMaterialization", "false")
+ .config("spark.comet.schemaEvolution.enabled", "true")
+ .getOrCreate();
TestSparkDataFile.sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext());
}
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
index fb2b312bed9..5927a408c6b 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
@@ -96,7 +96,20 @@ public class TestSparkDataWrite {
@BeforeAll
public static void startSpark() {
- TestSparkDataWrite.spark = SparkSession.builder().master("local[2]").getOrCreate();
+ TestSparkDataWrite.spark =
+ SparkSession.builder()
+ .master("local[2]")
+ .config("spark.plugins", "org.apache.spark.CometPlugin")
+ .config(
+ "spark.shuffle.manager",
+ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
+ .config("spark.comet.explainFallback.enabled", "true")
+ .config("spark.sql.iceberg.parquet.reader-type", "COMET")
+ .config("spark.memory.offHeap.enabled", "true")
+ .config("spark.memory.offHeap.size", "10g")
+ .config("spark.comet.use.lazyMaterialization", "false")
+ .config("spark.comet.schemaEvolution.enabled", "true")
+ .getOrCreate();
}
@AfterEach
@@ -140,7 +153,7 @@ public class TestSparkDataWrite {
Dataset<Row> result = spark.read().format("iceberg").load(targetLocation);
List<SimpleRecord> actual =
- result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
+ result.orderBy("id", "data").as(Encoders.bean(SimpleRecord.class)).collectAsList();
assertThat(actual).as("Number of rows should match").hasSameSizeAs(expected);
assertThat(actual).as("Result rows should match").isEqualTo(expected);
for (ManifestFile manifest :
@@ -210,7 +223,7 @@ public class TestSparkDataWrite {
Dataset<Row> result = spark.read().format("iceberg").load(targetLocation);
List<SimpleRecord> actual =
- result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
+ result.orderBy("id", "data").as(Encoders.bean(SimpleRecord.class)).collectAsList();
assertThat(actual).as("Number of rows should match").hasSameSizeAs(expected);
assertThat(actual).as("Result rows should match").isEqualTo(expected);
}
@@ -256,7 +269,7 @@ public class TestSparkDataWrite {
Dataset<Row> result = spark.read().format("iceberg").load(targetLocation);
List<SimpleRecord> actual =
- result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
+ result.orderBy("id", "data").as(Encoders.bean(SimpleRecord.class)).collectAsList();
assertThat(actual).as("Number of rows should match").hasSameSizeAs(expected);
assertThat(actual).as("Result rows should match").isEqualTo(expected);
}
@@ -309,7 +322,7 @@ public class TestSparkDataWrite {
Dataset<Row> result = spark.read().format("iceberg").load(targetLocation);
List<SimpleRecord> actual =
- result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
+ result.orderBy("id", "data").as(Encoders.bean(SimpleRecord.class)).collectAsList();
assertThat(actual).as("Number of rows should match").hasSameSizeAs(expected);
assertThat(actual).as("Result rows should match").isEqualTo(expected);
}
@@ -352,7 +365,7 @@ public class TestSparkDataWrite {
Dataset<Row> result = spark.read().format("iceberg").load(targetLocation);
List<SimpleRecord> actual =
- result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
+ result.orderBy("id", "data").as(Encoders.bean(SimpleRecord.class)).collectAsList();
assertThat(actual).as("Number of rows should match").hasSameSizeAs(expected);
assertThat(actual).as("Result rows should match").isEqualTo(expected);
}
@@ -392,7 +405,7 @@ public class TestSparkDataWrite {
Dataset<Row> result = spark.read().format("iceberg").load(targetLocation);
List<SimpleRecord> actual =
- result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
+ result.orderBy("id", "data").as(Encoders.bean(SimpleRecord.class)).collectAsList();
assertThat(actual).as("Number of rows should match").hasSameSizeAs(expected);
assertThat(actual).as("Result rows should match").isEqualTo(expected);
@@ -458,7 +471,7 @@ public class TestSparkDataWrite {
Dataset<Row> result = spark.read().format("iceberg").load(targetLocation);
List<SimpleRecord> actual =
- result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
+ result.orderBy("id", "data").as(Encoders.bean(SimpleRecord.class)).collectAsList();
assertThat(actual).as("Number of rows should match").hasSameSizeAs(expected);
assertThat(actual).as("Result rows should match").isEqualTo(expected);
}
@@ -622,7 +635,7 @@ public class TestSparkDataWrite {
Dataset<Row> result = spark.read().format("iceberg").load(targetLocation);
List<SimpleRecord> actual =
- result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
+ result.orderBy("id", "data").as(Encoders.bean(SimpleRecord.class)).collectAsList();
assertThat(actual).as("Number of rows should match").hasSameSizeAs(expected);
assertThat(actual).as("Result rows should match").isEqualTo(expected);
@@ -708,7 +721,7 @@ public class TestSparkDataWrite {
// Since write and commit succeeded, the rows should be readable
Dataset<Row> result = spark.read().format("iceberg").load(targetLocation);
List<SimpleRecord> actual =
- result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
+ result.orderBy("id", "data").as(Encoders.bean(SimpleRecord.class)).collectAsList();
assertThat(actual).as("Number of rows should match").hasSize(records.size() + records2.size());
assertThat(actual)
.describedAs("Result rows should match")
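
The orderBy("id") to orderBy("id", "data") changes in the hunks above add a secondary sort key, presumably because with the Comet reader and shuffle manager enabled the previous row order is no longer guaranteed; sorting on both columns makes the collected order deterministic before the order-sensitive isEqualTo comparison. A minimal sketch of that read-back-and-compare pattern (sketch only, not part of this patch; SimpleRecord(id, data) is the bean from the test sources, and spark, targetLocation, and expected are assumed to exist as in the tests above):

// Sketch only: reads the Iceberg table back and compares against the expected
// rows. Sorting on ("id", "data") gives collectAsList() a stable order, which
// the order-sensitive isEqualTo(expected) assertion needs.
import static org.assertj.core.api.Assertions.assertThat;

import java.util.List;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;

final class ReadBackCheck {
  private ReadBackCheck() {}

  static void assertTableMatches(
      SparkSession spark, String targetLocation, List<SimpleRecord> expected) {
    List<SimpleRecord> actual =
        spark
            .read()
            .format("iceberg")
            .load(targetLocation)
            .orderBy("id", "data") // secondary key breaks ties among equal ids
            .as(Encoders.bean(SimpleRecord.class))
            .collectAsList();
    assertThat(actual).as("Result rows should match").isEqualTo(expected);
  }
}
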
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReadProjection.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReadProjection.java
index becf6a064dc..beb7c801b05 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReadProjection.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReadProjection.java
@@ -83,7 +83,20 @@ public class TestSparkReadProjection extends TestReadProjection {
@BeforeAll
public static void startSpark() {
- TestSparkReadProjection.spark = SparkSession.builder().master("local[2]").getOrCreate();
+ TestSparkReadProjection.spark =
+ SparkSession.builder()
+ .master("local[2]")
+ .config("spark.plugins", "org.apache.spark.CometPlugin")
+ .config(
+ "spark.shuffle.manager",
+ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
+ .config("spark.comet.explainFallback.enabled", "true")
+ .config("spark.sql.iceberg.parquet.reader-type", "COMET")
+ .config("spark.memory.offHeap.enabled", "true")
+ .config("spark.memory.offHeap.size", "10g")
+ .config("spark.comet.use.lazyMaterialization", "false")
+ .config("spark.comet.schemaEvolution.enabled", "true")
+ .getOrCreate();
ImmutableMap<String, String> config =
ImmutableMap.of(
"type", "hive",
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
index 4f1cef5d373..0e34e08db4a 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
@@ -136,6 +136,16 @@ public class TestSparkReaderDeletes extends DeleteReadTests {
.config("spark.ui.liveUpdate.period", 0)
.config(SQLConf.PARTITION_OVERWRITE_MODE().key(), "dynamic")
.config("spark.hadoop." + METASTOREURIS.varname, hiveConf.get(METASTOREURIS.varname))
+ .config("spark.plugins", "org.apache.spark.CometPlugin")
+ .config(
+ "spark.shuffle.manager",
+ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
+ .config("spark.comet.explainFallback.enabled", "true")
+ .config("spark.sql.iceberg.parquet.reader-type", "COMET")
+ .config("spark.memory.offHeap.enabled", "true")
+ .config("spark.memory.offHeap.size", "10g")
+ .config("spark.comet.use.lazyMaterialization", "false")
+ .config("spark.comet.schemaEvolution.enabled", "true")
.enableHiveSupport()
.getOrCreate();
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderWithBloomFilter.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderWithBloomFilter.java
index baf7fa8f88a..509c5deba51 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderWithBloomFilter.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderWithBloomFilter.java
@@ -182,6 +182,16 @@ public class TestSparkReaderWithBloomFilter {
SparkSession.builder()
.master("local[2]")
.config("spark.hadoop." + METASTOREURIS.varname, hiveConf.get(METASTOREURIS.varname))
+ .config("spark.plugins", "org.apache.spark.CometPlugin")
+ .config(
+ "spark.shuffle.manager",
+ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
+ .config("spark.comet.explainFallback.enabled", "true")
+ .config("spark.sql.iceberg.parquet.reader-type", "COMET")
+ .config("spark.memory.offHeap.enabled", "true")
+ .config("spark.memory.offHeap.size", "10g")
+ .config("spark.comet.use.lazyMaterialization", "false")
+ .config("spark.comet.schemaEvolution.enabled", "true")
.enableHiveSupport()
.getOrCreate();
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreaming.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreaming.java
index c84a65cbe95..42ee74280c1 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreaming.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreaming.java
@@ -67,6 +67,16 @@ public class TestStructuredStreaming {
SparkSession.builder()
.master("local[2]")
.config("spark.sql.shuffle.partitions", 4)
+ .config("spark.plugins", "org.apache.spark.CometPlugin")
+ .config(
+ "spark.shuffle.manager",
+ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
+ .config("spark.comet.explainFallback.enabled", "true")
+ .config("spark.sql.iceberg.parquet.reader-type", "COMET")
+ .config("spark.memory.offHeap.enabled", "true")
+ .config("spark.memory.offHeap.size", "10g")
+ .config("spark.comet.use.lazyMaterialization", "false")
+ .config("spark.comet.schemaEvolution.enabled", "true")
.getOrCreate();
}
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestTimestampWithoutZone.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestTimestampWithoutZone.java
index 306444b9f29..0ed52fd6cfb 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestTimestampWithoutZone.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestTimestampWithoutZone.java
@@ -75,7 +75,20 @@ public class TestTimestampWithoutZone extends TestBase {
@BeforeAll
public static void startSpark() {
- TestTimestampWithoutZone.spark = SparkSession.builder().master("local[2]").getOrCreate();
+ TestTimestampWithoutZone.spark =
+ SparkSession.builder()
+ .master("local[2]")
+ .config("spark.plugins", "org.apache.spark.CometPlugin")
+ .config(
+ "spark.shuffle.manager",
+ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
+ .config("spark.comet.explainFallback.enabled", "true")
+ .config("spark.sql.iceberg.parquet.reader-type", "COMET")
+ .config("spark.memory.offHeap.enabled", "true")
+ .config("spark.memory.offHeap.size", "10g")
+ .config("spark.comet.use.lazyMaterialization", "false")
+ .config("spark.comet.schemaEvolution.enabled", "true")
+ .getOrCreate();
}
@AfterAll
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestWriteMetricsConfig.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestWriteMetricsConfig.java
index 841268a6be0..e290c246f7b 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestWriteMetricsConfig.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestWriteMetricsConfig.java
@@ -80,7 +80,20 @@ public class TestWriteMetricsConfig {
@BeforeAll
public static void startSpark() {
- TestWriteMetricsConfig.spark = SparkSession.builder().master("local[2]").getOrCreate();
+ TestWriteMetricsConfig.spark =
+ SparkSession.builder()
+ .master("local[2]")
+ .config("spark.plugins", "org.apache.spark.CometPlugin")
+ .config(
+ "spark.shuffle.manager",
+ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
+ .config("spark.comet.explainFallback.enabled", "true")
+ .config("spark.sql.iceberg.parquet.reader-type", "COMET")
+ .config("spark.memory.offHeap.enabled", "true")
+ .config("spark.memory.offHeap.size", "10g")
+ .config("spark.comet.use.lazyMaterialization", "false")
+ .config("spark.comet.schemaEvolution.enabled", "true")
+ .getOrCreate();
TestWriteMetricsConfig.sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
}
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestAggregatePushDown.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestAggregatePushDown.java
index 6e09252704a..438bc4da575 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestAggregatePushDown.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestAggregatePushDown.java
@@ -60,6 +60,16 @@ public class TestAggregatePushDown extends CatalogTestBase {
SparkSession.builder()
.master("local[2]")
.config("spark.sql.iceberg.aggregate_pushdown", "true")
+ .config("spark.plugins", "org.apache.spark.CometPlugin")
+ .config(
+ "spark.shuffle.manager",
+ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
+ .config("spark.comet.explainFallback.enabled", "true")
+ .config("spark.sql.iceberg.parquet.reader-type", "COMET")
+ .config("spark.memory.offHeap.enabled", "true")
+ .config("spark.memory.offHeap.size", "10g")
+ .config("spark.comet.use.lazyMaterialization", "false")
+ .config("spark.comet.schemaEvolution.enabled", "true")
.enableHiveSupport()
.getOrCreate();
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestFilterPushDown.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestFilterPushDown.java
index 9d2ce2b388a..5e233688488 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestFilterPushDown.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestFilterPushDown.java
@@ -598,9 +598,7 @@ public class TestFilterPushDown extends TestBaseWithCatalog {
String planAsString = sparkPlan.toString().replaceAll("#(\\d+L?)", "");
if (sparkFilter != null) {
- assertThat(planAsString)
- .as("Post scan filter should match")
- .contains("Filter (" + sparkFilter + ")");
+ assertThat(planAsString).as("Post scan filter should match").contains("CometFilter");
} else {
assertThat(planAsString).as("Should be no post scan filter").doesNotContain("Filter (");
}
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestStoragePartitionedJoins.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestStoragePartitionedJoins.java
index 6719c45ca96..2515454401a 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestStoragePartitionedJoins.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestStoragePartitionedJoins.java
@@ -616,7 +616,7 @@ public class TestStoragePartitionedJoins extends TestBaseWithCatalog {
+ "FROM %s t1 "
+ "INNER JOIN %s t2 "
+ "ON t1.id = t2.id AND t1.%s = t2.%s "
- + "ORDER BY t1.id, t1.%s",
+ + "ORDER BY t1.id, t1.%s, t1.salary",
sourceColumnName,
tableName,
tableName(OTHER_TABLE_NAME),