blob: b48be8f9e1e28a6a89f1eb54f28e319e1edf654d [file] [log] [blame]
/*
* Copyright 2009-2010 by The Regents of the University of California
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edu.uci.ics.hyracks.control.nc;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Executor;
import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
public class Joblet {
private static final long serialVersionUID = 1L;
private final NodeControllerService nodeController;
private final UUID jobId;
private final Map<UUID, Stagelet> stageletMap;
private final Map<OperatorDescriptorId, Map<Integer, IOperatorEnvironment>> envMap;
public Joblet(NodeControllerService nodeController, UUID jobId) {
this.nodeController = nodeController;
this.jobId = jobId;
stageletMap = new HashMap<UUID, Stagelet>();
envMap = new HashMap<OperatorDescriptorId, Map<Integer, IOperatorEnvironment>>();
}
public UUID getJobId() {
return jobId;
}
public IOperatorEnvironment getEnvironment(IOperatorDescriptor hod, int partition) {
if (!envMap.containsKey(hod.getOperatorId())) {
envMap.put(hod.getOperatorId(), new HashMap<Integer, IOperatorEnvironment>());
}
Map<Integer, IOperatorEnvironment> opEnvMap = envMap.get(hod.getOperatorId());
if (!opEnvMap.containsKey(partition)) {
opEnvMap.put(partition, new OperatorEnvironmentImpl());
}
return opEnvMap.get(partition);
}
private static final class OperatorEnvironmentImpl implements IOperatorEnvironment {
private final Map<String, Object> map;
public OperatorEnvironmentImpl() {
map = new HashMap<String, Object>();
}
@Override
public Object get(String name) {
return map.get(name);
}
@Override
public void set(String name, Object value) {
map.put(name, value);
}
}
public void setStagelet(UUID stageId, Stagelet stagelet) {
stageletMap.put(stageId, stagelet);
}
public Stagelet getStagelet(UUID stageId) throws Exception {
return stageletMap.get(stageId);
}
public Executor getExecutor() {
return nodeController.getExecutor();
}
public synchronized void notifyStageletComplete(UUID stageId, int attempt, Map<String, Long> stats)
throws Exception {
stageletMap.remove(stageId);
nodeController.notifyStageComplete(jobId, stageId, attempt, stats);
}
public void notifyStageletFailed(UUID stageId, int attempt) throws Exception {
stageletMap.remove(stageId);
nodeController.notifyStageFailed(jobId, stageId, attempt);
}
public NodeControllerService getNodeController() {
return nodeController;
}
public void dumpProfile(Map<String, Long> counterDump) {
Set<UUID> stageIds;
synchronized (this) {
stageIds = new HashSet<UUID>(stageletMap.keySet());
}
for (UUID stageId : stageIds) {
Stagelet si;
synchronized (this) {
si = stageletMap.get(stageId);
}
if (si != null) {
si.dumpProfile(counterDump);
}
}
}
}