blob: 35d8a4fb157f30e851dd02aa191018947786b0a7 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
'use strict';
const { assert } = require('chai');
const sinon = require('sinon');
const util = require('util');
const events = require('events');
const hostModule = require('../../lib/host');
const Host = hostModule.Host;
const HostConnectionPool = require('../../lib/host-connection-pool');
const Metadata = require('../../lib/metadata');
const HostMap = hostModule.HostMap;
const types = require('../../lib/types');
const clientOptions = require('../../lib/client-options');
const defaultOptions = clientOptions.defaultOptions();
defaultOptions.pooling.coreConnectionsPerHost = clientOptions.coreConnectionsPerHostV3;
const utils = require('../../lib/utils.js');
const policies = require('../../lib/policies');
const helper = require('../test-helper');
const reconnection = policies.reconnection;
describe('HostConnectionPool', function () {
this.timeout(5000);
describe('#borrowConnection()', () => {
  it('should avoid returning the previous connection', () => {
    // Pool fixture: four connections that all report zero in-flight requests,
    // each tagged with its own position so borrows can be tallied per slot.
    const pool = newHostConnectionPoolInstance();
    pool.coreConnectionsLength = 4;
    pool.connections = [0, 1, 2, 3].map((index) => ({ getInFlight: () => 0, index }));

    // The connection in this slot is passed as the "previous" one on every borrow.
    const previousConnectionIndex = 2;
    const borrowsPerIndex = new Map();

    for (let attempt = 0; attempt < 8; attempt++) {
      const previous = pool.connections[previousConnectionIndex];
      const { index } = pool.borrowConnection(previous);
      const soFar = borrowsPerIndex.has(index) ? borrowsPerIndex.get(index) : 0;
      borrowsPerIndex.set(index, soFar + 1);
    }

    // Expected tally after 8 borrows: slots 0 and 1 twice each, slot 3 four times,
    // and the previous connection's slot never.
    assert.strictEqual(borrowsPerIndex.get(0), 2);
    assert.strictEqual(borrowsPerIndex.get(1), 2);
    assert.strictEqual(borrowsPerIndex.get(3), 4);
    assert.strictEqual(borrowsPerIndex.get(previousConnectionIndex), undefined);
  });
});
describe('#drainAndShutdown()', () => {
  it('should wait for connections to drain before shutting down', (done) => {
    const pool = newHostConnectionPoolInstance();
    // The first connection is an EventEmitter so the test can fire 'drain' on it;
    // its getInFlight comes from helper.functionOf(100), the other one from helper.functionOf(0).
    const pendingConnection = new events.EventEmitter();
    pendingConnection.getInFlight = helper.functionOf(100);
    const otherConnection = { close: helper.noop, getInFlight: helper.functionOf(0) };
    pool.connections = [ pendingConnection, otherConnection ];
    pool.drainAndShutdown();

    let drained;
    let closed;
    pool.once('close', () => {
      // 'close' must only be observed after 'drain' was emitted and close() was invoked.
      assert.ok(drained);
      assert.ok(closed);
      done();
    });
    // close() is attached after drainAndShutdown() was called, as in the original scenario.
    pendingConnection.close = () => {
      closed = true;
    };
    setImmediate(() => {
      drained = true;
      pendingConnection.emit('drain');
    });
  });

  it('should timeout when draining connections takes longer than expected', (done) => {
    const pool = newHostConnectionPoolInstance({ socketOptions: { readTimeout: 20 } });
    // This connection never emits 'drain'.
    const stuckConnection = new events.EventEmitter();
    stuckConnection.getInFlight = helper.functionOf(100);
    pool.connections = [ stuckConnection ];
    pool.drainAndShutdown();

    let closed;
    pool.once('close', () => {
      assert.ok(closed);
    });
    stuckConnection.close = () => {
      closed = true;
    };

    setTimeout(() => {
      // It's not closed immediately
      assert.ok(!closed);
    }, 10);
    setTimeout(() => {
      // Drain was never emitted but the pool is closed
      assert.ok(closed);
      assert.strictEqual(pool.connections.length, 0);
      done();
    }, 140);
  });
});
describe('#_attemptNewConnection()', function () {
  it('should create and attempt to open a connection', async () => {
    const hostPool = newHostConnectionPoolInstance();
    // sinon.spy(object) wraps the object's methods so call counts can be inspected.
    const c = sinon.spy({
      openAsync: () => Promise.resolve()
    });
    hostPool._createConnection = function () {
      return c;
    };
    await hostPool._attemptNewConnection();
    // chai's assert.strictEqual takes (actual, expected): keep the measured value first
    // so a failure message reports the right side as "expected".
    assert.strictEqual(c.openAsync.callCount, 1);
  });

  it('should callback in error when open fails', async () => {
    const hostPool = newHostConnectionPoolInstance();
    const c = sinon.spy({
      openAsync: () => Promise.reject(new Error('Test dummy error')),
      closeAsync: () => Promise.resolve()
    });
    hostPool._createConnection = function () {
      return c;
    };
    // The returned promise is expected to reject; the failed connection must have been
    // opened once and closed once.
    await helper.assertThrowsAsync(hostPool._attemptNewConnection());
    assert.strictEqual(c.openAsync.callCount, 1);
    assert.strictEqual(c.closeAsync.callCount, 1);
  });

  it('should create a single connection with multiple calls in parallel', async () => {
    const hostPool = newHostConnectionPoolInstance();
    const c = sinon.spy({
      openAsync: () => Promise.resolve()
    });
    hostPool._createConnection = sinon.spy(() => c);
    // Ten attempts are started before any of them is awaited.
    await Promise.all(Array(10).fill(0).map(() => hostPool._attemptNewConnection()));
    assert.strictEqual(c.openAsync.callCount, 1);
    assert.strictEqual(hostPool._createConnection.callCount, 1);
  });
});
describe('minInFlight()', function () {
  it('should round robin between connections with the same amount of in-flight requests', function () {
    /** @type {Array.<Connection>} */
    const connections = [];
    for (let i = 0; i < 3; i++) {
      connections.push({ getInFlight: helper.functionOf(0), index: i });
    }
    const initial = HostConnectionPool.minInFlight(connections, 32, null).index;
    for (let i = 1; i < 10; i++) {
      const expectedIndex = (initial + i) % connections.length;
      const actualIndex = HostConnectionPool.minInFlight(connections, 32, null).index;
      // chai's assert.strictEqual signature is (actual, expected).
      assert.strictEqual(actualIndex, expectedIndex);
    }
  });

  it('should skip the previous connection', function () {
    /** @type {Array.<Connection>} */
    const connections = [];
    for (let i = 0; i < 5; i++) {
      connections.push({ getInFlight: helper.functionOf(32), index: i });
    }
    const previousConnectionIndex = 2;
    const previousConnection = connections[previousConnectionIndex];
    const initial = HostConnectionPool.minInFlight(connections, 32, null).index;
    // Assert that minInFlight() skips the previous connection
    for (let i = 1; i < 10; i++) {
      let expectedIndex = (initial + i) % connections.length;
      if (expectedIndex === previousConnectionIndex) {
        // Move on to the next slot, wrapping around so the expectation stays a valid
        // index even if the skipped slot were the last one.
        expectedIndex = (expectedIndex + 1) % connections.length;
      }
      const actualIndex = HostConnectionPool.minInFlight(connections, 32, previousConnection).index;
      assert.strictEqual(actualIndex, expectedIndex);
    }
  });

  it('should skip the previous connection when there are two', function () {
    /** @type {Array.<Connection>} */
    const connections = [];
    for (let i = 0; i < 2; i++) {
      connections.push({ getInFlight: helper.functionOf(32), index: i });
    }
    const previousConnection = connections[1];
    // Assert that minInFlight() skips the previous connection, multiple times
    for (let i = 0; i < 10; i++) {
      assert.strictEqual(HostConnectionPool.minInFlight(connections, 32, previousConnection).index, 0);
    }
  });

  it('should not skip the previous connection when there is a single connection in the pool', function () {
    const connections = [ { getInFlight: helper.functionOf(32), index: 0 } ];
    for (let i = 0; i < 10; i++) {
      // With nothing else to pick, the very same object must be handed back.
      assert.strictEqual(HostConnectionPool.minInFlight(connections, 32, connections[0]), connections[0]);
    }
  });
});
});
describe('Host', function () {
describe('#setUp()', function () {
  it('should reset the reconnection schedule when bring it up', function () {
    const maxDelay = 1000;
    const reconnectionPolicy = new reconnection.ExponentialReconnectionPolicy(50, maxDelay, false);
    // Apply the override AFTER the defaults. The previous form,
    // utils.extend({ policies }, defaultOptions), listed the defaults last, so the custom
    // policy was replaced by the default one before the host was created.
    // NOTE(review): assumes utils.extend() lets later sources win, which is what the
    // option merging in newHostConnectionPoolInstance() relies on - confirm.
    const options = utils.extend({}, defaultOptions, {
      // Keep the remaining default policies and swap only the reconnection policy.
      policies: utils.extend({}, defaultOptions.policies, { reconnection: reconnectionPolicy })
    });
    const host = newHostInstance(options);
    // Wrap the pool's connection factory so created connections get a stubbed open().
    const create = host.pool._createConnection.bind(host.pool);
    host.pool._createConnection = function () {
      const c = create();
      c.open = helper.callbackNoop;
      return c;
    };
    const initialSchedule = options.policies.reconnection.newSchedule();
    host.reconnectionSchedule = initialSchedule;
    // A non-zero setDownAt marks the host as down before bringing it back up.
    host.setDownAt = 1;
    host.setUp();
    // setUp() must have replaced the schedule with a different instance.
    assert.notStrictEqual(host.reconnectionSchedule, initialSchedule);
  });
});
describe('#setDown()', () => {
  it('should emit event when called', (done) => {
    const target = newHostInstance(defaultOptions);
    // The test finishes as soon as the 'down' listener fires.
    target.on('down', done);
    target.setDown();
    target.shutdown(false);
  });
});
describe('#getActiveConnection()', function () {
  // The previous title ("if a the pool is initialized") was garbled; this one states what
  // the test sets up: a host straight out of the constructor, with no warmup performed.
  it('should return null for a newly created host', function () {
    const h = newHostInstance(defaultOptions);
    assert.strictEqual(h.getActiveConnection(), null);
  });
});
describe('#setDistance()', () => {
  it('should call checkIsUp() when the new distance is local and was down', () => {
    const target = newHostInstance(defaultOptions);
    target._distance = types.distance.ignored;
    target.setDownAt = 1;
    // Replace checkIsUp() with a counter to observe whether setDistance() invokes it.
    let checkIsUpCalls = 0;
    target.checkIsUp = () => {
      checkIsUpCalls += 1;
    };
    target.setDistance(types.distance.local);
    assert.strictEqual(checkIsUpCalls, 1);
    target.shutdown(false);
  });

  it('should call drainAndShutdown() and emit when the new distance is ignored', () => {
    const target = newHostInstance(defaultOptions);
    target._distance = types.distance.local;
    let drainCalls = 0;
    let ignoreEvents = 0;
    // The pool's drainAndShutdown() is stubbed, so only the call itself is recorded.
    target.pool.drainAndShutdown = () => {
      drainCalls += 1;
    };
    target.once('ignore', () => {
      ignoreEvents += 1;
    });
    target.setDistance(types.distance.ignored);
    assert.strictEqual(drainCalls, 1);
    assert.strictEqual(ignoreEvents, 1);
    assert.strictEqual(target.pool.coreConnectionsLength, 0);
  });

  it('should not call drainAndShutdown() when the new distance is ignored and was previously ignored', () => {
    const target = newHostInstance(defaultOptions);
    // The host starts out already ignored, so setting the same distance must be a no-op.
    target._distance = types.distance.ignored;
    let drainCalls = 0;
    let ignoreEvents = 0;
    target.pool.drainAndShutdown = () => {
      drainCalls += 1;
    };
    target.once('ignore', () => {
      ignoreEvents += 1;
    });
    target.setDistance(types.distance.ignored);
    assert.strictEqual(drainCalls, 0);
    assert.strictEqual(ignoreEvents, 0);
  });
});
describe('#removeFromPool()', () => {
  it('should remove the connection in a new array instance', () => {
    const target = newHostInstance(defaultOptions);
    const initialConnections = [ newConnectionMock(), newConnectionMock() ];
    const [ removed, kept ] = initialConnections;
    target.pool.connections = initialConnections;
    target.removeFromPool(removed);
    // Only the other connection remains, held in a different array object.
    assert.deepEqual(target.pool.connections, [ kept ]);
    assert.notStrictEqual(target.pool.connections, initialConnections);
    target.shutdown(false);
  });

  it('should issue a new connection attempt when pool size is smaller than config', () => {
    const target = newHostInstance(defaultOptions);
    const initialConnections = [ newConnectionMock(), newConnectionMock() ];
    const [ kept, removed ] = initialConnections;
    target.pool.connections = initialConnections;
    // Ask for more core connections than the pool currently holds.
    target.pool.coreConnectionsLength = 10;
    assert.ok(!target.pool.hasScheduledNewConnection());
    target.removeFromPool(removed);
    assert.deepEqual(target.pool.connections, [ kept ]);
    assert.ok(target.pool.hasScheduledNewConnection());
    assert.ok(target.isUp());
    target.shutdown(false);
  });

  it('should set the host down when no connections', () => {
    const target = newHostInstance(defaultOptions);
    const initialConnections = [ newConnectionMock() ];
    const [ onlyConnection ] = initialConnections;
    target.pool.connections = initialConnections;
    target._distance = types.distance.local;
    assert.ok(!target.pool.hasScheduledNewConnection());
    assert.ok(target.isUp());
    target.removeFromPool(onlyConnection);
    // Pool is now empty: a new attempt is scheduled and the host is reported as down.
    assert.deepEqual(target.pool.connections, []);
    assert.ok(target.pool.hasScheduledNewConnection());
    assert.ok(!target.isUp());
    target.shutdown(false);
  });

  it('should not set the host down when it is ignored', () => {
    const target = newHostInstance(defaultOptions);
    const initialConnections = [ newConnectionMock() ];
    const [ onlyConnection ] = initialConnections;
    target.pool.connections = initialConnections;
    target._distance = types.distance.ignored;
    assert.ok(target.isUp());
    target.removeFromPool(onlyConnection);
    // Pool is empty, yet an ignored host keeps reporting itself as up.
    assert.deepEqual(target.pool.connections, []);
    assert.ok(target.isUp());
    target.shutdown(false);
  });
});
describe('#checkHealth()', function () {
  it('should remove connection from Array and invoke close', function (done) {
    const host = newHostInstance(defaultOptions);
    let closeInvoked = 0;
    // Connection stub that counts close() calls and reports 1000 timed-out operations.
    // NOTE(review): presumably 1000 exceeds the limit checkHealth() compares against - confirm.
    const c = {
      timedOutOperations: 1000,
      close: function () {
        closeInvoked++;
      }
    };
    const initialConnections = [ newConnectionMock(), newConnectionMock(), c ];
    host.pool.connections = initialConnections;
    host.checkHealth(c);
    // Assertions are deferred to the next setImmediate tick, after checkHealth() returned.
    setImmediate(function () {
      // chai's assert.strictEqual signature is (actual, expected).
      assert.strictEqual(closeInvoked, 1);
      assert.deepEqual(host.pool.connections, initialConnections.slice(0, 2));
      // different references
      assert.notStrictEqual(initialConnections, host.pool.connections);
      host.shutdown(false);
      done();
    });
  });

  it('should set host down when no more connections available', function (done) {
    const host = newHostInstance(defaultOptions);
    host._distance = types.distance.local;
    host.pool.connections = [ newConnectionMock() ];
    assert.ok(host.isUp());
    host.checkHealth(host.pool.connections[0]);
    setImmediate(function () {
      // The only connection is gone and the host reports itself as down.
      assert.strictEqual(host.pool.connections.length, 0);
      assert.ok(!host.isUp());
      host.shutdown(false);
      done();
    });
  });
});
describe('#checkIsUp()', () => {
  it('should schedule a connection attempt', () => {
    const target = newHostInstance(defaultOptions);
    // A non-zero setDownAt is how these tests mark the host as down.
    target.setDownAt = 1;
    assert.ok(!target.pool.hasScheduledNewConnection());
    target.checkIsUp();
    assert.ok(target.pool.hasScheduledNewConnection());
    target.shutdown(false);
  });

  it('should reset the reconnection schedule and set the delay to 0', () => {
    const target = newHostInstance(defaultOptions);
    target.setDownAt = 1;
    target.reconnectionDelay = 1;
    const scheduleBefore = target.reconnectionSchedule;
    assert.ok(!target.pool.hasScheduledNewConnection());
    target.checkIsUp();
    // A different schedule instance is in place and the delay is back to zero.
    assert.notStrictEqual(target.reconnectionSchedule, scheduleBefore);
    assert.strictEqual(target.reconnectionDelay, 0);
    assert.ok(target.pool.hasScheduledNewConnection());
    target.shutdown(false);
  });

  it('should not issue a connection attempt if host is UP', () => {
    // setDownAt is left untouched here, so the host is still considered up.
    const target = newHostInstance(defaultOptions);
    assert.ok(!target.pool.hasScheduledNewConnection());
    target.checkIsUp();
    assert.ok(!target.pool.hasScheduledNewConnection());
    target.shutdown(false);
  });
});
describe('#warmupPool()', () => {
  /**
   * Warms up the pool of a fresh local host and asserts that it holds exactly the
   * requested number of connections, both right after warmup and after waiting.
   * @param {Number} coreConnections Value assigned to pool.coreConnectionsLength.
   * @param {Function} openAsync Stub used as openAsync() on every mocked connection.
   * @param {Number} settleDelay Milliseconds to wait before checking the size again.
   * @returns {Promise}
   */
  async function assertPoolSizeAfterWarmup(coreConnections, openAsync, settleDelay) {
    const target = newHostInstance(defaultOptions);
    target._distance = types.distance.local;
    target.pool.coreConnectionsLength = coreConnections;
    target.pool._createConnection = () => newConnectionMock({ openAsync });
    await target.warmupPool();
    assert.strictEqual(target.pool.coreConnectionsLength, target.pool.connections.length);
    // Wait and check again: no extra connection may show up later.
    await helper.delayAsync(settleDelay);
    assert.strictEqual(target.pool.coreConnectionsLength, target.pool.connections.length);
  }

  const openInstantly = () => {};
  const openAfterDelay = () => helper.delayAsync(20);

  // NOTE(review): the two "after borrowing" cases run exactly the same steps as the two
  // cases below them; no connection is borrowed anywhere. Confirm whether a borrow step
  // was meant to precede warmupPool().
  it('should create the exact amount of connections after borrowing when opening is instant',
    () => assertPoolSizeAfterWarmup(4, openInstantly, 100));

  it('should create the exact amount of connections after borrowing when opening takes some time',
    () => assertPoolSizeAfterWarmup(3, openAfterDelay, 200));

  it('should create the exact amount of connections when opening is instant',
    () => assertPoolSizeAfterWarmup(4, openInstantly, 100));

  it('should create the exact amount of connections when opening takes some time',
    () => assertPoolSizeAfterWarmup(3, openAfterDelay, 200));
});
});
describe('HostMap', function () {
  describe('#values()', function () {
    it('should return a frozen array', function () {
      const map = new HostMap();
      map.set('h1', 'h1');
      const values = map.values();
      assert.strictEqual(values.length, 1);
      assert.ok(Object.isFrozen(values));
    });

    it('should return the same instance as long as the value does not change', function () {
      const map = new HostMap();
      map.set('h1', 'h1');
      // Two reads without a write in between must yield the very same array object.
      const values1 = map.values();
      const values2 = map.values();
      assert.strictEqual(values1, values2);
      map.set('h2', 'h2');
      const values3 = map.values();
      assert.strictEqual(values3.length, 2);
      // Identity check: use the strict variant, like the rest of this file, instead of
      // the loose (!=) comparison performed by assert.notEqual.
      assert.notStrictEqual(values3, values1);
    });
  });

  describe('#set()', function () {
    it('should modify the cached values', function () {
      const map = new HostMap();
      map.set('h1', 'v1');
      const values = map.values();
      assert.strictEqual(util.inspect(values), util.inspect(['v1']));
      // Overwriting an existing key must be visible through values() and get(), and
      // values() must hand out a different array than the one read before the write.
      map.set('h1', 'v1a');
      assert.strictEqual(util.inspect(map.values()), util.inspect(['v1a']));
      assert.strictEqual(map.get('h1'), 'v1a');
      assert.notStrictEqual(map.values(), values);
    });
  });
});
/**
 * Builds a HostConnectionPool for tests, backed by a host created with newHostInstance().
 * The options passed to the host are merged with utils.extend() from a no-op logEmitter,
 * the module-level defaultOptions and the given overrides, in that order.
 * @param {Object} [options] Overrides merged after the default options.
 * @returns {HostConnectionPool}
 */
function newHostConnectionPoolInstance(options) {
  const mergedOptions = utils.extend({ logEmitter: () => {} }, defaultOptions, options);
  const host = newHostInstance(mergedOptions);
  return new HostConnectionPool(host, 2);
}
/**
 * Builds a Host for tests using the fixed constructor arguments '0.0.0.1:9042' and 2.
 * The given options are copied with utils.extend() onto a fresh object that starts with
 * a no-op logEmitter; the same merged object is also handed to the Metadata instance.
 * @param {Object} options
 * @returns {Host}
 */
function newHostInstance(options) {
  const hostOptions = utils.extend({ logEmitter: () => {} }, options);
  const metadata = new Metadata(hostOptions, null);
  return new Host('0.0.0.1:9042', 2, hostOptions, metadata);
}
/**
 * Builds a minimal connection stand-in for tests. It starts from close (helper.noop),
 * closeAsync (resolves with no value) and getInFlight (helper.functionOf(0)), then copies
 * the given properties over those with utils.extend().
 * @param {Object} [properties] Extra or overriding members for the mock.
 * @returns {Connection}
 */
function newConnectionMock(properties) {
  const mock = {
    close: helper.noop,
    closeAsync: () => Promise.resolve(),
    getInFlight: helper.functionOf(0)
  };
  return utils.extend(mock, properties);
}