/*
 * Copyright 2009-2010 by The Regents of the University of California
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * you may obtain a copy of the License from
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package edu.uci.ics.hyracks.control.nc;

import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.Map;

import edu.uci.ics.hyracks.api.application.INCApplicationContext;
import edu.uci.ics.hyracks.api.comm.IPartitionCollector;
import edu.uci.ics.hyracks.api.comm.PartitionChannel;
import edu.uci.ics.hyracks.api.context.IHyracksJobletContext;
import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
import edu.uci.ics.hyracks.api.dataflow.state.IStateObject;
import edu.uci.ics.hyracks.api.deployment.DeploymentId;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.api.io.FileReference;
import edu.uci.ics.hyracks.api.io.IIOManager;
import edu.uci.ics.hyracks.api.io.IWorkspaceFileFactory;
import edu.uci.ics.hyracks.api.job.ActivityClusterGraph;
import edu.uci.ics.hyracks.api.job.IGlobalJobDataFactory;
import edu.uci.ics.hyracks.api.job.IJobletEventListener;
import edu.uci.ics.hyracks.api.job.IJobletEventListenerFactory;
import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.job.JobStatus;
import edu.uci.ics.hyracks.api.job.profiling.counters.ICounter;
import edu.uci.ics.hyracks.api.job.profiling.counters.ICounterContext;
import edu.uci.ics.hyracks.api.partitions.PartitionId;
import edu.uci.ics.hyracks.api.resources.IDeallocatable;
import edu.uci.ics.hyracks.control.common.deployment.DeploymentUtils;
import edu.uci.ics.hyracks.control.common.job.PartitionRequest;
import edu.uci.ics.hyracks.control.common.job.PartitionState;
import edu.uci.ics.hyracks.control.common.job.profiling.counters.Counter;
import edu.uci.ics.hyracks.control.common.job.profiling.om.JobletProfile;
import edu.uci.ics.hyracks.control.common.job.profiling.om.PartitionProfile;
import edu.uci.ics.hyracks.control.common.job.profiling.om.TaskProfile;
import edu.uci.ics.hyracks.control.nc.io.IOManager;
import edu.uci.ics.hyracks.control.nc.io.WorkspaceFileFactory;
import edu.uci.ics.hyracks.control.nc.resources.DefaultDeallocatableRegistry;
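
/**
 * Per-job runtime context on a node controller. A {@code Joblet} tracks the tasks of one
 * {@link JobId} running on this node, their partition requests and collectors, shared
 * {@link IStateObject}s, profiling counters, and workspace files, and coordinates job-level
 * cleanup with the cluster controller. Since it implements {@link ICounterContext}, it is
 * also the counter context exposed to its tasks.
 */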
public class Joblet implements IHyracksJobletContext, ICounterContext {
    private final NodeControllerService nodeController;
    private final INCApplicationContext appCtx;
    private final DeploymentId deploymentId;
    private final JobId jobId;
    private final ActivityClusterGraph acg;
    private final Map<PartitionId, IPartitionCollector> partitionRequestMap;
    private final IOperatorEnvironment env;
    private final Map<Object, IStateObject> stateObjectMap;
    private final Map<TaskAttemptId, Task> taskMap;
    private final Map<String, Counter> counterMap;
    private final DefaultDeallocatableRegistry deallocatableRegistry;
    private final IWorkspaceFileFactory fileFactory;
    private final Object globalJobData;
    private final IJobletEventListener jobletEventListener;
    private final int frameSize;

    private JobStatus cleanupStatus;
    private boolean cleanupPending;

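    /**
     * Creates the per-job runtime context for this node. The constructor wires up the
     * partition-request, state-object, task and counter maps, creates the workspace file
     * factory, and, if the activity cluster graph supplies the corresponding factories,
     * instantiates the joblet event listener (firing {@code jobletStart()}) and the global
     * job data object.
     */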
    public Joblet(NodeControllerService nodeController, DeploymentId deploymentId, JobId jobId,
            INCApplicationContext appCtx, ActivityClusterGraph acg) {
        this.nodeController = nodeController;
        this.appCtx = appCtx;
        this.deploymentId = deploymentId;
        this.jobId = jobId;
        this.frameSize = acg.getFrameSize();
        this.acg = acg;
        partitionRequestMap = new HashMap<PartitionId, IPartitionCollector>();
        env = new OperatorEnvironmentImpl(nodeController.getId());
        stateObjectMap = new HashMap<Object, IStateObject>();
        taskMap = new HashMap<TaskAttemptId, Task>();
        counterMap = new HashMap<String, Counter>();
        deallocatableRegistry = new DefaultDeallocatableRegistry();
        fileFactory = new WorkspaceFileFactory(this, (IOManager) appCtx.getRootContext().getIOManager());
        cleanupPending = false;
        IJobletEventListenerFactory jelf = acg.getJobletEventListenerFactory();
        if (jelf != null) {
            IJobletEventListener listener = jelf.createListener(this);
            this.jobletEventListener = listener;
            listener.jobletStart();
        } else {
            jobletEventListener = null;
        }
        IGlobalJobDataFactory gjdf = acg.getGlobalJobDataFactory();
        globalJobData = gjdf != null ? gjdf.createGlobalJobData(this) : null;
    }

    @Override
    public JobId getJobId() {
        return jobId;
    }

    public ActivityClusterGraph getActivityClusterGraph() {
        return acg;
    }

    public IOperatorEnvironment getEnvironment() {
        return env;
    }

    public void addTask(Task task) {
        taskMap.put(task.getTaskAttemptId(), task);
    }

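    /**
     * Unregisters a completed task. If a cleanup request arrived while tasks were still
     * running, the joblet-level cleanup is performed once the last task is removed.
     */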
    public void removeTask(Task task) {
        taskMap.remove(task.getTaskAttemptId());
        if (cleanupPending && taskMap.isEmpty()) {
            performCleanup();
        }
    }

    public Map<TaskAttemptId, Task> getTaskMap() {
        return taskMap;
    }

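    /**
     * Joblet-scoped operator environment: a synchronized store of {@link IStateObject}s,
     * keyed by the state object's id, through which tasks of the same job share state on
     * this node.
     */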
    private final class OperatorEnvironmentImpl implements IOperatorEnvironment {
        private final String nodeId;

        public OperatorEnvironmentImpl(String nodeId) {
            this.nodeId = nodeId;
        }

        @Override
        public String toString() {
            return super.toString() + "@" + nodeId;
        }

        @Override
        public synchronized void setStateObject(IStateObject taskState) {
            stateObjectMap.put(taskState.getId(), taskState);
        }

        @Override
        public synchronized IStateObject getStateObject(Object id) {
            return stateObjectMap.get(id);
        }
    }

    public NodeControllerService getNodeController() {
        return nodeController;
    }

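    /**
     * Copies this joblet's named counters and per-task profiles (including each task's
     * partition-send profiles) into the given {@link JobletProfile}.
     */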
    public void dumpProfile(JobletProfile jProfile) {
        Map<String, Long> counters = jProfile.getCounters();
        for (Map.Entry<String, Counter> e : counterMap.entrySet()) {
            counters.put(e.getKey(), e.getValue().get());
        }
        for (Task task : taskMap.values()) {
            TaskProfile taskProfile = new TaskProfile(task.getTaskAttemptId(),
                    new Hashtable<PartitionId, PartitionProfile>(task.getPartitionSendProfile()));
            task.dumpProfile(taskProfile);
            jProfile.getTaskProfiles().put(task.getTaskAttemptId(), taskProfile);
        }
    }

    @Override
    public INCApplicationContext getApplicationContext() {
        return appCtx;
    }

    @Override
    public ICounterContext getCounterContext() {
        return this;
    }

    @Override
    public void registerDeallocatable(IDeallocatable deallocatable) {
        deallocatableRegistry.registerDeallocatable(deallocatable);
    }

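    /**
     * Releases all registered deallocatable resources. The work is handed off to the node
     * controller's executor so the caller is not blocked by resource teardown.
     */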
    public void close() {
        nodeController.getExecutor().execute(new Runnable() {
            @Override
            public void run() {
                deallocatableRegistry.close();
            }
        });
    }

    ByteBuffer allocateFrame() {
        return ByteBuffer.allocate(getFrameSize());
    }

    int getFrameSize() {
        return frameSize;
    }

    IIOManager getIOManager() {
        return appCtx.getRootContext().getIOManager();
    }

    @Override
    public FileReference createManagedWorkspaceFile(String prefix) throws HyracksDataException {
        return fileFactory.createManagedWorkspaceFile(prefix);
    }

    @Override
    public FileReference createUnmanagedWorkspaceFile(String prefix) throws HyracksDataException {
        return fileFactory.createUnmanagedWorkspaceFile(prefix);
    }

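    /**
     * Looks up a named counter, optionally creating it on first use. A caller would
     * typically update the returned counter while processing, along these lines (an
     * illustrative sketch only; the counter name and the {@code update(long)} call are
     * assumed from the profiling counter API, not defined in this class):
     *
     * <pre>
     * ICounter tuplesRead = counterContext.getCounter("tuples.read", true);
     * tuplesRead.update(1);
     * </pre>
     */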
    @Override
    public synchronized ICounter getCounter(String name, boolean create) {
        Counter counter = counterMap.get(name);
        if (counter == null && create) {
            counter = new Counter(name);
            counterMap.put(name, counter);
        }
        return counter;
    }

    @Override
    public Object getGlobalJobData() {
        return globalJobData;
    }

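    /**
     * Registers the given collector for each requested partition and sends a
     * {@link PartitionRequest} for it to the cluster controller, indicating the minimum
     * {@link PartitionState} required before the partition may be delivered.
     */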
    public synchronized void advertisePartitionRequest(TaskAttemptId taId, Collection<PartitionId> pids,
            IPartitionCollector collector, PartitionState minState) throws Exception {
        for (PartitionId pid : pids) {
            partitionRequestMap.put(pid, collector);
            PartitionRequest req = new PartitionRequest(pid, nodeController.getId(), taId, minState);
            nodeController.getClusterController().registerPartitionRequest(req);
        }
    }

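    /**
     * Routes an available partition channel to the collector that previously requested it;
     * channels for partitions that were never requested are ignored.
     */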
    public synchronized void reportPartitionAvailability(PartitionChannel channel) throws HyracksException {
        IPartitionCollector collector = partitionRequestMap.get(channel.getPartitionId());
        if (collector != null) {
            collector.addPartitions(Collections.singleton(channel));
        }
    }

    public IJobletEventListener getJobletEventListener() {
        return jobletEventListener;
    }

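    /**
     * Requests cleanup of this joblet with the given terminal job status. If tasks are still
     * running, cleanup is deferred until the last one is removed via {@link #removeTask(Task)}.
     */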
    public void cleanup(JobStatus status) {
        cleanupStatus = status;
        cleanupPending = true;
        if (taskMap.isEmpty()) {
            performCleanup();
        }
    }

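    /**
     * Tears the joblet down: removes it from the node controller's joblet map, notifies the
     * joblet event listener of the final status, releases resources, and reports cleanup
     * completion to the cluster controller.
     */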
    private void performCleanup() {
        nodeController.getJobletMap().remove(jobId);
        IJobletEventListener listener = getJobletEventListener();
        if (listener != null) {
            listener.jobletFinish(cleanupStatus);
        }
        close();
        cleanupPending = false;
        try {
            nodeController.getClusterController().notifyJobletCleanup(jobId, nodeController.getId());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

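    /**
     * Resolves {@code className} through {@link DeploymentUtils} against this joblet's
     * deployment; checked failures surface as {@link RuntimeException}s.
     */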
    @Override
    public Class<?> loadClass(String className) {
        try {
            return DeploymentUtils.loadClass(className, deploymentId, appCtx);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public ClassLoader getClassLoader() {
        try {
            return DeploymentUtils.getClassLoader(deploymentId, appCtx);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}