blob: 468d9c9a9784e53461e0f50be37d4787f847bf05 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.config;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.StringUtils;
import org.apache.samza.Partition;
import org.apache.samza.checkpoint.CheckpointManager;
import org.apache.samza.checkpoint.CheckpointManagerFactory;
import org.apache.samza.container.grouper.task.GroupByContainerCountFactory;
import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.system.SystemStream;
import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.system.chooser.RoundRobinChooserFactory;
import org.apache.samza.util.ReflectionUtil;
import org.apache.samza.util.StreamUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class TaskConfig extends MapConfig {
public static final Logger LOGGER = LoggerFactory.getLogger(TaskConfig.class);
// comma-separated list of system-streams
public static final String INPUT_STREAMS = "task.inputs";
// window period in milliseconds
public static final String WINDOW_MS = "task.window.ms";
static final long DEFAULT_WINDOW_MS = -1L;
// commit period in milliseconds
public static final String COMMIT_MS = "task.commit.ms";
static final long DEFAULT_COMMIT_MS = 60000L;
// how long to wait for a clean shutdown
public static final String TASK_SHUTDOWN_MS = "task.shutdown.ms";
static final long DEFAULT_TASK_SHUTDOWN_MS = 30000L;
// legacy config for specifying task class; replaced by SamzaApplication and app.class
public static final String TASK_CLASS = "task.class";
// command builder to use for launching a Samza job
public static final String COMMAND_BUILDER = "task.command.class";
// message chooser for controlling stream consumption
public static final String MESSAGE_CHOOSER_CLASS_NAME = "task.chooser.class";
// define whether to drop the messages or not when deserialization fails
public static final String DROP_DESERIALIZATION_ERRORS = "task.drop.deserialization.errors";
// define whether to drop the messages or not when serialization fails
public static final String DROP_SERIALIZATION_ERRORS = "task.drop.serialization.errors";
// whether to ignore producer errors and drop the messages that failed to send
public static final String DROP_PRODUCER_ERRORS = "task.drop.producer.errors";
// exceptions to ignore in process and window
public static final String IGNORED_EXCEPTIONS = "task.ignored.exceptions";
// class name for task grouper
public static final String GROUPER_FACTORY = "task.name.grouper.factory";
// max number of messages to process concurrently
public static final String MAX_CONCURRENCY = "task.max.concurrency";
static final int DEFAULT_MAX_CONCURRENCY = 1;
// timeout for triggering a callback
public static final String CALLBACK_TIMEOUT_MS = "task.callback.timeout.ms";
static final long DEFAULT_CALLBACK_TIMEOUT_MS = -1L;
// enable async commit
public static final String ASYNC_COMMIT = "task.async.commit";
// maximum time to wait for a task worker to complete when there are no new messages to handle
public static final String MAX_IDLE_MS = "task.max.idle.ms";
static final long DEFAULT_MAX_IDLE_MS = 10L;
/**
* Samza's container polls for more messages under two conditions. The first
* condition arises when there are simply no remaining buffered messages to
* process for any input SystemStreamPartition. The second condition arises
* when some input SystemStreamPartitions have empty buffers, but some do
* not. In the latter case, a polling interval is defined to determine how
* often to refresh the empty SystemStreamPartition buffers. By default,
* this interval is 50ms, which means that any empty SystemStreamPartition
* buffer will be refreshed at least every 50ms. A higher value here means
* that empty SystemStreamPartitions will be refreshed less often, which
* means more latency is introduced, but less CPU and network will be used.
* Decreasing this value means that empty SystemStreamPartitions are
* refreshed more frequently, thereby introducing less latency, but
* increasing CPU and network utilization.
*/
public static final String POLL_INTERVAL_MS = "task.poll.interval.ms";
public static final int DEFAULT_POLL_INTERVAL_MS = 50;
// broadcast streams consumed by all tasks. e.g. kafka.foo#1
public static final String BROADCAST_INPUT_STREAMS = "task.broadcast.inputs";
private static final String BROADCAST_STREAM_PATTERN = "^[\\d]+$";
private static final String BROADCAST_STREAM_RANGE_PATTERN = "^\\[[\\d]+\\-[\\d]+\\]$";
public static final String CHECKPOINT_MANAGER_FACTORY = "task.checkpoint.factory";
// standby containers use this flag to indicate that checkpoints will be polled continually, rather than only once at startup like in an active container
public static final String INTERNAL_CHECKPOINT_MANAGER_CONSUMER_STOP_AFTER_FIRST_READ = "samza.internal.task.checkpoint.consumer.stop.after.first.read";
public static final String TRANSACTIONAL_STATE_CHECKPOINT_ENABLED = "task.transactional.state.checkpoint.enabled";
private static final boolean DEFAULT_TRANSACTIONAL_STATE_CHECKPOINT_ENABLED = true;
public static final String TRANSACTIONAL_STATE_RESTORE_ENABLED = "task.transactional.state.restore.enabled";
private static final boolean DEFAULT_TRANSACTIONAL_STATE_RESTORE_ENABLED = false;
public static final String TRANSACTIONAL_STATE_RETAIN_EXISTING_STATE =
"task.transactional.state.retain.existing.state";
private static final boolean DEFAULT_TRANSACTIONAL_STATE_RETAIN_EXISTING_STATE = true;
public TaskConfig(Config config) {
super(config);
}
/**
* Get the input streams, not including the broadcast streams. Use {@link #getAllInputStreams()} to also get the
* broadcast streams.
*/
public Set<SystemStream> getInputStreams() {
Optional<String> inputStreams = Optional.ofNullable(get(INPUT_STREAMS));
if (!inputStreams.isPresent() || inputStreams.get().isEmpty()) {
return Collections.emptySet();
} else {
return Stream.of(inputStreams.get().split(","))
.map(systemStreamNames -> StreamUtil.getSystemStreamFromNames(systemStreamNames.trim()))
.collect(Collectors.toSet());
}
}
public long getWindowMs() {
return getLong(WINDOW_MS, DEFAULT_WINDOW_MS);
}
public long getCommitMs() {
return getLong(COMMIT_MS, DEFAULT_COMMIT_MS);
}
public Optional<String> getTaskClass() {
return Optional.ofNullable(get(TASK_CLASS));
}
public String getCommandClass(String defaultCommandClass) {
return get(COMMAND_BUILDER, defaultCommandClass);
}
public String getMessageChooserClass() {
return Optional.ofNullable(get(MESSAGE_CHOOSER_CLASS_NAME)).orElse(RoundRobinChooserFactory.class.getName());
}
public boolean getDropDeserializationErrors() {
return getBoolean(DROP_DESERIALIZATION_ERRORS, false);
}
public boolean getDropSerializationErrors() {
return getBoolean(DROP_SERIALIZATION_ERRORS, false);
}
public boolean getDropProducerErrors() {
return getBoolean(DROP_PRODUCER_ERRORS, false);
}
public int getPollIntervalMs() {
return getInt(POLL_INTERVAL_MS, DEFAULT_POLL_INTERVAL_MS);
}
public Optional<String> getIgnoredExceptions() {
return Optional.ofNullable(get(IGNORED_EXCEPTIONS));
}
public String getTaskNameGrouperFactory() {
Optional<String> taskNameGrouperFactory = Optional.ofNullable(get(GROUPER_FACTORY));
if (taskNameGrouperFactory.isPresent()) {
return taskNameGrouperFactory.get();
} else {
LOGGER.info(String.format("No %s configuration, using %s", GROUPER_FACTORY,
GroupByContainerCountFactory.class.getName()));
return GroupByContainerCountFactory.class.getName();
}
}
public int getMaxConcurrency() {
return getInt(MAX_CONCURRENCY, DEFAULT_MAX_CONCURRENCY);
}
public long getCallbackTimeoutMs() {
return getLong(CALLBACK_TIMEOUT_MS, DEFAULT_CALLBACK_TIMEOUT_MS);
}
public boolean getAsyncCommit() {
return getBoolean(ASYNC_COMMIT, false);
}
public long getMaxIdleMs() {
return getLong(MAX_IDLE_MS, DEFAULT_MAX_IDLE_MS);
}
/**
* Create the checkpoint manager
*
* @param metricsRegistry Registry of metrics to use. Can be null if not using metrics.
* @return CheckpointManager object if checkpoint manager factory is configured, otherwise empty.
*/
public Optional<CheckpointManager> getCheckpointManager(MetricsRegistry metricsRegistry) {
return Optional.ofNullable(get(CHECKPOINT_MANAGER_FACTORY))
.filter(StringUtils::isNotBlank)
.map(checkpointManagerFactoryName -> ReflectionUtil.getObj(checkpointManagerFactoryName,
CheckpointManagerFactory.class).getCheckpointManager(this, metricsRegistry));
}
/**
* Internal config to indicate whether the SystemConsumer underlying a CheckpointManager should be stopped after
* initial read of checkpoints.
*/
public boolean getCheckpointManagerConsumerStopAfterFirstRead() {
return getBoolean(INTERNAL_CHECKPOINT_MANAGER_CONSUMER_STOP_AFTER_FIRST_READ, true);
}
/**
* Get the systemStreamPartitions of the broadcast stream. Specifying
* one partition for one stream or a range of the partitions for one
* stream is allowed.
*
* @return a Set of SystemStreamPartitions
*/
public Set<SystemStreamPartition> getBroadcastSystemStreamPartitions() {
HashSet<SystemStreamPartition> systemStreamPartitionSet = new HashSet<>();
List<String> systemStreamPartitions = getList(BROADCAST_INPUT_STREAMS, Collections.emptyList());
for (String systemStreamPartition : systemStreamPartitions) {
int hashPosition = systemStreamPartition.indexOf("#");
if (hashPosition == -1) {
throw new IllegalArgumentException("incorrect format in " + systemStreamPartition
+ ". Broadcast stream names should be in the form 'system.stream#partitionId' or 'system.stream#[partitionN-partitionM]'");
} else {
String systemStreamName = systemStreamPartition.substring(0, hashPosition);
String partitionSegment = systemStreamPartition.substring(hashPosition + 1);
SystemStream systemStream = StreamUtil.getSystemStreamFromNames(systemStreamName);
if (Pattern.matches(BROADCAST_STREAM_PATTERN, partitionSegment)) {
systemStreamPartitionSet.add(new SystemStreamPartition(systemStream, new Partition(Integer.valueOf(partitionSegment))));
} else {
if (Pattern.matches(BROADCAST_STREAM_RANGE_PATTERN, partitionSegment)) {
int partitionStart = Integer.valueOf(partitionSegment.substring(1, partitionSegment.lastIndexOf("-")));
int partitionEnd = Integer.valueOf(partitionSegment.substring(partitionSegment.lastIndexOf("-") + 1, partitionSegment.indexOf("]")));
if (partitionStart > partitionEnd) {
LOGGER.warn("The starting partition in stream " + systemStream.toString() + " is bigger than the ending Partition. No partition is added");
}
for (int i = partitionStart; i <= partitionEnd; i++) {
systemStreamPartitionSet.add(new SystemStreamPartition(systemStream, new Partition(i)));
}
} else {
throw new IllegalArgumentException("incorrect format in " + systemStreamPartition
+ ". Broadcast stream names should be in the form 'system.stream#partitionId' or 'system.stream#[partitionN-partitionM]'");
}
}
}
}
return systemStreamPartitionSet;
}
/**
* Get the SystemStreams for the configured broadcast streams.
*
* @return the set of SystemStreams for which there are broadcast stream SSPs configured.
*/
public Set<SystemStream> getBroadcastSystemStreams() {
Set<SystemStream> broadcastSS = new HashSet<>();
Set<SystemStreamPartition> broadcastSSPs = getBroadcastSystemStreamPartitions();
for (SystemStreamPartition bssp : broadcastSSPs) {
broadcastSS.add(bssp.getSystemStream());
}
return Collections.unmodifiableSet(broadcastSS);
}
/**
* Get the SystemStreams for the configured input and broadcast streams.
*
* @return the set of SystemStreams for both standard inputs and broadcast stream inputs.
*/
public Set<SystemStream> getAllInputStreams() {
Set<SystemStream> allInputSS = new HashSet<>();
allInputSS.addAll(getInputStreams());
allInputSS.addAll(getBroadcastSystemStreams());
return Collections.unmodifiableSet(allInputSS);
}
/**
* Returns a value indicating how long to wait for the tasks to shutdown
* If the value is not defined in the config or if does not parse correctly, we return the default value -
* {@value #DEFAULT_TASK_SHUTDOWN_MS}
*
* @return Long value indicating how long to wait for all the tasks to shutdown
*/
public long getShutdownMs() {
String shutdownMs = get(TASK_SHUTDOWN_MS);
try {
return Long.parseLong(shutdownMs);
} catch (NumberFormatException nfe) {
LOGGER.warn(String.format(
"Unable to parse user-configure value for %s - %s. Using default value %d",
TASK_SHUTDOWN_MS,
shutdownMs,
DEFAULT_TASK_SHUTDOWN_MS));
return DEFAULT_TASK_SHUTDOWN_MS;
}
}
public boolean getTransactionalStateCheckpointEnabled() {
return getBoolean(TRANSACTIONAL_STATE_CHECKPOINT_ENABLED, DEFAULT_TRANSACTIONAL_STATE_CHECKPOINT_ENABLED);
}
public boolean getTransactionalStateRestoreEnabled() {
return getBoolean(TRANSACTIONAL_STATE_RESTORE_ENABLED, DEFAULT_TRANSACTIONAL_STATE_RESTORE_ENABLED);
}
public boolean getTransactionalStateRetainExistingState() {
return getBoolean(TRANSACTIONAL_STATE_RETAIN_EXISTING_STATE, DEFAULT_TRANSACTIONAL_STATE_RETAIN_EXISTING_STATE);
}
}