| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.samza.config; |
| |
| import com.google.common.collect.Sets; |
| import org.apache.samza.system.SystemStream; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.lang.invoke.MethodHandles; |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Optional; |
| import java.util.Set; |
| import java.util.stream.Collectors; |
| |
| public class StreamConfig extends MapConfig { |
| public static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); |
| |
| |
| // Samza configs for streams |
| public static final String SAMZA_PROPERTY = "samza."; |
| public static final String SYSTEM = SAMZA_PROPERTY + "system"; |
| public static final String PHYSICAL_NAME = SAMZA_PROPERTY + "physical.name"; |
| public static final String MSG_SERDE = SAMZA_PROPERTY + "msg.serde"; |
| public static final String KEY_SERDE = SAMZA_PROPERTY + "key.serde"; |
| public static final String CONSUMER_RESET_OFFSET = SAMZA_PROPERTY + "reset.offset"; |
| public static final String CONSUMER_OFFSET_DEFAULT = SAMZA_PROPERTY + "offset.default"; |
| public static final String BOOTSTRAP = SAMZA_PROPERTY + "bootstrap"; |
| public static final String PRIORITY = SAMZA_PROPERTY + "priority"; |
| public static final String IS_INTERMEDIATE = SAMZA_PROPERTY + "intermediate"; |
| public static final String DELETE_COMMITTED_MESSAGES = SAMZA_PROPERTY + "delete.committed.messages"; |
| public static final String IS_BOUNDED = SAMZA_PROPERTY + "bounded"; |
| public static final String BROADCAST = SAMZA_PROPERTY + "broadcast"; |
| |
| // We don't want any external dependencies on these patterns while both exist. |
| // Use the corresponding get*() method to ensure proper values. |
| private static final String STREAMS_PREFIX = "streams."; |
| |
| public static final String STREAM_PREFIX = "systems.%s.streams.%s."; |
| public static final String STREAM_ID_PREFIX = STREAMS_PREFIX + "%s."; |
| public static final String SYSTEM_FOR_STREAM_ID = STREAM_ID_PREFIX + SYSTEM; |
| public static final String PHYSICAL_NAME_FOR_STREAM_ID = STREAM_ID_PREFIX + PHYSICAL_NAME; |
| public static final String IS_INTERMEDIATE_FOR_STREAM_ID = STREAM_ID_PREFIX + IS_INTERMEDIATE; |
| public static final String DELETE_COMMITTED_MESSAGES_FOR_STREAM_ID = STREAM_ID_PREFIX + DELETE_COMMITTED_MESSAGES; |
| public static final String IS_BOUNDED_FOR_STREAM_ID = STREAM_ID_PREFIX + IS_BOUNDED; |
| public static final String PRIORITY_FOR_STREAM_ID = STREAM_ID_PREFIX + PRIORITY; |
| public static final String CONSUMER_OFFSET_DEFAULT_FOR_STREAM_ID = STREAM_ID_PREFIX + CONSUMER_OFFSET_DEFAULT; |
| public static final String BOOTSTRAP_FOR_STREAM_ID = STREAM_ID_PREFIX + BOOTSTRAP; |
| public static final String BROADCAST_FOR_STREAM_ID = STREAM_ID_PREFIX + BROADCAST; |
| |
| /* |
| * Implementation notes: |
| * Helper for accessing configs related to stream properties. |
| * |
| * For most configs, this currently supports two different formats for specifying stream properties: |
| * 1) "streams.{streamId}.{property}" (recommended to use this format) |
| * 2) "systems.{systemName}.streams.{streamName}.{property}" (legacy) |
| * Note that some config lookups are only supported through the "streams.{streamId}.{property}". See the specific |
| * accessor method to determine which formats are supported. |
| * |
| * Summary of terms: |
| * - streamId: logical identifier used for a stream; configs are specified using this streamId |
| * - physical stream: concrete name for a stream (if the physical stream is not explicitly configured, then the streamId |
| * is used as the physical stream |
| * - streamName: within the javadoc for this class, streamName is the same as physical stream |
| * - samza property: property which is Samza-specific, which will have "samza." as a prefix (e.g. "samza.key.serde"); |
| * this is in contrast to stream-specific properties which are related to specific stream technologies |
| */ |
| |
| private Optional<String> nonEmptyOption(String value) { |
| if (value == null || value.isEmpty()) { |
| return Optional.empty(); |
| } else { |
| return Optional.of(value); |
| } |
| } |
| |
| public StreamConfig(Config config) { |
| super(config); |
| } |
| |
| |
| /** |
| * Finds the properties from the legacy config style (config key includes system). |
| * This will return a Config with the properties that match the following formats (if a property is specified through |
| * multiple formats, priority is top to bottom): |
| * 1) "systems.{systemName}.streams.{streamName}.{property}" |
| * 2) "systems.{systemName}.default.stream.{property}" |
| * |
| * @param systemName the system name under which the properties are configured |
| * @param streamName the stream name |
| * @return the map of properties for the stream |
| */ |
| private Map<String, String> getSystemStreamProperties(String systemName, String streamName) { |
| if (systemName == null) { |
| Collections.emptyMap(); |
| } |
| SystemConfig systemConfig = new SystemConfig(this); |
| Config defaults = systemConfig.getDefaultStreamProperties(systemName); |
| Config explicitConfigs = subset(String.format(STREAM_PREFIX, systemName, streamName), true); |
| return new MapConfig(defaults, explicitConfigs); |
| } |
| |
| |
| /** |
| * Gets all of the properties for the specified streamId (includes current and legacy config styles). |
| * This will return a Config with the properties that match the following formats (if a property is specified through |
| * multiple formats, priority is top to bottom): |
| * 1) "streams.{streamId}.{property}" |
| * 2) "systems.{systemName}.streams.{streamName}.{property}" where systemName is the system mapped to the streamId in |
| * the config and streamName is the physical stream name mapped to the stream id |
| * 3) "systems.{systemName}.default.stream.{property}" where systemName is the system mapped to the streamId in the |
| * config |
| * |
| * @param streamId the identifier for the stream in the config. |
| * @return the merged map of config properties from both the legacy and new config styles |
| */ |
| private MapConfig getAllStreamProperties(String streamId) { |
| Config allProperties = subset(String.format(STREAM_ID_PREFIX, streamId)); |
| Map<String, String> inheritedLegacyProperties = getSystemStreamProperties(getSystem(streamId), getPhysicalName(streamId)); |
| return new MapConfig(Arrays.asList(inheritedLegacyProperties, allProperties)); |
| } |
| |
| /** |
| * Gets the distinct stream IDs of all the streams defined in the config |
| * |
| * @return collection of stream IDs |
| */ |
| public Set<String> getStreamIds() { |
| // StreamIds are not allowed to have '.' so the first index of '.' marks the end of the streamId. |
| return subset(STREAMS_PREFIX).keySet().stream().map(key -> key.substring(0, key.indexOf("."))) |
| .distinct().collect(Collectors.toSet()); |
| } |
| |
| private List<String> getStreamIdsForSystem(String system) { |
| return getStreamIds().stream().filter(streamId -> system.equals(getSystem(streamId))).collect(Collectors.toList()); |
| } |
| |
| /** |
| * Finds the stream id which corresponds to the systemStream. |
| * This finds the stream id that is mapped to the system in systemStream through the config and that has a physical |
| * name (the physical name might be the stream id itself if there is no explicit mapping) that matches the stream in |
| * systemStream. |
| * Note: If the stream in the systemStream is a stream id which is mapped to a physical stream, then that stream won't |
| * be returned as a stream id here, since the stream in systemStream doesn't match the physical stream name. |
| * |
| * @param systemStream system stream to map to stream id |
| * @return stream id corresponding to the system stream |
| */ |
| private String systemStreamToStreamId(SystemStream systemStream) { |
| List<String> streamIds = getStreamIdsForSystem(systemStream.getSystem()).stream() |
| .filter(streamId -> systemStream.getStream().equals(getPhysicalName(streamId))).collect(Collectors.toList()); |
| if (streamIds.size() > 1) { |
| throw new IllegalStateException(String.format("There was more than one stream found for system stream %s", systemStream)); |
| } |
| |
| return streamIds.isEmpty() ? null : streamIds.get(0); |
| } |
| |
| /** |
| * Gets the System associated with the specified streamId. |
| * It first looks for the property |
| * streams.{streamId}.system |
| * <p> |
| * If no value was provided, it uses |
| * job.default.system |
| * |
| * @param streamId the identifier for the stream in the config. |
| * @return the system name associated with the stream or null. |
| */ |
| public String getSystem(String streamId) { |
| String system = get(String.format(SYSTEM_FOR_STREAM_ID, streamId)); |
| return (system != null) ? system : new JobConfig(this).getDefaultSystem().orElse(null); |
| } |
| |
| /** |
| * Gets the physical name for the specified streamId. |
| * |
| * @param streamId the identifier for the stream in the config. |
| * @return the physical identifier for the stream or the default if it is undefined. |
| */ |
| public String getPhysicalName(String streamId) { |
| // use streamId as the default physical name |
| return getOrDefault(String.format(PHYSICAL_NAME_FOR_STREAM_ID, streamId), streamId); |
| } |
| |
| |
| public Optional<String> getStreamMsgSerde(SystemStream systemStream) { |
| return nonEmptyOption(getSamzaProperty(systemStream, MSG_SERDE)); |
| } |
| |
| public Optional<String> getStreamKeySerde(SystemStream systemStream) { |
| return nonEmptyOption(getSamzaProperty(systemStream, KEY_SERDE)); |
| } |
| |
| public boolean getResetOffset(SystemStream systemStream) { |
| String resetOffset = getSamzaProperty(systemStream, CONSUMER_RESET_OFFSET, "false"); |
| if (!resetOffset.equalsIgnoreCase("true") && !resetOffset.equalsIgnoreCase("false")) { |
| LOG.warn("Got a .samza.reset.offset configuration for SystemStream {} that is not true or false (was {})." + |
| " Defaulting to false.", systemStream, resetOffset); |
| |
| resetOffset = "false"; |
| } |
| return Boolean.valueOf(resetOffset); |
| } |
| |
| /** |
| * Determines if a Samza property is specified. |
| * See getSamzaProperty(SystemStream, String). |
| * |
| * @param systemStream the SystemStream for the property value to check |
| * @param property the samza property key (including the "samza." prefix); for example, for both |
| * "streams.streamId.samza.prop.key" and "systems.system.streams.streamName.samza.prop.key", this |
| * argument should have the value "samza.prop.key" |
| */ |
| protected boolean containsSamzaProperty(SystemStream systemStream, String property) { |
| if (!property.startsWith(SAMZA_PROPERTY)) { |
| throw new IllegalArgumentException( |
| String.format("Attempt to fetch a non samza property for SystemStream %s named %s", systemStream, property)); |
| } |
| return getSamzaProperty(systemStream, property) != null; |
| } |
| |
| public boolean isResetOffsetConfigured(SystemStream systemStream) { |
| return containsSamzaProperty(systemStream, CONSUMER_RESET_OFFSET); |
| } |
| |
| public Optional<String> getDefaultStreamOffset(SystemStream systemStream) { |
| return Optional.ofNullable(getSamzaProperty(systemStream, CONSUMER_OFFSET_DEFAULT)); |
| } |
| |
| public boolean isDefaultStreamOffsetConfigured(SystemStream systemStream) { |
| return containsSamzaProperty(systemStream, CONSUMER_OFFSET_DEFAULT); |
| } |
| |
| public boolean getBootstrapEnabled(SystemStream systemStream) { |
| return Boolean.parseBoolean(getSamzaProperty(systemStream, BOOTSTRAP)); |
| } |
| |
| public boolean getBroadcastEnabled(SystemStream systemStream) { |
| return Boolean.parseBoolean(getSamzaProperty(systemStream, BROADCAST)); |
| } |
| |
| public int getPriority(SystemStream systemStream) { |
| return Integer.parseInt(getSamzaProperty(systemStream, PRIORITY, "-1")); |
| } |
| |
| /** |
| * A streamId is translated to a SystemStream by looking up its System and physicalName. It |
| * will use the streamId as the stream name if the physicalName doesn't exist. |
| */ |
| public SystemStream streamIdToSystemStream(String streamId) { |
| return new SystemStream(getSystem(streamId), getPhysicalName(streamId)); |
| } |
| |
| |
| /** |
| * Returns a list of all SystemStreams that have a serde defined from the config file. |
| */ |
| public Set<SystemStream> getSerdeStreams(String systemName) { |
| Config subConf = subset(String.format("systems.%s.streams.", systemName, true)); |
| Set<SystemStream> legacySystemStreams = subConf.keySet().stream() |
| .filter(k -> k.endsWith(MSG_SERDE) || k.endsWith(KEY_SERDE)) |
| .map(k -> { |
| String streamName = k.substring(0, k.length() - 16 /* .samza.XXX.serde length */); |
| return new SystemStream(systemName, streamName); |
| }) |
| .collect(Collectors.toSet()); |
| |
| Set<SystemStream> systemStreams = subset(STREAMS_PREFIX).keySet().stream() |
| .filter(k -> k.endsWith(MSG_SERDE) || k.endsWith(KEY_SERDE)) |
| .map(k -> k.substring(0, k.length() - 16 /* .samza.XXX.serde length */)) |
| .filter(streamId -> systemName.equals(getSystem(streamId))) |
| .map(streamId -> streamIdToSystemStream(streamId)).collect(Collectors.toSet()); |
| |
| return Sets.union(legacySystemStreams, systemStreams).immutableCopy(); |
| } |
| |
| /* Gets the properties for the streamId which are not Samza properties (i.e. do not have a "samza." prefix). This |
| * includes current and legacy config styles. |
| * This will return a Config with the properties that match the following formats (if a property is specified through |
| * multiple formats, priority is top to bottom): |
| * 1) "streams.{streamId}.{property}" |
| * 2) "systems.{systemName}.streams.{streamName}.{property}" where systemName is the system mapped to the streamId in |
| * the config and streamName is the physical stream name mapped to the stream id |
| * 3) "systems.{systemName}.default.stream.{property}" where systemName is the system mapped to the streamId in the |
| * config |
| * |
| * @param streamId the identifier for the stream in the config. |
| * @return the merged map of config properties from both the legacy and new config styles |
| */ |
| public Config getStreamProperties(String streamId) { |
| MapConfig allProperties = getAllStreamProperties(streamId); |
| Config samzaProperties = allProperties.subset(SAMZA_PROPERTY, false); |
| Map<String, String> filteredStreamProperties = |
| allProperties.entrySet().stream().filter(kv -> !samzaProperties.containsKey(kv.getKey())).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); |
| return new MapConfig(filteredStreamProperties); |
| } |
| |
| /** |
| * Gets the boolean flag of whether the specified streamId is an intermediate stream |
| * |
| * @param streamId the identifier for the stream in the config. |
| * @return true if the stream is intermediate |
| */ |
| public boolean getIsIntermediateStream(String streamId) { |
| return getBoolean(String.format(IS_INTERMEDIATE_FOR_STREAM_ID, streamId), false); |
| } |
| |
| /** |
| * Gets the boolean flag of whether the committed messages specified streamId can be deleted |
| * |
| * @param streamId the identifier for the stream in the config. |
| * @return true if the committed messages of the stream can be deleted |
| */ |
| public boolean getDeleteCommittedMessages(String streamId) { |
| return getBoolean(String.format(DELETE_COMMITTED_MESSAGES_FOR_STREAM_ID, streamId), false); |
| } |
| |
| public boolean getIsBounded(String streamId) { |
| return getBoolean(String.format(IS_BOUNDED_FOR_STREAM_ID, streamId), false); |
| } |
| |
| /** |
| * Gets the specified Samza property for a SystemStream. A Samza property is a property that controls how Samza |
| * interacts with the stream, as opposed to a property of the stream itself. |
| * |
| * First, tries to map the systemStream to a streamId. This will only find a streamId if the stream is a physical name |
| * (explicitly mapped physical name or a stream id without a physical name mapping). That means this will not map a |
| * stream id to itself if there is a mapping from the stream id to a physical stream name. This also requires that the |
| * stream id is mapped to a system in the config. |
| * If a stream id is found: |
| * 1) Look for "streams.{streamId}.{property}" for the stream id. |
| * 2) Otherwise, look for "systems.{systemName}.streams.{streamName}.{property}" in which the systemName is the system |
| * mapped to the stream id and the streamName is the physical stream name for the stream id. |
| * 3) Otherwise, look for "systems.{systemName}.default.stream.{property}" in which the systemName is the system |
| * mapped to the stream id. |
| * If a stream id was not found or no property could be found using the above keys: |
| * 1) Look for "systems.{systemName}.streams.{streamName}.{property}" in which the systemName is the system in the |
| * input systemStream and the streamName is the stream from the input systemStream. |
| * 2) Otherwise, look for "systems.{systemName}.default.stream.{property}" in which the systemName is the system |
| * in the input systemStream. |
| * 3) or null |
| * |
| * @param systemStream the SystemStream for which the property value will be retrieved. |
| * @param property the samza property key (including the "samza." prefix); for example, for both |
| * "streams.streamId.samza.prop.key" and "systems.system.streams.streamName.samza.prop.key", this |
| * argument should have the value "samza.prop.key" |
| * @return the property value |
| */ |
| private String getSamzaProperty(SystemStream systemStream, String property) { |
| if (!property.startsWith(SAMZA_PROPERTY)) { |
| throw new IllegalArgumentException( |
| String.format("Attempt to fetch a non samza property for SystemStream %s named %s", systemStream, property)); |
| } |
| |
| String streamVal = getAllStreamProperties(systemStreamToStreamId(systemStream)).get(property); |
| return (streamVal != null) ? streamVal : getSystemStreamProperties(systemStream.getSystem(), systemStream.getStream()).get(property); |
| } |
| |
| /** |
| * Gets a Samza property, with a default value used if no property value is found. |
| * See getSamzaProperty(SystemStream, String). |
| * |
| * @param systemStream the SystemStream for which the property value will be retrieved. |
| * @param property the samza property key (including the "samza." prefix); for example, for both |
| * "streams.streamId.samza.prop.key" and "systems.system.streams.streamName.samza.prop.key", this |
| * argument should have the value "samza.prop.key" |
| * @param defaultValue the default value to use if the property value is not found |
| */ |
| private String getSamzaProperty(SystemStream systemStream, String property, String defaultValue) { |
| String streamVal = getSamzaProperty(systemStream, property); |
| |
| return streamVal != null ? streamVal : defaultValue; |
| } |
| } |