/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.predictionio.data.store.python

import java.sql.Timestamp

import org.apache.predictionio.data.store.PEventStore
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.joda.time.DateTime

/** This object provides a set of operations to access the Event Store
* with Spark's parallelization
*/
object PPythonEventStore {
/** Read events from Event Store
*
* @param appName return events of this app
* @param channelName return events of this channel (default channel if null)
* @param startTime return events with eventTime >= startTime (no lower bound if null)
* @param untilTime return events with eventTime < untilTime (no upper bound if null)
* @param entityType return events of this entityType (any entityType if null)
* @param entityId return events of this entityId (any entityId if null)
* @param eventNames return events with any of these event names (any event name if null)
* @param targetEntityType return events of this targetEntityType. The value is
* passed to PEventStore.find as Some(Option(targetEntityType)), so null matches
* only events that have no targetEntityType, and a non-null value must match exactly.
* @param targetEntityId return events of this targetEntityId. The value is
* passed to PEventStore.find as Some(Option(targetEntityId)), so null matches
* only events that have no targetEntityId, and a non-null value must match exactly.
* @param spark Spark session
* @return DataFrame of the matching events
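*
* A minimal usage sketch (the app and entity names and the `spark` session are
* placeholders, not part of this API's contract):
* {{{
* // Nulls lift the corresponding restriction, except that null target fields
* // match only events that carry no target entity.
* val events = PPythonEventStore.find(
*   "MyApp", null, null, null, "user", null, null, null, null)(spark)
* }}}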
*/
def find(
appName: String,
channelName: String,
startTime: Timestamp,
untilTime: Timestamp,
entityType: String,
entityId: String,
eventNames: Array[String],
targetEntityType: String,
targetEntityId: String
)(spark: SparkSession): DataFrame = {
import spark.implicits._
val colNames: Seq[String] =
Seq(
"eventId",
"event",
"entityType",
"entityId",
"targetEntityType",
"targetEntityId",
"eventTime",
"tags",
"prId",
"creationTime",
"fields"
)
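// Wrap the nullable arguments in Option (null becomes None) and delegate to
// PEventStore.find; each Event is flattened into a tuple so that Spark can
// derive the DataFrame schema from it.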
PEventStore.find(appName,
Option(channelName),
Option(startTime).map(t => new DateTime(t.getTime)),
Option(untilTime).map(t => new DateTime(t.getTime)),
Option(entityType),
Option(entityId),
Option(eventNames),
Option(Option(targetEntityType)),
Option(Option(targetEntityId)))(spark.sparkContext).map { e =>
(
e.eventId,
e.event,
e.entityType,
e.entityId,
e.targetEntityType.orNull,
e.targetEntityId.orNull,
new Timestamp(e.eventTime.getMillis),
e.tags.mkString("\t"),
e.prId.orNull,
new Timestamp(e.creationTime.getMillis),
e.properties.fields.mapValues(_.values.toString)
)
}.toDF(colNames: _*)
}

/** Aggregate properties of entities based on the special events
* \$set, \$unset, and \$delete.
*
* @param appName use events of this app
* @param entityType aggregate properties of the entities of this entityType
* @param channelName use events of this channel (default channel if null)
* @param startTime use events with eventTime >= startTime (no lower bound if null)
* @param untilTime use events with eventTime < untilTime (no upper bound if null)
* @param required only keep entities with these required properties defined
* @param spark Spark session
* @return DataFrame with one row per entity: entityId, firstUpdated, lastUpdated,
* and a fields map of the aggregated properties
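*
* A minimal usage sketch (the app name and the required property "plan" are
* placeholders):
* {{{
* val users = PPythonEventStore.aggregateProperties(
*   "MyApp", "user", null, null, null, Array("plan"))(spark)
* }}}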
*/
def aggregateProperties(
appName: String,
entityType: String,
channelName: String,
startTime: Timestamp,
untilTime: Timestamp,
required: Array[String]
)(spark: SparkSession): DataFrame = {
import spark.implicits._
val colNames: Seq[String] =
Seq(
"entityId",
"firstUpdated",
"lastUpdated",
"fields"
)
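// Delegate to PEventStore.aggregateProperties and flatten each
// (entityId, PropertyMap) pair into a row; property values are stringified
// so the fields column has a uniform Map[String, String] type.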
PEventStore.aggregateProperties(appName,
entityType,
Option(channelName),
Option(startTime).map(t => new DateTime(t.getTime)),
Option(untilTime).map(t => new DateTime(t.getTime)),
Option(required).map(_.toSeq))(spark.sparkContext).map { x =>
val m = x._2
(x._1,
new Timestamp(m.firstUpdated.getMillis),
new Timestamp(m.lastUpdated.getMillis),
m.fields.mapValues(_.values.toString)
)
}.toDF(colNames: _*)
}
}