Modified initial results to consume the correct type
Modified security
diff --git a/src/main/java/org/geode/kafka/GeodeConnectorConfig.java b/src/main/java/org/geode/kafka/GeodeConnectorConfig.java
index c09f1cf..2860a8f 100644
--- a/src/main/java/org/geode/kafka/GeodeConnectorConfig.java
+++ b/src/main/java/org/geode/kafka/GeodeConnectorConfig.java
@@ -38,13 +38,13 @@
public static final String DEFAULT_LOCATOR = "localhost[10334]";
public static final String SECURITY_CLIENT_AUTH_INIT = "security-client-auth-init";
private static final String DEFAULT_SECURITY_AUTH_INIT = "org.geode.kafka.security.SystemPropertyAuthInit";
- public static final String SECURITY_USER = "securityUsername";
- public static final String SECURITY_PASSWORD= "securityPassword";
+ public static final String SECURITY_USER = "security-username";
+ public static final String SECURITY_PASSWORD= "security-password";
protected final int taskId;
protected List<LocatorHostPort> locatorHostPorts;
private String securityClientAuthInit;
- private String securityUser;
+ private String securityUserName;
private String securityPassword;
//Just for testing
@@ -64,7 +64,7 @@
super(configDef, connectorProperties);
taskId = getInt(TASK_ID);
locatorHostPorts = parseLocators(getString(GeodeConnectorConfig.LOCATORS));
- securityUser = getString(SECURITY_USER);
+ securityUserName = getString(SECURITY_USER);
securityPassword = getString(SECURITY_PASSWORD);
securityClientAuthInit = getString(SECURITY_CLIENT_AUTH_INIT);
//if we registered a username/password instead of auth init, we should use the default auth init if one isn't specified
@@ -160,7 +160,15 @@
return securityClientAuthInit;
}
+ public String getSecurityUserName() {
+ return securityUserName;
+ }
+
+ public String getSecurityPassword() {
+ return securityPassword;
+ }
+
public boolean usesSecurity() {
- return securityClientAuthInit != null || securityUser != null;
+ return securityClientAuthInit != null || securityUserName != null;
}
}
diff --git a/src/main/java/org/geode/kafka/GeodeContext.java b/src/main/java/org/geode/kafka/GeodeContext.java
index 2858e4d..6190ef2 100644
--- a/src/main/java/org/geode/kafka/GeodeContext.java
+++ b/src/main/java/org/geode/kafka/GeodeContext.java
@@ -17,6 +17,7 @@
import java.util.Collection;
import java.util.List;
+import org.apache.geode.cache.query.CqResults;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.geode.cache.client.ClientCache;
@@ -28,6 +29,8 @@
import org.apache.geode.cache.query.RegionNotFoundException;
import static org.geode.kafka.GeodeConnectorConfig.SECURITY_CLIENT_AUTH_INIT;
+import static org.geode.kafka.GeodeConnectorConfig.SECURITY_PASSWORD;
+import static org.geode.kafka.GeodeConnectorConfig.SECURITY_USER;
public class GeodeContext {
@@ -37,15 +40,15 @@
public GeodeContext() {}
public ClientCache connectClient(List<LocatorHostPort> locatorHostPortList,
- String durableClientId, String durableClientTimeout, String securityAuthInit, boolean usesSecurity) {
+ String durableClientId, String durableClientTimeout, String securityAuthInit, String securityUserName, String securityPassword, boolean usesSecurity) {
clientCache = createClientCache(locatorHostPortList, durableClientId, durableClientTimeout,
- securityAuthInit, usesSecurity);
+ securityAuthInit, securityUserName, securityPassword, usesSecurity);
return clientCache;
}
public ClientCache connectClient(List<LocatorHostPort> locatorHostPortList,
- String securityAuthInit, boolean usesSecurity) {
- clientCache = createClientCache(locatorHostPortList, "", "", securityAuthInit, usesSecurity);
+ String securityAuthInit, String securityUserName, String securityPassword, boolean usesSecurity) {
+ clientCache = createClientCache(locatorHostPortList, "", "", securityAuthInit, securityUserName, securityPassword, usesSecurity);
return clientCache;
}
@@ -54,10 +57,14 @@
}
public ClientCache createClientCache(List<LocatorHostPort> locators, String durableClientName,
- String durableClientTimeOut, String securityAuthInit, boolean usesSecurity) {
+ String durableClientTimeOut, String securityAuthInit, String securityUserName, String securityPassword, boolean usesSecurity) {
ClientCacheFactory ccf = new ClientCacheFactory();
- if (usesSecurity ) {
+ if (usesSecurity) {
+ if (securityUserName != null && securityPassword != null) {
+ ccf.set(SECURITY_USER, securityUserName);
+ ccf.set(SECURITY_PASSWORD, securityPassword);
+ }
ccf.set(SECURITY_CLIENT_AUTH_INIT, securityAuthInit);
}
if (!durableClientName.equals("")) {
@@ -85,8 +92,8 @@
}
}
- public Collection newCqWithInitialResults(String name, String query, CqAttributes cqAttributes,
- boolean isDurable) throws ConnectException {
+ public CqResults newCqWithInitialResults(String name, String query, CqAttributes cqAttributes,
+ boolean isDurable) throws ConnectException {
try {
CqQuery cq = clientCache.getQueryService().newCq(name, query, cqAttributes, isDurable);
return cq.executeWithInitialResults();
diff --git a/src/main/java/org/geode/kafka/security/SystemPropertyAuthInit.java b/src/main/java/org/geode/kafka/security/SystemPropertyAuthInit.java
index db48699..cc525a2 100644
--- a/src/main/java/org/geode/kafka/security/SystemPropertyAuthInit.java
+++ b/src/main/java/org/geode/kafka/security/SystemPropertyAuthInit.java
@@ -27,8 +27,8 @@
public Properties getCredentials(Properties securityProps, DistributedMember server,
boolean isPeer) throws AuthenticationFailedException {
Properties extractedProperties = new Properties();
- extractedProperties.put("security-username", System.getProperty(GeodeConnectorConfig.SECURITY_USER));
- extractedProperties.put("security-password", System.getProperty(GeodeConnectorConfig.SECURITY_PASSWORD));
+ extractedProperties.put("security-username", securityProps.get("security-username"));
+ extractedProperties.put("security-password", securityProps.get("security-password"));
return extractedProperties;
}
}
diff --git a/src/main/java/org/geode/kafka/sink/GeodeKafkaSinkTask.java b/src/main/java/org/geode/kafka/sink/GeodeKafkaSinkTask.java
index 4552e09..7db384f 100644
--- a/src/main/java/org/geode/kafka/sink/GeodeKafkaSinkTask.java
+++ b/src/main/java/org/geode/kafka/sink/GeodeKafkaSinkTask.java
@@ -61,7 +61,7 @@
configure(geodeConnectorConfig);
geodeContext = new GeodeContext();
geodeContext.connectClient(geodeConnectorConfig.getLocatorHostPorts(),
- geodeConnectorConfig.getSecurityClientAuthInit(), geodeConnectorConfig.usesSecurity());
+ geodeConnectorConfig.getSecurityClientAuthInit(), geodeConnectorConfig.getSecurityUserName(), geodeConnectorConfig.getSecurityPassword(), geodeConnectorConfig.usesSecurity());
regionNameToRegion = createProxyRegions(topicToRegions.values());
} catch (Exception e) {
logger.error("Unable to start sink task", e);
diff --git a/src/main/java/org/geode/kafka/source/GeodeEvent.java b/src/main/java/org/geode/kafka/source/GeodeEvent.java
index 2333955..5b51d07 100644
--- a/src/main/java/org/geode/kafka/source/GeodeEvent.java
+++ b/src/main/java/org/geode/kafka/source/GeodeEvent.java
@@ -22,18 +22,29 @@
public class GeodeEvent {
private String regionName;
- private CqEvent event;
+ private Object key;
+ private Object value;
public GeodeEvent(String regionName, CqEvent event) {
+ this(regionName, event.getKey(), event.getNewValue());
+ }
+
+ public GeodeEvent(String regionName, Object key, Object value) {
this.regionName = regionName;
- this.event = event;
+ this.key = key;
+ this.value = value;
}
public String getRegionName() {
return regionName;
}
- public CqEvent getEvent() {
- return event;
+ public Object getKey() {
+ return key;
}
+
+ public Object getValue() {
+ return value;
+ }
+
}
diff --git a/src/main/java/org/geode/kafka/source/GeodeKafkaSourceTask.java b/src/main/java/org/geode/kafka/source/GeodeKafkaSourceTask.java
index b829335..b1c289f 100644
--- a/src/main/java/org/geode/kafka/source/GeodeKafkaSourceTask.java
+++ b/src/main/java/org/geode/kafka/source/GeodeKafkaSourceTask.java
@@ -21,6 +21,8 @@
import java.util.Map;
import java.util.stream.Collectors;
+import org.apache.geode.cache.query.CqResults;
+import org.apache.geode.cache.query.Struct;
import org.geode.kafka.GeodeContext;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
@@ -68,7 +70,7 @@
geodeContext = new GeodeContext();
geodeContext.connectClient(geodeConnectorConfig.getLocatorHostPorts(),
geodeConnectorConfig.getDurableClientId(), geodeConnectorConfig.getDurableClientTimeout(),
- geodeConnectorConfig.getSecurityClientAuthInit(), geodeConnectorConfig.usesSecurity());
+ geodeConnectorConfig.getSecurityClientAuthInit(), geodeConnectorConfig.getSecurityUserName(), geodeConnectorConfig.getSecurityPassword(), geodeConnectorConfig.usesSecurity());
batchSize = geodeConnectorConfig.getBatchSize();
eventBufferSupplier = new SharedEventBufferSupplier(geodeConnectorConfig.getQueueSize());
@@ -98,7 +100,7 @@
List<String> topics = regionToTopics.get(regionName);
for (String topic : topics) {
records.add(new SourceRecord(sourcePartitions.get(regionName), OFFSET_DEFAULT, topic,
- null, event.getEvent().getKey(), null, event.getEvent().getNewValue()));
+ null, event.getKey(), null, event.getValue()));
}
}
return records;
@@ -134,12 +136,11 @@
CqAttributes cqAttributes = cqAttributesFactory.create();
try {
if (loadEntireRegion) {
- Collection<CqEvent> events =
+ CqResults events =
geodeContext.newCqWithInitialResults(generateCqName(taskId, cqPrefix, regionName),
"select * from /" + regionName, cqAttributes,
isDurable);
- eventBuffer.get().addAll(
- events.stream().map(e -> new GeodeEvent(regionName, e)).collect(Collectors.toList()));
+ eventBuffer.get().addAll((Collection<GeodeEvent>)events.stream().map(e -> new GeodeEvent(regionName, ((Struct)e).get("key"), ((Struct)e).get("value"))).collect(Collectors.toList()));
} else {
geodeContext.newCq(generateCqName(taskId, cqPrefix, regionName),
"select * from /" + regionName, cqAttributes,
diff --git a/src/test/java/org/geode/kafka/source/GeodeKafkaSourceTaskTest.java b/src/test/java/org/geode/kafka/source/GeodeKafkaSourceTaskTest.java
index 29a901a..7901426 100644
--- a/src/test/java/org/geode/kafka/source/GeodeKafkaSourceTaskTest.java
+++ b/src/test/java/org/geode/kafka/source/GeodeKafkaSourceTaskTest.java
@@ -36,6 +36,15 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
+import org.apache.geode.cache.query.CqAttributes;
+import org.apache.geode.cache.query.CqResults;
+import org.apache.geode.cache.query.SelectResults;
+import org.apache.geode.cache.query.Struct;
+import org.apache.geode.cache.query.internal.LinkedStructSet;
+import org.apache.geode.cache.query.internal.ResultsBag;
+import org.apache.geode.cache.query.internal.ResultsBag;
+import org.apache.geode.cache.query.internal.StructImpl;
+import org.apache.geode.cache.query.internal.types.StructTypeImpl;
import org.geode.kafka.GeodeContext;
import org.junit.Test;
@@ -52,9 +61,9 @@
BlockingQueue<GeodeEvent> eventBuffer = new LinkedBlockingQueue(100);
boolean loadEntireRegion = true;
boolean isDurable = false;
- List<CqEvent> fakeInitialResults = new LinkedList<>();
+ CqResults fakeInitialResults = new ResultsBag();
for (int i = 0; i < 10; i++) {
- fakeInitialResults.add(mock(CqEvent.class));
+ fakeInitialResults.add(mock(Struct.class));
}
when(geodeContext.newCqWithInitialResults(anyString(), anyString(), any(), anyBoolean()))
@@ -71,7 +80,7 @@
BlockingQueue<GeodeEvent> eventBuffer = new LinkedBlockingQueue(100);
boolean loadEntireRegion = false;
boolean isDurable = false;
- List<CqEvent> fakeInitialResults = new LinkedList<>();
+ CqResults fakeInitialResults = new ResultsBag();
for (int i = 0; i < 10; i++) {
fakeInitialResults.add(mock(CqEvent.class));
}
@@ -92,7 +101,7 @@
boolean isDurable = false;
when(geodeContext.newCqWithInitialResults(anyString(), anyString(), any(), anyBoolean()))
- .thenReturn(new ArrayList());
+ .thenReturn(mock(CqResults.class));
GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
GeodeKafkaSourceListener listener =
task.installListenersToRegion(geodeContext, 1, createEventBufferSupplier(eventBuffer),
@@ -140,7 +149,7 @@
GeodeContext geodeContext = mock(GeodeContext.class);
when(geodeContext.getClientCache()).thenReturn(clientCache);
-
+ when(geodeContext.newCqWithInitialResults(anyString(), anyString(), any(CqAttributes.class), anyBoolean())).thenReturn(new ResultsBag());
Map<String, List<String>> regionToTopicsMap = new HashMap<>();
regionToTopicsMap.put("region1", new ArrayList());