blob: 866858b0941601718e4dd29b1da12752961d5e92 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
/*
* Licensed under the Apache License, Version 2.0
* See accompanying LICENSE file.
*/
angular.module('dashboard')
/** TODO: refactoring work required */
.factory('restapi', ['$q', '$http', '$timeout', '$modal', 'Upload', 'conf', 'HealthCheckService',
function ($q, $http, $timeout, $modal, Upload, conf, HealthCheckService) {
'use strict';
/**
 * Wrap the body of a successful response into a result object.
 * The result starts as `{success: true}` and the body (if any) is merged on top of it,
 * so a `success` property present in the body takes precedence.
 *
 * @param {Object} [data] - response body; a falsy value is treated as an empty object
 * @returns {Object} the merged result object
 */
function decodeSuccessResponse(data) {
  var payload = data ? data : {};
  var result = {success: true};
  return angular.merge(result, payload);
}
/**
 * Convert the body of a failed response into a uniform failure object.
 * The first line of the body becomes the error message (with the literal marker
 * ', error summary:' removed) and the remaining lines become the stack trace.
 * A body that is not a string (e.g. an already parsed JSON object) is serialized first,
 * so that decoding never fails with a TypeError.
 *
 * @param {*} [data] - response body; a falsy value is treated as an empty string
 * @returns {{success: boolean, error: string, stackTrace: string[]}}
 */
function decodeErrorResponse(data) {
  var text;
  if (!data) {
    text = '';
  } else if (typeof data === 'string') {
    text = data;
  } else {
    try {
      text = JSON.stringify(data);
    } catch (ignore) {
      // e.g. circular structure; fall through to the generic conversion below
      text = undefined;
    }
    if (typeof text !== 'string') {
      // JSON.stringify yields undefined for values such as functions
      text = String(data);
    }
  }
  // `split` always yields at least one element, so lines[0] is safe
  var lines = text.split('\n');
  return {
    success: false,
    error: lines[0].replace(', error summary:', ''),
    stackTrace: lines.slice(1)
  };
}
// Base URL of the REST API, composed as `<conf.restapiRoot>api/<conf.restapiProtocol>/`.
// `conf.restapiRoot` is concatenated as-is, so it presumably already ends with '/' — TODO confirm.
var restapiV1Root = conf.restapiRoot + 'api/' + conf.restapiProtocol + '/';
var self = {
/**
* Retrieve data from rest service endpoint (HTTP GET) periodically in an angular scope.
*/
subscribe: function (path, scope, onData, interval) {
var timeoutPromise;
var shouldCancel = false;
scope.$on('$destroy', function () {
shouldCancel = true;
$timeout.cancel(timeoutPromise);
});
interval = interval || conf.restapiQueryInterval;
var fn = function () {
var promise = self.get(path);
promise.then(function (response) {
if (!shouldCancel && angular.isFunction(onData)) {
shouldCancel = onData(response.data);
}
}, function (response) {
if (!shouldCancel && angular.isFunction(onData)) {
shouldCancel = onData(response.data);
}
})
.finally(function () {
if (!shouldCancel) {
timeoutPromise = $timeout(fn, interval);
}
});
};
timeoutPromise = $timeout(fn, interval);
},
/**
* Query model from service endpoint and return a promise.
* Note that if operation is failed, it will return the failure after a default timeout. If
* health check indicates the service is unavailable, no request will be sent to server, just
* simple return a failure after a default timeout.
*/
get: function (path) {
if (!HealthCheckService.isServiceAvailable()) {
var deferred = $q.defer();
_.delay(deferred.reject, conf.restapiQueryTimeout);
return deferred.promise;
}
return $http.get(restapiV1Root + path, {timeout: conf.restapiQueryTimeout});
},
/** Get data from server periodically until an user cancellation or scope exit. */
repeatUntil: function (url, scope, onData) {
// TODO: Once `subscribe` is turned to websocket push model, there is no need to have this method
this.subscribe(url, scope,
function (data) {
return !onData || onData(data);
});
},
/** Kill a running application */
killApp: function (appId) {
var url = restapiV1Root + 'appmaster/' + appId;
return $http.delete(url);
},
/** Restart a running application and return a promise */
restartAppAsync: function (appId) {
var url = restapiV1Root + 'appmaster/' + appId + '/restart';
return $http.post(url);
},
/** Return the config link of an application */
appConfigLink: function (appId) {
return restapiV1Root + 'appmaster/' + appId + '/config';
},
/** Return the config link of an application */
appExecutorConfigLink: function (appId, executorId) {
return restapiV1Root + 'appmaster/' + appId + '/executor/' + executorId + '/config';
},
/** Return the config link of a worker */
workerConfigLink: function (workerId) {
return restapiV1Root + 'worker/' + workerId + '/config';
},
/** Return the config link of the master */
masterConfigLink: function () {
return restapiV1Root + 'master/config';
},
/** Submit an user defined application with user configuration */
submitUserApp: function (files, fileFieldNames, executorNum, args, onComplete) {
return self._submitApp(restapiV1Root + 'master/submitapp',
files, fileFieldNames, executorNum, args, onComplete);
},
/** Submit a Storm application */
submitStormApp: function (files, formFormNames, executorNum, args, onComplete) {
return self._submitApp(restapiV1Root + 'master/submitstormapp',
files, formFormNames, executorNum, args, onComplete);
},
_submitApp: function (url, files, fileFieldNames, executorNum, args, onComplete) {
var upload = Upload.upload({
url: url,
method: 'POST',
file: files,
fileFormDataName: fileFieldNames,
fields: {
"executorcount": executorNum,
"args": args
}
});
upload.then(function (response) {
if (onComplete) {
var data = response.data;
onComplete({success: data && data.success});
}
}, function (response) {
if (onComplete) {
onComplete(decodeErrorResponse(response.data));
}
});
},
/** Submit an user defined application with user configuration */
submitDag: function (args, onComplete) {
var url = restapiV1Root + 'master/submitdag';
return $http.post(url, args).then(function (response) {
if (onComplete) {
onComplete(decodeSuccessResponse(response.data));
}
}, function (response) {
if (onComplete) {
onComplete(decodeErrorResponse(response.data));
}
});
},
/** Upload a set of JAR files */
uploadJars: function (files, onComplete) {
var upload = Upload.upload({
url: restapiV1Root + 'master/uploadjar',
method: 'POST',
file: files,
fileFormDataName: 'jar'
});
upload.then(function (response) {
if (onComplete) {
onComplete(decodeSuccessResponse({files: response.data}));
}
}, function (response) {
if (onComplete) {
onComplete(decodeErrorResponse(response.data));
}
});
},
/** Add a new worker */
addWorker: function (onComplete) {
var count = 1;
var url = restapiV1Root + 'supervisor/addworker/' + count;
return $http.post(url).then(function (response) {
if (angular.isFunction(onComplete)) {
onComplete(decodeSuccessResponse(response.data));
}
}, function (response) {
if (angular.isFunction(onComplete)) {
onComplete(decodeErrorResponse(response.data));
}
});
},
/** Remove a new worker */
removeWorker: function (workerId, onComplete) {
var url = restapiV1Root + 'supervisor/removeworker/' + workerId;
return $http.post(url).then(function (response) {
if (angular.isFunction(onComplete)) {
onComplete(decodeSuccessResponse(response.data));
}
}, function (response) {
if (angular.isFunction(onComplete)) {
onComplete(decodeErrorResponse(response.data));
}
});
},
/** Replace a dag processor at runtime */
replaceDagProcessor: function (files, formFormNames, appId, oldProcessorId, newProcessorDescription, inheritConf, onComplete) {
var url = restapiV1Root + 'appmaster/' + appId + '/dynamicdag';
var args = {
"$type": 'org.apache.gearpump.streaming.appmaster.DagManager.ReplaceProcessor',
oldProcessorId: oldProcessorId,
newProcessorDescription: angular.merge({
id: oldProcessorId
}, newProcessorDescription),
inheritConf: inheritConf
};
url += '?args=' + encodeURIComponent(angular.toJson(args));
var promise;
var filtered = _.filter(files, function (file) {
return file;
});
if (filtered.length) {
promise = Upload.upload({
url: url,
method: 'POST',
file: filtered,
fileFormDataName: formFormNames
});
} else {
promise = $http.post(url);
}
promise.then(function () {
if (onComplete) {
onComplete({success: true});
}
}, function (response) {
if (onComplete) {
onComplete({success: false, reason: response.data});
}
});
},
/** Return the service version in onData callback */
serviceVersion: function (onData) {
return $http.get(conf.restapiRoot + 'version').then(function (response) {
if (angular.isFunction(onData)) {
onData(response.data);
}
});
}
};
return self;
}
])
;