Added JsonPdxConverter (#2)
- Allows PdxInstance objects to be converted to JSON bytes when sourced
from a Geode region into a Kafka topic
- Allows JSON bytes to be converted to PdxInstance objects when sinked
into a Geode region from a Kafka topic
- Added unit and DUnit tests for JsonPdxConverter
- Added functionality to the test framework to specify custom key
converter, custom value converter and configuration properties for each
- Added TestObject class to allow validation of
serialization/deserialization
Authored-by: Donal Evans <doevans@pivotal.io>
diff --git a/src/main/java/org/apache/geode/kafka/GeodeConnectorConfig.java b/src/main/java/org/apache/geode/kafka/GeodeConnectorConfig.java
index 452c59f..270733f 100644
--- a/src/main/java/org/apache/geode/kafka/GeodeConnectorConfig.java
+++ b/src/main/java/org/apache/geode/kafka/GeodeConnectorConfig.java
@@ -23,6 +23,7 @@
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.storage.StringConverter;
public class GeodeConnectorConfig extends AbstractConfig {
@@ -42,6 +43,9 @@
public static final String SECURITY_USER = "security-username";
public static final String SECURITY_PASSWORD = "security-password";
+ public static final String DEFAULT_KEY_CONVERTER = StringConverter.class.getCanonicalName();
+ public static final String DEFAULT_VALUE_CONVERTER = StringConverter.class.getCanonicalName();
+
protected final int taskId;
protected List<LocatorHostPort> locatorHostPorts;
private String securityClientAuthInit;
diff --git a/src/main/java/org/apache/geode/kafka/GeodeContext.java b/src/main/java/org/apache/geode/kafka/GeodeContext.java
index 8833fd9..99ef11c 100644
--- a/src/main/java/org/apache/geode/kafka/GeodeContext.java
+++ b/src/main/java/org/apache/geode/kafka/GeodeContext.java
@@ -63,6 +63,7 @@
String securityPassword, boolean usesSecurity) {
ClientCacheFactory ccf = new ClientCacheFactory();
+ ccf.setPdxReadSerialized(true);
if (usesSecurity) {
if (securityUserName != null && securityPassword != null) {
ccf.set(SECURITY_USER, securityUserName);
diff --git a/src/main/java/org/apache/geode/kafka/converter/JsonPdxConverter.java b/src/main/java/org/apache/geode/kafka/converter/JsonPdxConverter.java
new file mode 100644
index 0000000..e630b4e
--- /dev/null
+++ b/src/main/java/org/apache/geode/kafka/converter/JsonPdxConverter.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.kafka.converter;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaAndValue;
+import org.apache.kafka.connect.storage.Converter;
+
+import org.apache.geode.pdx.JSONFormatter;
+import org.apache.geode.pdx.PdxInstance;
+
+public class JsonPdxConverter implements Converter {
+ public static final String JSON_TYPE_ANNOTATION = "\"@type\"";
+ // Default value = false
+ public static final String ADD_TYPE_ANNOTATION_TO_JSON = "add-type-annotation-to-json";
+ private Map<String, String> internalConfig = new HashMap<>();
+
+ @Override
+ public void configure(Map<String, ?> configs, boolean isKey) {
+ if (configs != null) {
+ configs.forEach((key, value) -> internalConfig.put(key, (String) value));
+ }
+ }
+
+ @Override
+ public byte[] fromConnectData(String topic, Schema schema, Object value) {
+ PdxInstance pdxInstanceValue = (PdxInstance) value;
+ byte[] jsonBytes = getJsonBytes(pdxInstanceValue);
+ if (!shouldAddTypeAnnotation()) {
+ return jsonBytes;
+ }
+ String jsonString = new String(jsonBytes);
+ if (!jsonString.contains(JSON_TYPE_ANNOTATION)) {
+ jsonString = jsonString.replaceFirst("\\{",
+ "{" + JSON_TYPE_ANNOTATION + " : \"" + pdxInstanceValue.getClassName() + "\",");
+ }
+ return jsonString.getBytes();
+ }
+
+ @Override
+ public SchemaAndValue toConnectData(String topic, byte[] value) {
+ return new SchemaAndValue(null, JSONFormatter.fromJSON(value));
+ }
+
+ byte[] getJsonBytes(PdxInstance pdxInstanceValue) {
+ return JSONFormatter.toJSONByteArray(pdxInstanceValue);
+ }
+
+ boolean shouldAddTypeAnnotation() {
+ return Boolean.parseBoolean(internalConfig.get(ADD_TYPE_ANNOTATION_TO_JSON));
+ }
+
+ public Map<String, String> getInternalConfig() {
+ return internalConfig;
+ }
+}
diff --git a/src/test/java/org/apache/geode/kafka/GeodeAsSinkDUnitTest.java b/src/test/java/org/apache/geode/kafka/GeodeAsSinkDUnitTest.java
index ecee4bf..668681d 100644
--- a/src/test/java/org/apache/geode/kafka/GeodeAsSinkDUnitTest.java
+++ b/src/test/java/org/apache/geode/kafka/GeodeAsSinkDUnitTest.java
@@ -101,7 +101,6 @@
@Test
public void whenKafkaProducerProducesEventsThenGeodeMustReceiveTheseEvents() throws Exception {
-
MemberVM locator = clusterStartupRule.startLocatorVM(0, 10334);
int locatorPort = locator.getPort();
MemberVM server = clusterStartupRule.startServerVM(1, locatorPort);
diff --git a/src/test/java/org/apache/geode/kafka/GeodeAsSourceDUnitTest.java b/src/test/java/org/apache/geode/kafka/GeodeAsSourceDUnitTest.java
index 108d86a..f00965c 100644
--- a/src/test/java/org/apache/geode/kafka/GeodeAsSourceDUnitTest.java
+++ b/src/test/java/org/apache/geode/kafka/GeodeAsSourceDUnitTest.java
@@ -54,7 +54,6 @@
@Rule
public ClusterStartupRule clusterStartupRule = new ClusterStartupRule(3);
-
@Rule
public TestName testName = new TestName();
diff --git a/src/test/java/org/apache/geode/kafka/converter/JsonPdxConverterDUnitTest.java b/src/test/java/org/apache/geode/kafka/converter/JsonPdxConverterDUnitTest.java
new file mode 100644
index 0000000..d62ccc1
--- /dev/null
+++ b/src/test/java/org/apache/geode/kafka/converter/JsonPdxConverterDUnitTest.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.kafka.converter;
+
+import static org.apache.geode.kafka.GeodeConnectorConfig.DEFAULT_KEY_CONVERTER;
+import static org.apache.geode.kafka.utilities.GeodeKafkaTestUtils.createTopic;
+import static org.apache.geode.kafka.utilities.GeodeKafkaTestUtils.deleteTopic;
+import static org.apache.geode.kafka.utilities.GeodeKafkaTestUtils.getKafkaConfig;
+import static org.apache.geode.kafka.utilities.GeodeKafkaTestUtils.getZooKeeperProperties;
+import static org.apache.geode.kafka.utilities.GeodeKafkaTestUtils.startKafka;
+import static org.apache.geode.kafka.utilities.GeodeKafkaTestUtils.startWorkerAndHerderCluster;
+import static org.apache.geode.kafka.utilities.GeodeKafkaTestUtils.startZooKeeper;
+import static org.awaitility.Awaitility.await;
+import static org.junit.Assert.assertEquals;
+
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import kafka.zk.KafkaZkClient;
+import org.apache.kafka.common.utils.Time;
+import org.assertj.core.util.Arrays;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestName;
+
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.client.ClientCache;
+import org.apache.geode.cache.client.ClientRegionShortcut;
+import org.apache.geode.kafka.utilities.KafkaLocalCluster;
+import org.apache.geode.kafka.utilities.TestObject;
+import org.apache.geode.kafka.utilities.WorkerAndHerderCluster;
+import org.apache.geode.pdx.PdxInstance;
+import org.apache.geode.pdx.PdxInstanceFactory;
+import org.apache.geode.pdx.ReflectionBasedAutoSerializer;
+import org.apache.geode.test.dunit.rules.ClientVM;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+
+public class JsonPdxConverterDUnitTest {
+ @Rule
+ public ClusterStartupRule clusterStartupRule = new ClusterStartupRule(3);
+
+ @Rule
+ public TestName testName = new TestName();
+
+ @ClassRule
+ public static TemporaryFolder temporaryFolderForZooKeeper = new TemporaryFolder();
+
+ @Rule
+ public TemporaryFolder temporaryFolderForOffset = new TemporaryFolder();
+
+ @BeforeClass
+ public static void setup()
+ throws Exception {
+ startZooKeeper(getZooKeeperProperties(temporaryFolderForZooKeeper));
+ }
+
+ @AfterClass
+ public static void cleanUp() {
+ KafkaZkClient zkClient = KafkaZkClient.apply("localhost:2181",
+ false,
+ 200000,
+ 15000,
+ 10,
+ Time.SYSTEM,
+ "myGroup",
+ "myMetricType",
+ null);
+
+ zkClient.close();
+ }
+
+ @Test
+ public void jsonPdxConverterCanConvertPdxInstanceToJsonAndBackWhenDataMovesFromRegionToTopicToRegion()
+ throws Exception {
+ MemberVM locator = clusterStartupRule.startLocatorVM(0);
+ int locatorPort = locator.getPort();
+ // .withPDXReadSerialized()
+ MemberVM server1 = clusterStartupRule.startServerVM(1, server -> server
+ .withConnectionToLocator(locatorPort)
+ // .withPDXReadSerialized()
+ );
+ ClientVM client1 = clusterStartupRule.startClientVM(2, client -> client
+ .withLocatorConnection(locatorPort)
+ .withCacheSetup(
+ cf -> cf.setPdxSerializer(new ReflectionBasedAutoSerializer("org.apache.geode.kafka.*"))
+ .setPdxReadSerialized(true)));
+
+ // Set unique names for all the different components
+ String sourceRegionName = "SOURCE_REGION";
+ String sinkRegionName = "SINK_REGION";
+ // We only need one topic for this test, which we will both source to and sink from
+ String topicName = "TEST_TOPIC";
+
+ /*
+ * Start the Apache Geode cluster and create the source and sink regions.
+ * Create a Apache Geode client which can insert data into the source and get data from the sink
+ */
+ server1.invoke(() -> {
+ ClusterStartupRule.getCache().createRegionFactory(RegionShortcut.PARTITION)
+ .create(sourceRegionName);
+ ClusterStartupRule.getCache().createRegionFactory(RegionShortcut.PARTITION)
+ .create(sinkRegionName);
+ });
+ client1.invoke(() -> {
+ ClientCache clientCache = ClusterStartupRule.getClientCache();
+ clientCache.createClientRegionFactory(ClientRegionShortcut.PROXY)
+ .create(sourceRegionName);
+ clientCache.createClientRegionFactory(ClientRegionShortcut.PROXY)
+ .create(sinkRegionName);
+ });
+
+ /*
+ * Start the Kafka Cluster, workers and the topic to which the Apache Geode will connect as
+ * a source
+ */
+ WorkerAndHerderCluster workerAndHerderCluster = null;
+ KafkaLocalCluster kafkaLocalCluster = null;
+ try {
+ kafkaLocalCluster = startKafka(
+ getKafkaConfig(temporaryFolderForOffset.newFolder("kafkaLogs").getAbsolutePath()));
+ createTopic(topicName, 1, 1);
+ // Create workers and herder cluster
+ workerAndHerderCluster = startWorkerAndHerderCluster(1, sourceRegionName, sinkRegionName,
+ topicName, topicName, temporaryFolderForOffset.getRoot().getAbsolutePath(),
+ "localhost[" + locatorPort + "]", DEFAULT_KEY_CONVERTER, "",
+ JsonPdxConverter.class.getCanonicalName(),
+ JsonPdxConverter.ADD_TYPE_ANNOTATION_TO_JSON + "=true");
+
+ // Insert data into the Apache Geode source and retrieve the data from the Apache Geode sink
+ // from the client
+ client1.invoke(() -> {
+ // Create an object that will be serialized into a PdxInstance
+ String name = "testName";
+ int age = 42;
+ double number = 3.141;
+ List<String> words = new ArrayList<>();
+ words.add("words1");
+ words.add("words2");
+ words.add("words3");
+ TestObject originalObject = new TestObject(name, age, number, words);
+
+ ClientCache clientCache = ClusterStartupRule.getClientCache();
+
+ // Create a PdxInstance from the test object
+ PdxInstanceFactory instanceFactory =
+ clientCache.createPdxInstanceFactory(originalObject.getClass().getName());
+ Arrays.asList(originalObject.getClass().getFields())
+ .stream()
+ .map(field -> (Field) field)
+ .forEach(field -> {
+ try {
+ Object value = field.get(originalObject);
+ Class type = field.getType();
+ instanceFactory.writeField(field.getName(), value, type);
+ } catch (IllegalAccessException ignore) {
+ }
+ });
+ PdxInstance putInstance = instanceFactory.create();
+
+ // Put the PdxInstance into the source region
+ String key = "key1";
+ clientCache.getRegion(sourceRegionName).put(key, putInstance);
+
+ // Assert that the data that arrives in the sink region is the same as the data that was put
+ // into the source region
+ Region<Object, Object> sinkRegion = clientCache.getRegion(sinkRegionName);
+ await().atMost(10, TimeUnit.SECONDS)
+ .untilAsserted(() -> assertEquals(1, sinkRegion.sizeOnServer()));
+ PdxInstance getInstance = (PdxInstance) sinkRegion.get(key);
+
+ assertEquals(originalObject, getInstance.getObject());
+ });
+
+ } finally {
+ // Clean up by deleting the topic
+ deleteTopic(topicName);
+ if (workerAndHerderCluster != null) {
+ workerAndHerderCluster.stop();
+ }
+ if (kafkaLocalCluster != null) {
+ kafkaLocalCluster.stop();
+ }
+ }
+ }
+}
diff --git a/src/test/java/org/apache/geode/kafka/converter/JsonPdxConverterTest.java b/src/test/java/org/apache/geode/kafka/converter/JsonPdxConverterTest.java
new file mode 100644
index 0000000..6ffcdd4
--- /dev/null
+++ b/src/test/java/org/apache/geode/kafka/converter/JsonPdxConverterTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.kafka.converter;
+
+import static org.apache.geode.kafka.converter.JsonPdxConverter.JSON_TYPE_ANNOTATION;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.IntStream;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.geode.pdx.PdxInstance;
+
+public class JsonPdxConverterTest {
+ private JsonPdxConverter spyConverter;
+ PdxInstance mockPdxInstance;
+ private final String className = "className";
+ private final String jsonWithoutTypeAnnotation = "{\"name\" : \"test\"}";
+ private final String jsonWithTypeAnnotation =
+ "{" + JSON_TYPE_ANNOTATION + " : \"" + className + "\",\"name\" : \"test\"}";
+
+ @Before
+ public void setUp() {
+ spyConverter = spy(new JsonPdxConverter());
+ mockPdxInstance = mock(PdxInstance.class);
+ when(mockPdxInstance.getClassName()).thenReturn(className);
+ }
+
+ @Test
+ public void configurePopulatesInternalConfiguration() {
+ Map<String, String> map = new HashMap<>();
+ IntStream.range(0, 10).forEach(i -> map.put("key" + i, "value" + i));
+ spyConverter.configure(map, false);
+
+ assertThat(spyConverter.getInternalConfig(), is(map));
+ }
+
+ @Test
+ public void fromConnectDataAddsTypeAnnotationWhenConfigurationIsSetAndJsonDoesNotAlreadyContainAnnotation() {
+ byte[] jsonBytes = jsonWithoutTypeAnnotation.getBytes();
+
+ doReturn(jsonBytes).when(spyConverter).getJsonBytes(mockPdxInstance);
+ doReturn(true).when(spyConverter).shouldAddTypeAnnotation();
+
+ byte[] expectedOutputJsonBytes = jsonWithTypeAnnotation.getBytes();
+
+ assertThat(spyConverter.fromConnectData(null, null, mockPdxInstance),
+ is(expectedOutputJsonBytes));
+ }
+
+ @Test
+ public void fromConnectDataDoesNotAddTypeAnnotationWhenConfigurationIsSetAndJsonAlreadyContainsAnnotation() {
+ byte[] jsonBytes = jsonWithTypeAnnotation.getBytes();
+
+ doReturn(jsonBytes).when(spyConverter).getJsonBytes(mockPdxInstance);
+ doReturn(true).when(spyConverter).shouldAddTypeAnnotation();
+
+ assertThat(spyConverter.fromConnectData(null, null, mockPdxInstance), is(jsonBytes));
+ }
+
+ @Test
+ public void fromConnectDataDoesNotAddTypeAnnotationWhenConfigurationIsNotSet() {
+ byte[] jsonBytes = jsonWithoutTypeAnnotation.getBytes();
+
+ doReturn(jsonBytes).when(spyConverter).getJsonBytes(mockPdxInstance);
+ doReturn(false).when(spyConverter).shouldAddTypeAnnotation();
+
+ assertThat(spyConverter.fromConnectData(null, null, mockPdxInstance), is(jsonBytes));
+ }
+}
diff --git a/src/test/java/org/apache/geode/kafka/utilities/GeodeKafkaTestUtils.java b/src/test/java/org/apache/geode/kafka/utilities/GeodeKafkaTestUtils.java
index f9c1e2b..198e129 100644
--- a/src/test/java/org/apache/geode/kafka/utilities/GeodeKafkaTestUtils.java
+++ b/src/test/java/org/apache/geode/kafka/utilities/GeodeKafkaTestUtils.java
@@ -14,6 +14,8 @@
*/
package org.apache.geode.kafka.utilities;
+import static org.apache.geode.kafka.GeodeConnectorConfig.DEFAULT_KEY_CONVERTER;
+import static org.apache.geode.kafka.GeodeConnectorConfig.DEFAULT_VALUE_CONVERTER;
import static org.awaitility.Awaitility.await;
import java.io.IOException;
@@ -136,10 +138,26 @@
String sinkTopic,
String offsetPath,
String locatorString) {
+ return startWorkerAndHerderCluster(maxTasks, sourceRegion, sinkRegion, sourceTopic, sinkTopic,
+ offsetPath, locatorString, DEFAULT_KEY_CONVERTER, "", DEFAULT_VALUE_CONVERTER, "");
+ }
+
+ public static WorkerAndHerderCluster startWorkerAndHerderCluster(int maxTasks,
+ String sourceRegion,
+ String sinkRegion,
+ String sourceTopic,
+ String sinkTopic,
+ String offsetPath,
+ String locatorString,
+ String keyConverter,
+ String keyConverterArgs,
+ String valueConverter,
+ String valueConverterArgs) {
WorkerAndHerderCluster workerAndHerderCluster = new WorkerAndHerderCluster();
try {
workerAndHerderCluster.start(String.valueOf(maxTasks), sourceRegion, sinkRegion, sourceTopic,
- sinkTopic, offsetPath, locatorString);
+ sinkTopic, offsetPath, locatorString, keyConverter, keyConverterArgs, valueConverter,
+ valueConverterArgs);
Thread.sleep(20000);
} catch (Exception e) {
throw new RuntimeException("Could not start the worker and herder cluster" + e);
diff --git a/src/test/java/org/apache/geode/kafka/utilities/TestObject.java b/src/test/java/org/apache/geode/kafka/utilities/TestObject.java
new file mode 100644
index 0000000..a1ddd60
--- /dev/null
+++ b/src/test/java/org/apache/geode/kafka/utilities/TestObject.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.kafka.utilities;
+
+import java.util.List;
+import java.util.Objects;
+
+public class TestObject {
+ public String name;
+ public int age;
+ public double number;
+ public List<String> words;
+
+ @SuppressWarnings("unused")
+ public TestObject() {}
+
+ public TestObject(String name, int age, double number, List<String> words) {
+ this.name = name;
+ this.age = age;
+ this.number = number;
+ this.words = words;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ TestObject that = (TestObject) o;
+ return age == that.age &&
+ Double.compare(that.number, number) == 0 &&
+ Objects.equals(name, that.name) &&
+ Objects.equals(words, that.words);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(name, age, number, words);
+ }
+}
diff --git a/src/test/java/org/apache/geode/kafka/utilities/WorkerAndHerderCluster.java b/src/test/java/org/apache/geode/kafka/utilities/WorkerAndHerderCluster.java
index 7c58bc0..098b2e6 100644
--- a/src/test/java/org/apache/geode/kafka/utilities/WorkerAndHerderCluster.java
+++ b/src/test/java/org/apache/geode/kafka/utilities/WorkerAndHerderCluster.java
@@ -29,10 +29,12 @@
}
public void start(String maxTasks, String sourceRegion, String sinkRegion, String sourceTopic,
- String sinkTopic, String offsetPath, String locatorString)
+ String sinkTopic, String offsetPath, String locatorString, String keyConverter,
+ String keyConverterArgs, String valueConverter, String valueConverterArgs)
throws IOException, InterruptedException {
String[] args = new String[] {maxTasks, sourceRegion, sinkRegion, sourceTopic, sinkTopic,
- offsetPath, locatorString};
+ offsetPath, locatorString, keyConverter, keyConverterArgs, valueConverter,
+ valueConverterArgs};
workerAndHerder.exec(args);
}
diff --git a/src/test/java/org/apache/geode/kafka/utilities/WorkerAndHerderWrapper.java b/src/test/java/org/apache/geode/kafka/utilities/WorkerAndHerderWrapper.java
index 3fe6e66..3723018 100644
--- a/src/test/java/org/apache/geode/kafka/utilities/WorkerAndHerderWrapper.java
+++ b/src/test/java/org/apache/geode/kafka/utilities/WorkerAndHerderWrapper.java
@@ -17,8 +17,10 @@
import static org.apache.geode.kafka.source.GeodeSourceConnectorConfig.REGION_TO_TOPIC_BINDINGS;
import java.io.IOException;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
+import java.util.stream.Collectors;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.connect.connector.policy.AllConnectorClientConfigOverridePolicy;
@@ -40,7 +42,7 @@
public class WorkerAndHerderWrapper {
public static void main(String[] args) throws IOException {
- if (args.length != 7) {
+ if (args.length != 11) {
throw new RuntimeException("Insufficient arguments to start workers and herders");
}
String maxTasks = args[0];
@@ -52,6 +54,18 @@
String regionToTopicBinding = "[" + sourceRegion + ":" + sourceTopic + "]";
String topicToRegionBinding = "[" + sinkTopic + ":" + sinkRegion + "]";
String locatorString = args[6];
+ String keyConverter = args[7];
+ String keyConverterArgs = args[8];
+ Map<String, String> keyConverterProps = new HashMap<>();
+ if (keyConverterArgs != null && !keyConverterArgs.isEmpty()) {
+ keyConverterProps = parseArguments(keyConverterArgs, true);
+ }
+ String valueConverter = args[9];
+ String valueConverterArgs = args[10];
+ Map<String, String> valueConverterProps = new HashMap<>();
+ if (valueConverterArgs != null && !valueConverterArgs.isEmpty()) {
+ valueConverterProps = parseArguments(valueConverterArgs, false);
+ }
Map props = new HashMap();
props.put(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
@@ -61,10 +75,10 @@
props.put("internal.key.converter.schemas.enable", "false");
props.put("internal.value.converter.schemas.enable", "false");
- props.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG,
- "org.apache.kafka.connect.storage.StringConverter");
- props.put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG,
- "org.apache.kafka.connect.storage.StringConverter");
+ props.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, keyConverter);
+ props.putAll(keyConverterProps);
+ props.put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, valueConverter);
+ props.putAll(valueConverterProps);
props.put("key.converter.schemas.enable", "false");
props.put("value.converter.schemas.enable", "false");
props.put(GeodeConnectorConfig.LOCATORS, locatorString);
@@ -108,4 +122,19 @@
}
+
+ // We expect that converter properties will be supplied as a comma separated list of the form
+ // "first.property.name=first.property.value,second.property.name=second.property.value"
+ public static Map<String, String> parseArguments(String args, boolean isKeyConverter) {
+ String propertyNamePrefix;
+ if (isKeyConverter) {
+ propertyNamePrefix = "key.converter.";
+ } else {
+ propertyNamePrefix = "value.converter.";
+ }
+ return Arrays.stream(args.split(","))
+ .collect(Collectors.toMap(
+ string -> propertyNamePrefix + string.substring(0, string.indexOf("=")).trim(),
+ string -> string.substring(string.indexOf("=") + 1).trim()));
+ }
}