blob: dc874c8f03433a3ff9d78a2094bdc6037f2e0d0e [file] [log] [blame]
/** Copyright 2014 TappingStone, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.prediction.engines.base
import io.prediction.controller.LDataSource
import io.prediction.controller.Params
import io.prediction.controller.EmptyDataParams
import io.prediction.data.view.LBatchView
import org.joda.time.DateTime
import scala.reflect.ClassTag
import grizzled.slf4j.Logger
/** Parameters consumed by an events-based data source.
  *
  * Concrete engine params classes provide the values; every member is abstract.
  */
abstract class AbstractEventsDataSourceParams extends Params {
  /** Id of the app whose events and entity properties are read. */
  val appId: Int

  /** Event start time; `None` means no lower bound is passed on. */
  val startTime: Option[DateTime]

  /** Event until time; `None` means no upper bound is passed on. */
  val untilTime: Option[DateTime]

  /** Actions (event names) to keep as training data. */
  val actions: Set[String]

  /** Train only items having at least one of these itypes.
    * Defaults to `None` to include items of all itypes.
    */
  val itypes: Option[Set[String]]

  /** Entity types and property/attribute names used when reading the data. */
  val attributeNames: AttributeNames
}
/** Local data source that builds [[TrainingData]] from an app's events.
  *
  * Users and items are read as aggregated entity properties through an
  * [[LBatchView]] created from `dsp.appId`, `dsp.startTime` and
  * `dsp.untilTime`. User-to-item action events are then mapped onto the
  * integer indices assigned to those users and items.
  *
  * @tparam DP data params type (a `ClassTag` is required)
  * @tparam Q  query type
  * @tparam A  actual result type
  * @param dsp data source parameters
  */
class EventsDataSource[DP: ClassTag, Q, A](
    dsp: AbstractEventsDataSourceParams)
  extends LDataSource[AbstractEventsDataSourceParams,
    DP, TrainingData, Q, A] {

  @transient lazy val logger = Logger[this.type]

  @transient lazy val batchView = new LBatchView(dsp.appId,
    dsp.startTime, dsp.untilTime)

  /** Reads users, items and u2i actions into a [[TrainingData]].
    *
    * Users and items are assigned indices starting from 1. An item is kept only
    * if `dsp.itypes` is `None` or the item shares at least one itype with it.
    * A u2i event is kept when its name is in both `attributeNames.u2iActions`
    * and `dsp.actions`, its `entityId` is a known user, and its target (when
    * present) is a kept item.
    *
    * @throws IllegalArgumentException if a kept u2i event has no targetEntityId
    * @throws Exception rethrown after logging when an item's properties or a
    *   u2i event cannot be converted
    */
  override def readTraining(): TrainingData = {
    val attributeNames = dsp.attributeNames

    // uid => (UserTD, uindex). Indexing via an iterator avoids materialising an
    // intermediate collection keyed by (entityId, DataMap), which would hash
    // every user's property map just to attach an index.
    val usersMap: Map[String, (UserTD, Int)] = batchView
      .aggregateProperties(attributeNames.user)
      .iterator
      .zipWithIndex
      .map { case ((entityId, _), index) =>
        // make index starting from 1
        (entityId, (new UserTD(uid = entityId), index + 1))
      }
      .toMap

    // iid => (ItemTD, iindex), restricted to items passing the itypes filter.
    val itemsMap: Map[String, (ItemTD, Int)] = batchView
      .aggregateProperties(attributeNames.item)
      .iterator
      .map { case (entityId, dataMap) =>
        val itemTD = try {
          new ItemTD(
            iid = entityId,
            itypes = dataMap.get[List[String]](attributeNames.itypes),
            starttime = dataMap.getOpt[DateTime](attributeNames.starttime)
              .map(_.getMillis),
            endtime = dataMap.getOpt[DateTime](attributeNames.endtime)
              .map(_.getMillis),
            inactive = dataMap.getOpt[Boolean](attributeNames.inactive)
              .getOrElse(false)
          )
        } catch {
          case exception: Exception =>
            logger.error(s"${exception}: entityType ${attributeNames.item} " +
              s"entityID ${entityId}: ${dataMap}." )
            throw exception
        }
        (entityId, itemTD)
      }
      .filter { case (_, itemTD) =>
        // None keeps every item; otherwise at least one itype must match.
        dsp.itypes.forall(wanted => itemTD.itypes.exists(wanted.contains))
      }
      .zipWithIndex
      .map { case ((entityId, itemTD), index) =>
        // make index starting from 1
        (entityId, (itemTD, index + 1))
      }
      .toMap

    val u2iActions = batchView.events
      .filter { e =>
        attributeNames.u2iActions.contains(e.event) &&
        dsp.actions.contains(e.event) &&
        usersMap.contains(e.entityId) &&
        // Events without a targetEntityId are deliberately let through (this is
        // an error case) so the next step can flag them.
        e.targetEntityId.forall(itemsMap.contains)
      }
      .map { e =>
        // Same exception type and message that `require` would raise.
        val targetEntityId = e.targetEntityId.getOrElse {
          throw new IllegalArgumentException(
            s"requirement failed: u2i Event: ${e} cannot have targetEntityId empty.")
        }
        try {
          new U2IActionTD(
            uindex = usersMap(e.entityId)._2,
            iindex = itemsMap(targetEntityId)._2,
            action = e.event,
            v = e.properties.getOpt[Int](attributeNames.rating),
            t = e.eventTime.getMillis
          )
        } catch {
          case exception: Exception =>
            logger.error(s"${exception}: event ${e}.")
            throw exception
        }
      }

    new TrainingData(
      users = usersMap.map { case (_, (userTD, uindex)) => (uindex, userTD) },
      items = itemsMap.map { case (_, (itemTD, iindex)) => (iindex, itemTD) },
      u2iActions = u2iActions
    )
  }
}