| # Camel-Kafka-connector Infinispan Sink |
| |
| This is an example for Camel-Kafka-connector Infinispan Sink |
| |
| ## Standalone |
| |
| ### What is needed |
| |
| - An Infinispan instance |
| |
| ### Setting up Infinispan |
| |
| As first step you need to download the Infinispan Server with version 11.0.3.Final. |
| |
| Infinispan 11.x is secured by default. For the purpose of this example we'll remove the security check, so we won't need any authentication. |
| |
| If you have Infinispan unzipped in $INFINISPAN_HOME, you'll need to: |
| |
| - Edit $INFINISPAN_HOME/server/config/infinispan.xml |
| |
| and the file content should be |
| |
| ``` |
| <infinispan |
| xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" |
| xsi:schemaLocation="urn:infinispan:config:11.0 https://infinispan.org/schemas/infinispan-config-11.0.xsd |
| urn:infinispan:server:11.0 https://infinispan.org/schemas/infinispan-server-11.0.xsd" |
| xmlns="urn:infinispan:config:11.0" |
| xmlns:server="urn:infinispan:server:11.0"> |
| |
| <cache-container name="default" statistics="true"> |
| <transport cluster="${infinispan.cluster.name}" stack="${infinispan.cluster.stack:tcp}" node-name="${infinispan.node.name:}"/> |
| </cache-container> |
| |
| <server xmlns="urn:infinispan:server:11.0"> |
| <interfaces> |
| <interface name="public"> |
| <inet-address value="${infinispan.bind.address:127.0.0.1}"/> |
| </interface> |
| </interfaces> |
| |
| <socket-bindings default-interface="public" port-offset="${infinispan.socket.binding.port-offset:0}"> |
| <socket-binding name="default" port="${infinispan.bind.port:11222}"/> |
| <socket-binding name="memcached" port="11221"/> |
| </socket-bindings> |
| |
| <endpoints socket-binding="default"> |
| <hotrod-connector name="hotrod"/> |
| <rest-connector name="rest"/> |
| </endpoints> |
| </server> |
| </infinispan> |
| ``` |
| |
| Now we need to create a cache, since the default cache is not available anymore out of the box with Infinispan 11. |
| |
| You can now start your server |
| |
| ``` |
| > $INFINISPAN_HOME/bin/server.sh |
| bin/server.sh |
| 06:57:51,378 INFO (main) [BOOT] JVM OpenJDK 64-Bit Server VM AdoptOpenJDK 25.252-b09 |
| 06:57:51,395 INFO (main) [BOOT] JVM arguments = [-Xms64m, -Xmx512m, -XX:MetaspaceSize=64M, -Djava.net.preferIPv4Stack=true, -Djava.awt.headless=true, -Dvisualvm.display.name=infinispan-server, -Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager, -Dinfinispan.server.home.path=/home/oscerd/playground/infinispan-server-11.0.3.Final] |
| 06:57:51,396 INFO (main) [BOOT] PID = 9678 |
| 06:57:51,441 INFO (main) [org.infinispan.SERVER] ISPN080000: Infinispan Server starting |
| 06:57:51,441 INFO (main) [org.infinispan.SERVER] ISPN080017: Server configuration: /home/oscerd/playground/infinispan-server-11.0.3.Final/server/conf/infinispan.xml |
| 06:57:51,441 INFO (main) [org.infinispan.SERVER] ISPN080032: Logging configuration: /home/oscerd/playground/infinispan-server-11.0.3.Final/server/conf/log4j2.xml |
| 06:57:51,959 INFO (main) [org.infinispan.SERVER] ISPN080027: Loaded extension 'query-dsl-filter-converter-factory' |
| 06:57:51,960 INFO (main) [org.infinispan.SERVER] ISPN080027: Loaded extension 'continuous-query-filter-converter-factory' |
| 06:57:51,961 INFO (main) [org.infinispan.SERVER] ISPN080027: Loaded extension 'iteration-filter-converter-factory' |
| 06:57:51,961 INFO (main) [org.infinispan.SERVER] ISPN080027: Loaded extension 'jdk.nashorn.api.scripting.NashornScriptEngineFactory' |
| 06:57:52,367 WARN (main) [org.infinispan.PERSISTENCE] ISPN000554: jboss-marshalling is deprecated and planned for removal |
| 06:57:52,919 INFO (main) [org.infinispan.CONTAINER] ISPN000128: Infinispan version: Infinispan 'Corona Extra' 11.0.3.Final |
| 06:57:52,921 INFO (main) [org.infinispan.CONTAINER] ISPN000389: Loaded global state, version=11.0.3.Final timestamp=2020-09-30T21:04:46.511Z |
| 06:57:53,046 INFO (main) [org.infinispan.CLUSTER] ISPN000078: Starting JGroups channel cluster with stack tcp |
| 06:57:55,138 INFO (main) [org.jgroups.protocols.pbcast.GMS] ghost-35169: no members discovered after 2001 ms: creating cluster as coordinator |
| 06:57:55,150 INFO (main) [org.infinispan.CLUSTER] ISPN000094: Received new cluster view for channel cluster: [ghost-35169|0] (1) [ghost-35169] |
| 06:57:55,156 INFO (main) [org.infinispan.CLUSTER] ISPN000079: Channel cluster local address is ghost-35169, physical addresses are [192.168.1.15:7800] |
| 06:57:55,810 INFO (main) [org.infinispan.CONTAINER] ISPN000104: Using EmbeddedTransactionManager |
| ``` |
| |
| So, you'll need to run |
| |
| ``` |
| > cd $INFINISPAN_HOME/bin/cli.sh |
| [disconnected]> connect |
| [ghost-35169@cluster//containers/default]> create cache --template=org.infinispan.DIST_SYNC mycache |
| [ghost-35169@cluster//containers/default]> describe caches/mycache |
| { |
| "distributed-cache" : { |
| "mode" : "SYNC", |
| "remote-timeout" : 17500, |
| "state-transfer" : { |
| "timeout" : 60000 |
| }, |
| "transaction" : { |
| "mode" : "NONE" |
| }, |
| "locking" : { |
| "concurrency-level" : 1000, |
| "acquire-timeout" : 15000, |
| "striping" : false |
| }, |
| "statistics" : true |
| } |
| } |
| ``` |
| Now we should be ready to work on this. |
| |
| ### Running Kafka |
| |
| ``` |
| $KAFKA_HOME/bin/zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties |
| $KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties |
| $KAFKA_HOME/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic mytopic |
| ``` |
| |
| ## Setting up the needed bits and running the example |
| |
| You'll need to setup the plugin.path property in your kafka |
| |
| Open the `$KAFKA_HOME/config/connect-standalone.properties` |
| |
| and set the `plugin.path` property to your choosen location |
| |
| In this example we'll use `/home/oscerd/connectors/` |
| |
| ``` |
| > cd /home/oscerd/connectors/ |
| > wget https://repo1.maven.org/maven2/org/apache/camel/kafkaconnector/camel-infinispan-kafka-connector/0.7.0/camel-infinispan-kafka-connector-0.7.0-package.zip |
| > unzip camel-infinispan-kafka-connector-0.7.0-package.zip |
| ``` |
| |
| Now it's time to setup the connectors |
| |
| Open the AWS2 SNS configuration file |
| |
| ``` |
| name=CamelInfinispanSinkConnector |
| connector.class=org.apache.camel.kafkaconnector.infinispan.CamelInfinispanSinkConnector |
| key.converter=org.apache.kafka.connect.storage.StringConverter |
| value.converter=org.apache.kafka.connect.storage.StringConverter |
| |
| topics=mytopic |
| |
| camel.sink.endpoint.hosts=localhost |
| camel.sink.path.cacheName=mycache |
| ``` |
| |
| Now you can run the example |
| |
| ``` |
| $KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties config/CamelInfinispanSinkConnector.properties |
| ``` |
| |
| On a different terminal run the kafka-producer and send messages to your Kafka Broker. |
| |
| ``` |
| bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic mytopic |
| Kafka to Infinispan message 1 |
| Kafka to Infinispan message 2 |
| ``` |
| |
| You should see the stats of cache changing. You can check this by running |
| |
| ``` |
| > $INFINISPAN_HOME/bin/cli.sh |
| [disconnected]> connect |
| [ghost-35169@cluster//containers/default]> cache mycache |
| [ghost-35169@cluster//containers/default/caches/mycache]> stats |
| { |
| "total_number_of_entries" : 0, |
| "off_heap_memory_used" : 0, |
| "time_since_start" : 350, |
| "time_since_reset" : 350, |
| "stores" : 0, |
| "current_number_of_entries" : 0, |
| "data_memory_used" : 0, |
| "misses" : 0, |
| "remove_hits" : 0, |
| "remove_misses" : 0, |
| "evictions" : 0, |
| "average_read_time" : 0, |
| "average_read_time_nanos" : 0, |
| "average_write_time" : 0, |
| "average_write_time_nanos" : 0, |
| "average_remove_time" : 0, |
| "average_remove_time_nanos" : 0, |
| "required_minimum_number_of_nodes" : 1, |
| "current_number_of_entries_in_memory" : 0, |
| "retrievals" : 0, |
| "hits" : 0 |
| } |
| ``` |
| |