[GEARPUMP-252] Return meaningful result than app id when submitting application in ClientContext

Author: huafengw <fvunicorn@gmail.com>

Closes #133 from huafengw/refactorClient.
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/client/ClientContext.scala b/core/src/main/scala/org/apache/gearpump/cluster/client/ClientContext.scala
index 34582dd..48b95d8 100755
--- a/core/src/main/scala/org/apache/gearpump/cluster/client/ClientContext.scala
+++ b/core/src/main/scala/org/apache/gearpump/cluster/client/ClientContext.scala
@@ -23,8 +23,9 @@
 import akka.actor.{ActorRef, ActorSystem}
 import akka.util.Timeout
 import com.typesafe.config.{Config, ConfigValueFactory}
-import org.apache.gearpump.cluster.MasterToAppMaster.{AppMastersData, ReplayFromTimestampWindowTrailingEdge}
-import org.apache.gearpump.cluster.MasterToClient.ReplayApplicationResult
+import org.apache.gearpump.cluster.ClientToMaster.{ResolveAppId, ShutdownApplication, SubmitApplication}
+import org.apache.gearpump.cluster.MasterToAppMaster.{AppMastersData, AppMastersDataRequest, ReplayFromTimestampWindowTrailingEdge}
+import org.apache.gearpump.cluster.MasterToClient._
 import org.apache.gearpump.cluster._
 import org.apache.gearpump.cluster.master.MasterProxy
 import org.apache.gearpump.jarstore.JarStoreClient
@@ -32,10 +33,11 @@
 import org.apache.gearpump.util.{ActorUtil, Constants, LogUtil, Util}
 import org.slf4j.Logger
 
+import scala.concurrent.ExecutionContext.Implicits.global
 import scala.collection.JavaConverters._
 import scala.concurrent.duration.Duration
 import scala.concurrent.{Await, Future}
-import scala.util.Try
+import scala.util.{Failure, Success, Try}
 
 /**
  * ClientContext is a user facing util to submit/manage an application.
@@ -43,7 +45,6 @@
  * TODO: add interface to query master here
  */
 class ClientContext(config: Config, sys: ActorSystem, _master: ActorRef) {
-
   def this(system: ActorSystem) = {
     this(system.settings.config, system, null)
   }
@@ -53,20 +54,20 @@
   }
 
   private val LOG: Logger = LogUtil.getLogger(getClass)
-  private implicit val timeout = Timeout(5, TimeUnit.SECONDS)
-
   implicit val system = Option(sys).getOrElse(ActorSystem(s"client${Util.randInt()}", config))
   LOG.info(s"Starting system ${system.name}")
-  val shouldCleanupSystem = Option(sys).isEmpty
-
   private val jarStoreClient = new JarStoreClient(config, system)
+  private val masterClientTimeout = {
+    val timeout = Try(config.getInt(Constants.GEARPUMP_MASTERCLIENT_TIMEOUT)).getOrElse(90)
+    Timeout(timeout, TimeUnit.SECONDS)
+  }
 
   private lazy val master: ActorRef = {
     val masters = config.getStringList(Constants.GEARPUMP_CLUSTER_MASTERS).asScala
       .flatMap(Util.parseHostList)
     val master = Option(_master).getOrElse(system.actorOf(MasterProxy.props(masters),
       s"masterproxy${system.name}"))
-    LOG.info(s"Creating master proxy ${master} for master list: $masters")
+    LOG.info(s"Creating master proxy $master for master list: $masters")
     master
   }
 
@@ -75,26 +76,25 @@
    * defined. Otherwise, it assumes the jar is on the target runtime classpath, thus will
    * not send the jar across the wire.
    */
-  def submit(app: Application): Int = {
+  def submit(app: Application): RunningApplication = {
     submit(app, System.getProperty(GEARPUMP_APP_JAR))
   }
 
-  def submit(app: Application, jar: String): Int = {
-    submit(app, jar, getExecutorNum())
+  def submit(app: Application, jar: String): RunningApplication = {
+    submit(app, jar, getExecutorNum)
   }
 
-  def submit(app: Application, jar: String, executorNum: Int): Int = {
-    val client = getMasterClient
+  def submit(app: Application, jar: String, executorNum: Int): RunningApplication = {
     val appName = checkAndAddNamePrefix(app.name, System.getProperty(GEARPUMP_APP_NAME_PREFIX))
     val submissionConfig = getSubmissionConfig(config)
       .withValue(APPLICATION_EXECUTOR_NUMBER, ConfigValueFactory.fromAnyRef(executorNum))
     val appDescription =
       AppDescription(appName, app.appMaster.getName, app.userConfig, submissionConfig)
     val appJar = Option(jar).map(loadFile)
-    client.submitApplication(appDescription, appJar)
+    submitApplication(SubmitApplication(appDescription, appJar))
   }
 
-  private def getExecutorNum(): Int = {
+  private def getExecutorNum: Int = {
     Try(System.getProperty(APPLICATION_EXECUTOR_NUMBER).toInt).getOrElse(1)
   }
 
@@ -102,8 +102,11 @@
     ClusterConfig.filterOutDefaultConfig(config)
   }
 
+  def listApps: AppMastersData = {
+    ActorUtil.askActor[AppMastersData](master, AppMastersDataRequest, masterClientTimeout)
+  }
+
   def replayFromTimestampWindowTrailingEdge(appId: Int): ReplayApplicationResult = {
-    import scala.concurrent.ExecutionContext.Implicits.global
     val result = Await.result(
       ActorUtil.askAppMaster[ReplayApplicationResult](master,
         appId, ReplayFromTimestampWindowTrailingEdge(appId)), Duration.Inf)
@@ -111,27 +114,29 @@
   }
 
   def askAppMaster[T](appId: Int, msg: Any): Future[T] = {
-    import scala.concurrent.ExecutionContext.Implicits.global
     ActorUtil.askAppMaster[T](master, appId, msg)
   }
 
-  def listApps: AppMastersData = {
-    val client = getMasterClient
-    client.listApplications
-  }
-
   def shutdown(appId: Int): Unit = {
-    val client = getMasterClient
-    client.shutdownApplication(appId)
+    val result = ActorUtil.askActor[ShutdownApplicationResult](master,
+      ShutdownApplication(appId), masterClientTimeout)
+    result.appId match {
+      case Success(_) =>
+      case Failure(ex) => throw ex
+    }
   }
 
   def resolveAppID(appId: Int): ActorRef = {
-    val client = getMasterClient
-    client.resolveAppId(appId)
+    val result = ActorUtil.askActor[ResolveAppIdResult](master,
+      ResolveAppId(appId), masterClientTimeout)
+    result.appMaster match {
+      case Success(appMaster) => appMaster
+      case Failure(ex) => throw ex
+    }
   }
 
   def close(): Unit = {
-    if (shouldCleanupSystem) {
+    if (sys == null) {
       LOG.info(s"Shutting down system ${system.name}")
       system.terminate()
     }
@@ -161,9 +166,18 @@
     fullName
   }
 
-  private def getMasterClient: MasterClient = {
-    val timeout = Try(config.getInt(Constants.GEARPUMP_MASTERCLIENT_TIMEOUT)).getOrElse(90)
-    new MasterClient(master, akka.util.Timeout(timeout, TimeUnit.SECONDS))
+  private def submitApplication(submitApplication: SubmitApplication): RunningApplication = {
+    val result = ActorUtil.askActor[SubmitApplicationResult](master,
+      submitApplication, masterClientTimeout)
+    val application = result.appId match {
+      case Success(appId) =>
+        // scalastyle:off println
+        Console.println(s"Submit application succeed. The application id is $appId")
+        // scalastyle:on println
+        new RunningApplication(appId, master, masterClientTimeout)
+      case Failure(ex) => throw ex
+    }
+    application
   }
 }
 
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/client/MasterClient.scala b/core/src/main/scala/org/apache/gearpump/cluster/client/MasterClient.scala
deleted file mode 100644
index 77ebedf..0000000
--- a/core/src/main/scala/org/apache/gearpump/cluster/client/MasterClient.scala
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.cluster.client
-
-import scala.concurrent.duration.Duration
-import scala.concurrent.{Await, Future}
-import scala.util.{Failure, Success}
-
-import akka.actor.ActorRef
-import akka.pattern.ask
-import akka.util.Timeout
-
-import org.apache.gearpump.cluster.ClientToMaster._
-import org.apache.gearpump.cluster.MasterToAppMaster.{AppMastersData, AppMastersDataRequest}
-import org.apache.gearpump.cluster.MasterToClient.{ResolveAppIdResult, ShutdownApplicationResult, SubmitApplicationResult}
-import org.apache.gearpump.cluster.{AppDescription, AppJar}
-
-/**
- * Client to inter-operate with Master node.
- *
- * NOTE: Stateless, thread safe
- */
-class MasterClient(master: ActorRef, timeout: Timeout) {
-  implicit val masterClientTimeout = timeout
-
-  def submitApplication(app: AppDescription, appJar: Option[AppJar]): Int = {
-    val result = Await.result(
-      (master ? SubmitApplication(app, appJar)).asInstanceOf[Future[SubmitApplicationResult]],
-      Duration.Inf)
-    val appId = result.appId match {
-      case Success(appId) =>
-        // scalastyle:off println
-        Console.println(s"Submit application succeed. The application id is $appId")
-        // scalastyle:on println
-        appId
-      case Failure(ex) => throw ex
-    }
-    appId
-  }
-
-  def resolveAppId(appId: Int): ActorRef = {
-    val result = Await.result(
-      (master ? ResolveAppId(appId)).asInstanceOf[Future[ResolveAppIdResult]], Duration.Inf)
-    result.appMaster match {
-      case Success(appMaster) => appMaster
-      case Failure(ex) => throw ex
-    }
-  }
-
-  def shutdownApplication(appId: Int): Unit = {
-    val result = Await.result(
-      (master ? ShutdownApplication(appId)).asInstanceOf[Future[ShutdownApplicationResult]],
-      Duration.Inf)
-    result.appId match {
-      case Success(_) =>
-      case Failure(ex) => throw ex
-    }
-  }
-
-  def listApplications: AppMastersData = {
-    val result = Await.result(
-      (master ? AppMastersDataRequest).asInstanceOf[Future[AppMastersData]], Duration.Inf)
-    result
-  }
-}
\ No newline at end of file
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/client/RunningApplication.scala b/core/src/main/scala/org/apache/gearpump/cluster/client/RunningApplication.scala
new file mode 100644
index 0000000..153c824
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/cluster/client/RunningApplication.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.cluster.client
+
+import akka.pattern.ask
+import akka.actor.ActorRef
+import akka.util.Timeout
+import org.apache.gearpump.cluster.ClientToMaster.{ResolveAppId, ShutdownApplication}
+import org.apache.gearpump.cluster.MasterToClient.{ResolveAppIdResult, ShutdownApplicationResult}
+import org.apache.gearpump.util.ActorUtil
+
+import scala.concurrent.Future
+import scala.util.{Failure, Success}
+import scala.concurrent.ExecutionContext.Implicits.global
+
+class RunningApplication(val appId: Int, master: ActorRef, timeout: Timeout) {
+  lazy val appMaster: Future[ActorRef] = resolveAppMaster(appId)
+
+  def shutDown(): Unit = {
+    val result = ActorUtil.askActor[ShutdownApplicationResult](master,
+      ShutdownApplication(appId), timeout)
+    result.appId match {
+      case Success(_) =>
+      case Failure(ex) => throw ex
+    }
+  }
+
+  def askAppMaster[T](msg: Any): Future[T] = {
+    appMaster.flatMap(_.ask(msg)(timeout).asInstanceOf[Future[T]])
+  }
+
+  private def resolveAppMaster(appId: Int): Future[ActorRef] = {
+    master.ask(ResolveAppId(appId))(timeout).
+      asInstanceOf[Future[ResolveAppIdResult]].map(_.appMaster.get)
+  }
+}
+
diff --git a/core/src/main/scala/org/apache/gearpump/util/ActorUtil.scala b/core/src/main/scala/org/apache/gearpump/util/ActorUtil.scala
index 09f2969..82c7fe2 100644
--- a/core/src/main/scala/org/apache/gearpump/util/ActorUtil.scala
+++ b/core/src/main/scala/org/apache/gearpump/util/ActorUtil.scala
@@ -21,13 +21,12 @@
 import org.apache.gearpump.cluster.AppMasterContext
 import org.apache.gearpump.cluster.worker.WorkerId
 
-import scala.concurrent.{ExecutionContext, Future}
-
+import scala.concurrent.{Await, ExecutionContext, Future}
 import akka.actor.Actor.Receive
 import akka.actor._
 import akka.pattern.ask
 import org.slf4j.Logger
-
+import akka.util.Timeout
 import org.apache.gearpump.cluster.AppMasterToMaster.{ActivateAppMaster, GetAllWorkers}
 import org.apache.gearpump.cluster.ClientToMaster.{ResolveAppId, ResolveWorkerId}
 import org.apache.gearpump.cluster.MasterToAppMaster.WorkerList
@@ -36,6 +35,8 @@
 import org.apache.gearpump.cluster.scheduler.{Relaxation, Resource, ResourceRequest}
 import org.apache.gearpump.transport.HostPort
 
+import scala.concurrent.duration.Duration
+
 object ActorUtil {
   private val LOG: Logger = LogUtil.getLogger(getClass)
 
@@ -136,4 +137,13 @@
     implicit val timeout = Constants.FUTURE_TIMEOUT
     (actor ? msg).asInstanceOf[Future[T]]
   }
+
+  def askActor[T](actor: ActorRef, msg: Any, timeout: Timeout)(implicit ex: ExecutionContext): T = {
+    askActor(actor, msg, timeout, ActorRef.noSender)
+  }
+
+  def askActor[T](actor: ActorRef, msg: Any, timeout: Timeout, sender: ActorRef)
+    (implicit ex: ExecutionContext): T = {
+    Await.result(actor.ask(msg)(timeout, sender).asInstanceOf[Future[T]], Duration.Inf)
+  }
 }
diff --git a/core/src/test/scala/org/apache/gearpump/cluster/client/RunningApplicationSpec.scala b/core/src/test/scala/org/apache/gearpump/cluster/client/RunningApplicationSpec.scala
new file mode 100644
index 0000000..5f0d5e4
--- /dev/null
+++ b/core/src/test/scala/org/apache/gearpump/cluster/client/RunningApplicationSpec.scala
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.client
+
+import java.util.concurrent.TimeUnit
+
+import akka.actor.ActorSystem
+import akka.testkit.TestProbe
+import akka.util.Timeout
+import org.apache.gearpump.cluster.ClientToMaster.{ResolveAppId, ShutdownApplication}
+import org.apache.gearpump.cluster.MasterToClient.{ResolveAppIdResult, ShutdownApplicationResult}
+import org.apache.gearpump.cluster.TestUtil
+import org.apache.gearpump.cluster.client.RunningApplicationSpec.{MockAskAppMasterRequest, MockAskAppMasterResponse}
+import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
+
+import scala.concurrent.{Await, Future}
+import scala.concurrent.duration.Duration
+import scala.util.{Failure, Success}
+import scala.concurrent.ExecutionContext.Implicits.global
+
+class RunningApplicationSpec extends FlatSpec with Matchers with BeforeAndAfterAll {
+  implicit var system: ActorSystem = _
+
+  override def beforeAll(): Unit = {
+    system = ActorSystem("test", TestUtil.DEFAULT_CONFIG)
+  }
+
+  override def afterAll(): Unit = {
+    system.terminate()
+    Await.result(system.whenTerminated, Duration.Inf)
+  }
+
+  "RunningApplication" should "be able to shutdown application" in {
+    val errorMsg = "mock exception"
+    val master = TestProbe()
+    val timeout = Timeout(90, TimeUnit.SECONDS)
+    val application = new RunningApplication(1, master.ref, timeout)
+    Future {
+      application.shutDown()
+    }
+    master.expectMsg(ShutdownApplication(1))
+    master.reply(ShutdownApplicationResult(Success(1)))
+
+    val result = Future {
+      intercept[Exception] {
+        application.shutDown()
+      }
+    }
+    master.expectMsg(ShutdownApplication(1))
+    master.reply(ShutdownApplicationResult(Failure(new Exception(errorMsg))))
+    val exception = Await.result(result, Duration.Inf)
+    assert(exception.getMessage.equals(errorMsg))
+  }
+
+  "RunningApplication" should "be able to ask appmaster" in {
+    val master = TestProbe()
+    val appMaster = TestProbe()
+    val appId = 1
+    val timeout = Timeout(90, TimeUnit.SECONDS)
+    val request = MockAskAppMasterRequest("request")
+    val application = new RunningApplication(appId, master.ref, timeout)
+    val future = application.askAppMaster[MockAskAppMasterResponse](request)
+    master.expectMsg(ResolveAppId(appId))
+    master.reply(ResolveAppIdResult(Success(appMaster.ref)))
+    appMaster.expectMsg(MockAskAppMasterRequest("request"))
+    appMaster.reply(MockAskAppMasterResponse("response"))
+    val result = Await.result(future, Duration.Inf)
+    assert(result.res.equals("response"))
+
+    // ResolveAppId should not be called multiple times
+    val future2 = application.askAppMaster[MockAskAppMasterResponse](request)
+    appMaster.expectMsg(MockAskAppMasterRequest("request"))
+    appMaster.reply(MockAskAppMasterResponse("response"))
+    val result2 = Await.result(future2, Duration.Inf)
+    assert(result2.res.equals("response"))
+  }
+}
+
+object RunningApplicationSpec {
+  case class MockAskAppMasterRequest(req: String)
+
+  case class MockAskAppMasterResponse(res: String)
+}
\ No newline at end of file
diff --git a/examples/distributedshell/src/main/scala/org/apache/gearpump/examples/distributedshell/DistributedShell.scala b/examples/distributedshell/src/main/scala/org/apache/gearpump/examples/distributedshell/DistributedShell.scala
index c4eec07..6db8531 100644
--- a/examples/distributedshell/src/main/scala/org/apache/gearpump/examples/distributedshell/DistributedShell.scala
+++ b/examples/distributedshell/src/main/scala/org/apache/gearpump/examples/distributedshell/DistributedShell.scala
@@ -33,9 +33,9 @@
   override def main(akkaConf: Config, args: Array[String]): Unit = {
     LOG.info(s"Distributed shell submitting application...")
     val context = ClientContext(akkaConf)
-    val appId = context.submit(Application[DistShellAppMaster]("DistributedShell",
+    val app = context.submit(Application[DistShellAppMaster]("DistributedShell",
     UserConfig.empty))
     context.close()
-    LOG.info(s"Distributed Shell Application started with appId $appId !")
+    LOG.info(s"Distributed Shell Application started with appId ${app.appId} !")
   }
 }
diff --git a/examples/distributeservice/src/main/scala/org/apache/gearpump/experiments/distributeservice/DistributeService.scala b/examples/distributeservice/src/main/scala/org/apache/gearpump/experiments/distributeservice/DistributeService.scala
index df7a517..655389b 100644
--- a/examples/distributeservice/src/main/scala/org/apache/gearpump/experiments/distributeservice/DistributeService.scala
+++ b/examples/distributeservice/src/main/scala/org/apache/gearpump/experiments/distributeservice/DistributeService.scala
@@ -33,9 +33,9 @@
   override def main(akkaConf: Config, args: Array[String]): Unit = {
     LOG.info(s"Distribute Service submitting application...")
     val context = ClientContext(akkaConf)
-    val appId = context.submit(Application[DistServiceAppMaster]("DistributedService",
+    val app = context.submit(Application[DistServiceAppMaster]("DistributedService",
       UserConfig.empty))
     context.close()
-    LOG.info(s"Distribute Service Application started with appId $appId !")
+    LOG.info(s"Distribute Service Application started with appId ${app.appId} !")
   }
 }
diff --git a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/main/GearpumpNimbus.scala b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/main/GearpumpNimbus.scala
index 544a4eb..df1de06 100644
--- a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/main/GearpumpNimbus.scala
+++ b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/main/GearpumpNimbus.scala
@@ -163,7 +163,7 @@
       .withValue[JMap[AnyRef, AnyRef]](StormConstants.STORM_CONFIG, stormConfig)
     val app = StreamApplication(name, processorGraph, config)
     LOG.info(s"jar file uploaded to $uploadedJarLocation")
-    val appId = clientContext.submit(app, uploadedJarLocation, workerNum)
+    val appId = clientContext.submit(app, uploadedJarLocation, workerNum).appId
     applications += name -> appId
     topologies += name -> TopologyData(topology, stormConfig, uploadedJarLocation)
     LOG.info(s"Storm Application $appId submitted")
diff --git a/services/jvm/src/main/scala/org/apache/gearpump/services/MasterService.scala b/services/jvm/src/main/scala/org/apache/gearpump/services/MasterService.scala
index 62a431a..ed15121 100644
--- a/services/jvm/src/main/scala/org/apache/gearpump/services/MasterService.scala
+++ b/services/jvm/src/main/scala/org/apache/gearpump/services/MasterService.scala
@@ -169,7 +169,7 @@
           }
 
           val effectiveConfig = if (userConfig == null) UserConfig.empty else userConfig
-          val appId = context.submit(new StreamApplication(appName, effectiveConfig, graph))
+          val appId = context.submit(new StreamApplication(appName, effectiveConfig, graph)).appId
 
           import upickle.default.write
           val submitApplicationResultValue = SubmitApplicationResultValue(appId)
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStreamApp.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStreamApp.scala
index b8d1f4c..f5b2910 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStreamApp.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStreamApp.scala
@@ -21,7 +21,7 @@
 import java.util.Collection
 
 import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.cluster.client.ClientContext
+import org.apache.gearpump.cluster.client.{ClientContext, RunningApplication}
 import org.apache.gearpump.streaming.dsl.scalaapi.{CollectionDataSource, StreamApp}
 import org.apache.gearpump.streaming.source.DataSource
 
@@ -42,7 +42,7 @@
     new JavaStream[T](streamApp.source(dataSource, parallelism, conf, description))
   }
 
-  def submit(): Int = {
+  def submit(): RunningApplication = {
     context.submit(streamApp)
   }
 }