blob: f5d01bdfbba54404449884ec488a10caf6d18aed [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package primitives
import (
"time"
"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window/trigger"
"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/teststream"
"github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/stats"
)
// init registers, with the Beam register package, the functions, DoFn,
// emitter and iterator types used by the pipelines in this file.
func init() {
	// sumPerKey: (window, event time, key, iterator) -> (key, sum).
	register.Function4x2(sumPerKey)
	// sumSideInputs: (main element, side-input iterator, emitter).
	register.Function3x0(sumSideInputs)
	// createTimestampedData consumes an impulse ([]byte) and emits timestamped, keyed ints.
	register.DoFn2x0[[]byte, func(beam.EventTime, string, int)](&createTimestampedData{})
	// Emitter and iterator shapes used by the DoFns above.
	register.Emitter3[beam.EventTime, string, int]()
	register.Emitter1[int]()
	register.Iter1[int]()
}
// createTimestampedData is a DoFn that ignores its impulse input and emits every
// value in Data under the key "magic". The value at index i is stamped with an
// event time of (i+1) seconds minus 10ms, which places it just inside the
// one-second interval [i s, (i+1) s).
type createTimestampedData struct {
	// Data holds the values to emit, in emission order.
	Data []int
}

// ProcessElement emits each value of f.Data together with its ordinal-derived
// event timestamp.
func (f *createTimestampedData) ProcessElement(_ []byte, emit func(beam.EventTime, string, int)) {
	const skew = 10 * time.Millisecond
	for idx := range f.Data {
		endMillis := int64((idx + 1) * 1000)
		emit(mtime.FromMilliseconds(endMillis).Subtract(skew), "magic", f.Data[idx])
	}
}
// WindowSums builds a pipeline that emits the numbers of a 3x3 magic square
// (4, 9, 2, 3, 5, 7, 8, 1, 6), stamped one second apart, and checks the keyed
// sums produced under several window functions: fixed windows, sliding windows
// whose period equals their size, overlapping sliding windows, and sessions.
//
// sumFn performs the keyed sum over the windowed input. It is a parameter so the
// same checks can exercise different summing strategies, for example an explicit
// GroupByKey followed by a DoFn, or stats.SumPerKey.
func WindowSums(s beam.Scope, sumFn func(beam.Scope, beam.PCollection) beam.PCollection) {
	timestampedData := beam.ParDo(s, &createTimestampedData{Data: []int{4, 9, 2, 3, 5, 7, 8, 1, 6}}, beam.Impulse(s))
	windowSize := 3 * time.Second
	validate := func(s beam.Scope, wfn *window.Fn, in beam.PCollection, expected ...any) {
		// Window the data.
		windowed := beam.WindowInto(s, wfn, in)
		// Perform the sum operation under test.
		sums := sumFn(s, windowed)
		// Return to the global window and drop the key; otherwise passert.Equals
		// cannot compare the bare sums.
		sums = beam.WindowInto(s, window.NewGlobalWindows(), sums)
		sums = beam.DropKey(s, sums)
		passert.Equals(s, sums, expected...)
	}
	// Fixed 3s windows split the data into the three rows of the square, each summing to 15.
	validate(s.Scope("Fixed"), window.NewFixedWindows(windowSize), timestampedData, 15, 15, 15)
	// Sliding windows whose period equals their size behave exactly like fixed windows.
	validate(s.Scope("SlidingFixed"), window.NewSlidingWindows(windowSize, windowSize), timestampedData, 15, 15, 15)
	// 9s windows starting every 3s overlap, so each sum is a multiple of the magic constant 15.
	validate(s.Scope("Sliding"), window.NewSlidingWindows(windowSize, 3*windowSize), timestampedData, 15, 30, 45, 30, 15)
	// Elements are 1s apart, well inside the 3s gap, so they form a single session summing to 45.
	validate(s.Scope("Session"), window.NewSessions(windowSize), timestampedData, 45)
}
// sumPerKey totals the grouped integer values for a single key and returns the
// key paired with that total. The window and event-time parameters are unused;
// they are part of the signature registered via register.Function4x2.
func sumPerKey(ws beam.Window, ts beam.EventTime, key beam.U, iter func(*int) bool) (beam.U, int) {
	total := 0
	for next := 0; iter(&next); {
		total += next
	}
	return key, total
}
// gbkSumPerKey groups in by key, then reduces each group to its total with the
// sumPerKey function.
func gbkSumPerKey(s beam.Scope, in beam.PCollection) beam.PCollection {
	return beam.ParDo(s, sumPerKey, beam.GroupByKey(s, in))
}
// WindowSums_GBK runs the WindowSums checks with sums computed by an explicit
// GroupByKey followed by the sumPerKey function (see gbkSumPerKey).
func WindowSums_GBK(s beam.Scope) {
	gbkScope := s.Scope("GBK")
	WindowSums(gbkScope, gbkSumPerKey)
}
// WindowSums_Lifted runs the WindowSums checks with sums computed by
// stats.SumPerKey instead of an explicit GroupByKey and summing function.
func WindowSums_Lifted(s beam.Scope) {
	liftedScope := s.Scope("Lifted")
	WindowSums(liftedScope, stats.SumPerKey)
}
// ValidateWindowedSideInputs checks that side inputs carry accurate windowing
// information when they are read.
//
// The main and side inputs are the same data: the values 1, 2 and 3, stamped
// 10ms before the 1s, 2s and 3s marks. sumSideInputs emits each main element
// plus the sum of the side-input values visible to it, so each case below pins
// down which side window a given main window maps to.
func ValidateWindowedSideInputs(s beam.Scope) {
	timestampedData := beam.ParDo(s, &createTimestampedData{Data: []int{1, 2, 3}}, beam.Impulse(s))
	timestampedData = beam.DropKey(s, timestampedData)
	windowSize := 1 * time.Second
	validateSums := func(s beam.Scope, wfn, sideFn *window.Fn, in, side beam.PCollection, expected ...any) {
		wData := beam.WindowInto(s, wfn, in)
		wSide := beam.WindowInto(s, sideFn, side)
		sums := beam.ParDo(s, sumSideInputs, wData, beam.SideInput{Input: wSide})
		// Return to the global window so passert can compare all sums together.
		sums = beam.WindowInto(s, window.NewGlobalWindows(), sums)
		passert.Equals(s, sums, expected...)
	}
	// Global side window: every main element sees the whole side input, 1+2+3 = 6.
	validateSums(s.Scope("Fixed-Global"), window.NewFixedWindows(windowSize), window.NewGlobalWindows(), timestampedData, timestampedData, 7, 8, 9)
	// Identical 1s windows: each main element sees only its own value.
	validateSums(s.Scope("Fixed-Same"), window.NewFixedWindows(windowSize), window.NewFixedWindows(windowSize), timestampedData, timestampedData, 2, 4, 6)
	// A single 10s side window holds all three values, just like the global case.
	validateSums(s.Scope("Fixed-Big"), window.NewFixedWindows(windowSize), window.NewFixedWindows(10*time.Second), timestampedData, timestampedData, 7, 8, 9)
	// Main: With window size 1, each window contains 1 element (1, 2, 3).
	// Side: Window size 2 with period 1, so each window covers 2 seconds of time.
	// Have [1], [1,2], [2,3], [3].
	// Each main input should map to the earliest occurring sliding window it maps to:
	// (1, [1]) = 2
	// (2, [1, 2]) = 5
	// (3, [2, 3]) = 8
	validateSums(s.Scope("Fixed-Sliding"), window.NewFixedWindows(windowSize), window.NewSlidingWindows(windowSize, 2*windowSize), timestampedData, timestampedData, 2, 5, 8)
	// Main: Window size 2 with period 1, so each window has up to two elements.
	// Have [1], [1,2], [2,3], [3].
	// Side: With window size 1, each window contains 1 element (1, 2, 3).
	// Each main window maps to the side window containing its latest timestamp:
	// ([1], 1) = 2
	// ([1, 2], 2) = 3, 4
	// ([2, 3], 3) = 5, 6
	// ([3], -) = 3
	validateSums(s.Scope("Sliding-Fixed"), window.NewSlidingWindows(windowSize, 2*windowSize), window.NewFixedWindows(windowSize), timestampedData, timestampedData, 2, 3, 4, 5, 6, 3)
}
// sumSideInputs emits a single value: input plus every integer produced by the
// side-input iterator.
func sumSideInputs(input int, iter func(*int) bool, emit func(int)) {
	total := input
	var side int
	for iter(&side) {
		total += side
	}
	emit(total)
}
// validateEquals windows in with wfn (applying opts such as triggers), sums the
// elements of each window with stats.Sum, moves the sums back into the global
// window, and asserts they equal expected.
func validateEquals(s beam.Scope, wfn *window.Fn, in beam.PCollection, opts []beam.WindowIntoOption, expected ...any) {
	sums := stats.Sum(s, beam.WindowInto(s, wfn, in, opts...))
	global := beam.WindowInto(s, window.NewGlobalWindows(), sums)
	passert.Equals(s, global, expected...)
}
// TriggerDefault checks the default trigger, which fires the pane after the end
// of the window. The values at 1s land in the first 10s window and those at 12s
// in the second, giving sums of 6 and 9.
func TriggerDefault(s beam.Scope) {
	cfg := teststream.NewConfig()
	cfg.AddElements(1000, 1.0, 2.0, 3.0)
	cfg.AdvanceWatermark(11000)
	cfg.AddElements(12000, 4.0, 5.0)
	cfg.AdvanceWatermark(13000)
	input := teststream.Create(s, cfg)

	opts := []beam.WindowIntoOption{beam.Trigger(trigger.Default())}
	validateEquals(s.Scope("Fixed"), window.NewFixedWindows(10*time.Second), input, opts, 6.0, 9.0)
}
// TriggerAlways checks the Always trigger. Every input value is expected to come
// out as its own output pane, so the sums are simply 1, 2 and 3.
func TriggerAlways(s beam.Scope) {
	cfg := teststream.NewConfig()
	cfg.AddElements(1000, 1.0, 2.0, 3.0)
	cfg.AdvanceWatermark(11000)
	input := teststream.Create(s, cfg)

	opts := []beam.WindowIntoOption{beam.Trigger(trigger.Always())}
	validateEquals(s.Scope("Fixed"), window.NewFixedWindows(10*time.Second), input, opts, 1.0, 2.0, 3.0)
}
// validateCount windows in with wfn (applying opts such as triggers), sums the
// elements of each window with stats.Sum, and asserts only how many sums are
// produced. Use it when the number of output panes is known but their contents
// or ordering are not.
func validateCount(s beam.Scope, wfn *window.Fn, in beam.PCollection, opts []beam.WindowIntoOption, expected int) {
	sums := stats.Sum(s, beam.WindowInto(s, wfn, in, opts...))
	global := beam.WindowInto(s, window.NewGlobalWindows(), sums)
	passert.Count(s, global, "total collections", expected)
}
// TriggerElementCount tests the ElementCount trigger (trigger.AfterCount), which
// waits for at least N elements to be ready before firing an output pane.
func TriggerElementCount(s beam.Scope) {
	con := teststream.NewConfig()
	con.AddElements(1000, 1.0, 2.0, 3.0)
	con.AdvanceWatermark(2000)
	con.AddElements(6000, 4.0, 5.0)
	con.AdvanceWatermark(10000)
	con.AddElements(52000, 10.0)
	con.AdvanceWatermark(53000)
	col := teststream.Create(s, con)
	windowSize := 10 * time.Second
	// On its own, AfterCount(2) fires once two elements have arrived and does not
	// fire again. To fire every two elements, wrap it in trigger.Repeat.
	validateCount(s.Scope("Fixed"), window.NewFixedWindows(windowSize), col,
		[]beam.WindowIntoOption{
			beam.Trigger(trigger.AfterCount(2)),
		}, 2)
}
// TriggerAfterProcessingTimeNotTriggered checks that an AfterProcessingTime
// trigger with a 5s delay does not fire early when processing time is advanced
// by less than that delay. All four values therefore end up in a single pane
// (1+2+3+4 = 10).
//
// Not yet supported by the flink runner:
// java.lang.UnsupportedOperationException: Advancing Processing time is not supported by the Flink Runner.
func TriggerAfterProcessingTimeNotTriggered(s beam.Scope) {
	cfg := teststream.NewConfig()
	cfg.AdvanceProcessingTime(100)
	cfg.AddElements(1000, 1.0, 2.0, 3.0)
	// Advance processing time, but stay just short of the trigger's 5s delay.
	cfg.AdvanceProcessingTime(4999)
	cfg.AddElements(22000, 4.0)
	input := teststream.Create(s, cfg)

	delayed := trigger.AfterProcessingTime().PlusDelay(5 * time.Second)
	opts := []beam.WindowIntoOption{beam.Trigger(delayed)}
	validateEquals(s.Scope("Global"), window.NewGlobalWindows(), input, opts, 10.0)
}
// TriggerAfterProcessingTime checks that an AfterProcessingTime trigger with a
// 5s delay fires once that much processing time has passed. The first three
// values are emitted together (sum 6) and the later value on its own (sum 4).
//
// Not yet supported by the flink runner:
// java.lang.UnsupportedOperationException: Advancing Processing time is not supported by the Flink Runner.
func TriggerAfterProcessingTime(s beam.Scope) {
	cfg := teststream.NewConfig()
	cfg.AdvanceProcessingTime(100)
	cfg.AddElements(1000, 1.0, 2.0, 3.0)
	// Advance processing time far enough to fire the trigger.
	cfg.AdvanceProcessingTime(5000)
	cfg.AddElements(22000, 4.0)
	input := teststream.Create(s, cfg)

	delayed := trigger.AfterProcessingTime().PlusDelay(5 * time.Second)
	opts := []beam.WindowIntoOption{beam.Trigger(delayed)}
	validateEquals(s.Scope("Global"), window.NewGlobalWindows(), input, opts, 6.0, 4.0)
}
// TriggerRepeat tests the Repeat trigger, which currently accepts exactly one
// subtrigger. Here Repeat(AfterCount(2)) is applied to six elements in the
// global window, so three output panes of two elements each are expected.
func TriggerRepeat(s beam.Scope) {
	// Create a teststream pipeline and get the PCollection.
	con := teststream.NewConfig()
	con.AddElements(1000, 1.0, 2.0, 3.0)
	con.AdvanceWatermark(2000)
	con.AddElements(6000, 4.0, 5.0, 6.0)
	con.AdvanceWatermark(10000)
	col := teststream.Create(s, con)
	validateCount(s.Scope("Global"), window.NewGlobalWindows(), col,
		[]beam.WindowIntoOption{
			beam.Trigger(trigger.Repeat(trigger.AfterCount(2))),
		}, 3)
}
// TriggerAfterEndOfWindow tests the AfterEndOfWindow trigger, with AfterCount(2)
// as the early-firing trigger and AfterCount(1) as the late-firing trigger.
// All three elements fall in the same 10s window. An early pane fires once two
// of them have arrived, and the remaining element is emitted when the watermark
// passes the end of the window (AdvanceWatermark(11000)). That gives two panes
// in total.
func TriggerAfterEndOfWindow(s beam.Scope) {
	con := teststream.NewConfig()
	con.AddElements(1000, 1.0, 2.0, 3.0)
	con.AdvanceWatermark(11000)
	col := teststream.Create(s, con)
	windowSize := 10 * time.Second
	// Named tr so the local does not shadow the imported trigger package.
	tr := trigger.AfterEndOfWindow().
		EarlyFiring(trigger.AfterCount(2)).
		LateFiring(trigger.AfterCount(1))
	validateCount(s.Scope("Fixed"), window.NewFixedWindows(windowSize), col,
		[]beam.WindowIntoOption{
			beam.Trigger(tr),
		}, 2)
}
// TriggerAfterAll tests the AfterAll trigger, which fires an output pane only
// when all of its subtriggers are ready. trigger.AfterCount(5) is not ready
// until five elements have been seen, so even though trigger.Always() is ready
// on every element, the combination fires only once: at the fifth element.
func TriggerAfterAll(s beam.Scope) {
	con := teststream.NewConfig()
	con.AddElements(1000, 1.0, 2.0, 3.0, 5.0, 8.0)
	con.AdvanceWatermark(11000)
	col := teststream.Create(s, con)
	// Named tr so the local does not shadow the imported trigger package.
	tr := trigger.Repeat(
		trigger.AfterAll(
			[]trigger.Trigger{
				trigger.Always(),
				trigger.AfterCount(5),
			},
		),
	)
	// Scope label matches the fixed windowing actually used.
	validateCount(s.Scope("Fixed"), window.NewFixedWindows(10*time.Second), col,
		[]beam.WindowIntoOption{
			beam.Trigger(tr),
		}, 1)
}
// TriggerAfterEach tests the AfterEach trigger, which fires an output pane as
// each subtrigger becomes ready, in the order the subtriggers are listed.
// trigger.AfterCount(3) is first, so the first pane fires after three elements.
// After that, trigger.Always() fires a pane for each of the elements 5.0 and
// 8.0, for three panes in total.
func TriggerAfterEach(s beam.Scope) {
	con := teststream.NewConfig()
	con.AddElements(1000, 1.0, 2.0, 3.0, 5.0, 8.0)
	con.AdvanceWatermark(11000)
	col := teststream.Create(s, con)
	// Named tr so the local does not shadow the imported trigger package.
	tr := trigger.Repeat(
		trigger.AfterEach(
			[]trigger.Trigger{
				trigger.AfterCount(3),
				trigger.Always(),
			},
		),
	)
	validateCount(s.Scope("Global"), window.NewGlobalWindows(), col,
		[]beam.WindowIntoOption{
			beam.Trigger(tr),
		}, 3)
}
// TriggerAfterAny tests the AfterAny trigger. trigger.Always() is ready on every
// element, so a pane is expected for every element regardless of the other
// subtrigger, trigger.AfterCount(3). That gives five panes for five elements.
func TriggerAfterAny(s beam.Scope) {
	con := teststream.NewConfig()
	con.AddElements(1000, 1.0, 2.0, 3.0)
	con.AdvanceWatermark(11000)
	con.AddElements(12000, 5.0, 8.0)
	col := teststream.Create(s, con)
	// Named tr so the local does not shadow the imported trigger package.
	tr := trigger.Repeat(
		trigger.AfterAny(
			[]trigger.Trigger{
				trigger.AfterCount(3),
				trigger.Always(),
			},
		),
	)
	windowSize := 10 * time.Second
	// Scope label matches the fixed windowing actually used.
	validateCount(s.Scope("Fixed"), window.NewFixedWindows(windowSize), col,
		[]beam.WindowIntoOption{
			beam.Trigger(tr),
		}, 5)
}
// TriggerAfterSynchronizedProcessingTime tests the AfterSynchronizedProcessingTime
// trigger. Because processing times are not synchronized in this test, it fires
// only at window expiration: one pane for each of the two fixed windows.
func TriggerAfterSynchronizedProcessingTime(s beam.Scope) {
	con := teststream.NewConfig()
	con.AddElements(1000, 1.0, 2.0, 3.0)
	con.AdvanceWatermark(11000)
	con.AddElements(12000, 5.0, 8.0)
	col := teststream.Create(s, con)
	// Named tr so the local does not shadow the imported trigger package.
	tr := trigger.Repeat(trigger.AfterSynchronizedProcessingTime())
	windowSize := 10 * time.Second
	// Scope label matches the fixed windowing actually used.
	validateCount(s.Scope("Fixed"), window.NewFixedWindows(windowSize), col,
		[]beam.WindowIntoOption{
			beam.Trigger(tr),
		}, 2)
}
// TriggerNever tests the Never trigger. It never fires early, so output appears
// only at window expiration: one pane for each of the two fixed windows.
func TriggerNever(s beam.Scope) {
	con := teststream.NewConfig()
	con.AddElements(1000, 1.0, 2.0, 3.0)
	con.AdvanceWatermark(11000)
	con.AddElements(12000, 5.0, 8.0)
	col := teststream.Create(s, con)
	// Named tr so the local does not shadow the imported trigger package.
	tr := trigger.Never()
	windowSize := 10 * time.Second
	// Scope label matches the fixed windowing actually used.
	validateCount(s.Scope("Fixed"), window.NewFixedWindows(windowSize), col,
		[]beam.WindowIntoOption{
			beam.Trigger(tr),
		}, 2)
}
// TriggerOrFinally tests the OrFinally trigger. The main trigger,
// trigger.Always(), is ready on every element. The finally trigger,
// trigger.AfterCount(2), ends triggering for a window once two elements have
// arrived. Each of the two fixed windows is therefore expected to produce two
// output panes, four in total.
func TriggerOrFinally(s beam.Scope) {
	con := teststream.NewConfig()
	con.AddElements(1000, 1.0, 2.0, 3.0)
	con.AdvanceWatermark(11000)
	con.AddElements(12000, 5.0, 8.0)
	col := teststream.Create(s, con)
	// Named tr so the local does not shadow the imported trigger package.
	tr := trigger.OrFinally(trigger.Always(), trigger.AfterCount(2))
	windowSize := 10 * time.Second
	// Scope label matches the fixed windowing actually used.
	validateCount(s.Scope("Fixed"), window.NewFixedWindows(windowSize), col,
		[]beam.WindowIntoOption{
			beam.Trigger(tr),
		}, 4)
}