blob: 57f75d19ccbec57b4321e1e9b25e58f4ecfc5ae4 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sling.distribution.resources.impl;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.sling.distribution.agent.spi.DistributionAgent;
import org.apache.sling.distribution.agent.DistributionAgentState;
import org.apache.sling.distribution.component.impl.DistributionComponent;
import org.apache.sling.distribution.component.impl.DistributionComponentKind;
import org.apache.sling.distribution.component.impl.DistributionComponentProvider;
import org.apache.sling.distribution.log.spi.DistributionLog;
import org.apache.sling.distribution.queue.impl.ErrorQueueDispatchingStrategy;
import org.apache.sling.distribution.packaging.DistributionPackageInfo;
import org.apache.sling.distribution.packaging.impl.DistributionPackageUtils;
import org.apache.sling.distribution.queue.spi.DistributionQueue;
import org.apache.sling.distribution.queue.DistributionQueueEntry;
import org.apache.sling.distribution.queue.DistributionQueueItem;
import org.apache.sling.distribution.queue.DistributionQueueItemStatus;
import org.apache.sling.distribution.queue.DistributionQueueStatus;
import org.apache.sling.distribution.resources.DistributionResourceTypes;
import org.apache.sling.distribution.resources.impl.common.SimplePathInfo;
/**
* Extended service resource provider exposes children resources like .../agents/agentName/queues/queueName/queueItem
*/
public class ExtendedDistributionServiceResourceProvider extends DistributionServiceResourceProvider {
private static final String QUEUES_PATH = "queues";
private static final String LOG_PATH = "log";
private static final String STATUS_PATH = "status";
private static final int MAX_QUEUE_LENGTH = 5000;
private static final int MAX_QUEUE_CHUNK = 100;
public ExtendedDistributionServiceResourceProvider(String kind,
DistributionComponentProvider componentProvider,
String resourceRoot) {
super(kind, componentProvider, resourceRoot);
}
@Override
protected Map<String, Object> getChildResourceProperties(DistributionComponent<?> component, String childResourceName) {
DistributionComponentKind kind = component.getKind();
if (DistributionComponentKind.AGENT == kind) {
DistributionAgent agent = (DistributionAgent) component.getService();
if (agent != null && childResourceName != null) {
if (childResourceName.startsWith(QUEUES_PATH)) {
SimplePathInfo queuePathInfo = SimplePathInfo.parsePathInfo(QUEUES_PATH, childResourceName);
return getQueueProperties(agent, queuePathInfo);
} else if (childResourceName.startsWith(LOG_PATH)) {
Map<String, Object> result = new HashMap<String, Object>();
result.put(SLING_RESOURCE_TYPE, DistributionResourceTypes.LOG_RESOURCE_TYPE);
DistributionLog distributionLog = agent.getLog();
result.put(INTERNAL_ADAPTABLE, distributionLog);
return result;
} else if (childResourceName.startsWith(STATUS_PATH)) {
Map<String, Object> result = new HashMap<String, Object>();
DistributionAgentState agentState = agent.getState();
result.put("state", agentState.name());
return result;
}
}
}
return null;
}
@Override
protected Iterable<String> getChildResourceChildren(DistributionComponent<?> component, String childResourceName) {
DistributionComponentKind kind = component.getKind();
if (DistributionComponentKind.AGENT == kind) {
DistributionAgent agent = (DistributionAgent) component.getService();
if (agent != null) {
if (childResourceName == null) {
List<String> nameList = new ArrayList<String>();
nameList.add(QUEUES_PATH);
nameList.add(LOG_PATH);
nameList.add(STATUS_PATH);
return nameList;
}
}
}
return null;
}
private Map<String, Object> getQueueProperties(DistributionAgent agent, SimplePathInfo queueInfo) {
if (queueInfo.isRoot()) {
Map<String, Object> result = new HashMap<String, Object>();
List<String> nameList = new ArrayList<String>();
for (String name : agent.getQueueNames()) {
nameList.add(name);
}
result.put(ITEMS, nameList.toArray(new String[nameList.size()]));
result.put(SLING_RESOURCE_TYPE, DistributionResourceTypes.AGENT_QUEUE_LIST_RESOURCE_TYPE);
return result;
} else if (queueInfo.isMain()) {
String queueName = queueInfo.getMainResourceName();
Map<String, Object> result = new HashMap<String, Object>();
DistributionQueue queue = agent.getQueue(queueName);
if (queue != null) {
DistributionQueueStatus queueStatus = queue.getStatus();
result.put(SLING_RESOURCE_TYPE, DistributionResourceTypes.AGENT_QUEUE_RESOURCE_TYPE);
result.put("state", queueStatus.getState().name());
result.put("empty", queueStatus.isEmpty());
result.put("itemsCount", queueStatus.getItemsCount());
if (queueName.startsWith(ErrorQueueDispatchingStrategy.ERROR_PREFIX)) {
String retryQueue = queueName.replace(ErrorQueueDispatchingStrategy.ERROR_PREFIX, "");
result.put("retryQueue", retryQueue);
}
List<String> nameList = new ArrayList<String>();
DistributionQueueEntry entry = queue.getHead();
if (entry != null) {
nameList.add(entry.getId());
}
result.put(ITEMS, nameList.toArray(new String[nameList.size()]));
result.put(INTERNAL_ITEMS_ITERATOR, new QueueItemsIterator(queue));
result.put(INTERNAL_ADAPTABLE, queue);
}
return result;
} else if (queueInfo.isChild()) {
String queueName = queueInfo.getMainResourceName();
Map<String, Object> result = new HashMap<String, Object>();
DistributionQueue queue = agent.getQueue(queueName);
if (queue != null) {
String itemId = queueInfo.getChildResourceName();
DistributionQueueEntry entry = queue.getItem(itemId);
result = getItemProperties(entry);
}
return result;
}
return null;
}
private Map<String, Object> getItemProperties(DistributionQueueEntry entry) {
Map<String, Object> result = new HashMap<String, Object>();
if (entry != null) {
result.put(SLING_RESOURCE_TYPE, DistributionResourceTypes.AGENT_QUEUE_ITEM_RESOURCE_TYPE);
DistributionQueueItem item = entry.getItem();
DistributionPackageInfo packageInfo = DistributionPackageUtils.fromQueueItem(item);
result.put("id", entry.getId());
result.put("size", item.getSize());
result.put("paths", packageInfo.getPaths());
result.put("action", packageInfo.getRequestType());
result.put("userid", packageInfo.get(DistributionPackageUtils.PACKAGE_INFO_PROPERTY_REQUEST_USER, String.class));
DistributionQueueItemStatus status = entry.getStatus();
result.put("attempts", status.getAttempts());
result.put("time", status.getEntered().getTime());
result.put("state", status.getItemState().name());
}
return result;
}
class QueueItemsIterator implements Iterator<Map<String, Object>> {
private final DistributionQueue queue;
private Iterator<DistributionQueueEntry> items;
int fetched = 0;
QueueItemsIterator(DistributionQueue queue) {
this.queue = queue;
}
@Override
public boolean hasNext() {
if (fetched > MAX_QUEUE_LENGTH) {
return false;
}
boolean shouldFetch = items == null || !items.hasNext();
if (shouldFetch) {
items = queue.getItems(fetched, MAX_QUEUE_CHUNK).iterator();
}
return items.hasNext();
}
@Override
public Map<String, Object> next() {
DistributionQueueEntry queueEntry = items.next();
String itemName = queueEntry.getId();
Map<String, Object> itemProperties = getItemProperties(queueEntry);
itemProperties.put(INTERNAL_NAME, itemName);
fetched ++;
return itemProperties;
}
@Override
public void remove() {
items.remove();
}
}
}