| # Camel-Kafka-connector Infinispan Source With Authentication |
| |
| This is an example of the Camel-Kafka-connector Infinispan source connector with authentication enabled. |
| |
| ## Standalone |
| |
| ### What is needed |
| |
| - An Infinispan instance |
| |
| ### Setting up Infinispan |
| |
| As a first step, download Infinispan Server version 12.0.0.Final. |
| |
| You can now start your server: |
| |
| ``` |
| > $INFINISPAN_HOME/bin/server.sh |
| bin/server.sh |
| 18:07:32,435 INFO (main) [BOOT] JVM OpenJDK 64-Bit Server VM AdoptOpenJDK 11.0.7+10 |
| 18:07:32,441 INFO (main) [BOOT] JVM arguments = [-server, -Xlog:gc*:file=/home/oscerd/playground/infinispan-server-12.0.0.Final/server/log/gc.log:time,uptimemillis:filecount=5,filesize=3M, -Xms64m, -Xmx512m, -XX:MetaspaceSize=64M, -Djava.net.preferIPv4Stack=true, -Djava.awt.headless=true, -Dvisualvm.display.name=infinispan-server, -Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager, -Dinfinispan.server.home.path=/home/oscerd/playground/infinispan-server-12.0.0.Final, -classpath, :/home/oscerd/playground/infinispan-server-12.0.0.Final/boot/infinispan-server-runtime-12.0.0.Final-loader.jar, org.infinispan.server.loader.Loader, org.infinispan.server.Bootstrap] |
| 18:07:32,441 INFO (main) [BOOT] PID = 21270 |
| 18:07:32,466 INFO (main) [org.infinispan.SERVER] ISPN080000: Infinispan Server starting |
| 18:07:32,466 INFO (main) [org.infinispan.SERVER] ISPN080017: Server configuration: infinispan.xml |
| 18:07:32,466 INFO (main) [org.infinispan.SERVER] ISPN080032: Logging configuration: /home/oscerd/playground/infinispan-server-12.0.0.Final/server/conf/log4j2.xml |
| 18:07:32,823 INFO (main) [org.infinispan.SERVER] ISPN080027: Loaded extension 'query-dsl-filter-converter-factory' |
| 18:07:32,823 INFO (main) [org.infinispan.SERVER] ISPN080027: Loaded extension 'continuous-query-filter-converter-factory' |
| 18:07:32,825 INFO (main) [org.infinispan.SERVER] ISPN080027: Loaded extension 'iteration-filter-converter-factory' |
| 18:07:32,825 INFO (main) [org.infinispan.SERVER] ISPN080027: Loaded extension 'jdk.nashorn.api.scripting.NashornScriptEngineFactory' |
| 18:07:33,172 INFO (main) [org.infinispan.CONTAINER] ISPN000556: Starting user marshaller 'org.infinispan.commons.marshall.ImmutableProtoStreamMarshaller' |
| 18:07:33,262 WARN (main) [org.infinispan.PERSISTENCE] ISPN000554: jboss-marshalling is deprecated and planned for removal |
| 18:07:33,545 INFO (main) [org.infinispan.query.remote.impl.ProtobufMetadataManagerImpl] ISPN028019: Registering protostream serialization context initializer: org.infinispan.query.core.stats.impl.PersistenceContextInitializerImpl |
| 18:07:33,547 INFO (main) [org.infinispan.CONTAINER] ISPN000128: Infinispan version: Infinispan 'Lockdown' 12.0.0.Final |
| 18:07:33,687 INFO (main) [org.infinispan.CLUSTER] ISPN000078: Starting JGroups channel cluster with stack tcp |
| 18:07:35,796 INFO (main) [org.jgroups.protocols.pbcast.GMS] ghost-43669: no members discovered after 2002 ms: creating cluster as coordinator |
| 18:07:35,832 INFO (main) [org.infinispan.CLUSTER] ISPN000094: Received new cluster view for channel cluster: [ghost-43669|0] (1) [ghost-43669] |
| 18:07:35,843 INFO (main) [org.infinispan.CLUSTER] ISPN000079: Channel cluster local address is ghost-43669, physical addresses are [10.36.115.182:7800] |
| 18:07:35,878 INFO (main) [org.infinispan.CONTAINER] ISPN000390: Persisted state, version=12.0.0.Final timestamp=2021-03-01T17:07:35.875687Z |
| 18:07:36,081 INFO (main) [org.jboss.threads] JBoss Threads version 2.3.3.Final |
| 18:07:36,150 INFO (main) [org.infinispan.CONTAINER] ISPN000104: Using EmbeddedTransactionManager |
| 18:07:36,447 INFO (main) [org.infinispan.server.core.RequestTracer] OpenTracing integration is disabled |
| 18:07:36,510 INFO (ForkJoinPool.commonPool-worker-3) [org.infinispan.SERVER] ISPN080018: Started connector HotRod (internal) |
| 18:07:36,601 INFO (main) [org.infinispan.SERVER] ISPN080018: Started connector REST (internal) |
| 18:07:36,762 INFO (main) [org.infinispan.SERVER] ISPN080004: Connector SINGLE_PORT (default) listening on 127.0.0.1:11222 |
| 18:07:36,762 INFO (main) [org.infinispan.SERVER] ISPN080034: Server 'ghost-43669' listening on http://127.0.0.1:11222 |
| 18:07:36,763 INFO (main) [org.infinispan.SERVER] ISPN080001: Infinispan Server 12.0.0.Final started in 4296ms |
| |
| ``` |
| |
| Next, create an `admin` user, connect with the CLI, and create a cache named `mycache`: |
| |
| ``` |
| > $INFINISPAN_HOME/bin/cli.sh user create admin -p "password" |
| > $INFINISPAN_HOME/bin/cli.sh |
| [disconnected]> connect |
| Username: admin |
| Password: ******** |
| [ghost-35169@cluster//containers/default]> create cache --template=org.infinispan.DIST_SYNC mycache |
| [ghost-35169@cluster//containers/default]> describe caches/mycache |
| { |
| "distributed-cache" : { |
| "mode" : "SYNC", |
| "remote-timeout" : 17500, |
| "state-transfer" : { |
| "timeout" : 60000 |
| }, |
| "locking" : { |
| "concurrency-level" : 1000, |
| "acquire-timeout" : 15000, |
| "striping" : false |
| }, |
| "statistics" : true |
| } |
| } |
| |
| ``` |
| |
| It's important to add encoding to your cache configuration, otherwise consuming events won't work. |
| To do so, edit `$INFINISPAN_HOME/server/data/caches.xml` and add the `encoding` section to the cache configuration: |
| |
| ``` |
| <?xml version="1.0" ?> |
| |
| <infinispan xmlns="urn:infinispan:config:12.0"> |
| <cache-container> |
| <distributed-cache mode="SYNC" remote-timeout="17500" name="mycache" statistics="true"> |
| <encoding> |
| <key media-type="text/plain; charset=UTF-8"/> |
| <value media-type="text/plain; charset=UTF-8"/> |
| </encoding> |
| <locking concurrency-level="1000" acquire-timeout="15000" striping="false"/> |
| <state-transfer timeout="60000"/> |
| </distributed-cache> |
| </cache-container> |
| </infinispan> |
| ``` |
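
Because event consumption depends on the encoding section, it can be worth confirming that the edit actually landed in the file. The following is only a minimal sketch: the function name `check_cache_encoding` is hypothetical, and it does a plain text search for the `text/plain` key and value media types shown above rather than parsing the XML.

```shell
# Hypothetical helper (not part of Infinispan): returns 0 when the given
# caches file declares text/plain media types for both key and value.
check_cache_encoding() {
  caches_file="$1"
  grep -q '<key media-type="text/plain' "$caches_file" &&
    grep -q '<value media-type="text/plain' "$caches_file"
}

# Usage with the path from this example:
#   check_cache_encoding "$INFINISPAN_HOME/server/data/caches.xml" && echo "encoding configured"
```

A text search like this only catches a missing section; it does not validate the rest of the cache configuration.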
| |
| Now you should be able to run this example. |
| |
| ### Running Kafka |
| |
| ``` |
| $KAFKA_HOME/bin/zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties |
| $KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties |
| $KAFKA_HOME/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic mytopic |
| ``` |
| |
| ## Setting up the needed bits and running the example |
| |
| You'll need to set up the `plugin.path` property in your Kafka installation. |
| |
| Open `$KAFKA_HOME/config/connect-standalone.properties` |
| |
| and set the `plugin.path` property to your chosen location. |
| |
| In this example we'll use `/home/oscerd/connectors/`: |
| |
| ``` |
| > cd /home/oscerd/connectors/ |
| > wget https://repo1.maven.org/maven2/org/apache/camel/kafkaconnector/camel-infinispan-kafka-connector/0.10.0/camel-infinispan-kafka-connector-0.10.0-package.tar.gz |
| > tar -xzf camel-infinispan-kafka-connector-0.10.0-package.tar.gz |
| ``` |
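
The `plugin.path` edit described above can also be scripted instead of done by hand. This is a minimal sketch, not the project's own tooling: the function name `set_plugin_path` is hypothetical, and it assumes the property is either absent, commented out, or present as a single uncommented `plugin.path=` line.

```shell
# Hypothetical helper: set (or replace) plugin.path in a Kafka Connect
# properties file. A .bak copy of the file is kept when an entry is replaced.
set_plugin_path() {
  props_file="$1"
  plugin_dir="$2"
  if grep -q '^plugin\.path=' "$props_file"; then
    # An uncommented entry exists: rewrite it in place.
    sed -i.bak "s|^plugin\.path=.*|plugin.path=${plugin_dir}|" "$props_file"
  else
    # No active entry (missing or commented out): append one.
    printf 'plugin.path=%s\n' "$plugin_dir" >> "$props_file"
  fi
}

# Usage with the paths from this example:
#   set_plugin_path "$KAFKA_HOME/config/connect-standalone.properties" /home/oscerd/connectors/
```

The `|` delimiter in the `sed` expression is used so that directory paths containing `/` need no escaping.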
| |
| Now it's time to set up the connector. |
| |
| Open the Infinispan source configuration file, `config/CamelInfinispanSourceConnector.properties`, and set the following: |
| |
| ``` |
| name=CamelInfinispanSourceConnector |
| connector.class=org.apache.camel.kafkaconnector.infinispan.CamelInfinispanSourceConnector |
| key.converter=org.apache.kafka.connect.storage.StringConverter |
| value.converter=org.apache.kafka.connect.converters.ByteArrayConverter |
| |
| topics=mytopic |
| |
| camel.source.endpoint.hosts=localhost |
| camel.source.path.cacheName=mycache |
| camel.source.endpoint.eventTypes=CLIENT_CACHE_ENTRY_CREATED |
| camel.component.infinispan.secure=true |
| camel.source.endpoint.username=admin |
| camel.component.infinispan.password=password |
| camel.component.infinispan.saslMechanism=DIGEST-MD5 |
| camel.component.infinispan.securityRealm=default |
| camel.component.infinispan.securityServerName=infinispan |
| ``` |
| |
| Now you can run the example: |
| |
| ``` |
| $KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties config/CamelInfinispanSourceConnector.properties |
| ``` |
| |
| In a different terminal, run `cli.sh` from the Infinispan server and put an entry into the cache: |
| |
| ``` |
| > $INFINISPAN_HOME/bin/cli.sh |
| [disconnected]> connect |
| Username: admin |
| Password: ******** |
| [ghost-43981@cluster//containers/default]> cache mycache |
| [ghost-43981@cluster//containers/default/caches/mycache]> put test test |
| ``` |
| |
| In another terminal, using kafkacat, you should be able to see the headers of the resulting record: |
| |
| ``` |
| > kafkacat -b localhost:9092 -t mytopic -C -f 'Headers: %h\n' |
| |
| Headers: CamelHeader.CamelInfinispanCacheName=mycache,CamelHeader.CamelInfinispanEventType=CLIENT_CACHE_ENTRY_CREATED,CamelHeader.CamelInfinispanIsPre=false,CamelHeader.CamelInfinispanKey=test,CamelProperty.CamelToEndpoint=direct://end?pollingConsumerBlockTimeout=0&pollingConsumerBlockWhenFull=true&pollingConsumerQueueSize=1000 |
| % Reached end of topic mytopic [0] at offset 1 |
| |
| ``` |
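
If kafkacat is not installed, the console consumer shipped with Kafka may serve as an alternative for inspecting the headers. This sketch rests on an assumption not stated in this example: it relies on the `print.headers` formatter property, which is available in Kafka 2.7.0 and later. The wrapper name `consume_headers` is hypothetical.

```shell
# Hypothetical wrapper around Kafka's own console consumer.
# Assumes Kafka 2.7.0+ (for print.headers) and a broker on localhost:9092.
consume_headers() {
  topic="$1"
  "$KAFKA_HOME/bin/kafka-console-consumer.sh" \
    --bootstrap-server localhost:9092 \
    --topic "$topic" \
    --from-beginning \
    --property print.headers=true
}

# Usage with the topic from this example:
#   consume_headers mytopic
```

The output format differs from the kafkacat output shown above, but the same `CamelHeader.*` entries should be listed for each record.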
| |