blob: 3f0e14126957e41ed0eb2d74c6245d595b39aba9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.scala
import org.apache.flink.annotation.{Internal, Public, PublicEvolving}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.streaming.api.datastream.{ConnectedStreams => JavaCStream, DataStream => JavaStream}
import org.apache.flink.streaming.api.functions.co._
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator
import org.apache.flink.util.Collector
/**
* [[ConnectedStreams]] represents two connected streams of (possibly) different data types.
* Connected streams are useful for cases where operations on one stream directly
* affect the operations on the other stream, usually via shared state between the streams.
*
* An example for the use of connected streams would be to apply rules that change over time
* onto another stream. One of the connected streams has the rules, the other stream the
* elements to apply the rules to. The operation on the connected stream maintains the
* current set of rules in the state. It may receive either a rule update and update the state
* or a data element and apply the rules in the state to the element.
*
* The connected stream can be conceptually viewed as a union stream of an Either type, that
* holds either the first stream's type or the second stream's type.
*/
@Public
class ConnectedStreams[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) {
// ------------------------------------------------------
// Transformations
// ------------------------------------------------------
/**
* Applies a CoMap transformation on the connected streams.
*
* The transformation consists of two separate functions, where
* the first one is called for each element of the first connected stream,
* and the second one is called for each element of the second connected stream.
*
* @param fun1 Function called per element of the first input.
* @param fun2 Function called per element of the second input.
* @return The resulting data stream.
*/
def map[R: TypeInformation](fun1: IN1 => R, fun2: IN2 => R):
DataStream[R] = {
if (fun1 == null || fun2 == null) {
throw new NullPointerException("Map function must not be null.")
}
val cleanFun1 = clean(fun1)
val cleanFun2 = clean(fun2)
val comapper = new CoMapFunction[IN1, IN2, R] {
def map1(in1: IN1): R = cleanFun1(in1)
def map2(in2: IN2): R = cleanFun2(in2)
}
map(comapper)
}
/**
* Applies a CoMap transformation on these connected streams.
*
* The transformation calls [[CoMapFunction#map1]] for each element
* in the first stream and [[CoMapFunction#map2]] for each element
* of the second stream.
*
* On can pass a subclass of [[org.apache.flink.streaming.api.functions.co.RichCoMapFunction]]
* to gain access to the [[org.apache.flink.api.common.functions.RuntimeContext]]
* and to additional life cycle methods.
*
* @param coMapper
* The CoMapFunction used to transform the two connected streams
* @return
* The resulting data stream
*/
def map[R: TypeInformation](coMapper: CoMapFunction[IN1, IN2, R]): DataStream[R] = {
if (coMapper == null) {
throw new NullPointerException("Map function must not be null.")
}
val outType : TypeInformation[R] = implicitly[TypeInformation[R]]
asScalaStream(javaStream.map(coMapper, outType).asInstanceOf[JavaStream[R]])
}
/**
* Applies the given [[CoProcessFunction]] on the connected input streams,
* thereby creating a transformed output stream.
*
* The function will be called for every element in the input streams and can produce zero
* or more output elements. Contrary to the [[flatMap(CoFlatMapFunction)]] function,
* this function can also query the time and set timers. When reacting to the firing of set
* timers the function can directly emit elements and/or register yet more timers.
*
* @param coProcessFunction The [[CoProcessFunction]] that is called for each element
* in the stream.
* @return The transformed [[DataStream]].
*/
@PublicEvolving
def process[R: TypeInformation](
coProcessFunction: CoProcessFunction[IN1, IN2, R]) : DataStream[R] = {
if (coProcessFunction == null) {
throw new NullPointerException("CoProcessFunction function must not be null.")
}
val outType : TypeInformation[R] = implicitly[TypeInformation[R]]
asScalaStream(javaStream.process(coProcessFunction, outType))
}
/**
* Applies the given [[KeyedCoProcessFunction]] on the connected input keyed streams,
* thereby creating a transformed output stream.
*
* The function will be called for every element in the input keyed streams and can produce
* zero or more output elements. Contrary to the [[flatMap(CoFlatMapFunction)]] function, this
* function can also query the time and set timers. When reacting to the firing of set timers
* the function can directly emit elements and/or register yet more timers.
*
* @param keyedCoProcessFunction The [[KeyedCoProcessFunction]] that is called for each element
* in the stream.
* @return The transformed [[DataStream]].
*/
@PublicEvolving
def process[K, R: TypeInformation](
keyedCoProcessFunction: KeyedCoProcessFunction[K, IN1, IN2, R]) : DataStream[R] = {
if (keyedCoProcessFunction == null) {
throw new NullPointerException("KeyedCoProcessFunction function must not be null.")
}
val outType : TypeInformation[R] = implicitly[TypeInformation[R]]
asScalaStream(javaStream.process(keyedCoProcessFunction, outType))
}
/**
* Applies a CoFlatMap transformation on these connected streams.
*
* The transformation calls [[CoFlatMapFunction#flatMap1]] for each element
* in the first stream and [[CoFlatMapFunction#flatMap2]] for each element
* of the second stream.
*
* On can pass a subclass of [[org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction]]
* to gain access to the [[org.apache.flink.api.common.functions.RuntimeContext]]
* and to additional life cycle methods.
*
* @param coFlatMapper
* The CoFlatMapFunction used to transform the two connected streams
* @return
* The resulting data stream.
*/
def flatMap[R: TypeInformation](coFlatMapper: CoFlatMapFunction[IN1, IN2, R]):
DataStream[R] = {
if (coFlatMapper == null) {
throw new NullPointerException("FlatMap function must not be null.")
}
val outType : TypeInformation[R] = implicitly[TypeInformation[R]]
asScalaStream(javaStream.flatMap(coFlatMapper, outType).asInstanceOf[JavaStream[R]])
}
/**
* Applies a CoFlatMap transformation on the connected streams.
*
* The transformation consists of two separate functions, where
* the first one is called for each element of the first connected stream,
* and the second one is called for each element of the second connected stream.
*
* @param fun1 Function called per element of the first input.
* @param fun2 Function called per element of the second input.
* @return The resulting data stream.
*/
def flatMap[R: TypeInformation](
fun1: (IN1, Collector[R]) => Unit,
fun2: (IN2, Collector[R]) => Unit): DataStream[R] = {
if (fun1 == null || fun2 == null) {
throw new NullPointerException("FlatMap functions must not be null.")
}
val cleanFun1 = clean(fun1)
val cleanFun2 = clean(fun2)
val flatMapper = new CoFlatMapFunction[IN1, IN2, R] {
def flatMap1(value: IN1, out: Collector[R]): Unit = cleanFun1(value, out)
def flatMap2(value: IN2, out: Collector[R]): Unit = cleanFun2(value, out)
}
flatMap(flatMapper)
}
/**
* Applies a CoFlatMap transformation on the connected streams.
*
* The transformation consists of two separate functions, where
* the first one is called for each element of the first connected stream,
* and the second one is called for each element of the second connected stream.
*
* @param fun1 Function called per element of the first input.
* @param fun2 Function called per element of the second input.
* @return The resulting data stream.
*/
def flatMap[R: TypeInformation](
fun1: IN1 => TraversableOnce[R],
fun2: IN2 => TraversableOnce[R]): DataStream[R] = {
if (fun1 == null || fun2 == null) {
throw new NullPointerException("FlatMap functions must not be null.")
}
val cleanFun1 = clean(fun1)
val cleanFun2 = clean(fun2)
val flatMapper = new CoFlatMapFunction[IN1, IN2, R] {
def flatMap1(value: IN1, out: Collector[R]) = { cleanFun1(value) foreach out.collect }
def flatMap2(value: IN2, out: Collector[R]) = { cleanFun2(value) foreach out.collect }
}
flatMap(flatMapper)
}
// ------------------------------------------------------
// grouping and partitioning
// ------------------------------------------------------
/**
* Keys the two connected streams together. After this operation, all
* elements with the same key from both streams will be sent to the
* same parallel instance of the transformation functions.
*
* @param keyPosition1 The first stream's key field
* @param keyPosition2 The second stream's key field
* @return The key-grouped connected streams
*/
def keyBy(keyPosition1: Int, keyPosition2: Int): ConnectedStreams[IN1, IN2] = {
asScalaStream(javaStream.keyBy(keyPosition1, keyPosition2))
}
/**
* Keys the two connected streams together. After this operation, all
* elements with the same key from both streams will be sent to the
* same parallel instance of the transformation functions.
*
* @param keyPositions1 The first stream's key fields
* @param keyPositions2 The second stream's key fields
* @return The key-grouped connected streams
*/
def keyBy(keyPositions1: Array[Int], keyPositions2: Array[Int]): ConnectedStreams[IN1, IN2] = {
asScalaStream(javaStream.keyBy(keyPositions1, keyPositions2))
}
/**
* Keys the two connected streams together. After this operation, all
* elements with the same key from both streams will be sent to the
* same parallel instance of the transformation functions.
*
* @param field1 The first stream's key expression
* @param field2 The second stream's key expression
* @return The key-grouped connected streams
*/
def keyBy(field1: String, field2: String): ConnectedStreams[IN1, IN2] = {
asScalaStream(javaStream.keyBy(field1, field2))
}
/**
* Keys the two connected streams together. After this operation, all
* elements with the same key from both streams will be sent to the
* same parallel instance of the transformation functions.
*
* @param fields1 The first stream's key expressions
* @param fields2 The second stream's key expressions
* @return The key-grouped connected streams
*/
def keyBy(fields1: Array[String], fields2: Array[String]): ConnectedStreams[IN1, IN2] = {
asScalaStream(javaStream.keyBy(fields1, fields2))
}
/**
* Keys the two connected streams together. After this operation, all
* elements with the same key from both streams will be sent to the
* same parallel instance of the transformation functions.
*
* @param fun1 The first stream's key function
* @param fun2 The second stream's key function
* @return The key-grouped connected streams
*/
def keyBy[KEY: TypeInformation](fun1: IN1 => KEY, fun2: IN2 => KEY):
ConnectedStreams[IN1, IN2] = {
val keyType = implicitly[TypeInformation[KEY]]
val cleanFun1 = clean(fun1)
val cleanFun2 = clean(fun2)
val keyExtractor1 = new JavaKeySelector[IN1, KEY](cleanFun1)
val keyExtractor2 = new JavaKeySelector[IN2, KEY](cleanFun2)
asScalaStream(javaStream.keyBy(keyExtractor1, keyExtractor2, keyType))
}
/**
* Returns a "closure-cleaned" version of the given function. Cleans only if closure cleaning
* is not disabled in the [[org.apache.flink.api.common.ExecutionConfig]]
*/
private[flink] def clean[F <: AnyRef](f: F): F = {
new StreamExecutionEnvironment(javaStream.getExecutionEnvironment).scalaClean(f)
}
@PublicEvolving
def transform[R: TypeInformation](
functionName: String,
operator: TwoInputStreamOperator[IN1, IN2, R]): DataStream[R] = {
asScalaStream(javaStream.transform(functionName, implicitly[TypeInformation[R]], operator))
}
}
@Internal
class JavaKeySelector[IN, K](private[this] val fun: IN => K) extends KeySelector[IN, K] {
override def getKey(value: IN): K = fun(value)
}