blob: 3b17a0a96e1ae1a35792fdf87235da18e8cdac2f [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package exec
import (
// TestDynamicSplit tests that a dynamic split of an in-progress SDF succeeds
// with valid input. It coordinates the two threads (processing and splitting)
// to test what happens if operations happen in various orders. The test then
// validates that the output of the SDF is correct according to the split.
//
// NOTE(review): in this view several structural lines of this function (the
// braces around each table entry, closing braces, and the body of the
// procRes error check) are not visible — confirm against upstream before
// editing.
func TestDynamicSplit(t *testing.T) {
tests := []struct {
// name labels the test case.
name string
// driver is a function determining how the processing and splitting
// threads are created and coordinated.
driver func(*Plan, DataContext, *splitTestSdf) (error, splitResult)
// Complete a split before beginning processing.
name: "Simple",
driver: nonBlockingDriver,
// Try claiming while blocked on a split.
name: "BlockOnSplit",
driver: splitBlockingDriver,
// Try splitting while blocked on a claim.
name: "BlockOnClaim",
driver: claimBlockingDriver,
for _, test := range tests {
// Shadow the loop variable so the closure below captures this iteration's
// test case.
test := test
// NOTE(review): the first argument to t.Run is absent in this view
// (presumably the case name) — confirm against upstream.
t.Run(, func(t *testing.T) {
// Create pipeline.
sdf := newSplitTestSdf()
dfn, err := graph.NewDoFn(sdf, graph.NumMainInputs(graph.MainSingle))
if err != nil {
t.Fatalf("invalid function: %v", err)
cdr := createSplitTestInCoder()
plan, out := createSdfPlan(t, t.Name(), dfn, cdr)
// Create thread to send element to pipeline.
pr, pw := io.Pipe()
elm := createElm()
go writeElm(elm, cdr, pw)
dc := DataContext{Data: &TestDataManager{R: pr}}
// Call driver to coordinate processing & splitting threads.
procRes, splitRes := test.driver(plan, dc, sdf)
// Validate we get a valid split result, aside from split elements.
if splitRes.err != nil {
t.Fatalf("Plan.Split failed: %v", splitRes.err)
// PS and RS are left nil here because they are excluded from the comparison
// below (cmpopts.IgnoreFields) and are checked separately by decoding them.
wantSplit := SplitResult{
PI: -1,
RI: 1,
PS: nil,
RS: nil,
TId: testTransformId,
InId: indexToInputId(0),
if diff := cmp.Diff(splitRes.split, wantSplit, cmpopts.IgnoreFields(SplitResult{}, "PS", "RS")); diff != "" {
t.Errorf("Incorrect split result (ignoring split elements): %v", diff)
// Validate split elements are encoded correctly by decoding them
// with the input coder to the path.
// TODO(BEAM-10579) Switch to using splittable unit's input coder
// once that is implemented.
p, err := decodeDynSplitElm(splitRes.split.PS, cdr)
if err != nil {
t.Errorf("Failed decoding primary element split: %v", err)
// Only decodability of the residual is checked; its value is discarded.
_, err = decodeDynSplitElm(splitRes.split.RS, cdr)
if err != nil {
t.Errorf("Failed decoding residual element split: %v", err)
// Validate SDF output. Make sure each restriction matches the split result.
if err := procRes; err != nil {
// pRest is the restriction carried by the decoded primary split element
// (Elm2 of the nested FullValue).
pRest := p.Elm.(*FullValue).Elm2.(offsetrange.Restriction)
// The number of captured outputs is expected to equal the length of the
// primary restriction.
if got, want := len(out.Elements), int(pRest.End-pRest.Start); got != want {
t.Errorf("Unexpected number of elements: got: %v, want: %v", got, want)
// Every captured output carries a restriction, which must equal the primary
// restriction.
for i, fv := range out.Elements {
rest := fv.Elm.(offsetrange.Restriction)
if got, want := rest, pRest; !cmp.Equal(got, want) {
t.Errorf("Output element %v had incorrect restriction: got: %v, want: %v", i, got, want)
// nonBlockingDriver performs a split before starting processing, so no thread
// is forced to wait on a mutex.
//
// It starts processPlan and splitPlan as goroutines and returns the single
// value each of them reports on its result channel.
//
// NOTE(review): rt is received but not used in the lines visible here; the
// channel receives that coordinate with the tracker appear to be missing from
// this view — confirm against upstream.
func nonBlockingDriver(plan *Plan, dc DataContext, sdf *splitTestSdf) (procRes error, splitRes splitResult) {
// Begin processing pipeline.
procResCh := make(chan error)
go processPlan(plan, dc, procResCh)
rt := <-sdf.rt // Tracker is created first, retrieve that.
// Complete a split before unblocking processing.
splitResCh := make(chan splitResult)
go splitPlan(plan, splitResCh)
splitRes = <-splitResCh
// Unblock and finish processing.
procRes = <-procResCh
return procRes, splitRes
// splitBlockingDriver blocks on a split request so that the SDF attempts to
// claim while the split is occurring.
//
// It starts processPlan and splitPlan as goroutines and returns the single
// value each of them reports on its result channel.
//
// NOTE(review): rt is received but not used in the lines visible here; the
// channel receives described by the step comments below appear to be missing
// from this view — confirm against upstream.
func splitBlockingDriver(plan *Plan, dc DataContext, sdf *splitTestSdf) (procRes error, splitRes splitResult) {
// Begin processing pipeline.
procResCh := make(chan error)
go processPlan(plan, dc, procResCh)
rt := <-sdf.rt // Tracker is created first, retrieve that.
// Start a split, but block on it so it holds the mutex.
splitResCh := make(chan splitResult)
go splitPlan(plan, splitResCh)
// Start processing and start a claim, that'll be waiting for the mutex.
// Unblock and finish splitting and free the mutex.
splitRes = <-splitResCh
// Unblock and finish claiming and processing.
procRes = <-procResCh
return procRes, splitRes
// claimBlockingDriver blocks on a claim request so that the SDF attempts to
// split while the claim is occurring.
//
// It starts processPlan and splitPlan as goroutines and returns the single
// value each of them reports on its result channel.
//
// NOTE(review): only the endClaim receive is visible here; the other channel
// receives described by the step comments below appear to be missing from
// this view — confirm against upstream.
func claimBlockingDriver(plan *Plan, dc DataContext, sdf *splitTestSdf) (procRes error, splitRes splitResult) {
// Begin processing pipeline.
procResCh := make(chan error)
go processPlan(plan, dc, procResCh)
rt := <-sdf.rt // Tracker is created first, retrieve that.
// Start a claim, but block on it so it holds the mutex.
// Start a split that'll be waiting for the mutex.
splitResCh := make(chan splitResult)
go splitPlan(plan, splitResCh)
// Unblock the claim, freeing the mutex (but not finishing processing yet).
// Finish splitting, allowing processing to finish.
splitRes = <-splitResCh
<-rt.endClaim // Delay the claim end so we don't process too much before splitting.
procRes = <-procResCh
return procRes, splitRes
// createElm creates the element for our test pipeline.
func createElm() *FullValue {
return &FullValue{
Elm: &FullValue{
Elm: 20,
Elm2: offsetrange.Restriction{Start: 0, End: 20},
Elm2: float64(20),
// createSplitTestInCoder outputs the coder for inputs to our test pipeline,
// (in particular, the DataSource transform of the pipeline). For the specific
// element this is a coder for, see createElm.
//
// NOTE(review): the argument list of coder.NewW is only partially visible in
// this view (just the custom restriction coder component) — confirm the full
// coder structure against upstream.
func createSplitTestInCoder() *coder.Coder {
// restT is the reflect.Type of offsetrange.Restriction itself, obtained via
// a nil pointer so no value has to be constructed.
restT := reflect.TypeOf((*offsetrange.Restriction)(nil)).Elem()
// Look up the custom coder associated with the restriction type.
restCdr := coder.LookupCustomCoder(restT)
cdr := coder.NewW(
{Kind: coder.Custom, T: typex.New(restT), Custom: restCdr},
return cdr
// createSdfPlan creates a plan containing the test pipeline, which is
// DataSource -> SDF.ProcessSizedElementsAndRestrictions -> CaptureNode.
func createSdfPlan(t *testing.T, name string, fn *graph.DoFn, cdr *coder.Coder) (*Plan, *CaptureNode) {
out := &CaptureNode{UID: 0}
n := &ParDo{UID: 1, Fn: fn, Out: []Node{out}}
sdf := &ProcessSizedElementsAndRestrictions{PDo: n, TfId: testTransformId}
ds := &DataSource{
UID: 2,
SID: StreamID{PtransformID: "DataSource"},
Name: "name",
Coder: cdr,
Out: sdf,
units := []Unit{ds, sdf, out}
p, err := NewPlan(name+"_plan", units)
if err != nil {
t.Fatalf("NewPlan failed: %v", err)
return p, out
// writeElm is meant to be the goroutine for feeding an element to the
// DataSource of the test pipeline.
//
// It writes a windowed value header (single global window, zero timestamp),
// then the encoded element, to pw, and finally closes pw.
//
// NOTE(review): the bodies of the three error checks are not visible in this
// view, so how a failure is reported cannot be determined from here — confirm
// against upstream.
func writeElm(elm *FullValue, cdr *coder.Coder, pw *io.PipeWriter) {
// The window encoder comes from the coder's window coder; the element encoder
// from the coder with its window wrapper skipped.
wc := MakeWindowEncoder(cdr.Window)
ec := MakeElementEncoder(coder.SkipW(cdr))
if err := EncodeWindowedValueHeader(wc, window.SingleGlobalWindow, mtime.ZeroTimestamp, pw); err != nil {
if err := ec.Encode(elm, pw); err != nil {
if err := pw.Close(); err != nil {
func decodeDynSplitElm(elm []byte, cdr *coder.Coder) (*FullValue, error) {
wd := MakeWindowDecoder(cdr.Window)
ed := MakeElementDecoder(coder.SkipW(cdr))
b := bytes.NewBuffer(elm)
w, t, err := DecodeWindowedValueHeader(wd, b)
if err != nil {
return nil, err
e, err := ed.Decode(b)
if err != nil {
return nil, err
e.Windows = w
e.Timestamp = t
return e, nil
// processPlan is meant to be the goroutine representing the thread processing
// the SDF.
func processPlan(plan *Plan, dc DataContext, result chan error) {
if err := plan.Execute(context.Background(), plan.ID()+"_execute", dc); err != nil {
result <- errors.Wrap(err, "Plan.Execute failed")
if err := plan.Down(context.Background()); err != nil {
result <- errors.Wrap(err, "Plan.Down failed")
result <- nil
// splitResult bundles the two values returned by Plan.Split so that splitPlan
// can deliver them over a single channel.
type splitResult struct {
// split is the result returned by Plan.Split.
split SplitResult
// err is the error returned by Plan.Split, nil on success.
err error
// splitPlan is meant to be the goroutine representing the thread handling a
// split request for the SDF.
func splitPlan(plan *Plan, result chan splitResult) {
split, err := plan.Split(SplitPoints{Frac: 0.5, BufSize: 1})
result <- splitResult{split: split, err: err}
// splitTestRTracker adds signals needed to coordinate splitting and claiming
// over multiple threads for this test. Semantically, this tracker is an
// offset range tracker representing a range of integers to output.
type splitTestRTracker struct {
mu sync.Mutex // Lock on accessing underlying tracker.
// rt is the wrapped offset range tracker that the methods delegate to.
rt *offsetrange.Tracker
// Send signals when starting a claim, blocking a claim, and ending a claim.
claim chan struct{}
blockClaim chan struct{}
endClaim chan struct{}
blockInd int64 // Only send signals when claiming a specific position.
// Send signals when starting a split, and blocking a split. Important note:
// the spot to use these in this test is dependent on the first operation
// taking place on a split, which may not necessarily be TrySplit.
split chan struct{}
blockSplit chan struct{}
func newSplitTestRTracker(rest offsetrange.Restriction) *splitTestRTracker {
return &splitTestRTracker{
rt: offsetrange.NewTracker(rest),
claim: make(chan struct{}, 1),
blockClaim: make(chan struct{}),
endClaim: make(chan struct{}),
blockInd: rest.Start,
split: make(chan struct{}, 1),
blockSplit: make(chan struct{}),
// TryClaim forwards the claim of pos to the wrapped offset range tracker and
// returns its result. pos must be an int64; the type assertion panics
// otherwise. When pos equals blockInd, signals are additionally sent on the
// claim, blockClaim and endClaim channels so the test can observe and pace
// the claim.
//
// NOTE(review): the closing braces of the if statements are not visible in
// this view, and mu is never locked in the visible lines even though the
// struct documents it as guarding the underlying tracker — statements appear
// to be missing; confirm against upstream.
func (rt *splitTestRTracker) TryClaim(pos interface{}) bool {
i := pos.(int64)
// Signal that a claim of the watched position is starting.
if i == rt.blockInd {
rt.claim <- struct{}{}
// Signal on blockClaim before the underlying claim is made.
if i == rt.blockInd {
rt.blockClaim <- struct{}{}
result := rt.rt.TryClaim(pos)
// Signal on endClaim once the underlying claim has been made.
if i == rt.blockInd {
rt.endClaim <- struct{}{}
return result
// GetError returns the error reported by the wrapped offset range tracker.
//
// NOTE(review): no locking of mu is visible here — confirm against upstream
// whether lines are missing from this view.
func (rt *splitTestRTracker) GetError() error {
return rt.rt.GetError()
// TrySplit sends a signal on blockSplit and then delegates the split at the
// given fraction to the wrapped offset range tracker, returning its results
// unchanged.
//
// NOTE(review): no locking of mu is visible here — confirm against upstream
// whether lines are missing from this view.
func (rt *splitTestRTracker) TrySplit(fraction float64) (interface{}, interface{}, error) {
// The signal is sent before the underlying split is attempted.
rt.blockSplit <- struct{}{}
return rt.rt.TrySplit(fraction)
// GetProgress sends a signal on the split channel and then returns the
// progress values reported by the wrapped offset range tracker.
//
// NOTE(review): no locking of mu is visible here even though the comment
// below mentions blocking on TryClaim — confirm against upstream whether
// lines are missing from this view.
func (rt *splitTestRTracker) GetProgress() (float64, float64) {
// Note: Currently, GetProgress is called first in a split and blocks if
// TryClaim is being called.
rt.split <- struct{}{}
return rt.rt.GetProgress()
// IsDone returns the done status reported by the wrapped offset range
// tracker.
//
// NOTE(review): no locking of mu is visible here — confirm against upstream
// whether lines are missing from this view.
func (rt *splitTestRTracker) IsDone() bool {
return rt.rt.IsDone()
// GetRestriction returns the restriction reported by the wrapped offset range
// tracker.
//
// NOTE(review): no locking of mu is visible here — confirm against upstream
// whether lines are missing from this view.
func (rt *splitTestRTracker) GetRestriction() interface{} {
return rt.rt.GetRestriction()
// splitTestSdf has signals needed to control processing behavior over multiple
// threads. The actual behavior is to accept an integer N as the element and
// output each element in the range of [0, N).
type splitTestSdf struct {
// proc receives a signal from ProcessElement after it has read the start of
// its restriction and before it begins claiming positions.
proc chan struct{}
rt chan *splitTestRTracker // Used to provide created trackers to the test code.
func newSplitTestSdf() *splitTestSdf {
return &splitTestSdf{
proc: make(chan struct{}),
rt: make(chan *splitTestRTracker),
func (fn *splitTestSdf) ProcessElement(rt *splitTestRTracker, _ int, emit func(offsetrange.Restriction, int)) {
i := rt.GetRestriction().(offsetrange.Restriction).Start
fn.proc <- struct{}{}
for rt.TryClaim(i) == true {
rest := rt.GetRestriction().(offsetrange.Restriction)
emit(rest, int(i))
func (fn *splitTestSdf) CreateInitialRestriction(i int) offsetrange.Restriction {
return offsetrange.Restriction{
Start: 0,
End: int64(i),
func (fn *splitTestSdf) SplitRestriction(_ int, rest offsetrange.Restriction) []offsetrange.Restriction {
return []offsetrange.Restriction{rest}
// RestrictionSize returns the size of rest as reported by its Size method.
// The element argument is unused.
func (fn *splitTestSdf) RestrictionSize(_ int, rest offsetrange.Restriction) float64 {
return rest.Size()
func (fn *splitTestSdf) CreateTracker(rest offsetrange.Restriction) *splitTestRTracker {
rt := newSplitTestRTracker(rest)
fn.rt <- rt
return rt