Merge pull request #71 from orpiske/issue-67-cassandra-it-case

Fix issue #67: Flaky test: CamelSinkCassandraITCase
diff --git a/tests/src/test/java/org/apache/camel/kafkaconnector/clients/cassandra/dao/TestDataDao.java b/tests/src/test/java/org/apache/camel/kafkaconnector/clients/cassandra/dao/TestDataDao.java
index 24dfce8..1059675 100644
--- a/tests/src/test/java/org/apache/camel/kafkaconnector/clients/cassandra/dao/TestDataDao.java
+++ b/tests/src/test/java/org/apache/camel/kafkaconnector/clients/cassandra/dao/TestDataDao.java
@@ -78,8 +78,8 @@
         session.execute(statement);
     }
 
-    public boolean hasData() {
-        ResultSet rs = session.execute("select * from test_data");
+    public boolean hasEnoughData(long expected) {
+        ResultSet rs = session.execute("select count(*) from test_data");
 
         if (rs == null) {
             return false;
@@ -90,7 +90,9 @@
             return false;
         }
 
-        return true;
+        long count = all.get(0).getLong("count");
+
+        return count == expected;
     }
 
     public String getInsertStatement() {
diff --git a/tests/src/test/java/org/apache/camel/kafkaconnector/sink/cassandra/CamelSinkCassandraITCase.java b/tests/src/test/java/org/apache/camel/kafkaconnector/sink/cassandra/CamelSinkCassandraITCase.java
index be541cc..856016b 100644
--- a/tests/src/test/java/org/apache/camel/kafkaconnector/sink/cassandra/CamelSinkCassandraITCase.java
+++ b/tests/src/test/java/org/apache/camel/kafkaconnector/sink/cassandra/CamelSinkCassandraITCase.java
@@ -109,7 +109,7 @@
             fail("Timed out wait for data to be added to the Kafka cluster");
         }
 
-        TestCommon.waitFor(testDataDao::hasData);
+        TestCommon.waitFor(testDataDao::hasEnoughData, (long) expect);
         testDataDao.getData(this::checkRetrievedData);
         assertTrue(String.format("Did not receive as much data as expected: %d < %d", received, expect),
                 received >= expect);