blob: 956d34e14e069b986212640212fe5ad3270f1f23 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.application.descriptors;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
import org.apache.samza.SamzaException;
import org.apache.samza.application.StreamApplication;
import org.apache.samza.config.ApplicationConfig;
import org.apache.samza.config.Config;
import org.apache.samza.system.descriptors.InputDescriptor;
import org.apache.samza.system.descriptors.OutputDescriptor;
import org.apache.samza.operators.KV;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.MessageStreamImpl;
import org.apache.samza.operators.OperatorSpecGraph;
import org.apache.samza.operators.OutputStream;
import org.apache.samza.table.descriptors.LocalTableDescriptor;
import org.apache.samza.table.descriptors.TableDescriptor;
import org.apache.samza.operators.TableImpl;
import org.apache.samza.system.descriptors.SystemDescriptor;
import org.apache.samza.system.descriptors.InputTransformer;
import org.apache.samza.system.descriptors.StreamExpander;
import org.apache.samza.operators.spec.InputOperatorSpec;
import org.apache.samza.operators.spec.OperatorSpec.OpCode;
import org.apache.samza.operators.spec.OperatorSpecs;
import org.apache.samza.operators.spec.OutputStreamImpl;
import org.apache.samza.operators.stream.IntermediateMessageStreamImpl;
import org.apache.samza.serializers.KVSerde;
import org.apache.samza.serializers.Serde;
import org.apache.samza.table.Table;
/**
* This class defines:
* 1) an implementation of {@link StreamApplicationDescriptor} that provides APIs to access {@link MessageStream}, {@link OutputStream},
* and {@link Table} to create the DAG of transforms.
* 2) a builder that creates a serializable {@link OperatorSpecGraph} from user-defined DAG
*/
public class StreamApplicationDescriptorImpl extends ApplicationDescriptorImpl<StreamApplicationDescriptor>
implements StreamApplicationDescriptor {
// We use a LHMs for deterministic order in initializing and closing operators.
private final Set<String> intermediateBroadcastStreamIds = new HashSet<>();
private final Map<String, InputOperatorSpec> inputOperators = new LinkedHashMap<>();
private final Map<String, OutputStreamImpl> outputStreams = new LinkedHashMap<>();
private final Set<String> operatorIds = new HashSet<>();
/**
* The 0-based position of the next operator in the graph.
* Part of the unique ID for each OperatorSpec in the graph.
* Should only accessed and incremented via {@link #getNextOpId(OpCode, String)}.
*/
private int nextOpNum = 0;
public StreamApplicationDescriptorImpl(StreamApplication userApp, Config config) {
super(userApp, config);
userApp.describe(this);
}
@Override
public <M> MessageStream<M> getInputStream(InputDescriptor<M, ?> inputDescriptor) {
SystemDescriptor systemDescriptor = inputDescriptor.getSystemDescriptor();
Optional<StreamExpander> expander = systemDescriptor.getExpander();
if (expander.isPresent()) {
return expander.get().apply(this, inputDescriptor);
}
// TODO: SAMZA-1841: need to add to the broadcast streams if inputDescriptor is for a broadcast stream
addInputDescriptor(inputDescriptor);
String streamId = inputDescriptor.getStreamId();
Serde serde = inputDescriptor.getSerde();
KV<Serde, Serde> kvSerdes = getOrCreateStreamSerdes(streamId, serde);
boolean isKeyed = serde instanceof KVSerde;
InputTransformer transformer = inputDescriptor.getTransformer().orElse(null);
InputOperatorSpec inputOperatorSpec =
OperatorSpecs.createInputOperatorSpec(streamId, kvSerdes.getKey(), kvSerdes.getValue(),
transformer, isKeyed, this.getNextOpId(OpCode.INPUT, null));
inputOperators.put(streamId, inputOperatorSpec);
return new MessageStreamImpl(this, inputOperators.get(streamId));
}
@Override
public <M> OutputStream<M> getOutputStream(OutputDescriptor<M, ?> outputDescriptor) {
addOutputDescriptor(outputDescriptor);
String streamId = outputDescriptor.getStreamId();
Serde serde = outputDescriptor.getSerde();
KV<Serde, Serde> kvSerdes = getOrCreateStreamSerdes(streamId, serde);
boolean isKeyed = serde instanceof KVSerde;
outputStreams.put(streamId, new OutputStreamImpl<>(streamId, kvSerdes.getKey(), kvSerdes.getValue(), isKeyed));
return outputStreams.get(streamId);
}
@Override
public <K, V> Table<KV<K, V>> getTable(TableDescriptor<K, V, ?> tableDescriptor) {
addTableDescriptor(tableDescriptor);
if (tableDescriptor instanceof LocalTableDescriptor) {
LocalTableDescriptor localTableDescriptor = (LocalTableDescriptor) tableDescriptor;
getOrCreateTableSerdes(localTableDescriptor.getTableId(), localTableDescriptor.getSerde());
}
return new TableImpl(tableDescriptor);
}
@Override
public Set<String> getInputStreamIds() {
return Collections.unmodifiableSet(new HashSet<>(inputOperators.keySet()));
}
@Override
public Set<String> getOutputStreamIds() {
return Collections.unmodifiableSet(new HashSet<>(outputStreams.keySet()));
}
@Override
public Set<String> getIntermediateBroadcastStreamIds() {
return Collections.unmodifiableSet(intermediateBroadcastStreamIds);
}
public Map<String, InputOperatorSpec> getInputOperators() {
return Collections.unmodifiableMap(inputOperators);
}
public Map<String, OutputStreamImpl> getOutputStreams() {
return Collections.unmodifiableMap(outputStreams);
}
public OperatorSpecGraph getOperatorSpecGraph() {
return new OperatorSpecGraph(this);
}
/**
* Gets the unique ID for the next operator in the graph. The ID is of the following format:
* jobName-jobId-opCode-(userDefinedId|nextOpNum);
*
* @param opCode the {@link OpCode} of the next operator
* @param userDefinedId the optional user-provided name of the next operator or null
* @return the unique ID for the next operator in the graph
*/
public String getNextOpId(OpCode opCode, String userDefinedId) {
if (StringUtils.isNotBlank(userDefinedId) && !ID_PATTERN.matcher(userDefinedId).matches()) {
throw new SamzaException("Operator ID must not contain spaces or special characters: " + userDefinedId);
}
ApplicationConfig applicationConfig = new ApplicationConfig(getConfig());
String nextOpId = String.format("%s-%s-%s-%s",
applicationConfig.getAppName(),
applicationConfig.getAppId(),
opCode.name().toLowerCase(),
StringUtils.isNotBlank(userDefinedId) ? userDefinedId.trim() : String.valueOf(nextOpNum));
if (!operatorIds.add(nextOpId)) {
throw new SamzaException(
String.format("Found duplicate operator ID %s in the graph. Operator IDs must be unique.", nextOpId));
}
nextOpNum++;
return nextOpId;
}
/**
* Gets the unique ID for the next operator in the graph. The ID is of the following format:
* jobName-jobId-opCode-nextOpNum;
*
* @param opCode the {@link OpCode} of the next operator
* @return the unique ID for the next operator in the graph
*/
public String getNextOpId(OpCode opCode) {
return getNextOpId(opCode, null);
}
/**
* Internal helper for {@link MessageStreamImpl} to add an intermediate {@link MessageStream} to the graph.
* An intermediate {@link MessageStream} is both an output and an input stream.
*
* @param streamId the id of the stream to be created.
* @param serde the {@link Serde} to use for the message in the intermediate stream. If null, the default serde
* is used.
* @param isBroadcast whether the stream is a broadcast stream.
* @param <M> the type of messages in the intermediate {@link MessageStream}
* @return the intermediate {@link MessageStreamImpl}
*/
@VisibleForTesting
public <M> IntermediateMessageStreamImpl<M> getIntermediateStream(String streamId, Serde<M> serde, boolean isBroadcast) {
Preconditions.checkNotNull(serde, "serde must not be null for intermediate stream: " + streamId);
Preconditions.checkState(!inputOperators.containsKey(streamId) && !outputStreams.containsKey(streamId),
"getIntermediateStream must not be called multiple times with the same streamId: " + streamId);
if (isBroadcast) {
intermediateBroadcastStreamIds.add(streamId);
}
boolean isKeyed = serde instanceof KVSerde;
KV<Serde, Serde> kvSerdes = getOrCreateStreamSerdes(streamId, serde);
InputTransformer transformer = (InputTransformer) getDefaultSystemDescriptor()
.flatMap(SystemDescriptor::getTransformer).orElse(null);
InputOperatorSpec inputOperatorSpec =
OperatorSpecs.createInputOperatorSpec(streamId, kvSerdes.getKey(), kvSerdes.getValue(),
transformer, isKeyed, this.getNextOpId(OpCode.INPUT, null));
inputOperators.put(streamId, inputOperatorSpec);
outputStreams.put(streamId, new OutputStreamImpl(streamId, kvSerdes.getKey(), kvSerdes.getValue(), isKeyed));
return new IntermediateMessageStreamImpl<>(this, inputOperators.get(streamId), outputStreams.get(streamId));
}
}