All configurable parameters are now using '-' as delimiter instead of camelCase
Added documentation to the configDef definitions
Updated readme with latest configuration changes
diff --git a/README.md b/README.md
index 39dd423..73953a1 100644
--- a/README.md
+++ b/README.md
@@ -26,7 +26,7 @@
name=geode-kafka-sink
connector.class=GeodeKafkaSink
tasks.max=1
-topicToRegions=[someTopicToSinkFrom:someRegionToConsume]
+topic-to-regions=[someTopicToSinkFrom:someRegionToConsume]
topics=someTopicToSinkFrom
locators=localHost[10334]
```
@@ -35,7 +35,7 @@
name=geode-kafka-source
connector.class=GeodeKafkaSource
tasks.max=1
-regionToTopics=[someRegionToSourceFrom:someTopicToConsume]
+region-to-topics=[someRegionToSourceFrom:someTopicToConsume]
locators=localHost[10334]
```
@@ -47,27 +47,29 @@
#### GeodeKafkaSink Properties
| Property | Required | Description| Default |
|---|---|---|---|
-| locators | no, but...| A comma separated string of locators that configure which locators to connect to | localhost[10334] |
-|topicToRegions| yes| A comma separated list of "one topic to many regions" bindings. Each binding is surrounded by brackets. For example "[topicName:regionName], [anotherTopic: regionName, anotherRegion]" | None. This is required to be set in the source connector properties
+|locators | no, but...| A comma separated string of locators that configure which locators to connect to | localhost[10334] |
+|topic-to-regions| yes| A comma separated list of "one topic to many regions" bindings. Each binding is surrounded by brackets. For example "[topicName:regionName], [anotherTopic: regionName, anotherRegion]" | "[gkctopic:gkcregion]"
|security-client-auth-init| no | Point to class that implements the [AuthInitialize Interface](https://gemfire.docs.pivotal.io/99/geode/managing/security/implementing_authentication.html)
-|nullValuesMeanRemove | no | If set to true, when topics send a SinkRecord with a null value, we will convert to an operation similar to region.remove instead of putting a null value into the region | true |
+|null-values-mean-remove | no | If set to true, when topics send a SinkRecord with a null value, we will convert to an operation similar to region.remove instead of putting a null value into the region | true |
-* The topicToRegions property allows us to create mappings between topics and regions. A single one-to-one mapping would look similar to "[topic:region]" A one-to-many mapping can be made by comma separating the regions, for example "[topic:region1,region2]" This is equivalent to both regions being consumers of the topic.
+* The topic-to-regions property allows us to create mappings between topics and regions. A single one-to-one mapping would look similar to "[topic:region]" A one-to-many mapping can be made by comma separating the regions, for example "[topic:region1,region2]" This is equivalent to both regions being consumers of the topic.
#### GeodeKafkaSource Properties
| Property | Required| Description| Default |
|---|---|---|---|
| locators | no, but...| A comma separated string of locators that configure which locators to connect to | localhost[10334] |
-|regionToTopics| yes | A comma separated list of "one region to many topics" mappings. Each mapping is surrounded by brackets. For example "[regionName:topicName], "[anotherRegion: topicName, anotherTopic]" | None. This is required to be set in the source connector properties|
+|region-to-topics| yes | A comma separated list of "one region to many topics" mappings. Each mapping is surrounded by brackets. For example "[regionName:topicName], "[anotherRegion: topicName, anotherTopic]" | "[gkcregion:gkctopic]"|
|security-client-auth-init| no | Point to class that implements the [AuthInitialize Interface](https://gemfire.docs.pivotal.io/99/geode/managing/security/implementing_authentication.html)
-|geodeConnectorBatchSize| no | Maximum number of records to return on each poll| 100 |
-|geodeConnectorQueueSize| no | Maximum number of entries in the connector queue before backing up all Geode cq listeners sharing the task queue | 10000 |
-| loadEntireRegion| no| Determines if we should queue up all entries that currently exist in a region. This allows us to copy existing region data. Will be replayed whenever a task needs to re-register a cq| true |
-|durableClientIdPrefix| no | Prefix string for tasks to append to when registering as a durable client. If empty string, will not register as a durable client | "" |
-| durableClientTimeout| no | How long in milliseconds to persist values in Geode's durable queue before the queue is invalidated| 60000 |
-| cqPrefix| no| Prefix string to identify Connector cq's on a Geode server |cqForGeodeKafka |
+|security-username| no | Supply a username to be used to authenticate with Geode. Will autoset the security-client-auth-init to use a SystemPropertyAuthInit if one isn't supplied by the user| null|
+|security-password| no | Supply a password to be used to authenticate with Geode| null|
+|geode-connector-batch-size| no | Maximum number of records to return on each poll| 100 |
+|geode-connector-queue-size| no | Maximum number of entries in the connector queue before backing up all Geode cq listeners sharing the task queue | 10000 |
+| load-entire-region| no| Determines if we should queue up all entries that currently exist in a region. This allows us to copy existing region data. Will be replayed whenever a task needs to re-register a cq| true |
+|durable-client-id-prefix| no | Prefix string for tasks to append to when registering as a durable client. If empty string, will not register as a durable client | "" |
+| durable-client-timeout| no | How long in milliseconds to persist values in Geode's durable queue before the queue is invalidated| 60000 |
+| cq-prefix| no| Prefix string to identify Connector cq's on a Geode server |cqForGeodeKafka |
-* The regionToTopics property allows us to create mappings between regions and topics. A single one-to-one mapping would look similar to "[region:topic]" A one-to-many mapping can be made by comma separating the topics, for example "[region:topic1,topic2]" This is equivalent to the region be a producer for both topics
+* The region-to-topics property allows us to create mappings between regions and topics. A single one-to-one mapping would look similar to "[region:topic]" A one-to-many mapping can be made by comma separating the topics, for example "[region:topic1,topic2]" This is equivalent to the region be a producer for both topics
---
diff --git a/src/main/java/org/geode/kafka/GeodeConnectorConfig.java b/src/main/java/org/geode/kafka/GeodeConnectorConfig.java
index 2860a8f..cc151a4 100644
--- a/src/main/java/org/geode/kafka/GeodeConnectorConfig.java
+++ b/src/main/java/org/geode/kafka/GeodeConnectorConfig.java
@@ -14,9 +14,6 @@
*/
package org.geode.kafka;
-import org.apache.kafka.common.config.AbstractConfig;
-import org.apache.kafka.common.config.ConfigDef;
-
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
@@ -24,6 +21,9 @@
import java.util.Map;
import java.util.stream.Collectors;
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+
public class GeodeConnectorConfig extends AbstractConfig {
// GeodeKafka Specific Configuration
@@ -37,9 +37,10 @@
public static final String LOCATORS = "locators";
public static final String DEFAULT_LOCATOR = "localhost[10334]";
public static final String SECURITY_CLIENT_AUTH_INIT = "security-client-auth-init";
- private static final String DEFAULT_SECURITY_AUTH_INIT = "org.geode.kafka.security.SystemPropertyAuthInit";
+ private static final String DEFAULT_SECURITY_AUTH_INIT =
+ "org.geode.kafka.security.SystemPropertyAuthInit";
public static final String SECURITY_USER = "security-username";
- public static final String SECURITY_PASSWORD= "security-password";
+ public static final String SECURITY_PASSWORD = "security-password";
protected final int taskId;
protected List<LocatorHostPort> locatorHostPorts;
@@ -47,13 +48,13 @@
private String securityUserName;
private String securityPassword;
- //Just for testing
+ // Just for testing
protected GeodeConnectorConfig() {
super(new ConfigDef(), new HashMap());
taskId = 0;
}
- //Just for testing
+ // Just for testing
protected GeodeConnectorConfig(Map<String, String> props) {
super(new ConfigDef(), props);
taskId = 0;
@@ -67,19 +68,27 @@
securityUserName = getString(SECURITY_USER);
securityPassword = getString(SECURITY_PASSWORD);
securityClientAuthInit = getString(SECURITY_CLIENT_AUTH_INIT);
- //if we registered a username/password instead of auth init, we should use the default auth init if one isn't specified
+ // if we registered a username/password instead of auth init, we should use the default auth
+ // init if one isn't specified
if (usesSecurity()) {
- securityClientAuthInit = securityClientAuthInit != null ? securityClientAuthInit : DEFAULT_SECURITY_AUTH_INIT;
+ securityClientAuthInit =
+ securityClientAuthInit != null ? securityClientAuthInit : DEFAULT_SECURITY_AUTH_INIT;
}
}
protected static ConfigDef configurables() {
ConfigDef configDef = new ConfigDef();
- configDef.define(TASK_ID, ConfigDef.Type.INT, "0", ConfigDef.Importance.MEDIUM,"");
- configDef.define(LOCATORS, ConfigDef.Type.STRING, DEFAULT_LOCATOR, ConfigDef.Importance.HIGH, "");
- configDef.define(SECURITY_USER, ConfigDef.Type.STRING, null, ConfigDef.Importance.HIGH, "");
- configDef.define(SECURITY_PASSWORD, ConfigDef.Type.STRING, null, ConfigDef.Importance.HIGH, "");
- configDef.define(SECURITY_CLIENT_AUTH_INIT, ConfigDef.Type.STRING, null, ConfigDef.Importance.HIGH, "");
+ configDef.define(TASK_ID, ConfigDef.Type.INT, "0", ConfigDef.Importance.MEDIUM,
+ "Internally used to identify each task");
+ configDef.define(LOCATORS, ConfigDef.Type.STRING, DEFAULT_LOCATOR, ConfigDef.Importance.HIGH,
+ "A comma separated string of locators that configure which locators to connect to");
+ configDef.define(SECURITY_USER, ConfigDef.Type.STRING, null, ConfigDef.Importance.HIGH,
+ "Supply a username to be used to authenticate with Geode. Will autoset the security-client-auth-init to use a SystemPropertyAuthInit if one isn't supplied by the user");
+ configDef.define(SECURITY_PASSWORD, ConfigDef.Type.STRING, null, ConfigDef.Importance.HIGH,
+ "Supply a password to be used to authenticate with Geode");
+ configDef.define(SECURITY_CLIENT_AUTH_INIT, ConfigDef.Type.STRING, null,
+ ConfigDef.Importance.HIGH,
+ "Point to class that implements the [AuthInitialize Interface](https://gemfire.docs.pivotal.io/99/geode/managing/security/implementing_authentication.html)");
return configDef;
}
diff --git a/src/main/java/org/geode/kafka/GeodeContext.java b/src/main/java/org/geode/kafka/GeodeContext.java
index 6190ef2..9f30242 100644
--- a/src/main/java/org/geode/kafka/GeodeContext.java
+++ b/src/main/java/org/geode/kafka/GeodeContext.java
@@ -14,10 +14,12 @@
*/
package org.geode.kafka;
-import java.util.Collection;
+import static org.geode.kafka.GeodeConnectorConfig.SECURITY_CLIENT_AUTH_INIT;
+import static org.geode.kafka.GeodeConnectorConfig.SECURITY_PASSWORD;
+import static org.geode.kafka.GeodeConnectorConfig.SECURITY_USER;
+
import java.util.List;
-import org.apache.geode.cache.query.CqResults;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.geode.cache.client.ClientCache;
@@ -26,12 +28,9 @@
import org.apache.geode.cache.query.CqException;
import org.apache.geode.cache.query.CqExistsException;
import org.apache.geode.cache.query.CqQuery;
+import org.apache.geode.cache.query.CqResults;
import org.apache.geode.cache.query.RegionNotFoundException;
-import static org.geode.kafka.GeodeConnectorConfig.SECURITY_CLIENT_AUTH_INIT;
-import static org.geode.kafka.GeodeConnectorConfig.SECURITY_PASSWORD;
-import static org.geode.kafka.GeodeConnectorConfig.SECURITY_USER;
-
public class GeodeContext {
private ClientCache clientCache;
@@ -40,15 +39,18 @@
public GeodeContext() {}
public ClientCache connectClient(List<LocatorHostPort> locatorHostPortList,
- String durableClientId, String durableClientTimeout, String securityAuthInit, String securityUserName, String securityPassword, boolean usesSecurity) {
+ String durableClientId, String durableClientTimeout, String securityAuthInit,
+ String securityUserName, String securityPassword, boolean usesSecurity) {
clientCache = createClientCache(locatorHostPortList, durableClientId, durableClientTimeout,
securityAuthInit, securityUserName, securityPassword, usesSecurity);
return clientCache;
}
public ClientCache connectClient(List<LocatorHostPort> locatorHostPortList,
- String securityAuthInit, String securityUserName, String securityPassword, boolean usesSecurity) {
- clientCache = createClientCache(locatorHostPortList, "", "", securityAuthInit, securityUserName, securityPassword, usesSecurity);
+ String securityAuthInit, String securityUserName, String securityPassword,
+ boolean usesSecurity) {
+ clientCache = createClientCache(locatorHostPortList, "", "", securityAuthInit, securityUserName,
+ securityPassword, usesSecurity);
return clientCache;
}
@@ -57,7 +59,8 @@
}
public ClientCache createClientCache(List<LocatorHostPort> locators, String durableClientName,
- String durableClientTimeOut, String securityAuthInit, String securityUserName, String securityPassword, boolean usesSecurity) {
+ String durableClientTimeOut, String securityAuthInit, String securityUserName,
+ String securityPassword, boolean usesSecurity) {
ClientCacheFactory ccf = new ClientCacheFactory();
if (usesSecurity) {
@@ -93,7 +96,7 @@
}
public CqResults newCqWithInitialResults(String name, String query, CqAttributes cqAttributes,
- boolean isDurable) throws ConnectException {
+ boolean isDurable) throws ConnectException {
try {
CqQuery cq = clientCache.getQueryService().newCq(name, query, cqAttributes, isDurable);
return cq.executeWithInitialResults();
diff --git a/src/main/java/org/geode/kafka/security/SystemPropertyAuthInit.java b/src/main/java/org/geode/kafka/security/SystemPropertyAuthInit.java
index cc525a2..6b646ee 100644
--- a/src/main/java/org/geode/kafka/security/SystemPropertyAuthInit.java
+++ b/src/main/java/org/geode/kafka/security/SystemPropertyAuthInit.java
@@ -19,7 +19,6 @@
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.security.AuthInitialize;
import org.apache.geode.security.AuthenticationFailedException;
-import org.geode.kafka.GeodeConnectorConfig;
public class SystemPropertyAuthInit implements AuthInitialize {
diff --git a/src/main/java/org/geode/kafka/sink/GeodeKafkaSink.java b/src/main/java/org/geode/kafka/sink/GeodeKafkaSink.java
index a8985c2..9ee5189 100644
--- a/src/main/java/org/geode/kafka/sink/GeodeKafkaSink.java
+++ b/src/main/java/org/geode/kafka/sink/GeodeKafkaSink.java
@@ -14,17 +14,17 @@
*/
package org.geode.kafka.sink;
+import static org.geode.kafka.sink.GeodeSinkConnectorConfig.SINK_CONFIG_DEF;
+
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import org.geode.kafka.GeodeConnectorConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.sink.SinkConnector;
-
-import static org.geode.kafka.sink.GeodeSinkConnectorConfig.SINK_CONFIG_DEF;
+import org.geode.kafka.GeodeConnectorConfig;
public class GeodeKafkaSink extends SinkConnector {
private Map<String, String> sharedProps;
diff --git a/src/main/java/org/geode/kafka/sink/GeodeKafkaSinkTask.java b/src/main/java/org/geode/kafka/sink/GeodeKafkaSinkTask.java
index 7db384f..be44356 100644
--- a/src/main/java/org/geode/kafka/sink/GeodeKafkaSinkTask.java
+++ b/src/main/java/org/geode/kafka/sink/GeodeKafkaSinkTask.java
@@ -20,9 +20,9 @@
import java.util.Map;
import java.util.stream.Collectors;
-import org.geode.kafka.GeodeContext;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
+import org.geode.kafka.GeodeContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -61,7 +61,9 @@
configure(geodeConnectorConfig);
geodeContext = new GeodeContext();
geodeContext.connectClient(geodeConnectorConfig.getLocatorHostPorts(),
- geodeConnectorConfig.getSecurityClientAuthInit(), geodeConnectorConfig.getSecurityUserName(), geodeConnectorConfig.getSecurityPassword(), geodeConnectorConfig.usesSecurity());
+ geodeConnectorConfig.getSecurityClientAuthInit(),
+ geodeConnectorConfig.getSecurityUserName(), geodeConnectorConfig.getSecurityPassword(),
+ geodeConnectorConfig.usesSecurity());
regionNameToRegion = createProxyRegions(topicToRegions.values());
} catch (Exception e) {
logger.error("Unable to start sink task", e);
diff --git a/src/main/java/org/geode/kafka/sink/GeodeSinkConnectorConfig.java b/src/main/java/org/geode/kafka/sink/GeodeSinkConnectorConfig.java
index bb51b0e..a074220 100644
--- a/src/main/java/org/geode/kafka/sink/GeodeSinkConnectorConfig.java
+++ b/src/main/java/org/geode/kafka/sink/GeodeSinkConnectorConfig.java
@@ -14,20 +14,19 @@
*/
package org.geode.kafka.sink;
-import org.apache.kafka.common.config.ConfigDef;
-
import java.util.List;
import java.util.Map;
+import org.apache.kafka.common.config.ConfigDef;
import org.geode.kafka.GeodeConnectorConfig;
public class GeodeSinkConnectorConfig extends GeodeConnectorConfig {
public static final ConfigDef SINK_CONFIG_DEF = configurables();
// Used by sink
- public static final String TOPIC_TO_REGION_BINDINGS = "topicToRegions";
+ public static final String TOPIC_TO_REGION_BINDINGS = "topic-to-regions";
public static final String DEFAULT_TOPIC_TO_REGION_BINDING = "[gkcTopic:gkcRegion]";
- public static final String NULL_VALUES_MEAN_REMOVE = "nullValuesMeanRemove";
+ public static final String NULL_VALUES_MEAN_REMOVE = "null-values-mean-remove";
public static final String DEFAULT_NULL_VALUES_MEAN_REMOVE = "true";
private Map<String, List<String>> topicToRegions;
@@ -41,8 +40,12 @@
protected static ConfigDef configurables() {
ConfigDef configDef = GeodeConnectorConfig.configurables();
- configDef.define(TOPIC_TO_REGION_BINDINGS, ConfigDef.Type.STRING, DEFAULT_TOPIC_TO_REGION_BINDING, ConfigDef.Importance.HIGH, "");
- configDef.define(NULL_VALUES_MEAN_REMOVE, ConfigDef.Type.BOOLEAN, DEFAULT_NULL_VALUES_MEAN_REMOVE, ConfigDef.Importance.MEDIUM, "");
+ configDef.define(TOPIC_TO_REGION_BINDINGS, ConfigDef.Type.STRING,
+ DEFAULT_TOPIC_TO_REGION_BINDING, ConfigDef.Importance.HIGH,
+ "A comma separated list of \"one topic to many regions\" bindings. Each binding is surrounded by brackets. For example \"[topicName:regionName], [anotherTopic: regionName, anotherRegion]");
+ configDef.define(NULL_VALUES_MEAN_REMOVE, ConfigDef.Type.BOOLEAN,
+ DEFAULT_NULL_VALUES_MEAN_REMOVE, ConfigDef.Importance.MEDIUM,
+ "If set to true, when topics send a SinkRecord with a null value, we will convert to an operation similar to region.remove instead of putting a null value into the region");
return configDef;
}
diff --git a/src/main/java/org/geode/kafka/source/GeodeKafkaSource.java b/src/main/java/org/geode/kafka/source/GeodeKafkaSource.java
index dac94f6..7b4445e 100644
--- a/src/main/java/org/geode/kafka/source/GeodeKafkaSource.java
+++ b/src/main/java/org/geode/kafka/source/GeodeKafkaSource.java
@@ -14,19 +14,19 @@
*/
package org.geode.kafka.source;
+import static org.geode.kafka.source.GeodeSourceConnectorConfig.SOURCE_CONFIG_DEF;
+
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import org.geode.kafka.GeodeConnectorConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.util.ConnectorUtils;
-
-import static org.geode.kafka.source.GeodeSourceConnectorConfig.SOURCE_CONFIG_DEF;
+import org.geode.kafka.GeodeConnectorConfig;
public class GeodeKafkaSource extends SourceConnector {
@@ -43,7 +43,8 @@
List<Map<String, String>> taskConfigs = new ArrayList<>();
List<String> bindings =
GeodeConnectorConfig
- .parseStringByComma(sharedProps.get(GeodeSourceConnectorConfig.REGION_TO_TOPIC_BINDINGS));
+ .parseStringByComma(
+ sharedProps.get(GeodeSourceConnectorConfig.REGION_TO_TOPIC_BINDINGS));
List<List<String>> bindingsPerTask = ConnectorUtils.groupPartitions(bindings, maxTasks);
for (int i = 0; i < maxTasks; i++) {
diff --git a/src/main/java/org/geode/kafka/source/GeodeKafkaSourceTask.java b/src/main/java/org/geode/kafka/source/GeodeKafkaSourceTask.java
index b1c289f..4acc081 100644
--- a/src/main/java/org/geode/kafka/source/GeodeKafkaSourceTask.java
+++ b/src/main/java/org/geode/kafka/source/GeodeKafkaSourceTask.java
@@ -21,17 +21,16 @@
import java.util.Map;
import java.util.stream.Collectors;
-import org.apache.geode.cache.query.CqResults;
-import org.apache.geode.cache.query.Struct;
-import org.geode.kafka.GeodeContext;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
+import org.geode.kafka.GeodeContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.geode.cache.query.CqAttributes;
import org.apache.geode.cache.query.CqAttributesFactory;
-import org.apache.geode.cache.query.CqEvent;
+import org.apache.geode.cache.query.CqResults;
+import org.apache.geode.cache.query.Struct;
public class GeodeKafkaSourceTask extends SourceTask {
@@ -70,7 +69,9 @@
geodeContext = new GeodeContext();
geodeContext.connectClient(geodeConnectorConfig.getLocatorHostPorts(),
geodeConnectorConfig.getDurableClientId(), geodeConnectorConfig.getDurableClientTimeout(),
- geodeConnectorConfig.getSecurityClientAuthInit(), geodeConnectorConfig.getSecurityUserName(), geodeConnectorConfig.getSecurityPassword(), geodeConnectorConfig.usesSecurity());
+ geodeConnectorConfig.getSecurityClientAuthInit(),
+ geodeConnectorConfig.getSecurityUserName(), geodeConnectorConfig.getSecurityPassword(),
+ geodeConnectorConfig.usesSecurity());
batchSize = geodeConnectorConfig.getBatchSize();
eventBufferSupplier = new SharedEventBufferSupplier(geodeConnectorConfig.getQueueSize());
@@ -140,7 +141,10 @@
geodeContext.newCqWithInitialResults(generateCqName(taskId, cqPrefix, regionName),
"select * from /" + regionName, cqAttributes,
isDurable);
- eventBuffer.get().addAll((Collection<GeodeEvent>)events.stream().map(e -> new GeodeEvent(regionName, ((Struct)e).get("key"), ((Struct)e).get("value"))).collect(Collectors.toList()));
+ eventBuffer.get()
+ .addAll((Collection<GeodeEvent>) events.stream().map(
+ e -> new GeodeEvent(regionName, ((Struct) e).get("key"), ((Struct) e).get("value")))
+ .collect(Collectors.toList()));
} else {
geodeContext.newCq(generateCqName(taskId, cqPrefix, regionName),
"select * from /" + regionName, cqAttributes,
diff --git a/src/main/java/org/geode/kafka/source/GeodeSourceConnectorConfig.java b/src/main/java/org/geode/kafka/source/GeodeSourceConnectorConfig.java
index 78673cd..e96796b 100644
--- a/src/main/java/org/geode/kafka/source/GeodeSourceConnectorConfig.java
+++ b/src/main/java/org/geode/kafka/source/GeodeSourceConnectorConfig.java
@@ -26,29 +26,30 @@
public static final ConfigDef SOURCE_CONFIG_DEF = configurables();
// Geode Configuration
- public static final String DURABLE_CLIENT_ID_PREFIX = "durableClientIdPrefix";
+ public static final String DURABLE_CLIENT_ID_PREFIX = "durable-client-id-prefix";
public static final String DEFAULT_DURABLE_CLIENT_ID = "";
- public static final String DURABLE_CLIENT_TIME_OUT = "durableClientTimeout";
+ public static final String DURABLE_CLIENT_TIME_OUT = "durable-client-timeout";
public static final String DEFAULT_DURABLE_CLIENT_TIMEOUT = "60000";
- public static final String CQ_PREFIX = "cqPrefix";
+ public static final String CQ_PREFIX = "cq-prefix";
public static final String DEFAULT_CQ_PREFIX = "cqForGeodeKafka";
/**
* Used as a key for source partitions
*/
public static final String REGION_PARTITION = "regionPartition";
- public static final String REGION_TO_TOPIC_BINDINGS = "regionToTopics";
+ public static final String REGION_TO_TOPIC_BINDINGS = "region-to-topics";
public static final String DEFAULT_REGION_TO_TOPIC_BINDING = "[gkcRegion:gkcTopic]";
- public static final String CQS_TO_REGISTER = "cqsToRegister"; //used internally so that only 1 task will register a cq
+ public static final String CQS_TO_REGISTER = "cqsToRegister"; // used internally so that only 1
+ // task will register a cq
- public static final String BATCH_SIZE = "geodeConnectorBatchSize";
+ public static final String BATCH_SIZE = "geode-connector-batch-size";
public static final String DEFAULT_BATCH_SIZE = "100";
- public static final String QUEUE_SIZE = "geodeConnectorQueueSize";
+ public static final String QUEUE_SIZE = "geode-connector-queue-size";
public static final String DEFAULT_QUEUE_SIZE = "10000";
- public static final String LOAD_ENTIRE_REGION = "loadEntireRegion";
+ public static final String LOAD_ENTIRE_REGION = "load-entire-region";
public static final String DEFAULT_LOAD_ENTIRE_REGION = "false";
private final String durableClientId;
@@ -81,14 +82,27 @@
protected static ConfigDef configurables() {
ConfigDef configDef = GeodeConnectorConfig.configurables();
- configDef.define(CQS_TO_REGISTER, ConfigDef.Type.STRING, "", ConfigDef.Importance.HIGH, "Internally created and used parameter, for signalling a task to register cqs");
- configDef.define(REGION_TO_TOPIC_BINDINGS, ConfigDef.Type.STRING, DEFAULT_REGION_TO_TOPIC_BINDING, ConfigDef.Importance.HIGH, "");
- configDef.define(DURABLE_CLIENT_ID_PREFIX, ConfigDef.Type.STRING, DEFAULT_DURABLE_CLIENT_ID, ConfigDef.Importance.LOW, "");
- configDef.define(DURABLE_CLIENT_TIME_OUT, ConfigDef.Type.STRING, DEFAULT_DURABLE_CLIENT_TIMEOUT, ConfigDef.Importance.LOW, "");
- configDef.define(CQ_PREFIX, ConfigDef.Type.STRING, DEFAULT_CQ_PREFIX, ConfigDef.Importance.LOW, "");
- configDef.define(BATCH_SIZE, ConfigDef.Type.INT, DEFAULT_BATCH_SIZE, ConfigDef.Importance.MEDIUM, "");
- configDef.define(QUEUE_SIZE, ConfigDef.Type.INT, DEFAULT_QUEUE_SIZE, ConfigDef.Importance.MEDIUM, "");
- configDef.define(LOAD_ENTIRE_REGION, ConfigDef.Type.BOOLEAN, DEFAULT_LOAD_ENTIRE_REGION, ConfigDef.Importance.MEDIUM, "");
+ configDef.define(CQS_TO_REGISTER, ConfigDef.Type.STRING, "", ConfigDef.Importance.HIGH,
+ "Internally created and used parameter, for signalling a task to register cqs");
+ configDef.define(REGION_TO_TOPIC_BINDINGS, ConfigDef.Type.STRING,
+ DEFAULT_REGION_TO_TOPIC_BINDING, ConfigDef.Importance.HIGH,
+ "A comma separated list of \"one region to many topics\" mappings. Each mapping is surrounded by brackets. For example \"[regionName:topicName], \"[anotherRegion: topicName, anotherTopic]\"");
+ configDef.define(DURABLE_CLIENT_ID_PREFIX, ConfigDef.Type.STRING, DEFAULT_DURABLE_CLIENT_ID,
+ ConfigDef.Importance.LOW,
+ "Prefix string for tasks to append to when registering as a durable client. If empty string, will not register as a durable client");
+ configDef.define(DURABLE_CLIENT_TIME_OUT, ConfigDef.Type.STRING, DEFAULT_DURABLE_CLIENT_TIMEOUT,
+ ConfigDef.Importance.LOW,
+ "How long in milliseconds to persist values in Geode's durable queue before the queue is invalidated");
+ configDef.define(CQ_PREFIX, ConfigDef.Type.STRING, DEFAULT_CQ_PREFIX, ConfigDef.Importance.LOW,
+ "Prefix string to identify Connector cq's on a Geode server");
+ configDef.define(BATCH_SIZE, ConfigDef.Type.INT, DEFAULT_BATCH_SIZE,
+ ConfigDef.Importance.MEDIUM, "Maximum number of records to return on each poll");
+ configDef.define(QUEUE_SIZE, ConfigDef.Type.INT, DEFAULT_QUEUE_SIZE,
+ ConfigDef.Importance.MEDIUM,
+ "Maximum number of entries in the connector queue before backing up all Geode cq listeners sharing the task queue ");
+ configDef.define(LOAD_ENTIRE_REGION, ConfigDef.Type.BOOLEAN, DEFAULT_LOAD_ENTIRE_REGION,
+ ConfigDef.Importance.MEDIUM,
+ "Determines if we should queue up all entries that currently exist in a region. This allows us to copy existing region data. Will be replayed whenever a task needs to re-register a cq");
return configDef;
}
diff --git a/src/test/java/org/geode/kafka/GeodeConnectorConfigTest.java b/src/test/java/org/geode/kafka/GeodeConnectorConfigTest.java
index 11a00f9..5c63d98 100644
--- a/src/test/java/org/geode/kafka/GeodeConnectorConfigTest.java
+++ b/src/test/java/org/geode/kafka/GeodeConnectorConfigTest.java
@@ -138,7 +138,8 @@
public void usesSecurityShouldBeTrueIfSecurityUserSet() {
Map<String, String> props = new HashMap<>();
props.put(SECURITY_USER, "some user");
- GeodeConnectorConfig config = new GeodeConnectorConfig(GeodeConnectorConfig.configurables(), props);
+ GeodeConnectorConfig config =
+ new GeodeConnectorConfig(GeodeConnectorConfig.configurables(), props);
assertTrue(config.usesSecurity());
}
@@ -146,14 +147,16 @@
public void usesSecurityShouldBeTrueIfSecurityClientAuthInitSet() {
Map<String, String> props = new HashMap<>();
props.put(SECURITY_CLIENT_AUTH_INIT, "someclass");
- GeodeConnectorConfig config = new GeodeConnectorConfig(GeodeConnectorConfig.configurables(), props);
+ GeodeConnectorConfig config =
+ new GeodeConnectorConfig(GeodeConnectorConfig.configurables(), props);
assertTrue(config.usesSecurity());
}
@Test
public void usesSecurityShouldBeFalseIfSecurityUserAndSecurityClientAuthInitNotSet() {
Map<String, String> props = new HashMap<>();
- GeodeConnectorConfig config = new GeodeConnectorConfig(GeodeConnectorConfig.configurables(), props);
+ GeodeConnectorConfig config =
+ new GeodeConnectorConfig(GeodeConnectorConfig.configurables(), props);
assertFalse(config.usesSecurity());
}
@@ -161,14 +164,16 @@
public void securityClientAuthInitShouldBeSetIfUserIsSet() {
Map<String, String> props = new HashMap<>();
props.put(SECURITY_USER, "some user");
- GeodeConnectorConfig config = new GeodeConnectorConfig(GeodeConnectorConfig.configurables(), props);
+ GeodeConnectorConfig config =
+ new GeodeConnectorConfig(GeodeConnectorConfig.configurables(), props);
assertNotNull(config.getSecurityClientAuthInit());
}
@Test
public void securityClientAuthInitShouldNotBeSetIfUserIsNotSetAndNotSpecificallySet() {
Map<String, String> props = new HashMap<>();
- GeodeConnectorConfig config = new GeodeConnectorConfig(GeodeConnectorConfig.configurables(), props);
+ GeodeConnectorConfig config =
+ new GeodeConnectorConfig(GeodeConnectorConfig.configurables(), props);
assertNull(config.getSecurityClientAuthInit());
}
diff --git a/src/test/java/org/geode/kafka/WorkerAndHerderWrapper.java b/src/test/java/org/geode/kafka/WorkerAndHerderWrapper.java
index b4a7bbe..3afcde7 100644
--- a/src/test/java/org/geode/kafka/WorkerAndHerderWrapper.java
+++ b/src/test/java/org/geode/kafka/WorkerAndHerderWrapper.java
@@ -21,8 +21,6 @@
import java.util.HashMap;
import java.util.Map;
-import org.geode.kafka.sink.GeodeKafkaSink;
-import org.geode.kafka.source.GeodeKafkaSource;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.connect.connector.policy.AllConnectorClientConfigOverridePolicy;
import org.apache.kafka.connect.runtime.ConnectorConfig;
@@ -34,6 +32,8 @@
import org.apache.kafka.connect.runtime.standalone.StandaloneHerder;
import org.apache.kafka.connect.storage.MemoryOffsetBackingStore;
import org.apache.kafka.connect.util.ConnectUtils;
+import org.geode.kafka.sink.GeodeKafkaSink;
+import org.geode.kafka.source.GeodeKafkaSource;
public class WorkerAndHerderWrapper {
diff --git a/src/test/java/org/geode/kafka/source/GeodeKafkaSourceTaskTest.java b/src/test/java/org/geode/kafka/source/GeodeKafkaSourceTaskTest.java
index 7901426..4fa7d81 100644
--- a/src/test/java/org/geode/kafka/source/GeodeKafkaSourceTaskTest.java
+++ b/src/test/java/org/geode/kafka/source/GeodeKafkaSourceTaskTest.java
@@ -30,26 +30,20 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
-import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
-import org.apache.geode.cache.query.CqAttributes;
-import org.apache.geode.cache.query.CqResults;
-import org.apache.geode.cache.query.SelectResults;
-import org.apache.geode.cache.query.Struct;
-import org.apache.geode.cache.query.internal.LinkedStructSet;
-import org.apache.geode.cache.query.internal.ResultsBag;
-import org.apache.geode.cache.query.internal.ResultsBag;
-import org.apache.geode.cache.query.internal.StructImpl;
-import org.apache.geode.cache.query.internal.types.StructTypeImpl;
import org.geode.kafka.GeodeContext;
import org.junit.Test;
import org.apache.geode.cache.client.ClientCache;
+import org.apache.geode.cache.query.CqAttributes;
import org.apache.geode.cache.query.CqEvent;
+import org.apache.geode.cache.query.CqResults;
+import org.apache.geode.cache.query.Struct;
+import org.apache.geode.cache.query.internal.ResultsBag;
public class GeodeKafkaSourceTaskTest {
@@ -149,7 +143,8 @@
GeodeContext geodeContext = mock(GeodeContext.class);
when(geodeContext.getClientCache()).thenReturn(clientCache);
- when(geodeContext.newCqWithInitialResults(anyString(), anyString(), any(CqAttributes.class), anyBoolean())).thenReturn(new ResultsBag());
+ when(geodeContext.newCqWithInitialResults(anyString(), anyString(), any(CqAttributes.class),
+ anyBoolean())).thenReturn(new ResultsBag());
Map<String, List<String>> regionToTopicsMap = new HashMap<>();
regionToTopicsMap.put("region1", new ArrayList());