<!---
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

# Supported Spark Data Sources

## File Formats

### Parquet

When `spark.comet.scan.enabled` is enabled, Parquet scans will be performed natively by Comet if all data types
in the schema are supported. When this option is not enabled, the scan will fall back to Spark. In this case,
enabling `spark.comet.convert.parquet.enabled` will immediately convert the data into Arrow format, allowing native
execution to happen after that, but the process may not be efficient.
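
For example, both options can be passed to a Comet-enabled `spark-shell` or `spark-submit` invocation (a sketch; the rest of the Comet setup is assumed to follow the [installation guide](installation.md)):

```shell
# Native Parquet scan; the scan falls back to Spark if a data type is unsupported
--conf spark.comet.scan.enabled=true

# When the scan does fall back to Spark, convert the data it reads into Arrow
# so that downstream operators can still execute natively
--conf spark.comet.convert.parquet.enabled=true
```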

### Apache Iceberg

Comet accelerates Iceberg scans of Parquet files. See the [Iceberg Guide] for more information.

[Iceberg Guide]: iceberg.md

### CSV

Comet provides experimental native CSV scan support. When `spark.comet.scan.csv.v2.enabled` is enabled, CSV files
are read natively for improved performance. This feature is experimental and performance benefits are
workload-dependent.

Alternatively, when `spark.comet.convert.csv.enabled` is enabled, data from Spark's CSV reader is immediately
converted into Arrow format, allowing native execution to happen after that.
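
For example, either option can be added to a Comet-enabled session (a sketch; only one of the two is typically needed):

```shell
# Experimental native CSV scan
--conf spark.comet.scan.csv.v2.enabled=true

# Alternatively: read with Spark's CSV reader and convert the result to Arrow
--conf spark.comet.convert.csv.enabled=true
```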

### JSON

Comet does not provide a native JSON scan, but when `spark.comet.convert.json.enabled` is enabled, data is immediately
converted into Arrow format, allowing native execution to happen after that.
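
For example:

```shell
# Read with Spark's JSON reader and convert the result to Arrow for native execution downstream
--conf spark.comet.convert.json.enabled=true
```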

## Data Catalogs

### Apache Iceberg

See the dedicated [Comet and Iceberg Guide](iceberg.md).

## Supported Storages

Comet supports most standard storage systems, such as local file systems and object storage.

### HDFS

The Apache DataFusion Comet native reader can scan files from remote HDFS for the [supported formats](#supported-spark-data-sources).

### Using experimental native DataFusion reader

Unlike the native Comet reader, the DataFusion reader fully supports processing of nested types. This reader is currently experimental.

To build Comet with the native DataFusion reader and remote HDFS support, a JDK must be installed.

Example: to build Comet for `spark-3.5`, provide the JDK path in `JAVA_HOME` and the JRE linker path in `RUSTFLAGS`.
The linker path can vary depending on the system; typically the JRE linker is part of the installed JDK.

```shell
export JAVA_HOME="/opt/homebrew/opt/openjdk@11"
make release PROFILES="-Pspark-3.5" COMET_FEATURES=hdfs RUSTFLAGS="-L $JAVA_HOME/libexec/openjdk.jdk/Contents/Home/lib/server"
```

Start Comet with the experimental reader and HDFS support as [described](installation.md#run-spark-shell-with-comet-enabled)
and add the following additional parameters:

```shell
--conf spark.comet.scan.impl=native_datafusion \
--conf spark.hadoop.fs.defaultFS="hdfs://namenode:9000" \
--conf spark.hadoop.dfs.client.use.datanode.hostname=true \
--conf dfs.client.use.datanode.hostname=true
```

Query a struct type from remote HDFS:

```shell
spark.read.parquet("hdfs://namenode:9000/user/data").show(false)

root
 |-- id: integer (nullable = true)
 |-- first_name: string (nullable = true)
 |-- personal_info: struct (nullable = true)
 |    |-- firstName: string (nullable = true)
 |    |-- lastName: string (nullable = true)
 |    |-- ageInYears: integer (nullable = true)

25/01/30 16:50:43 INFO core/src/lib.rs: Comet native library version $COMET_VERSION initialized
== Physical Plan ==
* CometColumnarToRow (2)
+- CometNativeScan: (1)


(1) CometNativeScan:
Output [3]: [id#0, first_name#1, personal_info#4]
Arguments: [id#0, first_name#1, personal_info#4]

(2) CometColumnarToRow [codegen id : 1]
Input [3]: [id#0, first_name#1, personal_info#4]


25/01/30 16:50:44 INFO fs-hdfs-0.1.12/src/hdfs.rs: Connecting to Namenode (hdfs://namenode:9000)
+---+----------+-----------------+
|id |first_name|personal_info    |
+---+----------+-----------------+
|2  |Jane      |{Jane, Smith, 34}|
|1  |John      |{John, Doe, 28}  |
+---+----------+-----------------+
```

Verify that the scan node in the plan is `CometNativeScan`.

More details are available in the [HDFS Reader README](https://github.com/apache/datafusion-comet/blob/main/native/hdfs/README.md).

### Local HDFS development

- Configure the local machine network by adding the hostnames to `/etc/hosts`:

```shell
127.0.0.1 localhost namenode datanode1 datanode2 datanode3
::1 localhost namenode datanode1 datanode2 datanode3
```

- Start a local HDFS cluster with 3 datanodes; the namenode URL is `namenode:9000`:

```shell
docker compose -f kube/local/hdfs-docker-compose.yml up
```

- Check that the local namenode is up and running at `http://localhost:9870/dfshealth.html#tab-overview`
- Build the project with HDFS support:

```shell
JAVA_HOME="/opt/homebrew/opt/openjdk@11" make release PROFILES="-Pspark-3.5" COMET_FEATURES=hdfs RUSTFLAGS="-L /opt/homebrew/opt/openjdk@11/libexec/openjdk.jdk/Contents/Home/lib/server"
```

- Run a local test:

```scala
withSQLConf(
  CometConf.COMET_ENABLED.key -> "true",
  CometConf.COMET_EXEC_ENABLED.key -> "true",
  CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_DATAFUSION,
  SQLConf.USE_V1_SOURCE_LIST.key -> "parquet",
  "fs.defaultFS" -> "hdfs://namenode:9000",
  "dfs.client.use.datanode.hostname" -> "true") {
  val df = spark.read.parquet("/tmp/2")
  df.show(false)
  df.explain("extended")
}
```

Or use `spark-shell` with HDFS support as described [above](#using-experimental-native-datafusion-reader).

### S3

#### Root CA Certificates

One major difference between Spark and Comet is the mechanism for discovering Root CA Certificates. Spark uses the
JVM to read CA Certificates from the Java Trust Store, but native Comet scans use the system Root CA Certificates
(typically stored in `/etc/ssl/certs` on Linux). Native scans will not be able to interact with S3 if the system
Root CA Certificates are not installed.
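
For example, on a Debian or Ubuntu based image (an illustrative sketch; the package name and commands are distribution-specific), the system certificates can typically be installed with:

```shell
# Install and refresh the system Root CA certificate bundle (Debian/Ubuntu example)
apt-get update && apt-get install -y ca-certificates
update-ca-certificates
```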

#### Supported Credential Providers

AWS credential providers can be configured using the `fs.s3a.aws.credentials.provider` configuration. The following table shows the supported credential providers and their configuration options:

| Credential provider | Description | Supported Options |
| --- | --- | --- |
| `org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider` | Access S3 using access key and secret key | `fs.s3a.access.key`, `fs.s3a.secret.key` |
| `org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider` | Access S3 using temporary credentials | `fs.s3a.access.key`, `fs.s3a.secret.key`, `fs.s3a.session.token` |
| `org.apache.hadoop.fs.s3a.auth.AssumedRoleCredentialProvider` | Access S3 using AWS STS assume role | `fs.s3a.assumed.role.arn`, `fs.s3a.assumed.role.session.name` (optional), `fs.s3a.assumed.role.credentials.provider` (optional) |
| `org.apache.hadoop.fs.s3a.auth.IAMInstanceCredentialsProvider` | Access S3 using EC2 instance profile or ECS task credentials (tries ECS first, then IMDS) | None (auto-detected) |
| `org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider`<br/>`com.amazonaws.auth.AnonymousAWSCredentials`<br/>`software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider` | Access S3 without authentication (public buckets only) | None |
| `com.amazonaws.auth.EnvironmentVariableCredentialsProvider`<br/>`software.amazon.awssdk.auth.credentials.EnvironmentVariableCredentialsProvider` | Load credentials from environment variables (`AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`, `AWS_SESSION_TOKEN`) | None |
| `com.amazonaws.auth.InstanceProfileCredentialsProvider`<br/>`software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider` | Access S3 using EC2 instance metadata service (IMDS) | None |
| `com.amazonaws.auth.ContainerCredentialsProvider`<br/>`software.amazon.awssdk.auth.credentials.ContainerCredentialsProvider`<br/>`com.amazonaws.auth.EC2ContainerCredentialsProviderWrapper` | Access S3 using ECS task credentials | None |
| `com.amazonaws.auth.WebIdentityTokenCredentialsProvider`<br/>`software.amazon.awssdk.auth.credentials.WebIdentityTokenFileCredentialsProvider` | Authenticate using web identity token file | None |

Multiple credential providers can be specified in a comma-separated list using the `fs.s3a.aws.credentials.provider` configuration, just as Hadoop AWS supports. If `fs.s3a.aws.credentials.provider` is not configured, Hadoop S3A's default credential provider chain will be used. All configuration options also support bucket-specific overrides using the pattern `fs.s3a.bucket.{bucket-name}.{option}`.
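
For example (an illustrative sketch; the bucket name and key values are placeholders, and the `spark.hadoop.` prefix is the usual way to pass Hadoop options through Spark), temporary credentials can be configured globally while one public bucket uses anonymous access:

```shell
# Session-wide temporary credentials
--conf spark.hadoop.fs.s3a.aws.credentials.provider=org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider \
--conf spark.hadoop.fs.s3a.access.key=<access-key> \
--conf spark.hadoop.fs.s3a.secret.key=<secret-key> \
--conf spark.hadoop.fs.s3a.session.token=<session-token> \
# Bucket-specific override for a hypothetical public bucket
--conf spark.hadoop.fs.s3a.bucket.my-public-bucket.aws.credentials.provider=org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider
```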