blob: 8617064732c91c240cd9094c0b7e5abfd8871019 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.uima.ducc.jd;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.camel.CamelContext;
import org.apache.uima.ducc.common.NodeIdentity;
import org.apache.uima.ducc.common.component.AbstractDuccComponent;
import org.apache.uima.ducc.common.internationalization.Messages;
import org.apache.uima.ducc.common.jd.JdConstants;
import org.apache.uima.ducc.common.jd.files.WorkItemStateManager;
import org.apache.uima.ducc.common.utils.DuccLogger;
import org.apache.uima.ducc.common.utils.DuccLoggerComponents;
import org.apache.uima.ducc.common.utils.ExceptionHelper;
import org.apache.uima.ducc.common.utils.id.DuccId;
import org.apache.uima.ducc.transport.event.JdStateDuccEvent;
import org.apache.uima.ducc.transport.event.OrchestratorAbbreviatedStateDuccEvent;
import org.apache.uima.ducc.transport.event.common.DuccWorkJob;
import org.apache.uima.ducc.transport.event.common.DuccWorkMap;
import org.apache.uima.ducc.transport.event.common.IDuccProcess;
import org.apache.uima.ducc.transport.event.common.Rationale;
import org.apache.uima.ducc.transport.event.common.IDuccTypes.DuccType;
import org.apache.uima.ducc.transport.event.jd.DriverStatusReport;
import org.apache.uima.ducc.transport.event.jd.PerformanceSummaryWriter;
import org.apache.uima.ducc.transport.json.jp.JobProcessCollection;
import org.apache.uima.ducc.transport.json.jp.JobProcessData;
public class JobDriverComponent extends AbstractDuccComponent
implements IJobDriverComponent {
private static DuccLogger duccOut = DuccLoggerComponents.getJdOut(JobDriverComponent.class.getName());
private static Messages duccMsg = JobDriverContext.getInstance().getSystemMessages();
public JobDriverComponent(CamelContext context, String jdBrokerUrl, String jdQueuePrefix, String localeLanguage, String localeCountry) {
super("JobDriver",context);
init(jdBrokerUrl,jdQueuePrefix);
JobDriverContext.getInstance().initSystemMessages(localeLanguage,localeCountry);
duccMsg = JobDriverContext.getInstance().getSystemMessages();
}
private DuccId duccId = null;
private String jobId = String.valueOf(-1);
protected JobDriver thread = null;
private String jdBrokerUrl;
private String jdQueue;
private AtomicInteger publicationCounter = new AtomicInteger(0);
private JobProcessCollection jpc = null;
private AtomicBoolean started = new AtomicBoolean(false);
private AtomicBoolean active = new AtomicBoolean(true);
private void init(String jdBrokerUrl,String jdQueuePrefix) {
String methodName = "init";
duccOut.trace(methodName, null, duccMsg.fetch("enter"));
String jobIdProperty = System.getProperty(JdConstants.key_duccJobId);
jobId = DuccWorkMap.normalize(jobIdProperty);
this.jdBrokerUrl = jdBrokerUrl;
this.jdQueue = jdQueuePrefix+jobId;
duccOut.debug(methodName, null, duccMsg.fetchLabel("job.broker")+this.jdBrokerUrl+" "+duccMsg.fetchLabel("job.queue")+this.jdQueue);
duccOut.trace(methodName, null, duccMsg.fetch("exit"));
}
public DuccLogger getLogger() {
return duccOut;
}
private void dumpProcessMap(DuccWorkJob job) {
String methodName = "pmap";
if(job == null) {
duccOut.debug(methodName, null, "job:"+job );
}
else {
duccOut.debug(methodName, job.getDuccId(), "job:"+job.getId() );
Map<DuccId, IDuccProcess> map = job.getProcessMap().getMap();
for( Entry<DuccId, IDuccProcess> entry : map.entrySet() ) {
IDuccProcess process = entry.getValue();
process.getDuccId();
NodeIdentity nodeIdentity = process.getNodeIdentity();
String node = null;
String ip = null;
if(nodeIdentity != null) {
node = nodeIdentity.getName();
ip = nodeIdentity.getIp();
}
String pid = process.getPID();
duccOut.debug(methodName, job.getDuccId(), process.getDuccId(), "node:"+node+" "+"ip:"+ip+" "+"pid:"+pid );
}
}
}
private boolean dumpProcessMapEnabled = false;
protected String summarize(Exception e) {
return ExceptionHelper.summarize(e);
}
protected void process(OrchestratorAbbreviatedStateDuccEvent duccEvent) {
String methodName = "process";
duccOut.trace(methodName, null, duccMsg.fetch("enter"));
DuccWorkJob job = (DuccWorkJob) duccEvent.getWorkMap().findDuccWork(DuccType.Job, jobId);
if(dumpProcessMapEnabled) {
dumpProcessMap(job);
}
if(job != null) {
if(duccId == null) {
duccId = job.getDuccId();
}
duccOut.trace(methodName, duccId, "jd-cmd:"+job.getDriver().getCommandLine());
duccOut.trace(methodName, duccId, "jp-cmd:"+job.getCommandLine());
synchronized(jobId) {
if(thread != null) {
thread.setJob(job);
}
if(!started.get()) {
started.set(true);
duccOut.debug(methodName, job.getDuccId(), job.getJobState());
duccOut.trace(methodName, job.getDuccId(), duccMsg.fetch("creating driver thread"));
try {
thread = new JobDriver();
duccOut.trace(methodName, job.getDuccId(), "thread:"+thread);
thread.initialize(job, getProcessJmxUrl());
thread.start();
jpc = new JobProcessCollection(job);
}
catch(Exception e) {
duccOut.error(methodName, null, e);
duccOut.error(methodName, job.getDuccId(), summarize(e), e);
}
catch(Throwable t) {
duccOut.error(methodName, null, t);
}
}
if(active.get()) {
try {
if(jpc != null) {
ConcurrentSkipListMap<Long, JobProcessData> map = jpc.transform(job);
jpc.exportData(map);
}
}
catch(Exception e) {
duccOut.error(methodName, job.getDuccId(), summarize(e), e);
}
}
}
}
else {
duccOut.debug(methodName, duccId, duccMsg.fetch("job not found"));
if(active.get()) {
active.set(false);
if(thread != null) {
thread.kill(new Rationale("job driver failed to locate job in map"));
thread.interrupt();
try {
thread.join();
} catch (InterruptedException e) {
duccOut.debug(methodName, duccId, e);
}
duccOut.debug(methodName, duccId, duccMsg.fetch("thread killed"));
}
}
}
duccOut.trace(methodName, null, duccMsg.fetch("exit"));
}
protected void publisher() {
String methodName = "publisher";
PerformanceSummaryWriter performanceSummaryWriter = thread.getPerformanceSummaryWriter();
if(performanceSummaryWriter == null) {
duccOut.debug(methodName, null, duccMsg.fetch("performanceSummaryWriter is null"));
}
else {
performanceSummaryWriter.writeSummary();
}
WorkItemStateManager workItemStateManager = thread.getWorkItemStateManager();
if(workItemStateManager == null) {
duccOut.debug(methodName, null, duccMsg.fetch("workItemStateManager is null"));
}
else {
try {
workItemStateManager.exportData();
}
catch(Exception e) {
duccOut.error(methodName, null, e);
}
}
}
public JdStateDuccEvent getState() {
String methodName = "getState";
duccOut.trace(methodName, null, duccMsg.fetch("enter"));
JdStateDuccEvent jdStateDuccEvent = new JdStateDuccEvent();
if(active.get()) {
publicationCounter.addAndGet(1);
try {
duccOut.debug(methodName, null, duccMsg.fetch("publishing state"));
if(thread != null) {
thread.rectifyStatus();
DriverStatusReport dsr = thread.getDriverStatusReportCopy();
if(dsr == null) {
duccOut.debug(methodName, null, duccMsg.fetch("dsr is null"));
}
else {
duccOut.debug(methodName, null, "driverState:"+dsr.getDriverState());
duccOut.debug(methodName, dsr.getDuccId(), dsr.getLogReport());
jdStateDuccEvent.setState(dsr);
}
publisher();
}
else {
duccOut.debug(methodName, null, duccMsg.fetch("thread is null"));
}
}
catch(Exception e) {
duccOut.error(methodName, null, e);
}
}
duccOut.trace(methodName, null, duccMsg.fetch("exit"));
return jdStateDuccEvent;
}
public void evaluateJobDriverConstraints(OrchestratorAbbreviatedStateDuccEvent duccEvent) {
String methodName = "evaluateDispatchedJobConstraints";
duccOut.trace(methodName, null, duccMsg.fetch("enter"));
duccOut.debug(methodName, null, duccMsg.fetchLabel("received")+"OrchestratorStateEvent");
if(active.get()) {
process(duccEvent);
}
duccOut.trace(methodName, null, duccMsg.fetch("exit"));
}
}