blob: 4433925782b6829ee70ade911ff04dc4033f21e2 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.uima.ducc.pm;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.camel.CamelContext;
import org.apache.uima.ducc.common.boot.DuccDaemonRuntimeProperties;
import org.apache.uima.ducc.common.boot.DuccDaemonRuntimeProperties.DaemonName;
import org.apache.uima.ducc.common.component.AbstractDuccComponent;
import org.apache.uima.ducc.common.main.DuccService;
import org.apache.uima.ducc.common.utils.DuccLogger;
import org.apache.uima.ducc.common.utils.id.DuccId;
import org.apache.uima.ducc.pm.helper.DuccWorkHelper;
import org.apache.uima.ducc.transport.cmdline.ICommandLine;
import org.apache.uima.ducc.transport.dispatcher.DuccEventDispatcher;
import org.apache.uima.ducc.transport.event.DuccEvent;
import org.apache.uima.ducc.transport.event.DuccJobsStateEvent;
import org.apache.uima.ducc.transport.event.PmStateDuccEvent;
import org.apache.uima.ducc.transport.event.common.DuccJobDeployment;
import org.apache.uima.ducc.transport.event.common.DuccUserReservation;
import org.apache.uima.ducc.transport.event.common.DuccWorkJob;
import org.apache.uima.ducc.transport.event.common.DuccWorkPop;
import org.apache.uima.ducc.transport.event.common.DuccWorkPopDriver;
import org.apache.uima.ducc.transport.event.common.DuccWorkReservation;
import org.apache.uima.ducc.transport.event.common.IDuccJobDeployment;
import org.apache.uima.ducc.transport.event.common.IDuccProcess;
import org.apache.uima.ducc.transport.event.common.IDuccReservationMap;
import org.apache.uima.ducc.transport.event.common.IDuccUnits.MemoryUnits;
import org.apache.uima.ducc.transport.event.common.IDuccWork;
import org.apache.uima.ducc.transport.event.common.IDuccWorkExecutable;
import org.apache.uima.ducc.transport.event.common.IDuccWorkJob;
import org.apache.uima.ducc.transport.event.common.ProcessMemoryAssignment;
/**
 * The ProcessManager's main role is to receive Orchestrator updates, trim the received state and
 * publish a new state to the agents. Trimming is done to remove state that is irrelevant to
 * the agents.
 */
public class ProcessManagerComponent extends AbstractDuccComponent
    implements ProcessManager {

  /** Column titles of the process table written to the log by dumpState(). */
  private static final String[] jobHeaderArray =
      {"DuccId","ProcessType","PID","ProcessState","ResourceState","NodeIP","NodeName","DeallocationType","JMX Url"};

  /** Row layout shared by the table header and every process row. */
  private static final String jobHeaderFormat =
      "%1$-15s|%2$-20s|%3$-10s|%4$-15s|%5$-15s|%6$-13s|%7$-45s|%8$-25s|%9$-45s";

  /**
   * Pre-formatted table header terminated by a newline. The newline is appended after
   * formatting so that the padding of the last column does not end up on the following line.
   */
  private static final String header =
      String.format(jobHeaderFormat, (Object[]) jobHeaderArray) + "\n";

  /** Horizontal separator line (158 dashes) used around the process table. */
  private static final String tbl = String.format("%1$-158s"," ").replace(" ", "-");

  public static DuccLogger logger = new DuccLogger(ProcessManagerComponent.class, DuccComponent);

  /** Fudge factor (in percent) applied when the system property does not provide one. */
  private static final int DEFAULT_FUDGE_FACTOR = 5;

  /** Helper used to fetch the full work item (including command lines) for a given id. */
  private final DuccWorkHelper dwHelper;

  /** Dispatch component used to send messages to remote Agents. */
  private final DuccEventDispatcher eventDispatcher;

  /** Value of ducc.rm.share.quantum; stays 0 when the property is absent or blank. */
  private final int shareQuantum;

  /** Percentage added on top of the computed memory size (ducc.agent.share.size.fudge.factor). */
  private final int fudgeFactor;

  /**
   * Creates the component and reads its tuning values from system properties.
   *
   * @param context Camel context handed to the superclass
   * @param eventDispatcher dispatcher used to publish state events to the agents
   * @throws NumberFormatException if one of the numeric system properties is set to a
   *         non-integer value
   */
  public ProcessManagerComponent(CamelContext context, DuccEventDispatcher eventDispatcher) {
    super("ProcessManager", context);
    this.eventDispatcher = eventDispatcher;
    this.shareQuantum = readIntProperty("ducc.rm.share.quantum", 0);
    this.fudgeFactor =
        readIntProperty("ducc.agent.share.size.fudge.factor", DEFAULT_FUDGE_FACTOR);
    this.dwHelper = new DuccWorkHelper();
  }

  /**
   * Reads an integer system property.
   *
   * @param name property name
   * @param defaultValue value returned when the property is missing or blank
   * @return the parsed property value or {@code defaultValue}
   * @throws NumberFormatException if the property is set but is not a valid integer
   */
  private static int readIntProperty(String name, int defaultValue) {
    String value = System.getProperty(name);
    if (value == null) {
      return defaultValue;
    }
    String trimmed = value.trim();
    return trimmed.length() > 0 ? Integer.parseInt(trimmed) : defaultValue;
  }

  /**
   * Starts the component via the superclass, registers the daemon's JMX URL with the
   * runtime properties and logs it.
   *
   * @param service service wrapper passed through to the superclass
   * @throws Exception propagated from the superclass start or the runtime-properties boot
   */
  public void start(DuccService service) throws Exception {
    super.start(service, null);
    DuccDaemonRuntimeProperties.getInstance().boot(DaemonName.ProcessManager, getProcessJmxUrl());
    logger.info("start", null, "--PM started - jmx URL:" + super.getProcessJmxUrl());
  }

  /** @return the logger shared by this component */
  public DuccLogger getLogger() {
    return logger;
  }

  /**
   * Converts a memory amount expressed in the given units into gigabytes.
   *
   * <p>All arithmetic is done in {@code long}; the amount is never narrowed to {@code int}.
   *
   * <p>NOTE(review): KB and MB amounts are converted with integer division, so any
   * remainder below one GB is dropped before the share count is rounded up in
   * {@link #getShares(long)} - confirm this matches the scheduler's computation.
   *
   * @param processMemoryAssignment memory amount as a decimal string
   * @param units units the amount is expressed in
   * @return the amount in GB; units other than KB, MB and TB leave the value unchanged
   * @throws NumberFormatException if the amount is not a valid {@code long}
   */
  private long normalizeMemory(String processMemoryAssignment, MemoryUnits units) {
    long memory = Long.parseLong(processMemoryAssignment);
    if (units.equals(MemoryUnits.KB)) {
      memory = memory / (1024L * 1024L);
    } else if (units.equals(MemoryUnits.MB)) {
      memory = memory / 1024L;
    } else if (units.equals(MemoryUnits.TB)) {
      memory = memory * 1024L;
    }
    // MemoryUnits.GB (and anything else) is already in the target unit.
    return memory;
  }

  /**
   * Computes how many shares are needed to cover the given memory, rounding up.
   *
   * @param normalizedProcessMemoryRequirements memory requirement in GB
   * @return number of shares (ceiling of requirement / share quantum)
   * @throws IllegalStateException if the share quantum is not a positive number, which
   *         happens when ducc.rm.share.quantum is not configured
   */
  private int getShares(long normalizedProcessMemoryRequirements) {
    if (shareQuantum <= 0) {
      throw new IllegalStateException(
          "Invalid share quantum " + shareQuantum
              + " - check system property ducc.rm.share.quantum");
    }
    long shares = normalizedProcessMemoryRequirements / shareQuantum;
    if ((normalizedProcessMemoryRequirements % shareQuantum) > 0) {
      shares++; // ceil
    }
    return (int) shares;
  }

  /**
   * Renders a command line as a single string for logging; every element is followed by
   * one space.
   *
   * @param iCommandLine command line to render, may be {@code null}
   * @return the rendered command line, or an empty string when nothing is available
   */
  private String getCmdLine(ICommandLine iCommandLine) {
    StringBuilder sb = new StringBuilder();
    if (iCommandLine != null) {
      String[] commandLine = iCommandLine.getCommandLine();
      if (commandLine != null) {
        for (String item : commandLine) {
          sb.append(item).append(" ");
        }
      }
    }
    return sb.toString();
  }

  /**
   * Builds the trimmed state from the given work map and dispatches it to the agents as a
   * single {@link DuccJobsStateEvent}.
   *
   * <p>Jobs are turned into job deployments; reservations not owned by user "System" are
   * turned into user reservations. Any failure while building or dispatching the state is
   * logged and nothing is propagated to the caller.
   *
   * @param workMap current work items keyed by id
   * @param sequence sequence number stamped on the dispatched event
   */
  public void dispatchStateUpdateToAgents(Map<DuccId, IDuccWork> workMap, long sequence) {
    String methodName = "dispatchStateUpdateToAgents";
    try {
      dumpState(workMap);
      // Lists which the PM will dispatch to agents
      List<IDuccJobDeployment> jobDeploymentList = new ArrayList<IDuccJobDeployment>();
      List<DuccUserReservation> reservationList = new ArrayList<DuccUserReservation>();
      // Populate the lists with the data agents need. Don't copy data the agent doesn't
      // care about.
      for (Entry<DuccId, IDuccWork> entry : workMap.entrySet()) {
        IDuccWork work = entry.getValue();
        if (work instanceof DuccWorkJob) {
          jobDeploymentList.add(buildJobDeployment((DuccWorkJob) work, methodName));
        } else if (work instanceof DuccWorkReservation) {
          DuccWorkReservation reservation = (DuccWorkReservation) work;
          String userId = reservation.getStandardInfo().getUser();
          if (!"System".equals(userId)) {
            IDuccReservationMap reservationMap = reservation.getReservationMap();
            reservationList.add(
                new DuccUserReservation(userId, reservation.getDuccId(), reservationMap));
            logger.debug(methodName, null, "--------------- Added reservation for user:" + userId);
          }
        }
      }
      logger.info(methodName, null, "---- PM Dispatching DuccJobsStateEvent request to Agent(s) - State Map Size:"+jobDeploymentList.size()+" Reservation List:"+reservationList.size());
      DuccJobsStateEvent ev =
          new DuccJobsStateEvent(DuccEvent.EventType.PM_STATE, jobDeploymentList, reservationList);
      ev.setSequence(sequence);
      // Dispatch state update to agents
      eventDispatcher.dispatch(ev);
      logger.debug(methodName, null, "+++++ PM Dispatched State To Agent(s)");
    } catch (Throwable t) {
      logger.error(methodName, null, t);
    }
  }

  /**
   * Builds the deployment record the agents need for one job: memory assignment, process
   * list, and the driver/process command lines fetched through the work helper.
   *
   * @param dcj job taken from the orchestrator state
   * @param methodName method label used for log entries
   * @return the deployment record for the job
   */
  private IDuccJobDeployment buildJobDeployment(DuccWorkJob dcj, String methodName) {
    long normalizedProcessMemoryRequirements =
        normalizeMemory(dcj.getSchedulingInfo().getShareMemorySize(),
            dcj.getSchedulingInfo().getShareMemoryUnits());
    int shares = getShares(normalizedProcessMemoryRequirements);
    // Widen before multiplying so the product is computed in long, not int.
    long processAdjustedMemorySize = (long) shares * shareQuantum * 1024;
    ProcessMemoryAssignment pma = new ProcessMemoryAssignment();
    pma.setShares(shares);
    pma.setNormalizedMemoryInMBs(processAdjustedMemorySize);

    // Copy job processes
    List<IDuccProcess> jobProcessList = new ArrayList<IDuccProcess>();
    for (Entry<DuccId, IDuccProcess> jpProcess : dcj.getProcessMap().getMap().entrySet()) {
      jobProcessList.add(jpProcess.getValue());
    }

    // add fudge factor (5% default) to adjust memory computed above
    processAdjustedMemorySize =
        (long) (processAdjustedMemorySize + processAdjustedMemorySize * ((double) fudgeFactor / 100));
    pma.setMaxMemoryWithFudge(processAdjustedMemorySize);
    logger.debug(methodName,dcj.getDuccId(),"--------------- User Requested Memory For Process:"+dcj.getSchedulingInfo().getShareMemorySize()+dcj.getSchedulingInfo().getShareMemoryUnits()+" PM Calculated Memory Assignment of:"+processAdjustedMemorySize);

    ICommandLine driverCmdLine = null;
    ICommandLine processCmdLine = null;
    IDuccProcess driverProcess = null;
    IDuccWork dw = null;
    switch (dcj.getDuccType()) {
      case Job: {
        logger.debug(methodName, dcj.getDuccId(), "case: Job");
        dw = dwHelper.fetch(dcj.getDuccId());
        IDuccWorkJob job = (IDuccWorkJob) dw;
        DuccWorkPopDriver driver = job.getDriver();
        if (driver != null) {
          driverCmdLine = driver.getCommandLine();
          // NOTE(review): throws if the driver's process map is empty, which aborts the
          // whole dispatch - confirm a driver always carries at least one process.
          driverProcess = driver.getProcessMap().entrySet().iterator().next().getValue();
        }
        processCmdLine = job.getCommandLine();
        break;
      }
      case Service: {
        logger.debug(methodName, dcj.getDuccId(), "case: Service");
        dw = dwHelper.fetch(dcj.getDuccId());
        IDuccWorkJob service = (IDuccWorkJob) dw;
        processCmdLine = service.getCommandLine();
        // NOTE(review): the option is added on every dispatch; if the helper returns a
        // cached object the option may accumulate - verify against DuccWorkHelper.
        processCmdLine.addOption("-Dducc.deploy.components=service");
        break;
      }
      default: {
        logger.debug(methodName, dcj.getDuccId(), "case: default");
        dw = dwHelper.fetch(dcj.getDuccId());
        if (dw instanceof IDuccWorkExecutable) {
          processCmdLine = ((IDuccWorkExecutable) dw).getCommandLine();
        }
        break;
      }
    }

    String dText = (driverCmdLine != null) ? getCmdLine(driverCmdLine) : "n/a";
    logger.trace(methodName, dcj.getDuccId(), "driver: " + dText);
    String pText = (processCmdLine != null) ? getCmdLine(processCmdLine) : "n/a";
    logger.trace(methodName, dcj.getDuccId(), "process: " + pText);

    return new DuccJobDeployment(dcj.getDuccId(), driverCmdLine,
        processCmdLine,
        dcj.getStandardInfo(),
        driverProcess,
        pma,
        jobProcessList);
  }

  /**
   * Formats one process as a table row using the same column layout as the header.
   *
   * @param process process to render
   * @return the formatted row (no trailing newline)
   */
  private String formatProcess(IDuccProcess process) {
    Object pid = (process.getPID() == null) ? "" : process.getPID();
    Object jmxUrl = (process.getProcessJmxUrl() == null) ? "N/A" : process.getProcessJmxUrl();
    return String.format(jobHeaderFormat,
        String.valueOf(process.getDuccId().getFriendly()),
        process.getProcessType().toString(),
        pid,
        process.getProcessState().toString(),
        process.getResourceState().toString(),
        process.getNodeIdentity().getIp(),
        process.getNodeIdentity().getName(),
        process.getProcessDeallocationType().toString(),
        jmxUrl);
  }

  /**
   * Logs a table of processes for every job in the work map, one log entry per job.
   * Reservations and plain process-pool work items are skipped. Failures are logged and
   * never propagated, since this method only produces diagnostics.
   *
   * @param workMap current work items keyed by id
   */
  private void dumpState(Map<DuccId, IDuccWork> workMap) {
    String methodName = "dumpState";
    try {
      for (Entry<DuccId, IDuccWork> job : workMap.entrySet()) {
        IDuccWork duccWork = job.getValue();
        if (duccWork instanceof DuccWorkJob) {
          DuccWorkJob duccWorkJob = (DuccWorkJob) duccWork;
          // Fresh buffer per job: each log entry must contain only this job's table.
          StringBuilder sb = new StringBuilder();
          sb.append("\n").append(tbl)
              .append("\nJob ID: ").append(duccWorkJob.getDuccId().getFriendly())
              .append("\tJobState: ").append(duccWorkJob.getStateObject())
              .append("\tJobSubmittedBy: ").append(duccWorkJob.getStandardInfo().getUser())
              .append("\n\n")
              .append(header).append(tbl).append("\n");
          DuccWorkPopDriver driver = duccWorkJob.getDriver();
          // Skip the driver row when the driver has no process yet instead of failing.
          if (driver != null && !driver.getProcessMap().entrySet().isEmpty()) {
            IDuccProcess driverProcess =
                driver.getProcessMap().entrySet().iterator().next().getValue();
            sb.append(formatProcess(driverProcess));
          }
          for (Entry<DuccId, IDuccProcess> process : duccWorkJob.getProcessMap().entrySet()) {
            sb.append("\n").append(formatProcess(process.getValue()));
          }
          sb.append("\n").append(tbl).append("\n");
          logger.info(methodName, null, sb.toString());
        } else if (duccWork instanceof DuccWorkReservation) {
          continue; // TBI
        } else if (duccWork instanceof DuccWorkPop) {
          continue; // TBI
        } else {
          // Report the type of the work item itself, not of the map entry holding it.
          String type = (duccWork == null) ? "null" : duccWork.getClass().getName();
          logger.info(methodName, job.getKey(), "Not a WorkJob but " + type);
        }
      }
    } catch (Exception e) {
      logger.error(methodName, null, e);
    }
  }

  /**
   * Logs the requested change and delegates to the superclass implementation.
   *
   * @param clz name of the class whose log level is changed
   * @param level new log level
   */
  public void setLogLevel(String clz, String level) {
    logger.info("setLogLevel", null, "--------- Changing Log Level to:" + level + " For class:" + clz);
    super.setLogLevel(clz, level);
  }

  /**
   * Changes the log level for this component's own class.
   *
   * @param level new log level
   */
  public void setLogLevel(String level) {
    String className = getClass().getCanonicalName();
    logger.info("setLogLevel", null, "--------- Changing Log Level to:" + level + " For class:" + className);
    super.setLogLevel(className, level);
  }

  /** @return the log level reported by the superclass */
  public String getLogLevel() {
    return super.getLogLevel();
  }

  /**
   * Writes a message at trace level when trace logging is enabled.
   *
   * @param toLog message text
   * @param methodName method label recorded with the log entry
   */
  public void logAtTraceLevel(String toLog, String methodName) {
    if (logger.isTrace()) {
      logger.trace(methodName, null, "--------- " + toLog);
    }
  }

  /** @return a new, empty PM state event */
  public PmStateDuccEvent getState() {
    String methodName = "getState";
    logger.trace(methodName, null, "");
    return new PmStateDuccEvent();
  }
}