blob: 4ff59e21e729017313f0fd1e8bc5d0b2bf509096 [file] [log] [blame] [view]
# Documentation
## Accessing Cassandra data with CassandraRDD
This section describes how to access data from Cassandra table with Spark.
### Obtaining a Cassandra table as an `RDD`
To get a Spark RDD that represents a Cassandra table,
call the `cassandraTable` method on the `SparkContext` object.
```scala
sc.cassandraTable("keyspace name", "table name")
```
If no explicit type is given to `cassandraTable`, the result of this expression is `CassandraRDD[CassandraRow]`.
Create this keyspace and table in Cassandra using cqlsh:
```sql
CREATE KEYSPACE test WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1 };
CREATE TABLE test.words (word text PRIMARY KEY, count int);
```
Load data into the table:
```scala
INSERT INTO test.words (word, count) VALUES ('foo', 20);
INSERT INTO test.words (word, count) VALUES ('bar', 20);
```
Now you can read that table as `RDD`:
```scala
val rdd = sc.cassandraTable("test", "words")
// rdd: com.datastax.spark.connector.rdd.CassandraRDD[com.datastax.spark.connector.rdd.reader.CassandraRow] = CassandraRDD[0] at RDD at CassandraRDD.scala:41
rdd.toArray.foreach(println)
// CassandraRow{word: bar, count: 20}
// CassandraRow{word: foo, count: 20}
```
### Using emptyCassandraRDD implementation
To create an instance of `CassandraRDD` for a table which does **not** exist use the emptyCassandraRDD method.
`emptyCassandraRDD`s do not perform validation or create partitions so they can be used to represent absent
tables. To create one, either initialize a `CassandraRDD` as usual and then call `toEmptyCassandraRDD`
method on it or call `emptyCassandraTable` method on Spark context.
Example:
```scala
// validation is deferred, so it is not triggered during rdd creation
val rdd = sc.cassandraTable[SomeType]("ks", "not_existing_table")
val emptyRDD = rdd.toEmptyCassandraRDD
val emptyRDD2 = sc.emptyCassandraTable[SomeType]("ks", "not_existing_table"))
```
### Reading primitive column values
You can read columns in a Cassandra table using the get methods of the `CassandraRow` object.
The get methods access individual column values by column name or column index.
Type conversions are applied on the fly. Use `getOption` variants when you expect to receive Cassandra null values.
Continuing with the previous example, follow these steps to access individual column values.
Store the first item of the rdd in the firstRow value.
```scala
val firstRow = rdd.first
// firstRow: com.datastax.spark.connector.rdd.reader.CassandraRow = CassandraRow{word: bar, count: 20}
```
Get the number of columns and column names:
```scala
firstRow.columnNames // Stream(word, count)
firstRow.size // 2
```
Use one of `getXXX` getters to obtain a column value converted to desired type:
```scala
firstRow.getInt("count") // 20
firstRow.getLong("count") // 20L
```
Or use a generic get to query the table by passing the return type directly:
```scala
firstRow.get[Int]("count") // 20
firstRow.get[Long]("count") // 20L
firstRow.get[BigInt]("count") // BigInt(20)
firstRow.get[java.math.BigInteger]("count") // BigInteger(20)
```
### Working with nullable data
When reading potentially `null` data, use the `Option` type on the Scala side to prevent getting a `NullPointerException`.
```scala
firstRow.getIntOption("count") // Some(20)
firstRow.get[Option[Int]]("count") // Some(20)
```
### Reading collections
You can read collection columns in a Cassandra table using the `getList`, `getSet`, `getMap` or generic `get`
methods of the `CassandraRow` object. The `get` methods access
the collection column and return a corresponding Scala collection.
The generic `get` method lets you specify the precise type of the returned collection.
Assuming you set up the test keyspace earlier, follow these steps to access a Cassandra collection.
In the test keyspace, set up a collection set using cqlsh:
```sql
CREATE TABLE test.users (username text PRIMARY KEY, emails SET<text>);
INSERT INTO test.users (username, emails)
VALUES ('someone', {'someone@email.com', 's@email.com'});
```
Then in your application, retrieve the first row:
```scala
val row = sc.cassandraTable("test", "users").first
// row: com.datastax.spark.connector.rdd.reader.CassandraRow = CassandraRow{username: someone, emails: [someone@email.com, s@email.com]}
```
Query the collection set in Cassandra from Spark:
```scala
row.getList[String]("emails") // Vector(someone@email.com, s@email.com)
row.get[List[String]]("emails") // List(someone@email.com, s@email.com)
row.get[Seq[String]]("emails") // List(someone@email.com, s@email.com) :Seq[String]
row.get[IndexedSeq[String]]("emails") // Vector(someone@email.com, s@email.com) :IndexedSeq[String]
row.get[Set[String]]("emails") // Set(someone@email.com, s@email.com)
```
It is also possible to convert a collection to CQL `String` representation:
```scala
row.get[String]("emails") // "[someone@email.com, s@email.com]"
```
A `null` collection is equivalent to an empty collection, therefore you don't need to use `get[Option[...]]`
with collections.
### Reading columns of Cassandra User Defined Types
UDT column values are represented by `com.datastax.spark.connector.UDTValue` type.
The same set of getters is available on `UDTValue` as on `CassandraRow`.
Assume the following table definition:
```sql
CREATE TYPE test.address (city text, street text, number int);
CREATE TABLE test.companies (name text PRIMARY KEY, address FROZEN<address>);
```
You can read the address field of the company in the following way:
```scala
val address: UDTValue = row.getUDTValue("address")
val city = address.getString("city")
val street = address.getString("street")
val number = address.getInt("number")
```
### Data type conversions
The following table shows recommended Scala types corresponding to Cassandra column types.
| Cassandra type | Scala types
|-------------------|--------------------------------------------
| `ascii`, `text` | `String`
| `bigint` | `Long`
| `blob` | `ByteBuffer`, `Array[Byte]`
| `boolean` | `Boolean`, `Int`
| `counter` | `Long`
| `decimal` | `BigDecimal`, `java.math.BigDecimal`
| `double` | `Double`
| `float` | `Float`
| `inet` | `java.net.InetAddress`
| `int` | `Int`
| `list` | `Vector`, `List`, `Iterable`, `Seq`, `IndexedSeq`, `java.util.List`
| `map` | `Map`, `TreeMap`, `java.util.HashMap`
| `set` | `Set`, `TreeSet`, `java.util.HashSet`
| `text` | `String`
| `timestamp` | `Long`, `java.util.Date`, `java.sql.Date`, `org.joda.time.DateTime`
| `uuid` | `java.util.UUID`
| `timeuuid` | `java.util.UUID`
| `varchar` | `String`
| `varint` | `BigInt`, `java.math.BigInteger`
| `frozen<tuple<>>` | `TupleValue`, `scala.Product`, `org.apache.commons.lang3.tuple.Pair`, `org.apache.commons.lang3.tuple.Triple`
| user defined | `UDTValue`
Other conversions might work, but may cause loss of precision or may not work for all values.
All types are convertible to strings. Converting strings to numbers, dates,
addresses or UUIDs is possible as long as the string has proper
contents, defined by the CQL3 standard. Maps can be implicitly converted to/from sequences of key-value tuples.
## Accessing Cassandra with SparkSQL (since 1.1)
It is possible to query Cassandra using SparkSQL. Configure your `SparkContext` object
to use Cassandra as usual and then wrap it in a `org.apache.spark.sql.cassandra.CassandraSQLContext` object.
To execute an SQL query, call `CassandraSQLContext#sql` method.
```scala
import org.apache.spark.sql.cassandra.CassandraSQLContext
val sc: SparkContext = ...
val cc = new CassandraSQLContext(sc)
val rdd: SchemaRDD = cc.sql("SELECT * from keyspace.table WHERE ...")
```
### Refresh local Cassandra table schema cache
`CassandraSQLContext` caches Cassandra table schema locally. The cache expires in 10 minutes by default. It can be manually
refreshed by calling `cassandraSQLContext.refreshCassandraSchema()` when a Cassandra table schema changes and user can't
wait for the cache expires automatically.
## Performing Efficient Joins With Cassandra Tables (since 1.2)
### Repartitioning RDDs based on a Cassandra Table's Replication
The method `repartitionByCassandraReplica` can be used to relocate data in an RDD to match the replication strategy of
a given table and keyspace. The method will look for partition key information in the given RDD and then use those values
to determine which nodes in the Cluster would be responsible for that data. You can control the resultant number of partitions
with the parameter `partitionsPerHost`.
```scala
//CREATE TABLE test.shopping_history ( cust_id INT, date TIMESTAMP, product TEXT, quantity INT, PRIMARY KEY (cust_id, date, product));
case class CustomerID(cust_id: Int) // Defines partition key
val idsOfInterest = sc.parallelize(1 to 1000).map(CustomerID(_))
val repartitioned = idsOfInterest.repartitionByCassandraReplica("test", "shopping_history", 10)
repartitioned.partitions
//res0: Array[org.apache.spark.Partition] = Array(ReplicaPartition(0,Set(/127.0.0.1)), ...)
repartitioned.partitioner
//res1: Option[org.apache.spark.Partitioner] = Some(com.datastax.spark.connector.rdd.partitioner.ReplicaPartitioner@4484d6c2)
scala> repartitioned
//res2: com.datastax.spark.connector.rdd.partitioner.CassandraPartitionedRDD[CustomerID] = CassandraPartitionedRDD[5] at RDD at CassandraPartitionedRDD.scala:12
```
### Using joinWithCassandraTable
The connector supports using any RDD as a source of a direct join with a Cassandra Table through `joinWithCassandraTable`.
Any RDD which is writable to a Cassandra table via the `saveToCassandra` method can be used with this procedure as long
as the full partition key is specified.
`joinWithCassandraTable` utilizes the java drive to execute a single query for every partition
required by the source RDD so no un-needed data will be requested or serialized. This means a join between any RDD
and a Cassandra Table can be preformed without doing a full table scan. . When preformed
between two Cassandra Tables which share the same partition key this will *not* require movement of data between machines.
In all cases this method will use the source RDD's partitioning and placement for data locality.
`joinWithCassandraTable` is not affected by `cassandra.input.split.size_in_mb` since partitions are automatically inherited from
the source RDD. The other input properties have their normal effects.
####Join between two Cassandra Tables Sharing a Partition Key
```scala
//CREATE TABLE test.customer_info ( cust_id INT, name TEXT, address TEXT, PRIMARY KEY (cust_id));
val internalJoin = sc.cassandraTable("test","customer_info").joinWithCassandraTable("test","shopping_history")
internalJoin.toDebugString
//res4: String = (1) CassandraJoinRDD[9] at RDD at CassandraRDD.scala:14 []
internalJoin.collect
internalJoin.collect.foreach(println)
//(CassandraRow{cust_id: 3, address: Poland, name: Jacek},CassandraRow{cust_id: 3, date: 2015-03-09 13:59:25-0700, product: Guacamole, quantity: 2})
//(CassandraRow{cust_id: 0, address: West Coast, name: Russ},CassandraRow{cust_id: 0, date: 2015-03-09 13:58:14-0700, product: Scala is Fun, quantity: 1})
//(CassandraRow{cust_id: 0, address: West Coast, name: Russ},CassandraRow{cust_id: 0, date: 2015-03-09 13:59:04-0700, product: Candy, quantity: 3})
```
####Join with Generic RDD
```scala
val joinWithRDD = sc.parallelize(0 to 5).filter(_%2==0).map(CustomerID(_)).joinWithCassandraTable("test","customer_info")
joinWithRDD.collect.foreach(println)
//(CustomerID(0),CassandraRow{cust_id: 0, address: West Coast, name: Russ})
//(CustomerID(2),CassandraRow{cust_id: 2, address: Poland, name: Piotr})
```
The `repartitionByCassandraReplica` method can be used prior to calling joinWithCassandraTable to obtain data locality,
such that each spark partition will only require queries to their local node. This method can also be used with two
Cassandra Tables which have partitioned with different partition keys.
####Join with a generic RDD after repartitioning
```scala
val oddIds = sc.parallelize(0 to 5).filter(_%2==1).map(CustomerID(_))
val localQueryRDD = oddIds.repartitionByCassandraReplica("test","customer_info").joinWithCassandra("test","customer_info")
repartitionRDD.collect.foreach(println)
//(CustomerID(1),CassandraRow{cust_id: 1, address: East Coast, name: Helena})
//(CustomerID(3),CassandraRow{cust_id: 3, address: Poland, name: Jacek})
```
###Compatibility of joinWithCassandraTable and other CassandraRDD APIs
The result of a joinWithCassandraRDD is compatible with all of the standard CassandraRDD api options with one additional
function, `.on`. Use `.on(ColumnSelector)` for specifying which columns to join on. Since `.on` only applies to CassandraJoinRDDs
it must immediately follow the `joinWithCassandraTable` call.
Joining on any column or columns in
the primary key is supported as long as it can be made into a valid CQL query. This means the entire partition key must
be specified and if any clustering key is specified all previous clustering keys must be supplied as well.
####Cassandra Operations on a CassandraJoinRDD
```scala
val recentOrders = internalJoin.where("date > '2015-03-09'") // Where applied to every partition
val someOrders = internalJoin.limit(1) // Returns at most 1 CQL Row per Spark Partition
val numOrders = internalJoin.count() // Sums the total number of cql Rows
val orderQuantities = internalJoin.select("quantity") // Returns only the amount column as the right side of the join
val specifiedJoin = internalJoin.on(SomeColumns("cust_id")) // Joins on the cust_id column
val emptyJoin = internalJoin.toEmptyCassandraRDD // Makes an EmptyRDD
```
## Configuration Options for Adjusting Reads
The following options can be specified in the SparkConf object or as a jvm
-Doption to adjust the read parameters of a Cassandra table.
| Environment Variable | Controls | Default
|-------------------------------------------|------------------------------------------------------------|---------
| spark.cassandra.input.split.size_in_mb | approx amount of data to be fetched into a Spark partition | 64 MB
| spark.cassandra.input.fetch.size_in_rows | number of CQL rows fetched per driver request | 1000
| spark.cassandra.input.consistency.level | consistency level to use when reading | LOCAL_ONE
### Using Implicits for Configuration
In addition you are able to set these parameters on a per table basis by using `implicit vals`. This
allows a user to define a set of parameters in a separate object and import them into a block of
code rather than repeatedly passing the same [`ReadConf` object] (https://github.com/datastax/spark-cassandra-connector/blob/master/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/rdd/ReadConf.scala#L7-L18).
```scala
object ReadConfigurationOne {
implicit val readConf = ReadConf(100,100)
}
import ReadConfigurationOne._
val rdd = sc.cassandraTable("write_test","collections")
rdd.readConf
//com.datastax.spark.connector.rdd.ReadConf = ReadConf(100,100,LOCAL_ONE,true)
```
Or you can define them implicitly in the same block as the `cassandraTable` call
```scala
implicit val anotherConf = ReadConf(200,200)
val rddWithADifferentConf = sc.cassandraTable("write_test","collections")
rddWithADifferentConf.readConf
//com.datastax.spark.connector.rdd.ReadConf = ReadConf(200,200,LOCAL_ONE,true)
```
[Next - Server-side data selection and filtering](3_selection.md)