| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| 'use strict'; |
| |
| const uuid = require('uuid/v4'); |
| |
| const fs = require('fs'); |
| const path = require('path'); |
| const JSZip = require('jszip'); |
| const socketio = require('socket.io'); |
| const _ = require('lodash'); |
| |
| // Fire me up! |
| |
| /** |
| * Module interaction with agents. |
| */ |
| module.exports = { |
| implements: 'agents-handler', |
| inject: ['settings', 'mongo', 'agent-socket'] |
| }; |
| |
| /** |
| * @param settings |
| * @param mongo |
| * @param {AgentSocket} AgentSocket |
| * @returns {AgentsHandler} |
| */ |
| module.exports.factory = function(settings, mongo, AgentSocket) { |
| class AgentSockets { |
| constructor() { |
| /** |
| * @type {Map.<String, Array.<String>>} |
| */ |
| this.sockets = new Map(); |
| } |
| |
| get(account) { |
| let sockets = this.sockets.get(account._id.toString()); |
| |
| if (_.isEmpty(sockets)) |
| this.sockets.set(account._id.toString(), sockets = []); |
| |
| return sockets; |
| } |
| |
| /** |
| * @param {AgentSocket} sock |
| * @param {String} account |
| * @return {Array.<AgentSocket>} |
| */ |
| add(account, sock) { |
| const sockets = this.get(account); |
| |
| sockets.push(sock); |
| } |
| |
| /** |
| * @param {Socket} browserSocket |
| * @return {AgentSocket} |
| */ |
| find(browserSocket) { |
| const {_id} = browserSocket.request.user; |
| |
| const sockets = this.sockets.get(_id); |
| |
| return _.find(sockets, (sock) => _.includes(sock.demo.browserSockets, browserSocket)); |
| } |
| } |
| |
| class Cluster { |
| constructor(top) { |
| const clusterName = top.clusterName; |
| |
| this.id = _.isEmpty(top.clusterId) ? uuid() : top.clusterId; |
| this.name = _.isEmpty(clusterName) ? `Cluster ${this.id.substring(0, 8).toUpperCase()}` : clusterName; |
| this.nids = top.nids; |
| this.addresses = top.addresses; |
| this.clients = top.clients; |
| this.clusterVersion = top.clusterVersion; |
| this.active = top.active; |
| this.secured = top.secured; |
| } |
| |
| isSameCluster(top) { |
| return _.intersection(this.nids, top.nids).length > 0; |
| } |
| |
| update(top) { |
| this.clusterVersion = top.clusterVersion; |
| this.nids = top.nids; |
| this.addresses = top.addresses; |
| this.clients = top.clients; |
| this.clusterVersion = top.clusterVersion; |
| this.active = top.active; |
| this.secured = top.secured; |
| } |
| |
| same(top) { |
| return _.difference(this.nids, top.nids).length === 0 && |
| _.isEqual(this.addresses, top.addresses) && |
| this.clusterVersion === top.clusterVersion && |
| this.active === top.active; |
| } |
| } |
| |
| /** |
| * Connected agents manager. |
| */ |
| class AgentsHandler { |
| /** |
| * @constructor |
| */ |
| constructor() { |
| /** |
| * Connected agents. |
| * @type {AgentSockets} |
| */ |
| this._agentSockets = new AgentSockets(); |
| |
| this.clusters = []; |
| this.topLsnrs = []; |
| } |
| |
| /** |
| * Collect supported agents list. |
| * @private |
| */ |
| _collectSupportedAgents() { |
| const jarFilter = (file) => path.extname(file) === '.jar'; |
| |
| const agentArchives = fs.readdirSync(settings.agent.dists) |
| .filter((file) => path.extname(file) === '.zip'); |
| |
| const agentsPromises = _.map(agentArchives, (fileName) => { |
| const filePath = path.join(settings.agent.dists, fileName); |
| |
| return JSZip.loadAsync(fs.readFileSync(filePath)) |
| .then((zip) => { |
| const jarPath = _.find(_.keys(zip.files), jarFilter); |
| |
| return JSZip.loadAsync(zip.files[jarPath].async('nodebuffer')) |
| .then((jar) => jar.files['META-INF/MANIFEST.MF'].async('string')) |
| .then((lines) => |
| _.reduce(lines.split(/\s*\n+\s*/), (acc, line) => { |
| if (!_.isEmpty(line)) { |
| const arr = line.split(/\s*:\s*/); |
| |
| acc[arr[0]] = arr[1]; |
| } |
| |
| return acc; |
| }, {})) |
| .then((manifest) => { |
| const ver = manifest['Implementation-Version']; |
| const buildTime = manifest['Build-Time']; |
| |
| if (ver && buildTime) |
| return { fileName, filePath, ver, buildTime }; |
| }); |
| }); |
| }); |
| |
| return Promise.all(agentsPromises) |
| .then((descs) => { |
| const agentDescs = _.keyBy(_.remove(descs, null), 'ver'); |
| |
| const latestVer = _.head(Object.keys(agentDescs).sort((a, b) => { |
| const aParts = a.split('.'); |
| const bParts = b.split('.'); |
| |
| for (let i = 0; i < aParts.length; ++i) { |
| if (aParts[i] !== bParts[i]) |
| return aParts[i] < bParts[i] ? 1 : -1; |
| } |
| |
| if (aParts.length === bParts.length) |
| return 0; |
| |
| return aParts.length < bParts.length ? 1 : -1; |
| })); |
| |
| // Latest version of agent distribution. |
| if (latestVer) |
| agentDescs.current = agentDescs[latestVer]; |
| |
| return agentDescs; |
| }); |
| } |
| |
| getOrCreateCluster(top) { |
| let cluster = _.find(this.clusters, (c) => c.isSameCluster(top)); |
| |
| if (_.isNil(cluster)) { |
| cluster = new Cluster(top); |
| |
| this.clusters.push(cluster); |
| } |
| |
| return cluster; |
| } |
| |
| /** |
| * Add topology listener. |
| * |
| * @param lsnr |
| */ |
| addTopologyListener(lsnr) { |
| this.topLsnrs.push(lsnr); |
| } |
| |
| /** |
| * Link agent with browsers by account. |
| * |
| * @param {Socket} sock |
| * @param {Array.<mongo.Account>} accounts |
| * @param {Array.<String>} tokens |
| * @param {boolean} demoEnabled |
| * |
| * @private |
| */ |
| onConnect(sock, accounts, tokens, demoEnabled) { |
| const agentSocket = new AgentSocket(sock, accounts, tokens, demoEnabled); |
| |
| _.forEach(accounts, (account) => { |
| this._agentSockets.add(account, agentSocket); |
| |
| this._browsersHnd.agentStats(account); |
| }); |
| |
| sock.on('disconnect', () => { |
| _.forEach(accounts, (account) => { |
| _.pull(this._agentSockets.get(account), agentSocket); |
| |
| this._browsersHnd.agentStats(account); |
| }); |
| }); |
| |
| sock.on('cluster:topology', (top) => { |
| if (_.isNil(top)) { |
| console.log('Invalid format of message: "cluster:topology"'); |
| |
| return; |
| } |
| |
| const cluster = this.getOrCreateCluster(top); |
| |
| _.forEach(this.topLsnrs, (lsnr) => lsnr(agentSocket, cluster, top)); |
| |
| if (agentSocket.cluster !== cluster) { |
| agentSocket.cluster = cluster; |
| |
| _.forEach(accounts, (account) => { |
| this._browsersHnd.agentStats(account); |
| }); |
| } |
| else { |
| const changed = !cluster.same(top); |
| |
| if (changed) { |
| cluster.update(top); |
| |
| _.forEach(accounts, (account) => { |
| this._browsersHnd.clusterChanged(account, cluster); |
| }); |
| } |
| } |
| }); |
| |
| sock.on('cluster:disconnected', () => { |
| const newTop = _.assign({}, agentSocket.cluster, {nids: []}); |
| |
| _.forEach(this.topLsnrs, (lsnr) => lsnr(agentSocket, agentSocket.cluster, newTop)); |
| |
| agentSocket.cluster = null; |
| |
| _.forEach(accounts, (account) => { |
| this._browsersHnd.agentStats(account); |
| }); |
| }); |
| |
| return agentSocket; |
| } |
| |
| getAccounts(tokens) { |
| return mongo.Account.find({token: {$in: tokens}}, '_id token').lean().exec() |
| .then((accounts) => ({accounts, activeTokens: _.uniq(_.map(accounts, 'token'))})); |
| } |
| |
| /** |
| * @param {http.Server|https.Server} srv Server instance that we want to attach agent handler. |
| * @param {BrowsersHandler} browsersHnd |
| */ |
| attach(srv, browsersHnd) { |
| this._browsersHnd = browsersHnd; |
| |
| this._collectSupportedAgents() |
| .then((supportedAgents) => { |
| this.currentAgent = _.get(supportedAgents, 'current'); |
| |
| if (this.io) |
| throw 'Agent server already started!'; |
| |
| this.io = socketio(srv, {path: '/agents'}); |
| |
| this.io.on('connection', (sock) => { |
| const sockId = sock.id; |
| |
| console.log('Connected agent with socketId: ', sockId); |
| |
| sock.on('disconnect', (reason) => { |
| console.log(`Agent disconnected with [socketId=${sockId}, reason=${reason}]`); |
| }); |
| |
| sock.on('agent:auth', ({ver, bt, tokens, disableDemo} = {}, cb) => { |
| console.log(`Received authentication request [socketId=${sockId}, tokens=${tokens}, ver=${ver}].`); |
| |
| if (_.isEmpty(tokens)) |
| return cb('Tokens not set. Please reload agent archive or check settings'); |
| |
| if (ver && bt && !_.isEmpty(supportedAgents)) { |
| const btDistr = _.get(supportedAgents, [ver, 'buildTime']); |
| |
| if (_.isEmpty(btDistr) || btDistr !== bt) |
| return cb('You are using an older version of the agent. Please reload agent'); |
| } |
| |
| return this.getAccounts(tokens) |
| .then(({accounts, activeTokens}) => { |
| if (_.isEmpty(activeTokens)) |
| return cb(`Failed to authenticate with token(s): ${tokens.join(',')}. Please reload agent archive or check settings`); |
| |
| cb(null, activeTokens); |
| |
| return this.onConnect(sock, accounts, activeTokens, !disableDemo); |
| }) |
| // TODO IGNITE-1379 send error to web master. |
| .catch(() => cb(`Invalid token(s): ${tokens.join(',')}. Please reload agent archive or check settings`)); |
| }); |
| }); |
| }) |
| .catch(() => { |
| console.log('Failed to collect supported agents'); |
| }); |
| } |
| |
| agent(account, demo, clusterId) { |
| if (!this.io) |
| return Promise.reject(new Error('Agent server not started yet!')); |
| |
| const socks = this._agentSockets.get(account); |
| |
| if (_.isEmpty(socks)) |
| return Promise.reject(new Error('Failed to find connected agent for this account')); |
| |
| if (demo) { |
| const sock = _.find(socks, (sock) => sock.demo.enabled); |
| |
| if (sock) |
| return Promise.resolve(sock); |
| |
| return Promise.reject(new Error('Demo mode disabled by administrator')); |
| } |
| |
| if (_.isNil(clusterId)) |
| return Promise.resolve(_.head(socks)); |
| |
| const sock = _.find(socks, (agentSock) => _.get(agentSock, 'cluster.id') === clusterId); |
| |
| if (_.isEmpty(sock)) |
| return Promise.reject(new Error('Failed to find agent connected to cluster')); |
| |
| return Promise.resolve(sock); |
| } |
| |
| agents(account) { |
| if (!this.io) |
| return Promise.reject(new Error('Agent server not started yet!')); |
| |
| const socks = this._agentSockets.get(account); |
| |
| if (_.isEmpty(socks)) |
| return Promise.reject(new Error('Failed to find connected agent for this token')); |
| |
| return Promise.resolve(socks); |
| } |
| |
| /** |
| * Try stop agent for token if not used by others. |
| * |
| * @param {mongo.Account} account |
| */ |
| onTokenReset(account) { |
| if (_.isNil(this.io)) |
| return; |
| |
| const agentSockets = this._agentSockets.get(account); |
| |
| _.forEach(agentSockets, (sock) => sock.resetToken(account.token)); |
| } |
| } |
| |
| return new AgentsHandler(); |
| }; |