// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package harness
import (
"context"
"io"
"sync"
"time"
"github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec"
"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
"github.com/apache/beam/sdks/go/pkg/beam/log"
pb "github.com/apache/beam/sdks/go/pkg/beam/model/fnexecution_v1"
)
const (
chunkSize = int(4e6) // Bytes to put in a single gRPC message. Max is slightly higher.
bufElements = 20 // Number of chunks buffered per reader.
)
// ScopedDataManager scopes the global gRPC data manager to a single instruction.
// The indirection makes it easier to control access.
type ScopedDataManager struct {
mgr *DataChannelManager
instID string
// TODO(herohde) 7/20/2018: capture and force close open reads/writes. However,
// we would need the underlying Close to be idempotent or a separate method.
closed bool
mu sync.Mutex
}
// NewScopedDataManager returns a ScopedDataManager for the given instruction.
func NewScopedDataManager(mgr *DataChannelManager, instID string) *ScopedDataManager {
return &ScopedDataManager{mgr: mgr, instID: instID}
}
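// OpenRead opens a closable byte stream for reading, scoped to this instruction.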
func (s *ScopedDataManager) OpenRead(ctx context.Context, id exec.StreamID) (io.ReadCloser, error) {
ch, err := s.open(ctx, id.Port)
if err != nil {
return nil, err
}
return ch.OpenRead(ctx, id.PtransformID, s.instID), nil
}
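// OpenWrite opens a closable byte stream for writing, scoped to this instruction.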
func (s *ScopedDataManager) OpenWrite(ctx context.Context, id exec.StreamID) (io.WriteCloser, error) {
ch, err := s.open(ctx, id.Port)
if err != nil {
return nil, err
}
return ch.OpenWrite(ctx, id.PtransformID, s.instID), nil
}
func (s *ScopedDataManager) open(ctx context.Context, port exec.Port) (*DataChannel, error) {
s.mu.Lock()
if s.closed {
s.mu.Unlock()
return nil, errors.Errorf("instruction %v no longer processing", s.instID)
}
local := s.mgr
s.mu.Unlock()
return local.Open(ctx, port) // don't hold lock over potentially slow operation
}
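// Close prevents new streams from being opened through this scope. Streams that
// are already open are not closed by this call.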
func (s *ScopedDataManager) Close() error {
s.mu.Lock()
s.closed = true
s.mgr = nil
s.mu.Unlock()
return nil
}
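// The function below is a hedged usage sketch and not part of the original harness:
// it shows the intended lifecycle of a ScopedDataManager, in which each instruction
// gets its own scope, opens streams through it, and closes the scope when the
// instruction finishes so that later opens fail fast. The helper name and the
// caller-supplied StreamID are purely illustrative.
func readScopedStream(ctx context.Context, mgr *DataChannelManager, instID string, id exec.StreamID) ([]byte, error) {
	scope := NewScopedDataManager(mgr, instID)
	defer scope.Close()
	r, err := scope.OpenRead(ctx, id)
	if err != nil {
		return nil, err
	}
	defer r.Close()
	// Drain the stream; a real harness would decode elements as they arrive.
	var out []byte
	buf := make([]byte, 4096)
	for {
		n, err := r.Read(buf)
		out = append(out, buf[:n]...)
		if err == io.EOF {
			return out, nil
		}
		if err != nil {
			return nil, err
		}
	}
}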
// DataChannelManager manages data channels over the Data API. A fixed number of channels
// are generally used, each managing multiple logical byte streams. Thread-safe.
type DataChannelManager struct {
ports map[string]*DataChannel
mu sync.Mutex // guards the ports map
}
// Open opens a R/W DataChannel over the given port.
func (m *DataChannelManager) Open(ctx context.Context, port exec.Port) (*DataChannel, error) {
if port.URL == "" {
panic("empty port")
}
m.mu.Lock()
defer m.mu.Unlock()
if m.ports == nil {
m.ports = make(map[string]*DataChannel)
}
if con, ok := m.ports[port.URL]; ok {
return con, nil
}
ch, err := newDataChannel(ctx, port)
if err != nil {
return nil, err
}
m.ports[port.URL] = ch
return ch, nil
}
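// The helper below is a hedged sketch and not part of the original harness: it only
// illustrates the caching behavior of Open, which keys channels by port URL so that
// repeated opens of the same port share one gRPC connection. The helper name and
// error message are illustrative.
func openSharedChannel(ctx context.Context, mgr *DataChannelManager, port exec.Port) (*DataChannel, error) {
	first, err := mgr.Open(ctx, port)
	if err != nil {
		return nil, err
	}
	second, err := mgr.Open(ctx, port)
	if err != nil {
		return nil, err
	}
	if first != second {
		return nil, errors.Errorf("expected cached DataChannel for %v", port.URL)
	}
	return first, nil
}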
// clientID identifies a client of a connected channel.
type clientID struct {
ptransformID string
instID string
}
// dataClient is a reduced version of the full gRPC interface, to help with testing.
// TODO(wcn): need a compile-time assertion to make sure this stays synced with what's
// in pb.BeamFnData_DataClient
type dataClient interface {
Send(*pb.Elements) error
Recv() (*pb.Elements, error)
}
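// The fake below is a hedged sketch of how a test might satisfy dataClient in memory,
// and is not part of the original harness: Send records outgoing messages and Recv
// replays a fixed script of incoming messages, returning io.EOF once the script is
// exhausted. The type name and fields are illustrative.
type fakeDataClient struct {
	mu   sync.Mutex
	sent []*pb.Elements // messages passed to Send
	recv []*pb.Elements // scripted messages returned by Recv
}
func (f *fakeDataClient) Send(msg *pb.Elements) error {
	f.mu.Lock()
	defer f.mu.Unlock()
	f.sent = append(f.sent, msg)
	return nil
}
func (f *fakeDataClient) Recv() (*pb.Elements, error) {
	f.mu.Lock()
	defer f.mu.Unlock()
	if len(f.recv) == 0 {
		return nil, io.EOF
	}
	msg := f.recv[0]
	f.recv = f.recv[1:]
	return msg, nil
}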
// DataChannel manages a single multiplexed gRPC connection over the Data API. Data is
// pushed over the channel, so data for a reader may arrive before the reader connects.
// Thread-safe.
type DataChannel struct {
id string
client dataClient
writers map[clientID]*dataWriter
readers map[clientID]*dataReader
// TODO: early/late closed, bad instructions, finer locks, reconnect?
mu sync.Mutex // guards both the readers and writers maps.
}
func newDataChannel(ctx context.Context, port exec.Port) (*DataChannel, error) {
cc, err := dial(ctx, port.URL, 15*time.Second)
if err != nil {
return nil, errors.Wrapf(err, "failed to connect to data service at %v", port.URL)
}
client, err := pb.NewBeamFnDataClient(cc).Data(ctx)
if err != nil {
cc.Close()
return nil, errors.Wrapf(err, "failed to create data client on %v", port.URL)
}
return makeDataChannel(ctx, port.URL, client), nil
}
func makeDataChannel(ctx context.Context, id string, client dataClient) *DataChannel {
ret := &DataChannel{
id: id,
client: client,
writers: make(map[clientID]*dataWriter),
readers: make(map[clientID]*dataReader),
}
go ret.read(ctx)
return ret
}
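// OpenRead returns an io.ReadCloser of the data elements for the given ptransform and instruction.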
func (c *DataChannel) OpenRead(ctx context.Context, ptransformID string, instID string) io.ReadCloser {
return c.makeReader(ctx, clientID{ptransformID: ptransformID, instID: instID})
}
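// OpenWrite returns an io.WriteCloser of the data elements for the given ptransform and instruction.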
func (c *DataChannel) OpenWrite(ctx context.Context, ptransformID string, instID string) io.WriteCloser {
return c.makeWriter(ctx, clientID{ptransformID: ptransformID, instID: instID})
}
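// read demultiplexes data from the gRPC stream: each incoming segment is routed to
// the reader for its (ptransform, instruction) pair, creating the reader on demand.
// It returns when the stream reaches EOF or fails.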
func (c *DataChannel) read(ctx context.Context) {
cache := make(map[clientID]*dataReader)
for {
msg, err := c.client.Recv()
if err != nil {
if err == io.EOF {
// TODO(herohde) 10/12/2017: can this happen before shutdown? Reconnect?
log.Warnf(ctx, "DataChannel.read %v closed", c.id)
return
}
log.Errorf(ctx, "DataChannel.read %v bad", c.id)
return
}
recordStreamReceive(msg)
// Each message may contain segments for multiple streams, so we
// must treat each segment in isolation. We maintain a local cache
// to reduce lock contention.
for _, elm := range msg.GetData() {
id := clientID{ptransformID: elm.TransformId, instID: elm.GetInstructionId()}
// log.Printf("Chan read (%v): %v\n", sid, elm.GetData())
var r *dataReader
if local, ok := cache[id]; ok {
r = local
} else {
r = c.makeReader(ctx, id)
cache[id] = r
}
if r.completed {
// The local reader has closed but the remote is still sending data.
// Just ignore it. We keep the reader config in the cache so we don't
// treat it as a new reader. Eventually the stream will finish and go
// through normal teardown.
continue
}
if len(elm.GetData()) == 0 {
// Sentinel EOF segment for stream. Close buffer to signal EOF.
close(r.buf)
// Clean up local bookkeeping. We'll never see another message
// for it again. We have to be careful not to remove the real
// one, because readers may be initialized after we've seen
// the full stream.
delete(cache, id)
continue
}
// This send deliberately blocks once we exceed the buffering for a
// reader: we can't buffer the entire main input if some user code is
// slow (or gets stuck). If the local side closes, the reader is marked
// as completed and further remote data for it is ignored.
select {
case r.buf <- elm.GetData():
case <-r.done:
r.completed = true
}
}
}
}
func (c *DataChannel) makeReader(ctx context.Context, id clientID) *dataReader {
c.mu.Lock()
defer c.mu.Unlock()
if r, ok := c.readers[id]; ok {
return r
}
r := &dataReader{id: id, buf: make(chan []byte, bufElements), done: make(chan bool, 1), channel: c}
c.readers[id] = r
return r
}
func (c *DataChannel) removeReader(id clientID) {
c.mu.Lock()
delete(c.readers, id)
c.mu.Unlock()
}
func (c *DataChannel) makeWriter(ctx context.Context, id clientID) *dataWriter {
c.mu.Lock()
defer c.mu.Unlock()
if w, ok := c.writers[id]; ok {
return w
}
w := &dataWriter{ch: c, id: id}
c.writers[id] = w
return w
}
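// dataReader is an io.ReadCloser over a single logical stream. DataChannel.read
// fills its buffer; Read drains it.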
type dataReader struct {
id clientID
buf chan []byte
done chan bool
cur []byte
channel *DataChannel
completed bool
}
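// Close signals that the local consumer is done with the stream. The channel then
// discards any further remote data for it.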
func (r *dataReader) Close() error {
r.done <- true
r.channel.removeReader(r.id)
return nil
}
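// Read copies buffered stream data into buf. It blocks until data is available and
// returns io.EOF once the stream has been fully consumed.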
func (r *dataReader) Read(buf []byte) (int, error) {
if r.cur == nil {
b, ok := <-r.buf
if !ok {
return 0, io.EOF
}
r.cur = b
}
n := copy(buf, r.cur)
if len(r.cur) == n {
r.cur = nil
} else {
r.cur = r.cur[n:]
}
return n, nil
}
// TODO(herohde) 7/20/2018: we should probably either stop tracking writers or
// make dataWriter thread-safe. As it stands, concurrent use is likely to corrupt data.
type dataWriter struct {
buf []byte
id clientID
ch *DataChannel
}
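// Close flushes any remaining buffered data and then sends an element with empty
// data as the end-of-stream sentinel.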
func (w *dataWriter) Close() error {
// Don't acquire the locks as Flush will do so.
err := w.Flush()
if err != nil {
return err
}
// Now acquire the locks since we're sending.
w.ch.mu.Lock()
defer w.ch.mu.Unlock()
delete(w.ch.writers, w.id)
msg := &pb.Elements{
Data: []*pb.Elements_Data{
{
InstructionId: w.id.instID,
TransformId: w.id.ptransformID,
// Empty data == sentinel
},
},
}
// TODO(wcn): if this send fails, we have a data channel that's lingering that
// the runner is still waiting on. Need some way to identify these and resolve them.
recordStreamSend(msg)
return w.ch.client.Send(msg)
}
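// Flush sends the currently buffered data, if any, as a single Elements message.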
func (w *dataWriter) Flush() error {
w.ch.mu.Lock()
defer w.ch.mu.Unlock()
if w.buf == nil {
return nil
}
msg := &pb.Elements{
Data: []*pb.Elements_Data{
{
InstructionId: w.id.instID,
TransformId: w.id.ptransformID,
Data: w.buf,
},
},
}
w.buf = nil
recordStreamSend(msg)
return w.ch.client.Send(msg)
}
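// Write buffers p, flushing the existing buffer first if appending p would push it
// past chunkSize.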
func (w *dataWriter) Write(p []byte) (n int, err error) {
if len(w.buf)+len(p) > chunkSize {
l := len(w.buf)
// We can't fit this write into the current buffer, so flush the buffer first.
if err := w.Flush(); err != nil {
return 0, errors.Wrapf(err, "datamgr.go: error flushing buffer of length %d", l)
}
}
// At this point the buffer is empty or has room. Note that a single write larger
// than chunkSize is still buffered, and later sent, as one oversized message.
w.buf = append(w.buf, p...)
return len(p), nil
}
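// The function below is a hedged sketch and not part of the original harness: it
// shows the writer-side lifecycle, in which Write buffers bytes (flushing whenever a
// chunk would exceed chunkSize) and Close sends any remaining data followed by the
// empty-data sentinel that marks the end of the stream for the runner. The helper
// name and parameters are illustrative.
func writeStream(ctx context.Context, ch *DataChannel, ptransformID, instID string, payload []byte) error {
	w := ch.OpenWrite(ctx, ptransformID, instID)
	if _, err := w.Write(payload); err != nil {
		w.Close()
		return err
	}
	return w.Close() // flushes remaining data and sends the EOF sentinel
}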