blob: 299eeb47a8bf4550f6f59f03c2df988aede417da [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.jackrabbit.oak.jcr.observation;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static java.util.Collections.emptyMap;
import static org.apache.jackrabbit.api.stats.RepositoryStatistics.Type.OBSERVATION_EVENT_COUNTER;
import static org.apache.jackrabbit.api.stats.RepositoryStatistics.Type.OBSERVATION_EVENT_DURATION;
import static org.apache.jackrabbit.oak.plugins.observation.filter.VisibleFilter.VISIBLE_FILTER;
import static org.apache.jackrabbit.oak.spi.observation.ChangeSet.COMMIT_CONTEXT_OBSERVATION_CHANGESET;
import static org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardUtils.registerMBean;
import static org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardUtils.scheduleWithFixedDelay;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import javax.jcr.observation.Event;
import javax.jcr.observation.EventIterator;
import javax.jcr.observation.EventListener;
import org.apache.jackrabbit.api.jmx.EventListenerMBean;
import org.apache.jackrabbit.commons.observation.ListenerTracker;
import org.apache.jackrabbit.oak.api.ContentSession;
import org.apache.jackrabbit.oak.api.blob.BlobAccessProvider;
import org.apache.jackrabbit.oak.commons.PerfLogger;
import org.apache.jackrabbit.oak.namepath.NamePathMapper;
import org.apache.jackrabbit.oak.plugins.observation.CommitRateLimiter;
import org.apache.jackrabbit.oak.plugins.observation.Filter;
import org.apache.jackrabbit.oak.plugins.observation.FilteringAwareObserver;
import org.apache.jackrabbit.oak.plugins.observation.FilteringDispatcher;
import org.apache.jackrabbit.oak.plugins.observation.FilteringObserver;
import org.apache.jackrabbit.oak.plugins.observation.filter.EventFilter;
import org.apache.jackrabbit.oak.plugins.observation.filter.FilterConfigMBean;
import org.apache.jackrabbit.oak.plugins.observation.filter.FilterProvider;
import org.apache.jackrabbit.oak.plugins.observation.filter.Filters;
import org.apache.jackrabbit.oak.plugins.observation.filter.ChangeSetFilter;
import org.apache.jackrabbit.oak.spi.commit.BackgroundObserver;
import org.apache.jackrabbit.oak.spi.commit.BackgroundObserverMBean;
import org.apache.jackrabbit.oak.spi.commit.CommitContext;
import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
import org.apache.jackrabbit.oak.spi.commit.Observer;
import org.apache.jackrabbit.oak.spi.observation.ChangeSet;
import org.apache.jackrabbit.oak.spi.state.NodeState;
import org.apache.jackrabbit.oak.spi.whiteboard.CompositeRegistration;
import org.apache.jackrabbit.oak.spi.whiteboard.Registration;
import org.apache.jackrabbit.oak.spi.whiteboard.Whiteboard;
import org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardExecutor;
import org.apache.jackrabbit.oak.stats.Clock;
import org.apache.jackrabbit.oak.stats.MeterStats;
import org.apache.jackrabbit.oak.stats.StatisticManager;
import org.apache.jackrabbit.oak.stats.TimerStats;
import org.apache.jackrabbit.stats.TimeSeriesMax;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.Monitor;
import com.google.common.util.concurrent.Monitor.Guard;
/**
* A {@code ChangeProcessor} generates observation {@link javax.jcr.observation.Event}s
* based on a {@link FilterProvider filter} and delivers them to an {@link EventListener}.
* <p>
* After instantiation a {@code ChangeProcessor} must be started in order to start
* delivering observation events and stopped to stop doing so.
*/
class ChangeProcessor implements FilteringAwareObserver {
private static final Logger LOG = LoggerFactory.getLogger(ChangeProcessor.class);
private static final PerfLogger PERF_LOGGER = new PerfLogger(
LoggerFactory.getLogger(ChangeProcessor.class.getName() + ".perf"));
private enum FilterResult {
/** marks a commit as to be included, ie delivered.
* It's okay to falsely mark a commit as included,
* since filtering (as part of converting to events)
* will be applied at a later stage again. */
INCLUDE,
/** mark a commit as not of interest to this ChangeProcessor.
* Exclusion is definite, ie it's not okay to falsely
* mark a commit as excluded */
EXCLUDE,
/** mark a commit as included but indicate that this
* is not a result of prefiltering but that prefiltering
* was skipped/not applicable for some reason */
PREFILTERING_SKIPPED
}
/**
* Fill ratio of the revision queue at which commits should be delayed
* (conditional of {@code commitRateLimiter} being non {@code null}).
*/
public static final double DELAY_THRESHOLD;
/**
* Maximal number of milli seconds a commit is delayed once {@code DELAY_THRESHOLD}
* kicks in.
*/
public static final int MAX_DELAY;
//It'd would have been more useful to have following 2 properties as instance variables
//which got set by tests. But, the tests won't get a handle to the actual instance, so
//static-members it is.
/**
* Number of milliseconds to wait before issuing consecutive queue full warn messages
* Controlled by command line property "oak.observation.full-queue.warn.interval".
* Note, the command line parameter is wait interval in minutes.
*/
static long QUEUE_FULL_WARN_INTERVAL = TimeUnit.MINUTES.toMillis(Integer
.getInteger("oak.observation.full-queue.warn.interval", 30));
static Clock clock = Clock.SIMPLE;
// OAK-4533: make DELAY_THRESHOLD and MAX_DELAY adjustable - using System.properties for now
static {
final String delayThresholdStr = System.getProperty("oak.commitRateLimiter.delayThreshold");
final String maxDelayStr = System.getProperty("oak.commitRateLimiter.maxDelay");
double delayThreshold = 0.8; /* default is 0.8 still */
int maxDelay = 10000; /* default is 10000 still */
try{
if (delayThresholdStr != null && delayThresholdStr.length() != 0) {
delayThreshold = Double.parseDouble(delayThresholdStr);
LOG.info("<clinit> using oak.commitRateLimiter.delayThreshold of " + delayThreshold);
}
} catch(RuntimeException e) {
LOG.warn("<clinit> could not parse oak.commitRateLimiter.delayThreshold, using default(" + delayThreshold + "): " + e, e);
}
try{
if (maxDelayStr != null && maxDelayStr.length() != 0) {
maxDelay = Integer.parseInt(maxDelayStr);
LOG.info("<clinit> using oak.commitRateLimiter.maxDelay of " + maxDelay + "ms");
}
} catch(RuntimeException e) {
LOG.warn("<clinit> could not parse oak.commitRateLimiter.maxDelay, using default(" + maxDelay + "): " + e, e);
}
DELAY_THRESHOLD = delayThreshold;
MAX_DELAY = maxDelay;
}
private static final AtomicInteger COUNTER = new AtomicInteger();
/**
* JMX ObjectName property storing the listenerId which allows
* to correlate various mbeans
*/
static final String LISTENER_ID = "listenerId";
private final ContentSession contentSession;
private final NamePathMapper namePathMapper;
private final ListenerTracker tracker;
private final EventListener eventListener;
private final AtomicReference<FilterProvider> filterProvider;
private final MeterStats eventCount;
private final TimerStats eventDuration;
private final TimeSeriesMax maxQueueLengthRecorder;
private final int queueLength;
private final CommitRateLimiter commitRateLimiter;
private final BlobAccessProvider blobAccessProvider;
/**
* Lazy initialization via the {@link #start(Whiteboard)} method
*/
private String listenerId;
/**
* Lazy initialization via the {@link #start(Whiteboard)} method
*/
private CompositeRegistration registration;
/**
* for statistics: tracks how many times prefiltering excluded a commit
*/
private int prefilterExcludeCount;
/**
* for statistics: tracks how many times prefiltering included a commit
*/
private int prefilterIncludeCount;
/**
* for statistics: tracks how many times prefiltering was ignored (not evaluated at all),
* either because it was disabled, queue too small, CommitInfo null or CommitContext null
*/
private int prefilterSkipCount;
public ChangeProcessor(
ContentSession contentSession,
NamePathMapper namePathMapper,
ListenerTracker tracker,
FilterProvider filter,
StatisticManager statisticManager,
int queueLength,
CommitRateLimiter commitRateLimiter,
BlobAccessProvider blobAccessProvider) {
this.contentSession = contentSession;
this.namePathMapper = namePathMapper;
this.tracker = tracker;
eventListener = tracker.getTrackedListener();
filterProvider = new AtomicReference<FilterProvider>(filter);
this.eventCount = statisticManager.getMeter(OBSERVATION_EVENT_COUNTER);
this.eventDuration = statisticManager.getTimer(OBSERVATION_EVENT_DURATION);
this.maxQueueLengthRecorder = statisticManager.maxQueLengthRecorder();
this.queueLength = queueLength;
this.commitRateLimiter = commitRateLimiter;
this.blobAccessProvider = blobAccessProvider;
}
/**
* Set the filter for the events this change processor will generate.
* @param filter
*/
public void setFilterProvider(FilterProvider filter) {
filterProvider.set(filter);
}
/** for testing only - hence package protected **/
FilterProvider getFilterProvider() {
return filterProvider.get();
}
@NotNull
public ChangeProcessorMBean getMBean() {
return new ChangeProcessorMBean() {
@Override
public int getPrefilterExcludeCount() {
return prefilterExcludeCount;
}
@Override
public int getPrefilterIncludeCount() {
return prefilterIncludeCount;
}
@Override
public int getPrefilterSkipCount() {
return prefilterSkipCount;
}
};
}
/**
* Start this change processor
* @param whiteboard the whiteboard instance to used for scheduling individual
* runs of this change processor.
* @throws IllegalStateException if started already
*/
public synchronized void start(Whiteboard whiteboard) {
checkState(registration == null, "Change processor started already");
final WhiteboardExecutor executor = new WhiteboardExecutor();
executor.start(whiteboard);
final FilteringObserver filteringObserver = createObserver(executor);
listenerId = COUNTER.incrementAndGet() + "";
Map<String, String> attrs = ImmutableMap.of(LISTENER_ID, listenerId);
String name = tracker.toString();
registration = new CompositeRegistration(
whiteboard.register(Observer.class, filteringObserver, emptyMap()),
registerMBean(whiteboard, EventListenerMBean.class,
tracker.getListenerMBean(), "EventListener", name, attrs),
registerMBean(whiteboard, BackgroundObserverMBean.class,
filteringObserver.getBackgroundObserver().getMBean(), BackgroundObserverMBean.TYPE, name, attrs),
registerMBean(whiteboard, ChangeProcessorMBean.class,
getMBean(), ChangeProcessorMBean.TYPE, name, attrs),
//TODO If FilterProvider gets changed later then MBean would need to be
// re-registered
registerMBean(whiteboard, FilterConfigMBean.class,
filterProvider.get().getConfigMBean(), FilterConfigMBean.TYPE, name, attrs),
new Registration() {
@Override
public void unregister() {
filteringObserver.close();
}
},
new Registration() {
@Override
public void unregister() {
executor.stop();
}
},
scheduleWithFixedDelay(whiteboard, new Runnable() {
@Override
public void run() {
tracker.recordOneSecond();
}
}, 1)
);
}
private FilteringObserver createObserver(final WhiteboardExecutor executor) {
FilteringDispatcher fd = new FilteringDispatcher(this);
BackgroundObserver bo = new BackgroundObserver(fd, executor, queueLength) {
private volatile long delay;
private volatile boolean blocking;
private long lastQueueFullWarnTimestamp = -1;
@Override
protected void added(int newQueueSize) {
queueSizeChanged(newQueueSize);
}
@Override
protected void removed(int newQueueSize, long created) {
queueSizeChanged(newQueueSize);
}
private void queueSizeChanged(int newQueueSize) {
maxQueueLengthRecorder.recordValue(newQueueSize);
tracker.recordQueueLength(newQueueSize);
if (newQueueSize >= queueLength) {
if (commitRateLimiter != null) {
if (!blocking) {
logQueueFullWarning("Revision queue is full. Further commits will be blocked.");
}
commitRateLimiter.blockCommits();
} else if (!blocking) {
logQueueFullWarning("Revision queue is full. Further revisions will be compacted.");
}
blocking = true;
} else {
double fillRatio = (double) newQueueSize / queueLength;
if (fillRatio > DELAY_THRESHOLD) {
if (commitRateLimiter != null) {
if (delay == 0) {
LOG.warn("Revision queue is becoming full. Further commits will be delayed.");
}
// Linear backoff proportional to the number of items exceeding
// DELAY_THRESHOLD. Offset by 1 to trigger the log message in the
// else branch once the queue falls below DELAY_THRESHOLD again.
int newDelay = 1 + (int) ((fillRatio - DELAY_THRESHOLD) / (1 - DELAY_THRESHOLD) * MAX_DELAY);
if (newDelay > delay) {
delay = newDelay;
commitRateLimiter.setDelay(delay);
}
}
} else {
if (commitRateLimiter != null) {
if (delay > 0) {
LOG.debug("Revision queue becoming empty. Unblocking commits");
commitRateLimiter.setDelay(0);
delay = 0;
}
if (blocking) {
LOG.debug("Revision queue becoming empty. Stop delaying commits.");
commitRateLimiter.unblockCommits();
blocking = false;
}
} else {
blocking = false;
}
}
}
}
private void logQueueFullWarning(String message) {
long currTime = clock.getTime();
if (lastQueueFullWarnTimestamp + QUEUE_FULL_WARN_INTERVAL < currTime) {
LOG.warn("{} Suppressing further such cases for {} minutes.",
message,
TimeUnit.MILLISECONDS.toMinutes(QUEUE_FULL_WARN_INTERVAL));
lastQueueFullWarnTimestamp = currTime;
} else {
LOG.debug(message);
}
}
@Override
public String toString() {
return "Prefiltering BackgroundObserver for "+ChangeProcessor.this;
}
};
return new FilteringObserver(bo, new Filter() {
@Override
public boolean excludes(NodeState root, CommitInfo info) {
final FilterResult filterResult = evalPrefilter(root, info, getChangeSet(info));
switch (filterResult) {
case PREFILTERING_SKIPPED: {
prefilterSkipCount++;
return false;
}
case EXCLUDE: {
prefilterExcludeCount++;
return true;
}
case INCLUDE: {
prefilterIncludeCount++;
return false;
}
default: {
LOG.info("isExcluded: unknown/unsupported filter result: " + filterResult);
prefilterSkipCount++;
return false;
}
}
}
});
}
private final Monitor runningMonitor = new Monitor();
private final RunningGuard running = new RunningGuard(runningMonitor);
/**
* Try to stop this change processor if running. This method will wait
* the specified time for a pending event listener to complete. If
* no timeout occurred no further events will be delivered after this
* method returns.
* <p>
* Does nothing if stopped already.
*
* @param timeOut time this method will wait for an executing event
* listener to complete.
* @param unit time unit for {@code timeOut}
* @return {@code true} if no time out occurred and this change processor
* could be stopped, {@code false} otherwise.
* @throws IllegalStateException if not yet started
*/
public synchronized boolean stopAndWait(int timeOut, TimeUnit unit) {
checkState(registration != null, "Change processor not started");
if (running.stop()) {
if (runningMonitor.enter(timeOut, unit)) {
registration.unregister();
runningMonitor.leave();
return true;
} else {
// Timed out
return false;
}
} else {
// Stopped already
return true;
}
}
/**
* Stop this change processor after all pending events have been
* delivered. In contrast to {@link #stopAndWait(int, java.util.concurrent.TimeUnit)}
* this method returns immediately without waiting for pending listeners to
* complete.
*/
public synchronized void stop() {
checkState(registration != null, "Change processor not started");
if (running.stop()) {
registration.unregister();
runningMonitor.leave();
}
}
/**
* Utility method that extracts the ChangeSet from a CommitInfo if possible.
* @param info
* @return
*/
public static ChangeSet getChangeSet(CommitInfo info) {
if (info == null) {
return null;
}
CommitContext context = (CommitContext) info.getInfo().get(CommitContext.NAME);
if (context == null) {
return null;
}
return (ChangeSet) context.get(COMMIT_CONTEXT_OBSERVATION_CHANGESET);
}
@Override
public void contentChanged(@NotNull NodeState before,
@NotNull NodeState after,
@NotNull CommitInfo info) {
checkNotNull(before); // OAK-5160 before is now guaranteed to be non-null
checkNotNull(after);
checkNotNull(info);
try {
long start = PERF_LOGGER.start();
FilterProvider provider = filterProvider.get();
// FIXME don't rely on toString for session id
if (provider.includeCommit(contentSession.toString(), info)) {
EventFilter filter = provider.getFilter(before, after);
EventIterator events = new EventQueue(namePathMapper,
blobAccessProvider, info, before, after,
provider.getSubTrees(), Filters.all(filter, VISIBLE_FILTER),
provider.getEventAggregator());
long time = System.nanoTime();
boolean hasEvents = events.hasNext();
tracker.recordProducerTime(System.nanoTime() - time, TimeUnit.NANOSECONDS);
if (hasEvents && runningMonitor.enterIf(running)) {
if (commitRateLimiter != null) {
commitRateLimiter.beforeNonBlocking();
}
try {
CountingIterator countingEvents = new CountingIterator(events);
eventListener.onEvent(countingEvents);
countingEvents.updateCounters(eventCount, eventDuration);
} finally {
if (commitRateLimiter != null) {
commitRateLimiter.afterNonBlocking();
}
runningMonitor.leave();
}
}
}
PERF_LOGGER.end(start, 100,
"Generated events (before: {}, after: {})",
before, after);
} catch (Exception e) {
LOG.warn("Error while dispatching observation events for " + tracker, e);
}
}
private static class CountingIterator implements EventIterator {
private final long t0 = System.nanoTime();
private final EventIterator events;
private long eventCount;
private long sysTime;
public CountingIterator(EventIterator events) {
this.events = events;
}
public void updateCounters(MeterStats eventCount, TimerStats eventDuration) {
checkState(this.eventCount >= 0);
eventCount.mark(this.eventCount);
eventDuration.update(System.nanoTime() - t0 - sysTime, TimeUnit.NANOSECONDS);
this.eventCount = -1;
}
@Override
public Event next() {
if (eventCount == -1) {
LOG.warn("Access to EventIterator outside the onEvent callback detected. This will " +
"cause observation related values in RepositoryStatistics to become unreliable.");
eventCount = -2;
}
long t0 = System.nanoTime();
try {
return events.nextEvent();
} finally {
eventCount++;
sysTime += System.nanoTime() - t0;
}
}
@Override
public boolean hasNext() {
long t0 = System.nanoTime();
try {
return events.hasNext();
} finally {
sysTime += System.nanoTime() - t0;
}
}
@Override
public Event nextEvent() {
return next();
}
@Override
public void skip(long skipNum) {
long t0 = System.nanoTime();
try {
events.skip(skipNum);
} finally {
sysTime += System.nanoTime() - t0;
}
}
@Override
public long getSize() {
return events.getSize();
}
@Override
public long getPosition() {
return events.getPosition();
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
}
private static class RunningGuard extends Guard {
private boolean stopped;
public RunningGuard(Monitor monitor) {
super(monitor);
}
@Override
public boolean isSatisfied() {
return !stopped;
}
/**
* @return {@code true} if this call set this guard to stopped,
* {@code false} if another call set this guard to stopped before.
*/
public boolean stop() {
boolean wasStopped = stopped;
stopped = true;
return !wasStopped;
}
}
@Override
public String toString() {
return "ChangeProcessor ["
+ "listenerId=" + listenerId
+ ", tracker=" + tracker
+ ", contentSession=" + contentSession
+ ", eventCount=" + eventCount
+ ", eventDuration=" + eventDuration
+ ", commitRateLimiter=" + commitRateLimiter
+ ", running=" + running.isSatisfied() + "]";
}
/** for logging only **/
String getListenerToString() {
if (tracker == null) {
return "null";
}
EventListenerMBean listenerMBean = tracker.getListenerMBean();
if (listenerMBean == null) {
return "null (no listener mbean)";
}
return listenerMBean.getToString();
}
/**
* Evaluate the prefilter for a given commit.
* @param changeSet
*
* @return a FilterResult indicating either inclusion, exclusion or
* inclusion-due-to-skipping. The latter is used to reflect
* prefilter evaluation better in statistics (as it could also have
* been reported just as include)
*/
private FilterResult evalPrefilter(NodeState root, CommitInfo info, ChangeSet changeSet) {
if (info == null) {
return FilterResult.PREFILTERING_SKIPPED;
}
if (root == null) {
// likely only occurs at startup
// we can't do any diffing etc, so just not exclude it
return FilterResult.PREFILTERING_SKIPPED;
}
final FilterProvider fp = filterProvider.get();
// FIXME don't rely on toString for session id
if (!fp.includeCommit(contentSession.toString(), info)) {
// 'classic' (and cheap pre-) filtering
return FilterResult.EXCLUDE;
}
if (changeSet == null) {
// then can't do any prefiltering since it was not
// able to complete the sets (within the given boundaries)
// (this corresponds to a large commit, which thus can't
// go through prefiltering)
return FilterResult.PREFILTERING_SKIPPED;
}
final ChangeSetFilter prefilter = fp;
if (prefilter.excludes(changeSet)) {
return FilterResult.EXCLUDE;
} else {
return FilterResult.INCLUDE;
}
}
}