Project repository: Pegasus Java Client
Download:

```bash
git clone https://github.com/XiaoMi/pegasus-java-client.git
cd pegasus-java-client
```
Check out the version you want to use and build it. Using the latest release version is recommended:

```bash
git checkout v2.0.0
mvn clean package -DskipTests
```
Install it into your local Maven repository so that your projects can use it:

```bash
mvn clean install -DskipTests
```
After installation, add the dependency to your project's Maven configuration:

```xml
<dependency>
    <groupId>com.xiaomi.infra</groupId>
    <artifactId>pegasus-client</artifactId>
    <version>2.0.0</version>
</dependency>
```
Note: version 2.0.0 works only with PegasusServer >= 2.0. If your server is older, use the following version instead:

```xml
<dependency>
    <groupId>com.xiaomi.infra</groupId>
    <artifactId>pegasus-client</artifactId>
    <version>1.11.10-thrift-0.11.0-inlined</version>
</dependency>
```
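Creating a Java client instance requires some configuration. You can supply it in one of two ways: with a configuration file, or by passing parameters directly.

With the configuration-file approach, the Java client reads a file that specifies the location of the Pegasus cluster, the default timeout, and other settings.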
The configuration file is usually named `pegasus.properties`. Example:

```properties
meta_servers = 127.0.0.1:34601,127.0.0.1:34602,127.0.0.1:34603
operation_timeout = 1000
# The following parameters can be added as needed; otherwise the defaults are used
async_workers = 4
enable_perf_counter = true
perf_counter_tags = cluster=onebox,app=unit_test
push_counter_interval_secs = 10
meta_query_timeout = 5000
```
Where:
The configuration file is used when creating a Client instance. Pass its location as the `configPath` argument:

```java
PegasusClientInterface client = PegasusClientFactory.getSingletonClient(configPath);
```
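The sample above uses ordinary `java.util.Properties` syntax (`key = value`, with `#` comment lines). You can therefore sanity-check a file by loading it with `Properties` yourself before passing its path to the factory. The sketch below does only that check. `PegasusConfigCheck` and its methods are hypothetical names, and the sketch does not reproduce how the client parses its configuration. Treating `meta_servers` as required and falling back to 1000 ms for `operation_timeout` are assumptions made for illustration.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

class PegasusConfigCheck {
    // Parses text in java.util.Properties syntax ("key = value", "#" comment lines).
    static Properties load(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e); // not expected for a StringReader
        }
        return props;
    }

    // Returns the meta server addresses; this sketch treats meta_servers as required.
    static List<String> metaServers(Properties props) {
        String raw = props.getProperty("meta_servers");
        if (raw == null || raw.trim().isEmpty()) {
            throw new IllegalArgumentException("meta_servers is required");
        }
        return Arrays.asList(raw.trim().split("\\s*,\\s*"));
    }

    // Reads operation_timeout in milliseconds; the 1000 ms fallback is an assumption of this sketch.
    static int operationTimeoutMs(Properties props) {
        return Integer.parseInt(props.getProperty("operation_timeout", "1000").trim());
    }

    public static void main(String[] args) {
        Properties props = load("meta_servers = 127.0.0.1:34601,127.0.0.1:34602\n"
                + "operation_timeout = 1000\n"
                + "# optional parameters follow\n"
                + "async_workers = 4\n");
        System.out.println(metaServers(props));
        System.out.println(operationTimeoutMs(props));
    }
}
```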
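`configPath` has the format `type://path`. Three types are currently supported:

- `zk`: a ZooKeeper path, e.g. `zk://host1:port1,host2:port2,host3:port3/path/to/config`
- `file`: a local file path, e.g. `file:///path/to/config`
- `resource`: a Java resource, e.g. `resource:///path/to/config`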
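To illustrate the `type://path` format, the hypothetical helper below splits a `configPath` into its type and its path. It rejects any type other than the three listed above. This is a sketch for validating input in your own code, not the parser the client uses.

```java
import java.util.Arrays;
import java.util.List;

class ConfigPathDemo {
    // The three config types listed in the PegasusClientFactory Javadoc.
    static final List<String> SUPPORTED = Arrays.asList("zk", "file", "resource");

    // Splits "type://path" into {type, path}; throws on a malformed path or an unsupported type.
    static String[] parse(String configPath) {
        int sep = configPath.indexOf("://");
        if (sep <= 0) {
            throw new IllegalArgumentException("expected type://path, got: " + configPath);
        }
        String type = configPath.substring(0, sep);
        if (!SUPPORTED.contains(type)) {
            throw new IllegalArgumentException("unsupported config type: " + type);
        }
        return new String[] {type, configPath.substring(sep + 3)};
    }

    public static void main(String[] args) {
        String[] parts = parse("resource:///pegasus.properties");
        System.out.println(parts[0] + " -> " + parts[1]);
    }
}
```

For `resource:///pegasus.properties` it yields the type `resource` and the path `/pegasus.properties`.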
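Alternatively, you can build a `ClientOptions` instance and pass it as the argument when creating the client. `ClientOptions` contains the following parameters: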
There are two ways to create a `ClientOptions` instance. You can call:

```java
ClientOptions clientOptions = ClientOptions.create();
```

to create an instance with the default settings, or build a customized instance as in the following example:

```java
import java.time.Duration;

ClientOptions clientOptions = ClientOptions.builder()
    .metaServers("127.0.0.1:34601,127.0.0.1:34602,127.0.0.1:34603")
    .operationTimeout(Duration.ofMillis(1000))
    .asyncWorkers(4)
    .enablePerfCounter(false)
    .falconPerfCounterTags("")
    .falconPushInterval(Duration.ofSeconds(10))
    .build();
```
All Java client classes live in the `com.xiaomi.infra.pegasus.client` package. There are three main classes:
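Class | Purpose |
---|---|
PegasusClientFactory | Client factory class, used to create Client instances |
PegasusClientInterface | Client interface; wraps the synchronous APIs and can also be used to create Table instances |
PegasusTableInterface | Table interface; wraps the synchronous and asynchronous APIs for accessing the data of a single table |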
You can access data through either the Client interface (`PegasusClientInterface`) or the Table interface (`PegasusTableInterface`). The differences are as follows:
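A Client instance can be created in two ways: as a singleton or as a regular (non-singleton) instance.

If your program only needs to access a single cluster, a singleton is the right choice because it lets the program share resources such as thread pools and connections.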
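Note: if you call `getSingletonClient()` from several places to get the singleton, you must pass the same `configPath` or `ClientOptions` object every time. Otherwise an exception is thrown. This guarantees that every call returns the same instance.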
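The consistency rule above can be pictured with a small holder class. It remembers the config used to create the shared instance and rejects a later request that uses a different one. `SingletonHolder` is a hypothetical illustration, not the real `PegasusClientFactory`. The generic config type stands in for `configPath`/`ClientOptions`, and the creator function stands in for client construction. Letting a new config take effect after `close()` is an assumption of this model.

```java
import java.util.function.Function;

class SingletonHolder<C, T> {
    private final Function<C, T> creator;
    private C config;   // config used to create the cached instance
    private T instance; // the shared instance, or null if not created or already closed

    SingletonHolder(Function<C, T> creator) {
        this.creator = creator;
    }

    // Returns the shared instance; a call with a different config than the first one is rejected.
    synchronized T get(C requested) {
        if (instance == null) {
            instance = creator.apply(requested);
            config = requested;
        } else if (!config.equals(requested)) {
            throw new IllegalStateException("singleton already created with a different config");
        }
        return instance;
    }

    // Drops the cached instance, modeling closeSingletonClient().
    synchronized void close() {
        instance = null;
        config = null;
    }
}
```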
Call `PegasusClientFactory::getSingletonClient()` to get the singleton `PegasusClientInterface` object:

```java
/**
 * Get the singleton client instance with default config path of "resource:///pegasus.properties".
 * After used, should call PegasusClientFactory.closeSingletonClient() to release resource.
 *
 * @return PegasusClientInterface PegasusClientInterface.
 * @throws PException throws exception if any error occurs.
 */
public static PegasusClientInterface getSingletonClient() throws PException;

/**
 * Get the singleton client instance with customized config path. After used, should call
 * PegasusClientFactory.closeSingletonClient() to release resource.
 *
 * @param configPath configPath could be:
 *     - zookeeper path  : zk://host1:port1,host2:port2,host3:port3/path/to/config
 *     - local file path : file:///path/to/config
 *     - java resource   : resource:///path/to/config
 *
 * @return PegasusClientInterface PegasusClientInterface.
 * @throws PException throws exception if any error occurs.
 */
public static PegasusClientInterface getSingletonClient(String configPath) throws PException;

/**
 * Get the singleton client instance with ClientOptions. After used, should call
 * PegasusClientFactory.closeSingletonClient() to release resource.
 *
 * @param options The client option
 * @return PegasusClientInterface PegasusClientInterface.
 * @throws PException throws exception if any error occurs.
 */
public static PegasusClientInterface getSingletonClient(ClientOptions options) throws PException;
```
When you are done with it, remember to close the singleton to release resources, for example:

```java
PegasusClientInterface client = PegasusClientFactory.getSingletonClient(configPath);
// ... use the client ...
PegasusClientFactory.closeSingletonClient();
```
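If your program needs to access multiple clusters, a singleton will not work. For this case there is an interface for creating regular instances. Pass a `configPath` or `ClientOptions` object when creating each instance, and use a different `configPath` or `ClientOptions` for each cluster.

Note: each instance owns its own independent resources and does not affect the others. Avoid creating instances repeatedly, which wastes resources, and remember to call `close()` to release resources when you are done.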
Call `PegasusClientFactory::createClient()` to get a non-singleton client instance:

```java
/**
 * Create a client instance. After used, should call client.close() to release resource.
 *
 * @param configPath client config path, could be:
 *     - zookeeper path  : zk://host1:port1,host2:port2,host3:port3/path/to/config
 *     - local file path : file:///path/to/config
 *     - java resource   : resource:///path/to/config
 *
 * @return PegasusClientInterface.
 * @throws PException throws exception if any error occurs.
 */
public static PegasusClientInterface createClient(String configPath) throws PException;

/**
 * Create a client instance with ClientOptions. After used, should call
 * client.close() to release resource.
 *
 * @param options The client option
 * @return PegasusClientInterface.
 * @throws PException throws exception if any error occurs.
 */
public static PegasusClientInterface createClient(ClientOptions options) throws PException;
```
For example:

```java
PegasusClientInterface client = PegasusClientFactory.createClient(configPath);
// ... use the client ...
client.close();
```
`get`: reads a single row.

```java
/**
 * Get value.
 * @param tableName TableHandler name
 * @param hashKey used to decide which partition to get this k-v,
 *     if null or length == 0, means hash key is "".
 * @param sortKey all the k-v under hashKey will be sorted by sortKey,
 *     if null or length == 0, means sort key is "".
 * @return value; null if not found
 * @throws PException
 */
public byte[] get(String tableName, byte[] hashKey, byte[] sortKey) throws PException;
```
Notes:

`batchGet`: reads a batch of rows as a batch wrapper around `get`. It sends asynchronous requests to the server concurrently and waits for the results. If any request fails, it terminates early and throws an exception. In that case the contents of `values` are undefined.

```java
/**
 * Batch get values of different keys.
 * Will terminate immediately if any error occurs.
 * @param tableName table name
 * @param keys hashKey and sortKey pair list.
 * @param values output values; should be created by caller; if succeed, the size of values will
 *     be same with keys; the value of keys[i] is stored in values[i]; if the value of
 *     keys[i] is not found, then values[i] will be set to null.
 * @throws PException throws exception if any error occurs.
 *
 * Notice: the method is not atomic, that means, maybe some keys succeed but some keys failed.
 */
public void batchGet(String tableName, List<Pair<byte[], byte[]>> keys, List<byte[]> values)
        throws PException;
```
Notes:

`batchGet2`: also reads a batch of rows as a batch wrapper around `get`, sending asynchronous requests to the server concurrently. Unlike `batchGet` above, it waits for all requests to finish, whether they succeed or fail.

To tell whether each request succeeded, check whether its `PException` in `results` is set. You can then use only the successful results.

```java
/**
 * Batch get values of different keys.
 * Will wait for all requests done even if some error occurs.
 * @param tableName table name
 * @param keys hashKey and sortKey pair list.
 * @param results output results; should be created by caller; after call done, the size of results will
 *     be same with keys; the results[i] is a Pair:
 *     - if Pair.left != null : means query keys[i] failed, Pair.left is the exception.
 *     - if Pair.left == null : means query keys[i] succeed, Pair.right is the result value.
 * @return succeed count.
 * @throws PException
 *
 * Notice: the method is not atomic, that means, maybe some keys succeed but some keys failed.
 */
public int batchGet2(String tableName, List<Pair<byte[], byte[]>> keys,
        List<Pair<PException, byte[]>> results) throws PException;
```
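The practical difference between `batchGet` (fail fast) and `batchGet2` (wait for everything) can be sketched with `CompletableFuture` standing in for the per-key asynchronous requests. This is a hypothetical model, not the client's implementation. `Map.Entry<Exception, byte[]>` plays the role of `Pair<PException, byte[]>`. Also, `failFast` stops at the first failed request in list order, not at whichever request fails first in time.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class BatchSemantics {
    // batchGet style: values[i] matches requests[i]; the first failure (in list order)
    // propagates as CompletionException and the partially filled values are discarded.
    static List<byte[]> failFast(List<CompletableFuture<byte[]>> requests) {
        List<byte[]> values = new ArrayList<>();
        for (CompletableFuture<byte[]> f : requests) {
            values.add(f.join()); // join() rethrows a failure wrapped in CompletionException
        }
        return values;
    }

    // batchGet2 style: waits for every request; results[i] = (exception or null, value or null).
    static int waitAll(List<CompletableFuture<byte[]>> requests,
                       List<Map.Entry<Exception, byte[]>> results) {
        int succeed = 0;
        for (CompletableFuture<byte[]> f : requests) {
            try {
                results.add(new SimpleEntry<Exception, byte[]>(null, f.join()));
                succeed++;
            } catch (CompletionException e) {
                Throwable cause = e.getCause();
                Exception ex = (cause instanceof Exception) ? (Exception) cause : e;
                results.add(new SimpleEntry<Exception, byte[]>(ex, null));
            }
        }
        return succeed;
    }
}
```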
Notes:

`multiGet`: reads multiple rows under the same HashKey.

```java
/**
 * Get multiple value under the same hash key.
 * @param tableName table name
 * @param hashKey used to decide which partition to put this k-v,
 *     should not be null or empty.
 * @param sortKeys all the k-v under hashKey will be sorted by sortKey,
 *     if null or empty, means fetch all sortKeys under the hashKey.
 * @param maxFetchCount max count of k-v pairs to be fetched.
 *     max_fetch_count <= 0 means no limit. default value is 100.
 * @param maxFetchSize max size of k-v pairs to be fetched.
 *     max_fetch_size <= 0 means no limit. default value is 1000000.
 * @param values output values; if sortKey is not found, then it will not appear in values.
 *     the returned sortKey is just the same one in incoming sortKeys.
 * @return true if all data is fetched; false if only partial data is fetched.
 * @throws PException
 */
public boolean multiGet(String tableName, byte[] hashKey, List<byte[]> sortKeys,
        int maxFetchCount, int maxFetchSize, List<Pair<byte[], byte[]>> values) throws PException;
public boolean multiGet(String tableName, byte[] hashKey, List<byte[]> sortKeys,
        List<Pair<byte[], byte[]>> values) throws PException;
```
Notes:

`multiGet` has another overload that supports SortKey range queries and conditional filtering, so that only data meeting specific conditions is read. Since 1.8.0, `MultiGetOptions` also has a `reverse` parameter that enables scanning data in reverse order.

```java
public enum FilterType {
    FT_NO_FILTER(0),
    FT_MATCH_ANYWHERE(1), // match filter string at any position
    FT_MATCH_PREFIX(2),   // match filter string at prefix
    FT_MATCH_POSTFIX(3);  // match filter string at postfix
}

public class MultiGetOptions {
    public boolean startInclusive = true; // if the startSortKey is included
    public boolean stopInclusive = false; // if the stopSortKey is included
    public FilterType sortKeyFilterType = FilterType.FT_NO_FILTER; // filter type for sort key
    public byte[] sortKeyFilterPattern = null; // filter pattern for sort key
    public boolean noValue = false; // only fetch hash_key and sort_key, but not fetch value
    public boolean reverse = false; // if search in reverse direction
}

/**
 * Get multiple key-values under the same hashKey with sortKey range limited.
 * @param tableName table name
 * @param hashKey used to decide which partition the key may exist
 *     should not be null or empty.
 * @param startSortKey the start sort key.
 *     null means "".
 * @param stopSortKey the stop sort key.
 *     null or "" means fetch to the last sort key.
 * @param options multi-get options.
 * @param maxFetchCount max count of kv pairs to be fetched
 *     maxFetchCount <= 0 means no limit. default value is 100
 * @param maxFetchSize max size of kv pairs to be fetched.
 *     maxFetchSize <= 0 means no limit. default value is 1000000.
 * @param values output values; if sortKey is not found, then it will not appear in values.
 *     the returned sortKey is just the same one in incoming sortKeys.
 * @return true if all data is fetched; false if only partial data is fetched.
 * @throws PException
 */
public boolean multiGet(String tableName, byte[] hashKey,
        byte[] startSortKey, byte[] stopSortKey, MultiGetOptions options,
        int maxFetchCount, int maxFetchSize,
        List<Pair<byte[], byte[]>> values) throws PException;
public boolean multiGet(String tableName, byte[] hashKey,
        byte[] startSortKey, byte[] stopSortKey, MultiGetOptions options,
        List<Pair<byte[], byte[]>> values) throws PException;
```
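The following in-memory model shows how `startSortKey`, `stopSortKey`, `startInclusive`, `stopInclusive`, a prefix filter and `maxFetchCount` combine. It represents one HashKey's rows as a `TreeMap` and makes simplifying assumptions. `String` keys stand in for `byte[]`, so the ordering matches byte order only for ASCII keys. Only `FT_MATCH_PREFIX` is modeled, and `reverse`, `noValue` and `maxFetchSize` are left out. `MultiGetRangeModel` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

class MultiGetRangeModel {
    // Returns the sort keys of one hashKey that fall in the requested range and pass the filter.
    static List<String> rangeKeys(TreeMap<String, String> row, String start, String stop,
                                  boolean startInclusive, boolean stopInclusive,
                                  String prefixFilter, int maxFetchCount) {
        String from = (start == null) ? "" : start; // null startSortKey means ""
        NavigableMap<String, String> range;
        if (stop == null || stop.isEmpty()) {
            range = row.tailMap(from, startInclusive); // null or "" stopSortKey: up to the last sort key
        } else if (from.compareTo(stop) > 0) {
            return new ArrayList<>(); // start after stop: empty range
        } else {
            range = row.subMap(from, startInclusive, stop, stopInclusive);
        }
        List<String> out = new ArrayList<>();
        for (String sortKey : range.keySet()) {
            if (prefixFilter != null && !sortKey.startsWith(prefixFilter)) {
                continue; // FT_MATCH_PREFIX applied to the sort key
            }
            if (maxFetchCount > 0 && out.size() >= maxFetchCount) {
                break; // maxFetchCount <= 0 means no limit
            }
            out.add(sortKey);
        }
        return out;
    }
}
```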
Notes:

`batchMultiGet`: a batch wrapper around `multiGet`. It sends asynchronous requests to the server concurrently and waits for the results. If any request fails, it terminates early and throws an exception. In that case the contents of `values` are undefined.

```java
/**
 * Batch get multiple values under the same hash key.
 * Will terminate immediately if any error occurs.
 * @param tableName table name
 * @param keys List{hashKey,List{sortKey}}; if List{sortKey} is null or empty, means fetch all
 *     sortKeys under the hashKey.
 * @param values output values; should be created by caller; if succeed, the size of values will
 *     be same with keys; the data for keys[i] is stored in values[i].
 * @throws PException throws exception if any error occurs.
 *
 * Notice: the method is not atomic, that means, maybe some keys succeed but some keys failed.
 */
public void batchMultiGet(String tableName, List<Pair<byte[], List<byte[]>>> keys,
        List<HashKeyData> values) throws PException;
```
Notes:

`batchMultiGet2`: a batch wrapper around `multiGet` that sends asynchronous requests to the server concurrently and waits for the results. Unlike `batchMultiGet` above, it waits for all requests to finish, whether they succeed or fail.

To tell whether each request succeeded, check whether its `PException` in `results` is set. You can then use only the successful results.

```java
/**
 * Batch get multiple values under the same hash key.
 * Will wait for all requests done even if some error occurs.
 * @param tableName table name
 * @param keys List{hashKey,List{sortKey}}; if List{sortKey} is null or empty, means fetch all
 *     sortKeys under the hashKey.
 * @param results output results; should be created by caller; after call done, the size of results will
 *     be same with keys; the results[i] is a Pair:
 *     - if Pair.left != null : means query keys[i] failed, Pair.left is the exception.
 *     - if Pair.left == null : means query keys[i] succeed, Pair.right is the result value.
 * @return succeed count.
 * @throws PException
 *
 * Notice: the method is not atomic, that means, maybe some keys succeed but some keys failed.
 */
public int batchMultiGet2(String tableName, List<Pair<byte[], List<byte[]>>> keys,
        List<Pair<PException, HashKeyData>> results) throws PException;
```
Notes:

`set`: writes a single row.

```java
/**
 * Set value.
 * @param tableName TableHandler name
 * @param hashKey used to decide which partition to put this k-v,
 *     if null or length == 0, means hash key is "".
 * @param sortKey all the k-v under hashKey will be sorted by sortKey,
 *     if null or length == 0, means sort key is "".
 * @param value should not be null
 * @param ttl_seconds time to live in seconds,
 *     0 means no ttl. default value is 0.
 * @throws PException
 */
public void set(String tableName, byte[] hashKey, byte[] sortKey, byte[] value, int ttl_seconds)
        throws PException;
public void set(String tableName, byte[] hashKey, byte[] sortKey, byte[] value) throws PException;
```
Notes:

`batchSet`: writes a batch of rows as a batch wrapper around `set`. It sends asynchronous requests to the server concurrently and waits for the results. If any request fails, it terminates early and throws an exception.

```java
/**
 * Batch set lots of values.
 * @param tableName TableHandler name
 * @param items list of items.
 * @throws PException throws exception if any error occurs.
 *
 * Notice: the method is not atomic, that means, maybe some keys succeed but some keys failed.
 */
public void batchSet(String tableName, List<SetItem> items) throws PException;
```
Notes:

`batchSet2`: a batch wrapper around `set` that sends asynchronous requests to the server concurrently and waits for the results. Unlike `batchSet` above, it waits for all requests to finish, whether they succeed or fail.

To tell whether each request succeeded, check whether its `PException` in `results` is set. You can then use only the successful results.

```java
/**
 * Batch set lots of values.
 * Will wait for all requests done even if some error occurs.
 * @param tableName table name
 * @param items list of items.
 * @param results output results; should be created by caller; after call done, the size of results will
 *     be same with items; the results[i] is a PException:
 *     - if results[i] != null : means set items[i] failed, results[i] is the exception.
 *     - if results[i] == null : means set items[i] succeed.
 * @return succeed count.
 * @throws PException
 *
 * Notice: the method is not atomic, that means, maybe some keys succeed but some keys failed.
 */
public int batchSet2(String tableName, List<SetItem> items, List<PException> results) throws PException;
```
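`results[i]` is `null` on success and holds the exception on failure. A common follow-up is therefore to collect the failed positions and retry only those items. The generic helper below is a hypothetical sketch. It takes `Exception` rather than `PException` so that it compiles without the client. The same pattern applies to `batchDel2`, `batchMultiSet2` and `batchMultiDel2`. The succeed count returned by `batchSet2` should equal `results.size()` minus the number of failed indices.

```java
import java.util.ArrayList;
import java.util.List;

class BatchResults {
    // Returns the positions whose result is non-null, i.e. the items whose request failed.
    static List<Integer> failedIndices(List<? extends Exception> results) {
        List<Integer> failed = new ArrayList<>();
        for (int i = 0; i < results.size(); i++) {
            if (results.get(i) != null) {
                failed.add(i);
            }
        }
        return failed;
    }

    // Picks the original items at the failed positions, e.g. to pass them to another batchSet2 call.
    static <T> List<T> itemsToRetry(List<T> items, List<? extends Exception> results) {
        List<T> retry = new ArrayList<>();
        for (int i : failedIndices(results)) {
            retry.add(items.get(i));
        }
        return retry;
    }
}
```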
Notes:

`multiSet`: writes multiple rows under the same HashKey.

```java
/**
 * Set multiple value under the same hash key.
 * @param tableName table name
 * @param hashKey used to decide which partition to put this k-v,
 *     should not be null or empty.
 * @param values all <sortkey,value> pairs to be set,
 *     should not be null or empty.
 * @param ttl_seconds time to live in seconds,
 *     0 means no ttl. default value is 0.
 * @throws PException
 */
public void multiSet(String tableName, byte[] hashKey, List<Pair<byte[], byte[]>> values,
        int ttl_seconds) throws PException;
public void multiSet(String tableName, byte[] hashKey, List<Pair<byte[], byte[]>> values)
        throws PException;
```
Notes:

`batchMultiSet`: a batch wrapper around `multiSet`. It sends asynchronous requests to the server concurrently and waits for the results. If any request fails, it terminates early and throws an exception.

```java
/**
 * Batch set multiple value under the same hash key.
 * Will terminate immediately if any error occurs.
 * @param tableName TableHandler name
 * @param items list of items.
 * @param ttl_seconds time to live in seconds,
 *     0 means no ttl. default value is 0.
 * @throws PException throws exception if any error occurs.
 *
 * Notice: the method is not atomic, that means, maybe some keys succeed but some keys failed.
 */
public void batchMultiSet(String tableName, List<HashKeyData> items, int ttl_seconds) throws PException;
public void batchMultiSet(String tableName, List<HashKeyData> items) throws PException;
```
Notes:

`batchMultiSet2`: a batch wrapper around `multiSet` that sends asynchronous requests to the server concurrently and waits for the results. Unlike `batchMultiSet` above, it waits for all requests to finish, whether they succeed or fail.

```java
/**
 * Batch set multiple value under the same hash key.
 * Will wait for all requests done even if some error occurs.
 * @param tableName table name
 * @param items list of items.
 * @param ttl_seconds time to live in seconds,
 *     0 means no ttl. default value is 0.
 * @param results output results; should be created by caller; after call done, the size of results will
 *     be same with items; the results[i] is a PException:
 *     - if results[i] != null : means set items[i] failed, results[i] is the exception.
 *     - if results[i] == null : means set items[i] succeed.
 * @return succeed count.
 * @throws PException
 *
 * Notice: the method is not atomic, that means, maybe some keys succeed but some keys failed.
 */
public int batchMultiSet2(String tableName, List<HashKeyData> items, int ttl_seconds,
        List<PException> results) throws PException;
public int batchMultiSet2(String tableName, List<HashKeyData> items, List<PException> results)
        throws PException;
```
Notes:

`del`: deletes a single row.

```java
/**
 * Delete value.
 * @param tableName TableHandler name
 * @param hashKey used to decide which partition to put this k-v,
 *     if null or length == 0, means hash key is "".
 * @param sortKey all the k-v under hashKey will be sorted by sortKey,
 *     if null or length == 0, means sort key is "".
 * @throws PException
 */
public void del(String tableName, byte[] hashKey, byte[] sortKey) throws PException;
```
Notes:

`batchDel`: deletes a batch of rows as a batch wrapper around `del`. It sends asynchronous requests to the server concurrently and waits for the results. If any request fails, it terminates early and throws an exception.

```java
/**
 * Batch delete values of different keys.
 * Will terminate immediately if any error occurs.
 * @param tableName table name
 * @param keys hashKey and sortKey pair list.
 * @throws PException throws exception if any error occurs.
 *
 * Notice: the method is not atomic, that means, maybe some keys succeed but some keys failed.
 */
public void batchDel(String tableName, List<Pair<byte[], byte[]>> keys) throws PException;
```
Notes:

`batchDel2`: a batch wrapper around `del` that sends asynchronous requests to the server concurrently and waits for the results. Unlike `batchDel` above, it waits for all requests to finish, whether they succeed or fail.

To tell whether each request succeeded, check whether its `PException` in `results` is set. You can then use only the successful results.

```java
/**
 * Batch delete values of different keys.
 * Will wait for all requests done even if some error occurs.
 * @param tableName table name
 * @param keys hashKey and sortKey pair list.
 * @param results output results; should be created by caller; after call done, the size of results will
 *     be same with keys; the results[i] is a PException:
 *     - if results[i] != null : means del keys[i] failed, results[i] is the exception.
 *     - if results[i] == null : means del keys[i] succeed.
 * @return succeed count.
 * @throws PException
 *
 * Notice: the method is not atomic, that means, maybe some keys succeed but some keys failed.
 */
public int batchDel2(String tableName, List<Pair<byte[], byte[]>> keys, List<PException> results)
        throws PException;
```
Notes:

`multiDel`: deletes multiple rows under the same HashKey.

```java
/**
 * Delete specified sort keys under the same hash key.
 * @param tableName table name
 * @param hashKey used to decide which partition to put this k-v,
 *     should not be null or empty.
 * @param sortKeys specify sort keys to be deleted.
 *     should not be empty.
 * @throws PException
 */
public void multiDel(String tableName, byte[] hashKey, List<byte[]> sortKeys) throws PException;
```
Notes:

`batchMultiDel`: a batch wrapper around `multiDel`. It sends asynchronous requests to the server concurrently and waits for the results. If any request fails, it terminates early and throws an exception.

```java
/**
 * Batch delete specified sort keys under the same hash key.
 * Will terminate immediately if any error occurs.
 * @param tableName table name
 * @param keys List{hashKey,List{sortKey}}
 * @throws PException throws exception if any error occurs.
 *
 * Notice: the method is not atomic, that means, maybe some keys succeed but some keys failed.
 */
public void batchMultiDel(String tableName, List<Pair<byte[], List<byte[]>>> keys) throws PException;
```
Notes:

`batchMultiDel2`: a batch wrapper around `multiDel` that sends asynchronous requests to the server concurrently and waits for the results. Unlike `batchMultiDel` above, it waits for all requests to finish, whether they succeed or fail.

To tell whether each request succeeded, check whether its `PException` in `results` is set. You can then use only the successful results.

```java
/**
 * Batch delete specified sort keys under the same hash key.
 * Will wait for all requests done even if some error occurs.
 * @param tableName table name
 * @param keys List{hashKey,List{sortKey}}
 * @param results output results; should be created by caller; after call done, the size of results will
 *     be same with keys; the results[i] is a PException:
 *     - if results[i] != null : means del keys[i] failed, results[i] is the exception.
 *     - if results[i] == null : means del keys[i] succeed.
 * @return succeed count.
 * @throws PException
 *
 * Notice: the method is not atomic, that means, maybe some keys succeed but some keys failed.
 */
public int batchMultiDel2(String tableName, List<Pair<byte[], List<byte[]>>> keys,
        List<PException> results) throws PException;
```
Notes:

`delRange`: deletes the data under the same HashKey whose SortKey lies between `startSortKey` and `stopSortKey`. If an error occurs during deletion, the data already deleted stays deleted, and the first SortKey in the range that was not deleted is marked in `DelRangeOptions.nextSortKey`.

```java
/**
 * Delete key-values within range of startSortKey and stopSortKey under hashKey. Will terminate
 * immediately if any error occurs.
 *
 * @param tableName table name
 * @param hashKey used to decide which partition the key may exist, should not be null or empty.
 * @param startSortKey the start sort key. null or "" means fetch to the first sort key.
 * @param stopSortKey the stop sort key. null or "" means fetch to the last sort key.
 * @param options del range options.
 * @throws PException throws exception if any error occurs.
 */
public void delRange(String tableName, byte[] hashKey, byte[] startSortKey, byte[] stopSortKey,
        DelRangeOptions options) throws PException;

public class DelRangeOptions {
    public byte[] nextSortKey = null;
    public boolean startInclusive = true; // whether the startSortKey is included
    public boolean stopInclusive = false; // whether the stopSortKey is included
    public FilterType sortKeyFilterType = FilterType.FT_NO_FILTER; // filter type for sort key
    public byte[] sortKeyFilterPattern = null; // filter pattern for sort key
}
```
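The partial-failure behavior of `delRange` can be pictured with a small in-memory model. Sort keys are deleted in ascending order, a simulated failure stops the loop, and the first key not deleted is remembered so that a second call can resume from it. Everything here is hypothetical: `DelRangeModel`, the `failAt` injection, and key-by-key deletion. The model covers only the half-open range given by the defaults `startInclusive = true` and `stopInclusive = false`, and it expects `start <= stop`. It also assumes that the marked key is the value `DelRangeOptions.nextSortKey` holds.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

class DelRangeModel {
    String nextSortKey; // first sort key in the range that was not deleted; null if all were deleted

    // Deletes the sort keys in [start, stop) one by one; failAt simulates an error on that key.
    void delRange(TreeMap<String, String> row, String start, String stop, String failAt) {
        nextSortKey = null;
        // Copy the keys first so that removing entries does not disturb the iteration.
        List<String> targets = new ArrayList<>(row.subMap(start, true, stop, false).keySet());
        for (String sortKey : targets) {
            if (sortKey.equals(failAt)) {
                nextSortKey = sortKey; // keys removed before this point stay removed
                throw new IllegalStateException("delete failed at " + sortKey);
            }
            row.remove(sortKey);
        }
    }
}
```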
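Notes:

`incr`: an atomic increment (or decrement) on a single row. For details, see Single-Row Atomic Operations.

The operation first converts the byte string of the value the key points to into an int64 (similar to Java's `Long.parseLong()`). It then adds `increment` and writes the result back as a byte string, which becomes the new value.

A positive `increment` performs an atomic add, and a negative `increment` performs an atomic subtract.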
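The read, parse, add and write-back steps can be sketched in a single process as follows, with `synchronized` standing in for the server-side atomicity. The sketch makes three assumptions that this section does not state: a missing or empty value counts as 0, values are stored as decimal ASCII text, and int64 overflow is rejected (here via `Math.addExact`). `IncrModel` is hypothetical and does not reflect how the server implements `incr`.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

class IncrModel {
    private final Map<String, byte[]> store = new HashMap<>();

    // Adds increment to the decimal number stored under key and returns the new value.
    synchronized long incr(String key, long increment) {
        byte[] old = store.get(key);
        // Assumption of this sketch: a missing or empty value counts as 0.
        long base = (old == null || old.length == 0)
                ? 0L
                : Long.parseLong(new String(old, StandardCharsets.UTF_8)); // NumberFormatException if not a number
        long updated = Math.addExact(base, increment); // ArithmeticException on int64 overflow
        store.put(key, Long.toString(updated).getBytes(StandardCharsets.UTF_8));
        return updated;
    }

    synchronized void set(String key, String value) {
        store.put(key, value.getBytes(StandardCharsets.UTF_8));
    }
}
```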
```java
/**
 * Atomically increment value.
 *
 * @param tableName the table name.
 * @param hashKey the hash key to increment.
 * @param sortKey the sort key to increment.
 * @param increment the increment to be added to the old value.
 * @return the new value.
 * @throws PException throws exception if any error occurs.
 */
public long incr(String tableName, byte[] hashKey, byte[] sortKey, long increment) throws PException;
```
Notes:

Starting from Pegasus Server v1.11.1, `incr` can also modify the TTL. To use this feature you need Pegasus Java Client 1.11.2-thrift-0.11.0-inlined-release or later.

```java
/**
 * Atomically increment value.
 *
 * @param tableName the table name.
 * @param hashKey the hash key to increment.
 * @param sortKey the sort key to increment.
 * @param increment the increment to be added to the old value.
 * @param ttlSeconds time to live in seconds for the new value.
 *     should be no less than -1. for the second method, the ttlSeconds is 0.
 *     - if ttlSeconds == 0, the semantic is the same as redis:
 *       - normally, increment will preserve the original ttl.
 *       - if old data is expired by ttl, then set initial value to 0 and set no ttl.
 *     - if ttlSeconds > 0, then update with the new ttl if increment succeed.
 *     - if ttlSeconds == -1, then update to no ttl if increment succeed.
 * @return the new value.
 * @throws PException throws exception if any error occurs.
 */
public long incr(String tableName, byte[] hashKey, byte[] sortKey, long increment, int ttlSeconds)
        throws PException;
public long incr(String tableName, byte[] hashKey, byte[] sortKey, long increment) throws PException;
```
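The `ttlSeconds` rules in the Javadoc above can be condensed into a pure function that returns the TTL a value carries after a successful increment. This is a hypothetical restatement, not client code. It borrows `-1` for "no TTL" from the `ttl()` return convention, and it assumes the caller knows whether the old data had already expired.

```java
class IncrTtlRule {
    static final int NO_TTL = -1; // same convention as the ttl() return value

    // TTL of the value after a successful incr.
    // oldTtl: remaining TTL of the existing value (NO_TTL if none); oldExpired: old data expired by TTL.
    static int ttlAfterIncr(int oldTtl, boolean oldExpired, int ttlSeconds) {
        if (ttlSeconds < -1) {
            throw new IllegalArgumentException("ttlSeconds should be no less than -1");
        }
        if (ttlSeconds > 0) {
            return ttlSeconds; // replace with the new TTL
        }
        if (ttlSeconds == -1) {
            return NO_TTL; // clear the TTL
        }
        // ttlSeconds == 0, Redis-like: keep the original TTL, unless the old data had expired,
        // in which case the value restarts from 0 with no TTL.
        return oldExpired ? NO_TTL : oldTtl;
    }
}
```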
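Notes:

`checkAndSet`: an atomic CAS operation on the data under a single HashKey (you can think of it as a single-row atomic operation). For details, see Single-Row Atomic Operations.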
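The operation first checks a condition on the value of one SortKey (called the CheckSortKey):

- if the condition is satisfied, it sets the value of the target SortKey (called the SetSortKey) to the new value;
- otherwise, nothing is set.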
The CheckSortKey and the SetSortKey can be the same or different.

You can also set `CheckAndSetOptions.returnCheckValue` to get the value of the CheckSortKey. If the CheckSortKey and the SetSortKey are the same and the set succeeds, the returned value is the old value from before the set.

```java
public enum CheckType {
    CT_NO_CHECK(0),

    // appearance
    CT_VALUE_NOT_EXIST(1),          // value is not exist
    CT_VALUE_NOT_EXIST_OR_EMPTY(2), // value is not exist or value is empty
    CT_VALUE_EXIST(3),              // value is exist
    CT_VALUE_NOT_EMPTY(4),          // value is exist and not empty

    // match
    CT_VALUE_MATCH_ANYWHERE(5), // operand matches anywhere in value
    CT_VALUE_MATCH_PREFIX(6),   // operand matches prefix in value
    CT_VALUE_MATCH_POSTFIX(7),  // operand matches postfix in value

    // bytes compare
    CT_VALUE_BYTES_LESS(8),              // bytes compare: value < operand
    CT_VALUE_BYTES_LESS_OR_EQUAL(9),     // bytes compare: value <= operand
    CT_VALUE_BYTES_EQUAL(10),            // bytes compare: value == operand
    CT_VALUE_BYTES_GREATER_OR_EQUAL(11), // bytes compare: value >= operand
    CT_VALUE_BYTES_GREATER(12),          // bytes compare: value > operand

    // int compare: first transfer bytes to int64; then compare by int value
    CT_VALUE_INT_LESS(13),             // int compare: value < operand
    CT_VALUE_INT_LESS_OR_EQUAL(14),    // int compare: value <= operand
    CT_VALUE_INT_EQUAL(15),            // int compare: value == operand
    CT_VALUE_INT_GREATER_OR_EQUAL(16), // int compare: value >= operand
    CT_VALUE_INT_GREATER(17);          // int compare: value > operand
}

public class CheckAndSetOptions {
    public int setValueTTLSeconds = 0;       // time to live in seconds of the set value, 0 means no ttl.
    public boolean returnCheckValue = false; // if return the check value in results.
}

public class CheckAndSetResult {
    /**
     * return value for checkAndSet
     *
     * @param setSucceed true if set value succeed.
     * @param checkValueReturned true if the check value is returned.
     * @param checkValueExist true if the check value is exist; can be used only when checkValueReturned is true.
     * @param checkValue return the check value if exist; can be used only when checkValueExist is true.
     */
    boolean setSucceed;
    boolean checkValueReturned;
    boolean checkValueExist;
    byte[] checkValue;
}

/**
 * Atomically check and set value by key.
 * If the check condition is satisfied, then apply to set value.
 *
 * @param tableName the table name.
 * @param hashKey the hash key to check and set.
 * @param checkSortKey the sort key to check.
 * @param checkType the check type.
 * @param checkOperand the check operand.
 * @param setSortKey the sort key to set value if check condition is satisfied.
 * @param setValue the value to set if check condition is satisfied.
 * @param options the check-and-set options.
 * @return CheckAndSetResult
 * @throws PException throws exception if any error occurs.
 */
public PegasusTableInterface.CheckAndSetResult checkAndSet(String tableName, byte[] hashKey,
        byte[] checkSortKey, CheckType checkType, byte[] checkOperand,
        byte[] setSortKey, byte[] setValue, CheckAndSetOptions options) throws PException;
```
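To make the CheckType families concrete, the sketch below evaluates a subset of them against a value that may be absent (`null`). It is a hypothetical local evaluator with its own enum, not the client's `CheckType`. It rests on three assumptions. First, bytes comparisons are unsigned and lexicographic, like C's `memcmp`. Second, int comparisons parse decimal text the way `Long.parseLong` does and fail on invalid input. Third, match and compare checks against a missing value evaluate to false.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

class CheckTypeModel {
    // A subset of the CheckType values, redeclared locally for this sketch.
    enum Check {
        CT_VALUE_NOT_EXIST, CT_VALUE_EXIST, CT_VALUE_NOT_EMPTY,
        CT_VALUE_MATCH_PREFIX, CT_VALUE_BYTES_EQUAL, CT_VALUE_BYTES_LESS, CT_VALUE_INT_GREATER
    }

    // value == null means the CheckSortKey does not exist.
    static boolean check(Check type, byte[] value, byte[] operand) {
        switch (type) {
            case CT_VALUE_NOT_EXIST:
                return value == null;
            case CT_VALUE_EXIST:
                return value != null;
            case CT_VALUE_NOT_EMPTY:
                return value != null && value.length > 0;
            case CT_VALUE_MATCH_PREFIX:
                return value != null && value.length >= operand.length
                        && Arrays.equals(Arrays.copyOf(value, operand.length), operand);
            case CT_VALUE_BYTES_EQUAL:
                return value != null && Arrays.equals(value, operand);
            case CT_VALUE_BYTES_LESS:
                return value != null && compareUnsigned(value, operand) < 0;
            case CT_VALUE_INT_GREATER:
                // NumberFormatException if either side is not a valid int64
                return value != null && toLong(value) > toLong(operand);
            default:
                throw new IllegalArgumentException("not modeled: " + type);
        }
    }

    // Lexicographic comparison treating bytes as unsigned, like C's memcmp followed by a length check.
    static int compareUnsigned(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int diff = (a[i] & 0xff) - (b[i] & 0xff);
            if (diff != 0) {
                return diff;
            }
        }
        return a.length - b.length;
    }

    static long toLong(byte[] bytes) {
        return Long.parseLong(new String(bytes, StandardCharsets.UTF_8));
    }
}
```

The two comparison families disagree on `"10"` versus `"9"`. Bytewise, `"10"` is less than `"9"`, because `'1'` sorts before `'9'`. As int64 values, 10 is greater than 9.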
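Notes:

- `checkValueExist` is meaningful only when `checkValueReturned=true`.
- `checkValue` is meaningful only when `checkValueExist=true`.
- An exception is thrown if the CheckType is an `int compare` type and converting the CheckOperand or the CheckValue to int64 fails, for example because it is not a valid number or is outside the int64 range.

`checkAndMutate` is an extended version of `checkAndSet`. `checkAndSet` can set only one value, while `checkAndMutate` can set or delete multiple values in a single atomic operation. This interface is available starting from Pegasus Java Client 1.11.0-thrift-0.11.0-inlined-release.

To support this, the client provides a wrapper class, `Mutations`, in which you specify in advance the set or del operations to perform.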
```java
class CheckAndMutateResult {
    /**
     * return value for checkAndMutate
     *
     * @param mutateSucceed true if mutate succeed.
     * @param checkValueReturned true if the check value is returned.
     * @param checkValueExist true if the check value is exist; can be used only when
     *     checkValueReturned is true.
     * @param checkValue return the check value if exist; can be used only when checkValueExist is
     *     true.
     */
    public boolean mutateSucceed;
    public boolean checkValueReturned;
    public boolean checkValueExist;
    public byte[] checkValue;
}

/**
 * atomically check and mutate by key, async version. if the check condition is satisfied, then
 * apply to mutate.
 *
 * @param hashKey the hash key to check and mutate.
 * @param checkSortKey the sort key to check.
 * @param checkType the check type.
 * @param checkOperand the check operand.
 * @param mutations the list of mutations to perform if check condition is satisfied.
 * @param options the check-and-mutate options.
 * @param timeout how long will the operation timeout in milliseconds. if timeout > 0, it is a
 *     timeout value for current op, else the timeout value in the configuration file will be
 *     used.
 * @return the future for current op
 *     <p>Future return: On success: return CheckAndMutateResult. On failure: a throwable, which
 *     is an instance of PException
 *     <p>Thread safety: All the listeners for the same table are guaranteed to be dispatched in
 *     the same thread, so all the listeners for the same future are guaranteed to be executed as
 *     the same order as the listeners added. But listeners for different tables are not
 *     guaranteed to be dispatched in the same thread.
 */
Future<CheckAndMutateResult> asyncCheckAndMutate(
        byte[] hashKey,
        byte[] checkSortKey,
        CheckType checkType,
        byte[] checkOperand,
        Mutations mutations,
        CheckAndMutateOptions options,
        int timeout /*ms*/);
```
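Notes:

- `checkValueExist` is meaningful only when `checkValueReturned=true`.
- `checkValue` is meaningful only when `checkValueExist=true`.
- An exception is thrown if the CheckType is an `int compare` type and converting the CheckOperand or the CheckValue to int64 fails, for example because it is not a valid number or is outside the int64 range.

`compareExchange` is a specialized version of `checkAndSet`:

- the CheckSortKey and the SetSortKey are the same;
- the CheckType is `CT_VALUE_BYTES_EQUAL`.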
In other words, if the value for the SortKey exists and equals the expected value, it is set to the new value. For details, see Single-Row Atomic Operations.

The semantics of this method closely match `atomic_compare_exchange` as commonly found in C++ libraries.

```java
public static class CompareExchangeResult {
    /**
     * return value for CompareExchange
     *
     * @param setSucceed true if set value succeed.
     * @param actualValue return the actual value if set value failed; null means the actual value is not exist.
     */
    boolean setSucceed;
    byte[] actualValue;
}

/**
 * Atomically compare and exchange value by key.
 * <p>
 * - if the original value for the key is equal to the expected value, then update it with the desired value,
 *   set CompareExchangeResult.setSucceed to true, and set CompareExchangeResult.actualValue to null because
 *   the actual value must be equal to the desired value.
 * - if the original value for the key is not exist or not equal to the expected value, then set
 *   CompareExchangeResult.setSucceed to false, and set the actual value in CompareExchangeResult.actualValue.
 * <p>
 * This method is very like the C++ function in {https://en.cppreference.com/w/cpp/atomic/atomic_compare_exchange}.
 *
 * @param tableName the table name.
 * @param hashKey the hash key to compare and exchange.
 * @param sortKey the sort key to compare and exchange.
 * @param expectedValue the value expected to be found for the key.
 * @param desiredValue the desired value to set if the original value for the key is equal to the expected value.
 * @param ttlSeconds time to live in seconds of the desired value, 0 means no ttl.
 * @return CompareExchangeResult
 * @throws PException throws exception if any error occurs.
 */
public PegasusTableInterface.CompareExchangeResult compareExchange(String tableName, byte[] hashKey,
        byte[] sortKey, byte[] expectedValue, byte[] desiredValue, int ttlSeconds) throws PException;
```
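The `compareExchange` contract can be restated as an in-memory sketch: a `synchronized` method over a `HashMap`, with `String` values in place of `byte[]` and the TTL left out. `CompareExchangeModel` is hypothetical. It mirrors the two branches of the Javadoc. On success `actualValue` is `null`. On failure it carries the stored value, or `null` if the key does not exist.

```java
import java.util.HashMap;
import java.util.Map;

class CompareExchangeModel {
    static final class Result {
        final boolean setSucceed;
        final String actualValue; // null on success, or when the key does not exist

        Result(boolean setSucceed, String actualValue) {
            this.setSucceed = setSucceed;
            this.actualValue = actualValue;
        }
    }

    private final Map<String, String> store = new HashMap<>();

    // Sets key to desired only if the current value exists and equals expected.
    synchronized Result compareExchange(String key, String expected, String desired) {
        String actual = store.get(key);
        if (actual != null && actual.equals(expected)) {
            store.put(key, desired);
            return new Result(true, null);
        }
        return new Result(false, actual); // report what is actually stored (null if absent)
    }

    synchronized String get(String key) {
        return store.get(key);
    }

    synchronized void set(String key, String value) {
        store.put(key, value);
    }
}
```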
Notes:

`ttl`: gets the TTL of a single row. TTL stands for Time To Live and indicates how much longer the data will live. Once that time has passed, the data can no longer be read.

```java
/**
 * Get ttl time.
 * @param tableName TableHandler name
 * @param hashKey used to decide which partition to put this k-v,
 *     if null or length == 0, means hash key is "".
 * @param sortKey all the k-v under hashKey will be sorted by sortKey,
 *     if null or length == 0, means sort key is "".
 * @return ttl time in seconds; -1 if no ttl set; -2 if not exist.
 * @throws PException
 */
public int ttl(String tableName, byte[] hashKey, byte[] sortKey) throws PException;
```
Notes:

`exist`: checks whether the data exists.

```java
/**
 * Check value exist by key from the cluster
 * @param tableName TableHandler name
 * @param hashKey used to decide which partition the key may exist.
 * @param sortKey all keys under the same hashKey will be sorted by sortKey
 *
 * @return true if exist, false if not exist
 * @throws PException
 */
public boolean exist(String tableName, byte[] hashKey, byte[] sortKey) throws PException;
```
Notes:

`sortKeyCount`: gets the number of SortKeys under a HashKey.

```java
/**
 * @param tableName TableHandler name
 * @param hashKey used to decide which partition the key may exist.
 * @return the count result for the hashKey
 * @throws PException
 */
public long sortKeyCount(String tableName, byte[] hashKey) throws PException;
```
Notes:

`multiGetSortKeys`: gets the list of SortKeys under a HashKey.

```java
/**
 * Get multiple sort keys under the same hash key.
 * @param tableName table name
 * @param hashKey used to decide which partition to put this k-v,
 *     should not be null or empty.
 * @param maxFetchCount max count of k-v pairs to be fetched.
 *     max_fetch_count <= 0 means no limit. default value is 100.
 * @param maxFetchSize max size of k-v pairs to be fetched.
 *     max_fetch_size <= 0 means no limit. default value is 1000000.
 * @param sortKeys output sort keys.
 * @return true if all data is fetched; false if only partial data is fetched.
 * @throws PException
 */
public boolean multiGetSortKeys(String tableName, byte[] hashKey, int maxFetchCount,
        int maxFetchSize, List<byte[]> sortKeys) throws PException;
public boolean multiGetSortKeys(String tableName, byte[] hashKey, List<byte[]> sortKeys)
        throws PException;
```
注:
获取遍历某个HashKey下所有数据的迭代器,用于局部扫描。
public enum FilterType { FT_NO_FILTER(0), FT_MATCH_ANYWHERE(1), // match filter string at any position FT_MATCH_PREFIX(2), // match filter string at prefix FT_MATCH_POSTFIX(3); // match filter string at postfix } public class ScanOptions { public int timeoutMillis = 5000; // operation timeout in milli-seconds. // if timeoutMillis > 0, it is a timeout value for current op, // else the timeout value in the configuration file will be used. public int batchSize = 1000; // internal buffer batch size public boolean startInclusive = true; // if the startSortKey is included public boolean stopInclusive = false; // if the stopSortKey is included public FilterType hashKeyFilterType = FilterType.FT_NO_FILTER; // filter type for hash key public byte[] hashKeyFilterPattern = null; // filter pattern for hash key public FilterType sortKeyFilterType = FilterType.FT_NO_FILTER; // filter type for sort key public byte[] sortKeyFilterPattern = null; // filter pattern for sort key public boolean noValue = false; // only fetch hash_key and sort_key, but not fetch value } /** * Get Scanner for {startSortKey, stopSortKey} within hashKey * @param tableName TableHandler name * @param hashKey used to decide which partition to put this k-v, * @param startSortKey start sort key scan from * if null or length == 0, means start from begin * @param stopSortKey stop sort key scan to * if null or length == 0, means stop to end * @param options scan options like endpoint inclusive/exclusive * @return scanner * @throws PException */ public PegasusScannerInterface getScanner(String tableName, byte[] hashKey, byte[] startSortKey, byte[] stopSortKey, ScanOptions options) throws PException;
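The filter types decide which hash keys or sort keys a scan returns. The matching itself runs on the server. The JDK-only sketch below models the semantics described in the enum comments (match anywhere, match at the prefix, match at the postfix) so they can be tried in isolation. The class name is hypothetical. Treating a null or empty pattern as "match everything" is an assumption.

```java
/** Client-side model of the FilterType semantics; the real filtering runs on the server. */
final class KeyFilterModel {
  /** Mirrors the constants of FilterType above (numeric codes omitted). */
  enum FilterType { FT_NO_FILTER, FT_MATCH_ANYWHERE, FT_MATCH_PREFIX, FT_MATCH_POSTFIX }

  private KeyFilterModel() {}

  /** Returns true if key would pass a filter of the given type and pattern. */
  static boolean matches(FilterType type, byte[] pattern, byte[] key) {
    // assumption: no filter, or a null/empty pattern, lets every key through
    if (type == FilterType.FT_NO_FILTER || pattern == null || pattern.length == 0) {
      return true;
    }
    if (pattern.length > key.length) {
      return false;
    }
    switch (type) {
      case FT_MATCH_PREFIX:
        return regionEquals(key, 0, pattern);
      case FT_MATCH_POSTFIX:
        return regionEquals(key, key.length - pattern.length, pattern);
      case FT_MATCH_ANYWHERE:
        for (int offset = 0; offset + pattern.length <= key.length; offset++) {
          if (regionEquals(key, offset, pattern)) {
            return true;
          }
        }
        return false;
      default:
        throw new IllegalArgumentException("unknown filter type: " + type);
    }
  }

  /** Compares key[offset, offset + pattern.length) with pattern byte by byte. */
  private static boolean regionEquals(byte[] key, int offset, byte[] pattern) {
    for (int i = 0; i < pattern.length; i++) {
      if (key[offset + i] != pattern[i]) {
        return false;
      }
    }
    return true;
  }
}
```

With the real client, you request the same behavior through the ScanOptions fields listed above. For example, set options.sortKeyFilterType = FilterType.FT_MATCH_PREFIX and options.sortKeyFilterPattern = "user".getBytes().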
Note:
Gets iterators over all data in the whole table, for a full scan.
/** * Get Scanners for all data in database * @param tableName TableHandler name * @param maxSplitCount how many scanner expected * @param options scan options like batchSize * @return scanners, count of which would be no more than maxSplitCount * @throws PException */ public List<PegasusScannerInterface> getUnorderedScanners(String tableName, int maxSplitCount, ScanOptions options) throws PException;
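getUnorderedScanners returns a list of scanners. One natural way to use them is to drain each scanner on its own thread. The JDK-only sketch below shows that pattern with a hypothetical Scanner interface. This interface mirrors only the next() contract of PegasusScannerInterface, which returns null when the scan is complete; it is a stand-in, not the client's API. The sketch also assumes that your per-record processing is thread-safe.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Drains several scanners in parallel, one task per scanner. */
final class ParallelScan {
  /** Hypothetical stand-in for PegasusScannerInterface: next() returns null when done. */
  interface Scanner<T> {
    T next() throws Exception;
  }

  private ParallelScan() {}

  /** Returns the total number of records read across all scanners. */
  static <T> long countAll(List<? extends Scanner<T>> scanners) throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, scanners.size()));
    try {
      List<Future<Long>> results = new ArrayList<>();
      for (Scanner<T> scanner : scanners) {
        results.add(pool.submit(() -> {
          long n = 0;
          while (scanner.next() != null) {
            n++; // replace with real per-record processing
          }
          return n;
        }));
      }
      long total = 0;
      for (Future<Long> f : results) {
        total += f.get(); // a scanner failure surfaces here as ExecutionException
      }
      return total;
    } finally {
      pool.shutdown();
    }
  }
}
```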
Note:
Obtain a PegasusTableInterface instance through the PegasusClientInterface::openTable() method:
/** * Open a table. Please notice that pegasus support two kinds of API: * 1. the client-interface way, which is provided in this class. * 2. the table-interface way, which is provided by {@link PegasusTableInterface}. * With the client-interface, you don't need to create PegasusTableInterface by openTable, so * you can access the pegasus cluster conveniently. However, the client-interface's api also has * some restrictions: * 1. we don't provide async methods in client-interface. * 2. the timeout in client-interface isn't as accurate as the table-interface. * 3. the client-interface may throw an exception when open table fails. It means that * you may need to handle this exception in every data access operation, which is annoying. * 4. You can't specify a per-operation timeout. * So we recommend you to use the table-interface. * * @param tableName the table should be exist on the server, which is created before by * the system administrator * @return the table handler * @throws PException */ public PegasusTableInterface openTable(String tableName) throws PException;
Note:
Usage example:
PegasusTableInterface table = client.openTable(tableName);
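PegasusTableInterface provides both synchronous and asynchronous APIs.
The synchronous APIs are essentially the same as those of PegasusClientInterface, with two differences: you do not pass the tableName parameter, and you can specify a timeout for each individual operation.
openTable also provides a warmup feature that addresses slow first RPC calls to a table. See the best practices section for details.
The asynchronous APIs follow the Future pattern, specifically io.netty.util.concurrent.Future (see https://netty.io/4.1/api/index.html). Every asynchronous method returns a Future<T>, where T is the type of the operation's result. Future has the following properties:
Note: the first call to a table's asynchronous API may add some latency before the function returns (typically about 10 ms). This happens because the first call has to fetch the table information and routing information from the meta server.
A typical asynchronous usage example: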
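// obtain a Table instance
PegasusTableInterface table = client.openTable(tableName);
// issue the async call; timeout 0 means the timeout from the configuration file is used
Future<Boolean> future = table.asyncExist(hashKey, sortKey, 0);
// register a callback; it must not block or do time-consuming work
future.addListener(
    new PegasusTableInterface.ExistListener() {
      @Override
      public void operationComplete(Future<Boolean> future) throws Exception {
        if (future.isSuccess()) {
          Boolean result = future.getNow();
        } else {
          future.cause().printStackTrace();
        }
      }
    });
// wait for the operation to complete (throws InterruptedException if interrupted)
future.await();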
Asynchronously reads a single row.
public static interface GetListener extends GenericFutureListener<Future<byte[]>> { /** * This function will be called when listened asyncGet future is done. * @param future the listened future * @throws Exception * * Notice: User shouldn't do any operations that may block or time-consuming */ @Override public void operationComplete(Future<byte[]> future) throws Exception; } /** * Get value for a specific (hashKey, sortKey) pair, async version * @param hashKey used to decide which partition the key may exist * if null or empty, means hash key is "". * @param sortKey all keys under the same hashKey will be sorted by sortKey * if null or empty, means sort key is "". * @param timeout how long will the operation timeout in milliseconds. * if timeout > 0, it is a timeout value for current op, * else the timeout value in the configuration file will be used. * * @return the future for current op * * Future return: * On success: the got value * On failure: a throwable, which is an instance of PException * * Thread safety: * The api is thread safe. * All the listeners for the same table are guaranteed to be dispatched in the same thread, so all the * listeners for the same future are guaranteed to be executed as the same order as the listeners added. * But listeners for different tables are not guaranteed to be dispatched in the same thread. */ public Future<byte[]> asyncGet(byte[] hashKey, byte[] sortKey, int timeout/*ms*/);
Note:
Asynchronously reads multiple rows under the same HashKey.
public static class MultiGetResult { /** * return value for multiGet * @param allFetched true if all data on the server are fetched; false if only partial data are fetched. * @param values the got values. If sortKey in the input sortKeys is not found, it won't be in values. * The output values are ordered by the sortKey. */ public boolean allFetched; public List<Pair<byte[], byte[]>> values; } public static interface MultiGetListener extends GenericFutureListener<Future<MultiGetResult>> { /** * This function will be called when listened asyncMultiGet future is done. * @param future the listened future * @throws Exception * * Notice: User shouldn't do any operations that may block or time-consuming */ @Override public void operationComplete(Future<MultiGetResult> future) throws Exception; } /** * get multiple key-values under the same hashKey, async version * @param hashKey used to decide which partition the key may exist * should not be null or empty. * @param sortKeys try to get values of sortKeys under the hashKey * if null or empty, try to get all (sortKey,value) pairs under hashKey * @param maxFetchCount max count of kv pairs to be fetched * maxFetchCount <= 0 means no limit. default value is 100 * @param maxFetchSize max size of kv pairs to be fetched. * maxFetchSize <= 0 means no limit. default value is 1000000. * @param timeout how long will the operation timeout in milliseconds. * if timeout > 0, it is a timeout value for current op, * else the timeout value in the configuration file will be used. * * @return the future for current op * * Future return: * On success: An object of type MultiGetResult * On failure: a throwable, which is an instance of PException * * Thread safety: * All the listeners for the same table are guaranteed to be dispatched in the same thread, so all the * listeners for the same future are guaranteed to be executed as the same order as the listeners added. 
* But listeners for different tables are not guaranteed to be dispatched in the same thread. */ public Future<MultiGetResult> asyncMultiGet(byte[] hashKey, List<byte[]> sortKeys, int maxFetchCount, int maxFetchSize, int timeout/*ms*/); public Future<MultiGetResult> asyncMultiGet(byte[] hashKey, List<byte[]> sortKeys, int timeout/*ms*/);
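Note:
asyncMultiGet has another overload that supports SortKey range queries and conditional filtering, so that only data meeting specific conditions is read. Since 1.8.0, MultiGetOptions also has a reverse parameter for scanning data in reverse order.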
/** * get multiple key-values under the same hashKey with sortKey range limited, async version * @param hashKey used to decide which partition the key may exist * should not be null or empty. * @param startSortKey the start sort key. * null means "". * @param stopSortKey the stop sort key. * null or "" means fetch to the last sort key. * @param options multi-get options. * @param maxFetchCount max count of kv pairs to be fetched * maxFetchCount <= 0 means no limit. default value is 100 * @param maxFetchSize max size of kv pairs to be fetched. * maxFetchSize <= 0 means no limit. default value is 1000000. * @param timeout how long will the operation timeout in milliseconds. * if timeout > 0, it is a timeout value for current op, * else the timeout value in the configuration file will be used. * * @return the future for current op * * Future return: * On success: An object of type MultiGetResult * On failure: a throwable, which is an instance of PException * * Thread safety: * All the listeners for the same table are guaranteed to be dispatched in the same thread, so all the * listeners for the same future are guaranteed to be executed as the same order as the listeners added. * But listeners for different tables are not guaranteed to be dispatched in the same thread. */ public Future<MultiGetResult> asyncMultiGet(byte[] hashKey, byte[] startSortKey, byte[] stopSortKey, MultiGetOptions options, int maxFetchCount, int maxFetchSize, int timeout/*ms*/); public Future<MultiGetResult> asyncMultiGet(byte[] hashKey, byte[] startSortKey, byte[] stopSortKey, MultiGetOptions options, int timeout/*ms*/);
Note:
Asynchronously writes a single row.
public static interface SetListener extends GenericFutureListener<Future<Void>> { /** * This function will be called when listened asyncSet future is done. * @param future the listened future * @throws Exception * * Notice: User shouldn't do any operations that may block or time-consuming */ @Override public void operationComplete(Future<Void> future) throws Exception; } /** * Set value for a specific (hashKey, sortKey) pair, async version * @param hashKey used to decide which partition the key may exist * if null or empty, means hash key is "". * @param sortKey all keys under the same hashKey will be sorted by sortKey * if null or empty, means sort key is "". * @param value should not be null * @param ttlSeconds time to live in seconds * 0 means no ttl, default value is 0 * @param timeout how long will the operation timeout in milliseconds. * if timeout > 0, it is a timeout value for current op, * else the timeout value in the configuration file will be used. * * @return the future for current op * * Future return: * On success: no return * On failure: a throwable, which is an instance of PException * * Thread safety: * The api is thread safe. * All the listeners for the same table are guaranteed to be dispatched in the same thread, so all the * listeners for the same future are guaranteed to be executed as the same order as the listeners added. * But listeners for different tables are not guaranteed to be dispatched in the same thread. */ public Future<Void> asyncSet(byte[] hashKey, byte[] sortKey, byte[] value, int ttlSeconds, int timeout/*ms*/); public Future<Void> asyncSet(byte[] hashKey, byte[] sortKey, byte[] value, int timeout/*ms*/);
Note:
Asynchronously writes multiple rows under the same HashKey.
public static interface MultiSetListener extends GenericFutureListener<Future<Void>> { /** * This function will be called when listened asyncMultiSet future is done. * @param future the listened future * @throws Exception * * Notice: User shouldn't do any operations that may block or time-consuming */ @Override public void operationComplete(Future<Void> future) throws Exception; } /** * Set key-values for a specific hashKey, async version * @param hashKey used to decide which partition the key may exist * if null or empty, means hash key is "". * @param values all (sortKey, value) pairs * should not be null or empty * @param ttlSeconds time to live in seconds * 0 means no ttl, default value is 0 * @param timeout how long will the operation timeout in milliseconds. * if timeout > 0, it is a timeout value for current op, * else the timeout value in the configuration file will be used. * * @return the future for current op * * Future return: * On success: no return * On failure: a throwable, which is an instance of PException * * Thread safety: * All the listeners for the same table are guaranteed to be dispatched in the same thread, so all the * listeners for the same future are guaranteed to be executed as the same order as the listeners added. * But listeners for different tables are not guaranteed to be dispatched in the same thread. */ public Future<Void> asyncMultiSet(byte[] hashKey, List<Pair<byte[], byte[]>> values, int ttlSeconds, int timeout/*ms*/); public Future<Void> asyncMultiSet(byte[] hashKey, List<Pair<byte[], byte[]>> values, int timeout/*ms*/);
Note:
Asynchronously deletes a single row.
public static interface DelListener extends GenericFutureListener<Future<Void>> { /** * This function will be called when listened asyncDel future is done. * @param future the listened future * @throws Exception * * Notice: User shouldn't do any operations that may block or time-consuming */ @Override public void operationComplete(Future<Void> future) throws Exception; } /** * delete value for a specific (hashKey, sortKey) pair, async version * @param hashKey used to decide which partition the key may exist * if null or empty, means hash key is "". * @param sortKey all keys under the same hashKey will be sorted by sortKey * if null or empty, means sort key is "". * @param timeout how long will the operation timeout in milliseconds. * if timeout > 0, it is a timeout value for current op, * else the timeout value in the configuration file will be used. * * @return the future for current op * * Future return: * On success: no return * On failure: a throwable, which is an instance of PException * * Thread safety: * All the listeners for the same table are guaranteed to be dispatched in the same thread, so all the * listeners for the same future are guaranteed to be executed as the same order as the listeners added. * But listeners for different tables are not guaranteed to be dispatched in the same thread. */ public Future<Void> asyncDel(byte[] hashKey, byte[] sortKey, int timeout/*ms*/);
Note:
Asynchronously deletes multiple rows under the same HashKey.
public static interface MultiDelListener extends GenericFutureListener<Future<Void>> { /** * This function will be called when listened asyncMultiDel future is done. * @param future the listened future * @throws Exception * * Notice: User shouldn't do any operations that may block or time-consuming */ @Override public void operationComplete(Future<Void> future) throws Exception; } /** * delete mutiple values for a specific hashKey, async version * @param hashKey used to decide which partition the key may exist * if null or empty, means hash key is "". * @param sortKeys all the sortKeys need to be deleted * should not be null or empty * @param timeout how long will the operation timeout in milliseconds. * if timeout > 0, it is a timeout value for current op, * else the timeout value in the configuration file will be used. * * @return the future for current op * * Future return: * On success: no return * On failure: a throwable, which is an instance of PException * * Thread safety: * All the listeners for the same table are guaranteed to be dispatched in the same thread, so all the * listeners for the same future are guaranteed to be executed as the same order as the listeners added. * But listeners for different tables are not guaranteed to be dispatched in the same thread. */ public Future<Void> asyncMultiDel(byte[] hashKey, List<byte[]> sortKeys, int timeout/*ms*/);
Note:
Atomic increment (or decrement). The asynchronous version of incr.
public static interface IncrListener extends GenericFutureListener<Future<Long>> { /** * This function will be called when listened asyncIncr future is done. * * @param future the listened future * @throws Exception throw exception if any error occurs. * * Notice: User shouldn't do any operations that may block or time-consuming */ @Override public void operationComplete(Future<Long> future) throws Exception; } /** * atomically increment value by key, async version * * @param hashKey the hash key to increment. * @param sortKey the sort key to increment. * @param increment the increment to be added to the old value. * @param timeout how long will the operation timeout in milliseconds. * if timeout > 0, it is a timeout value for current op, * else the timeout value in the configuration file will be used. * @return the future for current op * <p> * Future return: * On success: return new value. * On failure: a throwable, which is an instance of PException * <p> * Thread safety: * All the listeners for the same table are guaranteed to be dispatched in the same thread, so all the * listeners for the same future are guaranteed to be executed as the same order as the listeners added. * But listeners for different tables are not guaranteed to be dispatched in the same thread. */ public Future<Long> asyncIncr(byte[] hashKey, byte[] sortKey, long increment, int timeout/*ms*/);
Note:
Atomic CAS operation on data under a single HashKey. The asynchronous version of checkAndSet.
public static class CheckAndSetResult { /** * return value for checkAndSet * * @param setSucceed true if set value succeed. * @param checkValueReturned true if the check value is returned. * @param checkValueExist true if the check value is exist; can be used only when checkValueReturned is true. * @param checkValue return the check value if exist; can be used only when checkValueExist is true. */ boolean setSucceed; boolean checkValueReturned; boolean checkValueExist; byte[] checkValue; } public static interface CheckAndSetListener extends GenericFutureListener<Future<CheckAndSetResult>> { /** * This function will be called when listened asyncCheckAndSet future is done. * * @param future the listened future * @throws Exception throw exception if any error occurs. * * Notice: User shouldn't do any operations that may block or time-consuming */ @Override public void operationComplete(Future<CheckAndSetResult> future) throws Exception; } /** * atomically check and set value by key, async version. * if the check condition is satisfied, then apply to set value. * * @param hashKey the hash key to check and set. * @param checkSortKey the sort key to check. * @param checkType the check type. * @param checkOperand the check operand. * @param setSortKey the sort key to set value if check condition is satisfied. * @param setValue the value to set if check condition is satisfied. * @param options the check-and-set options. * @param timeout how long will the operation timeout in milliseconds. * if timeout > 0, it is a timeout value for current op, * else the timeout value in the configuration file will be used. * @return the future for current op * <p> * Future return: * On success: return CheckAndSetResult. 
* On failure: a throwable, which is an instance of PException * <p> * Thread safety: * All the listeners for the same table are guaranteed to be dispatched in the same thread, so all the * listeners for the same future are guaranteed to be executed as the same order as the listeners added. * But listeners for different tables are not guaranteed to be dispatched in the same thread. */ public Future<CheckAndSetResult> asyncCheckAndSet(byte[] hashKey, byte[] checkSortKey, CheckType checkType, byte[] checkOperand, byte[] setSortKey, byte[] setValue, CheckAndSetOptions options, int timeout/*ms*/);
Note:
Atomic CAS operation on data under a single HashKey. The asynchronous version of compareExchange.
public static class CompareExchangeResult { /** * return value for CompareExchange * * @param setSucceed true if set value succeed. * @param actualValue return the actual value if set value failed; null means the actual value is not exist. */ boolean setSucceed; byte[] actualValue; } public static interface CompareExchangeListener extends GenericFutureListener<Future<CompareExchangeResult>> { /** * This function will be called when listened asyncCompareExchange future is done. * * @param future the listened future * @throws Exception throw exception if any error occurs. * * Notice: User shouldn't do any operations that may block or time-consuming */ @Override public void operationComplete(Future<CompareExchangeResult> future) throws Exception; } /** * atomically compare and exchange value by key, async version. * <p> * - if the original value for the key is equal to the expected value, then update it with the desired value, * set CompareExchangeResult.setSucceed to true, and set CompareExchangeResult.actualValue to null because * the actual value must be equal to the desired value. * - if the original value for the key is not exist or not equal to the expected value, then set * CompareExchangeResult.setSucceed to false, and set the actual value in CompareExchangeResult.actualValue. * <p> * this method is very like the C++ function in {https://en.cppreference.com/w/cpp/atomic/atomic_compare_exchange}. * * @param hashKey the hash key to compare and exchange. * @param sortKey the sort key to compare and exchange. * @param expectedValue the value expected to be found for the key. * @param desiredValue the desired value to set if the original value for the key is equal to the expected value. * @param ttlSeconds time to live in seconds of the desired value, 0 means no ttl. * @param timeout how long will the operation timeout in milliseconds. * if timeout > 0, it is a timeout value for current op, * else the timeout value in the configuration file will be used. 
* @return the future for current op * <p> * Future return: * On success: return CompareExchangeResult. * On failure: a throwable, which is an instance of PException * <p> * Thread safety: * All the listeners for the same table are guaranteed to be dispatched in the same thread, so all the * listeners for the same future are guaranteed to be executed as the same order as the listeners added. * But listeners for different tables are not guaranteed to be dispatched in the same thread. */ public Future<CompareExchangeResult> asyncCompareExchange(byte[] hashKey, byte[] sortKey, byte[] expectedValue, byte[] desiredValue, int ttlSeconds, int timeout/*ms*/);
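The Javadoc above states the compareExchange contract precisely. The JDK-only sketch below models that contract in memory so both outcomes are easy to see. It is an illustration, not the server's implementation: a single String key stands in for the (hashKey, sortKey) pair, a synchronized method stands in for server-side atomicity, and TTL is left out. The class name is hypothetical.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

/** In-memory model of the compareExchange contract described in the Javadoc. */
final class CompareExchangeModel {
  /** Mirrors CompareExchangeResult: actualValue is null on success or when the key is absent. */
  static final class Result {
    final boolean setSucceed;
    final byte[] actualValue;

    Result(boolean setSucceed, byte[] actualValue) {
      this.setSucceed = setSucceed;
      this.actualValue = actualValue;
    }
  }

  private final Map<String, byte[]> store = new HashMap<>();

  synchronized void put(String key, byte[] value) {
    store.put(key, value);
  }

  synchronized Result compareExchange(String key, byte[] expectedValue, byte[] desiredValue) {
    byte[] current = store.get(key);
    if (current != null && Arrays.equals(current, expectedValue)) {
      store.put(key, desiredValue); // original equals expected: install the desired value
      return new Result(true, null);
    }
    // key absent or value differs: nothing is written, report what is actually stored
    return new Result(false, current);
  }
}
```

A typical retry loop reads the current value, computes a new one, and calls compareExchange with the value it read as expectedValue. When setSucceed is false, actualValue already holds the current value to retry from.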
Note:
Asynchronously gets the TTL of a single row, that is, how many more seconds the data will live.
public static interface TTLListener extends GenericFutureListener<Future<Integer>> { /** * This function will be called when listened asyncTTL future is done. * @param future the listened future * @throws Exception * * Notice: User shouldn't do any operations that may block or time-consuming */ @Override public void operationComplete(Future<Integer> future) throws Exception; } /** * get TTL value for a specific (hashKey, sortKey) pair, async version * @param hashKey used to decide which partition the key may exist * if null or empty, means hash key is "". * @param sortKey all keys under the same hashKey will be sorted by sortKey * if null or empty, means sort key is "". * @param timeout how long will the operation timeout in milliseconds. * if timeout > 0, it is a timeout value for current op, * else the timeout value in the configuration file will be used. * * @return the future for current op * * Future return: * On success: ttl time in seconds; -1 if no ttl set; -2 if not exist. * On failure: a throwable, which is an instance of PException * * Thread safety: * All the listeners for the same table are guaranteed to be dispatched in the same thread, so all the * listeners for the same future are guaranteed to be executed as the same order as the listeners added. * But listeners for different tables are not guaranteed to be dispatched in the same thread. */ public Future<Integer> asyncTTL(byte[] hashKey, byte[] sortKey, int timeout/*ms*/);
Note:
Asynchronously checks whether a record exists.
public static interface ExistListener extends GenericFutureListener<Future<Boolean>> { /** * This function will be called when listened asyncExist future is done. * @param future the listened future * @throws Exception * * Notice: User shouldn't do any operations that may block or time-consuming */ @Override public void operationComplete(Future<Boolean> future) throws Exception; } /** * Check value existence for a specific (hashKey, sortKey) pair of current table, async version * @param hashKey used to decide which partition the key may exist * if null or length == 0, means hash key is "". * @param sortKey all keys under the same hashKey will be sorted by sortKey * if null or length == 0, means sort key is "". * @param timeout how long will the operation timeout in milliseconds. * if timeout > 0, it is a timeout value for current op, * else the timeout value in the configuration file will be used. * * @return A future for current op. * * Future return: * On success: true if exist, false if not exist * On failure: a throwable, which is an instance of PException * * Thread safety: * The api is thread safe. * All the listeners for the same table are guaranteed to be dispatched in the same thread, so all the * listeners for the same future are guaranteed to be executed as the same order as the listeners added. * But listeners for different tables are not guaranteed to be dispatched in the same thread. */ public Future<Boolean> asyncExist(byte[] hashKey, byte[] sortKey, int timeout/*ms*/);
Note:
Asynchronously gets the number of SortKeys under a HashKey.
public static interface SortKeyCountListener extends GenericFutureListener<Future<Long>> { /** * This function will be called when listened asyncSortKeyCount future is done. * @param future the listened future * @throws Exception * * Notice: User shouldn't do any operations that may block or time-consuming */ @Override public void operationComplete(Future<Long> future) throws Exception; } /** * Count the sortkeys for a specific hashKey, async version * @param hashKey used to decide which partition the key may exist * should not be null or empty * @param timeout how long will the operation timeout in milliseconds. * if timeout > 0, it is a timeout value for current op, * else the timeout value in the configuration file will be used. * * @return the future for current op * * Future return: * On success: the count result for the hashKey * On failure: a throwable, which is an instance of PException * * Thread safety: * The api is thread safe. * All the listeners for the same table are guaranteed to be dispatched in the same thread, so all the * listeners for the same future are guaranteed to be executed as the same order as the listeners added. * But listeners for different tables are not guaranteed to be dispatched in the same thread. */ public Future<Long> asyncSortKeyCount(byte[] hashKey, int timeout/*ms*/);
Note:
Asynchronously gets the list of SortKeys under a HashKey.
public static class MultiGetSortKeysResult { /** * return value for multiGetSortkeys * @param allFetched true if all data on the server are fetched; false if only partial data are fetched. * @param keys the got keys. * The output keys are in order. */ public boolean allFetched; public List<byte[]> keys; }; public static interface MultiGetSortKeysListener extends GenericFutureListener<Future<MultiGetSortKeysResult>> { /** * This function will be called when listened asyncMultiGetSortKeys future is done. * @param future the listened future * @throws Exception * * Notice: User shouldn't do any operations that may block or time-consuming */ @Override public void operationComplete(Future<MultiGetSortKeysResult> future) throws Exception; } /** * get all the sortKeys for the same hashKey * @param hashKey used to decide which partition the key may exist * should not be null or empty. * @param maxFetchCount max count of kv pairs to be fetched * maxFetchCount <= 0 means no limit. default value is 100 * @param maxFetchSize max size of kv pairs to be fetched. * maxFetchSize <= 0 means no limit. default value is 1000000. * @param timeout how long will the operation timeout in milliseconds. * if timeout > 0, it is a timeout value for current op, * else the timeout value in the configuration file will be used. * * @return the future for current op * * Future return: * On success: An object of type MultiGetSortKeysResult * On failure: a throwable, which is an instance of PException * * Thread safety: * All the listeners for the same table are guaranteed to be dispatched in the same thread, so all the * listeners for the same future are guaranteed to be executed as the same order as the listeners added. * But listeners for different tables are not guaranteed to be dispatched in the same thread. 
*/ public Future<MultiGetSortKeysResult> asyncMultiGetSortKeys(byte[] hashKey, int maxFetchCount, int maxFetchSize, int timeout/*ms*/); public Future<MultiGetSortKeysResult> asyncMultiGetSortKeys(byte[] hashKey, int timeout/*ms*/);
Note:
During a scan, synchronously gets the next record.
/** * Get the next item. * @return item like <<hashKey, sortKey>, value>; null returned if scan completed. * @throws PException */ public Pair<Pair<byte[], byte[]>, byte[]> next() throws PException;
Note:
During a scan, asynchronously gets the next record.
/** * Get the next item asynchronously. * @return A future for current op. * * Future return: * On success: if scan haven't reach the end then return the kv-pair, else return null. * On failure: a throwable, which is an instance of PException. */ public Future<Pair<Pair<byte[], byte[]>, byte[]>> asyncNext();
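Note:
The table does not exist. Possible causes:
meta_servers: check whether the cluster configuration is correct.
initialize table handler: check whether the table name is correct.
Access timeout. Possible causes:
Server-side error state. Possible causes:
The server-side flow-control limit has been reached: the server returns the ERR_BUSY error code.
Applications often bulk-load data in concentrated bursts. The loading may run on a single machine or be distributed, for example processing data with Spark and then writing it into Pegasus.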
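Without flow control, bulk loading can easily produce a very high peak write QPS and put heavy pressure on the Pegasus system:
We therefore strongly recommend that applications throttle their write QPS when bulk-loading data.
The idea of client-side flow control is as follows:
FlowController usage:
Usage is simple: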
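FlowController cntl = new FlowController(qps);
while (hasMoreData()) { // hasMoreData() is a placeholder for your own loop condition
  // call getToken() before each operation to enforce the QPS limit
  cntl.getToken();
  client.set(...);
}
cntl.stop();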
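FlowController's internals are not described here. For intuition only, the JDK-only sketch below shows one generic way to cap client-side QPS. Each getToken() call reserves the next slot on a fixed 1/qps time grid and sleeps until that slot arrives. The class is hypothetical and is not Pegasus's FlowController. The injectable clock exists only so the behavior can be tested deterministically.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

/** Generic fixed-interval pacer (not Pegasus's FlowController): calls are spaced 1/qps apart. */
final class SimplePacer {
  private final long intervalNanos;
  private final LongSupplier clock; // current time in nanoseconds
  private long nextFreeNanos;

  SimplePacer(int qps, LongSupplier clock) {
    if (qps <= 0) {
      throw new IllegalArgumentException("qps must be positive");
    }
    this.intervalNanos = TimeUnit.SECONDS.toNanos(1) / qps;
    this.clock = clock;
    this.nextFreeNanos = clock.getAsLong();
  }

  SimplePacer(int qps) {
    this(qps, System::nanoTime);
  }

  /** Reserves the next slot and returns how long the caller must wait for it, in nanoseconds. */
  synchronized long reserve() {
    long now = clock.getAsLong();
    long slot = Math.max(now, nextFreeNanos);
    nextFreeNanos = slot + intervalNanos;
    return slot - now;
  }

  /** Blocks until the caller may issue its next request; sleeps outside the lock. */
  void getToken() throws InterruptedException {
    long waitNanos = reserve();
    if (waitNanos > 0) {
      TimeUnit.NANOSECONDS.sleep(waitNanos);
    }
  }
}
```

Pacing spreads writes evenly and allows no bursts. A token bucket with a burst capacity is a common alternative when short bursts are acceptable.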
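For distributed bulk loading, first determine the number of concurrent tasks. Divide the total QPS limit by the number of concurrent tasks to get each task's QPS limit, then enforce that limit in each task with FlowController.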
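The per-task limit is plain integer arithmetic, shown in the sketch below with hypothetical names. Flooring the division keeps the sum of all task limits at or below the total. The sketch also clamps the result to at least 1 so that a task never gets a zero limit. As a result, if there are more tasks than total QPS, the tasks together exceed the budget, so size the task count accordingly.

```java
/** Splits a cluster-wide write QPS budget evenly across concurrent bulk-load tasks. */
final class QpsBudget {
  private QpsBudget() {}

  static int perTaskQps(int totalQps, int taskCount) {
    if (totalQps <= 0 || taskCount <= 0) {
      throw new IllegalArgumentException("totalQps and taskCount must be positive");
    }
    // floor division keeps taskCount * perTask <= totalQps whenever totalQps >= taskCount
    return Math.max(1, totalQps / taskCount);
  }
}
```

Each task would then create new FlowController(QpsBudget.perTaskQps(totalQps, taskCount)).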
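This is similar to paginating a list on a web page. Typically, a HashKey has many SortKeys. Each page shows a fixed number of SortKeys, and the next page shows the next fixed number of SortKeys.
Pagination can be implemented in several ways in Pegasus:
Using the getScanner interface: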
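ScanOptions options = new ScanOptions();
options.startInclusive = true;
options.stopInclusive = false;
options.batchSize = 20; // internal batch size: each round trip fetches at most 20 records
byte[] startSortKey = null; // null: start from the first sortKey
byte[] stopSortKey = null;  // null: scan to the last sortKey

// Option 1: fetch synchronously
PegasusScannerInterface scanner =
    client.getScanner(tableName, hashKey, startSortKey, stopSortKey, options);
Pair<Pair<byte[], byte[]>, byte[]> item;
while ((item = scanner.next()) != null) {
  // ... process item ...
}
scanner.close();

// Option 2: fetch asynchronously (with its own scanner)
PegasusScannerInterface asyncScanner =
    client.getScanner(tableName, hashKey, startSortKey, stopSortKey, options);
while (true) {
  Future<Pair<Pair<byte[], byte[]>, byte[]>> future = asyncScanner.asyncNext();
  try {
    Pair<Pair<byte[], byte[]>, byte[]> pair = future.get();
    if (pair == null) {
      break; // scan completed
    }
    // ... process pair ...
  } catch (Exception e) {
    e.printStackTrace();
    break; // stop on error instead of calling asyncNext() again forever
  }
}
asyncScanner.close();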
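If you use multiGet, also pass maxFetchCount to limit the number of entries per page. maxFetchCount is a separate argument of multiGet, not a field of MultiGetOptions: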
// first page
MultiGetOptions options = new MultiGetOptions();
options.startInclusive = true;
options.stopInclusive = false;
int maxFetchCount = 20;   // at most 20 entries per page
int maxFetchSize = 20000; // at most 20000 bytes per page
byte[] startSortKey = null;
byte[] stopSortKey = null;
List<Pair<byte[], byte[]>> values = new ArrayList<>();
boolean allFetched = client.multiGet(
    tableName, hashKey, startSortKey, stopSortKey, options, maxFetchCount, maxFetchSize, values);
// ... process values ...
if (allFetched) {
  return; // this was the last page
}

// next page: start right after the last (largest) sortKey of the previous page
options.startInclusive = false;
options.stopInclusive = false;
startSortKey = values.get(values.size() - 1).getKey();
stopSortKey = null;
values.clear(); // discard the previous page before fetching the next one
allFetched = client.multiGet(
    tableName, hashKey, startSortKey, stopSortKey, options, maxFetchCount, maxFetchSize, values);
// ... process values ...
if (allFetched) {
  return;
}
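This paging loop relies on two facts. Results come back ordered by sortKey, and each new page starts exclusively after the last sortKey already seen. The JDK-only sketch below simulates that loop against an in-memory TreeMap. The hypothetical fetchPage plays the role of multiGet with maxFetchCount. It does not model maxFetchSize, filters, or the network, and String keys stand in for byte[] sort keys.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;

/** Simulates sortKey-based paging over one hashKey; fetchPage stands in for multiGet. */
final class PagingSimulation {
  /** One page of results plus the allFetched flag, like MultiGetResult. */
  static final class Page {
    final List<String> sortKeys = new ArrayList<>();
    boolean allFetched;
  }

  private PagingSimulation() {}

  /** Returns up to maxFetchCount keys after startSortKey (exclusive), or from the start if null. */
  static Page fetchPage(NavigableMap<String, String> rows, String startSortKey, int maxFetchCount) {
    NavigableMap<String, String> tail =
        startSortKey == null ? rows : rows.tailMap(startSortKey, false);
    Page page = new Page();
    for (Map.Entry<String, String> e : tail.entrySet()) {
      if (page.sortKeys.size() == maxFetchCount) {
        return page; // more rows remain, so allFetched stays false
      }
      page.sortKeys.add(e.getKey());
    }
    page.allFetched = true;
    return page;
  }

  /** Walks all pages, using the last sortKey of each page as the exclusive start of the next. */
  static List<List<String>> readAllPages(NavigableMap<String, String> rows, int pageSize) {
    if (pageSize <= 0) {
      throw new IllegalArgumentException("pageSize must be positive");
    }
    List<List<String>> pages = new ArrayList<>();
    String start = null;
    while (true) {
      Page page = fetchPage(rows, start, pageSize);
      if (!page.sortKeys.isEmpty()) {
        pages.add(page.sortKeys);
      }
      if (page.allFetched) {
        return pages;
      }
      start = page.sortKeys.get(page.sortKeys.size() - 1);
    }
  }
}
```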
Reverse-order pagination requires the multiGet interface with reverse = true set in the options:
MultiGetOptions options = new MultiGetOptions(); options.startInclusive = true; options.stopInclusive = false; options.reverse = true;
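Pegasus keys and values are raw byte strings (byte[] in Java), while applications usually store data as structs or classes. When storing data in Pegasus, the user data must be converted into a byte string; this is serialization. When reading data from Pegasus, the byte string must be converted back into the user's data structure; this is deserialization. Serialization and deserialization always come in pairs, so only serialization is described below.
Common serialization approaches include:
For Thrift structs, serialization with the TCompact protocol looks like this: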
import org.apache.thrift.TSerializer;
import org.apache.thrift.protocol.TCompactProtocol;

// data is an instance of a Thrift-generated struct
TSerializer serializer = new TSerializer(new TCompactProtocol.Factory());
byte[] bytes = serializer.serialize(data); // throws org.apache.thrift.TException on failure
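If you do not want a Thrift or Protobuf dependency, you can also serialize a small, fixed struct by hand. The JDK-only sketch below uses DataOutputStream and DataInputStream, and the UserProfile struct is hypothetical. Unlike Thrift, this format has no built-in schema evolution, so adding a field later breaks old readers. For that reason the sketch writes a version byte first. Note also that writeUTF limits each string to 65535 encoded bytes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

/** Hypothetical value struct serialized by hand into the byte[] stored in Pegasus. */
final class UserProfile {
  private static final byte FORMAT_VERSION = 1;

  final String name;
  final int age;
  final long lastLoginMillis;

  UserProfile(String name, int age, long lastLoginMillis) {
    this.name = name;
    this.age = age;
    this.lastLoginMillis = lastLoginMillis;
  }

  byte[] serialize() throws IOException {
    ByteArrayOutputStream buf = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(buf)) {
      out.writeByte(FORMAT_VERSION); // lets a future reader detect the layout
      out.writeUTF(name);            // length-prefixed modified UTF-8
      out.writeInt(age);
      out.writeLong(lastLoginMillis);
    }
    return buf.toByteArray();
  }

  static UserProfile deserialize(byte[] bytes) throws IOException {
    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
      byte version = in.readByte();
      if (version != FORMAT_VERSION) {
        throw new IOException("unsupported format version: " + version);
      }
      // arguments are evaluated left to right, matching the write order
      return new UserProfile(in.readUTF(), in.readInt(), in.readLong());
    }
  }
}
```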
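For workloads with large values (>= 2 KB), we recommend compressing data on the client with facebook/Zstandard (Zstd for short) to reduce the value length and improve the stability and read/write performance of the Pegasus service. Zstd strikes a good balance between compression ratio and compression speed, which makes it suitable for general-purpose use.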
Starting with Java Client 1.11.3, we provide the Zstd utility class com.xiaomi.infra.pegasus.tools.ZstdWrapper so that users can add compression easily.
Example:
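```java
import java.nio.charset.StandardCharsets;
import com.xiaomi.infra.pegasus.tools.ZstdWrapper;

byte[] value = "xxx".getBytes(StandardCharsets.UTF_8);
// write the compressed record into Pegasus (timeout: 1000 ms)
table.set("h".getBytes(), "s".getBytes(), ZstdWrapper.compress(value), 1000);
// read the record from Pegasus
byte[] compressedBuf = table.get("h".getBytes(), "s".getBytes(), 1000);
// decompress the value
byte[] originalValue = ZstdWrapper.decompress(compressedBuf);
```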
You can also refer to the test case TestZstdWrapper.java.
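The two optimizations above, data serialization and data compression, can be used together on the client. Both trade client-side CPU for the stability and read/write performance of the Pegasus cluster, which is usually worthwhile.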
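Sometimes a team does not use client-side compression when it starts using Pegasus, but after a while finds that individual values are fairly large and wants to improve performance through compression. This can be done in two steps: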
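For an existing table that was written without client-side compression, how can you quickly estimate the benefit of switching to client-side compression?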
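Prerequisites:

- The user cluster to evaluate, with meta server list ${user_cluster_meta_list}; the user table is user_table.
- A test cluster, with meta server list ${test_cluster_meta_list}.
- Modify src/shell/config.ini and add a configuration entry for accessing the test_cluster cluster.
- Modify pegasus.properties and set meta_servers = ${test_cluster_meta_list}.

Steps: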
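Step 1. Create two tables on the test cluster:

```
./run.sh shell --cluster ${test_cluster_meta_list}
>>> create user_table_no_compress -p 8 -r 3
>>> create user_table_zstd_compress -p 8 -r 3
```

Step 2. Copy user_table from the user cluster into user_table_no_compress on the test cluster:

```
./run.sh shell --cluster ${user_cluster_meta_list}
>>> use user_table
>>> copy_data -c test_cluster -a user_table_no_compress
```

Step 3. Use the Java tool to copy user_table_no_compress into user_table_zstd_compress, reading the values as uncompressed (none) and writing them compressed with Zstd (zstd):

```
./PegasusCli file://./pegasus.properties user_table_no_compress \
    copy_data file://./pegasus.properties user_table_zstd_compress none zstd
```

Step 4. Run count_data -a on both tables and compare the reported statistics:

```
./run.sh shell --cluster ${test_cluster_meta_list}
>>> use user_table_no_compress
>>> count_data -a
>>> use user_table_zstd_compress
>>> count_data -a
```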
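If the table already contains uncompressed data and you enable client-side compression, writing new compressed data while keeping the hashKey and sortKey unchanged, uncompressed and compressed data will coexist: some values are stored uncompressed, others compressed.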
This requires the application to stay compatible when reading: it must be able to read both uncompressed and compressed data.
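Since decompressing uncompressed data with Zstd almost always fails, the read logic can be:

- Try to decompress the value with Zstd.
- If decompression succeeds, use the decompressed result.
- If decompression fails, treat the value as uncompressed and use it as-is.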
Example:
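```java
import com.xiaomi.infra.pegasus.client.PException;
import com.xiaomi.infra.pegasus.tools.ZstdWrapper;

// decompress the value
byte[] decompressedValue;
try {
    decompressedValue = ZstdWrapper.decompress(value);
} catch (PException e) {
    // decompression failed: the value was stored uncompressed
    decompressedValue = value;
}
```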
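The same try-then-fall-back read path can be exercised with the JDK alone. The sketch below swaps Zstd for GZIP (java.util.zip), which likewise rejects data that does not start with its magic bytes. CompatibleRead and its methods are hypothetical names; in a real client the calls would be ZstdWrapper.compress and ZstdWrapper.decompress.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

class CompatibleRead {
    // GZIP stands in for ZstdWrapper.compress in this sketch.
    static byte[] compress(byte[] raw) {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        try (GZIPOutputStream out = new GZIPOutputStream(buf)) {
            out.write(raw);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buf.toByteArray();
    }

    // Throws an IOException (e.g. ZipException "Not in GZIP format") for non-GZIP input.
    static byte[] decompress(byte[] value) throws IOException {
        try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(value))) {
            return in.readAllBytes();
        }
    }

    // Mixed-data read path: try to decompress; on failure assume the value is uncompressed.
    static byte[] readValue(byte[] stored) {
        try {
            return decompress(stored);
        } catch (IOException e) {
            return stored;
        }
    }
}
```

One caveat applies to any codec: an uncompressed value that happens to be a valid compressed frame would be misread. GZIP data starts with the bytes 0x1f 0x8b and Zstd frames start with the magic number 0xFD2FB528, so this is unlikely for typical values, but not impossible.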
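Meanwhile, a background tool can gradually replace the uncompressed data with compressed data while keeping the data consistent during the replacement: scan the table and read the records one by one; if a record is uncompressed, compress it and write it back with the atomic check_and_set operation.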
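The compare-and-set idea behind this background conversion can be sketched locally. Below, a ConcurrentHashMap stands in for the table and its replace(key, expected, newValue) method stands in for check_and_set; GZIP again replaces Zstd. All class and method names are hypothetical. The point is that a record is overwritten only if it still holds exactly the bytes that were read, so a concurrent write from the application is never lost.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

class CompressionMigration {
    // GZIP stands in for ZstdWrapper.compress in this sketch.
    static byte[] compress(byte[] raw) {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        try (GZIPOutputStream out = new GZIPOutputStream(buf)) {
            out.write(raw);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buf.toByteArray();
    }

    // A value counts as compressed if it decompresses without error.
    static boolean isCompressed(byte[] value) {
        try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(value))) {
            in.readAllBytes();
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    // One pass over the table; returns how many records were converted.
    // ByteBuffer is used as the value type because its equals() compares content.
    static int migrate(ConcurrentHashMap<String, ByteBuffer> table) {
        int converted = 0;
        for (Map.Entry<String, ByteBuffer> e : table.entrySet()) {
            ByteBuffer current = e.getValue();
            byte[] bytes = new byte[current.remaining()];
            current.duplicate().get(bytes); // copy without moving current's position
            if (isCompressed(bytes)) {
                continue;
            }
            ByteBuffer packed = ByteBuffer.wrap(compress(bytes));
            // Compare-and-set: replace only if the stored value still equals what was read.
            if (table.replace(e.getKey(), current, packed)) {
                converted++;
            }
        }
        return converted;
    }
}
```

A real tool would walk every hashKey with the Pegasus scanner, and could simply skip or retry records whose check_and_set fails; running the pass again converts nothing that is already compressed.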
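We provide a client connection warmup feature: when openTable is called, the client fetches the routing table and establishes connections in advance. This avoids a slow first RPC call on that table, which would otherwise have to perform these steps itself.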
Example:
```java
// openTable fetches the routing table and establishes connections in advance
PegasusTableInterface table = client.openTable(tableName);
```