blob: 3f0352615fd7785f745ea40e437032f993bfd980 [file] [log] [blame]
program hyrackscc;
import java.util.UUID;
import java.util.Set;
import jol.types.basic.Tuple;
import jol.types.basic.TupleSet;
import edu.uci.ics.hyracks.api.dataflow.ActivityNodeId;
import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
import edu.uci.ics.hyracks.api.job.JobStatus;
import edu.uci.ics.hyracks.api.job.JobPlan;
define(activitystage_temp, keys(), {UUID, OperatorDescriptorId, ActivityNodeId, Integer});
activitystage_INITIAL activitystage_temp(JobId, OperatorId, ActivityId, 0) :-
activitynode(JobId, OperatorId, ActivityId, _);
activitystage_BLOCKED activitystage_temp(JobId, OperatorId2, ActivityId2, StageNumber) :-
activitystage_temp(JobId, OperatorId1, ActivityId1, StageNumber1),
activitystage_temp(JobId, OperatorId2, ActivityId2, StageNumber2),
activityblocked(JobId, OperatorId1, ActivityId1, OperatorId2, ActivityId2),
StageNumber2 <= StageNumber1
{
StageNumber := StageNumber1 + 1;
};
activitystage_PIPELINED_1 activitystage_temp(JobId, OperatorId2, ActivityId2, StageNumber) :-
activitystage_temp(JobId, OperatorId1, ActivityId1, StageNumber1),
activitystage_temp(JobId, OperatorId2, ActivityId2, StageNumber2),
activityconnection(JobId, OperatorId1, Operator1Port, edu.uci.ics.hyracks.api.dataflow.Direction.OUTPUT, ActivityId1, _),
activityconnection(JobId, OperatorId2, Operator2Port, edu.uci.ics.hyracks.api.dataflow.Direction.INPUT, ActivityId2, _),
connectordescriptor(JobId, _, OperatorId1, Operator1Port, OperatorId2, Operator2Port, _),
StageNumber1 != StageNumber2
{
StageNumber := java.lang.Math.max(StageNumber1, StageNumber2);
};
activitystage_PIPELINED_2 activitystage_temp(JobId, OperatorId1, ActivityId1, StageNumber) :-
activitystage_temp(JobId, OperatorId1, ActivityId1, StageNumber1),
activitystage_temp(JobId, OperatorId2, ActivityId2, StageNumber2),
activityconnection(JobId, OperatorId1, Operator1Port, edu.uci.ics.hyracks.api.dataflow.Direction.OUTPUT, ActivityId1, _),
activityconnection(JobId, OperatorId2, Operator2Port, edu.uci.ics.hyracks.api.dataflow.Direction.INPUT, ActivityId2, _),
connectordescriptor(JobId, _, OperatorId1, Operator1Port, OperatorId2, Operator2Port, _),
StageNumber1 != StageNumber2
{
StageNumber := java.lang.Math.max(StageNumber1, StageNumber2);
};
watch(activitystage_temp, a);
watch(activityconnection, a);
watch(activityblocked, a);
watch(operatordescriptor, a);
watch(connectordescriptor, a);
watch(activitystage, a);
watch(activitystage, i);
watch(activitystage, d);
define(activitystage, keys(0, 1, 2), {UUID, OperatorDescriptorId, ActivityNodeId, Integer});
activitystage(JobId, OperatorId, ActivityId, max<StageNumber>) :-
activitystage_temp(JobId, OperatorId, ActivityId, StageNumber);
define(jobstage, keys(0, 1), {UUID, Integer, UUID});
jobstage(JobId, StageNumber, StageId) :-
activitystage(JobId, _, _, StageNumber)
{
StageId := java.util.UUID.randomUUID();
};
watch(jobstage, a);
define(jobattempt, keys(), {UUID, Integer});
jobattempt(JobId, 0) :-
job(JobId, edu.uci.ics.hyracks.api.job.JobStatus.INITIALIZED, _, _, _),
jobstart(JobId, _);
jobattempt(JobId, NextAttempt) :-
jobattempt(JobId, Attempt),
stagestart(JobId, _, Attempt),
abortcomplete(JobId, _, Attempt)
{
NextAttempt := Attempt + 1;
};
define(stagestart, keys(), {UUID, Integer, Integer});
define(stagefinish, keys(0, 1, 2), {UUID, Integer, Integer, Set});
watch(jobstart, i);
stagestart_INITIAL stagestart(JobId, 0, Attempt) :-
jobattempt#insert(JobId, Attempt);
update_job_status_RUNNING job(JobId, edu.uci.ics.hyracks.api.job.JobStatus.RUNNING, JobSpec, JobPlan, null) :-
job(JobId, edu.uci.ics.hyracks.api.job.JobStatus.INITIALIZED, JobSpec, JobPlan, _),
jobstart(JobId, _);
stagestart_NEXT stagestart(JobId, NextStageNumber, Attempt) :-
stagestart(JobId, StageNumber, Attempt),
stagefinish#insert(StageId, StageNumber, Attempt, _)
{
NextStageNumber := StageNumber + 1;
};
watch(stagestart, a);
watch(stagestart, d);
define(operatorlocationcandidates, keys(), {UUID, OperatorDescriptorId, String, Integer, Integer});
operatorlocationcandidates(JobId, OperatorId, NodeId, Partition, Benefit) :-
operatorlocation(JobId, OperatorId, NodeId, Partition, Benefit),
availablenodes(NodeId);
watch(availablenodes, a);
watch(availablenodes, i);
watch(availablenodes, d);
define(availablenodecount, keys(0), {Integer, Integer});
watch(availablenodecount, a);
watch(availablenodecount, i);
watch(availablenodecount, d);
availablenodecount(0, count<NodeId>) :-
availablenodes(NodeId);
watch(rankedavailablenodes, a);
watch(rankedavailablenodes, i);
watch(rankedavailablenodes, d);
watch(operatorlocationcandidates, a);
watch(operatorlocationcandidates, i);
watch(operatorlocationcandidates, d);
define(maxoperatorlocationbenefit, keys(0, 1, 2), {UUID, OperatorDescriptorId, Integer, Integer});
maxoperatorlocationbenefit(JobId, OperatorId, Partition, max<Benefit>) :-
operatorlocationcandidates(JobId, OperatorId, _, Partition, Benefit);
watch(maxoperatorlocationbenefit, a);
watch(maxoperatorlocationbenefit, i);
watch(maxoperatorlocationbenefit, d);
define(attemptoperatorlocationdecision, keys(0, 1, 3, 4), {UUID, OperatorDescriptorId, String, Integer, Integer});
watch(attemptoperatorlocationdecision, a);
watch(attemptoperatorlocationdecision, i);
watch(attemptoperatorlocationdecision, d);
attemptoperatorlocationdecision(JobId, OperatorId, NodeId, Partition, Attempt) :-
jobattempt#insert(JobId, Attempt),
operatorlocationcandidates(JobId, OperatorId, NodeId, Partition, Benefit),
maxoperatorlocationbenefit(JobId, OperatorId, Partition, Benefit);
attemptoperatorlocationdecision(JobId, OperatorId, NodeId, Partition, Attempt) :-
jobattempt#insert(JobId, Attempt),
operatorclonecountexpansiontotalorder(JobId, OperatorId, Partition, CloneRank),
rankedavailablenodes(NodeId, NodeRank),
availablenodecount(_, NodeCount),
NodeRank == CloneRank % NodeCount;
define(operatorclonecount_temp, keys(), {UUID, OperatorDescriptorId, Integer, Integer});
operatorclonecount_temp(JobId, OperatorId, NPartitions, 0) :-
operatorclonecount(JobId, OperatorId, NPartitions);
define(operatorclonecountexpansiontotalorder, keys(0, 1, 2), {UUID, OperatorDescriptorId, Integer, Integer});
operatorclonecountexpansiontotalorder(JobId, OperatorId, Partition, Rank) :-
expandpartitioncountconstraint(operatorclonecount_temp(JobId, OperatorId, Partition, Rank));
watch(operatorclonecountexpansiontotalorder, a);
watch(operatorclonecountexpansiontotalorder, i);
watch(operatorclonecountexpansiontotalorder, d);
watch(operatorclonecount, a);
watch(operatorclonecount, i);
watch(operatorclonecount, d);
define(activitystart, keys(), {UUID, OperatorDescriptorId, ActivityNodeId, Integer, Integer, UUID, String, Integer});
activitystart(JobId, OperatorId, ActivityId, StageNumber, Attempt, StageId, NodeId, Partition) :-
stagestart#insert(JobId, StageNumber, Attempt),
operatordescriptor(JobId, OperatorId, _, _),
activitystage(JobId, OperatorId, ActivityId, StageNumber),
jobstage(JobId, StageNumber, StageId),
attemptoperatorlocationdecision(JobId, OperatorId, NodeId, Partition, Attempt);
watch(activitystart, a);
define(stageletstart, keys(0, 1, 3, 4), {UUID, UUID, JobPlan, String, Integer, Set});
stageletstart(JobId, StageId, JobPlan, NodeId, Attempt, set<ActivityInfo>) :-
activitystart#insert(JobId, _, ActivityId, StageNumber, Attempt, StageId, NodeId, Partition),
job(JobId, edu.uci.ics.hyracks.api.job.JobStatus.RUNNING, _, JobPlan, _)
{
ActivityInfo := [ActivityId, Partition];
};
watch(stageletstart, a);
watch(stageletstart, i);
define(startmessage_agg, keys(0, 1, 2), {UUID, UUID, Integer, JobPlan, Set});
startmessage_agg(JobId, StageId, Attempt, JobPlan, set<Tuple>) :-
stageletstart#insert(JobId, StageId, JobPlan, NodeId, Attempt, ActivityInfoSet),
availablenodes(NodeId),
ActivityInfoSet.size() != 0
{
Tuple := [NodeId, ActivityInfoSet];
};
startmessage(JobId, StageId, Attempt, JobPlan, TSet) :-
startmessage_agg(JobId, StageId, Attempt, JobPlan, TSet);
watch(startmessage, a);
watch(startmessage, i);
define(stageletabort, keys(0, 1, 3, 4), {UUID, UUID, JobPlan, String, Integer, Set});
stageletabort(JobId, StageId, JobPlan, NodeId, Attempt, ActivityIdSet) :-
stageletfailure(JobId, StageId, NodeId, Attempt),
stageletstart(JobId, StageId, JobPlan, NodeId, Attempt, ActivityIdSet);
stageletabort(JobId, StageId, JobPlan, NodeIdOther, Attempt, ActivityIdSet) :-
stageletstart(JobId, StageId, JobPlan, NodeId, Attempt, _),
stageletstart(JobId, StageId, _, NodeIdOther, Attempt, ActivityIdSet),
failednodes#insert(NodeId),
notin stageletcomplete(JobId, StageId, NodeId, Attempt, _);
watch(stageletabort, a);
watch(stageletabort, i);
watch(stageletabort, d);
define(stageabort, keys(0, 1, 2), {UUID, UUID, Integer, Set});
stageabort(JobId, StageId, Attempt, set<NodeId>) :-
stageletabort#insert(JobId, StageId, _, NodeId, Attempt, _);
define(abortmessage_agg, keys(0, 1, 2), {UUID, UUID, Integer, JobPlan, Set});
abortmessage_agg(JobId, StageId, Attempt, JobPlan, set<Tuple>) :-
stageletabort#insert(JobId, StageId, JobPlan, NodeId, Attempt, ActivityIdSet),
availablenodes(NodeId)
{
Tuple := [NodeId, ActivityIdSet];
};
abortmessage(JobId, StageId, Attempt, JobPlan, TSet) :-
abortmessage_agg(JobId, StageId, Attempt, JobPlan, TSet),
TSet.size() != 0;
watch(abortmessage, a);
watch(abortmessage, i);
define(stageletabortcomplete, keys(), {UUID, UUID, String, Integer});
stageletabortcomplete(JobId, StageId, NodeId, Attempt) :-
abortnotify(JobId, StageId, NodeId, Attempt);
stageletabortcomplete(JobId, StageId, NodeId, Attempt) :-
stageletabort(JobId, StageId, _, NodeId, Attempt, _),
notin availablenodes(NodeId);
define(stageletabortcomplete_agg, keys(0, 1, 2), {UUID, UUID, Integer, Set});
stageletabortcomplete_agg(JobId, StageId, Attempt, set<NodeId>) :-
stageletabortcomplete(JobId, StageId, NodeId, Attempt);
define(abortcomplete, keys(), {UUID, UUID, Integer});
abortcomplete(JobId, StageId, Attempt) :-
stageletabortcomplete_agg(JobId, StageId, Attempt, NodeIdSet1),
stageabort(JobId, StageId, Attempt, NodeIdSet2),
NodeIdSet1.size() == NodeIdSet2.size();
define(stageletcomplete_agg, keys(0, 1, 2), {UUID, UUID, Integer, Set});
stageletcomplete_agg(JobId, StageId, Attempt, set<Statistics>) :-
stageletcomplete(JobId, StageId, NodeId, Attempt, Statistics);
stagefinish(JobId, StageNumber, Attempt, SSet) :-
startmessage_agg(JobId, StageId, Attempt, _, TSet),
stageletcomplete_agg(JobId, StageId, Attempt, SSet),
jobstage(JobId, StageNumber, StageId),
TSet.size() == SSet.size();
update_job_status_TERMINATED job(JobId, edu.uci.ics.hyracks.api.job.JobStatus.TERMINATED, JobSpec, JobPlan, null) :-
job(JobId, edu.uci.ics.hyracks.api.job.JobStatus.RUNNING, JobSpec, JobPlan, _),
stagestart#insert(JobId, StageNumber, Attempt),
stagefinish(JobId, _, Attempt, SSet),
notin jobstage(JobId, StageNumber);
define(jobcleanup_agg, {UUID, Set});
jobcleanup_agg(JobId, set<NodeId>) :-
stagestart#insert(JobId, StageNumber, Attempt),
stagefinish(JobId, _, Attempt, _),
attemptoperatorlocationdecision(JobId, _, NodeId, _, Attempt),
notin jobstage(JobId, StageNumber);
jobcleanup(JobId, NodeIdSet) :-
jobcleanup_agg(JobId, NodeIdSet);