Added sink task and sink class
Added end to end sink test
diff --git a/src/main/java/geode/kafka/sink/GeodeKafkaSink.java b/src/main/java/geode/kafka/sink/GeodeKafkaSink.java
index 68460e4..08af2ad 100644
--- a/src/main/java/geode/kafka/sink/GeodeKafkaSink.java
+++ b/src/main/java/geode/kafka/sink/GeodeKafkaSink.java
@@ -1,76 +1,81 @@
package geode.kafka.sink;
+import geode.kafka.GeodeConnectorConfig;
import org.apache.kafka.common.config.ConfigDef;
-import org.apache.kafka.common.utils.AppInfoParser;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.sink.SinkConnector;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
import java.util.Map;
-public class GeodeKafkaSink {
+import static geode.kafka.GeodeConnectorConfig.BATCH_SIZE;
+import static geode.kafka.GeodeConnectorConfig.CQ_PREFIX;
+import static geode.kafka.GeodeConnectorConfig.DEFAULT_BATCH_SIZE;
+import static geode.kafka.GeodeConnectorConfig.DEFAULT_CQ_PREFIX;
+import static geode.kafka.GeodeConnectorConfig.DEFAULT_DURABLE_CLIENT_ID;
+import static geode.kafka.GeodeConnectorConfig.DEFAULT_DURABLE_CLIENT_TIMEOUT;
+import static geode.kafka.GeodeConnectorConfig.DEFAULT_LOCATOR;
+import static geode.kafka.GeodeConnectorConfig.DEFAULT_QUEUE_SIZE;
+import static geode.kafka.GeodeConnectorConfig.DURABLE_CLIENT_ID_PREFIX;
+import static geode.kafka.GeodeConnectorConfig.DURABLE_CLIENT_TIME_OUT;
+import static geode.kafka.GeodeConnectorConfig.LOCATORS;
+import static geode.kafka.GeodeConnectorConfig.QUEUE_SIZE;
-//
-// /** Sink properties. */
-// private Map<String, String> configProps;
-//
-// /** Expected configurations. */
-// private static final ConfigDef CONFIG_DEF = new ConfigDef();
-//
-// /** {@inheritDoc} */
-// @Override public String version() {
-// return AppInfoParser.getVersion();
-// }
-//
-// /**
-// * A sink lifecycle method. Validates grid-specific sink properties.
-// *
-// * @param props Sink properties.
-// */
-// @Override public void start(Map<String, String> props) {
-// configProps = props;
-//
-// try {
-// A.notNullOrEmpty(configProps.get(SinkConnector.TOPICS_CONFIG), "topics");
-// A.notNullOrEmpty(configProps.get(IgniteSinkConstants.CACHE_NAME), "cache name");
-// A.notNullOrEmpty(configProps.get(IgniteSinkConstants.CACHE_CFG_PATH), "path to cache config file");
-// }
-// catch (IllegalArgumentException e) {
-// throw new ConnectException("Cannot start IgniteSinkConnector due to configuration error", e);
-// }
-// }
-//
-// /**
-// * Obtains a sink task class to be instantiated for feeding data into grid.
-// *
-// * @return IgniteSinkTask class.
-// */
-// @Override public Class<? extends Task> taskClass() {
-// return IgniteSinkTask.class;
-// }
-//
-// /**
-// * Builds each config for <tt>maxTasks</tt> tasks.
-// *
-// * @param maxTasks Max number of tasks.
-// * @return Task configs.
-// */
-// @Override public List<Map<String, String>> taskConfigs(int maxTasks) {
-// List<Map<String, String>> taskConfigs = new ArrayList<>();
-// Map<String, String> taskProps = new HashMap<>();
-//
-// taskProps.putAll(configProps);
-//
-// for (int i = 0; i < maxTasks; i++)
-// taskConfigs.add(taskProps);
-//
-// return taskConfigs;
-// }
-//
-// /** {@inheritDoc} */
-// @Override public void stop() {
-// // No-op.
-// }
-//
-// /** {@inheritDoc} */
-// @Override public ConfigDef config() {
-// return CONFIG_DEF;
-// }
+public class GeodeKafkaSink extends SinkConnector {
+ private static final ConfigDef CONFIG_DEF = new ConfigDef();
+ private Map<String, String> sharedProps;
+
+ @Override
+ public void start(Map<String, String> props) {
+ sharedProps = computeMissingConfigurations(props);
+ }
+
+ @Override
+ public Class<? extends Task> taskClass() {
+ return GeodeKafkaSinkTask.class;
+ }
+
+ @Override
+ public List<Map<String, String>> taskConfigs(int maxTasks) {
+ List<Map<String, String>> taskConfigs = new ArrayList<>();
+ Map<String, String> taskProps = new HashMap<>();
+
+ taskProps.putAll(sharedProps);
+
+ for (int i = 0; i < maxTasks; i++) {
+ taskProps.put(GeodeConnectorConfig.TASK_ID, "" + i);
+ taskConfigs.add(taskProps);
+ }
+
+ return taskConfigs;
+ }
+
+ @Override
+ public void stop() {
+
+ }
+
+ @Override
+ public ConfigDef config() {
+ return CONFIG_DEF;
+ }
+
+ @Override
+ public String version() {
+ //TODO
+ return "unknown";
+ }
+
+
+ private Map<String, String> computeMissingConfigurations(Map<String, String> props) {
+ props.computeIfAbsent(LOCATORS, (key)-> DEFAULT_LOCATOR);
+ props.computeIfAbsent(DURABLE_CLIENT_TIME_OUT, (key) -> DEFAULT_DURABLE_CLIENT_TIMEOUT);
+ props.computeIfAbsent(DURABLE_CLIENT_ID_PREFIX, (key) -> DEFAULT_DURABLE_CLIENT_ID);
+ props.computeIfAbsent(BATCH_SIZE, (key) -> DEFAULT_BATCH_SIZE);
+ props.computeIfAbsent(QUEUE_SIZE, (key) -> DEFAULT_QUEUE_SIZE);
+ props.computeIfAbsent(CQ_PREFIX, (key) -> DEFAULT_CQ_PREFIX);
+ return props;
+ }
}
diff --git a/src/main/java/geode/kafka/source/GeodeKafkaSource.java b/src/main/java/geode/kafka/source/GeodeKafkaSource.java
index d5da62a..36b9942 100644
--- a/src/main/java/geode/kafka/source/GeodeKafkaSource.java
+++ b/src/main/java/geode/kafka/source/GeodeKafkaSource.java
@@ -28,6 +28,7 @@
public class GeodeKafkaSource extends SourceConnector {
private Map<String, String> sharedProps;
+ //TODO maybe club this into GeodeConnnectorConfig
private static final ConfigDef CONFIG_DEF = new ConfigDef();
@@ -40,7 +41,6 @@
public List<Map<String, String>> taskConfigs(int maxTasks) {
List<Map<String, String>> taskConfigs = new ArrayList<>();
Map<String, String> taskProps = new HashMap<>();
-
taskProps.putAll(sharedProps);
for (int i = 0; i < maxTasks; i++) {
diff --git a/src/main/java/geode/kafka/source/GeodeKafkaSourceTask.java b/src/main/java/geode/kafka/source/GeodeKafkaSourceTask.java
index 3f2ac80..829eb29 100644
--- a/src/main/java/geode/kafka/source/GeodeKafkaSourceTask.java
+++ b/src/main/java/geode/kafka/source/GeodeKafkaSourceTask.java
@@ -69,7 +69,7 @@
installOnGeode(geodeConnectorConfig, geodeContext, eventBuffer, cqPrefix);
}
catch (Exception e) {
- logger.error("Unable to start task", e);
+ logger.error("Unable to start source task", e);
throw e;
}
}
@@ -81,7 +81,7 @@
if (eventBuffer.drainTo(events, batchSize) > 0) {
for (GeodeEvent event : events) {
for (String topic : topics) {
- records.add(new SourceRecord(sourcePartitions.get(event.getRegionName()), OFFSET_DEFAULT, topic, null, event.getEvent()));
+ records.add(new SourceRecord(sourcePartitions.get(event.getRegionName()), OFFSET_DEFAULT, topic, null, event.getEvent().getNewValue()));
}
}
return records;
diff --git a/src/test/java/geode/kafka/GeodeKafkaTestCluster.java b/src/test/java/geode/kafka/GeodeKafkaTestCluster.java
index f1a6dff..f0d7f06 100644
--- a/src/test/java/geode/kafka/GeodeKafkaTestCluster.java
+++ b/src/test/java/geode/kafka/GeodeKafkaTestCluster.java
@@ -13,12 +13,19 @@
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.runtime.ConnectorConfig;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
+import org.junit.After;
import org.junit.AfterClass;
+import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
@@ -26,12 +33,15 @@
import java.io.IOException;
import java.time.Duration;
+import java.util.ArrayList;
import java.util.Collections;
+import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import static org.awaitility.Awaitility.await;
+import static org.junit.Assert.assertNotNull;
public class GeodeKafkaTestCluster {
@@ -54,10 +64,17 @@
startKafka();
startGeode();
createTopic();
-
startWorker();
consumer = createConsumer();
- Thread.sleep(5000);
+ }
+
+ @Before
+ public void beforeTests() {
+ }
+
+ @After
+ public void afterTests() {
+
}
@AfterClass
@@ -86,7 +103,7 @@
Properties topicProperties = new Properties();
topicProperties.put("flush.messages", "1");
AdminZkClient adminZkClient = new AdminZkClient(zkClient);
- adminZkClient.createTopic(TEST_TOPICS,3
+ adminZkClient.createTopic(TEST_TOPICS,1
,1, topicProperties, RackAwareMode.Disabled$.MODULE$);
}
@@ -161,6 +178,21 @@
return consumer;
}
+ //consumer props, less important, just for testing?
+ public static Producer<String,String> createProducer() {
+ final Properties props = new Properties();
+ props.put(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
+ StringSerializer.class.getName());
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
+ StringSerializer.class.getName());
+
+ // Create the producer using props.
+ final Producer<String, String> producer =
+ new KafkaProducer<>(props);
+ return producer;
+ }
+
@Test
public void endToEndSourceTest() {
ClientCache client = createGeodeClient();
@@ -179,4 +211,18 @@
});
}
+ @Test
+ public void endToEndSinkTest() {
+ ClientCache client = createGeodeClient();
+ Region region = client.createClientRegionFactory(ClientRegionShortcut.PROXY).create(TEST_REGIONS);
+
+ Producer<String, String> producer = createProducer();
+ for (int i = 0; i < 10; i++) {
+ producer.send(new ProducerRecord(TEST_TOPICS, "KEY" + i, "VALUE" + i));
+ }
+
+ int i = 0;
+ await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> assertNotNull(region.get("KEY" + i)));
+ }
+
}
diff --git a/src/test/java/geode/kafka/WorkerAndHerderWrapper.java b/src/test/java/geode/kafka/WorkerAndHerderWrapper.java
index cc8e27b..d6fc7a6 100644
--- a/src/test/java/geode/kafka/WorkerAndHerderWrapper.java
+++ b/src/test/java/geode/kafka/WorkerAndHerderWrapper.java
@@ -1,5 +1,6 @@
package geode.kafka;
+import geode.kafka.sink.GeodeKafkaSink;
import geode.kafka.source.GeodeKafkaSource;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.connect.connector.policy.AllConnectorClientConfigOverridePolicy;
@@ -62,8 +63,20 @@
herder.putConnectorConfig(
sourceProps.get(ConnectorConfig.NAME_CONFIG),
sourceProps, true, (error, result)->{
- System.out.println("CALLBACK: " + result + "::: error?" + error);
});
+ Map<String, String> sinkProps = new HashMap<>();
+ sinkProps.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, GeodeKafkaSink.class.getName());
+ sinkProps.put(ConnectorConfig.NAME_CONFIG, "geode-kafka-sink-connector");
+ sinkProps.put(ConnectorConfig.TASKS_MAX_CONFIG, "1");
+ sinkProps.put(REGIONS, TEST_REGIONS);
+ sinkProps.put(TOPICS, TEST_TOPICS);
+
+ herder.putConnectorConfig(
+ sinkProps.get(ConnectorConfig.NAME_CONFIG),
+ sinkProps, true, (error, result)->{
+ });
+
+
}
}