// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package ipc // import "github.com/apache/arrow/go/v6/arrow/ipc"
import (
"bytes"
"encoding/binary"
"io"
"github.com/apache/arrow/go/v6/arrow"
"github.com/apache/arrow/go/v6/arrow/array"
"github.com/apache/arrow/go/v6/arrow/bitutil"
"github.com/apache/arrow/go/v6/arrow/internal/flatbuf"
"github.com/apache/arrow/go/v6/arrow/memory"
"golang.org/x/xerrors"
)
// FileReader is an Arrow file reader.
type FileReader struct {
r ReadAtSeeker
footer struct {
offset int64
buffer *memory.Buffer
data *flatbuf.Footer
}
fields dictTypeMap
memo dictMemo
schema *arrow.Schema
record array.Record
irec int // current record index. used for the arrio.Reader interface
err error // last error
}
// NewFileReader opens an Arrow file using the provided reader r.
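//
// A minimal usage sketch (hedged; the file name "data.arrow" is hypothetical
// and error handling is abbreviated):
//
//	f, err := os.Open("data.arrow")
//	if err != nil {
//		// handle error
//	}
//	defer f.Close()
//
//	rdr, err := ipc.NewFileReader(f)
//	if err != nil {
//		// handle error
//	}
//	defer rdr.Close()
//
//	for i := 0; i < rdr.NumRecords(); i++ {
//		rec, err := rdr.Record(i)
//		if err != nil {
//			// handle error
//		}
//		// rec is valid until the next call to Record;
//		// call rec.Retain to extend its lifetime.
//	}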
func NewFileReader(r ReadAtSeeker, opts ...Option) (*FileReader, error) {
var (
cfg = newConfig(opts...)
err error
f = FileReader{
r: r,
fields: make(dictTypeMap),
memo: newMemo(),
}
)
if cfg.footer.offset <= 0 {
cfg.footer.offset, err = f.r.Seek(0, io.SeekEnd)
if err != nil {
return nil, xerrors.Errorf("arrow/ipc: could retrieve footer offset: %w", err)
}
}
f.footer.offset = cfg.footer.offset
err = f.readFooter()
if err != nil {
return nil, xerrors.Errorf("arrow/ipc: could not decode footer: %w", err)
}
err = f.readSchema()
if err != nil {
return nil, xerrors.Errorf("arrow/ipc: could not decode schema: %w", err)
}
if cfg.schema != nil && !cfg.schema.Equal(f.schema) {
return nil, xerrors.Errorf("arrow/ipc: inconsistent schema for reading (got: %v, want: %v)", f.schema, cfg.schema)
}
return &f, err
}
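// readFooter reads and validates the footer at the end of the file.
// The tail of an Arrow file is laid out as
//
//	<footer flatbuffer> <int32 little-endian footer size> <magic "ARROW1">
//
// so the trailing magic is checked first, and the 4-byte size is then used
// to locate the footer flatbuffer immediately preceding it.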
func (f *FileReader) readFooter() error {
var err error
if f.footer.offset <= int64(len(Magic)*2+4) {
return xerrors.Errorf("arrow/ipc: file too small (size=%d)", f.footer.offset)
}
eof := int64(len(Magic) + 4)
buf := make([]byte, eof)
n, err := f.r.ReadAt(buf, f.footer.offset-eof)
if err != nil {
return xerrors.Errorf("arrow/ipc: could not read footer: %w", err)
}
if n != len(buf) {
return xerrors.Errorf("arrow/ipc: could not read %d bytes from end of file", len(buf))
}
if !bytes.Equal(buf[4:], Magic) {
return errNotArrowFile
}
size := int64(binary.LittleEndian.Uint32(buf[:4]))
if size <= 0 || size+int64(len(Magic)*2+4) > f.footer.offset {
return errInconsistentFileMetadata
}
buf = make([]byte, size)
n, err = f.r.ReadAt(buf, f.footer.offset-size-eof)
if err != nil {
return xerrors.Errorf("arrow/ipc: could not read footer data: %w", err)
}
if n != len(buf) {
return xerrors.Errorf("arrow/ipc: could not read %d bytes from footer data", len(buf))
}
f.footer.buffer = memory.NewBufferBytes(buf)
f.footer.data = flatbuf.GetRootAsFooter(buf, 0)
return err
}
func (f *FileReader) readSchema() error {
var err error
f.fields, err = dictTypesFromFB(f.footer.data.Schema(nil))
if err != nil {
return xerrors.Errorf("arrow/ipc: could not load dictionary types from file: %w", err)
}
for i := 0; i < f.NumDictionaries(); i++ {
blk, err := f.dict(i)
if err != nil {
return xerrors.Errorf("arrow/ipc: could read dictionary[%d]: %w", i, err)
}
switch {
case !bitutil.IsMultipleOf8(blk.Offset):
return xerrors.Errorf("arrow/ipc: invalid file offset=%d for dictionary %d", blk.Offset, i)
case !bitutil.IsMultipleOf8(int64(blk.Meta)):
return xerrors.Errorf("arrow/ipc: invalid file metadata=%d position for dictionary %d", blk.Meta, i)
case !bitutil.IsMultipleOf8(blk.Body):
return xerrors.Errorf("arrow/ipc: invalid file body=%d position for dictionary %d", blk.Body, i)
}
msg, err := blk.NewMessage()
if err != nil {
return err
}
id, dict, err := readDictionary(msg.meta, f.fields, f.r)
msg.Release()
if err != nil {
return xerrors.Errorf("arrow/ipc: could not read dictionary %d from file: %w", i, err)
}
f.memo.Add(id, dict)
dict.Release() // memo.Add increases ref-count of dict.
}
schema := f.footer.data.Schema(nil)
if schema == nil {
return xerrors.Errorf("arrow/ipc: could not load schema from flatbuffer data")
}
f.schema, err = schemaFromFB(schema, &f.memo)
if err != nil {
return xerrors.Errorf("arrow/ipc: could not read schema: %w", err)
}
return err
}
func (f *FileReader) block(i int) (fileBlock, error) {
var blk flatbuf.Block
if !f.footer.data.RecordBatches(&blk, i) {
return fileBlock{}, xerrors.Errorf("arrow/ipc: could not extract file block %d", i)
}
return fileBlock{
Offset: blk.Offset(),
Meta: blk.MetaDataLength(),
Body: blk.BodyLength(),
r: f.r,
}, nil
}
func (f *FileReader) dict(i int) (fileBlock, error) {
var blk flatbuf.Block
if !f.footer.data.Dictionaries(&blk, i) {
return fileBlock{}, xerrors.Errorf("arrow/ipc: could not extract dictionary block %d", i)
}
return fileBlock{
Offset: blk.Offset(),
Meta: blk.MetaDataLength(),
Body: blk.BodyLength(),
r: f.r,
}, nil
}
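// Schema returns the schema of the underlying file.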
func (f *FileReader) Schema() *arrow.Schema {
return f.schema
}
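// NumDictionaries returns the number of dictionary blocks recorded in the file footer.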
func (f *FileReader) NumDictionaries() int {
if f.footer.data == nil {
return 0
}
return f.footer.data.DictionariesLength()
}
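// NumRecords returns the number of record batches in the file.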
func (f *FileReader) NumRecords() int {
return f.footer.data.RecordBatchesLength()
}
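// Version returns the Arrow metadata version of the file.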
func (f *FileReader) Version() MetadataVersion {
return MetadataVersion(f.footer.data.Version())
}
// Close cleans up resources used by the File.
// Close does not close the underlying reader.
func (f *FileReader) Close() error {
if f.footer.data != nil {
f.footer.data = nil
}
if f.footer.buffer != nil {
f.footer.buffer.Release()
f.footer.buffer = nil
}
if f.record != nil {
f.record.Release()
f.record = nil
}
return nil
}
// Record returns the i-th record from the file.
// The returned value is valid until the next call to Record.
// Users need to call Retain on that Record to keep it valid for longer.
func (f *FileReader) Record(i int) (array.Record, error) {
record, err := f.RecordAt(i)
if err != nil {
return nil, err
}
if f.record != nil {
f.record.Release()
}
f.record = record
return record, nil
}
// RecordAt returns the i-th record from the file. Ownership of the record is
// transferred to the caller, who must call Release() on it to free the
// memory. This method is safe to call concurrently.
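//
// A random-access sketch (hedged; error handling abbreviated, a "sync"
// import and a *FileReader rdr assumed) reading every record concurrently:
//
//	var wg sync.WaitGroup
//	for i := 0; i < rdr.NumRecords(); i++ {
//		wg.Add(1)
//		go func(i int) {
//			defer wg.Done()
//			rec, err := rdr.RecordAt(i)
//			if err != nil {
//				return // handle error
//			}
//			defer rec.Release()
//			// process rec
//		}(i)
//	}
//	wg.Wait()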
func (f *FileReader) RecordAt(i int) (array.Record, error) {
if i < 0 || i > f.NumRecords() {
panic("arrow/ipc: record index out of bounds")
}
blk, err := f.block(i)
if err != nil {
return nil, err
}
switch {
case !bitutil.IsMultipleOf8(blk.Offset):
return nil, xerrors.Errorf("arrow/ipc: invalid file offset=%d for record %d", blk.Offset, i)
case !bitutil.IsMultipleOf8(int64(blk.Meta)):
return nil, xerrors.Errorf("arrow/ipc: invalid file metadata=%d position for record %d", blk.Meta, i)
case !bitutil.IsMultipleOf8(blk.Body):
return nil, xerrors.Errorf("arrow/ipc: invalid file body=%d position for record %d", blk.Body, i)
}
msg, err := blk.NewMessage()
if err != nil {
return nil, err
}
defer msg.Release()
if msg.Type() != MessageRecordBatch {
return nil, xerrors.Errorf("arrow/ipc: message %d is not a Record", i)
}
return newRecord(f.schema, msg.meta, bytes.NewReader(msg.body.Bytes())), nil
}
// Read reads the current record from the underlying stream and returns it
// together with any error encountered.
// When the Reader reaches the end of the underlying stream, it returns (nil, io.EOF).
//
// The returned record value is valid until the next call to Read.
// Users need to call Retain on that Record to keep it valid for longer.
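//
// A sequential sketch (hedged) draining the file:
//
//	for {
//		rec, err := rdr.Read()
//		if err == io.EOF {
//			break
//		}
//		if err != nil {
//			// handle error
//		}
//		// process rec; it remains valid until the next call to Read
//	}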
func (f *FileReader) Read() (rec array.Record, err error) {
if f.irec == f.NumRecords() {
return nil, io.EOF
}
rec, f.err = f.Record(f.irec)
f.irec++
return rec, f.err
}
// ReadAt reads the i-th record from the underlying stream and returns it
// together with any error encountered.
func (f *FileReader) ReadAt(i int64) (array.Record, error) {
return f.Record(int(i))
}
func newRecord(schema *arrow.Schema, meta *memory.Buffer, body ReadAtSeeker) array.Record {
var (
msg = flatbuf.GetRootAsMessage(meta.Bytes(), 0)
md flatbuf.RecordBatch
codec decompressor
)
initFB(&md, msg.Header)
rows := md.Length()
bodyCompress := md.Compression(nil)
if bodyCompress != nil {
codec = getDecompressor(bodyCompress.Codec())
}
ctx := &arrayLoaderContext{
src: ipcSource{
meta: &md,
r: body,
codec: codec,
},
max: kMaxNestingDepth,
}
cols := make([]array.Interface, len(schema.Fields()))
for i, field := range schema.Fields() {
cols[i] = ctx.loadArray(field.Type)
}
return array.NewRecord(schema, cols, rows)
}
type ipcSource struct {
meta *flatbuf.RecordBatch
r ReadAtSeeker
codec decompressor
}
func (src *ipcSource) buffer(i int) *memory.Buffer {
var buf flatbuf.Buffer
if !src.meta.Buffers(&buf, i) {
panic("buffer index out of bound")
}
if buf.Length() == 0 {
return memory.NewBufferBytes(nil)
}
var raw []byte
if src.codec == nil {
raw = make([]byte, buf.Length())
_, err := src.r.ReadAt(raw, buf.Offset())
if err != nil {
panic(err)
}
} else {
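// compressed buffers are framed as an 8-byte little-endian
// uncompressed-length prefix followed by the (possibly compressed) body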
sr := io.NewSectionReader(src.r, buf.Offset(), buf.Length())
var uncompressedSize uint64
err := binary.Read(sr, binary.LittleEndian, &uncompressedSize)
if err != nil {
panic(err)
}
var r io.Reader = sr
// a prefix of -1 signals that the body was written uncompressed
if int64(uncompressedSize) != -1 {
raw = make([]byte, uncompressedSize)
src.codec.Reset(sr)
r = src.codec
} else {
raw = make([]byte, buf.Length()-8) // skip the 8-byte length prefix
}
if _, err = io.ReadFull(r, raw); err != nil {
panic(err)
}
}
return memory.NewBufferBytes(raw)
}
func (src *ipcSource) fieldMetadata(i int) *flatbuf.FieldNode {
var node flatbuf.FieldNode
if !src.meta.Nodes(&node, i) {
panic("field metadata out of bound")
}
return &node
}
type arrayLoaderContext struct {
src ipcSource
ifield int
ibuffer int
max int
}
func (ctx *arrayLoaderContext) field() *flatbuf.FieldNode {
field := ctx.src.fieldMetadata(ctx.ifield)
ctx.ifield++
return field
}
func (ctx *arrayLoaderContext) buffer() *memory.Buffer {
buf := ctx.src.buffer(ctx.ibuffer)
ctx.ibuffer++
return buf
}
func (ctx *arrayLoaderContext) loadArray(dt arrow.DataType) array.Interface {
switch dt := dt.(type) {
case *arrow.NullType:
return ctx.loadNull()
case *arrow.BooleanType,
*arrow.Int8Type, *arrow.Int16Type, *arrow.Int32Type, *arrow.Int64Type,
*arrow.Uint8Type, *arrow.Uint16Type, *arrow.Uint32Type, *arrow.Uint64Type,
*arrow.Float16Type, *arrow.Float32Type, *arrow.Float64Type,
*arrow.Decimal128Type,
*arrow.Time32Type, *arrow.Time64Type,
*arrow.TimestampType,
*arrow.Date32Type, *arrow.Date64Type,
*arrow.MonthIntervalType, *arrow.DayTimeIntervalType, *arrow.MonthDayNanoIntervalType,
*arrow.DurationType:
return ctx.loadPrimitive(dt)
case *arrow.BinaryType, *arrow.StringType:
return ctx.loadBinary(dt)
case *arrow.FixedSizeBinaryType:
return ctx.loadFixedSizeBinary(dt)
case *arrow.ListType:
return ctx.loadList(dt)
case *arrow.FixedSizeListType:
return ctx.loadFixedSizeList(dt)
case *arrow.StructType:
return ctx.loadStruct(dt)
case *arrow.MapType:
return ctx.loadMap(dt)
case arrow.ExtensionType:
storage := ctx.loadArray(dt.StorageType())
defer storage.Release()
return array.NewExtensionArrayWithStorage(dt, storage)
default:
panic(xerrors.Errorf("array type %T not handled yet", dt))
}
}
func (ctx *arrayLoaderContext) loadCommon(nbufs int) (*flatbuf.FieldNode, []*memory.Buffer) {
buffers := make([]*memory.Buffer, 0, nbufs)
field := ctx.field()
var buf *memory.Buffer
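// a null count of zero means the writer elided the validity bitmap;
// skip its (empty) buffer slot in the metadata and record a nil bitmap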
switch field.NullCount() {
case 0:
ctx.ibuffer++
default:
buf = ctx.buffer()
}
buffers = append(buffers, buf)
return field, buffers
}
func (ctx *arrayLoaderContext) loadChild(dt arrow.DataType) array.Interface {
if ctx.max == 0 {
panic("arrow/ipc: nested type limit reached")
}
ctx.max--
sub := ctx.loadArray(dt)
ctx.max++
return sub
}
func (ctx *arrayLoaderContext) loadNull() array.Interface {
field := ctx.field()
data := array.NewData(arrow.Null, int(field.Length()), nil, nil, int(field.NullCount()), 0)
defer data.Release()
return array.MakeFromData(data)
}
func (ctx *arrayLoaderContext) loadPrimitive(dt arrow.DataType) array.Interface {
field, buffers := ctx.loadCommon(2)
switch field.Length() {
case 0:
buffers = append(buffers, nil)
ctx.ibuffer++
default:
buffers = append(buffers, ctx.buffer())
}
data := array.NewData(dt, int(field.Length()), buffers, nil, int(field.NullCount()), 0)
defer data.Release()
return array.MakeFromData(data)
}
func (ctx *arrayLoaderContext) loadBinary(dt arrow.DataType) array.Interface {
field, buffers := ctx.loadCommon(3)
buffers = append(buffers, ctx.buffer(), ctx.buffer())
data := array.NewData(dt, int(field.Length()), buffers, nil, int(field.NullCount()), 0)
defer data.Release()
return array.MakeFromData(data)
}
func (ctx *arrayLoaderContext) loadFixedSizeBinary(dt *arrow.FixedSizeBinaryType) array.Interface {
field, buffers := ctx.loadCommon(2)
buffers = append(buffers, ctx.buffer())
data := array.NewData(dt, int(field.Length()), buffers, nil, int(field.NullCount()), 0)
defer data.Release()
return array.MakeFromData(data)
}
func (ctx *arrayLoaderContext) loadMap(dt *arrow.MapType) array.Interface {
field, buffers := ctx.loadCommon(2)
buffers = append(buffers, ctx.buffer())
sub := ctx.loadChild(dt.ValueType())
defer sub.Release()
data := array.NewData(dt, int(field.Length()), buffers, []*array.Data{sub.Data()}, int(field.NullCount()), 0)
defer data.Release()
return array.NewMapData(data)
}
func (ctx *arrayLoaderContext) loadList(dt *arrow.ListType) array.Interface {
field, buffers := ctx.loadCommon(2)
buffers = append(buffers, ctx.buffer())
sub := ctx.loadChild(dt.Elem())
defer sub.Release()
data := array.NewData(dt, int(field.Length()), buffers, []*array.Data{sub.Data()}, int(field.NullCount()), 0)
defer data.Release()
return array.NewListData(data)
}
func (ctx *arrayLoaderContext) loadFixedSizeList(dt *arrow.FixedSizeListType) array.Interface {
field, buffers := ctx.loadCommon(1)
sub := ctx.loadChild(dt.Elem())
defer sub.Release()
data := array.NewData(dt, int(field.Length()), buffers, []*array.Data{sub.Data()}, int(field.NullCount()), 0)
defer data.Release()
return array.NewFixedSizeListData(data)
}
func (ctx *arrayLoaderContext) loadStruct(dt *arrow.StructType) array.Interface {
field, buffers := ctx.loadCommon(1)
arrs := make([]array.Interface, len(dt.Fields()))
subs := make([]*array.Data, len(dt.Fields()))
for i, f := range dt.Fields() {
arrs[i] = ctx.loadChild(f.Type)
subs[i] = arrs[i].Data()
}
defer func() {
for i := range arrs {
arrs[i].Release()
}
}()
data := array.NewData(dt, int(field.Length()), buffers, subs, int(field.NullCount()), 0)
defer data.Release()
return array.NewStructData(data)
}
func readDictionary(meta *memory.Buffer, types dictTypeMap, r ReadAtSeeker) (int64, array.Interface, error) {
// TODO(ipc): implement dictionary batch decoding.
// The commented sketch below outlines the intended approach.
//
// msg := flatbuf.GetRootAsMessage(meta.Bytes(), 0)
// var dictBatch flatbuf.DictionaryBatch
// initFB(&dictBatch, msg.Header)
//
// id := dictBatch.Id()
// v, ok := types[id]
// if !ok {
// 	return id, nil, xerrors.Errorf("arrow/ipc: no type metadata for dictionary with ID=%d", id)
// }
//
// fields := []arrow.Field{v}
//
// // we need a schema for the record batch.
// schema := arrow.NewSchema(fields, nil)
//
// // the dictionary is embedded in a record batch with a single column.
// recBatch := dictBatch.Data(nil)
//
// ctx := &arrayLoaderContext{
// 	src: ipcSource{
// 		meta: recBatch,
// 		r:    r,
// 	},
// 	max: kMaxNestingDepth,
// }
//
// cols := make([]array.Interface, len(schema.Fields()))
// for i, field := range schema.Fields() {
// 	cols[i] = ctx.loadArray(field.Type)
// }
//
// batch := array.NewRecord(schema, cols, recBatch.Length())
// return id, batch.Column(0), nil
panic("not implemented")
}