blob: 0b274e9074984b946676f4a1f533509b35baec62 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.config;
import java.io.File;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.regex.Pattern;
import java.util.stream.Stream;
import org.apache.samza.SamzaException;
import org.apache.samza.container.grouper.stream.GroupByPartitionFactory;
import org.apache.samza.container.grouper.stream.HashSystemStreamPartitionMapperFactory;
import org.apache.samza.coordinator.metadatastore.CoordinatorStreamMetadataStoreFactory;
import org.apache.samza.runtime.DefaultLocationIdProviderFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Typed accessors for the job-level ({@code job.*}) configuration keys, layered on top of a {@link MapConfig}.
 *
 * <p>Each getter reads one or more of the keys declared as constants in this class and applies the
 * default or fallback documented on the getter.
 */
public class JobConfig extends MapConfig {
  private static final Logger LOG = LoggerFactory.getLogger(JobConfig.class);

  public static final String STREAM_JOB_FACTORY_CLASS = "job.factory.class";

  /**
   * job.config.rewriters is a CSV list of config rewriter names. Each name is determined
   * by the %s value in job.config.rewriter.%s.class. For example, if you define
   * job.config.rewriter.some-regex.class=org.apache.samza.config.RegExTopicGenerator,
   * then the rewriter config would be set to job.config.rewriters = some-regex.
   */
  public static final String CONFIG_REWRITERS = "job.config.rewriters";
  public static final String CONFIG_REWRITER_CLASS = "job.config.rewriter.%s.class";

  /**
   * job.config.loader.factory specifies {@link ConfigLoaderFactory} to get {@link ConfigLoader}
   */
  public static final String CONFIG_LOADER_FACTORY = "job.config.loader.factory";

  public static final String JOB_NAME = "job.name";
  public static final String JOB_ID = "job.id";
  static final String DEFAULT_JOB_ID = "1";

  public static final String JOB_COORDINATOR_SYSTEM = "job.coordinator.system";
  public static final String JOB_DEFAULT_SYSTEM = "job.default.system";

  public static final String JOB_JMX_ENABLED = "job.jmx.enabled";
  public static final String JOB_CONTAINER_COUNT = "job.container.count";
  static final int DEFAULT_JOB_CONTAINER_COUNT = 1;
  public static final String JOB_CONTAINER_THREAD_POOL_SIZE = "job.container.thread.pool.size";
  public static final String JOB_INTERMEDIATE_STREAM_PARTITIONS = "job.intermediate.stream.partitions";

  public static final String JOB_DEBOUNCE_TIME_MS = "job.debounce.time.ms";
  static final int DEFAULT_DEBOUNCE_TIME_MS = 20000;

  public static final String SSP_INPUT_EXPANSION_ENABLED = "job.systemstreampartition.input.expansion.enabled";
  public static final boolean DEFAULT_INPUT_EXPANSION_ENABLED = true;

  public static final String SSP_GROUPER_FACTORY = "job.systemstreampartition.grouper.factory";
  public static final String SSP_MATCHER_CLASS = "job.systemstreampartition.matcher.class";
  public static final String SSP_MATCHER_CLASS_REGEX = "org.apache.samza.system.RegexSystemStreamPartitionMatcher";
  public static final String SSP_MATCHER_CLASS_RANGE = "org.apache.samza.system.RangeSystemStreamPartitionMatcher";
  public static final String SSP_MATCHER_CONFIG_REGEX = "job.systemstreampartition.matcher.config.regex";
  public static final String SSP_MATCHER_CONFIG_RANGES = "job.systemstreampartition.matcher.config.ranges";
  public static final String SSP_MATCHER_CONFIG_JOB_FACTORY_REGEX =
      "job.systemstreampartition.matcher.config.job.factory.regex";
  static final String DEFAULT_SSP_MATCHER_CONFIG_JOB_FACTORY_REGEX =
      "org\\.apache\\.samza\\.job\\.local(.*ProcessJobFactory|.*ThreadJobFactory)";

  public static final String SYSTEM_STREAM_PARTITION_MAPPER_FACTORY = "job.system.stream.partition.mapper.factory";

  // The number of partitions in the checkpoint stream should be 1. But sometimes, if a stream was created
  // (automatically) with the wrong number of partitions (the default number of partitions for new streams),
  // there is no easy fix for the user (topic deletion or reducing the number of partitions is not yet
  // supported, and auto-creation of topics cannot always be easily turned off).
  // So we add a setting that allows the job to continue even though the number of partitions is not 1.
  public static final String JOB_FAIL_CHECKPOINT_VALIDATION = "job.checkpoint.validation.enabled";

  public static final String MONITOR_PARTITION_CHANGE = "job.coordinator.monitor-partition-change";
  public static final String MONITOR_PARTITION_CHANGE_FREQUENCY_MS =
      "job.coordinator.monitor-partition-change.frequency.ms";
  static final int DEFAULT_MONITOR_PARTITION_CHANGE_FREQUENCY_MS = 300000;

  public static final String MONITOR_INPUT_REGEX_FREQUENCY_MS = "job.coordinator.monitor-input-regex.frequency.ms";
  static final int DEFAULT_MONITOR_INPUT_REGEX_FREQUENCY_MS = 300000;

  public static final String REGEX_RESOLVED_STREAMS = "job.config.rewriter.%s.regex";
  public static final String REGEX_RESOLVED_SYSTEM = "job.config.rewriter.%s.system";
  public static final String REGEX_INHERITED_CONFIG = "job.config.rewriter.%s.config";

  public static final String JOB_SECURITY_MANAGER_FACTORY = "job.security.manager.factory";

  public static final String METADATA_STORE_FACTORY = "metadata.store.factory";
  public static final String STARTPOINT_METADATA_STORE_FACTORY = "startpoint.metadata.store.factory";

  public static final String LOCATION_ID_PROVIDER_FACTORY = "locationid.provider.factory";

  // Processor Config Constants
  public static final String PROCESSOR_ID = "processor.id";
  public static final String PROCESSOR_LIST = "processor.list";

  // Represents the store path for non-changelog stores.
  public static final String JOB_NON_LOGGED_STORE_BASE_DIR = "job.non-logged.store.base.dir";

  // Represents the store path for stores with changelog enabled. Typically the stores are not cleaned up
  // across application restarts
  public static final String JOB_LOGGED_STORE_BASE_DIR = "job.logged.store.base.dir";

  // Enables diagnostic appender for logging exception events
  public static final String JOB_DIAGNOSTICS_ENABLED = "job.diagnostics.enabled";

  // Replication factor for standby tasks; standby tasks count as enabled only when this is greater than 1
  // (see getStandbyTasksEnabled).
  public static final String STANDBY_TASKS_REPLICATION_FACTOR = "job.standbytasks.replication.factor";
  static final int DEFAULT_STANDBY_TASKS_REPLICATION_FACTOR = 1;

  // Naming format and directory for container.metadata file
  public static final String CONTAINER_METADATA_FILENAME_FORMAT = "%s.metadata"; // Filename: <containerID>.metadata
  public static final String CONTAINER_METADATA_DIRECTORY_SYS_PROPERTY = "samza.log.dir";

  // Auto-sizing related configs that take precedence over the respective sizing configs (job.container.count, etc.)
  // *only* when job.autosizing.enabled is true. Otherwise current behavior is maintained.
  private static final String JOB_AUTOSIZING_CONFIG_PREFIX = "job.autosizing."; // used to determine if a config is related to autosizing
  public static final String JOB_AUTOSIZING_ENABLED = JOB_AUTOSIZING_CONFIG_PREFIX + "enabled";
  public static final String JOB_AUTOSIZING_CONTAINER_COUNT = JOB_AUTOSIZING_CONFIG_PREFIX + "container.count";
  public static final String JOB_AUTOSIZING_CONTAINER_THREAD_POOL_SIZE = JOB_AUTOSIZING_CONFIG_PREFIX + "container.thread.pool.size";
  public static final String JOB_AUTOSIZING_CONTAINER_MAX_HEAP_MB = JOB_AUTOSIZING_CONFIG_PREFIX + "container.maxheap.mb";
  public static final String JOB_AUTOSIZING_CONTAINER_MEMORY_MB = JOB_AUTOSIZING_CONFIG_PREFIX + "container.memory.mb";
  public static final String JOB_AUTOSIZING_CONTAINER_MAX_CORES = JOB_AUTOSIZING_CONFIG_PREFIX + "container.cpu.cores";

  public static final String COORDINATOR_STREAM_FACTORY = "job.coordinatorstream.config.factory";
  public static final String DEFAULT_COORDINATOR_STREAM_CONFIG_FACTORY = "org.apache.samza.util.DefaultCoordinatorStreamConfigFactory";

  public static final String JOB_SPLIT_DEPLOYMENT_ENABLED = "job.split.deployment.enabled";

  private static final String JOB_STARTPOINT_ENABLED = "job.startpoint.enabled";

  /**
   * Creates a job-config view over the given config.
   *
   * @param config the config whose entries back this instance
   */
  public JobConfig(Config config) {
    super(config);
  }

  /**
   * @return the value of {@link #JOB_NAME}, or empty if it is not set
   */
  public Optional<String> getName() {
    return Optional.ofNullable(get(JOB_NAME));
  }

  /**
   * Same lookup as {@link #getCoordinatorSystemNameOrNull()}, but fails when no system can be resolved.
   *
   * @return the resolved coordinator system name, never null
   * @throws ConfigException if neither {@link #JOB_COORDINATOR_SYSTEM} nor {@link #JOB_DEFAULT_SYSTEM} is set
   */
  public String getCoordinatorSystemName() {
    return Optional.ofNullable(getCoordinatorSystemNameOrNull())
        .orElseThrow(() -> new ConfigException(
            "Missing job.coordinator.system configuration. Cannot proceed with job execution."));
  }

  /**
   * Gets the System to use for reading/writing the coordinator stream. Uses the following precedence.
   *
   * 1. If job.coordinator.system is defined, that value is used.
   * 2. If job.default.system is defined, that value is used.
   * 3. None
   *
   * @return the resolved system name, or null if neither config is set
   */
  public String getCoordinatorSystemNameOrNull() {
    String coordinatorSystem = get(JOB_COORDINATOR_SYSTEM);
    return coordinatorSystem != null ? coordinatorSystem : getDefaultSystem().orElse(null);
  }

  /**
   * @return the value of {@link #JOB_DEFAULT_SYSTEM}, or empty if it is not set
   */
  public Optional<String> getDefaultSystem() {
    return Optional.ofNullable(get(JOB_DEFAULT_SYSTEM));
  }

  /**
   * Resolves the number of containers using the following precedence:
   *
   * 1. {@link #JOB_AUTOSIZING_CONTAINER_COUNT}, only if autosizing is enabled and the value is set.
   * 2. {@link #JOB_CONTAINER_COUNT}, if set.
   * 3. The deprecated "yarn.container.count", if set (a warning is logged).
   * 4. {@link #DEFAULT_JOB_CONTAINER_COUNT}.
   *
   * @return the resolved container count
   * @throws NumberFormatException if the selected value is not a parsable integer
   */
  public int getContainerCount() {
    if (getAutosizingEnabled()) {
      String autosizingContainerCount = get(JOB_AUTOSIZING_CONTAINER_COUNT);
      if (autosizingContainerCount != null) {
        return Integer.parseInt(autosizingContainerCount);
      }
    }

    String jobContainerCount = get(JOB_CONTAINER_COUNT);
    if (jobContainerCount != null) {
      return Integer.parseInt(jobContainerCount);
    }

    // To maintain backwards compatibility, honor yarn.container.count for now.
    // TODO get rid of this in a future release.
    String yarnContainerCount = get("yarn.container.count");
    if (yarnContainerCount != null) {
      LOG.warn("Configuration 'yarn.container.count' is deprecated. Please use {}.", JOB_CONTAINER_COUNT);
      return Integer.parseInt(yarnContainerCount);
    }
    return DEFAULT_JOB_CONTAINER_COUNT;
  }

  /**
   * @return the value of {@link #MONITOR_INPUT_REGEX_FREQUENCY_MS}, defaulting to
   *         {@link #DEFAULT_MONITOR_INPUT_REGEX_FREQUENCY_MS}
   */
  public int getMonitorRegexFrequency() {
    return getInt(MONITOR_INPUT_REGEX_FREQUENCY_MS, DEFAULT_MONITOR_INPUT_REGEX_FREQUENCY_MS);
  }

  /**
   * @return true if the configured input-regex monitor frequency is zero or negative
   */
  public boolean getMonitorRegexDisabled() {
    return getMonitorRegexFrequency() <= 0;
  }

  /**
   * @return the value of {@link #MONITOR_PARTITION_CHANGE_FREQUENCY_MS}, defaulting to
   *         {@link #DEFAULT_MONITOR_PARTITION_CHANGE_FREQUENCY_MS}
   */
  public int getMonitorPartitionChangeFrequency() {
    return getInt(MONITOR_PARTITION_CHANGE_FREQUENCY_MS, DEFAULT_MONITOR_PARTITION_CHANGE_FREQUENCY_MS);
  }

  /**
   * Compile a map of each input-system to its corresponding input-monitor-regex patterns.
   *
   * <p>Rewriters that do not define both a system and a regex are skipped. When several rewriters target
   * the same system, their regexes are combined into a single alternation ({@code regex1|regex2}).
   * Whitespace around each rewriter name is ignored and empty entries are skipped, so a list written as
   * {@code "a, b"} resolves both {@code a} and {@code b}.
   *
   * @param rewritersList comma-separated rewriter names; a null value yields an empty map
   * @return a new mutable map from system name to the compiled pattern for that system
   * @throws java.util.regex.PatternSyntaxException if a configured regex is invalid
   */
  public Map<String, Pattern> getMonitorRegexPatternMap(String rewritersList) {
    Map<String, Pattern> inputRegexesToMonitor = new HashMap<>();
    if (rewritersList == null) {
      return inputRegexesToMonitor;
    }

    Stream.of(rewritersList.split(","))
        // Without trimming, " b" from "a, b" would be looked up under "job.config.rewriter. b.*" and missed.
        .map(String::trim)
        .filter(rewriterName -> !rewriterName.isEmpty())
        .forEach(rewriterName -> {
          Optional<String> rewriterSystem = getRegexResolvedSystem(rewriterName);
          Optional<String> rewriterRegex = getRegexResolvedStreams(rewriterName);
          if (rewriterSystem.isPresent() && rewriterRegex.isPresent()) {
            Pattern existingPatternForSystem = inputRegexesToMonitor.get(rewriterSystem.get());
            String regexForSystem = existingPatternForSystem == null
                ? rewriterRegex.get()
                : String.join("|", existingPatternForSystem.pattern(), rewriterRegex.get());
            inputRegexesToMonitor.put(rewriterSystem.get(), Pattern.compile(regexForSystem));
          }
        });
    return inputRegexesToMonitor;
  }

  /**
   * @param rewriterName name substituted into {@link #REGEX_RESOLVED_STREAMS}
   * @return the regex configured for the rewriter, or empty if it is not set
   */
  public Optional<String> getRegexResolvedStreams(String rewriterName) {
    return Optional.ofNullable(get(String.format(REGEX_RESOLVED_STREAMS, rewriterName)));
  }

  /**
   * @param rewriterName name substituted into {@link #REGEX_RESOLVED_SYSTEM}
   * @return the system configured for the rewriter, or empty if it is not set
   */
  public Optional<String> getRegexResolvedSystem(String rewriterName) {
    return Optional.ofNullable(get(String.format(REGEX_RESOLVED_SYSTEM, rewriterName)));
  }

  /**
   * @param rewriterName name substituted into {@link #REGEX_INHERITED_CONFIG}
   * @return the subset of this config under the {@code job.config.rewriter.<rewriterName>.config.} prefix
   */
  public Config getRegexResolvedInheritedConfig(String rewriterName) {
    return subset(String.format(REGEX_INHERITED_CONFIG, rewriterName) + ".", true);
  }

  /**
   * @return the value of {@link #STREAM_JOB_FACTORY_CLASS}, or empty if it is not set
   */
  public Optional<String> getStreamJobFactoryClass() {
    return Optional.ofNullable(get(STREAM_JOB_FACTORY_CLASS));
  }

  /**
   * @return the value of {@link #JOB_ID}, defaulting to {@link #DEFAULT_JOB_ID}
   */
  public String getJobId() {
    return Optional.ofNullable(get(JOB_ID)).orElse(DEFAULT_JOB_ID);
  }

  /**
   * @return the value of {@link #JOB_FAIL_CHECKPOINT_VALIDATION}, defaulting to true
   */
  public boolean failOnCheckpointValidation() {
    return getBoolean(JOB_FAIL_CHECKPOINT_VALIDATION, true);
  }

  /**
   * @return the raw CSV value of {@link #CONFIG_REWRITERS}, or empty if it is not set
   */
  public Optional<String> getConfigRewriters() {
    return Optional.ofNullable(get(CONFIG_REWRITERS));
  }

  /**
   * @param name rewriter name substituted into {@link #CONFIG_REWRITER_CLASS}
   * @return the class configured for the rewriter, or empty if it is not set
   */
  public Optional<String> getConfigRewriterClass(String name) {
    return Optional.ofNullable(get(String.format(CONFIG_REWRITER_CLASS, name)));
  }

  /**
   * @return the value of {@link #SSP_INPUT_EXPANSION_ENABLED}, defaulting to
   *         {@link #DEFAULT_INPUT_EXPANSION_ENABLED}
   */
  public boolean isSSPGrouperProxyEnabled() {
    return getBoolean(SSP_INPUT_EXPANSION_ENABLED, DEFAULT_INPUT_EXPANSION_ENABLED);
  }

  /**
   * @return the value of {@link #SSP_GROUPER_FACTORY}, defaulting to the class name of
   *         {@link GroupByPartitionFactory}
   */
  public String getSystemStreamPartitionGrouperFactory() {
    return Optional.ofNullable(get(SSP_GROUPER_FACTORY)).orElseGet(GroupByPartitionFactory.class::getName);
  }

  /**
   * @return the value of {@link #LOCATION_ID_PROVIDER_FACTORY}, defaulting to the class name of
   *         {@link DefaultLocationIdProviderFactory}
   */
  public String getLocationIdProviderFactory() {
    return Optional.ofNullable(get(LOCATION_ID_PROVIDER_FACTORY))
        .orElseGet(DefaultLocationIdProviderFactory.class::getName);
  }

  /**
   * @return the value of {@link #JOB_SECURITY_MANAGER_FACTORY}, or empty if it is not set
   */
  public Optional<String> getSecurityManagerFactory() {
    return Optional.ofNullable(get(JOB_SECURITY_MANAGER_FACTORY));
  }

  /**
   * @return the value of {@link #SSP_MATCHER_CLASS}, or empty if it is not set
   */
  public Optional<String> getSSPMatcherClass() {
    return Optional.ofNullable(get(SSP_MATCHER_CLASS));
  }

  /**
   * @return the value of {@link #SSP_MATCHER_CONFIG_REGEX}
   * @throws SamzaException if the config is not set
   */
  public String getSSPMatcherConfigRegex() {
    return Optional.ofNullable(get(SSP_MATCHER_CONFIG_REGEX))
        .orElseThrow(
            () -> new SamzaException(String.format("Missing required configuration: '%s'", SSP_MATCHER_CONFIG_REGEX)));
  }

  /**
   * @return the value of {@link #SSP_MATCHER_CONFIG_RANGES}
   * @throws SamzaException if the config is not set
   */
  public String getSSPMatcherConfigRanges() {
    return Optional.ofNullable(get(SSP_MATCHER_CONFIG_RANGES))
        .orElseThrow(
            () -> new SamzaException(String.format("Missing required configuration: '%s'", SSP_MATCHER_CONFIG_RANGES)));
  }

  /**
   * @return the value of {@link #SSP_MATCHER_CONFIG_JOB_FACTORY_REGEX}, defaulting to
   *         {@link #DEFAULT_SSP_MATCHER_CONFIG_JOB_FACTORY_REGEX}
   */
  public String getSSPMatcherConfigJobFactoryRegex() {
    return get(SSP_MATCHER_CONFIG_JOB_FACTORY_REGEX, DEFAULT_SSP_MATCHER_CONFIG_JOB_FACTORY_REGEX);
  }

  /**
   * Resolves the container thread pool size using the following precedence:
   *
   * 1. {@link #JOB_AUTOSIZING_CONTAINER_THREAD_POOL_SIZE}, only if autosizing is enabled and the value is set.
   * 2. {@link #JOB_CONTAINER_THREAD_POOL_SIZE}, defaulting to 0 when it is not set.
   *
   * @return the resolved thread pool size
   * @throws NumberFormatException if the autosizing value is selected and is not a parsable integer
   */
  public int getThreadPoolSize() {
    if (getAutosizingEnabled()) {
      String autosizingThreadPoolSize = get(JOB_AUTOSIZING_CONTAINER_THREAD_POOL_SIZE);
      if (autosizingThreadPoolSize != null) {
        return Integer.parseInt(autosizingThreadPoolSize);
      }
    }
    return getInt(JOB_CONTAINER_THREAD_POOL_SIZE, 0);
  }

  /**
   * @return the value of {@link #JOB_DEBOUNCE_TIME_MS}, defaulting to {@link #DEFAULT_DEBOUNCE_TIME_MS}
   */
  public int getDebounceTimeMs() {
    return getInt(JOB_DEBOUNCE_TIME_MS, DEFAULT_DEBOUNCE_TIME_MS);
  }

  /**
   * @return the value of {@link #JOB_NON_LOGGED_STORE_BASE_DIR}, or empty if it is not set
   */
  public Optional<String> getNonLoggedStorePath() {
    return Optional.ofNullable(get(JOB_NON_LOGGED_STORE_BASE_DIR));
  }

  /**
   * @return the value of {@link #JOB_LOGGED_STORE_BASE_DIR}, or empty if it is not set
   */
  public Optional<String> getLoggedStorePath() {
    return Optional.ofNullable(get(JOB_LOGGED_STORE_BASE_DIR));
  }

  /**
   * @return the value of {@link #METADATA_STORE_FACTORY}, defaulting to the class name of
   *         {@link CoordinatorStreamMetadataStoreFactory}
   */
  public String getMetadataStoreFactory() {
    return get(METADATA_STORE_FACTORY, CoordinatorStreamMetadataStoreFactory.class.getName());
  }

  /**
   * @return the value of {@link #JOB_DIAGNOSTICS_ENABLED}, defaulting to false
   */
  public boolean getDiagnosticsEnabled() {
    return getBoolean(JOB_DIAGNOSTICS_ENABLED, false);
  }

  /**
   * @return the value of {@link #JOB_AUTOSIZING_ENABLED}, defaulting to false
   */
  public boolean getAutosizingEnabled() {
    return getBoolean(JOB_AUTOSIZING_ENABLED, false);
  }

  /**
   * Check if a given config parameter is an internal autosizing related config, based on
   * its name having the prefix "job.autosizing." (including the trailing dot).
   *
   * @param configParam the config param to determine; may be null
   * @return true if the config is related to autosizing, false otherwise (including for null)
   */
  public static boolean isAutosizingConfig(String configParam) {
    return configParam != null && configParam.startsWith(JOB_AUTOSIZING_CONFIG_PREFIX);
  }

  /**
   * @return the value of {@link #JOB_JMX_ENABLED}, defaulting to true
   */
  public boolean getJMXEnabled() {
    return getBoolean(JOB_JMX_ENABLED, true);
  }

  /**
   * @return the value of {@link #SYSTEM_STREAM_PARTITION_MAPPER_FACTORY}, defaulting to the class name of
   *         {@link HashSystemStreamPartitionMapperFactory}
   */
  public String getSystemStreamPartitionMapperFactoryName() {
    return get(SYSTEM_STREAM_PARTITION_MAPPER_FACTORY, HashSystemStreamPartitionMapperFactory.class.getName());
  }

  /**
   * @return the value of {@link #STANDBY_TASKS_REPLICATION_FACTOR}, defaulting to
   *         {@link #DEFAULT_STANDBY_TASKS_REPLICATION_FACTOR}
   */
  public int getStandbyTaskReplicationFactor() {
    return getInt(STANDBY_TASKS_REPLICATION_FACTOR, DEFAULT_STANDBY_TASKS_REPLICATION_FACTOR);
  }

  /**
   * @return true if the standby task replication factor is greater than 1
   */
  public boolean getStandbyTasksEnabled() {
    return getStandbyTaskReplicationFactor() > 1;
  }

  /**
   * @return the value of {@link #JOB_SPLIT_DEPLOYMENT_ENABLED}, defaulting to false
   */
  public boolean isSplitDeploymentEnabled() {
    return getBoolean(JOB_SPLIT_DEPLOYMENT_ENABLED, false);
  }

  /**
   * The metadata file is written in a {@code exec-env-container-id}.metadata file in the log-dir of the container.
   * Here the {@code exec-env-container-id} refers to the ID assigned by the cluster manager (e.g., YARN) to the container,
   * which uniquely identifies a container's lifecycle.
   *
   * @param execEnvContainerId the container ID used to build the file name; may be null
   * @return the metadata file under the directory named by the {@code samza.log.dir} system property, or
   *         empty if that property is not set or {@code execEnvContainerId} is null
   */
  public static Optional<File> getMetadataFile(String execEnvContainerId) {
    String metadataDir = System.getProperty(CONTAINER_METADATA_DIRECTORY_SYS_PROPERTY);
    if (metadataDir == null || execEnvContainerId == null) {
      return Optional.empty();
    }
    String metadataFileName = String.format(CONTAINER_METADATA_FILENAME_FORMAT, execEnvContainerId);
    return Optional.of(new File(metadataDir, metadataFileName));
  }

  /**
   * Get coordinatorStreamFactory according to the configs
   * @return the name of coordinatorStreamFactory, defaulting to
   *         {@link #DEFAULT_COORDINATOR_STREAM_CONFIG_FACTORY}
   */
  public String getCoordinatorStreamFactory() {
    return get(COORDINATOR_STREAM_FACTORY, DEFAULT_COORDINATOR_STREAM_CONFIG_FACTORY);
  }

  /**
   * Get config loader factory according to the configs
   * @return full qualified name of {@link ConfigLoaderFactory}, or empty if it is not set
   */
  public Optional<String> getConfigLoaderFactory() {
    return Optional.ofNullable(get(CONFIG_LOADER_FACTORY));
  }

  /**
   * @return the value of "job.startpoint.enabled", defaulting to true
   */
  public boolean getStartpointEnabled() {
    return getBoolean(JOB_STARTPOINT_ENABLED, true);
  }
}