blob: e8136c2c651e1c31214f04052f21b45a66d6fbf0 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sling.distribution.journal.impl.subscriber;
import static java.lang.String.format;
import static java.lang.System.currentTimeMillis;
import static java.util.Collections.singletonMap;
import static org.apache.sling.api.resource.ResourceResolverFactory.SUBSERVICE;
import static org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage.Status.IMPORTED;
import static org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage.Status.REMOVED;
import static org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage.Status.REMOVED_FAILED;
import java.io.Closeable;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
import org.apache.sling.api.resource.LoginException;
import org.apache.sling.api.resource.PersistenceException;
import org.apache.sling.api.resource.ResourceResolver;
import org.apache.sling.api.resource.ResourceResolverFactory;
import org.apache.sling.api.resource.ValueMap;
import org.apache.sling.commons.metrics.Timer;
import org.apache.sling.distribution.common.DistributionException;
import org.apache.sling.distribution.journal.impl.event.DistributionEvent;
import org.apache.sling.distribution.journal.impl.queue.impl.PackageRetries;
import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService.GaugeService;
import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage;
import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage.Status;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventAdmin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
/**
* Keeps track of offset and processed status and manages
* coordinates the import/retry handling.
*
* The offset store is identified by the agentName only.
*
* With non clustered publish instances deployment, each
* instance stores the offset in its own node store, thus
* avoiding mix ups. Moreover, when cloning an instance
* from a node store, the cloned instance will implicitly
* recover the offsets and start from the last processed
* offset.
*
* With clustered publish instances deployment, only one
* Subscriber agent must run on the cluster in order to
* avoid mix ups.
*
* The clustered and non clustered publish instances use
* cases can be supported by only running the Subscriber
* agent on the leader instance.
*/
public class BookKeeper implements Closeable {
private static final String SUBSERVICE_IMPORTER = "importer";
private static final String SUBSERVICE_BOOKKEEPER = "bookkeeper";
private static final int RETRY_SEND_DELAY = 1000;
private final Logger log = LoggerFactory.getLogger(this.getClass());
private final ResourceResolverFactory resolverFactory;
private final DistributionMetricsService distributionMetricsService;
private final PackageHandler packageHandler;
private final EventAdmin eventAdmin;
private final Consumer<PackageStatusMessage> sender;
private final boolean editable;
private final int maxRetries;
private final boolean errorQueueEnabled;
private final PackageRetries packageRetries = new PackageRetries();
private final LocalStore statusStore;
private final LocalStore processedOffsets;
private final String subAgentName;
private final String subSlingId;
private GaugeService<Integer> retriesGauge;
public BookKeeper(ResourceResolverFactory resolverFactory,
DistributionMetricsService distributionMetricsService,
PackageHandler packageHandler,
EventAdmin eventAdmin,
Consumer<PackageStatusMessage> sender,
String subAgentName,
String subSlingId,
boolean editable,
int maxRetries) {
this.packageHandler = packageHandler;
this.eventAdmin = eventAdmin;
String nameRetries = DistributionMetricsService.SUB_COMPONENT + ".current_retries;sub_name=" + subAgentName;
this.retriesGauge = distributionMetricsService.createGauge(nameRetries, "Retries of current package", packageRetries::getSum);
this.resolverFactory = resolverFactory;
this.distributionMetricsService = distributionMetricsService;
this.sender = sender;
this.subAgentName = subAgentName;
this.subSlingId = subSlingId;
this.editable = editable;
this.maxRetries = maxRetries;
// Error queues are enabled when the number
// of retry attempts is limited ; disabled otherwise
this.errorQueueEnabled = (maxRetries >= 0);
this.statusStore = new LocalStore(resolverFactory, "statuses", subAgentName);
this.processedOffsets = new LocalStore(resolverFactory, "packages", subAgentName);
}
/**
* We aim at processing the packages exactly once. Processing the packages
* exactly once is possible with the following conditions
*
* I. The package importer is configured to disable auto-committing changes.
*
* II. A single commit aggregates three content updates
*
* C1. install the package
* C2. store the processing status
* C3. store the offset processed
*
* Some package importers require auto-saving or issue partial commits before
* failing. For those packages importers, we aim at processing packages at least
* once, thanks to the order in which the content updates are applied.
*/
public void importPackage(PackageMessage pkgMsg, long offset, long createdTime) throws DistributionException {
log.info(format("Importing distribution package %s of type %s at offset %s", pkgMsg.getPkgId(),
pkgMsg.getReqType(), offset));
addPackageMDC(pkgMsg);
try (Timer.Context context = distributionMetricsService.getImportedPackageDuration().time();
ResourceResolver importerResolver = getServiceResolver(SUBSERVICE_IMPORTER)) {
packageHandler.apply(importerResolver, pkgMsg);
if (editable) {
storeStatus(importerResolver, new PackageStatus(IMPORTED, offset, pkgMsg.getPubAgentName()));
}
storeOffset(importerResolver, offset);
importerResolver.commit();
distributionMetricsService.getImportedPackageSize().update(pkgMsg.getPkgLength());
distributionMetricsService.getPackageDistributedDuration().update((currentTimeMillis() - createdTime), TimeUnit.MILLISECONDS);
packageRetries.clear(pkgMsg.getPubAgentName());
Event event = DistributionEvent.eventImporterImported(pkgMsg, subAgentName);
eventAdmin.postEvent(event);
} catch (LoginException | IOException | RuntimeException e) {
failure(pkgMsg, offset, e);
} finally {
MDC.clear();
}
}
private void addPackageMDC(PackageMessage pkgMsg) {
MDC.put("module", "distribution");
MDC.put("package-id", pkgMsg.getPkgId());
String paths = pkgMsg.getPathsList().stream().collect(Collectors.joining(","));
MDC.put("paths", paths);
MDC.put("pub-sling-id", pkgMsg.getPubSlingId());
String pubAgentName = pkgMsg.getPubAgentName();
MDC.put("pub-agent-name", pubAgentName);
MDC.put("distribution-message-type", pkgMsg.getReqType().name());
MDC.put("retries", Integer.toString(packageRetries.get(pubAgentName)));
MDC.put("sub-sling-id", subSlingId);
MDC.put("sub-agent-name", subAgentName);
}
/**
* Should be called on a exception while importing a package.
*
* When we use an error queue and the max retries is reached the package is removed.
* In all other cases a DistributionException is thrown that signals that we should retry the
* package.
*
* @param pkgMsg
* @param offset
* @param e
* @throws DistributionException if the package should be retried
*/
private void failure(PackageMessage pkgMsg, long offset, Exception e) throws DistributionException {
distributionMetricsService.getFailedPackageImports().mark();
String pubAgentName = pkgMsg.getPubAgentName();
int retries = packageRetries.get(pubAgentName);
if (errorQueueEnabled && retries >= maxRetries) {
log.warn(format("Failed to import distribution package %s at offset %s after %s retries, removing the package.", pkgMsg.getPkgId(), offset, retries));
removeFailedPackage(pkgMsg, offset);
} else {
packageRetries.increase(pubAgentName);
String msg = format("Error processing distribution package %s. Retry attempts %s/%s.", pkgMsg.getPkgId(), retries, errorQueueEnabled ? Integer.toString(maxRetries) : "infinite");
throw new DistributionException(msg, e);
}
}
public void removePackage(PackageMessage pkgMsg, long offset) throws Exception {
log.info(format("Removing distribution package %s of type %s at offset %s", pkgMsg.getPkgId(), pkgMsg.getReqType(), offset));
Timer.Context context = distributionMetricsService.getRemovedPackageDuration().time();
try (ResourceResolver resolver = getServiceResolver(SUBSERVICE_BOOKKEEPER)) {
if (editable) {
storeStatus(resolver, new PackageStatus(REMOVED, offset, pkgMsg.getPubAgentName()));
}
storeOffset(resolver, offset);
resolver.commit();
}
packageRetries.clear(pkgMsg.getPubAgentName());
context.stop();
}
public void sendStoredStatus() throws InterruptedException, IOException {
try (Timer.Context context = distributionMetricsService.getSendStoredStatusDuration().time()) {
PackageStatus status = new PackageStatus(statusStore.load());
boolean sent = status.sent;
for (int retry = 0 ; !sent ; retry++) {
try {
sendStatusMessage(status, retry);
markStatusSent();
sent = true;
} catch (Exception e) {
log.warn("Cannot send status (retry {})", retry, e);
Thread.sleep(RETRY_SEND_DELAY);
}
}
}
}
private void sendStatusMessage(PackageStatus status, int retry) throws InterruptedException {
PackageStatusMessage pkgStatMsg = PackageStatusMessage.newBuilder()
.setSubSlingId(subSlingId)
.setSubAgentName(subAgentName)
.setPubAgentName(status.pubAgentName)
.setOffset(status.offset)
.setStatus(status.status)
.build();
sender.accept(pkgStatMsg);
log.info("Sent status message {}", pkgStatMsg);
}
public void markStatusSent() {
try (ResourceResolver resolver = getServiceResolver(SUBSERVICE_BOOKKEEPER)) {
statusStore.store(resolver, "sent", true);
resolver.commit();
} catch (Exception e) {
log.warn("Failed to mark status as sent", e);
}
}
public long loadOffset() {
return processedOffsets.load("offset", -1L);
}
public int getRetries(String pubAgentName) {
return packageRetries.get(pubAgentName);
}
public PackageRetries getPackageRetries() {
return packageRetries;
}
@Override
public void close() throws IOException {
IOUtils.closeQuietly(retriesGauge);
}
private void removeFailedPackage(PackageMessage pkgMsg, long offset) throws DistributionException {
log.info(format("Removing failed distribution package %s of type %s at offset %s", pkgMsg.getPkgId(), pkgMsg.getReqType(), offset));
Timer.Context context = distributionMetricsService.getRemovedFailedPackageDuration().time();
try (ResourceResolver resolver = getServiceResolver(SUBSERVICE_BOOKKEEPER)) {
storeStatus(resolver, new PackageStatus(REMOVED_FAILED, offset, pkgMsg.getPubAgentName()));
storeOffset(resolver, offset);
resolver.commit();
} catch (Exception e) {
throw new DistributionException("Error removing failed package", e);
}
context.stop();
}
private void storeStatus(ResourceResolver resolver, PackageStatus packageStatus) throws PersistenceException {
Map<String, Object> statusMap = packageStatus.asMap();
statusStore.store(resolver, statusMap);
log.info("Stored status {}", statusMap);
}
private void storeOffset(ResourceResolver resolver, long offset) throws PersistenceException {
processedOffsets.store(resolver, "offset", offset);
}
private ResourceResolver getServiceResolver(String subService) throws LoginException {
return resolverFactory.getServiceResourceResolver(singletonMap(SUBSERVICE, SUBSERVICE_BOOKKEEPER));
}
class PackageStatus {
final Status status;
final Long offset;
final String pubAgentName;
final Boolean sent;
PackageStatus(Status status, long offset, String pubAgentName) {
this.status = status;
this.offset = offset;
this.pubAgentName = pubAgentName;
this.sent = false;
}
PackageStatus(ValueMap statusMap) {
Integer statusNum = statusMap.get("statusNumber", Integer.class);
this.status = statusNum !=null ? Status.valueOf(statusNum) : null;
this.offset = statusMap.get("offset", Long.class);
this.pubAgentName = statusMap.get("pubAgentName", String.class);
this.sent = statusMap.get("sent", true);
}
Map<String, Object> asMap() {
Map<String, Object> s = new HashMap<>();
s.put("pubAgentName", pubAgentName);
s.put("statusNumber", status.getNumber());
s.put("offset", offset);
s.put("sent", sent);
return s;
}
}
}