| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.jackrabbit.oak.jcr.observation; |
| |
| import static com.google.common.base.Preconditions.checkNotNull; |
| import static com.google.common.base.Preconditions.checkState; |
| import static java.util.Collections.emptyMap; |
| import static org.apache.jackrabbit.api.stats.RepositoryStatistics.Type.OBSERVATION_EVENT_COUNTER; |
| import static org.apache.jackrabbit.api.stats.RepositoryStatistics.Type.OBSERVATION_EVENT_DURATION; |
| import static org.apache.jackrabbit.oak.plugins.observation.filter.VisibleFilter.VISIBLE_FILTER; |
| import static org.apache.jackrabbit.oak.spi.observation.ChangeSet.COMMIT_CONTEXT_OBSERVATION_CHANGESET; |
| import static org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardUtils.registerMBean; |
| import static org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardUtils.scheduleWithFixedDelay; |
| |
| import java.util.Map; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.concurrent.atomic.AtomicReference; |
| |
| import javax.jcr.observation.Event; |
| import javax.jcr.observation.EventIterator; |
| import javax.jcr.observation.EventListener; |
| |
| import org.apache.jackrabbit.api.jmx.EventListenerMBean; |
| import org.apache.jackrabbit.commons.observation.ListenerTracker; |
| import org.apache.jackrabbit.oak.api.ContentSession; |
| import org.apache.jackrabbit.oak.api.blob.BlobAccessProvider; |
| import org.apache.jackrabbit.oak.commons.PerfLogger; |
| import org.apache.jackrabbit.oak.namepath.NamePathMapper; |
| import org.apache.jackrabbit.oak.plugins.observation.CommitRateLimiter; |
| import org.apache.jackrabbit.oak.plugins.observation.Filter; |
| import org.apache.jackrabbit.oak.plugins.observation.FilteringAwareObserver; |
| import org.apache.jackrabbit.oak.plugins.observation.FilteringDispatcher; |
| import org.apache.jackrabbit.oak.plugins.observation.FilteringObserver; |
| import org.apache.jackrabbit.oak.plugins.observation.filter.EventFilter; |
| import org.apache.jackrabbit.oak.plugins.observation.filter.FilterConfigMBean; |
| import org.apache.jackrabbit.oak.plugins.observation.filter.FilterProvider; |
| import org.apache.jackrabbit.oak.plugins.observation.filter.Filters; |
| import org.apache.jackrabbit.oak.plugins.observation.filter.ChangeSetFilter; |
| import org.apache.jackrabbit.oak.spi.commit.BackgroundObserver; |
| import org.apache.jackrabbit.oak.spi.commit.BackgroundObserverMBean; |
| import org.apache.jackrabbit.oak.spi.commit.CommitContext; |
| import org.apache.jackrabbit.oak.spi.commit.CommitInfo; |
| import org.apache.jackrabbit.oak.spi.commit.Observer; |
| import org.apache.jackrabbit.oak.spi.observation.ChangeSet; |
| import org.apache.jackrabbit.oak.spi.state.NodeState; |
| import org.apache.jackrabbit.oak.spi.whiteboard.CompositeRegistration; |
| import org.apache.jackrabbit.oak.spi.whiteboard.Registration; |
| import org.apache.jackrabbit.oak.spi.whiteboard.Whiteboard; |
| import org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardExecutor; |
| import org.apache.jackrabbit.oak.stats.Clock; |
| import org.apache.jackrabbit.oak.stats.MeterStats; |
| import org.apache.jackrabbit.oak.stats.StatisticManager; |
| import org.apache.jackrabbit.oak.stats.TimerStats; |
| import org.apache.jackrabbit.stats.TimeSeriesMax; |
| import org.jetbrains.annotations.NotNull; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import com.google.common.collect.ImmutableMap; |
| import com.google.common.util.concurrent.Monitor; |
| import com.google.common.util.concurrent.Monitor.Guard; |
| |
| /** |
| * A {@code ChangeProcessor} generates observation {@link javax.jcr.observation.Event}s |
| * based on a {@link FilterProvider filter} and delivers them to an {@link EventListener}. |
| * <p> |
| * After instantiation a {@code ChangeProcessor} must be started in order to start |
| * delivering observation events and stopped to stop doing so. |
| */ |
| class ChangeProcessor implements FilteringAwareObserver { |
| private static final Logger LOG = LoggerFactory.getLogger(ChangeProcessor.class); |
| private static final PerfLogger PERF_LOGGER = new PerfLogger( |
| LoggerFactory.getLogger(ChangeProcessor.class.getName() + ".perf")); |
| |
| private enum FilterResult { |
| /** marks a commit as to be included, ie delivered. |
| * It's okay to falsely mark a commit as included, |
| * since filtering (as part of converting to events) |
| * will be applied at a later stage again. */ |
| INCLUDE, |
| /** mark a commit as not of interest to this ChangeProcessor. |
| * Exclusion is definite, ie it's not okay to falsely |
| * mark a commit as excluded */ |
| EXCLUDE, |
| /** mark a commit as included but indicate that this |
| * is not a result of prefiltering but that prefiltering |
| * was skipped/not applicable for some reason */ |
| PREFILTERING_SKIPPED |
| } |
| |
| /** |
| * Fill ratio of the revision queue at which commits should be delayed |
| * (conditional of {@code commitRateLimiter} being non {@code null}). |
| */ |
| public static final double DELAY_THRESHOLD; |
| |
| /** |
| * Maximal number of milli seconds a commit is delayed once {@code DELAY_THRESHOLD} |
| * kicks in. |
| */ |
| public static final int MAX_DELAY; |
| |
| //It'd would have been more useful to have following 2 properties as instance variables |
| //which got set by tests. But, the tests won't get a handle to the actual instance, so |
| //static-members it is. |
| /** |
| * Number of milliseconds to wait before issuing consecutive queue full warn messages |
| * Controlled by command line property "oak.observation.full-queue.warn.interval". |
| * Note, the command line parameter is wait interval in minutes. |
| */ |
| static long QUEUE_FULL_WARN_INTERVAL = TimeUnit.MINUTES.toMillis(Integer |
| .getInteger("oak.observation.full-queue.warn.interval", 30)); |
| static Clock clock = Clock.SIMPLE; |
| |
| // OAK-4533: make DELAY_THRESHOLD and MAX_DELAY adjustable - using System.properties for now |
| static { |
| final String delayThresholdStr = System.getProperty("oak.commitRateLimiter.delayThreshold"); |
| final String maxDelayStr = System.getProperty("oak.commitRateLimiter.maxDelay"); |
| double delayThreshold = 0.8; /* default is 0.8 still */ |
| int maxDelay = 10000; /* default is 10000 still */ |
| try{ |
| if (delayThresholdStr != null && delayThresholdStr.length() != 0) { |
| delayThreshold = Double.parseDouble(delayThresholdStr); |
| LOG.info("<clinit> using oak.commitRateLimiter.delayThreshold of " + delayThreshold); |
| } |
| } catch(RuntimeException e) { |
| LOG.warn("<clinit> could not parse oak.commitRateLimiter.delayThreshold, using default(" + delayThreshold + "): " + e, e); |
| } |
| try{ |
| if (maxDelayStr != null && maxDelayStr.length() != 0) { |
| maxDelay = Integer.parseInt(maxDelayStr); |
| LOG.info("<clinit> using oak.commitRateLimiter.maxDelay of " + maxDelay + "ms"); |
| } |
| } catch(RuntimeException e) { |
| LOG.warn("<clinit> could not parse oak.commitRateLimiter.maxDelay, using default(" + maxDelay + "): " + e, e); |
| } |
| DELAY_THRESHOLD = delayThreshold; |
| MAX_DELAY = maxDelay; |
| } |
| |
| private static final AtomicInteger COUNTER = new AtomicInteger(); |
| |
| /** |
| * JMX ObjectName property storing the listenerId which allows |
| * to correlate various mbeans |
| */ |
| static final String LISTENER_ID = "listenerId"; |
| |
| private final ContentSession contentSession; |
| private final NamePathMapper namePathMapper; |
| private final ListenerTracker tracker; |
| private final EventListener eventListener; |
| private final AtomicReference<FilterProvider> filterProvider; |
| private final MeterStats eventCount; |
| private final TimerStats eventDuration; |
| private final TimeSeriesMax maxQueueLengthRecorder; |
| private final int queueLength; |
| private final CommitRateLimiter commitRateLimiter; |
| private final BlobAccessProvider blobAccessProvider; |
| |
| /** |
| * Lazy initialization via the {@link #start(Whiteboard)} method |
| */ |
| private String listenerId; |
| |
| /** |
| * Lazy initialization via the {@link #start(Whiteboard)} method |
| */ |
| private CompositeRegistration registration; |
| |
| /** |
| * for statistics: tracks how many times prefiltering excluded a commit |
| */ |
| private int prefilterExcludeCount; |
| |
| /** |
| * for statistics: tracks how many times prefiltering included a commit |
| */ |
| private int prefilterIncludeCount; |
| |
| /** |
| * for statistics: tracks how many times prefiltering was ignored (not evaluated at all), |
| * either because it was disabled, queue too small, CommitInfo null or CommitContext null |
| */ |
| private int prefilterSkipCount; |
| |
| public ChangeProcessor( |
| ContentSession contentSession, |
| NamePathMapper namePathMapper, |
| ListenerTracker tracker, |
| FilterProvider filter, |
| StatisticManager statisticManager, |
| int queueLength, |
| CommitRateLimiter commitRateLimiter, |
| BlobAccessProvider blobAccessProvider) { |
| this.contentSession = contentSession; |
| this.namePathMapper = namePathMapper; |
| this.tracker = tracker; |
| eventListener = tracker.getTrackedListener(); |
| filterProvider = new AtomicReference<FilterProvider>(filter); |
| this.eventCount = statisticManager.getMeter(OBSERVATION_EVENT_COUNTER); |
| this.eventDuration = statisticManager.getTimer(OBSERVATION_EVENT_DURATION); |
| this.maxQueueLengthRecorder = statisticManager.maxQueLengthRecorder(); |
| this.queueLength = queueLength; |
| this.commitRateLimiter = commitRateLimiter; |
| this.blobAccessProvider = blobAccessProvider; |
| } |
| |
| /** |
| * Set the filter for the events this change processor will generate. |
| * @param filter |
| */ |
| public void setFilterProvider(FilterProvider filter) { |
| filterProvider.set(filter); |
| } |
| |
| /** for testing only - hence package protected **/ |
| FilterProvider getFilterProvider() { |
| return filterProvider.get(); |
| } |
| |
| @NotNull |
| public ChangeProcessorMBean getMBean() { |
| return new ChangeProcessorMBean() { |
| |
| @Override |
| public int getPrefilterExcludeCount() { |
| return prefilterExcludeCount; |
| } |
| |
| @Override |
| public int getPrefilterIncludeCount() { |
| return prefilterIncludeCount; |
| } |
| |
| @Override |
| public int getPrefilterSkipCount() { |
| return prefilterSkipCount; |
| } |
| |
| }; |
| } |
| |
| |
| /** |
| * Start this change processor |
| * @param whiteboard the whiteboard instance to used for scheduling individual |
| * runs of this change processor. |
| * @throws IllegalStateException if started already |
| */ |
| public synchronized void start(Whiteboard whiteboard) { |
| checkState(registration == null, "Change processor started already"); |
| final WhiteboardExecutor executor = new WhiteboardExecutor(); |
| executor.start(whiteboard); |
| final FilteringObserver filteringObserver = createObserver(executor); |
| listenerId = COUNTER.incrementAndGet() + ""; |
| Map<String, String> attrs = ImmutableMap.of(LISTENER_ID, listenerId); |
| String name = tracker.toString(); |
| registration = new CompositeRegistration( |
| whiteboard.register(Observer.class, filteringObserver, emptyMap()), |
| registerMBean(whiteboard, EventListenerMBean.class, |
| tracker.getListenerMBean(), "EventListener", name, attrs), |
| registerMBean(whiteboard, BackgroundObserverMBean.class, |
| filteringObserver.getBackgroundObserver().getMBean(), BackgroundObserverMBean.TYPE, name, attrs), |
| registerMBean(whiteboard, ChangeProcessorMBean.class, |
| getMBean(), ChangeProcessorMBean.TYPE, name, attrs), |
| //TODO If FilterProvider gets changed later then MBean would need to be |
| // re-registered |
| registerMBean(whiteboard, FilterConfigMBean.class, |
| filterProvider.get().getConfigMBean(), FilterConfigMBean.TYPE, name, attrs), |
| new Registration() { |
| @Override |
| public void unregister() { |
| filteringObserver.close(); |
| } |
| }, |
| new Registration() { |
| @Override |
| public void unregister() { |
| executor.stop(); |
| } |
| }, |
| scheduleWithFixedDelay(whiteboard, new Runnable() { |
| @Override |
| public void run() { |
| tracker.recordOneSecond(); |
| } |
| }, 1) |
| ); |
| } |
| |
| private FilteringObserver createObserver(final WhiteboardExecutor executor) { |
| FilteringDispatcher fd = new FilteringDispatcher(this); |
| BackgroundObserver bo = new BackgroundObserver(fd, executor, queueLength) { |
| private volatile long delay; |
| private volatile boolean blocking; |
| |
| private long lastQueueFullWarnTimestamp = -1; |
| |
| @Override |
| protected void added(int newQueueSize) { |
| queueSizeChanged(newQueueSize); |
| } |
| |
| @Override |
| protected void removed(int newQueueSize, long created) { |
| queueSizeChanged(newQueueSize); |
| } |
| |
| private void queueSizeChanged(int newQueueSize) { |
| maxQueueLengthRecorder.recordValue(newQueueSize); |
| tracker.recordQueueLength(newQueueSize); |
| if (newQueueSize >= queueLength) { |
| if (commitRateLimiter != null) { |
| if (!blocking) { |
| logQueueFullWarning("Revision queue is full. Further commits will be blocked."); |
| } |
| commitRateLimiter.blockCommits(); |
| } else if (!blocking) { |
| logQueueFullWarning("Revision queue is full. Further revisions will be compacted."); |
| } |
| blocking = true; |
| } else { |
| double fillRatio = (double) newQueueSize / queueLength; |
| if (fillRatio > DELAY_THRESHOLD) { |
| if (commitRateLimiter != null) { |
| if (delay == 0) { |
| LOG.warn("Revision queue is becoming full. Further commits will be delayed."); |
| } |
| |
| // Linear backoff proportional to the number of items exceeding |
| // DELAY_THRESHOLD. Offset by 1 to trigger the log message in the |
| // else branch once the queue falls below DELAY_THRESHOLD again. |
| int newDelay = 1 + (int) ((fillRatio - DELAY_THRESHOLD) / (1 - DELAY_THRESHOLD) * MAX_DELAY); |
| if (newDelay > delay) { |
| delay = newDelay; |
| commitRateLimiter.setDelay(delay); |
| } |
| } |
| } else { |
| if (commitRateLimiter != null) { |
| if (delay > 0) { |
| LOG.debug("Revision queue becoming empty. Unblocking commits"); |
| commitRateLimiter.setDelay(0); |
| delay = 0; |
| } |
| if (blocking) { |
| LOG.debug("Revision queue becoming empty. Stop delaying commits."); |
| commitRateLimiter.unblockCommits(); |
| blocking = false; |
| } |
| } else { |
| blocking = false; |
| } |
| } |
| } |
| } |
| |
| private void logQueueFullWarning(String message) { |
| long currTime = clock.getTime(); |
| if (lastQueueFullWarnTimestamp + QUEUE_FULL_WARN_INTERVAL < currTime) { |
| LOG.warn("{} Suppressing further such cases for {} minutes.", |
| message, |
| TimeUnit.MILLISECONDS.toMinutes(QUEUE_FULL_WARN_INTERVAL)); |
| lastQueueFullWarnTimestamp = currTime; |
| } else { |
| LOG.debug(message); |
| } |
| } |
| |
| @Override |
| public String toString() { |
| return "Prefiltering BackgroundObserver for "+ChangeProcessor.this; |
| } |
| }; |
| return new FilteringObserver(bo, new Filter() { |
| |
| @Override |
| public boolean excludes(NodeState root, CommitInfo info) { |
| final FilterResult filterResult = evalPrefilter(root, info, getChangeSet(info)); |
| switch (filterResult) { |
| case PREFILTERING_SKIPPED: { |
| prefilterSkipCount++; |
| return false; |
| } |
| case EXCLUDE: { |
| prefilterExcludeCount++; |
| return true; |
| } |
| case INCLUDE: { |
| prefilterIncludeCount++; |
| return false; |
| } |
| default: { |
| LOG.info("isExcluded: unknown/unsupported filter result: " + filterResult); |
| prefilterSkipCount++; |
| return false; |
| } |
| } |
| } |
| }); |
| } |
| |
| private final Monitor runningMonitor = new Monitor(); |
| private final RunningGuard running = new RunningGuard(runningMonitor); |
| |
| /** |
| * Try to stop this change processor if running. This method will wait |
| * the specified time for a pending event listener to complete. If |
| * no timeout occurred no further events will be delivered after this |
| * method returns. |
| * <p> |
| * Does nothing if stopped already. |
| * |
| * @param timeOut time this method will wait for an executing event |
| * listener to complete. |
| * @param unit time unit for {@code timeOut} |
| * @return {@code true} if no time out occurred and this change processor |
| * could be stopped, {@code false} otherwise. |
| * @throws IllegalStateException if not yet started |
| */ |
| public synchronized boolean stopAndWait(int timeOut, TimeUnit unit) { |
| checkState(registration != null, "Change processor not started"); |
| if (running.stop()) { |
| if (runningMonitor.enter(timeOut, unit)) { |
| registration.unregister(); |
| runningMonitor.leave(); |
| return true; |
| } else { |
| // Timed out |
| return false; |
| } |
| } else { |
| // Stopped already |
| return true; |
| } |
| } |
| |
| /** |
| * Stop this change processor after all pending events have been |
| * delivered. In contrast to {@link #stopAndWait(int, java.util.concurrent.TimeUnit)} |
| * this method returns immediately without waiting for pending listeners to |
| * complete. |
| */ |
| public synchronized void stop() { |
| checkState(registration != null, "Change processor not started"); |
| if (running.stop()) { |
| registration.unregister(); |
| runningMonitor.leave(); |
| } |
| } |
| |
| /** |
| * Utility method that extracts the ChangeSet from a CommitInfo if possible. |
| * @param info |
| * @return |
| */ |
| public static ChangeSet getChangeSet(CommitInfo info) { |
| if (info == null) { |
| return null; |
| } |
| CommitContext context = (CommitContext) info.getInfo().get(CommitContext.NAME); |
| if (context == null) { |
| return null; |
| } |
| return (ChangeSet) context.get(COMMIT_CONTEXT_OBSERVATION_CHANGESET); |
| } |
| |
| @Override |
| public void contentChanged(@NotNull NodeState before, |
| @NotNull NodeState after, |
| @NotNull CommitInfo info) { |
| checkNotNull(before); // OAK-5160 before is now guaranteed to be non-null |
| checkNotNull(after); |
| checkNotNull(info); |
| try { |
| long start = PERF_LOGGER.start(); |
| FilterProvider provider = filterProvider.get(); |
| // FIXME don't rely on toString for session id |
| if (provider.includeCommit(contentSession.toString(), info)) { |
| EventFilter filter = provider.getFilter(before, after); |
| EventIterator events = new EventQueue(namePathMapper, |
| blobAccessProvider, info, before, after, |
| provider.getSubTrees(), Filters.all(filter, VISIBLE_FILTER), |
| provider.getEventAggregator()); |
| |
| long time = System.nanoTime(); |
| boolean hasEvents = events.hasNext(); |
| tracker.recordProducerTime(System.nanoTime() - time, TimeUnit.NANOSECONDS); |
| if (hasEvents && runningMonitor.enterIf(running)) { |
| if (commitRateLimiter != null) { |
| commitRateLimiter.beforeNonBlocking(); |
| } |
| try { |
| CountingIterator countingEvents = new CountingIterator(events); |
| eventListener.onEvent(countingEvents); |
| countingEvents.updateCounters(eventCount, eventDuration); |
| } finally { |
| if (commitRateLimiter != null) { |
| commitRateLimiter.afterNonBlocking(); |
| } |
| runningMonitor.leave(); |
| } |
| } |
| } |
| PERF_LOGGER.end(start, 100, |
| "Generated events (before: {}, after: {})", |
| before, after); |
| } catch (Exception e) { |
| LOG.warn("Error while dispatching observation events for " + tracker, e); |
| } |
| } |
| |
| private static class CountingIterator implements EventIterator { |
| private final long t0 = System.nanoTime(); |
| private final EventIterator events; |
| private long eventCount; |
| private long sysTime; |
| |
| public CountingIterator(EventIterator events) { |
| this.events = events; |
| } |
| |
| public void updateCounters(MeterStats eventCount, TimerStats eventDuration) { |
| checkState(this.eventCount >= 0); |
| eventCount.mark(this.eventCount); |
| eventDuration.update(System.nanoTime() - t0 - sysTime, TimeUnit.NANOSECONDS); |
| this.eventCount = -1; |
| } |
| |
| @Override |
| public Event next() { |
| if (eventCount == -1) { |
| LOG.warn("Access to EventIterator outside the onEvent callback detected. This will " + |
| "cause observation related values in RepositoryStatistics to become unreliable."); |
| eventCount = -2; |
| } |
| |
| long t0 = System.nanoTime(); |
| try { |
| return events.nextEvent(); |
| } finally { |
| eventCount++; |
| sysTime += System.nanoTime() - t0; |
| } |
| } |
| |
| @Override |
| public boolean hasNext() { |
| long t0 = System.nanoTime(); |
| try { |
| return events.hasNext(); |
| } finally { |
| sysTime += System.nanoTime() - t0; |
| } |
| } |
| |
| @Override |
| public Event nextEvent() { |
| return next(); |
| } |
| |
| @Override |
| public void skip(long skipNum) { |
| long t0 = System.nanoTime(); |
| try { |
| events.skip(skipNum); |
| } finally { |
| sysTime += System.nanoTime() - t0; |
| } |
| } |
| |
| @Override |
| public long getSize() { |
| return events.getSize(); |
| } |
| |
| @Override |
| public long getPosition() { |
| return events.getPosition(); |
| } |
| |
| @Override |
| public void remove() { |
| throw new UnsupportedOperationException(); |
| } |
| } |
| |
| private static class RunningGuard extends Guard { |
| private boolean stopped; |
| |
| public RunningGuard(Monitor monitor) { |
| super(monitor); |
| } |
| |
| @Override |
| public boolean isSatisfied() { |
| return !stopped; |
| } |
| |
| /** |
| * @return {@code true} if this call set this guard to stopped, |
| * {@code false} if another call set this guard to stopped before. |
| */ |
| public boolean stop() { |
| boolean wasStopped = stopped; |
| stopped = true; |
| return !wasStopped; |
| } |
| } |
| |
| @Override |
| public String toString() { |
| return "ChangeProcessor [" |
| + "listenerId=" + listenerId |
| + ", tracker=" + tracker |
| + ", contentSession=" + contentSession |
| + ", eventCount=" + eventCount |
| + ", eventDuration=" + eventDuration |
| + ", commitRateLimiter=" + commitRateLimiter |
| + ", running=" + running.isSatisfied() + "]"; |
| } |
| |
| /** for logging only **/ |
| String getListenerToString() { |
| if (tracker == null) { |
| return "null"; |
| } |
| EventListenerMBean listenerMBean = tracker.getListenerMBean(); |
| if (listenerMBean == null) { |
| return "null (no listener mbean)"; |
| } |
| return listenerMBean.getToString(); |
| } |
| |
| /** |
| * Evaluate the prefilter for a given commit. |
| * @param changeSet |
| * |
| * @return a FilterResult indicating either inclusion, exclusion or |
| * inclusion-due-to-skipping. The latter is used to reflect |
| * prefilter evaluation better in statistics (as it could also have |
| * been reported just as include) |
| */ |
| private FilterResult evalPrefilter(NodeState root, CommitInfo info, ChangeSet changeSet) { |
| if (info == null) { |
| return FilterResult.PREFILTERING_SKIPPED; |
| } |
| if (root == null) { |
| // likely only occurs at startup |
| // we can't do any diffing etc, so just not exclude it |
| return FilterResult.PREFILTERING_SKIPPED; |
| } |
| |
| final FilterProvider fp = filterProvider.get(); |
| // FIXME don't rely on toString for session id |
| if (!fp.includeCommit(contentSession.toString(), info)) { |
| // 'classic' (and cheap pre-) filtering |
| return FilterResult.EXCLUDE; |
| } |
| if (changeSet == null) { |
| // then can't do any prefiltering since it was not |
| // able to complete the sets (within the given boundaries) |
| // (this corresponds to a large commit, which thus can't |
| // go through prefiltering) |
| return FilterResult.PREFILTERING_SKIPPED; |
| } |
| |
| final ChangeSetFilter prefilter = fp; |
| if (prefilter.excludes(changeSet)) { |
| return FilterResult.EXCLUDE; |
| } else { |
| return FilterResult.INCLUDE; |
| } |
| } |
| } |