blob: f3194ae0ddbb281cb46a58a7304936d62dfccd79 [file] [log] [blame]
/** Copyright 2015 TappingStone, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.prediction.data.storage.hbase
import io.prediction.data.storage.Event
import io.prediction.data.storage.PropertyMap
import io.prediction.data.storage.LEvents
import io.prediction.data.storage.LEventAggregator
import io.prediction.data.storage.StorageError
import io.prediction.data.storage.hbase.HBEventsUtil.RowKey
import io.prediction.data.storage.hbase.HBEventsUtil.RowKeyException
import grizzled.slf4j.Logging
import org.joda.time.DateTime
import org.apache.hadoop.hbase.NamespaceDescriptor
import org.apache.hadoop.hbase.HTableDescriptor
import org.apache.hadoop.hbase.HColumnDescriptor
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client._
import scala.collection.JavaConversions._
import scala.concurrent.Future
import scala.concurrent.ExecutionContext
/** HBase-backed implementation of [[LEvents]].
  *
  * Each app's events are stored in the table named by
  * `HBEventsUtil.tableName(namespace, appId)`. All `future*` methods run their
  * HBase calls inside a `Future` on the caller-supplied `ExecutionContext`.
  * Unless stated otherwise, an exception thrown by HBase fails the returned
  * `Future` rather than being mapped to a `Left(StorageError)`.
  *
  * @param client    holder of the HBase connection and admin handles
  * @param namespace HBase namespace in which the event tables are created
  */
class HBLEvents(val client: HBClient, val namespace: String)
  extends LEvents with Logging {

  // implicit val formats = DefaultFormats + new EventJson4sSupport.DBSerializer

  /** Converts an HBase row into an [[Event]] via `HBEventsUtil`. */
  def resultToEvent(result: Result, appId: Int): Event =
    HBEventsUtil.resultToEvent(result, appId)

  /** Obtains a handle to the event table of `appId`.
    * The caller is responsible for closing the returned table.
    */
  def getTable(appId: Int): HTableInterface = client.connection.getTable(
    HBEventsUtil.tableName(namespace, appId))

  /** Wraps `rows` so that `scanner` is closed as soon as the iterator is
    * drained or a read from it fails, and converts every row to an [[Event]].
    *
    * The iterator stays lazy. If the consumer abandons it before it is
    * drained, the scanner is not closed by this wrapper.
    */
  private def closeOnExhaustion(
    scanner: ResultScanner,
    rows: Iterator[Result],
    appId: Int): Iterator[Event] = new Iterator[Event] {

    private var open = true

    private def release(): Unit = {
      if (open) {
        open = false
        scanner.close()
      }
    }

    override def hasNext: Boolean = open && {
      val more = try {
        rows.hasNext
      } catch {
        case e: Exception =>
          // Do not keep the scanner open after a failed read.
          release()
          throw e
      }
      if (!more) release()
      more
    }

    override def next(): Event = {
      if (hasNext) {
        resultToEvent(rows.next(), appId)
      } else {
        Iterator.empty.next() // throws NoSuchElementException
      }
    }
  }

  /** Creates the namespace and the event table of `appId` if they do not
    * exist yet. The table gets the column families "e" and "r" (reserved).
    *
    * @return always `true`; failures surface as exceptions from the admin API
    */
  override
  def init(appId: Int): Boolean = {
    // check namespace exist
    val namespaceExists = client.admin.listNamespaceDescriptors()
      .exists(_.getName == namespace)
    if (!namespaceExists) {
      val nameDesc = NamespaceDescriptor.create(namespace).build()
      info(s"The namespace ${namespace} doesn't exist yet. Creating now...")
      client.admin.createNamespace(nameDesc)
    }

    val tableName = TableName.valueOf(HBEventsUtil.tableName(namespace, appId))
    if (!client.admin.tableExists(tableName)) {
      info(s"The table ${tableName.getNameAsString()} doesn't exist yet." +
        " Creating now...")
      val tableDesc = new HTableDescriptor(tableName)
      tableDesc.addFamily(new HColumnDescriptor("e"))
      tableDesc.addFamily(new HColumnDescriptor("r")) // reserved
      client.admin.createTable(tableDesc)
    }
    true
  }

  /** Disables and deletes the event table of `appId`, if it exists.
    *
    * @return `true` when the table was removed or did not exist,
    *         `false` when an exception occurred (it is logged, not rethrown)
    */
  override
  def remove(appId: Int): Boolean = {
    val tableName = TableName.valueOf(HBEventsUtil.tableName(namespace, appId))
    try {
      if (client.admin.tableExists(tableName)) {
        info(s"Removing table ${tableName.getNameAsString()}...")
        client.admin.disableTable(tableName)
        client.admin.deleteTable(tableName)
      } else {
        info(s"Table ${tableName.getNameAsString()} doesn't exist." +
          s" Nothing is deleted.")
      }
      true
    } catch {
      case e: Exception => {
        error(s"Fail to remove table for appId ${appId}. Exception: ${e}")
        false
      }
    }
  }

  /** Closes the admin handle and the connection.
    * The connection is closed even when closing the admin handle throws.
    */
  override
  def close(): Unit = {
    try {
      client.admin.close()
    } finally {
      client.connection.close()
    }
  }

  /** Writes `event` to the table of `appId` and flushes the write.
    *
    * @return `Right` with the string form of the row key used for the event;
    *         exceptions fail the returned `Future`
    */
  override
  def futureInsert(event: Event, appId: Int)(implicit ec: ExecutionContext):
    Future[Either[StorageError, String]] = {
    Future {
      val table = getTable(appId)
      try {
        val (put, rowKey) = HBEventsUtil.eventToPut(event, appId)
        table.put(put)
        table.flushCommits()
        Right(rowKey.toString)
      } finally {
        // Release the table even when the write fails.
        table.close()
      }
    }
  }

  /** Looks up a single event by its ID.
    *
    * @return `Right(Some(event))` when the row exists, `Right(None)` when it
    *         does not, and `Left(StorageError)` when a `RowKeyException` is
    *         raised (e.g. while parsing `eventId`); any other exception fails
    *         the returned `Future`
    */
  override
  def futureGet(eventId: String, appId: Int)(implicit ec: ExecutionContext):
    Future[Either[StorageError, Option[Event]]] = {
    Future {
      val table = getTable(appId)
      try {
        val rowKey = RowKey(eventId)
        val result = table.get(new Get(rowKey.toBytes))
        val found: Option[Event] =
          if (result.isEmpty()) None else Some(resultToEvent(result, appId))
        Right(found)
      } finally {
        table.close()
      }
    }.recover {
      // Failures not matched here propagate unchanged.
      case e: RowKeyException => Left(StorageError(e.toString))
    }
  }

  /** Deletes a single event by its ID.
    *
    * @return `Right(true)` if the row existed before the delete was issued,
    *         `Right(false)` otherwise; exceptions (including a
    *         `RowKeyException` from parsing `eventId`) fail the `Future`
    */
  override
  def futureDelete(eventId: String, appId: Int)(implicit ec: ExecutionContext):
    Future[Either[StorageError, Boolean]] = {
    Future {
      val table = getTable(appId)
      try {
        val rowKey = RowKey(eventId)
        val exists = table.exists(new Get(rowKey.toBytes))
        table.delete(new Delete(rowKey.toBytes))
        Right(exists)
      } finally {
        table.close()
      }
    }
  }

  /** Returns all events of `appId`; delegates to [[futureFind]]. */
  override
  def futureGetByAppId(appId: Int)(implicit ec: ExecutionContext):
    Future[Either[StorageError, Iterator[Event]]] = {
    futureFind(
      appId = appId,
      startTime = None,
      untilTime = None,
      entityType = None,
      entityId = None,
      eventNames = None,
      limit = None,
      reversed = None)
  }

  /** Returns the events of `appId` restricted by the optional time bounds;
    * delegates to [[futureFind]].
    */
  override
  def futureGetByAppIdAndTime(appId: Int, startTime: Option[DateTime],
    untilTime: Option[DateTime])(implicit ec: ExecutionContext):
    Future[Either[StorageError, Iterator[Event]]] = {
    futureFind(
      appId = appId,
      startTime = startTime,
      untilTime = untilTime,
      entityType = None,
      entityId = None,
      eventNames = None,
      limit = None,
      reversed = None)
  }

  /** Returns the events of `appId` restricted by the optional time bounds
    * and entity filters; delegates to [[futureFind]].
    */
  override
  def futureGetByAppIdAndTimeAndEntity(appId: Int,
    startTime: Option[DateTime],
    untilTime: Option[DateTime],
    entityType: Option[String],
    entityId: Option[String])(implicit ec: ExecutionContext):
    Future[Either[StorageError, Iterator[Event]]] = {
    futureFind(
      appId = appId,
      startTime = startTime,
      untilTime = untilTime,
      entityType = entityType,
      entityId = entityId,
      eventNames = None,
      limit = None,
      reversed = None)
  }

  /** Scans the table of `appId` with the given filters.
    *
    * The filters are passed to `HBEventsUtil.createScan`. `limit` of `None`
    * or `Some(-1)` returns every matching event; any other `Some(x)` returns
    * at most `x` events.
    *
    * The returned iterator is lazy: rows are fetched from the scanner while
    * it is consumed. The scanner is closed once the iterator is drained (or a
    * read from it fails), so callers should consume the iterator fully.
    */
  override
  def futureFind(
    appId: Int,
    startTime: Option[DateTime] = None,
    untilTime: Option[DateTime] = None,
    entityType: Option[String] = None,
    entityId: Option[String] = None,
    eventNames: Option[Seq[String]] = None,
    targetEntityType: Option[Option[String]] = None,
    targetEntityId: Option[Option[String]] = None,
    limit: Option[Int] = None,
    reversed: Option[Boolean] = None)(implicit ec: ExecutionContext):
    Future[Either[StorageError, Iterator[Event]]] = {
    Future {
      val table = getTable(appId)
      val scanner = try {
        val scan = HBEventsUtil.createScan(
          startTime = startTime,
          untilTime = untilTime,
          entityType = entityType,
          entityId = entityId,
          eventNames = eventNames,
          targetEntityType = targetEntityType,
          targetEntityId = targetEntityId,
          reversed = reversed)
        table.getScanner(scan)
      } finally {
        // The table handle is released right after the scanner is opened
        // (or if opening it fails), as the original code did on success.
        table.close()
      }

      val rows: Iterator[Result] = scanner.iterator()

      // Get all events if None or Some(-1)
      val limited: Iterator[Result] = limit match {
        case Some(-1) | None => rows
        case Some(x) => rows.take(x)
      }

      Right(closeOnExhaustion(scanner, limited, appId))
    }
  }

  /** Aggregates the properties of all entities of `entityType`.
    *
    * Events named in `LEventAggregator.eventNames` are fetched through
    * [[futureFind]] and folded by `LEventAggregator.aggregateProperties`.
    *
    * @param required when defined, only entities whose aggregated properties
    *                 contain every listed key are kept; an empty list keeps
    *                 every entity
    */
  override
  def futureAggregateProperties(
    appId: Int,
    entityType: String,
    startTime: Option[DateTime] = None,
    untilTime: Option[DateTime] = None,
    required: Option[Seq[String]] = None)(implicit ec: ExecutionContext):
    Future[Either[StorageError, Map[String, PropertyMap]]] = {
    futureFind(
      appId = appId,
      startTime = startTime,
      untilTime = untilTime,
      entityType = Some(entityType),
      eventNames = Some(LEventAggregator.eventNames)
    ).map{ either =>
      either.right.map{ eventIt =>
        val dm = LEventAggregator.aggregateProperties(eventIt)
        required match {
          case Some(keys) =>
            // forall is vacuously true for an empty key list, whereas
            // reduce(_ && _) would throw on it.
            dm.filter { case (k, v) => keys.forall(key => v.contains(key)) }
          case None => dm
        }
      }
    }
  }

  /** Aggregates the properties of the single entity identified by
    * `entityType` and `entityId`.
    *
    * Events named in `LEventAggregator.eventNames` are fetched through
    * [[futureFind]] and folded by
    * `LEventAggregator.aggregatePropertiesSingle`.
    */
  override
  def futureAggregatePropertiesSingle(
    appId: Int,
    entityType: String,
    entityId: String,
    startTime: Option[DateTime] = None,
    untilTime: Option[DateTime] = None)(implicit ec: ExecutionContext):
    Future[Either[StorageError, Option[PropertyMap]]] = {
    futureFind(
      appId = appId,
      startTime = startTime,
      untilTime = untilTime,
      entityType = Some(entityType),
      entityId = Some(entityId),
      eventNames = Some(LEventAggregator.eventNames)
    ).map{ either =>
      either.right.map{ eventIt =>
        LEventAggregator.aggregatePropertiesSingle(eventIt)
      }
    }
  }

  /** Deletes every row of the table of `appId` by scanning it and issuing one
    * delete per row. The scanner and the table are closed even when a delete
    * fails; exceptions fail the returned `Future`.
    */
  override
  def futureDeleteByAppId(appId: Int)(implicit ec: ExecutionContext):
    Future[Either[StorageError, Unit]] = {
    Future {
      // TODO: better way to handle range delete
      val table = getTable(appId)
      try {
        val scanner = table.getScanner(new Scan())
        try {
          val it = scanner.iterator()
          while (it.hasNext()) {
            table.delete(new Delete(it.next().getRow()))
          }
        } finally {
          scanner.close()
        }
      } finally {
        table.close()
      }
      Right(())
    }
  }

}