Removed system outs
Passing through the security auth init property
diff --git a/src/main/java/geode/kafka/GeodeConnectorConfig.java b/src/main/java/geode/kafka/GeodeConnectorConfig.java
index 88f5a02..ac9a31f 100644
--- a/src/main/java/geode/kafka/GeodeConnectorConfig.java
+++ b/src/main/java/geode/kafka/GeodeConnectorConfig.java
@@ -32,9 +32,11 @@
*/
public static final String LOCATORS = "locators";
public static final String DEFAULT_LOCATOR = "localhost[10334]";
+ public static final String SECURITY_CLIENT_AUTH_INIT = "security-client-auth-init";
protected final int taskId;
protected List<LocatorHostPort> locatorHostPorts;
+ private String securityClientAuthInit;
protected GeodeConnectorConfig() {
taskId = 0;
@@ -43,6 +45,7 @@
public GeodeConnectorConfig(Map<String, String> connectorProperties) {
taskId = Integer.parseInt(connectorProperties.get(TASK_ID));
locatorHostPorts = parseLocators(connectorProperties.get(GeodeConnectorConfig.LOCATORS));
+ securityClientAuthInit = connectorProperties.get(SECURITY_CLIENT_AUTH_INIT);
}
@@ -116,4 +119,8 @@
public List<LocatorHostPort> getLocatorHostPorts() {
return locatorHostPorts;
}
+
+ public String getSecurityClientAuthInit() {
+ return securityClientAuthInit;
+ }
}
diff --git a/src/main/java/geode/kafka/GeodeContext.java b/src/main/java/geode/kafka/GeodeContext.java
index 2078782..ff8a8c3 100644
--- a/src/main/java/geode/kafka/GeodeContext.java
+++ b/src/main/java/geode/kafka/GeodeContext.java
@@ -26,6 +26,8 @@
import java.util.Collection;
import java.util.List;
+import static geode.kafka.GeodeConnectorConfig.SECURITY_CLIENT_AUTH_INIT;
+
public class GeodeContext {
private ClientCache clientCache;
@@ -34,13 +36,13 @@
public GeodeContext() {
}
- public ClientCache connectClient(List<LocatorHostPort> locatorHostPortList, String durableClientId, String durableClientTimeout) {
- clientCache = createClientCache(locatorHostPortList, durableClientId, durableClientTimeout);
+ public ClientCache connectClient(List<LocatorHostPort> locatorHostPortList, String durableClientId, String durableClientTimeout, String securityAuthInit) {
+ clientCache = createClientCache(locatorHostPortList, durableClientId, durableClientTimeout, securityAuthInit);
return clientCache;
}
- public ClientCache connectClient(List<LocatorHostPort> locatorHostPortList) {
- clientCache = createClientCache(locatorHostPortList, "", "");
+ public ClientCache connectClient(List<LocatorHostPort> locatorHostPortList, String securityAuthInit) {
+ clientCache = createClientCache(locatorHostPortList, "", "", securityAuthInit);
return clientCache;
}
@@ -55,8 +57,12 @@
* @param durableClientTimeOut
* @return
*/
- public ClientCache createClientCache(List<LocatorHostPort> locators, String durableClientName, String durableClientTimeOut) {
+ public ClientCache createClientCache(List<LocatorHostPort> locators, String durableClientName, String durableClientTimeOut, String securityAuthInit) {
ClientCacheFactory ccf = new ClientCacheFactory();
+
+ if (securityAuthInit != null) {
+ ccf.set(SECURITY_CLIENT_AUTH_INIT, securityAuthInit);
+ }
if (!durableClientName.equals("")) {
ccf.set("durable-client-id", durableClientName)
.set("durable-client-timeout", durableClientTimeOut);
diff --git a/src/main/java/geode/kafka/sink/GeodeKafkaSinkTask.java b/src/main/java/geode/kafka/sink/GeodeKafkaSinkTask.java
index a1d7ecf..66e528f 100644
--- a/src/main/java/geode/kafka/sink/GeodeKafkaSinkTask.java
+++ b/src/main/java/geode/kafka/sink/GeodeKafkaSinkTask.java
@@ -58,12 +58,11 @@
GeodeSinkConnectorConfig geodeConnectorConfig = new GeodeSinkConnectorConfig(props);
logger.debug("GeodeKafkaSourceTask id:" + geodeConnectorConfig.getTaskId() + " starting");
geodeContext = new GeodeContext();
- geodeContext.connectClient(geodeConnectorConfig.getLocatorHostPorts());
+ geodeContext.connectClient(geodeConnectorConfig.getLocatorHostPorts(), geodeConnectorConfig.getSecurityClientAuthInit());
topicToRegions = geodeConnectorConfig.getTopicToRegions();
regionNameToRegion = createProxyRegions(topicToRegions.values());
nullValuesMeansRemove = geodeConnectorConfig.getNullValuesMeanRemove();
} catch (Exception e) {
- e.printStackTrace();
logger.error("Unable to start sink task", e);
throw e;
}
diff --git a/src/main/java/geode/kafka/source/GeodeKafkaSourceTask.java b/src/main/java/geode/kafka/source/GeodeKafkaSourceTask.java
index 0f6b50c..8d827a1 100644
--- a/src/main/java/geode/kafka/source/GeodeKafkaSourceTask.java
+++ b/src/main/java/geode/kafka/source/GeodeKafkaSourceTask.java
@@ -68,11 +68,10 @@
@Override
public void start(Map<String, String> props) {
try {
- System.out.println("JASON start task");
geodeConnectorConfig = new GeodeSourceConnectorConfig(props);
logger.debug("GeodeKafkaSourceTask id:" + geodeConnectorConfig.getTaskId() + " starting");
geodeContext = new GeodeContext();
- geodeContext.connectClient(geodeConnectorConfig.getLocatorHostPorts(), geodeConnectorConfig.getDurableClientId(), geodeConnectorConfig.getDurableClientTimeout());
+ geodeContext.connectClient(geodeConnectorConfig.getLocatorHostPorts(), geodeConnectorConfig.getDurableClientId(), geodeConnectorConfig.getDurableClientTimeout(), geodeConnectorConfig.getSecurityClientAuthInit());
batchSize = Integer.parseInt(props.get(BATCH_SIZE));
int queueSize = Integer.parseInt(props.get(QUEUE_SIZE));
@@ -86,12 +85,9 @@
boolean loadEntireRegion = geodeConnectorConfig.getLoadEntireRegion();
installOnGeode(geodeConnectorConfig, geodeContext, eventBuffer, cqPrefix, loadEntireRegion);
} catch (Exception e) {
- System.out.println("JASON start task failed" + e);
- e.printStackTrace();
logger.error("Unable to start source task", e);
throw e;
}
- System.out.println("JASON end task");
}