RYA-467 update topic cleanup.policy

Added a topic properties builder.  Only one
property is currently in the builder, but
adding others should be easy.
diff --git a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/RunQueryCommand.java b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/RunQueryCommand.java
index f9a9458..edcc252 100644
--- a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/RunQueryCommand.java
+++ b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/RunQueryCommand.java
@@ -19,9 +19,11 @@
 package org.apache.rya.streams.client.command;
 
 import static java.util.Objects.requireNonNull;
+import static org.apache.rya.streams.kafka.interactor.KafkaTopicPropertiesBuilder.CLEANUP_POLICY_COMPACT;
 
 import java.util.HashSet;
 import java.util.Optional;
+import java.util.Properties;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
@@ -33,6 +35,7 @@
 import org.apache.rya.streams.client.RyaStreamsCommand;
 import org.apache.rya.streams.kafka.KafkaTopics;
 import org.apache.rya.streams.kafka.interactor.KafkaRunQuery;
+import org.apache.rya.streams.kafka.interactor.KafkaTopicPropertiesBuilder;
 import org.apache.rya.streams.kafka.queries.KafkaQueryChangeLogFactory;
 import org.apache.rya.streams.kafka.topology.TopologyFactory;
 
@@ -136,7 +139,11 @@
                 final Set<String> topics = new HashSet<>();
                 topics.add( KafkaTopics.statementsTopic(params.ryaInstance) );
                 topics.add( KafkaTopics.queryResultsTopic(params.ryaInstance, queryId) );
-                KafkaTopics.createTopics(params.zookeeperServers, topics, 1, 1);
+
+                final Properties topicProps = new KafkaTopicPropertiesBuilder()
+                    .setCleanupPolicy(CLEANUP_POLICY_COMPACT)
+                    .build();
+                KafkaTopics.createTopics(params.zookeeperServers, topics, 1, 1, Optional.of(topicProps));
 
                 // Run the query that uses those topics.
                 final KafkaRunQuery runQuery = new KafkaRunQuery(
diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaTopics.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaTopics.java
index f0cc842..e445f47 100644
--- a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaTopics.java
+++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaTopics.java
@@ -127,12 +127,14 @@
      * @param topicNames - The topics that will be created. (not null)
      * @param partitions - The number of partitions that each of the topics will have.
      * @param replicationFactor - The replication factor of the topics that are created.
+     * @param topicProperties - The optional properties of the topics to create.
      */
     public static void createTopics(
             final String zookeeperServers,
             final Set<String> topicNames,
             final int partitions,
-            final int replicationFactor) {
+            final int replicationFactor,
+            final Optional<Properties> topicProperties) {
         requireNonNull(zookeeperServers);
         requireNonNull(topicNames);
 
@@ -141,7 +143,7 @@
             zkUtils = ZkUtils.apply(new ZkClient(zookeeperServers, 30000, 30000, ZKStringSerializer$.MODULE$), false);
             for(final String topicName : topicNames) {
                 if(!AdminUtils.topicExists(zkUtils, topicName)) {
-                    AdminUtils.createTopic(zkUtils, topicName, partitions, replicationFactor, new Properties(), RackAwareMode.Disabled$.MODULE$);
+                    AdminUtils.createTopic(zkUtils, topicName, partitions, replicationFactor, topicProperties.orElse(new Properties()), RackAwareMode.Disabled$.MODULE$);
                 }
             }
         }
diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/CreateKafkaTopic.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/CreateKafkaTopic.java
index 771e1c8..eb6c9c4 100644
--- a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/CreateKafkaTopic.java
+++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/CreateKafkaTopic.java
@@ -20,6 +20,8 @@
 
 import static java.util.Objects.requireNonNull;
 
+import java.util.Optional;
+import java.util.Properties;
 import java.util.Set;
 
 import org.apache.rya.streams.kafka.KafkaTopics;
@@ -50,11 +52,13 @@
      * @param topicNames - The topics that will be created. (not null)
      * @param partitions - The number of partitions that each of the topics will have.
      * @param replicationFactor - The replication factor of the topics that are created.
+     * @param topicProperties - The optional properties of the topics to create.
      */
     public void createTopics(
             final Set<String> topicNames,
             final int partitions,
-            final int replicationFactor) {
-        KafkaTopics.createTopics(zookeeperServers, topicNames, partitions, replicationFactor);
+            final int replicationFactor,
+            final Optional<Properties> topicProperties) {
+        KafkaTopics.createTopics(zookeeperServers, topicNames, partitions, replicationFactor, topicProperties);
     }
 }
\ No newline at end of file
diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaTopicPropertiesBuilder.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaTopicPropertiesBuilder.java
new file mode 100644
index 0000000..7df9891
--- /dev/null
+++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaTopicPropertiesBuilder.java
@@ -0,0 +1,46 @@
+package org.apache.rya.streams.kafka.interactor;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Optional;
+import java.util.Properties;
+
+/**
+ * Properties builder to be used when creating new Kafka Topics.
+ *
+ * Descriptions of properties can be found at
+ * {@link https://kafka.apache.org/documentation/#topicconfigs}
+ */
+public class KafkaTopicPropertiesBuilder {
+    /*----- Cleanup Policy -----*/
+    public static final String CLEANUP_POLICY_KEY = "cleanup.policy";
+    public static final String CLEANUP_POLICY_DELETE = "cleanup.policy";
+    public static final String CLEANUP_POLICY_COMPACT = "cleanup.policy";
+
+
+    private Optional<String> cleanupPolicy;
+    /**
+     * Sets the cleanup.policy of the Kafka Topic.
+     *
+     * @param policy - The cleanup policy to use.
+     * @return The builder.
+     */
+    public KafkaTopicPropertiesBuilder setCleanupPolicy(final String policy) {
+        cleanupPolicy = Optional.of(requireNonNull(policy));
+        return this;
+    }
+
+    /**
+     * Builds the Kafka topic properties.
+     * @return The {@link Properties} of the Kafka Topic.
+     */
+    public Properties build() {
+        final Properties props = new Properties();
+
+        if(cleanupPolicy.isPresent()) {
+            props.setProperty(CLEANUP_POLICY_KEY, cleanupPolicy.get());
+        }
+
+        return props;
+    }
+}
diff --git a/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutor.java b/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutor.java
index 4bd022a..2341739 100644
--- a/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutor.java
+++ b/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutor.java
@@ -24,6 +24,7 @@
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.locks.ReentrantLock;
@@ -131,7 +132,9 @@
                     KafkaTopics.queryResultsTopic(ryaInstance, query.getQueryId()));
 
             // Make sure the Query Results topic exists for the query.
-            createKafkaTopic.createTopics(topics, 1, 1);
+            // Since this is running in the JVM, the properties are left empty
+            //   so the cleanup.policy will default to delete to reduce memory usage.
+            createKafkaTopic.createTopics(topics, 1, 1, Optional.empty());
 
             // Setup the Kafka Streams job that will execute.
             final KafkaStreams streams = streamsFactory.make(ryaInstance, query);