| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.jackrabbit.oak.jcr.observation; |
| |
| import static com.google.common.collect.Lists.newLinkedList; |
| |
| import java.util.LinkedList; |
| import java.util.NoSuchElementException; |
| |
| import javax.jcr.observation.Event; |
| import javax.jcr.observation.EventIterator; |
| |
| import org.apache.jackrabbit.oak.api.blob.BlobAccessProvider; |
| import org.apache.jackrabbit.oak.commons.PathUtils; |
| import org.apache.jackrabbit.oak.namepath.NamePathMapper; |
| import org.apache.jackrabbit.oak.plugins.observation.EventGenerator; |
| import org.apache.jackrabbit.oak.plugins.observation.EventHandler; |
| import org.apache.jackrabbit.oak.plugins.observation.FilteredHandler; |
| import org.apache.jackrabbit.oak.plugins.observation.filter.EventAggregator; |
| import org.apache.jackrabbit.oak.plugins.observation.filter.EventFilter; |
| import org.apache.jackrabbit.oak.spi.commit.CommitInfo; |
| import org.apache.jackrabbit.oak.spi.state.NodeState; |
| import org.jetbrains.annotations.NotNull; |
| import org.jetbrains.annotations.Nullable; |
| |
| /** |
| * Queue of JCR Events generated from a given content change |
| */ |
| class EventQueue implements EventIterator { |
| |
| private final EventGenerator generator; |
| |
| private final LinkedList<Event> queue = newLinkedList(); |
| |
| private long position = 0; |
| |
| public EventQueue( |
| @NotNull NamePathMapper mapper, |
| @NotNull BlobAccessProvider blobAccessProvider, CommitInfo info, |
| @NotNull NodeState before, @NotNull NodeState after, |
| @NotNull Iterable<String> basePaths, @NotNull EventFilter filter, |
| @Nullable EventAggregator aggregator) { |
| this.generator = new EventGenerator(); |
| EventFactory factory = new EventFactory(mapper, blobAccessProvider, info); |
| EventHandler handler = new FilteredHandler( |
| filter, new QueueingHandler(this, factory, aggregator, before, after)); |
| for (String path : basePaths) { |
| addHandler(before, after, path, handler, generator); |
| } |
| } |
| |
| private static void addHandler(NodeState before, NodeState after, String path, |
| EventHandler handler, EventGenerator generator) { |
| for (String name : PathUtils.elements(path)) { |
| before = before.getChildNode(name); |
| after = after.getChildNode(name); |
| handler = handler.getChildHandler(name, before, after); |
| if (handler == null) { |
| return; |
| } |
| } |
| generator.addHandler(before, after, handler); |
| } |
| |
| /** |
| * Called by the {@link QueueingHandler} to add new events to the queue. |
| */ |
| void addEvent(Event event) { |
| queue.add(event); |
| } |
| |
| //-----------------------------------------------------< EventIterator >-- |
| |
| @Override |
| public long getSize() { |
| if (generator.isDone()) { |
| // no more new events will be generated, so count just those |
| // that have already been iterated and those left in the queue |
| return position + queue.size(); |
| } else { |
| // the generator is not yet done, so there's no way |
| // to know how many events may still be generated |
| return -1; |
| } |
| } |
| |
| @Override |
| public long getPosition() { |
| return position; |
| } |
| |
| @Override |
| public boolean hasNext() { |
| while (queue.isEmpty()) { |
| if (generator.isDone()) { |
| return false; |
| } else { |
| generator.generate(); |
| } |
| } |
| return true; |
| } |
| |
| @Override |
| public void skip(long skipNum) { |
| // generate events until all events to skip have been queued |
| while (skipNum > queue.size()) { |
| // drop all currently queued events as we're skipping them all |
| position += queue.size(); |
| skipNum -= queue.size(); |
| queue.clear(); |
| |
| // generate more events if possible, otherwise fail |
| if (!generator.isDone()) { |
| generator.generate(); |
| } else { |
| throw new NoSuchElementException("Not enough events to skip"); |
| } |
| } |
| |
| // the remaining events to skip are guaranteed to all be in the |
| // queue, so we can just drop those events and advance the position |
| queue.subList(0, (int) skipNum).clear(); |
| position += skipNum; |
| } |
| |
| @Override |
| public Event nextEvent() { |
| if (hasNext()) { |
| position++; |
| return queue.removeFirst(); |
| } else { |
| throw new NoSuchElementException(); |
| } |
| } |
| |
| @Override |
| public Event next() { |
| return nextEvent(); |
| } |
| |
| @Override |
| public void remove() { |
| throw new UnsupportedOperationException(); |
| } |
| |
| } |