/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */


package org.apache.predictionio.data.storage.hbase

import org.apache.predictionio.data.storage.Event
import org.apache.predictionio.data.storage.EventValidation
import org.apache.predictionio.data.storage.DataMap

import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.client.Scan
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.filter.FilterList
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp
import org.apache.hadoop.hbase.filter.BinaryComparator
import org.apache.hadoop.hbase.filter.QualifierFilter
import org.apache.hadoop.hbase.filter.SkipFilter

import org.json4s.DefaultFormats
import org.json4s.JObject
import org.json4s.native.Serialization.{ read, write }

import org.joda.time.DateTime
import org.joda.time.DateTimeZone

import org.apache.commons.codec.binary.Base64
import java.security.MessageDigest

import java.util.UUID

/* common utility function for accessing EventsStore in HBase */
object HBEventsUtil {

  implicit val formats = DefaultFormats

  def tableName(namespace: String, appId: Int, channelId: Option[Int] = None): String = {
    channelId.map { ch =>
      s"${namespace}:events_${appId}_${ch}"
    }.getOrElse {
      s"${namespace}:events_${appId}"
    }
  }

  // column names for "e" column family
  val colNames: Map[String, Array[Byte]] = Map(
    "event" -> "e",
    "entityType" -> "ety",
    "entityId" -> "eid",
    "targetEntityType" -> "tety",
    "targetEntityId" -> "teid",
    "properties" -> "p",
    "prId" -> "prid",
    "eventTime" -> "et",
    "eventTimeZone" -> "etz",
    "creationTime" -> "ct",
    "creationTimeZone" -> "ctz"
  ).mapValues(Bytes.toBytes(_))

  def hash(entityType: String, entityId: String): Array[Byte] = {
    val s = entityType + "-" + entityId
    // get a new MessageDigest object each time for thread-safe
    val md5 = MessageDigest.getInstance("MD5")
    md5.digest(Bytes.toBytes(s))
  }

  class RowKey(
    val b: Array[Byte]
  ) {
    require((b.size == 32), s"Incorrect b size: ${b.size}")
    lazy val entityHash: Array[Byte] = b.slice(0, 16)
    lazy val millis: Long = Bytes.toLong(b.slice(16, 24))
    lazy val uuidLow: Long = Bytes.toLong(b.slice(24, 32))

    lazy val toBytes: Array[Byte] = b

    override def toString: String = {
      Base64.encodeBase64URLSafeString(toBytes)
    }
  }

  object RowKey {
    def apply(
      entityType: String,
      entityId: String,
      millis: Long,
      uuidLow: Long): RowKey = {
        // add UUID least significant bits for multiple actions at the same time
        // (UUID's most significant bits are actually timestamp,
        // use eventTime instead).
        val b = hash(entityType, entityId) ++
          Bytes.toBytes(millis) ++ Bytes.toBytes(uuidLow)
        new RowKey(b)
      }

    // get RowKey from string representation
    def apply(s: String): RowKey = {
      try {
        apply(Base64.decodeBase64(s))
      } catch {
        case e: Exception => throw new RowKeyException(
          s"Failed to convert String ${s} to RowKey because ${e}", e)
      }
    }

    def apply(b: Array[Byte]): RowKey = {
      if (b.size != 32) {
        val bString = b.mkString(",")
        throw new RowKeyException(
          s"Incorrect byte array size. Bytes: ${bString}.")
      }
      new RowKey(b)
    }

  }

  class RowKeyException(val msg: String, val cause: Exception)
    extends Exception(msg, cause) {
      def this(msg: String) = this(msg, null)
    }

  case class PartialRowKey(entityType: String, entityId: String,
    millis: Option[Long] = None) {
    val toBytes: Array[Byte] = {
      hash(entityType, entityId) ++
        (millis.map(Bytes.toBytes(_)).getOrElse(Array[Byte]()))
    }
  }

  def eventToPut(event: Event, appId: Int): (Put, RowKey) = {
    // generate new rowKey if eventId is None
    val rowKey = event.eventId.map { id =>
      RowKey(id) // create rowKey from eventId
    }.getOrElse {
      // TODO: use real UUID. not pseudo random
      val uuidLow: Long = UUID.randomUUID().getLeastSignificantBits
      RowKey(
        entityType = event.entityType,
        entityId = event.entityId,
        millis = event.eventTime.getMillis,
        uuidLow = uuidLow
      )
    }

    val eBytes = Bytes.toBytes("e")
    // use eventTime as HBase's cell timestamp
    val put = new Put(rowKey.toBytes, event.eventTime.getMillis)

    def addStringToE(col: Array[Byte], v: String): Put = {
      put.add(eBytes, col, Bytes.toBytes(v))
    }

    def addLongToE(col: Array[Byte], v: Long): Put = {
      put.add(eBytes, col, Bytes.toBytes(v))
    }

    addStringToE(colNames("event"), event.event)
    addStringToE(colNames("entityType"), event.entityType)
    addStringToE(colNames("entityId"), event.entityId)

    event.targetEntityType.foreach { targetEntityType =>
      addStringToE(colNames("targetEntityType"), targetEntityType)
    }

    event.targetEntityId.foreach { targetEntityId =>
      addStringToE(colNames("targetEntityId"), targetEntityId)
    }

    // TODO: make properties Option[]
    if (!event.properties.isEmpty) {
      addStringToE(colNames("properties"), write(event.properties.toJObject))
    }

    event.prId.foreach { prId =>
      addStringToE(colNames("prId"), prId)
    }

    addLongToE(colNames("eventTime"), event.eventTime.getMillis)
    val eventTimeZone = event.eventTime.getZone
    if (!eventTimeZone.equals(EventValidation.defaultTimeZone)) {
      addStringToE(colNames("eventTimeZone"), eventTimeZone.getID)
    }

    addLongToE(colNames("creationTime"), event.creationTime.getMillis)
    val creationTimeZone = event.creationTime.getZone
    if (!creationTimeZone.equals(EventValidation.defaultTimeZone)) {
      addStringToE(colNames("creationTimeZone"), creationTimeZone.getID)
    }

    // can use zero-length byte array for tag cell value
    (put, rowKey)
  }

  def resultToEvent(result: Result, appId: Int): Event = {
    val rowKey = RowKey(result.getRow())

    val eBytes = Bytes.toBytes("e")
    // val e = result.getFamilyMap(eBytes)

    def getStringCol(col: String): String = {
      val r = result.getValue(eBytes, colNames(col))
      require(r != null,
        s"Failed to get value for column ${col}. " +
        s"Rowkey: ${rowKey.toString} " +
        s"StringBinary: ${Bytes.toStringBinary(result.getRow())}.")

      Bytes.toString(r)
    }

    def getLongCol(col: String): Long = {
      val r = result.getValue(eBytes, colNames(col))
      require(r != null,
        s"Failed to get value for column ${col}. " +
        s"Rowkey: ${rowKey.toString} " +
        s"StringBinary: ${Bytes.toStringBinary(result.getRow())}.")

      Bytes.toLong(r)
    }

    def getOptStringCol(col: String): Option[String] = {
      val r = result.getValue(eBytes, colNames(col))
      if (r == null) {
        None
      } else {
        Some(Bytes.toString(r))
      }
    }

    def getTimestamp(col: String): Long = {
      result.getColumnLatestCell(eBytes, colNames(col)).getTimestamp()
    }

    val event = getStringCol("event")
    val entityType = getStringCol("entityType")
    val entityId = getStringCol("entityId")
    val targetEntityType = getOptStringCol("targetEntityType")
    val targetEntityId = getOptStringCol("targetEntityId")
    val properties: DataMap = getOptStringCol("properties")
      .map(s => DataMap(read[JObject](s))).getOrElse(DataMap())
    val prId = getOptStringCol("prId")
    val eventTimeZone = getOptStringCol("eventTimeZone")
      .map(DateTimeZone.forID(_))
      .getOrElse(EventValidation.defaultTimeZone)
    val eventTime = new DateTime(
      getLongCol("eventTime"), eventTimeZone)
    val creationTimeZone = getOptStringCol("creationTimeZone")
      .map(DateTimeZone.forID(_))
      .getOrElse(EventValidation.defaultTimeZone)
    val creationTime: DateTime = new DateTime(
      getLongCol("creationTime"), creationTimeZone)

    Event(
      eventId = Some(RowKey(result.getRow()).toString),
      event = event,
      entityType = entityType,
      entityId = entityId,
      targetEntityType = targetEntityType,
      targetEntityId = targetEntityId,
      properties = properties,
      eventTime = eventTime,
      tags = Nil,
      prId = prId,
      creationTime = creationTime
    )
  }


  // for mandatory field. None means don't care.
  // for optional field. None means don't care.
  //    Some(None) means not exist.
  //    Some(Some(x)) means it should match x
  def createScan(
    startTime: Option[DateTime] = None,
    untilTime: Option[DateTime] = None,
    entityType: Option[String] = None,
    entityId: Option[String] = None,
    eventNames: Option[Seq[String]] = None,
    targetEntityType: Option[Option[String]] = None,
    targetEntityId: Option[Option[String]] = None,
    reversed: Option[Boolean] = None): Scan = {

    val scan: Scan = new Scan()

    (entityType, entityId) match {
      case (Some(et), Some(eid)) => {
        val start = PartialRowKey(et, eid,
          startTime.map(_.getMillis)).toBytes
        // if no untilTime, stop when reach next bytes of entityTypeAndId
        val stop = PartialRowKey(et, eid,
          untilTime.map(_.getMillis).orElse(Some(-1))).toBytes

        if (reversed.getOrElse(false)) {
          // Reversed order.
          // If you specify a startRow and stopRow,
          // to scan in reverse, the startRow needs to be lexicographically
          // after the stopRow.
          scan.setStartRow(stop)
          scan.setStopRow(start)
          scan.setReversed(true)
        } else {
          scan.setStartRow(start)
          scan.setStopRow(stop)
        }
      }
      case (_, _) => {
        val minTime: Long = startTime.map(_.getMillis).getOrElse(0)
        val maxTime: Long = untilTime.map(_.getMillis).getOrElse(Long.MaxValue)
        scan.setTimeRange(minTime, maxTime)
        if (reversed.getOrElse(false)) {
          scan.setReversed(true)
        }
      }
    }

    val filters = new FilterList(FilterList.Operator.MUST_PASS_ALL)

    val eBytes = Bytes.toBytes("e")

    def createBinaryFilter(col: String, value: Array[Byte]): SingleColumnValueFilter = {
      val comp = new BinaryComparator(value)
      new SingleColumnValueFilter(
        eBytes, colNames(col), CompareOp.EQUAL, comp)
    }

    // skip the row if the column exists
    def createSkipRowIfColumnExistFilter(col: String): SkipFilter = {
      val comp = new BinaryComparator(colNames(col))
      val q = new QualifierFilter(CompareOp.NOT_EQUAL, comp)
      // filters an entire row if any of the Cell checks do not pass
      new SkipFilter(q)
    }

    entityType.foreach { et =>
      val compType = new BinaryComparator(Bytes.toBytes(et))
      val filterType = new SingleColumnValueFilter(
        eBytes, colNames("entityType"), CompareOp.EQUAL, compType)
      filters.addFilter(filterType)
    }

    entityId.foreach { eid =>
      val compId = new BinaryComparator(Bytes.toBytes(eid))
      val filterId = new SingleColumnValueFilter(
        eBytes, colNames("entityId"), CompareOp.EQUAL, compId)
      filters.addFilter(filterId)
    }

    eventNames.foreach { eventsList =>
      // match any of event in the eventsList
      val eventFilters = new FilterList(FilterList.Operator.MUST_PASS_ONE)
      eventsList.foreach { e =>
        val compEvent = new BinaryComparator(Bytes.toBytes(e))
        val filterEvent = new SingleColumnValueFilter(
          eBytes, colNames("event"), CompareOp.EQUAL, compEvent)
        eventFilters.addFilter(filterEvent)
      }
      if (!eventFilters.getFilters().isEmpty) {
        filters.addFilter(eventFilters)
      }
    }

    targetEntityType.foreach {
      case None =>
        val filter = createSkipRowIfColumnExistFilter("targetEntityType")
        filters.addFilter(filter)
      case Some(tet) =>
        val filter = createBinaryFilter(
          "targetEntityType", Bytes.toBytes(tet))
        // the entire row will be skipped if the column is not found.
        filter.setFilterIfMissing(true)
        filters.addFilter(filter)
    }

    targetEntityId.foreach {
      case None =>
        val filter = createSkipRowIfColumnExistFilter("targetEntityId")
        filters.addFilter(filter)
      case Some(teid) =>
        val filter = createBinaryFilter(
          "targetEntityId", Bytes.toBytes(teid))
        // the entire row will be skipped if the column is not found.
        filter.setFilterIfMissing(true)
        filters.addFilter(filter)
    }

    if (!filters.getFilters().isEmpty) {
      scan.setFilter(filters)
    }

    scan
  }

}
