| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.samza.coordinator |
| |
| import java.util |
| import java.util.concurrent.atomic.AtomicReference |
| |
| import org.apache.samza.config.JobConfig.Config2Job |
| import org.apache.samza.config.SystemConfig.Config2System |
| import org.apache.samza.config.TaskConfig.Config2Task |
| import org.apache.samza.config.Config |
| import org.apache.samza.config.StorageConfig |
| import org.apache.samza.container.grouper.stream.SystemStreamPartitionGrouperFactory |
| import org.apache.samza.container.grouper.task.BalancingTaskNameGrouper |
| import org.apache.samza.container.grouper.task.TaskNameGrouperFactory |
| import org.apache.samza.container.LocalityManager |
| import org.apache.samza.container.TaskName |
| import org.apache.samza.coordinator.server.HttpServer |
| import org.apache.samza.coordinator.server.JobServlet |
| import org.apache.samza.coordinator.stream.CoordinatorStreamSystemConsumer |
| import org.apache.samza.coordinator.stream.CoordinatorStreamSystemFactory |
| import org.apache.samza.coordinator.stream.CoordinatorStreamSystemProducer |
| import org.apache.samza.job.model.JobModel |
| import org.apache.samza.job.model.TaskModel |
| import org.apache.samza.metrics.MetricsRegistryMap |
| import org.apache.samza.storage.ChangelogPartitionManager |
| import org.apache.samza.system.ExtendedSystemAdmin |
| import org.apache.samza.system.StreamMetadataCache |
| import org.apache.samza.system.SystemFactory |
| import org.apache.samza.system.SystemStreamPartition |
| import org.apache.samza.system.SystemStreamPartitionMatcher |
| import org.apache.samza.system.SystemAdmin |
| import org.apache.samza.util.Logging |
| import org.apache.samza.util.Util |
| import org.apache.samza.Partition |
| import org.apache.samza.SamzaException |
| |
| import scala.collection.JavaConverters._ |
| |
| /** |
| * Helper companion object that is responsible for wiring up a JobModelManager |
| * given a Config object. |
| */ |
| object JobModelManager extends Logging { |
| |
| val SOURCE = "JobModelManager" |
| /** |
| * A volatile reference to the currently instantiated <code>JobModelManager</code>. |
| */ |
| @volatile var currentJobModelManager: JobModelManager = null |
| val jobModelRef: AtomicReference[JobModel] = new AtomicReference[JobModel]() |
| |
| /** |
| * Does the following actions for a job: |
| * a) Reads the jobModel from the coordinator stream using the job's configuration. |
| * b) Creates the changelog streams for task stores if they do not exist. |
| * c) Recomputes the changelog partition mapping based on the jobModel and the |
| * job's configuration, and writes it back to the coordinator stream. |
| * d) Builds a JobModelManager using the jobModel read from the coordinator stream. |
| * @param coordinatorSystemConfig A config object that contains job.name, |
| * job.id, and all system.<job-coordinator-system-name>.* |
| * configuration. The method uses this config to read all configuration |
| * from the coordinator stream and to instantiate a JobModelManager. |
| * @param metricsRegistryMap The registry used to report metrics for the |
| * coordinator stream consumer/producer and the partition count monitor. |
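| * |
| * A minimal usage sketch (hypothetical; coordinatorSystemConfig is assumed to |
| * carry the job and coordinator system settings described above): |
| * {{{ |
| * val manager = JobModelManager(coordinatorSystemConfig, new MetricsRegistryMap()) |
| * try { |
| *   manager.start |
| *   // containers can now fetch the JobModel from manager.server.getUrl |
| * } finally { |
| *   manager.stop |
| * } |
| * }}} |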
| */ |
| def apply(coordinatorSystemConfig: Config, metricsRegistryMap: MetricsRegistryMap): JobModelManager = { |
| val coordinatorStreamSystemFactory: CoordinatorStreamSystemFactory = new CoordinatorStreamSystemFactory() |
| val coordinatorSystemConsumer: CoordinatorStreamSystemConsumer = coordinatorStreamSystemFactory.getCoordinatorStreamSystemConsumer(coordinatorSystemConfig, metricsRegistryMap) |
| val coordinatorSystemProducer: CoordinatorStreamSystemProducer = coordinatorStreamSystemFactory.getCoordinatorStreamSystemProducer(coordinatorSystemConfig, metricsRegistryMap) |
| info("Registering coordinator system stream consumer.") |
| coordinatorSystemConsumer.register |
| debug("Starting coordinator system stream consumer.") |
| coordinatorSystemConsumer.start |
| debug("Bootstrapping coordinator system stream consumer.") |
| coordinatorSystemConsumer.bootstrap |
| info("Registering coordinator system stream producer.") |
| coordinatorSystemProducer.register(SOURCE) |
| |
| val config = coordinatorSystemConsumer.getConfig |
| info("Got config: %s" format config) |
| val changelogManager = new ChangelogPartitionManager(coordinatorSystemProducer, coordinatorSystemConsumer, SOURCE) |
| changelogManager.start() |
| val localityManager = new LocalityManager(coordinatorSystemProducer, coordinatorSystemConsumer) |
| // Note: localityManager shares the same coordinator stream producer and consumer |
| // instances with the checkpoint and changelog managers. |
| // TODO: This code will go away with refactoring - SAMZA-678 |
| localityManager.start() |
| |
| // Map the name of each system to the corresponding SystemAdmin |
| val systemAdmins = getSystemAdmins(config) |
| |
| val streamMetadataCache = new StreamMetadataCache(systemAdmins = systemAdmins, cacheTTLms = 0) |
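| // Optionally watch the input streams for partition count changes. Only systems |
| // whose admin implements ExtendedSystemAdmin can be monitored, so all other |
| // systems are filtered out below. |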
| var streamPartitionCountMonitor: StreamPartitionCountMonitor = null |
| if (config.getMonitorPartitionChange) { |
| val extendedSystemAdmins = systemAdmins.filter { |
| case (_, systemAdmin) => systemAdmin.isInstanceOf[ExtendedSystemAdmin] |
| } |
| val inputStreamsToMonitor = config.getInputStreams.filter(systemStream => extendedSystemAdmins.contains(systemStream.getSystem)) |
| if (inputStreamsToMonitor.nonEmpty) { |
| streamPartitionCountMonitor = new StreamPartitionCountMonitor( |
| inputStreamsToMonitor.asJava, |
| streamMetadataCache, |
| metricsRegistryMap, |
| config.getMonitorPartitionChangeFrequency) |
| } |
| } |
| val previousChangelogPartitionMapping = changelogManager.readChangeLogPartitionMapping() |
| val jobModelManager = getJobModelManager(config, previousChangelogPartitionMapping, localityManager, streamMetadataCache, streamPartitionCountMonitor, null) |
| val jobModel = jobModelManager.jobModel |
| // Save the changelog mapping back to the ChangelogPartitionManager. |
| // newChangelogPartitionMapping merges all current task-to-changelog |
| // assignments with whatever we had before (previousChangelogPartitionMapping). |
| // We must persist legacy changelog assignments so that |
| // maxChangelogPartitionId always reflects the absolute max, not the current |
| // max (in case the task with the highest changelog partition mapping |
| // disappears). |
| val newChangelogPartitionMapping = jobModel.getContainers.asScala.flatMap(_._2.getTasks.asScala).map { case (taskName, taskModel) => |
| taskName -> Integer.valueOf(taskModel.getChangelogPartition.getPartitionId) |
| }.toMap ++ previousChangelogPartitionMapping.asScala |
| info("Saving task-to-changelog partition mapping: %s" format newChangelogPartitionMapping) |
| changelogManager.writeChangeLogPartitionMapping(newChangelogPartitionMapping.asJava) |
| |
| createChangeLogStreams(config, jobModel.maxChangeLogStreamPartitions) |
| |
| jobModelManager |
| } |
| |
| def apply(coordinatorSystemConfig: Config): JobModelManager = apply(coordinatorSystemConfig, new MetricsRegistryMap()) |
| |
| /** |
| * Build a JobModelManager using a Samza job's configuration. |
| */ |
| private def getJobModelManager(config: Config, |
| changeLogMapping: util.Map[TaskName, Integer], |
| localityManager: LocalityManager, |
| streamMetadataCache: StreamMetadataCache, |
| streamPartitionCountMonitor: StreamPartitionCountMonitor, |
| containerIds: java.util.List[String]) = { |
| val jobModel: JobModel = readJobModel(config, changeLogMapping, localityManager, streamMetadataCache, containerIds) |
| jobModelRef.set(jobModel) |
| |
| // Serve the job model over HTTP; the JobServlet reads the shared jobModelRef |
| // on every request, so containers always see the latest published model. |
| val server = new HttpServer |
| server.addServlet("/", new JobServlet(jobModelRef)) |
| currentJobModelManager = new JobModelManager(jobModel, server, streamPartitionCountMonitor) |
| currentJobModelManager |
| } |
| |
| /** |
| * For each input stream specified in config, determine all of its partitions, |
| * returning the set of SystemStreamPartitions that covers them. |
| */ |
| private def getInputStreamPartitions(config: Config, streamMetadataCache: StreamMetadataCache) = { |
| val inputSystemStreams = config.getInputStreams |
| |
| // Get the set of partitions for each SystemStream from the stream metadata |
| streamMetadataCache |
| .getStreamMetadata(inputSystemStreams, true) |
| .flatMap { |
| case (systemStream, metadata) => |
| metadata |
| .getSystemStreamPartitionMetadata |
| .asScala |
| .keys |
| .map(new SystemStreamPartition(systemStream, _)) |
| }.toSet |
| } |
| |
| private def getMatchedInputStreamPartitions(config: Config, streamMetadataCache: StreamMetadataCache): Set[SystemStreamPartition] = { |
| val allSystemStreamPartitions = getInputStreamPartitions(config, streamMetadataCache) |
| config.getSSPMatcherClass match { |
| case Some(s) => { |
| val jfr = config.getSSPMatcherConfigJobFactoryRegex.r |
| config.getStreamJobFactoryClass match { |
| case Some(jfr(_*)) => { |
| info("before match: allSystemStreamPartitions.size = %s" format (allSystemStreamPartitions.size)) |
| val sspMatcher = Util.getObj[SystemStreamPartitionMatcher](s) |
| val matchedPartitions = sspMatcher.filter(allSystemStreamPartitions.asJava, config).asScala.toSet |
| // Usually a small set hence ok to log at info level |
| info("after match: matchedPartitions = %s" format (matchedPartitions)) |
| matchedPartitions |
| } |
| case _ => allSystemStreamPartitions |
| } |
| } |
| case _ => allSystemStreamPartitions |
| } |
| } |
| |
| /** |
| * Gets a SystemStreamPartitionGrouper object from the configuration. |
| */ |
| private def getSystemStreamPartitionGrouper(config: Config) = { |
| val factoryString = config.getSystemStreamPartitionGrouperFactory |
| val factory = Util.getObj[SystemStreamPartitionGrouperFactory](factoryString) |
| factory.getSystemStreamPartitionGrouper(config) |
| } |
| |
| /** |
| * Builds a new JobModel for the job from its configuration, the previous |
| * task-to-changelog partition mapping, and the input streams' metadata. |
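| * |
| * A hypothetical call sketch (the arguments are assumed to be prepared as in |
| * the apply() flow above): |
| * {{{ |
| * val jobModel = JobModelManager.readJobModel(config, changeLogMapping, localityManager, streamMetadataCache, null) |
| * }}} |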
| */ |
| def readJobModel(config: Config, |
| changeLogPartitionMapping: util.Map[TaskName, Integer], |
| localityManager: LocalityManager, |
| streamMetadataCache: StreamMetadataCache, |
| containerIds: java.util.List[String]): JobModel = { |
| // Do grouping to fetch TaskName to SSP mapping |
| val allSystemStreamPartitions = getMatchedInputStreamPartitions(config, streamMetadataCache) |
| val grouper = getSystemStreamPartitionGrouper(config) |
| val groups = grouper.group(allSystemStreamPartitions.asJava) |
| info("SystemStreamPartitionGrouper %s has grouped the SystemStreamPartitions into %d tasks with the following taskNames: %s" format(grouper, groups.size(), groups.keySet())) |
| |
| // If no mappings are present (the first time the job runs) we start from -1, |
| // which allows 0 to be the first changelog partition assigned. |
| var maxChangelogPartitionId = changeLogPartitionMapping.asScala.values.map(_.toInt).toList.sorted.lastOption.getOrElse(-1) |
| // Sort the groups prior to assigning the changelog mapping so that the mapping is reproducible and intuitive |
| val sortedGroups = new util.TreeMap[TaskName, util.Set[SystemStreamPartition]](groups) |
| |
| // Assign all SystemStreamPartitions to TaskNames. |
| val taskModels = { |
| sortedGroups.asScala.map { case (taskName, systemStreamPartitions) => |
| val changelogPartition = Option(changeLogPartitionMapping.get(taskName)) match { |
| case Some(changelogPartitionId) => new Partition(changelogPartitionId) |
| case _ => |
| // If we've never seen this TaskName before, then assign it a |
| // new changelog. |
| maxChangelogPartitionId += 1 |
| info("New task %s is being assigned changelog partition %s." format(taskName, maxChangelogPartitionId)) |
| new Partition(maxChangelogPartitionId) |
| } |
| new TaskModel(taskName, systemStreamPartitions, changelogPartition) |
| }.toSet |
| } |
| |
| // Here is where we should put in a pluggable option for the |
| // SSPTaskNameGrouper for locality, load-balancing, etc. |
| val containerGrouperFactory = Util.getObj[TaskNameGrouperFactory](config.getTaskNameGrouperFactory) |
| val containerGrouper = containerGrouperFactory.build(config) |
| val containerModels = { |
| containerGrouper match { |
| case grouper: BalancingTaskNameGrouper => grouper.balance(taskModels.asJava, localityManager) |
| case _ => containerGrouper.group(taskModels.asJava, containerIds) |
| } |
| } |
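| // Key each container model by its processor id to build the final JobModel. |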
| val containerMap = containerModels.asScala.map(containerModel => containerModel.getProcessorId -> containerModel).toMap |
| |
| new JobModel(config, containerMap.asJava, localityManager) |
| } |
| |
| /** |
| * Instantiates a system admin for every system that has a system factory class |
| * defined in the given config. |
| * @param config contains adequate information to instantiate the SystemAdmins. |
| * @return a map from system name (String) to its instantiated SystemAdmin. |
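| * |
| * A hypothetical lookup sketch (assumes a system named "kafka" is configured): |
| * {{{ |
| * val admins = JobModelManager.getSystemAdmins(config) |
| * val kafkaAdmin = admins("kafka") |
| * }}} |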
| */ |
| def getSystemAdmins(config: Config) : Map[String, SystemAdmin] = { |
| val systemNames = getSystemNames(config) |
| // Map the name of each system to the corresponding SystemAdmin |
| val systemAdmins = systemNames.map(systemName => { |
| val systemFactoryClassName = config |
| .getSystemFactory(systemName) |
| .getOrElse(throw new SamzaException("A stream uses system %s, which is missing from the configuration." format systemName)) |
| val systemFactory = Util.getObj[SystemFactory](systemFactoryClassName) |
| systemName -> systemFactory.getAdmin(systemName, config) |
| }).toMap |
| systemAdmins |
| } |
| |
| private def createChangeLogStreams(config: StorageConfig, changeLogPartitions: Int) { |
| val changeLogSystemStreams = config |
| .getStoreNames |
| .filter(config.getChangelogStream(_).isDefined) |
| .map(name => (name, config.getChangelogStream(name).get)).toMap |
| .mapValues(Util.getSystemStreamFromNames(_)) |
| |
| for ((storeName, systemStream) <- changeLogSystemStreams) { |
| val systemAdmin = Util.getObj[SystemFactory](config |
| .getSystemFactory(systemStream.getSystem) |
| .getOrElse(throw new SamzaException("A stream uses system %s, which is missing from the configuration." format systemStream.getSystem)) |
| ).getAdmin(systemStream.getSystem, config) |
| |
| systemAdmin.createChangelogStream(systemStream.getStream, changeLogPartitions) |
| } |
| } |
| |
| private def getSystemNames(config: Config) = config.getSystemNames.toSet |
| |
| } |
| |
| /** |
| * <p>JobModelManager is responsible for managing the lifecycle of a Samza job |
| * once it's been started. This includes starting and stopping containers, |
| * managing configuration, etc.</p> |
| * |
| * <p>Any new cluster manager that's integrated with Samza (YARN, Mesos, etc) |
| * must integrate with the job coordinator.</p> |
| * |
| * <p>This class's API is currently unstable and likely to change. Its only |
| * responsibility right now is to propagate the job model via the HTTP |
| * server.</p> |
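| * |
| * <p>A minimal usage sketch (hypothetical; coordinatorSystemConfig is assumed |
| * to be a valid coordinator system config):</p> |
| * {{{ |
| * val manager = JobModelManager(coordinatorSystemConfig) |
| * manager.start |
| * // SamzaContainers fetch the job model from manager.server.getUrl |
| * manager.stop |
| * }}} |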
| */ |
| class JobModelManager( |
| /** |
| * The data model that describes the Samza job's containers and tasks. |
| */ |
| val jobModel: JobModel, |
| |
| /** |
| * HTTP server used to serve a Samza job's container model to SamzaContainers when they start up. |
| */ |
| val server: HttpServer = null, |
| val streamPartitionCountMonitor: StreamPartitionCountMonitor = null) extends Logging { |
| |
| debug("Got job model: %s." format jobModel) |
| |
| def start { |
| if (server != null) { |
| debug("Starting HTTP server.") |
| server.start |
| info("Started HTTP server: %s" format server.getUrl) |
| } |
| if (streamPartitionCountMonitor != null) { |
| debug("Starting stream partition count monitor.") |
| streamPartitionCountMonitor.start() |
| } |
| } |
| |
| def stop { |
| if (streamPartitionCountMonitor != null) { |
| debug("Stopping stream partition count monitor.") |
| streamPartitionCountMonitor.stop() |
| } |
| if (server != null) { |
| debug("Stopping HTTP server.") |
| server.stop |
| info("Stopped HTTP server.") |
| } |
| } |
| } |