| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.sling.distribution.agent.impl; |
| |
| import javax.annotation.Nonnull; |
| import javax.annotation.Nullable; |
| import java.util.Collection; |
| import java.util.LinkedList; |
| import java.util.List; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.concurrent.atomic.AtomicLong; |
| |
| import org.apache.sling.distribution.DistributionRequestState; |
| import org.apache.sling.distribution.DistributionResponse; |
| import org.apache.sling.distribution.common.DistributionException; |
| import org.apache.sling.distribution.component.impl.DistributionComponentKind; |
| import org.apache.sling.distribution.event.DistributionEventTopics; |
| import org.apache.sling.distribution.event.impl.DistributionEventFactory; |
| import org.apache.sling.distribution.impl.SimpleDistributionResponse; |
| import org.apache.sling.distribution.log.impl.DefaultDistributionLog; |
| import org.apache.sling.distribution.packaging.DistributionPackage; |
| import org.apache.sling.distribution.packaging.impl.DistributionPackageProcessor; |
| import org.apache.sling.distribution.packaging.impl.DistributionPackageUtils; |
| import org.apache.sling.distribution.queue.DistributionQueueItemState; |
| import org.apache.sling.distribution.queue.DistributionQueueItemStatus; |
| import org.apache.sling.distribution.queue.impl.DistributionQueueProvider; |
| import org.apache.sling.distribution.queue.impl.DistributionQueueDispatchingStrategy; |
| |
| /** |
| * The package exporter callback function is responsible to process the exported packages. |
| * The exported packages are scheduled for import by passing them to a {@link DistributionQueueDispatchingStrategy}. |
| */ |
| class QueueingDistributionPackageProcessor implements DistributionPackageProcessor { |
| |
| // request info |
| private final String callingUser; |
| private final String requestId; |
| private final long requestStartTime; |
| private final String agentName; |
| |
| // stats |
| private final AtomicInteger packagesCount = new AtomicInteger(); |
| private final AtomicLong packagesSize = new AtomicLong(); |
| private final List<DistributionResponse> allResponses = new LinkedList<DistributionResponse>(); |
| |
| // required components |
| private final DistributionEventFactory distributionEventFactory; |
| private final DistributionQueueDispatchingStrategy scheduleQueueStrategy; |
| private final DistributionQueueProvider queueProvider; |
| private final DefaultDistributionLog log; |
| |
| public List<DistributionResponse> getAllResponses() { |
| return allResponses; |
| } |
| |
| public int getPackagesCount() { |
| return packagesCount.get(); |
| } |
| |
| public long getPackagesSize() { |
| return packagesSize.get(); |
| } |
| |
| QueueingDistributionPackageProcessor(@Nullable String callingUser, @Nonnull String requestId, long requestStartTime, |
| @Nonnull DistributionEventFactory distributionEventFactory, |
| @Nonnull DistributionQueueDispatchingStrategy scheduleQueueStrategy, |
| @Nonnull DistributionQueueProvider queueProvider, @Nonnull DefaultDistributionLog log, |
| @Nonnull String agentName) { |
| this.callingUser = callingUser; |
| this.requestId = requestId; |
| this.requestStartTime = requestStartTime; |
| this.distributionEventFactory = distributionEventFactory; |
| this.scheduleQueueStrategy = scheduleQueueStrategy; |
| this.queueProvider = queueProvider; |
| this.log = log; |
| this.agentName = agentName; |
| } |
| |
| @Override |
| public void process(DistributionPackage distributionPackage) { |
| final long startTime = System.currentTimeMillis(); |
| |
| Collection<SimpleDistributionResponse> responses = scheduleImportPackage(distributionPackage, callingUser, |
| requestId, requestStartTime); |
| packagesCount.incrementAndGet(); |
| packagesSize.addAndGet(distributionPackage.getSize()); |
| allResponses.addAll(responses); |
| |
| final long endTime = System.currentTimeMillis(); |
| |
| log.debug("PACKAGE-QUEUED {}: packageId={}, paths={}, queueTime={}ms, responses={}", requestId, distributionPackage.getId(), |
| distributionPackage.getInfo().getPaths(), endTime - startTime, responses.size()); |
| } |
| |
| private Collection<SimpleDistributionResponse> scheduleImportPackage(DistributionPackage distributionPackage, String callingUser, |
| String requestId, long startTime) { |
| Collection<SimpleDistributionResponse> distributionResponses = new LinkedList<SimpleDistributionResponse>(); |
| |
| // dispatch the distribution package to one or more queues |
| try { |
| // add metadata to the package |
| distributionPackage.getInfo().put(DistributionPackageUtils.PACKAGE_INFO_PROPERTY_REQUEST_USER, callingUser); |
| distributionPackage.getInfo().put(DistributionPackageUtils.PACKAGE_INFO_PROPERTY_REQUEST_ID, requestId); |
| distributionPackage.getInfo().put(DistributionPackageUtils.PACKAGE_INFO_PROPERTY_REQUEST_START_TIME, startTime); |
| |
| // put the package in the queue |
| Iterable<DistributionQueueItemStatus> states = scheduleQueueStrategy.add(distributionPackage, queueProvider); |
| for (DistributionQueueItemStatus state : states) { |
| DistributionRequestState requestState = getRequestStateFromQueueState(state.getItemState()); |
| distributionResponses.add(new SimpleDistributionResponse(requestState, state.getItemState().toString())); |
| } |
| |
| distributionEventFactory.generatePackageEvent(DistributionEventTopics.AGENT_PACKAGE_QUEUED, |
| DistributionComponentKind.AGENT, agentName, distributionPackage.getInfo()); |
| } catch (DistributionException e) { |
| log.error("an error happened during dispatching items to the queue(s)", e); |
| distributionResponses.add(new SimpleDistributionResponse(DistributionRequestState.DROPPED, e.toString())); |
| } |
| |
| return distributionResponses; |
| } |
| |
| /* Convert the state of a certain item in the queue into a request state */ |
| private DistributionRequestState getRequestStateFromQueueState(DistributionQueueItemState itemState) { |
| DistributionRequestState requestState; |
| switch (itemState) { |
| case QUEUED: |
| requestState = DistributionRequestState.ACCEPTED; |
| break; |
| case ERROR: |
| requestState = DistributionRequestState.DROPPED; |
| break; |
| default: |
| requestState = DistributionRequestState.DROPPED; |
| break; |
| } |
| return requestState; |
| } |
| } |