blob: 6cc745067bdf486e119158ce09a40c51511f0785 [file] [log] [blame]
/*
* Copyright 2009-2010 by The Regents of the University of California
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edu.uci.ics.hyracks.control.nc;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Executor;
import edu.uci.ics.hyracks.api.application.INCApplicationContext;
import edu.uci.ics.hyracks.api.context.IHyracksJobletContext;
import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.io.FileReference;
import edu.uci.ics.hyracks.api.io.IIOManager;
import edu.uci.ics.hyracks.api.io.IWorkspaceFileFactory;
import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
import edu.uci.ics.hyracks.api.job.profiling.counters.ICounter;
import edu.uci.ics.hyracks.api.job.profiling.counters.ICounterContext;
import edu.uci.ics.hyracks.api.job.profiling.om.JobletProfile;
import edu.uci.ics.hyracks.api.job.profiling.om.StageletProfile;
import edu.uci.ics.hyracks.api.resources.IDeallocatable;
import edu.uci.ics.hyracks.control.common.job.profiling.counters.Counter;
import edu.uci.ics.hyracks.control.nc.io.IOManager;
import edu.uci.ics.hyracks.control.nc.io.ManagedWorkspaceFileFactory;
import edu.uci.ics.hyracks.control.nc.resources.DefaultDeallocatableRegistry;
public class Joblet implements IHyracksJobletContext, ICounterContext {
private static final long serialVersionUID = 1L;
private final NodeControllerService nodeController;
private final INCApplicationContext appCtx;
private final UUID jobId;
private final int attempt;
private final Map<UUID, Stagelet> stageletMap;
private final Map<OperatorDescriptorId, Map<Integer, IOperatorEnvironment>> envMap;
private final Map<String, Counter> counterMap;
private final DefaultDeallocatableRegistry deallocatableRegistry;
private final IWorkspaceFileFactory fileFactory;
public Joblet(NodeControllerService nodeController, UUID jobId, int attempt, INCApplicationContext appCtx) {
this.nodeController = nodeController;
this.appCtx = appCtx;
this.jobId = jobId;
this.attempt = attempt;
stageletMap = new HashMap<UUID, Stagelet>();
envMap = new HashMap<OperatorDescriptorId, Map<Integer, IOperatorEnvironment>>();
counterMap = new HashMap<String, Counter>();
deallocatableRegistry = new DefaultDeallocatableRegistry();
fileFactory = new ManagedWorkspaceFileFactory(this, (IOManager) appCtx.getRootContext().getIOManager());
}
@Override
public UUID getJobId() {
return jobId;
}
public IOperatorEnvironment getEnvironment(IOperatorDescriptor hod, int partition) {
if (!envMap.containsKey(hod.getOperatorId())) {
envMap.put(hod.getOperatorId(), new HashMap<Integer, IOperatorEnvironment>());
}
Map<Integer, IOperatorEnvironment> opEnvMap = envMap.get(hod.getOperatorId());
if (!opEnvMap.containsKey(partition)) {
opEnvMap.put(partition, new OperatorEnvironmentImpl());
}
return opEnvMap.get(partition);
}
private static final class OperatorEnvironmentImpl implements IOperatorEnvironment {
private final Map<String, Object> map;
public OperatorEnvironmentImpl() {
map = new HashMap<String, Object>();
}
@Override
public Object get(String name) {
return map.get(name);
}
@Override
public void set(String name, Object value) {
map.put(name, value);
}
}
public void setStagelet(UUID stageId, Stagelet stagelet) {
stageletMap.put(stageId, stagelet);
}
public Stagelet getStagelet(UUID stageId) throws Exception {
return stageletMap.get(stageId);
}
public Executor getExecutor() {
return nodeController.getExecutor();
}
public synchronized void notifyStageletComplete(UUID stageId, int attempt, StageletProfile stats) throws Exception {
stageletMap.remove(stageId);
nodeController.notifyStageComplete(jobId, stageId, attempt, stats);
}
public void notifyStageletFailed(UUID stageId, int attempt) throws Exception {
stageletMap.remove(stageId);
nodeController.notifyStageFailed(jobId, stageId, attempt);
}
public NodeControllerService getNodeController() {
return nodeController;
}
public synchronized void dumpProfile(JobletProfile jProfile) {
Map<String, Long> counters = jProfile.getCounters();
for (Map.Entry<String, Counter> e : counterMap.entrySet()) {
counters.put(e.getKey(), e.getValue().get());
}
for (Stagelet si : stageletMap.values()) {
StageletProfile sProfile = new StageletProfile(si.getStageId());
si.dumpProfile(sProfile);
jProfile.getStageletProfiles().put(si.getStageId(), sProfile);
}
}
@Override
public INCApplicationContext getApplicationContext() {
return appCtx;
}
@Override
public int getAttempt() {
return attempt;
}
@Override
public ICounterContext getCounterContext() {
return this;
}
@Override
public void registerDeallocatable(IDeallocatable deallocatable) {
deallocatableRegistry.registerDeallocatable(deallocatable);
}
public void close() {
deallocatableRegistry.close();
}
@Override
public ByteBuffer allocateFrame() {
return appCtx.getRootContext().allocateFrame();
}
@Override
public int getFrameSize() {
return appCtx.getRootContext().getFrameSize();
}
@Override
public IIOManager getIOManager() {
return appCtx.getRootContext().getIOManager();
}
@Override
public FileReference createWorkspaceFile(String prefix) throws HyracksDataException {
return fileFactory.createWorkspaceFile(prefix);
}
public Map<UUID, Stagelet> getStageletMap() {
return stageletMap;
}
@Override
public synchronized ICounter getCounter(String name, boolean create) {
Counter counter = counterMap.get(name);
if (counter == null && create) {
counter = new Counter(name);
counterMap.put(name, counter);
}
return counter;
}
}