// blob: b4230ccd116121e56a73f13589dc398b8c0c50f1 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.predictionio.data.storage.jdbc
import grizzled.slf4j.Logging
import org.apache.predictionio.data.storage.DataMap
import org.apache.predictionio.data.storage.Event
import org.apache.predictionio.data.storage.LEvents
import org.apache.predictionio.data.storage.StorageClientConfig
import org.joda.time.DateTime
import org.joda.time.DateTimeZone
import org.json4s.JObject
import org.json4s.native.Serialization.read
import org.json4s.native.Serialization.write
import scalikejdbc._
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
/** JDBC implementation of [[LEvents]] */
class JDBCLEvents(
client: String,
config: StorageClientConfig,
namespace: String) extends LEvents with Logging {
/** json4s formats picked up implicitly by the `read`/`write` calls in this class.
  *
  * The type is spelled out so implicit resolution does not depend on the
  * inferred singleton type of `DefaultFormats`.
  */
implicit private val formats: org.json4s.Formats = org.json4s.DefaultFormats
/** Creates the event table for the given app/channel if it does not exist yet.
  *
  * When the storage property `INDEX` equals `enabled` (case-insensitive), the
  * `event`, `entityType` and `entityId` columns are declared as `varchar(255)`
  * and indexes on `entityId` and `entityType` are created afterwards. Otherwise
  * those three columns are `text` and no index is created.
  *
  * NOTE(review): unlike the table, the `create index` statements carry no
  * `if not exists` guard, so a second `init` on an already indexed table
  * presumably fails -- confirm against the target database before relying on
  * repeated initialization with `INDEX=enabled`.
  *
  * @param appId     application id used to derive the table name
  * @param channelId optional channel id used to derive the table name
  * @return always `true` once all statements have completed
  */
override def init(appId: Int, channelId: Option[Int] = None): Boolean = {
  // Columns that get an index are declared as bounded VARCHAR(255) instead of TEXT.
  val useIndex = config.properties.get("INDEX").exists(_.equalsIgnoreCase("enabled"))
  val keyColumnType = if (useIndex) "varchar(255)" else "text"
  val tableName = JDBCUtils.eventTableName(namespace, appId, channelId)
  val entityIdIndexName = s"idx_${tableName}_ei"
  val entityTypeIndexName = s"idx_${tableName}_et"
  DB autoCommit { implicit session =>
    // Single DDL template; only the type of the three key columns varies.
    SQL(s"""
create table if not exists $tableName (
id varchar(32) not null primary key,
event $keyColumnType not null,
entityType $keyColumnType not null,
entityId $keyColumnType not null,
targetEntityType text,
targetEntityId text,
properties text,
eventTime timestamp DEFAULT CURRENT_TIMESTAMP,
eventTimeZone varchar(50) not null,
tags text,
prId text,
creationTime timestamp DEFAULT CURRENT_TIMESTAMP,
creationTimeZone varchar(50) not null)""").execute().apply()
    if (useIndex) {
      // create index
      SQL(s"create index $entityIdIndexName on $tableName (entityId)").execute().apply()
      SQL(s"create index $entityTypeIndexName on $tableName (entityType)").execute().apply()
    }
    true
  }
}
/** Drops the event table of the given app/channel.
  *
  * Uses `drop table if exists`, so removing a table that was never created
  * (or was already removed) is a no-op instead of an SQL error. This matches
  * the `create table if not exists` used by `init`.
  *
  * @param appId     application id used to derive the table name
  * @param channelId optional channel id used to derive the table name
  * @return always `true` once the statement has completed
  */
override def remove(appId: Int, channelId: Option[Int] = None): Boolean =
  DB autoCommit { implicit session =>
    SQL(s"""
drop table if exists ${JDBCUtils.eventTableName(namespace, appId, channelId)}
""").execute().apply()
    true
  }
/** Releases database resources by delegating to `ConnectionPool.closeAll()`. */
override def close(): Unit = {
  ConnectionPool.closeAll()
}
/** Asynchronously stores one event in the app/channel event table.
  *
  * The insert runs in a local transaction inside a `Future` on `ec`. The row
  * id is the event's own `eventId` when present, otherwise a value obtained
  * from `JDBCUtils.generateId`. Tags are stored as one comma-joined string,
  * or as no value when the event has no tags.
  *
  * @param event     event to store
  * @param appId     application id used to derive the table name
  * @param channelId optional channel id used to derive the table name
  * @return a future holding the id the event was stored under
  */
override def futureInsert(event: Event, appId: Int, channelId: Option[Int])(
  implicit ec: ExecutionContext): Future[String] = Future {
  DB.localTx { implicit session =>
    val rowId = event.eventId.getOrElse(JDBCUtils.generateId)
    val table = sqls.createUnsafely(JDBCUtils.eventTableName(namespace, appId, channelId))
    // Derived column values, computed up front to keep the statement readable.
    val propertiesJson = write(event.properties.toJObject)
    val eventZoneId = event.eventTime.getZone.getID
    val joinedTags = Some(event.tags).filter(_.nonEmpty).map(_.mkString(","))
    val creationZoneId = event.creationTime.getZone.getID
    sql"""
insert into $table values(
$rowId,
${event.event},
${event.entityType},
${event.entityId},
${event.targetEntityType},
${event.targetEntityId},
$propertiesJson,
${event.eventTime},
$eventZoneId,
$joinedTags,
${event.prId},
${event.creationTime},
$creationZoneId
)
""".update().apply()
    rowId
  }
}
/** Asynchronously stores several events with one batched insert statement.
  *
  * All rows are written in a single local transaction inside a `Future` on
  * `ec`. Each row id is the event's own `eventId` when present, otherwise a
  * value obtained from `JDBCUtils.generateId`.
  *
  * @param events    events to store
  * @param appId     application id used to derive the table name
  * @param channelId optional channel id used to derive the table name
  * @return a future holding the row ids, in the same order as `events`
  */
override def futureInsertBatch(events: Seq[Event], appId: Int, channelId: Option[Int])(
  implicit ec: ExecutionContext): Future[Seq[String]] = Future {
  DB.localTx { implicit session =>
    val ids = events.map(_.eventId.getOrElse(JDBCUtils.generateId))

    // Named bindings for one row; the keys match the {placeholders} in the statement below.
    def rowBindings(event: Event, id: String) = Seq(
      'id -> id,
      'event -> event.event,
      'entityType -> event.entityType,
      'entityId -> event.entityId,
      'targetEntityType -> event.targetEntityType,
      'targetEntityId -> event.targetEntityId,
      'properties -> write(event.properties.toJObject),
      'eventTime -> event.eventTime,
      'eventTimeZone -> event.eventTime.getZone.getID,
      'tags -> (if (event.tags.nonEmpty) Some(event.tags.mkString(",")) else None),
      'prId -> event.prId,
      'creationTime -> event.creationTime,
      'creationTimeZone -> event.creationTime.getZone.getID
    )

    val batchParams = events.zip(ids).map { case (event, id) => rowBindings(event, id) }
    val table = sqls.createUnsafely(JDBCUtils.eventTableName(namespace, appId, channelId))
    sql"""
insert into $table values(
{id},
{event},
{entityType},
{entityId},
{targetEntityType},
{targetEntityId},
{properties},
{eventTime},
{eventTimeZone},
{tags},
{prId},
{creationTime},
{creationTimeZone}
)
""".batchByName(batchParams: _*).apply()
    ids
  }
}
/** Asynchronously looks up a single event by its id.
  *
  * Runs a read-only query inside a `Future` on `ec` and converts the matching
  * row, if any, with `resultToEvent`.
  *
  * @param eventId   id of the event to fetch
  * @param appId     application id used to derive the table name
  * @param channelId optional channel id used to derive the table name
  * @return a future holding the event, or `None` when no row has that id
  */
override def futureGet(eventId: String, appId: Int, channelId: Option[Int])(
  implicit ec: ExecutionContext): Future[Option[Event]] = Future {
  DB.readOnly { implicit session =>
    val table = sqls.createUnsafely(JDBCUtils.eventTableName(namespace, appId, channelId))
    val lookup = sql"""
select
id,
event,
entityType,
entityId,
targetEntityType,
targetEntityId,
properties,
eventTime,
eventTimeZone,
tags,
prId,
creationTime,
creationTimeZone
from $table
where id = $eventId
"""
    lookup.map(row => resultToEvent(row)).single().apply()
  }
}
/** Asynchronously deletes the event with the given id.
  *
  * The delete runs in a local transaction inside a `Future` on `ec`. The
  * result reflects the affected-row count of the statement, so deleting an
  * id that does not exist yields `false` rather than an unconditional `true`.
  *
  * @param eventId   id of the event to delete
  * @param appId     application id used to derive the table name
  * @param channelId optional channel id used to derive the table name
  * @return a future holding `true` if at least one row was deleted, else `false`
  */
override def futureDelete(eventId: String, appId: Int, channelId: Option[Int])(
  implicit ec: ExecutionContext): Future[Boolean] = Future {
  DB.localTx { implicit session =>
    val table = sqls.createUnsafely(JDBCUtils.eventTableName(namespace, appId, channelId))
    // `update()` reports how many rows the statement affected.
    val deletedRows = sql"""
delete from $table where id = $eventId
""".update().apply()
    deletedRows > 0
  }
}
/** Asynchronously queries events of an app/channel with optional filters.
  *
  * Every filter that is `None` is left out of the `where` clause; the
  * remaining ones are combined with AND. The rows are fetched as a list
  * inside a read-only session and returned as an iterator over that list.
  *
  * @param appId            application id used to derive the table name
  * @param channelId        optional channel id used to derive the table name
  * @param startTime        inclusive lower bound on `eventTime`
  * @param untilTime        exclusive upper bound on `eventTime`
  * @param entityType       exact match on `entityType`
  * @param entityId         exact match on `entityId`
  * @param eventNames       event names combined with OR; an empty sequence adds no condition
  * @param targetEntityType `Some(Some(x))` matches `x`, `Some(None)` matches rows where the
  *                         column IS NULL
  * @param targetEntityId   same convention as `targetEntityType`
  * @param limit            maximum number of rows; a negative value or `None` means no limit
  * @param reversed         `Some(true)` orders by `eventTime` descending, anything else ascending
  * @return a future holding an iterator over the matching events
  */
override def futureFind(
  appId: Int,
  channelId: Option[Int] = None,
  startTime: Option[DateTime] = None,
  untilTime: Option[DateTime] = None,
  entityType: Option[String] = None,
  entityId: Option[String] = None,
  eventNames: Option[Seq[String]] = None,
  targetEntityType: Option[Option[String]] = None,
  targetEntityId: Option[Option[String]] = None,
  limit: Option[Int] = None,
  reversed: Option[Boolean] = None
)(implicit ec: ExecutionContext): Future[Iterator[Event]] = Future {
  DB.readOnly { implicit session =>
    val table = sqls.createUnsafely(JDBCUtils.eventTableName(namespace, appId, channelId))

    // One optional condition per filter argument.
    val fromTime = startTime.map(t => sqls"eventTime >= $t")
    val beforeTime = untilTime.map(t => sqls"eventTime < $t")
    val byEntityType = entityType.map(t => sqls"entityType = $t")
    val byEntityId = entityId.map(i => sqls"entityId = $i")
    val byEventName = eventNames.flatMap { names =>
      sqls.toOrConditionOpt(names.map(n => Some(sqls"event = $n")): _*)
    }
    val byTargetType = targetEntityType.map {
      case Some(t) => sqls"targetEntityType = $t"
      case None => sqls"targetEntityType IS NULL"
    }
    val byTargetId = targetEntityId.map {
      case Some(i) => sqls"targetEntityId = $i"
      case None => sqls"targetEntityId IS NULL"
    }

    val whereClause = sqls.toAndConditionOpt(
      fromTime,
      beforeTime,
      byEntityType,
      byEntityId,
      byEventName,
      byTargetType,
      byTargetId
    ).fold(sqls"")(condition => sqls.where(condition))

    val orderByClause = reversed match {
      case Some(true) => sqls"eventTime desc"
      case _ => sqls"eventTime asc"
    }

    val limitClause = limit.filter(_ >= 0).fold(sqls"")(n => sqls.limit(n))

    val query = sql"""
select
id,
event,
entityType,
entityId,
targetEntityType,
targetEntityId,
properties,
eventTime,
eventTimeZone,
tags,
prId,
creationTime,
creationTimeZone
from $table
$whereClause
order by $orderByClause
$limitClause
"""
    query.map(row => resultToEvent(row)).list().apply().toIterator
  }
}
/** Converts the current row of a result set into an [[Event]].
  *
  * Expects the columns selected by the queries in this class. A missing
  * `properties` value becomes an empty `DataMap`, a missing `tags` value
  * becomes `Nil`, and both timestamps are rebuilt in the time zone whose id
  * is stored in the accompanying `*TimeZone` column.
  *
  * @param rs result set positioned on the row to convert
  * @return the event described by that row
  */
private[predictionio] def resultToEvent(rs: WrappedResultSet): Event = {
  // Rebuilds a timestamp in the zone whose id is stored in `zoneColumn`.
  def zonedTime(timeColumn: String, zoneColumn: String): DateTime =
    new DateTime(rs.jodaDateTime(timeColumn), DateTimeZone.forID(rs.string(zoneColumn)))

  Event(
    eventId = rs.stringOpt("id"),
    event = rs.string("event"),
    entityType = rs.string("entityType"),
    entityId = rs.string("entityId"),
    targetEntityType = rs.stringOpt("targetEntityType"),
    targetEntityId = rs.stringOpt("targetEntityId"),
    properties = rs.stringOpt("properties")
      .fold(DataMap())(json => DataMap(read[JObject](json))),
    eventTime = zonedTime("eventTime", "eventTimeZone"),
    // Tags are stored as a single comma-joined string.
    tags = rs.stringOpt("tags").fold(List.empty[String])(_.split(",").toList),
    prId = rs.stringOpt("prId"),
    creationTime = zonedTime("creationTime", "creationTimeZone")
  )
}
}