[EAGLE-1041] Support policy processing pipeline
https://issues.apache.org/jira/browse/EAGLE-1041
Two updates:
* if an inputStream is an intermediate stream (defined by select clause statement), then remove it from inputStream list and outputStream list
* if an inputStream is an intermediate stream, remove its PartitionSpec
Author: Zhao, Qingwen <qingwzhao@apache.org>
Closes #947 from qingwen220/EAGLE-1041.
diff --git a/eagle-server/src/main/webapp/app/dev/public/js/ctrls/alertEditCtrl.js b/eagle-server/src/main/webapp/app/dev/public/js/ctrls/alertEditCtrl.js
index ae8cf7e..0a64a42 100644
--- a/eagle-server/src/main/webapp/app/dev/public/js/ctrls/alertEditCtrl.js
+++ b/eagle-server/src/main/webapp/app/dev/public/js/ctrls/alertEditCtrl.js
@@ -191,7 +191,7 @@
$scope.definition = data;
// Input streams
- $scope.policy.inputStreams = $.map(data.policyExecutionPlan.inputStreams, function (value, stream) {
+ var inputStreams = $.map(data.policyExecutionPlan.inputStreams, function (value, stream) {
return stream;
});
@@ -199,15 +199,28 @@
var outputStreams = $.map(data.policyExecutionPlan.outputStreams, function (value, stream) {
return stream;
});
+
+ // Partition
+ $scope.policy.partitionSpec = $.grep(data.policyExecutionPlan.streamPartitions, function (partition) {
+ return $.inArray(partition.streamId, outputStreams) === -1;
+ });
+
+ var tempStreams = $.grep(inputStreams, function (i) {
+ return $.inArray(i, outputStreams) > -1;
+ });
+
+ $.each(tempStreams, function (i, tempStream) {
+ inputStreams = common.array.remove(tempStream, inputStreams);
+ outputStreams = common.array.remove(tempStream, outputStreams);
+ });
+
$scope.policy.outputStreams = outputStreams.concat();
+ $scope.policy.inputStreams = inputStreams;
$scope.outputStreams = outputStreams;
autoDescription();
// Dedup fields
$scope.refreshOutputSteamFields();
-
- // Partition
- $scope.policy.partitionSpec = data.policyExecutionPlan.streamPartitions;
}
} else {
$scope.definition = {};