| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| 'use strict'; |
| |
| const Util = require('util'); |
| const Errors = require('../Errors'); |
| const IgniteClient = require('../IgniteClient'); |
| const ClientSocket = require('./ClientSocket'); |
| const PartitionAwarenessUtils = require('./PartitionAwarenessUtils'); |
| const BinaryUtils = require('./BinaryUtils'); |
| const BinaryObject = require('../BinaryObject'); |
| const ArgumentChecker = require('./ArgumentChecker'); |
| const Logger = require('./Logger'); |
| |
| // Number of tries to get cache partitions info |
| const GET_CACHE_PARTITIONS_RETRIES = 3; |
| // Delay (in milliseconds) between tries to get cache partitions info |
| const GET_CACHE_PARTITIONS_DELAY = 100; |
| |
| class Router { |
| |
| constructor(onStateChanged) { |
| this._state = IgniteClient.STATE.DISCONNECTED; |
| this._onStateChanged = onStateChanged; |
| |
| this._partitionAwarenessAllowed = false; |
| // ClientSocket instance with no node UUID |
| this._legacyConnection = null; |
| // Array of endpoints which we are not connected to. Mostly used when Partition Awareness is on |
| this._inactiveEndpoints = []; |
| |
| /** Partition Awareness only fields */ |
| // This flag indicates if we have at least two alive connections |
| this._partitionAwarenessActive = false; |
| // Contains the background task (promise) or null |
| this._backgroundConnectTask = null; |
| // {Node UUID -> ClientSocket instance} |
| this._connections = {}; |
| // {cacheId -> CacheAffinityMap} |
| this._distributionMap = new Map(); |
| this._affinityTopologyVer = null; |
| } |
| |
| async connect(communicator, config) { |
| if (this._state !== IgniteClient.STATE.DISCONNECTED) { |
| throw new Errors.IllegalStateError(this._state); |
| } |
| |
| // Wait for background task to stop before we move forward |
| await this._waitBackgroundConnect(); |
| |
| this._communicator = communicator; |
| this._config = config; |
| this._partitionAwarenessAllowed = config._partitionAwareness; |
| this._inactiveEndpoints = [...config._endpoints]; |
| |
| await this._connect(); |
| } |
| |
| disconnect() { |
| if (this._state !== IgniteClient.STATE.DISCONNECTED) { |
| this._changeState(IgniteClient.STATE.DISCONNECTED); |
| |
| for (const socket of this._getAllConnections()) { |
| socket.disconnect(); |
| } |
| |
| this._cleanUp(); |
| } |
| } |
| |
| async send(opCode, payloadWriter, payloadReader = null, affinityHint = null) { |
| if (this._state !== IgniteClient.STATE.CONNECTED) { |
| throw new Errors.IllegalStateError(this._state); |
| } |
| |
| if (this._partitionAwarenessActive && affinityHint) { |
| await this._affinitySend(opCode, payloadWriter, payloadReader, affinityHint); |
| } |
| else { |
| // If _partitionAwarenessActive flag is not set, we have exactly one connection |
| // but it can be either a legacy one or a modern one (with node UUID) |
| // If affinityHint has not been passed, we want to always use one socket (as long as it is alive) |
| // because some requests (e.g., SQL cursor-related) require to be sent to the same cluster node |
| await this._getAllConnections()[0].sendRequest(opCode, payloadWriter, payloadReader); |
| } |
| } |
| |
| async _connect() { |
| const errors = new Array(); |
| const endpoints = this._inactiveEndpoints; |
| const config = this._config; |
| const communicator = this._communicator; |
| const onSocketDisconnect = this._onSocketDisconnect.bind(this); |
| const onAffinityTopologyChange = this._onAffinityTopologyChange.bind(this); |
| const endpointsNum = endpoints.length; |
| const random = this._getRandomInt(endpointsNum); |
| |
| this._changeState(IgniteClient.STATE.CONNECTING); |
| |
| for (let i = 0; i < endpoints.length; i++) { |
| const index = (i + random) % endpointsNum; |
| const endpoint = endpoints[index]; |
| |
| try { |
| const socket = new ClientSocket( |
| endpoint, config, communicator, |
| onSocketDisconnect, |
| onAffinityTopologyChange); |
| |
| await socket.connect(); |
| Logger.logDebug(Util.format('Connected to %s', endpoint)); |
| this._changeState(IgniteClient.STATE.CONNECTED); |
| this._addConnection(socket); |
| |
| this._runBackgroundConnect(); |
| |
| return; |
| } |
| catch (err) { |
| Logger.logDebug(Util.format('Could not connect to %s. Error: "%s"', endpoint, err.message)); |
| errors.push(Util.format('[%s] %s', endpoint, err.message)); |
| } |
| } |
| |
| const error = errors.join('; '); |
| this._changeState(IgniteClient.STATE.DISCONNECTED, error); |
| throw new Errors.IgniteClientError(error); |
| } |
| |
| // Can be called when there are no alive connections left |
| async _reconnect() { |
| await this._waitBackgroundConnect(); |
| await this._connect(); |
| } |
| |
| _runBackgroundConnect() { |
| if (this._partitionAwarenessAllowed && !this._backgroundConnectTask) { |
| // Only one task can be active |
| this._backgroundConnectTask = this._backgroundConnect(); |
| this._backgroundConnectTask.then(() => this._backgroundConnectTask = null); |
| } |
| } |
| |
| async _waitBackgroundConnect() { |
| if (this._backgroundConnectTask) { |
| await this._backgroundConnectTask; |
| } |
| } |
| |
| async _backgroundConnect() { |
| // Local copy of _inactiveEndpoints to make sure the array is not being changed during the 'for' cycle |
| const endpoints = [...this._inactiveEndpoints]; |
| const config = this._config; |
| const communicator = this._communicator; |
| const onSocketDisconnect = this._onSocketDisconnect.bind(this); |
| const onAffinityTopologyChange = this._onAffinityTopologyChange.bind(this); |
| |
| for (const endpoint of endpoints) { |
| const socket = new ClientSocket( |
| endpoint, config, communicator, |
| onSocketDisconnect, |
| onAffinityTopologyChange); |
| |
| try { |
| await socket.connect(); |
| Logger.logDebug(Util.format('Connected (in background) to %s', endpoint)); |
| |
| // While we were waiting for socket to connect, someone could call disconnect() |
| if (this._state !== IgniteClient.STATE.CONNECTED) { |
| // If became not connected, stop this task |
| socket.disconnect(); |
| return; |
| } |
| |
| this._addConnection(socket); |
| } |
| catch (err) { |
| Logger.logDebug(Util.format('Could not connect (in background) to %s. Error: "%s"', endpoint, err.message)); |
| |
| // While we were waiting for socket to connect, someone could call disconnect() |
| if (this._state !== IgniteClient.STATE.CONNECTED) { |
| // If became not connected, stop this task |
| socket.disconnect(); |
| return; |
| } |
| } |
| } |
| } |
| |
| _cleanUp() { |
| this._legacyConnection = null; |
| this._inactiveEndpoints = []; |
| |
| this._partitionAwarenessActive = false; |
| this._connections = {}; |
| this._distributionMap = new Map(); |
| this._affinityTopologyVer = null; |
| } |
| |
| _getAllConnections() { |
| const allConnections = Object.values(this._connections); |
| |
| if (this._legacyConnection) { |
| allConnections.push(this._legacyConnection); |
| } |
| |
| return allConnections; |
| } |
| |
| _addConnection(socket) { |
| const nodeUUID = socket.nodeUUID; |
| |
| if (this._partitionAwarenessAllowed && nodeUUID) { |
| if (nodeUUID in this._connections) { |
| // This can happen if the same node has several IPs |
| // We will keep more fresh connection alive |
| this._connections[nodeUUID].disconnect(); |
| } |
| this._connections[nodeUUID] = socket; |
| } |
| else { |
| if (this._legacyConnection) { |
| // We already have a legacy connection |
| // We will keep more fresh connection alive |
| this._legacyConnection.disconnect(); |
| } |
| this._legacyConnection = socket; |
| } |
| // Remove the endpoint from _inactiveEndpoints |
| const index = this._inactiveEndpoints.indexOf(socket.endpoint); |
| if (index > -1) { |
| this._inactiveEndpoints.splice(index, 1); |
| } |
| |
| if (!this._partitionAwarenessActive && |
| this._getAllConnections().length >= 2) { |
| this._partitionAwarenessActive = true; |
| } |
| } |
| |
| _removeConnection(socket) { |
| if (socket.nodeUUID in this._connections) { |
| delete this._connections[socket.nodeUUID]; |
| // Add the endpoint to _inactiveEndpoints |
| this._inactiveEndpoints.push(socket.endpoint); |
| } |
| else if (this._legacyConnection == socket) { |
| this._legacyConnection = null; |
| // Add the endpoint to _inactiveEndpoints |
| this._inactiveEndpoints.push(socket.endpoint); |
| } |
| |
| if (this._partitionAwarenessActive && |
| this._getAllConnections().length < 2) { |
| this._partitionAwarenessActive = false; |
| } |
| } |
| |
| async _onSocketDisconnect(socket, error = null) { |
| this._removeConnection(socket); |
| |
| if (this._getAllConnections().length != 0) { |
| // We had more than one connection before this disconnection |
| this._runBackgroundConnect(); |
| return; |
| } |
| |
| try { |
| await this._reconnect(); |
| } |
| catch (err) { |
| this._cleanUp(); |
| } |
| } |
| |
| /** Partition Awareness methods */ |
| |
| async _affinitySend(opCode, payloadWriter, payloadReader, affinityHint) { |
| let connection = await this._chooseConnection(affinityHint); |
| |
| while (true) { |
| Logger.logDebug('Endpoint chosen: ' + connection.endpoint); |
| |
| try { |
| await connection.sendRequest(opCode, payloadWriter, payloadReader); |
| return; |
| } |
| catch (err) { |
| if (!(err instanceof Errors.LostConnectionError)) { |
| throw err; |
| } |
| |
| Logger.logDebug(connection.endpoint + ' is unavailable'); |
| |
| this._removeConnection(connection); |
| |
| if (this._getAllConnections().length == 0) { |
| throw new Errors.LostConnectionError('Cluster is unavailable'); |
| } |
| } |
| |
| connection = this._getRandomConnection(); |
| Logger.logDebug('Node has been chosen randomly'); |
| } |
| } |
| |
| async _chooseConnection(affinityHint) { |
| const cacheId = affinityHint.cacheId; |
| |
| if (!this._distributionMap.has(cacheId)) { |
| Logger.logDebug('Distribution map does not have info for the cache ' + cacheId); |
| Logger.logDebug('Node has been chosen randomly'); |
| // We are not awaiting here in order to not increase latency of requests |
| this._getCachePartitions(cacheId); |
| return this._getRandomConnection(); |
| } |
| |
| const cacheAffinityMap = this._distributionMap.get(cacheId); |
| |
| const nodeId = await this._determineNodeId(cacheAffinityMap, |
| affinityHint.key, |
| affinityHint.keyType); |
| |
| if (nodeId in this._connections) { |
| Logger.logDebug('Node has been chosen by affinity'); |
| return this._connections[nodeId]; |
| } |
| |
| Logger.logDebug('Node has been chosen randomly'); |
| return this._getRandomConnection(); |
| } |
| |
| async _determineNodeId(cacheAffinityMap, key, keyType) { |
| const partitionMap = cacheAffinityMap.partitionMapping; |
| |
| if (partitionMap.size == 0) { |
| return null; |
| } |
| |
| const keyAffinityMap = cacheAffinityMap.keyConfig; |
| |
| const affinityKeyInfo = await this._affinityKeyInfo(key, keyType); |
| |
| let affinityKey = affinityKeyInfo.key; |
| let affinityKeyTypeCode = affinityKeyInfo.typeCode; |
| |
| if ('typeId' in affinityKeyInfo && keyAffinityMap.has(affinityKeyInfo.typeId)) { |
| const affinityKeyTypeId = keyAffinityMap.get(affinityKeyInfo.typeId); |
| |
| if (affinityKey instanceof BinaryObject && |
| affinityKey._fields.has(affinityKeyTypeId)) { |
| const field = affinityKey._fields.get(affinityKeyTypeId); |
| affinityKey = await field.getValue(); |
| affinityKeyTypeCode = field.typeCode; |
| } |
| } |
| |
| const keyHash = await BinaryUtils.hashCode(affinityKey, this._communicator, affinityKeyTypeCode); |
| const partition = PartitionAwarenessUtils.RendezvousAffinityFunction.calcPartition(keyHash, partitionMap.size); |
| Logger.logDebug('Partition = ' + partition); |
| |
| const nodeId = partitionMap.get(partition); |
| Logger.logDebug('Node ID = ' + nodeId); |
| |
| return nodeId; |
| } |
| |
| async _affinityKeyInfo(key, keyType) { |
| let typeCode = BinaryUtils.getTypeCode(keyType ? keyType : BinaryUtils.calcObjectType(key)); |
| |
| if (typeCode == BinaryUtils.TYPE_CODE.BINARY_OBJECT) { |
| return {'key': key, 'typeCode': typeCode, 'typeId': key._getTypeId()}; |
| } |
| |
| if (typeCode == BinaryUtils.TYPE_CODE.COMPLEX_OBJECT) { |
| const binObj = await BinaryObject.fromObject(key, keyType); |
| typeCode = BinaryUtils.TYPE_CODE.BINARY_OBJECT; |
| |
| return {'key': binObj, 'typeCode': typeCode, 'typeId': binObj._getTypeId()}; |
| } |
| |
| return {'key': key, 'typeCode': typeCode}; |
| } |
| |
| async _onAffinityTopologyChange(newVersion) { |
| if (!this._versionIsNewer(newVersion)) { |
| return; |
| } |
| |
| Logger.logDebug('New topology version reported: ' + newVersion); |
| |
| this._affinityTopologyVer = newVersion; |
| this._distributionMap = new Map(); |
| |
| this._runBackgroundConnect(); |
| } |
| |
| async _getCachePartitions(cacheId, tries = GET_CACHE_PARTITIONS_RETRIES) { |
| if (tries <= 0) { |
| return; |
| } |
| |
| Logger.logDebug('Getting cache partitions info...'); |
| |
| try { |
| await this.send( |
| BinaryUtils.OPERATION.CACHE_PARTITIONS, |
| async (payload) => { |
| // We always request partition map for one cache |
| payload.writeInteger(1); |
| payload.writeInteger(cacheId); |
| }, |
| this._handleCachePartitions.bind(this)); |
| } |
| catch (err) { |
| if (err instanceof Errors.LostConnectionError) { |
| return; |
| } |
| |
| // Retries in case of an error (most probably |
| // "Getting affinity for topology version earlier than affinity is calculated") |
| await this._sleep(GET_CACHE_PARTITIONS_DELAY); |
| this._getCachePartitions(cacheId, tries - 1); |
| } |
| } |
| |
| async _handleCachePartitions(payload) { |
| const affinityTopologyVer = new PartitionAwarenessUtils.AffinityTopologyVersion(payload); |
| Logger.logDebug('Partitions info for topology version ' + affinityTopologyVer); |
| |
| if (this._versionIsNewer(affinityTopologyVer)) { |
| this._distributionMap = new Map(); |
| this._affinityTopologyVer = affinityTopologyVer; |
| Logger.logDebug('New affinity topology version: ' + affinityTopologyVer); |
| } else if (this._versionIsOlder(affinityTopologyVer)) { |
| Logger.logDebug('Topology version is outdated. Actual version: ' + this._affinityTopologyVer); |
| return; |
| } |
| |
| const groupsNum = payload.readInteger(); |
| Logger.logDebug('Partitions info for ' + groupsNum + ' cache groups received'); |
| |
| for (let i = 0; i < groupsNum; i++) { |
| const group = await PartitionAwarenessUtils.PartitionAwarenessCacheGroup.build(this._communicator, payload); |
| // {partition -> nodeId} |
| const partitionMapping = new Map(); |
| |
| for (const [nodeId, partitions] of group.partitionMap) { |
| for (const partition of partitions) { |
| partitionMapping.set(partition, nodeId); |
| } |
| } |
| |
| for (const [cacheId, config] of group.caches) { |
| const cacheAffinityMap = new PartitionAwarenessUtils.CacheAffinityMap(cacheId, partitionMapping, config); |
| this._distributionMap.set(cacheId, cacheAffinityMap); |
| Logger.logDebug('Partitions info for cache: ' + cacheId); |
| } |
| } |
| |
| Logger.logDebug('Got cache partitions info'); |
| } |
| |
| _getRandomConnection() { |
| const allConnections = this._getAllConnections(); |
| return allConnections[this._getRandomInt(allConnections.length)]; |
| } |
| |
| _changeState(state, reason = null) { |
| if (Logger.debug) { |
| Logger.logDebug(Util.format('Router state: %s -> %s'), |
| this._getState(this._state), |
| this._getState(state)); |
| } |
| if (this._state !== state) { |
| this._state = state; |
| if (this._onStateChanged) { |
| this._onStateChanged(state, reason); |
| } |
| } |
| } |
| |
| _getState(state) { |
| switch (state) { |
| case IgniteClient.STATE.DISCONNECTED: |
| return 'DISCONNECTED'; |
| case IgniteClient.STATE.CONNECTING: |
| return 'CONNECTING'; |
| case IgniteClient.STATE.CONNECTED: |
| return 'CONNECTED'; |
| default: |
| return 'UNKNOWN'; |
| } |
| } |
| |
| _versionIsNewer(version) { |
| return this._affinityTopologyVer === null || |
| this._affinityTopologyVer.compareTo(version) < 0; |
| } |
| |
| _versionIsOlder(version) { |
| return this._affinityTopologyVer !== null && |
| this._affinityTopologyVer.compareTo(version) > 0; |
| } |
| |
| // Returns a random integer between 0 and max - 1 |
| _getRandomInt(max) { |
| if (max === 0) { |
| return 0; |
| } |
| return Math.floor(Math.random() * max); |
| } |
| |
| _sleep(milliseconds) { |
| return new Promise(resolve => setTimeout(resolve, milliseconds)); |
| } |
| } |
| |
| module.exports = Router; |