Addition of rat and spolessApply
* All offending files were also corrected.
diff --git a/build.gradle b/build.gradle
index 114b3cd..124ef4b 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1,9 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+buildscript {
+ repositories {
+ maven { url "https://plugins.gradle.org/m2/" }
+ maven { url "https://dl.bintray.com/palantir/releases" }
+ jcenter()
+ }
+ dependencies {
+ classpath 'com.diffplug.spotless:spotless-plugin-gradle:3.21.1'
+ classpath 'com.github.ben-manes:gradle-versions-plugin:0.21.0'
+ classpath 'com.netflix.nebula:gradle-lint-plugin:11.4.4'
+ classpath 'com.netflix.nebula:nebula-project-plugin:6.0.2'
+ classpath 'gradle.plugin.com.palantir.gradle.docker:gradle-docker:0.22.1'
+ classpath 'io.spring.gradle:dependency-management-plugin:1.0.7.RELEASE'
+ classpath 'org.ajoberstar.grgit:grgit-gradle:3.1.1'
+ classpath 'org.nosphere.apache:creadur-rat-gradle:0.4.0'
+ classpath 'org.sonarsource.scanner.gradle:sonarqube-gradle-plugin:2.7'
+ }
+}
plugins {
id 'java'
}
-group 'org.apache.geode'
-version '1.0-SNAPSHOT'
+apply plugin: 'wrapper'
+apply plugin: 'nebula.facet'
+apply plugin: 'java-library'
+apply plugin: 'idea'
+apply plugin: 'eclipse'
+
+tasks.register('devBuild') {
+ description "A convenience target for a typical developer workflow: apply spotless and assemble all classes."
+ dependsOn tasks.named('assemble')
+ // Each subproject injects its SpotlessApply as a dependency to this task in the standard config
+}
+
+
+apply from: "${rootDir}/${scriptDir}/spotless.gradle"
+apply from: "${scriptDir}/rat.gradle"
+
sourceCompatibility = 1.8
@@ -12,22 +60,22 @@
}
dependencies {
-
- compile 'org.apache.geode:geode-core:1.10.0'
- compile 'org.apache.geode:geode-cq:1.10.0'
+ compile('org.apache.geode:geode-core:1.10.0')
+ compile('org.apache.geode:geode-cq:1.10.0')
compile(group: 'org.apache.kafka', name: 'connect-api', version: '2.3.1')
- compile group: 'org.apache.logging.log4j', name: 'log4j-api', version: '2.13.0'
- compile group: 'org.apache.logging.log4j', name: 'log4j-core', version: '2.13.0'
+ compile(group: 'org.apache.logging.log4j', name: 'log4j-api', version: '2.13.0')
+ compile(group: 'org.apache.logging.log4j', name: 'log4j-core', version: '2.13.0')
testCompile(group: 'org.apache.kafka', name: 'kafka_2.12', version: '2.3.1')
testCompile(group: 'org.apache.kafka', name: 'kafka-streams-test-utils', version: '1.1.0')
testCompile(group: 'org.apache.curator', name: 'curator-framework', version: '4.2.0')
testCompile(group: 'org.apache.kafka', name: 'connect-runtime', version: '2.3.1')
- testCompile group: 'junit', name: 'junit', version: '4.12'
- testCompile 'org.mockito:mockito-core:3.2.4'
- testCompile 'pl.pragmatists:JUnitParams:1.1.1'
+ testCompile(group: 'junit', name: 'junit', version: '4.12')
+ testCompile('org.mockito:mockito-core:3.2.4')
+ testCompile('pl.pragmatists:JUnitParams:1.1.1')
testImplementation 'org.awaitility:awaitility:4.0.2'
}
+
diff --git a/etc/eclipse-java-google-style.xml b/etc/eclipse-java-google-style.xml
new file mode 100644
index 0000000..629ef19
--- /dev/null
+++ b/etc/eclipse-java-google-style.xml
@@ -0,0 +1,337 @@
+<?xml version="1.0" encoding="UTF-8" standalone="no"?>
+<profiles version="13">
+<profile kind="CodeFormatterProfile" name="GoogleStyle" version="13">
+<setting id="org.eclipse.jdt.core.compiler.codegen.inlineJsrBytecode" value="enabled"/>
+<setting id="org.eclipse.jdt.core.compiler.codegen.targetPlatform" value="1.7"/>
+<setting id="org.eclipse.jdt.core.compiler.compliance" value="1.7"/>
+<setting id="org.eclipse.jdt.core.compiler.problem.assertIdentifier" value="error"/>
+<setting id="org.eclipse.jdt.core.compiler.problem.enumIdentifier" value="error"/>
+<setting id="org.eclipse.jdt.core.compiler.source" value="1.7"/>
+<setting id="org.eclipse.jdt.core.formatter.align_type_members_on_columns" value="false"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_annotations_on_field" value="1585"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_annotations_on_field.count_dependent" value="1585|-1|1585"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_annotations_on_local_variable" value="1585"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_annotations_on_local_variable.count_dependent" value="1585|-1|1585"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_annotations_on_method" value="1585"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_annotations_on_method.count_dependent" value="1585|-1|1585"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_annotations_on_package" value="1585"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_annotations_on_package.count_dependent" value="1585|-1|1585"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_annotations_on_parameter" value="1040"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_annotations_on_parameter.count_dependent" value="1040|-1|1040"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_annotations_on_type" value="1585"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_annotations_on_type.count_dependent" value="1585|-1|1585"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_arguments_in_allocation_expression" value="16"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_arguments_in_allocation_expression.count_dependent" value="16|5|80"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_arguments_in_annotation" value="16"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_arguments_in_annotation.count_dependent" value="16|-1|16"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_arguments_in_enum_constant" value="16"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_arguments_in_enum_constant.count_dependent" value="16|-1|16"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_arguments_in_explicit_constructor_call" value="16"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_arguments_in_explicit_constructor_call.count_dependent" value="16|5|80"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_arguments_in_method_invocation" value="16"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_arguments_in_method_invocation.count_dependent" value="16|5|80"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_arguments_in_qualified_allocation_expression" value="16"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_arguments_in_qualified_allocation_expression.count_dependent" value="16|4|80"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_assignment" value="16"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_binary_expression" value="16"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_binary_expression.count_dependent" value="16|-1|16"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_cascading_method_invocation_with_arguments" value="16"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_cascading_method_invocation_with_arguments.count_dependent" value="16|-1|16"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_compact_if" value="16"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_conditional_expression" value="80"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_enum_constants" value="48"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_enum_constants.count_dependent" value="16|5|48"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_expressions_in_array_initializer" value="16"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_expressions_in_array_initializer.count_dependent" value="16|5|80"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_field_declaration" value="16"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_for_statement" value="16"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_generic_type_arguments" value="16"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_generic_type_arguments.count_dependent" value="16|-1|16"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_local_variable_declaration" value="16"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_method_declaration" value="0"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_multiple_fields" value="16"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_multiple_fields.count_dependent" value="16|-1|16"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_new_anonymous_class" value="20"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_parameters_in_constructor_declaration" value="16"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_parameters_in_constructor_declaration.count_dependent" value="16|5|80"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_parameters_in_method_declaration" value="16"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_parameters_in_method_declaration.count_dependent" value="16|5|80"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_resources_in_try" value="80"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_selector_in_method_invocation" value="16"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_selector_in_method_invocation.count_dependent" value="16|4|48"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_superclass_in_type_declaration" value="16"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_superinterfaces_in_enum_declaration" value="16"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_superinterfaces_in_enum_declaration.count_dependent" value="16|4|49"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_superinterfaces_in_type_declaration" value="16"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_superinterfaces_in_type_declaration.count_dependent" value="16|4|48"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_throws_clause_in_constructor_declaration" value="16"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_throws_clause_in_constructor_declaration.count_dependent" value="16|4|48"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_throws_clause_in_method_declaration" value="16"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_throws_clause_in_method_declaration.count_dependent" value="16|4|48"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_union_type_in_multicatch" value="16"/>
+<setting id="org.eclipse.jdt.core.formatter.blank_lines_after_imports" value="1"/>
+<setting id="org.eclipse.jdt.core.formatter.blank_lines_after_package" value="1"/>
+<setting id="org.eclipse.jdt.core.formatter.blank_lines_before_field" value="0"/>
+<setting id="org.eclipse.jdt.core.formatter.blank_lines_before_first_class_body_declaration" value="0"/>
+<setting id="org.eclipse.jdt.core.formatter.blank_lines_before_imports" value="0"/>
+<setting id="org.eclipse.jdt.core.formatter.blank_lines_before_member_type" value="0"/>
+<setting id="org.eclipse.jdt.core.formatter.blank_lines_before_method" value="1"/>
+<setting id="org.eclipse.jdt.core.formatter.blank_lines_before_new_chunk" value="1"/>
+<setting id="org.eclipse.jdt.core.formatter.blank_lines_before_package" value="0"/>
+<setting id="org.eclipse.jdt.core.formatter.blank_lines_between_import_groups" value="1"/>
+<setting id="org.eclipse.jdt.core.formatter.blank_lines_between_type_declarations" value="2"/>
+<setting id="org.eclipse.jdt.core.formatter.brace_position_for_annotation_type_declaration" value="end_of_line"/>
+<setting id="org.eclipse.jdt.core.formatter.brace_position_for_anonymous_type_declaration" value="end_of_line"/>
+<setting id="org.eclipse.jdt.core.formatter.brace_position_for_array_initializer" value="end_of_line"/>
+<setting id="org.eclipse.jdt.core.formatter.brace_position_for_block" value="end_of_line"/>
+<setting id="org.eclipse.jdt.core.formatter.brace_position_for_block_in_case" value="end_of_line"/>
+<setting id="org.eclipse.jdt.core.formatter.brace_position_for_constructor_declaration" value="end_of_line"/>
+<setting id="org.eclipse.jdt.core.formatter.brace_position_for_enum_constant" value="end_of_line"/>
+<setting id="org.eclipse.jdt.core.formatter.brace_position_for_enum_declaration" value="end_of_line"/>
+<setting id="org.eclipse.jdt.core.formatter.brace_position_for_method_declaration" value="end_of_line"/>
+<setting id="org.eclipse.jdt.core.formatter.brace_position_for_switch" value="end_of_line"/>
+<setting id="org.eclipse.jdt.core.formatter.brace_position_for_type_declaration" value="end_of_line"/>
+<setting id="org.eclipse.jdt.core.formatter.comment.clear_blank_lines_in_block_comment" value="false"/>
+<setting id="org.eclipse.jdt.core.formatter.comment.clear_blank_lines_in_javadoc_comment" value="false"/>
+<setting id="org.eclipse.jdt.core.formatter.comment.format_block_comments" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.comment.format_header" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.comment.format_html" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.comment.format_javadoc_comments" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.comment.format_line_comments" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.comment.format_source_code" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.comment.indent_parameter_description" value="false"/>
+<setting id="org.eclipse.jdt.core.formatter.comment.indent_root_tags" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.comment.insert_new_line_before_root_tags" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.comment.insert_new_line_for_parameter" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.comment.line_length" value="100"/>
+<setting id="org.eclipse.jdt.core.formatter.comment.new_lines_at_block_boundaries" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.comment.new_lines_at_javadoc_boundaries" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.comment.preserve_white_space_between_code_and_line_comments" value="false"/>
+<setting id="org.eclipse.jdt.core.formatter.comment_new_line_at_start_of_html_paragraph" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.compact_else_if" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.continuation_indentation" value="2"/>
+<setting id="org.eclipse.jdt.core.formatter.continuation_indentation_for_array_initializer" value="2"/>
+<setting id="org.eclipse.jdt.core.formatter.disabling_tag" value="@formatter:off"/>
+<setting id="org.eclipse.jdt.core.formatter.enabling_tag" value="@formatter:on"/>
+<setting id="org.eclipse.jdt.core.formatter.force_if_else_statement_brace" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.format_guardian_clause_on_one_line" value="false"/>
+<setting id="org.eclipse.jdt.core.formatter.format_line_comment_starting_on_first_column" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.indent_body_declarations_compare_to_annotation_declaration_header" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.indent_body_declarations_compare_to_enum_constant_header" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.indent_body_declarations_compare_to_enum_declaration_header" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.indent_body_declarations_compare_to_type_header" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.indent_breaks_compare_to_cases" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.indent_empty_lines" value="false"/>
+<setting id="org.eclipse.jdt.core.formatter.indent_statements_compare_to_block" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.indent_statements_compare_to_body" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.indent_switchstatements_compare_to_cases" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.indent_switchstatements_compare_to_switch" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.indentation.size" value="4"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_after_annotation_on_field" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_after_annotation_on_local_variable" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_after_annotation_on_method" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_after_annotation_on_package" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_after_annotation_on_parameter" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_after_annotation_on_type" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_after_label" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_after_opening_brace_in_array_initializer" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_at_end_of_file_if_missing" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_before_catch_in_try_statement" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_before_else_in_if_statement" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_before_finally_in_try_statement" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_before_while_in_do_statement" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_in_empty_annotation_declaration" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_in_empty_anonymous_type_declaration" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_in_empty_block" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_in_empty_enum_constant" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_in_empty_enum_declaration" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_in_empty_method_body" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_in_empty_type_declaration" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_and_in_type_parameter" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_assignment_operator" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_at_in_annotation" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_at_in_annotation_type_declaration" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_binary_operator" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_closing_angle_bracket_in_type_arguments" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_closing_angle_bracket_in_type_parameters" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_closing_brace_in_block" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_closing_paren_in_cast" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_colon_in_assert" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_colon_in_case" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_colon_in_conditional" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_colon_in_for" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_colon_in_labeled_statement" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_allocation_expression" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_annotation" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_array_initializer" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_constructor_declaration_parameters" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_constructor_declaration_throws" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_enum_constant_arguments" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_enum_declarations" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_explicitconstructorcall_arguments" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_for_increments" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_for_inits" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_method_declaration_parameters" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_method_declaration_throws" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_method_invocation_arguments" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_multiple_field_declarations" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_multiple_local_declarations" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_parameterized_type_reference" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_superinterfaces" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_type_arguments" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_type_parameters" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comment_prefix" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_ellipsis" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_angle_bracket_in_parameterized_type_reference" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_angle_bracket_in_type_arguments" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_angle_bracket_in_type_parameters" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_brace_in_array_initializer" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_bracket_in_array_allocation_expression" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_bracket_in_array_reference" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_annotation" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_cast" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_catch" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_constructor_declaration" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_enum_constant" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_for" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_if" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_method_declaration" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_method_invocation" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_parenthesized_expression" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_switch" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_synchronized" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_try" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_while" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_postfix_operator" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_prefix_operator" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_question_in_conditional" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_question_in_wildcard" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_semicolon_in_for" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_semicolon_in_try_resources" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_unary_operator" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_and_in_type_parameter" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_assignment_operator" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_at_in_annotation_type_declaration" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_binary_operator" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_angle_bracket_in_parameterized_type_reference" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_angle_bracket_in_type_arguments" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_angle_bracket_in_type_parameters" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_brace_in_array_initializer" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_bracket_in_array_allocation_expression" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_bracket_in_array_reference" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_annotation" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_cast" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_catch" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_constructor_declaration" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_enum_constant" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_for" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_if" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_method_declaration" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_method_invocation" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_parenthesized_expression" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_switch" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_synchronized" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_try" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_while" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_colon_in_assert" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_colon_in_case" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_colon_in_conditional" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_colon_in_default" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_colon_in_for" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_colon_in_labeled_statement" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_allocation_expression" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_annotation" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_array_initializer" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_constructor_declaration_parameters" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_constructor_declaration_throws" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_enum_constant_arguments" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_enum_declarations" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_explicitconstructorcall_arguments" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_for_increments" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_for_inits" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_method_declaration_parameters" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_method_declaration_throws" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_method_invocation_arguments" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_multiple_field_declarations" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_multiple_local_declarations" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_parameterized_type_reference" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_superinterfaces" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_type_arguments" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_type_parameters" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_ellipsis" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_angle_bracket_in_parameterized_type_reference" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_angle_bracket_in_type_arguments" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_angle_bracket_in_type_parameters" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_annotation_type_declaration" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_anonymous_type_declaration" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_array_initializer" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_block" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_constructor_declaration" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_enum_constant" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_enum_declaration" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_method_declaration" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_switch" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_type_declaration" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_bracket_in_array_allocation_expression" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_bracket_in_array_reference" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_bracket_in_array_type_reference" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_annotation" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_annotation_type_member_declaration" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_catch" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_constructor_declaration" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_enum_constant" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_for" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_if" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_method_declaration" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_method_invocation" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_parenthesized_expression" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_switch" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_synchronized" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_try" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_while" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_parenthesized_expression_in_return" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_parenthesized_expression_in_throw" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_postfix_operator" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_prefix_operator" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_question_in_conditional" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_question_in_wildcard" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_semicolon" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_semicolon_in_for" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_semicolon_in_try_resources" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_unary_operator" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_between_brackets_in_array_type_reference" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_between_empty_braces_in_array_initializer" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_between_empty_brackets_in_array_allocation_expression" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_between_empty_parens_in_annotation_type_member_declaration" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_between_empty_parens_in_constructor_declaration" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_between_empty_parens_in_enum_constant" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_between_empty_parens_in_method_declaration" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_between_empty_parens_in_method_invocation" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.join_lines_in_comments" value="false"/>
+<setting id="org.eclipse.jdt.core.formatter.join_wrapped_lines" value="false"/>
+<setting id="org.eclipse.jdt.core.formatter.keep_else_statement_on_same_line" value="false"/>
+<setting id="org.eclipse.jdt.core.formatter.keep_empty_array_initializer_on_one_line" value="false"/>
+<setting id="org.eclipse.jdt.core.formatter.keep_imple_if_on_one_line" value="false"/>
+<setting id="org.eclipse.jdt.core.formatter.keep_then_statement_on_same_line" value="false"/>
+<setting id="org.eclipse.jdt.core.formatter.lineSplit" value="100"/>
+<setting id="org.eclipse.jdt.core.formatter.never_indent_block_comments_on_first_column" value="false"/>
+<setting id="org.eclipse.jdt.core.formatter.never_indent_line_comments_on_first_column" value="false"/>
+<setting id="org.eclipse.jdt.core.formatter.number_of_blank_lines_at_beginning_of_method_body" value="0"/>
+<setting id="org.eclipse.jdt.core.formatter.number_of_empty_lines_to_preserve" value="3"/>
+<setting id="org.eclipse.jdt.core.formatter.put_empty_statement_on_new_line" value="false"/>
+<setting id="org.eclipse.jdt.core.formatter.tabulation.char" value="space"/>
+<setting id="org.eclipse.jdt.core.formatter.tabulation.size" value="2"/>
+<setting id="org.eclipse.jdt.core.formatter.use_on_off_tags" value="false"/>
+<setting id="org.eclipse.jdt.core.formatter.use_tabs_only_for_leading_indentations" value="false"/>
+<setting id="org.eclipse.jdt.core.formatter.wrap_before_binary_operator" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.wrap_before_or_operator_multicatch" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.wrap_comment_inline_tags" value="false"/>
+<setting id="org.eclipse.jdt.core.formatter.wrap_non_simple_local_variable_annotation" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.wrap_non_simple_member_annotation" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.wrap_non_simple_package_annotation" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.wrap_non_simple_parameter_annotation" value="false"/>
+<setting id="org.eclipse.jdt.core.formatter.wrap_non_simple_type_annotation" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.wrap_outer_expressions_when_nested" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.wrap_prefer_two_fragments" value="false"/>
+</profile>
+</profiles>
diff --git a/etc/eclipseOrganizeImports.importorder b/etc/eclipseOrganizeImports.importorder
new file mode 100644
index 0000000..de96cae
--- /dev/null
+++ b/etc/eclipseOrganizeImports.importorder
@@ -0,0 +1,8 @@
+#Organize Import Order
+#Thu Sep 15 13:10:33 PDT 2016
+5=com.gemstone
+4=org.apache.geode
+3=
+2=javax
+1=java
+0=\#
diff --git a/etc/intellij-java-modified-google-style.xml b/etc/intellij-java-modified-google-style.xml
new file mode 100644
index 0000000..696500c
--- /dev/null
+++ b/etc/intellij-java-modified-google-style.xml
@@ -0,0 +1,255 @@
+<code_scheme name="GeodeStyle">
+ <option name="JAVA_INDENT_OPTIONS">
+ <value>
+ <option name="INDENT_SIZE" value="2" />
+ <option name="CONTINUATION_INDENT_SIZE" value="4" />
+ <option name="TAB_SIZE" value="8" />
+ <option name="USE_TAB_CHARACTER" value="false" />
+ <option name="SMART_TABS" value="false" />
+ <option name="LABEL_INDENT_SIZE" value="0" />
+ <option name="LABEL_INDENT_ABSOLUTE" value="false" />
+ <option name="USE_RELATIVE_INDENTS" value="false" />
+ </value>
+ </option>
+ <option name="OTHER_INDENT_OPTIONS">
+ <value>
+ <option name="INDENT_SIZE" value="2" />
+ <option name="CONTINUATION_INDENT_SIZE" value="4" />
+ <option name="TAB_SIZE" value="8" />
+ <option name="USE_TAB_CHARACTER" value="false" />
+ <option name="SMART_TABS" value="false" />
+ <option name="LABEL_INDENT_SIZE" value="0" />
+ <option name="LABEL_INDENT_ABSOLUTE" value="false" />
+ <option name="USE_RELATIVE_INDENTS" value="false" />
+ </value>
+ </option>
+ <option name="LINE_SEPARATOR" value="
" />
+ <option name="CLASS_COUNT_TO_USE_IMPORT_ON_DEMAND" value="99" />
+ <option name="NAMES_COUNT_TO_USE_IMPORT_ON_DEMAND" value="99" />
+ <option name="PACKAGES_TO_USE_IMPORT_ON_DEMAND">
+ <value />
+ </option>
+ <option name="IMPORT_LAYOUT_TABLE">
+ <value>
+ <package name="" withSubpackages="true" static="true" />
+ <emptyLine />
+ <package name="java" withSubpackages="true" static="false" />
+ <emptyLine />
+ <package name="javax" withSubpackages="true" static="false" />
+ <emptyLine />
+ <package name="" withSubpackages="true" static="false" />
+ <emptyLine />
+ <package name="org.apache.geode" withSubpackages="true" static="false" />
+ <emptyLine />
+ <package name="com.gemstone" withSubpackages="true" static="false" />
+ </value>
+ </option>
+ <option name="STATIC_METHODS_ORDER_WEIGHT" value="5" />
+ <option name="METHODS_ORDER_WEIGHT" value="4" />
+ <option name="RIGHT_MARGIN" value="100" />
+ <option name="JD_ALIGN_PARAM_COMMENTS" value="false" />
+ <option name="JD_ALIGN_EXCEPTION_COMMENTS" value="false" />
+ <option name="JD_ADD_BLANK_AFTER_DESCRIPTION" value="false" />
+ <option name="JD_P_AT_EMPTY_LINES" value="false" />
+ <option name="JD_KEEP_EMPTY_PARAMETER" value="false" />
+ <option name="JD_KEEP_EMPTY_EXCEPTION" value="false" />
+ <option name="JD_KEEP_EMPTY_RETURN" value="false" />
+ <option name="HTML_KEEP_BLANK_LINES" value="1" />
+ <option name="HTML_ALIGN_TEXT" value="true" />
+ <option name="KEEP_LINE_BREAKS" value="false" />
+ <option name="KEEP_FIRST_COLUMN_COMMENT" value="false" />
+ <option name="KEEP_CONTROL_STATEMENT_IN_ONE_LINE" value="false" />
+ <option name="KEEP_BLANK_LINES_IN_DECLARATIONS" value="1" />
+ <option name="KEEP_BLANK_LINES_IN_CODE" value="1" />
+ <option name="KEEP_BLANK_LINES_BEFORE_RBRACE" value="1" />
+ <option name="BLANK_LINES_AROUND_CLASS" value="0" />
+ <option name="BLANK_LINES_AROUND_FIELD" value="1" />
+ <option name="BLANK_LINES_AFTER_CLASS_HEADER" value="1" />
+ <option name="BRACE_STYLE" value="2" />
+ <option name="CLASS_BRACE_STYLE" value="2" />
+ <option name="METHOD_BRACE_STYLE" value="2" />
+ <option name="ELSE_ON_NEW_LINE" value="true" />
+ <option name="WHILE_ON_NEW_LINE" value="true" />
+ <option name="CATCH_ON_NEW_LINE" value="true" />
+ <option name="FINALLY_ON_NEW_LINE" value="true" />
+ <option name="ALIGN_MULTILINE_PARAMETERS_IN_CALLS" value="true" />
+ <option name="ALIGN_MULTILINE_BINARY_OPERATION" value="true" />
+ <option name="ALIGN_MULTILINE_ASSIGNMENT" value="true" />
+ <option name="ALIGN_MULTILINE_TERNARY_OPERATION" value="true" />
+ <option name="ALIGN_MULTILINE_THROWS_LIST" value="true" />
+ <option name="ALIGN_MULTILINE_EXTENDS_LIST" value="true" />
+ <option name="ALIGN_MULTILINE_PARENTHESIZED_EXPRESSION" value="true" />
+ <option name="ALIGN_MULTILINE_ARRAY_INITIALIZER_EXPRESSION" value="true" />
+ <option name="SPACE_AFTER_TYPE_CAST" value="false" />
+ <option name="SPACE_BEFORE_IF_PARENTHESES" value="false" />
+ <option name="SPACE_BEFORE_WHILE_PARENTHESES" value="false" />
+ <option name="SPACE_BEFORE_FOR_PARENTHESES" value="false" />
+ <option name="SPACE_BEFORE_CATCH_PARENTHESES" value="false" />
+ <option name="SPACE_BEFORE_SWITCH_PARENTHESES" value="false" />
+ <option name="SPACE_BEFORE_SYNCHRONIZED_PARENTHESES" value="false" />
+ <option name="CALL_PARAMETERS_WRAP" value="1" />
+ <option name="CALL_PARAMETERS_LPAREN_ON_NEXT_LINE" value="true" />
+ <option name="METHOD_PARAMETERS_WRAP" value="1" />
+ <option name="EXTENDS_LIST_WRAP" value="1" />
+ <option name="THROWS_LIST_WRAP" value="1" />
+ <option name="EXTENDS_KEYWORD_WRAP" value="1" />
+ <option name="THROWS_KEYWORD_WRAP" value="1" />
+ <option name="METHOD_CALL_CHAIN_WRAP" value="1" />
+ <option name="BINARY_OPERATION_WRAP" value="1" />
+ <option name="BINARY_OPERATION_SIGN_ON_NEXT_LINE" value="true" />
+ <option name="TERNARY_OPERATION_WRAP" value="1" />
+ <option name="TERNARY_OPERATION_SIGNS_ON_NEXT_LINE" value="true" />
+ <option name="FOR_STATEMENT_WRAP" value="1" />
+ <option name="ARRAY_INITIALIZER_WRAP" value="1" />
+ <option name="ASSIGNMENT_WRAP" value="5" />
+ <option name="WRAP_COMMENTS" value="true" />
+ <option name="IF_BRACE_FORCE" value="3" />
+ <option name="DOWHILE_BRACE_FORCE" value="3" />
+ <option name="WHILE_BRACE_FORCE" value="3" />
+ <option name="FOR_BRACE_FORCE" value="3" />
+ <JavaCodeStyleSettings>
+ <option name="DO_NOT_WRAP_AFTER_SINGLE_ANNOTATION" value="true" />
+ <option name="CLASS_NAMES_IN_JAVADOC" value="3" />
+ </JavaCodeStyleSettings>
+ <MarkdownNavigatorCodeStyleSettings>
+ <option name="RIGHT_MARGIN" value="72" />
+ </MarkdownNavigatorCodeStyleSettings>
+ <XML>
+ <option name="XML_KEEP_BLANK_LINES" value="1" />
+ <option name="XML_LEGACY_SETTINGS_IMPORTED" value="true" />
+ </XML>
+ <ADDITIONAL_INDENT_OPTIONS fileType="haml">
+ <option name="INDENT_SIZE" value="2" />
+ </ADDITIONAL_INDENT_OPTIONS>
+ <ADDITIONAL_INDENT_OPTIONS fileType="java">
+ <option name="INDENT_SIZE" value="2" />
+ <option name="CONTINUATION_INDENT_SIZE" value="4" />
+ <option name="TAB_SIZE" value="8" />
+ </ADDITIONAL_INDENT_OPTIONS>
+ <ADDITIONAL_INDENT_OPTIONS fileType="js">
+ <option name="CONTINUATION_INDENT_SIZE" value="4" />
+ </ADDITIONAL_INDENT_OPTIONS>
+ <ADDITIONAL_INDENT_OPTIONS fileType="sass">
+ <option name="INDENT_SIZE" value="2" />
+ </ADDITIONAL_INDENT_OPTIONS>
+ <ADDITIONAL_INDENT_OPTIONS fileType="yml">
+ <option name="INDENT_SIZE" value="2" />
+ </ADDITIONAL_INDENT_OPTIONS>
+ <codeStyleSettings language="ECMA Script Level 4">
+ <option name="KEEP_BLANK_LINES_IN_CODE" value="1" />
+ <option name="ALIGN_MULTILINE_PARAMETERS_IN_CALLS" value="true" />
+ <option name="ALIGN_MULTILINE_BINARY_OPERATION" value="true" />
+ <option name="ALIGN_MULTILINE_TERNARY_OPERATION" value="true" />
+ <option name="ALIGN_MULTILINE_EXTENDS_LIST" value="true" />
+ <option name="ALIGN_MULTILINE_ARRAY_INITIALIZER_EXPRESSION" value="true" />
+ <option name="CALL_PARAMETERS_WRAP" value="1" />
+ <option name="METHOD_PARAMETERS_WRAP" value="1" />
+ <option name="EXTENDS_LIST_WRAP" value="1" />
+ <option name="EXTENDS_KEYWORD_WRAP" value="1" />
+ <option name="METHOD_CALL_CHAIN_WRAP" value="1" />
+ <option name="BINARY_OPERATION_WRAP" value="1" />
+ <option name="BINARY_OPERATION_SIGN_ON_NEXT_LINE" value="true" />
+ <option name="TERNARY_OPERATION_WRAP" value="1" />
+ <option name="TERNARY_OPERATION_SIGNS_ON_NEXT_LINE" value="true" />
+ <option name="FOR_STATEMENT_WRAP" value="1" />
+ <option name="ARRAY_INITIALIZER_WRAP" value="1" />
+ <option name="ASSIGNMENT_WRAP" value="5" />
+ <option name="WRAP_COMMENTS" value="true" />
+ <option name="IF_BRACE_FORCE" value="3" />
+ <option name="DOWHILE_BRACE_FORCE" value="3" />
+ <option name="WHILE_BRACE_FORCE" value="3" />
+ <option name="FOR_BRACE_FORCE" value="3" />
+ <option name="PARENT_SETTINGS_INSTALLED" value="true" />
+ </codeStyleSettings>
+ <codeStyleSettings language="JAVA">
+ <option name="RIGHT_MARGIN" value="100" />
+ <option name="KEEP_CONTROL_STATEMENT_IN_ONE_LINE" value="false" />
+ <option name="KEEP_BLANK_LINES_IN_CODE" value="1" />
+ <option name="CALL_PARAMETERS_WRAP" value="1" />
+ <option name="METHOD_PARAMETERS_WRAP" value="1" />
+ <option name="EXTENDS_LIST_WRAP" value="1" />
+ <option name="THROWS_LIST_WRAP" value="1" />
+ <option name="EXTENDS_KEYWORD_WRAP" value="1" />
+ <option name="THROWS_KEYWORD_WRAP" value="1" />
+ <option name="METHOD_CALL_CHAIN_WRAP" value="1" />
+ <option name="BINARY_OPERATION_WRAP" value="1" />
+ <option name="BINARY_OPERATION_SIGN_ON_NEXT_LINE" value="true" />
+ <option name="TERNARY_OPERATION_WRAP" value="1" />
+ <option name="TERNARY_OPERATION_SIGNS_ON_NEXT_LINE" value="true" />
+ <option name="FOR_STATEMENT_WRAP" value="1" />
+ <option name="ARRAY_INITIALIZER_WRAP" value="1" />
+ <option name="ASSIGNMENT_WRAP" value="5" />
+ <option name="IF_BRACE_FORCE" value="3" />
+ <option name="DOWHILE_BRACE_FORCE" value="3" />
+ <option name="WHILE_BRACE_FORCE" value="3" />
+ <option name="FOR_BRACE_FORCE" value="3" />
+ <indentOptions>
+ <option name="INDENT_SIZE" value="2" />
+ <option name="CONTINUATION_INDENT_SIZE" value="4" />
+ <option name="TAB_SIZE" value="8" />
+ </indentOptions>
+ </codeStyleSettings>
+ <codeStyleSettings language="JavaScript">
+ <option name="KEEP_BLANK_LINES_IN_CODE" value="1" />
+ <option name="ALIGN_MULTILINE_PARAMETERS_IN_CALLS" value="true" />
+ <option name="ALIGN_MULTILINE_BINARY_OPERATION" value="true" />
+ <option name="ALIGN_MULTILINE_TERNARY_OPERATION" value="true" />
+ <option name="ALIGN_MULTILINE_ARRAY_INITIALIZER_EXPRESSION" value="true" />
+ <option name="CALL_PARAMETERS_WRAP" value="1" />
+ <option name="METHOD_PARAMETERS_WRAP" value="1" />
+ <option name="METHOD_CALL_CHAIN_WRAP" value="1" />
+ <option name="BINARY_OPERATION_WRAP" value="1" />
+ <option name="BINARY_OPERATION_SIGN_ON_NEXT_LINE" value="true" />
+ <option name="TERNARY_OPERATION_WRAP" value="1" />
+ <option name="TERNARY_OPERATION_SIGNS_ON_NEXT_LINE" value="true" />
+ <option name="FOR_STATEMENT_WRAP" value="1" />
+ <option name="ARRAY_INITIALIZER_WRAP" value="1" />
+ <option name="ASSIGNMENT_WRAP" value="5" />
+ <option name="WRAP_COMMENTS" value="true" />
+ <option name="IF_BRACE_FORCE" value="3" />
+ <option name="DOWHILE_BRACE_FORCE" value="3" />
+ <option name="WHILE_BRACE_FORCE" value="3" />
+ <option name="FOR_BRACE_FORCE" value="3" />
+ <option name="PARENT_SETTINGS_INSTALLED" value="true" />
+ </codeStyleSettings>
+ <codeStyleSettings language="PHP">
+ <option name="KEEP_CONTROL_STATEMENT_IN_ONE_LINE" value="false" />
+ <option name="KEEP_BLANK_LINES_IN_CODE" value="1" />
+ <option name="BLANK_LINES_AFTER_CLASS_HEADER" value="1" />
+ <option name="ALIGN_MULTILINE_ASSIGNMENT" value="true" />
+ <option name="ALIGN_MULTILINE_TERNARY_OPERATION" value="true" />
+ <option name="ALIGN_MULTILINE_THROWS_LIST" value="true" />
+ <option name="ALIGN_MULTILINE_EXTENDS_LIST" value="true" />
+ <option name="ALIGN_MULTILINE_PARENTHESIZED_EXPRESSION" value="true" />
+ <option name="CALL_PARAMETERS_WRAP" value="1" />
+ <option name="METHOD_PARAMETERS_WRAP" value="1" />
+ <option name="EXTENDS_LIST_WRAP" value="1" />
+ <option name="THROWS_LIST_WRAP" value="1" />
+ <option name="EXTENDS_KEYWORD_WRAP" value="1" />
+ <option name="THROWS_KEYWORD_WRAP" value="1" />
+ <option name="METHOD_CALL_CHAIN_WRAP" value="1" />
+ <option name="BINARY_OPERATION_WRAP" value="1" />
+ <option name="BINARY_OPERATION_SIGN_ON_NEXT_LINE" value="true" />
+ <option name="TERNARY_OPERATION_WRAP" value="1" />
+ <option name="TERNARY_OPERATION_SIGNS_ON_NEXT_LINE" value="true" />
+ <option name="FOR_STATEMENT_WRAP" value="1" />
+ <option name="ARRAY_INITIALIZER_WRAP" value="1" />
+ <option name="ASSIGNMENT_WRAP" value="5" />
+ <option name="WRAP_COMMENTS" value="true" />
+ <option name="IF_BRACE_FORCE" value="3" />
+ <option name="DOWHILE_BRACE_FORCE" value="3" />
+ <option name="WHILE_BRACE_FORCE" value="3" />
+ <option name="FOR_BRACE_FORCE" value="3" />
+ <option name="PARENT_SETTINGS_INSTALLED" value="true" />
+ <indentOptions>
+ <option name="INDENT_SIZE" value="4" />
+ <option name="CONTINUATION_INDENT_SIZE" value="8" />
+ <option name="TAB_SIZE" value="4" />
+ <option name="USE_TAB_CHARACTER" value="false" />
+ <option name="SMART_TABS" value="false" />
+ <option name="LABEL_INDENT_SIZE" value="0" />
+ <option name="LABEL_INDENT_ABSOLUTE" value="false" />
+ <option name="USE_RELATIVE_INDENTS" value="false" />
+ </indentOptions>
+ </codeStyleSettings>
+</code_scheme>
diff --git a/gradle.properties b/gradle.properties
new file mode 100644
index 0000000..7e073d9
--- /dev/null
+++ b/gradle.properties
@@ -0,0 +1,26 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+version = '1.0-SNAPSHOT'
+group = 'org.apache.geode'
+
+
+# 'apply from:' location for gradle scripts, relative to the project root. Specified here so that
+# it may be overridden by external projects or custom develop environment configurations
+scriptDir = gradle
+
+productName = Apache Geode Kafka Connector
+productOrg = Apache Software Foundation (ASF)
+
diff --git a/gradle/rat.gradle b/gradle/rat.gradle
new file mode 100644
index 0000000..75ccac3
--- /dev/null
+++ b/gradle/rat.gradle
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+apply plugin: "org.nosphere.apache.rat"
+
+rat {
+ inputDir = rootDir
+ excludes = [
+ // git
+ '.git/**',
+ '**/.gitignore',
+ '**/.gitkeep',
+
+ // gradle
+ '**/.gradle/**',
+ 'gradlew',
+ 'gradlew.bat',
+ 'gradle/wrapper/gradle-wrapper.properties',
+ 'caches/**',
+ 'daemon/**',
+ 'native/**',
+ 'wrapper/**',
+ '**/build/**',
+ '**/build-*/**',
+ '.buildinfo',
+ // IDE
+ 'etc/eclipse-java-google-style.xml',
+ 'etc/intellij-java-modified-google-style.xml',
+ 'etc/eclipseOrganizeImports.importorder',
+ '**/.project',
+ '**/.classpath',
+ '**/.settings/**',
+ '**/build-eclipse/**',
+ '**/*.iml',
+ '**/*.ipr',
+ '**/*.iws',
+ '.idea/**',
+ '**/tags',
+ '**/out/**',
+
+ // text files
+ '**/*.fig',
+ '**/*.txt',
+ '**/*.md',
+ '**/*.json',
+ '**/*.tx0',
+ '**/*.txo',
+ '**/*.log',
+ '**/*.patch',
+ '**/*.diff',
+ '**/*.rej',
+ '**/*.orig',
+ '**/*.MF',
+
+ // binary files
+ '**/*.cer',
+ '**/*.dia',
+ '**/*.gfs',
+ '**/*.gif',
+ '**/*.ico',
+ '**/*.jpg',
+ '**/*.keystore',
+ '**/*.pdf',
+ '**/*.png',
+ '**/*.ser',
+ '**/*.svg',
+ '**/*.truststore',
+ '**/*.xls',
+ '**/publickeyfile',
+ '**/*.dat',
+
+ // other text files
+ '**/log4j*.xml',
+ '.java-version', // file created by `jenv local`
+ '**/META-INF/**',
+ ]
+}
+
+check.dependsOn rat
\ No newline at end of file
diff --git a/gradle/spotless.gradle b/gradle/spotless.gradle
new file mode 100644
index 0000000..70d3c95
--- /dev/null
+++ b/gradle/spotless.gradle
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+// When a custom step changes, we need to bump the value passed to the method
+// bumpThisNumberIfACustomStepChanges
+// This has been historically easy to forget, however, and can cause failures in some rare cases.
+// To safeguard against this, we instead use the (partial) md5 of this file as that method input.
+def thisFile = file("${rootDir}/${scriptDir}/spotless.gradle")
+def thisFileMd5 = thisFile.text.md5() as String
+def thisFileMd5Piece = thisFileMd5.substring(0, 8)
+def thisFileIntegerHash = Integer.parseUnsignedInt(thisFileMd5Piece, 16)
+logger.debug("Using partial md5 (${thisFileIntegerHash}) of file ${thisFile} as spotless bump.")
+
+project.ext.set("spotless-file-hash", thisFileIntegerHash)
+
+
+apply plugin: "com.diffplug.gradle.spotless"
+spotless {
+ lineEndings = 'unix'
+ java {
+ target project.fileTree(project.projectDir) {
+ include '**/*.java'
+ exclude '**/generated-src/**'
+ exclude '**/build/**'
+ }
+
+ // As the method name suggests, bump this number if any of the below "custom" rules change.
+ // Spotless will not run on unchanged files unless this number changes.
+ bumpThisNumberIfACustomStepChanges(project.ext.'spotless-file-hash')
+
+ removeUnusedImports()
+
+ custom 'Remove commented-out import statements', {
+ it.replaceAll(/\n\/\/ import .*?;.*/, '')
+ }
+ custom 'Refuse wildcard imports', {
+ // Wildcard imports can't be resolved by spotless itself.
+ // This will require the developer themselves to adhere to best practices.
+ if (it =~ /\nimport .*\*;/) {
+ throw new AssertionError("Do not use wildcard imports. 'spotlessApply' cannot resolve this issue.")
+ }
+ }
+// custom 'Refuse Awaitility import', {
+// if (it =~ /import\s+(static\s+)?org.awaitility.Awaitility.*/) {
+// throw new AssertionError("Do not use Awaitility.await(). Use GeodeAwaitility.await() instead. 'spotlessApply' cannot resolve this issue.")
+// }
+// }
+ importOrderFile "${rootDir}/${scriptDir}/../etc/eclipseOrganizeImports.importorder"
+
+ custom 'Remove unhelpful javadoc stubs', {
+ // e.g., remove the following lines:
+ // "* @param paramName"
+ // "* @throws ExceptionType"
+ // "* @return returnType"'
+ // Multiline to allow anchors on newlines
+ it.replaceAll(/(?m)^ *\* *@(?:param|throws|return) *\w* *\n/, '')
+ }
+ custom 'Remove any empty Javadocs and block comments', {
+ // Matches any /** [...] */ or /* [...] */ that contains:
+ // (a) only whitespace
+ // (b) trivial information, such as "@param paramName" or @throws ExceptionType
+ // without any additional information. This information is implicit in the signature.
+ it.replaceAll(/\/\*+\s*\n(\s*\*\s*\n)*\s*\*+\/\s*\n/, '')
+ }
+
+ // Enforce style modifier order
+ custom 'Modifier ordering', {
+ def modifierRanking = [
+ "public" : 1,
+ "protected" : 2,
+ "private" : 3,
+ "abstract" : 4,
+ "default" : 5,
+ "static" : 6,
+ "final" : 7,
+ "transient" : 8,
+ "volatile" : 9,
+ "synchronized": 10,
+ "native" : 11,
+ "strictfp" : 12]
+ // Find any instance of multiple modifiers. Lead with a non-word character to avoid
+ // accidental matching against for instance, "an alternative default value"
+ it.replaceAll(/\W(?:public |protected |private |abstract |default |static |final |transient |volatile |synchronized |native |strictfp ){2,}/, {
+ // Do not replace the leading non-word character. Identify the modifiers
+ it.replaceAll(/(?:public |protected |private |abstract |default |static |final |transient |volatile |synchronized |native |strictfp ){2,}/, {
+ // Sort the modifiers according to the ranking above
+ it.split().sort({ modifierRanking[it] }).join(' ') + ' '
+ }
+ )
+ }
+ )
+ }
+
+
+ // Notes on eclipse formatter version:
+ // 4.6.3 is consistent with existing / previous behavior.
+ // 4.7.1 works, but had different default whitespace rules, notably with mid-ternary linebreak.
+ // 4.7.2 exists but is currently incompatible with our style file, raising NPEs.
+
+ // The format file is relative to geode-core and not the root project as the root project would change
+ // if Geode and submodules are included as part of a different gradle project.
+ eclipse('4.6.3').configFile "${rootDir}/${scriptDir}/../etc/eclipse-java-google-style.xml"
+ trimTrailingWhitespace()
+ endWithNewline()
+ }
+
+
+ groovyGradle {
+ target project.fileTree(project.projectDir) {
+ include '**/*.gradle'
+ exclude '**/generated-src/**'
+ exclude '**/build/**'
+ }
+
+ // As the method name suggests, bump this number if any of the below "custom" rules change.
+ // Spotless will not run on unchanged files unless this number changes.
+ bumpThisNumberIfACustomStepChanges(project.ext.'spotless-file-hash')
+
+ custom 'Use single-quote in project directives.', {
+ it.replaceAll(/project\(":([^"]*)"\)/, 'project(\':$1\')')
+ }
+
+ custom 'Use parenthesis in single-line gradle dependency declarations.', {
+ it.replaceAll(/\n(\s*\S*(?:[cC]ompile|[rR]untime)(?:Only)?) (?!\()([^{\n]*)\n/, { original, declaration, dep ->
+ "\n${declaration}(${dep})\n"
+ })
+ }
+
+ custom 'Do not pad spaces before parenthesis in gradle dependency declaration.', {
+ it.replaceAll(/\n(\s*\S*(?:[cC]ompile|[rR]untime)(?:Only)?) +\(/, '\n$1(')
+ }
+
+ indentWithSpaces(2)
+ }
+}
+
+// If we add more languages to Spotless, add them to 'compileXYZ' trigger below
+afterEvaluate {
+ // Not all projects are java projects. findByName could return null, so use the null-safe ?. operator
+ project.tasks.findByName('compileJava')?.mustRunAfter(spotlessCheck)
+ project.tasks.findByName('compileJava')?.mustRunAfter(spotlessApply)
+
+ // Within the configure block, 'project' refers to the task-owning project, in this case rootProject
+ def thisProjectScoped = project
+ rootProject.tasks.named('devBuild').configure {
+ dependsOn thisProjectScoped.tasks.named('spotlessApply')
+ }
+}
diff --git a/settings.gradle b/settings.gradle
index 48eed3b..546b8ec 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -1 +1,15 @@
+//Licensed to the Apache Software Foundation (ASF) under one or more
+//contributor license agreements. See the NOTICE file distributed with
+//this work for additional information regarding copyright ownership.
+//The ASF licenses this file to You under the Apache License, Version 2.0
+//(the "License"); you may not use this file except in compliance with
+//the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+//Unless required by applicable law or agreed to in writing, software
+//distributed under the License is distributed on an "AS IS" BASIS,
+//WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//See the License for the specific language governing permissions and
+//limitations under the License.
rootProject.name = 'geode-kafka-connector'
\ No newline at end of file
diff --git a/src/main/java/geode/kafka/GeodeConnectorConfig.java b/src/main/java/geode/kafka/GeodeConnectorConfig.java
index efffdcd..7702765 100644
--- a/src/main/java/geode/kafka/GeodeConnectorConfig.java
+++ b/src/main/java/geode/kafka/GeodeConnectorConfig.java
@@ -23,105 +23,106 @@
public class GeodeConnectorConfig {
- //GeodeKafka Specific Configuration
- /**
- * Identifier for each task
- */
- public static final String TASK_ID = "GEODE_TASK_ID"; //One config per task
- /**
- * Specifies which Locators to connect to Apache Geode
- */
- public static final String LOCATORS = "locators";
- public static final String DEFAULT_LOCATOR = "localhost[10334]";
- public static final String SECURITY_CLIENT_AUTH_INIT = "security-client-auth-init";
+ // GeodeKafka Specific Configuration
+ /**
+ * Identifier for each task
+ */
+ public static final String TASK_ID = "GEODE_TASK_ID"; // One config per task
+ /**
+ * Specifies which Locators to connect to Apache Geode
+ */
+ public static final String LOCATORS = "locators";
+ public static final String DEFAULT_LOCATOR = "localhost[10334]";
+ public static final String SECURITY_CLIENT_AUTH_INIT = "security-client-auth-init";
- protected final int taskId;
- protected List<LocatorHostPort> locatorHostPorts;
- private String securityClientAuthInit;
+ protected final int taskId;
+ protected List<LocatorHostPort> locatorHostPorts;
+ private String securityClientAuthInit;
- protected GeodeConnectorConfig() {
- taskId = 0;
+ protected GeodeConnectorConfig() {
+ taskId = 0;
+ }
+
+ public GeodeConnectorConfig(Map<String, String> connectorProperties) {
+ taskId = Integer.parseInt(connectorProperties.get(TASK_ID));
+ locatorHostPorts = parseLocators(connectorProperties.get(GeodeConnectorConfig.LOCATORS));
+ securityClientAuthInit = connectorProperties.get(SECURITY_CLIENT_AUTH_INIT);
+ }
+
+
+ public static Map<String, List<String>> parseTopicToRegions(String combinedBindings) {
+ // It's the same formatting, so parsing is the same going topic to region or region to topic
+ return parseRegionToTopics(combinedBindings);
+ }
+
+ /**
+ * Given a string of the form [region:topic,...] will produce a map where the key is the
+ * regionName and the value is a list of topicNames to push values to
+ *
+ * @param combinedBindings a string with similar form to "[region:topic,...], [region2:topic2,...]
+ * @return mapping of regionName to list of topics to update
+ */
+ public static Map<String, List<String>> parseRegionToTopics(String combinedBindings) {
+ if (combinedBindings == null || combinedBindings.equals("")) {
+ return new HashMap();
}
+ List<String> bindings = parseBindings(combinedBindings);
+ return bindings.stream().map(binding -> {
+ String[] regionToTopicsArray = parseBinding(binding);
+ return regionToTopicsArray;
+ }).collect(Collectors.toMap(regionToTopicsArray -> regionToTopicsArray[0],
+ regionToTopicsArray -> parseStringByComma(regionToTopicsArray[1])));
+ }
- public GeodeConnectorConfig(Map<String, String> connectorProperties) {
- taskId = Integer.parseInt(connectorProperties.get(TASK_ID));
- locatorHostPorts = parseLocators(connectorProperties.get(GeodeConnectorConfig.LOCATORS));
- securityClientAuthInit = connectorProperties.get(SECURITY_CLIENT_AUTH_INIT);
- }
+ public static List<String> parseBindings(String bindings) {
+ return Arrays.stream(bindings.split("](\\s)*,")).map((s) -> {
+ s = s.replaceAll("\\[", "");
+ s = s.replaceAll("\\]", "");
+ s = s.trim();
+ return s;
+ }).collect(Collectors.toList());
+ }
+ private static String[] parseBinding(String binding) {
+ return binding.split(":");
+ }
- public static Map<String, List<String>> parseTopicToRegions(String combinedBindings) {
- //It's the same formatting, so parsing is the same going topic to region or region to topic
- return parseRegionToTopics(combinedBindings);
- }
+ // Used to parse a string of topics or regions
+ public static List<String> parseStringByComma(String string) {
+ return parseStringBy(string, ",");
+ }
- /**
- * Given a string of the form [region:topic,...] will produce a map where the key is the
- * regionName and the value is a list of topicNames to push values to
- *
- * @param combinedBindings a string with similar form to "[region:topic,...], [region2:topic2,...]
- * @return mapping of regionName to list of topics to update
- */
- public static Map<String, List<String>> parseRegionToTopics(String combinedBindings) {
- if (combinedBindings == null || combinedBindings.equals("")){
- return new HashMap();
- }
- List<String> bindings = parseBindings(combinedBindings);
- return bindings.stream().map(binding -> {
- String[] regionToTopicsArray = parseBinding(binding);
- return regionToTopicsArray;
- }).collect(Collectors.toMap(regionToTopicsArray -> regionToTopicsArray[0], regionToTopicsArray -> parseStringByComma(regionToTopicsArray[1])));
- }
+ public static List<String> parseStringBy(String string, String regex) {
+ return Arrays.stream(string.split(regex)).map((s) -> s.trim()).collect(Collectors.toList());
+ }
- public static List<String> parseBindings(String bindings) {
- return Arrays.stream(bindings.split("](\\s)*,")).map((s) -> {
- s = s.replaceAll("\\[", "");
- s = s.replaceAll("\\]", "");
- s = s.trim();
- return s;
- }).collect(Collectors.toList());
- }
+ public static String reconstructString(Collection<String> strings) {
+ return strings.stream().collect(Collectors.joining("],["));
+ }
- private static String[] parseBinding(String binding) {
- return binding.split(":");
- }
+ List<LocatorHostPort> parseLocators(String locators) {
+ return Arrays.stream(locators.split(",")).map((s) -> {
+ String locatorString = s.trim();
+ return parseLocator(locatorString);
+ }).collect(Collectors.toList());
+ }
- //Used to parse a string of topics or regions
- public static List<String> parseStringByComma(String string) {
- return parseStringBy(string, ",");
- }
+ private LocatorHostPort parseLocator(String locatorString) {
+ String[] splits = locatorString.split("\\[");
+ String locator = splits[0];
+ int port = Integer.parseInt(splits[1].replace("]", ""));
+ return new LocatorHostPort(locator, port);
+ }
- public static List<String> parseStringBy(String string, String regex) {
- return Arrays.stream(string.split(regex)).map((s) -> s.trim()).collect(Collectors.toList());
- }
+ public int getTaskId() {
+ return taskId;
+ }
- public static String reconstructString(Collection<String> strings) {
- return strings.stream().collect(Collectors.joining("],["));
- }
+ public List<LocatorHostPort> getLocatorHostPorts() {
+ return locatorHostPorts;
+ }
- List<LocatorHostPort> parseLocators(String locators) {
- return Arrays.stream(locators.split(",")).map((s) -> {
- String locatorString = s.trim();
- return parseLocator(locatorString);
- }).collect(Collectors.toList());
- }
-
- private LocatorHostPort parseLocator(String locatorString) {
- String[] splits = locatorString.split("\\[");
- String locator = splits[0];
- int port = Integer.parseInt(splits[1].replace("]", ""));
- return new LocatorHostPort(locator, port);
- }
-
- public int getTaskId() {
- return taskId;
- }
-
- public List<LocatorHostPort> getLocatorHostPorts() {
- return locatorHostPorts;
- }
-
- public String getSecurityClientAuthInit() {
- return securityClientAuthInit;
- }
+ public String getSecurityClientAuthInit() {
+ return securityClientAuthInit;
+ }
}
diff --git a/src/main/java/geode/kafka/GeodeContext.java b/src/main/java/geode/kafka/GeodeContext.java
index ff8a8c3..ba5f9e5 100644
--- a/src/main/java/geode/kafka/GeodeContext.java
+++ b/src/main/java/geode/kafka/GeodeContext.java
@@ -14,6 +14,13 @@
*/
package geode.kafka;
+import static geode.kafka.GeodeConnectorConfig.SECURITY_CLIENT_AUTH_INIT;
+
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.kafka.connect.errors.ConnectException;
+
import org.apache.geode.cache.client.ClientCache;
import org.apache.geode.cache.client.ClientCacheFactory;
import org.apache.geode.cache.query.CqAttributes;
@@ -21,78 +28,70 @@
import org.apache.geode.cache.query.CqExistsException;
import org.apache.geode.cache.query.CqQuery;
import org.apache.geode.cache.query.RegionNotFoundException;
-import org.apache.kafka.connect.errors.ConnectException;
-
-import java.util.Collection;
-import java.util.List;
-
-import static geode.kafka.GeodeConnectorConfig.SECURITY_CLIENT_AUTH_INIT;
public class GeodeContext {
- private ClientCache clientCache;
+ private ClientCache clientCache;
- public GeodeContext() {
+ public GeodeContext() {}
+
+ public ClientCache connectClient(List<LocatorHostPort> locatorHostPortList,
+ String durableClientId, String durableClientTimeout, String securityAuthInit) {
+ clientCache = createClientCache(locatorHostPortList, durableClientId, durableClientTimeout,
+ securityAuthInit);
+ return clientCache;
+ }
+
+ public ClientCache connectClient(List<LocatorHostPort> locatorHostPortList,
+ String securityAuthInit) {
+ clientCache = createClientCache(locatorHostPortList, "", "", securityAuthInit);
+ return clientCache;
+ }
+
+ public ClientCache getClientCache() {
+ return clientCache;
+ }
+
+ public ClientCache createClientCache(List<LocatorHostPort> locators, String durableClientName,
+ String durableClientTimeOut, String securityAuthInit) {
+ ClientCacheFactory ccf = new ClientCacheFactory();
+
+ if (securityAuthInit != null) {
+ ccf.set(SECURITY_CLIENT_AUTH_INIT, securityAuthInit);
}
-
- public ClientCache connectClient(List<LocatorHostPort> locatorHostPortList, String durableClientId, String durableClientTimeout, String securityAuthInit) {
- clientCache = createClientCache(locatorHostPortList, durableClientId, durableClientTimeout, securityAuthInit);
- return clientCache;
+ if (!durableClientName.equals("")) {
+ ccf.set("durable-client-id", durableClientName)
+ .set("durable-client-timeout", durableClientTimeOut);
}
+ // currently we only allow using the default pool.
+ // If we ever want to allow adding multiple pools we'll have to configure pool factories
+ ccf.setPoolSubscriptionEnabled(true);
- public ClientCache connectClient(List<LocatorHostPort> locatorHostPortList, String securityAuthInit) {
- clientCache = createClientCache(locatorHostPortList, "", "", securityAuthInit);
- return clientCache;
+ for (LocatorHostPort locator : locators) {
+ ccf.addPoolLocator(locator.getHostName(), locator.getPort());
}
+ return ccf.create();
+ }
- public ClientCache getClientCache() {
- return clientCache;
+ public CqQuery newCq(String name, String query, CqAttributes cqAttributes, boolean isDurable)
+ throws ConnectException {
+ try {
+ CqQuery cq = clientCache.getQueryService().newCq(name, query, cqAttributes, isDurable);
+ cq.execute();
+ return cq;
+ } catch (RegionNotFoundException | CqException | CqExistsException e) {
+ throw new ConnectException(e);
}
+ }
- /**
- *
- * @param locators
- * @param durableClientName
- * @param durableClientTimeOut
- * @return
- */
- public ClientCache createClientCache(List<LocatorHostPort> locators, String durableClientName, String durableClientTimeOut, String securityAuthInit) {
- ClientCacheFactory ccf = new ClientCacheFactory();
-
- if (securityAuthInit != null) {
- ccf.set(SECURITY_CLIENT_AUTH_INIT, securityAuthInit);
- }
- if (!durableClientName.equals("")) {
- ccf.set("durable-client-id", durableClientName)
- .set("durable-client-timeout", durableClientTimeOut);
- }
- //currently we only allow using the default pool.
- //If we ever want to allow adding multiple pools we'll have to configure pool factories
- ccf.setPoolSubscriptionEnabled(true);
-
- for (LocatorHostPort locator: locators) {
- ccf.addPoolLocator(locator.getHostName(), locator.getPort());
- }
- return ccf.create();
+ public Collection newCqWithInitialResults(String name, String query, CqAttributes cqAttributes,
+ boolean isDurable) throws ConnectException {
+ try {
+ CqQuery cq = clientCache.getQueryService().newCq(name, query, cqAttributes, isDurable);
+ return cq.executeWithInitialResults();
+ } catch (RegionNotFoundException | CqException | CqExistsException e) {
+ throw new ConnectException(e);
}
-
- public CqQuery newCq(String name, String query, CqAttributes cqAttributes, boolean isDurable) throws ConnectException {
- try {
- CqQuery cq = clientCache.getQueryService().newCq(name, query, cqAttributes, isDurable);
- cq.execute();
- return cq;
- } catch (RegionNotFoundException | CqException | CqExistsException e) {
- throw new ConnectException(e);
- }
- }
-
- public Collection newCqWithInitialResults(String name, String query, CqAttributes cqAttributes, boolean isDurable) throws ConnectException {
- try {
- CqQuery cq = clientCache.getQueryService().newCq(name, query, cqAttributes, isDurable);
- return cq.executeWithInitialResults();
- } catch (RegionNotFoundException | CqException | CqExistsException e) {
- throw new ConnectException(e);
- }
- }
+ }
}
diff --git a/src/main/java/geode/kafka/LocatorHostPort.java b/src/main/java/geode/kafka/LocatorHostPort.java
index 21d0603..191d42d 100644
--- a/src/main/java/geode/kafka/LocatorHostPort.java
+++ b/src/main/java/geode/kafka/LocatorHostPort.java
@@ -16,22 +16,23 @@
public class LocatorHostPort {
- private String hostName;
- private int port;
+ private String hostName;
+ private int port;
- public LocatorHostPort(String hostName, int port) {
- this.hostName = hostName;
- this.port = port;
- }
+ public LocatorHostPort(String hostName, int port) {
+ this.hostName = hostName;
+ this.port = port;
+ }
- public String getHostName() {
- return hostName;
- }
+ public String getHostName() {
+ return hostName;
+ }
- public int getPort() {
- return port;
- }
- public String toString() {
- return hostName + "[" + port + "]";
- }
+ public int getPort() {
+ return port;
+ }
+
+ public String toString() {
+ return hostName + "[" + port + "]";
+ }
}
diff --git a/src/main/java/geode/kafka/sink/BatchRecords.java b/src/main/java/geode/kafka/sink/BatchRecords.java
index c10faaf..0221cbe 100644
--- a/src/main/java/geode/kafka/sink/BatchRecords.java
+++ b/src/main/java/geode/kafka/sink/BatchRecords.java
@@ -14,69 +14,67 @@
*/
package geode.kafka.sink;
-import geode.kafka.source.GeodeKafkaSourceTask;
-import org.apache.geode.cache.Region;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
import org.apache.kafka.connect.sink.SinkRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import org.apache.geode.cache.Region;
/**
* A collection of records to put/remove from a region
*/
public class BatchRecords {
- private static final Logger logger = LoggerFactory.getLogger(BatchRecords.class);
+ private static final Logger logger = LoggerFactory.getLogger(BatchRecords.class);
- private Map updateMap;
- private Collection removeList;
+ private Map updateMap;
+ private Collection removeList;
- public BatchRecords() {
- this(new HashMap(), new ArrayList());
+ public BatchRecords() {
+ this(new HashMap(), new ArrayList());
+ }
+
+ /** Used for tests **/
+ public BatchRecords(Map updateMap, Collection removeList) {
+ this.updateMap = updateMap;
+ this.removeList = removeList;
+ }
+
+ public void addRemoveOperation(SinkRecord record) {
+ // if a previous operation added to the update map
+ // let's just remove it so we don't do a put and then a remove
+ // depending on the order of operations (putAll then removeAll or removeAll or putAll)...
+ // ...we could remove one of the if statements.
+ if (updateMap.containsKey(record.key())) {
+ updateMap.remove(record.key());
+ } else {
+ removeList.add(record.key());
}
+ }
- /** Used for tests**/
- public BatchRecords(Map updateMap, Collection removeList) {
- this.updateMap = updateMap;
- this.removeList = removeList;
+ public void addUpdateOperation(SinkRecord record, boolean nullValuesMeansRemove) {
+ // it's assumed the records in are order
+ // if so if a previous value was in the remove list
+ // let's not remove it at the end of this operation
+ if (nullValuesMeansRemove) {
+ if (removeList.contains(record.key())) {
+ removeList.remove(record.key());
+ }
}
+ updateMap.put(record.key(), record.value());
+ }
- public void addRemoveOperation(SinkRecord record) {
- //if a previous operation added to the update map
- //let's just remove it so we don't do a put and then a remove
- //depending on the order of operations (putAll then removeAll or removeAll or putAll)...
- //...we could remove one of the if statements.
- if (updateMap.containsKey(record.key())) {
- updateMap.remove(record.key());
- } else {
- removeList.add(record.key());
- }
+
+ public void executeOperations(Region region) {
+ if (region != null) {
+ region.putAll(updateMap);
+ region.removeAll(removeList);
+ } else {
+ logger.info("Unable to locate proxy region: " + region);
}
-
- public void addUpdateOperation(SinkRecord record, boolean nullValuesMeansRemove) {
- //it's assumed the records in are order
- //if so if a previous value was in the remove list
- // let's not remove it at the end of this operation
- if (nullValuesMeansRemove) {
- if (removeList.contains(record.key())) {
- removeList.remove(record.key());
- }
- }
- updateMap.put(record.key(), record.value());
- }
-
-
- public void executeOperations(Region region) {
- if (region != null) {
- region.putAll(updateMap);
- region.removeAll(removeList);
- }
- else {
- logger.info("Unable to locate proxy region: " + region);
- }
- }
+ }
}
diff --git a/src/main/java/geode/kafka/sink/GeodeKafkaSink.java b/src/main/java/geode/kafka/sink/GeodeKafkaSink.java
index 1ccb385..550e8a9 100644
--- a/src/main/java/geode/kafka/sink/GeodeKafkaSink.java
+++ b/src/main/java/geode/kafka/sink/GeodeKafkaSink.java
@@ -14,72 +14,71 @@
*/
package geode.kafka.sink;
-import geode.kafka.GeodeConnectorConfig;
-import org.apache.kafka.common.config.ConfigDef;
-import org.apache.kafka.connect.connector.Task;
-import org.apache.kafka.connect.sink.SinkConnector;
-import org.apache.kafka.connect.util.ConnectorUtils;
+import static geode.kafka.GeodeConnectorConfig.DEFAULT_LOCATOR;
+import static geode.kafka.GeodeConnectorConfig.LOCATORS;
+import static geode.kafka.GeodeSinkConnectorConfig.DEFAULT_NULL_VALUES_MEAN_REMOVE;
+import static geode.kafka.GeodeSinkConnectorConfig.NULL_VALUES_MEAN_REMOVE;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import static geode.kafka.GeodeConnectorConfig.DEFAULT_LOCATOR;
-import static geode.kafka.GeodeConnectorConfig.LOCATORS;
-import static geode.kafka.GeodeSinkConnectorConfig.DEFAULT_NULL_VALUES_MEAN_REMOVE;
-import static geode.kafka.GeodeSinkConnectorConfig.NULL_VALUES_MEAN_REMOVE;
-import static geode.kafka.GeodeSinkConnectorConfig.TOPIC_TO_REGION_BINDINGS;
+import geode.kafka.GeodeConnectorConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.sink.SinkConnector;
-public class GeodeKafkaSink extends SinkConnector {
- private static final ConfigDef CONFIG_DEF = new ConfigDef();
- private Map<String, String> sharedProps;
+public class GeodeKafkaSink extends SinkConnector {
+ private static final ConfigDef CONFIG_DEF = new ConfigDef();
+ private Map<String, String> sharedProps;
- @Override
- public void start(Map<String, String> props) {
- sharedProps = computeMissingConfigurations(props);
+ @Override
+ public void start(Map<String, String> props) {
+ sharedProps = computeMissingConfigurations(props);
+ }
+
+ @Override
+ public Class<? extends Task> taskClass() {
+ return GeodeKafkaSinkTask.class;
+ }
+
+ @Override
+ public List<Map<String, String>> taskConfigs(int maxTasks) {
+ List<Map<String, String>> taskConfigs = new ArrayList<>();
+
+ // All tasks will build up the topic to regions map. A few might not use certain keys but we
+ // have no control over partitioning in kafka and which tasks will fire
+ for (int i = 0; i < maxTasks; i++) {
+ Map<String, String> taskProps = new HashMap<>();
+ taskProps.putAll(sharedProps);
+ taskProps.put(GeodeConnectorConfig.TASK_ID, "" + i);
+ taskConfigs.add(taskProps);
}
- @Override
- public Class<? extends Task> taskClass() {
- return GeodeKafkaSinkTask.class;
- }
+ return taskConfigs;
+ }
- @Override
- public List<Map<String, String>> taskConfigs(int maxTasks) {
- List<Map<String, String>> taskConfigs = new ArrayList<>();
+ @Override
+ public void stop() {
- //All tasks will build up the topic to regions map. A few might not use certain keys but we have no control over partitioning in kafka and which tasks will fire
- for (int i = 0; i < maxTasks; i++) {
- Map<String, String> taskProps = new HashMap<>();
- taskProps.putAll(sharedProps);
- taskProps.put(GeodeConnectorConfig.TASK_ID, "" + i);
- taskConfigs.add(taskProps);
- }
+ }
- return taskConfigs;
- }
+ @Override
+ public ConfigDef config() {
+ return CONFIG_DEF;
+ }
- @Override
- public void stop() {
-
- }
-
- @Override
- public ConfigDef config() {
- return CONFIG_DEF;
- }
-
- @Override
- public String version() {
- //TODO
- return "unknown";
- }
+ @Override
+ public String version() {
+ // TODO
+ return "unknown";
+ }
- private Map<String, String> computeMissingConfigurations(Map<String, String> props) {
- props.computeIfAbsent(LOCATORS, (key)-> DEFAULT_LOCATOR);
- props.computeIfAbsent(NULL_VALUES_MEAN_REMOVE, (key) -> DEFAULT_NULL_VALUES_MEAN_REMOVE);
- return props;
- }
+ private Map<String, String> computeMissingConfigurations(Map<String, String> props) {
+ props.computeIfAbsent(LOCATORS, (key) -> DEFAULT_LOCATOR);
+ props.computeIfAbsent(NULL_VALUES_MEAN_REMOVE, (key) -> DEFAULT_NULL_VALUES_MEAN_REMOVE);
+ return props;
+ }
}
diff --git a/src/main/java/geode/kafka/sink/GeodeKafkaSinkTask.java b/src/main/java/geode/kafka/sink/GeodeKafkaSinkTask.java
index bcc5885..3552b2d 100644
--- a/src/main/java/geode/kafka/sink/GeodeKafkaSinkTask.java
+++ b/src/main/java/geode/kafka/sink/GeodeKafkaSinkTask.java
@@ -14,22 +14,23 @@
*/
package geode.kafka.sink;
-import geode.kafka.GeodeContext;
-import geode.kafka.GeodeSinkConnectorConfig;
-import org.apache.geode.cache.Region;
-import org.apache.geode.cache.RegionExistsException;
-import org.apache.geode.cache.client.ClientRegionShortcut;
-import org.apache.kafka.connect.sink.SinkRecord;
-import org.apache.kafka.connect.sink.SinkTask;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
+import geode.kafka.GeodeContext;
+import geode.kafka.GeodeSinkConnectorConfig;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.sink.SinkTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionExistsException;
+import org.apache.geode.cache.client.ClientRegionShortcut;
+
/**
* TODO javaDoc
@@ -37,109 +38,113 @@
*/
public class GeodeKafkaSinkTask extends SinkTask {
- private static final Logger logger = LoggerFactory.getLogger(GeodeKafkaSinkTask.class);
+ private static final Logger logger = LoggerFactory.getLogger(GeodeKafkaSinkTask.class);
- private GeodeContext geodeContext;
- private int taskId;
- private Map<String, List<String>> topicToRegions;
- private Map<String, Region> regionNameToRegion;
- private boolean nullValuesMeansRemove = true;
+ private GeodeContext geodeContext;
+ private int taskId;
+ private Map<String, List<String>> topicToRegions;
+ private Map<String, Region> regionNameToRegion;
+ private boolean nullValuesMeansRemove = true;
- /**
- * {@inheritDoc}
- */
- @Override
- public String version() {
- //TODO
- return "unknown";
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public String version() {
+ // TODO
+ return "unknown";
+ }
+
+ @Override
+ public void start(Map<String, String> props) {
+ try {
+ GeodeSinkConnectorConfig geodeConnectorConfig = new GeodeSinkConnectorConfig(props);
+ configure(geodeConnectorConfig);
+ geodeContext = new GeodeContext();
+ geodeContext.connectClient(geodeConnectorConfig.getLocatorHostPorts(),
+ geodeConnectorConfig.getSecurityClientAuthInit());
+ regionNameToRegion = createProxyRegions(topicToRegions.values());
+ } catch (Exception e) {
+ logger.error("Unable to start sink task", e);
+ throw e;
}
+ }
- @Override
- public void start(Map<String, String> props) {
- try {
- GeodeSinkConnectorConfig geodeConnectorConfig = new GeodeSinkConnectorConfig(props);
- configure(geodeConnectorConfig);
- geodeContext = new GeodeContext();
- geodeContext.connectClient(geodeConnectorConfig.getLocatorHostPorts(), geodeConnectorConfig.getSecurityClientAuthInit());
- regionNameToRegion = createProxyRegions(topicToRegions.values());
- } catch (Exception e) {
- logger.error("Unable to start sink task", e);
- throw e;
- }
+ void configure(GeodeSinkConnectorConfig geodeConnectorConfig) {
+ logger.debug("GeodeKafkaSourceTask id:" + geodeConnectorConfig.getTaskId() + " starting");
+ taskId = geodeConnectorConfig.getTaskId();
+ topicToRegions = geodeConnectorConfig.getTopicToRegions();
+ nullValuesMeansRemove = geodeConnectorConfig.getNullValuesMeanRemove();
+ }
+
+ // For tests only
+ void setRegionNameToRegion(Map<String, Region> regionNameToRegion) {
+ this.regionNameToRegion = regionNameToRegion;
+ }
+
+ @Override
+ public void put(Collection<SinkRecord> records) {
+ put(records, new HashMap());
+ }
+
+ void put(Collection<SinkRecord> records, Map<String, BatchRecords> batchRecordsMap) {
+ // spin off a new thread to handle this operation? Downside is ordering and retries...
+ for (SinkRecord record : records) {
+ updateBatchForRegionByTopic(record, batchRecordsMap);
}
+ batchRecordsMap.entrySet().stream().forEach((entry) -> {
+ String region = entry.getKey();
+ BatchRecords batchRecords = entry.getValue();
+ batchRecords.executeOperations(regionNameToRegion.get(region));
+ });
+ }
- void configure(GeodeSinkConnectorConfig geodeConnectorConfig) {
- logger.debug("GeodeKafkaSourceTask id:" + geodeConnectorConfig.getTaskId() + " starting");
- taskId = geodeConnectorConfig.getTaskId();
- topicToRegions = geodeConnectorConfig.getTopicToRegions();
- nullValuesMeansRemove = geodeConnectorConfig.getNullValuesMeanRemove();
+ private void updateBatchForRegionByTopic(SinkRecord sinkRecord,
+ Map<String, BatchRecords> batchRecordsMap) {
+ Collection<String> regionsToUpdate = topicToRegions.get(sinkRecord.topic());
+ for (String region : regionsToUpdate) {
+ updateBatchRecordsForRecord(sinkRecord, batchRecordsMap, region);
}
+ }
- //For tests only
- void setRegionNameToRegion(Map<String, Region> regionNameToRegion) {
- this.regionNameToRegion = regionNameToRegion;
+ private void updateBatchRecordsForRecord(SinkRecord record,
+ Map<String, BatchRecords> batchRecordsMap, String region) {
+ BatchRecords batchRecords = batchRecordsMap.get(region);
+ if (batchRecords == null) {
+ batchRecords = new BatchRecords();
+ batchRecordsMap.put(region, batchRecords);
}
-
- @Override
- public void put(Collection<SinkRecord> records) {
- put(records, new HashMap());
+ if (record.key() != null) {
+ if (record.value() == null && nullValuesMeansRemove) {
+ batchRecords.addRemoveOperation(record);
+ } else {
+ batchRecords.addUpdateOperation(record, nullValuesMeansRemove);
+ }
+ } else {
+ // Invest in configurable auto key generator?
+ logger.warn("Unable to push to Geode, missing key in record : " + record.value());
}
+ }
- void put(Collection<SinkRecord> records, Map<String, BatchRecords> batchRecordsMap) {
- //spin off a new thread to handle this operation? Downside is ordering and retries...
- for (SinkRecord record : records) {
- updateBatchForRegionByTopic(record, batchRecordsMap);
- }
- batchRecordsMap.entrySet().stream().forEach((entry) -> {
- String region = entry.getKey();
- BatchRecords batchRecords = entry.getValue();
- batchRecords.executeOperations(regionNameToRegion.get(region));
- });
+ private Map<String, Region> createProxyRegions(Collection<List<String>> regionNames) {
+ List<String> flat = regionNames.stream().flatMap(List::stream).collect(Collectors.toList());
+ return flat.stream().map(regionName -> createProxyRegion(regionName))
+ .collect(Collectors.toMap(region -> region.getName(), region -> region));
+ }
+
+ private Region createProxyRegion(String regionName) {
+ try {
+ return geodeContext.getClientCache().createClientRegionFactory(ClientRegionShortcut.PROXY)
+ .create(regionName);
+ } catch (RegionExistsException e) {
+ // Each task is a seperate parallel task controlled by kafka.
+ return geodeContext.getClientCache().getRegion(regionName);
}
+ }
- private void updateBatchForRegionByTopic(SinkRecord sinkRecord, Map<String, BatchRecords> batchRecordsMap) {
- Collection<String> regionsToUpdate = topicToRegions.get(sinkRecord.topic());
- for (String region : regionsToUpdate) {
- updateBatchRecordsForRecord(sinkRecord, batchRecordsMap, region);
- }
- }
+ @Override
+ public void stop() {
+ geodeContext.getClientCache().close(false);
+ }
- private void updateBatchRecordsForRecord(SinkRecord record, Map<String, BatchRecords> batchRecordsMap, String region) {
- BatchRecords batchRecords = batchRecordsMap.get(region);
- if (batchRecords == null) {
- batchRecords = new BatchRecords();
- batchRecordsMap.put(region, batchRecords);
- }
- if (record.key() != null) {
- if (record.value() == null && nullValuesMeansRemove) {
- batchRecords.addRemoveOperation(record);
- } else {
- batchRecords.addUpdateOperation(record, nullValuesMeansRemove);
- }
- } else {
- //Invest in configurable auto key generator?
- logger.warn("Unable to push to Geode, missing key in record : " + record.value());
- }
- }
-
- private Map<String, Region> createProxyRegions(Collection<List<String>> regionNames) {
- List<String> flat = regionNames.stream().flatMap(List::stream).collect(Collectors.toList());
- return flat.stream().map(regionName -> createProxyRegion(regionName)).collect(Collectors.toMap(region->region.getName(), region -> region));
- }
-
- private Region createProxyRegion(String regionName) {
- try {
- return geodeContext.getClientCache().createClientRegionFactory(ClientRegionShortcut.PROXY).create(regionName);
- }
- catch (RegionExistsException e) {
- //Each task is a seperate parallel task controlled by kafka.
- return geodeContext.getClientCache().getRegion(regionName);
- }
- }
-
- @Override
- public void stop() {
- geodeContext.getClientCache().close(false);
- }
-
-}
\ No newline at end of file
+}
diff --git a/src/main/java/geode/kafka/sink/GeodeSinkConnectorConfig.java b/src/main/java/geode/kafka/sink/GeodeSinkConnectorConfig.java
index 59a57cd..e416a9f 100644
--- a/src/main/java/geode/kafka/sink/GeodeSinkConnectorConfig.java
+++ b/src/main/java/geode/kafka/sink/GeodeSinkConnectorConfig.java
@@ -18,26 +18,26 @@
import java.util.Map;
public class GeodeSinkConnectorConfig extends GeodeConnectorConfig {
- //Used by sink
- public static final String TOPIC_TO_REGION_BINDINGS = "topicToRegions";
- public static final String NULL_VALUES_MEAN_REMOVE = "nullValuesMeanRemove";
- public static final String DEFAULT_NULL_VALUES_MEAN_REMOVE = "true";
+ // Used by sink
+ public static final String TOPIC_TO_REGION_BINDINGS = "topicToRegions";
+ public static final String NULL_VALUES_MEAN_REMOVE = "nullValuesMeanRemove";
+ public static final String DEFAULT_NULL_VALUES_MEAN_REMOVE = "true";
- private Map<String, List<String>> topicToRegions;
- private final boolean nullValuesMeanRemove;
+ private Map<String, List<String>> topicToRegions;
+ private final boolean nullValuesMeanRemove;
- public GeodeSinkConnectorConfig(Map<String, String> connectorProperties) {
- super(connectorProperties);
- topicToRegions = parseTopicToRegions(connectorProperties.get(TOPIC_TO_REGION_BINDINGS));
- nullValuesMeanRemove = Boolean.parseBoolean(connectorProperties.get(NULL_VALUES_MEAN_REMOVE));
- }
+ public GeodeSinkConnectorConfig(Map<String, String> connectorProperties) {
+ super(connectorProperties);
+ topicToRegions = parseTopicToRegions(connectorProperties.get(TOPIC_TO_REGION_BINDINGS));
+ nullValuesMeanRemove = Boolean.parseBoolean(connectorProperties.get(NULL_VALUES_MEAN_REMOVE));
+ }
- public Map<String, List<String>> getTopicToRegions() {
- return topicToRegions;
- }
+ public Map<String, List<String>> getTopicToRegions() {
+ return topicToRegions;
+ }
- public boolean getNullValuesMeanRemove() {
- return nullValuesMeanRemove;
- }
+ public boolean getNullValuesMeanRemove() {
+ return nullValuesMeanRemove;
+ }
}
diff --git a/src/main/java/geode/kafka/source/EventBufferSupplier.java b/src/main/java/geode/kafka/source/EventBufferSupplier.java
index d844744..8b05e22 100644
--- a/src/main/java/geode/kafka/source/EventBufferSupplier.java
+++ b/src/main/java/geode/kafka/source/EventBufferSupplier.java
@@ -1,3 +1,17 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
package geode.kafka.source;
import java.util.concurrent.BlockingQueue;
diff --git a/src/main/java/geode/kafka/source/GeodeEvent.java b/src/main/java/geode/kafka/source/GeodeEvent.java
index 3de8abd..f569f2b 100644
--- a/src/main/java/geode/kafka/source/GeodeEvent.java
+++ b/src/main/java/geode/kafka/source/GeodeEvent.java
@@ -21,19 +21,19 @@
*/
public class GeodeEvent {
- private String regionName;
- private CqEvent event;
+ private String regionName;
+ private CqEvent event;
- public GeodeEvent(String regionName, CqEvent event) {
- this.regionName = regionName;
- this.event = event;
- }
+ public GeodeEvent(String regionName, CqEvent event) {
+ this.regionName = regionName;
+ this.event = event;
+ }
- public String getRegionName() {
- return regionName;
- }
+ public String getRegionName() {
+ return regionName;
+ }
- public CqEvent getEvent() {
- return event;
- }
+ public CqEvent getEvent() {
+ return event;
+ }
}
diff --git a/src/main/java/geode/kafka/source/GeodeKafkaSource.java b/src/main/java/geode/kafka/source/GeodeKafkaSource.java
index b81401a..f150e07 100644
--- a/src/main/java/geode/kafka/source/GeodeKafkaSource.java
+++ b/src/main/java/geode/kafka/source/GeodeKafkaSource.java
@@ -14,18 +14,6 @@
*/
package geode.kafka.source;
-import geode.kafka.GeodeConnectorConfig;
-import org.apache.kafka.common.config.ConfigDef;
-import org.apache.kafka.common.utils.AppInfoParser;
-import org.apache.kafka.connect.connector.Task;
-import org.apache.kafka.connect.source.SourceConnector;
-import org.apache.kafka.connect.util.ConnectorUtils;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
import static geode.kafka.GeodeConnectorConfig.DEFAULT_LOCATOR;
import static geode.kafka.GeodeConnectorConfig.LOCATORS;
import static geode.kafka.source.GeodeSourceConnectorConfig.BATCH_SIZE;
@@ -43,11 +31,23 @@
import static geode.kafka.source.GeodeSourceConnectorConfig.QUEUE_SIZE;
import static geode.kafka.source.GeodeSourceConnectorConfig.REGION_TO_TOPIC_BINDINGS;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import geode.kafka.GeodeConnectorConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.source.SourceConnector;
+import org.apache.kafka.connect.util.ConnectorUtils;
+
public class GeodeKafkaSource extends SourceConnector {
private Map<String, String> sharedProps;
- //TODO maybe club this into GeodeConnnectorConfig
+ // TODO maybe club this into GeodeConnnectorConfig
private static final ConfigDef CONFIG_DEF = new ConfigDef();
@@ -59,14 +59,16 @@
@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
List<Map<String, String>> taskConfigs = new ArrayList<>();
- List<String> bindings = GeodeConnectorConfig.parseStringByComma(sharedProps.get(REGION_TO_TOPIC_BINDINGS));
+ List<String> bindings =
+ GeodeConnectorConfig.parseStringByComma(sharedProps.get(REGION_TO_TOPIC_BINDINGS));
List<List<String>> bindingsPerTask = ConnectorUtils.groupPartitions(bindings, maxTasks);
for (int i = 0; i < maxTasks; i++) {
Map<String, String> taskProps = new HashMap<>();
taskProps.putAll(sharedProps);
taskProps.put(GeodeConnectorConfig.TASK_ID, "" + i);
- taskProps.put(CQS_TO_REGISTER, GeodeConnectorConfig.reconstructString(bindingsPerTask.get(i)));
+ taskProps.put(CQS_TO_REGISTER,
+ GeodeConnectorConfig.reconstructString(bindingsPerTask.get(i)));
taskConfigs.add(taskProps);
}
return taskConfigs;
@@ -84,7 +86,7 @@
}
private Map<String, String> computeMissingConfigurations(Map<String, String> props) {
- props.computeIfAbsent(LOCATORS, (key)-> DEFAULT_LOCATOR);
+ props.computeIfAbsent(LOCATORS, (key) -> DEFAULT_LOCATOR);
props.computeIfAbsent(DURABLE_CLIENT_TIME_OUT, (key) -> DEFAULT_DURABLE_CLIENT_TIMEOUT);
props.computeIfAbsent(DURABLE_CLIENT_ID_PREFIX, (key) -> DEFAULT_DURABLE_CLIENT_ID);
props.computeIfAbsent(BATCH_SIZE, (key) -> DEFAULT_BATCH_SIZE);
@@ -101,7 +103,7 @@
@Override
public String version() {
- //TODO
+ // TODO
return AppInfoParser.getVersion();
}
diff --git a/src/main/java/geode/kafka/source/GeodeKafkaSourceListener.java b/src/main/java/geode/kafka/source/GeodeKafkaSourceListener.java
index 5c8a152..b0b8c6a 100644
--- a/src/main/java/geode/kafka/source/GeodeKafkaSourceListener.java
+++ b/src/main/java/geode/kafka/source/GeodeKafkaSourceListener.java
@@ -14,64 +14,66 @@
*/
package geode.kafka.source;
-import org.apache.geode.cache.query.CqEvent;
-import org.apache.geode.cache.query.CqStatusListener;
+import java.util.concurrent.TimeUnit;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.concurrent.TimeUnit;
+import org.apache.geode.cache.query.CqEvent;
+import org.apache.geode.cache.query.CqStatusListener;
class GeodeKafkaSourceListener implements CqStatusListener {
- private static final Logger logger = LoggerFactory.getLogger(GeodeKafkaSourceListener.class);
+ private static final Logger logger = LoggerFactory.getLogger(GeodeKafkaSourceListener.class);
- public String regionName;
- private EventBufferSupplier eventBufferSupplier;
- private boolean initialResultsLoaded;
+ public String regionName;
+ private EventBufferSupplier eventBufferSupplier;
+ private boolean initialResultsLoaded;
- public GeodeKafkaSourceListener(EventBufferSupplier eventBufferSupplier, String regionName) {
- this.regionName = regionName;
- this.eventBufferSupplier = eventBufferSupplier;
- initialResultsLoaded = false;
+ public GeodeKafkaSourceListener(EventBufferSupplier eventBufferSupplier, String regionName) {
+ this.regionName = regionName;
+ this.eventBufferSupplier = eventBufferSupplier;
+ initialResultsLoaded = false;
+ }
+
+ @Override
+ public void onEvent(CqEvent aCqEvent) {
+ while (!initialResultsLoaded) {
+ Thread.yield();
}
+ try {
+ eventBufferSupplier.get().offer(new GeodeEvent(regionName, aCqEvent), 2, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
- @Override
- public void onEvent(CqEvent aCqEvent) {
- while (!initialResultsLoaded) {
- Thread.yield();
- }
+ while (true) {
try {
- eventBufferSupplier.get().offer(new GeodeEvent(regionName, aCqEvent), 2, TimeUnit.SECONDS);
- } catch (InterruptedException e) {
-
- while (true) {
- try {
- if (!eventBufferSupplier.get().offer(new GeodeEvent(regionName, aCqEvent), 2, TimeUnit.SECONDS))
- break;
- } catch (InterruptedException ex) {
- ex.printStackTrace();
- }
- logger.info("GeodeKafkaSource Queue is full");
- }
+ if (!eventBufferSupplier.get().offer(new GeodeEvent(regionName, aCqEvent), 2,
+ TimeUnit.SECONDS))
+ break;
+ } catch (InterruptedException ex) {
+ ex.printStackTrace();
}
+ logger.info("GeodeKafkaSource Queue is full");
+ }
}
+ }
- @Override
- public void onError(CqEvent aCqEvent) {
+ @Override
+ public void onError(CqEvent aCqEvent) {
- }
+ }
- @Override
- public void onCqDisconnected() {
- //we should probably redistribute or reconnect
- }
+ @Override
+ public void onCqDisconnected() {
+ // we should probably redistribute or reconnect
+ }
- @Override
- public void onCqConnected() {
+ @Override
+ public void onCqConnected() {
- }
+ }
- public void signalInitialResultsLoaded() {
- initialResultsLoaded = true;
- }
+ public void signalInitialResultsLoaded() {
+ initialResultsLoaded = true;
+ }
}
diff --git a/src/main/java/geode/kafka/source/GeodeKafkaSourceTask.java b/src/main/java/geode/kafka/source/GeodeKafkaSourceTask.java
index 6efeb85..da2119a 100644
--- a/src/main/java/geode/kafka/source/GeodeKafkaSourceTask.java
+++ b/src/main/java/geode/kafka/source/GeodeKafkaSourceTask.java
@@ -14,151 +14,163 @@
*/
package geode.kafka.source;
-import geode.kafka.GeodeContext;
-import org.apache.geode.cache.query.CqAttributes;
-import org.apache.geode.cache.query.CqAttributesFactory;
-import org.apache.geode.cache.query.CqEvent;
-import org.apache.kafka.connect.source.SourceRecord;
-import org.apache.kafka.connect.source.SourceTask;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import static geode.kafka.source.GeodeSourceConnectorConfig.BATCH_SIZE;
+import static geode.kafka.source.GeodeSourceConnectorConfig.QUEUE_SIZE;
+import static geode.kafka.source.GeodeSourceConnectorConfig.REGION_PARTITION;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.BlockingQueue;
import java.util.stream.Collectors;
-import static geode.kafka.source.GeodeSourceConnectorConfig.BATCH_SIZE;
-import static geode.kafka.source.GeodeSourceConnectorConfig.QUEUE_SIZE;
-import static geode.kafka.source.GeodeSourceConnectorConfig.REGION_PARTITION;
+import geode.kafka.GeodeContext;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.kafka.connect.source.SourceTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.geode.cache.query.CqAttributes;
+import org.apache.geode.cache.query.CqAttributesFactory;
+import org.apache.geode.cache.query.CqEvent;
public class GeodeKafkaSourceTask extends SourceTask {
- private static final Logger logger = LoggerFactory.getLogger(GeodeKafkaSourceTask.class);
+ private static final Logger logger = LoggerFactory.getLogger(GeodeKafkaSourceTask.class);
- private static final String TASK_PREFIX = "TASK";
- private static final String DOT = ".";
+ private static final String TASK_PREFIX = "TASK";
+ private static final String DOT = ".";
- //property string to pass in to identify task
- private static final Map<String, Long> OFFSET_DEFAULT = createOffset();
+ // property string to pass in to identify task
+ private static final Map<String, Long> OFFSET_DEFAULT = createOffset();
- private GeodeContext geodeContext;
- private GeodeSourceConnectorConfig geodeConnectorConfig;
- private EventBufferSupplier eventBufferSupplier;
- private Map<String, List<String>> regionToTopics;
- private Map<String, Map<String, String>> sourcePartitions;
- private int batchSize;
+ private GeodeContext geodeContext;
+ private GeodeSourceConnectorConfig geodeConnectorConfig;
+ private EventBufferSupplier eventBufferSupplier;
+ private Map<String, List<String>> regionToTopics;
+ private Map<String, Map<String, String>> sourcePartitions;
+ private int batchSize;
- private static Map<String, Long> createOffset() {
- Map<String, Long> offset = new HashMap<>();
- offset.put("OFFSET", 0L);
- return offset;
+ private static Map<String, Long> createOffset() {
+ Map<String, Long> offset = new HashMap<>();
+ offset.put("OFFSET", 0L);
+ return offset;
+ }
+
+ @Override
+ public String version() {
+ return null;
+ }
+
+ @Override
+ public void start(Map<String, String> props) {
+ try {
+ geodeConnectorConfig = new GeodeSourceConnectorConfig(props);
+ int taskId = geodeConnectorConfig.getTaskId();
+ logger.debug("GeodeKafkaSourceTask id:" + geodeConnectorConfig.getTaskId() + " starting");
+ geodeContext = new GeodeContext();
+ geodeContext.connectClient(geodeConnectorConfig.getLocatorHostPorts(),
+ geodeConnectorConfig.getDurableClientId(), geodeConnectorConfig.getDurableClientTimeout(),
+ geodeConnectorConfig.getSecurityClientAuthInit());
+
+ batchSize = Integer.parseInt(props.get(BATCH_SIZE));
+ eventBufferSupplier = new SharedEventBufferSupplier(Integer.parseInt(props.get(QUEUE_SIZE)));
+
+ regionToTopics = geodeConnectorConfig.getRegionToTopics();
+ geodeConnectorConfig.getCqsToRegister();
+ sourcePartitions = createSourcePartitionsMap(regionToTopics.keySet());
+
+ String cqPrefix = geodeConnectorConfig.getCqPrefix();
+ boolean loadEntireRegion = geodeConnectorConfig.getLoadEntireRegion();
+ installOnGeode(geodeConnectorConfig, geodeContext, eventBufferSupplier, cqPrefix,
+ loadEntireRegion);
+ } catch (Exception e) {
+ e.printStackTrace();
+ logger.error("Unable to start source task", e);
+ throw e;
}
+ }
- @Override
- public String version() {
- return null;
- }
-
- @Override
- public void start(Map<String, String> props) {
- try {
- geodeConnectorConfig = new GeodeSourceConnectorConfig(props);
- int taskId = geodeConnectorConfig.getTaskId();
- logger.debug("GeodeKafkaSourceTask id:" + geodeConnectorConfig.getTaskId() + " starting");
- geodeContext = new GeodeContext();
- geodeContext.connectClient(geodeConnectorConfig.getLocatorHostPorts(), geodeConnectorConfig.getDurableClientId(), geodeConnectorConfig.getDurableClientTimeout(), geodeConnectorConfig.getSecurityClientAuthInit());
-
- batchSize = Integer.parseInt(props.get(BATCH_SIZE));
- eventBufferSupplier = new SharedEventBufferSupplier(Integer.parseInt(props.get(QUEUE_SIZE)));
-
- regionToTopics = geodeConnectorConfig.getRegionToTopics();
- geodeConnectorConfig.getCqsToRegister();
- sourcePartitions = createSourcePartitionsMap(regionToTopics.keySet());
-
- String cqPrefix = geodeConnectorConfig.getCqPrefix();
- boolean loadEntireRegion = geodeConnectorConfig.getLoadEntireRegion();
- installOnGeode(geodeConnectorConfig, geodeContext, eventBufferSupplier, cqPrefix, loadEntireRegion);
- } catch (Exception e) {
- e.printStackTrace();
- logger.error("Unable to start source task", e);
- throw e;
+ @Override
+ public List<SourceRecord> poll() throws InterruptedException {
+ ArrayList<SourceRecord> records = new ArrayList<>(batchSize);
+ ArrayList<GeodeEvent> events = new ArrayList<>(batchSize);
+ if (eventBufferSupplier.get().drainTo(events, batchSize) > 0) {
+ for (GeodeEvent event : events) {
+ String regionName = event.getRegionName();
+ List<String> topics = regionToTopics.get(regionName);
+ for (String topic : topics) {
+ records.add(new SourceRecord(sourcePartitions.get(regionName), OFFSET_DEFAULT, topic,
+ null, event.getEvent().getKey(), null, event.getEvent().getNewValue()));
}
+ }
+ return records;
}
- @Override
- public List<SourceRecord> poll() throws InterruptedException {
- ArrayList<SourceRecord> records = new ArrayList<>(batchSize);
- ArrayList<GeodeEvent> events = new ArrayList<>(batchSize);
- if (eventBufferSupplier.get().drainTo(events, batchSize) > 0) {
- for (GeodeEvent event : events) {
- String regionName = event.getRegionName();
- List<String> topics = regionToTopics.get(regionName);
- for (String topic : topics) {
- records.add(new SourceRecord(sourcePartitions.get(regionName), OFFSET_DEFAULT, topic, null, event.getEvent().getKey(), null, event.getEvent().getNewValue()));
- }
- }
- return records;
- }
+ return null;
+ }
- return null;
- }
+ @Override
+ public void stop() {
+ geodeContext.getClientCache().close(true);
+ }
- @Override
- public void stop() {
- geodeContext.getClientCache().close(true);
+ void installOnGeode(GeodeSourceConnectorConfig geodeConnectorConfig, GeodeContext geodeContext,
+ EventBufferSupplier eventBuffer, String cqPrefix, boolean loadEntireRegion) {
+ boolean isDurable = geodeConnectorConfig.isDurable();
+ int taskId = geodeConnectorConfig.getTaskId();
+ for (String region : geodeConnectorConfig.getCqsToRegister()) {
+ installListenersToRegion(geodeContext, taskId, eventBuffer, region, cqPrefix,
+ loadEntireRegion, isDurable);
}
+ if (isDurable) {
+ geodeContext.getClientCache().readyForEvents();
+ }
+ }
- void installOnGeode(GeodeSourceConnectorConfig geodeConnectorConfig, GeodeContext geodeContext, EventBufferSupplier eventBuffer, String cqPrefix, boolean loadEntireRegion) {
- boolean isDurable = geodeConnectorConfig.isDurable();
- int taskId = geodeConnectorConfig.getTaskId();
- for (String region : geodeConnectorConfig.getCqsToRegister()) {
- installListenersToRegion(geodeContext, taskId, eventBuffer, region, cqPrefix, loadEntireRegion, isDurable);
- }
- if (isDurable) {
- geodeContext.getClientCache().readyForEvents();
- }
+ GeodeKafkaSourceListener installListenersToRegion(GeodeContext geodeContext, int taskId,
+ EventBufferSupplier eventBuffer, String regionName, String cqPrefix, boolean loadEntireRegion,
+ boolean isDurable) {
+ CqAttributesFactory cqAttributesFactory = new CqAttributesFactory();
+ GeodeKafkaSourceListener listener = new GeodeKafkaSourceListener(eventBuffer, regionName);
+ cqAttributesFactory.addCqListener(listener);
+ CqAttributes cqAttributes = cqAttributesFactory.create();
+ try {
+ if (loadEntireRegion) {
+ Collection<CqEvent> events =
+ geodeContext.newCqWithInitialResults(generateCqName(taskId, cqPrefix, regionName),
+ "select * from /" + regionName, cqAttributes,
+ isDurable);
+ eventBuffer.get().addAll(
+ events.stream().map(e -> new GeodeEvent(regionName, e)).collect(Collectors.toList()));
+ } else {
+ geodeContext.newCq(generateCqName(taskId, cqPrefix, regionName),
+ "select * from /" + regionName, cqAttributes,
+ isDurable);
+ }
+ } finally {
+ listener.signalInitialResultsLoaded();
}
+ return listener;
+ }
- GeodeKafkaSourceListener installListenersToRegion(GeodeContext geodeContext, int taskId, EventBufferSupplier eventBuffer, String regionName, String cqPrefix, boolean loadEntireRegion, boolean isDurable) {
- CqAttributesFactory cqAttributesFactory = new CqAttributesFactory();
- GeodeKafkaSourceListener listener = new GeodeKafkaSourceListener(eventBuffer, regionName);
- cqAttributesFactory.addCqListener(listener);
- CqAttributes cqAttributes = cqAttributesFactory.create();
- try {
- if (loadEntireRegion) {
- Collection<CqEvent> events = geodeContext.newCqWithInitialResults(generateCqName(taskId, cqPrefix, regionName), "select * from /" + regionName, cqAttributes,
- isDurable);
- eventBuffer.get().addAll(events.stream().map(e -> new GeodeEvent(regionName, e)).collect(Collectors.toList()));
- } else {
- geodeContext.newCq(generateCqName(taskId, cqPrefix, regionName), "select * from /" + regionName, cqAttributes,
- isDurable);
- }
- } finally {
- listener.signalInitialResultsLoaded();
- }
- return listener;
- }
+ /**
+ * converts a list of regions names into a map of source partitions
+ *
+ * @param regionNames list of regionNames
+ * @return Map<String, Map < String, String>> a map of source partitions, keyed by region name
+ */
+ Map<String, Map<String, String>> createSourcePartitionsMap(Collection<String> regionNames) {
+ return regionNames.stream().map(regionName -> {
+ Map<String, String> sourcePartition = new HashMap<>();
+ sourcePartition.put(REGION_PARTITION, regionName);
+ return sourcePartition;
+ }).collect(Collectors.toMap(s -> s.get(REGION_PARTITION), s -> s));
+ }
- /**
- * converts a list of regions names into a map of source partitions
- *
- * @param regionNames list of regionNames
- * @return Map<String, Map < String, String>> a map of source partitions, keyed by region name
- */
- Map<String, Map<String, String>> createSourcePartitionsMap(Collection<String> regionNames) {
- return regionNames.stream().map(regionName -> {
- Map<String, String> sourcePartition = new HashMap<>();
- sourcePartition.put(REGION_PARTITION, regionName);
- return sourcePartition;
- }).collect(Collectors.toMap(s -> s.get(REGION_PARTITION), s -> s));
- }
-
- String generateCqName(int taskId, String cqPrefix, String regionName) {
- return cqPrefix + DOT + TASK_PREFIX + taskId + DOT + regionName;
- }
+ String generateCqName(int taskId, String cqPrefix, String regionName) {
+ return cqPrefix + DOT + TASK_PREFIX + taskId + DOT + regionName;
+ }
}
diff --git a/src/main/java/geode/kafka/source/GeodeSourceConnectorConfig.java b/src/main/java/geode/kafka/source/GeodeSourceConnectorConfig.java
index 0b46d2b..4f0393d 100644
--- a/src/main/java/geode/kafka/source/GeodeSourceConnectorConfig.java
+++ b/src/main/java/geode/kafka/source/GeodeSourceConnectorConfig.java
@@ -14,101 +14,101 @@
*/
package geode.kafka.source;
-import geode.kafka.GeodeConnectorConfig;
-
import java.util.Collection;
import java.util.List;
import java.util.Map;
+import geode.kafka.GeodeConnectorConfig;
+
public class GeodeSourceConnectorConfig extends GeodeConnectorConfig {
- //Geode Configuration
- public static final String DURABLE_CLIENT_ID_PREFIX = "durableClientIdPrefix";
- public static final String DEFAULT_DURABLE_CLIENT_ID = "";
- public static final String DURABLE_CLIENT_TIME_OUT = "durableClientTimeout";
- public static final String DEFAULT_DURABLE_CLIENT_TIMEOUT = "60000";
+ // Geode Configuration
+ public static final String DURABLE_CLIENT_ID_PREFIX = "durableClientIdPrefix";
+ public static final String DEFAULT_DURABLE_CLIENT_ID = "";
+ public static final String DURABLE_CLIENT_TIME_OUT = "durableClientTimeout";
+ public static final String DEFAULT_DURABLE_CLIENT_TIMEOUT = "60000";
- public static final String CQ_PREFIX = "cqPrefix";
- public static final String DEFAULT_CQ_PREFIX = "cqForGeodeKafka";
+ public static final String CQ_PREFIX = "cqPrefix";
+ public static final String DEFAULT_CQ_PREFIX = "cqForGeodeKafka";
- /**
- * Used as a key for source partitions
- */
- public static final String REGION_PARTITION = "regionPartition";
- public static final String REGION_TO_TOPIC_BINDINGS = "regionToTopics";
- public static final String CQS_TO_REGISTER = "cqsToRegister";
+ /**
+ * Used as a key for source partitions
+ */
+ public static final String REGION_PARTITION = "regionPartition";
+ public static final String REGION_TO_TOPIC_BINDINGS = "regionToTopics";
+ public static final String CQS_TO_REGISTER = "cqsToRegister";
- public static final String BATCH_SIZE = "geodeConnectorBatchSize";
- public static final String DEFAULT_BATCH_SIZE = "100";
+ public static final String BATCH_SIZE = "geodeConnectorBatchSize";
+ public static final String DEFAULT_BATCH_SIZE = "100";
- public static final String QUEUE_SIZE = "geodeConnectorQueueSize";
- public static final String DEFAULT_QUEUE_SIZE = "10000";
+ public static final String QUEUE_SIZE = "geodeConnectorQueueSize";
+ public static final String DEFAULT_QUEUE_SIZE = "10000";
- public static final String LOAD_ENTIRE_REGION = "loadEntireRegion";
- public static final String DEFAULT_LOAD_ENTIRE_REGION = "false";
+ public static final String LOAD_ENTIRE_REGION = "loadEntireRegion";
+ public static final String DEFAULT_LOAD_ENTIRE_REGION = "false";
- private final String durableClientId;
- private final String durableClientIdPrefix;
- private final String durableClientTimeout;
- private final String cqPrefix;
- private final boolean loadEntireRegion;
+ private final String durableClientId;
+ private final String durableClientIdPrefix;
+ private final String durableClientTimeout;
+ private final String cqPrefix;
+ private final boolean loadEntireRegion;
- private Map<String, List<String>> regionToTopics;
- private Collection<String> cqsToRegister;
+ private Map<String, List<String>> regionToTopics;
+ private Collection<String> cqsToRegister;
- public GeodeSourceConnectorConfig(Map<String, String> connectorProperties) {
- super(connectorProperties);
- cqsToRegister = parseRegionToTopics(connectorProperties.get(CQS_TO_REGISTER)).keySet();
- regionToTopics = parseRegionToTopics(connectorProperties.get(REGION_TO_TOPIC_BINDINGS));
- durableClientIdPrefix = connectorProperties.get(DURABLE_CLIENT_ID_PREFIX);
- if (isDurable(durableClientIdPrefix)) {
- durableClientId = durableClientIdPrefix + taskId;
- } else {
- durableClientId = "";
- }
- durableClientTimeout = connectorProperties.get(DURABLE_CLIENT_TIME_OUT);
- cqPrefix = connectorProperties.get(CQ_PREFIX);
- loadEntireRegion = Boolean.parseBoolean(connectorProperties.get(LOAD_ENTIRE_REGION));
+ public GeodeSourceConnectorConfig(Map<String, String> connectorProperties) {
+ super(connectorProperties);
+ cqsToRegister = parseRegionToTopics(connectorProperties.get(CQS_TO_REGISTER)).keySet();
+ regionToTopics = parseRegionToTopics(connectorProperties.get(REGION_TO_TOPIC_BINDINGS));
+ durableClientIdPrefix = connectorProperties.get(DURABLE_CLIENT_ID_PREFIX);
+ if (isDurable(durableClientIdPrefix)) {
+ durableClientId = durableClientIdPrefix + taskId;
+ } else {
+ durableClientId = "";
}
+ durableClientTimeout = connectorProperties.get(DURABLE_CLIENT_TIME_OUT);
+ cqPrefix = connectorProperties.get(CQ_PREFIX);
+ loadEntireRegion = Boolean.parseBoolean(connectorProperties.get(LOAD_ENTIRE_REGION));
+ }
- public boolean isDurable() {
- return isDurable(durableClientId);
- }
+ public boolean isDurable() {
+ return isDurable(durableClientId);
+ }
- /**
- * @param durableClientId or prefix can be passed in. Either both will be "" or both will have a value
- * @return
- */
- boolean isDurable(String durableClientId) {
- return !durableClientId.equals("");
- }
+ /**
+ * @param durableClientId or prefix can be passed in. Either both will be "" or both will have a
+ * value
+ */
+ boolean isDurable(String durableClientId) {
+ return !durableClientId.equals("");
+ }
- public int getTaskId() {
- return taskId;
- }
+ public int getTaskId() {
+ return taskId;
+ }
- public String getDurableClientId() {
- return durableClientId;
- }
+ public String getDurableClientId() {
+ return durableClientId;
+ }
- public String getDurableClientTimeout() {
- return durableClientTimeout;
- }
+ public String getDurableClientTimeout() {
+ return durableClientTimeout;
+ }
- public String getCqPrefix() {
- return cqPrefix;
- }
+ public String getCqPrefix() {
+ return cqPrefix;
+ }
- public boolean getLoadEntireRegion() {
- return loadEntireRegion;
- }
+ public boolean getLoadEntireRegion() {
+ return loadEntireRegion;
+ }
- public Map<String, List<String>> getRegionToTopics() {
- return regionToTopics;
- }
+ public Map<String, List<String>> getRegionToTopics() {
+ return regionToTopics;
+ }
- public Collection<String> getCqsToRegister() {
- return cqsToRegister;
- }
+ public Collection<String> getCqsToRegister() {
+ return cqsToRegister;
+ }
}
diff --git a/src/main/java/geode/kafka/source/SharedEventBufferSupplier.java b/src/main/java/geode/kafka/source/SharedEventBufferSupplier.java
index 2a4c883..6ac6bb6 100644
--- a/src/main/java/geode/kafka/source/SharedEventBufferSupplier.java
+++ b/src/main/java/geode/kafka/source/SharedEventBufferSupplier.java
@@ -1,37 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
package geode.kafka.source;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
-import java.util.function.Supplier;
public class SharedEventBufferSupplier implements EventBufferSupplier {
- private static BlockingQueue<GeodeEvent> eventBuffer;
+ private static BlockingQueue<GeodeEvent> eventBuffer;
- public SharedEventBufferSupplier(int size) {
- recreateEventBufferIfNeeded(size);
- }
+ public SharedEventBufferSupplier(int size) {
+ recreateEventBufferIfNeeded(size);
+ }
- BlockingQueue recreateEventBufferIfNeeded(int size) {
+ BlockingQueue recreateEventBufferIfNeeded(int size) {
+ if (eventBuffer == null || (eventBuffer.size() + eventBuffer.remainingCapacity()) != size) {
+ synchronized (GeodeKafkaSource.class) {
if (eventBuffer == null || (eventBuffer.size() + eventBuffer.remainingCapacity()) != size) {
- synchronized (GeodeKafkaSource.class) {
- if (eventBuffer == null || (eventBuffer.size() + eventBuffer.remainingCapacity()) != size) {
- BlockingQueue<GeodeEvent> oldEventBuffer = eventBuffer;
- eventBuffer = new LinkedBlockingQueue<>(size);
- if (oldEventBuffer != null) {
- eventBuffer.addAll(oldEventBuffer);
- }
- }
- }
+ BlockingQueue<GeodeEvent> oldEventBuffer = eventBuffer;
+ eventBuffer = new LinkedBlockingQueue<>(size);
+ if (oldEventBuffer != null) {
+ eventBuffer.addAll(oldEventBuffer);
+ }
}
- return eventBuffer;
+ }
}
+ return eventBuffer;
+ }
- /**
- * Callers should not store a reference to this and instead always call get to make sure we always use the latest buffer
- * Buffers themselves shouldn't change often but in cases where we want to modify the size
- */
- public BlockingQueue<GeodeEvent> get() {
- return eventBuffer;
- }
+ /**
+ * Callers should not store a reference to this and instead always call get to make sure we always
+ * use the latest buffer
+ * Buffers themselves shouldn't change often but in cases where we want to modify the size
+ */
+ public BlockingQueue<GeodeEvent> get() {
+ return eventBuffer;
+ }
}
diff --git a/src/test/java/geode/kafka/GeodeConnectorConfigTest.java b/src/test/java/geode/kafka/GeodeConnectorConfigTest.java
index 84a3ec5..c6fe491 100644
--- a/src/test/java/geode/kafka/GeodeConnectorConfigTest.java
+++ b/src/test/java/geode/kafka/GeodeConnectorConfigTest.java
@@ -14,15 +14,6 @@
*/
package geode.kafka;
-import junitparams.JUnitParamsRunner;
-import junitparams.Parameters;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-
import static org.hamcrest.CoreMatchers.allOf;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertEquals;
@@ -30,122 +21,130 @@
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import junitparams.JUnitParamsRunner;
+import junitparams.Parameters;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
@RunWith(JUnitParamsRunner.class)
public class GeodeConnectorConfigTest {
- @Test
- public void parseRegionNamesShouldSplitOnComma() {
- GeodeConnectorConfig config = new GeodeConnectorConfig();
- List<String> regionNames = config.parseStringByComma("region1,region2,region3,region4");
- assertEquals(4, regionNames.size());
- assertThat(true, allOf(is(regionNames.contains("region1"))
- , is(regionNames.contains("region2"))
- , is(regionNames.contains("region3"))
- , is(regionNames.contains("region4"))));
+ @Test
+ public void parseRegionNamesShouldSplitOnComma() {
+ GeodeConnectorConfig config = new GeodeConnectorConfig();
+ List<String> regionNames = config.parseStringByComma("region1,region2,region3,region4");
+ assertEquals(4, regionNames.size());
+ assertThat(true, allOf(is(regionNames.contains("region1")), is(regionNames.contains("region2")),
+ is(regionNames.contains("region3")), is(regionNames.contains("region4"))));
+ }
+
+ @Test
+ public void parseRegionNamesShouldChomp() {
+ GeodeConnectorConfig config = new GeodeConnectorConfig();
+ List<String> regionNames = config.parseStringByComma("region1, region2, region3,region4");
+ assertEquals(4, regionNames.size());
+ assertThat(true,
+ allOf(is(regionNames instanceof List), is(regionNames.contains("region1")),
+ is(regionNames.contains("region2")), is(regionNames.contains("region3")),
+ is(regionNames.contains("region4"))));
+ }
+
+ @Test
+ public void shouldBeAbleToParseGeodeLocatorStrings() {
+ GeodeConnectorConfig config = new GeodeConnectorConfig();
+ String locatorString = "localhost[8888], localhost[8881]";
+ List<LocatorHostPort> locators = config.parseLocators(locatorString);
+ assertThat(2, is(locators.size()));
+ }
+
+ @Test
+ @Parameters(method = "oneToOneBindings")
+ public void parseBindingsCanSplitOneToOneBindings(String value) {
+ List<String> splitBindings = GeodeConnectorConfig.parseBindings(value);
+ assertEquals(2, splitBindings.size());
+ }
+
+ @Test
+ public void parseBindingsCanSplitASingleOneToOneBindings() {
+ String binding = "[region1:topic1]";
+ List<String> splitBindings = GeodeConnectorConfig.parseBindings(binding);
+ assertEquals(1, splitBindings.size());
+ assertEquals(binding.replaceAll("\\[", "").replaceAll("\\]", ""), splitBindings.get(0));
+ }
+
+ public List<String> oneToOneBindings() {
+ return Arrays.asList(
+ new String[] {"[region1:topic1],[region2:topic2]", "[region1:topic1] , [region2:topic2]",
+ "[region1:topic1], [region2:topic2] ,", "[region1: topic1], [region2 :topic2]"});
+ }
+
+ @Test
+ @Parameters(method = "oneToManyBindings")
+ public void parseBindingsCanSplitOneToManyBindings(String value) {
+ List<String> splitBindings = GeodeConnectorConfig.parseBindings(value);
+ assertEquals(Arrays.toString(splitBindings.toArray()), 2, splitBindings.size());
+ }
+
+ public List<String> oneToManyBindings() {
+ return Arrays.asList(new String[] {"[region1:topic1,topic2],[region2:topic2,topic3]",
+ "[region1:topic1 , topic2] , [region2:topic2 , topic3]",
+ "[region1:topic1 ,], [region2:topic2 ,] ,",
+ "[region1: topic1 ,topic3], [region2 :topic2]"});
+ }
+
+ @Test
+ @Parameters(method = "oneToManyBindings")
+ public void reconstructBindingsToStringShouldReformAParsableString(String value) {
+ List<String> splitBindings = GeodeConnectorConfig.parseBindings(value);
+ String reconstructString = GeodeConnectorConfig.reconstructString(splitBindings);
+ splitBindings = GeodeConnectorConfig.parseBindings(reconstructString);
+ assertEquals(Arrays.toString(splitBindings.toArray()), 2, splitBindings.size());
+ for (String topicOrRegion : splitBindings) {
+ assertFalse(topicOrRegion.contains("\\["));
+ assertFalse(topicOrRegion.contains("\\]"));
}
+ }
- @Test
- public void parseRegionNamesShouldChomp() {
- GeodeConnectorConfig config = new GeodeConnectorConfig();
- List<String> regionNames = config.parseStringByComma("region1, region2, region3,region4");
- assertEquals(4, regionNames.size());
- assertThat(true, allOf(is(regionNames instanceof List)
- , is(regionNames.contains("region1"))
- , is(regionNames.contains("region2"))
- , is(regionNames.contains("region3"))
- , is(regionNames.contains("region4"))));
- }
+ @Test
+ @Parameters(method = "oneToOneBindings")
+ public void configurationShouldReturnRegionToTopicsMappingWhenParseRegionToTopics(String value) {
+ Map<String, List<String>> regionToTopics = GeodeConnectorConfig.parseRegionToTopics(value);
+ assertEquals(2, regionToTopics.size());
+ assertTrue(regionToTopics.get("region1") != null);
+ assertEquals(1, regionToTopics.get("region1").size());
+ assertTrue(regionToTopics.get("region1").contains("topic1"));
+ }
- @Test
- public void shouldBeAbleToParseGeodeLocatorStrings() {
- GeodeConnectorConfig config = new GeodeConnectorConfig();
- String locatorString="localhost[8888], localhost[8881]";
- List<LocatorHostPort> locators = config.parseLocators(locatorString);
- assertThat(2, is(locators.size()));
- }
-
- @Test
- @Parameters(method="oneToOneBindings")
- public void parseBindingsCanSplitOneToOneBindings(String value) {
- List<String> splitBindings = GeodeConnectorConfig.parseBindings(value);
- assertEquals(2, splitBindings.size());
- }
-
- @Test
- public void parseBindingsCanSplitASingleOneToOneBindings() {
- String binding = "[region1:topic1]";
- List<String> splitBindings = GeodeConnectorConfig.parseBindings(binding);
- assertEquals(1, splitBindings.size());
- assertEquals(binding.replaceAll("\\[", "").replaceAll("\\]", ""), splitBindings.get(0));
- }
-
- public List<String> oneToOneBindings() {
- return Arrays.asList(new String[]{"[region1:topic1],[region2:topic2]"
- ,"[region1:topic1] , [region2:topic2]"
- ,"[region1:topic1], [region2:topic2] ,"
- ,"[region1: topic1], [region2 :topic2]"});
- }
-
- @Test
- @Parameters(method="oneToManyBindings")
- public void parseBindingsCanSplitOneToManyBindings(String value) {
- List<String> splitBindings = GeodeConnectorConfig.parseBindings(value);
- assertEquals(Arrays.toString(splitBindings.toArray()), 2, splitBindings.size());
- }
-
- public List<String> oneToManyBindings() {
- return Arrays.asList(new String[]{"[region1:topic1,topic2],[region2:topic2,topic3]"
- ,"[region1:topic1 , topic2] , [region2:topic2 , topic3]"
- ,"[region1:topic1 ,], [region2:topic2 ,] ,"
- ,"[region1: topic1 ,topic3], [region2 :topic2]"});
- }
-
- @Test
- @Parameters(method="oneToManyBindings")
- public void reconstructBindingsToStringShouldReformAParsableString(String value) {
- List<String> splitBindings = GeodeConnectorConfig.parseBindings(value);
- String reconstructString = GeodeConnectorConfig.reconstructString(splitBindings);
- splitBindings = GeodeConnectorConfig.parseBindings(reconstructString);
- assertEquals(Arrays.toString(splitBindings.toArray()), 2, splitBindings.size());
- for(String topicOrRegion: splitBindings) {
- assertFalse(topicOrRegion.contains("\\["));
- assertFalse(topicOrRegion.contains("\\]"));
- }
- }
-
- @Test
- @Parameters(method="oneToOneBindings")
- public void configurationShouldReturnRegionToTopicsMappingWhenParseRegionToTopics(String value) {
- Map<String, List<String>> regionToTopics = GeodeConnectorConfig.parseRegionToTopics(value);
- assertEquals(2, regionToTopics.size());
- assertTrue(regionToTopics.get("region1") != null);
- assertEquals(1, regionToTopics.get("region1").size());
- assertTrue(regionToTopics.get("region1").contains("topic1"));
- }
-
- @Test
- public void regionToTopicParsingShouldParseCorrectlyWithASingleBinding() {
- Map<String, List<String>> regionToTopics = GeodeConnectorConfig.parseRegionToTopics("[region1:topic1]");
- assertTrue(regionToTopics.get("region1") != null);
- assertEquals(1, regionToTopics.get("region1").size());
- assertTrue(regionToTopics.get("region1").contains("topic1"));
- }
+ @Test
+ public void regionToTopicParsingShouldParseCorrectlyWithASingleBinding() {
+ Map<String, List<String>> regionToTopics =
+ GeodeConnectorConfig.parseRegionToTopics("[region1:topic1]");
+ assertTrue(regionToTopics.get("region1") != null);
+ assertEquals(1, regionToTopics.get("region1").size());
+ assertTrue(regionToTopics.get("region1").contains("topic1"));
+ }
- /*
- taskId = Integer.parseInt(connectorProperties.get(TASK_ID));
- durableClientIdPrefix = connectorProperties.get(DURABLE_CLIENT_ID_PREFIX);
- if (isDurable(durableClientIdPrefix)) {
- durableClientId = durableClientIdPrefix + taskId;
- } else {
- durableClientId = "";
- }
- durableClientTimeout = connectorProperties.get(DURABLE_CLIENT_TIME_OUT);
- regionToTopics = parseRegionToTopics(connectorProperties.get(GeodeConnectorConfig.REGION_TO_TOPIC_BINDINGS));
- topicToRegions = parseTopicToRegions(connectorProperties.get(GeodeConnectorConfig.REGION_TO_TOPIC_BINDINGS));
- locatorHostPorts = parseLocators(connectorProperties.get(GeodeConnectorConfig.LOCATORS));
-
- */
+ /*
+ * taskId = Integer.parseInt(connectorProperties.get(TASK_ID));
+ * durableClientIdPrefix = connectorProperties.get(DURABLE_CLIENT_ID_PREFIX);
+ * if (isDurable(durableClientIdPrefix)) {
+ * durableClientId = durableClientIdPrefix + taskId;
+ * } else {
+ * durableClientId = "";
+ * }
+ * durableClientTimeout = connectorProperties.get(DURABLE_CLIENT_TIME_OUT);
+ * regionToTopics =
+ * parseRegionToTopics(connectorProperties.get(GeodeConnectorConfig.REGION_TO_TOPIC_BINDINGS));
+ * topicToRegions =
+ * parseTopicToRegions(connectorProperties.get(GeodeConnectorConfig.REGION_TO_TOPIC_BINDINGS));
+ * locatorHostPorts = parseLocators(connectorProperties.get(GeodeConnectorConfig.LOCATORS));
+ *
+ */
diff --git a/src/test/java/geode/kafka/GeodeKafkaTestCluster.java b/src/test/java/geode/kafka/GeodeKafkaTestCluster.java
index 121a978..3ee5e09 100644
--- a/src/test/java/geode/kafka/GeodeKafkaTestCluster.java
+++ b/src/test/java/geode/kafka/GeodeKafkaTestCluster.java
@@ -14,13 +14,20 @@
*/
package geode.kafka;
+import static org.awaitility.Awaitility.await;
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
import kafka.admin.RackAwareMode;
import kafka.zk.AdminZkClient;
import kafka.zk.KafkaZkClient;
-import org.apache.geode.cache.Region;
-import org.apache.geode.cache.client.ClientCache;
-import org.apache.geode.cache.client.ClientCacheFactory;
-import org.apache.geode.cache.client.ClientRegionShortcut;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -33,27 +40,18 @@
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.connect.runtime.ConnectorConfig;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
-import org.junit.After;
import org.junit.AfterClass;
-import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
-import java.io.IOException;
-import java.time.Duration;
-import java.util.Collections;
-import java.util.Properties;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import static org.awaitility.Awaitility.await;
-import static org.junit.Assert.assertEquals;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.client.ClientCache;
+import org.apache.geode.cache.client.ClientCacheFactory;
+import org.apache.geode.cache.client.ClientRegionShortcut;
public class GeodeKafkaTestCluster {
@@ -76,7 +74,8 @@
private static Consumer<String, String> consumer;
@BeforeClass
- public static void setup() throws IOException, QuorumPeerConfig.ConfigException, InterruptedException {
+ public static void setup()
+ throws IOException, QuorumPeerConfig.ConfigException, InterruptedException {
startZooKeeper();
startKafka();
startGeode();
@@ -86,11 +85,11 @@
@AfterClass
public static void shutdown() {
workerAndHerderCluster.stop();
- KafkaZkClient zkClient = KafkaZkClient.apply("localhost:2181",false,200000,
- 15000,10, Time.SYSTEM, "myGroup","myMetricType", null);
-// AdminZkClient adminZkClient = new AdminZkClient(zkClient);
-// adminZkClient.deleteTopic(TEST_TOPIC_FOR_SOURCE);
-// adminZkClient.deleteTopic(TEST_TOPIC_FOR_SINK);
+ KafkaZkClient zkClient = KafkaZkClient.apply("localhost:2181", false, 200000,
+ 15000, 10, Time.SYSTEM, "myGroup", "myMetricType", null);
+ // AdminZkClient adminZkClient = new AdminZkClient(zkClient);
+ // adminZkClient.deleteTopic(TEST_TOPIC_FOR_SOURCE);
+ // adminZkClient.deleteTopic(TEST_TOPIC_FOR_SINK);
zkClient.close();
kafkaLocalCluster.stop();
geodeLocalCluster.stop();
@@ -104,18 +103,19 @@
}
private static void createTopic(String topicName, int numPartitions, int replicationFactor) {
- KafkaZkClient zkClient = KafkaZkClient.apply("localhost:2181",false,200000,
- 15000,10, Time.SYSTEM, "myGroup","myMetricType", null);
+ KafkaZkClient zkClient = KafkaZkClient.apply("localhost:2181", false, 200000,
+ 15000, 10, Time.SYSTEM, "myGroup", "myMetricType", null);
Properties topicProperties = new Properties();
topicProperties.put("flush.messages", "1");
AdminZkClient adminZkClient = new AdminZkClient(zkClient);
- adminZkClient.createTopic(topicName, numPartitions,replicationFactor, topicProperties, RackAwareMode.Disabled$.MODULE$);
+ adminZkClient.createTopic(topicName, numPartitions, replicationFactor, topicProperties,
+ RackAwareMode.Disabled$.MODULE$);
}
private static void deleteTopic(String topicName) {
- KafkaZkClient zkClient = KafkaZkClient.apply("localhost:2181",false,200000,
- 15000,10, Time.SYSTEM, "myGroup","myMetricType", null);
+ KafkaZkClient zkClient = KafkaZkClient.apply("localhost:2181", false, 200000,
+ 15000, 10, Time.SYSTEM, "myGroup", "myMetricType", null);
AdminZkClient adminZkClient = new AdminZkClient(zkClient);
adminZkClient.deleteTopic(topicName);
}
@@ -129,7 +129,8 @@
zooKeeperLocalCluster.start();
}
- private static void startKafka() throws IOException, InterruptedException, QuorumPeerConfig.ConfigException {
+ private static void startKafka()
+ throws IOException, InterruptedException, QuorumPeerConfig.ConfigException {
kafkaLocalCluster = new KafkaLocalCluster(getKafkaConfig());
kafkaLocalCluster.start();
}
@@ -139,9 +140,10 @@
geodeLocalCluster.start();
}
- private static Properties getZooKeeperProperties() throws IOException {
+ private static Properties getZooKeeperProperties() throws IOException {
Properties properties = new Properties();
- properties.setProperty("dataDir", (debug)? "/tmp/zookeeper" :temporaryFolder.newFolder("zookeeper").getAbsolutePath());
+ properties.setProperty("dataDir",
+ (debug) ? "/tmp/zookeeper" : temporaryFolder.newFolder("zookeeper").getAbsolutePath());
properties.setProperty("clientPort", "2181");
properties.setProperty("tickTime", "2000");
return properties;
@@ -159,43 +161,43 @@
props.put("port", BROKER_PORT);
props.put("offsets.topic.replication.factor", "1");
- //Specifically GeodeKafka connector configs
+ // Specifically GeodeKafka connector configs
return props;
}
- //consumer props, less important, just for testing?
- public static Consumer<String,String> createConsumer() {
- final Properties props = new Properties();
+ // consumer props, less important, just for testing?
+ public static Consumer<String, String> createConsumer() {
+ final Properties props = new Properties();
props.put(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG,
- "myGroup");
- props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
- StringDeserializer.class.getName());
- props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
- StringDeserializer.class.getName());
- props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ "myGroup");
+ props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+ StringDeserializer.class.getName());
+ props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+ StringDeserializer.class.getName());
+ props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// Create the consumer using props.
- final Consumer<String, String> consumer =
- new KafkaConsumer<>(props);
- // Subscribe to the topic.
- consumer.subscribe(Collections.singletonList(TEST_TOPIC_FOR_SOURCE));
- return consumer;
+ final Consumer<String, String> consumer =
+ new KafkaConsumer<>(props);
+ // Subscribe to the topic.
+ consumer.subscribe(Collections.singletonList(TEST_TOPIC_FOR_SOURCE));
+ return consumer;
}
- //consumer props, less important, just for testing?
- public static Producer<String,String> createProducer() {
+ // consumer props, less important, just for testing?
+ public static Producer<String, String> createProducer() {
final Properties props = new Properties();
props.put(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
- StringSerializer.class.getName());
+ StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
- StringSerializer.class.getName());
+ StringSerializer.class.getName());
// Create the producer using props.
final Producer<String, String> producer =
- new KafkaProducer<>(props);
+ new KafkaProducer<>(props);
return producer;
}
@@ -207,9 +209,10 @@
consumer = createConsumer();
ClientCache client = createGeodeClient();
- Region region = client.createClientRegionFactory(ClientRegionShortcut.PROXY).create(TEST_REGION_FOR_SOURCE);
+ Region region = client.createClientRegionFactory(ClientRegionShortcut.PROXY)
+ .create(TEST_REGION_FOR_SOURCE);
- for (int i = 0; i < 10 ; i++) {
+ for (int i = 0; i < 10; i++) {
region.put("KEY" + i, "VALUE" + i);
}
@@ -221,8 +224,7 @@
}
return valueReceived.get() == 10;
});
- }
- finally {
+ } finally {
deleteTopic(TEST_TOPIC_FOR_SOURCE);
}
}
@@ -236,9 +238,10 @@
consumer = createConsumer();
ClientCache client = createGeodeClient();
- Region region = client.createClientRegionFactory(ClientRegionShortcut.PROXY).create(TEST_REGION_FOR_SOURCE);
+ Region region = client.createClientRegionFactory(ClientRegionShortcut.PROXY)
+ .create(TEST_REGION_FOR_SOURCE);
- for (int i = 0; i < 10 ; i++) {
+ for (int i = 0; i < 10; i++) {
region.put("KEY" + i, "VALUE" + i);
}
@@ -250,23 +253,24 @@
}
return valueReceived.get() == 10;
});
- }
- finally {
+ } finally {
deleteTopic(TEST_TOPIC_FOR_SOURCE);
}
}
@Test
- public void endToEndSourceSingleRegionMultiTaskMultiPartitionWithMoreTasksThanPartitionsTest() throws Exception {
+ public void endToEndSourceSingleRegionMultiTaskMultiPartitionWithMoreTasksThanPartitionsTest()
+ throws Exception {
try {
createTopic(TEST_TOPIC_FOR_SOURCE, 2, 1);
startWorker(5);
consumer = createConsumer();
ClientCache client = createGeodeClient();
- Region region = client.createClientRegionFactory(ClientRegionShortcut.PROXY).create(TEST_REGION_FOR_SOURCE);
+ Region region = client.createClientRegionFactory(ClientRegionShortcut.PROXY)
+ .create(TEST_REGION_FOR_SOURCE);
- for (int i = 0; i < 10 ; i++) {
+ for (int i = 0; i < 10; i++) {
region.put("KEY" + i, "VALUE" + i);
}
@@ -278,8 +282,7 @@
}
return valueReceived.get() == 10;
});
- }
- finally {
+ } finally {
deleteTopic(TEST_TOPIC_FOR_SOURCE);
}
}
@@ -292,7 +295,8 @@
consumer = createConsumer();
ClientCache client = createGeodeClient();
- Region region = client.createClientRegionFactory(ClientRegionShortcut.PROXY).create(TEST_REGION_FOR_SINK);
+ Region region =
+ client.createClientRegionFactory(ClientRegionShortcut.PROXY).create(TEST_REGION_FOR_SINK);
Producer<String, String> producer = createProducer();
for (int i = 0; i < 10; i++) {
@@ -300,23 +304,25 @@
}
int i = 0;
- await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> assertEquals(10, region.sizeOnServer()));
- }
- finally {
+ await().atMost(10, TimeUnit.SECONDS)
+ .untilAsserted(() -> assertEquals(10, region.sizeOnServer()));
+ } finally {
deleteTopic(TEST_TOPIC_FOR_SINK);
}
}
@Test
- public void endToEndWithOneTaskForASingleBindingAgainstAMultiPartitionTopicSinkTest() throws Exception {
+ public void endToEndWithOneTaskForASingleBindingAgainstAMultiPartitionTopicSinkTest()
+ throws Exception {
try {
createTopic(TEST_TOPIC_FOR_SINK, 10, 1);
startWorker(5);
consumer = createConsumer();
ClientCache client = createGeodeClient();
- Region region = client.createClientRegionFactory(ClientRegionShortcut.PROXY).create(TEST_REGION_FOR_SINK);
+ Region region =
+ client.createClientRegionFactory(ClientRegionShortcut.PROXY).create(TEST_REGION_FOR_SINK);
Producer<String, String> producer = createProducer();
for (int i = 0; i < 10; i++) {
@@ -324,22 +330,24 @@
}
int i = 0;
- await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> assertEquals(10, region.sizeOnServer()));
- }
- finally {
+ await().atMost(10, TimeUnit.SECONDS)
+ .untilAsserted(() -> assertEquals(10, region.sizeOnServer()));
+ } finally {
deleteTopic(TEST_TOPIC_FOR_SINK);
}
}
@Test
- public void endToEndWithOneTaskForASingleBindingAgainstAMultiPartitionTopicWithMoreWorkersSinkTest() throws Exception {
+ public void endToEndWithOneTaskForASingleBindingAgainstAMultiPartitionTopicWithMoreWorkersSinkTest()
+ throws Exception {
try {
createTopic(TEST_TOPIC_FOR_SINK, 10, 1);
startWorker(15);
consumer = createConsumer();
ClientCache client = createGeodeClient();
- Region region = client.createClientRegionFactory(ClientRegionShortcut.PROXY).create(TEST_REGION_FOR_SINK);
+ Region region =
+ client.createClientRegionFactory(ClientRegionShortcut.PROXY).create(TEST_REGION_FOR_SINK);
Producer<String, String> producer = createProducer();
for (int i = 0; i < 10; i++) {
@@ -347,9 +355,9 @@
}
int i = 0;
- await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> assertEquals(10, region.sizeOnServer()));
- }
- finally {
+ await().atMost(10, TimeUnit.SECONDS)
+ .untilAsserted(() -> assertEquals(10, region.sizeOnServer()));
+ } finally {
deleteTopic(TEST_TOPIC_FOR_SINK);
}
}
@@ -362,7 +370,8 @@
consumer = createConsumer();
ClientCache client = createGeodeClient();
- Region region = client.createClientRegionFactory(ClientRegionShortcut.PROXY).create(TEST_REGION_FOR_SINK);
+ Region region =
+ client.createClientRegionFactory(ClientRegionShortcut.PROXY).create(TEST_REGION_FOR_SINK);
Producer<String, String> producer = createProducer();
for (int i = 0; i < 10; i++) {
@@ -370,9 +379,9 @@
}
int i = 0;
- await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> assertEquals(10, region.sizeOnServer()));
- }
- finally {
+ await().atMost(10, TimeUnit.SECONDS)
+ .untilAsserted(() -> assertEquals(10, region.sizeOnServer()));
+ } finally {
deleteTopic(TEST_TOPIC_FOR_SINK);
}
}
@@ -385,17 +394,19 @@
consumer = createConsumer();
ClientCache client = createGeodeClient();
- Region region = client.createClientRegionFactory(ClientRegionShortcut.PROXY).create(TEST_REGION_FOR_SINK);
+ Region region =
+ client.createClientRegionFactory(ClientRegionShortcut.PROXY).create(TEST_REGION_FOR_SINK);
Producer<String, String> producer = createProducer();
for (int i = 0; i < 10; i++) {
- producer.send(new ProducerRecord(TEST_TOPIC_FOR_SINK, i, UUID.randomUUID().toString(), UUID.randomUUID().toString()));
+ producer.send(new ProducerRecord(TEST_TOPIC_FOR_SINK, i, UUID.randomUUID().toString(),
+ UUID.randomUUID().toString()));
}
int i = 0;
- await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> assertEquals(10, region.sizeOnServer()));
- }
- finally {
+ await().atMost(10, TimeUnit.SECONDS)
+ .untilAsserted(() -> assertEquals(10, region.sizeOnServer()));
+ } finally {
deleteTopic(TEST_TOPIC_FOR_SINK);
}
}
diff --git a/src/test/java/geode/kafka/GeodeLocalCluster.java b/src/test/java/geode/kafka/GeodeLocalCluster.java
index 85b3a99..259d30b 100644
--- a/src/test/java/geode/kafka/GeodeLocalCluster.java
+++ b/src/test/java/geode/kafka/GeodeLocalCluster.java
@@ -18,25 +18,24 @@
public class GeodeLocalCluster {
- private JavaProcess locatorProcess;
- private JavaProcess serverProcess;
+ private JavaProcess locatorProcess;
+ private JavaProcess serverProcess;
- public GeodeLocalCluster() {
- locatorProcess = new JavaProcess(LocatorLauncherWrapper.class);
- serverProcess = new JavaProcess(ServerLauncherWrapper.class);
- }
+ public GeodeLocalCluster() {
+ locatorProcess = new JavaProcess(LocatorLauncherWrapper.class);
+ serverProcess = new JavaProcess(ServerLauncherWrapper.class);
+ }
- public void start() throws IOException, InterruptedException {
- System.out.println("starting locator");
- locatorProcess.exec("10334");
- Thread.sleep(15000);
- serverProcess.exec("40404");
- Thread.sleep(30000);
- }
+ public void start() throws IOException, InterruptedException {
+ System.out.println("starting locator");
+ locatorProcess.exec("10334");
+ Thread.sleep(15000);
+ serverProcess.exec("40404");
+ Thread.sleep(30000);
+ }
- public void stop() {
- serverProcess.destroy();
- locatorProcess.destroy();
- }
+ public void stop() {
+ serverProcess.destroy();
+ locatorProcess.destroy();
+ }
}
-
diff --git a/src/test/java/geode/kafka/JavaProcess.java b/src/test/java/geode/kafka/JavaProcess.java
index ed7a493..b130223 100644
--- a/src/test/java/geode/kafka/JavaProcess.java
+++ b/src/test/java/geode/kafka/JavaProcess.java
@@ -19,40 +19,41 @@
public class JavaProcess {
- public Process process;
- Class classWithMain;
+ public Process process;
+ Class classWithMain;
- public JavaProcess(Class classWithmain) {
- this.classWithMain = classWithmain;
+ public JavaProcess(Class classWithmain) {
+ this.classWithMain = classWithmain;
+ }
+
+ public void exec(String... args) throws IOException, InterruptedException {
+ String java =
+ System.getProperty("java.home") + File.separator + "bin" + File.separator + "java";
+ String classpath = System.getProperty("java.class.path");
+ String className = classWithMain.getName();
+
+ ProcessBuilder builder = new ProcessBuilder(
+ java, "-cp", classpath, className, convertArgsToString(args));
+
+ process = builder.inheritIO().start();
+ }
+
+ private String convertArgsToString(String... args) {
+ String string = "";
+ for (String arg : args) {
+ string += arg;
}
+ return string;
+ }
- public void exec(String... args) throws IOException, InterruptedException {
- String java = System.getProperty("java.home") + File.separator + "bin" + File.separator + "java";
- String classpath = System.getProperty("java.class.path");
- String className = classWithMain.getName();
+ public void waitFor() throws InterruptedException {
+ process.waitFor();
+ }
- ProcessBuilder builder = new ProcessBuilder(
- java, "-cp", classpath, className, convertArgsToString(args));
-
- process = builder.inheritIO().start();
- }
-
- private String convertArgsToString(String... args) {
- String string = "";
- for(String arg: args) {
- string += arg;
- }
- return string;
- }
-
- public void waitFor() throws InterruptedException {
- process.waitFor();
- }
-
- public void destroy() {
- process.destroy();
- }
+ public void destroy() {
+ process.destroy();
+ }
-}
\ No newline at end of file
+}
diff --git a/src/test/java/geode/kafka/KafkaLocalCluster.java b/src/test/java/geode/kafka/KafkaLocalCluster.java
index 8d09775..ef534d9 100644
--- a/src/test/java/geode/kafka/KafkaLocalCluster.java
+++ b/src/test/java/geode/kafka/KafkaLocalCluster.java
@@ -14,12 +14,12 @@
*/
package geode.kafka;
-import kafka.server.KafkaConfig;
-import kafka.server.KafkaServerStartable;
-
import java.io.IOException;
import java.util.Properties;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServerStartable;
+
public class KafkaLocalCluster {
KafkaServerStartable kafka;
@@ -33,8 +33,7 @@
try {
kafka.startup();
System.out.println("Kafka started up");
- }
- catch (Throwable t) {
+ } catch (Throwable t) {
System.out.println(t);
}
}
diff --git a/src/test/java/geode/kafka/LocatorLauncherWrapper.java b/src/test/java/geode/kafka/LocatorLauncherWrapper.java
index 0bc446d..1b922c9 100644
--- a/src/test/java/geode/kafka/LocatorLauncherWrapper.java
+++ b/src/test/java/geode/kafka/LocatorLauncherWrapper.java
@@ -14,37 +14,38 @@
*/
package geode.kafka;
-import org.apache.geode.distributed.ConfigurationProperties;
-import org.apache.geode.distributed.Locator;
-
import java.io.File;
import java.io.IOException;
import java.util.Properties;
+import org.apache.geode.distributed.ConfigurationProperties;
+import org.apache.geode.distributed.Locator;
+
public class LocatorLauncherWrapper {
- public static void main(String[] args) throws IOException {
- Properties properties = new Properties();
-// String statsFile = new File(context.getOutputDir(), "stats.gfs").getAbsolutePath();
-// properties.setProperty(ConfigurationPropert/**/ies.STATISTIC_ARCHIVE_FILE, statsFile);
- properties.setProperty(ConfigurationProperties.NAME, "locator1");
+ public static void main(String[] args) throws IOException {
+ Properties properties = new Properties();
+ // String statsFile = new File(context.getOutputDir(), "stats.gfs").getAbsolutePath();
+ // properties.setProperty(ConfigurationPropert/**/ies.STATISTIC_ARCHIVE_FILE, statsFile);
+ properties.setProperty(ConfigurationProperties.NAME, "locator1");
- Locator.startLocatorAndDS(10334, new File("/Users/jhuynh/Pivotal/geode-kafka-connector/locator.log"), properties);
- while (true) {
-
- }
-//
-// LocatorLauncher locatorLauncher = new LocatorLauncher.Builder()
-// .setMemberName("locator1")
-//// .setPort(Integer.valueOf(args[0]))
-//// .setBindAddress("localhost")
-// .build();
-//
-// locatorLauncher.start();
-// while (!locatorLauncher.isRunning()) {
-//
-// }
-// System.out.println(locatorLauncher.getBindAddress() + ":" + locatorLauncher.getPort());
+ Locator.startLocatorAndDS(10334,
+ new File("/Users/jhuynh/Pivotal/geode-kafka-connector/locator.log"), properties);
+ while (true) {
}
+ //
+ // LocatorLauncher locatorLauncher = new LocatorLauncher.Builder()
+ // .setMemberName("locator1")
+ //// .setPort(Integer.valueOf(args[0]))
+ //// .setBindAddress("localhost")
+ // .build();
+ //
+ // locatorLauncher.start();
+ // while (!locatorLauncher.isRunning()) {
+ //
+ // }
+ // System.out.println(locatorLauncher.getBindAddress() + ":" + locatorLauncher.getPort());
+
+ }
}
diff --git a/src/test/java/geode/kafka/ServerLauncherWrapper.java b/src/test/java/geode/kafka/ServerLauncherWrapper.java
index 5204f65..f60af27 100644
--- a/src/test/java/geode/kafka/ServerLauncherWrapper.java
+++ b/src/test/java/geode/kafka/ServerLauncherWrapper.java
@@ -14,58 +14,58 @@
*/
package geode.kafka;
+import static geode.kafka.GeodeKafkaTestCluster.TEST_REGION_FOR_SINK;
+import static geode.kafka.GeodeKafkaTestCluster.TEST_REGION_FOR_SOURCE;
+
+import java.io.IOException;
+import java.util.Properties;
+
import org.apache.geode.cache.Cache;
import org.apache.geode.cache.CacheFactory;
import org.apache.geode.cache.RegionShortcut;
import org.apache.geode.cache.server.CacheServer;
import org.apache.geode.distributed.ConfigurationProperties;
-import java.io.IOException;
-import java.util.Properties;
-
-import static geode.kafka.GeodeKafkaTestCluster.TEST_REGION_FOR_SINK;
-import static geode.kafka.GeodeKafkaTestCluster.TEST_REGION_FOR_SOURCE;
-
public class ServerLauncherWrapper {
- public static void main(String... args) throws IOException {
-// ServerLauncher serverLauncher = new ServerLauncher.Builder()
-// .setMemberName("server1")
-//// .setServerPort(Integer.valueOf(args[0]))
-//// .setServerBindAddress("localhost")
-// // .set("locators", "localhost[10334]")
-//// .set("jmx-manager", "true")
-//// .set("jmx-manager-start", "true")
-// .build();
-//
-// serverLauncher.start();
-// System.out.println("Geode Server Launcher complete");
+ public static void main(String... args) throws IOException {
+ // ServerLauncher serverLauncher = new ServerLauncher.Builder()
+ // .setMemberName("server1")
+ //// .setServerPort(Integer.valueOf(args[0]))
+ //// .setServerBindAddress("localhost")
+ // // .set("locators", "localhost[10334]")
+ //// .set("jmx-manager", "true")
+ //// .set("jmx-manager-start", "true")
+ // .build();
+ //
+ // serverLauncher.start();
+ // System.out.println("Geode Server Launcher complete");
+ Properties properties = new Properties();
+ String locatorString = "localhost[10334]";
+ // String statsFile = new File(context.getOutputDir(), "stats.gfs").getAbsolutePath();
+ Cache cache = new CacheFactory(properties)
+ // .setPdxSerializer(new ReflectionBasedAutoSerializer("benchmark.geode.data.*"))
+ .set(ConfigurationProperties.LOCATORS, locatorString)
+ .set(ConfigurationProperties.NAME,
+ "server-1")
+ .set(ConfigurationProperties.LOG_FILE,
+ "/Users/jhuynh/Pivotal/geode-kafka-connector/server.log")
+ .set(ConfigurationProperties.LOG_LEVEL, "info")
+ // .set(ConfigurationProperties.STATISTIC_ARCHIVE_FILE, statsFile)
+ .create();
+ CacheServer cacheServer = cache.addCacheServer();
+ cacheServer.setPort(0);
+ cacheServer.start();
- Properties properties = new Properties();
- String locatorString = "localhost[10334]";
-// String statsFile = new File(context.getOutputDir(), "stats.gfs").getAbsolutePath();
- Cache cache = new CacheFactory(properties)
-// .setPdxSerializer(new ReflectionBasedAutoSerializer("benchmark.geode.data.*"))
- .set(ConfigurationProperties.LOCATORS, locatorString)
- .set(ConfigurationProperties.NAME,
- "server-1")
- .set(ConfigurationProperties.LOG_FILE, "/Users/jhuynh/Pivotal/geode-kafka-connector/server.log")
- .set(ConfigurationProperties.LOG_LEVEL, "info")
-// .set(ConfigurationProperties.STATISTIC_ARCHIVE_FILE, statsFile)
- .create();
- CacheServer cacheServer = cache.addCacheServer();
- cacheServer.setPort(0);
- cacheServer.start();
+ // create the region
+ cache.createRegionFactory(RegionShortcut.PARTITION).create(TEST_REGION_FOR_SINK);
+ cache.createRegionFactory(RegionShortcut.PARTITION).create(TEST_REGION_FOR_SOURCE);
+ System.out.println("starting cacheserver");
+ while (true) {
- //create the region
- cache.createRegionFactory(RegionShortcut.PARTITION).create(TEST_REGION_FOR_SINK);
- cache.createRegionFactory(RegionShortcut.PARTITION).create(TEST_REGION_FOR_SOURCE);
- System.out.println("starting cacheserver");
- while (true) {
-
- }
}
+ }
}
diff --git a/src/test/java/geode/kafka/WorkerAndHerderCluster.java b/src/test/java/geode/kafka/WorkerAndHerderCluster.java
index c388a5a..70461a3 100644
--- a/src/test/java/geode/kafka/WorkerAndHerderCluster.java
+++ b/src/test/java/geode/kafka/WorkerAndHerderCluster.java
@@ -18,19 +18,18 @@
public class WorkerAndHerderCluster {
- private JavaProcess workerAndHerder;
+ private JavaProcess workerAndHerder;
- public WorkerAndHerderCluster() {
- workerAndHerder = new JavaProcess(WorkerAndHerderWrapper.class);
- }
+ public WorkerAndHerderCluster() {
+ workerAndHerder = new JavaProcess(WorkerAndHerderWrapper.class);
+ }
- public void start(String maxTasks) throws IOException, InterruptedException {
- workerAndHerder.exec(maxTasks);
+ public void start(String maxTasks) throws IOException, InterruptedException {
+ workerAndHerder.exec(maxTasks);
- }
+ }
- public void stop() {
- workerAndHerder.destroy();
- }
+ public void stop() {
+ workerAndHerder.destroy();
+ }
}
-
diff --git a/src/test/java/geode/kafka/WorkerAndHerderWrapper.java b/src/test/java/geode/kafka/WorkerAndHerderWrapper.java
index 24427aa..5f86985 100644
--- a/src/test/java/geode/kafka/WorkerAndHerderWrapper.java
+++ b/src/test/java/geode/kafka/WorkerAndHerderWrapper.java
@@ -14,6 +14,16 @@
*/
package geode.kafka;
+import static geode.kafka.GeodeKafkaTestCluster.TEST_REGION_TO_TOPIC_BINDINGS;
+import static geode.kafka.GeodeKafkaTestCluster.TEST_TOPIC_FOR_SINK;
+import static geode.kafka.GeodeKafkaTestCluster.TEST_TOPIC_TO_REGION_BINDINGS;
+import static geode.kafka.GeodeSinkConnectorConfig.TOPIC_TO_REGION_BINDINGS;
+import static geode.kafka.source.GeodeSourceConnectorConfig.REGION_TO_TOPIC_BINDINGS;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
import geode.kafka.sink.GeodeKafkaSink;
import geode.kafka.source.GeodeKafkaSource;
import org.apache.kafka.common.utils.SystemTime;
@@ -28,68 +38,62 @@
import org.apache.kafka.connect.storage.MemoryOffsetBackingStore;
import org.apache.kafka.connect.util.ConnectUtils;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
-import static geode.kafka.GeodeKafkaTestCluster.TEST_REGION_TO_TOPIC_BINDINGS;
-import static geode.kafka.GeodeKafkaTestCluster.TEST_TOPIC_FOR_SINK;
-import static geode.kafka.GeodeKafkaTestCluster.TEST_TOPIC_TO_REGION_BINDINGS;
-import static geode.kafka.GeodeSinkConnectorConfig.TOPIC_TO_REGION_BINDINGS;
-import static geode.kafka.source.GeodeSourceConnectorConfig.REGION_TO_TOPIC_BINDINGS;
-
public class WorkerAndHerderWrapper {
- public static void main(String[] args) throws IOException {
- String maxTasks = args[0];
+ public static void main(String[] args) throws IOException {
+ String maxTasks = args[0];
- Map props = new HashMap();
- props.put(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
- props.put("offset.storage.file.filename", "/tmp/connect.offsets");
- // fast flushing for testing.
- props.put(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG, "10");
+ Map props = new HashMap();
+ props.put(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+ props.put("offset.storage.file.filename", "/tmp/connect.offsets");
+ // fast flushing for testing.
+ props.put(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG, "10");
- props.put("internal.key.converter.schemas.enable", "false");
- props.put("internal.value.converter.schemas.enable", "false");
- props.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");
- props.put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");
- props.put("key.converter.schemas.enable", "false");
- props.put("value.converter.schemas.enable", "false");
+ props.put("internal.key.converter.schemas.enable", "false");
+ props.put("internal.value.converter.schemas.enable", "false");
+ props.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG,
+ "org.apache.kafka.connect.storage.StringConverter");
+ props.put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG,
+ "org.apache.kafka.connect.storage.StringConverter");
+ props.put("key.converter.schemas.enable", "false");
+ props.put("value.converter.schemas.enable", "false");
- WorkerConfig workerCfg = new StandaloneConfig(props);
+ WorkerConfig workerCfg = new StandaloneConfig(props);
- MemoryOffsetBackingStore offBackingStore = new MemoryOffsetBackingStore();
- offBackingStore.configure(workerCfg);
+ MemoryOffsetBackingStore offBackingStore = new MemoryOffsetBackingStore();
+ offBackingStore.configure(workerCfg);
- Worker worker = new Worker("WORKER_ID", new SystemTime(), new Plugins(props), workerCfg, offBackingStore, new AllConnectorClientConfigOverridePolicy());
- worker.start();
+ Worker worker = new Worker("WORKER_ID", new SystemTime(), new Plugins(props), workerCfg,
+ offBackingStore, new AllConnectorClientConfigOverridePolicy());
+ worker.start();
- Herder herder = new StandaloneHerder(worker, ConnectUtils.lookupKafkaClusterId(workerCfg), new AllConnectorClientConfigOverridePolicy());
- herder.start();
+ Herder herder = new StandaloneHerder(worker, ConnectUtils.lookupKafkaClusterId(workerCfg),
+ new AllConnectorClientConfigOverridePolicy());
+ herder.start();
- Map<String, String> sourceProps = new HashMap<>();
- sourceProps.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, GeodeKafkaSource.class.getName());
- sourceProps.put(ConnectorConfig.NAME_CONFIG, "geode-kafka-source-connector");
- sourceProps.put(ConnectorConfig.TASKS_MAX_CONFIG, maxTasks);
- sourceProps.put(REGION_TO_TOPIC_BINDINGS, TEST_REGION_TO_TOPIC_BINDINGS);
+ Map<String, String> sourceProps = new HashMap<>();
+ sourceProps.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, GeodeKafkaSource.class.getName());
+ sourceProps.put(ConnectorConfig.NAME_CONFIG, "geode-kafka-source-connector");
+ sourceProps.put(ConnectorConfig.TASKS_MAX_CONFIG, maxTasks);
+ sourceProps.put(REGION_TO_TOPIC_BINDINGS, TEST_REGION_TO_TOPIC_BINDINGS);
- herder.putConnectorConfig(
- sourceProps.get(ConnectorConfig.NAME_CONFIG),
- sourceProps, true, (error, result)->{
- });
+ herder.putConnectorConfig(
+ sourceProps.get(ConnectorConfig.NAME_CONFIG),
+ sourceProps, true, (error, result) -> {
+ });
- Map<String, String> sinkProps = new HashMap<>();
- sinkProps.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, GeodeKafkaSink.class.getName());
- sinkProps.put(ConnectorConfig.NAME_CONFIG, "geode-kafka-sink-connector");
- sinkProps.put(ConnectorConfig.TASKS_MAX_CONFIG, maxTasks);
- sinkProps.put(TOPIC_TO_REGION_BINDINGS, TEST_TOPIC_TO_REGION_BINDINGS);
- sinkProps.put("topics", TEST_TOPIC_FOR_SINK);
+ Map<String, String> sinkProps = new HashMap<>();
+ sinkProps.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, GeodeKafkaSink.class.getName());
+ sinkProps.put(ConnectorConfig.NAME_CONFIG, "geode-kafka-sink-connector");
+ sinkProps.put(ConnectorConfig.TASKS_MAX_CONFIG, maxTasks);
+ sinkProps.put(TOPIC_TO_REGION_BINDINGS, TEST_TOPIC_TO_REGION_BINDINGS);
+ sinkProps.put("topics", TEST_TOPIC_FOR_SINK);
- herder.putConnectorConfig(
- sinkProps.get(ConnectorConfig.NAME_CONFIG),
- sinkProps, true, (error, result)->{
- });
+ herder.putConnectorConfig(
+ sinkProps.get(ConnectorConfig.NAME_CONFIG),
+ sinkProps, true, (error, result) -> {
+ });
- }
+ }
}
diff --git a/src/test/java/geode/kafka/ZooKeeperLocalCluster.java b/src/test/java/geode/kafka/ZooKeeperLocalCluster.java
index c1d743c..717b046 100644
--- a/src/test/java/geode/kafka/ZooKeeperLocalCluster.java
+++ b/src/test/java/geode/kafka/ZooKeeperLocalCluster.java
@@ -14,14 +14,14 @@
*/
package geode.kafka;
+import java.io.IOException;
+import java.util.Properties;
+
import org.apache.zookeeper.server.ServerConfig;
import org.apache.zookeeper.server.ZooKeeperServerMain;
import org.apache.zookeeper.server.admin.AdminServer;
import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
-import java.io.IOException;
-import java.util.Properties;
-
public class ZooKeeperLocalCluster {
ZooKeeperServerMain zooKeeperServer;
diff --git a/src/test/java/geode/kafka/sink/BatchRecordsTest.java b/src/test/java/geode/kafka/sink/BatchRecordsTest.java
index 210480e..cdb286b 100644
--- a/src/test/java/geode/kafka/sink/BatchRecordsTest.java
+++ b/src/test/java/geode/kafka/sink/BatchRecordsTest.java
@@ -14,84 +14,85 @@
*/
package geode.kafka.sink;
-import org.apache.geode.cache.Region;
-import org.apache.kafka.connect.sink.SinkRecord;
-import org.junit.Test;
-
-import java.util.Collection;
-import java.util.Map;
-
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import java.util.Collection;
+import java.util.Map;
+
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.junit.Test;
+
+import org.apache.geode.cache.Region;
+
public class BatchRecordsTest {
- @Test
- public void updatingARecordShouldRemoveFromTheRemoveListIfNullValuesIsRemoveBooleanIsSet() {
- Map updates = mock(Map.class);
- Collection removes = mock(Collection.class);
- when(removes.contains(any())).thenReturn(true);
- BatchRecords records = new BatchRecords(updates, removes);
- SinkRecord sinkRecord = mock(SinkRecord.class);
- records.addUpdateOperation(sinkRecord, true);
- verify(removes, times(1)).remove(any());
- }
+ @Test
+ public void updatingARecordShouldRemoveFromTheRemoveListIfNullValuesIsRemoveBooleanIsSet() {
+ Map updates = mock(Map.class);
+ Collection removes = mock(Collection.class);
+ when(removes.contains(any())).thenReturn(true);
+ BatchRecords records = new BatchRecords(updates, removes);
+ SinkRecord sinkRecord = mock(SinkRecord.class);
+ records.addUpdateOperation(sinkRecord, true);
+ verify(removes, times(1)).remove(any());
+ }
- @Test
- public void updatingARecordShouldAddToTheUpdateMap() {
- Map updates = mock(Map.class);
- Collection removes = mock(Collection.class);
- when(removes.contains(any())).thenReturn(false);
- BatchRecords records = new BatchRecords(updates, removes);
- SinkRecord sinkRecord = mock(SinkRecord.class);
- records.addUpdateOperation(sinkRecord, true);
- verify(updates, times(1)).put(any(), any());
- }
+ @Test
+ public void updatingARecordShouldAddToTheUpdateMap() {
+ Map updates = mock(Map.class);
+ Collection removes = mock(Collection.class);
+ when(removes.contains(any())).thenReturn(false);
+ BatchRecords records = new BatchRecords(updates, removes);
+ SinkRecord sinkRecord = mock(SinkRecord.class);
+ records.addUpdateOperation(sinkRecord, true);
+ verify(updates, times(1)).put(any(), any());
+ }
- @Test
- public void updatingARecordShouldNotRemoveFromTheRemoveListIfNullValuesIsNotSet() {
- boolean nullValuesMeanRemove = false;
- Map updates = mock(Map.class);
- Collection removes = mock(Collection.class);
- when(removes.contains(any())).thenReturn(true);
- BatchRecords records = new BatchRecords(updates, removes);
- SinkRecord sinkRecord = mock(SinkRecord.class);
- records.addUpdateOperation(sinkRecord, nullValuesMeanRemove);
- verify(removes, times(0)).remove(any());
- }
+ @Test
+ public void updatingARecordShouldNotRemoveFromTheRemoveListIfNullValuesIsNotSet() {
+ boolean nullValuesMeanRemove = false;
+ Map updates = mock(Map.class);
+ Collection removes = mock(Collection.class);
+ when(removes.contains(any())).thenReturn(true);
+ BatchRecords records = new BatchRecords(updates, removes);
+ SinkRecord sinkRecord = mock(SinkRecord.class);
+ records.addUpdateOperation(sinkRecord, nullValuesMeanRemove);
+ verify(removes, times(0)).remove(any());
+ }
- @Test
- public void removingARecordShouldRemoveFromTheUpdateMapIfKeyIsPresent() {
- Map updates = mock(Map.class);
- Collection removes = mock(Collection.class);
- when(updates.containsKey(any())).thenReturn(true);
- BatchRecords records = new BatchRecords(updates, removes);
- SinkRecord sinkRecord = mock(SinkRecord.class);
- records.addRemoveOperation(sinkRecord);
- verify(updates, times(1)).remove(any());
- }
+ @Test
+ public void removingARecordShouldRemoveFromTheUpdateMapIfKeyIsPresent() {
+ Map updates = mock(Map.class);
+ Collection removes = mock(Collection.class);
+ when(updates.containsKey(any())).thenReturn(true);
+ BatchRecords records = new BatchRecords(updates, removes);
+ SinkRecord sinkRecord = mock(SinkRecord.class);
+ records.addRemoveOperation(sinkRecord);
+ verify(updates, times(1)).remove(any());
+ }
- @Test
- public void removingARecordAddToTheRemoveCollection() {
- Map updates = mock(Map.class);
- Collection removes = mock(Collection.class);
- BatchRecords records = new BatchRecords(updates, removes);
- SinkRecord sinkRecord = mock(SinkRecord.class);
- records.addRemoveOperation(sinkRecord);
- verify(removes, times(1)).add(any());
- }
+ @Test
+ public void removingARecordAddToTheRemoveCollection() {
+ Map updates = mock(Map.class);
+ Collection removes = mock(Collection.class);
+ BatchRecords records = new BatchRecords(updates, removes);
+ SinkRecord sinkRecord = mock(SinkRecord.class);
+ records.addRemoveOperation(sinkRecord);
+ verify(removes, times(1)).add(any());
+ }
- @Test
- public void executeOperationsShouldInvokePutAllAndRemoveAll() {
- Region region = mock(Region.class);
- BatchRecords records = new BatchRecords();
- records.executeOperations(region);
- verify(region, times(1)).putAll(any());
- verify(region, times(1)).removeAll(any());
- }
+ @Test
+ public void executeOperationsShouldInvokePutAllAndRemoveAll() {
+ Region region = mock(Region.class);
+ BatchRecords records = new BatchRecords();
+ records.executeOperations(region);
+ verify(region, times(1)).putAll(any());
+ verify(region, times(1)).removeAll(any());
+ }
diff --git a/src/test/java/geode/kafka/sink/GeodeKafkaSinkTaskTest.java b/src/test/java/geode/kafka/sink/GeodeKafkaSinkTaskTest.java
index ed8f040..735f255 100644
--- a/src/test/java/geode/kafka/sink/GeodeKafkaSinkTaskTest.java
+++ b/src/test/java/geode/kafka/sink/GeodeKafkaSinkTaskTest.java
@@ -14,62 +14,61 @@
*/
package geode.kafka.sink;
-import geode.kafka.GeodeSinkConnectorConfig;
-import org.apache.geode.cache.Region;
-import org.apache.kafka.connect.sink.SinkRecord;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-
import static geode.kafka.GeodeConnectorConfig.LOCATORS;
import static geode.kafka.GeodeConnectorConfig.TASK_ID;
import static geode.kafka.GeodeSinkConnectorConfig.NULL_VALUES_MEAN_REMOVE;
import static geode.kafka.GeodeSinkConnectorConfig.TOPIC_TO_REGION_BINDINGS;
-import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
-import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+import geode.kafka.GeodeSinkConnectorConfig;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.junit.Test;
+
+import org.apache.geode.cache.Region;
+
public class GeodeKafkaSinkTaskTest {
- private HashMap<String, String> createTestSinkProps(boolean nullMeansRemove) {
- HashMap<String, String> props = new HashMap<>();
- props.put(TOPIC_TO_REGION_BINDINGS, "[topic:region]");
- props.put(TASK_ID, "0");
- props.put(NULL_VALUES_MEAN_REMOVE, String.valueOf(nullMeansRemove));
- props.put(LOCATORS, "localhost[10334]");
- return props;
- }
+ private HashMap<String, String> createTestSinkProps(boolean nullMeansRemove) {
+ HashMap<String, String> props = new HashMap<>();
+ props.put(TOPIC_TO_REGION_BINDINGS, "[topic:region]");
+ props.put(TASK_ID, "0");
+ props.put(NULL_VALUES_MEAN_REMOVE, String.valueOf(nullMeansRemove));
+ props.put(LOCATORS, "localhost[10334]");
+ return props;
+ }
- @Test
- public void putRecordsAddsToRegionBatchRecords() {
- boolean nullMeansRemove = true;
- GeodeKafkaSinkTask task = new GeodeKafkaSinkTask();
- HashMap<String, String> props = createTestSinkProps(nullMeansRemove);
+ @Test
+ public void putRecordsAddsToRegionBatchRecords() {
+ boolean nullMeansRemove = true;
+ GeodeKafkaSinkTask task = new GeodeKafkaSinkTask();
+ HashMap<String, String> props = createTestSinkProps(nullMeansRemove);
- SinkRecord topicRecord = mock(SinkRecord.class);
- when(topicRecord.topic()).thenReturn("topic");
- when(topicRecord.value()).thenReturn("value");
- when(topicRecord.key()).thenReturn("key");
+ SinkRecord topicRecord = mock(SinkRecord.class);
+ when(topicRecord.topic()).thenReturn("topic");
+ when(topicRecord.value()).thenReturn("value");
+ when(topicRecord.key()).thenReturn("key");
- List<SinkRecord> records = new ArrayList();
- records.add(topicRecord);
+ List<SinkRecord> records = new ArrayList();
+ records.add(topicRecord);
- HashMap<String, Region> regionNameToRegion = new HashMap<>();
- GeodeSinkConnectorConfig geodeSinkConnectorConfig = new GeodeSinkConnectorConfig(props);
- HashMap<String, BatchRecords> batchRecordsMap = new HashMap();
- BatchRecords batchRecords = mock(BatchRecords.class);
- batchRecordsMap.put("region", batchRecords);
- task.configure(geodeSinkConnectorConfig);
- task.setRegionNameToRegion(regionNameToRegion);
+ HashMap<String, Region> regionNameToRegion = new HashMap<>();
+ GeodeSinkConnectorConfig geodeSinkConnectorConfig = new GeodeSinkConnectorConfig(props);
+ HashMap<String, BatchRecords> batchRecordsMap = new HashMap();
+ BatchRecords batchRecords = mock(BatchRecords.class);
+ batchRecordsMap.put("region", batchRecords);
+ task.configure(geodeSinkConnectorConfig);
+ task.setRegionNameToRegion(regionNameToRegion);
- task.put(records, batchRecordsMap);
- assertTrue(batchRecordsMap.containsKey("region"));
- verify(batchRecords, times(1)).addUpdateOperation(topicRecord, nullMeansRemove);
- }
+ task.put(records, batchRecordsMap);
+ assertTrue(batchRecordsMap.containsKey("region"));
+ verify(batchRecords, times(1)).addUpdateOperation(topicRecord, nullMeansRemove);
+ }
}
diff --git a/src/test/java/geode/kafka/sink/GeodeKafkaSinkTest.java b/src/test/java/geode/kafka/sink/GeodeKafkaSinkTest.java
index 633c7ae..d45c0c8 100644
--- a/src/test/java/geode/kafka/sink/GeodeKafkaSinkTest.java
+++ b/src/test/java/geode/kafka/sink/GeodeKafkaSinkTest.java
@@ -14,52 +14,52 @@
*/
package geode.kafka.sink;
-import org.junit.Test;
+import static geode.kafka.GeodeConnectorConfig.TASK_ID;
+import static geode.kafka.GeodeSinkConnectorConfig.TOPIC_TO_REGION_BINDINGS;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
-import static geode.kafka.GeodeConnectorConfig.TASK_ID;
-import static geode.kafka.GeodeSinkConnectorConfig.TOPIC_TO_REGION_BINDINGS;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import org.junit.Test;
public class GeodeKafkaSinkTest {
- @Test
- public void taskConfigsCreatesMaxNumberOfTasks() {
- GeodeKafkaSink sink = new GeodeKafkaSink();
- Map<String, String> props = new HashMap();
- props.put(TOPIC_TO_REGION_BINDINGS, "[someTopic:someRegion]");
- sink.start(props);
- Collection<Map<String,String>> tasks = sink.taskConfigs(5);
- assertEquals(5, tasks.size());
- }
+ @Test
+ public void taskConfigsCreatesMaxNumberOfTasks() {
+ GeodeKafkaSink sink = new GeodeKafkaSink();
+ Map<String, String> props = new HashMap();
+ props.put(TOPIC_TO_REGION_BINDINGS, "[someTopic:someRegion]");
+ sink.start(props);
+ Collection<Map<String, String>> tasks = sink.taskConfigs(5);
+ assertEquals(5, tasks.size());
+ }
- @Test
- public void sinkTaskConfigsAllAssignedEntireTopicToRegionBinding() {
- GeodeKafkaSink sink = new GeodeKafkaSink();
- Map<String, String> props = new HashMap();
- props.put(TOPIC_TO_REGION_BINDINGS, "[someTopic:someRegion]");
- sink.start(props);
- Collection<Map<String,String>> tasks = sink.taskConfigs(5);
- for(Map<String, String> prop : tasks) {
- assertEquals("[someTopic:someRegion]", prop.get(TOPIC_TO_REGION_BINDINGS));
- }
+ @Test
+ public void sinkTaskConfigsAllAssignedEntireTopicToRegionBinding() {
+ GeodeKafkaSink sink = new GeodeKafkaSink();
+ Map<String, String> props = new HashMap();
+ props.put(TOPIC_TO_REGION_BINDINGS, "[someTopic:someRegion]");
+ sink.start(props);
+ Collection<Map<String, String>> tasks = sink.taskConfigs(5);
+ for (Map<String, String> prop : tasks) {
+ assertEquals("[someTopic:someRegion]", prop.get(TOPIC_TO_REGION_BINDINGS));
}
+ }
- @Test
- public void eachTaskHasUniqueTaskIds() {
- GeodeKafkaSink sink = new GeodeKafkaSink();
- Map<String, String> props = new HashMap();
- props.put(TOPIC_TO_REGION_BINDINGS, "[someTopic:someRegion]");
- sink.start(props);
- Collection<Map<String,String>> tasks = sink.taskConfigs(5);
- HashSet<String> seenIds = new HashSet();
- for(Map<String, String> taskProp : tasks) {
- assertTrue(seenIds.add(taskProp.get(TASK_ID)));
- }
+ @Test
+ public void eachTaskHasUniqueTaskIds() {
+ GeodeKafkaSink sink = new GeodeKafkaSink();
+ Map<String, String> props = new HashMap();
+ props.put(TOPIC_TO_REGION_BINDINGS, "[someTopic:someRegion]");
+ sink.start(props);
+ Collection<Map<String, String>> tasks = sink.taskConfigs(5);
+ HashSet<String> seenIds = new HashSet();
+ for (Map<String, String> taskProp : tasks) {
+ assertTrue(seenIds.add(taskProp.get(TASK_ID)));
}
+ }
}
diff --git a/src/test/java/geode/kafka/source/GeodeKafkaSourceTaskTest.java b/src/test/java/geode/kafka/source/GeodeKafkaSourceTaskTest.java
index 33f1ab5..a919b96 100644
--- a/src/test/java/geode/kafka/source/GeodeKafkaSourceTaskTest.java
+++ b/src/test/java/geode/kafka/source/GeodeKafkaSourceTaskTest.java
@@ -14,20 +14,6 @@
*/
package geode.kafka.source;
-import geode.kafka.GeodeContext;
-import org.apache.geode.cache.client.ClientCache;
-import org.apache.geode.cache.query.CqEvent;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-
import static geode.kafka.source.GeodeSourceConnectorConfig.DEFAULT_CQ_PREFIX;
import static geode.kafka.source.GeodeSourceConnectorConfig.REGION_PARTITION;
import static org.hamcrest.CoreMatchers.is;
@@ -41,190 +27,212 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import geode.kafka.GeodeContext;
+import org.junit.Test;
+
+import org.apache.geode.cache.client.ClientCache;
+import org.apache.geode.cache.query.CqEvent;
+
public class GeodeKafkaSourceTaskTest {
- @Test
- public void whenLoadingEntireRegionAbleToPutInitialResultsIntoEventBuffer() {
- GeodeContext geodeContext = mock(GeodeContext.class);
- BlockingQueue<GeodeEvent> eventBuffer = new LinkedBlockingQueue(100);
- boolean loadEntireRegion = true;
- boolean isDurable = false;
- List<CqEvent> fakeInitialResults = new LinkedList<>();
- for (int i = 0; i < 10; i++) {
- fakeInitialResults.add(mock(CqEvent.class));
- }
-
- when(geodeContext.newCqWithInitialResults(anyString(), anyString(), any(), anyBoolean())).thenReturn(fakeInitialResults);
- GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
- task.installListenersToRegion(geodeContext, 1, createEventBufferSupplier(eventBuffer), "testRegion", DEFAULT_CQ_PREFIX, loadEntireRegion, isDurable);
- assertEquals(10, eventBuffer.size());
+ @Test
+ public void whenLoadingEntireRegionAbleToPutInitialResultsIntoEventBuffer() {
+ GeodeContext geodeContext = mock(GeodeContext.class);
+ BlockingQueue<GeodeEvent> eventBuffer = new LinkedBlockingQueue(100);
+ boolean loadEntireRegion = true;
+ boolean isDurable = false;
+ List<CqEvent> fakeInitialResults = new LinkedList<>();
+ for (int i = 0; i < 10; i++) {
+ fakeInitialResults.add(mock(CqEvent.class));
}
- @Test
- public void whenNotLoadingEntireRegionShouldNotPutInitialResultsIntoEventBuffer() {
- GeodeContext geodeContext = mock(GeodeContext.class);
- BlockingQueue<GeodeEvent> eventBuffer = new LinkedBlockingQueue(100);
- boolean loadEntireRegion = false;
- boolean isDurable = false;
- List<CqEvent> fakeInitialResults = new LinkedList<>();
- for (int i = 0; i < 10; i++) {
- fakeInitialResults.add(mock(CqEvent.class));
- }
+ when(geodeContext.newCqWithInitialResults(anyString(), anyString(), any(), anyBoolean()))
+ .thenReturn(fakeInitialResults);
+ GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
+ task.installListenersToRegion(geodeContext, 1, createEventBufferSupplier(eventBuffer),
+ "testRegion", DEFAULT_CQ_PREFIX, loadEntireRegion, isDurable);
+ assertEquals(10, eventBuffer.size());
+ }
- when(geodeContext.newCqWithInitialResults(anyString(), anyString(), any(), anyBoolean())).thenReturn(fakeInitialResults);
- GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
- task.installListenersToRegion(geodeContext, 1, createEventBufferSupplier(eventBuffer), "testRegion", DEFAULT_CQ_PREFIX, loadEntireRegion, isDurable);
- assertEquals(0, eventBuffer.size());
+ @Test
+ public void whenNotLoadingEntireRegionShouldNotPutInitialResultsIntoEventBuffer() {
+ GeodeContext geodeContext = mock(GeodeContext.class);
+ BlockingQueue<GeodeEvent> eventBuffer = new LinkedBlockingQueue(100);
+ boolean loadEntireRegion = false;
+ boolean isDurable = false;
+ List<CqEvent> fakeInitialResults = new LinkedList<>();
+ for (int i = 0; i < 10; i++) {
+ fakeInitialResults.add(mock(CqEvent.class));
}
- @Test
- public void cqListenerOnEventPopulatesEventsBuffer() {
- GeodeContext geodeContext = mock(GeodeContext.class);
- BlockingQueue<GeodeEvent> eventBuffer = new LinkedBlockingQueue(100);
- boolean loadEntireRegion = false;
- boolean isDurable = false;
+ when(geodeContext.newCqWithInitialResults(anyString(), anyString(), any(), anyBoolean()))
+ .thenReturn(fakeInitialResults);
+ GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
+ task.installListenersToRegion(geodeContext, 1, createEventBufferSupplier(eventBuffer),
+ "testRegion", DEFAULT_CQ_PREFIX, loadEntireRegion, isDurable);
+ assertEquals(0, eventBuffer.size());
+ }
- when(geodeContext.newCqWithInitialResults(anyString(), anyString(), any(), anyBoolean())).thenReturn(new ArrayList());
- GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
- GeodeKafkaSourceListener listener = task.installListenersToRegion(geodeContext, 1, createEventBufferSupplier(eventBuffer), "testRegion", DEFAULT_CQ_PREFIX, loadEntireRegion, isDurable);
+ @Test
+ public void cqListenerOnEventPopulatesEventsBuffer() {
+ GeodeContext geodeContext = mock(GeodeContext.class);
+ BlockingQueue<GeodeEvent> eventBuffer = new LinkedBlockingQueue(100);
+ boolean loadEntireRegion = false;
+ boolean isDurable = false;
- listener.onEvent(mock(CqEvent.class));
- assertEquals(1, eventBuffer.size());
- }
+ when(geodeContext.newCqWithInitialResults(anyString(), anyString(), any(), anyBoolean()))
+ .thenReturn(new ArrayList());
+ GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
+ GeodeKafkaSourceListener listener =
+ task.installListenersToRegion(geodeContext, 1, createEventBufferSupplier(eventBuffer),
+ "testRegion", DEFAULT_CQ_PREFIX, loadEntireRegion, isDurable);
- @Test
- public void readyForEventsIsCalledIfDurable() {
- ClientCache clientCache = mock(ClientCache.class);
+ listener.onEvent(mock(CqEvent.class));
+ assertEquals(1, eventBuffer.size());
+ }
- GeodeContext geodeContext = mock(GeodeContext.class);
- when(geodeContext.getClientCache()).thenReturn(clientCache);
+ @Test
+ public void readyForEventsIsCalledIfDurable() {
+ ClientCache clientCache = mock(ClientCache.class);
- GeodeSourceConnectorConfig config = mock(GeodeSourceConnectorConfig.class);
- when (config.isDurable()).thenReturn(true);
- GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
- task.installOnGeode(config, geodeContext, null, "", false);
- verify(clientCache, times(1)).readyForEvents();
- }
+ GeodeContext geodeContext = mock(GeodeContext.class);
+ when(geodeContext.getClientCache()).thenReturn(clientCache);
- @Test
- public void cqIsInvokedForEveryRegionWithATopic() {
- ClientCache clientCache = mock(ClientCache.class);
+ GeodeSourceConnectorConfig config = mock(GeodeSourceConnectorConfig.class);
+ when(config.isDurable()).thenReturn(true);
+ GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
+ task.installOnGeode(config, geodeContext, null, "", false);
+ verify(clientCache, times(1)).readyForEvents();
+ }
- GeodeContext geodeContext = mock(GeodeContext.class);
- when(geodeContext.getClientCache()).thenReturn(clientCache);
+ @Test
+ public void cqIsInvokedForEveryRegionWithATopic() {
+ ClientCache clientCache = mock(ClientCache.class);
- Map<String, List<String>> regionToTopicsMap = new HashMap<>();
- regionToTopicsMap.put("region1", new ArrayList());
+ GeodeContext geodeContext = mock(GeodeContext.class);
+ when(geodeContext.getClientCache()).thenReturn(clientCache);
- GeodeSourceConnectorConfig config = mock(GeodeSourceConnectorConfig.class);
- when (config.getCqsToRegister()).thenReturn(regionToTopicsMap.keySet());
+ Map<String, List<String>> regionToTopicsMap = new HashMap<>();
+ regionToTopicsMap.put("region1", new ArrayList());
- GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
- task.installOnGeode(config, geodeContext, null, "someCqPrefix", false);
- verify(geodeContext, times(1)).newCq(anyString(), anyString(), any(), anyBoolean());
- }
+ GeodeSourceConnectorConfig config = mock(GeodeSourceConnectorConfig.class);
+ when(config.getCqsToRegister()).thenReturn(regionToTopicsMap.keySet());
- @Test
- public void cqWithInitialResultsIsInvokedForEveryRegionWithATopicIfLoadEntireIsSet() {
- ClientCache clientCache = mock(ClientCache.class);
+ GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
+ task.installOnGeode(config, geodeContext, null, "someCqPrefix", false);
+ verify(geodeContext, times(1)).newCq(anyString(), anyString(), any(), anyBoolean());
+ }
- GeodeContext geodeContext = mock(GeodeContext.class);
- when(geodeContext.getClientCache()).thenReturn(clientCache);
+ @Test
+ public void cqWithInitialResultsIsInvokedForEveryRegionWithATopicIfLoadEntireIsSet() {
+ ClientCache clientCache = mock(ClientCache.class);
- Map<String, List<String>> regionToTopicsMap = new HashMap<>();
- regionToTopicsMap.put("region1", new ArrayList());
+ GeodeContext geodeContext = mock(GeodeContext.class);
+ when(geodeContext.getClientCache()).thenReturn(clientCache);
- GeodeSourceConnectorConfig config = mock(GeodeSourceConnectorConfig.class);
- when (config.getCqsToRegister()).thenReturn(regionToTopicsMap.keySet());
+ Map<String, List<String>> regionToTopicsMap = new HashMap<>();
+ regionToTopicsMap.put("region1", new ArrayList());
- GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
- task.installOnGeode(config, geodeContext, createEventBufferSupplier(new LinkedBlockingQueue<>()), "someCqPrefix", true);
- verify(geodeContext, times(1)).newCqWithInitialResults(anyString(), anyString(), any(), anyBoolean());
- }
+ GeodeSourceConnectorConfig config = mock(GeodeSourceConnectorConfig.class);
+ when(config.getCqsToRegister()).thenReturn(regionToTopicsMap.keySet());
- @Test
- public void readyForEventsIsNotCalledIfNotDurable() {
- ClientCache clientCache = mock(ClientCache.class);
+ GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
+ task.installOnGeode(config, geodeContext,
+ createEventBufferSupplier(new LinkedBlockingQueue<>()), "someCqPrefix", true);
+ verify(geodeContext, times(1)).newCqWithInitialResults(anyString(), anyString(), any(),
+ anyBoolean());
+ }
- GeodeContext geodeContext = mock(GeodeContext.class);
- when(geodeContext.getClientCache()).thenReturn(clientCache);
+ @Test
+ public void readyForEventsIsNotCalledIfNotDurable() {
+ ClientCache clientCache = mock(ClientCache.class);
- GeodeSourceConnectorConfig config = mock(GeodeSourceConnectorConfig.class);
- when (config.isDurable()).thenReturn(false);
- GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
- task.installOnGeode(config, geodeContext, null, "", false);
- verify(clientCache, times(0)).readyForEvents();
- }
+ GeodeContext geodeContext = mock(GeodeContext.class);
+ when(geodeContext.getClientCache()).thenReturn(clientCache);
- @Test
- public void pollReturnsEventsWhenEventBufferHasValues() throws Exception {
-// BlockingQueue<GeodeEvent> eventBuffer = new LinkedBlockingQueue(100);
-// CqEvent cqEvent = mock(CqEvent.class);
-// when(cqEvent.getNewValue()).thenReturn("New Value");
-// GeodeEvent event = mock(GeodeEvent.class);
-// when(event.getEvent()).thenReturn(cqEvent);
-// eventBuffer.add(event);
-//
-// List<String> topics = new ArrayList<>();
-// topics.add("myTopic");
-//
-// GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
-// task.startForTesting(eventBuffer, topics, 1);
-// List<SourceRecord> records = task.poll();
-// assertEquals(1, records.size());
- }
+ GeodeSourceConnectorConfig config = mock(GeodeSourceConnectorConfig.class);
+ when(config.isDurable()).thenReturn(false);
+ GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
+ task.installOnGeode(config, geodeContext, null, "", false);
+ verify(clientCache, times(0)).readyForEvents();
+ }
- @Test
- public void installOnGeodeShouldCallCq() {
- GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
- }
+ @Test
+ public void pollReturnsEventsWhenEventBufferHasValues() throws Exception {
+ // BlockingQueue<GeodeEvent> eventBuffer = new LinkedBlockingQueue(100);
+ // CqEvent cqEvent = mock(CqEvent.class);
+ // when(cqEvent.getNewValue()).thenReturn("New Value");
+ // GeodeEvent event = mock(GeodeEvent.class);
+ // when(event.getEvent()).thenReturn(cqEvent);
+ // eventBuffer.add(event);
+ //
+ // List<String> topics = new ArrayList<>();
+ // topics.add("myTopic");
+ //
+ // GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
+ // task.startForTesting(eventBuffer, topics, 1);
+ // List<SourceRecord> records = task.poll();
+ // assertEquals(1, records.size());
+ }
+
+ @Test
+ public void installOnGeodeShouldCallCq() {
+ GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
+ }
+ @Test
+ public void createSourcePartitionsShouldReturnAMapOfSourcePartitions() {
+ GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
+ List<String> regionNames = Arrays.asList(new String[] {"region1", "region2", "region3"});
+ Map<String, Map<String, String>> sourcePartitions = task.createSourcePartitionsMap(regionNames);
+ assertThat(3, is(sourcePartitions.size()));
+ assertThat(true, is(sourcePartitions.get("region1").get(REGION_PARTITION).equals("region1")));
+ assertThat(true, is(sourcePartitions.get("region2").get(REGION_PARTITION).equals("region2")));
+ assertThat(true, is(sourcePartitions.get("region3").get(REGION_PARTITION).equals("region3")));
+ }
+
+ @Test
+ public void listOfLocatorsShouldBeConfiguredIntoClientCache() {
+
+ }
+
+ @Test
+ public void shouldNotBeDurableIfDurableClientIdIsNull() {
+
+ }
+
+ @Test
+ public void shouldNotCallReadyForEventsIfDurableClientPrefixIsEmpty() {
+
+ }
- @Test
- public void createSourcePartitionsShouldReturnAMapOfSourcePartitions() {
- GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
- List<String> regionNames = Arrays.asList(new String[]{"region1", "region2", "region3"});
- Map<String, Map<String,String>> sourcePartitions = task.createSourcePartitionsMap(regionNames);
- assertThat(3, is(sourcePartitions.size()));
- assertThat(true, is(sourcePartitions.get("region1").get(REGION_PARTITION).equals("region1")));
- assertThat(true, is(sourcePartitions.get("region2").get(REGION_PARTITION).equals("region2")));
- assertThat(true, is(sourcePartitions.get("region3").get(REGION_PARTITION).equals("region3")));
- }
-
- @Test
- public void listOfLocatorsShouldBeConfiguredIntoClientCache() {
-
- }
-
- @Test
- public void shouldNotBeDurableIfDurableClientIdIsNull() {
-
- }
-
- @Test
- public void shouldNotCallReadyForEventsIfDurableClientPrefixIsEmpty() {
-
- }
+ @Test
+ public void cqPrefixShouldBeProperlyCalculatedFromProps() {
+ // GeodeContext geodeContext = mock(GeodeContext.class);
+ // GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
+ }
- @Test
- public void cqPrefixShouldBeProperlyCalculatedFromProps() {
-// GeodeContext geodeContext = mock(GeodeContext.class);
-// GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
- }
-
-
- private EventBufferSupplier createEventBufferSupplier(BlockingQueue<GeodeEvent> eventBuffer) {
- return new EventBufferSupplier() {
- @Override
- public BlockingQueue<GeodeEvent> get() {
- return eventBuffer;
- }
- };
- }
+ private EventBufferSupplier createEventBufferSupplier(BlockingQueue<GeodeEvent> eventBuffer) {
+ return new EventBufferSupplier() {
+ @Override
+ public BlockingQueue<GeodeEvent> get() {
+ return eventBuffer;
+ }
+ };
+ }
}
diff --git a/src/test/java/geode/kafka/source/GeodeKafkaSourceTest.java b/src/test/java/geode/kafka/source/GeodeKafkaSourceTest.java
index 3e61c77..6632d75 100644
--- a/src/test/java/geode/kafka/source/GeodeKafkaSourceTest.java
+++ b/src/test/java/geode/kafka/source/GeodeKafkaSourceTest.java
@@ -14,53 +14,53 @@
*/
package geode.kafka.source;
-import org.junit.Test;
+import static geode.kafka.GeodeConnectorConfig.TASK_ID;
+import static geode.kafka.source.GeodeSourceConnectorConfig.REGION_TO_TOPIC_BINDINGS;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
-import static geode.kafka.GeodeConnectorConfig.TASK_ID;
-import static geode.kafka.source.GeodeSourceConnectorConfig.REGION_TO_TOPIC_BINDINGS;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import org.junit.Test;
public class GeodeKafkaSourceTest {
- @Test
- public void taskConfigsCreatesMaxNumberOfTasks() {
- GeodeKafkaSource source = new GeodeKafkaSource();
- Map<String, String> props = new HashMap();
- props.put(REGION_TO_TOPIC_BINDINGS, "[someRegion:someTopic]");
- source.start(props);
- Collection<Map<String,String>> tasks = source.taskConfigs(5);
- assertEquals(5, tasks.size());
- }
+ @Test
+ public void taskConfigsCreatesMaxNumberOfTasks() {
+ GeodeKafkaSource source = new GeodeKafkaSource();
+ Map<String, String> props = new HashMap();
+ props.put(REGION_TO_TOPIC_BINDINGS, "[someRegion:someTopic]");
+ source.start(props);
+ Collection<Map<String, String>> tasks = source.taskConfigs(5);
+ assertEquals(5, tasks.size());
+ }
- @Test
- public void sourceTaskConfigsAllAssignedEntireRegionToTopicBinding() {
- GeodeKafkaSource source = new GeodeKafkaSource();
- Map<String, String> props = new HashMap();
- props.put(REGION_TO_TOPIC_BINDINGS, "[someRegion:someTopic]");
- source.start(props);
- Collection<Map<String,String>> tasks = source.taskConfigs(5);
- for(Map<String, String> prop : tasks) {
- assertEquals("[someRegion:someTopic]", prop.get(REGION_TO_TOPIC_BINDINGS));
- }
+ @Test
+ public void sourceTaskConfigsAllAssignedEntireRegionToTopicBinding() {
+ GeodeKafkaSource source = new GeodeKafkaSource();
+ Map<String, String> props = new HashMap();
+ props.put(REGION_TO_TOPIC_BINDINGS, "[someRegion:someTopic]");
+ source.start(props);
+ Collection<Map<String, String>> tasks = source.taskConfigs(5);
+ for (Map<String, String> prop : tasks) {
+ assertEquals("[someRegion:someTopic]", prop.get(REGION_TO_TOPIC_BINDINGS));
}
+ }
- @Test
- public void eachTaskHasUniqueTaskIds() {
- GeodeKafkaSource sink = new GeodeKafkaSource();
- Map<String, String> props = new HashMap();
- props.put(REGION_TO_TOPIC_BINDINGS, "[someRegion:someTopic]");
- sink.start(props);
- Collection<Map<String,String>> tasks = sink.taskConfigs(5);
- HashSet<String> seenIds = new HashSet();
- for(Map<String, String> taskProp : tasks) {
- assertTrue(seenIds.add(taskProp.get(TASK_ID)));
- }
+ @Test
+ public void eachTaskHasUniqueTaskIds() {
+ GeodeKafkaSource sink = new GeodeKafkaSource();
+ Map<String, String> props = new HashMap();
+ props.put(REGION_TO_TOPIC_BINDINGS, "[someRegion:someTopic]");
+ sink.start(props);
+ Collection<Map<String, String>> tasks = sink.taskConfigs(5);
+ HashSet<String> seenIds = new HashSet();
+ for (Map<String, String> taskProp : tasks) {
+ assertTrue(seenIds.add(taskProp.get(TASK_ID)));
}
+ }
}
diff --git a/src/test/java/geode/kafka/source/GeodeSourceConnectorConfigTest.java b/src/test/java/geode/kafka/source/GeodeSourceConnectorConfigTest.java
index c75e1b8..a545a72 100644
--- a/src/test/java/geode/kafka/source/GeodeSourceConnectorConfigTest.java
+++ b/src/test/java/geode/kafka/source/GeodeSourceConnectorConfigTest.java
@@ -14,26 +14,26 @@
*/
package geode.kafka.source;
-import org.junit.Test;
-
-import java.util.HashMap;
-import java.util.Map;
-
import static geode.kafka.GeodeConnectorConfig.LOCATORS;
import static geode.kafka.GeodeConnectorConfig.TASK_ID;
import static geode.kafka.source.GeodeSourceConnectorConfig.DURABLE_CLIENT_ID_PREFIX;
import static org.junit.Assert.assertEquals;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.Test;
+
public class GeodeSourceConnectorConfigTest {
- @Test
- public void durableClientIdShouldNotBeSetIfPrefixIsEmpty() {
- Map<String, String> props = new HashMap<>();
- props.put(TASK_ID, "0");
- props.put(DURABLE_CLIENT_ID_PREFIX, "");
- props.put(LOCATORS, "localhost[10334]");
- GeodeSourceConnectorConfig config = new GeodeSourceConnectorConfig(props);
- assertEquals("", config.getDurableClientId());
- }
+ @Test
+ public void durableClientIdShouldNotBeSetIfPrefixIsEmpty() {
+ Map<String, String> props = new HashMap<>();
+ props.put(TASK_ID, "0");
+ props.put(DURABLE_CLIENT_ID_PREFIX, "");
+ props.put(LOCATORS, "localhost[10334]");
+ GeodeSourceConnectorConfig config = new GeodeSourceConnectorConfig(props);
+ assertEquals("", config.getDurableClientId());
+ }
}
diff --git a/src/test/java/geode/kafka/source/SharedEventBufferSupplierTest.java b/src/test/java/geode/kafka/source/SharedEventBufferSupplierTest.java
index b4a429b..87a0e07 100644
--- a/src/test/java/geode/kafka/source/SharedEventBufferSupplierTest.java
+++ b/src/test/java/geode/kafka/source/SharedEventBufferSupplierTest.java
@@ -1,51 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
package geode.kafka.source;
-import org.junit.Test;
-
-import java.util.concurrent.BlockingQueue;
-
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.mockito.Mockito.mock;
+import java.util.concurrent.BlockingQueue;
+
+import org.junit.Test;
+
public class SharedEventBufferSupplierTest {
- @Test
- public void creatingNewSharedEventSupplierShouldCreateInstance() {
- SharedEventBufferSupplier supplier = new SharedEventBufferSupplier(1);
- assertNotNull(supplier.get());
- }
+ @Test
+ public void creatingNewSharedEventSupplierShouldCreateInstance() {
+ SharedEventBufferSupplier supplier = new SharedEventBufferSupplier(1);
+ assertNotNull(supplier.get());
+ }
- @Test
- public void alreadySharedEventSupplierShouldReturnSameInstanceOfEventBuffer() {
- SharedEventBufferSupplier supplier = new SharedEventBufferSupplier(1);
- BlockingQueue<GeodeEvent> queue = supplier.get();
- supplier = new SharedEventBufferSupplier(1);
- assertEquals(queue, supplier.get());
- }
+ @Test
+ public void alreadySharedEventSupplierShouldReturnSameInstanceOfEventBuffer() {
+ SharedEventBufferSupplier supplier = new SharedEventBufferSupplier(1);
+ BlockingQueue<GeodeEvent> queue = supplier.get();
+ supplier = new SharedEventBufferSupplier(1);
+ assertEquals(queue, supplier.get());
+ }
- @Test
- public void newEventBufferShouldBeReflectedInAllSharedSuppliers() {
- SharedEventBufferSupplier supplier = new SharedEventBufferSupplier(1);
- SharedEventBufferSupplier newSupplier = new SharedEventBufferSupplier(2);
- assertEquals(supplier.get(), newSupplier.get());
- }
+ @Test
+ public void newEventBufferShouldBeReflectedInAllSharedSuppliers() {
+ SharedEventBufferSupplier supplier = new SharedEventBufferSupplier(1);
+ SharedEventBufferSupplier newSupplier = new SharedEventBufferSupplier(2);
+ assertEquals(supplier.get(), newSupplier.get());
+ }
- @Test
- public void newEventBufferSuppliedShouldNotBeTheOldQueue() {
- SharedEventBufferSupplier supplier = new SharedEventBufferSupplier(1);
- BlockingQueue<GeodeEvent> queue = supplier.get();
- SharedEventBufferSupplier newSupplier = new SharedEventBufferSupplier(2);
- assertNotEquals(queue, newSupplier.get());
- }
+ @Test
+ public void newEventBufferSuppliedShouldNotBeTheOldQueue() {
+ SharedEventBufferSupplier supplier = new SharedEventBufferSupplier(1);
+ BlockingQueue<GeodeEvent> queue = supplier.get();
+ SharedEventBufferSupplier newSupplier = new SharedEventBufferSupplier(2);
+ assertNotEquals(queue, newSupplier.get());
+ }
- @Test
- public void newEventBufferShouldContainAllEventsFromTheOldSupplier() {
- SharedEventBufferSupplier supplier = new SharedEventBufferSupplier(1);
- GeodeEvent geodeEvent = mock(GeodeEvent.class);
- supplier.get().add(geodeEvent);
- SharedEventBufferSupplier newSupplier = new SharedEventBufferSupplier(2);
- assertEquals(geodeEvent, newSupplier.get().poll());
- }
+ @Test
+ public void newEventBufferShouldContainAllEventsFromTheOldSupplier() {
+ SharedEventBufferSupplier supplier = new SharedEventBufferSupplier(1);
+ GeodeEvent geodeEvent = mock(GeodeEvent.class);
+ supplier.get().add(geodeEvent);
+ SharedEventBufferSupplier newSupplier = new SharedEventBufferSupplier(2);
+ assertEquals(geodeEvent, newSupplier.get().poll());
+ }
}