// blob: dbc55c1e42a0c16c027b9e5b9a2a2743089b98b3 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.operators;
import com.google.common.annotations.VisibleForTesting;
import java.time.Duration;
import java.util.Collection;
import org.apache.samza.SamzaException;
import org.apache.samza.application.descriptors.StreamApplicationDescriptorImpl;
import org.apache.samza.operators.functions.AsyncFlatMapFunction;
import org.apache.samza.operators.functions.FilterFunction;
import org.apache.samza.operators.functions.FlatMapFunction;
import org.apache.samza.operators.functions.JoinFunction;
import org.apache.samza.operators.functions.MapFunction;
import org.apache.samza.operators.functions.SinkFunction;
import org.apache.samza.operators.functions.StreamTableJoinFunction;
import org.apache.samza.operators.spec.AsyncFlatMapOperatorSpec;
import org.apache.samza.operators.spec.BroadcastOperatorSpec;
import org.apache.samza.operators.spec.JoinOperatorSpec;
import org.apache.samza.operators.spec.OperatorSpec;
import org.apache.samza.operators.spec.OperatorSpec.OpCode;
import org.apache.samza.operators.spec.OperatorSpecs;
import org.apache.samza.operators.spec.OutputOperatorSpec;
import org.apache.samza.operators.spec.OutputStreamImpl;
import org.apache.samza.operators.spec.PartitionByOperatorSpec;
import org.apache.samza.operators.spec.SendToTableOperatorSpec;
import org.apache.samza.operators.spec.SinkOperatorSpec;
import org.apache.samza.operators.spec.StreamOperatorSpec;
import org.apache.samza.operators.spec.StreamTableJoinOperatorSpec;
import org.apache.samza.operators.stream.IntermediateMessageStreamImpl;
import org.apache.samza.operators.windows.Window;
import org.apache.samza.operators.windows.WindowPane;
import org.apache.samza.operators.windows.internal.WindowInternal;
import org.apache.samza.serializers.KVSerde;
import org.apache.samza.serializers.Serde;
import org.apache.samza.table.Table;
/**
 * {@link MessageStream} implementation through which users describe their logical DAG.
 * An instance can be obtained by calling {@link StreamApplicationDescriptorImpl#getInputStream}.
 * <p>
 * Every {@link MessageStreamImpl} wraps exactly one {@link OperatorSpec} of the DAG. Chaining an operator on
 * the stream registers a new {@link OperatorSpec} as a successor of the wrapped one, so this class offers an
 * "edge-centric" (streams) view over the "node-centric" (specs) logical DAG.
 *
 * @param <M> type of messages in this {@link MessageStream}
 */
public class MessageStreamImpl<M> implements MessageStream<M> {

  /**
   * Application descriptor this stream belongs to; it hands out operator ids and intermediate streams.
   */
  private final StreamApplicationDescriptorImpl streamAppDesc;

  /**
   * The {@link OperatorSpec} wrapped by this stream; downstream specs are registered on it.
   * NOTE(review): declared as a raw type, presumably so that specs whose declared input type is not
   * {@code M} (see the cast in the stream-stream join) can be registered -- confirm before parameterizing.
   */
  private final OperatorSpec operatorSpec;

  /**
   * Creates a stream over the given operator spec.
   *
   * @param streamAppDesc the application descriptor this stream belongs to
   * @param operatorSpec the operator spec wrapped by this stream
   */
  public MessageStreamImpl(StreamApplicationDescriptorImpl streamAppDesc, OperatorSpec<?, M> operatorSpec) {
    this.streamAppDesc = streamAppDesc;
    this.operatorSpec = operatorSpec;
  }

  /**
   * Registers a map operator spec after this stream's spec and returns a stream wrapping it.
   */
  @Override
  public <TM> MessageStream<TM> map(MapFunction<? super M, ? extends TM> mapFn) {
    final String mapOpId = this.streamAppDesc.getNextOpId(OpCode.MAP);
    final StreamOperatorSpec<M, TM> mapSpec = OperatorSpecs.createMapOperatorSpec(mapFn, mapOpId);
    return registerAndWrap(mapSpec);
  }

  /**
   * Registers a filter operator spec after this stream's spec and returns a stream wrapping it.
   */
  @Override
  public MessageStream<M> filter(FilterFunction<? super M> filterFn) {
    final String filterOpId = this.streamAppDesc.getNextOpId(OpCode.FILTER);
    final StreamOperatorSpec<M, M> filterSpec = OperatorSpecs.createFilterOperatorSpec(filterFn, filterOpId);
    return registerAndWrap(filterSpec);
  }

  /**
   * Registers a flat-map operator spec after this stream's spec and returns a stream wrapping it.
   */
  @Override
  public <TM> MessageStream<TM> flatMap(FlatMapFunction<? super M, ? extends TM> flatMapFn) {
    final String flatMapOpId = this.streamAppDesc.getNextOpId(OpCode.FLAT_MAP);
    final StreamOperatorSpec<M, TM> flatMapSpec = OperatorSpecs.createFlatMapOperatorSpec(flatMapFn, flatMapOpId);
    return registerAndWrap(flatMapSpec);
  }

  /**
   * Registers an async flat-map operator spec after this stream's spec and returns a stream wrapping it.
   */
  @Override
  public <OM> MessageStream<OM> flatMapAsync(AsyncFlatMapFunction<? super M, ? extends OM> flatMapFn) {
    final String asyncOpId = this.streamAppDesc.getNextOpId(OpCode.ASYNC_FLAT_MAP);
    final AsyncFlatMapOperatorSpec<M, OM> asyncSpec = OperatorSpecs.createAsyncOperatorSpec(flatMapFn, asyncOpId);
    return registerAndWrap(asyncSpec);
  }

  /**
   * Registers a sink operator spec after this stream's spec. No stream is returned.
   */
  @Override
  public void sink(SinkFunction<? super M> sinkFn) {
    final String sinkOpId = this.streamAppDesc.getNextOpId(OpCode.SINK);
    final SinkOperatorSpec<M> sinkSpec = OperatorSpecs.createSinkOperatorSpec(sinkFn, sinkOpId);
    this.operatorSpec.registerNextOperatorSpec(sinkSpec);
  }

  /**
   * Registers a send-to operator spec targeting the given output stream and returns a stream wrapping it.
   * The output stream is cast to {@link OutputStreamImpl} after the operator id has been generated.
   */
  @Override
  public MessageStream<M> sendTo(OutputStream<M> outputStream) {
    final String sendToOpId = this.streamAppDesc.getNextOpId(OpCode.SEND_TO);
    final OutputStreamImpl<M> outputStreamImpl = (OutputStreamImpl<M>) outputStream;
    final OutputOperatorSpec<M> sendToSpec = OperatorSpecs.createSendToOperatorSpec(outputStreamImpl, sendToOpId);
    return registerAndWrap(sendToSpec);
  }

  /**
   * Registers a window operator spec after this stream's spec and returns a stream of its window panes.
   * The window is cast to {@link WindowInternal} after the operator id has been generated.
   */
  @Override
  public <K, WV> MessageStream<WindowPane<K, WV>> window(Window<M, K, WV> window, String userDefinedId) {
    final String windowOpId = this.streamAppDesc.getNextOpId(OpCode.WINDOW, userDefinedId);
    final WindowInternal<M, K, WV> windowInternal = (WindowInternal<M, K, WV>) window;
    final OperatorSpec<M, WindowPane<K, WV>> windowSpec =
        OperatorSpecs.createWindowOperatorSpec(windowInternal, windowOpId);
    return registerAndWrap(windowSpec);
  }

  /**
   * Joins this stream with another stream. The join operator spec is registered as a successor of both
   * this stream's spec and the other stream's spec; the returned stream wraps the join spec.
   *
   * @throws SamzaException if {@code otherStream} equals this stream
   */
  @Override
  public <K, OM, JM> MessageStream<JM> join(MessageStream<OM> otherStream,
      JoinFunction<? extends K, ? super M, ? super OM, ? extends JM> joinFn,
      Serde<K> keySerde, Serde<M> messageSerde, Serde<OM> otherMessageSerde,
      Duration ttl, String userDefinedId) {
    if (otherStream.equals(this)) {
      throw new SamzaException("Cannot join a MessageStream with itself.");
    }

    final String joinOpId = this.streamAppDesc.getNextOpId(OpCode.JOIN, userDefinedId);
    final OperatorSpec<?, OM> rightSpec = ((MessageStreamImpl<OM>) otherStream).getOperatorSpec();
    final JoinOperatorSpec<K, M, OM, JM> joinSpec = OperatorSpecs.createJoinOperatorSpec(
        this.operatorSpec, rightSpec, (JoinFunction<K, M, OM, JM>) joinFn,
        keySerde, messageSerde, otherMessageSerde, ttl.toMillis(), joinOpId);

    // The join spec has two upstream specs: register it on this side first, then on the other side.
    this.operatorSpec.registerNextOperatorSpec(joinSpec);
    rightSpec.registerNextOperatorSpec((OperatorSpec<OM, ?>) joinSpec);
    return new MessageStreamImpl<>(this.streamAppDesc, joinSpec);
  }

  /**
   * Joins this stream with a table. A stream-table join operator spec keyed by the table's id is
   * registered after this stream's spec, and a stream wrapping it is returned.
   */
  @Override
  public <K, R extends KV, JM> MessageStream<JM> join(Table<R> table,
      StreamTableJoinFunction<? extends K, ? super M, ? super R, ? extends JM> joinFn, Object ... args) {
    final String joinOpId = this.streamAppDesc.getNextOpId(OpCode.JOIN);
    final StreamTableJoinOperatorSpec<K, M, R, JM> tableJoinSpec =
        OperatorSpecs.createStreamTableJoinOperatorSpec(
            ((TableImpl) table).getTableId(), (StreamTableJoinFunction<K, M, R, JM>) joinFn, joinOpId, args);
    return registerAndWrap(tableJoinSpec);
  }

  /**
   * Merges this stream with the given streams. When {@code otherStreams} is empty this stream itself is
   * returned and no operator id is generated; otherwise a single merge spec is registered after this
   * stream's spec and after the spec of every other stream, and a stream wrapping it is returned.
   */
  @Override
  public MessageStream<M> merge(Collection<? extends MessageStream<? extends M>> otherStreams) {
    if (otherStreams.isEmpty()) {
      return this;
    }

    final String mergeOpId = this.streamAppDesc.getNextOpId(OpCode.MERGE);
    final StreamOperatorSpec<M, M> mergeSpec = OperatorSpecs.createMergeOperatorSpec(mergeOpId);
    this.operatorSpec.registerNextOperatorSpec(mergeSpec);
    otherStreams.forEach(upstream -> {
      OperatorSpec<?, M> upstreamSpec = ((MessageStreamImpl<M>) upstream).getOperatorSpec();
      upstreamSpec.registerNextOperatorSpec(mergeSpec);
    });
    return new MessageStreamImpl<>(this.streamAppDesc, mergeSpec);
  }

  /**
   * Re-partitions this stream through an intermediate stream obtained from the application descriptor.
   * A partition-by operator spec writing to that intermediate stream's output is registered after this
   * stream's spec, and the intermediate stream is returned.
   *
   * @throws SamzaException if the intermediate stream obtained for the operator is not keyed
   */
  @Override
  public <K, V> MessageStream<KV<K, V>> partitionBy(MapFunction<? super M, ? extends K> keyExtractor,
      MapFunction<? super M, ? extends V> valueExtractor, KVSerde<K, V> serde, String userDefinedId) {
    final String partitionOpId = this.streamAppDesc.getNextOpId(OpCode.PARTITION_BY, userDefinedId);
    final IntermediateMessageStreamImpl<KV<K, V>> repartitioned =
        this.streamAppDesc.getIntermediateStream(partitionOpId, serde, false);

    boolean keyed = repartitioned.isKeyed();
    if (!keyed) {
      // only reachable through the partitionBy variant that relies on the default serde
      throw new SamzaException("partitionBy can not be used with a default serde that is not a KVSerde.");
    }

    final PartitionByOperatorSpec<M, K, V> partitionSpec = OperatorSpecs.createPartitionByOperatorSpec(
        repartitioned.getOutputStream(), keyExtractor, valueExtractor, partitionOpId);
    this.operatorSpec.registerNextOperatorSpec(partitionSpec);
    return repartitioned;
  }

  /**
   * Registers a send-to-table operator spec keyed by the table's id after this stream's spec and returns
   * a stream wrapping it.
   */
  @Override
  public <K, V> MessageStream<KV<K, V>> sendTo(Table<KV<K, V>> table, Object ... args) {
    final String sendToOpId = this.streamAppDesc.getNextOpId(OpCode.SEND_TO);
    final SendToTableOperatorSpec<K, V> sendToTableSpec =
        OperatorSpecs.createSendToTableOperatorSpec(((TableImpl) table).getTableId(), sendToOpId, args);
    return registerAndWrap(sendToTableSpec);
  }

  /**
   * Broadcasts this stream through an intermediate stream obtained from the application descriptor.
   * A broadcast operator spec writing to that intermediate stream's output is registered after this
   * stream's spec, and the intermediate stream is returned.
   */
  @Override
  public MessageStream<M> broadcast(Serde<M> serde, String userDefinedId) {
    final String broadcastOpId = this.streamAppDesc.getNextOpId(OpCode.BROADCAST, userDefinedId);
    final IntermediateMessageStreamImpl<M> broadcastStream =
        this.streamAppDesc.getIntermediateStream(broadcastOpId, serde, true);
    final BroadcastOperatorSpec<M> broadcastSpec =
        OperatorSpecs.createBroadCastOperatorSpec(broadcastStream.getOutputStream(), broadcastOpId);
    this.operatorSpec.registerNextOperatorSpec(broadcastSpec);
    return broadcastStream;
  }

  /**
   * Returns the {@link OperatorSpec} wrapped by this {@link MessageStreamImpl}.
   *
   * @return the {@link OperatorSpec} associated with this {@link MessageStreamImpl}
   */
  @VisibleForTesting
  public OperatorSpec<?, M> getOperatorSpec() {
    return this.operatorSpec;
  }

  /**
   * Registers {@code nextSpec} as a successor of this stream's spec and wraps it in a new stream that
   * shares this stream's application descriptor.
   *
   * @param nextSpec the downstream operator spec to attach
   * @param <OM> type of messages produced by {@code nextSpec}
   * @return a new {@link MessageStreamImpl} over {@code nextSpec}
   */
  private <OM> MessageStream<OM> registerAndWrap(OperatorSpec<?, OM> nextSpec) {
    this.operatorSpec.registerNextOperatorSpec(nextSpec);
    return new MessageStreamImpl<>(this.streamAppDesc, nextSpec);
  }
}