# Camel-Kafka-connector Infinispan Source With Authentication
This example shows how to use the Camel-Kafka-connector Infinispan source with an Infinispan server that requires authentication.
## Standalone
### What is needed
- An Infinispan instance
### Setting up Infinispan
As a first step, download Infinispan Server version 12.0.0.Final.
You can now start your server:
```
> $INFINISPAN_HOME/bin/server.sh
bin/server.sh
18:07:32,435 INFO (main) [BOOT] JVM OpenJDK 64-Bit Server VM AdoptOpenJDK 11.0.7+10
18:07:32,441 INFO (main) [BOOT] JVM arguments = [-server, -Xlog:gc*:file=/home/oscerd/playground/infinispan-server-12.0.0.Final/server/log/gc.log:time,uptimemillis:filecount=5,filesize=3M, -Xms64m, -Xmx512m, -XX:MetaspaceSize=64M, -Djava.net.preferIPv4Stack=true, -Djava.awt.headless=true, -Dvisualvm.display.name=infinispan-server, -Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager, -Dinfinispan.server.home.path=/home/oscerd/playground/infinispan-server-12.0.0.Final, -classpath, :/home/oscerd/playground/infinispan-server-12.0.0.Final/boot/infinispan-server-runtime-12.0.0.Final-loader.jar, org.infinispan.server.loader.Loader, org.infinispan.server.Bootstrap]
18:07:32,441 INFO (main) [BOOT] PID = 21270
18:07:32,466 INFO (main) [org.infinispan.SERVER] ISPN080000: Infinispan Server starting
18:07:32,466 INFO (main) [org.infinispan.SERVER] ISPN080017: Server configuration: infinispan.xml
18:07:32,466 INFO (main) [org.infinispan.SERVER] ISPN080032: Logging configuration: /home/oscerd/playground/infinispan-server-12.0.0.Final/server/conf/log4j2.xml
18:07:32,823 INFO (main) [org.infinispan.SERVER] ISPN080027: Loaded extension 'query-dsl-filter-converter-factory'
18:07:32,823 INFO (main) [org.infinispan.SERVER] ISPN080027: Loaded extension 'continuous-query-filter-converter-factory'
18:07:32,825 INFO (main) [org.infinispan.SERVER] ISPN080027: Loaded extension 'iteration-filter-converter-factory'
18:07:32,825 INFO (main) [org.infinispan.SERVER] ISPN080027: Loaded extension 'jdk.nashorn.api.scripting.NashornScriptEngineFactory'
18:07:33,172 INFO (main) [org.infinispan.CONTAINER] ISPN000556: Starting user marshaller 'org.infinispan.commons.marshall.ImmutableProtoStreamMarshaller'
18:07:33,262 WARN (main) [org.infinispan.PERSISTENCE] ISPN000554: jboss-marshalling is deprecated and planned for removal
18:07:33,545 INFO (main) [org.infinispan.query.remote.impl.ProtobufMetadataManagerImpl] ISPN028019: Registering protostream serialization context initializer: org.infinispan.query.core.stats.impl.PersistenceContextInitializerImpl
18:07:33,547 INFO (main) [org.infinispan.CONTAINER] ISPN000128: Infinispan version: Infinispan 'Lockdown' 12.0.0.Final
18:07:33,687 INFO (main) [org.infinispan.CLUSTER] ISPN000078: Starting JGroups channel cluster with stack tcp
18:07:35,796 INFO (main) [org.jgroups.protocols.pbcast.GMS] ghost-43669: no members discovered after 2002 ms: creating cluster as coordinator
18:07:35,832 INFO (main) [org.infinispan.CLUSTER] ISPN000094: Received new cluster view for channel cluster: [ghost-43669|0] (1) [ghost-43669]
18:07:35,843 INFO (main) [org.infinispan.CLUSTER] ISPN000079: Channel cluster local address is ghost-43669, physical addresses are [10.36.115.182:7800]
18:07:35,878 INFO (main) [org.infinispan.CONTAINER] ISPN000390: Persisted state, version=12.0.0.Final timestamp=2021-03-01T17:07:35.875687Z
18:07:36,081 INFO (main) [org.jboss.threads] JBoss Threads version 2.3.3.Final
18:07:36,150 INFO (main) [org.infinispan.CONTAINER] ISPN000104: Using EmbeddedTransactionManager
18:07:36,447 INFO (main) [org.infinispan.server.core.RequestTracer] OpenTracing integration is disabled
18:07:36,510 INFO (ForkJoinPool.commonPool-worker-3) [org.infinispan.SERVER] ISPN080018: Started connector HotRod (internal)
18:07:36,601 INFO (main) [org.infinispan.SERVER] ISPN080018: Started connector REST (internal)
18:07:36,762 INFO (main) [org.infinispan.SERVER] ISPN080004: Connector SINGLE_PORT (default) listening on 127.0.0.1:11222
18:07:36,762 INFO (main) [org.infinispan.SERVER] ISPN080034: Server 'ghost-43669' listening on http://127.0.0.1:11222
18:07:36,763 INFO (main) [org.infinispan.SERVER] ISPN080001: Infinispan Server 12.0.0.Final started in 4296ms
```
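To confirm where the server ended up listening, you can pull the address out of the startup log. The following is a minimal sketch: `server_listen_url` is a hypothetical helper name, and it assumes the `ISPN080034` line keeps the wording shown in the log above.

```shell
# Hypothetical helper: read Infinispan server log lines on stdin and print
# the URL from the "ISPN080034: Server '...' listening on <url>" line.
# Assumes the log wording matches the sample output shown above.
server_listen_url() {
  sed -n 's/.*ISPN080034: Server .* listening on \(.*\)$/\1/p'
}
```

Pipe the console output or a saved copy of the log into `server_listen_url`; it prints nothing if the server has not reached that point yet.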
Next, create a user, connect with the CLI, and create the cache used in this example:
```
> $INFINISPAN_HOME/bin/cli.sh user create admin -p "password"
> $INFINISPAN_HOME/bin/cli.sh
[disconnected]> connect
Username: admin
Password: ********
[ghost-35169@cluster//containers/default]> create cache --template=org.infinispan.DIST_SYNC mycache
[ghost-35169@cluster//containers/default]> describe caches/mycache
{
"distributed-cache" : {
"mode" : "SYNC",
"remote-timeout" : 17500,
"state-transfer" : {
"timeout" : 60000
},
"locking" : {
"concurrency-level" : 1000,
"acquire-timeout" : 15000,
"striping" : false
},
"statistics" : true
}
}
```
It's important to add an encoding to your cache configuration; otherwise, consuming events won't work.
Edit `$INFINISPAN_HOME/server/data/caches.xml` and add the `encoding` section to the cache configuration:
```
<?xml version="1.0" ?>
<infinispan xmlns="urn:infinispan:config:12.0">
<cache-container>
<distributed-cache mode="SYNC" remote-timeout="17500" name="mycache" statistics="true">
<encoding>
<key media-type="text/plain; charset=UTF-8"/>
<value media-type="text/plain; charset=UTF-8"/>
</encoding>
<locking concurrency-level="1000" acquire-timeout="15000" striping="false"/>
<state-transfer timeout="60000"/>
</distributed-cache>
</cache-container>
</infinispan>
```
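A quick way to check that the edit landed is to look for both media-type entries in the file. This is only a sketch: `has_encoding` is a hypothetical helper, and it does a plain text search rather than parsing the XML, so it cannot tell which cache the entries belong to.

```shell
# Hypothetical helper: succeed (exit 0) only if the given file contains
# both a <key media-type=...> and a <value media-type=...> element.
# This is a plain text match, not real XML parsing.
has_encoding() {
  caches_xml=$1
  grep -q '<key media-type=' "$caches_xml" &&
    grep -q '<value media-type=' "$caches_xml"
}
```

For example, `has_encoding "$INFINISPAN_HOME/server/data/caches.xml" && echo "encoding found"` prints the message only when both entries are present.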
Now you should be able to run this example.
### Running Kafka
Start ZooKeeper and the Kafka broker, each in its own terminal, then create the topic:
```
$KAFKA_HOME/bin/zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties
$KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties
$KAFKA_HOME/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic mytopic
```
## Setting up the needed bits and running the example
You'll need to set up the `plugin.path` property in your Kafka installation.
Open `$KAFKA_HOME/config/connect-standalone.properties` and set the `plugin.path` property to your chosen location.
In this example we'll use `/home/oscerd/connectors/`:
```
> cd /home/oscerd/connectors/
> wget https://repo1.maven.org/maven2/org/apache/camel/kafkaconnector/camel-infinispan-kafka-connector/0.10.1/camel-infinispan-kafka-connector-0.10.1-package.tar.gz
> tar -xzf camel-infinispan-kafka-connector-0.10.1-package.tar.gz
```
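If you prefer to script the `plugin.path` change instead of editing the file by hand, something along these lines should work. It is a sketch under assumptions: `set_plugin_path` is a hypothetical helper, not something shipped with Kafka, and it only replaces an uncommented `plugin.path=` line, leaving commented-out examples in place.

```shell
# Hypothetical helper (not part of Kafka): set plugin.path in a Kafka Connect
# properties file, replacing any active plugin.path line.
# Usage: set_plugin_path <properties-file> <plugin-dir>
set_plugin_path() {
  props_file=$1
  plugin_dir=$2
  [ -f "$props_file" ] || return 1
  tmp_file=$(mktemp) || return 1
  # Copy every line except an active (uncommented) plugin.path entry.
  # grep exits non-zero when it prints nothing, which is fine here.
  grep -v '^[[:space:]]*plugin\.path=' "$props_file" > "$tmp_file" || true
  # Append the new setting.
  printf 'plugin.path=%s\n' "$plugin_dir" >> "$tmp_file"
  # Write back in place so the file keeps its permissions.
  cat "$tmp_file" > "$props_file"
  rm -f "$tmp_file"
}
```

With the paths used in this example, the call would be `set_plugin_path "$KAFKA_HOME/config/connect-standalone.properties" /home/oscerd/connectors/`.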
Now it's time to set up the connector.
Open the Infinispan source configuration file (`config/CamelInfinispanSourceConnector.properties`) and set it up as follows:
```
name=CamelInfinispanSourceConnector
connector.class=org.apache.camel.kafkaconnector.infinispan.CamelInfinispanSourceConnector
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
topics=mytopic
camel.source.endpoint.hosts=localhost
camel.source.path.cacheName=mycache
camel.source.endpoint.eventTypes=CLIENT_CACHE_ENTRY_CREATED
camel.component.infinispan.secure=true
camel.source.endpoint.username=admin
camel.component.infinispan.password=password
camel.component.infinispan.saslMechanism=DIGEST-MD5
camel.component.infinispan.securityRealm=default
camel.component.infinispan.securityServerName=infinispan
```
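Since a missing credential or host entry tends to show up only once the connector starts, it can help to check the file for the expected keys first. The sketch below assumes a hypothetical helper named `missing_keys`; the keys you pass to it are up to you, and the ones in the usage note are simply taken from the configuration above.

```shell
# Hypothetical helper: print each key from the argument list that has no
# "key=value" line in the given properties file.
# Usage: missing_keys <properties-file> <key>...
missing_keys() {
  props_file=$1
  shift
  for key in "$@"; do
    # Compare the text before the first "=" with the key, exactly.
    awk -F= -v k="$key" '$1 == k { found = 1 } END { exit !found }' \
      "$props_file" || printf '%s\n' "$key"
  done
}
```

For instance, `missing_keys config/CamelInfinispanSourceConnector.properties camel.source.endpoint.hosts camel.source.path.cacheName camel.component.infinispan.password` prints nothing when all three keys are present.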
Now you can run the example:
```
$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties config/CamelInfinispanSourceConnector.properties
```
In a different terminal, run `cli.sh` from the Infinispan server and put an entry into the cache:
```
> $INFINISPAN_HOME/bin/cli.sh
[disconnected]> connect
Username: admin
Password: ********
[ghost-43981@cluster//containers/default]> cache mycache
[ghost-43981@cluster//containers/default/caches/mycache]> put test test
```
In another terminal, using kafkacat, you should be able to see the headers:
```
> kafkacat -b localhost:9092 -t mytopic -C -f 'Headers: %h\n'
Headers: CamelHeader.CamelInfinispanCacheName=mycache,CamelHeader.CamelInfinispanEventType=CLIENT_CACHE_ENTRY_CREATED,CamelHeader.CamelInfinispanIsPre=false,CamelHeader.CamelInfinispanKey=test,CamelProperty.CamelToEndpoint=direct://end?pollingConsumerBlockTimeout=0&pollingConsumerBlockWhenFull=true&pollingConsumerQueueSize=1000
% Reached end of topic mytopic [0] at offset 1
```
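The header line is long, so when you only care about one header it can be easier to filter it. Here is a minimal sketch, assuming the `Headers: k1=v1,k2=v2,...` layout shown above; `header_value` is a hypothetical helper, and it will split incorrectly if a header value itself contains a comma.

```shell
# Hypothetical helper: read kafkacat output on stdin and print the value of
# one header from lines shaped like "Headers: k1=v1,k2=v2,...".
# Assumes header values contain no commas.
# Usage: ... | header_value <header-name>
header_value() {
  wanted=$1
  # Keep only "Headers: " lines, put each key=value pair on its own line,
  # then split each pair at the first "=" and match the key exactly.
  sed -n 's/^Headers: //p' | tr ',' '\n' | awk -v k="$wanted" '
    {
      i = index($0, "=")
      if (i > 0 && substr($0, 1, i - 1) == k) print substr($0, i + 1)
    }'
}
```

Appending `| header_value CamelHeader.CamelInfinispanKey` to the kafkacat command above would print only the key of each cache entry that triggered an event.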