| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package encoding |
| |
| import ( |
| "io" |
| "sync" |
| |
| "github.com/apache/arrow/go/v6/arrow/bitutil" |
| "github.com/apache/arrow/go/v6/arrow/memory" |
| "github.com/apache/arrow/go/v6/parquet" |
| "github.com/apache/arrow/go/v6/parquet/internal/utils" |
| "golang.org/x/xerrors" |
| ) |
| |
| // TypedDecoder is the general interface for all decoder types which can |
| // then be type asserted to a specific Type Decoder |
| type TypedDecoder interface { |
| // SetData updates the data in the decoder with the passed in byte slice and the |
| // stated number of values as expected to be decoded. |
| SetData(buffered int, buf []byte) error |
| // Encoding returns the encoding type that this decoder decodes data of |
| Encoding() parquet.Encoding |
| // ValuesLeft returns the number of remaining values to be decoded |
| ValuesLeft() int |
| // Type returns the physical type this can decode. |
| Type() parquet.Type |
| } |
| |
// DictDecoder is a special TypedDecoder which implements dictionary decoding.
// The dictionary itself is supplied separately through SetDict.
type DictDecoder interface {
	TypedDecoder
	// SetDict installs a decoder that can decode the dictionary to be used
	// when resolving the values this decoder reads.
	SetDict(TypedDecoder)
}
| |
| // TypedEncoder is the general interface for all encoding types which |
| // can then be type asserted to a specific Type Encoder |
| type TypedEncoder interface { |
| // Bytes returns the current slice of bytes that have been encoded but does not pass ownership |
| Bytes() []byte |
| // Reset resets the encoder and dumps all the data to let it be reused. |
| Reset() |
| // ReserveForWrite reserves n bytes in the buffer so that the next n bytes written will not |
| // cause a memory allocation. |
| ReserveForWrite(n int) |
| // EstimatedDataEncodedSize returns the estimated number of bytes in the buffer |
| // so far. |
| EstimatedDataEncodedSize() int64 |
| // FlushValues finishes up any unwritten data and returns the buffer of data passing |
| // ownership to the caller, Release needs to be called on the Buffer to free the memory |
| // if error is nil |
| FlushValues() (Buffer, error) |
| // Encoding returns the type of encoding that this encoder operates with |
| Encoding() parquet.Encoding |
| // Allocator returns the allocator that was used when creating this encoder |
| Allocator() memory.Allocator |
| // Type returns the underlying physical type this encodes. |
| Type() parquet.Type |
| } |
| |
| // DictEncoder is a special kind of TypedEncoder which implements Dictionary |
| // encoding. |
| type DictEncoder interface { |
| TypedEncoder |
| // WriteIndices populates the byte slice with the final indexes of data and returns |
| // the number of bytes written |
| WriteIndices(out []byte) (int, error) |
| // DictEncodedSize returns the current size of the encoded dictionary index. |
| DictEncodedSize() int |
| // BitWidth returns the bitwidth needed to encode all of the index values based |
| // on the number of values in the dictionary index. |
| BitWidth() int |
| // WriteDict populates out with the dictionary index values, out should be sized to at least |
| // as many bytes as DictEncodedSize |
| WriteDict(out []byte) |
| // NumEntries returns the number of values currently in the dictionary index. |
| NumEntries() int |
| } |
| |
| var bufferPool = sync.Pool{ |
| New: func() interface{} { |
| return memory.NewResizableBuffer(memory.DefaultAllocator) |
| }, |
| } |
| |
// Buffer abstracts over byte buffers so callers can use them without
// depending on the underlying implementation.
type Buffer interface {
	// Len reports the number of valid bytes.
	Len() int
	// Bytes returns the valid contents of the buffer.
	Bytes() []byte
	// Buf returns the underlying storage, which may extend past Len.
	Buf() []byte
	// Resize sets the valid length of the buffer.
	Resize(int)
	// Release gives the memory back; the buffer must not be used afterwards.
	Release()
}
| |
// poolBuffer is a Buffer whose Release hands the wrapped memory back to
// bufferPool (when it is mutable) instead of freeing it, so it can be reused
// and the number of allocations is reduced.
type poolBuffer struct {
	// buf is the wrapped buffer, normally one taken from bufferPool.
	buf *memory.Buffer
}
| |
| func (p poolBuffer) Resize(n int) { p.buf.ResizeNoShrink(n) } |
| |
| func (p poolBuffer) Len() int { return p.buf.Len() } |
| |
| func (p poolBuffer) Bytes() []byte { return p.buf.Bytes() } |
| |
| func (p poolBuffer) Buf() []byte { return p.buf.Buf() } |
| |
| func (p poolBuffer) Release() { |
| if p.buf.Mutable() { |
| memory.Set(p.buf.Buf(), 0) |
| p.buf.ResizeNoShrink(0) |
| bufferPool.Put(p.buf) |
| return |
| } |
| |
| p.buf.Release() |
| } |
| |
// PooledBufferWriter uses buffers from the buffer pool to back it while
// implementing the io.Writer and io.WriterAt interfaces.
type PooledBufferWriter struct {
	// buf is the pooled backing buffer; Reserve takes one from bufferPool
	// when it is nil.
	buf *memory.Buffer
	// pos is the absolute write cursor into buf. It already includes offset
	// (SetOffset shifts it), so UnsafeWrite stores data at buf[pos:].
	pos int
	// offset is the number of leading bytes reserved at the front of buf;
	// Bytes and Len exclude them and WriteAt offsets are relative to it.
	offset int
}
| |
| // NewPooledBufferWriter returns a new buffer with 'initial' bytes reserved |
| // and pre-allocated to guarantee that writing that many more bytes will not |
| // require another allocation. |
| func NewPooledBufferWriter(initial int) *PooledBufferWriter { |
| ret := &PooledBufferWriter{} |
| ret.Reserve(initial) |
| return ret |
| } |
| |
| // SetOffset sets an offset in the buffer which will ensure that all references |
| // to offsets and sizes in the buffer will be offset by this many bytes, allowing |
| // the writer to reserve space in the buffer. |
| func (b *PooledBufferWriter) SetOffset(offset int) { |
| b.pos -= b.offset |
| b.offset = offset |
| b.pos += offset |
| } |
| |
| // Reserve pre-allocates nbytes to ensure that the next write of that many bytes |
| // will not require another allocation. |
| func (b *PooledBufferWriter) Reserve(nbytes int) { |
| if b.buf == nil { |
| b.buf = bufferPool.Get().(*memory.Buffer) |
| } |
| |
| newCap := utils.MaxInt(b.buf.Cap()+b.offset, 256) |
| for newCap < b.pos+nbytes { |
| newCap = bitutil.NextPowerOf2(newCap) |
| } |
| b.buf.Reserve(newCap) |
| } |
| |
| // Reset will release any current memory and initialize it with the new |
| // allocated bytes. |
| func (b *PooledBufferWriter) Reset(initial int) { |
| if b.buf != nil { |
| memory.Set(b.buf.Buf(), 0) |
| b.buf.ResizeNoShrink(0) |
| bufferPool.Put(b.buf) |
| b.buf = nil |
| } |
| |
| b.pos = 0 |
| b.offset = 0 |
| b.Reserve(initial) |
| } |
| |
| // Finish returns the current buffer, with the responsibility for releasing |
| // the memory on the caller, resetting this writer to be re-used |
| func (b *PooledBufferWriter) Finish() Buffer { |
| if b.buf.Len() < b.pos { |
| b.buf.ResizeNoShrink(b.pos) |
| } |
| buf := poolBuffer{b.buf} |
| |
| b.buf = nil |
| b.Reset(0) |
| return buf |
| } |
| |
| // WriteAt writes the bytes from p into this buffer starting at offset. |
| // |
| // Does not affect the internal position of the writer. |
| func (b *PooledBufferWriter) WriteAt(p []byte, offset int64) (n int, err error) { |
| if len(p) == 0 { |
| return 0, nil |
| } |
| offset += int64(b.offset) |
| need := int(offset) + len(p) |
| |
| if need >= b.buf.Cap() { |
| b.Reserve(need - b.pos) |
| } |
| n = copy(b.buf.Buf()[offset:], p) |
| |
| if need > b.buf.Len() { |
| b.buf.ResizeNoShrink(need) |
| } |
| return |
| } |
| |
| func (b *PooledBufferWriter) Write(buf []byte) (int, error) { |
| if len(buf) == 0 { |
| return 0, nil |
| } |
| b.Reserve(len(buf)) |
| return b.UnsafeWrite(buf) |
| } |
| |
| func (b *PooledBufferWriter) UnsafeWriteCopy(ncopies int, pattern []byte) (int, error) { |
| nbytes := len(pattern) * ncopies |
| slc := b.buf.Buf()[b.pos : b.pos+nbytes] |
| copy(slc, pattern) |
| for j := len(pattern); j < len(slc); j *= 2 { |
| copy(slc[j:], slc[:j]) |
| } |
| b.pos += nbytes |
| return nbytes, nil |
| } |
| |
| // UnsafeWrite does not check the capacity / length before writing. |
| func (b *PooledBufferWriter) UnsafeWrite(buf []byte) (n int, err error) { |
| n = copy(b.buf.Buf()[b.pos:], buf) |
| b.pos += n |
| return |
| } |
| |
| func (b *PooledBufferWriter) Tell() int64 { |
| return int64(b.pos) |
| } |
| |
| // Bytes returns the current bytes slice of slice Len |
| func (b *PooledBufferWriter) Bytes() []byte { |
| if b.buf.Len() < b.pos { |
| b.buf.ResizeNoShrink(b.pos) |
| } |
| return b.buf.Bytes()[b.offset:] |
| } |
| |
| // Len provides the current Length of the byte slice |
| func (b *PooledBufferWriter) Len() int { |
| if b.buf.Len() < b.pos { |
| b.buf.ResizeNoShrink(b.pos) |
| } |
| return b.buf.Len() - b.offset |
| } |
| |
// BufferWriter is a utility class for building and writing to a memory.Buffer
// with a given allocator. It fulfills the io.Writer, io.WriterAt and io.Seeker
// interfaces while providing the ability to pre-allocate memory.
type BufferWriter struct {
	// buffer is the backing storage; Reserve allocates it from mem when nil.
	buffer *memory.Buffer
	// pos is the write cursor measured from offset: Write and UnsafeWrite
	// store data at buffer[offset+pos:].
	pos int
	// mem is the allocator used whenever a new backing buffer is created.
	mem memory.Allocator

	// offset is the number of leading bytes reserved at the front of buffer;
	// Bytes, Len and Cap exclude them and WriteAt offsets are relative to it.
	offset int
}
| |
| // NewBufferWriterFromBuffer wraps the provided buffer to allow it to fulfill these |
| // interfaces. |
| func NewBufferWriterFromBuffer(b *memory.Buffer, mem memory.Allocator) *BufferWriter { |
| return &BufferWriter{b, 0, mem, 0} |
| } |
| |
| // NewBufferWriter constructs a buffer with initially reserved/allocated memory. |
| func NewBufferWriter(initial int, mem memory.Allocator) *BufferWriter { |
| buf := memory.NewResizableBuffer(mem) |
| buf.Reserve(initial) |
| return &BufferWriter{buffer: buf, mem: mem} |
| } |
| |
| func (b *BufferWriter) SetOffset(offset int) { |
| b.offset = offset |
| } |
| |
| // Bytes returns the current bytes slice of slice Len |
| func (b *BufferWriter) Bytes() []byte { |
| return b.buffer.Bytes()[b.offset:] |
| } |
| |
| // Len provides the current Length of the byte slice |
| func (b *BufferWriter) Len() int { |
| return b.buffer.Len() - b.offset |
| } |
| |
| // Cap returns the current capacity of the underlying buffer |
| func (b *BufferWriter) Cap() int { |
| return b.buffer.Cap() - b.offset |
| } |
| |
| // Finish returns the current buffer, with the responsibility for releasing |
| // the memory on the caller, resetting this writer to be re-used |
| func (b *BufferWriter) Finish() *memory.Buffer { |
| buf := b.buffer |
| b.buffer = nil |
| b.Reset(0) |
| return buf |
| } |
| |
| func (b *BufferWriter) Truncate() { |
| b.pos = 0 |
| b.offset = 0 |
| |
| if b.buffer == nil { |
| b.Reserve(1024) |
| } else { |
| b.buffer.ResizeNoShrink(0) |
| } |
| } |
| |
| // Reset will release any current memory and initialize it with the new |
| // allocated bytes. |
| func (b *BufferWriter) Reset(initial int) { |
| if b.buffer != nil { |
| b.buffer.Release() |
| } |
| |
| b.pos = 0 |
| b.offset = 0 |
| b.Reserve(initial) |
| } |
| |
| // Reserve ensures that there is at least enough capacity to write nbytes |
| // without another allocation, may allocate more than that in order to |
| // efficiently reduce allocations |
| func (b *BufferWriter) Reserve(nbytes int) { |
| if b.buffer == nil { |
| b.buffer = memory.NewResizableBuffer(b.mem) |
| } |
| newCap := utils.MaxInt(b.buffer.Cap()+b.offset, 256) |
| for newCap < b.pos+nbytes+b.offset { |
| newCap = bitutil.NextPowerOf2(newCap) |
| } |
| b.buffer.Reserve(newCap) |
| } |
| |
| // WriteAt writes the bytes from p into this buffer starting at offset. |
| // |
| // Does not affect the internal position of the writer. |
| func (b *BufferWriter) WriteAt(p []byte, offset int64) (n int, err error) { |
| if len(p) == 0 { |
| return 0, nil |
| } |
| offset += int64(b.offset) |
| need := int(offset) + len(p) |
| |
| if need >= b.buffer.Cap() { |
| b.Reserve(need - b.pos) |
| } |
| copy(b.buffer.Buf()[offset:], p) |
| |
| if need > b.buffer.Len() { |
| b.buffer.ResizeNoShrink(need) |
| } |
| return len(p), nil |
| } |
| |
| func (b *BufferWriter) Write(buf []byte) (int, error) { |
| if len(buf) == 0 { |
| return 0, nil |
| } |
| if b.buffer == nil { |
| b.Reserve(len(buf)) |
| } |
| |
| if b.pos+b.offset+len(buf) >= b.buffer.Cap() { |
| b.Reserve(len(buf)) |
| } |
| return b.UnsafeWrite(buf) |
| } |
| |
| func (b *BufferWriter) UnsafeWriteCopy(ncopies int, pattern []byte) (int, error) { |
| nbytes := len(pattern) * ncopies |
| slc := b.buffer.Buf()[b.pos : b.pos+nbytes] |
| copy(slc, pattern) |
| for j := len(pattern); j < len(slc); j *= 2 { |
| copy(slc[j:], slc[:j]) |
| } |
| b.pos += nbytes |
| b.buffer.ResizeNoShrink(b.pos) |
| return nbytes, nil |
| } |
| |
| // UnsafeWrite does not check the capacity / length before writing. |
| func (b *BufferWriter) UnsafeWrite(buf []byte) (int, error) { |
| copy(b.buffer.Buf()[b.pos+b.offset:], buf) |
| b.pos += len(buf) |
| b.buffer.ResizeNoShrink(b.pos) |
| return len(buf), nil |
| } |
| |
| // Seek fulfills the io.Seeker interface returning it's new position |
| // whence must be io.SeekStart, io.SeekCurrent or io.SeekEnd or it will be ignored. |
| func (b *BufferWriter) Seek(offset int64, whence int) (int64, error) { |
| newPos, offs := 0, int(offset) |
| offs += b.offset |
| switch whence { |
| case io.SeekStart: |
| newPos = offs |
| case io.SeekCurrent: |
| newPos = b.pos + offs |
| case io.SeekEnd: |
| newPos = b.buffer.Len() + offs |
| } |
| if newPos < 0 { |
| return 0, xerrors.New("negative result pos") |
| } |
| b.pos = newPos |
| return int64(newPos), nil |
| } |
| |
| func (b *BufferWriter) Tell() int64 { |
| return int64(b.pos) |
| } |