Merge branch 'livedoc' into develop

# Conflicts:
#	README.md
diff --git a/.travis.yml b/.travis.yml
index 49cbb65..70167f3 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -47,118 +47,64 @@
   matrix:
     - BUILD_TYPE=Unit
       METADATA_REP=PGSQL EVENTDATA_REP=PGSQL MODELDATA_REP=PGSQL
-      PIO_SCALA_VERSION=2.10.6
-      PIO_SPARK_VERSION=1.6.3
     - BUILD_TYPE=Integration
-      METADATA_REP=PGSQL EVENTDATA_REP=PGSQL MODELDATA_REP=PGSQL
-      PIO_SCALA_VERSION=2.10.6
-      PIO_SPARK_VERSION=1.6.3
-    - BUILD_TYPE=Integration
-      METADATA_REP=ELASTICSEARCH EVENTDATA_REP=HBASE MODELDATA_REP=LOCALFS
-      PIO_SCALA_VERSION=2.10.6
-      PIO_SPARK_VERSION=1.6.3
-      PIO_ELASTICSEARCH_VERSION=5.5.2
-    - BUILD_TYPE=Integration
-      METADATA_REP=ELASTICSEARCH EVENTDATA_REP=PGSQL MODELDATA_REP=HDFS
-      PIO_SCALA_VERSION=2.10.6
-      PIO_SPARK_VERSION=1.6.3
-      PIO_ELASTICSEARCH_VERSION=5.5.2
-    - BUILD_TYPE=Integration
-      METADATA_REP=ELASTICSEARCH EVENTDATA_REP=HBASE MODELDATA_REP=LOCALFS
-      PIO_SCALA_VERSION=2.10.6
-      PIO_SPARK_VERSION=1.6.3
-      PIO_ELASTICSEARCH_VERSION=1.7.3
-    - BUILD_TYPE=Integration
-      METADATA_REP=ELASTICSEARCH EVENTDATA_REP=PGSQL MODELDATA_REP=HDFS
-      PIO_SCALA_VERSION=2.10.6
-      PIO_SPARK_VERSION=1.6.3
-      PIO_ELASTICSEARCH_VERSION=1.7.3
-
-    - BUILD_TYPE=Unit
-      METADATA_REP=PGSQL EVENTDATA_REP=PGSQL MODELDATA_REP=PGSQL
-      PIO_SCALA_VERSION=2.11.8
-      PIO_SPARK_VERSION=2.1.1
-    - BUILD_TYPE=Integration
-      METADATA_REP=PGSQL EVENTDATA_REP=PGSQL MODELDATA_REP=PGSQL
-      PIO_SCALA_VERSION=2.11.8
-      PIO_SPARK_VERSION=2.1.1
-    - BUILD_TYPE=Integration
-      METADATA_REP=ELASTICSEARCH EVENTDATA_REP=HBASE MODELDATA_REP=LOCALFS
-      PIO_SCALA_VERSION=2.11.8
-      PIO_SPARK_VERSION=2.1.1
-      PIO_ELASTICSEARCH_VERSION=5.5.2
-    - BUILD_TYPE=Integration
-      METADATA_REP=ELASTICSEARCH EVENTDATA_REP=PGSQL MODELDATA_REP=HDFS
-      PIO_SCALA_VERSION=2.11.8
-      PIO_SPARK_VERSION=2.1.1
-      PIO_ELASTICSEARCH_VERSION=5.5.2
-    - BUILD_TYPE=Integration
-      METADATA_REP=ELASTICSEARCH EVENTDATA_REP=HBASE MODELDATA_REP=LOCALFS
-      PIO_SCALA_VERSION=2.11.8
-      PIO_SPARK_VERSION=2.1.1
-      PIO_ELASTICSEARCH_VERSION=1.7.3
-    - BUILD_TYPE=Integration
-      METADATA_REP=ELASTICSEARCH EVENTDATA_REP=PGSQL MODELDATA_REP=HDFS
-      PIO_SCALA_VERSION=2.11.8
-      PIO_SPARK_VERSION=2.1.1
+      METADATA_REP=ELASTICSEARCH EVENTDATA_REP=PGSQL MODELDATA_REP=S3
       PIO_ELASTICSEARCH_VERSION=1.7.3
     - BUILD_TYPE=Integration
       METADATA_REP=ELASTICSEARCH EVENTDATA_REP=ELASTICSEARCH MODELDATA_REP=S3
-      PIO_SCALA_VERSION=2.11.8
-      PIO_SPARK_VERSION=2.1.1
-      PIO_ELASTICSEARCH_VERSION=5.5.2
-
-    - BUILD_TYPE=Unit
-      METADATA_REP=PGSQL EVENTDATA_REP=PGSQL MODELDATA_REP=PGSQL
-      PIO_SCALA_VERSION=2.11.8
-      PIO_SPARK_VERSION=2.2.0
-    - BUILD_TYPE=Integration
-      METADATA_REP=PGSQL EVENTDATA_REP=PGSQL MODELDATA_REP=PGSQL
-      PIO_SCALA_VERSION=2.11.8
-      PIO_SPARK_VERSION=2.2.0
-    - BUILD_TYPE=Integration
-      METADATA_REP=ELASTICSEARCH EVENTDATA_REP=HBASE MODELDATA_REP=LOCALFS
-      PIO_SCALA_VERSION=2.11.8
-      PIO_SPARK_VERSION=2.2.0
-      PIO_ELASTICSEARCH_VERSION=1.7.3
-    - BUILD_TYPE=Integration
-      METADATA_REP=ELASTICSEARCH EVENTDATA_REP=PGSQL MODELDATA_REP=HDFS
-      PIO_SCALA_VERSION=2.11.8
-      PIO_SPARK_VERSION=2.2.0
-      PIO_ELASTICSEARCH_VERSION=5.5.2
+      PIO_ELASTICSEARCH_VERSION=5.6.9
     - BUILD_TYPE=Integration
       METADATA_REP=ELASTICSEARCH EVENTDATA_REP=ELASTICSEARCH MODELDATA_REP=S3
-      PIO_SCALA_VERSION=2.11.8
-      PIO_SPARK_VERSION=2.2.0
-      PIO_ELASTICSEARCH_VERSION=5.5.2
-
-    - BUILD_TYPE=Unit
-      METADATA_REP=PGSQL EVENTDATA_REP=PGSQL MODELDATA_REP=PGSQL
-      PIO_SCALA_VERSION=2.11.12
-      PIO_SPARK_VERSION=2.3.1
-      PIO_HADOOP_VERSION=2.7.7
-    - BUILD_TYPE=Integration
-      METADATA_REP=PGSQL EVENTDATA_REP=PGSQL MODELDATA_REP=PGSQL
-      PIO_SCALA_VERSION=2.11.12
-      PIO_SPARK_VERSION=2.3.1
-      PIO_HADOOP_VERSION=2.7.7
+      PIO_ELASTICSEARCH_VERSION=6.4.2
     - BUILD_TYPE=Integration
       METADATA_REP=ELASTICSEARCH EVENTDATA_REP=HBASE MODELDATA_REP=LOCALFS
+      PIO_HBASE_VERSION=1.2.6
+
+    - BUILD_TYPE=Integration
+      METADATA_REP=PGSQL EVENTDATA_REP=PGSQL MODELDATA_REP=PGSQL
       PIO_SCALA_VERSION=2.11.12
-      PIO_SPARK_VERSION=2.3.1
-      PIO_ELASTICSEARCH_VERSION=1.7.3
+      PIO_SPARK_VERSION=2.0.2
+      PIO_HADOOP_VERSION=2.6.5
+    - BUILD_TYPE=Integration
+      METADATA_REP=PGSQL EVENTDATA_REP=PGSQL MODELDATA_REP=PGSQL
+      PIO_SCALA_VERSION=2.11.12
+      PIO_SPARK_VERSION=2.1.3
+      PIO_HADOOP_VERSION=2.6.5
+    - BUILD_TYPE=Integration
+      METADATA_REP=PGSQL EVENTDATA_REP=PGSQL MODELDATA_REP=PGSQL
+      PIO_SCALA_VERSION=2.11.12
+      PIO_SPARK_VERSION=2.2.3
+      PIO_HADOOP_VERSION=2.6.5
+    - BUILD_TYPE=Integration
+      METADATA_REP=PGSQL EVENTDATA_REP=PGSQL MODELDATA_REP=HDFS
+      PIO_SCALA_VERSION=2.11.12
+      PIO_SPARK_VERSION=2.3.3
+      PIO_HADOOP_VERSION=2.6.5
+
+    - BUILD_TYPE=Integration
+      METADATA_REP=PGSQL EVENTDATA_REP=PGSQL MODELDATA_REP=PGSQL
+      PIO_SCALA_VERSION=2.11.12
+      PIO_SPARK_VERSION=2.0.2
       PIO_HADOOP_VERSION=2.7.7
     - BUILD_TYPE=Integration
-      METADATA_REP=ELASTICSEARCH EVENTDATA_REP=PGSQL MODELDATA_REP=HDFS
+      METADATA_REP=PGSQL EVENTDATA_REP=PGSQL MODELDATA_REP=PGSQL
       PIO_SCALA_VERSION=2.11.12
-      PIO_SPARK_VERSION=2.3.1
-      PIO_ELASTICSEARCH_VERSION=5.5.2
+      PIO_SPARK_VERSION=2.1.3
       PIO_HADOOP_VERSION=2.7.7
     - BUILD_TYPE=Integration
-      METADATA_REP=ELASTICSEARCH EVENTDATA_REP=ELASTICSEARCH MODELDATA_REP=S3
+      METADATA_REP=PGSQL EVENTDATA_REP=PGSQL MODELDATA_REP=PGSQL
       PIO_SCALA_VERSION=2.11.12
-      PIO_SPARK_VERSION=2.3.1
-      PIO_ELASTICSEARCH_VERSION=5.5.2
+      PIO_SPARK_VERSION=2.2.3
+      PIO_HADOOP_VERSION=2.7.7
+    - BUILD_TYPE=Integration
+      METADATA_REP=PGSQL EVENTDATA_REP=PGSQL MODELDATA_REP=HDFS
+      PIO_SCALA_VERSION=2.11.12
+      PIO_SPARK_VERSION=2.3.3
+      PIO_HADOOP_VERSION=2.7.7
+    - BUILD_TYPE=Integration
+      METADATA_REP=PGSQL EVENTDATA_REP=PGSQL MODELDATA_REP=HDFS
+      PIO_SCALA_VERSION=2.11.12
+      PIO_SPARK_VERSION=2.4.0
       PIO_HADOOP_VERSION=2.7.7
 
     - BUILD_TYPE=LicenseCheck
diff --git a/LICENSE.txt b/LICENSE.txt
index 72f53a3..6e02f9b 100644
--- a/LICENSE.txt
+++ b/LICENSE.txt
@@ -404,15 +404,11 @@
 --------------------------------------------------------------------------------
 Binary distribution bundles
 
-  com.sun.jersey # jersey-core # 1.8 (https://github.com/jersey/jersey-1.x)
-  com.sun.jersey # jersey-json # 1.8 (https://github.com/jersey/jersey-1.x)
-  com.sun.jersey # jersey-server # 1.8 (https://github.com/jersey/jersey-1.x)
   com.sun.jersey # jersey-core # 1.9 (https://github.com/jersey/jersey-1.x)
   com.sun.jersey # jersey-json # 1.9 (https://github.com/jersey/jersey-1.x)
   com.sun.jersey # jersey-server # 1.9 (https://github.com/jersey/jersey-1.x)
   javax.xml.bind # jaxb-api # 2.2.2
   com.sun.xml.bind # jaxb-impl # 2.2.3-1
-  org.jvnet.mimepull # mimepull # 1.9.5 (https://github.com/kohsuke/mimepull)
 
   which are available under the CDDL v1.1 license (https://glassfish.java.net/public/CDDL+GPL_1_1.html)
   
@@ -1131,7 +1127,7 @@
 --------------------------------------------------------------------------------
 Binary distribution bundles
 
-  junit # junit # 4.11 (http://junit.org/junit4/)
+  junit # junit # 4.12 (http://junit.org/junit4/)
   
   which are available under the CPL v1.0 license (https://eclipse.org/legal/cpl-v10.html)
 
@@ -1353,19 +1349,20 @@
 --------------------------------------------------------------------------------
 Binary distribution bundles
 
-  org.jamon # jamon-runtime # 2.3.1 (http://www.jamon.org/)
+  org.jamon # jamon-runtime # 2.4.1 (http://www.jamon.org/)
 
   which are available under the MPL v1.1 license (http://www.mozilla.org/MPL/MPL-1.1.txt)
 --------------------------------------------------------------------------------
 Binary distribution bundles
 
   org.slf4j # slf4j-api # 1.7.25 (https://www.slf4j.org/)
-  org.slf4j # slf4j-api # 1.7.18 (https://www.slf4j.org/)
-  org.slf4j # slf4j-api # 1.7.16 (https://www.slf4j.org/)
   org.slf4j # slf4j-api # 1.7.10 (https://www.slf4j.org/)
   org.slf4j # slf4j-api # 1.7.2 (https://www.slf4j.org/)
   org.slf4j # slf4j-log4j12 # 1.7.18 (https://www.slf4j.org/)
   org.slf4j # slf4j-log4j12 # 1.7.10 (https://www.slf4j.org/)
+  org.jruby.jcodings # jcodings # 1.0.8 (https://github.com/jruby/jcodings/)
+  org.jruby.joni # joni # 2.1.2 (https://github.com/jruby/joni/)
+  
   
   which are available under the MIT license (http://opensource.org/licenses/mit-license.php)
   
@@ -1670,7 +1667,7 @@
 --------------------------------------------------------------------------------
 Binary distribution bundles
 
-  com.jcraft # jsch # 0.1.42 (http://www.jcraft.com/jsch/)
+  com.jcraft # jsch # 0.1.54 (http://www.jcraft.com/jsch/)
 
   which is available under the BSD license (http://www.jcraft.com/jsch/LICENSE.txt)
   
@@ -1703,15 +1700,15 @@
 --------------------------------------------------------------------------------
 Binary distribution bundles
 
-  org.scala-lang # scala-library # 2.11.8 (http://scala-lang.org/)
-  org.scala-lang # scala-compiler # 2.11.8 (http://scala-lang.org/)
-  org.scala-lang # scala-reflect # 2.11.8 (http://scala-lang.org/)
-  org.scala-lang # scalap # 2.11.8 (http://scala-lang.org/)
+  org.scala-lang # scala-library # 2.11.12 (http://scala-lang.org/)
+  org.scala-lang # scala-compiler # 2.11.12 (http://scala-lang.org/)
+  org.scala-lang # scala-reflect # 2.11.12 (http://scala-lang.org/)
+  org.scala-lang # scalap # 2.11.12 (http://scala-lang.org/)
   org.scala-lang.modules # scala-java8-compat_2.11 # 0.7.0 (http://scala-lang.org/)
-  org.scala-lang.modules # scala-parser-combinators_2.11 # 1.0.4 (http://scala-lang.org/)
   org.scala-lang.modules # scala-parser-combinators_2.11 # 1.0.6 (http://scala-lang.org/)
-  org.scala-lang.modules # scala-xml_2.11 # 1.0.3 (http://scala-lang.org/)
-  org.scala-lang.modules # scala-xml_2.11 # 1.0.4 (http://scala-lang.org/)
+  org.scala-lang.modules # scala-parser-combinators_2.11 # 1.1.0 (http://scala-lang.org/)
+  org.scala-lang.modules # scala-xml_2.11 # 1.0.5 (http://scala-lang.org/)
+  org.scala-lang.modules # scala-xml_2.11 # 1.0.6 (http://scala-lang.org/)
   
   which is available under the BSD license (http://www.scala-lang.org/downloads/license.html)
 
@@ -1780,5 +1777,5 @@
 --------------------------------------------------------------------------------
 The following libraries are from the public domain.
 
-  com.github.stephenc.high-scale-lib # high-scale-lib # 1.1.1 (https://github.com/stephenc/high-scale-lib)
   org.tukaani # xz # 1.0 (http://tukaani.org/xz/java.html)
+  org.reactivestreams # reactive-streams # 1.0.2 (http://www.reactive-streams.org/)
diff --git a/PMC.md b/PMC.md
index f61d75e..b21952e 100644
--- a/PMC.md
+++ b/PMC.md
@@ -27,35 +27,40 @@
 signing key.
 2. Add your public key to the `KEYS` file at the root of the source code tree.
 3. Create a new release branch, with version bumped to the next release version.
-    * `git checkout -b release/0.13.0`
-    * Replace all `0.13.0-SNAPSHOT` in the code tree to `0.13.0`.
-    * `git commit -am "Prepare 0.13.0-rc1"`
-    * `git tag -am "Apache PredictionIO 0.13.0-rc1" v0.13.0-rc1`
-4. If you have not done so, use SVN to checkout
+    * `git checkout -b release/0.14.0`
+    * Replace all `0.14.0-SNAPSHOT` in the code tree with `0.14.0`.
+    * `git commit -am "Prepare 0.14.0-rc1"`
+    * `git tag -am "Apache PredictionIO 0.14.0-rc1" v0.14.0-rc1`
+4. Push the release branch and tag to the apache git repo.
+5. Wait for Travis to pass build on the release branch.
+6. Package a clean tarball for staging a release candidate.
+    * `git archive --format tar v0.14.0-rc1 >
+  ../apache-predictionio-0.14.0-rc1.tar`
+    * `cd ..; gzip apache-predictionio-0.14.0-rc1.tar`
+7. Generate detached signature for the release candidate.
+(http://apache.org/dev/release-signing.html#openpgp-ascii-detach-sig)
+    * `gpg --armor --output apache-predictionio-0.14.0-rc1.tar.gz.asc
+  --detach-sig apache-predictionio-0.14.0-rc1.tar.gz`
+8. Generate SHA512 checksums for the release candidate.
+    * `gpg --print-md SHA512 apache-predictionio-0.14.0-rc1.tar.gz >
+  apache-predictionio-0.14.0-rc1.tar.gz.sha512`
+9. Run `./make-distribution.sh` and repeat steps 6 to 8 to create the binary distribution release.
+    * `mv PredictionIO-0.14.0.tar.gz apache-predictionio-0.14.0-bin.tar.gz`
+    * `gpg --armor --output apache-predictionio-0.14.0-bin.tar.gz.asc
+  --detach-sig apache-predictionio-0.14.0-bin.tar.gz`
+    * `gpg --print-md SHA512 apache-predictionio-0.14.0-bin.tar.gz >
+  apache-predictionio-0.14.0-bin.tar.gz.sha512`
+10. If you have not done so, use SVN to check out
 https://dist.apache.org/repos/dist/dev/predictionio. This is the area
 for staging release candidates for voting.
     * `svn co https://dist.apache.org/repos/dist/dev/predictionio`
-5.  Package a clean tarball for staging a release candidate.
-    * `git archive --format tar v0.13.0-rc1 >
-  ../apache-predictionio-0.13.0-rc1.tar`
-    * `cd ..; gzip apache-predictionio-0.13.0-rc1.tar`
-6. Generate detached signature for the release candidate.
-(http://apache.org/dev/release-signing.html#openpgp-ascii-detach-sig)
-    * `gpg --armor --output apache-predictionio-0.13.0-rc1.tar.gz.asc
-  --detach-sig apache-predictionio-0.13.0-rc1.tar.gz`
-7. Generate SHA512 checksums for the release candidate.
-    * `gpg --print-md SHA512 apache-predictionio-0.13.0-rc1.tar.gz >
-  apache-predictionio-0.13.0-rc1.tar.gz.sha512`
-8. Run `./make-distribution.sh` and repeat steps 5 to 7 to create binary distribution release.
-9. Create a subdirectory at the SVN staging area. The area should have a `KEYS` file.
-    * `mkdir apache-predictionio-0.13.0-rc1`
-    * `cp apache-predictionio-0.13.0-rc1.*
-  apache-predictionio-0.13.0-rc1`
-10. If you have updated the `KEYS` file, also copy that to the staging area.
-11. `svn commit`
-12. Set up credentials with Apache Nexus using the SBT Sonatype plugin. Put this
-in `~/.sbt/0.13/sonatype.sbt`. You can generate username and password tokens
-from ASF's Nexus instance.
+11. Create a subdirectory at the SVN staging area. The area should have a `KEYS` file.
+    * `mkdir apache-predictionio-0.14.0-rc1`
+    * `cp apache-predictionio-0.14.0-* apache-predictionio-0.14.0-rc1`
+12. If you have updated the `KEYS` file, also copy that to the staging area.
+13. `svn commit -m "Apache PredictionIO 0.14.0-rc1"`
+14. Set up credentials with Apache Nexus using the SBT Sonatype plugin. Put this
+in `~/.sbt/0.13/sonatype.sbt`.
 
   ```
   publishTo := {
@@ -66,69 +71,71 @@
         Some("releases" at nexus + "service/local/staging/deploy/maven2")
   }
 
-  credentials += Credentials("Sonatype Nexus Repository Manager", "repository.apache.org", "username_token", "password_token")
+  credentials += Credentials("Sonatype Nexus Repository Manager", "repository.apache.org", "<YOUR APACHE LDAP USERNAME>", "<YOUR APACHE LDAP PASSWORD>")
   ```
-13. `sbt/sbt +publishSigned +storage/publishSigned
-+dataElasticsearch/publishSigned` then close the staged repository on Apache
-Nexus.
-    * You may need to run `sbt/sbt publishLocal` first to avoid depedency errors.
-14. Wait for Travis to pass build on the release branch.
-15. Tag the release branch with a rc tag, e.g. `0.13.0-rc1`.
+15. Run `sbt/sbt +publishLocal` first and then run `sbt/sbt +publishSigned +storage/publishSigned`.
+Close the staged repository on Apache Nexus.
 16. Send out email for voting on PredictionIO dev mailing list.
 
   ```
-  Subject: [VOTE] Apache PredictionIO 0.13.0 Release (RC1)
+  Subject: [VOTE] Apache PredictionIO 0.14.0 Release (RC1)
 
-  This is the vote for 0.13.0 of Apache PredictionIO.
+  This is the vote for 0.14.0 of Apache PredictionIO.
 
   The vote will run for at least 72 hours and will close on Apr 7th, 2017.
 
-  The release candidate artifacts can be downloaded here: https://dist.apache.org/repos/dist/dev/predictionio/0.13.0-rc1/
+  The release candidate artifacts can be downloaded here: https://dist.apache.org/repos/dist/dev/predictionio/apache-predictionio-0.14.0-rc1/
 
-  Test results of RC5 can be found here: https://travis-ci.org/apache/predictionio/builds/xxx
+  Test results of RC1 can be found here: https://travis-ci.org/apache/predictionio/builds/xxx
 
   Maven artifacts are built from the release candidate artifacts above, and are provided as convenience for testing with engine templates. The Maven artifacts are provided at the Maven staging repo here: https://repository.apache.org/content/repositories/orgapachepredictionio-nnnn/
 
-  All JIRAs completed for this release are tagged with 'FixVersion = 0.13.0'. You can view them here: https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12320420&version=12337844
+  All JIRAs completed for this release are tagged with 'FixVersion = 0.14.0'. You can view them here: https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12320420&version=12337844
 
   The artifacts have been signed with Key : YOUR_KEY_ID
 
   Please vote accordingly:
 
-  [ ] +1, accept RC as the official 0.13.0 release
-  [ ] -1, do not accept RC as the official 0.13.0 release because...
+  [ ] +1, accept RC as the official 0.14.0 release
+  [ ] -1, do not accept RC as the official 0.14.0 release because...
   ```
-17. After the vote has been accepted, use SVN to checkout
+17. After the vote has been accepted, update `RELEASE.md`.
+18. Create a release tag (e.g. `v0.14.0`).
+19. Repeat steps 6 to 8 to create the official release, and step 15 to publish it.
+20. Use SVN to check out
 https://dist.apache.org/repos/dist/release/predictionio/. This is the area
 for staging actual releases.
-18. Repeat steps 5 to 7 to create the official release, and step 13 to publish it.
-Also, remove old releases from the ASF distribution mirrors.
-19. Document breaking changes in http://predictionio.apache.org/resources/upgrade/.
-20. Update `RELEASE.md`.
-21. Send out an email to the following mailing lists: announce, general, user, dev.
+21. Create a subdirectory at the SVN staging area. The area should have a `KEYS` file.
+    * `mkdir 0.14.0`
+    * Copy the binary distribution from the dev/ tree to the release/ tree
+    * Copy the official release to the release/ tree
+22. If you have updated the `KEYS` file, also copy that to the staging area.
+23. Remove old releases from the ASF distribution mirrors.
+(https://www.apache.org/dev/mirrors.html#location)
+    * `svn delete 0.13.0`
+24. `svn commit -m "Apache PredictionIO 0.14.0"`
+25. Document breaking changes in https://predictionio.apache.org/resources/upgrade/.
+26. Mark the version as released on JIRA.
+(https://issues.apache.org/jira/projects/PIO?selectedItem=com.atlassian.jira.jira-projects-plugin%3Arelease-page&status=no-filter)
+27. Send out an email to the following mailing lists: announce, user, dev.
 
   ```
-  Subject: [ANNOUNCE] Apache PredictionIO 0.13.0 Release
+  Subject: [ANNOUNCE] Apache PredictionIO 0.14.0 Release
 
-  The Apache PredictionIO team would like to announce the release of Apache
-  PredictionIO 0.13.0.
+  The Apache PredictionIO team would like to announce the release of Apache PredictionIO 0.14.0.
 
   Release notes are here:
-  https://github.com/apache/predictionio/blob/release/0.13.0/RELEASE.md
+  https://github.com/apache/predictionio/blob/release/0.14.0/RELEASE.md
 
-  Apache PredictionIO is an open source Machine Learning Server built on
-  top of state-of-the-art open source stack, that enables developers to
-  manage and deploy production-ready predictive services for various kinds
-  of machine learning tasks.
+  Apache PredictionIO is an open source Machine Learning Server built on top of a state-of-the-art open source stack that enables developers to manage and deploy production-ready predictive services for various kinds of machine learning tasks.
 
   More details regarding Apache PredictionIO can be found here:
-  http://predictionio.apache.org/
+  https://predictionio.apache.org/
 
   The release artifacts can be downloaded here:
-  https://dist.apache.org/repos/dist/release/predictionio/0.13.0/
+  https://www.apache.org/dyn/closer.lua/predictionio/0.14.0/apache-predictionio-0.14.0-bin.tar.gz
 
-  All JIRAs completed for this release are tagged with 'FixVersion =
-  0.13.0'; the JIRA release notes can be found here:
+  All JIRAs completed for this release are tagged with 'FixVersion = 0.14.0'; the JIRA release notes can be found here:
   https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12320420&version=12337844
 
   Thanks!
diff --git a/README.md b/README.md
index b8604ca..2b6134c 100644
--- a/README.md
+++ b/README.md
@@ -47,7 +47,6 @@
 *   [Installing Apache PredictionIO with
     Docker](http://predictionio.apache.org/install/install-docker/)
 
-
 ## Quick Start
 
 *   [Recommendation Engine Template Quick
diff --git a/bin/install.sh b/bin/install.sh
index d785b12..93e4eb5 100755
--- a/bin/install.sh
+++ b/bin/install.sh
@@ -20,7 +20,7 @@
 OS=`uname`
 SPARK_VERSION=2.1.1
 # Looks like support for Elasticsearch 2.0 will require 2.0 so deferring
-ELASTICSEARCH_VERSION=5.5.2
+ELASTICSEARCH_VERSION=5.6.9
 HBASE_VERSION=1.2.6
 POSTGRES_VERSION=42.0.0
 MYSQL_VERSION=5.1.41
diff --git a/bin/pio-class b/bin/pio-class
index 9e01a40..67ae562 100755
--- a/bin/pio-class
+++ b/bin/pio-class
@@ -44,7 +44,7 @@
 
 # Make sure the Apache Spark version meets the prerequisite if it is a binary
 # distribution
-MIN_SPARK_VERSION="1.6.3"
+MIN_SPARK_VERSION="2.0.2"
 if [ -z "$SPARK_HOME" ]; then
   echo -e "\033[0;31mSPARK_HOME must be set in conf/pio-env.sh, or in the environment!\033[0m"
   exit 1
diff --git a/bin/pio-shell b/bin/pio-shell
index cd119cd..e23041a 100755
--- a/bin/pio-shell
+++ b/bin/pio-shell
@@ -65,7 +65,6 @@
   # Get paths of assembly jars to pass to pyspark
   . ${PIO_HOME}/bin/compute-classpath.sh
   shift
-  export PYTHONSTARTUP=${PIO_HOME}/python/pypio/shell.py
   export PYTHONPATH=${PIO_HOME}/python
   ${SPARK_HOME}/bin/pyspark --jars ${ASSEMBLY_JARS} $@
 else
diff --git a/bin/pio-start-all b/bin/pio-start-all
index 15ac1a6..8eade0b 100755
--- a/bin/pio-start-all
+++ b/bin/pio-start-all
@@ -34,8 +34,6 @@
   echo "Starting Elasticsearch..."
   if [ -n "$PIO_STORAGE_SOURCES_ELASTICSEARCH_HOME" ]; then
     ELASTICSEARCH_HOME=$PIO_STORAGE_SOURCES_ELASTICSEARCH_HOME
-  elif [ -n "$PIO_STORAGE_SOURCES_ELASTICSEARCH5_HOME" ]; then
-    ELASTICSEARCH_HOME=$PIO_STORAGE_SOURCES_ELASTICSEARCH5_HOME
   fi
   if [ -n "$ELASTICSEARCH_HOME" ]; then
     if [ -n "$JAVA_HOME" ]; then
diff --git a/build.sbt b/build.sbt
index 308b409..7f2bc09 100644
--- a/build.sbt
+++ b/build.sbt
@@ -16,53 +16,17 @@
  */
 import PIOBuild._
 
-lazy val scalaSparkDepsVersion = Map(
-  "2.10" -> Map(
-    "1.6" -> Map(
-      "akka" -> "2.3.15",
-      "hadoop" -> "2.6.5",
-      "json4s" -> "3.2.10"),
-    "2.0" -> Map(
-      "akka" -> "2.3.16",
-      "hadoop" -> "2.7.3",
-      "json4s" -> "3.2.11"),
-    "2.1" -> Map(
-      "akka" -> "2.3.16",
-      "hadoop" -> "2.7.3",
-      "json4s" -> "3.2.11")),
-  "2.11" -> Map(
-    "1.6" -> Map(
-      "akka" -> "2.3.15",
-      "hadoop" -> "2.6.5",
-      "json4s" -> "3.2.10"),
-    "2.0" -> Map(
-      "akka" -> "2.4.17",
-      "hadoop" -> "2.7.3",
-      "json4s" -> "3.2.11"),
-    "2.1" -> Map(
-      "akka" -> "2.4.17",
-      "hadoop" -> "2.7.3",
-      "json4s" -> "3.2.11"),
-    "2.2" -> Map(
-      "akka" -> "2.4.17",
-      "hadoop" -> "2.7.3",
-      "json4s" -> "3.2.11"),
-    "2.3" -> Map(
-      "akka" -> "2.4.17",
-      "hadoop" -> "2.7.3",
-      "json4s" -> "3.2.11")))
-
 name := "apache-predictionio-parent"
 
-version in ThisBuild := "0.13.0"
+version in ThisBuild := "0.14.0-SNAPSHOT"
 
 organization in ThisBuild := "org.apache.predictionio"
 
-scalaVersion in ThisBuild := sys.props.getOrElse("scala.version", "2.11.8")
+scalaVersion in ThisBuild := sys.props.getOrElse("scala.version", "2.11.12")
 
 scalaBinaryVersion in ThisBuild := binaryVersion(scalaVersion.value)
 
-crossScalaVersions in ThisBuild := Seq("2.10.6", "2.11.8")
+crossScalaVersions in ThisBuild := Seq("2.11.12")
 
 scalacOptions in ThisBuild ++= Seq("-deprecation", "-unchecked", "-feature")
 
@@ -73,35 +37,26 @@
   "-Xlint:deprecation", "-Xlint:unchecked")
 
 // Ignore differentiation of Spark patch levels
-sparkVersion in ThisBuild := sys.props.getOrElse("spark.version", (if (scalaBinaryVersion.value == "2.10") "1.6.3" else "2.1.1"))
+sparkVersion in ThisBuild := sys.props.getOrElse("spark.version", "2.1.3")
 
 sparkBinaryVersion in ThisBuild := binaryVersion(sparkVersion.value)
 
-akkaVersion in ThisBuild := sys.props.getOrElse(
-  "akka.version",
-  scalaSparkDepsVersion(scalaBinaryVersion.value)(sparkBinaryVersion.value)("akka"))
+hadoopVersion in ThisBuild := sys.props.getOrElse("hadoop.version", "2.7.7")
 
-lazy val es = sys.props.getOrElse("elasticsearch.version", "5.5.2")
+akkaVersion in ThisBuild := sys.props.getOrElse("akka.version", "2.5.17")
+
+lazy val es = sys.props.getOrElse("elasticsearch.version", "5.6.9")
 
 elasticsearchVersion in ThisBuild := es
 
-json4sVersion in ThisBuild := scalaSparkDepsVersion(scalaBinaryVersion.value)(sparkBinaryVersion.value)("json4s")
+hbaseVersion in ThisBuild := sys.props.getOrElse("hbase.version", "1.2.6")
 
-hadoopVersion in ThisBuild := sys.props.getOrElse(
-  "hadoop.version",
-  scalaSparkDepsVersion(scalaBinaryVersion.value)(sparkBinaryVersion.value)("hadoop"))
-
-val pioBuildInfoSettings = buildInfoSettings ++ Seq(
-  sourceGenerators in Compile += buildInfo,
-  buildInfoKeys := Seq[BuildInfoKey](
-    name,
-    version,
-    scalaVersion,
-    scalaBinaryVersion,
-    sbtVersion,
-    sparkVersion,
-    hadoopVersion),
-  buildInfoPackage := "org.apache.predictionio.core")
+json4sVersion in ThisBuild := {
+  sparkBinaryVersion.value match {
+    case "2.0" | "2.1" | "2.2" | "2.3" => "3.2.11"
+    case "2.4" => "3.5.3"
+  }
+}
 
 val conf = file("conf")
 
@@ -154,8 +109,6 @@
   settings(commonSettings: _*).
   settings(commonTestSettings: _*).
   enablePlugins(GenJavadocPlugin).
-  settings(unmanagedSourceDirectories in Compile +=
-    sourceDirectory.value / s"main/spark-${majorVersion(sparkVersion.value)}").
   disablePlugins(sbtassembly.AssemblyPlugin)
 
 val core = (project in file("core")).
@@ -163,23 +116,34 @@
   settings(commonSettings: _*).
   settings(commonTestSettings: _*).
   enablePlugins(GenJavadocPlugin).
-  settings(pioBuildInfoSettings: _*).
+  enablePlugins(BuildInfoPlugin).
+  settings(
+    buildInfoKeys := Seq[BuildInfoKey](
+      name,
+      version,
+      scalaVersion,
+      scalaBinaryVersion,
+      sbtVersion,
+      sparkVersion,
+      hadoopVersion),
+    buildInfoPackage := "org.apache.predictionio.core"
+  ).
   enablePlugins(SbtTwirl).
   disablePlugins(sbtassembly.AssemblyPlugin)
 
+val e2 = (project in file("e2")).
+  dependsOn(core).
+  settings(commonSettings: _*).
+  enablePlugins(GenJavadocPlugin).
+  disablePlugins(sbtassembly.AssemblyPlugin)
+
 val tools = (project in file("tools")).
-  dependsOn(core).
-  dependsOn(data).
+  dependsOn(e2).
   settings(commonSettings: _*).
   settings(commonTestSettings: _*).
+  settings(skip in publish := true).
   enablePlugins(GenJavadocPlugin).
-  enablePlugins(SbtTwirl).
-  settings(publishArtifact := false)
-
-val e2 = (project in file("e2")).
-  settings(commonSettings: _*).
-  enablePlugins(GenJavadocPlugin).
-  disablePlugins(sbtassembly.AssemblyPlugin)
+  enablePlugins(SbtTwirl)
 
 val dataEs = if (majorVersion(es) == 1) dataElasticsearch1 else dataElasticsearch
 
@@ -192,9 +156,9 @@
     dataS3)
 
 val storage = (project in file("storage"))
+  .settings(skip in publish := true)
   .aggregate(storageSubprojects map Project.projectToRef: _*)
   .disablePlugins(sbtassembly.AssemblyPlugin)
-  .settings(publishArtifact := false)
 
 val assembly = (project in file("assembly")).
   settings(commonSettings: _*)
@@ -203,6 +167,7 @@
   settings(commonSettings: _*).
   enablePlugins(ScalaUnidocPlugin).
   settings(
+    skip in publish := true,
     unidocProjectFilter in (ScalaUnidoc, unidoc) := inAnyProject -- inProjects(dataElasticsearch, dataElasticsearch1),
     unidocProjectFilter in (JavaUnidoc, unidoc) := inAnyProject -- inProjects(dataElasticsearch, dataElasticsearch1),
     scalacOptions in (ScalaUnidoc, unidoc) ++= Seq(
@@ -292,7 +257,7 @@
   </parent>
   <scm>
     <connection>scm:git:github.com/apache/predictionio</connection>
-    <developerConnection>scm:git:https://git-wip-us.apache.org/repos/asf/predictionio.git</developerConnection>
+    <developerConnection>scm:git:https://gitbox.apache.org/repos/asf/predictionio.git</developerConnection>
     <url>github.com/apache/predictionio</url>
   </scm>
   <developers>
@@ -327,12 +292,9 @@
 testOptions in Test += Tests.Argument("-oDF")
 
 printBuildInfo := {
-  if (scalaBinaryVersion.value == "2.10")
-    streams.value.log.warn("Support for Scala 2.10 is deprecated. Please upgrade to a newer version of Scala.")
-  if (sparkBinaryVersion.value == "1.6")
-    streams.value.log.warn("Support for Spark 1.6 is deprecated. Please upgrade to a newer version of Spark.")
   println(s"PIO_SCALA_VERSION=${scalaVersion.value}")
   println(s"PIO_SPARK_VERSION=${sparkVersion.value}")
-  println(s"PIO_ELASTICSEARCH_VERSION=${elasticsearchVersion.value}")
   println(s"PIO_HADOOP_VERSION=${hadoopVersion.value}")
+  println(s"PIO_ELASTICSEARCH_VERSION=${elasticsearchVersion.value}")
+  println(s"PIO_HBASE_VERSION=${hbaseVersion.value}")
 }
diff --git a/common/build.sbt b/common/build.sbt
index 19e4f04..f9fd97b 100644
--- a/common/build.sbt
+++ b/common/build.sbt
@@ -20,9 +20,11 @@
 name := "apache-predictionio-common"
 
 libraryDependencies ++= Seq(
-  "io.spray"          %% "spray-can"     % "1.3.3",
-  "io.spray"          %% "spray-routing" % "1.3.3",
-  "com.typesafe.akka" %% "akka-actor"    % akkaVersion.value,
-  "com.typesafe.akka" %% "akka-slf4j"    % akkaVersion.value)
+  "com.typesafe.akka" %% "akka-actor"           % akkaVersion.value,
+  "com.typesafe.akka" %% "akka-slf4j"           % akkaVersion.value,
+  "com.typesafe.akka" %% "akka-http"            % "10.1.5",
+  "org.json4s"        %% "json4s-native"        % json4sVersion.value,
+  "com.typesafe.akka" %% "akka-stream"          % "2.5.12"
+)
 
 pomExtra := childrenPomExtra.value
diff --git a/common/src/main/resources/application.conf b/common/src/main/resources/application.conf
index c47d909..f0e6c8a 100644
--- a/common/src/main/resources/application.conf
+++ b/common/src/main/resources/application.conf
@@ -3,10 +3,3 @@
   loggers = ["akka.event.slf4j.Slf4jLogger"]
   loglevel = "INFO"
 }
-
-spray.can {
-  server {
-    verbose-error-messages = "on"
-    request-timeout = 35s
-  }
-}
diff --git a/common/src/main/scala/org/apache/predictionio/akkahttpjson4s/Json4sSupport.scala b/common/src/main/scala/org/apache/predictionio/akkahttpjson4s/Json4sSupport.scala
new file mode 100644
index 0000000..62cb8de
--- /dev/null
+++ b/common/src/main/scala/org/apache/predictionio/akkahttpjson4s/Json4sSupport.scala
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.akkahttpjson4s
+
+// Referenced from https://github.com/hseeberger/akka-http-json
+// because of differences in the supported json4s versions.
+import java.lang.reflect.InvocationTargetException
+
+import akka.http.scaladsl.marshalling.{ Marshaller, ToEntityMarshaller }
+import akka.http.scaladsl.model.ContentTypeRange
+import akka.http.scaladsl.model.MediaType
+import akka.http.scaladsl.model.MediaTypes.`application/json`
+import akka.http.scaladsl.unmarshalling.{ FromEntityUnmarshaller, Unmarshaller }
+import akka.util.ByteString
+import org.json4s.{ Formats, MappingException, Serialization }
+import scala.collection.immutable.Seq
+
+/**
+  * Automatic to and from JSON marshalling/unmarshalling using an in-scope *Json4s* protocol.
+  *
+  * Pretty printing is enabled if an implicit [[Json4sSupport.ShouldWritePretty.True]] is in scope.
+  */
+object Json4sSupport extends Json4sSupport {
+
+  sealed abstract class ShouldWritePretty
+
+  final object ShouldWritePretty {
+    final object True  extends ShouldWritePretty
+    final object False extends ShouldWritePretty
+  }
+}
+
+/**
+  * Automatic to and from JSON marshalling/unmarshalling using an in-scope *Json4s* protocol.
+  *
+  * Pretty printing is enabled if an implicit [[Json4sSupport.ShouldWritePretty.True]] is in scope.
+  */
+trait Json4sSupport {
+  import Json4sSupport._
+
+  def unmarshallerContentTypes: Seq[ContentTypeRange] =
+    mediaTypes.map(ContentTypeRange.apply)
+
+  def mediaTypes: Seq[MediaType.WithFixedCharset] =
+    List(`application/json`)
+
+  private val jsonStringUnmarshaller =
+    Unmarshaller.byteStringUnmarshaller
+      .forContentTypes(unmarshallerContentTypes: _*)
+      .mapWithCharset {
+        case (ByteString.empty, _) => throw Unmarshaller.NoContentException
+        case (data, charset)       => data.decodeString(charset.nioCharset.name)
+      }
+
+  private val jsonStringMarshaller =
+    Marshaller.oneOf(mediaTypes: _*)(Marshaller.stringMarshaller)
+
+  /**
+    * HTTP entity => `A`
+    *
+    * @tparam A type to decode
+    * @return unmarshaller for `A`
+    */
+  implicit def unmarshaller[A: Manifest](implicit serialization: Serialization,
+    formats: Formats): FromEntityUnmarshaller[A] =
+    jsonStringUnmarshaller
+      .map(s => serialization.read(s))
+      .recover { _ => _ =>
+      { case MappingException(_, ite: InvocationTargetException) => throw ite.getCause }
+      }
+
+  /**
+    * `A` => HTTP entity
+    *
+    * @tparam A type to encode, must be upper bounded by `AnyRef`
+    * @return marshaller for any `A` value
+    */
+  implicit def marshaller[A <: AnyRef](implicit serialization: Serialization,
+    formats: Formats,
+    shouldWritePretty: ShouldWritePretty =
+    ShouldWritePretty.False): ToEntityMarshaller[A] =
+    shouldWritePretty match {
+      case ShouldWritePretty.False =>
+        jsonStringMarshaller.compose(serialization.write[A])
+      case ShouldWritePretty.True =>
+        jsonStringMarshaller.compose(serialization.writePretty[A])
+    }
+}
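
A minimal usage sketch (not part of the patch) of the new `Json4sSupport` in an akka-http route; the `Query` case class and the `queries.json` path are illustrative assumptions only.

```
// Hypothetical usage of org.apache.predictionio.akkahttpjson4s.Json4sSupport.
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import org.apache.predictionio.akkahttpjson4s.Json4sSupport._
import org.json4s.{DefaultFormats, Formats}

// Illustrative payload type only.
final case class Query(user: String, num: Int)

object Json4sSupportUsageSketch {
  // These satisfy the implicit Serialization and Formats parameters
  // required by the marshaller and unmarshaller in Json4sSupport.
  implicit val serialization = org.json4s.native.Serialization
  implicit val formats: Formats = DefaultFormats

  // entity(as[Query]) resolves the FromEntityUnmarshaller derived above;
  // complete(query) resolves the ToEntityMarshaller for AnyRef values.
  val route: Route =
    path("queries.json") {
      post {
        entity(as[Query]) { query =>
          complete(query)
        }
      }
    }
}
```

This mirrors the `EngineServerJson4sSupport` object that `CreateServer.scala` defines further down with the jackson backend.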
diff --git a/common/src/main/scala/org/apache/predictionio/authentication/KeyAuthentication.scala b/common/src/main/scala/org/apache/predictionio/authentication/KeyAuthentication.scala
index fa950aa..08ae09a 100644
--- a/common/src/main/scala/org/apache/predictionio/authentication/KeyAuthentication.scala
+++ b/common/src/main/scala/org/apache/predictionio/authentication/KeyAuthentication.scala
@@ -23,11 +23,10 @@
  * It is highly recommended to implement a stronger authentication mechanism
   */
 
+import akka.http.scaladsl.model.HttpRequest
+import akka.http.scaladsl.model.headers.HttpChallenge
+import akka.http.scaladsl.server.{AuthenticationFailedRejection, Rejection, RequestContext}
 import com.typesafe.config.ConfigFactory
-import spray.http.HttpRequest
-import spray.routing.authentication._
-import spray.routing.{AuthenticationFailedRejection, RequestContext}
-
 import scala.concurrent.ExecutionContext.Implicits.global
 import scala.concurrent.Future
 
@@ -42,19 +41,18 @@
     val param = "accessKey"
   }
 
-  def withAccessKeyFromFile: RequestContext => Future[Authentication[HttpRequest]] = {
+  def withAccessKeyFromFile: RequestContext => Future[Either[Rejection, HttpRequest]] = {
     ctx: RequestContext =>
-      val accessKeyParamOpt = ctx.request.uri.query.get(ServerKey.param)
+      val accessKeyParamOpt = ctx.request.uri.query().get(ServerKey.param)
       Future {
-
         val passedKey = accessKeyParamOpt.getOrElse {
           Left(AuthenticationFailedRejection(
-            AuthenticationFailedRejection.CredentialsRejected, Nil))
+            AuthenticationFailedRejection.CredentialsRejected, HttpChallenge("", None)))
         }
 
         if (!ServerKey.authEnforced || passedKey.equals(ServerKey.get)) Right(ctx.request)
         else Left(AuthenticationFailedRejection(
-          AuthenticationFailedRejection.CredentialsRejected, Nil))
+          AuthenticationFailedRejection.CredentialsRejected, HttpChallenge("", None)))
 
       }
   }
diff --git a/common/src/main/scala/org/apache/predictionio/configuration/SSLConfiguration.scala b/common/src/main/scala/org/apache/predictionio/configuration/SSLConfiguration.scala
index 9292e21..7880b13 100644
--- a/common/src/main/scala/org/apache/predictionio/configuration/SSLConfiguration.scala
+++ b/common/src/main/scala/org/apache/predictionio/configuration/SSLConfiguration.scala
@@ -15,19 +15,13 @@
  * limitations under the License.
  */
 
-
 package org.apache.predictionio.configuration
 
-/**
-  * Created by ykhodorkovsky on 2/26/16.
-  */
-
 import java.io.FileInputStream
 import java.security.KeyStore
-import javax.net.ssl.{KeyManagerFactory, SSLContext, TrustManagerFactory}
 
 import com.typesafe.config.ConfigFactory
-import spray.io.ServerSSLEngineProvider
+import javax.net.ssl.{KeyManagerFactory, SSLContext, TrustManagerFactory}
 
 trait SSLConfiguration {
 
@@ -39,7 +33,6 @@
   private val keyAlias = serverConfig.getString("org.apache.predictionio.server.ssl-key-alias")
 
   private val keyStore = {
-
     // Loading keystore from specified file
     val clientStore = KeyStore.getInstance("JKS")
     val inputStream = new FileInputStream(
@@ -50,7 +43,7 @@
   }
 
   // Creating SSL context
-  implicit def sslContext: SSLContext = {
+  def sslContext: SSLContext = {
     val context = SSLContext.getInstance("TLS")
     val tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm)
     val kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm)
@@ -60,15 +53,4 @@
     context
   }
 
-  // provide implicit SSLEngine with some protocols
-  implicit def sslEngineProvider: ServerSSLEngineProvider = {
-    ServerSSLEngineProvider { engine =>
-      engine.setEnabledCipherSuites(Array(
-        "TLS_RSA_WITH_AES_256_CBC_SHA",
-        "TLS_ECDH_ECDSA_WITH_RC4_128_SHA",
-        "TLS_RSA_WITH_AES_128_CBC_SHA"))
-      engine.setEnabledProtocols(Array("TLSv1", "TLSv1.2", "TLSv1.1"))
-      engine
-    }
-  }
 }
diff --git a/conf/pio-env.sh.template b/conf/pio-env.sh.template
index 16ebcd3..3cd2415 100644
--- a/conf/pio-env.sh.template
+++ b/conf/pio-env.sh.template
@@ -89,7 +89,7 @@
 # PIO_STORAGE_SOURCES_ELASTICSEARCH_HOSTS=localhost
 # PIO_STORAGE_SOURCES_ELASTICSEARCH_PORTS=9200
 # PIO_STORAGE_SOURCES_ELASTICSEARCH_SCHEMES=http
-# PIO_STORAGE_SOURCES_ELASTICSEARCH_HOME=$PIO_HOME/vendors/elasticsearch-5.5.2
+# PIO_STORAGE_SOURCES_ELASTICSEARCH_HOME=$PIO_HOME/vendors/elasticsearch-5.6.9
 # Optional basic HTTP auth
 # PIO_STORAGE_SOURCES_ELASTICSEARCH_USERNAME=my-name
 # PIO_STORAGE_SOURCES_ELASTICSEARCH_PASSWORD=my-secret
diff --git a/conf/pio-vendors.sh b/conf/pio-vendors.sh
index 162372f..d68ff7e 100644
--- a/conf/pio-vendors.sh
+++ b/conf/pio-vendors.sh
@@ -20,19 +20,23 @@
 # `source conf/set_build_profile.sh $BUILD_PROFILE` to get the proper versions
 
 if [ -z "$PIO_SCALA_VERSION" ]; then
-    PIO_SCALA_VERSION="2.11.8"
+    PIO_SCALA_VERSION="2.11.12"
 fi
 
 if [ -z "$PIO_SPARK_VERSION" ]; then
-    PIO_SPARK_VERSION="2.1.1"
+    PIO_SPARK_VERSION="2.1.3"
 fi
 
 if [ -z "$PIO_HADOOP_VERSION" ]; then
-    PIO_HADOOP_VERSION="2.6.5"
+    PIO_HADOOP_VERSION="2.7.7"
 fi
 
 if [ -z "$PIO_ELASTICSEARCH_VERSION" ]; then
-    PIO_ELASTICSEARCH_VERSION="5.5.2"
+    PIO_ELASTICSEARCH_VERSION="5.6.9"
+fi
+
+if [ -z "$PIO_HBASE_VERSION" ]; then
+    PIO_HBASE_VERSION="1.2.6"
 fi
 
 ES_MAJOR=`echo $PIO_ELASTICSEARCH_VERSION | awk -F. '{print $1}'`
@@ -42,9 +46,12 @@
     export ES_TAG="1"
 else
     export ES_IMAGE="docker.elastic.co/elasticsearch/elasticsearch"
-    export ES_TAG="5.5.2"
+    export ES_TAG="$PIO_ELASTICSEARCH_VERSION"
 fi
 
+HBASE_MAJOR=`echo $PIO_HBASE_VERSION | awk -F. '{print $1 "." $2}'`
+export HBASE_TAG="$HBASE_MAJOR"
+
 PGSQL_JAR=postgresql-9.4-1204.jdbc41.jar
 PGSQL_DOWNLOAD=https://jdbc.postgresql.org/download/${PGSQL_JAR}
 
diff --git a/core/src/main/scala/org/apache/predictionio/workflow/CreateServer.scala b/core/src/main/scala/org/apache/predictionio/workflow/CreateServer.scala
index 2447682..5642114 100644
--- a/core/src/main/scala/org/apache/predictionio/workflow/CreateServer.scala
+++ b/core/src/main/scala/org/apache/predictionio/workflow/CreateServer.scala
@@ -21,11 +21,7 @@
 import java.io.Serializable
 import java.util.concurrent.TimeUnit
 
-import akka.actor._
 import akka.event.Logging
-import akka.io.IO
-import akka.pattern.ask
-import akka.util.Timeout
 import com.github.nscala_time.time.Imports.DateTime
 import com.twitter.bijection.Injection
 import com.twitter.chill.{KryoBase, KryoInjection, ScalaKryoInstantiator}
@@ -34,7 +30,6 @@
 import grizzled.slf4j.Logging
 import org.apache.commons.lang3.exception.ExceptionUtils
 import org.apache.predictionio.authentication.KeyAuthentication
-import org.apache.predictionio.configuration.SSLConfiguration
 import org.apache.predictionio.controller.{Engine, Params, Utils, WithPrId}
 import org.apache.predictionio.core.{BaseAlgorithm, BaseServing, Doer}
 import org.apache.predictionio.data.storage.{EngineInstance, Storage}
@@ -42,15 +37,23 @@
 import org.json4s._
 import org.json4s.native.JsonMethods._
 import org.json4s.native.Serialization.write
-import spray.can.Http
-import spray.can.server.ServerSettings
-import spray.http.MediaTypes._
-import spray.http._
-import spray.httpx.Json4sSupport
-import spray.routing._
+import akka.actor._
+import akka.http.scaladsl.{ConnectionContext, Http, HttpsConnectionContext}
+import akka.http.scaladsl.Http.ServerBinding
+import akka.http.scaladsl.model.ContentTypes._
+import akka.http.scaladsl.model.{HttpEntity, HttpResponse, StatusCodes}
+import akka.http.scaladsl.server.Directives.complete
+import akka.http.scaladsl.server.directives._
+import akka.http.scaladsl.server._
+import akka.pattern.ask
+import akka.util.Timeout
+import akka.http.scaladsl.server.Directives._
+import akka.stream.ActorMaterializer
+import org.apache.predictionio.akkahttpjson4s.Json4sSupport._
+import org.apache.predictionio.configuration.SSLConfiguration
 
 import scala.concurrent.ExecutionContext.Implicits.global
-import scala.concurrent.Future
+import scala.concurrent.{Await, Future}
 import scala.concurrent.duration._
 import scala.language.existentials
 import scala.util.{Failure, Random, Success}
@@ -177,18 +180,21 @@
         "master")
         implicit val timeout = Timeout(5.seconds)
         master ? StartServer()
-        actorSystem.awaitTermination
+
+        val f = actorSystem.whenTerminated
+        Await.ready(f, Duration.Inf)
+
       } getOrElse {
         error(s"Invalid engine instance ID. Aborting server.")
       }
     }
   }
 
-  def createServerActorWithEngine[TD, EIN, PD, Q, P, A](
+  def createPredictionServerWithEngine[TD, EIN, PD, Q, P, A](
     sc: ServerConfig,
     engineInstance: EngineInstance,
     engine: Engine[TD, EIN, PD, Q, P, A],
-    engineLanguage: EngineLanguage.Value): ActorRef = {
+    engineLanguage: EngineLanguage.Value): PredictionServer[Q, P] = {
 
     val engineParams = engine.engineInstanceToEngineParams(
       engineInstance, sc.jsonExtractor)
@@ -228,36 +234,50 @@
     val serving = Doer(engine.servingClassMap(servingParamsWithName._1),
       servingParamsWithName._2)
 
-    actorSystem.actorOf(
-      Props(
-        classOf[ServerActor[Q, P]],
-        sc,
-        engineInstance,
-        engine,
-        engineLanguage,
-        engineParams.dataSourceParams._2,
-        engineParams.preparatorParams._2,
-        algorithms,
-        engineParams.algorithmParamsList.map(_._2),
-        models,
-        serving,
-        engineParams.servingParams._2))
+    new PredictionServer(
+      sc,
+      engineInstance,
+      engine,
+      engineLanguage,
+      engineParams.dataSourceParams._2,
+      engineParams.preparatorParams._2,
+      algorithms,
+      engineParams.algorithmParamsList.map(_._2),
+      models,
+      serving,
+      engineParams.servingParams._2,
+      actorSystem)
   }
 }
 
+
+object EngineServerJson4sSupport {
+  implicit val serialization = org.json4s.jackson.Serialization
+  implicit def json4sFormats: Formats = DefaultFormats
+}
+
 class MasterActor (
     sc: ServerConfig,
     engineInstance: EngineInstance,
-    engineFactoryName: String) extends Actor with SSLConfiguration with KeyAuthentication {
+    engineFactoryName: String) extends Actor with KeyAuthentication with SSLConfiguration {
+
   val log = Logging(context.system, this)
+
   implicit val system = context.system
-  var sprayHttpListener: Option[ActorRef] = None
-  var currentServerActor: Option[ActorRef] = None
+  implicit val materializer = ActorMaterializer()
+
+  var currentServerBinding: Option[Future[ServerBinding]] = None
   var retry = 3
   val serverConfig = ConfigFactory.load("server.conf")
   val sslEnforced = serverConfig.getBoolean("org.apache.predictionio.server.ssl-enforced")
   val protocol = if (sslEnforced) "https://" else "http://"
 
+  val https: Option[HttpsConnectionContext] = if(sslEnforced){
+    val https = ConnectionContext.https(sslContext)
+    Http().setDefaultServerHttpContext(https)
+    Some(https)
+  } else None
+
   def undeploy(ip: String, port: Int): Unit = {
     val serverUrl = s"${protocol}${ip}:${port}"
     log.info(
@@ -287,81 +307,74 @@
 
   def receive: Actor.Receive = {
     case x: StartServer =>
-      val actor = createServerActor(
-        sc,
-        engineInstance,
-        engineFactoryName)
-      currentServerActor = Some(actor)
       undeploy(sc.ip, sc.port)
       self ! BindServer()
     case x: BindServer =>
-      currentServerActor map { actor =>
-        val settings = ServerSettings(system)
-        IO(Http) ! Http.Bind(
-          actor,
-          interface = sc.ip,
-          port = sc.port,
-          settings = Some(settings.copy(sslEncryption = sslEnforced)))
-      } getOrElse {
-        log.error("Cannot bind a non-existing server backend.")
+      currentServerBinding match {
+        case Some(_) =>
+          log.error("Cannot bind a non-existing server backend.")
+        case None =>
+          val server = createServer(sc, engineInstance, engineFactoryName)
+          val route = server.createRoute()
+          val binding = https match {
+            case Some(https) =>
+              Http().bindAndHandle(route, sc.ip, sc.port, connectionContext = https)
+            case None =>
+              Http().bindAndHandle(route, sc.ip, sc.port)
+          }
+          currentServerBinding = Some(binding)
+
+          val serverUrl = s"${protocol}${sc.ip}:${sc.port}"
+          log.info(s"Engine is deployed and running. Engine API is live at ${serverUrl}.")
       }
     case x: StopServer =>
       log.info(s"Stop server command received.")
-      sprayHttpListener.map { l =>
-        log.info("Server is shutting down.")
-        l ! Http.Unbind(5.seconds)
-        system.shutdown()
-      } getOrElse {
-        log.warning("No active server is running.")
+      currentServerBinding match {
+        case Some(f) =>
+          f.flatMap { binding =>
+            binding.unbind()
+          }.foreach { _ =>
+            system.terminate()
+          }
+        case None =>
+          log.warning("No active server is running.")
       }
     case x: ReloadServer =>
       log.info("Reload server command received.")
-      val latestEngineInstance =
-        CreateServer.engineInstances.getLatestCompleted(
-          engineInstance.engineId,
-          engineInstance.engineVersion,
-          engineInstance.engineVariant)
-      latestEngineInstance map { lr =>
-        val actor = createServerActor(sc, lr, engineFactoryName)
-        sprayHttpListener.map { l =>
-          l ! Http.Unbind(5.seconds)
-          val settings = ServerSettings(system)
-          IO(Http) ! Http.Bind(
-            actor,
-            interface = sc.ip,
-            port = sc.port,
-            settings = Some(settings.copy(sslEncryption = sslEnforced)))
-          currentServerActor.get ! Kill
-          currentServerActor = Some(actor)
-        } getOrElse {
-          log.warning("No active server is running. Abort reloading.")
+        currentServerBinding match {
+          case Some(f) =>
+            f.flatMap { binding =>
+              binding.unbind()
+            }
+            val latestEngineInstance =
+              CreateServer.engineInstances.getLatestCompleted(
+                engineInstance.engineId,
+                engineInstance.engineVersion,
+                engineInstance.engineVariant)
+            latestEngineInstance map { lr =>
+              val server = createServer(sc, lr, engineFactoryName)
+              val route = server.createRoute()
+              val binding = https match {
+                case Some(https) =>
+                  Http().bindAndHandle(route, sc.ip, sc.port, connectionContext = https)
+                case None =>
+                  Http().bindAndHandle(route, sc.ip, sc.port)
+              }
+              currentServerBinding = Some(binding)
+            } getOrElse {
+              log.warning(
+                s"No latest completed engine instance for ${engineInstance.engineId} " +
+                  s"${engineInstance.engineVersion}. Abort reloading.")
+            }
+          case None =>
+            log.warning("No active server is running. Abort reloading.")
         }
-      } getOrElse {
-        log.warning(
-          s"No latest completed engine instance for ${engineInstance.engineId} " +
-          s"${engineInstance.engineVersion}. Abort reloading.")
-      }
-    case x: Http.Bound =>
-      val serverUrl = s"${protocol}${sc.ip}:${sc.port}"
-      log.info(s"Engine is deployed and running. Engine API is live at ${serverUrl}.")
-      sprayHttpListener = Some(sender)
-    case x: Http.CommandFailed =>
-      if (retry > 0) {
-        retry -= 1
-        log.error(s"Bind failed. Retrying... ($retry more trial(s))")
-        context.system.scheduler.scheduleOnce(1.seconds) {
-          self ! BindServer()
-        }
-      } else {
-        log.error("Bind failed. Shutting down.")
-        system.shutdown()
-      }
   }
 
-  def createServerActor(
+  def createServer(
       sc: ServerConfig,
       engineInstance: EngineInstance,
-      engineFactoryName: String): ActorRef = {
+      engineFactoryName: String): PredictionServer[_, _] = {
     val (engineLanguage, engineFactory) =
       WorkflowUtils.getEngine(engineFactoryName, getClass.getClassLoader)
     val engine = engineFactory()
@@ -373,7 +386,7 @@
 
     val deployableEngine = engine.asInstanceOf[Engine[_,_,_,_,_,_]]
 
-    CreateServer.createServerActorWithEngine(
+    CreateServer.createPredictionServerWithEngine(
       sc,
       engineInstance,
       // engine,
@@ -382,7 +395,7 @@
   }
 }
 
-class ServerActor[Q, P](
+class PredictionServer[Q, P](
     val args: ServerConfig,
     val engineInstance: EngineInstance,
     val engine: Engine[_, _, _, Q, P, _],
@@ -393,23 +406,22 @@
     val algorithmsParams: Seq[Params],
     val models: Seq[Any],
     val serving: BaseServing[Q, P],
-    val servingParams: Params) extends Actor with HttpService with KeyAuthentication {
+    val servingParams: Params,
+    val system: ActorSystem) extends KeyAuthentication {
+
+  val log = Logging(system, getClass)
   val serverStartTime = DateTime.now
-  val log = Logging(context.system, this)
 
   var requestCount: Int = 0
   var avgServingSec: Double = 0.0
   var lastServingSec: Double = 0.0
 
-  /** The following is required by HttpService */
-  def actorRefFactory: ActorContext = context
-
   implicit val timeout = Timeout(5, TimeUnit.SECONDS)
-  val pluginsActorRef =
-    context.actorOf(Props(classOf[PluginsActor], args.engineVariant), "PluginsActor")
-  val pluginContext = EngineServerPluginContext(log, args.engineVariant)
 
-  def receive: Actor.Receive = runRoute(myRoute)
+  val pluginsActorRef =
+    system.actorOf(Props(classOf[PluginsActor], args.engineVariant), "PluginsActor")
+
+  val pluginContext = EngineServerPluginContext(log, args.engineVariant)
 
   val feedbackEnabled = if (args.feedback) {
     if (args.accessKey.isEmpty) {
@@ -433,37 +445,44 @@
     }
   }
 
-  val myRoute =
-    path("") {
-      get {
-        respondWithMediaType(`text/html`) {
-          detach() {
-            complete {
-              html.index(
-                args,
-                engineInstance,
-                algorithms.map(_.toString),
-                algorithmsParams.map(_.toString),
-                models.map(_.toString),
-                dataSourceParams.toString,
-                preparatorParams.toString,
-                servingParams.toString,
-                serverStartTime,
-                feedbackEnabled,
-                args.eventServerIp,
-                args.eventServerPort,
-                requestCount,
-                avgServingSec,
-                lastServingSec
-              ).toString
-            }
-          }
-        }
+  def authenticate[T](authenticator: RequestContext => Future[Either[Rejection, T]]):
+      AuthenticationDirective[T] = {
+    extractRequestContext.flatMap { requestContext =>
+      onSuccess(authenticator(requestContext)).flatMap {
+        case Right(x) => provide(x)
+        case Left(x)  => reject(x): Directive1[T]
       }
-    } ~
-    path("queries.json") {
-      post {
-        detach() {
+    }
+  }
+
+  def createRoute(): Route = {
+    val myRoute =
+      path("") {
+        get {
+          complete(HttpResponse(entity = HttpEntity(
+            `text/html(UTF-8)`,
+            html.index(
+              args,
+              engineInstance,
+              algorithms.map(_.toString),
+              algorithmsParams.map(_.toString),
+              models.map(_.toString),
+              dataSourceParams.toString,
+              preparatorParams.toString,
+              servingParams.toString,
+              serverStartTime,
+              feedbackEnabled,
+              args.eventServerIp,
+              args.eventServerPort,
+              requestCount,
+              avgServingSec,
+              lastServingSec
+            ).toString
+          )))
+        }
+      } ~
+      path("queries.json") {
+        post {
           entity(as[String]) { queryString =>
             try {
               val servingStartTime = DateTime.now
@@ -584,9 +603,8 @@
                 (requestCount + 1)
               requestCount += 1
 
-              respondWithMediaType(`application/json`) {
-                complete(compact(render(pluginResult)))
-              }
+              complete(compact(render(pluginResult)))
+
             } catch {
               case e: MappingException =>
                 val msg = s"Query:\n$queryString\n\nStack Trace:\n" +
@@ -613,83 +631,76 @@
             }
           }
         }
-      }
-    } ~
-    path("reload") {
-      authenticate(withAccessKeyFromFile) { request =>
-        post {
-          complete {
-            context.actorSelection("/user/master") ! ReloadServer()
-            "Reloading..."
+      } ~
+      path("reload") {
+        authenticate(withAccessKeyFromFile) { request =>
+          post {
+            system.actorSelection("/user/master") ! ReloadServer()
+            complete("Reloading...")
           }
         }
-      }
-    } ~
-    path("stop") {
-      authenticate(withAccessKeyFromFile) { request =>
-        post {
-          complete {
-            context.system.scheduler.scheduleOnce(1.seconds) {
-              context.actorSelection("/user/master") ! StopServer()
+      } ~
+      path("stop") {
+        authenticate(withAccessKeyFromFile) { request =>
+          post {
+            system.scheduler.scheduleOnce(1.seconds) {
+              system.actorSelection("/user/master") ! StopServer()
             }
-            "Shutting down..."
+            complete("Shutting down...")
           }
         }
-      }
-    } ~
-    pathPrefix("assets") {
-      getFromResourceDirectory("assets")
-    } ~
-    path("plugins.json") {
-      import EngineServerJson4sSupport._
-      get {
-        respondWithMediaType(MediaTypes.`application/json`) {
-          complete {
+      } ~
+      pathPrefix("assets") {
+        getFromResourceDirectory("assets")
+      } ~
+      path("plugins.json") {
+        import EngineServerJson4sSupport._
+        get {
+          complete(
             Map("plugins" -> Map(
               "outputblockers" -> pluginContext.outputBlockers.map { case (n, p) =>
                 n -> Map(
-                  "name" -> p.pluginName,
+                  "name"        -> p.pluginName,
                   "description" -> p.pluginDescription,
-                  "class" -> p.getClass.getName,
-                  "params" -> pluginContext.pluginParams(p.pluginName))
+                  "class"       -> p.getClass.getName,
+                  "params"      -> pluginContext.pluginParams(p.pluginName))
               },
               "outputsniffers" -> pluginContext.outputSniffers.map { case (n, p) =>
                 n -> Map(
-                  "name" -> p.pluginName,
+                  "name"        -> p.pluginName,
                   "description" -> p.pluginDescription,
-                  "class" -> p.getClass.getName,
-                  "params" -> pluginContext.pluginParams(p.pluginName))
+                  "class"       -> p.getClass.getName,
+                  "params"      -> pluginContext.pluginParams(p.pluginName))
               }
             ))
-          }
+          )
         }
-      }
-    } ~
-    path("plugins" / Segments) { segments =>
-      import EngineServerJson4sSupport._
-      get {
-        respondWithMediaType(MediaTypes.`application/json`) {
-          complete {
-            val pluginArgs = segments.drop(2)
-            val pluginType = segments(0)
-            val pluginName = segments(1)
-            pluginType match {
-              case EngineServerPlugin.outputBlocker =>
-                pluginContext.outputBlockers(pluginName).handleREST(
-                  pluginArgs)
-              case EngineServerPlugin.outputSniffer =>
-                pluginsActorRef ? PluginsActor.HandleREST(
-                  pluginName = pluginName,
-                  pluginArgs = pluginArgs) map {
-                  _.asInstanceOf[String]
-                }
-            }
-          }
-        }
-      }
-    }
-}
+      } ~
+      path("plugins" / Segments) { segments =>
+        import EngineServerJson4sSupport._
+        get {
+          val pluginArgs = segments.drop(2)
+          val pluginType = segments(0)
+          val pluginName = segments(1)
+          pluginType match {
+            case EngineServerPlugin.outputBlocker =>
+              complete(HttpResponse(entity = HttpEntity(
+                  `application/json`,
+                  pluginContext.outputBlockers(pluginName).handleREST(pluginArgs))))
 
-object EngineServerJson4sSupport extends Json4sSupport {
-  implicit def json4sFormats: Formats = DefaultFormats
+            case EngineServerPlugin.outputSniffer =>
+              complete(pluginsActorRef ? PluginsActor.HandleREST(
+                pluginName = pluginName,
+                pluginArgs = pluginArgs) map { json =>
+                HttpResponse(entity = HttpEntity(
+                  `application/json`,
+                  json.asInstanceOf[String]
+                ))
+              })
+          }
+        }
+      }
+
+    myRoute
+  }
 }
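
Side note on the `authenticate` helper introduced above: Spray shipped an `authenticate` directive, but Akka HTTP's built-in security directives work on credentials rather than on a `RequestContext => Future[Either[Rejection, T]]` function, so the migration rolls its own. A minimal, self-contained sketch of the same pattern follows; the `withAccessKey` authenticator and the `/reload` route here are hypothetical stand-ins, not the actual PredictionIO code.

```scala
import akka.http.scaladsl.server._
import akka.http.scaladsl.server.Directives._
import scala.concurrent.Future

object AuthenticateSketch {
  // Hypothetical authenticator: accept any request carrying an "accessKey" query parameter.
  val withAccessKey: RequestContext => Future[Either[Rejection, String]] = { ctx =>
    Future.successful(
      ctx.request.uri.query().get("accessKey")
        .toRight(MissingQueryParamRejection("accessKey")))
  }

  // Same shape as the helper in the diff: run the authenticator against the current
  // RequestContext, provide the extracted value on Right, reject on Left.
  def authenticate[T](authenticator: RequestContext => Future[Either[Rejection, T]]): Directive1[T] =
    extractRequestContext.flatMap { requestContext =>
      onSuccess(authenticator(requestContext)).flatMap {
        case Right(x) => provide(x)
        case Left(r)  => reject(r): Directive1[T]
      }
    }

  // Usage: the authenticated value is available to the inner route.
  val route: Route =
    path("reload") {
      authenticate(withAccessKey) { accessKey =>
        post {
          complete(s"Reloading (accessKey: $accessKey)...")
        }
      }
    }
}
```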
diff --git a/core/src/main/scala/org/apache/predictionio/workflow/EngineServerPluginContext.scala b/core/src/main/scala/org/apache/predictionio/workflow/EngineServerPluginContext.scala
index cfc83eb..011cd95 100644
--- a/core/src/main/scala/org/apache/predictionio/workflow/EngineServerPluginContext.scala
+++ b/core/src/main/scala/org/apache/predictionio/workflow/EngineServerPluginContext.scala
@@ -55,9 +55,10 @@
       EngineServerPlugin.outputSniffer -> mutable.Map())
     val pluginParams = mutable.Map[String, JValue]()
     val serviceLoader = ServiceLoader.load(classOf[EngineServerPlugin])
-    val variantJson = parse(stringFromFile(engineVariant))
-    (variantJson \ "plugins").extractOpt[JObject].foreach { pluginDefs =>
-      pluginDefs.obj.foreach { pluginParams += _ }
+    stringFromFile(engineVariant).foreach { variantJson =>
+      (parse(variantJson) \ "plugins").extractOpt[JObject].foreach { pluginDefs =>
+        pluginDefs.obj.foreach { pluginParams += _ }
+      }
     }
     serviceLoader foreach { service =>
       pluginParams.get(service.pluginName) map { params =>
@@ -77,11 +78,15 @@
       log)
   }
 
-  private def stringFromFile(filePath: String): String = {
+  private def stringFromFile(filePath: String): Option[String] = {
     try {
-      val uri = new URI(filePath)
-      val fs = FileSystem.get(uri, new Configuration())
-      new String(ByteStreams.toByteArray(fs.open(new Path(uri))).map(_.toChar))
+      val fs = FileSystem.get(new Configuration())
+      val path = new Path(new URI(filePath))
+      if (fs.exists(path)) {
+        Some(new String(ByteStreams.toByteArray(fs.open(path)).map(_.toChar)))
+      } else {
+        None
+      }
     } catch {
       case e: java.io.IOException =>
         error(s"Error reading from file: ${e.getMessage}. Aborting.")
diff --git a/core/src/main/scala/org/apache/predictionio/workflow/JsonExtractor.scala b/core/src/main/scala/org/apache/predictionio/workflow/JsonExtractor.scala
index cb71f14..3aafe67 100644
--- a/core/src/main/scala/org/apache/predictionio/workflow/JsonExtractor.scala
+++ b/core/src/main/scala/org/apache/predictionio/workflow/JsonExtractor.scala
@@ -32,7 +32,6 @@
 import org.json4s.native.JsonMethods.pretty
 import org.json4s.native.JsonMethods.parse
 import org.json4s.native.JsonMethods.render
-import org.json4s.reflect.TypeInfo
 
 object JsonExtractor {
 
@@ -144,7 +143,13 @@
     formats: Formats,
     clazz: Class[T]): T = {
 
-    Extraction.extract(parse(json), TypeInfo(clazz, None))(formats).asInstanceOf[T]
+    implicit val f = formats
+    implicit val m = if (clazz == classOf[Map[_, _]]) {
+      Manifest.classType(clazz, manifest[String], manifest[Any])
+    } else {
+      Manifest.classType(clazz)
+    }
+    Extraction.extract(parse(json))
   }
 
   private def extractWithGson[T](
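
The change above drops the `TypeInfo`-based reflection call in favour of building a `Manifest` from the runtime class and letting json4s resolve the target type from it. A minimal sketch of that approach, using a hypothetical `Query` case class:

```scala
import org.json4s.{DefaultFormats, Extraction, Formats}
import org.json4s.native.JsonMethods.parse

object ManifestExtractSketch {
  // Hypothetical query class; only used to illustrate the extraction call.
  case class Query(user: String, num: Int)

  def extractJson4s[T](json: String, clazz: Class[T])(implicit formats: Formats): T = {
    // Build a Manifest for the runtime class so Extraction.extract can pick the target type.
    implicit val m: Manifest[T] = Manifest.classType(clazz)
    Extraction.extract[T](parse(json))
  }

  implicit val formats: Formats = DefaultFormats
  val query = extractJson4s("""{"user": "u1", "num": 4}""", classOf[Query])
  // query == Query("u1", 4)
}
```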
diff --git a/data/build.sbt b/data/build.sbt
index 87b0d96..6592536 100644
--- a/data/build.sbt
+++ b/data/build.sbt
@@ -20,14 +20,12 @@
 name := "apache-predictionio-data"
 
 libraryDependencies ++= Seq(
+  "org.scala-lang"          % "scala-reflect"  % scalaVersion.value,
   "com.github.nscala-time" %% "nscala-time"    % "2.6.0",
   "com.google.guava"        % "guava"          % "14.0.1",
-  "io.spray"               %% "spray-can"      % "1.3.3",
-  "io.spray"               %% "spray-routing"  % "1.3.3",
-  "io.spray"               %% "spray-testkit"  % "1.3.3" % "test",
+  "com.typesafe.akka"      %% "akka-http-testkit" % "10.1.5" % "test",
   "org.apache.spark"       %% "spark-sql"      % sparkVersion.value % "provided",
   "org.clapper"            %% "grizzled-slf4j" % "1.0.2",
-  "org.json4s"             %% "json4s-native"  % json4sVersion.value,
   "org.scalatest"          %% "scalatest"      % "2.1.7" % "test",
   "org.specs2"             %% "specs2"         % "3.3.1" % "test"
     exclude("org.scalaz.stream", s"scalaz-stream_${scalaBinaryVersion.value}"),
diff --git a/data/src/main/scala/org/apache/predictionio/data/api/Common.scala b/data/src/main/scala/org/apache/predictionio/data/api/Common.scala
index 60efea2..02355ce 100644
--- a/data/src/main/scala/org/apache/predictionio/data/api/Common.scala
+++ b/data/src/main/scala/org/apache/predictionio/data/api/Common.scala
@@ -15,68 +15,58 @@
  * limitations under the License.
  */
 
-
 package org.apache.predictionio.data.api
 
-import org.apache.predictionio.data.webhooks.ConnectorException
+import akka.http.scaladsl.server._
 import org.apache.predictionio.data.storage.StorageException
-
-import spray.routing._
-import spray.routing.Directives._
-import spray.routing.Rejection
-import spray.http.StatusCodes
-import spray.httpx.Json4sSupport
-
-import org.json4s.Formats
-import org.json4s.DefaultFormats
+import org.apache.predictionio.data.webhooks.ConnectorException
+import org.json4s.{DefaultFormats, Formats}
+import akka.http.scaladsl.model._
+import akka.http.scaladsl.server.Directives._
+import org.apache.predictionio.akkahttpjson4s.Json4sSupport._
 
 object Common {
 
-  object Json4sProtocol extends Json4sSupport {
+  object Json4sProtocol {
+    implicit val serialization = org.json4s.native.Serialization
     implicit def json4sFormats: Formats = DefaultFormats
   }
 
   import Json4sProtocol._
 
-  val rejectionHandler = RejectionHandler {
-    case MalformedRequestContentRejection(msg, _) :: _ =>
+  val exceptionHandler = ExceptionHandler {
+    case e: ConnectorException => {
+      complete(StatusCodes.BadRequest, Map("message" -> s"${e.getMessage()}"))
+    }
+    case e: StorageException => {
+      complete(StatusCodes.InternalServerError, Map("message" -> s"${e.getMessage()}"))
+    }
+    case e: Exception => {
+      complete(StatusCodes.InternalServerError, Map("message" -> s"${e.getMessage()}"))
+    }
+  }
+
+  val rejectionHandler = RejectionHandler.newBuilder().handle {
+    case MalformedRequestContentRejection(msg, _) =>
       complete(StatusCodes.BadRequest, Map("message" -> msg))
-    case MissingQueryParamRejection(msg) :: _ =>
+
+    case MissingQueryParamRejection(msg) =>
       complete(StatusCodes.NotFound,
         Map("message" -> s"missing required query parameter ${msg}."))
-    case AuthenticationFailedRejection(cause, challengeHeaders) :: _ => {
+
+    case AuthenticationFailedRejection(cause, challengeHeaders) => {
       val msg = cause match {
         case AuthenticationFailedRejection.CredentialsRejected =>
           "Invalid accessKey."
         case AuthenticationFailedRejection.CredentialsMissing =>
           "Missing accessKey."
       }
-      complete(StatusCodes.Unauthorized, challengeHeaders, Map("message" -> msg))
-    }
-    case ChannelRejection(msg) :: _ =>
       complete(StatusCodes.Unauthorized, Map("message" -> msg))
-    case NonExistentAppRejection(msg) :: _ =>
+    }
+    case ChannelRejection(msg) =>
       complete(StatusCodes.Unauthorized, Map("message" -> msg))
-  }
-
-  val exceptionHandler = ExceptionHandler {
-    case e: ConnectorException => {
-      val msg = s"${e.getMessage()}"
-      complete(StatusCodes.BadRequest, Map("message" -> msg))
-    }
-    case e: StorageException => {
-      val msg = s"${e.getMessage()}"
-      complete(StatusCodes.InternalServerError, Map("message" -> msg))
-    }
-    case e: Exception => {
-      val msg = s"${e.getMessage()}"
-      complete(StatusCodes.InternalServerError, Map("message" -> msg))
-    }
-  }
+  }.result()
 }
 
 /** invalid channel */
 case class ChannelRejection(msg: String) extends Rejection
-
-/** the app doesn't exist */
-case class NonExistentAppRejection(msg: String) extends Rejection
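
Worth noting for the migration above: Spray's `RejectionHandler` was a partial function over `List[Rejection]` (hence the old `:: _` patterns), whereas the Akka HTTP handler is assembled per rejection with `RejectionHandler.newBuilder()`. A self-contained sketch of how such handlers are typically wrapped around a route; the handlers and route here are illustrative, not the ones from Common.scala:

```scala
import akka.http.scaladsl.model.StatusCodes
import akka.http.scaladsl.server._
import akka.http.scaladsl.server.Directives._

object HandlerSketch {
  // Builder-based rejection handler, one case per rejection type.
  val rejectionHandler: RejectionHandler = RejectionHandler.newBuilder()
    .handle { case MissingQueryParamRejection(name) =>
      complete(StatusCodes.NotFound, s"missing required query parameter $name.")
    }
    .result()

  // Exception handler mapping unexpected errors to HTTP responses.
  val exceptionHandler: ExceptionHandler = ExceptionHandler {
    case e: IllegalArgumentException =>
      complete(StatusCodes.BadRequest, e.getMessage)
  }

  // Typical wiring: exceptions handled outermost, rejections inside.
  val route: Route =
    handleExceptions(exceptionHandler) {
      handleRejections(rejectionHandler) {
        path("ping") { get { complete("pong") } }
      }
    }
}
```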
diff --git a/data/src/main/scala/org/apache/predictionio/data/api/EventServer.scala b/data/src/main/scala/org/apache/predictionio/data/api/EventServer.scala
index 41dfefb..96ff4d0 100644
--- a/data/src/main/scala/org/apache/predictionio/data/api/EventServer.scala
+++ b/data/src/main/scala/org/apache/predictionio/data/api/EventServer.scala
@@ -18,609 +18,38 @@
 
 package org.apache.predictionio.data.api
 
-import akka.event.Logging
+import akka.event.{Logging, LoggingAdapter}
 import sun.misc.BASE64Decoder
-
 import java.util.concurrent.TimeUnit
 
 import akka.actor._
-import akka.io.IO
+import akka.http.scaladsl.Http
+import akka.http.scaladsl.model.{FormData, HttpEntity, HttpResponse, StatusCodes}
+import akka.http.scaladsl.model.ContentTypes._
+import akka.http.scaladsl.model.headers.HttpChallenge
+import akka.http.scaladsl.server.Directives.complete
+import akka.http.scaladsl.server.directives._
+import akka.http.scaladsl.server._
 import akka.pattern.ask
 import akka.util.Timeout
-import org.apache.predictionio.data.Utils
-import org.apache.predictionio.data.storage.AccessKeys
-import org.apache.predictionio.data.storage.Channels
-import org.apache.predictionio.data.storage.DateTimeJson4sSupport
-import org.apache.predictionio.data.storage.Event
-import org.apache.predictionio.data.storage.EventJson4sSupport
-import org.apache.predictionio.data.storage.BatchEventsJson4sSupport
-import org.apache.predictionio.data.storage.LEvents
-import org.apache.predictionio.data.storage.Storage
-import org.json4s.DefaultFormats
-import org.json4s.Formats
-import org.json4s.JObject
-import org.json4s.native.JsonMethods.parse
-import spray.can.Http
-import spray.http.FormData
-import spray.http.MediaTypes
-import spray.http.StatusCodes
-import spray.httpx.Json4sSupport
-import spray.routing._
-import spray.routing.authentication.Authentication
+import akka.http.scaladsl.server.Directives._
+import akka.stream.ActorMaterializer
+import org.apache.predictionio.data.storage._
+import org.apache.predictionio.akkahttpjson4s.Json4sSupport._
+import org.json4s.{DefaultFormats, Formats, JObject}
 
-import scala.concurrent.{ExecutionContext, Future}
-import scala.util.{Try, Success, Failure}
+import scala.concurrent._
+import scala.concurrent.duration.Duration
+import scala.util.{Failure, Success, Try}
 
-class  EventServiceActor(
-    val eventClient: LEvents,
-    val accessKeysClient: AccessKeys,
-    val channelsClient: Channels,
-    val config: EventServerConfig) extends HttpServiceActor {
-
-  object Json4sProtocol extends Json4sSupport {
-    implicit def json4sFormats: Formats = DefaultFormats +
-      new EventJson4sSupport.APISerializer +
-      new BatchEventsJson4sSupport.APISerializer +
-      // NOTE: don't use Json4s JodaTimeSerializers since it has issues,
-      // some format not converted, or timezone not correct
-      new DateTimeJson4sSupport.Serializer
-  }
-
-
-  val MaxNumberOfEventsPerBatchRequest = 50
-
-  val logger = Logging(context.system, this)
-
-  // we use the enclosing ActorContext's or ActorSystem's dispatcher for our
-  // Futures
-  implicit def executionContext: ExecutionContext = context.dispatcher
-
-  implicit val timeout = Timeout(5, TimeUnit.SECONDS)
-
-  val rejectionHandler = Common.rejectionHandler
-
-  val jsonPath = """(.+)\.json$""".r
-  val formPath = """(.+)\.form$""".r
-
-  val pluginContext = EventServerPluginContext(logger)
-
-  private lazy val base64Decoder = new BASE64Decoder
-
-  case class AuthData(appId: Int, channelId: Option[Int], events: Seq[String])
-
-  /* with accessKey in query/header, return appId if succeed */
-  def withAccessKey: RequestContext => Future[Authentication[AuthData]] = {
-    ctx: RequestContext =>
-      val accessKeyParamOpt = ctx.request.uri.query.get("accessKey")
-      val channelParamOpt = ctx.request.uri.query.get("channel")
-      Future {
-        // with accessKey in query, return appId if succeed
-        accessKeyParamOpt.map { accessKeyParam =>
-          accessKeysClient.get(accessKeyParam).map { k =>
-            channelParamOpt.map { ch =>
-              val channelMap =
-                channelsClient.getByAppid(k.appid)
-                .map(c => (c.name, c.id)).toMap
-              if (channelMap.contains(ch)) {
-                Right(AuthData(k.appid, Some(channelMap(ch)), k.events))
-              } else {
-                Left(ChannelRejection(s"Invalid channel '$ch'."))
-              }
-            }.getOrElse{
-              Right(AuthData(k.appid, None, k.events))
-            }
-          }.getOrElse(FailedAuth)
-        }.getOrElse {
-          // with accessKey in header, return appId if succeed
-          ctx.request.headers.find(_.name == "Authorization").map { authHeader =>
-            authHeader.value.split("Basic ") match {
-              case Array(_, value) =>
-                val appAccessKey =
-                  new String(base64Decoder.decodeBuffer(value)).trim.split(":")(0)
-                accessKeysClient.get(appAccessKey) match {
-                  case Some(k) => Right(AuthData(k.appid, None, k.events))
-                  case None => FailedAuth
-                }
-
-              case _ => FailedAuth
-            }
-          }.getOrElse(MissedAuth)
-        }
-      }
-  }
-
-  private val FailedAuth = Left(
-    AuthenticationFailedRejection(
-      AuthenticationFailedRejection.CredentialsRejected, Nil
-    )
-  )
-
-  private val MissedAuth = Left(
-    AuthenticationFailedRejection(
-      AuthenticationFailedRejection.CredentialsMissing, Nil
-    )
-  )
-
-  lazy val statsActorRef = actorRefFactory.actorSelection("/user/StatsActor")
-  lazy val pluginsActorRef = actorRefFactory.actorSelection("/user/PluginsActor")
-
-  val route: Route =
-    pathSingleSlash {
-      import Json4sProtocol._
-
-      get {
-        respondWithMediaType(MediaTypes.`application/json`) {
-          complete(Map("status" -> "alive"))
-        }
-      }
-    } ~
-    path("plugins.json") {
-      import Json4sProtocol._
-      get {
-        respondWithMediaType(MediaTypes.`application/json`) {
-          complete {
-            Map("plugins" -> Map(
-              "inputblockers" -> pluginContext.inputBlockers.map { case (n, p) =>
-                n -> Map(
-                  "name" -> p.pluginName,
-                  "description" -> p.pluginDescription,
-                  "class" -> p.getClass.getName)
-              },
-              "inputsniffers" -> pluginContext.inputSniffers.map { case (n, p) =>
-                n -> Map(
-                  "name" -> p.pluginName,
-                  "description" -> p.pluginDescription,
-                  "class" -> p.getClass.getName)
-              }
-            ))
-          }
-        }
-      }
-    } ~
-    path("plugins" / Segments) { segments =>
-      get {
-        handleExceptions(Common.exceptionHandler) {
-          authenticate(withAccessKey) { authData =>
-            respondWithMediaType(MediaTypes.`application/json`) {
-              complete {
-                val pluginArgs = segments.drop(2)
-                val pluginType = segments(0)
-                val pluginName = segments(1)
-                pluginType match {
-                  case EventServerPlugin.inputBlocker =>
-                    pluginContext.inputBlockers(pluginName).handleREST(
-                      authData.appId,
-                      authData.channelId,
-                      pluginArgs)
-                  case EventServerPlugin.inputSniffer =>
-                    pluginsActorRef ? PluginsActor.HandleREST(
-                      appId = authData.appId,
-                      channelId = authData.channelId,
-                      pluginName = pluginName,
-                      pluginArgs = pluginArgs) map {
-                      _.asInstanceOf[String]
-                    }
-                }
-              }
-            }
-          }
-        }
-      }
-    } ~
-    path("events" / jsonPath ) { eventId =>
-
-      import Json4sProtocol._
-
-      get {
-        handleExceptions(Common.exceptionHandler) {
-          handleRejections(rejectionHandler) {
-            authenticate(withAccessKey) { authData =>
-              val appId = authData.appId
-              val channelId = authData.channelId
-              respondWithMediaType(MediaTypes.`application/json`) {
-                complete {
-                  logger.debug(s"GET event ${eventId}.")
-                  val data = eventClient.futureGet(eventId, appId, channelId).map { eventOpt =>
-                    eventOpt.map( event =>
-                      (StatusCodes.OK, event)
-                    ).getOrElse(
-                      (StatusCodes.NotFound, Map("message" -> "Not Found"))
-                    )
-                  }
-                  data
-                }
-              }
-            }
-          }
-        }
-      } ~
-      delete {
-        handleExceptions(Common.exceptionHandler) {
-          handleRejections(rejectionHandler) {
-            authenticate(withAccessKey) { authData =>
-              val appId = authData.appId
-              val channelId = authData.channelId
-              respondWithMediaType(MediaTypes.`application/json`) {
-                complete {
-                  logger.debug(s"DELETE event ${eventId}.")
-                  val data = eventClient.futureDelete(eventId, appId, channelId).map { found =>
-                    if (found) {
-                      (StatusCodes.OK, Map("message" -> "Found"))
-                    } else {
-                      (StatusCodes.NotFound, Map("message" -> "Not Found"))
-                    }
-                  }
-                  data
-                }
-              }
-            }
-          }
-        }
-      }
-    } ~
-    path("events.json") {
-
-      import Json4sProtocol._
-
-      post {
-        handleExceptions(Common.exceptionHandler) {
-          handleRejections(rejectionHandler) {
-            authenticate(withAccessKey) { authData =>
-              val appId = authData.appId
-              val channelId = authData.channelId
-              val events = authData.events
-              entity(as[Event]) { event =>
-                complete {
-                  if (events.isEmpty || authData.events.contains(event.event)) {
-                    pluginContext.inputBlockers.values.foreach(
-                      _.process(EventInfo(
-                        appId = appId,
-                        channelId = channelId,
-                        event = event), pluginContext))
-                    val data = eventClient.futureInsert(event, appId, channelId).map { id =>
-                      pluginsActorRef ! EventInfo(
-                        appId = appId,
-                        channelId = channelId,
-                        event = event)
-                      val result = (StatusCodes.Created, Map("eventId" -> s"${id}"))
-                      if (config.stats) {
-                        statsActorRef ! Bookkeeping(appId, result._1, event)
-                      }
-                      result
-                    }
-                    data
-                  } else {
-                    (StatusCodes.Forbidden,
-                      Map("message" -> s"${event.event} events are not allowed"))
-                  }
-                }
-              }
-            }
-          }
-        }
-      } ~
-      get {
-        handleExceptions(Common.exceptionHandler) {
-          handleRejections(rejectionHandler) {
-            authenticate(withAccessKey) { authData =>
-              val appId = authData.appId
-              val channelId = authData.channelId
-              parameters(
-                'startTime.as[Option[String]],
-                'untilTime.as[Option[String]],
-                'entityType.as[Option[String]],
-                'entityId.as[Option[String]],
-                'event.as[Option[String]],
-                'targetEntityType.as[Option[String]],
-                'targetEntityId.as[Option[String]],
-                'limit.as[Option[Int]],
-                'reversed.as[Option[Boolean]]) {
-                (startTimeStr, untilTimeStr, entityType, entityId,
-                  eventName,  // only support one event name
-                  targetEntityType, targetEntityId,
-                  limit, reversed) =>
-                respondWithMediaType(MediaTypes.`application/json`) {
-                  complete {
-                    logger.debug(
-                      s"GET events of appId=${appId} " +
-                      s"st=${startTimeStr} ut=${untilTimeStr} " +
-                      s"et=${entityType} eid=${entityId} " +
-                      s"li=${limit} rev=${reversed} ")
-
-                    require(!((reversed == Some(true))
-                      && (entityType.isEmpty || entityId.isEmpty)),
-                      "the parameter reversed can only be used with" +
-                      " both entityType and entityId specified.")
-
-                    val parseTime = Future {
-                      val startTime = startTimeStr.map(Utils.stringToDateTime(_))
-                      val untilTime = untilTimeStr.map(Utils.stringToDateTime(_))
-                      (startTime, untilTime)
-                    }
-
-
-                    parseTime.flatMap { case (startTime, untilTime) =>
-                      val data = eventClient.futureFind(
-                        appId = appId,
-                        channelId = channelId,
-                        startTime = startTime,
-                        untilTime = untilTime,
-                        entityType = entityType,
-                        entityId = entityId,
-                        eventNames = eventName.map(List(_)),
-                        targetEntityType = targetEntityType.map(Some(_)),
-                        targetEntityId = targetEntityId.map(Some(_)),
-                        limit = limit.orElse(Some(20)),
-                        reversed = reversed)
-                        .map { eventIter =>
-                          if (eventIter.hasNext) {
-                            (StatusCodes.OK, eventIter.toArray)
-                          } else {
-                            (StatusCodes.NotFound,
-                              Map("message" -> "Not Found"))
-                          }
-                        }
-                      data
-                    }.recover {
-                      case e: Exception =>
-                        (StatusCodes.BadRequest, Map("message" -> s"${e}"))
-                    }
-                  }
-                }
-              }
-            }
-          }
-        }
-      }
-    } ~
-    path("batch" / "events.json") {
-
-      import Json4sProtocol._
-
-      post {
-        handleExceptions(Common.exceptionHandler) {
-          handleRejections(rejectionHandler) {
-            authenticate(withAccessKey) { authData =>
-              val appId = authData.appId
-              val channelId = authData.channelId
-              val allowedEvents = authData.events
-
-              entity(as[Seq[Try[Event]]]) { events =>
-                complete {
-                  if (events.length <= MaxNumberOfEventsPerBatchRequest) {
-                    val eventWithIndex = events.zipWithIndex
-
-                    val taggedEvents = eventWithIndex.collect { case (Success(event), i) =>
-                      if(allowedEvents.isEmpty || allowedEvents.contains(event.event)){
-                        (Right(event), i)
-                      } else {
-                        (Left(event), i)
-                      }
-                    }
-
-                    val insertEvents = taggedEvents.collect { case (Right(event), i) =>
-                      (event, i)
-                    }
-
-                    insertEvents.foreach { case (event, i) =>
-                      pluginContext.inputBlockers.values.foreach(
-                        _.process(EventInfo(
-                          appId = appId,
-                          channelId = channelId,
-                          event = event), pluginContext))
-                    }
-
-                    val f: Future[Seq[Map[String, Any]]] = eventClient.futureInsertBatch(
-                      insertEvents.map(_._1), appId, channelId).map { insertResults =>
-                      val results = insertResults.zip(insertEvents).map { case (id, (event, i)) =>
-                        pluginsActorRef ! EventInfo(
-                          appId = appId,
-                          channelId = channelId,
-                          event = event)
-                        val status = StatusCodes.Created
-                        if (config.stats) {
-                          statsActorRef ! Bookkeeping(appId, status, event)
-                        }
-                        (Map(
-                          "status" -> status.intValue,
-                          "eventId" -> s"${id}"), i)
-                      } ++
-                        // Results of denied events
-                        taggedEvents.collect { case (Left(event), i) =>
-                          (Map(
-                            "status" -> StatusCodes.Forbidden.intValue,
-                            "message" -> s"${event.event} events are not allowed"), i)
-                        } ++
-                        // Results of failed to deserialze events
-                        eventWithIndex.collect { case (Failure(exception), i) =>
-                          (Map(
-                            "status" -> StatusCodes.BadRequest.intValue,
-                            "message" -> s"${exception.getMessage()}"), i)
-                        }
-
-                      // Restore original order
-                      results.sortBy { case (_, i) => i }.map { case (data, _) => data }
-                    }
-
-                    f.recover { case exception =>
-                      Map(
-                        "status" -> StatusCodes.InternalServerError.intValue,
-                        "message" -> s"${exception.getMessage()}")
-                    }
-
-                  } else {
-                    (StatusCodes.BadRequest,
-                      Map("message" -> (s"Batch request must have less than or equal to " +
-                        s"${MaxNumberOfEventsPerBatchRequest} events")))
-                  }
-                }
-              }
-            }
-          }
-        }
-      }
-    } ~
-    path("stats.json") {
-
-      import Json4sProtocol._
-
-      get {
-        handleExceptions(Common.exceptionHandler) {
-          handleRejections(rejectionHandler) {
-            authenticate(withAccessKey) { authData =>
-              val appId = authData.appId
-              respondWithMediaType(MediaTypes.`application/json`) {
-                if (config.stats) {
-                  complete {
-                    statsActorRef ? GetStats(appId) map {
-                      _.asInstanceOf[Map[String, StatsSnapshot]]
-                    }
-                  }
-                } else {
-                  complete(
-                    StatusCodes.NotFound,
-                    parse("""{"message": "To see stats, launch Event Server """ +
-                      """with --stats argument."}"""))
-                }
-              }
-            }
-          }
-        }
-      }  // stats.json get
-    } ~
-    path("webhooks" / jsonPath ) { web =>
-      import Json4sProtocol._
-
-      post {
-        handleExceptions(Common.exceptionHandler) {
-          handleRejections(rejectionHandler) {
-            authenticate(withAccessKey) { authData =>
-              val appId = authData.appId
-              val channelId = authData.channelId
-              respondWithMediaType(MediaTypes.`application/json`) {
-                entity(as[JObject]) { jObj =>
-                  complete {
-                    Webhooks.postJson(
-                      appId = appId,
-                      channelId = channelId,
-                      web = web,
-                      data = jObj,
-                      eventClient = eventClient,
-                      log = logger,
-                      stats = config.stats,
-                      statsActorRef = statsActorRef)
-                  }
-                }
-              }
-            }
-          }
-        }
-      } ~
-      get {
-        handleExceptions(Common.exceptionHandler) {
-          handleRejections(rejectionHandler) {
-            authenticate(withAccessKey) { authData =>
-              val appId = authData.appId
-              val channelId = authData.channelId
-              respondWithMediaType(MediaTypes.`application/json`) {
-                complete {
-                  Webhooks.getJson(
-                    appId = appId,
-                    channelId = channelId,
-                    web = web,
-                    log = logger)
-                }
-              }
-            }
-          }
-        }
-      }
-    } ~
-    path("webhooks" / formPath ) { web =>
-      post {
-        handleExceptions(Common.exceptionHandler) {
-          handleRejections(rejectionHandler) {
-            authenticate(withAccessKey) { authData =>
-              val appId = authData.appId
-              val channelId = authData.channelId
-              respondWithMediaType(MediaTypes.`application/json`) {
-                entity(as[FormData]){ formData =>
-                  // logger.debug(formData.toString)
-                  complete {
-                    // respond with JSON
-                    import Json4sProtocol._
-
-                    Webhooks.postForm(
-                      appId = appId,
-                      channelId = channelId,
-                      web = web,
-                      data = formData,
-                      eventClient = eventClient,
-                      log = logger,
-                      stats = config.stats,
-                      statsActorRef = statsActorRef)
-                  }
-                }
-              }
-            }
-          }
-        }
-      } ~
-      get {
-        handleExceptions(Common.exceptionHandler) {
-          handleRejections(rejectionHandler) {
-            authenticate(withAccessKey) { authData =>
-              val appId = authData.appId
-              val channelId = authData.channelId
-              respondWithMediaType(MediaTypes.`application/json`) {
-                complete {
-                  // respond with JSON
-                  import Json4sProtocol._
-
-                  Webhooks.getForm(
-                    appId = appId,
-                    channelId = channelId,
-                    web = web,
-                    log = logger)
-                }
-              }
-            }
-          }
-        }
-      }
-
-    }
-
-  def receive: Actor.Receive = runRoute(route)
-}
-
-
-
-/* message */
-case class StartServer(host: String, port: Int)
-
-class EventServerActor(
-    val eventClient: LEvents,
-    val accessKeysClient: AccessKeys,
-    val channelsClient: Channels,
-    val config: EventServerConfig) extends Actor with ActorLogging {
-  val child = context.actorOf(
-    Props(classOf[EventServiceActor],
-      eventClient,
-      accessKeysClient,
-      channelsClient,
-      config),
-    "EventServiceActor")
-  implicit val system = context.system
-
-  def receive: Actor.Receive = {
-    case StartServer(host, portNum) => {
-      IO(Http) ! Http.Bind(child, interface = host, port = portNum)
-    }
-    case m: Http.Bound => log.info("Bound received. EventServer is ready.")
-    case m: Http.CommandFailed => log.error("Command failed.")
-    case _ => log.error("Unknown message.")
-  }
+object Json4sProtocol {
+  implicit val serialization = org.json4s.native.Serialization
+  implicit def json4sFormats: Formats = DefaultFormats +
+    new EventJson4sSupport.APISerializer +
+    new BatchEventsJson4sSupport.APISerializer +
+    // NOTE: don't use Json4s JodaTimeSerializers since it has issues:
+    // some formats are not converted, or the timezone is not correct
+    new DateTimeJson4sSupport.Serializer
 }
 
 case class EventServerConfig(
@@ -630,34 +59,502 @@
   stats: Boolean = false)
 
 object EventServer {
+  import Json4sProtocol._
+  import FutureDirectives._
+  import Common._
+
+  private val MaxNumberOfEventsPerBatchRequest = 50
+  private lazy val base64Decoder = new BASE64Decoder
+  private implicit val timeout = Timeout(5, TimeUnit.SECONDS)
+  private case class AuthData(appId: Int, channelId: Option[Int], events: Seq[String])
+
+  private def FailedAuth[T]: Either[Rejection, T] = Left(
+    AuthenticationFailedRejection(
+      AuthenticationFailedRejection.CredentialsRejected, HttpChallenge("eventserver", None)
+    )
+  )
+
+  private def MissedAuth[T]: Either[Rejection, T] = Left(
+    AuthenticationFailedRejection(
+      AuthenticationFailedRejection.CredentialsMissing, HttpChallenge("eventserver", None)
+    )
+  )
+
+  def createRoute(eventClient: LEvents,
+                  accessKeysClient: AccessKeys,
+                  channelsClient: Channels,
+                  logger: LoggingAdapter,
+                  statsActorRef: ActorSelection,
+                  pluginsActorRef: ActorSelection,
+                  config: EventServerConfig)(implicit executionContext: ExecutionContext): Route = {
+
+    /* with accessKey in query/header, return appId if authentication succeeds */
+    def withAccessKey: RequestContext => Future[Either[Rejection, AuthData]] = {
+      ctx: RequestContext =>
+        val accessKeyParamOpt = ctx.request.uri.query().get("accessKey")
+        val channelParamOpt = ctx.request.uri.query().get("channel")
+        Future {
+          // with accessKey in query, return appId if it succeeds
+          accessKeyParamOpt.map { accessKeyParam =>
+            accessKeysClient.get(accessKeyParam).map { k =>
+              channelParamOpt.map { ch =>
+                val channelMap =
+                  channelsClient.getByAppid(k.appid)
+                    .map(c => (c.name, c.id)).toMap
+                if (channelMap.contains(ch)) {
+                  Right(AuthData(k.appid, Some(channelMap(ch)), k.events))
+                } else {
+                  Left(ChannelRejection(s"Invalid channel '$ch'."))
+                }
+              }.getOrElse{
+                Right(AuthData(k.appid, None, k.events))
+              }
+            }.getOrElse(FailedAuth)
+          }.getOrElse {
+            // with accessKey in header, return appId if it succeeds
+            ctx.request.headers.find(_.name == "Authorization").map { authHeader =>
+              authHeader.value.split("Basic ") match {
+                case Array(_, value) =>
+                  val appAccessKey =
+                    new String(base64Decoder.decodeBuffer(value)).trim.split(":")(0)
+                  accessKeysClient.get(appAccessKey) match {
+                    case Some(k) => Right(AuthData(k.appid, None, k.events))
+                    case None => FailedAuth
+                  }
+
+                case _ => FailedAuth
+              }
+            }.getOrElse(MissedAuth)
+          }
+        }
+    }
+
+    def authenticate[T](authenticator: RequestContext => Future[Either[Rejection, T]]):
+        AuthenticationDirective[T] = {
+      handleRejections(rejectionHandler).tflatMap { _ =>
+        extractRequestContext.flatMap { requestContext =>
+          onSuccess(authenticator(requestContext)).flatMap {
+            case Right(x) => provide(x)
+            case Left(x)  => reject(x): Directive1[T]
+          }
+        }
+      }
+    }
+
+    val pluginContext = EventServerPluginContext(logger)
+    val jsonPath = """(.+)\.json$""".r
+    val formPath = """(.+)\.form$""".r
+
+    val route: Route =
+      pathSingleSlash {
+        get {
+          complete(Map("status" -> "alive"))
+        }
+      } ~
+      path("plugins.json") {
+        get {
+          complete(
+            Map("plugins" -> Map(
+              "inputblockers" -> pluginContext.inputBlockers.map { case (n, p) =>
+                n -> Map(
+                  "name"        -> p.pluginName,
+                  "description" -> p.pluginDescription,
+                  "class"       -> p.getClass.getName)
+              },
+              "inputsniffers" -> pluginContext.inputSniffers.map { case (n, p) =>
+                n -> Map(
+                  "name"        -> p.pluginName,
+                  "description" -> p.pluginDescription,
+                  "class"       -> p.getClass.getName)
+              }
+            ))
+          )
+        }
+      } ~
+      path("plugins" / Segments) { segments =>
+        get {
+          handleExceptions(exceptionHandler) {
+            authenticate(withAccessKey) { authData =>
+              val pluginArgs = segments.drop(2)
+              val pluginType = segments(0)
+              val pluginName = segments(1)
+              pluginType match {
+                case EventServerPlugin.inputBlocker =>
+                  complete(HttpResponse(entity = HttpEntity(
+                    `application/json`,
+                    pluginContext.inputBlockers(pluginName).handleREST(
+                      authData.appId,
+                      authData.channelId,
+                      pluginArgs)
+                  )))
+
+                case EventServerPlugin.inputSniffer =>
+                  complete(pluginsActorRef ? PluginsActor.HandleREST(
+                    appId = authData.appId,
+                    channelId = authData.channelId,
+                    pluginName = pluginName,
+                    pluginArgs = pluginArgs) map { json =>
+                      HttpResponse(entity = HttpEntity(
+                        `application/json`,
+                        json.asInstanceOf[String]
+                      ))
+                    })
+              }
+            }
+          }
+        }
+      } ~
+      path("events" / jsonPath ) { eventId =>
+        get {
+          handleExceptions(exceptionHandler) {
+            authenticate(withAccessKey) { authData =>
+              val appId = authData.appId
+              val channelId = authData.channelId
+              logger.debug(s"GET event ${eventId}.")
+              onSuccess(eventClient.futureGet(eventId, appId, channelId)){ eventOpt =>
+                  eventOpt.map { event =>
+                    complete(StatusCodes.OK, event)
+                  }.getOrElse(
+                    complete(StatusCodes.NotFound, Map("message" -> "Not Found"))
+                  )
+              }
+            }
+          }
+        } ~
+        delete {
+          handleExceptions(exceptionHandler) {
+            authenticate(withAccessKey) { authData =>
+              val appId = authData.appId
+              val channelId = authData.channelId
+              logger.debug(s"DELETE event ${eventId}.")
+              onSuccess(eventClient.futureDelete(eventId, appId, channelId)){ found =>
+                if (found) {
+                  complete(StatusCodes.OK, Map("message" -> "Found"))
+                } else {
+                  complete(StatusCodes.NotFound, Map("message" -> "Not Found"))
+                }
+              }
+            }
+          }
+        }
+      } ~
+      path("events.json") {
+        post {
+          handleExceptions(exceptionHandler) {
+            authenticate(withAccessKey) { authData =>
+              val appId = authData.appId
+              val channelId = authData.channelId
+              val events = authData.events
+              entity(as[Event]) { event =>
+                if (events.isEmpty || authData.events.contains(event.event)) {
+                  pluginContext.inputBlockers.values.foreach(
+                    _.process(EventInfo(
+                      appId = appId,
+                      channelId = channelId,
+                      event = event), pluginContext))
+                  onSuccess(eventClient.futureInsert(event, appId, channelId)){ id =>
+                    pluginsActorRef ! EventInfo(
+                      appId = appId,
+                      channelId = channelId,
+                      event = event)
+                    val result = (StatusCodes.Created, Map("eventId" -> s"${id}"))
+                    if (config.stats) {
+                      statsActorRef ! Bookkeeping(appId, result._1, event)
+                    }
+                    complete(result)
+                  }
+                } else {
+                  complete(StatusCodes.Forbidden,
+                    Map("message" -> s"${event.event} events are not allowed"))
+                }
+              }
+            }
+          }
+        } ~
+        get {
+          handleExceptions(exceptionHandler) {
+            authenticate(withAccessKey) { authData =>
+              val appId = authData.appId
+              val channelId = authData.channelId
+              parameters(
+                'startTime.?,
+                'untilTime.?,
+                'entityType.?,
+                'entityId.?,
+                'event.?,
+                'targetEntityType.?,
+                'targetEntityId.?,
+                'limit.as[Int].?,
+                'reversed.as[Boolean].?) {
+                (startTimeStr, untilTimeStr, entityType, entityId,
+                eventName,  // only one event name is supported
+                targetEntityType, targetEntityId,
+                limit, reversed) =>
+                  logger.debug(
+                    s"GET events of appId=${appId} " +
+                    s"st=${startTimeStr} ut=${untilTimeStr} " +
+                    s"et=${entityType} eid=${entityId} " +
+                    s"li=${limit} rev=${reversed} ")
+
+                  require(!((reversed == Some(true))
+                    && (entityType.isEmpty || entityId.isEmpty)),
+                    "the parameter reversed can only be used with" +
+                      " both entityType and entityId specified.")
+
+                  val parseTime = Future {
+                    val startTime = startTimeStr.map(Utils.stringToDateTime(_))
+                    val untilTime = untilTimeStr.map(Utils.stringToDateTime(_))
+                    (startTime, untilTime)
+                  }
+
+
+                  val f = parseTime.flatMap { case (startTime, untilTime) =>
+                    val data = eventClient.futureFind(
+                      appId = appId,
+                      channelId = channelId,
+                      startTime = startTime,
+                      untilTime = untilTime,
+                      entityType = entityType,
+                      entityId = entityId,
+                      eventNames = eventName.map(List(_)),
+                      targetEntityType = targetEntityType.map(Some(_)),
+                      targetEntityId = targetEntityId.map(Some(_)),
+                      limit = limit.orElse(Some(20)),
+                      reversed = reversed)
+                      .map { eventIter =>
+                        if (eventIter.hasNext) {
+                          (StatusCodes.OK, eventIter.toArray)
+                        } else {
+                          (StatusCodes.NotFound, Map("message" -> "Not Found"))
+                        }
+                      }
+                    data
+                  }
+
+                  onSuccess(f){ (status, body) => complete(status, body) }
+                }
+            }
+          }
+        }
+      } ~
+      path("batch" / "events.json") {
+        post {
+          handleExceptions(exceptionHandler) {
+            authenticate(withAccessKey) { authData =>
+              val appId = authData.appId
+              val channelId = authData.channelId
+              val allowedEvents = authData.events
+
+              entity(as[Seq[Try[Event]]]) { events =>
+                if (events.length <= MaxNumberOfEventsPerBatchRequest) {
+                  val eventWithIndex = events.zipWithIndex
+
+                  val taggedEvents = eventWithIndex.collect { case (Success(event), i) =>
+                    if(allowedEvents.isEmpty || allowedEvents.contains(event.event)){
+                      (Right(event), i)
+                    } else {
+                      (Left(event), i)
+                    }
+                  }
+
+                  val insertEvents = taggedEvents.collect { case (Right(event), i) =>
+                    (event, i)
+                  }
+
+                  insertEvents.foreach { case (event, i) =>
+                    pluginContext.inputBlockers.values.foreach(
+                      _.process(EventInfo(
+                        appId = appId,
+                        channelId = channelId,
+                        event = event), pluginContext))
+                  }
+
+                  val f: Future[Seq[Map[String, Any]]] = eventClient.futureInsertBatch(
+                    insertEvents.map(_._1), appId, channelId).map { insertResults =>
+                    val results = insertResults.zip(insertEvents).map { case (id, (event, i)) =>
+                      pluginsActorRef ! EventInfo(
+                        appId = appId,
+                        channelId = channelId,
+                        event = event)
+                      val status = StatusCodes.Created
+                      if (config.stats) {
+                        statsActorRef ! Bookkeeping(appId, status, event)
+                      }
+                      (Map(
+                        "status"  -> status.intValue,
+                        "eventId" -> s"${id}"), i)
+                    } ++
+                      // Results of denied events
+                      taggedEvents.collect { case (Left(event), i) =>
+                        (Map(
+                          "status"  -> StatusCodes.Forbidden.intValue,
+                          "message" -> s"${event.event} events are not allowed"), i)
+                      } ++
+                      // Results of events that failed to deserialize
+                      eventWithIndex.collect { case (Failure(exception), i) =>
+                        (Map(
+                          "status"  -> StatusCodes.BadRequest.intValue,
+                          "message" -> s"${exception.getMessage()}"), i)
+                      }
+
+                    // Restore original order
+                    results.sortBy { case (_, i) => i }.map { case (data, _) => data }
+                  }
+
+                  onSuccess(f.recover { case exception =>
+                    Map(
+                      "status" -> StatusCodes.InternalServerError.intValue,
+                      "message" -> s"${exception.getMessage()}"
+                    )
+                  }){ res => complete(res) }
+
+                } else {
+                  complete(StatusCodes.BadRequest,
+                    Map("message" -> (s"Batch request must have less than or equal to " +
+                      s"${MaxNumberOfEventsPerBatchRequest} events")))
+                }
+              }
+            }
+          }
+        }
+      } ~
+      path("stats.json") {
+        get {
+          handleExceptions(exceptionHandler) {
+            authenticate(withAccessKey) { authData =>
+              val appId = authData.appId
+              if (config.stats) {
+                complete {
+                  statsActorRef ? GetStats(appId) map {
+                    _.asInstanceOf[Map[String, StatsSnapshot]]
+                  }
+                }
+              } else {
+                complete(
+                  StatusCodes.NotFound,
+                  Map("message" -> "To see stats, launch Event Server with --stats argument.")
+                )
+              }
+            }
+          }
+        }  // stats.json get
+      } ~
+      path("webhooks" / jsonPath ) { web =>
+        post {
+          handleExceptions(exceptionHandler) {
+            authenticate(withAccessKey) { authData =>
+              val appId = authData.appId
+              val channelId = authData.channelId
+              entity(as[JObject]) { jObj =>
+                onSuccess(Webhooks.postJson(
+                  appId = appId,
+                  channelId = channelId,
+                  web = web,
+                  data = jObj,
+                  eventClient = eventClient,
+                  log = logger,
+                  stats = config.stats,
+                  statsActorRef = statsActorRef
+                )){
+                  (status, body) => complete(status, body)
+                }
+              }
+            }
+          }
+        } ~
+        get {
+          handleExceptions(exceptionHandler) {
+            authenticate(withAccessKey) { authData =>
+              val appId = authData.appId
+              val channelId = authData.channelId
+              onSuccess(
+                Webhooks.getJson(
+                appId = appId,
+                channelId = channelId,
+                web = web,
+                log = logger)
+              ){
+                (status, body) => complete(status, body)
+              }
+            }
+          }
+        }
+      } ~
+      path("webhooks" / formPath ) { web =>
+        post {
+          handleExceptions(exceptionHandler) {
+            authenticate(withAccessKey) { authData =>
+              val appId = authData.appId
+              val channelId = authData.channelId
+              entity(as[FormData]){ formData =>
+                logger.debug(formData.toString)
+                onSuccess(Webhooks.postForm(
+                  appId = appId,
+                  channelId = channelId,
+                  web = web,
+                  data = formData,
+                  eventClient = eventClient,
+                  log = logger,
+                  stats = config.stats,
+                  statsActorRef = statsActorRef
+                )){
+                  (status, body) => complete(status, body)
+                }
+              }
+            }
+          }
+        } ~
+        get {
+          handleExceptions(exceptionHandler) {
+            authenticate(withAccessKey) { authData =>
+              val appId = authData.appId
+              val channelId = authData.channelId
+              onSuccess(Webhooks.getForm(
+                appId = appId,
+                channelId = channelId,
+                web = web,
+                log = logger
+              )){
+                (status, body) => complete(status, body)
+              }
+            }
+          }
+        }
+      }
+
+    route
+  }
+
   def createEventServer(config: EventServerConfig): ActorSystem = {
     implicit val system = ActorSystem("EventServerSystem")
+    implicit val materializer = ActorMaterializer()
+    implicit val executionContext = system.dispatcher
 
     val eventClient = Storage.getLEvents()
     val accessKeysClient = Storage.getMetaDataAccessKeys()
     val channelsClient = Storage.getMetaDataChannels()
 
-    val serverActor = system.actorOf(
-      Props(
-        classOf[EventServerActor],
-        eventClient,
-        accessKeysClient,
-        channelsClient,
-        config),
-      "EventServerActor"
-    )
-    if (config.stats) system.actorOf(Props[StatsActor], "StatsActor")
-    system.actorOf(Props[PluginsActor], "PluginsActor")
-    serverActor ! StartServer(config.ip, config.port)
+    val statsActorRef = system.actorSelection("/user/StatsActor")
+    val pluginsActorRef = system.actorSelection("/user/PluginsActor")
+
+    val logger = Logging(system, getClass)
+
+    val route = createRoute(eventClient, accessKeysClient, channelsClient,
+      logger, statsActorRef, pluginsActorRef, config)
+
+    Http().bindAndHandle(route, config.ip, config.port)
+
     system
   }
 }
 
 object Run {
   def main(args: Array[String]): Unit = {
-    EventServer.createEventServer(EventServerConfig(
+    val f = EventServer.createEventServer(EventServerConfig(
       ip = "0.0.0.0",
       port = 7070))
-    .awaitTermination
+    .whenTerminated
+
+    Await.ready(f, Duration.Inf)
   }
 }
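
The old Spray bootstrap (`EventServerActor` plus `IO(Http) ! Http.Bind`) becomes a plain `Http().bindAndHandle` call above. For reference, a minimal standalone sketch of that Akka HTTP 10.1.x startup pattern; the host, port, and trivial route are placeholders:

```scala
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Directives._
import akka.stream.ActorMaterializer
import scala.concurrent.Await
import scala.concurrent.duration.Duration

object ServerSketch extends App {
  implicit val system = ActorSystem("SketchSystem")
  implicit val materializer = ActorMaterializer()   // required by bindAndHandle in 10.1.x
  implicit val executionContext = system.dispatcher

  val route = path("status") {
    get { complete("""{"status": "alive"}""") }
  }

  // Bind the route and block until the actor system terminates.
  Http().bindAndHandle(route, "0.0.0.0", 7070)
  Await.ready(system.whenTerminated, Duration.Inf)
}
```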
diff --git a/data/src/main/scala/org/apache/predictionio/data/api/Stats.scala b/data/src/main/scala/org/apache/predictionio/data/api/Stats.scala
index 9bbbc2e..d544b1b 100644
--- a/data/src/main/scala/org/apache/predictionio/data/api/Stats.scala
+++ b/data/src/main/scala/org/apache/predictionio/data/api/Stats.scala
@@ -18,13 +18,11 @@
 
 package org.apache.predictionio.data.api
 
+import akka.http.scaladsl.model.StatusCode
 import org.apache.predictionio.data.storage.Event
 
-import spray.http.StatusCode
-
-import scala.collection.mutable.{ HashMap => MHashMap }
+import scala.collection.mutable.{HashMap => MHashMap}
 import scala.collection.mutable
-
 import com.github.nscala_time.time.Imports.DateTime
 
 case class EntityTypesEvent(
diff --git a/data/src/main/scala/org/apache/predictionio/data/api/StatsActor.scala b/data/src/main/scala/org/apache/predictionio/data/api/StatsActor.scala
index aa9438b..627d046 100644
--- a/data/src/main/scala/org/apache/predictionio/data/api/StatsActor.scala
+++ b/data/src/main/scala/org/apache/predictionio/data/api/StatsActor.scala
@@ -18,10 +18,9 @@
 
 package org.apache.predictionio.data.api
 
+import akka.http.scaladsl.model.StatusCode
 import org.apache.predictionio.data.storage.Event
 
-import spray.http.StatusCode
-
 import akka.actor.Actor
 import akka.event.Logging
 
diff --git a/data/src/main/scala/org/apache/predictionio/data/api/Webhooks.scala b/data/src/main/scala/org/apache/predictionio/data/api/Webhooks.scala
index 57be037..e9a9c53 100644
--- a/data/src/main/scala/org/apache/predictionio/data/api/Webhooks.scala
+++ b/data/src/main/scala/org/apache/predictionio/data/api/Webhooks.scala
@@ -18,13 +18,10 @@
 
 package org.apache.predictionio.data.api
 
+import akka.http.scaladsl.model.{FormData, StatusCode, StatusCodes}
 import org.apache.predictionio.data.webhooks.ConnectorUtil
 import org.apache.predictionio.data.storage.LEvents
 
-import spray.http.StatusCodes
-import spray.http.StatusCode
-import spray.http.FormData
-
 import org.json4s.JObject
 
 import akka.event.LoggingAdapter
diff --git a/data/src/main/scala/org/apache/predictionio/data/store/LEventStore.scala b/data/src/main/scala/org/apache/predictionio/data/store/LEventStore.scala
index 3a82e98..a73ee80 100644
--- a/data/src/main/scala/org/apache/predictionio/data/store/LEventStore.scala
+++ b/data/src/main/scala/org/apache/predictionio/data/store/LEventStore.scala
@@ -20,15 +20,30 @@
 
 import org.apache.predictionio.data.storage.Storage
 import org.apache.predictionio.data.storage.Event
-
 import org.joda.time.DateTime
 
-import scala.concurrent.Await
-import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.{Await, ExecutionContext, Future}
 import scala.concurrent.duration.Duration
 
 /** This object provides a set of operation to access Event Store
-  * without going through Spark's parallelization
+  * without going through Spark's parallelization.
+  *
+  * Note that the blocking methods of this object use
+  * `scala.concurrent.ExecutionContext.Implicits.global` internally.
+  * Since this thread pool has as many threads as there are available
+  * processors, parallelism is limited to the number of processors.
+  *
+  * If this limitation becomes a resource-usage bottleneck, you can increase the
+  * number of threads by declaring the following VM options before calling "pio deploy":
+  *
+  * <pre>
+  * export JAVA_OPTS="$JAVA_OPTS \
+  *   -Dscala.concurrent.context.numThreads=1000 \
+  *   -Dscala.concurrent.context.maxThreads=1000"
+  * </pre>
+  *
+  * You can learn more about the global execution context in the Scala documentation:
+  * [[https://docs.scala-lang.org/overviews/core/futures.html#the-global-execution-context]]
   */
 object LEventStore {
 
@@ -72,9 +87,62 @@
     latest: Boolean = true,
     timeout: Duration = defaultTimeout): Iterator[Event] = {
 
+    // Import here to ensure ExecutionContext.Implicits.global is used only in this method
+    import scala.concurrent.ExecutionContext.Implicits.global
+
+    Await.result(findByEntityAsync(
+      appName = appName,
+      entityType = entityType,
+      entityId = entityId,
+      channelName = channelName,
+      eventNames = eventNames,
+      targetEntityType = targetEntityType,
+      targetEntityId = targetEntityId,
+      startTime = startTime,
+      untilTime = untilTime,
+      limit = limit,
+      latest = latest),
+      timeout)
+  }
+
+  /** Reads events of the specified entity. May use this in Algorithm's predict()
+    * or Serving logic to have fast event store access.
+    *
+    * @param appName return events of this app
+    * @param entityType return events of this entityType
+    * @param entityId return events of this entityId
+    * @param channelName return events of this channel (default channel if it's None)
+    * @param eventNames return events with any of these event names.
+    * @param targetEntityType return events of this targetEntityType:
+    *   - None means no restriction on targetEntityType
+    *   - Some(None) means no targetEntityType for this event
+    *   - Some(Some(x)) means targetEntityType should match x.
+    * @param targetEntityId return events of this targetEntityId
+    *   - None means no restriction on targetEntityId
+    *   - Some(None) means no targetEntityId for this event
+    *   - Some(Some(x)) means targetEntityId should match x.
+    * @param startTime return events with eventTime >= startTime
+    * @param untilTime return events with eventTime < untilTime
+    * @param limit Limit number of events. Get all events if None or Some(-1)
+    * @param latest Return latest event first (default true)
+    * @return Future[Iterator[Event]]
+    */
+  def findByEntityAsync(
+    appName: String,
+    entityType: String,
+    entityId: String,
+    channelName: Option[String] = None,
+    eventNames: Option[Seq[String]] = None,
+    targetEntityType: Option[Option[String]] = None,
+    targetEntityId: Option[Option[String]] = None,
+    startTime: Option[DateTime] = None,
+    untilTime: Option[DateTime] = None,
+    limit: Option[Int] = None,
+    latest: Boolean = true)(implicit ec: ExecutionContext): Future[Iterator[Event]] = {
+
     val (appId, channelId) = Common.appNameToId(appName, channelName)
 
-    Await.result(eventsDb.futureFind(
+    eventsDb.futureFind(
       appId = appId,
       channelId = channelId,
       startTime = startTime,
@@ -85,8 +153,7 @@
       targetEntityType = targetEntityType,
       targetEntityId = targetEntityId,
       limit = limit,
-      reversed = Some(latest)),
-      timeout)
+      reversed = Some(latest))
   }
 
   /** Reads events generically. If entityType or entityId is not specified, it
@@ -127,9 +194,62 @@
     limit: Option[Int] = None,
     timeout: Duration = defaultTimeout): Iterator[Event] = {
 
+    // Import here to ensure ExecutionContext.Implicits.global is used only in this method
+    import scala.concurrent.ExecutionContext.Implicits.global
+
+    Await.result(findAsync(
+      appName = appName,
+      entityType = entityType,
+      entityId = entityId,
+      channelName = channelName,
+      eventNames = eventNames,
+      targetEntityType = targetEntityType,
+      targetEntityId = targetEntityId,
+      startTime = startTime,
+      untilTime = untilTime,
+      limit = limit), timeout)
+  }
+
+  /** Reads events generically. If entityType or entityId is not specified, it
+    * results in table scan.
+    *
+    * @param appName return events of this app
+    * @param entityType return events of this entityType
+    *   - None means no restriction on entityType
+    *   - Some(x) means entityType should match x.
+    * @param entityId return events of this entityId
+    *   - None means no restriction on entityId
+    *   - Some(x) means entityId should match x.
+    * @param channelName return events of this channel (default channel if it's None)
+    * @param eventNames return events with any of these event names.
+    * @param targetEntityType return events of this targetEntityType:
+    *   - None means no restriction on targetEntityType
+    *   - Some(None) means no targetEntityType for this event
+    *   - Some(Some(x)) means targetEntityType should match x.
+    * @param targetEntityId return events of this targetEntityId
+    *   - None means no restriction on targetEntityId
+    *   - Some(None) means no targetEntityId for this event
+    *   - Some(Some(x)) means targetEntityId should match x.
+    * @param startTime return events with eventTime >= startTime
+    * @param untilTime return events with eventTime < untilTime
+    * @param limit Limit number of events. Get all events if None or Some(-1)
+    * @return Future[Iterator[Event]]
+    */
+  def findAsync(
+    appName: String,
+    entityType: Option[String] = None,
+    entityId: Option[String] = None,
+    channelName: Option[String] = None,
+    eventNames: Option[Seq[String]] = None,
+    targetEntityType: Option[Option[String]] = None,
+    targetEntityId: Option[Option[String]] = None,
+    startTime: Option[DateTime] = None,
+    untilTime: Option[DateTime] = None,
+    limit: Option[Int] = None)(implicit ec: ExecutionContext): Future[Iterator[Event]] = {
+
     val (appId, channelId) = Common.appNameToId(appName, channelName)
 
-    Await.result(eventsDb.futureFind(
+    eventsDb.futureFind(
       appId = appId,
       channelId = channelId,
       startTime = startTime,
@@ -139,7 +259,7 @@
       eventNames = eventNames,
       targetEntityType = targetEntityType,
       targetEntityId = targetEntityId,
-      limit = limit), timeout)
+      limit = limit)
   }
 
 }
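For reference, here is a minimal sketch of how the new non-blocking `LEventStore.findByEntityAsync` introduced above might be called from Serving code, assuming a hypothetical app named `MyApp1` and a dedicated thread pool; the names, event types, and pool size are illustrative only.

```scala
import java.util.concurrent.Executors

import org.apache.predictionio.data.storage.Event
import org.apache.predictionio.data.store.LEventStore

import scala.concurrent.{ExecutionContext, Future}

object AsyncEventStoreSketch {
  // Hypothetical dedicated pool so event store calls do not compete with
  // the global execution context used by the blocking helpers.
  implicit val ec: ExecutionContext =
    ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(16))

  // Fetch the ten most recent "view" events of a user without blocking.
  def recentViews(userId: String): Future[Seq[Event]] =
    LEventStore.findByEntityAsync(
      appName = "MyApp1",             // assumed app name
      entityType = "user",
      entityId = userId,
      eventNames = Some(Seq("view")),
      limit = Some(10),
      latest = true
    ).map(_.toSeq)
}
```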
diff --git a/data/src/main/scala/org/apache/predictionio/data/store/java/LJavaEventStore.scala b/data/src/main/scala/org/apache/predictionio/data/store/java/LJavaEventStore.scala
index f4fd676..6f39feb 100644
--- a/data/src/main/scala/org/apache/predictionio/data/store/java/LJavaEventStore.scala
+++ b/data/src/main/scala/org/apache/predictionio/data/store/java/LJavaEventStore.scala
@@ -18,15 +18,35 @@
 
 package org.apache.predictionio.data.store.java
 
+import java.util.concurrent.{CompletableFuture, CompletionStage, ExecutorService}
+
 import org.apache.predictionio.data.storage.Event
 import org.apache.predictionio.data.store.LEventStore
 import org.joda.time.DateTime
 
 import scala.collection.JavaConversions
 import scala.concurrent.duration.Duration
+import scala.compat.java8.FutureConverters._
 
 /** This Java-friendly object provides a set of operation to access Event Store
-  * without going through Spark's parallelization
+  * without going through Spark's parallelization.
+  *
+  * Note that the blocking methods of this object use
+  * `scala.concurrent.ExecutionContext.Implicits.global` internally.
+  * Since this thread pool has as many threads as there are available
+  * processors, parallelism is limited to the number of processors.
+  *
+  * If this limitation becomes a resource-usage bottleneck, you can increase the
+  * number of threads by declaring the following VM options before calling "pio deploy":
+  *
+  * <pre>
+  * export JAVA_OPTS="$JAVA_OPTS \
+  *   -Dscala.concurrent.context.numThreads=1000 \
+  *   -Dscala.concurrent.context.maxThreads=1000"
+  * </pre>
+  *
+  * You can learn more about the global execution context in the Scala documentation:
+  * [[https://docs.scala-lang.org/overviews/core/futures.html#the-global-execution-context]]
   */
 object LJavaEventStore {
 
@@ -86,6 +106,61 @@
       ).toSeq)
   }
 
+  /** Reads events of the specified entity. May use this in Algorithm's predict()
+    * or Serving logic to have fast event store access.
+    *
+    * @param appName return events of this app
+    * @param entityType return events of this entityType
+    * @param entityId return events of this entityId
+    * @param channelName return events of this channel (default channel if it's None)
+    * @param eventNames return events with any of these event names.
+    * @param targetEntityType return events of this targetEntityType:
+    *   - None means no restriction on targetEntityType
+    *   - Some(None) means no targetEntityType for this event
+    *   - Some(Some(x)) means targetEntityType should match x.
+    * @param targetEntityId return events of this targetEntityId
+    *   - None means no restriction on targetEntityId
+    *   - Some(None) means no targetEntityId for this event
+    *   - Some(Some(x)) means targetEntityId should match x.
+    * @param startTime return events with eventTime >= startTime
+    * @param untilTime return events with eventTime < untilTime
+    * @param limit Limit number of events. Get all events if None or Some(-1)
+    * @param latest Return latest event first
+    * @return CompletableFuture[java.util.List[Event]]
+    */
+  def findByEntityAsync(
+    appName: String,
+    entityType: String,
+    entityId: String,
+    channelName: Option[String],
+    eventNames: Option[java.util.List[String]],
+    targetEntityType: Option[Option[String]],
+    targetEntityId: Option[Option[String]],
+    startTime: Option[DateTime],
+    untilTime: Option[DateTime],
+    limit: Option[Integer],
+    latest: Boolean,
+    executorService: ExecutorService): CompletableFuture[java.util.List[Event]] = {
+
+    val eventNamesSeq = eventNames.map(JavaConversions.asScalaBuffer(_).toSeq)
+    val limitInt = limit.map(_.intValue())
+    implicit val ec = fromExecutorService(executorService)
+
+    LEventStore.findByEntityAsync(
+      appName,
+      entityType,
+      entityId,
+      channelName,
+      eventNamesSeq,
+      targetEntityType,
+      targetEntityId,
+      startTime,
+      untilTime,
+      limitInt,
+      latest
+    ).map { x => JavaConversions.seqAsJavaList(x.toSeq) }.toJava.toCompletableFuture
+  }
+
   /** Reads events generically. If entityType or entityId is not specified, it
     * results in table scan.
     *
@@ -142,4 +217,61 @@
         timeout
       ).toSeq)
   }
+
+  /** Reads events generically. If entityType or entityId is not specified, it
+    * results in table scan.
+    *
+    * @param appName return events of this app
+    * @param entityType return events of this entityType
+    *   - None means no restriction on entityType
+    *   - Some(x) means entityType should match x.
+    * @param entityId return events of this entityId
+    *   - None means no restriction on entityId
+    *   - Some(x) means entityId should match x.
+    * @param channelName return events of this channel (default channel if it's None)
+    * @param eventNames return events with any of these event names.
+    * @param targetEntityType return events of this targetEntityType:
+    *   - None means no restriction on targetEntityType
+    *   - Some(None) means no targetEntityType for this event
+    *   - Some(Some(x)) means targetEntityType should match x.
+    * @param targetEntityId return events of this targetEntityId
+    *   - None means no restriction on targetEntityId
+    *   - Some(None) means no targetEntityId for this event
+    *   - Some(Some(x)) means targetEntityId should match x.
+    * @param startTime return events with eventTime >= startTime
+    * @param untilTime return events with eventTime < untilTime
+    * @param limit Limit number of events. Get all events if None or Some(-1)
+    * @return CompletableFuture[java.util.List[Event]]
+    */
+  def findAsync(
+    appName: String,
+    entityType: Option[String],
+    entityId: Option[String],
+    channelName: Option[String],
+    eventNames: Option[java.util.List[String]],
+    targetEntityType: Option[Option[String]],
+    targetEntityId: Option[Option[String]],
+    startTime: Option[DateTime],
+    untilTime: Option[DateTime],
+    limit: Option[Integer],
+    executorService: ExecutorService): CompletableFuture[java.util.List[Event]] = {
+
+    val eventNamesSeq = eventNames.map(JavaConversions.asScalaBuffer(_).toSeq)
+    val limitInt = limit.map(_.intValue())
+    implicit val ec = fromExecutorService(executorService)
+
+    LEventStore.findAsync(
+      appName,
+      entityType,
+      entityId,
+      channelName,
+      eventNamesSeq,
+      targetEntityType,
+      targetEntityId,
+      startTime,
+      untilTime,
+      limitInt
+    ).map { x => JavaConversions.seqAsJavaList(x.toSeq) }.toJava.toCompletableFuture
+  }
+
 }
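For reference, a minimal sketch of calling the Java-friendly asynchronous API above with an explicit `ExecutorService`; the app name, entity values, and pool size are assumptions for illustration, not part of the change itself.

```scala
import java.util.concurrent.{CompletableFuture, Executors}

import org.apache.predictionio.data.storage.Event
import org.apache.predictionio.data.store.java.LJavaEventStore

object JavaAsyncEventStoreSketch {
  // Hypothetical executor; size it to your expected query concurrency.
  val executor = Executors.newFixedThreadPool(8)

  // All parameters are passed explicitly because the Java-friendly API has no defaults.
  val eventsFuture: CompletableFuture[java.util.List[Event]] =
    LJavaEventStore.findByEntityAsync(
      "MyApp1",                  // appName (assumed)
      "user",                    // entityType
      "u1",                      // entityId
      None,                      // channelName
      None,                      // eventNames
      None,                      // targetEntityType
      None,                      // targetEntityId
      None,                      // startTime
      None,                      // untilTime
      Some(Integer.valueOf(10)), // limit
      true,                      // latest
      executor)

  // Blocking get() is only for demonstration; in real code, compose the
  // CompletableFuture (e.g. with thenApply/thenAccept) instead.
  val events: java.util.List[Event] = eventsFuture.get()
}
```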
diff --git a/data/src/main/spark-2/org/apache/predictionio/data/store/python/PPythonEventStore.scala b/data/src/main/scala/org/apache/predictionio/data/store/python/PPythonEventStore.scala
similarity index 100%
rename from data/src/main/spark-2/org/apache/predictionio/data/store/python/PPythonEventStore.scala
rename to data/src/main/scala/org/apache/predictionio/data/store/python/PPythonEventStore.scala
diff --git a/data/src/main/scala/org/apache/predictionio/data/view/DataView.scala b/data/src/main/scala/org/apache/predictionio/data/view/DataView.scala
index 1c47e10..ca92e8f 100644
--- a/data/src/main/scala/org/apache/predictionio/data/view/DataView.scala
+++ b/data/src/main/scala/org/apache/predictionio/data/view/DataView.scala
@@ -20,14 +20,10 @@
 
 import org.apache.predictionio.annotation.Experimental
 import org.apache.predictionio.data.storage.Event
-import org.apache.predictionio.data.SparkVersionDependent
-
 import grizzled.slf4j.Logger
 import org.apache.predictionio.data.store.PEventStore
-
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.DataFrame
-import org.apache.spark.sql.SaveMode
+import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
 import org.apache.spark.SparkContext
 import org.joda.time.DateTime
 
@@ -52,7 +48,6 @@
     * @param name identify the DataFrame created
     * @param version used to track changes to the conversionFunction, e.g. version = "20150413"
     *                and update whenever the function is changed.
-    * @param sqlContext SQL context
     * @tparam E the output type of the conversion function. The type needs to extend Product
     *           (e.g. case class)
     * @return a DataFrame of events
@@ -69,7 +64,7 @@
 
     @transient lazy val logger = Logger[this.type]
 
-    val sqlSession = SparkVersionDependent.sqlSession(sc)
+    val sqlSession = SparkSession.builder().getOrCreate()
 
     val beginTime = startTime match {
       case Some(t) => t
diff --git a/data/src/main/spark-1/org/apache/predictionio/data/SparkVersionDependent.scala b/data/src/main/spark-1/org/apache/predictionio/data/SparkVersionDependent.scala
deleted file mode 100644
index 0652e0b..0000000
--- a/data/src/main/spark-1/org/apache/predictionio/data/SparkVersionDependent.scala
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.predictionio.data
-
-import org.apache.spark.SparkContext
-import org.apache.spark.sql.SQLContext
-
-object SparkVersionDependent {
-
-  def sqlSession(sc: SparkContext): SQLContext = {
-    return new SQLContext(sc)
-  }
-
-}
diff --git a/data/src/main/spark-2/org/apache/predictionio/data/SparkVersionDependent.scala b/data/src/main/spark-2/org/apache/predictionio/data/SparkVersionDependent.scala
deleted file mode 100644
index 3d07bdf..0000000
--- a/data/src/main/spark-2/org/apache/predictionio/data/SparkVersionDependent.scala
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.predictionio.data
-
-import org.apache.spark.SparkContext
-import org.apache.spark.sql.SparkSession
-
-object SparkVersionDependent {
-
-  def sqlSession(sc: SparkContext): SparkSession = {
-    SparkSession.builder().getOrCreate()
-  }
-
-}
diff --git a/data/src/test/scala/org/apache/predictionio/data/api/EventServiceSpec.scala b/data/src/test/scala/org/apache/predictionio/data/api/EventServiceSpec.scala
index 7a45ca1..24cebc7 100644
--- a/data/src/test/scala/org/apache/predictionio/data/api/EventServiceSpec.scala
+++ b/data/src/test/scala/org/apache/predictionio/data/api/EventServiceSpec.scala
@@ -18,55 +18,39 @@
 
 package org.apache.predictionio.data.api
 
-import org.apache.predictionio.data.storage.{Storage, StorageMockContext}
-import akka.testkit.TestProbe
-import akka.actor.{ActorRef, ActorSystem, Props}
-import spray.http.HttpEntity
-import spray.http.HttpResponse
-import spray.http.ContentTypes
-import spray.httpx.RequestBuilding.Get
+import akka.event.Logging
+import org.apache.predictionio.data.storage.Storage
 import org.specs2.mutable.Specification
+import akka.http.scaladsl.testkit.Specs2RouteTest
 
-class EventServiceSpec extends Specification {
 
-  val system = ActorSystem("EventServiceSpecSystem")
+class EventServiceSpec extends Specification with Specs2RouteTest {
+  val eventClient = Storage.getLEvents()
+  val accessKeysClient = Storage.getMetaDataAccessKeys()
+  val channelsClient = Storage.getMetaDataChannels()
 
-  def createEventServiceActor: ActorRef = {
-    val eventClient = Storage.getLEvents()
-    val accessKeysClient = Storage.getMetaDataAccessKeys()
-    val channelsClient = Storage.getMetaDataChannels()
+  val statsActorRef = system.actorSelection("/user/StatsActor")
+  val pluginsActorRef = system.actorSelection("/user/PluginsActor")
 
-    system.actorOf(
-      Props(
-        new EventServiceActor(
-          eventClient,
-          accessKeysClient,
-          channelsClient,
-          EventServerConfig()
-        )
-      )
-    )
-  }
+  val logger = Logging(system, getClass)
+  val config = EventServerConfig(ip = "0.0.0.0", port = 7070)
 
+  val route = EventServer.createRoute(
+    eventClient,
+    accessKeysClient,
+    channelsClient,
+    logger,
+    statsActorRef,
+    pluginsActorRef,
+    config
+  )
 
   "GET / request" should {
-    "properly produce OK HttpResponses" in new StorageMockContext {
-      Thread.sleep(2000)
-      val eventServiceActor = createEventServiceActor
-      val probe = TestProbe()(system)
-      probe.send(eventServiceActor, Get("/"))
-      probe.expectMsg(
-        HttpResponse(
-          200,
-          HttpEntity(
-            contentType = ContentTypes.`application/json`,
-            string = """{"status":"alive"}"""
-          )
-        )
-      )
-      success
+    "properly produce OK HttpResponses" in {
+      Get() ~> route ~> check {
+        status.intValue() shouldEqual 200
+        responseAs[String] shouldEqual """{"status":"alive"}"""
+      }
     }
   }
-
-  step(system.shutdown())
 }
diff --git a/data/src/test/scala/org/apache/predictionio/data/api/SegmentIOAuthSpec.scala b/data/src/test/scala/org/apache/predictionio/data/api/SegmentIOAuthSpec.scala
index 5927824..297c25f 100644
--- a/data/src/test/scala/org/apache/predictionio/data/api/SegmentIOAuthSpec.scala
+++ b/data/src/test/scala/org/apache/predictionio/data/api/SegmentIOAuthSpec.scala
@@ -17,22 +17,20 @@
 
 package org.apache.predictionio.data.api
 
-import akka.actor.{ActorRef, ActorSystem, Props}
-import akka.testkit.TestProbe
+import akka.event.Logging
+import akka.http.scaladsl.model.ContentTypes
+import akka.http.scaladsl.model.headers.RawHeader
+import akka.http.scaladsl.server.Route
 import org.apache.predictionio.data.storage._
 import org.joda.time.DateTime
-import org.scalamock.specs2.MockContext
 import org.specs2.mutable.Specification
-import spray.http.HttpHeaders.RawHeader
-import spray.http.{ContentTypes, HttpEntity, HttpResponse}
-import spray.httpx.RequestBuilding._
 import sun.misc.BASE64Encoder
+import akka.http.scaladsl.testkit.Specs2RouteTest
 
 import scala.concurrent.{ExecutionContext, Future}
 
-class SegmentIOAuthSpec extends Specification {
+class SegmentIOAuthSpec extends Specification with Specs2RouteTest {
 
-  val system = ActorSystem("EventServiceSpecSystem")
   sequential
   isolated
   val eventClient = new LEvents {
@@ -74,75 +72,51 @@
 
     override def get(k: String): Option[AccessKey] =
       k match {
-        case "abc" ⇒ Some(AccessKey(k, appId, Seq.empty))
-        case _ ⇒ None
+        case "abc" => Some(AccessKey(k, appId, Seq.empty))
+        case _ => None
       }
   }
 
-  val base64Encoder = new BASE64Encoder
+  val channelsClient = Storage.getMetaDataChannels()
 
-  def createEventServiceActor(): ActorRef = {
-    val channelsClient = Storage.getMetaDataChannels()
-    system.actorOf(
-      Props(
-        new EventServiceActor(
-          eventClient,
-          accessKeysClient,
-          channelsClient,
-          EventServerConfig()
-        )
-      )
-    )
-  }
+  val statsActorRef = system.actorSelection("/user/StatsActor")
+  val pluginsActorRef = system.actorSelection("/user/PluginsActor")
+
+  val base64Encoder = new BASE64Encoder
+  val logger = Logging(system, getClass)
+  val config = EventServerConfig(ip = "0.0.0.0", port = 7070)
+
+  val route = EventServer.createRoute(
+    eventClient,
+    accessKeysClient,
+    channelsClient,
+    logger,
+    statsActorRef,
+    pluginsActorRef,
+    config
+  )
 
   "Event Service" should {
-
     "reject with CredentialsRejected with invalid credentials" in new StorageMockContext {
-      val eventServiceActor = createEventServiceActor
       val accessKey = "abc123:"
-      val probe = TestProbe()(system)
-      probe.send(
-        eventServiceActor,
-        Post("/webhooks/segmentio.json")
-          .withHeaders(
-            List(
-              RawHeader("Authorization", s"Basic $accessKey")
-            )
-          )
-      )
-      probe.expectMsg(
-        HttpResponse(
-          401,
-          HttpEntity(
-            contentType = ContentTypes.`application/json`,
-            string = """{"message":"Invalid accessKey."}"""
-          )
-        )
-      )
+      Post("/webhooks/segmentio.json")
+          .withHeaders(RawHeader("Authorization", s"Basic $accessKey")) ~> Route.seal(route) ~> check {
+        status.intValue() shouldEqual 401
+        responseAs[String] shouldEqual """{"message":"Invalid accessKey."}"""
+      }
       success
     }
+  }
 
     "reject with CredentialsMissed without credentials" in {
-      val eventServiceActor = createEventServiceActor
-      val probe = TestProbe()(system)
-      probe.send(
-        eventServiceActor,
-        Post("/webhooks/segmentio.json")
-      )
-      probe.expectMsg(
-        HttpResponse(
-          401,
-          HttpEntity(
-            contentType = ContentTypes.`application/json`,
-            string = """{"message":"Missing accessKey."}"""
-          )
-        )
-      )
+      Post("/webhooks/segmentio.json") ~> Route.seal(route) ~> check {
+        status.intValue() shouldEqual 401
+        responseAs[String] shouldEqual """{"message":"Missing accessKey."}"""
+      }
       success
     }
 
     "process SegmentIO identity request properly" in {
-      val eventServiceActor = createEventServiceActor
       val jsonReq =
         """
           |{
@@ -169,32 +143,15 @@
 
       val accessKey = "abc:"
       val accessKeyEncoded = base64Encoder.encodeBuffer(accessKey.getBytes)
-      val probe = TestProbe()(system)
-      probe.send(
-        eventServiceActor,
-        Post(
-          "/webhooks/segmentio.json",
-          HttpEntity(ContentTypes.`application/json`, jsonReq.getBytes)
-        ).withHeaders(
-            List(
-              RawHeader("Authorization", s"Basic $accessKeyEncoded")
-            )
-          )
-      )
-      probe.expectMsg(
-        HttpResponse(
-          201,
-          HttpEntity(
-            contentType = ContentTypes.`application/json`,
-            string = """{"eventId":"event_id"}"""
-          )
-        )
-      )
+      Post("/webhooks/segmentio.json")
+          .withHeaders(RawHeader("Authorization", s"Basic $accessKeyEncoded"))
+          .withEntity(ContentTypes.`application/json`, jsonReq) ~> route ~> check {
+        println(responseAs[String])
+        status.intValue() shouldEqual 201
+        responseAs[String] shouldEqual """{"eventId":"event_id"}"""
+      }
       success
-    }
   }
-
-  step(system.shutdown())
 }
 
 
diff --git a/doap.rdf b/doap.rdf
index 26f430f..0551978 100644
--- a/doap.rdf
+++ b/doap.rdf
@@ -35,8 +35,8 @@
     <category rdf:resource="http://projects.apache.org/category/big-data" />
     <repository>
       <GitRepository>
-        <location rdf:resource="https://git-wip-us.apache.org/repos/asf/predictionio.git"/>
-        <browse rdf:resource="https://git-wip-us.apache.org/repos/asf/predictionio.git"/>
+        <location rdf:resource="https://gitbox.apache.org/repos/asf/predictionio.git"/>
+        <browse rdf:resource="https://gitbox.apache.org/repos/asf/predictionio.git"/>
       </GitRepository>
     </repository>
     <maintainer>
diff --git a/python/pypio/shell.py b/docker/.ivy2/.keep
similarity index 79%
copy from python/pypio/shell.py
copy to docker/.ivy2/.keep
index b0295d3..ec20143 100644
--- a/python/pypio/shell.py
+++ b/docker/.ivy2/.keep
@@ -1,4 +1,3 @@
-#
 # Licensed to the Apache Software Foundation (ASF) under one or more
 # contributor license agreements.  See the NOTICE file distributed with
 # this work for additional information regarding copyright ownership.
@@ -13,11 +12,3 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-#
-
-from pypio.data import PEventStore
-from pypio.workflow import CleanupFunctions
-
-p_event_store = PEventStore(spark._jsparkSession, sqlContext)
-cleanup_functions = CleanupFunctions(sqlContext)
-
diff --git a/docker/JUPYTER.md b/docker/JUPYTER.md
new file mode 100644
index 0000000..c705147
--- /dev/null
+++ b/docker/JUPYTER.md
@@ -0,0 +1,160 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+Jupyter With PredictionIO
+=========================
+
+## Overview
+
+Using the Jupyter-based Docker image, you can use Jupyter Notebook within a PredictionIO environment.
+It helps you with your exploratory data analysis (EDA).
+
+## Run Jupyter Notebook
+
+First of all, start Jupyter container with PredictionIO environment:
+
+```
+docker-compose -f docker-compose.jupyter.yml \
+  -f pgsql/docker-compose.base.yml \
+  -f pgsql/docker-compose.meta.yml \
+  -f pgsql/docker-compose.event.yml \
+  -f pgsql/docker-compose.model.yml \
+  up
+```
+
+Open `http://127.0.0.1:8888/` and then open a new terminal in Jupyter from the `New` pulldown button.
+
+## Getting Started With Scala Based Template
+
+### Download Template
+
+Clone a template using Git:
+
+```
+cd templates/
+git clone https://github.com/apache/predictionio-template-recommender.git
+cd predictionio-template-recommender/
+```
+
+Replace the placeholder application name with `MyApp1`:
+
+```
+sed -i "s/INVALID_APP_NAME/MyApp1/" engine.json
+```
+
+### Register New Application
+
+Using the pio command, register a new application named `MyApp1`.
+
+```
+pio app new MyApp1
+```
+
+This command prints an access key as shown below.
+
+```
+[INFO] [Pio$] Access Key: bbe8xRHN1j3Sa8WeAT8TSxt5op3lUqhvXmKY1gLRjg70K-DUhHIJJ0-UzgKumxGm
+```
+
+Set it as the environment variable `ACCESS_KEY`.
+
+```
+ACCESS_KEY=bbe8xRHN1j3Sa8WeAT8TSxt5op3lUqhvXmKY1gLRjg70K-DUhHIJJ0-UzgKumxGm
+```
+
+### Import Training Data
+
+Download the training data and import it into the PredictionIO Event server.
+
+```
+curl https://raw.githubusercontent.com/apache/spark/master/data/mllib/sample_movielens_data.txt --create-dirs -o data/sample_movielens_data.txt
+python data/import_eventserver.py --access_key $ACCESS_KEY
+```
+
+### Build Template
+
+Build your template with the following command:
+
+```
+pio build --verbose
+```
+
+### Create Model
+
+To create a model, run:
+
+```
+pio train
+```
+
+## Getting Started With Python Based Template
+
+### Download Template
+
+Clone a template using Git:
+
+```
+cd templates/
+git clone https://github.com/jpioug/predictionio-template-iris.git
+cd predictionio-template-iris/
+```
+
+### Register New Application
+
+Using the pio command, register a new application named `IrisApp`.
+
+```
+pio app new --access-key IRIS_TOKEN IrisApp
+```
+
+### Import Training Data
+
+Download the training data and import it into the PredictionIO Event server.
+
+```
+python data/import_eventserver.py
+```
+
+### Build Template
+
+Build your template with the following command:
+
+```
+pio build --verbose
+```
+
+### EDA
+
+For data analysis, open `templates/predictionio-template-iris/eda.ipynb` in Jupyter.
+
+### Create Model
+
+You need to clear the following environment variables in the terminal before executing `pio train`.
+
+```
+unset PYSPARK_PYTHON
+unset PYSPARK_DRIVER_PYTHON
+unset PYSPARK_DRIVER_PYTHON_OPTS
+```
+
+To create a model, run:
+
+```
+pio train --main-py-file train.py
+```
+
+
diff --git a/docker/README.md b/docker/README.md
new file mode 100644
index 0000000..97e1bf5
--- /dev/null
+++ b/docker/README.md
@@ -0,0 +1,269 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+Apache PredictionIO Docker
+==========================
+
+## Overview
+
+PredictionIO Docker provides Docker images for use in development and production environments.
+
+
+## Usage
+
+### Run PredictionIO with Selectable docker-compose Files
+
+You can choose the storage backends for event/meta/model data by selecting the corresponding docker-compose files.
+
+```
+docker-compose -f docker-compose.yml -f ... up
+```
+
+Supported storages are listed below:
+
+| Type  | Storage                          |
+|:-----:|:---------------------------------|
+| Event | PostgreSQL, MySQL, Elasticsearch |
+| Meta  | PostgreSQL, MySQL, Elasticsearch |
+| Model | PostgreSQL, MySQL, LocalFS       |
+
+To run PredictionIO with PostgreSQL, run the following:
+
+```
+docker-compose -f docker-compose.yml \
+  -f pgsql/docker-compose.base.yml \
+  -f pgsql/docker-compose.meta.yml \
+  -f pgsql/docker-compose.event.yml \
+  -f pgsql/docker-compose.model.yml \
+  up
+```
+
+To use the local filesystem as model storage, change the command as follows:
+
+```
+docker-compose -f docker-compose.yml \
+  -f pgsql/docker-compose.base.yml \
+  -f pgsql/docker-compose.meta.yml \
+  -f pgsql/docker-compose.event.yml \
+  -f localfs/docker-compose.model.yml \
+  up
+```
+
+## Tutorial
+
+In this demo, we will show you how to build a recommendation template.
+
+### Run PredictionIO environment
+
+The following command starts PredictionIO with an event server.
+The PredictionIO Docker image mounts the ./templates directory to /templates.
+
+```
+$ docker-compose -f docker-compose.yml \
+    -f pgsql/docker-compose.base.yml \
+    -f pgsql/docker-compose.meta.yml \
+    -f pgsql/docker-compose.event.yml \
+    -f pgsql/docker-compose.model.yml \
+    up
+```
+
+We provide the `pio-docker` command as a utility wrapper for the `pio` command.
+`pio-docker` invokes the `pio` command inside the PredictionIO container.
+
+```
+$ export PATH=`pwd`/bin:$PATH
+$ pio-docker status
+...
+[INFO] [Management$] Your system is all ready to go.
+```
+
+### Download Recommendation Template
+
+This demo uses [predictionio-template-recommender](https://github.com/apache/predictionio-template-recommender).
+
+```
+$ cd templates
+$ git clone https://github.com/apache/predictionio-template-recommender.git MyRecommendation
+$ cd MyRecommendation
+```
+
+### Register Application
+
+You need to register this application with PredictionIO:
+
+```
+$ pio-docker app new MyApp1
+[INFO] [App$] Initialized Event Store for this app ID: 1.
+[INFO] [Pio$] Created a new app:
+[INFO] [Pio$]       Name: MyApp1
+[INFO] [Pio$]         ID: 1
+[INFO] [Pio$] Access Key: i-zc4EleEM577EJhx3CzQhZZ0NnjBKKdSbp3MiR5JDb2zdTKKzH9nF6KLqjlMnvl
+```
+
+Since the access key is required in subsequent steps, set it to the `ACCESS_KEY` environment variable.
+
+```
+$ ACCESS_KEY=i-zc4EleEM577EJhx3CzQhZZ0NnjBKKdSbp3MiR5JDb2zdTKKzH9nF6KLqjlMnvl
+```
+
+`engine.json` contains an application name, so replace `INVALID_APP_NAME` with `MyApp1`.
+
+```
+...
+"datasource": {
+  "params" : {
+    "appName": "MyApp1"
+  }
+},
+...
+```
+
+### Import Data
+
+To import training data into the PredictionIO Event server, this template provides an import tool.
+The tool depends on the PredictionIO Python SDK, which you can install as follows:
+
+```
+$ pip install predictionio
+```
+and then import data:
+```
+$ curl https://raw.githubusercontent.com/apache/spark/master/data/mllib/sample_movielens_data.txt --create-dirs -o data/sample_movielens_data.txt
+$ python data/import_eventserver.py --access_key $ACCESS_KEY
+```
+
+### Build Template
+
+This is a Scala-based template,
+so you need to build it with the `pio` command.
+
+```
+$ pio-docker build --verbose
+```
+
+### Train and Create Model
+
+To train a recommendation model, run the `train` sub-command:
+
+```
+$ pio-docker train
+```
+
+### Deploy Model
+
+Once the recommendation model has been created successfully, deploy it to the PredictionIO prediction server.
+
+```
+$ pio-docker deploy
+
+```
+You can check predictions as follows:
+```
+$ curl -H "Content-Type: application/json" \
+-d '{ "user": "1", "num": 4 }' http://localhost:8000/queries.json
+```
+
+## Advanced Topics
+
+### Run with Elasticsearch
+
+Elasticsearch can be used for Meta and Event storage.
+To start PredictionIO with Elasticsearch, run:
+
+```
+docker-compose -f docker-compose.yml \
+  -f elasticsearch/docker-compose.base.yml \
+  -f elasticsearch/docker-compose.meta.yml \
+  -f elasticsearch/docker-compose.event.yml \
+  -f localfs/docker-compose.model.yml \
+  up
+```
+
+### Run with Spark Cluster
+
+By adding `docker-compose.spark.yml`, you can use a Spark cluster for `pio train`.
+
+```
+docker-compose -f docker-compose.yml \
+  -f docker-compose.spark.yml \
+  -f elasticsearch/docker-compose.base.yml \
+  -f elasticsearch/docker-compose.meta.yml \
+  -f elasticsearch/docker-compose.event.yml \
+  -f localfs/docker-compose.model.yml \
+  up
+```
+
+To submit a training task to the Spark cluster, run `pio-docker train` with the `--master` option:
+
+```
+pio-docker train -- --master spark://spark-master:7077
+```
+
+See `docker-compose.spark.yml` to change settings for the Spark cluster.
+
+### Run Engine Server
+
+To deploy your engine and start an engine server, run docker-compose with `docker-compose.deploy.yml`:
+
+```
+docker-compose -f docker-compose.yml \
+  -f pgsql/docker-compose.base.yml \
+  -f pgsql/docker-compose.meta.yml \
+  -f pgsql/docker-compose.event.yml \
+  -f pgsql/docker-compose.model.yml \
+  -f docker-compose.deploy.yml \
+  up
+```
+
+See `deploy/run.sh` and `docker-compose.deploy.yml` to customize the deployment.
+
+### Run with Jupyter
+
+You can launch PredictionIO with Jupyter.
+
+```
+docker-compose -f docker-compose.jupyter.yml \
+  -f pgsql/docker-compose.base.yml \
+  -f pgsql/docker-compose.meta.yml \
+  -f pgsql/docker-compose.event.yml \
+  -f pgsql/docker-compose.model.yml \
+  up
+```
+
+For more information, see [JUPYTER.md](./JUPYTER.md).
+
+## Development
+
+### Build Base Docker Image
+
+```
+docker build -t predictionio/pio pio
+```
+
+### Build Jupyter Docker Image
+
+```
+docker build -t predictionio/pio-jupyter jupyter
+```
+
+### Push Docker Image
+
+```
+docker push predictionio/pio:latest
+docker tag predictionio/pio:latest predictionio/pio:$PIO_VERSION
+docker push predictionio/pio:$PIO_VERSION
+```
diff --git a/docker/bin/pio-docker b/docker/bin/pio-docker
new file mode 100755
index 0000000..8046060
--- /dev/null
+++ b/docker/bin/pio-docker
@@ -0,0 +1,56 @@
+#!/usr/bin/env bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+BASE_WORK_DIR=/templates
+CURRENT_DIR=`pwd`
+
+get_container_id() {
+  if [ x"$PIO_CONTAINER_ID" != "x" ] ; then
+    echo $PIO_CONTAINER_ID
+    return
+  fi
+  for i in `docker ps -f "name=pio" -q` ; do
+    echo $i
+    return
+  done
+}
+
+get_current_dir() {
+  if [ x"$PIO_CURRENT_DIR" != "x" ] ; then
+    echo $PIO_CURRENT_DIR
+    return
+  fi
+  D=`echo $CURRENT_DIR | sed -e "s,.*$BASE_WORK_DIR,$BASE_WORK_DIR,"`
+  if [[ $D = $BASE_WORK_DIR* ]] ; then
+    echo $D
+  else
+    echo $BASE_WORK_DIR
+  fi
+}
+
+cid=`get_container_id`
+if [ x"$cid" = "x" ] ; then
+  echo "Docker Container is not found."
+  exit 1
+fi
+
+wdir=`get_current_dir`
+
+docker exec -w $wdir -it $cid pio "$@"
+
diff --git a/python/pypio/shell.py b/docker/deploy/run.sh
similarity index 79%
copy from python/pypio/shell.py
copy to docker/deploy/run.sh
index b0295d3..f225a62 100644
--- a/python/pypio/shell.py
+++ b/docker/deploy/run.sh
@@ -1,3 +1,5 @@
+#!/usr/bin/env bash
+
 #
 # Licensed to the Apache Software Foundation (ASF) under one or more
 # contributor license agreements.  See the NOTICE file distributed with
@@ -15,9 +17,5 @@
 # limitations under the License.
 #
 
-from pypio.data import PEventStore
-from pypio.workflow import CleanupFunctions
-
-p_event_store = PEventStore(spark._jsparkSession, sqlContext)
-cleanup_functions = CleanupFunctions(sqlContext)
-
+cd /templates/$PIO_TEMPLATE_NAME
+pio deploy
diff --git a/python/pypio/shell.py b/docker/docker-compose.deploy.yml
similarity index 79%
copy from python/pypio/shell.py
copy to docker/docker-compose.deploy.yml
index b0295d3..145a960 100644
--- a/python/pypio/shell.py
+++ b/docker/docker-compose.deploy.yml
@@ -1,4 +1,3 @@
-#
 # Licensed to the Apache Software Foundation (ASF) under one or more
 # contributor license agreements.  See the NOTICE file distributed with
 # this work for additional information regarding copyright ownership.
@@ -13,11 +12,12 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-#
 
-from pypio.data import PEventStore
-from pypio.workflow import CleanupFunctions
-
-p_event_store = PEventStore(spark._jsparkSession, sqlContext)
-cleanup_functions = CleanupFunctions(sqlContext)
-
+version: "3"
+services:
+  pio:
+    environment:
+      - "PIO_TEMPLATE_NAME=MyRecommendation"
+      - "PIO_RUN_FILE=/deploy/run.sh"
+    volumes:
+      - ./deploy:/deploy
diff --git a/python/pypio/shell.py b/docker/docker-compose.jupyter.yml
similarity index 65%
copy from python/pypio/shell.py
copy to docker/docker-compose.jupyter.yml
index b0295d3..6e25a4a 100644
--- a/python/pypio/shell.py
+++ b/docker/docker-compose.jupyter.yml
@@ -1,4 +1,3 @@
-#
 # Licensed to the Apache Software Foundation (ASF) under one or more
 # contributor license agreements.  See the NOTICE file distributed with
 # this work for additional information regarding copyright ownership.
@@ -13,11 +12,21 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-#
 
-from pypio.data import PEventStore
-from pypio.workflow import CleanupFunctions
-
-p_event_store = PEventStore(spark._jsparkSession, sqlContext)
-cleanup_functions = CleanupFunctions(sqlContext)
-
+version: "3"
+services:
+  pio:
+    image: predictionio/pio-jupyter:latest
+    ports:
+      - 7070:7070
+      - 8000:8000
+      - 8888:8888
+    volumes:
+      - ./templates:/home/jovyan/templates
+      - ./.ivy2:/home/jovyan/.ivy2
+    environment:
+      - CHOWN_HOME=yes
+      - GRANT_SUDO=yes
+      - VOLUME_UID=yes
+      - "PYSPARK_DRIVER_PYTHON_OPTS=notebook --NotebookApp.token=''"
+    dns: 8.8.8.8
diff --git a/python/pypio/shell.py b/docker/docker-compose.spark.yml
similarity index 62%
copy from python/pypio/shell.py
copy to docker/docker-compose.spark.yml
index b0295d3..8f3c43e 100644
--- a/python/pypio/shell.py
+++ b/docker/docker-compose.spark.yml
@@ -1,4 +1,3 @@
-#
 # Licensed to the Apache Software Foundation (ASF) under one or more
 # contributor license agreements.  See the NOTICE file distributed with
 # this work for additional information regarding copyright ownership.
@@ -13,11 +12,23 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-#
 
-from pypio.data import PEventStore
-from pypio.workflow import CleanupFunctions
-
-p_event_store = PEventStore(spark._jsparkSession, sqlContext)
-cleanup_functions = CleanupFunctions(sqlContext)
-
+version: "3"
+services:
+  spark-master:
+    image: bde2020/spark-master:2.2.2-hadoop2.7
+    container_name: spark-master
+    ports:
+      - "8080:8080"
+      - "7077:7077"
+    environment:
+      - INIT_DAEMON_STEP=setup_spark
+  spark-worker-1:
+    image: bde2020/spark-worker:2.2.2-hadoop2.7
+    container_name: spark-worker-1
+    depends_on:
+      - spark-master
+    ports:
+      - "8081:8081"
+    environment:
+      - "SPARK_MASTER=spark://spark-master:7077"
diff --git a/python/pypio/shell.py b/docker/docker-compose.yml
similarity index 79%
copy from python/pypio/shell.py
copy to docker/docker-compose.yml
index b0295d3..e42c907 100644
--- a/python/pypio/shell.py
+++ b/docker/docker-compose.yml
@@ -1,4 +1,3 @@
-#
 # Licensed to the Apache Software Foundation (ASF) under one or more
 # contributor license agreements.  See the NOTICE file distributed with
 # this work for additional information regarding copyright ownership.
@@ -13,11 +12,14 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-#
 
-from pypio.data import PEventStore
-from pypio.workflow import CleanupFunctions
-
-p_event_store = PEventStore(spark._jsparkSession, sqlContext)
-cleanup_functions = CleanupFunctions(sqlContext)
-
+version: "3"
+services:
+  pio:
+    image: predictionio/pio:latest
+    ports:
+      - 7070:7070
+      - 8000:8000
+    volumes:
+      - ./templates:/templates
+    dns: 8.8.8.8
diff --git a/docker/elasticsearch/docker-compose.base.yml b/docker/elasticsearch/docker-compose.base.yml
new file mode 100644
index 0000000..4784f4a
--- /dev/null
+++ b/docker/elasticsearch/docker-compose.base.yml
@@ -0,0 +1,41 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+version: "3"
+services:
+  elasticsearch:
+    image: docker.elastic.co/elasticsearch/elasticsearch:5.6.4
+    environment:
+      - xpack.graph.enabled=false
+      - xpack.ml.enabled=false
+      - xpack.monitoring.enabled=false
+      - xpack.security.enabled=false
+      - xpack.watcher.enabled=false
+      - cluster.name=predictionio
+      - bootstrap.memory_lock=true
+      - "ES_JAVA_OPTS=-Xms1g -Xmx1g"
+    ulimits:
+      memlock:
+        soft: -1
+        hard: -1
+  pio:
+    depends_on:
+      - elasticsearch
+    environment:
+      PIO_STORAGE_SOURCES_ELASTICSEARCH_TYPE: elasticsearch
+      PIO_STORAGE_SOURCES_ELASTICSEARCH_HOSTS: elasticsearch
+      PIO_STORAGE_SOURCES_ELASTICSEARCH_PORTS: 9200
+      PIO_STORAGE_SOURCES_ELASTICSEARCH_SCHEMES: http
+
diff --git a/python/pypio/shell.py b/docker/elasticsearch/docker-compose.event.yml
similarity index 79%
copy from python/pypio/shell.py
copy to docker/elasticsearch/docker-compose.event.yml
index b0295d3..5a77b6a 100644
--- a/python/pypio/shell.py
+++ b/docker/elasticsearch/docker-compose.event.yml
@@ -1,4 +1,3 @@
-#
 # Licensed to the Apache Software Foundation (ASF) under one or more
 # contributor license agreements.  See the NOTICE file distributed with
 # this work for additional information regarding copyright ownership.
@@ -13,11 +12,11 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-#
 
-from pypio.data import PEventStore
-from pypio.workflow import CleanupFunctions
-
-p_event_store = PEventStore(spark._jsparkSession, sqlContext)
-cleanup_functions = CleanupFunctions(sqlContext)
+version: "3"
+services:
+  pio:
+    environment:
+      PIO_STORAGE_REPOSITORIES_EVENTDATA_NAME: pio_event
+      PIO_STORAGE_REPOSITORIES_EVENTDATA_SOURCE: ELASTICSEARCH
 
diff --git a/python/pypio/shell.py b/docker/elasticsearch/docker-compose.meta.yml
similarity index 79%
copy from python/pypio/shell.py
copy to docker/elasticsearch/docker-compose.meta.yml
index b0295d3..0ce31dd 100644
--- a/python/pypio/shell.py
+++ b/docker/elasticsearch/docker-compose.meta.yml
@@ -1,4 +1,3 @@
-#
 # Licensed to the Apache Software Foundation (ASF) under one or more
 # contributor license agreements.  See the NOTICE file distributed with
 # this work for additional information regarding copyright ownership.
@@ -13,11 +12,11 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-#
 
-from pypio.data import PEventStore
-from pypio.workflow import CleanupFunctions
-
-p_event_store = PEventStore(spark._jsparkSession, sqlContext)
-cleanup_functions = CleanupFunctions(sqlContext)
+version: "3"
+services:
+  pio:
+    environment:
+      PIO_STORAGE_REPOSITORIES_METADATA_NAME: pio_meta
+      PIO_STORAGE_REPOSITORIES_METADATA_SOURCE: ELASTICSEARCH
 
diff --git a/docker/jupyter/Dockerfile b/docker/jupyter/Dockerfile
new file mode 100644
index 0000000..78e71b2
--- /dev/null
+++ b/docker/jupyter/Dockerfile
@@ -0,0 +1,100 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+FROM predictionio/pio:latest
+
+ENV DEBIAN_FRONTEND noninteractive
+
+RUN apt-get update \
+    && apt install -y build-essential curl git gcc make openssl libssl-dev libbz2-dev \
+    apt-transport-https ca-certificates g++ gnupg graphviz lsb-release openssh-client zip \
+    libreadline-dev libsqlite3-dev cmake libxml2-dev wget bzip2 sudo vim unzip locales \
+    && apt-get clean \
+    && rm -rf /var/lib/apt/lists/*
+
+RUN echo "en_US.UTF-8 UTF-8" > /etc/locale.gen && locale-gen
+
+ENV LC_ALL=en_US.UTF-8 \
+    LANG=en_US.UTF-8 \
+    LANGUAGE=en_US.UTF-8 \
+    NB_USER=jovyan \
+    NB_UID=1000 \
+    NB_GID=100 \
+    CONDA_DIR=/opt/conda \
+    PIP_DEFAULT_TIMEOUT=180
+ENV PATH=$CONDA_DIR/bin:$PATH \
+    HOME=/home/$NB_USER
+
+ADD fix-permissions /usr/local/bin/fix-permissions
+RUN chmod +x /usr/local/bin/fix-permissions \
+    && groupadd wheel -g 11 \
+    && echo "auth required pam_wheel.so use_uid" >> /etc/pam.d/su \
+    && useradd -m -s /bin/bash -N -u $NB_UID $NB_USER \
+    && mkdir -p $CONDA_DIR \
+    && chmod g+w /etc/passwd \
+    && fix-permissions $HOME \
+    && fix-permissions $CONDA_DIR
+
+USER $NB_USER
+
+ENV MINICONDA_VERSION 4.4.10
+RUN wget -q https://repo.continuum.io/miniconda/Miniconda3-${MINICONDA_VERSION}-Linux-x86_64.sh -O /tmp/miniconda.sh \
+    && echo 'bec6203dbb2f53011e974e9bf4d46e93 */tmp/miniconda.sh' | md5sum -c - \
+    && bash /tmp/miniconda.sh -f -b -p $CONDA_DIR \
+    && rm /tmp/miniconda.sh \
+    && conda config --system --prepend channels conda-forge \
+    && conda config --system --set auto_update_conda false \
+    && conda config --system --set show_channel_urls true \
+    && conda install --quiet --yes conda="${MINICONDA_VERSION%.*}.*" \
+    && conda update --all --quiet --yes \
+    && conda clean -tipsy \
+    && rm -rf /home/$NB_USER/.cache/yarn \
+    && fix-permissions $CONDA_DIR \
+    && fix-permissions /home/$NB_USER
+
+RUN conda install --quiet --yes 'tini=0.18.0' \
+    && conda list tini | grep tini | tr -s ' ' | cut -d ' ' -f 1,2 >> $CONDA_DIR/conda-meta/pinned \
+    && conda clean -tipsy \
+    && fix-permissions $CONDA_DIR \
+    && fix-permissions /home/$NB_USER
+
+RUN conda install --quiet --yes 'notebook=5.6.*' 'jupyterlab=0.34.*' nodejs\
+    && jupyter labextension install @jupyterlab/hub-extension@^0.11.0 \
+    && jupyter notebook --generate-config \
+    && conda clean -tipsy \
+    && npm cache clean --force \
+    && rm -rf $CONDA_DIR/share/jupyter/lab/staging \
+    && rm -rf /home/$NB_USER/.cache/yarn \
+    && fix-permissions $CONDA_DIR \
+    && fix-permissions /home/$NB_USER
+
+ADD requirements.txt /tmp/requirements.txt
+RUN pip --no-cache-dir install -r /tmp/requirements.txt \
+    && fix-permissions $CONDA_DIR \
+    && fix-permissions /home/$NB_USER
+
+COPY jupyter_notebook_config.py /home/$NB_USER/.jupyter/
+COPY start*.sh /usr/local/bin/
+
+USER root
+RUN chmod +x /usr/local/bin/*.sh
+
+EXPOSE 8888
+WORKDIR $HOME
+ENTRYPOINT ["tini", "--"]
+CMD ["/usr/local/bin/start-jupyter.sh"]
+
diff --git a/python/pypio/shell.py b/docker/jupyter/fix-permissions
similarity index 72%
copy from python/pypio/shell.py
copy to docker/jupyter/fix-permissions
index b0295d3..ab276ec 100644
--- a/python/pypio/shell.py
+++ b/docker/jupyter/fix-permissions
@@ -1,3 +1,5 @@
+#!/usr/bin/env bash
+
 #
 # Licensed to the Apache Software Foundation (ASF) under one or more
 # contributor license agreements.  See the NOTICE file distributed with
@@ -15,9 +17,20 @@
 # limitations under the License.
 #
 
-from pypio.data import PEventStore
-from pypio.workflow import CleanupFunctions
+set -e
 
-p_event_store = PEventStore(spark._jsparkSession, sqlContext)
-cleanup_functions = CleanupFunctions(sqlContext)
-
+for d in $@; do
+  find "$d" \
+    ! \( \
+      -group $NB_GID \
+      -a -perm -g+rwX  \
+    \) \
+    -exec chgrp $NB_GID {} \; \
+    -exec chmod g+rwX {} \;
+  find "$d" \
+    \( \
+      -type d \
+      -a ! -perm -6000  \
+    \) \
+    -exec chmod +6000 {} \;
+done
diff --git a/docker/jupyter/jupyter_notebook_config.py b/docker/jupyter/jupyter_notebook_config.py
new file mode 100644
index 0000000..1bd9f2a
--- /dev/null
+++ b/docker/jupyter/jupyter_notebook_config.py
@@ -0,0 +1,18 @@
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+c = get_config()
+c.NotebookApp.ip = '*'
+c.NotebookApp.port = 8888
+c.NotebookApp.open_browser = False
+
diff --git a/docker/jupyter/requirements.txt b/docker/jupyter/requirements.txt
new file mode 100644
index 0000000..b8a5c32
--- /dev/null
+++ b/docker/jupyter/requirements.txt
@@ -0,0 +1,14 @@
+cython
+google-cloud
+h5py
+ipywidgets
+jupyter_contrib_nbextensions
+keras
+matplotlib
+pandas
+pandas-gbq
+predictionio
+sklearn
+tensor2tensor
+tensorflow
+widgetsnbextension
diff --git a/docker/jupyter/start-jupyter.sh b/docker/jupyter/start-jupyter.sh
new file mode 100644
index 0000000..d75598f
--- /dev/null
+++ b/docker/jupyter/start-jupyter.sh
@@ -0,0 +1,41 @@
+#!/usr/bin/env bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+set -e
+
+# store PIO environment to pio-env.sh
+PIO_ENV_FILE=/etc/predictionio/pio-env.sh
+env | grep ^PIO_ >> $PIO_ENV_FILE
+if [ $(grep _MYSQL_ $PIO_ENV_FILE | wc -l) = 0 ] ; then
+  sed -i "s/^MYSQL/#MYSQL/" $PIO_ENV_FILE
+fi
+
+# start event server
+sh /usr/bin/pio_run &
+
+export PYSPARK_PYTHON=$CONDA_DIR/bin/python
+if [ x"$PYSPARK_DRIVER_PYTHON" = "x" ] ; then
+  export PYSPARK_DRIVER_PYTHON=$CONDA_DIR/bin/jupyter
+fi
+if [ x"$PYSPARK_DRIVER_PYTHON_OPTS" = "x" ] ; then
+  export PYSPARK_DRIVER_PYTHON_OPTS=notebook
+fi
+
+. /usr/local/bin/start.sh $PIO_HOME/bin/pio-shell --with-pyspark
+
diff --git a/docker/jupyter/start.sh b/docker/jupyter/start.sh
new file mode 100644
index 0000000..c4b9330
--- /dev/null
+++ b/docker/jupyter/start.sh
@@ -0,0 +1,81 @@
+#!/usr/bin/env bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+set -e
+
+if [[ "$VOLUME_UID" == "1" || "$VOLUME_UID" == 'yes' ]]; then
+  DIR_UID=`ls -lnd /home/jovyan/templates | awk '{print $3}'`
+  if [ x"$DIR_UID" != "x" -a x"$DIR_UID" != "x0" ] ; then
+    NB_UID=$DIR_UID
+  fi
+fi
+
+if [ $(id -u) == 0 ] ; then
+  if id jovyan &> /dev/null ; then
+    echo "Set username to $NB_USER"
+    usermod -d /home/$NB_USER -l $NB_USER jovyan
+  fi
+
+  if [[ "$CHOWN_HOME" == "1" || "$CHOWN_HOME" == 'yes' ]]; then
+    echo "Change ownership of /home/$NB_USER to $NB_UID"
+    chown -R $NB_UID /home/$NB_USER
+  fi
+  if [ ! -z "$CHOWN_EXTRA" ]; then
+    for extra_dir in $(echo $CHOWN_EXTRA | tr ',' ' '); do
+      chown -R $NB_UID $extra_dir
+    done
+  fi
+
+  if [[ "$NB_USER" != "jovyan" ]]; then
+    if [[ ! -e "/home/$NB_USER" ]]; then
+      echo "Move home dir to /home/$NB_USER"
+      mv /home/jovyan "/home/$NB_USER"
+    fi
+    if [[ "$PWD/" == "/home/jovyan/"* ]]; then
+      newcwd="/home/$NB_USER/${PWD:13}"
+      echo "Set CWD to $newcwd"
+      cd "$newcwd"
+    fi
+  fi
+
+  if [ "$NB_UID" != $(id -u $NB_USER) ] ; then
+    echo "Set $NB_USER to uid:$NB_UID"
+    usermod -u $NB_UID $NB_USER
+  fi
+
+  if [ "$NB_GID" != $(id -g $NB_USER) ] ; then
+    echo "Add $NB_USER to gid:$NB_GID"
+    groupadd -g $NB_GID -o ${NB_GROUP:-${NB_USER}}
+    usermod -g $NB_GID -a -G $NB_GID,100 $NB_USER
+  fi
+
+  if [[ "$GRANT_SUDO" == "1" || "$GRANT_SUDO" == 'yes' ]]; then
+    echo "Set sudo access to $NB_USER"
+    echo "$NB_USER ALL=(ALL) NOPASSWD:ALL" > /etc/sudoers.d/notebook
+  fi
+
+  echo "Execute command as $NB_USER"
+  exec su $NB_USER -c "env PATH=$PATH $*"
+
+else
+  echo "Execute command"
+  exec $*
+
+fi
+
diff --git a/python/pypio/shell.py b/docker/localfs/docker-compose.model.yml
similarity index 65%
copy from python/pypio/shell.py
copy to docker/localfs/docker-compose.model.yml
index b0295d3..f38ff53 100644
--- a/python/pypio/shell.py
+++ b/docker/localfs/docker-compose.model.yml
@@ -1,4 +1,3 @@
-#
 # Licensed to the Apache Software Foundation (ASF) under one or more
 # contributor license agreements.  See the NOTICE file distributed with
 # this work for additional information regarding copyright ownership.
@@ -13,11 +12,16 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-#
 
-from pypio.data import PEventStore
-from pypio.workflow import CleanupFunctions
-
-p_event_store = PEventStore(spark._jsparkSession, sqlContext)
-cleanup_functions = CleanupFunctions(sqlContext)
+version: "3"
+services:
+  pio:
+    environment:
+      PIO_STORAGE_REPOSITORIES_MODELDATA_NAME: pio_model
+      PIO_STORAGE_REPOSITORIES_MODELDATA_SOURCE: LOCALFS
+      PIO_FS_BASEDIR: /work/pio_store
+      PIO_FS_ENGINESDIR: /work/pio_store/engines
+      PIO_FS_TMPDIR: /work/pio_store/tmp
+      PIO_STORAGE_SOURCES_LOCALFS_TYPE: localfs
+      PIO_STORAGE_SOURCES_LOCALFS_PATH: /work/pio_store/models
 
diff --git a/docker/mysql/docker-compose.base.yml b/docker/mysql/docker-compose.base.yml
new file mode 100644
index 0000000..fad5309
--- /dev/null
+++ b/docker/mysql/docker-compose.base.yml
@@ -0,0 +1,34 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+version: "3"
+services:
+  mysql:
+    image: mysql:8
+    command: mysqld --character-set-server=utf8mb4 --collation-server=utf8mb4_unicode_ci
+    environment:
+      MYSQL_ROOT_PASSWORD: root
+      MYSQL_USER: pio
+      MYSQL_PASSWORD: pio
+      MYSQL_DATABASE: pio
+  pio:
+    depends_on:
+      - mysql
+    environment:
+      PIO_STORAGE_SOURCES_MYSQL_TYPE: jdbc
+      PIO_STORAGE_SOURCES_MYSQL_URL: "jdbc:mysql://mysql/pio"
+      PIO_STORAGE_SOURCES_MYSQL_USERNAME: pio
+      PIO_STORAGE_SOURCES_MYSQL_PASSWORD: pio
+
diff --git a/python/pypio/shell.py b/docker/mysql/docker-compose.event.yml
similarity index 79%
copy from python/pypio/shell.py
copy to docker/mysql/docker-compose.event.yml
index b0295d3..f5f4035 100644
--- a/python/pypio/shell.py
+++ b/docker/mysql/docker-compose.event.yml
@@ -1,4 +1,3 @@
-#
 # Licensed to the Apache Software Foundation (ASF) under one or more
 # contributor license agreements.  See the NOTICE file distributed with
 # this work for additional information regarding copyright ownership.
@@ -13,11 +12,11 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-#
 
-from pypio.data import PEventStore
-from pypio.workflow import CleanupFunctions
-
-p_event_store = PEventStore(spark._jsparkSession, sqlContext)
-cleanup_functions = CleanupFunctions(sqlContext)
+version: "3"
+services:
+  pio:
+    environment:
+      PIO_STORAGE_REPOSITORIES_EVENTDATA_NAME: pio_event
+      PIO_STORAGE_REPOSITORIES_EVENTDATA_SOURCE: MYSQL
 
diff --git a/python/pypio/shell.py b/docker/mysql/docker-compose.meta.yml
similarity index 79%
rename from python/pypio/shell.py
rename to docker/mysql/docker-compose.meta.yml
index b0295d3..f7a5ece 100644
--- a/python/pypio/shell.py
+++ b/docker/mysql/docker-compose.meta.yml
@@ -1,4 +1,3 @@
-#
 # Licensed to the Apache Software Foundation (ASF) under one or more
 # contributor license agreements.  See the NOTICE file distributed with
 # this work for additional information regarding copyright ownership.
@@ -13,11 +12,11 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-#
 
-from pypio.data import PEventStore
-from pypio.workflow import CleanupFunctions
-
-p_event_store = PEventStore(spark._jsparkSession, sqlContext)
-cleanup_functions = CleanupFunctions(sqlContext)
+version: "3"
+services:
+  pio:
+    environment:
+      PIO_STORAGE_REPOSITORIES_METADATA_NAME: pio_meta
+      PIO_STORAGE_REPOSITORIES_METADATA_SOURCE: MYSQL
 
diff --git a/python/pypio/shell.py b/docker/mysql/docker-compose.model.yml
similarity index 79%
copy from python/pypio/shell.py
copy to docker/mysql/docker-compose.model.yml
index b0295d3..4a48684 100644
--- a/python/pypio/shell.py
+++ b/docker/mysql/docker-compose.model.yml
@@ -1,4 +1,3 @@
-#
 # Licensed to the Apache Software Foundation (ASF) under one or more
 # contributor license agreements.  See the NOTICE file distributed with
 # this work for additional information regarding copyright ownership.
@@ -13,11 +12,11 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-#
 
-from pypio.data import PEventStore
-from pypio.workflow import CleanupFunctions
-
-p_event_store = PEventStore(spark._jsparkSession, sqlContext)
-cleanup_functions = CleanupFunctions(sqlContext)
+version: "3"
+services:
+  pio:
+    environment:
+      PIO_STORAGE_REPOSITORIES_MODELDATA_NAME: pio_model
+      PIO_STORAGE_REPOSITORIES_MODELDATA_SOURCE: MYSQL
 
diff --git a/python/pypio/shell.py b/docker/pgsql/docker-compose.base.yml
similarity index 63%
copy from python/pypio/shell.py
copy to docker/pgsql/docker-compose.base.yml
index b0295d3..297d6a8 100644
--- a/python/pypio/shell.py
+++ b/docker/pgsql/docker-compose.base.yml
@@ -1,4 +1,3 @@
-#
 # Licensed to the Apache Software Foundation (ASF) under one or more
 # contributor license agreements.  See the NOTICE file distributed with
 # this work for additional information regarding copyright ownership.
@@ -13,11 +12,21 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-#
 
-from pypio.data import PEventStore
-from pypio.workflow import CleanupFunctions
-
-p_event_store = PEventStore(spark._jsparkSession, sqlContext)
-cleanup_functions = CleanupFunctions(sqlContext)
+version: "3"
+services:
+  postgres:
+    image: postgres:9
+    environment:
+      POSTGRES_USER: pio
+      POSTGRES_PASSWORD: pio
+      POSTGRES_INITDB_ARGS: --encoding=UTF8
+  pio:
+    depends_on:
+      - postgres
+    environment:
+      PIO_STORAGE_SOURCES_PGSQL_TYPE: jdbc
+      PIO_STORAGE_SOURCES_PGSQL_URL: "jdbc:postgresql://postgres/pio"
+      PIO_STORAGE_SOURCES_PGSQL_USERNAME: pio
+      PIO_STORAGE_SOURCES_PGSQL_PASSWORD: pio
 
diff --git a/python/pypio/shell.py b/docker/pgsql/docker-compose.event.yml
similarity index 79%
copy from python/pypio/shell.py
copy to docker/pgsql/docker-compose.event.yml
index b0295d3..2579141 100644
--- a/python/pypio/shell.py
+++ b/docker/pgsql/docker-compose.event.yml
@@ -1,4 +1,3 @@
-#
 # Licensed to the Apache Software Foundation (ASF) under one or more
 # contributor license agreements.  See the NOTICE file distributed with
 # this work for additional information regarding copyright ownership.
@@ -13,11 +12,11 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-#
 
-from pypio.data import PEventStore
-from pypio.workflow import CleanupFunctions
-
-p_event_store = PEventStore(spark._jsparkSession, sqlContext)
-cleanup_functions = CleanupFunctions(sqlContext)
+version: "3"
+services:
+  pio:
+    environment:
+      PIO_STORAGE_REPOSITORIES_EVENTDATA_NAME: pio_event
+      PIO_STORAGE_REPOSITORIES_EVENTDATA_SOURCE: PGSQL
 
diff --git a/python/pypio/shell.py b/docker/pgsql/docker-compose.meta.yml
similarity index 79%
copy from python/pypio/shell.py
copy to docker/pgsql/docker-compose.meta.yml
index b0295d3..345bfbe 100644
--- a/python/pypio/shell.py
+++ b/docker/pgsql/docker-compose.meta.yml
@@ -1,4 +1,3 @@
-#
 # Licensed to the Apache Software Foundation (ASF) under one or more
 # contributor license agreements.  See the NOTICE file distributed with
 # this work for additional information regarding copyright ownership.
@@ -13,11 +12,11 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-#
 
-from pypio.data import PEventStore
-from pypio.workflow import CleanupFunctions
-
-p_event_store = PEventStore(spark._jsparkSession, sqlContext)
-cleanup_functions = CleanupFunctions(sqlContext)
+version: "3"
+services:
+  pio:
+    environment:
+      PIO_STORAGE_REPOSITORIES_METADATA_NAME: pio_meta
+      PIO_STORAGE_REPOSITORIES_METADATA_SOURCE: PGSQL
 
diff --git a/python/pypio/shell.py b/docker/pgsql/docker-compose.model.yml
similarity index 79%
copy from python/pypio/shell.py
copy to docker/pgsql/docker-compose.model.yml
index b0295d3..329649f 100644
--- a/python/pypio/shell.py
+++ b/docker/pgsql/docker-compose.model.yml
@@ -1,4 +1,3 @@
-#
 # Licensed to the Apache Software Foundation (ASF) under one or more
 # contributor license agreements.  See the NOTICE file distributed with
 # this work for additional information regarding copyright ownership.
@@ -13,11 +12,11 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-#
 
-from pypio.data import PEventStore
-from pypio.workflow import CleanupFunctions
-
-p_event_store = PEventStore(spark._jsparkSession, sqlContext)
-cleanup_functions = CleanupFunctions(sqlContext)
+version: "3"
+services:
+  pio:
+    environment:
+      PIO_STORAGE_REPOSITORIES_MODELDATA_NAME: pio_model
+      PIO_STORAGE_REPOSITORIES_MODELDATA_SOURCE: PGSQL
 
diff --git a/docker/pio/Dockerfile b/docker/pio/Dockerfile
new file mode 100644
index 0000000..465bb07
--- /dev/null
+++ b/docker/pio/Dockerfile
@@ -0,0 +1,71 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+FROM openjdk:8
+
+ARG PIO_GIT_URL=https://github.com/apache/predictionio.git
+ARG PIO_TAG=v0.13.0
+ENV SCALA_VERSION=2.11.12
+ENV SPARK_VERSION=2.2.3
+ENV HADOOP_VERSION=2.7.7
+ENV ELASTICSEARCH_VERSION=5.5.3
+ENV PGSQL_VERSION=42.2.4
+ENV MYSQL_VERSION=8.0.12
+ENV PIO_HOME=/usr/share/predictionio
+
+RUN apt-get update && \
+    apt-get install -y dpkg-dev fakeroot && \
+    apt-get clean && \
+    rm -rf /var/lib/apt/lists/*
+
+WORKDIR /opt/src
+RUN git clone -b $PIO_TAG $PIO_GIT_URL
+WORKDIR /opt/src/predictionio
+RUN bash ./make-distribution.sh \
+      -Dscala.version=$SCALA_VERSION \
+      -Dspark.version=$SPARK_VERSION \
+      -Dhadoop.version=$HADOOP_VERSION \
+      -Delasticsearch.version=$ELASTICSEARCH_VERSION \
+      --with-deb && \
+    dpkg -i ./assembly/target/predictionio_*.deb && \
+    cp -r ./python /usr/share/predictionio && \
+    mkdir /var/log/predictionio && \
+    rm -rf /opt/src/predictionio/*
+
+
+RUN cp /etc/predictionio/pio-env.sh /etc/predictionio/pio-env.sh.orig && \
+    echo "#!/usr/bin/env bash" > /etc/predictionio/pio-env.sh
+RUN curl -o $PIO_HOME/lib/postgresql-$PGSQL_VERSION.jar \
+    http://central.maven.org/maven2/org/postgresql/postgresql/$PGSQL_VERSION/postgresql-$PGSQL_VERSION.jar && \
+    echo "POSTGRES_JDBC_DRIVER=$PIO_HOME/lib/postgresql-$PGSQL_VERSION.jar" >> /etc/predictionio/pio-env.sh && \
+    echo "MYSQL_JDBC_DRIVER=$PIO_HOME/lib/mysql-connector-java-$MYSQL_VERSION.jar" >> /etc/predictionio/pio-env.sh
+
+WORKDIR /usr/share
+RUN curl -o /opt/src/spark-$SPARK_VERSION.tgz \
+    http://www-us.apache.org/dist/spark/spark-$SPARK_VERSION/spark-$SPARK_VERSION-bin-hadoop2.7.tgz && \
+    tar zxvf /opt/src/spark-$SPARK_VERSION.tgz && \
+    echo "SPARK_HOME="`pwd`/`ls -d spark*` >> /etc/predictionio/pio-env.sh && \
+    rm -rf /opt/src
+
+WORKDIR /templates
+ADD pio_run /usr/bin/pio_run
+
+EXPOSE 7070
+EXPOSE 8000
+
+CMD ["sh", "/usr/bin/pio_run"]
+
diff --git a/docker/pio/pio_run b/docker/pio/pio_run
new file mode 100644
index 0000000..83ac6cc
--- /dev/null
+++ b/docker/pio/pio_run
@@ -0,0 +1,64 @@
+#!/usr/bin/env bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+. /etc/predictionio/pio-env.sh
+
+# check elasticsearch status
+if [ x"$PIO_STORAGE_SOURCES_ELASTICSEARCH_TYPE" != "x" ] ; then
+  RET=-1
+  COUNT=0
+  ES_HOST=`echo $PIO_STORAGE_SOURCES_ELASTICSEARCH_HOSTS | sed -e "s/,.*//"`
+  ES_PORT=`echo $PIO_STORAGE_SOURCES_ELASTICSEARCH_PORTS | sed -e "s/,.*//"`
+  # Wait for elasticsearch startup
+  while [ $RET != 0 -a $COUNT -lt 10 ] ; do
+    echo "Waiting for ${ES_HOST}..."
+    curl --connect-timeout 60 --retry 10 -s "$ES_HOST:$ES_PORT/_cluster/health?wait_for_status=green&timeout=1m"
+    RET=$?
+    COUNT=`expr $COUNT + 1`
+    sleep 1
+  done
+fi
+
+# check mysql jar file
+if [ x"$PIO_STORAGE_SOURCES_MYSQL_TYPE" != "x" ] ; then
+  MYSQL_JAR_FILE=$PIO_HOME/lib/mysql-connector-java-$MYSQL_VERSION.jar
+  if [ ! -f $MYSQL_JAR_FILE ] ; then
+    curl -o $MYSQL_JAR_FILE http://central.maven.org/maven2/mysql/mysql-connector-java/$MYSQL_VERSION/mysql-connector-java-$MYSQL_VERSION.jar
+  fi
+fi
+
+# Check PIO status
+RET=-1
+COUNT=0
+while [ $RET != 0 -a $COUNT -lt 10 ] ; do
+  echo "Waiting for PredictionIO..."
+  $PIO_HOME/bin/pio status
+  RET=$?
+  COUNT=`expr $COUNT + 1`
+  sleep 1
+done
+
+
+if [ x"$PIO_RUN_FILE" != "x" ] ; then
+  sh $PIO_RUN_FILE
+else
+  # Start PIO Event Server
+  $PIO_HOME/bin/pio eventserver
+fi
+
diff --git a/python/pypio/shell.py b/docker/templates/.keep
similarity index 79%
copy from python/pypio/shell.py
copy to docker/templates/.keep
index b0295d3..ec20143 100644
--- a/python/pypio/shell.py
+++ b/docker/templates/.keep
@@ -1,4 +1,3 @@
-#
 # Licensed to the Apache Software Foundation (ASF) under one or more
 # contributor license agreements.  See the NOTICE file distributed with
 # this work for additional information regarding copyright ownership.
@@ -13,11 +12,3 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-#
-
-from pypio.data import PEventStore
-from pypio.workflow import CleanupFunctions
-
-p_event_store = PEventStore(spark._jsparkSession, sqlContext)
-cleanup_functions = CleanupFunctions(sqlContext)
-
diff --git a/docs/manual/data/versions.yml b/docs/manual/data/versions.yml
index 31a0659..46b03e0 100644
--- a/docs/manual/data/versions.yml
+++ b/docs/manual/data/versions.yml
@@ -1,7 +1,7 @@
 pio: 0.13.0
 spark: 2.1.1
 spark_download_filename: spark-2.1.1-bin-hadoop2.6
-elasticsearch_download_filename: elasticsearch-5.5.2
+elasticsearch_download_filename: elasticsearch-5.6.9
 hbase_version: 1.2.6
 hbase_basename: hbase-1.2.6
 hbase_variant: bin
diff --git a/docs/manual/source/datacollection/eventapi.html.md b/docs/manual/source/datacollection/eventapi.html.md
index dea3782..be4b040 100644
--- a/docs/manual/source/datacollection/eventapi.html.md
+++ b/docs/manual/source/datacollection/eventapi.html.md
@@ -67,7 +67,7 @@
 
 ```
 HTTP/1.1 200 OK
-Server: spray-can/1.2.1
+Server: akka-http/10.1.5
 Date: Wed, 10 Sep 2014 22:37:30 GMT
 Content-Type: application/json; charset=UTF-8
 Content-Length: 18
@@ -284,7 +284,7 @@
 
 ```
 HTTP/1.1 201 Created
-Server: spray-can/1.2.1
+Server: akka-http/10.1.5
 Date: Wed, 10 Sep 2014 22:51:33 GMT
 Content-Type: application/json; charset=UTF-8
 Content-Length: 41
diff --git a/docs/manual/source/datacollection/eventmodel.html.md.erb b/docs/manual/source/datacollection/eventmodel.html.md.erb
index ec8e5a8..676de54 100644
--- a/docs/manual/source/datacollection/eventmodel.html.md.erb
+++ b/docs/manual/source/datacollection/eventmodel.html.md.erb
@@ -139,7 +139,7 @@
 
 ```
 HTTP/1.1 201 Created
-Server: spray-can/1.3.2
+Server: akka-http/10.1.5
 Date: Tue, 02 Jun 2015 23:13:58 GMT
 Content-Type: application/json; charset=UTF-8
 Content-Length: 57
@@ -339,7 +339,7 @@
 ###Sample Response:
 
     HTTP/1.1 200 Successful
-    Server: spray-can/1.2.1
+    Server: akka-http/10.1.5
     Date: Wed, 10 Sep 2014 22:51:33 GMT
     Content-Type: application/json; charset=UTF-8
     Content-Length: 41
diff --git a/docs/manual/source/deploy/monitoring.html.md b/docs/manual/source/deploy/monitoring.html.md
index 191898b..886b001 100644
--- a/docs/manual/source/deploy/monitoring.html.md
+++ b/docs/manual/source/deploy/monitoring.html.md
@@ -119,7 +119,7 @@
  exit 0
 ```
 
-There can be  cases when the process is running but the engine is down however. If the spray REST API used by PredictionIO crashes, the engine process continues but the engine to fail when queried.
+There can be cases when the process is running but the engine is down. If the Akka HTTP REST API used by PredictionIO crashes, the engine process keeps running, but the engine fails when queried.
 
 This sort of crash can be taken care of by using monits `check program` capability.
 
diff --git a/docs/manual/source/index.html.md.erb b/docs/manual/source/index.html.md.erb
index 47f1c23..7c74901 100644
--- a/docs/manual/source/index.html.md.erb
+++ b/docs/manual/source/index.html.md.erb
@@ -41,7 +41,7 @@
 * simplify data infrastructure management.
 
 Apache PredictionIO® can be [installed](/install/) as a full machine
-learning stack, bundled with **Apache Spark**, **MLlib**, **HBase**, **Spray**
+learning stack, bundled with **Apache Spark**, **MLlib**, **HBase**, **Akka HTTP**
 and **Elasticsearch**, which simplifies and accelerates scalable machine
 learning infrastructure management.
 
diff --git a/docs/manual/source/install/index.html.md.erb b/docs/manual/source/install/index.html.md.erb
index 1f8da3b..7da2a94 100644
--- a/docs/manual/source/install/index.html.md.erb
+++ b/docs/manual/source/install/index.html.md.erb
@@ -25,7 +25,7 @@
 technologies that power Apache PredictionIO®.
 
 * Apache Hadoop 2.6.5 (optional, required only if YARN and HDFS are needed)
-* Apache Spark 1.6.3 for Hadoop 2.6
+* Apache Spark 2.0.2 for Hadoop 2.6
 * Java SE Development Kit 8
 
 and one of the following sets:
@@ -41,8 +41,8 @@
 * Apache HBase 0.98.5
 * Elasticsearch 1.7.6
 
-WARNING: **Note that support for Scala 2.10 and Spark 1.6.x are deprecated as of PredictionIO 0.13.0,
-and may be removed in a future release.**
+WARNING: **Note that support for Scala 2.10 and Spark 1.6 was removed as of PredictionIO 0.14.0,
+and that support for Elasticsearch 1 is deprecated as of the same release.**
 
 If you are running on a single machine, we recommend a minimum of 2GB memory.
 
diff --git a/docs/manual/source/install/install-sourcecode.html.md.erb b/docs/manual/source/install/install-sourcecode.html.md.erb
index ed69965..fe5de3d 100644
--- a/docs/manual/source/install/install-sourcecode.html.md.erb
+++ b/docs/manual/source/install/install-sourcecode.html.md.erb
@@ -27,10 +27,10 @@
 You can use pre-built binary distribution for Apache PredictionIO® if you are
 building against
 
-* Scala 2.11.8
-* Spark 2.1.1
-* Hadoop 2.7.3
-* Elasticsearch 5.5.2
+* Scala 2.11.12
+* Spark 2.1.3
+* Hadoop 2.7.7
+* Elasticsearch 5.6.9
 
 Download [binary release from an Apache
 mirror](https://www.apache.org/dyn/closer.lua/predictionio/<%= data.versions.pio
@@ -100,17 +100,14 @@
 ### Building
 
 Run the following at the directory where you downloaded the source code to build
-Apache PredictionIO®. By default, the build will be against
-
-* Scala 2.11.8
-* Spark 2.1.1
-* Hadoop 2.7.3
-* Elasticsearch 5.5.2
+Apache PredictionIO®.
+As an example, if you want to build PredictionIO to support Scala 2.11.12,
+Spark 2.4.0, and Elasticsearch 6.4.2, you can do
 
 ```
 $ tar zxvf apache-predictionio-<%= data.versions.pio %>.tar.gz
 $ cd apache-predictionio-<%= data.versions.pio %>
-$ ./make-distribution.sh
+$ ./make-distribution.sh -Dscala.version=2.11.12 -Dspark.version=2.4.0 -Delasticsearch.version=6.4.2
 ```
 
 You should see something like the following when it finishes building
@@ -136,17 +133,10 @@
 versions of dependencies. As of writing, one could build PredictionIO against
 these different dependencies:
 
-* Scala 2.10.x(deprecated), 2.11.x
-* Spark 1.6.x(deprecated), 2.0.x, 2.1.x
+* Scala 2.11.x
+* Spark 2.0.x, 2.1.x, 2.2.x, 2.3.x, 2.4.x
 * Hadoop 2.6.x, 2.7.x
-* Elasticsearch 1.7.x, 5.x
-
-As an example, if you want to build PredictionIO to support Scala 2.11.8,
-Spark 2.1.0, and Elasticsearch 5.3.0, you can do
-
-```
-$ ./make-distribution.sh -Dscala.version=2.11.8 -Dspark.version=2.1.0 -Delasticsearch.version=5.3.0
-```
+* Elasticsearch 1.7.x(deprecated), 5.6.x, 6.x
 
 ## Installing Dependencies
 
diff --git a/docs/manual/source/resources/upgrade.html.md b/docs/manual/source/resources/upgrade.html.md
index 70e4311..2cffc75 100644
--- a/docs/manual/source/resources/upgrade.html.md
+++ b/docs/manual/source/resources/upgrade.html.md
@@ -23,18 +23,104 @@
 
 # How to Upgrade
 
-To upgrade and use new version of PredictionIO, do the following:
+## Upgrade to 0.14.0
 
-- Download and build the new PredictionIO binary
-  [(instructions)](/install/install-sourcecode/).
-- Retain the setting from current `PredictionIO/conf/pio-env.sh` to the new
-  `PredictionIO/conf/pio-env.sh`.
-- If you have added `PredictionIO/bin` to your `PATH` environment variable before,
-  change it to the new `PredictionIO/bin` as well.
+This release adds Elasticsearch 6 support. See [pull request](https://github.com/apache/predictionio/pull/466) for details.
+Consequently, you must reindex your data.
 
-# Additional Notes for Specific Versions Upgrade
+1. Access your old cluster to check existing indices
 
-In addition, please take notes of the following for specific version upgrade.
+```
+$ curl -XGET 'http://localhost:9200/_cat/indices?v'
+health status index     uuid                   pri rep docs.count docs.deleted store.size pri.store.size
+yellow open   pio_event 6BAPz-DfQ2e9bICdVRr03g   5   1       1501            0    321.3kb        321.3kb
+yellow open   pio_meta  oxDMU1mGRn-vnXtAjmifSw   5   1          4            0     32.4kb         32.4kb
+
+$ curl -XGET "http://localhost:9200/pio_meta/_search" -d'
+{
+  "aggs": {
+    "typesAgg": {
+      "terms": {
+        "field": "_type",
+        "size": 200
+      }
+    }
+  },
+  "size": 0
+}'
+{"took":3,"timed_out":false,"_shards":{"total":5,"successful":5,"skipped":0,"failed":0},"hits":{"total":4,"max_score":0.0,"hits":[]},"aggregations":{"typesAgg":{"doc_count_error_upper_bound":0,"sum_other_doc_count":0,"buckets":[{"key":"accesskeys","doc_count":1},{"key":"apps","doc_count":1},{"key":"engine_instances","doc_count":1},{"key":"sequences","doc_count":1}]}}}
+
+$ curl -XGET "http://localhost:9200/pio_event/_search" -d'
+{
+  "aggs": {
+    "typesAgg": {
+      "terms": {
+        "field": "_type",
+        "size": 200
+      }
+    }
+  },
+  "size": 0
+}'
+{"took":2,"timed_out":false,"_shards":{"total":5,"successful":5,"skipped":0,"failed":0},"hits":{"total":1501,"max_score":0.0,"hits":[]},"aggregations":{"typesAgg":{"doc_count_error_upper_bound":0,"sum_other_doc_count":0,"buckets":[{"key":"1","doc_count":1501}]}}}
+```
+
+2. (Optional) Settings for new indices
+
+If you want to add specific settings associated with each index, we would recommend defining [Index Templates](https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-templates.html).
+
+For example,
+
+```
+$ curl -H "Content-Type: application/json" -XPUT "http://localhost:9600/_template/pio_meta" -d'
+{
+  "index_patterns": ["pio_meta_*"],
+  "settings": {
+    "number_of_shards": 1,
+    "number_of_replicas": 1
+  }
+}'
+$ curl -H "Content-Type: application/json" -XPUT "http://localhost:9600/_template/pio_event" -d'
+{
+  "index_patterns": ["pio_event_*"],
+  "settings": {
+    "number_of_shards": 1,
+    "number_of_replicas": 1
+  }
+}'
+```
+
+3. [Reindex](https://www.elastic.co/guide/en/elasticsearch/reference/6.0/reindex-upgrade-remote.html)
+
+Using the conversion table below, run the reindex for every index that you need to migrate to your new cluster.
+
+| Old Cluster | New Cluster |
+| --------------- | ---------------- |
+| index: `pio_meta` type: `accesskeys` | index: `pio_meta_accesskeys` |
+| index: `pio_meta` type: `apps` | index: `pio_meta_apps` |
+| index: `pio_meta` type: `channels` | index: `pio_meta_channels` |
+| index: `pio_meta` type: `engine_instances` | index: `pio_meta_engine_instances` |
+| index: `pio_meta` type: `evaluation_instances` | index: `pio_meta_evaluation_instances` |
+| index: `pio_meta` type: `sequences` | index: `pio_meta_sequences` |
+| index: `pio_event` type: depends on your use case (e.g. `1`) | index: `pio_event_<old_type>` (e.g. `pio_event_1`) |
+
+For example,
+
+```
+$ curl -H "Content-Type: application/json" -XPOST "http://localhost:9600/_reindex" -d'
+{
+  "source": {
+    "remote": {
+      "host": "http://localhost:9200"
+    },
+    "index": "pio_meta",
+    "type": "accesskeys"
+  },
+  "dest": {
+    "index": "pio_meta_accesskeys"
+  }
+}'
+```
 
 ## Upgrade to 0.12.0
 
diff --git a/e2/src/main/scala/org/apache/predictionio/e2/engine/PythonEngine.scala b/e2/src/main/scala/org/apache/predictionio/e2/engine/PythonEngine.scala
new file mode 100644
index 0000000..a2c7282
--- /dev/null
+++ b/e2/src/main/scala/org/apache/predictionio/e2/engine/PythonEngine.scala
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.e2.engine
+
+import java.util.Arrays
+
+import org.apache.predictionio.controller._
+import org.apache.predictionio.workflow.KryoInstantiator
+import org.apache.spark.SparkContext
+import org.apache.spark.ml.PipelineModel
+import org.apache.spark.sql.catalyst.expressions.Literal
+import org.apache.spark.sql.types.{StructField, StructType}
+import org.apache.spark.sql.{Row, SparkSession}
+
+
+object PythonEngine extends EngineFactory {
+
+  private[engine] type Query = Map[String, Any]
+
+  def apply(): Engine[EmptyTrainingData, EmptyEvaluationInfo, EmptyPreparedData,
+    Query, Row, EmptyActualResult] = {
+    new Engine(
+      classOf[PythonDataSource],
+      classOf[PythonPreparator],
+      Map("default" -> classOf[PythonAlgorithm]),
+      classOf[PythonServing])
+  }
+
+  def models(model: PipelineModel): Array[Byte] = {
+    val kryo = KryoInstantiator.newKryoInjection
+    kryo(Seq(model))
+  }
+
+}
+
+import PythonEngine.Query
+
+class PythonDataSource extends
+  PDataSource[EmptyTrainingData, EmptyEvaluationInfo, Query, EmptyActualResult] {
+  def readTraining(sc: SparkContext): EmptyTrainingData = new SerializableClass()
+}
+
+class PythonPreparator extends PPreparator[EmptyTrainingData, EmptyPreparedData] {
+  def prepare(sc: SparkContext, trainingData: EmptyTrainingData): EmptyPreparedData =
+    new SerializableClass()
+}
+
+object PythonServing {
+  private[engine] val columns = "PythonPredictColumns"
+
+  case class Params(columns: Seq[String]) extends org.apache.predictionio.controller.Params
+}
+
+class PythonServing(params: PythonServing.Params) extends LFirstServing[Query, Row] {
+  override def supplement(q: Query): Query = {
+    q + (PythonServing.columns -> params.columns)
+  }
+}
+
+class PythonAlgorithm extends
+  P2LAlgorithm[EmptyPreparedData, PipelineModel, Query, Row] {
+
+  def train(sc: SparkContext, data: EmptyPreparedData): PipelineModel = ???
+
+  def predict(model: PipelineModel, query: Query): Row = {
+    val selectCols = query(PythonServing.columns).asInstanceOf[Seq[String]]
+    val (colNames, data) = (query - PythonServing.columns).toList.unzip
+
+    val rows = Arrays.asList(Row.fromSeq(data))
+    val schema = StructType(colNames.zipWithIndex.map { case (col, i) =>
+      StructField(col, Literal(data(i)).dataType)
+    })
+
+    val spark = SparkSession.builder.getOrCreate()
+    val df = spark.createDataFrame(rows, schema)
+    model.transform(df)
+      .select(selectCols.head, selectCols.tail: _*)
+      .first()
+  }
+
+}
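Note: `PythonAlgorithm.predict` above rebuilds a one-row DataFrame from the incoming query map (minus the serving-supplied column list) and returns only the columns configured in `PythonServing.Params`. A minimal sketch of querying such an engine from Python, assuming the saved engine instance has been deployed (e.g. with `pio deploy`) on the default port 8000; the endpoint convention and field names below are illustrative assumptions, not part of this patch:

```
# Hypothetical client call against a deployed PythonEngine instance.
# Port 8000 and the /queries.json endpoint follow PredictionIO's usual engine
# server conventions; the field names are made-up examples.
import requests

query = {
    "x1": 5.1,  # each key becomes a column of the single-row DataFrame
    "x2": 3.5,
}

resp = requests.post("http://localhost:8000/queries.json", json=query)
resp.raise_for_status()
print(resp.json())  # only the columns listed in PythonServing.Params(columns)
```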
diff --git a/make-distribution.sh b/make-distribution.sh
index 7a34274..7d94a79 100755
--- a/make-distribution.sh
+++ b/make-distribution.sh
@@ -112,7 +112,12 @@
 cp LICENSE.txt ${TARDIR}
 cp NOTICE.txt ${TARDIR}
 
-tar zcvf ${TARNAME} ${TARDIR}
+# Allows override for `tar` command
+# This enables using GNU tar on systems such as macOS
+if [ -z "$TAR" ] ; then
+  TAR=tar
+fi
+$TAR zcvf ${TARNAME} ${TARDIR}
 rm -rf ${TARDIR}
 
 echo -e "\033[0;32mPredictionIO binary distribution created at $TARNAME\033[0m"
diff --git a/project/PIOBuild.scala b/project/PIOBuild.scala
index 30fca65..c8185a4 100644
--- a/project/PIOBuild.scala
+++ b/project/PIOBuild.scala
@@ -19,6 +19,7 @@
 
 object PIOBuild {
   val elasticsearchVersion = settingKey[String]("The version of Elasticsearch used for building")
+  val hbaseVersion = settingKey[String]("The version of Hbase used for building")
   val json4sVersion = settingKey[String]("The version of JSON4S used for building")
   val sparkVersion = settingKey[String]("The version of Apache Spark used for building")
   val sparkBinaryVersion = settingKey[String]("The binary version of Apache Spark used for building")
diff --git a/project/assembly.sbt b/project/assembly.sbt
index 39c1bb8..d95475f 100644
--- a/project/assembly.sbt
+++ b/project/assembly.sbt
@@ -1 +1 @@
-addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.3")
+addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.7")
diff --git a/project/build.properties b/project/build.properties
index 64317fd..5f528e4 100644
--- a/project/build.properties
+++ b/project/build.properties
@@ -1 +1 @@
-sbt.version=0.13.15
+sbt.version=1.2.3
\ No newline at end of file
diff --git a/project/plugins.sbt b/project/plugins.sbt
index 15b986d..4239efe 100644
--- a/project/plugins.sbt
+++ b/project/plugins.sbt
@@ -1,17 +1,17 @@
-addSbtPlugin("com.eed3si9n" % "sbt-buildinfo" % "0.3.2")
+addSbtPlugin("com.eed3si9n" % "sbt-buildinfo" % "0.9.0")
 
-addSbtPlugin("com.jsuereth" % "sbt-pgp" % "1.0.0")
+addSbtPlugin("com.jsuereth" % "sbt-pgp" % "1.1.1")
 
-addSbtPlugin("com.typesafe.sbt" % "sbt-twirl" % "1.1.1")
+addSbtPlugin("com.typesafe.sbt" % "sbt-twirl" % "1.3.15")
 
-addSbtPlugin("org.xerial.sbt" % "sbt-sonatype" % "1.1")
+addSbtPlugin("org.xerial.sbt" % "sbt-sonatype" % "2.3")
 
-addSbtPlugin("org.scalastyle" %% "scalastyle-sbt-plugin" % "0.8.0")
+addSbtPlugin("org.scalastyle" %% "scalastyle-sbt-plugin" % "1.0.0")
 
 resolvers += "sonatype-releases" at "https://oss.sonatype.org/content/repositories/releases/"
 
-addSbtPlugin("org.scoverage" % "sbt-scoverage" % "1.3.5")
+addSbtPlugin("org.scoverage" % "sbt-scoverage" % "1.5.1")
 
-addSbtPlugin("com.typesafe.sbt" % "sbt-native-packager" % "1.2.0-M8")
+addSbtPlugin("com.typesafe.sbt" % "sbt-native-packager" % "1.3.6")
 
 addSbtPlugin("com.typesafe.sbt" % "sbt-license-report" % "1.2.0")
\ No newline at end of file
diff --git a/project/unidoc.sbt b/project/unidoc.sbt
index f9c20f8..ade7d89 100644
--- a/project/unidoc.sbt
+++ b/project/unidoc.sbt
@@ -1 +1 @@
-addSbtPlugin("com.eed3si9n" % "sbt-unidoc" % "0.4.0")
+addSbtPlugin("com.eed3si9n" % "sbt-unidoc" % "0.4.2")
diff --git a/python/pypio/__init__.py b/python/pypio/__init__.py
index 04d8ac3..e0ca788 100644
--- a/python/pypio/__init__.py
+++ b/python/pypio/__init__.py
@@ -18,3 +18,12 @@
 """
 PyPIO is the Python API for PredictionIO.
 """
+
+from __future__ import absolute_import
+
+from pypio.pypio import *
+
+
+__all__ = [
+    'pypio'
+]
diff --git a/python/pypio/data/eventstore.py b/python/pypio/data/eventstore.py
index 4eb73df..58f09d1 100644
--- a/python/pypio/data/eventstore.py
+++ b/python/pypio/data/eventstore.py
@@ -17,8 +17,8 @@
 
 from __future__ import absolute_import
 
-from pypio.utils import new_string_array
 from pyspark.sql.dataframe import DataFrame
+from pyspark.sql import utils
 
 __all__ = ["PEventStore"]
 
@@ -43,7 +43,7 @@
         pes = self._sc._jvm.org.apache.predictionio.data.store.python.PPythonEventStore
         jdf = pes.aggregateProperties(app_name, entity_type, channel_name,
                                       start_time, until_time,
-                                      new_string_array(required, self._sc._gateway),
+                                      utils.toJArray(self._sc._gateway, self._sc._gateway.jvm.String, required),
                                       self._jss)
         return DataFrame(jdf, self.sql_ctx)
 
diff --git a/python/pypio/pypio.py b/python/pypio/pypio.py
new file mode 100644
index 0000000..3a32cd8
--- /dev/null
+++ b/python/pypio/pypio.py
@@ -0,0 +1,111 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from __future__ import absolute_import
+
+import atexit
+import json
+import os
+import sys
+
+from pypio.data import PEventStore
+from pypio.utils import dict_to_scalamap, list_to_dict
+from pypio.workflow import CleanupFunctions
+from pyspark.sql import SparkSession
+
+
+def init():
+    global spark
+    spark = SparkSession.builder.getOrCreate()
+    global sc
+    sc = spark.sparkContext
+    global sqlContext
+    sqlContext = spark._wrapped
+    global p_event_store
+    p_event_store = PEventStore(spark._jsparkSession, sqlContext)
+
+    cleanup_functions = CleanupFunctions(sqlContext)
+    atexit.register(lambda: cleanup_functions.run())
+    atexit.register(lambda: sc.stop())
+    print("Initialized pypio")
+
+
+def find_events(app_name):
+    """
+    Returns a dataset of the specified app.
+
+    :param app_name: app name
+    :return: :py:class:`pyspark.sql.DataFrame`
+    """
+    return p_event_store.find(app_name)
+
+
+def save_model(model, predict_columns):
+    """
+    Save a PipelineModel object to storage.
+
+    :param model: :py:class:`pyspark.ml.pipeline.PipelineModel`
+    :param predict_columns: prediction columns
+    :return: identifier for the trained model to use for predict
+    """
+    if not predict_columns:
+        raise ValueError("predict_columns must contain at least one value")
+    if os.environ.get('PYSPARK_PYTHON') is None:
+        # spark-submit
+        d = list_to_dict(sys.argv[1:])
+        pio_env = list_to_dict([v for e in d['--env'].split(',') for v in e.split('=')])
+    else:
+        # pyspark
+        pio_env = {k: v for k, v in os.environ.items() if k.startswith('PIO_')}
+
+    meta_storage = sc._jvm.org.apache.predictionio.data.storage.Storage.getMetaDataEngineInstances()
+
+    meta = sc._jvm.org.apache.predictionio.data.storage.EngineInstance.apply(
+        "",
+        "INIT", # status
+        sc._jvm.org.joda.time.DateTime.now(), # startTime
+        sc._jvm.org.joda.time.DateTime.now(), # endTime
+        "org.apache.predictionio.e2.engine.PythonEngine", # engineId
+        "1", # engineVersion
+        "default", # engineVariant
+        "org.apache.predictionio.e2.engine.PythonEngine", # engineFactory
+        "", # batch
+        dict_to_scalamap(sc._jvm, pio_env), # env
+        sc._jvm.scala.Predef.Map().empty(), # sparkConf
+        "{\"\":{}}", # dataSourceParams
+        "{\"\":{}}", # preparatorParams
+        "[{\"default\":{}}]", # algorithmsParams
+        json.dumps({"":{"columns":[v for v in predict_columns]}}) # servingParams
+    )
+    id = meta_storage.insert(meta)
+
+    engine = sc._jvm.org.apache.predictionio.e2.engine.PythonEngine
+    data = sc._jvm.org.apache.predictionio.data.storage.Model(id, engine.models(model._to_java()))
+    model_storage = sc._jvm.org.apache.predictionio.data.storage.Storage.getModelDataModels()
+    model_storage.insert(data)
+
+    meta_storage.update(
+        sc._jvm.org.apache.predictionio.data.storage.EngineInstance.apply(
+            id, "COMPLETED", meta.startTime(), sc._jvm.org.joda.time.DateTime.now(),
+            meta.engineId(), meta.engineVersion(), meta.engineVariant(),
+            meta.engineFactory(), meta.batch(), meta.env(), meta.sparkConf(),
+            meta.dataSourceParams(), meta.preparatorParams(), meta.algorithmsParams(), meta.servingParams()
+        )
+    )
+
+    return id
+
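Note: a rough usage sketch of the `pypio` module above from a PySpark session started with the PIO environment (e.g. `pio-shell --with-pyspark`); the app name, feature columns, and estimator are illustrative assumptions, not part of this change:

```
# Minimal sketch, assuming PIO_* storage settings are present in the environment
# and an app named "MyApp" exists; columns "x1", "x2", "label" are made up.
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
import pypio

pypio.init()                          # sets up spark, sc, sqlContext, p_event_store
events = pypio.find_events("MyApp")   # DataFrame of the app's events

# Fit an arbitrary Spark ML pipeline on the event data (feature prep elided).
assembler = VectorAssembler(inputCols=["x1", "x2"], outputCol="features")
lr = LinearRegression(featuresCol="features", labelCol="label")
model = Pipeline(stages=[assembler, lr]).fit(events)

# Persist the PipelineModel; the returned id identifies the stored engine instance.
instance_id = pypio.save_model(model, ["prediction"])
print(instance_id)
```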
diff --git a/python/pypio/utils.py b/python/pypio/utils.py
index 76900c3..4155efb 100644
--- a/python/pypio/utils.py
+++ b/python/pypio/utils.py
@@ -16,12 +16,38 @@
 #
 
 
-def new_string_array(list_data, gateway):
-    if list_data is None:
-        return None
-    string_class = gateway.jvm.String
-    args = gateway.new_array(string_class, len(list_data))
-    for i in range(len(list_data)):
-        args[i] = list_data[i]
-    return args
+def dict_to_scalamap(jvm, d):
+    """
+    Convert a Python dictionary to a Scala Map
 
+    :param jvm: sc._jvm
+    :param d: Python dictionary
+    """
+    if d is None:
+        return None
+    sm = jvm.scala.Predef.Map().empty()
+    for k, v in d.items():
+        sm = sm.updated(k, v)
+    return sm
+
+def list_to_dict(l):
+    """
+    Convert a Python list of alternating keys and values to a Python dictionary
+
+    :param l: Python list
+
+    >>> list = ["key1", 1, "key2", 2, "key3", 3]
+    >>> list_to_dict(list) == {'key1': 1, 'key2': 2, 'key3': 3}
+    True
+    """
+    if l is None:
+        return None
+    return dict(zip(l[0::2], l[1::2]))
+
+
+if __name__ == "__main__":
+    import doctest
+    import sys
+    (failure_count, test_count) = doctest.testmod()
+    if failure_count:
+        sys.exit(-1)
\ No newline at end of file
diff --git a/storage/elasticsearch/build.sbt b/storage/elasticsearch/build.sbt
index 754aefb..3dbd3de 100644
--- a/storage/elasticsearch/build.sbt
+++ b/storage/elasticsearch/build.sbt
@@ -19,15 +19,13 @@
 
 name := "apache-predictionio-data-elasticsearch"
 
-elasticsearchSparkArtifact := (if (majorVersion(sparkVersion.value) == 2) "elasticsearch-spark-20" else "elasticsearch-spark-13")
-
-elasticsearchVersion := (if (majorVersion(elasticsearchVersion.value) < 5) "5.5.2" else elasticsearchVersion.value)
+elasticsearchVersion := (if (majorVersion(elasticsearchVersion.value) < 5) "5.6.9" else elasticsearchVersion.value)
 
 libraryDependencies ++= Seq(
   "org.apache.predictionio" %% "apache-predictionio-core" % version.value % "provided",
   "org.apache.spark"        %% "spark-core"               % sparkVersion.value % "provided",
-  "org.elasticsearch.client" % "rest"                     % elasticsearchVersion.value,
-  "org.elasticsearch"       %% elasticsearchSparkArtifact.value % elasticsearchVersion.value
+  "org.elasticsearch.client" % "elasticsearch-rest-client" % elasticsearchVersion.value,
+  "org.elasticsearch"       %% "elasticsearch-spark-20" % elasticsearchVersion.value
     exclude("org.apache.spark", "*"),
   "org.elasticsearch"        % "elasticsearch-hadoop-mr"  % elasticsearchVersion.value,
   "org.specs2"              %% "specs2"                   % "2.3.13" % "test")
diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala
index 73ef1d0..eef83e4 100644
--- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala
+++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala
@@ -40,17 +40,15 @@
     extends AccessKeys with Logging {
   implicit val formats = DefaultFormats.lossless
   private val estype = "accesskeys"
-
-  ESUtils.createIndex(client, index,
-    ESUtils.getNumberOfShards(config, index.toUpperCase),
-    ESUtils.getNumberOfReplicas(config, index.toUpperCase))
+  private val internalIndex = index + "_" + estype
+   
+  ESUtils.createIndex(client, internalIndex)
   val mappingJson =
     (estype ->
-      ("_all" -> ("enabled" -> false)) ~
       ("properties" ->
         ("key" -> ("type" -> "keyword")) ~
         ("events" -> ("type" -> "keyword"))))
-  ESUtils.createMapping(client, index, estype, compact(render(mappingJson)))
+  ESUtils.createMapping(client, internalIndex, estype, compact(render(mappingJson)))
 
   def insert(accessKey: AccessKey): Option[String] = {
     val key = if (accessKey.key.isEmpty) generateKey else accessKey.key
@@ -65,7 +63,7 @@
     try {
       val response = client.performRequest(
         "GET",
-        s"/$index/$estype/$id",
+        s"/$internalIndex/$estype/$id",
         Map.empty[String, String].asJava)
       val jsonResponse = parse(EntityUtils.toString(response.getEntity))
       (jsonResponse \ "found").extract[Boolean] match {
@@ -79,11 +77,11 @@
         e.getResponse.getStatusLine.getStatusCode match {
           case 404 => None
           case _ =>
-            error(s"Failed to access to /$index/$estype/$id", e)
+            error(s"Failed to access to /$internalIndex/$estype/$id", e)
             None
         }
       case e: IOException =>
-        error(s"Failed to access to /$index/$estype/$id", e)
+        error(s"Failed to access to /$internalIndex/$estype/$id", e)
         None
     }
   }
@@ -93,10 +91,10 @@
       val json =
         ("query" ->
           ("match_all" -> List.empty))
-      ESUtils.getAll[AccessKey](client, index, estype, compact(render(json)))
+      ESUtils.getAll[AccessKey](client, internalIndex, estype, compact(render(json)))
     } catch {
       case e: IOException =>
-        error("Failed to access to /$index/$estype/_search", e)
+        error("Failed to access to /$internalIndex/$estype/_search", e)
         Nil
     }
   }
@@ -107,10 +105,10 @@
         ("query" ->
           ("term" ->
             ("appid" -> appid)))
-      ESUtils.getAll[AccessKey](client, index, estype, compact(render(json)))
+      ESUtils.getAll[AccessKey](client, internalIndex, estype, compact(render(json)))
     } catch {
       case e: IOException =>
-        error("Failed to access to /$index/$estype/_search", e)
+        error("Failed to access to /$internalIndex/$estype/_search", e)
         Nil
     }
   }
@@ -121,7 +119,7 @@
       val entity = new NStringEntity(write(accessKey), ContentType.APPLICATION_JSON)
       val response = client.performRequest(
         "POST",
-        s"/$index/$estype/$id",
+        s"/$internalIndex/$estype/$id",
         Map("refresh" -> "true").asJava,
         entity)
       val jsonResponse = parse(EntityUtils.toString(response.getEntity))
@@ -130,11 +128,11 @@
         case "created" =>
         case "updated" =>
         case _ =>
-          error(s"[$result] Failed to update $index/$estype/$id")
+          error(s"[$result] Failed to update $internalIndex/$estype/$id")
       }
     } catch {
       case e: IOException =>
-        error(s"Failed to update $index/$estype/$id", e)
+        error(s"Failed to update $internalIndex/$estype/$id", e)
     }
   }
 
@@ -142,18 +140,18 @@
     try {
       val response = client.performRequest(
         "DELETE",
-        s"/$index/$estype/$id",
+        s"/$internalIndex/$estype/$id",
         Map("refresh" -> "true").asJava)
       val json = parse(EntityUtils.toString(response.getEntity))
       val result = (json \ "result").extract[String]
       result match {
         case "deleted" =>
         case _ =>
-          error(s"[$result] Failed to update $index/$estype/id")
+          error(s"[$result] Failed to update $internalIndex/$estype/id")
       }
     } catch {
       case e: IOException =>
-        error(s"Failed to update $index/$estype/id", e)
+        error(s"Failed to update $internalIndex/$estype/id", e)
     }
   }
 }
diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala
index ba48065..26621cf 100644
--- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala
+++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala
@@ -41,17 +41,15 @@
   implicit val formats = DefaultFormats.lossless
   private val estype = "apps"
   private val seq = new ESSequences(client, config, index)
+  private val internalIndex = index + "_" + estype
 
-  ESUtils.createIndex(client, index,
-    ESUtils.getNumberOfShards(config, index.toUpperCase),
-    ESUtils.getNumberOfReplicas(config, index.toUpperCase))
+  ESUtils.createIndex(client, internalIndex)
   val mappingJson =
     (estype ->
-      ("_all" -> ("enabled" -> false)) ~
       ("properties" ->
         ("id" -> ("type" -> "keyword")) ~
         ("name" -> ("type" -> "keyword"))))
-  ESUtils.createMapping(client, index, estype, compact(render(mappingJson)))
+  ESUtils.createMapping(client, internalIndex, estype, compact(render(mappingJson)))
 
   def insert(app: App): Option[Int] = {
     val id = app.id match {
@@ -74,7 +72,7 @@
     try {
       val response = client.performRequest(
         "GET",
-        s"/$index/$estype/$id",
+        s"/$internalIndex/$estype/$id",
         Map.empty[String, String].asJava)
       val jsonResponse = parse(EntityUtils.toString(response.getEntity))
       (jsonResponse \ "found").extract[Boolean] match {
@@ -88,11 +86,11 @@
         e.getResponse.getStatusLine.getStatusCode match {
           case 404 => None
           case _ =>
-            error(s"Failed to access to /$index/$estype/$id", e)
+            error(s"Failed to access to /$internalIndex/$estype/$id", e)
             None
         }
       case e: IOException =>
-        error(s"Failed to access to /$index/$estype/$id", e)
+        error(s"Failed to access to /$internalIndex/$estype/$id", e)
         None
     }
   }
@@ -106,7 +104,7 @@
       val entity = new NStringEntity(compact(render(json)), ContentType.APPLICATION_JSON)
       val response = client.performRequest(
         "POST",
-        s"/$index/$estype/_search",
+        s"/$internalIndex/$estype/_search",
         Map.empty[String, String].asJava,
         entity)
       val jsonResponse = parse(EntityUtils.toString(response.getEntity))
@@ -119,7 +117,7 @@
       }
     } catch {
       case e: IOException =>
-        error(s"Failed to access to /$index/$estype/_search", e)
+        error(s"Failed to access to /$internalIndex/$estype/_search", e)
         None
     }
   }
@@ -129,10 +127,10 @@
       val json =
         ("query" ->
           ("match_all" -> Nil))
-      ESUtils.getAll[App](client, index, estype, compact(render(json)))
+      ESUtils.getAll[App](client, internalIndex, estype, compact(render(json)))
     } catch {
       case e: IOException =>
-        error("Failed to access to /$index/$estype/_search", e)
+        error("Failed to access to /$internalIndex/$estype/_search", e)
         Nil
     }
   }
@@ -143,7 +141,7 @@
       val entity = new NStringEntity(write(app), ContentType.APPLICATION_JSON);
       val response = client.performRequest(
         "POST",
-        s"/$index/$estype/$id",
+        s"/$internalIndex/$estype/$id",
         Map("refresh" -> "true").asJava,
         entity)
       val jsonResponse = parse(EntityUtils.toString(response.getEntity))
@@ -152,11 +150,11 @@
         case "created" =>
         case "updated" =>
         case _ =>
-          error(s"[$result] Failed to update $index/$estype/$id")
+          error(s"[$result] Failed to update $internalIndex/$estype/$id")
       }
     } catch {
       case e: IOException =>
-        error(s"Failed to update $index/$estype/$id", e)
+        error(s"Failed to update $internalIndex/$estype/$id", e)
     }
   }
 
@@ -164,18 +162,18 @@
     try {
       val response = client.performRequest(
         "DELETE",
-        s"/$index/$estype/$id",
+        s"/$internalIndex/$estype/$id",
         Map("refresh" -> "true").asJava)
       val json = parse(EntityUtils.toString(response.getEntity))
       val result = (json \ "result").extract[String]
       result match {
         case "deleted" =>
         case _ =>
-          error(s"[$result] Failed to update $index/$estype/$id")
+          error(s"[$result] Failed to update $internalIndex/$estype/$id")
       }
     } catch {
       case e: IOException =>
-        error(s"Failed to update $index/$estype/id", e)
+        error(s"Failed to update $internalIndex/$estype/id", e)
     }
   }
 }
diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala
index b5eb5c8..ac248de 100644
--- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala
+++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala
@@ -40,16 +40,14 @@
   implicit val formats = DefaultFormats.lossless
   private val estype = "channels"
   private val seq = new ESSequences(client, config, index)
-  
-  ESUtils.createIndex(client, index,
-    ESUtils.getNumberOfShards(config, index.toUpperCase),
-    ESUtils.getNumberOfReplicas(config, index.toUpperCase))
+  private val internalIndex = index + "_" + estype
+
+  ESUtils.createIndex(client, internalIndex)
   val mappingJson =
     (estype ->
-      ("_all" -> ("enabled" -> false)) ~
       ("properties" ->
         ("name" -> ("type" -> "keyword"))))
-  ESUtils.createMapping(client, index, estype, compact(render(mappingJson)))
+  ESUtils.createMapping(client, internalIndex, estype, compact(render(mappingJson)))
 
   def insert(channel: Channel): Option[Int] = {
     val id = channel.id match {
@@ -72,7 +70,7 @@
     try {
       val response = client.performRequest(
         "GET",
-        s"/$index/$estype/$id",
+        s"/$internalIndex/$estype/$id",
         Map.empty[String, String].asJava)
       val jsonResponse = parse(EntityUtils.toString(response.getEntity))
       (jsonResponse \ "found").extract[Boolean] match {
@@ -86,11 +84,11 @@
         e.getResponse.getStatusLine.getStatusCode match {
           case 404 => None
           case _ =>
-            error(s"Failed to access to /$index/$estype/$id", e)
+            error(s"Failed to access to /$internalIndex/$estype/$id", e)
             None
         }
       case e: IOException =>
-        error(s"Failed to access to /$index/$estype/$id", e)
+        error(s"Failed to access to /$internalIndex/$estype/$id", e)
         None
     }
   }
@@ -101,10 +99,10 @@
         ("query" ->
           ("term" ->
             ("appid" -> appid)))
-      ESUtils.getAll[Channel](client, index, estype, compact(render(json)))
+      ESUtils.getAll[Channel](client, internalIndex, estype, compact(render(json)))
     } catch {
       case e: IOException =>
-        error(s"Failed to access to /$index/$estype/_search", e)
+        error(s"Failed to access to /$internalIndex/$estype/_search", e)
         Nil
     }
   }
@@ -115,7 +113,7 @@
       val entity = new NStringEntity(write(channel), ContentType.APPLICATION_JSON)
       val response = client.performRequest(
         "POST",
-        s"/$index/$estype/$id",
+        s"/$internalIndex/$estype/$id",
         Map("refresh" -> "true").asJava,
         entity)
       val json = parse(EntityUtils.toString(response.getEntity))
@@ -124,12 +122,12 @@
         case "created" => true
         case "updated" => true
         case _ =>
-          error(s"[$result] Failed to update $index/$estype/$id")
+          error(s"[$result] Failed to update $internalIndex/$estype/$id")
           false
       }
     } catch {
       case e: IOException =>
-        error(s"Failed to update $index/$estype/$id", e)
+        error(s"Failed to update $internalIndex/$estype/$id", e)
         false
     }
   }
@@ -138,18 +136,18 @@
     try {
       val response = client.performRequest(
         "DELETE",
-        s"/$index/$estype/$id",
+        s"/$internalIndex/$estype/$id",
         Map("refresh" -> "true").asJava)
       val jsonResponse = parse(EntityUtils.toString(response.getEntity))
       val result = (jsonResponse \ "result").extract[String]
       result match {
         case "deleted" =>
         case _ =>
-          error(s"[$result] Failed to update $index/$estype/$id")
+          error(s"[$result] Failed to update $internalIndex/$estype/$id")
       }
     } catch {
       case e: IOException =>
-        error(s"Failed to update $index/$estype/$id", e)
+        error(s"Failed to update $internalIndex/$estype/$id", e)
     }
   }
 }
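
The pattern above repeats across the ES metadata stores: instead of sharing one index with several mapping types, each store derives its own physical index from the configured base name. A minimal sketch of that convention (the base name below is a placeholder, not taken from the patch):

```scala
// Sketch of the index-naming convention applied in ESApps, ESChannels,
// ESEngineInstances, ESEvaluationInstances and ESSequences.
def internalIndexFor(baseIndex: String, estype: String): String =
  baseIndex + "_" + estype

// e.g. internalIndexFor("pio_meta", "channels") == "pio_meta_channels"
```
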
diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala
index eec5b64..96f8a67 100644
--- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala
+++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala
@@ -40,13 +40,11 @@
     extends EngineInstances with Logging {
   implicit val formats = DefaultFormats + new EngineInstanceSerializer
   private val estype = "engine_instances"
-  
-  ESUtils.createIndex(client, index,
-    ESUtils.getNumberOfShards(config, index.toUpperCase),
-    ESUtils.getNumberOfReplicas(config, index.toUpperCase))
+  private val internalIndex = index + "_" + estype
+
+  ESUtils.createIndex(client, internalIndex)
   val mappingJson =
     (estype ->
-      ("_all" -> ("enabled" -> false)) ~
       ("properties" ->
         ("status" -> ("type" -> "keyword")) ~
         ("startTime" -> ("type" -> "date")) ~
@@ -59,9 +57,9 @@
         ("dataSourceParams" -> ("type" -> "keyword")) ~
         ("preparatorParams" -> ("type" -> "keyword")) ~
         ("algorithmsParams" -> ("type" -> "keyword")) ~
-        ("servingParams" -> ("type" -> "keyword")) ~
-        ("status" -> ("type" -> "keyword"))))
-  ESUtils.createMapping(client, index, estype, compact(render(mappingJson)))
+        ("servingParams" -> ("type" -> "keyword"))
+        ))
+  ESUtils.createMapping(client, internalIndex, estype, compact(render(mappingJson)))
 
   def insert(i: EngineInstance): String = {
     val id = i.id match {
@@ -86,7 +84,7 @@
       val entity = new NStringEntity("{}", ContentType.APPLICATION_JSON)
       val response = client.performRequest(
         "POST",
-        s"/$index/$estype/",
+        s"/$internalIndex/$estype/",
         Map("refresh" -> "true").asJava,
         entity)
       val jsonResponse = parse(EntityUtils.toString(response.getEntity))
@@ -95,12 +93,12 @@
         case "created" =>
           Some((jsonResponse \ "_id").extract[String])
         case _ =>
-          error(s"[$result] Failed to create $index/$estype")
+          error(s"[$result] Failed to create $internalIndex/$estype")
           None
       }
     } catch {
       case e: IOException =>
-        error(s"Failed to create $index/$estype", e)
+        error(s"Failed to create $internalIndex/$estype", e)
         None
     }
   }
@@ -109,7 +107,7 @@
     try {
       val response = client.performRequest(
         "GET",
-        s"/$index/$estype/$id",
+        s"/$internalIndex/$estype/$id",
         Map.empty[String, String].asJava)
       val jsonResponse = parse(EntityUtils.toString(response.getEntity))
       (jsonResponse \ "found").extract[Boolean] match {
@@ -123,11 +121,11 @@
         e.getResponse.getStatusLine.getStatusCode match {
           case 404 => None
           case _ =>
-            error(s"Failed to access to /$index/$estype/$id", e)
+            error(s"Failed to access to /$internalIndex/$estype/$id", e)
             None
         }
       case e: IOException =>
-        error(s"Failed to access to /$index/$estype/$id", e)
+        error(s"Failed to access to /$internalIndex/$estype/$id", e)
         None
     }
   }
@@ -137,10 +135,10 @@
       val json =
         ("query" ->
           ("match_all" -> List.empty))
-      ESUtils.getAll[EngineInstance](client, index, estype, compact(render(json)))
+      ESUtils.getAll[EngineInstance](client, internalIndex, estype, compact(render(json)))
     } catch {
       case e: IOException =>
-        error("Failed to access to /$index/$estype/_search", e)
+        error(s"Failed to access to /$internalIndex/$estype/_search", e)
         Nil
     }
   }
@@ -165,10 +163,10 @@
               ("sort" -> List(
                 ("startTime" ->
                   ("order" -> "desc"))))
-      ESUtils.getAll[EngineInstance](client, index, estype, compact(render(json)))
+      ESUtils.getAll[EngineInstance](client, internalIndex, estype, compact(render(json)))
     } catch {
       case e: IOException =>
-        error(s"Failed to access to /$index/$estype/_search", e)
+        error(s"Failed to access to /$internalIndex/$estype/_search", e)
         Nil
     }
   }
@@ -188,7 +186,7 @@
       val entity = new NStringEntity(write(i), ContentType.APPLICATION_JSON)
       val response = client.performRequest(
         "POST",
-        s"/$index/$estype/$id",
+        s"/$internalIndex/$estype/$id",
         Map("refresh" -> "true").asJava,
         entity)
       val jsonResponse = parse(EntityUtils.toString(response.getEntity))
@@ -197,11 +195,11 @@
         case "created" =>
         case "updated" =>
         case _ =>
-          error(s"[$result] Failed to update $index/$estype/$id")
+          error(s"[$result] Failed to update $internalIndex/$estype/$id")
       }
     } catch {
       case e: IOException =>
-        error(s"Failed to update $index/$estype/$id", e)
+        error(s"Failed to update $internalIndex/$estype/$id", e)
     }
   }
 
@@ -209,18 +207,18 @@
     try {
       val response = client.performRequest(
         "DELETE",
-        s"/$index/$estype/$id",
+        s"/$internalIndex/$estype/$id",
         Map("refresh" -> "true").asJava)
       val json = parse(EntityUtils.toString(response.getEntity))
       val result = (json \ "result").extract[String]
       result match {
         case "deleted" =>
         case _ =>
-          error(s"[$result] Failed to update $index/$estype/$id")
+          error(s"[$result] Failed to update $internalIndex/$estype/$id")
       }
     } catch {
       case e: IOException =>
-        error(s"Failed to update $index/$estype/$id", e)
+        error(s"Failed to update $internalIndex/$estype/$id", e)
     }
   }
 }
diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala
index 1706583..0025950 100644
--- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala
+++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala
@@ -28,7 +28,6 @@
 import org.apache.predictionio.data.storage.EvaluationInstanceSerializer
 import org.apache.predictionio.data.storage.EvaluationInstances
 import org.apache.predictionio.data.storage.StorageClientConfig
-import org.apache.predictionio.data.storage.StorageClientException
 import org.elasticsearch.client.{ResponseException, RestClient}
 import org.json4s._
 import org.json4s.JsonDSL._
@@ -42,13 +41,11 @@
   implicit val formats = DefaultFormats + new EvaluationInstanceSerializer
   private val estype = "evaluation_instances"
   private val seq = new ESSequences(client, config, index)
-  
-  ESUtils.createIndex(client, index,
-    ESUtils.getNumberOfShards(config, index.toUpperCase),
-    ESUtils.getNumberOfReplicas(config, index.toUpperCase))
+  private val internalIndex = index + "_" + estype
+
+  ESUtils.createIndex(client, internalIndex)
   val mappingJson =
     (estype ->
-      ("_all" -> ("enabled" -> false)) ~
       ("properties" ->
         ("status" -> ("type" -> "keyword")) ~
         ("startTime" -> ("type" -> "date")) ~
@@ -59,7 +56,7 @@
         ("evaluatorResults" -> ("type" -> "text")) ~
         ("evaluatorResultsHTML" -> ("enabled" -> false)) ~
         ("evaluatorResultsJSON" -> ("enabled" -> false))))
-  ESUtils.createMapping(client, index, estype, compact(render(mappingJson)))
+  ESUtils.createMapping(client, internalIndex, estype, compact(render(mappingJson)))
 
   def insert(i: EvaluationInstance): String = {
     val id = i.id match {
@@ -82,7 +79,7 @@
     try {
       val response = client.performRequest(
         "GET",
-        s"/$index/$estype/$id",
+        s"/$internalIndex/$estype/$id",
         Map.empty[String, String].asJava)
       val jsonResponse = parse(EntityUtils.toString(response.getEntity))
       (jsonResponse \ "found").extract[Boolean] match {
@@ -96,11 +93,11 @@
         e.getResponse.getStatusLine.getStatusCode match {
           case 404 => None
           case _ =>
-            error(s"Failed to access to /$index/$estype/$id", e)
+            error(s"Failed to access to /$internalIndex/$estype/$id", e)
             None
         }
       case e: IOException =>
-        error(s"Failed to access to /$index/$estype/$id", e)
+        error(s"Failed to access to /$internalIndex/$estype/$id", e)
         None
     }
   }
@@ -110,10 +107,10 @@
       val json =
         ("query" ->
           ("match_all" -> List.empty))
-      ESUtils.getAll[EvaluationInstance](client, index, estype, compact(render(json)))
+      ESUtils.getAll[EvaluationInstance](client, internalIndex, estype, compact(render(json)))
     } catch {
       case e: IOException =>
-        error("Failed to access to /$index/$estype/_search", e)
+        error("Failed to access to /$internalIndex/$estype/_search", e)
         Nil
     }
   }
@@ -127,10 +124,10 @@
             ("sort" ->
               ("startTime" ->
                 ("order" -> "desc")))
-      ESUtils.getAll[EvaluationInstance](client, index, estype, compact(render(json)))
+      ESUtils.getAll[EvaluationInstance](client, internalIndex, estype, compact(render(json)))
     } catch {
       case e: IOException =>
-        error("Failed to access to /$index/$estype/_search", e)
+        error("Failed to access to /$internalIndex/$estype/_search", e)
         Nil
     }
   }
@@ -141,7 +138,7 @@
       val entity = new NStringEntity(write(i), ContentType.APPLICATION_JSON)
       val response = client.performRequest(
         "POST",
-        s"/$index/$estype/$id",
+        s"/$internalIndex/$estype/$id",
         Map("refresh" -> "true").asJava,
         entity)
       val json = parse(EntityUtils.toString(response.getEntity))
@@ -150,11 +147,11 @@
         case "created" =>
         case "updated" =>
         case _ =>
-          error(s"[$result] Failed to update $index/$estype/$id")
+          error(s"[$result] Failed to update $internalIndex/$estype/$id")
       }
     } catch {
       case e: IOException =>
-        error(s"Failed to update $index/$estype/$id", e)
+        error(s"Failed to update $internalIndex/$estype/$id", e)
     }
   }
 
@@ -162,18 +159,18 @@
     try {
       val response = client.performRequest(
         "DELETE",
-        s"/$index/$estype/$id",
+        s"/$internalIndex/$estype/$id",
         Map("refresh" -> "true").asJava)
       val json = parse(EntityUtils.toString(response.getEntity))
       val result = (json \ "result").extract[String]
       result match {
         case "deleted" =>
         case _ =>
-          error(s"[$result] Failed to update $index/$estype/$id")
+          error(s"[$result] Failed to update $internalIndex/$estype/$id")
       }
     } catch {
       case e: IOException =>
-        error(s"Failed to update $index/$estype/$id", e)
+        error(s"Failed to update $internalIndex/$estype/$id", e)
     }
   }
 }
diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala
index 185be92..708d3d3 100644
--- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala
+++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala
@@ -38,7 +38,7 @@
 import grizzled.slf4j.Logging
 import org.apache.http.message.BasicHeader
 
-class ESLEvents(val client: RestClient, config: StorageClientConfig, val index: String)
+class ESLEvents(val client: RestClient, config: StorageClientConfig, val baseIndex: String)
     extends LEvents with Logging {
   implicit val formats = DefaultFormats.lossless ++ JodaTimeSerializers.all
 
@@ -52,12 +52,10 @@
 
   override def init(appId: Int, channelId: Option[Int] = None): Boolean = {
     val estype = getEsType(appId, channelId)
-    ESUtils.createIndex(client, index,
-      ESUtils.getNumberOfShards(config, index.toUpperCase),
-      ESUtils.getNumberOfReplicas(config, index.toUpperCase))
+    val index = baseIndex + "_" + estype
+    ESUtils.createIndex(client, index)
     val json =
       (estype ->
-        ("_all" -> ("enabled" -> false)) ~
         ("properties" ->
           ("name" -> ("type" -> "keyword")) ~
           ("eventId" -> ("type" -> "keyword")) ~
@@ -77,6 +75,7 @@
 
   override def remove(appId: Int, channelId: Option[Int] = None): Boolean = {
     val estype = getEsType(appId, channelId)
+    val index = baseIndex + "_" + estype
     try {
       val json =
         ("query" ->
@@ -107,6 +106,7 @@
     channelId: Option[Int])(implicit ec: ExecutionContext): Future[String] = {
     Future {
       val estype = getEsType(appId, channelId)
+      val index = baseIndex + "_" + estype
       try {
         val id = event.eventId.getOrElse {
           ESEventsUtil.getBase64UUID
@@ -152,6 +152,7 @@
     channelId: Option[Int])(implicit ec: ExecutionContext): Future[Seq[String]] = {
     Future {
       val estype = getEsType(appId, channelId)
+      val index = baseIndex + "_" + estype
       try {
         val ids = events.map { event =>
           event.eventId.getOrElse(ESEventsUtil.getBase64UUID)
@@ -214,6 +215,7 @@
   }
 
   private def exists(client: RestClient, estype: String, id: Int): Boolean = {
+    val index = baseIndex + "_" + estype
     try {
       client.performRequest(
         "GET",
@@ -242,6 +244,7 @@
     channelId: Option[Int])(implicit ec: ExecutionContext): Future[Option[Event]] = {
     Future {
       val estype = getEsType(appId, channelId)
+      val index = baseIndex + "_" + estype
       try {
         val json =
           ("query" ->
@@ -275,6 +278,7 @@
     channelId: Option[Int])(implicit ec: ExecutionContext): Future[Boolean] = {
     Future {
       val estype = getEsType(appId, channelId)
+      val index = baseIndex + "_" + estype
       try {
         val json =
           ("query" ->
@@ -311,6 +315,7 @@
     (implicit ec: ExecutionContext): Future[Iterator[Event]] = {
     Future {
       val estype = getEsType(appId, channelId)
+      val index = baseIndex + "_" + estype
       try {
         val query = ESUtils.createEventQuery(
           startTime, untilTime, entityType, entityId,
diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESPEvents.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESPEvents.scala
index 75f7639..a86d378 100644
--- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESPEvents.scala
+++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESPEvents.scala
@@ -41,7 +41,7 @@
 import org.json4s.ext.JodaTimeSerializers
 
 
-class ESPEvents(client: RestClient, config: StorageClientConfig, index: String)
+class ESPEvents(client: RestClient, config: StorageClientConfig, baseIndex: String)
     extends PEvents {
   implicit val formats = DefaultFormats.lossless ++ JodaTimeSerializers.all
 
@@ -78,6 +78,7 @@
       eventNames, targetEntityType, targetEntityId, None)
 
     val estype = getEsType(appId, channelId)
+    val index = baseIndex + "_" + estype
     val conf = new Configuration()
     conf.set("es.resource", s"$index/$estype")
     conf.set("es.query", query)
@@ -97,6 +98,7 @@
     events: RDD[Event],
     appId: Int, channelId: Option[Int])(sc: SparkContext): Unit = {
     val estype = getEsType(appId, channelId)
+    val index = baseIndex + "_" + estype
     val conf = Map("es.resource" -> s"$index/$estype", "es.nodes" -> getESNodes())
     events.map { event =>
       ESEventsUtil.eventToPut(event, appId)
@@ -107,6 +109,7 @@
     eventIds: RDD[String],
     appId: Int, channelId: Option[Int])(sc: SparkContext): Unit = {
     val estype = getEsType(appId, channelId)
+    val index = baseIndex + "_" + estype
       eventIds.foreachPartition { iter =>
         iter.foreach { eventId =>
           try {
diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESSequences.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESSequences.scala
index 018ef85..ade0f40 100644
--- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESSequences.scala
+++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESSequences.scala
@@ -21,7 +21,6 @@
 
 import scala.collection.JavaConverters._
 
-import org.apache.http.Header
 import org.apache.http.entity.ContentType
 import org.apache.http.nio.entity.NStringEntity
 import org.apache.http.util.EntityUtils
@@ -38,23 +37,21 @@
 class ESSequences(client: RestClient, config: StorageClientConfig, index: String) extends Logging {
   implicit val formats = DefaultFormats
   private val estype = "sequences"
+  private val internalIndex = index + "_" + estype
 
-  ESUtils.createIndex(client, index,
-    ESUtils.getNumberOfShards(config, index.toUpperCase),
-    ESUtils.getNumberOfReplicas(config, index.toUpperCase))
+  ESUtils.createIndex(client, internalIndex)
   val mappingJson =
     (estype ->
-      ("_all" -> ("enabled" -> false)) ~
       ("properties" ->
         ("n" -> ("enabled" -> false))))
-  ESUtils.createMapping(client, index, estype, compact(render(mappingJson)))
+  ESUtils.createMapping(client, internalIndex, estype, compact(render(mappingJson)))
 
   def genNext(name: String): Long = {
     try {
       val entity = new NStringEntity(write("n" -> name), ContentType.APPLICATION_JSON)
       val response = client.performRequest(
         "POST",
-        s"/$index/$estype/$name",
+        s"/$internalIndex/$estype/$name",
         Map("refresh" -> "false").asJava,
         entity)
       val jsonResponse = parse(EntityUtils.toString(response.getEntity))
@@ -65,11 +62,11 @@
         case "updated" =>
           (jsonResponse \ "_version").extract[Long]
         case _ =>
-          throw new IllegalStateException(s"[$result] Failed to update $index/$estype/$name")
+          throw new IllegalStateException(s"[$result] Failed to update $internalIndex/$estype/$name")
       }
     } catch {
       case e: IOException =>
-        throw new StorageClientException(s"Failed to update $index/$estype/$name", e)
+        throw new StorageClientException(s"Failed to update $internalIndex/$estype/$name", e)
     }
   }
 }
diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala
index cd9aa53..93d5d94 100644
--- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala
+++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala
@@ -21,7 +21,6 @@
 import scala.collection.JavaConverters._
 
 import org.apache.http.entity.ContentType
-import org.apache.http.entity.StringEntity
 import org.apache.http.nio.entity.NStringEntity
 import org.elasticsearch.client.RestClient
 import org.json4s._
@@ -165,23 +164,16 @@
 
   def createIndex(
     client: RestClient,
-    index: String,
-    numberOfShards: Option[Int],
-    numberOfReplicas: Option[Int]): Unit = {
+    index: String): Unit = {
     client.performRequest(
       "HEAD",
       s"/$index",
       Map.empty[String, String].asJava).getStatusLine.getStatusCode match {
         case 404 =>
-          val json = ("settings" ->
-            ("number_of_shards" -> numberOfShards) ~
-            ("number_of_replicas" -> numberOfReplicas))
-          val entity = new NStringEntity(compact(render(json)), ContentType.APPLICATION_JSON)
           client.performRequest(
             "PUT",
             s"/$index",
-            Map.empty[String, String].asJava,
-            entity)
+            Map.empty[String, String].asJava)
         case 200 =>
         case _ =>
           throw new IllegalStateException(s"/$index is invalid.")
@@ -269,14 +261,6 @@
     (hosts, ports, schemes).zipped.map((h, p, s) => new HttpHost(h, p, s))
   }
 
-  def getNumberOfShards(config: StorageClientConfig, index: String): Option[Int] = {
-    config.properties.get(s"${index}_NUM_OF_SHARDS").map(_.toInt)
-  }
-
-  def getNumberOfReplicas(config: StorageClientConfig, index: String): Option[Int] = {
-    config.properties.get(s"${index}_NUM_OF_REPLICAS").map(_.toInt)
-  }
-
   def getEventDataRefresh(config: StorageClientConfig): String = {
     config.properties.getOrElse("EVENTDATA_REFRESH", "true")
   }
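
With per-type indices, `ESUtils.createIndex` no longer reads PIO-specific shard/replica properties; it simply creates the index with Elasticsearch's defaults when it does not exist yet (shard and replica counts would then come from ES defaults or index templates). A standalone sketch of the resulting behaviour, assuming an already configured low-level `RestClient`:

```scala
import scala.collection.JavaConverters._
import org.elasticsearch.client.RestClient

// Mirrors the simplified createIndex above: HEAD the index, PUT it with
// default settings on 404, accept 200, fail on anything else.
def ensureIndex(client: RestClient, index: String): Unit = {
  val status = client.performRequest(
    "HEAD", s"/$index", Map.empty[String, String].asJava)
    .getStatusLine.getStatusCode
  status match {
    case 404 =>
      client.performRequest("PUT", s"/$index", Map.empty[String, String].asJava)
    case 200 => // index already exists, nothing to do
    case _   => throw new IllegalStateException(s"/$index is invalid.")
  }
}
```
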
diff --git a/storage/hbase/build.sbt b/storage/hbase/build.sbt
index 1e904fa..5e412b0 100644
--- a/storage/hbase/build.sbt
+++ b/storage/hbase/build.sbt
@@ -22,11 +22,11 @@
 libraryDependencies ++= Seq(
   "org.apache.predictionio" %% "apache-predictionio-core" % version.value % "provided",
   "org.apache.spark"        %% "spark-core"     % sparkVersion.value % "provided",
-  "org.apache.hbase"         % "hbase-common"   % "0.98.5-hadoop2",
-  "org.apache.hbase"         % "hbase-client"   % "0.98.5-hadoop2"
+  "org.apache.hbase"         % "hbase-common"   % hbaseVersion.value,
+  "org.apache.hbase"         % "hbase-client"   % hbaseVersion.value
     exclude("org.apache.zookeeper", "zookeeper"),
   // added for Parallel storage interface
-  "org.apache.hbase"         % "hbase-server"   % "0.98.5-hadoop2"
+  "org.apache.hbase"         % "hbase-server"   % hbaseVersion.value
     exclude("org.apache.hbase", "hbase-client")
     exclude("org.apache.zookeeper", "zookeeper")
     exclude("javax.servlet", "servlet-api")
diff --git a/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCPEvents.scala b/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCPEvents.scala
index d31e592..4fa8b9f 100644
--- a/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCPEvents.scala
+++ b/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCPEvents.scala
@@ -20,12 +20,10 @@
 import java.sql.{DriverManager, ResultSet}
 
 import com.github.nscala_time.time.Imports._
-import org.apache.predictionio.data.storage.{
-  DataMap, Event, PEvents, StorageClientConfig}
-import org.apache.predictionio.data.SparkVersionDependent
+import org.apache.predictionio.data.storage.{DataMap, Event, PEvents, StorageClientConfig}
 import org.apache.spark.SparkContext
 import org.apache.spark.rdd.{JdbcRDD, RDD}
-import org.apache.spark.sql.SaveMode
+import org.apache.spark.sql.{SaveMode, SparkSession}
 import org.json4s.JObject
 import org.json4s.native.Serialization
 import scalikejdbc._
@@ -121,7 +119,7 @@
   }
 
   def write(events: RDD[Event], appId: Int, channelId: Option[Int])(sc: SparkContext): Unit = {
-    val sqlSession = SparkVersionDependent.sqlSession(sc)
+    val sqlSession = SparkSession.builder().getOrCreate()
     import sqlSession.implicits._
 
     val tableName = JDBCUtils.eventTableName(namespace, appId, channelId)
diff --git a/storage/s3/src/main/scala/org/apache/predictionio/data/storage/s3/StorageClient.scala b/storage/s3/src/main/scala/org/apache/predictionio/data/storage/s3/StorageClient.scala
index d96b37d..6408f37 100644
--- a/storage/s3/src/main/scala/org/apache/predictionio/data/storage/s3/StorageClient.scala
+++ b/storage/s3/src/main/scala/org/apache/predictionio/data/storage/s3/StorageClient.scala
@@ -17,21 +17,21 @@
 
 package org.apache.predictionio.data.storage.s3
 
+import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
 import org.apache.predictionio.data.storage.BaseStorageClient
 import org.apache.predictionio.data.storage.StorageClientConfig
-
-import com.amazonaws.auth.profile.ProfileCredentialsProvider
 import com.amazonaws.client.builder.AwsClientBuilder
 import com.amazonaws.services.s3.AmazonS3
 import com.amazonaws.services.s3.AmazonS3ClientBuilder
-
 import grizzled.slf4j.Logging
 
 class StorageClient(val config: StorageClientConfig) extends BaseStorageClient
     with Logging {
   override val prefix = "S3"
   val client: AmazonS3 = {
-    val builder = AmazonS3ClientBuilder.standard().withCredentials(new ProfileCredentialsProvider())
+    val builder = AmazonS3ClientBuilder
+                    .standard()
+                    .withCredentials(DefaultAWSCredentialsProviderChain.getInstance())
     (config.properties.get("ENDPOINT"), config.properties.get("REGION")) match {
       case (Some(endpoint), Some(region)) =>
         builder.withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(endpoint, region))
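
The S3 storage client now resolves credentials through the default provider chain instead of only the shared credentials profile. A minimal sketch of the builder usage with a placeholder region (the patch itself reads ENDPOINT/REGION from the storage configuration):

```scala
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
import com.amazonaws.services.s3.{AmazonS3, AmazonS3ClientBuilder}

// Credentials come from env vars, system properties, the shared profile, or
// an instance profile -- whichever the default chain finds first.
val s3: AmazonS3 = AmazonS3ClientBuilder
  .standard()
  .withCredentials(DefaultAWSCredentialsProviderChain.getInstance())
  .withRegion("us-east-1") // placeholder; not part of the patch
  .build()
```
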
diff --git a/tests/build_docker.sh b/tests/build_docker.sh
index 2c6f8f8..0adf3b0 100755
--- a/tests/build_docker.sh
+++ b/tests/build_docker.sh
@@ -35,11 +35,14 @@
   mv $SPARK_ARCHIVE $DIR/docker-files/
 fi
 
+set -e
+
 ./make-distribution.sh \
     -Dscala.version=$PIO_SCALA_VERSION \
     -Dspark.version=$PIO_SPARK_VERSION \
     -Dhadoop.version=$PIO_HADOOP_VERSION \
-    -Delasticsearch.version=$PIO_ELASTICSEARCH_VERSION
+    -Delasticsearch.version=$PIO_ELASTICSEARCH_VERSION \
+    -Dhbase.version=$PIO_HBASE_VERSION
 sbt/sbt clean storage/clean
 
 assembly_folder=assembly/src/universal/lib
diff --git a/tests/docker-compose.yml b/tests/docker-compose.yml
index e0eda34..0c196f1 100644
--- a/tests/docker-compose.yml
+++ b/tests/docker-compose.yml
@@ -21,7 +21,7 @@
       - xpack.security.enabled=false
       - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
   hbase:
-    image: harisekhon/hbase:1.0
+    image: harisekhon/hbase:${HBASE_TAG}
   postgres:
     image: postgres:9
     environment:
diff --git a/tests/unit.sh b/tests/unit.sh
index 1421dce..a9c84fc 100755
--- a/tests/unit.sh
+++ b/tests/unit.sh
@@ -32,13 +32,15 @@
     -Dscala.version=$PIO_SCALA_VERSION \
     -Dspark.version=$PIO_SPARK_VERSION \
     -Dhadoop.version=$PIO_HADOOP_VERSION \
-    -Delasticsearch.version=$PIO_ELASTICSEARCH_VERSION
+    -Delasticsearch.version=$PIO_ELASTICSEARCH_VERSION \
+    -Dhbase.version=$PIO_HBASE_VERSION
 
 # Run all unit tests
 sbt/sbt dataJdbc/compile test storage/test \
     -Dscala.version=$PIO_SCALA_VERSION \
     -Dspark.version=$PIO_SPARK_VERSION \
     -Dhadoop.version=$PIO_HADOOP_VERSION \
-    -Delasticsearch.version=$PIO_ELASTICSEARCH_VERSION
+    -Delasticsearch.version=$PIO_ELASTICSEARCH_VERSION \
+    -Dhbase.version=$PIO_HBASE_VERSION
 
 popd
diff --git a/tools/build.sbt b/tools/build.sbt
index 9375f2a..acdb1fe 100644
--- a/tools/build.sbt
+++ b/tools/build.sbt
@@ -21,11 +21,11 @@
 name := "apache-predictionio-tools"
 
 libraryDependencies ++= Seq(
-  "com.github.zafarkhaja"  %  "java-semver"    % "0.9.0",
-  "org.apache.spark"       %% "spark-sql"      % sparkVersion.value % "provided",
-  "com.typesafe.akka"      %% "akka-slf4j"     % akkaVersion.value,
-  "io.spray"               %% "spray-testkit"  % "1.3.3" % "test",
-  "org.specs2"             %% "specs2"         % "2.3.13" % "test")
+  "com.github.zafarkhaja"  %  "java-semver"       % "0.9.0",
+  "org.apache.spark"       %% "spark-sql"         % sparkVersion.value % "provided",
+  "com.typesafe.akka"      %% "akka-slf4j"        % akkaVersion.value,
+  "com.typesafe.akka"      %% "akka-http-testkit" % "10.1.5" % "test",
+  "org.specs2"             %% "specs2-core"       % "4.2.0" % "test")
 
 assemblyMergeStrategy in assembly := {
   case PathList("META-INF", "LICENSE.txt") => MergeStrategy.concat
diff --git a/tools/src/main/scala/org/apache/predictionio/tools/RunWorkflow.scala b/tools/src/main/scala/org/apache/predictionio/tools/RunWorkflow.scala
index 236d3ba..50b9337 100644
--- a/tools/src/main/scala/org/apache/predictionio/tools/RunWorkflow.scala
+++ b/tools/src/main/scala/org/apache/predictionio/tools/RunWorkflow.scala
@@ -51,19 +51,15 @@
     verbose: Boolean = false): Expected[(Process, () => Unit)] = {
 
     val jarFiles = jarFilesForScala(engineDirPath).map(_.toURI)
-    val variantJson = wa.variantJson.getOrElse(new File(engineDirPath, "engine.json"))
-    val ei = Console.getEngineInfo(
-      variantJson,
-      engineDirPath)
-    val args = Seq(
-      "--engine-id",
-      ei.engineId,
-      "--engine-version",
-      ei.engineVersion,
-      "--engine-variant",
-      variantJson.toURI.toString,
-      "--verbosity",
-      wa.verbosity.toString) ++
+    val args =
+      (if (wa.mainPyFile.isEmpty) {
+        val variantJson = wa.variantJson.getOrElse(new File(engineDirPath, "engine.json"))
+        val ei = Console.getEngineInfo(variantJson, engineDirPath)
+        Seq(
+          "--engine-id", ei.engineId,
+          "--engine-version", ei.engineVersion,
+          "--engine-variant", variantJson.toURI.toString)
+      } else Nil) ++
       wa.engineFactory.map(
         x => Seq("--engine-factory", x)).getOrElse(Nil) ++
       wa.engineParamsKey.map(
@@ -72,19 +68,15 @@
       (if (verbose) Seq("--verbose") else Nil) ++
       (if (wa.skipSanityCheck) Seq("--skip-sanity-check") else Nil) ++
       (if (wa.stopAfterRead) Seq("--stop-after-read") else Nil) ++
-      (if (wa.stopAfterPrepare) {
-        Seq("--stop-after-prepare")
-      } else {
-        Nil
-      }) ++
+      (if (wa.stopAfterPrepare) Seq("--stop-after-prepare") else Nil) ++
       wa.evaluation.map(x => Seq("--evaluation-class", x)).
         getOrElse(Nil) ++
       // If engineParamsGenerator is specified, it overrides the evaluation.
       wa.engineParamsGenerator.orElse(wa.evaluation)
         .map(x => Seq("--engine-params-generator-class", x))
         .getOrElse(Nil) ++
-      (if (wa.batch != "") Seq("--batch", wa.batch) else Nil) ++
-      Seq("--json-extractor", wa.jsonExtractor.toString)
+      Seq("--json-extractor", wa.jsonExtractor.toString,
+          "--verbosity", wa.verbosity.toString)
 
     val resourceName = wa.mainPyFile match {
       case Some(x) => x
diff --git a/tools/src/main/scala/org/apache/predictionio/tools/admin/AdminAPI.scala b/tools/src/main/scala/org/apache/predictionio/tools/admin/AdminAPI.scala
index 7e8fd30..d8bb79f 100644
--- a/tools/src/main/scala/org/apache/predictionio/tools/admin/AdminAPI.scala
+++ b/tools/src/main/scala/org/apache/predictionio/tools/admin/AdminAPI.scala
@@ -14,118 +14,27 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
-
 package org.apache.predictionio.tools.admin
 
-import akka.actor.{Actor, ActorSystem, Props}
-import akka.event.Logging
-import akka.io.IO
-import akka.util.Timeout
-import org.apache.predictionio.data.api.StartServer
-import org.apache.predictionio.data.storage.Storage
-import org.json4s.{Formats, DefaultFormats}
-
 import java.util.concurrent.TimeUnit
 
-import spray.can.Http
-import spray.http.{MediaTypes, StatusCodes}
-import spray.httpx.Json4sSupport
-import spray.routing._
+import akka.http.scaladsl.server._
+import org.apache.predictionio.data.storage._
 
-import scala.concurrent.ExecutionContext
 import scala.concurrent.duration.Duration
+import scala.concurrent.{Await, ExecutionContext}
+import akka.actor.ActorSystem
+import akka.stream.ActorMaterializer
+import akka.http.scaladsl.Http
+import akka.http.scaladsl.model._
+import akka.http.scaladsl.server.Directives._
+import akka.util.Timeout
+import org.apache.predictionio.akkahttpjson4s.Json4sSupport._
+import org.json4s.{DefaultFormats, Formats}
 
-class AdminServiceActor(val commandClient: CommandClient)
-  extends HttpServiceActor {
-
-  object Json4sProtocol extends Json4sSupport {
-    implicit def json4sFormats: Formats = DefaultFormats
-  }
-
-  import Json4sProtocol._
-
-  val log = Logging(context.system, this)
-
-  // we use the enclosing ActorContext's or ActorSystem's dispatcher for our
-  // Futures
-  implicit def executionContext: ExecutionContext = actorRefFactory.dispatcher
-  implicit val timeout: Timeout = Timeout(5, TimeUnit.SECONDS)
-
-  // for better message response
-  val rejectionHandler = RejectionHandler {
-    case MalformedRequestContentRejection(msg, _) :: _ =>
-      complete(StatusCodes.BadRequest, Map("message" -> msg))
-    case MissingQueryParamRejection(msg) :: _ =>
-      complete(StatusCodes.NotFound,
-        Map("message" -> s"missing required query parameter ${msg}."))
-    case AuthenticationFailedRejection(cause, challengeHeaders) :: _ =>
-      complete(StatusCodes.Unauthorized, challengeHeaders,
-        Map("message" -> s"Invalid accessKey."))
-  }
-
-  val jsonPath = """(.+)\.json$""".r
-
-  val route: Route =
-    pathSingleSlash {
-      get {
-        respondWithMediaType(MediaTypes.`application/json`) {
-          complete(Map("status" -> "alive"))
-        }
-      }
-    } ~
-      path("cmd" / "app" / Segment / "data") {
-        appName => {
-          delete {
-            respondWithMediaType(MediaTypes.`application/json`) {
-              complete(commandClient.futureAppDataDelete(appName))
-            }
-          }
-        }
-      } ~
-      path("cmd" / "app" / Segment) {
-        appName => {
-          delete {
-            respondWithMediaType(MediaTypes.`application/json`) {
-              complete(commandClient.futureAppDelete(appName))
-            }
-          }
-        }
-      } ~
-      path("cmd" / "app") {
-        get {
-          respondWithMediaType(MediaTypes.`application/json`) {
-            complete(commandClient.futureAppList())
-          }
-        } ~
-          post {
-            entity(as[AppRequest]) {
-              appArgs => respondWithMediaType(MediaTypes.`application/json`) {
-                complete(commandClient.futureAppNew(appArgs))
-              }
-            }
-          }
-      }
-  def receive: Actor.Receive = runRoute(route)
-}
-
-class AdminServerActor(val commandClient: CommandClient) extends Actor {
-  val log = Logging(context.system, this)
-  val child = context.actorOf(
-    Props(classOf[AdminServiceActor], commandClient),
-    "AdminServiceActor")
-
-  implicit val system = context.system
-
-  def receive: PartialFunction[Any, Unit] = {
-    case StartServer(host, portNum) => {
-      IO(Http) ! Http.Bind(child, interface = host, port = portNum)
-
-    }
-    case m: Http.Bound => log.info("Bound received. AdminServer is ready.")
-    case m: Http.CommandFailed => log.error("Command failed.")
-    case _ => log.error("Unknown message.")
-  }
+object Json4sProtocol {
+  implicit val serialization = org.json4s.jackson.Serialization
+  implicit def json4sFormats: Formats = DefaultFormats
 }
 
 case class AdminServerConfig(
@@ -134,8 +43,23 @@
 )
 
 object AdminServer {
-  def createAdminServer(config: AdminServerConfig): ActorSystem = {
-    implicit val system = ActorSystem("AdminServerSystem")
+  import Json4sProtocol._
+
+  private implicit val timeout: Timeout = Timeout(5, TimeUnit.SECONDS)
+
+  // for better message response
+  private val rejectionHandler = RejectionHandler.newBuilder().handle {
+    case MalformedRequestContentRejection(msg, _) =>
+      complete(StatusCodes.BadRequest, Map("message" -> msg))
+    case MissingQueryParamRejection(msg) =>
+      complete(StatusCodes.NotFound,
+        Map("message" -> s"missing required query parameter ${msg}."))
+    case AuthenticationFailedRejection(cause, challengeHeaders) =>
+      complete(StatusCodes.Unauthorized, challengeHeaders,
+        Map("message" -> s"Invalid accessKey."))
+  }.result()
+
+  def createRoute()(implicit executionContext: ExecutionContext): Route = {
 
     val commandClient = new CommandClient(
       appClient = Storage.getMetaDataApps,
@@ -143,19 +67,63 @@
       eventClient = Storage.getLEvents()
     )
 
-    val serverActor = system.actorOf(
-      Props(classOf[AdminServerActor], commandClient),
-      "AdminServerActor")
-    serverActor ! StartServer(config.ip, config.port)
+    val route =
+      pathSingleSlash {
+        get {
+          complete(Map("status" -> "alive"))
+        }
+      } ~
+      path("cmd" / "app" / Segment / "data") {
+        appName => {
+          delete {
+            complete(commandClient.futureAppDataDelete(appName))
+          }
+        }
+      } ~
+      path("cmd" / "app" / Segment) {
+        appName => {
+          delete {
+            complete(commandClient.futureAppDelete(appName))
+          }
+        }
+      } ~
+      path("cmd" / "app") {
+        get {
+          complete(commandClient.futureAppList())
+        } ~
+        post {
+          entity(as[AppRequest]) {
+            appArgs =>
+              onSuccess(commandClient.futureAppNew(appArgs)){
+                case res: GeneralResponse => complete(res)
+                case res: AppNewResponse  => complete(res)
+              }
+          }
+        }
+      }
+
+    route
+  }
+
+
+  def createAdminServer(config: AdminServerConfig): ActorSystem = {
+    implicit val system = ActorSystem("AdminServerSystem")
+    implicit val materializer = ActorMaterializer()
+    implicit val executionContext = system.dispatcher
+
+    val route = createRoute()
+    Http().bindAndHandle(route, config.ip, config.port)
     system
   }
 }
 
 object AdminRun {
   def main (args: Array[String]) : Unit = {
-    AdminServer.createAdminServer(AdminServerConfig(
+    val f = AdminServer.createAdminServer(AdminServerConfig(
       ip = "localhost",
       port = 7071))
-    .awaitTermination
+    .whenTerminated
+
+    Await.ready(f, Duration.Inf)
   }
 }
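
The admin server moves from spray-can actors to akka-http's routing DSL, binds with `bindAndHandle`, and shuts down by awaiting the actor system's `whenTerminated` future. A self-contained sketch of that startup pattern (system name, route, and port are placeholders; the Storage/CommandClient wiring is omitted):

```scala
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Directives._
import akka.stream.ActorMaterializer

import scala.concurrent.Await
import scala.concurrent.duration.Duration

object MinimalAdminLikeServer {
  def main(args: Array[String]): Unit = {
    implicit val system = ActorSystem("example-system")
    implicit val materializer = ActorMaterializer()

    // trivial stand-in for AdminServer.createRoute()
    val route = pathSingleSlash {
      get { complete("""{"status": "alive"}""") }
    }

    Http().bindAndHandle(route, "localhost", 7071)

    // block until the actor system terminates, as AdminRun now does
    Await.ready(system.whenTerminated, Duration.Inf)
  }
}
```
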
diff --git a/tools/src/main/scala/org/apache/predictionio/tools/admin/README.md b/tools/src/main/scala/org/apache/predictionio/tools/admin/README.md
index 666b572..7944665 100644
--- a/tools/src/main/scala/org/apache/predictionio/tools/admin/README.md
+++ b/tools/src/main/scala/org/apache/predictionio/tools/admin/README.md
@@ -26,7 +26,7 @@
 $ set -a
 $ source conf/pio-env.sh
 $ set +a
-$ sbt/sbt "tools/run-main org.apache.predictionio.tools.admin.AdminRun"
+$ sbt/sbt "tools/runMain org.apache.predictionio.tools.admin.AdminRun"
 ```
 
 ### Unit test (Very minimal)
@@ -35,7 +35,7 @@
 $ set -a
 $ source conf/pio-env.sh
 $ set +a
-$ sbt/sbt "tools/test-only org.apache.predictionio.tools.admin.AdminAPISpec"
+$ sbt/sbt "tools/testOnly org.apache.predictionio.tools.admin.AdminAPISpec"
 ```
 
 ### Start with pio command adminserver
diff --git a/tools/src/main/scala/org/apache/predictionio/tools/commands/Engine.scala b/tools/src/main/scala/org/apache/predictionio/tools/commands/Engine.scala
index 8380695..20d0ed9 100644
--- a/tools/src/main/scala/org/apache/predictionio/tools/commands/Engine.scala
+++ b/tools/src/main/scala/org/apache/predictionio/tools/commands/Engine.scala
@@ -218,22 +218,23 @@
     if (verifyResult.isLeft) {
       return Left(verifyResult.left.get)
     }
-    val ei = Console.getEngineInfo(
-      serverArgs.variantJson.getOrElse(new File(engineDirPath, "engine.json")),
-      engineDirPath)
     val engineInstances = storage.Storage.getMetaDataEngineInstances
-    val engineInstance = engineInstanceId map { eid =>
-      engineInstances.get(eid)
-    } getOrElse {
-      engineInstances.getLatestCompleted(
-        ei.engineId, ei.engineVersion, ei.variantId)
-    }
-    engineInstance map { r =>
-      RunServer.runServer(
-        r.id, serverArgs, sparkArgs, pioHome, engineDirPath, verbose)
-    } getOrElse {
-      engineInstanceId map { eid =>
+    engineInstanceId map { eid =>
+      engineInstances.get(eid).map { r =>
+        RunServer.runServer(
+          r.id, serverArgs, sparkArgs, pioHome, engineDirPath, verbose)
+      } getOrElse {
         logAndFail(s"Invalid engine instance ID ${eid}. Aborting.")
+      }
+    } getOrElse {
+      val ei = Console.getEngineInfo(
+        serverArgs.variantJson.getOrElse(new File(engineDirPath, "engine.json")),
+        engineDirPath)
+
+      engineInstances.getLatestCompleted(
+        ei.engineId, ei.engineVersion, ei.variantId).map { r =>
+        RunServer.runServer(
+          r.id, serverArgs, sparkArgs, pioHome, engineDirPath, verbose)
       } getOrElse {
         logAndFail(s"No valid engine instance found for engine ${ei.engineId} " +
           s"${ei.engineVersion}.\nTry running 'train' before 'deploy'. Aborting.")
diff --git a/tools/src/main/scala/org/apache/predictionio/tools/commands/Management.scala b/tools/src/main/scala/org/apache/predictionio/tools/commands/Management.scala
index cd71fdd..54f5d3f 100644
--- a/tools/src/main/scala/org/apache/predictionio/tools/commands/Management.scala
+++ b/tools/src/main/scala/org/apache/predictionio/tools/commands/Management.scala
@@ -25,7 +25,7 @@
 import org.apache.predictionio.tools.EitherLogging
 import org.apache.predictionio.tools.Common
 import org.apache.predictionio.tools.ReturnTypes._
-import org.apache.predictionio.tools.dashboard.Dashboard
+import org.apache.predictionio.tools.dashboard.DashboardServer
 import org.apache.predictionio.tools.dashboard.DashboardConfig
 import org.apache.predictionio.tools.admin.AdminServer
 import org.apache.predictionio.tools.admin.AdminServerConfig
@@ -62,7 +62,7 @@
     */
   def dashboard(da: DashboardArgs): ActorSystem = {
     info(s"Creating dashboard at ${da.ip}:${da.port}")
-    Dashboard.createDashboard(DashboardConfig(
+    DashboardServer.createDashboard(DashboardConfig(
       ip = da.ip,
       port = da.port))
   }
@@ -109,7 +109,7 @@
     val sparkHomePath = Common.getSparkHome(sparkHome)
     if (new File(s"$sparkHomePath/bin/spark-submit").exists) {
       info(s"Apache Spark is installed at $sparkHomePath")
-      val sparkMinVersion = "1.6.3"
+      val sparkMinVersion = "2.0.2"
       pioStatus = pioStatus.copy(
         sparkHome = sparkHomePath,
         sparkMinVersion = sparkMinVersion)
diff --git a/tools/src/main/scala/org/apache/predictionio/tools/console/Pio.scala b/tools/src/main/scala/org/apache/predictionio/tools/console/Pio.scala
index ef4581b..1b4c8a8 100644
--- a/tools/src/main/scala/org/apache/predictionio/tools/console/Pio.scala
+++ b/tools/src/main/scala/org/apache/predictionio/tools/console/Pio.scala
@@ -17,16 +17,13 @@
 
 package org.apache.predictionio.tools.console
 
-import org.apache.predictionio.tools.{
-  EventServerArgs, SparkArgs, WorkflowArgs, ServerArgs,
-  DeployArgs, BatchPredictArgs}
-import org.apache.predictionio.tools.commands.{
-  DashboardArgs, AdminServerArgs, ImportArgs, ExportArgs,
-  BuildArgs, EngineArgs, Management, Engine, Import, Export,
-  App => AppCmd, AccessKey => AccessKeysCmd}
+import org.apache.predictionio.tools.{BatchPredictArgs, DeployArgs, EventServerArgs, ServerArgs, SparkArgs, WorkflowArgs}
+import org.apache.predictionio.tools.commands.{AdminServerArgs, BuildArgs, DashboardArgs, Engine, EngineArgs, Export, ExportArgs, Import, ImportArgs, Management, AccessKey => AccessKeysCmd, App => AppCmd}
 import org.apache.predictionio.tools.ReturnTypes._
-
 import grizzled.slf4j.Logging
+
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
 import scala.language.implicitConversions
 import scala.sys.process._
 
@@ -116,17 +113,17 @@
         ea, engineInstanceId, batchPredictArgs, sparkArgs, pioHome, verbose))
 
   def dashboard(da: DashboardArgs): Int = {
-    Management.dashboard(da).awaitTermination
+    Await.ready(Management.dashboard(da).whenTerminated, Duration.Inf)
     0
   }
 
   def eventserver(ea: EventServerArgs): Int = {
-    Management.eventserver(ea).awaitTermination
+    Await.ready(Management.eventserver(ea).whenTerminated, Duration.Inf)
     0
   }
 
   def adminserver(aa: AdminServerArgs): Int = {
-    Management.adminserver(aa).awaitTermination
+    Await.ready(Management.adminserver(aa).whenTerminated, Duration.Inf)
     0
   }
 
diff --git a/tools/src/main/scala/org/apache/predictionio/tools/dashboard/CorsSupport.scala b/tools/src/main/scala/org/apache/predictionio/tools/dashboard/CorsSupport.scala
index 1026996..0a1031d 100644
--- a/tools/src/main/scala/org/apache/predictionio/tools/dashboard/CorsSupport.scala
+++ b/tools/src/main/scala/org/apache/predictionio/tools/dashboard/CorsSupport.scala
@@ -18,60 +18,33 @@
 
 package org.apache.predictionio.tools.dashboard
 
-// Reference from: https://gist.github.com/waymost/4b5598523c2c7361abea
+// Reference from: https://gist.github.com/jeroenr/5261fa041d592f37cd80
 
-import spray.http.{HttpMethods, HttpMethod, HttpResponse, AllOrigins}
-import spray.http.HttpHeaders._
-import spray.http.HttpEntity
-import spray.routing._
-import spray.http.StatusCodes
-import spray.http.ContentTypes
+import akka.http.scaladsl.model.HttpMethods._
+import akka.http.scaladsl.model.{StatusCodes, HttpResponse}
+import akka.http.scaladsl.model.headers._
+import akka.http.scaladsl.server.Directives._
+import akka.http.scaladsl.server.{Directive0, Route}
+import com.typesafe.config.ConfigFactory
 
-// see also https://developer.mozilla.org/en-US/docs/Web/HTTP/Access_control_CORS
-trait CORSSupport {
-  this: HttpService =>
+trait CorsSupport {
 
-  private val allowOriginHeader = `Access-Control-Allow-Origin`(AllOrigins)
-  private val optionsCorsHeaders = List(
-    `Access-Control-Allow-Headers`("""Origin,
-                                      |X-Requested-With,
-                                      |Content-Type,
-                                      |Accept,
-                                      |Accept-Encoding,
-                                      |Accept-Language,
-                                      |Host,
-                                      |Referer,
-                                      |User-Agent""".stripMargin.replace("\n", " ")),
-    `Access-Control-Max-Age`(1728000)
-  )
-
-  def cors[T]: Directive0 = mapRequestContext { ctx =>
-    ctx.withRouteResponseHandling {
-      // OPTION request for a resource that responds to other methods
-      case Rejected(x) if (ctx.request.method.equals(HttpMethods.OPTIONS) &&
-          x.exists(_.isInstanceOf[MethodRejection])) => {
-        val allowedMethods: List[HttpMethod] = x.collect {
-          case rejection: MethodRejection => rejection.supported
-        }
-        ctx.complete {
-          HttpResponse().withHeaders(
-            `Access-Control-Allow-Methods`(HttpMethods.OPTIONS, allowedMethods :_*) ::
-            allowOriginHeader ::
-            optionsCorsHeaders
-          )
-        }
-      }
-    }.withHttpResponseHeadersMapped { headers =>
-      allowOriginHeader :: headers
-    }
+  // this directive adds access control headers to normal responses
+  private def addAccessControlHeaders: Directive0 = {
+    respondWithHeaders(
+      `Access-Control-Allow-Origin`.forRange(HttpOriginRange.`*`),
+      `Access-Control-Allow-Credentials`(true),
+      `Access-Control-Allow-Headers`("Authorization", "Content-Type", "X-Requested-With")
+    )
   }
 
-  override def timeoutRoute: StandardRoute = complete {
-    HttpResponse(
-      StatusCodes.InternalServerError,
-      HttpEntity(ContentTypes.`text/plain(UTF-8)`,
-          "The server was not able to produce a timely response to your request."),
-      List(allowOriginHeader)
-    )
+  // this handles preflight OPTIONS requests.
+  private def preflightRequestHandler: Route = options {
+    complete(HttpResponse(StatusCodes.OK)
+      .withHeaders(`Access-Control-Allow-Methods`(OPTIONS, POST, PUT, GET, DELETE)))
+  }
+
+  def corsHandler(r: Route): Route = addAccessControlHeaders {
+    preflightRequestHandler ~ r
   }
 }
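
The rewritten `CorsSupport` exposes a single `corsHandler` directive that adds the `Access-Control-*` headers and answers OPTIONS preflights. A usage sketch (object name and path are placeholders):

```scala
import akka.http.scaladsl.model.StatusCodes
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import org.apache.predictionio.tools.dashboard.CorsSupport

object CorsUsageExample extends CorsSupport {
  // Wrapping a route in corsHandler adds the CORS headers and handles the
  // OPTIONS preflight, as the dashboard does for local_evaluator_results.json.
  val route: Route = corsHandler {
    path("ping") {
      get { complete(StatusCodes.OK) }
    }
  }
}
```
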
diff --git a/tools/src/main/scala/org/apache/predictionio/tools/dashboard/Dashboard.scala b/tools/src/main/scala/org/apache/predictionio/tools/dashboard/Dashboard.scala
index 7d651b1..ddbf715 100644
--- a/tools/src/main/scala/org/apache/predictionio/tools/dashboard/Dashboard.scala
+++ b/tools/src/main/scala/org/apache/predictionio/tools/dashboard/Dashboard.scala
@@ -18,22 +18,23 @@
 
 package org.apache.predictionio.tools.dashboard
 
-import com.typesafe.config.ConfigFactory
 import org.apache.predictionio.authentication.KeyAuthentication
-import org.apache.predictionio.configuration.SSLConfiguration
 import org.apache.predictionio.data.storage.Storage
-import spray.can.server.ServerSettings
-import scala.concurrent.ExecutionContext
-import akka.actor.{ActorContext, Actor, ActorSystem, Props}
-import akka.io.IO
-import akka.pattern.ask
-import akka.util.Timeout
+
+import scala.concurrent.{Await, ExecutionContext, Future}
+import akka.actor.ActorSystem
+import akka.http.scaladsl.server.directives.FutureDirectives.onSuccess
 import com.github.nscala_time.time.Imports.DateTime
 import grizzled.slf4j.Logging
-import spray.can.Http
-import spray.http._
-import spray.http.MediaTypes._
-import spray.routing._
+import akka.http.scaladsl.{ConnectionContext, Http, HttpsConnectionContext}
+import akka.http.scaladsl.model._
+import akka.http.scaladsl.server.Directives._
+import akka.http.scaladsl.server.directives._
+import akka.http.scaladsl.server._
+import akka.stream.ActorMaterializer
+import akka.http.scaladsl.model.ContentTypes._
+import com.typesafe.config.ConfigFactory
+import org.apache.predictionio.configuration.SSLConfiguration
 
 import scala.concurrent.duration._
 
@@ -41,7 +42,8 @@
   ip: String = "localhost",
   port: Int = 9000)
 
-object Dashboard extends Logging with SSLConfiguration {
+object Dashboard extends Logging {
+
   def main(args: Array[String]): Unit = {
     val parser = new scopt.OptionParser[DashboardConfig]("Dashboard") {
       opt[String]("ip") action { (x, c) =>
@@ -53,108 +55,108 @@
     }
 
     parser.parse(args, DashboardConfig()) map { dc =>
-      createDashboard(dc).awaitTermination
+      val f = DashboardServer.createDashboard(dc).whenTerminated
+      Await.result(f, Duration.Inf)
     }
   }
 
+}
+
+object DashboardServer extends KeyAuthentication with CorsSupport with SSLConfiguration {
+
   def createDashboard(dc: DashboardConfig): ActorSystem = {
     val systemName = "pio-dashboard"
     implicit val system = ActorSystem(systemName)
-    val service =
-      system.actorOf(Props(classOf[DashboardActor], dc), "dashboard")
-    implicit val timeout = Timeout(5.seconds)
-    val settings = ServerSettings(system)
+    implicit val materializer = ActorMaterializer()
+    implicit val executionContext = system.dispatcher
     val serverConfig = ConfigFactory.load("server.conf")
     val sslEnforced = serverConfig.getBoolean("org.apache.predictionio.server.ssl-enforced")
-    IO(Http) ? Http.Bind(
-      service,
-      interface = dc.ip,
-      port = dc.port,
-      settings = Some(settings.copy(sslEncryption = sslEnforced)))
+    val route = createRoute(DateTime.now, dc)
+    if(sslEnforced){
+      val https: HttpsConnectionContext = ConnectionContext.https(sslContext)
+      Http().setDefaultServerHttpContext(https)
+      Http().bindAndHandle(route, dc.ip, dc.port, connectionContext = https)
+    } else {
+      Http().bindAndHandle(route, dc.ip, dc.port)
+    }
     system
   }
-}
 
-class DashboardActor(
-    val dc: DashboardConfig)
-  extends Actor with DashboardService {
-  def actorRefFactory: ActorContext = context
-  def receive: Actor.Receive = runRoute(dashboardRoute)
-}
+  def createRoute(serverStartTime: DateTime, dc: DashboardConfig)
+                 (implicit executionContext: ExecutionContext): Route = {
+    val evaluationInstances = Storage.getMetaDataEvaluationInstances
+    val pioEnvVars = sys.env.filter(kv => kv._1.startsWith("PIO_"))
 
-trait DashboardService extends HttpService with KeyAuthentication with CORSSupport {
-
-  implicit def executionContext: ExecutionContext = actorRefFactory.dispatcher
-  val dc: DashboardConfig
-  val evaluationInstances = Storage.getMetaDataEvaluationInstances
-  val pioEnvVars = sys.env.filter(kv => kv._1.startsWith("PIO_"))
-  val serverStartTime = DateTime.now
-  val dashboardRoute =
-    path("") {
-      authenticate(withAccessKeyFromFile) { request =>
-        get {
-          respondWithMediaType(`text/html`) {
-            complete {
-              val completedInstances = evaluationInstances.getCompleted
-              html.index(
-                dc,
-                serverStartTime,
-                pioEnvVars,
-                completedInstances).toString
-            }
-          }
+    def authenticate[T](authenticator: RequestContext => Future[Either[Rejection, T]]):
+        AuthenticationDirective[T] = {
+      extractRequestContext.flatMap { requestContext =>
+        onSuccess(authenticator(requestContext)).flatMap {
+          case Right(x) => provide(x)
+          case Left(x)  => reject(x): Directive1[T]
         }
       }
-    } ~
-    pathPrefix("engine_instances" / Segment) { instanceId =>
-      path("evaluator_results.txt") {
-        get {
-          respondWithMediaType(`text/plain`) {
+    }
+
+    val route: Route =
+      path("") {
+        authenticate(withAccessKeyFromFile) { request =>
+          get {
+            val completedInstances = evaluationInstances.getCompleted
+            complete(HttpResponse(entity = HttpEntity(
+              `text/html(UTF-8)`,
+              html.index(dc, serverStartTime, pioEnvVars, completedInstances).toString
+            )))
+          }
+        }
+      } ~
+      pathPrefix("engine_instances" / Segment) { instanceId =>
+        path("evaluator_results.txt") {
+          get {
             evaluationInstances.get(instanceId).map { i =>
               complete(i.evaluatorResults)
             } getOrElse {
               complete(StatusCodes.NotFound)
             }
           }
-        }
-      } ~
-      path("evaluator_results.html") {
-        get {
-          respondWithMediaType(`text/html`) {
-            evaluationInstances.get(instanceId).map { i =>
-              complete(i.evaluatorResultsHTML)
-            } getOrElse {
-              complete(StatusCodes.NotFound)
-            }
-          }
-        }
-      } ~
-      path("evaluator_results.json") {
-        get {
-          respondWithMediaType(`application/json`) {
-            evaluationInstances.get(instanceId).map { i =>
-              complete(i.evaluatorResultsJSON)
-            } getOrElse {
-              complete(StatusCodes.NotFound)
-            }
-          }
-        }
-      } ~
-      cors {
-        path("local_evaluator_results.json") {
+        } ~
+        path("evaluator_results.html") {
           get {
-            respondWithMediaType(`application/json`) {
+            evaluationInstances.get(instanceId).map { i =>
+              complete(HttpResponse(
+                entity = HttpEntity(`text/html(UTF-8)`, i.evaluatorResultsHTML)))
+            } getOrElse {
+              complete(StatusCodes.NotFound)
+            }
+          }
+        } ~
+        path("evaluator_results.json") {
+          get {
+            evaluationInstances.get(instanceId).map { i =>
+              complete(HttpResponse(
+                entity = HttpEntity(`application/json`, i.evaluatorResultsJSON)))
+            } getOrElse {
+              complete(StatusCodes.NotFound)
+            }
+          }
+        } ~
+        corsHandler {
+          path("local_evaluator_results.json") {
+            get {
               evaluationInstances.get(instanceId).map { i =>
-                complete(i.evaluatorResultsJSON)
+                complete(HttpResponse(
+                  entity = HttpEntity(`application/json`, i.evaluatorResultsJSON)))
               } getOrElse {
                 complete(StatusCodes.NotFound)
               }
             }
           }
+        } ~
+        pathPrefix("assets") {
+          getFromResourceDirectory("assets")
         }
       }
-    } ~
-    pathPrefix("assets") {
-      getFromResourceDirectory("assets")
-    }
+
+    route
+  }
+
 }
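
The Dashboard change above replaces spray's actor-based IO(Http) ? Http.Bind with an akka-http Route bound through bindAndHandle, optionally wrapped in an HttpsConnectionContext. The sketch below shows that binding pattern in isolation; the object name, the hard-coded host and port, and the use of SSLContext.getDefault in place of the keystore-backed sslContext from SSLConfiguration are illustrative assumptions, not part of this patch.

    // Minimal sketch of the new binding pattern (assumed names: MinimalServer, helloRoute).
    // In the real Dashboard the SSL context comes from SSLConfiguration and the flag from server.conf.
    import akka.actor.ActorSystem
    import akka.http.scaladsl.{ConnectionContext, Http}
    import akka.http.scaladsl.model.{ContentTypes, HttpEntity}
    import akka.http.scaladsl.server.Directives._
    import akka.http.scaladsl.server.Route
    import akka.stream.ActorMaterializer
    import javax.net.ssl.SSLContext

    object MinimalServer extends App {
      implicit val system = ActorSystem("minimal-server")
      implicit val materializer = ActorMaterializer()

      // A plain Route value replaces the spray HttpService trait plus the runRoute actor.
      val helloRoute: Route =
        path("") {
          get {
            complete(HttpEntity(ContentTypes.`text/html(UTF-8)`, "<h1>alive</h1>"))
          }
        }

      val sslEnforced = false // the dashboard reads this from server.conf
      if (sslEnforced) {
        // Stand-in for the keystore-backed sslContext provided by SSLConfiguration.
        val https = ConnectionContext.https(SSLContext.getDefault)
        Http().bindAndHandle(helloRoute, "localhost", 9000, connectionContext = https)
      } else {
        Http().bindAndHandle(helloRoute, "localhost", 9000)
      }
    }
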
diff --git a/tools/src/main/scala/org/apache/predictionio/tools/export/EventsToFile.scala b/tools/src/main/scala/org/apache/predictionio/tools/export/EventsToFile.scala
index 0372a44..9b6dbb5 100644
--- a/tools/src/main/scala/org/apache/predictionio/tools/export/EventsToFile.scala
+++ b/tools/src/main/scala/org/apache/predictionio/tools/export/EventsToFile.scala
@@ -21,14 +21,12 @@
 import org.apache.predictionio.controller.Utils
 import org.apache.predictionio.data.storage.EventJson4sSupport
 import org.apache.predictionio.data.storage.Storage
-import org.apache.predictionio.data.SparkVersionDependent
 import org.apache.predictionio.tools.Runner
 import org.apache.predictionio.workflow.WorkflowContext
 import org.apache.predictionio.workflow.WorkflowUtils
 import org.apache.predictionio.workflow.CleanupFunctions
-
 import grizzled.slf4j.Logging
-import org.apache.spark.sql.SaveMode
+import org.apache.spark.sql.{SaveMode, SparkSession}
 import org.json4s.native.Serialization._
 
 case class EventsToFileArgs(
@@ -93,7 +91,7 @@
           mode = "Export",
           batch = "App ID " + args.appId + channelStr,
           executorEnv = Runner.envStringToMap(args.env))
-        val sqlSession = SparkVersionDependent.sqlSession(sc)
+        val sqlSession = SparkSession.builder().getOrCreate()
         val events = Storage.getPEvents()
         val eventsRdd = events.find(appId = args.appId, channelId = channelId)(sc)
         val jsonStringRdd = eventsRdd.map(write(_))
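
The EventsToFile change drops the SparkVersionDependent shim in favour of obtaining a SparkSession directly. Below is a hedged, stand-alone sketch of that export path; the object name, the sample JSON records, and the /tmp/events-export output location are invented for illustration, and it assumes Spark 2.2+ where DataFrameReader.json accepts a Dataset[String].

    // Stand-alone sketch of the Spark 2.x export path (assumed names: JsonExportSketch,
    // sample records, /tmp/events-export).
    import org.apache.spark.sql.{SaveMode, SparkSession}

    object JsonExportSketch {
      def main(args: Array[String]): Unit = {
        // Replaces the removed SparkVersionDependent.sqlSession(sc) shim.
        val spark = SparkSession.builder()
          .appName("json-export-sketch")
          .master("local[*]")
          .getOrCreate()
        import spark.implicits._

        // Stand-in for the RDD of JSON-serialized events built by eventsRdd.map(write(_)).
        val jsonStringRdd = spark.sparkContext.parallelize(Seq(
          """{"event":"rate","entityId":"u1","targetEntityId":"i1"}""",
          """{"event":"buy","entityId":"u2","targetEntityId":"i3"}"""
        ))

        // Parse the JSON strings into a DataFrame and write it out.
        val df = spark.read.json(jsonStringRdd.toDS())
        df.write.mode(SaveMode.Overwrite).parquet("/tmp/events-export")

        spark.stop()
      }
    }
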
diff --git a/tools/src/test/scala/org/apache/predictionio/tools/admin/AdminAPISpec.scala b/tools/src/test/scala/org/apache/predictionio/tools/admin/AdminAPISpec.scala
index e6c8bd3..e554ebf 100644
--- a/tools/src/test/scala/org/apache/predictionio/tools/admin/AdminAPISpec.scala
+++ b/tools/src/test/scala/org/apache/predictionio/tools/admin/AdminAPISpec.scala
@@ -17,67 +17,19 @@
 
 package org.apache.predictionio.tools.admin
 
-import akka.actor.{ActorSystem, Props}
-import akka.testkit.TestProbe
-import org.apache.predictionio.data.storage.Storage
 import org.specs2.mutable.Specification
-import spray.http._
-import spray.httpx.RequestBuilding._
-import spray.util._
+import akka.http.scaladsl.testkit.Specs2RouteTest
 
-
-class AdminAPISpec extends Specification{
-
-  val system = ActorSystem(Utils.actorSystemNameFrom(getClass))
-  val config = AdminServerConfig(
-    ip = "localhost",
-    port = 7071)
-
-  val commandClient = new CommandClient(
-    appClient = Storage.getMetaDataApps,
-    accessKeyClient = Storage.getMetaDataAccessKeys,
-    eventClient = Storage.getLEvents()
-  )
-
-  val adminActor= system.actorOf(Props(classOf[AdminServiceActor], commandClient))
+class AdminAPISpec extends Specification with Specs2RouteTest {
+  val route = AdminServer.createRoute()
 
   "GET / request" should {
     "properly produce OK HttpResponses" in {
-      val probe = TestProbe()(system)
-      probe.send(adminActor, Get("/"))
-
-      probe.expectMsg(
-        HttpResponse(
-          200,
-          HttpEntity(
-            contentType = ContentTypes.`application/json`,
-            string = """{"status":"alive"}"""
-          )
-        )
-      )
-      success
+      Get() ~> route ~> check {
+        response.status.intValue() shouldEqual 200
+        responseAs[String] shouldEqual """{"status":"alive"}"""
+      }
     }
   }
 
-  "GET /cmd/app request" should {
-    "properly produce OK HttpResponses" in {
-      /*
-      val probe = TestProbe()(system)
-      probe.send(adminActor,Get("/cmd/app"))
-
-      //TODO: Need to convert the response string to the corresponding case object to assert some properties on the object
-      probe.expectMsg(
-        HttpResponse(
-          200,
-          HttpEntity(
-            contentType = ContentTypes.`application/json`,
-            string = """{"status":1}"""
-          )
-        )
-      )*/
-      pending
-    }
-  }
-
-  step(system.shutdown())
 }
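
The rewritten AdminAPISpec relies on akka-http's Specs2RouteTest, which exercises a Route in-process with the ~> operator instead of sending messages to a spray service actor through a TestProbe. The sketch below shows the same pattern against a hypothetical pingRoute that is not part of the AdminServer API.

    // Illustration of route-level testing with Specs2RouteTest; pingRoute is hypothetical.
    import akka.http.scaladsl.model.StatusCodes
    import akka.http.scaladsl.server.Directives._
    import akka.http.scaladsl.server.Route
    import akka.http.scaladsl.testkit.Specs2RouteTest
    import org.specs2.mutable.Specification

    class RouteTestSketch extends Specification with Specs2RouteTest {
      val pingRoute: Route =
        path("ping") {
          get {
            complete("""{"status":"alive"}""")
          }
        }

      "GET /ping" should {
        "answer with the alive payload" in {
          Get("/ping") ~> pingRoute ~> check {
            status shouldEqual StatusCodes.OK
            responseAs[String] shouldEqual """{"status":"alive"}"""
          }
        }
        "turn an unmatched path into a 404" in {
          // Route.seal converts the unmatched-path rejection into a plain NotFound response.
          Get("/nope") ~> Route.seal(pingRoute) ~> check {
            status shouldEqual StatusCodes.NotFound
          }
        }
      }
    }
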