RYA-377 Fixed a bug where streams jobs would not resume where they had left off after being resumed.
diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaRunQuery.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaRunQuery.java
index e587998..aef7c58 100644
--- a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaRunQuery.java
+++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaRunQuery.java
@@ -25,7 +25,7 @@
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
-import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.TopologyBuilder;
@@ -103,8 +103,13 @@
// Setup the Kafka Stream program.
final Properties streamsProps = new Properties();
- streamsProps.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, kafkaHostname + ":" + kafkaPort);
- streamsProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "KafkaRunQuery-" + UUID.randomUUID());
+ streamsProps.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaHostname + ":" + kafkaPort);
+
+ // Use the Query ID as the Application ID to ensure we resume where we left off the last time this command was run.
+ streamsProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "KafkaRunQuery-" + queryId);
+
+ // Always start at the beginning of the input topic.
+ streamsProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
final KafkaStreams streams = new KafkaStreams(topologyBuilder, new StreamsConfig(streamsProps));