blob: d723d07908a4041bd65fc72e45ad020f86b63556 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.predictionio.data.storage.jdbc
import org.apache.predictionio.data.storage.{Event, LEvents, PropertyMap, Storage}
import org.specs2._
import org.specs2.specification.Step
class LEventsSpec extends Specification with TestEvents {
def is = s2"""
PredictionIO Storage LEvents Specification
Events can be implemented by:
- JDBCLEvents ${jdbcLEvents}
"""
def jdbcLEvents = sequential ^ s2"""
JDBCLEvents should
- behave like any LEvents implementation ${events(jdbcDO)}
"""
val appId = 1
def events(eventClient: LEvents) = sequential ^ s2"""
init default ${initDefault(eventClient)}
insert 3 test events and get back by event ID ${insertAndGetEvents(eventClient)}
insert 3 test events with timezone and get back by event ID ${insertAndGetTimezone(eventClient)}
insert and delete by ID ${insertAndDelete(eventClient)}
insert test user events ${insertTestUserEvents(eventClient)}
find user events ${findUserEvents(eventClient)}
aggregate user properties ${aggregateUserProperties(eventClient)}
aggregate one user properties ${aggregateOneUserProperties(eventClient)}
aggregate non-existent user properties ${aggregateNonExistentUserProperties(eventClient)}
init channel ${initChannel(eventClient)}
insert 2 events to channel ${insertChannel(eventClient)}
insert 1 event to channel and delete by ID ${insertAndDeleteChannel(eventClient)}
find events from channel ${findChannel(eventClient)}
remove default ${removeDefault(eventClient)}
remove channel ${removeChannel(eventClient)}
"""
val dbName = "test_pio_storage_events_" + hashCode
def jdbcDO = Storage.getDataObject[LEvents](StorageTestUtils.jdbcSourceName, dbName)
def initDefault(eventClient: LEvents) = {
eventClient.init(appId)
}
def insertAndGetEvents(eventClient: LEvents) = {
// events from TestEvents trait
val listOfEvents = List(r1,r2,r3)
val insertResp = listOfEvents.map { eventClient.insert(_, appId) }
val insertedEventId: List[String] = insertResp
val insertedEvent: List[Option[Event]] = listOfEvents.zip(insertedEventId)
.map { case (e, id) => Some(e.copy(eventId = Some(id))) }
val getResp = insertedEventId.map { id => eventClient.get(id, appId) }
val getEvents = getResp
insertedEvent must containTheSameElementsAs(getEvents)
}
def insertAndGetTimezone(eventClient: LEvents) = {
val listOfEvents = List(tz1, tz2, tz3)
val insertResp = listOfEvents.map { eventClient.insert(_, appId) }
val insertedEventId: List[String] = insertResp
val insertedEvent: List[Option[Event]] = listOfEvents.zip(insertedEventId)
.map { case (e, id) => Some(e.copy(eventId = Some(id))) }
val getResp = insertedEventId.map { id => eventClient.get(id, appId) }
val getEvents = getResp
insertedEvent must containTheSameElementsAs(getEvents)
}
def insertAndDelete(eventClient: LEvents) = {
val eventId = eventClient.insert(r2, appId)
val resultBefore = eventClient.get(eventId, appId)
val expectedBefore = r2.copy(eventId = Some(eventId))
val deleteStatus = eventClient.delete(eventId, appId)
val resultAfter = eventClient.get(eventId, appId)
(resultBefore must beEqualTo(Some(expectedBefore))) and
(deleteStatus must beEqualTo(true)) and
(resultAfter must beEqualTo(None))
}
def insertTestUserEvents(eventClient: LEvents) = {
// events from TestEvents trait
val listOfEvents = Vector(u1e5, u2e2, u1e3, u1e1, u2e3, u2e1, u1e4, u1e2)
listOfEvents.map{ eventClient.insert(_, appId) }
success
}
def findUserEvents(eventClient: LEvents) = {
val results: List[Event] = eventClient.find(
appId = appId,
entityType = Some("user"))
.toList
.map(e => e.copy(eventId = None)) // ignore eventID
// same events in insertTestUserEvents
val expected = List(u1e5, u2e2, u1e3, u1e1, u2e3, u2e1, u1e4, u1e2)
results must containTheSameElementsAs(expected)
}
def aggregateUserProperties(eventClient: LEvents) = {
val result: Map[String, PropertyMap] = eventClient.aggregateProperties(
appId = appId,
entityType = "user")
val expected = Map(
"u1" -> PropertyMap(u1, u1BaseTime, u1LastTime),
"u2" -> PropertyMap(u2, u2BaseTime, u2LastTime)
)
result must beEqualTo(expected)
}
def aggregateOneUserProperties(eventClient: LEvents) = {
val result: Option[PropertyMap] = eventClient.aggregatePropertiesOfEntity(
appId = appId,
entityType = "user",
entityId = "u1")
val expected = Some(PropertyMap(u1, u1BaseTime, u1LastTime))
result must beEqualTo(expected)
}
def aggregateNonExistentUserProperties(eventClient: LEvents) = {
val result: Option[PropertyMap] = eventClient.aggregatePropertiesOfEntity(
appId = appId,
entityType = "user",
entityId = "u999999")
result must beEqualTo(None)
}
val channelId = 12
def initChannel(eventClient: LEvents) = {
eventClient.init(appId, Some(channelId))
}
def insertChannel(eventClient: LEvents) = {
// events from TestEvents trait
val listOfEvents = List(r4,r5)
listOfEvents.map( eventClient.insert(_, appId, Some(channelId)) )
success
}
def insertAndDeleteChannel(eventClient: LEvents) = {
val eventId = eventClient.insert(r2, appId, Some(channelId))
val resultBefore = eventClient.get(eventId, appId, Some(channelId))
val expectedBefore = r2.copy(eventId = Some(eventId))
val deleteStatus = eventClient.delete(eventId, appId, Some(channelId))
val resultAfter = eventClient.get(eventId, appId, Some(channelId))
(resultBefore must beEqualTo(Some(expectedBefore))) and
(deleteStatus must beEqualTo(true)) and
(resultAfter must beEqualTo(None))
}
def findChannel(eventClient: LEvents) = {
val results: List[Event] = eventClient.find(
appId = appId,
channelId = Some(channelId)
)
.toList
.map(e => e.copy(eventId = None)) // ignore eventId
// same events in insertChannel
val expected = List(r4, r5)
results must containTheSameElementsAs(expected)
}
def removeDefault(eventClient: LEvents) = {
eventClient.remove(appId)
}
def removeChannel(eventClient: LEvents) = {
eventClient.remove(appId, Some(channelId))
}
}