Supported Spark Data Sources

File Formats

Parquet

When spark.comet.scan.enabled is enabled, Comet performs Parquet scans natively, provided that all data types in the schema are supported. When this option is not enabled, the scan falls back to Spark. In that case, enabling spark.comet.convert.parquet.enabled converts the data to Arrow format immediately after the scan so that subsequent execution can run natively, although this conversion may not be efficient.
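As a minimal sketch of the two modes described above, the following shell fragment assembles the relevant options for a Spark launch. Only spark.comet.scan.enabled and spark.comet.convert.parquet.enabled come from this page; the variable names are illustrative, and the commands are printed rather than executed so the fragment makes no assumption about how Comet itself is installed.

```shell
#!/bin/sh
# Mode 1: native Parquet scan (applies when every type in the schema is supported).
NATIVE_SCAN_CONF="--conf spark.comet.scan.enabled=true"

# Mode 2: Spark performs the scan, then the data is converted to Arrow.
CONVERT_CONF="--conf spark.comet.scan.enabled=false --conf spark.comet.convert.parquet.enabled=true"

# Print the command lines instead of running them; add your usual Comet
# jar and plugin options before launching for real.
echo "spark-shell $NATIVE_SCAN_CONF"
echo "spark-shell $CONVERT_CONF"
```

Comparing the physical plans produced under each mode is a reasonable way to see which path a given query actually takes.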

Apache Iceberg

Comet accelerates Iceberg scans of Parquet files. See the Iceberg Guide for more information.

CSV

Comet provides experimental native CSV scan support. When spark.comet.scan.csv.v2.enabled is enabled, CSV files are read natively for improved performance. This feature is experimental and performance benefits are workload-dependent.

Alternatively, when spark.comet.convert.csv.enabled is enabled, data from Spark's CSV reader is immediately converted into Arrow format, allowing native execution to happen after that.

JSON

Comet does not provide a native JSON scan. However, when spark.comet.convert.json.enabled is enabled, data is immediately converted into Arrow format, allowing native execution to happen after that.
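The conversion options for CSV and JSON follow the same pattern, so they can be generated from a list of option names. This is a sketch only: the helper conf_flags is hypothetical, and the output is printed rather than passed to a real Spark launch.

```shell
#!/bin/sh
# Emit one "--conf key=true" pair for each option name given as an argument.
conf_flags() {
  for key in "$@"; do
    printf '%s %s=true ' --conf "$key"
  done
}

# Option names are taken from the CSV and JSON sections above.
FLAGS=$(conf_flags spark.comet.convert.csv.enabled spark.comet.convert.json.enabled)

# Print the command line instead of running it.
echo "spark-shell $FLAGS"
```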

Data Catalogs

Apache Iceberg

See the dedicated Comet and Iceberg Guide.

Supported Storages

Comet supports most standard storage systems, such as the local file system and object storage.

HDFS

The Apache DataFusion Comet native reader scans files from remote HDFS for supported formats.

Using experimental native DataFusion reader

Unlike the native Comet reader, the DataFusion reader fully supports processing of nested types. This reader is currently experimental.

Building Comet with the native DataFusion reader and remote HDFS support requires an installed JDK.

Example: build Comet for spark-3.5. Provide the JDK path in JAVA_HOME and the JRE linker path in RUSTFLAGS. The linker path varies by system; the JRE linker is typically part of the installed JDK.

export JAVA_HOME="/opt/homebrew/opt/openjdk@11"
make release PROFILES="-Pspark-3.5" COMET_FEATURES=hdfs RUSTFLAGS="-L $JAVA_HOME/libexec/openjdk.jdk/Contents/Home/lib/server"
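Because the linker path varies by system, a small helper can locate it instead of hard-coding it. The sketch below assumes that the directory to pass with -L is the one containing the JVM shared library (libjvm.so on Linux, libjvm.dylib on macOS); the function name find_jvm_lib_dir is illustrative and not part of the Comet build.

```shell
#!/bin/sh
# Print the first directory under the given JDK root that contains libjvm.
# find -L follows symlinks, which package managers often use for JDK paths.
find_jvm_lib_dir() {
  jdk_root="$1"
  lib=$(find -L "$jdk_root" \( -name 'libjvm.so' -o -name 'libjvm.dylib' \) 2>/dev/null | head -n 1)
  [ -n "$lib" ] || return 1
  dirname "$lib"
}

# Example: derive a RUSTFLAGS value from JAVA_HOME when it is set.
if [ -n "${JAVA_HOME:-}" ] && dir=$(find_jvm_lib_dir "$JAVA_HOME"); then
  echo "RUSTFLAGS=\"-L $dir\""
else
  echo "libjvm not found; set JAVA_HOME to a JDK installation" >&2
fi
```

The printed value can then be substituted for the hard-coded RUSTFLAGS path in the make command.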

Start Comet with the experimental reader and HDFS support as described, and add the following parameters:

--conf spark.comet.scan.impl=native_datafusion \
--conf spark.hadoop.fs.defaultFS="hdfs://namenode:9000" \
--conf spark.hadoop.dfs.client.use.datanode.hostname=true \
--conf dfs.client.use.datanode.hostname=true

Query a struct type from remote HDFS:

spark.read.parquet("hdfs://namenode:9000/user/data").show(false)

root
 |-- id: integer (nullable = true)
 |-- first_name: string (nullable = true)
 |-- personal_info: struct (nullable = true)
 |    |-- firstName: string (nullable = true)
 |    |-- lastName: string (nullable = true)
 |    |-- ageInYears: integer (nullable = true)

25/01/30 16:50:43 INFO core/src/lib.rs: Comet native library version $COMET_VERSION initialized
== Physical Plan ==
* CometColumnarToRow (2)
+- CometNativeScan:  (1)


(1) CometNativeScan:
Output [3]: [id#0, first_name#1, personal_info#4]
Arguments: [id#0, first_name#1, personal_info#4]

(2) CometColumnarToRow [codegen id : 1]
Input [3]: [id#0, first_name#1, personal_info#4]


25/01/30 16:50:44 INFO fs-hdfs-0.1.12/src/hdfs.rs: Connecting to Namenode (hdfs://namenode:9000)
+---+----------+-----------------+
|id |first_name|personal_info    |
+---+----------+-----------------+
|2  |Jane      |{Jane, Smith, 34}|
|1  |John      |{John, Doe, 28}  |
+---+----------+-----------------+



Verify that the native scan type is CometNativeScan.
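One way to automate this check is to save the explain output to a file and search it for the operator name. The sketch below assumes the plan text has already been captured; the function name uses_native_scan and the temporary file are illustrative and not part of Comet.

```shell
#!/bin/sh
# Succeed only if the saved physical plan mentions the CometNativeScan operator.
uses_native_scan() {
  grep -q 'CometNativeScan' "$1"
}

# Example using a plan excerpt like the one shown above.
plan_file=$(mktemp)
cat > "$plan_file" <<'EOF'
== Physical Plan ==
* CometColumnarToRow (2)
+- CometNativeScan:  (1)
EOF

if uses_native_scan "$plan_file"; then
  echo "native scan in use"
else
  echo "CometNativeScan not found in plan"
fi
rm -f "$plan_file"
```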

More on HDFS Reader

Local HDFS development

  • Configure the local machine's network by adding the hostnames to /etc/hosts:
127.0.0.1	localhost   namenode datanode1 datanode2 datanode3
::1             localhost namenode datanode1 datanode2 datanode3
  • Start a local HDFS cluster with 3 datanodes. The namenode URL is namenode:9000.
docker compose -f kube/local/hdfs-docker-compose.yml up
  • Check that the local namenode is up and running at http://localhost:9870/dfshealth.html#tab-overview
  • Build the project with HDFS support:
JAVA_HOME="/opt/homebrew/opt/openjdk@11" make release PROFILES="-Pspark-3.5" COMET_FEATURES=hdfs RUSTFLAGS="-L /opt/homebrew/opt/openjdk@11/libexec/openjdk.jdk/Contents/Home/lib/server"
  • Run a local test:

// Runs inside a Comet test suite, where withSQLConf and spark are in scope.
withSQLConf(
  CometConf.COMET_ENABLED.key -> "true",
  CometConf.COMET_EXEC_ENABLED.key -> "true",
  CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_DATAFUSION,
  SQLConf.USE_V1_SOURCE_LIST.key -> "parquet",
  "fs.defaultFS" -> "hdfs://namenode:9000",
  "dfs.client.use.datanode.hostname" -> "true") {
  val df = spark.read.parquet("/tmp/2")
  df.show(false)
  df.explain("extended")
}

Alternatively, use spark-shell with HDFS support as described above.

S3

Root CA Certificates

One major difference between Spark and Comet is the mechanism for discovering Root CA Certificates. Spark uses the JVM to read CA Certificates from the Java Trust Store, but native Comet scans use system Root CA Certificates (typically stored in /etc/ssl/certs on Linux). These scans will not be able to interact with S3 if the Root CA Certificates are not installed.
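A quick pre-flight check can catch a missing certificate store before a job fails. The sketch below only tests whether a directory contains at least one entry; /etc/ssl/certs is the typical Linux location named above and may differ on your system, and the function name has_root_ca_certs is illustrative.

```shell
#!/bin/sh
# Succeed if the directory exists and contains at least one entry.
has_root_ca_certs() {
  cert_dir="$1"
  [ -d "$cert_dir" ] && [ -n "$(ls -A "$cert_dir" 2>/dev/null)" ]
}

# Allow the location to be overridden through the CERT_DIR environment variable.
CERT_DIR="${CERT_DIR:-/etc/ssl/certs}"
if has_root_ca_certs "$CERT_DIR"; then
  echo "Root CA certificates found in $CERT_DIR"
else
  echo "No Root CA certificates in $CERT_DIR; native S3 scans may fail" >&2
fi
```

Running this on each executor host (or in the container image build) is one option, since the native scans read the certificates of the system they run on.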

Supported Credential Providers

AWS credential providers can be configured using the fs.s3a.aws.credentials.provider configuration. The following table shows the supported credential providers and their configuration options:

| Credential provider | Description | Supported options |
|---|---|---|
| org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider | Access S3 using access key and secret key | fs.s3a.access.key, fs.s3a.secret.key |
| org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider | Access S3 using temporary credentials | fs.s3a.access.key, fs.s3a.secret.key, fs.s3a.session.token |
| org.apache.hadoop.fs.s3a.auth.AssumedRoleCredentialProvider | Access S3 using AWS STS assume role | fs.s3a.assumed.role.arn, fs.s3a.assumed.role.session.name (optional), fs.s3a.assumed.role.credentials.provider (optional) |
| org.apache.hadoop.fs.s3a.auth.IAMInstanceCredentialsProvider | Access S3 using EC2 instance profile or ECS task credentials (tries ECS first, then IMDS) | None (auto-detected) |
| org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider, com.amazonaws.auth.AnonymousAWSCredentials, software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider | Access S3 without authentication (public buckets only) | None |
| com.amazonaws.auth.EnvironmentVariableCredentialsProvider, software.amazon.awssdk.auth.credentials.EnvironmentVariableCredentialsProvider | Load credentials from environment variables (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_SESSION_TOKEN) | None |
| com.amazonaws.auth.InstanceProfileCredentialsProvider, software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider | Access S3 using EC2 instance metadata service (IMDS) | None |
| com.amazonaws.auth.ContainerCredentialsProvider, software.amazon.awssdk.auth.credentials.ContainerCredentialsProvider, com.amazonaws.auth.EC2ContainerCredentialsProviderWrapper | Access S3 using ECS task credentials | None |
| com.amazonaws.auth.WebIdentityTokenCredentialsProvider, software.amazon.awssdk.auth.credentials.WebIdentityTokenFileCredentialsProvider | Authenticate using web identity token file | None |

Multiple credential providers can be specified in a comma-separated list using the fs.s3a.aws.credentials.provider configuration, just as Hadoop AWS supports. If fs.s3a.aws.credentials.provider is not configured, Hadoop S3A's default credential provider chain will be used. All configuration options also support bucket-specific overrides using the pattern fs.s3a.bucket.{bucket-name}.{option}.
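To make the comma-separated list and the bucket-specific pattern concrete, the sketch below builds both kinds of setting. The bucket name my-bucket and the helper bucket_key are placeholders. Prefixing the keys with spark.hadoop. is the usual way to pass Hadoop options through Spark and is an assumption about how the job is launched.

```shell
#!/bin/sh
# Build a bucket-specific key following fs.s3a.bucket.{bucket-name}.{option}.
# The option argument is the part of the key after "fs.s3a.", e.g. "access.key".
bucket_key() {
  printf 'fs.s3a.bucket.%s.%s\n' "$1" "$2"
}

# Comma-separated provider list; the class names appear in the table above.
PROVIDERS="org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider,org.apache.hadoop.fs.s3a.auth.IAMInstanceCredentialsProvider"

# Print the options instead of launching Spark.
echo "--conf spark.hadoop.fs.s3a.aws.credentials.provider=$PROVIDERS"
echo "--conf spark.hadoop.$(bucket_key my-bucket aws.credentials.provider)=org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider"
```

In this sketch the global setting lists two providers, while the override for my-bucket selects anonymous access for that bucket only.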