Worker and herder added
Source tasks being instantiated
diff --git a/src/main/java/kafka/GeodeKafkaSourceTask.java b/src/main/java/kafka/GeodeKafkaSourceTask.java
index e6aa578..1f74700 100644
--- a/src/main/java/kafka/GeodeKafkaSourceTask.java
+++ b/src/main/java/kafka/GeodeKafkaSourceTask.java
@@ -42,7 +42,7 @@
queueSize = 100000;
String regionName = "someRegion";
eventBuffer = new LinkedBlockingQueue<>(queueSize);
- topics = new String[] {"default"};
+ topics = new String[] {"myTopic"};
sourcePartition = new HashMap<>();
sourcePartition.put(REGION_NAME, regionName);
diff --git a/src/test/java/kafka/GeodeKafkaTestCluster.java b/src/test/java/kafka/GeodeKafkaTestCluster.java
index 824b148..cb5f9ad 100644
--- a/src/test/java/kafka/GeodeKafkaTestCluster.java
+++ b/src/test/java/kafka/GeodeKafkaTestCluster.java
@@ -1,10 +1,29 @@
package kafka;
+import kafka.admin.AdminUtils;
+import kafka.admin.RackAwareMode;
+import kafka.zk.AdminZkClient;
+import kafka.zk.KafkaZkClient;
+import kafka.zookeeper.ZooKeeperClient;
+import org.I0Itec.zkclient.ZkClient;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.client.ClientCache;
import org.apache.geode.cache.client.ClientCacheFactory;
import org.apache.geode.cache.client.ClientRegionShortcut;
+import org.apache.kafka.common.utils.SystemTime;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.connect.connector.policy.AllConnectorClientConfigOverridePolicy;
+import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
import org.apache.kafka.connect.runtime.ConnectorConfig;
+import org.apache.kafka.connect.runtime.Herder;
+import org.apache.kafka.connect.runtime.Worker;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.runtime.isolation.Plugins;
+import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
+import org.apache.kafka.connect.runtime.standalone.StandaloneHerder;
+import org.apache.kafka.connect.storage.MemoryOffsetBackingStore;
+import org.apache.kafka.connect.storage.StringConverter;
+import org.apache.kafka.connect.util.ConnectUtils;
import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@@ -13,6 +32,8 @@
import org.junit.rules.TemporaryFolder;
import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
import java.util.Properties;
public class GeodeKafkaTestCluster {
@@ -30,14 +51,73 @@
startZooKeeper();
startKafka();
startGeode();
+ createTopic();
+ startWorker();
}
@AfterClass
public static void shutdown() {
+ KafkaZkClient zkClient = KafkaZkClient.apply("localhost:2181",false,200000,
+ 15000,10, Time.SYSTEM, "myGroup","myMetricType", null);
+ AdminZkClient adminZkClient = new AdminZkClient(zkClient);
+ adminZkClient.deleteTopic("someTopic");
kafkaLocalCluster.stop();
geodeLocalCluster.stop();
}
+
+ private static void startWorker() {
+ Map props = new HashMap();
+ props.put(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:8888");
+ props.put("offset.storage.file.filename", "/tmp/connect.offsets");
+ // fast flushing for testing.
+ props.put(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG, "10");
+
+
+ props.put(WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");
+ props.put(WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");
+ props.put("internal.key.converter.schemas.enable", "false");
+ props.put("internal.value.converter.schemas.enable", "false");
+ props.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");
+ props.put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");
+ props.put("key.converter.schemas.enable", "false");
+ props.put("value.converter.schemas.enable", "false");
+
+
+ WorkerConfig workerCfg = new StandaloneConfig(props);
+
+ MemoryOffsetBackingStore offBackingStore = new MemoryOffsetBackingStore();
+ offBackingStore.configure(workerCfg);
+
+ Worker worker = new Worker("WORKER_ID", new SystemTime(), new Plugins(props), workerCfg, offBackingStore, new AllConnectorClientConfigOverridePolicy());
+ worker.start();
+
+ Herder herder = new StandaloneHerder(worker, ConnectUtils.lookupKafkaClusterId(workerCfg), new AllConnectorClientConfigOverridePolicy());
+ herder.start();
+
+
+
+
+ Map<String, String> sourceProps = new HashMap<>();
+ sourceProps.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, GeodeKafkaSource.class.getName());
+ sourceProps.put(ConnectorConfig.NAME_CONFIG, "geode-kafka-source-connector");
+ sourceProps.put(ConnectorConfig.TASKS_MAX_CONFIG, "1");
+//
+ herder.putConnectorConfig(
+ sourceProps.get(ConnectorConfig.NAME_CONFIG),
+ sourceProps, true, (error, result)->{
+ System.out.println("CALLBACK: " + result);
+ });
+ System.out.println("Worker and herder started");
+ }
+
+ private static void createTopic() {
+ KafkaZkClient zkClient = KafkaZkClient.apply("localhost:2181",false,200000,
+ 15000,10, Time.SYSTEM, "myGroup","myMetricType", null);
+ AdminZkClient adminZkClient = new AdminZkClient(zkClient);
+ adminZkClient.createTopic("someTopic",3,1, new Properties(), RackAwareMode.Disabled$.MODULE$);
+ }
+
private ClientCache createGeodeClient() {
return new ClientCacheFactory().addPoolLocator("localhost", 10334).create();
}
@@ -87,17 +167,7 @@
//Specifically GeodeKafka connector configs
- /*
- props.put(ConnectorConfig.TASKS_MAX_CONFIG, "2");
- props.put(ConnectorConfig.NAME_CONFIG, "test-src-connector");
- props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, IgniteSourceConnectorMock.class.getName());
- props.put(IgniteSourceConstants.CACHE_NAME, "testCache");
- props.put(IgniteSourceConstants.CACHE_CFG_PATH, "example-ignite.xml");
- props.put(IgniteSourceConstants.TOPIC_NAMES, topics);
- props.put(IgniteSourceConstants.CACHE_EVENTS, "put");
- props.put(IgniteSourceConstants.CACHE_FILTER_CLASS, TestCacheEventFilter.class.getName());
- props.put(IgniteSourceConstants.INTL_BUF_SIZE, "1000000");
- */
+
/*
name=file-source