Modified initial results handling to consume the correct type (CqResults of Struct entries instead of CqEvents)
Modified security so the configured security-username and security-password are passed to the Geode client cache instead of being read from system properties
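
For illustration only (placeholder values, not part of this patch), a connector using
the renamed keys would be configured with properties along the lines of:

    security-username=someUser
    security-password=somePassword
    security-client-auth-init=org.geode.kafka.security.SystemPropertyAuthInit

The auth-init entry can be left out: per the existing comment in GeodeConnectorConfig,
the default SystemPropertyAuthInit is used when only a username/password pair is
registered.
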
diff --git a/src/main/java/org/geode/kafka/GeodeConnectorConfig.java b/src/main/java/org/geode/kafka/GeodeConnectorConfig.java
index c09f1cf..2860a8f 100644
--- a/src/main/java/org/geode/kafka/GeodeConnectorConfig.java
+++ b/src/main/java/org/geode/kafka/GeodeConnectorConfig.java
@@ -38,13 +38,13 @@
   public static final String DEFAULT_LOCATOR = "localhost[10334]";
   public static final String SECURITY_CLIENT_AUTH_INIT = "security-client-auth-init";
   private static final String DEFAULT_SECURITY_AUTH_INIT = "org.geode.kafka.security.SystemPropertyAuthInit";
-  public static final String SECURITY_USER = "securityUsername";
-  public static final String SECURITY_PASSWORD= "securityPassword";
+  public static final String SECURITY_USER = "security-username";
+  public static final String SECURITY_PASSWORD= "security-password";
 
   protected final int taskId;
   protected List<LocatorHostPort> locatorHostPorts;
   private String securityClientAuthInit;
-  private String securityUser;
+  private String securityUserName;
   private String securityPassword;
 
   //Just for testing
@@ -64,7 +64,7 @@
     super(configDef, connectorProperties);
     taskId = getInt(TASK_ID);
     locatorHostPorts = parseLocators(getString(GeodeConnectorConfig.LOCATORS));
-    securityUser = getString(SECURITY_USER);
+    securityUserName = getString(SECURITY_USER);
     securityPassword = getString(SECURITY_PASSWORD);
     securityClientAuthInit = getString(SECURITY_CLIENT_AUTH_INIT);
     //if we registered a username/password instead of auth init, we should use the default auth init if one isn't specified
@@ -160,7 +160,15 @@
     return securityClientAuthInit;
   }
 
+  public String getSecurityUserName() {
+    return securityUserName;
+  }
+
+  public String getSecurityPassword() {
+    return securityPassword;
+  }
+
   public boolean usesSecurity() {
-    return securityClientAuthInit != null || securityUser != null;
+    return securityClientAuthInit != null || securityUserName != null;
   }
 }
diff --git a/src/main/java/org/geode/kafka/GeodeContext.java b/src/main/java/org/geode/kafka/GeodeContext.java
index 2858e4d..6190ef2 100644
--- a/src/main/java/org/geode/kafka/GeodeContext.java
+++ b/src/main/java/org/geode/kafka/GeodeContext.java
@@ -17,6 +17,7 @@
 import java.util.Collection;
 import java.util.List;
 
+import org.apache.geode.cache.query.CqResults;
 import org.apache.kafka.connect.errors.ConnectException;
 
 import org.apache.geode.cache.client.ClientCache;
@@ -28,6 +29,8 @@
 import org.apache.geode.cache.query.RegionNotFoundException;
 
 import static org.geode.kafka.GeodeConnectorConfig.SECURITY_CLIENT_AUTH_INIT;
+import static org.geode.kafka.GeodeConnectorConfig.SECURITY_PASSWORD;
+import static org.geode.kafka.GeodeConnectorConfig.SECURITY_USER;
 
 public class GeodeContext {
 
@@ -37,15 +40,15 @@
   public GeodeContext() {}
 
   public ClientCache connectClient(List<LocatorHostPort> locatorHostPortList,
-      String durableClientId, String durableClientTimeout, String securityAuthInit, boolean usesSecurity) {
+      String durableClientId, String durableClientTimeout, String securityAuthInit, String securityUserName, String securityPassword, boolean usesSecurity) {
     clientCache = createClientCache(locatorHostPortList, durableClientId, durableClientTimeout,
-        securityAuthInit, usesSecurity);
+        securityAuthInit, securityUserName, securityPassword, usesSecurity);
     return clientCache;
   }
 
   public ClientCache connectClient(List<LocatorHostPort> locatorHostPortList,
-      String securityAuthInit, boolean usesSecurity) {
-    clientCache = createClientCache(locatorHostPortList, "", "", securityAuthInit, usesSecurity);
+      String securityAuthInit, String securityUserName, String securityPassword, boolean usesSecurity) {
+    clientCache = createClientCache(locatorHostPortList, "", "", securityAuthInit, securityUserName, securityPassword, usesSecurity);
     return clientCache;
   }
 
@@ -54,10 +57,14 @@
   }
 
   public ClientCache createClientCache(List<LocatorHostPort> locators, String durableClientName,
-      String durableClientTimeOut, String securityAuthInit, boolean usesSecurity) {
+      String durableClientTimeOut, String securityAuthInit, String securityUserName, String securityPassword, boolean usesSecurity) {
     ClientCacheFactory ccf = new ClientCacheFactory();
 
-    if (usesSecurity ) {
+    if (usesSecurity) {
+      if (securityUserName != null && securityPassword != null) {
+        ccf.set(SECURITY_USER, securityUserName);
+        ccf.set(SECURITY_PASSWORD, securityPassword);
+      }
       ccf.set(SECURITY_CLIENT_AUTH_INIT, securityAuthInit);
     }
     if (!durableClientName.equals("")) {
@@ -85,8 +92,8 @@
     }
   }
 
-  public Collection newCqWithInitialResults(String name, String query, CqAttributes cqAttributes,
-      boolean isDurable) throws ConnectException {
+  public CqResults newCqWithInitialResults(String name, String query, CqAttributes cqAttributes,
+      boolean isDurable) throws ConnectException {
     try {
       CqQuery cq = clientCache.getQueryService().newCq(name, query, cqAttributes, isDurable);
       return cq.executeWithInitialResults();
diff --git a/src/main/java/org/geode/kafka/security/SystemPropertyAuthInit.java b/src/main/java/org/geode/kafka/security/SystemPropertyAuthInit.java
index db48699..cc525a2 100644
--- a/src/main/java/org/geode/kafka/security/SystemPropertyAuthInit.java
+++ b/src/main/java/org/geode/kafka/security/SystemPropertyAuthInit.java
@@ -27,8 +27,8 @@
   public Properties getCredentials(Properties securityProps, DistributedMember server,
       boolean isPeer) throws AuthenticationFailedException {
     Properties extractedProperties = new Properties();
-    extractedProperties.put("security-username", System.getProperty(GeodeConnectorConfig.SECURITY_USER));
-    extractedProperties.put("security-password", System.getProperty(GeodeConnectorConfig.SECURITY_PASSWORD));
+    extractedProperties.put("security-username", securityProps.get("security-username"));
+    extractedProperties.put("security-password", securityProps.get("security-password"));
     return extractedProperties;
   }
 }
diff --git a/src/main/java/org/geode/kafka/sink/GeodeKafkaSinkTask.java b/src/main/java/org/geode/kafka/sink/GeodeKafkaSinkTask.java
index 4552e09..7db384f 100644
--- a/src/main/java/org/geode/kafka/sink/GeodeKafkaSinkTask.java
+++ b/src/main/java/org/geode/kafka/sink/GeodeKafkaSinkTask.java
@@ -61,7 +61,7 @@
       configure(geodeConnectorConfig);
       geodeContext = new GeodeContext();
       geodeContext.connectClient(geodeConnectorConfig.getLocatorHostPorts(),
-          geodeConnectorConfig.getSecurityClientAuthInit(), geodeConnectorConfig.usesSecurity());
+          geodeConnectorConfig.getSecurityClientAuthInit(), geodeConnectorConfig.getSecurityUserName(), geodeConnectorConfig.getSecurityPassword(), geodeConnectorConfig.usesSecurity());
       regionNameToRegion = createProxyRegions(topicToRegions.values());
     } catch (Exception e) {
       logger.error("Unable to start sink task", e);
diff --git a/src/main/java/org/geode/kafka/source/GeodeEvent.java b/src/main/java/org/geode/kafka/source/GeodeEvent.java
index 2333955..5b51d07 100644
--- a/src/main/java/org/geode/kafka/source/GeodeEvent.java
+++ b/src/main/java/org/geode/kafka/source/GeodeEvent.java
@@ -22,18 +22,29 @@
 public class GeodeEvent {
 
   private String regionName;
-  private CqEvent event;
+  private Object key;
+  private Object value;
 
   public GeodeEvent(String regionName, CqEvent event) {
+    this(regionName, event.getKey(), event.getNewValue());
+  }
+
+  public GeodeEvent(String regionName, Object key, Object value) {
     this.regionName = regionName;
-    this.event = event;
+    this.key = key;
+    this.value = value;
   }
 
   public String getRegionName() {
     return regionName;
   }
 
-  public CqEvent getEvent() {
-    return event;
+  public Object getKey() {
+    return key;
   }
+
+  public Object getValue() {
+    return value;
+  }
+
 }
diff --git a/src/main/java/org/geode/kafka/source/GeodeKafkaSourceTask.java b/src/main/java/org/geode/kafka/source/GeodeKafkaSourceTask.java
index b829335..b1c289f 100644
--- a/src/main/java/org/geode/kafka/source/GeodeKafkaSourceTask.java
+++ b/src/main/java/org/geode/kafka/source/GeodeKafkaSourceTask.java
@@ -21,6 +21,8 @@
 import java.util.Map;
 import java.util.stream.Collectors;
 
+import org.apache.geode.cache.query.CqResults;
+import org.apache.geode.cache.query.Struct;
 import org.geode.kafka.GeodeContext;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.apache.kafka.connect.source.SourceTask;
@@ -68,7 +70,7 @@
       geodeContext = new GeodeContext();
       geodeContext.connectClient(geodeConnectorConfig.getLocatorHostPorts(),
           geodeConnectorConfig.getDurableClientId(), geodeConnectorConfig.getDurableClientTimeout(),
-          geodeConnectorConfig.getSecurityClientAuthInit(), geodeConnectorConfig.usesSecurity());
+          geodeConnectorConfig.getSecurityClientAuthInit(), geodeConnectorConfig.getSecurityUserName(), geodeConnectorConfig.getSecurityPassword(), geodeConnectorConfig.usesSecurity());
 
       batchSize = geodeConnectorConfig.getBatchSize();
       eventBufferSupplier = new SharedEventBufferSupplier(geodeConnectorConfig.getQueueSize());
@@ -98,7 +100,7 @@
         List<String> topics = regionToTopics.get(regionName);
         for (String topic : topics) {
           records.add(new SourceRecord(sourcePartitions.get(regionName), OFFSET_DEFAULT, topic,
-              null, event.getEvent().getKey(), null, event.getEvent().getNewValue()));
+              null, event.getKey(), null, event.getValue()));
         }
       }
       return records;
@@ -134,12 +136,13 @@
     CqAttributes cqAttributes = cqAttributesFactory.create();
     try {
       if (loadEntireRegion) {
-        Collection<CqEvent> events =
+        CqResults events =
             geodeContext.newCqWithInitialResults(generateCqName(taskId, cqPrefix, regionName),
                 "select * from /" + regionName, cqAttributes,
                 isDurable);
-        eventBuffer.get().addAll(
-            events.stream().map(e -> new GeodeEvent(regionName, e)).collect(Collectors.toList()));
+        eventBuffer.get().addAll((Collection<GeodeEvent>) events.stream()
+            .map(e -> new GeodeEvent(regionName, ((Struct) e).get("key"), ((Struct) e).get("value")))
+            .collect(Collectors.toList()));
       } else {
         geodeContext.newCq(generateCqName(taskId, cqPrefix, regionName),
             "select * from /" + regionName, cqAttributes,
diff --git a/src/test/java/org/geode/kafka/source/GeodeKafkaSourceTaskTest.java b/src/test/java/org/geode/kafka/source/GeodeKafkaSourceTaskTest.java
index 29a901a..7901426 100644
--- a/src/test/java/org/geode/kafka/source/GeodeKafkaSourceTaskTest.java
+++ b/src/test/java/org/geode/kafka/source/GeodeKafkaSourceTaskTest.java
@@ -36,6 +36,14 @@
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 
+import org.apache.geode.cache.query.CqAttributes;
+import org.apache.geode.cache.query.CqResults;
+import org.apache.geode.cache.query.SelectResults;
+import org.apache.geode.cache.query.Struct;
+import org.apache.geode.cache.query.internal.LinkedStructSet;
+import org.apache.geode.cache.query.internal.ResultsBag;
+import org.apache.geode.cache.query.internal.StructImpl;
+import org.apache.geode.cache.query.internal.types.StructTypeImpl;
 import org.geode.kafka.GeodeContext;
 import org.junit.Test;
 
@@ -52,9 +60,9 @@
     BlockingQueue<GeodeEvent> eventBuffer = new LinkedBlockingQueue(100);
     boolean loadEntireRegion = true;
     boolean isDurable = false;
-    List<CqEvent> fakeInitialResults = new LinkedList<>();
+    CqResults fakeInitialResults = new ResultsBag();
     for (int i = 0; i < 10; i++) {
-      fakeInitialResults.add(mock(CqEvent.class));
+      fakeInitialResults.add(mock(Struct.class));
     }
 
     when(geodeContext.newCqWithInitialResults(anyString(), anyString(), any(), anyBoolean()))
@@ -71,7 +79,7 @@
     BlockingQueue<GeodeEvent> eventBuffer = new LinkedBlockingQueue(100);
     boolean loadEntireRegion = false;
     boolean isDurable = false;
-    List<CqEvent> fakeInitialResults = new LinkedList<>();
+    CqResults fakeInitialResults = new ResultsBag();
     for (int i = 0; i < 10; i++) {
       fakeInitialResults.add(mock(CqEvent.class));
     }
@@ -92,7 +100,7 @@
     boolean isDurable = false;
 
     when(geodeContext.newCqWithInitialResults(anyString(), anyString(), any(), anyBoolean()))
-        .thenReturn(new ArrayList());
+        .thenReturn(mock(CqResults.class));
     GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
     GeodeKafkaSourceListener listener =
         task.installListenersToRegion(geodeContext, 1, createEventBufferSupplier(eventBuffer),
@@ -140,7 +148,7 @@
 
     GeodeContext geodeContext = mock(GeodeContext.class);
     when(geodeContext.getClientCache()).thenReturn(clientCache);
-
+    when(geodeContext.newCqWithInitialResults(anyString(), anyString(), any(CqAttributes.class), anyBoolean())).thenReturn(new ResultsBag());
     Map<String, List<String>> regionToTopicsMap = new HashMap<>();
     regionToTopicsMap.put("region1", new ArrayList());