| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.stanbol.enhancer.jobmanager.event.impl; |
| |
| import static org.apache.stanbol.enhancer.jobmanager.event.Constants.PROPERTY_EXECUTION; |
| import static org.apache.stanbol.enhancer.jobmanager.event.Constants.PROPERTY_JOB_MANAGER; |
| import static org.apache.stanbol.enhancer.jobmanager.event.Constants.TOPIC_JOB_MANAGER; |
| import static org.apache.stanbol.enhancer.servicesapi.helper.ExecutionPlanHelper.getEngine; |
| |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.Dictionary; |
| import java.util.Hashtable; |
| import java.util.LinkedHashMap; |
| import java.util.Map; |
| import java.util.concurrent.Semaphore; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.locks.Lock; |
| import java.util.concurrent.locks.ReadWriteLock; |
| import java.util.concurrent.locks.ReentrantReadWriteLock; |
| |
| import org.apache.clerezza.rdf.core.NonLiteral; |
| import org.apache.stanbol.enhancer.servicesapi.EngineException; |
| import org.apache.stanbol.enhancer.servicesapi.EnhancementEngine; |
| import org.apache.stanbol.enhancer.servicesapi.EnhancementEngineManager; |
| import org.osgi.service.event.Event; |
| import org.osgi.service.event.EventAdmin; |
| import org.osgi.service.event.EventHandler; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import com.ibm.icu.lang.UCharacter.SentenceBreak; |
| |
| public class EnhancementJobHandler implements EventHandler { |
| |
| private EnhancementEngineManager engineManager; |
| private EventAdmin eventAdmin; |
| |
| /* |
| * NOTE on debug level Loggings |
| * |
| * ++ ... init some activity |
| * >> ... started some activity (thread has the requested lock) |
| * << ... completed some activity (thread has released the lock) |
| * |
| * n: ... no lock |
| * r: ... read lock |
| * w: ... write lock |
| */ |
| private Logger log = LoggerFactory.getLogger(EnhancementJobHandler.class); |
| /** |
| * Keys are {@link EnhancementJob}s currently asynchronously enhancing |
| * contentItems and the values are the objects used to interrupt the |
| * requesting thread as soon as the enhancement process has finished. |
| */ |
| private Map<EnhancementJob,EnhancementJobObserver> processingJobs; |
| private final ReadWriteLock processingLock = new ReentrantReadWriteLock(); |
| private Thread observerDaemon; |
| |
| public EnhancementJobHandler(EventAdmin eventAdmin, |
| EnhancementEngineManager engineManager) { |
| if(eventAdmin == null){ |
| throw new IllegalArgumentException("The parsed EventAdmin service MUST NOT be NULL!"); |
| } |
| if(engineManager == null){ |
| throw new IllegalArgumentException("The parsed EnhancementEngineManager MUST NOT be NULL!"); |
| } |
| this.eventAdmin = eventAdmin; |
| this.engineManager = engineManager; |
| processingLock.writeLock().lock(); |
| try { |
| processingJobs = new LinkedHashMap<EnhancementJob,EnhancementJobObserver>(); |
| } finally{ |
| processingLock.writeLock().unlock(); |
| } |
| observerDaemon = new Thread(new EnhancementJobObserverDaemon()); |
| observerDaemon.setName("Event Job Manager Observer Daemon"); |
| observerDaemon.setDaemon(true); |
| observerDaemon.start(); |
| |
| } |
| /** |
| * Closes this Handler and notifies all components that wait for still |
| * running jobs |
| */ |
| public void close(){ |
| log.info("deactivate {}",getClass().getName()); |
| processingLock.writeLock().lock(); |
| try { |
| for(Object o : processingJobs.values()){ |
| synchronized (o) { |
| o.notifyAll(); |
| } |
| } |
| processingJobs = null; |
| } finally { |
| processingLock.writeLock().unlock(); |
| } |
| observerDaemon = null; |
| } |
| |
| /** |
| * Registers an EnhancementJob and will start the enhancement process. |
| * When the process is finished or this service is deactivated the |
| * returned oject will be notified. Therefore callers that need to |
| * wait for the completion of the parsed job will want to |
| * <code><pre> |
| * Object object = enhancementJobHandler.register(); |
| * while(!job.isFinished() & enhancementJobHandler != null){ |
| * synchronized (object) { |
| * try { |
| * object.wait(); |
| * } catch (InterruptedException e) {} |
| * } |
| * } |
| * </pre></code> |
| * @param enhancementJob the enhancement job to register |
| * @return An object that will get {@link Object#notifyAll()} as soon as |
| * {@link EnhancementJob#isFinished()} or this instance is deactivated |
| */ |
| public EnhancementJobObserver register(EnhancementJob enhancementJob){ |
| final boolean init; |
| EnhancementJobObserver observer; |
| processingLock.writeLock().lock(); |
| try { |
| if(enhancementJob == null || processingJobs == null){ |
| return null; |
| } |
| observer = processingJobs.get(enhancementJob); |
| if(observer == null){ |
| observer = new EnhancementJobObserver(enhancementJob); |
| logJobInfo(log, enhancementJob, "Add EnhancementJob:",false); |
| processingJobs.put(enhancementJob, observer); |
| init = true; |
| } else { |
| init = false; |
| } |
| } finally { |
| processingLock.writeLock().unlock(); |
| } |
| if(init){ |
| observer.acquire(); |
| enhancementJob.startProcessing(); |
| log.debug("++ w: {}","init execution"); |
| enhancementJob.getLock().writeLock().lock(); |
| try { |
| log.debug(">> w: {}","init execution"); |
| executeNextNodes(enhancementJob); |
| } finally { |
| log.debug("<< w: {}","init execution"); |
| enhancementJob.getLock().writeLock().unlock(); |
| } |
| } |
| return observer; |
| } |
| |
| @Override |
| public void handleEvent(Event event) { |
| EnhancementJob job = (EnhancementJob)event.getProperty(PROPERTY_JOB_MANAGER); |
| NonLiteral execution = (NonLiteral)event.getProperty(PROPERTY_EXECUTION); |
| if(job == null || execution == null){ |
| log.warn("Unable to process EnhancementEvent where EnhancementJob " + |
| "{} or Execution node {} is null -> ignore",job,execution); |
| } |
| try { |
| processEvent(job, execution); |
| } catch (Throwable t) { |
| String message = String.format("Unexpected Exception while processing " + |
| "ContentItem %s with EnhancementJobManager: %s", |
| job.getContentItem().getUri(),EventJobManagerImpl.class); |
| //this ensures that an runtime exception does not |
| job.setFailed(execution, null, new IllegalStateException(message,t)); |
| log.error(message,t); |
| } |
| //(2) trigger the next actions |
| log.debug("++ w: {}","check for next Executions"); |
| job.getLock().writeLock().lock(); |
| log.debug(">> w: {}","check for next Executions"); |
| try { |
| if(job.isFinished()){ |
| finish(job); |
| } else if(!job.isFailed()){ |
| if(!executeNextNodes(job) && job.getRunning().isEmpty()){ |
| log.warn("Unexpected state in the Execution of ContentItem {}:" |
| + " Job is not finished AND no executions are running AND" |
| + " no further execution could be started! -> finishing" |
| + " this job :("); |
| finish(job); |
| } //else execution started of other jobs are running |
| } else { |
| if(log.isInfoEnabled()){ |
| Collection<String> running = new ArrayList<String>(3); |
| for(NonLiteral runningNode : job.getRunning()){ |
| running.add(getEngine(job.getExecutionPlan(), job.getExecutionNode(runningNode))); |
| } |
| log.info("Job {} failed, but {} still running!", |
| job.getContentItem().getUri(),running); |
| } |
| } |
| } finally { |
| log.debug("<< w: {}","check for next Executions"); |
| job.getLock().writeLock().unlock(); |
| } |
| } |
| /** |
| * @param job |
| * @param execution |
| */ |
| private void processEvent(EnhancementJob job, NonLiteral execution) { |
| NonLiteral executionNode = job.getExecutionNode(execution); |
| String engineName = getEngine(job.getExecutionPlan(), executionNode); |
| //(1) execute the parsed ExecutionNode |
| EnhancementEngine engine = engineManager.getEngine(engineName); |
| if(engine != null){ |
| //execute the engine |
| Exception exception = null; |
| int engineState; |
| try { |
| engineState = engine.canEnhance(job.getContentItem()); |
| } catch (EngineException e) { |
| exception = e; |
| log.warn("Unable to check if engine '" + engineName |
| + "'(type: " + engine.getClass() + ") can enhance ContentItem '" |
| + job.getContentItem().getUri()+ "'!",e); |
| engineState = EnhancementEngine.CANNOT_ENHANCE; |
| } |
| if(engineState == EnhancementEngine.ENHANCE_SYNCHRONOUS){ |
| //ensure that this engine exclusively access the content item |
| log.debug("++ w: {}: {}","start sync execution", engine.getName()); |
| job.getLock().writeLock().lock(); |
| log.debug(">> w: {}: {}","start sync execution", engine.getName()); |
| try { |
| engine.computeEnhancements(job.getContentItem()); |
| job.setCompleted(execution); |
| } catch (EngineException e){ |
| job.setFailed(execution, engine, e); |
| } finally{ |
| log.debug("<< w: {}: {}","finished sync execution", engine.getName()); |
| job.getLock().writeLock().unlock(); |
| } |
| } else if(engineState == EnhancementEngine.ENHANCE_ASYNC){ |
| try { |
| log.debug("++ n: start async execution of Engine {}",engine.getName()); |
| engine.computeEnhancements(job.getContentItem()); |
| log.debug("++ n: finished async execution of Engine {}",engine.getName()); |
| job.setCompleted(execution); |
| } catch (EngineException e) { |
| job.setFailed(execution, engine, e); |
| } catch (RuntimeException e) { |
| job.setFailed(execution, engine, e); |
| } |
| } else { //CANNOT_ENHANCE |
| if(exception != null){ |
| job.setFailed(execution,engine,exception); |
| } else { //can not enhance is not an error |
| //it just says this engine can not enhance this content item |
| job.setCompleted(execution); |
| } |
| } |
| } else { //engine with that name is not available |
| job.setFailed(execution, null, null); |
| } |
| } |
| /** |
| * Removes a finished job from {@link #processingJobs} and notifies |
| * all waiting components |
| * @param job the finished job |
| */ |
| private void finish(EnhancementJob job){ |
| processingLock.writeLock().lock(); |
| EnhancementJobObserver observer; |
| try { |
| observer = processingJobs.remove(job); |
| } finally { |
| processingLock.writeLock().unlock(); |
| } |
| if(observer != null) { |
| try { |
| logJobInfo(log, job, "Finished EnhancementJob:",false); |
| log.debug("++ n: finished processing ContentItem {} with Chain {}", |
| job.getContentItem().getUri(),job.getChainName()); |
| } finally { |
| //release the semaphore to send signal to the EventJobManager waiting |
| //for the results |
| observer.release(); |
| } |
| } else { |
| log.warn("EnhancementJob for ContentItem {} is not " + |
| "registered with {}. Will not send notification!", |
| job.getContentItem().getUri(), getClass().getName()); |
| } |
| } |
| /** |
| * triggers the execution of the next nodes or if |
| * {@link EnhancementJob#isFinished()} notifies the one who registered |
| * the {@link EnhancementJob} with this component. |
| * @param job the enhancement job to process |
| * @return if an Execution event was sent |
| */ |
| protected boolean executeNextNodes(EnhancementJob job) { |
| //getExecutable returns an snapshot so we do not need to lock |
| boolean startedExecution = false; |
| for(NonLiteral executable : job.getExecutable()){ |
| if(log.isDebugEnabled()){ |
| log.debug("PREPARE execution of Engine {}", |
| getEngine(job.getExecutionPlan(), job.getExecutionNode(executable))); |
| } |
| Dictionary<String,Object> properties = new Hashtable<String,Object>(); |
| properties.put(PROPERTY_JOB_MANAGER, job); |
| properties.put(PROPERTY_EXECUTION, executable); |
| job.setRunning(executable); |
| if(log.isDebugEnabled()){ |
| log.debug("SHEDULE execution of Engine {}", |
| getEngine(job.getExecutionPlan(), job.getExecutionNode(executable))); |
| } |
| eventAdmin.postEvent(new Event(TOPIC_JOB_MANAGER,properties)); |
| startedExecution = true; |
| } |
| return startedExecution; |
| } |
| |
| /** |
| * Logs basic infos about the Job as INFO and detailed infos as DEBUG |
| * @param job |
| */ |
| protected static void logJobInfo(Logger log, EnhancementJob job, String header, boolean logExecutions) { |
| if(header != null){ |
| log.info(header); |
| } |
| log.info(" finished: {}",job.isFinished()); |
| log.info(" state: {}",job.isFailed()?"failed":"processing"); |
| log.info(" chain: {}",job.getChainName()); |
| log.info(" content-item: {}", job.getContentItem().getUri()); |
| if(logExecutions){ |
| log.info(" executions:"); |
| for(NonLiteral completedExec : job.getCompleted()){ |
| log.info(" - {} completed",getEngine(job.getExecutionMetadata(), |
| job.getExecutionNode(completedExec))); |
| } |
| for(NonLiteral runningExec : job.getRunning()){ |
| log.info(" - {} running",getEngine(job.getExecutionMetadata(), |
| job.getExecutionNode(runningExec))); |
| } |
| } |
| } |
| public class EnhancementJobObserver{ |
| |
| private static final int MIN_WAIT_TIME = 500; |
| private final EnhancementJob enhancementJob; |
| private final Semaphore semaphore; |
| |
| private EnhancementJobObserver(EnhancementJob job){ |
| if(job == null){ |
| throw new IllegalArgumentException("The parsed EnhancementJob MUST NOT be NULL!"); |
| } |
| this.enhancementJob = job; |
| this.semaphore = new Semaphore(1); |
| } |
| |
| protected void acquire() { |
| try { |
| semaphore.acquire(); |
| } catch (InterruptedException e) { |
| log.warn("Interrupted while acquireing Semaphore for EnhancementJob " |
| + enhancementJob + "!",e); |
| } |
| } |
| |
| protected void release() { |
| semaphore.release(); |
| } |
| |
| public boolean hasCompleted() { |
| enhancementJob.getLock().readLock().lock(); |
| try { |
| return enhancementJob.isFinished(); |
| } finally { |
| enhancementJob.getLock().readLock().unlock(); |
| } |
| } |
| |
| public void waitForCompletion(int maxEnhancementJobWaitTime) { |
| if(semaphore.availablePermits() < 1){ |
| // The only permit is taken by the EnhancementJobHander |
| try { |
| semaphore.tryAcquire(1, |
| Math.max(MIN_WAIT_TIME, maxEnhancementJobWaitTime),TimeUnit.MILLISECONDS); |
| } catch (InterruptedException e) { |
| //interupted |
| } |
| } else if(!hasCompleted()){ |
| int wait = Math.max(100, maxEnhancementJobWaitTime/10); |
| log.warn("Unexpected permit available for Semaphore of " |
| + "EnhancementJob of ContentItem {}. Fallback to wait({})" |
| + "for detecting if Job has finished. While the fallback " |
| + "should ensure correct Enhancement results this indicates a " |
| + "Bug in the EventHobManager. Please feel free to report " |
| + "This on dev@stanbol.apache.org or the Apache Stanbol " |
| + "Issue Tracker.",enhancementJob.getContentItem().getUri(),wait); |
| try { |
| Thread.currentThread().wait(wait); |
| } catch (InterruptedException e) { |
| //interupted |
| } |
| }// else completed |
| } |
| |
| } |
| |
| |
| /** |
| * Currently only used to debug the number of currently registered |
| * Enhancements Jobs (if there are some) |
| * @author Rupert Westenthaler |
| */ |
| private class EnhancementJobObserverDaemon implements Runnable { |
| |
| /** |
| * The logger of the Observer. Can be used to configure Loglevel specificly |
| * |
| */ |
| private Logger observerLog = LoggerFactory.getLogger(EnhancementJobObserverDaemon.class); |
| |
| @Override |
| public void run() { |
| observerLog.debug(" ... init EnhancementJobObserver"); |
| while(processingJobs != null){ |
| try { |
| Thread.sleep(10000); |
| } catch (InterruptedException e) { |
| } |
| Collection<EnhancementJob> jobs; |
| Lock readLock = processingLock.readLock(); |
| readLock.lock(); |
| try { |
| if(processingJobs != null){ |
| jobs = new ArrayList<EnhancementJob>(processingJobs.keySet()); |
| } else { |
| jobs = Collections.emptyList(); |
| } |
| } finally { |
| readLock.unlock(); |
| } |
| if(!jobs.isEmpty()){ |
| observerLog.info(" -- {} active Enhancement Jobs",jobs.size()); |
| if(observerLog.isDebugEnabled()){ |
| for(EnhancementJob job : jobs){ |
| Lock jobLock = job.getLock().readLock(); |
| jobLock.lock(); |
| try { |
| logJobInfo(observerLog,job,null,true); |
| } finally { |
| jobLock.unlock(); |
| } |
| } |
| } |
| } else { |
| log.debug(" -- No active Enhancement Jobs"); |
| } |
| } |
| |
| } |
| |
| } |
| |
| |
| } |