blob: 7e20b50c73bf15a863403250ccff253eea453a04 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
/*
* Licensed under the Apache License, Version 2.0
* See accompanying LICENSE file.
*/
angular.module('dashboard')
.config(['$stateProvider',
function ($stateProvider) {
'use strict';
$stateProvider
.state('streamingapp.processor', {
url: '/processor/:processorId',
templateUrl: 'views/apps/streamingapp/processor.html',
controller: 'StreamingAppProcessorCtrl'
});
}])
.controller('StreamingAppProcessorCtrl', ['$scope', '$interval', '$stateParams', '$propertyTableBuilder',
'i18n', 'models', 'conf', 'helper',
function ($scope, $interval, $stateParams, $ptb, i18n, models, conf, helper) {
'use strict';
$scope.whatIsProcessor = i18n.terminology.processor;
$scope.processor = $scope.dag.getProcessor($stateParams.processorId);
$scope.processorInfoTable = [
$ptb.text('Task Class').done(),
$ptb.number('Parallelism').done(),
$ptb.text('Data Flow Type').done(),
$ptb.datetime('Birth Time').done(),
$ptb.datetime('Death Time').done()
];
function describeProcessorType(inputs, outputs) {
if (inputs === 0) {
return outputs > 0 ? 'Data Source (%s outputs)'.replace('%s', outputs) : 'Orphan';
} else if (outputs === 0) {
return inputs > 0 ? 'Data Sink (%s inputs)'.replace('%s', inputs) : 'Orphan';
}
return 'General (%s inputs %s outputs)'.replace('%s', inputs).replace('%s', outputs);
}
function updateProcessorInfoTable(processor) {
var degree = $scope.dag.getProcessorIndegreeAndOutdegree(processor.id);
$ptb.$update($scope.processorInfoTable, [
processor.taskClass,
processor.parallelism,
describeProcessorType(degree.indegree, degree.outdegree),
processor.life.birth <= 0 ?
'Start with the application' : processor.life.birth,
processor.life.death === models.DAG_DEATH_UNSPECIFIED ?
'Not specified' : processor.life.death
]);
$scope.tasks = {
selected: [],
available: []
};
}
updateProcessorInfoTable($scope.processor);
// query task metrics
var promise;
$scope.$on('$destroy', function () {
$interval.cancel(promise);
});
$scope.$watch('metricName', function () {
queryMetrics();
});
$scope.queryLimit = conf.restapiTaskLevelMetricsQueryLimit;
$scope.shouldPaginateTasks = $scope.processor.parallelism > $scope.queryLimit;
var requesting = false;
function queryMetrics() {
if (!requesting && $scope.metricName) {
requesting = true;
models.$get.appTaskLatestMetricValues(
$scope.app.appId, $scope.processor.id, $scope.metricName, $scope.taskRange).then(function (metrics) {
if (metrics.hasOwnProperty($scope.metricName)) {
$scope.taskMetrics = metrics[$scope.metricName];
}
requesting = false;
});
}
}
promise = $interval(queryMetrics, conf.restapiQueryInterval);
queryMetrics();
$scope.whatIsTask = i18n.terminology.task;
$scope.taskRange = {
start: 0,
stop: $scope.shouldPaginateTasks ?
$scope.queryLimit - 1 : $scope.processor.parallelism - 1
};
$scope.$watch('taskRange', function (range) {
if (range.hasOwnProperty('start')) {
updateTaskSelection(range);
}
}, /*deep=*/true);
function updateTaskSelection(range) {
$scope.tasks = {
selected: [],
available: function () {
var count = range.stop - range.start + 1;
return _.times(count, function (i) {
return 'T' + (i + range.start);
});
}()
};
}
// For the bar chart control
$scope.tasksBarChart = {
options: {
height: '110px',
seriesNames: [''],
barMinWidth: 4,
barMinSpacing: 1,
valueFormatter: function (value) {
var unit = $scope.metricType === 'meter' ? 'msg/s' : 'ms';
return helper.readableMetricValue(value) + ' ' + unit;
},
data: _.map($scope.tasks.available.length, function (taskName) {
return {x: taskName, y: 0};
})
}
};
$scope.$watch('taskMetrics', function (metricsSelection) {
if (angular.isObject(metricsSelection)) {
updateBarChartData(metricsSelection);
}
});
function updateBarChartData(metricsSelection) {
if ($scope.tasks.available.length === 0 ||
_.keys(metricsSelection).length !== $scope.tasks.available.length) {
return;
}
var data = _.map($scope.tasks.available, function (taskName) {
return {x: taskName};
});
var i = 0;
var metricField = $scope.metricType === 'meter' ? 'movingAverage1m' : 'mean';
_.forEach(metricsSelection, function (metric) {
data[i].y = helper.metricRounded(metric[metricField]);
i++;
});
var xLabelChanged = !_.isEqual(_.map($scope.tasksBarChart.options.data, 'x'), _.map(data, 'x'));
if (xLabelChanged) {
$scope.tasksBarChart.options.data = data;
} else {
$scope.tasksBarChart.data = data;
}
}
}])
;