/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.rya.streams.querymanager;

import static java.util.Objects.requireNonNull;

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.rya.streams.api.entity.StreamsQuery;
import org.apache.rya.streams.api.queries.ChangeLogEntry;
import org.apache.rya.streams.api.queries.InMemoryQueryRepository;
import org.apache.rya.streams.api.queries.QueryChange;
import org.apache.rya.streams.api.queries.QueryChangeLog;
import org.apache.rya.streams.api.queries.QueryChangeLogListener;
import org.apache.rya.streams.api.queries.QueryRepository;
import org.apache.rya.streams.querymanager.QueryChangeLogSource.SourceListener;
import org.apache.rya.streams.querymanager.QueryExecutor.QueryExecutorException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.AbstractScheduledService.Scheduler;
import com.google.common.util.concurrent.AbstractService;
import com.google.common.util.concurrent.UncheckedExecutionException;

import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
import edu.umd.cs.findbugs.annotations.NonNull;

/**
 * A service for managing {@link StreamsQuery} running on a Rya Streams system.
 * <p>
 * Only one QueryManager needs to be running to manage any number of rya
 * instances/rya streams instances.
 */
@DefaultAnnotation(NonNull.class)
public class QueryManager extends AbstractService {
    private static final Logger log = LoggerFactory.getLogger(QueryManager.class);

    /**
     * The source of {@link QueryChangeLog}s. Each log discovered is bound to a specific
     * Rya instnace.
     */
    private final QueryChangeLogSource changeLogSource;

    /**
     * The engine that is responsible for executing {@link StreamsQuery}s.
     */
    private final QueryExecutor queryExecutor;

    /**
     * How long blocking operations will be attempted before potentially trying again.
     */
    private final long blockingValue;

    /**
     * The units for {@link #blockingValue}.
     */
    private final TimeUnit blockingUnits;

    /**
     * Used to inform threads that the application is shutting down, so they must stop work.
     */
    private final AtomicBoolean shutdownSignal = new AtomicBoolean(false);

    /**
     * This thread pool manages the two thread used to work the {@link LogEvent}s
     * and the {@link QueryEvent}s.
     */
    private final ExecutorService executor = Executors.newFixedThreadPool(2);

    /**
     * Creates a new {@link QueryManager}.
     *
     * @param queryExecutor - Runs the active {@link StreamsQuery}s. (not null)
     * @param source - The {@link QueryChangeLogSource} of QueryChangeLogs. (not null)
     * @param blockingValue - How long blocking operations will try before looping. (> 0)
     * @param blockingUnits - The units of the {@code blockingValue}. (not null)
     */
    public QueryManager(
            final QueryExecutor queryExecutor,
            final QueryChangeLogSource source,
            final long blockingValue,
            final TimeUnit blockingUnits) {
        this.changeLogSource = requireNonNull(source);
        this.queryExecutor = requireNonNull(queryExecutor);
        Preconditions.checkArgument(blockingValue > 0, "The blocking value must be > 0. Was: " + blockingValue);
        this.blockingValue = blockingValue;
        this.blockingUnits = requireNonNull(blockingUnits);
    }

    @Override
    protected void doStart() {
        log.info("Starting a QueryManager.");

        // A work queue of discovered Query Change Logs that need to be handled.
        // This queue exists so that the source notifying thread may be released
        // immediately instead of calling into blocking functions.
        final BlockingQueue<LogEvent> logEvents = new ArrayBlockingQueue<>(1024);

        // A work queue of discovered Query Changes from the monitored Query Change Logs
        // that need to be handled. This queue exists so that the Query Repository notifying
        // thread may be released immediately instead of calling into blocking functions.
        final BlockingQueue<QueryEvent> queryEvents = new ArrayBlockingQueue<>(1024);

        try {
            // Start up a LogEventWorker using the executor service.
            executor.submit(new LogEventWorker(logEvents, queryEvents, blockingValue, blockingUnits, shutdownSignal));

            // Start up a QueryEvent Worker using the executor service.
            executor.submit(new QueryEventWorker(queryEvents, queryExecutor, blockingValue, blockingUnits, shutdownSignal));

            // Start up the query execution framework.
            queryExecutor.startAndWait();

            // Startup the source that discovers new Query Change Logs.
            changeLogSource.startAndWait();

            // Subscribe the source a listener that writes to the LogEventWorker's work queue.
            changeLogSource.subscribe(new LogEventWorkGenerator(logEvents, blockingValue, blockingUnits, shutdownSignal));
        } catch(final RejectedExecutionException | UncheckedExecutionException e) {
            log.error("Could not start up a QueryManager.", e);
            notifyFailed(e);
        }

        // Notify the service was successfully started.
        notifyStarted();

        log.info("QueryManager has finished starting.");
    }

    @Override
    protected void doStop() {
        log.info("Stopping a QueryManager.");

        // Set the shutdown flag so that all components that rely on that signal will stop processing.
        shutdownSignal.set(true);

        // Stop the workers and wait for them to die.
        executor.shutdownNow();
        try {
            if(!executor.awaitTermination(10, TimeUnit.SECONDS)) {
                log.warn("Waited 10 seconds for the worker threads to die, but they are still running.");
            }
        } catch (final InterruptedException e) {
            log.warn("Waited 10 seconds for the worker threads to die, but they are still running.");
        }

        // Stop the source of new Change Logs.
        try {
            changeLogSource.stopAndWait();
        } catch(final UncheckedExecutionException e) {
            log.warn("Could not stop the Change Log Source.", e);
        }

        // Stop the query execution framework.
        try {
            queryExecutor.stopAndWait();
        } catch(final UncheckedExecutionException e) {
            log.warn("Could not stop the Query Executor", e);
        }

        // Notify the service was successfully stopped.
        notifyStopped();

        log.info("QueryManager has finished stopping.");
    }

    /**
     * Offer a unit of work to a blocking queue until it is either accepted, or the
     * shutdown signal is set.
     *
     * @param workQueue - The blocking work queue to write to. (not null)
     * @param event - The event that will be offered to the work queue. (not null)
     * @param offerValue - How long to wait when offering new work.
     * @param offerUnits - The unit for the {@code offerValue}. (not null)
     * @param shutdownSignal - Used to signal application shutdown has started, so
     *   this method may terminate without ever placing the event on the queue. (not null)
     * @return {@code true} if the evet nwas added to the queue, otherwise false.
     */
    private static <T> boolean offerUntilAcceptedOrShutdown(
            final BlockingQueue<T> workQueue,
            final T event,
            final long offerValue,
            final TimeUnit offerUnits,
            final AtomicBoolean shutdownSignal) {
        requireNonNull(workQueue);
        requireNonNull(event);
        requireNonNull(shutdownSignal);

        boolean submitted = false;
        while(!submitted && !shutdownSignal.get()) {
            try {
                submitted = workQueue.offer(event, offerValue, offerUnits);
                if(!submitted) {
                    log.debug("An event could not be added to a work queue after waiting 5 seconds. Trying again...");
                }
            } catch (final InterruptedException e) {
                log.debug("An event could not be added to a work queue after waiting 5 seconds. Trying again...");
            }
        }
        return submitted;
    }

    /**
     * An observation that a {@link QueryChangeLog} was created within or
     * removed from a {@link QueryChangeLogSource}.
     */
    @DefaultAnnotation(NonNull.class)
    static class LogEvent {

        /**
         * The types of events that may be observed.
         */
        static enum LogEventType {
            /**
             * A {@link QueryChangeLog} was created within a {@link QueryChangeLogSource}.
             */
            CREATE,

            /**
             * A {@link QueryChangeLog} was deleted from a {@link QueryChangeLogSource}.
             */
            DELETE;
        }

        private final String ryaInstance;
        private final LogEventType eventType;
        private final Optional<QueryChangeLog> log;

        /**
         * Constructs an instance of {@link LogEvent}.
         *
         * @param ryaInstance - The Rya Instance the log is/was for. (not null)
         * @param eventType - The type of event that was observed. (not null)
         * @param log - The log if this is a create event. (not null)
         */
        private LogEvent(final String ryaInstance, final LogEventType eventType, final Optional<QueryChangeLog> log) {
            this.ryaInstance = requireNonNull(ryaInstance);
            this.eventType = requireNonNull(eventType);
            this.log = requireNonNull(log);
        }

        /**
         * @return The Rya Instance whose log was either created or deleted.
         */
        public String getRyaInstanceName() {
            return ryaInstance;
        }

        /**
         * @return The type of event that was observed.
         */
        public LogEventType getEventType() {
            return eventType;
        }

        /**
         * @return The {@link QueryChangeLog} if this is a CREATE event.
         */
        public Optional<QueryChangeLog> getQueryChangeLog() {
            return log;
        }

        @Override
        public String toString() {
            return "LogEvent {\n" +
                   "    Rya Instance: " + ryaInstance + ",\n" +
                   "    Event Type: " + eventType + "\n" +
                   "}";
        }

        /**
         * Make a {@link LogEvent} that indicates a {@link QueryChangeLog} was created within a
         * {@link QueryChangeLogSource}.
         *
         * @param ryaInstance - The Rya Instance the created log is for. (not null)
         * @param log - The created {@link QueryChangeLog. (not null)
         * @return A {@link LogEvent} built using the provided values.
         */
        public static LogEvent create(final String ryaInstance, final QueryChangeLog log) {
            return new LogEvent(ryaInstance, LogEventType.CREATE, Optional.of(log));
        }

        /**
         * Make a {@link LogEvent} that indicates a {@link QueryChangeLog} was deleted from
         * a {@link QueryChangeLogSource}.
         *
         * @param ryaInstance - The Rya Instance whose log was deleted. (not null)
         * @return A {@link LogEvent} built using the provided values.
         */
        public static LogEvent delete(final String ryaInstance) {
            return new LogEvent(ryaInstance, LogEventType.DELETE, Optional.empty());
        }
    }

    /**
     * An observation that a {@link StreamsQuery} needs to be executing or not
     * via the provided {@link QueryExecutor}.
     */
    @DefaultAnnotation(NonNull.class)
    static class QueryEvent {

        /**
         * The type of events that may be observed.
         */
        public static enum QueryEventType {
            /**
             * Indicates a {@link StreamsQuery} needs to be executing.
             */
            EXECUTING,

            /**
             * Indicates a {@link StreamsQuery} needs to be stopped.
             */
            STOPPED,

            /**
             * Indicates all {@link StreamsQuery}s for a Rya instance need to be stopped.
             */
            STOP_ALL;
        }

        private final String ryaInstance;
        private final QueryEventType type;
        private final Optional<UUID> queryId;
        private final Optional<StreamsQuery> query;

        /**
         * Constructs an instance of {@link QueryEvent}.
         *
         * @param ryaInstance - The Rya instance that generated the event. (not null)
         * @param type - Indicates whether the query needs to be executing or not. (not null)
         * @param queryId - If stopped, the ID of the query that must not be running. (not null)
         * @param query - If executing, the StreamsQuery that defines what should be executing. (not null)
         */
        private QueryEvent(
                final String ryaInstance,
                final QueryEventType type,
                final Optional<UUID> queryId,
                final Optional<StreamsQuery> query) {
            this.ryaInstance = requireNonNull(ryaInstance);
            this.type = requireNonNull(type);
            this.queryId = requireNonNull(queryId);
            this.query = requireNonNull(query);
        }

        /**
         * @return The Rya instance that generated the event.
         */
        public String getRyaInstance() {
            return ryaInstance;
        }

        /**
         * @return Indicates whether the query needs to be executing or not.
         */
        public QueryEventType getType() {
            return type;
        }

        /**
         * @return If stopped, the ID of the query that must not be running. Otherwise absent.
         */
        public Optional<UUID> getQueryId() {
            return queryId;
        }

        /**
         * @return If executing, the StreamsQuery that defines what should be executing. Otherwise absent.
         */
        public Optional<StreamsQuery> getStreamsQuery() {
            return query;
        }

        @Override
        public int hashCode() {
            return Objects.hash(ryaInstance, type, queryId, query);
        }

        @Override
        public boolean equals(final Object o) {
            if(o instanceof QueryEvent) {
                final QueryEvent other = (QueryEvent) o;
                return Objects.equals(ryaInstance, other.ryaInstance) &&
                        Objects.equals(type, other.type) &&
                        Objects.equals(queryId, other.queryId) &&
                        Objects.equals(query, other.query);
            }
            return false;
        }

        @Override
        public String toString() {
            final StringBuilder string = new StringBuilder();
            string.append("Query Event {\n")
                  .append("    Rya Instance: ").append(ryaInstance).append(",\n")
                  .append("    Type: ").append(type).append(",\n");
            switch(type) {
                case EXECUTING:
                    append(string, query.get());
                    break;
                case STOPPED:
                    string.append("    Query ID: ").append(queryId.get()).append("\n");
                    break;
                case STOP_ALL:
                    break;
                default:
                    // Default to showing everything that is in the object.
                    string.append("    Query ID: ").append(queryId.get()).append("\n");
                    append(string, query.get());
                    break;
            }
            string.append("}");
            return string.toString();
        }

        private void append(final StringBuilder string, final StreamsQuery query) {
            requireNonNull(string);
            requireNonNull(query);
            string.append("    Streams Query {\n")
                  .append("        Query ID: ").append(query.getQueryId()).append(",\n")
                  .append("        Is Active: ").append(query.isActive()).append(",\n")
                  .append("        SPARQL: ").append(query.getSparql()).append("\n")
                  .append("    }");
        }

        /**
         * Create a {@link QueryEvent} that indicates a query needs to be executing.
         *
         * @param ryaInstance - The Rya instance that generated the event. (not null)
         * @param query - The StreamsQuery that defines what should be executing. (not null)
         * @return A {@link QueryEvent} built using the provided values.
         */
        public static QueryEvent executing(final String ryaInstance, final StreamsQuery query) {
            return new QueryEvent(ryaInstance, QueryEventType.EXECUTING, Optional.empty(), Optional.of(query));
        }

        /**
         * Create a {@link QueryEvent} that indicates a query needs to be stopped.
         *
         * @param ryaInstance - The Rya instance that generated the event. (not null)
         * @param queryId - The ID of the query that must not be running. (not null)
         * @return A {@link QueryEvent} built using the provided values.
         */
        public static QueryEvent stopped(final String ryaInstance, final UUID queryId) {
            return new QueryEvent(ryaInstance, QueryEventType.STOPPED, Optional.of(queryId), Optional.empty());
        }

        /**
         * Create a {@link QueryEvent} that indicates all queries for a Rya instance needs to be stopped.
         *
         * @param ryaInstance - The Rya instance that generated the event. (not null)
         * @return A {@link QueryEvent} built using the provided values.
         */
        public static QueryEvent stopALL(final String ryaInstance) {
            return new QueryEvent(ryaInstance, QueryEventType.STOP_ALL, Optional.empty(), Optional.empty());
        }
    }

    /**
     * Listens to a {@link QueryChangeLogSource} and adds observations to the provided
     * work queue. It does so until the provided shutdown signal is set.
     */
    @DefaultAnnotation(NonNull.class)
    static class LogEventWorkGenerator implements SourceListener {

        private final BlockingQueue<LogEvent> workQueue;
        private final AtomicBoolean shutdownSignal;
        private final long offerValue;
        private final TimeUnit offerUnits;

        /**
         * Constructs an instance of {@link QueryManagerSourceListener}.
         *
         * @param workQueue - A blocking queue that will have {@link LogEvent}s offered to it. (not null)
         * @param offerValue - How long to wait when offering new work.
         * @param offerUnits - The unit for the {@code offerValue}. (not null)
         * @param shutdownSignal - Indicates to this listener that it needs to stop adding events
         *   to the work queue because the application is shutting down. (not null)
         */
        public LogEventWorkGenerator(
                final BlockingQueue<LogEvent> workQueue,
                final long offerValue,
                final TimeUnit offerUnits,
                final AtomicBoolean shutdownSignal) {
            this.workQueue = requireNonNull(workQueue);
            this.shutdownSignal = requireNonNull(shutdownSignal);
            this.offerValue = offerValue;
            this.offerUnits = requireNonNull(offerUnits);
        }

        @Override
        public void notifyCreate(final String ryaInstanceName, final QueryChangeLog changeLog) {
            log.info("A new Query Change Log has been discovered for Rya Instance " + ryaInstanceName + ". All " +
                    "queries that are set to active within it will be started.");

            // Create an event that summarizes this notification.
            final LogEvent event = LogEvent.create(ryaInstanceName, changeLog);

            // Offer it to the worker until there is room for it in the work queue, or we are shutting down.
            offerUntilAcceptedOrShutdown(workQueue, event, offerValue, offerUnits, shutdownSignal);
        }

        @Override
        public void notifyDelete(final String ryaInstanceName) {
            log.info("The Query Change Log for Rya Instance " + ryaInstanceName + " has been deleted. All of the " +
                    "queries related to that instance will be stopped.");

            // Create an event that summarizes this notification.
            final LogEvent event = LogEvent.delete(ryaInstanceName);

            // Offer it to the worker until there is room for it in the work queue, or we are shutting down.
            offerUntilAcceptedOrShutdown(workQueue, event, offerValue, offerUnits, shutdownSignal);
        }
    }

    /**
     * Processes a work queue of {@link LogEvent}s.
     * <p/>
     * Whenever a new log has been created, then it registers a {@link QueryEventWorkGenerator}
     * that generates {@link QueryEvent}s based on the content and updates to the discovered
     * {@link QueryChagneLog}.
     * <p/>
     * Whenever a log is deleted, then the generator is stopped and a stop all {@link QueryEvent}
     * is written to the work queue.
     */
    @DefaultAnnotation(NonNull.class)
    static class LogEventWorker implements Runnable {

        /**
         * A map of Rya Instance name to he Query Repository for that instance.
         */
        private final Map<String, QueryRepository> repos = new HashMap<>();

        private final BlockingQueue<LogEvent> logWorkQueue;
        private final BlockingQueue<QueryEvent> queryWorkQueue;
        private final long blockingValue;
        private final TimeUnit blockingUnits;
        private final AtomicBoolean shutdownSignal;

        /**
         * Constructs an instance of {@link LogEventWorker}.
         *
         * @param logWorkQueue - A queue of {@link LogEvent}s that will be worked by this object. (not null)
         * @param queryWorkQueue - A queue where {@link QueryEvent}s will be placed by this object. (not null)
         * @param blockingValue - How long to wait when polling/offering new work.
         * @param blockingUnits - The unit for the {@code blockingValue}. (not null)
         * @param shutdownSignal - Indicates when the application has been shutdown, so the executing thread
         *   may exit the {@link #run()} method. (not null)
         */
        public LogEventWorker(
                final BlockingQueue<LogEvent> logWorkQueue,
                final BlockingQueue<QueryEvent> queryWorkQueue,
                final long blockingValue,
                final TimeUnit blockingUnits,
                final AtomicBoolean shutdownSignal) {
            this.logWorkQueue = requireNonNull(logWorkQueue);
            this.queryWorkQueue = requireNonNull(queryWorkQueue);
            this.blockingValue = blockingValue;
            this.blockingUnits = requireNonNull(blockingUnits);
            this.shutdownSignal = requireNonNull(shutdownSignal);
        }

        @Override
        public void run() {
            // Run until the shutdown signal is set.
            while(!shutdownSignal.get()) {
                try {
                    // Pull a unit of work from the queue.
                    log.debug("LogEventWorker - Polling the work queue for a new LogEvent.");
                    final LogEvent logEvent = logWorkQueue.poll(blockingValue, blockingUnits);
                    if(logEvent == null) {
                        // Poll again if nothing was found.
                        continue;
                    }

                    log.info("LogEventWorker - handling: \n" + logEvent);
                    final String ryaInstance = logEvent.getRyaInstanceName();

                    switch(logEvent.getEventType()) {
                        case CREATE:
                            // If we see a create message for a Rya Instance we are already maintaining,
                            // then don't do anything.
                            if(repos.containsKey(ryaInstance)) {
                                log.warn("LogEventWorker - A repository is already being managed for the Rya Instance " +
                                        ryaInstance + ". This message will be ignored.");
                                continue;
                            }

                            // Create and start a QueryRepository for the discovered log. Hold onto the repository
                            // so that it may be shutdown later.
                            final Scheduler scheduler = Scheduler.newFixedRateSchedule(0, blockingValue, blockingUnits);
                            final QueryRepository repo = new InMemoryQueryRepository(logEvent.getQueryChangeLog().get(), scheduler);
                            repo.startAndWait();
                            repos.put(ryaInstance, repo);

                            // Subscribe a worker that adds the Query Events to the queryWorkQueue queue.
                            // A count down latch is used to ensure the returned set of queries are handled
                            // prior to any notifications from the repository.
                            final CountDownLatch subscriptionWorkFinished = new CountDownLatch(1);
                            final QueryEventWorkGenerator queryWorkGenerator =
                                    new QueryEventWorkGenerator(ryaInstance, subscriptionWorkFinished, queryWorkQueue,
                                            blockingValue, blockingUnits, shutdownSignal);

                            log.debug("LogEventWorker - Setting up a QueryWorkGenerator...");
                            final Set<StreamsQuery> queries = repo.subscribe(queryWorkGenerator);
                            log.debug("LogEventWorker - Finished setting up a QueryWorkGenerator.");

                            // Handle the view of the queries within the repository as it existed when
                            // the subscription was registered.
                            queries.stream()
                            .forEach(query -> {
                                // Create a QueryEvent that represents the active state of the existing query.
                                final QueryEvent queryEvent = query.isActive() ?
                                        QueryEvent.executing(ryaInstance, query) : QueryEvent.stopped(ryaInstance, query.getQueryId());
                                log.debug("LogEventWorker - offering: " + queryEvent);

                                // Offer it to the worker until there is room for it in the work queue, or we are shutting down.
                                offerUntilAcceptedOrShutdown(queryWorkQueue, queryEvent, blockingValue, blockingUnits, shutdownSignal);
                            });

                            // Indicate the subscription work is finished so that the registered listener may start
                            // adding work to the queue.
                            log.info("LogEventWorker - Counting down the subscription work latch.");
                            subscriptionWorkFinished.countDown();
                            break;

                        case DELETE:
                            if(repos.containsKey(ryaInstance)) {
                                // Shut down the query repository for the Rya instance. This ensures the listener will
                                // not receive any more work that needs to be done.
                                final QueryRepository deletedRepo = repos.remove(ryaInstance);
                                deletedRepo.stopAndWait();

                                // Add work that stops all of the queries related to the instance.
                                final QueryEvent stopAllEvent = QueryEvent.stopALL(ryaInstance);
                                offerUntilAcceptedOrShutdown(queryWorkQueue, stopAllEvent, blockingValue, blockingUnits, shutdownSignal);
                            }
                            break;
                    }
                } catch (final InterruptedException e) {
                    log.debug("LogEventWorker did not see any new events over the past 5 seconds. Polling again...");
                }
            }

            log.info("LogEventWorker shutting down...");

            // Shutdown all of the QueryRepositories that were started.
            repos.values().forEach(repo -> repo.stopAndWait());

            log.info("LogEventWorker shut down.");
        }
    }

    /**
     * Listens to a {@link QueryRepository} and adds observations to the provided work queue.
     * It does so until the provided shutdown signal is set.
     */
    @DefaultAnnotation(NonNull.class)
    static class QueryEventWorkGenerator implements QueryChangeLogListener {

        private final String ryaInstance;
        private final CountDownLatch subscriptionWorkFinished;
        private final BlockingQueue<QueryEvent> queryWorkQueue;
        private final long blockingValue;
        private final TimeUnit blockingUnits;
        private final AtomicBoolean shutdownSignal;

        /**
         * Constructs an instance of {@link QueryEventWorkGenerator}.
         *
         * @param ryaInstance - The rya instance whose log this objects is watching. (not null)
         * @param subscriptionWorkFinished - Indicates when work that needs to be completed before this
         *   listener handles notifications is completed. (not null)
         * @param queryWorkQueue - A queue where {@link QueryEvent}s will be placed by this object. (not null)
         * @param blockingValue - How long to wait when polling/offering new work.
         * @param blockingUnits - The unit for the {@code blockingValue}. (not null)
         * @param shutdownSignal - Indicates to this listener that it needs to stop adding events
         *   to the work queue because the application is shutting down. (not null)
         */
        public QueryEventWorkGenerator(
                final String ryaInstance,
                final CountDownLatch subscriptionWorkFinished,
                final BlockingQueue<QueryEvent> queryWorkQueue,
                final long blockingValue,
                final TimeUnit blockingUnits,
                final AtomicBoolean shutdownSignal) {
            this.ryaInstance = requireNonNull(ryaInstance);
            this.subscriptionWorkFinished = requireNonNull(subscriptionWorkFinished);
            this.queryWorkQueue = requireNonNull(queryWorkQueue);
            this.blockingValue = blockingValue;
            this.blockingUnits = requireNonNull(blockingUnits);
            this.shutdownSignal = requireNonNull(shutdownSignal);
        }

        @Override
        public void notify(final ChangeLogEntry<QueryChange> queryChangeEvent, final Optional<StreamsQuery> newQueryState) {
            requireNonNull(queryChangeEvent);
            requireNonNull(newQueryState);

            // Wait for the subscription work to be finished.
            try {
                log.debug("Waiting for Subscription Work Finished latch to release...");
                while(!shutdownSignal.get() && !subscriptionWorkFinished.await(blockingValue, blockingUnits)) {
                    log.debug("Still waiting...");
                }
                log.debug("Subscription Work Finished latch to released.");
            } catch (final InterruptedException e) {
                log.warn("Interrupted while waiting for the Subscription Work Finished latch to be " +
                        "released. Shutting down?", e);
            }

            // If we left the loop because of a shutdown, return immediately.
            if(shutdownSignal.get()) {
                log.debug("Not processing notification. Shutting down.");
                return;
            }

            // Generate work from the notification.
            final QueryChange change = queryChangeEvent.getEntry();
            switch(change.getChangeType()) {
                case CREATE:
                    if(newQueryState.isPresent()) {
                        log.info("Rya Instance " + ryaInstance + " created Rya Streams query " + newQueryState + ".");
                        final StreamsQuery newQuery = newQueryState.get();
                        if(newQuery.isActive()) {
                            final QueryEvent executeNewQuery = QueryEvent.executing(ryaInstance, newQuery);
                            offerUntilAcceptedOrShutdown(queryWorkQueue, executeNewQuery, blockingValue, blockingUnits, shutdownSignal);
                        }
                    } else {
                        log.error("Received a CREATE QueryChange for Rya Instance: " + ryaInstance +
                                ", Query ID: " + change.getQueryId() + ", but the QueryRepository did not supply a " +
                                "StreamsQuery representing the created query. The query will not be processed.");
                    }
                    break;

                case DELETE:
                    final UUID deletedQueryId = change.getQueryId();
                    log.info("Rya Instance " + ryaInstance + " deleted Rya Streams query with ID " + deletedQueryId);
                    final QueryEvent stopDeletedQuery = QueryEvent.stopped(ryaInstance, deletedQueryId);
                    offerUntilAcceptedOrShutdown(queryWorkQueue, stopDeletedQuery, blockingValue, blockingUnits, shutdownSignal);
                    break;

                case UPDATE:
                    if(newQueryState.isPresent()) {
                        final StreamsQuery updatedQuery = newQueryState.get();
                        if(updatedQuery.isActive()) {
                            log.info("Rya Instance " + ryaInstance + " updated Rya Streams query with ID " +
                                    updatedQuery.getQueryId() + " to be active.");
                            final QueryEvent executeUpdatedQuery = QueryEvent.executing(ryaInstance, updatedQuery);
                            offerUntilAcceptedOrShutdown(queryWorkQueue, executeUpdatedQuery, blockingValue, blockingUnits, shutdownSignal);
                        } else {
                            log.info("Rya Instance " + ryaInstance + " updated Rya Streams query with ID " +
                                    updatedQuery.getQueryId() + " to be inactive.");
                            final QueryEvent stopUpdatedQuery = QueryEvent.stopped(ryaInstance, updatedQuery.getQueryId());
                            offerUntilAcceptedOrShutdown(queryWorkQueue, stopUpdatedQuery, blockingValue, blockingUnits, shutdownSignal);
                        }
                    } else {
                        log.error("Received an UPDATE QueryChange for Rya Instance: " + ryaInstance +
                                ", Query ID: " + change.getQueryId() + ", but the QueryRepository did not supply a " +
                                "StreamsQuery representing the created query. The query will not be processed.");
                    }
                    break;
            }
        }
    }

    /**
     * Processes a work queue of {@link QueryEvent}s.
     * <p/>
     * Each type of event maps the to corresponding method on {@link QueryExecutor} that is called into.
     */
    @DefaultAnnotation(NonNull.class)
    static class QueryEventWorker implements Runnable {

        private final BlockingQueue<QueryEvent> workQueue;
        private final QueryExecutor queryExecutor;
        private final long pollingValue;
        private final TimeUnit pollingUnits;
        private final AtomicBoolean shutdownSignal;

        /**
         * Constructs an instance of {@link QueryEventWorker}.
         *
         * @param workQueue - A queue of {@link QueryEvent}s that will be worked by this object. (not null)
         * @param queryExecutor - Responsible for executing the {@link StreamsQuery}s. (not null)
         * @param pollingValue - How long to wait when polling for new work.
         * @param pollingUnits - The units for the {@code pollingValue}. (not null)
         * @param shutdownSignal - Indicates when the application has been shutdown, so the executing thread
         *   may exit the {@link #run()} method. (not null)
         */
        public QueryEventWorker(
                final BlockingQueue<QueryEvent> workQueue,
                final QueryExecutor queryExecutor,
                final long pollingValue,
                final TimeUnit pollingUnits,
                final AtomicBoolean shutdownSignal) {
            this.workQueue = requireNonNull(workQueue);
            this.queryExecutor = requireNonNull(queryExecutor);
            this.pollingValue = pollingValue;
            this.pollingUnits = requireNonNull(pollingUnits);
            this.shutdownSignal = requireNonNull(shutdownSignal);
        }

        @Override
        public void run() {
            log.info("QueryEventWorker starting.");

            // Run until the shutdown signal is set.
            while(!shutdownSignal.get()) {
                // Pull a unit of work from the queue.
                try {
                    log.debug("Polling the work queue for a new QueryEvent.");
                    final QueryEvent event = workQueue.poll(pollingValue, pollingUnits);
                    if(event == null) {
                        // Poll again if nothing was found.
                        continue;
                    }

                    log.info("QueryEventWorker handling:\n" + event);

                    // Ensure the state within the executor matches the query event's state.
                    switch(event.getType()) {
                        case EXECUTING:
                            try {
                                queryExecutor.startQuery(event.getRyaInstance(), event.getStreamsQuery().get());
                            } catch (final IllegalStateException | QueryExecutorException e) {
                                log.error("Could not start a query represented by the following work: " + event, e);
                            }
                            break;

                        case STOPPED:
                            try {
                                queryExecutor.stopQuery(event.getQueryId().get());
                            } catch (final IllegalStateException | QueryExecutorException e) {
                                log.error("Could not stop a query represented by the following work: " + event, e);
                            }
                            break;

                        case STOP_ALL:
                            try {
                                queryExecutor.stopAll(event.getRyaInstance());
                            } catch (final IllegalStateException | QueryExecutorException e) {
                                log.error("Could not stop all queries represented by the following work: " + event, e);
                            }
                        break;
                    }
                } catch (final InterruptedException e) {
                    log.debug("QueryEventWorker interrupted. Probably shutting down.");
                }
            }
            log.info("QueryEventWorker shut down.");
        }
    }
}