blob: 28d7d7e536e9abc7a569dbd50d97bf0b93acd9b4 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
angular.module('org.apache.gearpump.models', [])
/** TODO: to be absorbed as scalajs */
.factory('models', ['$timeout', 'conf', 'restapi', 'locator', 'StreamingAppDag', 'Metrics',
function ($timeout, conf, restapi, locator, StreamingAppDag, Metrics) {
'use strict';
var util = {
  /** Percentage of `current` relative to `total`; 0 whenever `total` is not greater than zero. */
  usage: function (current, total) {
    if (total > 0) {
      return 100 * current / total;
    }
    return 0;
  },
  /** Returns `obj[prop]`, first storing `init` there if `obj` has no own property `prop`. */
  getOrCreate: function (obj, prop, init) {
    var alreadyThere = obj.hasOwnProperty(prop);
    if (!alreadyThere) {
      obj[prop] = init;
    }
    return obj[prop];
  },
  /**
   * Takes the last dot-separated segment of `path`, strips every non-digit
   * character from it and returns what is left as a number
   * (e.g. 'app0.processor12' gives 12; a segment without digits gives 0).
   */
  parseIntFromQueryPathTail: function (path) {
    var segments = path.split('.');
    var tail = _.last(segments);
    var digitsOnly = tail.replace(/[^0-9]/g, '');
    return Number(digitsOnly);
  }
};
/**
 * Fetches `path` from the backend and resolves with the decoded model.
 * Two helper methods are attached to the resolved model:
 * `$subscribe(scope, onData, onError)` - subscribes to further updates through `restapi.subscribe`
 *   (using `args.pathOverride` when set, otherwise `path`, and `args.period` as the period).
 *   `onData` is only invoked when the newly decoded model differs from the previously delivered one.
 *   If decoding or `onData` throws, `onError` (when it is a function) receives the raw data.
 * `$data()` - returns the model without any function valued properties.
 */
function get(path, decodeFn, args) {
  var options = args || {};

  function decorate(response) {
    var model = decodeFn(response.data, options);
    // Last model handed to `onData`; shared by every subscription made on this model.
    var lastDelivered;

    function handleUpdate(data, onData, onError) {
      try {
        var decoded = decodeFn(data, options);
        if (_.isEqual(decoded, lastDelivered)) {
          return undefined;
        }
        lastDelivered = decoded;
        return onData(decoded);
      } catch (ex) {
        return angular.isFunction(onError) ? onError(data) : undefined;
      }
    }

    model.$subscribe = function (scope, onData, onError) {
      var subscribePath = options.pathOverride || path;
      restapi.subscribe(subscribePath, scope, function (data) {
        return handleUpdate(data, onData, onError);
      }, options.period);
    };

    model.$data = function () {
      return _.omit(model, _.isFunction);
    };

    return model;
  }

  return restapi.get(path).then(decorate);
}
var decoder = {
  /**
   * Decodes every item of `objs` with `decodeFn` and returns the decoded models
   * as an object keyed by each model's `keyName` property.
   */
  _asAssociativeArray: function (objs, decodeFn, keyName) {
    var result = {};
    // Iteration is done purely for its side effect, hence `forEach` instead of `map`.
    _.forEach(objs, function (obj) {
      var model = decodeFn(obj);
      var key = model[keyName];
      result[key] = model;
    });
    return result;
  },
  /**
   * Returns the part of `actorPath` that follows the first '@', cut at the first '/'.
   * Throws a TypeError when `actorPath` contains no '@'.
   */
  _akkaAddr: function (actorPath) {
    return actorPath
      .split('@')[1]
      .split('/')[0];
  },
  /** Splits a `pid@hostname` string into `{pid, hostname}`. */
  _jvm: function (s) {
    var tuple = s.split('@');
    return {
      pid: tuple[0],
      hostname: tuple[1]
    };
  },
  /**
   * Do the necessary deserialization.
   * Unwraps `wrapper.masterDescription`, turns `cluster` nodes and `leader` into "host:port"
   * strings and adds `jvm`, `isHealthy` and `configLink`. The description object is modified in place.
   */
  master: function (wrapper) {
    var obj = wrapper.masterDescription;
    angular.merge(obj, {
      // upickle conversion
      cluster: _.map(obj.cluster, function (node) {
        return node.host + ":" + node.port;
      }),
      jvm: decoder._jvm(obj.jvmName),
      leader: obj.leader.host + ":" + obj.leader.port,
      // extra properties/methods
      isHealthy: obj.masterStatus === 'synced',
      configLink: restapi.masterConfigLink()
    });
    return obj;
  },
  /** Unwraps the `partitioners` field of the response. */
  partitioners: function (wrapper) {
    return wrapper.partitioners;
  },
  /** Decodes a list of workers into an object keyed by `workerId`. */
  workers: function (objs) {
    return decoder._asAssociativeArray(objs, decoder.worker, 'workerId');
  },
  /** Adds jvm, address, running state, slot usage and links to a worker object (modified in place). */
  worker: function (obj) {
    var slotsUsed = obj.totalSlots - obj.availableSlots;
    return angular.merge(obj, {
      // extra properties
      jvm: decoder._jvm(obj.jvmName),
      akkaAddr: decoder._akkaAddr(obj.actorPath),
      isRunning: obj.state === 'active',
      slots: {
        usage: util.usage(slotsUsed, obj.totalSlots),
        used: slotsUsed,
        total: obj.totalSlots
      },
      // extra methods
      pageUrl: locator.worker(obj.workerId),
      configLink: restapi.workerConfigLink(obj.workerId)
    });
  },
  /** The supervisor response is used as is. */
  supervisor: function (obj) {
    return obj;
  },
  /** Decodes `wrapper.appMasters` into an object of application summaries keyed by `appId`. */
  apps: function (wrapper) {
    var objs = wrapper.appMasters;
    return decoder._asAssociativeArray(objs, decoder.appSummary, 'appId');
  },
  /** Adds type, state flags, address, links and terminate/restart actions to an application summary. */
  appSummary: function (obj) {
    // todo: add `type` field to summary and detailed app response
    // `type` has to be set first, because `pageUrl` below is computed from it.
    angular.merge(obj, {
      type: 'streaming'
    });
    return angular.merge(obj, {
      // extra properties
      isRunning: obj.status === 'active',
      isDead: !(obj.status === 'active' || obj.status === 'pending'),
      akkaAddr: decoder._akkaAddr(obj.appMasterPath),
      // extra methods
      pageUrl: locator.app(obj.appId, obj.type),
      configLink: restapi.appConfigLink(obj.appId),
      terminate: function () {
        return restapi.killApp(obj.appId);
      },
      restart: function () {
        return restapi.restartAppAsync(obj.appId);
      }
    });
  },
  /**
   * Decodes a detailed application response in place and returns it:
   * normalizes the clock, converts processors, processor levels, edges and executors
   * from arrays to dictionaries and adds state flags, links and the terminate action.
   */
  app: function (obj) {
    // todo: add `type` field to summary and detailed app response
    angular.merge(obj, {
      status: 'active',
      type: obj.hasOwnProperty('dag') ? 'streaming' : ''
    });
    // Normalize the clock to a number. An invalid clock is dropped and an absent clock
    // stays absent (it must not be turned into NaN).
    if (obj.hasOwnProperty('clock')) {
      var clock = Number(obj.clock);
      if (moment(clock).isValid()) {
        obj.clock = clock;
      } else {
        console.warn({message: 'invalid application clock', clock: obj.clock});
        delete obj.clock;
      }
    }
    // upickle conversion 1: streaming app related decoding
    obj.processors = _.zipObject(obj.processors);
    _.forEach(obj.processors, function (processor) {
      // add an active property
      var active = true;
      var replaced = false;
      if (angular.isNumber(obj.clock) && processor.hasOwnProperty('life')) {
        var life = processor.life;
        if (life.hasOwnProperty('death') && obj.clock > Number(life.death)) {
          active = false;
          replaced = Number(life.death) === 0; // caution: death will be set to 0 after processor getting replaced!
        }
      }
      processor.active = active;
      processor.replaced = replaced;
    });
    // NOTE(review): assumes every id in `processorLevels` also exists in `processors` - confirm.
    _.forEach(_.zipObject(obj.processorLevels), function (hierarchy, processorId) {
      obj.processors[processorId].hierarchy = hierarchy;
    });
    delete obj.processorLevels;
    if (obj.dag && Array.isArray(obj.dag.edgeList)) {
      // Each edge tuple is read as [from, partitionerClass, to]; edges get keyed by "from_to".
      var edges = {};
      _.forEach(obj.dag.edgeList, function (tuple) {
        var from = parseInt(tuple[0], 10);
        var to = parseInt(tuple[2], 10);
        var partitionerClass = tuple[1];
        edges[from + '_' + to] = {
          from: from,
          to: to,
          partitioner: partitionerClass
        };
      });
      obj.dag.edgeList = edges;
    }
    // upickle conversion 2a: convert array to dictionary
    obj.executors = _.object(_.map(obj.executors, 'executorId'), obj.executors);
    // upickle conversion 2b: add extra executor properties and methods
    _.forEach(obj.executors, function (executor) {
      angular.merge(executor, {
        isRunning: executor.status === 'active',
        pageUrl: locator.executor(obj.appId, obj.type, executor.executorId),
        workerPageUrl: locator.worker(executor.workerId)
      });
    });
    // upickle conversion 2c: task count is executor specific property for streaming app
    _.forEach(obj.processors, function (processor) {
      var taskCountLookup = _.zipObject(processor.taskCount);
      // Backend returns executor ids, but names as `executor`. We change them to real executors.
      // NOTE(review): assumes each listed id exists in `obj.executors` and in `taskCount` - confirm.
      processor.executors = _.map(processor.executors, function (executorId) {
        var executor = obj.executors[executorId];
        var processorExecutor = angular.copy(executor); // The task count is for particular processor, so we make a copy
        processorExecutor.taskCount = taskCountLookup[executorId].count;
        // Update global executor task count by the way
        executor.taskCount = (executor.taskCount || 0) + processorExecutor.taskCount;
        return processorExecutor;
      });
    });
    angular.merge(obj, {
      // extra properties
      isRunning: true, // todo: handle empty response, which is the case application is stopped
      // extra methods
      pageUrl: locator.app(obj.appId, obj.type),
      configLink: restapi.appConfigLink(obj.appId),
      terminate: function () {
        return restapi.killApp(obj.appId);
      }
    });
    return obj;
  },
  /** Adds jvm information, worker page url and config link to an executor object (modified in place). */
  appExecutor: function (obj) {
    return angular.merge(obj, {
      // extra properties and methods
      jvm: decoder._jvm(obj.jvmName),
      workerPageUrl: locator.worker(obj.workerId),
      configLink: restapi.appExecutorConfigLink(obj.appId, obj.id)
    });
  },
  /** Return a map. the key is processor id, the value is an array of its stalling tasks */
  appStallingTasks: function (wrapper) {
    var result = _.groupBy(wrapper.tasks, 'processorId');
    _.forEach(result, function (processor, processorId) {
      result[processorId] = _.map(processor, 'index');
    });
    return result;
  },
  /** Return an array of application alerts (one error alert when `obj.time` is positive, else none) */
  appAlerts: function (obj) {
    if (obj.time > 0) {
      return [{
        severity: 'error',
        time: Number(obj.time),
        message: obj.error
      }];
    }
    return [];
  },
  /**
   * Groups the metrics of the response (see `_metricsGroups`).
   * When `args.filterPath` is set, only that path is kept and the result becomes one level flatter.
   */
  metrics: function (wrapper, args) {
    var metrics = decoder._metricsGroups(wrapper);
    // Reduce nested array by one level, if we want to filter particular search path.
    if (args && args.filterPath) {
      decoder._removeUnrelatedMetricsFrom2dArray(metrics, args.filterPath);
    }
    return metrics;
  },
  /** Like `metrics`, but the 2nd level keys are the numbers parsed from the tail of each path. */
  appMetrics: function (wrapper, args) {
    var metrics = decoder.metrics(wrapper, args);
    return _.mapValues(metrics, function (values) {
      return _.transform(values, function (result, metrics, path) {
        var id = util.parseIntFromQueryPathTail(path);
        result[id] = metrics;
      });
    });
  },
  /** Like `appMetrics`, but every series is reduced to the `values` of its last metric. */
  appTaskLatestMetricValues: function (wrapper, args) {
    var metrics = decoder.metrics(wrapper, args);
    return _.mapValues(metrics, function (values) {
      return _.transform(values, function (result, metrics, path) {
        var id = util.parseIntFromQueryPathTail(path);
        result[id] = _.last(metrics).values;
      });
    });
  },
  /**
   * Note that it returns a 2d associative array.
   * The 1st level key is the metric class (e.g. memory.total.used)
   * The 2nd level key is the object path (e.g. master or app0.processor0)
   * The value is an array of metrics, which are sorted by time.
   */
  _metricsGroups: function (wrapper) {
    var result = {};
    _.forEach(wrapper.metrics, function (obj) {
      var metric = Metrics.$auto(obj);
      if (metric) {
        var metricsGroup = util.getOrCreate(result, metric.meta.name, {});
        var metricSeries = util.getOrCreate(metricsGroup, metric.meta.path, {});
        delete metric.meta; // As meta is in the keys, we don't need it in every metric.
        // Keyed by time, so a later metric with the same time replaces the earlier one.
        metricSeries[metric.time] = metric;
      }
    });
    // Remove duplicates and sort metrics by time defensively
    // https://github.com/gearpump/gearpump/issues/1385
    _.forEach(result, function (metricsGroup) {
      _.forEach(metricsGroup, function (metricSeries, path) {
        metricsGroup[path] = _.sortBy(metricSeries, 'time');
      });
    });
    return result;
  },
  /**
   * Remove unrelated metrics paths and change the given 2d array to 1d.
   * Works in place: a group that contains `filterPath` is replaced by that path's series,
   * a group that does not contain it is deleted.
   */
  _removeUnrelatedMetricsFrom2dArray: function (metrics, filterPath) {
    _.forEach(metrics, function (metricsGroup, name) {
      if (metricsGroup.hasOwnProperty(filterPath)) {
        metrics[name] = metricsGroup[filterPath];
      } else {
        delete metrics[name];
      }
    });
  }
};
/**
 * Model getters. Every public method returns the promise produced by `get`,
 * i.e. it resolves with a decoded model that offers `$subscribe` and `$data`.
 * Methods prefixed with `_` are internal helpers.
 */
var getter = {
  /** Master description. */
  master: function () {
    return get('master',
      decoder.master);
  },
  /** Master metrics; `updateInterval` is passed on as the subscription period. */
  masterMetrics: function (updateInterval) {
    return getter._masterMetrics({period: updateInterval});
  },
  /** Master metrics, queried with `readHistory`. */
  masterHistMetrics: function () {
    return getter._masterMetrics({all: true});
  },
  _masterMetrics: function (args) {
    return getter._metrics('master/metrics/', 'master', args);
  },
  /** Partitioners known to the master. */
  partitioners: function () {
    return get('master/partitioners',
      decoder.partitioners);
  },
  /** All workers, keyed by worker id. */
  workers: function () {
    return get('master/workerlist',
      decoder.workers);
  },
  /** A single worker. */
  worker: function (workerId) {
    return get('worker/' + workerId,
      decoder.worker);
  },
  /** Metrics of a worker; `updateInterval` is passed on as the subscription period. */
  workerMetrics: function (workerId, updateInterval) {
    return getter._workerMetrics(workerId, {period: updateInterval});
  },
  /** Metrics of a worker, queried with `readHistory`. */
  workerHistMetrics: function (workerId) {
    return getter._workerMetrics(workerId, {all: true});
  },
  _workerMetrics: function (workerId, args) {
    return getter._metrics('worker/' + workerId + '/metrics/', 'worker' + workerId, args);
  },
  /** Supervisor information. */
  supervisor: function () {
    return get('supervisor',
      decoder.supervisor);
  },
  /** Summaries of all applications, keyed by application id. */
  apps: function () {
    return get('master/applist',
      decoder.apps);
  },
  /** Detailed information of one application. */
  app: function (appId) {
    return get('appmaster/' + appId + '?detail=true',
      decoder.app);
  },
  /** Note that executor related metrics will be excluded. */
  appMetrics: function (appId, updateInterval) {
    return getter._appMetrics(appId, {period: updateInterval});
  },
  /** Application metrics, queried with `readHistory`. */
  appHistMetrics: function (appId) {
    return getter._appMetrics(appId, {all: true});
  },
  /** Application metrics, queried with `readLatest`. */
  appLatestMetrics: function (appId) {
    return getter._appMetrics(appId, {all: 'latest'});
  },
  /** Queries application metrics through the processor aggregator; `args` is extended in place. */
  _appMetrics: function (appId, args) {
    args = args || {};
    args.aggregator = 'org.apache.gearpump.streaming.metrics.ProcessorAggregator';
    args.decoder = decoder.appMetrics;
    return getter._metrics('appmaster/' + appId + '/metrics/app' + appId, '', args);
  },
  /**
   * Latest metric values of the tasks of one processor.
   * `metricName` is optional. `range` is optional as well; when it has a `start` property
   * the query is limited to the tasks `range.start` to `range.stop` (inclusive).
   */
  appTaskLatestMetricValues: function (appId, processorId, metricName, range) {
    // The end bounds sent to the backend are "last id + 1". Ids may arrive as numeric strings,
    // so convert them before adding - otherwise '3' + 1 would yield '31'.
    var endProcessor = Number(processorId) + 1;
    var taskRangeArgs = range && range.hasOwnProperty('start') ?
      '&startTask=' + range.start + '&endTask=' + (Number(range.stop) + 1) : '';
    var args = {
      all: 'latest',
      // The aggregator options are appended to the aggregator name as further query parameters.
      aggregator: 'org.apache.gearpump.streaming.metrics.TaskFilterAggregator' +
        '&startProcessor=' + processorId + '&endProcessor=' + endProcessor + taskRangeArgs,
      decoder: decoder.appTaskLatestMetricValues
    };
    metricName = metricName ? ':' + metricName : '';
    return getter._metrics('appmaster/' + appId + '/metrics/app' + appId +
      '.processor' + processorId + '.*' + metricName, '', args);
  },
  /** A single executor of an application. */
  appExecutor: function (appId, executorId) {
    return get('appmaster/' + appId + '/executor/' + executorId,
      decoder.appExecutor);
  },
  /** Metrics of an executor; `updateInterval` is passed on as the subscription period. */
  appExecutorMetrics: function (appId, executorId, updateInterval) {
    return getter._appExecutorMetrics(appId, executorId, {period: updateInterval});
  },
  /** Metrics of an executor, queried with `readHistory`. */
  appExecutorHistMetrics: function (appId, executorId) {
    return getter._appExecutorMetrics(appId, executorId, {all: true});
  },
  _appExecutorMetrics: function (appId, executorId, args) {
    return getter._metrics(
      'appmaster/' + appId + '/metrics/', 'app' + appId + '.executor' + executorId, args);
  },
  /** Stalling tasks of an application, grouped by processor id. */
  appStallingTasks: function (appId) {
    return get('appmaster/' + appId + '/stallingtasks',
      decoder.appStallingTasks);
  },
  /** Alerts of an application. */
  appAlerts: function (appId) {
    return get('appmaster/' + appId + '/errors',
      decoder.appAlerts);
  },
  /**
   * Common metrics query. `args` is extended in place with
   * `pathOverride` - the path used for subscriptions, which always reads with `readLatest`;
   * `filterPath` - the metric path to keep when decoding.
   * The initial query reads with `readHistory` when `args.all === true`, with `readLatest`
   * when `args.all === 'latest'` and with `readRecent` otherwise.
   * `args.decoder` replaces the default `decoder.metrics` when given.
   */
  _metrics: function (pathPrefix, path, args) {
    args = args || {};
    var aggregatorArg = angular.isString(args.aggregator) ?
      ('&aggregator=' + args.aggregator) : '';
    args.pathOverride = pathPrefix + path + '?readOption=readLatest' + aggregatorArg;
    args.filterPath = path;
    var readOption = args.all === true ? 'readHistory' :
      (args.all === 'latest' ? 'readLatest' : 'readRecent');
    return get(pathPrefix + path + '?readOption=' + readOption + aggregatorArg,
      args.decoder || decoder.metrics, args
    );
  }
};
return {
$get: getter,
/** Attempts to get model and then subscribe changes as long as the scope is valid. */
$subscribe: function (scope, getModelFn, onData, period) {
var shouldCancel = false;
var promise;
scope.$on('$destroy', function () {
shouldCancel = true;
$timeout.cancel(promise);
});
function trySubscribe() {
if (shouldCancel) {
return;
}
getModelFn().then(function (data) {
return onData(data);
}, /*onerror=*/function () {
promise = $timeout(trySubscribe, period || conf.restapiQueryInterval);
});
}
trySubscribe();
},
// TODO: scalajs should return a app.details object with dag, if it is a streaming application.
createDag: function (clock, processors, edges) {
var dag = new StreamingAppDag(clock, processors, edges);
dag.replaceProcessor = restapi.replaceDagProcessor;
return dag;
},
/** Submit a DAG along with jar files */
submitDag: function (files, dag, onComplete) {
if (Object.keys(files).length !== 1) {
return onComplete({success: false, message: 'One jar file is expected'});
}
files = _.values(files)[0]; // todo: only one file can be uploaded once (issue 1450)
return restapi.uploadJars(files, function (response) {
if (!response.success) {
return onComplete(response);
}
// todo: cannot set jar for individual processor
angular.forEach(dag.processors, function (elem) {
elem[1].jar = response.files;
});
return restapi.submitDag(dag, function (response) {
return onComplete(response);
});
});
},
DAG_DEATH_UNSPECIFIED: '9223372036854775807' /* Long.max */
};
}])
;