# Camel-Kafka-connector Infinispan Source With Authentication

This is an example for Camel-Kafka-connector Infinispan Source With Authentication

## Standalone

### What is needed

- An Infinispan instance

### Setting up Infinispan

As first step you need to download the Infinispan Server with version 12.0.0.Final.

You can now start your server

```
> $INFINISPAN_HOME/bin/server.sh
bin/server.sh 
18:07:32,435 INFO  (main) [BOOT] JVM OpenJDK 64-Bit Server VM AdoptOpenJDK 11.0.7+10
18:07:32,441 INFO  (main) [BOOT] JVM arguments = [-server, -Xlog:gc*:file=/home/oscerd/playground/infinispan-server-12.0.0.Final/server/log/gc.log:time,uptimemillis:filecount=5,filesize=3M, -Xms64m, -Xmx512m, -XX:MetaspaceSize=64M, -Djava.net.preferIPv4Stack=true, -Djava.awt.headless=true, -Dvisualvm.display.name=infinispan-server, -Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager, -Dinfinispan.server.home.path=/home/oscerd/playground/infinispan-server-12.0.0.Final, -classpath, :/home/oscerd/playground/infinispan-server-12.0.0.Final/boot/infinispan-server-runtime-12.0.0.Final-loader.jar, org.infinispan.server.loader.Loader, org.infinispan.server.Bootstrap]
18:07:32,441 INFO  (main) [BOOT] PID = 21270
18:07:32,466 INFO  (main) [org.infinispan.SERVER] ISPN080000: Infinispan Server starting
18:07:32,466 INFO  (main) [org.infinispan.SERVER] ISPN080017: Server configuration: infinispan.xml
18:07:32,466 INFO  (main) [org.infinispan.SERVER] ISPN080032: Logging configuration: /home/oscerd/playground/infinispan-server-12.0.0.Final/server/conf/log4j2.xml
18:07:32,823 INFO  (main) [org.infinispan.SERVER] ISPN080027: Loaded extension 'query-dsl-filter-converter-factory'
18:07:32,823 INFO  (main) [org.infinispan.SERVER] ISPN080027: Loaded extension 'continuous-query-filter-converter-factory'
18:07:32,825 INFO  (main) [org.infinispan.SERVER] ISPN080027: Loaded extension 'iteration-filter-converter-factory'
18:07:32,825 INFO  (main) [org.infinispan.SERVER] ISPN080027: Loaded extension 'jdk.nashorn.api.scripting.NashornScriptEngineFactory'
18:07:33,172 INFO  (main) [org.infinispan.CONTAINER] ISPN000556: Starting user marshaller 'org.infinispan.commons.marshall.ImmutableProtoStreamMarshaller'
18:07:33,262 WARN  (main) [org.infinispan.PERSISTENCE] ISPN000554: jboss-marshalling is deprecated and planned for removal
18:07:33,545 INFO  (main) [org.infinispan.query.remote.impl.ProtobufMetadataManagerImpl] ISPN028019: Registering protostream serialization context initializer: org.infinispan.query.core.stats.impl.PersistenceContextInitializerImpl
18:07:33,547 INFO  (main) [org.infinispan.CONTAINER] ISPN000128: Infinispan version: Infinispan 'Lockdown' 12.0.0.Final
18:07:33,687 INFO  (main) [org.infinispan.CLUSTER] ISPN000078: Starting JGroups channel cluster with stack tcp
18:07:35,796 INFO  (main) [org.jgroups.protocols.pbcast.GMS] ghost-43669: no members discovered after 2002 ms: creating cluster as coordinator
18:07:35,832 INFO  (main) [org.infinispan.CLUSTER] ISPN000094: Received new cluster view for channel cluster: [ghost-43669|0] (1) [ghost-43669]
18:07:35,843 INFO  (main) [org.infinispan.CLUSTER] ISPN000079: Channel cluster local address is ghost-43669, physical addresses are [10.36.115.182:7800]
18:07:35,878 INFO  (main) [org.infinispan.CONTAINER] ISPN000390: Persisted state, version=12.0.0.Final timestamp=2021-03-01T17:07:35.875687Z
18:07:36,081 INFO  (main) [org.jboss.threads] JBoss Threads version 2.3.3.Final
18:07:36,150 INFO  (main) [org.infinispan.CONTAINER] ISPN000104: Using EmbeddedTransactionManager
18:07:36,447 INFO  (main) [org.infinispan.server.core.RequestTracer] OpenTracing integration is disabled
18:07:36,510 INFO  (ForkJoinPool.commonPool-worker-3) [org.infinispan.SERVER] ISPN080018: Started connector HotRod (internal)
18:07:36,601 INFO  (main) [org.infinispan.SERVER] ISPN080018: Started connector REST (internal)
18:07:36,762 INFO  (main) [org.infinispan.SERVER] ISPN080004: Connector SINGLE_PORT (default) listening on 127.0.0.1:11222
18:07:36,762 INFO  (main) [org.infinispan.SERVER] ISPN080034: Server 'ghost-43669' listening on http://127.0.0.1:11222
18:07:36,763 INFO  (main) [org.infinispan.SERVER] ISPN080001: Infinispan Server 12.0.0.Final started in 4296ms

```

So, you'll need to run

```
> $INFINISPAN_HOME/bin/cli.sh user create admin -p "password"
> $INFINISPAN_HOME/bin/cli.sh
[disconnected]> connect
Username: admin
Password: ********
[ghost-35169@cluster//containers/default]> create cache --template=org.infinispan.DIST_SYNC mycache
[ghost-35169@cluster//containers/default]> describe caches/mycache
{
  "distributed-cache" : {
    "mode" : "SYNC",
    "remote-timeout" : 17500,
    "state-transfer" : {
      "timeout" : 60000
    },
    "locking" : {
      "concurrency-level" : 1000,
      "acquire-timeout" : 15000,
      "striping" : false
    },
    "statistics" : true
  }
}

```

It's important to add encoding to your cache configuration, otherwise consuming events won't work.
You'll need to:

- Edit $INFINISPAN_HOME/server/data/caches.xml

and add the encoding section in the configuration

```
<?xml version="1.0" ?>

<infinispan xmlns="urn:infinispan:config:12.0">
    <cache-container>
        <distributed-cache mode="SYNC" remote-timeout="17500" name="mycache" statistics="true">
            <encoding>
                <key media-type="text/plain; charset=UTF-8"/>
                <value media-type="text/plain; charset=UTF-8"/>
            </encoding>
            <locking concurrency-level="1000" acquire-timeout="15000" striping="false"/>
            <state-transfer timeout="60000"/>
        </distributed-cache>
    </cache-container></infinispan>
```

Now you should be able to run this example.

### Running Kafka

```
$KAFKA_HOME/bin/zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties
$KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties
$KAFKA_HOME/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic mytopic
```

## Setting up the needed bits and running the example

You'll need to setup the plugin.path property in your kafka

Open the `$KAFKA_HOME/config/connect-standalone.properties`

and set the `plugin.path` property to your choosen location

In this example we'll use `/home/oscerd/connectors/`

```
> cd /home/oscerd/connectors/
> wget https://repo1.maven.org/maven2/org/apache/camel/kafkaconnector/camel-infinispan-kafka-connector/0.10.0/camel-infinispan-kafka-connector-0.10.0-package.tar.gz
> untar.gz camel-infinispan-kafka-connector-0.10.0-package.tar.gz
```

Now it's time to setup the connectors

Open the Infinispan source configuration file

```
name=CamelInfinispanSourceConnector
connector.class=org.apache.camel.kafkaconnector.infinispan.CamelInfinispanSourceConnector
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter

topics=mytopic

camel.source.endpoint.hosts=localhost
camel.source.path.cacheName=mycache
camel.source.endpoint.eventTypes=CLIENT_CACHE_ENTRY_CREATED
camel.component.infinispan.secure=true
camel.source.endpoint.username=admin
camel.component.infinispan.password=password
camel.component.infinispan.saslMechanism=DIGEST-MD5
camel.component.infinispan.securityRealm=default
camel.component.infinispan.securityServerName=infinispan
```

Now you can run the example

```
$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties config/CamelInfinispanSourceConnector.properties
```

On a different terminal run your cli.sh from the Infinispan server

```
> $INFINISPAN_HOME/bin/cli.sh
[disconnected]> connect
Username: admin
Password: ********
[ghost-43981@cluster//containers/default]> cache mycache
[ghost-43981@cluster//containers/default/caches/mycache]> put test test
```

In another terminal, using kafkacat, you should be able to see the headers.

```
> kafkacat -b localhost:9092 -t mytopic -C -f 'Headers: %h\n'

Headers: CamelHeader.CamelInfinispanCacheName=mycache,CamelHeader.CamelInfinispanEventType=CLIENT_CACHE_ENTRY_CREATED,CamelHeader.CamelInfinispanIsPre=false,CamelHeader.CamelInfinispanKey=test,CamelProperty.CamelToEndpoint=direct://end?pollingConsumerBlockTimeout=0&pollingConsumerBlockWhenFull=true&pollingConsumerQueueSize=1000
% Reached end of topic mytopic [0] at offset 1

```

