| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.samza.system; |
| |
| import com.google.common.base.Joiner; |
| |
| import java.io.Serializable; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.Map; |
| |
| |
| /** |
| * StreamSpec is a blueprint for creating, validating, or simply describing a stream in the runtime environment. |
| * |
| * It has specific attributes for common behaviors that Samza uses. |
| * |
| * It also includes a map of configurations which may be system-specific. |
| * |
| * It is immutable by design. |
| */ |
| public class StreamSpec implements Serializable { |
| |
| private static final int DEFAULT_PARTITION_COUNT = 1; |
| |
| // Internal changelog stream id. It is used for creating changelog StreamSpec. |
| private static final String CHANGELOG_STREAM_ID = "samza-internal-changelog-stream-id"; |
| |
| // Internal coordinator stream id. It is used for creating coordinator StreamSpec. |
| private static final String COORDINATOR_STREAM_ID = "samza-internal-coordinator-stream-id"; |
| |
| // Internal checkpoint stream id. It is used for creating checkpoint StreamSpec. |
| private static final String CHECKPOINT_STREAM_ID = "samza-internal-checkpoint-stream-id"; |
| |
| // Internal stream appender stream id. It is used for creating stream appender StreamSpec. |
| private static final String STREAM_APPENDER_ID = "samza-internal-stream-appender-stream-id"; |
| |
| /** |
| * Unique identifier for the stream in a Samza application. |
| * This identifier is used as a key for stream properties in the |
| * job config and to distinguish between streams in a graph. |
| */ |
| private final String id; |
| |
| /** |
| * The System name on which this stream will exist. Corresponds to a named implementation of the |
| * Samza System abstraction. |
| */ |
| private final String systemName; |
| |
| /** |
| * The physical identifier for the stream. This is the identifier that will be used in remote |
| * systems to identify the stream. In Kafka this would be the topic name whereas in HDFS it |
| * might be a file URN. |
| */ |
| private final String physicalName; |
| |
| /** |
| * The number of partitions for the stream. |
| */ |
| private final int partitionCount; |
| |
| /** |
| * A set of all system-specific configurations for the stream. |
| */ |
| private final Map<String, String> config; |
| |
| /** |
| * @param id The application-unique logical identifier for the stream. It is used to distinguish between |
| * streams in a Samza application so it must be unique in the context of one deployable unit. |
| * It does not need to be globally unique or unique with respect to a host. |
| * |
| * @param physicalName The physical identifier for the stream. This is the identifier that will be used in remote |
| * systems to identify the stream. In Kafka this would be the topic name whereas in HDFS it |
| * might be a file URN. |
| * |
| * @param systemName The System name on which this stream will exist. Corresponds to a named implementation of the |
| * Samza System abstraction. See {@link SystemFactory} |
| */ |
| public StreamSpec(String id, String physicalName, String systemName) { |
| this(id, physicalName, systemName, DEFAULT_PARTITION_COUNT, Collections.emptyMap()); |
| } |
| |
| /** |
| * |
| * @param id The application-unique logical identifier for the stream. It is used to distinguish between |
| * streams in a Samza application so it must be unique in the context of one deployable unit. |
| * It does not need to be globally unique or unique with respect to a host. |
| * |
| * @param physicalName The physical identifier for the stream. This is the identifier that will be used in remote |
| * systems to identify the stream. In Kafka this would be the topic name whereas in HDFS it |
| * might be a file URN. |
| * |
| * @param systemName The System name on which this stream will exist. Corresponds to a named implementation of the |
| * Samza System abstraction. See {@link SystemFactory} |
| * |
| * @param partitionCount The number of partitionts for the stream. A value of {@code 1} indicates unpartitioned. |
| */ |
| public StreamSpec(String id, String physicalName, String systemName, int partitionCount) { |
| this(id, physicalName, systemName, partitionCount, Collections.emptyMap()); |
| } |
| |
| /** |
| * @param id The application-unique logical identifier for the stream. It is used to distinguish between |
| * streams in a Samza application so it must be unique in the context of one deployable unit. |
| * It does not need to be globally unique or unique with respect to a host. |
| * |
| * @param physicalName The physical identifier for the stream. This is the identifier that will be used in remote |
| * systems to identify the stream. In Kafka this would be the topic name whereas in HDFS it |
| * might be a file URN. |
| * |
| * @param systemName The System name on which this stream will exist. Corresponds to a named implementation of the |
| * Samza System abstraction. See {@link SystemFactory} |
| * |
| * @param config A map of properties for the stream. These may be System-specfic. |
| */ |
| public StreamSpec(String id, String physicalName, String systemName, Map<String, String> config) { |
| this(id, physicalName, systemName, DEFAULT_PARTITION_COUNT, config); |
| } |
| |
| /** |
| * @param id The application-unique logical identifier for the stream. It is used to distinguish between |
| * streams in a Samza application so it must be unique in the context of one deployable unit. |
| * It does not need to be globally unique or unique with respect to a host. |
| * |
| * @param physicalName The physical identifier for the stream. This is the identifier that will be used in remote |
| * systems to identify the stream. In Kafka this would be the topic name whereas in HDFS it |
| * might be a file URN. |
| * |
| * @param systemName The System name on which this stream will exist. Corresponds to a named implementation of the |
| * Samza System abstraction. See {@link SystemFactory} |
| * |
| * @param partitionCount The number of partitions for the stream. A value of {@code 1} indicates unpartitioned. |
| * |
| * @param config A map of properties for the stream. These may be System-specfic. |
| */ |
| public StreamSpec(String id, String physicalName, String systemName, int partitionCount, Map<String, String> config) { |
| validateLogicalIdentifier("streamId", id); |
| validateLogicalIdentifier("systemName", systemName); |
| |
| // partition count being 0 is a valid use case in Hadoop when the output stream is an empty folder |
| if (partitionCount < 0) { |
| throw new IllegalArgumentException("Parameter 'partitionCount' must be >= 0"); |
| } |
| |
| this.id = id; |
| this.systemName = systemName; |
| this.physicalName = physicalName; |
| this.partitionCount = partitionCount; |
| |
| if (config != null) { |
| this.config = Collections.unmodifiableMap(new HashMap<>(config)); |
| } else { |
| this.config = Collections.emptyMap(); |
| } |
| } |
| |
| /** |
| * Copies this StreamSpec, but applies a new partitionCount. |
| * |
| * This method is not static s.t. subclasses can override it. |
| * |
| * @param partitionCount The partitionCount for the returned StreamSpec. |
| * @return A copy of this StreamSpec with the specified partitionCount. |
| */ |
| public StreamSpec copyWithPartitionCount(int partitionCount) { |
| return new StreamSpec(id, physicalName, systemName, partitionCount, config); |
| } |
| |
| public StreamSpec copyWithPhysicalName(String physicalName) { |
| return new StreamSpec(id, physicalName, systemName, partitionCount, config); |
| } |
| |
| public String getId() { |
| return id; |
| } |
| |
| public String getSystemName() { |
| return systemName; |
| } |
| |
| public String getPhysicalName() { |
| return physicalName; |
| } |
| |
| public int getPartitionCount() { |
| return partitionCount; |
| } |
| |
| public Map<String, String> getConfig() { |
| return config; |
| } |
| |
| public String get(String propertyName) { |
| return config.get(propertyName); |
| } |
| |
| public String getOrDefault(String propertyName, String defaultValue) { |
| return config.getOrDefault(propertyName, defaultValue); |
| } |
| |
| public SystemStream toSystemStream() { |
| return new SystemStream(systemName, physicalName); |
| } |
| |
| public boolean isChangeLogStream() { |
| return id.equals(CHANGELOG_STREAM_ID); |
| } |
| |
| public boolean isCoordinatorStream() { |
| return id.equals(COORDINATOR_STREAM_ID); |
| } |
| |
| public boolean isCheckpointStream() { |
| return id.equals(CHECKPOINT_STREAM_ID); |
| } |
| |
| private void validateLogicalIdentifier(String identifierName, String identifierValue) { |
| if (identifierValue == null || !identifierValue.matches("[A-Za-z0-9_-]+")) { |
| throw new IllegalArgumentException(String.format("Identifier '%s' is '%s'. It must match the expression [A-Za-z0-9_-]+", identifierName, identifierValue)); |
| } |
| } |
| |
| @Override |
| public boolean equals(Object o) { |
| if (this == o) return true; |
| if (o == null || !getClass().equals(o.getClass())) return false; |
| |
| StreamSpec that = (StreamSpec) o; |
| |
| return id.equals(that.id); |
| } |
| |
| @Override |
| public int hashCode() { |
| return id.hashCode(); |
| } |
| |
| public static StreamSpec createChangeLogStreamSpec(String physicalName, String systemName, int partitionCount) { |
| return new StreamSpec(CHANGELOG_STREAM_ID, physicalName, systemName, partitionCount); |
| } |
| |
| public static StreamSpec createCoordinatorStreamSpec(String physicalName, String systemName) { |
| return new StreamSpec(COORDINATOR_STREAM_ID, physicalName, systemName, 1); |
| } |
| |
| public static StreamSpec createCheckpointStreamSpec(String physicalName, String systemName) { |
| return new StreamSpec(CHECKPOINT_STREAM_ID, physicalName, systemName, 1); |
| } |
| |
| public static StreamSpec createStreamAppenderStreamSpec(String physicalName, String systemName, int partitionCount) { |
| return new StreamSpec(STREAM_APPENDER_ID, physicalName, systemName, partitionCount); |
| } |
| |
| @Override |
| public String toString() { |
| return String.format("StreamSpec: id=%s, systemName=%s, pName=%s, partCount=%d, config=%s.", id, systemName, physicalName, partitionCount, Joiner.on(",").withKeyValueSeparator("=").join(config)); |
| } |
| } |