blob: e7caa1aa75f567a6870e62b8aa5e2cde37fa105b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
"use strict";
const EventEmitter = require('events').EventEmitter;
const util = require('util');
const Cluster = require('./session').Cluster;
const TableHandler = require('./table_handler').TableHandler;
const TableInfo = require('./table_handler').TableInfo;
const tools = require('./tools');
const _OPERATION_TIMEOUT = 1000;
let log4js = require('log4js');
let logConfig = require('../log_config');
/**
* Constructor of client
* @param {Object} configs
* @constructor
*/
function Client(configs) {
if (!(this instanceof Client)) {
return new Client(configs);
}
EventEmitter.call(this);
this.rpcTimeOut = configs.operationTimeout || _OPERATION_TIMEOUT;
this.metaList = configs.metaServers;
if (configs.log) {
this.log = configs.log;
} else {
log4js.configure(logConfig);
this.log = log4js.getLogger('pegasus');
}
this.cachedTableInfo = {}; //tableName -> tableInfo
this.cluster = new Cluster({ //Current connections
'metaList': this.metaList,
'timeout': this.rpcTimeOut,
'log': this.log,
});
}
util.inherits(Client, EventEmitter);
/**
* Create a client instance
* @param {Object} configs
* {Array} configs.metaServers required
* {String} configs.metaServers[i] required
* {Number} configs.operationTimeout(ms) optional
* {Object} configs.log optional
* @return {Client} client instance
* @throws {InvalidParamException}
*/
Client.create = function (configs) {
tools.validateClientConfigs(configs);
return new Client(configs);
};
/**
* Close client and it sessions
*/
Client.prototype.close = function () {
this.cluster.close();
};
/**
* Get value
* @param {String} tableName
* @param {Object} args
* {Buffer} args.hashKey required
* {Buffer} args.sortKey required
* {Number} args.timeout(ms) optional
* @param {Function} callback
* @throws{InvalidParamException} callback is not function
*/
Client.prototype.get = function (tableName, args, callback) {
tools.validateFunction(callback, 'callback');
if (tools.validateParam(args.hashKey, 'hashKey', Buffer, true, callback) ||
tools.validateParam(args.sortKey, 'sortKey', Buffer, true, callback)) {
return;
}
this.getTable(tableName, function (err, tableInfo) {
if (err === null && tableInfo !== null) {
tableInfo.get(args, callback);
} else {
callback(err, null);
}
});
};
/**
* Set Value
* @param {String} tableName
* @param {Object} args
* {Buffer} args.hashKey required
* {Buffer} args.sortKey required
* {Buffer} args.value required
* {Number} args.ttl(s) optional
* {Number} args.timeout(ms) optional
* @param {Function} callback
* @throws{InvalidParamException} callback is not function
*/
Client.prototype.set = function (tableName, args, callback) {
tools.validateFunction(callback, 'callback');
if (tools.validateParam(args.hashKey, 'hashKey', Buffer, true, callback) ||
tools.validateParam(args.sortKey, 'sortKey', Buffer, true, callback) ||
tools.validateParam(args.value, 'value', Buffer, true, callback)) {
return;
}
this.getTable(tableName, function (err, tableInfo) {
if (err === null && tableInfo !== null) {
tableInfo.set(args, callback);
} else {
callback(err, null);
}
});
};
/**
* Batch Set value
* @param {String} tableName
* @param {Array} argsArray
* {Buffer} argsArray[i].hashKey required
* {Buffer} argsArray[i].sortKey required
* {Buffer} argsArray[i].value required
* {Number} argsArray[i].ttl optional
* {Number} argsArray[i].timeout(ms) optional
* @param {Function} callback
* @throws{InvalidParamException} callback is not function
*/
Client.prototype.batchSet = function (tableName, argsArray, callback) {
tools.validateFunction(callback, 'callback');
if (tools.validateParam(argsArray, 'argsArray', Array, true, callback)) {
return;
}
let i, len = argsArray.length;
for (i = 0; i < len; ++i) {
if (tools.validateParam(argsArray[i].hashKey, 'hashKey', Buffer, true, callback) ||
tools.validateParam(argsArray[i].sortKey, 'sortKey', Buffer, true, callback) ||
tools.validateParam(argsArray[i].value, 'value', Buffer, true, callback)) {
return;
}
}
this.getTable(tableName, function (err, tableInfo) {
if (err === null && tableInfo !== null) {
tableInfo.batchSetPromise(argsArray, callback);
} else {
callback(err, null);
}
});
};
/**
* Batch Get value
* @param {String} tableName
* @param {Array} argsArray
* {Buffer} argsArray[i].hashKey required
* {Buffer} argsArray[i].sortKey required
* {Number} argsArray[i].timeout(ms) optional
* @param {Function} callback
* @throws{InvalidParamException} callback is not function
*/
Client.prototype.batchGet = function (tableName, argsArray, callback) {
tools.validateFunction(callback, 'callback');
if (tools.validateParam(argsArray, 'argsArray', Array, true, callback)) {
return;
}
let i, len = argsArray.length;
for (i = 0; i < len; ++i) {
if (tools.validateParam(argsArray[i].hashKey, 'hashKey', Buffer, true, callback) ||
tools.validateParam(argsArray[i].sortKey, 'sortKey', Buffer, true, callback)) {
return;
}
}
this.getTable(tableName, function (err, tableInfo) {
if (err === null && tableInfo !== null) {
tableInfo.batchGetPromise(argsArray, callback);
} else {
callback(err, null);
}
});
};
/**
* Delete value
* @param {String} tableName
* @param {Object} args
* {Buffer} args.hashKey required
* {Buffer} args.sortKey required
* {Number} args.timeout(ms) optional
* @param {Function} callback
* @throws{InvalidParamException} callback is not function
*/
Client.prototype.del = function (tableName, args, callback) {
tools.validateFunction(callback, 'callback');
if (tools.validateParam(args.hashKey, 'hashKey', Buffer, true, callback) ||
tools.validateParam(args.sortKey, 'sortKey', Buffer, true, callback)) {
return;
}
this.getTable(tableName, function (err, tableInfo) {
if (err === null && tableInfo !== null) {
tableInfo.del(args, callback);
} else {
callback(err, null);
}
});
};
/**
* Multi Get
* @param {String} tableName
* @param {Object} args
* {Buffer} args.hashKey required
* {Array} args.sortKeyArray required
* {Buffer} args.sortKeyArray[i] required
* {Number} args.timeout(ms) optional
* {Number} args.maxFetchCount optional
* {Number} args.maxFetchSize optional
* @param {Function} callback
* @throws{InvalidParamException} callback is not function
*/
Client.prototype.multiGet = function (tableName, args, callback) {
tools.validateFunction(callback, 'callback');
if (tools.validateParam(args.hashKey, 'hashKey', Buffer, true, callback)) {
return;
}
if (tools.validateParam(args.sortKeyArray, 'sortKeyArray', Array, true, callback)) {
return;
}
let i, len = args.sortKeyArray.length;
for (i = 0; i < len; ++i) {
if (tools.validateParam(args.sortKeyArray[i], 'sortKey', Buffer, true, callback)) {
return;
}
}
this.getTable(tableName, function (err, tableInfo) {
if (err === null && tableInfo !== null) {
tableInfo.multiGet(args, callback);
} else {
callback(err, null);
}
});
};
/**
* Multi Get
* @param {String} tableName
* @param {Object} args
* {Buffer} args.hashKey required
* {Array} args.sortKeyValueArray required
* {'key' : sortKey, 'value' : value}
* {Number} args.timeout(ms) optional
* {Number} args.ttl(s) optional
* @param {Function} callback
* @throws{InvalidParamException} callback is not function
*/
Client.prototype.multiSet = function (tableName, args, callback) {
tools.validateFunction(callback, 'callback');
if (tools.validateParam(args.hashKey, 'hashKey', Buffer, true, callback)) {
return;
}
if (tools.validateParam(args.sortKeyValueArray, 'sortKeyValueArray', Array, true, callback)) {
return;
}
let i, len = args.sortKeyValueArray.length;
for (i = 0; i < len; ++i) {
if (tools.validateParam(args.sortKeyValueArray[i].key, 'sortKey', Buffer, true, callback) ||
tools.validateParam(args.sortKeyValueArray[i].value, 'value', Buffer, true, callback)) {
return;
}
}
this.getTable(tableName, function (err, tableInfo) {
if (err === null && tableInfo !== null) {
tableInfo.multiSet(args, callback);
} else {
callback(err, null);
}
});
};
/**
* Get TableInfo and execute callback
* @param {String} tableName
* @param {Function} callback
*/
Client.prototype.getTable = function (tableName, callback) {
if (tools.validateParam(tableName, 'tableName', 'string', false, callback)) {
return;
}
let self = this;
let tableInfo = self.cachedTableInfo[tableName];
if (!tableInfo) {
new TableHandler(self.cluster, tableName, function (err, tableHandler) {
if (err === null && tableHandler !== null) {
tableInfo = new TableInfo(self, tableHandler, self.rpcTimeOut);
self.cachedTableInfo[tableName] = tableInfo;
callback(err, tableInfo);
} else {
callback(err, null);
}
});
} else { //use cached tableInfo structure
callback(null, tableInfo);
}
};
module.exports = Client;