/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred;
import java.io.IOException;
import java.io.PrintStream;
import java.util.List;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Set;
import java.util.Iterator;
import java.util.Random;
import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobStatus;
import org.apache.hadoop.mapred.SimulatorEvent;
import org.apache.hadoop.mapred.SimulatorEventQueue;
import org.apache.hadoop.mapred.JobCompleteEvent;
import org.apache.hadoop.mapred.SimulatorJobClient;
import org.apache.hadoop.mapred.SimulatorJobTracker;
import org.apache.hadoop.mapred.SimulatorTaskTracker;
import org.apache.hadoop.net.DNSToSwitchMapping;
import org.apache.hadoop.net.StaticMapping;
import org.apache.hadoop.tools.rumen.ClusterStory;
import org.apache.hadoop.tools.rumen.ClusterTopologyReader;
import org.apache.hadoop.tools.rumen.JobStoryProducer;
import org.apache.hadoop.tools.rumen.LoggedNetworkTopology;
import org.apache.hadoop.tools.rumen.MachineNode;
import org.apache.hadoop.tools.rumen.RackNode;
import org.apache.hadoop.tools.rumen.ZombieCluster;
import org.apache.hadoop.tools.rumen.RandomSeedGenerator;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
 * {@link SimulatorEngine} is the main class of the simulator. To launch the
 * simulator, users can either run the main class directly with two parameters,
 * the input trace file and the corresponding topology file, or use the script
 * "bin/mumak.sh trace.json topology.json". Both the trace file and the
 * topology file are produced by rumen.
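 * <p>
 * For example (an illustrative invocation; the two file names are
 * placeholders for actual rumen output):
 * <pre>{@code
 * bin/mumak.sh trace.json topology.json
 * }</pre>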
*/
public class SimulatorEngine extends Configured implements Tool {
public static final List<SimulatorEvent> EMPTY_EVENTS = new ArrayList<SimulatorEvent>();
/** Default number of milliseconds required to boot up the entire cluster. */
public static final int DEFAULT_CLUSTER_STARTUP_DURATION = 100*1000;
protected final SimulatorEventQueue queue = new SimulatorEventQueue();
String traceFile;
String topologyFile;
SimulatorJobTracker jt;
SimulatorJobClient jc;
boolean shutdown = false;
long terminateTime = Long.MAX_VALUE;
long currentTime;
  /**
   * The set of simulated threads that perform job initialization when the
   * capacity scheduler is in use.
   */
HashSet<SimulatorCSJobInitializationThread> threadSet;
/** The log object to send our messages to; only used for debugging. */
private static final Log LOG = LogFactory.getLog(SimulatorEngine.class);
/**
* Master random seed read from the configuration file, if present.
* It is (only) used for creating sub seeds for all the random number
* generators.
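   * <p>
   * Sub seeds are derived deterministically from this master seed and a
   * per-component tag via {@link RandomSeedGenerator#getSeed}, so a run is
   * reproducible whenever mumak.random.seed is set in the configuration.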
*/
long masterRandomSeed;
/**
   * Start simulated task trackers based on the topology.
   * @param cluster the cluster topology.
   * @param jobConf the configuration object.
   * @param now
   *          time stamp when the simulator is started; {@link SimulatorTaskTracker}s
   *          are started at times spread uniformly at random in
   *          [now, now+startDuration).
   * @return time stamp by which the entire cluster is booted up and all task
   *         trackers are sending heartbeats at their steady rate.
*/
long startTaskTrackers(ClusterStory cluster, JobConf jobConf, long now) {
    // Port assigned to TTs, incremented by 1 for each TT.
int port = 10000;
int numTaskTrackers = 0;
Random random = new Random(RandomSeedGenerator.getSeed(
"forStartTaskTrackers()", masterRandomSeed));
final int startDuration = jobConf.getInt("mumak.cluster.startup.duration",
DEFAULT_CLUSTER_STARTUP_DURATION);
for (MachineNode node : cluster.getMachines()) {
jobConf.set("mumak.tasktracker.host.name", node.getName());
jobConf.set("mumak.tasktracker.tracker.name",
"tracker_" + node.getName() + ":localhost/127.0.0.1:" + port);
long subRandomSeed = RandomSeedGenerator.getSeed(
"forTaskTracker" + numTaskTrackers, masterRandomSeed);
jobConf.setLong("mumak.tasktracker.random.seed", subRandomSeed);
numTaskTrackers++;
port++;
SimulatorTaskTracker tt = new SimulatorTaskTracker(jt, jobConf);
long firstHeartbeat = now + random.nextInt(startDuration);
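      // A worked example with illustrative numbers: if now = 0 and
      // startDuration = 100,000 ms (the default), each TT's first heartbeat
      // falls uniformly at random in [0, 100000), so boot-up load is spread
      // out instead of all TTs heartbeating at once.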
queue.addAll(tt.init(firstHeartbeat));
}
    // Within startDuration plus one full-cluster heartbeat interval, every TT
    // has started up and has been told on its 2nd heartbeat to beat at the
    // rate corresponding to the steady state of the cluster.
long clusterSteady = now + startDuration + jt.getNextHeartbeatInterval();
return clusterSteady;
}
/**
   * Reads a positive long integer property from the configuration.
*
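   * <p>
   * A usage sketch, mirroring how init() reads the virtual start time:
   * <pre>{@code
   * long now = getTimeProperty(jobConf, "mumak.start.time",
   *                            System.currentTimeMillis());
   * }</pre>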
   * @param conf the configuration object
   * @param propertyName the name of the property
   * @param defaultValue the value returned when the property is not set
   * @return the property value as a positive long
   * @throws IllegalArgumentException if the value is zero or negative
*/
long getTimeProperty(Configuration conf, String propertyName,
long defaultValue)
throws IllegalArgumentException {
    // Possible improvement: accept a human-readable date format?
long time = conf.getLong(propertyName, defaultValue);
if (time <= 0) {
      throw new IllegalArgumentException(propertyName
          + " must be positive: " + time);
}
return time;
}
/**
   * Creates the configuration for the mumak simulation. This is kept modular
   * mostly for testing purposes, so that the standard configuration can be
   * modified before being passed to init().
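   * <p>
   * A sketch of that test-time usage (illustrative only; the caller must be
   * in this package, and the trace/topology fields must also be set before
   * init() can run):
   * <pre>{@code
   * SimulatorEngine engine = new SimulatorEngine();
   * engine.setConf(new Configuration());
   * JobConf conf = engine.createMumakConf();
   * conf.setLong("mumak.terminate.time", 60 * 60 * 1000L); // illustrative
   * engine.init(conf);
   * }</pre>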
   * @return the JobConf for the SimulatorJobTracker
*/
JobConf createMumakConf() {
JobConf jobConf = new JobConf(getConf());
jobConf.setClass("topology.node.switch.mapping.impl",
StaticMapping.class, DNSToSwitchMapping.class);
jobConf.set("fs.default.name", "file:///");
jobConf.set("mapred.job.tracker", "localhost:8012");
jobConf.setInt("mapred.jobtracker.job.history.block.size", 512);
jobConf.setInt("mapred.jobtracker.job.history.buffer.size", 512);
jobConf.setLong("mapred.tasktracker.expiry.interval", 5000);
jobConf.setInt("mapred.reduce.copy.backoff", 4);
jobConf.setLong("mapred.job.reuse.jvm.num.tasks", -1);
jobConf.setUser("mumak");
jobConf.set("mapred.system.dir",
jobConf.get("hadoop.log.dir", "/tmp/hadoop-"+jobConf.getUser()) + "/mapred/system");
return jobConf;
}
/**
* Initialize components in the simulation.
* @throws InterruptedException
* @throws IOException if trace or topology files cannot be opened.
*/
void init() throws InterruptedException, IOException {
JobConf jobConf = createMumakConf();
init(jobConf);
}
  /**
   * Initializes components in the simulation. The JobConf is
   * created separately and passed in.
   * @param jobConf the configuration for the jobtracker.
   * @throws InterruptedException
   * @throws IOException if the trace or topology file cannot be opened.
   */
@SuppressWarnings("deprecation")
void init(JobConf jobConf) throws InterruptedException, IOException {
FileSystem lfs = FileSystem.getLocal(getConf());
Path logPath =
new Path(System.getProperty("hadoop.log.dir")).makeQualified(lfs);
jobConf.set("mapred.system.dir", logPath.toString());
jobConf.set("hadoop.job.history.location", (new Path(logPath, "history")
.toString()));
    // Start time for the virtual clock.
    // Possible improvement: set the default value to something more
    // meaningful based on the first job.
long now = getTimeProperty(jobConf, "mumak.start.time",
System.currentTimeMillis());
jt = SimulatorJobTracker.startTracker(jobConf, now, this);
jt.offerService();
masterRandomSeed = jobConf.getLong("mumak.random.seed", System.nanoTime());
// max Map/Reduce tasks per node
int maxMaps = getConf().getInt(
"mapred.tasktracker.map.tasks.maximum",
SimulatorTaskTracker.DEFAULT_MAP_SLOTS);
int maxReduces = getConf().getInt(
"mapred.tasktracker.reduce.tasks.maximum",
SimulatorTaskTracker.DEFAULT_REDUCE_SLOTS);
MachineNode defaultNode = new MachineNode.Builder("default", 2)
.setMapSlots(maxMaps).setReduceSlots(maxReduces).build();
LoggedNetworkTopology topology = new ClusterTopologyReader(new Path(
topologyFile), jobConf).get();
// Setting the static mapping before removing numeric IP hosts.
setStaticMapping(topology);
if (getConf().getBoolean("mumak.topology.filter-numeric-ips", true)) {
removeIpHosts(topology);
}
ZombieCluster cluster = new ZombieCluster(topology, defaultNode);
// create TTs based on topology.json
long firstJobStartTime = startTaskTrackers(cluster, jobConf, now);
long subRandomSeed = RandomSeedGenerator.getSeed("forSimulatorJobStoryProducer",
masterRandomSeed);
JobStoryProducer jobStoryProducer = new SimulatorJobStoryProducer(
new Path(traceFile), cluster, firstJobStartTime, jobConf, subRandomSeed);
final SimulatorJobSubmissionPolicy submissionPolicy = SimulatorJobSubmissionPolicy
.getPolicy(jobConf);
jc = new SimulatorJobClient(jt, jobStoryProducer, submissionPolicy);
queue.addAll(jc.init(firstJobStartTime));
    // If the task scheduler is CapacityTaskScheduler, start the
    // JobInitialization threads too. (Constant-first equals avoids an NPE
    // when the property is unset.)
    if (CapacityTaskScheduler.class.getName().equals(
        jobConf.get("mapred.jobtracker.taskScheduler"))) {
LOG.info("CapacityScheduler used: starting simulatorThreads");
startSimulatorThreadsCapSched(now);
}
terminateTime = getTimeProperty(jobConf, "mumak.terminate.time",
Long.MAX_VALUE);
}
  /**
   * Collects the set of leaf queues from the JobTracker and, for each of
   * them, creates a simulated thread that performs the same check as
   * JobInitializationPoller.JobInitializationThread in the capacity
   * scheduler.
   * @param now current simulation time
   * @throws IOException
   */
private void startSimulatorThreadsCapSched(long now) throws IOException {
Set<String> queueNames = jt.getQueueManager().getLeafQueueNames();
TaskScheduler taskScheduler = jt.getTaskScheduler();
threadSet = new HashSet<SimulatorCSJobInitializationThread>();
    // We create a different thread for each queue and hold a
    // reference to each of them.
for (String jobQueue: queueNames) {
SimulatorCSJobInitializationThread capThread =
new SimulatorCSJobInitializationThread(taskScheduler,jobQueue);
threadSet.add(capThread);
queue.addAll(capThread.init(now));
}
}
/**
   * The main loop of the simulation. First calls init() to get objects ready,
   * then enters the event loop, where {@link SimulatorEvent}s are removed from
   * the {@link SimulatorEventQueue} and handled, and any new
   * {@link SimulatorEvent}s they produce are inserted back into the
   * {@link SimulatorEventQueue}.
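   * <p>
   * The loop ends when the event queue is drained, when the next event's time
   * stamp reaches the configured mumak.terminate.time, or once shutdown() has
   * been called.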
* @throws IOException
* @throws InterruptedException
*/
void run() throws IOException, InterruptedException {
init();
for (SimulatorEvent next = queue.get(); next != null
&& next.getTimeStamp() < terminateTime && !shutdown; next = queue.get()) {
currentTime = next.getTimeStamp();
assert(currentTime == queue.getCurrentTime());
SimulatorEventListener listener = next.getListener();
List<SimulatorEvent> response = listener.accept(next);
queue.addAll(response);
}
summary(System.out);
}
/**
   * Runs after the main loop and reports summary information about the
   * simulation.
   * @param out the stream to which the summary is printed
*/
void summary(PrintStream out) {
out.println("Done, total events processed: " + queue.getEventCount());
}
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new Configuration(), new SimulatorEngine(), args);
System.exit(res);
}
@Override
public int run(String[] args) throws Exception {
parseParameters(args);
try {
run();
return 0;
} finally {
if (jt != null) {
jt.getTaskScheduler().terminate();
}
}
}
void parseParameters(String[] args) {
if (args.length != 2) {
throw new IllegalArgumentException("Usage: java ... SimulatorEngine trace.json topology.json");
}
traceFile = args[0];
topologyFile = args[1];
}
/**
   * Called when a job is completed. Inserts a {@link JobCompleteEvent} into the
* {@link SimulatorEventQueue}. This event will be picked up by
* {@link SimulatorJobClient}, which will in turn decide whether the
* simulation is done.
* @param jobStatus final status of a job, SUCCEEDED or FAILED
* @param timestamp time stamp when the job is completed
*/
void markCompletedJob(JobStatus jobStatus, long timestamp) {
queue.add(new JobCompleteEvent(jc, timestamp, jobStatus, this));
}
/**
* Called by {@link SimulatorJobClient} when the simulation is completed and
* should be stopped.
*/
void shutdown() {
shutdown = true;
}
/**
   * Gets the current virtual time of the ongoing simulation. It is defined by
* the time stamp of the last event handled.
* @return the current virtual time
*/
long getCurrentTime() {
return currentTime;
}
  // Due to HDFS-778, a node may appear in the job history logs both as a
  // numeric IP and as a host name. We remove the numeric-IP entries from the
  // parsed network topology before feeding it to ZombieCluster, and drop any
  // rack that is left empty as a result.
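  // For example, an entry named "10.1.2.3" would be removed, while one named
  // "node-1.example.com" would be kept (host names illustrative).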
static void removeIpHosts(LoggedNetworkTopology topology) {
for (Iterator<LoggedNetworkTopology> rackIt = topology.getChildren()
.iterator(); rackIt.hasNext();) {
LoggedNetworkTopology rack = rackIt.next();
List<LoggedNetworkTopology> nodes = rack.getChildren();
for (Iterator<LoggedNetworkTopology> it = nodes.iterator(); it.hasNext();) {
LoggedNetworkTopology node = it.next();
if (isIPAddress(node.getName())) {
it.remove();
}
}
if (nodes.isEmpty()) {
rackIt.remove();
}
}
}
  static final Pattern IP_PATTERN;
static {
// 0-255
String IPV4BK1 = "(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)";
    // .b.c.d - where b/c/d are 0-255, each period optionally preceded by a
    // literal backslash (to accept backslash-escaped host names)
String IPV4BKN = "(?:\\\\?\\." + IPV4BK1 + "){3}";
String IPV4_PATTERN = IPV4BK1 + IPV4BKN;
// first hexadecimal number
String IPV6BK1 = "(?:[0-9a-fA-F]{1,4})";
// remaining 7 hexadecimal numbers, each preceded with ":".
String IPV6BKN = "(?::" + IPV6BK1 + "){7}";
String IPV6_PATTERN = IPV6BK1 + IPV6BKN;
IP_PATTERN = Pattern.compile(
"^(?:" + IPV4_PATTERN + "|" + IPV6_PATTERN + ")$");
}
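  /**
   * Checks whether the host name is a bare IPv4 address (each period
   * optionally backslash-escaped) or a full, uncompressed IPv6 address.
   * For example, "10.1.2.3", "10\.1\.2\.3" and "fe80:0:0:0:0:0:0:1" match;
   * "::1" (compressed form) and "node-1" do not.
   */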
static boolean isIPAddress(String hostname) {
return IP_PATTERN.matcher(hostname).matches();
}
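  /**
   * Registers every leaf node of the parsed topology with
   * {@link StaticMapping} so that host names resolve to their racks during
   * the simulation.
   */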
static void setStaticMapping(LoggedNetworkTopology topology) {
for (LoggedNetworkTopology rack : topology.getChildren()) {
for (LoggedNetworkTopology node : rack.getChildren()) {
StaticMapping.addNodeToRack(node.getName(),
new RackNode(rack.getName(), 1).getName());
}
}
}
}