| /* |
| * Copyright 2009-2010 by The Regents of the University of California |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * you may obtain a copy of the License from |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package edu.uci.ics.hyracks.control.cc; |
| |
| import java.util.ArrayList; |
| import java.util.EnumSet; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.UUID; |
| import java.util.concurrent.LinkedBlockingQueue; |
| import java.util.logging.Logger; |
| |
| import jol.core.Runtime; |
| import jol.types.basic.BasicTupleSet; |
| import jol.types.basic.Tuple; |
| import jol.types.basic.TupleSet; |
| import jol.types.exception.BadKeyException; |
| import jol.types.exception.UpdateException; |
| import jol.types.table.BasicTable; |
| import jol.types.table.EventTable; |
| import jol.types.table.Function; |
| import jol.types.table.Key; |
| import jol.types.table.TableName; |
| |
| import org.json.JSONArray; |
| import org.json.JSONObject; |
| |
| import edu.uci.ics.hyracks.api.comm.Endpoint; |
| import edu.uci.ics.hyracks.api.constraints.AbsoluteLocationConstraint; |
| import edu.uci.ics.hyracks.api.constraints.ChoiceLocationConstraint; |
| import edu.uci.ics.hyracks.api.constraints.ExplicitPartitionConstraint; |
| import edu.uci.ics.hyracks.api.constraints.LocationConstraint; |
| import edu.uci.ics.hyracks.api.constraints.PartitionConstraint; |
| import edu.uci.ics.hyracks.api.constraints.PartitionCountConstraint; |
| import edu.uci.ics.hyracks.api.dataflow.ActivityNodeId; |
| import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId; |
| import edu.uci.ics.hyracks.api.dataflow.Direction; |
| import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder; |
| import edu.uci.ics.hyracks.api.dataflow.IActivityNode; |
| import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor; |
| import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor; |
| import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId; |
| import edu.uci.ics.hyracks.api.dataflow.PortInstanceId; |
| import edu.uci.ics.hyracks.api.job.JobFlag; |
| import edu.uci.ics.hyracks.api.job.JobPlan; |
| import edu.uci.ics.hyracks.api.job.JobSpecification; |
| import edu.uci.ics.hyracks.api.job.JobStatus; |
| |
| public class JOLJobManagerImpl implements IJobManager { |
// Logger used for the stage start-up progress messages emitted by this class.
private static final Logger LOGGER = Logger.getLogger(JOLJobManagerImpl.class.getName());

// JOL scope name used for every table declared by this class and for every schedule() call.
public static final String JOL_SCOPE = "hyrackscc";

// Resource path of the scheduler program that the constructor installs into the JOL runtime.
private static final String SCHEDULER_OLG_FILE = "edu/uci/ics/hyracks/control/cc/scheduler.olg";

// JOL runtime that tuples are scheduled on and that evaluate() is invoked on.
private final Runtime jolRuntime;

// Work items enqueued by the start-message, job-clean-up and abort-message table callbacks.
private final LinkedBlockingQueue<Runnable> jobQueue;

// Job table; its monitor is also used for wait/notify in waitForCompletion() and getJobStatus().
private final JobTable jobTable;

// Thread started by the constructor; presumably drains jobQueue (class body not shown here - confirm).
private final JobQueueThread jobQueueThread;

// The tables below are created and registered with the JOL catalog in the constructor.
private final OperatorDescriptorTable odTable;

private final OperatorLocationTable olTable;

private final OperatorCloneCountTable ocTable;

private final ConnectorDescriptorTable cdTable;

private final ActivityNodeTable anTable;

private final ActivityConnectionTable acTable;

private final ActivityBlockedTable abTable;

private final JobStartTable jobStartTable;

private final JobCleanUpTable jobCleanUpTable;

private final JobCleanUpCompleteTable jobCleanUpCompleteTable;

private final StartMessageTable startMessageTable;

private final StageletCompleteTable stageletCompleteTable;

private final StageletFailureTable stageletFailureTable;

private final AvailableNodesTable availableNodesTable;

private final RankedAvailableNodesTable rankedAvailableNodesTable;

private final FailedNodesTable failedNodesTable;

private final AbortMessageTable abortMessageTable;

private final AbortNotifyTable abortNotifyTable;

// Table function registered with the catalog alongside the tables above.
private final ExpandPartitionCountConstraintTableFunction expandPartitionCountConstraintFunction;

// Table that reportProfile() schedules profile-update tuples into.
private final ProfileUpdateTable puTable;

// Registered node ids; a node's list index is the rank published to RankedAvailableNodesTable.
private final List<String> rankedAvailableNodes;

// Query interface returned by getQueryInterface(); created at the end of the constructor.
private final IJobManagerQueryInterface qi;
| |
| public JOLJobManagerImpl(final ClusterControllerService ccs, final Runtime jolRuntime) throws Exception { |
| this.jolRuntime = jolRuntime; |
| jobQueue = new LinkedBlockingQueue<Runnable>(); |
| jobQueueThread = new JobQueueThread(); |
| jobQueueThread.start(); |
| |
| this.jobTable = new JobTable(jolRuntime); |
| this.odTable = new OperatorDescriptorTable(jolRuntime); |
| this.olTable = new OperatorLocationTable(jolRuntime); |
| this.ocTable = new OperatorCloneCountTable(jolRuntime); |
| this.cdTable = new ConnectorDescriptorTable(jolRuntime); |
| this.anTable = new ActivityNodeTable(jolRuntime); |
| this.acTable = new ActivityConnectionTable(jolRuntime); |
| this.abTable = new ActivityBlockedTable(jolRuntime); |
| this.jobStartTable = new JobStartTable(); |
| this.jobCleanUpTable = new JobCleanUpTable(jolRuntime); |
| this.jobCleanUpCompleteTable = new JobCleanUpCompleteTable(); |
| this.startMessageTable = new StartMessageTable(jolRuntime); |
| this.stageletCompleteTable = new StageletCompleteTable(jolRuntime); |
| this.stageletFailureTable = new StageletFailureTable(jolRuntime); |
| this.availableNodesTable = new AvailableNodesTable(jolRuntime); |
| this.rankedAvailableNodesTable = new RankedAvailableNodesTable(jolRuntime); |
| this.failedNodesTable = new FailedNodesTable(jolRuntime); |
| this.abortMessageTable = new AbortMessageTable(jolRuntime); |
| this.abortNotifyTable = new AbortNotifyTable(jolRuntime); |
| this.expandPartitionCountConstraintFunction = new ExpandPartitionCountConstraintTableFunction(); |
| this.puTable = new ProfileUpdateTable(); |
| this.rankedAvailableNodes = new ArrayList<String>(); |
| |
| jolRuntime.catalog().register(jobTable); |
| jolRuntime.catalog().register(odTable); |
| jolRuntime.catalog().register(olTable); |
| jolRuntime.catalog().register(ocTable); |
| jolRuntime.catalog().register(cdTable); |
| jolRuntime.catalog().register(anTable); |
| jolRuntime.catalog().register(acTable); |
| jolRuntime.catalog().register(abTable); |
| jolRuntime.catalog().register(jobStartTable); |
| jolRuntime.catalog().register(jobCleanUpTable); |
| jolRuntime.catalog().register(jobCleanUpCompleteTable); |
| jolRuntime.catalog().register(startMessageTable); |
| jolRuntime.catalog().register(stageletCompleteTable); |
| jolRuntime.catalog().register(stageletFailureTable); |
| jolRuntime.catalog().register(availableNodesTable); |
| jolRuntime.catalog().register(rankedAvailableNodesTable); |
| jolRuntime.catalog().register(failedNodesTable); |
| jolRuntime.catalog().register(abortMessageTable); |
| jolRuntime.catalog().register(abortNotifyTable); |
| jolRuntime.catalog().register(expandPartitionCountConstraintFunction); |
| jolRuntime.catalog().register(puTable); |
| |
| jobTable.register(new JobTable.Callback() { |
| @Override |
| public void deletion(TupleSet arg0) { |
| jobTable.notifyAll(); |
| } |
| |
| @Override |
| public void insertion(TupleSet arg0) { |
| jobTable.notifyAll(); |
| } |
| }); |
| |
| startMessageTable.register(new StartMessageTable.Callback() { |
| @Override |
| public void deletion(TupleSet tuples) { |
| |
| } |
| |
| @SuppressWarnings("unchecked") |
| @Override |
| public void insertion(TupleSet tuples) { |
| for (final Tuple t : tuples) { |
| jobQueue.add(new Runnable() { |
| @Override |
| public void run() { |
| try { |
| Object[] data = t.toArray(); |
| UUID jobId = (UUID) data[0]; |
| UUID stageId = (UUID) data[1]; |
| Integer attempt = (Integer) data[2]; |
| String appName = (String) data[3]; |
| JobPlan plan = (JobPlan) data[4]; |
| Set<List> ts = (Set<List>) data[5]; |
| Map<OperatorDescriptorId, Set<Integer>> opPartitions = new HashMap<OperatorDescriptorId, Set<Integer>>(); |
| for (List t2 : ts) { |
| Object[] t2Data = t2.toArray(); |
| Set<List> activityInfoSet = (Set<List>) t2Data[1]; |
| for (List l : activityInfoSet) { |
| Object[] lData = l.toArray(); |
| ActivityNodeId aid = (ActivityNodeId) lData[0]; |
| Set<Integer> opParts = opPartitions.get(aid.getOperatorDescriptorId()); |
| if (opParts == null) { |
| opParts = new HashSet<Integer>(); |
| opPartitions.put(aid.getOperatorDescriptorId(), opParts); |
| } |
| opParts.add((Integer) lData[1]); |
| } |
| } |
| ClusterControllerService.Phase1Installer[] p1is = new ClusterControllerService.Phase1Installer[ts |
| .size()]; |
| int i = 0; |
| for (List t2 : ts) { |
| Object[] t2Data = t2.toArray(); |
| Map<ActivityNodeId, Set<Integer>> tasks = new HashMap<ActivityNodeId, Set<Integer>>(); |
| Set<List> activityInfoSet = (Set<List>) t2Data[1]; |
| for (List l : activityInfoSet) { |
| Object[] lData = l.toArray(); |
| ActivityNodeId aid = (ActivityNodeId) lData[0]; |
| Set<Integer> aParts = tasks.get(aid); |
| if (aParts == null) { |
| aParts = new HashSet<Integer>(); |
| tasks.put(aid, aParts); |
| } |
| aParts.add((Integer) lData[1]); |
| } |
| p1is[i++] = new ClusterControllerService.Phase1Installer((String) t2Data[0], jobId, |
| appName, plan, stageId, attempt, tasks, opPartitions); |
| } |
| LOGGER.info("Stage start - Phase 1"); |
| Map<PortInstanceId, Endpoint> globalPortMap = ccs.runRemote(p1is, |
| new ClusterControllerService.PortMapMergingAccumulator()); |
| |
| ClusterControllerService.Phase2Installer[] p2is = new ClusterControllerService.Phase2Installer[ts |
| .size()]; |
| ClusterControllerService.Phase3Installer[] p3is = new ClusterControllerService.Phase3Installer[ts |
| .size()]; |
| ClusterControllerService.StageStarter[] ss = new ClusterControllerService.StageStarter[ts |
| .size()]; |
| i = 0; |
| for (List t2 : ts) { |
| Object[] t2Data = t2.toArray(); |
| Map<ActivityNodeId, Set<Integer>> tasks = new HashMap<ActivityNodeId, Set<Integer>>(); |
| Set<List> activityInfoSet = (Set<List>) t2Data[1]; |
| for (List l : activityInfoSet) { |
| Object[] lData = l.toArray(); |
| ActivityNodeId aid = (ActivityNodeId) lData[0]; |
| Set<Integer> aParts = tasks.get(aid); |
| if (aParts == null) { |
| aParts = new HashSet<Integer>(); |
| tasks.put(aid, aParts); |
| } |
| aParts.add((Integer) lData[1]); |
| } |
| p2is[i] = new ClusterControllerService.Phase2Installer((String) t2Data[0], jobId, |
| appName, plan, stageId, tasks, opPartitions, globalPortMap); |
| p3is[i] = new ClusterControllerService.Phase3Installer((String) t2Data[0], jobId, |
| stageId); |
| ss[i] = new ClusterControllerService.StageStarter((String) t2Data[0], jobId, |
| stageId); |
| ++i; |
| } |
| LOGGER.info("Stage start - Phase 2"); |
| ccs.runRemote(p2is, null); |
| LOGGER.info("Stage start - Phase 3"); |
| ccs.runRemote(p3is, null); |
| LOGGER.info("Stage start"); |
| ccs.runRemote(ss, null); |
| LOGGER.info("Stage started"); |
| } catch (Exception e) { |
| } |
| } |
| }); |
| } |
| } |
| }); |
| |
| jobCleanUpTable.register(new JobCleanUpTable.Callback() { |
| @Override |
| public void deletion(TupleSet tuples) { |
| } |
| |
| @SuppressWarnings("unchecked") |
| @Override |
| public void insertion(TupleSet tuples) { |
| for (final Tuple t : tuples) { |
| jobQueue.add(new Runnable() { |
| @Override |
| public void run() { |
| try { |
| Object[] data = t.toArray(); |
| UUID jobId = (UUID) data[0]; |
| Set<String> ts = (Set<String>) data[1]; |
| ClusterControllerService.JobCompleteNotifier[] jcns = new ClusterControllerService.JobCompleteNotifier[ts |
| .size()]; |
| int i = 0; |
| for (String n : ts) { |
| jcns[i++] = new ClusterControllerService.JobCompleteNotifier(n, jobId); |
| } |
| try { |
| ccs.runRemote(jcns, null); |
| } finally { |
| BasicTupleSet jccTuples = new BasicTupleSet(JobCleanUpCompleteTable |
| .createTuple(jobId)); |
| jolRuntime.schedule(JOL_SCOPE, JobCleanUpCompleteTable.TABLE_NAME, jccTuples, null); |
| jolRuntime.evaluate(); |
| } |
| } catch (Exception e) { |
| } |
| } |
| }); |
| } |
| } |
| }); |
| |
| abortMessageTable.register(new AbortMessageTable.Callback() { |
| @Override |
| public void deletion(TupleSet tuples) { |
| |
| } |
| |
| @SuppressWarnings("unchecked") |
| @Override |
| public void insertion(TupleSet tuples) { |
| for (final Tuple t : tuples) { |
| jobQueue.add(new Runnable() { |
| @Override |
| public void run() { |
| try { |
| Object[] data = t.toArray(); |
| UUID jobId = (UUID) data[0]; |
| UUID stageId = (UUID) data[1]; |
| Integer attempt = (Integer) data[2]; |
| Set<List> ts = (Set<List>) data[4]; |
| ClusterControllerService.JobletAborter[] jas = new ClusterControllerService.JobletAborter[ts |
| .size()]; |
| int i = 0; |
| BasicTupleSet notificationTuples = new BasicTupleSet(); |
| for (List t2 : ts) { |
| Object[] t2Data = t2.toArray(); |
| String nodeId = (String) t2Data[0]; |
| jas[i++] = new ClusterControllerService.JobletAborter(nodeId, jobId, stageId, |
| attempt); |
| notificationTuples.add(AbortNotifyTable |
| .createTuple(jobId, stageId, nodeId, attempt)); |
| } |
| try { |
| ccs.runRemote(jas, null); |
| } finally { |
| jolRuntime.schedule(JOL_SCOPE, AbortNotifyTable.TABLE_NAME, notificationTuples, |
| null); |
| jolRuntime.evaluate(); |
| } |
| } catch (Exception e) { |
| } |
| } |
| }); |
| } |
| } |
| }); |
| |
| jolRuntime.install(JOL_SCOPE, ClassLoader.getSystemResource(SCHEDULER_OLG_FILE)); |
| jolRuntime.evaluate(); |
| |
| qi = new QueryInterfaceImpl(); |
| } |
| |
| @Override |
| public UUID createJob(String appName, JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) throws Exception { |
| final UUID jobId = UUID.randomUUID(); |
| |
| final JobPlanBuilder builder = new JobPlanBuilder(); |
| builder.init(jobSpec, jobFlags); |
| |
| final BasicTupleSet anTuples = new BasicTupleSet(); |
| final BasicTupleSet acTuples = new BasicTupleSet(); |
| final BasicTupleSet abTuples = new BasicTupleSet(); |
| IActivityGraphBuilder gBuilder = new IActivityGraphBuilder() { |
| @Override |
| public void addTask(IActivityNode task) { |
| anTuples.add(ActivityNodeTable.createTuple(jobId, task)); |
| builder.addTask(task); |
| } |
| |
| @Override |
| public void addTargetEdge(int operatorOutputIndex, IActivityNode task, int taskOutputIndex) { |
| acTuples.add(ActivityConnectionTable.createTuple(jobId, task, Direction.OUTPUT, operatorOutputIndex, |
| taskOutputIndex)); |
| builder.addTargetEdge(operatorOutputIndex, task, taskOutputIndex); |
| } |
| |
| @Override |
| public void addSourceEdge(int operatorInputIndex, IActivityNode task, int taskInputIndex) { |
| acTuples.add(ActivityConnectionTable.createTuple(jobId, task, Direction.INPUT, operatorInputIndex, |
| taskInputIndex)); |
| builder.addSourceEdge(operatorInputIndex, task, taskInputIndex); |
| } |
| |
| @Override |
| public void addBlockingEdge(IActivityNode blocker, IActivityNode blocked) { |
| abTuples.add(ActivityBlockedTable.createTuple(jobId, blocker, blocked)); |
| builder.addBlockingEdge(blocker, blocked); |
| } |
| }; |
| |
| BasicTupleSet odTuples = new BasicTupleSet(); |
| BasicTupleSet olTuples = new BasicTupleSet(); |
| BasicTupleSet ocTuples = new BasicTupleSet(); |
| for (Map.Entry<OperatorDescriptorId, IOperatorDescriptor> e : jobSpec.getOperatorMap().entrySet()) { |
| IOperatorDescriptor od = e.getValue(); |
| int nPartitions = addPartitionConstraintTuples(jobId, od, olTuples, ocTuples); |
| odTuples.add(OperatorDescriptorTable.createTuple(jobId, nPartitions, od)); |
| od.contributeTaskGraph(gBuilder); |
| } |
| |
| BasicTupleSet cdTuples = new BasicTupleSet(); |
| for (Map.Entry<ConnectorDescriptorId, IConnectorDescriptor> e : jobSpec.getConnectorMap().entrySet()) { |
| cdTuples.add(ConnectorDescriptorTable.createTuple(jobId, jobSpec, e.getValue())); |
| } |
| |
| BasicTupleSet jobTuples = new BasicTupleSet(JobTable.createInitialJobTuple(jobId, appName, jobSpec, |
| builder.getPlan())); |
| |
| jolRuntime.schedule(JOL_SCOPE, JobTable.TABLE_NAME, jobTuples, null); |
| jolRuntime.schedule(JOL_SCOPE, OperatorDescriptorTable.TABLE_NAME, odTuples, null); |
| jolRuntime.schedule(JOL_SCOPE, OperatorLocationTable.TABLE_NAME, olTuples, null); |
| jolRuntime.schedule(JOL_SCOPE, OperatorCloneCountTable.TABLE_NAME, ocTuples, null); |
| jolRuntime.schedule(JOL_SCOPE, ConnectorDescriptorTable.TABLE_NAME, cdTuples, null); |
| jolRuntime.schedule(JOL_SCOPE, ActivityNodeTable.TABLE_NAME, anTuples, null); |
| jolRuntime.schedule(JOL_SCOPE, ActivityConnectionTable.TABLE_NAME, acTuples, null); |
| jolRuntime.schedule(JOL_SCOPE, ActivityBlockedTable.TABLE_NAME, abTuples, null); |
| |
| jolRuntime.evaluate(); |
| |
| return jobId; |
| } |
| |
| private int addPartitionConstraintTuples(UUID jobId, IOperatorDescriptor od, BasicTupleSet olTuples, |
| BasicTupleSet ocTuples) { |
| PartitionConstraint pc = od.getPartitionConstraint(); |
| |
| switch (pc.getPartitionConstraintType()) { |
| case COUNT: |
| int count = ((PartitionCountConstraint) pc).getCount(); |
| ocTuples.add(OperatorCloneCountTable.createTuple(jobId, od.getOperatorId(), count)); |
| return count; |
| |
| case EXPLICIT: |
| LocationConstraint[] locationConstraints = ((ExplicitPartitionConstraint) pc).getLocationConstraints(); |
| for (int i = 0; i < locationConstraints.length; ++i) { |
| addLocationConstraintTuple(olTuples, jobId, od.getOperatorId(), i, locationConstraints[i], 0); |
| } |
| return locationConstraints.length; |
| } |
| throw new IllegalArgumentException(); |
| } |
| |
| private void addLocationConstraintTuple(BasicTupleSet olTuples, UUID jobId, OperatorDescriptorId opId, int i, |
| LocationConstraint locationConstraint, int benefit) { |
| switch (locationConstraint.getConstraintType()) { |
| case ABSOLUTE: |
| String nodeId = ((AbsoluteLocationConstraint) locationConstraint).getLocationId(); |
| olTuples.add(OperatorLocationTable.createTuple(jobId, opId, nodeId, i, benefit)); |
| break; |
| |
| case CHOICE: |
| int index = 0; |
| for (LocationConstraint lc : ((ChoiceLocationConstraint) locationConstraint).getChoices()) { |
| addLocationConstraintTuple(olTuples, jobId, opId, i, lc, benefit - index); |
| index++; |
| } |
| } |
| } |
| |
| @Override |
| public void reportProfile(String id, Map<UUID, Map<String, Long>> counterDump) throws Exception { |
| BasicTupleSet puTuples = new BasicTupleSet(); |
| for (Map.Entry<UUID, Map<String, Long>> e : counterDump.entrySet()) { |
| puTuples.add(ProfileUpdateTable.createTuple(e.getKey(), id, e.getValue())); |
| } |
| jolRuntime.schedule(JOL_SCOPE, ProfileUpdateTable.TABLE_NAME, puTuples, null); |
| } |
| |
| @Override |
| public JobStatus getJobStatus(UUID jobId) { |
| synchronized (jobTable) { |
| try { |
| Tuple jobTuple = jobTable.lookupJob(jobId); |
| if (jobTuple == null) { |
| return null; |
| } |
| return (JobStatus) jobTuple.value(JobTable.JOBSTATUS_FIELD_INDEX); |
| } catch (BadKeyException e) { |
| throw new RuntimeException(e); |
| } |
| } |
| } |
| |
| @Override |
| public synchronized void notifyNodeFailure(String nodeId) throws Exception { |
| int len = rankedAvailableNodes.size(); |
| int delIndex = -1; |
| for (int i = 0; i < len; ++i) { |
| if (nodeId.equals(rankedAvailableNodes.get(i))) { |
| delIndex = i; |
| break; |
| } |
| } |
| if (delIndex < 0) { |
| return; |
| } |
| BasicTupleSet delRANTuples = new BasicTupleSet(); |
| delRANTuples.add(RankedAvailableNodesTable.createTuple(nodeId, delIndex)); |
| |
| BasicTupleSet insRANTuples = new BasicTupleSet(); |
| for (int i = delIndex + 1; i < len; ++i) { |
| insRANTuples.add(RankedAvailableNodesTable.createTuple(rankedAvailableNodes.get(i), i - 1)); |
| } |
| |
| rankedAvailableNodes.remove(delIndex); |
| |
| jolRuntime.schedule(JOL_SCOPE, RankedAvailableNodesTable.TABLE_NAME, insRANTuples, delRANTuples); |
| |
| BasicTupleSet unavailableTuples = new BasicTupleSet(AvailableNodesTable.createTuple(nodeId)); |
| |
| jolRuntime.schedule(JOL_SCOPE, AvailableNodesTable.TABLE_NAME, null, unavailableTuples); |
| |
| jolRuntime.evaluate(); |
| |
| BasicTupleSet failedTuples = new BasicTupleSet(FailedNodesTable.createTuple(nodeId)); |
| |
| jolRuntime.schedule(JOL_SCOPE, FailedNodesTable.TABLE_NAME, failedTuples, null); |
| |
| jolRuntime.evaluate(); |
| } |
| |
| @Override |
| public synchronized void notifyStageletComplete(UUID jobId, UUID stageId, int attempt, String nodeId, |
| Map<String, Long> statistics) throws Exception { |
| BasicTupleSet scTuples = new BasicTupleSet(); |
| scTuples.add(StageletCompleteTable.createTuple(jobId, stageId, nodeId, attempt, statistics)); |
| |
| jolRuntime.schedule(JOL_SCOPE, StageletCompleteTable.TABLE_NAME, scTuples, null); |
| |
| jolRuntime.evaluate(); |
| } |
| |
| @Override |
| public synchronized void notifyStageletFailure(UUID jobId, UUID stageId, int attempt, String nodeId) |
| throws Exception { |
| BasicTupleSet sfTuples = new BasicTupleSet(); |
| sfTuples.add(StageletFailureTable.createTuple(jobId, stageId, nodeId, attempt)); |
| |
| jolRuntime.schedule(JOL_SCOPE, StageletFailureTable.TABLE_NAME, sfTuples, null); |
| |
| jolRuntime.evaluate(); |
| } |
| |
| @Override |
| public void start(UUID jobId) throws Exception { |
| BasicTupleSet jsTuples = new BasicTupleSet(); |
| jsTuples.add(JobStartTable.createTuple(jobId, System.currentTimeMillis())); |
| |
| jolRuntime.schedule(JOL_SCOPE, JobStartTable.TABLE_NAME, jsTuples, null); |
| |
| jolRuntime.evaluate(); |
| } |
| |
| @Override |
| public synchronized void registerNode(String nodeId) throws Exception { |
| rankedAvailableNodes.add(nodeId); |
| BasicTupleSet insRANTuples = new BasicTupleSet(); |
| insRANTuples.add(RankedAvailableNodesTable.createTuple(nodeId, rankedAvailableNodes.size() - 1)); |
| |
| jolRuntime.schedule(JOL_SCOPE, RankedAvailableNodesTable.TABLE_NAME, insRANTuples, null); |
| |
| BasicTupleSet availableTuples = new BasicTupleSet(AvailableNodesTable.createTuple(nodeId)); |
| |
| jolRuntime.schedule(JOL_SCOPE, AvailableNodesTable.TABLE_NAME, availableTuples, null); |
| |
| jolRuntime.evaluate(); |
| |
| BasicTupleSet unfailedTuples = new BasicTupleSet(FailedNodesTable.createTuple(nodeId)); |
| |
| jolRuntime.schedule(JOL_SCOPE, FailedNodesTable.TABLE_NAME, null, unfailedTuples); |
| |
| jolRuntime.evaluate(); |
| } |
| |
| @Override |
| public void waitForCompletion(UUID jobId) throws Exception { |
| synchronized (jobTable) { |
| Tuple jobTuple = null; |
| while ((jobTuple = jobTable.lookupJob(jobId)) != null |
| && jobTuple.value(JobTable.JOBSTATUS_FIELD_INDEX) != JobStatus.TERMINATED) { |
| jobTable.wait(); |
| } |
| } |
| } |
| |
| @Override |
| public IJobManagerQueryInterface getQueryInterface() { |
| return qi; |
| } |
| |
| public void visitJobs(ITupleProcessor processor) throws Exception { |
| for (Tuple t : jobTable.tuples()) { |
| processor.process(t); |
| } |
| } |
| |
| public void visitJob(UUID jobId, ITupleProcessor processor) throws Exception { |
| Tuple job = jobTable.lookupJob(jobId); |
| if (job != null) { |
| processor.process(job); |
| } |
| } |
| |
| /* |
| * declare(job, keys(0), {JobId, Status, JobSpec, JobPlan}) |
| */ |
| private static class JobTable extends BasicTable { |
| private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "job"); |
| |
| private static Key PRIMARY_KEY = new Key(0); |
| |
| @SuppressWarnings("unchecked") |
| private static final Class[] SCHEMA = new Class[] { UUID.class, String.class, JobStatus.class, |
| JobSpecification.class, JobPlan.class, Map.class }; |
| |
| public static final int JOBID_FIELD_INDEX = 0; |
| public static final int APPNAME_FIELD_INDEX = 1; |
| public static final int JOBSTATUS_FIELD_INDEX = 2; |
| public static final int JOBSPEC_FIELD_INDEX = 3; |
| public static final int JOBPLAN_FIELD_INDEX = 4; |
| public static final int STATISTICS_FIELD_INDEX = 5; |
| |
| public JobTable(Runtime context) { |
| super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA); |
| } |
| |
| @SuppressWarnings("unchecked") |
| static Tuple createInitialJobTuple(UUID jobId, String appName, JobSpecification jobSpec, JobPlan plan) { |
| return new Tuple(jobId, appName, JobStatus.INITIALIZED, jobSpec, plan, new HashMap()); |
| } |
| |
| Tuple lookupJob(UUID jobId) throws BadKeyException { |
| TupleSet set = primary().lookupByKey(jobId); |
| if (set.isEmpty()) { |
| return null; |
| } |
| return (Tuple) set.toArray()[0]; |
| } |
| } |
| |
| /* |
| * declare(operatordescriptor, keys(0, 1), {JobId, ODId, OperatorDescriptor}) |
| */ |
| private static class OperatorDescriptorTable extends BasicTable { |
| private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "operatordescriptor"); |
| |
| private static Key PRIMARY_KEY = new Key(0, 1); |
| |
| @SuppressWarnings("unchecked") |
| private static final Class[] SCHEMA = new Class[] { UUID.class, OperatorDescriptorId.class, Integer.class, |
| IOperatorDescriptor.class }; |
| |
| public OperatorDescriptorTable(Runtime context) { |
| super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA); |
| } |
| |
| static Tuple createTuple(UUID jobId, int nPartitions, IOperatorDescriptor od) { |
| return new Tuple(jobId, od.getOperatorId(), nPartitions, od); |
| } |
| } |
| |
| /* |
| * declare(operatorlocation, keys(0, 1), {JobId, ODId, NodeId}) |
| */ |
| private static class OperatorLocationTable extends BasicTable { |
| private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "operatorlocation"); |
| |
| private static Key PRIMARY_KEY = new Key(); |
| |
| @SuppressWarnings("unchecked") |
| private static final Class[] SCHEMA = new Class[] { UUID.class, OperatorDescriptorId.class, String.class, |
| Integer.class, Integer.class }; |
| |
| public OperatorLocationTable(Runtime context) { |
| super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA); |
| } |
| |
| static Tuple createTuple(UUID jobId, OperatorDescriptorId opId, String nodeId, int partition, int benefit) { |
| return new Tuple(jobId, opId, nodeId, partition, benefit); |
| } |
| } |
| |
| /* |
| * declare(operatorclonecount, keys(0, 1), {JobId, ODId, Count}) |
| */ |
| private static class OperatorCloneCountTable extends BasicTable { |
| private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "operatorclonecount"); |
| |
| private static Key PRIMARY_KEY = new Key(); |
| |
| @SuppressWarnings("unchecked") |
| private static final Class[] SCHEMA = new Class[] { UUID.class, OperatorDescriptorId.class, Integer.class }; |
| |
| public OperatorCloneCountTable(Runtime context) { |
| super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA); |
| } |
| |
| static Tuple createTuple(UUID jobId, OperatorDescriptorId opId, int cloneCount) { |
| return new Tuple(jobId, opId, cloneCount); |
| } |
| } |
| |
| /* |
| * declare(connectordescriptor, keys(0, 1), {JobId, CDId, SrcODId, SrcPort, DestODId, DestPort, ConnectorDescriptor}) |
| */ |
| private static class ConnectorDescriptorTable extends BasicTable { |
| private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "connectordescriptor"); |
| |
| private static Key PRIMARY_KEY = new Key(0, 1); |
| |
| @SuppressWarnings("unchecked") |
| private static final Class[] SCHEMA = new Class[] { UUID.class, ConnectorDescriptorId.class, |
| OperatorDescriptorId.class, Integer.class, OperatorDescriptorId.class, Integer.class, |
| IConnectorDescriptor.class }; |
| |
| public ConnectorDescriptorTable(Runtime context) { |
| super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA); |
| } |
| |
| static Tuple createTuple(UUID jobId, JobSpecification jobSpec, IConnectorDescriptor conn) { |
| IOperatorDescriptor srcOD = jobSpec.getProducer(conn); |
| int srcPort = jobSpec.getProducerOutputIndex(conn); |
| IOperatorDescriptor destOD = jobSpec.getConsumer(conn); |
| int destPort = jobSpec.getConsumerInputIndex(conn); |
| Tuple cdTuple = new Tuple(jobId, conn.getConnectorId(), srcOD.getOperatorId(), srcPort, |
| destOD.getOperatorId(), destPort, conn); |
| return cdTuple; |
| } |
| } |
| |
| /* |
| * declare(activitynode, keys(0, 1, 2), {JobId, OperatorId, ActivityId, ActivityNode}) |
| */ |
| private static class ActivityNodeTable extends BasicTable { |
| private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "activitynode"); |
| |
| private static Key PRIMARY_KEY = new Key(0, 1, 2); |
| |
| @SuppressWarnings("unchecked") |
| private static final Class[] SCHEMA = new Class[] { UUID.class, OperatorDescriptorId.class, |
| ActivityNodeId.class, IActivityNode.class }; |
| |
| public ActivityNodeTable(Runtime context) { |
| super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA); |
| } |
| |
| static Tuple createTuple(UUID jobId, IActivityNode aNode) { |
| return new Tuple(jobId, aNode.getActivityNodeId().getOperatorDescriptorId(), aNode.getActivityNodeId(), |
| aNode); |
| } |
| } |
| |
| /* |
| * declare(activityconnection, keys(0, 1, 2, 3), {JobId, OperatorId, Integer, Direction, ActivityNodeId, Integer}) |
| */ |
| private static class ActivityConnectionTable extends BasicTable { |
| private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "activityconnection"); |
| |
| private static Key PRIMARY_KEY = new Key(0, 1, 2, 3); |
| |
| @SuppressWarnings("unchecked") |
| private static final Class[] SCHEMA = new Class[] { UUID.class, OperatorDescriptorId.class, Integer.class, |
| Direction.class, ActivityNodeId.class, Integer.class }; |
| |
| public ActivityConnectionTable(Runtime context) { |
| super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA); |
| } |
| |
| static Tuple createTuple(UUID jobId, IActivityNode aNode, Direction direction, int odPort, int activityPort) { |
| return new Tuple(jobId, aNode.getActivityNodeId().getOperatorDescriptorId(), odPort, direction, |
| aNode.getActivityNodeId(), activityPort); |
| } |
| } |
| |
| /* |
| * declare(activityblocked, keys(0, 1, 2, 3), {JobId, OperatorId, BlockerActivityId, BlockedActivityId}) |
| */ |
| private static class ActivityBlockedTable extends BasicTable { |
| private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "activityblocked"); |
| |
| private static Key PRIMARY_KEY = new Key(0, 1, 2, 3, 4); |
| |
| @SuppressWarnings("unchecked") |
| private static final Class[] SCHEMA = new Class[] { UUID.class, OperatorDescriptorId.class, |
| ActivityNodeId.class, OperatorDescriptorId.class, ActivityNodeId.class }; |
| |
| public ActivityBlockedTable(Runtime context) { |
| super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA); |
| } |
| |
| static Tuple createTuple(UUID jobId, IActivityNode blocker, IActivityNode blocked) { |
| ActivityNodeId blockerANId = blocker.getActivityNodeId(); |
| OperatorDescriptorId blockerODId = blockerANId.getOperatorDescriptorId(); |
| ActivityNodeId blockedANId = blocked.getActivityNodeId(); |
| OperatorDescriptorId blockedODId = blockedANId.getOperatorDescriptorId(); |
| return new Tuple(jobId, blockerODId, blockerANId, blockedODId, blockedANId); |
| } |
| } |
| |
| /* |
| * declare(jobstart, keys(0), {JobId, SubmitTime}) |
| */ |
| private static class JobStartTable extends EventTable { |
| private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "jobstart"); |
| |
| @SuppressWarnings("unchecked") |
| private static final Class[] SCHEMA = new Class[] { UUID.class, Long.class }; |
| |
| public JobStartTable() { |
| super(TABLE_NAME, SCHEMA); |
| } |
| |
| static Tuple createTuple(UUID jobId, long submitTime) { |
| return new Tuple(jobId, submitTime); |
| } |
| } |
| |
| /* |
| * declare(startmessage, keys(0, 1), {JobId, StageId, JobPlan, TupleSet}) |
| */ |
| private static class StartMessageTable extends BasicTable { |
| private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "startmessage"); |
| |
| private static Key PRIMARY_KEY = new Key(0, 1, 2); |
| |
| @SuppressWarnings("unchecked") |
| private static final Class[] SCHEMA = new Class[] { UUID.class, UUID.class, Integer.class, String.class, |
| JobPlan.class, Set.class }; |
| |
| public StartMessageTable(Runtime context) { |
| super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA); |
| } |
| } |
| |
| /* |
| * declare(jobcleanup, keys(0), {JobId, Set<NodeId>}) |
| */ |
| private static class JobCleanUpTable extends BasicTable { |
| private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "jobcleanup"); |
| |
| private static Key PRIMARY_KEY = new Key(0); |
| |
| @SuppressWarnings("unchecked") |
| private static final Class[] SCHEMA = new Class[] { UUID.class, Set.class }; |
| |
| public JobCleanUpTable(Runtime context) { |
| super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA); |
| } |
| } |
| |
| /* |
| * declare(jobcleanupcomplete, keys(0), {JobId}) |
| */ |
| private static class JobCleanUpCompleteTable extends EventTable { |
| private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "jobcleanupcomplete"); |
| |
| @SuppressWarnings("unchecked") |
| private static final Class[] SCHEMA = new Class[] { UUID.class }; |
| |
| public JobCleanUpCompleteTable() { |
| super(TABLE_NAME, SCHEMA); |
| } |
| |
| public static Tuple createTuple(UUID jobId) { |
| return new Tuple(jobId); |
| } |
| } |
| |
| /* |
| * declare(stageletcomplete, keys(0, 1, 2, 3), {JobId, StageId, NodeId, Attempt}) |
| */ |
| private static class StageletCompleteTable extends BasicTable { |
| private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "stageletcomplete"); |
| |
| private static Key PRIMARY_KEY = new Key(0, 1, 2, 3); |
| |
| @SuppressWarnings("unchecked") |
| private static final Class[] SCHEMA = new Class[] { UUID.class, UUID.class, String.class, Integer.class, |
| Map.class }; |
| |
| public StageletCompleteTable(Runtime context) { |
| super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA); |
| } |
| |
| public static Tuple createTuple(UUID jobId, UUID stageId, String nodeId, int attempt, |
| Map<String, Long> statistics) { |
| return new Tuple(jobId, stageId, nodeId, attempt, statistics); |
| } |
| } |
| |
| /* |
| * declare(stageletfailure, keys(0, 1, 2, 3), {JobId, StageId, NodeId, Attempt}) |
| */ |
| private static class StageletFailureTable extends BasicTable { |
| private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "stageletfailure"); |
| |
| private static Key PRIMARY_KEY = new Key(0, 1, 2, 3); |
| |
| @SuppressWarnings("unchecked") |
| private static final Class[] SCHEMA = new Class[] { UUID.class, UUID.class, String.class, Integer.class }; |
| |
| public StageletFailureTable(Runtime context) { |
| super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA); |
| } |
| |
| public static Tuple createTuple(UUID jobId, UUID stageId, String nodeId, int attempt) { |
| return new Tuple(jobId, stageId, nodeId, attempt); |
| } |
| } |
| |
| /* |
| * declare(availablenodes, keys(0), {NodeId}) |
| */ |
| private static class AvailableNodesTable extends BasicTable { |
| private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "availablenodes"); |
| |
| private static Key PRIMARY_KEY = new Key(0); |
| |
| @SuppressWarnings("unchecked") |
| private static final Class[] SCHEMA = new Class[] { String.class }; |
| |
| public AvailableNodesTable(Runtime context) { |
| super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA); |
| } |
| |
| public static Tuple createTuple(String nodeId) { |
| return new Tuple(nodeId); |
| } |
| } |
| |
| /* |
| * declare(rankedavailablenodes, keys(0), {NodeId, Integer}) |
| */ |
| private static class RankedAvailableNodesTable extends BasicTable { |
| private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "rankedavailablenodes"); |
| |
| private static Key PRIMARY_KEY = new Key(0); |
| |
| @SuppressWarnings("unchecked") |
| private static final Class[] SCHEMA = new Class[] { String.class, Integer.class }; |
| |
| public RankedAvailableNodesTable(Runtime context) { |
| super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA); |
| } |
| |
| public static Tuple createTuple(String nodeId, int rank) { |
| return new Tuple(nodeId, rank); |
| } |
| } |
| |
| /* |
| * declare(failednodes, keys(0), {NodeId}) |
| */ |
| private static class FailedNodesTable extends BasicTable { |
| private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "failednodes"); |
| |
| private static Key PRIMARY_KEY = new Key(0); |
| |
| @SuppressWarnings("unchecked") |
| private static final Class[] SCHEMA = new Class[] { String.class }; |
| |
| public FailedNodesTable(Runtime context) { |
| super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA); |
| } |
| |
| public static Tuple createTuple(String nodeId) { |
| return new Tuple(nodeId); |
| } |
| } |
| |
| /* |
| * declare(abortmessage, keys(0, 1), {JobId, StageId, Attempt, JobPlan, TupleSet}) |
| */ |
| private static class AbortMessageTable extends BasicTable { |
| private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "abortmessage"); |
| |
| private static Key PRIMARY_KEY = new Key(0, 1, 2); |
| |
| @SuppressWarnings("unchecked") |
| private static final Class[] SCHEMA = new Class[] { UUID.class, UUID.class, Integer.class, JobPlan.class, |
| Set.class }; |
| |
| public AbortMessageTable(Runtime context) { |
| super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA); |
| } |
| } |
| |
| /* |
| * declare(abortnotify, keys(0, 1, 2, 3), {JobId, StageId, NodeId, Attempt, StageletStatistics}) |
| */ |
| private static class AbortNotifyTable extends BasicTable { |
| private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "abortnotify"); |
| |
| private static Key PRIMARY_KEY = new Key(0, 1, 2, 3); |
| |
| @SuppressWarnings("unchecked") |
| private static final Class[] SCHEMA = new Class[] { UUID.class, UUID.class, String.class, Integer.class }; |
| |
| public AbortNotifyTable(Runtime context) { |
| super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA); |
| } |
| |
| public static Tuple createTuple(UUID jobId, UUID stageId, String nodeId, int attempt) { |
| return new Tuple(jobId, stageId, nodeId, attempt); |
| } |
| } |
| |
| private static class ExpandPartitionCountConstraintTableFunction extends Function { |
| private static final String TABLE_NAME = "expandpartitioncountconstraint"; |
| |
| @SuppressWarnings("unchecked") |
| private static final Class[] SCHEMA = new Class[] { UUID.class, OperatorDescriptorId.class, Integer.class, |
| Integer.class }; |
| |
| public static final int JOBID_FIELD_INDEX = 0; |
| public static final int ODID_FIELD_INDEX = 1; |
| public static final int NPARTITIONS_FIELD_INDEX = 2; |
| public static final int ORDINAL_FIELD_INDEX = 3; |
| |
| public ExpandPartitionCountConstraintTableFunction() { |
| super(TABLE_NAME, SCHEMA); |
| } |
| |
| @Override |
| public TupleSet insert(TupleSet tuples, TupleSet conflicts) throws UpdateException { |
| TupleSet result = new BasicTupleSet(); |
| int counter = 0; |
| for (Tuple t : tuples) { |
| int nPartitions = (Integer) t.value(NPARTITIONS_FIELD_INDEX); |
| for (int i = 0; i < nPartitions; ++i) { |
| result.add(new Tuple(t.value(JOBID_FIELD_INDEX), t.value(ODID_FIELD_INDEX), i, counter++)); |
| } |
| } |
| return result; |
| } |
| } |
| |
| /* |
| * declare(profileupdate, keys(0, 1), {JobId, NodeId, Map}) |
| */ |
| private static class ProfileUpdateTable extends EventTable { |
| private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "profileupdate"); |
| |
| @SuppressWarnings("unchecked") |
| private static final Class[] SCHEMA = new Class[] { UUID.class, String.class, Map.class }; |
| |
| public ProfileUpdateTable() { |
| super(TABLE_NAME, SCHEMA); |
| } |
| |
| public static Tuple createTuple(UUID jobId, String nodeId, Map<String, Long> statistics) { |
| return new Tuple(jobId, nodeId, statistics); |
| } |
| } |
| |
| private class JobQueueThread extends Thread { |
| public JobQueueThread() { |
| setDaemon(true); |
| } |
| |
| public void run() { |
| Runnable r; |
| while (true) { |
| try { |
| r = jobQueue.take(); |
| } catch (InterruptedException e) { |
| continue; |
| } |
| try { |
| r.run(); |
| } catch (Exception e) { |
| e.printStackTrace(); |
| } |
| } |
| } |
| } |
| |
| public interface ITupleProcessor { |
| public void process(Tuple t) throws Exception; |
| } |
| |
| private class QueryInterfaceImpl implements IJobManagerQueryInterface { |
| @Override |
| public JSONArray getAllJobSummaries() throws Exception { |
| final JSONArray jobs = new JSONArray(); |
| JOLJobManagerImpl.ITupleProcessor tp = new JOLJobManagerImpl.ITupleProcessor() { |
| @Override |
| public void process(Tuple t) throws Exception { |
| JSONObject jo = new JSONObject(); |
| jo.put("type", "job-summary"); |
| jo.put("id", t.value(JobTable.JOBID_FIELD_INDEX).toString()); |
| jo.put("status", t.value(JobTable.JOBSTATUS_FIELD_INDEX).toString()); |
| jobs.put(jo); |
| } |
| }; |
| visitJobs(tp); |
| return jobs; |
| } |
| |
| @Override |
| public JSONObject getJobSpecification(UUID jobId) throws Exception { |
| final JSONArray jobs = new JSONArray(); |
| JOLJobManagerImpl.ITupleProcessor tp = new JOLJobManagerImpl.ITupleProcessor() { |
| @Override |
| public void process(Tuple t) throws Exception { |
| JobSpecification js = (JobSpecification) t.value(JobTable.JOBSPEC_FIELD_INDEX); |
| jobs.put(js.toJSON()); |
| } |
| }; |
| visitJob(jobId, tp); |
| return (JSONObject) (jobs.length() == 0 ? new JSONObject() : jobs.get(0)); |
| } |
| |
| @Override |
| public JSONObject getJobPlan(UUID jobId) throws Exception { |
| final JSONArray jobs = new JSONArray(); |
| JOLJobManagerImpl.ITupleProcessor tp = new JOLJobManagerImpl.ITupleProcessor() { |
| @Override |
| public void process(Tuple t) throws Exception { |
| } |
| }; |
| visitJob(jobId, tp); |
| return (JSONObject) (jobs.length() == 0 ? new JSONObject() : jobs.get(0)); |
| } |
| |
| @Override |
| public JSONObject getJobProfile(UUID jobId) throws Exception { |
| final JSONArray jobs = new JSONArray(); |
| JOLJobManagerImpl.ITupleProcessor tp = new JOLJobManagerImpl.ITupleProcessor() { |
| @Override |
| public void process(Tuple t) throws Exception { |
| JSONObject jo = new JSONObject(); |
| jo.put("type", "profile"); |
| jo.put("id", t.value(JobTable.JOBID_FIELD_INDEX).toString()); |
| Map<String, Long> profile = (Map<String, Long>) t.value(JobTable.STATISTICS_FIELD_INDEX); |
| if (profile != null) { |
| for (Map.Entry<String, Long> e : profile.entrySet()) { |
| JSONObject jpe = new JSONObject(); |
| jpe.put("name", e.getKey()); |
| jpe.put("value", e.getValue()); |
| jo.accumulate("counters", jpe); |
| } |
| } |
| jobs.put(jo); |
| } |
| }; |
| visitJob(jobId, tp); |
| return (JSONObject) (jobs.length() == 0 ? new JSONObject() : jobs.get(0)); |
| } |
| } |
| } |