Added JsonPdxConverter (#2)

- Allows PdxInstance objects to be converted to JSON bytes when data is
sourced from a Geode region into a Kafka topic
- Allows JSON bytes to be converted to PdxInstance objects when data is
sunk into a Geode region from a Kafka topic
- Added unit and DUnit tests for JsonPdxConverter
- Added functionality to the test framework to specify a custom key
converter, a custom value converter, and configuration properties for
each
- Added a TestObject class to allow validation of
serialization/deserialization
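
A minimal usage sketch (for illustration only; in practice Kafka Connect
instantiates and configures the converter from the worker properties, and
"myTopic" and pdxInstance below are placeholder values):

    Map<String, String> config = new HashMap<>();
    config.put(JsonPdxConverter.ADD_TYPE_ANNOTATION_TO_JSON, "true");

    JsonPdxConverter converter = new JsonPdxConverter();
    converter.configure(config, false); // false: configured as a value converter

    // Source side: PdxInstance from a Geode region -> JSON bytes for the topic
    byte[] jsonBytes = converter.fromConnectData("myTopic", null, pdxInstance);

    // Sink side: JSON bytes from the topic -> PdxInstance for a Geode region
    SchemaAndValue restored = converter.toConnectData("myTopic", jsonBytes);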

Authored-by: Donal Evans <doevans@pivotal.io>
diff --git a/src/main/java/org/apache/geode/kafka/GeodeConnectorConfig.java b/src/main/java/org/apache/geode/kafka/GeodeConnectorConfig.java
index 452c59f..270733f 100644
--- a/src/main/java/org/apache/geode/kafka/GeodeConnectorConfig.java
+++ b/src/main/java/org/apache/geode/kafka/GeodeConnectorConfig.java
@@ -23,6 +23,7 @@
 
 import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.storage.StringConverter;
 
 public class GeodeConnectorConfig extends AbstractConfig {
 
@@ -42,6 +43,9 @@
   public static final String SECURITY_USER = "security-username";
   public static final String SECURITY_PASSWORD = "security-password";
 
+  public static final String DEFAULT_KEY_CONVERTER = StringConverter.class.getCanonicalName();
+  public static final String DEFAULT_VALUE_CONVERTER = StringConverter.class.getCanonicalName();
+
   protected final int taskId;
   protected List<LocatorHostPort> locatorHostPorts;
   private String securityClientAuthInit;
diff --git a/src/main/java/org/apache/geode/kafka/GeodeContext.java b/src/main/java/org/apache/geode/kafka/GeodeContext.java
index 8833fd9..99ef11c 100644
--- a/src/main/java/org/apache/geode/kafka/GeodeContext.java
+++ b/src/main/java/org/apache/geode/kafka/GeodeContext.java
@@ -63,6 +63,9 @@
       String securityPassword, boolean usesSecurity) {
     ClientCacheFactory ccf = new ClientCacheFactory();
 
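+    // Return region values as serialized PdxInstance objects rather than deserializing them
+    // into domain classes; converters such as JsonPdxConverter operate directly on PdxInstance.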
+    ccf.setPdxReadSerialized(true);
     if (usesSecurity) {
       if (securityUserName != null && securityPassword != null) {
         ccf.set(SECURITY_USER, securityUserName);
diff --git a/src/main/java/org/apache/geode/kafka/converter/JsonPdxConverter.java b/src/main/java/org/apache/geode/kafka/converter/JsonPdxConverter.java
new file mode 100644
index 0000000..e630b4e
--- /dev/null
+++ b/src/main/java/org/apache/geode/kafka/converter/JsonPdxConverter.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.kafka.converter;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaAndValue;
+import org.apache.kafka.connect.storage.Converter;
+
+import org.apache.geode.pdx.JSONFormatter;
+import org.apache.geode.pdx.PdxInstance;
+
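+/**
+ * A Kafka Connect Converter that converts between Geode PdxInstance objects and JSON bytes.
+ * fromConnectData serializes a PdxInstance to JSON using JSONFormatter and, when configured to
+ * do so, prepends a "@type" annotation containing the PDX class name. toConnectData parses
+ * JSON bytes back into a PdxInstance using JSONFormatter.
+ */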
+public class JsonPdxConverter implements Converter {
+  public static final String JSON_TYPE_ANNOTATION = "\"@type\"";
+  // This property is optional and defaults to false when not set
+  public static final String ADD_TYPE_ANNOTATION_TO_JSON = "add-type-annotation-to-json";
+  private Map<String, String> internalConfig = new HashMap<>();
+
+  @Override
+  public void configure(Map<String, ?> configs, boolean isKey) {
+    if (configs != null) {
+      configs.forEach((key, value) -> internalConfig.put(key, (String) value));
+    }
+  }
+
+  @Override
+  public byte[] fromConnectData(String topic, Schema schema, Object value) {
+    PdxInstance pdxInstanceValue = (PdxInstance) value;
+    byte[] jsonBytes = getJsonBytes(pdxInstanceValue);
+    if (!shouldAddTypeAnnotation()) {
+      return jsonBytes;
+    }
+    String jsonString = new String(jsonBytes);
+    if (!jsonString.contains(JSON_TYPE_ANNOTATION)) {
+      jsonString = jsonString.replaceFirst("\\{",
+          "{" + JSON_TYPE_ANNOTATION + " : \"" + pdxInstanceValue.getClassName() + "\",");
+    }
+    return jsonString.getBytes();
+  }
+
+  @Override
+  public SchemaAndValue toConnectData(String topic, byte[] value) {
+    return new SchemaAndValue(null, JSONFormatter.fromJSON(value));
+  }
+
+  byte[] getJsonBytes(PdxInstance pdxInstanceValue) {
+    return JSONFormatter.toJSONByteArray(pdxInstanceValue);
+  }
+
+  boolean shouldAddTypeAnnotation() {
+    return Boolean.parseBoolean(internalConfig.get(ADD_TYPE_ANNOTATION_TO_JSON));
+  }
+
+  public Map<String, String> getInternalConfig() {
+    return internalConfig;
+  }
+}
diff --git a/src/test/java/org/apache/geode/kafka/GeodeAsSinkDUnitTest.java b/src/test/java/org/apache/geode/kafka/GeodeAsSinkDUnitTest.java
index ecee4bf..668681d 100644
--- a/src/test/java/org/apache/geode/kafka/GeodeAsSinkDUnitTest.java
+++ b/src/test/java/org/apache/geode/kafka/GeodeAsSinkDUnitTest.java
@@ -101,7 +101,6 @@
 
   @Test
   public void whenKafkaProducerProducesEventsThenGeodeMustReceiveTheseEvents() throws Exception {
-
     MemberVM locator = clusterStartupRule.startLocatorVM(0, 10334);
     int locatorPort = locator.getPort();
     MemberVM server = clusterStartupRule.startServerVM(1, locatorPort);
diff --git a/src/test/java/org/apache/geode/kafka/GeodeAsSourceDUnitTest.java b/src/test/java/org/apache/geode/kafka/GeodeAsSourceDUnitTest.java
index 108d86a..f00965c 100644
--- a/src/test/java/org/apache/geode/kafka/GeodeAsSourceDUnitTest.java
+++ b/src/test/java/org/apache/geode/kafka/GeodeAsSourceDUnitTest.java
@@ -54,7 +54,6 @@
   @Rule
   public ClusterStartupRule clusterStartupRule = new ClusterStartupRule(3);
 
-
   @Rule
   public TestName testName = new TestName();
 
diff --git a/src/test/java/org/apache/geode/kafka/converter/JsonPdxConverterDUnitTest.java b/src/test/java/org/apache/geode/kafka/converter/JsonPdxConverterDUnitTest.java
new file mode 100644
index 0000000..d62ccc1
--- /dev/null
+++ b/src/test/java/org/apache/geode/kafka/converter/JsonPdxConverterDUnitTest.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.kafka.converter;
+
+import static org.apache.geode.kafka.GeodeConnectorConfig.DEFAULT_KEY_CONVERTER;
+import static org.apache.geode.kafka.utilities.GeodeKafkaTestUtils.createTopic;
+import static org.apache.geode.kafka.utilities.GeodeKafkaTestUtils.deleteTopic;
+import static org.apache.geode.kafka.utilities.GeodeKafkaTestUtils.getKafkaConfig;
+import static org.apache.geode.kafka.utilities.GeodeKafkaTestUtils.getZooKeeperProperties;
+import static org.apache.geode.kafka.utilities.GeodeKafkaTestUtils.startKafka;
+import static org.apache.geode.kafka.utilities.GeodeKafkaTestUtils.startWorkerAndHerderCluster;
+import static org.apache.geode.kafka.utilities.GeodeKafkaTestUtils.startZooKeeper;
+import static org.awaitility.Awaitility.await;
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import kafka.zk.KafkaZkClient;
+import org.apache.kafka.common.utils.Time;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestName;
+
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.client.ClientCache;
+import org.apache.geode.cache.client.ClientRegionShortcut;
+import org.apache.geode.kafka.utilities.KafkaLocalCluster;
+import org.apache.geode.kafka.utilities.TestObject;
+import org.apache.geode.kafka.utilities.WorkerAndHerderCluster;
+import org.apache.geode.pdx.PdxInstance;
+import org.apache.geode.pdx.PdxInstanceFactory;
+import org.apache.geode.pdx.ReflectionBasedAutoSerializer;
+import org.apache.geode.test.dunit.rules.ClientVM;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+
+public class JsonPdxConverterDUnitTest {
+  @Rule
+  public ClusterStartupRule clusterStartupRule = new ClusterStartupRule(3);
+
+  @Rule
+  public TestName testName = new TestName();
+
+  @ClassRule
+  public static TemporaryFolder temporaryFolderForZooKeeper = new TemporaryFolder();
+
+  @Rule
+  public TemporaryFolder temporaryFolderForOffset = new TemporaryFolder();
+
+  @BeforeClass
+  public static void setup()
+      throws Exception {
+    startZooKeeper(getZooKeeperProperties(temporaryFolderForZooKeeper));
+  }
+
+  @AfterClass
+  public static void cleanUp() {
+    KafkaZkClient zkClient = KafkaZkClient.apply("localhost:2181",
+        false,
+        200000,
+        15000,
+        10,
+        Time.SYSTEM,
+        "myGroup",
+        "myMetricType",
+        null);
+
+    zkClient.close();
+  }
+
+  @Test
+  public void jsonPdxConverterCanConvertPdxInstanceToJsonAndBackWhenDataMovesFromRegionToTopicToRegion()
+      throws Exception {
+    MemberVM locator = clusterStartupRule.startLocatorVM(0);
+    int locatorPort = locator.getPort();
+    MemberVM server1 = clusterStartupRule.startServerVM(1,
+        server -> server.withConnectionToLocator(locatorPort));
+    ClientVM client1 = clusterStartupRule.startClientVM(2, client -> client
+        .withLocatorConnection(locatorPort)
+        .withCacheSetup(
+            cf -> cf.setPdxSerializer(new ReflectionBasedAutoSerializer("org.apache.geode.kafka.*"))
+                .setPdxReadSerialized(true)));
+
+    // Set unique names for all the different components
+    String sourceRegionName = "SOURCE_REGION";
+    String sinkRegionName = "SINK_REGION";
+    // We only need one topic for this test, which we will both source to and sink from
+    String topicName = "TEST_TOPIC";
+
+    /*
+     * Start the Apache Geode cluster and create the source and sink regions.
+     * Create an Apache Geode client which can insert data into the source and get data from the sink
+     */
+    server1.invoke(() -> {
+      ClusterStartupRule.getCache().createRegionFactory(RegionShortcut.PARTITION)
+          .create(sourceRegionName);
+      ClusterStartupRule.getCache().createRegionFactory(RegionShortcut.PARTITION)
+          .create(sinkRegionName);
+    });
+    client1.invoke(() -> {
+      ClientCache clientCache = ClusterStartupRule.getClientCache();
+      clientCache.createClientRegionFactory(ClientRegionShortcut.PROXY)
+          .create(sourceRegionName);
+      clientCache.createClientRegionFactory(ClientRegionShortcut.PROXY)
+          .create(sinkRegionName);
+    });
+
+    /*
+     * Start the Kafka cluster and workers, and create the topic to which Apache Geode will
+     * connect as a source
+     */
+    WorkerAndHerderCluster workerAndHerderCluster = null;
+    KafkaLocalCluster kafkaLocalCluster = null;
+    try {
+      kafkaLocalCluster = startKafka(
+          getKafkaConfig(temporaryFolderForOffset.newFolder("kafkaLogs").getAbsolutePath()));
+      createTopic(topicName, 1, 1);
+      // Create workers and herder cluster
+      workerAndHerderCluster = startWorkerAndHerderCluster(1, sourceRegionName, sinkRegionName,
+          topicName, topicName, temporaryFolderForOffset.getRoot().getAbsolutePath(),
+          "localhost[" + locatorPort + "]", DEFAULT_KEY_CONVERTER, "",
+          JsonPdxConverter.class.getCanonicalName(),
+          JsonPdxConverter.ADD_TYPE_ANNOTATION_TO_JSON + "=true");
+
+      // Insert data into the Apache Geode source and retrieve the data from the Apache Geode sink
+      // from the client
+      client1.invoke(() -> {
+        // Create an object that will be serialized into a PdxInstance
+        String name = "testName";
+        int age = 42;
+        double number = 3.141;
+        List<String> words = new ArrayList<>();
+        words.add("words1");
+        words.add("words2");
+        words.add("words3");
+        TestObject originalObject = new TestObject(name, age, number, words);
+
+        ClientCache clientCache = ClusterStartupRule.getClientCache();
+
+        // Create a PdxInstance from the test object
+        PdxInstanceFactory instanceFactory =
+            clientCache.createPdxInstanceFactory(originalObject.getClass().getName());
+        Arrays.stream(originalObject.getClass().getFields())
+            .forEach(field -> {
+              try {
+                Object value = field.get(originalObject);
+                Class type = field.getType();
+                instanceFactory.writeField(field.getName(), value, type);
+              } catch (IllegalAccessException ignore) {
+              }
+            });
+        PdxInstance putInstance = instanceFactory.create();
+
+        // Put the PdxInstance into the source region
+        String key = "key1";
+        clientCache.getRegion(sourceRegionName).put(key, putInstance);
+
+        // Assert that the data that arrives in the sink region is the same as the data that was put
+        // into the source region
+        Region<Object, Object> sinkRegion = clientCache.getRegion(sinkRegionName);
+        await().atMost(10, TimeUnit.SECONDS)
+            .untilAsserted(() -> assertEquals(1, sinkRegion.sizeOnServer()));
+        PdxInstance getInstance = (PdxInstance) sinkRegion.get(key);
+
+        assertEquals(originalObject, getInstance.getObject());
+      });
+
+    } finally {
+      // Clean up by deleting the topic
+      deleteTopic(topicName);
+      if (workerAndHerderCluster != null) {
+        workerAndHerderCluster.stop();
+      }
+      if (kafkaLocalCluster != null) {
+        kafkaLocalCluster.stop();
+      }
+    }
+  }
+}
diff --git a/src/test/java/org/apache/geode/kafka/converter/JsonPdxConverterTest.java b/src/test/java/org/apache/geode/kafka/converter/JsonPdxConverterTest.java
new file mode 100644
index 0000000..6ffcdd4
--- /dev/null
+++ b/src/test/java/org/apache/geode/kafka/converter/JsonPdxConverterTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.kafka.converter;
+
+import static org.apache.geode.kafka.converter.JsonPdxConverter.JSON_TYPE_ANNOTATION;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.IntStream;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.geode.pdx.PdxInstance;
+
+public class JsonPdxConverterTest {
+  private JsonPdxConverter spyConverter;
+  private PdxInstance mockPdxInstance;
+  private final String className = "className";
+  private final String jsonWithoutTypeAnnotation = "{\"name\" : \"test\"}";
+  private final String jsonWithTypeAnnotation =
+      "{" + JSON_TYPE_ANNOTATION + " : \"" + className + "\",\"name\" : \"test\"}";
+
+  @Before
+  public void setUp() {
+    spyConverter = spy(new JsonPdxConverter());
+    mockPdxInstance = mock(PdxInstance.class);
+    when(mockPdxInstance.getClassName()).thenReturn(className);
+  }
+
+  @Test
+  public void configurePopulatesInternalConfiguration() {
+    Map<String, String> map = new HashMap<>();
+    IntStream.range(0, 10).forEach(i -> map.put("key" + i, "value" + i));
+    spyConverter.configure(map, false);
+
+    assertThat(spyConverter.getInternalConfig(), is(map));
+  }
+
+  @Test
+  public void fromConnectDataAddsTypeAnnotationWhenConfigurationIsSetAndJsonDoesNotAlreadyContainAnnotation() {
+    byte[] jsonBytes = jsonWithoutTypeAnnotation.getBytes();
+
+    doReturn(jsonBytes).when(spyConverter).getJsonBytes(mockPdxInstance);
+    doReturn(true).when(spyConverter).shouldAddTypeAnnotation();
+
+    byte[] expectedOutputJsonBytes = jsonWithTypeAnnotation.getBytes();
+
+    assertThat(spyConverter.fromConnectData(null, null, mockPdxInstance),
+        is(expectedOutputJsonBytes));
+  }
+
+  @Test
+  public void fromConnectDataDoesNotAddTypeAnnotationWhenConfigurationIsSetAndJsonAlreadyContainsAnnotation() {
+    byte[] jsonBytes = jsonWithTypeAnnotation.getBytes();
+
+    doReturn(jsonBytes).when(spyConverter).getJsonBytes(mockPdxInstance);
+    doReturn(true).when(spyConverter).shouldAddTypeAnnotation();
+
+    assertThat(spyConverter.fromConnectData(null, null, mockPdxInstance), is(jsonBytes));
+  }
+
+  @Test
+  public void fromConnectDataDoesNotAddTypeAnnotationWhenConfigurationIsNotSet() {
+    byte[] jsonBytes = jsonWithoutTypeAnnotation.getBytes();
+
+    doReturn(jsonBytes).when(spyConverter).getJsonBytes(mockPdxInstance);
+    doReturn(false).when(spyConverter).shouldAddTypeAnnotation();
+
+    assertThat(spyConverter.fromConnectData(null, null, mockPdxInstance), is(jsonBytes));
+  }
+}
diff --git a/src/test/java/org/apache/geode/kafka/utilities/GeodeKafkaTestUtils.java b/src/test/java/org/apache/geode/kafka/utilities/GeodeKafkaTestUtils.java
index f9c1e2b..198e129 100644
--- a/src/test/java/org/apache/geode/kafka/utilities/GeodeKafkaTestUtils.java
+++ b/src/test/java/org/apache/geode/kafka/utilities/GeodeKafkaTestUtils.java
@@ -14,6 +14,8 @@
  */
 package org.apache.geode.kafka.utilities;
 
+import static org.apache.geode.kafka.GeodeConnectorConfig.DEFAULT_KEY_CONVERTER;
+import static org.apache.geode.kafka.GeodeConnectorConfig.DEFAULT_VALUE_CONVERTER;
 import static org.awaitility.Awaitility.await;
 
 import java.io.IOException;
@@ -136,10 +138,26 @@
       String sinkTopic,
       String offsetPath,
       String locatorString) {
+    return startWorkerAndHerderCluster(maxTasks, sourceRegion, sinkRegion, sourceTopic, sinkTopic,
+        offsetPath, locatorString, DEFAULT_KEY_CONVERTER, "", DEFAULT_VALUE_CONVERTER, "");
+  }
+
+  public static WorkerAndHerderCluster startWorkerAndHerderCluster(int maxTasks,
+      String sourceRegion,
+      String sinkRegion,
+      String sourceTopic,
+      String sinkTopic,
+      String offsetPath,
+      String locatorString,
+      String keyConverter,
+      String keyConverterArgs,
+      String valueConverter,
+      String valueConverterArgs) {
     WorkerAndHerderCluster workerAndHerderCluster = new WorkerAndHerderCluster();
     try {
       workerAndHerderCluster.start(String.valueOf(maxTasks), sourceRegion, sinkRegion, sourceTopic,
-          sinkTopic, offsetPath, locatorString);
+          sinkTopic, offsetPath, locatorString, keyConverter, keyConverterArgs, valueConverter,
+          valueConverterArgs);
       Thread.sleep(20000);
     } catch (Exception e) {
       throw new RuntimeException("Could not start the worker and herder cluster" + e);
diff --git a/src/test/java/org/apache/geode/kafka/utilities/TestObject.java b/src/test/java/org/apache/geode/kafka/utilities/TestObject.java
new file mode 100644
index 0000000..a1ddd60
--- /dev/null
+++ b/src/test/java/org/apache/geode/kafka/utilities/TestObject.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.kafka.utilities;
+
+import java.util.List;
+import java.util.Objects;
+
+public class TestObject {
+  public String name;
+  public int age;
+  public double number;
+  public List<String> words;
+
+  @SuppressWarnings("unused")
+  public TestObject() {}
+
+  public TestObject(String name, int age, double number, List<String> words) {
+    this.name = name;
+    this.age = age;
+    this.number = number;
+    this.words = words;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    TestObject that = (TestObject) o;
+    return age == that.age &&
+        Double.compare(that.number, number) == 0 &&
+        Objects.equals(name, that.name) &&
+        Objects.equals(words, that.words);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(name, age, number, words);
+  }
+}
diff --git a/src/test/java/org/apache/geode/kafka/utilities/WorkerAndHerderCluster.java b/src/test/java/org/apache/geode/kafka/utilities/WorkerAndHerderCluster.java
index 7c58bc0..098b2e6 100644
--- a/src/test/java/org/apache/geode/kafka/utilities/WorkerAndHerderCluster.java
+++ b/src/test/java/org/apache/geode/kafka/utilities/WorkerAndHerderCluster.java
@@ -29,10 +29,12 @@
   }
 
   public void start(String maxTasks, String sourceRegion, String sinkRegion, String sourceTopic,
-      String sinkTopic, String offsetPath, String locatorString)
+      String sinkTopic, String offsetPath, String locatorString, String keyConverter,
+      String keyConverterArgs, String valueConverter, String valueConverterArgs)
       throws IOException, InterruptedException {
     String[] args = new String[] {maxTasks, sourceRegion, sinkRegion, sourceTopic, sinkTopic,
-        offsetPath, locatorString};
+        offsetPath, locatorString, keyConverter, keyConverterArgs, valueConverter,
+        valueConverterArgs};
     workerAndHerder.exec(args);
   }
 
diff --git a/src/test/java/org/apache/geode/kafka/utilities/WorkerAndHerderWrapper.java b/src/test/java/org/apache/geode/kafka/utilities/WorkerAndHerderWrapper.java
index 3fe6e66..3723018 100644
--- a/src/test/java/org/apache/geode/kafka/utilities/WorkerAndHerderWrapper.java
+++ b/src/test/java/org/apache/geode/kafka/utilities/WorkerAndHerderWrapper.java
@@ -17,8 +17,10 @@
 import static org.apache.geode.kafka.source.GeodeSourceConnectorConfig.REGION_TO_TOPIC_BINDINGS;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 import org.apache.kafka.common.utils.SystemTime;
 import org.apache.kafka.connect.connector.policy.AllConnectorClientConfigOverridePolicy;
@@ -40,7 +42,7 @@
 public class WorkerAndHerderWrapper {
 
   public static void main(String[] args) throws IOException {
-    if (args.length != 7) {
+    if (args.length != 11) {
       throw new RuntimeException("Insufficient arguments to start workers and herders");
     }
     String maxTasks = args[0];
@@ -52,6 +54,18 @@
     String regionToTopicBinding = "[" + sourceRegion + ":" + sourceTopic + "]";
     String topicToRegionBinding = "[" + sinkTopic + ":" + sinkRegion + "]";
     String locatorString = args[6];
+    String keyConverter = args[7];
+    String keyConverterArgs = args[8];
+    Map<String, String> keyConverterProps = new HashMap<>();
+    if (keyConverterArgs != null && !keyConverterArgs.isEmpty()) {
+      keyConverterProps = parseArguments(keyConverterArgs, true);
+    }
+    String valueConverter = args[9];
+    String valueConverterArgs = args[10];
+    Map<String, String> valueConverterProps = new HashMap<>();
+    if (valueConverterArgs != null && !valueConverterArgs.isEmpty()) {
+      valueConverterProps = parseArguments(valueConverterArgs, false);
+    }
 
     Map props = new HashMap();
     props.put(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
@@ -61,10 +75,10 @@
 
     props.put("internal.key.converter.schemas.enable", "false");
     props.put("internal.value.converter.schemas.enable", "false");
-    props.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG,
-        "org.apache.kafka.connect.storage.StringConverter");
-    props.put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG,
-        "org.apache.kafka.connect.storage.StringConverter");
+    props.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, keyConverter);
+    props.putAll(keyConverterProps);
+    props.put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, valueConverter);
+    props.putAll(valueConverterProps);
     props.put("key.converter.schemas.enable", "false");
     props.put("value.converter.schemas.enable", "false");
     props.put(GeodeConnectorConfig.LOCATORS, locatorString);
@@ -108,4 +122,21 @@
 
 
   }
+
+  // We expect that converter properties will be supplied as a comma separated list of the form
+  // "first.property.name=first.property.value,second.property.name=second.property.value"
+  public static Map<String, String> parseArguments(String args, boolean isKeyConverter) {
+    String propertyNamePrefix;
+    if (isKeyConverter) {
+      propertyNamePrefix = "key.converter.";
+    } else {
+      propertyNamePrefix = "value.converter.";
+    }
+    return Arrays.stream(args.split(","))
+        .collect(Collectors.toMap(
+            string -> propertyNamePrefix + string.substring(0, string.indexOf("=")).trim(),
+            string -> string.substring(string.indexOf("=") + 1).trim()));
+  }
 }