blob: 0b36caa4b86f031d48212458a24a10a867a50604 [file] [log] [blame]
# Camel-Kafka-connector Infinispan Source
This is an example for Camel-Kafka-connector Infinispan Source
## Standalone
### What is needed
- An Infinispan instance
### Setting up Infinispan
As first step you need to download the Infinispan Server with version 11.0.3.Final.
Infinispan 11.x is secured by default. For the purpose of this example we'll remove the security check, so we won't need any authentication.
If you have Infinispan unzipped in $INFINISPAN_HOME, you'll need to:
- Edit $INFINISPAN_HOME/server/config/infinispan.xml
and the file content should be
```
<infinispan
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:infinispan:config:11.0 https://infinispan.org/schemas/infinispan-config-11.0.xsd
urn:infinispan:server:11.0 https://infinispan.org/schemas/infinispan-server-11.0.xsd"
xmlns="urn:infinispan:config:11.0"
xmlns:server="urn:infinispan:server:11.0">
<cache-container name="default" statistics="true">
<transport cluster="${infinispan.cluster.name}" stack="${infinispan.cluster.stack:tcp}" node-name="${infinispan.node.name:}"/>
</cache-container>
<server xmlns="urn:infinispan:server:11.0">
<interfaces>
<interface name="public">
<inet-address value="${infinispan.bind.address:127.0.0.1}"/>
</interface>
</interfaces>
<socket-bindings default-interface="public" port-offset="${infinispan.socket.binding.port-offset:0}">
<socket-binding name="default" port="${infinispan.bind.port:11222}"/>
<socket-binding name="memcached" port="11221"/>
</socket-bindings>
<endpoints socket-binding="default">
<hotrod-connector name="hotrod"/>
<rest-connector name="rest"/>
</endpoints>
</server>
</infinispan>
```
Now we need to create a cache, since the default cache is not available anymore out of the box with Infinispan 11.
You can now start your server
```
> $INFINISPAN_HOME/bin/server.sh
bin/server.sh
06:57:51,378 INFO (main) [BOOT] JVM OpenJDK 64-Bit Server VM AdoptOpenJDK 25.252-b09
06:57:51,395 INFO (main) [BOOT] JVM arguments = [-Xms64m, -Xmx512m, -XX:MetaspaceSize=64M, -Djava.net.preferIPv4Stack=true, -Djava.awt.headless=true, -Dvisualvm.display.name=infinispan-server, -Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager, -Dinfinispan.server.home.path=/home/oscerd/playground/infinispan-server-11.0.3.Final]
06:57:51,396 INFO (main) [BOOT] PID = 9678
06:57:51,441 INFO (main) [org.infinispan.SERVER] ISPN080000: Infinispan Server starting
06:57:51,441 INFO (main) [org.infinispan.SERVER] ISPN080017: Server configuration: /home/oscerd/playground/infinispan-server-11.0.3.Final/server/conf/infinispan.xml
06:57:51,441 INFO (main) [org.infinispan.SERVER] ISPN080032: Logging configuration: /home/oscerd/playground/infinispan-server-11.0.3.Final/server/conf/log4j2.xml
06:57:51,959 INFO (main) [org.infinispan.SERVER] ISPN080027: Loaded extension 'query-dsl-filter-converter-factory'
06:57:51,960 INFO (main) [org.infinispan.SERVER] ISPN080027: Loaded extension 'continuous-query-filter-converter-factory'
06:57:51,961 INFO (main) [org.infinispan.SERVER] ISPN080027: Loaded extension 'iteration-filter-converter-factory'
06:57:51,961 INFO (main) [org.infinispan.SERVER] ISPN080027: Loaded extension 'jdk.nashorn.api.scripting.NashornScriptEngineFactory'
06:57:52,367 WARN (main) [org.infinispan.PERSISTENCE] ISPN000554: jboss-marshalling is deprecated and planned for removal
06:57:52,919 INFO (main) [org.infinispan.CONTAINER] ISPN000128: Infinispan version: Infinispan 'Corona Extra' 11.0.3.Final
06:57:52,921 INFO (main) [org.infinispan.CONTAINER] ISPN000389: Loaded global state, version=11.0.3.Final timestamp=2020-09-30T21:04:46.511Z
06:57:53,046 INFO (main) [org.infinispan.CLUSTER] ISPN000078: Starting JGroups channel cluster with stack tcp
06:57:55,138 INFO (main) [org.jgroups.protocols.pbcast.GMS] ghost-35169: no members discovered after 2001 ms: creating cluster as coordinator
06:57:55,150 INFO (main) [org.infinispan.CLUSTER] ISPN000094: Received new cluster view for channel cluster: [ghost-35169|0] (1) [ghost-35169]
06:57:55,156 INFO (main) [org.infinispan.CLUSTER] ISPN000079: Channel cluster local address is ghost-35169, physical addresses are [192.168.1.15:7800]
06:57:55,810 INFO (main) [org.infinispan.CONTAINER] ISPN000104: Using EmbeddedTransactionManager
```
So, you'll need to run
```
> cd $INFINISPAN_HOME/bin/cli.sh
[disconnected]> connect
[ghost-35169@cluster//containers/default]> create cache --template=org.infinispan.DIST_SYNC mycache
[ghost-35169@cluster//containers/default]> describe caches/mycache
{
"distributed-cache" : {
"mode" : "SYNC",
"remote-timeout" : 17500,
"state-transfer" : {
"timeout" : 60000
},
"transaction" : {
"mode" : "NONE"
},
"locking" : {
"concurrency-level" : 1000,
"acquire-timeout" : 15000,
"striping" : false
},
"statistics" : true
}
}
```
It's important to add encoding to your cache configuration, otherwise consuming events won't work.
You'll need to:
- Edit $INFINISPAN_HOME/server/data/caches.xml
and add the encoding section in the configuration
```
<?xml version="1.0" ?>
<infinispan>
<cache-container>
<distributed-cache mode="SYNC" remote-timeout="17500" name="mycache" statistics="true">
<encoding>
<key media-type="text/plain; charset=UTF-8"/>
<value media-type="text/plain; charset=UTF-8"/>
</encoding>
<locking concurrency-level="1000" acquire-timeout="15000" striping="false"/>
<transaction mode="NONE"/>
<state-transfer timeout="60000"/>
</distributed-cache>
<distributed-cache mode="SYNC" remote-timeout="17500" name="cachecache" statistics="true">
<locking concurrency-level="1000" acquire-timeout="15000" striping="false"/>
<transaction mode="NONE"/>
<state-transfer timeout="60000"/>
</distributed-cache>
</cache-container></infinispan>
```
Now you should be able to run this example.
### Running Kafka
```
$KAFKA_HOME/bin/zookeeper-server-start.sh config/zookeeper.properties
$KAFKA_HOME/bin/kafka-server-start.sh config/server.properties
$KAFKA_HOME/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic mytopic
```
## Setting up the needed bits and running the example
You'll need to setup the plugin.path property in your kafka
Open the `$KAFKA_HOME/config/connect-standalone.properties`
and set the `plugin.path` property to your choosen location
In this example we'll use `/home/oscerd/connectors/`
```
> cd /home/oscerd/connectors/
> wget https://repo1.maven.org/maven2/org/apache/camel/kafkaconnector/camel-infinispan-kafka-connector/0.5.0/camel-infinispan-kafka-connector-0.5.0-package.zip
> unzip camel-infinispan-kafka-connector-0.5.0-package.zip
```
Now it's time to setup the connectors
Open the Infinispan source configuration file
```
name=CamelInfinispanSourceConnector
connector.class=org.apache.camel.kafkaconnector.infinispan.CamelInfinispanSourceConnector
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
topics=mytopic
camel.source.endpoint.hosts=localhost
camel.source.path.cacheName=cachecache
camel.source.endpoint.eventTypes=CLIENT_CACHE_ENTRY_CREATED
camel.source.endpoint.sync=false
```
Now you can run the example
```
$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties config/CamelInfinispanSourceConnector.properties
```
On a different terminal run your cli.sh from the Infinispan server
```
> bin/cli.sh
[disconnected]> connect
[ghost-43981@cluster//containers/default]> cache mycache
[ghost-43981@cluster//containers/default/caches/mycache]> put test test
```
In another terminal, using kafkacta, you should be able to see the headers.
```
> kafkacat -b localhost:9092 -t mytopic -C -f 'Headers: %h\n'
Headers: CamelHeader.CamelInfinispanCacheName=mycache,CamelHeader.CamelInfinispanEventType=CLIENT_CACHE_ENTRY_CREATED,CamelHeader.CamelInfinispanIsPre=false,CamelHeader.CamelInfinispanKey=test,CamelProperty.CamelToEndpoint=direct://end?pollingConsumerBlockTimeout=0&pollingConsumerBlockWhenFull=true&pollingConsumerQueueSize=1000
% Reached end of topic test290 [0] at offset 1
```