[GEARPUMP-252] Return meaningful result than app id when submitting application in ClientContext
Author: huafengw <fvunicorn@gmail.com>
Closes #133 from huafengw/refactorClient.
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/client/ClientContext.scala b/core/src/main/scala/org/apache/gearpump/cluster/client/ClientContext.scala
index 34582dd..48b95d8 100755
--- a/core/src/main/scala/org/apache/gearpump/cluster/client/ClientContext.scala
+++ b/core/src/main/scala/org/apache/gearpump/cluster/client/ClientContext.scala
@@ -23,8 +23,9 @@
import akka.actor.{ActorRef, ActorSystem}
import akka.util.Timeout
import com.typesafe.config.{Config, ConfigValueFactory}
-import org.apache.gearpump.cluster.MasterToAppMaster.{AppMastersData, ReplayFromTimestampWindowTrailingEdge}
-import org.apache.gearpump.cluster.MasterToClient.ReplayApplicationResult
+import org.apache.gearpump.cluster.ClientToMaster.{ResolveAppId, ShutdownApplication, SubmitApplication}
+import org.apache.gearpump.cluster.MasterToAppMaster.{AppMastersData, AppMastersDataRequest, ReplayFromTimestampWindowTrailingEdge}
+import org.apache.gearpump.cluster.MasterToClient._
import org.apache.gearpump.cluster._
import org.apache.gearpump.cluster.master.MasterProxy
import org.apache.gearpump.jarstore.JarStoreClient
@@ -32,10 +33,11 @@
import org.apache.gearpump.util.{ActorUtil, Constants, LogUtil, Util}
import org.slf4j.Logger
+import scala.concurrent.ExecutionContext.Implicits.global
import scala.collection.JavaConverters._
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future}
-import scala.util.Try
+import scala.util.{Failure, Success, Try}
/**
* ClientContext is a user facing util to submit/manage an application.
@@ -43,7 +45,6 @@
* TODO: add interface to query master here
*/
class ClientContext(config: Config, sys: ActorSystem, _master: ActorRef) {
-
def this(system: ActorSystem) = {
this(system.settings.config, system, null)
}
@@ -53,20 +54,20 @@
}
private val LOG: Logger = LogUtil.getLogger(getClass)
- private implicit val timeout = Timeout(5, TimeUnit.SECONDS)
-
implicit val system = Option(sys).getOrElse(ActorSystem(s"client${Util.randInt()}", config))
LOG.info(s"Starting system ${system.name}")
- val shouldCleanupSystem = Option(sys).isEmpty
-
private val jarStoreClient = new JarStoreClient(config, system)
+ private val masterClientTimeout = {
+ val timeout = Try(config.getInt(Constants.GEARPUMP_MASTERCLIENT_TIMEOUT)).getOrElse(90)
+ Timeout(timeout, TimeUnit.SECONDS)
+ }
private lazy val master: ActorRef = {
val masters = config.getStringList(Constants.GEARPUMP_CLUSTER_MASTERS).asScala
.flatMap(Util.parseHostList)
val master = Option(_master).getOrElse(system.actorOf(MasterProxy.props(masters),
s"masterproxy${system.name}"))
- LOG.info(s"Creating master proxy ${master} for master list: $masters")
+ LOG.info(s"Creating master proxy $master for master list: $masters")
master
}
@@ -75,26 +76,25 @@
* defined. Otherwise, it assumes the jar is on the target runtime classpath, thus will
* not send the jar across the wire.
*/
- def submit(app: Application): Int = {
+ def submit(app: Application): RunningApplication = {
submit(app, System.getProperty(GEARPUMP_APP_JAR))
}
- def submit(app: Application, jar: String): Int = {
- submit(app, jar, getExecutorNum())
+ def submit(app: Application, jar: String): RunningApplication = {
+ submit(app, jar, getExecutorNum)
}
- def submit(app: Application, jar: String, executorNum: Int): Int = {
- val client = getMasterClient
+ def submit(app: Application, jar: String, executorNum: Int): RunningApplication = {
val appName = checkAndAddNamePrefix(app.name, System.getProperty(GEARPUMP_APP_NAME_PREFIX))
val submissionConfig = getSubmissionConfig(config)
.withValue(APPLICATION_EXECUTOR_NUMBER, ConfigValueFactory.fromAnyRef(executorNum))
val appDescription =
AppDescription(appName, app.appMaster.getName, app.userConfig, submissionConfig)
val appJar = Option(jar).map(loadFile)
- client.submitApplication(appDescription, appJar)
+ submitApplication(SubmitApplication(appDescription, appJar))
}
- private def getExecutorNum(): Int = {
+ private def getExecutorNum: Int = {
Try(System.getProperty(APPLICATION_EXECUTOR_NUMBER).toInt).getOrElse(1)
}
@@ -102,8 +102,11 @@
ClusterConfig.filterOutDefaultConfig(config)
}
+ def listApps: AppMastersData = {
+ ActorUtil.askActor[AppMastersData](master, AppMastersDataRequest, masterClientTimeout)
+ }
+
def replayFromTimestampWindowTrailingEdge(appId: Int): ReplayApplicationResult = {
- import scala.concurrent.ExecutionContext.Implicits.global
val result = Await.result(
ActorUtil.askAppMaster[ReplayApplicationResult](master,
appId, ReplayFromTimestampWindowTrailingEdge(appId)), Duration.Inf)
@@ -111,27 +114,29 @@
}
def askAppMaster[T](appId: Int, msg: Any): Future[T] = {
- import scala.concurrent.ExecutionContext.Implicits.global
ActorUtil.askAppMaster[T](master, appId, msg)
}
- def listApps: AppMastersData = {
- val client = getMasterClient
- client.listApplications
- }
-
def shutdown(appId: Int): Unit = {
- val client = getMasterClient
- client.shutdownApplication(appId)
+ val result = ActorUtil.askActor[ShutdownApplicationResult](master,
+ ShutdownApplication(appId), masterClientTimeout)
+ result.appId match {
+ case Success(_) =>
+ case Failure(ex) => throw ex
+ }
}
def resolveAppID(appId: Int): ActorRef = {
- val client = getMasterClient
- client.resolveAppId(appId)
+ val result = ActorUtil.askActor[ResolveAppIdResult](master,
+ ResolveAppId(appId), masterClientTimeout)
+ result.appMaster match {
+ case Success(appMaster) => appMaster
+ case Failure(ex) => throw ex
+ }
}
def close(): Unit = {
- if (shouldCleanupSystem) {
+ if (sys == null) {
LOG.info(s"Shutting down system ${system.name}")
system.terminate()
}
@@ -161,9 +166,18 @@
fullName
}
- private def getMasterClient: MasterClient = {
- val timeout = Try(config.getInt(Constants.GEARPUMP_MASTERCLIENT_TIMEOUT)).getOrElse(90)
- new MasterClient(master, akka.util.Timeout(timeout, TimeUnit.SECONDS))
+ private def submitApplication(submitApplication: SubmitApplication): RunningApplication = {
+ val result = ActorUtil.askActor[SubmitApplicationResult](master,
+ submitApplication, masterClientTimeout)
+ val application = result.appId match {
+ case Success(appId) =>
+ // scalastyle:off println
+ Console.println(s"Submit application succeed. The application id is $appId")
+ // scalastyle:on println
+ new RunningApplication(appId, master, masterClientTimeout)
+ case Failure(ex) => throw ex
+ }
+ application
}
}
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/client/MasterClient.scala b/core/src/main/scala/org/apache/gearpump/cluster/client/MasterClient.scala
deleted file mode 100644
index 77ebedf..0000000
--- a/core/src/main/scala/org/apache/gearpump/cluster/client/MasterClient.scala
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.cluster.client
-
-import scala.concurrent.duration.Duration
-import scala.concurrent.{Await, Future}
-import scala.util.{Failure, Success}
-
-import akka.actor.ActorRef
-import akka.pattern.ask
-import akka.util.Timeout
-
-import org.apache.gearpump.cluster.ClientToMaster._
-import org.apache.gearpump.cluster.MasterToAppMaster.{AppMastersData, AppMastersDataRequest}
-import org.apache.gearpump.cluster.MasterToClient.{ResolveAppIdResult, ShutdownApplicationResult, SubmitApplicationResult}
-import org.apache.gearpump.cluster.{AppDescription, AppJar}
-
-/**
- * Client to inter-operate with Master node.
- *
- * NOTE: Stateless, thread safe
- */
-class MasterClient(master: ActorRef, timeout: Timeout) {
- implicit val masterClientTimeout = timeout
-
- def submitApplication(app: AppDescription, appJar: Option[AppJar]): Int = {
- val result = Await.result(
- (master ? SubmitApplication(app, appJar)).asInstanceOf[Future[SubmitApplicationResult]],
- Duration.Inf)
- val appId = result.appId match {
- case Success(appId) =>
- // scalastyle:off println
- Console.println(s"Submit application succeed. The application id is $appId")
- // scalastyle:on println
- appId
- case Failure(ex) => throw ex
- }
- appId
- }
-
- def resolveAppId(appId: Int): ActorRef = {
- val result = Await.result(
- (master ? ResolveAppId(appId)).asInstanceOf[Future[ResolveAppIdResult]], Duration.Inf)
- result.appMaster match {
- case Success(appMaster) => appMaster
- case Failure(ex) => throw ex
- }
- }
-
- def shutdownApplication(appId: Int): Unit = {
- val result = Await.result(
- (master ? ShutdownApplication(appId)).asInstanceOf[Future[ShutdownApplicationResult]],
- Duration.Inf)
- result.appId match {
- case Success(_) =>
- case Failure(ex) => throw ex
- }
- }
-
- def listApplications: AppMastersData = {
- val result = Await.result(
- (master ? AppMastersDataRequest).asInstanceOf[Future[AppMastersData]], Duration.Inf)
- result
- }
-}
\ No newline at end of file
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/client/RunningApplication.scala b/core/src/main/scala/org/apache/gearpump/cluster/client/RunningApplication.scala
new file mode 100644
index 0000000..153c824
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/cluster/client/RunningApplication.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.cluster.client
+
+import akka.pattern.ask
+import akka.actor.ActorRef
+import akka.util.Timeout
+import org.apache.gearpump.cluster.ClientToMaster.{ResolveAppId, ShutdownApplication}
+import org.apache.gearpump.cluster.MasterToClient.{ResolveAppIdResult, ShutdownApplicationResult}
+import org.apache.gearpump.util.ActorUtil
+
+import scala.concurrent.Future
+import scala.util.{Failure, Success}
+import scala.concurrent.ExecutionContext.Implicits.global
+
+class RunningApplication(val appId: Int, master: ActorRef, timeout: Timeout) {
+ lazy val appMaster: Future[ActorRef] = resolveAppMaster(appId)
+
+ def shutDown(): Unit = {
+ val result = ActorUtil.askActor[ShutdownApplicationResult](master,
+ ShutdownApplication(appId), timeout)
+ result.appId match {
+ case Success(_) =>
+ case Failure(ex) => throw ex
+ }
+ }
+
+ def askAppMaster[T](msg: Any): Future[T] = {
+ appMaster.flatMap(_.ask(msg)(timeout).asInstanceOf[Future[T]])
+ }
+
+ private def resolveAppMaster(appId: Int): Future[ActorRef] = {
+ master.ask(ResolveAppId(appId))(timeout).
+ asInstanceOf[Future[ResolveAppIdResult]].map(_.appMaster.get)
+ }
+}
+
diff --git a/core/src/main/scala/org/apache/gearpump/util/ActorUtil.scala b/core/src/main/scala/org/apache/gearpump/util/ActorUtil.scala
index 09f2969..82c7fe2 100644
--- a/core/src/main/scala/org/apache/gearpump/util/ActorUtil.scala
+++ b/core/src/main/scala/org/apache/gearpump/util/ActorUtil.scala
@@ -21,13 +21,12 @@
import org.apache.gearpump.cluster.AppMasterContext
import org.apache.gearpump.cluster.worker.WorkerId
-import scala.concurrent.{ExecutionContext, Future}
-
+import scala.concurrent.{Await, ExecutionContext, Future}
import akka.actor.Actor.Receive
import akka.actor._
import akka.pattern.ask
import org.slf4j.Logger
-
+import akka.util.Timeout
import org.apache.gearpump.cluster.AppMasterToMaster.{ActivateAppMaster, GetAllWorkers}
import org.apache.gearpump.cluster.ClientToMaster.{ResolveAppId, ResolveWorkerId}
import org.apache.gearpump.cluster.MasterToAppMaster.WorkerList
@@ -36,6 +35,8 @@
import org.apache.gearpump.cluster.scheduler.{Relaxation, Resource, ResourceRequest}
import org.apache.gearpump.transport.HostPort
+import scala.concurrent.duration.Duration
+
object ActorUtil {
private val LOG: Logger = LogUtil.getLogger(getClass)
@@ -136,4 +137,13 @@
implicit val timeout = Constants.FUTURE_TIMEOUT
(actor ? msg).asInstanceOf[Future[T]]
}
+
+ def askActor[T](actor: ActorRef, msg: Any, timeout: Timeout)(implicit ex: ExecutionContext): T = {
+ askActor(actor, msg, timeout, ActorRef.noSender)
+ }
+
+ def askActor[T](actor: ActorRef, msg: Any, timeout: Timeout, sender: ActorRef)
+ (implicit ex: ExecutionContext): T = {
+ Await.result(actor.ask(msg)(timeout, sender).asInstanceOf[Future[T]], Duration.Inf)
+ }
}
diff --git a/core/src/test/scala/org/apache/gearpump/cluster/client/RunningApplicationSpec.scala b/core/src/test/scala/org/apache/gearpump/cluster/client/RunningApplicationSpec.scala
new file mode 100644
index 0000000..5f0d5e4
--- /dev/null
+++ b/core/src/test/scala/org/apache/gearpump/cluster/client/RunningApplicationSpec.scala
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.client
+
+import java.util.concurrent.TimeUnit
+
+import akka.actor.ActorSystem
+import akka.testkit.TestProbe
+import akka.util.Timeout
+import org.apache.gearpump.cluster.ClientToMaster.{ResolveAppId, ShutdownApplication}
+import org.apache.gearpump.cluster.MasterToClient.{ResolveAppIdResult, ShutdownApplicationResult}
+import org.apache.gearpump.cluster.TestUtil
+import org.apache.gearpump.cluster.client.RunningApplicationSpec.{MockAskAppMasterRequest, MockAskAppMasterResponse}
+import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
+
+import scala.concurrent.{Await, Future}
+import scala.concurrent.duration.Duration
+import scala.util.{Failure, Success}
+import scala.concurrent.ExecutionContext.Implicits.global
+
+class RunningApplicationSpec extends FlatSpec with Matchers with BeforeAndAfterAll {
+ implicit var system: ActorSystem = _
+
+ override def beforeAll(): Unit = {
+ system = ActorSystem("test", TestUtil.DEFAULT_CONFIG)
+ }
+
+ override def afterAll(): Unit = {
+ system.terminate()
+ Await.result(system.whenTerminated, Duration.Inf)
+ }
+
+ "RunningApplication" should "be able to shutdown application" in {
+ val errorMsg = "mock exception"
+ val master = TestProbe()
+ val timeout = Timeout(90, TimeUnit.SECONDS)
+ val application = new RunningApplication(1, master.ref, timeout)
+ Future {
+ application.shutDown()
+ }
+ master.expectMsg(ShutdownApplication(1))
+ master.reply(ShutdownApplicationResult(Success(1)))
+
+ val result = Future {
+ intercept[Exception] {
+ application.shutDown()
+ }
+ }
+ master.expectMsg(ShutdownApplication(1))
+ master.reply(ShutdownApplicationResult(Failure(new Exception(errorMsg))))
+ val exception = Await.result(result, Duration.Inf)
+ assert(exception.getMessage.equals(errorMsg))
+ }
+
+ "RunningApplication" should "be able to ask appmaster" in {
+ val master = TestProbe()
+ val appMaster = TestProbe()
+ val appId = 1
+ val timeout = Timeout(90, TimeUnit.SECONDS)
+ val request = MockAskAppMasterRequest("request")
+ val application = new RunningApplication(appId, master.ref, timeout)
+ val future = application.askAppMaster[MockAskAppMasterResponse](request)
+ master.expectMsg(ResolveAppId(appId))
+ master.reply(ResolveAppIdResult(Success(appMaster.ref)))
+ appMaster.expectMsg(MockAskAppMasterRequest("request"))
+ appMaster.reply(MockAskAppMasterResponse("response"))
+ val result = Await.result(future, Duration.Inf)
+ assert(result.res.equals("response"))
+
+ // ResolveAppId should not be called multiple times
+ val future2 = application.askAppMaster[MockAskAppMasterResponse](request)
+ appMaster.expectMsg(MockAskAppMasterRequest("request"))
+ appMaster.reply(MockAskAppMasterResponse("response"))
+ val result2 = Await.result(future2, Duration.Inf)
+ assert(result2.res.equals("response"))
+ }
+}
+
+object RunningApplicationSpec {
+ case class MockAskAppMasterRequest(req: String)
+
+ case class MockAskAppMasterResponse(res: String)
+}
\ No newline at end of file
diff --git a/examples/distributedshell/src/main/scala/org/apache/gearpump/examples/distributedshell/DistributedShell.scala b/examples/distributedshell/src/main/scala/org/apache/gearpump/examples/distributedshell/DistributedShell.scala
index c4eec07..6db8531 100644
--- a/examples/distributedshell/src/main/scala/org/apache/gearpump/examples/distributedshell/DistributedShell.scala
+++ b/examples/distributedshell/src/main/scala/org/apache/gearpump/examples/distributedshell/DistributedShell.scala
@@ -33,9 +33,9 @@
override def main(akkaConf: Config, args: Array[String]): Unit = {
LOG.info(s"Distributed shell submitting application...")
val context = ClientContext(akkaConf)
- val appId = context.submit(Application[DistShellAppMaster]("DistributedShell",
+ val app = context.submit(Application[DistShellAppMaster]("DistributedShell",
UserConfig.empty))
context.close()
- LOG.info(s"Distributed Shell Application started with appId $appId !")
+ LOG.info(s"Distributed Shell Application started with appId ${app.appId} !")
}
}
diff --git a/examples/distributeservice/src/main/scala/org/apache/gearpump/experiments/distributeservice/DistributeService.scala b/examples/distributeservice/src/main/scala/org/apache/gearpump/experiments/distributeservice/DistributeService.scala
index df7a517..655389b 100644
--- a/examples/distributeservice/src/main/scala/org/apache/gearpump/experiments/distributeservice/DistributeService.scala
+++ b/examples/distributeservice/src/main/scala/org/apache/gearpump/experiments/distributeservice/DistributeService.scala
@@ -33,9 +33,9 @@
override def main(akkaConf: Config, args: Array[String]): Unit = {
LOG.info(s"Distribute Service submitting application...")
val context = ClientContext(akkaConf)
- val appId = context.submit(Application[DistServiceAppMaster]("DistributedService",
+ val app = context.submit(Application[DistServiceAppMaster]("DistributedService",
UserConfig.empty))
context.close()
- LOG.info(s"Distribute Service Application started with appId $appId !")
+ LOG.info(s"Distribute Service Application started with appId ${app.appId} !")
}
}
diff --git a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/main/GearpumpNimbus.scala b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/main/GearpumpNimbus.scala
index 544a4eb..df1de06 100644
--- a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/main/GearpumpNimbus.scala
+++ b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/main/GearpumpNimbus.scala
@@ -163,7 +163,7 @@
.withValue[JMap[AnyRef, AnyRef]](StormConstants.STORM_CONFIG, stormConfig)
val app = StreamApplication(name, processorGraph, config)
LOG.info(s"jar file uploaded to $uploadedJarLocation")
- val appId = clientContext.submit(app, uploadedJarLocation, workerNum)
+ val appId = clientContext.submit(app, uploadedJarLocation, workerNum).appId
applications += name -> appId
topologies += name -> TopologyData(topology, stormConfig, uploadedJarLocation)
LOG.info(s"Storm Application $appId submitted")
diff --git a/services/jvm/src/main/scala/org/apache/gearpump/services/MasterService.scala b/services/jvm/src/main/scala/org/apache/gearpump/services/MasterService.scala
index 62a431a..ed15121 100644
--- a/services/jvm/src/main/scala/org/apache/gearpump/services/MasterService.scala
+++ b/services/jvm/src/main/scala/org/apache/gearpump/services/MasterService.scala
@@ -169,7 +169,7 @@
}
val effectiveConfig = if (userConfig == null) UserConfig.empty else userConfig
- val appId = context.submit(new StreamApplication(appName, effectiveConfig, graph))
+ val appId = context.submit(new StreamApplication(appName, effectiveConfig, graph)).appId
import upickle.default.write
val submitApplicationResultValue = SubmitApplicationResultValue(appId)
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStreamApp.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStreamApp.scala
index b8d1f4c..f5b2910 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStreamApp.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStreamApp.scala
@@ -21,7 +21,7 @@
import java.util.Collection
import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.cluster.client.ClientContext
+import org.apache.gearpump.cluster.client.{ClientContext, RunningApplication}
import org.apache.gearpump.streaming.dsl.scalaapi.{CollectionDataSource, StreamApp}
import org.apache.gearpump.streaming.source.DataSource
@@ -42,7 +42,7 @@
new JavaStream[T](streamApp.source(dataSource, parallelism, conf, description))
}
- def submit(): Int = {
+ def submit(): RunningApplication = {
context.submit(streamApp)
}
}