blob: 4e5dc2ccddb2766443d41411633b755f5ba2239f [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package coder
import (
	"fmt"
	"io"
	"math"
	"reflect"
)
// TODO(lostluck): 2020.08.04 export these for use by others?
// mapDecoder produces a decoder for the beam schema map encoding.
//
// The encoding is an int32 entry count followed by that many key/value
// pairs. rt is the map type to construct; decodeToKey and decodeToElem each
// decode one key or one value from the reader into the reflect.Value they
// are handed.
//
// The returned decoder replaces ret with a newly made map of type rt and
// fills it entry by entry. Errors from DecodeInt32 and from the component
// decoders are returned unchanged. A negative entry count can never be
// produced by a valid encoding, so it is reported as an error rather than
// being decoded as an empty map.
func mapDecoder(rt reflect.Type, decodeToKey, decodeToElem func(reflect.Value, io.Reader) error) func(reflect.Value, io.Reader) error {
	return func(ret reflect.Value, r io.Reader) error {
		// Read the entry count that prefixes the encoded pairs.
		size, err := DecodeInt32(r)
		if err != nil {
			return err
		}
		if size < 0 {
			return fmt.Errorf("invalid map encoding: negative entry count %d", size)
		}
		n := int(size)
		ret.Set(reflect.MakeMapWithSize(rt, n))

		// The key and element types do not change between entries, so look
		// them up once instead of on every iteration.
		keyType, elemType := rt.Key(), rt.Elem()
		for i := 0; i < n; i++ {
			// reflect.New(...).Elem() yields a settable zero value for the
			// component decoder to fill in.
			rvk := reflect.New(keyType).Elem()
			if err := decodeToKey(rvk, r); err != nil {
				return err
			}
			rvv := reflect.New(elemType).Elem()
			if err := decodeToElem(rvv, r); err != nil {
				return err
			}
			ret.SetMapIndex(rvk, rvv)
		}
		return nil
	}
}
// containerNilDecoder wraps decodeToElem for map or iterable components that
// may be nil. Nillable values carry an extra leading byte recording whether a
// value follows.
//
// The returned decoder first reads that marker with DecodeBool. When it is
// false nothing more is read and ret is left exactly as it was passed in.
// When it is true a new value of ret's element type is allocated, decoded
// into with decodeToElem, and the pointer to it is stored in ret.
func containerNilDecoder(decodeToElem func(reflect.Value, io.Reader) error) func(reflect.Value, io.Reader) error {
	return func(ret reflect.Value, r io.Reader) error {
		present, err := DecodeBool(r)
		switch {
		case err != nil:
			return err
		case !present:
			// Nil marker: no payload follows.
			return nil
		}

		// Decode into freshly allocated storage and only publish it to ret
		// once the element decoded successfully.
		ptr := reflect.New(ret.Type().Elem())
		if decodeErr := decodeToElem(ptr.Elem(), r); decodeErr != nil {
			return decodeErr
		}
		ret.Set(ptr)
		return nil
	}
}
// mapEncoder reflectively encodes a map value using the beam map encoding:
// an int32 entry count followed by each key and its value in turn.
//
// encodeKey and encodeValue write a single key or value to the writer. rt is
// not used by the returned encoder.
//
// Entries are written in the order produced by reflect's MapRange, which Go
// does not specify, so the bytes emitted for the same map may differ between
// calls. The first error from EncodeInt32 or a component encoder is returned
// unchanged. A map with more entries than fit in the int32 count prefix is
// rejected before anything is written.
func mapEncoder(rt reflect.Type, encodeKey, encodeValue func(reflect.Value, io.Writer) error) func(reflect.Value, io.Writer) error {
	return func(rv reflect.Value, w io.Writer) error {
		size := rv.Len()
		// The count prefix is an int32. Converting a larger length would
		// silently truncate it and the prefix would no longer match the
		// number of entries that follow.
		if size > math.MaxInt32 {
			return fmt.Errorf("map too large to encode: %d entries exceeds int32 count prefix", size)
		}
		if err := EncodeInt32(int32(size), w); err != nil {
			return err
		}
		for iter := rv.MapRange(); iter.Next(); {
			if err := encodeKey(iter.Key(), w); err != nil {
				return err
			}
			if err := encodeValue(iter.Value(), w); err != nil {
				return err
			}
		}
		return nil
	}
}
// containerNilEncoder wraps encodeElem for map or iterable components that
// may be nil. Nillable values are written with an extra leading byte
// recording whether a value follows.
//
// The returned encoder writes false and stops when rv is nil; otherwise it
// writes true and then encodes the value rv points to with encodeElem.
func containerNilEncoder(encodeElem func(reflect.Value, io.Writer) error) func(reflect.Value, io.Writer) error {
	return func(rv reflect.Value, w io.Writer) error {
		present := !rv.IsNil()
		// Stop after the marker if writing it failed or there is no payload
		// to follow; err is nil in the latter case.
		if err := EncodeBool(present, w); err != nil || !present {
			return err
		}
		return encodeElem(rv.Elem(), w)
	}
}