RYA-377 Loading Statements into a topic.

Conflicts:
	extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/LoadTriplesCommand.java
diff --git a/extras/rya.streams/client/pom.xml b/extras/rya.streams/client/pom.xml
index 48879d9..d779c7b 100644
--- a/extras/rya.streams/client/pom.xml
+++ b/extras/rya.streams/client/pom.xml
@@ -41,13 +41,11 @@
         </dependency>
         <dependency>
             <groupId>org.apache.rya</groupId>
-            <artifactId>rya.api</artifactId>
+            <artifactId>rya.streams.kafka</artifactId>
         </dependency>
-
-        <!-- OpenRDF dependencies -->
         <dependency>
-            <groupId>org.openrdf.sesame</groupId>
-            <artifactId>sesame-queryrender</artifactId>
+            <groupId>org.apache.rya</groupId>
+            <artifactId>rya.api</artifactId>
         </dependency>
 
         <!-- Third Party dependencies -->
@@ -64,12 +62,5 @@
             <groupId>com.google.guava</groupId>
             <artifactId>guava</artifactId>
         </dependency>
-
-        <!-- Test dependences -->
-        <dependency>
-            <groupId>junit</groupId>
-            <artifactId>junit</artifactId>
-            <scope>test</scope>
-        </dependency>
     </dependencies>
 </project>
diff --git a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/CLIDriver.java b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/CLIDriver.java
index f75a389..93df2ae 100644
--- a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/CLIDriver.java
+++ b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/CLIDriver.java
@@ -26,7 +26,7 @@
 
 import org.apache.rya.streams.client.RyaStreamsCommand.ArgumentsException;
 import org.apache.rya.streams.client.RyaStreamsCommand.ExecutionException;
-import org.apache.rya.streams.client.command.LoadTriplesCommand;
+import org.apache.rya.streams.client.command.LoadStatementsCommand;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -53,7 +53,7 @@
     private static final ImmutableMap<String, RyaStreamsCommand> COMMANDS;
     static {
         final Set<Class<? extends RyaStreamsCommand>> commandClasses = new HashSet<>();
-        commandClasses.add(LoadTriplesCommand.class);
+        commandClasses.add(LoadStatementsCommand.class);
         final ImmutableMap.Builder<String, RyaStreamsCommand> builder = ImmutableMap.builder();
         for(final Class<? extends RyaStreamsCommand> commandClass : commandClasses) {
             try {
diff --git a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/LoadTriplesCommand.java b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/LoadStatementsCommand.java
similarity index 83%
rename from extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/LoadTriplesCommand.java
rename to extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/LoadStatementsCommand.java
index 3c3004e..d023b1f 100644
--- a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/LoadTriplesCommand.java
+++ b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/LoadStatementsCommand.java
@@ -22,9 +22,15 @@
 
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.util.Properties;
 
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.rya.api.model.VisibilityStatement;
 import org.apache.rya.streams.api.interactor.LoadStatements;
 import org.apache.rya.streams.client.RyaStreamsCommand;
+import org.apache.rya.streams.kafka.interactor.KafkaLoadStatements;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -41,8 +47,8 @@
  * application.
  */
 @DefaultAnnotation(NonNull.class)
-public class LoadTriplesCommand implements RyaStreamsCommand {
-    private static final Logger log = LoggerFactory.getLogger(LoadTriplesCommand.class);
+public class LoadStatementsCommand implements RyaStreamsCommand {
+    private static final Logger log = LoggerFactory.getLogger(LoadStatementsCommand.class);
 
     /**
      * Command line parameters that are used by this command to configure itself.
@@ -56,10 +62,6 @@
         private short kafkaPort;
         @Parameter(names= {"--kafkaHostname", "-i"}, required = true, description = "The IP or Hostname to use to connect to Kafka.")
         private String kafkaIP;
-        @Parameter(names= {"--zookeeperPort", "-q"}, required = true, description = "The port to use to connect to Zookeeper.")
-        private short zookeeperPort;
-        @Parameter(names= {"--zookeeperHostname", "-z"}, required = true, description = "The IP or Hostname to use to connect to Zookeeper.")
-        private String zookeeperIP;
         @Parameter(names= {"--visibilities", "-v"}, required = true, description = "The visibilities to assign to the statements being loaded in.")
         private String visibilities;
 
@@ -87,14 +89,6 @@
                 parameters.append("\n");
             }
 
-            if (Strings.isNullOrEmpty(zookeeperIP)) {
-                parameters.append("\tZookeeper Location: " + zookeeperIP);
-                if (zookeeperPort > 0) {
-                    parameters.append(":" + zookeeperPort);
-                }
-                parameters.append("\n");
-            }
-
             if (Strings.isNullOrEmpty(visibilities)) {
                 parameters.append("\tVisibilities: " + visibilities);
                 parameters.append("\n");
@@ -140,8 +134,9 @@
         log.trace("Loading RDF Statements from the Triples file '" + params.triplesFile + "'.");
         final Path triplesPath = Paths.get( params.triplesFile );
 
-        final LoadStatements statements = null;
-        try {
+        final Properties producerProps = buildProperties(params);
+        try (final Producer<Object, VisibilityStatement> producer = new KafkaProducer<>(producerProps)) {
+            final LoadStatements statements = new KafkaLoadStatements(params.topicName, producer);
             statements.load(triplesPath, params.visibilities);
         } catch (final Exception e) {
             log.error("Unable to parse statement file: " + triplesPath.toString(), e);
@@ -149,4 +144,11 @@
 
         log.trace("Finished executing the Load Triples Command.");
     }
-}
\ No newline at end of file
+
+    private Properties buildProperties(final Parameters params) {
+        requireNonNull(params);
+        final Properties props = new Properties();
+        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, params.kafkaIP + ":" + params.kafkaPort);
+        return props;
+    }
+}
diff --git a/extras/rya.streams/kafka/pom.xml b/extras/rya.streams/kafka/pom.xml
index e0c196f..2d70e2c 100644
--- a/extras/rya.streams/kafka/pom.xml
+++ b/extras/rya.streams/kafka/pom.xml
@@ -43,6 +43,10 @@
             <groupId>org.apache.rya</groupId>
             <artifactId>rya.streams.api</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.rya</groupId>
+            <artifactId>rya.api</artifactId>
+        </dependency>
 
         <!-- Kafka dependencies -->
         <dependency>
@@ -54,7 +58,7 @@
             <groupId>org.slf4j</groupId>
             <artifactId>slf4j-api</artifactId>
         </dependency>
-
+        
         <!-- Test dependencies -->
         <dependency>
             <groupId>junit</groupId>
diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaLoadStatements.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaLoadStatements.java
new file mode 100644
index 0000000..d5a6213
--- /dev/null
+++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaLoadStatements.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.kafka.interactor;
+
+import static java.util.Objects.requireNonNull;
+
+import java.io.FileReader;
+import java.nio.file.Path;
+
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.rya.api.model.VisibilityStatement;
+import org.apache.rya.streams.api.interactor.LoadStatements;
+import org.openrdf.model.Statement;
+import org.openrdf.rio.RDFFormat;
+import org.openrdf.rio.RDFHandlerException;
+import org.openrdf.rio.RDFParser;
+import org.openrdf.rio.Rio;
+import org.openrdf.rio.helpers.RDFHandlerBase;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Loads {@link VisibilityStatement}s from an RDF file into a kafka topic.
+ */
+@DefaultAnnotation(NonNull.class)
+public class KafkaLoadStatements implements LoadStatements {
+    private static final Logger log = LoggerFactory.getLogger(KafkaLoadStatements.class);
+
+    private static final String KAFKA_CLIENT = "Load Statements";
+    private final String topic;
+    private final Producer<?, VisibilityStatement> producer;
+
+    /**
+     * Creates a new {@link KafkaLoadStatements}.
+     *
+     * @param topic - The Kafka topic to load statements into. (not null)
+     * @param producer - The {@link Producer} connected to Kafka. (not null)
+     */
+    public KafkaLoadStatements(final String topic, final Producer<?, VisibilityStatement> producer) {
+        this.topic = requireNonNull(topic);
+        this.producer = requireNonNull(producer);
+    }
+
+
+    @Override
+    public void load(final Path statementsPath, final String visibilities) throws Exception {
+        requireNonNull(statementsPath);
+        requireNonNull(visibilities);
+
+        final RDFParser parser = Rio.createParser(RDFFormat.forFileName(statementsPath.getFileName().toString()));
+        parser.setRDFHandler(new RDFHandlerBase() {
+            @Override
+            public void startRDF() throws RDFHandlerException {
+                log.trace("starting loading statements.");
+            }
+
+            @Override
+            public void handleStatement(final Statement stmnt) throws RDFHandlerException {
+                final VisibilityStatement visiStatement = new VisibilityStatement(stmnt, visibilities);
+                producer.send(new ProducerRecord<>(topic, visiStatement));
+            }
+
+            @Override
+            public void endRDF() throws RDFHandlerException {
+                producer.flush();
+                log.trace("done.");
+            }
+        });
+        parser.parse(new FileReader(statementsPath.toFile()), "");
+    }
+}
diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaLoadStatementsIT.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaLoadStatementsIT.java
new file mode 100644
index 0000000..8103e57
--- /dev/null
+++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaLoadStatementsIT.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.kafka.interactor;
+
+import static org.junit.Assert.assertEquals;
+
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.rya.api.model.VisibilityStatement;
+import org.apache.rya.streams.kafka.interactor.KafkaLoadStatements;
+import org.apache.rya.streams.kafka.serialization.VisibilityStatementDeserializer;
+import org.apache.rya.streams.kafka.serialization.VisibilityStatementSerializer;
+import org.apache.rya.test.kafka.KafkaITBase;
+import org.apache.rya.test.kafka.KafkaTestInstanceRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.ValueFactoryImpl;
+import org.openrdf.rio.UnsupportedRDFormatException;
+
+/**
+ * Integration tests the {@link KafkaLoadStatements} command
+ */
+public class KafkaLoadStatementsIT extends KafkaITBase {
+    private static final Path TURTLE_FILE = Paths.get("src/test/resources/statements.ttl");
+
+    private static final Path INVALID = Paths.get("src/test/resources/invalid.INVALID");
+
+    @Rule
+    public KafkaTestInstanceRule rule = new KafkaTestInstanceRule(true);
+
+    @Test(expected = UnsupportedRDFormatException.class)
+    public void test_invalidFile() throws Exception {
+        final String topic = rule.getKafkaTopicName();
+        final String visibilities = "a|b|c";
+        final Properties props = rule.createBootstrapServerConfig();
+        props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+        props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, VisibilityStatementSerializer.class.getName());
+        try (final Producer<Object, VisibilityStatement> producer = new KafkaProducer<>(props)) {
+            final KafkaLoadStatements command = new KafkaLoadStatements(topic, producer);
+            command.load(INVALID, visibilities);
+        }
+    }
+
+    @Test
+    public void testTurtle() throws Exception {
+        final String topic = rule.getKafkaTopicName();
+        final String visibilities = "a|b|c";
+        final Properties props = rule.createBootstrapServerConfig();
+        props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+        props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, VisibilityStatementSerializer.class.getName());
+        try (final Producer<Object, VisibilityStatement> producer = new KafkaProducer<>(props)) {
+            final KafkaLoadStatements command = new KafkaLoadStatements(topic, producer);
+            command.load(TURTLE_FILE, visibilities);
+        }
+
+        // Read a VisibilityBindingSet from the test topic.
+        final List<VisibilityStatement> read = new ArrayList<>();
+
+        final Properties consumerProps = rule.createBootstrapServerConfig();
+        consumerProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
+        consumerProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        consumerProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+        consumerProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, VisibilityStatementDeserializer.class.getName());
+
+        try (final KafkaConsumer<String, VisibilityStatement> consumer = new KafkaConsumer<>(consumerProps)) {
+            consumer.subscribe(Arrays.asList(rule.getKafkaTopicName()));
+            final ConsumerRecords<String, VisibilityStatement> records = consumer.poll(1000);
+
+            assertEquals(3, records.count());
+            final Iterator<ConsumerRecord<String, VisibilityStatement>> iter = records.iterator();
+            while(iter.hasNext()) {
+                final VisibilityStatement visiSet = iter.next().value();
+                read.add(visiSet);
+            }
+        }
+
+        final List<VisibilityStatement> original = new ArrayList<>();
+        final ValueFactory VF = ValueFactoryImpl.getInstance();
+
+        original.add(new VisibilityStatement(
+                VF.createStatement(VF.createURI("http://example#alice"), VF.createURI("http://example#talksTo"), VF.createURI("http://example#bob")),
+                visibilities));
+        original.add(new VisibilityStatement(
+                VF.createStatement(VF.createURI("http://example#bob"), VF.createURI("http://example#talksTo"), VF.createURI("http://example#charlie")),
+                visibilities));
+        original.add(new VisibilityStatement(
+                VF.createStatement(VF.createURI("http://example#charlie"), VF.createURI("http://example#likes"), VF.createURI("http://example#icecream")),
+                visibilities));
+        // Show the written statement matches the read one.
+        assertEquals(original, read);
+    }
+}
diff --git a/extras/rya.streams/kafka/src/test/resources/invalid.INVALID b/extras/rya.streams/kafka/src/test/resources/invalid.INVALID
new file mode 100644
index 0000000..f71c8c6
--- /dev/null
+++ b/extras/rya.streams/kafka/src/test/resources/invalid.INVALID
@@ -0,0 +1 @@
+this file should not be parsed due to invalid extension
\ No newline at end of file
diff --git a/extras/rya.streams/kafka/src/test/resources/statements.ttl b/extras/rya.streams/kafka/src/test/resources/statements.ttl
new file mode 100644
index 0000000..3456cc2
--- /dev/null
+++ b/extras/rya.streams/kafka/src/test/resources/statements.ttl
@@ -0,0 +1,5 @@
+@prefix example: <http://example#> .
+
+example:alice example:talksTo example:bob . 
+example:bob example:talksTo example:charlie . 
+example:charlie example:likes example:icecream .