blob: bc61a7d8644a889025b51d63f18eeba6b1e4ed4f [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
package org.apache.samza.job.metadata;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.SortedSet;
import java.util.TreeMap;
import org.apache.samza.SamzaException;
import org.apache.samza.environment.EnvironmentVariables;
import org.apache.samza.config.Config;
import org.apache.samza.job.JobCoordinatorMetadata;
import org.apache.samza.job.JobMetadataChange;
import org.apache.samza.job.model.JobModel;
import org.apache.samza.metadatastore.MetadataStore;
import org.apache.samza.metrics.Gauge;
import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.serializers.Serde;
import org.apache.samza.serializers.model.SamzaObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * A class to manage reads and writes of {@link JobCoordinatorMetadata} to {@link MetadataStore}. It also provides
 * additional helper functionality to generate {@link JobCoordinatorMetadata} and check for changes across runs.
 */
public class JobCoordinatorMetadataManager {
// Class-wide logger and constants.
private static final Logger LOG = LoggerFactory.getLogger(JobCoordinatorMetadataManager.class);
// Separator between the components of a YARN container id (see fetchEpochIdForJobCoordinator).
static final String CONTAINER_ID_DELIMITER = "_";

// Collaborators assigned by the constructors.
private final ClusterType clusterType;
private final MetadataStore metadataStore;
private final Serde<String> valueSerde;
private final JobCoordinatorMetadataManagerMetrics metrics;

// Jackson mapper used to convert JobCoordinatorMetadata to and from its string form.
private final ObjectMapper metadataMapper = SamzaObjectMapper.getObjectMapper();
/**
 * Creates a manager whose stored values are decoded with a {@link CoordinatorStreamValueSerde} bound to the
 * {@link SetJobCoordinatorMetadataMessage} type.
 *
 * @param metadataStore store that holds the job coordinator metadata
 * @param clusterType cluster type this manager reads and writes metadata for; must not be null
 * @param metricsRegistry registry in which this manager's gauges are registered
 */
public JobCoordinatorMetadataManager(MetadataStore metadataStore, ClusterType clusterType,
    MetricsRegistry metricsRegistry) {
  this(metadataStore, clusterType, metricsRegistry,
      new CoordinatorStreamValueSerde(SetJobCoordinatorMetadataMessage.TYPE));
}

/**
 * Package-private constructor that takes the value serde explicitly; the public constructor delegates here.
 *
 * @param metadataStore store that holds the job coordinator metadata
 * @param clusterType cluster type this manager reads and writes metadata for; must not be null
 * @param metricsRegistry registry in which this manager's gauges are registered
 * @param valueSerde serde used to encode/decode the stored metadata strings
 * @throws NullPointerException if {@code clusterType} is null
 */
JobCoordinatorMetadataManager(MetadataStore metadataStore, ClusterType clusterType, MetricsRegistry metricsRegistry,
    Serde<String> valueSerde) {
  // Validate before anything else so no gauges get registered for an invalid manager.
  this.clusterType = Preconditions.checkNotNull(clusterType, "Cluster type cannot be null");
  this.metrics = new JobCoordinatorMetadataManagerMetrics(metricsRegistry);
  this.valueSerde = valueSerde;
  this.metadataStore = metadataStore;
}
/**
 * Generates {@link JobCoordinatorMetadata}.
 * Epoch ID - It is generated by {@link #fetchEpochIdForJobCoordinator()}. Refer to the javadocs for more
 * details on how it is generated and the properties of the identifier.
 * Config ID - A unique and reproducible identifier that is generated based on the input {@link Config}. It uses
 * a {@link Funnel} to select a subset of the input configuration for generating the identifier, and as long as that
 * subset of the configuration remains the same, the identifier is guaranteed to be the same. For the list of config
 * prefixes used by the funnel, refer to {@link ConfigHashFunnel}.
 * JobModel ID - A unique and reproducible identifier that is generated based on the input {@link JobModel}. It only
 * uses the {@link org.apache.samza.job.model.ContainerModel} within the {@linkplain JobModel} for generation. We
 * serialize the data into bytes and use those bytes to compute the identifier.
 * In the case of YARN, the epoch identifier is extracted from the application attempt and translates to the
 * applicationId, e.g. 1606797336059_0010.
 * Both the config and the job model identifiers should be 32-bit integers.
 * @param jobModel job model used for generating the metadata
 * @param config configuration used for generating the config identifier
 * @return the metadata for the job coordinator
 * @throws SamzaException if the metadata could not be generated
 */
// Builds the JobCoordinatorMetadata (epoch id, config id, job model id) for the current attempt. Any exception
// raised while doing so is logged and rethrown wrapped in a SamzaException.
public JobCoordinatorMetadata generateJobCoordinatorMetadata(JobModel jobModel, Config config) {
try {
// NOTE(review): this statement is incomplete in this copy -- the rest of the hashing chain (hash function,
// serialized job model input and terminal call) is missing before the next declaration. Restore from VCS.
int jobModelId = Hashing
// Config id: hash of the config subset selected by ConfigHashFunnel, read back as an int.
// NOTE(review): Guava's Hashing class has no hashObject method, so the HashFunction factory call that would
// follow "Hashing" appears to be missing here -- confirm against version control.
int configId = Hashing
.hashObject(config, new ConfigHashFunnel())
// NOTE(review): the call that should precede the message string on the next line (presumably a LOG call) is
// missing, as are the last argument and closing of the JobCoordinatorMetadata constructor call below.
.asInt();"Generated job model id {} and config id {}", jobModelId, configId);
return new JobCoordinatorMetadata(fetchEpochIdForJobCoordinator(), String.valueOf(configId),
} catch (Exception e) {
LOG.error("Failed to generate metadata for the current attempt due to ", e);
throw new SamzaException("Failed to generate the metadata for the current attempt due to ", e);
/**
 * Check for changes between the metadata passed as inputs. Metadata is considered changed if any of the attributes
 * within {@linkplain JobCoordinatorMetadata} changes.
 * We intentionally check for each change individually to help us track at this granularity. We want to use this
 * information to determine whether complex handling is required to cater to these changes instead of blindly
 * restarting all the containers upstream.
 * @param newMetadata new metadata to be compared
 * @param previousMetadata previous metadata to be compared against; may be {@code null}, in which case every
 *                         attribute is considered changed
 * @return the set of detected changes; empty if the metadata is unchanged
 */
// Compares each attribute of newMetadata against previousMetadata and collects the differences in a set.
// A null previousMetadata makes every comparison below count as changed.
public Set<JobMetadataChange> checkForMetadataChanges(JobCoordinatorMetadata newMetadata,
JobCoordinatorMetadata previousMetadata) {
Set<JobMetadataChange> changes = new HashSet<>();
// Set when the epoch id differs, i.e. this run belongs to a new deployment rather than another attempt.
boolean newDeployment = false;
// Epoch id comparison. previousMetadata.getEpochId() is dereferenced, so a previous metadata carrying a null
// epoch id would throw a NullPointerException here.
if (previousMetadata == null || !previousMetadata.getEpochId().equals(newMetadata.getEpochId())) {
newDeployment = true;
// Job model id comparison.
if (previousMetadata == null || !previousMetadata.getJobModelId().equals(newMetadata.getJobModelId())) {
if (!newDeployment) {
// Config id comparison.
if (previousMetadata == null || !previousMetadata.getConfigId().equals(newMetadata.getConfigId())) {
if (!newDeployment) {
// NOTE(review): the statements that add entries to `changes`, the closing braces of the blocks above and the
// logging calls preceding the two message strings below are missing in this copy; as shown, `changes` is never
// populated. Restore from version control.
if (changes.isEmpty()) {"Job coordinator metadata {} unchanged.", newMetadata);
} else {"Job coordinator metadata changed from: {} to: {}", previousMetadata, newMetadata);
return changes;
/**
 * Reads the {@link JobCoordinatorMetadata} from the metadata store. It fetches the metadata
 * associated with the cluster type specified at the creation of the manager.
 * @return job coordinator metadata, or {@code null} if no matching entry was read successfully
 */
// Scans every entry in the metadata store, decodes the value with valueSerde and deserializes it with
// metadataMapper. Failures are logged and swallowed, so the result stays null unless an entry is read successfully.
public JobCoordinatorMetadata readJobCoordinatorMetadata() {
JobCoordinatorMetadata metadata = null;
for (Map.Entry<String, byte[]> entry : metadataStore.all().entrySet()) {
// NOTE(review): the condition of this if is missing in this copy; per the method's javadoc it should select the
// entry associated with this manager's cluster type -- restore from version control.
if ( {
try {
String metadataString = valueSerde.fromBytes(entry.getValue());
metadata = metadataMapper.readValue(metadataString, JobCoordinatorMetadata.class);
} catch (Exception e) {
// Best-effort read: the error is logged but not rethrown.
LOG.error("Failed to read job coordinator metadata due to ", e);
// NOTE(review): closing braces and the logging call preceding the message string below are missing here.
}"Fetched the job coordinator metadata for cluster {} as {}.", clusterType, metadata);
return metadata;
/**
 * Persist the {@link JobCoordinatorMetadata} in the metadata store. The job coordinator metadata is associated
 * with the cluster type specified at the creation of the manager.
 * @param metadata metadata to be persisted; must not be null
 * @throws NullPointerException if {@code metadata} is null
 * @throws SamzaException in case of an exception encountered while serializing the metadata or writing it to the
 *                        underlying metadata store
 */
// Serializes the metadata with metadataMapper, encodes it with valueSerde and stores it in the metadata store.
// The null check runs before the try block, so a null argument is not wrapped in a SamzaException.
public void writeJobCoordinatorMetadata(JobCoordinatorMetadata metadata) {
Preconditions.checkNotNull(metadata, "Job coordinator metadata cannot be null");
try {
String metadataValueString = metadataMapper.writeValueAsString(metadata);
// NOTE(review): the key argument of put(...) and the logging call preceding the success message are missing in
// this copy; per the method's javadoc the key should be tied to this manager's cluster type.
metadataStore.put(, valueSerde.toBytes(metadataValueString));"Successfully written job coordinator metadata: {} for cluster {}.", metadata, clusterType);
} catch (Exception e) {
// Any serialization or store failure is logged and rethrown as a SamzaException.
LOG.error("Failed to write the job coordinator metadata to metadata store due to ", e);
throw new SamzaException("Failed to write the job coordinator metadata.", e);
/**
 * The properties of the epoch identifier are as follows:
 * 1. Unique across applications in the cluster
 * 2. Remains unchanged within a single deployment lifecycle
 * 3. Remains unchanged across application attempts within a single deployment lifecycle
 * 4. Changes across deployment lifecycles
 * For YARN environments:
 * Generate the epoch id using the execution container id that is passed through the system environment. This isn't an
 * ideal way of generating this ID, since it is YARN-specific. This is left as a legacy flow for backwards
 * compatibility, as the original implementation did not define a cluster-agnostic contract.
 * The format and property used to generate the ID are specific to YARN, and the specific format of the container name
 * is a public contract by YARN which is likely to remain backward compatible.
 * For non-YARN environments:
 * Extract this from the environment variable SAMZA_EPOCH_ID. This is a more generic way of obtaining an epoch id, but
 * it does require the resource management layer to inject this value.
 * @return an identifier associated with the job coordinator satisfying the above properties
 */
String fetchEpochIdForJobCoordinator() {
if (ClusterType.YARN.equals(this.clusterType)) {
String[] containerIdParts = getEnvProperty(CONTAINER_ID_PROPERTY).split(CONTAINER_ID_DELIMITER);
return containerIdParts[1] + CONTAINER_ID_DELIMITER + containerIdParts[2];
} else {
return getEnvProperty(EnvironmentVariables.SAMZA_EPOCH_ID);
String getEnvProperty(String propertyName) {
return System.getenv(propertyName);
JobCoordinatorMetadataManagerMetrics getMetrics() {
return metrics;
/**
 * A helper class to generate a hash for the {@link Config} based on a subset of the configuration.
 * The subset consists of the configurations whose keys prefix-match the allowed prefixes.
 */
// Funnel that gathers only the configs under ALLOWED_PREFIXES for hashing, so the config id reflects just that
// subset of the configuration.
private static class ConfigHashFunnel implements Funnel<Config> {
private static final Logger LOG = LoggerFactory.getLogger(ConfigHashFunnel.class);
// using sorted set to ensure the hash computation on configurations is reproducible and deterministic
private static final SortedSet<String> ALLOWED_PREFIXES = ImmutableSortedSet.of("job.autosizing");
// Collects the matching entries into a TreeMap so they are visited in sorted key order.
// NOTE(review): an @Override annotation would normally precede this method; it is absent in this copy.
public void funnel(Config from, PrimitiveSink into) {
SortedMap<String, String> map = new TreeMap<>();
// NOTE(review): the second argument of subset(...) presumably controls prefix stripping -- confirm. The logging
// call that should precede the message string at the end of the next line is missing in this copy.
ALLOWED_PREFIXES.forEach(prefix -> map.putAll(from.subset(prefix, false)));"Using the config {} to generate hash", map);
// NOTE(review): the body of this lambda and the closing braces of the method and class are missing in this copy.
map.forEach((key, value) -> {
/**
 * Type of the cluster deployment associated with the {@link JobCoordinatorMetadataManager}.
 */
// NOTE(review): the enum constants are not visible in this copy. ClusterType.YARN is referenced by
// fetchEpochIdForJobCoordinator, so at least YARN must be declared -- restore from version control.
public enum ClusterType {
/**
 * A container class to hold all the metrics related to {@link JobCoordinatorMetadataManager}.
 */
static class JobCoordinatorMetadataManagerMetrics {
// Metric group under which every gauge of this class is registered.
private static final String GROUP = "JobCoordinatorMetadataManager";

// Registered gauge names.
private static final String APPLICATION_ATTEMPT_COUNT = "application-attempt-count";
private static final String CONFIG_CHANGED = "config-changed";
private static final String JOB_MODEL_CHANGED = "job-model-changed";
private static final String NEW_DEPLOYMENT = "new-deployment";
private static final String METADATA_GENERATION_FAILED_COUNT = "metadata-generation-failed-count";
private static final String METADATA_READ_FAILED_COUNT = "metadata-read-failed-count";
private static final String METADATA_WRITE_FAILED_COUNT = "metadata-write-failed-count";

// Gauges bumped by the increment* methods.
private final Gauge<Integer> applicationAttemptCount;
private final Gauge<Integer> metadataGenerationFailedCount;
private final Gauge<Integer> metadataReadFailedCount;
private final Gauge<Integer> metadataWriteFailedCount;

// Gauges exposed through set* methods that take an int value.
private final Gauge<Integer> configChangedAcrossApplicationAttempt;
private final Gauge<Integer> jobModelChangedAcrossApplicationAttempt;
private final Gauge<Integer> newDeployment;
// Registers each gauge in the given registry under GROUP with an initial value of 0.
public JobCoordinatorMetadataManagerMetrics(MetricsRegistry registry) {
applicationAttemptCount = registry.newGauge(GROUP, APPLICATION_ATTEMPT_COUNT, 0);
configChangedAcrossApplicationAttempt =
registry.newGauge(GROUP, CONFIG_CHANGED, 0);
jobModelChangedAcrossApplicationAttempt =
registry.newGauge(GROUP, JOB_MODEL_CHANGED, 0);
// NOTE(review): the argument list below is truncated in this copy. METADATA_GENERATION_FAILED_COUNT is declared
// but not referenced anywhere visible, so it is presumably the missing name argument -- restore from VCS.
metadataGenerationFailedCount = registry.newGauge(GROUP,
metadataReadFailedCount = registry.newGauge(GROUP, METADATA_READ_FAILED_COUNT, 0);
metadataWriteFailedCount = registry.newGauge(GROUP, METADATA_WRITE_FAILED_COUNT, 0);
newDeployment = registry.newGauge(GROUP, NEW_DEPLOYMENT, 0);
Gauge<Integer> getApplicationAttemptCount() {
return applicationAttemptCount;
Gauge<Integer> getMetadataGenerationFailedCount() {
return metadataGenerationFailedCount;
Gauge<Integer> getMetadataReadFailedCount() {
return metadataReadFailedCount;
Gauge<Integer> getMetadataWriteFailedCount() {
return metadataWriteFailedCount;
Gauge<Integer> getJobModelChangedAcrossApplicationAttempt() {
return jobModelChangedAcrossApplicationAttempt;
Gauge<Integer> getConfigChangedAcrossApplicationAttempt() {
return configChangedAcrossApplicationAttempt;
Gauge<Integer> getNewDeployment() {
return newDeployment;
void incrementApplicationAttemptCount() {
applicationAttemptCount.set(applicationAttemptCount.getValue() + 1);
void incrementMetadataGenerationFailedCount() {
metadataGenerationFailedCount.set(metadataGenerationFailedCount.getValue() + 1);
void incrementMetadataReadFailedCount() {
metadataReadFailedCount.set(metadataReadFailedCount.getValue() + 1);
void incrementMetadataWriteFailedCount() {
metadataWriteFailedCount.set(metadataWriteFailedCount.getValue() + 1);
// NOTE(review): the bodies and closing braces of the two setters below are missing in this copy; presumably each
// sets its corresponding gauge to `value` -- restore from version control.
void setConfigChangedAcrossApplicationAttempt(int value) {
void setJobModelChangedAcrossApplicationAttempt(int value) {
void setNewDeployment(int value) {