| = Clustering |
| |
| Camel offers the following cluster-related SPIs: |
| |
| - *Cluster Service* |
| + |
| A regular Camel service that manages cluster resources such as _views_ (see below). |
| |
| - *Cluster View* |
| + |
| Represents a view of the cluster with its own set of isolated resources. As of today, views provide support for: |
| + |
| * Leader Election |
| * Topology events like members joining/leaving the cluster |
| |
| - *Cluster Member* |
| + |
| Represents a member of the cluster. |
| |
| |
| == Cluster SPI Setup |
| |
| A _Cluster Service_ is just like any other Camel service, so to set it up you only need to register your implementation |
| with the `CamelContext`: |
| |
| [source,java] |
| ---- |
| MyClusterServiceImpl service = new MyClusterServiceImpl(); |
| context.addService(service); |
| ---- |
| |
| The configuration of the _Cluster Service_ depends on the implementation you have chosen. |
| Out of the box, Camel provides the following implementations: |
| |
| [cols="1,1,2", options="header"] |
| |==== |
| |Type |Module | Class |
| |consul |camel-consul | org.apache.camel.component.consul.cluster.ConsulClusterService |
| |file |camel-file | org.apache.camel.component.file.cluster.FileLockClusterService |
| |infinispan |camel-infinispan | org.apache.camel.component.infinispan.cluster.InfinispanClusterService |
| |jgroups-raft |camel-jgroups-raft | org.apache.camel.component.jgroups.raft.cluster.JGroupsRaftClusterService |
| |kubernetes |camel-kubernetes | org.apache.camel.component.kubernetes.cluster.KubernetesClusterService |
| |zookeeper |camel-zookeeper | org.apache.camel.component.zookeeper.cluster.ZooKeeperClusterService |
| |==== |
| |
| Configuration options: |
| |
| *ConsulClusterService* |
| |
| [options="header", cols="15,55,15,15"] |
| |=== |
| | Name | Description | Default | Type |
| | sessionTtl | The Consul session TTL in seconds | 60 | int |
| | sessionLockDelay | The Consul session lock delay in seconds | 5 | int |
| | sessionRefreshInterval | The Consul session refresh interval in seconds | 5 | int |
| | rootPath | The Consul cluster root directory path | /camel | String |
| |=== |
| |
| *FileLockClusterService* |
| |
| [options="header", cols="15,55,15,15"] |
| |=== |
| | Name | Description | Default | Type |
| | acquireLockDelay | The time to wait before starting to try to acquire the cluster lock | 1 | long |
| | acquireLockDelayUnit | The time unit for acquireLockDelay | SECONDS | TimeUnit |
| | acquireLockInterval | The time to wait between attempts to try to acquire the cluster lock | 10 | long |
| | acquireLockIntervalUnit | The time unit for acquireLockInterval | SECONDS | TimeUnit |
| | heartbeatTimeoutMultiplier | Multiplier applied to the cluster leader's acquireLockInterval to determine how long followers wait before considering the leader "stale". For example, if the leader updates its heartbeat every 2 seconds and the heartbeatTimeoutMultiplier is 3, followers tolerate up to `2s * 3 = 6s` of silence before declaring the leader unavailable | 5 | int |
| | rootPath | The file cluster root directory path | | String |
| |=== |
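| |
| The relationship between `acquireLockInterval` and `heartbeatTimeoutMultiplier` described in the table is plain multiplication. The following is a minimal sketch of that arithmetic, using the example and default values from the table. The class and method names are hypothetical and are not part of `FileLockClusterService`. |
| |
| [source,java] |
| ---- |
| import java.time.Duration; |
| |
| public class HeartbeatTimeoutSketch { |
| |
|     // Hypothetical helper: how long followers tolerate a silent leader |
|     static Duration heartbeatTimeout(Duration acquireLockInterval, int multiplier) { |
|         return acquireLockInterval.multipliedBy(multiplier); |
|     } |
| |
|     public static void main(String[] args) { |
|         // Example from the table: 2s heartbeat interval, multiplier 3 |
|         System.out.println(heartbeatTimeout(Duration.ofSeconds(2), 3).getSeconds()); // prints 6 |
|         // Defaults from the table: 10s interval, multiplier 5 |
|         System.out.println(heartbeatTimeout(Duration.ofSeconds(10), 5).getSeconds()); // prints 50 |
|     } |
| } |
| ---- |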
| |
| *InfinispanClusterService* |
| |
| [options="header", cols="15,55,15,15"] |
| |=== |
| | Name | Description | Default | Type |
| | lifespan | The lifespan of the cache entry for the local cluster member registered to the inventory | 30 | long |
| | lifespanTimeUnit | The TimeUnit of the lifespan | SECONDS | TimeUnit |
| |=== |
| |
| *JGroupsRaftClusterService* |
| |
| [options="header", cols="15,55,15,15"] |
| |=== |
| | Name | Description | Default | Type |
| | jgroupsConfig | The path to the JGroups Raft configuration | raft.xml | String |
| | jgroupsClusterName | The name of the cluster | jgroupsraft-master | String |
| | raftHandle | The RaftHandle | | org.jgroups.raft.RaftHandle |
| | raftId | Unique Raft id | | String |
| |=== |
| |
| *KubernetesClusterService* |
| |
| [options="header", cols="15,55,15,15"] |
| |=== |
| | Name | Description | Default | Type |
| | leaseResourceType | Kubernetes resource type used to hold the leases | LeaseResourceType.Lease | LeaseResourceType |
| | kubernetesResourcesNamespace | Kubernetes namespace containing the pods and the ConfigMap used for locking | | String |
| | kubernetesResourceName | Name of the resource used for locking (or prefix, in case multiple ones are used) | leaders | String |
| | groupName | Name of the lock group (or namespace according to the Camel cluster convention) within the chosen ConfigMap | | String |
| | podName | Name of the current pod (defaults to host name) | | String |
| | clusterLabels | Labels used to identify the members of the cluster | empty map | Map |
| | jitterFactor | A jitter factor applied to prevent all pods from calling the Kubernetes APIs at the same instant | 1.2 | double |
| | leaseDurationMillis | The default duration of the lease for the current leader | 15000 | long |
| | renewDeadlineMillis | The deadline after which the leader must stop its services because it may have lost the leadership | 10000 | long |
| | retryPeriodMillis | The time between two subsequent attempts to check and acquire the leadership. It is randomized using the jitter factor | 2000 | long |
| |=== |
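| |
| One plausible reading of "randomized using the jitter factor" is that each wait is drawn uniformly between `retryPeriodMillis` and `retryPeriodMillis * jitterFactor`. The sketch below illustrates that reading with the default values from the table. The class and method are hypothetical, and the exact formula used by `KubernetesClusterService` is an assumption. |
| |
| [source,java] |
| ---- |
| import java.util.Random; |
| |
| public class RetryJitterSketch { |
| |
|     // Hypothetical helper: stretch the period by a random factor in [1, jitterFactor) |
|     static long jitter(long periodMillis, double jitterFactor, Random random) { |
|         return (long) (periodMillis * (1 + random.nextDouble() * (jitterFactor - 1))); |
|     } |
| |
|     public static void main(String[] args) { |
|         Random random = new Random(); |
|         for (int i = 0; i < 3; i++) { |
|             // With the defaults, every value falls in [2000, 2400) |
|             System.out.println(jitter(2000, 1.2, random)); |
|         } |
|     } |
| } |
| ---- |
| |
| Spreading the retries this way means pods that start at the same moment drift apart instead of polling the API server in lockstep. |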
| |
| *ZooKeeperClusterService* |
| |
| [options="header", cols="15,55,15,15"] |
| |=== |
| | Name | Description | Default | Type |
| | nodes | The ZooKeeper server hosts (multiple servers can be separated by commas) | | List |
| | namespace | ZooKeeper namespace. If a namespace is set here, all paths will get pre-pended with the namespace | | String |
| | reconnectBaseSleepTime | Initial amount of time to wait between retries | | long |
| | reconnectBaseSleepTimeUnit | The reconnectBaseSleepTime TimeUnit | MILLISECONDS | TimeUnit |
| | reconnectMaxRetries | Max number of times to retry | 3 | int |
| | sessionTimeout | The session timeout in milliseconds | 60000 | long |
| | sessionTimeoutUnit | The session timeout TimeUnit | MILLISECONDS | TimeUnit |
| | connectionTimeout | The connection timeout in milliseconds | 15000 | long |
| | connectionTimeoutUnit | The connection timeout TimeUnit | MILLISECONDS | TimeUnit |
| | authInfoList | List of AuthInfo objects with scheme and auth | | List |
| | maxCloseWait | Time to wait during close to join background threads | 1000 | long |
| | maxCloseWaitUnit | The maxCloseWait TimeUnit | MILLISECONDS | TimeUnit |
| | retryPolicy | The retry policy to use. | | RetryPolicy |
| | basePath | The base path to store in ZooKeeper | | String |
| |=== |
| |
| Configuration examples: |
| |
| - *Spring Boot* |
| + |
| [source,properties] |
| ---- |
| camel.cluster.file.enabled = true |
| camel.cluster.file.id = ${random.uuid} |
| camel.cluster.file.root = ${java.io.tmpdir} |
| ---- |
| |
| |
| - *Spring XML* |
| + |
| [source,xml] |
| ---- |
| <?xml version="1.0" encoding="UTF-8"?> |
| <beans xmlns="http://www.springframework.org/schema/beans" |
|        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" |
|        xsi:schemaLocation=" |
|          http://www.springframework.org/schema/beans |
|          http://www.springframework.org/schema/beans/spring-beans.xsd |
|          http://camel.apache.org/schema/spring |
|          http://camel.apache.org/schema/spring/camel-spring.xsd"> |
| |
|   <bean id="zx" class="org.apache.camel.component.zookeeper.cluster.ZooKeeperClusterService"> |
|     <property name="id" value="node-1"/> |
|     <property name="basePath" value="/camel/cluster"/> |
|     <property name="nodes" value="localhost:2181"/> |
|   </bean> |
| |
|   <camelContext xmlns="http://camel.apache.org/schema/spring" autoStartup="false"> |
|     ... |
|   </camelContext> |
| |
| </beans> |
| ---- |
| |
| == Cluster SPI Usage |
| |
| The _Cluster SPI_ is leveraged by the following new implementations: |
| |
| - *ClusteredRoutePolicy* |
| + |
| This is an implementation of a _RoutePolicy_ that starts the routes it is associated with when the Cluster View it uses takes the leadership. |
| + |
| [source,java] |
| ---- |
| context.addRoutes(new RouteBuilder() { |
|     @Override |
|     public void configure() throws Exception { |
|         // Create the route policy |
|         RoutePolicy policy = ClusteredRoutePolicy.forNamespace("my-ns"); |
| |
|         // Bind the policy to one or more routes |
|         from("timer:clustered?delay=1000&period=1000") |
|             .routePolicy(policy) |
|             .log("Route ${routeId} is running ..."); |
|     } |
| }); |
| ---- |
| + |
| To apply the same policy to all routes, a dedicated _RoutePolicyFactory_ can be used: |
| + |
| [source,java] |
| ---- |
| // Add the clustered route policy factory to the context |
| context.addRoutePolicyFactory(ClusteredRoutePolicyFactory.forNamespace("my-ns")); |
| |
| context.addRoutes(new RouteBuilder() { |
|     @Override |
|     public void configure() throws Exception { |
|         // No routePolicy() call is needed here: the factory applies the policy to every route |
|         from("timer:clustered?delay=1000&period=1000") |
|             .log("Route ${routeId} is running ..."); |
|     } |
| }); |
| ---- |
| |
| - *ClusteredRouteController* |
| + |
| This is an implementation of the _RouteController SPI_ that lets the Camel context start and then starts or stops the routes when leadership is taken or lost. It integrates well with Spring Boot applications, so assuming your routes are set up like this: |
| + |
| [source,java] |
| ---- |
| @Bean |
| public RouteBuilder routeBuilder() { |
|     return new RouteBuilder() { |
|         @Override |
|         public void configure() throws Exception { |
|             from("timer:heartbeat?period=10000") |
|                 .routeId("heartbeat") |
|                 .log("HeartBeat route (timer) ..."); |
|             from("timer:clustered?period=5000") |
|                 .routeId("clustered") |
|                 .log("Clustered route (timer) ..."); |
|         } |
|     }; |
| } |
| ---- |
| + |
| You can then leverage Spring Boot configuration to make them clustered: |
| + |
| [source,properties] |
| ---- |
| # enable the route controller |
| camel.clustered.controller.enabled = true |
| |
| # define the default namespace for routes |
| camel.clustered.controller.namespace = my-ns |
| |
| # exclude the route with id 'heartbeat' from the clustered ones |
| camel.clustered.controller.routes[heartbeat].clustered = false |
| ---- |
| |
| - *Master Component* |
| + |
| The master component is similar to a _ClusteredRoutePolicy_, but it works at the consumer level: it ensures that only a single endpoint in the cluster consumes resources at any point in time. To set it up, prefix singleton endpoints according to the master component syntax: |
| + |
| [source,text] |
| ---- |
| master:namespace:delegateUri |
| ---- |
| + |
| A concrete example: |
| + |
| [source,java] |
| ---- |
| @Bean |
| public RouteBuilder routeBuilder() { |
|     return new RouteBuilder() { |
|         @Override |
|         public void configure() throws Exception { |
|             from("timer:heartbeat?period=10000") |
|                 .routeId("heartbeat") |
|                 .log("HeartBeat route (timer) ..."); |
| |
|             from("master:my-ns:timer:clustered?period=5000") |
|                 .routeId("clustered") |
|                 .log("Clustered route (timer) ..."); |
|         } |
|     }; |
| } |
| ---- |
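| + |
| To make the `master:namespace:delegateUri` layout concrete, the following minimal sketch splits the URI from the example into its namespace and delegate URI. It only illustrates the syntax: `MasterUriSketch` is a hypothetical class, not the parsing code used by the component. |
| + |
| [source,java] |
| ---- |
| public class MasterUriSketch { |
| |
|     // Hypothetical helper: split "master:namespace:delegateUri" into its two parts |
|     static String[] split(String uri) { |
|         String remaining = uri.substring("master:".length()); |
|         // The first colon ends the namespace; later colons belong to the delegate URI |
|         int separator = remaining.indexOf(':'); |
|         String namespace = remaining.substring(0, separator); |
|         String delegateUri = remaining.substring(separator + 1); |
|         return new String[] {namespace, delegateUri}; |
|     } |
| |
|     public static void main(String[] args) { |
|         String[] parts = split("master:my-ns:timer:clustered?period=5000"); |
|         System.out.println(parts[0]); // prints my-ns |
|         System.out.println(parts[1]); // prints timer:clustered?period=5000 |
|     } |
| } |
| ---- |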
| |