blob: 0d96219737c9d4c88d2ef50938414f7db9217f3a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sling.distribution.agent.impl;
import javax.annotation.Nonnull;
import org.apache.sling.api.resource.ResourceResolver;
import org.apache.sling.distribution.DistributionRequestType;
import org.apache.sling.distribution.common.DistributionException;
import org.apache.sling.distribution.common.RecoverableDistributionException;
import org.apache.sling.distribution.component.impl.DistributionComponentKind;
import org.apache.sling.distribution.event.DistributionEventTopics;
import org.apache.sling.distribution.event.impl.DistributionEventFactory;
import org.apache.sling.distribution.log.impl.DefaultDistributionLog;
import org.apache.sling.distribution.packaging.DistributionPackage;
import org.apache.sling.distribution.packaging.impl.DistributionPackageExporter;
import org.apache.sling.distribution.packaging.impl.DistributionPackageImporter;
import org.apache.sling.distribution.packaging.impl.DistributionPackageUtils;
import org.apache.sling.distribution.queue.DistributionQueueEntry;
import org.apache.sling.distribution.queue.DistributionQueueItem;
import org.apache.sling.distribution.queue.DistributionQueueItemStatus;
import org.apache.sling.distribution.queue.impl.DistributionQueueProcessor;
import org.apache.sling.distribution.queue.impl.DistributionQueueProvider;
import org.apache.sling.distribution.queue.impl.DistributionQueueDispatchingStrategy;
import org.apache.sling.distribution.util.impl.DistributionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A processor of agent queue entries, each entry's underlying package is fetched and passed to the
* {@link DistributionPackageImporter} for import.
* If item can be delivered it can be removed from the queue, if it cannot be delivered because of a {@link RecoverableDistributionException}
* like a connection issue the item will stay in the queue, for other types of errors the item will be moved to the
* current queue "error queue" if that exists.
*/
class SimpleDistributionAgentQueueProcessor implements DistributionQueueProcessor {
private final Logger log = LoggerFactory.getLogger(getClass());
private final DistributionPackageExporter distributionPackageExporter;
private final DistributionPackageImporter distributionPackageImporter;
private final int retryAttempts;
private final DistributionQueueDispatchingStrategy errorQueueStrategy;
private final DefaultDistributionLog distributionLog;
private final DistributionQueueProvider queueProvider;
private final DistributionEventFactory distributionEventFactory;
private final SimpleDistributionAgentAuthenticationInfo authenticationInfo;
private final String agentName;
public SimpleDistributionAgentQueueProcessor(DistributionPackageExporter distributionPackageExporter,
DistributionPackageImporter distributionPackageImporter, int retryAttempts,
DistributionQueueDispatchingStrategy errorQueueStrategy, DefaultDistributionLog log,
DistributionQueueProvider queueProvider, DistributionEventFactory distributionEventFactory,
SimpleDistributionAgentAuthenticationInfo authenticationInfo, String agentName) {
this.distributionPackageExporter = distributionPackageExporter;
this.distributionPackageImporter = distributionPackageImporter;
this.retryAttempts = retryAttempts;
this.errorQueueStrategy = errorQueueStrategy;
this.distributionLog = log;
this.queueProvider = queueProvider;
this.distributionEventFactory = distributionEventFactory;
this.authenticationInfo = authenticationInfo;
this.agentName = agentName;
}
@Override
public boolean process(@Nonnull String queueName, @Nonnull DistributionQueueEntry queueEntry) {
DistributionQueueItem queueItem = queueEntry.getItem();
try {
final long startTime = System.currentTimeMillis();
distributionLog.debug("[{}] ITEM-PROCESS processing item={}", queueName, queueItem);
boolean success = processQueueItem(queueName, queueEntry);
final long endTime = System.currentTimeMillis();
distributionLog.debug("[{}] ITEM-PROCESSED item={}, status={}, processingTime={}ms", queueName, queueItem, success, endTime - startTime);
return success;
} catch (Throwable t) {
distributionLog.error("[{}] ITEM-FAIL item={}", queueName, queueItem, t);
return false;
}
}
private boolean processQueueItem(String queueName, DistributionQueueEntry queueEntry) throws DistributionException {
boolean removeItemFromQueue = false;
ResourceResolver agentResourceResolver = null;
DistributionPackage distributionPackage = null;
DistributionQueueItem queueItem = queueEntry.getItem();
DistributionQueueItemStatus queueItemStatus = queueEntry.getStatus();
try {
String callingUser = queueItem.get(DistributionPackageUtils.PACKAGE_INFO_PROPERTY_REQUEST_USER, String.class);
String requestId = queueItem.get(DistributionPackageUtils.PACKAGE_INFO_PROPERTY_REQUEST_ID, String.class);
Long globalStartTime = queueItem.get(DistributionPackageUtils.PACKAGE_INFO_PROPERTY_REQUEST_START_TIME, Long.class);
agentResourceResolver = DistributionUtils.getResourceResolver(callingUser, authenticationInfo.getAgentService(),
authenticationInfo.getSlingRepository(), authenticationInfo.getSubServiceName(),
authenticationInfo.getResourceResolverFactory());
final long startTime = System.currentTimeMillis();
distributionPackage = distributionPackageExporter.getPackage(agentResourceResolver, queueItem.getPackageId());
if (distributionPackage != null) {
final long packageSize = distributionPackage.getSize();
DistributionPackageUtils.mergeQueueEntry(distributionPackage.getInfo(), queueEntry);
final DistributionRequestType requestType = distributionPackage.getInfo().getRequestType();
final String[] paths = distributionPackage.getInfo().getPaths();
try {
// import package
distributionPackageImporter.importPackage(agentResourceResolver, distributionPackage);
// generated event
distributionEventFactory.generatePackageEvent(DistributionEventTopics.AGENT_PACKAGE_DISTRIBUTED,
DistributionComponentKind.AGENT, agentName, distributionPackage.getInfo());
removeItemFromQueue = true;
final long endTime = System.currentTimeMillis();
distributionLog.info("[{}] PACKAGE-DELIVERED {}: {} paths={}, importTime={}ms, execTime={}ms, size={}B", queueName, requestId,
requestType, paths,
endTime - startTime, endTime - globalStartTime,
packageSize);
} catch (RecoverableDistributionException e) {
distributionLog.error("[{}] PACKAGE-FAIL {}: could not deliver {}, {}", queueName, requestId, distributionPackage.getId(), e.getMessage());
distributionLog.debug("could not deliver package {}", distributionPackage.getId(), e);
log.error("could not deliver package {}", distributionPackage.getId(), e);
} catch (Throwable e) {
distributionLog.error("[{}] PACKAGE-FAIL {}: could not deliver package {} {}", queueName, requestId, distributionPackage.getId(), e.getMessage(), e);
log.error("could not deliver package {} from queue {}", new Object[]{distributionPackage.getId(), queueName}, e);
if (errorQueueStrategy != null && queueItemStatus.getAttempts() > retryAttempts) {
removeItemFromQueue = reEnqueuePackage(distributionPackage);
distributionLog.info("[{}] PACKAGE-QUEUED {}: distribution package {} was enqueued to an error queue", queueName, requestId, distributionPackage.getId());
}
}
} else {
removeItemFromQueue = true; // return success if package does not exist in order to clear the queue.
distributionLog.error("distribution package with id {} does not exist. the package will be skipped.", queueItem.getPackageId());
}
} finally {
if (removeItemFromQueue) {
DistributionPackageUtils.releaseOrDelete(distributionPackage, queueName);
} else {
DistributionPackageUtils.closeSafely(distributionPackage);
}
DistributionUtils.ungetResourceResolver(agentResourceResolver);
}
// return true if item should be removed from queue
return removeItemFromQueue;
}
private boolean reEnqueuePackage(DistributionPackage distributionPackage) {
if (errorQueueStrategy == null) {
return false;
}
try {
errorQueueStrategy.add(distributionPackage, queueProvider);
log.warn("package {} moved to error queue", distributionPackage.getId());
} catch (DistributionException e) {
distributionLog.error("could not reenqueue package {}", distributionPackage.getId(), e);
return false;
}
return true;
}
}