blob: 693fc04f2b12941bd700f5fae81a5dd51ccd41cb [file] [log] [blame]
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
*/
import { utils } from "./utilities.js";
const { queue } = require("d3-queue");
class Topology {
constructor(connectionManager, interval) {
this.connection = connectionManager;
this.updatedActions = {};
this.changedActions = {};
this.entities = []; // which entities to request each topology update
this.entityAttribs = { connection: [] };
this._nodeInfo = {}; // info about all known nodes and entities
this.edgeList = [];
this.filtering = false; // filter out nodes that don't have connection info
this.timeout = 5000;
this.updateInterval = interval;
this._getTimer = null;
this.updating = false;
this.counter = 0;
}
addUpdatedAction(key, action) {
if (typeof action === "function") {
this.updatedActions[key] = action;
}
}
delUpdatedAction(key) {
if (key in this.updatedActions) delete this.updatedActions[key];
}
executeUpdatedActions(results) {
for (var action in this.updatedActions) {
this.updatedActions[action](results);
}
}
setUpdateEntities(entities) {
this.entities = entities;
for (var i = 0; i < entities.length; i++) {
this.entityAttribs[entities[i]] = [];
}
}
addUpdateEntities(entityAttribs) {
for (var i = 0; i < entityAttribs.length; i++) {
var entity = entityAttribs[i].entity;
this.entityAttribs[entity] = entityAttribs[i].attrs || [];
}
}
addChangedAction(key, action) {
if (typeof action === "function") {
this.changedActions[key] = action;
}
}
delChangedAction(key) {
if (key in this.changedActions) delete this.changedActions[key];
}
executeChangedActions(error) {
for (var action in this.changedActions) {
this.changedActions[action](error);
}
}
on(eventName, fn, key) {
if (eventName === "updated") {
this.addUpdatedAction(key, fn);
} else if (eventName === "changed") {
this.addChangedAction(key, fn);
}
}
unregister(eventName, key) {
if (eventName === "updated") this.delUpdatedAction(key);
}
nodeInfo() {
return this._nodeInfo;
}
edgesPerRouter() {
const perRouter = {};
for (const id in this._nodeInfo) {
if (utils.typeFromId(id) === "_topo") {
const node = this._nodeInfo[id];
perRouter[id] = [];
if (node.connection) {
const roleIndex = node.connection.attributeNames.indexOf("role");
const containerIndex = node.connection.attributeNames.indexOf("container");
if (roleIndex >= 0 && containerIndex >= 0) {
node.connection.results.forEach(result => {
if (result[roleIndex] === "edge") {
perRouter[id].push(result[containerIndex]);
}
});
}
}
}
}
return perRouter;
}
saveResults(workInfo, all) {
const changes = { newRouters: [], lostRouters: [], connections: [] };
let workSet = new Set(Object.keys(workInfo));
for (let rId in this._nodeInfo) {
if (workSet.has(rId)) {
// copy entities
for (let entity in workInfo[rId]) {
if (
!this._nodeInfo[rId][entity] ||
workInfo[rId][entity]["timestamp"] + "" >
this._nodeInfo[rId][entity]["timestamp"] + ""
) {
// check for changed number of connections
if (entity === "connection") {
const oldConnections =
this._nodeInfo && this._nodeInfo[rId] && this._nodeInfo[rId].connection
? this._nodeInfo[rId].connection.results.length
: 0;
const newConnections = workInfo[rId].connection.results.length;
if (oldConnections !== newConnections) {
changes.connections.push({
router: rId,
from: oldConnections,
to: newConnections
});
}
}
this._nodeInfo[rId][entity] = utils.copy(workInfo[rId][entity]);
}
}
// find routers that have no connection info
if (!workInfo[rId].connection) {
this._nodeInfo[rId].connection.stale = true;
}
} else if (all) {
changes.lostRouters.push(rId);
delete this._nodeInfo[rId];
}
}
// add any new routers
if (all) {
let nodeSet = new Set(Object.keys(this._nodeInfo));
for (let rId in workInfo) {
if (!nodeSet.has(rId)) {
this._nodeInfo[rId] = utils.copy(workInfo[rId]);
if (!this.edgeList.some(edgeId => edgeId === rId)) {
changes.newRouters.push(rId);
}
}
}
}
if (
changes.connections.length > 0 ||
changes.lostRouters.length > 0 ||
changes.newRouters.length > 0
) {
this.executeChangedActions(changes);
}
}
get() {
return new Promise(
function(resolve, reject) {
this.connection.sendMgmtQuery("GET-MGMT-NODES").then(
function(results) {
let routerIds = results.response;
if (Object.prototype.toString.call(routerIds) === "[object Array]") {
// if there is only one node, it will not be returned
if (routerIds.length === 0) {
var parts = this.connection.getReceiverAddress().split("/");
parts[parts.length - 1] = "$management";
routerIds.push(parts.join("/"));
}
let finish = function(workInfo) {
this.saveResults(workInfo, true);
this.onDone(this._nodeInfo);
resolve(this._nodeInfo);
};
let connectedToEdge = function(response, workInfo) {
let routerId = null;
if (response.length === 1) {
let parts = response[0].split("/");
// we are connected to an edge router
if (parts[1] === "_edge") {
// find the role:edge connection
let conn = workInfo[response[0]].connection;
if (conn) {
let roleIndex = conn.attributeNames.indexOf("role");
for (let i = 0; i < conn.results.length; i++) {
if (conn.results[i][roleIndex] === "edge") {
let container = utils.valFor(
conn.attributeNames,
conn.results[i],
"container"
);
return utils.idFromName(container, "_topo");
}
}
}
}
}
return routerId;
};
// list of router Ids to query
let allIds;
let onlyConnections;
// if we are connected to an edge router, we don't need to
// get the workInfo for the edgeList here since we will be
// calling doget() again
if (utils.typeFromId(routerIds[0]) === "_edge") {
allIds = routerIds;
onlyConnections = true;
} else {
allIds = [...routerIds, ...this.edgeList];
onlyConnections = false;
}
this.doget(allIds, onlyConnections).then(
function(workInfo) {
// test for edge case of being connected to an edge router
// get the routerId of the interior router to which we are connected
let routerId = connectedToEdge(routerIds, workInfo);
if (routerId) {
// if we are connected to an edge router, send the request for
// all routers to the edge router's interior router
this.connection.sendMgmtQuery("GET-MGMT-NODES", routerId).then(
function(results) {
let response = results.response;
if (
Object.prototype.toString.call(response) === "[object Array]"
) {
// special case of edge case:
// we are connected to an edge router that is connected to
// a router that is not connected to any other interior routers
if (response.length === 0) {
response = [routerId];
}
this.doget([...response, ...this.edgeList]).then(
function(workInfo) {
finish.call(this, workInfo);
}.bind(this)
);
}
}.bind(this)
);
} else {
finish.call(this, workInfo);
}
}.bind(this)
);
}
}.bind(this),
function(error) {
reject(error);
}
);
}.bind(this)
);
}
doget(allIds, onlyConnections) {
// dedup ids
const ids = [...new Set(allIds)];
return new Promise(resolve => {
let workInfo = {};
for (let i = 0; i < ids.length; ++i) {
workInfo[ids[i]] = {};
}
const gotResponse = (nodeName, entity, response) => {
workInfo[nodeName][entity] = response;
workInfo[nodeName][entity]["timestamp"] = new Date();
};
let q = queue(this.connection.availableQeueuDepth());
let entityAttribs = this.entityAttribs;
if (onlyConnections) {
entityAttribs = { connection: [] };
}
for (let id in workInfo) {
const type = utils.typeFromId(id);
for (let entity in entityAttribs) {
// don't bother asking for router.node info from edge routers
if (type !== "_edge" || entity !== "router.node") {
q.defer(
this.q_fetchNodeInfo.bind(this),
id,
entity,
entityAttribs[entity],
q,
gotResponse
);
}
}
}
q.await(() => {
// filter out nodes that have no connection info
if (this.filtering) {
for (var id in workInfo) {
if (!workInfo[id].connection) {
this.flux = true;
delete workInfo[id];
}
}
}
resolve(workInfo);
});
});
}
onDone(result) {
clearTimeout(this._getTimer);
if (this.updating)
this._getTimer = setTimeout(this.get.bind(this), this.updateInterval);
this.executeUpdatedActions(result);
}
startUpdating(filter, edgeList) {
return new Promise((resolve, reject) => {
if (!edgeList) edgeList = [];
if (this._getTimer) {
clearTimeout(this._getTimer);
this._getTimer = null;
}
this.updating = true;
// combine and dedup the existing edgeList and the
// passed in edgeList
this.edgeList = [...new Set([...edgeList, ...this.edgeList])];
this.filtering = filter;
this.get().then(resolve);
});
}
stopUpdating() {
this.updating = false;
if (this._getTimer) {
clearTimeout(this._getTimer);
this._getTimer = null;
}
this.removeEdgeIds();
}
removeEdgeIds() {
// remove the edge ids
this.edgeList = [];
const ids = Object.keys(this._nodeInfo);
ids.forEach(id => {
if (utils.typeFromId(id) === "_edge") {
delete this._nodeInfo[id];
}
});
}
removeTheseEdgeNames(names) {
this.edgeList = this.edgeList.filter(edge => {
const edgeName = utils.nameFromId(edge);
return !names.some(name => edgeName === name);
});
const ids = Object.keys(this._nodeInfo);
ids.forEach(id => {
if (names.some(name => name === utils.nameFromId(id))) {
delete this._nodeInfo[id];
}
});
}
fetchEntity(node, entity, attrs, callback) {
var results = {};
var gotResponse = function(nodeName, dotentity, response) {
results = response;
};
var q = queue(this.connection.availableQeueuDepth());
q.defer(this.q_fetchNodeInfo.bind(this), node, entity, attrs, q, gotResponse);
q.await(function() {
callback(node, entity, results);
});
}
// called from queue.defer so the last argument (callback) is supplied by d3
q_fetchNodeInfo(nodeId, entity, attrs, q, heartbeat, callback) {
this.getNodeInfo(nodeId, entity, attrs, q, function(nodeName, dotentity, response) {
heartbeat(nodeName, dotentity, response);
callback(null);
});
}
// get all the requested entities/attributes for specific router(s)
fetchEntities(node, entityAttribs, doneCallback, resultCallback) {
const nodes =
Object.prototype.toString.call(node) !== "[object Array]" ? [node] : node;
var q = queue(this.connection.availableQeueuDepth());
var results = {};
if (!resultCallback) {
resultCallback = function(nodeName, dotentity, response) {
if (!results[nodeName]) results[nodeName] = {};
results[nodeName][dotentity] = response;
};
}
var gotAResponse = function(nodeName, dotentity, response) {
resultCallback(nodeName, dotentity, response);
};
if (Object.prototype.toString.call(entityAttribs) !== "[object Array]") {
entityAttribs = [entityAttribs];
}
for (var n = 0; n < nodes.length; ++n) {
node = nodes[n];
for (var i = 0; i < entityAttribs.length; ++i) {
var ea = entityAttribs[i];
q.defer(
this.q_fetchNodeInfo.bind(this),
node,
ea.entity,
ea.attrs || [],
q,
gotAResponse
);
}
}
q.await(function() {
doneCallback(results);
});
}
// get all the requested entities for all known routers
fetchAllEntities(entityAttribs, doneCallback, resultCallback) {
var q = queue(this.connection.availableQeueuDepth());
var results = {};
if (!resultCallback) {
resultCallback = function(nodeName, dotentity, response) {
if (!results[nodeName]) results[nodeName] = {};
results[nodeName][dotentity] = response;
};
}
var gotAResponse = function(nodeName, dotentity, response) {
resultCallback(nodeName, dotentity, response);
};
if (Object.prototype.toString.call(entityAttribs) !== "[object Array]") {
entityAttribs = [entityAttribs];
}
var nodes = Object.keys(this._nodeInfo);
for (var n = 0; n < nodes.length; ++n) {
for (var i = 0; i < entityAttribs.length; ++i) {
var ea = entityAttribs[i];
q.defer(
this.q_fetchNodeInfo.bind(this),
nodes[n],
ea.entity,
ea.attrs || [],
q,
gotAResponse
);
}
}
q.await(function() {
doneCallback(results);
});
}
// ensure all the topology nones have all these entities
ensureAllEntities(entityAttribs, callback, extra) {
this.ensureEntities(Object.keys(this._nodeInfo), entityAttribs, callback, extra);
}
// ensure these nodes have all these entities. don't fetch unless forced to
ensureEntities(nodes, entityAttribs, callback, extra) {
// make sure nodes is an array
if (Object.prototype.toString.call(nodes) !== "[object Array]") {
nodes = [nodes];
}
// make sure entityAttribs is an array
if (Object.prototype.toString.call(entityAttribs) !== "[object Array]") {
entityAttribs = [entityAttribs];
}
// if all the requested nodes are already in the latest results
if (entityAttribs.every(eattr => eattr.entity in this.entityAttribs)) {
// only return the info for the requested nodes
const results = {};
nodes.forEach(node => {
if (node in this._nodeInfo) {
results[node] = this._nodeInfo[node];
}
});
callback(extra, results);
} else {
// we need to add the entities to the list that will be fetched each update
this.addUpdateEntities(entityAttribs);
// and then get all the entities from the requested nodes
this.doget(nodes).then(results => {
this.saveResults(results, false);
callback(extra, results);
});
}
}
addNodeInfo(id, entity, values) {
// save the results in the nodeInfo object
if (id) {
if (!(id in this._nodeInfo)) {
this._nodeInfo[id] = {};
}
// copy the values to allow garbage collection
this._nodeInfo[id][entity] = values;
this._nodeInfo[id][entity]["timestamp"] = new Date();
}
}
isLargeNetwork() {
return Object.keys(this._nodeInfo).length >= 12;
}
getConnForLink(link) {
// find the connection for this link
var conns = this._nodeInfo[link.nodeId].connection;
if (!conns) return {};
var connIndex = conns.attributeNames.indexOf("identity");
var linkCons = conns.results.filter(function(conn) {
return conn[connIndex] === link.connectionId;
});
return utils.flatten(conns.attributeNames, linkCons[0]);
}
nodeNameList() {
var nl = [];
for (var id in this._nodeInfo) {
nl.push(utils.nameFromId(id));
}
return nl.sort();
}
nodeIdList() {
return Object.keys(this._nodeInfo).sort();
}
nodeList() {
var nl = [];
for (var id in this._nodeInfo) {
nl.push({
name: utils.nameFromId(id),
id: id
});
}
return nl;
}
getSingelRouterNode(nodeName, attrs) {
let node = {
id: utils.nameFromId(nodeName),
protocolVersion: 1,
instance: 0,
linkState: [],
nextHop: "(self)",
validOrigins: [],
address: nodeName,
routerLink: "",
cost: 0,
lastTopoChange: 0,
index: 0,
name: nodeName,
identity: nodeName,
type: "org.apache.qpid.dispatch.router.node"
};
let result = [];
if (attrs.length === 0) {
attrs = Object.keys(node);
}
attrs.forEach(attr => {
result.push(node[attr]);
});
return result;
}
getNodeInfo(nodeName, entity, attrs, q, callback) {
const self = this;
var timedOut = function(q) {
q.abort();
};
var atimer = setTimeout(timedOut, this.timeout, q);
this.connection.sendQuery(nodeName, entity, attrs).then(
function(response) {
clearTimeout(atimer);
if (
entity === "router.node" &&
response.response.results.length === 0 &&
Object.keys(self._nodeInfo).length === 1
) {
response.response.results = [self.getSingelRouterNode(nodeName, attrs)];
}
callback(nodeName, entity, response.response);
},
function() {
q.abort();
}
);
}
quiesceLink(nodeId, name) {
var attributes = {
adminStatus: "disabled",
name: name
};
return this.connection.sendMethod(nodeId, "router.link", attributes, "UPDATE");
}
}
export default Topology;