| /* |
| * Copyright 2009-2010 by The Regents of the University of California |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * you may obtain a copy of the License from |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package edu.uci.ics.hyracks.control.nc; |
| |
| import java.io.ByteArrayOutputStream; |
| import java.io.PrintWriter; |
| import java.io.UnsupportedEncodingException; |
| import java.nio.ByteBuffer; |
| import java.util.HashMap; |
| import java.util.Hashtable; |
| import java.util.LinkedHashSet; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.concurrent.Executor; |
| import java.util.concurrent.Semaphore; |
| |
| import edu.uci.ics.hyracks.api.comm.IFrameReader; |
| import edu.uci.ics.hyracks.api.comm.IFrameWriter; |
| import edu.uci.ics.hyracks.api.comm.IPartitionCollector; |
| import edu.uci.ics.hyracks.api.context.IHyracksJobletContext; |
| import edu.uci.ics.hyracks.api.context.IHyracksTaskContext; |
| import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable; |
| import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId; |
| import edu.uci.ics.hyracks.api.dataflow.TaskId; |
| import edu.uci.ics.hyracks.api.dataflow.state.ITaskState; |
| import edu.uci.ics.hyracks.api.exceptions.HyracksDataException; |
| import edu.uci.ics.hyracks.api.exceptions.HyracksException; |
| import edu.uci.ics.hyracks.api.io.FileReference; |
| import edu.uci.ics.hyracks.api.io.IIOManager; |
| import edu.uci.ics.hyracks.api.io.IWorkspaceFileFactory; |
| import edu.uci.ics.hyracks.api.job.IOperatorEnvironment; |
| import edu.uci.ics.hyracks.api.job.profiling.counters.ICounter; |
| import edu.uci.ics.hyracks.api.job.profiling.counters.ICounterContext; |
| import edu.uci.ics.hyracks.api.partitions.PartitionId; |
| import edu.uci.ics.hyracks.api.resources.IDeallocatable; |
| import edu.uci.ics.hyracks.control.common.job.PartitionState; |
| import edu.uci.ics.hyracks.control.common.job.profiling.counters.Counter; |
| import edu.uci.ics.hyracks.control.common.job.profiling.om.PartitionProfile; |
| import edu.uci.ics.hyracks.control.common.job.profiling.om.TaskProfile; |
| import edu.uci.ics.hyracks.control.nc.io.IOManager; |
| import edu.uci.ics.hyracks.control.nc.io.WorkspaceFileFactory; |
| import edu.uci.ics.hyracks.control.nc.resources.DefaultDeallocatableRegistry; |
| import edu.uci.ics.hyracks.control.nc.work.NotifyTaskCompleteWork; |
| import edu.uci.ics.hyracks.control.nc.work.NotifyTaskFailureWork; |
| |
| public class Task implements IHyracksTaskContext, ICounterContext, Runnable { |
| private final Joblet joblet; |
| |
| private final TaskAttemptId taskAttemptId; |
| |
| private final String displayName; |
| |
| private final Executor executor; |
| |
| private final IWorkspaceFileFactory fileFactory; |
| |
| private final DefaultDeallocatableRegistry deallocatableRegistry; |
| |
| private final Map<String, Counter> counterMap; |
| |
| private final IOperatorEnvironment opEnv; |
| |
| private final Map<PartitionId, PartitionProfile> partitionSendProfile; |
| |
| private final Set<Thread> pendingThreads; |
| |
| private IPartitionCollector[] collectors; |
| |
| private IOperatorNodePushable operator; |
| |
| private volatile boolean failed; |
| |
| private ByteArrayOutputStream errorBaos; |
| |
| private PrintWriter errorWriter; |
| |
| private volatile boolean aborted; |
| |
| public Task(Joblet joblet, TaskAttemptId taskId, String displayName, Executor executor) { |
| this.joblet = joblet; |
| this.taskAttemptId = taskId; |
| this.displayName = displayName; |
| this.executor = executor; |
| fileFactory = new WorkspaceFileFactory(this, (IOManager) joblet.getIOManager()); |
| deallocatableRegistry = new DefaultDeallocatableRegistry(); |
| counterMap = new HashMap<String, Counter>(); |
| opEnv = joblet.getEnvironment(); |
| partitionSendProfile = new Hashtable<PartitionId, PartitionProfile>(); |
| pendingThreads = new LinkedHashSet<Thread>(); |
| failed = false; |
| errorBaos = new ByteArrayOutputStream(); |
| errorWriter = new PrintWriter(errorBaos, true); |
| } |
| |
| public void setTaskRuntime(IPartitionCollector[] collectors, IOperatorNodePushable operator) { |
| this.collectors = collectors; |
| this.operator = operator; |
| } |
| |
| @Override |
| public ByteBuffer allocateFrame() { |
| return joblet.allocateFrame(); |
| } |
| |
| @Override |
| public int getFrameSize() { |
| return joblet.getFrameSize(); |
| } |
| |
| @Override |
| public IIOManager getIOManager() { |
| return joblet.getIOManager(); |
| } |
| |
| @Override |
| public FileReference createUnmanagedWorkspaceFile(String prefix) throws HyracksDataException { |
| return fileFactory.createUnmanagedWorkspaceFile(prefix); |
| } |
| |
| @Override |
| public FileReference createManagedWorkspaceFile(String prefix) throws HyracksDataException { |
| return fileFactory.createManagedWorkspaceFile(prefix); |
| } |
| |
| @Override |
| public void registerDeallocatable(IDeallocatable deallocatable) { |
| deallocatableRegistry.registerDeallocatable(deallocatable); |
| } |
| |
| public void close() { |
| deallocatableRegistry.close(); |
| } |
| |
| @Override |
| public IHyracksJobletContext getJobletContext() { |
| return joblet; |
| } |
| |
| @Override |
| public TaskAttemptId getTaskAttemptId() { |
| return taskAttemptId; |
| } |
| |
| @Override |
| public ICounter getCounter(String name, boolean create) { |
| Counter counter = counterMap.get(name); |
| if (counter == null && create) { |
| counter = new Counter(name); |
| counterMap.put(name, counter); |
| } |
| return counter; |
| } |
| |
| @Override |
| public ICounterContext getCounterContext() { |
| return this; |
| } |
| |
| public Joblet getJoblet() { |
| return joblet; |
| } |
| |
| public Map<PartitionId, PartitionProfile> getPartitionSendProfile() { |
| return partitionSendProfile; |
| } |
| |
| public synchronized void dumpProfile(TaskProfile tProfile) { |
| Map<String, Long> dumpMap = tProfile.getCounters(); |
| for (Counter c : counterMap.values()) { |
| dumpMap.put(c.getName(), c.get()); |
| } |
| } |
| |
| public void setPartitionSendProfile(PartitionProfile profile) { |
| partitionSendProfile.put(profile.getPartitionId(), profile); |
| } |
| |
| public void start() throws HyracksException { |
| aborted = false; |
| executor.execute(this); |
| } |
| |
| public synchronized void abort() { |
| aborted = true; |
| for (IPartitionCollector c : collectors) { |
| c.abort(); |
| } |
| for (Thread t : pendingThreads) { |
| t.interrupt(); |
| } |
| } |
| |
| private synchronized void addPendingThread(Thread t) { |
| pendingThreads.add(t); |
| } |
| |
| private synchronized void removePendingThread(Thread t) { |
| pendingThreads.remove(t); |
| if (pendingThreads.isEmpty()) { |
| notifyAll(); |
| } |
| } |
| |
| public synchronized void waitForCompletion() throws InterruptedException { |
| while (!pendingThreads.isEmpty()) { |
| wait(); |
| } |
| } |
| |
| @Override |
| public void run() { |
| Thread ct = Thread.currentThread(); |
| String threadName = ct.getName(); |
| addPendingThread(ct); |
| try { |
| ct.setName(displayName + ":" + taskAttemptId + ":" + 0); |
| operator.initialize(); |
| try { |
| if (collectors.length > 0) { |
| final Semaphore sem = new Semaphore(collectors.length - 1); |
| for (int i = 1; i < collectors.length; ++i) { |
| final IPartitionCollector collector = collectors[i]; |
| final IFrameWriter writer = operator.getInputFrameWriter(i); |
| sem.acquire(); |
| final int cIdx = i; |
| executor.execute(new Runnable() { |
| public void run() { |
| if (aborted) { |
| return; |
| } |
| Thread thread = Thread.currentThread(); |
| addPendingThread(thread); |
| String oldName = thread.getName(); |
| thread.setName(displayName + ":" + taskAttemptId + ":" + cIdx); |
| try { |
| pushFrames(collector, writer); |
| } catch (HyracksDataException e) { |
| synchronized (Task.this) { |
| failed = true; |
| errorWriter.println("Exception caught by thread: " + thread.getName()); |
| e.printStackTrace(errorWriter); |
| errorWriter.println(); |
| } |
| } finally { |
| thread.setName(oldName); |
| sem.release(); |
| removePendingThread(thread); |
| } |
| } |
| }); |
| } |
| try { |
| pushFrames(collectors[0], operator.getInputFrameWriter(0)); |
| } finally { |
| sem.acquire(collectors.length - 1); |
| } |
| } |
| } finally { |
| operator.deinitialize(); |
| } |
| NodeControllerService ncs = joblet.getNodeController(); |
| ncs.getWorkQueue().schedule(new NotifyTaskCompleteWork(ncs, this)); |
| } catch (Exception e) { |
| failed = true; |
| errorWriter.println("Exception caught by thread: " + ct.getName()); |
| e.printStackTrace(errorWriter); |
| errorWriter.println(); |
| } finally { |
| ct.setName(threadName); |
| close(); |
| removePendingThread(ct); |
| } |
| if (failed) { |
| errorWriter.close(); |
| NodeControllerService ncs = joblet.getNodeController(); |
| try { |
| ncs.getWorkQueue().schedule(new NotifyTaskFailureWork(ncs, this, errorBaos.toString("UTF-8"))); |
| } catch (UnsupportedEncodingException e) { |
| e.printStackTrace(); |
| } |
| } |
| } |
| |
| private void pushFrames(IPartitionCollector collector, IFrameWriter writer) throws HyracksDataException { |
| if (aborted) { |
| return; |
| } |
| try { |
| collector.open(); |
| try { |
| joblet.advertisePartitionRequest(taskAttemptId, collector.getRequiredPartitionIds(), collector, |
| PartitionState.STARTED); |
| IFrameReader reader = collector.getReader(); |
| reader.open(); |
| try { |
| writer.open(); |
| ByteBuffer buffer = allocateFrame(); |
| while (reader.nextFrame(buffer)) { |
| if (aborted) { |
| return; |
| } |
| buffer.flip(); |
| writer.nextFrame(buffer); |
| buffer.compact(); |
| } |
| writer.close(); |
| } catch (Exception e) { |
| writer.fail(); |
| throw e; |
| } finally { |
| reader.close(); |
| } |
| } finally { |
| collector.close(); |
| } |
| } catch (HyracksException e) { |
| throw new HyracksDataException(e); |
| } catch (Exception e) { |
| throw new HyracksDataException(e); |
| } |
| } |
| |
| @Override |
| public void setTaskState(ITaskState taskState) { |
| opEnv.setTaskState(taskState); |
| } |
| |
| @Override |
| public ITaskState getTaskState(TaskId taskId) { |
| return opEnv.getTaskState(taskId); |
| } |
| } |