use java event store and data map methods
diff --git a/src/main/java/org/template/recommendation/Algorithm.java b/src/main/java/org/template/recommendation/Algorithm.java
index 3cf663f..1f89fbe 100644
--- a/src/main/java/org/template/recommendation/Algorithm.java
+++ b/src/main/java/org/template/recommendation/Algorithm.java
@@ -3,7 +3,8 @@
import com.google.common.collect.Sets;
import io.prediction.controller.PAlgorithm;
import io.prediction.data.storage.Event;
-import io.prediction.data.store.LEventStore;
+import io.prediction.data.store.JavaOptionHelper;
+import io.prediction.data.store.LJavaEventStore;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
@@ -19,7 +20,6 @@
import org.slf4j.LoggerFactory;
import scala.Option;
import scala.Tuple2;
-import scala.collection.JavaConversions;
import scala.concurrent.duration.Duration;
import java.util.ArrayList;
@@ -195,20 +195,19 @@
try {
List<double[]> result = new ArrayList<>();
- List<Event> events = JavaConversions.asJavaList(LEventStore.findByEntity(
+ List<Event> events = LJavaEventStore.findByEntity(
ap.getAppName(),
"user",
query.getUserEntityId(),
- Option.apply((String) null),
- Option.apply(JavaConversions.asScalaIterable(ap.getSimilarItemEvents()).toSeq()),
- Option.apply(Option.apply("item")),
- Option.apply((Option<String>) null),
- Option.apply((DateTime) null),
- Option.apply((DateTime) null),
- Option.apply((Object) 10),
+ JavaOptionHelper.<String>none(),
+ JavaOptionHelper.some(ap.getSimilarItemEvents()),
+ JavaOptionHelper.some(JavaOptionHelper.some("item")),
+ JavaOptionHelper.<Option<String>>none(),
+ JavaOptionHelper.<DateTime>none(),
+ JavaOptionHelper.<DateTime>none(),
+ JavaOptionHelper.some(10),
true,
- Duration.apply(200, TimeUnit.MILLISECONDS)
- ).toSeq());
+ Duration.apply(10, TimeUnit.SECONDS));
for (final Event event : events) {
if (event.targetEntityId().isDefined()) {
@@ -337,26 +336,25 @@
private Set<String> unavailableItemEntityIds() {
try {
- List<Event> unavailableConstraintEvents = JavaConversions.asJavaList(LEventStore.findByEntity(
+ List<Event> unavailableConstraintEvents = LJavaEventStore.findByEntity(
ap.getAppName(),
"constraint",
"unavailableItems",
- Option.apply((String) null),
- Option.apply(JavaConversions.asScalaIterable(Collections.singletonList("$set")).toSeq()),
- Option.apply(Option.apply((String) null)),
- Option.apply((Option<String>) null),
- Option.apply((DateTime) null),
- Option.apply((DateTime) null),
- Option.apply((Object) 1),
+ JavaOptionHelper.<String>none(),
+ JavaOptionHelper.some(Collections.singletonList("$set")),
+ JavaOptionHelper.<Option<String>>none(),
+ JavaOptionHelper.<Option<String>>none(),
+ JavaOptionHelper.<DateTime>none(),
+ JavaOptionHelper.<DateTime>none(),
+ JavaOptionHelper.some(1),
true,
- Duration.apply(200, TimeUnit.MILLISECONDS)
- ).toSeq());
+ Duration.apply(10, TimeUnit.SECONDS));
if (unavailableConstraintEvents.isEmpty()) return Collections.emptySet();
Event unavailableConstraint = unavailableConstraintEvents.get(0);
- List<String> unavailableItems = JavaConversions.asJavaList(Helper.dataMapGetStringList(unavailableConstraint.properties(), "items"));
+ List<String> unavailableItems = unavailableConstraint.properties().getStringList("items");
return new HashSet<>(unavailableItems);
} catch (Exception e) {
@@ -370,22 +368,21 @@
try {
Set<String> result = new HashSet<>();
- List<Event> seenEvents = JavaConversions.asJavaList(LEventStore.findByEntity(
+ List<Event> seenEvents = LJavaEventStore.findByEntity(
ap.getAppName(),
"user",
userEntityId,
- Option.apply((String) null),
- Option.apply(JavaConversions.asScalaIterable(ap.getSeenItemEvents()).toSeq()),
- Option.apply(Option.apply("item")),
- Option.apply((Option<String>) null),
- Option.apply((DateTime) null),
- Option.apply((DateTime) null),
- Option.apply(null),
+ JavaOptionHelper.<String>none(),
+ JavaOptionHelper.some(ap.getSeenItemEvents()),
+ JavaOptionHelper.some(JavaOptionHelper.some("item")),
+ JavaOptionHelper.<Option<String>>none(),
+ JavaOptionHelper.<DateTime>none(),
+ JavaOptionHelper.<DateTime>none(),
+ JavaOptionHelper.<Integer>none(),
true,
- Duration.apply(200, TimeUnit.MILLISECONDS)
- ).toSeq());
+ Duration.apply(10, TimeUnit.SECONDS));
- for (Event event: seenEvents) {
+ for (Event event : seenEvents) {
result.add(event.targetEntityId().get());
}
diff --git a/src/main/java/org/template/recommendation/DataSource.java b/src/main/java/org/template/recommendation/DataSource.java
index 3443bd2..1b400fb 100644
--- a/src/main/java/org/template/recommendation/DataSource.java
+++ b/src/main/java/org/template/recommendation/DataSource.java
@@ -6,7 +6,8 @@
import io.prediction.controller.PDataSource;
import io.prediction.data.storage.Event;
import io.prediction.data.storage.PropertyMap;
-import io.prediction.data.store.PEventStore;
+import io.prediction.data.store.JavaOptionHelper;
+import io.prediction.data.store.PJavaEventStore;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
@@ -16,7 +17,6 @@
import scala.Option;
import scala.Tuple2;
import scala.collection.JavaConversions$;
-import scala.collection.Seq;
import java.util.Collections;
import java.util.HashMap;
@@ -34,21 +34,21 @@
@Override
public TrainingData readTraining(SparkContext sc) {
- JavaPairRDD<String,User> usersRDD = PEventStore.aggregateProperties(
+ JavaPairRDD<String,User> usersRDD = PJavaEventStore.aggregateProperties(
dsp.getAppName(),
"user",
- Option.apply((String) null),
- Option.apply((DateTime) null),
- Option.apply((DateTime) null),
- Option.apply((Seq<String>) null),
- sc).toJavaRDD()
+ JavaOptionHelper.<String>none(),
+ JavaOptionHelper.<DateTime>none(),
+ JavaOptionHelper.<DateTime>none(),
+ JavaOptionHelper.<List<String>>none(),
+ sc)
.mapToPair(new PairFunction<Tuple2<String, PropertyMap>, String, User>() {
@Override
public Tuple2<String, User> call(Tuple2<String, PropertyMap> entityIdProperty) throws Exception {
Set<String> keys = JavaConversions$.MODULE$.setAsJavaSet(entityIdProperty._2().keySet());
Map<String, String> properties = new HashMap<>();
for (String key : keys) {
- properties.put(key, Helper.dataMapGet(entityIdProperty._2(), key, String.class));
+ properties.put(key, entityIdProperty._2().get(key, String.class));
}
User user = new User(entityIdProperty._1(), ImmutableMap.copyOf(properties));
@@ -57,35 +57,35 @@
}
});
- JavaPairRDD<String, Item> itemsRDD = PEventStore.aggregateProperties(
+ JavaPairRDD<String, Item> itemsRDD = PJavaEventStore.aggregateProperties(
dsp.getAppName(),
"item",
- Option.apply((String) null),
- Option.apply((DateTime) null),
- Option.apply((DateTime) null),
- Option.apply((Seq<String>) null),
- sc).toJavaRDD()
+ JavaOptionHelper.<String>none(),
+ JavaOptionHelper.<DateTime>none(),
+ JavaOptionHelper.<DateTime>none(),
+ JavaOptionHelper.<List<String>>none(),
+ sc)
.mapToPair(new PairFunction<Tuple2<String, PropertyMap>, String, Item>() {
@Override
public Tuple2<String, Item> call(Tuple2<String, PropertyMap> entityIdProperty) throws Exception {
- List<String> categories = JavaConversions$.MODULE$.seqAsJavaList(Helper.dataMapGetStringList(entityIdProperty._2(), "categories"));
+ List<String> categories = entityIdProperty._2().getStringList("categories");
Item item = new Item(entityIdProperty._1(), ImmutableSet.copyOf(categories));
return new Tuple2<>(item.getEntityId(), item);
}
});
- JavaRDD<UserItemEvent> viewEventsRDD = PEventStore.find(
+ JavaRDD<UserItemEvent> viewEventsRDD = PJavaEventStore.find(
dsp.getAppName(),
- Option.apply((String) null),
- Option.apply((DateTime) null),
- Option.apply((DateTime) null),
- Option.apply("user"),
- Option.apply((String) null),
- Option.apply(JavaConversions$.MODULE$.collectionAsScalaIterable(Collections.singleton("view")).toSeq()),
- Option.apply((Option<String>) null),
- Option.apply((Option<String>) null),
- sc).toJavaRDD()
+ JavaOptionHelper.<String>none(),
+ JavaOptionHelper.<DateTime>none(),
+ JavaOptionHelper.<DateTime>none(),
+ JavaOptionHelper.some("user"),
+ JavaOptionHelper.<String>none(),
+ JavaOptionHelper.some(Collections.singletonList("view")),
+ JavaOptionHelper.<Option<String>>none(),
+ JavaOptionHelper.<Option<String>>none(),
+ sc)
.map(new Function<Event, UserItemEvent>() {
@Override
public UserItemEvent call(Event event) throws Exception {
@@ -93,17 +93,17 @@
}
});
- JavaRDD<UserItemEvent> buyEventsRDD = PEventStore.find(
+ JavaRDD<UserItemEvent> buyEventsRDD = PJavaEventStore.find(
dsp.getAppName(),
- Option.apply((String) null),
- Option.apply((DateTime) null),
- Option.apply((DateTime) null),
- Option.apply("user"),
- Option.apply((String) null),
- Option.apply(JavaConversions$.MODULE$.collectionAsScalaIterable(Collections.singleton("buy")).toSeq()),
- Option.apply((Option<String>) null),
- Option.apply((Option<String>) null),
- sc).toJavaRDD()
+ JavaOptionHelper.<String>none(),
+ JavaOptionHelper.<DateTime>none(),
+ JavaOptionHelper.<DateTime>none(),
+ JavaOptionHelper.some("user"),
+ JavaOptionHelper.<String>none(),
+ JavaOptionHelper.some(Collections.singletonList("buy")),
+ JavaOptionHelper.<Option<String>>none(),
+ JavaOptionHelper.<Option<String>>none(),
+ sc)
.map(new Function<Event, UserItemEvent>() {
@Override
public UserItemEvent call(Event event) throws Exception {
diff --git a/src/main/scala/org/template/recommendation/Helper.scala b/src/main/scala/org/template/recommendation/Helper.scala
index f24065d..5b021e0 100644
--- a/src/main/scala/org/template/recommendation/Helper.scala
+++ b/src/main/scala/org/template/recommendation/Helper.scala
@@ -1,8 +1,5 @@
package org.template.recommendation
-import io.prediction.data.storage.BiMap
-import io.prediction.data.storage.DataMap
-
object Helper {
def ofType[T]( klass: java.lang.Class[T] ) = {
val manifest = new Manifest[T] {
@@ -13,15 +10,4 @@
manifest.asInstanceOf[Manifest[T]]
}
-
- def dataMapGet[T](dataMap: DataMap, key: String, clazz: java.lang.Class[T]) = {
- val manifest = ofType(clazz)
-
- dataMap.get(key)(manifest)
- }
-
- def dataMapGetStringList(dataMap: DataMap, key: String): List[String] = {
- dataMap.get(key)(manifest[List[String]])
- }
-
}