| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.jackrabbit.oak.jcr.observation; |
| |
| import static com.google.common.collect.Lists.newArrayList; |
| import static com.google.common.collect.Sets.newHashSet; |
| import static java.util.concurrent.TimeUnit.MILLISECONDS; |
| import static org.apache.jackrabbit.oak.commons.PathUtils.concat; |
| import static org.apache.jackrabbit.oak.commons.PathUtils.elements; |
| import static org.apache.jackrabbit.oak.plugins.observation.filter.GlobbingPathFilter.STAR; |
| import static org.apache.jackrabbit.oak.plugins.observation.filter.GlobbingPathFilter.STAR_STAR; |
| |
| import java.security.Principal; |
| import java.util.Arrays; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| |
| import javax.jcr.RepositoryException; |
| import javax.jcr.UnsupportedRepositoryOperationException; |
| import javax.jcr.nodetype.NoSuchNodeTypeException; |
| import javax.jcr.nodetype.NodeTypeIterator; |
| import javax.jcr.observation.EventJournal; |
| import javax.jcr.observation.EventListener; |
| import javax.jcr.observation.EventListenerIterator; |
| |
| import org.apache.jackrabbit.api.observation.JackrabbitEventFilter; |
| import org.apache.jackrabbit.api.observation.JackrabbitObservationManager; |
| import org.apache.jackrabbit.commons.iterator.EventListenerIteratorAdapter; |
| import org.apache.jackrabbit.commons.observation.ListenerTracker; |
| import org.apache.jackrabbit.oak.api.ContentSession; |
| import org.apache.jackrabbit.oak.api.Root; |
| import org.apache.jackrabbit.oak.api.blob.BlobAccessProvider; |
| import org.apache.jackrabbit.oak.commons.PathUtils; |
| import org.apache.jackrabbit.oak.jcr.delegate.SessionDelegate; |
| import org.apache.jackrabbit.oak.jcr.session.SessionContext; |
| import org.apache.jackrabbit.oak.namepath.NamePathMapper; |
| import org.apache.jackrabbit.oak.plugins.nodetype.ReadOnlyNodeTypeManager; |
| import org.apache.jackrabbit.oak.plugins.observation.CommitRateLimiter; |
| import org.apache.jackrabbit.oak.plugins.observation.ExcludeExternal; |
| import org.apache.jackrabbit.oak.plugins.observation.filter.FilterBuilder; |
| import org.apache.jackrabbit.oak.plugins.observation.filter.FilterBuilder.Condition; |
| import org.apache.jackrabbit.oak.plugins.observation.filter.UniversalFilter.Selector; |
| import org.apache.jackrabbit.oak.plugins.observation.filter.FilterProvider; |
| import org.apache.jackrabbit.oak.plugins.observation.filter.PermissionProviderFactory; |
| import org.apache.jackrabbit.oak.plugins.observation.filter.ChangeSetFilterImpl; |
| import org.apache.jackrabbit.oak.plugins.observation.filter.Selectors; |
| import org.apache.jackrabbit.oak.spi.commit.Observable; |
| import org.apache.jackrabbit.oak.spi.security.authorization.AuthorizationConfiguration; |
| import org.apache.jackrabbit.oak.spi.security.authorization.permission.PermissionProvider; |
| import org.apache.jackrabbit.oak.spi.whiteboard.Whiteboard; |
| import org.apache.jackrabbit.oak.stats.StatisticManager; |
| import org.jetbrains.annotations.NotNull; |
| import org.jetbrains.annotations.Nullable; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| import org.slf4j.Marker; |
| import org.slf4j.MarkerFactory; |
| |
| public class ObservationManagerImpl implements JackrabbitObservationManager { |
| private static final Logger LOG = LoggerFactory.getLogger(ObservationManagerImpl.class); |
| private static final int STOP_TIME_OUT = 1000; |
| |
| public static final Marker OBSERVATION = |
| MarkerFactory.getMarker("observation"); |
| |
| private static final Marker DEPRECATED = |
| MarkerFactory.getMarker("deprecated"); |
| |
| private final Map<EventListener, ChangeProcessor> processors = |
| new HashMap<EventListener, ChangeProcessor>(); |
| |
| private final SessionDelegate sessionDelegate; |
| private final ReadOnlyNodeTypeManager ntMgr; |
| private final AuthorizationConfiguration authorizationConfig; |
| private final NamePathMapper namePathMapper; |
| private final Whiteboard whiteboard; |
| private final StatisticManager statisticManager; |
| private final int queueLength; |
| private final CommitRateLimiter commitRateLimiter; |
| private final PermissionProviderFactory permissionProviderFactory; |
| private final BlobAccessProvider blobAccessProvider; |
| |
| /** |
| * Create a new instance based on a {@link ContentSession} that needs to implement |
| * {@link Observable}. |
| * |
| * @param sessionContext session delegate of the session in whose context this observation manager |
| * operates. |
| * @param nodeTypeManager node type manager for the content session |
| * @param whiteboard |
| * @throws IllegalArgumentException if {@code contentSession} doesn't implement {@code Observable}. |
| */ |
| public ObservationManagerImpl( |
| SessionContext sessionContext, ReadOnlyNodeTypeManager nodeTypeManager, |
| Whiteboard whiteboard, int queueLength, CommitRateLimiter commitRateLimiter) { |
| |
| this.sessionDelegate = sessionContext.getSessionDelegate(); |
| this.authorizationConfig = sessionContext.getSecurityProvider().getConfiguration(AuthorizationConfiguration.class); |
| this.ntMgr = nodeTypeManager; |
| this.namePathMapper = sessionContext; |
| this.whiteboard = whiteboard; |
| this.statisticManager = sessionContext.getStatisticManager(); |
| this.queueLength = queueLength; |
| this.commitRateLimiter = commitRateLimiter; |
| this.blobAccessProvider = sessionContext.getBlobAccessProvider(); |
| this.permissionProviderFactory = new PermissionProviderFactory() { |
| Set<Principal> principals = sessionDelegate.getAuthInfo().getPrincipals(); |
| @NotNull |
| @Override |
| public PermissionProvider create(Root root) { |
| return authorizationConfig.getPermissionProvider(root, |
| sessionDelegate.getWorkspaceName(), principals); |
| } |
| }; |
| } |
| |
| public void dispose() { |
| List<ChangeProcessor> toBeStopped; |
| |
| synchronized (this) { |
| toBeStopped = newArrayList(processors.values()); |
| processors.clear(); |
| } |
| |
| for (ChangeProcessor processor : toBeStopped) { |
| stop(processor); |
| } |
| } |
| |
| /** for testing only, hence package protected **/ |
| synchronized ChangeProcessor getChangeProcessor(EventListener listener) { |
| return processors.get(listener); |
| } |
| |
| private synchronized void addEventListener(EventListener listener, ListenerTracker tracker, |
| FilterProvider filterProvider) { |
| |
| ChangeProcessor processor = processors.get(listener); |
| if (processor == null) { |
| LOG.debug(OBSERVATION, |
| "Registering event listener {} with filter {}", listener, filterProvider); |
| // TODO sharing the namePathMapper across different thread might lead to lock contention. |
| // If this turns out to be problematic we might create a dedicated snapshot for each |
| // session. See OAK-1368. |
| processor = new ChangeProcessor(sessionDelegate.getContentSession(), namePathMapper, |
| tracker, filterProvider, statisticManager, queueLength, |
| commitRateLimiter, blobAccessProvider); |
| processors.put(listener, processor); |
| processor.start(whiteboard); |
| } else { |
| LOG.debug(OBSERVATION, |
| "Changing event listener {} to filter {}", listener, filterProvider); |
| processor.setFilterProvider(filterProvider); |
| } |
| } |
| |
| /** |
| * Adds an event listener that listens for the events specified |
| * by the {@code filterProvider} passed to this method. |
| * <p> |
| * The set of events will be further filtered by the access rights |
| * of the current {@code Session}. |
| * <p> |
| * The filters of an already-registered {@code EventListener} can be |
| * changed at runtime by re-registering the same {@code EventListener} |
| * object (i.e. the same actual Java object) with a new filter provider. |
| * The implementation must ensure that no events are lost during the |
| * changeover. |
| * |
| * @param listener an {@link EventListener} object. |
| * @param filterProvider filter provider specifying the filter for this listener |
| */ |
| public void addEventListener(EventListener listener, FilterProvider filterProvider) { |
| // FIXME Add support for FilterProvider in ListenerTracker |
| ListenerTracker tracker = new WarningListenerTracker( |
| true, listener, 0, null, true, null, null, false); |
| addEventListener(listener, tracker, filterProvider); |
| } |
| |
| @Override |
| public void addEventListener(EventListener listener, int eventTypes, String absPath, |
| boolean isDeep, String[] uuids, String[] nodeTypeName, boolean noLocal) |
| throws RepositoryException { |
| |
| JackrabbitEventFilter filter = new JackrabbitEventFilter(); |
| filter.setEventTypes(eventTypes); |
| if (absPath != null) { |
| filter.setAbsPath(absPath); |
| } |
| filter.setIsDeep(isDeep); |
| if (uuids != null) { |
| filter.setIdentifiers(uuids); |
| } |
| if (nodeTypeName != null) { |
| filter.setNodeTypes(nodeTypeName); |
| } |
| filter.setNoLocal(noLocal); |
| filter.setNoExternal(listener instanceof ExcludeExternal); |
| addEventListener(listener, filter); |
| } |
| |
| @Override |
| public void addEventListener(EventListener listener, JackrabbitEventFilter filter) |
| throws RepositoryException { |
| OakEventFilterImpl oakEventFilter = null; |
| if (filter instanceof OakEventFilterImpl) { |
| oakEventFilter = (OakEventFilterImpl) filter; |
| } |
| |
| int eventTypes = filter.getEventTypes(); |
| boolean isDeep = filter.getIsDeep(); |
| String[] uuids = filter.getIdentifiers(); |
| String[] nodeTypeName = filter.getNodeTypes(); |
| boolean noLocal = filter.getNoLocal(); |
| boolean noExternal = filter.getNoExternal() || listener instanceof ExcludeExternal; |
| boolean noInternal = filter.getNoInternal(); |
| Set<String> includePaths = getOakPaths(namePathMapper, filter.getAdditionalPaths()); |
| String absPath = filter.getAbsPath(); |
| if (absPath != null) { |
| includePaths.add(namePathMapper.getOakPath(absPath)); |
| } |
| Set<String> excludedPaths = getOakPaths(namePathMapper, filter.getExcludedPaths()); |
| PathUtils.unifyInExcludes(includePaths, excludedPaths); |
| if (oakEventFilter != null) { |
| String[] includeGlobPaths = oakEventFilter.getIncludeGlobPaths(); |
| if (includeGlobPaths != null) { |
| includePaths.addAll(Arrays.asList(includeGlobPaths)); |
| } |
| } |
| if (includePaths.isEmpty()) { |
| LOG.warn("The passed filter excludes all events. No event listener registered"); |
| return; |
| } |
| |
| FilterBuilder filterBuilder = new FilterBuilder(); |
| String depthPattern = isDeep ? STAR + '/' + STAR_STAR : STAR; |
| List<Condition> includeConditions = newArrayList(); |
| filterBuilder.addPathsForMBean(includePaths); |
| for (String path : includePaths) { |
| final String deepenedPath; |
| if (path.endsWith(STAR)) { |
| // that's the case for a glob ending with * already, so |
| // no need to add another * or ** |
| deepenedPath = path; |
| } else if (path.contains(STAR)) { |
| // for any other glob path that doesn't end with * |
| // we only add a single *, not a ** |
| deepenedPath = concat (path, STAR); |
| } else { |
| // for any non-glob path we do it the traditional way |
| deepenedPath = concat(path, depthPattern); |
| } |
| includeConditions.add(filterBuilder.path(deepenedPath)); |
| if (oakEventFilter != null && oakEventFilter.getIncludeAncestorsRemove()) { |
| // with the 'includeAncestorsRemove' extension we need |
| // to register '/' as the base path - done in wrapMainCondition |
| // - in order to catch any node removal. So we have to skip adding |
| // the subtree here as a result. |
| continue; |
| } |
| // only register the part leading to the first STAR: |
| filterBuilder.addSubTree(pathWithoutGlob(path)); |
| } |
| |
| List<Condition> excludeConditions = createExclusions(filterBuilder, excludedPaths); |
| |
| final String[] validatedNodeTypeNames = validateNodeTypeNames(nodeTypeName); |
| Selector nodeTypeSelector = Selectors.PARENT; |
| boolean deleteSubtree = true; |
| if (oakEventFilter != null) { |
| Condition additionalIncludes = oakEventFilter.getAdditionalIncludeConditions(includePaths); |
| if (additionalIncludes != null) { |
| includeConditions.add(additionalIncludes); |
| } |
| filterBuilder.aggregator(oakEventFilter.getAggregator()); |
| if (oakEventFilter.getApplyNodeTypeOnSelf()) { |
| nodeTypeSelector = Selectors.THIS; |
| } |
| if (oakEventFilter.getIncludeSubtreeOnRemove()) { |
| deleteSubtree = false; |
| } |
| } |
| if (deleteSubtree) { |
| excludeConditions.add(filterBuilder.deleteSubtree()); |
| } |
| |
| Condition condition = filterBuilder.all( |
| filterBuilder.all(excludeConditions), |
| filterBuilder.any(includeConditions), |
| // filterBuilder.deleteSubtree(), // moved depending on deleteSubtree on excludeConditions |
| filterBuilder.moveSubtree(), |
| filterBuilder.eventType(eventTypes), |
| filterBuilder.uuid(Selectors.PARENT, uuids), |
| filterBuilder.nodeType(nodeTypeSelector, validatedNodeTypeNames), |
| filterBuilder.accessControl(permissionProviderFactory)); |
| if (oakEventFilter != null) { |
| condition = oakEventFilter.wrapMainCondition(condition, filterBuilder, permissionProviderFactory); |
| } |
| filterBuilder |
| .includeSessionLocal(!noLocal) |
| .includeClusterExternal(!noExternal) |
| .includeClusterLocal(!noInternal) |
| .condition(condition); |
| |
| // FIXME support multiple path in ListenerTracker |
| ListenerTracker tracker = new WarningListenerTracker( |
| !noExternal, listener, eventTypes, absPath, isDeep, uuids, nodeTypeName, noLocal); |
| |
| Set<String> additionalIncludePaths = null; |
| if (oakEventFilter != null) { |
| additionalIncludePaths = oakEventFilter.calcPrefilterIncludePaths(includePaths); |
| } |
| |
| // OAK-5082 : node type filtering should not only be direct but include derived types |
| // one easy way to solve this is to 'explode' the node types at start by including |
| // all subtypes of every registered node type |
| HashSet<String> explodedNodeTypes = null; |
| if (validatedNodeTypeNames != null) { |
| explodedNodeTypes = newHashSet(); |
| for (String nt : validatedNodeTypeNames) { |
| explodeSubtypes(nt, explodedNodeTypes); |
| } |
| } |
| |
| // OAK-4908 : prefiltering support. here we have explicit yes/no/maybe filtering |
| // for things like propertyNames/nodeTypes/nodeNames/paths which cannot be |
| // applied on the full-fledged filterBuilder above but requires an explicit 'prefilter' for that. |
| filterBuilder.setChangeSetFilter(new ChangeSetFilterImpl(includePaths, isDeep, additionalIncludePaths, excludedPaths, null, |
| explodedNodeTypes, null)); |
| |
| addEventListener(listener, tracker, filterBuilder.build()); |
| } |
| |
| private void explodeSubtypes(String nodeType, Set<String> set) throws RepositoryException { |
| set.add(nodeType); |
| NodeTypeIterator it = ntMgr.getNodeType(nodeType).getSubtypes(); |
| while(it.hasNext()) { |
| String subnt = String.valueOf(it.next()); |
| if (!set.contains(subnt)) { |
| set.add(subnt); |
| explodeSubtypes(subnt, set); |
| } |
| } |
| } |
| |
| private String pathWithoutGlob(String path) { |
| if (!path.contains("*")) { |
| return path; |
| } |
| Iterator<String> it = elements(path).iterator(); |
| String result = "/"; |
| while(it.hasNext()) { |
| String next = it.next(); |
| if (next.contains("*")) { |
| // then stop here |
| break; |
| } |
| result = concat(result, next); |
| } |
| return result; |
| } |
| |
| private static List<Condition> createExclusions(FilterBuilder filterBuilder, Iterable<String> excludedPaths) { |
| List<Condition> conditions = newArrayList(); |
| for (String path : excludedPaths) { |
| conditions.add(filterBuilder.not(filterBuilder.path(path + '/' + STAR_STAR))); |
| } |
| return conditions; |
| } |
| |
| private static Set<String> getOakPaths(NamePathMapper mapper, String[] paths) { |
| Set<String> oakPaths = newHashSet(); |
| for (String path : paths) { |
| oakPaths.add(mapper.getOakPath(path)); |
| } |
| return oakPaths; |
| } |
| |
| @Override |
| public void removeEventListener(EventListener listener) { |
| ChangeProcessor processor; |
| synchronized (this) { |
| processor = processors.remove(listener); |
| } |
| if (processor != null) { |
| stop(processor); // needs to happen outside synchronization |
| } |
| } |
| |
| @Override |
| public EventListenerIterator getRegisteredEventListeners() { |
| return new EventListenerIteratorAdapter(processors.keySet()); |
| } |
| |
| @Override |
| public void setUserData(@Nullable String userData) { |
| sessionDelegate.setUserData(userData); |
| } |
| |
| @Override |
| public EventJournal getEventJournal() throws RepositoryException { |
| throw new UnsupportedRepositoryOperationException(); |
| } |
| |
| @Override |
| public EventJournal getEventJournal(int eventTypes, String absPath, boolean isDeep, String[] uuid, String[] |
| nodeTypeName) throws RepositoryException { |
| throw new UnsupportedRepositoryOperationException(); |
| } |
| |
| //------------------------------------------------------------< private >--- |
| |
| /** |
| * Validates the given node type names. |
| * |
| * @param nodeTypeNames the node type names. |
| * @return the node type names as oak names. |
| * @throws javax.jcr.nodetype.NoSuchNodeTypeException if one of the node type names refers to |
| * an non-existing node type. |
| * @throws javax.jcr.RepositoryException if an error occurs while reading from the |
| * node type manager. |
| */ |
| @Nullable |
| private String[] validateNodeTypeNames(@Nullable String[] nodeTypeNames) |
| throws NoSuchNodeTypeException, RepositoryException { |
| if (nodeTypeNames == null) { |
| return null; |
| } |
| String[] oakNames = new String[nodeTypeNames.length]; |
| for (int i = 0; i < nodeTypeNames.length; i++) { |
| ntMgr.getNodeType(nodeTypeNames[i]); |
| oakNames[i] = namePathMapper.getOakName(nodeTypeNames[i]); |
| } |
| return oakNames; |
| } |
| |
| private static void stop(ChangeProcessor processor) { |
| if (!processor.stopAndWait(STOP_TIME_OUT, MILLISECONDS)) { |
| LOG.warn( |
| OBSERVATION, |
| "Timed out waiting for change processor to stop after " |
| + STOP_TIME_OUT |
| + " milliseconds. Falling back to asynchronous stop on " |
| + processor |
| + " (listener details: '" |
| + processor.getListenerToString() |
| + "')"); |
| processor.stop(); |
| } |
| } |
| |
| private class WarningListenerTracker extends ListenerTracker { |
| private final boolean enableWarning; |
| |
| public WarningListenerTracker( |
| boolean enableWarning, EventListener listener, int eventTypes, String absPath, |
| boolean isDeep, String[] uuids, String[] nodeTypeName, boolean noLocal) { |
| super(listener, eventTypes, absPath, isDeep, uuids, nodeTypeName, noLocal); |
| this.enableWarning = enableWarning; |
| } |
| |
| @Override |
| protected void warn(String message) { |
| if (enableWarning) { |
| LOG.warn(DEPRECATED, message, initStackTrace); |
| } |
| } |
| |
| @Override |
| protected void beforeEventDelivery() { |
| sessionDelegate.refreshAtNextAccess(); |
| } |
| } |
| |
| } |