blob: 5d40b4af40570da2be9b97e553d22dfcbb004c94 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package exec
import (
"context"
"fmt"
"io"
"sync/atomic"
"time"
"github.com/apache/beam/sdks/go/pkg/beam/core/graph/coder"
"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
"github.com/apache/beam/sdks/go/pkg/beam/log"
)
// DataSource is a Root execution unit. It opens the stream SID on the
// bundle's DataManager, decodes the elements with Coder, and pushes them to
// the downstream node Out.
type DataSource struct {
	// count is accessed only through sync/atomic 64-bit operations. It is
	// deliberately the first field: on 32-bit platforms (386, ARM, 32-bit
	// MIPS) only the first word of an allocated struct is guaranteed to be
	// 64-bit aligned, which those operations require.
	count int64

	// UID is the unit identifier returned by ID.
	UID UnitID
	// SID identifies the data stream opened for reading in Process.
	SID StreamID
	// Coder decodes the stream; its Window coder decodes element headers.
	Coder *coder.Coder
	// Out receives every decoded element.
	Out Node

	// source is set from the DataContext in StartBundle and cleared in
	// FinishBundle and Down.
	source DataManager
	// start is the bundle start time, used for the FinishBundle log line.
	start time.Time
}
// ID reports the UnitID this DataSource was configured with.
func (n *DataSource) ID() UnitID {
	uid := n.UID
	return uid
}
// Up performs no setup for a DataSource and always succeeds; per-bundle
// state is established in StartBundle instead.
func (n *DataSource) Up(ctx context.Context) error {
	var noErr error
	return noErr
}
// StartBundle prepares the source for a new bundle. It resets the read
// counter, records the bundle start time, and captures the bundle's
// DataManager from data. It then starts the downstream node with the same
// id and data context and returns that node's error.
func (n *DataSource) StartBundle(ctx context.Context, id string, data DataContext) error {
	atomic.StoreInt64(&n.count, 0)
	n.start = time.Now()
	n.source = data.Data

	downstream := n.Out
	return downstream.StartBundle(ctx, id, data)
}
// Process reads the stream n.SID from the bundle's DataManager and forwards
// every decoded element to n.Out. It returns nil when io.EOF is hit while
// reading the next element's windowed-value header.
//
// For CoGBK coders, each element is a key followed by an encoded value
// stream. The stream is either a single chunk with a known int32 size, or
// (when the size is negative) a sequence of varint-sized chunks terminated
// by a zero-length chunk. All values are buffered in memory and handed to
// Out as a FixedReStream. For every other coder, each element is decoded and
// passed to Out directly.
//
// n.count is advanced as input is consumed: by the number of values for
// CoGBK, and by one per decoded element otherwise. Decoding errors are
// wrapped. Errors from Out.ProcessElement are returned unchanged.
func (n *DataSource) Process(ctx context.Context) error {
	r, err := n.source.OpenRead(ctx, n.SID)
	if err != nil {
		return err
	}
	defer r.Close()

	c := coder.SkipW(n.Coder)
	wc := MakeWindowDecoder(n.Coder.Window)

	switch {
	case coder.IsCoGBK(c):
		ck := MakeElementDecoder(c.Components[0])
		cv := MakeElementDecoder(c.Components[1])

		for {
			ws, t, err := DecodeWindowedValueHeader(wc, r)
			if err != nil {
				if err == io.EOF {
					return nil
				}
				return errors.Wrap(err, "source failed")
			}

			// Decode key
			key, err := ck.Decode(r)
			if err != nil {
				return errors.Wrap(err, "source decode failed")
			}
			key.Timestamp = t
			key.Windows = ws

			// TODO(herohde) 4/30/2017: the State API will be handle re-iterations
			// and only "small" value streams would be inline. Presumably, that
			// would entail buffering the whole stream. We do that for now.
			var buf []FullValue

			size, err := coder.DecodeInt32(r)
			if err != nil {
				return errors.Wrap(err, "stream size decoding failed")
			}

			if size > -1 {
				// Single chunk stream.
				atomic.AddInt64(&n.count, int64(size))
				for i := int32(0); i < size; i++ {
					value, err := cv.Decode(r)
					if err != nil {
						return errors.Wrap(err, "stream value decode failed")
					}
					buf = append(buf, *value)
				}
			} else {
				// Multi-chunked stream, terminated by a zero-length chunk.
				for {
					chunk, err := coder.DecodeVarUint64(r)
					if err != nil {
						return errors.Wrap(err, "stream chunk size decoding failed")
					}
					if chunk == 0 {
						break
					}
					atomic.AddInt64(&n.count, int64(chunk))
					for i := uint64(0); i < chunk; i++ {
						value, err := cv.Decode(r)
						if err != nil {
							return errors.Wrap(err, "stream value decode failed")
						}
						buf = append(buf, *value)
					}
				}
			}

			values := &FixedReStream{Buf: buf}
			if err := n.Out.ProcessElement(ctx, key, values); err != nil {
				return err
			}
		}

	default:
		ec := MakeElementDecoder(c)

		for {
			ws, t, err := DecodeWindowedValueHeader(wc, r)
			if err != nil {
				if err == io.EOF {
					return nil
				}
				return errors.Wrap(err, "source failed")
			}
			elm, err := ec.Decode(r)
			if err != nil {
				return errors.Wrap(err, "source decode failed")
			}
			elm.Timestamp = t
			elm.Windows = ws

			// Count only elements that were actually read. Incrementing before
			// the header decode would also count the terminating io.EOF
			// iteration and overstate the total by one.
			atomic.AddInt64(&n.count, 1)

			if err := n.Out.ProcessElement(ctx, elm); err != nil {
				return err
			}
		}
	}
}
// FinishBundle logs the read counter and the elapsed bundle time, then
// drops the bundle's DataManager. Next it finishes the downstream node. The
// counter is reset to zero after Out.FinishBundle returns, whether or not
// that call failed, and Out's error is returned.
func (n *DataSource) FinishBundle(ctx context.Context) error {
	log.Infof(ctx, "DataSource: %d elements in %d ns", atomic.LoadInt64(&n.count), time.Since(n.start))
	n.source = nil

	finishErr := n.Out.FinishBundle(ctx)
	atomic.StoreInt64(&n.count, 0)
	return finishErr
}
// Down releases the source's reference to its DataManager and always
// returns nil.
func (n *DataSource) Down(ctx context.Context) error {
	var released DataManager
	n.source = released
	return nil
}
// String renders the source as its stream ID, its coder, and the ID of the
// downstream node. It calls n.Out.ID(), so Out must be set.
func (n *DataSource) String() string {
	sid, cdr, outID := n.SID, n.Coder, n.Out.ID()
	return fmt.Sprintf("DataSource[%v] Coder:%v Out:%v", sid, cdr, outID)
}
// ProgressReportSnapshot captures the progress of reading an input source.
type ProgressReportSnapshot struct {
	// ID is the ID of the stream target being read.
	ID string
	// Name is the name of the stream target being read.
	Name string
	// Count is the value of the source's read counter when the snapshot
	// was taken.
	Count int64
}
// Progress returns a snapshot of the source's progress. The snapshot holds
// the target ID and name of its stream and the current value of its read
// counter, which is loaded atomically. Calling it on a nil *DataSource
// yields the zero snapshot.
func (n *DataSource) Progress() ProgressReportSnapshot {
	var snap ProgressReportSnapshot
	if n != nil {
		snap = ProgressReportSnapshot{
			ID:    n.SID.Target.ID,
			Name:  n.SID.Target.Name,
			Count: atomic.LoadInt64(&n.count),
		}
	}
	return snap
}