// blob: 39ed3c762174adc8f92326d3a3b26d4e5c077874 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
'use strict';
const { assert } = require('chai');
const sinon = require('sinon');
const util = require('util');
const path = require('path');
const policies = require('../lib/policies');
const types = require('../lib/types');
// const { types } = require('../lib/types');
const utils = require('../lib/utils');
const spawn = require('child_process').spawn;
const childProcessExec = require('child_process').exec;
const http = require('http');
const temp = require('temp').track(true);
const Client = require('../lib/client');
const defaultOptions = require('../lib/client-options').defaultOptions;
const { Host, HostMap } = require('../lib/host');
const OperationState = require('../lib/operation-state');
const promiseUtils = require('../lib/promise-utils');
const Vector = types.Vector;
util.inherits(RetryMultipleTimes, policies.retry.RetryPolicy);
const cassandraVersionByDse = {
'4.8': '2.1',
'5.0': '3.0',
'5.1': '3.11',
'6.0': '3.11',
'6.7': '3.11',
'6.8': '3.11',
'6.9': '3.11'
};
const cassandraVersionByHcd = {
'1.0': '4.0',
};
// Handlers registered through helper.afterThisTest(); drained after every test.
const afterNextHandlers = [];
// Reason of the last unhandled rejection recorded and not yet reported, if any.
let testUnhandledError = null;

// Root-level hook: run every pending handler once, most recently added first.
afterEach(async () => {
  while (afterNextHandlers.length !== 0) {
    await afterNextHandlers.pop()();
  }
});

afterEach('unhandled check', function () {
  if (testUnhandledError === null) {
    return;
  }
  // Reset before reporting so the same rejection is not reported for the following test.
  const pendingError = testUnhandledError;
  testUnhandledError = null;
  this.test.error(pendingError);
});

// Record unhandled rejections so the 'unhandled check' hook can surface them.
process.on('unhandledRejection', reason => {
  testUnhandledError = reason;
});
const helper = {
/**
* Creates a ccm cluster, initializes a Client instance the before() and after() hooks, create
* @param {Number|String} nodeLength A number representing the amount of nodes in a single datacenter or a string
* representing the amount of nodes in each datacenter, ie: "3:4".
* @param {Object} [options]
* @param {Object} [options.ccmOptions]
* @param {Boolean} [options.initClient] Determines whether to create a Client instance.
* @param {Object} [options.clientOptions] The options to use to initialize the client.
* @param {String} [options.keyspace] Name of the keyspace to create.
* @param {Number} [options.replicationFactor] Keyspace replication factor.
* @param {Array<String>} [options.queries] Queries to run after client creation.
* @param {Boolean} [options.removeClusterAfter=true] Determines whether ccm remove should be called on after().
*/
setup: function (nodeLength, options) {
options = options || utils.emptyObject;
before(helper.ccmHelper.start(nodeLength || 1, options.ccmOptions));
const initClient = options.initClient !== false;
let client;
let keyspace;
if (initClient) {
client = new Client(utils.extend({}, options.clientOptions, helper.baseOptions));
before(client.connect.bind(client));
keyspace = options.keyspace || helper.getRandomName('ks');
before(helper.toTask(client.execute, client, helper.createKeyspaceCql(keyspace, options.replicationFactor)));
before(helper.toTask(client.execute, client, 'USE ' + keyspace));
if (options.queries) {
before(function (done) {
utils.eachSeries(options.queries, function (q, next) {
client.execute(q, next);
}, done);
});
}
after(client.shutdown.bind(client));
}
if (options.removeClusterAfter !== false) {
after(helper.ccmHelper.remove);
}
return {
client: client,
keyspace: keyspace
};
},
/**
* Sync throws the error
* @type Function
*/
throwop: function (err) {
if (err) {
throw err;
}
},
/** @type Function */
noop: function () {
//do nothing
},
/** @type Function */
failop: function () {
throw new Error('Method should not be called');
},
/**
* Uses the last parameter as callback, invokes it via setImmediate
*/
callbackNoop: function () {
const args = Array.prototype.slice.call(arguments);
const cb = args[args.length-1];
if (typeof cb !== 'function') {
throw new Error('Helper method needs a callback as last parameter');
}
setImmediate(cb);
},
/**
* Returns a function that returns the provided value
* @param value
*/
functionOf: function (value) {
return (function fnOfFixedValue() {
return value;
});
},
/**
* @type {ClientOptions}
*/
baseOptions: (function () {
return {
//required
contactPoints: ['127.0.0.1'],
localDataCenter: 'dc1',
// retry all queries multiple times (for improved test resiliency).
policies: { retry: new RetryMultipleTimes(3) }
};
})(),
/**
* Returns a pseudo-random name in the form of 'ab{n}', n being an int zero padded with string length 16
* @returns {string}
*/
getRandomName: function (prefix) {
if (!prefix) {
prefix = 'ab';
}
const value = Math.floor(Math.random() * utils.maxInt);
return prefix + ('000000000000000' + value.toString()).slice(-16);
},
ipPrefix: '127.0.0.',
ccm: {},
ads: {},
/**
* Returns a cql string with a CREATE TABLE command containing all common types
* @param {String} tableName
* @returns {String}
*/
createTableCql: function (tableName) {
return util.format('CREATE TABLE %s (' +
' id uuid primary key,' +
' ascii_sample ascii,' +
' text_sample text,' +
' int_sample int,' +
' bigint_sample bigint,' +
' float_sample float,' +
' double_sample double,' +
' decimal_sample decimal,' +
' blob_sample blob,' +
' boolean_sample boolean,' +
' timestamp_sample timestamp,' +
' inet_sample inet,' +
' timeuuid_sample timeuuid,' +
' map_sample map<text, text>,' +
' list_sample list<text>,' +
' list_sample2 list<int>,' +
' set_sample set<text>)', tableName);
},
/**
* Returns a cql string with a CREATE TABLE command 1 partition key and 1 clustering key
* @param {String} tableName
* @returns {String}
*/
createTableWithClusteringKeyCql: function (tableName) {
return util.format('CREATE TABLE %s (' +
' id1 uuid,' +
' id2 timeuuid,' +
' text_sample text,' +
' int_sample int,' +
' bigint_sample bigint,' +
' float_sample float,' +
' double_sample double,' +
' map_sample map<uuid, int>,' +
' list_sample list<timeuuid>,' +
' set_sample set<int>,' +
' PRIMARY KEY (id1, id2))', tableName);
},
createKeyspaceCql: function (keyspace, replicationFactor, durableWrites) {
return util.format('CREATE KEYSPACE %s' +
' WITH replication = {\'class\': \'SimpleStrategy\', \'replication_factor\' : %d}' +
' AND durable_writes = %s;', keyspace, replicationFactor || 1, !!durableWrites
);
},
assertValueEqual: function (val1, val2) {
if (val1 === null && val2 === null) {
return;
}
if (val1 instanceof Buffer && val2 instanceof Buffer) {
val1 = val1.toString('hex');
val2 = val2.toString('hex');
}
if ((val1 instanceof types.Long && val2 instanceof types.Long) ||
(val1 instanceof Date && val2 instanceof Date) ||
(val1 instanceof types.InetAddress && val2 instanceof types.InetAddress) ||
(val1 instanceof types.Uuid && val2 instanceof types.Uuid)) {
val1 = val1.toString();
val2 = val2.toString();
}
if (Array.isArray(val1) ||
(val1.constructor && val1.constructor.name === 'Object') ||
val1 instanceof helper.Map) {
val1 = util.inspect(val1, {depth: null});
val2 = util.inspect(val2, {depth: null});
}
assert.strictEqual(val1, val2);
},
assertInstanceOf: function (instance, constructor) {
assert.notEqual(instance, null, 'Expected instance, obtained ' + instance);
assert.ok(instance instanceof constructor, 'Expected instance of ' + constructor.name + ', actual constructor: ' + instance.constructor.name);
},
assertNotInstanceOf: function (instance, constructor) {
assert.notEqual(instance, null, 'Expected instance, obtained ' + instance);
assert.ok(!(instance instanceof constructor), 'Expected instance different than ' + constructor.name + ', actual constructor: ' + instance.constructor.name);
},
assertContains: function (value, searchValue, caseInsensitive) {
const originalValue = value;
const originalSearchValue = searchValue;
assert.strictEqual(typeof value, 'string');
const message = 'String: "%s" does not contain "%s"';
if (caseInsensitive !== false) {
value = value.toLowerCase();
searchValue = searchValue.toLowerCase();
}
assert.ok(value.indexOf(searchValue) >= 0, util.format(message, originalValue, originalSearchValue));
},
/**
* Asserts that the value has some properties defined and the value of those properties
* @param {Object} value
* @param {Object} expectedProperties
* @param {Boolean} [strictEquality=true]
*/
assertProperties: (value, expectedProperties, strictEquality) => {
const properties = Object.keys(expectedProperties);
if (properties.length === 0) {
throw new Error('expectedProperties should be defined as an object');
}
assert.ok(value, 'value should be defined');
const assertFn = strictEquality !== false ? assert.strictEqual : assert.equal;
properties.forEach(key => assertFn(value[key], expectedProperties[key]));
},
/**
* Tests for deep equality of Maps between the actual and expected parameters
* @param actual
* @param expected
*/
assertMapEqual: (actual, expected) => {
helper.assertInstanceOf(actual, Map);
helper.assertInstanceOf(expected, Map);
assert.deepStrictEqual(Array.from(actual.keys()), Array.from(expected.keys()), 'Map keys do not match ');
expected.forEach((value, key) => {
assert.deepStrictEqual(actual.get(key), value, `Value for '${key}' does not match`);
});
},
/**
* Asserts promise gets rejected with the provided error
*/
assertThrowsAsync: async (promise, type, message) => {
let err;
try {
await promise;
} catch (e) {
err = e;
}
assert.instanceOf(err, Error);
if (type) {
assert.instanceOf(err, type);
}
if (message) {
let re = message;
if (typeof message === 'string') {
re = new RegExp(message);
}
assert.match(err.message, re);
}
return err;
},
/**
* Invokes the provided function once after the calling test finished.
* @param {Function} fn
*/
afterThisTest: function(fn) {
afterNextHandlers.push(fn);
},
/**
* Invokes client.shutdown() after this test finishes.
* @param {Client} client
* @returns {Client}
*/
shutdownAfterThisTest: function(client) {
this.afterThisTest(() => client.shutdown());
return client;
},
/**
* Returns a function that waits on schema agreement before executing callback
* @param {Client} client
* @param {Function} callback
* @returns {Function}
*/
waitSchema: function (client, callback) {
return (function (err) {
if (err) {
return callback(err);
}
if (!client.hosts) {
throw new Error('No hosts on Client');
}
if (client.hosts.length === 1) {
return callback();
}
setTimeout(callback, 200 * client.hosts.length);
});
},
/**
* @returns {Function} A function with a single callback param, applying the fn with parameters
*/
toTask: function (fn, context) {
const params = Array.prototype.slice.call(arguments, 2);
return (function (next) {
params.push(next);
fn.apply(context, params);
});
},
waitCallback: function (ms, callback) {
if (!ms) {
ms = 0;
}
return (function (err) {
if (err) {
return callback(err);
}
setTimeout(callback, ms);
});
},
/**
* Gets the Apache Cassandra version.
* When the server is DSE/HCD, gets the Apache Cassandra equivalent.
*/
getCassandraVersion: function () {
const serverInfo = this.getServerInfo();
if (serverInfo.distribution === 'cassandra') {
return serverInfo.version;
}
const literalVersion = serverInfo.version.split('.').slice(0, 2).join('.');
if (serverInfo.distribution === 'hcd') {
return cassandraVersionByHcd[literalVersion] || cassandraVersionByHcd['1.0'];
}
if (serverInfo.distribution === 'dse') {
return cassandraVersionByDse[literalVersion] || cassandraVersionByDse['6.7'];
}
throw new Error('Unknown distribution ' + serverInfo.distribution);
},
/**
* Gets the server version and type.
* @return {{version: String, isDse: Boolean, isHcd: Boolean, distribution: String}}
*/
getServerInfo: function () {
return {
version: process.env['CCM_VERSION'] || '3.11.4',
isDse: process.env['CCM_DISTRIBUTION'] === 'dse',
isHcd: process.env['CCM_DISTRIBUTION'] === 'hcd',
distribution: process.env['CCM_DISTRIBUTION'] || 'cassandra'
};
},
getSimulatedCassandraVersion: function() {
let version = this.getCassandraVersion();
// simulacron does not support protocol V2 and V1, so cap at 2.1.
if (version < '2.1') {
version = '2.1.19';
} else if (version >= '4.0') {
// simulacron does not support protocol V5, so cap at 3.11
version = '3.11.2';
}
return version;
},
/**
* Determines if the current server is a DSE instance *AND* version is greater than or equals to the version provided
* @param {String} version The version in string format, dot separated.
* @returns {Boolean}
*/
isDseGreaterThan: function (version) {
const serverInfo = this.getServerInfo();
if (!serverInfo.isDse) {
return false;
}
return helper.versionCompare(serverInfo.version, version);
},
/** Determines if the current server is a DSE instance. */
isDse: function () {
return this.getServerInfo().isDse;
},
/** Determines if the current server is a HCD instance. */
isHcd: function () {
return this.getServerInfo().isHcd;
},
/**
* Determines if the current C* or DSE instance version is greater than or equals to the C* version provided
* @param {String} version The version in string format, dot separated.
* @returns {Boolean}
*/
isCassandraGreaterThan: function (version) {
return helper.versionCompare(helper.getCassandraVersion(), version);
},
versionCompare: function (instanceVersionStr, version) {
let expected = [1, 0]; //greater than or equals to
if (version.indexOf('<=') === 0) {
version = version.substr(2);
expected = [-1, 0]; //less than or equals to
}
else if (version.indexOf('<') === 0) {
version = version.substr(1);
expected = [-1]; //less than
}
const instanceVersion = instanceVersionStr.split('.').map(function (x) { return parseInt(x, 10);});
const compareVersion = version.split('.').map(function (x) { return parseInt(x, 10) || 0;});
for (let i = 0; i < compareVersion.length; i++) {
const compare = compareVersion[i] || 0;
if (instanceVersion[i] > compare) {
//is greater
return expected.indexOf(1) >= 0;
}
else if (instanceVersion[i] < compare) {
//is smaller
return expected.indexOf(-1) >= 0;
}
}
//are equal
return expected.indexOf(0) >= 0;
},
log: function(levels) {
if (!levels) {
levels = ['info', 'warning', 'error'];
}
return (function (l) {
if (levels.indexOf(l) >= 0) {
// eslint-disable-next-line no-console, no-undef
console.log.apply(console, arguments);
}
});
},
/**
* @returns {Array}
*/
fillArray: function (length, val) {
const result = new Array(length);
for (let i = 0; i < length; i++) {
result[i] = val;
}
return result;
},
/**
* @returns {Array}
*/
iteratorToArray: function (iterator) {
const result = [];
let item = iterator.next();
while (!item.done) {
result.push(item.value);
item = iterator.next();
}
return result;
},
/** @returns {Promise<Array} */
asyncIteratorToArray: async function (iterable) {
const result = [];
const iterator = iterable[Symbol.asyncIterator]();
while (true) {
const item = await iterator.next();
if (item.done) {
break;
}
const length = result.push(item.value);
if (length > 1000) {
throw new Error('Unexpected never ending async iterator');
}
}
return result;
},
/**
* @param arr
* @param {Function|String} predicate function to compare or property name to compare
* @param val
* @returns {*}
*/
find: function (arr, predicate, val) {
if (arr == null) {
throw new TypeError('Array.prototype.find called on null or undefined');
}
if (typeof predicate === 'string') {
const propName = predicate;
predicate = function (item) {
return (item && item[propName] === val);
};
}
if (typeof predicate !== 'function') {
throw new TypeError('predicate must be a function');
}
let value;
for (let i = 0; i < arr.length; i++) {
value = arr[i];
if (predicate.call(null, value, i, arr)) {
return value;
}
}
return undefined;
},
/**
* @param {Array} arr
* @param {Function }predicate
*/
first: function (arr, predicate) {
const filterArr = arr.filter(predicate);
if (filterArr.length === 0) {
throw new Error('Item not found: ' + predicate);
}
return filterArr[0];
},
Map: MapPolyFill,
Set: SetPolyFill,
AllowListPolicy: AllowListPolicy,
FallthroughRetryPolicy: FallthroughRetryPolicy,
/**
* Determines if test tracing is enabled
*/
isTracing: function () {
return (process.env.TEST_TRACE === 'on');
},
trace: function (format) {
if (!helper.isTracing()) {
return;
}
// eslint-disable-next-line no-console, no-undef
console.log('\t...' + util.format.apply(null, arguments));
},
/**
* Version dependent it() method for mocha test case
* @param {String} testVersion Minimum version of Cassandra needed for this test
* @param {String} testCase Test case name
* @param {Function} func
*/
vit: function (testVersion, testCase, func) {
executeIfVersion(testVersion, it, [testCase, func]);
},
/**
* Version dependent describe() method for mocha test case
* @param {String} testVersion Minimum version of DSE/Cassandra needed for this test
* @param {String} title Title of the describe section.
* @param {Function} func
*/
vdescribe: function (testVersion, title, func) {
executeIfVersion(testVersion, describe, [title, func]);
},
/**
* Given a {Host} returns the last octet of its ip address.
* i.e. (127.0.0.247:9042) -> 247.
*
* @param {Host|string} host or host address to get ip address of.
* @returns {string} Last octet of the host address.
*/
lastOctetOf: function(host) {
const address = typeof host === "string" ? host : host.address;
const ipAddress = address.split(':')[0].split('.');
return ipAddress[ipAddress.length-1];
},
/**
* Given a {Client} and a {Number} returns the host whose last octet
* ends with the requested number.
* @param {Client|HostMap|Array<Host>} hostsOrClient Client to lookup hosts from.
* @param {Number} number last octet of requested host.
* @param {Boolean} [throwWhenNotFound] Determines whether this method should throw an error when the host
* can not be found.
* @returns {Host}
*/
findHost: function(hostsOrClient, number, throwWhenNotFound) {
let hostArray;
if (hostsOrClient instanceof HostMap) {
hostArray = hostsOrClient.values();
} else if (hostsOrClient instanceof Client) {
hostArray = hostsOrClient.hosts.values();
} else if (Array.isArray(hostsOrClient)) {
hostArray = hostsOrClient;
} else {
throw new Error('First parameter must be the host array/map or the Client');
}
const host = hostArray.find(h => +this.lastOctetOf(h) === +number);
if (throwWhenNotFound && !host) {
throw new Error(`Host ${number} not found`);
}
return host;
},
/**
* Provides utilities to asynchronously wait on conditions.
*/
wait: {
until: async function(condition, maxAttempts = 500, delay = 20) {
for (let i = 0; i <= maxAttempts; i++) {
// Condition can be both sync or async
const c = await condition();
if (!c) {
if (i === maxAttempts) {
throw new Error(`Condition still false after ${maxAttempts * delay}ms: ${condition.toString()}`);
}
await helper.delayAsync(delay);
} else {
break;
}
}
},
forNodeUp: async function (hostsOrClient, lastOctet, maxAttempts = 500, delay = 20) {
const host = helper.findHost(hostsOrClient, lastOctet, true);
await this.until(() => host.isUp(), maxAttempts, delay);
},
forAllNodesUp: async function (client, maxAttempts = 500, delay = 20) {
await this.until(() => !client.hosts.values().find(h => !h.isUp()), maxAttempts, delay);
},
forAllNodesDown: async function (client, maxAttempts = 500, delay = 20) {
await this.until(() => !client.hosts.values().find(h => h.isUp()), maxAttempts, delay);
},
forNodeDown: async function (hostsOrClient, lastOctet, maxAttempts = 500, delay = 20) {
const host = helper.findHost(hostsOrClient, lastOctet, true);
await this.until(() => !host.isUp(), maxAttempts, delay);
},
forNodeToBeAdded: async function (hostsOrClient, lastOctet, maxAttempts = 1000, delay = 20) {
await this.until(() => helper.findHost(hostsOrClient, lastOctet), maxAttempts, delay);
},
forNodeToBeRemoved: async function (hostsOrClient, lastOctet, maxAttempts = 1000, delay = 20) {
await this.until(() => !helper.findHost(hostsOrClient, lastOctet), maxAttempts, delay);
}
},
/**
* Returns a function, that when invoked shutdowns the client and callbacks
* @param {Client} client
* @param {Function} callback
* @returns {Function}
*/
finish: function (client, callback) {
return (function onFinish(err) {
client.shutdown(function () {
assert.ifError(err);
callback();
});
});
},
/**
* Returns a handler that executes multiple queries
* @param {Client} client
* @param {Array<string>} queries
*/
executeTask: function (client, queries) {
return (function (done) {
utils.series([
client.connect.bind(client),
function executeQueries(next) {
utils.eachSeries(queries, function (query, eachNext) {
client.execute(query, eachNext);
}, next);
}
], helper.finish(client, done));
});
},
/**
* Executes a function at regular intervals while the condition is false and the amount of attempts < maxAttempts.
* @param {Function} condition
* @param {Number} delay
* @param {Number} maxAttempts
* @param {Function} done
*/
setIntervalUntil: function (condition, delay, maxAttempts, done) {
let attempts = 0;
utils.whilst(
function whilstCondition() {
return !condition();
},
function whilstItem(next) {
if (attempts++ >= maxAttempts) {
return next(new Error(util.format('Condition still false after %d attempts: %s', maxAttempts, condition)));
}
setTimeout(next, delay);
},
done);
},
/**
* Returns a method that executes a function at regular intervals while the condition is false and the amount of
* attempts < maxAttempts.
* @param {Function} condition
* @param {Number} delay
* @param {Number} maxAttempts
*/
setIntervalUntilTask: function (condition, delay, maxAttempts) {
const self = this;
return (function setIntervalUntilHandler(done) {
self.setIntervalUntil(condition, delay, maxAttempts, done);
});
},
/**
* Executes a function at regular intervals while the condition is false and the amount of attempts < maxAttempts.
* @param {Function} condition
* @param {Number} delay
* @param {Number} maxAttempts
*/
setIntervalUntilPromise: function (condition, delay, maxAttempts) {
return new Promise((resolve, reject) => {
this.setIntervalUntil(condition, delay, maxAttempts, (err) => {
if (err) {
reject(err);
} else {
resolve();
}
});
});
},
/**
* Returns a method that delays invoking the callback
*/
delay: function (delayMs) {
return (function delayedHandler(next) {
setTimeout(next, delayMs);
});
},
delayAsync: (delayMs) => promiseUtils.delay(delayMs),
queries: {
basic: "SELECT key FROM system.local",
basicNoResults: "SELECT key from system.local WHERE key = 'not_existent'"
},
getPoolingOptions: function (localLength, remoteLength, heartBeatInterval) {
const pooling = {
heartBeatInterval: heartBeatInterval || 0,
coreConnectionsPerHost: {}
};
pooling.coreConnectionsPerHost[types.distance.local] = localLength || 1;
pooling.coreConnectionsPerHost[types.distance.remote] = remoteLength || 1;
pooling.coreConnectionsPerHost[types.distance.ignored] = 0;
return pooling;
},
/**
 * Creates one Host instance per item of hostsInfo, replaces some of its members with test doubles and
 * returns each one wrapped with sinon.spy().
 * @param {Array<{isUp: Boolean, ignored: Boolean}>} hostsInfo The host is considered up unless isUp is
 * exactly false.
 * @param {Function} [prepareQueryCb] When set, invoked as (query, host, callback) in place of the default
 * prepare result.
 * @param {Function} [sendStreamCb] When set, invoked as (request, host, callback) in place of the default
 * send behavior.
 * @param {Number} [protocolVersion] Defaults to types.protocolVersion.maxSupported.
 * @returns {Array} The array of spied hosts.
 */
getHostsMock: function (hostsInfo, prepareQueryCb, sendStreamCb, protocolVersion) {
return hostsInfo.map(function (info, index) {
protocolVersion = protocolVersion || types.protocolVersion.maxSupported;
// The item index (as a string) is passed as the first constructor argument of the Host
const h = new Host(index.toString(), protocolVersion, defaultOptions(), {});
h.isUp = function () {
return !(info.isUp === false);
};
h.checkHealth = utils.noop;
h.log = utils.noop;
h.shouldBeIgnored = !!info.ignored;
// Counters and history that tests can inspect
h.prepareCalled = 0;
h.sendStreamCalled = 0;
h.connectionKeyspace = [];
h.borrowConnection = function () {
if (!h.isUp() || h.shouldBeIgnored) {
throw new Error('This host should not be used');
}
// Connection stub
return ({
protocolVersion: protocolVersion,
keyspace: 'ks',
changeKeyspace: (keyspace) => {
// NOTE(review): being an arrow function, `this` is the `this` of borrowConnection() and not the
// connection stub, so the stub's `keyspace` property is not the one being updated here -- confirm intent.
this.keyspace = keyspace;
h.connectionKeyspace.push(keyspace);
return Promise.resolve();
},
prepareOnce: function (q, ks, cb) {
h.prepareCalled++;
if (prepareQueryCb) {
return prepareQueryCb(q, h, cb);
}
cb(null, { id: 1, meta: {} });
},
sendStream: function (r, o, cb) {
h.sendStreamCalled++;
if (sendStreamCb) {
return sendStreamCb(r, h, cb);
}
// Default: complete the operation asynchronously with an empty result
const op = new OperationState(r, o, cb);
setImmediate(function () {
op.setResult(null, {});
});
return op;
},
prepareOnceAsync: function (q, ks) {
return new Promise((resolve, reject) => {
h.prepareCalled++;
if (prepareQueryCb) {
// Adapt the callback-based hook to the promise
return prepareQueryCb(q, h, (err, result) => {
if (err) {
reject(err);
} else {
resolve(result);
}
});
}
resolve({ id: 1, meta: {} });
});
}
});
};
return sinon.spy(h);
});
},
getLoadBalancingPolicyFake: function getLoadBalancingPolicyFake(hostsInfo, prepareQueryCb, sendStreamCb, protocolVersion) {
const hosts = this.getHostsMock(hostsInfo, prepareQueryCb, sendStreamCb, protocolVersion);
const fake = {
newQueryPlan: function (q, ks, cb) {
cb(null, utils.arrayIterator(hosts));
},
/**
* Returns the array of hosts used in the query plan.
*/
getFixedQueryPlan: function () {
return hosts;
},
getDistance: function () {
return types.distance.local;
},
/**
* Shutdowns the hosts and invoke the optional callback.
*/
shutdown: function (cb) {
hosts.forEach(h => h.shutdown(false));
if (cb) {
cb();
}
}
};
helper.afterThisTest(() => fake.shutdown());
return fake;
},
/**
* Returns true if the tests are being run on Windows
* @returns {boolean}
*/
isWin: function () {
return process.platform.indexOf('win') === 0;
},
/**
* Invokes a function multiple times and returns a Promise that is resolved when all the promises have completed.
* @param {Number} times
* @param {Function} fn
* @returns {Promise}
*/
repeat: function(times, fn) {
const arr = new Array(times);
for (let i = 0; i < times; i++) {
arr[i] = fn(i);
}
return Promise.all(arr);
},
requireOptional: function (moduleName) {
try {
// eslint-disable-next-line
return require(moduleName);
}
catch (err) {
if (err.code === 'MODULE_NOT_FOUND') {
return null;
}
throw err;
}
},
assertBufferString: function (instance, textValue) {
this.assertInstanceOf(instance, Buffer);
assert.strictEqual(instance.toString(), textValue);
},
conditionalDescribe: function (condition, text) {
if (condition) {
return describe;
}
return (function xdescribeWithText(name, fn) {
return xdescribe(util.format('%s [%s]', name, text), fn);
});
},
getOptions: function (options) {
return utils.extend({}, helper.baseOptions, options);
},
/**
* @param {ResultSet} result
*/
keyedById: function (result) {
const map = {};
const columnKeys = result.columns.map(function (c) { return c.name;});
if (columnKeys.indexOf('id') < 0 || columnKeys.indexOf('value') < 0) {
throw new Error('ResultSet must contain the columns id and value');
}
result.rows.forEach(function (row) {
map[row['id']] = row['value'];
});
return map;
},
/**
* Connects to the cluster, makes a few queries and shutsdown the client
* @param {Client} client
* @param {Function} callback
*/
connectAndQuery: function (client, callback) {
const self = this;
utils.series([
client.connect.bind(client),
function doSomeQueries(next) {
utils.timesSeries(10, function (n, timesNext) {
client.execute(self.queries.basic, timesNext);
}, next);
},
client.shutdown.bind(client)
], callback);
},
/**
*
* @param {Array.<String>} yamlToFix
*/
fixYaml : function (yamlToFix) {
if (helper.isCassandraGreaterThan("4.1.0")) {
// fix the yaml options that turned obsolete since 4.1.0
yamlToFix = yamlToFix.map(keyValue => {
const [key, value] = keyValue.split(':');
const a = /^(\w+)_in_ms$/.exec(key);
if (a) {
return `${a[1]}:${value}ms`;
}
const b = /^(\w+)_in_kb$/.exec(key);
if (b) {
return `${b[1]}:${value}KiB`;
}
const c = /enable_(\w+)$/.exec(key);
if (c) {
return `${c[1]}_enabled:${value}`;
}
return keyValue;
});
}
return yamlToFix;
},
/**
* Makes a http request and returns the body.
* @param {{host, port, path}} requestOptions
* @returns {Promise<string>}
*/
makeWebRequest: function(requestOptions) {
const timeoutMs = 500;
return new Promise((resolve, reject) => {
const req = http.get(Object.assign({ timeout: timeoutMs }, requestOptions), res => {
let data = '';
res
.on('data', chunk => data += chunk.toString())
.on('end', () => {
if (res.statusCode !== 200) {
return reject(new Error(`Obtained http status ${res.statusCode}`));
}
resolve(data);
});
});
req.on('error', err => reject(err));
req.setTimeout(timeoutMs, () => req.abort());
});
}
};
/**
 * A polyfill of Map, valid for testing. It does not support update of values: set() always appends
 * a new entry and get() returns the value of the first entry with a matching key.
 * @param {Array<Array>} [arr] Initial [key, value] entries; the array is used as backing storage.
 * @constructor
 */
function MapPolyFill(arr) {
  this.arr = arr || [];
  Object.defineProperty(this, 'size', {
    get: () => this.arr.length,
    configurable: false
  });
}
MapPolyFill.prototype.set = function (k, v) {
  this.arr.push([k, v]);
};
MapPolyFill.prototype.get = function (k) {
  const entry = this.arr.find(function (item) {
    return item[0] === k;
  });
  // Return the value like Map#get() does, not the whole [key, value] entry
  return entry === undefined ? undefined : entry[1];
};
MapPolyFill.prototype.forEach = function (callback) {
  this.arr.forEach(function (item) {
    //first the value, then the key
    callback(item[1], item[0]);
  });
};
MapPolyFill.prototype.toString = function() {
  return this.arr.toString();
};
/**
 * A minimal Set-like container for testing, backed by an array. Items are not deduplicated.
 * @param {Array} [arr] Initial items; the array is used as backing storage.
 * @constructor
 */
function SetPolyFill(arr) {
  this.arr = arr ? arr : [];
}
Object.assign(SetPolyFill.prototype, {
  /** Iterates the items in insertion order, with the same arguments as Array#forEach(). */
  forEach(cb, thisArg) {
    this.arr.forEach(cb, thisArg);
  },
  /** Appends an item. */
  add(x) {
    this.arr.push(x);
  },
  /** Comma separated representation of the items. */
  toString() {
    return this.arr.toString();
  }
});
// Core driver used ccmHelper
helper.ccmHelper = helper.ccm;
/**
 * Removes previous and creates a new cluster (create, populate and start)
 * @param {Number|String} nodeLength number of nodes in the cluster. If multiple dcs, use the notation x:y:z:...
 * @param {{[vnodes]: Boolean, [yaml]: Array.<String>, [jvmArgs]: Array.<String>, [ssl]: Boolean,
 * [dseYaml]: Array.<String>, [workloads]: Array.<String>, [sleep]: Number, [ipFormat]: String|null, partitioner: String}} options
 * @param {Function} callback Invoked with the error of the first failing step, if any.
 */
helper.ccm.startAll = function (nodeLength, options, callback) {
  const self = helper.ccm;
  options = options || {};
  // adapt to multi dc format so data center naming is consistent.
  if (typeof nodeLength === 'number') {
    nodeLength = nodeLength + ':0';
  }
  const serverInfo = helper.getServerInfo();
  let distributionLabel = 'Cassandra';
  if (serverInfo.isDse) {
    distributionLabel = 'DSE';
  } else if (serverInfo.isHcd) {
    distributionLabel = 'HCD';
  }
  helper.trace(`Starting ${distributionLabel} cluster v${serverInfo.version} with ${nodeLength} node(s)`);
  utils.series([
    function removePrevious(next) {
      //it wont hurt to remove
      self.exec(['remove'], function () {
        //ignore error
        next();
      });
    },
    function createCluster(next) {
      const clusterName = helper.getRandomName('test');
      let create = ['create', clusterName];
      if (serverInfo.isDse) {
        create.push('--dse');
      }
      if (serverInfo.isHcd) {
        create.push('--hcd');
      }
      create.push('-v', serverInfo.version);
      if (process.env['CCM_INSTALL_DIR']) {
        // An install dir replaces the distribution and version arguments
        create = ['create', clusterName, '--install-dir=' + process.env['CCM_INSTALL_DIR']];
        helper.trace('With', create[2]);
      }
      if (options.ssl) {
        create.push('--ssl', self.getPath('ssl'));
      }
      if (options.partitioner) {
        create.push('-p');
        create.push(options.partitioner);
      }
      self.exec(create, helper.waitCallback(options.sleep, next));
    },
    function populate(next) {
      const populateArgs = ['populate', '-n', nodeLength.toString()];
      if (options.vnodes) {
        populateArgs.push('--vnodes');
      }
      if (options.ipFormat) {
        populateArgs.push('--ip-format='+ options.ipFormat);
      }
      self.exec(populateArgs, helper.waitCallback(options.sleep, next));
    },
    function updateConf(next) {
      if (!options.yaml || !options.yaml.length) {
        return next();
      }
      // Use the adapted settings locally, without overwriting the caller's options
      const yaml = helper.fixYaml(options.yaml);
      helper.trace('With cassandra yaml options', yaml);
      self.exec(['updateconf'].concat(yaml), next);
    },
    function updateDseConf(next) {
      if (!options.dseYaml || !options.dseYaml.length) {
        return next();
      }
      helper.trace('With dse yaml options', options.dseYaml);
      self.exec(['updatedseconf'].concat(options.dseYaml), next);
    },
    function setWorkloads(next) {
      if (!options.workloads || !options.workloads.length) {
        return next();
      }
      helper.trace('With workloads', options.workloads);
      self.exec(['setworkload', options.workloads.join(',')], next);
    },
    function startNodes(next) {
      const start = ['start', '--wait-for-binary-proto'];
      if (helper.isWin() && helper.isCassandraGreaterThan('2.2.4')) {
        start.push('--quiet-windows');
      }
      if (Array.isArray(options.jvmArgs)) {
        options.jvmArgs.forEach(function (arg) {
          start.push('--jvm_arg', arg);
        });
        helper.trace('With jvm args', options.jvmArgs);
      }
      self.exec(start, helper.waitCallback(options.sleep, next));
    },
    self.waitForUp.bind(self)
  ], function (err) {
    callback(err);
  });
};
/**
 * Returns a function that, when invoked with a callback, starts a cluster via helper.ccm.startAll()
 * using the node length and options captured here.
 * @param {Number|String} nodeLength
 * @param {Object} [options]
 * @returns {Function}
 */
helper.ccm.start = function (nodeLength, options) {
  function executeStartAll(next) {
    helper.ccm.startAll(nodeLength, options, next);
  }
  return executeStartAll;
};
/**
 * Adds a new node to the cluster by running "ccm add".
 * @param {number|{nodeIndex: number, dc?: string}} options 1 based index of the node or options.
 * @param {Function} callback
 */
helper.ccm.bootstrapNode = function (options, callback) {
  // A bare number is shorthand for { nodeIndex }.
  const nodeOptions = typeof options === 'number' ? { nodeIndex: options } : options;
  const ipPrefix = helper.ipPrefix;
  helper.trace('bootstrapping node', nodeOptions.nodeIndex);
  // The value passed to "-j" is derived from the node index.
  const jmxPort = (7000 + 100 * nodeOptions.nodeIndex).toString();
  const ccmArgs = ['add', 'node' + nodeOptions.nodeIndex, '-i', ipPrefix + nodeOptions.nodeIndex, '-j', jmxPort, '-b'];
  if (helper.getServerInfo().isDse) {
    ccmArgs.push('--dse');
  }
  if (helper.getServerInfo().isHcd) {
    ccmArgs.push('--hcd');
  }
  if (nodeOptions.dc) {
    ccmArgs.push('-d', nodeOptions.dc);
  }
  helper.ccm.exec(ccmArgs, callback);
};
/**
 * Decommissions a node using ccm.
 * @param {Number} nodeIndex Index of the node, used to build the ccm node name ("node<index>").
 * @param {Function} callback
 */
helper.ccm.decommissionNode = function (nodeIndex, callback) {
  helper.trace('decommissioning node', nodeIndex);
  // Special case for C* 3.12+, DSE 5.1+, force decommission (see CASSANDRA-12510).
  // Only the DSE version is checked here.
  const forceFlags = helper.isDseGreaterThan('5.1') ? ['--force'] : [];
  helper.ccm.exec(['node' + nodeIndex, 'decommission', ...forceFlags], callback);
};
/**
 * Sets the workload(s) for a given node. The workloads are passed to ccm as a comma separated list.
 * @param {Number} nodeIndex 1 based index of the node
 * @param {Array<String>} workloads workloads to set.
 * @param {Function} callback
 */
helper.ccm.setWorkload = function (nodeIndex, workloads, callback) {
  helper.trace('node', nodeIndex, 'with workloads', workloads);
  const nodeName = 'node' + nodeIndex;
  const workloadList = workloads.join(',');
  helper.ccm.exec([nodeName, 'setworkload', workloadList], callback);
};
/**
 * Starts a single node, asking ccm to wait for the binary protocol.
 * @param {Number} nodeIndex 1 based index of the node
 * @param {Function} callback
 */
helper.ccm.startNode = function (nodeIndex, callback) {
  const useQuietWindows = helper.isWin() && helper.isCassandraGreaterThan('2.2.4');
  const windowsFlags = useQuietWindows ? ['--quiet-windows'] : [];
  helper.ccm.exec(['node' + nodeIndex, 'start', '--wait-for-binary-proto', ...windowsFlags], callback);
};
/**
 * Stops a single node using ccm.
 * @param {Number} nodeIndex 1 based index of the node
 * @param {Function} callback
 */
helper.ccm.stopNode = function (nodeIndex, callback) {
  const nodeName = 'node' + nodeIndex;
  helper.ccm.exec([nodeName, 'stop'], callback);
};
/**
 * Runs the ccm "pause" command on a single node.
 * @param {Number} nodeIndex Index of the node, used to build the ccm node name ("node<index>").
 * @param {Function} callback
 */
helper.ccm.pauseNode = function (nodeIndex, callback) {
  const nodeName = 'node' + nodeIndex;
  helper.ccm.exec([nodeName, 'pause'], callback);
};
/**
 * Runs the ccm "resume" command on a single node.
 * @param {Number} nodeIndex Index of the node, used to build the ccm node name ("node<index>").
 * @param {Function} callback
 */
helper.ccm.resumeNode = function (nodeIndex, callback) {
  const nodeName = 'node' + nodeIndex;
  helper.ccm.exec([nodeName, 'resume'], callback);
};
/**
 * Executes the "ccm" command with the provided arguments, delegating to helper.ccm.spawn().
 * @param {Array<String>} params Arguments for ccm.
 * @param {Function} [callback]
 */
helper.ccm.exec = function (params, callback) {
  const ccmCommand = 'ccm';
  helper.ccm.spawn(ccmCommand, params, callback);
};
/**
 * Spawns a child process, buffers its stdout / stderr and reports the outcome exactly once.
 * When process.platform starts with 'win', the command is run through "cmd.exe /c".
 * @param {String} processName Name of the executable.
 * @param {Array<String>} [params] Command line arguments.
 * @param {Function} [callback] Invoked as callback(err, info) where info is {code, stdout, stderr}
 * (stdout and stderr being arrays of utf8 chunks). err is set when the exit code is not zero or when
 * the process emitted an 'error' event (for example, when it could not be spawned); in the latter case
 * info.code is null.
 */
helper.ccm.spawn = function (processName, params, callback) {
  if (!callback) {
    callback = function () {};
  }
  params = params || [];
  const originalProcessName = processName;
  if (process.platform.indexOf('win') === 0) {
    params = ['/c', processName].concat(params);
    processName = 'cmd.exe';
  }
  const p = spawn(processName, params);
  const stdoutArray = [];
  const stderrArray = [];
  let completed = false;

  // Both 'error' and 'close' can be emitted for the same process: only the first one is reported.
  function complete(err, info) {
    if (completed) {
      return;
    }
    completed = true;
    callback(err, info);
  }

  p.stdout.setEncoding('utf8');
  p.stderr.setEncoding('utf8');
  p.stdout.on('data', function (data) {
    stdoutArray.push(data);
  });
  p.stderr.on('data', function (data) {
    stderrArray.push(data);
  });
  // Without a listener, an 'error' event (e.g. executable not found) would be thrown as an
  // uncaught exception instead of being surfaced through the callback.
  p.on('error', function (spawnErr) {
    const info = {code: null, stdout: stdoutArray, stderr: stderrArray};
    const err = new Error('Error executing ' + originalProcessName + ':\n' + spawnErr.message);
    err.info = info;
    complete(err, info);
  });
  p.on('close', function (code) {
    const info = {code: code, stdout: stdoutArray, stderr: stderrArray};
    let err = null;
    if (code !== 0) {
      err = new Error(
        'Error executing ' + originalProcessName + ':\n' +
        info.stderr.join('\n') +
        info.stdout.join('\n')
      );
      err.info = info;
    }
    complete(err, info);
  });
};
/**
 * Runs "ccm remove", reporting any error to the callback.
 * @param {Function} callback
 */
helper.ccm.remove = function (callback) {
  const removeArgs = ['remove'];
  helper.ccm.exec(removeArgs, callback);
};
/**
 * Runs "ccm remove" and invokes the callback without arguments, regardless of the outcome.
 * @param {Function} callback
 */
helper.ccm.removeIfAny = function (callback) {
  const ignoreOutcome = () => {
    // Errors are intentionally not propagated
    callback();
  };
  helper.ccm.exec(['remove'], ignoreOutcome);
};
/**
 * Reads the log of node1 (using "ccm node1 showlog") to see if the cql protocol is up.
 * The log is checked up to 60 times, waiting 1 second between attempts. When the attempts are
 * exhausted, the callback is invoked without an error.
 * @param callback
 */
helper.ccm.waitForUp = function (callback) {
  const ccm = helper.ccm;
  const maxAttempts = 60;
  const delayBetweenAttempts = 1000;
  let attempts = 0;
  let isListening = false;

  const shouldContinue = () => !isListening && attempts < maxAttempts;

  const checkLog = (next) => {
    ccm.exec(['node1', 'showlog'], (err, info) => {
      if (err) {
        return next(err);
      }
      const regex = /Starting listening for CQL clients/mi;
      isListening = regex.test(info.stdout.join(''));
      attempts++;
      if (isListening) {
        return next();
      }
      // Not up yet: check again after the delay
      return setTimeout(next, delayBetweenAttempts);
    });
  };

  utils.whilst(shouldContinue, checkLog, callback);
};
/**
 * Gets a path under the ccm directory. The ccm directory is taken from the CCM_PATH environment
 * variable or, when not set, it's "workspace/tools/ccm" under the home directory
 * (HOMEPATH on win32, HOME otherwise).
 * @param subPath
 */
helper.ccm.getPath = function (subPath) {
  const configuredPath = process.env.CCM_PATH;
  if (configuredPath) {
    return path.join(configuredPath, subPath);
  }
  const homeDir = (process.platform === 'win32') ? process.env.HOMEPATH : process.env.HOME;
  return path.join(homeDir, 'workspace/tools/ccm', subPath);
};
/**
 * Spawns a process and invokes the callback once, when one of the following occurs first:
 * the process writes 'Principal Initialization Complete.' to stdout (no error), the process emits
 * an error, the process closes with a non-zero exit code or 10 seconds elapse.
 * The process is spawned with KRB5_CONFIG pointing to the krb5.conf of this instance, unless
 * process.env already defines it.
 * @param {String} processName Name of the executable.
 * @param {Array<String>} params Command line arguments.
 * @param {Function} cb Required callback.
 * @returns The spawned child process.
 */
helper.ads._spawnAndWait = function(processName, params, cb) {
  if (!cb) {
    throw new Error("Callback is required");
  }
  const displayName = processName;
  const isWindows = process.platform.indexOf('win') === 0;
  if (isWindows) {
    params = ['/c', processName].concat(params);
    processName = 'cmd.exe';
  }
  helper.trace(`Executing: ${processName} ${params.join(" ")}`);

  let timer = null;
  const callbackOnce = (err) => {
    if (timer) {
      clearTimeout(timer);
    }
    cb(err);
    // Subsequent invocations become no-ops.
    cb = utils.noop;
  };

  // Report a failure when the callback was not invoked within 10 seconds.
  timer = setTimeout(() => {
    callbackOnce(new Error(`Timed out while waiting for ${processName} to complete.`));
  }, 10000);

  // Values in process.env take precedence over the KRB5_CONFIG default.
  const childEnv = Object.assign({ KRB5_CONFIG: this.getKrb5ConfigPath()}, process.env);
  const child = spawn(processName, params, { env: childEnv });

  child.stdout.on('data', (chunk) => {
    helper.trace("%s_out> %s", displayName, chunk);
    const initialized = chunk.indexOf('Principal Initialization Complete.') !== -1;
    if (initialized) {
      callbackOnce();
    }
  });
  child.stderr.on('data', (chunk) => {
    helper.trace("%s_err> %s", displayName, chunk);
  });
  child.on('error', (err) => {
    helper.trace('Sub-process emitted error', processName, err);
    callbackOnce(err);
  });
  child.on('close', (code) => {
    helper.trace("%s exited with code %d", displayName, code);
    clearTimeout(timer);
    if (code !== 0) {
      callbackOnce(new Error("Process exited with non-zero exit code: " + code));
    }
  });
  return child;
};
/**
 * Starts the embedded-ads jar with ldap (port 10389) and kerberos enabled (port 10088). Depends on ADS_JAR
 * environment variable to resolve the absolute file path of the embedded-ads jar.
 *
 * A temporary directory is created and used as the configuration directory of the server. On success,
 * the KRB5_CONFIG environment variable of this process is set to the generated krb5.conf path.
 *
 * @param {Function} cb Callback to invoke when server is started and listening, or with an error when
 * the temporary directory could not be created or the server failed to start.
 */
helper.ads.start = function(cb) {
  const self = this;
  temp.mkdir('ads', function(err, dir) {
    if (err) {
      // Stop here: without a directory the server can't be launched and cb must be invoked only once.
      return cb(err);
    }
    self.dir = dir;
    const jarFile = self.getJar();
    const params = ['-jar', jarFile, '-k', '--confdir', self.dir];
    self.process = self._spawnAndWait('java', params, function(err) {
      if (!err) {
        // Set KRB5_CONFIG environment variable so kerberos module knows to use it.
        process.env.KRB5_CONFIG = self.getKrb5ConfigPath();
      }
      cb(err);
    });
  });
};
/**
 * Invokes klist, which lists the current registered tickets and their expiration.
 *
 * This is really only useful for debugging.
 *
 * @param {Function} cb Callback to invoke on completion.
 */
helper.ads.listTickets = function(cb) {
  const noParams = [];
  this._exec('klist', noParams, cb);
};
/**
 * Acquires a ticket for the given username and its principal by invoking kinit with the keytab of the user.
 * @param {String} username Username to acquire ticket for (i.e. cassandra).
 * @param {String} principal Principal to acquire ticket for (i.e. cassandra@DATASTAX.COM).
 * @param {Function} cb Callback to invoke on completion.
 */
helper.ads.acquireTicket = function(username, principal, cb) {
  helper.trace('Acquiring ticket');
  const keytabPath = this.getKeytabPath(username);
  // kinit is used on every platform: there is no Windows-specific handling yet.
  this._exec('kinit', ['-t', keytabPath, '-k', principal], cb);
};
/**
 * Destroys tickets by invoking kdestroy without arguments.
 * @param {String} [principal] Principal for whom its tickets will be destroyed (i.e. dse/127.0.0.1@DATASTAX.COM).
 * It's accepted for signature compatibility but it's not passed to kdestroy.
 * @param {Function} cb Callback to invoke on completion.
 */
helper.ads.destroyTicket = function(principal, cb) {
  // Support the destroyTicket(cb) overload
  const done = typeof principal === 'function' ? principal : cb;
  // kdestroy is used on every platform: there is no Windows-specific handling yet.
  this._exec('kdestroy', [], done);
};
/**
 * Executes a command using child_process.exec(), joining the parameters (if any) with spaces.
 * @param {String} processName Name of the command.
 * @param {Array<String>} params Parameters appended to the command line.
 * @param {Function} callback Callback passed to child_process.exec().
 */
helper.ads._exec = function(processName, params, callback) {
  const command = params.length === 0
    ? processName
    : `${processName} ${params.join(' ')}`;
  childProcessExec(command, callback);
};
/**
 * Stops the server process by sending SIGINT and waiting for it to close.
 * When the process already terminated (with any exit code, including 0, or by a signal), the callback
 * is invoked immediately.
 * @param {Function} cb Callback to invoke when server stopped or with an error. It's invoked once.
 */
helper.ads.stop = function(cb) {
  const adsProcess = this.process;
  if (adsProcess === undefined) {
    cb(Error("Process is not defined."));
    return;
  }
  // exitCode and signalCode are null while the child process is still running. A truthiness check
  // on exitCode would treat a clean exit (code 0) or a signal termination as "still running" and
  // then wait for a 'close' event that was already emitted.
  if (adsProcess.exitCode != null) {
    helper.trace("Server already stopped with exit code %d.", adsProcess.exitCode);
    cb();
    return;
  }
  if (adsProcess.signalCode != null) {
    helper.trace("Server already stopped by signal %s.", adsProcess.signalCode);
    cb();
    return;
  }
  // Both 'error' and 'close' can be emitted: make sure the callback is invoked a single time.
  let notified = false;
  const notifyOnce = function (err) {
    if (notified) {
      return;
    }
    notified = true;
    cb(err);
  };
  adsProcess.once('close', function () {
    notifyOnce();
  });
  adsProcess.once('error', notifyOnce);
  adsProcess.kill('SIGINT');
};
/**
 * Gets the path of the embedded-ads jar. Resolved from ADS_JAR environment variable or, when not set,
 * embedded-ads.jar under the home directory (HOMEPATH on win32, HOME otherwise).
 */
helper.ads.getJar = function () {
  let jarPath = process.env.ADS_JAR;
  if (!jarPath) {
    helper.trace("ADS_JAR environment variable not set, using $HOME/embedded-ads.jar");
    const homeDir = (process.platform === 'win32') ? process.env.HOMEPATH : process.env.HOME;
    jarPath = path.join(homeDir, 'embedded-ads.jar');
  }
  helper.trace("Using %s for embedded ADS server.", jarPath);
  return jarPath;
};
/**
 * Returns the file path to the keytab for the given user, located in the directory of this instance.
 * @param {String} username User to resolve keytab for.
 */
helper.ads.getKeytabPath = function(username) {
  const keytabFileName = username + ".keytab";
  return path.join(this.dir, keytabFileName);
};
/**
 * Returns the file path to the krb5.conf file generated by ads, located in the directory of this instance.
 */
helper.ads.getKrb5ConfigPath = function() {
  const configFileName = 'krb5.conf';
  return path.join(this.dir, configFileName);
};
/**
 * Sample data for vectors of 3 elements, one entry per subtype.
 * @type {Array<{subtypeString : string, typeInfo: import('../lib/encoder').VectorColumnInfo, value: Array}>}
 */
const dataProvider = (() => {
  const dataTypes = types.dataTypes;

  // Builds an entry for vector<subtype, 3>, where the subtype is described by a single type code.
  const vectorOf = (subtypeString, subtypeCode, value) => ({
    subtypeString,
    typeInfo: {
      code: dataTypes.custom,
      customTypeName: 'vector',
      info: [{ code: subtypeCode }, 3]
    },
    value
  });

  return [
    vectorOf('float', dataTypes.float, [1.1122000217437744, 2.212209939956665, 3.3999900817871094]),
    vectorOf('double', dataTypes.double, [1.1, 2.2, 3.3]),
    vectorOf('varchar', dataTypes.text, ['ab', 'b', 'cde']),
    vectorOf('bigint', dataTypes.bigint, [new types.Long(1), new types.Long(2), new types.Long(3)]),
    vectorOf('blob', dataTypes.blob, [Buffer.from([1, 2, 3]), Buffer.from([4, 5, 6]), Buffer.from([7, 8, 9])]),
    vectorOf('boolean', dataTypes.boolean, [true, false, true]),
    vectorOf('decimal', dataTypes.decimal, [
      types.BigDecimal.fromString('1.1'),
      types.BigDecimal.fromString('2.2'),
      types.BigDecimal.fromString('3.3')
    ]),
    vectorOf('inet', dataTypes.inet, [
      types.InetAddress.fromString('127.0.0.1'),
      types.InetAddress.fromString('0.0.0.0'),
      types.InetAddress.fromString('34.12.10.19')
    ]),
    vectorOf('tinyint', dataTypes.tinyint, [1, 2, 3]),
    vectorOf('smallint', dataTypes.smallint, [1, 2, 3]),
    vectorOf('int', dataTypes.int, [-1, 0, -3]),
    // The duration subtype is described as a custom type, identified by its marshal class name.
    {
      subtypeString: 'duration',
      typeInfo: {
        code: dataTypes.custom,
        info: [{
          code: dataTypes.custom,
          info: 'org.apache.cassandra.db.marshal.DurationType'
        }, 3],
        customTypeName: 'vector',
      },
      value: [new types.Duration(1, 2, 3), new types.Duration(4, 5, 6), new types.Duration(7, 8, 9)]
    },
    vectorOf('date', dataTypes.date, [
      new types.LocalDate(2020, 1, 1),
      new types.LocalDate(2020, 2, 1),
      new types.LocalDate(2020, 3, 1)
    ]),
    vectorOf('time', dataTypes.time, [
      new types.LocalTime(types.Long.fromString('6331999999911')),
      new types.LocalTime(types.Long.fromString('6331999999911')),
      new types.LocalTime(types.Long.fromString('6331999999911'))
    ]),
    vectorOf('timestamp', dataTypes.timestamp, [
      new Date(2020, 1, 1, 1, 1, 1, 1),
      new Date(2020, 2, 1, 1, 1, 1, 1),
      new Date(2020, 3, 1, 1, 1, 1, 1)
    ]),
    vectorOf('uuid', dataTypes.uuid, [types.Uuid.random(), types.Uuid.random(), types.Uuid.random()]),
    vectorOf('timeuuid', dataTypes.timeuuid, [types.TimeUuid.now(), types.TimeUuid.now(), types.TimeUuid.now()])
  ];
})();
helper.dataProvider = dataProvider;
/**
 * Extends dataProvider: for each entry it yields the entry itself followed by vectors of
 * list, map, set, tuple and vector of its subtype. A vector of a user defined type is appended at the end.
 */
const dataProviderWithCollections = dataProvider.flatMap((data) => {
  const dataTypes = types.dataTypes;
  const name = data.subtypeString;
  const baseType = data.typeInfo.info[0];

  // Creates a new descriptor of the subtype on each call, so descriptors are not shared.
  const subtype = () => ({ code: baseType.code, info: baseType["info"] });

  // Describes vector<elementType, 3>
  const vectorOf = (elementType) => ({
    code: dataTypes.custom,
    info: [elementType, 3],
    customTypeName: 'vector',
  });

  return [
    data,
    // vector<list<subtype>, 3>
    {
      subtypeString: 'list<' + name + '>',
      typeInfo: vectorOf({ code: dataTypes.list, info: subtype() }),
      value: data.value.map((item) => [item, item, item])
    },
    // vector<map<int, subtype>, 3>
    {
      subtypeString: 'map<int, ' + name + '>',
      typeInfo: vectorOf({ code: dataTypes.map, info: [{ code: dataTypes.int }, subtype()] }),
      value: data.value.map((item) => ({ 1: item, 2: item, 3: item }))
    },
    // vector<set<subtype>, 3>
    {
      subtypeString: 'set<' + name + '>',
      typeInfo: vectorOf({ code: dataTypes.set, info: subtype() }),
      value: data.value.map((item) => [item, item, item])
    },
    // vector<tuple<subtype, subtype>, 3>
    {
      subtypeString: 'tuple<' + name + ', ' + name + '>',
      typeInfo: vectorOf({ code: dataTypes.tuple, info: [subtype(), subtype()] }),
      value: data.value.map((item) => new types.Tuple(item, item))
    },
    // vector<vector<subtype, 3>, 3>
    {
      subtypeString: 'vector<' + name + ', 3>',
      typeInfo: vectorOf(vectorOf(subtype())),
      value: data.value.map((item) => new Vector([item, item, item], data.subtypeString))
    }
  ];
}).concat([
  // vector<my_udt, 3>
  {
    subtypeString: 'my_udt',
    typeInfo: {
      code: types.dataTypes.custom,
      info: [{
        code: types.dataTypes.udt,
        info: {
          name: 'my_udt',
          fields: [{ name: 'f1', type: { code: types.dataTypes.text } }],
        }
      }, 3],
      customTypeName: 'vector',
    },
    value: [{ f1: 'a' }, { f1: 'b' }, { f1: 'c' }]
  }
]);
helper.dataProviderWithCollections = dataProviderWithCollections;
/**
 * A retry policy for testing purposes only: each handler yields the retry result while the
 * number of retries of the request is not greater than the configured times, and the
 * rethrow result afterwards.
 * @param {Number} times
 * @constructor
 */
function RetryMultipleTimes(times) {
  this.times = times;
}

/** Decides on a read timeout based on requestInfo.nbRetry. */
RetryMultipleTimes.prototype.onReadTimeout = function (requestInfo) {
  const exhausted = requestInfo.nbRetry > this.times;
  return exhausted ? this.rethrowResult() : this.retryResult();
};

/** Decides on an unavailable error based on requestInfo.nbRetry. */
RetryMultipleTimes.prototype.onUnavailable = function (requestInfo) {
  const exhausted = requestInfo.nbRetry > this.times;
  return exhausted ? this.rethrowResult() : this.retryResult();
};

/** Decides on a write timeout based on requestInfo.nbRetry. */
RetryMultipleTimes.prototype.onWriteTimeout = function (requestInfo) {
  const exhausted = requestInfo.nbRetry > this.times;
  return exhausted ? this.rethrowResult() : this.retryResult();
};
/**
 * For test purposes, filters the child policy by last octet of the ip address
 * @param {Array} list Allowed values, compared against helper.lastOctetOf() of each host.
 * @param [childPolicy] Policy to wrap, defaults to a new RoundRobinPolicy.
 * @constructor
 */
function AllowListPolicy(list, childPolicy) {
  this.list = list;
  this.childPolicy = childPolicy || new policies.loadBalancing.RoundRobinPolicy();
}

util.inherits(AllowListPolicy, policies.loadBalancing.LoadBalancingPolicy);

/** Initializes the child policy. */
AllowListPolicy.prototype.init = function (client, hosts, callback) {
  this.childPolicy.init(client, hosts, callback);
};

/**
 * Yields the query plan of the child policy, skipping the hosts that are not in the list.
 */
AllowListPolicy.prototype.newQueryPlan = function (keyspace, info, callback) {
  const allowed = this.list;
  const isAllowed = (host) => allowed.indexOf(helper.lastOctetOf(host)) >= 0;
  this.childPolicy.newQueryPlan(keyspace, info, (err, iterator) => {
    const filteredPlan = {
      next: function () {
        // Advance the child plan until an allowed host is found or the plan is exhausted
        let item;
        do {
          item = iterator.next();
        } while (!item.done && !isAllowed(item.value));
        return item;
      }
    };
    callback(err, filteredPlan);
  });
};
/**
 * A retry policy for testing purposes that never retries: every handler yields the rethrow decision.
 * @constructor
 */
function FallthroughRetryPolicy() {
}
util.inherits(FallthroughRetryPolicy, policies.retry.RetryPolicy);
/**
 * Returns the rethrow decision. The decision must be returned (not just computed) for the caller
 * of the policy to receive it.
 */
FallthroughRetryPolicy.prototype.onUnavailable = function () {
  return this.rethrowResult();
};
// The remaining handlers share the same implementation.
FallthroughRetryPolicy.prototype.onReadTimeout = FallthroughRetryPolicy.prototype.onUnavailable;
FallthroughRetryPolicy.prototype.onWriteTimeout = FallthroughRetryPolicy.prototype.onUnavailable;
FallthroughRetryPolicy.prototype.onRequestError = FallthroughRetryPolicy.prototype.onUnavailable;
/**
 * Conditionally executes func, based on the result of helper.versionCompare() between the server
 * version and testVersion.
 * When testVersion starts with 'dse-', the remainder is compared against the DSE version and func is
 * only considered when the server is DSE. Otherwise, testVersion is compared against the C* version.
 * @param {String} testVersion Minimum version of Cassandra needed.
 * @param {Function} func The function to conditionally execute.
 * @param {Array} args the arguments to apply to the function.
 */
function executeIfVersion (testVersion, func, args) {
  const dsePrefix = 'dse-';
  const serverInfo = helper.getServerInfo();
  let shouldInvoke;
  if (!testVersion.startsWith(dsePrefix)) {
    // Use the C* version (of DSE or the actual C* version)
    shouldInvoke = helper.versionCompare(helper.getCassandraVersion(), testVersion);
  } else if (serverInfo.isDse) {
    // Compare only if the server instance is DSE
    shouldInvoke = helper.versionCompare(serverInfo.version, testVersion.slice(dsePrefix.length));
  } else {
    shouldInvoke = false;
  }
  if (shouldInvoke) {
    func.apply(this, args);
  }
}
/**
 * Policy only suitable for testing, it creates a fixed query plan containing the nodes in the same order, i.e. [a, b].
 */
class OrderedLoadBalancingPolicy extends policies.loadBalancing.RoundRobinPolicy {
  /**
   * Creates a new instance.
   * @param {Array<String>|SimulacronCluster} [addresses] When specified, it uses the order from the provided host
   * addresses.
   */
  constructor(addresses) {
    super();
    // An object exposing a dc() function is treated as a Simulacron cluster:
    // the node addresses of the first DC are used, in that order.
    const isCluster = addresses && typeof addresses.dc === 'function';
    this.addresses = isCluster
      ? addresses.dc(0).nodes.map(node => node.address)
      : addresses;
  }

  /**
   * Hosts are local when no addresses were specified or when the host address is one of them;
   * otherwise, hosts are ignored.
   */
  getDistance(host) {
    const isLocal = !this.addresses || this.addresses.indexOf(host.address) >= 0;
    return isLocal ? types.distance.local : types.distance.ignored;
  }

  /**
   * Yields the hosts following the order of the addresses or, when not specified, all the hosts.
   */
  newQueryPlan(keyspace, info, callback) {
    let plan;
    if (this.addresses) {
      plan = this.addresses.map(address => this.hosts.get(address));
    } else {
      plan = this.hosts.values();
    }
    return callback(null, plan[Symbol.iterator]());
  }
}
// The helper object is the module export; the test policies are exposed as properties of it.
helper.RetryMultipleTimes = RetryMultipleTimes;
helper.OrderedLoadBalancingPolicy = OrderedLoadBalancingPolicy;
module.exports = helper;