blob: 5a3f7b34224cac6f32ccfd8260013faf58b156ee [file] [log] [blame]
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.samza.coordinator
import java.util
import java.util.concurrent.atomic.AtomicReference
import org.apache.samza.{Partition, SamzaException}
import org.apache.samza.config._
import org.apache.samza.config.Config
import org.apache.samza.container.grouper.task._
import org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore
import org.apache.samza.container.LocalityManager
import org.apache.samza.container.TaskName
import org.apache.samza.coordinator.server.{HttpServer, JobServlet, LocalityServlet}
import org.apache.samza.job.model.ContainerModel
import org.apache.samza.job.model.JobModel
import org.apache.samza.job.model.TaskMode
import org.apache.samza.job.model.TaskModel
import org.apache.samza.metadatastore.MetadataStore
import org.apache.samza.metrics.MetricsRegistry
import org.apache.samza.metrics.MetricsRegistryMap
import org.apache.samza.runtime.LocationId
import org.apache.samza.serializers.model.SamzaObjectMapper
import org.apache.samza.system._
import org.apache.samza.util.ScalaJavaUtil.JavaOptionals
import org.apache.samza.util.{ConfigUtil, Logging, ReflectionUtil}
import scala.collection.JavaConverters
import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
/**
 * Helper companion object that is responsible for wiring up a JobModelManager
 * given a Config object.
 */
object JobModelManager extends Logging {
  val SOURCE = "JobModelManager"

  /**
   * a volatile value to store the current instantiated <code>JobModelManager</code>
   */
  @volatile var currentJobModelManager: JobModelManager = _

  // Holds the serialized job model served to containers via the HTTP JobServlet.
  val serializedJobModelRef = new AtomicReference[Array[Byte]]
* Currently used only in the ApplicationMaster for yarn deployment model.
* Does the following:
* a) Reads the jobModel from coordinator stream using the job's configuration.
* b) Recomputes the changelog partition mapping based on jobModel and job's configuration.
* c) Builds JobModelManager using the jobModel read from coordinator stream.
* @param config config from the coordinator stream.
* @param changelogPartitionMapping changelog partition-to-task mapping of the samza job.
* @param metricsRegistry the registry for reporting metrics.
* @return the instantiated {@see JobModelManager}.
def apply(config: Config, changelogPartitionMapping: util.Map[TaskName, Integer],
metadataStore: MetadataStore,
metricsRegistry: MetricsRegistry = new MetricsRegistryMap()): JobModelManager = {
// Instantiate the respective metadata store util classes which uses the same coordinator metadata store.
val localityManager = new LocalityManager(new NamespaceAwareCoordinatorStreamStore(metadataStore, SetContainerHostMapping.TYPE))
val taskAssignmentManager = new TaskAssignmentManager(new NamespaceAwareCoordinatorStreamStore(metadataStore, SetTaskContainerMapping.TYPE), new NamespaceAwareCoordinatorStreamStore(metadataStore, SetTaskModeMapping.TYPE))
val taskPartitionAssignmentManager = new TaskPartitionAssignmentManager(new NamespaceAwareCoordinatorStreamStore(metadataStore, SetTaskPartitionMapping.TYPE))
val systemAdmins = new SystemAdmins(config)
try {
val streamMetadataCache = new StreamMetadataCache(systemAdmins, 0)
val grouperMetadata: GrouperMetadata = getGrouperMetadata(config, localityManager, taskAssignmentManager, taskPartitionAssignmentManager)
val jobModel = readJobModel(config, changelogPartitionMapping, streamMetadataCache, grouperMetadata)
val jobModelToServe = new JobModel(jobModel.getConfig, jobModel.getContainers)
val serializedJobModelToServe = SamzaObjectMapper.getObjectMapper().writeValueAsBytes(jobModelToServe)
updateTaskAssignments(jobModel, taskAssignmentManager, taskPartitionAssignmentManager, grouperMetadata)
val server = new HttpServer
server.addServlet("/", new JobServlet(serializedJobModelRef))
server.addServlet("/locality", new LocalityServlet(localityManager))
currentJobModelManager = new JobModelManager(jobModelToServe, server)
} finally {
// Not closing coordinatorStreamStore, since {@code ClusterBasedJobCoordinator} uses it to read container locality through {@code JobModel}.
* Builds the {@see GrouperMetadataImpl} for the samza job.
* @param config represents the configurations defined by the user.
* @param localityManager provides the processor to host mapping persisted to the metadata store.
* @param taskAssignmentManager provides the processor to task assignments persisted to the metadata store.
* @param taskPartitionAssignmentManager provides the task to partition assignments persisted to the metadata store.
* @return the instantiated {@see GrouperMetadata}.
def getGrouperMetadata(config: Config, localityManager: LocalityManager, taskAssignmentManager: TaskAssignmentManager, taskPartitionAssignmentManager: TaskPartitionAssignmentManager) = {
val processorLocality: util.Map[String, LocationId] = getProcessorLocality(config, localityManager)
val taskModes: util.Map[TaskName, TaskMode] = taskAssignmentManager.readTaskModes()
// We read the taskAssignment only for ActiveTasks, i.e., tasks that have no task-mode or have an active task mode
val taskAssignment: util.Map[String, String] = taskAssignmentManager.readTaskAssignment().
filterKeys(taskName => !taskModes.containsKey(new TaskName(taskName)) || taskModes.get(new TaskName(taskName)).eq(TaskMode.Active))
val taskNameToProcessorId: util.Map[TaskName, String] = new util.HashMap[TaskName, String]()
for ((taskName, processorId) <- taskAssignment) {
taskNameToProcessorId.put(new TaskName(taskName), processorId)
val taskLocality: util.Map[TaskName, LocationId] = new util.HashMap[TaskName, LocationId]()
for ((taskName, processorId) <- taskAssignment) {
if (processorLocality.containsKey(processorId)) {
taskLocality.put(new TaskName(taskName), processorLocality.get(processorId))
val sspToTaskMapping: util.Map[SystemStreamPartition, util.List[String]] = taskPartitionAssignmentManager.readTaskPartitionAssignments()
val taskPartitionAssignments: util.Map[TaskName, util.List[SystemStreamPartition]] = new util.HashMap[TaskName, util.List[SystemStreamPartition]]()
// Task to partition assignments is stored as {@see SystemStreamPartition} to list of {@see TaskName} in
// coordinator stream. This is done due to the 1 MB value size limit in a kafka topic. Conversion to
// taskName to SystemStreamPartitions is done here to wire-in the data to {@see JobModel}.
sspToTaskMapping foreach { case (systemStreamPartition: SystemStreamPartition, taskNames: util.List[String]) =>
for (task <- taskNames) {
val taskName: TaskName = new TaskName(task)
// We read the partition assignments only for active-tasks, i.e., tasks that have no task-mode or have an active task mode
if (!taskModes.containsKey(taskName) || taskModes.get(taskName).eq(TaskMode.Active)) {
taskPartitionAssignments.putIfAbsent(taskName, new util.ArrayList[SystemStreamPartition]())
new GrouperMetadataImpl(processorLocality, taskLocality, taskPartitionAssignments, taskNameToProcessorId)
* Retrieves and returns the processor locality of a samza job using provided {@see Config} and {@see LocalityManager}.
* @param config provides the configurations defined by the user. Required to connect to the storage layer.
* @param localityManager provides the processor to host mapping persisted to the metadata store.
* @return the processor locality.
def getProcessorLocality(config: Config, localityManager: LocalityManager) = {
val containerToLocationId: util.Map[String, LocationId] = new util.HashMap[String, LocationId]()
val existingContainerLocality = localityManager.readLocality().getProcessorLocalities
for (containerId <- 0 until new JobConfig(config).getContainerCount) {
val preferredHost = Option.apply(existingContainerLocality.get(containerId.toString))
.map(containerLocality =>
.filter(host => host.nonEmpty)
// To handle the case when the container count is increased between two different runs of a samza-yarn job,
// set the locality of newly added containers to any_host.
var locationId: LocationId = new LocationId("ANY_HOST")
if (preferredHost != null) {
locationId = new LocationId(preferredHost)
containerToLocationId.put(containerId.toString, locationId)
* This method does the following:
* 1. Deletes the existing task assignments if the partition-task grouping has changed from the previous run of the job.
* 2. Saves the newly generated task assignments to the storage layer through the {@param TaskAssignementManager}.
* @param jobModel represents the {@see JobModel} of the samza job.
* @param taskAssignmentManager required to persist the processor to task assignments to the metadata store.
* @param taskPartitionAssignmentManager required to persist the task to partition assignments to the metadata store.
* @param grouperMetadata provides the historical metadata of the samza application.
def updateTaskAssignments(jobModel: JobModel,
taskAssignmentManager: TaskAssignmentManager,
taskPartitionAssignmentManager: TaskPartitionAssignmentManager,
grouperMetadata: GrouperMetadata): Unit = {
info("Storing the task assignments into metadata store.")
val activeTaskNames: util.Set[String] = new util.HashSet[String]()
val standbyTaskNames: util.Set[String] = new util.HashSet[String]()
val systemStreamPartitions: util.Set[SystemStreamPartition] = new util.HashSet[SystemStreamPartition]()
for (container <- jobModel.getContainers.values()) {
for (taskModel <- container.getTasks.values()) {
if(taskModel.getTaskMode.eq(TaskMode.Active)) {
if(taskModel.getTaskMode.eq(TaskMode.Standby)) {
val previousTaskToContainerId = grouperMetadata.getPreviousTaskToProcessorAssignment
if (activeTaskNames.size() != previousTaskToContainerId.size()) {
warn("Current task count %s does not match saved task count %s. Stateful jobs may observe misalignment of keys!"
format (activeTaskNames.size(), previousTaskToContainerId.size()))
// If the tasks changed, then the partition-task grouping is also likely changed and we can't handle that
// without a much more complicated mapping. Further, the partition count may have changed, which means
// input message keys are likely reshuffled w.r.t. partitions, so the local state may not contain necessary
// data associated with the incoming keys. Warn the user and default to grouper
// In this scenario the tasks may have been reduced, so we need to delete all the existing messages
taskAssignmentManager.deleteTaskContainerMappings( => taskName.getTaskName).asJava)
// if the set of standby tasks has changed, e.g., when the replication-factor changed, or the active-tasks-set has
// changed, we log a warning and delete the existing mapping for these tasks
val previousStandbyTasks = taskAssignmentManager.readTaskModes().filter(x => x._2.eq(TaskMode.Standby))
if(standbyTaskNames.asScala.eq(previousStandbyTasks.keySet)) {
info("The set of standby tasks has changed, current standby tasks %s, previous standby tasks %s" format (standbyTaskNames, previousStandbyTasks.keySet))
taskAssignmentManager.deleteTaskContainerMappings( => x._1.getTaskName).asJava)
// Task to partition assignments is stored as {@see SystemStreamPartition} to list of {@see TaskName} in
// coordinator stream. This is done due to the 1 MB value size limit in a kafka topic. Conversion to
// taskName to SystemStreamPartitions is done here to wire-in the data to {@see JobModel}.
val sspToTaskNameMap: util.Map[SystemStreamPartition, util.List[String]] = new util.HashMap[SystemStreamPartition, util.List[String]]()
val taskContainerMappings: util.Map[String, util.Map[String, TaskMode]] = new util.HashMap[String, util.Map[String, TaskMode]]()
for (container <- jobModel.getContainers.values()) {
for ((taskName, taskModel) <- container.getTasks) {
taskContainerMappings.putIfAbsent(container.getId, new util.HashMap[String, TaskMode]())
taskContainerMappings.get(container.getId).put(taskName.getTaskName, container.getTasks.get(taskName).getTaskMode)
for (partition <- taskModel.getSystemStreamPartitions) {
if (!sspToTaskNameMap.containsKey(partition)) {
sspToTaskNameMap.put(partition, new util.ArrayList[String]())
* Computes the input system stream partitions of a samza job using the provided {@param config}
* and {@param streamMetadataCache}.
* @param config the configuration of the job.
* @param streamMetadataCache to query the partition metadata of the input streams.
* @return the input {@see SystemStreamPartition} of the samza job.
private def getInputStreamPartitions(config: Config, streamMetadataCache: StreamMetadataCache): Set[SystemStreamPartition] = {
val taskConfig = new TaskConfig(config)
// Expand regex input, if a regex-rewriter is defined in config
val inputSystemStreams =
// Get the set of partitions for each SystemStream from the stream metadata
.getStreamMetadata(inputSystemStreams, partitionsMetadataOnly = true)
.flatMap {
case (systemStream, metadata) =>
.map(new SystemStreamPartition(systemStream, _))
* Builds the input {@see SystemStreamPartition} based upon the {@param config} defined by the user.
* @param config configuration to fetch the metadata of the input streams.
* @param streamMetadataCache required to query the partition metadata of the input streams.
* @return the input SystemStreamPartitions of the job.
private def getMatchedInputStreamPartitions(config: Config, streamMetadataCache: StreamMetadataCache):
Set[SystemStreamPartition] = {
val allSystemStreamPartitions = getInputStreamPartitions(config, streamMetadataCache)
val jobConfig = new JobConfig(config)
JavaOptionals.toRichOptional(jobConfig.getSSPMatcherClass).toOption match {
case Some(sspMatcherClassName) =>
val jfr = jobConfig.getSSPMatcherConfigJobFactoryRegex.r
JavaOptionals.toRichOptional(jobConfig.getStreamJobFactoryClass).toOption match {
case Some(jfr(_*)) =>
info("before match: allSystemStreamPartitions.size = %s" format allSystemStreamPartitions.size)
val sspMatcher = ReflectionUtil.getObj(sspMatcherClassName, classOf[SystemStreamPartitionMatcher])
val matchedPartitions = sspMatcher.filter(allSystemStreamPartitions.asJava, config).asScala.toSet
// Usually a small set hence ok to log at info level
info("after match: matchedPartitions = %s" format matchedPartitions)
case _ => allSystemStreamPartitions
case _ => allSystemStreamPartitions
* Finds the {@see SystemStreamPartitionGrouperFactory} from the {@param config}. Instantiates the {@see SystemStreamPartitionGrouper}
* object through the factory.
* @param config the configuration of the samza job.
* @return the instantiated {@see SystemStreamPartitionGrouper}.
private def getSystemStreamPartitionGrouper(config: Config) = {
val factoryString = new JobConfig(config).getSystemStreamPartitionGrouperFactory
val factory = ReflectionUtil.getObj(factoryString, classOf[SystemStreamPartitionGrouperFactory])
* Refresh Kafka topic list used as input streams if enabled {@link org.apache.samza.config.RegExTopicGenerator}
* @param config Samza job config
* @return refreshed config
private def refreshConfigByRegexTopicRewriter(config: Config): Config = {
val jobConfig = new JobConfig(config)
JavaOptionals.toRichOptional(jobConfig.getConfigRewriters).toOption match {
case Some(rewriters) => rewriters.split(",").
filter(rewriterName => JavaOptionals.toRichOptional(jobConfig.getConfigRewriterClass(rewriterName)).toOption
.getOrElse(throw new SamzaException("Unable to find class config for config rewriter %s." format rewriterName))
foldLeft(config)(ConfigUtil.applyRewriter(_, _))
case _ => config
* Does the following:
* 1. Fetches metadata of the input streams defined in configuration through {@param streamMetadataCache}.
* 2. Applies the {@see SystemStreamPartitionGrouper}, {@see TaskNameGrouper} defined in the configuration
* to build the {@see JobModel}.
* @param originalConfig the configuration of the job.
* @param changeLogPartitionMapping the task to changelog partition mapping of the job.
* @param streamMetadataCache the cache that holds the partition metadata of the input streams.
* @param grouperMetadata provides the historical metadata of the application.
* @return the built {@see JobModel}.
def readJobModel(originalConfig: Config,
changeLogPartitionMapping: util.Map[TaskName, Integer],
streamMetadataCache: StreamMetadataCache,
grouperMetadata: GrouperMetadata): JobModel = {
// refresh config if enabled regex topic rewriter
val config = refreshConfigByRegexTopicRewriter(originalConfig)
val taskConfig = new TaskConfig(config)
// Do grouping to fetch TaskName to SSP mapping
val allSystemStreamPartitions = getMatchedInputStreamPartitions(config, streamMetadataCache)
// processor list is required by some of the groupers. So, let's pass them as part of the config.
// Copy the config and add the processor list to the config copy.
// TODO: It is non-ideal to have config as a medium to transmit the locality information; especially, if the locality information evolves. Evaluate options on using context objects to pass dependent components.
val configMap = new util.HashMap[String, String](config)
configMap.put(JobConfig.PROCESSOR_LIST, String.join(",", grouperMetadata.getProcessorLocality.keySet()))
val grouper = getSystemStreamPartitionGrouper(new MapConfig(configMap))
val jobConfig = new JobConfig(config)
val groups: util.Map[TaskName, util.Set[SystemStreamPartition]] = if (jobConfig.isSSPGrouperProxyEnabled) {
val sspGrouperProxy: SSPGrouperProxy = new SSPGrouperProxy(config, grouper), grouperMetadata)
} else {
warn("SSPGrouperProxy is disabled (%s = false). Stateful jobs may produce erroneous results if this is not enabled." format JobConfig.SSP_INPUT_EXPANSION_ENABLED)
info("SystemStreamPartitionGrouper %s has grouped the SystemStreamPartitions into %d tasks with the following taskNames: %s" format(grouper, groups.size(), groups))
// If no mappings are present(first time the job is running) we return -1, this will allow 0 to be the first change
// mapping.
var maxChangelogPartitionId =
// Sort the groups prior to assigning the changelog mapping so that the mapping is reproducible and intuitive
val sortedGroups = new util.TreeMap[TaskName, util.Set[SystemStreamPartition]](groups)
// Assign all SystemStreamPartitions to TaskNames.
val taskModels = { { case (taskName, systemStreamPartitions) =>
val changelogPartition = Option(changeLogPartitionMapping.get(taskName)) match {
case Some(changelogPartitionId) => new Partition(changelogPartitionId)
case _ =>
// If we've never seen this TaskName before, then assign it a
// new changelog.
maxChangelogPartitionId += 1
info("New task %s is being assigned changelog partition %s." format(taskName, maxChangelogPartitionId))
new Partition(maxChangelogPartitionId)
new TaskModel(taskName, systemStreamPartitions, changelogPartition)
// Here is where we should put in a pluggable option for the
// SSPTaskNameGrouper for locality, load-balancing, etc.
val containerGrouperFactory =
ReflectionUtil.getObj(taskConfig.getTaskNameGrouperFactory, classOf[TaskNameGrouperFactory])
val standbyTasksEnabled = jobConfig.getStandbyTasksEnabled
val standbyTaskReplicationFactor = jobConfig.getStandbyTaskReplicationFactor
val taskNameGrouperProxy = new TaskNameGrouperProxy(, standbyTasksEnabled, standbyTaskReplicationFactor)
var containerModels: util.Set[ContainerModel] = null
val isHostAffinityEnabled = new ClusterManagerConfig(config).getHostAffinityEnabled
if(isHostAffinityEnabled) {
containerModels =, grouperMetadata)
} else {
containerModels =, new util.ArrayList[String](grouperMetadata.getProcessorLocality.keySet()))
val containerMap = => containerModel.getId -> containerModel).toMap
new JobModel(config, containerMap.asJava)
/**
 * <p>JobModelManager is responsible for managing the lifecycle of a Samza job
 * once it's been started. This includes starting and stopping containers,
 * managing configuration, etc.</p>
 *
 * <p>Any new cluster manager that's integrated with Samza (YARN, Mesos, etc)
 * must integrate with the job coordinator.</p>
 *
 * <p>This class' API is currently unstable, and likely to change. The
 * responsibility is simply to propagate the job model, and HTTP
 * server right now.</p>
 */
class JobModelManager(
  /**
   * The data model that describes the Samza job's containers and tasks.
   */
  val jobModel: JobModel,

  /**
   * HTTP server used to serve a Samza job's container model to SamzaContainers when they start up.
   */
  val server: HttpServer = null) extends Logging {

  debug("Got job model: %s." format jobModel)

  // Starts the HTTP server, if one was provided; no-op otherwise.
  def start() {
    if (server != null) {
      debug("Starting HTTP server.")
      server.start
      info("Started HTTP server: %s" format server.getUrl)
    }
  }

  // Stops the HTTP server, if one was provided; no-op otherwise.
  def stop() {
    if (server != null) {
      debug("Stopping HTTP server.")
      server.stop
      info("Stopped HTTP server.")
    }
  }
}