RYA-377 Fixed a bug where streams jobs would not resume where they had left off after being resumed.
diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaRunQuery.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaRunQuery.java
index e587998..aef7c58 100644
--- a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaRunQuery.java
+++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaRunQuery.java
@@ -25,7 +25,7 @@
 import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 
-import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.processor.TopologyBuilder;
@@ -103,8 +103,13 @@
 
         // Setup the Kafka Stream program.
         final Properties streamsProps = new Properties();
-        streamsProps.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, kafkaHostname + ":" + kafkaPort);
-        streamsProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "KafkaRunQuery-" + UUID.randomUUID());
+        streamsProps.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaHostname + ":" + kafkaPort);
+
+        // Use the Query ID as the Application ID to ensure we resume where we left off the last time this command was run.
+        streamsProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "KafkaRunQuery-" + queryId);
+
+        // Always start at the beginning of the input topic.
+        streamsProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
 
         final KafkaStreams streams = new KafkaStreams(topologyBuilder, new StreamsConfig(streamsProps));