blob: 8410159acf63d43317aaf1fcc030abcd158d0b5a [file] [log] [blame]
= Clustering
Camel offers the following cluster related SPI:
- *Cluster Service*
+
A regular Camel service that manages cluster resources such as _views_ (see below)
- *Cluster View*
+
Represent a view of the cluster with its own set of isolated resources. As today views provide supports for:
+
* Leader Election
* Topology events like members joining/leaving the cluster
- *Cluster Member*
+
Represent a member of the cluster.
== Cluster SPI Setup
A _Cluster Service_ is just like any other camel service so set it up you only need to register your implementations
to the `CamelContext`:
[source,java]
----
MyClusterServiceImpl service = new MyClusterServiceImpl();
context.addService(service);
----
The configuration of the _Cluster Service_ depends on the implementation you have chose.
Out of the box camel provides the following implementations:
[cols="1,1,2", options="header"]
|====
|Type |Module | Class
|consul |camel-consul | org.apache.camel.component.consul.cluster.ConsulClusterService
|file |camel-file | org.apache.camel.component.file.cluster.FileLockClusterService
|infinispan |camel-infinispan | org.apache.camel.component.infinispan.cluster.InfinispanClusterService
|jgroups-raft |camel-jgroups-raft | org.apache.camel.component.jgroups.raft.cluster.JGroupsRaftClusterService
|kubernetes |camel-kubernetes | org.apache.camel.component.kubernetes.cluster.KubernetesClusterService
|zookeeper |camel-zookeeper | org.apache.camel.component.zookeeper.cluster.ZooKeeperClusterService
|====
Configuration options:
*ConsulClusterService*
[options="header", cols="15,55,15,15"]
|===
| Name | Description | Default | Type
| sessionTtl | The Consul session TTL in seconds | 60 | int
| sessionLockDelay | The Consul session lock delay in seconds | 5 | int
| sessionRefreshInterval | The Consul session refresh interval in seconds | 5 | int
| rootPath | The Consul cluster root directory path | /camel | String
|===
*FileLockClusterService*
[options="header", cols="15,55,15,15"]
|===
| Name | Description | Default | Type
| acquireLockDelay | The time to wait before starting to try to acquire the cluster lock | 1 | long
| acquireLockDelayUnit | The time unit for acquireLockDelay | SECONDS | TimeUnit
| acquireLockInterval | The time to wait between attempts to try to acquire the cluster lock | 10 | long
| acquireLockIntervalUnit | The time unit for acquireLockInterval | SECONDS | TimeUnit
| heartbeatTimeoutMultiplier | Multiplier applied to the cluster leader acquireLockInterval to determine how long followers should wait before considering the leader "stale". For example, if the leader updates its heartbeat every 2 seconds and the heartbeatTimeoutMultiplier is 3, followers will tolerate up to {@code 2s * 3 = 6s} of silence before declaring the leader unavailable | 5 | int
| rootPath | The file cluster root directory path | | String
|===
*InfinispanClusterService*
[options="header", cols="15,55,15,15"]
|===
| Name | Description | Default | Type
| lifespan | The lifespan of the cache entry for the local cluster member registered to the inventory | 30 | long
| lifespanTimeUnit | The TimeUnit of the lifespan | SECONDS | TimeUnit
|===
*JGroupsRaftClusterService*
[options="header", cols="15,55,15,15"]
|===
| Name | Description | Default | Type
| jgroupsConfig | The path to the JGroups Raft configuration | raft.xml | String
| jgroupsClusterName | The name of the cluster | jgroupsraft-master | String
| raftHandle | The RaftHandle | | org.jgroups.raft.RaftHandle
| raftId | Unique Raft id | | String
|===
*KubernetesClusterService*
[options="header", cols="15,55,15,15"]
|===
| Name | Description | Default | Type
| leaseResourceType | Kubernetes resource type used to hold the leases | LeaseResourceType.Lease | LeaseResourceType
| kubernetesResourcesNamespace | Kubernetes namespace containing the pods and the ConfigMap used for locking | | String
| kubernetesResourceName | Name of the resource used for locking (or prefix, in case multiple ones are used) | leaders | String
| groupName | Name of the lock group (or namespace according to the Camel cluster convention) within the chosen ConfigMap | | String
| podName | Name of the current pod (defaults to host name) | | String
| clusterLabels | Labels used to identify the members of the cluster | empty map | Map
| jitterFactor | A jitter factor to apply in order to prevent all pods to call Kubernetes APIs in the same instant | 1.2 | double
| leaseDurationMillis | The default duration of the lease for the [.line-through]#current# leader | 15000 | long
| renewDeadlineMillis | The deadline after which the leader must stop its services because it may have lost the leadership | 10000 | long
| retryPeriodMillis | The time between two subsequent attempts to check and acquire the leadership. It is randomized using the jitter factor | 2000 | long
|===
*ZooKeeperClusterService*
[options="header", cols="15,55,15,15"]
|===
| Name | Description | Default | Type
| nodes | The Zookeeper server hosts (multiple servers can be separated by comma) | | List
| namespace | ZooKeeper namespace. If a namespace is set here, all paths will get pre-pended with the namespace | | String
| reconnectBaseSleepTime | Initial amount of time to wait between retries | | long
| reconnectBaseSleepTimeUnit | ReconnectBaseSleepTime TimeUnit. Default is | MILLISECONDS | TimeUnit
| reconnectMaxRetries | Max number of times to retry | 3 | int
| sessionTimeout | The session timeout in milliseconds | 60000 | long
| sessionTimeoutUnit | The session timeout TimeUnit | MILLISECONDS | TimeUnit
| connectionTimeout | The connection timeout in milliseconds | 15000 | long
| connectionTimeoutUnit | The connection timeout TimeUnit | TimeUnit.MILLISECONDS | TimeUnit
| authInfoList | List of AuthInfo objects with scheme and auth | | List
| maxCloseWait | Time to wait during close to join background threads | 1000 | long
| maxCloseWaitUnit | MaxCloseWait TimeUnit | MILLISECONDS | TimeUnit
| retryPolicy | The retry policy to use. | | RetryPolicy
| basePath | The base path to store in ZooKeeper | | String
|===
Configuration examples:
- *Spring Boot*
+
[source,properties]
----
camel.cluster.file.enabled = true
camel.cluster.file.id = ${random.uuid}
camel.cluster.file.root = ${java.io.tmpdir}
----
- *Spring XML*
+
[source,xml]
----
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://camel.apache.org/schema/spring
http://camel.apache.org/schema/spring/camel-spring.xsd">
<bean id="zx" class="org.apache.camel.component.zookeeper.cluster.ZooKeeperClusterService">
<property name="id" value="node-1"/>
<property name="basePath" value="/camel/cluster"/>
<property name="nodes" value="localhost:2181"/>
</bean>
<camelContext xmlns="http://camel.apache.org/schema/spring" autoStartup="false">
...
</camelContext>
</beans>
----
== Cluster SPI Usage
The _Cluster SPI_ is leveraged by the following new implementations:
- *ClusteredRoutePolicy*
+
This is an implementation of a RoutePolicy that starts the routes it is associated to when the Cluster View it uses takes the leadership
+
[source,java]
----
context.addRoutes(new RouteBuilder {
@Override
public void configure() throws Exception {
// Create the route policy
RoutePolicy policy = ClusteredRoutePolicy.forNamespace("my-ns");
// bind the policy to one or more routes
from("timer:clustered?delay=1000&period=1000")
.routePolicy(policy)
.log("Route ${routeId} is running ...");
}
});
----
+
To apply the same policy to all the routes a dedicated _RoutePolicyFactory_ can be used:
+
[source,java]
----
// add the clustered route policy factory to context
context.addRoutePolicyFactory(ClusteredRoutePolicyFactory.forNamespace("my-ns"));
context.addRoutes(new RouteBuilder {
@Override
public void configure() throws Exception {
// bind the policy to one or more routes
from("timer:clustered?delay=1000&period=1000")
.log("Route ${routeId} is running ...");
}
});
----
- *ClusteredRouteController*
+
This is an implementation of the _RouteController SPI_ that lets the camel context start then starts/stops the routes when the leadership is taken/lost. This is well integrated with spring-boot apps so assuming you have your routes set-up like:
+
[source,java]
----
@Bean
public RouteBuilder routeBuilder() {
return new RouteBuilder() {
@Override
public void configure() throws Exception {
from("timer:heartbeat?period=10000")
.routeId("heartbeat")
.log("HeartBeat route (timer) ...");
from("timer:clustered?period=5000")
.routeId("clustered")
.log("Clustered route (timer) ...");
}
};
}
----
+
You can then leverage Spring Boot configuration to make them clustered:
+
[source,properties]
----
# enable the route controller
camel.clustered.controller.enabled = true
# define the default namespace for routes
camel.clustered.controller.namespace = my-ns
# exclude the route with id 'heartbeat' from the clustered ones
camel.clustered.controller.routes[heartbeat].clustered = false
----
- *Master Component*
+
The master component is similar to a _ClusteredRoutePolicy_ but it works on consumer level so it ensures the only a single endpoint in a cluster is consuming resources at any point in time. Set it up is very easy and all you need is to prefix singleton endpoints according to the master component syntax:
+
[source,text]
----
master:namespace:delegateUri
----
+
A concrete example:
+
[source,java]
----
@Bean
public RouteBuilder routeBuilder() {
return new RouteBuilder() {
@Override
public void configure() throws Exception {
from("timer:heartbeat?period=10000")
.routeId("heartbeat")
.log("HeartBeat route (timer) ...");
from("master:my-ns:timer:clustered?period=5000")
.routeId("clustered")
.log("Clustered route (timer) ...");
}
};
}
----