IGNITE-13794: Partition awareness for Node.js
This closes #2
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..58ca0e8
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,6 @@
+.idea
+node_modules
+build
+npm-debug.log
+/logs
+package-lock.json
\ No newline at end of file
diff --git a/README.md b/README.md
index 4792caa..e02e55a 100644
--- a/README.md
+++ b/README.md
@@ -30,3 +30,86 @@
```
For more information, see [Apache Ignite Node.JS Thin Client documentation](https://apacheignite.readme.io/docs/nodejs-thin-client).
+
+## Tests Installation ##
+
+Tests are installed along with the client.
+Follow the [Installation instructions](#installation).
+
+## Tests Running ##
+
+1. Run Ignite server locally or remotely with default configuration.
+2. Set the environment variable:
+ - **APACHE_IGNITE_CLIENT_ENDPOINTS** - comma separated list of Ignite node endpoints.
+ - **APACHE_IGNITE_CLIENT_DEBUG** - (optional) if *true*, tests will display additional output (default: *false*).
+3. Alternatively, instead of the environment variables setting, you can directly specify the values of the corresponding variables in [nodejs-thin-client/spec/config.js](./spec/config.js) file.
+4. Run the tests:
+
+### Run Functional Tests ###
+
+Call `npm test` command from `nodejs-thin-client` folder.
+
+### Run Examples Executors ###
+
+Call `npm run test:examples` command from `nodejs-thin-client` folder.
+
+### Run AuthTlsExample Executor ###
+
+Active Ignite server node with non-default configuration is required (authentication and TLS switched on).
+
+If the server runs locally:
+- setup the server to accept TLS. During the setup use `keystore.jks` and `truststore.jks` certificates from `nodejs-thin-client/examples/certs/` folder. Password for the files: `123456`
+- switch on the authentication on the server. Use the default username/password.
+
+If the server runs remotely, and/or other certificates are required, and/or non-default username/password is required - see this [instruction](#additional-setup-for-authtlsexample).
+
+Call `npm run test:auth_example` command from `nodejs-thin-client` folder.
+
+## Additional Setup for AuthTlsExample ##
+
+1. Obtain certificates required for TLS:
+ - either use pre-generated certificates provided in the [examples/certs](./examples/certs) folder. Password for the files: `123456`. Note, these certificates work for an Ignite server installed locally only.
+ - or obtain other existing certificates applicable for a concrete Ignite server.
+ - or generate new certificates applicable for a concrete Ignite server.
+
+ - The following files are needed:
+ - keystore.jks, truststore.jks - for the server side
+ - client.key, client.crt, ca.crt - for the client side
+
+2. Place client.key, client.crt and ca.crt files somewhere locally, eg. into the [examples/certs](./examples/certs) folder.
+
+3. If needed, modify `TLS_KEY_FILE_NAME`, `TLS_CERT_FILE_NAME` and `TLS_CA_FILE_NAME` constants in the example source file. The default values point to the files in the [examples/certs](./examples/certs) folder.
+
+4. Setup Ignite server to accept TLS - see appropriate [Ignite documentation](https://www.Ignite.com/docs/latest/developers-guide/thin-clients/getting-started-with-thin-clients#cluster-configuration). Provide the obtained keystore.jks and truststore.jks certificates during the setup.
+
+5. Switch on and setup authentication in Ignite server - see appropriate [Ignite documentation](https://www.Ignite.com/docs/latest/developers-guide/thin-clients/getting-started-with-thin-clients#cluster-configuration).
+
+6. If needed, modify `USER_NAME` and `PASSWORD` constants in the example source file. The default values are the default Ignite username/password.
+
+## Additional Setup for FailoverExample ##
+
+1. Start three Ignite server nodes.
+
+2. If needed, modify `ENDPOINT1`, `ENDPOINT2`, `ENDPOINT2` constants in an example source file - Ignite node endpoints.
+Default values are `localhost:10800`, `localhost:10801`, `localhost:10802` respectively.
+
+2. Run an example by calling `node FailoverExample.js`.
+
+3. Shut down the node the client is connected to (you can find it out from the client logs in the console).
+
+4. From the logs, you will see that the client automatically reconnects to another node which is available.
+
+5. Shut down all the nodes. You will see the client being stopped after failing to connect to each of the nodes.
+
+---------------------------------------------------------------------
+
+# API spec generation: instruction #
+
+It should be done if a public API class/method has been changed.
+1. Execute `npm install -g jsdoc` to install jsdoc (https://www.npmjs.com/package/jsdoc)
+2. Go to `nodejs-thin-client/api_spec`
+3. Execute `jsdoc -c conf.json --readme index.md` command.
+
+Note: `nodejs-thin-client/api_spec/conf.json` is a file with jsdoc configuration.
+
+For more information, see [Ignite Node.js Thin Client documentation](https://www.Ignite.com/docs/latest/developers-guide/thin-clients/nodejs-thin-client).
\ No newline at end of file
diff --git a/lib/BinaryObject.js b/lib/BinaryObject.js
index 478dbaf..b7da8d6 100644
--- a/lib/BinaryObject.js
+++ b/lib/BinaryObject.js
@@ -26,6 +26,7 @@
const BinaryField = require('./internal/BinaryType').BinaryField;
const BinaryTypeBuilder = require('./internal/BinaryType').BinaryTypeBuilder;
const ArgumentChecker = require('./internal/ArgumentChecker');
+const MessageBuffer = require('./internal/MessageBuffer');
const Logger = require('./internal/Logger');
const HEADER_LENGTH = 24;
@@ -80,6 +81,7 @@
this._hasSchema = false;
this._compactFooter = false;
this._hasRawData = false;
+ this._hashCode = null;
}
/**
@@ -295,6 +297,25 @@
/**
* @ignore
*/
+ async _getHashCode(communicator) {
+ if (this._hashCode !== null && !this._modified) {
+ return this._hashCode;
+ }
+
+ await this._write(communicator, new MessageBuffer());
+ return this._hashCode;
+ }
+
+ /**
+ * @ignore
+ */
+ _getTypeId() {
+ return this._typeBuilder.getTypeId();
+ }
+
+ /**
+ * @ignore
+ */
async _write(communicator, buffer) {
if (this._buffer && !this._modified) {
buffer.writeBuffer(this._buffer.buffer, this._startPos, this._startPos + this._length);
@@ -357,8 +378,9 @@
// type id
this._buffer.writeInteger(this._typeBuilder.getTypeId());
// hash code
- this._buffer.writeInteger(BinaryUtils.contentHashCode(
- this._buffer, this._startPos + HEADER_LENGTH, this._schemaOffset - 1));
+ this._hashCode = BinaryUtils.contentHashCode(
+ this._buffer, this._startPos + HEADER_LENGTH, this._schemaOffset - 1);
+ this._buffer.writeInteger(this._hashCode);
// length
this._buffer.writeInteger(this._length);
// schema id
@@ -429,7 +451,7 @@
// type id
const typeId = this._buffer.readInteger();
// hash code
- this._buffer.readInteger();
+ this._hashCode = this._buffer.readInteger();
// length
this._length = this._buffer.readInteger();
// schema id
diff --git a/lib/CacheClient.js b/lib/CacheClient.js
index fad8e71..597cda0 100644
--- a/lib/CacheClient.js
+++ b/lib/CacheClient.js
@@ -381,7 +381,8 @@
},
async (payload) => {
result = payload.readBoolean();
- });
+ },
+ this._createAffinityHint(key));
return result;
}
@@ -564,8 +565,30 @@
/**
* @ignore
*/
+ async _localPeek(socket, key, peekModes = []) {
+ ArgumentChecker.notNull(key, 'key');
+ let value = null;
+ await socket.sendRequest(
+ BinaryUtils.OPERATION.CACHE_LOCAL_PEEK,
+ async (payload) => {
+ this._writeCacheInfo(payload);
+ await this._communicator.writeObject(payload, key, this._getKeyType());
+ payload.writeInteger(peekModes.length);
+ for (let mode of peekModes) {
+ payload.writeByte(mode);
+ }
+ },
+ async (payload) => {
+ value = await this._communicator.readObject(payload, this._getValueType());
+ });
+ return value;
+ }
+
+ /**
+ * @ignore
+ */
static _calculateId(name) {
- return BinaryUtils.hashCode(name);
+ return BinaryUtils.strHashCode(name);
}
/**
@@ -620,7 +643,8 @@
this._writeCacheInfo(payload);
await this._writeKeyValue(payload, key, value);
},
- payloadReader);
+ payloadReader,
+ this._createAffinityHint(key));
}
/**
@@ -660,7 +684,8 @@
this._writeCacheInfo(payload);
await this._communicator.writeObject(payload, key, this._getKeyType());
},
- payloadReader);
+ payloadReader,
+ this._createAffinityHint(key));
}
/**
@@ -716,6 +741,17 @@
});
return result;
}
+
+ /**
+ * @ignore
+ */
+ _createAffinityHint(key) {
+ const affinityHint = {};
+ affinityHint.cacheId = this._cacheId;
+ affinityHint.key = key;
+ affinityHint.keyType = this._keyType;
+ return affinityHint;
+ }
}
/**
diff --git a/lib/Errors.js b/lib/Errors.js
index 89baf38..e7a1a9c 100644
--- a/lib/Errors.js
+++ b/lib/Errors.js
@@ -112,8 +112,8 @@
* @extends IgniteClientError
*/
class IllegalStateError extends IgniteClientError {
- constructor(message = null) {
- super(message || 'Ignite client is not in an appropriate state for the requested operation');
+ constructor(state, message = null) {
+ super(message || 'Ignite client is not in an appropriate state for the requested operation. Current state: ' + state);
}
}
diff --git a/lib/IgniteClient.js b/lib/IgniteClient.js
index 1974352..f897f95 100644
--- a/lib/IgniteClient.js
+++ b/lib/IgniteClient.js
@@ -66,9 +66,9 @@
* @return {IgniteClient} - new IgniteClient instance.
*/
constructor(onStateChanged = null) {
- const ClientFailoverSocket = require('./internal/ClientFailoverSocket');
- this._socket = new ClientFailoverSocket(onStateChanged);
- this._communicator = new BinaryCommunicator(this._socket);
+ const Router = require('./internal/Router');
+ this._router = new Router(onStateChanged);
+ this._communicator = new BinaryCommunicator(this._router);
}
static get STATE() {
@@ -98,7 +98,7 @@
async connect(config) {
ArgumentChecker.notEmpty(config, 'config');
ArgumentChecker.hasType(config, 'config', false, IgniteClientConfiguration);
- await this._socket.connect(config);
+ await this._router.connect(this._communicator, config);
}
/**
@@ -108,9 +108,7 @@
* Does nothing if the client already disconnected.
*/
disconnect() {
- if (this._socket) {
- this._socket.disconnect();
- }
+ this._router.disconnect();
}
/**
@@ -138,6 +136,7 @@
async (payload) => {
await this._writeCacheNameOrConfig(payload, name, cacheConfig);
});
+
return this._getCache(name, cacheConfig);
}
@@ -197,10 +196,13 @@
*/
async destroyCache(name) {
ArgumentChecker.notEmpty(name, 'name');
+
+ const cacheId = CacheClient._calculateId(name);
+
await this._communicator.send(
BinaryUtils.OPERATION.CACHE_DESTROY,
async (payload) => {
- payload.writeInteger(CacheClient._calculateId(name));
+ payload.writeInteger(cacheId);
});
}
diff --git a/lib/IgniteClientConfiguration.js b/lib/IgniteClientConfiguration.js
index 5dab92a..6c05b24 100644
--- a/lib/IgniteClientConfiguration.js
+++ b/lib/IgniteClientConfiguration.js
@@ -38,6 +38,7 @@
* with the provided mandatory settings and default optional settings.
*
* By default, the client does not use authentication and secure connection.
+ * The Partition Awareness feature is disabled by default.
*
* @param {...string} endpoints - Ignite node endpoint(s).
* The client randomly connects/reconnects to one of the specified node.
@@ -53,6 +54,7 @@
this._password = null;
this._useTLS = false;
this._options = null;
+ this._partitionAwareness = false
}
@@ -99,12 +101,14 @@
* @param {object} [connectionOptions=null] - connection options.
* - For non-secure connection options defined here {@link https://nodejs.org/api/net.html#net_net_createconnection_options_connectlistener}
* - For secure connection options defined here {@link https://nodejs.org/api/tls.html#tls_tls_connect_options_callback}
+ * @param {boolean} [partitionAwareness=false] - if true, the Partition Awareness feature will be enabled. Otherwise, disabled.
*
* @return {IgniteClientConfiguration} - the same instance of the IgniteClientConfiguration.
*/
- setConnectionOptions(useTLS, connectionOptions = null) {
+ setConnectionOptions(useTLS, connectionOptions = null, partitionAwareness = false) {
this._useTLS = useTLS;
this._options = connectionOptions;
+ this._partitionAwareness = partitionAwareness;
return this;
}
}
diff --git a/lib/internal/BinaryCommunicator.js b/lib/internal/BinaryCommunicator.js
index 20ddcff..6c5d839 100644
--- a/lib/internal/BinaryCommunicator.js
+++ b/lib/internal/BinaryCommunicator.js
@@ -28,8 +28,8 @@
class BinaryCommunicator {
- constructor(socket) {
- this._socket = socket;
+ constructor(router) {
+ this._router = router;
this._typeStorage = new BinaryTypeStorage(this);
}
@@ -52,8 +52,8 @@
}
}
- async send(opCode, payloadWriter, payloadReader = null) {
- await this._socket.send(opCode, payloadWriter, payloadReader);
+ async send(opCode, payloadWriter, payloadReader = null, affinityHint = null) {
+ await this._router.send(opCode, payloadWriter, payloadReader, affinityHint);
}
get typeStorage() {
@@ -146,7 +146,7 @@
await this._writeMap(buffer, object, objectType);
break;
case BinaryUtils.TYPE_CODE.BINARY_OBJECT:
- await this._writeBinaryObject(buffer, object, objectType);
+ await this._writeBinaryObject(buffer, object);
break;
case BinaryUtils.TYPE_CODE.COMPLEX_OBJECT:
await this._writeComplexObject(buffer, object, objectType);
diff --git a/lib/internal/BinaryType.js b/lib/internal/BinaryType.js
index 4a36426..bba5f93 100644
--- a/lib/internal/BinaryType.js
+++ b/lib/internal/BinaryType.js
@@ -115,7 +115,7 @@
}
static _calculateId(name) {
- return BinaryUtils.hashCodeLowerCase(name);
+ return BinaryUtils.strHashCodeLowerCase(name);
}
async _write(buffer) {
@@ -316,7 +316,7 @@
}
static _calculateId(name) {
- return BinaryUtils.hashCodeLowerCase(name);
+ return BinaryUtils.strHashCodeLowerCase(name);
}
async _write(buffer) {
diff --git a/lib/internal/BinaryUtils.js b/lib/internal/BinaryUtils.js
index fe1e403..0822bb4 100644
--- a/lib/internal/BinaryUtils.js
+++ b/lib/internal/BinaryUtils.js
@@ -18,6 +18,7 @@
'use strict';
const Decimal = require('decimal.js');
+const Long = require('long');
const ObjectType = require('../ObjectType').ObjectType;
const CompositeType = require('../ObjectType').CompositeType;
const MapObjectType = require('../ObjectType').MapObjectType;
@@ -53,6 +54,7 @@
CACHE_REMOVE_KEYS : 1018,
CACHE_REMOVE_ALL : 1019,
CACHE_GET_SIZE : 1020,
+ CACHE_LOCAL_PEEK : 1021,
// Cache Configuration
CACHE_GET_NAMES : 1050,
CACHE_CREATE_WITH_NAME : 1051,
@@ -61,6 +63,7 @@
CACHE_GET_OR_CREATE_WITH_CONFIGURATION : 1054,
CACHE_GET_CONFIGURATION : 1055,
CACHE_DESTROY : 1056,
+ CACHE_PARTITIONS : 1101,
// SQL and Scan Queries
QUERY_SCAN : 2000,
QUERY_SCAN_CURSOR_GET_PAGE : 2001,
@@ -573,7 +576,66 @@
return fields;
}
- static hashCode(str) {
+ static async hashCode(object, communicator, typeCode = null) {
+ if (typeCode === null) {
+ typeCode = BinaryUtils.getTypeCode(BinaryUtils.calcObjectType(object));
+ }
+
+ if (BinaryUtils.isStandardType(typeCode)) {
+ return BinaryUtils.standardHashCode(object, typeCode);
+ }
+ else {
+ return await object._getHashCode(communicator);
+ }
+ }
+
+ // Calculates hash code for an object of a standard type
+ static standardHashCode(object, typeCode = null) {
+ if (typeCode === null) {
+ typeCode = BinaryUtils.getTypeCode(BinaryUtils.calcObjectType(object));
+ }
+
+ switch (typeCode) {
+ case BinaryUtils.TYPE_CODE.BYTE:
+ case BinaryUtils.TYPE_CODE.SHORT:
+ case BinaryUtils.TYPE_CODE.INTEGER:
+ return this.intHashCode(object);
+ case BinaryUtils.TYPE_CODE.LONG:
+ return this.longHashCode(object);
+ case BinaryUtils.TYPE_CODE.FLOAT:
+ return this.floatHashCode(object);
+ case BinaryUtils.TYPE_CODE.DOUBLE:
+ return this.doubleHashCode(object);
+ case BinaryUtils.TYPE_CODE.CHAR:
+ return this.charHashCode(object);
+ case BinaryUtils.TYPE_CODE.BOOLEAN:
+ return this.boolHashCode(object);
+ case BinaryUtils.TYPE_CODE.STRING:
+ return this.strHashCode(object);
+ case BinaryUtils.TYPE_CODE.UUID:
+ return this.uuidHashCode(object);
+ case BinaryUtils.TYPE_CODE.TIME:
+ return this.timeHashCode(object);
+ case BinaryUtils.TYPE_CODE.DATE:
+ case BinaryUtils.TYPE_CODE.TIMESTAMP:
+ return this.datetimeHashCode(object);
+ default:
+ return 0;
+ }
+ }
+
+ static contentHashCode(buffer, startPos, endPos) {
+ let hash = 1;
+ for (let i = startPos; i <= endPos; i++) {
+ hash = 31 * hash + buffer._buffer[i];
+ hash |= 0; // Convert to 32bit integer
+ }
+ return hash;
+ }
+
+ static strHashCode(str) {
+ // This method calcuates hash code for the String Ignite type
+ // bool must be a js 'string'
let hash = 0, char;
if (str && str.length > 0) {
for (let i = 0; i < str.length; i++) {
@@ -585,17 +647,79 @@
return hash;
}
- static hashCodeLowerCase(str) {
- return BinaryUtils.hashCode(str ? str.toLowerCase() : str);
+ static strHashCodeLowerCase(str) {
+ return BinaryUtils.strHashCode(str ? str.toLowerCase() : str);
}
- static contentHashCode(buffer, startPos, endPos) {
- let hash = 1;
- for (let i = startPos; i <= endPos; i++) {
- hash = 31 * hash + buffer._buffer[i];
- hash |= 0; // Convert to 32bit integer
+ static charHashCode(char) {
+ // This method calcuates hash code for the Char Ignite type
+ // char must be a js 'string' of length 1
+ return char.charCodeAt(0);
+ }
+
+ static intHashCode(int) {
+ // This method calcuates hash code for Byte, Short or Integer Ignite types
+ // int must be a js 'number'
+ return int;
+ }
+
+ static longHashCode(long) {
+ // This method calcuates hash code for the Long Ignite type
+ // long must be a js 'number'
+ const longObj = Long.fromNumber(long);
+ return longObj.getLowBits() ^ longObj.getHighBits();
+ }
+
+ static boolHashCode(bool) {
+ // This method calcuates hash code for the Boolean Ignite type
+ // bool must be a js 'boolean'
+ return bool ? 1231 : 1237;
+ }
+
+ static floatHashCode(float) {
+ // This method calcuates hash code for the Float Ignite type
+ // float must be a js 'number'
+ const buf = new ArrayBuffer(4);
+ (new Float32Array(buf))[0] = float;
+ const int32arr = new Int32Array(buf);
+ return int32arr[0];
+ }
+
+ static doubleHashCode(double) {
+ // This method calcuates hash code for the Double Ignite type
+ // double must be a js 'number'
+ const buf = new ArrayBuffer(8);
+ (new Float64Array(buf))[0] = double;
+ const uint32arr = new Uint32Array(buf);
+ return uint32arr[0] ^ uint32arr[1];
+ }
+
+ static uuidHashCode(uuid) {
+ // This method calcuates hash code for the UUID Ignite type
+ // uuid must be a js Array of 'number' of length 16
+ const buf = Buffer.from(uuid);
+ let xor = 0;
+
+ for (let i = 0; i < 16; i += 4) {
+ xor ^= buf.readUInt32BE(i);
}
- return hash;
+
+ return xor;
+ }
+
+ static timeHashCode(time) {
+ // This method calcuates hash code for the Time Ignite type
+ // time must be an instance of Date
+ const midnight = new Date(time);
+ midnight.setHours(0, 0, 0, 0);
+ const totalmsec = time.getTime() - midnight.getTime();
+ return BinaryUtils.longHashCode(totalmsec);
+ }
+
+ static datetimeHashCode(date) {
+ // This method calcuates hash code for the Timestamp and Date Ignite types
+ // date must be an instance of Date or Timestamp
+ return BinaryUtils.longHashCode(date.getTime());
}
}
diff --git a/lib/internal/ClientFailoverSocket.js b/lib/internal/ClientFailoverSocket.js
deleted file mode 100644
index 770c5c6..0000000
--- a/lib/internal/ClientFailoverSocket.js
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-'use strict';
-
-const Util = require('util');
-const Errors = require('../Errors');
-const IgniteClient = require('../IgniteClient');
-const ClientSocket = require('./ClientSocket');
-const Logger = require('./Logger');
-
-/** Socket wrapper with failover functionality: reconnects on failure. */
-class ClientFailoverSocket {
-
- constructor(onStateChanged) {
- this._socket = null;
- this._state = IgniteClient.STATE.DISCONNECTED;
- this._onStateChanged = onStateChanged;
- }
-
- async connect(config) {
- if (this._state !== IgniteClient.STATE.DISCONNECTED) {
- throw new Errors.IllegalStateError();
- }
- this._config = config;
- this._endpointsNumber = this._config._endpoints.length;
- this._endpointIndex = this._getRandomInt(this._endpointsNumber - 1);
- await this._connect();
- }
-
- async send(opCode, payloadWriter, payloadReader = null) {
- if (this._state !== IgniteClient.STATE.CONNECTED) {
- throw new Errors.IllegalStateError();
- }
- await this._socket.sendRequest(opCode, payloadWriter, payloadReader);
- }
-
- disconnect() {
- if (this._state !== IgniteClient.STATE.DISCONNECTED) {
- this._changeState(IgniteClient.STATE.DISCONNECTED);
- if (this._socket) {
- this._socket.disconnect();
- this._socket = null;
- }
- }
- }
-
- async _onSocketDisconnect(error = null) {
- this._changeState(IgniteClient.STATE.CONNECTING, null, error);
- this._socket = null;
- this._endpointIndex++;
- try {
- await this._connect();
- }
- catch (err) {
- }
- }
-
- async _connect() {
- const errors = new Array();
- let index, endpoint;
- for (let i = 0; i < this._endpointsNumber; i++) {
- index = (this._endpointIndex + i) % this._endpointsNumber;
- endpoint = this._config._endpoints[index];
- try {
- this._changeState(IgniteClient.STATE.CONNECTING, endpoint);
- this._socket = new ClientSocket(
- endpoint, this._config, this._onSocketDisconnect.bind(this));
- await this._socket.connect();
- this._changeState(IgniteClient.STATE.CONNECTED, endpoint);
- return;
- }
- catch (err) {
- errors.push(Util.format('[%s] %s', endpoint, err.message));
- }
- }
- const error = errors.join('; ');
- this._changeState(IgniteClient.STATE.DISCONNECTED, endpoint, error);
- this._socket = null;
- throw new Errors.IgniteClientError(error);
- }
-
- _changeState(state, endpoint = null, reason = null) {
- if (Logger.debug) {
- Logger.logDebug(Util.format('Socket %s: %s -> %s'),
- endpoint ? endpoint : this._socket ? this._socket._endpoint : '',
- this._getState(this._state),
- this._getState(state));
- }
- if (this._state !== state) {
- this._state = state;
- if (this._onStateChanged) {
- this._onStateChanged(state, reason);
- }
- }
- }
-
- _getState(state) {
- switch (state) {
- case IgniteClient.STATE.DISCONNECTED:
- return 'DISCONNECTED';
- case IgniteClient.STATE.CONNECTING:
- return 'CONNECTING';
- case IgniteClient.STATE.CONNECTED:
- return 'CONNECTED';
- default:
- return 'UNKNOWN';
- }
- }
-
- // returns a random integer between 0 and max
- _getRandomInt(max) {
- if (max === 0) {
- return 0;
- }
- return Math.floor(Math.random() * (max + 1));
- }
-}
-
-module.exports = ClientFailoverSocket;
diff --git a/lib/internal/ClientSocket.js b/lib/internal/ClientSocket.js
index 73e11f0..8efa5d0 100644
--- a/lib/internal/ClientSocket.js
+++ b/lib/internal/ClientSocket.js
@@ -27,12 +27,15 @@
const MessageBuffer = require('./MessageBuffer');
const BinaryUtils = require('./BinaryUtils');
const BinaryCommunicator = require('./BinaryCommunicator');
+const PartitionAwarenessUtils = require('./PartitionAwarenessUtils');
const ArgumentChecker = require('./ArgumentChecker');
const Logger = require('./Logger');
const HANDSHAKE_SUCCESS_STATUS_CODE = 1;
const REQUEST_SUCCESS_STATUS_CODE = 0;
const PORT_DEFAULT = 10800;
+const FLAG_ERROR = 1;
+const FLAG_TOPOLOGY_CHANGED = 2;
class ProtocolVersion {
@@ -78,14 +81,18 @@
const PROTOCOL_VERSION_1_0_0 = new ProtocolVersion(1, 0, 0);
const PROTOCOL_VERSION_1_1_0 = new ProtocolVersion(1, 1, 0);
const PROTOCOL_VERSION_1_2_0 = new ProtocolVersion(1, 2, 0);
+const PROTOCOL_VERSION_1_3_0 = new ProtocolVersion(1, 3, 0);
+const PROTOCOL_VERSION_1_4_0 = new ProtocolVersion(1, 4, 0);
const SUPPORTED_VERSIONS = [
// PROTOCOL_VERSION_1_0_0, // Support for QueryField precision/scale fields breaks 1.0.0 compatibility
PROTOCOL_VERSION_1_1_0,
- PROTOCOL_VERSION_1_2_0
+ PROTOCOL_VERSION_1_2_0,
+ PROTOCOL_VERSION_1_3_0,
+ PROTOCOL_VERSION_1_4_0
];
-const CURRENT_VERSION = PROTOCOL_VERSION_1_2_0;
+const CURRENT_VERSION = PROTOCOL_VERSION_1_4_0;
const STATE = Object.freeze({
INITIAL : 0,
@@ -96,22 +103,27 @@
class ClientSocket {
- constructor(endpoint, config, onSocketDisconnect) {
+ constructor(endpoint, config, communicator, onSocketDisconnect, onAffinityTopologyChange) {
ArgumentChecker.notEmpty(endpoint, 'endpoints');
this._endpoint = endpoint;
this._parseEndpoint(endpoint);
this._config = config;
+ this._communicator = communicator;
+ this._onSocketDisconnect = onSocketDisconnect;
+ this._onAffinityTopologyChange = onAffinityTopologyChange;
+
this._state = STATE.INITIAL;
- this._socket = null;
+ this._requests = new Map();
this._requestId = Long.ZERO;
this._handshakeRequestId = null;
this._protocolVersion = null;
- this._requests = new Map();
- this._onSocketDisconnect = onSocketDisconnect;
- this._error = null;
this._wasConnected = false;
+ this._socket = null;
this._buffer = null;
this._offset = 0;
+ this._error = null;
+
+ this._nodeUuid = null;
}
async connect() {
@@ -131,6 +143,14 @@
return id;
}
+ get endpoint() {
+ return this._endpoint;
+ }
+
+ get nodeUUID() {
+ return this._nodeUuid;
+ }
+
async sendRequest(opCode, payloadWriter, payloadReader = null) {
if (this._state === STATE.CONNECTED) {
return new Promise(async (resolve, reject) => {
@@ -140,7 +160,7 @@
});
}
else {
- throw new Errors.IllegalStateError();
+ throw new Errors.IllegalStateError(this._state);
}
}
@@ -154,6 +174,7 @@
const options = Object.assign({},
this._config._options,
{ host : this._host, port : this._port, version : this._version });
+
if (this._config._useTLS) {
this._socket = tls.connect(options, onConnected);
}
@@ -200,6 +221,7 @@
if (this._state === STATE.DISCONNECTED) {
return;
}
+
if (this._buffer) {
this._buffer.concat(message);
this._buffer.position = this._offset;
@@ -207,45 +229,45 @@
else {
this._buffer = MessageBuffer.from(message, 0);
}
+
while (this._buffer && this._offset < this._buffer.length) {
+ const buffer = this._buffer;
// Response length
- const length = this._buffer.readInteger() + BinaryUtils.getSize(BinaryUtils.TYPE_CODE.INTEGER);
- if (this._buffer.length < this._offset + length) {
+ const length = buffer.readInteger() + BinaryUtils.getSize(BinaryUtils.TYPE_CODE.INTEGER);
+
+ if (buffer.length < this._offset + length) {
break;
}
this._offset += length;
- let requestId, isSuccess;
+ let requestId;
const isHandshake = this._state === STATE.HANDSHAKE;
if (isHandshake) {
// Handshake status
- isSuccess = (this._buffer.readByte() === HANDSHAKE_SUCCESS_STATUS_CODE);
requestId = this._handshakeRequestId.toString();
}
else {
// Request id
- requestId = this._buffer.readLong().toString();
- // Status code
- isSuccess = (this._buffer.readInteger() === REQUEST_SUCCESS_STATUS_CODE);
+ requestId = buffer.readLong().toString();
}
- this._logMessage(requestId, false, this._buffer.data);
+ this._logMessage(requestId, false, buffer.getSlice(this._offset - length, length));
- const buffer = this._buffer;
- if (this._offset === this._buffer.length) {
+ if (this._offset === buffer.length) {
this._buffer = null;
this._offset = 0;
}
+
if (this._requests.has(requestId)) {
const request = this._requests.get(requestId);
this._requests.delete(requestId);
if (isHandshake) {
- await this._finalizeHandshake(buffer, request, isSuccess);
+ await this._finalizeHandshake(buffer, request);
}
else {
- await this._finalizeResponse(buffer, request, isSuccess);
+ await this._finalizeResponse(buffer, request);
}
}
else {
@@ -254,7 +276,9 @@
}
}
- async _finalizeHandshake(buffer, request, isSuccess) {
+ async _finalizeHandshake(buffer, request) {
+ const isSuccess = buffer.readByte() === HANDSHAKE_SUCCESS_STATUS_CODE;
+
if (!isSuccess) {
// Server protocol version
const serverVersion = new ProtocolVersion();
@@ -282,13 +306,39 @@
}
}
else {
+ if (this._protocolVersion.compareTo(PROTOCOL_VERSION_1_4_0) >= 0) {
+ this._nodeUuid = await this._communicator.readObject(buffer, BinaryUtils.TYPE_CODE.UUID);
+ }
+
this._state = STATE.CONNECTED;
this._wasConnected = true;
request.resolve();
}
}
- async _finalizeResponse(buffer, request, isSuccess) {
+ async _finalizeResponse(buffer, request) {
+ let statusCode, isSuccess;
+
+ if (this._protocolVersion.compareTo(PROTOCOL_VERSION_1_4_0) < 0) {
+ // Check status code
+ statusCode = buffer.readInteger();
+ isSuccess = statusCode === REQUEST_SUCCESS_STATUS_CODE;
+ }
+ else {
+ // Check flags
+ let flags = buffer.readShort();
+ isSuccess = !(flags & FLAG_ERROR);
+
+ if (flags & FLAG_TOPOLOGY_CHANGED) {
+ const newVersion = new PartitionAwarenessUtils.AffinityTopologyVersion(buffer);
+ await this._onAffinityTopologyChange(newVersion);
+ }
+
+ if (!isSuccess) {
+ statusCode = buffer.readInteger();
+ }
+ }
+
if (!isSuccess) {
// Error message
const errMessage = BinaryCommunicator.readString(buffer);
@@ -345,7 +395,7 @@
this._requests.delete(id);
});
if (this._wasConnected && callOnDisconnect && this._onSocketDisconnect) {
- this._onSocketDisconnect(this._error);
+ this._onSocketDisconnect(this, this._error);
}
if (close) {
this._onSocketDisconnect = null;
diff --git a/lib/internal/MessageBuffer.js b/lib/internal/MessageBuffer.js
index e282967..ff13c4a 100644
--- a/lib/internal/MessageBuffer.js
+++ b/lib/internal/MessageBuffer.js
@@ -184,7 +184,7 @@
}
readLong() {
- const size = BinaryUtils.getSize(BinaryUtils.TYPE_CODE.LONG)
+ const size = BinaryUtils.getSize(BinaryUtils.TYPE_CODE.LONG);
this._ensureSize(size);
const value = Long.fromBytesLE([...this._buffer.slice(this._position, this._position + size)]);
this._position += size;
diff --git a/lib/internal/PartitionAwarenessUtils.js b/lib/internal/PartitionAwarenessUtils.js
new file mode 100644
index 0000000..29b6afe
--- /dev/null
+++ b/lib/internal/PartitionAwarenessUtils.js
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+'use strict';
+
+const Util = require('util');
+const BinaryUtils = require('./BinaryUtils');
+
+class AffinityTopologyVersion {
+
+ constructor(payload) {
+ this._major = payload.readLong();
+ this._minor = payload.readInteger();
+ }
+
+ compareTo(other) {
+ let diff = this._major - other._major;
+ if (diff !== 0) {
+ return diff;
+ }
+ return this._minor - other._minor;
+ }
+
+ equals(other) {
+ return this.compareTo(other) === 0;
+ }
+
+ toString() {
+ return Util.format('%d.%d', this._major, this._minor);
+ }
+}
+
+class PartitionAwarenessCacheGroup {
+
+ constructor(caches, partitionMap) {
+ this._caches = caches;
+ this._partitionMap = partitionMap;
+ }
+
+ static async build(communicator, payload) {
+ const applicable = payload.readBoolean();
+
+ const cachesNum = payload.readInteger();
+ const caches = new Array(cachesNum);
+
+ for (let i = 0; i < cachesNum; i++) {
+ const cacheId = payload.readInteger();
+
+ if (!applicable) {
+ caches[i] = [cacheId, new Map()];
+ continue;
+ }
+
+ caches[i] = [cacheId, this._readCacheKeyConfig(payload)];
+ }
+
+ if (!applicable) {
+ return new PartitionAwarenessCacheGroup(caches, new Map());
+ }
+
+ const partitionMap = await this._readPartitionMap(communicator, payload);
+
+ return new PartitionAwarenessCacheGroup(caches, partitionMap);
+ }
+
+ get caches() {
+ // Array [[cacheId, cfg]]
+ return this._caches;
+ }
+
+ get partitionMap() {
+ // Array [[nodeId, [partitions]]]
+ return this._partitionMap;
+ }
+
+ static _readCacheKeyConfig(payload) {
+ const configsNum = payload.readInteger();
+ // {Key Type ID -> Affinity Key Field ID}
+ let configs = new Map();
+
+ if (configsNum > 0) {
+ for (let i = 0; i < configsNum; i++) {
+ const keyTypeId = payload.readInteger();
+ const affinityKeyFieldId = payload.readInteger();
+
+ configs.set(keyTypeId, affinityKeyFieldId);
+ }
+ }
+
+ return configs;
+ }
+
+ static async _readPartitionMap(communicator, payload) {
+ const partitionMapSize = payload.readInteger();
+ // [[nodeId, [partitions]]]
+ const partitionMap = new Array(partitionMapSize);
+
+ for (let i = 0; i < partitionMapSize; i++) {
+ const nodeId = await communicator.readObject(payload, BinaryUtils.TYPE_CODE.UUID);
+ const partitionsNum = payload.readInteger();
+ const partitions = new Array(partitionsNum);
+
+ for (let j = 0; j < partitionsNum; j++) {
+ partitions[j] = payload.readInteger();
+ }
+
+ partitionMap[i] = [nodeId, partitions];
+ }
+
+ return partitionMap;
+ }
+}
+
+class CacheAffinityMap {
+ constructor(cacheId, partitionMapping, keyConfig) {
+ this._cacheId = cacheId;
+ this._partitionMapping = partitionMapping;
+ this._keyConfig = keyConfig;
+ }
+
+ get cacheId() {
+ return this._cacheId;
+ }
+
+ get partitionMapping() {
+ // Map {partition -> nodeId}
+ return this._partitionMapping;
+ }
+
+ get keyConfig() {
+ // Map {Key Type ID -> Affinity Key Field ID}
+ return this._keyConfig;
+ }
+}
+
+class RendezvousAffinityFunction {
+ static calcPartition(keyHash, partitionsNum) {
+ const mask = (partitionsNum & (partitionsNum - 1)) == 0 ? partitionsNum - 1 : -1;
+
+ if (mask >= 0) {
+ return (keyHash ^ (keyHash >> 16)) & mask;
+ }
+
+ return Math.abs(keyHash % partitionsNum);
+ }
+}
+
+module.exports.AffinityTopologyVersion = AffinityTopologyVersion;
+module.exports.PartitionAwarenessCacheGroup = PartitionAwarenessCacheGroup;
+module.exports.CacheAffinityMap = CacheAffinityMap;
+module.exports.RendezvousAffinityFunction = RendezvousAffinityFunction;
diff --git a/lib/internal/Router.js b/lib/internal/Router.js
new file mode 100644
index 0000000..282a366
--- /dev/null
+++ b/lib/internal/Router.js
@@ -0,0 +1,535 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+'use strict';
+
+const Util = require('util');
+const Errors = require('../Errors');
+const IgniteClient = require('../IgniteClient');
+const ClientSocket = require('./ClientSocket');
+const PartitionAwarenessUtils = require('./PartitionAwarenessUtils');
+const BinaryUtils = require('./BinaryUtils');
+const BinaryObject = require('../BinaryObject');
+const ArgumentChecker = require('./ArgumentChecker');
+const Logger = require('./Logger');
+
+// Number of tries to get cache partitions info
+const GET_CACHE_PARTITIONS_RETRIES = 3;
+// Delay (in milliseconds) between tries to get cache partitions info
+const GET_CACHE_PARTITIONS_DELAY = 100;
+
+class Router {
+
+ constructor(onStateChanged) {
+ this._state = IgniteClient.STATE.DISCONNECTED;
+ this._onStateChanged = onStateChanged;
+
+ this._partitionAwarenessAllowed = false;
+ // ClientSocket instance with no node UUID
+ this._legacyConnection = null;
+ // Array of endpoints which we are not connected to. Mostly used when Partition Awareness is on
+ this._inactiveEndpoints = [];
+
+ /** Partition Awareness only fields */
+ // This flag indicates if we have at least two alive connections
+ this._partitionAwarenessActive = false;
+ // Contains the background task (promise) or null
+ this._backgroundConnectTask = null;
+ // {Node UUID -> ClientSocket instance}
+ this._connections = {};
+ // {cacheId -> CacheAffinityMap}
+ this._distributionMap = new Map();
+ this._affinityTopologyVer = null;
+ }
+
+ async connect(communicator, config) {
+ if (this._state !== IgniteClient.STATE.DISCONNECTED) {
+ throw new Errors.IllegalStateError(this._state);
+ }
+
+ // Wait for background task to stop before we move forward
+ await this._waitBackgroundConnect();
+
+ this._communicator = communicator;
+ this._config = config;
+ this._partitionAwarenessAllowed = config._partitionAwareness;
+ this._inactiveEndpoints = [...config._endpoints];
+
+ await this._connect();
+ }
+
+ disconnect() {
+ if (this._state !== IgniteClient.STATE.DISCONNECTED) {
+ this._changeState(IgniteClient.STATE.DISCONNECTED);
+
+ for (const socket of this._getAllConnections()) {
+ socket.disconnect();
+ }
+
+ this._cleanUp();
+ }
+ }
+
+ async send(opCode, payloadWriter, payloadReader = null, affinityHint = null) {
+ if (this._state !== IgniteClient.STATE.CONNECTED) {
+ throw new Errors.IllegalStateError(this._state);
+ }
+
+ if (this._partitionAwarenessActive && affinityHint) {
+ await this._affinitySend(opCode, payloadWriter, payloadReader, affinityHint);
+ }
+ else {
+ // If _partitionAwarenessActive flag is not set, we have exactly one connection
+ // but it can be either a legacy one or a modern one (with node UUID)
+ // If affinityHint has not been passed, we want to always use one socket (as long as it is alive)
+ // because some requests (e.g., SQL cursor-related) require to be sent to the same cluster node
+ await this._getAllConnections()[0].sendRequest(opCode, payloadWriter, payloadReader);
+ }
+ }
+
+ async _connect() {
+ const errors = new Array();
+ const endpoints = this._inactiveEndpoints;
+ const config = this._config;
+ const communicator = this._communicator;
+ const onSocketDisconnect = this._onSocketDisconnect.bind(this);
+ const onAffinityTopologyChange = this._onAffinityTopologyChange.bind(this);
+ const endpointsNum = endpoints.length;
+ const random = this._getRandomInt(endpointsNum);
+
+ this._changeState(IgniteClient.STATE.CONNECTING);
+
+ for (let i = 0; i < endpoints.length; i++) {
+ const index = (i + random) % endpointsNum;
+ const endpoint = endpoints[index];
+
+ try {
+ const socket = new ClientSocket(
+ endpoint, config, communicator,
+ onSocketDisconnect,
+ onAffinityTopologyChange);
+
+ await socket.connect();
+ Logger.logDebug(Util.format('Connected to %s', endpoint));
+ this._changeState(IgniteClient.STATE.CONNECTED);
+ this._addConnection(socket);
+
+ this._runBackgroundConnect();
+
+ return;
+ }
+ catch (err) {
+ Logger.logDebug(Util.format('Could not connect to %s. Error: "%s"', endpoint, err.message));
+ errors.push(Util.format('[%s] %s', endpoint, err.message));
+ }
+ }
+
+ const error = errors.join('; ');
+ this._changeState(IgniteClient.STATE.DISCONNECTED, error);
+ throw new Errors.IgniteClientError(error);
+ }
+
+ // Can be called when there are no alive connections left
+ async _reconnect() {
+ await this._waitBackgroundConnect();
+ await this._connect();
+ }
+
+ _runBackgroundConnect() {
+ if (this._partitionAwarenessAllowed && !this._backgroundConnectTask) {
+ // Only one task can be active
+ this._backgroundConnectTask = this._backgroundConnect();
+ this._backgroundConnectTask.then(() => this._backgroundConnectTask = null);
+ }
+ }
+
+ async _waitBackgroundConnect() {
+ if (this._backgroundConnectTask) {
+ await this._backgroundConnectTask;
+ }
+ }
+
+ async _backgroundConnect() {
+ // Local copy of _inactiveEndpoints to make sure the array is not being changed during the 'for' cycle
+ const endpoints = [...this._inactiveEndpoints];
+ const config = this._config;
+ const communicator = this._communicator;
+ const onSocketDisconnect = this._onSocketDisconnect.bind(this);
+ const onAffinityTopologyChange = this._onAffinityTopologyChange.bind(this);
+
+ for (const endpoint of endpoints) {
+ const socket = new ClientSocket(
+ endpoint, config, communicator,
+ onSocketDisconnect,
+ onAffinityTopologyChange);
+
+ try {
+ await socket.connect();
+ Logger.logDebug(Util.format('Connected (in background) to %s', endpoint));
+
+ // While we were waiting for socket to connect, someone could call disconnect()
+ if (this._state !== IgniteClient.STATE.CONNECTED) {
+ // If became not connected, stop this task
+ socket.disconnect();
+ return;
+ }
+
+ this._addConnection(socket);
+ }
+ catch (err) {
+ Logger.logDebug(Util.format('Could not connect (in background) to %s. Error: "%s"', endpoint, err.message));
+
+ // While we were waiting for socket to connect, someone could call disconnect()
+ if (this._state !== IgniteClient.STATE.CONNECTED) {
+ // If became not connected, stop this task
+ socket.disconnect();
+ return;
+ }
+ }
+ }
+ }
+
+ _cleanUp() {
+ this._legacyConnection = null;
+ this._inactiveEndpoints = [];
+
+ this._partitionAwarenessActive = false;
+ this._connections = {};
+ this._distributionMap = new Map();
+ this._affinityTopologyVer = null;
+ }
+
+ _getAllConnections() {
+ const allConnections = Object.values(this._connections);
+
+ if (this._legacyConnection) {
+ allConnections.push(this._legacyConnection);
+ }
+
+ return allConnections;
+ }
+
+ _addConnection(socket) {
+ const nodeUUID = socket.nodeUUID;
+
+ if (this._partitionAwarenessAllowed && nodeUUID) {
+ if (nodeUUID in this._connections) {
+ // This can happen if the same node has several IPs
+ // We will keep more fresh connection alive
+ this._connections[nodeUUID].disconnect();
+ }
+ this._connections[nodeUUID] = socket;
+ }
+ else {
+ if (this._legacyConnection) {
+ // We already have a legacy connection
+ // We will keep more fresh connection alive
+ this._legacyConnection.disconnect();
+ }
+ this._legacyConnection = socket;
+ }
+ // Remove the endpoint from _inactiveEndpoints
+ const index = this._inactiveEndpoints.indexOf(socket.endpoint);
+ if (index > -1) {
+ this._inactiveEndpoints.splice(index, 1);
+ }
+
+ if (!this._partitionAwarenessActive &&
+ this._getAllConnections().length >= 2) {
+ this._partitionAwarenessActive = true;
+ }
+ }
+
+ _removeConnection(socket) {
+ if (socket.nodeUUID in this._connections) {
+ delete this._connections[socket.nodeUUID];
+ // Add the endpoint to _inactiveEndpoints
+ this._inactiveEndpoints.push(socket.endpoint);
+ }
+ else if (this._legacyConnection == socket) {
+ this._legacyConnection = null;
+ // Add the endpoint to _inactiveEndpoints
+ this._inactiveEndpoints.push(socket.endpoint);
+ }
+
+ if (this._partitionAwarenessActive &&
+ this._getAllConnections().length < 2) {
+ this._partitionAwarenessActive = false;
+ }
+ }
+
+ async _onSocketDisconnect(socket, error = null) {
+ this._removeConnection(socket);
+
+ if (this._getAllConnections().length != 0) {
+ // We had more than one connection before this disconnection
+ this._runBackgroundConnect();
+ return;
+ }
+
+ try {
+ await this._reconnect();
+ }
+ catch (err) {
+ this._cleanUp();
+ }
+ }
+
+ /** Partition Awareness methods */
+
+ async _affinitySend(opCode, payloadWriter, payloadReader, affinityHint) {
+ let connection = await this._chooseConnection(affinityHint);
+
+ while (true) {
+ Logger.logDebug('Endpoint chosen: ' + connection.endpoint);
+
+ try {
+ await connection.sendRequest(opCode, payloadWriter, payloadReader);
+ return;
+ }
+ catch (err) {
+ if (!(err instanceof Errors.LostConnectionError)) {
+ throw err;
+ }
+
+ Logger.logDebug(connection.endpoint + ' is unavailable');
+
+ this._removeConnection(connection);
+
+ if (this._getAllConnections().length == 0) {
+ throw new Errors.LostConnectionError('Cluster is unavailable');
+ }
+ }
+
+ connection = this._getRandomConnection();
+ Logger.logDebug('Node has been chosen randomly');
+ }
+ }
+
+ async _chooseConnection(affinityHint) {
+ const cacheId = affinityHint.cacheId;
+
+ if (!this._distributionMap.has(cacheId)) {
+ Logger.logDebug('Distribution map does not have info for the cache ' + cacheId);
+ Logger.logDebug('Node has been chosen randomly');
+ // We are not awaiting here in order to not increase latency of requests
+ this._getCachePartitions(cacheId);
+ return this._getRandomConnection();
+ }
+
+ const cacheAffinityMap = this._distributionMap.get(cacheId);
+
+ const nodeId = await this._determineNodeId(cacheAffinityMap,
+ affinityHint.key,
+ affinityHint.keyType);
+
+ if (nodeId in this._connections) {
+ Logger.logDebug('Node has been chosen by affinity');
+ return this._connections[nodeId];
+ }
+
+ Logger.logDebug('Node has been chosen randomly');
+ return this._getRandomConnection();
+ }
+
+ async _determineNodeId(cacheAffinityMap, key, keyType) {
+ const partitionMap = cacheAffinityMap.partitionMapping;
+
+ if (partitionMap.size == 0) {
+ return null;
+ }
+
+ const keyAffinityMap = cacheAffinityMap.keyConfig;
+
+ const affinityKeyInfo = await this._affinityKeyInfo(key, keyType);
+
+ let affinityKey = affinityKeyInfo.key;
+ let affinityKeyTypeCode = affinityKeyInfo.typeCode;
+
+ if ('typeId' in affinityKeyInfo && keyAffinityMap.has(affinityKeyInfo.typeId)) {
+ const affinityKeyTypeId = keyAffinityMap.get(affinityKeyInfo.typeId);
+
+ if (affinityKey instanceof BinaryObject &&
+ affinityKey._fields.has(affinityKeyTypeId)) {
+ const field = affinityKey._fields.get(affinityKeyTypeId);
+ affinityKey = await field.getValue();
+ affinityKeyTypeCode = field.typeCode;
+ }
+ }
+
+ const keyHash = await BinaryUtils.hashCode(affinityKey, this._communicator, affinityKeyTypeCode);
+ const partition = PartitionAwarenessUtils.RendezvousAffinityFunction.calcPartition(keyHash, partitionMap.size);
+ Logger.logDebug('Partition = ' + partition);
+
+ const nodeId = partitionMap.get(partition);
+ Logger.logDebug('Node ID = ' + nodeId);
+
+ return nodeId;
+ }
+
+ async _affinityKeyInfo(key, keyType) {
+ let typeCode = BinaryUtils.getTypeCode(keyType ? keyType : BinaryUtils.calcObjectType(key));
+
+ if (typeCode == BinaryUtils.TYPE_CODE.BINARY_OBJECT) {
+ return {'key': key, 'typeCode': typeCode, 'typeId': key._getTypeId()};
+ }
+
+ if (typeCode == BinaryUtils.TYPE_CODE.COMPLEX_OBJECT) {
+ const binObj = await BinaryObject.fromObject(key, keyType);
+ typeCode = BinaryUtils.TYPE_CODE.BINARY_OBJECT;
+
+ return {'key': binObj, 'typeCode': typeCode, 'typeId': binObj._getTypeId()};
+ }
+
+ return {'key': key, 'typeCode': typeCode};
+ }
+
+ async _onAffinityTopologyChange(newVersion) {
+ if (!this._versionIsNewer(newVersion)) {
+ return;
+ }
+
+ Logger.logDebug('New topology version reported: ' + newVersion);
+
+ this._affinityTopologyVer = newVersion;
+ this._distributionMap = new Map();
+
+ this._runBackgroundConnect();
+ }
+
+ async _getCachePartitions(cacheId, tries = GET_CACHE_PARTITIONS_RETRIES) {
+ if (tries <= 0) {
+ return;
+ }
+
+ Logger.logDebug('Getting cache partitions info...');
+
+ try {
+ await this.send(
+ BinaryUtils.OPERATION.CACHE_PARTITIONS,
+ async (payload) => {
+ // We always request partition map for one cache
+ payload.writeInteger(1);
+ payload.writeInteger(cacheId);
+ },
+ this._handleCachePartitions.bind(this));
+ }
+ catch (err) {
+ if (err instanceof Errors.LostConnectionError) {
+ return;
+ }
+
+ // Retries in case of an error (most probably
+ // "Getting affinity for topology version earlier than affinity is calculated")
+ await this._sleep(GET_CACHE_PARTITIONS_DELAY);
+ this._getCachePartitions(cacheId, tries - 1);
+ }
+ }
+
+ async _handleCachePartitions(payload) {
+ const affinityTopologyVer = new PartitionAwarenessUtils.AffinityTopologyVersion(payload);
+ Logger.logDebug('Partitions info for topology version ' + affinityTopologyVer);
+
+ if (this._versionIsNewer(affinityTopologyVer)) {
+ this._distributionMap = new Map();
+ this._affinityTopologyVer = affinityTopologyVer;
+ Logger.logDebug('New affinity topology version: ' + affinityTopologyVer);
+ } else if (this._versionIsOlder(affinityTopologyVer)) {
+ Logger.logDebug('Topology version is outdated. Actual version: ' + this._affinityTopologyVer);
+ return;
+ }
+
+ const groupsNum = payload.readInteger();
+ Logger.logDebug('Partitions info for ' + groupsNum + ' cache groups received');
+
+ for (let i = 0; i < groupsNum; i++) {
+ const group = await PartitionAwarenessUtils.PartitionAwarenessCacheGroup.build(this._communicator, payload);
+ // {partition -> nodeId}
+ const partitionMapping = new Map();
+
+ for (const [nodeId, partitions] of group.partitionMap) {
+ for (const partition of partitions) {
+ partitionMapping.set(partition, nodeId);
+ }
+ }
+
+ for (const [cacheId, config] of group.caches) {
+ const cacheAffinityMap = new PartitionAwarenessUtils.CacheAffinityMap(cacheId, partitionMapping, config);
+ this._distributionMap.set(cacheId, cacheAffinityMap);
+ Logger.logDebug('Partitions info for cache: ' + cacheId);
+ }
+ }
+
+ Logger.logDebug('Got cache partitions info');
+ }
+
+ _getRandomConnection() {
+ const allConnections = this._getAllConnections();
+ return allConnections[this._getRandomInt(allConnections.length)];
+ }
+
+ _changeState(state, reason = null) {
+ if (Logger.debug) {
+ Logger.logDebug(Util.format('Router state: %s -> %s'),
+ this._getState(this._state),
+ this._getState(state));
+ }
+ if (this._state !== state) {
+ this._state = state;
+ if (this._onStateChanged) {
+ this._onStateChanged(state, reason);
+ }
+ }
+ }
+
+ _getState(state) {
+ switch (state) {
+ case IgniteClient.STATE.DISCONNECTED:
+ return 'DISCONNECTED';
+ case IgniteClient.STATE.CONNECTING:
+ return 'CONNECTING';
+ case IgniteClient.STATE.CONNECTED:
+ return 'CONNECTED';
+ default:
+ return 'UNKNOWN';
+ }
+ }
+
+ _versionIsNewer(version) {
+ return this._affinityTopologyVer === null ||
+ this._affinityTopologyVer.compareTo(version) < 0;
+ }
+
+ _versionIsOlder(version) {
+ return this._affinityTopologyVer !== null &&
+ this._affinityTopologyVer.compareTo(version) > 0;
+ }
+
+ // Returns a random integer between 0 and max - 1
+ _getRandomInt(max) {
+ if (max === 0) {
+ return 0;
+ }
+ return Math.floor(Math.random() * max);
+ }
+
+ _sleep(milliseconds) {
+ return new Promise(resolve => setTimeout(resolve, milliseconds));
+ }
+}
+
+module.exports = Router;
diff --git a/package.json b/package.json
index dc6898c..6497f67 100644
--- a/package.json
+++ b/package.json
@@ -1,6 +1,6 @@
{
"name": "apache-ignite-client",
- "version": "1.0.1",
+ "version": "1.1.0",
"description": "NodeJS Client for Apache Ignite",
"main": "index.js",
"files": [
@@ -29,12 +29,16 @@
"scripts": {
"test": "jasmine",
"test:examples": "node ./spec/ExamplesExecutor.js Examples",
- "test:auth_example": "node ./spec/ExamplesExecutor.js AuthExample"
+ "test:auth_example": "node ./spec/ExamplesExecutor.js AuthExample",
+ "test:affinity_awareness": "APACHE_IGNITE_CLIENT_AFFINITY_AWARENESS=true node ./spec/PartitionAwarenessExecutor.js",
+ "test:partition_awareness": "APACHE_IGNITE_CLIENT_AFFINITY_AWARENESS=true node ./spec/PartitionAwarenessExecutor.js"
},
"devDependencies": {
"jasmine": "3.6.1",
"jasmine-expect": "4.0.0",
- "jasmine-reporters": "2.1.1"
+ "jasmine-reporters": "2.1.1",
+ "ps-tree": "latest",
+ "process-exists": "latest",
+ "glob": "7.1.6"
}
}
-
diff --git a/spec/LogReader.js b/spec/LogReader.js
new file mode 100644
index 0000000..5abce5c
--- /dev/null
+++ b/spec/LogReader.js
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+'use strict';
+
+const fs = require('fs');
+const readline = require('readline');
+
+// Helper class for working with GG logs
+class LogReader {
+ constructor(file) {
+ this._lastLine = 0;
+ this._file = file;
+ }
+
+ async nextRequest() {
+ let stream = null;
+ let readInterface = null;
+
+ let cleanUp = () => {
+ if (stream) {
+ stream.close();
+ stream = null;
+ }
+
+ if (readInterface) {
+ readInterface.close();
+ readInterface = null;
+ }
+ }
+
+ return await new Promise((resolve) => {
+ stream = fs.createReadStream(this._file);
+ readInterface = readline.createInterface({
+ input: stream,
+ crlfDelay: Infinity
+ });
+
+ let resolved = false;
+
+ let i = -1;
+ readInterface.on('line', (line) => {
+ if (resolved)
+ return;
+
+ ++i;
+ if (i <= this._lastLine)
+ return;
+
+ this._lastLine = i;
+
+ const res = line.match(/Client request received .*?req=org.apache.ignite.internal.processors.platform.client.cache.ClientCache([a-zA-Z]+)Request@/);
+ if (res) {
+ resolved = true;
+ cleanUp();
+ resolve(res[1].normalize());
+ }
+ });
+
+ readInterface.on('close', () => {
+ cleanUp();
+ if (!resolved)
+ resolve(null);
+ });
+ })
+ .catch((_err) => {});
+ }
+}
+
+module.exports = LogReader;
diff --git a/spec/PartitionAwarenessExecutor.js b/spec/PartitionAwarenessExecutor.js
new file mode 100644
index 0000000..f546d43
--- /dev/null
+++ b/spec/PartitionAwarenessExecutor.js
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+const Jasmine = require('jasmine');
+
+const jasmine = new Jasmine();
+jasmine.loadConfig({
+ 'spec_dir': 'spec',
+ 'spec_files': [
+ "partition_awareness/**/*[sS]pec.js",
+ "cache/**/*[sS]pec.js",
+ "query/**/*[sS]pec.js"
+ ],
+ "random": false,
+ // If this is set to true, we won't clean up environment, i.e. stop nodes
+ "stopOnSpecFailure": false
+});
+// We exclude the "scan query test suite > scan query settings" spec because sometimes it fails with more than one node cluster
+jasmine.execute(null, "(?!^scan query test suite > scan query settings$)(^.*$)");
\ No newline at end of file
diff --git a/spec/TestingHelper.js b/spec/TestingHelper.js
index 25465bb..fee97bc 100644
--- a/spec/TestingHelper.js
+++ b/spec/TestingHelper.js
@@ -20,10 +20,14 @@
require('jasmine-expect');
const JasmineReporters = require('jasmine-reporters');
+const psTree = require('ps-tree');
const Util = require('util');
-const exec = require('child_process').exec;
+const path = require('path');
+const fs = require('fs');
+const child_process = require('child_process');
const config = require('./config');
const IgniteClient = require('apache-ignite-client');
+const LogReader = require('./LogReader');
const IgniteClientConfiguration = IgniteClient.IgniteClientConfiguration;
const Errors = IgniteClient.Errors;
const EnumItem = IgniteClient.EnumItem;
@@ -182,20 +186,50 @@
return arrayValues;
}
- // Initializes testing environment: creates and starts the library client, sets default jasmine test timeout.
- // Should be called from any test suite beforeAll method.
- static async init() {
+ // Initializes only cluster
+ static async initClusterOnly(serversNum = 1, needLogging = false) {
jasmine.DEFAULT_TIMEOUT_INTERVAL = TIMEOUT_MS;
- TestingHelper._igniteClient = new IgniteClient();
- TestingHelper._igniteClient.setDebug(config.debug);
- await TestingHelper._igniteClient.connect(new IgniteClientConfiguration(...config.endpoints));
+ await TestingHelper.startTestServers(needLogging, serversNum);
+ }
+
+ // Create test client instance
+ static makeClient() {
+ const client = new IgniteClient();
+ client.setDebug(config.debug);
+ return client;
+ }
+
+ // Initializes testing environment: creates and starts the library client, sets default jasmine test timeout.
+ // Should be called from any test suite beforeAll method.
+ static async init(partitionAwareness = config.partitionAwareness, serversNum = 1, needLogging = false, endpoints) {
+ jasmine.DEFAULT_TIMEOUT_INTERVAL = TIMEOUT_MS;
+
+ if (!endpoints)
+ endpoints = TestingHelper.getEndpoints(serversNum);
+
+ await TestingHelper.startTestServers(needLogging, serversNum);
+
+ TestingHelper._igniteClient = TestingHelper.makeClient();
+ await TestingHelper._igniteClient.connect(new IgniteClientConfiguration(...endpoints).
+ setConnectionOptions(false, null, partitionAwareness));
}
// Cleans up testing environment.
// Should be called from any test suite afterAll method.
static async cleanUp() {
- await TestingHelper.igniteClient.disconnect();
+ try {
+ if (TestingHelper._igniteClient) {
+ await TestingHelper._igniteClient.disconnect();
+ delete TestingHelper._igniteClient;
+ }
+
+ if (TestingHelper._logReaders)
+ delete TestingHelper._logReaders;
+ }
+ finally {
+ await TestingHelper.stopTestServers();
+ }
}
static get igniteClient() {
@@ -211,9 +245,296 @@
}
}
+ static getEndpoints(serversNum) {
+ if (serversNum < 1)
+ throw 'Wrong number of nodes: ' + serversNum;
+
+ let res = [];
+ for (let i = 1; i < serversNum + 1; ++i)
+ res.push('127.0.0.1:' + (10800 + i));
+
+ return res;
+ }
+
+ static isWindows() {
+ return process.platform === 'win32';
+ }
+
+ static getNodeRunner() {
+ if (!config.igniteHome)
+ throw 'Can not start node: IGNITE_HOME is not set';
+
+ const ext = TestingHelper.isWindows() ? '.bat' : '.sh';
+ const runner = path.join(config.igniteHome, 'bin', 'ignite' + ext);
+ if (!fs.existsSync(runner))
+ throw 'Can not find ' + runner + '. Please, check your IGNITE_HOME environment variable';
+
+ return runner;
+ }
+
+ static getConfigPath(needLogging, idx = 1) {
+ if (!needLogging)
+ return path.join(__dirname, 'configs', 'ignite-config-default.xml');
+
+ return path.join(__dirname, 'configs', Util.format('ignite-config-%d.xml', idx));
+ }
+
+ static async sleep(milliseconds) {
+ return new Promise(resolve => setTimeout(resolve, milliseconds));
+ }
+
+ static async waitForCondition(cond, timeout) {
+ const startTime = Date.now();
+ let now = startTime;
+ do {
+ const ok = await cond();
+ if (ok)
+ return true;
+
+ await TestingHelper.sleep(100);
+ now = Date.now();
+ } while ((now - startTime) < timeout);
+
+ return await cond();
+ }
+
+ static async waitForConditionOrThrow(cond, timeout) {
+ const startTime = Date.now();
+
+ while (!await cond()) {
+ if (Date.now() - startTime > timeout) {
+ throw 'Failed to achive condition within timeout ' + timeout;
+ }
+
+ await TestingHelper.sleep(100);
+ }
+ }
+
+ static async tryConnectClient(idx = 1, debug = false) {
+ const endPoint = Util.format('127.0.0.1:%d', 10800 + idx);
+
+ TestingHelper.logDebug('Checking endpoint: ' + endPoint);
+
+ let cli = new IgniteClient();
+ cli.setDebug(debug);
+
+ return await cli.connect(new IgniteClientConfiguration(endPoint).
+ setConnectionOptions(false, null, false)).
+ then(() => {
+ TestingHelper.logDebug('Successfully connected');
+ cli.disconnect();
+ return true;
+ }).
+ catch(error => {
+ TestingHelper.logDebug('Error while connecting: ' + error.toString());
+ return false;
+ });
+ }
+
+ static async startTestServers(needLogging, serversNum) {
+ TestingHelper.logDebug('Starting ' + serversNum + ' node[s]');
+ if (serversNum < 0)
+ throw 'Wrong number of servers to start: ' + serversNum;
+
+ for (let i = 1; i < serversNum + 1; ++i)
+ await TestingHelper.startTestServer(needLogging, i);
+ }
+
+ static async startTestServer(needLogging, idx) {
+ if (!TestingHelper._servers)
+ TestingHelper._servers = [];
+
+ if (!TestingHelper._logReaders)
+ TestingHelper._logReaders = new Map();
+
+ TestingHelper._servers.push(await TestingHelper._startNode(needLogging, idx));
+
+ const logs = TestingHelper.getLogFiles(idx);
+ if (!needLogging && logs.length > 0)
+ throw 'Unexpected log file for node ' + idx;
+
+ if (needLogging) {
+ if (logs.length != 1)
+ throw 'Unexpected number of log files for node ' + idx + ': ' + logs.length;
+
+ TestingHelper._logReaders.set(idx, new LogReader(logs[0]));
+ }
+ }
+
+ static async stopTestServers() {
+ if (TestingHelper._servers) {
+ for (let server of TestingHelper._servers) {
+ await TestingHelper.killNodeAndWait(server);
+ }
+
+ delete TestingHelper._servers;
+ }
+ }
+
+ static async killNodeByIdAndWait(idx) {
+ if (!TestingHelper._servers || idx < 0 || idx > TestingHelper._servers.length)
+ throw 'Invalid index';
+
+ const srv = TestingHelper._servers[idx - 1];
+ if (srv)
+ await TestingHelper.killNodeAndWait(srv);
+ }
+
+ static async killNodeAndWait(proc) {
+ const ProcessExists = require('process-exists');
+
+ const pid = proc.pid;
+ TestingHelper.killNode(proc);
+
+ await TestingHelper.waitForConditionOrThrow(async () => {
+ return !(await ProcessExists(pid));
+ }, 5000);
+ }
+
+ static killNode(proc) {
+ if (TestingHelper.isWindows()) {
+ child_process.spawnSync('taskkill', ['/F', '/T', '/PID', proc.pid.toString()])
+ }
+ psTree(proc.pid, function (err, children) {
+ children.map((p) => {
+ try {
+ process.kill(p.PID, 'SIGKILL');
+ }
+ catch (_error) {
+ // No-op.
+ }
+ });
+ });
+ }
+
+ // Make sure that topology is stable, version won't change and partition map is up-to-date for the given cache.
+ static async ensureStableTopology(igniteClient, cache, key = 1, skipLogs=false, timeout=5000) {
+ let oldTopVer = igniteClient._router._affinityTopologyVer;
+
+ await cache.get(key);
+
+ let newTopVer = igniteClient._router._affinityTopologyVer;
+
+ while (newTopVer !== oldTopVer) {
+ oldTopVer = newTopVer;
+ await cache.get(key);
+ newTopVer = igniteClient._router._affinityTopologyVer;
+ }
+
+ // Now when topology stopped changing, let's ensure we received distribution map.
+ let ok = await TestingHelper.waitForCondition(async () => {
+ await cache.get(key);
+ return await TestingHelper._waitMapObtained(igniteClient, cache, 1000);
+ }, timeout);
+
+ if (!ok)
+ throw 'getting of partition map timed out';
+
+ if (skipLogs)
+ await TestingHelper.getRequestGridIdx();
+ }
+
+ // Waiting for distribution map to be obtained.
+ static async _waitMapObtained(igniteClient, cache, timeout) {
+ return await TestingHelper.waitForCondition(() => {
+ return igniteClient._router._distributionMap.has(cache._cacheId);
+ }, timeout);
+ }
+
+ static async readLogFile(idx) {
+ const reader = TestingHelper._logReaders.get(idx);
+ if (!reader) {
+ TestingHelper.logDebug('WARNING: Reader is null');
+ return null;
+ }
+
+ return await reader.nextRequest();
+ }
+
+ static async getRequestGridIdx(message='Get') {
+ if (!TestingHelper._logReaders)
+ throw 'Logs are not enabled for the cluster';
+
+ let res = -1
+ for(let [id, logReader] of TestingHelper._logReaders) {
+ if (!logReader)
+ continue;
+
+ let req = null;
+ do {
+ req = await logReader.nextRequest();
+ TestingHelper.logDebug('Node' + id +': Got ' + req + ', looking for ' + message);
+ if (req === message)
+ res = id;
+ } while (req != null);
+ }
+
+ TestingHelper.logDebug('Request "' + message + '" node: ' + res);
+
+ return res;
+ }
+
+ static getLogFiles(idx) {
+ const glob = require('glob');
+ // glob package only works with slashes so no need in 'path' here.
+ const logsPattern = Util.format('./logs/ignite-log-%d*.txt', idx);
+ const res = glob.sync(logsPattern);
+ return res;
+ }
+
+ static clearLogs(idx) {
+ for (const f of TestingHelper.getLogFiles(idx))
+ fs.unlinkSync(f);
+ }
+
+ static async _startNode(needLogging, idx = 1) {
+ TestingHelper.clearLogs(idx);
+
+ const runner = TestingHelper.getNodeRunner();
+
+ let nodeEnv = {};
+ for (const ev in process.env)
+ nodeEnv[ev] = process.env[ev];
+
+ if (config.debug) {
+ nodeEnv['JVM_OPTS'] = '-Djava.net.preferIPv4Stack=true -Xdebug -Xnoagent -Djava.compiler=NONE \
+ -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=' + (5005 + idx);
+ }
+
+ const nodeCfg = TestingHelper.getConfigPath(needLogging, idx);
+ TestingHelper.logDebug('Trying to start node using following command: ' + runner + ' ' + nodeCfg);
+
+ const srv = child_process.spawn(runner, [nodeCfg], {env: nodeEnv});
+
+ srv.on('error', (error) => {
+ jasmine.fail('Failed to start node: ' + error);
+ throw 'Failed to start node: ' + error;
+ });
+
+ srv.stdout.on('data', (data) => {
+ if (config.nodeDebug)
+ console.log(data.toString());
+ });
+
+ srv.stderr.on('data', (data) => {
+ if (config.nodeDebug)
+ console.error(data.toString());
+ });
+
+ const started = await TestingHelper.waitForCondition(async () =>
+ TestingHelper.tryConnectClient(idx), 10000);
+
+ if (!started) {
+ await TestingHelper.killNodeAndWait(srv);
+ throw 'Failed to start Node: timeout while trying to connect';
+ }
+
+ return srv
+ }
+
static executeExample(name, outputChecker) {
return new Promise((resolve, reject) => {
- exec('node ' + name, (error, stdout, stderr) => {
+ child_process.exec('node ' + name, (error, stdout, stderr) => {
TestingHelper.logDebug(stdout);
resolve(stdout);
})
diff --git a/spec/config.js b/spec/config.js
index 747170a..d8966a8 100644
--- a/spec/config.js
+++ b/spec/config.js
@@ -18,9 +18,14 @@
'use strict';
exports.endpoints = process.env.APACHE_IGNITE_CLIENT_ENDPOINTS ?
- process.env.APACHE_IGNITE_CLIENT_ENDPOINTS.split(',') : [];
+ process.env.APACHE_IGNITE_CLIENT_ENDPOINTS.split(',') : [];
exports.debug = process.env.APACHE_IGNITE_CLIENT_DEBUG === 'true' ||
- process.env.APACHE_IGNITE_CLIENT_DEBUG === '1';
+ process.env.APACHE_IGNITE_CLIENT_DEBUG === '1';
+exports.nodeDebug = process.env.APACHE_IGNITE_SERVER_DEBUG === 'true' ||
+ process.env.APACHE_IGNITE_SERVER_DEBUG === '1';
+exports.partitionAwareness = process.env.APACHE_IGNITE_CLIENT_PARTITION_AWARENESS === 'true' ||
+ process.env.APACHE_IGNITE_CLIENT_PARTITION_AWARENESS === '1';
+exports.igniteHome = process.env.IGNITE_HOME;
//exports.endpoints = ['127.0.0.1:10800'];
diff --git a/spec/configs/ignite-config-1.xml b/spec/configs/ignite-config-1.xml
new file mode 100644
index 0000000..6aa94df
--- /dev/null
+++ b/spec/configs/ignite-config-1.xml
@@ -0,0 +1,45 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns:util="http://www.springframework.org/schema/util"
+ xsi:schemaLocation="
+ http://www.springframework.org/schema/beans
+ http://www.springframework.org/schema/beans/spring-beans.xsd
+ http://www.springframework.org/schema/util
+ http://www.springframework.org/schema/util/spring-util.xsd">
+ <import resource="ignite-config-base.xml"/>
+
+ <bean parent="grid.cfg">
+ <property name="clientConnectorConfiguration">
+ <bean class="org.apache.ignite.configuration.ClientConnectorConfiguration">
+ <property name="host" value="127.0.0.1"/>
+ <property name="port" value="10801"/>
+ <property name="portRange" value="0"/>
+ </bean>
+ </property>
+
+ <property name="gridLogger">
+ <bean class="org.apache.ignite.logger.log4j.Log4JLogger">
+ <constructor-arg type="java.lang.String" value="spec/configs/log4j-1.xml"/>
+ </bean>
+ </property>
+ </bean>
+</beans>
diff --git a/spec/configs/ignite-config-2.xml b/spec/configs/ignite-config-2.xml
new file mode 100644
index 0000000..d06420c
--- /dev/null
+++ b/spec/configs/ignite-config-2.xml
@@ -0,0 +1,45 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns:util="http://www.springframework.org/schema/util"
+ xsi:schemaLocation="
+ http://www.springframework.org/schema/beans
+ http://www.springframework.org/schema/beans/spring-beans.xsd
+ http://www.springframework.org/schema/util
+ http://www.springframework.org/schema/util/spring-util.xsd">
+ <import resource="ignite-config-base.xml"/>
+
+ <bean parent="grid.cfg">
+ <property name="clientConnectorConfiguration">
+ <bean class="org.apache.ignite.configuration.ClientConnectorConfiguration">
+ <property name="host" value="127.0.0.1"/>
+ <property name="port" value="10802"/>
+ <property name="portRange" value="0"/>
+ </bean>
+ </property>
+
+ <property name="gridLogger">
+ <bean class="org.apache.ignite.logger.log4j.Log4JLogger">
+ <constructor-arg type="java.lang.String" value="spec/configs/log4j-2.xml"/>
+ </bean>
+ </property>
+ </bean>
+</beans>
diff --git a/spec/configs/ignite-config-3.xml b/spec/configs/ignite-config-3.xml
new file mode 100644
index 0000000..2143aab
--- /dev/null
+++ b/spec/configs/ignite-config-3.xml
@@ -0,0 +1,45 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns:util="http://www.springframework.org/schema/util"
+ xsi:schemaLocation="
+ http://www.springframework.org/schema/beans
+ http://www.springframework.org/schema/beans/spring-beans.xsd
+ http://www.springframework.org/schema/util
+ http://www.springframework.org/schema/util/spring-util.xsd">
+ <import resource="ignite-config-base.xml"/>
+
+ <bean parent="grid.cfg">
+ <property name="clientConnectorConfiguration">
+ <bean class="org.apache.ignite.configuration.ClientConnectorConfiguration">
+ <property name="host" value="127.0.0.1"/>
+ <property name="port" value="10803"/>
+ <property name="portRange" value="0"/>
+ </bean>
+ </property>
+
+ <property name="gridLogger">
+ <bean class="org.apache.ignite.logger.log4j.Log4JLogger">
+ <constructor-arg type="java.lang.String" value="spec/configs/log4j-3.xml"/>
+ </bean>
+ </property>
+ </bean>
+</beans>
diff --git a/spec/configs/ignite-config-4.xml b/spec/configs/ignite-config-4.xml
new file mode 100644
index 0000000..33c7e8a
--- /dev/null
+++ b/spec/configs/ignite-config-4.xml
@@ -0,0 +1,45 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns:util="http://www.springframework.org/schema/util"
+ xsi:schemaLocation="
+ http://www.springframework.org/schema/beans
+ http://www.springframework.org/schema/beans/spring-beans.xsd
+ http://www.springframework.org/schema/util
+ http://www.springframework.org/schema/util/spring-util.xsd">
+ <import resource="ignite-config-base.xml"/>
+
+ <bean parent="grid.cfg">
+ <property name="clientConnectorConfiguration">
+ <bean class="org.apache.ignite.configuration.ClientConnectorConfiguration">
+ <property name="host" value="127.0.0.1"/>
+ <property name="port" value="10804"/>
+ <property name="portRange" value="0"/>
+ </bean>
+ </property>
+
+ <property name="gridLogger">
+ <bean class="org.apache.ignite.logger.log4j.Log4JLogger">
+ <constructor-arg type="java.lang.String" value="spec/configs/log4j-4.xml"/>
+ </bean>
+ </property>
+ </bean>
+</beans>
diff --git a/spec/configs/ignite-config-base.xml b/spec/configs/ignite-config-base.xml
new file mode 100644
index 0000000..f4bd067
--- /dev/null
+++ b/spec/configs/ignite-config-base.xml
@@ -0,0 +1,95 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns:util="http://www.springframework.org/schema/util"
+ xsi:schemaLocation="
+ http://www.springframework.org/schema/beans
+ http://www.springframework.org/schema/beans/spring-beans.xsd
+ http://www.springframework.org/schema/util
+ http://www.springframework.org/schema/util/spring-util.xsd">
+ <bean id="grid.cfg" abstract="true" class="org.apache.ignite.configuration.IgniteConfiguration">
+ <property name="localHost" value="127.0.0.1"/>
+
+ <property name="discoverySpi">
+ <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
+ <property name="localAddress" value="127.0.0.1"/>
+ <property name="localPort" value="48500"/>
+ <property name="ipFinder">
+ <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">
+ <property name="addresses">
+ <list>
+ <value>127.0.0.1:48500..48503</value>
+ </list>
+ </property>
+ </bean>
+ </property>
+ <property name="socketTimeout" value="300"/>
+ </bean>
+ </property>
+
+ <property name="communicationSpi">
+ <bean class="org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi">
+ <property name="localAddress" value="127.0.0.1"/>
+ <property name="localPort" value="48100"/>
+ </bean>
+ </property>
+
+ <property name="cacheConfiguration">
+ <list>
+ <bean class="org.apache.ignite.configuration.CacheConfiguration">
+ <property name="name" value="custom-affinity"/>
+ <property name="cacheMode" value="PARTITIONED"/>
+ <property name="writeSynchronizationMode" value="FULL_SYNC"/>
+ <property name="affinity">
+ <bean class="org.apache.ignite.internal.processors.affinity.LocalAffinityFunction"/>
+ </property>
+ </bean>
+
+ <bean class="org.apache.ignite.configuration.CacheConfiguration">
+ <property name="name" value="partitioned0"/>
+ <property name="cacheMode" value="PARTITIONED"/>
+ <property name="backups" value="0"/>
+ <property name="writeSynchronizationMode" value="FULL_SYNC"/>
+ </bean>
+
+ <bean class="org.apache.ignite.configuration.CacheConfiguration">
+ <property name="name" value="partitioned1"/>
+ <property name="cacheMode" value="PARTITIONED"/>
+ <property name="backups" value="1"/>
+ <property name="writeSynchronizationMode" value="FULL_SYNC"/>
+ </bean>
+
+ <bean class="org.apache.ignite.configuration.CacheConfiguration">
+ <property name="name" value="partitioned3"/>
+ <property name="cacheMode" value="PARTITIONED"/>
+ <property name="backups" value="3"/>
+ <property name="writeSynchronizationMode" value="FULL_SYNC"/>
+ </bean>
+
+ <bean class="org.apache.ignite.configuration.CacheConfiguration">
+ <property name="name" value="replicated"/>
+ <property name="cacheMode" value="REPLICATED"/>
+ <property name="writeSynchronizationMode" value="FULL_SYNC"/>
+ </bean>
+ </list>
+ </property>
+ </bean>
+</beans>
diff --git a/spec/configs/ignite-config-default.xml b/spec/configs/ignite-config-default.xml
new file mode 100644
index 0000000..b4d8879
--- /dev/null
+++ b/spec/configs/ignite-config-default.xml
@@ -0,0 +1,39 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns:util="http://www.springframework.org/schema/util"
+ xsi:schemaLocation="
+ http://www.springframework.org/schema/beans
+ http://www.springframework.org/schema/beans/spring-beans.xsd
+ http://www.springframework.org/schema/util
+ http://www.springframework.org/schema/util/spring-util.xsd">
+ <import resource="ignite-config-base.xml"/>
+
+ <bean parent="grid.cfg">
+ <property name="clientConnectorConfiguration">
+ <bean class="org.apache.ignite.configuration.ClientConnectorConfiguration">
+ <property name="host" value="127.0.0.1"/>
+ <property name="port" value="10801"/>
+ <property name="portRange" value="10"/>
+ </bean>
+ </property>
+ </bean>
+</beans>
diff --git a/spec/configs/log4j-1.xml b/spec/configs/log4j-1.xml
new file mode 100644
index 0000000..307baf5
--- /dev/null
+++ b/spec/configs/log4j-1.xml
@@ -0,0 +1,59 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<!DOCTYPE log4j:configuration PUBLIC "-//APACHE//DTD LOG4J 1.2//EN"
+ "http://logging.apache.org/log4j/1.2/apidocs/org/apache/log4j/xml/doc-files/log4j.dtd">
+
+<!--
+ Default log4j configuration for Ignite.
+-->
+<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/" debug="false">
+
+ <appender name="CONSOLE" class="org.apache.log4j.ConsoleAppender">
+ <param name="Target" value="System.out"/>
+ <param name="Threshold" value="Debug"/>
+ <layout class="org.apache.log4j.PatternLayout">
+ <param name="ConversionPattern" value="[%d{ISO8601}][%-5p][%t][%c{1}] %m%n"/>
+ </layout>
+ </appender>
+
+ <appender name="FILE" class="org.apache.ignite.logger.log4j.Log4jRollingFileAppender">
+ <param name="Threshold" value="DEBUG"/>
+ <param name="File" value="logs/ignite-log-1.txt"/>
+ <param name="Append" value="true"/>
+ <param name="MaxFileSize" value="10MB"/>
+ <param name="MaxBackupIndex" value="10"/>
+ <layout class="org.apache.log4j.PatternLayout">
+ <param name="ConversionPattern" value="%m%n"/>
+ </layout>
+ </appender>
+
+ <category name="org.apache.ignite.internal.processors.odbc.ClientListenerNioListener">
+ <level value="DEBUG"/>
+ </category>
+
+ <!-- Default settings. -->
+ <root>
+ <!-- Print out all info by default. -->
+ <level value="INFO"/>
+
+ <appender-ref ref="CONSOLE"/>
+ <appender-ref ref="FILE"/>
+ </root>
+</log4j:configuration>
diff --git a/spec/configs/log4j-2.xml b/spec/configs/log4j-2.xml
new file mode 100644
index 0000000..a0637ae
--- /dev/null
+++ b/spec/configs/log4j-2.xml
@@ -0,0 +1,59 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<!DOCTYPE log4j:configuration PUBLIC "-//APACHE//DTD LOG4J 1.2//EN"
+ "http://logging.apache.org/log4j/1.2/apidocs/org/apache/log4j/xml/doc-files/log4j.dtd">
+
+<!--
+ Default log4j configuration for Ignite.
+-->
+<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/" debug="false">
+
+ <appender name="CONSOLE" class="org.apache.log4j.ConsoleAppender">
+ <param name="Target" value="System.out"/>
+ <param name="Threshold" value="Debug"/>
+ <layout class="org.apache.log4j.PatternLayout">
+ <param name="ConversionPattern" value="[%d{ISO8601}][%-5p][%t][%c{1}] %m%n"/>
+ </layout>
+ </appender>
+
+ <appender name="FILE" class="org.apache.ignite.logger.log4j.Log4jRollingFileAppender">
+ <param name="Threshold" value="DEBUG"/>
+ <param name="File" value="logs/ignite-log-2.txt"/>
+ <param name="Append" value="true"/>
+ <param name="MaxFileSize" value="10MB"/>
+ <param name="MaxBackupIndex" value="10"/>
+ <layout class="org.apache.log4j.PatternLayout">
+ <param name="ConversionPattern" value="%m%n"/>
+ </layout>
+ </appender>
+
+ <category name="org.apache.ignite.internal.processors.odbc.ClientListenerNioListener">
+ <level value="DEBUG"/>
+ </category>
+
+ <!-- Default settings. -->
+ <root>
+ <!-- Print out all info by default. -->
+ <level value="INFO"/>
+
+ <appender-ref ref="CONSOLE"/>
+ <appender-ref ref="FILE"/>
+ </root>
+</log4j:configuration>
diff --git a/spec/configs/log4j-3.xml b/spec/configs/log4j-3.xml
new file mode 100644
index 0000000..1d2fa51
--- /dev/null
+++ b/spec/configs/log4j-3.xml
@@ -0,0 +1,59 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<!DOCTYPE log4j:configuration PUBLIC "-//APACHE//DTD LOG4J 1.2//EN"
+ "http://logging.apache.org/log4j/1.2/apidocs/org/apache/log4j/xml/doc-files/log4j.dtd">
+
+<!--
+ Default log4j configuration for Ignite.
+-->
+<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/" debug="false">
+
+ <appender name="CONSOLE" class="org.apache.log4j.ConsoleAppender">
+ <param name="Target" value="System.out"/>
+ <param name="Threshold" value="Debug"/>
+ <layout class="org.apache.log4j.PatternLayout">
+ <param name="ConversionPattern" value="[%d{ISO8601}][%-5p][%t][%c{1}] %m%n"/>
+ </layout>
+ </appender>
+
+ <appender name="FILE" class="org.apache.ignite.logger.log4j.Log4jRollingFileAppender">
+ <param name="Threshold" value="DEBUG"/>
+ <param name="File" value="logs/ignite-log-3.txt"/>
+ <param name="Append" value="true"/>
+ <param name="MaxFileSize" value="10MB"/>
+ <param name="MaxBackupIndex" value="10"/>
+ <layout class="org.apache.log4j.PatternLayout">
+ <param name="ConversionPattern" value="%m%n"/>
+ </layout>
+ </appender>
+
+ <category name="org.apache.ignite.internal.processors.odbc.ClientListenerNioListener">
+ <level value="DEBUG"/>
+ </category>
+
+ <!-- Default settings. -->
+ <root>
+ <!-- Print out all info by default. -->
+ <level value="INFO"/>
+
+ <appender-ref ref="CONSOLE"/>
+ <appender-ref ref="FILE"/>
+ </root>
+</log4j:configuration>
diff --git a/spec/configs/log4j-4.xml b/spec/configs/log4j-4.xml
new file mode 100644
index 0000000..8addfbf
--- /dev/null
+++ b/spec/configs/log4j-4.xml
@@ -0,0 +1,59 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<!DOCTYPE log4j:configuration PUBLIC "-//APACHE//DTD LOG4J 1.2//EN"
+ "http://logging.apache.org/log4j/1.2/apidocs/org/apache/log4j/xml/doc-files/log4j.dtd">
+
+<!--
+ Default log4j configuration for Ignite.
+-->
+<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/" debug="false">
+
+ <appender name="CONSOLE" class="org.apache.log4j.ConsoleAppender">
+ <param name="Target" value="System.out"/>
+ <param name="Threshold" value="Debug"/>
+ <layout class="org.apache.log4j.PatternLayout">
+ <param name="ConversionPattern" value="[%d{ISO8601}][%-5p][%t][%c{1}] %m%n"/>
+ </layout>
+ </appender>
+
+ <appender name="FILE" class="org.apache.ignite.logger.log4j.Log4jRollingFileAppender">
+ <param name="Threshold" value="DEBUG"/>
+ <param name="File" value="logs/ignite-log-4.txt"/>
+ <param name="Append" value="true"/>
+ <param name="MaxFileSize" value="10MB"/>
+ <param name="MaxBackupIndex" value="10"/>
+ <layout class="org.apache.log4j.PatternLayout">
+ <param name="ConversionPattern" value="%m%n"/>
+ </layout>
+ </appender>
+
+ <category name="org.apache.ignite.internal.processors.odbc.ClientListenerNioListener">
+ <level value="DEBUG"/>
+ </category>
+
+ <!-- Default settings. -->
+ <root>
+ <!-- Print out all info by default. -->
+ <level value="INFO"/>
+
+ <appender-ref ref="CONSOLE"/>
+ <appender-ref ref="FILE"/>
+ </root>
+</log4j:configuration>
diff --git a/spec/partition_awareness/PartitionAwarenessConnection.spec.js b/spec/partition_awareness/PartitionAwarenessConnection.spec.js
new file mode 100644
index 0000000..6f1ee92
--- /dev/null
+++ b/spec/partition_awareness/PartitionAwarenessConnection.spec.js
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+'use strict';
+
+require('jasmine-expect');
+
+const TestingHelper = require('../TestingHelper');
+const PartitionAwarenessTestUtils = require('./PartitionAwarenessTestUtils');
+const IgniteClient = require('apache-ignite-client');
+const ObjectType = IgniteClient.ObjectType;
+const IgniteClientConfiguration = IgniteClient.IgniteClientConfiguration;
+
+const CACHE_NAME = '__test_cache';
+const SERVER_NUM = 3;
+
+describe('partition awareness with checks of connection to cluster test suite >', () => {
+ beforeEach((done) => {
+ Promise.resolve().
+ then(async () => {
+ await TestingHelper.initClusterOnly(SERVER_NUM, true);
+ }).
+ then(done).
+ catch(error => done.fail(error));
+ }, TestingHelper.TIMEOUT);
+
+ afterEach((done) => {
+ Promise.resolve().
+ then(async () => {
+ await TestingHelper.cleanUp();
+ }).
+ then(done).
+ catch(_error => done());
+ }, TestingHelper.TIMEOUT);
+
+ it('client with partition awareness connecting to unknown servers', (done) => {
+ Promise.resolve().
+ then(async () => {
+ const badEndpoints = ['127.0.0.1:10900', '127.0.0.1:10901'];
+ const realEndpoints = TestingHelper.getEndpoints(SERVER_NUM);
+
+ for (const ep of realEndpoints)
+ expect(badEndpoints).not.toContain(ep);
+
+ const client = TestingHelper.makeClient();
+ const cfg = new IgniteClientConfiguration(...badEndpoints).setConnectionOptions(false, null, true);
+
+ try {
+ await client.connect(cfg);
+ }
+ catch (error) {
+ expect(error.message).toContain('Connection failed');
+
+ return;
+ }
+
+ throw 'Connection should be rejected';
+ }).
+ then(done).
+ catch(error => done.fail(error));
+ });
+
+ it('cache operation routed to new started node', (done) => {
+ Promise.resolve().
+ then(async () => {
+ const newNodeId = SERVER_NUM + 1;
+ const endpoints = TestingHelper.getEndpoints(SERVER_NUM + 1);
+
+ const client = TestingHelper.makeClient();
+ const cfg = new IgniteClientConfiguration(...endpoints).setConnectionOptions(false, null, true);
+ await client.connect(cfg);
+
+ const cache = await PartitionAwarenessTestUtils.getOrCreateCache(
+ client,
+ ObjectType.PRIMITIVE_TYPE.INTEGER,
+ ObjectType.PRIMITIVE_TYPE.INTEGER,
+ CACHE_NAME);
+
+ // Update partition mapping
+ await TestingHelper.ensureStableTopology(client, cache, 1, true);
+
+ // Starting new node
+ await TestingHelper.startTestServer(true, newNodeId);
+
+ // Update partition mapping
+ await TestingHelper.ensureStableTopology(client, cache, 1, true);
+
+ let keys = 1000;
+ for (let i = 1; i < keys; ++i) {
+ await cache.put(i * 1433, i);
+ const serverId = await TestingHelper.getRequestGridIdx('Put');
+
+ // It means request got to the new node.
+ if (serverId == newNodeId)
+ return;
+ }
+
+ throw 'Not a single request out of ' + keys + ' got to the new node';
+ }).
+ then(done).
+ catch(error => done.fail(error));
+ });
+});
diff --git a/spec/partition_awareness/PartitionAwarenessFailover.spec.js b/spec/partition_awareness/PartitionAwarenessFailover.spec.js
new file mode 100644
index 0000000..923a84d
--- /dev/null
+++ b/spec/partition_awareness/PartitionAwarenessFailover.spec.js
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+'use strict';
+
+require('jasmine-expect');
+
+const TestingHelper = require('../TestingHelper');
+const PartitionAwarenessTestUtils = require('./PartitionAwarenessTestUtils');
+const IgniteClient = require('apache-ignite-client');
+const ObjectType = IgniteClient.ObjectType;
+
+const CACHE_NAME = '__test_cache';
+const SERVER_NUM = 3;
+
+describe('partition awareness multiple connections failover test suite >', () => {
+ let igniteClient = null;
+
+ beforeEach((done) => {
+ Promise.resolve().
+ then(async () => {
+ await TestingHelper.init(true, SERVER_NUM, true);
+ igniteClient = TestingHelper.igniteClient;
+ }).
+ then(done).
+ catch(error => done.fail(error));
+ }, TestingHelper.TIMEOUT);
+
+ afterEach((done) => {
+ Promise.resolve().
+ then(async () => {
+ await TestingHelper.cleanUp();
+ }).
+ then(done).
+ catch(_error => done());
+ }, TestingHelper.TIMEOUT);
+
+ it('cache operation fails gracefully when all nodes are killed', (done) => {
+ Promise.resolve().
+ then(async () => {
+ const cache = await getCache(ObjectType.PRIMITIVE_TYPE.INTEGER, ObjectType.PRIMITIVE_TYPE.INTEGER);
+ let key = 1;
+
+ // Put/Get
+ await cache.put(key, key);
+ expect(await cache.get(key)).toEqual(key);
+
+ // Killing nodes
+ await TestingHelper.stopTestServers();
+
+ // Get
+ try {
+ await cache.put(key, key);
+ }
+ catch (error) {
+ expect(error.message).toMatch(/(.*Cluster is unavailable*.)|(.*client is not in an appropriate state.*)/);
+
+ return;
+ }
+
+ throw 'Operation fail is expected';
+ }).
+ then(done).
+ catch(error => done.fail(error));
+ });
+
+ it('cache operation does not fail when single node is killed', (done) => {
+ Promise.resolve().
+ then(async () => {
+ const cache = await getCache(ObjectType.PRIMITIVE_TYPE.INTEGER, ObjectType.PRIMITIVE_TYPE.INTEGER);
+ let key = 1;
+
+ // Update partition mapping
+ await TestingHelper.ensureStableTopology(igniteClient, cache, key, true);
+
+ // Put test value to find out the right node
+ await cache.put(key, key);
+ expect(await cache.get(key)).toEqual(key);
+
+ // Killing node for the key
+ const serverId = await TestingHelper.getRequestGridIdx('Put');
+ expect(serverId).not.toEqual(-1, 'Can not find node for a put request');
+
+ await TestingHelper.killNodeByIdAndWait(serverId);
+
+ await cache.put(key, key);
+ expect(await cache.get(key)).toEqual(key);
+ }).
+ then(done).
+ catch(error => done.fail(error));
+ });
+
+ it('cache operation does not fail when node is killed and recovered', (done) => {
+ Promise.resolve().
+ then(async () => {
+ const cache = await getCache(ObjectType.PRIMITIVE_TYPE.INTEGER, ObjectType.PRIMITIVE_TYPE.INTEGER);
+ let key = 1;
+
+ // Update partition mapping
+ await TestingHelper.ensureStableTopology(igniteClient, cache, key, true);
+
+ // Put test value to find out the right node
+ await cache.put(key, key);
+ expect(await cache.get(key)).toEqual(key);
+
+ // Killing node for the key
+ const recoveredNodeId = await TestingHelper.getRequestGridIdx('Put');
+ expect(recoveredNodeId).not.toEqual(-1, 'Can not find node for a put request');
+
+ await TestingHelper.killNodeByIdAndWait(recoveredNodeId);
+ await TestingHelper.sleep(1000);
+ await TestingHelper.startTestServer(true, recoveredNodeId);
+
+ // Update partition mapping
+ await TestingHelper.ensureStableTopology(igniteClient, cache, key, true);
+
+ let keys = 1000;
+ for (let i = 1; i < keys; ++i) {
+ await cache.put(i * 1433, i);
+ const serverId = await TestingHelper.getRequestGridIdx('Put');
+
+ // It means request got to the new node.
+ if (serverId == recoveredNodeId)
+ return;
+ }
+
+ throw 'Not a single request out of ' + keys + ' got to the recovered node';
+ }).
+ then(done).
+ catch(error => done.fail(error));
+ });
+
+ async function getCache(keyType, valueType, cacheName = CACHE_NAME, cacheCfg = null) {
+ return await PartitionAwarenessTestUtils.getOrCreateCache(igniteClient, keyType, valueType, cacheName, cacheCfg);
+ }
+});
diff --git a/spec/partition_awareness/PartitionAwarenessLocalPeek.spec.js b/spec/partition_awareness/PartitionAwarenessLocalPeek.spec.js
new file mode 100644
index 0000000..4157afd
--- /dev/null
+++ b/spec/partition_awareness/PartitionAwarenessLocalPeek.spec.js
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+'use strict';
+
+require('jasmine-expect');
+
+const TestingHelper = require('../TestingHelper');
+const IgniteClient = require('apache-ignite-client');
+const Errors = IgniteClient.Errors;
+const CacheConfiguration = IgniteClient.CacheConfiguration;
+const CacheKeyConfiguration = IgniteClient.CacheKeyConfiguration;
+const ObjectType = IgniteClient.ObjectType;
+const BinaryObject = IgniteClient.BinaryObject;
+const ComplexObjectType = IgniteClient.ComplexObjectType;
+
+const CACHE_NAME = '__test_cache';
+
+describe('partition awareness with local peek test suite >', () => {
+ let igniteClient = null;
+ const affinityKeyField = 'affKeyField';
+
+ beforeAll((done) => {
+ Promise.resolve().
+ then(async () => {
+ // Pass "true" to turn on Partition Awareness even
+ // if APACHE_IGNITE_CLIENT_PARTITION_AWARENESS env var is not passed
+ await TestingHelper.init(true, 3);
+ igniteClient = TestingHelper.igniteClient;
+ await checkPartitionAwarenessActive(done);
+ await testSuiteCleanup(done);
+ }).
+ then(done).
+ catch(error => done.fail(error));
+ }, TestingHelper.TIMEOUT);
+
+ afterAll((done) => {
+ Promise.resolve().
+ then(async () => {
+ await testSuiteCleanup(done);
+ await TestingHelper.cleanUp();
+ }).
+ then(done).
+ catch(error => done());
+ }, TestingHelper.TIMEOUT);
+
+ it('put keys of different primitive types and check local peek', (done) => {
+ Promise.resolve().
+ then(async () => {
+ const val = "someVal";
+ const valType = ObjectType.PRIMITIVE_TYPE.STRING;
+
+ for (let keyType of Object.keys(TestingHelper.primitiveValues)) {
+ keyType = parseInt(keyType);
+ if (keyType == ObjectType.PRIMITIVE_TYPE.DECIMAL) {
+ // Decimal is not a recommended type to use as a key
+ continue;
+ }
+ const typeInfo1 = TestingHelper.primitiveValues[keyType];
+ for (let value1 of typeInfo1.values) {
+ await putAndCheckLocalPeek(keyType, valType, value1, val);
+ if (typeInfo1.typeOptional) {
+ await putAndCheckLocalPeek(null, valType, value1, val);
+ }
+ }
+ }
+ }).
+ then(done).
+ catch(error => done.fail(error));
+ });
+
+ it('put binary object and check local peek', (done) => {
+ Promise.resolve().
+ then(async () => {
+ const typeName = 'TestClass1';
+ const intValue = 256256256;
+ const stringValue = 'someStr';
+ const boolValue = true;
+ const doubleValue = 256.256;
+
+ const key = new BinaryObject(typeName);
+
+ key.setField('field_int', intValue, ObjectType.PRIMITIVE_TYPE.INTEGER);
+ key.setField('field_string', stringValue);
+ key.setField('field_bool', boolValue);
+ key.setField('field_douible', doubleValue);
+
+ await putAndCheckLocalPeek(null, null, key, intValue);
+ }).
+ then(done).
+ catch(error => done.fail(error));
+ });
+
+ it('put binary object with affinity key and check local peek', (done) => {
+ Promise.resolve().
+ then(async () => {
+ // We use separate cache here
+ const cacheName = '__test_cache2';
+ const typeName = 'TestClass2';
+
+ const intValue = 256256256;
+ const stringValue = 'someStr';
+
+ const keyCfg = new CacheKeyConfiguration(typeName, affinityKeyField);
+ const cacheCfg = createCacheConfig(keyCfg);
+
+ const key = new BinaryObject(typeName);
+
+ key.setField(affinityKeyField, intValue, ObjectType.PRIMITIVE_TYPE.INTEGER);
+ key.setField('field_string', stringValue);
+ key.setField('field_int', intValue, ObjectType.PRIMITIVE_TYPE.INTEGER);
+
+ await putAndCheckLocalPeek(null, null, key, intValue, cacheName, cacheCfg);
+ }).
+ then(done).
+ catch(error => done.fail(error));
+ });
+
+ it('put js object with affinity key and check local peek', (done) => {
+ Promise.resolve().
+ then(async () => {
+ // We use separate cache here
+ const cacheName = '__test_cache3';
+ const typeName = 'TestClass3';
+
+ const intValue = 16161616;
+ const stringValue = 'someStr';
+
+ const keyCfg = new CacheKeyConfiguration(typeName, affinityKeyField);
+ const cacheCfg = createCacheConfig(keyCfg);
+
+ const key = {};
+
+ key[affinityKeyField] = intValue;
+ key['field_string'] = stringValue;
+ key['field_int'] = intValue;
+
+ const keyType = new ComplexObjectType(key, typeName);
+
+ // With keyType hint
+ await putAndCheckLocalPeek(keyType, null, key, intValue, cacheName, cacheCfg);
+ // Without keyType hint
+ await putAndCheckLocalPeek(null, null, key, intValue, cacheName, cacheCfg);
+ }).
+ then(done).
+ catch(error => done.fail(error));
+ });
+
+ async function putAndCheckLocalPeek(keyType, valueType, key, value, cache_name = CACHE_NAME, cacheCfg = null) {
+ const cache = (await igniteClient.getOrCreateCache(cache_name, cacheCfg)).
+ setKeyType(keyType).
+ setValueType(valueType);
+
+ try {
+ await cache.put(key, value);
+ await checkLocalPeek(cache, key, value);
+ }
+ finally {
+ await cache.removeAll();
+ }
+ }
+
+ async function checkLocalPeek(cache, key, value) {
+ await TestingHelper.ensureStableTopology(igniteClient, cache, key);
+
+ const affHint = cache._createAffinityHint(key);
+ const bestSocket = await igniteClient._router._chooseConnection(affHint);
+
+ for (const socket of igniteClient._router._getAllConnections()) {
+ let localPeekVal = await cache._localPeek(socket, key);
+ if (socket == bestSocket) {
+ expect(localPeekVal).toBe(value, 'local peek did not return the expected value');
+ }
+ else {
+ expect(localPeekVal).toBe(null, 'local peek returned not null value');
+ }
+ }
+ }
+
+ function createCacheConfig(keyCfg = null) {
+ return new CacheConfiguration().
+ setWriteSynchronizationMode(CacheConfiguration.WRITE_SYNCHRONIZATION_MODE.FULL_SYNC).
+ setCacheMode(CacheConfiguration.CACHE_MODE.PARTITIONED).
+ setKeyConfigurations(keyCfg);
+ }
+
+ async function checkPartitionAwarenessActive(done) {
+ await TestingHelper.waitForConditionOrThrow(() => {
+ return igniteClient._router._partitionAwarenessActive;
+ }, 2000).
+ then(done).
+ catch(_e => done.fail("Partition Awareness hasn't been activated. Probably, the cluster doesn't support it"));
+ }
+
+ async function testSuiteCleanup(done) {
+ await TestingHelper.destroyCache(CACHE_NAME, done);
+ }
+});
diff --git a/spec/partition_awareness/PartitionAwarenessMultipleConnections.spec.js b/spec/partition_awareness/PartitionAwarenessMultipleConnections.spec.js
new file mode 100644
index 0000000..cfba699
--- /dev/null
+++ b/spec/partition_awareness/PartitionAwarenessMultipleConnections.spec.js
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+'use strict';
+
+require('jasmine-expect');
+
+const TestingHelper = require('../TestingHelper');
+const PartitionAwarenessTestUtils = require('./PartitionAwarenessTestUtils');
+const IgniteClient = require('apache-ignite-client');
+const ObjectType = IgniteClient.ObjectType;
+
+const CACHE_NAME = '__test_cache';
+const CUSTOM_AFFINITY_CACHE = 'custom-affinity';
+const PARTITIONED_0_CACHE = 'partitioned0';
+const PARTITIONED_1_CACHE = 'partitioned1';
+const PARTITIONED_3_CACHE = 'partitioned3';
+const REPLICATED_CACHE = 'replicated';
+const SERVER_NUM = 3;
+
+describe('partition awareness multiple connections test suite >', () => {
+ let igniteClient = null;
+
+ beforeAll((done) => {
+ Promise.resolve().
+ then(async () => {
+ await TestingHelper.init(true, SERVER_NUM, true);
+ igniteClient = TestingHelper.igniteClient;
+ await testSuiteCleanup(done);
+ }).
+ then(done).
+ catch(error => done.fail(error));
+ }, TestingHelper.TIMEOUT);
+
+ afterAll((done) => {
+ Promise.resolve().
+ then(async () => {
+ await testSuiteCleanup(done);
+ await TestingHelper.cleanUp();
+ }).
+ then(done).
+ catch(_error => done());
+ }, TestingHelper.TIMEOUT);
+
+ it('all cache operations with partition awareness and multiple connections', (done) => {
+ Promise.resolve().
+ then(async () => {
+ const cache = await getOrCreateCache(ObjectType.PRIMITIVE_TYPE.INTEGER, ObjectType.PRIMITIVE_TYPE.INTEGER);
+ await PartitionAwarenessTestUtils.testAllCacheOperations(cache);
+ }).
+ then(done).
+ catch(error => done.fail(error));
+ });
+
+ it('all cache operations with partition awareness and bad affinity', (done) => {
+ Promise.resolve().
+ then(async () => {
+ const cache = await getOrCreateCache(ObjectType.PRIMITIVE_TYPE.INTEGER, ObjectType.PRIMITIVE_TYPE.INTEGER, CUSTOM_AFFINITY_CACHE);
+ await PartitionAwarenessTestUtils.testRandomNode(cache);
+ }).
+ then(done).
+ catch(error => done.fail(error));
+ });
+
+ it('put with partition awareness and unknown cache', (done) => {
+ Promise.resolve().
+ then(async () => {
+ const cache = await getCache(ObjectType.PRIMITIVE_TYPE.INTEGER, ObjectType.PRIMITIVE_TYPE.INTEGER, '__unknown_cache_359f72tg');
+ let key = 42;
+ try {
+ await cache.put(key, key);
+ }
+ catch (error) {
+ expect(error.message).toContain('Cache does not exist');
+ return;
+ }
+ fail('Exception was expected');
+ }).
+ then(done).
+ catch(error => done.fail(error));
+ });
+
+ it('get or create null cache with partition awareness', (done) => {
+ Promise.resolve().
+ then(async () => {
+ try {
+ await getOrCreateCache(ObjectType.PRIMITIVE_TYPE.INTEGER, ObjectType.PRIMITIVE_TYPE.INTEGER, null);
+ }
+ catch (error) {
+ expect(error.toString()).toContain('"name" argument should not be empty');
+ return;
+ }
+ fail('Exception was expected');
+ }).
+ then(done).
+ catch(error => done.fail(error));
+ });
+
+ it('get or create null cache with partition awareness', (done) => {
+ Promise.resolve().
+ then(async () => {
+ try {
+ await getOrCreateCache(ObjectType.PRIMITIVE_TYPE.INTEGER, ObjectType.PRIMITIVE_TYPE.INTEGER, null);
+ }
+ catch (error) {
+ expect(error.toString()).toContain('"name" argument should not be empty');
+ return;
+ }
+ fail('Exception was expected');
+ }).
+ then(done).
+ catch(error => done.fail(error));
+ });
+
+ it('all cache operations with partition awareness and partitioned cache with 0 backups', (done) => {
+ Promise.resolve().
+ then(async () => {
+ const cache = await getCache(ObjectType.PRIMITIVE_TYPE.INTEGER, ObjectType.PRIMITIVE_TYPE.INTEGER, PARTITIONED_0_CACHE);
+
+ // Update partition mapping
+ await TestingHelper.ensureStableTopology(igniteClient, cache, 0, true);
+
+ await PartitionAwarenessTestUtils.testAllCacheOperationsOnTheSameKey(cache, 42);
+ }).
+ then(done).
+ catch(error => done.fail(error));
+ });
+
+ it('all cache operations with partition awareness and partitioned cache with 1 backups', (done) => {
+ Promise.resolve().
+ then(async () => {
+ const cache = await getCache(ObjectType.PRIMITIVE_TYPE.INTEGER, ObjectType.PRIMITIVE_TYPE.INTEGER, PARTITIONED_1_CACHE);
+
+ // Update partition mapping
+ await TestingHelper.ensureStableTopology(igniteClient, cache, 0, true);
+
+ await PartitionAwarenessTestUtils.testAllCacheOperationsOnTheSameKey(cache, 100500);
+ }).
+ then(done).
+ catch(error => done.fail(error));
+ });
+
+ it('all cache operations with partition awareness and partitioned cache with 3 backups', (done) => {
+ Promise.resolve().
+ then(async () => {
+ const cache = await getCache(ObjectType.PRIMITIVE_TYPE.INTEGER, ObjectType.PRIMITIVE_TYPE.INTEGER, PARTITIONED_3_CACHE);
+
+ // Update partition mapping
+ await TestingHelper.ensureStableTopology(igniteClient, cache, 0, true);
+
+ await PartitionAwarenessTestUtils.testAllCacheOperationsOnTheSameKey(cache, 1337);
+ }).
+ then(done).
+ catch(error => done.fail(error));
+ });
+
+ it('all cache operations with partition awareness and replicated cache', (done) => {
+ Promise.resolve().
+ then(async () => {
+ const cache = await getCache(ObjectType.PRIMITIVE_TYPE.INTEGER, ObjectType.PRIMITIVE_TYPE.INTEGER, REPLICATED_CACHE);
+ await PartitionAwarenessTestUtils.testAllCacheOperations(cache);
+ }).
+ then(done).
+ catch(error => done.fail(error));
+ });
+
+ async function getOrCreateCache(keyType, valueType, cacheName = CACHE_NAME, cacheCfg = null) {
+ return await PartitionAwarenessTestUtils.getOrCreateCache(igniteClient, keyType, valueType, cacheName, cacheCfg);
+ }
+
+ async function getCache(keyType, valueType, cacheName = CACHE_NAME, cacheCfg = null) {
+ return (await igniteClient.getCache(cacheName, cacheCfg)).
+ setKeyType(keyType).
+ setValueType(valueType);
+ }
+
+ async function clearCache(name) {
+ await (await igniteClient.getCache(name)).clear();
+ }
+
+ async function testSuiteCleanup(done) {
+ await clearCache(CUSTOM_AFFINITY_CACHE);
+ await clearCache(PARTITIONED_0_CACHE);
+ await clearCache(PARTITIONED_1_CACHE);
+ await clearCache(PARTITIONED_3_CACHE);
+ await clearCache(REPLICATED_CACHE);
+ await TestingHelper.destroyCache(CACHE_NAME, done);
+ }
+});
diff --git a/spec/partition_awareness/PartitionAwarenessSingleServer.spec.js b/spec/partition_awareness/PartitionAwarenessSingleServer.spec.js
new file mode 100644
index 0000000..7c72fca
--- /dev/null
+++ b/spec/partition_awareness/PartitionAwarenessSingleServer.spec.js
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+'use strict';
+
+require('jasmine-expect');
+
+const TestingHelper = require('../TestingHelper');
+const PartitionAwarenessTestUtils = require('./PartitionAwarenessTestUtils');
+const IgniteClient = require('apache-ignite-client');
+const ObjectType = IgniteClient.ObjectType;
+
+const CACHE_NAME = '__test_cache';
+const SERVER_NUM = 3;
+
+describe('partition awareness with single server test suite >', () => {
+ let igniteClient = null;
+
+ beforeAll((done) => {
+ Promise.resolve().
+ then(async () => {
+ let endpoints = TestingHelper.getEndpoints(SERVER_NUM);
+ await TestingHelper.init(true, SERVER_NUM, true, [endpoints[0]]);
+ igniteClient = TestingHelper.igniteClient;
+ await testSuiteCleanup(done);
+ }).
+ then(done).
+ catch(error => done.fail(error));
+ }, TestingHelper.TIMEOUT);
+
+ afterAll((done) => {
+ Promise.resolve().
+ then(async () => {
+ await testSuiteCleanup(done);
+ await TestingHelper.cleanUp();
+ }).
+ then(done).
+ catch(_error => done());
+ }, TestingHelper.TIMEOUT);
+
+ it('all cache operations with partition aware client and single connection', (done) => {
+ Promise.resolve().
+ then(async () => {
+ const cache = await getCache(ObjectType.PRIMITIVE_TYPE.INTEGER, ObjectType.PRIMITIVE_TYPE.INTEGER);
+ await PartitionAwarenessTestUtils.testSameNode(cache);
+ }).
+ then(done).
+ catch(error => done.fail(error));
+ });
+
+ async function getCache(keyType, valueType, cacheName = CACHE_NAME, cacheCfg = null) {
+ return await PartitionAwarenessTestUtils.getOrCreateCache(igniteClient, keyType, valueType, cacheName, cacheCfg);
+ }
+
+ async function testSuiteCleanup(done) {
+ await TestingHelper.destroyCache(CACHE_NAME, done);
+ }
+});
diff --git a/spec/partition_awareness/PartitionAwarenessTestUtils.js b/spec/partition_awareness/PartitionAwarenessTestUtils.js
new file mode 100644
index 0000000..2e2ef1a
--- /dev/null
+++ b/spec/partition_awareness/PartitionAwarenessTestUtils.js
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+'use strict';
+
+require('jasmine-expect');
+
+const TestingHelper = require('../TestingHelper');
+const IgniteClient = require('apache-ignite-client');
+const CacheConfiguration = IgniteClient.CacheConfiguration;
+
+// Helper class for testing partition awareness feature of apache-ignite-client library.
+class PartitionAwarenessTestUtils {
+ static createCacheConfig() {
+ return new CacheConfiguration().
+ setWriteSynchronizationMode(CacheConfiguration.WRITE_SYNCHRONIZATION_MODE.FULL_SYNC).
+ setCacheMode(CacheConfiguration.CACHE_MODE.PARTITIONED);
+ }
+
+ static async getOrCreateCache(igniteClient, keyType, valueType, cacheName = CACHE_NAME, cacheCfg = null) {
+ if (!cacheCfg)
+ cacheCfg = PartitionAwarenessTestUtils.createCacheConfig();
+
+ return (await igniteClient.getOrCreateCache(cacheName, cacheCfg)).
+ setKeyType(keyType).
+ setValueType(valueType);
+ }
+
+ static async testRandomNode(cache) {
+ const key = 42;
+
+ await cache.put(key, key);
+ const firstNodeId = await TestingHelper.getRequestGridIdx('Put');
+ expect(firstNodeId).not.toEqual(-1, 'Can not locate node for an operation.');
+
+ for (let i = 0; i < 20; ++i) {
+ await cache.put(key, key);
+ const anotherNodeId = await TestingHelper.getRequestGridIdx('Put');
+ expect(anotherNodeId).not.toEqual(-1, 'Can not locate node for an operation.');
+
+ if (firstNodeId == anotherNodeId)
+ return;
+ }
+
+ throw 'All requests go to the same server when random was expected';
+ }
+
+ static async testSameNode(cache) {
+ let key = 1337;
+
+ await cache.put(key, key);
+ const firstNodeId = await TestingHelper.getRequestGridIdx('Put');
+ expect(firstNodeId).not.toEqual(-1, 'Can not locate node for an operation.');
+
+ for (let i = 0; i < 20; ++i) {
+ key = key + 1337;
+ await cache.put(key, key);
+ const anotherNodeId = await TestingHelper.getRequestGridIdx('Put');
+ expect(anotherNodeId).not.toEqual(-1, 'Can not locate node for an operation.');
+
+ if (firstNodeId != anotherNodeId)
+ throw 'All requests expected to go to the same server';
+ }
+ }
+
+ static async testAllCacheOperations(cache) {
+ const key = 1;
+ const key2 = 2;
+
+ // Put/Get
+ await cache.put(key, key);
+ expect(await cache.get(key)).toEqual(key);
+
+ // Replace
+ let res = await cache.replace(key, key2);
+ expect(res).toBe(true);
+ expect(await cache.get(key)).toEqual(key2);
+
+ // ContainsKey
+ res = await cache.containsKey(key2);
+ expect(res).toBe(false);
+
+ await cache.put(key2, key2);
+ res = await cache.containsKey(key2);
+ expect(res).toBe(true);
+
+ // Clear
+ await cache.clearKey(key2);
+ expect(await cache.get(key2)).toBeNull;
+
+ // GetAndPut
+ await cache.put(key, key);
+ res = await cache.getAndPut(key, key2);
+ expect(res).toEqual(key);
+ expect(await cache.get(key)).toEqual(key2);
+
+ // GetAndPutIfAbsent
+ await cache.clearKey(key);
+ res = await cache.getAndPutIfAbsent(key, key);
+ let res2 = await cache.getAndPutIfAbsent(key, key2);
+ expect(res).toBeNull();
+ expect(res2).toEqual(key);
+ expect(await cache.get(key)).toEqual(key);
+
+ // PutIfAbsent
+ await cache.clearKey(key);
+ res = await cache.putIfAbsent(key, key);
+ res2 = await cache.putIfAbsent(key, key2);
+ expect(res).toBe(true);
+ expect(res2).toBe(false);
+ expect(await cache.get(key)).toEqual(key);
+
+ // GetAndRemove
+ await cache.put(key, key);
+ res = await cache.getAndRemove(key);
+ expect(res).toEqual(key);
+ expect(await cache.get(key)).toBeNull();
+
+ // GetAndReplace
+ await cache.put(key, key);
+ res = await cache.getAndReplace(key, key2);
+ expect(res).toEqual(key);
+ expect(await cache.get(key)).toEqual(key2);
+
+ // RemoveKey
+ await cache.put(key, key);
+ await cache.removeKey(key);
+ expect(await cache.get(key)).toBeNull();
+
+ // RemoveIfEquals
+ await cache.put(key, key);
+ res = await cache.removeIfEquals(key, key2);
+ res2 = await cache.removeIfEquals(key, key);
+ expect(res).toBe(false);
+ expect(res2).toBe(true);
+ expect(await cache.get(key)).toBeNull();
+
+ // Replace
+ await cache.put(key, key);
+ await cache.replace(key, key2);
+ expect(await cache.get(key)).toEqual(key2);
+
+ // ReplaceIfEquals
+ await cache.put(key, key);
+ res = await cache.replaceIfEquals(key, key2, key2);
+ res2 = await cache.replaceIfEquals(key, key, key2);
+ expect(res).toBe(false);
+ expect(res2).toBe(true);
+ expect(await cache.get(key)).toEqual(key2);
+ }
+
+ static async expectOnTheNode(expectedNodeId, req) {
+ const actualNodeId = await TestingHelper.getRequestGridIdx(req);
+ expect(actualNodeId).toEqual(expectedNodeId);
+ }
+
+ static async testAllCacheOperationsOnTheSameKey(cache, key) {
+ const value1 = 42;
+ const value2 = 100500;
+
+ // Put/Get
+ await cache.put(key, value1);
+ const expectedNodeId = await TestingHelper.getRequestGridIdx('Put');
+
+ expect(await cache.get(key)).toEqual(value1);
+ await PartitionAwarenessTestUtils.expectOnTheNode(expectedNodeId, 'Get');
+
+ // Replace
+ let res = await cache.replace(key, value2);
+ await PartitionAwarenessTestUtils.expectOnTheNode(expectedNodeId, 'Replace');
+
+ expect(res).toBe(true);
+ expect(await cache.get(key)).toEqual(value2);
+ await PartitionAwarenessTestUtils.expectOnTheNode(expectedNodeId, 'Get');
+
+ // Clear
+ await cache.clearKey(key);
+ await PartitionAwarenessTestUtils.expectOnTheNode(expectedNodeId, 'ClearKey');
+ expect(await cache.get(key)).toBeNull;
+ await PartitionAwarenessTestUtils.expectOnTheNode(expectedNodeId, 'Get');
+
+ // ContainsKey
+ res = await cache.containsKey(key);
+ await PartitionAwarenessTestUtils.expectOnTheNode(expectedNodeId, 'ContainsKey');
+ expect(res).toBe(false);
+
+ // GetAndPut
+ await cache.put(key, value1);
+ await TestingHelper.getRequestGridIdx('Put');
+
+ res = await cache.getAndPut(key, value2);
+ await TestingHelper.getRequestGridIdx('GetAndPut');
+
+ expect(res).toEqual(value1);
+ expect(await cache.get(key)).toEqual(value2);
+ await PartitionAwarenessTestUtils.expectOnTheNode(expectedNodeId, 'Get');
+
+ // GetAndPutIfAbsent
+ await cache.clearKey(key);
+ await PartitionAwarenessTestUtils.expectOnTheNode(expectedNodeId, 'ClearKey');
+
+ res = await cache.getAndPutIfAbsent(key, value1);
+ await PartitionAwarenessTestUtils.expectOnTheNode(expectedNodeId, 'GetAndPutIfAbsent');
+
+ let res2 = await cache.getAndPutIfAbsent(key, value2);
+ await PartitionAwarenessTestUtils.expectOnTheNode(expectedNodeId, 'GetAndPutIfAbsent');
+
+ expect(res).toBeNull();
+ expect(res2).toEqual(value1);
+ expect(await cache.get(key)).toEqual(value1);
+ await PartitionAwarenessTestUtils.expectOnTheNode(expectedNodeId, 'Get');
+
+ // PutIfAbsent
+ await cache.clearKey(key);
+ await PartitionAwarenessTestUtils.expectOnTheNode(expectedNodeId, 'ClearKey');
+
+ res = await cache.putIfAbsent(key, value1);
+ await PartitionAwarenessTestUtils.expectOnTheNode(expectedNodeId, 'PutIfAbsent');
+
+ res2 = await cache.putIfAbsent(key, value2);
+ await PartitionAwarenessTestUtils.expectOnTheNode(expectedNodeId, 'PutIfAbsent');
+
+ expect(res).toBe(true);
+ expect(res2).toBe(false);
+ expect(await cache.get(key)).toEqual(value1);
+ await PartitionAwarenessTestUtils.expectOnTheNode(expectedNodeId, 'Get');
+
+ // GetAndRemove
+ await cache.put(key, value1);
+ await PartitionAwarenessTestUtils.expectOnTheNode(expectedNodeId, 'Put');
+
+ res = await cache.getAndRemove(key);
+ await PartitionAwarenessTestUtils.expectOnTheNode(expectedNodeId, 'GetAndRemove');
+
+ expect(res).toEqual(value1);
+ expect(await cache.get(key)).toBeNull();
+ await PartitionAwarenessTestUtils.expectOnTheNode(expectedNodeId, 'Get');
+
+ // GetAndReplace
+ await cache.put(key, value1);
+ await PartitionAwarenessTestUtils.expectOnTheNode(expectedNodeId, 'Put');
+
+ res = await cache.getAndReplace(key, value2);
+ await PartitionAwarenessTestUtils.expectOnTheNode(expectedNodeId, 'GetAndReplace');
+
+ expect(res).toEqual(value1);
+ expect(await cache.get(key)).toEqual(value2);
+ await PartitionAwarenessTestUtils.expectOnTheNode(expectedNodeId, 'Get');
+
+ // RemoveKey
+ await cache.put(key, value1);
+ await PartitionAwarenessTestUtils.expectOnTheNode(expectedNodeId, 'Put');
+
+ await cache.removeKey(key);
+ await PartitionAwarenessTestUtils.expectOnTheNode(expectedNodeId, 'RemoveKey');
+
+ expect(await cache.get(key)).toBeNull();
+ await PartitionAwarenessTestUtils.expectOnTheNode(expectedNodeId, 'Get');
+
+ // RemoveIfEquals
+ await cache.put(key, value1);
+ await PartitionAwarenessTestUtils.expectOnTheNode(expectedNodeId, 'Put');
+
+ res = await cache.removeIfEquals(key, value2);
+ await PartitionAwarenessTestUtils.expectOnTheNode(expectedNodeId, 'RemoveIfEquals');
+
+ res2 = await cache.removeIfEquals(key, value1);
+ await PartitionAwarenessTestUtils.expectOnTheNode(expectedNodeId, 'RemoveIfEquals');
+
+ expect(res).toBe(false);
+ expect(res2).toBe(true);
+ expect(await cache.get(key)).toBeNull();
+ await PartitionAwarenessTestUtils.expectOnTheNode(expectedNodeId, 'Get');
+
+ // Replace
+ await cache.put(key, value1);
+ await PartitionAwarenessTestUtils.expectOnTheNode(expectedNodeId, 'Put');
+
+ await cache.replace(key, value2);
+ await PartitionAwarenessTestUtils.expectOnTheNode(expectedNodeId, 'Replace');
+
+ expect(await cache.get(key)).toEqual(value2);
+ await PartitionAwarenessTestUtils.expectOnTheNode(expectedNodeId, 'Get');
+
+ // ReplaceIfEquals
+ await cache.put(key, value1);
+ await PartitionAwarenessTestUtils.expectOnTheNode(expectedNodeId, 'Put');
+
+ res = await cache.replaceIfEquals(key, value2, value2);
+ await PartitionAwarenessTestUtils.expectOnTheNode(expectedNodeId, 'ReplaceIfEquals');
+
+ res2 = await cache.replaceIfEquals(key, value1, value2);
+ await PartitionAwarenessTestUtils.expectOnTheNode(expectedNodeId, 'ReplaceIfEquals');
+
+ expect(res).toBe(false);
+ expect(res2).toBe(true);
+ expect(await cache.get(key)).toEqual(value2);
+ await PartitionAwarenessTestUtils.expectOnTheNode(expectedNodeId, 'Get');
+ }
+}
+
+module.exports = PartitionAwarenessTestUtils;