blob: 1e2a3d22676bb5942a701c82732955c1ef1c69b2 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
'use strict';
const assert = require('assert');
const EventEmitter = require('events');
const proxyquire = require('proxyquire');
const sinon = require('sinon');
const Connection = require('../../lib/connection');
const requests = require('../../lib/requests');
const defaultOptions = require('../../lib/client-options').defaultOptions();
const utils = require('../../lib/utils');
const errors = require('../../lib/errors');
const ExecutionOptions = require('../../lib/execution-options').ExecutionOptions;
const helper = require('../test-helper');
describe('Connection', function () {
describe('constructor', function () {
it('should parse host endpoint into address and port', function () {
const values = [
['127.0.0.1:9042', '127.0.0.1', '9042'],
['10.1.1.255:8888', '10.1.1.255', '8888'],
['::1:8888', '::1', '8888'],
['::1:1234', '::1', '1234'],
['aabb::eeff:11:2233:4455:6677:8899:9999', 'aabb::eeff:11:2233:4455:6677:8899', '9999']
];
values.forEach(function (item) {
const c = new Connection(item[0], 4, defaultOptions);
assert.strictEqual(c.address, item[1]);
assert.strictEqual(c.port, item[2]);
});
});
});
describe('#prepareOnce()', function () {
function prepareAndAssert(connection, query) {
return (function (cb) {
connection.prepareOnce(query, 'ks', function (err, r) {
assert.ifError(err);
assert.strictEqual(query, r);
cb();
});
});
}
it('should prepare different queries', function (done) {
const connection = newInstance();
//override sendStream behaviour
connection.sendStream = function(r, o, cb) {
setImmediate(function () {
cb(null, r.query);
});
};
utils.parallel([
prepareAndAssert(connection, 'QUERY1'),
prepareAndAssert(connection, 'QUERY2'),
prepareAndAssert(connection, 'QUERY3')
], function (err) {
assert.ifError(err);
done();
});
});
it('should prepare different queries with keyspace', function (done) {
const connection = newInstance();
connection.keyspace = 'ks1';
//override sendStream behaviour
connection.sendStream = function(r, o, cb) {
setImmediate(function () {
cb(null, r.query);
});
};
utils.parallel([
prepareAndAssert(connection, 'QUERY1'),
prepareAndAssert(connection, 'QUERY2'),
prepareAndAssert(connection, 'QUERY3')
], function (err) {
assert.ifError(err);
done();
});
});
it('should prepare the same query once', function (done) {
const connection = newInstance();
let ioCount = 0;
//override sendStream behaviour
connection.sendStream = function(r, o, cb) {
setImmediate(function () {
ioCount++;
cb(null, r.query);
});
};
utils.parallel([
prepareAndAssert(connection, 'QUERY1'),
prepareAndAssert(connection, 'QUERY1'),
prepareAndAssert(connection, 'QUERY1')
], function (err) {
assert.ifError(err);
assert.strictEqual(ioCount, 1);
done();
});
});
it('should prepare the same query once with keyspace', function (done) {
const connection = newInstance();
connection.keyspace = 'ks1';
let ioCount = 0;
//override sendStream behaviour
connection.sendStream = function(r, o, cb) {
setImmediate(function () {
ioCount++;
cb(null, r.query);
});
};
utils.parallel([
prepareAndAssert(connection, 'QUERY1'),
prepareAndAssert(connection, 'QUERY1'),
prepareAndAssert(connection, 'QUERY1')
], function (err) {
assert.ifError(err);
assert.strictEqual(ioCount, 1);
done();
});
});
});
describe('#sendStream()', function () {
let clock;
before(() => clock = sinon.useFakeTimers());
after(() => clock.restore());
it('should set the timeout for the idle request', function () {
const sent = [];
const writeQueueFake = getWriteQueueFake(sent);
const c = newInstance(undefined, undefined, { pooling: { heartBeatInterval: 20 } }, writeQueueFake);
c.sendStream(new requests.QueryRequest('QUERY1'), null, utils.noop);
clock.tick(20);
// 2 requests were sent, the user query plus the idle 'options' query
assert.deepEqual(sent.map(function (op) {
if (op.request instanceof requests.QueryRequest) {
return op.request.query;
}
return op.request;
}), [ 'QUERY1', requests.options ]);
c.close();
});
it('should not set the timeout for the idle request when heartBeatInterval is 0', function () {
const sent = [];
const writeQueueFake = getWriteQueueFake(sent);
const c = newInstance(undefined, undefined, { pooling: { heartBeatInterval: 0 } }, writeQueueFake);
c.sendStream(new requests.QueryRequest('QUERY1'), null, utils.noop);
clock.tick(20);
// Only 1 request was sent, no idle query
assert.deepEqual(sent.map(function (op) {
if (op.request instanceof requests.QueryRequest) {
return op.request.query;
}
return op.request;
}), [ 'QUERY1' ]);
c.close();
});
it('should reset the timeout after each new request', function () {
const sent = [];
const writeQueueFake = getWriteQueueFake(sent);
const c = newInstance(undefined, undefined, { pooling: { heartBeatInterval: 20 } }, writeQueueFake);
for (let i = 0; i < 4; i++) {
clock.tick(10);
c.sendStream(new requests.QueryRequest('QUERY' + i), null, utils.noop);
}
// Only 4 request were sent, no idle query
assert.deepEqual(sent.map(function (op) {
if (op.request instanceof requests.QueryRequest) {
return op.request.query;
}
return op.request;
}), Array.apply(null, new Array(4)).map((x, i) => 'QUERY' + i));
c.close();
});
it('should set the request timeout', function (done) {
const writeQueueFake = getWriteQueueFake();
const c = newInstance(undefined, undefined, { pooling: { heartBeatInterval: 0 } }, writeQueueFake);
c.sendStream(new requests.QueryRequest('QUERY1'), getExecOptions({ readTimeout: 20 }), function (err) {
helper.assertInstanceOf(err, errors.OperationTimedOutError);
c.close();
done();
});
clock.tick(20);
});
});
describe('#close', function () {
it('should allow socket.close event to be emitted before calling back when connected', function (done) {
class Socket extends BaseSocketMock {
destroy() {
setImmediate(() => this.emit('close'));
}
end() {
this.destroy();
}
}
const ConnectionInjected = proxyquire('../../lib/connection', { 'net': { Socket } });
const c = new ConnectionInjected('127.0.0.1:9042', 9042, utils.extend({}, defaultOptions));
c.logEmitter = helper.noop;
c.sendStream = function (r, o, cb) {
cb(null, {});
};
c.open(function (err) {
assert.ifError(err);
assert.ok(c.connected);
//it is now connected
const socket = c.netClient;
let closeEmitted = 0;
socket.on('close', function () {
closeEmitted++;
});
c.close(function (err) {
assert.ifError(err);
assert.strictEqual(closeEmitted, 1);
done();
});
});
});
it('should allow socket.close event to be emitted before calling back when disconnected', function (done) {
class Socket extends BaseSocketMock {
destroy() {
setImmediate(() => this.emit('close'));
}
end() {
setImmediate(() => this.emit('close'));
}
}
const ConnectionInjected = proxyquire('../../lib/connection', { 'net': { Socket } });
const c = new ConnectionInjected('127.0.0.1:9042', 9042, utils.extend({}, defaultOptions));
c.logEmitter = helper.noop;
c.sendStream = function (r, o, cb) {
cb(null, {});
};
c.open(function (err) {
assert.ifError(err);
assert.ok(c.connected);
//force destroy
c.connected = false;
const socket = c.netClient;
let closeEmitted = 0;
socket.on('close', function () {
closeEmitted++;
});
c.close(function (err) {
assert.ifError(err);
assert.strictEqual(closeEmitted, 1);
done();
});
});
});
});
});
/** @return {Connection} */
function newInstance(address, protocolVersion, options, writeQueue){
address = address || helper.baseOptions.contactPoints[0];
options = utils.deepExtend({ logEmitter: helper.noop }, defaultOptions, options);
const c = new Connection(address + ':' + 9000, protocolVersion || 1, options);
c.connected = !!writeQueue;
c.writeQueue = writeQueue;
return c;
}
function getWriteQueueFake(sent) {
sent = sent || [];
return ({
push: function (op, writeCallback) {
sent.push(op);
setImmediate(writeCallback);
}
});
}
function getExecOptions(options) {
const result = ExecutionOptions.empty();
result.getReadTimeout = () => options.readTimeout;
return result;
}
class BaseSocketMock extends EventEmitter {
connect(p, a, cb) {
setImmediate(cb);
}
destroy() {}
end() {}
setTimeout() {}
setKeepAlive() {}
setNoDelay() {}
pipe() {
return this;
}
}