RYA-377 Fixing broken integration tests.
diff --git a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/RunQueryCommandIT.java b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/RunQueryCommandIT.java
index 788b41f..f2100e8 100644
--- a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/RunQueryCommandIT.java
+++ b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/RunQueryCommandIT.java
@@ -102,7 +102,8 @@
"--ryaInstance", "" + ryaInstance,
"--kafkaHostname", kafka.getKafkaHostname(),
"--kafkaPort", kafka.getKafkaPort(),
- "--queryID", UUID.randomUUID().toString()
+ "--queryID", UUID.randomUUID().toString(),
+ "--zookeepers", kafka.getZookeeperServers()
};
// Run the test. This will throw an exception.
diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/RyaStreamsTestUtil.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/RyaStreamsTestUtil.java
index b7e2be2..ee25f8c 100644
--- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/RyaStreamsTestUtil.java
+++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/RyaStreamsTestUtil.java
@@ -60,7 +60,6 @@
* @param statementsTopic - The topic statements will be written to. (not null)
* @param resultsTopic - The topic results will be read from. (not null)
* @param builder - The streams topology that will be executed. (not null)
- * @param startupMs - How long to wait for the topology to start before writing the statements.
* @param statements - The statements that will be loaded into the topic. (not null)
* @param expected - The expected results. (not null)
* @param expectedDeserializerClass - The class of the deserializer that will be used when reading
@@ -72,7 +71,6 @@
final String statementsTopic,
final String resultsTopic,
final TopologyBuilder builder,
- final int startupMs,
final List<VisibilityStatement> statements,
final Set<T> expected,
final Class<? extends Deserializer<T>> expectedDeserializerClass) throws Exception {
@@ -98,7 +96,7 @@
streams.start();
// Wait for the streams application to start. Streams only see data after their consumers are connected.
- Thread.sleep(startupMs);
+ Thread.sleep(6000);
// Load the statements into the input topic.
try(Producer<String, VisibilityStatement> producer = KafkaTestUtil.makeProducer(
diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/StatementPatternProcessorIT.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/StatementPatternProcessorIT.java
index 80b6e42..33dc945 100644
--- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/StatementPatternProcessorIT.java
+++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/StatementPatternProcessorIT.java
@@ -75,7 +75,7 @@
expected.add( new VisibilityBindingSet(bs, "a") );
// Run the test.
- RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityBindingSetDeserializer.class);
+ RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class);
}
@Test
@@ -113,7 +113,7 @@
expected.add( new VisibilityBindingSet(bs, "a|b") );
// Run the test.
- RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityBindingSetDeserializer.class);
+ RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class);
}
@Test
@@ -147,7 +147,7 @@
expected.add( new VisibilityBindingSet(bs, "a") );
// Run the test.
- RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityBindingSetDeserializer.class);
+ RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class);
}
@Test
@@ -190,6 +190,6 @@
expected.add(new VisibilityBindingSet(bs, "a"));
// Run the test.
- RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityBindingSetDeserializer.class);
+ RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class);
}
}
\ No newline at end of file
diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/aggregation/AggregationProcessorIT.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/aggregation/AggregationProcessorIT.java
index ccf5c0c..072469a 100644
--- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/aggregation/AggregationProcessorIT.java
+++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/aggregation/AggregationProcessorIT.java
@@ -95,7 +95,7 @@
final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory());
// Run the test.
- RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityBindingSetDeserializer.class);
+ RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class);
}
@Test
@@ -142,7 +142,7 @@
final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory());
// Run the test.
- RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityBindingSetDeserializer.class);
+ RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class);
}
@Test
@@ -188,7 +188,7 @@
final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory());
// Run the test.
- RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityBindingSetDeserializer.class);
+ RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class);
}
@Test
@@ -238,7 +238,7 @@
final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory());
// Run the test.
- RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityBindingSetDeserializer.class);
+ RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class);
}
@Test
@@ -288,7 +288,7 @@
final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory());
// Run the test.
- RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityBindingSetDeserializer.class);
+ RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class);
}
@Test
@@ -389,7 +389,7 @@
final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory());
// Run the test.
- RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 4000, statements, expected, VisibilityBindingSetDeserializer.class);
+ RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class);
}
@Test
@@ -452,6 +452,6 @@
final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory());
// Run the test.
- RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityBindingSetDeserializer.class);
+ RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class);
}
}
\ No newline at end of file
diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/filter/FilterProcessorIT.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/filter/FilterProcessorIT.java
index fb5305f..aaa67ea 100644
--- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/filter/FilterProcessorIT.java
+++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/filter/FilterProcessorIT.java
@@ -81,6 +81,6 @@
expected.add( new VisibilityBindingSet(bs, "a") );
// Run the test.
- RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityBindingSetDeserializer.class);
+ RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class);
}
}
\ No newline at end of file
diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/filter/GeoFilterIT.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/filter/GeoFilterIT.java
index 642ecbc..c090afa 100644
--- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/filter/GeoFilterIT.java
+++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/filter/GeoFilterIT.java
@@ -115,7 +115,7 @@
expected.add( new VisibilityBindingSet(bs, "a") );
// Run the test.
- RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityBindingSetDeserializer.class);
+ RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class);
}
private List<VisibilityStatement> getStatements() throws Exception {
diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/filter/TemporalFilterIT.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/filter/TemporalFilterIT.java
index cac9559..0a2185d 100644
--- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/filter/TemporalFilterIT.java
+++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/filter/TemporalFilterIT.java
@@ -109,9 +109,9 @@
expected.add( new VisibilityBindingSet(bs, "a") );
// Run the test.
- RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityBindingSetDeserializer.class);
+ RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class);
}
-
+
@Test
public void showBeforeWorks() throws Exception {
// Enumerate some topics that will be re-used
@@ -143,9 +143,9 @@
expected.add( new VisibilityBindingSet(bs, "a") );
// Run the test.
- RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityBindingSetDeserializer.class);
+ RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class);
}
-
+
@Test
public void showAfterWorks() throws Exception {
// Enumerate some topics that will be re-used
@@ -177,7 +177,7 @@
expected.add( new VisibilityBindingSet(bs, "a") );
// Run the test.
- RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityBindingSetDeserializer.class);
+ RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class);
}
private List<VisibilityStatement> getStatements() throws Exception {
diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorIT.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorIT.java
index 51bb0ae..bdb9be6 100644
--- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorIT.java
+++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorIT.java
@@ -111,7 +111,7 @@
expected.add( new VisibilityBindingSet(bs, "c&(b|c)") );
// Run the test.
- RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityBindingSetDeserializer.class);
+ RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class);
}
@Test
@@ -162,7 +162,7 @@
expected.add( new VisibilityBindingSet(bs, "c&(b|c)") );
// Run the test.
- RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityBindingSetDeserializer.class);
+ RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class);
}
@Test
@@ -219,7 +219,7 @@
expected.add( new VisibilityBindingSet(bs, "a&c") );
// Run the test.
- RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityBindingSetDeserializer.class);
+ RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class);
}
@Test
@@ -260,7 +260,7 @@
expected.add( new VisibilityBindingSet(bs, "a") );
// Run the test.
- RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 3000, statements, expected, VisibilityBindingSetDeserializer.class);
+ RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class);
}
@Test
@@ -311,6 +311,6 @@
expected.add( new VisibilityBindingSet(bs, "c") );
// Run the test.
- RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityBindingSetDeserializer.class);
+ RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class);
}
}
\ No newline at end of file
diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/projection/MultiProjectionProcessorIT.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/projection/MultiProjectionProcessorIT.java
index c96919c..a8de401 100644
--- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/projection/MultiProjectionProcessorIT.java
+++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/projection/MultiProjectionProcessorIT.java
@@ -87,6 +87,6 @@
expected.add(new VisibilityStatement(vf.createStatement(blankNode, vf.createURI("urn:location"), vf.createURI("urn:corner1")), "a"));
// Run the test.
- RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityStatementDeserializer.class);
+ RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityStatementDeserializer.class);
}
}
\ No newline at end of file
diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/projection/ProjectionProcessorIT.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/projection/ProjectionProcessorIT.java
index 63c2cc7..2af3a49 100644
--- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/projection/ProjectionProcessorIT.java
+++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/projection/ProjectionProcessorIT.java
@@ -80,6 +80,6 @@
expectedBs.addBinding("otherPerson", vf.createURI("urn:Bob"));
expected.add(new VisibilityBindingSet(expectedBs, "a"));
- RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, Sets.newHashSet(expected), VisibilityBindingSetDeserializer.class);
+ RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, Sets.newHashSet(expected), VisibilityBindingSetDeserializer.class);
}
}
\ No newline at end of file