RYA-467 responding to code review. Closes #286
diff --git a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/RunQueryCommand.java b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/RunQueryCommand.java
index edcc252..ed513ec 100644
--- a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/RunQueryCommand.java
+++ b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/RunQueryCommand.java
@@ -19,7 +19,6 @@
package org.apache.rya.streams.client.command;
import static java.util.Objects.requireNonNull;
-import static org.apache.rya.streams.kafka.interactor.KafkaTopicPropertiesBuilder.CLEANUP_POLICY_COMPACT;
import java.util.HashSet;
import java.util.Optional;
@@ -47,6 +46,7 @@
import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
import edu.umd.cs.findbugs.annotations.NonNull;
+import kafka.log.LogConfig;
/**
* A command that runs a Rya Streams processing topology on the node the client is executed on until it has finished.
@@ -141,7 +141,7 @@
topics.add( KafkaTopics.queryResultsTopic(params.ryaInstance, queryId) );
final Properties topicProps = new KafkaTopicPropertiesBuilder()
- .setCleanupPolicy(CLEANUP_POLICY_COMPACT)
+ .setCleanupPolicy(LogConfig.Compact())
.build();
KafkaTopics.createTopics(params.zookeeperServers, topics, 1, 1, Optional.of(topicProps));
diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaTopicPropertiesBuilder.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaTopicPropertiesBuilder.java
index 7df9891..b5001dd 100644
--- a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaTopicPropertiesBuilder.java
+++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaTopicPropertiesBuilder.java
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
package org.apache.rya.streams.kafka.interactor;
import static java.util.Objects.requireNonNull;
@@ -5,6 +23,10 @@
import java.util.Optional;
import java.util.Properties;
+import org.apache.kafka.common.config.ConfigException;
+
+import kafka.log.LogConfig;
+
/**
* Properties builder to be used when creating new Kafka Topics.
*
@@ -12,15 +34,11 @@
* {@link https://kafka.apache.org/documentation/#topicconfigs}
*/
public class KafkaTopicPropertiesBuilder {
- /*----- Cleanup Policy -----*/
- public static final String CLEANUP_POLICY_KEY = "cleanup.policy";
- public static final String CLEANUP_POLICY_DELETE = "cleanup.policy";
- public static final String CLEANUP_POLICY_COMPACT = "cleanup.policy";
-
-
private Optional<String> cleanupPolicy;
/**
* Sets the cleanup.policy of the Kafka Topic.
+ * Valid properties are:
+ * {@link LogConfig#compact()} and {@link LogConfig#Delete()}
*
* @param policy - The cleanup policy to use.
* @return The builder.
@@ -33,14 +51,16 @@
/**
* Builds the Kafka topic properties.
* @return The {@link Properties} of the Kafka Topic.
+ * @throws ConfigException - If any of the properties are misconfigured.
*/
- public Properties build() {
+ public Properties build() throws ConfigException {
final Properties props = new Properties();
if(cleanupPolicy.isPresent()) {
- props.setProperty(CLEANUP_POLICY_KEY, cleanupPolicy.get());
+ props.setProperty(LogConfig.CleanupPolicyProp(), cleanupPolicy.get());
}
+ LogConfig.validate(props);
return props;
}
}
diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaTopicPropertiesBuilderTest.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaTopicPropertiesBuilderTest.java
new file mode 100644
index 0000000..1550a16
--- /dev/null
+++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaTopicPropertiesBuilderTest.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.kafka.interactor;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Properties;
+
+import org.apache.kafka.common.config.ConfigException;
+import org.junit.Test;
+
+import kafka.log.LogConfig;
+
+public class KafkaTopicPropertiesBuilderTest {
+
+ @Test(expected=ConfigException.class)
+ public void invalidProperty() {
+ new KafkaTopicPropertiesBuilder()
+ .setCleanupPolicy("invalid")
+ .build();
+ }
+
+ @Test
+ public void validProperty() {
+ final Properties props = new KafkaTopicPropertiesBuilder()
+ .setCleanupPolicy(LogConfig.Compact())
+ .build();
+
+ final Properties expected = new Properties();
+ expected.setProperty(LogConfig.CleanupPolicyProp(), LogConfig.Compact());
+ assertEquals(expected, props);
+ }
+}