blob: 8fd1b7a261ba8b8979c7d3c0f8d4b31ea6fdbb21 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.operators;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.CompletionStage;
import org.apache.samza.annotation.InterfaceStability;
import org.apache.samza.operators.functions.AsyncFlatMapFunction;
import org.apache.samza.operators.functions.FilterFunction;
import org.apache.samza.operators.functions.FlatMapFunction;
import org.apache.samza.operators.functions.JoinFunction;
import org.apache.samza.operators.functions.MapFunction;
import org.apache.samza.operators.functions.SinkFunction;
import org.apache.samza.operators.functions.StreamTableJoinFunction;
import org.apache.samza.operators.windows.Window;
import org.apache.samza.operators.windows.WindowPane;
import org.apache.samza.serializers.KVSerde;
import org.apache.samza.serializers.Serde;
import org.apache.samza.table.Table;
/**
 * A stream of messages that can be transformed into another {@link MessageStream}.
 * <p>
 * A {@link MessageStream} corresponding to an input stream can be obtained using
 * {@link org.apache.samza.application.descriptors.StreamApplicationDescriptor#getInputStream}.
 *
 * @param <M> the type of messages in this stream
 */
@InterfaceStability.Evolving
public interface MessageStream<M> {

  /**
   * Applies the provided 1:1 function to messages in this {@link MessageStream} and returns the
   * transformed {@link MessageStream}.
   *
   * @param mapFn the function to transform a message to another message
   * @param <OM> the type of messages in the transformed {@link MessageStream}
   * @return the transformed {@link MessageStream}
   */
  <OM> MessageStream<OM> map(MapFunction<? super M, ? extends OM> mapFn);

  /**
   * Applies the provided 1:n function to transform a message in this {@link MessageStream}
   * to n messages in the transformed {@link MessageStream}.
   *
   * @param flatMapFn the function to transform a message to zero or more messages
   * @param <OM> the type of messages in the transformed {@link MessageStream}
   * @return the transformed {@link MessageStream}
   */
  <OM> MessageStream<OM> flatMap(FlatMapFunction<? super M, ? extends OM> flatMapFn);

  /**
   * Applies the provided 1:n transformation asynchronously to this {@link MessageStream}. The asynchronous
   * transformation is specified through {@link AsyncFlatMapFunction}. The results are emitted to the downstream
   * operators upon the completion of the {@link CompletionStage} returned from the {@link AsyncFlatMapFunction}.
   * <p>
   * The operator can operate in two modes depending on <i>task.max.concurrency</i>.
   * <ul>
   *   <li>
   *     Serialized (task.max.concurrency=1) - In this mode, each invocation of the {@link AsyncFlatMapFunction}
   *     is guaranteed to happen-before the next invocation.
   *   </li>
   *   <li>
   *     Parallel (task.max.concurrency&gt;1) - In this mode, multiple invocations can happen in parallel without
   *     a happens-before guarantee, and the {@link AsyncFlatMapFunction} is required to synchronize any shared
   *     state. The operator doesn't provide any ordering guarantees, i.e., the results corresponding to each
   *     invocation of this operator might not be emitted in the same order as the invocations.
   *     By extension, the operator chain that follows it also doesn't have any ordering guarantees.
   *   </li>
   * </ul>
   *
   * @param asyncFlatMapFn the async function to transform a message to zero or more messages
   * @param <OM> the type of messages in the transformed {@link MessageStream}
   * @return the transformed {@link MessageStream}
   */
  <OM> MessageStream<OM> flatMapAsync(AsyncFlatMapFunction<? super M, ? extends OM> asyncFlatMapFn);

  /**
   * Applies the provided function to messages in this {@link MessageStream} and returns the
   * filtered {@link MessageStream}.
   * <p>
   * The {@link FilterFunction} is a predicate which determines whether a message in this {@link MessageStream}
   * should be retained in the filtered {@link MessageStream}.
   *
   * @param filterFn the predicate to filter messages from this {@link MessageStream}
   * @return the filtered {@link MessageStream}
   */
  MessageStream<M> filter(FilterFunction<? super M> filterFn);

  /**
   * Allows sending messages in this {@link MessageStream} to an output system using the provided
   * {@link SinkFunction}.
   * <p>
   * Offers more control over processing and sending messages than {@link #sendTo(OutputStream)} since
   * the {@link SinkFunction} has access to the {@link org.apache.samza.task.MessageCollector} and
   * {@link org.apache.samza.task.TaskCoordinator}.
   * <p>
   * This can also be used to send output to a system (e.g. a database) that doesn't have a corresponding
   * Samza SystemProducer implementation.
   *
   * @param sinkFn the function to send messages in this stream to an external system
   */
  void sink(SinkFunction<? super M> sinkFn);

  /**
   * Allows sending messages in this {@link MessageStream} to an {@link OutputStream} and then propagates this
   * {@link MessageStream} to the next chained operator.
   * <p>
   * When sending messages to an {@code OutputStream<KV<K, V>>}, messages are partitioned using their serialized key.
   * When sending messages to any other {@code OutputStream<M>}, messages are partitioned using a null partition key.
   * <p>
   * Note: The message will be written but not flushed to the underlying output system before it is propagated to
   * the chained operators. Messages retain the original partitioning scheme when propagated to the next operator.
   *
   * @param outputStream the output stream to send messages to
   * @return this {@link MessageStream}
   */
  MessageStream<M> sendTo(OutputStream<M> outputStream);

  /**
   * Groups the messages in this {@link MessageStream} according to the provided {@link Window} semantics
   * (e.g. tumbling, sliding or session windows) and returns the transformed {@link MessageStream} of
   * {@link WindowPane}s.
   * <p>
   * Use the {@link org.apache.samza.operators.windows.Windows} helper methods to create the appropriate windows.
   * <p>
   * The {@code id} must be unique for each operator in this application. It is used as part of the unique ID
   * for any state stores and streams created by this operator (the full ID also contains the job name, job id and
   * operator type). If the application logic is changed, this ID must be reused in the new operator to retain
   * state from the previous version, and changed for the new operator to discard the state from the previous version.
   *
   * @param window the window to group and process messages from this {@link MessageStream}
   * @param id the unique id of this operator in this application
   * @param <K> the type of key in the message in this {@link MessageStream}. If a key is specified,
   *            panes are emitted per-key.
   * @param <WV> the type of value in the {@link WindowPane} in the transformed {@link MessageStream}
   * @return the windowed {@link MessageStream}
   */
  <K, WV> MessageStream<WindowPane<K, WV>> window(Window<M, K, WV> window, String id);

  /**
   * Joins this {@link MessageStream} with another {@link MessageStream} using the provided
   * pairwise {@link JoinFunction}.
   * <p>
   * Messages in each stream are retained for the provided {@code ttl} and join results are
   * emitted as matches are found.
   * <p>
   * Both inputs being joined must have the same number of partitions, and should be partitioned by the join key.
   * <p>
   * The {@code id} must be unique for each operator in this application. It is used as part of the unique ID
   * for any state stores and streams created by this operator (the full ID also contains the job name, job id and
   * operator type). If the application logic is changed, this ID must be reused in the new operator to retain
   * state from the previous version, and changed for the new operator to discard the state from the previous version.
   *
   * @param otherStream the other {@link MessageStream} to be joined with
   * @param joinFn the function to join messages from this and the other {@link MessageStream}
   * @param keySerde the serde for the join key
   * @param messageSerde the serde for messages in this stream
   * @param otherMessageSerde the serde for messages in the other stream
   * @param ttl the ttl for messages in each stream
   * @param id the unique id of this operator in this application
   * @param <K> the type of join key
   * @param <OM> the type of messages in the other stream
   * @param <JM> the type of messages resulting from the {@code joinFn}
   * @return the joined {@link MessageStream}
   */
  <K, OM, JM> MessageStream<JM> join(MessageStream<OM> otherStream,
      JoinFunction<? extends K, ? super M, ? super OM, ? extends JM> joinFn,
      Serde<K> keySerde, Serde<M> messageSerde, Serde<OM> otherMessageSerde,
      Duration ttl, String id);

  /**
   * Joins this {@link MessageStream} with another {@link Table} using the provided
   * pairwise {@link StreamTableJoinFunction}.
   * <p>
   * The type of input message is expected to be {@link KV}.
   * <p>
   * Records are looked up from the joined table using the join key, the join function
   * is applied and join results are emitted as matches are found.
   * <p>
   * The join function allows implementation of both inner and left outer join. A null will be
   * passed to the join function if no record matching the join key is found in the table.
   * The join function can choose to return an instance of JM (left outer join) or null
   * (inner join); if null is returned, it won't be processed further.
   * <p>
   * Both the input stream and table being joined must have the same number of partitions,
   * and should be partitioned by the same join key.
   *
   * @param table the table being joined
   * @param joinFn the join function
   * @param args additional arguments passed to the table
   * @param <K> the type of join key
   * @param <R> the type of table record
   * @param <JM> the type of messages resulting from the {@code joinFn}
   * @return the joined {@link MessageStream}
   */
  <K, R extends KV, JM> MessageStream<JM> join(Table<R> table,
      StreamTableJoinFunction<? extends K, ? super M, ? super R, ? extends JM> joinFn, Object ... args);

  /**
   * Merges all {@code otherStreams} with this {@link MessageStream}.
   * <p>
   * The merged stream contains messages from all streams in the order they arrive.
   *
   * @param otherStreams other {@link MessageStream}s to be merged with this {@link MessageStream}
   * @return the merged {@link MessageStream}
   */
  MessageStream<M> merge(Collection<? extends MessageStream<? extends M>> otherStreams);

  /**
   * Merges all {@code streams}.
   * <p>
   * The merged {@link MessageStream} contains messages from all {@code streams} in the order they arrive.
   * <p>
   * The first stream in the iteration order of {@code streams} is used as the receiver of
   * {@link #merge(Collection)}, and the remaining streams are passed to it. The provided collection
   * itself is not modified.
   *
   * @param streams {@link MessageStream}s to be merged
   * @param <T> the type of messages in each of the streams
   * @return the merged {@link MessageStream}
   * @throws IllegalArgumentException if {@code streams} is empty
   * @throws NullPointerException if {@code streams} is null
   */
  static <T> MessageStream<T> mergeAll(Collection<? extends MessageStream<? extends T>> streams) {
    if (streams.isEmpty()) {
      throw new IllegalArgumentException("No streams to merge.");
    }
    // Work on a copy so that removing the first element never mutates the caller's collection.
    ArrayList<MessageStream<? extends T>> remainingStreams = new ArrayList<>(streams);
    // Only the receiver of merge() has to be viewed as a MessageStream<T>; merge() already accepts the
    // remaining streams covariantly. The cast is unchecked (generics are erased), so it is confined to
    // this single element, which is used here solely to invoke merge().
    @SuppressWarnings("unchecked")
    MessageStream<T> firstStream = (MessageStream<T>) remainingStreams.remove(0);
    return firstStream.merge(remainingStreams);
  }

  /**
   * Re-partitions this {@link MessageStream} using keys from the {@code keyExtractor} by creating a new
   * intermediate stream on the default system provided via
   * {@link org.apache.samza.application.descriptors.StreamApplicationDescriptor#withDefaultSystem}.
   * This intermediate stream is both an output and input to the job.
   * <p>
   * Uses the provided {@link KVSerde} for serialization of keys and values.
   * <p>
   * The number of partitions for this intermediate stream is determined as follows:
   * If the stream is an eventual input to a {@link #join}, and the number of partitions for the other stream is
   * known, then the number of partitions for this stream is set to the number of partitions in the other input
   * stream.
   * Else, the number of partitions is set to the value of the {@code job.intermediate.stream.partitions}
   * configuration, if present.
   * Else, the number of partitions is set to the max of the number of partitions for all input and output streams
   * (excluding intermediate streams).
   * <p>
   * The {@code id} must be unique for each operator in this application. It is used as part of the unique ID
   * for any state stores and streams created by this operator (the full ID also contains the job name, job id and
   * operator type). If the application logic is changed, this ID must be reused in the new operator to retain
   * state from the previous version, and changed for the new operator to discard the state from the previous version.
   * <p>
   * Unlike {@link #sendTo}, messages with a null key are all sent to partition 0.
   *
   * @param keyExtractor the {@link MapFunction} to extract the message and partition key from the input message.
   *                     Messages with a null key are all sent to partition 0.
   * @param valueExtractor the {@link MapFunction} to extract the value from the input message
   * @param serde the {@link KVSerde} to use for (de)serializing the key and value
   * @param id the unique id of this operator in this application
   * @param <K> the type of output key
   * @param <V> the type of output value
   * @return the repartitioned {@link MessageStream}
   */
  <K, V> MessageStream<KV<K, V>> partitionBy(MapFunction<? super M, ? extends K> keyExtractor,
      MapFunction<? super M, ? extends V> valueExtractor, KVSerde<K, V> serde, String id);

  /**
   * Allows sending messages in this {@link MessageStream} to a {@link Table} and then propagates this
   * {@link MessageStream} to the next chained operator. The type of input message is expected to be {@link KV},
   * otherwise a {@link ClassCastException} will be thrown.
   * <p>
   * Note: The message will be written but may not be flushed to the underlying table before it is propagated to
   * the chained operators. Whether the message can be read back from the Table in the chained operator depends on
   * whether it was flushed and whether the Table offers read after write consistency. Messages retain the original
   * partitioning scheme when propagated to the next operator.
   *
   * @param table the table to write messages to
   * @param args additional arguments passed to the table
   * @param <K> the type of key in the table
   * @param <V> the type of record value in the table
   * @return this {@link MessageStream}
   */
  <K, V> MessageStream<KV<K, V>> sendTo(Table<KV<K, V>> table, Object ... args);

  /**
   * Broadcasts messages in this {@link MessageStream} to all instances of its downstream operators.
   *
   * @param serde the {@link Serde} to use for (de)serializing the message
   * @param id the unique id of this operator in this application
   * @return the broadcast {@link MessageStream}
   */
  MessageStream<M> broadcast(Serde<M> serde, String id);
}