blob: e6bd800e799c92c3db7b7b4c0df9c4f35e820c20 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.rya.streams.querymanager;
import static java.util.Objects.requireNonNull;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.rya.streams.api.entity.StreamsQuery;
import org.apache.rya.streams.api.queries.ChangeLogEntry;
import org.apache.rya.streams.api.queries.InMemoryQueryRepository;
import org.apache.rya.streams.api.queries.QueryChange;
import org.apache.rya.streams.api.queries.QueryChangeLog;
import org.apache.rya.streams.api.queries.QueryChangeLogListener;
import org.apache.rya.streams.api.queries.QueryRepository;
import org.apache.rya.streams.querymanager.QueryChangeLogSource.SourceListener;
import org.apache.rya.streams.querymanager.QueryExecutor.QueryExecutorException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.AbstractScheduledService.Scheduler;
import com.google.common.util.concurrent.AbstractService;
import com.google.common.util.concurrent.UncheckedExecutionException;
import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
import edu.umd.cs.findbugs.annotations.NonNull;
/**
* A service for managing {@link StreamsQuery} running on a Rya Streams system.
* <p>
* Only one QueryManager needs to be running to manage any number of rya
* instances/rya streams instances.
*/
@DefaultAnnotation(NonNull.class)
public class QueryManager extends AbstractService {
private static final Logger log = LoggerFactory.getLogger(QueryManager.class);
/**
* The source of {@link QueryChangeLog}s. Each log discovered is bound to a specific
* Rya instance.
*/
private final QueryChangeLogSource changeLogSource;
/**
* The engine that is responsible for executing {@link StreamsQuery}s.
*/
private final QueryExecutor queryExecutor;
/**
* How long blocking operations will be attempted before potentially trying again.
*/
private final long blockingValue;
/**
* The units for {@link #blockingValue}.
*/
private final TimeUnit blockingUnits;
/**
* Used to inform threads that the application is shutting down, so they must stop work.
*/
private final AtomicBoolean shutdownSignal = new AtomicBoolean(false);
/**
* This thread pool manages the two threads used to work the {@link LogEvent}s
* and the {@link QueryEvent}s.
*/
private final ExecutorService executor = Executors.newFixedThreadPool(2);
/**
 * Builds a {@link QueryManager} from its collaborators and its blocking configuration.
 *
 * @param queryExecutor - Runs the active {@link StreamsQuery}s. (not null)
 * @param source - The {@link QueryChangeLogSource} of QueryChangeLogs. (not null)
 * @param blockingValue - How long blocking operations will try before looping. (> 0)
 * @param blockingUnits - The units of the {@code blockingValue}. (not null)
 */
public QueryManager(
        final QueryExecutor queryExecutor,
        final QueryChangeLogSource source,
        final long blockingValue,
        final TimeUnit blockingUnits) {
    // Validate every argument up front. The checks run in the order: source,
    // executor, blocking value, blocking units.
    Objects.requireNonNull(source);
    Objects.requireNonNull(queryExecutor);
    Preconditions.checkArgument(blockingValue > 0, "The blocking value must be > 0. Was: " + blockingValue);
    Objects.requireNonNull(blockingUnits);

    // Everything checked out, so store the values.
    this.changeLogSource = source;
    this.queryExecutor = queryExecutor;
    this.blockingValue = blockingValue;
    this.blockingUnits = blockingUnits;
}
/**
 * Starts the two event workers, the {@link QueryExecutor} and the
 * {@link QueryChangeLogSource}, then subscribes a listener to the source that feeds
 * the {@link LogEventWorker}'s work queue.
 * <p>
 * If any of those steps is rejected or fails, the service is reported as failed via
 * {@code notifyFailed(Throwable)} and the method returns without reporting a
 * successful start.
 */
@Override
protected void doStart() {
    log.info("Starting a QueryManager.");

    // A work queue of discovered Query Change Logs that need to be handled.
    // This queue exists so that the source notifying thread may be released
    // immediately instead of calling into blocking functions.
    final BlockingQueue<LogEvent> logEvents = new ArrayBlockingQueue<>(1024);

    // A work queue of discovered Query Changes from the monitored Query Change Logs
    // that need to be handled. This queue exists so that the Query Repository notifying
    // thread may be released immediately instead of calling into blocking functions.
    final BlockingQueue<QueryEvent> queryEvents = new ArrayBlockingQueue<>(1024);

    try {
        // Start up a LogEventWorker using the executor service.
        executor.submit(new LogEventWorker(logEvents, queryEvents, blockingValue, blockingUnits, shutdownSignal));

        // Start up a QueryEvent Worker using the executor service.
        executor.submit(new QueryEventWorker(queryEvents, queryExecutor, blockingValue, blockingUnits, shutdownSignal));

        // Start up the query execution framework.
        queryExecutor.startAndWait();

        // Startup the source that discovers new Query Change Logs.
        changeLogSource.startAndWait();

        // Subscribe the source a listener that writes to the LogEventWorker's work queue.
        changeLogSource.subscribe(new LogEventWorkGenerator(logEvents, blockingValue, blockingUnits, shutdownSignal));
    } catch(final RejectedExecutionException | UncheckedExecutionException e) {
        log.error("Could not start up a QueryManager.", e);

        // Any worker that was already submitted would otherwise keep polling forever,
        // so tell the workers to stop before reporting the failure.
        shutdownSignal.set(true);
        executor.shutdownNow();

        notifyFailed(e);

        // The service has failed. Reporting a successful start after this point is
        // not a legal transition, so stop here.
        return;
    }

    // Notify the service was successfully started.
    notifyStarted();
    log.info("QueryManager has finished starting.");
}
/**
 * Stops the QueryManager: sets the shutdown signal, stops the worker threads and waits
 * up to 10 seconds for them to die, then stops the {@link QueryChangeLogSource} and the
 * {@link QueryExecutor}. Failures while stopping the source or the executor are logged
 * and do not prevent the service from being reported as stopped.
 * <p>
 * If the calling thread is interrupted while waiting for the workers, the wait is
 * abandoned, the remaining shutdown steps still run, and the thread's interrupt status
 * is restored before this method returns.
 */
@Override
protected void doStop() {
    log.info("Stopping a QueryManager.");

    // Set the shutdown flag so that all components that rely on that signal will stop processing.
    shutdownSignal.set(true);

    // Stop the workers and wait for them to die.
    executor.shutdownNow();
    boolean interrupted = false;
    try {
        if(!executor.awaitTermination(10, TimeUnit.SECONDS)) {
            log.warn("Waited 10 seconds for the worker threads to die, but they are still running.");
        }
    } catch (final InterruptedException e) {
        // Remember the interrupt so it can be restored once the rest of the shutdown has run.
        interrupted = true;
        log.warn("Interrupted while waiting for the worker threads to die. They may still be running.", e);
    }

    // Stop the source of new Change Logs.
    try {
        changeLogSource.stopAndWait();
    } catch(final UncheckedExecutionException e) {
        log.warn("Could not stop the Change Log Source.", e);
    }

    // Stop the query execution framework.
    try {
        queryExecutor.stopAndWait();
    } catch(final UncheckedExecutionException e) {
        log.warn("Could not stop the Query Executor", e);
    }

    // Notify the service was successfully stopped.
    notifyStopped();
    log.info("QueryManager has finished stopping.");

    // Restore the interrupt status so the caller can observe that it was interrupted.
    if(interrupted) {
        Thread.currentThread().interrupt();
    }
}
/**
 * Offer a unit of work to a blocking queue until it is either accepted, or the
 * shutdown signal is set.
 * <p>
 * If the calling thread is interrupted while offering, the offer is retried unless
 * the shutdown signal has been set, and the thread's interrupt status is restored
 * before this method returns.
 *
 * @param workQueue - The blocking work queue to write to. (not null)
 * @param event - The event that will be offered to the work queue. (not null)
 * @param offerValue - How long to wait when offering new work.
 * @param offerUnits - The unit for the {@code offerValue}. (not null)
 * @param shutdownSignal - Used to signal application shutdown has started, so
 *   this method may terminate without ever placing the event on the queue. (not null)
 * @return {@code true} if the event was added to the queue, otherwise {@code false}.
 */
private static <T> boolean offerUntilAcceptedOrShutdown(
        final BlockingQueue<T> workQueue,
        final T event,
        final long offerValue,
        final TimeUnit offerUnits,
        final AtomicBoolean shutdownSignal) {
    requireNonNull(workQueue);
    requireNonNull(event);
    requireNonNull(offerUnits);
    requireNonNull(shutdownSignal);

    boolean submitted = false;
    boolean interrupted = false;
    try {
        while(!submitted && !shutdownSignal.get()) {
            try {
                submitted = workQueue.offer(event, offerValue, offerUnits);
                if(!submitted) {
                    log.debug("An event could not be added to a work queue after waiting {} {}. Trying again...",
                            offerValue, offerUnits);
                }
            } catch (final InterruptedException e) {
                // Do not re-interrupt inside the loop: that would make every following
                // offer fail immediately. Remember it and restore it on the way out.
                interrupted = true;
                log.debug("Interrupted while offering an event to a work queue. Trying again unless shutting down...");
            }
        }
    } finally {
        if(interrupted) {
            Thread.currentThread().interrupt();
        }
    }
    return submitted;
}
/**
 * Records that a {@link QueryChangeLog} either appeared in, or disappeared from, a
 * {@link QueryChangeLogSource}.
 * <p>
 * Instances are built through {@link #create(String, QueryChangeLog)} and
 * {@link #delete(String)}.
 */
@DefaultAnnotation(NonNull.class)
static class LogEvent {

    /**
     * The kinds of observations a {@link LogEvent} may represent.
     */
    enum LogEventType {
        /**
         * A {@link QueryChangeLog} was created within a {@link QueryChangeLogSource}.
         */
        CREATE,

        /**
         * A {@link QueryChangeLog} was deleted from a {@link QueryChangeLogSource}.
         */
        DELETE;
    }

    private final String instanceName;
    private final LogEventType kind;
    private final Optional<QueryChangeLog> changeLog;

    /**
     * Constructs an instance of {@link LogEvent}.
     *
     * @param ryaInstance - The Rya Instance the log is/was for. (not null)
     * @param eventType - The type of event that was observed. (not null)
     * @param log - The log if this is a create event. (not null)
     */
    private LogEvent(final String ryaInstance, final LogEventType eventType, final Optional<QueryChangeLog> log) {
        instanceName = requireNonNull(ryaInstance);
        kind = requireNonNull(eventType);
        changeLog = requireNonNull(log);
    }

    /**
     * Make a {@link LogEvent} that indicates a {@link QueryChangeLog} was created within a
     * {@link QueryChangeLogSource}.
     *
     * @param ryaInstance - The Rya Instance the created log is for. (not null)
     * @param log - The created {@link QueryChangeLog}. (not null)
     * @return A {@link LogEvent} built using the provided values.
     */
    public static LogEvent create(final String ryaInstance, final QueryChangeLog log) {
        final Optional<QueryChangeLog> created = Optional.of(log);
        return new LogEvent(ryaInstance, LogEventType.CREATE, created);
    }

    /**
     * Make a {@link LogEvent} that indicates a {@link QueryChangeLog} was deleted from
     * a {@link QueryChangeLogSource}.
     *
     * @param ryaInstance - The Rya Instance whose log was deleted. (not null)
     * @return A {@link LogEvent} built using the provided values.
     */
    public static LogEvent delete(final String ryaInstance) {
        final Optional<QueryChangeLog> none = Optional.empty();
        return new LogEvent(ryaInstance, LogEventType.DELETE, none);
    }

    /**
     * @return The Rya Instance whose log was either created or deleted.
     */
    public String getRyaInstanceName() {
        return instanceName;
    }

    /**
     * @return The type of event that was observed.
     */
    public LogEventType getEventType() {
        return kind;
    }

    /**
     * @return The {@link QueryChangeLog} if this is a CREATE event.
     */
    public Optional<QueryChangeLog> getQueryChangeLog() {
        return changeLog;
    }

    @Override
    public String toString() {
        // Only the instance name and the event type are rendered; the log itself is not.
        final StringBuilder text = new StringBuilder("LogEvent {\n");
        text.append(" Rya Instance: ").append(instanceName).append(",\n");
        text.append(" Event Type: ").append(kind).append("\n");
        text.append("}");
        return text.toString();
    }
}
/**
 * An observation that a {@link StreamsQuery} needs to be executing or not
 * via the provided {@link QueryExecutor}.
 * <p>
 * Instances are built through {@link #executing(String, StreamsQuery)},
 * {@link #stopped(String, UUID)} and {@link #stopALL(String)}. Those factories decide
 * which of the optional values (query ID, query) are present.
 */
@DefaultAnnotation(NonNull.class)
static class QueryEvent {

    /**
     * The type of events that may be observed.
     */
    public static enum QueryEventType {
        /**
         * Indicates a {@link StreamsQuery} needs to be executing.
         */
        EXECUTING,

        /**
         * Indicates a {@link StreamsQuery} needs to be stopped.
         */
        STOPPED,

        /**
         * Indicates all {@link StreamsQuery}s for a Rya instance need to be stopped.
         */
        STOP_ALL;
    }

    private final String ryaInstance;
    private final QueryEventType type;
    private final Optional<UUID> queryId;
    private final Optional<StreamsQuery> query;

    /**
     * Constructs an instance of {@link QueryEvent}.
     *
     * @param ryaInstance - The Rya instance that generated the event. (not null)
     * @param type - Indicates whether the query needs to be executing or not. (not null)
     * @param queryId - If stopped, the ID of the query that must not be running. (not null)
     * @param query - If executing, the StreamsQuery that defines what should be executing. (not null)
     */
    private QueryEvent(
            final String ryaInstance,
            final QueryEventType type,
            final Optional<UUID> queryId,
            final Optional<StreamsQuery> query) {
        this.ryaInstance = requireNonNull(ryaInstance);
        this.type = requireNonNull(type);
        this.queryId = requireNonNull(queryId);
        this.query = requireNonNull(query);
    }

    /**
     * @return The Rya instance that generated the event.
     */
    public String getRyaInstance() {
        return ryaInstance;
    }

    /**
     * @return Indicates whether the query needs to be executing or not.
     */
    public QueryEventType getType() {
        return type;
    }

    /**
     * @return If stopped, the ID of the query that must not be running. Otherwise absent.
     */
    public Optional<UUID> getQueryId() {
        return queryId;
    }

    /**
     * @return If executing, the StreamsQuery that defines what should be executing. Otherwise absent.
     */
    public Optional<StreamsQuery> getStreamsQuery() {
        return query;
    }

    @Override
    public int hashCode() {
        return Objects.hash(ryaInstance, type, queryId, query);
    }

    @Override
    public boolean equals(final Object o) {
        if(o instanceof QueryEvent) {
            final QueryEvent other = (QueryEvent) o;
            return Objects.equals(ryaInstance, other.ryaInstance) &&
                    Objects.equals(type, other.type) &&
                    Objects.equals(queryId, other.queryId) &&
                    Objects.equals(query, other.query);
        }
        return false;
    }

    /**
     * Renders the instance name and type, followed by whichever of the query ID and the
     * query are present. Absent values are skipped rather than dereferenced, so this
     * cannot throw for any combination of values.
     */
    @Override
    public String toString() {
        final StringBuilder string = new StringBuilder();
        string.append("Query Event {\n")
            .append(" Rya Instance: ").append(ryaInstance).append(",\n")
            .append(" Type: ").append(type).append(",\n");

        // The factories populate the query for EXECUTING, the ID for STOPPED, and neither
        // for STOP_ALL, so rendering what is present covers every event type.
        queryId.ifPresent(id -> string.append(" Query ID: ").append(id).append("\n"));
        query.ifPresent(streamsQuery -> appendQuery(string, streamsQuery));

        string.append("}");
        return string.toString();
    }

    /**
     * Appends a multi-line description of a {@link StreamsQuery}, terminated by a newline.
     *
     * @param string - The builder that will be appended to. (not null)
     * @param query - The query to describe. (not null)
     */
    private static void appendQuery(final StringBuilder string, final StreamsQuery query) {
        requireNonNull(string);
        requireNonNull(query);
        string.append(" Streams Query {\n")
            .append(" Query ID: ").append(query.getQueryId()).append(",\n")
            .append(" Is Active: ").append(query.isActive()).append(",\n")
            .append(" SPARQL: ").append(query.getSparql()).append("\n")
            // End the section with a newline so the outer closing brace gets its own line.
            .append(" }\n");
    }

    /**
     * Create a {@link QueryEvent} that indicates a query needs to be executing.
     *
     * @param ryaInstance - The Rya instance that generated the event. (not null)
     * @param query - The StreamsQuery that defines what should be executing. (not null)
     * @return A {@link QueryEvent} built using the provided values.
     */
    public static QueryEvent executing(final String ryaInstance, final StreamsQuery query) {
        return new QueryEvent(ryaInstance, QueryEventType.EXECUTING, Optional.empty(), Optional.of(query));
    }

    /**
     * Create a {@link QueryEvent} that indicates a query needs to be stopped.
     *
     * @param ryaInstance - The Rya instance that generated the event. (not null)
     * @param queryId - The ID of the query that must not be running. (not null)
     * @return A {@link QueryEvent} built using the provided values.
     */
    public static QueryEvent stopped(final String ryaInstance, final UUID queryId) {
        return new QueryEvent(ryaInstance, QueryEventType.STOPPED, Optional.of(queryId), Optional.empty());
    }

    /**
     * Create a {@link QueryEvent} that indicates all queries for a Rya instance need to be stopped.
     *
     * @param ryaInstance - The Rya instance that generated the event. (not null)
     * @return A {@link QueryEvent} built using the provided values.
     */
    public static QueryEvent stopALL(final String ryaInstance) {
        return new QueryEvent(ryaInstance, QueryEventType.STOP_ALL, Optional.empty(), Optional.empty());
    }
}
/**
 * A {@link SourceListener} that turns {@link QueryChangeLogSource} notifications into
 * {@link LogEvent}s and places them on the provided work queue. Offering is abandoned
 * once the provided shutdown signal is set.
 */
@DefaultAnnotation(NonNull.class)
static class LogEventWorkGenerator implements SourceListener {

    private final BlockingQueue<LogEvent> workQueue;
    private final AtomicBoolean shutdownSignal;
    private final long offerValue;
    private final TimeUnit offerUnits;

    /**
     * Constructs an instance of {@link LogEventWorkGenerator}.
     *
     * @param workQueue - A blocking queue that will have {@link LogEvent}s offered to it. (not null)
     * @param offerValue - How long to wait when offering new work.
     * @param offerUnits - The unit for the {@code offerValue}. (not null)
     * @param shutdownSignal - Indicates to this listener that it needs to stop adding events
     *   to the work queue because the application is shutting down. (not null)
     */
    public LogEventWorkGenerator(
            final BlockingQueue<LogEvent> workQueue,
            final long offerValue,
            final TimeUnit offerUnits,
            final AtomicBoolean shutdownSignal) {
        this.workQueue = requireNonNull(workQueue);
        this.shutdownSignal = requireNonNull(shutdownSignal);
        this.offerValue = offerValue;
        this.offerUnits = requireNonNull(offerUnits);
    }

    @Override
    public void notifyCreate(final String ryaInstanceName, final QueryChangeLog changeLog) {
        final String message = "A new Query Change Log has been discovered for Rya Instance " + ryaInstanceName +
                ". All queries that are set to active within it will be started.";
        log.info(message);

        // Summarize the notification as an event and hand it to the worker.
        enqueue(LogEvent.create(ryaInstanceName, changeLog));
    }

    @Override
    public void notifyDelete(final String ryaInstanceName) {
        final String message = "The Query Change Log for Rya Instance " + ryaInstanceName +
                " has been deleted. All of the queries related to that instance will be stopped.";
        log.info(message);

        // Summarize the notification as an event and hand it to the worker.
        enqueue(LogEvent.delete(ryaInstanceName));
    }

    /**
     * Offers an event to the work queue until there is room for it, or the application
     * is shutting down.
     *
     * @param event - The event to place on the work queue. (not null)
     */
    private void enqueue(final LogEvent event) {
        offerUntilAcceptedOrShutdown(workQueue, event, offerValue, offerUnits, shutdownSignal);
    }
}
/**
* Processes a work queue of {@link LogEvent}s.
* <p/>
* Whenever a new log has been created, then it registers a {@link QueryEventWorkGenerator}
* that generates {@link QueryEvent}s based on the content and updates to the discovered
* {@link QueryChagneLog}.
* <p/>
* Whenever a log is deleted, then the generator is stopped and a stop all {@link QueryEvent}
* is written to the work queue.
*/
@DefaultAnnotation(NonNull.class)
static class LogEventWorker implements Runnable {
/**
* A map of Rya Instance name to he Query Repository for that instance.
*/
private final Map<String, QueryRepository> repos = new HashMap<>();
private final BlockingQueue<LogEvent> logWorkQueue;
private final BlockingQueue<QueryEvent> queryWorkQueue;
private final long blockingValue;
private final TimeUnit blockingUnits;
private final AtomicBoolean shutdownSignal;
/**
* Constructs an instance of {@link LogEventWorker}.
*
* @param logWorkQueue - A queue of {@link LogEvent}s that will be worked by this object. (not null)
* @param queryWorkQueue - A queue where {@link QueryEvent}s will be placed by this object. (not null)
* @param blockingValue - How long to wait when polling/offering new work.
* @param blockingUnits - The unit for the {@code blockingValue}. (not null)
* @param shutdownSignal - Indicates when the application has been shutdown, so the executing thread
* may exit the {@link #run()} method. (not null)
*/
public LogEventWorker(
final BlockingQueue<LogEvent> logWorkQueue,
final BlockingQueue<QueryEvent> queryWorkQueue,
final long blockingValue,
final TimeUnit blockingUnits,
final AtomicBoolean shutdownSignal) {
this.logWorkQueue = requireNonNull(logWorkQueue);
this.queryWorkQueue = requireNonNull(queryWorkQueue);
this.blockingValue = blockingValue;
this.blockingUnits = requireNonNull(blockingUnits);
this.shutdownSignal = requireNonNull(shutdownSignal);
}
@Override
public void run() {
// Run until the shutdown signal is set.
while(!shutdownSignal.get()) {
try {
// Pull a unit of work from the queue.
log.debug("LogEventWorker - Polling the work queue for a new LogEvent.");
final LogEvent logEvent = logWorkQueue.poll(blockingValue, blockingUnits);
if(logEvent == null) {
// Poll again if nothing was found.
continue;
}
log.info("LogEventWorker - handling: \n" + logEvent);
final String ryaInstance = logEvent.getRyaInstanceName();
switch(logEvent.getEventType()) {
case CREATE:
// If we see a create message for a Rya Instance we are already maintaining,
// then don't do anything.
if(repos.containsKey(ryaInstance)) {
log.warn("LogEventWorker - A repository is already being managed for the Rya Instance " +
ryaInstance + ". This message will be ignored.");
continue;
}
// Create and start a QueryRepository for the discovered log. Hold onto the repository
// so that it may be shutdown later.
final Scheduler scheduler = Scheduler.newFixedRateSchedule(0, blockingValue, blockingUnits);
final QueryRepository repo = new InMemoryQueryRepository(logEvent.getQueryChangeLog().get(), scheduler);
repo.startAndWait();
repos.put(ryaInstance, repo);
// Subscribe a worker that adds the Query Events to the queryWorkQueue queue.
// A count down latch is used to ensure the returned set of queries are handled
// prior to any notifications from the repository.
final CountDownLatch subscriptionWorkFinished = new CountDownLatch(1);
final QueryEventWorkGenerator queryWorkGenerator =
new QueryEventWorkGenerator(ryaInstance, subscriptionWorkFinished, queryWorkQueue,
blockingValue, blockingUnits, shutdownSignal);
log.debug("LogEventWorker - Setting up a QueryWorkGenerator...");
final Set<StreamsQuery> queries = repo.subscribe(queryWorkGenerator);
log.debug("LogEventWorker - Finished setting up a QueryWorkGenerator.");
// Handle the view of the queries within the repository as it existed when
// the subscription was registered.
queries.stream()
.forEach(query -> {
// Create a QueryEvent that represents the active state of the existing query.
final QueryEvent queryEvent = query.isActive() ?
QueryEvent.executing(ryaInstance, query) : QueryEvent.stopped(ryaInstance, query.getQueryId());
log.debug("LogEventWorker - offering: " + queryEvent);
// Offer it to the worker until there is room for it in the work queue, or we are shutting down.
offerUntilAcceptedOrShutdown(queryWorkQueue, queryEvent, blockingValue, blockingUnits, shutdownSignal);
});
// Indicate the subscription work is finished so that the registered listener may start
// adding work to the queue.
log.info("LogEventWorker - Counting down the subscription work latch.");
subscriptionWorkFinished.countDown();
break;
case DELETE:
if(repos.containsKey(ryaInstance)) {
// Shut down the query repository for the Rya instance. This ensures the listener will
// not receive any more work that needs to be done.
final QueryRepository deletedRepo = repos.remove(ryaInstance);
deletedRepo.stopAndWait();
// Add work that stops all of the queries related to the instance.
final QueryEvent stopAllEvent = QueryEvent.stopALL(ryaInstance);
offerUntilAcceptedOrShutdown(queryWorkQueue, stopAllEvent, blockingValue, blockingUnits, shutdownSignal);
}
break;
}
} catch (final InterruptedException e) {
log.debug("LogEventWorker did not see any new events over the past 5 seconds. Polling again...");
}
}
log.info("LogEventWorker shutting down...");
// Shutdown all of the QueryRepositories that were started.
repos.values().forEach(repo -> repo.stopAndWait());
log.info("LogEventWorker shut down.");
}
}
/**
 * Listens to a {@link QueryRepository} and adds observations to the provided work queue.
 * It does so until the provided shutdown signal is set.
 * <p>
 * Notifications are held back until the subscription work latch supplied at construction
 * has been released, or the shutdown signal is set.
 */
@DefaultAnnotation(NonNull.class)
static class QueryEventWorkGenerator implements QueryChangeLogListener {

    private final String ryaInstance;
    private final CountDownLatch subscriptionWorkFinished;
    private final BlockingQueue<QueryEvent> queryWorkQueue;
    private final long blockingValue;
    private final TimeUnit blockingUnits;
    private final AtomicBoolean shutdownSignal;

    /**
     * Constructs an instance of {@link QueryEventWorkGenerator}.
     *
     * @param ryaInstance - The rya instance whose log this objects is watching. (not null)
     * @param subscriptionWorkFinished - Indicates when work that needs to be completed before this
     *   listener handles notifications is completed. (not null)
     * @param queryWorkQueue - A queue where {@link QueryEvent}s will be placed by this object. (not null)
     * @param blockingValue - How long to wait when polling/offering new work.
     * @param blockingUnits - The unit for the {@code blockingValue}. (not null)
     * @param shutdownSignal - Indicates to this listener that it needs to stop adding events
     *   to the work queue because the application is shutting down. (not null)
     */
    public QueryEventWorkGenerator(
            final String ryaInstance,
            final CountDownLatch subscriptionWorkFinished,
            final BlockingQueue<QueryEvent> queryWorkQueue,
            final long blockingValue,
            final TimeUnit blockingUnits,
            final AtomicBoolean shutdownSignal) {
        this.ryaInstance = requireNonNull(ryaInstance);
        this.subscriptionWorkFinished = requireNonNull(subscriptionWorkFinished);
        this.queryWorkQueue = requireNonNull(queryWorkQueue);
        this.blockingValue = blockingValue;
        this.blockingUnits = requireNonNull(blockingUnits);
        this.shutdownSignal = requireNonNull(shutdownSignal);
    }

    /**
     * Waits for the subscription work to finish, then converts the change into a
     * {@link QueryEvent} and offers it to the query work queue. Nothing is offered when
     * the shutdown signal is set. If the thread was interrupted while waiting, its
     * interrupt status is restored before this method returns.
     */
    @Override
    public void notify(final ChangeLogEntry<QueryChange> queryChangeEvent, final Optional<StreamsQuery> newQueryState) {
        requireNonNull(queryChangeEvent);
        requireNonNull(newQueryState);

        // Wait for the subscription work to be finished.
        final boolean interrupted = awaitSubscriptionWork();
        try {
            // If we stopped waiting because of a shutdown, return immediately.
            if(shutdownSignal.get()) {
                log.debug("Not processing notification. Shutting down.");
                return;
            }

            // Generate work from the notification.
            generateWork(queryChangeEvent.getEntry(), newQueryState);
        } finally {
            // Restored last so that the offers above are not immediately interrupted.
            if(interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Blocks until the subscription work latch is released or the shutdown signal is set.
     *
     * @return {@code true} if the thread was interrupted while waiting; otherwise {@code false}.
     */
    private boolean awaitSubscriptionWork() {
        try {
            log.debug("Waiting for Subscription Work Finished latch to release...");
            while(!shutdownSignal.get() && !subscriptionWorkFinished.await(blockingValue, blockingUnits)) {
                log.debug("Still waiting...");
            }
            log.debug("Finished waiting for the Subscription Work Finished latch.");
            return false;
        } catch (final InterruptedException e) {
            log.warn("Interrupted while waiting for the Subscription Work Finished latch to be " +
                    "released. Shutting down?", e);
            return true;
        }
    }

    /**
     * Offers the {@link QueryEvent} that corresponds to a {@link QueryChange}, if any.
     *
     * @param change - The change that was observed. (not null)
     * @param newQueryState - The state of the query after the change, if it was supplied. (not null)
     */
    private void generateWork(final QueryChange change, final Optional<StreamsQuery> newQueryState) {
        switch(change.getChangeType()) {
            case CREATE: {
                if(newQueryState.isPresent()) {
                    final StreamsQuery newQuery = newQueryState.get();
                    // Log the query itself rather than the Optional that wraps it.
                    log.info("Rya Instance " + ryaInstance + " created Rya Streams query " + newQuery + ".");
                    // Only active queries produce work; an inactive new query has nothing to stop.
                    if(newQuery.isActive()) {
                        offer(QueryEvent.executing(ryaInstance, newQuery));
                    }
                } else {
                    log.error("Received a CREATE QueryChange for Rya Instance: " + ryaInstance +
                            ", Query ID: " + change.getQueryId() + ", but the QueryRepository did not supply a " +
                            "StreamsQuery representing the created query. The query will not be processed.");
                }
                break;
            }

            case DELETE: {
                final UUID deletedQueryId = change.getQueryId();
                log.info("Rya Instance " + ryaInstance + " deleted Rya Streams query with ID " + deletedQueryId);
                offer(QueryEvent.stopped(ryaInstance, deletedQueryId));
                break;
            }

            case UPDATE: {
                if(newQueryState.isPresent()) {
                    final StreamsQuery updatedQuery = newQueryState.get();
                    if(updatedQuery.isActive()) {
                        log.info("Rya Instance " + ryaInstance + " updated Rya Streams query with ID " +
                                updatedQuery.getQueryId() + " to be active.");
                        offer(QueryEvent.executing(ryaInstance, updatedQuery));
                    } else {
                        log.info("Rya Instance " + ryaInstance + " updated Rya Streams query with ID " +
                                updatedQuery.getQueryId() + " to be inactive.");
                        offer(QueryEvent.stopped(ryaInstance, updatedQuery.getQueryId()));
                    }
                } else {
                    log.error("Received an UPDATE QueryChange for Rya Instance: " + ryaInstance +
                            ", Query ID: " + change.getQueryId() + ", but the QueryRepository did not supply a " +
                            "StreamsQuery representing the updated query. The query will not be processed.");
                }
                break;
            }
        }
    }

    /**
     * Offers an event to the query work queue until it is accepted, or the application
     * is shutting down.
     *
     * @param event - The event to place on the work queue. (not null)
     */
    private void offer(final QueryEvent event) {
        offerUntilAcceptedOrShutdown(queryWorkQueue, event, blockingValue, blockingUnits, shutdownSignal);
    }
}
/**
 * Drains a work queue of {@link QueryEvent}s.
 * <p>
 * Each type of event is translated into the matching call on the {@link QueryExecutor}:
 * start a query, stop a query, or stop all queries of a Rya instance.
 */
@DefaultAnnotation(NonNull.class)
static class QueryEventWorker implements Runnable {

    private final BlockingQueue<QueryEvent> workQueue;
    private final QueryExecutor queryExecutor;
    private final long pollingValue;
    private final TimeUnit pollingUnits;
    private final AtomicBoolean shutdownSignal;

    /**
     * Constructs an instance of {@link QueryEventWorker}.
     *
     * @param workQueue - A queue of {@link QueryEvent}s that will be worked by this object. (not null)
     * @param queryExecutor - Responsible for executing the {@link StreamsQuery}s. (not null)
     * @param pollingValue - How long to wait when polling for new work.
     * @param pollingUnits - The units for the {@code pollingValue}. (not null)
     * @param shutdownSignal - Indicates when the application has been shutdown, so the executing thread
     *   may exit the {@link #run()} method. (not null)
     */
    public QueryEventWorker(
            final BlockingQueue<QueryEvent> workQueue,
            final QueryExecutor queryExecutor,
            final long pollingValue,
            final TimeUnit pollingUnits,
            final AtomicBoolean shutdownSignal) {
        this.workQueue = requireNonNull(workQueue);
        this.queryExecutor = requireNonNull(queryExecutor);
        this.pollingValue = pollingValue;
        this.pollingUnits = requireNonNull(pollingUnits);
        this.shutdownSignal = requireNonNull(shutdownSignal);
    }

    @Override
    public void run() {
        log.info("QueryEventWorker starting.");

        // Keep working the queue for as long as no shutdown has been requested.
        while(!shutdownSignal.get()) {
            try {
                log.debug("Polling the work queue for a new QueryEvent.");
                final QueryEvent next = workQueue.poll(pollingValue, pollingUnits);

                // A null result means the poll timed out, so just go around again.
                if(next != null) {
                    log.info("QueryEventWorker handling:\n" + next);
                    apply(next);
                }
            } catch (final InterruptedException e) {
                log.debug("QueryEventWorker interrupted. Probably shutting down.");
            }
        }

        log.info("QueryEventWorker shut down.");
    }

    /**
     * Makes the state within the executor match the state described by a query event.
     * Failures reported by the executor are logged and otherwise ignored.
     *
     * @param event - The unit of work to apply. (not null)
     */
    private void apply(final QueryEvent event) {
        switch(event.getType()) {
            case EXECUTING:
                try {
                    queryExecutor.startQuery(event.getRyaInstance(), event.getStreamsQuery().get());
                } catch (final IllegalStateException | QueryExecutorException e) {
                    log.error("Could not start a query represented by the following work: " + event, e);
                }
                return;

            case STOPPED:
                try {
                    queryExecutor.stopQuery(event.getQueryId().get());
                } catch (final IllegalStateException | QueryExecutorException e) {
                    log.error("Could not stop a query represented by the following work: " + event, e);
                }
                return;

            case STOP_ALL:
                try {
                    queryExecutor.stopAll(event.getRyaInstance());
                } catch (final IllegalStateException | QueryExecutorException e) {
                    log.error("Could not stop all queries represented by the following work: " + event, e);
                }
                return;
        }
    }
}
}