geode as source to kafka hack test successful
diff --git a/build.gradle b/build.gradle
index 9e23d87..c9500b0 100644
--- a/build.gradle
+++ b/build.gradle
@@ -14,6 +14,7 @@
dependencies {
compile 'org.apache.geode:geode-core:1.11.0'
+ compile 'org.apache.geode:geode-cq:1.11.0'
compile(group: 'org.apache.kafka', name: 'connect-api', version: '2.3.1')
compile group: 'org.apache.logging.log4j', name: 'log4j-api', version: '2.13.0'
compile group: 'org.apache.logging.log4j', name: 'log4j-core', version: '2.13.0'
diff --git a/src/main/java/kafka/GeodeKafkaSourceTask.java b/src/main/java/kafka/GeodeKafkaSourceTask.java
index 1f74700..9608f07 100644
--- a/src/main/java/kafka/GeodeKafkaSourceTask.java
+++ b/src/main/java/kafka/GeodeKafkaSourceTask.java
@@ -8,6 +8,7 @@
import org.apache.geode.cache.query.CqException;
import org.apache.geode.cache.query.CqExistsException;
import org.apache.geode.cache.query.CqListener;
+import org.apache.geode.cache.query.RegionNotFoundException;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
@@ -38,11 +39,12 @@
@Override
public void start(Map<String, String> props) {
+ System.out.println("JASON task start");
batchSize = 100;
queueSize = 100000;
String regionName = "someRegion";
eventBuffer = new LinkedBlockingQueue<>(queueSize);
- topics = new String[] {"myTopic"};
+ topics = new String[] {"someTopic"};
sourcePartition = new HashMap<>();
sourcePartition.put(REGION_NAME, regionName);
@@ -50,10 +52,12 @@
offset.put("OFFSET", 0L);
installOnGeode("localHost", 10334, "someRegion");
+ System.out.println("JASON task start end");
}
@Override
public List<SourceRecord> poll() throws InterruptedException {
+// System.out.println("JASON polling");
ArrayList<SourceRecord> records = new ArrayList<>(batchSize);
ArrayList<CqEvent> events = new ArrayList<>(batchSize);
if (eventBuffer.drainTo(events, batchSize) > 0) {
@@ -63,9 +67,11 @@
records.add(new SourceRecord(sourcePartition, offset, topic, null, event));
}
+ System.out.println("JASON we polled and returning records" + records.size());
return records;
}
+// System.out.println("JASON we didn't poll any records");
return null;
}
@@ -80,16 +86,26 @@
.setPoolSubscriptionEnabled(true).addPoolLocator(locatorHost, locatorPort).create();
CqAttributesFactory cqAttributesFactory = new CqAttributesFactory();
cqAttributesFactory.addCqListener(new GeodeKafkaSourceListener());
+ System.out.println("JASON installing on Geode");
CqAttributes cqAttributes = cqAttributesFactory.create();
try {
+ System.out.println("JASON installing new cq");
clientCache.getQueryService().newCq("kafkaCQFor" + regionName, "select * from /" + regionName, cqAttributes,
- true);
+ true).execute();
+ System.out.println("JASON finished installing cq");
} catch (CqExistsException e) {
+ System.out.println("UHH");
e.printStackTrace();
- } catch (CqException e) {
+ } catch (CqException | RegionNotFoundException e) {
+ System.out.println("UHH e");
e.printStackTrace();
}
+ catch (Exception e) {
+ System.out.println("UHHHHHH " + e);
+ }
+ System.out.println("JASON task calling ready for events");
clientCache.readyForEvents();
+ System.out.println("JASON task ready for events");
}
private static class GeodeKafkaSourceListener implements CqListener {
@@ -97,6 +113,7 @@
@Override
public void onEvent(CqEvent aCqEvent) {
try {
+ System.out.println("JASON cqEvent and putting into eventBuffer");
eventBuffer.offer(aCqEvent, 2, TimeUnit.SECONDS);
} catch (InterruptedException e) {
diff --git a/src/test/java/kafka/GeodeKafkaTestCluster.java b/src/test/java/kafka/GeodeKafkaTestCluster.java
index cb5f9ad..2b0293f 100644
--- a/src/test/java/kafka/GeodeKafkaTestCluster.java
+++ b/src/test/java/kafka/GeodeKafkaTestCluster.java
@@ -10,6 +10,12 @@
import org.apache.geode.cache.client.ClientCache;
import org.apache.geode.cache.client.ClientCacheFactory;
import org.apache.geode.cache.client.ClientRegionShortcut;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.connector.policy.AllConnectorClientConfigOverridePolicy;
@@ -32,7 +38,10 @@
import org.junit.rules.TemporaryFolder;
import java.io.IOException;
+import java.time.Duration;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
@@ -45,6 +54,8 @@
private static ZooKeeperLocalCluster zooKeeperLocalCluster;
private static KafkaLocalCluster kafkaLocalCluster;
private static GeodeLocalCluster geodeLocalCluster;
+ private static WorkerAndHerderCluster workerAndHerderCluster;
+ private static Consumer<String, String> consumer;
@BeforeClass
public static void setup() throws IOException, QuorumPeerConfig.ConfigException, InterruptedException {
@@ -52,63 +63,30 @@
startKafka();
startGeode();
createTopic();
+ Thread.sleep(5000);
startWorker();
+ consumer = createConsumer();
+ Thread.sleep(5000);
}
@AfterClass
public static void shutdown() {
+ workerAndHerderCluster.stop();
KafkaZkClient zkClient = KafkaZkClient.apply("localhost:2181",false,200000,
15000,10, Time.SYSTEM, "myGroup","myMetricType", null);
+
AdminZkClient adminZkClient = new AdminZkClient(zkClient);
adminZkClient.deleteTopic("someTopic");
+
kafkaLocalCluster.stop();
geodeLocalCluster.stop();
}
- private static void startWorker() {
- Map props = new HashMap();
- props.put(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:8888");
- props.put("offset.storage.file.filename", "/tmp/connect.offsets");
- // fast flushing for testing.
- props.put(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG, "10");
-
-
- props.put(WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");
- props.put(WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");
- props.put("internal.key.converter.schemas.enable", "false");
- props.put("internal.value.converter.schemas.enable", "false");
- props.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");
- props.put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");
- props.put("key.converter.schemas.enable", "false");
- props.put("value.converter.schemas.enable", "false");
-
-
- WorkerConfig workerCfg = new StandaloneConfig(props);
-
- MemoryOffsetBackingStore offBackingStore = new MemoryOffsetBackingStore();
- offBackingStore.configure(workerCfg);
-
- Worker worker = new Worker("WORKER_ID", new SystemTime(), new Plugins(props), workerCfg, offBackingStore, new AllConnectorClientConfigOverridePolicy());
- worker.start();
-
- Herder herder = new StandaloneHerder(worker, ConnectUtils.lookupKafkaClusterId(workerCfg), new AllConnectorClientConfigOverridePolicy());
- herder.start();
-
-
-
-
- Map<String, String> sourceProps = new HashMap<>();
- sourceProps.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, GeodeKafkaSource.class.getName());
- sourceProps.put(ConnectorConfig.NAME_CONFIG, "geode-kafka-source-connector");
- sourceProps.put(ConnectorConfig.TASKS_MAX_CONFIG, "1");
-//
- herder.putConnectorConfig(
- sourceProps.get(ConnectorConfig.NAME_CONFIG),
- sourceProps, true, (error, result)->{
- System.out.println("CALLBACK: " + result);
- });
- System.out.println("Worker and herder started");
+ private static void startWorker() throws IOException, InterruptedException {
+ workerAndHerderCluster = new WorkerAndHerderCluster();
+ workerAndHerderCluster.start();
+ Thread.sleep(20000);
}
private static void createTopic() {
@@ -186,13 +164,40 @@
}
+ //consumer props, less important, just for testing?
+ public static Consumer<String,String> createConsumer() {
+ final Properties props = new Properties();
+ props.put(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:8888");
+ props.put(ConsumerConfig.GROUP_ID_CONFIG,
+ "myGroup");
+ props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+ StringDeserializer.class.getName());
+ props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+ StringDeserializer.class.getName());
+ // Create the consumer using props.
+ final Consumer<String, String> consumer =
+ new KafkaConsumer<>(props);
+ // Subscribe to the topic.
+ consumer.subscribe(Collections.singletonList("someTopic"));
+ return consumer;
+ }
@Test
public void testX() throws InterruptedException {
ClientCache client = createGeodeClient();
Region region = client.createClientRegionFactory(ClientRegionShortcut.PROXY).create("someRegion");
region.put("JASON KEY", "JASON VALUE");
+ System.out.println("PUT COMPLETE!");
+ region.get("JASON KEY");
+// client.close();
+
+
+ ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(2));
+ for (ConsumerRecord<String, String> record: records) {
+ System.out.println("JASON we consumed a record:" + record);
+ }
System.out.println("TEST COMPLETE!");
+
}
}
diff --git a/src/test/java/kafka/GeodeLocalCluster.java b/src/test/java/kafka/GeodeLocalCluster.java
index fd72dec..43ac8f5 100644
--- a/src/test/java/kafka/GeodeLocalCluster.java
+++ b/src/test/java/kafka/GeodeLocalCluster.java
@@ -13,12 +13,12 @@
}
public void start() throws IOException, InterruptedException {
+ System.out.println("starting locator");
locatorProcess.exec("10334");
- Thread.sleep(30000);
+ Thread.sleep(15000);
System.out.println("is alive?" + locatorProcess.process.isAlive());
serverProcess.exec("40404");
Thread.sleep(30000);
-
}
public void stop() {