| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.s2graph.core.storage.hbase |
| |
| |
| |
| import java.util |
| import java.util.Base64 |
| import java.util.concurrent.{ExecutorService, Executors, TimeUnit} |
| |
| import com.stumbleupon.async.{Callback, Deferred} |
| import com.typesafe.config.Config |
| import org.apache.commons.io.FileUtils |
| import org.apache.hadoop.conf.Configuration |
| import org.apache.hadoop.hbase.client.{Admin, ConnectionFactory, Durability} |
| import org.apache.hadoop.hbase.io.compress.Compression.Algorithm |
| import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding |
| import org.apache.hadoop.hbase.regionserver.BloomType |
| import org.apache.hadoop.hbase.util.Bytes |
| import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName} |
| import org.apache.hadoop.security.UserGroupInformation |
| import org.apache.s2graph.core._ |
| import org.apache.s2graph.core.mysqls.{Label, LabelMeta, ServiceColumn} |
| import org.apache.s2graph.core.storage._ |
| import org.apache.s2graph.core.storage.hbase.AsynchbaseStorage.{AsyncRPC, ScanWithRange} |
| import org.apache.s2graph.core.types.{HBaseType, VertexId} |
| import org.apache.s2graph.core.utils._ |
| import org.hbase.async.FilterList.Operator.MUST_PASS_ALL |
| import org.hbase.async._ |
| |
| import scala.collection.JavaConversions._ |
| import scala.collection.mutable.ArrayBuffer |
| import scala.concurrent._ |
| import scala.concurrent.duration.Duration |
| import scala.util.Try |
| import scala.util.control.NonFatal |
| import scala.util.hashing.MurmurHash3 |
| |
| |
| object AsynchbaseStorage { |
| val vertexCf = Serializable.vertexCf |
| val edgeCf = Serializable.edgeCf |
| val emptyKVs = new util.ArrayList[KeyValue]() |
| |
| AsynchbasePatcher.init() |
| |
| def makeClient(config: Config, overrideKv: (String, String)*) = { |
| val asyncConfig: org.hbase.async.Config = |
| if (config.hasPath("hbase.security.auth.enable") && config.getBoolean("hbase.security.auth.enable")) { |
| val krb5Conf = config.getString("java.security.krb5.conf") |
| val jaas = config.getString("java.security.auth.login.config") |
| |
| System.setProperty("java.security.krb5.conf", krb5Conf) |
| System.setProperty("java.security.auth.login.config", jaas) |
| new org.hbase.async.Config() |
| } else { |
| new org.hbase.async.Config() |
| } |
| |
| for (entry <- config.entrySet() if entry.getKey.contains("hbase")) { |
| asyncConfig.overrideConfig(entry.getKey, entry.getValue.unwrapped().toString) |
| } |
| |
| for ((k, v) <- overrideKv) { |
| asyncConfig.overrideConfig(k, v) |
| } |
| |
| val client = new HBaseClient(asyncConfig) |
| logger.info(s"Asynchbase: ${client.getConfig.dumpConfiguration()}") |
| client |
| } |
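| |
| /* |
| * Usage sketch (illustrative, not an additional API): the var-arg overrides are how callers tune |
| * per-client settings without touching the shared Config, e.g. a client that flushes immediately: |
| * |
| * val flushingClient = AsynchbaseStorage.makeClient(config, "hbase.rpcs.buffered_flush_interval" -> "0") |
| */ |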
| |
| def shutdown(client: HBaseClient): Unit = { |
| client.shutdown().join() |
| } |
| |
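| /** A Scanner paired with an (offset, limit) window, used for tall-schema (v4) index-edge scans. */ |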
| case class ScanWithRange(scan: Scanner, offset: Int, limit: Int) |
| type AsyncRPC = Either[GetRequest, ScanWithRange] |
| |
| def initLocalHBase(config: Config, |
| overwrite: Boolean = true): ExecutorService = { |
| import java.io.{File, IOException} |
| import java.net.Socket |
| |
| lazy val hbaseExecutor = { |
| val executor = Executors.newSingleThreadExecutor() |
| |
| Runtime.getRuntime.addShutdownHook(new Thread() { |
| override def run(): Unit = { |
| executor.shutdown() |
| } |
| }) |
| |
| val hbaseAvailable = try { |
| val (host, port) = config.getString("hbase.zookeeper.quorum").split(":") match { |
| case Array(h, p) => (h, p.toInt) |
| case Array(h) => (h, 2181) |
| } |
| |
| val socket = new Socket(host, port) |
| socket.close() |
| logger.info(s"HBase is available.") |
| true |
| } catch { |
| case e: IOException => |
| logger.info(s"HBase is not available.") |
| false |
| } |
| |
| if (!hbaseAvailable) { |
| // start HBase |
| executor.submit(new Runnable { |
| override def run(): Unit = { |
| logger.info(s"HMaster starting...") |
| val ts = System.currentTimeMillis() |
| val cwd = new File(".").getAbsolutePath |
| if (overwrite) { |
| val dataDir = new File(s"$cwd/storage/s2graph") |
| FileUtils.deleteDirectory(dataDir) |
| } |
| |
| System.setProperty("proc_master", "") |
| System.setProperty("hbase.log.dir", s"$cwd/storage/s2graph/hbase/") |
| System.setProperty("hbase.log.file", s"$cwd/storage/s2graph/hbase.log") |
| System.setProperty("hbase.tmp.dir", s"$cwd/storage/s2graph/hbase/") |
| System.setProperty("hbase.home.dir", "") |
| System.setProperty("hbase.id.str", "s2graph") |
| System.setProperty("hbase.root.logger", "INFO,RFA") |
| |
| org.apache.hadoop.hbase.master.HMaster.main(Array[String]("start")) |
| logger.info(s"HMaster startup finished: ${System.currentTimeMillis() - ts}") |
| } |
| }) |
| } |
| |
| executor |
| } |
| |
| hbaseExecutor |
| } |
| } |
| |
| |
| class AsynchbaseStorage(override val graph: S2Graph, |
| override val config: Config)(implicit ec: ExecutionContext) |
| extends Storage[AsyncRPC, Deferred[StepResult]](graph, config) { |
| |
| import Extensions.DeferOps |
| |
| val hbaseExecutor: ExecutorService = |
| if (config.getString("hbase.zookeeper.quorum") == "localhost") |
| AsynchbaseStorage.initLocalHBase(config) |
| else |
| null |
| |
| /** |
| * Asynchbase client setup. |
| * Note that we need two clients: one for bulk writes (withWait = false) and another for withWait = true. |
| */ |
| private val clientFlushInterval = config.getInt("hbase.rpcs.buffered_flush_interval").toShort |
| |
| /** |
| * Some runtime environments, such as Spark clusters, have conflicts with the Guava version used by Asynchbase. |
| * To avoid that version conflict, these are lazy vals so that callers that never need an HBase client never create one. |
| */ |
| lazy val client = AsynchbaseStorage.makeClient(config) |
| lazy val clientWithFlush = AsynchbaseStorage.makeClient(config, "hbase.rpcs.buffered_flush_interval" -> "0") |
| lazy val clients = Seq(client, clientWithFlush) |
| |
| private val emptyKeyValues = new util.ArrayList[KeyValue]() |
| private val emptyKeyValuesLs = new util.ArrayList[util.ArrayList[KeyValue]]() |
| private val emptyStepResult = new util.ArrayList[StepResult]() |
| |
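| /** Returns the flushing client when the caller must wait for the write to be applied (withWait = true). */ |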
| private def client(withWait: Boolean): HBaseClient = if (withWait) clientWithFlush else client |
| |
| import CanDefer._ |
| |
| /** Future cache used to squash concurrent requests for the same storage key */ |
| lazy private val futureCache = new DeferCache[StepResult, Deferred, Deferred](config, StepResult.Empty, "AsyncHbaseFutureCache", useMetric = true) |
| |
| /** Simple Vertex Cache */ |
| lazy private val vertexCache = new DeferCache[Seq[SKeyValue], Promise, Future](config, Seq.empty[SKeyValue]) |
| |
| private val zkQuorum = config.getString("hbase.zookeeper.quorum") |
| private val zkQuorumSlave = |
| if (config.hasPath("hbase.slave.zookeeper.quorum")) Option(config.getString("hbase.slave.zookeeper.quorum")) |
| else None |
| |
| /** v4 max next row size */ |
| private val v4_max_num_rows = 10000 |
| private def getV4MaxNumRows(limit : Int): Int = { |
| if (limit < v4_max_num_rows) limit |
| else v4_max_num_rows |
| } |
| |
| /** |
| * Fire RPCs into the proper HBase cluster using the client and |
| * return true only when all mutations succeed; otherwise return false. |
| */ |
| override def writeToStorage(cluster: String, kvs: Seq[SKeyValue], withWait: Boolean): Future[Boolean] = { |
| if (kvs.isEmpty) Future.successful(true) |
| else { |
| val _client = client(withWait) |
| val (increments, putAndDeletes) = kvs.partition(_.operation == SKeyValue.Increment) |
| |
| /* Asynchbase IncrementRequest does not implement HasQualifiers */ |
| val incrementsFutures = increments.map { kv => |
| val inc = new AtomicIncrementRequest(kv.table, kv.row, kv.cf, kv.qualifier, Bytes.toLong(kv.value)) |
| val defer = _client.atomicIncrement(inc) |
| val future = defer.toFuture(Long.box(0)).map(_ => true).recover { case ex: Exception => |
| logger.error(s"mutation failed. $kv", ex) |
| false |
| } |
| if (withWait) future else Future.successful(true) |
| } |
| |
| /* PutRequest and DeleteRequest accept byte[][] qualifiers/values. */ |
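| // Group Puts/Deletes sharing (table, row, cf, operation, timestamp) so each group becomes a single multi-qualifier RPC. |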
| val othersFutures = putAndDeletes.groupBy { kv => |
| (kv.table.toSeq, kv.row.toSeq, kv.cf.toSeq, kv.operation, kv.timestamp) |
| }.map { case ((table, row, cf, operation, timestamp), groupedKeyValues) => |
| |
| val durability = groupedKeyValues.head.durability |
| val qualifiers = new ArrayBuffer[Array[Byte]]() |
| val values = new ArrayBuffer[Array[Byte]]() |
| |
| groupedKeyValues.foreach { kv => |
| if (kv.qualifier != null) qualifiers += kv.qualifier |
| if (kv.value != null) values += kv.value |
| } |
| val defer = operation match { |
| case SKeyValue.Put => |
| val put = new PutRequest(table.toArray, row.toArray, cf.toArray, qualifiers.toArray, values.toArray, timestamp) |
| put.setDurable(durability) |
| _client.put(put) |
| case SKeyValue.Delete => |
| val delete = |
| if (qualifiers.isEmpty) |
| new DeleteRequest(table.toArray, row.toArray, cf.toArray, timestamp) |
| else |
| new DeleteRequest(table.toArray, row.toArray, cf.toArray, qualifiers.toArray, timestamp) |
| delete.setDurable(durability) |
| _client.delete(delete) |
| } |
| if (withWait) { |
| defer.toFuture(new AnyRef()).map(_ => true).recover { case ex: Exception => |
| groupedKeyValues.foreach { kv => logger.error(s"mutation failed. $kv", ex) } |
| false |
| } |
| } else Future.successful(true) |
| } |
| for { |
| incrementRets <- Future.sequence(incrementsFutures) |
| otherRets <- Future.sequence(othersFutures) |
| } yield (incrementRets ++ otherRets).forall(identity) |
| } |
| } |
| |
| private def fetchKeyValues(rpc: AsyncRPC): Future[Seq[SKeyValue]] = { |
| val defer = fetchKeyValuesInner(rpc) |
| defer.toFuture(emptyKeyValues).map { kvsArr => |
| kvsArr.map { kv => |
| implicitly[CanSKeyValue[KeyValue]].toSKeyValue(kv) |
| } |
| } |
| } |
| |
| override def fetchSnapshotEdgeKeyValues(queryRequest: QueryRequest): Future[Seq[SKeyValue]] = { |
| val edge = toRequestEdge(queryRequest, Nil) |
| val rpc = buildRequest(queryRequest, edge) |
| |
| fetchKeyValues(rpc) |
| } |
| |
| /** |
| * Since HBase natively provides CheckAndSet at the storage level, the implementation is simple. |
| * @param rpc: key value that needs to be stored on storage. |
| * @param expectedOpt: last valid value for rpc's KeyValue.value from fetching. |
| * @return true if the expected value matches and our rpc is successfully applied, otherwise false. |
| * Note that when some other thread has modified the same cell and it holds a different value, |
| * HBase atomically returns false. |
| */ |
| override def writeLock(rpc: SKeyValue, expectedOpt: Option[SKeyValue]): Future[Boolean] = { |
| val put = new PutRequest(rpc.table, rpc.row, rpc.cf, rpc.qualifier, rpc.value, rpc.timestamp) |
| val expected = expectedOpt.map(_.value).getOrElse(Array.empty) |
| client(withWait = true).compareAndSet(put, expected).map(true.booleanValue())(ret => ret.booleanValue()).toFuture(true) |
| } |
| |
| |
| /** |
| * Given a queryRequest, build a storage-specific RPC request. |
| * In the HBase case, we build either a Scanner or a GetRequest. |
| * |
| * IndexEdge layer: |
| *   Tall schema (v4): use Scanner. |
| *   Wide schema (label's schema version in v1, v2, v3): use GetRequest with ColumnRangeFilter |
| *   when the query is given with an interval option. |
| * SnapshotEdge layer: |
| *   Tall schema (v3, v4): use GetRequest without a column filter. |
| *   Wide schema (label's schema version in v1, v2): use GetRequest with ColumnRangeFilter. |
| * Vertex layer: |
| *   all versions: use GetRequest without a column filter. |
| * @param queryRequest |
| * @return Scanner or GetRequest properly set up with StartKey, EndKey, RangeFilter. |
| */ |
| override def buildRequest(queryRequest: QueryRequest, edge: S2Edge): AsyncRPC = { |
| import Serializable._ |
| val queryParam = queryRequest.queryParam |
| val label = queryParam.label |
| |
| val serializer = if (queryParam.tgtVertexInnerIdOpt.isDefined) { |
| val snapshotEdge = edge.toSnapshotEdge |
| snapshotEdgeSerializer(snapshotEdge) |
| } else { |
| val indexEdge = edge.toIndexEdge(queryParam.labelOrderSeq) |
| indexEdgeSerializer(indexEdge) |
| } |
| |
| val rowKey = serializer.toRowKey |
| val (minTs, maxTs) = queryParam.durationOpt.getOrElse((0L, Long.MaxValue)) |
| |
| val (intervalMaxBytes, intervalMinBytes) = queryParam.buildInterval(Option(edge)) |
| |
| label.schemaVersion match { |
| case HBaseType.VERSION4 if queryParam.tgtVertexInnerIdOpt.isEmpty => |
| val scanner = AsynchbasePatcher.newScanner(client, label.hbaseTableName) |
| scanner.setFamily(edgeCf) |
| |
| /* |
| * TODO: remove this part. |
| */ |
| val indexEdgeOpt = edge.edgesWithIndex.find(edgeWithIndex => edgeWithIndex.labelIndex.seq == queryParam.labelOrderSeq) |
| val indexEdge = indexEdgeOpt.getOrElse(throw new RuntimeException(s"Can't find index for query $queryParam")) |
| |
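| // v4 tall-schema row key prefix: srcVertexId, then labelWithDir, then labelIndexSeq with the isInverted flag. |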
| val srcIdBytes = VertexId.toSourceVertexId(indexEdge.srcVertex.id).bytes |
| val labelWithDirBytes = indexEdge.labelWithDir.bytes |
| val labelIndexSeqWithIsInvertedBytes = StorageSerializable.labelOrderSeqWithIsInverted(indexEdge.labelIndexSeq, isInverted = false) |
| val baseKey = Bytes.add(srcIdBytes, labelWithDirBytes, labelIndexSeqWithIsInvertedBytes) |
| |
| val (startKey, stopKey) = |
| if (queryParam.intervalOpt.isDefined) { |
| // interval is set. |
| val _startKey = queryParam.cursorOpt match { |
| case Some(cursor) => Base64.getDecoder.decode(cursor) |
| case None => Bytes.add(baseKey, intervalMaxBytes) |
| } |
| (_startKey , Bytes.add(baseKey, intervalMinBytes)) |
| } else { |
| /* |
| * Note: since propsToBytes encodes the size of the property map in its first byte, we can be sure about the max value here. |
| */ |
| val _startKey = queryParam.cursorOpt match { |
| case Some(cursor) => Base64.getDecoder.decode(cursor) |
| case None => baseKey |
| } |
| (_startKey, Bytes.add(baseKey, Array.fill(1)(-1))) |
| } |
| |
| scanner.setStartKey(startKey) |
| scanner.setStopKey(stopKey) |
| |
| if (queryParam.limit == Int.MinValue) logger.debug(s"MinValue: $queryParam") |
| |
| scanner.setMaxVersions(1) |
| // TODO: exclusive condition innerOffset with cursorOpt |
| if (queryParam.cursorOpt.isDefined) { |
| scanner.setMaxNumRows(getV4MaxNumRows(queryParam.limit)) |
| } else { |
| scanner.setMaxNumRows(getV4MaxNumRows(queryParam.innerOffset + queryParam.innerLimit)) |
| } |
| scanner.setMaxTimestamp(maxTs) |
| scanner.setMinTimestamp(minTs) |
| scanner.setRpcTimeout(queryParam.rpcTimeout) |
| |
| // Set the scan range (offset, limit) for this RPC. |
| if (queryParam.cursorOpt.isDefined) Right(ScanWithRange(scanner, 0, queryParam.limit)) |
| else Right(ScanWithRange(scanner, 0, queryParam.innerOffset + queryParam.innerLimit)) |
| |
| case _ => |
| val get = if (queryParam.tgtVertexInnerIdOpt.isDefined) { |
| new GetRequest(label.hbaseTableName.getBytes, rowKey, edgeCf, serializer.toQualifier) |
| } else { |
| new GetRequest(label.hbaseTableName.getBytes, rowKey, edgeCf) |
| } |
| |
| get.maxVersions(1) |
| get.setFailfast(true) |
| get.setMinTimestamp(minTs) |
| get.setMaxTimestamp(maxTs) |
| get.setTimeout(queryParam.rpcTimeout) |
| |
| val pagination = new ColumnPaginationFilter(queryParam.limit, queryParam.offset) |
| val columnRangeFilterOpt = queryParam.intervalOpt.map { interval => |
| new ColumnRangeFilter(intervalMaxBytes, true, intervalMinBytes, true) |
| } |
| get.setFilter(new FilterList(pagination +: columnRangeFilterOpt.toSeq, MUST_PASS_ALL)) |
| Left(get) |
| } |
| } |
| |
| /** |
| * We use the future cache to squash requests targeting the same key on storage. |
| * |
| * @param queryRequest |
| * @param isInnerCall |
| * @param parentEdges |
| * @return we use Deferred here since it has much better performance compared to scala.concurrent.Future. |
| * map and flatMap on scala.concurrent.Future appear to be slower than Deferred's addCallback. |
| */ |
| override def fetch(queryRequest: QueryRequest, |
| isInnerCall: Boolean, |
| parentEdges: Seq[EdgeWithScore]): Deferred[StepResult] = { |
| |
| def fetchInner(hbaseRpc: AsyncRPC): Deferred[StepResult] = { |
| val prevStepScore = queryRequest.prevStepScore |
| val fallbackFn: (Exception => StepResult) = { ex => |
| logger.error(s"fetchInner failed. fallback return. $hbaseRpc", ex) |
| StepResult.Failure |
| } |
| |
| val queryParam = queryRequest.queryParam |
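| // For the v4 tall schema we read via a Scanner, so offset/limit are applied here while materializing edges; |
| // other schema versions already paginate through the ColumnPaginationFilter set in buildRequest. |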
| fetchKeyValuesInner(hbaseRpc).mapWithFallback(emptyKeyValues)(fallbackFn) { kvs => |
| val (startOffset, len) = queryParam.label.schemaVersion match { |
| case HBaseType.VERSION4 => |
| val offset = if (queryParam.cursorOpt.isDefined) 0 else queryParam.offset |
| (offset, queryParam.limit) |
| case _ => (0, kvs.length) |
| } |
| |
| toEdges(kvs, queryRequest, prevStepScore, isInnerCall, parentEdges, startOffset, len) |
| } |
| } |
| |
| val queryParam = queryRequest.queryParam |
| val cacheTTL = queryParam.cacheTTLInMillis |
| /* With version 4, the request is a ScanWithRange (Scanner plus offset/limit); otherwise a GetRequest. */ |
| |
| val edge = toRequestEdge(queryRequest, parentEdges) |
| val request = buildRequest(queryRequest, edge) |
| |
| val (intervalMaxBytes, intervalMinBytes) = queryParam.buildInterval(Option(edge)) |
| val requestCacheKey = Bytes.add(toCacheKeyBytes(request), intervalMaxBytes, intervalMinBytes) |
| |
| if (cacheTTL <= 0) fetchInner(request) |
| else { |
| val cacheKeyBytes = Bytes.add(queryRequest.query.queryOption.cacheKeyBytes, requestCacheKey) |
| |
| // val cacheKeyBytes = toCacheKeyBytes(request) |
| val cacheKey = queryParam.toCacheKey(cacheKeyBytes) |
| futureCache.getOrElseUpdate(cacheKey, cacheTTL)(fetchInner(request)) |
| } |
| } |
| |
| override def fetches(queryRequests: Seq[QueryRequest], |
| prevStepEdges: Map[VertexId, Seq[EdgeWithScore]]): Future[Seq[StepResult]] = { |
| val defers: Seq[Deferred[StepResult]] = for { |
| queryRequest <- queryRequests |
| } yield { |
| val queryOption = queryRequest.query.queryOption |
| val queryParam = queryRequest.queryParam |
| val shouldBuildParents = queryOption.returnTree || queryParam.whereHasParent |
| val parentEdges = if (shouldBuildParents) prevStepEdges.getOrElse(queryRequest.vertex.id, Nil) else Nil |
| fetch(queryRequest, isInnerCall = false, parentEdges) |
| } |
| |
| val grouped: Deferred[util.ArrayList[StepResult]] = Deferred.groupInOrder(defers) |
| grouped.map(emptyStepResult) { queryResults: util.ArrayList[StepResult] => |
| queryResults.toSeq |
| }.toFuture(emptyStepResult) |
| } |
| |
| |
| def fetchVertexKeyValues(request: QueryRequest): Future[Seq[SKeyValue]] = { |
| val edge = toRequestEdge(request, Nil) |
| fetchKeyValues(buildRequest(request, edge)) |
| } |
| |
| |
| def fetchVertexKeyValues(request: AsyncRPC): Future[Seq[SKeyValue]] = fetchKeyValues(request) |
| |
| /** |
| * When withWait is given, we use the client whose flushInterval is set to 0. |
| * Otherwise, in the worst case, we add extra wait time of up to flushInterval. |
| * |
| * @param edges |
| * @param withWait |
| * @return |
| */ |
| override def incrementCounts(edges: Seq[S2Edge], withWait: Boolean): Future[Seq[(Boolean, Long, Long)]] = { |
| |
| val _client = client(withWait) |
| val defers: Seq[Deferred[(Boolean, Long, Long)]] = for { |
| edge <- edges |
| } yield { |
| val futures: List[Deferred[(Boolean, Long, Long)]] = for { |
| relEdge <- edge.relatedEdges |
| edgeWithIndex <- EdgeMutate.filterIndexOption(relEdge.edgesWithIndexValid) |
| } yield { |
| val countWithTs = edge.propertyValueInner(LabelMeta.count) |
| val countVal = countWithTs.innerVal.toString().toLong |
| val kv = buildIncrementsCountAsync(edgeWithIndex, countVal).head |
| val request = new AtomicIncrementRequest(kv.table, kv.row, kv.cf, kv.qualifier, Bytes.toLong(kv.value)) |
| val fallbackFn: (Exception => (Boolean, Long, Long)) = { ex => |
| logger.error(s"mutation failed. $request", ex) |
| (false, -1L, -1L) |
| } |
| val defer = _client.bufferAtomicIncrement(request).mapWithFallback(0L)(fallbackFn) { resultCount: java.lang.Long => |
| (true, resultCount.longValue(), countVal) |
| } |
| if (withWait) defer |
| else Deferred.fromResult((true, -1L, -1L)) |
| } |
| |
| val grouped: Deferred[util.ArrayList[(Boolean, Long, Long)]] = Deferred.group(futures) |
| grouped.map(new util.ArrayList[(Boolean, Long, Long)]()) { resultLs => resultLs.head } |
| } |
| |
| val grouped: Deferred[util.ArrayList[(Boolean, Long, Long)]] = Deferred.groupInOrder(defers) |
| grouped.toFuture(new util.ArrayList[(Boolean, Long, Long)]()).map(_.toSeq) |
| } |
| |
| |
| override def flush(): Unit = clients.foreach { client => |
| super.flush() |
| val timeout = Duration((clientFlushInterval + 10) * 20, duration.MILLISECONDS) |
| Await.result(client.flush().toFuture(new AnyRef), timeout) |
| } |
| |
| |
| override def shutdown(): Unit = { |
| flush() |
| clients.foreach { client => |
| AsynchbaseStorage.shutdown(client) |
| } |
| if (hbaseExecutor != null) { |
| hbaseExecutor.shutdown() |
| hbaseExecutor.awaitTermination(1, TimeUnit.MINUTES) |
| } |
| } |
| |
| override def createTable(_zkAddr: String, |
| tableName: String, |
| cfs: List[String], |
| regionMultiplier: Int, |
| ttl: Option[Int], |
| compressionAlgorithm: String, |
| replicationScopeOpt: Option[Int] = None, |
| totalRegionCount: Option[Int] = None): Unit = { |
| /* TODO: Decide whether we will allow each app server to connect to multiple hbase clusters */ |
| for { |
| zkAddr <- Seq(zkQuorum) ++ zkQuorumSlave.toSeq |
| } { |
| logger.info(s"create table: $tableName on $zkAddr, $cfs, $regionMultiplier, $compressionAlgorithm") |
| withAdmin(zkAddr) { admin => |
| val regionCount = totalRegionCount.getOrElse(admin.getClusterStatus.getServersSize * regionMultiplier) |
| try { |
| if (!admin.tableExists(TableName.valueOf(tableName))) { |
| val desc = new HTableDescriptor(TableName.valueOf(tableName)) |
| desc.setDurability(Durability.ASYNC_WAL) |
| for (cf <- cfs) { |
| val columnDesc = new HColumnDescriptor(cf) |
| .setCompressionType(Algorithm.valueOf(compressionAlgorithm.toUpperCase)) |
| .setBloomFilterType(BloomType.ROW) |
| .setDataBlockEncoding(DataBlockEncoding.FAST_DIFF) |
| .setMaxVersions(1) |
| .setTimeToLive(2147483647) |
| .setMinVersions(0) |
| .setBlocksize(32768) |
| .setBlockCacheEnabled(true) |
| // FIXME: For test!! |
| .setInMemory(true) |
| if (ttl.isDefined) columnDesc.setTimeToLive(ttl.get) |
| if (replicationScopeOpt.isDefined) columnDesc.setScope(replicationScopeOpt.get) |
| desc.addFamily(columnDesc) |
| } |
| |
| if (regionCount <= 1) admin.createTable(desc) |
| else admin.createTable(desc, getStartKey(regionCount), getEndKey(regionCount), regionCount) |
| } else { |
| logger.info(s"$zkAddr, $tableName, $cfs already exist.") |
| } |
| } catch { |
| case e: Throwable => |
| logger.error(s"$zkAddr, $tableName failed with $e", e) |
| throw e |
| } |
| } |
| } |
| } |
| |
| override def truncateTable(zkAddr: String, tableNameStr: String): Unit = { |
| withAdmin(zkAddr) { admin => |
| val tableName = TableName.valueOf(tableNameStr) |
| if (!Try(admin.tableExists(tableName)).getOrElse(false)) { |
| logger.info(s"No table to truncate ${tableNameStr}") |
| return |
| } |
| |
| Try(admin.isTableDisabled(tableName)).map { |
| case true => |
| logger.info(s"${tableNameStr} is already disabled.") |
| |
| case false => |
| logger.info(s"Disabling ${tableNameStr} before truncating") |
| Try(admin.disableTable(tableName)).recover { |
| case NonFatal(e) => |
| logger.info(s"Failed to disable ${tableNameStr}: ${e}") |
| } |
| logger.info(s"Finished disabling ${tableNameStr} before truncating") |
| } |
| |
| logger.info(s"Before truncating ${tableNameStr}") |
| Try(admin.truncateTable(tableName, true)).recover { |
| case NonFatal(e) => |
| logger.info(s"Failed to truncate ${tableNameStr}: ${e}") |
| } |
| logger.info(s"After truncating ${tableNameStr}") |
| Try(admin.close()).recover { |
| case NonFatal(e) => |
| logger.info(s"Failed to close admin ${tableNameStr}: ${e}") |
| } |
| Try(admin.getConnection.close()).recover { |
| case NonFatal(e) => |
| logger.info(s"Failed to close connection ${tableNameStr}: ${e}") |
| } |
| } |
| } |
| |
| override def deleteTable(zkAddr: String, tableNameStr: String): Unit = { |
| withAdmin(zkAddr) { admin => |
| val tableName = TableName.valueOf(tableNameStr) |
| if (!admin.tableExists(tableName)) { |
| return |
| } |
| if (admin.isTableEnabled(tableName)) { |
| admin.disableTable(tableName) |
| } |
| admin.deleteTable(tableName) |
| } |
| } |
| |
| /** Asynchbase implementation overrides the default getVertices to use the future cache */ |
| override def getVertices(vertices: Seq[S2Vertex]): Future[Seq[S2Vertex]] = { |
| def fromResult(kvs: Seq[SKeyValue], |
| version: String): Option[S2Vertex] = { |
| if (kvs.isEmpty) None |
| else vertexDeserializer.fromKeyValues(kvs, None) |
| } |
| |
| val futures = vertices.map { vertex => |
| val kvs = vertexSerializer(vertex).toKeyValues |
| val get = new GetRequest(vertex.hbaseTableName.getBytes, kvs.head.row, Serializable.vertexCf) |
| // get.setTimeout(this.singleGetTimeout.toShort) |
| get.setFailfast(true) |
| get.maxVersions(1) |
| |
| fetchVertexKeyValues(Left(get)).map { kvs => |
| fromResult(kvs, vertex.serviceColumn.schemaVersion) |
| } |
| // val cacheKey = MurmurHash3.stringHash(get.toString) |
| // vertexCache.getOrElseUpdate(cacheKey, cacheTTL = -1)(fetchVertexKeyValues(Left(get))).map { kvs => |
| // fromResult(kvs, vertex.serviceColumn.schemaVersion) |
| // } |
| } |
| |
| Future.sequence(futures).map { result => result.toList.flatten } |
| } |
| |
| //TODO: Limited to 100000 edges per hbase table. fix this later. |
| override def fetchEdgesAll(): Future[Seq[S2Edge]] = { |
| val futures = Label.findAll().groupBy(_.hbaseTableName).toSeq.map { case (hTableName, labels) => |
| val distinctLabels = labels.toSet |
| val scan = AsynchbasePatcher.newScanner(client, hTableName) |
| scan.setFamily(Serializable.edgeCf) |
| scan.setMaxVersions(1) |
| |
| scan.nextRows(S2Graph.FetchAllLimit).toFuture(emptyKeyValuesLs).map { |
| case null => Seq.empty |
| case kvsLs => |
| kvsLs.flatMap { kvs => |
| kvs.flatMap { kv => |
| val sKV = implicitly[CanSKeyValue[KeyValue]].toSKeyValue(kv) |
| |
| indexEdgeDeserializer.fromKeyValues(Seq(kv), None) |
| .filter(e => distinctLabels(e.innerLabel) && e.direction == "out" && !e.isDegree) |
| } |
| } |
| } |
| } |
| |
| Future.sequence(futures).map(_.flatten) |
| } |
| |
| override def fetchVerticesAll(): Future[Seq[S2Vertex]] = { |
| val futures = ServiceColumn.findAll().groupBy(_.service.hTableName).toSeq.map { case (hTableName, columns) => |
| val distinctColumns = columns.toSet |
| val scan = AsynchbasePatcher.newScanner(client, hTableName) |
| scan.setFamily(Serializable.vertexCf) |
| scan.setMaxVersions(1) |
| |
| scan.nextRows(S2Graph.FetchAllLimit).toFuture(emptyKeyValuesLs).map { |
| case null => Seq.empty |
| case kvsLs => |
| kvsLs.flatMap { kvs => |
| vertexDeserializer.fromKeyValues(kvs, None) |
| .filter(v => distinctColumns(v.serviceColumn)) |
| } |
| } |
| } |
| Future.sequence(futures).map(_.flatten) |
| } |
| |
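| /** |
| * Scanner callback that accumulates KeyValues across successive nextRows() calls, skipping the first |
| * `offset` results and collecting at most `limit`, then completes `defer` and closes the scanner. |
| */ |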
| class V4ResultHandler(scanner: Scanner, defer: Deferred[util.ArrayList[KeyValue]], offset: Int, limit : Int) extends Callback[Object, util.ArrayList[util.ArrayList[KeyValue]]] { |
| val results = new util.ArrayList[KeyValue]() |
| var offsetCount = 0 |
| |
| override def call(kvsLs: util.ArrayList[util.ArrayList[KeyValue]]): Object = { |
| try { |
| if (kvsLs == null) { |
| defer.callback(results) |
| Try(scanner.close()) |
| } else { |
| val curRet = new util.ArrayList[KeyValue]() |
| kvsLs.foreach(curRet.addAll(_)) |
| val prevOffset = offsetCount |
| offsetCount += curRet.size() |
| |
| val nextRet = if(offsetCount > offset){ |
| if(prevOffset < offset ) { |
| curRet.subList(offset - prevOffset, curRet.size()) |
| } else{ |
| curRet |
| } |
| } else{ |
| emptyKeyValues |
| } |
| |
| val needCount = limit - results.size() |
| if (needCount >= nextRet.size()) { |
| results.addAll(nextRet) |
| } else { |
| results.addAll(nextRet.subList(0, needCount)) |
| } |
| |
| if (results.size() < limit) { |
| scanner.nextRows().addCallback(this) |
| } else { |
| defer.callback(results) |
| Try(scanner.close()) |
| } |
| } |
| } catch{ |
| case ex: Exception => |
| logger.error(s"fetchKeyValuesInner failed.", ex) |
| defer.callback(ex) |
| Try(scanner.close()) |
| } |
| } |
| } |
| |
| /** |
| * Private methods specific to the Asynchbase implementation. |
| */ |
| private def fetchKeyValuesInner(rpc: AsyncRPC): Deferred[util.ArrayList[KeyValue]] = { |
| rpc match { |
| case Left(get) => client.get(get) |
| case Right(ScanWithRange(scanner, offset, limit)) => |
| val deferred = new Deferred[util.ArrayList[KeyValue]]() |
| scanner.nextRows().addCallback(new V4ResultHandler(scanner, deferred, offset, limit)) |
| deferred |
| case _ => Deferred.fromError(new RuntimeException(s"fetchKeyValues failed. $rpc")) |
| } |
| } |
| |
| private def toCacheKeyBytes(hbaseRpc: AsyncRPC): Array[Byte] = { |
| /* With version 4, the request is a ScanWithRange (Scanner plus offset/limit); otherwise a GetRequest. */ |
| hbaseRpc match { |
| case Left(getRequest) => getRequest.key |
| case Right(ScanWithRange(scanner, offset, limit)) => |
| Bytes.add(scanner.getCurrentKey, Bytes.add(Bytes.toBytes(offset), Bytes.toBytes(limit))) |
| case _ => |
| logger.error(s"toCacheKeyBytes failed. not supported class type. $hbaseRpc") |
| throw new RuntimeException(s"toCacheKeyBytes: $hbaseRpc") |
| } |
| } |
| |
| private def getSecureClusterAdmin(zkAddr: String) = { |
| val jaas = config.getString("java.security.auth.login.config") |
| val krb5Conf = config.getString("java.security.krb5.conf") |
| val realm = config.getString("realm") |
| val principal = config.getString("principal") |
| val keytab = config.getString("keytab") |
| |
| System.setProperty("java.security.auth.login.config", jaas) |
| System.setProperty("java.security.krb5.conf", krb5Conf) |
| // System.setProperty("sun.security.krb5.debug", "true") |
| // System.setProperty("sun.security.spnego.debug", "true") |
| val conf = new Configuration(true) |
| val hConf = HBaseConfiguration.create(conf) |
| |
| hConf.set("hbase.zookeeper.quorum", zkAddr) |
| |
| hConf.set("hadoop.security.authentication", "Kerberos") |
| hConf.set("hbase.security.authentication", "Kerberos") |
| hConf.set("hbase.master.kerberos.principal", "hbase/_HOST@" + realm) |
| hConf.set("hbase.regionserver.kerberos.principal", "hbase/_HOST@" + realm) |
| |
| System.out.println("Connecting secure cluster, using keytab\n") |
| UserGroupInformation.setConfiguration(hConf) |
| UserGroupInformation.loginUserFromKeytab(principal, keytab) |
| val currentUser = UserGroupInformation.getCurrentUser() |
| System.out.println("current user : " + currentUser + "\n") |
| |
| // get table list |
| val conn = ConnectionFactory.createConnection(hConf) |
| conn.getAdmin |
| } |
| |
| private def withAdmin(zkAddr: String)(op: Admin => Unit): Unit = { |
| val admin = getAdmin(zkAddr) |
| try { |
| op(admin) |
| } finally { |
| admin.close() |
| admin.getConnection.close() |
| } |
| } |
| /** |
| * The following configuration entries must all be set to use a secured HBase cluster: |
| * 1. set hbase.security.auth.enable = true |
| * 2. set java.security.auth.login.config to the path of the JAAS file |
| * 3. set java.security.krb5.conf to the path of the Kerberos config file |
| * 4. set realm |
| * 5. set principal |
| * 6. set keytab to the path of the keytab file |
| * @param zkAddr |
| * @return |
| */ |
| private def getAdmin(zkAddr: String) = { |
| if (config.hasPath("hbase.security.auth.enable") && config.getBoolean("hbase.security.auth.enable")) { |
| getSecureClusterAdmin(zkAddr) |
| } else { |
| val conf = HBaseConfiguration.create() |
| conf.set("hbase.zookeeper.quorum", zkAddr) |
| val conn = ConnectionFactory.createConnection(conf) |
| conn.getAdmin |
| } |
| } |
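| |
| /* |
| * Sketch of the relevant entries in the application config (HOCON); the paths and values below are |
| * placeholders, only the keys are the ones actually read above and in getSecureClusterAdmin: |
| * |
| * hbase.security.auth.enable = true |
| * java.security.auth.login.config = "/path/to/jaas.conf" |
| * java.security.krb5.conf = "/path/to/krb5.conf" |
| * realm = "EXAMPLE.COM" |
| * principal = "s2graph@EXAMPLE.COM" |
| * keytab = "/path/to/s2graph.keytab" |
| */ |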
| |
| private def enableTable(zkAddr: String, tableName: String) = { |
| withAdmin(zkAddr) { admin => |
| admin.enableTable(TableName.valueOf(tableName)) |
| } |
| } |
| |
| private def disableTable(zkAddr: String, tableName: String) = { |
| withAdmin(zkAddr) { admin => |
| admin.disableTable(TableName.valueOf(tableName)) |
| } |
| } |
| |
| private def dropTable(zkAddr: String, tableName: String) = { |
| withAdmin(zkAddr) { admin => |
| admin.disableTable(TableName.valueOf(tableName)) |
| admin.deleteTable(TableName.valueOf(tableName)) |
| } |
| } |
| |
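| // Pre-split points for createTable: region boundaries are spaced evenly over the positive Int key space, |
| // from Int.MaxValue / regionCount up to Int.MaxValue / regionCount * (regionCount - 1). |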
| private def getStartKey(regionCount: Int): Array[Byte] = { |
| Bytes.toBytes((Int.MaxValue / regionCount)) |
| } |
| |
| private def getEndKey(regionCount: Int): Array[Byte] = { |
| Bytes.toBytes((Int.MaxValue / regionCount * (regionCount - 1))) |
| } |
| } |