blob: 37d13938f0b0f13f2642ce7a1c8d9167eae8486f [file] [log] [blame]
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.samza.util
import java.util
import org.apache.samza.SamzaException
import org.apache.samza.config._
import org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore
import{CoordinatorStreamSystemConsumer, CoordinatorStreamSystemProducer, CoordinatorStreamValueSerde}
import{Delete, SetConfig}
import org.apache.samza.job.JobRunner
import org.apache.samza.metadatastore.MetadataStore
import org.apache.samza.metrics.MetricsRegistryMap
import org.apache.samza.system.{StreamSpec, SystemAdmin, SystemAdmins, SystemFactory, SystemStream}
import org.apache.samza.util.ScalaJavaUtil.JavaOptionals
import scala.collection.JavaConverters._
object CoordinatorStreamUtil extends Logging {
/**
 * Given a job's full config object, build a subset config which includes
 * only the job name, job id, and system config for the coordinator stream.
 *
 * The class named by `JobConfig.getCoordinatorStreamFactory` is instantiated reflectively through
 * its no-argument constructor and asked to derive the coordinator stream config.
 *
 * @param config the job's full config.
 * @return a [[MapConfig]] wrapping the config produced by the configured
 *         [[CoordinatorStreamConfigFactory]].
 */
def buildCoordinatorStreamConfig(config: Config): MapConfig = {
  val factoryClassName = new JobConfig(config).getCoordinatorStreamFactory
  // Class#newInstance is deprecated because it rethrows checked constructor exceptions unchecked;
  // going through the no-arg constructor wraps them in InvocationTargetException instead.
  val factory = Class.forName(factoryClassName)
    .getDeclaredConstructor()
    .newInstance()
    .asInstanceOf[CoordinatorStreamConfigFactory]
  new MapConfig(factory.buildCoordinatorStreamConfig(config))
}
/**
 * Creates the coordinator stream described by the config if it does not exist, otherwise no-op.
 *
 * @param config job config from which the coordinator system stream and its admin are resolved.
 */
def createCoordinatorStream(config: Config): Unit = {
  val admins = new SystemAdmins(config)
  info("Creating coordinator stream")
  val stream = getCoordinatorSystemStream(config)
  // Delegate to the (stream, admin) overload using the admin of the coordinator system.
  createCoordinatorStream(stream, admins.getSystemAdmin(stream.getSystem))
}
/**
 * Asks the system admin to create the coordinator stream and logs the outcome.
 *
 * @param coordinatorSystemStream the [[SystemStream]] that describes the stream to create.
 * @param coordinatorSystemAdmin the [[SystemAdmin]] used to create the stream.
 */
def createCoordinatorStream(coordinatorSystemStream: SystemStream, coordinatorSystemAdmin: SystemAdmin): Unit = {
  // TODO: This logic should be part of the final coordinator stream metadata store abstraction. See SAMZA-2182
  val name = coordinatorSystemStream.getStream
  val spec = StreamSpec.createCoordinatorStreamSpec(name, coordinatorSystemStream.getSystem)
  // A false result from createStream is reported as the stream already existing.
  val outcome =
    if (coordinatorSystemAdmin.createStream(spec)) "Created coordinator stream: %s."
    else "Coordinator stream: %s already exists."
  info(outcome.format(name))
}
/**
 * Get the coordinator system stream from the configuration.
 *
 * @param config Configuration to get coordinator system stream from.
 * @return a [[SystemStream]] pairing the configured coordinator system name with the
 *         coordinator stream name generated from the job name and job id.
 */
def getCoordinatorSystemStream(config: Config): SystemStream = {
  val jobConfig = new JobConfig(config)
  val coordinatorSystem = jobConfig.getCoordinatorSystemName
  val (name, id) = getJobNameAndId(jobConfig)
  new SystemStream(coordinatorSystem, getCoordinatorStreamName(name, id))
}
/**
 * Get the coordinator system factory from the configuration.
 *
 * @param config Configuration to get coordinator system factory from.
 * @return the [[SystemFactory]] instantiated from the factory class configured for the
 *         coordinator system.
 * @throws SamzaException if no system factory is configured for the coordinator system.
 */
def getCoordinatorSystemFactory(config: Config): SystemFactory = {
  val coordinatorSystem = new JobConfig(config).getCoordinatorSystemName
  val configuredFactory =
    JavaOptionals.toRichOptional(new SystemConfig(config).getSystemFactory(coordinatorSystem)).toOption
  configuredFactory match {
    case Some(factoryClassName) =>
      ReflectionUtil.getObj(factoryClassName, classOf[SystemFactory])
    case None =>
      // The prefix is concatenated first and the result is then used as the format string.
      throw new SamzaException(("Missing configuration: " + SystemConfig.SYSTEM_FACTORY_FORMAT).format(coordinatorSystem))
  }
}
/**
 * Generates a coordinator stream name based on the job name and job id
 * for the job. The format of the stream name will be
 * `__samza_coordinator_<JOBNAME>_<JOBID>`, with every underscore inside the
 * job name and job id replaced by a hyphen.
 *
 * @param jobName the job's name.
 * @param jobId the job's id.
 * @return the coordinator stream name.
 */
def getCoordinatorStreamName(jobName: String, jobId: String): String = {
  val sanitizedName = jobName.replace('_', '-')
  val sanitizedId = jobId.replace('_', '-')
  "__samza_coordinator_%s_%s".format(sanitizedName, sanitizedId)
}
/**
 * Get a job's name and ID given a config. Job ID is defaulted to 1 if not
 * defined in the config, and job name must be defined in config.
 *
 * @return A tuple of (jobName, jobId)
 */
// Resolves the (jobName, jobId) pair that getCoordinatorSystemStream destructures.
// NOTE(review): the body looks truncated in this view — only the fallback that throws
// ConfigException for a missing value is visible; confirm against the complete file.
private def getJobNameAndId(jobConfig: JobConfig) = {
.getOrElse(throw new ConfigException("Missing required config:")),
/**
 * Reads and returns launch config persisted in coordinator stream. Only sizing configs are currently supported.
 *
 * @param config full job config; consulted here only for the auto sizing flag.
 * @param metadataStore an instance of the instantiated MetadataStore
 * @return empty config if auto sizing is disabled, otherwise auto sizing related configs.
 */
def readLaunchConfigFromCoordinatorStream(config: Config, metadataStore: MetadataStore): Config = {
  if (!config.getBoolean(JobConfig.JOB_AUTOSIZING_ENABLED, false)) {
    new MapConfig()
  } else {
    // Named distinctly so it no longer shadows the `config` parameter.
    val persistedConfig = readConfigFromCoordinatorStream(metadataStore)
    // `filter` builds the result eagerly, whereas `filterKeys` only wraps the source in a lazy view.
    val launchConfig = persistedConfig.asScala
      .filter { case (key, _) => JobConfig.isAutosizingConfig(key) }
      .asJava
    new MapConfig(launchConfig)
  }
}
/**
 * Reads and returns the complete configuration stored in the coordinator stream.
 *
 * Entries whose raw value is null, or whose value deserializes to null, are skipped with a warning.
 *
 * @param metadataStore an instance of the instantiated MetadataStore; it is wrapped in a
 *                      [[NamespaceAwareCoordinatorStreamStore]] scoped to `SetConfig.TYPE`.
 * @return the configuration read from the coordinator stream.
 */
def readConfigFromCoordinatorStream(metadataStore: MetadataStore): Config = {
  val configStore = new NamespaceAwareCoordinatorStreamStore(metadataStore, SetConfig.TYPE)
  // The serde is constructed from the message type alone, so build it once instead of per entry.
  val valueSerde = new CoordinatorStreamValueSerde(SetConfig.TYPE)
  val configMap = new util.HashMap[String, String]
  configStore.all.asScala.foreach { case (key, valueAsBytes) =>
    if (valueAsBytes == null) {
      warn("Value for key: %s in config is null. Ignoring it." format key)
    } else {
      // Option(...) turns a null deserialization result into None.
      Option(valueSerde.fromBytes(valueAsBytes)) match {
        case Some(value) => configMap.put(key, value)
        case None => warn("Value for key: %s in config is decoded to be null. Ignoring it." format key)
      }
    }
  }
  new MapConfig(configMap)
}
// Writes the given config to the coordinator stream and issues deletes for previously stored
// keys that the given config no longer defines.
// NOTE(review): closing braces are missing in this view, so the exact extent of each
// `if (resetJobConfig)` branch, and whether further statements follow, cannot be confirmed here.
def writeConfigToCoordinatorStream(config: Config, resetJobConfig: Boolean = true) {
debug("config: %s" format config)
// A consumer and a producer are built from the same config, each with its own metrics registry.
val coordinatorSystemConsumer = new CoordinatorStreamSystemConsumer(config, new MetricsRegistryMap)
val coordinatorSystemProducer = new CoordinatorStreamSystemProducer(config, new MetricsRegistryMap)
if (resetJobConfig) {
info("Storing config in coordinator stream.")
coordinatorSystemProducer.writeConfig(JobRunner.SOURCE, config)
info("Loading old config from coordinator stream.")
val oldConfig = coordinatorSystemConsumer.getConfig
if (resetJobConfig) {
// Candidate deletions: keys the consumer reports that are absent from the new config.
var keysToRemove = oldConfig.keySet.asScala.toSet.diff(config.keySet.asScala)
val jobConfig = new JobConfig(config)
if (jobConfig.getAutosizingEnabled) {
// If autosizing is enabled, we retain auto-sizing related configs
keysToRemove = keysToRemove.filter(configKey => !JobConfig.isAutosizingConfig(configKey))
info("Deleting old configs that are no longer defined: %s".format(keysToRemove))
// One Delete message of type SetConfig.TYPE is sent per stale key.
keysToRemove.foreach(key => { coordinatorSystemProducer.send(new Delete(JobRunner.SOURCE, key, SetConfig.TYPE)) })