blob: 538eb9a632ef4d99e0ff7ffad57d823d9d000ddb [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package exec
import (
// TODO(herohde) 1/29/2018: using FullValue for nested KVs is somewhat of a hack
// and is errorprone. Consider making it a more explicit data structure.
// FullValue represents the full runtime value for a data element, incl. the
// implicit context. The result of a GBK or CoGBK is not a single FullValue.
// The consumer is responsible for converting the values to the correct type.
type FullValue struct {
Elm interface{} // Element or KV key.
Elm2 interface{} // KV value, if not invalid
Timestamp typex.EventTime
Windows []typex.Window
func (v *FullValue) String() string {
if v.Elm2 == nil {
return fmt.Sprintf("%v [@%v:%v]", v.Elm, v.Timestamp, v.Windows)
return fmt.Sprintf("KV<%v,%v> [@%v:%v]", v.Elm, v.Elm2, v.Timestamp, v.Windows)
// Stream is a FullValue reader. It returns io.EOF when complete, but can be
// prematurely closed.
type Stream interface {
Read() (*FullValue, error)
// ReStream is re-iterable stream, i.e., a Stream factory.
type ReStream interface {
Open() (Stream, error)
// FixedReStream is a simple in-memory ReStream.
type FixedReStream struct {
Buf []FullValue
// Open returns the a Stream from the start of the in-memory ReStream.
func (n *FixedReStream) Open() (Stream, error) {
return &FixedStream{Buf: n.Buf}, nil
// FixedStream is a simple in-memory Stream from a fixed array.
type FixedStream struct {
Buf []FullValue
next int
// Close releases the buffer, closing the stream.
func (s *FixedStream) Close() error {
s.Buf = nil
return nil
// Read produces the next value in the stream.
func (s *FixedStream) Read() (*FullValue, error) {
if s.Buf == nil || == len(s.Buf) {
return nil, io.EOF
ret := s.Buf[]
return &ret, nil
// TODO(herohde) 1/19/2018: type-specialize list and other conversions?
// Convert converts type of the runtime value to the desired one. It is needed
// to drop the universal type and convert Aggregate types.
func Convert(v interface{}, to reflect.Type) interface{} {
from := reflect.TypeOf(v)
switch {
case from == to:
return v
case typex.IsUniversal(from):
// We need to drop T to obtain the underlying type of the value.
return reflectx.UnderlyingType(reflect.ValueOf(v)).Interface()
// TODO(herohde) 1/19/2018: reflect.ValueOf(v).Convert(to).Interface() instead?
case typex.IsList(from) && typex.IsList(to):
// Convert []A to []B.
value := reflect.ValueOf(v)
ret := reflect.New(to).Elem()
for i := 0; i < value.Len(); i++ {
ret = reflect.Append(ret, reflect.ValueOf(Convert(value.Index(i).Interface(), to.Elem())))
return ret.Interface()
// Arguably this should be:
// reflect.ValueOf(v).Convert(to).Interface()
// but this isn't desirable as it would add avoidable overhead to
// functions where it applies. A user will have better performance
// by explicitly doing the type conversion in their code, which
// the error will indicate. Slow Magic vs Fast & Explicit.
return v
// ConvertFn returns a function that converts type of the runtime value to the desired one. It is needed
// to drop the universal type and convert Aggregate types.
func ConvertFn(from, to reflect.Type) func(interface{}) interface{} {
switch {
case from == to:
return identity
case typex.IsUniversal(from):
return universal
case typex.IsList(from) && typex.IsList(to):
fromE := from.Elem()
toE := to.Elem()
cvtFn := ConvertFn(fromE, toE)
return func(v interface{}) interface{} {
// Convert []A to []B.
value := reflect.ValueOf(v)
ret := reflect.New(to).Elem()
for i := 0; i < value.Len(); i++ {
ret = reflect.Append(ret, reflect.ValueOf(cvtFn(value.Index(i).Interface())))
return ret.Interface()
return identity
// identity is the identity function.
func identity(v interface{}) interface{} {
return v
// universal drops the universal type and re-interfaces it to the actual one.
func universal(v interface{}) interface{} {
return reflectx.UnderlyingType(reflect.ValueOf(v)).Interface()
// ReadAll read a full restream and returns the result.
func ReadAll(rs ReStream) ([]FullValue, error) {
s, err := rs.Open()
if err != nil {
return nil, err
defer s.Close()
var ret []FullValue
for {
elm, err := s.Read()
if err != nil {
if err == io.EOF {
return ret, nil
return nil, err
ret = append(ret, *elm)