blob: 08cc279b40df042edb771e3cfa1cadfd8d159721 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package exec
import (
"context"
"fmt"
"io"
"github.com/apache/beam/sdks/go/pkg/beam/core/graph/coder"
"github.com/apache/beam/sdks/go/pkg/beam/core/typex"
)
// This file contains support for side input.
// iterableSideInputKey is the fixed runtime key value for iterable side input.
// NewIterable encodes this empty key with the adapter's key encoder and passes
// the encoded bytes along when opening the side input.
const iterableSideInputKey = ""
// SideInputAdapter provides a concrete ReStream from a low-level side input reader. It
// encapsulates StreamID and coding as needed.
type SideInputAdapter interface {
// NewIterable returns a ReStream over the side input for window w, read
// through the given StateReader. It returns an error if the ReStream cannot
// be constructed.
NewIterable(ctx context.Context, reader StateReader, w typex.Window) (ReStream, error)
}
// sideInputAdapter is the SideInputAdapter implementation returned by
// NewSideInputAdapter. It holds the identifiers and coders used to open and
// decode a single side input.
type sideInputAdapter struct {
// sid is the StreamID passed to StateReader.OpenSideInput.
sid StreamID
// sideInputID is the side input identifier passed to StateReader.OpenSideInput.
sideInputID string
// wc encodes the window whose side input is being read.
wc WindowEncoder
// kc encodes the fixed iterable side input key.
kc ElementEncoder
// ec decodes the values read back from the opened side input stream.
ec ElementDecoder
}
// NewSideInputAdapter builds the side input adapter for the given StreamID,
// side input ID and coder. The coder must be a W<KV<K,V>> coder, because the
// protocol supports MultiSet access only; any other coder causes a panic.
//
// The window coder becomes the window encoder, the K component becomes the key
// encoder and the V component becomes the value decoder.
func NewSideInputAdapter(sid StreamID, sideInputID string, c *coder.Coder) SideInputAdapter {
	// Only unwrap the window once we know c is windowed (short-circuit).
	if !(coder.IsW(c) && coder.IsKV(coder.SkipW(c))) {
		panic(fmt.Sprintf("expected WKV coder for side input %v: %v", sid, c))
	}

	kv := coder.SkipW(c)
	return &sideInputAdapter{
		sid:         sid,
		sideInputID: sideInputID,
		wc:          MakeWindowEncoder(c.Window),
		kc:          MakeElementEncoder(kv.Components[0]),
		ec:          MakeElementDecoder(kv.Components[1]),
	}
}
// NewIterable returns a ReStream over the side input values for window w.
//
// The fixed iterable key and the window are encoded once, up front; an
// encoding failure is returned immediately. The side input itself is not
// opened here: each Open call on the returned ReStream asks reader for a fresh
// side input stream and wraps it so that elements are decoded with the
// adapter's value decoder.
func (s *sideInputAdapter) NewIterable(ctx context.Context, reader StateReader, w typex.Window) (ReStream, error) {
	encodedKey, err := EncodeElement(s.kc, []byte(iterableSideInputKey))
	if err != nil {
		return nil, err
	}
	encodedWindow, err := EncodeWindow(s.wc, w)
	if err != nil {
		return nil, err
	}

	// openStream is re-invoked on every Open, so the stream can be re-read.
	openStream := func() (Stream, error) {
		rc, err := reader.OpenSideInput(ctx, s.sid, s.sideInputID, encodedKey, encodedWindow)
		if err != nil {
			return nil, err
		}
		return &elementStream{r: rc, ec: s.ec}, nil
	}
	return &proxyReStream{open: openStream}, nil
}
// String returns a short description of the adapter containing its StreamID
// and side input ID.
func (s *sideInputAdapter) String() string {
	const layout = "SideInputAdapter[%v, %v]"
	return fmt.Sprintf(layout, s.sid, s.sideInputID)
}
// proxyReStream is a simple wrapper of an open function. Its Open method
// delegates to that function.
type proxyReStream struct {
// open is called by Open to produce a Stream.
open func() (Stream, error)
}
// Open calls the wrapped open function and returns its Stream and error
// unchanged.
func (p *proxyReStream) Open() (Stream, error) {
return p.open()
}
// elementStream exposes a Stream from decoding elements.
type elementStream struct {
// r is the underlying reader that Read decodes from and Close closes.
r io.ReadCloser
// ec is the decoder applied to r on each Read.
ec ElementDecoder
}
// Close closes the underlying reader and returns its error, if any.
func (s *elementStream) Close() error {
return s.r.Close()
}
// Read decodes one element from the underlying reader with the element
// decoder and returns the decoder's result and error unchanged.
func (s *elementStream) Read() (*FullValue, error) {
// We should see a stream of unwindowed values -- no sizes, no key.
return s.ec.Decode(s.r)
}
// FixedKey transforms any value into KV<K, V> for a fixed K.
type FixedKey struct {
// UID is the unit identifier.
UID UnitID
// Key is the fixed key placed in every element passed to Out.
Key interface{}
// Out is the successor node.
Out Node
}
// ID returns the unit identifier of this node.
func (n *FixedKey) ID() UnitID {
return n.UID
}
// Up is a no-op and always returns nil.
func (n *FixedKey) Up(ctx context.Context) error {
return nil
}
// StartBundle forwards the call, with the same arguments, to the successor
// node and returns its error.
func (n *FixedKey) StartBundle(ctx context.Context, id string, data DataContext) error {
return n.Out.StartBundle(ctx, id, data)
}
// ProcessElement turns the incoming value into a KV<K,V> element: the fixed
// Key becomes the key and elm becomes the value. The timestamp and windows of
// elm are carried over. The result is passed to the successor node together
// with values, unchanged.
func (n *FixedKey) ProcessElement(ctx context.Context, elm *FullValue, values ...ReStream) error {
	// Transform: V to KV<K,V>
	kv := new(FullValue)
	kv.Elm = n.Key
	kv.Elm2 = elm
	kv.Timestamp = elm.Timestamp
	kv.Windows = elm.Windows

	return n.Out.ProcessElement(ctx, kv, values...)
}
// FinishBundle forwards the call to the successor node and returns its error.
func (n *FixedKey) FinishBundle(ctx context.Context) error {
return n.Out.FinishBundle(ctx)
}
// Down is a no-op and always returns nil.
func (n *FixedKey) Down(ctx context.Context) error {
return nil
}
// String returns a short description of the node containing the fixed key and
// the ID of the successor node.
func (n *FixedKey) String() string {
	successor := n.Out.ID()
	return fmt.Sprintf("FixedKey[%v]. Out:%v", n.Key, successor)
}