Kafka upgraded to version 2.3.1
diff --git a/parent/pom.xml b/parent/pom.xml
index d12adf0..9b0ac61 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -41,7 +41,7 @@
<jboss.snapshots.repo.url>https://repository.jboss.org/nexus/content/repositories/snapshots/</jboss.snapshots.repo.url>
<activemq.version>5.15.9</activemq.version>
- <kafka.version>2.1.1</kafka.version>
+ <kafka.version>2.3.1</kafka.version>
<junit.version>4.13</junit.version>
<camel.version>3.0.0</camel.version>
<jackson.version>2.10.2</jackson.version>
diff --git a/tests/src/test/java/org/apache/camel/kafkaconnector/services/kafkaconnect/KafkaConnectRunner.java b/tests/src/test/java/org/apache/camel/kafkaconnector/services/kafkaconnect/KafkaConnectRunner.java
index 43c354a..2d552b8 100644
--- a/tests/src/test/java/org/apache/camel/kafkaconnector/services/kafkaconnect/KafkaConnectRunner.java
+++ b/tests/src/test/java/org/apache/camel/kafkaconnector/services/kafkaconnect/KafkaConnectRunner.java
@@ -29,6 +29,8 @@
import org.apache.camel.kafkaconnector.ConnectorPropertyFactory;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.connector.policy.AllConnectorClientConfigOverridePolicy;
+import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
import org.apache.kafka.connect.runtime.Connect;
import org.apache.kafka.connect.runtime.ConnectorConfig;
import org.apache.kafka.connect.runtime.Herder;
@@ -125,6 +127,7 @@
StandaloneConfig config = new StandaloneConfig(standAloneProperties);
String kafkaClusterId = ConnectUtils.lookupKafkaClusterId(config);
+ AllConnectorClientConfigOverridePolicy allConnectorClientConfigOverridePolicy = new AllConnectorClientConfigOverridePolicy();
RestServer rest = new RestServer(config);
@@ -132,13 +135,13 @@
According to the Kafka source code "... Worker runs a (dynamic) set of tasks
in a set of threads, doing the work of actually moving data to/from Kafka ..."
*/
- Worker worker = new Worker(bootstrapServer, time, plugins, config, new FileOffsetBackingStore());
+ Worker worker = new Worker(bootstrapServer, time, plugins, config, new FileOffsetBackingStore(), allConnectorClientConfigOverridePolicy);
/*
From Kafka source code: " ... The herder interface tracks and manages workers
and connectors ..."
*/
- herder = new StandaloneHerder(worker, kafkaClusterId);
+ herder = new StandaloneHerder(worker, kafkaClusterId, allConnectorClientConfigOverridePolicy);
connect = new Connect(herder, rest);
LOG.info("Finished initializing the worker");
}