blob: f8060562e18d5a2a9a544efd03968551a74fd45f [file] [log] [blame]
/** Copyright 2015 TappingStone, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.prediction.data.view
import io.prediction.data.storage.Event
import io.prediction.data.storage.EventValidation
import io.prediction.data.storage.DataMap
import io.prediction.data.storage.Storage
import org.joda.time.DateTime
import scala.language.implicitConversions
import scala.concurrent.ExecutionContext.Implicits.global // TODO
@deprecated("Use LEvents or LEventStore instead.", "0.9.2")
object ViewPredicates {
def getStartTimePredicate(startTimeOpt: Option[DateTime])
: (Event => Boolean) = {
startTimeOpt.map(getStartTimePredicate).getOrElse(_ => true)
}
def getStartTimePredicate(startTime: DateTime): (Event => Boolean) = {
e => (!(e.eventTime.isBefore(startTime) || e.eventTime.isEqual(startTime)))
}
def getUntilTimePredicate(untilTimeOpt: Option[DateTime])
: (Event => Boolean) = {
untilTimeOpt.map(getUntilTimePredicate).getOrElse(_ => true)
}
def getUntilTimePredicate(untilTime: DateTime): (Event => Boolean) = {
_.eventTime.isBefore(untilTime)
}
def getEntityTypePredicate(entityTypeOpt: Option[String]): (Event => Boolean)
= {
entityTypeOpt.map(getEntityTypePredicate).getOrElse(_ => true)
}
def getEntityTypePredicate(entityType: String): (Event => Boolean) = {
(_.entityType == entityType)
}
def getEventPredicate(eventOpt: Option[String]): (Event => Boolean)
= {
eventOpt.map(getEventPredicate).getOrElse(_ => true)
}
def getEventPredicate(event: String): (Event => Boolean) = {
(_.event == event)
}
}
@deprecated("Use LEvents instead.", "0.9.2")
object ViewAggregators {
def getDataMapAggregator(): ((Option[DataMap], Event) => Option[DataMap]) = {
(p, e) => {
e.event match {
case "$set" => {
if (p == None) {
Some(e.properties)
} else {
p.map(_ ++ e.properties)
}
}
case "$unset" => {
if (p == None) {
None
} else {
p.map(_ -- e.properties.keySet)
}
}
case "$delete" => None
case _ => p // do nothing for others
}
}
}
}
@deprecated("Use LEvents instead.", "0.9.2")
object EventSeq {
// Need to
// >>> import scala.language.implicitConversions
// to enable implicit conversion. Only import in the code where this is
// necessary to avoid confusion.
implicit def eventSeqToList(es: EventSeq): List[Event] = es.events
implicit def listToEventSeq(l: List[Event]): EventSeq = new EventSeq(l)
}
@deprecated("Use LEvents instead.", "0.9.2")
class EventSeq(val events: List[Event]) {
def filter(
eventOpt: Option[String] = None,
entityTypeOpt: Option[String] = None,
startTimeOpt: Option[DateTime] = None,
untilTimeOpt: Option[DateTime] = None): EventSeq = {
events
.filter(ViewPredicates.getEventPredicate(eventOpt))
.filter(ViewPredicates.getStartTimePredicate(startTimeOpt))
.filter(ViewPredicates.getUntilTimePredicate(untilTimeOpt))
.filter(ViewPredicates.getEntityTypePredicate(entityTypeOpt))
}
def filter(p: (Event => Boolean)): EventSeq = events.filter(p)
def aggregateByEntityOrdered[T](init: T, op: (T, Event) => T)
: Map[String, T] = {
events
.groupBy( _.entityId )
.mapValues( _.sortBy(_.eventTime.getMillis).foldLeft[T](init)(op))
.toMap
}
}
@deprecated("Use LEventStore instead.", "0.9.2")
class LBatchView(
val appId: Int,
val startTime: Option[DateTime],
val untilTime: Option[DateTime]) {
@transient lazy val eventsDb = Storage.getLEvents()
@transient lazy val _events = eventsDb.find(
appId = appId,
startTime = startTime,
untilTime = untilTime).toList
@transient lazy val events: EventSeq = new EventSeq(_events)
/* Aggregate event data
*
* @param entityType only aggregate event with entityType
* @param startTimeOpt if specified, only aggregate event after (inclusive)
* startTimeOpt
* @param untilTimeOpt if specified, only aggregate event until (exclusive)
* endTimeOpt
*/
def aggregateProperties(
entityType: String,
startTimeOpt: Option[DateTime] = None,
untilTimeOpt: Option[DateTime] = None
): Map[String, DataMap] = {
events
.filter(entityTypeOpt = Some(entityType))
.filter(e => EventValidation.isSpecialEvents(e.event))
.aggregateByEntityOrdered(
init = None,
op = ViewAggregators.getDataMapAggregator())
.filter{ case (k, v) => (v != None) }
.mapValues(_.get)
}
/*
def aggregateByEntityOrdered[T](
predicate: Event => Boolean,
init: T,
op: (T, Event) => T): Map[String, T] = {
_events
.filter( predicate(_) )
.groupBy( _.entityId )
.mapValues( _.sortBy(_.eventTime.getMillis).foldLeft[T](init)(op))
.toMap
}
*/
/*
def groupByEntityOrdered[T](
predicate: Event => Boolean,
map: Event => T): Map[String, Seq[T]] = {
_events
.filter( predicate(_) )
.groupBy( _.entityId )
.mapValues( _.sortBy(_.eventTime.getMillis).map(map(_)) )
.toMap
}
*/
}