Moved all string contansts to separate variables and classes
diff --git a/src/main/java/org/apache/geode/kafka/GeodeConnectorConfig.java b/src/main/java/org/apache/geode/kafka/GeodeConnectorConfig.java
index 05dd5f2..586817a 100644
--- a/src/main/java/org/apache/geode/kafka/GeodeConnectorConfig.java
+++ b/src/main/java/org/apache/geode/kafka/GeodeConnectorConfig.java
@@ -14,6 +14,19 @@
*/
package org.apache.geode.kafka;
+import static org.apache.geode.kafka.utils.GeodeConfigurationConstants.DEFAULT_LOCATOR;
+import static org.apache.geode.kafka.utils.GeodeConfigurationConstants.DEFAULT_SECURITY_AUTH_INIT;
+import static org.apache.geode.kafka.utils.GeodeConfigurationConstants.LOCATORS;
+import static org.apache.geode.kafka.utils.GeodeConfigurationConstants.LOCATORS_DOCUMENTATION;
+import static org.apache.geode.kafka.utils.GeodeConfigurationConstants.SECURITY_CLIENT_AUTH_INIT;
+import static org.apache.geode.kafka.utils.GeodeConfigurationConstants.SECURITY_CLIENT_AUTH_INIT_DOCUMENTATION;
+import static org.apache.geode.kafka.utils.GeodeConfigurationConstants.SECURITY_PASSWORD;
+import static org.apache.geode.kafka.utils.GeodeConfigurationConstants.SECURITY_PASSWORD_DOCUMENTATION;
+import static org.apache.geode.kafka.utils.GeodeConfigurationConstants.SECURITY_USER;
+import static org.apache.geode.kafka.utils.GeodeConfigurationConstants.SECURITY_USER_DOCUMENTATION;
+import static org.apache.geode.kafka.utils.GeodeConfigurationConstants.TASK_ID;
+import static org.apache.geode.kafka.utils.GeodeConfigurationConstants.TASK_ID_DOCUMENTATION;
+
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
@@ -23,6 +36,7 @@
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.connect.storage.StringConverter;
import org.apache.geode.annotations.VisibleForTesting;
@@ -30,40 +44,14 @@
public class GeodeConnectorConfig extends AbstractConfig {
// GeodeKafka Specific Configuration
- /**
- * Identifier for each task
- */
- public static final String TASK_ID = "GEODE_TASK_ID"; // One config per task
- /**
- * Specifies which Locators to connect to Apache Geode
- */
- private static final String LOCATORS = "locators";
- private static final String DEFAULT_LOCATOR = "localhost[10334]";
- private static final String SECURITY_CLIENT_AUTH_INIT = "security-client-auth-init";
- private static final String DEFAULT_SECURITY_AUTH_INIT =
- "org.apache.geode.kafka.security.SystemPropertyAuthInit";
- private static final String SECURITY_USER = "security-username";
- private static final String SECURITY_PASSWORD = "security-password";
- private static final String TASK_ID_DOCUMENTATION = "Internally used to identify each task";
- private static final String
- LOCATORS_DOCUMENTATION =
- "A comma separated string of locators that configure which locators to connect to";
- private static final String
- SECURITY_USER_DOCUMENTATION =
- "Supply a username to be used to authenticate with Geode. Will automatically set the security-client-auth-init to use a SystemPropertyAuthInit if one isn't supplied by the user";
- private static final String SECURITY_PASSWORD_DOCUMENTATION = "Supply a password to be used to authenticate with Geode";
- private static final String
- SECURITY_CLIENT_AUTH_INIT_DOCUMENTATION =
- "Point to the Java class that implements the [AuthInitialize Interface](https://geode.apache.org/docs/guide/19/managing/security/implementing_authentication.html)";
-
public static final String DEFAULT_KEY_CONVERTER = StringConverter.class.getCanonicalName();
public static final String DEFAULT_VALUE_CONVERTER = StringConverter.class.getCanonicalName();
protected final int taskId;
protected List<LocatorHostPort> locatorHostPorts;
- private String securityClientAuthInit;
+ private Password securityClientAuthInit;
private String securityUserName;
- private String securityPassword;
+ private Password securityPassword;
@VisibleForTesting
protected GeodeConnectorConfig() {
@@ -74,10 +62,11 @@
public GeodeConnectorConfig(ConfigDef configDef, Map<String, String> connectorProperties) {
super(configDef, connectorProperties);
taskId = getInt(TASK_ID);
- locatorHostPorts = parseLocators(getString(GeodeConnectorConfig.LOCATORS));
+ locatorHostPorts = parseLocators(getString(LOCATORS));
securityUserName = getString(SECURITY_USER);
- securityPassword = getString(SECURITY_PASSWORD);
- securityClientAuthInit = getString(SECURITY_CLIENT_AUTH_INIT);
+ securityPassword = getPassword(SECURITY_PASSWORD);
+ securityClientAuthInit = getPassword(SECURITY_CLIENT_AUTH_INIT);
+// System.out.println(securityUserName + "NABA " + securityPassword.value() + "NABA" + securityClientAuthInit.value());
// if we registered a username/password instead of auth init, we should use the default auth
// init if one isn't specified
if (usesSecurity()) {
@@ -196,7 +185,7 @@
}
public String getSecurityClientAuthInit() {
- return securityClientAuthInit;
+ return securityClientAuthInit == null ? null : securityClientAuthInit.value();
}
public String getSecurityUserName() {
@@ -204,7 +193,7 @@
}
public String getSecurityPassword() {
- return securityPassword;
+ return securityPassword == null ? null : securityPassword.value();
}
public boolean usesSecurity() {
diff --git a/src/main/java/org/apache/geode/kafka/GeodeContext.java b/src/main/java/org/apache/geode/kafka/GeodeContext.java
index 6be418f..418a476 100644
--- a/src/main/java/org/apache/geode/kafka/GeodeContext.java
+++ b/src/main/java/org/apache/geode/kafka/GeodeContext.java
@@ -14,9 +14,9 @@
*/
package org.apache.geode.kafka;
-import static org.apache.geode.kafka.GeodeConnectorConfig.SECURITY_CLIENT_AUTH_INIT;
-import static org.apache.geode.kafka.GeodeConnectorConfig.SECURITY_PASSWORD;
-import static org.apache.geode.kafka.GeodeConnectorConfig.SECURITY_USER;
+import static org.apache.geode.kafka.utils.GeodeConfigurationConstants.SECURITY_CLIENT_AUTH_INIT;
+import static org.apache.geode.kafka.utils.GeodeConfigurationConstants.SECURITY_PASSWORD;
+import static org.apache.geode.kafka.utils.GeodeConfigurationConstants.SECURITY_USER;
import java.util.List;
diff --git a/src/main/java/org/apache/geode/kafka/sink/GeodeKafkaSink.java b/src/main/java/org/apache/geode/kafka/sink/GeodeKafkaSink.java
index edf2f97..b09ba87 100644
--- a/src/main/java/org/apache/geode/kafka/sink/GeodeKafkaSink.java
+++ b/src/main/java/org/apache/geode/kafka/sink/GeodeKafkaSink.java
@@ -14,6 +14,8 @@
*/
package org.apache.geode.kafka.sink;
+import static org.apache.geode.kafka.utils.GeodeConfigurationConstants.TASK_ID;
+
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -23,8 +25,6 @@
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.sink.SinkConnector;
-import org.apache.geode.kafka.GeodeConnectorConfig;
-
public class GeodeKafkaSink extends SinkConnector {
private Map<String, String> sharedProps;
@@ -46,7 +46,7 @@
// have no control over partitioning in kafka and which tasks will fire
for (int i = 0; i < maxTasks; i++) {
Map<String, String> taskProps = new HashMap<>(sharedProps);
- taskProps.put(GeodeConnectorConfig.TASK_ID, "" + i);
+ taskProps.put(TASK_ID, "" + i);
taskConfigs.add(taskProps);
}
diff --git a/src/main/java/org/apache/geode/kafka/sink/GeodeSinkConnectorConfig.java b/src/main/java/org/apache/geode/kafka/sink/GeodeSinkConnectorConfig.java
index 46cd6c0..cd12429 100644
--- a/src/main/java/org/apache/geode/kafka/sink/GeodeSinkConnectorConfig.java
+++ b/src/main/java/org/apache/geode/kafka/sink/GeodeSinkConnectorConfig.java
@@ -14,6 +14,13 @@
*/
package org.apache.geode.kafka.sink;
+import static org.apache.geode.kafka.utils.GeodeSinkConfigurationConstants.DEFAULT_NULL_VALUES_MEAN_REMOVE;
+import static org.apache.geode.kafka.utils.GeodeSinkConfigurationConstants.DEFAULT_TOPIC_TO_REGION_BINDING;
+import static org.apache.geode.kafka.utils.GeodeSinkConfigurationConstants.NULL_VALUES_MEAN_REMOVE;
+import static org.apache.geode.kafka.utils.GeodeSinkConfigurationConstants.NULL_VALUES_MEAN_REMOVE_DOCUMENTATION;
+import static org.apache.geode.kafka.utils.GeodeSinkConfigurationConstants.TOPIC_TO_REGION_BINDINGS;
+import static org.apache.geode.kafka.utils.GeodeSinkConfigurationConstants.TOPIC_TO_REGION_BINDINGS_DOCUMENTATION;
+
import java.util.List;
import java.util.Map;
@@ -23,19 +30,6 @@
public class GeodeSinkConnectorConfig extends GeodeConnectorConfig {
public static final ConfigDef SINK_CONFIG_DEF = configurables();
-
- // Used by sink
- private static final String TOPIC_TO_REGION_BINDINGS = "topic-to-regions";
- private static final String DEFAULT_TOPIC_TO_REGION_BINDING = "[gkcTopic:gkcRegion]";
- private static final String NULL_VALUES_MEAN_REMOVE = "null-values-mean-remove";
- private static final String DEFAULT_NULL_VALUES_MEAN_REMOVE = "true";
- private static final String
- NULL_VALUES_MEAN_REMOVE_DOCUMENTATION =
- "If set to true, when topics send a SinkRecord with a null value, we will convert to an operation similar to region.remove instead of putting a null value into the region";
- private static final String
- TOPIC_TO_REGION_BINDINGS_DOCUMENTATION =
- "A comma separated list of \"one topic to many regions\" bindings. Each binding is surrounded by brackets. For example \"[topicName:regionName], [anotherTopic: regionName, anotherRegion]";
-
private final Map<String, List<String>> topicToRegions;
private final boolean nullValuesMeanRemove;
diff --git a/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSource.java b/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSource.java
index ca6dd0c..cdbe60f 100644
--- a/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSource.java
+++ b/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSource.java
@@ -15,6 +15,9 @@
package org.apache.geode.kafka.source;
import static org.apache.geode.kafka.source.GeodeSourceConnectorConfig.SOURCE_CONFIG_DEF;
+import static org.apache.geode.kafka.utils.GeodeSourceConfigurationConstants.CQS_TO_REGISTER;
+import static org.apache.geode.kafka.utils.GeodeSourceConfigurationConstants.REGION_TO_TOPIC_BINDINGS;
+import static org.apache.geode.kafka.utils.GeodeConfigurationConstants.TASK_ID;
import java.util.ArrayList;
import java.util.HashMap;
@@ -45,13 +48,13 @@
List<String> bindings =
GeodeConnectorConfig
.parseStringByComma(
- sharedProps.get(GeodeSourceConnectorConfig.REGION_TO_TOPIC_BINDINGS));
+ sharedProps.get(REGION_TO_TOPIC_BINDINGS));
List<List<String>> bindingsPerTask = ConnectorUtils.groupPartitions(bindings, maxTasks);
for (int i = 0; i < maxTasks; i++) {
Map<String, String> taskProps = new HashMap<>(sharedProps);
- taskProps.put(GeodeConnectorConfig.TASK_ID, "" + i);
- taskProps.put(GeodeSourceConnectorConfig.CQS_TO_REGISTER,
+ taskProps.put(TASK_ID, "" + i);
+ taskProps.put(CQS_TO_REGISTER,
GeodeConnectorConfig.reconstructString(bindingsPerTask.get(i)));
taskConfigs.add(taskProps);
}
diff --git a/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceTask.java b/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceTask.java
index 182efff..24cd531 100644
--- a/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceTask.java
+++ b/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceTask.java
@@ -14,6 +14,8 @@
*/
package org.apache.geode.kafka.source;
+import static org.apache.geode.kafka.utils.GeodeSourceConfigurationConstants.REGION_PARTITION;
+
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
@@ -66,6 +68,7 @@
@Override
public void start(Map<String, String> props) {
try {
+ System.out.println("NABA ::" + props);
GeodeSourceConnectorConfig geodeConnectorConfig = new GeodeSourceConnectorConfig(props);
logger.debug("GeodeKafkaSourceTask id:" + geodeConnectorConfig.getTaskId() + " starting");
geodeContext = new GeodeContext();
@@ -175,9 +178,9 @@
Map<String, Map<String, String>> createSourcePartitionsMap(Collection<String> regionNames) {
return regionNames.stream().map(regionName -> {
Map<String, String> sourcePartition = new HashMap<>();
- sourcePartition.put(GeodeSourceConnectorConfig.REGION_PARTITION, regionName);
+ sourcePartition.put(REGION_PARTITION, regionName);
return sourcePartition;
- }).collect(Collectors.toMap(s -> s.get(GeodeSourceConnectorConfig.REGION_PARTITION), s -> s));
+ }).collect(Collectors.toMap(s -> s.get(REGION_PARTITION), s -> s));
}
String generateCqName(int taskId, String cqPrefix, String regionName) {
diff --git a/src/main/java/org/apache/geode/kafka/source/GeodeSourceConnectorConfig.java b/src/main/java/org/apache/geode/kafka/source/GeodeSourceConnectorConfig.java
index 05d4fd6..9b10970 100644
--- a/src/main/java/org/apache/geode/kafka/source/GeodeSourceConnectorConfig.java
+++ b/src/main/java/org/apache/geode/kafka/source/GeodeSourceConnectorConfig.java
@@ -14,6 +14,30 @@
*/
package org.apache.geode.kafka.source;
+import static org.apache.geode.kafka.utils.GeodeSourceConfigurationConstants.BATCH_SIZE;
+import static org.apache.geode.kafka.utils.GeodeSourceConfigurationConstants.BATCH_SIZE_DOCUMENTATION;
+import static org.apache.geode.kafka.utils.GeodeSourceConfigurationConstants.CQS_TO_REGISTER;
+import static org.apache.geode.kafka.utils.GeodeSourceConfigurationConstants.CQS_TO_REGISTER_DOCUMENTATION;
+import static org.apache.geode.kafka.utils.GeodeSourceConfigurationConstants.CQ_PREFIX;
+import static org.apache.geode.kafka.utils.GeodeSourceConfigurationConstants.CQ_PREFIX_DOCUMENTATION;
+import static org.apache.geode.kafka.utils.GeodeSourceConfigurationConstants.DEFAULT_BATCH_SIZE;
+import static org.apache.geode.kafka.utils.GeodeSourceConfigurationConstants.DEFAULT_CQ_PREFIX;
+import static org.apache.geode.kafka.utils.GeodeSourceConfigurationConstants.DEFAULT_DURABLE_CLIENT_ID;
+import static org.apache.geode.kafka.utils.GeodeSourceConfigurationConstants.DEFAULT_DURABLE_CLIENT_TIMEOUT;
+import static org.apache.geode.kafka.utils.GeodeSourceConfigurationConstants.DEFAULT_LOAD_ENTIRE_REGION;
+import static org.apache.geode.kafka.utils.GeodeSourceConfigurationConstants.DEFAULT_QUEUE_SIZE;
+import static org.apache.geode.kafka.utils.GeodeSourceConfigurationConstants.DEFAULT_REGION_TO_TOPIC_BINDING;
+import static org.apache.geode.kafka.utils.GeodeSourceConfigurationConstants.DURABLE_CLIENT_ID_PREFIX;
+import static org.apache.geode.kafka.utils.GeodeSourceConfigurationConstants.DURABLE_CLIENT_ID_PREFIX_DOCUMENTATION;
+import static org.apache.geode.kafka.utils.GeodeSourceConfigurationConstants.DURABLE_CLIENT_TIME_OUT;
+import static org.apache.geode.kafka.utils.GeodeSourceConfigurationConstants.DURABLE_CLIENT_TIME_OUT_DOCUMENTATION;
+import static org.apache.geode.kafka.utils.GeodeSourceConfigurationConstants.LOAD_ENTIRE_REGION;
+import static org.apache.geode.kafka.utils.GeodeSourceConfigurationConstants.LOAD_ENTIRE_REGION_DOCUMENTATION;
+import static org.apache.geode.kafka.utils.GeodeSourceConfigurationConstants.QUEUE_SIZE;
+import static org.apache.geode.kafka.utils.GeodeSourceConfigurationConstants.QUEUE_SIZE_DOCUMENTATION;
+import static org.apache.geode.kafka.utils.GeodeSourceConfigurationConstants.REGION_TO_TOPIC_BINDINGS;
+import static org.apache.geode.kafka.utils.GeodeSourceConfigurationConstants.REGION_TO_TOPIC_BINDINGS_DOCUMENTATION;
+
import java.util.Collection;
import java.util.List;
import java.util.Map;
@@ -26,53 +50,6 @@
public static final ConfigDef SOURCE_CONFIG_DEF = configurables();
- // Geode Configuration
- public static final String DURABLE_CLIENT_ID_PREFIX = "durable-client-id-prefix";
- public static final String DEFAULT_DURABLE_CLIENT_ID = "";
- public static final String DURABLE_CLIENT_TIME_OUT = "durable-client-timeout";
- public static final String DEFAULT_DURABLE_CLIENT_TIMEOUT = "60000";
-
- public static final String CQ_PREFIX = "cq-prefix";
- public static final String DEFAULT_CQ_PREFIX = "cqForGeodeKafka";
-
- /**
- * Used as a key for source partitions
- */
- public static final String REGION_PARTITION = "regionPartition";
- public static final String REGION_TO_TOPIC_BINDINGS = "region-to-topics";
- public static final String DEFAULT_REGION_TO_TOPIC_BINDING = "[gkcRegion:gkcTopic]";
- public static final String CQS_TO_REGISTER = "cqsToRegister"; // used internally so that only 1
- // task will register a cq
-
- public static final String BATCH_SIZE = "geode-connector-batch-size";
- public static final String DEFAULT_BATCH_SIZE = "100";
-
- public static final String QUEUE_SIZE = "geode-connector-queue-size";
- public static final String DEFAULT_QUEUE_SIZE = "10000";
-
- public static final String LOAD_ENTIRE_REGION = "load-entire-region";
- public static final String DEFAULT_LOAD_ENTIRE_REGION = "false";
-
- private static final String
- CQS_TO_REGISTER_DOCUMENTATION =
- "Internally created and used parameter, for signalling a task to register CQs on Apache Geode";
- private static final String
- REGION_TO_TOPIC_BINDINGS_DOCUMENTATION =
- "A comma separated list of \"one region to many topics\" mappings. Each mapping is surrounded by brackets. For example \"[regionName:topicName], \"[anotherRegion: topicName, anotherTopic]\"";
- private static final String
- DURABLE_CLIENT_ID_PREFIX_DOCUMENTATION =
- "Prefix string for tasks to append to when registering as a durable client. If empty string, will not register as a durable client";
- private static final String
- LOAD_ENTIRE_REGION_DOCUMENTATION =
- "Determines if we should queue up all entries that currently exist in a region. This allows us to copy existing region data. Will be replayed whenever a task needs to re-register a CQ";
- private static final String
- DURABLE_CLIENT_TIME_OUT_DOCUMENTATION =
- "How long in milliseconds to persist values in Geode's durable queue before the queue is invalidated";
- private static final String CQ_PREFIX_DOCUMENTATION = "Prefix string to identify Connector CQ's on a Geode server";
- private static final String BATCH_SIZE_DOCUMENTATION = "Maximum number of records to return on each poll";
- private static final String
- QUEUE_SIZE_DOCUMENTATION =
- "Maximum number of entries in the connector queue before backing up all Geode CQ listeners sharing the task queue ";
private final String durableClientId;
private final String durableClientTimeout;
diff --git a/src/main/java/org/apache/geode/kafka/utils/GeodeConfigurationConstants.java b/src/main/java/org/apache/geode/kafka/utils/GeodeConfigurationConstants.java
new file mode 100644
index 0000000..a4a6811
--- /dev/null
+++ b/src/main/java/org/apache/geode/kafka/utils/GeodeConfigurationConstants.java
@@ -0,0 +1,30 @@
+package org.apache.geode.kafka.utils;
+
+import org.apache.kafka.common.config.types.Password;
+
+public class GeodeConfigurationConstants {
+ /**
+ * GEODE SPECIFIC CONFIGURATION
+ */
+ //Identifier for each task
+ public static final String TASK_ID = "GEODE_TASK_ID"; // One config per task
+ //Specifies which Locators to connect to Apache Geode
+ public static final String LOCATORS = "locators";
+ public static final String DEFAULT_LOCATOR = "localhost[10334]";
+ public static final String SECURITY_CLIENT_AUTH_INIT = "security-client-auth-init";
+ public static final Password DEFAULT_SECURITY_AUTH_INIT =
+ new Password("org.apache.geode.kafka.security.SystemPropertyAuthInit");
+ public static final String SECURITY_USER = "security-username";
+ public static final String SECURITY_PASSWORD = "security-password";
+ public static final String TASK_ID_DOCUMENTATION = "Internally used to identify each task";
+ public static final String
+ LOCATORS_DOCUMENTATION =
+ "A comma separated string of locators that configure which locators to connect to";
+ public static final String
+ SECURITY_USER_DOCUMENTATION =
+ "Supply a username to be used to authenticate with Geode. Will automatically set the security-client-auth-init to use a SystemPropertyAuthInit if one isn't supplied by the user";
+ public static final String SECURITY_PASSWORD_DOCUMENTATION = "Supply a password to be used to authenticate with Geode";
+ public static final String
+ SECURITY_CLIENT_AUTH_INIT_DOCUMENTATION =
+ "Point to the Java class that implements the [AuthInitialize Interface](https://geode.apache.org/docs/guide/19/managing/security/implementing_authentication.html)";
+}
diff --git a/src/main/java/org/apache/geode/kafka/utils/GeodeSinkConfigurationConstants.java b/src/main/java/org/apache/geode/kafka/utils/GeodeSinkConfigurationConstants.java
new file mode 100644
index 0000000..e3e3a6f
--- /dev/null
+++ b/src/main/java/org/apache/geode/kafka/utils/GeodeSinkConfigurationConstants.java
@@ -0,0 +1,17 @@
+package org.apache.geode.kafka.utils;
+
+public class GeodeSinkConfigurationConstants {
+ /**
+ * SINK SPECIFIC CONFIGURATION
+ */
+ public static final String TOPIC_TO_REGION_BINDINGS = "topic-to-regions";
+ public static final String DEFAULT_TOPIC_TO_REGION_BINDING = "[gkcTopic:gkcRegion]";
+ public static final String NULL_VALUES_MEAN_REMOVE = "null-values-mean-remove";
+ public static final String DEFAULT_NULL_VALUES_MEAN_REMOVE = "true";
+ public static final String
+ NULL_VALUES_MEAN_REMOVE_DOCUMENTATION =
+ "If set to true, when topics send a SinkRecord with a null value, we will convert to an operation similar to region.remove instead of putting a null value into the region";
+ public static final String
+ TOPIC_TO_REGION_BINDINGS_DOCUMENTATION =
+ "A comma separated list of \"one topic to many regions\" bindings. Each binding is surrounded by brackets. For example \"[topicName:regionName], [anotherTopic: regionName, anotherRegion]";
+}
diff --git a/src/main/java/org/apache/geode/kafka/utils/GeodeSourceConfigurationConstants.java b/src/main/java/org/apache/geode/kafka/utils/GeodeSourceConfigurationConstants.java
new file mode 100644
index 0000000..52f4a84
--- /dev/null
+++ b/src/main/java/org/apache/geode/kafka/utils/GeodeSourceConfigurationConstants.java
@@ -0,0 +1,46 @@
+package org.apache.geode.kafka.utils;
+
+public class GeodeSourceConfigurationConstants {
+ /**
+ * SOURCE SPECIFIC CONFIGURATIONS
+ */
+ // Geode Configuration
+ public static final String DURABLE_CLIENT_ID_PREFIX = "durable-client-id-prefix";
+ public static final String DEFAULT_DURABLE_CLIENT_ID = "";
+ public static final String DURABLE_CLIENT_TIME_OUT = "durable-client-timeout";
+ public static final String DEFAULT_DURABLE_CLIENT_TIMEOUT = "60000";
+ public static final String CQ_PREFIX = "cq-prefix";
+ public static final String DEFAULT_CQ_PREFIX = "cqForGeodeKafka";
+ //Used as a key for source partitions
+ public static final String REGION_PARTITION = "regionPartition";
+ public static final String REGION_TO_TOPIC_BINDINGS = "region-to-topics";
+ public static final String DEFAULT_REGION_TO_TOPIC_BINDING = "[gkcRegion:gkcTopic]";
+ public static final String CQS_TO_REGISTER = "cqsToRegister"; // used internally so that only 1
+ // task will register a cq
+ public static final String BATCH_SIZE = "geode-connector-batch-size";
+ public static final String DEFAULT_BATCH_SIZE = "100";
+ public static final String QUEUE_SIZE = "geode-connector-queue-size";
+ public static final String DEFAULT_QUEUE_SIZE = "10000";
+ public static final String LOAD_ENTIRE_REGION = "load-entire-region";
+ public static final String DEFAULT_LOAD_ENTIRE_REGION = "false";
+ public static final String
+ CQS_TO_REGISTER_DOCUMENTATION =
+ "Internally created and used parameter, for signalling a task to register CQs on Apache Geode";
+ public static final String
+ REGION_TO_TOPIC_BINDINGS_DOCUMENTATION =
+ "A comma separated list of \"one region to many topics\" mappings. Each mapping is surrounded by brackets. For example \"[regionName:topicName], \"[anotherRegion: topicName, anotherTopic]\"";
+ public static final String
+ DURABLE_CLIENT_ID_PREFIX_DOCUMENTATION =
+ "Prefix string for tasks to append to when registering as a durable client. If empty string, will not register as a durable client";
+ public static final String
+ LOAD_ENTIRE_REGION_DOCUMENTATION =
+ "Determines if we should queue up all entries that currently exist in a region. This allows us to copy existing region data. Will be replayed whenever a task needs to re-register a CQ";
+ public static final String
+ DURABLE_CLIENT_TIME_OUT_DOCUMENTATION =
+ "How long in milliseconds to persist values in Geode's durable queue before the queue is invalidated";
+ public static final String CQ_PREFIX_DOCUMENTATION = "Prefix string to identify Connector CQ's on a Geode server";
+ public static final String BATCH_SIZE_DOCUMENTATION = "Maximum number of records to return on each poll";
+ public static final String
+ QUEUE_SIZE_DOCUMENTATION =
+ "Maximum number of entries in the connector queue before backing up all Geode CQ listeners sharing the task queue ";
+}
diff --git a/src/test/java/org/apache/geode/kafka/GeodeConnectorConfigTest.java b/src/test/java/org/apache/geode/kafka/GeodeConnectorConfigTest.java
index 4461dc4..ccc946f 100644
--- a/src/test/java/org/apache/geode/kafka/GeodeConnectorConfigTest.java
+++ b/src/test/java/org/apache/geode/kafka/GeodeConnectorConfigTest.java
@@ -16,6 +16,8 @@
import static org.apache.geode.kafka.GeodeConnectorConfig.parseStringByComma;
+import static org.apache.geode.kafka.utils.GeodeConfigurationConstants.SECURITY_CLIENT_AUTH_INIT;
+import static org.apache.geode.kafka.utils.GeodeConfigurationConstants.SECURITY_USER;
import static org.hamcrest.CoreMatchers.allOf;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertEquals;
@@ -134,7 +136,7 @@
@Test
public void usesSecurityShouldBeTrueIfSecurityUserSet() {
Map<String, String> props = new HashMap<>();
- props.put(GeodeConnectorConfig.SECURITY_USER, "some user");
+ props.put(SECURITY_USER, "some user");
GeodeConnectorConfig config =
new GeodeConnectorConfig(GeodeConnectorConfig.configurables(), props);
assertTrue(config.usesSecurity());
@@ -143,7 +145,7 @@
@Test
public void usesSecurityShouldBeTrueIfSecurityClientAuthInitSet() {
Map<String, String> props = new HashMap<>();
- props.put(GeodeConnectorConfig.SECURITY_CLIENT_AUTH_INIT, "some_class");
+ props.put(SECURITY_CLIENT_AUTH_INIT, "some_class");
GeodeConnectorConfig config =
new GeodeConnectorConfig(GeodeConnectorConfig.configurables(), props);
assertTrue(config.usesSecurity());
@@ -160,7 +162,7 @@
@Test
public void securityClientAuthInitShouldBeSetIfUserIsSet() {
Map<String, String> props = new HashMap<>();
- props.put(GeodeConnectorConfig.SECURITY_USER, "some user");
+ props.put(SECURITY_USER, "some user");
GeodeConnectorConfig config =
new GeodeConnectorConfig(GeodeConnectorConfig.configurables(), props);
assertNotNull(config.getSecurityClientAuthInit());
diff --git a/src/test/java/org/apache/geode/kafka/security/SystemPropertyAuthInitTest.java b/src/test/java/org/apache/geode/kafka/security/SystemPropertyAuthInitTest.java
index 1ab05cc..648d77e 100644
--- a/src/test/java/org/apache/geode/kafka/security/SystemPropertyAuthInitTest.java
+++ b/src/test/java/org/apache/geode/kafka/security/SystemPropertyAuthInitTest.java
@@ -14,8 +14,8 @@
*/
package org.apache.geode.kafka.security;
-import static org.apache.geode.kafka.GeodeConnectorConfig.SECURITY_PASSWORD;
-import static org.apache.geode.kafka.GeodeConnectorConfig.SECURITY_USER;
+import static org.apache.geode.kafka.utils.GeodeConfigurationConstants.SECURITY_PASSWORD;
+import static org.apache.geode.kafka.utils.GeodeConfigurationConstants.SECURITY_USER;
import static org.junit.Assert.assertEquals;
import java.util.Properties;
diff --git a/src/test/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTaskTest.java b/src/test/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTaskTest.java
index 85b9954..c5e5056 100644
--- a/src/test/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTaskTest.java
+++ b/src/test/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTaskTest.java
@@ -14,8 +14,10 @@
*/
package org.apache.geode.kafka.sink;
-import static org.apache.geode.kafka.sink.GeodeSinkConnectorConfig.NULL_VALUES_MEAN_REMOVE;
-import static org.apache.geode.kafka.sink.GeodeSinkConnectorConfig.TOPIC_TO_REGION_BINDINGS;
+import static org.apache.geode.kafka.utils.GeodeConfigurationConstants.LOCATORS;
+import static org.apache.geode.kafka.utils.GeodeConfigurationConstants.TASK_ID;
+import static org.apache.geode.kafka.utils.GeodeSinkConfigurationConstants.NULL_VALUES_MEAN_REMOVE;
+import static org.apache.geode.kafka.utils.GeodeSinkConfigurationConstants.TOPIC_TO_REGION_BINDINGS;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
@@ -31,16 +33,15 @@
import org.junit.Test;
import org.apache.geode.cache.Region;
-import org.apache.geode.kafka.GeodeConnectorConfig;
public class GeodeKafkaSinkTaskTest {
private HashMap<String, String> createTestSinkProps() {
HashMap<String, String> props = new HashMap<>();
props.put(TOPIC_TO_REGION_BINDINGS, "[topic:region]");
- props.put(GeodeConnectorConfig.TASK_ID, "0");
+ props.put(TASK_ID, "0");
props.put(NULL_VALUES_MEAN_REMOVE, "true");
- props.put(GeodeConnectorConfig.LOCATORS, "localhost[10334]");
+ props.put(LOCATORS, "localhost[10334]");
return props;
}
diff --git a/src/test/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTest.java b/src/test/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTest.java
index d3974cb..170bbbb 100644
--- a/src/test/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTest.java
+++ b/src/test/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTest.java
@@ -14,7 +14,7 @@
*/
package org.apache.geode.kafka.sink;
-import static org.apache.geode.kafka.sink.GeodeSinkConnectorConfig.TOPIC_TO_REGION_BINDINGS;
+import static org.apache.geode.kafka.utils.GeodeSinkConfigurationConstants.TOPIC_TO_REGION_BINDINGS;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -25,7 +25,7 @@
import org.junit.Test;
-import org.apache.geode.kafka.GeodeConnectorConfig;
+import org.apache.geode.kafka.utils.GeodeConfigurationConstants;
public class GeodeKafkaSinkTest {
@@ -65,7 +65,7 @@
Collection<Map<String, String>> tasks = sink.taskConfigs(5);
HashSet<String> seenIds = new HashSet<>();
for (Map<String, String> taskProp : tasks) {
- assertTrue(seenIds.add(taskProp.get(GeodeConnectorConfig.TASK_ID)));
+ assertTrue(seenIds.add(taskProp.get(GeodeConfigurationConstants.TASK_ID)));
}
}
}
diff --git a/src/test/java/org/apache/geode/kafka/source/GeodeKafkaSourceTaskTest.java b/src/test/java/org/apache/geode/kafka/source/GeodeKafkaSourceTaskTest.java
index a85786b..411316c 100644
--- a/src/test/java/org/apache/geode/kafka/source/GeodeKafkaSourceTaskTest.java
+++ b/src/test/java/org/apache/geode/kafka/source/GeodeKafkaSourceTaskTest.java
@@ -14,8 +14,8 @@
*/
package org.apache.geode.kafka.source;
-import static org.apache.geode.kafka.source.GeodeSourceConnectorConfig.DEFAULT_CQ_PREFIX;
-import static org.apache.geode.kafka.source.GeodeSourceConnectorConfig.REGION_PARTITION;
+import static org.apache.geode.kafka.utils.GeodeSourceConfigurationConstants.DEFAULT_CQ_PREFIX;
+import static org.apache.geode.kafka.utils.GeodeSourceConfigurationConstants.REGION_PARTITION;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
diff --git a/src/test/java/org/apache/geode/kafka/source/GeodeKafkaSourceTest.java b/src/test/java/org/apache/geode/kafka/source/GeodeKafkaSourceTest.java
index 3019101..02f2790 100644
--- a/src/test/java/org/apache/geode/kafka/source/GeodeKafkaSourceTest.java
+++ b/src/test/java/org/apache/geode/kafka/source/GeodeKafkaSourceTest.java
@@ -14,7 +14,8 @@
*/
package org.apache.geode.kafka.source;
-import static org.apache.geode.kafka.source.GeodeSourceConnectorConfig.REGION_TO_TOPIC_BINDINGS;
+import static org.apache.geode.kafka.utils.GeodeConfigurationConstants.TASK_ID;
+import static org.apache.geode.kafka.utils.GeodeSourceConfigurationConstants.REGION_TO_TOPIC_BINDINGS;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -25,8 +26,6 @@
import org.junit.Test;
-import org.apache.geode.kafka.GeodeConnectorConfig;
-
public class GeodeKafkaSourceTest {
@Test
@@ -60,7 +59,7 @@
Collection<Map<String, String>> tasks = sink.taskConfigs(5);
HashSet<String> seenIds = new HashSet<>();
for (Map<String, String> taskProp : tasks) {
- assertTrue(seenIds.add(taskProp.get(GeodeConnectorConfig.TASK_ID)));
+ assertTrue(seenIds.add(taskProp.get(TASK_ID)));
}
}
diff --git a/src/test/java/org/apache/geode/kafka/source/GeodeSourceConnectorConfigTest.java b/src/test/java/org/apache/geode/kafka/source/GeodeSourceConnectorConfigTest.java
index d543d23..ea43585 100644
--- a/src/test/java/org/apache/geode/kafka/source/GeodeSourceConnectorConfigTest.java
+++ b/src/test/java/org/apache/geode/kafka/source/GeodeSourceConnectorConfigTest.java
@@ -14,7 +14,9 @@
*/
package org.apache.geode.kafka.source;
-import static org.apache.geode.kafka.source.GeodeSourceConnectorConfig.DURABLE_CLIENT_ID_PREFIX;
+import static org.apache.geode.kafka.utils.GeodeConfigurationConstants.LOCATORS;
+import static org.apache.geode.kafka.utils.GeodeConfigurationConstants.TASK_ID;
+import static org.apache.geode.kafka.utils.GeodeSourceConfigurationConstants.DURABLE_CLIENT_ID_PREFIX;
import static org.junit.Assert.assertEquals;
import java.util.HashMap;
@@ -22,16 +24,14 @@
import org.junit.Test;
-import org.apache.geode.kafka.GeodeConnectorConfig;
-
public class GeodeSourceConnectorConfigTest {
@Test
public void durableClientIdShouldNotBeSetIfPrefixIsEmpty() {
Map<String, String> props = new HashMap<>();
- props.put(GeodeConnectorConfig.TASK_ID, "0");
+ props.put(TASK_ID, "0");
props.put(DURABLE_CLIENT_ID_PREFIX, "");
- props.put(GeodeConnectorConfig.LOCATORS, "localhost[10334]");
+ props.put(LOCATORS, "localhost[10334]");
GeodeSourceConnectorConfig config = new GeodeSourceConnectorConfig(props);
assertEquals("", config.getDurableClientId());
}
diff --git a/src/test/java/org/apache/geode/kafka/utilities/WorkerAndHerderWrapper.java b/src/test/java/org/apache/geode/kafka/utilities/WorkerAndHerderWrapper.java
index 5c7a7e4..8e39e6c 100644
--- a/src/test/java/org/apache/geode/kafka/utilities/WorkerAndHerderWrapper.java
+++ b/src/test/java/org/apache/geode/kafka/utilities/WorkerAndHerderWrapper.java
@@ -14,7 +14,8 @@
*/
package org.apache.geode.kafka.utilities;
-import static org.apache.geode.kafka.source.GeodeSourceConnectorConfig.REGION_TO_TOPIC_BINDINGS;
+import static org.apache.geode.kafka.utils.GeodeConfigurationConstants.LOCATORS;
+import static org.apache.geode.kafka.utils.GeodeSourceConfigurationConstants.REGION_TO_TOPIC_BINDINGS;
import java.util.Arrays;
import java.util.HashMap;
@@ -33,10 +34,10 @@
import org.apache.kafka.connect.storage.MemoryOffsetBackingStore;
import org.apache.kafka.connect.util.ConnectUtils;
-import org.apache.geode.kafka.GeodeConnectorConfig;
import org.apache.geode.kafka.sink.GeodeKafkaSink;
-import org.apache.geode.kafka.sink.GeodeSinkConnectorConfig;
import org.apache.geode.kafka.source.GeodeKafkaSource;
+import org.apache.geode.kafka.utils.GeodeConfigurationConstants;
+import org.apache.geode.kafka.utils.GeodeSinkConfigurationConstants;
public class WorkerAndHerderWrapper {
@@ -80,7 +81,7 @@
props.putAll(valueConverterProps);
props.put("key.converter.schemas.enable", "false");
props.put("value.converter.schemas.enable", "false");
- props.put(GeodeConnectorConfig.LOCATORS, locatorString);
+ props.put(LOCATORS, locatorString);
WorkerConfig workerCfg = new StandaloneConfig(props);
MemoryOffsetBackingStore offBackingStore = new MemoryOffsetBackingStore();
@@ -98,7 +99,7 @@
sourceProps.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, GeodeKafkaSource.class.getName());
sourceProps.put(ConnectorConfig.NAME_CONFIG, "geode-kafka-source-connector");
sourceProps.put(ConnectorConfig.TASKS_MAX_CONFIG, maxTasks);
- sourceProps.put(GeodeConnectorConfig.LOCATORS, locatorString);
+ sourceProps.put(GeodeConfigurationConstants.LOCATORS, locatorString);
sourceProps.put(REGION_TO_TOPIC_BINDINGS, regionToTopicBinding);
herder.putConnectorConfig(
@@ -110,8 +111,8 @@
sinkProps.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, GeodeKafkaSink.class.getName());
sinkProps.put(ConnectorConfig.NAME_CONFIG, "geode-kafka-sink-connector");
sinkProps.put(ConnectorConfig.TASKS_MAX_CONFIG, maxTasks);
- sinkProps.put(GeodeSinkConnectorConfig.TOPIC_TO_REGION_BINDINGS, topicToRegionBinding);
- sinkProps.put(GeodeConnectorConfig.LOCATORS, locatorString);
+ sinkProps.put(GeodeSinkConfigurationConstants.TOPIC_TO_REGION_BINDINGS, topicToRegionBinding);
+ sinkProps.put(GeodeConfigurationConstants.LOCATORS, locatorString);
sinkProps.put("topics", sinkTopic);
herder.putConnectorConfig(