blob: 321ae737815bf689091d7d40be2f609d4c1f2b11 [file] [log] [blame]
// Copyright 2017 Twitter. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.twitter.heron.streamlet.impl;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.logging.Logger;
import com.twitter.heron.streamlet.KVStreamlet;
import com.twitter.heron.streamlet.KeyValue;
import com.twitter.heron.streamlet.SerializableBiFunction;
import com.twitter.heron.streamlet.SerializableBinaryOperator;
import com.twitter.heron.streamlet.SerializableConsumer;
import com.twitter.heron.streamlet.SerializableFunction;
import com.twitter.heron.streamlet.SerializablePredicate;
import com.twitter.heron.streamlet.SerializableSupplier;
import com.twitter.heron.streamlet.SerializableTransformer;
import com.twitter.heron.streamlet.Sink;
import com.twitter.heron.streamlet.Source;
import com.twitter.heron.streamlet.Streamlet;
import com.twitter.heron.streamlet.Window;
import com.twitter.heron.streamlet.WindowConfig;
import com.twitter.heron.streamlet.impl.streamlets.ConsumerStreamlet;
import com.twitter.heron.streamlet.impl.streamlets.FilterStreamlet;
import com.twitter.heron.streamlet.impl.streamlets.FlatMapStreamlet;
import com.twitter.heron.streamlet.impl.streamlets.LogStreamlet;
import com.twitter.heron.streamlet.impl.streamlets.MapStreamlet;
import com.twitter.heron.streamlet.impl.streamlets.MapToKVStreamlet;
import com.twitter.heron.streamlet.impl.streamlets.ReduceByWindowStreamlet;
import com.twitter.heron.streamlet.impl.streamlets.RemapStreamlet;
import com.twitter.heron.streamlet.impl.streamlets.SinkStreamlet;
import com.twitter.heron.streamlet.impl.streamlets.SourceStreamlet;
import com.twitter.heron.streamlet.impl.streamlets.SupplierStreamlet;
import com.twitter.heron.streamlet.impl.streamlets.TransformStreamlet;
import com.twitter.heron.streamlet.impl.streamlets.UnionStreamlet;
/**
* A Streamlet is a (potentially unbounded) ordered collection of tuples.
* Streamlets originate from pub/sub systems (such as Pulsar/Kafka), or from
* static data (such as CSV files, HDFS files), or for that matter any other
* source. They are also created by transforming existing Streamlets using
* operations such as map/flatMap, etc.
* Besides the tuples, a Streamlet has the following properties associated with it
* a) name. User assigned or system generated name to refer to the streamlet
* b) nPartitions. Number of partitions that the streamlet is composed of. Thus the
* ordering of the tuples in a Streamlet is wrt the tuples within a partition.
* This allows the system to distribute each partition to different nodes across the cluster.
* A bunch of transformations can be done on Streamlets(like map/flatMap, etc.). Each
* of these transformations operate on every tuple of the Streamlet and produce a new
* Streamlet. One can think of a transformation attaching itself to the stream and processing
* each tuple as they go by. Thus the parallelism of any operator is implicitly determined
* by the number of partitions of the stream that it is operating on. If a particular
* transformation wants to operate at a different parallelism, one can repartition the
* Streamlet before doing the transformation.
*/
public abstract class StreamletImpl<R> extends BaseStreamletImpl<Streamlet<R>>
    implements Streamlet<R> {
  private static final Logger LOG = Logger.getLogger(StreamletImpl.class.getName());

  /**
   * Returns this instance, typed as {@code StreamletImpl<R>}.
   * NOTE(review): presumably this is the hook the base class uses so that its fluent
   * setters can return the concrete streamlet -- confirm against BaseStreamletImpl.
   * @return this streamlet
   */
  @Override
  protected StreamletImpl<R> returnThis() {
    return this;
  }

  /**
   * Create a Streamlet based on the supplier function
   * @param supplier The Supplier function to generate the elements
   * @param <T> The type of the elements produced by the supplier
   * @return A new SupplierStreamlet wrapping the supplier
   */
  static <T> StreamletImpl<T> createSupplierStreamlet(SerializableSupplier<T> supplier) {
    return new SupplierStreamlet<T>(supplier);
  }

  /**
   * Create a Streamlet based on the generator function
   * @param generator The Generator function to generate the elements
   * @param <T> The type of the elements produced by the generator
   * @return A new SourceStreamlet wrapping the generator
   */
  static <T> StreamletImpl<T> createGeneratorStreamlet(Source<T> generator) {
    return new SourceStreamlet<T>(generator);
  }

  /**
   * Return a new Streamlet by applying mapFn to each element of this Streamlet
   * @param mapFn The Map Function that should be applied to each element
   * @param <T> The element type of the resulting Streamlet
   * @return The mapped Streamlet, registered as a child of this Streamlet
   */
  @Override
  public <T> Streamlet<T> map(SerializableFunction<? super R, ? extends T> mapFn) {
    MapStreamlet<R, T> mapped = new MapStreamlet<>(this, mapFn);
    addChild(mapped);
    return mapped;
  }

  /**
   * Return a new KVStreamlet by applying mapFn to each element of this Streamlet.
   * This differs from the map transformation in that it returns a KVStreamlet
   * instead of a plain Streamlet.
   * @param mapFn The Map function that should be applied to each element
   * @param <K> The key type of the resulting KVStreamlet
   * @param <V> The value type of the resulting KVStreamlet
   * @return The mapped KVStreamlet, registered as a child of this Streamlet
   */
  @Override
  public <K, V> KVStreamlet<K, V> mapToKV(SerializableFunction<? super R,
      ? extends KeyValue<K, V>> mapFn) {
    MapToKVStreamlet<R, K, V> mapped = new MapToKVStreamlet<>(this, mapFn);
    addChild(mapped);
    return mapped;
  }

  /**
   * Return a new Streamlet by applying flatMapFn to each element of this Streamlet and
   * flattening the result
   * @param flatMapFn The FlatMap Function that should be applied to each element
   * @param <T> The element type of the resulting Streamlet
   * @return The flat-mapped Streamlet, registered as a child of this Streamlet
   */
  @Override
  public <T> Streamlet<T> flatMap(
      SerializableFunction<? super R, ? extends Iterable<? extends T>> flatMapFn) {
    FlatMapStreamlet<R, T> flattened = new FlatMapStreamlet<>(this, flatMapFn);
    addChild(flattened);
    return flattened;
  }

  /**
   * Return a new Streamlet by applying the filterFn on each element of this streamlet
   * and including only those elements that satisfy the filterFn
   * @param filterFn The filter Function that should be applied to each element
   * @return The filtered Streamlet, registered as a child of this Streamlet
   */
  @Override
  public Streamlet<R> filter(SerializablePredicate<? super R> filterFn) {
    FilterStreamlet<R> filtered = new FilterStreamlet<>(this, filterFn);
    addChild(filtered);
    return filtered;
  }

  /**
   * Same as map(Identity).setNumPartitions(numPartitions)
   * @param numPartitions The number of partitions of the resulting Streamlet
   * @return A Streamlet carrying the same tuples with the requested partition count
   */
  @Override
  public Streamlet<R> repartition(int numPartitions) {
    return map(tuple -> tuple).setNumPartitions(numPartitions);
  }

  /**
   * A more generalized version of repartition where a user can determine which partitions
   * any particular tuple should go to
   * @param numPartitions The number of partitions of the resulting Streamlet
   * @param partitionFn The function handed to the RemapStreamlet to route tuples.
   * NOTE(review): presumably it receives the tuple and the partition count and returns
   * the target partition indices -- confirm against RemapStreamlet.
   * @return The repartitioned Streamlet, registered as a child of this Streamlet
   */
  @Override
  public Streamlet<R> repartition(int numPartitions,
      SerializableBiFunction<? super R, Integer, List<Integer>> partitionFn) {
    RemapStreamlet<R> remapped = new RemapStreamlet<>(this, partitionFn);
    remapped.setNumPartitions(numPartitions);
    addChild(remapped);
    return remapped;
  }

  /**
   * Clones the current Streamlet. It returns a list of numClones Streamlets where each
   * Streamlet contains all the tuples of the current Streamlet. Each clone is produced
   * by repartitioning this Streamlet to its current number of partitions.
   * @param numClones The number of clones to create; a value of zero or less yields
   * an empty list
   * @return The list of cloned Streamlets
   */
  @Override
  public List<Streamlet<R>> clone(int numClones) {
    // Presize the list; the guard keeps a negative numClones from making the
    // ArrayList constructor throw, preserving the "empty list" outcome.
    List<Streamlet<R>> clones = new ArrayList<>(Math.max(numClones, 0));
    for (int i = 0; i < numClones; i++) {
      clones.add(repartition(getNumPartitions()));
    }
    return clones;
  }

  /**
   * Returns a new Streamlet by accumulating tuples of this streamlet over a WindowConfig
   * windowConfig and applying reduceFn on those tuples
   * @param windowConfig This is a specification of what kind of windowing strategy you like
   * to have. Typical windowing strategies are sliding windows and tumbling windows
   * @param reduceFn The reduceFn to apply over the tuples accumulated on a window
   * @return A KVStreamlet keyed by Window, registered as a child of this Streamlet
   */
  @Override
  public KVStreamlet<Window, R> reduceByWindow(WindowConfig windowConfig,
      SerializableBinaryOperator<R> reduceFn) {
    ReduceByWindowStreamlet<R> reduced =
        new ReduceByWindowStreamlet<>(this, windowConfig, reduceFn);
    addChild(reduced);
    return reduced;
  }

  /**
   * Returns a new Streamlet that is the union of this and the {@code other} streamlet.
   * Essentially the new streamlet will contain tuples belonging to both Streamlets.
   * The union is registered as a child of both this Streamlet and {@code other}.
   * @param other The Streamlet to union with; must be a StreamletImpl
   * @return The union Streamlet
   * @throws NullPointerException if other is null
   * @throws ClassCastException if other is not a StreamletImpl
   */
  @Override
  public Streamlet<R> union(Streamlet<? extends R> other) {
    // Validate before touching the graph: a null argument used to fail only after the
    // union node had already been added to this streamlet's children.
    Objects.requireNonNull(other, "other streamlet must not be null");
    StreamletImpl<? extends R> joinee = (StreamletImpl<? extends R>) other;
    UnionStreamlet<R> unioned = new UnionStreamlet<>(this, joinee);
    addChild(unioned);
    joinee.addChild(unioned);
    return unioned;
  }

  /**
   * Logs every element of the streamlet using String.valueOf function
   * Note that LogStreamlet is an empty streamlet. That is, it is a streamlet
   * that does not contain any tuple. Thus this function returns void.
   */
  @Override
  public void log() {
    addChild(new LogStreamlet<>(this));
  }

  /**
   * Applies the consumer function for every element of this streamlet
   * @param consumer The user supplied consumer function that is invoked for each element
   */
  @Override
  public void consume(SerializableConsumer<R> consumer) {
    addChild(new ConsumerStreamlet<>(this, consumer));
  }

  /**
   * Uses the sink to consume every element of this streamlet
   * @param sink The Sink that consumes
   */
  @Override
  public void toSink(Sink<R> sink) {
    addChild(new SinkStreamlet<>(this, sink));
  }

  /**
   * Returns a new Streamlet by applying the transformFunction on each element of this streamlet.
   * Before starting to cycle the transformFunction over the Streamlet, the open function is called.
   * This allows the transform Function to do any kind of initialization/loading, etc.
   * @param serializableTransformer The transformation function to be applied
   * @param <T> The return type of the transform
   * @return Streamlet containing the output of the transformFunction
   */
  @Override
  public <T> Streamlet<T> transform(
      SerializableTransformer<? super R, ? extends T> serializableTransformer) {
    TransformStreamlet<R, T> transformed =
        new TransformStreamlet<>(this, serializableTransformer);
    addChild(transformed);
    return transformed;
  }

  /**
   * Only used by the implementors
   */
  protected StreamletImpl() {
    super();
  }
}