| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.yarn.sls; |
| |
| import java.io.File; |
| import java.io.FileInputStream; |
| import java.io.IOException; |
| import java.io.InputStreamReader; |
| import java.io.Reader; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Random; |
| import java.util.Set; |
| import java.util.Collections; |
| import java.util.concurrent.ConcurrentHashMap; |
| |
| import com.fasterxml.jackson.core.JsonFactory; |
| import com.fasterxml.jackson.databind.ObjectMapper; |
| |
| import org.apache.commons.cli.CommandLine; |
| import org.apache.commons.cli.CommandLineParser; |
| import org.apache.commons.cli.GnuParser; |
| import org.apache.commons.cli.Options; |
| import org.apache.commons.cli.ParseException; |
| import org.apache.hadoop.classification.InterfaceAudience.Private; |
| import org.apache.hadoop.classification.InterfaceStability.Unstable; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.conf.Configured; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.metrics2.source.JvmMetrics; |
| import org.apache.hadoop.tools.rumen.JobTraceReader; |
| import org.apache.hadoop.tools.rumen.LoggedJob; |
| import org.apache.hadoop.tools.rumen.LoggedTask; |
| import org.apache.hadoop.tools.rumen.LoggedTaskAttempt; |
| import org.apache.hadoop.util.ReflectionUtils; |
| import org.apache.hadoop.util.Tool; |
| import org.apache.hadoop.util.ToolRunner; |
| import org.apache.hadoop.yarn.api.records.ExecutionType; |
| import org.apache.hadoop.yarn.api.records.NodeId; |
| import org.apache.hadoop.yarn.api.records.NodeState; |
| import org.apache.hadoop.yarn.api.records.ReservationId; |
| import org.apache.hadoop.yarn.api.records.Resource; |
| import org.apache.hadoop.yarn.api.records.ResourceInformation; |
| import org.apache.hadoop.yarn.conf.YarnConfiguration; |
| import org.apache.hadoop.yarn.exceptions.YarnException; |
| import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; |
| import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher; |
| import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler; |
| import org.apache.hadoop.yarn.sls.appmaster.AMSimulator; |
| import org.apache.hadoop.yarn.sls.conf.SLSConfiguration; |
| import org.apache.hadoop.yarn.sls.nodemanager.NMSimulator; |
| import org.apache.hadoop.yarn.sls.resourcemanager.MockAMLauncher; |
| import org.apache.hadoop.yarn.sls.scheduler.SLSCapacityScheduler; |
| import org.apache.hadoop.yarn.sls.scheduler.TaskRunner; |
| import org.apache.hadoop.yarn.sls.scheduler.SLSFairScheduler; |
| import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator; |
| import org.apache.hadoop.yarn.sls.scheduler.SchedulerWrapper; |
| import org.apache.hadoop.yarn.sls.synthetic.SynthJob; |
| import org.apache.hadoop.yarn.sls.synthetic.SynthTraceJobProducer; |
| import org.apache.hadoop.yarn.sls.utils.SLSUtils; |
| import org.apache.hadoop.yarn.util.UTCClock; |
| import org.apache.hadoop.yarn.util.resource.ResourceUtils; |
| import org.apache.hadoop.yarn.util.resource.Resources; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| @Private |
| @Unstable |
| public class SLSRunner extends Configured implements Tool { |
| // RM, Runner |
| private ResourceManager rm; |
| private static TaskRunner runner = new TaskRunner(); |
| private String[] inputTraces; |
| private Map<String, Integer> queueAppNumMap; |
| |
| // NM simulator |
| private HashMap<NodeId, NMSimulator> nmMap; |
| private Resource nodeManagerResource; |
| private String nodeFile; |
| |
| // AM simulator |
| private int AM_ID; |
| private Map<String, AMSimulator> amMap; |
| private Set<String> trackedApps; |
| private Map<String, Class> amClassMap; |
| private static int remainingApps = 0; |
| |
| // metrics |
| private String metricsOutputDir; |
| private boolean printSimulation; |
| |
| // other simulation information |
| private int numNMs, numRacks, numAMs, numTasks; |
| private long maxRuntime; |
| |
| private final static Map<String, Object> simulateInfoMap = |
| new HashMap<String, Object>(); |
| |
| // logger |
| public final static Logger LOG = LoggerFactory.getLogger(SLSRunner.class); |
| |
| private final static int DEFAULT_MAPPER_PRIORITY = 20; |
| private final static int DEFAULT_REDUCER_PRIORITY = 10; |
| |
| private static boolean exitAtTheFinish = false; |
| |
| /** |
| * The type of trace in input. |
| */ |
| public enum TraceType { |
| SLS, RUMEN, SYNTH |
| } |
| |
| private TraceType inputType; |
| private SynthTraceJobProducer stjp; |
| |
| public SLSRunner() throws ClassNotFoundException { |
| Configuration tempConf = new Configuration(false); |
| init(tempConf); |
| } |
| |
  /**
   * Creates a runner initialized from the given configuration.
   *
   * @param tempConf base configuration for the run
   * @throws ClassNotFoundException if a configured AM simulator class
   *         cannot be resolved during initialization
   */
  public SLSRunner(Configuration tempConf) throws ClassNotFoundException {
    init(tempConf);
  }
| |
| @Override |
| public void setConf(Configuration conf) { |
| if (null != conf) { |
| // Override setConf to make sure all conf added load sls-runner.xml, see |
| // YARN-6560 |
| conf.addResource("sls-runner.xml"); |
| } |
| super.setConf(conf); |
| } |
| |
| private void init(Configuration tempConf) throws ClassNotFoundException { |
| nmMap = new HashMap<>(); |
| queueAppNumMap = new HashMap<>(); |
| amMap = new ConcurrentHashMap<>(); |
| amClassMap = new HashMap<>(); |
| |
| // runner configuration |
| setConf(tempConf); |
| |
| // runner |
| int poolSize = tempConf.getInt(SLSConfiguration.RUNNER_POOL_SIZE, |
| SLSConfiguration.RUNNER_POOL_SIZE_DEFAULT); |
| SLSRunner.runner.setQueueSize(poolSize); |
| // <AMType, Class> map |
| for (Map.Entry e : tempConf) { |
| String key = e.getKey().toString(); |
| if (key.startsWith(SLSConfiguration.AM_TYPE_PREFIX)) { |
| String amType = key.substring(SLSConfiguration.AM_TYPE_PREFIX.length()); |
| amClassMap.put(amType, Class.forName(tempConf.get(key))); |
| } |
| } |
| |
| nodeManagerResource = getNodeManagerResource(); |
| } |
| |
| private Resource getNodeManagerResource() { |
| Resource resource = Resources.createResource(0); |
| ResourceInformation[] infors = ResourceUtils.getResourceTypesArray(); |
| for (ResourceInformation info : infors) { |
| long value; |
| if (info.getName().equals(ResourceInformation.MEMORY_URI)) { |
| value = getConf().getInt(SLSConfiguration.NM_MEMORY_MB, |
| SLSConfiguration.NM_MEMORY_MB_DEFAULT); |
| } else if (info.getName().equals(ResourceInformation.VCORES_URI)) { |
| value = getConf().getInt(SLSConfiguration.NM_VCORES, |
| SLSConfiguration.NM_VCORES_DEFAULT); |
| } else { |
| value = getConf().getLong(SLSConfiguration.NM_PREFIX + |
| info.getName(), SLSConfiguration.NM_RESOURCE_DEFAULT); |
| } |
| |
| resource.setResourceValue(info.getName(), value); |
| } |
| |
| return resource; |
| } |
| |
  /**
   * Returns a read-only view of the simulation summary populated by
   * {@code printSimulationInfo()} (node/app/task counts, estimated runtime).
   *
   * @return an unmodifiable view of the simulated info map
   */
  public static Map<String, Object> getSimulateInfoMap() {
    return Collections.unmodifiableMap(simulateInfoMap);
  }
| |
| public void setSimulationParams(TraceType inType, String[] inTraces, |
| String nodes, String outDir, Set<String> trackApps, |
| boolean printsimulation) throws IOException, ClassNotFoundException { |
| |
| this.inputType = inType; |
| this.inputTraces = inTraces.clone(); |
| this.nodeFile = nodes; |
| this.trackedApps = trackApps; |
| this.printSimulation = printsimulation; |
| metricsOutputDir = outDir; |
| |
| } |
| |
  /**
   * Runs the full simulation bring-up: RM first, then NMs, then AMs; wires
   * the queue and tracked-app sets into the scheduler's tracker, optionally
   * prints the simulation summary, blocks until every NM reaches RUNNING,
   * and finally starts the shared task runner that drives all simulators.
   */
  public void start() throws IOException, ClassNotFoundException, YarnException,
      InterruptedException {
    // start resource manager
    startRM();
    // start node managers
    startNM();
    // start application masters
    startAM();
    // set queue & tracked apps information
    ((SchedulerWrapper) rm.getResourceScheduler()).getTracker()
        .setQueueSet(this.queueAppNumMap.keySet());
    ((SchedulerWrapper) rm.getResourceScheduler()).getTracker()
        .setTrackedAppSet(this.trackedApps);
    // print out simulation info
    printSimulationInfo();
    // blocked until all nodes RUNNING
    waitForNodesRunning();
    // starting the runner once everything is ready to go,
    runner.start();
  }
| |
| private void startRM() throws ClassNotFoundException, YarnException { |
| Configuration rmConf = new YarnConfiguration(getConf()); |
| String schedulerClass = rmConf.get(YarnConfiguration.RM_SCHEDULER); |
| |
| if (Class.forName(schedulerClass) == CapacityScheduler.class) { |
| rmConf.set(YarnConfiguration.RM_SCHEDULER, |
| SLSCapacityScheduler.class.getName()); |
| rmConf.setBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, true); |
| rmConf.set(YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES, |
| ProportionalCapacityPreemptionPolicy.class.getName()); |
| } else if (Class.forName(schedulerClass) == FairScheduler.class) { |
| rmConf.set(YarnConfiguration.RM_SCHEDULER, |
| SLSFairScheduler.class.getName()); |
| } else if (Class.forName(schedulerClass) == FifoScheduler.class) { |
| // TODO add support for FifoScheduler |
| throw new YarnException("Fifo Scheduler is not supported yet."); |
| } |
| |
| rmConf.set(SLSConfiguration.METRICS_OUTPUT_DIR, metricsOutputDir); |
| |
| final SLSRunner se = this; |
| rm = new ResourceManager() { |
| @Override |
| protected ApplicationMasterLauncher createAMLauncher() { |
| return new MockAMLauncher(se, this.rmContext, amMap); |
| } |
| }; |
| |
| // Across runs of parametrized tests, the JvmMetrics objects is retained, |
| // but is not registered correctly |
| JvmMetrics jvmMetrics = JvmMetrics.initSingleton("ResourceManager", null); |
| jvmMetrics.registerIfNeeded(); |
| |
| // Init and start the actual ResourceManager |
| rm.init(rmConf); |
| rm.start(); |
| } |
| |
  /**
   * Creates and schedules one NMSimulator per node. The node set comes
   * either from the topology file ({@code nodeFile}) or is derived from the
   * input traces; heartbeat start offsets are randomized across one interval
   * so simulated heartbeats do not arrive in lock-step.
   *
   * @throws YarnException if no nodes are configured or the trace type is
   *         unrecognized
   * @throws IOException if a trace or topology file cannot be read
   */
  private void startNM() throws YarnException, IOException {
    // nm configuration
    int heartbeatInterval = getConf().getInt(
        SLSConfiguration.NM_HEARTBEAT_INTERVAL_MS,
        SLSConfiguration.NM_HEARTBEAT_INTERVAL_MS_DEFAULT);
    float resourceUtilizationRatio = getConf().getFloat(
        SLSConfiguration.NM_RESOURCE_UTILIZATION_RATIO,
        SLSConfiguration.NM_RESOURCE_UTILIZATION_RATIO_DEFAULT);
    // nm information (fetch from topology file, or from sls/rumen json file)
    Map<String, Resource> nodeResourceMap = new HashMap<>();
    Set<? extends String> nodeSet;
    if (nodeFile.isEmpty()) {
      for (String inputTrace : inputTraces) {
        switch (inputType) {
        case SLS:
          nodeSet = SLSUtils.parseNodesFromSLSTrace(inputTrace);
          for (String node : nodeSet) {
            // null value => node uses the default nodeManagerResource below
            nodeResourceMap.put(node, null);
          }
          break;
        case RUMEN:
          nodeSet = SLSUtils.parseNodesFromRumenTrace(inputTrace);
          for (String node : nodeSet) {
            nodeResourceMap.put(node, null);
          }
          break;
        case SYNTH:
          // NOTE(review): always reads inputTraces[0], so any additional
          // SYNTH traces only repeat the same node generation — confirm
          // multi-trace SYNTH input is intended
          stjp = new SynthTraceJobProducer(getConf(), new Path(inputTraces[0]));
          nodeSet = SLSUtils.generateNodes(stjp.getNumNodes(),
              stjp.getNumNodes()/stjp.getNodesPerRack());
          for (String node : nodeSet) {
            nodeResourceMap.put(node, null);
          }
          break;
        default:
          throw new YarnException("Input configuration not recognized, "
              + "trace type should be SLS, RUMEN, or SYNTH");
        }
      }
    } else {
      nodeResourceMap = SLSUtils.parseNodesFromNodeFile(nodeFile,
          nodeManagerResource);
    }

    if (nodeResourceMap.size() == 0) {
      throw new YarnException("No node! Please configure nodes.");
    }

    // create NM simulators
    Random random = new Random();
    Set<String> rackSet = new HashSet<String>();
    for (Map.Entry<String, Resource> entry : nodeResourceMap.entrySet()) {
      // we randomize the heartbeat start time from zero to 1 interval
      NMSimulator nm = new NMSimulator();
      Resource nmResource = nodeManagerResource;
      String hostName = entry.getKey();
      if (entry.getValue() != null) {
        // topology file supplied an explicit capability for this node
        nmResource = entry.getValue();
      }
      nm.init(hostName, nmResource, random.nextInt(heartbeatInterval),
          heartbeatInterval, rm, resourceUtilizationRatio);
      nmMap.put(nm.getNode().getNodeID(), nm);
      runner.schedule(nm);
      rackSet.add(nm.getNode().getRackName());
    }
    numRacks = rackSet.size();
    numNMs = nmMap.size();
  }
| |
| private void waitForNodesRunning() throws InterruptedException { |
| long startTimeMS = System.currentTimeMillis(); |
| while (true) { |
| int numRunningNodes = 0; |
| for (RMNode node : rm.getRMContext().getRMNodes().values()) { |
| if (node.getState() == NodeState.RUNNING) { |
| numRunningNodes++; |
| } |
| } |
| if (numRunningNodes == numNMs) { |
| break; |
| } |
| LOG.info("SLSRunner is waiting for all nodes RUNNING." |
| + " {} of {} NMs initialized.", numRunningNodes, numNMs); |
| Thread.sleep(1000); |
| } |
| LOG.info("SLSRunner takes {} ms to launch all nodes.", |
| System.currentTimeMillis() - startTimeMS); |
| } |
| |
  /**
   * Creates the AM simulators for every job found in the configured traces
   * and records the total AM count for completion accounting.
   *
   * @throws YarnException if the trace type is unrecognized or a trace job
   *         is invalid
   * @throws IOException if a trace cannot be read
   */
  @SuppressWarnings("unchecked")
  private void startAM() throws YarnException, IOException {
    switch (inputType) {
    case SLS:
      for (String inputTrace : inputTraces) {
        startAMFromSLSTrace(inputTrace);
      }
      break;
    case RUMEN:
      // NOTE(review): baselineTimeMS is never updated here, so a 0 baseline
      // is passed per trace and createAMForJob re-baselines each job to its
      // own submit time — confirm cross-job alignment is not expected
      long baselineTimeMS = 0;
      for (String inputTrace : inputTraces) {
        startAMFromRumenTrace(inputTrace, baselineTimeMS);
      }
      break;
    case SYNTH:
      startAMFromSynthGenerator();
      break;
    default:
      throw new YarnException("Input configuration not recognized, "
          + "trace type should be SLS, RUMEN, or SYNTH");
    }

    numAMs = amMap.size();
    remainingApps = numAMs;
  }
| |
| /** |
| * Parse workload from a SLS trace file. |
| */ |
| @SuppressWarnings("unchecked") |
| private void startAMFromSLSTrace(String inputTrace) throws IOException { |
| JsonFactory jsonF = new JsonFactory(); |
| ObjectMapper mapper = new ObjectMapper(); |
| |
| try (Reader input = new InputStreamReader( |
| new FileInputStream(inputTrace), "UTF-8")) { |
| Iterator<Map> jobIter = mapper.readValues( |
| jsonF.createParser(input), Map.class); |
| |
| while (jobIter.hasNext()) { |
| try { |
| createAMForJob(jobIter.next()); |
| } catch (Exception e) { |
| LOG.error("Failed to create an AM: {}", e.getMessage()); |
| } |
| } |
| } |
| } |
| |
  /**
   * Builds and schedules AM simulator(s) for one SLS-trace job object.
   * Honors the job.count field by launching multiple identical instances,
   * in which case the trace-provided job id is dropped so ids stay unique.
   *
   * @param jsonJob parsed JSON map describing one job
   * @throws YarnException if the job has no tasks or a task is invalid
   */
  private void createAMForJob(Map jsonJob) throws YarnException {
    // job start time is mandatory; finish time defaults to 0 when absent
    long jobStartTime = Long.parseLong(
        jsonJob.get(SLSConfiguration.JOB_START_MS).toString());

    long jobFinishTime = 0;
    if (jsonJob.containsKey(SLSConfiguration.JOB_END_MS)) {
      jobFinishTime = Long.parseLong(
          jsonJob.get(SLSConfiguration.JOB_END_MS).toString());
    }

    String user = (String) jsonJob.get(SLSConfiguration.JOB_USER);
    if (user == null) {
      user = "default";
    }

    String queue = jsonJob.get(SLSConfiguration.JOB_QUEUE_NAME).toString();
    increaseQueueAppNum(queue);

    String amType = (String)jsonJob.get(SLSConfiguration.AM_TYPE);
    if (amType == null) {
      amType = SLSUtils.DEFAULT_JOB_TYPE;
    }

    // number of identical instances of this job to launch (minimum 1)
    int jobCount = 1;
    if (jsonJob.containsKey(SLSConfiguration.JOB_COUNT)) {
      jobCount = Integer.parseInt(
          jsonJob.get(SLSConfiguration.JOB_COUNT).toString());
    }
    jobCount = Math.max(jobCount, 1);

    String oldAppId = (String)jsonJob.get(SLSConfiguration.JOB_ID);
    // Job id is generated automatically if this job configuration allows
    // multiple job instances
    if(jobCount > 1) {
      oldAppId = null;
    }

    for (int i = 0; i < jobCount; i++) {
      runNewAM(amType, user, queue, oldAppId, jobStartTime, jobFinishTime,
          getTaskContainers(jsonJob), getAMContainerResource(jsonJob));
    }
  }
| |
| private List<ContainerSimulator> getTaskContainers(Map jsonJob) |
| throws YarnException { |
| List<ContainerSimulator> containers = new ArrayList<>(); |
| List tasks = (List) jsonJob.get(SLSConfiguration.JOB_TASKS); |
| if (tasks == null || tasks.size() == 0) { |
| throw new YarnException("No task for the job!"); |
| } |
| |
| for (Object o : tasks) { |
| Map jsonTask = (Map) o; |
| |
| String hostname = (String) jsonTask.get(SLSConfiguration.TASK_HOST); |
| |
| long duration = 0; |
| if (jsonTask.containsKey(SLSConfiguration.TASK_DURATION_MS)) { |
| duration = Integer.parseInt( |
| jsonTask.get(SLSConfiguration.TASK_DURATION_MS).toString()); |
| } else if (jsonTask.containsKey(SLSConfiguration.DURATION_MS)) { |
| // Also support "duration.ms" for backward compatibility |
| duration = Integer.parseInt( |
| jsonTask.get(SLSConfiguration.DURATION_MS).toString()); |
| } else if (jsonTask.containsKey(SLSConfiguration.TASK_START_MS) && |
| jsonTask.containsKey(SLSConfiguration.TASK_END_MS)) { |
| long taskStart = Long.parseLong( |
| jsonTask.get(SLSConfiguration.TASK_START_MS).toString()); |
| long taskFinish = Long.parseLong( |
| jsonTask.get(SLSConfiguration.TASK_END_MS).toString()); |
| duration = taskFinish - taskStart; |
| } |
| if (duration <= 0) { |
| throw new YarnException("Duration of a task shouldn't be less or equal" |
| + " to 0!"); |
| } |
| |
| Resource res = getResourceForContainer(jsonTask); |
| |
| int priority = DEFAULT_MAPPER_PRIORITY; |
| if (jsonTask.containsKey(SLSConfiguration.TASK_PRIORITY)) { |
| priority = Integer.parseInt( |
| jsonTask.get(SLSConfiguration.TASK_PRIORITY).toString()); |
| } |
| |
| String type = "map"; |
| if (jsonTask.containsKey(SLSConfiguration.TASK_TYPE)) { |
| type = jsonTask.get(SLSConfiguration.TASK_TYPE).toString(); |
| } |
| |
| int count = 1; |
| if (jsonTask.containsKey(SLSConfiguration.COUNT)) { |
| count = Integer.parseInt( |
| jsonTask.get(SLSConfiguration.COUNT).toString()); |
| } |
| count = Math.max(count, 1); |
| |
| ExecutionType executionType = ExecutionType.GUARANTEED; |
| if (jsonTask.containsKey(SLSConfiguration.TASK_EXECUTION_TYPE)) { |
| executionType = ExecutionType.valueOf( |
| jsonTask.get(SLSConfiguration.TASK_EXECUTION_TYPE).toString()); |
| } |
| for (int i = 0; i < count; i++) { |
| containers.add( |
| new ContainerSimulator(res, duration, hostname, priority, type, |
| executionType)); |
| } |
| } |
| |
| return containers; |
| } |
| |
| private Resource getResourceForContainer(Map jsonTask) { |
| Resource res = getDefaultContainerResource(); |
| ResourceInformation[] infors = ResourceUtils.getResourceTypesArray(); |
| for (ResourceInformation info : infors) { |
| if (jsonTask.containsKey(SLSConfiguration.TASK_PREFIX + info.getName())) { |
| long value = Long.parseLong( |
| jsonTask.get(SLSConfiguration.TASK_PREFIX + info.getName()) |
| .toString()); |
| res.setResourceValue(info.getName(), value); |
| } |
| } |
| |
| return res; |
| } |
| |
| /** |
| * Parse workload from a rumen trace file. |
| */ |
| @SuppressWarnings("unchecked") |
| private void startAMFromRumenTrace(String inputTrace, long baselineTimeMS) |
| throws IOException { |
| Configuration conf = new Configuration(); |
| conf.set("fs.defaultFS", "file:///"); |
| File fin = new File(inputTrace); |
| |
| try (JobTraceReader reader = new JobTraceReader( |
| new Path(fin.getAbsolutePath()), conf)) { |
| LoggedJob job = reader.getNext(); |
| |
| while (job != null) { |
| try { |
| createAMForJob(job, baselineTimeMS); |
| } catch (Exception e) { |
| LOG.error("Failed to create an AM: {}", e.getMessage()); |
| } |
| |
| job = reader.getNext(); |
| } |
| } |
| } |
| |
  /**
   * Builds and schedules an AM simulator for one rumen LoggedJob. Timestamps
   * are shifted by {@code baselineTimeMs} (or by the job's own submit time
   * when the baseline is 0) and negative start times are clamped to 0 while
   * preserving the job's duration. Only the default (MapReduce) AM type is
   * supported for rumen traces.
   *
   * @param job the rumen job record
   * @param baselineTimeMs baseline subtracted from submit/finish times
   * @throws YarnException if a map or reduce task has no attempts
   */
  private void createAMForJob(LoggedJob job, long baselineTimeMs)
      throws YarnException {
    String user = job.getUser() == null ? "default" :
        job.getUser().getValue();
    String jobQueue = job.getQueue().getValue();
    String oldJobId = job.getJobID().toString();
    long jobStartTimeMS = job.getSubmitTime();
    long jobFinishTimeMS = job.getFinishTime();
    if (baselineTimeMs == 0) {
      // no baseline supplied: rebase this job to its own submit time
      baselineTimeMs = job.getSubmitTime();
    }
    jobStartTimeMS -= baselineTimeMs;
    jobFinishTimeMS -= baselineTimeMs;
    if (jobStartTimeMS < 0) {
      LOG.warn("Warning: reset job {} start time to 0.", oldJobId);
      // shift the finish time by the same amount to keep the duration
      jobFinishTimeMS = jobFinishTimeMS - jobStartTimeMS;
      jobStartTimeMS = 0;
    }

    increaseQueueAppNum(jobQueue);

    List<ContainerSimulator> containerList = new ArrayList<>();
    // mapper: model each map task from its last recorded attempt
    for (LoggedTask mapTask : job.getMapTasks()) {
      if (mapTask.getAttempts().size() == 0) {
        throw new YarnException("Invalid map task, no attempt for a mapper!");
      }
      LoggedTaskAttempt taskAttempt =
          mapTask.getAttempts().get(mapTask.getAttempts().size() - 1);
      String hostname = taskAttempt.getHostName().getValue();
      long containerLifeTime = taskAttempt.getFinishTime() -
          taskAttempt.getStartTime();
      containerList.add(
          new ContainerSimulator(getDefaultContainerResource(),
              containerLifeTime, hostname, DEFAULT_MAPPER_PRIORITY, "map"));
    }

    // reducer: same scheme as mappers, with the reducer priority constant
    for (LoggedTask reduceTask : job.getReduceTasks()) {
      if (reduceTask.getAttempts().size() == 0) {
        throw new YarnException(
            "Invalid reduce task, no attempt for a reducer!");
      }
      LoggedTaskAttempt taskAttempt =
          reduceTask.getAttempts().get(reduceTask.getAttempts().size() - 1);
      String hostname = taskAttempt.getHostName().getValue();
      long containerLifeTime = taskAttempt.getFinishTime() -
          taskAttempt.getStartTime();
      containerList.add(
          new ContainerSimulator(getDefaultContainerResource(),
              containerLifeTime, hostname, DEFAULT_REDUCER_PRIORITY, "reduce"));
    }

    // Only supports the default job type currently
    runNewAM(SLSUtils.DEFAULT_JOB_TYPE, user, jobQueue, oldJobId,
        jobStartTimeMS, jobFinishTimeMS, containerList,
        getAMContainerResource(null));
  }
| |
| private Resource getDefaultContainerResource() { |
| int containerMemory = getConf().getInt(SLSConfiguration.CONTAINER_MEMORY_MB, |
| SLSConfiguration.CONTAINER_MEMORY_MB_DEFAULT); |
| int containerVCores = getConf().getInt(SLSConfiguration.CONTAINER_VCORES, |
| SLSConfiguration.CONTAINER_VCORES_DEFAULT); |
| return Resources.createResource(containerMemory, containerVCores); |
| } |
| |
  /**
   * parse workload information from synth-generator trace files.
   *
   * Jobs are pulled from the SynthTraceJobProducer until exhausted; the
   * first job's submission time becomes the baseline and every job's start
   * and finish are shifted relative to it (negative starts are clamped to 0
   * while preserving the duration). Task containers are placed on randomly
   * chosen simulated nodes; jobs with deadlines get a ReservationId.
   *
   * @throws YarnException propagated from queue/AM bookkeeping
   * @throws IOException if the synthetic trace cannot be read
   */
  @SuppressWarnings("unchecked")
  private void startAMFromSynthGenerator() throws YarnException, IOException {
    // NOTE(review): localConf appears unused in this method — confirm it
    // can be removed
    Configuration localConf = new Configuration();
    localConf.set("fs.defaultFS", "file:///");
    long baselineTimeMS = 0;

    // if we use the nodeFile this could have been not initialized yet.
    if (stjp == null) {
      stjp = new SynthTraceJobProducer(getConf(), new Path(inputTraces[0]));
    }

    SynthJob job = null;
    // we use stjp, a reference to the job producer instantiated during node
    // creation
    while ((job = (SynthJob) stjp.getNextJob()) != null) {
      // only support MapReduce currently
      String user = job.getUser();
      String jobQueue = job.getQueueName();
      String oldJobId = job.getJobID().toString();
      long jobStartTimeMS = job.getSubmissionTime();

      // CARLO: Finish time is only used for logging, omit for now
      long jobFinishTimeMS = jobStartTimeMS + job.getDuration();

      if (baselineTimeMS == 0) {
        // first job seen becomes the time baseline for the whole trace
        baselineTimeMS = jobStartTimeMS;
      }
      jobStartTimeMS -= baselineTimeMS;
      jobFinishTimeMS -= baselineTimeMS;
      if (jobStartTimeMS < 0) {
        LOG.warn("Warning: reset job {} start time to 0.", oldJobId);
        // shift the finish time by the same amount to keep the duration
        jobFinishTimeMS = jobFinishTimeMS - jobStartTimeMS;
        jobStartTimeMS = 0;
      }

      increaseQueueAppNum(jobQueue);

      List<ContainerSimulator> containerList =
          new ArrayList<ContainerSimulator>();
      ArrayList<NodeId> keyAsArray = new ArrayList<NodeId>(nmMap.keySet());
      // seeded RNG so the node placement is reproducible per trace seed
      Random rand = new Random(stjp.getSeed());

      for (SynthJob.SynthTask task : job.getTasks()) {
        // pick a random simulated node and build a rack-qualified hostname
        RMNode node = nmMap.get(keyAsArray.get(rand.nextInt(keyAsArray.size())))
            .getNode();
        String hostname = "/" + node.getRackName() + "/" + node.getHostName();
        long containerLifeTime = task.getTime();
        Resource containerResource = Resource
            .newInstance((int) task.getMemory(), (int) task.getVcores());
        containerList.add(
            new ContainerSimulator(containerResource, containerLifeTime,
                hostname, task.getPriority(), task.getType(),
                task.getExecutionType()));
      }


      ReservationId reservationId = null;

      // jobs with a deadline reserve capacity ahead of submission
      if(job.hasDeadline()){
        reservationId = ReservationId
            .newInstance(this.rm.getStartTime(), AM_ID);
      }

      runNewAM(job.getType(), user, jobQueue, oldJobId,
          jobStartTimeMS, jobFinishTimeMS, containerList, reservationId,
          job.getDeadline(), getAMContainerResource(null),
          job.getParams());
    }
  }
| |
| private Resource getAMContainerResource(Map jsonJob) { |
| Resource amContainerResource = |
| SLSConfiguration.getAMContainerResource(getConf()); |
| |
| if (jsonJob == null) { |
| return amContainerResource; |
| } |
| |
| ResourceInformation[] infors = ResourceUtils.getResourceTypesArray(); |
| for (ResourceInformation info : infors) { |
| String key = SLSConfiguration.JOB_AM_PREFIX + info.getName(); |
| if (jsonJob.containsKey(key)) { |
| long value = Long.parseLong(jsonJob.get(key).toString()); |
| amContainerResource.setResourceValue(info.getName(), value); |
| } |
| } |
| |
| return amContainerResource; |
| } |
| |
| private void increaseQueueAppNum(String queue) throws YarnException { |
| SchedulerWrapper wrapper = (SchedulerWrapper)rm.getResourceScheduler(); |
| String queueName = wrapper.getRealQueueName(queue); |
| Integer appNum = queueAppNumMap.get(queueName); |
| if (appNum == null) { |
| appNum = 1; |
| } else { |
| appNum++; |
| } |
| |
| queueAppNumMap.put(queueName, appNum); |
| wrapper.getSchedulerMetrics().trackQueue(queueName); |
| } |
| |
  /**
   * Convenience overload for jobs without a reservation: delegates with no
   * ReservationId, a deadline of -1, and no AM-specific parameters.
   */
  private void runNewAM(String jobType, String user,
      String jobQueue, String oldJobId, long jobStartTimeMS,
      long jobFinishTimeMS, List<ContainerSimulator> containerList,
      Resource amContainerResource) {
    runNewAM(jobType, user, jobQueue, oldJobId, jobStartTimeMS,
        jobFinishTimeMS, containerList, null, -1,
        amContainerResource, null);
  }
| |
  /**
   * Instantiates and schedules an AM simulator for one job.
   *
   * @param jobType AM type, used as the key into amClassMap
   * @param user submitting user
   * @param jobQueue target queue name
   * @param oldJobId job id from the trace; may be null, in which case the
   *        running AM_ID counter supplies a generated id
   * @param jobStartTimeMS simulated submission time (ms, trace-relative)
   * @param jobFinishTimeMS simulated finish time (ms, trace-relative)
   * @param containerList task containers the AM will request
   * @param reservationId reservation to create before submission, or null
   * @param deadline job deadline passed to the reservation logic
   * @param amContainerResource resource for the AM container itself
   * @param params AM-type-specific parameters (may be null)
   */
  private void runNewAM(String jobType, String user,
      String jobQueue, String oldJobId, long jobStartTimeMS,
      long jobFinishTimeMS, List<ContainerSimulator> containerList,
      ReservationId reservationId, long deadline, Resource amContainerResource,
      Map<String, String> params) {

    AMSimulator amSim = (AMSimulator) ReflectionUtils.newInstance(
        amClassMap.get(jobType), new Configuration());

    if (amSim != null) {
      int heartbeatInterval = getConf().getInt(
          SLSConfiguration.AM_HEARTBEAT_INTERVAL_MS,
          SLSConfiguration.AM_HEARTBEAT_INTERVAL_MS_DEFAULT);
      // NOTE(review): evaluated before the null-id fallback below, so a job
      // with a generated id can never match trackedApps — confirm intended
      boolean isTracked = trackedApps.contains(oldJobId);

      if (oldJobId == null) {
        oldJobId = Integer.toString(AM_ID);
      }
      AM_ID++;
      amSim.init(heartbeatInterval, containerList, rm, this, jobStartTimeMS,
          jobFinishTimeMS, user, jobQueue, isTracked, oldJobId,
          runner.getStartTimeMS(), amContainerResource, params);
      if(reservationId != null) {
        // if we have a ReservationId, delegate reservation creation to
        // AMSim (reservation shape is impl specific)
        UTCClock clock = new UTCClock();
        amSim.initReservation(reservationId, deadline, clock.getTime());
      }
      runner.schedule(amSim);
      // track the largest finish time for the simulation-time estimate
      maxRuntime = Math.max(maxRuntime, jobFinishTimeMS);
      numTasks += containerList.size();
      amMap.put(oldJobId, amSim);
    }
  }
| |
  /**
   * Logs a human-readable summary of the simulation setup (only when
   * -printsimulation was given) and always fills {@code simulateInfoMap}
   * with the same figures for other consumers of the summary.
   */
  private void printSimulationInfo() {
    if (printSimulation) {
      // node
      LOG.info("------------------------------------");
      LOG.info("# nodes = {}, # racks = {}, capacity " +
          "of each node {}.",
          numNMs, numRacks, nodeManagerResource);
      LOG.info("------------------------------------");
      // job
      LOG.info("# applications = {}, # total " +
          "tasks = {}, average # tasks per application = {}",
          numAMs, numTasks, (int)(Math.ceil((numTasks + 0.0) / numAMs)));
      LOG.info("JobId\tQueue\tAMType\tDuration\t#Tasks");
      for (Map.Entry<String, AMSimulator> entry : amMap.entrySet()) {
        AMSimulator am = entry.getValue();
        LOG.info(entry.getKey() + "\t" + am.getQueue() + "\t" + am.getAMType()
            + "\t" + am.getDuration() + "\t" + am.getNumTasks());
      }
      LOG.info("------------------------------------");
      // queue
      LOG.info("number of queues = {} average number of apps = {}",
          queueAppNumMap.size(),
          (int)(Math.ceil((numAMs + 0.0) / queueAppNumMap.size())));
      LOG.info("------------------------------------");
      // runtime
      LOG.info("estimated simulation time is {} seconds",
          (long)(Math.ceil(maxRuntime / 1000.0)));
      LOG.info("------------------------------------");
    }
    // package these information in the simulateInfoMap used by other places
    simulateInfoMap.put("Number of racks", numRacks);
    simulateInfoMap.put("Number of nodes", numNMs);
    simulateInfoMap.put("Node memory (MB)",
        nodeManagerResource.getResourceValue(ResourceInformation.MEMORY_URI));
    simulateInfoMap.put("Node VCores",
        nodeManagerResource.getResourceValue(ResourceInformation.VCORES_URI));
    simulateInfoMap.put("Number of applications", numAMs);
    simulateInfoMap.put("Number of tasks", numTasks);
    // NOTE: "applicaion" is misspelled but kept as-is — consumers may look
    // the value up by this exact key
    simulateInfoMap.put("Average tasks per applicaion",
        (int)(Math.ceil((numTasks + 0.0) / numAMs)));
    simulateInfoMap.put("Number of queues", queueAppNumMap.size());
    simulateInfoMap.put("Average applications per queue",
        (int)(Math.ceil((numAMs + 0.0) / queueAppNumMap.size())));
    simulateInfoMap.put("Estimated simulate time (s)",
        (long)(Math.ceil(maxRuntime / 1000.0)));
  }
| |
  /**
   * @return the live NodeId-to-NM-simulator map; note this is the internal
   *         mutable map, not a defensive copy
   */
  public HashMap<NodeId, NMSimulator> getNmMap() {
    return nmMap;
  }
| |
| public static void decreaseRemainingApps() { |
| remainingApps--; |
| |
| if (remainingApps == 0) { |
| LOG.info("SLSRunner tears down."); |
| if (exitAtTheFinish) { |
| System.exit(0); |
| } |
| } |
| } |
| |
  /**
   * Stops the simulated ResourceManager and the shared task runner.
   *
   * @throws InterruptedException if shutting down the runner is interrupted
   */
  public void stop() throws InterruptedException {
    rm.stop();
    runner.stop();
  }
| |
| public int run(final String[] argv) throws IOException, InterruptedException, |
| ParseException, ClassNotFoundException, YarnException { |
| |
| Options options = new Options(); |
| |
| // Left for compatibility |
| options.addOption("inputrumen", true, "input rumen files"); |
| options.addOption("inputsls", true, "input sls files"); |
| |
| // New more general format |
| options.addOption("tracetype", true, "the type of trace"); |
| options.addOption("tracelocation", true, "input trace files"); |
| |
| options.addOption("nodes", true, "input topology"); |
| options.addOption("output", true, "output directory"); |
| options.addOption("trackjobs", true, |
| "jobs to be tracked during simulating"); |
| options.addOption("printsimulation", false, |
| "print out simulation information"); |
| |
| CommandLineParser parser = new GnuParser(); |
| CommandLine cmd = parser.parse(options, argv); |
| |
| String traceType = null; |
| String traceLocation = null; |
| |
| // compatibility with old commandline |
| if (cmd.hasOption("inputrumen")) { |
| traceType = "RUMEN"; |
| traceLocation = cmd.getOptionValue("inputrumen"); |
| } |
| if (cmd.hasOption("inputsls")) { |
| traceType = "SLS"; |
| traceLocation = cmd.getOptionValue("inputsls"); |
| } |
| |
| if (cmd.hasOption("tracetype")) { |
| traceType = cmd.getOptionValue("tracetype"); |
| traceLocation = cmd.getOptionValue("tracelocation"); |
| } |
| |
| String output = cmd.getOptionValue("output"); |
| |
| File outputFile = new File(output); |
| if (!outputFile.exists() && !outputFile.mkdirs()) { |
| System.err.println("ERROR: Cannot create output directory " |
| + outputFile.getAbsolutePath()); |
| throw new YarnException("Cannot create output directory"); |
| } |
| |
| Set<String> trackedJobSet = new HashSet<String>(); |
| if (cmd.hasOption("trackjobs")) { |
| String trackjobs = cmd.getOptionValue("trackjobs"); |
| String jobIds[] = trackjobs.split(","); |
| trackedJobSet.addAll(Arrays.asList(jobIds)); |
| } |
| |
| String tempNodeFile = |
| cmd.hasOption("nodes") ? cmd.getOptionValue("nodes") : ""; |
| |
| TraceType tempTraceType = TraceType.SLS; |
| switch (traceType) { |
| case "SLS": |
| tempTraceType = TraceType.SLS; |
| break; |
| case "RUMEN": |
| tempTraceType = TraceType.RUMEN; |
| break; |
| |
| case "SYNTH": |
| tempTraceType = TraceType.SYNTH; |
| break; |
| default: |
| printUsage(); |
| throw new YarnException("Misconfigured input"); |
| } |
| |
| String[] inputFiles = traceLocation.split(","); |
| |
| setSimulationParams(tempTraceType, inputFiles, tempNodeFile, output, |
| trackedJobSet, cmd.hasOption("printsimulation")); |
| |
| start(); |
| |
| return 0; |
| } |
| |
  /**
   * Command-line entry point. Unlike programmatic use, the JVM exits once
   * the last simulated application completes (exitAtTheFinish is set).
   *
   * @param argv see {@code printUsage()} for the supported options
   */
  public static void main(String[] argv) throws Exception {
    exitAtTheFinish = true;
    ToolRunner.run(new Configuration(), new SLSRunner(), argv);
  }
| |
| static void printUsage() { |
| System.err.println(); |
| System.err.println("ERROR: Wrong tracetype"); |
| System.err.println(); |
| System.err.println( |
| "Options: -tracetype " + "SLS|RUMEN|SYNTH -tracelocation FILE,FILE... " |
| + "(deprecated alternative options --inputsls FILE, FILE,... " |
| + " | --inputrumen FILE,FILE,...)" |
| + "-output FILE [-nodes FILE] [-trackjobs JobId,JobId...] " |
| + "[-printsimulation]"); |
| System.err.println(); |
| } |
| |
| } |