| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| |
| package org.apache.predictionio.data.store.java |
| |
| import java.util.concurrent.{CompletableFuture, CompletionStage, ExecutorService} |
| |
| import org.apache.predictionio.data.storage.Event |
| import org.apache.predictionio.data.store.LEventStore |
| import org.joda.time.DateTime |
| |
| import scala.collection.JavaConversions |
| import scala.concurrent.duration.Duration |
| import scala.compat.java8.FutureConverters._ |
| |
| /** This Java-friendly object provides a set of operation to access Event Store |
| * without going through Spark's parallelization. |
| * |
| * Note that blocking methods of this object uses |
| * `scala.concurrent.ExecutionContext.Implicits.global` internally. |
| * Since this is a thread pool which has a number of threads equal to available |
| * processors, parallelism is limited up to the number of processors. |
| * |
| * If this limitation become bottleneck of resource usage, you can increase the |
| * number of threads by declaring following VM options before calling "pio deploy": |
| * |
| * <pre> |
| * export JAVA_OPTS="$JAVA_OPTS \ |
| * -Dscala.concurrent.context.numThreads=1000 \ |
| * -Dscala.concurrent.context.maxThreads=1000" |
| * </pre> |
| * |
| * You can learn more about the global execution context in the Scala documentation: |
| * [[https://docs.scala-lang.org/overviews/core/futures.html#the-global-execution-context]] |
| */ |
| object LJavaEventStore { |
| |
| /** Reads events of the specified entity. May use this in Algorithm's predict() |
| * or Serving logic to have fast event store access. |
| * |
| * @param appName return events of this app |
| * @param entityType return events of this entityType |
| * @param entityId return events of this entityId |
| * @param channelName return events of this channel (default channel if it's None) |
| * @param eventNames return events with any of these event names. |
| * @param targetEntityType return events of this targetEntityType: |
| * - None means no restriction on targetEntityType |
| * - Some(None) means no targetEntityType for this event |
| * - Some(Some(x)) means targetEntityType should match x. |
| * @param targetEntityId return events of this targetEntityId |
| * - None means no restriction on targetEntityId |
| * - Some(None) means no targetEntityId for this event |
| * - Some(Some(x)) means targetEntityId should match x. |
| * @param startTime return events with eventTime >= startTime |
| * @param untilTime return events with eventTime < untilTime |
| * @param limit Limit number of events. Get all events if None or Some(-1) |
| * @param latest Return latest event first |
| * @return java.util.List[Event] |
| */ |
| def findByEntity( |
| appName: String, |
| entityType: String, |
| entityId: String, |
| channelName: Option[String], |
| eventNames: Option[java.util.List[String]], |
| targetEntityType: Option[Option[String]], |
| targetEntityId: Option[Option[String]], |
| startTime: Option[DateTime], |
| untilTime: Option[DateTime], |
| limit: Option[Integer], |
| latest: Boolean, |
| timeout: Duration): java.util.List[Event] = { |
| |
| val eventNamesSeq = eventNames.map(JavaConversions.asScalaBuffer(_).toSeq) |
| val limitInt = limit.map(_.intValue()) |
| |
| JavaConversions.seqAsJavaList( |
| LEventStore.findByEntity( |
| appName, |
| entityType, |
| entityId, |
| channelName, |
| eventNamesSeq, |
| targetEntityType, |
| targetEntityId, |
| startTime, |
| untilTime, |
| limitInt, |
| latest, |
| timeout |
| ).toSeq) |
| } |
| |
| /** Reads events of the specified entity. May use this in Algorithm's predict() |
| * or Serving logic to have fast event store access. |
| * |
| * @param appName return events of this app |
| * @param entityType return events of this entityType |
| * @param entityId return events of this entityId |
| * @param channelName return events of this channel (default channel if it's None) |
| * @param eventNames return events with any of these event names. |
| * @param targetEntityType return events of this targetEntityType: |
| * - None means no restriction on targetEntityType |
| * - Some(None) means no targetEntityType for this event |
| * - Some(Some(x)) means targetEntityType should match x. |
| * @param targetEntityId return events of this targetEntityId |
| * - None means no restriction on targetEntityId |
| * - Some(None) means no targetEntityId for this event |
| * - Some(Some(x)) means targetEntityId should match x. |
| * @param startTime return events with eventTime >= startTime |
| * @param untilTime return events with eventTime < untilTime |
| * @param limit Limit number of events. Get all events if None or Some(-1) |
| * @param latest Return latest event first |
| * @return CompletableFuture[java.util.List[Event]] |
| */ |
| def findByEntityAsync( |
| appName: String, |
| entityType: String, |
| entityId: String, |
| channelName: Option[String], |
| eventNames: Option[java.util.List[String]], |
| targetEntityType: Option[Option[String]], |
| targetEntityId: Option[Option[String]], |
| startTime: Option[DateTime], |
| untilTime: Option[DateTime], |
| limit: Option[Integer], |
| latest: Boolean, |
| executorService: ExecutorService): CompletableFuture[java.util.List[Event]] = { |
| |
| val eventNamesSeq = eventNames.map(JavaConversions.asScalaBuffer(_).toSeq) |
| val limitInt = limit.map(_.intValue()) |
| implicit val ec = fromExecutorService(executorService) |
| |
| LEventStore.findByEntityAsync( |
| appName, |
| entityType, |
| entityId, |
| channelName, |
| eventNamesSeq, |
| targetEntityType, |
| targetEntityId, |
| startTime, |
| untilTime, |
| limitInt, |
| latest |
| ).map { x => JavaConversions.seqAsJavaList(x.toSeq) }.toJava.toCompletableFuture |
| } |
| |
| /** Reads events generically. If entityType or entityId is not specified, it |
| * results in table scan. |
| * |
| * @param appName return events of this app |
| * @param entityType return events of this entityType |
| * - None means no restriction on entityType |
| * - Some(x) means entityType should match x. |
| * @param entityId return events of this entityId |
| * - None means no restriction on entityId |
| * - Some(x) means entityId should match x. |
| * @param channelName return events of this channel (default channel if it's None) |
| * @param eventNames return events with any of these event names. |
| * @param targetEntityType return events of this targetEntityType: |
| * - None means no restriction on targetEntityType |
| * - Some(None) means no targetEntityType for this event |
| * - Some(Some(x)) means targetEntityType should match x. |
| * @param targetEntityId return events of this targetEntityId |
| * - None means no restriction on targetEntityId |
| * - Some(None) means no targetEntityId for this event |
| * - Some(Some(x)) means targetEntityId should match x. |
| * @param startTime return events with eventTime >= startTime |
| * @param untilTime return events with eventTime < untilTime |
| * @param limit Limit number of events. Get all events if None or Some(-1) |
| * @return java.util.List[Event] |
| */ |
| def find( |
| appName: String, |
| entityType: Option[String], |
| entityId: Option[String], |
| channelName: Option[String], |
| eventNames: Option[java.util.List[String]], |
| targetEntityType: Option[Option[String]], |
| targetEntityId: Option[Option[String]], |
| startTime: Option[DateTime], |
| untilTime: Option[DateTime], |
| limit: Option[Integer], |
| timeout: Duration): java.util.List[Event] = { |
| |
| val eventNamesSeq = eventNames.map(JavaConversions.asScalaBuffer(_).toSeq) |
| val limitInt = limit.map(_.intValue()) |
| |
| JavaConversions.seqAsJavaList( |
| LEventStore.find( |
| appName, |
| entityType, |
| entityId, |
| channelName, |
| eventNamesSeq, |
| targetEntityType, |
| targetEntityId, |
| startTime, |
| untilTime, |
| limitInt, |
| timeout |
| ).toSeq) |
| } |
| |
| /** Reads events generically. If entityType or entityId is not specified, it |
| * results in table scan. |
| * |
| * @param appName return events of this app |
| * @param entityType return events of this entityType |
| * - None means no restriction on entityType |
| * - Some(x) means entityType should match x. |
| * @param entityId return events of this entityId |
| * - None means no restriction on entityId |
| * - Some(x) means entityId should match x. |
| * @param channelName return events of this channel (default channel if it's None) |
| * @param eventNames return events with any of these event names. |
| * @param targetEntityType return events of this targetEntityType: |
| * - None means no restriction on targetEntityType |
| * - Some(None) means no targetEntityType for this event |
| * - Some(Some(x)) means targetEntityType should match x. |
| * @param targetEntityId return events of this targetEntityId |
| * - None means no restriction on targetEntityId |
| * - Some(None) means no targetEntityId for this event |
| * - Some(Some(x)) means targetEntityId should match x. |
| * @param startTime return events with eventTime >= startTime |
| * @param untilTime return events with eventTime < untilTime |
| * @param limit Limit number of events. Get all events if None or Some(-1) |
| * @return CompletableFuture[java.util.List[Event]] |
| */ |
| def findAsync( |
| appName: String, |
| entityType: Option[String], |
| entityId: Option[String], |
| channelName: Option[String], |
| eventNames: Option[java.util.List[String]], |
| targetEntityType: Option[Option[String]], |
| targetEntityId: Option[Option[String]], |
| startTime: Option[DateTime], |
| untilTime: Option[DateTime], |
| limit: Option[Integer], |
| executorService: ExecutorService): CompletableFuture[java.util.List[Event]] = { |
| |
| val eventNamesSeq = eventNames.map(JavaConversions.asScalaBuffer(_).toSeq) |
| val limitInt = limit.map(_.intValue()) |
| implicit val ec = fromExecutorService(executorService) |
| |
| LEventStore.findAsync( |
| appName, |
| entityType, |
| entityId, |
| channelName, |
| eventNamesSeq, |
| targetEntityType, |
| targetEntityId, |
| startTime, |
| untilTime, |
| limitInt |
| ).map { x => JavaConversions.seqAsJavaList(x.toSeq) }.toJava.toCompletableFuture |
| } |
| |
| } |