Spotless enabled
* check is as a part of the validate phase.
* license headers are inserted by spotless if not included in java files.
* All the setting files are same as used in the Geode project.
diff --git a/etc/license-header b/etc/license-header
new file mode 100644
index 0000000..4a4e671
--- /dev/null
+++ b/etc/license-header
@@ -0,0 +1,14 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 6c2676b..c795830 100644
--- a/pom.xml
+++ b/pom.xml
@@ -49,6 +49,7 @@
<awaitility.version>3.1.6</awaitility.version>
<maven-plugin.version>3.8.1</maven-plugin.version>
<zookeeper.version>3.5.7</zookeeper.version>
+ <spotless.version>1.27.0</spotless.version>
<confluent.maven.repo>http://packages.confluent.io/maven/</confluent.maven.repo>
</properties>
@@ -165,6 +166,34 @@
<build>
<plugins>
<plugin>
+ <groupId>com.diffplug.spotless</groupId>
+ <artifactId>spotless-maven-plugin</artifactId>
+ <version>${spotless.version}</version>
+ <executions>
+ <execution>
+ <phase>validate</phase>
+ <goals>
+ <goal>check</goal>
+ </goals>
+ </execution>
+ </executions>
+ <configuration>
+ <java>
+ <licenseHeader>
+ <file>etc/license-header</file>
+ </licenseHeader>
+ <eclipse>
+ <file>etc/eclipse-java-google-style.xml</file>
+ <version>4.7.1</version>
+ </eclipse>
+ <removeUnusedImports/>
+ <importOrder>
+ <file>etc/eclipseOrganizeImports.importorder</file>
+ </importOrder>
+ </java>
+ </configuration>
+ </plugin>
+ <plugin>
<groupId>io.confluent</groupId>
<version>0.10.0</version>
<artifactId>kafka-connect-maven-plugin</artifactId>
diff --git a/src/main/java/org/apache/geode/kafka/GeodeConnectorConfig.java b/src/main/java/org/apache/geode/kafka/GeodeConnectorConfig.java
index 556678f..8043dbb 100644
--- a/src/main/java/org/apache/geode/kafka/GeodeConnectorConfig.java
+++ b/src/main/java/org/apache/geode/kafka/GeodeConnectorConfig.java
@@ -101,8 +101,7 @@
GEODE_GROUP,
2,
ConfigDef.Width.LONG,
- LOCATORS_DISPLAY_NAME
- );
+ LOCATORS_DISPLAY_NAME);
configDef.define(
SECURITY_USER,
ConfigDef.Type.STRING,
@@ -112,8 +111,7 @@
GEODE_GROUP,
3,
ConfigDef.Width.MEDIUM,
- SECURITY_USER_DISPLAY_NAME
- );
+ SECURITY_USER_DISPLAY_NAME);
configDef.define(
SECURITY_PASSWORD,
ConfigDef.Type.PASSWORD,
@@ -133,8 +131,7 @@
GEODE_GROUP,
5,
ConfigDef.Width.LONG,
- SECURITY_CLIENT_AUTH_INIT_DISPLAY_NAME
- );
+ SECURITY_CLIENT_AUTH_INIT_DISPLAY_NAME);
return configDef;
}
diff --git a/src/main/java/org/apache/geode/kafka/GeodeContext.java b/src/main/java/org/apache/geode/kafka/GeodeContext.java
index 5e377e3..f7f8aa0 100644
--- a/src/main/java/org/apache/geode/kafka/GeodeContext.java
+++ b/src/main/java/org/apache/geode/kafka/GeodeContext.java
@@ -39,16 +39,16 @@
public GeodeContext() {}
public ClientCache connectClient(List<LocatorHostPort> locatorHostPortList,
- String durableClientId, String durableClientTimeout, String securityAuthInit,
- String securityUserName, String securityPassword, boolean usesSecurity) {
+ String durableClientId, String durableClientTimeout, String securityAuthInit,
+ String securityUserName, String securityPassword, boolean usesSecurity) {
clientCache = createClientCache(locatorHostPortList, durableClientId, durableClientTimeout,
securityAuthInit, securityUserName, securityPassword, usesSecurity);
return clientCache;
}
public ClientCache connectClient(List<LocatorHostPort> locatorHostPortList,
- String securityAuthInit, String securityUserName, String securityPassword,
- boolean usesSecurity) {
+ String securityAuthInit, String securityUserName, String securityPassword,
+ boolean usesSecurity) {
clientCache = createClientCache(locatorHostPortList, "", "", securityAuthInit, securityUserName,
securityPassword, usesSecurity);
return clientCache;
@@ -59,8 +59,8 @@
}
public ClientCache createClientCache(List<LocatorHostPort> locators, String durableClientName,
- String durableClientTimeOut, String securityAuthInit, String securityUserName,
- String securityPassword, boolean usesSecurity) {
+ String durableClientTimeOut, String securityAuthInit, String securityUserName,
+ String securityPassword, boolean usesSecurity) {
try {
ClientCacheFactory ccf = new ClientCacheFactory();
@@ -74,7 +74,7 @@
}
if (!durableClientName.equals("")) {
ccf.set("durable-client-id", durableClientName)
- .set("durable-client-timeout", durableClientTimeOut);
+ .set("durable-client-timeout", durableClientTimeOut);
}
// currently we only allow using the default pool.
// If we ever want to allow adding multiple pools we'll have to configure pool factories
@@ -85,8 +85,8 @@
}
return ccf.create();
} catch (Exception e) {
- throw new ConnectException(
- "Unable to create an client cache connected to Apache Geode cluster");
+ throw new ConnectException(
+ "Unable to create an client cache connected to Apache Geode cluster");
}
}
@@ -102,8 +102,8 @@
}
public <E> CqResults<E> newCqWithInitialResults(String name, String query,
- CqAttributes cqAttributes,
- boolean isDurable) throws ConnectException {
+ CqAttributes cqAttributes,
+ boolean isDurable) throws ConnectException {
try {
CqQuery cq = clientCache.getQueryService().newCq(name, query, cqAttributes, isDurable);
return cq.executeWithInitialResults();
diff --git a/src/main/java/org/apache/geode/kafka/security/SystemPropertyAuthInit.java b/src/main/java/org/apache/geode/kafka/security/SystemPropertyAuthInit.java
index 117b28e..4f3e414 100644
--- a/src/main/java/org/apache/geode/kafka/security/SystemPropertyAuthInit.java
+++ b/src/main/java/org/apache/geode/kafka/security/SystemPropertyAuthInit.java
@@ -24,7 +24,7 @@
public class SystemPropertyAuthInit implements AuthInitialize {
@Override
public Properties getCredentials(Properties securityProps, DistributedMember server,
- boolean isPeer) throws AuthenticationFailedException {
+ boolean isPeer) throws AuthenticationFailedException {
Properties extractedProperties = new Properties();
extractedProperties.put("security-username", securityProps.get("security-username"));
extractedProperties.put("security-password", securityProps.get("security-password"));
diff --git a/src/main/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTask.java b/src/main/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTask.java
index ef95450..dddb28b 100644
--- a/src/main/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTask.java
+++ b/src/main/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTask.java
@@ -20,7 +20,6 @@
import java.util.Map;
import java.util.stream.Collectors;
-import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.slf4j.Logger;
@@ -28,7 +27,6 @@
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionExistsException;
-import org.apache.geode.cache.client.ClientCache;
import org.apache.geode.cache.client.ClientRegionShortcut;
import org.apache.geode.kafka.GeodeContext;
@@ -62,10 +60,10 @@
configure(geodeConnectorConfig);
geodeContext = new GeodeContext();
geodeContext.connectClient(geodeConnectorConfig.getLocatorHostPorts(),
- geodeConnectorConfig.getSecurityClientAuthInit(),
- geodeConnectorConfig.getSecurityUserName(),
- geodeConnectorConfig.getSecurityPassword(),
- geodeConnectorConfig.usesSecurity());
+ geodeConnectorConfig.getSecurityClientAuthInit(),
+ geodeConnectorConfig.getSecurityUserName(),
+ geodeConnectorConfig.getSecurityPassword(),
+ geodeConnectorConfig.usesSecurity());
regionNameToRegion = createProxyRegions(topicToRegions.values());
} catch (Exception e) {
logger.error("Unable to start sink task", e);
@@ -99,7 +97,7 @@
}
private void updateBatchForRegionByTopic(SinkRecord sinkRecord,
- Map<String, BatchRecords> batchRecordsMap) {
+ Map<String, BatchRecords> batchRecordsMap) {
Collection<String> regionsToUpdate = topicToRegions.get(sinkRecord.topic());
for (String region : regionsToUpdate) {
updateBatchRecordsForRecord(sinkRecord, batchRecordsMap, region);
@@ -107,7 +105,7 @@
}
private void updateBatchRecordsForRecord(SinkRecord record,
- Map<String, BatchRecords> batchRecordsMap, String region) {
+ Map<String, BatchRecords> batchRecordsMap, String region) {
BatchRecords batchRecords = batchRecordsMap.get(region);
if (batchRecords == null) {
batchRecords = new BatchRecords();
diff --git a/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSource.java b/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSource.java
index cdbe60f..676cd46 100644
--- a/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSource.java
+++ b/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSource.java
@@ -15,9 +15,9 @@
package org.apache.geode.kafka.source;
import static org.apache.geode.kafka.source.GeodeSourceConnectorConfig.SOURCE_CONFIG_DEF;
+import static org.apache.geode.kafka.utils.GeodeConfigurationConstants.TASK_ID;
import static org.apache.geode.kafka.utils.GeodeSourceConfigurationConstants.CQS_TO_REGISTER;
import static org.apache.geode.kafka.utils.GeodeSourceConfigurationConstants.REGION_TO_TOPIC_BINDINGS;
-import static org.apache.geode.kafka.utils.GeodeConfigurationConstants.TASK_ID;
import java.util.ArrayList;
import java.util.HashMap;
diff --git a/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceTask.java b/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceTask.java
index a04ea5e..9223ee4 100644
--- a/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceTask.java
+++ b/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceTask.java
@@ -71,12 +71,12 @@
logger.debug("GeodeKafkaSourceTask id:" + geodeConnectorConfig.getTaskId() + " starting");
geodeContext = new GeodeContext();
geodeContext.connectClient(geodeConnectorConfig.getLocatorHostPorts(),
- geodeConnectorConfig.getDurableClientId(),
- geodeConnectorConfig.getDurableClientTimeout(),
- geodeConnectorConfig.getSecurityClientAuthInit(),
- geodeConnectorConfig.getSecurityUserName(),
- geodeConnectorConfig.getSecurityPassword(),
- geodeConnectorConfig.usesSecurity());
+ geodeConnectorConfig.getDurableClientId(),
+ geodeConnectorConfig.getDurableClientTimeout(),
+ geodeConnectorConfig.getSecurityClientAuthInit(),
+ geodeConnectorConfig.getSecurityUserName(),
+ geodeConnectorConfig.getSecurityPassword(),
+ geodeConnectorConfig.usesSecurity());
batchSize = geodeConnectorConfig.getBatchSize();
eventBufferSupplier = new SharedEventBufferSupplier(geodeConnectorConfig.getQueueSize());
@@ -120,7 +120,7 @@
}
void installOnGeode(GeodeSourceConnectorConfig geodeConnectorConfig, GeodeContext geodeContext,
- EventBufferSupplier eventBuffer, String cqPrefix, boolean loadEntireRegion) {
+ EventBufferSupplier eventBuffer, String cqPrefix, boolean loadEntireRegion) {
boolean isDurable = geodeConnectorConfig.isDurable();
int taskId = geodeConnectorConfig.getTaskId();
for (String region : geodeConnectorConfig.getCqsToRegister()) {
@@ -133,8 +133,8 @@
}
GeodeKafkaSourceListener installListenersToRegion(GeodeContext geodeContext, int taskId,
- EventBufferSupplier eventBuffer, String regionName, String cqPrefix, boolean loadEntireRegion,
- boolean isDurable) {
+ EventBufferSupplier eventBuffer, String regionName, String cqPrefix, boolean loadEntireRegion,
+ boolean isDurable) {
CqAttributesFactory cqAttributesFactory = new CqAttributesFactory();
GeodeKafkaSourceListener listener = new GeodeKafkaSourceListener(eventBuffer, regionName);
cqAttributesFactory.addCqListener(listener);
diff --git a/src/main/java/org/apache/geode/kafka/source/GeodeSourceConnectorConfig.java b/src/main/java/org/apache/geode/kafka/source/GeodeSourceConnectorConfig.java
index 60ea75c..4210162 100644
--- a/src/main/java/org/apache/geode/kafka/source/GeodeSourceConnectorConfig.java
+++ b/src/main/java/org/apache/geode/kafka/source/GeodeSourceConnectorConfig.java
@@ -141,8 +141,7 @@
SOURCE_GROUP,
5,
ConfigDef.Width.MEDIUM,
- CQ_PREFIX_DISPLAY_NAME
- );
+ CQ_PREFIX_DISPLAY_NAME);
configDef.define(
BATCH_SIZE,
@@ -153,8 +152,7 @@
SOURCE_GROUP,
6,
ConfigDef.Width.MEDIUM,
- BATCH_SIZE_DISPLAY_NAME
- );
+ BATCH_SIZE_DISPLAY_NAME);
configDef.define(
QUEUE_SIZE,
diff --git a/src/main/java/org/apache/geode/kafka/utils/GeodeConfigurationConstants.java b/src/main/java/org/apache/geode/kafka/utils/GeodeConfigurationConstants.java
index 83f23e2..d75f3ac 100644
--- a/src/main/java/org/apache/geode/kafka/utils/GeodeConfigurationConstants.java
+++ b/src/main/java/org/apache/geode/kafka/utils/GeodeConfigurationConstants.java
@@ -1,3 +1,17 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
package org.apache.geode.kafka.utils;
import org.apache.kafka.common.config.types.Password;
@@ -6,9 +20,9 @@
/**
* GEODE SPECIFIC CONFIGURATION
*/
- //Identifier for each task
+ // Identifier for each task
public static final String TASK_ID = "GEODE_TASK_ID"; // One config per task
- //Specifies which Locators to connect to Apache Geode
+ // Specifies which Locators to connect to Apache Geode
public static final String LOCATORS = "locators";
public static final String DEFAULT_LOCATOR = "localhost[10334]";
public static final String SECURITY_CLIENT_AUTH_INIT = "security-client-auth-init";
@@ -17,15 +31,13 @@
public static final String SECURITY_USER = "security-username";
public static final String SECURITY_PASSWORD = "security-password";
public static final String TASK_ID_DOCUMENTATION = "Internally used to identify each task";
- public static final String
- LOCATORS_DOCUMENTATION =
+ public static final String LOCATORS_DOCUMENTATION =
"A comma separated string of locators that configure which locators to connect to";
- public static final String
- SECURITY_USER_DOCUMENTATION =
+ public static final String SECURITY_USER_DOCUMENTATION =
"Supply a username to be used to authenticate with Geode. Will automatically set the security-client-auth-init to use a SystemPropertyAuthInit if one isn't supplied by the user";
- public static final String SECURITY_PASSWORD_DOCUMENTATION = "Supply a password to be used to authenticate with Geode";
- public static final String
- SECURITY_CLIENT_AUTH_INIT_DOCUMENTATION =
+ public static final String SECURITY_PASSWORD_DOCUMENTATION =
+ "Supply a password to be used to authenticate with Geode";
+ public static final String SECURITY_CLIENT_AUTH_INIT_DOCUMENTATION =
"Point to the Java class that implements the [AuthInitialize Interface](https://geode.apache.org/docs/guide/19/managing/security/implementing_authentication.html)";
public static final String GEODE_GROUP = "Geode-Configurations";
public static final String SECURITY_PASSWORD_DISPLAY_NAME = "Apache Geode Password";
diff --git a/src/main/java/org/apache/geode/kafka/utils/GeodeSinkConfigurationConstants.java b/src/main/java/org/apache/geode/kafka/utils/GeodeSinkConfigurationConstants.java
index 312e851..1c550b5 100644
--- a/src/main/java/org/apache/geode/kafka/utils/GeodeSinkConfigurationConstants.java
+++ b/src/main/java/org/apache/geode/kafka/utils/GeodeSinkConfigurationConstants.java
@@ -1,3 +1,17 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
package org.apache.geode.kafka.utils;
public class GeodeSinkConfigurationConstants {
@@ -8,11 +22,9 @@
public static final String DEFAULT_TOPIC_TO_REGION_BINDING = "[gkcTopic:gkcRegion]";
public static final String NULL_VALUES_MEAN_REMOVE = "null-values-mean-remove";
public static final String DEFAULT_NULL_VALUES_MEAN_REMOVE = "true";
- public static final String
- NULL_VALUES_MEAN_REMOVE_DOCUMENTATION =
+ public static final String NULL_VALUES_MEAN_REMOVE_DOCUMENTATION =
"If set to true, when topics send a SinkRecord with a null value, we will convert to an operation similar to region.remove instead of putting a null value into the region";
- public static final String
- TOPIC_TO_REGION_BINDINGS_DOCUMENTATION =
+ public static final String TOPIC_TO_REGION_BINDINGS_DOCUMENTATION =
"A comma separated list of \"one topic to many regions\" bindings. Each binding is surrounded by brackets. For example \"[topicName:regionName], [anotherTopic: regionName, anotherRegion]";
public static final String SINK_GROUP = "Sink-Configurations";
public final static String TOPIC_TO_REGION_BINDINGS_DISPLAY_NAME = "Topic to region mapping";
diff --git a/src/main/java/org/apache/geode/kafka/utils/GeodeSourceConfigurationConstants.java b/src/main/java/org/apache/geode/kafka/utils/GeodeSourceConfigurationConstants.java
index ebaec95..a43ecf6 100644
--- a/src/main/java/org/apache/geode/kafka/utils/GeodeSourceConfigurationConstants.java
+++ b/src/main/java/org/apache/geode/kafka/utils/GeodeSourceConfigurationConstants.java
@@ -1,3 +1,17 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
package org.apache.geode.kafka.utils;
public class GeodeSourceConfigurationConstants {
@@ -11,7 +25,7 @@
public static final String DEFAULT_DURABLE_CLIENT_TIMEOUT = "60000";
public static final String CQ_PREFIX = "cq-prefix";
public static final String DEFAULT_CQ_PREFIX = "cqForGeodeKafka";
- //Used as a key for source partitions
+ // Used as a key for source partitions
public static final String REGION_PARTITION = "regionPartition";
public static final String REGION_TO_TOPIC_BINDINGS = "region-to-topics";
public static final String DEFAULT_REGION_TO_TOPIC_BINDING = "[gkcRegion:gkcTopic]";
@@ -23,25 +37,21 @@
public static final String DEFAULT_QUEUE_SIZE = "10000";
public static final String LOAD_ENTIRE_REGION = "load-entire-region";
public static final String DEFAULT_LOAD_ENTIRE_REGION = "false";
- public static final String
- CQS_TO_REGISTER_DOCUMENTATION =
+ public static final String CQS_TO_REGISTER_DOCUMENTATION =
"Internally created and used parameter, for signalling a task to register CQs on Apache Geode";
- public static final String
- REGION_TO_TOPIC_BINDINGS_DOCUMENTATION =
+ public static final String REGION_TO_TOPIC_BINDINGS_DOCUMENTATION =
"A comma separated list of \"one region to many topics\" mappings. Each mapping is surrounded by brackets. For example \"[regionName:topicName], \"[anotherRegion: topicName, anotherTopic]\"";
- public static final String
- DURABLE_CLIENT_ID_PREFIX_DOCUMENTATION =
+ public static final String DURABLE_CLIENT_ID_PREFIX_DOCUMENTATION =
"Prefix string for tasks to append to when registering as a durable client. If empty string, will not register as a durable client";
- public static final String
- LOAD_ENTIRE_REGION_DOCUMENTATION =
+ public static final String LOAD_ENTIRE_REGION_DOCUMENTATION =
"Determines if we should queue up all entries that currently exist in a region. This allows us to copy existing region data. Will be replayed whenever a task needs to re-register a CQ";
- public static final String
- DURABLE_CLIENT_TIME_OUT_DOCUMENTATION =
+ public static final String DURABLE_CLIENT_TIME_OUT_DOCUMENTATION =
"How long in milliseconds to persist values in Geode's durable queue before the queue is invalidated";
- public static final String CQ_PREFIX_DOCUMENTATION = "Prefix string to identify Connector CQ's on a Geode server";
- public static final String BATCH_SIZE_DOCUMENTATION = "Maximum number of records to return on each poll";
- public static final String
- QUEUE_SIZE_DOCUMENTATION =
+ public static final String CQ_PREFIX_DOCUMENTATION =
+ "Prefix string to identify Connector CQ's on a Geode server";
+ public static final String BATCH_SIZE_DOCUMENTATION =
+ "Maximum number of records to return on each poll";
+ public static final String QUEUE_SIZE_DOCUMENTATION =
"Maximum number of entries in the connector queue before backing up all Geode CQ listeners sharing the task queue ";
public static final String SOURCE_GROUP = "Source-Configuration";
public static final String CQS_TO_REGISTER_DISPLAY_NAME = "Continuous Queries (CQ) to register";
diff --git a/src/test/java/org/apache/geode/kafka/converter/JsonPdxConverterDUnitTest.java b/src/test/java/org/apache/geode/kafka/converter/JsonPdxConverterDUnitTest.java
index e05a962..46470c1 100644
--- a/src/test/java/org/apache/geode/kafka/converter/JsonPdxConverterDUnitTest.java
+++ b/src/test/java/org/apache/geode/kafka/converter/JsonPdxConverterDUnitTest.java
@@ -48,7 +48,6 @@
import org.apache.geode.kafka.utilities.KafkaLocalCluster;
import org.apache.geode.kafka.utilities.TestObject;
import org.apache.geode.kafka.utilities.WorkerAndHerderCluster;
-import org.apache.geode.pdx.FieldType;
import org.apache.geode.pdx.PdxInstance;
import org.apache.geode.pdx.PdxInstanceFactory;
import org.apache.geode.pdx.ReflectionBasedAutoSerializer;
diff --git a/src/test/java/org/apache/geode/kafka/utilities/WorkerAndHerderCluster.java b/src/test/java/org/apache/geode/kafka/utilities/WorkerAndHerderCluster.java
index 000842e..eaf1a2a 100644
--- a/src/test/java/org/apache/geode/kafka/utilities/WorkerAndHerderCluster.java
+++ b/src/test/java/org/apache/geode/kafka/utilities/WorkerAndHerderCluster.java
@@ -25,8 +25,8 @@
}
public void start(String maxTasks, String sourceRegion, String sinkRegion, String sourceTopic,
- String sinkTopic, String offsetPath, String locatorString, String keyConverter,
- String keyConverterArgs, String valueConverter, String valueConverterArgs)
+ String sinkTopic, String offsetPath, String locatorString, String keyConverter,
+ String keyConverterArgs, String valueConverter, String valueConverterArgs)
throws IOException {
String[] args = new String[] {maxTasks, sourceRegion, sinkRegion, sourceTopic, sinkTopic,
offsetPath, locatorString, keyConverter, keyConverterArgs, valueConverter,