blob: 301b4f1b089e5b05eebbae1d312cace4324871f8 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.stanbol.enhancer.jobmanager.event.impl;
import static org.apache.stanbol.enhancer.jobmanager.event.Constants.TOPIC_JOB_MANAGER;
import java.io.ByteArrayOutputStream;
import java.io.UnsupportedEncodingException;
import java.util.Dictionary;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;
import org.apache.clerezza.commons.rdf.ImmutableGraph;
import org.apache.clerezza.commons.rdf.Triple;
import org.apache.clerezza.rdf.core.serializedform.Serializer;
import org.apache.clerezza.rdf.core.serializedform.SupportedFormat;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Properties;
import org.apache.felix.scr.annotations.Property;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.apache.stanbol.enhancer.jobmanager.event.impl.EnhancementJobHandler.EnhancementJobObserver;
import org.apache.stanbol.enhancer.servicesapi.Chain;
import org.apache.stanbol.enhancer.servicesapi.ChainException;
import org.apache.stanbol.enhancer.servicesapi.ChainManager;
import org.apache.stanbol.enhancer.servicesapi.ContentItem;
import org.apache.stanbol.enhancer.servicesapi.EnhancementEngine;
import org.apache.stanbol.enhancer.servicesapi.EnhancementEngineManager;
import org.apache.stanbol.enhancer.servicesapi.EnhancementException;
import org.apache.stanbol.enhancer.servicesapi.EnhancementJobManager;
import org.apache.stanbol.enhancer.servicesapi.helper.ExecutionPlanHelper;
import org.apache.stanbol.enhancer.servicesapi.helper.execution.Execution;
import org.apache.stanbol.enhancer.servicesapi.helper.execution.ExecutionMetadata;
import org.osgi.framework.Constants;
import org.osgi.framework.ServiceRegistration;
import org.osgi.service.component.ComponentContext;
import org.osgi.service.event.EventAdmin;
import org.osgi.service.event.EventHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * {@link EnhancementJobManager} implementation that executes enhancement
 * jobs via an {@link EnhancementJobHandler}. The handler is registered as
 * {@link EventHandler} for the topic
 * {@link org.apache.stanbol.enhancer.jobmanager.event.Constants#TOPIC_JOB_MANAGER}
 * while this component is active.<p>
 * Calls to {@link #enhanceContent(ContentItem, Chain)} block until the job
 * completes or the configured {@link #MAX_ENHANCEMENT_JOB_WAIT_TIME} elapsed.
 */
@Component(immediate=true,metatype=true)
@Service
@Properties(value={
    // Service ranking used when registering this EnhancementJobManager.
    // NOTE(review): the original comment spoke about "a ranking lower than 0
    // to allow easy overriding", but DEFAULT_SERVICE_RANKING is 0 - confirm
    // the intended value.
    @Property(name=Constants.SERVICE_RANKING,intValue=EventJobManagerImpl.DEFAULT_SERVICE_RANKING),
    @Property(name=EventJobManagerImpl.MAX_ENHANCEMENT_JOB_WAIT_TIME,intValue=EventJobManagerImpl.DEFAULT_MAX_ENHANCEMENT_JOB_WAIT_TIME)
})
public class EventJobManagerImpl implements EnhancementJobManager {

    private final Logger log = LoggerFactory.getLogger(EventJobManagerImpl.class);
    /**
     * Logger for the {@link EnhancementJobManager} interface. This is used
     * to log statistics about execution times for enhancement jobs
     */
    private final Logger enhancementJobManagerLog = LoggerFactory.getLogger(EnhancementJobManager.class);

    public static final int DEFAULT_SERVICE_RANKING = 0;

    /**
     * Name of the configuration property holding the maximum time (in
     * milliseconds) a call to <code>enhanceContent(..)</code> waits for the
     * completion of an enhancement job.
     */
    public static final String MAX_ENHANCEMENT_JOB_WAIT_TIME = "stanbol.maxEnhancementJobWaitTime";
    /**
     * default max wait time is 60sec (similar to the http timeout)
     */
    public static final int DEFAULT_MAX_ENHANCEMENT_JOB_WAIT_TIME = 60 * 1000;

    @Reference
    protected ChainManager chainManager;

    @Reference
    protected EnhancementEngineManager engineManager;

    @Reference
    protected EventAdmin eventAdmin;

    /**
     * If available it is used for logging ExecutionMetadata of failed or
     * timed out Enhancement Requests (OPTIONAL)
     */
    @Reference(cardinality=ReferenceCardinality.OPTIONAL_UNARY)
    protected Serializer serializer;

    /**
     * Registration of the {@link #jobHandler} as {@link EventHandler}.
     * <code>null</code> while this component is not active.
     */
    @SuppressWarnings("rawtypes")
    private ServiceRegistration jobHandlerRegistration;
    /**
     * The handler executing the enhancement jobs. <code>null</code> while
     * this component is not active.
     */
    private EnhancementJobHandler jobHandler;
    /**
     * The maximum time in milliseconds to wait for the completion of an
     * enhancement job.
     */
    private int maxEnhancementJobWaitTime = DEFAULT_MAX_ENHANCEMENT_JOB_WAIT_TIME;

    /**
     * Instantiates and registers the {@link EnhancementJobHandler} as
     * {@link EventHandler} for the topic
     * {@link org.apache.stanbol.enhancer.jobmanager.event.Constants#TOPIC_JOB_MANAGER}
     * and reads the {@link #MAX_ENHANCEMENT_JOB_WAIT_TIME} configuration.
     * @param ctx the component context providing the bundle context and the
     * configuration properties
     */
    @Activate
    protected void activate(ComponentContext ctx){
        log.info("activate {}",getClass().getName());
        jobHandler = new EnhancementJobHandler(eventAdmin,engineManager);
        Dictionary<String,Object> properties = new Hashtable<String,Object>();
        properties.put(org.osgi.service.event.EventConstants.EVENT_TOPIC, TOPIC_JOB_MANAGER);
        jobHandlerRegistration = ctx.getBundleContext().registerService(
            EventHandler.class.getName(), jobHandler, properties);
        Object maxWaitTime = ctx.getProperties().get(MAX_ENHANCEMENT_JOB_WAIT_TIME);
        if (maxWaitTime instanceof Number) {
            //accept any numeric type (not only Integer)
            this.maxEnhancementJobWaitTime = ((Number) maxWaitTime).intValue();
        } else if (maxWaitTime != null) {
            //the value might be provided as String; try to parse it instead
            //of silently ignoring the configuration
            try {
                this.maxEnhancementJobWaitTime = Integer.parseInt(maxWaitTime.toString().trim());
            } catch (NumberFormatException e) {
                log.warn("Unable to parse value '{}' of property '{}'. Will use {}ms instead.",
                    new Object[]{maxWaitTime, MAX_ENHANCEMENT_JOB_WAIT_TIME,
                                 this.maxEnhancementJobWaitTime});
            }
        } //else not configured ... keep the current value
    }

    /**
     * Closes and unregisters the {@link EnhancementJobHandler}
     * @param ctx the component context (not used)
     */
    @Deactivate
    @SuppressWarnings("rawtypes")
    protected void deactivate(ComponentContext ctx){
        log.info("deactivate {}",getClass().getName());
        EnhancementJobHandler handler = this.jobHandler;
        ServiceRegistration registration = this.jobHandlerRegistration;
        //set first the fields to null
        this.jobHandler = null;
        this.jobHandlerRegistration = null;
        try {
            //and than close the instance to ensure that running jobs are
            //shut down correctly
            if (handler != null) {
                handler.close();
            }
        } finally {
            //unregister the EventHandler even if closing the handler failed
            if (registration != null) {
                registration.unregister();
            }
        }
    }

    /**
     * Enhances the passed ContentItem by using the default chain as provided
     * by the {@link ChainManager}.
     * @param ci the content item to enhance
     * @throws IllegalArgumentException if the passed content item is
     * <code>null</code>
     * @throws ChainException if currently no default chain is available
     * @throws EnhancementException see {@link #enhanceContent(ContentItem, Chain)}
     */
    @Override
    public void enhanceContent(ContentItem ci) throws EnhancementException {
        if(ci == null) {
            //check before ci.getUri() is used for building error messages
            throw new IllegalArgumentException("The parsed contentItem MUST NOT be NULL!");
        }
        Chain defaultChain = chainManager.getDefault();
        if(defaultChain == null){
            throw new ChainException("Unable to enhance ContentItem '"+ci.getUri()+
                "' because currently no enhancement chain is active. Please " +
                "configure a Chain or enable the default chain");
        }
        enhanceContent(ci, defaultChain);
    }

    /**
     * Creates an {@link EnhancementJob} for the passed content item and
     * chain, registers it with the {@link EnhancementJobHandler} and waits
     * up to the configured maximum wait time for its completion.
     * @param ci the content item to enhance
     * @param chain the chain used to enhance the content item
     * @throws IllegalArgumentException if the content item or the chain is
     * <code>null</code>
     * @throws IllegalStateException if this component is not active
     * @throws ChainException on a timeout, if the job manager was deactivated
     * while the job was running or to wrap an error of the job that is neither
     * a {@link SecurityException} nor an {@link EnhancementException}
     * @throws EnhancementException if the job failed with such an exception
     * @throws SecurityException if the job failed with such an exception
     */
    @Override
    public void enhanceContent(ContentItem ci, Chain chain) throws EnhancementException {
        if(ci == null) {
            throw new IllegalArgumentException("The parsed contentItem MUST NOT be NULL!");
        }
        if(chain == null){
            throw new IllegalArgumentException("Unable to enhance ContentItem '"+ci.getUri()+
                "' because NULL was passed as enhancement chain");
        }
        //read the field only once as #deactivate(..) sets it to null
        EnhancementJobHandler handler = this.jobHandler;
        if(handler == null){
            throw new IllegalStateException("Unable to enhance ContentItem '"+ci.getUri()+
                "' because the EnhancementJobManager is not active " +
                "(EnhancementJobManager type: "+getClass()+")");
        }
        long start = System.currentTimeMillis();
        enhancementJobManagerLog.debug(">> enhance {} with chain {}", ci.getUri(), chain.getName());
        boolean isDefaultChain = chain.equals(chainManager.getDefault());
        EnhancementJob job = new EnhancementJob(ci, chain.getName(), chain.getExecutionPlan(),isDefaultChain);
        //start the execution
        EnhancementJobObserver observer = handler.register(job);
        //now wait for the execution to finish for the configured maximum time
        boolean completed = observer.waitForCompletion(maxEnhancementJobWaitTime);
        if(!completed){ //throw timeout exception
            StringBuilder sb = new StringBuilder("Status:\n");
            ExecutionMetadata em = ExecutionMetadata.parseFrom(job.getExecutionMetadata(), ci.getUri());
            for(Entry<String,Execution> ex : em.getEngineExecutions().entrySet()){
                sb.append("  -").append(ex.getKey()).append(": ").append(ex.getValue().getStatus()).append('\n');
            }
            //NOTE: float division so that e.g. 1500ms are not reported as 1sec
            throw new ChainException("Execution timeout after "
                +((System.currentTimeMillis()-start)/1000f)+"sec (timeout:"+(maxEnhancementJobWaitTime/1000f)
                + "sec) for ContentItem "+ci.getUri()+"\n"+sb.toString()
                + " \n To change the timeout change value of property '"+
                MAX_ENHANCEMENT_JOB_WAIT_TIME+"' for the service "+getClass());
        }
        log.info("Execution of Chain {} {} after {}ms for ContentItem {}",
            new Object[]{ chain.getName(), job.isFailed() ? "failed" : "finished",
                          System.currentTimeMillis()-start,
                          job.getContentItem().getUri()});
        //NOTE: ExecutionMetadata are not added to the metadata of the ContentItem
        //      by the EnhancementJobManager.
        //      However one could add this as an optional feature to the
        //      RESTful interface of the Enhancer!
        //ci.getMetadata().addAll(job.getExecutionMetadata());
        if(job.isFailed()){
            Exception e = job.getError();
            EnhancementJobHandler.logJobInfo(log, job, null, true);
            logExecutionMetadata(enhancementJobManagerLog, job, true);
            log.warn("ExecutionMetadata: ");
            for(Triple triple : job.getExecutionMetadata()){
                log.warn(triple.toString());
            }
            //re-throw the original exception where possible
            if (e instanceof SecurityException) {
                throw (SecurityException)e;
            } else if(e instanceof EnhancementException){
                throw (EnhancementException)e;
            } else {
                throw new ChainException(job.getErrorMessage(), e);
            }
        }
        if(!job.isFinished()){
            log.warn("Execution finished, but Job is not finished!");
            EnhancementJobHandler.logJobInfo(log, job, null, true);
            logExecutionMetadata(log, job, true);
            throw new ChainException("EnhancementJobManager was deactivated while" +
                " enhancing the passed ContentItem "+job.getContentItem().getUri()+
                " (EnhancementJobManager type: "+getClass()+")");
        } else {
            //log infos about the execution times to the enhancementJobManager
            EnhancementJobHandler.logExecutionTimes(enhancementJobManagerLog, job);
            logExecutionMetadata(enhancementJobManagerLog, job, false);
        }
    }

    /**
     * Logs the ExecutionMetadata. If a {@link Serializer} is available the
     * metadata are serialized as TURTLE. Otherwise (or if the serialization
     * fails) the <code>toString()</code> representation of the triples is
     * logged.
     * @param logger the logger to log the execution metadata to
     * @param job the enhancement job to log the execution metadata for
     * @param isWarn if <code>true</code> the data are logged with <code>WARN</code> level.
     * If <code>false</code> the <code>DEBUG</code> level is used
     */
    protected void logExecutionMetadata(Logger logger, EnhancementJob job, boolean isWarn) {
        //check the level of the logger the message is written to (and not
        //the level of the logger of this class)
        boolean enabled = isWarn ? logger.isWarnEnabled() : logger.isDebugEnabled();
        if(!enabled){
            return; //avoid building the message if it would not be logged
        }
        StringBuilder message = new StringBuilder(1024);
        message.append("ExecutionMetadata for ContentItem ").append(job.getContentItem().getUri())
            .append(" and Chain ").append(job.getChainName()).append(": \n");
        boolean serialized = false;
        if(serializer != null){
            ByteArrayOutputStream bout = new ByteArrayOutputStream();
            try {
                serializer.serialize(bout, job.getExecutionMetadata(), SupportedFormat.TURTLE);
                message.append(bout.toString("utf-8"));
                serialized = true;
            } catch (RuntimeException e){
                log.warn("   ... unable to serialize Execution Metadata | {}: {}",
                    e.getClass(), e.getMessage());
            } catch (UnsupportedEncodingException e) {
                log.warn("   ... unable to serialize Execution Metadata | {}: {}",
                    e.getClass(), e.getMessage());
            }
        }
        if(!serialized){
            //TURTLE serialization not possible ... use the toString method of triple
            for(Triple t : job.getExecutionMetadata()){
                message.append(t.toString()).append('\n');
            }
        }
        //finally write the serialized graph to the logger
        if(isWarn){
            logger.warn(message.toString());
        } else {
            logger.debug(message.toString());
        }
    }

    /**
     * Getter for the engines of the default chain as determined by
     * {@link ExecutionPlanHelper#getActiveEngines(EnhancementEngineManager, ImmutableGraph)}
     * for the execution plan of that chain.
     * @return the active engines of the default chain
     * @throws IllegalStateException if no default chain is available or the
     * execution plan of the default chain can not be obtained
     */
    @Override
    public List<EnhancementEngine> getActiveEngines() {
        //This implementation return the list of active engines for the default
        //Chain in the order they would be executed
        Chain defaultChain = chainManager.getDefault();
        if(defaultChain == null){
            throw new IllegalStateException("Currently no enhancement chain is " +
                "active. Please configure a Chain or enable the default chain");
        }
        ImmutableGraph ep;
        try {
            ep = defaultChain.getExecutionPlan();
        } catch (ChainException e) {
            throw new IllegalStateException("Unable to get Execution Plan for " +
                "default enhancement chain (name: '"+defaultChain.getName()+
                "'| class: '"+defaultChain.getClass()+"')!",e);
        }
        return ExecutionPlanHelper.getActiveEngines(engineManager,ep);
    }
}