blob: 0ab1e729c551fc1b5c1e87d4913715e2ebcfe071 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sling.distribution.journal.impl.publisher;
import javax.annotation.Nonnull;
import javax.annotation.ParametersAreNonnullByDefault;
import javax.jcr.Binary;
import javax.jcr.Node;
import javax.jcr.Property;
import javax.jcr.nodetype.NodeType;
import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
import org.apache.sling.distribution.journal.impl.shared.Topics;
import org.apache.jackrabbit.api.ReferenceBinary;
import org.apache.jackrabbit.commons.JcrUtils;
import org.apache.sling.api.resource.LoginException;
import org.apache.sling.api.resource.ModifiableValueMap;
import org.apache.sling.api.resource.PersistenceException;
import org.apache.sling.api.resource.Resource;
import org.apache.sling.api.resource.ResourceResolver;
import org.apache.sling.api.resource.ResourceResolverFactory;
import org.apache.sling.api.resource.ResourceUtil;
import org.apache.sling.commons.metrics.Timer;
import org.apache.sling.distribution.common.DistributionException;
import org.apache.sling.distribution.packaging.DistributionPackage;
import org.apache.sling.serviceusermapping.ServiceUserMapped;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Reference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.sling.distribution.journal.MessagingProvider;
import org.apache.sling.distribution.journal.Reset;
import static java.util.Collections.singletonMap;
import static org.apache.sling.api.resource.ResourceResolverFactory.SUBSERVICE;
/**
* Manages the binary content of DistributionPackages. If they are too big to fit in a journal message then they
* are written to the blob store. It also offers cleanup functionality to remove the data when it is not needed anymore.
*/
@Component(service = PackageRepo.class)
@ParametersAreNonnullByDefault
public class PackageRepo {
@Reference
private ResourceResolverFactory resolverFactory;
@Reference
private Topics topics;
@Reference
private MessagingProvider messagingProvider;
@Reference
private ServiceUserMapped mapped;
@Reference
private DistributionMetricsService distributionMetricsService;
private static final Logger LOG = LoggerFactory.getLogger(PackageRepo.class);
static final String PACKAGES_ROOT_PATH = "/var/sling/distribution/journal/packages";
private static final String PACKAGE_PATH_PATTERN = PACKAGES_ROOT_PATH + "/%s/data/%s"; // packageType x packageId
@Nonnull
public String store(ResourceResolver resolver, DistributionPackage disPkg)
throws DistributionException {
try {
String pkgPath = String.format(PACKAGE_PATH_PATTERN, disPkg.getType(), disPkg.getId());
Resource pkgResource = ResourceUtil.getOrCreateResource(resolver,
pkgPath, "sling:Folder", "sling:Folder", false);
Node pkgNode = pkgResource.adaptTo(Node.class);
Node binNode = JcrUtils.getOrAddNode(pkgNode, "bin", NodeType.NT_FILE);
Node cntNode = JcrUtils.getOrAddNode(binNode, Node.JCR_CONTENT, NodeType.NT_RESOURCE);
Binary binary = pkgNode.getSession().getValueFactory().createBinary(disPkg.createInputStream());
cntNode.setProperty(Property.JCR_DATA, binary);
resolver.commit();
String blobRef = ((ReferenceBinary) binary).getReference();
LOG.info(String.format("Stored content package %s under path %s with blobRef %s",
disPkg.getId(), pkgPath, blobRef));
return blobRef;
} catch (Exception e) {
throw new DistributionException(e.getMessage(), e);
}
}
/**
* The cleanup algorithm is based on the head and tail offsets
* fetched from the package topic.
*
* The implementation is robust and fits any topic retention policy setting,
* without requiring any explicit configuration matching the
* actual retention policy. It is also robust against potential
* instance clock misconfiguration/synchronisation issues.
*
* This comes at the expense of saving offsets in the repository.
* It should be an acceptable cost, given that the vast majority
* of packages are not saved in the repository on the first place
* and thus don't need cleanup.
*
* The cleanup does this:
*
* * The packages with an offset smaller than the head
* offset can be removed because they are no longer referenced
* by the package topic.
*
* * The packages with no offset are set the tail offset. This relies
* on the session being shielded from new packages that might be created
* in parallel.
*/
public void cleanup() {
Timer.Context context = distributionMetricsService.getCleanupPackageDuration().time();
// Auto-refresh policy is disabled for service resource resolver
try (ResourceResolver resolver = resolverFactory.getServiceResourceResolver(singletonMap(SUBSERVICE, "bookkeeper"))) {
long headOffset = messagingProvider.retrieveOffset(topics.getPackageTopic(), Reset.earliest);
long tailOffset = messagingProvider.retrieveOffset(topics.getPackageTopic(), Reset.latest);
cleanup(resolver, headOffset, tailOffset);
} catch (LoginException | PersistenceException e) {
throw new RuntimeException(e.getMessage(), e);
} finally {
context.stop();
}
}
private void cleanup(ResourceResolver resolver, long headOffset, long tailOffset)
throws PersistenceException {
LOG.info(String.format("Cleanup headOffset %s tailOffset %s", headOffset, tailOffset));
Resource root = getRoot(resolver);
int removedCount = 0;
for (Resource type : root.getChildren()) {
Resource data = type.getChild("data");
if (data != null) {
for (Resource pkg : data.getChildren()) {
removedCount += cleanNode(resolver, headOffset, tailOffset, pkg);
}
}
}
if (resolver.hasChanges()) {
resolver.commit();
distributionMetricsService.getCleanupPackageRemovedCount().increment(removedCount);
}
}
private int cleanNode(ResourceResolver resolver, long headOffset, long tailOffset, Resource pkg)
throws PersistenceException {
long offset = pkg.getValueMap().get("offset", -1);
if (offset < 0) {
LOG.info(String.format("keep package %s, setting tail offset %s", pkg.getName(), tailOffset));
pkg.adaptTo(ModifiableValueMap.class).put("offset", tailOffset);
} else if (offset < headOffset) {
LOG.info(String.format("remove package %s, offset smaller than head offset %s < %s", pkg.getName(), offset, headOffset));
resolver.delete(pkg);
return 1;
} else {
LOG.debug(String.format("keep package %s, offset bigger or equal to head offset %s >= %s", pkg.getName(), offset, headOffset));
}
return 0;
}
@Nonnull
private Resource getRoot(ResourceResolver resolver)
throws PersistenceException {
return ResourceUtil.getOrCreateResource(resolver, PACKAGES_ROOT_PATH, "sling:Folder", "sling:Folder", true);
}
}