| diff --git a/build.gradle b/build.gradle |
| index 998f2ee9ea6..017e61be98c 100644 |
| --- a/build.gradle |
| +++ b/build.gradle |
| @@ -814,6 +814,13 @@ project(':iceberg-parquet') { |
| implementation project(':iceberg-core') |
| implementation project(':iceberg-common') |
| |
| + implementation("org.apache.datafusion:comet-spark-spark${sparkVersionsString}_${scalaVersion}:${libs.versions.comet.get()}") { |
| + exclude group: 'org.apache.arrow' |
| + exclude group: 'org.apache.parquet' |
| + exclude group: 'org.apache.spark' |
| + exclude group: 'org.apache.iceberg' |
| + } |
| + |
| implementation(libs.parquet.avro) { |
| exclude group: 'org.apache.avro', module: 'avro' |
| // already shaded by Parquet |
| diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml |
| index c50991c5fc6..f7ad00f0b78 100644 |
| --- a/gradle/libs.versions.toml |
| +++ b/gradle/libs.versions.toml |
| @@ -36,6 +36,7 @@ awssdk-s3accessgrants = "2.3.0" |
| bson-ver = "4.11.5" |
| caffeine = "2.9.3" |
| calcite = "1.39.0" |
| +comet = "0.12.0" |
| datasketches = "6.2.0" |
| delta-standalone = "3.3.1" |
| delta-spark = "3.3.1" |
| diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/CometTypeUtils.java b/parquet/src/main/java/org/apache/iceberg/parquet/CometTypeUtils.java |
| new file mode 100644 |
| index 00000000000..ddf6c7de5ae |
| --- /dev/null |
| +++ b/parquet/src/main/java/org/apache/iceberg/parquet/CometTypeUtils.java |
| @@ -0,0 +1,255 @@ |
| +/* |
| + * Licensed to the Apache Software Foundation (ASF) under one |
| + * or more contributor license agreements. See the NOTICE file |
| + * distributed with this work for additional information |
| + * regarding copyright ownership. The ASF licenses this file |
| + * to you under the Apache License, Version 2.0 (the |
| + * "License"); you may not use this file except in compliance |
| + * with the License. You may obtain a copy of the License at |
| + * |
| + * http://www.apache.org/licenses/LICENSE-2.0 |
| + * |
| + * Unless required by applicable law or agreed to in writing, |
| + * software distributed under the License is distributed on an |
| + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| + * KIND, either express or implied. See the License for the |
| + * specific language governing permissions and limitations |
| + * under the License. |
| + */ |
| +package org.apache.iceberg.parquet; |
| + |
| +import java.util.Map; |
| +import org.apache.comet.parquet.ParquetColumnSpec; |
| +import org.apache.iceberg.relocated.com.google.common.collect.Maps; |
| +import org.apache.parquet.column.ColumnDescriptor; |
| +import org.apache.parquet.schema.LogicalTypeAnnotation; |
| +import org.apache.parquet.schema.PrimitiveType; |
| +import org.apache.parquet.schema.Type; |
| +import org.apache.parquet.schema.Types; |
| + |
| +public class CometTypeUtils { |
| + |
| + private CometTypeUtils() {} |
| + |
| + public static ParquetColumnSpec descriptorToParquetColumnSpec(ColumnDescriptor descriptor) { |
| + |
| + String[] path = descriptor.getPath(); |
| + PrimitiveType primitiveType = descriptor.getPrimitiveType(); |
| + String physicalType = primitiveType.getPrimitiveTypeName().name(); |
| + |
| + int typeLength = |
| + primitiveType.getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY |
| + ? primitiveType.getTypeLength() |
| + : 0; |
| + |
| + boolean isRepeated = primitiveType.getRepetition() == Type.Repetition.REPEATED; |
| + |
| + // TODO: extract this into a util method |
| + String logicalTypeName = null; |
| + Map<String, String> logicalTypeParams = Maps.newHashMap(); |
| + LogicalTypeAnnotation logicalType = primitiveType.getLogicalTypeAnnotation(); |
| + |
| + if (logicalType != null) { |
| + logicalTypeName = logicalType.getClass().getSimpleName(); |
| + |
| + // Handle specific logical types |
| + if (logicalType instanceof LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) { |
| + LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimal = |
| + (LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) logicalType; |
| + logicalTypeParams.put("precision", String.valueOf(decimal.getPrecision())); |
| + logicalTypeParams.put("scale", String.valueOf(decimal.getScale())); |
| + } else if (logicalType instanceof LogicalTypeAnnotation.TimestampLogicalTypeAnnotation) { |
| + LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestamp = |
| + (LogicalTypeAnnotation.TimestampLogicalTypeAnnotation) logicalType; |
| + logicalTypeParams.put("isAdjustedToUTC", String.valueOf(timestamp.isAdjustedToUTC())); |
| + logicalTypeParams.put("unit", timestamp.getUnit().name()); |
| + } else if (logicalType instanceof LogicalTypeAnnotation.TimeLogicalTypeAnnotation) { |
| + LogicalTypeAnnotation.TimeLogicalTypeAnnotation time = |
| + (LogicalTypeAnnotation.TimeLogicalTypeAnnotation) logicalType; |
| + logicalTypeParams.put("isAdjustedToUTC", String.valueOf(time.isAdjustedToUTC())); |
| + logicalTypeParams.put("unit", time.getUnit().name()); |
| + } else if (logicalType instanceof LogicalTypeAnnotation.IntLogicalTypeAnnotation) { |
| + LogicalTypeAnnotation.IntLogicalTypeAnnotation intType = |
| + (LogicalTypeAnnotation.IntLogicalTypeAnnotation) logicalType; |
| + logicalTypeParams.put("isSigned", String.valueOf(intType.isSigned())); |
| + logicalTypeParams.put("bitWidth", String.valueOf(intType.getBitWidth())); |
| + } |
| + } |
| + |
| + return new ParquetColumnSpec( |
| + 1, // TODO: pass in the correct field id |
| + path, |
| + physicalType, |
| + typeLength, |
| + isRepeated, |
| + descriptor.getMaxDefinitionLevel(), |
| + descriptor.getMaxRepetitionLevel(), |
| + logicalTypeName, |
| + logicalTypeParams); |
| + } |
| + |
| + public static ColumnDescriptor buildColumnDescriptor(ParquetColumnSpec columnSpec) { |
| + PrimitiveType.PrimitiveTypeName primType = |
| + PrimitiveType.PrimitiveTypeName.valueOf(columnSpec.getPhysicalType()); |
| + |
| + Type.Repetition repetition; |
| + if (columnSpec.getMaxRepetitionLevel() > 0) { |
| + repetition = Type.Repetition.REPEATED; |
| + } else if (columnSpec.getMaxDefinitionLevel() > 0) { |
| + repetition = Type.Repetition.OPTIONAL; |
| + } else { |
| + repetition = Type.Repetition.REQUIRED; |
| + } |
| + |
| + String name = columnSpec.getPath()[columnSpec.getPath().length - 1]; |
| + // Reconstruct the logical type from parameters |
| + LogicalTypeAnnotation logicalType = null; |
| + if (columnSpec.getLogicalTypeName() != null) { |
| + logicalType = |
| + reconstructLogicalType( |
| + columnSpec.getLogicalTypeName(), columnSpec.getLogicalTypeParams()); |
| + } |
| + |
| + PrimitiveType primitiveType; |
| + if (primType == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY) { |
| + primitiveType = |
| + Types.primitive(primType, repetition) |
| + .length(columnSpec.getTypeLength()) |
| + .as(logicalType) |
| + .id(columnSpec.getFieldId()) |
| + .named(name); |
| + } else { |
| + primitiveType = |
| + Types.primitive(primType, repetition) |
| + .as(logicalType) |
| + .id(columnSpec.getFieldId()) |
| + .named(name); |
| + } |
| + |
| + return new ColumnDescriptor( |
| + columnSpec.getPath(), |
| + primitiveType, |
| + columnSpec.getMaxRepetitionLevel(), |
| + columnSpec.getMaxDefinitionLevel()); |
| + } |
| + |
| + private static LogicalTypeAnnotation reconstructLogicalType( |
| + String logicalTypeName, Map<String, String> params) { |
| + |
| + switch (logicalTypeName) { |
| + // MAP |
| + case "MapLogicalTypeAnnotation": |
| + return LogicalTypeAnnotation.mapType(); |
| + |
| + // LIST |
| + case "ListLogicalTypeAnnotation": |
| + return LogicalTypeAnnotation.listType(); |
| + |
| + // STRING |
| + case "StringLogicalTypeAnnotation": |
| + return LogicalTypeAnnotation.stringType(); |
| + |
| + // MAP_KEY_VALUE |
| + case "MapKeyValueLogicalTypeAnnotation": |
| + return LogicalTypeAnnotation.MapKeyValueTypeAnnotation.getInstance(); |
| + |
| + // ENUM |
| + case "EnumLogicalTypeAnnotation": |
| + return LogicalTypeAnnotation.enumType(); |
| + |
| + // DECIMAL |
| + case "DecimalLogicalTypeAnnotation": |
| + if (!params.containsKey("scale") || !params.containsKey("precision")) { |
| + throw new IllegalArgumentException( |
| + "Missing required parameters for DecimalLogicalTypeAnnotation: " + params); |
| + } |
| + int scale = Integer.parseInt(params.get("scale")); |
| + int precision = Integer.parseInt(params.get("precision")); |
| + return LogicalTypeAnnotation.decimalType(scale, precision); |
| + |
| + // DATE |
| + case "DateLogicalTypeAnnotation": |
| + return LogicalTypeAnnotation.dateType(); |
| + |
| + // TIME |
| + case "TimeLogicalTypeAnnotation": |
| + if (!params.containsKey("isAdjustedToUTC") || !params.containsKey("unit")) { |
| + throw new IllegalArgumentException( |
| + "Missing required parameters for TimeLogicalTypeAnnotation: " + params); |
| + } |
| + |
| + boolean isUTC = Boolean.parseBoolean(params.get("isAdjustedToUTC")); |
| + String timeUnitStr = params.get("unit"); |
| + |
| + LogicalTypeAnnotation.TimeUnit timeUnit; |
| + switch (timeUnitStr) { |
| + case "MILLIS": |
| + timeUnit = LogicalTypeAnnotation.TimeUnit.MILLIS; |
| + break; |
| + case "MICROS": |
| + timeUnit = LogicalTypeAnnotation.TimeUnit.MICROS; |
| + break; |
| + case "NANOS": |
| + timeUnit = LogicalTypeAnnotation.TimeUnit.NANOS; |
| + break; |
| + default: |
| + throw new IllegalArgumentException("Unknown time unit: " + timeUnitStr); |
| + } |
| + return LogicalTypeAnnotation.timeType(isUTC, timeUnit); |
| + |
| + // TIMESTAMP |
| + case "TimestampLogicalTypeAnnotation": |
| + if (!params.containsKey("isAdjustedToUTC") || !params.containsKey("unit")) { |
| + throw new IllegalArgumentException( |
| + "Missing required parameters for TimestampLogicalTypeAnnotation: " + params); |
| + } |
| + boolean isAdjustedToUTC = Boolean.parseBoolean(params.get("isAdjustedToUTC")); |
| + String unitStr = params.get("unit"); |
| + |
| + LogicalTypeAnnotation.TimeUnit unit; |
| + switch (unitStr) { |
| + case "MILLIS": |
| + unit = LogicalTypeAnnotation.TimeUnit.MILLIS; |
| + break; |
| + case "MICROS": |
| + unit = LogicalTypeAnnotation.TimeUnit.MICROS; |
| + break; |
| + case "NANOS": |
| + unit = LogicalTypeAnnotation.TimeUnit.NANOS; |
| + break; |
| + default: |
| + throw new IllegalArgumentException("Unknown timestamp unit: " + unitStr); |
| + } |
| + return LogicalTypeAnnotation.timestampType(isAdjustedToUTC, unit); |
| + |
| + // INTEGER |
| + case "IntLogicalTypeAnnotation": |
| + if (!params.containsKey("isSigned") || !params.containsKey("bitWidth")) { |
| + throw new IllegalArgumentException( |
| + "Missing required parameters for IntLogicalTypeAnnotation: " + params); |
| + } |
| + boolean isSigned = Boolean.parseBoolean(params.get("isSigned")); |
| + int bitWidth = Integer.parseInt(params.get("bitWidth")); |
| + return LogicalTypeAnnotation.intType(bitWidth, isSigned); |
| + |
| + // JSON |
| + case "JsonLogicalTypeAnnotation": |
| + return LogicalTypeAnnotation.jsonType(); |
| + |
| + // BSON |
| + case "BsonLogicalTypeAnnotation": |
| + return LogicalTypeAnnotation.bsonType(); |
| + |
| + // UUID |
| + case "UUIDLogicalTypeAnnotation": |
| + return LogicalTypeAnnotation.uuidType(); |
| + |
| + // INTERVAL |
| + case "IntervalLogicalTypeAnnotation": |
| + return LogicalTypeAnnotation.IntervalLogicalTypeAnnotation.getInstance(); |
| + |
| + default: |
| + throw new IllegalArgumentException("Unknown logical type: " + logicalTypeName); |
| + } |
| + } |
| +} |
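For orientation, a minimal sketch (not part of the diff) of the round trip the two helpers above provide; the descriptor here is hypothetical:

```java
import org.apache.comet.parquet.ParquetColumnSpec;
import org.apache.iceberg.parquet.CometTypeUtils;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;
import org.apache.parquet.schema.Types;

class CometTypeUtilsRoundTrip {
  public static void main(String[] args) {
    ColumnDescriptor original =
        new ColumnDescriptor(
            new String[] {"ts"},
            Types.primitive(PrimitiveType.PrimitiveTypeName.INT64, Type.Repetition.OPTIONAL)
                .as(LogicalTypeAnnotation.timestampType(
                    true, LogicalTypeAnnotation.TimeUnit.MICROS))
                .id(3)
                .named("ts"),
            /* maxRepetitionLevel= */ 0,
            /* maxDefinitionLevel= */ 1);

    // Iceberg side: flatten the descriptor into a Comet-friendly spec ...
    ParquetColumnSpec spec = CometTypeUtils.descriptorToParquetColumnSpec(original);
    // ... Comet side: rebuild an equivalent descriptor from it.
    ColumnDescriptor rebuilt = CometTypeUtils.buildColumnDescriptor(spec);

    // Path, physical type, levels, and the timestamp annotation survive the
    // round trip; the field id does not yet (see the TODO above).
    System.out.println(rebuilt);
  }
}
```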
| diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/CometVectorizedParquetReader.java b/parquet/src/main/java/org/apache/iceberg/parquet/CometVectorizedParquetReader.java |
| new file mode 100644 |
| index 00000000000..a3cba401827 |
| --- /dev/null |
| +++ b/parquet/src/main/java/org/apache/iceberg/parquet/CometVectorizedParquetReader.java |
| @@ -0,0 +1,259 @@ |
| +/* |
| + * Licensed to the Apache Software Foundation (ASF) under one |
| + * or more contributor license agreements. See the NOTICE file |
| + * distributed with this work for additional information |
| + * regarding copyright ownership. The ASF licenses this file |
| + * to you under the Apache License, Version 2.0 (the |
| + * "License"); you may not use this file except in compliance |
| + * with the License. You may obtain a copy of the License at |
| + * |
| + * http://www.apache.org/licenses/LICENSE-2.0 |
| + * |
| + * Unless required by applicable law or agreed to in writing, |
| + * software distributed under the License is distributed on an |
| + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| + * KIND, either express or implied. See the License for the |
| + * specific language governing permissions and limitations |
| + * under the License. |
| + */ |
| +package org.apache.iceberg.parquet; |
| + |
| +import java.io.IOException; |
| +import java.io.UncheckedIOException; |
| +import java.nio.ByteBuffer; |
| +import java.util.List; |
| +import java.util.Map; |
| +import java.util.NoSuchElementException; |
| +import java.util.function.Function; |
| +import org.apache.comet.parquet.FileReader; |
| +import org.apache.comet.parquet.ParquetColumnSpec; |
| +import org.apache.comet.parquet.ReadOptions; |
| +import org.apache.comet.parquet.RowGroupReader; |
| +import org.apache.comet.parquet.WrappedInputFile; |
| +import org.apache.hadoop.conf.Configuration; |
| +import org.apache.iceberg.Schema; |
| +import org.apache.iceberg.exceptions.RuntimeIOException; |
| +import org.apache.iceberg.expressions.Expression; |
| +import org.apache.iceberg.expressions.Expressions; |
| +import org.apache.iceberg.io.CloseableGroup; |
| +import org.apache.iceberg.io.CloseableIterable; |
| +import org.apache.iceberg.io.CloseableIterator; |
| +import org.apache.iceberg.io.InputFile; |
| +import org.apache.iceberg.mapping.NameMapping; |
| +import org.apache.iceberg.relocated.com.google.common.collect.Lists; |
| +import org.apache.iceberg.util.ByteBuffers; |
| +import org.apache.parquet.ParquetReadOptions; |
| +import org.apache.parquet.column.ColumnDescriptor; |
| +import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; |
| +import org.apache.parquet.hadoop.metadata.ColumnPath; |
| +import org.apache.parquet.schema.MessageType; |
| + |
| +public class CometVectorizedParquetReader<T> extends CloseableGroup |
| + implements CloseableIterable<T> { |
| + private final InputFile input; |
| + private final ParquetReadOptions options; |
| + private final Schema expectedSchema; |
| + private final Function<MessageType, VectorizedReader<?>> batchReaderFunc; |
| + private final Expression filter; |
| + private final boolean reuseContainers; |
| + private final boolean caseSensitive; |
| + private final int batchSize; |
| + private final NameMapping nameMapping; |
| + private final Map<String, String> properties; |
| + private Long start = null; |
| + private Long length = null; |
| + private ByteBuffer fileEncryptionKey = null; |
| + private ByteBuffer fileAADPrefix = null; |
| + |
| + public CometVectorizedParquetReader( |
| + InputFile input, |
| + Schema expectedSchema, |
| + ParquetReadOptions options, |
| + Function<MessageType, VectorizedReader<?>> readerFunc, |
| + NameMapping nameMapping, |
| + Expression filter, |
| + boolean reuseContainers, |
| + boolean caseSensitive, |
| + int maxRecordsPerBatch, |
| + Map<String, String> properties, |
| + Long start, |
| + Long length, |
| + ByteBuffer fileEncryptionKey, |
| + ByteBuffer fileAADPrefix) { |
| + this.input = input; |
| + this.expectedSchema = expectedSchema; |
| + this.options = options; |
| + this.batchReaderFunc = readerFunc; |
| + // replace alwaysTrue with null to avoid extra work evaluating a trivial filter |
| + this.filter = filter == Expressions.alwaysTrue() ? null : filter; |
| + this.reuseContainers = reuseContainers; |
| + this.caseSensitive = caseSensitive; |
| + this.batchSize = maxRecordsPerBatch; |
| + this.nameMapping = nameMapping; |
| + this.properties = properties; |
| + this.start = start; |
| + this.length = length; |
| + this.fileEncryptionKey = fileEncryptionKey; |
| + this.fileAADPrefix = fileAADPrefix; |
| + } |
| + |
| + private ReadConf conf = null; |
| + |
| + private ReadConf init() { |
| + if (conf == null) { |
| + ReadConf readConf = |
| + new ReadConf( |
| + input, |
| + options, |
| + expectedSchema, |
| + filter, |
| + null, |
| + batchReaderFunc, |
| + nameMapping, |
| + reuseContainers, |
| + caseSensitive, |
| + batchSize); |
| + this.conf = readConf.copy(); |
| + return readConf; |
| + } |
| + return conf; |
| + } |
| + |
| + @Override |
| + public CloseableIterator<T> iterator() { |
| + FileIterator<T> iter = |
| + new FileIterator<>(init(), properties, start, length, fileEncryptionKey, fileAADPrefix); |
| + addCloseable(iter); |
| + return iter; |
| + } |
| + |
| + private static class FileIterator<T> implements CloseableIterator<T> { |
| + private final boolean[] shouldSkip; |
| + private final VectorizedReader<T> model; |
| + private final long totalValues; |
| + private final int batchSize; |
| + private final List<Map<ColumnPath, ColumnChunkMetaData>> columnChunkMetadata; |
| + private final boolean reuseContainers; |
| + private int nextRowGroup = 0; |
| + private long nextRowGroupStart = 0; |
| + private long valuesRead = 0; |
| + private T last = null; |
| + private final FileReader cometReader; |
| + private ReadConf conf; |
| + |
| + FileIterator( |
| + ReadConf conf, |
| + Map<String, String> properties, |
| + Long start, |
| + Long length, |
| + ByteBuffer fileEncryptionKey, |
| + ByteBuffer fileAADPrefix) { |
| + this.shouldSkip = conf.shouldSkip(); |
| + this.totalValues = conf.totalValues(); |
| + this.reuseContainers = conf.reuseContainers(); |
| + this.model = conf.vectorizedModel(); |
| + this.batchSize = conf.batchSize(); |
| + this.model.setBatchSize(this.batchSize); |
| + this.columnChunkMetadata = conf.columnChunkMetadataForRowGroups(); |
| + this.cometReader = |
| + newCometReader( |
| + conf.file(), |
| + conf.projection(), |
| + properties, |
| + start, |
| + length, |
| + fileEncryptionKey, |
| + fileAADPrefix); |
| + this.conf = conf; |
| + } |
| + |
| + private FileReader newCometReader( |
| + InputFile file, |
| + MessageType projection, |
| + Map<String, String> properties, |
| + Long start, |
| + Long length, |
| + ByteBuffer fileEncryptionKey, |
| + ByteBuffer fileAADPrefix) { |
| + try { |
| + ReadOptions cometOptions = ReadOptions.builder(new Configuration()).build(); |
| + |
| + FileReader fileReader = |
| + new FileReader( |
| + new WrappedInputFile(file), |
| + cometOptions, |
| + properties, |
| + start, |
| + length, |
| + ByteBuffers.toByteArray(fileEncryptionKey), |
| + ByteBuffers.toByteArray(fileAADPrefix)); |
| + |
| + List<ColumnDescriptor> columnDescriptors = projection.getColumns(); |
| + |
| + List<ParquetColumnSpec> specs = Lists.newArrayList(); |
| + |
| + for (ColumnDescriptor descriptor : columnDescriptors) { |
| + ParquetColumnSpec spec = CometTypeUtils.descriptorToParquetColumnSpec(descriptor); |
| + specs.add(spec); |
| + } |
| + |
| + fileReader.setRequestedSchemaFromSpecs(specs); |
| + return fileReader; |
| + } catch (IOException e) { |
| + throw new UncheckedIOException("Failed to open Parquet file: " + file.location(), e); |
| + } |
| + } |
| + |
| + @Override |
| + public boolean hasNext() { |
| + return valuesRead < totalValues; |
| + } |
| + |
| + @Override |
| + public T next() { |
| + if (!hasNext()) { |
| + throw new NoSuchElementException(); |
| + } |
| + if (valuesRead >= nextRowGroupStart) { |
| + advance(); |
| + } |
| + |
| + // the result is capped by batchSize (an int), so the cast to int is safe |
| + int numValuesToRead = (int) Math.min(nextRowGroupStart - valuesRead, batchSize); |
| + if (reuseContainers) { |
| + this.last = model.read(last, numValuesToRead); |
| + } else { |
| + this.last = model.read(null, numValuesToRead); |
| + } |
| + valuesRead += numValuesToRead; |
| + |
| + return last; |
| + } |
| + |
| + private void advance() { |
| + while (shouldSkip[nextRowGroup]) { |
| + nextRowGroup += 1; |
| + cometReader.skipNextRowGroup(); |
| + } |
| + RowGroupReader pages; |
| + try { |
| + pages = cometReader.readNextRowGroup(); |
| + } catch (IOException e) { |
| + throw new RuntimeIOException(e); |
| + } |
| + |
| + model.setRowGroupInfo(pages, columnChunkMetadata.get(nextRowGroup)); |
| + nextRowGroupStart += pages.getRowCount(); |
| + nextRowGroup += 1; |
| + } |
| + |
| + @Override |
| + public void close() throws IOException { |
| + model.close(); |
| + cometReader.close(); |
| + if (conf != null && conf.reader() != null) { |
| + conf.reader().close(); |
| + } |
| + } |
| + } |
| +} |
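A hedged sketch of how this reader is meant to be consumed; every argument stands in for plumbing the rest of this diff supplies (the `ColumnarBatch` element type and the reader function come from the Spark module):

```java
static void scanAllBatches(
    InputFile inputFile,
    Schema schema,
    ParquetReadOptions options,
    Function<MessageType, VectorizedReader<?>> readerFunc,
    Map<String, String> properties)
    throws IOException {
  try (CloseableIterable<ColumnarBatch> batches =
      new CometVectorizedParquetReader<>(
          inputFile,
          schema,
          options,
          readerFunc,
          /* nameMapping= */ null,
          Expressions.alwaysTrue(),
          /* reuseContainers= */ true,
          /* caseSensitive= */ true,
          /* maxRecordsPerBatch= */ 4096,
          properties,
          /* start= */ null,
          /* length= */ null,
          /* fileEncryptionKey= */ null,
          /* fileAADPrefix= */ null)) {
    for (ColumnarBatch batch : batches) {
      // hand each batch to the Spark/Comet operator pipeline
    }
  }
}
```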
| diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java |
| index 31f9e2a80a6..520f142c212 100644 |
| --- a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java |
| +++ b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java |
| @@ -1124,6 +1124,7 @@ public class Parquet { |
| private NameMapping nameMapping = null; |
| private ByteBuffer fileEncryptionKey = null; |
| private ByteBuffer fileAADPrefix = null; |
| + private boolean isComet; |
| |
| private ReadBuilder(InputFile file) { |
| this.file = file; |
| @@ -1168,6 +1169,11 @@ public class Parquet { |
| return this; |
| } |
| |
| + public ReadBuilder enableComet(boolean enableComet) { |
| + this.isComet = enableComet; |
| + return this; |
| + } |
| + |
| /** |
| * @deprecated will be removed in 2.0.0; use {@link #createReaderFunc(Function)} instead |
| */ |
| @@ -1263,7 +1269,7 @@ public class Parquet { |
| } |
| |
| @Override |
| - @SuppressWarnings({"unchecked", "checkstyle:CyclomaticComplexity"}) |
| + @SuppressWarnings({"unchecked", "checkstyle:CyclomaticComplexity", "MethodLength"}) |
| public <D> CloseableIterable<D> build() { |
| FileDecryptionProperties fileDecryptionProperties = null; |
| if (fileEncryptionKey != null) { |
| @@ -1315,16 +1321,35 @@ public class Parquet { |
| } |
| |
| if (batchedReaderFunc != null) { |
| - return new VectorizedParquetReader<>( |
| - file, |
| - schema, |
| - options, |
| - batchedReaderFunc, |
| - mapping, |
| - filter, |
| - reuseContainers, |
| - caseSensitive, |
| - maxRecordsPerBatch); |
| + if (isComet) { |
| + LOG.info("Comet vectorized reader enabled for {}", file.location()); |
| + return new CometVectorizedParquetReader<>( |
| + file, |
| + schema, |
| + options, |
| + batchedReaderFunc, |
| + mapping, |
| + filter, |
| + reuseContainers, |
| + caseSensitive, |
| + maxRecordsPerBatch, |
| + properties, |
| + start, |
| + length, |
| + fileEncryptionKey, |
| + fileAADPrefix); |
| + } else { |
| + return new VectorizedParquetReader<>( |
| + file, |
| + schema, |
| + options, |
| + batchedReaderFunc, |
| + mapping, |
| + filter, |
| + reuseContainers, |
| + caseSensitive, |
| + maxRecordsPerBatch); |
| + } |
| } else { |
| Function<MessageType, ParquetValueReader<?>> readBuilder = |
| readerFuncWithSchema != null |
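With the flag in place, callers opt in through the builder; a sketch, assuming `batchReaderFunc` builds Comet column readers (see the Spark module changes below):

```java
CloseableIterable<ColumnarBatch> batches =
    Parquet.read(inputFile)
        .project(expectedSchema)
        .createBatchedReaderFunc(batchReaderFunc)
        .recordsPerBatch(4096)
        .enableComet(true) // routes build() to CometVectorizedParquetReader
        .build();
```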
| diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ReadConf.java b/parquet/src/main/java/org/apache/iceberg/parquet/ReadConf.java |
| index 1fb2372ba56..142e5fbadf1 100644 |
| --- a/parquet/src/main/java/org/apache/iceberg/parquet/ReadConf.java |
| +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ReadConf.java |
| @@ -157,6 +157,14 @@ class ReadConf<T> { |
| return newReader; |
| } |
| |
| + InputFile file() { |
| + return file; |
| + } |
| + |
| + MessageType projection() { |
| + return projection; |
| + } |
| + |
| ParquetValueReader<T> model() { |
| return model; |
| } |
| diff --git a/spark/v3.5/build.gradle b/spark/v3.5/build.gradle |
| index 572c32f9292..d155f634a4d 100644 |
| --- a/spark/v3.5/build.gradle |
| +++ b/spark/v3.5/build.gradle |
| @@ -75,7 +75,7 @@ project(":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}") { |
| exclude group: 'org.roaringbitmap' |
| } |
| |
| - compileOnly "org.apache.datafusion:comet-spark-spark${sparkMajorVersion}_${scalaVersion}:0.5.0" |
| + compileOnly "org.apache.datafusion:comet-spark-spark${sparkMajorVersion}_${scalaVersion}:${libs.versions.comet.get()}" |
| |
| implementation libs.parquet.column |
| implementation libs.parquet.hadoop |
| @@ -184,7 +184,7 @@ project(":iceberg-spark:iceberg-spark-extensions-${sparkMajorVersion}_${scalaVer |
| testImplementation libs.avro.avro |
| testImplementation libs.parquet.hadoop |
| testImplementation libs.awaitility |
| - testImplementation "org.apache.datafusion:comet-spark-spark${sparkMajorVersion}_${scalaVersion}:0.5.0" |
| + testImplementation "org.apache.datafusion:comet-spark-spark${sparkMajorVersion}_${scalaVersion}:${libs.versions.comet.get()}" |
| |
| // Required because we remove antlr plugin dependencies from the compile configuration, see note above |
| runtimeOnly libs.antlr.runtime |
| @@ -265,6 +265,7 @@ project(":iceberg-spark:iceberg-spark-runtime-${sparkMajorVersion}_${scalaVersio |
| integrationImplementation project(path: ':iceberg-hive-metastore', configuration: 'testArtifacts') |
| integrationImplementation project(path: ":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}", configuration: 'testArtifacts') |
| integrationImplementation project(path: ":iceberg-spark:iceberg-spark-extensions-${sparkMajorVersion}_${scalaVersion}", configuration: 'testArtifacts') |
| + integrationImplementation "org.apache.datafusion:comet-spark-spark${sparkMajorVersion}_${scalaVersion}:${libs.versions.comet.get()}" |
| |
| // runtime dependencies for running Hive Catalog based integration test |
| integrationRuntimeOnly project(':iceberg-hive-metastore') |
| @@ -302,8 +303,8 @@ project(":iceberg-spark:iceberg-spark-runtime-${sparkMajorVersion}_${scalaVersio |
| relocate 'org.apache.avro', 'org.apache.iceberg.shaded.org.apache.avro' |
| relocate 'avro.shaded', 'org.apache.iceberg.shaded.org.apache.avro.shaded' |
| relocate 'com.thoughtworks.paranamer', 'org.apache.iceberg.shaded.com.thoughtworks.paranamer' |
| - relocate 'org.apache.parquet', 'org.apache.iceberg.shaded.org.apache.parquet' |
| - relocate 'shaded.parquet', 'org.apache.iceberg.shaded.org.apache.parquet.shaded' |
| +// relocate 'org.apache.parquet', 'org.apache.iceberg.shaded.org.apache.parquet' |
| +// relocate 'shaded.parquet', 'org.apache.iceberg.shaded.org.apache.parquet.shaded' |
| relocate 'org.apache.orc', 'org.apache.iceberg.shaded.org.apache.orc' |
| relocate 'io.airlift', 'org.apache.iceberg.shaded.io.airlift' |
| relocate 'org.apache.hc.client5', 'org.apache.iceberg.shaded.org.apache.hc.client5' |
| diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/ExtensionsTestBase.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/ExtensionsTestBase.java |
| index 578845e3da2..0118b30683d 100644 |
| --- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/ExtensionsTestBase.java |
| +++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/ExtensionsTestBase.java |
| @@ -57,6 +57,16 @@ public abstract class ExtensionsTestBase extends CatalogTestBase { |
| .config("spark.sql.legacy.respectNullabilityInTextDatasetConversion", "true") |
| .config( |
| SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), String.valueOf(RANDOM.nextBoolean())) |
| + .config("spark.plugins", "org.apache.spark.CometPlugin") |
| + .config( |
| + "spark.shuffle.manager", |
| + "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager") |
| + .config("spark.comet.explainFallback.enabled", "true") |
| + .config("spark.sql.iceberg.parquet.reader-type", "COMET") |
| + .config("spark.memory.offHeap.enabled", "true") |
| + .config("spark.memory.offHeap.size", "10g") |
| + .config("spark.comet.use.lazyMaterialization", "false") |
| + .config("spark.comet.schemaEvolution.enabled", "true") |
| .enableHiveSupport() |
| .getOrCreate(); |
| |
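The same ten-line Comet configuration block recurs verbatim in the test and benchmark classes throughout the rest of this diff; a shared helper (hypothetical, not part of the change) would keep it in one place:

```java
static SparkSession.Builder withCometDefaults(SparkSession.Builder builder) {
  return builder
      .config("spark.plugins", "org.apache.spark.CometPlugin")
      .config(
          "spark.shuffle.manager",
          "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
      .config("spark.comet.explainFallback.enabled", "true")
      .config("spark.sql.iceberg.parquet.reader-type", "COMET")
      .config("spark.memory.offHeap.enabled", "true")
      .config("spark.memory.offHeap.size", "10g")
      .config("spark.comet.use.lazyMaterialization", "false")
      .config("spark.comet.schemaEvolution.enabled", "true");
}
```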
| diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCallStatementParser.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCallStatementParser.java |
| index ecf9e6f8a59..0f8cced69aa 100644 |
| --- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCallStatementParser.java |
| +++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCallStatementParser.java |
| @@ -56,6 +56,16 @@ public class TestCallStatementParser { |
| .master("local[2]") |
| .config("spark.sql.extensions", IcebergSparkSessionExtensions.class.getName()) |
| .config("spark.extra.prop", "value") |
| + .config("spark.plugins", "org.apache.spark.CometPlugin") |
| + .config( |
| + "spark.shuffle.manager", |
| + "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager") |
| + .config("spark.comet.explainFallback.enabled", "true") |
| + .config("spark.sql.iceberg.parquet.reader-type", "COMET") |
| + .config("spark.memory.offHeap.enabled", "true") |
| + .config("spark.memory.offHeap.size", "10g") |
| + .config("spark.comet.use.lazyMaterialization", "false") |
| + .config("spark.comet.schemaEvolution.enabled", "true") |
| .getOrCreate(); |
| TestCallStatementParser.parser = spark.sessionState().sqlParser(); |
| } |
| diff --git a/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/action/DeleteOrphanFilesBenchmark.java b/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/action/DeleteOrphanFilesBenchmark.java |
| index 64edb1002e9..5bb449f1ac7 100644 |
| --- a/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/action/DeleteOrphanFilesBenchmark.java |
| +++ b/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/action/DeleteOrphanFilesBenchmark.java |
| @@ -179,6 +179,16 @@ public class DeleteOrphanFilesBenchmark { |
| .config("spark.sql.catalog.spark_catalog", SparkSessionCatalog.class.getName()) |
| .config("spark.sql.catalog.spark_catalog.type", "hadoop") |
| .config("spark.sql.catalog.spark_catalog.warehouse", catalogWarehouse()) |
| + .config("spark.plugins", "org.apache.spark.CometPlugin") |
| + .config( |
| + "spark.shuffle.manager", |
| + "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager") |
| + .config("spark.comet.explainFallback.enabled", "true") |
| + .config("spark.sql.iceberg.parquet.reader-type", "COMET") |
| + .config("spark.memory.offHeap.enabled", "true") |
| + .config("spark.memory.offHeap.size", "10g") |
| + .config("spark.comet.use.lazyMaterialization", "false") |
| + .config("spark.comet.schemaEvolution.enabled", "true") |
| .master("local"); |
| spark = builder.getOrCreate(); |
| } |
| diff --git a/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/action/IcebergSortCompactionBenchmark.java b/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/action/IcebergSortCompactionBenchmark.java |
| index a5d0456b0b2..4af408f4861 100644 |
| --- a/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/action/IcebergSortCompactionBenchmark.java |
| +++ b/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/action/IcebergSortCompactionBenchmark.java |
| @@ -392,6 +392,16 @@ public class IcebergSortCompactionBenchmark { |
| "spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog") |
| .config("spark.sql.catalog.spark_catalog.type", "hadoop") |
| .config("spark.sql.catalog.spark_catalog.warehouse", getCatalogWarehouse()) |
| + .config("spark.plugins", "org.apache.spark.CometPlugin") |
| + .config( |
| + "spark.shuffle.manager", |
| + "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager") |
| + .config("spark.comet.explainFallback.enabled", "true") |
| + .config("spark.sql.iceberg.parquet.reader-type", "COMET") |
| + .config("spark.memory.offHeap.enabled", "true") |
| + .config("spark.memory.offHeap.size", "10g") |
| + .config("spark.comet.use.lazyMaterialization", "false") |
| + .config("spark.comet.schemaEvolution.enabled", "true") |
| .master("local[*]"); |
| spark = builder.getOrCreate(); |
| Configuration sparkHadoopConf = spark.sessionState().newHadoopConf(); |
| diff --git a/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/DVReaderBenchmark.java b/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/DVReaderBenchmark.java |
| index c6794e43c63..f7359197407 100644 |
| --- a/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/DVReaderBenchmark.java |
| +++ b/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/DVReaderBenchmark.java |
| @@ -239,6 +239,16 @@ public class DVReaderBenchmark { |
| .config("spark.sql.catalog.spark_catalog", SparkSessionCatalog.class.getName()) |
| .config("spark.sql.catalog.spark_catalog.type", "hadoop") |
| .config("spark.sql.catalog.spark_catalog.warehouse", newWarehouseDir()) |
| + .config("spark.plugins", "org.apache.spark.CometPlugin") |
| + .config( |
| + "spark.shuffle.manager", |
| + "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager") |
| + .config("spark.comet.explainFallback.enabled", "true") |
| + .config("spark.sql.iceberg.parquet.reader-type", "COMET") |
| + .config("spark.memory.offHeap.enabled", "true") |
| + .config("spark.memory.offHeap.size", "10g") |
| + .config("spark.comet.use.lazyMaterialization", "false") |
| + .config("spark.comet.schemaEvolution.enabled", "true") |
| .master("local[*]") |
| .getOrCreate(); |
| } |
| diff --git a/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/DVWriterBenchmark.java b/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/DVWriterBenchmark.java |
| index ac74fb5a109..e011b8b2510 100644 |
| --- a/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/DVWriterBenchmark.java |
| +++ b/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/DVWriterBenchmark.java |
| @@ -223,6 +223,16 @@ public class DVWriterBenchmark { |
| .config("spark.sql.catalog.spark_catalog", SparkSessionCatalog.class.getName()) |
| .config("spark.sql.catalog.spark_catalog.type", "hadoop") |
| .config("spark.sql.catalog.spark_catalog.warehouse", newWarehouseDir()) |
| + .config("spark.plugins", "org.apache.spark.CometPlugin") |
| + .config( |
| + "spark.shuffle.manager", |
| + "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager") |
| + .config("spark.comet.explainFallback.enabled", "true") |
| + .config("spark.sql.iceberg.parquet.reader-type", "COMET") |
| + .config("spark.memory.offHeap.enabled", "true") |
| + .config("spark.memory.offHeap.size", "10g") |
| + .config("spark.comet.use.lazyMaterialization", "false") |
| + .config("spark.comet.schemaEvolution.enabled", "true") |
| .master("local[*]") |
| .getOrCreate(); |
| } |
| diff --git a/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceBenchmark.java b/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceBenchmark.java |
| index 68c537e34a4..f66be2f3896 100644 |
| --- a/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceBenchmark.java |
| +++ b/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceBenchmark.java |
| @@ -94,7 +94,19 @@ public abstract class IcebergSourceBenchmark { |
| } |
| |
| protected void setupSpark(boolean enableDictionaryEncoding) { |
| - SparkSession.Builder builder = SparkSession.builder().config("spark.ui.enabled", false); |
| + SparkSession.Builder builder = |
| + SparkSession.builder() |
| + .config("spark.ui.enabled", false) |
| + .config("spark.plugins", "org.apache.spark.CometPlugin") |
| + .config( |
| + "spark.shuffle.manager", |
| + "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager") |
| + .config("spark.comet.explainFallback.enabled", "true") |
| + .config("spark.sql.iceberg.parquet.reader-type", "COMET") |
| + .config("spark.memory.offHeap.enabled", "true") |
| + .config("spark.memory.offHeap.size", "10g") |
| + .config("spark.comet.use.lazyMaterialization", "false") |
| + .config("spark.comet.schemaEvolution.enabled", "true"); |
| if (!enableDictionaryEncoding) { |
| builder |
| .config("parquet.dictionary.page.size", "1") |
| diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnReader.java |
| index 16159dcbdff..eba1a2a0fb1 100644 |
| --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnReader.java |
| +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnReader.java |
| @@ -19,18 +19,22 @@ |
| package org.apache.iceberg.spark.data.vectorized; |
| |
| import java.io.IOException; |
| +import org.apache.comet.CometConf; |
| +import org.apache.comet.CometSchemaImporter; |
| import org.apache.comet.parquet.AbstractColumnReader; |
| import org.apache.comet.parquet.ColumnReader; |
| +import org.apache.comet.parquet.ParquetColumnSpec; |
| +import org.apache.comet.parquet.RowGroupReader; |
| import org.apache.comet.parquet.TypeUtil; |
| import org.apache.comet.parquet.Utils; |
| -import org.apache.comet.shaded.arrow.c.CometSchemaImporter; |
| import org.apache.comet.shaded.arrow.memory.RootAllocator; |
| +import org.apache.iceberg.parquet.CometTypeUtils; |
| import org.apache.iceberg.parquet.VectorizedReader; |
| import org.apache.iceberg.relocated.com.google.common.base.Preconditions; |
| import org.apache.iceberg.spark.SparkSchemaUtil; |
| import org.apache.iceberg.types.Types; |
| import org.apache.parquet.column.ColumnDescriptor; |
| -import org.apache.parquet.column.page.PageReader; |
| +import org.apache.spark.sql.internal.SQLConf; |
| import org.apache.spark.sql.types.DataType; |
| import org.apache.spark.sql.types.Metadata; |
| import org.apache.spark.sql.types.StructField; |
| @@ -42,23 +46,28 @@ class CometColumnReader implements VectorizedReader<ColumnVector> { |
| |
| private final ColumnDescriptor descriptor; |
| private final DataType sparkType; |
| + private final int fieldId; |
| |
| // The delegated ColumnReader from Comet side |
| private AbstractColumnReader delegate; |
| private boolean initialized = false; |
| private int batchSize = DEFAULT_BATCH_SIZE; |
| private CometSchemaImporter importer; |
| + private ParquetColumnSpec spec; |
| |
| - CometColumnReader(DataType sparkType, ColumnDescriptor descriptor) { |
| + CometColumnReader(DataType sparkType, ColumnDescriptor descriptor, int fieldId) { |
| this.sparkType = sparkType; |
| this.descriptor = descriptor; |
| + this.fieldId = fieldId; |
| } |
| |
| CometColumnReader(Types.NestedField field) { |
| DataType dataType = SparkSchemaUtil.convert(field.type()); |
| StructField structField = new StructField(field.name(), dataType, false, Metadata.empty()); |
| this.sparkType = dataType; |
| - this.descriptor = TypeUtil.convertToParquet(structField); |
| + this.descriptor = |
| + CometTypeUtils.buildColumnDescriptor(TypeUtil.convertToParquetSpec(structField)); |
| + this.fieldId = field.fieldId(); |
| } |
| |
| public AbstractColumnReader delegate() { |
| @@ -92,7 +101,26 @@ class CometColumnReader implements VectorizedReader<ColumnVector> { |
| } |
| |
| this.importer = new CometSchemaImporter(new RootAllocator()); |
| - this.delegate = Utils.getColumnReader(sparkType, descriptor, importer, batchSize, false, false); |
| + |
| + this.spec = CometTypeUtils.descriptorToParquetColumnSpec(descriptor); |
| + |
| + boolean useLegacyTime = |
| + Boolean.parseBoolean( |
| + SQLConf.get() |
| + .getConfString( |
| + CometConf.COMET_EXCEPTION_ON_LEGACY_DATE_TIMESTAMP().key(), "false")); |
| + boolean useLazyMaterialization = |
| + Boolean.parseBoolean( |
| + SQLConf.get().getConfString(CometConf.COMET_USE_LAZY_MATERIALIZATION().key(), "false")); |
| + this.delegate = |
| + Utils.getColumnReader( |
| + sparkType, |
| + spec, |
| + importer, |
| + batchSize, |
| + true, // Comet sets this to true for native execution |
| + useLazyMaterialization, |
| + useLegacyTime); |
| this.initialized = true; |
| } |
| |
| @@ -111,9 +139,9 @@ class CometColumnReader implements VectorizedReader<ColumnVector> { |
| * <p>NOTE: this should be called before reading a new Parquet column chunk, and after {@link |
| * CometColumnReader#reset} is called. |
| */ |
| - public void setPageReader(PageReader pageReader) throws IOException { |
| + public void setPageReader(RowGroupReader pageStore) throws IOException { |
| Preconditions.checkState(initialized, "Invalid state: 'reset' should be called first"); |
| - ((ColumnReader) delegate).setPageReader(pageReader); |
| + ((ColumnReader) delegate).setRowGroupReader(pageStore, spec); |
| } |
| |
| @Override |
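Taken together, the changes above give each column reader this per-row-group lifecycle; a sketch, with the reader and the Comet `RowGroupReader` supplied by the batch reader below:

```java
static ColumnVector readOneBatch(CometColumnReader reader, RowGroupReader rowGroup)
    throws IOException {
  reader.setBatchSize(4096); // once, before the first read
  reader.reset(); // allocates the native delegate via Utils.getColumnReader
  reader.setPageReader(rowGroup); // binds the row group and the column spec
  return reader.read(null, 4096); // decode up to one batch of values
}
```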
| diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnarBatchReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnarBatchReader.java |
| index 04ac69476ad..916face2bf2 100644 |
| --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnarBatchReader.java |
| +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnarBatchReader.java |
| @@ -22,8 +22,12 @@ import java.io.IOException; |
| import java.io.UncheckedIOException; |
| import java.util.List; |
| import java.util.Map; |
| +import org.apache.comet.CometRuntimeException; |
| import org.apache.comet.parquet.AbstractColumnReader; |
| -import org.apache.comet.parquet.BatchReader; |
| +import org.apache.comet.parquet.IcebergCometBatchReader; |
| +import org.apache.comet.parquet.RowGroupReader; |
| +import org.apache.comet.vector.CometSelectionVector; |
| +import org.apache.comet.vector.CometVector; |
| import org.apache.iceberg.Schema; |
| import org.apache.iceberg.data.DeleteFilter; |
| import org.apache.iceberg.parquet.VectorizedReader; |
| @@ -55,7 +59,7 @@ class CometColumnarBatchReader implements VectorizedReader<ColumnarBatch> { |
| // calling BatchReader.nextBatch, the isDeleted value is not yet available, so |
| // DeleteColumnReader.readBatch must be called explicitly later, after the isDeleted value is |
| // available. |
| - private final BatchReader delegate; |
| + private final IcebergCometBatchReader delegate; |
| private DeleteFilter<InternalRow> deletes = null; |
| private long rowStartPosInBatch = 0; |
| |
| @@ -65,9 +69,7 @@ class CometColumnarBatchReader implements VectorizedReader<ColumnarBatch> { |
| this.hasIsDeletedColumn = |
| readers.stream().anyMatch(reader -> reader instanceof CometDeleteColumnReader); |
| |
| - AbstractColumnReader[] abstractColumnReaders = new AbstractColumnReader[readers.size()]; |
| - this.delegate = new BatchReader(abstractColumnReaders); |
| - delegate.setSparkSchema(SparkSchemaUtil.convert(schema)); |
| + this.delegate = new IcebergCometBatchReader(readers.size(), SparkSchemaUtil.convert(schema)); |
| } |
| |
| @Override |
| @@ -79,19 +81,22 @@ class CometColumnarBatchReader implements VectorizedReader<ColumnarBatch> { |
| && !(readers[i] instanceof CometPositionColumnReader) |
| && !(readers[i] instanceof CometDeleteColumnReader)) { |
| readers[i].reset(); |
| - readers[i].setPageReader(pageStore.getPageReader(readers[i].descriptor())); |
| + readers[i].setPageReader((RowGroupReader) pageStore); |
| } |
| } catch (IOException e) { |
| throw new UncheckedIOException("Failed to setRowGroupInfo for Comet vectorization", e); |
| } |
| } |
| |
| + AbstractColumnReader[] delegateReaders = new AbstractColumnReader[readers.length]; |
| for (int i = 0; i < readers.length; i++) { |
| - delegate.getColumnReaders()[i] = this.readers[i].delegate(); |
| + delegateReaders[i] = readers[i].delegate(); |
| } |
| |
| + delegate.init(delegateReaders); |
| + |
| this.rowStartPosInBatch = |
| - pageStore |
| + ((RowGroupReader) pageStore) |
| .getRowIndexOffset() |
| .orElseThrow( |
| () -> |
| @@ -148,9 +153,17 @@ class CometColumnarBatchReader implements VectorizedReader<ColumnarBatch> { |
| Pair<int[], Integer> pair = buildRowIdMapping(vectors); |
| if (pair != null) { |
| int[] rowIdMapping = pair.first(); |
| - numLiveRows = pair.second(); |
| - for (int i = 0; i < vectors.length; i++) { |
| - vectors[i] = new ColumnVectorWithFilter(vectors[i], rowIdMapping); |
| + if (pair.second() != null) { |
| + numLiveRows = pair.second(); |
| + for (int i = 0; i < vectors.length; i++) { |
| + if (vectors[i] instanceof CometVector) { |
| + vectors[i] = |
| + new CometSelectionVector((CometVector) vectors[i], rowIdMapping, numLiveRows); |
| + } else { |
| + throw new CometRuntimeException( |
| + "Unsupported column vector type: " + vectors[i].getClass()); |
| + } |
| + } |
| } |
| } |
| } |
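The key behavioral change here: deleted rows are no longer filtered by wrapping vectors in `ColumnVectorWithFilter`; instead each Comet vector is wrapped with a selection vector recording the live positions, so the underlying Arrow buffers are never compacted. In isolation (a sketch grounded in the constructor used above):

```java
static ColumnVector select(CometVector vector, int[] rowIdMapping) {
  // rowIdMapping holds the positions of rows that survived the DeleteFilter
  return new CometSelectionVector(vector, rowIdMapping, rowIdMapping.length);
}
```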
| diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometConstantColumnReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometConstantColumnReader.java |
| index 047c96314b1..88d691a607a 100644 |
| --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometConstantColumnReader.java |
| +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometConstantColumnReader.java |
| @@ -21,6 +21,7 @@ package org.apache.iceberg.spark.data.vectorized; |
| import java.math.BigDecimal; |
| import java.nio.ByteBuffer; |
| import org.apache.comet.parquet.ConstantColumnReader; |
| +import org.apache.iceberg.parquet.CometTypeUtils; |
| import org.apache.iceberg.types.Types; |
| import org.apache.spark.sql.types.DataType; |
| import org.apache.spark.sql.types.DataTypes; |
| @@ -34,7 +35,11 @@ class CometConstantColumnReader<T> extends CometColumnReader { |
| super(field); |
| // use delegate to set constant value on the native side to be consumed by native execution. |
| setDelegate( |
| - new ConstantColumnReader(sparkType(), descriptor(), convertToSparkValue(value), false)); |
| + new ConstantColumnReader( |
| + sparkType(), |
| + CometTypeUtils.descriptorToParquetColumnSpec(descriptor()), |
| + convertToSparkValue(value), |
| + false)); |
| } |
| |
| @Override |
| diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometDeleteColumnReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometDeleteColumnReader.java |
| index 6235bfe4865..cba108e4326 100644 |
| --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometDeleteColumnReader.java |
| +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometDeleteColumnReader.java |
| @@ -51,10 +51,10 @@ class CometDeleteColumnReader<T> extends CometColumnReader { |
| DeleteColumnReader() { |
| super( |
| DataTypes.BooleanType, |
| - TypeUtil.convertToParquet( |
| + TypeUtil.convertToParquetSpec( |
| new StructField("_deleted", DataTypes.BooleanType, false, Metadata.empty())), |
| false /* useDecimal128 = false */, |
| - false /* isConstant = false */); |
| + false /* isConstant */); |
| this.isDeleted = new boolean[0]; |
| } |
| |
| diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometPositionColumnReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometPositionColumnReader.java |
| index bcc0e514c28..98e80068c51 100644 |
| --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometPositionColumnReader.java |
| +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometPositionColumnReader.java |
| @@ -20,6 +20,7 @@ package org.apache.iceberg.spark.data.vectorized; |
| |
| import org.apache.comet.parquet.MetadataColumnReader; |
| import org.apache.comet.parquet.Native; |
| +import org.apache.iceberg.parquet.CometTypeUtils; |
| import org.apache.iceberg.types.Types; |
| import org.apache.parquet.column.ColumnDescriptor; |
| import org.apache.spark.sql.types.DataTypes; |
| @@ -44,7 +45,7 @@ class CometPositionColumnReader extends CometColumnReader { |
| PositionColumnReader(ColumnDescriptor descriptor) { |
| super( |
| DataTypes.LongType, |
| - descriptor, |
| + CometTypeUtils.descriptorToParquetColumnSpec(descriptor), |
| false /* useDecimal128 = false */, |
| false /* isConstant = false */); |
| } |
| diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometVectorizedReaderBuilder.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometVectorizedReaderBuilder.java |
| index d36f1a72747..56f8c9bff93 100644 |
| --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometVectorizedReaderBuilder.java |
| +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometVectorizedReaderBuilder.java |
| @@ -142,6 +142,7 @@ class CometVectorizedReaderBuilder extends TypeWithSchemaVisitor<VectorizedReade |
| return null; |
| } |
| |
| - return new CometColumnReader(SparkSchemaUtil.convert(icebergField.type()), desc); |
| + return new CometColumnReader( |
| + SparkSchemaUtil.convert(icebergField.type()), desc, parquetFieldId); |
| } |
| } |
| diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java |
| index 780e1750a52..57892ac4c59 100644 |
| --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java |
| +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java |
| @@ -109,6 +109,7 @@ abstract class BaseBatchReader<T extends ScanTask> extends BaseReader<ColumnarBa |
| // read performance as every batch read doesn't have to pay the cost of allocating memory. |
| .reuseContainers() |
| .withNameMapping(nameMapping()) |
| + .enableComet(parquetConf.readerType() == ParquetReaderType.COMET) |
| .build(); |
| } |
| |
| diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java |
| index 45bd48aea2e..5362578861c 100644 |
| --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java |
| +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java |
| @@ -172,11 +172,11 @@ class SparkBatch implements Batch { |
| return field.type().isPrimitiveType() || MetadataColumns.isMetadataColumn(field.fieldId()); |
| } |
| |
| - private boolean useCometBatchReads() { |
| + protected boolean useCometBatchReads() { |
| return readConf.parquetVectorizationEnabled() |
| && readConf.parquetReaderType() == ParquetReaderType.COMET |
| && expectedSchema.columns().stream().allMatch(this::supportsCometBatchReads) |
| - && taskGroups.stream().allMatch(this::supportsParquetBatchReads); |
| + && taskGroups.stream().allMatch(this::supportsCometBatchReads); |
| } |
| |
| private boolean supportsCometBatchReads(Types.NestedField field) { |
| @@ -186,6 +186,21 @@ class SparkBatch implements Batch { |
| && field.fieldId() != MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.fieldId(); |
| } |
| |
| + private boolean supportsCometBatchReads(ScanTask task) { |
| + if (task instanceof ScanTaskGroup) { |
| + ScanTaskGroup<?> taskGroup = (ScanTaskGroup<?>) task; |
| + return taskGroup.tasks().stream().allMatch(this::supportsCometBatchReads); |
| + |
| + } else if (task.isFileScanTask() && !task.isDataTask()) { |
| + FileScanTask fileScanTask = task.asFileScanTask(); |
| + // Comet batch reads currently require Parquet data files |
| + return fileScanTask.file().format() == FileFormat.PARQUET; |
| + |
| + } else { |
| + return false; |
| + } |
| + } |
| + |
| // conditions for using ORC batch reads: |
| // - ORC vectorization is enabled |
| // - all tasks are of type FileScanTask and read only ORC files with no delete files |
| diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java |
| index 106b296de09..967b0d41d08 100644 |
| --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java |
| +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java |
| @@ -24,6 +24,7 @@ import java.util.Map; |
| import java.util.Optional; |
| import java.util.function.Supplier; |
| import java.util.stream.Collectors; |
| +import org.apache.comet.parquet.SupportsComet; |
| import org.apache.iceberg.BlobMetadata; |
| import org.apache.iceberg.ScanTask; |
| import org.apache.iceberg.ScanTaskGroup; |
| @@ -95,7 +96,7 @@ import org.apache.spark.sql.types.StructType; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| -abstract class SparkScan implements Scan, SupportsReportStatistics { |
| +abstract class SparkScan implements Scan, SupportsReportStatistics, SupportsComet { |
| private static final Logger LOG = LoggerFactory.getLogger(SparkScan.class); |
| private static final String NDV_KEY = "ndv"; |
| |
| @@ -351,4 +352,10 @@ abstract class SparkScan implements Scan, SupportsReportStatistics { |
| return splitSize; |
| } |
| } |
| + |
| + @Override |
| + public boolean isCometEnabled() { |
| + SparkBatch batch = (SparkBatch) this.toBatch(); |
| + return batch.useCometBatchReads(); |
| + } |
| } |
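A sketch of the consumer side, which this diff does not show: Comet's planner is assumed to probe the Spark `Scan` for the marker interface before substituting its native scan operator.

```java
static boolean cometEligible(org.apache.spark.sql.connector.read.Scan scan) {
  return scan instanceof SupportsComet && ((SupportsComet) scan).isCometEnabled();
}
```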
| diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/SparkDistributedDataScanTestBase.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/SparkDistributedDataScanTestBase.java |
| index 404ba728460..26d6f9b613f 100644 |
| --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/SparkDistributedDataScanTestBase.java |
| +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/SparkDistributedDataScanTestBase.java |
| @@ -90,6 +90,16 @@ public abstract class SparkDistributedDataScanTestBase |
| .master("local[2]") |
| .config("spark.serializer", serializer) |
| .config(SQLConf.SHUFFLE_PARTITIONS().key(), "4") |
| + .config("spark.plugins", "org.apache.spark.CometPlugin") |
| + .config( |
| + "spark.shuffle.manager", |
| + "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager") |
| + .config("spark.comet.explainFallback.enabled", "true") |
| + .config("spark.sql.iceberg.parquet.reader-type", "COMET") |
| + .config("spark.memory.offHeap.enabled", "true") |
| + .config("spark.memory.offHeap.size", "10g") |
| + .config("spark.comet.use.lazyMaterialization", "false") |
| + .config("spark.comet.schemaEvolution.enabled", "true") |
| .getOrCreate(); |
| } |
| } |
| diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanDeletes.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanDeletes.java |
| index 659507e4c5e..eb9cedc34c5 100644 |
| --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanDeletes.java |
| +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanDeletes.java |
| @@ -73,6 +73,16 @@ public class TestSparkDistributedDataScanDeletes |
| .master("local[2]") |
| .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") |
| .config(SQLConf.SHUFFLE_PARTITIONS().key(), "4") |
| + .config("spark.plugins", "org.apache.spark.CometPlugin") |
| + .config( |
| + "spark.shuffle.manager", |
| + "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager") |
| + .config("spark.comet.explainFallback.enabled", "true") |
| + .config("spark.sql.iceberg.parquet.reader-type", "COMET") |
| + .config("spark.memory.offHeap.enabled", "true") |
| + .config("spark.memory.offHeap.size", "10g") |
| + .config("spark.comet.use.lazyMaterialization", "false") |
| + .config("spark.comet.schemaEvolution.enabled", "true") |
| .getOrCreate(); |
| } |
| |
| diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanFilterFiles.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanFilterFiles.java |
| index a218f965ea6..395c02441e7 100644 |
| --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanFilterFiles.java |
| +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanFilterFiles.java |
| @@ -62,6 +62,16 @@ public class TestSparkDistributedDataScanFilterFiles |
| .master("local[2]") |
| .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") |
| .config(SQLConf.SHUFFLE_PARTITIONS().key(), "4") |
| + .config("spark.plugins", "org.apache.spark.CometPlugin") |
| + .config( |
| + "spark.shuffle.manager", |
| + "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager") |
| + .config("spark.comet.explainFallback.enabled", "true") |
| + .config("spark.sql.iceberg.parquet.reader-type", "COMET") |
| + .config("spark.memory.offHeap.enabled", "true") |
| + .config("spark.memory.offHeap.size", "10g") |
| + .config("spark.comet.use.lazyMaterialization", "false") |
| + .config("spark.comet.schemaEvolution.enabled", "true") |
| .getOrCreate(); |
| } |
| |
| diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanReporting.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanReporting.java |
| index 2665d7ba8d3..306e859ce1a 100644 |
| --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanReporting.java |
| +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanReporting.java |
| @@ -63,6 +63,16 @@ public class TestSparkDistributedDataScanReporting |
| .master("local[2]") |
| .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") |
| .config(SQLConf.SHUFFLE_PARTITIONS().key(), "4") |
| + .config("spark.plugins", "org.apache.spark.CometPlugin") |
| + .config( |
| + "spark.shuffle.manager", |
| + "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager") |
| + .config("spark.comet.explainFallback.enabled", "true") |
| + .config("spark.sql.iceberg.parquet.reader-type", "COMET") |
| + .config("spark.memory.offHeap.enabled", "true") |
| + .config("spark.memory.offHeap.size", "10g") |
| + .config("spark.comet.use.lazyMaterialization", "false") |
| + .config("spark.comet.schemaEvolution.enabled", "true") |
| .getOrCreate(); |
| } |
| |
| diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestBase.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestBase.java |
| index 3e9f3334ef6..ecf855986b7 100644 |
| --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestBase.java |
| +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestBase.java |
| @@ -77,6 +77,17 @@ public abstract class TestBase extends SparkTestHelperBase { |
| .config(SQLConf.PARTITION_OVERWRITE_MODE().key(), "dynamic") |
| .config("spark.hadoop." + METASTOREURIS.varname, hiveConf.get(METASTOREURIS.varname)) |
| .config("spark.sql.legacy.respectNullabilityInTextDatasetConversion", "true") |
| + .config("spark.plugins", "org.apache.spark.CometPlugin") |
| + .config( |
| + "spark.shuffle.manager", |
| + "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager") |
| + .config("spark.comet.explainFallback.enabled", "true") |
| + .config("spark.sql.iceberg.parquet.reader-type", "COMET") |
| + .config("spark.memory.offHeap.enabled", "true") |
| + .config("spark.memory.offHeap.size", "10g") |
| + .config("spark.comet.use.lazyMaterialization", "false") |
| + .config("spark.comet.schemaEvolution.enabled", "true") |
| + .config("spark.comet.exec.broadcastExchange.enabled", "false") |
| .enableHiveSupport() |
| .getOrCreate(); |
| |
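| Note: TestBase is the only suite that additionally sets spark.comet.exec.broadcastExchange.enabled=false, presumably to keep broadcast exchanges on Spark's built-in operator for these Hive-backed tests. With the illustrative helper sketched earlier, the same setup would be (other suite-specific settings elided): |
| |
| // Sketch only; assumes the hypothetical withComet(...) helper shown above. |
| spark = |
|     CometTestSparkConfigs.withComet(SparkSession.builder().master("local[2]")) |
|         .config("spark.comet.exec.broadcastExchange.enabled", "false") |
|         .enableHiveSupport() |
|         .getOrCreate(); |
| |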
| diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetDictionaryEncodedVectorizedReads.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetDictionaryEncodedVectorizedReads.java |
| index bc4e722bc86..1a40d8edcb3 100644 |
| --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetDictionaryEncodedVectorizedReads.java |
| +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetDictionaryEncodedVectorizedReads.java |
| @@ -59,7 +59,20 @@ public class TestParquetDictionaryEncodedVectorizedReads extends TestParquetVect |
| |
| @BeforeAll |
| public static void startSpark() { |
| - spark = SparkSession.builder().master("local[2]").getOrCreate(); |
| + spark = |
| + SparkSession.builder() |
| + .master("local[2]") |
| + .config("spark.plugins", "org.apache.spark.CometPlugin") |
| + .config( |
| + "spark.shuffle.manager", |
| + "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager") |
| + .config("spark.comet.explainFallback.enabled", "true") |
| + .config("spark.sql.iceberg.parquet.reader-type", "COMET") |
| + .config("spark.memory.offHeap.enabled", "true") |
| + .config("spark.memory.offHeap.size", "10g") |
| + .config("spark.comet.use.lazyMaterialization", "false") |
| + .config("spark.comet.schemaEvolution.enabled", "true") |
| + .getOrCreate(); |
| } |
| |
| @AfterAll |
| diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/ScanTestBase.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/ScanTestBase.java |
| index 0886df957db..ba2ee6b1c2c 100644 |
| --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/ScanTestBase.java |
| +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/ScanTestBase.java |
| @@ -57,7 +57,20 @@ public abstract class ScanTestBase extends AvroDataTest { |
| |
| @BeforeAll |
| public static void startSpark() { |
| - ScanTestBase.spark = SparkSession.builder().master("local[2]").getOrCreate(); |
| + ScanTestBase.spark = |
| + SparkSession.builder() |
| + .master("local[2]") |
| + .config("spark.plugins", "org.apache.spark.CometPlugin") |
| + .config( |
| + "spark.shuffle.manager", |
| + "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager") |
| + .config("spark.comet.explainFallback.enabled", "true") |
| + .config("spark.sql.iceberg.parquet.reader-type", "COMET") |
| + .config("spark.memory.offHeap.enabled", "true") |
| + .config("spark.memory.offHeap.size", "10g") |
| + .config("spark.comet.use.lazyMaterialization", "false") |
| + .config("spark.comet.schemaEvolution.enabled", "true") |
| + .getOrCreate(); |
| ScanTestBase.sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); |
| } |
| |
| diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java |
| index 6b7d861364f..f9805fa8df5 100644 |
| --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java |
| +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java |
| @@ -144,7 +144,20 @@ public class TestCompressionSettings extends CatalogTestBase { |
| |
| @BeforeAll |
| public static void startSpark() { |
| - TestCompressionSettings.spark = SparkSession.builder().master("local[2]").getOrCreate(); |
| + TestCompressionSettings.spark = |
| + SparkSession.builder() |
| + .master("local[2]") |
| + .config("spark.plugins", "org.apache.spark.CometPlugin") |
| + .config( |
| + "spark.shuffle.manager", |
| + "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager") |
| + .config("spark.comet.explainFallback.enabled", "true") |
| + .config("spark.sql.iceberg.parquet.reader-type", "COMET") |
| + .config("spark.memory.offHeap.enabled", "true") |
| + .config("spark.memory.offHeap.size", "10g") |
| + .config("spark.comet.use.lazyMaterialization", "false") |
| + .config("spark.comet.schemaEvolution.enabled", "true") |
| + .getOrCreate(); |
| } |
| |
| @BeforeEach |
| diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java |
| index c4ba96e6340..a0e77db99b9 100644 |
| --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java |
| +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java |
| @@ -75,7 +75,20 @@ public class TestDataSourceOptions extends TestBaseWithCatalog { |
| |
| @BeforeAll |
| public static void startSpark() { |
| - TestDataSourceOptions.spark = SparkSession.builder().master("local[2]").getOrCreate(); |
| + TestDataSourceOptions.spark = |
| + SparkSession.builder() |
| + .master("local[2]") |
| + .config("spark.plugins", "org.apache.spark.CometPlugin") |
| + .config( |
| + "spark.shuffle.manager", |
| + "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager") |
| + .config("spark.comet.explainFallback.enabled", "true") |
| + .config("spark.sql.iceberg.parquet.reader-type", "COMET") |
| + .config("spark.memory.offHeap.enabled", "true") |
| + .config("spark.memory.offHeap.size", "10g") |
| + .config("spark.comet.use.lazyMaterialization", "false") |
| + .config("spark.comet.schemaEvolution.enabled", "true") |
| + .getOrCreate(); |
| } |
| |
| @AfterAll |
| diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java |
| index 348173596e4..bdd31528d3f 100644 |
| --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java |
| +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java |
| @@ -110,7 +110,20 @@ public class TestFilteredScan { |
| |
| @BeforeAll |
| public static void startSpark() { |
| - TestFilteredScan.spark = SparkSession.builder().master("local[2]").getOrCreate(); |
| + TestFilteredScan.spark = |
| + SparkSession.builder() |
| + .master("local[2]") |
| + .config("spark.plugins", "org.apache.spark.CometPlugin") |
| + .config( |
| + "spark.shuffle.manager", |
| + "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager") |
| + .config("spark.comet.explainFallback.enabled", "true") |
| + .config("spark.sql.iceberg.parquet.reader-type", "COMET") |
| + .config("spark.memory.offHeap.enabled", "true") |
| + .config("spark.memory.offHeap.size", "10g") |
| + .config("spark.comet.use.lazyMaterialization", "false") |
| + .config("spark.comet.schemaEvolution.enabled", "true") |
| + .getOrCreate(); |
| } |
| |
| @AfterAll |
| diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestForwardCompatibility.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestForwardCompatibility.java |
| index 84c99a575c8..6f4b2af003b 100644 |
| --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestForwardCompatibility.java |
| +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestForwardCompatibility.java |
| @@ -93,7 +93,20 @@ public class TestForwardCompatibility { |
| |
| @BeforeAll |
| public static void startSpark() { |
| - TestForwardCompatibility.spark = SparkSession.builder().master("local[2]").getOrCreate(); |
| + TestForwardCompatibility.spark = |
| + SparkSession.builder() |
| + .master("local[2]") |
| + .config("spark.plugins", "org.apache.spark.CometPlugin") |
| + .config( |
| + "spark.shuffle.manager", |
| + "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager") |
| + .config("spark.comet.explainFallback.enabled", "true") |
| + .config("spark.sql.iceberg.parquet.reader-type", "COMET") |
| + .config("spark.memory.offHeap.enabled", "true") |
| + .config("spark.memory.offHeap.size", "10g") |
| + .config("spark.comet.use.lazyMaterialization", "false") |
| + .config("spark.comet.schemaEvolution.enabled", "true") |
| + .getOrCreate(); |
| } |
| |
| @AfterAll |
| diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSpark.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSpark.java |
| index 7eff93d204e..1774acd056b 100644 |
| --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSpark.java |
| +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSpark.java |
| @@ -46,7 +46,20 @@ public class TestIcebergSpark { |
| |
| @BeforeAll |
| public static void startSpark() { |
| - TestIcebergSpark.spark = SparkSession.builder().master("local[2]").getOrCreate(); |
| + TestIcebergSpark.spark = |
| + SparkSession.builder() |
| + .master("local[2]") |
| + .config("spark.plugins", "org.apache.spark.CometPlugin") |
| + .config( |
| + "spark.shuffle.manager", |
| + "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager") |
| + .config("spark.comet.explainFallback.enabled", "true") |
| + .config("spark.sql.iceberg.parquet.reader-type", "COMET") |
| + .config("spark.memory.offHeap.enabled", "true") |
| + .config("spark.memory.offHeap.size", "10g") |
| + .config("spark.comet.use.lazyMaterialization", "false") |
| + .config("spark.comet.schemaEvolution.enabled", "true") |
| + .getOrCreate(); |
| } |
| |
| @AfterAll |
| diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionPruning.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionPruning.java |
| index 9464f687b0e..b5dbb88dd5a 100644 |
| --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionPruning.java |
| +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionPruning.java |
| @@ -112,7 +112,20 @@ public class TestPartitionPruning { |
| |
| @BeforeAll |
| public static void startSpark() { |
| - TestPartitionPruning.spark = SparkSession.builder().master("local[2]").getOrCreate(); |
| + TestPartitionPruning.spark = |
| + SparkSession.builder() |
| + .master("local[2]") |
| + .config("spark.plugins", "org.apache.spark.CometPlugin") |
| + .config( |
| + "spark.shuffle.manager", |
| + "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager") |
| + .config("spark.comet.explainFallback.enabled", "true") |
| + .config("spark.sql.iceberg.parquet.reader-type", "COMET") |
| + .config("spark.memory.offHeap.enabled", "true") |
| + .config("spark.memory.offHeap.size", "10g") |
| + .config("spark.comet.use.lazyMaterialization", "false") |
| + .config("spark.comet.schemaEvolution.enabled", "true") |
| + .getOrCreate(); |
| TestPartitionPruning.sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext()); |
| |
| String optionKey = String.format("fs.%s.impl", CountOpenLocalFileSystem.scheme); |
| diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionValues.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionValues.java |
| index 5c218f21c47..e48e6d121f0 100644 |
| --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionValues.java |
| +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionValues.java |
| @@ -107,7 +107,20 @@ public class TestPartitionValues { |
| |
| @BeforeAll |
| public static void startSpark() { |
| - TestPartitionValues.spark = SparkSession.builder().master("local[2]").getOrCreate(); |
| + TestPartitionValues.spark = |
| + SparkSession.builder() |
| + .master("local[2]") |
| + .config("spark.plugins", "org.apache.spark.CometPlugin") |
| + .config( |
| + "spark.shuffle.manager", |
| + "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager") |
| + .config("spark.comet.explainFallback.enabled", "true") |
| + .config("spark.sql.iceberg.parquet.reader-type", "COMET") |
| + .config("spark.memory.offHeap.enabled", "true") |
| + .config("spark.memory.offHeap.size", "10g") |
| + .config("spark.comet.use.lazyMaterialization", "false") |
| + .config("spark.comet.schemaEvolution.enabled", "true") |
| + .getOrCreate(); |
| } |
| |
| @AfterAll |
| diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java |
| index a7334a580ca..dca7fdd9c69 100644 |
| --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java |
| +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java |
| @@ -87,7 +87,20 @@ public class TestSnapshotSelection { |
| |
| @BeforeAll |
| public static void startSpark() { |
| - TestSnapshotSelection.spark = SparkSession.builder().master("local[2]").getOrCreate(); |
| + TestSnapshotSelection.spark = |
| + SparkSession.builder() |
| + .master("local[2]") |
| + .config("spark.plugins", "org.apache.spark.CometPlugin") |
| + .config( |
| + "spark.shuffle.manager", |
| + "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager") |
| + .config("spark.comet.explainFallback.enabled", "true") |
| + .config("spark.sql.iceberg.parquet.reader-type", "COMET") |
| + .config("spark.memory.offHeap.enabled", "true") |
| + .config("spark.memory.offHeap.size", "10g") |
| + .config("spark.comet.use.lazyMaterialization", "false") |
| + .config("spark.comet.schemaEvolution.enabled", "true") |
| + .getOrCreate(); |
| } |
| |
| @AfterAll |
| diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataFile.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataFile.java |
| index 182b1ef8f5a..5da4a3c90aa 100644 |
| --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataFile.java |
| +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataFile.java |
| @@ -120,7 +120,20 @@ public class TestSparkDataFile { |
| |
| @BeforeAll |
| public static void startSpark() { |
| - TestSparkDataFile.spark = SparkSession.builder().master("local[2]").getOrCreate(); |
| + TestSparkDataFile.spark = |
| + SparkSession.builder() |
| + .master("local[2]") |
| + .config("spark.plugins", "org.apache.spark.CometPlugin") |
| + .config( |
| + "spark.shuffle.manager", |
| + "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager") |
| + .config("spark.comet.explainFallback.enabled", "true") |
| + .config("spark.sql.iceberg.parquet.reader-type", "COMET") |
| + .config("spark.memory.offHeap.enabled", "true") |
| + .config("spark.memory.offHeap.size", "10g") |
| + .config("spark.comet.use.lazyMaterialization", "false") |
| + .config("spark.comet.schemaEvolution.enabled", "true") |
| + .getOrCreate(); |
| TestSparkDataFile.sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext()); |
| } |
| |
| diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java |
| index fb2b312bed9..5927a408c6b 100644 |
| --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java |
| +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java |
| @@ -96,7 +96,20 @@ public class TestSparkDataWrite { |
| |
| @BeforeAll |
| public static void startSpark() { |
| - TestSparkDataWrite.spark = SparkSession.builder().master("local[2]").getOrCreate(); |
| + TestSparkDataWrite.spark = |
| + SparkSession.builder() |
| + .master("local[2]") |
| + .config("spark.plugins", "org.apache.spark.CometPlugin") |
| + .config( |
| + "spark.shuffle.manager", |
| + "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager") |
| + .config("spark.comet.explainFallback.enabled", "true") |
| + .config("spark.sql.iceberg.parquet.reader-type", "COMET") |
| + .config("spark.memory.offHeap.enabled", "true") |
| + .config("spark.memory.offHeap.size", "10g") |
| + .config("spark.comet.use.lazyMaterialization", "false") |
| + .config("spark.comet.schemaEvolution.enabled", "true") |
| + .getOrCreate(); |
| } |
| |
| @AfterEach |
| @@ -140,7 +153,7 @@ public class TestSparkDataWrite { |
| Dataset<Row> result = spark.read().format("iceberg").load(targetLocation); |
| |
| List<SimpleRecord> actual = |
| - result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList(); |
| + result.orderBy("id", "data").as(Encoders.bean(SimpleRecord.class)).collectAsList(); |
| assertThat(actual).as("Number of rows should match").hasSameSizeAs(expected); |
| assertThat(actual).as("Result rows should match").isEqualTo(expected); |
| for (ManifestFile manifest : |
| @@ -210,7 +223,7 @@ public class TestSparkDataWrite { |
| Dataset<Row> result = spark.read().format("iceberg").load(targetLocation); |
| |
| List<SimpleRecord> actual = |
| - result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList(); |
| + result.orderBy("id", "data").as(Encoders.bean(SimpleRecord.class)).collectAsList(); |
| assertThat(actual).as("Number of rows should match").hasSameSizeAs(expected); |
| assertThat(actual).as("Result rows should match").isEqualTo(expected); |
| } |
| @@ -256,7 +269,7 @@ public class TestSparkDataWrite { |
| Dataset<Row> result = spark.read().format("iceberg").load(targetLocation); |
| |
| List<SimpleRecord> actual = |
| - result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList(); |
| + result.orderBy("id", "data").as(Encoders.bean(SimpleRecord.class)).collectAsList(); |
| assertThat(actual).as("Number of rows should match").hasSameSizeAs(expected); |
| assertThat(actual).as("Result rows should match").isEqualTo(expected); |
| } |
| @@ -309,7 +322,7 @@ public class TestSparkDataWrite { |
| Dataset<Row> result = spark.read().format("iceberg").load(targetLocation); |
| |
| List<SimpleRecord> actual = |
| - result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList(); |
| + result.orderBy("id", "data").as(Encoders.bean(SimpleRecord.class)).collectAsList(); |
| assertThat(actual).as("Number of rows should match").hasSameSizeAs(expected); |
| assertThat(actual).as("Result rows should match").isEqualTo(expected); |
| } |
| @@ -352,7 +365,7 @@ public class TestSparkDataWrite { |
| Dataset<Row> result = spark.read().format("iceberg").load(targetLocation); |
| |
| List<SimpleRecord> actual = |
| - result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList(); |
| + result.orderBy("id", "data").as(Encoders.bean(SimpleRecord.class)).collectAsList(); |
| assertThat(actual).as("Number of rows should match").hasSameSizeAs(expected); |
| assertThat(actual).as("Result rows should match").isEqualTo(expected); |
| } |
| @@ -392,7 +405,7 @@ public class TestSparkDataWrite { |
| Dataset<Row> result = spark.read().format("iceberg").load(targetLocation); |
| |
| List<SimpleRecord> actual = |
| - result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList(); |
| + result.orderBy("id", "data").as(Encoders.bean(SimpleRecord.class)).collectAsList(); |
| assertThat(actual).as("Number of rows should match").hasSameSizeAs(expected); |
| assertThat(actual).as("Result rows should match").isEqualTo(expected); |
| |
| @@ -458,7 +471,7 @@ public class TestSparkDataWrite { |
| Dataset<Row> result = spark.read().format("iceberg").load(targetLocation); |
| |
| List<SimpleRecord> actual = |
| - result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList(); |
| + result.orderBy("id", "data").as(Encoders.bean(SimpleRecord.class)).collectAsList(); |
| assertThat(actual).as("Number of rows should match").hasSameSizeAs(expected); |
| assertThat(actual).as("Result rows should match").isEqualTo(expected); |
| } |
| @@ -622,7 +635,7 @@ public class TestSparkDataWrite { |
| Dataset<Row> result = spark.read().format("iceberg").load(targetLocation); |
| |
| List<SimpleRecord> actual = |
| - result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList(); |
| + result.orderBy("id", "data").as(Encoders.bean(SimpleRecord.class)).collectAsList(); |
| assertThat(actual).as("Number of rows should match").hasSameSizeAs(expected); |
| assertThat(actual).as("Result rows should match").isEqualTo(expected); |
| |
| @@ -708,7 +721,7 @@ public class TestSparkDataWrite { |
| // Since write and commit succeeded, the rows should be readable |
| Dataset<Row> result = spark.read().format("iceberg").load(targetLocation); |
| List<SimpleRecord> actual = |
| - result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList(); |
| + result.orderBy("id", "data").as(Encoders.bean(SimpleRecord.class)).collectAsList(); |
| assertThat(actual).as("Number of rows should match").hasSize(records.size() + records2.size()); |
| assertThat(actual) |
| .describedAs("Result rows should match") |
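| |
| Note: the orderBy("id") -> orderBy("id", "data") changes above are about determinism, not correctness. SimpleRecord rows can share an id, a sort on id alone is not a total order, and the Comet scan may return tied rows in a different order than the built-in reader, so list-equality assertions could flake. A minimal standalone illustration of the tie-break (plain Java, mirroring the test bean's (id, data) shape): |
| |
| import java.util.Comparator; |
| import java.util.List; |
| |
| public class TieBreakDemo { |
|   record SimpleRecord(int id, String data) {} |
| |
|   public static void main(String[] args) { |
|     // Two runs, both legitimately "sorted by id", differing only in tie order. |
|     List<SimpleRecord> runA = List.of(new SimpleRecord(1, "a"), new SimpleRecord(1, "b")); |
|     List<SimpleRecord> runB = List.of(new SimpleRecord(1, "b"), new SimpleRecord(1, "a")); |
|     System.out.println(runA.equals(runB)); // false: id alone is ambiguous |
| |
|     // A secondary key normalizes both runs to one canonical order. |
|     Comparator<SimpleRecord> byIdThenData = |
|         Comparator.comparingInt(SimpleRecord::id).thenComparing(SimpleRecord::data); |
|     System.out.println( |
|         runA.stream().sorted(byIdThenData).toList() |
|             .equals(runB.stream().sorted(byIdThenData).toList())); // true |
|   } |
| } |
| |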
| diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReadProjection.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReadProjection.java |
| index becf6a064dc..beb7c801b05 100644 |
| --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReadProjection.java |
| +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReadProjection.java |
| @@ -83,7 +83,20 @@ public class TestSparkReadProjection extends TestReadProjection { |
| |
| @BeforeAll |
| public static void startSpark() { |
| - TestSparkReadProjection.spark = SparkSession.builder().master("local[2]").getOrCreate(); |
| + TestSparkReadProjection.spark = |
| + SparkSession.builder() |
| + .master("local[2]") |
| + .config("spark.plugins", "org.apache.spark.CometPlugin") |
| + .config( |
| + "spark.shuffle.manager", |
| + "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager") |
| + .config("spark.comet.explainFallback.enabled", "true") |
| + .config("spark.sql.iceberg.parquet.reader-type", "COMET") |
| + .config("spark.memory.offHeap.enabled", "true") |
| + .config("spark.memory.offHeap.size", "10g") |
| + .config("spark.comet.use.lazyMaterialization", "false") |
| + .config("spark.comet.schemaEvolution.enabled", "true") |
| + .getOrCreate(); |
| ImmutableMap<String, String> config = |
| ImmutableMap.of( |
| "type", "hive", |
| diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java |
| index 4f1cef5d373..0e34e08db4a 100644 |
| --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java |
| +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java |
| @@ -136,6 +136,16 @@ public class TestSparkReaderDeletes extends DeleteReadTests { |
| .config("spark.ui.liveUpdate.period", 0) |
| .config(SQLConf.PARTITION_OVERWRITE_MODE().key(), "dynamic") |
| .config("spark.hadoop." + METASTOREURIS.varname, hiveConf.get(METASTOREURIS.varname)) |
| + .config("spark.plugins", "org.apache.spark.CometPlugin") |
| + .config( |
| + "spark.shuffle.manager", |
| + "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager") |
| + .config("spark.comet.explainFallback.enabled", "true") |
| + .config("spark.sql.iceberg.parquet.reader-type", "COMET") |
| + .config("spark.memory.offHeap.enabled", "true") |
| + .config("spark.memory.offHeap.size", "10g") |
| + .config("spark.comet.use.lazyMaterialization", "false") |
| + .config("spark.comet.schemaEvolution.enabled", "true") |
| .enableHiveSupport() |
| .getOrCreate(); |
| |
| diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderWithBloomFilter.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderWithBloomFilter.java |
| index baf7fa8f88a..509c5deba51 100644 |
| --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderWithBloomFilter.java |
| +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderWithBloomFilter.java |
| @@ -182,6 +182,16 @@ public class TestSparkReaderWithBloomFilter { |
| SparkSession.builder() |
| .master("local[2]") |
| .config("spark.hadoop." + METASTOREURIS.varname, hiveConf.get(METASTOREURIS.varname)) |
| + .config("spark.plugins", "org.apache.spark.CometPlugin") |
| + .config( |
| + "spark.shuffle.manager", |
| + "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager") |
| + .config("spark.comet.explainFallback.enabled", "true") |
| + .config("spark.sql.iceberg.parquet.reader-type", "COMET") |
| + .config("spark.memory.offHeap.enabled", "true") |
| + .config("spark.memory.offHeap.size", "10g") |
| + .config("spark.comet.use.lazyMaterialization", "false") |
| + .config("spark.comet.schemaEvolution.enabled", "true") |
| .enableHiveSupport() |
| .getOrCreate(); |
| |
| diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreaming.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreaming.java |
| index c84a65cbe95..42ee74280c1 100644 |
| --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreaming.java |
| +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreaming.java |
| @@ -67,6 +67,16 @@ public class TestStructuredStreaming { |
| SparkSession.builder() |
| .master("local[2]") |
| .config("spark.sql.shuffle.partitions", 4) |
| + .config("spark.plugins", "org.apache.spark.CometPlugin") |
| + .config( |
| + "spark.shuffle.manager", |
| + "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager") |
| + .config("spark.comet.explainFallback.enabled", "true") |
| + .config("spark.sql.iceberg.parquet.reader-type", "COMET") |
| + .config("spark.memory.offHeap.enabled", "true") |
| + .config("spark.memory.offHeap.size", "10g") |
| + .config("spark.comet.use.lazyMaterialization", "false") |
| + .config("spark.comet.schemaEvolution.enabled", "true") |
| .getOrCreate(); |
| } |
| |
| diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestTimestampWithoutZone.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestTimestampWithoutZone.java |
| index 306444b9f29..0ed52fd6cfb 100644 |
| --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestTimestampWithoutZone.java |
| +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestTimestampWithoutZone.java |
| @@ -75,7 +75,20 @@ public class TestTimestampWithoutZone extends TestBase { |
| |
| @BeforeAll |
| public static void startSpark() { |
| - TestTimestampWithoutZone.spark = SparkSession.builder().master("local[2]").getOrCreate(); |
| + TestTimestampWithoutZone.spark = |
| + SparkSession.builder() |
| + .master("local[2]") |
| + .config("spark.plugins", "org.apache.spark.CometPlugin") |
| + .config( |
| + "spark.shuffle.manager", |
| + "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager") |
| + .config("spark.comet.explainFallback.enabled", "true") |
| + .config("spark.sql.iceberg.parquet.reader-type", "COMET") |
| + .config("spark.memory.offHeap.enabled", "true") |
| + .config("spark.memory.offHeap.size", "10g") |
| + .config("spark.comet.use.lazyMaterialization", "false") |
| + .config("spark.comet.schemaEvolution.enabled", "true") |
| + .getOrCreate(); |
| } |
| |
| @AfterAll |
| diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestWriteMetricsConfig.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestWriteMetricsConfig.java |
| index 841268a6be0..e290c246f7b 100644 |
| --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestWriteMetricsConfig.java |
| +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestWriteMetricsConfig.java |
| @@ -80,7 +80,20 @@ public class TestWriteMetricsConfig { |
| |
| @BeforeAll |
| public static void startSpark() { |
| - TestWriteMetricsConfig.spark = SparkSession.builder().master("local[2]").getOrCreate(); |
| + TestWriteMetricsConfig.spark = |
| + SparkSession.builder() |
| + .master("local[2]") |
| + .config("spark.plugins", "org.apache.spark.CometPlugin") |
| + .config( |
| + "spark.shuffle.manager", |
| + "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager") |
| + .config("spark.comet.explainFallback.enabled", "true") |
| + .config("spark.sql.iceberg.parquet.reader-type", "COMET") |
| + .config("spark.memory.offHeap.enabled", "true") |
| + .config("spark.memory.offHeap.size", "10g") |
| + .config("spark.comet.use.lazyMaterialization", "false") |
| + .config("spark.comet.schemaEvolution.enabled", "true") |
| + .getOrCreate(); |
| TestWriteMetricsConfig.sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); |
| } |
| |
| diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestAggregatePushDown.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestAggregatePushDown.java |
| index 6e09252704a..438bc4da575 100644 |
| --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestAggregatePushDown.java |
| +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestAggregatePushDown.java |
| @@ -60,6 +60,16 @@ public class TestAggregatePushDown extends CatalogTestBase { |
| SparkSession.builder() |
| .master("local[2]") |
| .config("spark.sql.iceberg.aggregate_pushdown", "true") |
| + .config("spark.plugins", "org.apache.spark.CometPlugin") |
| + .config( |
| + "spark.shuffle.manager", |
| + "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager") |
| + .config("spark.comet.explainFallback.enabled", "true") |
| + .config("spark.sql.iceberg.parquet.reader-type", "COMET") |
| + .config("spark.memory.offHeap.enabled", "true") |
| + .config("spark.memory.offHeap.size", "10g") |
| + .config("spark.comet.use.lazyMaterialization", "false") |
| + .config("spark.comet.schemaEvolution.enabled", "true") |
| .enableHiveSupport() |
| .getOrCreate(); |
| |
| diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestFilterPushDown.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestFilterPushDown.java |
| index 9d2ce2b388a..5e233688488 100644 |
| --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestFilterPushDown.java |
| +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestFilterPushDown.java |
| @@ -598,9 +598,7 @@ public class TestFilterPushDown extends TestBaseWithCatalog { |
| String planAsString = sparkPlan.toString().replaceAll("#(\\d+L?)", ""); |
| |
| if (sparkFilter != null) { |
| - assertThat(planAsString) |
| - .as("Post scan filter should match") |
| - .contains("Filter (" + sparkFilter + ")"); |
| + assertThat(planAsString).as("Post scan filter should match").contains("CometFilter"); |
| } else { |
| assertThat(planAsString).as("Should be no post scan filter").doesNotContain("Filter ("); |
| } |
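| |
| Note: with the Comet plugin active, the post-scan filter executes natively and no longer renders in the physical plan as Spark's Filter (<condition>) node, so the exact-expression match removed above no longer applies; the test now checks for the Comet operator name instead. Roughly (plan strings abbreviated and illustrative; exact rendering varies by Spark/Comet version): |
| |
| // Vanilla Spark (illustrative, abbreviated): |
| //   *(1) Filter (isnotnull(dep) AND (dep = 'hr')) |
| //   +- BatchScan ... |
| // With Comet, the filter runs as a native operator, so the assertion only |
| // checks that it is present rather than matching the exact condition text: |
| assertThat(planAsString).as("Post scan filter should match").contains("CometFilter"); |
| |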
| diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestStoragePartitionedJoins.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestStoragePartitionedJoins.java |
| index 6719c45ca96..2515454401a 100644 |
| --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestStoragePartitionedJoins.java |
| +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestStoragePartitionedJoins.java |
| @@ -616,7 +616,7 @@ public class TestStoragePartitionedJoins extends TestBaseWithCatalog { |
| + "FROM %s t1 " |
| + "INNER JOIN %s t2 " |
| + "ON t1.id = t2.id AND t1.%s = t2.%s " |
| - + "ORDER BY t1.id, t1.%s", |
| + + "ORDER BY t1.id, t1.%s, t1.salary", |
| sourceColumnName, |
| tableName, |
| tableName(OTHER_TABLE_NAME), |