blob: c12237148402408bc84807ba03e70d28c65a59cc [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.system;
import com.google.common.base.Joiner;
import java.io.Serializable;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
/**
* StreamSpec is a blueprint for creating, validating, or simply describing a stream in the runtime environment.
*
* It has specific attributes for common behaviors that Samza uses.
*
* It also includes a map of configurations which may be system-specific.
*
* It is immutable by design.
*/
public class StreamSpec implements Serializable {

  private static final int DEFAULT_PARTITION_COUNT = 1;

  // Internal changelog stream id. It is used for creating changelog StreamSpec.
  private static final String CHANGELOG_STREAM_ID = "samza-internal-changelog-stream-id";

  // Internal coordinator stream id. It is used for creating coordinator StreamSpec.
  private static final String COORDINATOR_STREAM_ID = "samza-internal-coordinator-stream-id";

  // Internal checkpoint stream id. It is used for creating checkpoint StreamSpec.
  private static final String CHECKPOINT_STREAM_ID = "samza-internal-checkpoint-stream-id";

  // Internal stream appender stream id. It is used for creating stream appender StreamSpec.
  private static final String STREAM_APPENDER_ID = "samza-internal-stream-appender-stream-id";

  /**
   * Unique identifier for the stream in a Samza application.
   * This identifier is used as a key for stream properties in the
   * job config and to distinguish between streams in a graph.
   */
  private final String id;

  /**
   * The System name on which this stream will exist. Corresponds to a named implementation of the
   * Samza System abstraction.
   */
  private final String systemName;

  /**
   * The physical identifier for the stream. This is the identifier that will be used in remote
   * systems to identify the stream. In Kafka this would be the topic name whereas in HDFS it
   * might be a file URN.
   */
  private final String physicalName;

  /**
   * The number of partitions for the stream.
   */
  private final int partitionCount;

  /**
   * A set of all system-specific configurations for the stream.
   * Always non-null and unmodifiable (see the primary constructor).
   */
  private final Map<String, String> config;

  /**
   * Creates a StreamSpec with the default partition count ({@code 1}) and an empty config.
   *
   * @param id           The application-unique logical identifier for the stream. It is used to distinguish between
   *                     streams in a Samza application so it must be unique in the context of one deployable unit.
   *                     It does not need to be globally unique or unique with respect to a host.
   *
   * @param physicalName The physical identifier for the stream. This is the identifier that will be used in remote
   *                     systems to identify the stream. In Kafka this would be the topic name whereas in HDFS it
   *                     might be a file URN.
   *
   * @param systemName   The System name on which this stream will exist. Corresponds to a named implementation of the
   *                     Samza System abstraction. See {@link SystemFactory}
   *
   * @throws IllegalArgumentException if {@code id} or {@code systemName} is null or does not match
   *                                  {@code [A-Za-z0-9_-]+}
   */
  public StreamSpec(String id, String physicalName, String systemName) {
    this(id, physicalName, systemName, DEFAULT_PARTITION_COUNT, Collections.emptyMap());
  }

  /**
   * Creates a StreamSpec with the given partition count and an empty config.
   *
   * @param id             The application-unique logical identifier for the stream. It is used to distinguish between
   *                       streams in a Samza application so it must be unique in the context of one deployable unit.
   *                       It does not need to be globally unique or unique with respect to a host.
   *
   * @param physicalName   The physical identifier for the stream. This is the identifier that will be used in remote
   *                       systems to identify the stream. In Kafka this would be the topic name whereas in HDFS it
   *                       might be a file URN.
   *
   * @param systemName     The System name on which this stream will exist. Corresponds to a named implementation of the
   *                       Samza System abstraction. See {@link SystemFactory}
   *
   * @param partitionCount The number of partitions for the stream. A value of {@code 1} indicates unpartitioned.
   *
   * @throws IllegalArgumentException if {@code id} or {@code systemName} is null or does not match
   *                                  {@code [A-Za-z0-9_-]+}, or if {@code partitionCount} is negative
   */
  public StreamSpec(String id, String physicalName, String systemName, int partitionCount) {
    this(id, physicalName, systemName, partitionCount, Collections.emptyMap());
  }

  /**
   * Creates a StreamSpec with the default partition count ({@code 1}) and the given config.
   *
   * @param id           The application-unique logical identifier for the stream. It is used to distinguish between
   *                     streams in a Samza application so it must be unique in the context of one deployable unit.
   *                     It does not need to be globally unique or unique with respect to a host.
   *
   * @param physicalName The physical identifier for the stream. This is the identifier that will be used in remote
   *                     systems to identify the stream. In Kafka this would be the topic name whereas in HDFS it
   *                     might be a file URN.
   *
   * @param systemName   The System name on which this stream will exist. Corresponds to a named implementation of the
   *                     Samza System abstraction. See {@link SystemFactory}
   *
   * @param config       A map of properties for the stream. These may be System-specific.
   *                     A {@code null} map is treated as empty.
   *
   * @throws IllegalArgumentException if {@code id} or {@code systemName} is null or does not match
   *                                  {@code [A-Za-z0-9_-]+}
   */
  public StreamSpec(String id, String physicalName, String systemName, Map<String, String> config) {
    this(id, physicalName, systemName, DEFAULT_PARTITION_COUNT, config);
  }

  /**
   * Primary constructor; all other constructors and factories delegate here.
   *
   * @param id             The application-unique logical identifier for the stream. It is used to distinguish between
   *                       streams in a Samza application so it must be unique in the context of one deployable unit.
   *                       It does not need to be globally unique or unique with respect to a host.
   *
   * @param physicalName   The physical identifier for the stream. This is the identifier that will be used in remote
   *                       systems to identify the stream. In Kafka this would be the topic name whereas in HDFS it
   *                       might be a file URN. This value is stored as given and is not validated here.
   *
   * @param systemName     The System name on which this stream will exist. Corresponds to a named implementation of the
   *                       Samza System abstraction. See {@link SystemFactory}
   *
   * @param partitionCount The number of partitions for the stream. A value of {@code 1} indicates unpartitioned.
   *                       {@code 0} is accepted; negative values are rejected.
   *
   * @param config         A map of properties for the stream. These may be System-specific.
   *                       The map is copied, so later changes to the argument do not affect this StreamSpec.
   *                       A {@code null} map is treated as empty.
   *
   * @throws IllegalArgumentException if {@code id} or {@code systemName} is null or does not match
   *                                  {@code [A-Za-z0-9_-]+}, or if {@code partitionCount} is negative
   */
  public StreamSpec(String id, String physicalName, String systemName, int partitionCount, Map<String, String> config) {
    validateLogicalIdentifier("streamId", id);
    validateLogicalIdentifier("systemName", systemName);

    // partition count being 0 is a valid use case in Hadoop when the output stream is an empty folder
    if (partitionCount < 0) {
      throw new IllegalArgumentException("Parameter 'partitionCount' must be >= 0");
    }

    this.id = id;
    this.systemName = systemName;
    this.physicalName = physicalName;
    this.partitionCount = partitionCount;

    // Defensive copy wrapped as unmodifiable so the instance stays immutable
    // regardless of what the caller does with the original map.
    if (config != null) {
      this.config = Collections.unmodifiableMap(new HashMap<>(config));
    } else {
      this.config = Collections.emptyMap();
    }
  }

  /**
   * Copies this StreamSpec, but applies a new partitionCount.
   *
   * This method is not static s.t. subclasses can override it.
   *
   * @param partitionCount The partitionCount for the returned StreamSpec.
   * @return A copy of this StreamSpec with the specified partitionCount.
   */
  public StreamSpec copyWithPartitionCount(int partitionCount) {
    return new StreamSpec(id, physicalName, systemName, partitionCount, config);
  }

  /**
   * Copies this StreamSpec, but applies a new physicalName.
   *
   * @param physicalName The physicalName for the returned StreamSpec.
   * @return A copy of this StreamSpec with the specified physicalName.
   */
  public StreamSpec copyWithPhysicalName(String physicalName) {
    return new StreamSpec(id, physicalName, systemName, partitionCount, config);
  }

  /**
   * @return the logical identifier of this stream.
   */
  public String getId() {
    return id;
  }

  /**
   * @return the name of the system on which this stream exists.
   */
  public String getSystemName() {
    return systemName;
  }

  /**
   * @return the physical identifier of this stream.
   */
  public String getPhysicalName() {
    return physicalName;
  }

  /**
   * @return the number of partitions of this stream.
   */
  public int getPartitionCount() {
    return partitionCount;
  }

  /**
   * @return the stream's configuration as an unmodifiable, never-null map.
   */
  public Map<String, String> getConfig() {
    return config;
  }

  /**
   * Looks up a single property in the stream's config.
   *
   * @param propertyName the config key to look up.
   * @return the value mapped to {@code propertyName}, or {@code null} if there is no mapping.
   */
  public String get(String propertyName) {
    return config.get(propertyName);
  }

  /**
   * Looks up a single property in the stream's config, falling back to a default.
   *
   * @param propertyName the config key to look up.
   * @param defaultValue the value to return when the config has no mapping for {@code propertyName}.
   * @return the mapped value, or {@code defaultValue} if there is no mapping.
   */
  public String getOrDefault(String propertyName, String defaultValue) {
    return config.getOrDefault(propertyName, defaultValue);
  }

  /**
   * @return a new {@link SystemStream} built from this spec's system name and physical name.
   */
  public SystemStream toSystemStream() {
    return new SystemStream(systemName, physicalName);
  }

  /**
   * @return {@code true} if this spec's id is the internal changelog stream id.
   */
  public boolean isChangeLogStream() {
    return id.equals(CHANGELOG_STREAM_ID);
  }

  /**
   * @return {@code true} if this spec's id is the internal coordinator stream id.
   */
  public boolean isCoordinatorStream() {
    return id.equals(COORDINATOR_STREAM_ID);
  }

  /**
   * @return {@code true} if this spec's id is the internal checkpoint stream id.
   */
  public boolean isCheckpointStream() {
    return id.equals(CHECKPOINT_STREAM_ID);
  }

  /**
   * Rejects identifiers that are null or contain anything other than letters, digits, '_' or '-'.
   *
   * @param identifierName  the name of the parameter being checked; used only in the error message.
   * @param identifierValue the value to check.
   * @throws IllegalArgumentException if {@code identifierValue} is null or does not match {@code [A-Za-z0-9_-]+}
   */
  private static void validateLogicalIdentifier(String identifierName, String identifierValue) {
    if (identifierValue == null || !identifierValue.matches("[A-Za-z0-9_-]+")) {
      throw new IllegalArgumentException(String.format("Identifier '%s' is '%s'. It must match the expression [A-Za-z0-9_-]+", identifierName, identifierValue));
    }
  }

  /**
   * Two StreamSpecs are equal when they have the same runtime class and the same id.
   * System name, physical name, partition count and config are not compared.
   */
  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (o == null || !getClass().equals(o.getClass())) {
      return false;
    }

    StreamSpec that = (StreamSpec) o;
    return id.equals(that.id);
  }

  /**
   * Hash code derived from the id only, consistent with {@link #equals(Object)}.
   */
  @Override
  public int hashCode() {
    return id.hashCode();
  }

  /**
   * Creates a StreamSpec that uses the internal changelog stream id.
   *
   * @param physicalName   the physical identifier of the changelog stream.
   * @param systemName     the system on which the changelog stream exists.
   * @param partitionCount the number of partitions of the changelog stream.
   * @return the changelog StreamSpec.
   */
  public static StreamSpec createChangeLogStreamSpec(String physicalName, String systemName, int partitionCount) {
    return new StreamSpec(CHANGELOG_STREAM_ID, physicalName, systemName, partitionCount);
  }

  /**
   * Creates a single-partition StreamSpec that uses the internal coordinator stream id.
   *
   * @param physicalName the physical identifier of the coordinator stream.
   * @param systemName   the system on which the coordinator stream exists.
   * @return the coordinator StreamSpec.
   */
  public static StreamSpec createCoordinatorStreamSpec(String physicalName, String systemName) {
    return new StreamSpec(COORDINATOR_STREAM_ID, physicalName, systemName, 1);
  }

  /**
   * Creates a single-partition StreamSpec that uses the internal checkpoint stream id.
   *
   * @param physicalName the physical identifier of the checkpoint stream.
   * @param systemName   the system on which the checkpoint stream exists.
   * @return the checkpoint StreamSpec.
   */
  public static StreamSpec createCheckpointStreamSpec(String physicalName, String systemName) {
    return new StreamSpec(CHECKPOINT_STREAM_ID, physicalName, systemName, 1);
  }

  /**
   * Creates a StreamSpec that uses the internal stream appender stream id.
   *
   * @param physicalName   the physical identifier of the stream appender stream.
   * @param systemName     the system on which the stream appender stream exists.
   * @param partitionCount the number of partitions of the stream appender stream.
   * @return the stream appender StreamSpec.
   */
  public static StreamSpec createStreamAppenderStreamSpec(String physicalName, String systemName, int partitionCount) {
    return new StreamSpec(STREAM_APPENDER_ID, physicalName, systemName, partitionCount);
  }

  /**
   * Renders all fields, with the config formatted as comma-separated {@code key=value} pairs.
   */
  @Override
  public String toString() {
    // The constructor copies the caller's map into a HashMap, which permits null keys and values.
    // A plain MapJoiner throws NullPointerException on those, so substitute the text "null" instead;
    // output for maps without nulls is unchanged.
    String renderedConfig = Joiner.on(",").withKeyValueSeparator("=").useForNull("null").join(config);
    return String.format("StreamSpec: id=%s, systemName=%s, pName=%s, partCount=%d, config=%s.", id, systemName, physicalName, partitionCount, renderedConfig);
  }
}