SAMZA-2424: AM should cache and serve serialized Job Model to containers (#1241)

diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
index c7e7c7c..5a3f7b3 100644
--- a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
+++ b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
@@ -34,7 +34,6 @@
 import org.apache.samza.coordinator.stream.messages.SetTaskPartitionMapping
 import org.apache.samza.container.LocalityManager
 import org.apache.samza.container.TaskName
-import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore
 import org.apache.samza.coordinator.server.{HttpServer, JobServlet, LocalityServlet}
 import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping
 import org.apache.samza.job.model.ContainerModel
@@ -45,9 +44,10 @@
 import org.apache.samza.metrics.MetricsRegistry
 import org.apache.samza.metrics.MetricsRegistryMap
 import org.apache.samza.runtime.LocationId
+import org.apache.samza.serializers.model.SamzaObjectMapper
 import org.apache.samza.system._
 import org.apache.samza.util.ScalaJavaUtil.JavaOptionals
-import org.apache.samza.util.{ConfigUtil, Logging, ReflectionUtil, Util}
+import org.apache.samza.util.{ConfigUtil, Logging, ReflectionUtil}
 
 import scala.collection.JavaConverters
 import scala.collection.JavaConversions._
@@ -65,7 +65,7 @@
    * a volatile value to store the current instantiated <code>JobModelManager</code>
    */
   @volatile var currentJobModelManager: JobModelManager = _
-  val jobModelRef: AtomicReference[JobModel] = new AtomicReference[JobModel]()
+  val serializedJobModelRef = new AtomicReference[Array[Byte]]
 
   /**
    * Currently used only in the ApplicationMaster for yarn deployment model.
@@ -93,16 +93,18 @@
       val streamMetadataCache = new StreamMetadataCache(systemAdmins, 0)
       val grouperMetadata: GrouperMetadata = getGrouperMetadata(config, localityManager, taskAssignmentManager, taskPartitionAssignmentManager)
 
-      val jobModel: JobModel = readJobModel(config, changelogPartitionMapping, streamMetadataCache, grouperMetadata)
-      jobModelRef.set(new JobModel(jobModel.getConfig, jobModel.getContainers))
+      val jobModel = readJobModel(config, changelogPartitionMapping, streamMetadataCache, grouperMetadata)
+      val jobModelToServe = new JobModel(jobModel.getConfig, jobModel.getContainers)
+      val serializedJobModelToServe = SamzaObjectMapper.getObjectMapper().writeValueAsBytes(jobModelToServe)
+      serializedJobModelRef.set(serializedJobModelToServe)
 
       updateTaskAssignments(jobModel, taskAssignmentManager, taskPartitionAssignmentManager, grouperMetadata)
 
       val server = new HttpServer
-      server.addServlet("/", new JobServlet(jobModelRef))
+      server.addServlet("/", new JobServlet(serializedJobModelRef))
       server.addServlet("/locality", new LocalityServlet(localityManager))
 
-      currentJobModelManager = new JobModelManager(jobModelRef.get(), server)
+      currentJobModelManager = new JobModelManager(jobModelToServe, server)
       currentJobModelManager
     } finally {
       systemAdmins.stop()
diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/server/JobServlet.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/server/JobServlet.scala
index 5750c2d..c1f5b47 100644
--- a/samza-core/src/main/scala/org/apache/samza/coordinator/server/JobServlet.scala
+++ b/samza-core/src/main/scala/org/apache/samza/coordinator/server/JobServlet.scala
@@ -22,19 +22,22 @@
 
 import java.util.concurrent.atomic.AtomicReference
 
-import org.apache.samza.SamzaException
-import org.apache.samza.job.model.JobModel
-import org.apache.samza.util.Logging
+import javax.servlet.http.{HttpServlet, HttpServletRequest, HttpServletResponse}
 
 /**
- * A servlet that dumps the job model for a Samza job.
+ * Serves the JSON serialized job model for the job.
  */
-class JobServlet(jobModelRef: AtomicReference[JobModel]) extends ServletBase with Logging {
-  protected def getObjectToWrite() = {
+class JobServlet(jobModelRef: AtomicReference[Array[Byte]]) extends HttpServlet {
+  override protected def doGet(request: HttpServletRequest, response: HttpServletResponse) {
     val jobModel = jobModelRef.get()
-    if (jobModel == null) { // This should never happen because JobServlet is instantiated only after a jobModel is generated and its reference is updated
-      throw new SamzaException("Job Model is not defined in the JobCoordinator. This indicates that the Samza job is unstable. Exiting...")
+
+    // This should never happen because JobServlet is instantiated only after a jobModel is generated and its reference is updated
+    if (jobModel == null) {
+      throw new IllegalStateException("No JobModel to serve in the JobCoordinator.")
     }
-    jobModel
+
+    response.setContentType("application/json")
+    response.setStatus(HttpServletResponse.SC_OK)
+    response.getOutputStream.write(jobModel)
   }
 }
\ No newline at end of file
diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/server/ServletBase.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/server/ServletBase.scala
deleted file mode 100644
index 2732cca..0000000
--- a/samza-core/src/main/scala/org/apache/samza/coordinator/server/ServletBase.scala
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.coordinator.server;
-
-import java.io.IOException
-import javax.servlet.ServletException
-import javax.servlet.http.HttpServlet
-import javax.servlet.http.HttpServletRequest
-import javax.servlet.http.HttpServletResponse
-import org.codehaus.jackson.map.ObjectMapper;
-import org.apache.samza.serializers.model.SamzaObjectMapper
-
-/**
- * A simple servlet helper that makes it easy to dump objects to JSON.
- */
-trait ServletBase extends HttpServlet {
-  val mapper = SamzaObjectMapper.getObjectMapper()
-
-  override protected def doGet(request: HttpServletRequest, response: HttpServletResponse) {
-    response.setContentType("application/json")
-    response.setStatus(HttpServletResponse.SC_OK)
-    mapper.writeValue(response.getWriter(), getObjectToWrite())
-  }
-
-  /**
-   * Returns an object that should be fed to Jackson's ObjectMapper, and 
-   * returned as an HTTP response.
-   */
-  protected def getObjectToWrite(): Object
-}
diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
index a36e475..7d18033 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
@@ -22,6 +22,7 @@
 import java.util
 import java.util.concurrent.atomic.AtomicReference
 
+import javax.servlet.http.{HttpServlet, HttpServletRequest, HttpServletResponse}
 import org.apache.samza.Partition
 import org.apache.samza.config.{ClusterManagerConfig, Config, MapConfig}
 import org.apache.samza.context.{ApplicationContainerContext, ContainerContext}
@@ -29,6 +30,7 @@
 import org.apache.samza.coordinator.server.{HttpServer, JobServlet}
 import org.apache.samza.job.model.{ContainerModel, JobModel, TaskModel}
 import org.apache.samza.metrics.Gauge
+import org.apache.samza.serializers.model.SamzaObjectMapper
 import org.apache.samza.storage.ContainerStorageManager
 import org.apache.samza.system._
 import org.junit.Assert._
@@ -266,11 +268,11 @@
       "0" -> new ContainerModel("0", tasks),
       "1" -> new ContainerModel("1", tasks))
     val jobModel = new JobModel(config, containers)
-    def jobModelGenerator(): JobModel = jobModel
+    def jobModelGenerator(): Array[Byte] = SamzaObjectMapper.getObjectMapper.writeValueAsBytes(jobModel)
     val server = new HttpServer
     val coordinator = new JobModelManager(jobModel, server)
-    JobModelManager.jobModelRef.set(jobModelGenerator())
-    coordinator.server.addServlet("/*", new JobServlet(JobModelManager.jobModelRef))
+    JobModelManager.serializedJobModelRef.set(jobModelGenerator())
+    coordinator.server.addServlet("/*", new JobServlet(JobModelManager.serializedJobModelRef))
     try {
       coordinator.start
       assertEquals(jobModel, SamzaContainer.readJobModel(server.getUrl.toString))
@@ -291,11 +293,11 @@
       "0" -> new ContainerModel("0", tasks),
       "1" -> new ContainerModel("1", tasks))
     val jobModel = new JobModel(config, containers)
-    def jobModelGenerator(): JobModel = jobModel
+    def jobModelGenerator(): Array[Byte] = SamzaObjectMapper.getObjectMapper.writeValueAsBytes(jobModel)
     val server = new HttpServer
     val coordinator = new JobModelManager(jobModel, server)
-    JobModelManager.jobModelRef.set(jobModelGenerator())
-    val mockJobServlet = new MockJobServlet(2, JobModelManager.jobModelRef)
+    JobModelManager.serializedJobModelRef.set(jobModelGenerator())
+    val mockJobServlet = new MockJobServlet(2, JobModelManager.serializedJobModelRef)
     coordinator.server.addServlet("/*", mockJobServlet)
     try {
       coordinator.start
@@ -371,16 +373,24 @@
     this.samzaContainer.setContainerListener(this.samzaContainerListener)
   }
 
-  class MockJobServlet(exceptionLimit: Int, jobModelRef: AtomicReference[JobModel]) extends JobServlet(jobModelRef) {
+  class MockJobServlet(exceptionLimit: Int, jobModelRef: AtomicReference[Array[Byte]]) extends HttpServlet {
     var exceptionCount = 0
 
-    override protected def getObjectToWrite(): JobModel = {
+    override protected def doGet(request: HttpServletRequest, response: HttpServletResponse) {
       if (exceptionCount < exceptionLimit) {
         exceptionCount += 1
         throw new java.io.IOException("Throwing exception")
       } else {
         val jobModel = jobModelRef.get()
-        jobModel
+
+        // This should never happen because JobServlet is instantiated only after a jobModel is generated and its reference is updated
+        if (jobModel == null) {
+          throw new IllegalStateException("No JobModel to serve in the JobCoordinator.")
+        }
+
+        response.setContentType("application/json")
+        response.setStatus(HttpServletResponse.SC_OK)
+        response.getOutputStream.write(jobModelRef.get())
       }
     }
   }
diff --git a/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala b/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
index 26ee97d..123872b 100644
--- a/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
+++ b/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
@@ -107,9 +107,10 @@
     val expectedJobModel = new JobModel(new MapConfig(config.asJava), containers.asJava)
 
     // Verify that the atomicReference is initialized
-    assertNotNull(JobModelManager.jobModelRef.get())
+    assertNotNull(JobModelManager.serializedJobModelRef.get())
     val expectedContainerModels = new util.TreeMap[String, ContainerModel](expectedJobModel.getContainers)
-    val actualContainerModels = new util.TreeMap[String, ContainerModel](JobModelManager.jobModelRef.get().getContainers)
+    val jobModel = SamzaObjectMapper.getObjectMapper.readValue(JobModelManager.serializedJobModelRef.get(), classOf[JobModel])
+    val actualContainerModels = new util.TreeMap[String, ContainerModel](jobModel.getContainers)
     assertEquals(expectedContainerModels, actualContainerModels)
 
     coordinator.start
diff --git a/samza-core/src/test/scala/org/apache/samza/coordinator/server/TestHttpServer.scala b/samza-core/src/test/scala/org/apache/samza/coordinator/server/TestHttpServer.scala
index 6ca4070..4ebad53 100644
--- a/samza-core/src/test/scala/org/apache/samza/coordinator/server/TestHttpServer.scala
+++ b/samza-core/src/test/scala/org/apache/samza/coordinator/server/TestHttpServer.scala
@@ -19,10 +19,12 @@
 
 package org.apache.samza.coordinator.server
 
+import java.net.URL
+
+import javax.servlet.http.{HttpServlet, HttpServletRequest, HttpServletResponse}
 import org.apache.samza.util.{HttpUtil, Util}
 import org.junit.Assert._
 import org.junit.Test
-import java.net.URL
 
 class TestHttpServer {
   @Test
@@ -32,7 +34,7 @@
       server.addServlet("/basic", new BasicServlet())
       server.start
       val body = HttpUtil.read(new URL(server.getUrl + "/basic"))
-      assertEquals("{\"foo\":\"bar\"}", body)
+      assertEquals("{}", body)
       val css = HttpUtil.read(new URL(server.getUrl + "/css/ropa-sans.css"))
       assertTrue(css.contains("RopaSans"))
     } finally {
@@ -53,10 +55,10 @@
   }
 }
 
-class BasicServlet extends ServletBase {
-  def getObjectToWrite = {
-    val map = new java.util.HashMap[String, String]()
-    map.put("foo", "bar")
-    map
+class BasicServlet extends HttpServlet {
+  override protected def doGet(request: HttpServletRequest, response: HttpServletResponse) {
+    response.setContentType("application/json")
+    response.setStatus(HttpServletResponse.SC_OK)
+    response.getWriter.write("{}")
   }
 }
\ No newline at end of file