blob: 99c7b0e416edf546d9ec164a3aec330a08adbfa4 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sling.event.impl.jobs.tasks;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.sling.api.resource.PersistenceException;
import org.apache.sling.api.resource.Resource;
import org.apache.sling.api.resource.ResourceResolver;
import org.apache.sling.api.resource.ValueMap;
import org.apache.sling.discovery.InstanceDescription;
import org.apache.sling.event.impl.jobs.JobTopicTraverser;
import org.apache.sling.event.impl.jobs.config.JobManagerConfiguration;
import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager;
import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager.QueueInfo;
import org.apache.sling.event.impl.jobs.config.TopologyCapabilities;
import org.apache.sling.event.impl.support.Environment;
import org.apache.sling.event.impl.support.ResourceHelper;
import org.apache.sling.event.jobs.Job;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Upgrade task
*
* Upgrade jobs from earlier versions to the new format.
*/
public class UpgradeTask {
/** Logger, named after the runtime class of this task. */
private final Logger logger = LoggerFactory.getLogger(getClass());

/** Job manager configuration; supplies job paths, resource resolvers and the queue configuration manager. */
private final JobManagerConfiguration configuration;

/** Topology capabilities, read from the configuration once at construction time. */
private final TopologyCapabilities caps;

/**
 * Create a new upgrade task.
 *
 * @param config the job manager configuration; its topology capabilities
 *               are captured here and reused by all upgrade steps
 */
public UpgradeTask(final JobManagerConfiguration config) {
    final TopologyCapabilities capabilities = config.getTopologyCapabilities();
    this.configuration = config;
    this.caps = capabilities;
}
/**
 * Run the upgrade.
 *
 * Jobs from previous versions are only processed when the topology
 * capabilities report this instance as the leader. Bridged jobs are
 * upgraded afterwards in every case.
 */
public void run() {
    final boolean leader = caps.isLeader();
    if ( leader ) {
        processJobsFromPreviousVersions();
    }
    upgradeBridgedJobs();
}
/**
 * Upgrade bridged jobs.
 *
 * In previous versions, bridged jobs were stored under a special topic
 * ("slingevent:eventadmin"); they are now stored with their real topic.
 * The special topic below the local jobs path is always processed, the one
 * below the unassigned jobs path only if this instance is the leader.
 * The resource resolver created for this run is closed in every case.
 */
private void upgradeBridgedJobs() {
    final String bridgedTopicSegment = "/slingevent:eventadmin";
    final String localPath = configuration.getLocalJobsPath() + bridgedTopicSegment;

    final ResourceResolver resolver = configuration.createResourceResolver();
    if ( resolver == null ) {
        // no resolver available, nothing can be upgraded
        return;
    }
    try {
        final Resource localRoot = resolver.getResource(localPath);
        if ( localRoot != null ) {
            upgradeBridgedJobs(localRoot);
        }

        if ( caps.isLeader() ) {
            final String unassignedPath = configuration.getUnassignedJobsPath() + bridgedTopicSegment;
            final Resource unassignedRoot = resolver.getResource(unassignedPath);
            if ( unassignedRoot != null ) {
                upgradeBridgedJobs(unassignedRoot);
            }
        }
    } finally {
        resolver.close();
    }
}
/**
 * Upgrade the bridged jobs stored below the given topic resource.
 *
 * Every resource handed over by the {@link JobTopicTraverser} is re-created
 * with a copy of its properties and then deleted at its old location:
 * <ul>
 * <li>if a target instance is detected, below the assigned jobs path of
 *     that instance, with queue name and target instance set</li>
 * <li>otherwise below the unassigned jobs path, with queue name and target
 *     instance removed</li>
 * </ul>
 * The job started time is removed in both cases. If moving a job fails, a
 * warning is logged and the resolver is refreshed and reverted.
 * Nothing is done if no queue configuration manager is available.
 *
 * @param topicResource The root resource (topic resource)
 */
private void upgradeBridgedJobs(final Resource topicResource) {
    final String topicName = topicResource.getName().replace('.', '/');
    final QueueConfigurationManager qcm = configuration.getQueueConfigurationManager();
    if ( qcm == null ) {
        return;
    }
    final QueueInfo info = qcm.getQueueInfo(topicName);

    final JobTopicTraverser.ResourceCallback mover = new JobTopicTraverser.ResourceCallback() {

        @Override
        public boolean handle(final Resource rsrc) {
            final ResourceResolver resolver = topicResource.getResourceResolver();
            try {
                final ValueMap vm = ResourceHelper.getValueMap(rsrc);
                final String targetId = caps.detectTarget(topicName, vm, info);
                final Map<String, Object> props = new HashMap<String, Object>(vm);

                // topic folder name followed by the job's path relative to the topic folder
                final String relativePath = topicResource.getName()
                        + rsrc.getPath().substring(topicResource.getPath().length());

                final String newPath;
                if ( targetId == null ) {
                    newPath = configuration.getUnassignedJobsPath() + '/' + relativePath;
                    props.remove(Job.PROPERTY_JOB_QUEUE_NAME);
                    props.remove(Job.PROPERTY_JOB_TARGET_INSTANCE);
                } else {
                    newPath = configuration.getAssginedJobsPath() + '/' + targetId + '/' + relativePath;
                    props.put(Job.PROPERTY_JOB_QUEUE_NAME, info.queueName);
                    props.put(Job.PROPERTY_JOB_TARGET_INSTANCE, targetId);
                }
                props.remove(Job.PROPERTY_JOB_STARTED_TIME);

                ResourceHelper.getOrCreateResource(resolver, newPath, props);
                resolver.delete(rsrc);
                resolver.commit();
            } catch ( final PersistenceException pe ) {
                this.discardChanges(resolver, rsrc, pe);
            } catch ( final InstantiationException ie ) {
                this.discardChanges(resolver, rsrc, ie);
            }
            // the traverser receives the current "active" state as the result
            return caps.isActive();
        }

        /** Log the failure and drop all pending changes of the resolver. */
        private void discardChanges(final ResourceResolver resolver, final Resource rsrc, final Exception cause) {
            logger.warn("Unable to move job from previous version " + rsrc.getPath(), cause);
            resolver.refresh();
            resolver.revert();
        }
    };
    JobTopicTraverser.traverse(logger, topicResource, mover);
}
/**
 * Handle jobs from previous versions (<= 3.1.4) by moving them to the unassigned area.
 *
 * Both previous-version locations (anonymous and identified path) are
 * processed one after the other with a single resource resolver. A
 * persistence problem is logged as a warning and ends the processing;
 * the resolver is closed in every case.
 */
private void processJobsFromPreviousVersions() {
    final ResourceResolver resolver = configuration.createResourceResolver();
    if ( resolver == null ) {
        return;
    }
    try {
        final Resource anonRoot = resolver.getResource(configuration.getPreviousVersionAnonPath());
        this.processJobsFromPreviousVersions(anonRoot);

        final Resource identifiedRoot = resolver.getResource(configuration.getPreviousVersionIdentifiedPath());
        this.processJobsFromPreviousVersions(identifiedRoot);
    } catch ( final PersistenceException pe ) {
        this.logger.warn("Problems moving jobs from previous version.", pe);
    } finally {
        resolver.close();
    }
}
/**
 * Recursively find jobs and move them.
 *
 * A resource of the job resource type is moved; any other resource is
 * treated as a folder: its children are processed first and the folder is
 * deleted afterwards. The folder is only deleted if this instance is still
 * active and no child is left below it. A remaining child means that a job
 * could not be moved (for example because processing stopped while the
 * children were handled), and deleting the folder would delete that job
 * as well.
 *
 * @param rsrc the resource to process, may be {@code null} in which case nothing is done
 * @throws PersistenceException if moving a job or deleting a folder fails
 */
private void processJobsFromPreviousVersions(final Resource rsrc) throws PersistenceException {
    if ( rsrc == null || !caps.isActive() ) {
        return;
    }
    if ( rsrc.isResourceType(ResourceHelper.RESOURCE_TYPE_JOB) ) {
        this.moveJobFromPreviousVersion(rsrc);
        return;
    }

    for(final Resource child : rsrc.getChildren()) {
        this.processJobsFromPreviousVersions(child);
    }

    if ( !caps.isActive() ) {
        return;
    }
    // never delete a folder which still contains resources: these are jobs
    // (or folders with jobs) that have not been moved and would be lost
    if ( rsrc.getChildren().iterator().hasNext() ) {
        this.logger.debug("Keeping {} as it still contains unprocessed resources", rsrc.getPath());
        return;
    }
    final ResourceResolver resolver = rsrc.getResourceResolver();
    resolver.delete(rsrc);
    resolver.commit();
    resolver.refresh();
}
/**
 * Move a single job.
 *
 * The properties of the old job resource are cloned and converted:
 * <ul>
 * <li>"slingevent:topic" becomes the job topic property</li>
 * <li>the serialized "slingevent:properties" are unpacked into plain properties</li>
 * <li>queue name, target instance and started time are removed</li>
 * <li>retries (10) and retry count (0) are added if missing</li>
 * <li>a new job id, the resource type and the created instance are set</li>
 * </ul>
 * If potential targets exist for the topic and a target is detected, queue
 * name, target instance and max retries of the queue are set as well. The
 * job is then created at its new unique path, the old resource is deleted
 * and the change is committed.
 * If potential targets exist but no queue configuration manager is
 * available, the resolver is reverted and the job is left untouched.
 *
 * @param jobResource the job resource in the old format
 * @throws PersistenceException if the job can't be read, deserialized or persisted
 */
private void moveJobFromPreviousVersion(final Resource jobResource)
throws PersistenceException {
    final ResourceResolver resolver = jobResource.getResourceResolver();

    try {
        final ValueMap vm = ResourceHelper.getValueMap(jobResource);
        // check for binary properties
        final Map<String, Object> binaryProperties = this.readBinaryProperties(vm);

        final Map<String, Object> properties = ResourceHelper.cloneValueMap(vm);

        // NOTE(review): topic is null if "slingevent:topic" is missing - confirm that old jobs always carry it
        final String topic = (String)properties.remove("slingevent:topic");
        properties.put(ResourceHelper.PROPERTY_JOB_TOPIC, topic);

        properties.remove(Job.PROPERTY_JOB_QUEUE_NAME);
        properties.remove(Job.PROPERTY_JOB_TARGET_INSTANCE);
        // and binary properties
        properties.putAll(binaryProperties);
        properties.remove("slingevent:properties");

        if ( !properties.containsKey(Job.PROPERTY_JOB_RETRIES) ) {
            properties.put(Job.PROPERTY_JOB_RETRIES, 10); // we put a dummy value here; this gets updated by the queue
        }
        if ( !properties.containsKey(Job.PROPERTY_JOB_RETRY_COUNT) ) {
            properties.put(Job.PROPERTY_JOB_RETRY_COUNT, 0);
        }

        final List<InstanceDescription> potentialTargets = caps.getPotentialTargets(topic);
        String targetId = null;
        if ( potentialTargets != null && potentialTargets.size() > 0 ) {
            final QueueConfigurationManager qcm = configuration.getQueueConfigurationManager();
            if ( qcm == null ) {
                // without queue configuration no target can be detected: leave the job where it is
                resolver.revert();
                return;
            }
            final QueueInfo info = qcm.getQueueInfo(topic);
            logger.debug("Found queue {} for {}", info.queueConfiguration, topic);
            targetId = caps.detectTarget(topic, vm, info);
            if ( targetId != null ) {
                properties.put(Job.PROPERTY_JOB_QUEUE_NAME, info.queueName);
                properties.put(Job.PROPERTY_JOB_TARGET_INSTANCE, targetId);
                properties.put(Job.PROPERTY_JOB_RETRIES, info.queueConfiguration.getMaxRetries());
            }
        }

        properties.put(Job.PROPERTY_JOB_CREATED_INSTANCE, "old:" + Environment.APPLICATION_ID);
        properties.put(ResourceResolver.PROPERTY_RESOURCE_TYPE, ResourceHelper.RESOURCE_TYPE_JOB);

        final String jobId = configuration.getUniqueId(topic);
        properties.put(ResourceHelper.PROPERTY_JOB_ID, jobId);
        properties.remove(Job.PROPERTY_JOB_STARTED_TIME);

        final String newPath = configuration.getUniquePath(targetId, topic, jobId, vm);
        this.logger.debug("Moving 'old' job from {} to {}", jobResource.getPath(), newPath);

        ResourceHelper.getOrCreateResource(resolver, newPath, properties);
        resolver.delete(jobResource);
        resolver.commit();
    } catch (final InstantiationException ie) {
        // pass the exception itself as cause: using ie.getCause() drops it from
        // the chain and leaves no cause at all if the cause is null
        throw new PersistenceException("Exception while reading reasource: " + ie.getMessage(), ie);
    }
}

/**
 * Read the serialized job properties stored in "slingevent:properties".
 *
 * The stream starts with the number of entries, followed by a key and a
 * value object for each entry.
 *
 * @param vm the value map of the old job resource
 * @return the deserialized properties, empty if the job has no serialized properties
 * @throws PersistenceException if the properties can't be deserialized, or if
 *         closing the stream fails after a successful read
 */
private Map<String, Object> readBinaryProperties(final ValueMap vm) throws PersistenceException {
    final Map<String, Object> binaryProperties = new HashMap<String, Object>();
    final ObjectInputStream ois = vm.get("slingevent:properties", ObjectInputStream.class);
    if ( ois == null ) {
        return binaryProperties;
    }
    boolean readCompleted = false;
    try {
        final int length = ois.readInt();
        for(int i=0;i<length;i++) {
            final String key = (String)ois.readObject();
            final Object value = ois.readObject();
            binaryProperties.put(key, value);
        }
        readCompleted = true;
    } catch (final ClassNotFoundException cnfe) {
        throw new PersistenceException("Class not found.", cnfe);
    } catch (final java.io.InvalidClassException ice) {
        throw new PersistenceException("Invalid class.", ice);
    } catch (final IOException ioe) {
        throw new PersistenceException("Unable to deserialize job properties.", ioe);
    } finally {
        try {
            ois.close();
        } catch (final IOException ioe) {
            if ( readCompleted ) {
                throw new PersistenceException("Unable to deserialize job properties.", ioe);
            }
            // a read failure is already being propagated; throwing here
            // would replace it with the less useful close failure
            this.logger.debug("Unable to close stream of serialized job properties.", ioe);
        }
    }
    return binaryProperties;
}
}