// blob: 55cd55f73b815007bd2531eaeb6d492c2a1ff626 (repository viewer residue, not part of the program)
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
"use strict";
const RpcAddress = require('./dsn/dsn_types').rpc_address;
const ErrorType = require('./dsn/dsn_types').error_type;
const Connection = require('./connection');
const util = require('util');
const Exception = require('./errors');
const META_DELAY = 1000;
let log = null;
/**
 * Constructor of Cluster.
 * Keeps the client-wide settings and creates the MetaSession. The logger is
 * also stored in the module-level `log` variable that the sessions in this
 * file write to.
 * @param {Object} args required
 *        {Array}  args.metaList    required
 *        {String} args.metaList[i] required
 *        {Number} args.timeout(ms) required
 *        {Object} args.log         required
 * @constructor
 */
function Cluster(args) {
    const logger = args.log;
    const timeoutMs = args.timeout;

    log = logger;
    this.log = logger;
    this.timeout = timeoutMs;
    this.replicaSessions = []; // entries look like {'key': rpc_addr, 'value': ReplicaSession}
    this.metaSession = new MetaSession(args);
    // One third of the timeout, or 1 when the timeout is not greater than 3
    if (timeoutMs > 3) {
        this.queryMetaDelay = timeoutMs / 3;
    } else {
        this.queryMetaDelay = 1;
    }
}
/**
 * Close all sessions.
 * Emits 'close' on the connection of every replica session first, then on
 * every meta connection; missing (falsy) connections are skipped.
 */
Cluster.prototype.close = function () {
    const emitClose = function (connection) {
        if (connection) {
            connection.emit('close');
        }
    };

    const replicas = this.replicaSessions;
    for (let idx = 0, total = replicas.length; idx < total; idx += 1) {
        emitClose(replicas[idx].value.connection);
    }
    log.info('Finish to close replica sessions');

    const metas = this.metaSession.metaList;
    for (let idx = 0, total = metas.length; idx < total; idx += 1) {
        emitClose(metas[idx]);
    }
    log.info('Finish to close meta sessions');
};
/**
 * Constructor of Session, the common base of MetaSession and ReplicaSession.
 * Starts without a connection.
 * @param {Object} args
 *        {Number} args.timeout(ms) required
 * @constructor
 */
function Session(args) {
    const { timeout } = args;
    this.timeout = timeout;
    this.connection = null;
}
/**
 * Create new Connection by rpc_address and hand it to the callback.
 * The callback is invoked synchronously as callback(null, connection).
 * An invalid address is only logged: the Connection is still created and
 * delivered with a null error.
 * @param {Object} args
 *        {rpc_address} args.rpc_address required
 * @param {Function} callback
 */
Session.prototype.getConnection = function (args, callback) {
    const address = args.rpc_address;
    if (address.invalid()) {
        log.error('invalid rpc address');
    }

    const options = {
        host: address.host,
        port: address.port,
        rpcTimeOut: this.timeout,
        log: log,
    };
    callback(null, new Connection(options));
};
/**
 * Constructor of MetaSession.
 * Parses every entry of args.metaList and opens one connection per entry
 * that parses; entries that do not parse are logged and skipped.
 * metaList[i] and metaAddress[i] belong to the same meta server. When no
 * connection was created at all, `connectionError` is set on the session.
 * @param {Object} args
 *        {Array}  args.metaList    required
 *        {String} args.metaList[i] required
 *        {Number} args.timeout(ms) required
 * @constructor
 * @extends Session
 */
function MetaSession(args) {
    MetaSession.super_.call(this, args, this.constructor);

    this.metaAddress = [];
    this.metaList = [];
    this.curLeader = 0;
    this.maxRetryCounter = 5;
    this.queryingTableName = {}; // tableName -> isQueryingMeta
    this.lastQueryTime = {}; // tableName -> lastQueryTime

    for (const metaText of args.metaList) {
        const address = new RpcAddress();
        if (!address.fromString(metaText)) {
            log.error('invalid meta server address %s', metaText);
            continue;
        }
        this.getConnection({ 'rpc_address': address }, (err, connection) => {
            this.metaList.push(connection);
            this.metaAddress.push(address);
        });
    }

    if (this.metaList.length > 0) {
        return;
    }
    log.error('No meta connection exist!');
    this.connectionError = new Exception.MetaException('ERR_NO_META_SERVER',
        'Failed to connect to meta server, error is ERR_NO_META_SERVER');
}
util.inherits(MetaSession, Session);
/**
 * Send request to meta by leader connection.
 * When the round's last connection is broken (connectError or closed):
 *  - if it is still the one stored at metaList[lastIndex], a reconnect of
 *    that slot is requested;
 *  - otherwise the round moves to the connection currently stored in that
 *    slot, and a reconnect is requested only if that one is unusable too.
 * The request is then sent through round.lastConnection. With an empty
 * metaList only an error is logged.
 * @param {MetaRequestRound} round
 */
MetaSession.prototype.query = function (round) {
    if (!(this.metaList.length > 0)) {
        log.error('There is no meta session exist');
        return;
    }

    const entry = new RequestEntry(round.operator, (err, op) => {
        round.operator = op;
        this.onFinishQueryMeta(err, round);
    });

    const previous = round.lastConnection;
    if (previous.connectError || previous.closed) {
        const slot = round.lastIndex;
        const current = this.metaList[slot];
        if (current === previous) {
            log.error('%s meet error, reconnect it', previous.name);
            this.handleConnectedError(slot);
        } else if (current.connectError || current.closed || !current.connected) {
            log.error('%s meet error, metaList[%d]: %s also meet error, reconnect lastIndex',
                previous.name,
                slot,
                current.name);
            round.lastConnection = current;
            this.handleConnectedError(slot);
        } else {
            log.info('%s meet error, but metaList[%d]: %s connected, use lastIndex connection',
                previous.name,
                slot,
                current.name);
            round.lastConnection = current;
        }
    }

    round.lastConnection.call(entry);
};
/**
 * Callback function after finishing query meta.
 * Consumes one attempt of the round, then either completes the round or
 * retries it:
 *  - rpc ok + ERR_SERVICE_NOT_ACTIVE: retry the same meta after META_DELAY ms
 *  - rpc ok + ERR_FORWARD_TO_OTHERS:  switch meta and retry immediately
 *  - rpc ok + anything else:          complete the round
 *  - rpc ERR_SESSION_RESET/ERR_TIMEOUT: switch meta, retry after META_DELAY ms
 *  - any other rpc error:             complete the round
 * @param {Error} err
 * @param {MetaRequestRound} round
 */
MetaSession.prototype.onFinishQueryMeta = function (err, round) {
    let op = round.operator;
    let needSwitch = false, needDelay = false;
    let self = this;
    round.maxQueryCount--;
    // `<= 0`, not `=== 0`: a round that starts with a non-positive budget
    // steps past zero on the first decrement and would otherwise be retried
    // without any limit.
    if (round.maxQueryCount <= 0) {
        log.error('query meta exceed maxQueryCount, error is %s', err);
        this.completeQueryMeta(err, round);
        return;
    }
    // errno values are looked up in ErrorType before being compared
    let rpcErr = op.rpc_error.errno;
    let metaErr = ErrorType.ERR_UNKNOWN;
    if (ErrorType[rpcErr] === ErrorType.ERR_OK) {
        metaErr = op.response.err.errno;
        if (ErrorType[metaErr] === ErrorType.ERR_SERVICE_NOT_ACTIVE) { //meta server may be not ready, need to retry later
            log.warn('meta server(%s) is not ready, try to query meta later', round.lastConnection.name);
            needDelay = true;
            needSwitch = false;
        } else if (ErrorType[metaErr] === ErrorType.ERR_FORWARD_TO_OTHERS) { //current meta server is not leader, need to switch leader
            log.warn('meta server(%s) is not leader, try to switch meta immediatly', round.lastConnection.name);
            needDelay = false;
            needSwitch = true;
        } else {
            log.debug('query meta server(%s) succeed', round.lastConnection.name);
            this.completeQueryMeta(err, round);
            return;
        }
    } else if (ErrorType[rpcErr] === ErrorType.ERR_SESSION_RESET || ErrorType[rpcErr] === ErrorType.ERR_TIMEOUT) {
        log.warn('meta session(%s) not available, rpc error is %s, try to switch meta later', round.lastConnection.name, rpcErr);
        needDelay = true;
        needSwitch = true;
    } else {
        log.error('Unknown error while query meta(%s), rpc error is %s', round.lastConnection.name, rpcErr);
        this.completeQueryMeta(err, round);
        return;
    }
    // switch leader meta server; only advance when the failing connection is
    // still the current leader, so the index is not advanced again when it
    // already points at a different server
    if (needSwitch && this.metaList[this.curLeader].hostnamePort === round.lastConnection.hostnamePort) {
        this.curLeader = (this.curLeader + 1) % this.metaList.length;
    }
    round.lastIndex = this.curLeader;
    round.lastConnection = this.metaList[round.lastIndex];
    log.info("Will query meta index[%d]: %s", round.lastIndex, round.lastConnection.name);
    // delay to query meta
    let fun = function () {
        self.query(round);
    };
    if (needDelay) {
        setTimeout(fun, META_DELAY);
    } else {
        this.query(round);
    }
};
/**
 * Execute round callback function, then clear the "querying" flag of the
 * table named by the request (op.request.app_name).
 * The flag is cleared even when the callback throws; the exception still
 * propagates to the caller.
 * @param {Error} err
 * @param {MetaRequestRound} round
 */
MetaSession.prototype.completeQueryMeta = function (err, round) {
    const op = round.operator;
    try {
        round.callback(err, op);
    } finally {
        // Without this, a throwing callback would leave the flag set to true
        this.queryingTableName[op.request.app_name] = false;
    }
};
/**
 * Reconnect to session when original session meet error or close.
 * Requests a new connection for metaAddress[index]; on success it replaces
 * metaList[index] and 'close' is emitted on the connection it replaced.
 * On failure the slot is left untouched and the failure is logged.
 * @param {Number} index position in metaList / metaAddress
 */
MetaSession.prototype.handleConnectedError = function (index) {
    const self = this;
    const oriConnection = self.metaList[index];
    this.getConnection({
        'rpc_address': self.metaAddress[index],
    }, function (err, connection) {
        if (!err && connection) {
            self.metaList[index] = connection;
            oriConnection.emit('close');
            return;
        }
        // `err` can be empty here (no connection and no error object), so it
        // must not be dereferenced unconditionally.
        log.error('Failed to get meta connection, %s',
            err ? err.message : 'no connection was created');
    });
};
/**
 * Constructor of ReplicaSession.
 * Opens one connection to the given replica address. When args or
 * args.address is missing, an error is logged and the session is left
 * without a connection (connection === null).
 * @param {Object} args
 *        {string} args.address     required
 *        {Number} args.timeout(ms) required
 * @constructor
 * @extends Session
 */
function ReplicaSession(args) {
    // The base constructor reads args.timeout, so give it an empty object when
    // args is missing; otherwise it throws before the validation below runs.
    ReplicaSession.super_.call(this, args || {}, this.constructor);
    if (!args || !args.address) {
        log.error('Invalid params, Missing rpc address while creating replica session');
        return;
    }
    const self = this;
    const addr = new RpcAddress(args.address);
    this.getConnection({
        'rpc_address': addr,
    }, function (err, connection) {
        self.connection = connection;
    });
}
util.inherits(ReplicaSession, Session);
/**
 * Send user request by connection.
 * Wraps the round's operator in a RequestEntry; when the entry's callback
 * runs, the returned operator is stored back on the round and the reply is
 * passed to onRpcReply.
 * @param {ClientRequestRound} round
 */
ReplicaSession.prototype.operate = function (round) {
    const onReply = (err, op) => {
        round.operator = op;
        this.onRpcReply(err, round);
    };
    const entry = new RequestEntry(round.operator, onReply);
    this.connection.call(entry);
};
/**
 * Reconnect to session when original session meet error or close.
 * Requests a new connection for the given address; on success it becomes
 * this.connection and 'close' is emitted on the connection it replaced.
 * On failure the current connection is kept and the failure is logged.
 * @param address rpc address passed to getConnection as `rpc_address`
 */
ReplicaSession.prototype.handleConnectedError = function (address) {
    const oriConnection = this.connection;
    const self = this;
    this.getConnection({
        'rpc_address': address,
    }, function (err, connection) {
        if (!err && connection) {
            self.connection = connection;
            oriConnection.emit('close');
            return;
        }
        // `err` can be empty here (no connection and no error object), so it
        // must not be dereferenced unconditionally.
        log.error('Failed to get replica connection, %s',
            err ? err.message : 'no connection was created');
    });
};
/**
 * Handle rpc error.
 * ERR_OK delivers the reply to the round callback. Every other code is
 * logged and handed to tryRetry; the codes that mean "this replica does not
 * serve the partition" additionally ask the table handler to query meta
 * first.
 * @param {Error} err
 * @param {ClientRequestRound} round
 */
ReplicaSession.prototype.onRpcReply = function (err, round) {
    const op = round.operator;
    const table = round.tableHandler;
    let level;
    let template;
    let needQueryMeta = false;

    // The switch only classifies the error; the log call is shared below.
    switch (ErrorType[op.rpc_error.errno]) {
        case ErrorType.ERR_OK:
            round.callback(err, op);
            return;
        case ErrorType.ERR_TIMEOUT:
            level = 'warn';
            template = 'Table %s: rpc timeout for gpid(%d, %d), err_code is %s';
            break;
        case ErrorType.ERR_INVALID_DATA: // maybe task code is invalid
            level = 'error';
            template = 'Table %s: invalid data for gpid(%d, %d), err_code is %s';
            break;
        case ErrorType.ERR_SESSION_RESET:
        case ErrorType.ERR_OBJECT_NOT_FOUND: // replica server doesn't serve this gpid
        case ErrorType.ERR_INVALID_STATE: // replica server is not primary
        case ErrorType.ERR_PARENT_PARTITION_MISUSED: // partition finish split, need query meta
            level = 'warn';
            template = 'Table %s: replica server not serve for gpid(%d, %d), err_code is %s';
            needQueryMeta = true;
            break;
        case ErrorType.ERR_NOT_ENOUGH_MEMBER:
        case ErrorType.ERR_CAPACITY_EXCEEDED:
            level = 'warn';
            template = 'Table %s: replica server cannot serve writing for gpid(%d, %d), err_code is %s';
            break;
        default:
            level = 'error';
            template = 'Table %s: unexpected error for gpid(%d, %d), err_code is %s';
            break;
    }

    log[level](template,
        table.tableName,
        op.pid.get_app_id(),
        op.pid.get_pidx(),
        op.rpc_error.errno);

    if (needQueryMeta) {
        // The handler's callback is replaced with a no-op before meta is queried
        table.callback = function (err, handler) {
            // do nothing
        }.bind(this);
        table.queryMeta(table.tableName, table.onUpdateResponse.bind(table));
    }
    this.tryRetry(err, round);
};
/**
 * Retry the round's request while its operator still has time budget left,
 * otherwise fail the round through round.callback.
 * Each retry spends a third of the remaining op.timeout (1 when the timeout
 * is not greater than 3): the budget is reduced by that amount and the
 * request is sent again after waiting that long.
 * When the budget is used up the callback receives an RPCException; an
 * operator whose rpc error is still ERR_UNKNOWN is reported as ERR_TIMEOUT.
 * @param {Error} err error of the last attempt, may be empty
 * @param {ClientRequestRound} round
 */
ReplicaSession.prototype.tryRetry = function (err, round) {
    const op = round.operator;
    const self = this;
    const delayTime = op.timeout > 3 ? op.timeout / 3 : 1;

    if (op.timeout - delayTime > 0) {
        op.timeout -= delayTime;
        round.operator = op;
        setTimeout(function () {
            self.operate(round);
        }, delayTime);
        return;
    }

    if (ErrorType[op.rpc_error.errno] === ErrorType.ERR_UNKNOWN) {
        // errno holds the symbolic name that is used as a key into ErrorType,
        // so store the name here, not the ErrorType value.
        op.rpc_error.errno = 'ERR_TIMEOUT';
        // err may be empty; fall back to a generic message instead of throwing
        const detail = err ? err.message : 'Failed to query server, error is ERR_TIMEOUT';
        round.callback(new Exception.RPCException('ERR_TIMEOUT', detail), op);
    } else {
        round.callback(new Exception.RPCException(op.rpc_error.errno, 'Failed to query server, error is ' + op.rpc_error.errno), op);
    }
};
/**
 * Constructor of RequestEntry: an operator paired with the callback to run
 * for it. Instances are what this file passes to connection.call().
 * @param {Operator} operator
 * @param {Function} callback
 * @constructor
 */
function RequestEntry(operator, callback) {
    Object.assign(this, { operator, callback });
}
/**
 * Constructor of MetaRequestRound: the state of one meta query across its
 * attempts.
 * @param {Operator} operator
 * @param {Function} callback invoked as callback(err, operator) when the round completes
 * @param {Number} maxQueryCount remaining attempts, decremented after each reply
 * @param {Number} index stored as lastIndex, the metaList slot of lastConnection
 * @param {Connection} lastConnection connection used for the next attempt
 * @constructor
 */
function MetaRequestRound(operator, callback, maxQueryCount, index, lastConnection) {
    Object.assign(this, {
        operator,
        callback,
        maxQueryCount,
        lastIndex: index,
        lastConnection,
    });
}
/**
 * Constructor of ClientRequestRound: the state of one user request sent to
 * a replica server.
 * @param {TableHandler} tableHandler
 * @param {Operator} operator
 * @param {Function} callback invoked as callback(err, operator)
 * @constructor
 */
function ClientRequestRound(tableHandler, operator, callback) {
    Object.assign(this, { tableHandler, operator, callback });
}
module.exports = {
Cluster: Cluster,
MetaSession: MetaSession,
ReplicaSession: ReplicaSession,
RequestEntry: RequestEntry,
MetaRequestRound: MetaRequestRound,
ClientRequestRound: ClientRequestRound,
};