Note: Iceberg integration is a work in progress. It is currently necessary to build Iceberg from source rather than using the artifacts available in Maven.
Run a Maven install so that we can compile Iceberg against the latest Comet:
```shell
mvn install -DskipTests
```
Build the release JAR to be used from Spark:
```shell
make release
```
Set the `COMET_JAR` environment variable:
```shell
export COMET_JAR=`pwd`/spark/target/comet-spark-spark3.5_2.12-0.9.0.jar
```
Clone the Iceberg repository:

```shell
git clone git@github.com:apache/iceberg.git
```
It will be necessary to make some small changes to Iceberg:
- Update Iceberg's Comet dependency to version 0.9.0 (the version installed into the local Maven repository above)
- Replace `import org.apache.comet.shaded.arrow.c.CometSchemaImporter;` with `import org.apache.comet.CometSchemaImporter;`
- Modify `SparkBatchQueryScan` so that it implements the `SupportsComet` interface (see the sketch after this list)
- Comment out the following Parquet relocations in the Gradle build:

```
// relocate 'org.apache.parquet', 'org.apache.iceberg.shaded.org.apache.parquet'
// relocate 'shaded.parquet', 'org.apache.iceberg.shaded.org.apache.parquet.shaded'
```
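The `SupportsComet` change amounts to the scan advertising itself as readable by Comet's native Parquet reader. Below is a minimal Scala sketch of the shape of that change only; the real `SparkBatchQueryScan` is a Java class with many members, and the single `isCometEnabled` method is an assumption about the interface, so check the `SupportsComet` source shipped with Comet for the actual signature:

```scala
// Illustrative stand-in for Comet's SupportsComet marker interface; the real
// interface ships with Comet and its exact shape may differ.
trait SupportsComet {
  def isCometEnabled: Boolean
}

// The gist of the Iceberg change: the scan also implements SupportsComet so
// that Comet's planner rules know they may replace it with a native scan.
class SparkBatchQueryScanSketch extends SupportsComet {
  override def isCometEnabled: Boolean = true
}
```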
Perform a clean build:

```shell
./gradlew clean
./gradlew build -x test -x integrationTest
```
Set the `ICEBERG_JAR` environment variable:

```shell
export ICEBERG_JAR=`pwd`/spark/v3.5/spark-runtime/build/libs/iceberg-spark-runtime-3.5_2.12-1.10.0-SNAPSHOT.jar
```
Launch Spark Shell:
```shell
$SPARK_HOME/bin/spark-shell \
    --jars $COMET_JAR,$ICEBERG_JAR \
    --conf spark.driver.extraClassPath=$COMET_JAR:$ICEBERG_JAR \
    --conf spark.executor.extraClassPath=$COMET_JAR:$ICEBERG_JAR \
    --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
    --conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkCatalog \
    --conf spark.sql.catalog.spark_catalog.type=hadoop \
    --conf spark.sql.catalog.spark_catalog.warehouse=/tmp/warehouse \
    --conf spark.plugins=org.apache.spark.CometPlugin \
    --conf spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager \
    --conf spark.sql.iceberg.parquet.reader-type=COMET \
    --conf spark.comet.explainFallback.enabled=true \
    --conf spark.memory.offHeap.enabled=true \
    --conf spark.memory.offHeap.size=16g
```
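Once the shell is up, you can sanity-check that the key settings took effect. This is a quick check using standard Spark APIs and assumes the shell was launched with the flags above:

```scala
// Verify the Comet plugin and Iceberg reader type made it into the session
spark.sparkContext.getConf.get("spark.plugins")                         // org.apache.spark.CometPlugin
spark.sparkContext.getConf.get("spark.sql.iceberg.parquet.reader-type") // COMET
```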
Create an Iceberg table. Note that Comet will not accelerate this part.
scala> spark.sql(s"CREATE TABLE IF NOT EXISTS t1 (c0 INT, c1 STRING) USING iceberg")
scala> spark.sql(s"INSERT INTO t1 VALUES ${(0 until 10000).map(i => (i, i)).mkString(",")}")
Comet should now be able to accelerate reading the table:
scala> spark.sql(s"SELECT * from t1").show()
This should produce the following output:
scala> spark.sql(s"SELECT * from t1").show() 25/04/28 07:29:37 INFO core/src/lib.rs: Comet native library version 0.9.0 initialized 25/04/28 07:29:37 WARN CometSparkSessionExtensions$CometExecRule: Comet cannot execute some parts of this plan natively (set spark.comet.explainFallback.enabled=false to disable this logging): CollectLimit +- Project [COMET: toprettystring is not supported] +- CometScanWrapper +---+---+ | c0| c1| +---+---+ | 0| 0| | 1| 1| | 2| 2| | 3| 3| | 4| 4| | 5| 5| | 6| 6| | 7| 7| | 8| 8| | 9| 9| | 10| 10| | 11| 11| | 12| 12| | 13| 13| | 14| 14| | 15| 15| | 16| 16| | 17| 17| | 18| 18| | 19| 19| +---+---+ only showing top 20 rows