# Camel-Kafka-connector Infinispan Source

This is an example of the Camel-Kafka-connector Infinispan Source.

## Standalone

### What is needed

- An Infinispan instance

### Setting up Infinispan

As a first step, download Infinispan Server version 12.0.0.Final.

Infinispan 12.x is secured by default. For the purpose of this example we'll remove the security check, so we won't need any authentication.

If you have Infinispan extracted in $INFINISPAN_HOME, you'll need to:

- Edit $INFINISPAN_HOME/server/conf/infinispan.xml

and the file content should be

```
<infinispan
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="urn:infinispan:config:12.0 https://infinispan.org/schemas/infinispan-config-12.0.xsd
                            urn:infinispan:server:12.0 https://infinispan.org/schemas/infinispan-server-12.0.xsd"
        xmlns="urn:infinispan:config:12.0"
        xmlns:server="urn:infinispan:server:12.0">

   <cache-container name="default" statistics="true">
      <transport cluster="${infinispan.cluster.name:cluster}" stack="${infinispan.cluster.stack:tcp}" node-name="${infinispan.node.name:}"/>
   </cache-container>

   <server xmlns="urn:infinispan:server:12.0">
      <interfaces>
         <interface name="public">
            <inet-address value="${infinispan.bind.address:127.0.0.1}"/>
         </interface>
      </interfaces>

      <socket-bindings default-interface="public" port-offset="${infinispan.socket.binding.port-offset:0}">
         <socket-binding name="default" port="${infinispan.bind.port:11222}"/>
         <socket-binding name="memcached" port="11221"/>
      </socket-bindings>

      <endpoints socket-binding="default">
         <hotrod-connector/>
         <rest-connector/>
      </endpoints>
   </server>
</infinispan>
```

We also need to create a cache, because as of Infinispan 11 the default cache is no longer available out of the box.

To do that, first start your server:

```
> $INFINISPAN_HOME/bin/server.sh
18:07:32,435 INFO  (main) [BOOT] JVM OpenJDK 64-Bit Server VM AdoptOpenJDK 11.0.7+10
18:07:32,441 INFO  (main) [BOOT] JVM arguments = [-server, -Xlog:gc*:file=/home/oscerd/playground/infinispan-server-12.0.0.Final/server/log/gc.log:time,uptimemillis:filecount=5,filesize=3M, -Xms64m, -Xmx512m, -XX:MetaspaceSize=64M, -Djava.net.preferIPv4Stack=true, -Djava.awt.headless=true, -Dvisualvm.display.name=infinispan-server, -Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager, -Dinfinispan.server.home.path=/home/oscerd/playground/infinispan-server-12.0.0.Final, -classpath, :/home/oscerd/playground/infinispan-server-12.0.0.Final/boot/infinispan-server-runtime-12.0.0.Final-loader.jar, org.infinispan.server.loader.Loader, org.infinispan.server.Bootstrap]
18:07:32,441 INFO  (main) [BOOT] PID = 21270
18:07:32,466 INFO  (main) [org.infinispan.SERVER] ISPN080000: Infinispan Server starting
18:07:32,466 INFO  (main) [org.infinispan.SERVER] ISPN080017: Server configuration: infinispan.xml
18:07:32,466 INFO  (main) [org.infinispan.SERVER] ISPN080032: Logging configuration: /home/oscerd/playground/infinispan-server-12.0.0.Final/server/conf/log4j2.xml
18:07:32,823 INFO  (main) [org.infinispan.SERVER] ISPN080027: Loaded extension 'query-dsl-filter-converter-factory'
18:07:32,823 INFO  (main) [org.infinispan.SERVER] ISPN080027: Loaded extension 'continuous-query-filter-converter-factory'
18:07:32,825 INFO  (main) [org.infinispan.SERVER] ISPN080027: Loaded extension 'iteration-filter-converter-factory'
18:07:32,825 INFO  (main) [org.infinispan.SERVER] ISPN080027: Loaded extension 'jdk.nashorn.api.scripting.NashornScriptEngineFactory'
18:07:33,172 INFO  (main) [org.infinispan.CONTAINER] ISPN000556: Starting user marshaller 'org.infinispan.commons.marshall.ImmutableProtoStreamMarshaller'
18:07:33,262 WARN  (main) [org.infinispan.PERSISTENCE] ISPN000554: jboss-marshalling is deprecated and planned for removal
18:07:33,545 INFO  (main) [org.infinispan.query.remote.impl.ProtobufMetadataManagerImpl] ISPN028019: Registering protostream serialization context initializer: org.infinispan.query.core.stats.impl.PersistenceContextInitializerImpl
18:07:33,547 INFO  (main) [org.infinispan.CONTAINER] ISPN000128: Infinispan version: Infinispan 'Lockdown' 12.0.0.Final
18:07:33,687 INFO  (main) [org.infinispan.CLUSTER] ISPN000078: Starting JGroups channel cluster with stack tcp
18:07:35,796 INFO  (main) [org.jgroups.protocols.pbcast.GMS] ghost-43669: no members discovered after 2002 ms: creating cluster as coordinator
18:07:35,832 INFO  (main) [org.infinispan.CLUSTER] ISPN000094: Received new cluster view for channel cluster: [ghost-43669|0] (1) [ghost-43669]
18:07:35,843 INFO  (main) [org.infinispan.CLUSTER] ISPN000079: Channel cluster local address is ghost-43669, physical addresses are [10.36.115.182:7800]
18:07:35,878 INFO  (main) [org.infinispan.CONTAINER] ISPN000390: Persisted state, version=12.0.0.Final timestamp=2021-03-01T17:07:35.875687Z
18:07:36,081 INFO  (main) [org.jboss.threads] JBoss Threads version 2.3.3.Final
18:07:36,150 INFO  (main) [org.infinispan.CONTAINER] ISPN000104: Using EmbeddedTransactionManager
18:07:36,447 INFO  (main) [org.infinispan.server.core.RequestTracer] OpenTracing integration is disabled
18:07:36,510 INFO  (ForkJoinPool.commonPool-worker-3) [org.infinispan.SERVER] ISPN080018: Started connector HotRod (internal)
18:07:36,601 INFO  (main) [org.infinispan.SERVER] ISPN080018: Started connector REST (internal)
18:07:36,762 INFO  (main) [org.infinispan.SERVER] ISPN080004: Connector SINGLE_PORT (default) listening on 127.0.0.1:11222
18:07:36,762 INFO  (main) [org.infinispan.SERVER] ISPN080034: Server 'ghost-43669' listening on http://127.0.0.1:11222
18:07:36,763 INFO  (main) [org.infinispan.SERVER] ISPN080001: Infinispan Server 12.0.0.Final started in 4296ms

```

With the server running, create the cache from the Infinispan CLI in another terminal:

```
> $INFINISPAN_HOME/bin/cli.sh
[disconnected]> connect
[ghost-35169@cluster//containers/default]> create cache --template=org.infinispan.DIST_SYNC mycache
[ghost-35169@cluster//containers/default]> describe caches/mycache
{
  "distributed-cache" : {
    "mode" : "SYNC",
    "remote-timeout" : 17500,
    "state-transfer" : {
      "timeout" : 60000
    },
    "locking" : {
      "concurrency-level" : 1000,
      "acquire-timeout" : 15000,
      "striping" : false
    },
    "statistics" : true
  }
}

```

It's important to add an encoding to your cache configuration; otherwise, consuming events won't work.
You'll need to:

- Edit $INFINISPAN_HOME/server/data/caches.xml

and add the encoding section to the cache configuration

```
<infinispan xmlns="urn:infinispan:config:12.0">
    <cache-container>
        <distributed-cache mode="SYNC" remote-timeout="17500" name="mycache" statistics="true">
            <encoding>
                <key media-type="text/plain; charset=UTF-8"/>
                <value media-type="text/plain; charset=UTF-8"/>
            </encoding>
            <locking concurrency-level="1000" acquire-timeout="15000" striping="false"/>
            <state-transfer timeout="60000"/>
        </distributed-cache>
    </cache-container>
</infinispan>
```
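Because a missing encoding section breaks event consumption, a quick check before starting the connector may save some debugging. The following is only a sketch and is not part of the original example: the function name `has_text_encoding` is hypothetical, and it simply greps for the two media-type lines instead of parsing the XML, so it cannot tell which cache the encoding belongs to.

```shell
# Hypothetical helper (not shipped with Infinispan): succeeds only if the
# given configuration file declares a text/plain media type for both the
# key and the value. It greps the whole file and does not parse the XML.
has_text_encoding() {
    file="$1"
    grep -q '<key media-type="text/plain' "$file" &&
        grep -q '<value media-type="text/plain' "$file"
}

# Example usage, with the path from the step above:
# has_text_encoding "$INFINISPAN_HOME/server/data/caches.xml" && echo "encoding configured"
```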

Now you should be able to run this example.

### Running Kafka

Start ZooKeeper and the Kafka broker, each in its own terminal, and then create the topic:

```
$KAFKA_HOME/bin/zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties
$KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties
$KAFKA_HOME/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic mytopic
```

## Setting up the needed bits and running the example

You'll need to set up the `plugin.path` property in your Kafka installation.

Open `$KAFKA_HOME/config/connect-standalone.properties`

and set the `plugin.path` property to your chosen location.

In this example we'll use `/home/oscerd/connectors/`

```
> cd /home/oscerd/connectors/
> wget https://repo1.maven.org/maven2/org/apache/camel/kafkaconnector/camel-infinispan-kafka-connector/0.11.5/camel-infinispan-kafka-connector-0.11.5-package.tar.gz
> tar -xzf camel-infinispan-kafka-connector-0.11.5-package.tar.gz
```

Now it's time to set up the connector.

Open the Infinispan source configuration file (`config/CamelInfinispanSourceConnector.properties`) and make sure it contains:

```
name=CamelInfinispanSourceConnector
connector.class=org.apache.camel.kafkaconnector.infinispan.CamelInfinispanSourceConnector
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter

topics=mytopic

camel.source.endpoint.hosts=localhost
camel.source.path.cacheName=mycache
camel.source.endpoint.eventTypes=CLIENT_CACHE_ENTRY_CREATED
```
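The same configuration can be generated from a script, which may be handy if you want to try other cache or topic names. This is a sketch only: `write_source_config` is a hypothetical helper, the property values are copied from the configuration above, and the cache and topic arguments default to the names used in this example.

```shell
# Hypothetical helper: write the source connector configuration shown above
# to the given path. Arguments: output file, cache name, topic name.
write_source_config() {
    out="$1"
    cache="${2:-mycache}"
    topic="${3:-mytopic}"
    mkdir -p "$(dirname "$out")"
    cat > "$out" <<EOF
name=CamelInfinispanSourceConnector
connector.class=org.apache.camel.kafkaconnector.infinispan.CamelInfinispanSourceConnector
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter

topics=$topic

camel.source.endpoint.hosts=localhost
camel.source.path.cacheName=$cache
camel.source.endpoint.eventTypes=CLIENT_CACHE_ENTRY_CREATED
EOF
}

# Example usage:
# write_source_config config/CamelInfinispanSourceConnector.properties
```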

Now you can run the example

```
$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties config/CamelInfinispanSourceConnector.properties
```

In a different terminal, run cli.sh from the Infinispan server and put an entry into the cache:

```
> $INFINISPAN_HOME/bin/cli.sh 
[disconnected]> connect
[ghost-43981@cluster//containers/default]> cache mycache
[ghost-43981@cluster//containers/default/caches/mycache]> put test test
```

In another terminal, using kafkacat, you should be able to see the headers.

```
> kafkacat -b localhost:9092 -t mytopic -C   -f 'Headers: %h\n'

Headers: CamelHeader.CamelInfinispanCacheName=mycache,CamelHeader.CamelInfinispanEventType=CLIENT_CACHE_ENTRY_CREATED,CamelHeader.CamelInfinispanIsPre=false,CamelHeader.CamelInfinispanKey=test,CamelProperty.CamelToEndpoint=direct://end?pollingConsumerBlockTimeout=0&pollingConsumerBlockWhenFull=true&pollingConsumerQueueSize=1000
% Reached end of topic mytopic [0] at offset 1

```
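To pick a single header out of that output, for instance to check which key triggered the event, the line can be split on commas. The sketch below assumes the `Headers: name=value,name=value` layout produced by the `-f 'Headers: %h\n'` format above; `header_value` is a hypothetical helper, and it will mis-split any header whose value itself contains a comma.

```shell
# Hypothetical helper: read kafkacat "Headers: ..." lines on stdin and print
# the value of the first header whose name matches the argument exactly.
header_value() {
    name="$1"
    sed 's/^Headers: //' | tr ',' '\n' | awk -v key="$name" '
        # Match entries that start with "<name>=" and print what follows.
        index($0, key "=") == 1 { print substr($0, length(key) + 2); exit }'
}

# Example usage:
# kafkacat -b localhost:9092 -t mytopic -C -e -f 'Headers: %h\n' | header_value CamelHeader.CamelInfinispanKey
```

An unknown header name prints nothing, so the result can be tested with `[ -n "$value" ]` in a script.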

