Code improvements:

	* Exceptions are thrown as ConnectException.
	* Return values are checked rather than ignored.
	* Static analyzer recommendations are implemented.
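
For reference, a minimal sketch of the start-up pattern both task changes
converge on, mirroring the sink task hunk below (ClientCache, GeodeContext,
GeodeSinkConnectorConfig, and ConnectException are the real types touched in
this diff; the StartupSketch wrapper class is an illustrative placeholder,
not actual connector code):

	import java.util.Map;

	import org.apache.geode.cache.client.ClientCache;
	import org.apache.geode.kafka.GeodeContext;
	import org.apache.geode.kafka.sink.GeodeSinkConnectorConfig;
	import org.apache.kafka.connect.errors.ConnectException;

	public class StartupSketch {
	  public void start(Map<String, String> props) {
	    try {
	      GeodeSinkConnectorConfig config = new GeodeSinkConnectorConfig(props);
	      GeodeContext geodeContext = new GeodeContext();
	      // the return value of connectClient is now checked instead of ignored
	      ClientCache clientCache =
	          geodeContext.connectClient(config.getLocatorHostPorts(),
	              config.getSecurityClientAuthInit(),
	              config.getSecurityUserName(),
	              config.getSecurityPassword(),
	              config.usesSecurity());
	      if (clientCache == null) {
	        // fail fast instead of letting a later call hit a null cache
	        throw new ConnectException("Unable to start client cache");
	      }
	    } catch (Exception e) {
	      // re-throw ConnectException unchanged; wrap anything else so the
	      // Kafka Connect framework always receives a ConnectException
	      if (e instanceof ConnectException) {
	        throw (ConnectException) e;
	      }
	      throw new ConnectException("Unable to start task", e);
	    }
	  }
	}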
diff --git a/src/main/java/org/apache/geode/kafka/converter/JsonPdxConverter.java b/src/main/java/org/apache/geode/kafka/converter/JsonPdxConverter.java
index e630b4e..03a6f0a 100644
--- a/src/main/java/org/apache/geode/kafka/converter/JsonPdxConverter.java
+++ b/src/main/java/org/apache/geode/kafka/converter/JsonPdxConverter.java
@@ -28,7 +28,7 @@
   public static final String JSON_TYPE_ANNOTATION = "\"@type\"";
   // Default value = false
   public static final String ADD_TYPE_ANNOTATION_TO_JSON = "add-type-annotation-to-json";
-  private Map<String, String> internalConfig = new HashMap<>();
+  private final Map<String, String> internalConfig = new HashMap<>();
 
   @Override
   public void configure(Map<String, ?> configs, boolean isKey) {
diff --git a/src/main/java/org/apache/geode/kafka/sink/BatchRecords.java b/src/main/java/org/apache/geode/kafka/sink/BatchRecords.java
index 7974abd..909cd7c 100644
--- a/src/main/java/org/apache/geode/kafka/sink/BatchRecords.java
+++ b/src/main/java/org/apache/geode/kafka/sink/BatchRecords.java
@@ -44,8 +44,8 @@
 
   public void addRemoveOperation(SinkRecord record) {
     // if a previous operation added to the update map
-    // let's just remove it so we don't do a put and then a remove
-    // depending on the order of operations (putAll then removeAll or removeAll or putAll)...
+    // let's just remove it so that we don't do a put and then a remove
+    // depending on the order of operations (putAll then removeAll, or removeAll then putAll)...
     // ...we could remove one of the if statements.
     if (updateMap.containsKey(record.key())) {
       updateMap.remove(record.key());
@@ -56,7 +56,7 @@
 
   public void addUpdateOperation(SinkRecord record, boolean nullValuesMeansRemove) {
     // it's assumed the records in are order
-    // if so if a previous value was in the remove list
+    // if so, and a previous value was in the remove list,
     // let's not remove it at the end of this operation
     if (nullValuesMeansRemove) {
       removeList.remove(record.key());
@@ -70,7 +70,7 @@
       region.putAll(updateMap);
       region.removeAll(removeList);
     } else {
-      logger.info("Unable to locate proxy region is null");
+      logger.info("Unable to locate the proxy region; the region is null");
     }
   }
 }
diff --git a/src/main/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTask.java b/src/main/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTask.java
index 1f50ea4..873d89b 100644
--- a/src/main/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTask.java
+++ b/src/main/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTask.java
@@ -28,6 +28,7 @@
 
 import org.apache.geode.cache.Region;
 import org.apache.geode.cache.RegionExistsException;
+import org.apache.geode.cache.client.ClientCache;
 import org.apache.geode.cache.client.ClientRegionShortcut;
 import org.apache.geode.kafka.GeodeContext;
 import org.apache.geode.kafka.Version;
@@ -61,13 +62,20 @@
       GeodeSinkConnectorConfig geodeConnectorConfig = new GeodeSinkConnectorConfig(props);
       configure(geodeConnectorConfig);
       geodeContext = new GeodeContext();
-      geodeContext.connectClient(geodeConnectorConfig.getLocatorHostPorts(),
-          geodeConnectorConfig.getSecurityClientAuthInit(),
-          geodeConnectorConfig.getSecurityUserName(),
-          geodeConnectorConfig.getSecurityPassword(),
-          geodeConnectorConfig.usesSecurity());
+      ClientCache clientCache =
+          geodeContext.connectClient(geodeConnectorConfig.getLocatorHostPorts(),
+              geodeConnectorConfig.getSecurityClientAuthInit(),
+              geodeConnectorConfig.getSecurityUserName(),
+              geodeConnectorConfig.getSecurityPassword(),
+              geodeConnectorConfig.usesSecurity());
+      if (clientCache == null) {
+        throw new ConnectException("Unable to start client cache in the sink task");
+      }
       regionNameToRegion = createProxyRegions(topicToRegions.values());
     } catch (Exception e) {
+      if (e instanceof ConnectException) {
+        throw e;
+      }
       throw new ConnectException("Unable to start sink task", e);
     }
   }
diff --git a/src/main/java/org/apache/geode/kafka/sink/GeodeSinkConnectorConfig.java b/src/main/java/org/apache/geode/kafka/sink/GeodeSinkConnectorConfig.java
index 2ace395..7a60513 100644
--- a/src/main/java/org/apache/geode/kafka/sink/GeodeSinkConnectorConfig.java
+++ b/src/main/java/org/apache/geode/kafka/sink/GeodeSinkConnectorConfig.java
@@ -79,11 +79,7 @@
   }
 
   public boolean getNullValueBehavior() {
-    if (nullValueBehavior.equals(GeodeSinkConfigurationConstants.NullValueBehavior.REMOVE)) {
-      return true;
-    } else {
-      return false;
-    }
+    return nullValueBehavior.equals(GeodeSinkConfigurationConstants.NullValueBehavior.REMOVE);
   }
 
 }
diff --git a/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceTask.java b/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceTask.java
index 13e5b60..4ce7371 100644
--- a/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceTask.java
+++ b/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceTask.java
@@ -29,6 +29,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.geode.cache.client.ClientCache;
 import org.apache.geode.cache.query.CqAttributes;
 import org.apache.geode.cache.query.CqAttributesFactory;
 import org.apache.geode.cache.query.CqQuery;
@@ -72,14 +73,17 @@
       GeodeSourceConnectorConfig geodeConnectorConfig = new GeodeSourceConnectorConfig(props);
       logger.debug("GeodeKafkaSourceTask id:" + geodeConnectorConfig.getTaskId() + " starting");
       geodeContext = new GeodeContext();
-      geodeContext.connectClient(geodeConnectorConfig.getLocatorHostPorts(),
-          geodeConnectorConfig.getDurableClientId(),
-          geodeConnectorConfig.getDurableClientTimeout(),
-          geodeConnectorConfig.getSecurityClientAuthInit(),
-          geodeConnectorConfig.getSecurityUserName(),
-          geodeConnectorConfig.getSecurityPassword(),
-          geodeConnectorConfig.usesSecurity());
-
+      ClientCache clientCache =
+          geodeContext.connectClient(geodeConnectorConfig.getLocatorHostPorts(),
+              geodeConnectorConfig.getDurableClientId(),
+              geodeConnectorConfig.getDurableClientTimeout(),
+              geodeConnectorConfig.getSecurityClientAuthInit(),
+              geodeConnectorConfig.getSecurityUserName(),
+              geodeConnectorConfig.getSecurityPassword(),
+              geodeConnectorConfig.usesSecurity());
+      if (clientCache == null) {
+        throw new ConnectException("Unable to create client cache in the source task");
+      }
       batchSize = geodeConnectorConfig.getBatchSize();
       eventBufferSupplier = new SharedEventBufferSupplier(geodeConnectorConfig.getQueueSize());
 
@@ -92,9 +96,12 @@
           loadEntireRegion);
       logger.info("Started Apache Geode source task");
     } catch (Exception e) {
-      e.printStackTrace();
       logger.error("Unable to start source task", e);
-      throw e;
+      if (e instanceof ConnectException) {
+        throw e;
+      } else {
+        throw new ConnectException(e);
+      }
     }
   }
 
diff --git a/src/main/java/org/apache/geode/kafka/utils/EnumValidator.java b/src/main/java/org/apache/geode/kafka/utils/EnumValidator.java
index 1c9e1ff..f7c2a61 100644
--- a/src/main/java/org/apache/geode/kafka/utils/EnumValidator.java
+++ b/src/main/java/org/apache/geode/kafka/utils/EnumValidator.java
@@ -28,7 +28,7 @@
   }
 
   public static <T> EnumValidator in(T[] enumerators) {
-    Set<String> validValues = new HashSet<String>(enumerators.length);
+    Set<String> validValues = new HashSet<>(enumerators.length);
     for (T e : enumerators) {
       validValues.add(e.toString().toLowerCase());
     }
diff --git a/src/main/resources/kafka-connect-geode-version.properties b/src/main/resources/kafka-connect-geode-version.properties
index 5ea51b3..5efe2ea 100644
--- a/src/main/resources/kafka-connect-geode-version.properties
+++ b/src/main/resources/kafka-connect-geode-version.properties
@@ -14,4 +14,4 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-version=${project.version} 
\ No newline at end of file
+version=${project.version}
\ No newline at end of file
diff --git a/src/test/java/org/apache/geode/kafka/converter/JsonPdxConverterDUnitTest.java b/src/test/java/org/apache/geode/kafka/converter/JsonPdxConverterDUnitTest.java
index 46470c1..dfb990b 100644
--- a/src/test/java/org/apache/geode/kafka/converter/JsonPdxConverterDUnitTest.java
+++ b/src/test/java/org/apache/geode/kafka/converter/JsonPdxConverterDUnitTest.java
@@ -57,16 +57,16 @@
 
 public class JsonPdxConverterDUnitTest {
   @Rule
-  public ClusterStartupRule clusterStartupRule = new ClusterStartupRule(3);
+  public final ClusterStartupRule clusterStartupRule = new ClusterStartupRule(3);
 
   @Rule
   public TestName testName = new TestName();
 
   @ClassRule
-  public static TemporaryFolder temporaryFolderForZooKeeper = new TemporaryFolder();
+  public static final TemporaryFolder temporaryFolderForZooKeeper = new TemporaryFolder();
 
   @Rule
-  public TemporaryFolder temporaryFolderForOffset = new TemporaryFolder();
+  public final TemporaryFolder temporaryFolderForOffset = new TemporaryFolder();
 
   @BeforeClass
   public static void setup()
diff --git a/src/test/java/org/apache/geode/kafka/source/GeodeKafkaSourceTaskTest.java b/src/test/java/org/apache/geode/kafka/source/GeodeKafkaSourceTaskTest.java
index 411316c..92c78e2 100644
--- a/src/test/java/org/apache/geode/kafka/source/GeodeKafkaSourceTaskTest.java
+++ b/src/test/java/org/apache/geode/kafka/source/GeodeKafkaSourceTaskTest.java
@@ -71,11 +71,6 @@
   public void whenNotLoadingEntireRegionShouldNotPutInitialResultsIntoEventBuffer() {
     GeodeContext geodeContext = mock(GeodeContext.class);
     BlockingQueue<GeodeEvent> eventBuffer = new LinkedBlockingQueue<>(100);
-    CqResults<Object> fakeInitialResults = new ResultsBag();
-    for (int i = 0; i < 10; i++) {
-      fakeInitialResults.add(mock(CqEvent.class));
-    }
-
     when(geodeContext.newCq(anyString(), anyString(), any(), anyBoolean()))
         .thenReturn(mock(CqQuery.class));
     GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();