/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.samza.coordinator;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.samza.Partition;
import org.apache.samza.config.ClusterManagerConfig;
import org.apache.samza.config.Config;
import org.apache.samza.config.ConfigException;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.MapConfig;
import org.apache.samza.config.RegExTopicGenerator;
import org.apache.samza.config.TaskConfig;
import org.apache.samza.container.TaskName;
import org.apache.samza.container.grouper.stream.SSPGrouperProxy;
import org.apache.samza.container.grouper.stream.SystemStreamPartitionGrouper;
import org.apache.samza.container.grouper.stream.SystemStreamPartitionGrouperFactory;
import org.apache.samza.container.grouper.task.GrouperMetadata;
import org.apache.samza.container.grouper.task.TaskNameGrouperFactory;
import org.apache.samza.container.grouper.task.TaskNameGrouperProxy;
import org.apache.samza.job.model.ContainerModel;
import org.apache.samza.job.model.JobModel;
import org.apache.samza.job.model.TaskModel;
import org.apache.samza.system.StreamMetadataCache;
import org.apache.samza.system.SystemStream;
import org.apache.samza.system.SystemStreamMetadata;
import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.system.SystemStreamPartitionMatcher;
import org.apache.samza.util.ConfigUtil;
import org.apache.samza.util.ReflectionUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.collection.JavaConverters;
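
/**
 * Computes the {@link JobModel} for a Samza job: fetches partition metadata for the configured
 * input streams, groups the {@link SystemStreamPartition}s into tasks, assigns each task a
 * changelog partition, and groups the tasks into containers.
 */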
public class JobModelCalculator {
private static final Logger LOG = LoggerFactory.getLogger(JobModelCalculator.class);
public static final JobModelCalculator INSTANCE = new JobModelCalculator();
private JobModelCalculator() {
  }

/**
* Does the following:
* 1. Fetches metadata of the input streams defined in configuration through {@code streamMetadataCache}.
* 2. Applies the SSP grouper and task name grouper defined in the configuration to build the {@link JobModel}.
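   * <p>
   * A minimal usage sketch, assuming the four arguments have already been constructed:
   * <pre>{@code
   * JobModel jobModel = JobModelCalculator.INSTANCE.calculateJobModel(
   *     config, changeLogPartitionMapping, streamMetadataCache, grouperMetadata);
   * }</pre>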
* @param originalConfig the configuration of the job.
* @param changeLogPartitionMapping the task to changelog partition mapping of the job.
* @param streamMetadataCache the cache that holds the partition metadata of the input streams.
* @param grouperMetadata provides the historical metadata of the application.
* @return the built {@link JobModel}.
*/
public JobModel calculateJobModel(Config originalConfig, Map<TaskName, Integer> changeLogPartitionMapping,
StreamMetadataCache streamMetadataCache, GrouperMetadata grouperMetadata) {
    // Refresh the config if the regex topic rewriter is enabled
Config refreshedConfig = refreshConfigByRegexTopicRewriter(originalConfig);
TaskConfig taskConfig = new TaskConfig(refreshedConfig);
// Do grouping to fetch TaskName to SSP mapping
Set<SystemStreamPartition> allSystemStreamPartitions =
getMatchedInputStreamPartitions(refreshedConfig, streamMetadataCache);
    // The processor list is required by some of the groupers, so pass it as part of the config.
    // Copy the config and add the processor list to the copy.
// TODO: It is non-ideal to have config as a medium to transmit the locality information; especially, if the locality information evolves. Evaluate options on using context objects to pass dependent components.
Map<String, String> configMap = new HashMap<>(refreshedConfig);
configMap.put(JobConfig.PROCESSOR_LIST, String.join(",", grouperMetadata.getProcessorLocality().keySet()));
SystemStreamPartitionGrouper grouper = getSystemStreamPartitionGrouper(new MapConfig(configMap));
JobConfig jobConfig = new JobConfig(refreshedConfig);
Map<TaskName, Set<SystemStreamPartition>> groups;
if (jobConfig.isSSPGrouperProxyEnabled()) {
SSPGrouperProxy sspGrouperProxy = new SSPGrouperProxy(refreshedConfig, grouper);
groups = sspGrouperProxy.group(allSystemStreamPartitions, grouperMetadata);
} else {
LOG.warn(String.format(
"SSPGrouperProxy is disabled (%s = false). Stateful jobs may produce erroneous results if this is not enabled.",
JobConfig.SSP_INPUT_EXPANSION_ENABLED));
groups = grouper.group(allSystemStreamPartitions);
}
LOG.info(String.format(
"SystemStreamPartitionGrouper %s has grouped the SystemStreamPartitions into %d tasks with the following taskNames: %s",
grouper, groups.size(), groups));
    // If no mappings are present (i.e., the first time the job runs), default to -1 so that 0 becomes the first
    // assigned changelog partition id.
int maxChangelogPartitionId = changeLogPartitionMapping.values().stream().max(Comparator.naturalOrder()).orElse(-1);
// Sort the groups prior to assigning the changelog mapping so that the mapping is reproducible and intuitive
TreeMap<TaskName, Set<SystemStreamPartition>> sortedGroups = new TreeMap<>(groups);
Set<TaskModel> taskModels = new HashSet<>();
for (Map.Entry<TaskName, Set<SystemStreamPartition>> group : sortedGroups.entrySet()) {
TaskName taskName = group.getKey();
Set<SystemStreamPartition> systemStreamPartitions = group.getValue();
Optional<Integer> changelogPartitionId = Optional.ofNullable(changeLogPartitionMapping.get(taskName));
Partition changelogPartition;
if (changelogPartitionId.isPresent()) {
changelogPartition = new Partition(changelogPartitionId.get());
} else {
// If we've never seen this TaskName before, then assign it a new changelog partition.
maxChangelogPartitionId++;
LOG.info(
String.format("New task %s is being assigned changelog partition %s.", taskName, maxChangelogPartitionId));
changelogPartition = new Partition(maxChangelogPartitionId);
}
taskModels.add(new TaskModel(taskName, systemStreamPartitions, changelogPartition));
}
// Here is where we should put in a pluggable option for the SSPTaskNameGrouper for locality, load-balancing, etc.
TaskNameGrouperFactory containerGrouperFactory =
ReflectionUtil.getObj(taskConfig.getTaskNameGrouperFactory(), TaskNameGrouperFactory.class);
boolean standbyTasksEnabled = jobConfig.getStandbyTasksEnabled();
int standbyTaskReplicationFactor = jobConfig.getStandbyTaskReplicationFactor();
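    // The proxy wraps the configured grouper; when standby tasks are enabled, it also adds
    // standby replicas of the active tasks to the resulting container models.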
TaskNameGrouperProxy taskNameGrouperProxy =
new TaskNameGrouperProxy(containerGrouperFactory.build(refreshedConfig), standbyTasksEnabled,
standbyTaskReplicationFactor);
Set<ContainerModel> containerModels;
boolean isHostAffinityEnabled = new ClusterManagerConfig(refreshedConfig).getHostAffinityEnabled();
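    // With host affinity enabled, pass the full grouper metadata so that tasks can be placed
    // back onto their previously known hosts; otherwise only the processor ids are needed.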
if (isHostAffinityEnabled) {
containerModels = taskNameGrouperProxy.group(taskModels, grouperMetadata);
} else {
containerModels =
taskNameGrouperProxy.group(taskModels, new ArrayList<>(grouperMetadata.getProcessorLocality().keySet()));
}
Map<String, ContainerModel> containerMap =
containerModels.stream().collect(Collectors.toMap(ContainerModel::getId, Function.identity()));
return new JobModel(refreshedConfig, containerMap);
  }

/**
   * Refreshes the list of Kafka topics used as input streams when the
   * {@link org.apache.samza.config.RegExTopicGenerator} config rewriter is enabled.
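   * <p>
   * A hypothetical rewriter configuration that this method would re-apply (the rewriter name
   * {@code regex-input} is illustrative):
   * <pre>
   * job.config.rewriters=regex-input
   * job.config.rewriter.regex-input.class=org.apache.samza.config.RegExTopicGenerator
   * </pre>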
   * @param originalConfig the original Samza job config.
   * @return the refreshed config, or the original config if no regex topic rewriter is configured.
*/
private static Config refreshConfigByRegexTopicRewriter(Config originalConfig) {
JobConfig jobConfig = new JobConfig(originalConfig);
Optional<String> configRewriters = jobConfig.getConfigRewriters();
Config resultConfig = originalConfig;
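    // Only the RegExTopicGenerator rewriter is re-applied here, so that topics created after
    // job submission and matching the configured regex are picked up as inputs.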
if (configRewriters.isPresent()) {
for (String rewriterName : configRewriters.get().split(",")) {
String rewriterClass = jobConfig.getConfigRewriterClass(rewriterName)
.orElseThrow(() -> new ConfigException(
String.format("Unable to find class config for config rewriter %s.", rewriterName)));
if (rewriterClass.equalsIgnoreCase(RegExTopicGenerator.class.getName())) {
resultConfig = ConfigUtil.applyRewriter(resultConfig, rewriterName);
}
}
}
return resultConfig;
  }

/**
   * Builds the set of input {@link SystemStreamPartition}s based upon the user-defined {@code config},
   * applying the configured {@link SystemStreamPartitionMatcher}, if any.
   * @param config the configuration used to fetch the metadata of the input streams.
   * @param streamMetadataCache required to query the partition metadata of the input streams.
   * @return the input SystemStreamPartitions of the job.
*/
private static Set<SystemStreamPartition> getMatchedInputStreamPartitions(Config config,
StreamMetadataCache streamMetadataCache) {
Set<SystemStreamPartition> allSystemStreamPartitions = getInputStreamPartitions(config, streamMetadataCache);
JobConfig jobConfig = new JobConfig(config);
Optional<String> sspMatcherClassName = jobConfig.getSSPMatcherClass();
if (sspMatcherClassName.isPresent()) {
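      // The matcher is only applied when the configured stream job factory matches the
      // job-factory regex; otherwise all partitions are returned unfiltered.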
String sspMatcherConfigJobFactoryRegex = jobConfig.getSSPMatcherConfigJobFactoryRegex();
Optional<String> streamJobFactoryClass = jobConfig.getStreamJobFactoryClass();
if (streamJobFactoryClass.isPresent() && Pattern.matches(sspMatcherConfigJobFactoryRegex,
streamJobFactoryClass.get())) {
LOG.info(String.format("before match: allSystemStreamPartitions.size = %s", allSystemStreamPartitions.size()));
SystemStreamPartitionMatcher sspMatcher =
ReflectionUtil.getObj(sspMatcherClassName.get(), SystemStreamPartitionMatcher.class);
Set<SystemStreamPartition> matchedPartitions = sspMatcher.filter(allSystemStreamPartitions, config);
      // Usually a small set, hence OK to log at info level
LOG.info(String.format("after match: matchedPartitions = %s", matchedPartitions));
return matchedPartitions;
}
}
return allSystemStreamPartitions;
  }

/**
   * Finds the {@link SystemStreamPartitionGrouperFactory} class from the {@code config} and instantiates
   * the {@link SystemStreamPartitionGrouper} through that factory.
   * @param config the configuration of the Samza job.
   * @return the instantiated {@link SystemStreamPartitionGrouper}.
*/
private static SystemStreamPartitionGrouper getSystemStreamPartitionGrouper(Config config) {
String factoryString = new JobConfig(config).getSystemStreamPartitionGrouperFactory();
SystemStreamPartitionGrouperFactory factory =
ReflectionUtil.getObj(factoryString, SystemStreamPartitionGrouperFactory.class);
return factory.getSystemStreamPartitionGrouper(config);
  }

/**
   * Computes the input system stream partitions of a Samza job using the provided {@code config}
   * and {@code streamMetadataCache}.
   * @param config the configuration of the job.
   * @param streamMetadataCache used to query the partition metadata of the input streams.
   * @return the input {@link SystemStreamPartition}s of the Samza job.
*/
private static Set<SystemStreamPartition> getInputStreamPartitions(Config config,
StreamMetadataCache streamMetadataCache) {
TaskConfig taskConfig = new TaskConfig(config);
// Get the set of partitions for each SystemStream from the stream metadata
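    // The 'true' flag requests partition metadata only (no offsets), which is all that is needed here.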
Map<SystemStream, SystemStreamMetadata> allMetadata = JavaConverters.mapAsJavaMapConverter(
streamMetadataCache.getStreamMetadata(
JavaConverters.asScalaSetConverter(taskConfig.getInputStreams()).asScala().toSet(), true)).asJava();
Set<SystemStreamPartition> inputStreamPartitions = new HashSet<>();
allMetadata.forEach((systemStream, systemStreamMetadata) -> systemStreamMetadata.getSystemStreamPartitionMetadata()
.keySet()
.forEach(partition -> inputStreamPartitions.add(new SystemStreamPartition(systemStream, partition))));
return inputStreamPartitions;
}
}