blob: c334d39b1a04da01c41a8aa5f9a06b2ef2c8eb42 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sling.distribution.packaging.impl;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.jackrabbit.commons.JcrUtils;
import org.apache.sling.api.resource.PersistenceException;
import org.apache.sling.api.resource.Resource;
import org.apache.sling.api.resource.ResourceResolver;
import org.apache.sling.api.resource.ResourceUtil;
import org.apache.sling.distribution.DistributionRequest;
import org.apache.sling.distribution.DistributionRequestType;
import org.apache.sling.distribution.queue.DistributionQueueEntry;
import org.apache.sling.distribution.packaging.DistributionPackage;
import org.apache.sling.distribution.packaging.DistributionPackageInfo;
import org.apache.sling.distribution.queue.DistributionQueueItem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nonnull;
import javax.jcr.Binary;
import javax.jcr.Node;
import javax.jcr.Property;
import javax.jcr.RepositoryException;
import javax.jcr.nodetype.NodeType;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InvalidClassException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.OutputStream;
import java.io.SequenceInputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
/**
* Package related utility methods
*/
public class DistributionPackageUtils {
private static final Logger log = LoggerFactory.getLogger(DistributionPackageUtils.class);
private final static String META_START = "DSTRPACKMETA";
private static final Object repolock = new Object();
private static final Object filelock = new Object();
public final static String PROPERTY_REMOTE_PACKAGE_ID = "remote.package.id";
/**
* distribution package origin queue
*/
private static final String PACKAGE_INFO_PROPERTY_ORIGIN_QUEUE = "internal.origin.queue";
/**
* distribution request user
*/
public static final String PACKAGE_INFO_PROPERTY_REQUEST_USER = "internal.request.user";
public static final String PACKAGE_INFO_PROPERTY_REQUEST_ID = "internal.request.id";
public static final String PACKAGE_INFO_PROPERTY_REQUEST_START_TIME = "internal.request.startTime";
/**
* Acquires the package if it's a {@link SharedDistributionPackage}, via {@link SharedDistributionPackage#acquire(String[])}
* @param distributionPackage a distribution package
* @param queueNames the name of the queue in which the package should be acquired
*/
public static void acquire(DistributionPackage distributionPackage, String... queueNames) {
if (distributionPackage instanceof SharedDistributionPackage) {
((SharedDistributionPackage) distributionPackage).acquire(queueNames);
}
}
/**
* Releases the package if it's a {@link SharedDistributionPackage}, via {@link SharedDistributionPackage#release(String[])}
* @param distributionPackage a distribution package
* @param queueNames the name of the queue in which the package should be released
*/
public static void release(DistributionPackage distributionPackage, String... queueNames) {
if (distributionPackage instanceof SharedDistributionPackage) {
((SharedDistributionPackage) distributionPackage).release(queueNames);
}
}
/**
* Releases a distribution package if it's a {@link SharedDistributionPackage}, otherwise deletes it.
* @param distributionPackage a distribution package
* @param queueNames the name of the queue from which it should be eventually released
*/
public static void releaseOrDelete(DistributionPackage distributionPackage, String... queueNames) {
if (distributionPackage == null) {
return;
}
try {
if (distributionPackage instanceof SharedDistributionPackage) {
if (queueNames != null) {
((SharedDistributionPackage) distributionPackage).release(queueNames);
log.debug("package {} released from queue {}", distributionPackage.getId(), queueNames);
} else {
log.error("package {} cannot be released from null queue", distributionPackage.getId());
}
} else {
deleteSafely(distributionPackage);
log.debug("package {} deleted", distributionPackage.getId());
}
} catch (Throwable t) {
log.error("cannot release package {}", t);
}
}
/**
* Delete a distribution package, if deletion fails, ignore it
* @param distributionPackage the package to delete
*/
private static void deleteSafely(DistributionPackage distributionPackage) {
if (distributionPackage != null) {
try {
distributionPackage.delete();
} catch (Throwable t) {
log.error("error deleting package", t);
}
}
}
public static void closeSafely(DistributionPackage distributionPackage) {
if (distributionPackage != null) {
try {
distributionPackage.close();
} catch (Throwable t) {
log.error("error closing package", t);
}
}
}
/**
* Create a queue item out of a package
* @param distributionPackage a distribution package
* @return a distribution queue item
*/
public static DistributionQueueItem toQueueItem(DistributionPackage distributionPackage) {
return new DistributionQueueItem(distributionPackage.getId(), distributionPackage.getSize(), distributionPackage.getInfo());
}
/**
* Create a {@link DistributionPackageInfo} from a queue item
* @param queueItem a distribution queue item
* @return a {@link DistributionPackageInfo}
*/
public static DistributionPackageInfo fromQueueItem(DistributionQueueItem queueItem) {
String type = queueItem.get(DistributionPackageInfo.PROPERTY_PACKAGE_TYPE, String.class);
return new DistributionPackageInfo(type, queueItem);
}
public static String getQueueName(DistributionPackageInfo packageInfo) {
return packageInfo.get(PACKAGE_INFO_PROPERTY_ORIGIN_QUEUE, String.class);
}
public static void mergeQueueEntry(DistributionPackageInfo packageInfo, DistributionQueueEntry entry) {
packageInfo.putAll(entry.getItem());
packageInfo.put(PACKAGE_INFO_PROPERTY_ORIGIN_QUEUE, entry.getStatus().getQueueName());
}
public static void fillInfo(DistributionPackageInfo info, DistributionRequest request) {
info.put(DistributionPackageInfo.PROPERTY_REQUEST_TYPE, request.getRequestType());
info.put(DistributionPackageInfo.PROPERTY_REQUEST_PATHS, request.getPaths());
info.put(DistributionPackageInfo.PROPERTY_REQUEST_DEEP_PATHS, getDeepPaths(request));
}
private static String[] getDeepPaths(DistributionRequest request) {
List<String> deepPaths = new ArrayList<String>();
for (String path : request.getPaths()) {
if (request.isDeep(path)) {
deepPaths.add(path);
}
}
return deepPaths.toArray(new String[deepPaths.size()]);
}
public static InputStream createStreamWithHeader(DistributionPackage distributionPackage) throws IOException {
DistributionPackageInfo packageInfo = distributionPackage.getInfo();
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
Map<String, Object> headerInfo = new HashMap<String, Object>();
headerInfo.put(DistributionPackageInfo.PROPERTY_REQUEST_TYPE, packageInfo.getRequestType());
headerInfo.put(DistributionPackageInfo.PROPERTY_REQUEST_PATHS, packageInfo.getPaths());
headerInfo.put(PROPERTY_REMOTE_PACKAGE_ID, distributionPackage.getId());
if (packageInfo.containsKey("reference-required")) {
headerInfo.put("reference-required", packageInfo.get("reference-required"));
log.info("setting reference-required to {}", packageInfo.get("reference-required"));
}
writeInfo(outputStream, headerInfo);
InputStream headerStream = new ByteArrayInputStream(outputStream.toByteArray());
InputStream bodyStream = distributionPackage.createInputStream();
return new SequenceInputStream(headerStream, bodyStream);
}
public static void readInfo(InputStream inputStream, Map<String, Object> info) {
try {
int size = META_START.getBytes("UTF-8").length;
inputStream.mark(size);
byte[] buffer = new byte[size];
int bytesRead = inputStream.read(buffer, 0, size);
String s = new String(buffer, "UTF-8");
if (bytesRead > 0 && buffer[0] > 0 && META_START.equals(s)) {
ObjectInputStream stream = getSafeObjectInputStream(inputStream);
@SuppressWarnings("unchecked") // by design
HashMap<String, Object> map = (HashMap<String, Object>) stream.readObject();
info.putAll(map);
} else {
inputStream.reset();
}
} catch (IOException e) {
log.error("Cannot read stream info", e);
} catch (ClassNotFoundException e) {
log.error("Cannot read stream info", e);
}
}
public static void writeInfo(OutputStream outputStream, Map<String, Object> info) {
HashMap<String, Object> map = new HashMap<String, Object>(info);
try {
outputStream.write(META_START.getBytes("UTF-8"));
ObjectOutputStream stream = new ObjectOutputStream(outputStream);
stream.writeObject(map);
} catch (IOException e) {
log.error("Cannot read stream info", e);
}
}
public static Resource getPackagesRoot(ResourceResolver resourceResolver, String packagesRootPath) throws PersistenceException {
Resource packagesRoot = resourceResolver.getResource(packagesRootPath);
if (packagesRoot != null) {
return packagesRoot;
}
synchronized (repolock) {
if (resourceResolver.hasChanges()) {
resourceResolver.refresh();
}
packagesRoot = ResourceUtil.getOrCreateResource(resourceResolver, packagesRootPath, "sling:Folder", "sling:Folder", true);
}
return packagesRoot;
}
public static InputStream getStream(Resource resource) throws RepositoryException {
Node parent = resource.adaptTo(Node.class);
return parent.getProperty("bin/jcr:content/jcr:data").getBinary().getStream();
}
public static void uploadStream(Resource resource, InputStream stream) throws RepositoryException {
Node parent = resource.adaptTo(Node.class);
Node file = JcrUtils.getOrAddNode(parent, "bin", NodeType.NT_FILE);
Node content = JcrUtils.getOrAddNode(file, Node.JCR_CONTENT, NodeType.NT_RESOURCE);
Binary binary = parent.getSession().getValueFactory().createBinary(stream);
content.setProperty(Property.JCR_DATA, binary);
JcrUtils.getOrAddNode(parent, "refs", NodeType.NT_UNSTRUCTURED);
}
public static void acquire(Resource resource, @Nonnull String[] holderNames) throws RepositoryException {
if (holderNames.length == 0) {
throw new IllegalArgumentException("holder name cannot be null or empty");
}
Node parent = resource.adaptTo(Node.class);
Node refs = parent.getNode("refs");
for (String holderName : holderNames) {
if (!refs.hasNode(holderName)) {
refs.addNode(holderName, NodeType.NT_UNSTRUCTURED);
}
}
}
public static boolean disposable(@Nonnull Resource resource) throws RepositoryException {
Node parent = resource.adaptTo(Node.class);
if (parent.hasNode("refs")) {
Node refs = parent.getNode("refs");
return !refs.hasNodes() && refs.hasProperty("released");
} else {
// Packages without refs nodes are likely the result of the concurrency
// issue fixed in SLING-6503. Yet, we consider them non disposable.
log.warn("Package {} has no refs resource. Consider removing it explicitly.", resource.getPath());
return false;
}
}
public static void release(Resource resource, @Nonnull String[] holderNames) throws RepositoryException {
if (holderNames.length == 0) {
throw new IllegalArgumentException("holder name cannot be null or empty");
}
Node parent = resource.adaptTo(Node.class);
Node refs = parent.getNode("refs");
for (String holderName : holderNames) {
Node refNode = refs.getNode(holderName);
if (refNode != null) {
refNode.remove();
}
}
if (!refs.hasProperty("released")) {
refs.setProperty("released", true);
}
}
public static void acquire(File file, @Nonnull String[] holderNames) throws IOException {
if (holderNames.length == 0) {
throw new IllegalArgumentException("holder name cannot be null or empty");
}
synchronized (filelock) {
ObjectInputStream inputStream = null;
ObjectOutputStream outputStream = null;
try {
HashSet<String> set;
if (file.exists()) {
inputStream = getSafeObjectInputStream(new FileInputStream(file));
@SuppressWarnings("unchecked") // type is known by sedign
HashSet<String> fromStreamSet = (HashSet<String>) inputStream.readObject();
set = fromStreamSet;
} else {
set = new HashSet<String>();
}
set.addAll(Arrays.asList(holderNames));
outputStream = new ObjectOutputStream(new FileOutputStream(file));
outputStream.writeObject(set);
} catch (ClassNotFoundException e) {
log.error("Cannot release file", e);
} finally {
IOUtils.closeQuietly(inputStream);
IOUtils.closeQuietly(outputStream);
}
}
}
public static boolean release(File file, @Nonnull String[] holderNames) throws IOException {
if (holderNames.length == 0) {
throw new IllegalArgumentException("holder name cannot be null or empty");
}
synchronized (filelock) {
ObjectInputStream inputStream = null;
ObjectOutputStream outputStream = null;
try {
HashSet<String> set;
if (file.exists()) {
inputStream = getSafeObjectInputStream(new FileInputStream(file));
@SuppressWarnings("unchecked") //type is known by design
HashSet<String> fromStreamSet = (HashSet<String>) inputStream.readObject();
set = fromStreamSet;
} else {
set = new HashSet<String>();
}
set.removeAll(Arrays.asList(holderNames));
if (set.isEmpty()) {
FileUtils.deleteQuietly(file);
return true;
}
outputStream = new ObjectOutputStream(new FileOutputStream(file));
outputStream.writeObject(set);
}
catch (ClassNotFoundException e) {
log.error("Cannot release file", e);
} finally {
IOUtils.closeQuietly(inputStream);
IOUtils.closeQuietly(outputStream);
}
}
return false;
}
private static ObjectInputStream getSafeObjectInputStream(InputStream inputStream) throws IOException {
final Class<?>[] acceptedClasses = new Class<?>[] {
HashMap.class, HashSet.class,
String.class, String[].class,
Long.class,
Number.class,
Boolean.class,
Enum.class,
DistributionRequestType.class
};
return new ObjectInputStream(inputStream) {
@Override
protected Class<?> resolveClass(ObjectStreamClass osc) throws IOException, ClassNotFoundException {
String className = osc.getName();
for (Class<?> clazz : acceptedClasses) {
if (clazz.getName().equals(className)) {
return super.resolveClass(osc);
}
}
throw new InvalidClassException("Class name not accepted: " + className);
}
};
// TODO: replace with the following lines when switching to commons-io-2.5
// return new ValidatingObjectInputStream(inputStream)
// .accept(acceptedClasses);
}
}