blob: 9da58a40c4a3a3a94396e0ec182a59a9ac78eda3 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.jackrabbit.vault.fs.impl;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.jcr.Credentials;
import javax.jcr.LoginException;
import javax.jcr.Node;
import javax.jcr.PropertyType;
import javax.jcr.Repository;
import javax.jcr.RepositoryException;
import javax.jcr.Session;
import javax.jcr.nodetype.NodeDefinition;
import javax.jcr.nodetype.NodeType;
import javax.jcr.nodetype.PropertyDefinition;
import org.apache.commons.collections.map.ReferenceMap;
import org.apache.commons.io.IOUtils;
import org.apache.jackrabbit.vault.fs.api.AggregateManager;
import org.apache.jackrabbit.vault.fs.api.Aggregator;
import org.apache.jackrabbit.vault.fs.api.ArtifactHandler;
import org.apache.jackrabbit.vault.fs.api.DumpContext;
import org.apache.jackrabbit.vault.fs.api.ImportInfo;
import org.apache.jackrabbit.vault.fs.api.ProgressTrackerListener;
import org.apache.jackrabbit.vault.fs.api.RepositoryAddress;
import org.apache.jackrabbit.vault.fs.api.VaultFsConfig;
import org.apache.jackrabbit.vault.fs.api.WorkspaceFilter;
import org.apache.jackrabbit.vault.fs.config.AbstractVaultFsConfig;
import org.apache.jackrabbit.vault.fs.config.ConfigurationException;
import org.apache.jackrabbit.vault.fs.config.DefaultWorkspaceFilter;
import org.apache.jackrabbit.vault.fs.impl.aggregator.RootAggregator;
import org.apache.jackrabbit.vault.fs.spi.CNDReader;
import org.apache.jackrabbit.vault.fs.spi.NodeTypeInstaller;
import org.apache.jackrabbit.vault.fs.spi.ServiceProviderFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* The artifact manager exposes an artifact node tree using the configured
* aggregators and serializers.
*/
public class AggregateManagerImpl implements AggregateManager {
/**
* the name of the (internal) default config
*/
private static final String DEFAULT_CONFIG =
"org/apache/jackrabbit/vault/fs/config/defaultConfig-1.1.xml";
/**
* the name of the (internal) default config
*/
private static final String DEFAULT_BINARY_REFERENCES_CONFIG =
"org/apache/jackrabbit/vault/fs/config/defaultConfig-1.1-binaryless.xml";
/**
* the name of the (internal) default workspace filter
*/
private static final String DEFAULT_WSP_FILTER = "" +
"org/apache/jackrabbit/vault/fs/config/defaultFilter-1.0.xml";
/**
* name of node types resource
*/
private static final String DEFAULT_NODETYPES = "" +
"org/apache/jackrabbit/vault/fs/config/nodetypes.cnd";
/**
* the repository session for this manager
*/
private Session session;
/**
* indicates if this manager owns the session and is allowed to close
* it in {@link #unmount()}
*/
private final boolean ownSession;
/**
* The repository address of the mountpoint;
*/
private final RepositoryAddress mountpoint;
/**
* provider that selects the respective aggregator for a repository node
*/
private final AggregatorProvider aggregatorProvider;
/**
* list of artifact handlers
*/
private final List<ArtifactHandler> artifactHandlers;
/**
* filter to general includes/excludes
*/
private final WorkspaceFilter workspaceFilter;
private AggregatorTracker tracker;
/**
* Set of node types used in the aggregates. this is cumulated when building
* the aggregates
*/
private final Set<String> nodeTypes = new HashSet<String>();
private final Map<String, String> nameCache = new ReferenceMap(ReferenceMap.WEAK, ReferenceMap.WEAK);
/**
* config
*/
private final VaultFsConfig config;
/**
* the root aggregate
*/
private final AggregateImpl root;
/**
* Creates a new artifact manager that is rooted at the given node.
*
* @param config fs config
* @param wspFilter the workspace filter
* @param mountpoint the address of the mountpoint
* @param session the repository session
* @return an artifact manager
* @throws RepositoryException if an error occurs.
*/
public static AggregateManager mount(VaultFsConfig config,
WorkspaceFilter wspFilter,
RepositoryAddress mountpoint,
Session session)
throws RepositoryException {
if (config == null) {
config = getDefaultConfig();
}
if (wspFilter == null) {
wspFilter = getDefaultWorkspaceFilter();
}
Node rootNode = session.getNode(mountpoint.getPath());
return new AggregateManagerImpl(config, wspFilter, mountpoint, rootNode, false);
}
/**
* Creates a new artifact manager that is rooted at the given path using
* the provided repository, credentials and workspace to create the
* session.
*
* @param config fs config
* @param wspFilter the workspace filter
* @param rep the jcr repository
* @param credentials the credentials
* @param mountpoint the address of the mountpoint
* @return an artifact manager
* @throws RepositoryException if an error occurs.
*/
public static AggregateManager mount(VaultFsConfig config,
WorkspaceFilter wspFilter,
Repository rep,
Credentials credentials,
RepositoryAddress mountpoint)
throws RepositoryException {
if (config == null) {
config = getDefaultConfig();
}
if (wspFilter == null) {
wspFilter = getDefaultWorkspaceFilter();
}
Node rootNode;
String wspName = mountpoint.getWorkspace();
try {
rootNode = rep.login(credentials, wspName).getNode(mountpoint.getPath());
} catch (LoginException e) {
if (wspName == null) {
// try again with default workspace
// todo: make configurable
rootNode = rep.login(credentials, "crx.default").getNode(mountpoint.getPath());
} else {
throw e;
}
}
return new AggregateManagerImpl(config, wspFilter, mountpoint, rootNode, true);
}
/**
* Returns the default config
* @return the default config
*/
public static VaultFsConfig getDefaultConfig() {
try (InputStream in = AggregateManagerImpl.class.getClassLoader()
.getResourceAsStream(DEFAULT_CONFIG)) {
if (in == null) {
throw new InternalError("Default config not in classpath: " + DEFAULT_CONFIG);
}
return AbstractVaultFsConfig.load(in, DEFAULT_CONFIG);
} catch (ConfigurationException e) {
throw new IllegalArgumentException("Internal error while parsing config.", e);
} catch (IOException e) {
throw new IllegalArgumentException("Internal error while parsing config.", e);
}
}
/**
* Returns the default config
* @return the default config
*/
public static VaultFsConfig getDefaultBinaryReferencesConfig() {
try (InputStream in = AggregateManagerImpl.class.getClassLoader()
.getResourceAsStream(DEFAULT_BINARY_REFERENCES_CONFIG)) {
if (in == null) {
throw new InternalError("Default config not in classpath: " + DEFAULT_BINARY_REFERENCES_CONFIG);
}
return AbstractVaultFsConfig.load(in, DEFAULT_BINARY_REFERENCES_CONFIG);
} catch (ConfigurationException e) {
throw new IllegalArgumentException("Internal error while parsing config.", e);
} catch (IOException e) {
throw new IllegalArgumentException("Internal error while parsing config.", e);
}
}
/**
* Returns the default workspace filter
* @return the default workspace filter
*/
public static DefaultWorkspaceFilter getDefaultWorkspaceFilter() {
try (InputStream in = AggregateManagerImpl.class.getClassLoader()
.getResourceAsStream(DEFAULT_WSP_FILTER)) {
if (in == null) {
throw new InternalError("Default filter not in classpath: " + DEFAULT_WSP_FILTER);
}
DefaultWorkspaceFilter filter = new DefaultWorkspaceFilter();
filter.load(in);
return filter;
} catch (ConfigurationException e) {
throw new IllegalArgumentException("Internal error while parsing config.", e);
} catch (IOException e) {
throw new IllegalArgumentException("Internal error while parsing config.", e);
}
}
public void unmount() throws RepositoryException {
assertMounted();
if (ownSession) {
session.logout();
}
session = null;
}
public AggregateImpl getRoot() throws RepositoryException {
assertMounted();
return root;
}
public RepositoryAddress getMountpoint() {
return mountpoint;
}
/**
* Constructs the artifact manager.
*
* @param config the configuration
* @param wspFilter the workspace filter
* @param mountpoint the repository address of the mountpoint
* @param rootNode the root node
* @param ownSession indicates if the session can be logged out in unmount.
* @throws RepositoryException if an error occurs.
*/
private AggregateManagerImpl(VaultFsConfig config, WorkspaceFilter wspFilter,
RepositoryAddress mountpoint, Node rootNode,
boolean ownSession)
throws RepositoryException {
session = rootNode.getSession();
this.mountpoint = mountpoint;
this.ownSession = ownSession;
this.config = config;
workspaceFilter = wspFilter;
aggregatorProvider = new AggregatorProvider(config.getAggregators());
artifactHandlers = Collections.unmodifiableList(config.getHandlers());
// init root node
Aggregator rootAggregator = rootNode.getDepth() == 0
? new RootAggregator()
: getAggregator(rootNode, null);
root = new AggregateImpl(this, rootNode.getPath(), rootAggregator);
// setup node types
initNodeTypes();
}
public Set<String> getNodeTypes() {
return nodeTypes;
}
/**
* Add the primary and mixin node types of that node to the internal set
* of used node types.
* @param node the node
* @throws RepositoryException if an error occurs
*/
public void addNodeTypes(Node node) throws RepositoryException {
internalAddNodeType(node.getPrimaryNodeType());
for (NodeType nt: node.getMixinNodeTypes()) {
internalAddNodeType(nt);
}
}
public String getNamespaceURI(String prefix) throws RepositoryException {
return session.getNamespaceURI(prefix);
}
public String cacheString(String string) {
String ret = nameCache.get(string);
if (ret == null) {
// create copy to keep retained size minimal
ret = new String(string);
nameCache.put(ret, ret);
}
return ret;
}
public void startTracking(ProgressTrackerListener pTracker) {
tracker = new AggregatorTracker(pTracker);
}
public void stopTracking() {
if (tracker != null) {
tracker.log(true);
tracker = null;
}
}
public void onAggregateCreated() {
if (tracker != null) {
tracker.onCreated();
}
}
public void onAggregateCollected() {
if (tracker != null) {
tracker.onCollected();
}
}
public void onAggregatePrepared() {
if (tracker != null) {
tracker.onPrepared();
}
}
/**
* internally add the node type and all transitive ones to the set of
* used node types.
* @param nodeType to add
*/
private void internalAddNodeType(NodeType nodeType) {
if (nodeType != null && !nodeTypes.contains(nodeType.getName())) {
nodeTypes.add(nodeType.getName());
NodeType[] superTypes = nodeType.getSupertypes();
for (NodeType superType: superTypes) {
nodeTypes.add(superType.getName());
}
NodeDefinition[] nodeDefs = nodeType.getChildNodeDefinitions();
if (nodeDefs != null) {
for (NodeDefinition nodeDef: nodeDefs) {
internalAddNodeType(nodeDef.getDefaultPrimaryType());
NodeType[] reqs = nodeDef.getRequiredPrimaryTypes();
if (reqs != null) {
for (NodeType req: reqs) {
internalAddNodeType(req);
}
}
}
}
// check reference constraints, too (bug #33367)
PropertyDefinition[] propDefs = nodeType.getPropertyDefinitions();
if (propDefs != null) {
for (PropertyDefinition propDef: propDefs) {
if (propDef.getRequiredType() == PropertyType.REFERENCE ||
propDef.getRequiredType() == PropertyType.WEAKREFERENCE) {
String[] vcs = propDef.getValueConstraints();
if (vcs != null) {
for (String vc: vcs) {
try {
internalAddNodeType(session.getWorkspace().getNodeTypeManager().getNodeType(vc));
} catch (RepositoryException e) {
// ignore
}
}
}
}
}
}
}
}
/**
* Initializes vlt node types (might not be the correct location)
* @throws RepositoryException if an error occurs
*/
private void initNodeTypes() throws RepositoryException {
// check if node types are registered
try {
session.getWorkspace().getNodeTypeManager().getNodeType("vlt:HierarchyNode");
session.getWorkspace().getNodeTypeManager().getNodeType("vlt:FullCoverage");
return;
} catch (RepositoryException e) {
// ignore
}
try (InputStream in = getClass().getClassLoader()
.getResourceAsStream(DEFAULT_NODETYPES)) {
NodeTypeInstaller installer = ServiceProviderFactory.getProvider().getDefaultNodeTypeInstaller(session);
CNDReader types = ServiceProviderFactory.getProvider().getCNDReader();
types.read(new InputStreamReader(in, "utf8"), DEFAULT_NODETYPES, null);
installer.install(null, types);
} catch (Exception e) {
throw new RepositoryException("Error while importing nodetypes.", e);
}
}
public Aggregator getAggregator(Node node, String path) throws RepositoryException {
return aggregatorProvider.getAggregator(node, path);
}
public WorkspaceFilter getWorkspaceFilter() {
return workspaceFilter;
}
/**
* Writes the artifact set back to the repository.
*
* @param node the artifact node to write
* @param reposName the name of the new node or {@code null}
* @param artifacts the artifact to write
* @return infos about the modifications
* @throws RepositoryException if an error occurs.
* @throws IOException if an I/O error occurs.
*/
public ImportInfo writeAggregate(AggregateImpl node, String reposName,
ArtifactSetImpl artifacts)
throws RepositoryException, IOException {
assertMounted();
if (reposName == null) {
ImportInfo info;
for (ArtifactHandler artifactHandler : artifactHandlers) {
info = artifactHandler.accept(session, node, artifacts);
if (info != null) {
node.invalidate();
return info;
}
}
} else {
ImportInfo info;
for (ArtifactHandler artifactHandler : artifactHandlers) {
info = artifactHandler.accept(session, node, reposName, artifacts);
if (info != null) {
node.invalidate();
return info;
}
}
}
throw new IllegalStateException("No handler accepted artifacts " + artifacts);
}
/**
* Checks if this tree is still mounted and if the attached session
* is still live.
*
* @throws RepositoryException if not mounted or not live.
*/
private void assertMounted() throws RepositoryException {
if (!isMounted()) {
throw new RepositoryException("JcrFS is not mounted anymore.");
}
}
public boolean isMounted() {
return session != null && session.isLive();
}
public String getUserId() throws RepositoryException {
assertMounted();
return session.getUserID();
}
public String getWorkspace() throws RepositoryException {
assertMounted();
return session.getWorkspace().getName();
}
public Session getSession() {
return session;
}
public void dumpConfig(PrintWriter out) throws IOException {
DumpContext ctx = new DumpContext(out);
ctx.println(false, "workspace filter");
ctx.indent(false);
workspaceFilter.dump(ctx, true);
ctx.outdent();
aggregatorProvider.dump(ctx, false);
ctx.println(true, "handlers");
ctx.indent(true);
for (Iterator<ArtifactHandler> iter = artifactHandlers.iterator(); iter.hasNext();) {
ArtifactHandler h = iter.next();
h.dump(ctx, !iter.hasNext());
}
ctx.outdent();
ctx.flush();
}
public VaultFsConfig getConfig() {
return config;
}
private static class AggregatorTracker {
/**
* default logger
*/
private static final Logger log = LoggerFactory.getLogger(AggregatorTracker.class);
private ProgressTrackerListener tracker;
int numCreated;
int numCollected;
int numPrepared;
long lastLogged;
private AggregatorTracker(ProgressTrackerListener tracker) {
this.tracker = tracker;
}
public void onCreated() {
numCreated++;
log(false);
}
public void onCollected() {
numCollected++;
log(false);
}
public void onPrepared() {
numPrepared++;
log(false);
}
public void log(boolean flush) {
if (tracker == null && !log.isInfoEnabled()) {
return;
}
long now = System.currentTimeMillis();
if (lastLogged == 0) {
lastLogged = now;
// updated each 5 seconds
} else if (now-lastLogged > 5000 || flush) {
lastLogged = now;
String str = new StringBuilder("Aggregation status: ")
.append(numPrepared).append(" of ")
.append(numCreated).append(" prepared, ")
.append(numCollected).append(" collected").toString();
log.trace("- {}", str);
if (tracker != null) {
tracker.onMessage(ProgressTrackerListener.Mode.TEXT, "-", str);
}
}
}
}
}