blob: 135f44ea05c744d8d73b29248aa96988b06d6880 [file] [log] [blame]
[[cql-component]]
= Cassandra CQL Component
:page-source: components/camel-cassandraql/src/main/docs/cql-component.adoc
*Available as of Camel version 2.15*
http://cassandra.apache.org[Apache Cassandra] is an open source NoSQL
database designed to handle large amounts on commodity hardware. Like
Amazon's DynamoDB, Cassandra has a peer-to-peer and master-less
architecture to avoid single point of failure and garanty high
availability. Like Google's BigTable, Cassandra data is structured using
column families which can be accessed through the Thrift RPC API or a
SQL-like API called CQL.
This component aims at integrating Cassandra 2.0+ using the CQL3 API
(not the Thrift API). It's based on
https://github.com/datastax/java-driver[Cassandra Java Driver] provided
by DataStax.
Maven users will need to add the following dependency to their
`pom.xml`:
*pom.xml*
[source,xml]
------------------------------------------------------------
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-cassandraql</artifactId>
<version>x.y.z</version>
<!-- use the same version as your Camel core version -->
</dependency>
------------------------------------------------------------
== URI format
The endpoint can initiate the Cassandra connection or use an existing
one.
[cols="<,<",options="header",]
|======================================================================
|URI |Description
|`cql:localhost/keyspace` |Single host, default port, usual for testing
|`cql:host1,host2/keyspace` |Multi host, default port
|`cql:host1,host2:9042/keyspace` |Multi host, custom port
|`cql:host1,host2` |Default port and keyspace
|`cql:bean:sessionRef` |Provided Session reference
|`cql:bean:clusterRef/keyspace` |Provided Cluster reference
|======================================================================
To fine tune the Cassandra connection (SSL options, pooling options,
load balancing policy, retry policy, reconnection policy...), create
your own Cluster instance and give it to the Camel endpoint.
== Cassandra Options
// component options: START
The Cassandra CQL component supports 1 options, which are listed below.
[width="100%",cols="2,5,^1,2",options="header"]
|===
| Name | Description | Default | Type
| *basicPropertyBinding* (advanced) | Whether the component should use basic property binding (Camel 2.x) or the newer property binding with additional capabilities | false | boolean
|===
// component options: END
// endpoint options: START
The Cassandra CQL endpoint is configured using URI syntax:
----
cql:beanRef:hosts:port/keyspace
----
with the following path and query parameters:
=== Path Parameters (4 parameters):
[width="100%",cols="2,5,^1,2",options="header"]
|===
| Name | Description | Default | Type
| *beanRef* | beanRef is defined using bean:id | | String
| *hosts* | Hostname(s) cassansdra server(s). Multiple hosts can be separated by comma. | | String
| *port* | Port number of cassansdra server(s) | | Integer
| *keyspace* | Keyspace to use | | String
|===
=== Query Parameters (32 parameters):
[width="100%",cols="2,5,^1,2",options="header"]
|===
| Name | Description | Default | Type
| *cluster* (common) | To use the Cluster instance (you would normally not use this option) | | Cluster
| *clusterName* (common) | Cluster name | | String
| *consistencyLevel* (common) | Consistency level to use | | ConsistencyLevel
| *cql* (common) | CQL query to perform. Can be overridden with the message header with key CamelCqlQuery. | | String
| *loadBalancingPolicy* (common) | To use a specific LoadBalancingPolicy | | String
| *password* (common) | Password for session authentication | | String
| *prepareStatements* (common) | Whether to use PreparedStatements or regular Statements | true | boolean
| *resultSetConversionStrategy* (common) | To use a custom class that implements logic for converting ResultSet into message body ALL, ONE, LIMIT_10, LIMIT_100... | | ResultSetConversionStrategy
| *session* (common) | To use the Session instance (you would normally not use this option) | | Session
| *username* (common) | Username for session authentication | | String
| *bridgeErrorHandler* (consumer) | Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions occurred while the consumer is trying to pickup incoming messages, or the likes, will now be processed as a message and handled by the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions, that will be logged at WARN or ERROR level and ignored. | false | boolean
| *sendEmptyMessageWhenIdle* (consumer) | If the polling consumer did not poll any files, you can enable this option to send an empty message (no body) instead. | false | boolean
| *exceptionHandler* (consumer) | To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this option is not in use. By default the consumer will deal with exceptions, that will be logged at WARN or ERROR level and ignored. | | ExceptionHandler
| *exchangePattern* (consumer) | Sets the exchange pattern when the consumer creates an exchange. | | ExchangePattern
| *pollStrategy* (consumer) | A pluggable org.apache.camel.PollingConsumerPollingStrategy allowing you to provide your custom implementation to control error handling usually occurred during the poll operation before an Exchange have been created and being routed in Camel. | | PollingConsumerPollStrategy
| *lazyStartProducer* (producer) | Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a producer may otherwise fail during starting and cause the route to fail being started. By deferring this startup to be lazy then the startup failure can be handled during routing messages via Camel's routing error handlers. Beware that when the first message is processed then creating and starting the producer may take a little time and prolong the total processing time of the processing. | false | boolean
| *basicPropertyBinding* (advanced) | Whether the endpoint should use basic property binding (Camel 2.x) or the newer property binding with additional capabilities | false | boolean
| *synchronous* (advanced) | Sets whether synchronous processing should be strictly used, or Camel is allowed to use asynchronous processing (if supported). | false | boolean
| *backoffErrorThreshold* (scheduler) | The number of subsequent error polls (failed due some error) that should happen before the backoffMultipler should kick-in. | | int
| *backoffIdleThreshold* (scheduler) | The number of subsequent idle polls that should happen before the backoffMultipler should kick-in. | | int
| *backoffMultiplier* (scheduler) | To let the scheduled polling consumer backoff if there has been a number of subsequent idles/errors in a row. The multiplier is then the number of polls that will be skipped before the next actual attempt is happening again. When this option is in use then backoffIdleThreshold and/or backoffErrorThreshold must also be configured. | | int
| *delay* (scheduler) | Milliseconds before the next poll. You can also specify time values using units, such as 60s (60 seconds), 5m30s (5 minutes and 30 seconds), and 1h (1 hour). | 500 | long
| *greedy* (scheduler) | If greedy is enabled, then the ScheduledPollConsumer will run immediately again, if the previous run polled 1 or more messages. | false | boolean
| *initialDelay* (scheduler) | Milliseconds before the first poll starts. You can also specify time values using units, such as 60s (60 seconds), 5m30s (5 minutes and 30 seconds), and 1h (1 hour). | 1000 | long
| *repeatCount* (scheduler) | Specifies a maximum limit of number of fires. So if you set it to 1, the scheduler will only fire once. If you set it to 5, it will only fire five times. A value of zero or negative means fire forever. | 0 | long
| *runLoggingLevel* (scheduler) | The consumer logs a start/complete log line when it polls. This option allows you to configure the logging level for that. | TRACE | LoggingLevel
| *scheduledExecutorService* (scheduler) | Allows for configuring a custom/shared thread pool to use for the consumer. By default each consumer has its own single threaded thread pool. | | ScheduledExecutorService
| *scheduler* (scheduler) | To use a cron scheduler from either camel-spring or camel-quartz component | none | String
| *schedulerProperties* (scheduler) | To configure additional properties when using a custom scheduler or any of the Quartz, Spring based scheduler. | | Map
| *startScheduler* (scheduler) | Whether the scheduler should be auto started. | true | boolean
| *timeUnit* (scheduler) | Time unit for initialDelay and delay options. | MILLISECONDS | TimeUnit
| *useFixedDelay* (scheduler) | Controls if fixed delay or fixed rate is used. See ScheduledExecutorService in JDK for details. | true | boolean
|===
// endpoint options: END
// spring-boot-auto-configure options: START
== Spring Boot Auto-Configuration
When using Spring Boot make sure to use the following Maven dependency to have support for auto configuration:
[source,xml]
----
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-cassandraql-starter</artifactId>
<version>x.x.x</version>
<!-- use the same version as your Camel core version -->
</dependency>
----
The component supports 2 options, which are listed below.
[width="100%",cols="2,5,^1,2",options="header"]
|===
| Name | Description | Default | Type
| *camel.component.cql.basic-property-binding* | Whether the component should use basic property binding (Camel 2.x) or the newer property binding with additional capabilities | false | Boolean
| *camel.component.cql.enabled* | Enable cql component | true | Boolean
|===
// spring-boot-auto-configure options: END
== Messages
=== Incoming Message
The Camel Cassandra endpoint expects a bunch of simple objects (`Object`
or `Object[]` or `Collection<Object>`) which will be bound to the CQL
statement as query parameters. If message body is null or empty, then
CQL query will be executed without binding parameters.
Headers:
* `CamelCqlQuery` (optional, `String` or `RegularStatement`): CQL query
either as a plain String or built using the `QueryBuilder`.
=== Outgoing Message
The Camel Cassandra endpoint produces one or many a Cassandra Row
objects depending on the `resultSetConversionStrategy`:
* `List<Row>` if `resultSetConversionStrategy` is `ALL` or
`LIMIT_[0-9]+`
* Single` Row` if `resultSetConversionStrategy` is `ONE`
* Anything else, if `resultSetConversionStrategy` is a custom
implementation of the `ResultSetConversionStrategy`
== Repositories
Cassandra can be used to store message keys or messages for the
idempotent and aggregation EIP.
Cassandra might not be the best tool for queuing use cases yet, read
http://www.datastax.com/dev/blog/cassandra-anti-patterns-queues-and-queue-like-datasets[Cassandra
anti-patterns queues and queue like datasets]. It's advised to use
LeveledCompaction and a small GC grace setting for these tables to allow
tombstoned rows to be removed quickly.
== Idempotent repository
The `NamedCassandraIdempotentRepository` stores messages keys in a
Cassandra table like this:
*CAMEL_IDEMPOTENT.cql*
[source,sql]
---------------------------------------------------------
CREATE TABLE CAMEL_IDEMPOTENT (
NAME varchar, -- Repository name
KEY varchar, -- Message key
PRIMARY KEY (NAME, KEY)
) WITH compaction = {'class':'LeveledCompactionStrategy'}
AND gc_grace_seconds = 86400;
---------------------------------------------------------
This repository implementation uses lightweight transactions (also known
as Compare and Set) and requires Cassandra 2.0.7+.
Alternatively, the `CassandraIdempotentRepository` does not have a
`NAME` column and can be extended to use a different data model.
[width="100%",cols="<34%,<33%,<33%",options="header",]
|=======================================================================
|Option |Default |Description
|`table` |`CAMEL_IDEMPOTENT` |Table name
|`pkColumns` |`NAME`,` KEY` |Primary key columns
|`name` | | Repository name, value used for `NAME` column
|`ttl` | | Key time to live
|`writeConsistencyLevel` | | Consistency level used to insert/delete key: `ANY`, `ONE`, `TWO`,
`QUORUM`, `LOCAL_QUORUM`…
|`readConsistencyLevel` | | Consistency level used to read/check key: `ONE`, `TWO`, `QUORUM`,
`LOCAL_QUORUM`…
|=======================================================================
== Aggregation repository
The `NamedCassandraAggregationRepository` stores exchanges by
correlation key in a Cassandra table like this:
*CAMEL_AGGREGATION.cql*
[source,sql]
---------------------------------------------------------
CREATE TABLE CAMEL_AGGREGATION (
NAME varchar, -- Repository name
KEY varchar, -- Correlation id
EXCHANGE_ID varchar, -- Exchange id
EXCHANGE blob, -- Serialized exchange
PRIMARY KEY (NAME, KEY)
) WITH compaction = {'class':'LeveledCompactionStrategy'}
AND gc_grace_seconds = 86400;
---------------------------------------------------------
Alternatively, the `CassandraAggregationRepository` does not have a
`NAME` column and can be extended to use a different data model.
[width="100%",cols="<34%,<33%,<33%",options="header",]
|=======================================================================
|Option |Default |Description
|`table` |`CAMEL_AGGREGATION` |Table name
|`pkColumns` |`NAME`,`KEY` |Primary key columns
|`exchangeIdColumn` |`EXCHANGE_ID` |Exchange Id column
|`exchangeColumn` |`EXCHANGE` |Exchange content column
|`name` | | Repository name, value used for `NAME` column
|`ttl` | | Exchange time to live
|`writeConsistencyLevel` | | Consistency level used to insert/delete exchange: `ANY`, `ONE`, `TWO`,
`QUORUM`, `LOCAL_QUORUM`…
|`readConsistencyLevel` | | Consistency level used to read/check exchange: `ONE`, `TWO`, `QUORUM`,
`LOCAL_QUORUM`…
|=======================================================================
== Examples
To insert something on a table you can use the following code:
[source,java]
---------------------------------------------------------
String CQL = "insert into camel_user(login, first_name, last_name) values (?, ?, ?)";
from("direct:input")
.to("cql://localhost/camel_ks?cql=" + CQL);
---------------------------------------------------------
At this point you should be able to insert data by using a list as body
[source,java]
---------------------------------------------------------
Arrays.asList("davsclaus", "Claus", "Ibsen")
---------------------------------------------------------
The same approach can be used for updating or querying the table.