blob: ba1c711409e32d5101a3c3ca98c4b808f127f625 [file] [log] [blame]
/*
* Copyright 2009-2010 by The Regents of the University of California
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edu.uci.ics.hyracks.control.cc.job.manager.events;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.logging.Level;
import java.util.logging.Logger;

import edu.uci.ics.hyracks.api.comm.Endpoint;
import edu.uci.ics.hyracks.api.dataflow.ActivityNodeId;
import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
import edu.uci.ics.hyracks.api.dataflow.PortInstanceId;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.api.job.JobPlan;
import edu.uci.ics.hyracks.api.job.JobStatus;
import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
import edu.uci.ics.hyracks.control.cc.job.JobAttempt;
import edu.uci.ics.hyracks.control.cc.job.JobRun;
import edu.uci.ics.hyracks.control.cc.job.JobStage;
import edu.uci.ics.hyracks.control.cc.job.JobStageAttempt;
import edu.uci.ics.hyracks.control.cc.remote.RemoteRunner;
import edu.uci.ics.hyracks.control.cc.remote.ops.Phase1Installer;
import edu.uci.ics.hyracks.control.cc.remote.ops.Phase2Installer;
import edu.uci.ics.hyracks.control.cc.remote.ops.Phase3Installer;
import edu.uci.ics.hyracks.control.cc.remote.ops.PortMapMergingAccumulator;
import edu.uci.ics.hyracks.control.cc.remote.ops.StageStarter;
import edu.uci.ics.hyracks.control.cc.scheduler.ISchedule;
/**
 * Event that advances one attempt of a job: it finds the stages of the attempt
 * that are currently runnable, moves them from the pending set to the
 * in-progress set, asks the cluster controller's scheduler to place them, and
 * then installs and starts each stage on the nodes the schedule assigned.
 *
 * <p>
 * If scheduling fails, or if installing/starting a stage on the remote nodes
 * fails, a {@link JobAbortEvent} is queued for the same job attempt so the job
 * does not remain stuck with a stage that is marked in progress but was never
 * started.
 */
public class ScheduleRunnableStagesEvent implements Runnable {
    private static final Logger LOGGER = Logger.getLogger(ScheduleRunnableStagesEvent.class.getName());

    private final ClusterControllerService ccs;

    private final UUID jobId;

    private final int attempt;

    /**
     * @param ccs
     *            cluster controller that owns the run map, job queue, scheduler,
     *            node map and executor used by this event
     * @param jobId
     *            id of the job whose stages should be scheduled
     * @param attempt
     *            index of the job attempt (used to index the run's attempt list)
     */
    public ScheduleRunnableStagesEvent(ClusterControllerService ccs, UUID jobId, int attempt) {
        this.ccs = ccs;
        this.jobId = jobId;
        this.attempt = attempt;
    }

    /**
     * Schedules every currently runnable stage of the job attempt, or queues a
     * {@link JobCleanupEvent} with {@link JobStatus#TERMINATED} when nothing is
     * left to run.
     */
    @Override
    public void run() {
        JobRun run = ccs.getRunMap().get(jobId);
        JobAttempt ja = run.getAttempts().get(attempt);
        Set<UUID> pendingStages = ja.getPendingStageIds();
        Set<UUID> scheduledStages = ja.getInProgressStageIds();
        LOGGER.info(jobId + ":" + attempt + ":Pending stages: " + pendingStages + " Scheduled stages: "
                + scheduledStages);

        // NOTE(review): completion is detected when exactly ONE stage is still
        // pending (not zero) and none is in progress. Presumably one terminal
        // stage always stays in the pending set -- confirm against JobAttempt.
        if (pendingStages.size() == 1 && scheduledStages.isEmpty()) {
            LOGGER.info(jobId + ":" + attempt + ":No more runnable stages");
            ccs.getJobQueue().schedule(new JobCleanupEvent(ccs, jobId, attempt, JobStatus.TERMINATED));
            return;
        }

        Set<JobStageAttempt> runnableStageAttempts = promoteRunnableStages(ja, pendingStages, scheduledStages);

        try {
            ccs.getScheduler().schedule(runnableStageAttempts);
        } catch (HyracksException e) {
            LOGGER.log(Level.SEVERE, jobId + ":" + attempt + ": Failed to schedule runnable stages", e);
            ccs.getJobQueue().schedule(new JobAbortEvent(ccs, jobId, attempt));
            return;
        }

        JobPlan plan = run.getJobPlan();
        for (JobStageAttempt jsa : runnableStageAttempts) {
            launchStage(plan, jsa, ja);
        }
    }

    /**
     * Creates a stage attempt for every runnable stage and records it in the
     * job attempt: the stage id moves from pending to in-progress and the new
     * attempt is registered in the stage attempt map.
     */
    private Set<JobStageAttempt> promoteRunnableStages(JobAttempt ja, Set<UUID> pendingStages,
            Set<UUID> scheduledStages) {
        Map<UUID, JobStageAttempt> stageAttemptMap = ja.getStageAttemptMap();
        Set<JobStage> runnableStages = new HashSet<JobStage>();
        ja.findRunnableStages(runnableStages);
        LOGGER.info(jobId + ":" + attempt + ": Found " + runnableStages.size() + " runnable stages");

        Set<JobStageAttempt> runnableStageAttempts = new HashSet<JobStageAttempt>();
        for (JobStage rs : runnableStages) {
            UUID stageId = rs.getId();
            LOGGER.info("Runnable Stage: " + jobId + ":" + stageId);
            pendingStages.remove(stageId);
            scheduledStages.add(stageId);
            JobStageAttempt jsa = new JobStageAttempt(rs, ja);
            stageAttemptMap.put(stageId, jsa);
            runnableStageAttempts.add(jsa);
        }
        return runnableStageAttempts;
    }

    /**
     * Derives the per-node work assignment for one scheduled stage, marks the
     * target nodes as participating in the job, and hands the remote
     * installation to the cluster controller's executor.
     */
    private void launchStage(final JobPlan plan, final JobStageAttempt jsa, JobAttempt ja) {
        ISchedule schedule = jsa.getSchedule();
        // operator id -> number of partitions of that operator
        final Map<OperatorDescriptorId, Integer> partCountMap = new HashMap<OperatorDescriptorId, Integer>();
        // node id -> (activity -> partition indexes of that activity placed on the node)
        final Map<String, Map<ActivityNodeId, Set<Integer>>> targetMap = new HashMap<String, Map<ActivityNodeId, Set<Integer>>>();

        for (ActivityNodeId aid : jsa.getJobStage().getTasks()) {
            // locations[i] is the node id that partition i of this activity runs on.
            String[] locations = schedule.getPartitions(aid);
            partCountMap.put(aid.getOperatorDescriptorId(), locations.length);
            for (int i = 0; i < locations.length; ++i) {
                Map<ActivityNodeId, Set<Integer>> target = targetMap.get(locations[i]);
                if (target == null) {
                    target = new HashMap<ActivityNodeId, Set<Integer>>();
                    targetMap.put(locations[i], target);
                }
                Set<Integer> partIdxs = target.get(aid);
                if (partIdxs == null) {
                    partIdxs = new HashSet<Integer>();
                    target.put(aid, partIdxs);
                }
                partIdxs.add(i);
            }
        }

        Set<String> participatingNodeIds = ja.getParticipatingNodeIds();
        for (String nid : targetMap.keySet()) {
            ccs.getNodeMap().get(nid).getActiveJobIds().add(jobId);
            participatingNodeIds.add(nid);
        }

        ccs.getExecutor().execute(new Runnable() {
            @Override
            public void run() {
                try {
                    installAndStart(plan, jsa, targetMap, partCountMap);
                } catch (Exception e) {
                    // A stage that failed to install/start will never report
                    // completion, so abort the job attempt instead of leaving it
                    // hanging (same reaction as a scheduling failure).
                    LOGGER.log(Level.SEVERE, jobId + ":" + attempt + ": Failed to start stage "
                            + jsa.getJobStage().getId(), e);
                    ccs.getJobQueue().schedule(new JobAbortEvent(ccs, jobId, attempt));
                }
            }
        });
    }

    /**
     * Runs the three installation phases and the stage start on every target
     * node, one remote round per phase. The port map gathered in phase 1 is
     * passed to every phase 2 installer.
     *
     * @throws Exception
     *             if any of the remote rounds fails
     */
    private void installAndStart(JobPlan plan, JobStageAttempt jsa,
            Map<String, Map<ActivityNodeId, Set<Integer>>> targetMap,
            Map<OperatorDescriptorId, Integer> partCountMap) throws Exception {
        final int nodeCount = targetMap.size();

        Phase1Installer[] p1is = new Phase1Installer[nodeCount];
        int i = 0;
        for (Map.Entry<String, Map<ActivityNodeId, Set<Integer>>> e : targetMap.entrySet()) {
            p1is[i] = new Phase1Installer(e.getKey(), plan.getJobId(), plan.getApplicationName(), plan, jsa
                    .getJobStage().getId(), jsa.getJobAttempt().getAttempt(), e.getValue(), partCountMap);
            ++i;
        }
        LOGGER.info("Stage start - Phase 1");
        Map<PortInstanceId, Endpoint> globalPortMap = RemoteRunner.runRemote(ccs, p1is,
                new PortMapMergingAccumulator());

        Phase2Installer[] p2is = new Phase2Installer[nodeCount];
        Phase3Installer[] p3is = new Phase3Installer[nodeCount];
        StageStarter[] ss = new StageStarter[nodeCount];
        i = 0;
        for (Map.Entry<String, Map<ActivityNodeId, Set<Integer>>> e : targetMap.entrySet()) {
            String nid = e.getKey();
            p2is[i] = new Phase2Installer(nid, plan.getJobId(), plan.getApplicationName(), plan, jsa
                    .getJobStage().getId(), e.getValue(), partCountMap, globalPortMap);
            p3is[i] = new Phase3Installer(nid, plan.getJobId(), jsa.getJobStage().getId());
            ss[i] = new StageStarter(nid, plan.getJobId(), jsa.getJobStage().getId());
            ++i;
        }
        LOGGER.info("Stage start - Phase 2");
        RemoteRunner.runRemote(ccs, p2is, null);
        LOGGER.info("Stage start - Phase 3");
        RemoteRunner.runRemote(ccs, p3is, null);
        LOGGER.info("Stage start");
        RemoteRunner.runRemote(ccs, ss, null);
        LOGGER.info("Stage started");
    }
}