blob: 257286f515a8e4b88617c1def3550c7944c25ba5 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sling.distribution.agent.impl;
import org.apache.sling.api.resource.ResourceResolver;
import org.apache.sling.api.resource.ResourceResolverFactory;
import org.apache.sling.distribution.DistributionRequest;
import org.apache.sling.distribution.DistributionRequestState;
import org.apache.sling.distribution.DistributionRequestType;
import org.apache.sling.distribution.DistributionResponse;
import org.apache.sling.distribution.agent.spi.DistributionAgent;
import org.apache.sling.distribution.agent.DistributionAgentState;
import org.apache.sling.distribution.common.DistributionException;
import org.apache.sling.distribution.component.impl.DistributionComponentKind;
import org.apache.sling.distribution.component.impl.SettingsUtils;
import org.apache.sling.distribution.event.DistributionEventTopics;
import org.apache.sling.distribution.event.impl.DistributionEventFactory;
import org.apache.sling.distribution.impl.CompositeDistributionResponse;
import org.apache.sling.distribution.impl.SimpleDistributionResponse;
import org.apache.sling.distribution.log.spi.DistributionLog;
import org.apache.sling.distribution.log.impl.DefaultDistributionLog;
import org.apache.sling.distribution.packaging.DistributionPackage;
import org.apache.sling.distribution.packaging.impl.DistributionPackageExporter;
import org.apache.sling.distribution.packaging.impl.DistributionPackageImporter;
import org.apache.sling.distribution.packaging.impl.DistributionPackageProcessor;
import org.apache.sling.distribution.queue.spi.DistributionQueue;
import org.apache.sling.distribution.queue.impl.DistributionQueueProcessor;
import org.apache.sling.distribution.queue.impl.DistributionQueueProvider;
import org.apache.sling.distribution.queue.DistributionQueueState;
import org.apache.sling.distribution.queue.impl.DistributionQueueDispatchingStrategy;
import org.apache.sling.distribution.queue.impl.SimpleAgentDistributionQueue;
import org.apache.sling.distribution.trigger.DistributionTrigger;
import org.apache.sling.distribution.util.impl.DistributionUtils;
import org.apache.sling.jcr.api.SlingRepository;
import javax.annotation.Nonnull;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Basic implementation of a {@link DistributionAgent}
*/
public class SimpleDistributionAgent implements DistributionAgent {
private final static String DEFAULT_AGENT_SERVICE = "defaultAgentService";
private final DistributionQueueProvider queueProvider;
private final DistributionPackageImporter distributionPackageImporter;
private final DistributionPackageExporter distributionPackageExporter;
private final DistributionQueueDispatchingStrategy scheduleQueueStrategy;
private final DistributionQueueDispatchingStrategy errorQueueStrategy;
private final DistributionRequestAuthorizationStrategy distributionRequestAuthorizationStrategy;
private final DefaultDistributionLog log;
private final DistributionEventFactory distributionEventFactory;
private final DistributionQueueProcessor queueProcessor;
private TriggerAgentRequestHandler agentBasedRequestHandler;
private final String name;
private final boolean queueProcessingEnabled;
private final DistributionRequestType[] allowedRequests;
private boolean active = false;
private final Set<String> processingQueues;
private final String[] allowedRoots;
private final AtomicInteger nextRequestId = new AtomicInteger();
private final SimpleDistributionAgentAuthenticationInfo agentAuthenticationInfo;
public SimpleDistributionAgent(String name,
boolean queueProcessingEnabled,
Set<String> processingQueues,
String subServiceName,
DistributionPackageImporter distributionPackageImporter,
DistributionPackageExporter distributionPackageExporter,
DistributionRequestAuthorizationStrategy distributionRequestAuthorizationStrategy,
DistributionQueueProvider queueProvider,
DistributionQueueDispatchingStrategy scheduleQueueStrategy,
DistributionQueueDispatchingStrategy errorQueueStrategy,
DistributionEventFactory distributionEventFactory,
ResourceResolverFactory resourceResolverFactory,
SlingRepository slingRepository,
DefaultDistributionLog log,
DistributionRequestType[] allowedRequests,
String[] allowedRoots,
int retryAttempts) {
this.log = log;
this.allowedRequests = allowedRequests;
this.processingQueues = processingQueues;
validateConfiguration(name, queueProcessingEnabled, subServiceName, distributionPackageImporter, distributionPackageExporter, distributionRequestAuthorizationStrategy, queueProvider, scheduleQueueStrategy, distributionEventFactory, resourceResolverFactory);
this.allowedRoots = SettingsUtils.removeEmptyEntries(allowedRoots);
this.distributionRequestAuthorizationStrategy = distributionRequestAuthorizationStrategy;
this.name = SettingsUtils.removeEmptyEntry(name);
this.queueProcessingEnabled = queueProcessingEnabled;
this.distributionPackageImporter = distributionPackageImporter;
this.distributionPackageExporter = distributionPackageExporter;
this.queueProvider = queueProvider;
this.scheduleQueueStrategy = scheduleQueueStrategy;
this.errorQueueStrategy = errorQueueStrategy;
this.distributionEventFactory = distributionEventFactory;
this.agentAuthenticationInfo = new SimpleDistributionAgentAuthenticationInfo(slingRepository, DEFAULT_AGENT_SERVICE, resourceResolverFactory, subServiceName);
this.queueProcessor = new SimpleDistributionAgentQueueProcessor(distributionPackageExporter, distributionPackageImporter,
retryAttempts, errorQueueStrategy, log, queueProvider, distributionEventFactory, agentAuthenticationInfo, name);
}
private void validateConfiguration(String name, boolean queueProcessingEnabled, String subServiceName, DistributionPackageImporter distributionPackageImporter, DistributionPackageExporter distributionPackageExporter, DistributionRequestAuthorizationStrategy distributionRequestAuthorizationStrategy, DistributionQueueProvider queueProvider, DistributionQueueDispatchingStrategy scheduleQueueStrategy, DistributionEventFactory distributionEventFactory, ResourceResolverFactory resourceResolverFactory) {
// check configuration is valid
if (name == null
|| (queueProcessingEnabled && distributionPackageImporter == null)
|| distributionPackageExporter == null
|| distributionRequestAuthorizationStrategy == null
|| queueProvider == null
|| scheduleQueueStrategy == null
|| distributionEventFactory == null
|| resourceResolverFactory == null) {
String errorMessage = Arrays.toString(new Object[]{name,
distributionPackageImporter,
distributionPackageExporter,
subServiceName,
distributionRequestAuthorizationStrategy,
queueProvider,
scheduleQueueStrategy,
distributionEventFactory,
resourceResolverFactory});
throw new IllegalArgumentException("all arguments are required: " + errorMessage);
}
}
@Nonnull
public DistributionResponse execute(@Nonnull ResourceResolver resourceResolver, @Nonnull DistributionRequest distributionRequest)
throws DistributionException {
ResourceResolver agentResourceResolver = null;
final String requestId = "DSTRQ" + nextRequestId.incrementAndGet();
String callingUser = resourceResolver.getUserID();
try {
// check if the request type can be executed by this agent
if (!isAcceptedRequestType(distributionRequest)) {
log.debug("request type not accepted {}", distributionRequest.getRequestType());
return new SimpleDistributionResponse(DistributionRequestState.DROPPED, "Request type not accepted");
}
// check if the request paths can be distributed via this agent
if (!isAcceptedRequestRoot(distributionRequest)) {
log.debug("request paths not accepted {}", Arrays.toString(distributionRequest.getPaths()));
return new SimpleDistributionResponse(DistributionRequestState.DROPPED, "Request paths not accepted");
}
boolean silent = DistributionRequestType.PULL.equals(distributionRequest.getRequestType());
log.info(silent, "REQUEST-START {}: {} paths={}, user={}", requestId,
distributionRequest.getRequestType(), distributionRequest.getPaths(), callingUser);
// check permissions
distributionRequestAuthorizationStrategy.checkPermission(resourceResolver, distributionRequest);
agentResourceResolver = DistributionUtils.getResourceResolver(callingUser, agentAuthenticationInfo.getAgentService(),
agentAuthenticationInfo.getSlingRepository(), agentAuthenticationInfo.getSubServiceName(),
agentAuthenticationInfo.getResourceResolverFactory());
// export packages
CompositeDistributionResponse distributionResponse = exportPackages(agentResourceResolver, distributionRequest, callingUser, requestId);
log.debug("REQUEST-STARTED {}: {} paths={}, success={}, state={}, exportTime={}ms, noPackages={}, size={}B, noQueues={}", requestId,
distributionRequest.getRequestType(), distributionRequest.getPaths(),
distributionResponse.isSuccessful(), distributionResponse.getState(),
distributionResponse.getExportTime(),
distributionResponse.getPackagesCount(), distributionResponse.getPackagseSize(),
distributionResponse.getQueuesCount());
return distributionResponse;
} catch (DistributionException e) {
log.error("REQUEST-FAIL {}: {} paths={}, user={}, message={}", requestId,
distributionRequest.getRequestType(), distributionRequest.getPaths(), callingUser, e.getMessage());
throw e;
} finally {
DistributionUtils.ungetResourceResolver(agentResourceResolver);
}
}
private boolean isPassive() {
return !queueProcessingEnabled;
}
private CompositeDistributionResponse exportPackages(ResourceResolver agentResourceResolver, DistributionRequest distributionRequest, String callingUser, String requestId) throws DistributionException {
final long startTime = System.currentTimeMillis();
// callback function
DistributionPackageProcessor packageProcessor;
if (DistributionRequestType.TEST.equals(distributionRequest.getRequestType())) {
// test packages do not get passed to the queues and get imported immediately
packageProcessor = new ImportingDistributionPackageProcessor(distributionPackageImporter, agentAuthenticationInfo,
callingUser, requestId, log);
} else {
packageProcessor = new QueueingDistributionPackageProcessor(callingUser, requestId, startTime,
distributionEventFactory, scheduleQueueStrategy, queueProvider, log, name);
}
// export packages
distributionPackageExporter.exportPackages(agentResourceResolver, distributionRequest, packageProcessor);
final long endTime = System.currentTimeMillis();
generatePackageEvent(DistributionEventTopics.AGENT_PACKAGE_CREATED);
// create response
List<DistributionResponse> distributionResponses = packageProcessor.getAllResponses();
int packagesCount = packageProcessor.getPackagesCount();
long packagesSize = packageProcessor.getPackagesSize();
return new CompositeDistributionResponse(distributionResponses, packagesCount, packagesSize, endTime - startTime);
}
@Nonnull
public Set<String> getQueueNames() {
Set<String> queueNames = new TreeSet<String>();
queueNames.addAll(scheduleQueueStrategy.getQueueNames());
if (errorQueueStrategy != null) {
queueNames.addAll(errorQueueStrategy.getQueueNames());
}
return queueNames;
}
public DistributionQueue getQueue(@Nonnull final String queueName) {
Set<String> queues = getQueueNames();
if (!queues.contains(queueName)) {
return null;
}
DistributionQueue queue = null;
try {
queue = queueProvider.getQueue(queueName);
} catch (DistributionException e) {
log.error("cannot get queue", e);
}
if (queue != null) {
boolean isPausedQueue = !queueProcessingEnabled && (processingQueues != null && processingQueues.contains(queueName));
queue = new SimpleAgentDistributionQueue(queue, isPausedQueue, name);
}
return queue;
}
@Nonnull
public DistributionLog getLog() {
return log;
}
@Nonnull
public DistributionAgentState getState() {
DistributionAgentState agentState = DistributionAgentState.IDLE;
// if it is passive and it is not a queueing agent
if (isPassive() && distributionPackageImporter != null) {
return DistributionAgentState.PAUSED;
}
for (String queueName : getQueueNames()) {
DistributionQueue queue = getQueue(queueName);
DistributionQueueState state = queue.getStatus().getState();
if (DistributionQueueState.BLOCKED == state) {
return DistributionAgentState.BLOCKED;
}
if (DistributionQueueState.RUNNING == state) {
agentState = DistributionAgentState.RUNNING;
}
}
return agentState;
}
public void enable() {
log.info("enabling agent");
active = true;
// register triggers if any
agentBasedRequestHandler = new TriggerAgentRequestHandler(this, name, agentAuthenticationInfo, log, active);
if (!isPassive()) {
try {
queueProvider.enableQueueProcessing(queueProcessor, processingQueues.toArray(new String[processingQueues.size()]));
} catch (DistributionException e) {
log.error("cannot enable queue processing", e);
}
}
}
public void enableTrigger(DistributionTrigger trigger) {
if (!active || agentBasedRequestHandler == null) {
return;
}
try {
log.info("enabling trigger {}", trigger);
trigger.register(agentBasedRequestHandler);
} catch (DistributionException e) {
log.error("could not register handler from trigger {} {}", trigger, e);
}
}
public void disableTrigger(DistributionTrigger trigger) {
if (!active || agentBasedRequestHandler == null) {
return;
}
try {
log.info("disabling trigger {}", trigger);
trigger.unregister(agentBasedRequestHandler);
} catch (DistributionException e) {
log.error("could not unregister handler from trigger {} {}", trigger, e);
}
}
public void disable() {
log.info("disabling agent");
active = false;
agentBasedRequestHandler = null;
if (!isPassive()) {
try {
queueProvider.disableQueueProcessing();
} catch (DistributionException e) {
log.error("cannot disable queue processing", e);
}
}
}
private void generatePackageEvent(String topic, DistributionPackage... distributionPackages) {
for (DistributionPackage distributionPackage : distributionPackages) {
distributionEventFactory.generatePackageEvent(topic, DistributionComponentKind.AGENT, name, distributionPackage.getInfo());
}
}
/**
* check whether the given request type can be handled by the current agent
*
* @param request a distribution request
* @return {@code true} if request can be executed, {@code false} otherwise.
*/
private boolean isAcceptedRequestType(DistributionRequest request) {
if (allowedRequests == null) {
return true;
}
if (DistributionRequestType.TEST.equals(request.getRequestType())) {
return true;
}
for (DistributionRequestType requestType : allowedRequests) {
if (requestType.equals(request.getRequestType())) {
return true;
}
}
return false;
}
/**
* check whether a certain request paths can be distributed via this agent
*
* @param request a distribution request
* @return {@code true} if request can be executed, {@code false} otherwise.
*/
private boolean isAcceptedRequestRoot(DistributionRequest request) {
if (allowedRoots == null || allowedRoots.length == 0) {
return true;
}
if (!DistributionRequestType.ADD.equals(request.getRequestType()) && !DistributionRequestType.DELETE.equals(request.getRequestType())) {
return true;
}
for (String path : request.getPaths()) {
boolean allowed = false;
for (String allowedRoot : allowedRoots) {
if (allowedRoot != null && allowedRoot.trim().length() != 0
&& path.startsWith(allowedRoot)) {
allowed = true;
}
}
if (!allowed) {
return false;
}
}
return true;
}
}