RYA-377 Loading Statements into a topic.
Conflicts:
extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/LoadTriplesCommand.java
diff --git a/extras/rya.streams/client/pom.xml b/extras/rya.streams/client/pom.xml
index 48879d9..d779c7b 100644
--- a/extras/rya.streams/client/pom.xml
+++ b/extras/rya.streams/client/pom.xml
@@ -41,13 +41,11 @@
</dependency>
<dependency>
<groupId>org.apache.rya</groupId>
- <artifactId>rya.api</artifactId>
+ <artifactId>rya.streams.kafka</artifactId>
</dependency>
-
- <!-- OpenRDF dependencies -->
<dependency>
- <groupId>org.openrdf.sesame</groupId>
- <artifactId>sesame-queryrender</artifactId>
+ <groupId>org.apache.rya</groupId>
+ <artifactId>rya.api</artifactId>
</dependency>
<!-- Third Party dependencies -->
@@ -64,12 +62,5 @@
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
-
- <!-- Test dependences -->
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <scope>test</scope>
- </dependency>
</dependencies>
</project>
diff --git a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/CLIDriver.java b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/CLIDriver.java
index f75a389..93df2ae 100644
--- a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/CLIDriver.java
+++ b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/CLIDriver.java
@@ -26,7 +26,7 @@
import org.apache.rya.streams.client.RyaStreamsCommand.ArgumentsException;
import org.apache.rya.streams.client.RyaStreamsCommand.ExecutionException;
-import org.apache.rya.streams.client.command.LoadTriplesCommand;
+import org.apache.rya.streams.client.command.LoadStatementsCommand;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -53,7 +53,7 @@
private static final ImmutableMap<String, RyaStreamsCommand> COMMANDS;
static {
final Set<Class<? extends RyaStreamsCommand>> commandClasses = new HashSet<>();
- commandClasses.add(LoadTriplesCommand.class);
+ commandClasses.add(LoadStatementsCommand.class);
final ImmutableMap.Builder<String, RyaStreamsCommand> builder = ImmutableMap.builder();
for(final Class<? extends RyaStreamsCommand> commandClass : commandClasses) {
try {
diff --git a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/LoadTriplesCommand.java b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/LoadStatementsCommand.java
similarity index 83%
rename from extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/LoadTriplesCommand.java
rename to extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/LoadStatementsCommand.java
index 3c3004e..d023b1f 100644
--- a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/LoadTriplesCommand.java
+++ b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/LoadStatementsCommand.java
@@ -22,9 +22,15 @@
import java.nio.file.Path;
import java.nio.file.Paths;
+import java.util.Properties;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.rya.api.model.VisibilityStatement;
import org.apache.rya.streams.api.interactor.LoadStatements;
import org.apache.rya.streams.client.RyaStreamsCommand;
+import org.apache.rya.streams.kafka.interactor.KafkaLoadStatements;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,8 +47,8 @@
* application.
*/
@DefaultAnnotation(NonNull.class)
-public class LoadTriplesCommand implements RyaStreamsCommand {
- private static final Logger log = LoggerFactory.getLogger(LoadTriplesCommand.class);
+public class LoadStatementsCommand implements RyaStreamsCommand {
+ private static final Logger log = LoggerFactory.getLogger(LoadStatementsCommand.class);
/**
* Command line parameters that are used by this command to configure itself.
@@ -56,10 +62,6 @@
private short kafkaPort;
@Parameter(names= {"--kafkaHostname", "-i"}, required = true, description = "The IP or Hostname to use to connect to Kafka.")
private String kafkaIP;
- @Parameter(names= {"--zookeeperPort", "-q"}, required = true, description = "The port to use to connect to Zookeeper.")
- private short zookeeperPort;
- @Parameter(names= {"--zookeeperHostname", "-z"}, required = true, description = "The IP or Hostname to use to connect to Zookeeper.")
- private String zookeeperIP;
@Parameter(names= {"--visibilities", "-v"}, required = true, description = "The visibilities to assign to the statements being loaded in.")
private String visibilities;
@@ -87,14 +89,6 @@
parameters.append("\n");
}
- if (Strings.isNullOrEmpty(zookeeperIP)) {
- parameters.append("\tZookeeper Location: " + zookeeperIP);
- if (zookeeperPort > 0) {
- parameters.append(":" + zookeeperPort);
- }
- parameters.append("\n");
- }
-
if (Strings.isNullOrEmpty(visibilities)) {
parameters.append("\tVisibilities: " + visibilities);
parameters.append("\n");
@@ -140,8 +134,9 @@
log.trace("Loading RDF Statements from the Triples file '" + params.triplesFile + "'.");
final Path triplesPath = Paths.get( params.triplesFile );
- final LoadStatements statements = null;
- try {
+ final Properties producerProps = buildProperties(params);
+ try (final Producer<Object, VisibilityStatement> producer = new KafkaProducer<>(producerProps)) {
+ final LoadStatements statements = new KafkaLoadStatements(params.topicName, producer);
statements.load(triplesPath, params.visibilities);
} catch (final Exception e) {
log.error("Unable to parse statement file: " + triplesPath.toString(), e);
@@ -149,4 +144,11 @@
log.trace("Finished executing the Load Triples Command.");
}
-}
\ No newline at end of file
+
+ private Properties buildProperties(final Parameters params) {
+ requireNonNull(params);
+ final Properties props = new Properties();
+ props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, params.kafkaIP + ":" + params.kafkaPort);
+ return props;
+ }
+}
diff --git a/extras/rya.streams/kafka/pom.xml b/extras/rya.streams/kafka/pom.xml
index e0c196f..2d70e2c 100644
--- a/extras/rya.streams/kafka/pom.xml
+++ b/extras/rya.streams/kafka/pom.xml
@@ -43,6 +43,10 @@
<groupId>org.apache.rya</groupId>
<artifactId>rya.streams.api</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.rya</groupId>
+ <artifactId>rya.api</artifactId>
+ </dependency>
<!-- Kafka dependencies -->
<dependency>
@@ -54,7 +58,7 @@
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
-
+
<!-- Test dependencies -->
<dependency>
<groupId>junit</groupId>
diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaLoadStatements.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaLoadStatements.java
new file mode 100644
index 0000000..d5a6213
--- /dev/null
+++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaLoadStatements.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.kafka.interactor;
+
+import static java.util.Objects.requireNonNull;
+
+import java.io.FileReader;
+import java.nio.file.Path;
+
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.rya.api.model.VisibilityStatement;
+import org.apache.rya.streams.api.interactor.LoadStatements;
+import org.openrdf.model.Statement;
+import org.openrdf.rio.RDFFormat;
+import org.openrdf.rio.RDFHandlerException;
+import org.openrdf.rio.RDFParser;
+import org.openrdf.rio.Rio;
+import org.openrdf.rio.helpers.RDFHandlerBase;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Loads {@link VisibilityStatement}s from an RDF file into a Kafka topic.
+ */
+@DefaultAnnotation(NonNull.class)
+public class KafkaLoadStatements implements LoadStatements {
+ private static final Logger log = LoggerFactory.getLogger(KafkaLoadStatements.class);
+
+ private static final String KAFKA_CLIENT = "Load Statements";
+ private final String topic;
+ private final Producer<?, VisibilityStatement> producer;
+
+ /**
+ * Creates a new {@link KafkaLoadStatements}.
+ *
+ * @param topic - The Kafka topic to load statements into. (not null)
+ * @param producer - The {@link Producer} connected to Kafka. (not null)
+ */
+ public KafkaLoadStatements(final String topic, final Producer<?, VisibilityStatement> producer) {
+ this.topic = requireNonNull(topic);
+ this.producer = requireNonNull(producer);
+ }
+
+
+ @Override
+ public void load(final Path statementsPath, final String visibilities) throws Exception {
+ requireNonNull(statementsPath);
+ requireNonNull(visibilities);
+
+ final RDFParser parser = Rio.createParser(RDFFormat.forFileName(statementsPath.getFileName().toString()));
+ parser.setRDFHandler(new RDFHandlerBase() {
+ @Override
+ public void startRDF() throws RDFHandlerException {
+ log.trace("starting loading statements.");
+ }
+
+ @Override
+ public void handleStatement(final Statement stmnt) throws RDFHandlerException {
+ final VisibilityStatement visiStatement = new VisibilityStatement(stmnt, visibilities);
+ producer.send(new ProducerRecord<>(topic, visiStatement));
+ }
+
+ @Override
+ public void endRDF() throws RDFHandlerException {
+ producer.flush();
+ log.trace("done.");
+ }
+ });
+ parser.parse(new FileReader(statementsPath.toFile()), "");
+ }
+}
diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaLoadStatementsIT.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaLoadStatementsIT.java
new file mode 100644
index 0000000..8103e57
--- /dev/null
+++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaLoadStatementsIT.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.kafka.interactor;
+
+import static org.junit.Assert.assertEquals;
+
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.rya.api.model.VisibilityStatement;
+import org.apache.rya.streams.kafka.interactor.KafkaLoadStatements;
+import org.apache.rya.streams.kafka.serialization.VisibilityStatementDeserializer;
+import org.apache.rya.streams.kafka.serialization.VisibilityStatementSerializer;
+import org.apache.rya.test.kafka.KafkaITBase;
+import org.apache.rya.test.kafka.KafkaTestInstanceRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.ValueFactoryImpl;
+import org.openrdf.rio.UnsupportedRDFormatException;
+
+/**
+ * Integration tests the {@link KafkaLoadStatements} command
+ */
+public class KafkaLoadStatementsIT extends KafkaITBase {
+ private static final Path TURTLE_FILE = Paths.get("src/test/resources/statements.ttl");
+
+ private static final Path INVALID = Paths.get("src/test/resources/invalid.INVALID");
+
+ @Rule
+ public KafkaTestInstanceRule rule = new KafkaTestInstanceRule(true);
+
+ @Test(expected = UnsupportedRDFormatException.class)
+ public void test_invalidFile() throws Exception {
+ final String topic = rule.getKafkaTopicName();
+ final String visibilities = "a|b|c";
+ final Properties props = rule.createBootstrapServerConfig();
+ props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+ props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, VisibilityStatementSerializer.class.getName());
+ try (final Producer<Object, VisibilityStatement> producer = new KafkaProducer<>(props)) {
+ final KafkaLoadStatements command = new KafkaLoadStatements(topic, producer);
+ command.load(INVALID, visibilities);
+ }
+ }
+
+ @Test
+ public void testTurtle() throws Exception {
+ final String topic = rule.getKafkaTopicName();
+ final String visibilities = "a|b|c";
+ final Properties props = rule.createBootstrapServerConfig();
+ props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+ props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, VisibilityStatementSerializer.class.getName());
+ try (final Producer<Object, VisibilityStatement> producer = new KafkaProducer<>(props)) {
+ final KafkaLoadStatements command = new KafkaLoadStatements(topic, producer);
+ command.load(TURTLE_FILE, visibilities);
+ }
+
+ // Read the VisibilityStatements from the test topic.
+ final List<VisibilityStatement> read = new ArrayList<>();
+
+ final Properties consumerProps = rule.createBootstrapServerConfig();
+ consumerProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
+ consumerProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ consumerProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+ consumerProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, VisibilityStatementDeserializer.class.getName());
+
+ try (final KafkaConsumer<String, VisibilityStatement> consumer = new KafkaConsumer<>(consumerProps)) {
+ consumer.subscribe(Arrays.asList(rule.getKafkaTopicName()));
+ final ConsumerRecords<String, VisibilityStatement> records = consumer.poll(1000);
+
+ assertEquals(3, records.count());
+ final Iterator<ConsumerRecord<String, VisibilityStatement>> iter = records.iterator();
+ while(iter.hasNext()) {
+ final VisibilityStatement visiSet = iter.next().value();
+ read.add(visiSet);
+ }
+ }
+
+ final List<VisibilityStatement> original = new ArrayList<>();
+ final ValueFactory VF = ValueFactoryImpl.getInstance();
+
+ original.add(new VisibilityStatement(
+ VF.createStatement(VF.createURI("http://example#alice"), VF.createURI("http://example#talksTo"), VF.createURI("http://example#bob")),
+ visibilities));
+ original.add(new VisibilityStatement(
+ VF.createStatement(VF.createURI("http://example#bob"), VF.createURI("http://example#talksTo"), VF.createURI("http://example#charlie")),
+ visibilities));
+ original.add(new VisibilityStatement(
+ VF.createStatement(VF.createURI("http://example#charlie"), VF.createURI("http://example#likes"), VF.createURI("http://example#icecream")),
+ visibilities));
+ // Show the written statements match the read ones.
+ assertEquals(original, read);
+ }
+}
diff --git a/extras/rya.streams/kafka/src/test/resources/invalid.INVALID b/extras/rya.streams/kafka/src/test/resources/invalid.INVALID
new file mode 100644
index 0000000..f71c8c6
--- /dev/null
+++ b/extras/rya.streams/kafka/src/test/resources/invalid.INVALID
@@ -0,0 +1 @@
+this file should not be parsed due to invalid extension
\ No newline at end of file
diff --git a/extras/rya.streams/kafka/src/test/resources/statements.ttl b/extras/rya.streams/kafka/src/test/resources/statements.ttl
new file mode 100644
index 0000000..3456cc2
--- /dev/null
+++ b/extras/rya.streams/kafka/src/test/resources/statements.ttl
@@ -0,0 +1,5 @@
+@prefix example: <http://example#> .
+
+example:alice example:talksTo example:bob .
+example:bob example:talksTo example:charlie .
+example:charlie example:likes example:icecream .