blob: 33d1a3911af44b061061961c5c51909e4ce3dbc1 [file] [log] [blame]
package com.daumkakao.s2graph.core
import HBaseElement._
import scala.collection.mutable.ListBuffer
import org.apache.hadoop.hbase.util.Bytes
import GraphConstant._
import org.hbase.async.{ScanFilter, ColumnRangeFilter}
object Query {
val initialScore = 1.0
object DuplicatePolicy extends Enumeration {
type DuplicatePolicy = Value
val First, Sum, CountSum, Raw = Value
def apply(policy: String): Value = {
policy match {
case "sum" => Query.DuplicatePolicy.Sum
case "countSum" => Query.DuplicatePolicy.CountSum
case "raw" => Query.DuplicatePolicy.Raw
case _ => DuplicatePolicy.First
}
}
}
}
case class Query(vertices: Seq[Vertex], steps: List[Step],
unique: Boolean = true, removeCycle: Boolean = false) {
import Query._
lazy val startEdgesWithScore = (for {
firstStep <- steps.headOption
} yield {
for (v <- vertices; qParam <- firstStep.queryParams) yield {
val ts = System.currentTimeMillis()
val srcVertex = Vertex(CompositeId(v.id.colId, v.innerId, isEdge = true, useHash = true), ts)
(Edge(srcVertex, Vertex.emptyVertex, qParam.labelWithDir, GraphUtil.operations("insert"), ts, ts, Map.empty[Byte, InnerValWithTs]), Query.initialScore)
}
}).getOrElse(List.empty[(Edge, Double)])
}
case class VertexParam(vertices: Seq[Vertex]) {
import Query._
var filters: Option[Map[Byte, InnerVal]] = None
def has(what: Option[Map[Byte, InnerVal]]): VertexParam = {
what match {
case None => this
case Some(w) => has(w)
}
}
def has(what: Map[Byte, InnerVal]): VertexParam = {
this.filters = Some(what)
this
}
}
object RankParam {
def apply(labelId: Int, keyAndWeights: Seq[(Byte, Double)]) = {
new RankParam(labelId, keyAndWeights)
}
}
class RankParam(val labelId: Int, var keySeqAndWeights: Seq[(Byte, Double)] = Seq.empty[(Byte, Double)]) { // empty => Count
def defaultKey() = {
this.keySeqAndWeights = List((LabelMeta.countSeq, 1.0))
this
}
def singleKey(key: String) = {
this.keySeqAndWeights =
LabelMeta.findByName(labelId, key) match {
case None => List.empty[(Byte, Double)]
case Some(ktype) => List((ktype.seq, 1.0))
}
this
}
def multipleKey(keyAndWeights: Seq[(String, Double)]) = {
this.keySeqAndWeights =
for ((key, weight) <- keyAndWeights; row <- LabelMeta.findByName(labelId, key)) yield (row.seq, weight)
this
}
}
case class Where(val clauses: Seq[Clause] = Seq.empty[Clause]) {
def filter(edge: Edge): Boolean = {
clauses.map(_.filter(edge)).forall(r => r)
}
}
abstract class Clause {
def and(otherField: Clause): Clause = And(this, otherField)
def or(otherField: Clause): Clause = Or(this, otherField)
def filter(edge: Edge): Boolean = ???
}
case class Equal(val propKey: Byte, val value: InnerVal) extends Clause {
override def filter(edge: Edge): Boolean = {
propKey match {
case LabelMeta.from.seq => edge.srcVertex.innerId == value
case LabelMeta.to.seq => edge.tgtVertex.innerId == value
case _ =>
edge.props.get(propKey) match {
case None => true
case Some(edgeVal) => edgeVal == value
}
}
}
}
case class IN(val propKey: Byte, val values: Set[InnerVal]) extends Clause {
override def filter(edge: Edge): Boolean = {
propKey match {
case LabelMeta.from.seq => values.contains(edge.srcVertex.innerId)
case LabelMeta.to.seq => values.contains(edge.tgtVertex.innerId)
case _ =>
edge.props.get(propKey) match {
case None => true
case Some(edgeVal) => values.contains(edgeVal)
}
}
}
}
case class Between(val propKey: Byte, val minValue: InnerVal, val maxValue: InnerVal) extends Clause {
override def filter(edge: Edge): Boolean = {
propKey match {
case LabelMeta.from.seq => minValue <= edge.srcVertex.innerId && edge.srcVertex.innerId <= maxValue
case LabelMeta.to.seq => minValue <= edge.tgtVertex.innerId && edge.tgtVertex.innerId <= maxValue
case _ =>
edge.props.get(propKey) match {
case None => true
case Some(edgeVal) => minValue <= edgeVal && edgeVal <= maxValue
}
}
}
}
case class And(val left: Clause, val right: Clause) extends Clause {
override def filter(edge: Edge): Boolean = {
left.filter(edge) && right.filter(edge)
}
}
case class Or(val left: Clause, val right: Clause) extends Clause {
override def filter(edge: Edge): Boolean = {
left.filter(edge) || right.filter(edge)
}
}
case class QueryParam(labelWithDir: LabelWithDirection) {
import Query.DuplicatePolicy._
import Query.DuplicatePolicy
val label = Label.findById(labelWithDir.labelId)
val defaultKey = LabelIndex.defaultSeq
val fullKey = defaultKey
var labelOrderSeq = fullKey
var outputField: Option[Byte] = None
// var start = OrderProps.empty
// var end = OrderProps.empty
var limit = 10
var offset = 0
var rank = new RankParam(labelWithDir.labelId, List(LabelMeta.countSeq -> 1))
var isRowKeyOnly = false
var duration: Option[(Long, Long)] = None
// var direction = 0
// var props = OrderProps.empty
var isInverted: Boolean = false
// var filters = new FilterList(FilterList.Operator.MUST_PASS_ALL)
// val scanFilters = ListBuffer.empty[ScanFilter]
// var filters = new FilterList(List.empty[ScanFilter], FilterList.Operator.MUST_PASS_ALL)
var columnRangeFilter: ColumnRangeFilter = null
// var columnPaginationFilter: ColumnPaginationFilter = null
var exclude = false
var include = false
var hasFilters: Map[Byte, InnerVal] = Map.empty[Byte, InnerVal]
// var propsFilters: PropsFilter = PropsFilter()
var where: Option[Where] = None
var duplicatePolicy = DuplicatePolicy.First
var rpcTimeoutInMillis = 100
var maxAttempt = 2
def isRowKeyOnly(isRowKeyOnly: Boolean): QueryParam = {
this.isRowKeyOnly = isRowKeyOnly
this
}
def isInverted(isInverted: Boolean): QueryParam = {
this.isInverted = isInverted
this
}
def labelOrderSeq(labelOrderSeq: Byte): QueryParam = {
this.labelOrderSeq = labelOrderSeq
this
}
def limit(offset: Int, limit: Int): QueryParam = {
this.limit = limit
this.offset = offset
// this.columnPaginationFilter = new ColumnPaginationFilter(this.limit, this.offset)
this
}
def interval(fromTo: Option[(Seq[(Byte, InnerVal)], Seq[(Byte, InnerVal)])]): QueryParam = {
fromTo match {
case Some((from, to)) => interval(from, to)
case _ => this
}
}
def interval(from: Seq[(Byte, InnerVal)], to: Seq[(Byte, InnerVal)]): QueryParam = {
import HBaseElement._
// val len = label.orderTypes.size.toByte
// val len = label.extraIndicesMap(labelOrderSeq).sortKeyTypes.size.toByte
// Logger.error(s"indicesMap: ${label.indicesMap(labelOrderSeq)}")
val len = label.indicesMap(labelOrderSeq).sortKeyTypes.size.toByte
// Logger.error(s"index length: $len")
val toVal = Bytes.add(propsToBytes(to), Array.fill(1)(InnerVal.minMetaByte))
val fromVal = Bytes.add(propsToBytes(from), Array.fill(1)(InnerVal.maxMetaByte))
toVal(0) = len
fromVal(0) = len
val maxBytes = fromVal
val minBytes = toVal
val rangeFilter = new ColumnRangeFilter(minBytes, true, maxBytes, true)
// queryLogger.info(s"Interval: ${rangeFilter.getMinColumn().toList} ~ ${rangeFilter.getMaxColumn().toList}: ${Bytes.compareTo(minBytes, maxBytes)}")
// this.filters.(rangeFilter)
this.columnRangeFilter = rangeFilter
this
}
def duration(minMaxTs: Option[(Long, Long)]): QueryParam = {
minMaxTs match {
case Some((minTs, maxTs)) => duration(minTs, maxTs)
case _ => this
}
}
def duration(minTs: Long, maxTs: Long): QueryParam = {
this.duration = Some((minTs, maxTs))
queryLogger.debug(s"Duration: $minTs ~ $maxTs")
this
}
def rank(r: RankParam): QueryParam = {
this.rank = r
this
}
def exclude(filterOut: Boolean): QueryParam = {
this.exclude = filterOut
this
}
def include(filterIn: Boolean): QueryParam = {
this.include = filterIn
this
}
def outputField(ofOpt: Option[Byte]): QueryParam = {
this.outputField = ofOpt
this
}
def has(hasFilters: Map[Byte, InnerVal]): QueryParam = {
this.hasFilters = hasFilters
this
}
def where(whereOpt: Option[Where]): QueryParam = {
this.where = whereOpt
this
}
def duplicatePolicy(policy: Option[DuplicatePolicy]): QueryParam = {
this.duplicatePolicy = policy.getOrElse(DuplicatePolicy.First)
this
}
def rpcTimeout(millis: Int): QueryParam = {
this.rpcTimeoutInMillis = millis
this
}
def maxAttempt(attempt: Int): QueryParam = {
this.maxAttempt = attempt;
this
}
override def toString(): String = {
List(label.label, labelOrderSeq, offset, limit, rank, isRowKeyOnly,
duration, isInverted, exclude, include, hasFilters, outputField).mkString("\t")
}
def buildGetRequest(srcVertex: Vertex) = {
val rowKey = EdgeRowKey(srcVertex.id.updateUseHash(true), labelWithDir, labelOrderSeq, isInverted)
val (minTs, maxTs) = duration.getOrElse((0L, Long.MaxValue))
val client = Graph.getClient(label.hbaseZkAddr)
val filters = ListBuffer.empty[ScanFilter]
Graph.singleGet(label.hbaseTableName.getBytes, rowKey.bytes, edgeCf, offset, limit, minTs, maxTs, maxAttempt, rpcTimeoutInMillis, columnRangeFilter)
}
}
case class Step(queryParams: List[QueryParam]) {
lazy val excludes = queryParams.filter(qp => qp.exclude)
lazy val includes = queryParams.filterNot(qp => qp.exclude)
lazy val excludeIds = excludes.map(x => x.labelWithDir.labelId -> true).toMap
}