blob: cd9a02b9d7908b69f8bfc6f779ba3fa070798fc2 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
'use strict';
const _ = require('lodash');
// Fire me up!
module.exports = {
implements: 'services/clusters',
inject: ['mongo', 'services/spaces', 'services/caches', 'services/domains', 'services/igfss', 'errors']
};
/**
* @param mongo
* @param {SpacesService} spacesService
* @param {CachesService} cachesService
* @param {DomainsService} modelsService
* @param {IgfssService} igfssService
* @param errors
* @returns {ClustersService}
*/
module.exports.factory = (mongo, spacesService, cachesService, modelsService, igfssService, errors) => {
/**
* Convert remove status operation to own presentation.
*
* @param {RemoveResult} result - The results of remove operation.
*/
const convertRemoveStatus = (result) => ({rowsAffected: result.n});
/**
* Update existing cluster.
*
* @param {Object} cluster - The cluster for updating.
* @returns {Promise.<mongo.ObjectId>} that resolves cluster id.
*/
const update = (cluster) => {
const clusterId = cluster._id;
return mongo.Cluster.updateOne({_id: clusterId}, cluster, {upsert: true}).exec()
.then(() => mongo.Cache.updateMany({_id: {$in: cluster.caches}}, {$addToSet: {clusters: clusterId}}).exec())
.then(() => mongo.Cache.updateMany({_id: {$nin: cluster.caches}}, {$pull: {clusters: clusterId}}).exec())
.then(() => mongo.Igfs.updateMany({_id: {$in: cluster.igfss}}, {$addToSet: {clusters: clusterId}}).exec())
.then(() => mongo.Igfs.updateMany({_id: {$nin: cluster.igfss}}, {$pull: {clusters: clusterId}}).exec())
.then(() => cluster)
.catch((err) => {
if (err.code === mongo.errCodes.DUPLICATE_KEY_UPDATE_ERROR || err.code === mongo.errCodes.DUPLICATE_KEY_ERROR)
throw new errors.DuplicateKeyException('Cluster with name: "' + cluster.name + '" already exist.');
else
throw err;
});
};
/**
* Create new cluster.
*
* @param {Object} cluster - The cluster for creation.
* @returns {Promise.<mongo.ObjectId>} that resolves cluster id.
*/
const create = (cluster) => {
return mongo.Cluster.create(cluster)
.catch((err) => {
if (err.code === mongo.errCodes.DUPLICATE_KEY_ERROR)
throw new errors.DuplicateKeyException(`Cluster with name: "${cluster.name}" already exist.`);
else
throw err;
})
.then((savedCluster) =>
mongo.Cache.updateMany({_id: {$in: savedCluster.caches}}, {$addToSet: {clusters: savedCluster._id}}).exec()
.then(() => mongo.Igfs.updateMany({_id: {$in: savedCluster.igfss}}, {$addToSet: {clusters: savedCluster._id}}).exec())
.then(() => savedCluster)
);
};
/**
* Remove all caches by space ids.
*
* @param {Number[]} spaceIds - The space ids for cache deletion.
* @returns {Promise.<RemoveResult>} - that resolves results of remove operation.
*/
const removeAllBySpaces = (spaceIds) => {
return Promise.all([
mongo.DomainModel.deleteMany({space: {$in: spaceIds}}).exec(),
mongo.Cache.deleteMany({space: {$in: spaceIds}}).exec(),
mongo.Igfs.deleteMany({space: {$in: spaceIds}}).exec()
])
.then(() => mongo.Cluster.deleteMany({space: {$in: spaceIds}}).exec());
};
class ClustersService {
static shortList(userId, demo) {
return spacesService.spaceIds(userId, demo)
.then((spaceIds) => mongo.Cluster.find({space: {$in: spaceIds}}).select('name discovery.kind caches models igfss').lean().exec())
.then((clusters) => _.map(clusters, (cluster) => ({
_id: cluster._id,
name: cluster.name,
discovery: cluster.discovery.kind,
cachesCount: _.size(cluster.caches),
modelsCount: _.size(cluster.models),
igfsCount: _.size(cluster.igfss)
})));
}
static get(userId, demo, _id) {
return spacesService.spaceIds(userId, demo)
.then((spaceIds) => mongo.Cluster.findOne({space: {$in: spaceIds}, _id}).lean().exec());
}
static normalize(spaceId, cluster, ...models) {
cluster.space = spaceId;
_.forEach(models, (model) => {
_.forEach(model, (item) => {
item.space = spaceId;
item.clusters = [cluster._id];
});
});
}
static removedInCluster(oldCluster, newCluster, field) {
return _.difference(_.invokeMap(_.get(oldCluster, field), 'toString'), _.get(newCluster, field));
}
static upsertBasic(userId, demo, {cluster, caches}) {
if (_.isNil(cluster._id))
return Promise.reject(new errors.IllegalArgumentException('Cluster id can not be undefined or null'));
return spacesService.spaceIds(userId, demo)
.then((spaceIds) => {
this.normalize(_.head(spaceIds), cluster, caches);
const query = _.pick(cluster, ['space', '_id']);
const basicCluster = _.pick(cluster, [
'space',
'_id',
'name',
'discovery',
'caches',
'memoryConfiguration.memoryPolicies',
'dataStorageConfiguration.defaultDataRegionConfiguration.maxSize'
]);
return mongo.Cluster.findOneAndUpdate(query, {$set: basicCluster}, {projection: 'caches', upsert: true}).lean().exec()
.catch((err) => {
if (err.code === mongo.errCodes.DUPLICATE_KEY_ERROR)
throw new errors.DuplicateKeyException(`Cluster with name: "${cluster.name}" already exist.`);
throw err;
})
.then((oldCluster) => {
if (oldCluster) {
const ids = this.removedInCluster(oldCluster, cluster, 'caches');
return cachesService.remove(ids);
}
cluster.caches = _.map(caches, '_id');
return mongo.Cluster.updateOne(query, {$set: cluster, new: true}, {upsert: true}).exec();
});
})
.then(() => _.map(caches, cachesService.upsertBasic))
.then(() => ({rowsAffected: 1}));
}
static upsert(userId, demo, {cluster, caches, models, igfss}) {
if (_.isNil(cluster._id))
return Promise.reject(new errors.IllegalArgumentException('Cluster id can not be undefined or null'));
return spacesService.spaceIds(userId, demo)
.then((spaceIds) => {
this.normalize(_.head(spaceIds), cluster, caches, models, igfss);
const query = _.pick(cluster, ['space', '_id']);
return mongo.Cluster.findOneAndUpdate(query, {$set: cluster}, {projection: {models: 1, caches: 1, igfss: 1}, upsert: true}).lean().exec()
.catch((err) => {
if (err.code === mongo.errCodes.DUPLICATE_KEY_ERROR)
throw new errors.DuplicateKeyException(`Cluster with name: "${cluster.name}" already exist.`);
throw err;
})
.then((oldCluster) => {
const modelIds = this.removedInCluster(oldCluster, cluster, 'models');
const cacheIds = this.removedInCluster(oldCluster, cluster, 'caches');
const igfsIds = this.removedInCluster(oldCluster, cluster, 'igfss');
return Promise.all([modelsService.remove(modelIds), cachesService.remove(cacheIds), igfssService.remove(igfsIds)]);
});
})
.then(() => Promise.all(_.concat(_.map(models, modelsService.upsert), _.map(caches, cachesService.upsert), _.map(igfss, igfssService.upsert))))
.then(() => ({rowsAffected: 1}));
}
/**
* Create or update cluster.
*
* @param {Object} cluster - The cluster.
* @returns {Promise.<mongo.ObjectId>} that resolves cluster id of merge operation.
*/
static merge(cluster) {
if (cluster._id)
return update(cluster);
return create(cluster);
}
/**
* Get clusters and linked objects by space.
*
* @param {mongo.ObjectId|String} spaceIds The spaces ids that own clusters.
* @returns {Promise.<Array<mongo.Cluster>>} Requested clusters.
*/
static listBySpaces(spaceIds) {
return mongo.Cluster.find({space: {$in: spaceIds}}).sort('name').lean().exec();
}
/**
* Remove clusters.
*
* @param {Array.<String>|String} ids - The cluster ids for remove.
* @returns {Promise.<{rowsAffected}>} - The number of affected rows.
*/
static remove(ids) {
if (_.isNil(ids))
return Promise.reject(new errors.IllegalArgumentException('Cluster id can not be undefined or null'));
if (_.isEmpty(ids))
return Promise.resolve({rowsAffected: 0});
ids = _.castArray(ids);
return Promise.all(_.map(ids, (id) => {
return mongo.Cluster.findByIdAndRemove(id).exec()
.then((cluster) => {
if (_.isNil(cluster))
return 0;
return Promise.all([
mongo.DomainModel.deleteMany({_id: {$in: cluster.models}}).exec(),
mongo.Cache.deleteMany({_id: {$in: cluster.caches}}).exec(),
mongo.Igfs.deleteMany({_id: {$in: cluster.igfss}}).exec()
])
.then(() => 1);
});
}))
.then((res) => ({rowsAffected: _.sum(res)}));
}
/**
* Remove all clusters by user.
* @param {mongo.ObjectId|String} userId - The user id that own cluster.
* @param {Boolean} demo - The flag indicates that need lookup in demo space.
* @returns {Promise.<{rowsAffected}>} - The number of affected rows.
*/
static removeAll(userId, demo) {
return spacesService.spaceIds(userId, demo)
.then(removeAllBySpaces)
.then(convertRemoveStatus);
}
}
return ClustersService;
};