| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.rya.streams.api.queries; |
| |
| import static java.util.Objects.requireNonNull; |
| |
| import java.util.ArrayList; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Optional; |
| import java.util.Set; |
| import java.util.UUID; |
| import java.util.concurrent.locks.ReentrantLock; |
| import java.util.stream.Collectors; |
| |
| import org.apache.rya.streams.api.entity.StreamsQuery; |
| import org.apache.rya.streams.api.queries.QueryChangeLog.QueryChangeLogException; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import com.google.common.util.concurrent.AbstractScheduledService; |
| |
| import edu.umd.cs.findbugs.annotations.DefaultAnnotation; |
| import edu.umd.cs.findbugs.annotations.NonNull; |
| import info.aduna.iteration.CloseableIteration; |
| |
| /** |
| * An in memory implementation of {@link QueryRepository}. It is lazily |
| * initialized the first time one of its functions is invoked and it updates |
| * its view of the {@link QueryChangeLog} any time a method is invoked that |
| * requires the latest view of the queries. |
| * </p> |
| * Thread safe. |
| */ |
| @DefaultAnnotation(NonNull.class) |
| public class InMemoryQueryRepository extends AbstractScheduledService implements QueryRepository { |
| private static final Logger LOG = LoggerFactory.getLogger(InMemoryQueryRepository.class); |
| |
| private final ReentrantLock lock = new ReentrantLock(); |
| |
| /** |
| * The change log that is the ground truth for describing what the queries look like. |
| */ |
| private final QueryChangeLog changeLog; |
| |
| /** |
| * Represents the position within the {@link QueryChangeLog} that {@code queriesCache} represents. |
| */ |
| private Optional<Long> cachePosition = Optional.empty(); |
| |
| /** |
| * The most recently cached view of the queries within this repository. |
| */ |
| private final Map<UUID, StreamsQuery> queriesCache = new HashMap<>(); |
| |
| /** |
| * The listeners to be notified when new QueryChangeLogs come in. |
| */ |
| private final List<QueryChangeLogListener> listeners = new ArrayList<>(); |
| |
| /** |
| * The {@link Scheduler} the repository uses to periodically poll for query updates. |
| */ |
| private final Scheduler scheduler; |
| |
| /** |
| * Constructs an instance of {@link InMemoryQueryRepository}. |
| * |
| * @param changeLog - The change log that this repository will maintain and be based on. (not null) |
| * @param scheduler - The {@link Scheduler} this service uses to periodically check for query updates. (not null) |
| */ |
| public InMemoryQueryRepository(final QueryChangeLog changeLog, final Scheduler scheduler) { |
| this.changeLog = requireNonNull(changeLog); |
| this.scheduler = requireNonNull(scheduler); |
| } |
| |
| @Override |
| public StreamsQuery add(final String query, final boolean isActive) |
| throws QueryRepositoryException, IllegalStateException { |
| requireNonNull(query); |
| |
| lock.lock(); |
| try { |
| checkState(); |
| // First record the change to the log. |
| final UUID queryId = UUID.randomUUID(); |
| final QueryChange change = QueryChange.create(queryId, query, isActive); |
| changeLog.write(change); |
| |
| // Update the cache to represent what is currently in the log. |
| updateCache(); |
| |
| // Return the StreamsQuery that represents the just added query. |
| return queriesCache.get(queryId); |
| |
| } catch (final QueryChangeLogException e) { |
| throw new QueryRepositoryException("Could not create a Rya Streams query for the SPARQL string: " + query, e); |
| } finally { |
| lock.unlock(); |
| } |
| } |
| |
| @Override |
| public Optional<StreamsQuery> get(final UUID queryId) throws QueryRepositoryException, IllegalStateException { |
| requireNonNull(queryId); |
| |
| lock.lock(); |
| try { |
| checkState(); |
| // Update the cache to represent what is currently in the log. |
| updateCache(); |
| |
| return Optional.ofNullable( queriesCache.get(queryId) ); |
| } finally { |
| lock.unlock(); |
| } |
| } |
| |
| @Override |
| public void updateIsActive(final UUID queryId, final boolean isActive) |
| throws QueryRepositoryException, IllegalStateException { |
| requireNonNull(queryId); |
| |
| lock.lock(); |
| try { |
| checkState(); |
| // Update the cache to represent what is currently in the log. |
| updateCache(); |
| |
| // Ensure the query is in the log. |
| if(!queriesCache.containsKey(queryId)) { |
| throw new QueryRepositoryException("No query exists for ID " + queryId + "."); |
| } |
| |
| // First record the change to the log. |
| final QueryChange change = QueryChange.update(queryId, isActive); |
| changeLog.write(change); |
| |
| } catch (final QueryChangeLogException e) { |
| throw new QueryRepositoryException("Could not update the Rya Streams query for with ID: " + queryId, e); |
| } finally { |
| lock.unlock(); |
| } |
| } |
| |
| @Override |
| public void delete(final UUID queryId) throws QueryRepositoryException, IllegalStateException { |
| requireNonNull(queryId); |
| |
| lock.lock(); |
| try { |
| checkState(); |
| // First record the change to the log. |
| final QueryChange change = QueryChange.delete(queryId); |
| changeLog.write(change); |
| |
| } catch (final QueryChangeLogException e) { |
| throw new QueryRepositoryException("Could not delete a Rya Streams query for the Query ID: " + queryId, e); |
| } finally { |
| lock.unlock(); |
| } |
| } |
| |
| @Override |
| public Set<StreamsQuery> list() throws QueryRepositoryException, IllegalStateException { |
| lock.lock(); |
| try { |
| checkState(); |
| // Update the cache to represent what is currently in the log. |
| updateCache(); |
| |
| // Our internal cache is already up to date, so just return its values. |
| return queriesCache.values() |
| .stream() |
| .collect(Collectors.toSet()); |
| |
| } finally { |
| lock.unlock(); |
| } |
| } |
| |
| @Override |
| protected void shutDown() throws Exception { |
| super.shutDown(); |
| lock.lock(); |
| try { |
| changeLog.close(); |
| } finally { |
| lock.unlock(); |
| } |
| } |
| |
| /** |
| * Updates the {@link #queriesCache} to reflect the latest position within the {@link #changeLog}. |
| */ |
| private void updateCache() { |
| requireNonNull(changeLog); |
| |
| CloseableIteration<ChangeLogEntry<QueryChange>, QueryChangeLogException> it = null; |
| try { |
| // Iterate over everything since the last position that was handled within the change log. |
| if(cachePosition.isPresent()) { |
| it = changeLog.readFromPosition(cachePosition.get() + 1); |
| } else { |
| it = changeLog.readFromStart(); |
| } |
| |
| // Apply each change to the cache. |
| while(it.hasNext()) { |
| final ChangeLogEntry<QueryChange> entry = it.next(); |
| final QueryChange change = entry.getEntry(); |
| final UUID queryId = change.getQueryId(); |
| |
| switch(change.getChangeType()) { |
| case CREATE: |
| final StreamsQuery query = new StreamsQuery( |
| queryId, |
| change.getSparql().get(), |
| change.getIsActive().get()); |
| queriesCache.put(queryId, query); |
| break; |
| |
| case UPDATE: |
| if(queriesCache.containsKey(queryId)) { |
| final StreamsQuery old = queriesCache.get(queryId); |
| final StreamsQuery updated = new StreamsQuery( |
| old.getQueryId(), |
| old.getSparql(), |
| change.getIsActive().get()); |
| queriesCache.put(queryId, updated); |
| } |
| break; |
| |
| case DELETE: |
| queriesCache.remove(queryId); |
| break; |
| } |
| |
| final Optional<StreamsQuery> newQueryState = Optional.ofNullable(queriesCache.get(queryId)); |
| listeners.forEach(listener -> listener.notify(entry, newQueryState)); |
| |
| cachePosition = Optional.of( entry.getPosition() ); |
| } |
| |
| } catch (final QueryChangeLogException e) { |
| // Rethrow the exception because the object the supplier tried to create could not be created. |
| throw new RuntimeException("Could not initialize the " + InMemoryQueryRepository.class.getName(), e); |
| |
| } finally { |
| // Try to close the iteration if it was opened. |
| try { |
| if(it != null) { |
| it.close(); |
| } |
| } catch (final QueryChangeLogException e) { |
| LOG.error("Could not close the " + CloseableIteration.class.getName(), e); |
| } |
| } |
| } |
| |
| @Override |
| protected void runOneIteration() throws Exception { |
| lock.lock(); |
| try { |
| updateCache(); |
| } finally { |
| lock.unlock(); |
| } |
| } |
| |
| @Override |
| protected Scheduler scheduler() { |
| return scheduler; |
| } |
| |
| @Override |
| public Set<StreamsQuery> subscribe(final QueryChangeLogListener listener) { |
| //locks to prevent the current state from changing while subscribing. |
| lock.lock(); |
| try { |
| listeners.add(listener); |
| |
| //return the current state of the query repository |
| return queriesCache.values() |
| .stream() |
| .collect(Collectors.toSet()); |
| } finally { |
| lock.unlock(); |
| } |
| } |
| |
| @Override |
| public void unsubscribe(final QueryChangeLogListener listener) { |
| lock.lock(); |
| try { |
| listeners.remove(listener); |
| } finally { |
| lock.unlock(); |
| } |
| } |
| |
| private void checkState() { |
| if (!super.isRunning() && !listeners.isEmpty()) { |
| throw new IllegalStateException( |
| "The Query Repository is subscribed to, but the service has not been started."); |
| } |
| } |
| } |