---
title: Storm Cassandra Integration
layout: documentation
documentation: true
---

### Bolt API implementation for Apache Cassandra

This library provides core storm bolt on top of Apache Cassandra.
Provides simple DSL to map storm *Tuple* to Cassandra Query Language *Statement*.


### Configuration
The following properties may be passed to storm configuration.

| **Property name**                            | **Description** | **Default**         |
| ---------------------------------------------| ----------------| --------------------|
| **cassandra.keyspace**                       | -               |                     |
| **cassandra.nodes**                          | -               | {"localhost"}       |
| **cassandra.username**                       | -               | -                   |
| **cassandra.password**                       | -               | -                   |
| **cassandra.port**                           | -               | 9092                |
| **cassandra.output.consistencyLevel**        | -               | ONE                 |
| **cassandra.batch.size.rows**                | -               | 100                 |
| **cassandra.retryPolicy**                    | -               | DefaultRetryPolicy  |
| **cassandra.reconnectionPolicy.baseDelayMs** | -               | 100 (ms)            |
| **cassandra.reconnectionPolicy.maxDelayMs**  | -               | 60000 (ms)          |

### CassandraWriterBolt

####Static import

```java
import static org.apache.storm.cassandra.DynamicStatementBuilder.*
```

#### Insert Query Builder
##### Insert query including only the specified tuple fields.

```java

    new CassandraWriterBolt(
        async(
            simpleQuery("INSERT INTO album (title,year,performer,genre,tracks) VALUES (?, ?, ?, ?, ?);")
                .with(
                    fields("title", "year", "performer", "genre", "tracks")
                 )
            )
    );
```

##### Insert query including all tuple fields.

```java

    new CassandraWriterBolt(
        async(
            simpleQuery("INSERT INTO album (title,year,performer,genre,tracks) VALUES (?, ?, ?, ?, ?);")
                .with( all() )
            )
    );
```

##### Insert multiple queries from one input tuple.

```java
    new CassandraWriterBolt(
        async(
            simpleQuery("INSERT INTO titles_per_album (title,year,performer,genre,tracks) VALUES (?, ?, ?, ?, ?);").with(all())),
            simpleQuery("INSERT INTO titles_per_performer (title,year,performer,genre,tracks) VALUES (?, ?, ?, ?, ?);").with(all()))
        )
    );
```

##### Insert query using QueryBuilder

```java
    new CassandraWriterBolt(
        async(
            simpleQuery("INSERT INTO album (title,year,perfomer,genre,tracks) VALUES (?, ?, ?, ?, ?);")
                .with(all()))
            )
    )
```

##### Insert query with static bound query

```java
    new CassandraWriterBolt(
         async(
            boundQuery("INSERT INTO album (title,year,performer,genre,tracks) VALUES (?, ?, ?, ?, ?);")
                .bind(all());
         )
    );
```

##### Insert query with static bound query using named setters and aliases

```java
    new CassandraWriterBolt(
         async(
            boundQuery("INSERT INTO album (title,year,performer,genre,tracks) VALUES (:ti, :ye, :pe, :ge, :tr);")
                .bind(
                    field("ti"),as("title"),
                    field("ye").as("year")),
                    field("pe").as("performer")),
                    field("ge").as("genre")),
                    field("tr").as("tracks"))
                ).byNamedSetters()
         )
    );
```

##### Insert query with bound statement load from storm configuration

```java
    new CassandraWriterBolt(
         boundQuery(named("insertIntoAlbum"))
            .bind(all());
```

##### Insert query with bound statement load from tuple field

```java
    new CassandraWriterBolt(
         boundQuery(namedByField("cql"))
            .bind(all());
```

##### Insert query with batch statement

```java
    // Logged
    new CassandraWriterBolt(loggedBatch(
            simpleQuery("INSERT INTO titles_per_album (title,year,performer,genre,tracks) VALUES (?, ?, ?, ?, ?);").with(all())),
            simpleQuery("INSERT INTO titles_per_performer (title,year,performer,genre,tracks) VALUES (?, ?, ?, ?, ?);").with(all()))
        )
    );
// UnLogged
    new CassandraWriterBolt(unLoggedBatch(
            simpleQuery("INSERT INTO titles_per_album (title,year,performer,genre,tracks) VALUES (?, ?, ?, ?, ?);").with(all())),
            simpleQuery("INSERT INTO titles_per_performer (title,year,performer,genre,tracks) VALUES (?, ?, ?, ?, ?);").with(all()))
        )
    );
```

### How to handle query execution results

The interface *ExecutionResultHandler* can be used to custom how an execution result should be handle.

```java
public interface ExecutionResultHandler extends Serializable {
    void onQueryValidationException(QueryValidationException e, OutputCollector collector, Tuple tuple);

    void onReadTimeoutException(ReadTimeoutException e, OutputCollector collector, Tuple tuple);

    void onWriteTimeoutException(WriteTimeoutException e, OutputCollector collector, Tuple tuple);

    void onUnavailableException(UnavailableException e, OutputCollector collector, Tuple tuple);

    void onQuerySuccess(OutputCollector collector, Tuple tuple);
}
```

By default, the CassandraBolt fails a tuple on all Cassandra Exception (see [BaseExecutionResultHandler](https://github.com/apache/storm/tree/master/external/storm-cassandra/blob/master/src/main/java/org/apache/storm/cassandra/BaseExecutionResultHandler.java)) .

```java
    new CassandraWriterBolt(insertInto("album").values(with(all()).build())
            .withResultHandler(new MyCustomResultHandler());
```

### Declare Output fields

A CassandraBolt can declare output fields / stream output fields.
For instance, this may be used to remit a new tuple on error, or to chain queries.

```java
    new CassandraWriterBolt(insertInto("album").values(withFields(all()).build())
            .withResultHandler(new EmitOnDriverExceptionResultHandler());
            .withStreamOutputFields("stream_error", new Fields("message");

    public static class EmitOnDriverExceptionResultHandler extends BaseExecutionResultHandler {
        @Override
        protected void onDriverException(DriverException e, OutputCollector collector, Tuple tuple) {
            LOG.error("An error occurred while executing cassandra statement", e);
            collector.emit("stream_error", new Values(e.getMessage()));
            collector.ack(tuple);
        }
    }
```

### Murmur3FieldGrouping

[Murmur3StreamGrouping](https://github.com/apache/storm/tree/master/external/storm-cassandra/blob/master/src/main/java/org/apache/storm/cassandra/Murmur3StreamGrouping.java)  can be used to optimise cassandra writes.
The stream is partitioned among the bolt's tasks based on the specified row partition keys.

```java
CassandraWriterBolt bolt = new CassandraWriterBolt(
    insertInto("album")
        .values(
            with(fields("title", "year", "performer", "genre", "tracks")
            ).build());
builder.setBolt("BOLT_WRITER", bolt, 4)
        .customGrouping("spout", new Murmur3StreamGrouping("title"))
```

### Trident API support
storm-cassandra support Trident `state` API for `inserting` data into Cassandra. 

```java
        CassandraState.Options options = new CassandraState.Options(new CassandraContext());
        CQLStatementTupleMapper insertTemperatureValues = boundQuery(
                "INSERT INTO weather.temperature(weather_station_id, weather_station_name, event_time, temperature) VALUES(?, ?, ?, ?)")
                .bind(with(field("weather_station_id"), field("name").as("weather_station_name"), field("event_time").now(), field("temperature")));
        options.withCQLStatementTupleMapper(insertTemperatureValues);
        CassandraStateFactory insertValuesStateFactory =  new CassandraStateFactory(options);
        TridentState selectState = topology.newStaticState(selectWeatherStationStateFactory);
        stream = stream.stateQuery(selectState, new Fields("weather_station_id"), new CassandraQuery(), new Fields("name"));
        stream = stream.each(new Fields("name"), new PrintFunction(), new Fields("name_x"));
        stream.partitionPersist(insertValuesStateFactory, new Fields("weather_station_id", "name", "event_time", "temperature"), new CassandraStateUpdater(), new Fields());
```

Below `state` API for `querying` data from Cassandra.

```java
        CassandraState.Options options = new CassandraState.Options(new CassandraContext());
        CQLStatementTupleMapper insertTemperatureValues = boundQuery("SELECT name FROM weather.station WHERE id = ?")
                 .bind(with(field("weather_station_id").as("id")));
        options.withCQLStatementTupleMapper(insertTemperatureValues);
        options.withCQLResultSetValuesMapper(new TridentResultSetValuesMapper(new Fields("name")));
        CassandraStateFactory selectWeatherStationStateFactory =  new CassandraStateFactory(options);
        CassandraStateFactory selectWeatherStationStateFactory = getSelectWeatherStationStateFactory();
        TridentState selectState = topology.newStaticState(selectWeatherStationStateFactory);
        stream = stream.stateQuery(selectState, new Fields("weather_station_id"), new CassandraQuery(), new Fields("name"));         
```
