| program hyrackscc; |
| |
| import java.util.UUID; |
| import java.util.Set; |
| import java.util.Map; |
| import java.util.HashMap; |
| |
| import jol.types.basic.Tuple; |
| import jol.types.basic.TupleSet; |
| |
| import edu.uci.ics.hyracks.api.dataflow.ActivityNodeId; |
| import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId; |
| import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor; |
| import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId; |
| import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor; |
| import edu.uci.ics.hyracks.api.job.JobStatus; |
| import edu.uci.ics.hyracks.api.job.JobPlan; |
| |
| /* Trace the job table; the flag selects the event class to print |
|    (presumably a = all, i = inserts, d = deletes, per JOL's watch |
|    convention -- confirm against the JOL runtime). */ |
| watch(job, a); |
| watch(job, i); |
| watch(job, d); |
| |
| /* Scratch relation of candidate stage numbers per activity: |
|    (JobId, OperatorId, ActivityId, StageNumber). keys() = no primary key, |
|    so multiple candidate stage numbers for one activity can coexist; the |
|    final choice is taken by the max<> aggregate into activitystage below. */ |
| define(activitystage_temp, keys(), {UUID, OperatorDescriptorId, ActivityNodeId, Integer}); |
| |
| /* Seed: every known activity of a job starts with candidate stage 0. */ |
| activitystage_INITIAL activitystage_temp(JobId, OperatorId, ActivityId, 0) :- |
| activitynode(JobId, OperatorId, ActivityId, _); |
| |
| /* Blocking dependency: if activity 1 blocks activity 2 (activityblocked), |
|    activity 2 must run in a strictly later stage. Whenever activity 2's |
|    candidate stage is not already greater than activity 1's, propose |
|    StageNumber1 + 1 for activity 2. */ |
| activitystage_BLOCKED activitystage_temp(JobId, OperatorId2, ActivityId2, StageNumber) :- |
| /* use activitystage_temp#insert() to trigger only on insert */ |
| activitystage_temp(JobId, OperatorId1, ActivityId1, StageNumber1), |
| activitystage_temp(JobId, OperatorId2, ActivityId2, StageNumber2), |
| activityblocked(JobId, OperatorId1, ActivityId1, OperatorId2, ActivityId2), |
| StageNumber2 <= StageNumber1 |
| { |
| StageNumber := StageNumber1 + 1; |
| }; |
| |
| /* Pipelined pair, consumer side: activities wired through a connector |
|    (producer's OUTPUT port joined to consumer's INPUT port via |
|    connectordescriptor) must share a stage. If their candidate stages |
|    differ, lift the consumer (side 2) to max(StageNumber1, StageNumber2). |
|    The producer side is handled symmetrically by PIPELINED_2 below. */ |
| activitystage_PIPELINED_1 activitystage_temp(JobId, OperatorId2, ActivityId2, StageNumber) :- |
| activitystage_temp(JobId, OperatorId1, ActivityId1, StageNumber1), |
| activitystage_temp(JobId, OperatorId2, ActivityId2, StageNumber2), |
| activityconnection(JobId, OperatorId1, Operator1Port, edu.uci.ics.hyracks.api.dataflow.Direction.OUTPUT, ActivityId1, _), |
| activityconnection(JobId, OperatorId2, Operator2Port, edu.uci.ics.hyracks.api.dataflow.Direction.INPUT, ActivityId2, _), |
| connectordescriptor(JobId, _, OperatorId1, Operator1Port, OperatorId2, Operator2Port, _), |
| StageNumber1 != StageNumber2 |
| { |
| StageNumber := java.lang.Math.max(StageNumber1, StageNumber2); |
| }; |
| |
| /* Pipelined pair, producer side: same join as PIPELINED_1, but the head |
|    lifts the producer (side 1) to max(StageNumber1, StageNumber2), so both |
|    endpoints of the connector converge on the same stage number. */ |
| activitystage_PIPELINED_2 activitystage_temp(JobId, OperatorId1, ActivityId1, StageNumber) :- |
| activitystage_temp(JobId, OperatorId1, ActivityId1, StageNumber1), |
| activitystage_temp(JobId, OperatorId2, ActivityId2, StageNumber2), |
| activityconnection(JobId, OperatorId1, Operator1Port, edu.uci.ics.hyracks.api.dataflow.Direction.OUTPUT, ActivityId1, _), |
| activityconnection(JobId, OperatorId2, Operator2Port, edu.uci.ics.hyracks.api.dataflow.Direction.INPUT, ActivityId2, _), |
| connectordescriptor(JobId, _, OperatorId1, Operator1Port, OperatorId2, Operator2Port, _), |
| StageNumber1 != StageNumber2 |
| { |
| StageNumber := java.lang.Math.max(StageNumber1, StageNumber2); |
| }; |
| |
| watch(activitystage_temp, a); |
| |
| watch(activityconnection, a); |
| watch(activityblocked, a); |
| watch(operatordescriptor, a); |
| watch(connectordescriptor, a); |
| |
| watch(activitystage, a); |
| watch(activitystage, i); |
| watch(activitystage, d); |
| |
| /* Final stage assignment, keyed on (JobId, OperatorId, ActivityId): the |
|    largest candidate stage number accumulated in activitystage_temp wins. */ |
| define(activitystage, keys(0, 1, 2), {UUID, OperatorDescriptorId, ActivityNodeId, Integer}); |
| |
| activitystage(JobId, OperatorId, ActivityId, max<StageNumber>) :- |
| activitystage_temp(JobId, OperatorId, ActivityId, StageNumber); |
| |
| /* Mint a StageId UUID for each distinct (JobId, StageNumber). The key on |
|    columns (0, 1) makes one row per stage survive even though the body can |
|    fire once per activity in the stage; note randomUUID() is nondeterministic, |
|    so re-derivations produce different candidate ids -- the key dedupes them. */ |
| define(jobstage, keys(0, 1), {UUID, Integer, UUID}); |
| |
| jobstage(JobId, StageNumber, StageId) :- |
| activitystage(JobId, _, _, StageNumber) |
| { |
| StageId := java.util.UUID.randomUUID(); |
| }; |
| |
| watch(jobstage, a); |
| |
| /* (JobId, AttemptNumber): attempt 0 is created when an INITIALIZED job |
|    receives jobstart; each subsequent attempt is created after the current |
|    attempt both started a stage and completed an abort (i.e. the attempt |
|    failed and its cleanup finished). */ |
| define(jobattempt, keys(), {UUID, Integer}); |
| |
| jobattempt(JobId, 0) :- |
| job(JobId, _, edu.uci.ics.hyracks.api.job.JobStatus.INITIALIZED, _, _, _), |
| jobstart(JobId, _); |
| |
| jobattempt(JobId, NextAttempt) :- |
| jobattempt(JobId, Attempt), |
| stagestart(JobId, _, Attempt), |
| abortcomplete(JobId, _, Attempt) |
| { |
| NextAttempt := Attempt + 1; |
| }; |
| |
| /* Fold joblet statistics from a profileupdate into the job's Stats map. |
|    NOTE(review): the head re-derives the same job tuple while |
|    Stats.putAll(...) mutates the existing Map in place as a side effect -- |
|    presumably intentional (accumulate stats without replacing the tuple), |
|    but confirm this pattern is sanctioned by the JOL runtime. */ |
| job(JobId, AppName, Status, JobSpec, JobPlan, Stats) :- |
| job(JobId, AppName, Status, JobSpec, JobPlan, Stats), |
| profileupdate(JobId, _, JobletStats) |
| { |
| Stats.putAll(JobletStats); |
| }; |
| |
| /* stagestart: (JobId, StageNumber, Attempt) marks a stage being launched. |
|    stagefinish: (JobId, StageNumber, Attempt, StatisticsMap), keyed on the |
|    first three columns. */ |
| define(stagestart, keys(), {UUID, Integer, Integer}); |
| define(stagefinish, keys(0, 1, 2), {UUID, Integer, Integer, Map}); |
| |
| watch(jobstart, i); |
| |
| /* Each new job attempt begins by starting stage 0. */ |
| stagestart_INITIAL stagestart(JobId, 0, Attempt) :- |
| jobattempt#insert(JobId, Attempt); |
| |
| /* Transition INITIALIZED -> RUNNING when the job receives jobstart. */ |
| update_job_status_RUNNING job(JobId, AppName, edu.uci.ics.hyracks.api.job.JobStatus.RUNNING, JobSpec, JobPlan, Stats) :- |
| job(JobId, AppName, edu.uci.ics.hyracks.api.job.JobStatus.INITIALIZED, JobSpec, JobPlan, Stats), |
| jobstart(JobId, _); |
| |
| /* Advance the job's stage pipeline: when the stage this attempt started |
|    finishes, emit stagestart for StageNumber + 1 of the same job/attempt. |
|    Fix: the first column of stagefinish is the job id (its head rule derives |
|    stagefinish(JobId, StageNumber, Attempt, SMap)), but it was previously |
|    bound here to a fresh variable named StageId -- leaving the join |
|    unconstrained on job, so any job finishing a stage with a matching |
|    number/attempt would advance this job too. Bind JobId to join correctly. */ |
| stagestart_NEXT stagestart(JobId, NextStageNumber, Attempt) :- |
| stagestart(JobId, StageNumber, Attempt), |
| stagefinish#insert(JobId, StageNumber, Attempt, _) |
| { |
| NextStageNumber := StageNumber + 1; |
| }; |
| |
| watch(stagestart, a); |
| watch(stagestart, d); |
| |
| /* Candidate placements: (JobId, OperatorId, NodeId, Partition, Benefit), |
|    filtered to locations whose node is currently available. */ |
| define(operatorlocationcandidates, keys(), {UUID, OperatorDescriptorId, String, Integer, Integer}); |
| |
| operatorlocationcandidates(JobId, OperatorId, NodeId, Partition, Benefit) :- |
| operatorlocation(JobId, OperatorId, NodeId, Partition, Benefit), |
| availablenodes(NodeId); |
| |
| watch(availablenodes, a); |
| watch(availablenodes, i); |
| watch(availablenodes, d); |
| |
| /* Singleton counter of available nodes; the first column is a constant 0 |
|    key so the count<> aggregate collapses to a single row. */ |
| define(availablenodecount, keys(0), {Integer, Integer}); |
| |
| watch(availablenodecount, a); |
| watch(availablenodecount, i); |
| watch(availablenodecount, d); |
| |
| /* Try not using a 0 key -- Tyson feels that should work! */ |
| availablenodecount(0, count<NodeId>) :- |
| availablenodes(NodeId); |
| |
| watch(rankedavailablenodes, a); |
| watch(rankedavailablenodes, i); |
| watch(rankedavailablenodes, d); |
| |
| watch(operatorlocationcandidates, a); |
| watch(operatorlocationcandidates, i); |
| watch(operatorlocationcandidates, d); |
| |
| /* Best achievable placement benefit per (JobId, OperatorId, Partition), |
|    taken over all candidate nodes. Used below to pick the winning node(s). */ |
| define(maxoperatorlocationbenefit, keys(0, 1, 2), {UUID, OperatorDescriptorId, Integer, Integer}); |
| |
| maxoperatorlocationbenefit(JobId, OperatorId, Partition, max<Benefit>) :- |
| operatorlocationcandidates(JobId, OperatorId, _, Partition, Benefit); |
| |
| watch(maxoperatorlocationbenefit, a); |
| watch(maxoperatorlocationbenefit, i); |
| watch(maxoperatorlocationbenefit, d); |
| |
| /* Placement decision per attempt: (JobId, OperatorId, NodeId, Partition, |
|    Attempt), keyed so each (job, operator, partition, attempt) keeps one |
|    node. Recomputed whenever a new job attempt is inserted. */ |
| define(attemptoperatorlocationdecision, keys(0, 1, 3, 4), {UUID, OperatorDescriptorId, String, Integer, Integer}); |
| |
| watch(attemptoperatorlocationdecision, a); |
| watch(attemptoperatorlocationdecision, i); |
| watch(attemptoperatorlocationdecision, d); |
| |
| /* Case 1: an explicit location candidate exists -- choose the candidate |
|    whose benefit equals the partition's maximum benefit. */ |
| attemptoperatorlocationdecision(JobId, OperatorId, NodeId, Partition, Attempt) :- |
| jobattempt#insert(JobId, Attempt), |
| operatorlocationcandidates(JobId, OperatorId, NodeId, Partition, Benefit), |
| maxoperatorlocationbenefit(JobId, OperatorId, Partition, Benefit); |
| |
| /* Case 2: no location preference -- round-robin operator clones over the |
|    ranked available nodes: clone with rank r goes to the node whose rank |
|    equals r modulo the number of available nodes. */ |
| attemptoperatorlocationdecision(JobId, OperatorId, NodeId, Partition, Attempt) :- |
| jobattempt#insert(JobId, Attempt), |
| operatorclonecountexpansiontotalorder(JobId, OperatorId, Partition, CloneRank), |
| rankedavailablenodes(NodeId, NodeRank), |
| availablenodecount(_, NodeCount), |
| NodeRank == CloneRank % NodeCount; |
| |
| /* Seed for clone expansion: (JobId, OperatorId, NPartitions, 0). */ |
| define(operatorclonecount_temp, keys(), {UUID, OperatorDescriptorId, Integer, Integer}); |
| |
| operatorclonecount_temp(JobId, OperatorId, NPartitions, 0) :- |
| operatorclonecount(JobId, OperatorId, NPartitions); |
| |
| /* One row per clone: (JobId, OperatorId, Partition, Rank). |
|    NOTE(review): expandpartitioncountconstraint appears to be an external |
|    function/table that expands the NPartitions count into individual |
|    partition/rank rows -- its definition is not in this file; confirm. */ |
| define(operatorclonecountexpansiontotalorder, keys(0, 1, 2), {UUID, OperatorDescriptorId, Integer, Integer}); |
| |
| operatorclonecountexpansiontotalorder(JobId, OperatorId, Partition, Rank) :- |
| expandpartitioncountconstraint(operatorclonecount_temp(JobId, OperatorId, Partition, Rank)); |
| |
| watch(operatorclonecountexpansiontotalorder, a); |
| watch(operatorclonecountexpansiontotalorder, i); |
| watch(operatorclonecountexpansiontotalorder, d); |
| |
| watch(operatorclonecount, a); |
| watch(operatorclonecount, i); |
| watch(operatorclonecount, d); |
| |
| /* One activitystart per (activity, placement) when a stage launches: |
|    join the newly inserted stagestart with the stage's activities, its |
|    minted StageId, and the placement decisions for this attempt. */ |
| define(activitystart, keys(), {UUID, OperatorDescriptorId, ActivityNodeId, Integer, Integer, UUID, String, Integer}); |
| |
| activitystart(JobId, OperatorId, ActivityId, StageNumber, Attempt, StageId, NodeId, Partition) :- |
| stagestart#insert(JobId, StageNumber, Attempt), |
| operatordescriptor(JobId, OperatorId, _, _), |
| activitystage(JobId, OperatorId, ActivityId, StageNumber), |
| jobstage(JobId, StageNumber, StageId), |
| attemptoperatorlocationdecision(JobId, OperatorId, NodeId, Partition, Attempt); |
| |
| watch(activitystart, a); |
| |
| /* Per-node stagelet: group the stage's activities by (JobId, StageId, |
|    NodeId, Attempt) -- the key columns (0, 1, 4, 5) -- collecting |
|    [ActivityId, Partition] pairs into a set. Only RUNNING jobs launch. */ |
| define(stageletstart, keys(0, 1, 4, 5), {UUID, UUID, String, JobPlan, String, Integer, Set}); |
| |
| stageletstart(JobId, StageId, AppName, JobPlan, NodeId, Attempt, set<ActivityInfo>) :- |
| activitystart#insert(JobId, _, ActivityId, StageNumber, Attempt, StageId, NodeId, Partition), |
| job(JobId, AppName, edu.uci.ics.hyracks.api.job.JobStatus.RUNNING, _, JobPlan, _) |
| { |
| ActivityInfo := [ActivityId, Partition]; |
| }; |
| |
| watch(stageletstart, a); |
| watch(stageletstart, i); |
| |
| /* Aggregate the per-node stagelets of a stage attempt into one message |
|    payload: a set of [NodeId, ActivityInfoSet] tuples, restricted to |
|    stagelets on still-available nodes with at least one activity. */ |
| define(startmessage_agg, keys(0, 1, 2), {UUID, UUID, Integer, String, JobPlan, Set}); |
| |
| startmessage_agg(JobId, StageId, Attempt, AppName, JobPlan, set<Tuple>) :- |
| stageletstart#insert(JobId, StageId, AppName, JobPlan, NodeId, Attempt, ActivityInfoSet), |
| availablenodes(NodeId), |
| ActivityInfoSet.size() != 0 |
| { |
| Tuple := [NodeId, ActivityInfoSet]; |
| }; |
| |
| /* Publish the aggregated payload as the outgoing start message. */ |
| startmessage(JobId, StageId, Attempt, AppName, JobPlan, TSet) :- |
| startmessage_agg(JobId, StageId, Attempt, AppName, JobPlan, TSet); |
| |
| watch(startmessage, a); |
| watch(startmessage, i); |
| |
| /* A stagelet must be aborted in two situations. */ |
| define(stageletabort, keys(0, 1, 3, 4), {UUID, UUID, JobPlan, String, Integer, Set}); |
| |
| /* (a) The node reported a stagelet failure for this stage attempt. */ |
| stageletabort(JobId, StageId, JobPlan, NodeId, Attempt) :- |
| stageletfailure(JobId, StageId, NodeId, Attempt), |
| stageletstart(JobId, StageId, _, JobPlan, NodeId, Attempt, ActivityIdSet); |
| |
| /* (b) A participating node failed before completing its stagelet: abort |
|    the sibling stagelets of the same stage attempt on every other node. */ |
| stageletabort(JobId, StageId, JobPlan, NodeIdOther, Attempt, ActivityIdSet) :- |
| stageletstart(JobId, StageId, _, JobPlan, NodeId, Attempt, _), |
| stageletstart(JobId, StageId, _, _, NodeIdOther, Attempt, ActivityIdSet), |
| failednodes#insert(NodeId), |
| notin stageletcomplete(JobId, StageId, NodeId, Attempt, _); |
| |
| watch(stageletabort, a); |
| watch(stageletabort, i); |
| watch(stageletabort, d); |
| |
| /* Set of node ids whose stagelets were aborted for a stage attempt; its |
|    size is later compared against acknowledged aborts (abortcomplete). */ |
| define(stageabort, keys(0, 1, 2), {UUID, UUID, Integer, Set}); |
| |
| stageabort(JobId, StageId, Attempt, set<NodeId>) :- |
| stageletabort#insert(JobId, StageId, _, NodeId, Attempt, _); |
| |
| /* Bundle the abort targets of a stage attempt into one payload: a set of |
|    [NodeId, ActivityIdSet] tuples, limited to nodes still available (a |
|    dead node cannot receive an abort message). */ |
| define(abortmessage_agg, keys(0, 1, 2), {UUID, UUID, Integer, JobPlan, Set}); |
| |
| abortmessage_agg(JobId, StageId, Attempt, JobPlan, set<Tuple>) :- |
| stageletabort#insert(JobId, StageId, JobPlan, NodeId, Attempt, ActivityIdSet), |
| availablenodes(NodeId) |
| { |
| Tuple := [NodeId, ActivityIdSet]; |
| }; |
| |
| /* Emit the abort message only when there is at least one target. */ |
| abortmessage(JobId, StageId, Attempt, JobPlan, TSet) :- |
| abortmessage_agg(JobId, StageId, Attempt, JobPlan, TSet), |
| TSet.size() != 0; |
| |
| watch(abortmessage, a); |
| watch(abortmessage, i); |
| |
| /* A stagelet's abort is complete when either the node acknowledged it |
|    (abortnotify) or the node is gone -- no ack will ever arrive. */ |
| define(stageletabortcomplete, keys(), {UUID, UUID, String, Integer}); |
| |
| stageletabortcomplete(JobId, StageId, NodeId, Attempt) :- |
| abortnotify(JobId, StageId, NodeId, Attempt); |
| |
| stageletabortcomplete(JobId, StageId, NodeId, Attempt) :- |
| stageletabort(JobId, StageId, _, NodeId, Attempt, _), |
| notin availablenodes(NodeId); |
| |
| /* Collect the nodes whose aborts completed, per stage attempt. */ |
| define(stageletabortcomplete_agg, keys(0, 1, 2), {UUID, UUID, Integer, Set}); |
| |
| stageletabortcomplete_agg(JobId, StageId, Attempt, set<NodeId>) :- |
| stageletabortcomplete(JobId, StageId, NodeId, Attempt); |
| |
| /* The whole stage abort is complete when every aborted node has completed: |
|    the completed-set and the aborted-set have the same size. (Feeds the |
|    jobattempt rule that starts the next attempt.) */ |
| define(abortcomplete, keys(), {UUID, UUID, Integer}); |
| |
| abortcomplete(JobId, StageId, Attempt) :- |
| stageletabortcomplete_agg(JobId, StageId, Attempt, NodeIdSet1), |
| stageabort(JobId, StageId, Attempt, NodeIdSet2), |
| NodeIdSet1.size() == NodeIdSet2.size(); |
| |
| /* Number of nodes that reported stagelet completion for a stage attempt. */ |
| define(stageletcompletecount, keys(0, 1, 2), {UUID, UUID, Integer, Integer}); |
| |
| stageletcompletecount(JobId, StageId, Attempt, count<NodeId>) :- |
| stageletcomplete(JobId, StageId, NodeId, Attempt, _); |
| |
| /* Merge the per-node statistics maps of a stage attempt into one Map. |
|    NOTE(review): HashMap.putAll returns void -- presumably JOL's generic<> |
|    aggregate evaluates the expression per tuple for its side effect on a |
|    shared accumulator map; confirm against the JOL aggregate docs. */ |
| define(stageletcomplete_agg, keys(0, 1, 2), {UUID, UUID, Integer, Map}); |
| |
| stageletcomplete_agg(JobId, StageId, Attempt, generic<(new HashMap()).putAll(Statistics)>) :- |
| stageletcomplete(JobId, StageId, NodeId, Attempt, Statistics); |
| |
| /* A stage finishes when as many nodes completed as were sent the start |
|    message (TSet.size() == Count); map StageId back to its StageNumber. */ |
| stagefinish(JobId, StageNumber, Attempt, SMap) :- |
| startmessage_agg(JobId, StageId, Attempt, _, _, TSet), |
| stageletcompletecount(JobId, StageId, Attempt, Count), |
| stageletcomplete_agg(JobId, StageId, Attempt, SMap), |
| jobstage(JobId, StageNumber, StageId), |
| TSet.size() == Count; |
| |
| /* Terminate the job: a stage start was attempted for a StageNumber that no |
|    longer exists in jobstage (no more stages to run) while a stage of this |
|    attempt has finished. Build the final stats map from the job's running |
|    stats plus the finished stage's map. |
|    NOTE(review): jobstage has three columns but notin is given only two |
|    arguments -- presumably JOL treats this as a prefix/partial match on |
|    (JobId, StageNumber); confirm against JOL notin semantics. */ |
| update_job_status_TERMINATED job(JobId, AppName, edu.uci.ics.hyracks.api.job.JobStatus.TERMINATED, JobSpec, JobPlan, NewStats) :- |
| job(JobId, AppName, edu.uci.ics.hyracks.api.job.JobStatus.RUNNING, JobSpec, JobPlan, Stats), |
| stagestart#insert(JobId, StageNumber, Attempt), |
| stagefinish(JobId, _, Attempt, SMap), |
| notin jobstage(JobId, StageNumber) |
| { |
| NewStats := new HashMap(); |
| NewStats.putAll(Stats); |
| NewStats.putAll(SMap); |
| }; |
| |
| /* Per-job set of nodes that hosted this attempt's operators, computed when |
|    the job has no further stage to start (same trigger as TERMINATED above); |
|    published as jobcleanup so those nodes can be told to clean up. |
|    Fix: this was the only define in the file without a keys(...) clause; |
|    the set<NodeId> aggregate groups by JobId (column 0), matching how every |
|    other *_agg table here is keyed on its group-by columns. */ |
| define(jobcleanup_agg, keys(0), {UUID, Set}); |
| |
| jobcleanup_agg(JobId, set<NodeId>) :- |
| stagestart#insert(JobId, StageNumber, Attempt), |
| stagefinish(JobId, _, Attempt, _), |
| attemptoperatorlocationdecision(JobId, _, NodeId, _, Attempt), |
| notin jobstage(JobId, StageNumber); |
| |
| jobcleanup(JobId, NodeIdSet) :- |
| jobcleanup_agg(JobId, NodeIdSet); |