blob: 4f29c9eeca47e9e0298de89685e87a435844d21a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gearpump.streaming.dsl.window.impl
import java.time.Instant
import com.gs.collections.api.block.predicate.Predicate
import com.gs.collections.api.block.procedure.Procedure
import com.gs.collections.impl.list.mutable.FastList
import com.gs.collections.impl.map.sorted.mutable.TreeSortedMap
import org.apache.gearpump.Message
import org.apache.gearpump.streaming.dsl.plan.functions.FunctionRunner
import org.apache.gearpump.streaming.dsl.window.api.WindowFunction.Context
import org.apache.gearpump.streaming.dsl.window.api.{Discarding, Windows}
import org.apache.gearpump.streaming.source.Watermark
import org.apache.gearpump.streaming.task.TaskUtil
import scala.collection.mutable.ArrayBuffer
/**
* Inputs for [[StreamingOperator]].
*/
case class TimestampedValue[T](value: T, timestamp: Instant) {
def this(msg: Message) = {
this(msg.value.asInstanceOf[T], msg.timestamp)
}
def toMessage: Message = Message(value, timestamp)
}
/**
* Outputs triggered by [[StreamingOperator]]
*/
case class TriggeredOutputs[T](outputs: TraversableOnce[TimestampedValue[T]],
watermark: Instant)
trait StreamingOperator[IN, OUT] extends java.io.Serializable {
def setup(): Unit = {}
def foreach(tv: TimestampedValue[IN]): Unit
def flatMap(
tv: TimestampedValue[IN]): TraversableOnce[TimestampedValue[OUT]] = {
foreach(tv)
None
}
def trigger(time: Instant): TriggeredOutputs[OUT]
def teardown(): Unit = {}
}
/**
* A composite WindowRunner that first executes its left child and feeds results
* into result child.
*/
case class AndThenOperator[IN, MIDDLE, OUT](left: StreamingOperator[IN, MIDDLE],
right: StreamingOperator[MIDDLE, OUT]) extends StreamingOperator[IN, OUT] {
override def setup(): Unit = {
left.setup()
right.setup()
}
override def foreach(
tv: TimestampedValue[IN]): Unit = {
left.flatMap(tv).foreach(right.flatMap)
}
override def flatMap(
tv: TimestampedValue[IN]): TraversableOnce[TimestampedValue[OUT]] = {
left.flatMap(tv).flatMap(right.flatMap)
}
override def trigger(time: Instant): TriggeredOutputs[OUT] = {
val lOutputs = left.trigger(time)
lOutputs.outputs.foreach(right.foreach)
right.trigger(lOutputs.watermark)
}
override def teardown(): Unit = {
left.teardown()
right.teardown()
}
}
/**
* @param runner FlatMapper or chained FlatMappers
*/
class FlatMapOperator[IN, OUT](runner: FunctionRunner[IN, OUT])
extends StreamingOperator[IN, OUT] {
override def setup(): Unit = {
runner.setup()
}
override def foreach(tv: TimestampedValue[IN]): Unit = {
throw new UnsupportedOperationException("foreach should not be invoked on FlatMapOperator; " +
"please use flatMap instead")
}
override def flatMap(
tv: TimestampedValue[IN]): TraversableOnce[TimestampedValue[OUT]] = {
runner.process(tv.value)
.map(TimestampedValue(_, tv.timestamp))
}
override def trigger(time: Instant): TriggeredOutputs[OUT] = {
TriggeredOutputs(None, time)
}
override def teardown(): Unit = {
runner.teardown()
}
}
/**
* This is responsible for executing window calculation.
* 1. Groups elements into windows as defined by window function
* 2. Applies window calculation to each group
* 3. Emits results on triggering
*/
class WindowOperator[IN, OUT](
windows: Windows,
runner: FunctionRunner[IN, OUT])
extends StreamingOperator[IN, OUT] {
private val windowFn = windows.windowFn
private val windowInputs = new TreeSortedMap[Window, FastList[TimestampedValue[IN]]]
private var isSetup = false
private var watermark = Watermark.MIN
override def foreach(
tv: TimestampedValue[IN]): Unit = {
val wins = windowFn(new Context[IN] {
override def element: IN = tv.value
override def timestamp: Instant = tv.timestamp
})
wins.foreach { win =>
if (windowFn.isNonMerging) {
if (!windowInputs.containsKey(win)) {
val inputs = new FastList[TimestampedValue[IN]]
windowInputs.put(win, inputs)
}
windowInputs.get(win).add(tv)
} else {
merge(windowInputs, win, tv)
}
}
def merge(
winIns: TreeSortedMap[Window, FastList[TimestampedValue[IN]]],
win: Window, tv: TimestampedValue[IN]): Unit = {
val intersected = winIns.keySet.select(new Predicate[Window] {
override def accept(each: Window): Boolean = {
win.intersects(each)
}
})
var mergedWin = win
val mergedInputs = FastList.newListWith(tv)
intersected.forEach(new Procedure[Window] {
override def value(each: Window): Unit = {
mergedWin = mergedWin.span(each)
mergedInputs.addAll(winIns.remove(each))
}
})
winIns.put(mergedWin, mergedInputs)
}
}
override def trigger(time: Instant): TriggeredOutputs[OUT] = {
@annotation.tailrec
def onTrigger(
outputs: ArrayBuffer[TimestampedValue[OUT]],
wmk: Instant): TriggeredOutputs[OUT] = {
if (windowInputs.notEmpty()) {
val firstWin = windowInputs.firstKey
if (!time.isBefore(firstWin.endTime)) {
val inputs = windowInputs.remove(firstWin)
if (!isSetup) {
runner.setup()
isSetup = true
}
inputs.forEach(new Procedure[TimestampedValue[IN]] {
override def value(tv: TimestampedValue[IN]): Unit = {
runner.process(tv.value).foreach {
out: OUT => outputs += TimestampedValue(out, tv.timestamp)
}
}
})
runner.finish().foreach {
out: OUT =>
outputs += TimestampedValue(out, firstWin.endTime.minusMillis(1))
}
val newWmk = TaskUtil.max(wmk, firstWin.endTime)
if (windows.accumulationMode == Discarding) {
runner.teardown()
// discarding, setup need to be called for each window
isSetup = false
}
onTrigger(outputs, newWmk)
} else {
// The output watermark is the minimum of end of last triggered window
// and start of first un-triggered window
TriggeredOutputs(outputs, TaskUtil.min(wmk, firstWin.startTime))
}
} else {
// All windows have been triggered.
if (time == Watermark.MAX) {
// This means there will be no more inputs
// so it's safe to advance to the maximum watermark.
TriggeredOutputs(outputs, Watermark.MAX)
} else {
TriggeredOutputs(outputs, wmk)
}
}
}
val triggeredOutputs = onTrigger(ArrayBuffer.empty[TimestampedValue[OUT]], watermark)
watermark = TaskUtil.max(watermark, triggeredOutputs.watermark)
TriggeredOutputs(triggeredOutputs.outputs, watermark)
}
override def teardown(): Unit = {
runner.teardown()
}
}