| // Licensed to the Apache Software Foundation (ASF) under one or more |
| // contributor license agreements. See the NOTICE file distributed with |
| // this work for additional information regarding copyright ownership. |
| // The ASF licenses this file to You under the Apache License, Version 2.0 |
| // (the "License"); you may not use this file except in compliance with |
| // the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package snippets |
| |
| import ( |
| "fmt" |
| "math" |
| "reflect" |
| "sort" |
| "strings" |
| "time" |
| |
| "github.com/apache/beam/sdks/v2/go/pkg/beam" |
| "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window" |
| "github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf" |
| "github.com/apache/beam/sdks/v2/go/pkg/beam/core/state" |
| "github.com/apache/beam/sdks/v2/go/pkg/beam/core/timers" |
| "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex" |
| "github.com/apache/beam/sdks/v2/go/pkg/beam/io/rtrackers/offsetrange" |
| "github.com/apache/beam/sdks/v2/go/pkg/beam/register" |
| "github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/stats" |
| ) |
| |
| // [START model_pardo_pardo] |
| |
// ComputeWordLengthFn is a DoFn that maps each word of the input
// PCollection to its length in bytes.
type ComputeWordLengthFn struct{}

// ProcessElement is invoked once per element and emits len(word).
func (fn *ComputeWordLengthFn) ProcessElement(word string, emit func(int)) {
	length := len(word)
	emit(length)
}
| |
| // DoFns must be registered with beam. |
| func init() { |
| beam.RegisterType(reflect.TypeOf((*ComputeWordLengthFn)(nil))) |
| // 2 inputs and 0 outputs => DoFn2x0 |
| // 1 input => Emitter1 |
| // Input/output types are included in order in the brackets |
| register.DoFn2x0[string, func(int)](&ComputeWordLengthFn{}) |
| register.Emitter1[int]() |
| } |
| |
| // [END model_pardo_pardo] |
| |
| // applyWordLen applies ComputeWordLengthFn to words, which must be |
| // a PCollection<string> |
| func applyWordLen(s beam.Scope, words beam.PCollection) beam.PCollection { |
| // [START model_pardo_apply] |
| wordLengths := beam.ParDo(s, &ComputeWordLengthFn{}, words) |
| // [END model_pardo_apply] |
| return wordLengths |
| } |
| |
| // [START model_pardo_apply_anon] |
| |
// wordLengths returns the length of word in bytes.
func wordLengths(word string) int {
	return len(word)
}
| func init() { register.Function1x1(wordLengths) } |
| |
| func applyWordLenAnon(s beam.Scope, words beam.PCollection) beam.PCollection { |
| return beam.ParDo(s, wordLengths, words) |
| } |
| |
| // [END model_pardo_apply_anon] |
| |
| func applyGbk(s beam.Scope, input []stringPair) beam.PCollection { |
| // [START groupbykey] |
| // CreateAndSplit creates and returns a PCollection with <K,V> |
| // from an input slice of stringPair (struct with K, V string fields). |
| pairs := CreateAndSplit(s, input) |
| keyed := beam.GroupByKey(s, pairs) |
| // [END groupbykey] |
| return keyed |
| } |
| |
| // [START cogroupbykey_input_helpers] |
| |
// stringPair is a key/value pair of strings.
type stringPair struct {
	K string
	V string
}

// splitStringPair unpacks a stringPair into its key and value.
func splitStringPair(e stringPair) (string, string) {
	key, value := e.K, e.V
	return key, value
}
| |
| func init() { |
| // Register DoFn. |
| register.Function1x2(splitStringPair) |
| } |
| |
| // CreateAndSplit is a helper function that creates |
| func CreateAndSplit(s beam.Scope, input []stringPair) beam.PCollection { |
| initial := beam.CreateList(s, input) |
| return beam.ParDo(s, splitStringPair, initial) |
| } |
| |
| // [END cogroupbykey_input_helpers] |
| |
type (
	// splittableDoFn carries the bundle finalization and restriction
	// truncation example methods.
	splittableDoFn struct{}

	// weDoFn carries the custom watermark estimation example methods.
	weDoFn struct{}
)
| |
| // [START bundlefinalization_simplecallback] |
| |
| func (fn *splittableDoFn) ProcessElement(bf beam.BundleFinalization, rt *sdf.LockRTracker, element string) { |
| // ... produce output ... |
| |
| bf.RegisterCallback(5*time.Minute, func() error { |
| // ... perform a side effect ... |
| |
| return nil |
| }) |
| } |
| |
| // [END bundlefinalization_simplecallback] |
| |
| // [START watermarkestimation_customestimator] |
| |
// WatermarkState is a custom type.
//
// It is optional to write your own state type when making a custom estimator.
type WatermarkState struct {
	Watermark time.Time
}

// CustomWatermarkEstimator is a custom watermark estimator.
// You may use any type here, including some of Beam's built in watermark estimator types,
// e.g. sdf.WallTimeWatermarkEstimator, sdf.TimestampObservingWatermarkEstimator, and sdf.ManualWatermarkEstimator
type CustomWatermarkEstimator struct {
	state WatermarkState
}

// CurrentWatermark returns the current watermark and is invoked on DoFn splits and self-checkpoints.
// Watermark estimators must implement CurrentWatermark() time.Time
func (e *CustomWatermarkEstimator) CurrentWatermark() time.Time {
	current := e.state.Watermark
	return current
}

// ObserveTimestamp is called on the output timestamps of all
// emitted elements to update the watermark. It is optional.
// This estimator simply adopts the most recently observed timestamp.
func (e *CustomWatermarkEstimator) ObserveTimestamp(ts time.Time) {
	e.state = WatermarkState{Watermark: ts}
}
| |
| // InitialWatermarkEstimatorState defines an initial state used to initialize the watermark |
| // estimator. It is optional. If this is not defined, WatermarkEstimatorState may not be |
| // defined and CreateWatermarkEstimator must not take in parameters. |
| func (fn *weDoFn) InitialWatermarkEstimatorState(et beam.EventTime, rest offsetrange.Restriction, element string) WatermarkState { |
| // Return some watermark state |
| return WatermarkState{Watermark: time.Now()} |
| } |
| |
| // CreateWatermarkEstimator creates the watermark estimator used by this Splittable DoFn. |
| // Must take in a state parameter if InitialWatermarkEstimatorState is defined, otherwise takes no parameters. |
| func (fn *weDoFn) CreateWatermarkEstimator(initialState WatermarkState) *CustomWatermarkEstimator { |
| return &CustomWatermarkEstimator{state: initialState} |
| } |
| |
| // WatermarkEstimatorState returns the state used to resume future watermark estimation |
| // after a checkpoint/split. It is required if InitialWatermarkEstimatorState is defined, |
| // otherwise it must not be defined. |
| func (fn *weDoFn) WatermarkEstimatorState(e *CustomWatermarkEstimator) WatermarkState { |
| return e.state |
| } |
| |
| // ProcessElement is the method to execute for each element. |
| // It can optionally take in a watermark estimator. |
| func (fn *weDoFn) ProcessElement(e *CustomWatermarkEstimator, element string) { |
| // ... |
| e.state.Watermark = time.Now() |
| } |
| |
| // [END watermarkestimation_customestimator] |
| |
| // [START sdf_truncate] |
| |
| // TruncateRestriction is a transform that is triggered when pipeline starts to drain. It helps to finish a |
| // pipeline quicker by truncating the restriction. |
| func (fn *splittableDoFn) TruncateRestriction(rt *sdf.LockRTracker, element string) offsetrange.Restriction { |
| start := rt.GetRestriction().(offsetrange.Restriction).Start |
| prevEnd := rt.GetRestriction().(offsetrange.Restriction).End |
| // truncate the restriction by half. |
| newEnd := prevEnd / 2 |
| return offsetrange.Restriction{ |
| Start: start, |
| End: newEnd, |
| } |
| } |
| |
| // [END sdf_truncate] |
| |
| // [START cogroupbykey_output_helpers] |
| |
| func formatCoGBKResults(key string, emailIter, phoneIter func(*string) bool) string { |
| var s string |
| var emails, phones []string |
| for emailIter(&s) { |
| emails = append(emails, s) |
| } |
| for phoneIter(&s) { |
| phones = append(phones, s) |
| } |
| // Values have no guaranteed order, sort for deterministic output. |
| sort.Strings(emails) |
| sort.Strings(phones) |
| return fmt.Sprintf("%s; %s; %s", key, formatStringIter(emails), formatStringIter(phones)) |
| } |
| |
| func init() { |
| register.Function3x1(formatCoGBKResults) |
| // 1 input of type string => Iter1[string] |
| register.Iter1[string]() |
| } |
| |
| // [END cogroupbykey_output_helpers] |
| |
// formatStringIter renders vs as a bracketed, comma-separated list with each
// value wrapped in single quotes, e.g. ['a', 'b']. An empty slice yields [].
func formatStringIter(vs []string) string {
	quoted := make([]string, len(vs))
	for i, v := range vs {
		quoted[i] = "'" + v + "'"
	}
	return "[" + strings.Join(quoted, ", ") + "]"
}
| |
| func coGBKExample(s beam.Scope) beam.PCollection { |
| // [START cogroupbykey_inputs] |
| var emailSlice = []stringPair{ |
| {"amy", "amy@example.com"}, |
| {"carl", "carl@example.com"}, |
| {"julia", "julia@example.com"}, |
| {"carl", "carl@email.com"}, |
| } |
| |
| var phoneSlice = []stringPair{ |
| {"amy", "111-222-3333"}, |
| {"james", "222-333-4444"}, |
| {"amy", "333-444-5555"}, |
| {"carl", "444-555-6666"}, |
| } |
| emails := CreateAndSplit(s.Scope("CreateEmails"), emailSlice) |
| phones := CreateAndSplit(s.Scope("CreatePhones"), phoneSlice) |
| // [END cogroupbykey_inputs] |
| |
| // [START cogroupbykey_outputs] |
| results := beam.CoGroupByKey(s, emails, phones) |
| |
| contactLines := beam.ParDo(s, formatCoGBKResults, results) |
| // [END cogroupbykey_outputs] |
| |
| return contactLines |
| } |
| |
| // [START combine_simple_sum] |
// sumInts is a binary combine function that adds v to the accumulator a.
func sumInts(a, v int) int {
	total := a + v
	return total
}
| |
| func init() { |
| register.Function2x1(sumInts) |
| } |
| |
| func globallySumInts(s beam.Scope, ints beam.PCollection) beam.PCollection { |
| return beam.Combine(s, sumInts, ints) |
| } |
| |
// boundedSum is a CombineFn that sums ints and, when Bound is positive,
// caps the result at Bound.
type boundedSum struct {
	Bound int
}

// MergeAccumulators adds a and v. The sum is returned unchanged when no
// positive Bound is configured or the sum does not exceed it; otherwise
// Bound is returned.
func (fn *boundedSum) MergeAccumulators(a, v int) int {
	sum := a + v
	if fn.Bound <= 0 || sum <= fn.Bound {
		return sum
	}
	return fn.Bound
}
| |
| func init() { |
| register.Combiner1[int](&boundedSum{}) |
| } |
| |
| func globallyBoundedSumInts(s beam.Scope, bound int, ints beam.PCollection) beam.PCollection { |
| return beam.Combine(s, &boundedSum{Bound: bound}, ints) |
| } |
| |
| // [END combine_simple_sum] |
| |
| // [START combine_custom_average] |
| |
// averageFn is a CombineFn computing the arithmetic mean of ints.
type averageFn struct{}

// averageAccum is the accumulator of averageFn: the number of inputs seen
// and their running sum.
type averageAccum struct {
	Count, Sum int
}

// CreateAccumulator returns an empty accumulator.
func (fn *averageFn) CreateAccumulator() averageAccum {
	return averageAccum{}
}

// AddInput folds a single input v into the accumulator a.
func (fn *averageFn) AddInput(a averageAccum, v int) averageAccum {
	a.Count++
	a.Sum += v
	return a
}

// MergeAccumulators adds the counts and sums of two accumulators.
func (fn *averageFn) MergeAccumulators(a, v averageAccum) averageAccum {
	a.Count += v.Count
	a.Sum += v.Sum
	return a
}

// ExtractOutput returns Sum/Count, or NaN when nothing was accumulated.
func (fn *averageFn) ExtractOutput(a averageAccum) float64 {
	if a.Count != 0 {
		return float64(a.Sum) / float64(a.Count)
	}
	return math.NaN()
}

// Compact returns the accumulator unchanged.
func (fn *averageFn) Compact(a averageAccum) averageAccum {
	// No-op
	return a
}
| |
| func init() { |
| register.Combiner3[averageAccum, int, float64](&averageFn{}) |
| } |
| |
| // [END combine_custom_average] |
| |
| func globallyAverage(s beam.Scope, ints beam.PCollection) beam.PCollection { |
| // [START combine_global_average] |
| average := beam.Combine(s, &averageFn{}, ints) |
| // [END combine_global_average] |
| return average |
| } |
| |
| // [START combine_global_with_default] |
| |
// returnSideOrDefault returns the first value of the side input iterator,
// or the default d when the iterator yields nothing.
func returnSideOrDefault(d float64, iter func(*float64) bool) float64 {
	var side float64
	if !iter(&side) {
		// Side input is empty, so return the default.
		return d
	}
	// Side input has a value, so return it.
	return side
}
| func init() { register.Function2x1(returnSideOrDefault) } |
| |
| func globallyAverageWithDefault(s beam.Scope, ints beam.PCollection) beam.PCollection { |
| // Setting combine defaults has requires no helper function in the Go SDK. |
| average := beam.Combine(s, &averageFn{}, ints) |
| |
| // To add a default value: |
| defaultValue := beam.Create(s, float64(0)) |
| return beam.ParDo(s, returnSideOrDefault, defaultValue, beam.SideInput{Input: average}) |
| } |
| |
| // [END combine_global_with_default] |
| func perKeyAverage(s beam.Scope, playerAccuracies beam.PCollection) beam.PCollection { |
| // [START combine_per_key] |
| avgAccuracyPerPlayer := stats.MeanPerKey(s, playerAccuracies) |
| // [END combine_per_key] |
| return avgAccuracyPerPlayer |
| } |
| |
| func applyFlatten(s beam.Scope, pcol1, pcol2, pcol3 beam.PCollection) beam.PCollection { |
| // [START model_multiple_pcollections_flatten] |
| merged := beam.Flatten(s, pcol1, pcol2, pcol3) |
| // [END model_multiple_pcollections_flatten] |
| return merged |
| } |
| |
// Student is an example element carrying the student's percentile rank.
type Student struct {
	Percentile int
}

// [START model_multiple_pcollections_partition_fn]

// decileFn maps a student to the index of the decile partition the student
// belongs to. The result is always in [0, 9] so that it is a valid index for
// a 10-way partition: percentiles of 100 or more land in the last partition
// and negative percentiles in the first.
func decileFn(student Student) int {
	decile := student.Percentile / 10
	switch {
	case decile < 0:
		return 0
	case decile > 9:
		return 9
	}
	return decile
}
| |
| func init() { |
| register.Function1x1(decileFn) |
| } |
| |
| // [END model_multiple_pcollections_partition_fn] |
| |
| // applyPartition returns the 40th percentile of students. |
| func applyPartition(s beam.Scope, students beam.PCollection) beam.PCollection { |
| // [START model_multiple_pcollections_partition] |
| // Partition returns a slice of PCollections |
| studentsByPercentile := beam.Partition(s, 10, decileFn, students) |
| // Each partition can be extracted by indexing into the slice. |
| fortiethPercentile := studentsByPercentile[4] |
| // [END model_multiple_pcollections_partition] |
| return fortiethPercentile |
| } |
| |
| // [START model_pardo_side_input_dofn] |
| |
// filterWordsAbove is a DoFn that takes in a word,
// and a singleton side input iterator of a length cut off
// and only emits words that are strictly longer than that cut off.
//
// If the iterator has no elements, an error is returned, aborting processing.
func filterWordsAbove(word string, lengthCutOffIter func(*float64) bool, emitAboveCutoff func(string)) error {
	var cutOff float64
	if !lengthCutOffIter(&cutOff) {
		return fmt.Errorf("no length cutoff provided")
	}
	if float64(len(word)) > cutOff {
		emitAboveCutoff(word)
	}
	return nil
}
| |
// filterWordsBelow is a DoFn that takes in a word and a singleton side input
// holding a length cut off, and emits only words whose length is at or
// below that cut off.
//
// If the side input isn't a singleton, a runtime panic will occur.
func filterWordsBelow(word string, lengthCutOff float64, emitBelowCutoff func(string)) {
	wordLen := float64(len(word))
	if wordLen <= lengthCutOff {
		emitBelowCutoff(word)
	}
}
| |
| func init() { |
| register.Function3x1(filterWordsAbove) |
| register.Function3x0(filterWordsBelow) |
| // 1 input of type string => Emitter1[string] |
| register.Emitter1[string]() |
| // 1 input of type float64 => Iter1[float64] |
| register.Iter1[float64]() |
| } |
| |
| // [END model_pardo_side_input_dofn] |
| |
| // addSideInput demonstrates passing a side input to a DoFn. |
| func addSideInput(s beam.Scope, words beam.PCollection) (beam.PCollection, beam.PCollection) { |
| wordLengths := applyWordLen(s, words) |
| |
| // [START model_pardo_side_input] |
| // avgWordLength is a PCollection containing a single element, a singleton. |
| avgWordLength := stats.Mean(s, wordLengths) |
| |
| // Side inputs are added as with the beam.SideInput option to beam.ParDo. |
| wordsAboveCutOff := beam.ParDo(s, filterWordsAbove, words, beam.SideInput{Input: avgWordLength}) |
| wordsBelowCutOff := beam.ParDo(s, filterWordsBelow, words, beam.SideInput{Input: avgWordLength}) |
| // [END model_pardo_side_input] |
| return wordsAboveCutOff, wordsBelowCutOff |
| } |
| |
// isMarkedWord is a dummy function that reports whether word starts with
// the literal prefix "MARKER".
func isMarkedWord(word string) bool {
	const marker = "MARKER"
	return strings.HasPrefix(word, marker)
}
| |
| // [START model_multiple_output_dofn] |
| |
| // processWords is a DoFn that has 3 output PCollections. The emitter functions |
| // are matched in positional order to the PCollections returned by beam.ParDo3. |
| func processWords(word string, emitBelowCutoff, emitAboveCutoff, emitMarked func(string)) { |
| const cutOff = 5 |
| if len(word) < cutOff { |
| emitBelowCutoff(word) |
| } else { |
| emitAboveCutoff(word) |
| } |
| if isMarkedWord(word) { |
| emitMarked(word) |
| } |
| } |
| |
| // processWordsMixed demonstrates mixing an emitter, with a standard return. |
| // If a standard return is used, it will always be the first returned PCollection, |
| // followed in positional order by the emitter functions. |
| func processWordsMixed(word string, emitMarked func(string)) int { |
| if isMarkedWord(word) { |
| emitMarked(word) |
| } |
| return len(word) |
| } |
| |
| func init() { |
| register.Function4x0(processWords) |
| register.Function2x1(processWordsMixed) |
| // 1 input of type string => Emitter1[string] |
| register.Emitter1[string]() |
| } |
| |
| // [END model_multiple_output_dofn] |
| |
| func applyMultipleOut(s beam.Scope, words beam.PCollection) (belows, aboves, markeds, lengths, mixedMarkeds beam.PCollection) { |
| // [START model_multiple_output] |
| // beam.ParDo3 returns PCollections in the same order as |
| // the emit function parameters in processWords. |
| below, above, marked := beam.ParDo3(s, processWords, words) |
| |
| // processWordsMixed uses both a standard return and an emitter function. |
| // The standard return produces the first PCollection from beam.ParDo2, |
| // and the emitter produces the second PCollection. |
| length, mixedMarked := beam.ParDo2(s, processWordsMixed, words) |
| // [END model_multiple_output] |
| return below, above, marked, length, mixedMarked |
| } |
| |
| // [START model_paneinfo] |
| |
| func extractWordsFn(pn beam.PaneInfo, line string, emitWords func(string)) { |
| if pn.Timing == typex.PaneEarly || pn.Timing == typex.PaneOnTime { |
| // ... perform operation ... |
| } |
| if pn.Timing == typex.PaneLate { |
| // ... perform operation ... |
| } |
| if pn.IsFirst { |
| // ... perform operation ... |
| } |
| if pn.IsLast { |
| // ... perform operation ... |
| } |
| |
| words := strings.Split(line, " ") |
| for _, w := range words { |
| emitWords(w) |
| } |
| } |
| |
| // [END model_paneinfo] |
| |
// contains reports whether e is an element of s.
func contains(s []string, e string) bool {
	for i := 0; i < len(s); i++ {
		if s[i] == e {
			return true
		}
	}
	return false
}
| |
| // [START state_and_timers] |
| |
| // stateAndTimersFn is an example stateful DoFn with state and a timer. |
| type stateAndTimersFn struct { |
| Buffer1 state.Bag[string] |
| Buffer2 state.Bag[int64] |
| Watermark timers.EventTime |
| } |
| |
| func (s *stateAndTimersFn) ProcessElement(sp state.Provider, tp timers.Provider, w beam.Window, key string, value int64, emit func(string, int64)) error { |
| // ... handle processing elements here, set a callback timer... |
| |
| // Read all the data from Buffer1 in this window. |
| vals, ok, err := s.Buffer1.Read(sp) |
| if err != nil { |
| return err |
| } |
| if ok && s.shouldClearBuffer(vals) { |
| // clear the buffer data if required conditions are met. |
| s.Buffer1.Clear(sp) |
| } |
| |
| // Add the value to Buffer2. |
| s.Buffer2.Add(sp, value) |
| |
| if s.allConditionsMet() { |
| // Clear the timer if certain condition met and you don't want to trigger |
| // the callback method. |
| s.Watermark.Clear(tp) |
| } |
| |
| emit(key, value) |
| |
| return nil |
| } |
| |
| func (s *stateAndTimersFn) OnTimer(sp state.Provider, tp timers.Provider, w beam.Window, key string, timer timers.Context, emit func(string, int64)) error { |
| // Window and key parameters are really useful especially for debugging issues. |
| switch timer.Family { |
| case s.Watermark.Family: |
| // timer expired, emit a different signal |
| emit(key, -1) |
| } |
| return nil |
| } |
| |
| func (s *stateAndTimersFn) shouldClearBuffer([]string) bool { |
| // some business logic |
| return false |
| } |
| |
| func (s *stateAndTimersFn) allConditionsMet() bool { |
| // other business logic |
| return true |
| } |
| |
| // [END state_and_timers] |
| |
| // [START bag_state] |
| |
| // bagStateFn only emits words that haven't been seen |
| type bagStateFn struct { |
| Bag state.Bag[string] |
| } |
| |
| func (s *bagStateFn) ProcessElement(p state.Provider, book, word string, emitWords func(string)) error { |
| // Get all values we've written to this bag state in this window. |
| vals, ok, err := s.Bag.Read(p) |
| if err != nil { |
| return err |
| } |
| if !ok || !contains(vals, word) { |
| emitWords(word) |
| s.Bag.Add(p, word) |
| } |
| |
| if len(vals) > 10000 { |
| // Example of clearing and starting again with an empty bag |
| s.Bag.Clear(p) |
| } |
| |
| return nil |
| } |
| |
| // [END bag_state] |
| |
| // [START value_state] |
| |
| // valueStateFn keeps track of the number of elements seen. |
| type valueStateFn struct { |
| Val state.Value[int] |
| } |
| |
| func (s *valueStateFn) ProcessElement(p state.Provider, book string, word string, emitWords func(string)) error { |
| // Get the value stored in our state |
| val, ok, err := s.Val.Read(p) |
| if err != nil { |
| return err |
| } |
| if !ok { |
| s.Val.Write(p, 1) |
| } else { |
| s.Val.Write(p, val+1) |
| } |
| |
| if val > 10000 { |
| // Example of clearing and starting again with an empty bag |
| s.Val.Clear(p) |
| } |
| |
| return nil |
| } |
| |
| // [END value_state] |
| |
// MyCustomType is an empty placeholder type used by the custom coder example.
type MyCustomType struct{}

// Bytes returns the encoded form of m; for this placeholder that is nil.
func (m MyCustomType) Bytes() []byte {
	var encoded []byte
	return encoded
}

// FromBytes ignores its argument and returns the receiver unchanged.
func (m MyCustomType) FromBytes(_ []byte) MyCustomType {
	decoded := m
	return decoded
}
| |
| // [START value_state_coder] |
| |
| type valueStateDoFn struct { |
| Val state.Value[MyCustomType] |
| } |
| |
| func encode(m MyCustomType) []byte { |
| return m.Bytes() |
| } |
| |
| func decode(b []byte) MyCustomType { |
| return MyCustomType{}.FromBytes(b) |
| } |
| |
| func init() { |
| beam.RegisterCoder(reflect.TypeOf((*MyCustomType)(nil)).Elem(), encode, decode) |
| } |
| |
| // [END value_state_coder] |
| |
// combineFn is an empty placeholder type.
type combineFn struct{}
| |
| // [START combining_state] |
| |
| // combiningStateFn keeps track of the number of elements seen. |
| type combiningStateFn struct { |
| // types are the types of the accumulator, input, and output respectively |
| Val state.Combining[int, int, int] |
| } |
| |
| func (s *combiningStateFn) ProcessElement(p state.Provider, book string, word string, emitWords func(string)) error { |
| // Get the value stored in our state |
| val, _, err := s.Val.Read(p) |
| if err != nil { |
| return err |
| } |
| s.Val.Add(p, 1) |
| |
| if val > 10000 { |
| // Example of clearing and starting again with an empty bag |
| s.Val.Clear(p) |
| } |
| |
| return nil |
| } |
| |
| func combineState(s beam.Scope, input beam.PCollection) beam.PCollection { |
| // ... |
| // CombineFn param can be a simple fn like this or a structural CombineFn |
| cFn := state.MakeCombiningState[int, int, int]("stateKey", func(a, b int) int { |
| return a + b |
| }) |
| combined := beam.ParDo(s, combiningStateFn{Val: cFn}, input) |
| |
| // ... |
| |
| // [END combining_state] |
| |
| return combined |
| } |
| |
| // [START event_time_timer] |
| |
| type eventTimerDoFn struct { |
| State state.Value[int64] |
| Timer timers.EventTime |
| } |
| |
| func (fn *eventTimerDoFn) ProcessElement(ts beam.EventTime, sp state.Provider, tp timers.Provider, book, word string, emitWords func(string)) { |
| // ... |
| |
| // Set an event-time timer to the element timestamp. |
| fn.Timer.Set(tp, ts.ToTime()) |
| |
| // ... |
| } |
| |
| func (fn *eventTimerDoFn) OnTimer(sp state.Provider, tp timers.Provider, w beam.Window, key string, timer timers.Context, emitWords func(string)) { |
| switch timer.Family { |
| case fn.Timer.Family: |
| // process callback for this timer |
| } |
| } |
| |
| func AddEventTimeDoFn(s beam.Scope, in beam.PCollection) beam.PCollection { |
| return beam.ParDo(s, &eventTimerDoFn{ |
| // Timers are given family names so their callbacks can be handled independantly. |
| Timer: timers.InEventTime("processWatermark"), |
| State: state.MakeValueState[int64]("latest"), |
| }, in) |
| } |
| |
| // [END event_time_timer] |
| |
| // [START processing_time_timer] |
| |
| type processingTimerDoFn struct { |
| Timer timers.ProcessingTime |
| } |
| |
| func (fn *processingTimerDoFn) ProcessElement(sp state.Provider, tp timers.Provider, book, word string, emitWords func(string)) { |
| // ... |
| |
| // Set a timer to go off 30 seconds in the future. |
| fn.Timer.Set(tp, time.Now().Add(30*time.Second)) |
| |
| // ... |
| } |
| |
| func (fn *processingTimerDoFn) OnTimer(sp state.Provider, tp timers.Provider, w beam.Window, key string, timer timers.Context, emitWords func(string)) { |
| switch timer.Family { |
| case fn.Timer.Family: |
| // process callback for this timer |
| } |
| } |
| |
| func AddProcessingTimeDoFn(s beam.Scope, in beam.PCollection) beam.PCollection { |
| return beam.ParDo(s, &processingTimerDoFn{ |
| // Timers are given family names so their callbacks can be handled independantly. |
| Timer: timers.InProcessingTime("timer"), |
| }, in) |
| } |
| |
| // [END processing_time_timer] |
| |
| // [START dynamic_timer_tags] |
| |
// hasAction is implemented by values that can name an action; the returned
// string is used as a dynamic timer tag.
type hasAction interface {
	Action() string
}
| |
| type dynamicTagsDoFn[V hasAction] struct { |
| Timer timers.EventTime |
| } |
| |
| func (fn *dynamicTagsDoFn[V]) ProcessElement(ts beam.EventTime, tp timers.Provider, key string, value V, emitWords func(string)) { |
| // ... |
| |
| // Set a timer to go off 30 seconds in the future. |
| fn.Timer.Set(tp, ts.ToTime(), timers.WithTag(value.Action())) |
| |
| // ... |
| } |
| |
| func (fn *dynamicTagsDoFn[V]) OnTimer(tp timers.Provider, w beam.Window, key string, timer timers.Context, emitWords func(string)) { |
| switch timer.Family { |
| case fn.Timer.Family: |
| tag := timer.Tag // Do something with fired tag |
| _ = tag |
| } |
| } |
| |
| func AddDynamicTimerTagsDoFn[V hasAction](s beam.Scope, in beam.PCollection) beam.PCollection { |
| return beam.ParDo(s, &dynamicTagsDoFn[V]{ |
| Timer: timers.InEventTime("actionTimers"), |
| }, in) |
| } |
| |
| // [END dynamic_timer_tags] |
| |
| // [START timer_output_timestamps_bad] |
| |
| type badTimerOutputTimestampsFn[V any] struct { |
| ElementBag state.Bag[V] |
| TimerSet state.Value[bool] |
| OutputState timers.ProcessingTime |
| } |
| |
| func (fn *badTimerOutputTimestampsFn[V]) ProcessElement(sp state.Provider, tp timers.Provider, key string, value V, emit func(string)) error { |
| // Add the current element to the bag for this key. |
| if err := fn.ElementBag.Add(sp, value); err != nil { |
| return err |
| } |
| set, _, err := fn.TimerSet.Read(sp) |
| if err != nil { |
| return err |
| } |
| if !set { |
| fn.OutputState.Set(tp, time.Now().Add(1*time.Minute)) |
| fn.TimerSet.Write(sp, true) |
| } |
| return nil |
| } |
| |
| func (fn *badTimerOutputTimestampsFn[V]) OnTimer(sp state.Provider, tp timers.Provider, w beam.Window, key string, timer timers.Context, emit func(string)) error { |
| switch timer.Family { |
| case fn.OutputState.Family: |
| vs, _, err := fn.ElementBag.Read(sp) |
| if err != nil { |
| return err |
| } |
| for _, v := range vs { |
| // Output each element |
| emit(fmt.Sprintf("%v", v)) |
| } |
| |
| fn.ElementBag.Clear(sp) |
| // Note that the timer has now fired. |
| fn.TimerSet.Clear(sp) |
| } |
| return nil |
| } |
| |
| // [END timer_output_timestamps_bad] |
| |
| // [START timer_output_timestamps_good] |
| |
// element pairs a buffered value with its event timestamp, stored as the
// int64 returned by beam.EventTime.Milliseconds.
type element[V any] struct {
	Timestamp int64
	Value     V
}
| |
| type goodTimerOutputTimestampsFn[V any] struct { |
| ElementBag state.Bag[element[V]] // The bag of elements accumulated. |
| TimerTimerstamp state.Value[int64] // The timestamp of the timer set. |
| MinTimestampInBag state.Combining[int64, int64, int64] // The minimum timestamp stored in the bag. |
| OutputState timers.ProcessingTime // The timestamp of the timer. |
| } |
| |
| func (fn *goodTimerOutputTimestampsFn[V]) ProcessElement(et beam.EventTime, sp state.Provider, tp timers.Provider, key string, value V, emit func(beam.EventTime, string)) error { |
| // ... |
| // Add the current element to the bag for this key, and preserve the event time. |
| if err := fn.ElementBag.Add(sp, element[V]{Timestamp: et.Milliseconds(), Value: value}); err != nil { |
| return err |
| } |
| |
| // Keep track of the minimum element timestamp currently stored in the bag. |
| fn.MinTimestampInBag.Add(sp, et.Milliseconds()) |
| |
| // If the timer is already set, then reset it at the same time but with an updated output timestamp (otherwise |
| // we would keep resetting the timer to the future). If there is no timer set, then set one to expire in a minute. |
| ts, ok, _ := fn.TimerTimerstamp.Read(sp) |
| var tsToSet time.Time |
| if ok { |
| tsToSet = time.UnixMilli(ts) |
| } else { |
| tsToSet = time.Now().Add(1 * time.Minute) |
| } |
| |
| minTs, _, _ := fn.MinTimestampInBag.Read(sp) |
| outputTs := time.UnixMilli(minTs) |
| |
| // Setting the outputTimestamp to the minimum timestamp in the bag holds the watermark to that timestamp until the |
| // timer fires. This allows outputting all the elements with their timestamp. |
| fn.OutputState.Set(tp, tsToSet, timers.WithOutputTimestamp(outputTs)) |
| fn.TimerTimerstamp.Write(sp, tsToSet.UnixMilli()) |
| |
| return nil |
| } |
| |
| func (fn *goodTimerOutputTimestampsFn[V]) OnTimer(sp state.Provider, tp timers.Provider, w beam.Window, key string, timer timers.Context, emit func(beam.EventTime, string)) error { |
| switch timer.Family { |
| case fn.OutputState.Family: |
| vs, _, err := fn.ElementBag.Read(sp) |
| if err != nil { |
| return err |
| } |
| for _, v := range vs { |
| // Output each element with their timestamp |
| emit(beam.EventTime(v.Timestamp), fmt.Sprintf("%v", v.Value)) |
| } |
| |
| fn.ElementBag.Clear(sp) |
| // Note that the timer has now fired. |
| fn.TimerTimerstamp.Clear(sp) |
| } |
| return nil |
| } |
| |
| func AddTimedOutputBatching[V any](s beam.Scope, in beam.PCollection) beam.PCollection { |
| return beam.ParDo(s, &goodTimerOutputTimestampsFn[V]{ |
| ElementBag: state.MakeBagState[element[V]]("elementBag"), |
| TimerTimerstamp: state.MakeValueState[int64]("timerTimestamp"), |
| MinTimestampInBag: state.MakeCombiningState[int64, int64, int64]("minTimestampInBag", func(a, b int64) int64 { |
| if a < b { |
| return a |
| } |
| return b |
| }), |
| OutputState: timers.InProcessingTime("outputState"), |
| }, in) |
| } |
| |
| // [END timer_output_timestamps_good] |
| |
// updateState exists for example purposes only; it accepts any arguments
// and does nothing.
func updateState(sp any, state any, k any, v any) {
}
| |
| // [START timer_garbage_collection] |
| |
| type timerGarbageCollectionFn[V any] struct { |
| State state.Value[V] // The state for the key. |
| MaxTimestampInBag state.Combining[int64, int64, int64] // The maximum element timestamp seen so far. |
| GcTimer timers.EventTime // The timestamp of the timer. |
| } |
| |
| func (fn *timerGarbageCollectionFn[V]) ProcessElement(et beam.EventTime, sp state.Provider, tp timers.Provider, key string, value V, emit func(beam.EventTime, string)) { |
| updateState(sp, fn.State, key, value) |
| fn.MaxTimestampInBag.Add(sp, et.Milliseconds()) |
| |
| // Set the timer to be one hour after the maximum timestamp seen. This will keep overwriting the same timer, so |
| // as long as there is activity on this key the state will stay active. Once the key goes inactive for one hour's |
| // worth of event time (as measured by the watermark), then the gc timer will fire. |
| maxTs, _, _ := fn.MaxTimestampInBag.Read(sp) |
| expirationTime := time.UnixMilli(maxTs).Add(1 * time.Hour) |
| fn.GcTimer.Set(tp, expirationTime) |
| } |
| |
| func (fn *timerGarbageCollectionFn[V]) OnTimer(sp state.Provider, tp timers.Provider, w beam.Window, key string, timer timers.Context, emit func(beam.EventTime, string)) { |
| switch timer.Family { |
| case fn.GcTimer.Family: |
| // Clear all the state for the key |
| fn.State.Clear(sp) |
| fn.MaxTimestampInBag.Clear(sp) |
| } |
| } |
| |
| func AddTimerGarbageCollection[V any](s beam.Scope, in beam.PCollection) beam.PCollection { |
| return beam.ParDo(s, &timerGarbageCollectionFn[V]{ |
| State: state.MakeValueState[V]("timerTimestamp"), |
| MaxTimestampInBag: state.MakeCombiningState[int64, int64, int64]("maxTimestampInBag", func(a, b int64) int64 { |
| if a > b { |
| return a |
| } |
| return b |
| }), |
| GcTimer: timers.InEventTime("gcTimer"), |
| }, in) |
| } |
| |
| // [END timer_garbage_collection] |
| |
// Event is a placeholder event type used by the join example.
type Event struct{}

// isClick reports whether the event is a click. This placeholder always
// reports false.
func (*Event) isClick() bool {
	return false
}
| |
| // [START join_dofn_example] |
| |
| type JoinedEvent struct { |
| View, Click *Event |
| } |
| |
| type joinDoFn struct { |
| View state.Value[*Event] // Store the view event. |
| Click state.Value[*Event] // Store the click event. |
| |
| MaxTimestampSeen state.Combining[int64, int64, int64] // The maximum element timestamp seen so far. |
| GcTimer timers.EventTime // The timestamp of the timer. |
| } |
| |
| func (fn *joinDoFn) ProcessElement(et beam.EventTime, sp state.Provider, tp timers.Provider, key string, event *Event, emit func(JoinedEvent)) { |
| valueState := fn.View |
| if event.isClick() { |
| valueState = fn.Click |
| } |
| valueState.Write(sp, event) |
| |
| view, _, _ := fn.View.Read(sp) |
| click, _, _ := fn.Click.Read(sp) |
| if view != nil && click != nil { |
| emit(JoinedEvent{View: view, Click: click}) |
| fn.clearState(sp) |
| return |
| } |
| |
| fn.MaxTimestampSeen.Add(sp, et.Milliseconds()) |
| expTs, _, _ := fn.MaxTimestampSeen.Read(sp) |
| fn.GcTimer.Set(tp, time.UnixMilli(expTs).Add(1*time.Hour)) |
| } |
| |
| func (fn *joinDoFn) OnTimer(sp state.Provider, tp timers.Provider, w beam.Window, key string, timer timers.Context, emit func(beam.EventTime, string)) { |
| switch timer.Family { |
| case fn.GcTimer.Family: |
| fn.clearState(sp) |
| } |
| } |
| |
| func (fn *joinDoFn) clearState(sp state.Provider) { |
| fn.View.Clear(sp) |
| fn.Click.Clear(sp) |
| fn.MaxTimestampSeen.Clear(sp) |
| } |
| |
| func AddJoinDoFn(s beam.Scope, in beam.PCollection) beam.PCollection { |
| return beam.ParDo(s, &joinDoFn{ |
| View: state.MakeValueState[*Event]("view"), |
| Click: state.MakeValueState[*Event]("click"), |
| MaxTimestampSeen: state.MakeCombiningState[int64, int64, int64]("maxTimestampSeen", func(a, b int64) int64 { |
| if a > b { |
| return a |
| } |
| return b |
| }), |
| GcTimer: timers.InEventTime("gcTimer"), |
| }, in) |
| } |
| |
| // [END join_dofn_example] |
| |
// sendRpc is a placeholder standing in for an RPC call. It accepts any
// arguments and does nothing with them.
func sendRpc(...any) {
	// Intentionally empty.
}
| |
| // [START batching_dofn_example] |
| |
| type bufferDoFn[V any] struct { |
| Elements state.Bag[V] // Store the elements buffered so far. |
| IsTimerSet state.Value[bool] // Keep track of whether a timer is currently set or not. |
| |
| OutputElements timers.ProcessingTime // The processing-time timer user to publish the RPC. |
| } |
| |
| func (fn *bufferDoFn[V]) ProcessElement(et beam.EventTime, sp state.Provider, tp timers.Provider, key string, value V) { |
| fn.Elements.Add(sp, value) |
| |
| isSet, _, _ := fn.IsTimerSet.Read(sp) |
| if !isSet { |
| fn.OutputElements.Set(tp, time.Now().Add(10*time.Second)) |
| fn.IsTimerSet.Write(sp, true) |
| } |
| } |
| |
| func (fn *bufferDoFn[V]) OnTimer(sp state.Provider, tp timers.Provider, w beam.Window, key string, timer timers.Context) { |
| switch timer.Family { |
| case fn.OutputElements.Family: |
| elements, _, _ := fn.Elements.Read(sp) |
| sendRpc(elements) |
| fn.Elements.Clear(sp) |
| fn.IsTimerSet.Clear(sp) |
| } |
| } |
| |
| func AddBufferDoFn[V any](s beam.Scope, in beam.PCollection) beam.PCollection { |
| return beam.ParDo(s, &bufferDoFn[V]{ |
| Elements: state.MakeBagState[V]("elements"), |
| IsTimerSet: state.MakeValueState[bool]("isTimerSet"), |
| |
| OutputElements: timers.InProcessingTime("outputElements"), |
| }, in) |
| } |
| |
| // [END batching_dofn_example] |
| |
| type statefulDoFn struct { |
| S state.Value[int] |
| } |
| |
| func statefulPipeline() beam.PCollection { |
| var s beam.Scope |
| var elements beam.PCollection |
| |
| // [START windowed_state] |
| |
| items := beam.ParDo(s, statefulDoFn{ |
| S: state.MakeValueState[int]("S"), |
| }, elements) |
| out := beam.WindowInto(s, window.NewFixedWindows(24*time.Hour), items) |
| |
| // [END windowed_state] |
| |
| return out |
| } |
| |
| func init() { |
| register.Function3x0(extractWordsFn) |
| // 1 input of type string => Emitter1[string] |
| register.Emitter1[string]() |
| } |
| |
| // [START countwords_composite] |
| // CountWords is a function that builds a composite PTransform |
| // to count the number of times each word appears. |
| func CountWords(s beam.Scope, lines beam.PCollection) beam.PCollection { |
| // A subscope is required for a function to become a composite transform. |
| // We assign it to the original scope variable s to shadow the original |
| // for the rest of the CountWords function. |
| s = s.Scope("CountWords") |
| |
| // Since the same subscope is used for the following transforms, |
| // they are in the same composite PTransform. |
| |
| // Convert lines of text into individual words. |
| words := beam.ParDo(s, extractWordsFn, lines) |
| |
| // Count the number of times each word occurs. |
| wordCounts := stats.Count(s, words) |
| |
| // Return any PCollections that should be available after |
| // the composite transform. |
| return wordCounts |
| } |
| |
| // [END countwords_composite] |