blob: f69f642077ccd3b2620565853dd1a32948dc83db [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.scala
import org.apache.flink.annotation.{PublicEvolving, Public}
import org.apache.flink.api.common.functions.CoGroupFunction
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.api.java.typeutils.ResultTypeQueryable
import org.apache.flink.streaming.api.datastream.{CoGroupedStreams => JavaCoGroupedStreams}
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner
import org.apache.flink.streaming.api.windowing.evictors.Evictor
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.Trigger
import org.apache.flink.streaming.api.windowing.windows.Window
import org.apache.flink.util.Collector
import scala.collection.JavaConverters._
/**
* `CoGroupedStreams` represents two [[DataStream]]s that have been co-grouped.
* A streaming co-group operation is evaluated over elements in a window.
*
* To finalize the co-group operation you also need to specify a [[KeySelector]] for
* both the first and second input and a [[WindowAssigner]].
*
* Note: Right now, the groups are built in memory, so you need to ensure that they don't
* get too big; otherwise the JVM might crash.
*
* Example:
*
* {{{
* val one: DataStream[(String, Int)] = ...
* val two: DataStream[(String, Int)] = ...
*
* val result = one.coGroup(two)
*   .where(_._1)
*   .equalTo(_._1)
*   .window(TumblingEventTimeWindows.of(Time.of(5, TimeUnit.SECONDS)))
*   .apply(new MyCoGroupFunction())
* }}}
*/
@Public
class CoGroupedStreams[T1, T2](input1: DataStream[T1], input2: DataStream[T2]) {

  /**
   * Specifies a [[KeySelector]] for elements from the first input.
   *
   * @param keySelector function extracting the key from an element of the first input
   * @tparam KEY type of the key; its [[TypeInformation]] is taken from the implicit scope
   * @return a [[Where]] on which the key selector for the second input is specified
   */
  def where[KEY: TypeInformation](keySelector: T1 => KEY): Where[KEY] = {
    val cleanFun = clean(keySelector)
    val keyType = implicitly[TypeInformation[KEY]]
    // The anonymous selector references only the local vals above, and reports its key
    // type explicitly through ResultTypeQueryable.
    val javaSelector = new KeySelector[T1, KEY] with ResultTypeQueryable[KEY] {
      override def getKey(in: T1): KEY = cleanFun(in)
      override def getProducedType: TypeInformation[KEY] = keyType
    }
    new Where[KEY](javaSelector, keyType)
  }

  /**
   * A co-group operation that has a [[KeySelector]] defined for the first input.
   *
   * You need to specify a [[KeySelector]] for the second input using `equalTo()`
   * before you can proceed with specifying a [[WindowAssigner]] using `EqualTo.window()`.
   *
   * @tparam KEY Type of the key. This must be the same for both inputs
   */
  class Where[KEY](keySelector1: KeySelector[T1, KEY], keyType: TypeInformation[KEY]) {

    /**
     * Specifies a [[KeySelector]] for elements from the second input.
     *
     * @param keySelector function extracting the key from an element of the second input;
     *                    it must produce the same key type as the first input's selector
     * @return an [[EqualTo]] on which the window can be specified
     */
    def equalTo(keySelector: T2 => KEY): EqualTo = {
      val cleanFun = clean(keySelector)
      // Copy the constructor field into a local so the anonymous selector below
      // references a local value instead of a field of this Where instance.
      val localKeyType = keyType
      val javaSelector = new KeySelector[T2, KEY] with ResultTypeQueryable[KEY] {
        override def getKey(in: T2): KEY = cleanFun(in)
        override def getProducedType: TypeInformation[KEY] = localKeyType
      }
      new EqualTo(javaSelector)
    }

    /**
     * A co-group operation that has a [[KeySelector]] defined for both the first and the
     * second input.
     *
     * A window can now be specified using `window()`.
     */
    class EqualTo(keySelector2: KeySelector[T2, KEY]) {

      /**
       * Specifies the window on which the co-group operation works.
       *
       * The returned [[WithWindow]] starts with no trigger, evictor or allowed lateness
       * (all passed as `null`); presumably this selects the Java API defaults — confirm.
       *
       * @param assigner the window assigner, applied to the tagged union of both inputs
       * @throws UnsupportedOperationException if a key selector for either input is null
       */
      @PublicEvolving
      def window[W <: Window](
          assigner: WindowAssigner[_ >: JavaCoGroupedStreams.TaggedUnion[T1, T2], W])
        : WithWindow[W] = {
        if (keySelector1 == null || keySelector2 == null) {
          throw new UnsupportedOperationException(
            "You first need to specify KeySelectors for both inputs using where() and equalTo().")
        }
        new WithWindow[W](clean(assigner), null, null, null)
      }

      /**
       * A co-group operation that has [[KeySelector]]s defined for both inputs as
       * well as a [[WindowAssigner]].
       *
       * Instances are immutable: `trigger`, `evictor` and `allowedLateness` each return a
       * new [[WithWindow]] with the one setting replaced.
       *
       * @tparam W Type of [[Window]] on which the co-group operation works.
       */
      @PublicEvolving
      class WithWindow[W <: Window](
          windowAssigner: WindowAssigner[_ >: JavaCoGroupedStreams.TaggedUnion[T1, T2], W],
          trigger: Trigger[_ >: JavaCoGroupedStreams.TaggedUnion[T1, T2], _ >: W],
          evictor: Evictor[_ >: JavaCoGroupedStreams.TaggedUnion[T1, T2], _ >: W],
          val allowedLateness: Time) {

        /**
         * Sets the [[Trigger]] that should be used to trigger window emission.
         */
        @PublicEvolving
        def trigger(newTrigger: Trigger[_ >: JavaCoGroupedStreams.TaggedUnion[T1, T2], _ >: W])
          : WithWindow[W] = {
          new WithWindow[W](windowAssigner, newTrigger, evictor, allowedLateness)
        }

        /**
         * Sets the [[Evictor]] that should be used to evict elements from a window before
         * emission.
         *
         * Note: When using an evictor window performance will degrade significantly, since
         * pre-aggregation of window results cannot be used.
         */
        @PublicEvolving
        def evictor(
            newEvictor: Evictor[_ >: JavaCoGroupedStreams.TaggedUnion[T1, T2], _ >: W])
          : WithWindow[W] = {
          new WithWindow[W](windowAssigner, trigger, newEvictor, allowedLateness)
        }

        /**
         * Sets the time by which elements are allowed to be late.
         *
         * The value is handed to the Java co-group's `allowedLateness` when `apply` is
         * called.
         */
        @PublicEvolving
        def allowedLateness(newLateness: Time): WithWindow[W] = {
          new WithWindow[W](windowAssigner, trigger, evictor, newLateness)
        }

        /**
         * Completes the co-group operation with the user function that is executed
         * for windowed groups. The function's return value is emitted as the single
         * output element of each group.
         *
         * @param fun receives iterators over the first- and second-input elements of a group
         * @throws IllegalArgumentException if `fun` is null
         */
        def apply[O: TypeInformation](
            fun: (Iterator[T1], Iterator[T2]) => O): DataStream[O] = {
          require(fun != null, "CoGroup function must not be null.")
          // Clean here, outside the anonymous class, so the CoGroupFunction only captures
          // this local value (the same pattern used by where() and equalTo()).
          val cleanFun = clean(fun)
          val coGrouper = new CoGroupFunction[T1, T2, O] {
            override def coGroup(
                left: java.lang.Iterable[T1],
                right: java.lang.Iterable[T2],
                out: Collector[O]): Unit = {
              out.collect(cleanFun(left.iterator().asScala, right.iterator().asScala))
            }
          }
          apply(coGrouper)
        }

        /**
         * Completes the co-group operation with the user function that is executed
         * for windowed groups. The function emits any number of results via the collector.
         *
         * @param fun receives iterators over both sides of a group plus the output collector
         * @throws IllegalArgumentException if `fun` is null
         */
        def apply[O: TypeInformation](
            fun: (Iterator[T1], Iterator[T2], Collector[O]) => Unit): DataStream[O] = {
          require(fun != null, "CoGroup function must not be null.")
          // Clean outside the anonymous class; see the two-argument overload above.
          val cleanFun = clean(fun)
          val coGrouper = new CoGroupFunction[T1, T2, O] {
            override def coGroup(
                left: java.lang.Iterable[T1],
                right: java.lang.Iterable[T2],
                out: Collector[O]): Unit = {
              cleanFun(left.iterator().asScala, right.iterator().asScala, out)
            }
          }
          apply(coGrouper)
        }

        /**
         * Completes the co-group operation with the user function that is executed
         * for windowed groups.
         *
         * Builds the equivalent Java co-group from both inputs' Java streams, applies the
         * key selectors, window assigner, trigger, evictor and allowed lateness collected
         * so far, and wraps the result as a Scala [[DataStream]].
         *
         * @param function the co-group function; it is closure-cleaned before use
         * @throws IllegalArgumentException if `function` is null
         */
        def apply[T: TypeInformation](function: CoGroupFunction[T1, T2, T]): DataStream[T] = {
          // Fail fast with the same message as the Scala-function overloads.
          require(function != null, "CoGroup function must not be null.")
          val coGroup = new JavaCoGroupedStreams[T1, T2](input1.javaStream, input2.javaStream)
          asScalaStream(coGroup
            .where(keySelector1)
            .equalTo(keySelector2)
            .window(windowAssigner)
            .trigger(trigger)
            .evictor(evictor)
            .allowedLateness(allowedLateness)
            .apply(clean(function), implicitly[TypeInformation[T]]))
        }
      }
    }
  }

  /**
   * Returns a "closure-cleaned" version of the given function. Cleans only if closure cleaning
   * is not disabled in the [[org.apache.flink.api.common.ExecutionConfig]].
   *
   * Delegates to `scalaClean` of a [[StreamExecutionEnvironment]] wrapping the first
   * input's execution environment.
   */
  private[flink] def clean[F <: AnyRef](f: F): F = {
    new StreamExecutionEnvironment(input1.javaStream.getExecutionEnvironment).scalaClean(f)
  }
}