Code improvements:
* Exceptions being thrown as ConnectException
* Return values are used.
* static analyzer recommendations are implemented.
diff --git a/src/main/java/org/apache/geode/kafka/converter/JsonPdxConverter.java b/src/main/java/org/apache/geode/kafka/converter/JsonPdxConverter.java
index e630b4e..03a6f0a 100644
--- a/src/main/java/org/apache/geode/kafka/converter/JsonPdxConverter.java
+++ b/src/main/java/org/apache/geode/kafka/converter/JsonPdxConverter.java
@@ -28,7 +28,7 @@
public static final String JSON_TYPE_ANNOTATION = "\"@type\"";
// Default value = false
public static final String ADD_TYPE_ANNOTATION_TO_JSON = "add-type-annotation-to-json";
- private Map<String, String> internalConfig = new HashMap<>();
+ final private Map<String, String> internalConfig = new HashMap<>();
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
diff --git a/src/main/java/org/apache/geode/kafka/sink/BatchRecords.java b/src/main/java/org/apache/geode/kafka/sink/BatchRecords.java
index 7974abd..909cd7c 100644
--- a/src/main/java/org/apache/geode/kafka/sink/BatchRecords.java
+++ b/src/main/java/org/apache/geode/kafka/sink/BatchRecords.java
@@ -44,8 +44,8 @@
public void addRemoveOperation(SinkRecord record) {
// if a previous operation added to the update map
- // let's just remove it so we don't do a put and then a remove
- // depending on the order of operations (putAll then removeAll or removeAll or putAll)...
+ // let's just remove it so, we don't do a put and then a remove
+ // depending on the order of operations (putAll then removeAll or putAll)...
// ...we could remove one of the if statements.
if (updateMap.containsKey(record.key())) {
updateMap.remove(record.key());
@@ -56,7 +56,7 @@
public void addUpdateOperation(SinkRecord record, boolean nullValuesMeansRemove) {
// it's assumed the records in are order
- // if so if a previous value was in the remove list
+ // if so then a previous value was in the remove list
// let's not remove it at the end of this operation
if (nullValuesMeansRemove) {
removeList.remove(record.key());
@@ -70,7 +70,7 @@
region.putAll(updateMap);
region.removeAll(removeList);
} else {
- logger.info("Unable to locate proxy region is null");
+ logger.info("Unable to locate a proxy region. Value is null");
}
}
}
diff --git a/src/main/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTask.java b/src/main/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTask.java
index 1f50ea4..873d89b 100644
--- a/src/main/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTask.java
+++ b/src/main/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTask.java
@@ -28,6 +28,7 @@
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionExistsException;
+import org.apache.geode.cache.client.ClientCache;
import org.apache.geode.cache.client.ClientRegionShortcut;
import org.apache.geode.kafka.GeodeContext;
import org.apache.geode.kafka.Version;
@@ -61,13 +62,20 @@
GeodeSinkConnectorConfig geodeConnectorConfig = new GeodeSinkConnectorConfig(props);
configure(geodeConnectorConfig);
geodeContext = new GeodeContext();
- geodeContext.connectClient(geodeConnectorConfig.getLocatorHostPorts(),
- geodeConnectorConfig.getSecurityClientAuthInit(),
- geodeConnectorConfig.getSecurityUserName(),
- geodeConnectorConfig.getSecurityPassword(),
- geodeConnectorConfig.usesSecurity());
+ ClientCache clientCache =
+ geodeContext.connectClient(geodeConnectorConfig.getLocatorHostPorts(),
+ geodeConnectorConfig.getSecurityClientAuthInit(),
+ geodeConnectorConfig.getSecurityUserName(),
+ geodeConnectorConfig.getSecurityPassword(),
+ geodeConnectorConfig.usesSecurity());
+ if (clientCache == null) {
+ throw new ConnectException("Unable start client cache in the sink task");
+ }
regionNameToRegion = createProxyRegions(topicToRegions.values());
} catch (Exception e) {
+ if (e instanceof ConnectException) {
+ throw e;
+ }
throw new ConnectException("Unable to start sink task", e);
}
}
diff --git a/src/main/java/org/apache/geode/kafka/sink/GeodeSinkConnectorConfig.java b/src/main/java/org/apache/geode/kafka/sink/GeodeSinkConnectorConfig.java
index 2ace395..7a60513 100644
--- a/src/main/java/org/apache/geode/kafka/sink/GeodeSinkConnectorConfig.java
+++ b/src/main/java/org/apache/geode/kafka/sink/GeodeSinkConnectorConfig.java
@@ -79,11 +79,7 @@
}
public boolean getNullValueBehavior() {
- if (nullValueBehavior.equals(GeodeSinkConfigurationConstants.NullValueBehavior.REMOVE)) {
- return true;
- } else {
- return false;
- }
+ return nullValueBehavior.equals(GeodeSinkConfigurationConstants.NullValueBehavior.REMOVE);
}
}
diff --git a/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceTask.java b/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceTask.java
index 13e5b60..4ce7371 100644
--- a/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceTask.java
+++ b/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceTask.java
@@ -29,6 +29,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.geode.cache.client.ClientCache;
import org.apache.geode.cache.query.CqAttributes;
import org.apache.geode.cache.query.CqAttributesFactory;
import org.apache.geode.cache.query.CqQuery;
@@ -72,14 +73,17 @@
GeodeSourceConnectorConfig geodeConnectorConfig = new GeodeSourceConnectorConfig(props);
logger.debug("GeodeKafkaSourceTask id:" + geodeConnectorConfig.getTaskId() + " starting");
geodeContext = new GeodeContext();
- geodeContext.connectClient(geodeConnectorConfig.getLocatorHostPorts(),
- geodeConnectorConfig.getDurableClientId(),
- geodeConnectorConfig.getDurableClientTimeout(),
- geodeConnectorConfig.getSecurityClientAuthInit(),
- geodeConnectorConfig.getSecurityUserName(),
- geodeConnectorConfig.getSecurityPassword(),
- geodeConnectorConfig.usesSecurity());
-
+ ClientCache clientCache =
+ geodeContext.connectClient(geodeConnectorConfig.getLocatorHostPorts(),
+ geodeConnectorConfig.getDurableClientId(),
+ geodeConnectorConfig.getDurableClientTimeout(),
+ geodeConnectorConfig.getSecurityClientAuthInit(),
+ geodeConnectorConfig.getSecurityUserName(),
+ geodeConnectorConfig.getSecurityPassword(),
+ geodeConnectorConfig.usesSecurity());
+ if (clientCache == null) {
+ throw new ConnectException("Unable to create client cache in the source task");
+ }
batchSize = geodeConnectorConfig.getBatchSize();
eventBufferSupplier = new SharedEventBufferSupplier(geodeConnectorConfig.getQueueSize());
@@ -92,9 +96,12 @@
loadEntireRegion);
logger.info("Started Apache Geode source task");
} catch (Exception e) {
- e.printStackTrace();
logger.error("Unable to start source task", e);
- throw e;
+ if (e instanceof ConnectException) {
+ throw e;
+ } else {
+ throw new ConnectException(e);
+ }
}
}
diff --git a/src/main/java/org/apache/geode/kafka/utils/EnumValidator.java b/src/main/java/org/apache/geode/kafka/utils/EnumValidator.java
index 1c9e1ff..f7c2a61 100644
--- a/src/main/java/org/apache/geode/kafka/utils/EnumValidator.java
+++ b/src/main/java/org/apache/geode/kafka/utils/EnumValidator.java
@@ -28,7 +28,7 @@
}
public static <T> EnumValidator in(T[] enumerators) {
- Set<String> validValues = new HashSet<String>(enumerators.length);
+ Set<String> validValues = new HashSet<>(enumerators.length);
for (T e : enumerators) {
validValues.add(e.toString().toLowerCase());
}
diff --git a/src/main/resources/kafka-connect-geode-version.properties b/src/main/resources/kafka-connect-geode-version.properties
index 5ea51b3..5efe2ea 100644
--- a/src/main/resources/kafka-connect-geode-version.properties
+++ b/src/main/resources/kafka-connect-geode-version.properties
@@ -14,4 +14,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
-version=${project.version}
\ No newline at end of file
+version=${project.version}
\ No newline at end of file
diff --git a/src/test/java/org/apache/geode/kafka/converter/JsonPdxConverterDUnitTest.java b/src/test/java/org/apache/geode/kafka/converter/JsonPdxConverterDUnitTest.java
index 46470c1..dfb990b 100644
--- a/src/test/java/org/apache/geode/kafka/converter/JsonPdxConverterDUnitTest.java
+++ b/src/test/java/org/apache/geode/kafka/converter/JsonPdxConverterDUnitTest.java
@@ -57,16 +57,16 @@
public class JsonPdxConverterDUnitTest {
@Rule
- public ClusterStartupRule clusterStartupRule = new ClusterStartupRule(3);
+ final public ClusterStartupRule clusterStartupRule = new ClusterStartupRule(3);
@Rule
public TestName testName = new TestName();
@ClassRule
- public static TemporaryFolder temporaryFolderForZooKeeper = new TemporaryFolder();
+ final public static TemporaryFolder temporaryFolderForZooKeeper = new TemporaryFolder();
@Rule
- public TemporaryFolder temporaryFolderForOffset = new TemporaryFolder();
+ final public TemporaryFolder temporaryFolderForOffset = new TemporaryFolder();
@BeforeClass
public static void setup()
diff --git a/src/test/java/org/apache/geode/kafka/source/GeodeKafkaSourceTaskTest.java b/src/test/java/org/apache/geode/kafka/source/GeodeKafkaSourceTaskTest.java
index 411316c..92c78e2 100644
--- a/src/test/java/org/apache/geode/kafka/source/GeodeKafkaSourceTaskTest.java
+++ b/src/test/java/org/apache/geode/kafka/source/GeodeKafkaSourceTaskTest.java
@@ -71,11 +71,6 @@
public void whenNotLoadingEntireRegionShouldNotPutInitialResultsIntoEventBuffer() {
GeodeContext geodeContext = mock(GeodeContext.class);
BlockingQueue<GeodeEvent> eventBuffer = new LinkedBlockingQueue<>(100);
- CqResults<Object> fakeInitialResults = new ResultsBag();
- for (int i = 0; i < 10; i++) {
- fakeInitialResults.add(mock(CqEvent.class));
- }
-
when(geodeContext.newCq(anyString(), anyString(), any(), anyBoolean()))
.thenReturn(mock(CqQuery.class));
GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();