RYA-377 Fixing broken integration tests.
diff --git a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/RunQueryCommandIT.java b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/RunQueryCommandIT.java
index 788b41f..f2100e8 100644
--- a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/RunQueryCommandIT.java
+++ b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/RunQueryCommandIT.java
@@ -102,7 +102,8 @@
                 "--ryaInstance", "" + ryaInstance,
                 "--kafkaHostname", kafka.getKafkaHostname(),
                 "--kafkaPort", kafka.getKafkaPort(),
-                "--queryID", UUID.randomUUID().toString()
+                "--queryID", UUID.randomUUID().toString(),
+                "--zookeepers", kafka.getZookeeperServers()
         };
 
         // Run the test. This will throw an exception.
diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/RyaStreamsTestUtil.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/RyaStreamsTestUtil.java
index b7e2be2..ee25f8c 100644
--- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/RyaStreamsTestUtil.java
+++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/RyaStreamsTestUtil.java
@@ -60,7 +60,6 @@
      * @param statementsTopic - The topic statements will be written to. (not null)
      * @param resultsTopic - The topic results will be read from. (not null)
      * @param builder - The streams topology that will be executed. (not null)
-     * @param startupMs - How long to wait for the topology to start before writing the statements.
      * @param statements - The statements that will be loaded into the topic. (not null)
      * @param expected - The expected results. (not null)
      * @param expectedDeserializerClass - The class of the deserializer that will be used when reading
@@ -72,7 +71,6 @@
             final String statementsTopic,
             final String resultsTopic,
             final TopologyBuilder builder,
-            final int startupMs,
             final List<VisibilityStatement> statements,
             final Set<T> expected,
             final Class<? extends Deserializer<T>> expectedDeserializerClass) throws Exception {
@@ -98,7 +96,7 @@
             streams.start();
 
             // Wait for the streams application to start. Streams only see data after their consumers are connected.
-            Thread.sleep(startupMs);
+            Thread.sleep(6000);
 
             // Load the statements into the input topic.
             try(Producer<String, VisibilityStatement> producer = KafkaTestUtil.makeProducer(
diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/StatementPatternProcessorIT.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/StatementPatternProcessorIT.java
index 80b6e42..33dc945 100644
--- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/StatementPatternProcessorIT.java
+++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/StatementPatternProcessorIT.java
@@ -75,7 +75,7 @@
         expected.add( new VisibilityBindingSet(bs, "a") );
 
         // Run the test.
-        RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityBindingSetDeserializer.class);
+        RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class);
     }
 
     @Test
@@ -113,7 +113,7 @@
         expected.add( new VisibilityBindingSet(bs, "a|b") );
 
         // Run the test.
-        RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityBindingSetDeserializer.class);
+        RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class);
     }
 
     @Test
@@ -147,7 +147,7 @@
         expected.add( new VisibilityBindingSet(bs, "a") );
 
         // Run the test.
-        RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityBindingSetDeserializer.class);
+        RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class);
     }
 
     @Test
@@ -190,6 +190,6 @@
         expected.add(new VisibilityBindingSet(bs, "a"));
 
         // Run the test.
-        RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityBindingSetDeserializer.class);
+        RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class);
     }
 }
\ No newline at end of file
diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/aggregation/AggregationProcessorIT.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/aggregation/AggregationProcessorIT.java
index ccf5c0c..072469a 100644
--- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/aggregation/AggregationProcessorIT.java
+++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/aggregation/AggregationProcessorIT.java
@@ -95,7 +95,7 @@
         final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory());
 
         // Run the test.
-        RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityBindingSetDeserializer.class);
+        RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class);
     }
 
     @Test
@@ -142,7 +142,7 @@
         final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory());
 
         // Run the test.
-        RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityBindingSetDeserializer.class);
+        RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class);
     }
 
     @Test
@@ -188,7 +188,7 @@
         final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory());
 
         // Run the test.
-        RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityBindingSetDeserializer.class);
+        RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class);
     }
 
     @Test
@@ -238,7 +238,7 @@
         final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory());
 
         // Run the test.
-        RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityBindingSetDeserializer.class);
+        RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class);
     }
 
     @Test
@@ -288,7 +288,7 @@
         final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory());
 
         // Run the test.
-        RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityBindingSetDeserializer.class);
+        RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class);
     }
 
     @Test
@@ -389,7 +389,7 @@
         final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory());
 
         // Run the test.
-        RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 4000, statements, expected, VisibilityBindingSetDeserializer.class);
+        RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class);
     }
 
     @Test
@@ -452,6 +452,6 @@
         final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory());
 
         // Run the test.
-        RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityBindingSetDeserializer.class);
+        RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class);
     }
 }
\ No newline at end of file
diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/filter/FilterProcessorIT.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/filter/FilterProcessorIT.java
index fb5305f..aaa67ea 100644
--- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/filter/FilterProcessorIT.java
+++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/filter/FilterProcessorIT.java
@@ -81,6 +81,6 @@
         expected.add( new VisibilityBindingSet(bs, "a") );
 
         // Run the test.
-        RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityBindingSetDeserializer.class);
+        RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class);
     }
 }
\ No newline at end of file
diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/filter/GeoFilterIT.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/filter/GeoFilterIT.java
index 642ecbc..c090afa 100644
--- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/filter/GeoFilterIT.java
+++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/filter/GeoFilterIT.java
@@ -115,7 +115,7 @@
         expected.add( new VisibilityBindingSet(bs, "a") );
 
         // Run the test.
-        RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityBindingSetDeserializer.class);
+        RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class);
     }
 
     private List<VisibilityStatement> getStatements() throws Exception {
diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/filter/TemporalFilterIT.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/filter/TemporalFilterIT.java
index cac9559..0a2185d 100644
--- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/filter/TemporalFilterIT.java
+++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/filter/TemporalFilterIT.java
@@ -109,9 +109,9 @@
         expected.add( new VisibilityBindingSet(bs, "a") );
 
         // Run the test.
-        RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityBindingSetDeserializer.class);
+        RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class);
     }
-    
+
     @Test
     public void showBeforeWorks() throws Exception {
         // Enumerate some topics that will be re-used
@@ -143,9 +143,9 @@
         expected.add( new VisibilityBindingSet(bs, "a") );
 
         // Run the test.
-        RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityBindingSetDeserializer.class);
+        RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class);
     }
-    
+
     @Test
     public void showAfterWorks() throws Exception {
         // Enumerate some topics that will be re-used
@@ -177,7 +177,7 @@
         expected.add( new VisibilityBindingSet(bs, "a") );
 
         // Run the test.
-        RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityBindingSetDeserializer.class);
+        RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class);
     }
 
     private List<VisibilityStatement> getStatements() throws Exception {
diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorIT.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorIT.java
index 51bb0ae..bdb9be6 100644
--- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorIT.java
+++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorIT.java
@@ -111,7 +111,7 @@
         expected.add( new VisibilityBindingSet(bs, "c&(b|c)") );
 
         // Run the test.
-        RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityBindingSetDeserializer.class);
+        RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class);
     }
 
     @Test
@@ -162,7 +162,7 @@
         expected.add( new VisibilityBindingSet(bs, "c&(b|c)") );
 
         // Run the test.
-        RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityBindingSetDeserializer.class);
+        RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class);
     }
 
     @Test
@@ -219,7 +219,7 @@
         expected.add( new VisibilityBindingSet(bs, "a&c") );
 
         // Run the test.
-        RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityBindingSetDeserializer.class);
+        RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class);
     }
 
     @Test
@@ -260,7 +260,7 @@
         expected.add( new VisibilityBindingSet(bs, "a") );
 
         // Run the test.
-        RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 3000, statements, expected, VisibilityBindingSetDeserializer.class);
+        RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class);
     }
 
     @Test
@@ -311,6 +311,6 @@
         expected.add( new VisibilityBindingSet(bs, "c") );
 
         // Run the test.
-        RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityBindingSetDeserializer.class);
+        RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class);
     }
 }
\ No newline at end of file
diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/projection/MultiProjectionProcessorIT.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/projection/MultiProjectionProcessorIT.java
index c96919c..a8de401 100644
--- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/projection/MultiProjectionProcessorIT.java
+++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/projection/MultiProjectionProcessorIT.java
@@ -87,6 +87,6 @@
         expected.add(new VisibilityStatement(vf.createStatement(blankNode, vf.createURI("urn:location"), vf.createURI("urn:corner1")), "a"));
 
         // Run the test.
-        RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityStatementDeserializer.class);
+        RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityStatementDeserializer.class);
     }
 }
\ No newline at end of file
diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/projection/ProjectionProcessorIT.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/projection/ProjectionProcessorIT.java
index 63c2cc7..2af3a49 100644
--- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/projection/ProjectionProcessorIT.java
+++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/projection/ProjectionProcessorIT.java
@@ -80,6 +80,6 @@
         expectedBs.addBinding("otherPerson", vf.createURI("urn:Bob"));
         expected.add(new VisibilityBindingSet(expectedBs, "a"));
 
-        RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, Sets.newHashSet(expected), VisibilityBindingSetDeserializer.class);
+        RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, Sets.newHashSet(expected), VisibilityBindingSetDeserializer.class);
     }
 }
\ No newline at end of file