| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.uima.ducc.pm; |
| |
| import java.util.ArrayList; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Map.Entry; |
| |
| import org.apache.camel.CamelContext; |
| import org.apache.uima.ducc.common.boot.DuccDaemonRuntimeProperties; |
| import org.apache.uima.ducc.common.boot.DuccDaemonRuntimeProperties.DaemonName; |
| import org.apache.uima.ducc.common.component.AbstractDuccComponent; |
| import org.apache.uima.ducc.common.main.DuccService; |
| import org.apache.uima.ducc.common.utils.DuccLogger; |
| import org.apache.uima.ducc.common.utils.id.DuccId; |
| import org.apache.uima.ducc.pm.helper.DuccWorkHelper; |
| import org.apache.uima.ducc.transport.cmdline.ICommandLine; |
| import org.apache.uima.ducc.transport.dispatcher.DuccEventDispatcher; |
| import org.apache.uima.ducc.transport.event.DuccEvent; |
| import org.apache.uima.ducc.transport.event.DuccJobsStateEvent; |
| import org.apache.uima.ducc.transport.event.PmStateDuccEvent; |
| import org.apache.uima.ducc.transport.event.common.DuccJobDeployment; |
| import org.apache.uima.ducc.transport.event.common.DuccUserReservation; |
| import org.apache.uima.ducc.transport.event.common.DuccWorkJob; |
| import org.apache.uima.ducc.transport.event.common.DuccWorkPop; |
| import org.apache.uima.ducc.transport.event.common.DuccWorkPopDriver; |
| import org.apache.uima.ducc.transport.event.common.DuccWorkReservation; |
| import org.apache.uima.ducc.transport.event.common.IDuccJobDeployment; |
| import org.apache.uima.ducc.transport.event.common.IDuccProcess; |
| import org.apache.uima.ducc.transport.event.common.IDuccReservationMap; |
| import org.apache.uima.ducc.transport.event.common.IDuccUnits.MemoryUnits; |
| import org.apache.uima.ducc.transport.event.common.IDuccWork; |
| import org.apache.uima.ducc.transport.event.common.IDuccWorkExecutable; |
| import org.apache.uima.ducc.transport.event.common.IDuccWorkJob; |
| import org.apache.uima.ducc.transport.event.common.ProcessMemoryAssignment; |
| |
| /** |
| * The ProcessManager's main role is to receive Orchestrator updates, trim received state and |
| * publish a new state to the agents. Trimming is done to remove state that is irrelevant to |
| * the agents. |
| */ |
| public class ProcessManagerComponent extends AbstractDuccComponent |
| implements ProcessManager { |
| private static final String[] jobHeaderArray= |
| {"DuccId","ProcessType","PID","ProcessState","ResourceState","NodeIP","NodeName","DeallocationType","JMX Url"}; |
| private static final String jobHeaderFormat = |
| "%1$-15s|%2$-20s|%3$-10s|%4$-15s|%5$-15s|%6$-13s|%7$-45s|%8$-25s|%9$-45s"; |
| |
| private static String header; |
| private static String tbl=String.format("%1$-158s"," ").replace(" ", "-"); |
| public static DuccLogger logger = new DuccLogger(ProcessManagerComponent.class, DuccComponent); |
| |
| private static DuccWorkHelper dwHelper = null; |
| |
| // Dispatch component used to send messages to remote Agents |
| private DuccEventDispatcher eventDispatcher; |
| private int shareQuantum; |
| private int fudgeFactor = 5; // default 5% |
| |
| public ProcessManagerComponent(CamelContext context, DuccEventDispatcher eventDispatcher) { |
| super("ProcessManager",context); |
| this.eventDispatcher = eventDispatcher; |
| if ( System.getProperty("ducc.rm.share.quantum") != null && System.getProperty("ducc.rm.share.quantum").trim().length() > 0 ) { |
| shareQuantum = Integer.parseInt(System.getProperty("ducc.rm.share.quantum").trim()); |
| } |
| if ( System.getProperty("ducc.agent.share.size.fudge.factor") != null && |
| System.getProperty("ducc.agent.share.size.fudge.factor").trim().length() > 0) { |
| fudgeFactor = Integer.parseInt(System.getProperty("ducc.agent.share.size.fudge.factor").trim()); |
| } |
| |
| header = |
| String.format(jobHeaderFormat,jobHeaderArray[0],jobHeaderArray[1],jobHeaderArray[2], |
| jobHeaderArray[3],jobHeaderArray[4],jobHeaderArray[5],jobHeaderArray[6], |
| jobHeaderArray[7],jobHeaderArray[8]+"\n"); |
| dwHelper = new DuccWorkHelper(); |
| } |
| public void start(DuccService service) throws Exception { |
| super.start(service, null); |
| DuccDaemonRuntimeProperties.getInstance().boot(DaemonName.ProcessManager,getProcessJmxUrl()); |
| logger.info("start", null, "--PM started - jmx URL:"+super.getProcessJmxUrl()); |
| |
| } |
| public DuccLogger getLogger() { |
| return logger; |
| } |
| |
| /* New Code */ |
| |
| private long normalizeMemory(String processMemoryAssignment, MemoryUnits units) { |
| // Get user defined memory assignment for the JP |
| long normalizedProcessMemoryRequirements = |
| Long.parseLong(processMemoryAssignment); |
| // Normalize memory requirements for JPs into Gigs |
| if ( units.equals(MemoryUnits.KB ) ) { |
| normalizedProcessMemoryRequirements = (int)normalizedProcessMemoryRequirements/(1024*1024); |
| } else if ( units.equals(MemoryUnits.MB ) ) { |
| normalizedProcessMemoryRequirements = (int)normalizedProcessMemoryRequirements/1024; |
| } else if ( units.equals(MemoryUnits.GB ) ) { |
| // already normalized |
| } else if ( units.equals(MemoryUnits.TB ) ) { |
| normalizedProcessMemoryRequirements = (int)normalizedProcessMemoryRequirements*1024; |
| } |
| return normalizedProcessMemoryRequirements; |
| } |
| private int getShares(long normalizedProcessMemoryRequirements ) { |
| int shares = (int)normalizedProcessMemoryRequirements/shareQuantum; // get number of shares |
| if ( (normalizedProcessMemoryRequirements % shareQuantum) > 0 ) shares++; // ciel |
| return shares; |
| } |
| |
| private String getCmdLine(ICommandLine iCommandLine) { |
| StringBuffer sb = new StringBuffer(); |
| if(iCommandLine != null) { |
| String[] commandLine = iCommandLine.getCommandLine(); |
| if(commandLine != null) { |
| for(String item : commandLine) { |
| sb.append(item); |
| sb.append(" "); |
| } |
| } |
| } |
| return sb.toString(); |
| } |
| |
| public void dispatchStateUpdateToAgents(Map<DuccId, IDuccWork> workMap, long sequence) { |
| String methodName="dispatchStateUpdateToAgents"; |
| try { |
| dumpState(workMap); |
| // Create a job list which the PM will dispatch to agents |
| List<IDuccJobDeployment> jobDeploymentList = new ArrayList<IDuccJobDeployment>(); |
| List<DuccUserReservation> reservationList = new ArrayList<DuccUserReservation>(); |
| |
| // Populate job list with job data that agents need. Don't copy data the agent doesnt |
| // care about. |
| for( Entry<DuccId, IDuccWork> entry : workMap.entrySet() ) { |
| if ( entry.getValue() instanceof DuccWorkJob ) { |
| DuccWorkJob dcj = (DuccWorkJob)entry.getValue(); |
| // Create process list for each job |
| List<IDuccProcess> jobProcessList = new ArrayList<IDuccProcess>(); |
| |
| long normalizedProcessMemoryRequirements = normalizeMemory(dcj.getSchedulingInfo().getShareMemorySize(),dcj.getSchedulingInfo().getShareMemoryUnits()); |
| int shares = getShares(normalizedProcessMemoryRequirements); |
| long processAdjustedMemorySize = shares * shareQuantum * 1024; |
| ProcessMemoryAssignment pma = new ProcessMemoryAssignment(); |
| pma.setShares(shares); |
| pma.setNormalizedMemoryInMBs(processAdjustedMemorySize); |
| |
| |
| // Copy job processes |
| for( Entry<DuccId,IDuccProcess> jpProcess : dcj.getProcessMap().getMap().entrySet()) { |
| jobProcessList.add(jpProcess.getValue()); |
| } |
| |
| |
| /* |
| if ( dcj.getUimaDeployableConfiguration() instanceof DuccUimaDeploymentDescriptor ) { |
| // Add deployment UIMA AS deployment descriptor path |
| ((JavaCommandLine)dcj.getCommandLine()). |
| addArgument(((DuccUimaDeploymentDescriptor)dcj.getUimaDeployableConfiguration()).getDeploymentDescriptorPath()); |
| } |
| */ |
| // add fudge factor (5% default) to adjust memory computed above |
| processAdjustedMemorySize += (processAdjustedMemorySize * ((double)fudgeFactor/100)); |
| pma.setMaxMemoryWithFudge(processAdjustedMemorySize); |
| |
| logger.debug(methodName,dcj.getDuccId(),"--------------- User Requested Memory For Process:"+dcj.getSchedulingInfo().getShareMemorySize()+dcj.getSchedulingInfo().getShareMemoryUnits()+" PM Calculated Memory Assignment of:"+processAdjustedMemorySize); |
| |
| ICommandLine driverCmdLine = null; |
| ICommandLine processCmdLine = null; |
| IDuccProcess driverProcess = null; |
| IDuccWork dw = null; |
| |
| switch(dcj.getDuccType()) { |
| case Job: |
| logger.debug(methodName, dcj.getDuccId(), "case: Job"); |
| dw = dwHelper.fetch(dcj.getDuccId()); |
| IDuccWorkJob job = (IDuccWorkJob) dw; |
| DuccWorkPopDriver driver = job.getDriver(); |
| if(driver != null) { |
| driverCmdLine = driver.getCommandLine(); |
| driverProcess = driver.getProcessMap().entrySet().iterator().next().getValue(); |
| } |
| processCmdLine = job.getCommandLine(); |
| break; |
| case Service: |
| logger.debug(methodName, dcj.getDuccId(), "case: Service"); |
| dw = dwHelper.fetch(dcj.getDuccId()); |
| IDuccWorkJob service = (IDuccWorkJob) dw; |
| processCmdLine = service.getCommandLine(); |
| processCmdLine.addOption("-Dducc.deploy.components=service"); |
| break; |
| default: |
| logger.debug(methodName, dcj.getDuccId(), "case: default"); |
| dw = dwHelper.fetch(dcj.getDuccId()); |
| if(dw instanceof IDuccWorkExecutable) { |
| IDuccWorkExecutable dwe = (IDuccWorkExecutable) dw; |
| processCmdLine = dwe.getCommandLine(); |
| } |
| break; |
| } |
| |
| String dText = "n/a"; |
| if(driverCmdLine != null) { |
| dText = getCmdLine(driverCmdLine); |
| } |
| logger.trace(methodName, dcj.getDuccId(), "driver: "+dText); |
| |
| String pText = "n/a"; |
| if(processCmdLine != null) { |
| pText = getCmdLine(processCmdLine); |
| } |
| logger.trace(methodName, dcj.getDuccId(), "process: "+pText); |
| |
| jobDeploymentList.add( new DuccJobDeployment(dcj.getDuccId(), driverCmdLine, |
| processCmdLine, |
| dcj.getStandardInfo(), |
| driverProcess, |
| pma, |
| //processAdjustedMemorySize, |
| jobProcessList )); |
| } else if (entry.getValue() instanceof DuccWorkReservation ) { |
| String userId = ((DuccWorkReservation) entry.getValue()).getStandardInfo().getUser(); |
| if ( !"System".equals(userId)) { |
| IDuccReservationMap reservationMap = |
| ((DuccWorkReservation) entry.getValue()).getReservationMap(); |
| reservationList.add(new DuccUserReservation(userId, ((DuccWorkReservation) entry.getValue()).getDuccId(), reservationMap)); |
| logger.debug(methodName,null,"--------------- Added reservation for user:"+userId); |
| } |
| } |
| } |
| logger.info(methodName, null , "---- PM Dispatching DuccJobsStateEvent request to Agent(s) - State Map Size:"+jobDeploymentList.size()+" Reservation List:"+reservationList.size()); |
| DuccJobsStateEvent ev = new DuccJobsStateEvent(DuccEvent.EventType.PM_STATE, jobDeploymentList, reservationList); |
| ev.setSequence(sequence); |
| // Dispatch state update to agents |
| eventDispatcher.dispatch(ev); |
| logger.debug(methodName, null , "+++++ PM Dispatched State To Agent(s)"); |
| } catch( Throwable t ) { |
| logger.error(methodName,null,t); |
| } |
| } |
| |
| private String formatProcess( IDuccProcess process ) { |
| return String.format(jobHeaderFormat, |
| String.valueOf(process.getDuccId().getFriendly()), |
| process.getProcessType().toString(), |
| (process.getPID()==null? "" :process.getPID()), |
| process.getProcessState().toString(), |
| process.getResourceState().toString(), |
| process.getNodeIdentity().getIp(), |
| process.getNodeIdentity().getName(), |
| process.getProcessDeallocationType().toString(), |
| (process.getProcessJmxUrl() == null ? "N/A" : process.getProcessJmxUrl() )); |
| |
| } |
| private void dumpState(Map<DuccId, IDuccWork> workMap) { |
| String methodName="dumpState"; |
| try { |
| StringBuffer sb = new StringBuffer(); |
| for( Entry<DuccId,IDuccWork> job : workMap.entrySet()) { |
| IDuccWork duccWork = job.getValue(); |
| if ( duccWork instanceof DuccWorkJob ) { |
| DuccWorkJob duccWorkJob = (DuccWorkJob)duccWork; |
| sb.append("\n").append(tbl). |
| append("\nJob ID: ").append(duccWorkJob.getDuccId().getFriendly()). |
| append("\tJobState: ").append(duccWorkJob.getStateObject()). |
| append("\tJobSubmittedBy: ").append(duccWorkJob.getStandardInfo().getUser()). |
| append("\n\n"). |
| append(header).append(tbl).append("\n"); |
| DuccWorkPopDriver driver = duccWorkJob.getDriver(); |
| if(driver != null) { |
| IDuccProcess driverProcess = |
| driver.getProcessMap().entrySet().iterator().next().getValue(); |
| sb.append(formatProcess(driverProcess)); |
| } |
| for(Entry<DuccId,IDuccProcess> process : ((DuccWorkJob)job.getValue()).getProcessMap().entrySet()) { |
| sb.append("\n").append(formatProcess(process.getValue())); |
| } |
| sb.append("\n").append(tbl).append("\n"); |
| logger.info(methodName, null, sb.toString()); |
| } else if ( job.getValue() instanceof DuccWorkReservation ) { |
| continue; // TBI |
| } else if ( job.getValue() instanceof DuccWorkPop ) { |
| continue; // TBI |
| } else { |
| logger.info(methodName, job.getKey(), "Not a WorkJob but "+job.getClass().getName()); |
| } |
| } |
| |
| } catch( Exception e) { |
| e.printStackTrace(); |
| } |
| } |
| /** |
| * Override |
| */ |
| public void setLogLevel(String clz, String level) { |
| logger.info("setLogLevel",null,"--------- Changing Log Level to:"+level+ " For class:"+clz); |
| super.setLogLevel(clz, level); |
| } |
| public void setLogLevel(String level) { |
| logger.info("setLogLevel",null,"--------- Changing Log Level to:"+level+ " For class:"+getClass().getCanonicalName()); |
| super.setLogLevel(getClass().getCanonicalName(), level); |
| } |
| public String getLogLevel() { |
| return super.getLogLevel(); |
| } |
| public void logAtTraceLevel(String toLog, String methodName) { |
| if ( logger.isTrace()) { |
| logger.trace(methodName,null,"--------- "+ toLog); |
| } |
| } |
| |
| |
| public PmStateDuccEvent getState() { |
| String methodName = "PmStateDuccEvent"; |
| logger.trace(methodName,null,""); |
| return new PmStateDuccEvent(); |
| } |
| |
| } |