| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.mapred; |
| |
| import java.io.IOException; |
| import java.util.SortedMap; |
| import java.util.TreeMap; |
| |
| import junit.framework.Assert; |
| |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| |
| import org.apache.hadoop.io.Text; |
| import org.apache.hadoop.ipc.ProtocolSignature; |
| import org.apache.hadoop.mapred.TaskStatus.State; |
| import org.apache.hadoop.mapred.TaskStatus.Phase; |
| import org.apache.hadoop.mapreduce.ClusterMetrics; |
| import org.apache.hadoop.mapreduce.Counters; |
| import org.apache.hadoop.mapreduce.JobID; |
| import org.apache.hadoop.mapreduce.JobPriority; |
| import org.apache.hadoop.mapreduce.JobStatus; |
| import org.apache.hadoop.mapreduce.QueueAclsInfo; |
| import org.apache.hadoop.mapreduce.QueueInfo; |
| import org.apache.hadoop.security.Credentials; |
| import org.apache.hadoop.security.authorize.AccessControlList; |
| import org.apache.hadoop.security.token.Token; |
| import org.apache.hadoop.mapreduce.TaskAttemptID; |
| import org.apache.hadoop.mapreduce.TaskReport; |
| import org.apache.hadoop.mapreduce.TaskTrackerInfo; |
| import org.apache.hadoop.mapreduce.TaskType; |
| import org.apache.hadoop.mapreduce.Cluster.JobTrackerStatus; |
| import org.apache.hadoop.mapreduce.protocol.ClientProtocol; |
| import org.apache.hadoop.tools.rumen.TaskInfo; |
| import org.apache.hadoop.tools.rumen.MapTaskAttemptInfo; |
| import org.apache.hadoop.tools.rumen.ReduceTaskAttemptInfo; |
| import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier; |
| import org.apache.hadoop.mapreduce.split.JobSplit.*; |
| // |
// Mock jobtracker class that checks the heartbeat() parameters and
// sends responses based on a prepopulated table
| // |
| public class MockSimulatorJobTracker implements InterTrackerProtocol, |
| ClientProtocol { |
| private final long simulationStartTime; |
| private final int heartbeatInterval; |
| |
  // Helper table, used only if checkHeartbeats == true.
  // For each simulation time it contains the expected task tracker status
  // reports, keyed by task tracker name, and the heartbeat responses to send.
| private SortedMap<Long, TreeMap<String, HeartbeatHelper>> heartbeats = |
| new TreeMap<Long, TreeMap<String, HeartbeatHelper>>(); |
| private final boolean checkHeartbeats; |
| private int jobId = 0; |
| |
| static final Log LOG = LogFactory.getLog(MockSimulatorJobTracker.class); |
| |
| public MockSimulatorJobTracker(long simulationStartTime, |
| int heartbeatInterval, |
| boolean checkHeartbeats) { |
| this.simulationStartTime = simulationStartTime; |
| this.heartbeatInterval = heartbeatInterval; |
| this.checkHeartbeats = checkHeartbeats; |
| } |
| |
| @Override |
| public JobID getNewJobID() throws IOException { |
| return new JobID("mockJT", jobId++); |
| } |
| |
| @Override |
| public JobStatus submitJob( |
| JobID jobId, String jobSubmitDir, Credentials ts) throws IOException { |
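    // The mock accepts every submission and reports the job as immediately RUNNING.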
| JobStatus status = new JobStatus(jobId, 0.0f, 0.0f, 0.0f, 0.0f, |
| JobStatus.State.RUNNING, JobPriority.NORMAL, "", "", "", ""); |
| return status; |
| } |
| |
| @Override |
| public HeartbeatResponse heartbeat(TaskTrackerStatus status, |
| boolean restarted, boolean initialContact, boolean acceptNewTasks, |
| short responseId) throws IOException { |
| if (!(status instanceof SimulatorTaskTrackerStatus)) { |
| throw new IllegalArgumentException( |
| "Expecting SimulatorTaskTrackerStatus, actual status type " |
| + status.getClass()); |
| } |
| SimulatorTaskTrackerStatus trackerStatus = |
| (SimulatorTaskTrackerStatus)status; |
| long now = trackerStatus.getCurrentSimulationTime(); |
| String trackerName = status.getTrackerName(); |
| |
| LOG.debug("Received heartbeat() from trackerName=" + trackerName + |
| ", now=" + now); |
| |
| HeartbeatResponse response = new HeartbeatResponse(); |
| response.setHeartbeatInterval(heartbeatInterval); |
| response.setActions(new TaskTrackerAction[0]); |
| |
| if (checkHeartbeats) { |
| Assert.assertFalse("No more heartbeats were expected ", heartbeats.isEmpty()); |
| long nextToCheck = heartbeats.firstKey(); |
| // Missing heartbeat check |
| Assert.assertTrue(nextToCheck <= now); |
| if (nextToCheck < now) { |
| LOG.debug("Simulation time progressed, last checked heartbeat at=" + |
| nextToCheck + ", now=" + now + ". Checking if no " + |
| "required heartbeats were missed in the past"); |
| SortedMap<String, HeartbeatHelper> previousHeartbeats = |
| heartbeats.get(nextToCheck); |
| Assert.assertNotNull(previousHeartbeats); |
| Assert.assertTrue(previousHeartbeats.isEmpty()); |
| heartbeats.remove(nextToCheck); |
| nextToCheck = heartbeats.firstKey(); |
| } |
| Assert.assertEquals("Heartbeat at the wrong time", nextToCheck, now); |
| |
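      // Look up and consume the expected report for this tracker at the current
      // simulation time; checkMissingHeartbeats() later verifies that every
      // expected report was consumed.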
| SortedMap<String, HeartbeatHelper> currentHeartbeats = |
| heartbeats.get(now); |
| HeartbeatHelper currentHeartbeat = currentHeartbeats.get(trackerName); |
| Assert.assertNotNull("Unknown task tracker name=" + trackerName, |
| currentHeartbeat); |
| currentHeartbeats.remove(trackerName); |
| |
| currentHeartbeat.checkHeartbeatParameters(status, acceptNewTasks); |
| |
| response.setActions(currentHeartbeat.getTaskTrackerActions()); |
| } |
| return response; |
| } |
| |
| // |
| // Populates the mock jobtracker's helper & checker table with expected |
| // empty reports from the task trackers and empty task actions to perform |
| // |
| public void expectEmptyHeartbeats(String taskTrackerName, |
| int numHeartbeats) { |
| long simulationTime = simulationStartTime; |
| for (int i=0; i<numHeartbeats; i++) { |
| TreeMap<String, HeartbeatHelper> hb = heartbeats.get(simulationTime); |
| if (hb == null) { |
| hb = new TreeMap<String, HeartbeatHelper>(); |
| heartbeats.put(simulationTime, hb); |
| } |
| hb.put(taskTrackerName, new HeartbeatHelper()); |
| simulationTime += heartbeatInterval; |
| } |
| } |
| |
  // Fills in all the expected and returned heartbeat parameters corresponding
  // to running a map task on a task tracker.
  // Use killHeartbeat < 0 if the task is not killed.
| public void runMapTask(String taskTrackerName, TaskAttemptID taskId, |
| long mapStart, long mapRuntime, long killHeartbeat) { |
| long mapDone = mapStart + mapRuntime; |
| long mapEndHeartbeat = nextHeartbeat(mapDone); |
| final boolean isKilled = (killHeartbeat>=0); |
| if (isKilled) { |
| mapEndHeartbeat = nextHeartbeat(killHeartbeat + 1); |
| } |
| |
| LOG.debug("mapStart=" + mapStart + ", mapDone=" + mapDone + |
| ", mapEndHeartbeat=" + mapEndHeartbeat + |
| ", killHeartbeat=" + killHeartbeat); |
| |
| final int numSlotsRequired = 1; |
| org.apache.hadoop.mapred.TaskAttemptID taskIdOldApi = |
| org.apache.hadoop.mapred.TaskAttemptID.downgrade(taskId); |
| Task task = new MapTask("dummyjobfile", taskIdOldApi, 0, new TaskSplitIndex(), |
| numSlotsRequired); |
| // all byte counters are 0 |
| TaskInfo taskInfo = new TaskInfo(0, 0, 0, 0, 0); |
| MapTaskAttemptInfo taskAttemptInfo = |
| new MapTaskAttemptInfo(State.SUCCEEDED, taskInfo, mapRuntime); |
| TaskTrackerAction action = |
| new SimulatorLaunchTaskAction(task, taskAttemptInfo); |
| heartbeats.get(mapStart).get(taskTrackerName).addTaskTrackerAction(action); |
| if (isKilled) { |
| action = new KillTaskAction(taskIdOldApi); |
| heartbeats.get(killHeartbeat).get(taskTrackerName).addTaskTrackerAction( |
| action); |
| } |
| |
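    // Expect a RUNNING status report at every heartbeat while the map is in
    // progress and a final SUCCEEDED (or KILLED) report at mapEndHeartbeat.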
| for(long simulationTime = mapStart + heartbeatInterval; |
| simulationTime <= mapEndHeartbeat; |
| simulationTime += heartbeatInterval) { |
| State state = simulationTime < mapEndHeartbeat ? |
| State.RUNNING : State.SUCCEEDED; |
| if (simulationTime == mapEndHeartbeat && isKilled) { |
| state = State.KILLED; |
| } |
| MapTaskStatus mapStatus = new MapTaskStatus( |
| task.getTaskID(), 0.0f, 0, state, "", "", null, Phase.MAP, null); |
| heartbeats.get(simulationTime).get(taskTrackerName).addTaskReport( |
| mapStatus); |
| } |
| } |
| |
  // Fills in all the expected and returned heartbeat parameters corresponding
  // to running a reduce task on a task tracker.
  // Use killHeartbeat < 0 if the task is not killed.
| public void runReduceTask(String taskTrackerName, TaskAttemptID taskId, |
| long reduceStart, long mapDoneDelay, |
| long reduceRuntime, long killHeartbeat) { |
| long mapDone = nextHeartbeat(reduceStart + mapDoneDelay); |
| long reduceDone = mapDone + reduceRuntime; |
| long reduceEndHeartbeat = nextHeartbeat(reduceDone); |
| final boolean isKilled = (killHeartbeat>=0); |
| if (isKilled) { |
| reduceEndHeartbeat = nextHeartbeat(killHeartbeat + 1); |
| } |
| |
| LOG.debug("reduceStart=" + reduceStart + ", mapDone=" + mapDone + |
| ", reduceDone=" + reduceDone + |
| ", reduceEndHeartbeat=" + reduceEndHeartbeat + |
| ", killHeartbeat=" + killHeartbeat); |
| |
| final int numSlotsRequired = 1; |
| org.apache.hadoop.mapred.TaskAttemptID taskIdOldApi = |
| org.apache.hadoop.mapred.TaskAttemptID.downgrade(taskId); |
| Task task = new ReduceTask("dummyjobfile", taskIdOldApi, 0, 0, |
| numSlotsRequired); |
| // all byte counters are 0 |
| TaskInfo taskInfo = new TaskInfo(0, 0, 0, 0, 0); |
| ReduceTaskAttemptInfo taskAttemptInfo = |
| new ReduceTaskAttemptInfo(State.SUCCEEDED, taskInfo, 0, 0, |
| reduceRuntime); |
| TaskTrackerAction action = |
| new SimulatorLaunchTaskAction(task, taskAttemptInfo); |
| heartbeats.get(reduceStart).get(taskTrackerName).addTaskTrackerAction( |
| action); |
| if (!isKilled || mapDone < killHeartbeat) { |
| action = new AllMapsCompletedTaskAction(task.getTaskID()); |
| heartbeats.get(mapDone).get(taskTrackerName).addTaskTrackerAction( |
| action); |
| } |
| if (isKilled) { |
| action = new KillTaskAction(taskIdOldApi); |
| heartbeats.get(killHeartbeat).get(taskTrackerName).addTaskTrackerAction( |
| action); |
| } |
| |
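    // Expect a RUNNING status report at every heartbeat while the reduce is in
    // progress and a final SUCCEEDED (or KILLED) report at reduceEndHeartbeat.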
| for(long simulationTime = reduceStart + heartbeatInterval; |
| simulationTime <= reduceEndHeartbeat; |
| simulationTime += heartbeatInterval) { |
| State state = simulationTime < reduceEndHeartbeat ? |
| State.RUNNING : State.SUCCEEDED; |
| if (simulationTime == reduceEndHeartbeat && isKilled) { |
| state = State.KILLED; |
| } |
      // mapDone is when the all-maps-completed event is delivered
| Phase phase = simulationTime <= mapDone ? Phase.SHUFFLE : Phase.REDUCE; |
| ReduceTaskStatus reduceStatus = new ReduceTaskStatus( |
| task.getTaskID(), 0.0f, 0, state, "", "", null, phase, null); |
| heartbeats.get(simulationTime).get(taskTrackerName).addTaskReport( |
| reduceStatus); |
| } |
| } |
| |
| // Should be called at the end of the simulation: Mock JT should have |
| // consumed all entries from the heartbeats table by that time |
| public void checkMissingHeartbeats() { |
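    // Exactly one (already emptied) time slot is expected to remain: heartbeat()
    // only removes a time slot from the table once a later heartbeat arrives, so
    // the slot of the last heartbeat stays behind with an empty helper map.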
| Assert.assertEquals(1, heartbeats.size()); |
| long lastHeartbeat = heartbeats.firstKey(); |
| Assert.assertTrue("Missing heartbeats, last heartbeat=" + lastHeartbeat, |
| heartbeats.get(lastHeartbeat).isEmpty()); |
| } |
| |
  // Rounds up to the next heartbeat time
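  // For example, with simulationStartTime=0 and heartbeatInterval=10
  // (illustrative values), nextHeartbeat(15) == 20 and nextHeartbeat(20) == 20.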
| public long nextHeartbeat(long time) { |
| long numHeartbeats = (long)Math.ceil( |
| (time - simulationStartTime)/(double)heartbeatInterval); |
| return simulationStartTime + numHeartbeats * heartbeatInterval; |
| } |
| |
  // The rest of the InterTrackerProtocol and ClientProtocol methods follow;
  // they are unused in the simulation
| @Override |
| public String getFilesystemName() { |
| throw new UnsupportedOperationException(); |
| } |
| |
| @Override |
| public void reportTaskTrackerError(String taskTracker, |
| String errorClass, |
| String errorMessage) { |
| throw new UnsupportedOperationException(); |
| } |
| |
| @Override |
| public TaskCompletionEvent[] getTaskCompletionEvents(JobID jobid, |
| int fromEventId, int maxEvents) { |
| throw new UnsupportedOperationException(); |
| } |
| |
| @Override |
| public String getSystemDir() { |
| throw new UnsupportedOperationException(); |
| } |
| |
| @Override |
| public String getStagingAreaDir() { |
| throw new UnsupportedOperationException(); |
| } |
| |
| @Override |
| public String getBuildVersion() { |
| throw new UnsupportedOperationException(); |
| } |
| |
| @Override |
| public long getProtocolVersion(String protocol, long clientVersion) { |
| throw new UnsupportedOperationException(); |
| } |
| |
| @Override |
| public TaskCompletionEvent[] getTaskCompletionEvents( |
| org.apache.hadoop.mapred.JobID jobid, int fromEventId, int maxEvents) |
| throws IOException { |
| throw new UnsupportedOperationException(); |
| } |
| |
| @Override |
| public TaskTrackerInfo[] getActiveTrackers() throws IOException, |
| InterruptedException { |
| throw new UnsupportedOperationException(); |
| } |
| |
| @Override |
| public JobStatus[] getAllJobs() throws IOException, InterruptedException { |
| throw new UnsupportedOperationException(); |
| } |
| |
| @Override |
| public TaskTrackerInfo[] getBlacklistedTrackers() throws IOException, |
| InterruptedException { |
| throw new UnsupportedOperationException(); |
| } |
| |
| @Override |
| public QueueInfo[] getChildQueues(String queueName) throws IOException, |
| InterruptedException { |
| throw new UnsupportedOperationException(); |
| } |
| |
| @Override |
| public ClusterMetrics getClusterMetrics() throws IOException, |
| InterruptedException { |
| throw new UnsupportedOperationException(); |
| } |
| |
| @Override |
| public Counters getJobCounters(JobID jobid) throws IOException, |
| InterruptedException { |
| throw new UnsupportedOperationException(); |
| } |
| |
| @Override |
| public String getJobHistoryDir() throws IOException, InterruptedException { |
| throw new UnsupportedOperationException(); |
| } |
| |
| @Override |
| public JobStatus getJobStatus(JobID jobid) throws IOException, |
| InterruptedException { |
| throw new UnsupportedOperationException(); |
| } |
| |
| public org.apache.hadoop.mapreduce.server.jobtracker.State getJobTrackerState() |
| throws IOException, InterruptedException { |
| throw new UnsupportedOperationException(); |
| } |
| |
| @Override |
| public JobTrackerStatus getJobTrackerStatus() { |
| throw new UnsupportedOperationException(); |
| } |
| |
| @Override |
| public QueueInfo getQueue(String queueName) throws IOException, |
| InterruptedException { |
    throw new UnsupportedOperationException();
  }
| |
| @Override |
| public QueueAclsInfo[] getQueueAclsForCurrentUser() throws IOException, |
| InterruptedException { |
    throw new UnsupportedOperationException();
  }
| |
| @Override |
| public QueueInfo[] getQueues() throws IOException, InterruptedException { |
    throw new UnsupportedOperationException();
  }
| |
| @Override |
| public QueueInfo[] getRootQueues() throws IOException, InterruptedException { |
    throw new UnsupportedOperationException();
  }
| |
| @Override |
| public AccessControlList getQueueAdmins(String queueName) throws IOException { |
| throw new UnsupportedOperationException(); |
| } |
| |
| @Override |
| public String[] getTaskDiagnostics(TaskAttemptID taskId) throws IOException, |
| InterruptedException { |
| throw new UnsupportedOperationException(); |
| } |
| |
| @Override |
| public TaskReport[] getTaskReports(JobID jobid, TaskType type) |
| throws IOException, InterruptedException { |
| throw new UnsupportedOperationException(); |
| } |
| |
| @Override |
| public long getTaskTrackerExpiryInterval() throws IOException, |
| InterruptedException { |
| throw new UnsupportedOperationException(); |
| } |
| |
| @Override |
| public void killJob(JobID jobid) throws IOException, InterruptedException { |
| throw new UnsupportedOperationException(); |
| } |
| |
| @Override |
| public boolean killTask(TaskAttemptID taskId, boolean shouldFail) |
| throws IOException, InterruptedException { |
| throw new UnsupportedOperationException(); |
| } |
| |
| @Override |
| public void setJobPriority(JobID jobid, String priority) throws IOException, |
| InterruptedException { |
| throw new UnsupportedOperationException(); |
| } |
| |
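  // The delegation token methods are stubbed as no-ops with dummy return values
  // rather than throwing, unlike the other unused protocol methods.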
| @Override |
| public void cancelDelegationToken(Token<DelegationTokenIdentifier> token |
| ) throws IOException, |
| InterruptedException { |
| } |
| |
| @Override |
| public Token<DelegationTokenIdentifier> |
| getDelegationToken(Text renewer) throws IOException, InterruptedException { |
| return null; |
| } |
| |
| @Override |
  public long renewDelegationToken(Token<DelegationTokenIdentifier> token
                                      ) throws IOException, InterruptedException {
| return 0; |
| } |
| |
| @Override |
| public ProtocolSignature getProtocolSignature(String protocol, |
| long clientVersion, int clientMethodsHash) throws IOException { |
| return ProtocolSignature.getProtocolSignature( |
| this, protocol, clientVersion, clientMethodsHash); |
| } |
| } |