项目地址:Pegasus scala client
下载:
git clone https://github.com/XiaoMi/pegasus-scala-client.git cd pegasus-scala-client
选择所使用的版本并构建,建议使用master版本。同时注意,scala客户端构建依赖Java客户端,请参考 获取Java客户端 在项目中添加Java依赖。你可以打包成Jar包进行使用:
sbt package
或者,安装到本地的sbt repository,方便在sbt项目中使用:
sbt publish-local
或者,安装到本地的maven repository:
sbt publish-m2
项目默认使用scala-2.11进行构建,打包发布时则同时发布2.11版本(pegasus-scala-client_2.11)和2.12版本(pegasus-scala-client_2.12),如果你的项目使用sbt构建,则可配置为:
//使用sbt仓库,不需要添加后缀,默认使用当前scala版本号,即使用2.12 scalaVersion := "2.12.8" libraryDependencies ++= Seq( "com.xiaomi.infra" %% "pegasus-scala-client" % "1.11.4-1-SNAPSHOT" )
或者配置为:
//使用maven仓库(你可以使用resolvers ++= Seq()添加自定义maven仓库),需要添加后缀 scalaVersion := "2.12.8" libraryDependencies ++= Seq( "com.xiaomi.infra" % "pegasus-scala-client_2.11" % "1.11.4-1-SNAPSHOT" )
如果你的项目通过maven构建,则可通过maven配置在项目中使用,例如:
<dependency> <groupId>com.xiaomi.infra</groupId> <artifactId>pegasus-scala-client_2.11</artifactId> <version>1.11.4-1</version> </dependency>
通过指定server配置信息获取实例,Scala提供两种获取实例的接口:
1、文件路径作为配置参数: 参见 Java客户端文件配置
def createClient(configPath: String): ScalaPegasusClient
例如:
val pegasusClient = ScalaPegasusClientFactory.createClient("resource:///pegasus.properties")
2、Properties对象作为配置:
def createClient(props: Properties): ScalaPegasusClient
例如:
Properties pegasusConfig = new Properties(); pegasusConfig.setProperty("meta_servers", "127.0.0.1:34601,127.0.0.1:34602,127.0.0.1:34603"); pegasusConfig.setProperty("operation_timeout", 100); val pegasusClient = ScalaPegasusClientFactory.createClient(pegasusConfig)
注意:调用函数前请确认导入Serializers._
,详情参阅实现原理
val hashKey = 12345L pegasusClient.set(table, hashKey, "sort_1", "value_1") val value = pegasusClient.get(table, hashKey, "sort_1").as[String] pegasusClient.del(table, hashKey, "sort_1") pegasusClient.exists(table, hashKey, "sort_1") pegasusClient.sortKeyCount(table, hashKey) pegasusClient.close
scala的客户端类地址为:com.xiaomi.infra.pegasus.scalaclient
,主要包括以下四个类:
类名 | 功能 |
---|---|
ScalaPegasusClientFactory | Client工厂类,用于创建Client实例 |
ScalaPegasusClient | Client类,封装了各种同步API,也可用于创建Table实例 |
ScalaPegasusTable | Table类,封装了操作单个Table数据的同步API |
ScalaPegasusAsyncTable | Table类,封装了操作单个Table数据的异步API |
用户可以选择使用Client类(ScalaPegasusClient)或者是Table类(ScalaPegasusTable或者ScalaPegasusAsyncTable)存取数据,区别如下:
ScalaPegasusClient
接口通过持有ScalaPegasusTable
实现对特定表的访问,而ScalaPegasusTable
实际是封装了Java client的接口PegasusTableInterface
而实现的。函数形式如下所示:
def get[H, S](table: String, hashKey: H, sortKey: S)(implicit hSer: SER[H], sSer: SER[S]) = { getTable(table).get(hashKey, sortKey) }
每一个数据表的操作函数都被定义为泛型函数,参数列表(table: String, hashKey: H, sortKey: S)
是实际传入的参数,同时使用隐式参数(implicit hSer: SER[H], sSer: SER[S])
完成对参数列表(table: String, hashKey: H, sortKey: S)
泛型的转换。其中SER[H]是类Serializers
的泛型声明,该类包含对不同泛型对象的隐式转换函数(转换成Java client中PegasusTableInterface
的byte[]参数
,在scala中对应为Array[Byte]
,例子展示的是当泛型在使用的时候被定义为String
时的隐式转换函数:
implicit object Utf8String extends Serializer[String] { override def serialize(obj: String): Array[Byte] = if (obj == null) null else obj.getBytes("UTF-8") override def deserialize(bytes: Array[Byte]): String = if (bytes == null) null else new String(bytes, "UTF-8") }
客户端在调用ScalaPegasusClient
提供的方法时,当对第一个参数列表的泛型参数传入String
类型变量的时候,将被自动转换为Array[Byte]
类型变量,并传入PegasusTableInterface
的对应方法中。请确保包含Serializers._
,否则无法完成参数的类型转换,你可以使用:
import com.xiaomi.infra.pegasus.scalaclient.Serializers._
导入依赖,目前接受的自动类型转换包括String
、Boolean
、Int
、Long
、Short
、Double
,这些类型可自动转换为Array[Byte]
。
判断key是否存在,参见 Java客户端文档#exist
def exists[H, S](table: String, hashKey: H, sortKey: S)
table:表名,通常为String
类型
hashKey:通常为String
类型
sortKey:通常为String
类型
return: 返回是否存在,boolean
类型
获取一个hashkey下的sortkey值,参见 Java客户端文档#sortKeyCount
def sortKeyCount[H](table: String, hashKey: H)
table:表名,通常为String
类型
hashKey:通常为String
类型
return:返回sortKeys个数,long
类型
获取一条数据,参见Java客户端文档#get
def get[H, S](table: String, hashKey: H, sortKey: S)
table:表名,通常为String
类型
hashKey:通常为String
类型
sortKey:通常为String
类型
return:返回获取值,Array[Byte]
类型,你可以使用as[String]
转换为String
类型
读取一批数据,对get函数的批量封装。该函数并发地向server发送异步请求,并等待结果。如果有任意一个请求失败,就提前终止并抛出异常。如果抛出了异常,则values中的结果是未定义的,参见Java客户端文档#batchGet
def batchGet[H, S](table: String, keys: List[PegasusKey[H, S]])
table:表名,通常为String
类型
keys:PegasusKey列表,由hashKey和SortKey组成
return:返回获取值列表,PegasusResultList
类型
读取一批数据,对get函数的批量封装。该函数并发地向server发送异步请求,但与上面batchGet不同的是,无论请求成功还是失败,它都会等待所有请求结束,参见Java客户端文档#batchGet2
def batchGet2[H, S](table: String, keys: Seq[PegasusKey[H, S]])
table:表名,通常为String
类型
keys:PegasusKey列表,有hashKey和SortKey组成
return:返回获取值列表,PegasusResultList
类型
Java client包含多种multiGet接口,提供读同一HashKey下的多行数据功能,这里封装的是:
public boolean multiGet(String tableName, byte[] hashKey, List<byte[]> sortKeys, int maxFetchCount, int maxFetchSize, List<Pair<byte[], byte[]>> values) throws PException;
支持最大数据量maxFetchCount
和最大数据大小maxFetchSize
的参数设置,参见Java客户端文档#multiGet
def multiGet[H, S](table: String, hashKey: H, sortKeys: Seq[S], maxFetchCount: Int = 100, maxFetchSize: Int = 1000000)
table:表名,通常为String
类型
hashKey:通常为String
类型
sortKeys:sortKey列表
maxFetchCount:最大获取数据量,这里默认为100
maxFetchSize:最大获取数据值大小,这里默认为1000000字节
return:返回获取值列表,convertMultiGetResult
类型
Java client包含多种multiGet接口,提供读同一HashKey下的多行数据功能,这里封装的是:
public boolean multiGet(String tableName, byte[] hashKey, byte[] startSortKey, byte[] stopSortKey, MultiGetOptions options, int maxFetchCount, int maxFetchSize, List<Pair<byte[], byte[]>> values) throws PException;
可以支持SortKey的范围查询和条件过滤,只读取满足特定条件的数据,参见Java客户端文档#multiGet
def multiGetRange[H, S](hashKey: H, startSortKey: S, stopSortKey: S, options: Options.MultiGet, maxFetchCount: Int = 100, maxFetchSize: Int = 1000000, timeout: Duration = 0 milli)
table:表名,通常为String
类型
hashKey:hashKey,通常为String
类型
startSortKey:sortKey范围的起始值
stopSortKey:sortKey范围的终止值
options:查询条件
maxFetchCount:最大数据量,默认为100
maxFetchSize:最大数据值大小,默认为1000000字节
timeout:获取数据超时时间,默认为0,表示使用配置文件中的数值
return:返回获取值列表,convertMultiGetResult
类型
对multiGet函数的批量封装。该函数并发地向server发送异步请求,并等待结果。如果有任意一个请求失败,就提前终止并抛出异常。如果抛出了异常,则values中的结果是未定义的,参见Java客户端文档#batchMultiGet
def batchMultiGet[H, S](keys: Seq[(H, Seq[S])], timeout: Duration = 0 milli)
keys:hashKey-sortKeys列表,如:Seq(("1",Seq("1","2")),("1",Seq("1","2")))
timeout:获取数据超时时间,默认为0,表示使用配置文件中的数值
return:返回获取值列表,List
类型
对multiGet函数的批量封装。该函数并发地向server发送异步请求,并等待结果。但与上面batchMultiGet不同的是,无论请求成功还是失败,它都会等待所有请求结束,参见Java客户端文档#batchMultiGet2
def batchMultiGet2[H, S](keys: Seq[(H, Seq[S])], timeout: Duration = 0 milli)
keys:hashKey-sortKeys列表,如:Seq(("1",Seq("1","2")),("1",Seq("1","2")))
timeout:获取数据超时时间,默认为0,表示使用配置文件中的数值
return:返回获取值列表,List
类型
写单行数据
def set[H, S, V](hashKey: H, sortKey: S, value: V, ttl: Duration = 0 second, timeout: Duration = 0 milli)
hashKey:通常为String
类型
sortKey:通常为String
类型
value:对应key的写入值,通常为String
类型
ttl:写入值保留时间,默认为0,表示永久保留
timeout:获取数据超时时间,默认为0,表示使用配置文件中的数值
return:无返回值
写一批数据,对set函数的批量封装。该函数并发地向server发送异步请求,并等待结果。如果有任意一个请求失败,就提前终止并抛出异常,参见Java客户端文档#batchSet
def batchSet[H, S, V](table: String, items: Seq[SetItem[H, S, V]])
table:表名,通常为String
类型
items:写入值列表,由hashKey、sortKey、value组成
return:请求成功的个数(该方法不是原子的,有可能出现部分成功部分失败的情况,用户可以选择只使用成功的结果)
对set函数的批量封装。该函数并发地向server发送异步请求,并等待结果。但与上面batchSet不同的是,无论请求成功还是失败,它都会等待所有请求结束,参见Java客户端文档#batchSet2
Java client有两种接口,提供写同一HashKey下的多行数据,这里封装的是:
public void multiSet(String tableName, byte[] hashKey, List<Pair<byte[], byte[]>> values, int ttl_seconds) throws PException;
支持数据过期时间设定
def multiSet[H, S, V](table: String, hashKey: H, values: Seq[(S, V)], ttl: Duration = 0 second)
table:表名,通常为String
类型
hashKey:通常为String
类型
value:写入值列表,由sortkey、value组成,如Seq(("hashKey1","sortKey1"),("hashKey2","sortKey2"))
ttl:写入值保留时间,默认为0,表示永久保留
return:无返回值
对multiSet函数的批量封装。该函数并发地向server发送异步请求,并等待结果。如果有任意一个请求失败,就提前终止并抛出异常,参见Java客户端文档#batchMultiSet
def batchMultitSet[H, S, V](table: String, items: Seq[HashKeyData[H, S, V]], ttl: Duration = 0 second)
table:表名,通常为String
类型
items:批量写入数据列表
ttl:写入值保留时间,默认为0,表示永久保留
return:请求成功的个数(该方法不是原子的,有可能出现部分成功部分失败的情况,用户可以选择只使用成功的结果)
对multiSet函数的批量封装。该函数并发地向server发送异步请求,并等待结果。但与上面batchMultiSet不同的是,无论请求成功还是失败,它都会等待所有请求结束,参见Java客户端文档#batchMultitSet2
def batchMultiSet2[H, S, V](table: String, items: Seq[HashKeyData[H, S, V]], ttl: Duration = 0 second)
table:表名,通常为String
类型
items:批量写入数据列表
ttl:写入值保留时间,默认为0,表示永久保留
return:请求成功的个数(该方法不是原子的,有可能出现部分成功部分失败的情况,用户可以选择只使用成功的结果)
删除单行数据,参见Java客户端文档#del
def del[H, S](table: String, hashKey: H, sortKey: S)
table:表名,通常为String
类型
hashKey:通常为String
类型
sortkey:通常为String
类型
return:无返回值
删除一批数据,对del函数的批量封装。该函数并发地向server发送异步请求,并等待结果。如果有任意一个请求失败,就提前终止并抛出异常,参见Java客户端文档#batchDel
batchDel[H, S](table: String, keys: Seq[PegasusKey[H, S]])
table:表名,通常为String
类型
keys:键值列表,由hashKey和sortKey组成
return:请求成功的个数(该方法不是原子的,有可能出现部分成功部分失败的情况,用户可以选择只使用成功的结果)
对del函数的批量封装。该函数并发地向server发送异步请求,并等待结果。但与上面batchDel不同的是,无论请求成功还是失败,它都会等待所有请求结束,参见Java客户端文档#batchDel2
def batchDel2[H, S](table: String, keys: Seq[PegasusKey[H, S]])
删同一HashKey下的多行数据,参见Java客户端文档#multiDel
def multiDel[H, S](table: String, hashKey: H, sortKeys: Seq[S])
table:表名,通常为String
类型
hashKey:通常为String
类型
sortKeys:sortKey列表
return:无返回值
对multiDel函数的批量封装。该函数并发地向server发送异步请求,并等待结果。如果有任意一个请求失败,就提前终止并抛出异常,参见Java客户端文档#batchMultiDel
def batchMultiDel[H, S](table: String, keys: Seq[(H, Seq[S])])
table:表名,通常为String
类型
keys:键列表,由hashKey、sortKeys组成,如Seq(("hashKey1",(sortKey1,sortKey2),("hashKey2",(sortKey3,sortKey4))
return:无返回值
对del函数的批量封装。该函数并发地向server发送异步请求,并等待结果。但与上面batchMultiDel不同的是,无论请求成功还是失败,它都会等待所有请求结束,参见Java客户端文档#batchMultiDel2
def batchMultiDel2[H, S](table: String, keys: Seq[(H, Seq[S])])
table:表名,通常为String
类型
keys:键列表,由hashKey、sortKeys组成,如Seq(("hashKey1",(sortKey1,sortKey2),("hashKey2",(sortKey3,sortKey4))
return:无返回值
获取单行数据的TTL时间。TTL表示Time To Live,表示该数据还能存活多久。如果超过存活时间,数据就读不到了,参见Java客户端文档#ttl
def ttl[H, S](table: String, hashKey: H, sortKey: S)
table:表名,通常为String
类型
hashKey:通常为String
类型
sortKeys:通常为String
类型
return:TTL时间,单位为秒。如果该数据没有设置TTL,返回-1;如果该数据不存在,返回-2
单行原子增(减)操作,详细说明参见单行原子操作,该操作先将key所指向的value的字节串转换为int64类型(实现上类似于Java的Long.parseLong()函数),然后加上increment,将结果转换为字节串设置为新值。当参数increment为正数时,即原子加;当参数increment为负数时,即原子减,参见Java客户端文档#incr。
def incr[H, S](table: String, hashKey: H, sortKey: S, increment: Long, ttl: Duration = 0 milli)
table:表名,通常为String
类型
hashKey:通常为String
类型
sortKey:通常为String
类型
increment:增加值
ttl:值保留时间,默认为0,表示永久保留
return:操作成功后的新值
ScalaPegasusTable接口提供的方法均为同步API,ScalaPegasusClient
接口即默认封装该接口,详细API信息参见ScalaPegasusClient接口
ScalaPegasusAsyncTable接口提供的方法均为异步API,封装了java client的异步接口。对应API功能可参考ScalaPegasusClient接口和Java客户端文档#PegasusTableInterface接口,接口封装形式如:
@throws[PException] def multiGet[H, S](hashKey: H, sortKeys: Seq[S], maxFetchCount: Int = 100, maxFetchSize: Int = 1000000, timeout: Duration = 0 milli) (implicit hSer: SER[H], sSer: SER[S]): Future[MultiGetResult[S, Array[Byte]]] = { val result = table.asyncMultiGet(hashKey, sortKeys, maxFetchCount, maxFetchSize, timeout) toScala(result)(convertMultiGetResult[S]) }
其中table.asyncMultiGet(hashKey, sortKeys, maxFetchCount, maxFetchSize, timeout)
即Java client接口,参数传递原理参见实现原理,toScala(result)(convertMultiGetResult[S])
的完整形式如下:
implicit private [scalaclient] def toScala[A, B](future: NFuture[A])(implicit f: A => B): Future[B] = { val promise = Promise[B]() future.addListener(new GenericFutureListener[NFuture[_ >: A]] { override def operationComplete(future: NFuture[_ >: A]): Unit = { if (future.isSuccess) { promise.success(f(future.get.asInstanceOf[A])) } else { promise.failure(future.cause()) } } }) promise.future }
使用隐式转换
实现Java的异步编程到Scala的异步编程变换,该函数利用io.netty.util.concurrent.{GenericFutureListener, Future => NFuture}
实现异步编程。