blob: 2a4b2d7fdd2ea7792c0884b11487de5aa58d3a98 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package harness
import (
	"context"
	"io"
	"io/ioutil"
	"log"
	"testing"
	"time"

	"github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec"
	pb "github.com/apache/beam/sdks/go/pkg/beam/model/fnexecution_v1"
)
// fakeClient is a scripted stand-in for the data stream client handed to
// makeDataChannel. Its Recv method replays a fixed sequence of messages and
// its Send method accepts everything.
type fakeClient struct {
	// t is the test that owns this fake. It is stored at construction time;
	// the methods in this file do not read it.
	t *testing.T

	// done receives a value once Recv has been called past the end of the
	// scripted message sequence.
	done chan bool

	// calls counts how many times Recv has been invoked so far.
	calls int
}
// Recv replays a scripted sequence of messages, keyed on how many times it
// has been called:
//
//   - calls 1 and 2 each return a message holding bufElements+1 data
//     elements, intended to fill the buffer completely and so provoke the
//     deadlock under test;
//   - call 3 returns a single element with empty data, which ends the data
//     stream normally;
//   - any later call signals on f.done and returns io.EOF.
//
// Every element targets instruction "inst_ref" and the target
// ("ptr", "instruction_name").
func (f *fakeClient) Recv() (*pb.Elements, error) {
	f.calls++

	elem := &pb.Elements_Data{
		InstructionReference: "inst_ref",
		Data:                 []byte{1, 2, 3, 4},
		Target: &pb.Target{
			PrimitiveTransformReference: "ptr",
			Name:                        "instruction_name",
		},
	}

	switch f.calls {
	case 1, 2:
		// The same element is repeated bufElements+1 times.
		batch := make([]*pb.Elements_Data, bufElements+1)
		for i := range batch {
			batch[i] = elem
		}
		return &pb.Elements{Data: batch}, nil

	case 3:
		// An element without payload terminates the stream for the reader.
		elem.Data = []byte{}
		return &pb.Elements{Data: []*pb.Elements_Data{elem}}, nil

	default:
		// Signalling done here means that this code providing messages has
		// not been blocked by the bug that kept the data reader from
		// getting more messages.
		f.done <- true
		return nil, io.EOF
	}
}
// Send discards the outgoing message and always reports success.
func (f *fakeClient) Send(_ *pb.Elements) error {
	return nil
}
// TestDataChannelTerminateOnClose checks that closing a reader before its
// buffered data has been consumed does not stall the data channel. The fake
// client signals on done only when Recv is called past its scripted
// messages, so receiving that signal shows the channel kept draining the
// stream after the reader was closed.
//
// The wait for that signal is bounded, so a regression of the deadlock fails
// the test with a message instead of hanging until the global test timeout.
func TestDataChannelTerminateOnClose(t *testing.T) {
	// Generous upper bound for the drain to complete; in the healthy case
	// the signal is expected to arrive almost immediately.
	const drainTimeout = 30 * time.Second

	// The logging of channels closed is quite noisy for this test.
	log.SetOutput(ioutil.Discard)

	done := make(chan bool, 1)
	client := &fakeClient{t: t, done: done}
	c := makeDataChannel(context.Background(), "id", client)
	r := c.OpenRead(context.Background(), exec.Target{ID: "ptr", Name: "instruction_name"}, "inst_ref")

	// We don't read up all the buffered data, but immediately close the
	// reader. Previously, since nothing was consuming the incoming gRPC
	// data, the whole data channel would get stuck, and client.Recv() was
	// eventually no longer called.
	read := make([]byte, 4)
	if _, err := r.Read(read); err != nil {
		t.Errorf("Unexpected error from read: %v", err)
	}
	r.Close()

	// If done is signaled, client.Recv() has been called to flush the
	// channel, meaning consumer code isn't stuck.
	select {
	case <-done:
	case <-time.After(drainTimeout):
		t.Fatalf("data channel still stuck %v after reader close: client.Recv() never reached end of stream", drainTimeout)
	}
}