IGNITE-9258: Node.js now can handle more than one client in the same app
This closes #4554
diff --git a/lib/BinaryObject.js b/lib/BinaryObject.js
index 2cc6be6..fb139da 100644
--- a/lib/BinaryObject.js
+++ b/lib/BinaryObject.js
@@ -25,7 +25,6 @@
const BinaryType = require('./internal/BinaryType');
const BinaryField = require('./internal/BinaryType').BinaryField;
const BinaryTypeBuilder = require('./internal/BinaryType').BinaryTypeBuilder;
-const BinaryWriter = require('./internal/BinaryWriter');
const ArgumentChecker = require('./internal/ArgumentChecker');
const Logger = require('./internal/Logger');
@@ -283,28 +282,28 @@
/**
* @ignore
*/
- static async _fromBuffer(buffer) {
+ static async _fromBuffer(communicator, buffer) {
const result = new BinaryObject(new ComplexObjectType({})._typeName);
result._buffer = buffer;
result._startPos = buffer.position;
- await result._read();
+ await result._read(communicator);
return result;
}
/**
* @ignore
*/
- async _write(buffer) {
+ async _write(communicator, buffer) {
if (this._buffer && !this._modified) {
buffer.writeBuffer(this._buffer.buffer, this._startPos, this._startPos + this._length);
}
else {
- await this._typeBuilder.finalize();
+ await this._typeBuilder.finalize(communicator);
this._startPos = buffer.position;
buffer.position = this._startPos + HEADER_LENGTH;
// write fields
for (let field of this._fields.values()) {
- await field._writeValue(buffer, this._typeBuilder.getField(field.id).typeCode);
+ await field._writeValue(communicator, buffer, this._typeBuilder.getField(field.id).typeCode);
}
this._schemaOffset = buffer.position - this._startPos;
// write schema
@@ -351,8 +350,8 @@
/**
* @ignore
*/
- async _read() {
- await this._readHeader();
+ async _read(communicator) {
+ await this._readHeader(communicator);
this._buffer.position = this._startPos + this._schemaOffset;
const fieldOffsets = new Array();
const fieldIds = this._typeBuilder._schema.fieldIds;
@@ -382,7 +381,7 @@
offset = fieldOffsets[i][1];
nextOffset = i + 1 < fieldOffsets.length ? fieldOffsets[i + 1][1] : this._schemaOffset;
field = BinaryObjectField._fromBuffer(
- this._buffer, this._startPos + offset, nextOffset - offset, fieldId);
+ communicator,this._buffer, this._startPos + offset, nextOffset - offset, fieldId);
this._fields.set(field.id, field);
}
this._buffer.position = this._startPos + this._length;
@@ -391,7 +390,7 @@
/**
* @ignore
*/
- async _readHeader() {
+ async _readHeader(communicator) {
// type code
this._buffer.readByte();
// version
@@ -419,7 +418,7 @@
BinaryUtils.TYPE_CODE.SHORT :
BinaryUtils.TYPE_CODE.INTEGER;
- if (BinaryObject._isFlagSet(FLAG_HAS_RAW_DATA)) {
+ if (BinaryObject._isFlagSet(flags, FLAG_HAS_RAW_DATA)) {
throw Errors.IgniteClientError.serializationError(
false, 'complex objects with raw data are not supported');
}
@@ -427,7 +426,7 @@
throw Errors.IgniteClientError.serializationError(
false, 'schema is absent for object with compact footer');
}
- this._typeBuilder = await BinaryTypeBuilder.fromTypeId(typeId, schemaId, hasSchema);
+ this._typeBuilder = await BinaryTypeBuilder.fromTypeId(communicator, typeId, schemaId, hasSchema);
}
}
@@ -460,31 +459,35 @@
async getValue(type = null) {
if (this._value === undefined || this._buffer && this._type !== type) {
this._buffer.position = this._offset;
- const BinaryReader = require('./internal/BinaryReader');
- this._value = await BinaryReader.readObject(this._buffer, type);
+ this._value = await this._communicator.readObject(this._buffer, type);
this._type = type;
}
return this._value;
}
- static _fromBuffer(buffer, offset, length, id) {
+ static _fromBuffer(communicator, buffer, offset, length, id) {
const result = new BinaryObjectField(null);
result._id = id;
+ result._communicator = communicator;
result._buffer = buffer;
result._offset = offset;
result._length = length;
return result;
}
- async _writeValue(buffer, expectedTypeCode) {
+ async _writeValue(communicator, buffer, expectedTypeCode) {
const offset = buffer.position;
- if (this._buffer) {
+ if (this._buffer && this._communicator === communicator) {
buffer.writeBuffer(this._buffer.buffer, this._offset, this._offset + this._length);
}
else {
+ if (this._value === undefined) {
+ await this.getValue();
+ }
BinaryUtils.checkCompatibility(this._value, expectedTypeCode);
- await BinaryWriter.writeObject(buffer, this._value, this._type);
+ await communicator.writeObject(buffer, this._value, this._type);
}
+ this._communicator = communicator;
this._buffer = buffer;
this._length = buffer.position - offset;
this._offset = offset;
diff --git a/lib/CacheClient.js b/lib/CacheClient.js
index b76471f..f59910b 100644
--- a/lib/CacheClient.js
+++ b/lib/CacheClient.js
@@ -18,8 +18,6 @@
'use strict';
const BinaryUtils = require('./internal/BinaryUtils');
-const BinaryReader = require('./internal/BinaryReader');
-const BinaryWriter = require('./internal/BinaryWriter');
const ArgumentChecker = require('./internal/ArgumentChecker');
const SqlQuery = require('./Query').SqlQuery;
const SqlFieldsQuery = require('./Query').SqlFieldsQuery;
@@ -156,7 +154,7 @@
ArgumentChecker.notEmpty(keys, 'keys');
ArgumentChecker.hasType(keys, 'keys', false, Array);
let result = null;
- await this._socket.send(
+ await this._communicator.send(
BinaryUtils.OPERATION.CACHE_GET_ALL,
async (payload) => {
this._writeCacheInfo(payload);
@@ -167,8 +165,8 @@
result = new Array(resultCount);
for (let i = 0; i < resultCount; i++) {
result[i] = new CacheEntry(
- await BinaryReader.readObject(payload, this._getKeyType()),
- await BinaryReader.readObject(payload, this._getValueType()));
+ await this._communicator.readObject(payload, this._getKeyType()),
+ await this._communicator.readObject(payload, this._getValueType()));
}
});
return result;
@@ -206,7 +204,7 @@
async putAll(entries) {
ArgumentChecker.notEmpty(entries, 'entries');
ArgumentChecker.hasType(entries, 'entries', true, CacheEntry);
- await this._socket.send(
+ await this._communicator.send(
BinaryUtils.OPERATION.CACHE_PUT_ALL,
async (payload) => {
this._writeCacheInfo(payload);
@@ -374,12 +372,12 @@
ArgumentChecker.notNull(value, 'value');
ArgumentChecker.notNull(newValue, 'newValue');
let result;
- await this._socket.send(
+ await this._communicator.send(
BinaryUtils.OPERATION.CACHE_REPLACE_IF_EQUALS,
async (payload) => {
this._writeCacheInfo(payload);
await this._writeKeyValue(payload, key, value);
- await BinaryWriter.writeObject(payload, newValue, this._getValueType());
+ await this._communicator.writeObject(payload, newValue, this._getValueType());
},
async (payload) => {
result = payload.readBoolean();
@@ -395,7 +393,7 @@
* @throws {IgniteClientError} if error.
*/
async clear() {
- await this._socket.send(
+ await this._communicator.send(
BinaryUtils.OPERATION.CACHE_CLEAR,
async (payload) => {
this._writeCacheInfo(payload);
@@ -481,7 +479,7 @@
* @throws {IgniteClientError} if error.
*/
async removeAll() {
- await this._socket.send(
+ await this._communicator.send(
BinaryUtils.OPERATION.CACHE_REMOVE_ALL,
async (payload) => {
this._writeCacheInfo(payload);
@@ -502,7 +500,7 @@
async getSize(...peekModes) {
ArgumentChecker.hasValueFrom(peekModes, 'peekModes', true, CacheClient.PEEK_MODE);
let result;
- await this._socket.send(
+ await this._communicator.send(
BinaryUtils.OPERATION.CACHE_GET_SIZE,
async (payload) => {
this._writeCacheInfo(payload);
@@ -537,14 +535,14 @@
ArgumentChecker.hasType(query, 'query', false, SqlQuery, SqlFieldsQuery, ScanQuery);
let value = null;
- await this._socket.send(
+ await this._communicator.send(
query._operation,
async (payload) => {
this._writeCacheInfo(payload);
- await query._write(payload);
+ await query._write(this._communicator, payload);
},
async (payload) => {
- value = await query._getCursor(this._socket, payload, this._keyType, this._valueType);
+ value = await query._getCursor(this._communicator, payload, this._keyType, this._valueType);
});
return value;
}
@@ -554,13 +552,13 @@
/**
* @ignore
*/
- constructor(name, config, socket) {
+ constructor(name, config, communicator) {
this._name = name;
this._cacheId = CacheClient._calculateId(this._name);
this._config = config;
this._keyType = null;
this._valueType = null;
- this._socket = socket;
+ this._communicator = communicator;
}
/**
@@ -582,8 +580,8 @@
* @ignore
*/
async _writeKeyValue(payload, key, value) {
- await BinaryWriter.writeObject(payload, key, this._getKeyType());
- await BinaryWriter.writeObject(payload, value, this._getValueType());
+ await this._communicator.writeObject(payload, key, this._getKeyType());
+ await this._communicator.writeObject(payload, value, this._getValueType());
}
/**
@@ -592,7 +590,7 @@
async _writeKeys(payload, keys) {
payload.writeInteger(keys.length);
for (let key of keys) {
- await BinaryWriter.writeObject(payload, key, this._getKeyType());
+ await this._communicator.writeObject(payload, key, this._getKeyType());
}
}
@@ -616,7 +614,7 @@
async _writeKeyValueOp(operation, key, value, payloadReader = null) {
ArgumentChecker.notNull(key, 'key');
ArgumentChecker.notNull(value, 'value');
- await this._socket.send(
+ await this._communicator.send(
operation,
async (payload) => {
this._writeCacheInfo(payload);
@@ -633,7 +631,7 @@
await this._writeKeyValueOp(
operation, key, value,
async (payload) => {
- result = await BinaryReader.readObject(payload, this._getValueType());
+ result = await this._communicator.readObject(payload, this._getValueType());
});
return result;
}
@@ -656,11 +654,11 @@
*/
async _writeKeyOp(operation, key, payloadReader = null) {
ArgumentChecker.notNull(key, 'key');
- await this._socket.send(
+ await this._communicator.send(
operation,
async (payload) => {
this._writeCacheInfo(payload);
- await BinaryWriter.writeObject(payload, key, this._getKeyType());
+ await this._communicator.writeObject(payload, key, this._getKeyType());
},
payloadReader);
}
@@ -673,7 +671,7 @@
await this._writeKeyOp(
operation, key,
async (payload) => {
- value = await BinaryReader.readObject(payload, this._getValueType());
+ value = await this._communicator.readObject(payload, this._getValueType());
});
return value;
}
@@ -697,7 +695,7 @@
async _writeKeysOp(operation, keys, payloadReader = null) {
ArgumentChecker.notEmpty(keys, 'keys');
ArgumentChecker.hasType(keys, 'keys', false, Array);
- await this._socket.send(
+ await this._communicator.send(
operation,
async (payload) => {
this._writeCacheInfo(payload);
diff --git a/lib/CacheConfiguration.js b/lib/CacheConfiguration.js
index a4e4574..ccf20b9 100644
--- a/lib/CacheConfiguration.js
+++ b/lib/CacheConfiguration.js
@@ -20,8 +20,7 @@
const ComplexObjectType = require('./ObjectType').ComplexObjectType;
const ObjectArrayType = require('./ObjectType').ObjectArrayType;
const BinaryUtils = require('./internal/BinaryUtils');
-const BinaryReader = require('./internal/BinaryReader');
-const BinaryWriter = require('./internal/BinaryWriter');
+const BinaryCommunicator = require('./internal/BinaryCommunicator');
const ArgumentChecker = require('./internal/ArgumentChecker');
const Errors = require('./Errors');
@@ -94,17 +93,17 @@
/**
* @ignore
*/
- async _write(buffer) {
- await BinaryWriter.writeString(buffer, this._typeName);
- await BinaryWriter.writeString(buffer, this._affinityKeyFieldName);
+ async _write(communicator, buffer) {
+ BinaryCommunicator.writeString(buffer, this._typeName);
+ BinaryCommunicator.writeString(buffer, this._affinityKeyFieldName);
}
/**
* @ignore
*/
- async _read(buffer) {
- this._typeName = await BinaryReader.readObject(buffer);
- this._affinityKeyFieldName = await BinaryReader.readObject(buffer);
+ async _read(communicator, buffer) {
+ this._typeName = BinaryCommunicator.readString(buffer);
+ this._affinityKeyFieldName = BinaryCommunicator.readString(buffer);
}
}
@@ -306,27 +305,27 @@
/**
* @ignore
*/
- async _write(buffer) {
- await BinaryWriter.writeString(buffer, this._keyTypeName);
- await BinaryWriter.writeString(buffer, this._valueTypeName);
- await BinaryWriter.writeString(buffer, this._tableName);
- await BinaryWriter.writeString(buffer, this._keyFieldName);
- await BinaryWriter.writeString(buffer, this._valueFieldName);
- await this._writeSubEntities(buffer, this._fields);
- await this._writeAliases(buffer);
- await this._writeSubEntities(buffer, this._indexes);
+ async _write(communicator, buffer) {
+ BinaryCommunicator.writeString(buffer, this._keyTypeName);
+ BinaryCommunicator.writeString(buffer, this._valueTypeName);
+ BinaryCommunicator.writeString(buffer, this._tableName);
+ BinaryCommunicator.writeString(buffer, this._keyFieldName);
+ BinaryCommunicator.writeString(buffer, this._valueFieldName);
+ await this._writeSubEntities(communicator, buffer, this._fields);
+ await this._writeAliases(communicator, buffer);
+ await this._writeSubEntities(communicator, buffer, this._indexes);
}
/**
* @ignore
*/
- async _writeAliases(buffer) {
+ async _writeAliases(communicator, buffer) {
const length = this._aliases ? this._aliases.size : 0;
buffer.writeInteger(length);
if (length > 0) {
for (let [key, value] of this._aliases.entries()) {
- await BinaryWriter.writeString(buffer, key);
- await BinaryWriter.writeString(buffer, value);
+ BinaryCommunicator.writeString(buffer, key);
+ BinaryCommunicator.writeString(buffer, value);
}
}
}
@@ -334,12 +333,12 @@
/**
* @ignore
*/
- async _writeSubEntities(buffer, entities) {
+ async _writeSubEntities(communicator, buffer, entities) {
const length = entities ? entities.length : 0;
buffer.writeInteger(length);
if (length > 0) {
for (let entity of entities) {
- await entity._write(buffer);
+ await entity._write(communicator, buffer);
}
}
}
@@ -347,28 +346,28 @@
/**
* @ignore
*/
- async _read(buffer) {
- this._keyTypeName = await BinaryReader.readObject(buffer);
- this._valueTypeName = await BinaryReader.readObject(buffer);
- this._tableName = await BinaryReader.readObject(buffer);
- this._keyFieldName = await BinaryReader.readObject(buffer);
- this._valueFieldName = await BinaryReader.readObject(buffer);
- this._fields = await this._readSubEntities(buffer, QueryField);
- await this._readAliases(buffer);
- this._indexes = await this._readSubEntities(buffer, QueryIndex);
+ async _read(communicator, buffer) {
+ this._keyTypeName = await communicator.readObject(buffer);
+ this._valueTypeName = await communicator.readObject(buffer);
+ this._tableName = await communicator.readObject(buffer);
+ this._keyFieldName = await communicator.readObject(buffer);
+ this._valueFieldName = await communicator.readObject(buffer);
+ this._fields = await this._readSubEntities(communicator, buffer, QueryField);
+ await this._readAliases(communicator, buffer);
+ this._indexes = await this._readSubEntities(communicator, buffer, QueryIndex);
}
/**
* @ignore
*/
- async _readSubEntities(buffer, objectConstructor) {
+ async _readSubEntities(communicator, buffer, objectConstructor) {
const length = buffer.readInteger(buffer);
const result = new Array(length);
if (length > 0) {
let res;
for (let i = 0; i < length; i++) {
res = new objectConstructor();
- await res._read(buffer);
+ await res._read(communicator, buffer);
result[i] = res;
}
}
@@ -378,13 +377,13 @@
/**
* @ignore
*/
- async _readAliases(buffer) {
+ async _readAliases(communicator, buffer) {
const length = buffer.readInteger(buffer);
this._aliases = new Map();
if (length > 0) {
let res;
for (let i = 0; i < length; i++) {
- this._aliases.set(await BinaryReader.readObject(buffer), await BinaryReader.readObject(buffer));
+ this._aliases.set(await communicator.readObject(buffer), await communicator.readObject(buffer));
}
}
}
@@ -416,6 +415,7 @@
this._precision = -1;
this._scale = -1;
this._valueType = null;
+ this._communicator = null;
this._buffer = null;
this._index = null;
}
@@ -538,7 +538,7 @@
if (this._buffer) {
const position = this._buffer.position;
this._buffer.position = this._index;
- const result = await BinaryReader.readObject(this._buffer, valueType);
+ const result = await this._communicator.readObject(this._buffer, valueType);
this._buffer.position = position;
return result;
}
@@ -600,12 +600,12 @@
/**
* @ignore
*/
- async _write(buffer) {
- await BinaryWriter.writeString(buffer, this._name);
- await BinaryWriter.writeString(buffer, this._typeName);
+ async _write(communicator, buffer) {
+ BinaryCommunicator.writeString(buffer, this._name);
+ BinaryCommunicator.writeString(buffer, this._typeName);
buffer.writeBoolean(this._isKeyField);
buffer.writeBoolean(this._isNotNull);
- await BinaryWriter.writeObject(buffer, this._defaultValue ? this._defaultValue : null, this._valueType);
+ await communicator.writeObject(buffer, this._defaultValue ? this._defaultValue : null, this._valueType);
buffer.writeInteger(this._precision);
buffer.writeInteger(this._scale);
}
@@ -613,15 +613,16 @@
/**
* @ignore
*/
- async _read(buffer) {
- this._name = await BinaryReader.readObject(buffer);
- this._typeName = await BinaryReader.readObject(buffer);
+ async _read(communicator, buffer) {
+ this._name = await communicator.readObject(buffer);
+ this._typeName = await communicator.readObject(buffer);
this._isKeyField = buffer.readBoolean();
this._isNotNull = buffer.readBoolean();
this._defaultValue = undefined;
+ this._communicator = communicator;
this._buffer = buffer;
this._index = buffer.position;
- await BinaryReader.readObject(buffer);
+ await communicator.readObject(buffer);
this._precision = buffer.readInteger();
this._scale = buffer.readInteger();
}
@@ -732,7 +733,7 @@
*
* @return {number}
*/
- getInlineSize() {
+ getInlineSize() {
return this._inlineSize;
}
@@ -762,8 +763,8 @@
/**
* @ignore
*/
- async _write(buffer) {
- await BinaryWriter.writeString(buffer, this._name);
+ async _write(communicator, buffer) {
+ BinaryCommunicator.writeString(buffer, this._name);
buffer.writeByte(this._type);
buffer.writeInteger(this._inlineSize);
// write fields
@@ -771,7 +772,7 @@
buffer.writeInteger(length);
if (length > 0) {
for (let [key, value] of this._fields.entries()) {
- await BinaryWriter.writeString(buffer, key);
+ BinaryCommunicator.writeString(buffer, key);
buffer.writeBoolean(value);
}
}
@@ -780,8 +781,8 @@
/**
* @ignore
*/
- async _read(buffer) {
- this._name = await BinaryReader.readObject(buffer);
+ async _read(communicator, buffer) {
+ this._name = await communicator.readObject(buffer);
this._type = buffer.readByte();
this._inlineSize = buffer.readInteger();
// read fields
@@ -790,7 +791,7 @@
if (length > 0) {
let res;
for (let i = 0; i < length; i++) {
- this._fields.set(await BinaryReader.readObject(buffer), buffer.readBoolean());
+ this._fields.set(await communicator.readObject(buffer), buffer.readBoolean());
}
}
}
@@ -1610,7 +1611,7 @@
/**
* @ignore
*/
- async _write(buffer, name) {
+ async _write(communicator, buffer, name) {
this._properties.set(PROP_NAME, name);
const startPos = buffer.position;
@@ -1619,7 +1620,7 @@
BinaryUtils.getSize(BinaryUtils.TYPE_CODE.SHORT);
for (let [propertyCode, property] of this._properties) {
- await this._writeProperty(buffer, propertyCode, property);
+ await this._writeProperty(communicator, buffer, propertyCode, property);
}
const length = buffer.position - startPos;
@@ -1632,23 +1633,23 @@
/**
* @ignore
*/
- async _writeProperty(buffer, propertyCode, property) {
+ async _writeProperty(communicator, buffer, propertyCode, property) {
buffer.writeShort(propertyCode);
const propertyType = PROP_TYPES[propertyCode];
switch (BinaryUtils.getTypeCode(propertyType)) {
case BinaryUtils.TYPE_CODE.INTEGER:
case BinaryUtils.TYPE_CODE.LONG:
case BinaryUtils.TYPE_CODE.BOOLEAN:
- await BinaryWriter.writeObject(buffer, property, propertyType, false);
+ await communicator.writeObject(buffer, property, propertyType, false);
return;
case BinaryUtils.TYPE_CODE.STRING:
- await BinaryWriter.writeObject(buffer, property, propertyType);
+ await communicator.writeObject(buffer, property, propertyType);
return;
case BinaryUtils.TYPE_CODE.OBJECT_ARRAY:
const length = property ? property.length : 0;
buffer.writeInteger(length);
for (let prop of property) {
- await prop._write(buffer);
+ await prop._write(communicator, buffer);
}
return;
default:
@@ -1659,54 +1660,54 @@
/**
* @ignore
*/
- async _read(buffer) {
+ async _read(communicator, buffer) {
// length
buffer.readInteger();
- await this._readProperty(buffer, PROP_ATOMICITY_MODE);
- await this._readProperty(buffer, PROP_BACKUPS);
- await this._readProperty(buffer, PROP_CACHE_MODE);
- await this._readProperty(buffer, PROP_COPY_ON_READ);
- await this._readProperty(buffer, PROP_DATA_REGION_NAME);
- await this._readProperty(buffer, PROP_EAGER_TTL);
- await this._readProperty(buffer, PROP_STATISTICS_ENABLED);
- await this._readProperty(buffer, PROP_GROUP_NAME);
- await this._readProperty(buffer, PROP_DEFAULT_LOCK_TIMEOUT);
- await this._readProperty(buffer, PROP_MAX_CONCURRENT_ASYNC_OPS);
- await this._readProperty(buffer, PROP_MAX_QUERY_ITERATORS);
- await this._readProperty(buffer, PROP_NAME);
- await this._readProperty(buffer, PROP_IS_ONHEAP_CACHE_ENABLED);
- await this._readProperty(buffer, PROP_PARTITION_LOSS_POLICY);
- await this._readProperty(buffer, PROP_QUERY_DETAIL_METRICS_SIZE);
- await this._readProperty(buffer, PROP_QUERY_PARALLELISM);
- await this._readProperty(buffer, PROP_READ_FROM_BACKUP);
- await this._readProperty(buffer, PROP_REBALANCE_BATCH_SIZE);
- await this._readProperty(buffer, PROP_REBALANCE_BATCHES_PREFETCH_COUNT);
- await this._readProperty(buffer, PROP_REBALANCE_DELAY);
- await this._readProperty(buffer, PROP_REBALANCE_MODE);
- await this._readProperty(buffer, PROP_REBALANCE_ORDER);
- await this._readProperty(buffer, PROP_REBALANCE_THROTTLE);
- await this._readProperty(buffer, PROP_REBALANCE_TIMEOUT);
- await this._readProperty(buffer, PROP_SQL_ESCAPE_ALL);
- await this._readProperty(buffer, PROP_SQL_INDEX_INLINE_MAX_SIZE);
- await this._readProperty(buffer, PROP_SQL_SCHEMA);
- await this._readProperty(buffer, PROP_WRITE_SYNCHRONIZATION_MODE);
- await this._readProperty(buffer, PROP_CACHE_KEY_CONFIGURATION);
- await this._readProperty(buffer, PROP_QUERY_ENTITY);
+ await this._readProperty(communicator, buffer, PROP_ATOMICITY_MODE);
+ await this._readProperty(communicator, buffer, PROP_BACKUPS);
+ await this._readProperty(communicator, buffer, PROP_CACHE_MODE);
+ await this._readProperty(communicator, buffer, PROP_COPY_ON_READ);
+ await this._readProperty(communicator, buffer, PROP_DATA_REGION_NAME);
+ await this._readProperty(communicator, buffer, PROP_EAGER_TTL);
+ await this._readProperty(communicator, buffer, PROP_STATISTICS_ENABLED);
+ await this._readProperty(communicator, buffer, PROP_GROUP_NAME);
+ await this._readProperty(communicator, buffer, PROP_DEFAULT_LOCK_TIMEOUT);
+ await this._readProperty(communicator, buffer, PROP_MAX_CONCURRENT_ASYNC_OPS);
+ await this._readProperty(communicator, buffer, PROP_MAX_QUERY_ITERATORS);
+ await this._readProperty(communicator, buffer, PROP_NAME);
+ await this._readProperty(communicator, buffer, PROP_IS_ONHEAP_CACHE_ENABLED);
+ await this._readProperty(communicator, buffer, PROP_PARTITION_LOSS_POLICY);
+ await this._readProperty(communicator, buffer, PROP_QUERY_DETAIL_METRICS_SIZE);
+ await this._readProperty(communicator, buffer, PROP_QUERY_PARALLELISM);
+ await this._readProperty(communicator, buffer, PROP_READ_FROM_BACKUP);
+ await this._readProperty(communicator, buffer, PROP_REBALANCE_BATCH_SIZE);
+ await this._readProperty(communicator, buffer, PROP_REBALANCE_BATCHES_PREFETCH_COUNT);
+ await this._readProperty(communicator, buffer, PROP_REBALANCE_DELAY);
+ await this._readProperty(communicator, buffer, PROP_REBALANCE_MODE);
+ await this._readProperty(communicator, buffer, PROP_REBALANCE_ORDER);
+ await this._readProperty(communicator, buffer, PROP_REBALANCE_THROTTLE);
+ await this._readProperty(communicator, buffer, PROP_REBALANCE_TIMEOUT);
+ await this._readProperty(communicator, buffer, PROP_SQL_ESCAPE_ALL);
+ await this._readProperty(communicator, buffer, PROP_SQL_INDEX_INLINE_MAX_SIZE);
+ await this._readProperty(communicator, buffer, PROP_SQL_SCHEMA);
+ await this._readProperty(communicator, buffer, PROP_WRITE_SYNCHRONIZATION_MODE);
+ await this._readProperty(communicator, buffer, PROP_CACHE_KEY_CONFIGURATION);
+ await this._readProperty(communicator, buffer, PROP_QUERY_ENTITY);
}
/**
* @ignore
*/
- async _readProperty(buffer, propertyCode) {
+ async _readProperty(communicator, buffer, propertyCode) {
const propertyType = PROP_TYPES[propertyCode];
switch (BinaryUtils.getTypeCode(propertyType)) {
case BinaryUtils.TYPE_CODE.INTEGER:
case BinaryUtils.TYPE_CODE.LONG:
case BinaryUtils.TYPE_CODE.BOOLEAN:
- this._properties.set(propertyCode, await BinaryReader._readTypedObject(buffer, propertyType));
+ this._properties.set(propertyCode, await communicator._readTypedObject(buffer, propertyType));
return;
case BinaryUtils.TYPE_CODE.STRING:
- this._properties.set(propertyCode, await BinaryReader.readObject(buffer, propertyType));
+ this._properties.set(propertyCode, await communicator.readObject(buffer, propertyType));
return;
case BinaryUtils.TYPE_CODE.OBJECT_ARRAY:
const length = buffer.readInteger();
@@ -1714,7 +1715,7 @@
const properties = new Array(length);
for (let i = 0; i < length; i++) {
const property = new propertyType._elementType._objectConstructor();
- await property._read(buffer);
+ await property._read(communicator, buffer);
properties[i] = property;
}
this._properties.set(propertyCode, properties);
diff --git a/lib/Cursor.js b/lib/Cursor.js
index 85176e3..39eea21 100644
--- a/lib/Cursor.js
+++ b/lib/Cursor.js
@@ -20,8 +20,6 @@
const Errors = require('./Errors');
const BinaryUtils = require('./internal/BinaryUtils');
const BinaryObject = require('./BinaryObject');
-const BinaryReader = require('./internal/BinaryReader');
-const BinaryWriter = require('./internal/BinaryWriter');
/**
* Class representing a cursor to obtain results of SQL and Scan query operations.
@@ -101,7 +99,7 @@
async close() {
// Close cursor only if the server has more pages: the server closes cursor automatically on last page
if (this._id && this._hasNext) {
- await this._socket.send(
+ await this._communicator.send(
BinaryUtils.OPERATION.RESOURCE_CLOSE,
async (payload) => {
await this._write(payload);
@@ -114,8 +112,8 @@
/**
* @ignore
*/
- constructor(socket, operation, buffer, keyType = null, valueType = null) {
- this._socket = socket;
+ constructor(communicator, operation, buffer, keyType = null, valueType = null) {
+ this._communicator = communicator;
this._operation = operation;
this._buffer = buffer;
this._keyType = keyType;
@@ -133,7 +131,7 @@
this._hasNext = false;
this._values = null;
this._buffer = null;
- await this._socket.send(
+ await this._communicator.send(
this._operation,
async (payload) => {
await this._write(payload);
@@ -175,8 +173,8 @@
async _readRow(buffer) {
const CacheEntry = require('./CacheClient').CacheEntry;
return new CacheEntry(
- await BinaryReader.readObject(buffer, this._keyType),
- await BinaryReader.readObject(buffer, this._valueType));
+ await this._communicator.readObject(buffer, this._keyType),
+ await this._communicator.readObject(buffer, this._valueType));
}
/**
@@ -273,8 +271,8 @@
/**
* @ignore
*/
- constructor(socket, buffer) {
- super(socket, BinaryUtils.OPERATION.QUERY_SQL_FIELDS_CURSOR_GET_PAGE, buffer);
+ constructor(communicator, buffer) {
+ super(communicator, BinaryUtils.OPERATION.QUERY_SQL_FIELDS_CURSOR_GET_PAGE, buffer);
this._fieldNames = [];
}
@@ -286,7 +284,7 @@
this._fieldCount = buffer.readInteger();
if (includeFieldNames) {
for (let i = 0; i < this._fieldCount; i++) {
- this._fieldNames[i] = await BinaryReader.readObject(buffer);
+ this._fieldNames[i] = await this._communicator.readObject(buffer);
}
}
}
@@ -299,7 +297,7 @@
let fieldType;
for (let i = 0; i < this._fieldCount; i++) {
fieldType = this._fieldTypes && i < this._fieldTypes.length ? this._fieldTypes[i] : null;
- values[i] = await BinaryReader.readObject(buffer);
+ values[i] = await this._communicator.readObject(buffer, fieldType);
}
return values;
}
diff --git a/lib/EnumItem.js b/lib/EnumItem.js
index e4fb165..1d1725e 100644
--- a/lib/EnumItem.js
+++ b/lib/EnumItem.js
@@ -156,14 +156,14 @@
/**
* @ignore
*/
- async _write(buffer) {
+ async _write(communicator, buffer) {
buffer.writeInteger(this._typeId);
if (this._ordinal !== null) {
buffer.writeInteger(this._ordinal);
return;
}
else if (this._name !== null || this._value !== null) {
- const type = await this._getType(this._typeId);
+ const type = await this._getType(communicator, this._typeId);
if (type._isEnum && type._enumValues) {
for (let i = 0; i < type._enumValues.length; i++) {
if (this._name === type._enumValues[i][0] ||
@@ -181,10 +181,10 @@
/**
* @ignore
*/
- async _read(buffer) {
+ async _read(communicator, buffer) {
this._typeId = buffer.readInteger();
this._ordinal = buffer.readInteger();
- const type = await this._getType(this._typeId);
+ const type = await this._getType(communicator, this._typeId);
if (!type._isEnum || !type._enumValues || type._enumValues.length <= this._ordinal) {
throw new Errors.IgniteClientError('EnumItem can not be deserialized: type mismatch');
}
@@ -195,9 +195,8 @@
/**
* @ignore
*/
- async _getType(typeId) {
- const BinaryTypeStorage = require('./internal/BinaryTypeStorage');
- return await BinaryTypeStorage.getEntity().getType(typeId);
+ async _getType(communicator, typeId) {
+ return await communicator.typeStorage.getType(typeId);
}
}
diff --git a/lib/IgniteClient.js b/lib/IgniteClient.js
index ba3361f..544c37f 100644
--- a/lib/IgniteClient.js
+++ b/lib/IgniteClient.js
@@ -21,9 +21,7 @@
const IgniteClientConfiguration = require('./IgniteClientConfiguration');
const CacheConfiguration = require('./CacheConfiguration');
const BinaryUtils = require('./internal/BinaryUtils');
-const BinaryWriter = require('./internal/BinaryWriter');
-const BinaryReader = require('./internal/BinaryReader');
-const BinaryTypeStorage = require('./internal/BinaryTypeStorage');
+const BinaryCommunicator = require('./internal/BinaryCommunicator');
const ArgumentChecker = require('./internal/ArgumentChecker');
const Logger = require('./internal/Logger');
@@ -70,7 +68,7 @@
constructor(onStateChanged = null) {
const ClientFailoverSocket = require('./internal/ClientFailoverSocket');
this._socket = new ClientFailoverSocket(onStateChanged);
- BinaryTypeStorage.createEntity(this._socket);
+ this._communicator = new BinaryCommunicator(this._socket);
}
static get STATE() {
@@ -133,7 +131,7 @@
ArgumentChecker.notEmpty(name, 'name');
ArgumentChecker.hasType(cacheConfig, 'cacheConfig', false, CacheConfiguration);
- await this._socket.send(
+ await this._communicator.send(
cacheConfig ?
BinaryUtils.OPERATION.CACHE_CREATE_WITH_CONFIGURATION :
BinaryUtils.OPERATION.CACHE_CREATE_WITH_NAME,
@@ -161,7 +159,7 @@
async getOrCreateCache(name, cacheConfig = null) {
ArgumentChecker.notEmpty(name, 'name');
ArgumentChecker.hasType(cacheConfig, 'cacheConfig', false, CacheConfiguration);
- await this._socket.send(
+ await this._communicator.send(
cacheConfig ?
BinaryUtils.OPERATION.CACHE_GET_OR_CREATE_WITH_CONFIGURATION :
BinaryUtils.OPERATION.CACHE_GET_OR_CREATE_WITH_NAME,
@@ -199,7 +197,7 @@
*/
async destroyCache(name) {
ArgumentChecker.notEmpty(name, 'name');
- await this._socket.send(
+ await this._communicator.send(
BinaryUtils.OPERATION.CACHE_DESTROY,
async (payload) => {
payload.writeInteger(CacheClient._calculateId(name));
@@ -222,7 +220,7 @@
async getCacheConfiguration(name) {
ArgumentChecker.notEmpty(name, 'name');
let config;
- await this._socket.send(
+ await this._communicator.send(
BinaryUtils.OPERATION.CACHE_GET_CONFIGURATION,
async (payload) => {
payload.writeInteger(CacheClient._calculateId(name));
@@ -230,7 +228,7 @@
},
async (payload) => {
config = new CacheConfiguration();
- await config._read(payload);
+ await config._read(this._communicator, payload);
});
return config;
}
@@ -248,11 +246,11 @@
*/
async cacheNames() {
let names;
- await this._socket.send(
+ await this._communicator.send(
BinaryUtils.OPERATION.CACHE_GET_NAMES,
null,
async (payload) => {
- names = await BinaryReader.readStringArray(payload);
+ names = await this._communicator.readStringArray(payload);
});
return names;
}
@@ -273,7 +271,7 @@
* @ignore
*/
_getCache(name, cacheConfig = null) {
- return new CacheClient(name, cacheConfig, this._socket);
+ return new CacheClient(name, cacheConfig, this._communicator);
}
/**
@@ -281,10 +279,10 @@
*/
async _writeCacheNameOrConfig(buffer, name, cacheConfig) {
if (cacheConfig) {
- await cacheConfig._write(buffer, name);
+ await cacheConfig._write(this._communicator, buffer, name);
}
else {
- await BinaryWriter.writeString(buffer, name);
+ BinaryCommunicator.writeString(buffer, name);
}
}
}
diff --git a/lib/Query.js b/lib/Query.js
index 5c230df..029ec3d 100644
--- a/lib/Query.js
+++ b/lib/Query.js
@@ -20,7 +20,7 @@
const Cursor = require('./Cursor').Cursor;
const SqlFieldsCursor = require('./Cursor').SqlFieldsCursor;
const ArgumentChecker = require('./internal/ArgumentChecker');
-const BinaryWriter = require('./internal/BinaryWriter');
+const BinaryCommunicator = require('./internal/BinaryCommunicator');
const BinaryUtils = require('./internal/BinaryUtils');
const PAGE_SIZE_DEFAULT = 1024;
@@ -220,10 +220,10 @@
/**
* @ignore
*/
- async _write(buffer) {
- await BinaryWriter.writeString(buffer, this._type);
- await BinaryWriter.writeString(buffer, this._sql);
- await this._writeArgs(buffer);
+ async _write(communicator, buffer) {
+ BinaryCommunicator.writeString(buffer, this._type);
+ BinaryCommunicator.writeString(buffer, this._sql);
+ await this._writeArgs(communicator, buffer);
buffer.writeBoolean(this._distributedJoins);
buffer.writeBoolean(this._local);
buffer.writeBoolean(this._replicatedOnly);
@@ -234,14 +234,14 @@
/**
* @ignore
*/
- async _writeArgs(buffer) {
+ async _writeArgs(communicator, buffer) {
const argsLength = this._args ? this._args.length : 0;
buffer.writeInteger(argsLength);
if (argsLength > 0) {
let argType;
for (let i = 0; i < argsLength; i++) {
argType = this._argTypes && i < this._argTypes.length ? this._argTypes[i] : null;
- await BinaryWriter.writeObject(buffer, this._args[i], argType);
+ await communicator.writeObject(buffer, this._args[i], argType);
}
}
}
@@ -249,8 +249,8 @@
/**
* @ignore
*/
- async _getCursor(socket, payload, keyType = null, valueType = null) {
- const cursor = new Cursor(socket, BinaryUtils.OPERATION.QUERY_SQL_CURSOR_GET_PAGE, payload, keyType, valueType);
+ async _getCursor(communicator, payload, keyType = null, valueType = null) {
+ const cursor = new Cursor(communicator, BinaryUtils.OPERATION.QUERY_SQL_CURSOR_GET_PAGE, payload, keyType, valueType);
cursor._readId(payload);
return cursor;
}
@@ -410,12 +410,12 @@
/**
* @ignore
*/
- async _write(buffer) {
- await BinaryWriter.writeString(buffer, this._schema);
+ async _write(communicator, buffer) {
+ BinaryCommunicator.writeString(buffer, this._schema);
buffer.writeInteger(this._pageSize);
buffer.writeInteger(this._maxRows);
- await BinaryWriter.writeString(buffer, this._sql);
- await this._writeArgs(buffer)
+ BinaryCommunicator.writeString(buffer, this._sql);
+ await this._writeArgs(communicator, buffer)
buffer.writeByte(this._statementType);
buffer.writeBoolean(this._distributedJoins);
buffer.writeBoolean(this._local);
@@ -430,8 +430,8 @@
/**
* @ignore
*/
- async _getCursor(socket, payload, keyType = null, valueType = null) {
- const cursor = new SqlFieldsCursor(socket, payload);
+ async _getCursor(communicator, payload, keyType = null, valueType = null) {
+ const cursor = new SqlFieldsCursor(communicator, payload);
await cursor._readFieldNames(payload, this._includeFieldNames);
return cursor;
}
@@ -485,9 +485,9 @@
/**
* @ignore
*/
- async _write(buffer) {
+ async _write(communicator, buffer) {
// filter
- await BinaryWriter.writeObject(buffer, null);
+ await communicator.writeObject(buffer, null);
buffer.writeInteger(this._pageSize);
buffer.writeInteger(this._partitionNumber);
buffer.writeBoolean(this._local);
@@ -496,8 +496,8 @@
/**
* @ignore
*/
- async _getCursor(socket, payload, keyType = null, valueType = null) {
- const cursor = new Cursor(socket, BinaryUtils.OPERATION.QUERY_SCAN_CURSOR_GET_PAGE, payload, keyType, valueType);
+ async _getCursor(communicator, payload, keyType = null, valueType = null) {
+ const cursor = new Cursor(communicator, BinaryUtils.OPERATION.QUERY_SCAN_CURSOR_GET_PAGE, payload, keyType, valueType);
cursor._readId(payload);
return cursor;
}
diff --git a/lib/internal/BinaryCommunicator.js b/lib/internal/BinaryCommunicator.js
new file mode 100644
index 0000000..9418d36
--- /dev/null
+++ b/lib/internal/BinaryCommunicator.js
@@ -0,0 +1,409 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+'use strict';
+
+const Decimal = require('decimal.js');
+const CollectionObjectType = require('../ObjectType').CollectionObjectType;
+const ComplexObjectType = require('../ObjectType').ComplexObjectType;
+const Errors = require('../Errors');
+const Timestamp = require('../Timestamp');
+const EnumItem = require('../EnumItem');
+const BinaryUtils = require('./BinaryUtils');
+const BinaryTypeStorage = require('./BinaryTypeStorage');
+
+class BinaryCommunicator {
+
+ constructor(socket) {
+ this._socket = socket;
+ this._typeStorage = new BinaryTypeStorage(this);
+ }
+
+ static readString(buffer) {
+ const typeCode = buffer.readByte();
+ BinaryUtils.checkTypesComatibility(BinaryUtils.TYPE_CODE.STRING, typeCode);
+ if (typeCode === BinaryUtils.TYPE_CODE.NULL) {
+ return null;
+ }
+ return buffer.readString();
+ }
+
+ static writeString(buffer, value) {
+ if (value === null) {
+ buffer.writeByte(BinaryUtils.TYPE_CODE.NULL);
+ }
+ else {
+ buffer.writeByte(BinaryUtils.TYPE_CODE.STRING);
+ buffer.writeString(value);
+ }
+ }
+
+ async send(opCode, payloadWriter, payloadReader = null) {
+ await this._socket.send(opCode, payloadWriter, payloadReader);
+ }
+
+ get typeStorage() {
+ return this._typeStorage;
+ }
+
+ async readObject(buffer, expectedType = null) {
+ const typeCode = buffer.readByte();
+ BinaryUtils.checkTypesComatibility(expectedType, typeCode);
+ return await this._readTypedObject(buffer, typeCode, expectedType);
+ }
+
+ async readStringArray(buffer) {
+ return await this._readTypedObject(buffer, BinaryUtils.TYPE_CODE.STRING_ARRAY);
+ }
+
+ async writeObject(buffer, object, objectType = null, writeObjectType = true) {
+ BinaryUtils.checkCompatibility(object, objectType);
+ if (object === null) {
+ buffer.writeByte(BinaryUtils.TYPE_CODE.NULL);
+ return;
+ }
+
+ objectType = objectType ? objectType : BinaryUtils.calcObjectType(object);
+ const objectTypeCode = BinaryUtils.getTypeCode(objectType);
+
+ if (writeObjectType) {
+ buffer.writeByte(objectTypeCode);
+ }
+ switch (objectTypeCode) {
+ case BinaryUtils.TYPE_CODE.BYTE:
+ case BinaryUtils.TYPE_CODE.SHORT:
+ case BinaryUtils.TYPE_CODE.INTEGER:
+ case BinaryUtils.TYPE_CODE.FLOAT:
+ case BinaryUtils.TYPE_CODE.DOUBLE:
+ buffer.writeNumber(object, objectTypeCode);
+ break;
+ case BinaryUtils.TYPE_CODE.LONG:
+ buffer.writeLong(object);
+ break;
+ case BinaryUtils.TYPE_CODE.CHAR:
+ buffer.writeChar(object);
+ break;
+ case BinaryUtils.TYPE_CODE.BOOLEAN:
+ buffer.writeBoolean(object);
+ break;
+ case BinaryUtils.TYPE_CODE.STRING:
+ buffer.writeString(object);
+ break;
+ case BinaryUtils.TYPE_CODE.UUID:
+ this._writeUUID(buffer, object);
+ break;
+ case BinaryUtils.TYPE_CODE.DATE:
+ buffer.writeDate(object);
+ break;
+ case BinaryUtils.TYPE_CODE.ENUM:
+ await this._writeEnum(buffer, object);
+ break;
+ case BinaryUtils.TYPE_CODE.DECIMAL:
+ this._writeDecimal(buffer, object);
+ break;
+ case BinaryUtils.TYPE_CODE.TIMESTAMP:
+ this._writeTimestamp(buffer, object);
+ break;
+ case BinaryUtils.TYPE_CODE.TIME:
+ this._writeTime(buffer, object);
+ break;
+ case BinaryUtils.TYPE_CODE.BYTE_ARRAY:
+ case BinaryUtils.TYPE_CODE.SHORT_ARRAY:
+ case BinaryUtils.TYPE_CODE.INTEGER_ARRAY:
+ case BinaryUtils.TYPE_CODE.LONG_ARRAY:
+ case BinaryUtils.TYPE_CODE.FLOAT_ARRAY:
+ case BinaryUtils.TYPE_CODE.DOUBLE_ARRAY:
+ case BinaryUtils.TYPE_CODE.CHAR_ARRAY:
+ case BinaryUtils.TYPE_CODE.BOOLEAN_ARRAY:
+ case BinaryUtils.TYPE_CODE.STRING_ARRAY:
+ case BinaryUtils.TYPE_CODE.UUID_ARRAY:
+ case BinaryUtils.TYPE_CODE.DATE_ARRAY:
+ case BinaryUtils.TYPE_CODE.OBJECT_ARRAY:
+ case BinaryUtils.TYPE_CODE.ENUM_ARRAY:
+ case BinaryUtils.TYPE_CODE.DECIMAL_ARRAY:
+ case BinaryUtils.TYPE_CODE.TIMESTAMP_ARRAY:
+ case BinaryUtils.TYPE_CODE.TIME_ARRAY:
+ await this._writeArray(buffer, object, objectType, objectTypeCode);
+ break;
+ case BinaryUtils.TYPE_CODE.COLLECTION:
+ await this._writeCollection(buffer, object, objectType);
+ break;
+ case BinaryUtils.TYPE_CODE.MAP:
+ await this._writeMap(buffer, object, objectType);
+ break;
+ case BinaryUtils.TYPE_CODE.BINARY_OBJECT:
+ await this._writeBinaryObject(buffer, object, objectType);
+ break;
+ case BinaryUtils.TYPE_CODE.COMPLEX_OBJECT:
+ await this._writeComplexObject(buffer, object, objectType);
+ break;
+ default:
+ throw Errors.IgniteClientError.unsupportedTypeError(objectType);
+ }
+ }
+
+ async _readTypedObject(buffer, objectTypeCode, expectedType = null) {
+ switch (objectTypeCode) {
+ case BinaryUtils.TYPE_CODE.BYTE:
+ case BinaryUtils.TYPE_CODE.SHORT:
+ case BinaryUtils.TYPE_CODE.INTEGER:
+ case BinaryUtils.TYPE_CODE.FLOAT:
+ case BinaryUtils.TYPE_CODE.DOUBLE:
+ return buffer.readNumber(objectTypeCode);
+ case BinaryUtils.TYPE_CODE.LONG:
+ return buffer.readLong().toNumber();
+ case BinaryUtils.TYPE_CODE.CHAR:
+ return buffer.readChar();
+ case BinaryUtils.TYPE_CODE.BOOLEAN:
+ return buffer.readBoolean();
+ case BinaryUtils.TYPE_CODE.STRING:
+ return buffer.readString();
+ case BinaryUtils.TYPE_CODE.UUID:
+ return this._readUUID(buffer);
+ case BinaryUtils.TYPE_CODE.DATE:
+ return buffer.readDate();
+ case BinaryUtils.TYPE_CODE.ENUM:
+ case BinaryUtils.TYPE_CODE.BINARY_ENUM:
+ return await this._readEnum(buffer);
+ case BinaryUtils.TYPE_CODE.DECIMAL:
+ return this._readDecimal(buffer);
+ case BinaryUtils.TYPE_CODE.TIMESTAMP:
+ return this._readTimestamp(buffer);
+ case BinaryUtils.TYPE_CODE.TIME:
+ return buffer.readDate();
+ case BinaryUtils.TYPE_CODE.BYTE_ARRAY:
+ case BinaryUtils.TYPE_CODE.SHORT_ARRAY:
+ case BinaryUtils.TYPE_CODE.INTEGER_ARRAY:
+ case BinaryUtils.TYPE_CODE.LONG_ARRAY:
+ case BinaryUtils.TYPE_CODE.FLOAT_ARRAY:
+ case BinaryUtils.TYPE_CODE.DOUBLE_ARRAY:
+ case BinaryUtils.TYPE_CODE.CHAR_ARRAY:
+ case BinaryUtils.TYPE_CODE.BOOLEAN_ARRAY:
+ case BinaryUtils.TYPE_CODE.STRING_ARRAY:
+ case BinaryUtils.TYPE_CODE.UUID_ARRAY:
+ case BinaryUtils.TYPE_CODE.DATE_ARRAY:
+ case BinaryUtils.TYPE_CODE.OBJECT_ARRAY:
+ case BinaryUtils.TYPE_CODE.ENUM_ARRAY:
+ case BinaryUtils.TYPE_CODE.DECIMAL_ARRAY:
+ case BinaryUtils.TYPE_CODE.TIMESTAMP_ARRAY:
+ case BinaryUtils.TYPE_CODE.TIME_ARRAY:
+ return await this._readArray(buffer, objectTypeCode, expectedType);
+ case BinaryUtils.TYPE_CODE.COLLECTION:
+ return await this._readCollection(buffer, expectedType);
+ case BinaryUtils.TYPE_CODE.MAP:
+ return await this._readMap(buffer, expectedType);
+ case BinaryUtils.TYPE_CODE.BINARY_OBJECT:
+ return await this._readBinaryObject(buffer, expectedType);
+ case BinaryUtils.TYPE_CODE.NULL:
+ return null;
+ case BinaryUtils.TYPE_CODE.COMPLEX_OBJECT:
+ return await this._readComplexObject(buffer, expectedType);
+ default:
+ throw Errors.IgniteClientError.unsupportedTypeError(objectTypeCode);
+ }
+ }
+
+ _readUUID(buffer) {
+ return [...buffer.readBuffer(BinaryUtils.getSize(BinaryUtils.TYPE_CODE.UUID))];
+ }
+
+ async _readEnum(buffer) {
+ const enumItem = new EnumItem(0);
+ await enumItem._read(this, buffer);
+ return enumItem;
+ }
+
+ _readDecimal(buffer) {
+ const scale = buffer.readInteger();
+ const dataLength = buffer.readInteger();
+ const data = buffer.readBuffer(dataLength);
+ const isNegative = (data[0] & 0x80) !== 0;
+ if (isNegative) {
+ data[0] &= 0x7F;
+ }
+ let result = new Decimal('0x' + data.toString('hex'));
+ if (isNegative) {
+ result = result.negated();
+ }
+ return result.mul(Decimal.pow(10, -scale));
+ }
+
+ _readTimestamp(buffer) {
+ return new Timestamp(buffer.readLong().toNumber(), buffer.readInteger());
+ }
+
+ async _readArray(buffer, arrayTypeCode, arrayType) {
+ if (arrayTypeCode === BinaryUtils.TYPE_CODE.OBJECT_ARRAY) {
+ buffer.readInteger();
+ }
+ const length = buffer.readInteger();
+ const elementType = BinaryUtils.getArrayElementType(arrayType ? arrayType : arrayTypeCode);
+ const keepElementType = elementType === null ? true : BinaryUtils.keepArrayElementType(arrayTypeCode);
+ const result = new Array(length);
+ for (let i = 0; i < length; i++) {
+ result[i] = keepElementType ?
+ await this.readObject(buffer, elementType) :
+ await this._readTypedObject(buffer, elementType);
+ }
+ return result;
+ }
+
+ async _readMap(buffer, expectedMapType) {
+ const result = new Map();
+ const size = buffer.readInteger();
+ const subType = buffer.readByte();
+ let key, value;
+ for (let i = 0; i < size; i++) {
+ key = await this.readObject(buffer, expectedMapType ? expectedMapType._keyType : null);
+ value = await this.readObject(buffer, expectedMapType ? expectedMapType._valueType : null);
+ result.set(key, value);
+ }
+ return result;
+ }
+
+ async _readCollection(buffer, expectedColType) {
+ const size = buffer.readInteger();
+ const subType = buffer.readByte();
+ const isSet = CollectionObjectType._isSet(subType);
+ const result = isSet ? new Set() : new Array(size);
+ let element;
+ for (let i = 0; i < size; i++) {
+ element = await this.readObject(buffer, expectedColType ? expectedColType._elementType : null);
+ if (isSet) {
+ result.add(element);
+ }
+ else {
+ result[i] = element;
+ }
+ }
+ return result;
+ }
+
+ async _readBinaryObject(buffer, expectedType) {
+ const size = buffer.readInteger();
+ const startPos = buffer.position;
+ buffer.position = startPos + size;
+ const offset = buffer.readInteger();
+ const endPos = buffer.position;
+ buffer.position = startPos + offset;
+ const result = await this.readObject(buffer, expectedType);
+ buffer.position = endPos;
+ return result;
+ }
+
+ async _readComplexObject(buffer, expectedType) {
+ buffer.position = buffer.position - 1;
+ const BinaryObject = require('../BinaryObject');
+ const binaryObject = await BinaryObject._fromBuffer(this, buffer);
+ return expectedType ?
+ await binaryObject.toObject(expectedType) : binaryObject;
+ }
+
+ _writeUUID(buffer, value) {
+ buffer.writeBuffer(Buffer.from(value));
+ }
+
+ async _writeEnum(buffer, enumValue) {
+ await enumValue._write(this, buffer);
+ }
+
+ _writeDecimal(buffer, decimal) {
+ let strValue = decimal.toExponential();
+ let expIndex = strValue.indexOf('e');
+ if (expIndex < 0) {
+ expIndex = strValue.indexOf('E');
+ }
+ let scale = 0;
+ if (expIndex >= 0) {
+ scale = parseInt(strValue.substring(expIndex + 1));
+ strValue = strValue.substring(0, expIndex);
+ }
+ const isNegative = strValue.startsWith('-');
+ if (isNegative) {
+ strValue = strValue.substring(1);
+ }
+ const dotIndex = strValue.indexOf('.');
+ if (dotIndex >= 0) {
+ scale -= strValue.length - dotIndex - 1;
+ strValue = strValue.substring(0, dotIndex) + strValue.substring(dotIndex + 1);
+ }
+ scale = -scale;
+ let hexValue = new Decimal(strValue).toHexadecimal().substring(2);
+ hexValue = ((hexValue.length % 2 !== 0) ? '000' : '00') + hexValue;
+ const valueBuffer = Buffer.from(hexValue, 'hex');
+ if (isNegative) {
+ valueBuffer[0] |= 0x80;
+ }
+ buffer.writeInteger(scale);
+ buffer.writeInteger(valueBuffer.length);
+ buffer.writeBuffer(valueBuffer);
+ }
+
+ _writeTimestamp(buffer, timestamp) {
+ buffer.writeDate(timestamp);
+ buffer.writeInteger(timestamp.getNanos());
+ }
+
+ _writeTime(buffer, time) {
+ const midnight = new Date(time);
+ midnight.setHours(0, 0, 0, 0);
+ buffer.writeLong(time.getTime() - midnight.getTime());
+ }
+
+ async _writeArray(buffer, array, arrayType, arrayTypeCode) {
+ const BinaryType = require('./BinaryType');
+ const elementType = BinaryUtils.getArrayElementType(arrayType);
+ const keepElementType = BinaryUtils.keepArrayElementType(arrayTypeCode);
+ if (arrayTypeCode === BinaryUtils.TYPE_CODE.OBJECT_ARRAY) {
+ buffer.writeInteger(elementType instanceof ComplexObjectType ?
+ BinaryType._calculateId(elementType._typeName) : -1);
+ }
+ buffer.writeInteger(array.length);
+ for (let elem of array) {
+ await this.writeObject(buffer, elem, elementType, keepElementType);
+ }
+ }
+
+ async _writeCollection(buffer, collection, collectionType) {
+ buffer.writeInteger(collection instanceof Set ? collection.size : collection.length);
+ buffer.writeByte(collectionType._subType);
+ for (let element of collection) {
+ await this.writeObject(buffer, element, collectionType._elementType);
+ }
+ }
+
+ async _writeMap(buffer, map, mapType) {
+ buffer.writeInteger(map.size);
+ buffer.writeByte(mapType._subType);
+ for (let [key, value] of map.entries()) {
+ await this.writeObject(buffer, key, mapType._keyType);
+ await this.writeObject(buffer, value, mapType._valueType);
+ }
+ }
+
+ async _writeBinaryObject(buffer, binaryObject) {
+ buffer.position = buffer.position - 1;
+ await binaryObject._write(this, buffer);
+ }
+
+ async _writeComplexObject(buffer, object, objectType) {
+ const BinaryObject = require('../BinaryObject');
+ await this._writeBinaryObject(buffer, await BinaryObject.fromObject(object, objectType));
+ }
+}
+
+module.exports = BinaryCommunicator;
diff --git a/lib/internal/BinaryReader.js b/lib/internal/BinaryReader.js
deleted file mode 100644
index 8c25c39..0000000
--- a/lib/internal/BinaryReader.js
+++ /dev/null
@@ -1,197 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-'use strict';
-
-const Decimal = require('decimal.js');
-const BinaryObject = require('../BinaryObject');
-const CollectionObjectType = require('../ObjectType').CollectionObjectType;
-const Errors = require('../Errors');
-const Timestamp = require('../Timestamp');
-const EnumItem = require('../EnumItem');
-const BinaryUtils = require('./BinaryUtils');
-
-class BinaryReader {
-
- static async readObject(buffer, expectedType = null) {
- const typeCode = buffer.readByte();
- BinaryUtils.checkTypesComatibility(expectedType, typeCode);
- return await BinaryReader._readTypedObject(buffer, typeCode, expectedType);
- }
-
- static async readStringArray(buffer) {
- return await BinaryReader._readTypedObject(buffer, BinaryUtils.TYPE_CODE.STRING_ARRAY);
- }
-
- static async _readTypedObject(buffer, objectTypeCode, expectedType = null) {
- switch (objectTypeCode) {
- case BinaryUtils.TYPE_CODE.BYTE:
- case BinaryUtils.TYPE_CODE.SHORT:
- case BinaryUtils.TYPE_CODE.INTEGER:
- case BinaryUtils.TYPE_CODE.FLOAT:
- case BinaryUtils.TYPE_CODE.DOUBLE:
- return buffer.readNumber(objectTypeCode);
- case BinaryUtils.TYPE_CODE.LONG:
- return buffer.readLong().toNumber();
- case BinaryUtils.TYPE_CODE.CHAR:
- return buffer.readChar();
- case BinaryUtils.TYPE_CODE.BOOLEAN:
- return buffer.readBoolean();
- case BinaryUtils.TYPE_CODE.STRING:
- return buffer.readString();
- case BinaryUtils.TYPE_CODE.UUID:
- return BinaryReader._readUUID(buffer);
- case BinaryUtils.TYPE_CODE.DATE:
- return buffer.readDate();
- case BinaryUtils.TYPE_CODE.ENUM:
- case BinaryUtils.TYPE_CODE.BINARY_ENUM:
- return await BinaryReader._readEnum(buffer);
- case BinaryUtils.TYPE_CODE.DECIMAL:
- return BinaryReader._readDecimal(buffer);
- case BinaryUtils.TYPE_CODE.TIMESTAMP:
- return BinaryReader._readTimestamp(buffer);
- case BinaryUtils.TYPE_CODE.TIME:
- return buffer.readDate();
- case BinaryUtils.TYPE_CODE.BYTE_ARRAY:
- case BinaryUtils.TYPE_CODE.SHORT_ARRAY:
- case BinaryUtils.TYPE_CODE.INTEGER_ARRAY:
- case BinaryUtils.TYPE_CODE.LONG_ARRAY:
- case BinaryUtils.TYPE_CODE.FLOAT_ARRAY:
- case BinaryUtils.TYPE_CODE.DOUBLE_ARRAY:
- case BinaryUtils.TYPE_CODE.CHAR_ARRAY:
- case BinaryUtils.TYPE_CODE.BOOLEAN_ARRAY:
- case BinaryUtils.TYPE_CODE.STRING_ARRAY:
- case BinaryUtils.TYPE_CODE.UUID_ARRAY:
- case BinaryUtils.TYPE_CODE.DATE_ARRAY:
- case BinaryUtils.TYPE_CODE.OBJECT_ARRAY:
- case BinaryUtils.TYPE_CODE.ENUM_ARRAY:
- case BinaryUtils.TYPE_CODE.DECIMAL_ARRAY:
- case BinaryUtils.TYPE_CODE.TIMESTAMP_ARRAY:
- case BinaryUtils.TYPE_CODE.TIME_ARRAY:
- return await BinaryReader._readArray(buffer, objectTypeCode, expectedType);
- case BinaryUtils.TYPE_CODE.COLLECTION:
- return await BinaryReader._readCollection(buffer, expectedType);
- case BinaryUtils.TYPE_CODE.MAP:
- return await BinaryReader._readMap(buffer, expectedType);
- case BinaryUtils.TYPE_CODE.BINARY_OBJECT:
- return await BinaryReader._readBinaryObject(buffer, expectedType);
- case BinaryUtils.TYPE_CODE.NULL:
- return null;
- case BinaryUtils.TYPE_CODE.COMPLEX_OBJECT:
- return await BinaryReader._readComplexObject(buffer, expectedType);
- default:
- throw Errors.IgniteClientError.unsupportedTypeError(objectTypeCode);
- }
- }
-
- static _readUUID(buffer) {
- return [...buffer.readBuffer(BinaryUtils.getSize(BinaryUtils.TYPE_CODE.UUID))];
- }
-
- static async _readEnum(buffer) {
- const enumItem = new EnumItem(0);
- await enumItem._read(buffer);
- return enumItem;
- }
-
- static _readDecimal(buffer) {
- const scale = buffer.readInteger();
- const dataLength = buffer.readInteger();
- const data = buffer.readBuffer(dataLength);
- const isNegative = (data[0] & 0x80) !== 0;
- if (isNegative) {
- data[0] &= 0x7F;
- }
- let result = new Decimal('0x' + data.toString('hex'));
- if (isNegative) {
- result = result.negated();
- }
- return result.mul(Decimal.pow(10, -scale));
- }
-
- static _readTimestamp(buffer) {
- return new Timestamp(buffer.readLong().toNumber(), buffer.readInteger());
- }
-
- static async _readArray(buffer, arrayTypeCode, arrayType) {
- if (arrayTypeCode === BinaryUtils.TYPE_CODE.OBJECT_ARRAY) {
- buffer.readInteger();
- }
- const length = buffer.readInteger();
- const elementType = BinaryUtils.getArrayElementType(arrayType ? arrayType : arrayTypeCode);
- const keepElementType = elementType === null ? true : BinaryUtils.keepArrayElementType(arrayTypeCode);
- const result = new Array(length);
- for (let i = 0; i < length; i++) {
- result[i] = keepElementType ?
- await BinaryReader.readObject(buffer, elementType) :
- await BinaryReader._readTypedObject(buffer, elementType);
- }
- return result;
- }
-
- static async _readMap(buffer, expectedMapType) {
- const result = new Map();
- const size = buffer.readInteger();
- const subType = buffer.readByte();
- let key, value;
- for (let i = 0; i < size; i++) {
- key = await BinaryReader.readObject(buffer, expectedMapType ? expectedMapType._keyType : null);
- value = await BinaryReader.readObject(buffer, expectedMapType ? expectedMapType._valueType : null);
- result.set(key, value);
- }
- return result;
- }
-
- static async _readCollection(buffer, expectedColType) {
- const size = buffer.readInteger();
- const subType = buffer.readByte();
- const isSet = CollectionObjectType._isSet(subType);
- const result = isSet ? new Set() : new Array(size);
- let element;
- for (let i = 0; i < size; i++) {
- element = await BinaryReader.readObject(buffer, expectedColType ? expectedColType._elementType : null);
- if (isSet) {
- result.add(element);
- }
- else {
- result[i] = element;
- }
- }
- return result;
- }
-
- static async _readBinaryObject(buffer, expectedType) {
- const size = buffer.readInteger();
- const startPos = buffer.position;
- buffer.position = startPos + size;
- const offset = buffer.readInteger();
- const endPos = buffer.position;
- buffer.position = startPos + offset;
- const result = await BinaryReader.readObject(buffer, expectedType);
- buffer.position = endPos;
- return result;
- }
-
- static async _readComplexObject(buffer, expectedType) {
- buffer.position = buffer.position - 1;
- const binaryObject = await BinaryObject._fromBuffer(buffer);
- return expectedType ?
- await binaryObject.toObject(expectedType) : binaryObject;
- }
-}
-
-module.exports = BinaryReader;
diff --git a/lib/internal/BinaryType.js b/lib/internal/BinaryType.js
index b9e239d..98aa6a3 100644
--- a/lib/internal/BinaryType.js
+++ b/lib/internal/BinaryType.js
@@ -21,7 +21,7 @@
const ComplexObjectType = require('../ObjectType').ComplexObjectType;
const BinaryTypeStorage = require('./BinaryTypeStorage');
const BinaryUtils = require('./BinaryUtils');
-const BinaryWriter = require('./BinaryWriter');
+const BinaryCommunicator = require('./BinaryCommunicator');
const Errors = require('../Errors');
class BinaryType {
@@ -112,9 +112,9 @@
// type id
buffer.writeInteger(this._id);
// type name
- await BinaryWriter.writeString(buffer, this._name);
+ BinaryCommunicator.writeString(buffer, this._name);
// affinity key field name
- await BinaryWriter.writeString(buffer, null);
+ BinaryCommunicator.writeString(buffer, null);
// fields count
buffer.writeInteger(this._fields.size);
// fields
@@ -136,7 +136,7 @@
buffer.writeInteger(length);
if (length > 0) {
for (let [key, value] of this._enumValues) {
- await BinaryWriter.writeString(buffer, key);
+ BinaryCommunicator.writeString(buffer, key);
buffer.writeInteger(value);
}
}
@@ -147,10 +147,9 @@
// type id
this._id = buffer.readInteger();
// type name
- const BinaryReader = require('./BinaryReader');
- this._name = await BinaryReader.readObject(buffer);
+ this._name = BinaryCommunicator.readString(buffer);
// affinity key field name
- await BinaryReader.readObject(buffer);
+ BinaryCommunicator.readString(buffer);
// fields count
const fieldsCount = buffer.readInteger();
// fields
@@ -173,13 +172,12 @@
}
async _readEnum(buffer) {
- const BinaryReader = require('./BinaryReader');
this._isEnum = buffer.readBoolean();
if (this._isEnum) {
const valuesCount = buffer.readInteger();
this._enumValues = new Array(valuesCount);
for (let i = 0; i < valuesCount; i++) {
- this._enumValues[i] = [await BinaryReader.readObject(buffer), buffer.readInteger()];
+ this._enumValues[i] = [BinaryCommunicator.readString(buffer), buffer.readInteger()];
}
}
}
@@ -312,7 +310,7 @@
async _write(buffer) {
// field name
- await BinaryWriter.writeString(buffer, this._name);
+ BinaryCommunicator.writeString(buffer, this._name);
// type code
buffer.writeInteger(this._typeCode);
// field id
@@ -320,9 +318,8 @@
}
async _read(buffer) {
- const BinaryReader = require('./BinaryReader');
// field name
- this._name = await BinaryReader.readObject(buffer);
+ this._name = BinaryCommunicator.readString(buffer);
// type code
this._typeCode = buffer.readInteger();
// field id
@@ -338,10 +335,10 @@
return result;
}
- static async fromTypeId(typeId, schemaId, hasSchema) {
+ static async fromTypeId(communicator, typeId, schemaId, hasSchema) {
let result = new BinaryTypeBuilder();
if (hasSchema) {
- let type = await BinaryTypeStorage.getEntity().getType(typeId, schemaId);
+ let type = await communicator.typeStorage.getType(typeId, schemaId);
if (type) {
result._type = type;
result._schema = type.getSchema(schemaId);
@@ -372,7 +369,7 @@
static fromComplexObjectType(complexObjectType, jsObject) {
let result = new BinaryTypeBuilder();
- const typeInfo = BinaryTypeStorage.getEntity().getByComplexObjectType(complexObjectType);
+ const typeInfo = BinaryTypeStorage.getByComplexObjectType(complexObjectType);
if (typeInfo) {
result._type = typeInfo[0];
result._schema = typeInfo[1];
@@ -380,7 +377,7 @@
}
else {
result._fromComplexObjectType(complexObjectType, jsObject);
- BinaryTypeStorage.getEntity().setByComplexObjectType(complexObjectType, result._type, result._schema);
+ BinaryTypeStorage.setByComplexObjectType(complexObjectType, result._type, result._schema);
}
return result;
}
@@ -424,9 +421,9 @@
}
}
- async finalize() {
+ async finalize(communicator) {
this._schema.finalize();
- await BinaryTypeStorage.getEntity().addType(this._type, this._schema);
+ await communicator.typeStorage.addType(this._type, this._schema);
}
constructor() {
diff --git a/lib/internal/BinaryTypeStorage.js b/lib/internal/BinaryTypeStorage.js
index d79156b..144710a 100644
--- a/lib/internal/BinaryTypeStorage.js
+++ b/lib/internal/BinaryTypeStorage.js
@@ -22,15 +22,26 @@
class BinaryTypeStorage {
- static getEntity() {
- if (!BinaryTypeStorage._entity) {
- throw Errors.IgniteClientError.internalError();
- }
- return BinaryTypeStorage._entity;
+ constructor(communicator) {
+ this._communicator = communicator;
+ this._types = new Map();
}
- static createEntity(socket) {
- BinaryTypeStorage._entity = new BinaryTypeStorage(socket);
+ static getByComplexObjectType(complexObjectType) {
+ return BinaryTypeStorage.complexObjectTypes.get(complexObjectType);
+ }
+
+ static setByComplexObjectType(complexObjectType, type, schema) {
+ if (!BinaryTypeStorage.complexObjectTypes.has(complexObjectType)) {
+ BinaryTypeStorage.complexObjectTypes.set(complexObjectType, [type, schema]);
+ }
+ }
+
+ static get complexObjectTypes() {
+ if (!BinaryTypeStorage._complexObjectTypes) {
+ BinaryTypeStorage._complexObjectTypes = new Map();
+ }
+ return BinaryTypeStorage._complexObjectTypes;
}
async addType(binaryType, binarySchema) {
@@ -61,29 +72,13 @@
return storageType;
}
- getByComplexObjectType(complexObjectType) {
- return this._complexObjectTypes.get(complexObjectType);
- }
-
- setByComplexObjectType(complexObjectType, type, schema) {
- if (!this._complexObjectTypes.has(complexObjectType)) {
- this._complexObjectTypes.set(complexObjectType, [type, schema]);
- }
- }
-
/** Private methods */
- constructor(socket) {
- this._socket = socket;
- this._types = new Map();
- this._complexObjectTypes = new Map();
- }
-
async _getBinaryType(typeId) {
const BinaryType = require('./BinaryType');
let binaryType = new BinaryType(null);
binaryType._id = typeId;
- await this._socket.send(
+ await this._communicator.send(
BinaryUtils.OPERATION.GET_BINARY_TYPE,
async (payload) => {
payload.writeInteger(typeId);
@@ -101,7 +96,7 @@
}
async _putBinaryType(binaryType) {
- await this._socket.send(
+ await this._communicator.send(
BinaryUtils.OPERATION.PUT_BINARY_TYPE,
async (payload) => {
await binaryType._write(payload);
diff --git a/lib/internal/BinaryWriter.js b/lib/internal/BinaryWriter.js
deleted file mode 100644
index 3686bb4..0000000
--- a/lib/internal/BinaryWriter.js
+++ /dev/null
@@ -1,210 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-'use strict';
-
-const Decimal = require('decimal.js');
-const Errors = require('../Errors');
-const ComplexObjectType = require('../ObjectType').ComplexObjectType;
-const BinaryUtils = require('./BinaryUtils');
-
-class BinaryWriter {
-
- static async writeString(buffer, value) {
- await BinaryWriter.writeObject(buffer, value, BinaryUtils.TYPE_CODE.STRING);
- }
-
- static async writeObject(buffer, object, objectType = null, writeObjectType = true) {
- BinaryUtils.checkCompatibility(object, objectType);
- if (object === null) {
- buffer.writeByte(BinaryUtils.TYPE_CODE.NULL);
- return;
- }
-
- objectType = objectType ? objectType : BinaryUtils.calcObjectType(object);
- const objectTypeCode = BinaryUtils.getTypeCode(objectType);
-
- if (writeObjectType) {
- buffer.writeByte(objectTypeCode);
- }
- switch (objectTypeCode) {
- case BinaryUtils.TYPE_CODE.BYTE:
- case BinaryUtils.TYPE_CODE.SHORT:
- case BinaryUtils.TYPE_CODE.INTEGER:
- case BinaryUtils.TYPE_CODE.FLOAT:
- case BinaryUtils.TYPE_CODE.DOUBLE:
- buffer.writeNumber(object, objectTypeCode);
- break;
- case BinaryUtils.TYPE_CODE.LONG:
- buffer.writeLong(object);
- break;
- case BinaryUtils.TYPE_CODE.CHAR:
- buffer.writeChar(object);
- break;
- case BinaryUtils.TYPE_CODE.BOOLEAN:
- buffer.writeBoolean(object);
- break;
- case BinaryUtils.TYPE_CODE.STRING:
- buffer.writeString(object);
- break;
- case BinaryUtils.TYPE_CODE.UUID:
- BinaryWriter._writeUUID(buffer, object);
- break;
- case BinaryUtils.TYPE_CODE.DATE:
- buffer.writeDate(object);
- break;
- case BinaryUtils.TYPE_CODE.ENUM:
- await BinaryWriter._writeEnum(buffer, object);
- break;
- case BinaryUtils.TYPE_CODE.DECIMAL:
- BinaryWriter._writeDecimal(buffer, object);
- break;
- case BinaryUtils.TYPE_CODE.TIMESTAMP:
- BinaryWriter._writeTimestamp(buffer, object);
- break;
- case BinaryUtils.TYPE_CODE.TIME:
- BinaryWriter._writeTime(buffer, object);
- break;
- case BinaryUtils.TYPE_CODE.BYTE_ARRAY:
- case BinaryUtils.TYPE_CODE.SHORT_ARRAY:
- case BinaryUtils.TYPE_CODE.INTEGER_ARRAY:
- case BinaryUtils.TYPE_CODE.LONG_ARRAY:
- case BinaryUtils.TYPE_CODE.FLOAT_ARRAY:
- case BinaryUtils.TYPE_CODE.DOUBLE_ARRAY:
- case BinaryUtils.TYPE_CODE.CHAR_ARRAY:
- case BinaryUtils.TYPE_CODE.BOOLEAN_ARRAY:
- case BinaryUtils.TYPE_CODE.STRING_ARRAY:
- case BinaryUtils.TYPE_CODE.UUID_ARRAY:
- case BinaryUtils.TYPE_CODE.DATE_ARRAY:
- case BinaryUtils.TYPE_CODE.OBJECT_ARRAY:
- case BinaryUtils.TYPE_CODE.ENUM_ARRAY:
- case BinaryUtils.TYPE_CODE.DECIMAL_ARRAY:
- case BinaryUtils.TYPE_CODE.TIMESTAMP_ARRAY:
- case BinaryUtils.TYPE_CODE.TIME_ARRAY:
- await BinaryWriter._writeArray(buffer, object, objectType, objectTypeCode);
- break;
- case BinaryUtils.TYPE_CODE.COLLECTION:
- await BinaryWriter._writeCollection(buffer, object, objectType);
- break;
- case BinaryUtils.TYPE_CODE.MAP:
- await BinaryWriter._writeMap(buffer, object, objectType);
- break;
- case BinaryUtils.TYPE_CODE.BINARY_OBJECT:
- await BinaryWriter._writeBinaryObject(buffer, object, objectType);
- break;
- case BinaryUtils.TYPE_CODE.COMPLEX_OBJECT:
- await BinaryWriter._writeComplexObject(buffer, object, objectType);
- break;
- default:
- throw Errors.IgniteClientError.unsupportedTypeError(objectType);
- }
- }
-
- static _writeUUID(buffer, value) {
- buffer.writeBuffer(Buffer.from(value));
- }
-
- static async _writeEnum(buffer, enumValue) {
- await enumValue._write(buffer);
- }
-
- static _writeDecimal(buffer, decimal) {
- let strValue = decimal.toExponential();
- let expIndex = strValue.indexOf('e');
- if (expIndex < 0) {
- expIndex = strValue.indexOf('E');
- }
- let scale = 0;
- if (expIndex >= 0) {
- scale = parseInt(strValue.substring(expIndex + 1));
- strValue = strValue.substring(0, expIndex);
- }
- const isNegative = strValue.startsWith('-');
- if (isNegative) {
- strValue = strValue.substring(1);
- }
- const dotIndex = strValue.indexOf('.');
- if (dotIndex >= 0) {
- scale -= strValue.length - dotIndex - 1;
- strValue = strValue.substring(0, dotIndex) + strValue.substring(dotIndex + 1);
- }
- scale = -scale;
- let hexValue = new Decimal(strValue).toHexadecimal().substring(2);
- hexValue = ((hexValue.length % 2 !== 0) ? '000' : '00') + hexValue;
- const valueBuffer = Buffer.from(hexValue, 'hex');
- if (isNegative) {
- valueBuffer[0] |= 0x80;
- }
- buffer.writeInteger(scale);
- buffer.writeInteger(valueBuffer.length);
- buffer.writeBuffer(valueBuffer);
- }
-
- static _writeTimestamp(buffer, timestamp) {
- buffer.writeDate(timestamp);
- buffer.writeInteger(timestamp.getNanos());
- }
-
- static _writeTime(buffer, time) {
- const midnight = new Date(time);
- midnight.setHours(0, 0, 0, 0);
- buffer.writeLong(time.getTime() - midnight.getTime());
- }
-
- static async _writeArray(buffer, array, arrayType, arrayTypeCode) {
- const BinaryType = require('./BinaryType');
- const elementType = BinaryUtils.getArrayElementType(arrayType);
- const keepElementType = BinaryUtils.keepArrayElementType(arrayTypeCode);
- if (arrayTypeCode === BinaryUtils.TYPE_CODE.OBJECT_ARRAY) {
- buffer.writeInteger(elementType instanceof ComplexObjectType ?
- BinaryType._calculateId(elementType._typeName) : -1);
- }
- buffer.writeInteger(array.length);
- for (let elem of array) {
- await BinaryWriter.writeObject(buffer, elem, elementType, keepElementType);
- }
- }
-
- static async _writeCollection(buffer, collection, collectionType) {
- buffer.writeInteger(collection instanceof Set ? collection.size : collection.length);
- buffer.writeByte(collectionType._subType);
- for (let element of collection) {
- await BinaryWriter.writeObject(buffer, element, collectionType._elementType);
- }
- }
-
- static async _writeMap(buffer, map, mapType) {
- buffer.writeInteger(map.size);
- buffer.writeByte(mapType._subType);
- for (let [key, value] of map.entries()) {
- await BinaryWriter.writeObject(buffer, key, mapType._keyType);
- await BinaryWriter.writeObject(buffer, value, mapType._valueType);
- }
- }
-
- static async _writeBinaryObject(buffer, binaryObject) {
- buffer.position = buffer.position - 1;
- await binaryObject._write(buffer);
- }
-
- static async _writeComplexObject(buffer, object, objectType) {
- const BinaryObject = require('../BinaryObject');
- await BinaryWriter._writeBinaryObject(buffer, await BinaryObject.fromObject(object, objectType));
- }
-}
-
-module.exports = BinaryWriter;
diff --git a/lib/internal/ClientSocket.js b/lib/internal/ClientSocket.js
index 1f12040..ac7ccec 100644
--- a/lib/internal/ClientSocket.js
+++ b/lib/internal/ClientSocket.js
@@ -26,8 +26,7 @@
const IgniteClientConfiguration = require('../IgniteClientConfiguration');
const MessageBuffer = require('./MessageBuffer');
const BinaryUtils = require('./BinaryUtils');
-const BinaryReader = require('./BinaryReader');
-const BinaryWriter = require('./BinaryWriter');
+const BinaryCommunicator = require('./BinaryCommunicator');
const ArgumentChecker = require('./ArgumentChecker');
const Logger = require('./Logger');
@@ -240,7 +239,7 @@
const serverVersion = new ProtocolVersion();
serverVersion.read(buffer);
// Error message
- const errMessage = await BinaryReader.readObject(buffer);
+ const errMessage = BinaryCommunicator.readString(buffer);
if (!this._protocolVersion.equals(serverVersion)) {
if (!this._isSupportedVersion(serverVersion) ||
@@ -271,7 +270,7 @@
async _finalizeResponse(buffer, request, isSuccess) {
if (!isSuccess) {
// Error message
- const errMessage = await BinaryReader.readObject(buffer);
+ const errMessage = BinaryCommunicator.readString(buffer);
request.reject(new Errors.OperationError(errMessage));
}
else {
@@ -295,8 +294,8 @@
// Client code
payload.writeByte(2);
if (this._config._userName) {
- await BinaryWriter.writeString(payload, this._config._userName);
- await BinaryWriter.writeString(payload, this._config._password);
+ BinaryCommunicator.writeString(payload, this._config._userName);
+ BinaryCommunicator.writeString(payload, this._config._password);
}
}
diff --git a/spec/examples/AuthExample.spec.js b/spec/examples/AuthExample.spec.js
index 3fb9205..667a396 100644
--- a/spec/examples/AuthExample.spec.js
+++ b/spec/examples/AuthExample.spec.js
@@ -20,6 +20,11 @@
const TestingHelper = require('../TestingHelper');
describe('execute auth example >', () => {
+ beforeAll((done) => {
+ jasmine.DEFAULT_TIMEOUT_INTERVAL = TestingHelper.TIMEOUT;
+ done();
+ });
+
it('AuthTlsExample', (done) => {
TestingHelper.executeExample('examples/AuthTlsExample.js').
then(done).
diff --git a/spec/examples/Examples.spec.js b/spec/examples/Examples.spec.js
index c8dce3c..2ba5f08 100644
--- a/spec/examples/Examples.spec.js
+++ b/spec/examples/Examples.spec.js
@@ -20,6 +20,11 @@
const TestingHelper = require('../TestingHelper');
describe('execute examples >', () => {
+ beforeAll((done) => {
+ jasmine.DEFAULT_TIMEOUT_INTERVAL = TestingHelper.TIMEOUT;
+ done();
+ });
+
it('CachePutGetExample', (done) => {
TestingHelper.executeExample('examples/CachePutGetExample.js').
then(done).