blob: 14f81b2327d5166df83f00173ebf3878074cd61c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
'use strict';
const assert = require('assert');
const util = require('util');
const RequestHandler = require('../../lib/request-handler');
const requests = require('../../lib/requests');
const helper = require('../test-helper');
const errors = require('../../lib/errors');
const types = require('../../lib/types');
const utils = require('../../lib/utils');
const retry = require('../../lib/policies/retry');
const speculativeExecution = require('../../lib/policies/speculative-execution');
const execProfileModule = require('../../lib/execution-profile');
const ProfileManager = execProfileModule.ProfileManager;
const ExecutionProfile = execProfileModule.ExecutionProfile;
const OperationState = require('../../lib/operation-state');
const defaultOptions = require('../../lib/client-options').defaultOptions;
const execOptionsModule = require('../../lib/execution-options');
const DefaultExecutionOptions = execOptionsModule.DefaultExecutionOptions;
const ExecutionOptions = execOptionsModule.ExecutionOptions;
const ClientMetrics = require('../../lib/metrics/client-metrics');
describe('RequestHandler', function () {
const queryRequest = new requests.QueryRequest('QUERY1');
describe('#send()', function () {
it('should return a ResultSet', async () => {
const lbp = helper.getLoadBalancingPolicyFake([ {}, {} ]);
const handler = newInstance(queryRequest, null, lbp);
const result = await handler.send();
helper.assertInstanceOf(result, types.ResultSet);
});
it('should callback with error when error can not be retried', async () => {
const lbp = helper.getLoadBalancingPolicyFake([ {}, {} ], undefined, function sendStreamCb(r, h, cb) {
if (h.address === '0') {
return cb(new Error('Test Error'));
}
cb(null, {});
});
const handler = newInstance(queryRequest, null, lbp, new TestRetryPolicy());
let err;
try {
await handler.send();
} catch (e) {
err = e;
}
helper.assertInstanceOf(err, Error);
const hosts = lbp.getFixedQueryPlan();
assert.strictEqual(hosts[0].sendStreamCalled, 1);
assert.strictEqual(hosts[1].sendStreamCalled, 0);
});
it('should use the retry policy defined in the queryOptions', async () => {
const lbp = helper.getLoadBalancingPolicyFake([ {}, {} ], undefined, function sendStreamCb(r, h, cb) {
if (h.address === '0') {
return cb(new errors.ResponseError(types.responseErrorCodes.writeTimeout, 'Test error'));
}
cb(null, {});
});
const retryPolicy = new TestRetryPolicy();
const handler = newInstance(queryRequest, null, lbp, retryPolicy, true);
const result = await handler.send();
helper.assertInstanceOf(result, types.ResultSet);
const hosts = lbp.getFixedQueryPlan();
assert.strictEqual(hosts[0].sendStreamCalled, 1);
assert.strictEqual(hosts[1].sendStreamCalled, 1);
assert.strictEqual(retryPolicy.writeTimeoutErrors.length, 1);
});
it('should use the provided host if specified in the queryOptions', async () => {
// get a fake host that always responds with a readTimeout
const host = helper.getHostsMock([ {} ], undefined, (r, h, cb) => {
cb(new errors.ResponseError(types.responseErrorCodes.readTimeout, 'Test error'));
})[0];
helper.afterThisTest(() => host.shutdown());
const lbp = helper.getLoadBalancingPolicyFake([ {}, {} ], undefined, function sendStreamCb(r, h, cb) {
cb(null, {});
});
const retryPolicy = new TestRetryPolicy();
const handler = newInstance(queryRequest, null, lbp, retryPolicy, null, host);
const err = await helper.assertThrowsAsync(handler.send());
// expect an error that includes read timeout for that host.
assert.deepEqual(Object.keys(err.innerErrors), [host.address]);
assert.strictEqual(err.innerErrors[host.address].code, types.responseErrorCodes.readTimeout);
// should have skipped lbp entirely.
const hosts = lbp.getFixedQueryPlan();
assert.strictEqual(hosts[0].sendStreamCalled, 0);
assert.strictEqual(hosts[1].sendStreamCalled, 0);
});
it('should callback with OperationTimedOutError when the retry policy decides', async () => {
const lbp = helper.getLoadBalancingPolicyFake([ {}, {} ], undefined, function sendStreamCb(r, h, cb) {
if (h.address === '0') {
return cb(new errors.OperationTimedOutError('Test error'));
}
cb(null, {});
});
const retryPolicy = new TestRetryPolicy(false);
const handler = newInstance(queryRequest, null, lbp, retryPolicy, true);
await helper.assertThrowsAsync(handler.send(), errors.OperationTimedOutError);
const hosts = lbp.getFixedQueryPlan();
assert.strictEqual(hosts[0].sendStreamCalled, 1);
assert.strictEqual(hosts[1].sendStreamCalled, 0);
assert.strictEqual(retryPolicy.requestErrors.length, 1);
});
it('should not use the retry policy if query is non-idempotent on writeTimeout', async () => {
const lbp = helper.getLoadBalancingPolicyFake([ {}, {} ], undefined, function sendStreamCb(r, h, cb) {
if (h.address === '0') {
return cb(new errors.ResponseError(types.responseErrorCodes.writeTimeout, 'Test error'));
}
cb(null, {});
});
const retryPolicy = new TestRetryPolicy();
const handler = newInstance(queryRequest, null, lbp, retryPolicy, false);
const err = await helper.assertThrowsAsync(handler.send());
helper.assertInstanceOf(err, errors.ResponseError);
assert.strictEqual(err.code, types.responseErrorCodes.writeTimeout);
const hosts = lbp.getFixedQueryPlan();
assert.strictEqual(hosts[0].sendStreamCalled, 1);
assert.strictEqual(hosts[1].sendStreamCalled, 0);
assert.strictEqual(retryPolicy.writeTimeoutErrors.length, 0);
});
it('should not use the retry policy if query is non-idempotent on OperationTimedOutError', async () => {
const lbp = helper.getLoadBalancingPolicyFake([ {}, {} ], undefined, function sendStreamCb(r, h, cb) {
if (h.address === '0') {
return cb(new errors.OperationTimedOutError('Test error'));
}
cb(null, {});
});
const retryPolicy = new TestRetryPolicy(false);
const handler = newInstance(queryRequest, null, lbp, retryPolicy, false);
await helper.assertThrowsAsync(handler.send(), errors.OperationTimedOutError);
const hosts = lbp.getFixedQueryPlan();
assert.strictEqual(hosts[0].sendStreamCalled, 1);
assert.strictEqual(hosts[1].sendStreamCalled, 0);
assert.strictEqual(retryPolicy.requestErrors.length, 0);
});
it('should use the retry policy even if query is non-idempotent on readTimeout', async () => {
const lbp = helper.getLoadBalancingPolicyFake([ {}, {} ], undefined, function sendStreamCb(r, h, cb) {
if (h.address === '0') {
return cb(new errors.ResponseError(types.responseErrorCodes.readTimeout, 'Test error'));
}
cb(null, {});
});
const retryPolicy = new TestRetryPolicy();
const handler = newInstance(queryRequest, null, lbp, retryPolicy, false);
const result = await handler.send();
helper.assertInstanceOf(result, types.ResultSet);
const hosts = lbp.getFixedQueryPlan();
assert.strictEqual(hosts[0].sendStreamCalled, 1);
assert.strictEqual(hosts[1].sendStreamCalled, 1);
assert.strictEqual(retryPolicy.readTimeoutErrors.length, 1);
});
it('should use the retry policy even if query is non-idempotent on unavailable', async () => {
const lbp = helper.getLoadBalancingPolicyFake([ {}, {} ], undefined, function sendStreamCb(r, h, cb) {
if (h.address === '0') {
return cb(new errors.ResponseError(types.responseErrorCodes.unavailableException, 'Test error'));
}
cb(null, {});
});
const retryPolicy = new TestRetryPolicy();
const handler = newInstance(queryRequest, null, lbp, retryPolicy, false);
const result = await handler.send();
helper.assertInstanceOf(result, types.ResultSet);
const hosts = lbp.getFixedQueryPlan();
assert.strictEqual(hosts[0].sendStreamCalled, 1);
assert.strictEqual(hosts[1].sendStreamCalled, 1);
assert.strictEqual(retryPolicy.unavailableErrors.length, 1);
});
context('when an UNPREPARED response is obtained', function () {
it('should send a prepare request on the same connection and update the cache', async () => {
const queryId = utils.allocBufferFromString('123');
const resultId = utils.allocBufferFromString('8675');
const metadata = { resultId: resultId };
let executeRequest;
const lbp = helper.getLoadBalancingPolicyFake([ {}, {} ], function prepareCallback(q, h, cb) {
// mock prepare returning metadata different than what is already cached.
cb(null, { meta: metadata });
}, function sendCallback(r, h, cb) {
// capture final execute request to ensure new metadata was propagated.
executeRequest = r;
if (h.sendStreamCalled === 1) {
// Its the first request, send an error
const err = new errors.ResponseError(types.responseErrorCodes.unprepared, 'Test error');
err.queryId = queryId;
return cb(err);
}
cb(null, { });
});
const hosts = lbp.getFixedQueryPlan();
const preparedCacheData = { query: 'QUERY1', meta: {}};
const client = newClient({
getPreparedById: function (id) {
preparedCacheData.id = id;
return preparedCacheData;
}
}, lbp);
const request = new requests.ExecuteRequest('QUERY1', queryId, []);
const handler = newInstance(request, client, lbp);
await handler.send();
assert.strictEqual(hosts[0].prepareCalled, 1);
assert.strictEqual(hosts[0].sendStreamCalled, 2);
assert.strictEqual(hosts[1].prepareCalled, 0);
assert.strictEqual(hosts[1].sendStreamCalled, 0);
// metadata should be updated when reprepared.
const info = client.metadata.getPreparedById(1);
assert.deepEqual(info.meta, metadata);
// metadata should have been propagated to subsequent execute request.
assert.deepEqual(executeRequest.meta, metadata);
});
it('should allow prepared statement keyspace different than connection keyspace', async () => {
const queryId = utils.allocBufferFromString('123');
const lbp = helper.getLoadBalancingPolicyFake([ {}, {} ], undefined, function sendCallback(r, h, cb) {
if (h.sendStreamCalled === 1) {
// Its the first request, send an error
const err = new errors.ResponseError(types.responseErrorCodes.unprepared, 'Test error');
err.queryId = queryId;
return cb(err);
}
cb(null, { });
});
const hosts = lbp.getFixedQueryPlan();
const client = newClient({
getPreparedById: function (id) {
return { query: 'QUERY1', id: id, keyspace: 'ks1'};
}
}, lbp);
const request = new requests.ExecuteRequest('QUERY1', queryId, [], ExecutionOptions.empty());
const handler = newInstance(request, client, lbp);
await handler.send();
// should have been initial request, unprepared sent back, and error raised before preparing.
assert.strictEqual(hosts[0].prepareCalled, 1);
assert.strictEqual(hosts[0].sendStreamCalled, 2);
assert.strictEqual(hosts[1].prepareCalled, 0);
assert.strictEqual(hosts[1].sendStreamCalled, 0);
});
it('should throw an error if prepared statement was on different keyspace than connection with older protocol version', async () => {
const queryId = utils.allocBufferFromString('123');
const lbp = helper.getLoadBalancingPolicyFake([ {}, {} ], undefined, function sendCallback(r, h, cb) {
if (h.sendStreamCalled === 1) {
// Its the first request, send an error
const err = new errors.ResponseError(types.responseErrorCodes.unprepared, 'Test error');
err.queryId = queryId;
return cb(err);
}
cb(null, { });
}, types.protocolVersion.dseV1);
const hosts = lbp.getFixedQueryPlan();
const client = newClient({
getPreparedById: function (id) {
return { query: 'QUERY1', id: id, keyspace: 'ks1'};
}
}, lbp);
const request = new requests.ExecuteRequest('QUERY1', queryId, []);
const handler = newInstance(request, client, lbp);
const err = await helper.assertThrowsAsync(handler.send());
helper.assertContains(err.message, 'Query was prepared on keyspace ks1');
// should have been initial request, unprepared sent back, and error raised before preparing.
assert.strictEqual(hosts[0].prepareCalled, 0);
assert.strictEqual(hosts[0].sendStreamCalled, 1);
assert.strictEqual(hosts[1].prepareCalled, 0);
assert.strictEqual(hosts[1].sendStreamCalled, 0);
});
it('should move to next host when PREPARE response is an error', async () => {
const queryId = utils.allocBufferFromString('123');
const lbp = helper.getLoadBalancingPolicyFake([ {}, {} ], function prepareCallback(q, h, cb) {
if (h.address === '0') {
return cb(new Error('Test error'));
}
cb(null, { });
}, function sendFake(r, h, cb) {
if (h.sendStreamCalled === 1) {
// Its the first request, send an error
const err = new errors.ResponseError(types.responseErrorCodes.unprepared, 'Test error');
err.queryId = queryId;
return cb(err);
}
cb(null, { });
});
const hosts = lbp.getFixedQueryPlan();
const client = newClient({
getPreparedById: function (id) {
return { query: 'QUERY1', id: id };
}
}, lbp);
const request = new requests.ExecuteRequest('QUERY1', queryId, [], ExecutionOptions.empty());
const handler = newInstance(request, client, lbp);
await handler.send();
assert.strictEqual(hosts[0].prepareCalled, 1);
assert.strictEqual(hosts[0].sendStreamCalled, 1);
assert.strictEqual(hosts[1].prepareCalled, 1);
assert.strictEqual(hosts[1].sendStreamCalled, 2);
});
it('should update prepared cache when rows response received with new result id', async () => {
const queryId = utils.allocBufferFromString('123');
const resultId = utils.allocBufferFromString('8675');
const newResultId = utils.allocBufferFromString('309');
const lbp = helper.getLoadBalancingPolicyFake([ {}, {} ], undefined, function sendCallback(r, h, cb) {
// mock a result having meta with a newResultId
cb(null, { meta: { newResultId: newResultId } });
});
const hosts = lbp.getFixedQueryPlan();
const preparedCacheData = { query: 'QUERY1', meta: { resultId: resultId }};
const client = newClient({
getPreparedById: function (id) {
preparedCacheData.id = id;
return preparedCacheData;
}
}, lbp);
const request = new requests.ExecuteRequest('QUERY1', queryId, []);
const handler = newInstance(request, client, lbp);
await handler.send();
assert.strictEqual(hosts[0].prepareCalled, 0);
assert.strictEqual(hosts[0].sendStreamCalled, 1);
assert.strictEqual(hosts[1].prepareCalled, 0);
assert.strictEqual(hosts[1].sendStreamCalled, 0);
// metadata should be updated by newResultId detected in result.
const info = client.metadata.getPreparedById(1);
assert.deepEqual(info.meta.resultId, newResultId);
});
});
context('with speculative executions', function () {
it('should use the query plan to use next hosts as coordinators', async () => {
const lbp = helper.getLoadBalancingPolicyFake([ {}, {}, {}], undefined, function sendStreamCb(r, h, cb) {
const op = new OperationState(r, null, cb);
if (h.address !== '2') {
setTimeout(function () {
op.setResult(null, {});
}, 60);
return op;
}
op.setResult(null, {});
return op;
});
const client = newClient(null, lbp);
client.options.policies.speculativeExecution =
new speculativeExecution.ConstantSpeculativeExecutionPolicy(20, 2);
const handler = newInstance(queryRequest, client, lbp, null, true);
const result = await handler.send();
helper.assertInstanceOf(result, types.ResultSet);
// Used the third host to get the response
assert.strictEqual(result.info.queriedHost, '2');
assert.deepEqual(Object.keys(result.info.triedHosts), [ '0', '1', '2' ]);
const hosts = lbp.getFixedQueryPlan();
assert.strictEqual(hosts[0].sendStreamCalled, 1);
assert.strictEqual(hosts[1].sendStreamCalled, 1);
assert.strictEqual(hosts[2].sendStreamCalled, 1);
});
it('should use the query plan to use next hosts as coordinators with zero delay', async () => {
const lbp = helper.getLoadBalancingPolicyFake([ {}, {} ], undefined, function sendStreamCb(r, h, cb) {
const op = new OperationState(r, null, cb);
if (h.address !== '1') {
setTimeout(function () {
op.setResult(null, {});
}, 40);
return op;
}
op.setResult(null, {});
return op;
});
const client = newClient(null, lbp);
client.options.policies.speculativeExecution =
new speculativeExecution.ConstantSpeculativeExecutionPolicy(0, 2);
const handler = newInstance(queryRequest, client, lbp, null, true);
const result = await handler.send();
helper.assertInstanceOf(result, types.ResultSet);
// Used the second host to get the response
assert.strictEqual(result.info.queriedHost, '1');
assert.deepEqual(Object.keys(result.info.triedHosts), [ '0', '1' ]);
const hosts = lbp.getFixedQueryPlan();
assert.strictEqual(hosts[0].sendStreamCalled, 1);
assert.strictEqual(hosts[1].sendStreamCalled, 1);
});
it('should callback in error when any of execution responses is an error that cant be retried', async () => {
const lbp = helper.getLoadBalancingPolicyFake([ {}, {}, {}], undefined, function sendStreamCb(r, h, cb) {
const op = new OperationState(r, null, cb);
if (h.address !== '0') {
setTimeout(function () {
op.setResult(null, {});
}, 60);
return op;
}
// The first request is going to be completed with an error
setTimeout(function () {
op.setResult(new Error('Test error'));
}, 60);
return op;
});
const client = newClient(null, lbp);
client.options.policies.speculativeExecution =
new speculativeExecution.ConstantSpeculativeExecutionPolicy(20, 2);
const handler = newInstance(queryRequest, client, lbp, null, true);
await helper.assertThrowsAsync(handler.send());
const hosts = lbp.getFixedQueryPlan();
// 3 hosts were queried but the first responded with an error
assert.strictEqual(hosts[0].sendStreamCalled, 1);
assert.strictEqual(hosts[1].sendStreamCalled, 1);
assert.strictEqual(hosts[2].sendStreamCalled, 1);
});
});
});
});
/**
* @param {Request} request
* @param {Client} client
* @param {LoadBalancingPolicy} lbp
* @param {RetryPolicy} [retry]
* @param {Boolean} [isIdempotent]
* @param {Host} [host]
* @returns {RequestHandler}
*/
function newInstance(request, client, lbp, retry, isIdempotent, host) {
client = client || newClient(null, lbp);
const options = {
executionProfile: new ExecutionProfile('abc', { loadBalancing: lbp }), retry: retry, isIdempotent: isIdempotent, host: host
};
const execOptions = new DefaultExecutionOptions(options, client);
return new RequestHandler(request, execOptions, client);
}
function newClient(metadata, lbp) {
const options = defaultOptions();
options.logEmitter = utils.noop;
options.policies.loadBalancing = lbp || options.policies.loadBalancing;
return {
profileManager: new ProfileManager(options),
options: options,
metadata: metadata,
metrics: new ClientMetrics()
};
}
/** @extends RetryPolicy */
function TestRetryPolicy(retryOnRequestError, retryOnUnavailable, retryOnReadTimeout, retryOnWriteTimeout) {
this._retryOnRequestError = ifUndefined(retryOnRequestError, true);
this._retryOnUnavailable = ifUndefined(retryOnUnavailable, true);
this._retryOnReadTimeout = ifUndefined(retryOnReadTimeout, true);
this._retryOnWriteTimeout = ifUndefined(retryOnWriteTimeout, true);
this.requestErrors = [];
this.unavailableErrors = [];
this.writeTimeoutErrors = [];
this.readTimeoutErrors = [];
}
util.inherits(TestRetryPolicy, retry.RetryPolicy);
TestRetryPolicy.prototype.onRequestError = function () {
this.requestErrors.push(Array.prototype.slice.call(arguments));
return this._retryOnRequestError ? this.retryResult(undefined, false) : this.rethrowResult();
};
TestRetryPolicy.prototype.onUnavailable = function () {
this.unavailableErrors.push(Array.prototype.slice.call(arguments));
return this._retryOnUnavailable ? this.retryResult(undefined, false) : this.rethrowResult();
};
TestRetryPolicy.prototype.onReadTimeout = function () {
this.readTimeoutErrors.push(Array.prototype.slice.call(arguments));
return this._retryOnReadTimeout ? this.retryResult(undefined, false) : this.rethrowResult();
};
TestRetryPolicy.prototype.onWriteTimeout = function () {
this.writeTimeoutErrors.push(Array.prototype.slice.call(arguments));
return this._retryOnWriteTimeout ? this.retryResult(undefined, false) : this.rethrowResult();
};
function ifUndefined(value, valueIfUndefined) {
return value === undefined ? valueIfUndefined : value;
}