Addition of rat and spotlessApply

	* All offending files (missing license headers or formatting violations) were also corrected.
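	* Typical invocations after this change (spotlessCheck/spotlessApply are the
	  standard tasks contributed by the spotless plugin; the rat task is wired
	  into check below):
	      ./gradlew rat              # audit files for missing license headers
	      ./gradlew spotlessCheck    # report formatting violations
	      ./gradlew spotlessApply    # rewrite offending files in place
	      ./gradlew devBuild         # convenience target: spotless + assemble
	* The script directory defaults to gradle/ and, being a project property,
	  can be overridden from the command line, e.g.
	      ./gradlew build -PscriptDir=my/scripts
	  (my/scripts is an illustrative path, not part of this change).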
diff --git a/build.gradle b/build.gradle
index 114b3cd..124ef4b 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1,9 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+buildscript {
+    repositories {
+        maven { url "https://plugins.gradle.org/m2/" }
+        maven { url "https://dl.bintray.com/palantir/releases" }
+        jcenter()
+    }
+    dependencies {
+        classpath 'com.diffplug.spotless:spotless-plugin-gradle:3.21.1'
+        classpath 'com.github.ben-manes:gradle-versions-plugin:0.21.0'
+        classpath 'com.netflix.nebula:gradle-lint-plugin:11.4.4'
+        classpath 'com.netflix.nebula:nebula-project-plugin:6.0.2'
+        classpath 'gradle.plugin.com.palantir.gradle.docker:gradle-docker:0.22.1'
+        classpath 'io.spring.gradle:dependency-management-plugin:1.0.7.RELEASE'
+        classpath 'org.ajoberstar.grgit:grgit-gradle:3.1.1'
+        classpath 'org.nosphere.apache:creadur-rat-gradle:0.4.0'
+        classpath 'org.sonarsource.scanner.gradle:sonarqube-gradle-plugin:2.7'
+    }
+}
 plugins {
     id 'java'
 }
 
-group 'org.apache.geode'
-version '1.0-SNAPSHOT'
+apply plugin: 'wrapper'
+apply plugin: 'nebula.facet'
+apply plugin: 'java-library'
+apply plugin: 'idea'
+apply plugin: 'eclipse'
+
+tasks.register('devBuild') {
+    description "A convenience target for a typical developer workflow: apply spotless and assemble all classes."
+    dependsOn tasks.named('assemble')
+    // Each subproject injects its SpotlessApply as a dependency to this task in the standard config
+}
+
+
+apply from: "${rootDir}/${scriptDir}/spotless.gradle"
+apply from: "${scriptDir}/rat.gradle"
+
 
 sourceCompatibility = 1.8
 
@@ -12,22 +60,22 @@
 }
 
 dependencies {
-
-    compile 'org.apache.geode:geode-core:1.10.0'
-    compile 'org.apache.geode:geode-cq:1.10.0'
+    compile('org.apache.geode:geode-core:1.10.0')
+    compile('org.apache.geode:geode-cq:1.10.0')
     compile(group: 'org.apache.kafka', name: 'connect-api', version: '2.3.1')
-    compile group: 'org.apache.logging.log4j', name: 'log4j-api', version: '2.13.0'
-    compile group: 'org.apache.logging.log4j', name: 'log4j-core', version: '2.13.0'
+    compile(group: 'org.apache.logging.log4j', name: 'log4j-api', version: '2.13.0')
+    compile(group: 'org.apache.logging.log4j', name: 'log4j-core', version: '2.13.0')
 
     testCompile(group: 'org.apache.kafka', name: 'kafka_2.12', version: '2.3.1')
     testCompile(group: 'org.apache.kafka', name: 'kafka-streams-test-utils', version: '1.1.0')
     testCompile(group: 'org.apache.curator', name: 'curator-framework', version: '4.2.0')
     testCompile(group: 'org.apache.kafka', name: 'connect-runtime', version: '2.3.1')
 
-    testCompile group: 'junit', name: 'junit', version: '4.12'
-    testCompile 'org.mockito:mockito-core:3.2.4'
-    testCompile 'pl.pragmatists:JUnitParams:1.1.1'
+    testCompile(group: 'junit', name: 'junit', version: '4.12')
+    testCompile('org.mockito:mockito-core:3.2.4')
+    testCompile('pl.pragmatists:JUnitParams:1.1.1')
 
     testImplementation 'org.awaitility:awaitility:4.0.2'
 }
 
+
diff --git a/etc/eclipse-java-google-style.xml b/etc/eclipse-java-google-style.xml
new file mode 100644
index 0000000..629ef19
--- /dev/null
+++ b/etc/eclipse-java-google-style.xml
@@ -0,0 +1,337 @@
+<?xml version="1.0" encoding="UTF-8" standalone="no"?>
+<profiles version="13">
+<profile kind="CodeFormatterProfile" name="GoogleStyle" version="13">
+<setting id="org.eclipse.jdt.core.compiler.codegen.inlineJsrBytecode" value="enabled"/>
+<setting id="org.eclipse.jdt.core.compiler.codegen.targetPlatform" value="1.7"/>
+<setting id="org.eclipse.jdt.core.compiler.compliance" value="1.7"/>
+<setting id="org.eclipse.jdt.core.compiler.problem.assertIdentifier" value="error"/>
+<setting id="org.eclipse.jdt.core.compiler.problem.enumIdentifier" value="error"/>
+<setting id="org.eclipse.jdt.core.compiler.source" value="1.7"/>
+<setting id="org.eclipse.jdt.core.formatter.align_type_members_on_columns" value="false"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_annotations_on_field" value="1585"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_annotations_on_field.count_dependent" value="1585|-1|1585"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_annotations_on_local_variable" value="1585"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_annotations_on_local_variable.count_dependent" value="1585|-1|1585"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_annotations_on_method" value="1585"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_annotations_on_method.count_dependent" value="1585|-1|1585"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_annotations_on_package" value="1585"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_annotations_on_package.count_dependent" value="1585|-1|1585"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_annotations_on_parameter" value="1040"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_annotations_on_parameter.count_dependent" value="1040|-1|1040"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_annotations_on_type" value="1585"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_annotations_on_type.count_dependent" value="1585|-1|1585"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_arguments_in_allocation_expression" value="16"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_arguments_in_allocation_expression.count_dependent" value="16|5|80"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_arguments_in_annotation" value="16"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_arguments_in_annotation.count_dependent" value="16|-1|16"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_arguments_in_enum_constant" value="16"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_arguments_in_enum_constant.count_dependent" value="16|-1|16"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_arguments_in_explicit_constructor_call" value="16"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_arguments_in_explicit_constructor_call.count_dependent" value="16|5|80"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_arguments_in_method_invocation" value="16"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_arguments_in_method_invocation.count_dependent" value="16|5|80"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_arguments_in_qualified_allocation_expression" value="16"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_arguments_in_qualified_allocation_expression.count_dependent" value="16|4|80"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_assignment" value="16"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_binary_expression" value="16"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_binary_expression.count_dependent" value="16|-1|16"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_cascading_method_invocation_with_arguments" value="16"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_cascading_method_invocation_with_arguments.count_dependent" value="16|-1|16"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_compact_if" value="16"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_conditional_expression" value="80"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_enum_constants" value="48"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_enum_constants.count_dependent" value="16|5|48"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_expressions_in_array_initializer" value="16"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_expressions_in_array_initializer.count_dependent" value="16|5|80"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_field_declaration" value="16"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_for_statement" value="16"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_generic_type_arguments" value="16"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_generic_type_arguments.count_dependent" value="16|-1|16"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_local_variable_declaration" value="16"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_method_declaration" value="0"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_multiple_fields" value="16"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_multiple_fields.count_dependent" value="16|-1|16"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_new_anonymous_class" value="20"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_parameters_in_constructor_declaration" value="16"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_parameters_in_constructor_declaration.count_dependent" value="16|5|80"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_parameters_in_method_declaration" value="16"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_parameters_in_method_declaration.count_dependent" value="16|5|80"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_resources_in_try" value="80"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_selector_in_method_invocation" value="16"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_selector_in_method_invocation.count_dependent" value="16|4|48"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_superclass_in_type_declaration" value="16"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_superinterfaces_in_enum_declaration" value="16"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_superinterfaces_in_enum_declaration.count_dependent" value="16|4|49"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_superinterfaces_in_type_declaration" value="16"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_superinterfaces_in_type_declaration.count_dependent" value="16|4|48"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_throws_clause_in_constructor_declaration" value="16"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_throws_clause_in_constructor_declaration.count_dependent" value="16|4|48"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_throws_clause_in_method_declaration" value="16"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_throws_clause_in_method_declaration.count_dependent" value="16|4|48"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_union_type_in_multicatch" value="16"/>
+<setting id="org.eclipse.jdt.core.formatter.blank_lines_after_imports" value="1"/>
+<setting id="org.eclipse.jdt.core.formatter.blank_lines_after_package" value="1"/>
+<setting id="org.eclipse.jdt.core.formatter.blank_lines_before_field" value="0"/>
+<setting id="org.eclipse.jdt.core.formatter.blank_lines_before_first_class_body_declaration" value="0"/>
+<setting id="org.eclipse.jdt.core.formatter.blank_lines_before_imports" value="0"/>
+<setting id="org.eclipse.jdt.core.formatter.blank_lines_before_member_type" value="0"/>
+<setting id="org.eclipse.jdt.core.formatter.blank_lines_before_method" value="1"/>
+<setting id="org.eclipse.jdt.core.formatter.blank_lines_before_new_chunk" value="1"/>
+<setting id="org.eclipse.jdt.core.formatter.blank_lines_before_package" value="0"/>
+<setting id="org.eclipse.jdt.core.formatter.blank_lines_between_import_groups" value="1"/>
+<setting id="org.eclipse.jdt.core.formatter.blank_lines_between_type_declarations" value="2"/>
+<setting id="org.eclipse.jdt.core.formatter.brace_position_for_annotation_type_declaration" value="end_of_line"/>
+<setting id="org.eclipse.jdt.core.formatter.brace_position_for_anonymous_type_declaration" value="end_of_line"/>
+<setting id="org.eclipse.jdt.core.formatter.brace_position_for_array_initializer" value="end_of_line"/>
+<setting id="org.eclipse.jdt.core.formatter.brace_position_for_block" value="end_of_line"/>
+<setting id="org.eclipse.jdt.core.formatter.brace_position_for_block_in_case" value="end_of_line"/>
+<setting id="org.eclipse.jdt.core.formatter.brace_position_for_constructor_declaration" value="end_of_line"/>
+<setting id="org.eclipse.jdt.core.formatter.brace_position_for_enum_constant" value="end_of_line"/>
+<setting id="org.eclipse.jdt.core.formatter.brace_position_for_enum_declaration" value="end_of_line"/>
+<setting id="org.eclipse.jdt.core.formatter.brace_position_for_method_declaration" value="end_of_line"/>
+<setting id="org.eclipse.jdt.core.formatter.brace_position_for_switch" value="end_of_line"/>
+<setting id="org.eclipse.jdt.core.formatter.brace_position_for_type_declaration" value="end_of_line"/>
+<setting id="org.eclipse.jdt.core.formatter.comment.clear_blank_lines_in_block_comment" value="false"/>
+<setting id="org.eclipse.jdt.core.formatter.comment.clear_blank_lines_in_javadoc_comment" value="false"/>
+<setting id="org.eclipse.jdt.core.formatter.comment.format_block_comments" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.comment.format_header" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.comment.format_html" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.comment.format_javadoc_comments" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.comment.format_line_comments" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.comment.format_source_code" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.comment.indent_parameter_description" value="false"/>
+<setting id="org.eclipse.jdt.core.formatter.comment.indent_root_tags" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.comment.insert_new_line_before_root_tags" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.comment.insert_new_line_for_parameter" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.comment.line_length" value="100"/>
+<setting id="org.eclipse.jdt.core.formatter.comment.new_lines_at_block_boundaries" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.comment.new_lines_at_javadoc_boundaries" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.comment.preserve_white_space_between_code_and_line_comments" value="false"/>
+<setting id="org.eclipse.jdt.core.formatter.comment_new_line_at_start_of_html_paragraph" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.compact_else_if" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.continuation_indentation" value="2"/>
+<setting id="org.eclipse.jdt.core.formatter.continuation_indentation_for_array_initializer" value="2"/>
+<setting id="org.eclipse.jdt.core.formatter.disabling_tag" value="@formatter:off"/>
+<setting id="org.eclipse.jdt.core.formatter.enabling_tag" value="@formatter:on"/>
+<setting id="org.eclipse.jdt.core.formatter.force_if_else_statement_brace" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.format_guardian_clause_on_one_line" value="false"/>
+<setting id="org.eclipse.jdt.core.formatter.format_line_comment_starting_on_first_column" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.indent_body_declarations_compare_to_annotation_declaration_header" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.indent_body_declarations_compare_to_enum_constant_header" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.indent_body_declarations_compare_to_enum_declaration_header" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.indent_body_declarations_compare_to_type_header" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.indent_breaks_compare_to_cases" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.indent_empty_lines" value="false"/>
+<setting id="org.eclipse.jdt.core.formatter.indent_statements_compare_to_block" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.indent_statements_compare_to_body" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.indent_switchstatements_compare_to_cases" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.indent_switchstatements_compare_to_switch" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.indentation.size" value="4"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_after_annotation_on_field" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_after_annotation_on_local_variable" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_after_annotation_on_method" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_after_annotation_on_package" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_after_annotation_on_parameter" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_after_annotation_on_type" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_after_label" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_after_opening_brace_in_array_initializer" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_at_end_of_file_if_missing" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_before_catch_in_try_statement" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_before_else_in_if_statement" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_before_finally_in_try_statement" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_before_while_in_do_statement" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_in_empty_annotation_declaration" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_in_empty_anonymous_type_declaration" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_in_empty_block" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_in_empty_enum_constant" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_in_empty_enum_declaration" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_in_empty_method_body" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_in_empty_type_declaration" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_and_in_type_parameter" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_assignment_operator" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_at_in_annotation" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_at_in_annotation_type_declaration" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_binary_operator" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_closing_angle_bracket_in_type_arguments" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_closing_angle_bracket_in_type_parameters" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_closing_brace_in_block" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_closing_paren_in_cast" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_colon_in_assert" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_colon_in_case" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_colon_in_conditional" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_colon_in_for" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_colon_in_labeled_statement" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_allocation_expression" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_annotation" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_array_initializer" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_constructor_declaration_parameters" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_constructor_declaration_throws" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_enum_constant_arguments" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_enum_declarations" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_explicitconstructorcall_arguments" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_for_increments" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_for_inits" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_method_declaration_parameters" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_method_declaration_throws" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_method_invocation_arguments" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_multiple_field_declarations" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_multiple_local_declarations" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_parameterized_type_reference" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_superinterfaces" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_type_arguments" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_type_parameters" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comment_prefix" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_ellipsis" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_angle_bracket_in_parameterized_type_reference" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_angle_bracket_in_type_arguments" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_angle_bracket_in_type_parameters" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_brace_in_array_initializer" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_bracket_in_array_allocation_expression" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_bracket_in_array_reference" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_annotation" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_cast" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_catch" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_constructor_declaration" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_enum_constant" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_for" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_if" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_method_declaration" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_method_invocation" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_parenthesized_expression" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_switch" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_synchronized" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_try" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_while" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_postfix_operator" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_prefix_operator" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_question_in_conditional" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_question_in_wildcard" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_semicolon_in_for" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_semicolon_in_try_resources" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_unary_operator" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_and_in_type_parameter" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_assignment_operator" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_at_in_annotation_type_declaration" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_binary_operator" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_angle_bracket_in_parameterized_type_reference" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_angle_bracket_in_type_arguments" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_angle_bracket_in_type_parameters" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_brace_in_array_initializer" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_bracket_in_array_allocation_expression" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_bracket_in_array_reference" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_annotation" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_cast" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_catch" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_constructor_declaration" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_enum_constant" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_for" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_if" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_method_declaration" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_method_invocation" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_parenthesized_expression" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_switch" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_synchronized" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_try" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_while" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_colon_in_assert" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_colon_in_case" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_colon_in_conditional" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_colon_in_default" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_colon_in_for" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_colon_in_labeled_statement" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_allocation_expression" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_annotation" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_array_initializer" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_constructor_declaration_parameters" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_constructor_declaration_throws" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_enum_constant_arguments" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_enum_declarations" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_explicitconstructorcall_arguments" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_for_increments" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_for_inits" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_method_declaration_parameters" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_method_declaration_throws" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_method_invocation_arguments" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_multiple_field_declarations" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_multiple_local_declarations" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_parameterized_type_reference" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_superinterfaces" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_type_arguments" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_type_parameters" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_ellipsis" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_angle_bracket_in_parameterized_type_reference" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_angle_bracket_in_type_arguments" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_angle_bracket_in_type_parameters" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_annotation_type_declaration" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_anonymous_type_declaration" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_array_initializer" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_block" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_constructor_declaration" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_enum_constant" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_enum_declaration" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_method_declaration" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_switch" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_type_declaration" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_bracket_in_array_allocation_expression" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_bracket_in_array_reference" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_bracket_in_array_type_reference" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_annotation" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_annotation_type_member_declaration" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_catch" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_constructor_declaration" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_enum_constant" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_for" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_if" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_method_declaration" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_method_invocation" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_parenthesized_expression" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_switch" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_synchronized" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_try" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_while" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_parenthesized_expression_in_return" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_parenthesized_expression_in_throw" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_postfix_operator" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_prefix_operator" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_question_in_conditional" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_question_in_wildcard" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_semicolon" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_semicolon_in_for" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_semicolon_in_try_resources" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_unary_operator" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_between_brackets_in_array_type_reference" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_between_empty_braces_in_array_initializer" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_between_empty_brackets_in_array_allocation_expression" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_between_empty_parens_in_annotation_type_member_declaration" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_between_empty_parens_in_constructor_declaration" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_between_empty_parens_in_enum_constant" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_between_empty_parens_in_method_declaration" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_between_empty_parens_in_method_invocation" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.join_lines_in_comments" value="false"/>
+<setting id="org.eclipse.jdt.core.formatter.join_wrapped_lines" value="false"/>
+<setting id="org.eclipse.jdt.core.formatter.keep_else_statement_on_same_line" value="false"/>
+<setting id="org.eclipse.jdt.core.formatter.keep_empty_array_initializer_on_one_line" value="false"/>
+<setting id="org.eclipse.jdt.core.formatter.keep_imple_if_on_one_line" value="false"/>
+<setting id="org.eclipse.jdt.core.formatter.keep_then_statement_on_same_line" value="false"/>
+<setting id="org.eclipse.jdt.core.formatter.lineSplit" value="100"/>
+<setting id="org.eclipse.jdt.core.formatter.never_indent_block_comments_on_first_column" value="false"/>
+<setting id="org.eclipse.jdt.core.formatter.never_indent_line_comments_on_first_column" value="false"/>
+<setting id="org.eclipse.jdt.core.formatter.number_of_blank_lines_at_beginning_of_method_body" value="0"/>
+<setting id="org.eclipse.jdt.core.formatter.number_of_empty_lines_to_preserve" value="3"/>
+<setting id="org.eclipse.jdt.core.formatter.put_empty_statement_on_new_line" value="false"/>
+<setting id="org.eclipse.jdt.core.formatter.tabulation.char" value="space"/>
+<setting id="org.eclipse.jdt.core.formatter.tabulation.size" value="2"/>
+<setting id="org.eclipse.jdt.core.formatter.use_on_off_tags" value="false"/>
+<setting id="org.eclipse.jdt.core.formatter.use_tabs_only_for_leading_indentations" value="false"/>
+<setting id="org.eclipse.jdt.core.formatter.wrap_before_binary_operator" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.wrap_before_or_operator_multicatch" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.wrap_comment_inline_tags" value="false"/>
+<setting id="org.eclipse.jdt.core.formatter.wrap_non_simple_local_variable_annotation" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.wrap_non_simple_member_annotation" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.wrap_non_simple_package_annotation" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.wrap_non_simple_parameter_annotation" value="false"/>
+<setting id="org.eclipse.jdt.core.formatter.wrap_non_simple_type_annotation" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.wrap_outer_expressions_when_nested" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.wrap_prefer_two_fragments" value="false"/>
+</profile>
+</profiles>
diff --git a/etc/eclipseOrganizeImports.importorder b/etc/eclipseOrganizeImports.importorder
new file mode 100644
index 0000000..de96cae
--- /dev/null
+++ b/etc/eclipseOrganizeImports.importorder
@@ -0,0 +1,8 @@
+#Organize Import Order
+#Thu Sep 15 13:10:33 PDT 2016
+5=com.gemstone
+4=org.apache.geode
+3=
+2=javax
+1=java
+0=\#
diff --git a/etc/intellij-java-modified-google-style.xml b/etc/intellij-java-modified-google-style.xml
new file mode 100644
index 0000000..696500c
--- /dev/null
+++ b/etc/intellij-java-modified-google-style.xml
@@ -0,0 +1,255 @@
+<code_scheme name="GeodeStyle">
+  <option name="JAVA_INDENT_OPTIONS">
+    <value>
+      <option name="INDENT_SIZE" value="2" />
+      <option name="CONTINUATION_INDENT_SIZE" value="4" />
+      <option name="TAB_SIZE" value="8" />
+      <option name="USE_TAB_CHARACTER" value="false" />
+      <option name="SMART_TABS" value="false" />
+      <option name="LABEL_INDENT_SIZE" value="0" />
+      <option name="LABEL_INDENT_ABSOLUTE" value="false" />
+      <option name="USE_RELATIVE_INDENTS" value="false" />
+    </value>
+  </option>
+  <option name="OTHER_INDENT_OPTIONS">
+    <value>
+      <option name="INDENT_SIZE" value="2" />
+      <option name="CONTINUATION_INDENT_SIZE" value="4" />
+      <option name="TAB_SIZE" value="8" />
+      <option name="USE_TAB_CHARACTER" value="false" />
+      <option name="SMART_TABS" value="false" />
+      <option name="LABEL_INDENT_SIZE" value="0" />
+      <option name="LABEL_INDENT_ABSOLUTE" value="false" />
+      <option name="USE_RELATIVE_INDENTS" value="false" />
+    </value>
+  </option>
+  <option name="LINE_SEPARATOR" value="&#xA;" />
+  <option name="CLASS_COUNT_TO_USE_IMPORT_ON_DEMAND" value="99" />
+  <option name="NAMES_COUNT_TO_USE_IMPORT_ON_DEMAND" value="99" />
+  <option name="PACKAGES_TO_USE_IMPORT_ON_DEMAND">
+    <value />
+  </option>
+  <option name="IMPORT_LAYOUT_TABLE">
+    <value>
+      <package name="" withSubpackages="true" static="true" />
+      <emptyLine />
+      <package name="java" withSubpackages="true" static="false" />
+      <emptyLine />
+      <package name="javax" withSubpackages="true" static="false" />
+      <emptyLine />
+      <package name="" withSubpackages="true" static="false" />
+      <emptyLine />
+      <package name="org.apache.geode" withSubpackages="true" static="false" />
+      <emptyLine />
+      <package name="com.gemstone" withSubpackages="true" static="false" />
+    </value>
+  </option>
+  <option name="STATIC_METHODS_ORDER_WEIGHT" value="5" />
+  <option name="METHODS_ORDER_WEIGHT" value="4" />
+  <option name="RIGHT_MARGIN" value="100" />
+  <option name="JD_ALIGN_PARAM_COMMENTS" value="false" />
+  <option name="JD_ALIGN_EXCEPTION_COMMENTS" value="false" />
+  <option name="JD_ADD_BLANK_AFTER_DESCRIPTION" value="false" />
+  <option name="JD_P_AT_EMPTY_LINES" value="false" />
+  <option name="JD_KEEP_EMPTY_PARAMETER" value="false" />
+  <option name="JD_KEEP_EMPTY_EXCEPTION" value="false" />
+  <option name="JD_KEEP_EMPTY_RETURN" value="false" />
+  <option name="HTML_KEEP_BLANK_LINES" value="1" />
+  <option name="HTML_ALIGN_TEXT" value="true" />
+  <option name="KEEP_LINE_BREAKS" value="false" />
+  <option name="KEEP_FIRST_COLUMN_COMMENT" value="false" />
+  <option name="KEEP_CONTROL_STATEMENT_IN_ONE_LINE" value="false" />
+  <option name="KEEP_BLANK_LINES_IN_DECLARATIONS" value="1" />
+  <option name="KEEP_BLANK_LINES_IN_CODE" value="1" />
+  <option name="KEEP_BLANK_LINES_BEFORE_RBRACE" value="1" />
+  <option name="BLANK_LINES_AROUND_CLASS" value="0" />
+  <option name="BLANK_LINES_AROUND_FIELD" value="1" />
+  <option name="BLANK_LINES_AFTER_CLASS_HEADER" value="1" />
+  <option name="BRACE_STYLE" value="2" />
+  <option name="CLASS_BRACE_STYLE" value="2" />
+  <option name="METHOD_BRACE_STYLE" value="2" />
+  <option name="ELSE_ON_NEW_LINE" value="true" />
+  <option name="WHILE_ON_NEW_LINE" value="true" />
+  <option name="CATCH_ON_NEW_LINE" value="true" />
+  <option name="FINALLY_ON_NEW_LINE" value="true" />
+  <option name="ALIGN_MULTILINE_PARAMETERS_IN_CALLS" value="true" />
+  <option name="ALIGN_MULTILINE_BINARY_OPERATION" value="true" />
+  <option name="ALIGN_MULTILINE_ASSIGNMENT" value="true" />
+  <option name="ALIGN_MULTILINE_TERNARY_OPERATION" value="true" />
+  <option name="ALIGN_MULTILINE_THROWS_LIST" value="true" />
+  <option name="ALIGN_MULTILINE_EXTENDS_LIST" value="true" />
+  <option name="ALIGN_MULTILINE_PARENTHESIZED_EXPRESSION" value="true" />
+  <option name="ALIGN_MULTILINE_ARRAY_INITIALIZER_EXPRESSION" value="true" />
+  <option name="SPACE_AFTER_TYPE_CAST" value="false" />
+  <option name="SPACE_BEFORE_IF_PARENTHESES" value="false" />
+  <option name="SPACE_BEFORE_WHILE_PARENTHESES" value="false" />
+  <option name="SPACE_BEFORE_FOR_PARENTHESES" value="false" />
+  <option name="SPACE_BEFORE_CATCH_PARENTHESES" value="false" />
+  <option name="SPACE_BEFORE_SWITCH_PARENTHESES" value="false" />
+  <option name="SPACE_BEFORE_SYNCHRONIZED_PARENTHESES" value="false" />
+  <option name="CALL_PARAMETERS_WRAP" value="1" />
+  <option name="CALL_PARAMETERS_LPAREN_ON_NEXT_LINE" value="true" />
+  <option name="METHOD_PARAMETERS_WRAP" value="1" />
+  <option name="EXTENDS_LIST_WRAP" value="1" />
+  <option name="THROWS_LIST_WRAP" value="1" />
+  <option name="EXTENDS_KEYWORD_WRAP" value="1" />
+  <option name="THROWS_KEYWORD_WRAP" value="1" />
+  <option name="METHOD_CALL_CHAIN_WRAP" value="1" />
+  <option name="BINARY_OPERATION_WRAP" value="1" />
+  <option name="BINARY_OPERATION_SIGN_ON_NEXT_LINE" value="true" />
+  <option name="TERNARY_OPERATION_WRAP" value="1" />
+  <option name="TERNARY_OPERATION_SIGNS_ON_NEXT_LINE" value="true" />
+  <option name="FOR_STATEMENT_WRAP" value="1" />
+  <option name="ARRAY_INITIALIZER_WRAP" value="1" />
+  <option name="ASSIGNMENT_WRAP" value="5" />
+  <option name="WRAP_COMMENTS" value="true" />
+  <option name="IF_BRACE_FORCE" value="3" />
+  <option name="DOWHILE_BRACE_FORCE" value="3" />
+  <option name="WHILE_BRACE_FORCE" value="3" />
+  <option name="FOR_BRACE_FORCE" value="3" />
+  <JavaCodeStyleSettings>
+    <option name="DO_NOT_WRAP_AFTER_SINGLE_ANNOTATION" value="true" />
+    <option name="CLASS_NAMES_IN_JAVADOC" value="3" />
+  </JavaCodeStyleSettings>
+  <MarkdownNavigatorCodeStyleSettings>
+    <option name="RIGHT_MARGIN" value="72" />
+  </MarkdownNavigatorCodeStyleSettings>
+  <XML>
+    <option name="XML_KEEP_BLANK_LINES" value="1" />
+    <option name="XML_LEGACY_SETTINGS_IMPORTED" value="true" />
+  </XML>
+  <ADDITIONAL_INDENT_OPTIONS fileType="haml">
+    <option name="INDENT_SIZE" value="2" />
+  </ADDITIONAL_INDENT_OPTIONS>
+  <ADDITIONAL_INDENT_OPTIONS fileType="java">
+    <option name="INDENT_SIZE" value="2" />
+    <option name="CONTINUATION_INDENT_SIZE" value="4" />
+    <option name="TAB_SIZE" value="8" />
+  </ADDITIONAL_INDENT_OPTIONS>
+  <ADDITIONAL_INDENT_OPTIONS fileType="js">
+    <option name="CONTINUATION_INDENT_SIZE" value="4" />
+  </ADDITIONAL_INDENT_OPTIONS>
+  <ADDITIONAL_INDENT_OPTIONS fileType="sass">
+    <option name="INDENT_SIZE" value="2" />
+  </ADDITIONAL_INDENT_OPTIONS>
+  <ADDITIONAL_INDENT_OPTIONS fileType="yml">
+    <option name="INDENT_SIZE" value="2" />
+  </ADDITIONAL_INDENT_OPTIONS>
+  <codeStyleSettings language="ECMA Script Level 4">
+    <option name="KEEP_BLANK_LINES_IN_CODE" value="1" />
+    <option name="ALIGN_MULTILINE_PARAMETERS_IN_CALLS" value="true" />
+    <option name="ALIGN_MULTILINE_BINARY_OPERATION" value="true" />
+    <option name="ALIGN_MULTILINE_TERNARY_OPERATION" value="true" />
+    <option name="ALIGN_MULTILINE_EXTENDS_LIST" value="true" />
+    <option name="ALIGN_MULTILINE_ARRAY_INITIALIZER_EXPRESSION" value="true" />
+    <option name="CALL_PARAMETERS_WRAP" value="1" />
+    <option name="METHOD_PARAMETERS_WRAP" value="1" />
+    <option name="EXTENDS_LIST_WRAP" value="1" />
+    <option name="EXTENDS_KEYWORD_WRAP" value="1" />
+    <option name="METHOD_CALL_CHAIN_WRAP" value="1" />
+    <option name="BINARY_OPERATION_WRAP" value="1" />
+    <option name="BINARY_OPERATION_SIGN_ON_NEXT_LINE" value="true" />
+    <option name="TERNARY_OPERATION_WRAP" value="1" />
+    <option name="TERNARY_OPERATION_SIGNS_ON_NEXT_LINE" value="true" />
+    <option name="FOR_STATEMENT_WRAP" value="1" />
+    <option name="ARRAY_INITIALIZER_WRAP" value="1" />
+    <option name="ASSIGNMENT_WRAP" value="5" />
+    <option name="WRAP_COMMENTS" value="true" />
+    <option name="IF_BRACE_FORCE" value="3" />
+    <option name="DOWHILE_BRACE_FORCE" value="3" />
+    <option name="WHILE_BRACE_FORCE" value="3" />
+    <option name="FOR_BRACE_FORCE" value="3" />
+    <option name="PARENT_SETTINGS_INSTALLED" value="true" />
+  </codeStyleSettings>
+  <codeStyleSettings language="JAVA">
+    <option name="RIGHT_MARGIN" value="100" />
+    <option name="KEEP_CONTROL_STATEMENT_IN_ONE_LINE" value="false" />
+    <option name="KEEP_BLANK_LINES_IN_CODE" value="1" />
+    <option name="CALL_PARAMETERS_WRAP" value="1" />
+    <option name="METHOD_PARAMETERS_WRAP" value="1" />
+    <option name="EXTENDS_LIST_WRAP" value="1" />
+    <option name="THROWS_LIST_WRAP" value="1" />
+    <option name="EXTENDS_KEYWORD_WRAP" value="1" />
+    <option name="THROWS_KEYWORD_WRAP" value="1" />
+    <option name="METHOD_CALL_CHAIN_WRAP" value="1" />
+    <option name="BINARY_OPERATION_WRAP" value="1" />
+    <option name="BINARY_OPERATION_SIGN_ON_NEXT_LINE" value="true" />
+    <option name="TERNARY_OPERATION_WRAP" value="1" />
+    <option name="TERNARY_OPERATION_SIGNS_ON_NEXT_LINE" value="true" />
+    <option name="FOR_STATEMENT_WRAP" value="1" />
+    <option name="ARRAY_INITIALIZER_WRAP" value="1" />
+    <option name="ASSIGNMENT_WRAP" value="5" />
+    <option name="IF_BRACE_FORCE" value="3" />
+    <option name="DOWHILE_BRACE_FORCE" value="3" />
+    <option name="WHILE_BRACE_FORCE" value="3" />
+    <option name="FOR_BRACE_FORCE" value="3" />
+    <indentOptions>
+      <option name="INDENT_SIZE" value="2" />
+      <option name="CONTINUATION_INDENT_SIZE" value="4" />
+      <option name="TAB_SIZE" value="8" />
+    </indentOptions>
+  </codeStyleSettings>
+  <codeStyleSettings language="JavaScript">
+    <option name="KEEP_BLANK_LINES_IN_CODE" value="1" />
+    <option name="ALIGN_MULTILINE_PARAMETERS_IN_CALLS" value="true" />
+    <option name="ALIGN_MULTILINE_BINARY_OPERATION" value="true" />
+    <option name="ALIGN_MULTILINE_TERNARY_OPERATION" value="true" />
+    <option name="ALIGN_MULTILINE_ARRAY_INITIALIZER_EXPRESSION" value="true" />
+    <option name="CALL_PARAMETERS_WRAP" value="1" />
+    <option name="METHOD_PARAMETERS_WRAP" value="1" />
+    <option name="METHOD_CALL_CHAIN_WRAP" value="1" />
+    <option name="BINARY_OPERATION_WRAP" value="1" />
+    <option name="BINARY_OPERATION_SIGN_ON_NEXT_LINE" value="true" />
+    <option name="TERNARY_OPERATION_WRAP" value="1" />
+    <option name="TERNARY_OPERATION_SIGNS_ON_NEXT_LINE" value="true" />
+    <option name="FOR_STATEMENT_WRAP" value="1" />
+    <option name="ARRAY_INITIALIZER_WRAP" value="1" />
+    <option name="ASSIGNMENT_WRAP" value="5" />
+    <option name="WRAP_COMMENTS" value="true" />
+    <option name="IF_BRACE_FORCE" value="3" />
+    <option name="DOWHILE_BRACE_FORCE" value="3" />
+    <option name="WHILE_BRACE_FORCE" value="3" />
+    <option name="FOR_BRACE_FORCE" value="3" />
+    <option name="PARENT_SETTINGS_INSTALLED" value="true" />
+  </codeStyleSettings>
+  <codeStyleSettings language="PHP">
+    <option name="KEEP_CONTROL_STATEMENT_IN_ONE_LINE" value="false" />
+    <option name="KEEP_BLANK_LINES_IN_CODE" value="1" />
+    <option name="BLANK_LINES_AFTER_CLASS_HEADER" value="1" />
+    <option name="ALIGN_MULTILINE_ASSIGNMENT" value="true" />
+    <option name="ALIGN_MULTILINE_TERNARY_OPERATION" value="true" />
+    <option name="ALIGN_MULTILINE_THROWS_LIST" value="true" />
+    <option name="ALIGN_MULTILINE_EXTENDS_LIST" value="true" />
+    <option name="ALIGN_MULTILINE_PARENTHESIZED_EXPRESSION" value="true" />
+    <option name="CALL_PARAMETERS_WRAP" value="1" />
+    <option name="METHOD_PARAMETERS_WRAP" value="1" />
+    <option name="EXTENDS_LIST_WRAP" value="1" />
+    <option name="THROWS_LIST_WRAP" value="1" />
+    <option name="EXTENDS_KEYWORD_WRAP" value="1" />
+    <option name="THROWS_KEYWORD_WRAP" value="1" />
+    <option name="METHOD_CALL_CHAIN_WRAP" value="1" />
+    <option name="BINARY_OPERATION_WRAP" value="1" />
+    <option name="BINARY_OPERATION_SIGN_ON_NEXT_LINE" value="true" />
+    <option name="TERNARY_OPERATION_WRAP" value="1" />
+    <option name="TERNARY_OPERATION_SIGNS_ON_NEXT_LINE" value="true" />
+    <option name="FOR_STATEMENT_WRAP" value="1" />
+    <option name="ARRAY_INITIALIZER_WRAP" value="1" />
+    <option name="ASSIGNMENT_WRAP" value="5" />
+    <option name="WRAP_COMMENTS" value="true" />
+    <option name="IF_BRACE_FORCE" value="3" />
+    <option name="DOWHILE_BRACE_FORCE" value="3" />
+    <option name="WHILE_BRACE_FORCE" value="3" />
+    <option name="FOR_BRACE_FORCE" value="3" />
+    <option name="PARENT_SETTINGS_INSTALLED" value="true" />
+    <indentOptions>
+      <option name="INDENT_SIZE" value="4" />
+      <option name="CONTINUATION_INDENT_SIZE" value="8" />
+      <option name="TAB_SIZE" value="4" />
+      <option name="USE_TAB_CHARACTER" value="false" />
+      <option name="SMART_TABS" value="false" />
+      <option name="LABEL_INDENT_SIZE" value="0" />
+      <option name="LABEL_INDENT_ABSOLUTE" value="false" />
+      <option name="USE_RELATIVE_INDENTS" value="false" />
+    </indentOptions>
+  </codeStyleSettings>
+</code_scheme>
diff --git a/gradle.properties b/gradle.properties
new file mode 100644
index 0000000..7e073d9
--- /dev/null
+++ b/gradle.properties
@@ -0,0 +1,26 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+version = '1.0-SNAPSHOT'
+group = 'org.apache.geode'
+
+
+# 'apply from:' location for gradle scripts, relative to the project root.  Specified here so that
+# it may be overridden by external projects or custom development environment configurations
+scriptDir = gradle
+
+productName = Apache Geode Kafka Connector
+productOrg = Apache Software Foundation (ASF)
+
diff --git a/gradle/rat.gradle b/gradle/rat.gradle
new file mode 100644
index 0000000..75ccac3
--- /dev/null
+++ b/gradle/rat.gradle
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+apply plugin: "org.nosphere.apache.rat"
+
+rat {
+    inputDir = rootDir
+    excludes = [
+            // git
+            '.git/**',
+            '**/.gitignore',
+            '**/.gitkeep',
+
+            // gradle
+            '**/.gradle/**',
+            'gradlew',
+            'gradlew.bat',
+            'gradle/wrapper/gradle-wrapper.properties',
+            'caches/**',
+            'daemon/**',
+            'native/**',
+            'wrapper/**',
+            '**/build/**',
+            '**/build-*/**',
+            '.buildinfo',
+            // IDE
+            'etc/eclipse-java-google-style.xml',
+            'etc/intellij-java-modified-google-style.xml',
+            'etc/eclipseOrganizeImports.importorder',
+            '**/.project',
+            '**/.classpath',
+            '**/.settings/**',
+            '**/build-eclipse/**',
+            '**/*.iml',
+            '**/*.ipr',
+            '**/*.iws',
+            '.idea/**',
+            '**/tags',
+            '**/out/**',
+
+            // text files
+            '**/*.fig',
+            '**/*.txt',
+            '**/*.md',
+            '**/*.json',
+            '**/*.tx0',
+            '**/*.txo',
+            '**/*.log',
+            '**/*.patch',
+            '**/*.diff',
+            '**/*.rej',
+            '**/*.orig',
+            '**/*.MF',
+
+            // binary files
+            '**/*.cer',
+            '**/*.dia',
+            '**/*.gfs',
+            '**/*.gif',
+            '**/*.ico',
+            '**/*.jpg',
+            '**/*.keystore',
+            '**/*.pdf',
+            '**/*.png',
+            '**/*.ser',
+            '**/*.svg',
+            '**/*.truststore',
+            '**/*.xls',
+            '**/publickeyfile',
+            '**/*.dat',
+
+            // other text files
+            '**/log4j*.xml',
+            '.java-version', // file created by `jenv local`
+            '**/META-INF/**',
+    ]
+}
+
+check.dependsOn rat
\ No newline at end of file
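
Note on the exclude list above: RAT treats these entries as Ant-style globs matched against paths under inputDir. A minimal Java sketch of the matching idea, using java.nio glob syntax as an approximation of RAT's semantics:

    import java.nio.file.FileSystems;
    import java.nio.file.Path;
    import java.nio.file.PathMatcher;
    import java.nio.file.Paths;

    public class RatExcludeDemo {
      public static void main(String[] args) {
        // '**' crosses directory boundaries, so '**/build/**' skips any build output tree
        PathMatcher buildOutput = FileSystems.getDefault().getPathMatcher("glob:**/build/**");
        Path generated = Paths.get("sub/build/classes/Foo.class");
        Path source = Paths.get("src/main/java/Foo.java");
        System.out.println(buildOutput.matches(generated)); // true: excluded from the audit
        System.out.println(buildOutput.matches(source));    // false: RAT checks its header
      }
    }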
diff --git a/gradle/spotless.gradle b/gradle/spotless.gradle
new file mode 100644
index 0000000..70d3c95
--- /dev/null
+++ b/gradle/spotless.gradle
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+// When a custom step changes, we need to bump the value passed to the method
+//   bumpThisNumberIfACustomStepChanges
+// Historically, this has been easy to forget, and forgetting can cause failures in some rare cases.
+// To safeguard against this, we instead use the (partial) md5 of this file as that method input.
+def thisFile = file("${rootDir}/${scriptDir}/spotless.gradle")
+def thisFileMd5 = thisFile.text.md5() as String
+def thisFileMd5Piece = thisFileMd5.substring(0, 8)
+def thisFileIntegerHash = Integer.parseUnsignedInt(thisFileMd5Piece, 16)
+logger.debug("Using partial md5 (${thisFileIntegerHash}) of file ${thisFile} as spotless bump.")
+
+project.ext.set("spotless-file-hash", thisFileIntegerHash)
+
+
+apply plugin: "com.diffplug.gradle.spotless"
+spotless {
+    lineEndings = 'unix'
+    java {
+        target project.fileTree(project.projectDir) {
+            include '**/*.java'
+            exclude '**/generated-src/**'
+            exclude '**/build/**'
+        }
+
+        // As the method name suggests, bump this number if any of the below "custom" rules change.
+        // Spotless will not run on unchanged files unless this number changes.
+        bumpThisNumberIfACustomStepChanges(project.ext.'spotless-file-hash')
+
+        removeUnusedImports()
+
+        custom 'Remove commented-out import statements', {
+            it.replaceAll(/\n\/\/ import .*?;.*/, '')
+        }
+        custom 'Refuse wildcard imports', {
+            // Wildcard imports can't be resolved by spotless itself,
+            // so developers must resolve them by hand and adhere to best practices.
+            if (it =~ /\nimport .*\*;/) {
+                throw new AssertionError("Do not use wildcard imports.  'spotlessApply' cannot resolve this issue.")
+            }
+        }
+//        custom 'Refuse Awaitility import', {
+//            if (it =~ /import\s+(static\s+)?org.awaitility.Awaitility.*/) {
+//                throw new AssertionError("Do not use Awaitility.await(). Use GeodeAwaitility.await() instead. 'spotlessApply' cannot resolve this issue.")
+//            }
+//        }
+        importOrderFile "${rootDir}/${scriptDir}/../etc/eclipseOrganizeImports.importorder"
+
+        custom 'Remove unhelpful javadoc stubs', {
+            // e.g., remove the following lines:
+            // "* @param paramName"
+            // "* @throws ExceptionType"
+            // "* @return returnType"'
+            // Multiline to allow anchors on newlines
+            it.replaceAll(/(?m)^ *\* *@(?:param|throws|return) *\w* *\n/, '')
+        }
+        custom 'Remove any empty Javadocs and block comments', {
+            // Matches any /** [...] */ or /* [...] */ that contains:
+            // (a) only whitespace
+            // (b) trivial information, such as "@param paramName" or @throws ExceptionType
+            //     without any additional information.  This information is implicit in the signature.
+            it.replaceAll(/\/\*+\s*\n(\s*\*\s*\n)*\s*\*+\/\s*\n/, '')
+        }
+
+        // Enforce style modifier order
+        custom 'Modifier ordering', {
+            def modifierRanking = [
+                    "public"      : 1,
+                    "protected"   : 2,
+                    "private"     : 3,
+                    "abstract"    : 4,
+                    "default"     : 5,
+                    "static"      : 6,
+                    "final"       : 7,
+                    "transient"   : 8,
+                    "volatile"    : 9,
+                    "synchronized": 10,
+                    "native"      : 11,
+                    "strictfp"    : 12]
+            // Find any instance of multiple modifiers. Lead with a non-word character to avoid
+            // accidentally matching, for instance, "an alternative default value"
+            it.replaceAll(/\W(?:public |protected |private |abstract |default |static |final |transient |volatile |synchronized |native |strictfp ){2,}/, {
+                // Do not replace the leading non-word character; match only the modifiers
+                it.replaceAll(/(?:public |protected |private |abstract |default |static |final |transient |volatile |synchronized |native |strictfp ){2,}/, {
+                    // Sort the modifiers according to the ranking above
+                    it.split().sort({ modifierRanking[it] }).join(' ') + ' '
+                }
+                )
+            }
+            )
+        }
+
+
+        // Notes on eclipse formatter version:
+        // 4.6.3 is consistent with existing / previous behavior.
+        // 4.7.1 works, but had different default whitespace rules, notably with mid-ternary linebreak.
+        // 4.7.2 exists but is currently incompatible with our style file, raising NPEs.
+
+        // The format file path is resolved via ${scriptDir} rather than hard-coded, since that
+        // location may be overridden when this build is included in a different gradle project.
+        eclipse('4.6.3').configFile "${rootDir}/${scriptDir}/../etc/eclipse-java-google-style.xml"
+        trimTrailingWhitespace()
+        endWithNewline()
+    }
+
+
+    groovyGradle {
+        target project.fileTree(project.projectDir) {
+            include '**/*.gradle'
+            exclude '**/generated-src/**'
+            exclude '**/build/**'
+        }
+
+        // As the method name suggests, bump this number if any of the below "custom" rules change.
+        // Spotless will not run on unchanged files unless this number changes.
+        bumpThisNumberIfACustomStepChanges(project.ext.'spotless-file-hash')
+
+        custom 'Use single-quote in project directives.', {
+            it.replaceAll(/project\(":([^"]*)"\)/, 'project(\':$1\')')
+        }
+
+        custom 'Use parenthesis in single-line gradle dependency declarations.', {
+            it.replaceAll(/\n(\s*\S*(?:[cC]ompile|[rR]untime)(?:Only)?) (?!\()([^{\n]*)\n/, { original, declaration, dep ->
+                "\n${declaration}(${dep})\n"
+            })
+        }
+
+        custom 'Do not pad spaces before parenthesis in gradle dependency declaration.', {
+            it.replaceAll(/\n(\s*\S*(?:[cC]ompile|[rR]untime)(?:Only)?) +\(/, '\n$1(')
+        }
+
+        indentWithSpaces(2)
+    }
+}
+
+// If we add more languages to Spotless, add them to 'compileXYZ' trigger below
+afterEvaluate {
+    // Not all projects are java projects.  findByName could return null, so use the null-safe ?. operator
+    project.tasks.findByName('compileJava')?.mustRunAfter(spotlessCheck)
+    project.tasks.findByName('compileJava')?.mustRunAfter(spotlessApply)
+
+    // Within the configure block, 'project' refers to the task-owning project, in this case rootProject
+    def thisProjectScoped = project
+    rootProject.tasks.named('devBuild').configure {
+        dependsOn thisProjectScoped.tasks.named('spotlessApply')
+    }
+}
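
For reference, the md5-as-bump trick above amounts to hashing the script text and taking the first eight hex digits as an unsigned int. A standalone Java sketch of the same fingerprinting; the script text here is a stand-in:

    import java.math.BigInteger;
    import java.nio.charset.StandardCharsets;
    import java.security.MessageDigest;

    public class SpotlessBumpDemo {
      public static void main(String[] args) throws Exception {
        String scriptText = "spotless { /* ... */ }"; // stand-in for spotless.gradle contents
        byte[] digest = MessageDigest.getInstance("MD5")
            .digest(scriptText.getBytes(StandardCharsets.UTF_8));
        String hex = String.format("%032x", new BigInteger(1, digest));
        // Any edit to the script changes this value, invalidating spotless' up-to-date checks
        int bump = Integer.parseUnsignedInt(hex.substring(0, 8), 16);
        System.out.println(bump);
      }
    }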
diff --git a/settings.gradle b/settings.gradle
index 48eed3b..546b8ec 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -1 +1,15 @@
+//Licensed to the Apache Software Foundation (ASF) under one or more
+//contributor license agreements.  See the NOTICE file distributed with
+//this work for additional information regarding copyright ownership.
+//The ASF licenses this file to You under the Apache License, Version 2.0
+//(the "License"); you may not use this file except in compliance with
+//the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+//Unless required by applicable law or agreed to in writing, software
+//distributed under the License is distributed on an "AS IS" BASIS,
+//WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//See the License for the specific language governing permissions and
+//limitations under the License.
 rootProject.name = 'geode-kafka-connector'
\ No newline at end of file
diff --git a/src/main/java/geode/kafka/GeodeConnectorConfig.java b/src/main/java/geode/kafka/GeodeConnectorConfig.java
index efffdcd..7702765 100644
--- a/src/main/java/geode/kafka/GeodeConnectorConfig.java
+++ b/src/main/java/geode/kafka/GeodeConnectorConfig.java
@@ -23,105 +23,106 @@
 
 public class GeodeConnectorConfig {
 
-    //GeodeKafka Specific Configuration
-    /**
-     * Identifier for each task
-     */
-    public static final String TASK_ID = "GEODE_TASK_ID"; //One config per task
-    /**
-     * Specifies which Locators to connect to Apache Geode
-     */
-    public static final String LOCATORS = "locators";
-    public static final String DEFAULT_LOCATOR = "localhost[10334]";
-    public static final String SECURITY_CLIENT_AUTH_INIT = "security-client-auth-init";
+  // GeodeKafka Specific Configuration
+  /**
+   * Identifier for each task
+   */
+  public static final String TASK_ID = "GEODE_TASK_ID"; // One config per task
+  /**
+   * Specifies which Locators to connect to Apache Geode
+   */
+  public static final String LOCATORS = "locators";
+  public static final String DEFAULT_LOCATOR = "localhost[10334]";
+  public static final String SECURITY_CLIENT_AUTH_INIT = "security-client-auth-init";
 
-    protected final int taskId;
-    protected List<LocatorHostPort> locatorHostPorts;
-    private String securityClientAuthInit;
+  protected final int taskId;
+  protected List<LocatorHostPort> locatorHostPorts;
+  private String securityClientAuthInit;
 
-    protected GeodeConnectorConfig() {
-        taskId = 0;
+  protected GeodeConnectorConfig() {
+    taskId = 0;
+  }
+
+  public GeodeConnectorConfig(Map<String, String> connectorProperties) {
+    taskId = Integer.parseInt(connectorProperties.get(TASK_ID));
+    locatorHostPorts = parseLocators(connectorProperties.get(GeodeConnectorConfig.LOCATORS));
+    securityClientAuthInit = connectorProperties.get(SECURITY_CLIENT_AUTH_INIT);
+  }
+
+
+  public static Map<String, List<String>> parseTopicToRegions(String combinedBindings) {
+    // It's the same formatting, so parsing is the same going topic to region or region to topic
+    return parseRegionToTopics(combinedBindings);
+  }
+
+  /**
+   * Given a string of the form [region:topic,...], produces a map where the key is the
+   * regionName and the value is a list of topicNames to push values to
+   *
+   * @param combinedBindings a string of the form "[region:topic,...],[region2:topic2,...]"
+   * @return mapping of regionName to list of topics to update
+   */
+  public static Map<String, List<String>> parseRegionToTopics(String combinedBindings) {
+    if (combinedBindings == null || combinedBindings.equals("")) {
+      return new HashMap<>();
     }
+    List<String> bindings = parseBindings(combinedBindings);
+    return bindings.stream().map(binding -> {
+      String[] regionToTopicsArray = parseBinding(binding);
+      return regionToTopicsArray;
+    }).collect(Collectors.toMap(regionToTopicsArray -> regionToTopicsArray[0],
+        regionToTopicsArray -> parseStringByComma(regionToTopicsArray[1])));
+  }
 
-    public GeodeConnectorConfig(Map<String, String> connectorProperties) {
-        taskId = Integer.parseInt(connectorProperties.get(TASK_ID));
-        locatorHostPorts = parseLocators(connectorProperties.get(GeodeConnectorConfig.LOCATORS));
-        securityClientAuthInit = connectorProperties.get(SECURITY_CLIENT_AUTH_INIT);
-    }
+  public static List<String> parseBindings(String bindings) {
+    return Arrays.stream(bindings.split("](\\s)*,")).map((s) -> {
+      s = s.replaceAll("\\[", "");
+      s = s.replaceAll("\\]", "");
+      s = s.trim();
+      return s;
+    }).collect(Collectors.toList());
+  }
 
+  private static String[] parseBinding(String binding) {
+    return binding.split(":");
+  }
 
-    public static Map<String, List<String>> parseTopicToRegions(String combinedBindings) {
-        //It's the same formatting, so parsing is the same going topic to region or region to topic
-        return parseRegionToTopics(combinedBindings);
-    }
+  // Used to parse a string of topics or regions
+  public static List<String> parseStringByComma(String string) {
+    return parseStringBy(string, ",");
+  }
 
-    /**
-     * Given a string of the form [region:topic,...] will produce a map where the key is the
-     * regionName and the value is a list of topicNames to push values to
-     *
-     * @param combinedBindings a string with similar form to "[region:topic,...], [region2:topic2,...]
-     * @return mapping of regionName to list of topics to update
-     */
-    public static Map<String, List<String>> parseRegionToTopics(String combinedBindings) {
-        if (combinedBindings == null || combinedBindings.equals("")){
-            return new HashMap();
-        }
-        List<String> bindings = parseBindings(combinedBindings);
-        return bindings.stream().map(binding -> {
-            String[] regionToTopicsArray = parseBinding(binding);
-            return regionToTopicsArray;
-        }).collect(Collectors.toMap(regionToTopicsArray -> regionToTopicsArray[0], regionToTopicsArray -> parseStringByComma(regionToTopicsArray[1])));
-    }
+  public static List<String> parseStringBy(String string, String regex) {
+    return Arrays.stream(string.split(regex)).map((s) -> s.trim()).collect(Collectors.toList());
+  }
 
-    public static List<String> parseBindings(String bindings) {
-        return Arrays.stream(bindings.split("](\\s)*,")).map((s) -> {
-            s = s.replaceAll("\\[", "");
-            s = s.replaceAll("\\]", "");
-            s = s.trim();
-            return s;
-        }).collect(Collectors.toList());
-    }
+  public static String reconstructString(Collection<String> strings) {
+    return strings.stream().collect(Collectors.joining("],["));
+  }
 
-    private static String[] parseBinding(String binding) {
-        return binding.split(":");
-    }
+  List<LocatorHostPort> parseLocators(String locators) {
+    return Arrays.stream(locators.split(",")).map((s) -> {
+      String locatorString = s.trim();
+      return parseLocator(locatorString);
+    }).collect(Collectors.toList());
+  }
 
-    //Used to parse a string of topics or regions
-    public static List<String> parseStringByComma(String string) {
-        return parseStringBy(string, ",");
-    }
+  private LocatorHostPort parseLocator(String locatorString) {
+    String[] splits = locatorString.split("\\[");
+    String locator = splits[0];
+    int port = Integer.parseInt(splits[1].replace("]", ""));
+    return new LocatorHostPort(locator, port);
+  }
 
-    public static List<String> parseStringBy(String string, String regex) {
-        return Arrays.stream(string.split(regex)).map((s) -> s.trim()).collect(Collectors.toList());
-    }
+  public int getTaskId() {
+    return taskId;
+  }
 
-    public static String reconstructString(Collection<String> strings) {
-        return strings.stream().collect(Collectors.joining("],["));
-    }
+  public List<LocatorHostPort> getLocatorHostPorts() {
+    return locatorHostPorts;
+  }
 
-    List<LocatorHostPort> parseLocators(String locators) {
-        return Arrays.stream(locators.split(",")).map((s) -> {
-            String locatorString = s.trim();
-            return parseLocator(locatorString);
-        }).collect(Collectors.toList());
-    }
-
-    private LocatorHostPort parseLocator(String locatorString) {
-        String[] splits = locatorString.split("\\[");
-        String locator = splits[0];
-        int port = Integer.parseInt(splits[1].replace("]", ""));
-        return new LocatorHostPort(locator, port);
-    }
-
-    public int getTaskId() {
-        return taskId;
-    }
-
-    public List<LocatorHostPort> getLocatorHostPorts() {
-        return locatorHostPorts;
-    }
-
-    public String getSecurityClientAuthInit() {
-        return securityClientAuthInit;
-    }
+  public String getSecurityClientAuthInit() {
+    return securityClientAuthInit;
+  }
 }
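
A quick usage sketch of the binding parser above; the region and topic names are made up:

    import java.util.List;
    import java.util.Map;

    import geode.kafka.GeodeConnectorConfig;

    public class BindingParseDemo {
      public static void main(String[] args) {
        Map<String, List<String>> regionToTopics = GeodeConnectorConfig
            .parseRegionToTopics("[regionA:topicA1,topicA2],[regionB:topicB]");
        // Prints {regionA=[topicA1, topicA2], regionB=[topicB]} (map order may vary)
        System.out.println(regionToTopics);
      }
    }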
diff --git a/src/main/java/geode/kafka/GeodeContext.java b/src/main/java/geode/kafka/GeodeContext.java
index ff8a8c3..ba5f9e5 100644
--- a/src/main/java/geode/kafka/GeodeContext.java
+++ b/src/main/java/geode/kafka/GeodeContext.java
@@ -14,6 +14,13 @@
  */
 package geode.kafka;
 
+import static geode.kafka.GeodeConnectorConfig.SECURITY_CLIENT_AUTH_INIT;
+
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.kafka.connect.errors.ConnectException;
+
 import org.apache.geode.cache.client.ClientCache;
 import org.apache.geode.cache.client.ClientCacheFactory;
 import org.apache.geode.cache.query.CqAttributes;
@@ -21,78 +28,70 @@
 import org.apache.geode.cache.query.CqExistsException;
 import org.apache.geode.cache.query.CqQuery;
 import org.apache.geode.cache.query.RegionNotFoundException;
-import org.apache.kafka.connect.errors.ConnectException;
-
-import java.util.Collection;
-import java.util.List;
-
-import static geode.kafka.GeodeConnectorConfig.SECURITY_CLIENT_AUTH_INIT;
 
 public class GeodeContext {
 
-    private ClientCache clientCache;
+  private ClientCache clientCache;
 
 
-    public GeodeContext() {
+  public GeodeContext() {}
+
+  public ClientCache connectClient(List<LocatorHostPort> locatorHostPortList,
+      String durableClientId, String durableClientTimeout, String securityAuthInit) {
+    clientCache = createClientCache(locatorHostPortList, durableClientId, durableClientTimeout,
+        securityAuthInit);
+    return clientCache;
+  }
+
+  public ClientCache connectClient(List<LocatorHostPort> locatorHostPortList,
+      String securityAuthInit) {
+    clientCache = createClientCache(locatorHostPortList, "", "", securityAuthInit);
+    return clientCache;
+  }
+
+  public ClientCache getClientCache() {
+    return clientCache;
+  }
+
+  public ClientCache createClientCache(List<LocatorHostPort> locators, String durableClientName,
+      String durableClientTimeOut, String securityAuthInit) {
+    ClientCacheFactory ccf = new ClientCacheFactory();
+
+    if (securityAuthInit != null) {
+      ccf.set(SECURITY_CLIENT_AUTH_INIT, securityAuthInit);
     }
-
-    public ClientCache connectClient(List<LocatorHostPort> locatorHostPortList, String durableClientId, String durableClientTimeout, String securityAuthInit) {
-        clientCache = createClientCache(locatorHostPortList, durableClientId, durableClientTimeout, securityAuthInit);
-        return clientCache;
+    if (!durableClientName.equals("")) {
+      ccf.set("durable-client-id", durableClientName)
+          .set("durable-client-timeout", durableClientTimeOut);
     }
+    // currently we only allow using the default pool.
+    // If we ever want to allow adding multiple pools we'll have to configure pool factories
+    ccf.setPoolSubscriptionEnabled(true);
 
-    public ClientCache connectClient(List<LocatorHostPort> locatorHostPortList, String securityAuthInit) {
-        clientCache = createClientCache(locatorHostPortList, "", "", securityAuthInit);
-        return clientCache;
+    for (LocatorHostPort locator : locators) {
+      ccf.addPoolLocator(locator.getHostName(), locator.getPort());
     }
+    return ccf.create();
+  }
 
-    public ClientCache getClientCache() {
-        return clientCache;
+  public CqQuery newCq(String name, String query, CqAttributes cqAttributes, boolean isDurable)
+      throws ConnectException {
+    try {
+      CqQuery cq = clientCache.getQueryService().newCq(name, query, cqAttributes, isDurable);
+      cq.execute();
+      return cq;
+    } catch (RegionNotFoundException | CqException | CqExistsException e) {
+      throw new ConnectException(e);
     }
+  }
 
-    /**
-     *
-     * @param locators
-     * @param durableClientName
-     * @param durableClientTimeOut
-     * @return
-     */
-    public ClientCache createClientCache(List<LocatorHostPort> locators, String durableClientName, String durableClientTimeOut, String securityAuthInit) {
-        ClientCacheFactory ccf = new ClientCacheFactory();
-
-        if (securityAuthInit != null) {
-            ccf.set(SECURITY_CLIENT_AUTH_INIT, securityAuthInit);
-        }
-        if (!durableClientName.equals("")) {
-            ccf.set("durable-client-id", durableClientName)
-                    .set("durable-client-timeout", durableClientTimeOut);
-        }
-        //currently we only allow using the default pool.
-        //If we ever want to allow adding multiple pools we'll have to configure pool factories
-        ccf.setPoolSubscriptionEnabled(true);
-
-        for (LocatorHostPort locator: locators) {
-            ccf.addPoolLocator(locator.getHostName(), locator.getPort());
-        }
-        return ccf.create();
+  public Collection newCqWithInitialResults(String name, String query, CqAttributes cqAttributes,
+      boolean isDurable) throws ConnectException {
+    try {
+      CqQuery cq = clientCache.getQueryService().newCq(name, query, cqAttributes, isDurable);
+      return cq.executeWithInitialResults();
+    } catch (RegionNotFoundException | CqException | CqExistsException e) {
+      throw new ConnectException(e);
     }
-
-    public CqQuery newCq(String name, String query, CqAttributes cqAttributes, boolean isDurable) throws ConnectException {
-        try {
-            CqQuery cq = clientCache.getQueryService().newCq(name, query, cqAttributes, isDurable);
-            cq.execute();
-            return cq;
-        } catch (RegionNotFoundException | CqException | CqExistsException e) {
-            throw new ConnectException(e);
-        }
-    }
-
-    public Collection newCqWithInitialResults(String name, String query, CqAttributes cqAttributes, boolean isDurable) throws ConnectException {
-        try {
-            CqQuery cq = clientCache.getQueryService().newCq(name, query, cqAttributes, isDurable);
-            return cq.executeWithInitialResults();
-        } catch (RegionNotFoundException | CqException | CqExistsException e) {
-            throw new ConnectException(e);
-        }
-    }
+  }
 }
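
For orientation, this is roughly how the tasks below wire up a GeodeContext. The sketch assumes a locator running on localhost[10334]; the durable id and timeout are placeholders:

    import java.util.Collections;

    import geode.kafka.GeodeContext;
    import geode.kafka.LocatorHostPort;

    public class GeodeContextDemo {
      public static void main(String[] args) {
        GeodeContext context = new GeodeContext();
        // Durable variant: server-side subscription queues survive client reconnects
        context.connectClient(
            Collections.singletonList(new LocatorHostPort("localhost", 10334)),
            "demo-durable-client", "60", null /* no security-client-auth-init */);
        context.getClientCache().close();
      }
    }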
diff --git a/src/main/java/geode/kafka/LocatorHostPort.java b/src/main/java/geode/kafka/LocatorHostPort.java
index 21d0603..191d42d 100644
--- a/src/main/java/geode/kafka/LocatorHostPort.java
+++ b/src/main/java/geode/kafka/LocatorHostPort.java
@@ -16,22 +16,23 @@
 
 public class LocatorHostPort {
 
-    private String hostName;
-    private int port;
+  private String hostName;
+  private int port;
 
-    public LocatorHostPort(String hostName, int port) {
-        this.hostName = hostName;
-        this.port = port;
-    }
+  public LocatorHostPort(String hostName, int port) {
+    this.hostName = hostName;
+    this.port = port;
+  }
 
-    public String getHostName() {
-        return hostName;
-    }
+  public String getHostName() {
+    return hostName;
+  }
 
-    public int getPort() {
-        return port;
-    }
-    public String toString() {
-        return hostName + "[" + port + "]";
-    }
+  public int getPort() {
+    return port;
+  }
+
+  public String toString() {
+    return hostName + "[" + port + "]";
+  }
 }
diff --git a/src/main/java/geode/kafka/sink/BatchRecords.java b/src/main/java/geode/kafka/sink/BatchRecords.java
index c10faaf..0221cbe 100644
--- a/src/main/java/geode/kafka/sink/BatchRecords.java
+++ b/src/main/java/geode/kafka/sink/BatchRecords.java
@@ -14,69 +14,67 @@
  */
 package geode.kafka.sink;
 
-import geode.kafka.source.GeodeKafkaSourceTask;
-import org.apache.geode.cache.Region;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
 import org.apache.kafka.connect.sink.SinkRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import org.apache.geode.cache.Region;
 
 /**
  * A collection of records to put/remove from a region
  */
 public class BatchRecords {
-    private static final Logger logger = LoggerFactory.getLogger(BatchRecords.class);
+  private static final Logger logger = LoggerFactory.getLogger(BatchRecords.class);
 
-    private Map updateMap;
-    private Collection removeList;
+  private Map updateMap;
+  private Collection removeList;
 
-    public BatchRecords() {
-        this(new HashMap(), new ArrayList());
+  public BatchRecords() {
+    this(new HashMap(), new ArrayList());
+  }
+
+  /** Used for tests */
+  public BatchRecords(Map updateMap, Collection removeList) {
+    this.updateMap = updateMap;
+    this.removeList = removeList;
+  }
+
+  public void addRemoveOperation(SinkRecord record) {
+    // If a previous operation added this key to the update map,
+    // just remove it there so we don't do a put and then a remove.
+    // Depending on the order of operations (putAll then removeAll, or removeAll then putAll)
+    // we could remove one of these if statements.
+    if (updateMap.containsKey(record.key())) {
+      updateMap.remove(record.key());
+    } else {
+      removeList.add(record.key());
     }
+  }
 
-    /** Used for tests**/
-    public BatchRecords(Map updateMap, Collection removeList) {
-        this.updateMap = updateMap;
-        this.removeList = removeList;
+  public void addUpdateOperation(SinkRecord record, boolean nullValuesMeansRemove) {
+    // It's assumed the incoming records are in order;
+    // if so, and a previous value for this key is in the remove list,
+    // drop it from that list so it isn't removed at the end of this operation.
+    if (nullValuesMeansRemove) {
+      if (removeList.contains(record.key())) {
+        removeList.remove(record.key());
+      }
     }
+    updateMap.put(record.key(), record.value());
+  }
 
-    public void addRemoveOperation(SinkRecord record) {
-        //if a previous operation added to the update map
-        //let's just remove it so we don't do a put and then a remove
-        //depending on the order of operations (putAll then removeAll or removeAll or putAll)...
-        //...we could remove one of the if statements.
-        if (updateMap.containsKey(record.key())) {
-            updateMap.remove(record.key());
-        } else {
-            removeList.add(record.key());
-        }
+
+  public void executeOperations(Region region) {
+    if (region != null) {
+      region.putAll(updateMap);
+      region.removeAll(removeList);
+    } else {
+      logger.info("Unable to locate proxy region: " + region);
     }
-
-    public void addUpdateOperation(SinkRecord record, boolean nullValuesMeansRemove) {
-        //it's assumed the records in are order
-        //if so if a previous value was in the remove list
-        // let's not remove it at the end of this operation
-        if (nullValuesMeansRemove) {
-            if (removeList.contains(record.key())) {
-                removeList.remove(record.key());
-            }
-        }
-        updateMap.put(record.key(), record.value());
-    }
-
-
-    public void executeOperations(Region region) {
-        if (region != null) {
-            region.putAll(updateMap);
-            region.removeAll(removeList);
-        }
-        else {
-            logger.info("Unable to locate proxy region: " + region);
-        }
-    }
+  }
 }
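
The coalescing behavior above, in isolation: a put followed by a tombstone for the same key cancels out instead of issuing both a putAll and a removeAll. Topic and offset values below are arbitrary:

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    import org.apache.kafka.connect.sink.SinkRecord;

    import geode.kafka.sink.BatchRecords;

    public class BatchRecordsDemo {
      public static void main(String[] args) {
        Map updates = new HashMap();
        List removes = new ArrayList();
        BatchRecords batch = new BatchRecords(updates, removes);

        SinkRecord put = new SinkRecord("someTopic", 0, null, "k1", null, "v1", 0L);
        SinkRecord tombstone = new SinkRecord("someTopic", 0, null, "k1", null, null, 1L);

        batch.addUpdateOperation(put, true);
        batch.addRemoveOperation(tombstone);

        System.out.println(updates); // {} -- the pending put for "k1" was dropped
        System.out.println(removes); // [] -- and no remove was queued either
      }
    }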
diff --git a/src/main/java/geode/kafka/sink/GeodeKafkaSink.java b/src/main/java/geode/kafka/sink/GeodeKafkaSink.java
index 1ccb385..550e8a9 100644
--- a/src/main/java/geode/kafka/sink/GeodeKafkaSink.java
+++ b/src/main/java/geode/kafka/sink/GeodeKafkaSink.java
@@ -14,72 +14,71 @@
  */
 package geode.kafka.sink;
 
-import geode.kafka.GeodeConnectorConfig;
-import org.apache.kafka.common.config.ConfigDef;
-import org.apache.kafka.connect.connector.Task;
-import org.apache.kafka.connect.sink.SinkConnector;
-import org.apache.kafka.connect.util.ConnectorUtils;
+import static geode.kafka.GeodeConnectorConfig.DEFAULT_LOCATOR;
+import static geode.kafka.GeodeConnectorConfig.LOCATORS;
+import static geode.kafka.GeodeSinkConnectorConfig.DEFAULT_NULL_VALUES_MEAN_REMOVE;
+import static geode.kafka.GeodeSinkConnectorConfig.NULL_VALUES_MEAN_REMOVE;
 
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import static geode.kafka.GeodeConnectorConfig.DEFAULT_LOCATOR;
-import static geode.kafka.GeodeConnectorConfig.LOCATORS;
-import static geode.kafka.GeodeSinkConnectorConfig.DEFAULT_NULL_VALUES_MEAN_REMOVE;
-import static geode.kafka.GeodeSinkConnectorConfig.NULL_VALUES_MEAN_REMOVE;
-import static geode.kafka.GeodeSinkConnectorConfig.TOPIC_TO_REGION_BINDINGS;
+import geode.kafka.GeodeConnectorConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.sink.SinkConnector;
 
-public class GeodeKafkaSink extends SinkConnector  {
-    private static final ConfigDef CONFIG_DEF = new ConfigDef();
-    private Map<String, String> sharedProps;
+public class GeodeKafkaSink extends SinkConnector {
+  private static final ConfigDef CONFIG_DEF = new ConfigDef();
+  private Map<String, String> sharedProps;
 
-    @Override
-    public void start(Map<String, String> props) {
-        sharedProps = computeMissingConfigurations(props);
+  @Override
+  public void start(Map<String, String> props) {
+    sharedProps = computeMissingConfigurations(props);
+  }
+
+  @Override
+  public Class<? extends Task> taskClass() {
+    return GeodeKafkaSinkTask.class;
+  }
+
+  @Override
+  public List<Map<String, String>> taskConfigs(int maxTasks) {
+    List<Map<String, String>> taskConfigs = new ArrayList<>();
+
+    // All tasks will build up the topic-to-regions map. A few might not use certain keys, but we
+    // have no control over partitioning in Kafka and which tasks will fire.
+    for (int i = 0; i < maxTasks; i++) {
+      Map<String, String> taskProps = new HashMap<>();
+      taskProps.putAll(sharedProps);
+      taskProps.put(GeodeConnectorConfig.TASK_ID, "" + i);
+      taskConfigs.add(taskProps);
     }
 
-    @Override
-    public Class<? extends Task> taskClass() {
-        return GeodeKafkaSinkTask.class;
-    }
+    return taskConfigs;
+  }
 
-    @Override
-    public List<Map<String, String>> taskConfigs(int maxTasks) {
-        List<Map<String, String>> taskConfigs = new ArrayList<>();
+  @Override
+  public void stop() {
 
-        //All tasks will build up the topic to regions map. A few might not use certain keys but we have no control over partitioning in kafka and which tasks will fire
-        for (int i = 0; i < maxTasks; i++) {
-            Map<String, String> taskProps = new HashMap<>();
-            taskProps.putAll(sharedProps);
-            taskProps.put(GeodeConnectorConfig.TASK_ID, "" + i);
-            taskConfigs.add(taskProps);
-        }
+  }
 
-        return taskConfigs;
-    }
+  @Override
+  public ConfigDef config() {
+    return CONFIG_DEF;
+  }
 
-    @Override
-    public void stop() {
-
-    }
-
-    @Override
-    public ConfigDef config() {
-        return CONFIG_DEF;
-    }
-
-    @Override
-    public String version() {
-        //TODO
-        return "unknown";
-    }
+  @Override
+  public String version() {
+    // TODO
+    return "unknown";
+  }
 
 
-    private Map<String, String> computeMissingConfigurations(Map<String, String> props) {
-        props.computeIfAbsent(LOCATORS, (key)-> DEFAULT_LOCATOR);
-        props.computeIfAbsent(NULL_VALUES_MEAN_REMOVE, (key) -> DEFAULT_NULL_VALUES_MEAN_REMOVE);
-        return props;
-    }
+  private Map<String, String> computeMissingConfigurations(Map<String, String> props) {
+    props.computeIfAbsent(LOCATORS, (key) -> DEFAULT_LOCATOR);
+    props.computeIfAbsent(NULL_VALUES_MEAN_REMOVE, (key) -> DEFAULT_NULL_VALUES_MEAN_REMOVE);
+    return props;
+  }
 }
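
Because every task receives the full binding map, taskConfigs(n) yields n copies of the shared properties that differ only in GEODE_TASK_ID. A small sketch with made-up property values:

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    import geode.kafka.sink.GeodeKafkaSink;

    public class SinkTaskConfigDemo {
      public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put("topicToRegions", "[someTopic:someRegion]");
        props.put("locators", "localhost[10334]"); // would be defaulted if omitted

        GeodeKafkaSink sink = new GeodeKafkaSink();
        sink.start(props);
        List<Map<String, String>> configs = sink.taskConfigs(2);
        // Two maps, each with the full bindings plus GEODE_TASK_ID of "0" and "1" respectively
        configs.forEach(System.out::println);
      }
    }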
diff --git a/src/main/java/geode/kafka/sink/GeodeKafkaSinkTask.java b/src/main/java/geode/kafka/sink/GeodeKafkaSinkTask.java
index bcc5885..3552b2d 100644
--- a/src/main/java/geode/kafka/sink/GeodeKafkaSinkTask.java
+++ b/src/main/java/geode/kafka/sink/GeodeKafkaSinkTask.java
@@ -14,22 +14,23 @@
  */
 package geode.kafka.sink;
 
-import geode.kafka.GeodeContext;
-import geode.kafka.GeodeSinkConnectorConfig;
-import org.apache.geode.cache.Region;
-import org.apache.geode.cache.RegionExistsException;
-import org.apache.geode.cache.client.ClientRegionShortcut;
-import org.apache.kafka.connect.sink.SinkRecord;
-import org.apache.kafka.connect.sink.SinkTask;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
+import geode.kafka.GeodeContext;
+import geode.kafka.GeodeSinkConnectorConfig;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.sink.SinkTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionExistsException;
+import org.apache.geode.cache.client.ClientRegionShortcut;
+
 
 /**
  * TODO javaDoc
@@ -37,109 +38,113 @@
  */
 public class GeodeKafkaSinkTask extends SinkTask {
 
-    private static final Logger logger = LoggerFactory.getLogger(GeodeKafkaSinkTask.class);
+  private static final Logger logger = LoggerFactory.getLogger(GeodeKafkaSinkTask.class);
 
-    private GeodeContext geodeContext;
-    private int taskId;
-    private Map<String, List<String>> topicToRegions;
-    private Map<String, Region> regionNameToRegion;
-    private boolean nullValuesMeansRemove = true;
+  private GeodeContext geodeContext;
+  private int taskId;
+  private Map<String, List<String>> topicToRegions;
+  private Map<String, Region> regionNameToRegion;
+  private boolean nullValuesMeansRemove = true;
 
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public String version() {
-        //TODO
-        return "unknown";
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public String version() {
+    // TODO
+    return "unknown";
+  }
+
+  @Override
+  public void start(Map<String, String> props) {
+    try {
+      GeodeSinkConnectorConfig geodeConnectorConfig = new GeodeSinkConnectorConfig(props);
+      configure(geodeConnectorConfig);
+      geodeContext = new GeodeContext();
+      geodeContext.connectClient(geodeConnectorConfig.getLocatorHostPorts(),
+          geodeConnectorConfig.getSecurityClientAuthInit());
+      regionNameToRegion = createProxyRegions(topicToRegions.values());
+    } catch (Exception e) {
+      logger.error("Unable to start sink task", e);
+      throw e;
     }
+  }
 
-    @Override
-    public void start(Map<String, String> props) {
-        try {
-            GeodeSinkConnectorConfig geodeConnectorConfig = new GeodeSinkConnectorConfig(props);
-            configure(geodeConnectorConfig);
-            geodeContext = new GeodeContext();
-            geodeContext.connectClient(geodeConnectorConfig.getLocatorHostPorts(), geodeConnectorConfig.getSecurityClientAuthInit());
-            regionNameToRegion = createProxyRegions(topicToRegions.values());
-        } catch (Exception e) {
-            logger.error("Unable to start sink task", e);
-            throw e;
-        }
+  void configure(GeodeSinkConnectorConfig geodeConnectorConfig) {
+    logger.debug("GeodeKafkaSourceTask id:" + geodeConnectorConfig.getTaskId() + " starting");
+    taskId = geodeConnectorConfig.getTaskId();
+    topicToRegions = geodeConnectorConfig.getTopicToRegions();
+    nullValuesMeansRemove = geodeConnectorConfig.getNullValuesMeanRemove();
+  }
+
+  // For tests only
+  void setRegionNameToRegion(Map<String, Region> regionNameToRegion) {
+    this.regionNameToRegion = regionNameToRegion;
+  }
+
+  @Override
+  public void put(Collection<SinkRecord> records) {
+    put(records, new HashMap());
+  }
+
+  void put(Collection<SinkRecord> records, Map<String, BatchRecords> batchRecordsMap) {
+    // spin off a new thread to handle this operation? Downside is ordering and retries...
+    for (SinkRecord record : records) {
+      updateBatchForRegionByTopic(record, batchRecordsMap);
     }
+    batchRecordsMap.entrySet().stream().forEach((entry) -> {
+      String region = entry.getKey();
+      BatchRecords batchRecords = entry.getValue();
+      batchRecords.executeOperations(regionNameToRegion.get(region));
+    });
+  }
 
-    void configure(GeodeSinkConnectorConfig geodeConnectorConfig) {
-        logger.debug("GeodeKafkaSourceTask id:" + geodeConnectorConfig.getTaskId() + " starting");
-        taskId = geodeConnectorConfig.getTaskId();
-        topicToRegions = geodeConnectorConfig.getTopicToRegions();
-        nullValuesMeansRemove = geodeConnectorConfig.getNullValuesMeanRemove();
+  private void updateBatchForRegionByTopic(SinkRecord sinkRecord,
+      Map<String, BatchRecords> batchRecordsMap) {
+    Collection<String> regionsToUpdate = topicToRegions.get(sinkRecord.topic());
+    for (String region : regionsToUpdate) {
+      updateBatchRecordsForRecord(sinkRecord, batchRecordsMap, region);
     }
+  }
 
-    //For tests only
-    void setRegionNameToRegion(Map<String, Region> regionNameToRegion) {
-        this.regionNameToRegion = regionNameToRegion;
+  private void updateBatchRecordsForRecord(SinkRecord record,
+      Map<String, BatchRecords> batchRecordsMap, String region) {
+    BatchRecords batchRecords = batchRecordsMap.get(region);
+    if (batchRecords == null) {
+      batchRecords = new BatchRecords();
+      batchRecordsMap.put(region, batchRecords);
     }
-
-    @Override
-    public void put(Collection<SinkRecord> records) {
-       put(records, new HashMap());
+    if (record.key() != null) {
+      if (record.value() == null && nullValuesMeansRemove) {
+        batchRecords.addRemoveOperation(record);
+      } else {
+        batchRecords.addUpdateOperation(record, nullValuesMeansRemove);
+      }
+    } else {
+      // Invest in configurable auto key generator?
+      logger.warn("Unable to push to Geode, missing key in record : " + record.value());
     }
+  }
 
-    void put(Collection<SinkRecord> records, Map<String, BatchRecords>  batchRecordsMap) {
-        //spin off a new thread to handle this operation?  Downside is ordering and retries...
-        for (SinkRecord record : records) {
-            updateBatchForRegionByTopic(record, batchRecordsMap);
-        }
-        batchRecordsMap.entrySet().stream().forEach((entry) -> {
-            String region = entry.getKey();
-            BatchRecords batchRecords = entry.getValue();
-            batchRecords.executeOperations(regionNameToRegion.get(region));
-        });
+  private Map<String, Region> createProxyRegions(Collection<List<String>> regionNames) {
+    List<String> flat = regionNames.stream().flatMap(List::stream).collect(Collectors.toList());
+    return flat.stream().map(regionName -> createProxyRegion(regionName))
+        .collect(Collectors.toMap(region -> region.getName(), region -> region));
+  }
+
+  private Region createProxyRegion(String regionName) {
+    try {
+      return geodeContext.getClientCache().createClientRegionFactory(ClientRegionShortcut.PROXY)
+          .create(regionName);
+    } catch (RegionExistsException e) {
+      // Each task is a separate parallel task controlled by Kafka.
+      return geodeContext.getClientCache().getRegion(regionName);
     }
+  }
 
-    private void updateBatchForRegionByTopic(SinkRecord sinkRecord, Map<String, BatchRecords> batchRecordsMap) {
-        Collection<String> regionsToUpdate = topicToRegions.get(sinkRecord.topic());
-        for (String region : regionsToUpdate) {
-            updateBatchRecordsForRecord(sinkRecord, batchRecordsMap, region);
-        }
-    }
+  @Override
+  public void stop() {
+    geodeContext.getClientCache().close(false);
+  }
 
-    private void updateBatchRecordsForRecord(SinkRecord record, Map<String, BatchRecords> batchRecordsMap, String region) {
-        BatchRecords batchRecords = batchRecordsMap.get(region);
-        if (batchRecords == null) {
-            batchRecords = new BatchRecords();
-            batchRecordsMap.put(region, batchRecords);
-        }
-        if (record.key() != null) {
-            if (record.value() == null && nullValuesMeansRemove) {
-                batchRecords.addRemoveOperation(record);
-            } else {
-                batchRecords.addUpdateOperation(record, nullValuesMeansRemove);
-            }
-        } else {
-            //Invest in configurable auto key generator?
-            logger.warn("Unable to push to Geode, missing key in record : " + record.value());
-        }
-    }
-
-    private Map<String, Region> createProxyRegions(Collection<List<String>> regionNames) {
-        List<String> flat = regionNames.stream().flatMap(List::stream).collect(Collectors.toList());
-        return flat.stream().map(regionName -> createProxyRegion(regionName)).collect(Collectors.toMap(region->region.getName(), region -> region));
-    }
-
-    private Region createProxyRegion(String regionName) {
-        try {
-            return geodeContext.getClientCache().createClientRegionFactory(ClientRegionShortcut.PROXY).create(regionName);
-        }
-        catch (RegionExistsException e) {
-            //Each task is a seperate parallel task controlled by kafka.
-            return geodeContext.getClientCache().getRegion(regionName);
-        }
-    }
-
-    @Override
-    public void stop() {
-        geodeContext.getClientCache().close(false);
-    }
-
-}
\ No newline at end of file
+}
diff --git a/src/main/java/geode/kafka/sink/GeodeSinkConnectorConfig.java b/src/main/java/geode/kafka/sink/GeodeSinkConnectorConfig.java
index 59a57cd..e416a9f 100644
--- a/src/main/java/geode/kafka/sink/GeodeSinkConnectorConfig.java
+++ b/src/main/java/geode/kafka/sink/GeodeSinkConnectorConfig.java
@@ -18,26 +18,26 @@
 import java.util.Map;
 
 public class GeodeSinkConnectorConfig extends GeodeConnectorConfig {
-    //Used by sink
-    public static final String TOPIC_TO_REGION_BINDINGS = "topicToRegions";
-    public static final String NULL_VALUES_MEAN_REMOVE = "nullValuesMeanRemove";
-    public static final String DEFAULT_NULL_VALUES_MEAN_REMOVE = "true";
+  // Used by sink
+  public static final String TOPIC_TO_REGION_BINDINGS = "topicToRegions";
+  public static final String NULL_VALUES_MEAN_REMOVE = "nullValuesMeanRemove";
+  public static final String DEFAULT_NULL_VALUES_MEAN_REMOVE = "true";
 
-    private Map<String, List<String>> topicToRegions;
-    private final boolean nullValuesMeanRemove;
+  private Map<String, List<String>> topicToRegions;
+  private final boolean nullValuesMeanRemove;
 
-    public GeodeSinkConnectorConfig(Map<String, String> connectorProperties) {
-        super(connectorProperties);
-       topicToRegions = parseTopicToRegions(connectorProperties.get(TOPIC_TO_REGION_BINDINGS));
-       nullValuesMeanRemove = Boolean.parseBoolean(connectorProperties.get(NULL_VALUES_MEAN_REMOVE));
-    }
+  public GeodeSinkConnectorConfig(Map<String, String> connectorProperties) {
+    super(connectorProperties);
+    topicToRegions = parseTopicToRegions(connectorProperties.get(TOPIC_TO_REGION_BINDINGS));
+    nullValuesMeanRemove = Boolean.parseBoolean(connectorProperties.get(NULL_VALUES_MEAN_REMOVE));
+  }
 
-    public Map<String, List<String>> getTopicToRegions() {
-        return topicToRegions;
-    }
+  public Map<String, List<String>> getTopicToRegions() {
+    return topicToRegions;
+  }
 
-    public boolean getNullValuesMeanRemove() {
-        return nullValuesMeanRemove;
-    }
+  public boolean getNullValuesMeanRemove() {
+    return nullValuesMeanRemove;
+  }
 
 }
diff --git a/src/main/java/geode/kafka/source/EventBufferSupplier.java b/src/main/java/geode/kafka/source/EventBufferSupplier.java
index d844744..8b05e22 100644
--- a/src/main/java/geode/kafka/source/EventBufferSupplier.java
+++ b/src/main/java/geode/kafka/source/EventBufferSupplier.java
@@ -1,3 +1,17 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
 package geode.kafka.source;
 
 import java.util.concurrent.BlockingQueue;
diff --git a/src/main/java/geode/kafka/source/GeodeEvent.java b/src/main/java/geode/kafka/source/GeodeEvent.java
index 3de8abd..f569f2b 100644
--- a/src/main/java/geode/kafka/source/GeodeEvent.java
+++ b/src/main/java/geode/kafka/source/GeodeEvent.java
@@ -21,19 +21,19 @@
  */
 public class GeodeEvent {
 
-    private String regionName;
-    private CqEvent event;
+  private String regionName;
+  private CqEvent event;
 
-    public GeodeEvent(String regionName, CqEvent event) {
-        this.regionName = regionName;
-        this.event = event;
-    }
+  public GeodeEvent(String regionName, CqEvent event) {
+    this.regionName = regionName;
+    this.event = event;
+  }
 
-    public String getRegionName() {
-        return regionName;
-    }
+  public String getRegionName() {
+    return regionName;
+  }
 
-    public CqEvent getEvent() {
-        return event;
-    }
+  public CqEvent getEvent() {
+    return event;
+  }
 }
diff --git a/src/main/java/geode/kafka/source/GeodeKafkaSource.java b/src/main/java/geode/kafka/source/GeodeKafkaSource.java
index b81401a..f150e07 100644
--- a/src/main/java/geode/kafka/source/GeodeKafkaSource.java
+++ b/src/main/java/geode/kafka/source/GeodeKafkaSource.java
@@ -14,18 +14,6 @@
  */
 package geode.kafka.source;
 
-import geode.kafka.GeodeConnectorConfig;
-import org.apache.kafka.common.config.ConfigDef;
-import org.apache.kafka.common.utils.AppInfoParser;
-import org.apache.kafka.connect.connector.Task;
-import org.apache.kafka.connect.source.SourceConnector;
-import org.apache.kafka.connect.util.ConnectorUtils;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
 import static geode.kafka.GeodeConnectorConfig.DEFAULT_LOCATOR;
 import static geode.kafka.GeodeConnectorConfig.LOCATORS;
 import static geode.kafka.source.GeodeSourceConnectorConfig.BATCH_SIZE;
@@ -43,11 +31,23 @@
 import static geode.kafka.source.GeodeSourceConnectorConfig.QUEUE_SIZE;
 import static geode.kafka.source.GeodeSourceConnectorConfig.REGION_TO_TOPIC_BINDINGS;
 
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import geode.kafka.GeodeConnectorConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.source.SourceConnector;
+import org.apache.kafka.connect.util.ConnectorUtils;
+
 
 public class GeodeKafkaSource extends SourceConnector {
 
   private Map<String, String> sharedProps;
-  //TODO maybe club this into GeodeConnnectorConfig
+  // TODO maybe fold this into GeodeConnectorConfig
   private static final ConfigDef CONFIG_DEF = new ConfigDef();
 
 
@@ -59,14 +59,16 @@
   @Override
   public List<Map<String, String>> taskConfigs(int maxTasks) {
     List<Map<String, String>> taskConfigs = new ArrayList<>();
-    List<String> bindings = GeodeConnectorConfig.parseStringByComma(sharedProps.get(REGION_TO_TOPIC_BINDINGS));
+    List<String> bindings =
+        GeodeConnectorConfig.parseStringByComma(sharedProps.get(REGION_TO_TOPIC_BINDINGS));
     List<List<String>> bindingsPerTask = ConnectorUtils.groupPartitions(bindings, maxTasks);
 
     for (int i = 0; i < maxTasks; i++) {
       Map<String, String> taskProps = new HashMap<>();
       taskProps.putAll(sharedProps);
       taskProps.put(GeodeConnectorConfig.TASK_ID, "" + i);
-      taskProps.put(CQS_TO_REGISTER, GeodeConnectorConfig.reconstructString(bindingsPerTask.get(i)));
+      taskProps.put(CQS_TO_REGISTER,
+          GeodeConnectorConfig.reconstructString(bindingsPerTask.get(i)));
       taskConfigs.add(taskProps);
     }
     return taskConfigs;
@@ -84,7 +86,7 @@
   }
 
   private Map<String, String> computeMissingConfigurations(Map<String, String> props) {
-    props.computeIfAbsent(LOCATORS, (key)-> DEFAULT_LOCATOR);
+    props.computeIfAbsent(LOCATORS, (key) -> DEFAULT_LOCATOR);
     props.computeIfAbsent(DURABLE_CLIENT_TIME_OUT, (key) -> DEFAULT_DURABLE_CLIENT_TIMEOUT);
     props.computeIfAbsent(DURABLE_CLIENT_ID_PREFIX, (key) -> DEFAULT_DURABLE_CLIENT_ID);
     props.computeIfAbsent(BATCH_SIZE, (key) -> DEFAULT_BATCH_SIZE);
@@ -101,7 +103,7 @@
 
   @Override
   public String version() {
-    //TODO
+    // TODO
     return AppInfoParser.getVersion();
   }
 
diff --git a/src/main/java/geode/kafka/source/GeodeKafkaSourceListener.java b/src/main/java/geode/kafka/source/GeodeKafkaSourceListener.java
index 5c8a152..b0b8c6a 100644
--- a/src/main/java/geode/kafka/source/GeodeKafkaSourceListener.java
+++ b/src/main/java/geode/kafka/source/GeodeKafkaSourceListener.java
@@ -14,64 +14,66 @@
  */
 package geode.kafka.source;
 
-import org.apache.geode.cache.query.CqEvent;
-import org.apache.geode.cache.query.CqStatusListener;
+import java.util.concurrent.TimeUnit;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.concurrent.TimeUnit;
+import org.apache.geode.cache.query.CqEvent;
+import org.apache.geode.cache.query.CqStatusListener;
 
 class GeodeKafkaSourceListener implements CqStatusListener {
 
-    private static final Logger logger = LoggerFactory.getLogger(GeodeKafkaSourceListener.class);
+  private static final Logger logger = LoggerFactory.getLogger(GeodeKafkaSourceListener.class);
 
-    public String regionName;
-    private EventBufferSupplier eventBufferSupplier;
-    private boolean initialResultsLoaded;
+  public String regionName;
+  private EventBufferSupplier eventBufferSupplier;
+  private volatile boolean initialResultsLoaded; // written by the task thread, read by CQ events
 
-    public GeodeKafkaSourceListener(EventBufferSupplier eventBufferSupplier, String regionName) {
-        this.regionName = regionName;
-        this.eventBufferSupplier = eventBufferSupplier;
-        initialResultsLoaded = false;
+  public GeodeKafkaSourceListener(EventBufferSupplier eventBufferSupplier, String regionName) {
+    this.regionName = regionName;
+    this.eventBufferSupplier = eventBufferSupplier;
+    initialResultsLoaded = false;
+  }
+
+  @Override
+  public void onEvent(CqEvent aCqEvent) {
+    while (!initialResultsLoaded) {
+      Thread.yield();
     }
+    try {
+      eventBufferSupplier.get().offer(new GeodeEvent(regionName, aCqEvent), 2, TimeUnit.SECONDS);
+    } catch (InterruptedException e) {
 
-    @Override
-    public void onEvent(CqEvent aCqEvent) {
-        while (!initialResultsLoaded) {
-            Thread.yield();
-        }
+      while (true) {
         try {
-            eventBufferSupplier.get().offer(new GeodeEvent(regionName, aCqEvent), 2, TimeUnit.SECONDS);
-        } catch (InterruptedException e) {
-
-            while (true) {
-                try {
-                    if (!eventBufferSupplier.get().offer(new GeodeEvent(regionName, aCqEvent), 2, TimeUnit.SECONDS))
-                        break;
-                } catch (InterruptedException ex) {
-                    ex.printStackTrace();
-                }
-                logger.info("GeodeKafkaSource Queue is full");
-            }
+          if (eventBufferSupplier.get().offer(new GeodeEvent(regionName, aCqEvent), 2,
+              TimeUnit.SECONDS))
+            break;
+        } catch (InterruptedException ex) {
+          ex.printStackTrace();
         }
+        logger.info("GeodeKafkaSource Queue is full");
+      }
     }
+  }
 
-    @Override
-    public void onError(CqEvent aCqEvent) {
+  @Override
+  public void onError(CqEvent aCqEvent) {
 
-    }
+  }
 
-    @Override
-    public void onCqDisconnected() {
-        //we should probably redistribute or reconnect
-    }
+  @Override
+  public void onCqDisconnected() {
+    // we should probably redistribute or reconnect
+  }
 
-    @Override
-    public void onCqConnected() {
+  @Override
+  public void onCqConnected() {
 
-    }
+  }
 
-    public void signalInitialResultsLoaded() {
-        initialResultsLoaded = true;
-    }
+  public void signalInitialResultsLoaded() {
+    initialResultsLoaded = true;
+  }
 }
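
The listener leans on BlockingQueue.offer with a timeout; in isolation the semantics look like this (capacity of one for demonstration):

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.TimeUnit;

    public class OfferTimeoutDemo {
      public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> buffer = new ArrayBlockingQueue<>(1);
        System.out.println(buffer.offer("event-1", 2, TimeUnit.SECONDS)); // true: accepted
        // The queue is now full, so this blocks for up to 2 seconds and then gives up
        System.out.println(buffer.offer("event-2", 2, TimeUnit.SECONDS)); // false: timed out
      }
    }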
diff --git a/src/main/java/geode/kafka/source/GeodeKafkaSourceTask.java b/src/main/java/geode/kafka/source/GeodeKafkaSourceTask.java
index 6efeb85..da2119a 100644
--- a/src/main/java/geode/kafka/source/GeodeKafkaSourceTask.java
+++ b/src/main/java/geode/kafka/source/GeodeKafkaSourceTask.java
@@ -14,151 +14,163 @@
  */
 package geode.kafka.source;
 
-import geode.kafka.GeodeContext;
-import org.apache.geode.cache.query.CqAttributes;
-import org.apache.geode.cache.query.CqAttributesFactory;
-import org.apache.geode.cache.query.CqEvent;
-import org.apache.kafka.connect.source.SourceRecord;
-import org.apache.kafka.connect.source.SourceTask;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import static geode.kafka.source.GeodeSourceConnectorConfig.BATCH_SIZE;
+import static geode.kafka.source.GeodeSourceConnectorConfig.QUEUE_SIZE;
+import static geode.kafka.source.GeodeSourceConnectorConfig.REGION_PARTITION;
 
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.BlockingQueue;
 import java.util.stream.Collectors;
 
-import static geode.kafka.source.GeodeSourceConnectorConfig.BATCH_SIZE;
-import static geode.kafka.source.GeodeSourceConnectorConfig.QUEUE_SIZE;
-import static geode.kafka.source.GeodeSourceConnectorConfig.REGION_PARTITION;
+import geode.kafka.GeodeContext;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.kafka.connect.source.SourceTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.geode.cache.query.CqAttributes;
+import org.apache.geode.cache.query.CqAttributesFactory;
+import org.apache.geode.cache.query.CqEvent;
 
 public class GeodeKafkaSourceTask extends SourceTask {
 
-    private static final Logger logger = LoggerFactory.getLogger(GeodeKafkaSourceTask.class);
+  private static final Logger logger = LoggerFactory.getLogger(GeodeKafkaSourceTask.class);
 
-    private static final String TASK_PREFIX = "TASK";
-    private static final String DOT = ".";
+  private static final String TASK_PREFIX = "TASK";
+  private static final String DOT = ".";
 
-    //property string to pass in to identify task
-    private static final Map<String, Long> OFFSET_DEFAULT = createOffset();
+  // property string to pass in to identify task
+  private static final Map<String, Long> OFFSET_DEFAULT = createOffset();
 
-    private GeodeContext geodeContext;
-    private GeodeSourceConnectorConfig geodeConnectorConfig;
-    private EventBufferSupplier eventBufferSupplier;
-    private Map<String, List<String>> regionToTopics;
-    private Map<String, Map<String, String>> sourcePartitions;
-    private int batchSize;
+  private GeodeContext geodeContext;
+  private GeodeSourceConnectorConfig geodeConnectorConfig;
+  private EventBufferSupplier eventBufferSupplier;
+  private Map<String, List<String>> regionToTopics;
+  private Map<String, Map<String, String>> sourcePartitions;
+  private int batchSize;
 
 
-    private static Map<String, Long> createOffset() {
-        Map<String, Long> offset = new HashMap<>();
-        offset.put("OFFSET", 0L);
-        return offset;
+  private static Map<String, Long> createOffset() {
+    Map<String, Long> offset = new HashMap<>();
+    offset.put("OFFSET", 0L);
+    return offset;
+  }
+
+  @Override
+  public String version() {
+    return null;
+  }
+
+  @Override
+  public void start(Map<String, String> props) {
+    try {
+      geodeConnectorConfig = new GeodeSourceConnectorConfig(props);
+      int taskId = geodeConnectorConfig.getTaskId();
+      logger.debug("GeodeKafkaSourceTask id:" + geodeConnectorConfig.getTaskId() + " starting");
+      geodeContext = new GeodeContext();
+      geodeContext.connectClient(geodeConnectorConfig.getLocatorHostPorts(),
+          geodeConnectorConfig.getDurableClientId(), geodeConnectorConfig.getDurableClientTimeout(),
+          geodeConnectorConfig.getSecurityClientAuthInit());
+
+      batchSize = Integer.parseInt(props.get(BATCH_SIZE));
+      eventBufferSupplier = new SharedEventBufferSupplier(Integer.parseInt(props.get(QUEUE_SIZE)));
+
+      regionToTopics = geodeConnectorConfig.getRegionToTopics();
+      geodeConnectorConfig.getCqsToRegister();
+      sourcePartitions = createSourcePartitionsMap(regionToTopics.keySet());
+
+      String cqPrefix = geodeConnectorConfig.getCqPrefix();
+      boolean loadEntireRegion = geodeConnectorConfig.getLoadEntireRegion();
+      installOnGeode(geodeConnectorConfig, geodeContext, eventBufferSupplier, cqPrefix,
+          loadEntireRegion);
+    } catch (Exception e) {
+      e.printStackTrace();
+      logger.error("Unable to start source task", e);
+      throw e;
     }
+  }
 
-    @Override
-    public String version() {
-        return null;
-    }
-
-    @Override
-    public void start(Map<String, String> props) {
-        try {
-            geodeConnectorConfig = new GeodeSourceConnectorConfig(props);
-            int taskId = geodeConnectorConfig.getTaskId();
-            logger.debug("GeodeKafkaSourceTask id:" + geodeConnectorConfig.getTaskId() + " starting");
-            geodeContext = new GeodeContext();
-            geodeContext.connectClient(geodeConnectorConfig.getLocatorHostPorts(), geodeConnectorConfig.getDurableClientId(), geodeConnectorConfig.getDurableClientTimeout(), geodeConnectorConfig.getSecurityClientAuthInit());
-
-            batchSize = Integer.parseInt(props.get(BATCH_SIZE));
-            eventBufferSupplier = new SharedEventBufferSupplier(Integer.parseInt(props.get(QUEUE_SIZE)));
-
-            regionToTopics = geodeConnectorConfig.getRegionToTopics();
-            geodeConnectorConfig.getCqsToRegister();
-            sourcePartitions = createSourcePartitionsMap(regionToTopics.keySet());
-
-            String cqPrefix = geodeConnectorConfig.getCqPrefix();
-            boolean loadEntireRegion = geodeConnectorConfig.getLoadEntireRegion();
-            installOnGeode(geodeConnectorConfig, geodeContext, eventBufferSupplier, cqPrefix, loadEntireRegion);
-        } catch (Exception e) {
-            e.printStackTrace();
-            logger.error("Unable to start source task", e);
-            throw e;
+  @Override
+  public List<SourceRecord> poll() throws InterruptedException {
+    ArrayList<SourceRecord> records = new ArrayList<>(batchSize);
+    ArrayList<GeodeEvent> events = new ArrayList<>(batchSize);
+    if (eventBufferSupplier.get().drainTo(events, batchSize) > 0) {
+      for (GeodeEvent event : events) {
+        String regionName = event.getRegionName();
+        List<String> topics = regionToTopics.get(regionName);
+        for (String topic : topics) {
+          records.add(new SourceRecord(sourcePartitions.get(regionName), OFFSET_DEFAULT, topic,
+              null, event.getEvent().getKey(), null, event.getEvent().getNewValue()));
         }
+      }
+      return records;
     }
 
-    @Override
-    public List<SourceRecord> poll() throws InterruptedException {
-        ArrayList<SourceRecord> records = new ArrayList<>(batchSize);
-        ArrayList<GeodeEvent> events = new ArrayList<>(batchSize);
-        if (eventBufferSupplier.get().drainTo(events, batchSize) > 0) {
-            for (GeodeEvent event : events) {
-                String regionName = event.getRegionName();
-                List<String> topics = regionToTopics.get(regionName);
-                for (String topic : topics) {
-                    records.add(new SourceRecord(sourcePartitions.get(regionName), OFFSET_DEFAULT, topic, null, event.getEvent().getKey(), null, event.getEvent().getNewValue()));
-                }
-            }
-            return records;
-        }
+    return null;
+  }
 
-        return null;
-    }
+  @Override
+  public void stop() {
+    geodeContext.getClientCache().close(true);
+  }
 
-    @Override
-    public void stop() {
-        geodeContext.getClientCache().close(true);
+  void installOnGeode(GeodeSourceConnectorConfig geodeConnectorConfig, GeodeContext geodeContext,
+      EventBufferSupplier eventBuffer, String cqPrefix, boolean loadEntireRegion) {
+    boolean isDurable = geodeConnectorConfig.isDurable();
+    int taskId = geodeConnectorConfig.getTaskId();
+    for (String region : geodeConnectorConfig.getCqsToRegister()) {
+      installListenersToRegion(geodeContext, taskId, eventBuffer, region, cqPrefix,
+          loadEntireRegion, isDurable);
     }
+    if (isDurable) {
+      geodeContext.getClientCache().readyForEvents();
+    }
+  }
 
-    void installOnGeode(GeodeSourceConnectorConfig geodeConnectorConfig, GeodeContext geodeContext, EventBufferSupplier eventBuffer, String cqPrefix, boolean loadEntireRegion) {
-        boolean isDurable = geodeConnectorConfig.isDurable();
-        int taskId = geodeConnectorConfig.getTaskId();
-        for (String region : geodeConnectorConfig.getCqsToRegister()) {
-            installListenersToRegion(geodeContext, taskId, eventBuffer, region, cqPrefix, loadEntireRegion, isDurable);
-        }
-        if (isDurable) {
-            geodeContext.getClientCache().readyForEvents();
-        }
+  GeodeKafkaSourceListener installListenersToRegion(GeodeContext geodeContext, int taskId,
+      EventBufferSupplier eventBuffer, String regionName, String cqPrefix, boolean loadEntireRegion,
+      boolean isDurable) {
+    CqAttributesFactory cqAttributesFactory = new CqAttributesFactory();
+    GeodeKafkaSourceListener listener = new GeodeKafkaSourceListener(eventBuffer, regionName);
+    cqAttributesFactory.addCqListener(listener);
+    CqAttributes cqAttributes = cqAttributesFactory.create();
+    try {
+      if (loadEntireRegion) {
+        Collection<CqEvent> events =
+            geodeContext.newCqWithInitialResults(generateCqName(taskId, cqPrefix, regionName),
+                "select * from /" + regionName, cqAttributes,
+                isDurable);
+        eventBuffer.get().addAll(
+            events.stream().map(e -> new GeodeEvent(regionName, e)).collect(Collectors.toList()));
+      } else {
+        geodeContext.newCq(generateCqName(taskId, cqPrefix, regionName),
+            "select * from /" + regionName, cqAttributes,
+            isDurable);
+      }
+    } finally {
+      listener.signalInitialResultsLoaded();
     }
+    return listener;
+  }
 
-    GeodeKafkaSourceListener installListenersToRegion(GeodeContext geodeContext, int taskId, EventBufferSupplier eventBuffer, String regionName, String cqPrefix, boolean loadEntireRegion, boolean isDurable) {
-        CqAttributesFactory cqAttributesFactory = new CqAttributesFactory();
-        GeodeKafkaSourceListener listener = new GeodeKafkaSourceListener(eventBuffer, regionName);
-        cqAttributesFactory.addCqListener(listener);
-        CqAttributes cqAttributes = cqAttributesFactory.create();
-        try {
-            if (loadEntireRegion) {
-                Collection<CqEvent> events = geodeContext.newCqWithInitialResults(generateCqName(taskId, cqPrefix, regionName), "select * from /" + regionName, cqAttributes,
-                        isDurable);
-                eventBuffer.get().addAll(events.stream().map(e -> new GeodeEvent(regionName, e)).collect(Collectors.toList()));
-            } else {
-                geodeContext.newCq(generateCqName(taskId, cqPrefix, regionName), "select * from /" + regionName, cqAttributes,
-                        isDurable);
-            }
-        } finally {
-            listener.signalInitialResultsLoaded();
-        }
-        return listener;
-    }
+  /**
+   * Converts a list of region names into a map of source partitions
+   *
+   * @param regionNames list of region names
+   * @return a map of source partitions, keyed by region name
+   */
+  Map<String, Map<String, String>> createSourcePartitionsMap(Collection<String> regionNames) {
+    return regionNames.stream().map(regionName -> {
+      Map<String, String> sourcePartition = new HashMap<>();
+      sourcePartition.put(REGION_PARTITION, regionName);
+      return sourcePartition;
+    }).collect(Collectors.toMap(s -> s.get(REGION_PARTITION), s -> s));
+  }
 
-    /**
-     * converts a list of regions names into a map of source partitions
-     *
-     * @param regionNames list of regionNames
-     * @return Map<String, Map < String, String>> a map of source partitions, keyed by region name
-     */
-    Map<String, Map<String, String>> createSourcePartitionsMap(Collection<String> regionNames) {
-        return regionNames.stream().map(regionName -> {
-            Map<String, String> sourcePartition = new HashMap<>();
-            sourcePartition.put(REGION_PARTITION, regionName);
-            return sourcePartition;
-        }).collect(Collectors.toMap(s -> s.get(REGION_PARTITION), s -> s));
-    }
-
-    String generateCqName(int taskId, String cqPrefix, String regionName) {
-        return cqPrefix + DOT + TASK_PREFIX + taskId + DOT + regionName;
-    }
+  String generateCqName(int taskId, String cqPrefix, String regionName) {
+    return cqPrefix + DOT + TASK_PREFIX + taskId + DOT + regionName;
+  }
 }
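
The createSourcePartitionsMap javadoc above describes a nested map; a minimal sketch of the resulting shape (class name and sample regions are illustrative):

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class SourcePartitionsSketch {

      static final String REGION_PARTITION = "regionPartition";

      // Same shape as createSourcePartitionsMap: one single-entry map per region,
      // keyed by the region name.
      static Map<String, Map<String, String>> sourcePartitions(List<String> regionNames) {
        Map<String, Map<String, String>> partitions = new HashMap<>();
        for (String regionName : regionNames) {
          Map<String, String> partition = new HashMap<>();
          partition.put(REGION_PARTITION, regionName);
          partitions.put(regionName, partition);
        }
        return partitions;
      }

      public static void main(String[] args) {
        System.out.println(sourcePartitions(Arrays.asList("region1", "region2")));
        // e.g. {region1={regionPartition=region1}, region2={regionPartition=region2}}
        // generateCqName joins prefix, task, and region: "cqForGeodeKafka.TASK0.region1"
      }
    }
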
diff --git a/src/main/java/geode/kafka/source/GeodeSourceConnectorConfig.java b/src/main/java/geode/kafka/source/GeodeSourceConnectorConfig.java
index 0b46d2b..4f0393d 100644
--- a/src/main/java/geode/kafka/source/GeodeSourceConnectorConfig.java
+++ b/src/main/java/geode/kafka/source/GeodeSourceConnectorConfig.java
@@ -14,101 +14,101 @@
  */
 package geode.kafka.source;
 
-import geode.kafka.GeodeConnectorConfig;
-
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 
+import geode.kafka.GeodeConnectorConfig;
+
 public class GeodeSourceConnectorConfig extends GeodeConnectorConfig {
 
-    //Geode Configuration
-    public static final String DURABLE_CLIENT_ID_PREFIX = "durableClientIdPrefix";
-    public static final String DEFAULT_DURABLE_CLIENT_ID = "";
-    public static final String DURABLE_CLIENT_TIME_OUT = "durableClientTimeout";
-    public static final String DEFAULT_DURABLE_CLIENT_TIMEOUT = "60000";
+  // Geode Configuration
+  public static final String DURABLE_CLIENT_ID_PREFIX = "durableClientIdPrefix";
+  public static final String DEFAULT_DURABLE_CLIENT_ID = "";
+  public static final String DURABLE_CLIENT_TIME_OUT = "durableClientTimeout";
+  public static final String DEFAULT_DURABLE_CLIENT_TIMEOUT = "60000";
 
-    public static final String CQ_PREFIX = "cqPrefix";
-    public static final String DEFAULT_CQ_PREFIX = "cqForGeodeKafka";
+  public static final String CQ_PREFIX = "cqPrefix";
+  public static final String DEFAULT_CQ_PREFIX = "cqForGeodeKafka";
 
-    /**
-     * Used as a key for source partitions
-     */
-    public static final String REGION_PARTITION = "regionPartition";
-    public static final String REGION_TO_TOPIC_BINDINGS = "regionToTopics";
-    public static final String CQS_TO_REGISTER = "cqsToRegister";
+  /**
+   * Used as a key for source partitions
+   */
+  public static final String REGION_PARTITION = "regionPartition";
+  public static final String REGION_TO_TOPIC_BINDINGS = "regionToTopics";
+  public static final String CQS_TO_REGISTER = "cqsToRegister";
 
-    public static final String BATCH_SIZE = "geodeConnectorBatchSize";
-    public static final String DEFAULT_BATCH_SIZE = "100";
+  public static final String BATCH_SIZE = "geodeConnectorBatchSize";
+  public static final String DEFAULT_BATCH_SIZE = "100";
 
-    public static final String QUEUE_SIZE = "geodeConnectorQueueSize";
-    public static final String DEFAULT_QUEUE_SIZE = "10000";
+  public static final String QUEUE_SIZE = "geodeConnectorQueueSize";
+  public static final String DEFAULT_QUEUE_SIZE = "10000";
 
-    public static final String LOAD_ENTIRE_REGION = "loadEntireRegion";
-    public static final String DEFAULT_LOAD_ENTIRE_REGION = "false";
+  public static final String LOAD_ENTIRE_REGION = "loadEntireRegion";
+  public static final String DEFAULT_LOAD_ENTIRE_REGION = "false";
 
-    private final String durableClientId;
-    private final String durableClientIdPrefix;
-    private final String durableClientTimeout;
-    private final String cqPrefix;
-    private final boolean loadEntireRegion;
+  private final String durableClientId;
+  private final String durableClientIdPrefix;
+  private final String durableClientTimeout;
+  private final String cqPrefix;
+  private final boolean loadEntireRegion;
 
-    private Map<String, List<String>> regionToTopics;
-    private Collection<String> cqsToRegister;
+  private Map<String, List<String>> regionToTopics;
+  private Collection<String> cqsToRegister;
 
-    public GeodeSourceConnectorConfig(Map<String, String> connectorProperties) {
-        super(connectorProperties);
-        cqsToRegister = parseRegionToTopics(connectorProperties.get(CQS_TO_REGISTER)).keySet();
-        regionToTopics = parseRegionToTopics(connectorProperties.get(REGION_TO_TOPIC_BINDINGS));
-        durableClientIdPrefix = connectorProperties.get(DURABLE_CLIENT_ID_PREFIX);
-        if (isDurable(durableClientIdPrefix)) {
-            durableClientId = durableClientIdPrefix + taskId;
-        } else {
-            durableClientId = "";
-        }
-        durableClientTimeout = connectorProperties.get(DURABLE_CLIENT_TIME_OUT);
-        cqPrefix = connectorProperties.get(CQ_PREFIX);
-        loadEntireRegion = Boolean.parseBoolean(connectorProperties.get(LOAD_ENTIRE_REGION));
+  public GeodeSourceConnectorConfig(Map<String, String> connectorProperties) {
+    super(connectorProperties);
+    cqsToRegister = parseRegionToTopics(connectorProperties.get(CQS_TO_REGISTER)).keySet();
+    regionToTopics = parseRegionToTopics(connectorProperties.get(REGION_TO_TOPIC_BINDINGS));
+    durableClientIdPrefix = connectorProperties.get(DURABLE_CLIENT_ID_PREFIX);
+    if (isDurable(durableClientIdPrefix)) {
+      durableClientId = durableClientIdPrefix + taskId;
+    } else {
+      durableClientId = "";
     }
+    durableClientTimeout = connectorProperties.get(DURABLE_CLIENT_TIME_OUT);
+    cqPrefix = connectorProperties.get(CQ_PREFIX);
+    loadEntireRegion = Boolean.parseBoolean(connectorProperties.get(LOAD_ENTIRE_REGION));
+  }
 
-    public boolean isDurable() {
-        return isDurable(durableClientId);
-    }
+  public boolean isDurable() {
+    return isDurable(durableClientId);
+  }
 
-    /**
-     * @param durableClientId or prefix can be passed in.  Either both will be "" or both will have a value
-     * @return
-     */
-    boolean isDurable(String durableClientId) {
-        return !durableClientId.equals("");
-    }
+  /**
+   * @param durableClientId the durable client id or its prefix; either both are "" or both have
+   *        a value
+   */
+  boolean isDurable(String durableClientId) {
+    return !durableClientId.equals("");
+  }
 
-    public int getTaskId() {
-        return taskId;
-    }
+  public int getTaskId() {
+    return taskId;
+  }
 
-    public String getDurableClientId() {
-        return durableClientId;
-    }
+  public String getDurableClientId() {
+    return durableClientId;
+  }
 
-    public String getDurableClientTimeout() {
-        return durableClientTimeout;
-    }
+  public String getDurableClientTimeout() {
+    return durableClientTimeout;
+  }
 
-    public String getCqPrefix() {
-        return cqPrefix;
-    }
+  public String getCqPrefix() {
+    return cqPrefix;
+  }
 
-    public boolean getLoadEntireRegion() {
-        return loadEntireRegion;
-    }
+  public boolean getLoadEntireRegion() {
+    return loadEntireRegion;
+  }
 
-    public Map<String, List<String>> getRegionToTopics() {
-        return regionToTopics;
-    }
+  public Map<String, List<String>> getRegionToTopics() {
+    return regionToTopics;
+  }
 
-    public Collection<String> getCqsToRegister() {
-        return cqsToRegister;
-    }
+  public Collection<String> getCqsToRegister() {
+    return cqsToRegister;
+  }
 
 }
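
For reference, a sketch of the property map this config class consumes. The keys match the constants above; the values are illustrative, and the superclass additionally expects task id and locator settings, so the constructor call is left commented out.

    import java.util.HashMap;
    import java.util.Map;

    public class SourceConfigPropsSketch {

      public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put("regionToTopics", "[region1:topic1],[region2:topic2]");
        props.put("cqsToRegister", "[region1:topic1]");
        props.put("cqPrefix", "cqForGeodeKafka");
        props.put("durableClientIdPrefix", "durable"); // "" would mean a non-durable client
        props.put("durableClientTimeout", "60000");
        props.put("loadEntireRegion", "true");
        // GeodeSourceConnectorConfig config = new GeodeSourceConnectorConfig(props);
        System.out.println(props);
      }
    }
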
diff --git a/src/main/java/geode/kafka/source/SharedEventBufferSupplier.java b/src/main/java/geode/kafka/source/SharedEventBufferSupplier.java
index 2a4c883..6ac6bb6 100644
--- a/src/main/java/geode/kafka/source/SharedEventBufferSupplier.java
+++ b/src/main/java/geode/kafka/source/SharedEventBufferSupplier.java
@@ -1,37 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
 package geode.kafka.source;
 
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
-import java.util.function.Supplier;
 
 public class SharedEventBufferSupplier implements EventBufferSupplier {
 
-    private static BlockingQueue<GeodeEvent> eventBuffer;
+  private static BlockingQueue<GeodeEvent> eventBuffer;
 
-    public SharedEventBufferSupplier(int size) {
-        recreateEventBufferIfNeeded(size);
-    }
+  public SharedEventBufferSupplier(int size) {
+    recreateEventBufferIfNeeded(size);
+  }
 
-    BlockingQueue recreateEventBufferIfNeeded(int size) {
+  BlockingQueue recreateEventBufferIfNeeded(int size) {
+    if (eventBuffer == null || (eventBuffer.size() + eventBuffer.remainingCapacity()) != size) {
+      synchronized (GeodeKafkaSource.class) {
         if (eventBuffer == null || (eventBuffer.size() + eventBuffer.remainingCapacity()) != size) {
-            synchronized (GeodeKafkaSource.class) {
-                if (eventBuffer == null || (eventBuffer.size() + eventBuffer.remainingCapacity()) != size) {
-                    BlockingQueue<GeodeEvent> oldEventBuffer = eventBuffer;
-                    eventBuffer = new LinkedBlockingQueue<>(size);
-                    if (oldEventBuffer != null) {
-                        eventBuffer.addAll(oldEventBuffer);
-                    }
-                }
-            }
+          BlockingQueue<GeodeEvent> oldEventBuffer = eventBuffer;
+          eventBuffer = new LinkedBlockingQueue<>(size);
+          if (oldEventBuffer != null) {
+            eventBuffer.addAll(oldEventBuffer);
+          }
         }
-        return eventBuffer;
+      }
     }
+    return eventBuffer;
+  }
 
-    /**
-     * Callers should not store a reference to this and instead always call get to make sure we always use the latest buffer
-     * Buffers themselves shouldn't change often but in cases where we want to modify the size
-     */
-    public BlockingQueue<GeodeEvent> get() {
-        return eventBuffer;
-    }
+  /**
+   * Callers should not store a reference to this buffer; instead, always call get() so that the
+   * latest buffer is used. Buffers themselves shouldn't change often, but they are recreated
+   * when the configured size changes.
+   */
+  public BlockingQueue<GeodeEvent> get() {
+    return eventBuffer;
+  }
 }
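
The javadoc above warns against caching the returned buffer: recreateEventBufferIfNeeded infers the current capacity from size() + remainingCapacity() and swaps in a new queue when the configured size changes. A small sketch of why a cached reference goes stale (illustrative names, synchronization omitted):

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;

    public class StaleBufferSketch {

      private static BlockingQueue<String> buffer = new LinkedBlockingQueue<>(10);

      static BlockingQueue<String> get() {
        return buffer;
      }

      // Mirrors the recreate path above: size() + remainingCapacity() recovers the
      // capacity, and pending events are carried into the replacement queue.
      static void resize(int newSize) {
        if (buffer.size() + buffer.remainingCapacity() != newSize) {
          BlockingQueue<String> old = buffer;
          buffer = new LinkedBlockingQueue<>(newSize);
          buffer.addAll(old);
        }
      }

      public static void main(String[] args) {
        BlockingQueue<String> cached = get();
        resize(20);
        System.out.println(cached == get()); // false: the cached reference misses new events
      }
    }
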
diff --git a/src/test/java/geode/kafka/GeodeConnectorConfigTest.java b/src/test/java/geode/kafka/GeodeConnectorConfigTest.java
index 84a3ec5..c6fe491 100644
--- a/src/test/java/geode/kafka/GeodeConnectorConfigTest.java
+++ b/src/test/java/geode/kafka/GeodeConnectorConfigTest.java
@@ -14,15 +14,6 @@
  */
 package geode.kafka;
 
-import junitparams.JUnitParamsRunner;
-import junitparams.Parameters;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-
 import static org.hamcrest.CoreMatchers.allOf;
 import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertEquals;
@@ -30,122 +21,130 @@
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import junitparams.JUnitParamsRunner;
+import junitparams.Parameters;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
 @RunWith(JUnitParamsRunner.class)
 public class GeodeConnectorConfigTest {
 
-    @Test
-    public void parseRegionNamesShouldSplitOnComma() {
-        GeodeConnectorConfig config = new GeodeConnectorConfig();
-        List<String> regionNames = config.parseStringByComma("region1,region2,region3,region4");
-        assertEquals(4, regionNames.size());
-        assertThat(true, allOf(is(regionNames.contains("region1"))
-                , is(regionNames.contains("region2"))
-                , is(regionNames.contains("region3"))
-                , is(regionNames.contains("region4"))));
+  @Test
+  public void parseRegionNamesShouldSplitOnComma() {
+    GeodeConnectorConfig config = new GeodeConnectorConfig();
+    List<String> regionNames = config.parseStringByComma("region1,region2,region3,region4");
+    assertEquals(4, regionNames.size());
+    assertThat(true, allOf(is(regionNames.contains("region1")), is(regionNames.contains("region2")),
+        is(regionNames.contains("region3")), is(regionNames.contains("region4"))));
+  }
+
+  @Test
+  public void parseRegionNamesShouldChomp() {
+    GeodeConnectorConfig config = new GeodeConnectorConfig();
+    List<String> regionNames = config.parseStringByComma("region1, region2, region3,region4");
+    assertEquals(4, regionNames.size());
+    assertThat(true,
+        allOf(is(regionNames instanceof List), is(regionNames.contains("region1")),
+            is(regionNames.contains("region2")), is(regionNames.contains("region3")),
+            is(regionNames.contains("region4"))));
+  }
+
+  @Test
+  public void shouldBeAbleToParseGeodeLocatorStrings() {
+    GeodeConnectorConfig config = new GeodeConnectorConfig();
+    String locatorString = "localhost[8888], localhost[8881]";
+    List<LocatorHostPort> locators = config.parseLocators(locatorString);
+    assertThat(2, is(locators.size()));
+  }
+
+  @Test
+  @Parameters(method = "oneToOneBindings")
+  public void parseBindingsCanSplitOneToOneBindings(String value) {
+    List<String> splitBindings = GeodeConnectorConfig.parseBindings(value);
+    assertEquals(2, splitBindings.size());
+  }
+
+  @Test
+  public void parseBindingsCanSplitASingleOneToOneBindings() {
+    String binding = "[region1:topic1]";
+    List<String> splitBindings = GeodeConnectorConfig.parseBindings(binding);
+    assertEquals(1, splitBindings.size());
+    assertEquals(binding.replaceAll("\\[", "").replaceAll("\\]", ""), splitBindings.get(0));
+  }
+
+  public List<String> oneToOneBindings() {
+    return Arrays.asList(
+        new String[] {"[region1:topic1],[region2:topic2]", "[region1:topic1] , [region2:topic2]",
+            "[region1:topic1], [region2:topic2] ,", "[region1: topic1], [region2 :topic2]"});
+  }
+
+  @Test
+  @Parameters(method = "oneToManyBindings")
+  public void parseBindingsCanSplitOneToManyBindings(String value) {
+    List<String> splitBindings = GeodeConnectorConfig.parseBindings(value);
+    assertEquals(Arrays.toString(splitBindings.toArray()), 2, splitBindings.size());
+  }
+
+  public List<String> oneToManyBindings() {
+    return Arrays.asList(new String[] {"[region1:topic1,topic2],[region2:topic2,topic3]",
+        "[region1:topic1 , topic2] , [region2:topic2 , topic3]",
+        "[region1:topic1 ,], [region2:topic2 ,] ,",
+        "[region1: topic1 ,topic3], [region2 :topic2]"});
+  }
+
+  @Test
+  @Parameters(method = "oneToManyBindings")
+  public void reconstructBindingsToStringShouldReformAParsableString(String value) {
+    List<String> splitBindings = GeodeConnectorConfig.parseBindings(value);
+    String reconstructString = GeodeConnectorConfig.reconstructString(splitBindings);
+    splitBindings = GeodeConnectorConfig.parseBindings(reconstructString);
+    assertEquals(Arrays.toString(splitBindings.toArray()), 2, splitBindings.size());
+    for (String topicOrRegion : splitBindings) {
+      assertFalse(topicOrRegion.contains("\\["));
+      assertFalse(topicOrRegion.contains("\\]"));
     }
+  }
 
-    @Test
-    public void parseRegionNamesShouldChomp() {
-        GeodeConnectorConfig config = new GeodeConnectorConfig();
-        List<String> regionNames = config.parseStringByComma("region1, region2, region3,region4");
-        assertEquals(4, regionNames.size());
-        assertThat(true, allOf(is(regionNames instanceof List)
-                , is(regionNames.contains("region1"))
-                , is(regionNames.contains("region2"))
-                , is(regionNames.contains("region3"))
-                , is(regionNames.contains("region4"))));
-    }
+  @Test
+  @Parameters(method = "oneToOneBindings")
+  public void configurationShouldReturnRegionToTopicsMappingWhenParseRegionToTopics(String value) {
+    Map<String, List<String>> regionToTopics = GeodeConnectorConfig.parseRegionToTopics(value);
+    assertEquals(2, regionToTopics.size());
+    assertTrue(regionToTopics.get("region1") != null);
+    assertEquals(1, regionToTopics.get("region1").size());
+    assertTrue(regionToTopics.get("region1").contains("topic1"));
+  }
 
-    @Test
-    public void shouldBeAbleToParseGeodeLocatorStrings() {
-        GeodeConnectorConfig config = new GeodeConnectorConfig();
-        String locatorString="localhost[8888], localhost[8881]";
-        List<LocatorHostPort> locators = config.parseLocators(locatorString);
-        assertThat(2, is(locators.size()));
-    }
-
-    @Test
-    @Parameters(method="oneToOneBindings")
-    public void parseBindingsCanSplitOneToOneBindings(String value) {
-        List<String> splitBindings = GeodeConnectorConfig.parseBindings(value);
-        assertEquals(2, splitBindings.size());
-    }
-
-    @Test
-    public void parseBindingsCanSplitASingleOneToOneBindings() {
-        String binding = "[region1:topic1]";
-        List<String> splitBindings = GeodeConnectorConfig.parseBindings(binding);
-        assertEquals(1, splitBindings.size());
-        assertEquals(binding.replaceAll("\\[", "").replaceAll("\\]", ""), splitBindings.get(0));
-    }
-
-    public List<String> oneToOneBindings() {
-        return Arrays.asList(new String[]{"[region1:topic1],[region2:topic2]"
-        ,"[region1:topic1] , [region2:topic2]"
-        ,"[region1:topic1], [region2:topic2] ,"
-        ,"[region1: topic1], [region2 :topic2]"});
-    }
-
-    @Test
-    @Parameters(method="oneToManyBindings")
-    public void parseBindingsCanSplitOneToManyBindings(String value) {
-        List<String> splitBindings = GeodeConnectorConfig.parseBindings(value);
-        assertEquals(Arrays.toString(splitBindings.toArray()), 2, splitBindings.size());
-    }
-
-    public List<String> oneToManyBindings() {
-        return Arrays.asList(new String[]{"[region1:topic1,topic2],[region2:topic2,topic3]"
-                ,"[region1:topic1 , topic2] , [region2:topic2 , topic3]"
-                ,"[region1:topic1 ,], [region2:topic2 ,] ,"
-                ,"[region1: topic1 ,topic3], [region2 :topic2]"});
-    }
-
-    @Test
-    @Parameters(method="oneToManyBindings")
-    public void reconstructBindingsToStringShouldReformAParsableString(String value) {
-        List<String> splitBindings = GeodeConnectorConfig.parseBindings(value);
-        String reconstructString = GeodeConnectorConfig.reconstructString(splitBindings);
-        splitBindings = GeodeConnectorConfig.parseBindings(reconstructString);
-        assertEquals(Arrays.toString(splitBindings.toArray()), 2, splitBindings.size());
-        for(String topicOrRegion: splitBindings) {
-            assertFalse(topicOrRegion.contains("\\["));
-            assertFalse(topicOrRegion.contains("\\]"));
-        }
-    }
-
-    @Test
-    @Parameters(method="oneToOneBindings")
-    public void configurationShouldReturnRegionToTopicsMappingWhenParseRegionToTopics(String value) {
-        Map<String, List<String>> regionToTopics = GeodeConnectorConfig.parseRegionToTopics(value);
-        assertEquals(2, regionToTopics.size());
-        assertTrue(regionToTopics.get("region1") != null);
-        assertEquals(1, regionToTopics.get("region1").size());
-        assertTrue(regionToTopics.get("region1").contains("topic1"));
-    }
-
-    @Test
-    public void regionToTopicParsingShouldParseCorrectlyWithASingleBinding() {
-        Map<String, List<String>> regionToTopics = GeodeConnectorConfig.parseRegionToTopics("[region1:topic1]");
-        assertTrue(regionToTopics.get("region1") != null);
-        assertEquals(1, regionToTopics.get("region1").size());
-        assertTrue(regionToTopics.get("region1").contains("topic1"));
-    }
+  @Test
+  public void regionToTopicParsingShouldParseCorrectlyWithASingleBinding() {
+    Map<String, List<String>> regionToTopics =
+        GeodeConnectorConfig.parseRegionToTopics("[region1:topic1]");
+    assertTrue(regionToTopics.get("region1") != null);
+    assertEquals(1, regionToTopics.get("region1").size());
+    assertTrue(regionToTopics.get("region1").contains("topic1"));
+  }
 
 
-    /*
-     taskId = Integer.parseInt(connectorProperties.get(TASK_ID));
-        durableClientIdPrefix = connectorProperties.get(DURABLE_CLIENT_ID_PREFIX);
-        if (isDurable(durableClientIdPrefix)) {
-            durableClientId = durableClientIdPrefix + taskId;
-        } else {
-            durableClientId = "";
-        }
-        durableClientTimeout = connectorProperties.get(DURABLE_CLIENT_TIME_OUT);
-        regionToTopics = parseRegionToTopics(connectorProperties.get(GeodeConnectorConfig.REGION_TO_TOPIC_BINDINGS));
-        topicToRegions = parseTopicToRegions(connectorProperties.get(GeodeConnectorConfig.REGION_TO_TOPIC_BINDINGS));
-        locatorHostPorts = parseLocators(connectorProperties.get(GeodeConnectorConfig.LOCATORS));
-
-     */
+  /*
+   * taskId = Integer.parseInt(connectorProperties.get(TASK_ID));
+   * durableClientIdPrefix = connectorProperties.get(DURABLE_CLIENT_ID_PREFIX);
+   * if (isDurable(durableClientIdPrefix)) {
+   * durableClientId = durableClientIdPrefix + taskId;
+   * } else {
+   * durableClientId = "";
+   * }
+   * durableClientTimeout = connectorProperties.get(DURABLE_CLIENT_TIME_OUT);
+   * regionToTopics =
+   * parseRegionToTopics(connectorProperties.get(GeodeConnectorConfig.REGION_TO_TOPIC_BINDINGS));
+   * topicToRegions =
+   * parseTopicToRegions(connectorProperties.get(GeodeConnectorConfig.REGION_TO_TOPIC_BINDINGS));
+   * locatorHostPorts = parseLocators(connectorProperties.get(GeodeConnectorConfig.LOCATORS));
+   *
+   */
 
 
 
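
The binding tests above all normalize to the same structure; a rough sketch of the whitespace-tolerant parse they imply (an illustration only, not the connector's parseRegionToTopics):

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.stream.Collectors;

    public class BindingParseSketch {

      // "[region1:topic1,topic2],[region2:topic3]" -> {region1=[topic1, topic2], region2=[topic3]}
      static Map<String, List<String>> parse(String bindings) {
        Map<String, List<String>> result = new HashMap<>();
        for (String binding : bindings.split("\\],?")) {
          String body = binding.replace("[", "").trim();
          if (!body.contains(":")) {
            continue; // skip empty or trailing fragments
          }
          String[] parts = body.split(":", 2);
          List<String> topics = Arrays.stream(parts[1].split(","))
              .map(String::trim)
              .filter(topic -> !topic.isEmpty())
              .collect(Collectors.toList());
          result.put(parts[0].trim(), topics);
        }
        return result;
      }

      public static void main(String[] args) {
        System.out.println(parse("[region1: topic1 ,topic3], [region2 :topic2]"));
        // {region1=[topic1, topic3], region2=[topic2]}
      }
    }
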
diff --git a/src/test/java/geode/kafka/GeodeKafkaTestCluster.java b/src/test/java/geode/kafka/GeodeKafkaTestCluster.java
index 121a978..3ee5e09 100644
--- a/src/test/java/geode/kafka/GeodeKafkaTestCluster.java
+++ b/src/test/java/geode/kafka/GeodeKafkaTestCluster.java
@@ -14,13 +14,20 @@
  */
 package geode.kafka;
 
+import static org.awaitility.Awaitility.await;
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
 import kafka.admin.RackAwareMode;
 import kafka.zk.AdminZkClient;
 import kafka.zk.KafkaZkClient;
-import org.apache.geode.cache.Region;
-import org.apache.geode.cache.client.ClientCache;
-import org.apache.geode.cache.client.ClientCacheFactory;
-import org.apache.geode.cache.client.ClientRegionShortcut;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -33,27 +40,18 @@
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.connect.runtime.ConnectorConfig;
 import org.apache.kafka.connect.runtime.WorkerConfig;
 import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
-import org.junit.After;
 import org.junit.AfterClass;
-import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
-import java.io.IOException;
-import java.time.Duration;
-import java.util.Collections;
-import java.util.Properties;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import static org.awaitility.Awaitility.await;
-import static org.junit.Assert.assertEquals;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.client.ClientCache;
+import org.apache.geode.cache.client.ClientCacheFactory;
+import org.apache.geode.cache.client.ClientRegionShortcut;
 
 public class GeodeKafkaTestCluster {
 
@@ -76,7 +74,8 @@
   private static Consumer<String, String> consumer;
 
   @BeforeClass
-  public static void setup() throws IOException, QuorumPeerConfig.ConfigException, InterruptedException {
+  public static void setup()
+      throws IOException, QuorumPeerConfig.ConfigException, InterruptedException {
     startZooKeeper();
     startKafka();
     startGeode();
@@ -86,11 +85,11 @@
   @AfterClass
   public static void shutdown() {
     workerAndHerderCluster.stop();
-    KafkaZkClient zkClient = KafkaZkClient.apply("localhost:2181",false,200000,
-            15000,10, Time.SYSTEM, "myGroup","myMetricType", null);
-//    AdminZkClient adminZkClient = new AdminZkClient(zkClient);
-//    adminZkClient.deleteTopic(TEST_TOPIC_FOR_SOURCE);
-//    adminZkClient.deleteTopic(TEST_TOPIC_FOR_SINK);
+    KafkaZkClient zkClient = KafkaZkClient.apply("localhost:2181", false, 200000,
+        15000, 10, Time.SYSTEM, "myGroup", "myMetricType", null);
+    // AdminZkClient adminZkClient = new AdminZkClient(zkClient);
+    // adminZkClient.deleteTopic(TEST_TOPIC_FOR_SOURCE);
+    // adminZkClient.deleteTopic(TEST_TOPIC_FOR_SINK);
     zkClient.close();
     kafkaLocalCluster.stop();
     geodeLocalCluster.stop();
@@ -104,18 +103,19 @@
   }
 
   private static void createTopic(String topicName, int numPartitions, int replicationFactor) {
-    KafkaZkClient zkClient = KafkaZkClient.apply("localhost:2181",false,200000,
-            15000,10, Time.SYSTEM, "myGroup","myMetricType", null);
+    KafkaZkClient zkClient = KafkaZkClient.apply("localhost:2181", false, 200000,
+        15000, 10, Time.SYSTEM, "myGroup", "myMetricType", null);
 
     Properties topicProperties = new Properties();
     topicProperties.put("flush.messages", "1");
     AdminZkClient adminZkClient = new AdminZkClient(zkClient);
-    adminZkClient.createTopic(topicName, numPartitions,replicationFactor, topicProperties, RackAwareMode.Disabled$.MODULE$);
+    adminZkClient.createTopic(topicName, numPartitions, replicationFactor, topicProperties,
+        RackAwareMode.Disabled$.MODULE$);
   }
 
   private static void deleteTopic(String topicName) {
-    KafkaZkClient zkClient = KafkaZkClient.apply("localhost:2181",false,200000,
-            15000,10, Time.SYSTEM, "myGroup","myMetricType", null);
+    KafkaZkClient zkClient = KafkaZkClient.apply("localhost:2181", false, 200000,
+        15000, 10, Time.SYSTEM, "myGroup", "myMetricType", null);
     AdminZkClient adminZkClient = new AdminZkClient(zkClient);
     adminZkClient.deleteTopic(topicName);
   }
@@ -129,7 +129,8 @@
     zooKeeperLocalCluster.start();
   }
 
-  private static void startKafka() throws IOException, InterruptedException, QuorumPeerConfig.ConfigException {
+  private static void startKafka()
+      throws IOException, InterruptedException, QuorumPeerConfig.ConfigException {
     kafkaLocalCluster = new KafkaLocalCluster(getKafkaConfig());
     kafkaLocalCluster.start();
   }
@@ -139,9 +140,10 @@
     geodeLocalCluster.start();
   }
 
-  private static  Properties getZooKeeperProperties() throws IOException {
+  private static Properties getZooKeeperProperties() throws IOException {
     Properties properties = new Properties();
-    properties.setProperty("dataDir", (debug)? "/tmp/zookeeper" :temporaryFolder.newFolder("zookeeper").getAbsolutePath());
+    properties.setProperty("dataDir",
+        (debug) ? "/tmp/zookeeper" : temporaryFolder.newFolder("zookeeper").getAbsolutePath());
     properties.setProperty("clientPort", "2181");
     properties.setProperty("tickTime", "2000");
     return properties;
@@ -159,43 +161,43 @@
     props.put("port", BROKER_PORT);
     props.put("offsets.topic.replication.factor", "1");
 
-    //Specifically GeodeKafka connector configs
+    // Specifically GeodeKafka connector configs
     return props;
   }
 
 
-  //consumer props, less important, just for testing?
-  public static Consumer<String,String> createConsumer() {
-      final Properties props = new Properties();
+  // consumer props, less important, just for testing?
+  public static Consumer<String, String> createConsumer() {
+    final Properties props = new Properties();
     props.put(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
     props.put(ConsumerConfig.GROUP_ID_CONFIG,
-              "myGroup");
-      props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
-              StringDeserializer.class.getName());
-      props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
-              StringDeserializer.class.getName());
-      props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        "myGroup");
+    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+        StringDeserializer.class.getName());
+    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+        StringDeserializer.class.getName());
+    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
 
     // Create the consumer using props.
-      final Consumer<String, String> consumer =
-              new KafkaConsumer<>(props);
-      // Subscribe to the topic.
-      consumer.subscribe(Collections.singletonList(TEST_TOPIC_FOR_SOURCE));
-      return consumer;
+    final Consumer<String, String> consumer =
+        new KafkaConsumer<>(props);
+    // Subscribe to the topic.
+    consumer.subscribe(Collections.singletonList(TEST_TOPIC_FOR_SOURCE));
+    return consumer;
   }
 
-  //consumer props, less important, just for testing?
-  public static Producer<String,String> createProducer() {
+  // producer props, less important, just for testing?
+  public static Producer<String, String> createProducer() {
     final Properties props = new Properties();
     props.put(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
     props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
-            StringSerializer.class.getName());
+        StringSerializer.class.getName());
     props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
-            StringSerializer.class.getName());
+        StringSerializer.class.getName());
 
     // Create the producer using props.
     final Producer<String, String> producer =
-            new KafkaProducer<>(props);
+        new KafkaProducer<>(props);
     return producer;
   }
 
@@ -207,9 +209,10 @@
       consumer = createConsumer();
 
       ClientCache client = createGeodeClient();
-      Region region = client.createClientRegionFactory(ClientRegionShortcut.PROXY).create(TEST_REGION_FOR_SOURCE);
+      Region region = client.createClientRegionFactory(ClientRegionShortcut.PROXY)
+          .create(TEST_REGION_FOR_SOURCE);
 
-      for (int i = 0; i < 10 ; i++) {
+      for (int i = 0; i < 10; i++) {
         region.put("KEY" + i, "VALUE" + i);
       }
 
@@ -221,8 +224,7 @@
         }
         return valueReceived.get() == 10;
       });
-    }
-    finally {
+    } finally {
       deleteTopic(TEST_TOPIC_FOR_SOURCE);
     }
   }
@@ -236,9 +238,10 @@
       consumer = createConsumer();
 
       ClientCache client = createGeodeClient();
-      Region region = client.createClientRegionFactory(ClientRegionShortcut.PROXY).create(TEST_REGION_FOR_SOURCE);
+      Region region = client.createClientRegionFactory(ClientRegionShortcut.PROXY)
+          .create(TEST_REGION_FOR_SOURCE);
 
-      for (int i = 0; i < 10 ; i++) {
+      for (int i = 0; i < 10; i++) {
         region.put("KEY" + i, "VALUE" + i);
       }
 
@@ -250,23 +253,24 @@
         }
         return valueReceived.get() == 10;
       });
-    }
-    finally {
+    } finally {
       deleteTopic(TEST_TOPIC_FOR_SOURCE);
     }
   }
 
   @Test
-  public void endToEndSourceSingleRegionMultiTaskMultiPartitionWithMoreTasksThanPartitionsTest() throws Exception {
+  public void endToEndSourceSingleRegionMultiTaskMultiPartitionWithMoreTasksThanPartitionsTest()
+      throws Exception {
     try {
       createTopic(TEST_TOPIC_FOR_SOURCE, 2, 1);
       startWorker(5);
       consumer = createConsumer();
 
       ClientCache client = createGeodeClient();
-      Region region = client.createClientRegionFactory(ClientRegionShortcut.PROXY).create(TEST_REGION_FOR_SOURCE);
+      Region region = client.createClientRegionFactory(ClientRegionShortcut.PROXY)
+          .create(TEST_REGION_FOR_SOURCE);
 
-      for (int i = 0; i < 10 ; i++) {
+      for (int i = 0; i < 10; i++) {
         region.put("KEY" + i, "VALUE" + i);
       }
 
@@ -278,8 +282,7 @@
         }
         return valueReceived.get() == 10;
       });
-    }
-    finally {
+    } finally {
       deleteTopic(TEST_TOPIC_FOR_SOURCE);
     }
   }
@@ -292,7 +295,8 @@
       consumer = createConsumer();
 
       ClientCache client = createGeodeClient();
-      Region region = client.createClientRegionFactory(ClientRegionShortcut.PROXY).create(TEST_REGION_FOR_SINK);
+      Region region =
+          client.createClientRegionFactory(ClientRegionShortcut.PROXY).create(TEST_REGION_FOR_SINK);
 
       Producer<String, String> producer = createProducer();
       for (int i = 0; i < 10; i++) {
@@ -300,23 +304,25 @@
       }
 
       int i = 0;
-      await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> assertEquals(10, region.sizeOnServer()));
-    }
-    finally {
+      await().atMost(10, TimeUnit.SECONDS)
+          .untilAsserted(() -> assertEquals(10, region.sizeOnServer()));
+    } finally {
       deleteTopic(TEST_TOPIC_FOR_SINK);
     }
   }
 
 
   @Test
-  public void endToEndWithOneTaskForASingleBindingAgainstAMultiPartitionTopicSinkTest() throws Exception {
+  public void endToEndWithOneTaskForASingleBindingAgainstAMultiPartitionTopicSinkTest()
+      throws Exception {
     try {
       createTopic(TEST_TOPIC_FOR_SINK, 10, 1);
       startWorker(5);
       consumer = createConsumer();
 
       ClientCache client = createGeodeClient();
-      Region region = client.createClientRegionFactory(ClientRegionShortcut.PROXY).create(TEST_REGION_FOR_SINK);
+      Region region =
+          client.createClientRegionFactory(ClientRegionShortcut.PROXY).create(TEST_REGION_FOR_SINK);
 
       Producer<String, String> producer = createProducer();
       for (int i = 0; i < 10; i++) {
@@ -324,22 +330,24 @@
       }
 
       int i = 0;
-      await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> assertEquals(10, region.sizeOnServer()));
-    }
-    finally {
+      await().atMost(10, TimeUnit.SECONDS)
+          .untilAsserted(() -> assertEquals(10, region.sizeOnServer()));
+    } finally {
       deleteTopic(TEST_TOPIC_FOR_SINK);
     }
   }
 
   @Test
-  public void endToEndWithOneTaskForASingleBindingAgainstAMultiPartitionTopicWithMoreWorkersSinkTest() throws Exception {
+  public void endToEndWithOneTaskForASingleBindingAgainstAMultiPartitionTopicWithMoreWorkersSinkTest()
+      throws Exception {
     try {
       createTopic(TEST_TOPIC_FOR_SINK, 10, 1);
       startWorker(15);
       consumer = createConsumer();
 
       ClientCache client = createGeodeClient();
-      Region region = client.createClientRegionFactory(ClientRegionShortcut.PROXY).create(TEST_REGION_FOR_SINK);
+      Region region =
+          client.createClientRegionFactory(ClientRegionShortcut.PROXY).create(TEST_REGION_FOR_SINK);
 
       Producer<String, String> producer = createProducer();
       for (int i = 0; i < 10; i++) {
@@ -347,9 +355,9 @@
       }
 
       int i = 0;
-      await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> assertEquals(10, region.sizeOnServer()));
-    }
-    finally {
+      await().atMost(10, TimeUnit.SECONDS)
+          .untilAsserted(() -> assertEquals(10, region.sizeOnServer()));
+    } finally {
       deleteTopic(TEST_TOPIC_FOR_SINK);
     }
   }
@@ -362,7 +370,8 @@
       consumer = createConsumer();
 
       ClientCache client = createGeodeClient();
-      Region region = client.createClientRegionFactory(ClientRegionShortcut.PROXY).create(TEST_REGION_FOR_SINK);
+      Region region =
+          client.createClientRegionFactory(ClientRegionShortcut.PROXY).create(TEST_REGION_FOR_SINK);
 
       Producer<String, String> producer = createProducer();
       for (int i = 0; i < 10; i++) {
@@ -370,9 +379,9 @@
       }
 
       int i = 0;
-      await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> assertEquals(10, region.sizeOnServer()));
-    }
-    finally {
+      await().atMost(10, TimeUnit.SECONDS)
+          .untilAsserted(() -> assertEquals(10, region.sizeOnServer()));
+    } finally {
       deleteTopic(TEST_TOPIC_FOR_SINK);
     }
   }
@@ -385,17 +394,19 @@
       consumer = createConsumer();
 
       ClientCache client = createGeodeClient();
-      Region region = client.createClientRegionFactory(ClientRegionShortcut.PROXY).create(TEST_REGION_FOR_SINK);
+      Region region =
+          client.createClientRegionFactory(ClientRegionShortcut.PROXY).create(TEST_REGION_FOR_SINK);
 
       Producer<String, String> producer = createProducer();
       for (int i = 0; i < 10; i++) {
-        producer.send(new ProducerRecord(TEST_TOPIC_FOR_SINK, i, UUID.randomUUID().toString(), UUID.randomUUID().toString()));
+        producer.send(new ProducerRecord(TEST_TOPIC_FOR_SINK, i, UUID.randomUUID().toString(),
+            UUID.randomUUID().toString()));
       }
 
       int i = 0;
-      await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> assertEquals(10, region.sizeOnServer()));
-    }
-    finally {
+      await().atMost(10, TimeUnit.SECONDS)
+          .untilAsserted(() -> assertEquals(10, region.sizeOnServer()));
+    } finally {
       deleteTopic(TEST_TOPIC_FOR_SINK);
     }
   }
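
The source and sink tests above share one assertion idiom: poll until the expected number of records (or region entries) arrives, failing after ten seconds. A condensed sketch of the Awaitility pattern on the consumer side (helper name and counts are illustrative):

    import static org.awaitility.Awaitility.await;

    import java.time.Duration;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicInteger;

    import org.apache.kafka.clients.consumer.Consumer;

    public class AwaitRecordsSketch {

      // Poll until the expected number of records has arrived, or fail after ten seconds.
      static void awaitRecords(Consumer<String, String> consumer, int expected) {
        AtomicInteger received = new AtomicInteger(0);
        await().atMost(10, TimeUnit.SECONDS).until(() -> {
          received.addAndGet(consumer.poll(Duration.ofSeconds(2)).count());
          return received.get() == expected;
        });
      }
    }
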
diff --git a/src/test/java/geode/kafka/GeodeLocalCluster.java b/src/test/java/geode/kafka/GeodeLocalCluster.java
index 85b3a99..259d30b 100644
--- a/src/test/java/geode/kafka/GeodeLocalCluster.java
+++ b/src/test/java/geode/kafka/GeodeLocalCluster.java
@@ -18,25 +18,24 @@
 
 public class GeodeLocalCluster {
 
-    private JavaProcess locatorProcess;
-    private JavaProcess serverProcess;
+  private JavaProcess locatorProcess;
+  private JavaProcess serverProcess;
 
-    public GeodeLocalCluster() {
-        locatorProcess = new JavaProcess(LocatorLauncherWrapper.class);
-        serverProcess = new JavaProcess(ServerLauncherWrapper.class);
-    }
+  public GeodeLocalCluster() {
+    locatorProcess = new JavaProcess(LocatorLauncherWrapper.class);
+    serverProcess = new JavaProcess(ServerLauncherWrapper.class);
+  }
 
-    public void start() throws IOException, InterruptedException {
-        System.out.println("starting locator");
-        locatorProcess.exec("10334");
-        Thread.sleep(15000);
-        serverProcess.exec("40404");
-        Thread.sleep(30000);
-    }
+  public void start() throws IOException, InterruptedException {
+    System.out.println("starting locator");
+    locatorProcess.exec("10334");
+    Thread.sleep(15000);
+    serverProcess.exec("40404");
+    Thread.sleep(30000);
+  }
 
-    public void stop() {
-        serverProcess.destroy();
-        locatorProcess.destroy();
-    }
+  public void stop() {
+    serverProcess.destroy();
+    locatorProcess.destroy();
+  }
 }
-
diff --git a/src/test/java/geode/kafka/JavaProcess.java b/src/test/java/geode/kafka/JavaProcess.java
index ed7a493..b130223 100644
--- a/src/test/java/geode/kafka/JavaProcess.java
+++ b/src/test/java/geode/kafka/JavaProcess.java
@@ -19,40 +19,41 @@
 
 public class JavaProcess {
 
-    public Process process;
-    Class classWithMain;
+  public Process process;
+  Class classWithMain;
 
-    public JavaProcess(Class classWithmain) {
-        this.classWithMain = classWithmain;
+  public JavaProcess(Class classWithmain) {
+    this.classWithMain = classWithmain;
+  }
+
+  public void exec(String... args) throws IOException, InterruptedException {
+    String java =
+        System.getProperty("java.home") + File.separator + "bin" + File.separator + "java";
+    String classpath = System.getProperty("java.class.path");
+    String className = classWithMain.getName();
+
+    ProcessBuilder builder = new ProcessBuilder(
+        java, "-cp", classpath, className, convertArgsToString(args));
+
+    process = builder.inheritIO().start();
+  }
+
+  private String convertArgsToString(String... args) {
+    String string = "";
+    for (String arg : args) {
+      string += arg;
     }
+    return string;
+  }
 
-    public void exec(String... args) throws IOException, InterruptedException {
-        String java = System.getProperty("java.home") + File.separator + "bin" + File.separator + "java";
-        String classpath = System.getProperty("java.class.path");
-        String className = classWithMain.getName();
+  public void waitFor() throws InterruptedException {
+    process.waitFor();
+  }
 
-        ProcessBuilder builder = new ProcessBuilder(
-                java, "-cp", classpath, className, convertArgsToString(args));
-
-        process = builder.inheritIO().start();
-    }
-
-    private String convertArgsToString(String... args) {
-        String string = "";
-        for(String arg: args) {
-            string += arg;
-        }
-        return string;
-    }
-
-    public void waitFor() throws InterruptedException {
-        process.waitFor();
-    }
-
-    public void destroy() {
-        process.destroy();
-    }
+  public void destroy() {
+    process.destroy();
+  }
 
 
 
-}
\ No newline at end of file
+}
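
One caveat in JavaProcess: convertArgsToString concatenates the arguments with no separator and forwards the result as a single program argument, which only works because each launcher here takes one argument. A sketch of a variant that forwards arguments individually (illustrative, not part of this patch):

    import java.io.File;
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;

    public class ForkedJvmSketch {

      // Build the command as a list so every argument survives as its own token.
      static Process fork(Class<?> mainClass, String... args) throws IOException {
        String java =
            System.getProperty("java.home") + File.separator + "bin" + File.separator + "java";
        List<String> command = new ArrayList<>();
        command.add(java);
        command.add("-cp");
        command.add(System.getProperty("java.class.path"));
        command.add(mainClass.getName());
        Collections.addAll(command, args);
        return new ProcessBuilder(command).inheritIO().start();
      }
    }
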
diff --git a/src/test/java/geode/kafka/KafkaLocalCluster.java b/src/test/java/geode/kafka/KafkaLocalCluster.java
index 8d09775..ef534d9 100644
--- a/src/test/java/geode/kafka/KafkaLocalCluster.java
+++ b/src/test/java/geode/kafka/KafkaLocalCluster.java
@@ -14,12 +14,12 @@
  */
 package geode.kafka;
 
-import kafka.server.KafkaConfig;
-import kafka.server.KafkaServerStartable;
-
 import java.io.IOException;
 import java.util.Properties;
 
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServerStartable;
+
 public class KafkaLocalCluster {
 
   KafkaServerStartable kafka;
@@ -33,8 +33,7 @@
     try {
       kafka.startup();
       System.out.println("Kafka started up");
-    }
-    catch (Throwable t) {
+    } catch (Throwable t) {
       System.out.println(t);
     }
   }
diff --git a/src/test/java/geode/kafka/LocatorLauncherWrapper.java b/src/test/java/geode/kafka/LocatorLauncherWrapper.java
index 0bc446d..1b922c9 100644
--- a/src/test/java/geode/kafka/LocatorLauncherWrapper.java
+++ b/src/test/java/geode/kafka/LocatorLauncherWrapper.java
@@ -14,37 +14,38 @@
  */
 package geode.kafka;
 
-import org.apache.geode.distributed.ConfigurationProperties;
-import org.apache.geode.distributed.Locator;
-
 import java.io.File;
 import java.io.IOException;
 import java.util.Properties;
 
+import org.apache.geode.distributed.ConfigurationProperties;
+import org.apache.geode.distributed.Locator;
+
 public class LocatorLauncherWrapper {
 
-    public static void main(String[] args) throws IOException {
-        Properties properties = new Properties();
-//        String statsFile = new File(context.getOutputDir(), "stats.gfs").getAbsolutePath();
-//        properties.setProperty(ConfigurationPropert/**/ies.STATISTIC_ARCHIVE_FILE, statsFile);
-        properties.setProperty(ConfigurationProperties.NAME, "locator1");
+  public static void main(String[] args) throws IOException {
+    Properties properties = new Properties();
+    // String statsFile = new File(context.getOutputDir(), "stats.gfs").getAbsolutePath();
+    // properties.setProperty(ConfigurationPropert/**/ies.STATISTIC_ARCHIVE_FILE, statsFile);
+    properties.setProperty(ConfigurationProperties.NAME, "locator1");
 
-        Locator.startLocatorAndDS(10334, new File("/Users/jhuynh/Pivotal/geode-kafka-connector/locator.log"), properties);
-        while (true) {
-
-        }
-//
-//        LocatorLauncher locatorLauncher  = new LocatorLauncher.Builder()
-//                .setMemberName("locator1")
-////                .setPort(Integer.valueOf(args[0]))
-////                .setBindAddress("localhost")
-//                .build();
-//
-//        locatorLauncher.start();
-//        while (!locatorLauncher.isRunning()) {
-//
-//        }
-//        System.out.println(locatorLauncher.getBindAddress() + ":" + locatorLauncher.getPort());
+    Locator.startLocatorAndDS(10334,
+        new File("/Users/jhuynh/Pivotal/geode-kafka-connector/locator.log"), properties);
+    while (true) {
 
     }
+    //
+    // LocatorLauncher locatorLauncher = new LocatorLauncher.Builder()
+    // .setMemberName("locator1")
+    //// .setPort(Integer.valueOf(args[0]))
+    //// .setBindAddress("localhost")
+    // .build();
+    //
+    // locatorLauncher.start();
+    // while (!locatorLauncher.isRunning()) {
+    //
+    // }
+    // System.out.println(locatorLauncher.getBindAddress() + ":" + locatorLauncher.getPort());
+
+  }
 }
diff --git a/src/test/java/geode/kafka/ServerLauncherWrapper.java b/src/test/java/geode/kafka/ServerLauncherWrapper.java
index 5204f65..f60af27 100644
--- a/src/test/java/geode/kafka/ServerLauncherWrapper.java
+++ b/src/test/java/geode/kafka/ServerLauncherWrapper.java
@@ -14,58 +14,58 @@
  */
 package geode.kafka;
 
+import static geode.kafka.GeodeKafkaTestCluster.TEST_REGION_FOR_SINK;
+import static geode.kafka.GeodeKafkaTestCluster.TEST_REGION_FOR_SOURCE;
+
+import java.io.IOException;
+import java.util.Properties;
+
 import org.apache.geode.cache.Cache;
 import org.apache.geode.cache.CacheFactory;
 import org.apache.geode.cache.RegionShortcut;
 import org.apache.geode.cache.server.CacheServer;
 import org.apache.geode.distributed.ConfigurationProperties;
 
-import java.io.IOException;
-import java.util.Properties;
-
-import static geode.kafka.GeodeKafkaTestCluster.TEST_REGION_FOR_SINK;
-import static geode.kafka.GeodeKafkaTestCluster.TEST_REGION_FOR_SOURCE;
-
 public class ServerLauncherWrapper {
 
-    public static void main(String... args) throws IOException {
-//        ServerLauncher serverLauncher  = new ServerLauncher.Builder()
-//                .setMemberName("server1")
-////                .setServerPort(Integer.valueOf(args[0]))
-////                .setServerBindAddress("localhost")
-//              //  .set("locators", "localhost[10334]")
-////                .set("jmx-manager", "true")
-////                .set("jmx-manager-start", "true")
-//                .build();
-//
-//        serverLauncher.start();
-//        System.out.println("Geode Server Launcher complete");
+  public static void main(String... args) throws IOException {
+    // ServerLauncher serverLauncher = new ServerLauncher.Builder()
+    // .setMemberName("server1")
+    //// .setServerPort(Integer.valueOf(args[0]))
+    //// .setServerBindAddress("localhost")
+    // // .set("locators", "localhost[10334]")
+    //// .set("jmx-manager", "true")
+    //// .set("jmx-manager-start", "true")
+    // .build();
+    //
+    // serverLauncher.start();
+    // System.out.println("Geode Server Launcher complete");
 
 
 
+    Properties properties = new Properties();
+    String locatorString = "localhost[10334]";
+    // String statsFile = new File(context.getOutputDir(), "stats.gfs").getAbsolutePath();
+    Cache cache = new CacheFactory(properties)
+        // .setPdxSerializer(new ReflectionBasedAutoSerializer("benchmark.geode.data.*"))
+        .set(ConfigurationProperties.LOCATORS, locatorString)
+        .set(ConfigurationProperties.NAME,
+            "server-1")
+        .set(ConfigurationProperties.LOG_FILE,
+            "/Users/jhuynh/Pivotal/geode-kafka-connector/server.log")
+        .set(ConfigurationProperties.LOG_LEVEL, "info")
+        // .set(ConfigurationProperties.STATISTIC_ARCHIVE_FILE, statsFile)
+        .create();
+    CacheServer cacheServer = cache.addCacheServer();
+    cacheServer.setPort(0);
+    cacheServer.start();
 
-        Properties properties = new Properties();
-        String locatorString = "localhost[10334]";
-//        String statsFile = new File(context.getOutputDir(), "stats.gfs").getAbsolutePath();
-        Cache cache = new CacheFactory(properties)
-//                .setPdxSerializer(new ReflectionBasedAutoSerializer("benchmark.geode.data.*"))
-                .set(ConfigurationProperties.LOCATORS, locatorString)
-                .set(ConfigurationProperties.NAME,
-                        "server-1")
-                .set(ConfigurationProperties.LOG_FILE, "/Users/jhuynh/Pivotal/geode-kafka-connector/server.log")
-                .set(ConfigurationProperties.LOG_LEVEL, "info")
-//               .set(ConfigurationProperties.STATISTIC_ARCHIVE_FILE, statsFile)
-                .create();
-        CacheServer cacheServer = cache.addCacheServer();
-        cacheServer.setPort(0);
-        cacheServer.start();
+    // create the region
+    cache.createRegionFactory(RegionShortcut.PARTITION).create(TEST_REGION_FOR_SINK);
+    cache.createRegionFactory(RegionShortcut.PARTITION).create(TEST_REGION_FOR_SOURCE);
+    System.out.println("starting cacheserver");
+    while (true) {
 
-        //create the region
-        cache.createRegionFactory(RegionShortcut.PARTITION).create(TEST_REGION_FOR_SINK);
-        cache.createRegionFactory(RegionShortcut.PARTITION).create(TEST_REGION_FOR_SOURCE);
-        System.out.println("starting cacheserver");
-        while (true) {
-
-        }
     }
+  }
 }
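
Note: cacheServer.setPort(0) asks Geode for an ephemeral port, which keeps parallel runs from colliding on a fixed port. Assuming CacheServer.getPort() reflects the bound port once start() returns, the wrapper could surface that port and park rather than spin; a sketch:

    CacheServer cacheServer = cache.addCacheServer();
    cacheServer.setPort(0); // 0 = let Geode pick a free port
    cacheServer.start();
    System.out.println("cache server listening on port " + cacheServer.getPort());
    // Park instead of the empty while (true) busy-wait
    // (main would also need to declare throws InterruptedException).
    new java.util.concurrent.CountDownLatch(1).await();
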
diff --git a/src/test/java/geode/kafka/WorkerAndHerderCluster.java b/src/test/java/geode/kafka/WorkerAndHerderCluster.java
index c388a5a..70461a3 100644
--- a/src/test/java/geode/kafka/WorkerAndHerderCluster.java
+++ b/src/test/java/geode/kafka/WorkerAndHerderCluster.java
@@ -18,19 +18,18 @@
 
 public class WorkerAndHerderCluster {
 
-    private JavaProcess workerAndHerder;
+  private JavaProcess workerAndHerder;
 
-    public WorkerAndHerderCluster() {
-        workerAndHerder = new JavaProcess(WorkerAndHerderWrapper.class);
-    }
+  public WorkerAndHerderCluster() {
+    workerAndHerder = new JavaProcess(WorkerAndHerderWrapper.class);
+  }
 
-    public void start(String maxTasks) throws IOException, InterruptedException {
-        workerAndHerder.exec(maxTasks);
+  public void start(String maxTasks) throws IOException, InterruptedException {
+    workerAndHerder.exec(maxTasks);
 
-    }
+  }
 
-    public void stop() {
-        workerAndHerder.destroy();
-    }
+  public void stop() {
+    workerAndHerder.destroy();
+  }
 }
-
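
Note: this class is just a lifecycle wrapper that forks WorkerAndHerderWrapper into its own JVM through the project's JavaProcess helper. Typical use from a test, sketched under that assumption:

    WorkerAndHerderCluster workerAndHerder = new WorkerAndHerderCluster();
    workerAndHerder.start("1"); // maxTasks, passed through as a program argument
    try {
      // ... exercise the connector against Geode and Kafka ...
    } finally {
      workerAndHerder.stop(); // destroys the forked JVM
    }
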
diff --git a/src/test/java/geode/kafka/WorkerAndHerderWrapper.java b/src/test/java/geode/kafka/WorkerAndHerderWrapper.java
index 24427aa..5f86985 100644
--- a/src/test/java/geode/kafka/WorkerAndHerderWrapper.java
+++ b/src/test/java/geode/kafka/WorkerAndHerderWrapper.java
@@ -14,6 +14,16 @@
  */
 package geode.kafka;
 
+import static geode.kafka.GeodeKafkaTestCluster.TEST_REGION_TO_TOPIC_BINDINGS;
+import static geode.kafka.GeodeKafkaTestCluster.TEST_TOPIC_FOR_SINK;
+import static geode.kafka.GeodeKafkaTestCluster.TEST_TOPIC_TO_REGION_BINDINGS;
+import static geode.kafka.GeodeSinkConnectorConfig.TOPIC_TO_REGION_BINDINGS;
+import static geode.kafka.source.GeodeSourceConnectorConfig.REGION_TO_TOPIC_BINDINGS;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
 import geode.kafka.sink.GeodeKafkaSink;
 import geode.kafka.source.GeodeKafkaSource;
 import org.apache.kafka.common.utils.SystemTime;
@@ -28,68 +38,62 @@
 import org.apache.kafka.connect.storage.MemoryOffsetBackingStore;
 import org.apache.kafka.connect.util.ConnectUtils;
 
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
-import static geode.kafka.GeodeKafkaTestCluster.TEST_REGION_TO_TOPIC_BINDINGS;
-import static geode.kafka.GeodeKafkaTestCluster.TEST_TOPIC_FOR_SINK;
-import static geode.kafka.GeodeKafkaTestCluster.TEST_TOPIC_TO_REGION_BINDINGS;
-import static geode.kafka.GeodeSinkConnectorConfig.TOPIC_TO_REGION_BINDINGS;
-import static geode.kafka.source.GeodeSourceConnectorConfig.REGION_TO_TOPIC_BINDINGS;
-
 public class WorkerAndHerderWrapper {
 
-    public static void main(String[] args) throws IOException {
-        String maxTasks = args[0];
+  public static void main(String[] args) throws IOException {
+    String maxTasks = args[0];
 
-        Map props = new HashMap();
-        props.put(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
-        props.put("offset.storage.file.filename", "/tmp/connect.offsets");
-        // fast flushing for testing.
-        props.put(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG, "10");
+    Map props = new HashMap();
+    props.put(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+    props.put("offset.storage.file.filename", "/tmp/connect.offsets");
+    // fast flushing for testing.
+    props.put(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG, "10");
 
-        props.put("internal.key.converter.schemas.enable", "false");
-        props.put("internal.value.converter.schemas.enable", "false");
-        props.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");
-        props.put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");
-        props.put("key.converter.schemas.enable", "false");
-        props.put("value.converter.schemas.enable", "false");
+    props.put("internal.key.converter.schemas.enable", "false");
+    props.put("internal.value.converter.schemas.enable", "false");
+    props.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG,
+        "org.apache.kafka.connect.storage.StringConverter");
+    props.put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG,
+        "org.apache.kafka.connect.storage.StringConverter");
+    props.put("key.converter.schemas.enable", "false");
+    props.put("value.converter.schemas.enable", "false");
 
-        WorkerConfig workerCfg = new StandaloneConfig(props);
+    WorkerConfig workerCfg = new StandaloneConfig(props);
 
-        MemoryOffsetBackingStore offBackingStore = new MemoryOffsetBackingStore();
-        offBackingStore.configure(workerCfg);
+    MemoryOffsetBackingStore offBackingStore = new MemoryOffsetBackingStore();
+    offBackingStore.configure(workerCfg);
 
-        Worker worker = new Worker("WORKER_ID", new SystemTime(), new Plugins(props), workerCfg, offBackingStore, new AllConnectorClientConfigOverridePolicy());
-        worker.start();
+    Worker worker = new Worker("WORKER_ID", new SystemTime(), new Plugins(props), workerCfg,
+        offBackingStore, new AllConnectorClientConfigOverridePolicy());
+    worker.start();
 
-        Herder herder = new StandaloneHerder(worker, ConnectUtils.lookupKafkaClusterId(workerCfg), new AllConnectorClientConfigOverridePolicy());
-        herder.start();
+    Herder herder = new StandaloneHerder(worker, ConnectUtils.lookupKafkaClusterId(workerCfg),
+        new AllConnectorClientConfigOverridePolicy());
+    herder.start();
 
-        Map<String, String> sourceProps = new HashMap<>();
-        sourceProps.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, GeodeKafkaSource.class.getName());
-        sourceProps.put(ConnectorConfig.NAME_CONFIG, "geode-kafka-source-connector");
-        sourceProps.put(ConnectorConfig.TASKS_MAX_CONFIG, maxTasks);
-        sourceProps.put(REGION_TO_TOPIC_BINDINGS, TEST_REGION_TO_TOPIC_BINDINGS);
+    Map<String, String> sourceProps = new HashMap<>();
+    sourceProps.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, GeodeKafkaSource.class.getName());
+    sourceProps.put(ConnectorConfig.NAME_CONFIG, "geode-kafka-source-connector");
+    sourceProps.put(ConnectorConfig.TASKS_MAX_CONFIG, maxTasks);
+    sourceProps.put(REGION_TO_TOPIC_BINDINGS, TEST_REGION_TO_TOPIC_BINDINGS);
 
-        herder.putConnectorConfig(
-                sourceProps.get(ConnectorConfig.NAME_CONFIG),
-                sourceProps, true, (error, result)->{
-                });
+    herder.putConnectorConfig(
+        sourceProps.get(ConnectorConfig.NAME_CONFIG),
+        sourceProps, true, (error, result) -> {
+        });
 
-        Map<String, String> sinkProps = new HashMap<>();
-        sinkProps.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, GeodeKafkaSink.class.getName());
-        sinkProps.put(ConnectorConfig.NAME_CONFIG, "geode-kafka-sink-connector");
-        sinkProps.put(ConnectorConfig.TASKS_MAX_CONFIG, maxTasks);
-        sinkProps.put(TOPIC_TO_REGION_BINDINGS, TEST_TOPIC_TO_REGION_BINDINGS);
-        sinkProps.put("topics", TEST_TOPIC_FOR_SINK);
+    Map<String, String> sinkProps = new HashMap<>();
+    sinkProps.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, GeodeKafkaSink.class.getName());
+    sinkProps.put(ConnectorConfig.NAME_CONFIG, "geode-kafka-sink-connector");
+    sinkProps.put(ConnectorConfig.TASKS_MAX_CONFIG, maxTasks);
+    sinkProps.put(TOPIC_TO_REGION_BINDINGS, TEST_TOPIC_TO_REGION_BINDINGS);
+    sinkProps.put("topics", TEST_TOPIC_FOR_SINK);
 
-        herder.putConnectorConfig(
-                sinkProps.get(ConnectorConfig.NAME_CONFIG),
-                sinkProps, true, (error, result)->{
-                });
+    herder.putConnectorConfig(
+        sinkProps.get(ConnectorConfig.NAME_CONFIG),
+        sinkProps, true, (error, result) -> {
+        });
 
 
-    }
+  }
 }
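
Note: both putConnectorConfig calls still pass a no-op callback, so a rejected connector config fails silently. The Herder API accepts any Callback there, so a sketch that at least reports the error:

    herder.putConnectorConfig(
        sourceProps.get(ConnectorConfig.NAME_CONFIG),
        sourceProps, true, (error, result) -> {
          if (error != null) {
            // Surface submission failures instead of dropping them.
            error.printStackTrace();
          }
        });
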
diff --git a/src/test/java/geode/kafka/ZooKeeperLocalCluster.java b/src/test/java/geode/kafka/ZooKeeperLocalCluster.java
index c1d743c..717b046 100644
--- a/src/test/java/geode/kafka/ZooKeeperLocalCluster.java
+++ b/src/test/java/geode/kafka/ZooKeeperLocalCluster.java
@@ -14,14 +14,14 @@
  */
 package geode.kafka;
 
+import java.io.IOException;
+import java.util.Properties;
+
 import org.apache.zookeeper.server.ServerConfig;
 import org.apache.zookeeper.server.ZooKeeperServerMain;
 import org.apache.zookeeper.server.admin.AdminServer;
 import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
 
-import java.io.IOException;
-import java.util.Properties;
-
 public class ZooKeeperLocalCluster {
 
   ZooKeeperServerMain zooKeeperServer;
diff --git a/src/test/java/geode/kafka/sink/BatchRecordsTest.java b/src/test/java/geode/kafka/sink/BatchRecordsTest.java
index 210480e..cdb286b 100644
--- a/src/test/java/geode/kafka/sink/BatchRecordsTest.java
+++ b/src/test/java/geode/kafka/sink/BatchRecordsTest.java
@@ -14,84 +14,85 @@
  */
 package geode.kafka.sink;
 
-import org.apache.geode.cache.Region;
-import org.apache.kafka.connect.sink.SinkRecord;
-import org.junit.Test;
-
-import java.util.Collection;
-import java.util.Map;
-
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import java.util.Collection;
+import java.util.Map;
+
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.junit.Test;
+
+import org.apache.geode.cache.Region;
+
 public class BatchRecordsTest {
-    @Test
-    public void updatingARecordShouldRemoveFromTheRemoveListIfNullValuesIsRemoveBooleanIsSet() {
-        Map updates = mock(Map.class);
-        Collection removes = mock(Collection.class);
-        when(removes.contains(any())).thenReturn(true);
-        BatchRecords records = new BatchRecords(updates, removes);
-        SinkRecord sinkRecord = mock(SinkRecord.class);
-        records.addUpdateOperation(sinkRecord, true);
-        verify(removes, times(1)).remove(any());
-    }
+  @Test
+  public void updatingARecordShouldRemoveFromTheRemoveListIfNullValuesIsRemoveBooleanIsSet() {
+    Map updates = mock(Map.class);
+    Collection removes = mock(Collection.class);
+    when(removes.contains(any())).thenReturn(true);
+    BatchRecords records = new BatchRecords(updates, removes);
+    SinkRecord sinkRecord = mock(SinkRecord.class);
+    records.addUpdateOperation(sinkRecord, true);
+    verify(removes, times(1)).remove(any());
+  }
 
-    @Test
-    public void updatingARecordShouldAddToTheUpdateMap() {
-        Map updates = mock(Map.class);
-        Collection removes = mock(Collection.class);
-        when(removes.contains(any())).thenReturn(false);
-        BatchRecords records = new BatchRecords(updates, removes);
-        SinkRecord sinkRecord = mock(SinkRecord.class);
-        records.addUpdateOperation(sinkRecord, true);
-        verify(updates, times(1)).put(any(), any());
-    }
+  @Test
+  public void updatingARecordShouldAddToTheUpdateMap() {
+    Map updates = mock(Map.class);
+    Collection removes = mock(Collection.class);
+    when(removes.contains(any())).thenReturn(false);
+    BatchRecords records = new BatchRecords(updates, removes);
+    SinkRecord sinkRecord = mock(SinkRecord.class);
+    records.addUpdateOperation(sinkRecord, true);
+    verify(updates, times(1)).put(any(), any());
+  }
 
-    @Test
-    public void updatingARecordShouldNotRemoveFromTheRemoveListIfNullValuesIsNotSet() {
-        boolean nullValuesMeanRemove = false;
-        Map updates = mock(Map.class);
-        Collection removes = mock(Collection.class);
-        when(removes.contains(any())).thenReturn(true);
-        BatchRecords records = new BatchRecords(updates, removes);
-        SinkRecord sinkRecord = mock(SinkRecord.class);
-        records.addUpdateOperation(sinkRecord, nullValuesMeanRemove);
-        verify(removes, times(0)).remove(any());
-    }
+  @Test
+  public void updatingARecordShouldNotRemoveFromTheRemoveListIfNullValuesIsNotSet() {
+    boolean nullValuesMeanRemove = false;
+    Map updates = mock(Map.class);
+    Collection removes = mock(Collection.class);
+    when(removes.contains(any())).thenReturn(true);
+    BatchRecords records = new BatchRecords(updates, removes);
+    SinkRecord sinkRecord = mock(SinkRecord.class);
+    records.addUpdateOperation(sinkRecord, nullValuesMeanRemove);
+    verify(removes, times(0)).remove(any());
+  }
 
 
-    @Test
-    public void removingARecordShouldRemoveFromTheUpdateMapIfKeyIsPresent() {
-        Map updates = mock(Map.class);
-        Collection removes = mock(Collection.class);
-        when(updates.containsKey(any())).thenReturn(true);
-        BatchRecords records = new BatchRecords(updates, removes);
-        SinkRecord sinkRecord = mock(SinkRecord.class);
-        records.addRemoveOperation(sinkRecord);
-        verify(updates, times(1)).remove(any());
-    }
+  @Test
+  public void removingARecordShouldRemoveFromTheUpdateMapIfKeyIsPresent() {
+    Map updates = mock(Map.class);
+    Collection removes = mock(Collection.class);
+    when(updates.containsKey(any())).thenReturn(true);
+    BatchRecords records = new BatchRecords(updates, removes);
+    SinkRecord sinkRecord = mock(SinkRecord.class);
+    records.addRemoveOperation(sinkRecord);
+    verify(updates, times(1)).remove(any());
+  }
 
-    @Test
-    public void removingARecordAddToTheRemoveCollection() {
-        Map updates = mock(Map.class);
-        Collection removes = mock(Collection.class);
-        BatchRecords records = new BatchRecords(updates, removes);
-        SinkRecord sinkRecord = mock(SinkRecord.class);
-        records.addRemoveOperation(sinkRecord);
-        verify(removes, times(1)).add(any());
-    }
+  @Test
+  public void removingARecordAddToTheRemoveCollection() {
+    Map updates = mock(Map.class);
+    Collection removes = mock(Collection.class);
+    BatchRecords records = new BatchRecords(updates, removes);
+    SinkRecord sinkRecord = mock(SinkRecord.class);
+    records.addRemoveOperation(sinkRecord);
+    verify(removes, times(1)).add(any());
+  }
 
-    @Test
-    public void executeOperationsShouldInvokePutAllAndRemoveAll() {
-        Region region = mock(Region.class);
-        BatchRecords records = new BatchRecords();
-        records.executeOperations(region);
-        verify(region, times(1)).putAll(any());
-        verify(region, times(1)).removeAll(any());
-    }
+  @Test
+  public void executeOperationsShouldInvokePutAllAndRemoveAll() {
+    Region region = mock(Region.class);
+    BatchRecords records = new BatchRecords();
+    records.executeOperations(region);
+    verify(region, times(1)).putAll(any());
+    verify(region, times(1)).removeAll(any());
+  }
 
 
 
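
Note: taken together these tests pin down the BatchRecords contract: an update cancels a pending remove for the same key when nullValuesMeanRemove is set (and a remove cancels a pending update), and executeOperations() flushes the batch with one Region.putAll and one Region.removeAll. A usage sketch under those assumptions (updateRecord, deleteRecord, and region stand in for real SinkRecords and a Geode Region):

    BatchRecords records = new BatchRecords();
    records.addUpdateOperation(updateRecord, /* nullValuesMeanRemove = */ true);
    records.addRemoveOperation(deleteRecord);
    // One bulk put and one bulk remove against the target region.
    records.executeOperations(region);
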
diff --git a/src/test/java/geode/kafka/sink/GeodeKafkaSinkTaskTest.java b/src/test/java/geode/kafka/sink/GeodeKafkaSinkTaskTest.java
index ed8f040..735f255 100644
--- a/src/test/java/geode/kafka/sink/GeodeKafkaSinkTaskTest.java
+++ b/src/test/java/geode/kafka/sink/GeodeKafkaSinkTaskTest.java
@@ -14,62 +14,61 @@
  */
 package geode.kafka.sink;
 
-import geode.kafka.GeodeSinkConnectorConfig;
-import org.apache.geode.cache.Region;
-import org.apache.kafka.connect.sink.SinkRecord;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-
 import static geode.kafka.GeodeConnectorConfig.LOCATORS;
 import static geode.kafka.GeodeConnectorConfig.TASK_ID;
 import static geode.kafka.GeodeSinkConnectorConfig.NULL_VALUES_MEAN_REMOVE;
 import static geode.kafka.GeodeSinkConnectorConfig.TOPIC_TO_REGION_BINDINGS;
-import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
-import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+import geode.kafka.GeodeSinkConnectorConfig;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.junit.Test;
+
+import org.apache.geode.cache.Region;
+
 public class GeodeKafkaSinkTaskTest {
 
-    private HashMap<String, String> createTestSinkProps(boolean nullMeansRemove) {
-        HashMap<String, String> props = new HashMap<>();
-        props.put(TOPIC_TO_REGION_BINDINGS, "[topic:region]");
-        props.put(TASK_ID, "0");
-        props.put(NULL_VALUES_MEAN_REMOVE, String.valueOf(nullMeansRemove));
-        props.put(LOCATORS, "localhost[10334]");
-        return props;
-    }
+  private HashMap<String, String> createTestSinkProps(boolean nullMeansRemove) {
+    HashMap<String, String> props = new HashMap<>();
+    props.put(TOPIC_TO_REGION_BINDINGS, "[topic:region]");
+    props.put(TASK_ID, "0");
+    props.put(NULL_VALUES_MEAN_REMOVE, String.valueOf(nullMeansRemove));
+    props.put(LOCATORS, "localhost[10334]");
+    return props;
+  }
 
-    @Test
-    public void putRecordsAddsToRegionBatchRecords() {
-        boolean nullMeansRemove = true;
-        GeodeKafkaSinkTask task = new GeodeKafkaSinkTask();
-        HashMap<String, String> props = createTestSinkProps(nullMeansRemove);
+  @Test
+  public void putRecordsAddsToRegionBatchRecords() {
+    boolean nullMeansRemove = true;
+    GeodeKafkaSinkTask task = new GeodeKafkaSinkTask();
+    HashMap<String, String> props = createTestSinkProps(nullMeansRemove);
 
-        SinkRecord topicRecord = mock(SinkRecord.class);
-        when(topicRecord.topic()).thenReturn("topic");
-        when(topicRecord.value()).thenReturn("value");
-        when(topicRecord.key()).thenReturn("key");
+    SinkRecord topicRecord = mock(SinkRecord.class);
+    when(topicRecord.topic()).thenReturn("topic");
+    when(topicRecord.value()).thenReturn("value");
+    when(topicRecord.key()).thenReturn("key");
 
-        List<SinkRecord> records = new ArrayList();
-        records.add(topicRecord);
+    List<SinkRecord> records = new ArrayList();
+    records.add(topicRecord);
 
-        HashMap<String, Region> regionNameToRegion = new HashMap<>();
-        GeodeSinkConnectorConfig geodeSinkConnectorConfig = new GeodeSinkConnectorConfig(props);
-        HashMap<String, BatchRecords> batchRecordsMap = new HashMap();
-        BatchRecords batchRecords = mock(BatchRecords.class);
-        batchRecordsMap.put("region", batchRecords);
-        task.configure(geodeSinkConnectorConfig);
-        task.setRegionNameToRegion(regionNameToRegion);
+    HashMap<String, Region> regionNameToRegion = new HashMap<>();
+    GeodeSinkConnectorConfig geodeSinkConnectorConfig = new GeodeSinkConnectorConfig(props);
+    HashMap<String, BatchRecords> batchRecordsMap = new HashMap();
+    BatchRecords batchRecords = mock(BatchRecords.class);
+    batchRecordsMap.put("region", batchRecords);
+    task.configure(geodeSinkConnectorConfig);
+    task.setRegionNameToRegion(regionNameToRegion);
 
-        task.put(records, batchRecordsMap);
-        assertTrue(batchRecordsMap.containsKey("region"));
-        verify(batchRecords, times(1)).addUpdateOperation(topicRecord, nullMeansRemove);
-    }
+    task.put(records, batchRecordsMap);
+    assertTrue(batchRecordsMap.containsKey("region"));
+    verify(batchRecords, times(1)).addUpdateOperation(topicRecord, nullMeansRemove);
+  }
 }
diff --git a/src/test/java/geode/kafka/sink/GeodeKafkaSinkTest.java b/src/test/java/geode/kafka/sink/GeodeKafkaSinkTest.java
index 633c7ae..d45c0c8 100644
--- a/src/test/java/geode/kafka/sink/GeodeKafkaSinkTest.java
+++ b/src/test/java/geode/kafka/sink/GeodeKafkaSinkTest.java
@@ -14,52 +14,52 @@
  */
 package geode.kafka.sink;
 
-import org.junit.Test;
+import static geode.kafka.GeodeConnectorConfig.TASK_ID;
+import static geode.kafka.GeodeSinkConnectorConfig.TOPIC_TO_REGION_BINDINGS;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 
-import static geode.kafka.GeodeConnectorConfig.TASK_ID;
-import static geode.kafka.GeodeSinkConnectorConfig.TOPIC_TO_REGION_BINDINGS;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import org.junit.Test;
 
 public class GeodeKafkaSinkTest {
 
-    @Test
-    public void taskConfigsCreatesMaxNumberOfTasks() {
-        GeodeKafkaSink sink = new GeodeKafkaSink();
-        Map<String, String> props = new HashMap();
-        props.put(TOPIC_TO_REGION_BINDINGS, "[someTopic:someRegion]");
-        sink.start(props);
-        Collection<Map<String,String>> tasks = sink.taskConfigs(5);
-        assertEquals(5, tasks.size());
-    }
+  @Test
+  public void taskConfigsCreatesMaxNumberOfTasks() {
+    GeodeKafkaSink sink = new GeodeKafkaSink();
+    Map<String, String> props = new HashMap();
+    props.put(TOPIC_TO_REGION_BINDINGS, "[someTopic:someRegion]");
+    sink.start(props);
+    Collection<Map<String, String>> tasks = sink.taskConfigs(5);
+    assertEquals(5, tasks.size());
+  }
 
-    @Test
-    public void sinkTaskConfigsAllAssignedEntireTopicToRegionBinding() {
-        GeodeKafkaSink sink = new GeodeKafkaSink();
-        Map<String, String> props = new HashMap();
-        props.put(TOPIC_TO_REGION_BINDINGS, "[someTopic:someRegion]");
-        sink.start(props);
-        Collection<Map<String,String>> tasks = sink.taskConfigs(5);
-        for(Map<String, String> prop : tasks) {
-            assertEquals("[someTopic:someRegion]", prop.get(TOPIC_TO_REGION_BINDINGS));
-        }
+  @Test
+  public void sinkTaskConfigsAllAssignedEntireTopicToRegionBinding() {
+    GeodeKafkaSink sink = new GeodeKafkaSink();
+    Map<String, String> props = new HashMap();
+    props.put(TOPIC_TO_REGION_BINDINGS, "[someTopic:someRegion]");
+    sink.start(props);
+    Collection<Map<String, String>> tasks = sink.taskConfigs(5);
+    for (Map<String, String> prop : tasks) {
+      assertEquals("[someTopic:someRegion]", prop.get(TOPIC_TO_REGION_BINDINGS));
     }
+  }
 
-    @Test
-    public void eachTaskHasUniqueTaskIds() {
-        GeodeKafkaSink sink = new GeodeKafkaSink();
-        Map<String, String> props = new HashMap();
-        props.put(TOPIC_TO_REGION_BINDINGS, "[someTopic:someRegion]");
-        sink.start(props);
-        Collection<Map<String,String>> tasks = sink.taskConfigs(5);
-        HashSet<String> seenIds = new HashSet();
-        for(Map<String, String> taskProp : tasks) {
-            assertTrue(seenIds.add(taskProp.get(TASK_ID)));
-        }
+  @Test
+  public void eachTaskHasUniqueTaskIds() {
+    GeodeKafkaSink sink = new GeodeKafkaSink();
+    Map<String, String> props = new HashMap();
+    props.put(TOPIC_TO_REGION_BINDINGS, "[someTopic:someRegion]");
+    sink.start(props);
+    Collection<Map<String, String>> tasks = sink.taskConfigs(5);
+    HashSet<String> seenIds = new HashSet();
+    for (Map<String, String> taskProp : tasks) {
+      assertTrue(seenIds.add(taskProp.get(TASK_ID)));
     }
+  }
 }
diff --git a/src/test/java/geode/kafka/source/GeodeKafkaSourceTaskTest.java b/src/test/java/geode/kafka/source/GeodeKafkaSourceTaskTest.java
index 33f1ab5..a919b96 100644
--- a/src/test/java/geode/kafka/source/GeodeKafkaSourceTaskTest.java
+++ b/src/test/java/geode/kafka/source/GeodeKafkaSourceTaskTest.java
@@ -14,20 +14,6 @@
  */
 package geode.kafka.source;
 
-import geode.kafka.GeodeContext;
-import org.apache.geode.cache.client.ClientCache;
-import org.apache.geode.cache.query.CqEvent;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-
 import static geode.kafka.source.GeodeSourceConnectorConfig.DEFAULT_CQ_PREFIX;
 import static geode.kafka.source.GeodeSourceConnectorConfig.REGION_PARTITION;
 import static org.hamcrest.CoreMatchers.is;
@@ -41,190 +27,212 @@
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import geode.kafka.GeodeContext;
+import org.junit.Test;
+
+import org.apache.geode.cache.client.ClientCache;
+import org.apache.geode.cache.query.CqEvent;
+
 
 public class GeodeKafkaSourceTaskTest {
 
 
-    @Test
-    public void whenLoadingEntireRegionAbleToPutInitialResultsIntoEventBuffer() {
-        GeodeContext geodeContext = mock(GeodeContext.class);
-        BlockingQueue<GeodeEvent> eventBuffer = new LinkedBlockingQueue(100);
-        boolean loadEntireRegion = true;
-        boolean isDurable = false;
-        List<CqEvent> fakeInitialResults = new LinkedList<>();
-        for (int i = 0; i < 10; i++) {
-            fakeInitialResults.add(mock(CqEvent.class));
-        }
-
-        when(geodeContext.newCqWithInitialResults(anyString(), anyString(), any(), anyBoolean())).thenReturn(fakeInitialResults);
-        GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
-        task.installListenersToRegion(geodeContext, 1, createEventBufferSupplier(eventBuffer), "testRegion", DEFAULT_CQ_PREFIX, loadEntireRegion, isDurable);
-        assertEquals(10, eventBuffer.size());
+  @Test
+  public void whenLoadingEntireRegionAbleToPutInitialResultsIntoEventBuffer() {
+    GeodeContext geodeContext = mock(GeodeContext.class);
+    BlockingQueue<GeodeEvent> eventBuffer = new LinkedBlockingQueue(100);
+    boolean loadEntireRegion = true;
+    boolean isDurable = false;
+    List<CqEvent> fakeInitialResults = new LinkedList<>();
+    for (int i = 0; i < 10; i++) {
+      fakeInitialResults.add(mock(CqEvent.class));
     }
 
-    @Test
-    public void whenNotLoadingEntireRegionShouldNotPutInitialResultsIntoEventBuffer() {
-        GeodeContext geodeContext = mock(GeodeContext.class);
-        BlockingQueue<GeodeEvent> eventBuffer = new LinkedBlockingQueue(100);
-        boolean loadEntireRegion = false;
-        boolean isDurable = false;
-        List<CqEvent> fakeInitialResults = new LinkedList<>();
-        for (int i = 0; i < 10; i++) {
-            fakeInitialResults.add(mock(CqEvent.class));
-        }
+    when(geodeContext.newCqWithInitialResults(anyString(), anyString(), any(), anyBoolean()))
+        .thenReturn(fakeInitialResults);
+    GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
+    task.installListenersToRegion(geodeContext, 1, createEventBufferSupplier(eventBuffer),
+        "testRegion", DEFAULT_CQ_PREFIX, loadEntireRegion, isDurable);
+    assertEquals(10, eventBuffer.size());
+  }
 
-        when(geodeContext.newCqWithInitialResults(anyString(), anyString(), any(), anyBoolean())).thenReturn(fakeInitialResults);
-        GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
-        task.installListenersToRegion(geodeContext, 1, createEventBufferSupplier(eventBuffer), "testRegion", DEFAULT_CQ_PREFIX, loadEntireRegion, isDurable);
-        assertEquals(0, eventBuffer.size());
+  @Test
+  public void whenNotLoadingEntireRegionShouldNotPutInitialResultsIntoEventBuffer() {
+    GeodeContext geodeContext = mock(GeodeContext.class);
+    BlockingQueue<GeodeEvent> eventBuffer = new LinkedBlockingQueue(100);
+    boolean loadEntireRegion = false;
+    boolean isDurable = false;
+    List<CqEvent> fakeInitialResults = new LinkedList<>();
+    for (int i = 0; i < 10; i++) {
+      fakeInitialResults.add(mock(CqEvent.class));
     }
 
-    @Test
-    public void cqListenerOnEventPopulatesEventsBuffer() {
-        GeodeContext geodeContext = mock(GeodeContext.class);
-        BlockingQueue<GeodeEvent> eventBuffer = new LinkedBlockingQueue(100);
-        boolean loadEntireRegion = false;
-        boolean isDurable = false;
+    when(geodeContext.newCqWithInitialResults(anyString(), anyString(), any(), anyBoolean()))
+        .thenReturn(fakeInitialResults);
+    GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
+    task.installListenersToRegion(geodeContext, 1, createEventBufferSupplier(eventBuffer),
+        "testRegion", DEFAULT_CQ_PREFIX, loadEntireRegion, isDurable);
+    assertEquals(0, eventBuffer.size());
+  }
 
-        when(geodeContext.newCqWithInitialResults(anyString(), anyString(), any(), anyBoolean())).thenReturn(new ArrayList());
-        GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
-        GeodeKafkaSourceListener listener = task.installListenersToRegion(geodeContext, 1, createEventBufferSupplier(eventBuffer), "testRegion", DEFAULT_CQ_PREFIX, loadEntireRegion, isDurable);
+  @Test
+  public void cqListenerOnEventPopulatesEventsBuffer() {
+    GeodeContext geodeContext = mock(GeodeContext.class);
+    BlockingQueue<GeodeEvent> eventBuffer = new LinkedBlockingQueue(100);
+    boolean loadEntireRegion = false;
+    boolean isDurable = false;
 
-        listener.onEvent(mock(CqEvent.class));
-        assertEquals(1, eventBuffer.size());
-    }
+    when(geodeContext.newCqWithInitialResults(anyString(), anyString(), any(), anyBoolean()))
+        .thenReturn(new ArrayList());
+    GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
+    GeodeKafkaSourceListener listener =
+        task.installListenersToRegion(geodeContext, 1, createEventBufferSupplier(eventBuffer),
+            "testRegion", DEFAULT_CQ_PREFIX, loadEntireRegion, isDurable);
 
-    @Test
-    public void readyForEventsIsCalledIfDurable() {
-        ClientCache clientCache = mock(ClientCache.class);
+    listener.onEvent(mock(CqEvent.class));
+    assertEquals(1, eventBuffer.size());
+  }
 
-        GeodeContext geodeContext = mock(GeodeContext.class);
-        when(geodeContext.getClientCache()).thenReturn(clientCache);
+  @Test
+  public void readyForEventsIsCalledIfDurable() {
+    ClientCache clientCache = mock(ClientCache.class);
 
-        GeodeSourceConnectorConfig config = mock(GeodeSourceConnectorConfig.class);
-        when (config.isDurable()).thenReturn(true);
-        GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
-        task.installOnGeode(config, geodeContext, null, "", false);
-        verify(clientCache, times(1)).readyForEvents();
-    }
+    GeodeContext geodeContext = mock(GeodeContext.class);
+    when(geodeContext.getClientCache()).thenReturn(clientCache);
 
-    @Test
-    public void cqIsInvokedForEveryRegionWithATopic() {
-        ClientCache clientCache = mock(ClientCache.class);
+    GeodeSourceConnectorConfig config = mock(GeodeSourceConnectorConfig.class);
+    when(config.isDurable()).thenReturn(true);
+    GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
+    task.installOnGeode(config, geodeContext, null, "", false);
+    verify(clientCache, times(1)).readyForEvents();
+  }
 
-        GeodeContext geodeContext = mock(GeodeContext.class);
-        when(geodeContext.getClientCache()).thenReturn(clientCache);
+  @Test
+  public void cqIsInvokedForEveryRegionWithATopic() {
+    ClientCache clientCache = mock(ClientCache.class);
 
-        Map<String, List<String>> regionToTopicsMap = new HashMap<>();
-        regionToTopicsMap.put("region1", new ArrayList());
+    GeodeContext geodeContext = mock(GeodeContext.class);
+    when(geodeContext.getClientCache()).thenReturn(clientCache);
 
-        GeodeSourceConnectorConfig config = mock(GeodeSourceConnectorConfig.class);
-        when (config.getCqsToRegister()).thenReturn(regionToTopicsMap.keySet());
+    Map<String, List<String>> regionToTopicsMap = new HashMap<>();
+    regionToTopicsMap.put("region1", new ArrayList());
 
-        GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
-        task.installOnGeode(config, geodeContext, null, "someCqPrefix", false);
-        verify(geodeContext, times(1)).newCq(anyString(), anyString(), any(), anyBoolean());
-    }
+    GeodeSourceConnectorConfig config = mock(GeodeSourceConnectorConfig.class);
+    when(config.getCqsToRegister()).thenReturn(regionToTopicsMap.keySet());
 
-    @Test
-    public void cqWithInitialResultsIsInvokedForEveryRegionWithATopicIfLoadEntireIsSet() {
-        ClientCache clientCache = mock(ClientCache.class);
+    GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
+    task.installOnGeode(config, geodeContext, null, "someCqPrefix", false);
+    verify(geodeContext, times(1)).newCq(anyString(), anyString(), any(), anyBoolean());
+  }
 
-        GeodeContext geodeContext = mock(GeodeContext.class);
-        when(geodeContext.getClientCache()).thenReturn(clientCache);
+  @Test
+  public void cqWithInitialResultsIsInvokedForEveryRegionWithATopicIfLoadEntireIsSet() {
+    ClientCache clientCache = mock(ClientCache.class);
 
-        Map<String, List<String>> regionToTopicsMap = new HashMap<>();
-        regionToTopicsMap.put("region1", new ArrayList());
+    GeodeContext geodeContext = mock(GeodeContext.class);
+    when(geodeContext.getClientCache()).thenReturn(clientCache);
 
-        GeodeSourceConnectorConfig config = mock(GeodeSourceConnectorConfig.class);
-        when (config.getCqsToRegister()).thenReturn(regionToTopicsMap.keySet());
+    Map<String, List<String>> regionToTopicsMap = new HashMap<>();
+    regionToTopicsMap.put("region1", new ArrayList());
 
-        GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
-        task.installOnGeode(config, geodeContext, createEventBufferSupplier(new LinkedBlockingQueue<>()), "someCqPrefix", true);
-        verify(geodeContext, times(1)).newCqWithInitialResults(anyString(), anyString(), any(), anyBoolean());
-    }
+    GeodeSourceConnectorConfig config = mock(GeodeSourceConnectorConfig.class);
+    when(config.getCqsToRegister()).thenReturn(regionToTopicsMap.keySet());
 
-    @Test
-    public void readyForEventsIsNotCalledIfNotDurable() {
-        ClientCache clientCache = mock(ClientCache.class);
+    GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
+    task.installOnGeode(config, geodeContext,
+        createEventBufferSupplier(new LinkedBlockingQueue<>()), "someCqPrefix", true);
+    verify(geodeContext, times(1)).newCqWithInitialResults(anyString(), anyString(), any(),
+        anyBoolean());
+  }
 
-        GeodeContext geodeContext = mock(GeodeContext.class);
-        when(geodeContext.getClientCache()).thenReturn(clientCache);
+  @Test
+  public void readyForEventsIsNotCalledIfNotDurable() {
+    ClientCache clientCache = mock(ClientCache.class);
 
-        GeodeSourceConnectorConfig config = mock(GeodeSourceConnectorConfig.class);
-        when (config.isDurable()).thenReturn(false);
-        GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
-        task.installOnGeode(config, geodeContext, null, "", false);
-        verify(clientCache, times(0)).readyForEvents();
-    }
+    GeodeContext geodeContext = mock(GeodeContext.class);
+    when(geodeContext.getClientCache()).thenReturn(clientCache);
 
-    @Test
-    public void pollReturnsEventsWhenEventBufferHasValues() throws Exception {
-//        BlockingQueue<GeodeEvent> eventBuffer = new LinkedBlockingQueue(100);
-//        CqEvent cqEvent = mock(CqEvent.class);
-//        when(cqEvent.getNewValue()).thenReturn("New Value");
-//        GeodeEvent event = mock(GeodeEvent.class);
-//        when(event.getEvent()).thenReturn(cqEvent);
-//        eventBuffer.add(event);
-//
-//        List<String> topics = new ArrayList<>();
-//        topics.add("myTopic");
-//
-//        GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
-//        task.startForTesting(eventBuffer, topics, 1);
-//        List<SourceRecord> records = task.poll();
-//        assertEquals(1, records.size());
-    }
+    GeodeSourceConnectorConfig config = mock(GeodeSourceConnectorConfig.class);
+    when(config.isDurable()).thenReturn(false);
+    GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
+    task.installOnGeode(config, geodeContext, null, "", false);
+    verify(clientCache, times(0)).readyForEvents();
+  }
 
-    @Test
-    public void installOnGeodeShouldCallCq() {
-        GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
-    }
+  @Test
+  public void pollReturnsEventsWhenEventBufferHasValues() throws Exception {
+    // BlockingQueue<GeodeEvent> eventBuffer = new LinkedBlockingQueue(100);
+    // CqEvent cqEvent = mock(CqEvent.class);
+    // when(cqEvent.getNewValue()).thenReturn("New Value");
+    // GeodeEvent event = mock(GeodeEvent.class);
+    // when(event.getEvent()).thenReturn(cqEvent);
+    // eventBuffer.add(event);
+    //
+    // List<String> topics = new ArrayList<>();
+    // topics.add("myTopic");
+    //
+    // GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
+    // task.startForTesting(eventBuffer, topics, 1);
+    // List<SourceRecord> records = task.poll();
+    // assertEquals(1, records.size());
+  }
+
+  @Test
+  public void installOnGeodeShouldCallCq() {
+    GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
+  }
 
 
 
+  @Test
+  public void createSourcePartitionsShouldReturnAMapOfSourcePartitions() {
+    GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
+    List<String> regionNames = Arrays.asList(new String[] {"region1", "region2", "region3"});
+    Map<String, Map<String, String>> sourcePartitions = task.createSourcePartitionsMap(regionNames);
+    assertThat(3, is(sourcePartitions.size()));
+    assertThat(true, is(sourcePartitions.get("region1").get(REGION_PARTITION).equals("region1")));
+    assertThat(true, is(sourcePartitions.get("region2").get(REGION_PARTITION).equals("region2")));
+    assertThat(true, is(sourcePartitions.get("region3").get(REGION_PARTITION).equals("region3")));
+  }
+
+  @Test
+  public void listOfLocatorsShouldBeConfiguredIntoClientCache() {
+
+  }
+
+  @Test
+  public void shouldNotBeDurableIfDurableClientIdIsNull() {
+
+  }
+
+  @Test
+  public void shouldNotCallReadyForEventsIfDurableClientPrefixIsEmpty() {
+
+  }
 
 
-    @Test
-    public void createSourcePartitionsShouldReturnAMapOfSourcePartitions() {
-        GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
-        List<String> regionNames = Arrays.asList(new String[]{"region1", "region2", "region3"});
-        Map<String, Map<String,String>> sourcePartitions = task.createSourcePartitionsMap(regionNames);
-        assertThat(3, is(sourcePartitions.size()));
-        assertThat(true, is(sourcePartitions.get("region1").get(REGION_PARTITION).equals("region1")));
-        assertThat(true, is(sourcePartitions.get("region2").get(REGION_PARTITION).equals("region2")));
-        assertThat(true, is(sourcePartitions.get("region3").get(REGION_PARTITION).equals("region3")));
-    }
-
-    @Test
-    public void listOfLocatorsShouldBeConfiguredIntoClientCache() {
-
-    }
-
-    @Test
-    public void shouldNotBeDurableIfDurableClientIdIsNull() {
-
-    }
-
-    @Test
-    public void shouldNotCallReadyForEventsIfDurableClientPrefixIsEmpty() {
-
-    }
+  @Test
+  public void cqPrefixShouldBeProperlyCalculatedFromProps() {
+    // GeodeContext geodeContext = mock(GeodeContext.class);
+    // GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
+  }
 
 
-    @Test
-    public void cqPrefixShouldBeProperlyCalculatedFromProps() {
-//        GeodeContext geodeContext = mock(GeodeContext.class);
-//        GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
-    }
-
-
-    private EventBufferSupplier createEventBufferSupplier(BlockingQueue<GeodeEvent> eventBuffer) {
-        return new EventBufferSupplier() {
-            @Override
-            public BlockingQueue<GeodeEvent> get() {
-                return eventBuffer;
-            }
-        };
-    }
+  private EventBufferSupplier createEventBufferSupplier(BlockingQueue<GeodeEvent> eventBuffer) {
+    return new EventBufferSupplier() {
+      @Override
+      public BlockingQueue<GeodeEvent> get() {
+        return eventBuffer;
+      }
+    };
+  }
 }
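
Note: several reformatted tests here (installOnGeodeShouldCallCq, listOfLocatorsShouldBeConfiguredIntoClientCache, shouldNotBeDurableIfDurableClientIdIsNull, shouldNotCallReadyForEventsIfDurableClientPrefixIsEmpty) have empty bodies and so pass vacuously. JUnit 4's @Ignore (org.junit.Ignore) would make the pending work visible in test reports instead; a sketch:

    @Ignore("TODO: not yet implemented")
    @Test
    public void listOfLocatorsShouldBeConfiguredIntoClientCache() {}
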
diff --git a/src/test/java/geode/kafka/source/GeodeKafkaSourceTest.java b/src/test/java/geode/kafka/source/GeodeKafkaSourceTest.java
index 3e61c77..6632d75 100644
--- a/src/test/java/geode/kafka/source/GeodeKafkaSourceTest.java
+++ b/src/test/java/geode/kafka/source/GeodeKafkaSourceTest.java
@@ -14,53 +14,53 @@
  */
 package geode.kafka.source;
 
-import org.junit.Test;
+import static geode.kafka.GeodeConnectorConfig.TASK_ID;
+import static geode.kafka.source.GeodeSourceConnectorConfig.REGION_TO_TOPIC_BINDINGS;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 
-import static geode.kafka.GeodeConnectorConfig.TASK_ID;
-import static geode.kafka.source.GeodeSourceConnectorConfig.REGION_TO_TOPIC_BINDINGS;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import org.junit.Test;
 
 public class GeodeKafkaSourceTest {
 
-    @Test
-    public void taskConfigsCreatesMaxNumberOfTasks() {
-        GeodeKafkaSource source = new GeodeKafkaSource();
-        Map<String, String> props = new HashMap();
-        props.put(REGION_TO_TOPIC_BINDINGS, "[someRegion:someTopic]");
-        source.start(props);
-        Collection<Map<String,String>> tasks = source.taskConfigs(5);
-        assertEquals(5, tasks.size());
-    }
+  @Test
+  public void taskConfigsCreatesMaxNumberOfTasks() {
+    GeodeKafkaSource source = new GeodeKafkaSource();
+    Map<String, String> props = new HashMap();
+    props.put(REGION_TO_TOPIC_BINDINGS, "[someRegion:someTopic]");
+    source.start(props);
+    Collection<Map<String, String>> tasks = source.taskConfigs(5);
+    assertEquals(5, tasks.size());
+  }
 
-    @Test
-    public void sourceTaskConfigsAllAssignedEntireRegionToTopicBinding() {
-        GeodeKafkaSource source = new GeodeKafkaSource();
-        Map<String, String> props = new HashMap();
-        props.put(REGION_TO_TOPIC_BINDINGS, "[someRegion:someTopic]");
-        source.start(props);
-        Collection<Map<String,String>> tasks = source.taskConfigs(5);
-        for(Map<String, String> prop : tasks) {
-            assertEquals("[someRegion:someTopic]", prop.get(REGION_TO_TOPIC_BINDINGS));
-        }
+  @Test
+  public void sourceTaskConfigsAllAssignedEntireRegionToTopicBinding() {
+    GeodeKafkaSource source = new GeodeKafkaSource();
+    Map<String, String> props = new HashMap();
+    props.put(REGION_TO_TOPIC_BINDINGS, "[someRegion:someTopic]");
+    source.start(props);
+    Collection<Map<String, String>> tasks = source.taskConfigs(5);
+    for (Map<String, String> prop : tasks) {
+      assertEquals("[someRegion:someTopic]", prop.get(REGION_TO_TOPIC_BINDINGS));
     }
+  }
 
-    @Test
-    public void eachTaskHasUniqueTaskIds() {
-        GeodeKafkaSource sink = new GeodeKafkaSource();
-        Map<String, String> props = new HashMap();
-        props.put(REGION_TO_TOPIC_BINDINGS, "[someRegion:someTopic]");
-        sink.start(props);
-        Collection<Map<String,String>> tasks = sink.taskConfigs(5);
-        HashSet<String> seenIds = new HashSet();
-        for(Map<String, String> taskProp : tasks) {
-            assertTrue(seenIds.add(taskProp.get(TASK_ID)));
-        }
+  @Test
+  public void eachTaskHasUniqueTaskIds() {
+    GeodeKafkaSource sink = new GeodeKafkaSource();
+    Map<String, String> props = new HashMap();
+    props.put(REGION_TO_TOPIC_BINDINGS, "[someRegion:someTopic]");
+    sink.start(props);
+    Collection<Map<String, String>> tasks = sink.taskConfigs(5);
+    HashSet<String> seenIds = new HashSet();
+    for (Map<String, String> taskProp : tasks) {
+      assertTrue(seenIds.add(taskProp.get(TASK_ID)));
     }
+  }
 
 }
diff --git a/src/test/java/geode/kafka/source/GeodeSourceConnectorConfigTest.java b/src/test/java/geode/kafka/source/GeodeSourceConnectorConfigTest.java
index c75e1b8..a545a72 100644
--- a/src/test/java/geode/kafka/source/GeodeSourceConnectorConfigTest.java
+++ b/src/test/java/geode/kafka/source/GeodeSourceConnectorConfigTest.java
@@ -14,26 +14,26 @@
  */
 package geode.kafka.source;
 
-import org.junit.Test;
-
-import java.util.HashMap;
-import java.util.Map;
-
 import static geode.kafka.GeodeConnectorConfig.LOCATORS;
 import static geode.kafka.GeodeConnectorConfig.TASK_ID;
 import static geode.kafka.source.GeodeSourceConnectorConfig.DURABLE_CLIENT_ID_PREFIX;
 import static org.junit.Assert.assertEquals;
 
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.Test;
+
 public class GeodeSourceConnectorConfigTest {
 
-    @Test
-    public void durableClientIdShouldNotBeSetIfPrefixIsEmpty() {
-        Map<String, String> props = new HashMap<>();
-        props.put(TASK_ID, "0");
-        props.put(DURABLE_CLIENT_ID_PREFIX, "");
-        props.put(LOCATORS, "localhost[10334]");
-        GeodeSourceConnectorConfig config = new GeodeSourceConnectorConfig(props);
-        assertEquals("", config.getDurableClientId());
-    }
+  @Test
+  public void durableClientIdShouldNotBeSetIfPrefixIsEmpty() {
+    Map<String, String> props = new HashMap<>();
+    props.put(TASK_ID, "0");
+    props.put(DURABLE_CLIENT_ID_PREFIX, "");
+    props.put(LOCATORS, "localhost[10334]");
+    GeodeSourceConnectorConfig config = new GeodeSourceConnectorConfig(props);
+    assertEquals("", config.getDurableClientId());
+  }
 
 }
diff --git a/src/test/java/geode/kafka/source/SharedEventBufferSupplierTest.java b/src/test/java/geode/kafka/source/SharedEventBufferSupplierTest.java
index b4a429b..87a0e07 100644
--- a/src/test/java/geode/kafka/source/SharedEventBufferSupplierTest.java
+++ b/src/test/java/geode/kafka/source/SharedEventBufferSupplierTest.java
@@ -1,51 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
 package geode.kafka.source;
 
-import org.junit.Test;
-
-import java.util.concurrent.BlockingQueue;
-
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.mockito.Mockito.mock;
 
+import java.util.concurrent.BlockingQueue;
+
+import org.junit.Test;
+
 public class SharedEventBufferSupplierTest {
 
-    @Test
-    public void creatingNewSharedEventSupplierShouldCreateInstance() {
-        SharedEventBufferSupplier supplier = new SharedEventBufferSupplier(1);
-        assertNotNull(supplier.get());
-    }
+  @Test
+  public void creatingNewSharedEventSupplierShouldCreateInstance() {
+    SharedEventBufferSupplier supplier = new SharedEventBufferSupplier(1);
+    assertNotNull(supplier.get());
+  }
 
-    @Test
-    public void alreadySharedEventSupplierShouldReturnSameInstanceOfEventBuffer() {
-        SharedEventBufferSupplier supplier = new SharedEventBufferSupplier(1);
-        BlockingQueue<GeodeEvent> queue = supplier.get();
-        supplier = new SharedEventBufferSupplier(1);
-        assertEquals(queue, supplier.get());
-    }
+  @Test
+  public void alreadySharedEventSupplierShouldReturnSameInstanceOfEventBuffer() {
+    SharedEventBufferSupplier supplier = new SharedEventBufferSupplier(1);
+    BlockingQueue<GeodeEvent> queue = supplier.get();
+    supplier = new SharedEventBufferSupplier(1);
+    assertEquals(queue, supplier.get());
+  }
 
-    @Test
-    public void newEventBufferShouldBeReflectedInAllSharedSuppliers() {
-        SharedEventBufferSupplier supplier = new SharedEventBufferSupplier(1);
-        SharedEventBufferSupplier newSupplier = new SharedEventBufferSupplier(2);
-        assertEquals(supplier.get(), newSupplier.get());
-    }
+  @Test
+  public void newEventBufferShouldBeReflectedInAllSharedSuppliers() {
+    SharedEventBufferSupplier supplier = new SharedEventBufferSupplier(1);
+    SharedEventBufferSupplier newSupplier = new SharedEventBufferSupplier(2);
+    assertEquals(supplier.get(), newSupplier.get());
+  }
 
-    @Test
-    public void newEventBufferSuppliedShouldNotBeTheOldQueue() {
-        SharedEventBufferSupplier supplier = new SharedEventBufferSupplier(1);
-        BlockingQueue<GeodeEvent> queue = supplier.get();
-        SharedEventBufferSupplier newSupplier = new SharedEventBufferSupplier(2);
-        assertNotEquals(queue, newSupplier.get());
-    }
+  @Test
+  public void newEventBufferSuppliedShouldNotBeTheOldQueue() {
+    SharedEventBufferSupplier supplier = new SharedEventBufferSupplier(1);
+    BlockingQueue<GeodeEvent> queue = supplier.get();
+    SharedEventBufferSupplier newSupplier = new SharedEventBufferSupplier(2);
+    assertNotEquals(queue, newSupplier.get());
+  }
 
-    @Test
-    public void newEventBufferShouldContainAllEventsFromTheOldSupplier() {
-        SharedEventBufferSupplier supplier = new SharedEventBufferSupplier(1);
-        GeodeEvent geodeEvent = mock(GeodeEvent.class);
-        supplier.get().add(geodeEvent);
-        SharedEventBufferSupplier newSupplier = new SharedEventBufferSupplier(2);
-        assertEquals(geodeEvent, newSupplier.get().poll());
-    }
+  @Test
+  public void newEventBufferShouldContainAllEventsFromTheOldSupplier() {
+    SharedEventBufferSupplier supplier = new SharedEventBufferSupplier(1);
+    GeodeEvent geodeEvent = mock(GeodeEvent.class);
+    supplier.get().add(geodeEvent);
+    SharedEventBufferSupplier newSupplier = new SharedEventBufferSupplier(2);
+    assertEquals(geodeEvent, newSupplier.get().poll());
+  }
 }