blob: d6d8c5f26125cefa75ec064f12e76bb054d578fe [file] [log] [blame] [view]
# Documentation
## PySpark with Data Frames
With the inclusion of the Cassandra Data Source, PySpark can now be used with the Connector to
access Cassandra data. This does not require DataStax Enterprise but you are limited to DataFrame
only operations.
### Setup
To enable Cassandra access the Spark Cassandra Connector assembly jar must be included on both the
driver and executor classpath for the PySpark Java Gateway. This can be done by starting the PySpark
shell similarly to how the spark shell is started. The preferred method is now to use the maven artifact.
```bash
./bin/pyspark \
--packages com.datastax.spark:spark-cassandra-connector_2.12:3.5.1 \
--conf spark.sql.extensions=com.datastax.spark.connector.CassandraSparkExtensions
```
### Catalogs
Spark allows you to manipulate external data with and without a Catalog.
For a short intro and more details about Catalogs see [Quick Start](0_quick_start.md) and
[Data Frames](14_data_frames.md).
#### Loading a DataFrame
Loading a data set with DatasourceV2 requires creating a Catalog Reference to your Cassandra Cluster.
```python
spark.conf.set("spark.sql.catalog.myCatalog", "com.datastax.spark.connector.datasource.CassandraCatalog")
spark.read.table("myCatalog.myKs.myTab").show()
```
#### Saving a DataFrame to Cassandra
A DataFrame can be saved to an *existing* Cassandra table by using the the `saveAsTable` method with a catalog, keyspace
and a table name specified.
```python
spark.range(1, 10)\
.selectExpr("id as k")\
.write\
.mode("append")\
.partitionBy("k")\
.saveAsTable("myCatalog.myKs.myTab")
```
### Manipulating data without a Catalog
#### Loading a DataFrame
A DataFrame can be created which links to Cassandra by using the the `org.apache.spark.sql.cassandra`
source and by specifying keyword arguments for `keyspace` and `table`.
#### Example Loading a Cassandra Table as a Pyspark DataFrame
```python
spark.read\
.format("org.apache.spark.sql.cassandra")\
.options(table="kv", keyspace="test")\
.load().show()
```
```
+-+-+
|k|v|
+-+-+
|5|5|
|1|1|
|2|2|
|4|4|
|3|3|
+-+-+
```
#### Saving a DataFrame to Cassandra
A DataFrame can be saved to an *existing* Cassandra table by using the the `org.apache.spark.sql.cassandra` source and by specifying keyword arguments for `keyspace` and `table` and saving mode (`append`, `overwrite`, `error` or `ignore`, see [Data Sources API doc](https://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html#save-modes)).
##### Example Saving to a Cassandra Table as a Pyspark DataFrame
```python
df.write\
.format("org.apache.spark.sql.cassandra")\
.mode('append')\
.options(table="kv", keyspace="test")\
.save()
```
The options and parameters are identical to the Scala Data Frames Api so
please see [Data Frames](14_data_frames.md) for more information.
### Passing options with periods to the DataFrameReader
Python does not support using periods(".") in variable names. This makes it
slightly more difficult to pass SCC options to the DataFrameReader. The `options`
function takes `kwargs**` which means you can't directly pass in keys. There is a
workaround though. Python allows you to pass a dictionary as a representation of kwargs and dictionaries
can have keys with periods.
#### Example of using a dictionary as kwargs
load_options = { "table": "kv", "keyspace": "test", "spark.cassandra.input.split.size_in_mb": "10"}
spark.read.format("org.apache.spark.sql.cassandra").options(**load_options).load().show()
[Next - Spark Partitioners](16_partitioning.md)