Adding the configuration changes as per spec
diff --git a/pom.xml b/pom.xml
index b6dc832..6c2676b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -227,17 +227,10 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>${maven-plugin.version}</version>
+ <inherited>true</inherited>
<configuration>
<source>1.8</source>
<target>1.8</target>
- </configuration>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-compiler-plugin</artifactId>
- <version>${maven-plugin.version}</version>
- <inherited>true</inherited>
- <configuration>
<compilerArgs>
<arg>-Xlint:-processing</arg>
<arg>-Xlint:all</arg>
diff --git a/src/main/java/org/apache/geode/kafka/GeodeConnectorConfig.java b/src/main/java/org/apache/geode/kafka/GeodeConnectorConfig.java
index 717fef6..05dd5f2 100644
--- a/src/main/java/org/apache/geode/kafka/GeodeConnectorConfig.java
+++ b/src/main/java/org/apache/geode/kafka/GeodeConnectorConfig.java
@@ -25,6 +25,8 @@
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.storage.StringConverter;
+import org.apache.geode.annotations.VisibleForTesting;
+
public class GeodeConnectorConfig extends AbstractConfig {
// GeodeKafka Specific Configuration
@@ -35,13 +37,24 @@
/**
* Specifies which Locators to connect to Apache Geode
*/
- public static final String LOCATORS = "locators";
- public static final String DEFAULT_LOCATOR = "localhost[10334]";
- public static final String SECURITY_CLIENT_AUTH_INIT = "security-client-auth-init";
+ private static final String LOCATORS = "locators";
+ private static final String DEFAULT_LOCATOR = "localhost[10334]";
+ private static final String SECURITY_CLIENT_AUTH_INIT = "security-client-auth-init";
private static final String DEFAULT_SECURITY_AUTH_INIT =
"org.apache.geode.kafka.security.SystemPropertyAuthInit";
- public static final String SECURITY_USER = "security-username";
- public static final String SECURITY_PASSWORD = "security-password";
+ private static final String SECURITY_USER = "security-username";
+ private static final String SECURITY_PASSWORD = "security-password";
+ private static final String TASK_ID_DOCUMENTATION = "Internally used to identify each task";
+ private static final String
+ LOCATORS_DOCUMENTATION =
+ "A comma separated string of locators that configure which locators to connect to";
+ private static final String
+ SECURITY_USER_DOCUMENTATION =
+ "Supply a username to be used to authenticate with Geode. Will automatically set the security-client-auth-init to use a SystemPropertyAuthInit if one isn't supplied by the user";
+ private static final String SECURITY_PASSWORD_DOCUMENTATION = "Supply a password to be used to authenticate with Geode";
+ private static final String
+ SECURITY_CLIENT_AUTH_INIT_DOCUMENTATION =
+ "Point to the Java class that implements the [AuthInitialize Interface](https://geode.apache.org/docs/guide/19/managing/security/implementing_authentication.html)";
public static final String DEFAULT_KEY_CONVERTER = StringConverter.class.getCanonicalName();
public static final String DEFAULT_VALUE_CONVERTER = StringConverter.class.getCanonicalName();
@@ -52,7 +65,7 @@
private String securityUserName;
private String securityPassword;
- // Just for testing
+ @VisibleForTesting
protected GeodeConnectorConfig() {
super(new ConfigDef(), new HashMap<>());
taskId = 0;
@@ -75,17 +88,38 @@
protected static ConfigDef configurables() {
ConfigDef configDef = new ConfigDef();
- configDef.define(TASK_ID, ConfigDef.Type.INT, "0", ConfigDef.Importance.MEDIUM,
- "Internally used to identify each task");
- configDef.define(LOCATORS, ConfigDef.Type.STRING, DEFAULT_LOCATOR, ConfigDef.Importance.HIGH,
- "A comma separated string of locators that configure which locators to connect to");
- configDef.define(SECURITY_USER, ConfigDef.Type.STRING, null, ConfigDef.Importance.HIGH,
- "Supply a username to be used to authenticate with Geode. Will automatically set the security-client-auth-init to use a SystemPropertyAuthInit if one isn't supplied by the user");
- configDef.define(SECURITY_PASSWORD, ConfigDef.Type.STRING, null, ConfigDef.Importance.HIGH,
- "Supply a password to be used to authenticate with Geode");
- configDef.define(SECURITY_CLIENT_AUTH_INIT, ConfigDef.Type.STRING, null,
+ configDef.define(
+ TASK_ID,
+ ConfigDef.Type.INT,
+ "0",
+ ConfigDef.Importance.MEDIUM,
+ TASK_ID_DOCUMENTATION);
+ configDef.define(
+ LOCATORS,
+ ConfigDef.Type.STRING,
+ DEFAULT_LOCATOR,
ConfigDef.Importance.HIGH,
- "Point to class that implements the [AuthInitialize Interface](https://gemfire.docs.pivotal.io/99/geode/managing/security/implementing_authentication.html)");
+ LOCATORS_DOCUMENTATION);
+ configDef.define(
+ SECURITY_USER,
+ ConfigDef.Type.STRING,
+ null,
+ ConfigDef.Importance.HIGH,
+ SECURITY_USER_DOCUMENTATION);
+
+ configDef.define(
+ SECURITY_PASSWORD,
+ ConfigDef.Type.PASSWORD,
+ null,
+ ConfigDef.Importance.HIGH,
+ SECURITY_PASSWORD_DOCUMENTATION);
+
+ configDef.define(
+ SECURITY_CLIENT_AUTH_INIT,
+ ConfigDef.Type.PASSWORD,
+ null,
+ ConfigDef.Importance.HIGH,
+ SECURITY_CLIENT_AUTH_INIT_DOCUMENTATION);
return configDef;
}
diff --git a/src/main/java/org/apache/geode/kafka/sink/GeodeSinkConnectorConfig.java b/src/main/java/org/apache/geode/kafka/sink/GeodeSinkConnectorConfig.java
index cd49778..46cd6c0 100644
--- a/src/main/java/org/apache/geode/kafka/sink/GeodeSinkConnectorConfig.java
+++ b/src/main/java/org/apache/geode/kafka/sink/GeodeSinkConnectorConfig.java
@@ -25,10 +25,16 @@
public static final ConfigDef SINK_CONFIG_DEF = configurables();
// Used by sink
- public static final String TOPIC_TO_REGION_BINDINGS = "topic-to-regions";
- public static final String DEFAULT_TOPIC_TO_REGION_BINDING = "[gkcTopic:gkcRegion]";
- public static final String NULL_VALUES_MEAN_REMOVE = "null-values-mean-remove";
- public static final String DEFAULT_NULL_VALUES_MEAN_REMOVE = "true";
+ private static final String TOPIC_TO_REGION_BINDINGS = "topic-to-regions";
+ private static final String DEFAULT_TOPIC_TO_REGION_BINDING = "[gkcTopic:gkcRegion]";
+ private static final String NULL_VALUES_MEAN_REMOVE = "null-values-mean-remove";
+ private static final String DEFAULT_NULL_VALUES_MEAN_REMOVE = "true";
+ private static final String
+ NULL_VALUES_MEAN_REMOVE_DOCUMENTATION =
+ "If set to true, when topics send a SinkRecord with a null value, we will convert to an operation similar to region.remove instead of putting a null value into the region";
+ private static final String
+ TOPIC_TO_REGION_BINDINGS_DOCUMENTATION =
+ "A comma separated list of \"one topic to many regions\" bindings. Each binding is surrounded by brackets. For example \"[topicName:regionName], [anotherTopic: regionName, anotherRegion]";
private final Map<String, List<String>> topicToRegions;
private final boolean nullValuesMeanRemove;
@@ -41,12 +47,19 @@
protected static ConfigDef configurables() {
ConfigDef configDef = GeodeConnectorConfig.configurables();
- configDef.define(TOPIC_TO_REGION_BINDINGS, ConfigDef.Type.STRING,
- DEFAULT_TOPIC_TO_REGION_BINDING, ConfigDef.Importance.HIGH,
- "A comma separated list of \"one topic to many regions\" bindings. Each binding is surrounded by brackets. For example \"[topicName:regionName], [anotherTopic: regionName, anotherRegion]");
- configDef.define(NULL_VALUES_MEAN_REMOVE, ConfigDef.Type.BOOLEAN,
- DEFAULT_NULL_VALUES_MEAN_REMOVE, ConfigDef.Importance.MEDIUM,
- "If set to true, when topics send a SinkRecord with a null value, we will convert to an operation similar to region.remove instead of putting a null value into the region");
+ configDef.define(
+ TOPIC_TO_REGION_BINDINGS,
+ ConfigDef.Type.STRING,
+ DEFAULT_TOPIC_TO_REGION_BINDING,
+ ConfigDef.Importance.HIGH,
+ TOPIC_TO_REGION_BINDINGS_DOCUMENTATION);
+
+ configDef.define(
+ NULL_VALUES_MEAN_REMOVE,
+ ConfigDef.Type.BOOLEAN,
+ DEFAULT_NULL_VALUES_MEAN_REMOVE,
+ ConfigDef.Importance.MEDIUM,
+ NULL_VALUES_MEAN_REMOVE_DOCUMENTATION);
return configDef;
}
diff --git a/src/main/java/org/apache/geode/kafka/source/GeodeSourceConnectorConfig.java b/src/main/java/org/apache/geode/kafka/source/GeodeSourceConnectorConfig.java
index 01294c9..05d4fd6 100644
--- a/src/main/java/org/apache/geode/kafka/source/GeodeSourceConnectorConfig.java
+++ b/src/main/java/org/apache/geode/kafka/source/GeodeSourceConnectorConfig.java
@@ -53,6 +53,27 @@
public static final String LOAD_ENTIRE_REGION = "load-entire-region";
public static final String DEFAULT_LOAD_ENTIRE_REGION = "false";
+ private static final String
+ CQS_TO_REGISTER_DOCUMENTATION =
+ "Internally created and used parameter, for signalling a task to register CQs on Apache Geode";
+ private static final String
+ REGION_TO_TOPIC_BINDINGS_DOCUMENTATION =
+ "A comma separated list of \"one region to many topics\" mappings. Each mapping is surrounded by brackets. For example \"[regionName:topicName], \"[anotherRegion: topicName, anotherTopic]\"";
+ private static final String
+ DURABLE_CLIENT_ID_PREFIX_DOCUMENTATION =
+ "Prefix string for tasks to append to when registering as a durable client. If empty string, will not register as a durable client";
+ private static final String
+ LOAD_ENTIRE_REGION_DOCUMENTATION =
+ "Determines if we should queue up all entries that currently exist in a region. This allows us to copy existing region data. Will be replayed whenever a task needs to re-register a CQ";
+ private static final String
+ DURABLE_CLIENT_TIME_OUT_DOCUMENTATION =
+ "How long in milliseconds to persist values in Geode's durable queue before the queue is invalidated";
+ private static final String CQ_PREFIX_DOCUMENTATION = "Prefix string to identify Connector CQ's on a Geode server";
+ private static final String BATCH_SIZE_DOCUMENTATION = "Maximum number of records to return on each poll";
+ private static final String
+ QUEUE_SIZE_DOCUMENTATION =
+ "Maximum number of entries in the connector queue before backing up all Geode CQ listeners sharing the task queue ";
+
private final String durableClientId;
private final String durableClientTimeout;
private final String cqPrefix;
@@ -82,27 +103,60 @@
protected static ConfigDef configurables() {
ConfigDef configDef = GeodeConnectorConfig.configurables();
- configDef.define(CQS_TO_REGISTER, ConfigDef.Type.STRING, "", ConfigDef.Importance.HIGH,
- "Internally created and used parameter, for signalling a task to register cqs");
- configDef.define(REGION_TO_TOPIC_BINDINGS, ConfigDef.Type.STRING,
- DEFAULT_REGION_TO_TOPIC_BINDING, ConfigDef.Importance.HIGH,
- "A comma separated list of \"one region to many topics\" mappings. Each mapping is surrounded by brackets. For example \"[regionName:topicName], \"[anotherRegion: topicName, anotherTopic]\"");
- configDef.define(DURABLE_CLIENT_ID_PREFIX, ConfigDef.Type.STRING, DEFAULT_DURABLE_CLIENT_ID,
+
+ configDef.define(
+ CQS_TO_REGISTER,
+ ConfigDef.Type.STRING,
+ "",
+ ConfigDef.Importance.HIGH,
+ CQS_TO_REGISTER_DOCUMENTATION);
+
+ configDef.define(
+ REGION_TO_TOPIC_BINDINGS,
+ ConfigDef.Type.STRING,
+ DEFAULT_REGION_TO_TOPIC_BINDING,
+ ConfigDef.Importance.HIGH,
+ REGION_TO_TOPIC_BINDINGS_DOCUMENTATION);
+
+ configDef.define(
+ DURABLE_CLIENT_ID_PREFIX,
+ ConfigDef.Type.STRING,
+ DEFAULT_DURABLE_CLIENT_ID,
ConfigDef.Importance.LOW,
- "Prefix string for tasks to append to when registering as a durable client. If empty string, will not register as a durable client");
- configDef.define(DURABLE_CLIENT_TIME_OUT, ConfigDef.Type.STRING, DEFAULT_DURABLE_CLIENT_TIMEOUT,
+ DURABLE_CLIENT_ID_PREFIX_DOCUMENTATION);
+
+ configDef.define(DURABLE_CLIENT_TIME_OUT,
+ ConfigDef.Type.STRING,
+ DEFAULT_DURABLE_CLIENT_TIMEOUT,
ConfigDef.Importance.LOW,
- "How long in milliseconds to persist values in Geode's durable queue before the queue is invalidated");
- configDef.define(CQ_PREFIX, ConfigDef.Type.STRING, DEFAULT_CQ_PREFIX, ConfigDef.Importance.LOW,
- "Prefix string to identify Connector cq's on a Geode server");
- configDef.define(BATCH_SIZE, ConfigDef.Type.INT, DEFAULT_BATCH_SIZE,
- ConfigDef.Importance.MEDIUM, "Maximum number of records to return on each poll");
- configDef.define(QUEUE_SIZE, ConfigDef.Type.INT, DEFAULT_QUEUE_SIZE,
+ DURABLE_CLIENT_TIME_OUT_DOCUMENTATION);
+
+ configDef.define(CQ_PREFIX,
+ ConfigDef.Type.STRING,
+ DEFAULT_CQ_PREFIX,
+ ConfigDef.Importance.LOW,
+ CQ_PREFIX_DOCUMENTATION);
+
+ configDef.define(
+ BATCH_SIZE,
+ ConfigDef.Type.INT,
+ DEFAULT_BATCH_SIZE,
ConfigDef.Importance.MEDIUM,
- "Maximum number of entries in the connector queue before backing up all Geode cq listeners sharing the task queue ");
- configDef.define(LOAD_ENTIRE_REGION, ConfigDef.Type.BOOLEAN, DEFAULT_LOAD_ENTIRE_REGION,
+ BATCH_SIZE_DOCUMENTATION);
+
+ configDef.define(
+ QUEUE_SIZE,
+ ConfigDef.Type.INT,
+ DEFAULT_QUEUE_SIZE,
ConfigDef.Importance.MEDIUM,
- "Determines if we should queue up all entries that currently exist in a region. This allows us to copy existing region data. Will be replayed whenever a task needs to re-register a cq");
+ QUEUE_SIZE_DOCUMENTATION);
+
+ configDef.define(LOAD_ENTIRE_REGION,
+ ConfigDef.Type.BOOLEAN,
+ DEFAULT_LOAD_ENTIRE_REGION,
+ ConfigDef.Importance.MEDIUM,
+ LOAD_ENTIRE_REGION_DOCUMENTATION);
+
return configDef;
}