blob: 6d4046a757512dc6a034d913101c477ecafd7281 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sling.distribution.chunked;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import javax.jcr.Node;
import org.apache.sling.api.resource.Resource;
import org.apache.sling.api.resource.ResourceResolver;
import org.apache.sling.api.resource.ResourceResolverFactory;
import org.apache.sling.distribution.DistributionRequest;
import org.apache.sling.distribution.DistributionRequestType;
import org.apache.sling.distribution.Distributor;
import org.apache.sling.distribution.SimpleDistributionRequest;
import org.apache.sling.event.jobs.Job;
import org.apache.sling.event.jobs.consumer.JobConsumer;
import org.apache.sling.event.jobs.consumer.JobExecutionContext;
import org.apache.sling.event.jobs.consumer.JobExecutionResult;
import org.apache.sling.event.jobs.consumer.JobExecutor;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Reference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Component(property = { JobConsumer.PROPERTY_TOPICS + "=" + ChunkedDistribution.TOPIC })
public class ChunkedDistribution implements JobExecutor {
public static final int DEFAULT_CHUNK_SIZE = 100;
public static final String KEY_PATH = "path";
public static final String KEY_MODE = "mode";
public static final String KEY_CHUNK_SIZE = "chunkSize";
public static final String TOPIC = "sling/whiteboard/distribution/chunked";
private Logger log = LoggerFactory.getLogger(this.getClass());
private Set<String> shallowNodeTypes = new HashSet<>();
Distributor distributor;
ResourceResolverFactory resolverFactory;
@Activate
public ChunkedDistribution(@Reference Distributor distributor, @Reference ResourceResolverFactory resolverFactory) {
this.distributor = distributor;
this.resolverFactory = resolverFactory;
this.shallowNodeTypes.add("sling:Folder");
this.shallowNodeTypes.add("sling:OrderedFolder");
this.shallowNodeTypes.add("cq:Page");
this.shallowNodeTypes.add("cq:Tag");
}
@Override
public JobExecutionResult process(Job job, JobExecutionContext context) {
String path = requireParam(job, KEY_PATH, String.class);
String modeSt = requireParam(job, KEY_MODE, String.class);
Mode mode = Mode.valueOf(modeSt);
Integer chunkSize = requireParam(job, KEY_CHUNK_SIZE, Integer.class);
log.info("Starting chunked tree distribution for path {}", path);
try {
try (ResourceResolver resolver = resolverFactory.getServiceResourceResolver(null)) {
distribute(resolver, path, mode, chunkSize, context);
log.info("Finished chunked tree distribution for path {}", path);
return context.result().succeeded();
}
} catch (Exception e) {
log.warn("Error distributing tree {} with mode {}", path, mode, e);
context.log(e.getMessage());
return context.result().message(e.getMessage()).cancelled();
}
}
private <T> T requireParam(Job job, String key, Class<T> type) {
return Objects.requireNonNull(job.getProperty(key, type), "No " + key + " parameter provided");
}
public void distribute(ResourceResolver resolver, String path, Mode mode, Integer chunkSize, JobExecutionContext context) {
Resource parent = Objects.requireNonNull(resolver.getResource(path), "No resource present at path " + path);
context.log("Getting tree nodes for path=" + path);
List<String> paths = DeepTree.getPaths(parent);
List<List<String>> chunks = getChunks(paths, chunkSize);
context.initProgress(chunks.size(), -1);
int progress = 0;
for (List<String> chunk : chunks) {
context.incrementProgressCount(1);
progress ++;
String firstPath = chunk.iterator().next();
String msg = String.format("Distributing chunk %d/%d starting with %s", progress, chunks.size(), firstPath);
log.info(msg);
context.log(msg);
distributeChunk(resolver, chunk, context);
if (context.isStopped()) {
throw new RuntimeException("Job stopped");
}
}
}
private List<List<String>> getChunks(List<String> paths, Integer chunkSize) {
List<List<String>> chunks = new ArrayList<>();
int c = 0;
while (c < paths.size()) {
int next = Math.min(paths.size(), c + chunkSize);
chunks.add(paths.subList(c, next));
c = next;
}
return chunks;
}
private void distributeChunk(ResourceResolver resolver, List<String> paths, JobExecutionContext context) {
try {
List<String> allPaths = new ArrayList<>();
Set<String> deepPaths = new HashSet<>();
for (String path : paths) {
allPaths.add(path);
Resource res = resolver.getResource(path);
Iterator<Resource> childIt = res.getChildren().iterator();
while (childIt.hasNext()) {
Resource child = childIt.next();
Node node = child.adaptTo(Node.class);
String type = node.getPrimaryNodeType().getName();
if (!shallowNodeTypes.contains(type)) {
String childPath = child.getPath();
allPaths.add(childPath);
deepPaths.add(child.getPath());
}
}
}
String[] pathsAr = allPaths.toArray(new String[] {});
DistributionRequest request = new SimpleDistributionRequest(DistributionRequestType.ADD, pathsAr, deepPaths);
distributor.distribute("publish", resolver, request);
} catch (Exception e) {
String firstPath = paths.iterator().next();
String msg = "Error creating distribution request first path " + firstPath + " msg: " + e.getMessage();
throw new RuntimeException(msg, e);
}
}
}