blob: dfea9a2d89ac18440d2cff2cf5838a213648cc3d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.jackrabbit.oak.jcr.observation;
import static com.google.common.collect.Lists.newArrayList;
import static com.google.common.collect.Sets.newHashSet;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.jackrabbit.oak.commons.PathUtils.concat;
import static org.apache.jackrabbit.oak.commons.PathUtils.elements;
import static org.apache.jackrabbit.oak.plugins.observation.filter.GlobbingPathFilter.STAR;
import static org.apache.jackrabbit.oak.plugins.observation.filter.GlobbingPathFilter.STAR_STAR;
import java.security.Principal;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.jcr.RepositoryException;
import javax.jcr.UnsupportedRepositoryOperationException;
import javax.jcr.nodetype.NoSuchNodeTypeException;
import javax.jcr.nodetype.NodeTypeIterator;
import javax.jcr.observation.EventJournal;
import javax.jcr.observation.EventListener;
import javax.jcr.observation.EventListenerIterator;
import org.apache.jackrabbit.api.observation.JackrabbitEventFilter;
import org.apache.jackrabbit.api.observation.JackrabbitObservationManager;
import org.apache.jackrabbit.commons.iterator.EventListenerIteratorAdapter;
import org.apache.jackrabbit.commons.observation.ListenerTracker;
import org.apache.jackrabbit.oak.api.ContentSession;
import org.apache.jackrabbit.oak.api.Root;
import org.apache.jackrabbit.oak.api.blob.BlobAccessProvider;
import org.apache.jackrabbit.oak.commons.PathUtils;
import org.apache.jackrabbit.oak.jcr.delegate.SessionDelegate;
import org.apache.jackrabbit.oak.jcr.session.SessionContext;
import org.apache.jackrabbit.oak.namepath.NamePathMapper;
import org.apache.jackrabbit.oak.plugins.nodetype.ReadOnlyNodeTypeManager;
import org.apache.jackrabbit.oak.plugins.observation.CommitRateLimiter;
import org.apache.jackrabbit.oak.plugins.observation.ExcludeExternal;
import org.apache.jackrabbit.oak.plugins.observation.filter.FilterBuilder;
import org.apache.jackrabbit.oak.plugins.observation.filter.FilterBuilder.Condition;
import org.apache.jackrabbit.oak.plugins.observation.filter.UniversalFilter.Selector;
import org.apache.jackrabbit.oak.plugins.observation.filter.FilterProvider;
import org.apache.jackrabbit.oak.plugins.observation.filter.PermissionProviderFactory;
import org.apache.jackrabbit.oak.plugins.observation.filter.ChangeSetFilterImpl;
import org.apache.jackrabbit.oak.plugins.observation.filter.Selectors;
import org.apache.jackrabbit.oak.spi.commit.Observable;
import org.apache.jackrabbit.oak.spi.security.authorization.AuthorizationConfiguration;
import org.apache.jackrabbit.oak.spi.security.authorization.permission.PermissionProvider;
import org.apache.jackrabbit.oak.spi.whiteboard.Whiteboard;
import org.apache.jackrabbit.oak.stats.StatisticManager;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.Marker;
import org.slf4j.MarkerFactory;
public class ObservationManagerImpl implements JackrabbitObservationManager {
// Logger shared by this class and its inner listener tracker.
private static final Logger LOG = LoggerFactory.getLogger(ObservationManagerImpl.class);
// Time, in milliseconds, to wait for a change processor to stop before
// falling back to an asynchronous stop (see stop(ChangeProcessor)).
private static final int STOP_TIME_OUT = 1000;
// Log marker attached to the observation related log statements of this class.
public static final Marker OBSERVATION =
MarkerFactory.getMarker("observation");
// Log marker attached to the warnings emitted through WarningListenerTracker.
private static final Marker DEPRECATED =
MarkerFactory.getMarker("deprecated");
// Change processor per registered event listener. Entries are added, removed
// and cleared while holding this object's monitor.
private final Map<EventListener, ChangeProcessor> processors =
new HashMap<EventListener, ChangeProcessor>();
// Delegate of the session this manager belongs to; source of the content
// session, workspace name, auth info and user data.
private final SessionDelegate sessionDelegate;
// Node type manager used to validate node type names and to look up subtypes.
private final ReadOnlyNodeTypeManager ntMgr;
// Authorization configuration used to create permission providers.
private final AuthorizationConfiguration authorizationConfig;
// Maps JCR names and paths to their Oak counterparts; also handed to every change processor.
private final NamePathMapper namePathMapper;
// Whiteboard passed to a change processor when it is started.
private final Whiteboard whiteboard;
// The following four values are passed unchanged to every change processor created here.
private final StatisticManager statisticManager;
private final int queueLength;
private final CommitRateLimiter commitRateLimiter;
// Creates permission providers for the session's workspace and principals;
// used for the access control condition of listener filters.
private final PermissionProviderFactory permissionProviderFactory;
private final BlobAccessProvider blobAccessProvider;
/**
 * Creates an observation manager operating in the context of the given session.
 * <p>
 * NOTE(review): earlier documentation stated that the underlying content session
 * has to implement {@link Observable}; nothing in this constructor checks that,
 * so any such check presumably happens when a listener is registered - confirm.
 *
 * @param sessionContext context of the session this manager belongs to. It supplies
 *         the session delegate, the security provider, the statistic manager and the
 *         blob access provider, and is itself used as the name/path mapper.
 * @param nodeTypeManager node type manager used to validate node type names and to
 *         look up their subtypes
 * @param whiteboard whiteboard passed to every change processor when it is started
 * @param queueLength queue length passed to every change processor created by this manager
 * @param commitRateLimiter commit rate limiter passed to every change processor created
 *         by this manager
 */
public ObservationManagerImpl(
        SessionContext sessionContext, ReadOnlyNodeTypeManager nodeTypeManager,
        Whiteboard whiteboard, int queueLength, CommitRateLimiter commitRateLimiter) {
    this.sessionDelegate = sessionContext.getSessionDelegate();
    this.authorizationConfig = sessionContext.getSecurityProvider().getConfiguration(AuthorizationConfiguration.class);
    this.ntMgr = nodeTypeManager;
    this.namePathMapper = sessionContext;
    this.whiteboard = whiteboard;
    this.statisticManager = sessionContext.getStatisticManager();
    this.queueLength = queueLength;
    this.commitRateLimiter = commitRateLimiter;
    this.blobAccessProvider = sessionContext.getBlobAccessProvider();

    // The principals are read once, at construction time, and reused for every
    // permission provider the factory creates afterwards.
    final Set<Principal> principals = sessionDelegate.getAuthInfo().getPrincipals();
    this.permissionProviderFactory = new PermissionProviderFactory() {
        @NotNull
        @Override
        public PermissionProvider create(Root root) {
            return authorizationConfig.getPermissionProvider(root,
                    sessionDelegate.getWorkspaceName(), principals);
        }
    };
}
/**
 * Stops every change processor registered through this manager and forgets
 * the associated listeners.
 * <p>
 * The processors are collected and the registry is emptied while holding this
 * object's monitor; the processors are stopped afterwards, without the monitor.
 */
public void dispose() {
    final List<ChangeProcessor> pending;
    synchronized (this) {
        pending = newArrayList(processors.values());
        processors.clear();
    }
    for (ChangeProcessor each : pending) {
        stop(each);
    }
}
/**
 * Looks up the change processor registered for the given listener.
 * Package protected because it is meant for testing only.
 *
 * @param listener the listener whose change processor is requested
 * @return the change processor registered for {@code listener}, or
 *         {@code null} if there is none
 */
ChangeProcessor getChangeProcessor(EventListener listener) {
    synchronized (this) {
        return processors.get(listener);
    }
}
/**
 * Registers {@code listener} with the given filter provider, or swaps the
 * filter provider if the very same listener is already registered.
 * <p>
 * For a new listener a change processor is created, recorded in the
 * registry and then started with the whiteboard of this manager.
 *
 * @param listener the listener to register or to update
 * @param tracker listener tracker passed to a newly created change processor
 * @param filterProvider the filter provider to apply to the listener
 */
private synchronized void addEventListener(EventListener listener, ListenerTracker tracker,
        FilterProvider filterProvider) {
    ChangeProcessor existing = processors.get(listener);
    if (existing != null) {
        // Already registered: only the filter changes.
        LOG.debug(OBSERVATION,
                "Changing event listener {} to filter {}", listener, filterProvider);
        existing.setFilterProvider(filterProvider);
        return;
    }

    LOG.debug(OBSERVATION,
            "Registering event listener {} with filter {}", listener, filterProvider);
    // TODO sharing the namePathMapper across different thread might lead to lock contention.
    // If this turns out to be problematic we might create a dedicated snapshot for each
    // session. See OAK-1368.
    ChangeProcessor created = new ChangeProcessor(sessionDelegate.getContentSession(), namePathMapper,
            tracker, filterProvider, statisticManager, queueLength,
            commitRateLimiter, blobAccessProvider);
    processors.put(listener, created);
    created.start(whiteboard);
}
/**
 * Registers an event listener for the events selected by the passed
 * {@code filterProvider}.
 * <p>
 * The events are additionally restricted by the access rights of the
 * current {@code Session}.
 * <p>
 * Registering the same {@code EventListener} object (i.e. the same actual
 * Java object) again with another filter provider changes the filter of
 * the already registered listener at runtime. The implementation must
 * ensure that no events are lost during the changeover.
 *
 * @param listener an {@link EventListener} object.
 * @param filterProvider filter provider specifying the filter for this listener
 */
public void addEventListener(EventListener listener, FilterProvider filterProvider) {
    // FIXME Add support for FilterProvider in ListenerTracker
    addEventListener(
            listener,
            new WarningListenerTracker(true, listener, 0, null, true, null, null, false),
            filterProvider);
}
/**
 * Registers an event listener using the classic JCR filter arguments.
 * <p>
 * The arguments are copied into a {@link JackrabbitEventFilter}; the
 * optional ones ({@code absPath}, {@code uuids}, {@code nodeTypeName}) are
 * only set when they are not {@code null}. External events are excluded
 * when the listener implements {@link ExcludeExternal}. The registration
 * itself is delegated to
 * {@link #addEventListener(EventListener, JackrabbitEventFilter)}.
 *
 * @param listener the listener to register
 * @param eventTypes the event types to listen for
 * @param absPath the path to observe, may be {@code null}
 * @param isDeep the deep flag copied into the filter
 * @param uuids identifiers to restrict the events to, may be {@code null}
 * @param nodeTypeName node type names to restrict the events to, may be {@code null}
 * @param noLocal the no-local flag copied into the filter
 * @throws RepositoryException if the registration fails
 */
@Override
public void addEventListener(EventListener listener, int eventTypes, String absPath,
        boolean isDeep, String[] uuids, String[] nodeTypeName, boolean noLocal)
        throws RepositoryException {
    JackrabbitEventFilter jcrFilter = new JackrabbitEventFilter();

    // Values that are always present.
    jcrFilter.setEventTypes(eventTypes);
    jcrFilter.setIsDeep(isDeep);
    jcrFilter.setNoLocal(noLocal);
    jcrFilter.setNoExternal(listener instanceof ExcludeExternal);

    // Optional values: leave the filter untouched when not given.
    if (absPath != null) {
        jcrFilter.setAbsPath(absPath);
    }
    if (uuids != null) {
        jcrFilter.setIdentifiers(uuids);
    }
    if (nodeTypeName != null) {
        jcrFilter.setNodeTypes(nodeTypeName);
    }

    addEventListener(listener, jcrFilter);
}
/**
 * Registers an event listener for the events selected by the given filter.
 * <p>
 * The filter settings are translated into a {@link FilterBuilder} condition
 * (paths, event types, identifiers, node types, access control) plus a
 * {@link ChangeSetFilterImpl} used for prefiltering. When the filter is an
 * {@code OakEventFilterImpl}, its extensions (glob include paths, additional
 * include conditions, aggregator, node type selector, subtree-on-remove,
 * ancestors-remove, wrapping of the main condition, prefilter include paths)
 * are applied as well.
 * <p>
 * If no include path is left after combining include and exclude paths, a
 * warning is logged and no listener is registered.
 *
 * @param listener the listener to register
 * @param filter the filter describing the events of interest
 * @throws RepositoryException if validating or expanding the node type names
 *         fails, e.g. because a node type does not exist
 */
@Override
public void addEventListener(EventListener listener, JackrabbitEventFilter filter)
throws RepositoryException {
// The Oak specific extensions are only available for OakEventFilterImpl instances.
OakEventFilterImpl oakEventFilter = null;
if (filter instanceof OakEventFilterImpl) {
oakEventFilter = (OakEventFilterImpl) filter;
}
int eventTypes = filter.getEventTypes();
boolean isDeep = filter.getIsDeep();
String[] uuids = filter.getIdentifiers();
String[] nodeTypeName = filter.getNodeTypes();
boolean noLocal = filter.getNoLocal();
// External events are excluded when either the filter or the listener type asks for it.
boolean noExternal = filter.getNoExternal() || listener instanceof ExcludeExternal;
boolean noInternal = filter.getNoInternal();
// Include paths: the additional paths plus the optional absPath, all as Oak paths.
Set<String> includePaths = getOakPaths(namePathMapper, filter.getAdditionalPaths());
String absPath = filter.getAbsPath();
if (absPath != null) {
includePaths.add(namePathMapper.getOakPath(absPath));
}
Set<String> excludedPaths = getOakPaths(namePathMapper, filter.getExcludedPaths());
// NOTE(review): presumably reconciles overlapping include and exclude paths in place
// - confirm against PathUtils.unifyInExcludes.
PathUtils.unifyInExcludes(includePaths, excludedPaths);
// Glob include paths are added as they are, after the unification above.
if (oakEventFilter != null) {
String[] includeGlobPaths = oakEventFilter.getIncludeGlobPaths();
if (includeGlobPaths != null) {
includePaths.addAll(Arrays.asList(includeGlobPaths));
}
}
if (includePaths.isEmpty()) {
LOG.warn("The passed filter excludes all events. No event listener registered");
return;
}
FilterBuilder filterBuilder = new FilterBuilder();
// Pattern appended to non-glob include paths: direct children only, or the whole subtree.
String depthPattern = isDeep ? STAR + '/' + STAR_STAR : STAR;
List<Condition> includeConditions = newArrayList();
filterBuilder.addPathsForMBean(includePaths);
for (String path : includePaths) {
final String deepenedPath;
if (path.endsWith(STAR)) {
// that's the case for a glob ending with * already, so
// no need to add another * or **
deepenedPath = path;
} else if (path.contains(STAR)) {
// for any other glob path that doesn't end with *
// we only add a single *, not a **
deepenedPath = concat (path, STAR);
} else {
// for any non-glob path we do it the traditional way
deepenedPath = concat(path, depthPattern);
}
includeConditions.add(filterBuilder.path(deepenedPath));
if (oakEventFilter != null && oakEventFilter.getIncludeAncestorsRemove()) {
// with the 'includeAncestorsRemove' extension we need
// to register '/' as the base path - done in wrapMainCondition
// - in order to catch any node removal. So we have to skip adding
// the subtree here as a result.
continue;
}
// only register the part leading to the first STAR:
filterBuilder.addSubTree(pathWithoutGlob(path));
}
List<Condition> excludeConditions = createExclusions(filterBuilder, excludedPaths);
final String[] validatedNodeTypeNames = validateNodeTypeNames(nodeTypeName);
// Defaults that an OakEventFilterImpl may override below.
Selector nodeTypeSelector = Selectors.PARENT;
boolean deleteSubtree = true;
if (oakEventFilter != null) {
Condition additionalIncludes = oakEventFilter.getAdditionalIncludeConditions(includePaths);
if (additionalIncludes != null) {
includeConditions.add(additionalIncludes);
}
filterBuilder.aggregator(oakEventFilter.getAggregator());
if (oakEventFilter.getApplyNodeTypeOnSelf()) {
nodeTypeSelector = Selectors.THIS;
}
if (oakEventFilter.getIncludeSubtreeOnRemove()) {
deleteSubtree = false;
}
}
if (deleteSubtree) {
excludeConditions.add(filterBuilder.deleteSubtree());
}
// Main condition: all exclusions hold, at least one include matches, and all
// remaining restrictions (event type, identifiers, node type, access control) hold.
Condition condition = filterBuilder.all(
filterBuilder.all(excludeConditions),
filterBuilder.any(includeConditions),
// filterBuilder.deleteSubtree(), // moved depending on deleteSubtree on excludeConditions
filterBuilder.moveSubtree(),
filterBuilder.eventType(eventTypes),
filterBuilder.uuid(Selectors.PARENT, uuids),
filterBuilder.nodeType(nodeTypeSelector, validatedNodeTypeNames),
filterBuilder.accessControl(permissionProviderFactory));
if (oakEventFilter != null) {
condition = oakEventFilter.wrapMainCondition(condition, filterBuilder, permissionProviderFactory);
}
filterBuilder
.includeSessionLocal(!noLocal)
.includeClusterExternal(!noExternal)
.includeClusterLocal(!noInternal)
.condition(condition);
// FIXME support multiple path in ListenerTracker
// Warnings of the tracker are only enabled when external events are delivered.
ListenerTracker tracker = new WarningListenerTracker(
!noExternal, listener, eventTypes, absPath, isDeep, uuids, nodeTypeName, noLocal);
Set<String> additionalIncludePaths = null;
if (oakEventFilter != null) {
additionalIncludePaths = oakEventFilter.calcPrefilterIncludePaths(includePaths);
}
// OAK-5082 : node type filtering should not only be direct but include derived types
// one easy way to solve this is to 'explode' the node types at start by including
// all subtypes of every registered node type
HashSet<String> explodedNodeTypes = null;
if (validatedNodeTypeNames != null) {
explodedNodeTypes = newHashSet();
for (String nt : validatedNodeTypeNames) {
explodeSubtypes(nt, explodedNodeTypes);
}
}
// OAK-4908 : prefiltering support. here we have explicit yes/no/maybe filtering
// for things like propertyNames/nodeTypes/nodeNames/paths which cannot be
// applied on the full-fledged filterBuilder above but requires an explicit 'prefilter' for that.
filterBuilder.setChangeSetFilter(new ChangeSetFilterImpl(includePaths, isDeep, additionalIncludePaths, excludedPaths, null,
explodedNodeTypes, null));
addEventListener(listener, tracker, filterBuilder.build());
}
/**
 * Adds {@code nodeType} and, recursively, the names of all its subtypes to
 * {@code set}.
 * <p>
 * Subtype names are obtained through {@code String.valueOf} on the entries
 * of the subtype iterator. A subtype is only descended into when it was not
 * yet contained in {@code set}.
 *
 * @param nodeType name of the node type to expand
 * @param set the set collecting the node type names
 * @throws RepositoryException if a node type cannot be looked up
 */
private void explodeSubtypes(String nodeType, Set<String> set) throws RepositoryException {
    set.add(nodeType);
    for (NodeTypeIterator subtypes = ntMgr.getNodeType(nodeType).getSubtypes(); subtypes.hasNext(); ) {
        String subtypeName = String.valueOf(subtypes.next());
        // add() reports whether the name was new, which is the only case to recurse on
        if (set.add(subtypeName)) {
            explodeSubtypes(subtypeName, set);
        }
    }
}
/**
 * Returns the leading part of {@code path} up to, but excluding, the first
 * path element containing a {@code *}.
 *
 * @param path the path, possibly containing glob elements
 * @return {@code path} itself if it contains no {@code *}; otherwise the
 *         path built from the elements preceding the first glob element,
 *         which is {@code "/"} if the first element is already a glob
 */
private String pathWithoutGlob(String path) {
    if (path.contains("*")) {
        String prefix = "/";
        for (String element : elements(path)) {
            if (element.contains("*")) {
                // first glob element reached: everything before it is the result
                return prefix;
            }
            prefix = concat(prefix, element);
        }
        return prefix;
    }
    return path;
}
/**
 * Creates one negated path condition per excluded path. Each condition is
 * built from the excluded path followed by {@code '/'} and the
 * {@code STAR_STAR} pattern.
 *
 * @param filterBuilder the builder used to create the conditions
 * @param excludedPaths the paths to exclude
 * @return a new, modifiable list with one condition per excluded path
 */
private static List<Condition> createExclusions(FilterBuilder filterBuilder, Iterable<String> excludedPaths) {
    List<Condition> exclusions = newArrayList();
    for (String excluded : excludedPaths) {
        String subtreePattern = excluded + '/' + STAR_STAR;
        Condition withinExcluded = filterBuilder.path(subtreePattern);
        exclusions.add(filterBuilder.not(withinExcluded));
    }
    return exclusions;
}
/**
 * Maps every given path to its Oak path.
 *
 * @param mapper the mapper performing the conversion
 * @param paths the paths to convert; must not be {@code null}
 * @return a new, modifiable set holding the result of
 *         {@code mapper.getOakPath} for each entry of {@code paths}
 */
private static Set<String> getOakPaths(NamePathMapper mapper, String[] paths) {
    Set<String> mapped = newHashSet();
    for (int i = 0; i < paths.length; i++) {
        mapped.add(mapper.getOakPath(paths[i]));
    }
    return mapped;
}
/**
 * Unregisters the given listener and stops its change processor.
 * Does nothing if the listener is not registered.
 *
 * @param listener the listener to remove
 */
@Override
public void removeEventListener(EventListener listener) {
    final ChangeProcessor removed;
    synchronized (this) {
        removed = processors.remove(listener);
    }
    if (removed == null) {
        return;
    }
    // needs to happen outside synchronization
    stop(removed);
}
/**
 * Returns an iterator over the event listeners registered with this manager.
 * <p>
 * The iterator works on a snapshot of the registry taken while holding this
 * object's monitor, the same monitor under which listeners are added and
 * removed. Registrations or removals happening after this call are therefore
 * not reflected by the returned iterator, and they cannot disturb an
 * iteration that is in progress.
 *
 * @return an iterator over the listeners registered at the time of the call
 */
@Override
public EventListenerIterator getRegisteredEventListeners() {
    final List<EventListener> snapshot;
    synchronized (this) {
        // copy instead of exposing the live key set of the unsynchronized map
        snapshot = newArrayList(processors.keySet());
    }
    return new EventListenerIteratorAdapter(snapshot);
}
/**
 * Passes the given user data on to the session delegate.
 *
 * @param userData the user data, may be {@code null}
 */
@Override
public void setUserData(@Nullable String userData) {
sessionDelegate.setUserData(userData);
}
/**
 * Not supported by this implementation.
 *
 * @return never returns normally
 * @throws UnsupportedRepositoryOperationException always
 */
@Override
public EventJournal getEventJournal() throws RepositoryException {
throw new UnsupportedRepositoryOperationException();
}
/**
 * Not supported by this implementation; all arguments are ignored.
 *
 * @return never returns normally
 * @throws UnsupportedRepositoryOperationException always
 */
@Override
public EventJournal getEventJournal(int eventTypes, String absPath, boolean isDeep, String[] uuid, String[]
nodeTypeName) throws RepositoryException {
throw new UnsupportedRepositoryOperationException();
}
//------------------------------------------------------------< private >---
/**
 * Checks that every given node type name refers to an existing node type
 * and converts the names to Oak names.
 *
 * @param nodeTypeNames the node type names, may be {@code null}
 * @return the node type names as Oak names, in the same order, or
 *         {@code null} if {@code nodeTypeNames} is {@code null}
 * @throws javax.jcr.nodetype.NoSuchNodeTypeException if one of the node type names refers to
 *         a non-existing node type.
 * @throws javax.jcr.RepositoryException if an error occurs while reading from the
 *         node type manager.
 */
@Nullable
private String[] validateNodeTypeNames(@Nullable String[] nodeTypeNames)
        throws NoSuchNodeTypeException, RepositoryException {
    if (nodeTypeNames != null) {
        String[] converted = new String[nodeTypeNames.length];
        int index = 0;
        for (String jcrName : nodeTypeNames) {
            // the looked up node type is not used; the call only serves as validation
            ntMgr.getNodeType(jcrName);
            converted[index++] = namePathMapper.getOakName(jcrName);
        }
        return converted;
    }
    return null;
}
/**
 * Stops the given change processor, waiting at most {@code STOP_TIME_OUT}
 * milliseconds for it to finish. If the processor does not stop in time, a
 * warning is logged and {@code processor.stop()} is called as a fallback.
 *
 * @param processor the change processor to stop
 */
private static void stop(ChangeProcessor processor) {
    boolean stoppedInTime = processor.stopAndWait(STOP_TIME_OUT, MILLISECONDS);
    if (stoppedInTime) {
        return;
    }
    String message = "Timed out waiting for change processor to stop after "
            + STOP_TIME_OUT
            + " milliseconds. Falling back to asynchronous stop on "
            + processor
            + " (listener details: '"
            + processor.getListenerToString()
            + "')";
    LOG.warn(OBSERVATION, message);
    processor.stop();
}
/**
 * Listener tracker whose warnings can be switched off and which asks the
 * session delegate of the enclosing manager to refresh at its next access
 * before events are delivered.
 */
private class WarningListenerTracker extends ListenerTracker {

    /** Whether {@link #warn(String)} writes to the log at all. */
    private final boolean enableWarning;

    /**
     * @param enableWarning {@code true} to log warnings, {@code false} to drop them
     * @param listener the tracked listener
     * @param eventTypes passed on to {@link ListenerTracker}
     * @param absPath passed on to {@link ListenerTracker}
     * @param isDeep passed on to {@link ListenerTracker}
     * @param uuids passed on to {@link ListenerTracker}
     * @param nodeTypeName passed on to {@link ListenerTracker}
     * @param noLocal passed on to {@link ListenerTracker}
     */
    public WarningListenerTracker(
            boolean enableWarning, EventListener listener, int eventTypes, String absPath,
            boolean isDeep, String[] uuids, String[] nodeTypeName, boolean noLocal) {
        super(listener, eventTypes, absPath, isDeep, uuids, nodeTypeName, noLocal);
        this.enableWarning = enableWarning;
    }

    /**
     * Logs the message with the {@code DEPRECATED} marker and the inherited
     * {@code initStackTrace}, unless warnings are disabled for this tracker.
     */
    @Override
    protected void warn(String message) {
        if (!enableWarning) {
            return;
        }
        LOG.warn(DEPRECATED, message, initStackTrace);
    }

    /** Requests a refresh of the session at its next access. */
    @Override
    protected void beforeEventDelivery() {
        sessionDelegate.refreshAtNextAccess();
    }
}
}