/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
| |
| 'use strict'; |
| |
| import * as tls from 'tls'; |
| import * as net from 'net'; |
| const Long = require('long'); |
| import * as Util from 'util'; |
| import BinaryUtils from "./BinaryUtils"; |
| |
| import Logger from "./Logger"; |
| import ArgumentChecker from "./ArgumentChecker"; |
| import BinaryCommunicator from "./BinaryCommunicator"; |
| import MessageBuffer from "./MessageBuffer"; |
| import {NetConnectOpts, Socket } from "net"; |
| import { LostConnectionError, OperationError, IllegalStateError, IgniteClientError } from '../Errors'; |
| import { AffinityTopologyVersion } from './PartitionAwarenessUtils'; |
| import { IgniteClientConfiguration } from "../IgniteClientConfiguration"; |
| import { ConnectionOptions } from 'tls'; |
| |
/** Status byte that marks a successful handshake response. */
const HANDSHAKE_SUCCESS_STATUS_CODE = 1;
/** Integer status code that marks a successful (non-handshake) response. */
const REQUEST_SUCCESS_STATUS_CODE = 0;
/** Port used when an endpoint string does not carry one. */
const PORT_DEFAULT = 10800;
/** Bit of the response flags that marks an error response. */
const FLAG_ERROR = 1;
/** Bit of the response flags that marks an affinity topology change. */
const FLAG_TOPOLOGY_CHANGED = 2;
| |
/**
 * A three-part protocol version (major.minor.patch) that can be compared,
 * printed, and read from / written to a message buffer as three shorts.
 */
class ProtocolVersion {

    private _major: number;

    private _minor: number;

    private _patch: number;

    /**
     * @param major major component; left as null until {@link read} fills it in.
     * @param minor minor component; left as null until {@link read} fills it in.
     * @param patch patch component; left as null until {@link read} fills it in.
     */
    constructor(major = null, minor = null, patch = null) {
        this._major = major;
        this._minor = minor;
        this._patch = patch;
    }

    /**
     * Orders two versions by major, then minor, then patch.
     *
     * @param other version to compare against.
     * @returns a negative number when this version is lower, a positive number
     *          when it is higher, and 0 when all three components match.
     */
    compareTo(other: ProtocolVersion): number {
        const majorDiff = this._major - other._major;
        if (majorDiff !== 0) {
            return majorDiff;
        }
        const minorDiff = this._minor - other._minor;
        if (minorDiff !== 0) {
            return minorDiff;
        }
        return this._patch - other._patch;
    }

    /**
     * @param other version to compare against.
     * @returns true when {@link compareTo} reports no difference.
     */
    equals(other: ProtocolVersion): boolean {
        return this.compareTo(other) === 0;
    }

    /** @returns the version formatted as `major.minor.patch`. */
    toString(): string {
        return Util.format('%d.%d.%d', this._major, this._minor, this._patch);
    }

    /**
     * Replaces all three components with shorts read from the buffer,
     * in major, minor, patch order.
     *
     * @param buffer object exposing `readShort()`.
     */
    read(buffer): void {
        this._major = buffer.readShort();
        this._minor = buffer.readShort();
        this._patch = buffer.readShort();
    }

    /**
     * Writes the three components to the buffer as shorts,
     * in major, minor, patch order.
     *
     * @param buffer object exposing `writeShort(value)`.
     */
    write(buffer): void {
        buffer.writeShort(this._major);
        buffer.writeShort(this._minor);
        buffer.writeShort(this._patch);
    }
}
| |
const PROTOCOL_VERSION_1_0_0 = new ProtocolVersion(1, 0, 0);
const PROTOCOL_VERSION_1_1_0 = new ProtocolVersion(1, 1, 0);
const PROTOCOL_VERSION_1_2_0 = new ProtocolVersion(1, 2, 0);
const PROTOCOL_VERSION_1_3_0 = new ProtocolVersion(1, 3, 0);
const PROTOCOL_VERSION_1_4_0 = new ProtocolVersion(1, 4, 0);

/** Protocol versions the client is willing to fall back to when the server rejects the offered one. */
const SUPPORTED_VERSIONS = [
    // PROTOCOL_VERSION_1_0_0, // Support for QueryField precision/scale fields breaks 1.0.0 compatibility
    PROTOCOL_VERSION_1_1_0,
    PROTOCOL_VERSION_1_2_0,
    PROTOCOL_VERSION_1_3_0,
    PROTOCOL_VERSION_1_4_0
];

/** Version offered in the first handshake attempt. */
const CURRENT_VERSION = PROTOCOL_VERSION_1_4_0;
| |
/** Lifecycle states of a client socket. */
export enum STATE {
    /** Freshly constructed; no transport connection established yet. */
    INITIAL = 0,
    /** Transport connected; the handshake request has been issued. */
    HANDSHAKE = 1,
    /** Handshake accepted; regular requests may be sent. */
    CONNECTED = 2,
    /** The socket was disconnected (locally or by the peer). */
    DISCONNECTED = 3
}
| |
/**
 * One connection (plain TCP or TLS) to a single server endpoint.
 *
 * Performs the protocol handshake, frames outgoing requests, reassembles the
 * length-prefixed responses arriving on the socket and hands every response to
 * the pending request registered under the same request id.
 */
export default class ClientSocket {

    private _socket: Socket;

    private _host: string;

    /** Bytes received so far that still contain unprocessed (or partial) responses. */
    private _buffer: MessageBuffer;

    /** Requests awaiting a response, keyed by the decimal string of their id. */
    private _requests: Map<string, Request>;

    /** UUID read from a successful handshake response (protocol 1.4.0 and later). */
    private _nodeUuid: string;

    /** Last error seen; reported to pending requests and to the disconnect callback. */
    private _error: string | Error;

    private _endpoint: string;
    private _config: IgniteClientConfiguration;
    private _communicator: BinaryCommunicator;
    private _onSocketDisconnect: Function;
    private _onAffinityTopologyChange: Function;
    private _state: STATE;
    private _requestId: Long;
    /** Offset inside `_buffer` where the next unprocessed response starts. */
    private _offset: number;
    /** True once a handshake has succeeded; gates the disconnect callback. */
    private _wasConnected: boolean;
    private _handshakeRequestId: Long;
    private _protocolVersion: ProtocolVersion;
    private _port: number | string;
    private _version: number;

    /**
     * @param endpoint `host[:port]`; an IPv6 host with a port must be written `[host]:port`.
     * @param config client configuration (TLS switch, socket options, credentials).
     * @param communicator used to decode the node UUID from the handshake response.
     * @param onSocketDisconnect called with `(socket, error)` when a previously connected socket is lost.
     * @param onAffinityTopologyChange awaited with the new topology version when a response carries one.
     * @throws when the endpoint is empty or malformed.
     */
    constructor(endpoint: string, config: IgniteClientConfiguration, communicator: BinaryCommunicator, onSocketDisconnect: Function, onAffinityTopologyChange: Function) {
        ArgumentChecker.notEmpty(endpoint, 'endpoints');
        this._endpoint = endpoint;
        this._parseEndpoint(endpoint);
        this._config = config;
        this._communicator = communicator;
        this._onSocketDisconnect = onSocketDisconnect;
        this._onAffinityTopologyChange = onAffinityTopologyChange;

        this._state = STATE.INITIAL;
        this._requests = new Map<string, Request>();
        this._requestId = Long.ZERO;
        this._handshakeRequestId = null;
        this._protocolVersion = null;
        this._wasConnected = false;
        this._socket = null;
        this._buffer = null;
        this._offset = 0;
        this._error = null;

        this._nodeUuid = null;
    }

    /**
     * Opens the socket and performs the handshake, offering the current protocol version.
     *
     * @returns a promise settled by the handshake request: resolved once the
     *          server accepts it, rejected when it fails or the connection is lost.
     */
    async connect() {
        return new Promise((resolve, reject) => {
            this._connectSocket(
                this._getHandshake(CURRENT_VERSION, resolve, reject));
        });
    }

    /** Closes the socket without invoking the disconnect callback. */
    disconnect() {
        this._disconnect(true, false);
    }

    /** Returns the next request id; every read advances the internal counter by one. */
    get requestId() {
        const id = this._requestId;
        this._requestId = this._requestId.add(1);
        return id;
    }

    /** The endpoint string exactly as passed to the constructor. */
    get endpoint() {
        return this._endpoint;
    }

    /** Node UUID captured during the handshake, or null when none was read. */
    get nodeUUID() {
        return this._nodeUuid;
    }

    /**
     * Sends one operation request and waits for its response.
     *
     * @param opCode operation code written in front of the request id.
     * @param payloadWriter awaited with the outgoing message buffer to append the payload.
     * @param payloadReader optional; awaited with the response buffer on success.
     * @returns a promise resolved (with no value) after a successful response has
     *          been read, rejected with the server error, a writer/reader error,
     *          or a lost-connection error.
     * @throws IllegalStateError when the socket is not in the CONNECTED state.
     */
    async sendRequest(opCode, payloadWriter, payloadReader = null) {
        if (this._state !== STATE.CONNECTED) {
            throw new IllegalStateError(this._state);
        }
        // Plain (non-async) executor: a synchronous failure here rejects the
        // returned promise instead of leaving it pending forever.
        return new Promise((resolve, reject) => {
            const request = new Request(this.requestId, opCode, payloadWriter, payloadReader, resolve, reject);
            this._addRequest(request);
            // _sendRequest reports failures through request.reject; the catch is a safety net.
            this._sendRequest(request).catch(reject);
        });
    }

    /**
     * Creates the underlying socket, wires its event handlers and sends the
     * handshake request as soon as the transport is connected.
     *
     * @param handshakeRequest request produced by `_getHandshake`.
     */
    _connectSocket(handshakeRequest) {
        const onConnected = async () => {
            this._state = STATE.HANDSHAKE;
            // send handshake
            await this._sendRequest(handshakeRequest);
        };

        // Host/port derived from the endpoint override whatever the user options contain.
        // NOTE(review): Node documents `family` as the IP-version connect option;
        // `version` is passed through as before - confirm whether it has any effect.
        const options: (NetConnectOpts | ConnectionOptions) = Object.assign({},
            this._config.options,
            { host : this._host, port : this._port, version : this._version });

        if (this._config.useTLS) {
            this._socket = tls.connect(<ConnectionOptions>options, onConnected);
        }
        else {
            this._socket = net.createConnection(<NetConnectOpts>options, onConnected);
        }

        this._socket.on('data', async (data: Buffer) => {
            try {
                await this._processResponse(data);
            }
            catch (err) {
                // A response that cannot be processed leaves the stream unusable.
                this._error = err.message;
                this._disconnect();
            }
        });
        this._socket.on('close', () => {
            // Already closed by the peer/runtime: no need to end the socket again.
            this._disconnect(false);
        });
        this._socket.on('error', (error) => {
            this._error = this._state === STATE.INITIAL ?
                'Connection failed: ' + error : error;
            this._disconnect();
        });
    }

    /** Registers a request so that its response can be matched by id. */
    _addRequest(request: Request) {
        this._requests.set(request.id.toString(), request);
    }

    /**
     * Serializes the request and writes it to the socket. Any failure removes
     * the request from the pending map and rejects it; this method itself does
     * not reject.
     */
    async _sendRequest(request: Request) {
        try {
            const message = await request.getMessage();
            this._logMessage(request.id.toString(), true, message);
            this._socket.write(message);
        }
        catch (err) {
            this._requests.delete(request.id.toString());
            request.reject(err);
        }
    }

    /**
     * Appends a received chunk to the accumulated data and processes every
     * complete response it now contains. Incomplete trailing data (including a
     * partially received length prefix) is kept until more bytes arrive.
     *
     * @param message chunk delivered by the socket's 'data' event.
     * @throws when a response refers to an unknown request id.
     */
    async _processResponse(message: Buffer) {
        if (this._state === STATE.DISCONNECTED) {
            return;
        }

        if (this._buffer) {
            this._buffer.concat(message);
        }
        else {
            this._buffer = MessageBuffer.from(message, 0);
        }

        const lengthPrefixSize = BinaryUtils.getSize(BinaryUtils.TYPE_CODE.INTEGER);

        while (this._buffer && this._offset < this._buffer.length) {
            const buffer = this._buffer;

            // The length prefix itself may be split across chunks: wait for the rest.
            if (buffer.length - this._offset < lengthPrefixSize) {
                break;
            }

            // Always start at the frame boundary, regardless of how much of the
            // previous response its reader consumed.
            buffer.position = this._offset;

            // Response length (the prefix does not count itself)
            const length = buffer.readInteger() + lengthPrefixSize;

            if (buffer.length < this._offset + length) {
                // Body not fully received yet.
                break;
            }
            this._offset += length;

            const isHandshake = this._state === STATE.HANDSHAKE;
            // A handshake response carries no request id; use the remembered one.
            const requestId: string = isHandshake ?
                this._handshakeRequestId.toString() :
                buffer.readLong().toString();

            this._logMessage(requestId, false, buffer.getSlice(this._offset - length, length));

            if (this._offset === buffer.length) {
                // Everything consumed: drop the accumulated data. The local
                // `buffer` reference stays valid for the readers below.
                this._buffer = null;
                this._offset = 0;
            }

            const request = this._requests.get(requestId);
            if (!request) {
                throw IgniteClientError.internalError('Invalid response id: ' + requestId);
            }
            this._requests.delete(requestId);

            if (isHandshake) {
                await this._finalizeHandshake(buffer, request);
            }
            else {
                await this._finalizeResponse(buffer, request);
            }
        }
    }

    /**
     * Handles a handshake response: marks the socket connected on success,
     * retries with the server's version when that version is supported, and
     * otherwise rejects the handshake request and disconnects.
     */
    async _finalizeHandshake(buffer: MessageBuffer, request: Request) {
        const isSuccess = buffer.readByte() === HANDSHAKE_SUCCESS_STATUS_CODE;

        if (isSuccess) {
            if (this._protocolVersion.compareTo(PROTOCOL_VERSION_1_4_0) >= 0) {
                this._nodeUuid = await this._communicator.readObject(buffer, BinaryUtils.TYPE_CODE.UUID);
            }

            this._state = STATE.CONNECTED;
            this._wasConnected = true;
            request.resolve();
            return;
        }

        // Server protocol version
        const serverVersion = new ProtocolVersion();
        serverVersion.read(buffer);
        // Error message
        const errMessage = BinaryCommunicator.readString(buffer);

        if (this._protocolVersion.equals(serverVersion)) {
            // Same version on both sides: the failure is not a version mismatch.
            request.reject(new OperationError(errMessage));
            this._disconnect();
            return;
        }

        // Credentials cannot be used with a server older than 1.1.0.
        const cannotDowngrade = !this._isSupportedVersion(serverVersion) ||
            (serverVersion.compareTo(PROTOCOL_VERSION_1_1_0) < 0 && this._config.userName);

        if (cannotDowngrade) {
            request.reject(new OperationError(
                Util.format('Protocol version mismatch: client %s / server %s. Server details: %s',
                    this._protocolVersion.toString(), serverVersion.toString(), errMessage)));
            this._disconnect();
        }
        else {
            // retry handshake with server version
            const handshakeRequest = this._getHandshake(serverVersion, request.resolve, request.reject);
            await this._sendRequest(handshakeRequest);
        }
    }

    /**
     * Handles a regular response: reads the status (an integer code before
     * protocol 1.4.0, a flags short from 1.4.0 on), forwards a topology change
     * if flagged, then either rejects the request with the server's error
     * message or runs its payload reader and resolves it.
     */
    async _finalizeResponse(buffer: MessageBuffer, request: Request) {
        let statusCode, isSuccess;

        if (this._protocolVersion.compareTo(PROTOCOL_VERSION_1_4_0) < 0) {
            // Check status code
            statusCode = buffer.readInteger();
            isSuccess = statusCode === REQUEST_SUCCESS_STATUS_CODE;
        }
        else {
            // Check flags
            const flags = buffer.readShort();
            isSuccess = !(flags & FLAG_ERROR);

            if (flags & FLAG_TOPOLOGY_CHANGED) {
                const newVersion = new AffinityTopologyVersion(buffer);
                await this._onAffinityTopologyChange(newVersion);
            }

            if (!isSuccess) {
                // The status code follows the flags only for error responses.
                statusCode = buffer.readInteger();
            }
        }

        if (!isSuccess) {
            // Error message
            const errMessage = BinaryCommunicator.readString(buffer);
            request.reject(new OperationError(errMessage));
            return;
        }

        try {
            if (request.payloadReader) {
                await request.payloadReader(buffer);
            }
            request.resolve();
        }
        catch (err) {
            request.reject(err);
        }
    }

    /**
     * Writes the handshake payload: handshake code, protocol version, client
     * code and, when a user name is configured, user name and password.
     */
    async _handshakePayloadWriter(payload) {
        // Handshake code
        payload.writeByte(1);
        // Protocol version
        this._protocolVersion.write(payload);
        // Client code
        payload.writeByte(2);
        if (this._config.userName) {
            BinaryCommunicator.writeString(payload, this._config.userName);
            BinaryCommunicator.writeString(payload, this._config.password);
        }
    }

    /**
     * Builds and registers a handshake request for the given version and
     * remembers both the version and the request id for response matching.
     */
    _getHandshake(version: ProtocolVersion, resolve: Function, reject: Function) {
        this._protocolVersion = version;
        const handshakeRequest = new Request(
            this.requestId, null, this._handshakePayloadWriter.bind(this), null, resolve, reject);
        this._addRequest(handshakeRequest);
        this._handshakeRequestId = handshakeRequest.id;
        return handshakeRequest;
    }

    /** Tells whether the version equals one of the supported protocol versions. */
    _isSupportedVersion(protocolVersion) {
        return SUPPORTED_VERSIONS.some((version) => version.equals(protocolVersion));
    }

    /**
     * Moves the socket to DISCONNECTED and rejects every pending request with
     * a lost-connection error.
     *
     * @param close when true, ends the socket (if one was created) and drops the disconnect callback.
     * @param callOnDisconnect when true, notifies the disconnect callback, provided
     *        the socket had completed a handshake before.
     */
    _disconnect(close = true, callOnDisconnect = true) {
        this._state = STATE.DISCONNECTED;
        this._requests.forEach((request, id) => {
            request.reject(new LostConnectionError(this._error));
            this._requests.delete(id);
        });
        if (this._wasConnected && callOnDisconnect && this._onSocketDisconnect) {
            this._onSocketDisconnect(this, this._error);
        }
        if (close) {
            // Cleared so the 'close' event triggered by end() does not notify again.
            this._onSocketDisconnect = null;
            // The socket does not exist yet when disconnecting before connect().
            if (this._socket) {
                this._socket.end();
            }
        }
    }

    /**
     * Splits an endpoint string into host, port and IP version.
     *
     * More than one colon is treated as IPv6 (port only recognised after `]:`,
     * surrounding brackets stripped from the host); otherwise the text after
     * the last colon is the port. A missing port falls back to the default.
     *
     * @throws when brackets are unbalanced, or the port is not a decimal
     *         number in the range 0-65535.
     */
    _parseEndpoint(endpoint) {
        endpoint = endpoint.trim();
        this._host = endpoint;
        this._port = null;

        const colonCnt = endpoint.split(':').length - 1;
        if (colonCnt > 1) {
            // IPv6 address
            this._version = 6;
            const index = endpoint.lastIndexOf(']:');
            if (index >= 0) {
                this._host = endpoint.substring(0, index + 1);
                this._port = endpoint.substring(index + 2);
            }
            const hasOpening = this._host.startsWith('[');
            const hasClosing = this._host.endsWith(']');
            if (hasOpening !== hasClosing) {
                throw IgniteClientError.illegalArgumentError('Incorrect endpoint format: ' + endpoint);
            }
            if (hasOpening) {
                this._host = this._host.substring(1, this._host.length - 1);
            }
        }
        else {
            // IPv4 address
            this._version = 4;
            const index = endpoint.lastIndexOf(':');
            if (index >= 0) {
                this._host = endpoint.substring(0, index);
                this._port = endpoint.substring(index + 1);
            }
        }

        if (!this._port) {
            this._port = PORT_DEFAULT;
            return;
        }

        // Digits only: rejects trailing garbage ("80abc") that a bare parseInt would accept.
        const portText = (<string>this._port).trim();
        const port = /^\d+$/.test(portText) ? parseInt(portText, 10) : NaN;
        if (isNaN(port) || port > 65535) {
            throw IgniteClientError.illegalArgumentError('Incorrect endpoint format: ' + endpoint);
        }
        this._port = port;
    }

    /** Logs the request/response id and the raw message bytes when debug logging is on. */
    _logMessage(requestId, isRequest, message) {
        if (Logger.debug) {
            Logger.logDebug((isRequest ? 'Request: ' : 'Response: ') + requestId);
            Logger.logDebug('[' + [...message] + ']');
        }
    }
}
| |
/**
 * A pending request: its id, optional op code, payload writer/reader and the
 * resolve/reject callbacks of the promise that represents its completion.
 */
class Request {
    private _id: Long;
    private _resolve: Function;
    private _reject: Function;
    private _payloadWriter: Function;
    private _opCode: number;
    private _payloadReader: Function;

    /**
     * @param id request id.
     * @param opCode operation code, or null to emit neither op code nor request id.
     * @param payloadWriter optional; awaited with the outgoing buffer to append the payload.
     * @param payloadReader optional; exposed to the caller via {@link payloadReader}.
     * @param resolve callback that completes the request successfully.
     * @param reject callback that fails the request.
     */
    constructor(id: Long, opCode, payloadWriter, payloadReader, resolve: Function, reject: Function) {
        this._id = id;
        this._opCode = opCode;
        this._payloadWriter = payloadWriter;
        this._payloadReader = payloadReader;
        this._resolve = resolve;
        this._reject = reject;
    }

    get id(): Long {
        return this._id;
    }

    get payloadReader() {
        return this._payloadReader;
    }

    get resolve() {
        return this._resolve;
    }

    get reject() {
        return this._reject;
    }

    /**
     * Serializes the request: a length prefix, then (only when an op code is
     * set) the op code and request id, then whatever the payload writer appends.
     *
     * @returns the `data` of the filled message buffer.
     */
    async getMessage() {
        const message = new MessageBuffer();
        // Leave room for the length prefix; it is filled in last.
        const messageStartPos = BinaryUtils.getSize(BinaryUtils.TYPE_CODE.INTEGER);
        message.position = messageStartPos;
        if (this._opCode !== null) {
            // Op code
            message.writeShort(this._opCode);
            // Request id
            message.writeLong(this._id);
        }
        if (this._payloadWriter) {
            // Payload
            await this._payloadWriter(message);
        }
        // Message length (excluding the prefix itself)
        message.position = 0;
        message.writeInteger(message.length - messageStartPos);
        return message.data;
    }
}