blob: 31c0586b2edb255f343a6509fbde4b16cee05c59 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package primitives
import (
"context"
"fmt"
"strconv"
"time"
"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/state"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/timers"
"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/teststream"
"github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/periodic"
)
// Based on https://github.com/apache/beam/blob/master/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableTimersExecutionTest.java
func init() {
register.DoFn2x0[[]byte, func(string, int)](&inputFn[string, int]{})
register.DoFn6x0[beam.Window, state.Provider, timers.Provider, string, int, func(kv[string, int])](&eventTimeFn{})
register.DoFn5x0[beam.Window, timers.Provider, string, int, func(int)](&eventTimeFnWithOutputTimestamp{})
register.DoFn3x0[beam.EventTime, int, func(int)](&checkTimestampFn{})
register.Emitter2[string, int]()
register.Emitter1[int]()
}
// checkTimestampFn validates that elements arrived at the expected timestamp.
type checkTimestampFn struct {
Timestamp int64 // millisecond epoch
ExpectMaxTimestamp bool
}
func (fn *checkTimestampFn) ProcessElement(ts beam.EventTime, val int, emit func(int)) {
if fn.ExpectMaxTimestamp {
if mtime.Time(ts) != mtime.MaxTimestamp {
panic(fmt.Errorf("timestamp mismatch: got %v, want %v (MaxTimestamp)", ts, mtime.MaxTimestamp))
}
} else {
if got := int64(ts); got != int64(mtime.FromMilliseconds(fn.Timestamp)) {
panic(fmt.Errorf("timestamp mismatch: got %v, want %v (as mtime)", got, fn.Timestamp))
}
}
emit(val)
}
type kv[K, V any] struct {
Key K
Value V
}
func kvfn[K, V any](k K, v V) kv[K, V] {
return kv[K, V]{k, v}
}
type inputFn[K, V any] struct {
Inputs []kv[K, V]
}
func (fn *inputFn[K, V]) ProcessElement(_ []byte, emit func(K, V)) {
for _, in := range fn.Inputs {
emit(in.Key, in.Value)
}
}
type eventTimeFn struct {
Callback timers.EventTime
MyKey state.Value[string]
Offset int
TimerOutput int
}
func (fn *eventTimeFn) ProcessElement(w beam.Window, sp state.Provider, tp timers.Provider, key string, value int, emit func(kv[string, int])) {
fn.Callback.Set(tp, w.MaxTimestamp().ToTime())
fn.MyKey.Write(sp, key)
emit(kvfn(key, value+fn.Offset))
}
func (fn *eventTimeFn) OnTimer(ctx context.Context, ts beam.EventTime, sp state.Provider, tp timers.Provider, key string, timer timers.Context, emit func(kv[string, int])) {
switch timer.Family {
case fn.Callback.Family:
switch timer.Tag {
case "":
read, ok, err := fn.MyKey.Read(sp)
if err != nil {
panic(err)
}
if !ok {
panic("State must be set for key: " + key)
}
emit(kvfn(read, fn.TimerOutput))
default:
panic("unexpected timer tag: " + timer.Family + " tag:" + timer.Tag + " want: \"\", for key:" + key)
}
default:
if fn.Callback.Family != timer.Family || timer.Tag != "" {
panic("unexpected timer family: " + timer.Family + " tag:" + timer.Tag + " want: " + fn.Callback.Family + ", for key:" + key)
}
}
}
// timersEventTimePipelineBuilder takes in an impulse transform and produces a pipeline construction
// function that validates EventTime timers.
//
// The impulse is provided outside to swap between a bounded impulse, and
// an unbounded one, because the Go SDK uses that to determine if a pipeline
// is "streaming" or not. This matters at least for executions on Dataflow.
//
// The test produces some number of key value elements, with various event time offsets,
// expecting that a single timer event is what fires. The test validates that all the
// input elements, and the timer elements have been emitted.
func timersEventTimePipelineBuilder(makeImp func(s beam.Scope) beam.PCollection) func(s beam.Scope) {
return func(s beam.Scope) {
var inputs, wantOutputs []kv[string, int]
offset := 5000
timerOutput := 4093
numKeys := 50
numDuplicateTimers := 15
for key := 0; key < numKeys; key++ {
k := strconv.Itoa(key)
wantOutputs = append(wantOutputs, kvfn(k, timerOutput))
for i := 0; i < numDuplicateTimers; i++ {
inputs = append(inputs, kvfn(k, i))
wantOutputs = append(wantOutputs, kvfn(k, i+offset))
}
}
imp := makeImp(s)
keyed := beam.ParDo(s, &inputFn[string, int]{
Inputs: inputs,
}, imp)
times := beam.ParDo(s, &eventTimeFn{
Offset: offset,
TimerOutput: timerOutput,
Callback: timers.InEventTime("Callback"),
MyKey: state.MakeValueState[string]("MyKey"),
}, keyed)
passert.EqualsList(s, times, wantOutputs)
}
}
// TimersEventTimeBounded validates event time timers in a bounded pipeline.
func TimersEventTimeBounded(s beam.Scope) {
timersEventTimePipelineBuilder(beam.Impulse)(s)
}
// TimersEventTimeUnbounded validates event time timers in an unbounded pipeline.
func TimersEventTimeUnbounded(s beam.Scope) {
timersEventTimePipelineBuilder(func(s beam.Scope) beam.PCollection {
now := time.Now()
return periodic.Impulse(s, now, now.Add(10*time.Second), 0, false)
})(s)
}
type eventTimeFnWithOutputTimestamp struct {
Callback timers.EventTime
Offset int
TimerOutput int
OutputTimestamp int64 // millisecond epoch
NoOutputTimestamp bool
}
func (fn *eventTimeFnWithOutputTimestamp) ProcessElement(w beam.Window, tp timers.Provider, key string, value int, emit func(int)) {
if fn.NoOutputTimestamp {
fn.Callback.Set(tp, w.MaxTimestamp().ToTime(), timers.WithNoOutputTimestamp())
} else {
fn.Callback.Set(tp, w.MaxTimestamp().ToTime(), timers.WithOutputTimestamp(time.UnixMilli(fn.OutputTimestamp)))
}
}
func (fn *eventTimeFnWithOutputTimestamp) OnTimer(ctx context.Context, ts beam.EventTime, tp timers.Provider, key string, timer timers.Context, emit func(int)) {
if fn.Callback.Family != timer.Family || timer.Tag != "" {
panic("unexpected timer, family: " + timer.Family + " tag:" + timer.Tag + " want: " + fn.Callback.Family + ", for key:" + key)
}
emit(fn.TimerOutput)
}
// timersEventTimePipelineBuilderWithOutputTimestamp validates EventTime timers with explicit output timestamp.
func timersEventTimePipelineBuilderWithOutputTimestamp(makeImp func(s beam.Scope) beam.PCollection) func(s beam.Scope) {
return func(s beam.Scope) {
var inputs []kv[string, int]
offset := 5000
timerOutput := 4093
outputTimestamp := int64(1234567890000)
inputs = append(inputs, kvfn("key", 0))
imp := makeImp(s)
keyed := beam.ParDo(s, &inputFn[string, int]{
Inputs: inputs,
}, imp)
times := beam.ParDo(s, &eventTimeFnWithOutputTimestamp{
Offset: offset,
TimerOutput: timerOutput,
OutputTimestamp: outputTimestamp,
Callback: timers.InEventTime("Callback"),
}, keyed)
// Check that the output element has the expected timestamp.
validatedTimestamps := beam.ParDo(s, &checkTimestampFn{Timestamp: outputTimestamp}, times)
wantOutputs := []int{timerOutput}
passert.EqualsList(s, validatedTimestamps, wantOutputs)
}
}
// timersEventTimePipelineBuilderWithNoOutputTimestamp validates EventTime timers with no output timestamp.
func timersEventTimePipelineBuilderWithNoOutputTimestamp(makeImp func(s beam.Scope) beam.PCollection) func(s beam.Scope) {
return func(s beam.Scope) {
var inputs []kv[string, int]
offset := 5000
timerOutput := 4093
inputs = append(inputs, kvfn("key", 0))
imp := makeImp(s)
keyed := beam.ParDo(s, &inputFn[string, int]{
Inputs: inputs,
}, imp)
times := beam.ParDo(s, &eventTimeFnWithOutputTimestamp{
Offset: offset,
TimerOutput: timerOutput,
NoOutputTimestamp: true,
Callback: timers.InEventTime("Callback"),
}, keyed)
// Check that the output element has MaxTimestamp.
validatedTimestamps := beam.ParDo(s, &checkTimestampFn{ExpectMaxTimestamp: true}, times)
wantOutputs := []int{timerOutput}
passert.EqualsList(s, validatedTimestamps, wantOutputs)
}
}
// TimersEventTime_WithOutputTimestamp validates event time timers with explicit output timestamp.
func TimersEventTime_WithOutputTimestamp(s beam.Scope) {
timersEventTimePipelineBuilderWithOutputTimestamp(beam.Impulse)(s)
}
// TimersEventTime_WithNoOutputTimestamp validates event time timers with no output timestamp.
func TimersEventTime_WithNoOutputTimestamp(s beam.Scope) {
timersEventTimePipelineBuilderWithNoOutputTimestamp(beam.Impulse)(s)
}
// Below here are tests for ProcessingTime timers.
func init() {
register.DoFn2x0[[]byte, func(string, int)](&inputFn[string, int]{})
register.DoFn5x0[state.Provider, timers.Provider, string, int, func(string, int)](&processingTimeFn{})
}
type processingTimeFn struct {
Callback timers.ProcessingTime
MyValue state.Value[int]
Emissions state.Value[int]
Offset int
TimerOutput int
Cap int
InitialDelaySec int
RecurringDelaySec int
}
func (fn *processingTimeFn) ProcessElement(sp state.Provider, tp timers.Provider, key string, value int, emit func(string, int)) {
// Sets a processing time callback to occur.
fn.Callback.Set(tp, time.Now().Add(time.Duration(fn.InitialDelaySec)*time.Second))
// Only write to the state if we haven't done so already.
// Writing blind would reset the state, and cause duplicated outputs.
_, ok, err := fn.MyValue.Read(sp)
if err != nil {
panic(err)
}
if !ok {
if err := fn.MyValue.Write(sp, 0); err != nil {
panic(err)
}
}
}
func (fn *processingTimeFn) OnTimer(ctx context.Context, ts beam.EventTime, sp state.Provider, tp timers.Provider, key string, timer timers.Context, emit func(string, int)) {
switch timer.Family {
case fn.Callback.Family:
switch timer.Tag {
case "":
read, ok, err := fn.MyValue.Read(sp)
if err != nil {
panic(err)
}
if !ok {
panic("State must be set for key: " + key)
}
emit(key, read)
if read < fn.Cap-1 {
if err := fn.MyValue.Write(sp, read+1); err != nil {
panic(err)
}
fn.Callback.Set(tp, time.Now().Add(time.Duration(fn.RecurringDelaySec)*time.Second))
}
if num, _, err := fn.Emissions.Read(sp); err != nil {
panic(err)
} else if num == fn.Cap {
panic(fmt.Sprintf("cap reached! This shouldn't be possible. key %v, num: %v, cap %v read %v", key, num, fn.Cap, read))
} else {
if err := fn.Emissions.Write(sp, num+1); err != nil {
panic(err)
}
}
default:
panic("unexpected timer tag: " + timer.Family + " tag:" + timer.Tag + " want: \"\", for key:" + key)
}
default:
if fn.Callback.Family != timer.Family || timer.Tag != "" {
panic("unexpected timer family: " + timer.Family + " tag:" + timer.Tag + " want: " + fn.Callback.Family + ", for key:" + key)
}
}
}
func regroup(key string, vs func(*int) bool, emit func(kv[string, int])) {
var v int
for vs(&v) {
emit(kvfn(key, v))
}
}
func init() {
register.Function3x0(regroup)
}
// timersProcessingTimePipelineBuilder constructs a pipeline to validate the behavior of processing time timers.
// It generates a set of keyed elements and uses a DoFn (`processingTimeFn`) to set an initial processing time
// timer for each key. When a timer fires, the DoFn emits an element, increments a counter in state, and
// sets a new timer to fire after a recurring delay, continuing until a specified number of emissions for that
// key is reached.
//
// The total approximate runtime of the timer-based logic for each key is calculated as:
// InitialDelay + (numDuplicateTimers - 1) * RecurringDelay.
// Note that the number of keys is irrelevant to the runtime, because keys are processed in parallel.
func timersProcessingTimePipelineBuilder(makeImp func(s beam.Scope) beam.PCollection) func(s beam.Scope) {
return func(s beam.Scope) {
var inputs, wantOutputs []kv[string, int]
offset := 5000
timerOutput := 4093
// Control the total runtime of the test to under 30 secs.
// The runtime for the current setting is 3 + (5 - 1) * 1 = 7 secs
numKeys := 40
numDuplicateTimers := 5
initialDelaySec := 3
recurringDelaySec := 1
for key := 0; key < numKeys; key++ {
k := strconv.Itoa(key)
for i := 0; i < numDuplicateTimers; i++ {
inputs = append(inputs, kvfn(k, i))
wantOutputs = append(wantOutputs, kvfn(k, i))
}
}
imp := makeImp(s)
keyed := beam.ParDo(s, &inputFn[string, int]{
Inputs: inputs,
}, imp)
times := beam.ParDo(s, &processingTimeFn{
Offset: offset,
TimerOutput: timerOutput,
Callback: timers.InProcessingTime("Callback"),
MyValue: state.MakeValueState[int]("MyValue"),
Cap: numDuplicateTimers, // Syncs the cycles to the number of duplicate keyed inputs.
InitialDelaySec: initialDelaySec,
RecurringDelaySec: recurringDelaySec,
}, keyed)
// We GroupByKey here so input to passert is blocked until teststream advances time to Infinity.
gbk := beam.GroupByKey(s, times)
regrouped := beam.ParDo(s, regroup, gbk)
passert.EqualsList(s, regrouped, wantOutputs)
}
}
// TimersProcessingTimeTestStream_Infinity validates processing time timers in a bounded pipeline
// kicked along by TestStream.
func TimersProcessingTimeTestStream_Infinity(s beam.Scope) {
timersProcessingTimePipelineBuilder(func(s beam.Scope) beam.PCollection {
c := teststream.NewConfig()
c.AddElements(123456, []byte{42})
c.AdvanceWatermarkToInfinity()
c.AdvanceProcessingTime(int64(mtime.FromDuration(10 * time.Second)))
c.AdvanceProcessingTime(int64(mtime.FromDuration(10 * time.Second)))
c.AdvanceProcessingTime(int64(mtime.FromDuration(10 * time.Second)))
c.AdvanceProcessingTimeToInfinity()
return teststream.Create(s, c)
})(s)
}
// TimersProcessingTimeBounded validates processing time timers in a bounded pipeline.
func TimersProcessingTime_Bounded(s beam.Scope) {
timersProcessingTimePipelineBuilder(beam.Impulse)(s)
}
// TimersProcessingTimeBounded validates processing time timers in an unbounded pipeline.
func TimersProcessingTime_Unbounded(s beam.Scope) {
timersProcessingTimePipelineBuilder(func(s beam.Scope) beam.PCollection {
now := time.Now()
return periodic.Impulse(s, now, now.Add(10*time.Second), 5*time.Second, false)
})(s)
}