/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.rya.streams.kafka.interactor;
import static java.util.Objects.requireNonNull;
import java.util.Optional;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.rya.api.function.projection.RandomUUIDFactory;
import org.apache.rya.streams.api.entity.StreamsQuery;
import org.apache.rya.streams.api.exception.RyaStreamsException;
import org.apache.rya.streams.api.interactor.RunQuery;
import org.apache.rya.streams.api.queries.QueryRepository;
import org.apache.rya.streams.kafka.topology.TopologyBuilderFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
import edu.umd.cs.findbugs.annotations.NonNull;
/**
* A Kafka Streams implementation of {@link RunQuery}.
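*
* <p>
* A minimal usage sketch. The broker address, topic names, and the repository and
* factory instances shown here are illustrative assumptions supplied by the caller,
* not values defined by this class:
*
* <pre>
* // Collaborators the application already has on hand (assumed).
* QueryRepository queryRepo = ...;
* TopologyBuilderFactory topologyFactory = ...;
* UUID queryId = ...; // ID of a query previously registered in queryRepo.
*
* RunQuery runQuery = new KafkaRunQuery(
*         "localhost", "9092",
*         "statements-topic", "results-topic",
*         queryRepo, topologyFactory);
*
* // Blocks until the JVM shuts down or the streams job fails.
* runQuery.run(queryId);
* </pre>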
*/
@DefaultAnnotation(NonNull.class)
public class KafkaRunQuery implements RunQuery {
private static final Logger log = LoggerFactory.getLogger(KafkaRunQuery.class);
private final String kafkaHostname;
private final String kafkaPort;
private final String statementsTopic;
private final String resultsTopic;
private final TopologyBuilderFactory topologyFactory;
private final QueryRepository queryRepo;
/**
* Constructs an instance of {@link KafkaRunQuery}.
*
* @param kafkaHostname - The hostname of the Kafka Broker to connect to. (not null)
* @param kafkaPort - The port of the Kafka Broker to connect to. (not null)
* @param statementsTopic - The name of the topic that statements will be read from. (not null)
* @param resultsTopic - The name of the topic that query results will be written to. (not null)
* @param queryRepo - The query repository that holds queries that are registered. (not null)
* @param topologyFactory - Builds Kafka Stream processing topologies from SPARQL. (not null)
*/
public KafkaRunQuery(
final String kafkaHostname,
final String kafkaPort,
final String statementsTopic,
final String resultsTopic,
final QueryRepository queryRepo,
final TopologyBuilderFactory topologyFactory) {
this.kafkaHostname = requireNonNull(kafkaHostname);
this.kafkaPort = requireNonNull(kafkaPort);
this.statementsTopic = requireNonNull(statementsTopic);
this.resultsTopic = requireNonNull(resultsTopic);
this.topologyFactory = requireNonNull(topologyFactory);
this.queryRepo = requireNonNull(queryRepo);
}
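/**
* Runs the Kafka Streams job for the query with the given ID. This call blocks until the
* JVM begins shutting down; if the streams job throws an unhandled exception, the whole
* program is terminated.
*/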
@Override
public void run(final UUID queryId) throws RyaStreamsException {
requireNonNull(queryId);
// Fetch the query from the repository. Throw an exception if it isn't present.
final Optional<StreamsQuery> query = queryRepo.get(queryId);
if(!query.isPresent()) {
throw new RyaStreamsException("Could not run the Query with ID " + queryId + " because no such query " +
"is currently registered.");
}
// Build a processing topology using the SPARQL, provided statements topic, and provided results topic.
final String sparql = query.get().getSparql();
final TopologyBuilder topologyBuilder;
try {
topologyBuilder = topologyFactory.build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory());
} catch (final Exception e) {
throw new RyaStreamsException("Could not run the Query with ID " + queryId + " because a processing " +
"topolgoy could not be built for the SPARQL " + sparql, e);
}
// Setup the Kafka Stream program.
final Properties streamsProps = new Properties();
streamsProps.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaHostname + ":" + kafkaPort);
// Use the Query ID as the Application ID to ensure we resume where we left off the last time this command was run.
streamsProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "KafkaRunQuery-" + queryId);
// If no offsets have been committed for this application yet, start from the beginning of the input topic.
streamsProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
final KafkaStreams streams = new KafkaStreams(topologyBuilder, new StreamsConfig(streamsProps));
// If an unhandled exception is thrown by a streams thread, log the problem and kill the program.
streams.setUncaughtExceptionHandler((t, e) -> {
log.error("Unhandled exception while processing the Rya Streams query. Shutting down.", e);
System.exit(1);
});
// Register a shutdown hook that releases the latch so the streams job is closed cleanly when the JVM shuts down.
final CountDownLatch awaitTermination = new CountDownLatch(1);
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
awaitTermination.countDown();
}
});
// Run the streams program and wait for termination.
streams.start();
try {
awaitTermination.await();
} catch (final InterruptedException e) {
log.warn("Interrupted while waiting for termination. Shutting down.");
}
streams.close();
}
}