moved cache creation errors to GeodeContext
added info logging
removed some debug logging
diff --git a/src/main/java/org/apache/geode/kafka/GeodeConnectorConfig.java b/src/main/java/org/apache/geode/kafka/GeodeConnectorConfig.java
index 586817a..3c218d4 100644
--- a/src/main/java/org/apache/geode/kafka/GeodeConnectorConfig.java
+++ b/src/main/java/org/apache/geode/kafka/GeodeConnectorConfig.java
@@ -66,7 +66,6 @@
securityUserName = getString(SECURITY_USER);
securityPassword = getPassword(SECURITY_PASSWORD);
securityClientAuthInit = getPassword(SECURITY_CLIENT_AUTH_INIT);
-// System.out.println(securityUserName + "NABA " + securityPassword.value() + "NABA" + securityClientAuthInit.value());
// if we registered a username/password instead of auth init, we should use the default auth
// init if one isn't specified
if (usesSecurity()) {
diff --git a/src/main/java/org/apache/geode/kafka/GeodeContext.java b/src/main/java/org/apache/geode/kafka/GeodeContext.java
index 418a476..5e377e3 100644
--- a/src/main/java/org/apache/geode/kafka/GeodeContext.java
+++ b/src/main/java/org/apache/geode/kafka/GeodeContext.java
@@ -61,28 +61,33 @@
public ClientCache createClientCache(List<LocatorHostPort> locators, String durableClientName,
String durableClientTimeOut, String securityAuthInit, String securityUserName,
String securityPassword, boolean usesSecurity) {
- ClientCacheFactory ccf = new ClientCacheFactory();
+ try {
+ ClientCacheFactory ccf = new ClientCacheFactory();
- ccf.setPdxReadSerialized(true);
- if (usesSecurity) {
- if (securityUserName != null && securityPassword != null) {
- ccf.set(SECURITY_USER, securityUserName);
- ccf.set(SECURITY_PASSWORD, securityPassword);
+ ccf.setPdxReadSerialized(true);
+ if (usesSecurity) {
+ if (securityUserName != null && securityPassword != null) {
+ ccf.set(SECURITY_USER, securityUserName);
+ ccf.set(SECURITY_PASSWORD, securityPassword);
+ }
+ ccf.set(SECURITY_CLIENT_AUTH_INIT, securityAuthInit);
}
- ccf.set(SECURITY_CLIENT_AUTH_INIT, securityAuthInit);
- }
- if (!durableClientName.equals("")) {
- ccf.set("durable-client-id", durableClientName)
- .set("durable-client-timeout", durableClientTimeOut);
- }
- // currently we only allow using the default pool.
- // If we ever want to allow adding multiple pools we'll have to configure pool factories
- ccf.setPoolSubscriptionEnabled(true);
+ if (!durableClientName.equals("")) {
+ ccf.set("durable-client-id", durableClientName)
+ .set("durable-client-timeout", durableClientTimeOut);
+ }
+ // currently we only allow using the default pool.
+ // If we ever want to allow adding multiple pools we'll have to configure pool factories
+ ccf.setPoolSubscriptionEnabled(true);
- for (LocatorHostPort locator : locators) {
- ccf.addPoolLocator(locator.getHostName(), locator.getPort());
+ for (LocatorHostPort locator : locators) {
+ ccf.addPoolLocator(locator.getHostName(), locator.getPort());
+ }
+ return ccf.create();
+ } catch (Exception e) {
+ throw new ConnectException(
+ "Unable to create an client cache connected to Apache Geode cluster");
}
- return ccf.create();
}
public CqQuery newCq(String name, String query, CqAttributes cqAttributes, boolean isDurable)
@@ -106,4 +111,8 @@
throw new ConnectException(e);
}
}
+
+ public void close(boolean keepAlive) {
+ clientCache.close(keepAlive);
+ }
}
diff --git a/src/main/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTask.java b/src/main/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTask.java
index 1688eb7..ef95450 100644
--- a/src/main/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTask.java
+++ b/src/main/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTask.java
@@ -61,16 +61,11 @@
GeodeSinkConnectorConfig geodeConnectorConfig = new GeodeSinkConnectorConfig(props);
configure(geodeConnectorConfig);
geodeContext = new GeodeContext();
- final ClientCache clientCache =
- geodeContext.connectClient(geodeConnectorConfig.getLocatorHostPorts(),
+ geodeContext.connectClient(geodeConnectorConfig.getLocatorHostPorts(),
geodeConnectorConfig.getSecurityClientAuthInit(),
geodeConnectorConfig.getSecurityUserName(),
geodeConnectorConfig.getSecurityPassword(),
geodeConnectorConfig.usesSecurity());
- if (clientCache == null) {
- throw new ConnectException(
- "Unable to create a client cache connected to the Apache Geode cluster");
- }
regionNameToRegion = createProxyRegions(topicToRegions.values());
} catch (Exception e) {
logger.error("Unable to start sink task", e);
@@ -149,7 +144,7 @@
@Override
public void stop() {
- geodeContext.getClientCache().close(false);
+ geodeContext.close(false);
}
}
diff --git a/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceListener.java b/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceListener.java
index 8a54766..f2f3142 100644
--- a/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceListener.java
+++ b/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceListener.java
@@ -51,7 +51,7 @@
TimeUnit.SECONDS))
break;
} catch (InterruptedException ex) {
- ex.printStackTrace();
+ logger.info("Thread interrupted while updating buffer", ex);
}
logger.info("GeodeKafkaSource Queue is full");
}
@@ -66,6 +66,7 @@
@Override
public void onCqDisconnected() {
// we should probably redistribute or reconnect
+ logger.info("cq has been disconnected");
}
@Override
diff --git a/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceTask.java b/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceTask.java
index 24cd531..a04ea5e 100644
--- a/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceTask.java
+++ b/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceTask.java
@@ -29,7 +29,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.geode.cache.client.ClientCache;
import org.apache.geode.cache.query.CqAttributes;
import org.apache.geode.cache.query.CqAttributesFactory;
import org.apache.geode.cache.query.CqQuery;
@@ -68,22 +67,17 @@
@Override
public void start(Map<String, String> props) {
try {
- System.out.println("NABA ::" + props);
GeodeSourceConnectorConfig geodeConnectorConfig = new GeodeSourceConnectorConfig(props);
logger.debug("GeodeKafkaSourceTask id:" + geodeConnectorConfig.getTaskId() + " starting");
geodeContext = new GeodeContext();
- final ClientCache clientCache =
- geodeContext.connectClient(geodeConnectorConfig.getLocatorHostPorts(),
+ geodeContext.connectClient(geodeConnectorConfig.getLocatorHostPorts(),
geodeConnectorConfig.getDurableClientId(),
geodeConnectorConfig.getDurableClientTimeout(),
geodeConnectorConfig.getSecurityClientAuthInit(),
geodeConnectorConfig.getSecurityUserName(),
geodeConnectorConfig.getSecurityPassword(),
geodeConnectorConfig.usesSecurity());
- if (clientCache == null) {
- throw new ConnectException(
- "Unable to create an client cache connected to Apache Geode cluster");
- }
+
batchSize = geodeConnectorConfig.getBatchSize();
eventBufferSupplier = new SharedEventBufferSupplier(geodeConnectorConfig.getQueueSize());
@@ -122,7 +116,7 @@
@Override
public void stop() {
- geodeContext.getClientCache().close(true);
+ geodeContext.close(true);
}
void installOnGeode(GeodeSourceConnectorConfig geodeConnectorConfig, GeodeContext geodeContext,