blob: 282a366b5e8437bf033eb29cba9abce5724f9792 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
'use strict';
const Util = require('util');
const Errors = require('../Errors');
const IgniteClient = require('../IgniteClient');
const ClientSocket = require('./ClientSocket');
const PartitionAwarenessUtils = require('./PartitionAwarenessUtils');
const BinaryUtils = require('./BinaryUtils');
const BinaryObject = require('../BinaryObject');
const ArgumentChecker = require('./ArgumentChecker');
const Logger = require('./Logger');
// Maximum number of attempts to request cache partitions info
// (the default value of the `tries` argument of Router._getCachePartitions)
const GET_CACHE_PARTITIONS_RETRIES = 3;
// Delay (in milliseconds) between tries to get cache partitions info:
// Router._getCachePartitions sleeps for this long after a failed attempt before retrying
const GET_CACHE_PARTITIONS_DELAY = 100;
/**
 * Owns the client's connections to the cluster and decides which connection
 * every request is sent through.
 *
 * When Partition Awareness is not allowed, only the connection established by
 * _connect() is used. When it is allowed, the router additionally tries (in the
 * background) to connect to every inactive endpoint and, if an affinity hint is
 * passed to send(), routes the request to the node that the cached partition map
 * assigns to the key.
 */
class Router {

    /**
     * @param {Function} onStateChanged - optional callback invoked as (state, reason)
     *   every time the router state actually changes.
     */
    constructor(onStateChanged) {
        this._state = IgniteClient.STATE.DISCONNECTED;
        this._onStateChanged = onStateChanged;

        this._partitionAwarenessAllowed = false;
        // ClientSocket instance with no node UUID
        this._legacyConnection = null;
        // Array of endpoints which we are not connected to. Mostly used when Partition Awareness is on
        this._inactiveEndpoints = [];

        /** Partition Awareness only fields */
        // This flag indicates if we have at least two alive connections
        this._partitionAwarenessActive = false;
        // Contains the background task (promise) or null
        this._backgroundConnectTask = null;
        // {Node UUID -> ClientSocket instance}
        this._connections = {};
        // {cacheId -> CacheAffinityMap}
        this._distributionMap = new Map();
        this._affinityTopologyVer = null;
    }

    /**
     * Establishes the first connection to the cluster.
     *
     * @param communicator - object passed through to every ClientSocket and to BinaryUtils.hashCode.
     * @param config - client configuration; `_endpoints` and `_partitionAwareness` are read from it.
     *
     * @throws {IllegalStateError} if the router is not in the DISCONNECTED state.
     * @throws {IgniteClientError} if none of the endpoints could be connected to.
     */
    async connect(communicator, config) {
        if (this._state !== IgniteClient.STATE.DISCONNECTED) {
            throw new Errors.IllegalStateError(this._state);
        }

        // Wait for background task to stop before we move forward
        await this._waitBackgroundConnect();

        this._communicator = communicator;
        this._config = config;
        this._partitionAwarenessAllowed = config._partitionAwareness;
        // Copy the array: the router mutates its own list of inactive endpoints
        this._inactiveEndpoints = [...config._endpoints];

        await this._connect();
    }

    /**
     * Switches to the DISCONNECTED state, disconnects every tracked socket and
     * resets the connection bookkeeping. Does nothing if already DISCONNECTED.
     */
    disconnect() {
        if (this._state !== IgniteClient.STATE.DISCONNECTED) {
            this._changeState(IgniteClient.STATE.DISCONNECTED);

            for (const socket of this._getAllConnections()) {
                socket.disconnect();
            }

            this._cleanUp();
        }
    }

    /**
     * Sends a request through one of the connections.
     *
     * @param opCode - operation code, forwarded to ClientSocket.sendRequest.
     * @param payloadWriter - request writer, forwarded to ClientSocket.sendRequest.
     * @param payloadReader - response reader, forwarded to ClientSocket.sendRequest.
     * @param affinityHint - optional object with `cacheId`, `key` and `keyType` fields;
     *   used to choose the node only while Partition Awareness is active.
     *
     * @throws {IllegalStateError} if the router is not in the CONNECTED state.
     * @throws {LostConnectionError} if there is no alive connection to send through.
     */
    async send(opCode, payloadWriter, payloadReader = null, affinityHint = null) {
        if (this._state !== IgniteClient.STATE.CONNECTED) {
            throw new Errors.IllegalStateError(this._state);
        }

        if (this._partitionAwarenessActive && affinityHint) {
            await this._affinitySend(opCode, payloadWriter, payloadReader, affinityHint);
            return;
        }

        // If _partitionAwarenessActive flag is not set, we have exactly one connection
        // but it can be either a legacy one or a modern one (with node UUID)
        // If affinityHint has not been passed, we want to always use one socket (as long as it is alive)
        // because some requests (e.g., SQL cursor-related) require to be sent to the same cluster node
        const connection = this._getAllConnections()[0];
        if (!connection) {
            // The state may still be CONNECTED for a short while after the last
            // connection has been removed (e.g., while a reconnect is pending).
            // Report it as a lost connection instead of failing with a TypeError.
            throw new Errors.LostConnectionError('Cluster is unavailable');
        }
        await connection.sendRequest(opCode, payloadWriter, payloadReader);
    }

    /**
     * Tries the inactive endpoints one by one, starting from a random one, until
     * a socket connects. On success the router becomes CONNECTED and the
     * background connect task is started (if Partition Awareness is allowed).
     *
     * @throws {IgniteClientError} with the joined per-endpoint error messages
     *   if no endpoint could be connected to; the state becomes DISCONNECTED.
     */
    async _connect() {
        const errors = [];
        const endpoints = this._inactiveEndpoints;
        const config = this._config;
        const communicator = this._communicator;
        const onSocketDisconnect = this._onSocketDisconnect.bind(this);
        const onAffinityTopologyChange = this._onAffinityTopologyChange.bind(this);
        const endpointsNum = endpoints.length;
        const random = this._getRandomInt(endpointsNum);

        this._changeState(IgniteClient.STATE.CONNECTING);

        for (let i = 0; i < endpointsNum; i++) {
            const index = (i + random) % endpointsNum;
            const endpoint = endpoints[index];

            try {
                const socket = new ClientSocket(
                    endpoint, config, communicator,
                    onSocketDisconnect,
                    onAffinityTopologyChange);

                await socket.connect();
                Logger.logDebug(Util.format('Connected to %s', endpoint));
                // Register the connection BEFORE announcing the CONNECTED state,
                // so a state listener that immediately sends a request finds it
                this._addConnection(socket);
                this._changeState(IgniteClient.STATE.CONNECTED);
                this._runBackgroundConnect();

                return;
            }
            catch (err) {
                Logger.logDebug(Util.format('Could not connect to %s. Error: "%s"', endpoint, err.message));
                errors.push(Util.format('[%s] %s', endpoint, err.message));
            }
        }

        const error = errors.join('; ');
        this._changeState(IgniteClient.STATE.DISCONNECTED, error);
        throw new Errors.IgniteClientError(error);
    }

    /**
     * Waits for the background connect task (if any) and connects again.
     * Can be called when there are no alive connections left.
     */
    async _reconnect() {
        await this._waitBackgroundConnect();
        await this._connect();
    }

    /**
     * Starts the background connect task if Partition Awareness is allowed and
     * no such task is currently running.
     */
    _runBackgroundConnect() {
        if (this._partitionAwarenessAllowed && !this._backgroundConnectTask) {
            // Only one task can be active.
            // The stored promise never rejects and always clears the field when it settles:
            // otherwise a failed task would produce an unhandled rejection and
            // would block every subsequent background connect.
            this._backgroundConnectTask = this._backgroundConnect()
                .catch((err) => {
                    Logger.logDebug(Util.format('Background connect failed. Error: "%s"', err.message));
                })
                .then(() => {
                    this._backgroundConnectTask = null;
                });
        }
    }

    /**
     * Resolves when the currently running background connect task (if any) has finished.
     */
    async _waitBackgroundConnect() {
        if (this._backgroundConnectTask) {
            await this._backgroundConnectTask;
        }
    }

    /**
     * Sequentially tries to connect to every endpoint that was inactive when the
     * task started. Stops early if the router leaves the CONNECTED state.
     */
    async _backgroundConnect() {
        // Local copy of _inactiveEndpoints to make sure the array is not being changed during the 'for' cycle
        const endpoints = [...this._inactiveEndpoints];
        const config = this._config;
        const communicator = this._communicator;
        const onSocketDisconnect = this._onSocketDisconnect.bind(this);
        const onAffinityTopologyChange = this._onAffinityTopologyChange.bind(this);

        for (const endpoint of endpoints) {
            const socket = new ClientSocket(
                endpoint, config, communicator,
                onSocketDisconnect,
                onAffinityTopologyChange);

            try {
                await socket.connect();
                Logger.logDebug(Util.format('Connected (in background) to %s', endpoint));

                // While we were waiting for socket to connect, someone could call disconnect()
                if (this._state !== IgniteClient.STATE.CONNECTED) {
                    // If became not connected, stop this task
                    socket.disconnect();
                    return;
                }

                this._addConnection(socket);
            }
            catch (err) {
                Logger.logDebug(Util.format('Could not connect (in background) to %s. Error: "%s"', endpoint, err.message));

                // While we were waiting for socket to connect, someone could call disconnect()
                if (this._state !== IgniteClient.STATE.CONNECTED) {
                    // If became not connected, stop this task
                    socket.disconnect();
                    return;
                }
            }
        }
    }

    /**
     * Resets the connection bookkeeping and the cached affinity data.
     * Does not touch the sockets themselves and does not change the state.
     */
    _cleanUp() {
        this._legacyConnection = null;
        this._inactiveEndpoints = [];

        this._partitionAwarenessActive = false;
        this._connections = {};
        this._distributionMap = new Map();
        this._affinityTopologyVer = null;
    }

    /**
     * @returns {Array} a new array with every tracked socket: the ones keyed by
     *   node UUID first, the legacy connection (if any) last.
     */
    _getAllConnections() {
        const allConnections = Object.values(this._connections);

        if (this._legacyConnection) {
            allConnections.push(this._legacyConnection);
        }

        return allConnections;
    }

    /**
     * Starts tracking a connected socket, replacing (and disconnecting) a previously
     * tracked socket that occupies the same slot, and updates _inactiveEndpoints
     * and the _partitionAwarenessActive flag.
     *
     * @param socket - connected ClientSocket; its `nodeUUID` and `endpoint` are read.
     */
    _addConnection(socket) {
        const nodeUUID = socket.nodeUUID;

        if (this._partitionAwarenessAllowed && nodeUUID) {
            if (nodeUUID in this._connections) {
                // This can happen if the same node has several IPs
                // We will keep more fresh connection alive
                this._connections[nodeUUID].disconnect();
            }
            this._connections[nodeUUID] = socket;
        }
        else {
            if (this._legacyConnection) {
                // We already have a legacy connection
                // We will keep more fresh connection alive
                this._legacyConnection.disconnect();
            }
            this._legacyConnection = socket;
        }

        // Remove the endpoint from _inactiveEndpoints
        const index = this._inactiveEndpoints.indexOf(socket.endpoint);
        if (index > -1) {
            this._inactiveEndpoints.splice(index, 1);
        }

        if (!this._partitionAwarenessActive &&
            this._getAllConnections().length >= 2) {
            this._partitionAwarenessActive = true;
        }
    }

    /**
     * Stops tracking the socket and returns its endpoint to _inactiveEndpoints.
     * A socket that is not the one currently tracked (e.g., it has already been
     * replaced by a more fresh connection to the same node) is ignored, so the
     * replacement is never evicted by mistake.
     *
     * @param socket - ClientSocket to remove; its `nodeUUID` and `endpoint` are read.
     */
    _removeConnection(socket) {
        // Compare by identity, not only by node UUID: see the method description
        if (this._connections[socket.nodeUUID] === socket) {
            delete this._connections[socket.nodeUUID];
            // Add the endpoint to _inactiveEndpoints
            this._inactiveEndpoints.push(socket.endpoint);
        }
        else if (this._legacyConnection === socket) {
            this._legacyConnection = null;
            // Add the endpoint to _inactiveEndpoints
            this._inactiveEndpoints.push(socket.endpoint);
        }

        if (this._partitionAwarenessActive &&
            this._getAllConnections().length < 2) {
            this._partitionAwarenessActive = false;
        }
    }

    /**
     * Disconnect callback handed to every ClientSocket.
     * If other connections are still alive, only the background connect task is
     * started; otherwise the router reconnects from scratch.
     *
     * @param socket - the socket that has been disconnected.
     * @param error - not used by the router.
     */
    async _onSocketDisconnect(socket, error = null) {
        this._removeConnection(socket);

        if (this._getAllConnections().length !== 0) {
            // We had more than one connection before this disconnection
            this._runBackgroundConnect();
            return;
        }

        try {
            await this._reconnect();
        }
        catch (err) {
            // _connect() has already logged every failure and switched the state
            // to DISCONNECTED; only the bookkeeping is left to reset
            this._cleanUp();
        }
    }

    /** Partition Awareness methods */

    /**
     * Sends a request to the connection chosen by the affinity hint. If that
     * connection is lost, it is removed and the request is retried through
     * randomly chosen connections until it succeeds or no connections are left.
     *
     * @throws {LostConnectionError} if every connection has been lost.
     * @throws any other error raised by ClientSocket.sendRequest, unchanged.
     */
    async _affinitySend(opCode, payloadWriter, payloadReader, affinityHint) {
        let connection = await this._chooseConnection(affinityHint);

        while (true) {
            Logger.logDebug('Endpoint chosen: ' + connection.endpoint);

            try {
                await connection.sendRequest(opCode, payloadWriter, payloadReader);
                return;
            }
            catch (err) {
                if (!(err instanceof Errors.LostConnectionError)) {
                    throw err;
                }

                Logger.logDebug(connection.endpoint + ' is unavailable');

                this._removeConnection(connection);

                if (this._getAllConnections().length === 0) {
                    throw new Errors.LostConnectionError('Cluster is unavailable');
                }
            }

            connection = this._getRandomConnection();
            Logger.logDebug('Node has been chosen randomly');
        }
    }

    /**
     * Picks the connection for the affinity hint: the one to the node that the
     * partition map assigns to the key or, if the map for the cache is not known
     * yet or there is no connection to that node, a random one.
     *
     * @param affinityHint - object with `cacheId`, `key` and `keyType` fields.
     */
    async _chooseConnection(affinityHint) {
        const cacheId = affinityHint.cacheId;

        if (!this._distributionMap.has(cacheId)) {
            Logger.logDebug('Distribution map does not have info for the cache ' + cacheId);
            Logger.logDebug('Node has been chosen randomly');
            // We are not awaiting here in order to not increase latency of requests
            this._getCachePartitions(cacheId);
            return this._getRandomConnection();
        }

        const cacheAffinityMap = this._distributionMap.get(cacheId);

        const nodeId = await this._determineNodeId(cacheAffinityMap,
                                                   affinityHint.key,
                                                   affinityHint.keyType);

        if (nodeId in this._connections) {
            Logger.logDebug('Node has been chosen by affinity');
            return this._connections[nodeId];
        }

        Logger.logDebug('Node has been chosen randomly');
        return this._getRandomConnection();
    }

    /**
     * Calculates which node owns the partition of the key.
     *
     * @param cacheAffinityMap - CacheAffinityMap of the cache; its `partitionMapping`
     *   and `keyConfig` are read.
     * @param key - cache key.
     * @param keyType - type of the key, may be not specified.
     *
     * @returns the node id the partition map holds for the key's partition,
     *   or null if the partition map is empty.
     */
    async _determineNodeId(cacheAffinityMap, key, keyType) {
        const partitionMap = cacheAffinityMap.partitionMapping;

        if (partitionMap.size == 0) {
            return null;
        }

        const keyAffinityMap = cacheAffinityMap.keyConfig;

        const affinityKeyInfo = await this._affinityKeyInfo(key, keyType);

        let affinityKey = affinityKeyInfo.key;
        let affinityKeyTypeCode = affinityKeyInfo.typeCode;

        // If the key type has an entry in the cache key configuration and the key
        // has the configured field, that field is hashed instead of the whole key
        if ('typeId' in affinityKeyInfo && keyAffinityMap.has(affinityKeyInfo.typeId)) {
            const affinityKeyTypeId = keyAffinityMap.get(affinityKeyInfo.typeId);

            if (affinityKey instanceof BinaryObject &&
                affinityKey._fields.has(affinityKeyTypeId)) {
                const field = affinityKey._fields.get(affinityKeyTypeId);
                affinityKey = await field.getValue();
                affinityKeyTypeCode = field.typeCode;
            }
        }

        const keyHash = await BinaryUtils.hashCode(affinityKey, this._communicator, affinityKeyTypeCode);
        const partition = PartitionAwarenessUtils.RendezvousAffinityFunction.calcPartition(keyHash, partitionMap.size);
        Logger.logDebug('Partition = ' + partition);

        const nodeId = partitionMap.get(partition);
        Logger.logDebug('Node ID = ' + nodeId);

        return nodeId;
    }

    /**
     * Normalizes the key for hashing.
     *
     * @returns {object} `{key, typeCode}`; for binary and complex objects the key
     *   is a BinaryObject and `typeId` is additionally set.
     */
    async _affinityKeyInfo(key, keyType) {
        let typeCode = BinaryUtils.getTypeCode(keyType ? keyType : BinaryUtils.calcObjectType(key));

        if (typeCode == BinaryUtils.TYPE_CODE.BINARY_OBJECT) {
            return {'key': key, 'typeCode': typeCode, 'typeId': key._getTypeId()};
        }

        if (typeCode == BinaryUtils.TYPE_CODE.COMPLEX_OBJECT) {
            const binObj = await BinaryObject.fromObject(key, keyType);
            typeCode = BinaryUtils.TYPE_CODE.BINARY_OBJECT;

            return {'key': binObj, 'typeCode': typeCode, 'typeId': binObj._getTypeId()};
        }

        return {'key': key, 'typeCode': typeCode};
    }

    /**
     * Topology change callback handed to every ClientSocket. A newer version
     * drops the cached distribution map and starts the background connect task.
     *
     * @param newVersion - reported affinity topology version.
     */
    async _onAffinityTopologyChange(newVersion) {
        if (!this._versionIsNewer(newVersion)) {
            return;
        }

        Logger.logDebug('New topology version reported: ' + newVersion);

        this._affinityTopologyVer = newVersion;
        this._distributionMap = new Map();

        this._runBackgroundConnect();
    }

    /**
     * Requests the partition map of the cache and stores it in _distributionMap.
     * A failed attempt is retried after GET_CACHE_PARTITIONS_DELAY milliseconds,
     * unless the connection is lost or no tries are left. Never rejects.
     *
     * @param cacheId - id of the cache.
     * @param {number} tries - number of attempts left.
     */
    async _getCachePartitions(cacheId, tries = GET_CACHE_PARTITIONS_RETRIES) {
        if (tries <= 0) {
            return;
        }

        Logger.logDebug('Getting cache partitions info...');

        try {
            await this.send(
                BinaryUtils.OPERATION.CACHE_PARTITIONS,
                async (payload) => {
                    // We always request partition map for one cache
                    payload.writeInteger(1);
                    payload.writeInteger(cacheId);
                },
                this._handleCachePartitions.bind(this));
        }
        catch (err) {
            if (err instanceof Errors.LostConnectionError) {
                return;
            }

            // Retries in case of an error (most probably
            // "Getting affinity for topology version earlier than affinity is calculated")
            await this._sleep(GET_CACHE_PARTITIONS_DELAY);
            // Awaited so that the retry is part of this call instead of a floating promise
            await this._getCachePartitions(cacheId, tries - 1);
        }
    }

    /**
     * Response reader of the CACHE_PARTITIONS request: stores the received
     * partition mappings in _distributionMap. A response for a topology version
     * older than the known one is ignored; a newer one resets the map first.
     *
     * @param payload - response buffer.
     */
    async _handleCachePartitions(payload) {
        const affinityTopologyVer = new PartitionAwarenessUtils.AffinityTopologyVersion(payload);
        Logger.logDebug('Partitions info for topology version ' + affinityTopologyVer);

        if (this._versionIsNewer(affinityTopologyVer)) {
            this._distributionMap = new Map();
            this._affinityTopologyVer = affinityTopologyVer;
            Logger.logDebug('New affinity topology version: ' + affinityTopologyVer);
        } else if (this._versionIsOlder(affinityTopologyVer)) {
            Logger.logDebug('Topology version is outdated. Actual version: ' + this._affinityTopologyVer);
            return;
        }

        const groupsNum = payload.readInteger();
        Logger.logDebug('Partitions info for ' + groupsNum + ' cache groups received');

        for (let i = 0; i < groupsNum; i++) {
            const group = await PartitionAwarenessUtils.PartitionAwarenessCacheGroup.build(this._communicator, payload);
            // {partition -> nodeId}
            const partitionMapping = new Map();

            // Invert {nodeId -> partitions} into {partition -> nodeId}
            for (const [nodeId, partitions] of group.partitionMap) {
                for (const partition of partitions) {
                    partitionMapping.set(partition, nodeId);
                }
            }

            // Every cache of the group shares the same partition mapping
            for (const [cacheId, config] of group.caches) {
                const cacheAffinityMap = new PartitionAwarenessUtils.CacheAffinityMap(cacheId, partitionMapping, config);
                this._distributionMap.set(cacheId, cacheAffinityMap);
                Logger.logDebug('Partitions info for cache: ' + cacheId);
            }
        }

        Logger.logDebug('Got cache partitions info');
    }

    /**
     * @returns a randomly chosen tracked socket, or undefined if there are none.
     */
    _getRandomConnection() {
        const allConnections = this._getAllConnections();
        return allConnections[this._getRandomInt(allConnections.length)];
    }

    /**
     * Logs the transition and, if the state actually changes, stores it and
     * notifies the state listener.
     *
     * @param state - new state (one of IgniteClient.STATE).
     * @param reason - optional reason passed to the state listener.
     */
    _changeState(state, reason = null) {
        if (Logger.debug) {
            // Both state names are arguments of Util.format, not of logDebug
            Logger.logDebug(Util.format('Router state: %s -> %s',
                this._getState(this._state),
                this._getState(state)));
        }
        if (this._state !== state) {
            this._state = state;
            if (this._onStateChanged) {
                this._onStateChanged(state, reason);
            }
        }
    }

    /**
     * @returns {string} printable name of the state, 'UNKNOWN' for an unexpected value.
     */
    _getState(state) {
        switch (state) {
            case IgniteClient.STATE.DISCONNECTED:
                return 'DISCONNECTED';
            case IgniteClient.STATE.CONNECTING:
                return 'CONNECTING';
            case IgniteClient.STATE.CONNECTED:
                return 'CONNECTED';
            default:
                return 'UNKNOWN';
        }
    }

    /**
     * @returns {boolean} true if no topology version is known yet or the known
     *   one compares as less than the specified version.
     */
    _versionIsNewer(version) {
        return this._affinityTopologyVer === null ||
               this._affinityTopologyVer.compareTo(version) < 0;
    }

    /**
     * @returns {boolean} true if a topology version is known and it compares as
     *   greater than the specified version.
     */
    _versionIsOlder(version) {
        return this._affinityTopologyVer !== null &&
               this._affinityTopologyVer.compareTo(version) > 0;
    }

    // Returns a random integer between 0 and max - 1 (0 if max is 0)
    _getRandomInt(max) {
        if (max === 0) {
            return 0;
        }
        return Math.floor(Math.random() * max);
    }

    /**
     * @returns {Promise} promise resolved after the specified number of milliseconds.
     */
    _sleep(milliseconds) {
        return new Promise(resolve => setTimeout(resolve, milliseconds));
    }
}
// The Router class is the only export of this module
module.exports = Router;