| // Licensed to the Apache Software Foundation (ASF) under one or more |
| // contributor license agreements. See the NOTICE file distributed with |
| // this work for additional information regarding copyright ownership. |
| // The ASF licenses this file to You under the Apache License, Version 2.0 |
| // (the "License"); you may not use this file except in compliance with |
| // the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package primitives |
| |
| import ( |
| "reflect" |
| "time" |
| |
| "github.com/apache/beam/sdks/v2/go/pkg/beam" |
| "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime" |
| "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window" |
| "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert" |
| "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/teststream" |
| "github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/stats" |
| ) |
| |
| func init() { |
| beam.RegisterFunction(sumPerKey) |
| beam.RegisterType(reflect.TypeOf((*createTimestampedData)(nil)).Elem()) |
| } |
| |
| // createTimestampedData produces data timestamped with the ordinal. |
| type createTimestampedData struct { |
| Data []int |
| } |
| |
| func (f *createTimestampedData) ProcessElement(_ []byte, emit func(beam.EventTime, string, int)) { |
| for i, v := range f.Data { |
| timestamp := mtime.FromMilliseconds(int64((i + 1) * 1000)).Subtract(10 * time.Millisecond) |
| emit(timestamp, "magic", v) |
| } |
| } |
| |
| // WindowSums produces a pipeline that generates the numbers of a 3x3 magic square, and |
| // configures the pipeline so that PCollection. Sum is a closure to handle summing data over the window, in a few conditions. |
| func WindowSums(s beam.Scope, sumPerKey func(beam.Scope, beam.PCollection) beam.PCollection) { |
| timestampedData := beam.ParDo(s, &createTimestampedData{Data: []int{4, 9, 2, 3, 5, 7, 8, 1, 6}}, beam.Impulse(s)) |
| |
| windowSize := 3 * time.Second |
| |
| validate := func(s beam.Scope, wfn *window.Fn, in beam.PCollection, expected ...interface{}) { |
| // Window the data. |
| windowed := beam.WindowInto(s, wfn, in) |
| // Perform the appropriate sum operation. |
| sums := sumPerKey(s, windowed) |
| // Drop back to Global windows, and drop the key otherwise passert.Equals doesn't work. |
| sums = beam.WindowInto(s, window.NewGlobalWindows(), sums) |
| sums = beam.DropKey(s, sums) |
| passert.Equals(s, sums, expected...) |
| } |
| |
| // Use fixed windows to divide the data into 3 chunks. |
| validate(s.Scope("Fixed"), window.NewFixedWindows(windowSize), timestampedData, 15, 15, 15) |
| // This should be identical to the "fixed" windows. |
| validate(s.Scope("SlidingFixed"), window.NewSlidingWindows(windowSize, windowSize), timestampedData, 15, 15, 15) |
| // This will have overlap, but each value should be a multiple of the magic number. |
| validate(s.Scope("Sliding"), window.NewSlidingWindows(windowSize, 3*windowSize), timestampedData, 15, 30, 45, 30, 15) |
| // With such a large gap, there should be a single session which will sum to 45. |
| validate(s.Scope("Session"), window.NewSessions(windowSize), timestampedData, 45) |
| } |
| |
| func sumPerKey(ws beam.Window, ts beam.EventTime, key beam.U, iter func(*int) bool) (beam.U, int) { |
| var v, sum int |
| for iter(&v) { |
| sum += v |
| } |
| return key, sum |
| } |
| |
| func gbkSumPerKey(s beam.Scope, in beam.PCollection) beam.PCollection { |
| grouped := beam.GroupByKey(s, in) |
| return beam.ParDo(s, sumPerKey, grouped) |
| } |
| |
| func WindowSums_GBK(s beam.Scope) { |
| WindowSums(s.Scope("GBK"), gbkSumPerKey) |
| } |
| |
| func WindowSums_Lifted(s beam.Scope) { |
| WindowSums(s.Scope("Lifted"), stats.SumPerKey) |
| } |
| |
| func validateEquals(s beam.Scope, wfn *window.Fn, in beam.PCollection, tr window.Trigger, m beam.AccumulationMode, expected ...interface{}) { |
| windowed := beam.WindowInto(s, wfn, in, beam.Trigger(tr), m) |
| sums := stats.Sum(s, windowed) |
| sums = beam.WindowInto(s, window.NewGlobalWindows(), sums) |
| passert.Equals(s, sums, expected...) |
| } |
| |
| // TriggerDefault tests the default trigger which fires the pane after the end of the window |
| func TriggerDefault(s beam.Scope) { |
| con := teststream.NewConfig() |
| con.AddElements(1000, 1.0, 2.0, 3.0) |
| con.AdvanceWatermark(11000) |
| con.AddElements(12000, 4.0, 5.0) |
| con.AdvanceWatermark(13000) |
| |
| col := teststream.Create(s, con) |
| windowSize := 10 * time.Second |
| validateEquals(s.Scope("Fixed"), window.NewFixedWindows(windowSize), col, window.TriggerDefault(), beam.PanesDiscard(), 6.0, 9.0) |
| } |
| |
| // TriggerAlways tests the Always trigger, it is expected to receive every input value as the output. |
| func TriggerAlways(s beam.Scope) { |
| con := teststream.NewConfig() |
| con.AddElements(1000, 1.0, 2.0, 3.0) |
| con.AdvanceWatermark(11000) |
| col := teststream.Create(s, con) |
| windowSize := 10 * time.Second |
| |
| validateEquals(s.Scope("Fixed"), window.NewFixedWindows(windowSize), col, window.TriggerAlways(), beam.PanesDiscard(), 1.0, 2.0, 3.0) |
| } |
| |
| func validateCount(s beam.Scope, wfn *window.Fn, in beam.PCollection, tr window.Trigger, m beam.AccumulationMode, expected int) { |
| windowed := beam.WindowInto(s, wfn, in, beam.Trigger(tr), m) |
| sums := stats.Sum(s, windowed) |
| sums = beam.WindowInto(s, window.NewGlobalWindows(), sums) |
| passert.Count(s, sums, "total collections", expected) |
| } |
| |
| // TriggerElementCount tests the ElementCount Trigger, it waits for atleast N elements to be ready |
| // to fire an output pane |
| func TriggerElementCount(s beam.Scope) { |
| con := teststream.NewConfig() |
| con.AddElements(1000, 1.0, 2.0, 3.0) |
| con.AdvanceWatermark(2000) |
| con.AddElements(6000, 4.0, 5.0) |
| con.AdvanceWatermark(10000) |
| con.AddElements(52000, 10.0) |
| con.AdvanceWatermark(53000) |
| |
| col := teststream.Create(s, con) |
| windowSize := 10 * time.Second |
| |
| // waits only for two elements to arrive and fires output after that and never fires that. |
| // For the trigger to fire every 2 elements, combine it with Repeat Trigger |
| validateCount(s.Scope("Fixed"), window.NewFixedWindows(windowSize), col, window.TriggerAfterCount(2), beam.PanesDiscard(), 2) |
| } |
| |
| // TriggerAfterProcessingTime tests the AfterProcessingTime Trigger, it fires output panes once 't' processing time has passed |
| // Not yet supported by the flink runner: |
| // java.lang.UnsupportedOperationException: Advancing Processing time is not supported by the Flink Runner. |
| func TriggerAfterProcessingTime(s beam.Scope) { |
| con := teststream.NewConfig() |
| con.AdvanceProcessingTime(100) |
| con.AddElements(1000, 1.0, 2.0, 3.0) |
| con.AdvanceProcessingTime(2000) |
| con.AddElements(22000, 4.0) |
| |
| col := teststream.Create(s, con) |
| |
| validateEquals(s.Scope("Global"), window.NewGlobalWindows(), col, window.TriggerAfterProcessingTime(5000), beam.PanesDiscard(), 6.0) |
| } |
| |
| // TriggerRepeat tests the repeat trigger. As of now is it is configure to take only one trigger as a subtrigger. |
| // In the below test, it is expected to receive three output panes with two elements each. |
| func TriggerRepeat(s beam.Scope) { |
| // create a teststream pipeline and get the pcollection |
| con := teststream.NewConfig() |
| con.AddElements(1000, 1.0, 2.0, 3.0) |
| con.AdvanceWatermark(2000) |
| con.AddElements(6000, 4.0, 5.0, 6.0) |
| con.AdvanceWatermark(10000) |
| |
| col := teststream.Create(s, con) |
| |
| validateCount(s.Scope("Global"), window.NewGlobalWindows(), col, window.TriggerRepeat(window.TriggerAfterCount(2)), beam.PanesDiscard(), 3) |
| } |
| |
| // TriggerAfterEndOfWindow tests the AfterEndOfWindow Trigger. With AfterCount(2) as the early firing trigger and AfterCount(1) as late firing trigger. |
| // It fires two times, one with early firing when there are two elements while the third elements waits in. This third element is fired in the late firing. |
| func TriggerAfterEndOfWindow(s beam.Scope) { |
| con := teststream.NewConfig() |
| con.AddElements(1000, 1.0, 2.0, 3.0) |
| con.AdvanceWatermark(11000) |
| |
| col := teststream.Create(s, con) |
| windowSize := 10 * time.Second |
| trigger := window.TriggerAfterEndOfWindow().EarlyFiring(window.TriggerAfterCount(2)).LateFiring(window.TriggerAfterCount(1)) |
| |
| validateCount(s.Scope("Fixed"), window.NewFixedWindows(windowSize), col, trigger, beam.PanesDiscard(), 2) |
| } |