[[Clustering-Clustering]]
= Clustering

*Since Camel 2.20*

[WARNING]
====
*Experimental feature*
====

Camel 2.20 introduced the following cluster-related SPI:

- *Cluster Service*
+
A regular Camel service that manages cluster resources such as _views_ (see below).

- *Cluster View*
+
Represents a view of the cluster with its own set of isolated resources. Currently, views provide support for:
+
* Leader election
* Topology events (such as members joining or leaving the cluster)

- *Cluster Member*
+
Represents a member of the cluster.

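The three roles above can be pictured with a toy, in-memory model. This is only a sketch to show how a service, its per-namespace views, and their members relate: `ToyCluster`, `Service`, `View` and all method names are hypothetical and are not Camel's interfaces. The election rule used here (the longest-standing member leads) is an assumption made for illustration; real implementations delegate election to the backing system.

[source,java]
----
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Consumer;

// Toy, in-memory model of the three roles; NOT Camel's real interfaces.
public class ToyCluster {

    /** A view: isolated membership and leadership state for one namespace. */
    public static class View {
        // Members are identified by id and kept in join order.
        private final List<String> members = new ArrayList<>();
        private final List<Consumer<Optional<String>>> listeners = new ArrayList<>();

        public void onLeadershipChange(Consumer<Optional<String>> listener) {
            listeners.add(listener);
        }

        public Optional<String> leader() {
            // Assumption for this sketch: the longest-standing member leads.
            return members.isEmpty() ? Optional.empty() : Optional.of(members.get(0));
        }

        public void join(String memberId) {
            update(() -> {
                if (!members.contains(memberId)) {
                    members.add(memberId);
                }
            });
        }

        public void leave(String memberId) {
            update(() -> members.remove(memberId));
        }

        // Apply a membership change and notify listeners if the leader changed.
        private void update(Runnable change) {
            Optional<String> before = leader();
            change.run();
            Optional<String> after = leader();
            if (!before.equals(after)) {
                listeners.forEach(listener -> listener.accept(after));
            }
        }
    }

    /** The service: hands out exactly one view per namespace. */
    public static class Service {
        private final Map<String, View> views = new HashMap<>();

        public View getView(String namespace) {
            return views.computeIfAbsent(namespace, ns -> new View());
        }
    }

    public static void main(String[] args) {
        Service service = new Service();
        View view = service.getView("my-ns");
        view.onLeadershipChange(leader ->
            System.out.println("leader is now " + leader.orElse("<none>")));

        view.join("node-1");   // first member becomes leader
        view.join("node-2");   // no leadership change
        view.leave("node-1");  // leadership moves to node-2
    }
}
----

In this model each namespace gets its own view, so members and leadership in `my-ns` are independent of any other namespace, which mirrors the "isolated resources" wording above.
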
== Cluster SPI Set-Up

A _Cluster Service_ is just like any other Camel service, so to set it up you only need to register your implementation with the Camel context:

[source,java]
----
MyClusterServiceImpl service = new MyClusterServiceImpl();

context.addService(service);
----

The configuration of the _Cluster Service_ depends on the implementation you have chosen.
Out of the box, Camel provides the following implementations:

[cols="1,1,2", options="header"]
|====
|Type |Module |Class
|atomix |camel-atomix |org.apache.camel.component.atomix.cluster.AtomixClusterService
|consul |camel-consul |org.apache.camel.component.consul.cluster.ConsulClusterService
|file |camel-core |org.apache.camel.component.file.cluster.FileLockClusterService
|kubernetes |camel-kubernetes |org.apache.camel.component.kubernetes.cluster.KubernetesClusterService
|zookeeper |camel-zookeeper |org.apache.camel.component.zookeeper.cluster.ZooKeeperClusterService
|====

With Spring or Blueprint, all _Cluster Service_ instances are automatically added to the Camel context.

Configuration examples:

- *Spring Boot*
+
[source,properties]
----
camel.component.file.cluster.service.enabled = true
camel.component.file.cluster.service.id = ${random.uuid}
camel.component.file.cluster.service.root = ${java.io.tmpdir}
----

- *Spring XML*
+
[source,xml]
----
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="
         http://www.springframework.org/schema/beans
         http://www.springframework.org/schema/beans/spring-beans.xsd
         http://camel.apache.org/schema/spring
         http://camel.apache.org/schema/spring/camel-spring.xsd">

  <bean id="zx" class="org.apache.camel.component.zookeeper.cluster.ZooKeeperClusterService">
    <property name="id" value="node-1"/>
    <property name="basePath" value="/camel/cluster"/>
    <property name="nodes" value="localhost:2181"/>
  </bean>

  <camelContext xmlns="http://camel.apache.org/schema/spring" autoStartup="false">
    ...
  </camelContext>

</beans>
----

== Cluster SPI Usage

The _Cluster SPI_ is leveraged by the following new implementations:

- *ClusteredRoutePolicy*
+
This is an implementation of a _RoutePolicy_ that starts the routes it is associated with when the _Cluster View_ it uses takes leadership.
+
[source,java]
----
context.addRoutes(new RouteBuilder() {
    @Override
    public void configure() throws Exception {
        // create the route policy
        RoutePolicy policy = ClusteredRoutePolicy.forNamespace("my-ns");

        // bind the policy to one or more routes
        from("timer:clustered?delay=1s&period=1s")
            .routePolicy(policy)
            .log("Route ${routeId} is running ...");
    }
});
----
+
To apply the same policy to all the routes, a dedicated _RoutePolicyFactory_ can be used:
+
[source,java]
----
// add the clustered route policy factory to the context
context.addRoutePolicyFactory(ClusteredRoutePolicyFactory.forNamespace("my-ns"));

context.addRoutes(new RouteBuilder() {
    @Override
    public void configure() throws Exception {
        // no explicit routePolicy() call: the factory applies the policy to every route
        from("timer:clustered?delay=1s&period=1s")
            .log("Route ${routeId} is running ...");
    }
});
----

- *ClusteredRouteController*
+
This is an implementation of the _RouteController SPI_ that lets the Camel context start and then starts or stops the routes when leadership is taken or lost. It is well integrated with Spring Boot applications, so assuming you have your routes set up like this:
+
[source,java]
----
@Bean
public RouteBuilder routeBuilder() {
    return new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            from("timer:heartbeat?period=10s")
                .routeId("heartbeat")
                .log("HeartBeat route (timer) ...");
            from("timer:clustered?period=5s")
                .routeId("clustered")
                .log("Clustered route (timer) ...");
        }
    };
}
----
+
You can then leverage Spring Boot configuration to make them clustered:
+
[source,properties]
----
# enable the route controller
camel.clustered.controller.enabled = true

# define the default namespace for routes
camel.clustered.controller.namespace = my-ns

# exclude the route with id 'heartbeat' from the clustered ones
camel.clustered.controller.routes[heartbeat].clustered = false
----

- *Master Component*
+
The master component is similar to a _ClusteredRoutePolicy_, but it works at the consumer level, so it ensures that only a single endpoint in a cluster is consuming resources at any point in time. Setting it up is easy: all you need to do is prefix singleton endpoints according to the master component syntax:
+
[source]
----
master:namespace:delegateUri
----
+
A concrete example:
+
[source,java]
----
@Bean
public RouteBuilder routeBuilder() {
    return new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            from("timer:heartbeat?period=10s")
                .routeId("heartbeat")
                .log("HeartBeat route (timer) ...");
            from("master:my-ns:timer:clustered?period=5s")
                .routeId("clustered")
                .log("Clustered route (timer) ...");
        }
    };
}
----

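To make the `master:namespace:delegateUri` syntax concrete, the following sketch splits such a URI into its two parts. `MasterUri` is a hypothetical helper written for this page, not a Camel class, and it assumes the namespace itself contains no colon; how the real component parses its URI may differ.

[source,java]
----
// Illustrative only: MasterUri is a hypothetical helper, not part of Camel.
public class MasterUri {
    private static final String PREFIX = "master:";

    public final String namespace;
    public final String delegateUri;

    private MasterUri(String namespace, String delegateUri) {
        this.namespace = namespace;
        this.delegateUri = delegateUri;
    }

    /** Splits "master:namespace:delegateUri" at the first colon after the prefix. */
    public static MasterUri parse(String uri) {
        if (!uri.startsWith(PREFIX)) {
            throw new IllegalArgumentException("Not a master URI: " + uri);
        }
        String rest = uri.substring(PREFIX.length());
        int separator = rest.indexOf(':');
        // Both the namespace and the delegate URI must be non-empty.
        if (separator <= 0 || separator == rest.length() - 1) {
            throw new IllegalArgumentException(
                "Expected master:namespace:delegateUri but got: " + uri);
        }
        return new MasterUri(rest.substring(0, separator), rest.substring(separator + 1));
    }

    public static void main(String[] args) {
        MasterUri parsed = parse("master:my-ns:timer:clustered?period=5s");
        System.out.println("namespace   = " + parsed.namespace);
        System.out.println("delegateUri = " + parsed.delegateUri);
    }
}
----

For the route above, the namespace is `my-ns` and the delegate URI is `timer:clustered?period=5s`. Everything after the second colon, including the delegate's own scheme and query parameters, belongs to the wrapped endpoint.
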