SAMZA-2424: AM should cache and serve serialized Job Model to containers (#1241)
diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
index c7e7c7c..5a3f7b3 100644
--- a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
+++ b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
@@ -34,7 +34,6 @@
import org.apache.samza.coordinator.stream.messages.SetTaskPartitionMapping
import org.apache.samza.container.LocalityManager
import org.apache.samza.container.TaskName
-import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore
import org.apache.samza.coordinator.server.{HttpServer, JobServlet, LocalityServlet}
import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping
import org.apache.samza.job.model.ContainerModel
@@ -45,9 +44,10 @@
import org.apache.samza.metrics.MetricsRegistry
import org.apache.samza.metrics.MetricsRegistryMap
import org.apache.samza.runtime.LocationId
+import org.apache.samza.serializers.model.SamzaObjectMapper
import org.apache.samza.system._
import org.apache.samza.util.ScalaJavaUtil.JavaOptionals
-import org.apache.samza.util.{ConfigUtil, Logging, ReflectionUtil, Util}
+import org.apache.samza.util.{ConfigUtil, Logging, ReflectionUtil}
import scala.collection.JavaConverters
import scala.collection.JavaConversions._
@@ -65,7 +65,7 @@
* a volatile value to store the current instantiated <code>JobModelManager</code>
*/
@volatile var currentJobModelManager: JobModelManager = _
- val jobModelRef: AtomicReference[JobModel] = new AtomicReference[JobModel]()
+ val serializedJobModelRef: AtomicReference[Array[Byte]] = new AtomicReference[Array[Byte]]() // serialized job model bytes handed to JobServlet
/**
* Currently used only in the ApplicationMaster for yarn deployment model.
@@ -93,16 +93,18 @@
val streamMetadataCache = new StreamMetadataCache(systemAdmins, 0)
val grouperMetadata: GrouperMetadata = getGrouperMetadata(config, localityManager, taskAssignmentManager, taskPartitionAssignmentManager)
- val jobModel: JobModel = readJobModel(config, changelogPartitionMapping, streamMetadataCache, grouperMetadata)
- jobModelRef.set(new JobModel(jobModel.getConfig, jobModel.getContainers))
+ val jobModel = readJobModel(config, changelogPartitionMapping, streamMetadataCache, grouperMetadata)
+ val jobModelToServe = new JobModel(jobModel.getConfig, jobModel.getContainers)
+ val serializedJobModelToServe = SamzaObjectMapper.getObjectMapper().writeValueAsBytes(jobModelToServe)
+ serializedJobModelRef.set(serializedJobModelToServe)
updateTaskAssignments(jobModel, taskAssignmentManager, taskPartitionAssignmentManager, grouperMetadata)
val server = new HttpServer
- server.addServlet("/", new JobServlet(jobModelRef))
+ server.addServlet("/", new JobServlet(serializedJobModelRef))
server.addServlet("/locality", new LocalityServlet(localityManager))
- currentJobModelManager = new JobModelManager(jobModelRef.get(), server)
+ currentJobModelManager = new JobModelManager(jobModelToServe, server)
currentJobModelManager
} finally {
systemAdmins.stop()
diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/server/JobServlet.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/server/JobServlet.scala
index 5750c2d..c1f5b47 100644
--- a/samza-core/src/main/scala/org/apache/samza/coordinator/server/JobServlet.scala
+++ b/samza-core/src/main/scala/org/apache/samza/coordinator/server/JobServlet.scala
@@ -22,19 +22,22 @@
import java.util.concurrent.atomic.AtomicReference
-import org.apache.samza.SamzaException
-import org.apache.samza.job.model.JobModel
-import org.apache.samza.util.Logging
+import javax.servlet.http.{HttpServlet, HttpServletRequest, HttpServletResponse}
/**
- * A servlet that dumps the job model for a Samza job.
+ * Serves the already-serialized JSON job model bytes held by the given reference on every GET; fails if the reference is empty.
*/
-class JobServlet(jobModelRef: AtomicReference[JobModel]) extends ServletBase with Logging {
- protected def getObjectToWrite() = {
+class JobServlet(jobModelRef: AtomicReference[Array[Byte]]) extends HttpServlet {
+ override protected def doGet(request: HttpServletRequest, response: HttpServletResponse): Unit = {
val jobModel = jobModelRef.get()
- if (jobModel == null) { // This should never happen because JobServlet is instantiated only after a jobModel is generated and its reference is updated
- throw new SamzaException("Job Model is not defined in the JobCoordinator. This indicates that the Samza job is unstable. Exiting...")
+
+ // This should never happen because JobServlet is instantiated only after a jobModel is generated and its reference is updated
+ if (jobModel == null) {
+ throw new IllegalStateException("No JobModel to serve in the JobCoordinator.")
}
- jobModel
+
+ response.setContentType("application/json")
+ response.setStatus(HttpServletResponse.SC_OK)
+ response.getOutputStream.write(jobModel)
}
}
\ No newline at end of file
diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/server/ServletBase.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/server/ServletBase.scala
deleted file mode 100644
index 2732cca..0000000
--- a/samza-core/src/main/scala/org/apache/samza/coordinator/server/ServletBase.scala
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.coordinator.server;
-
-import java.io.IOException
-import javax.servlet.ServletException
-import javax.servlet.http.HttpServlet
-import javax.servlet.http.HttpServletRequest
-import javax.servlet.http.HttpServletResponse
-import org.codehaus.jackson.map.ObjectMapper;
-import org.apache.samza.serializers.model.SamzaObjectMapper
-
-/**
- * A simple servlet helper that makes it easy to dump objects to JSON.
- */
-trait ServletBase extends HttpServlet {
- val mapper = SamzaObjectMapper.getObjectMapper()
-
- override protected def doGet(request: HttpServletRequest, response: HttpServletResponse) {
- response.setContentType("application/json")
- response.setStatus(HttpServletResponse.SC_OK)
- mapper.writeValue(response.getWriter(), getObjectToWrite())
- }
-
- /**
- * Returns an object that should be fed to Jackson's ObjectMapper, and
- * returned as an HTTP response.
- */
- protected def getObjectToWrite(): Object
-}
diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
index a36e475..7d18033 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
@@ -22,6 +22,7 @@
import java.util
import java.util.concurrent.atomic.AtomicReference
+import javax.servlet.http.{HttpServlet, HttpServletRequest, HttpServletResponse}
import org.apache.samza.Partition
import org.apache.samza.config.{ClusterManagerConfig, Config, MapConfig}
import org.apache.samza.context.{ApplicationContainerContext, ContainerContext}
@@ -29,6 +30,7 @@
import org.apache.samza.coordinator.server.{HttpServer, JobServlet}
import org.apache.samza.job.model.{ContainerModel, JobModel, TaskModel}
import org.apache.samza.metrics.Gauge
+import org.apache.samza.serializers.model.SamzaObjectMapper
import org.apache.samza.storage.ContainerStorageManager
import org.apache.samza.system._
import org.junit.Assert._
@@ -266,11 +268,11 @@
"0" -> new ContainerModel("0", tasks),
"1" -> new ContainerModel("1", tasks))
val jobModel = new JobModel(config, containers)
- def jobModelGenerator(): JobModel = jobModel
+ def jobModelGenerator(): Array[Byte] = SamzaObjectMapper.getObjectMapper.writeValueAsBytes(jobModel)
val server = new HttpServer
val coordinator = new JobModelManager(jobModel, server)
- JobModelManager.jobModelRef.set(jobModelGenerator())
- coordinator.server.addServlet("/*", new JobServlet(JobModelManager.jobModelRef))
+ JobModelManager.serializedJobModelRef.set(jobModelGenerator())
+ coordinator.server.addServlet("/*", new JobServlet(JobModelManager.serializedJobModelRef))
try {
coordinator.start
assertEquals(jobModel, SamzaContainer.readJobModel(server.getUrl.toString))
@@ -291,11 +293,11 @@
"0" -> new ContainerModel("0", tasks),
"1" -> new ContainerModel("1", tasks))
val jobModel = new JobModel(config, containers)
- def jobModelGenerator(): JobModel = jobModel
+ def jobModelGenerator(): Array[Byte] = SamzaObjectMapper.getObjectMapper.writeValueAsBytes(jobModel)
val server = new HttpServer
val coordinator = new JobModelManager(jobModel, server)
- JobModelManager.jobModelRef.set(jobModelGenerator())
- val mockJobServlet = new MockJobServlet(2, JobModelManager.jobModelRef)
+ JobModelManager.serializedJobModelRef.set(jobModelGenerator())
+ val mockJobServlet = new MockJobServlet(2, JobModelManager.serializedJobModelRef)
coordinator.server.addServlet("/*", mockJobServlet)
try {
coordinator.start
@@ -371,16 +373,24 @@
this.samzaContainer.setContainerListener(this.samzaContainerListener)
}
- class MockJobServlet(exceptionLimit: Int, jobModelRef: AtomicReference[JobModel]) extends JobServlet(jobModelRef) {
+ class MockJobServlet(exceptionLimit: Int, jobModelRef: AtomicReference[Array[Byte]]) extends HttpServlet {
var exceptionCount = 0
- override protected def getObjectToWrite(): JobModel = {
+ override protected def doGet(request: HttpServletRequest, response: HttpServletResponse): Unit = {
if (exceptionCount < exceptionLimit) {
exceptionCount += 1
throw new java.io.IOException("Throwing exception")
} else {
val jobModel = jobModelRef.get()
- jobModel
+
+ // Same null guard as JobServlet: the reference is expected to be populated before this servlet is registered
+ if (jobModel == null) {
+ throw new IllegalStateException("No JobModel to serve in the JobCoordinator.")
+ }
+
+ response.setContentType("application/json")
+ response.setStatus(HttpServletResponse.SC_OK)
+ response.getOutputStream.write(jobModel)
}
}
}
diff --git a/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala b/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
index 26ee97d..123872b 100644
--- a/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
+++ b/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
@@ -107,9 +107,10 @@
val expectedJobModel = new JobModel(new MapConfig(config.asJava), containers.asJava)
// Verify that the atomicReference is initialized
- assertNotNull(JobModelManager.jobModelRef.get())
+ assertNotNull(JobModelManager.serializedJobModelRef.get())
val expectedContainerModels = new util.TreeMap[String, ContainerModel](expectedJobModel.getContainers)
- val actualContainerModels = new util.TreeMap[String, ContainerModel](JobModelManager.jobModelRef.get().getContainers)
+ val jobModel = SamzaObjectMapper.getObjectMapper.readValue(JobModelManager.serializedJobModelRef.get(), classOf[JobModel])
+ val actualContainerModels = new util.TreeMap[String, ContainerModel](jobModel.getContainers)
assertEquals(expectedContainerModels, actualContainerModels)
coordinator.start
diff --git a/samza-core/src/test/scala/org/apache/samza/coordinator/server/TestHttpServer.scala b/samza-core/src/test/scala/org/apache/samza/coordinator/server/TestHttpServer.scala
index 6ca4070..4ebad53 100644
--- a/samza-core/src/test/scala/org/apache/samza/coordinator/server/TestHttpServer.scala
+++ b/samza-core/src/test/scala/org/apache/samza/coordinator/server/TestHttpServer.scala
@@ -19,10 +19,12 @@
package org.apache.samza.coordinator.server
+import java.net.URL
+
+import javax.servlet.http.{HttpServlet, HttpServletRequest, HttpServletResponse}
import org.apache.samza.util.{HttpUtil, Util}
import org.junit.Assert._
import org.junit.Test
-import java.net.URL
class TestHttpServer {
@Test
@@ -32,7 +34,7 @@
server.addServlet("/basic", new BasicServlet())
server.start
val body = HttpUtil.read(new URL(server.getUrl + "/basic"))
- assertEquals("{\"foo\":\"bar\"}", body)
+ assertEquals("{}", body)
val css = HttpUtil.read(new URL(server.getUrl + "/css/ropa-sans.css"))
assertTrue(css.contains("RopaSans"))
} finally {
@@ -53,10 +55,10 @@
}
}
-class BasicServlet extends ServletBase {
- def getObjectToWrite = {
- val map = new java.util.HashMap[String, String]()
- map.put("foo", "bar")
- map
+class BasicServlet extends HttpServlet { // Test servlet: answers every GET with the empty JSON object "{}"
+ override protected def doGet(request: HttpServletRequest, response: HttpServletResponse): Unit = {
+ response.setContentType("application/json")
+ response.setStatus(HttpServletResponse.SC_OK)
+ response.getWriter.write("{}")
}
}
\ No newline at end of file