// Code generated by column_writer_types.gen.go.tmpl. DO NOT EDIT.
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package file
import (
"fmt"
"github.com/apache/arrow/go/v9/parquet"
"github.com/apache/arrow/go/v9/parquet/internal/encoding"
format "github.com/apache/arrow/go/v9/parquet/internal/gen-go/parquet"
"github.com/apache/arrow/go/v9/parquet/metadata"
"golang.org/x/xerrors"
)
// Int32ColumnChunkWriter is the typed interface for writing columns to a parquet
// file for Int32 columns.
type Int32ColumnChunkWriter struct {
columnWriter
}
// NewInt32ColumnChunkWriter constructs a new column writer using the given metadata chunk builder,
// the provided PageWriter, and the desired encoding and properties.
//
// Consumers will rarely call this directly; it is primarily used internally.
//
// ColumnChunkWriters should be acquired by using fileWriter and RowGroupWriter objects.
func NewInt32ColumnChunkWriter(meta *metadata.ColumnChunkMetaDataBuilder, pager PageWriter, useDict bool, enc parquet.Encoding, props *parquet.WriterProperties) *Int32ColumnChunkWriter {
ret := &Int32ColumnChunkWriter{columnWriter: newColumnWriterBase(meta, pager, useDict, enc, props)}
ret.currentEncoder = encoding.Int32EncoderTraits.Encoder(format.Encoding(enc), useDict, meta.Descr(), props.Allocator())
return ret
}
// WriteBatch writes a batch of repetition levels, definition levels, and values to the
// column.
//
// defLevels (resp. repLevels) may be nil if the column's max definition level
// (resp. max repetition level) is 0. If not nil, defLevels and repLevels must each
// contain at least len(values) entries.
//
// The number of physical values written (taken from values) is returned.
// It can be smaller than len(values) if there are some undefined (null) values.
//
// When using DataPageV2 to write a repeated column, rows cannot cross data
// page boundaries. To guarantee this, the writer ensures that every batch of
// w.props.BatchSize begins and ends on a row boundary. As a consequence, when
// repLevels is not nil and DataPageV2 is in use, the first value passed to
// WriteBatch must always be the beginning of a row (repLevels[0] must be 0).
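//
// The following is an illustrative sketch (an assumption for documentation, not taken
// from this package's tests) of writing three rows of an optional, non-repeated Int32
// column (max definition level 1, max repetition level 0) where the second row is null;
// w is assumed to be an *Int32ColumnChunkWriter obtained from a row group writer:
//
//	values := []int32{10, 30}     // only the defined (non-null) values
//	defLevels := []int16{1, 0, 1} // definition level 0 marks the null row
//	written, err := w.WriteBatch(values, defLevels, nil)
//	// on success, written == 2 and err == nil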
func (w *Int32ColumnChunkWriter) WriteBatch(values []int32, defLevels, repLevels []int16) (valueOffset int64, err error) {
defer func() {
if r := recover(); r != nil {
switch r := r.(type) {
case string:
err = xerrors.New(r)
case error:
err = r
default:
err = fmt.Errorf("unknown error type: %v", r)
}
}
}()
// We check for DataPage limits only after we have inserted the values. If a user
// writes a large number of values, the DataPage size can grow well above the limit.
// The purpose of this chunking is to bound that growth: even if a user writes a
// large number of values, the chunking ensures that AddDataPage() is called at a
// reasonable page size limit.
var n int64
switch {
case defLevels != nil:
n = int64(len(defLevels))
case values != nil:
n = int64(len(values))
}
w.doBatches(n, repLevels, func(offset, batch int64) {
var vals []int32
toWrite := w.writeLevels(batch, levelSliceOrNil(defLevels, offset, batch), levelSliceOrNil(repLevels, offset, batch))
if values != nil {
vals = values[valueOffset : valueOffset+toWrite]
}
w.writeValues(vals, batch-toWrite)
if err := w.commitWriteAndCheckPageLimit(batch, toWrite); err != nil {
panic(err)
}
valueOffset += toWrite
w.checkDictionarySizeLimit()
})
return
}
// WriteBatchSpaced writes a batch of repetition levels, definition levels, and values to the
// column.
//
// In comparison to WriteBatch, the length of the repetition and definition levels
// is the same as the number of values when max_definition_level == 1.
// When max_definition_level > 1, the repetition and definition levels are longer
// than the values, but the values include the null entries with
// definition_level == (max_definition_level - 1). Thus the parameters of this
// function must make clear whether the input has the length of num_values or of
// the number of rows in the lowest nesting level.
//
// If the innermost node in the Parquet schema is required, the number of rows
// in the lowest nesting level is equal to the number of non-null values. If the
// innermost schema node is optional, the number of rows in the lowest nesting level
// also includes all values with definition_level == (max_definition_level - 1).
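//
// The following is an illustrative sketch (an assumption for documentation, not taken
// from this package's tests) for the same kind of optional, non-repeated column: values
// are passed in spaced form, with placeholder slots for nulls, alongside an Arrow-style
// validity bitmap (least-significant-bit first):
//
//	spaced := []int32{10, 0, 30} // one slot per row; the null row holds a placeholder
//	defLevels := []int16{1, 0, 1}
//	validBits := []byte{0x05}    // bits 0 and 2 set -> rows 0 and 2 are valid
//	w.WriteBatchSpaced(spaced, defLevels, nil, validBits, 0)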
func (w *Int32ColumnChunkWriter) WriteBatchSpaced(values []int32, defLevels, repLevels []int16, validBits []byte, validBitsOffset int64) {
valueOffset := int64(0)
length := len(defLevels)
if defLevels == nil {
length = len(values)
}
doBatches(int64(length), w.props.WriteBatchSize(), func(offset, batch int64) {
var vals []int32
info := w.maybeCalculateValidityBits(levelSliceOrNil(defLevels, offset, batch), batch)
w.writeLevelsSpaced(batch, levelSliceOrNil(defLevels, offset, batch), levelSliceOrNil(repLevels, offset, batch))
if values != nil {
vals = values[valueOffset : valueOffset+info.numSpaced()]
}
if w.bitsBuffer != nil {
w.writeValuesSpaced(vals, info.batchNum, w.bitsBuffer.Bytes(), 0)
} else {
w.writeValuesSpaced(vals, info.batchNum, validBits, validBitsOffset+valueOffset)
}
w.commitWriteAndCheckPageLimit(batch, info.numSpaced())
valueOffset += info.numSpaced()
w.checkDictionarySizeLimit()
})
}
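// writeValues encodes the given values with the current encoder and, when page
// statistics are being collected, updates them with the supplied null count.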
func (w *Int32ColumnChunkWriter) writeValues(values []int32, numNulls int64) {
w.currentEncoder.(encoding.Int32Encoder).Put(values)
if w.pageStatistics != nil {
w.pageStatistics.(*metadata.Int32Statistics).Update(values, numNulls)
}
}
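// writeValuesSpaced encodes a spaced slice of values (placeholders included for nulls).
// When the slice length differs from numValues, the values are encoded via PutSpaced
// using the validity bitmap; otherwise they are encoded directly with Put. Page
// statistics, if enabled, are updated with the null count derived from the difference.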
func (w *Int32ColumnChunkWriter) writeValuesSpaced(spacedValues []int32, numValues int64, validBits []byte, validBitsOffset int64) {
if len(spacedValues) != int(numValues) {
w.currentEncoder.(encoding.Int32Encoder).PutSpaced(spacedValues, validBits, validBitsOffset)
} else {
w.currentEncoder.(encoding.Int32Encoder).Put(spacedValues)
}
if w.pageStatistics != nil {
nulls := int64(len(spacedValues)) - numValues
w.pageStatistics.(*metadata.Int32Statistics).UpdateSpaced(spacedValues, validBits, validBitsOffset, nulls)
}
}
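// checkDictionarySizeLimit triggers a fallback to plain encoding once the dictionary
// encoder's accumulated size reaches the configured DictionaryPageSizeLimit. It is a
// no-op for non-dictionary writers or after a fallback has already occurred.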
func (w *Int32ColumnChunkWriter) checkDictionarySizeLimit() {
if !w.hasDict || w.fallbackToNonDict {
return
}
if w.currentEncoder.(encoding.DictEncoder).DictEncodedSize() >= int(w.props.DictionaryPageSizeLimit()) {
w.fallbackToPlain()
}
}
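// fallbackToPlain, when the current encoder is dictionary based, writes out the
// dictionary page accumulated so far, flushes any buffered data pages, and switches
// the writer to plain encoding for all subsequent values.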
func (w *Int32ColumnChunkWriter) fallbackToPlain() {
if w.currentEncoder.Encoding() == parquet.Encodings.PlainDict {
w.WriteDictionaryPage()
w.FlushBufferedDataPages()
w.fallbackToNonDict = true
w.currentEncoder = encoding.Int32EncoderTraits.Encoder(format.Encoding(parquet.Encodings.Plain), false, w.descr, w.mem)
w.encoding = parquet.Encodings.Plain
}
}
// Int64ColumnChunkWriter is the typed interface for writing columns to a parquet
// file for Int64 columns.
type Int64ColumnChunkWriter struct {
columnWriter
}
// NewInt64ColumnChunkWriter constructs a new column writer using the given metadata chunk builder,
// the provided PageWriter, and the desired encoding and properties.
//
// Consumers will rarely call this directly; it is primarily used internally.
//
// ColumnChunkWriters should be acquired by using fileWriter and RowGroupWriter objects.
func NewInt64ColumnChunkWriter(meta *metadata.ColumnChunkMetaDataBuilder, pager PageWriter, useDict bool, enc parquet.Encoding, props *parquet.WriterProperties) *Int64ColumnChunkWriter {
ret := &Int64ColumnChunkWriter{columnWriter: newColumnWriterBase(meta, pager, useDict, enc, props)}
ret.currentEncoder = encoding.Int64EncoderTraits.Encoder(format.Encoding(enc), useDict, meta.Descr(), props.Allocator())
return ret
}
// WriteBatch writes a batch of repetition levels, definition levels, and values to the
// column.
//
// defLevels (resp. repLevels) may be nil if the column's max definition level
// (resp. max repetition level) is 0. If not nil, defLevels and repLevels must each
// contain at least len(values) entries.
//
// The number of physical values written (taken from values) is returned.
// It can be smaller than len(values) if there are some undefined (null) values.
//
// When using DataPageV2 to write a repeated column, rows cannot cross data
// page boundaries. To guarantee this, the writer ensures that every batch of
// w.props.BatchSize begins and ends on a row boundary. As a consequence, when
// repLevels is not nil and DataPageV2 is in use, the first value passed to
// WriteBatch must always be the beginning of a row (repLevels[0] must be 0).
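//
// As an illustrative sketch (an assumption for documentation, not taken from this
// package's tests): for a repeated Int64 primitive field (max definition level 1,
// max repetition level 1), two rows [1, 2, 3] and [4] can be written in one batch;
// repLevels[0] is 0 so the batch starts on a row boundary:
//
//	values := []int64{1, 2, 3, 4}
//	defLevels := []int16{1, 1, 1, 1}
//	repLevels := []int16{0, 1, 1, 0}
//	written, err := w.WriteBatch(values, defLevels, repLevels)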
func (w *Int64ColumnChunkWriter) WriteBatch(values []int64, defLevels, repLevels []int16) (valueOffset int64, err error) {
defer func() {
if r := recover(); r != nil {
switch r := r.(type) {
case string:
err = xerrors.New(r)
case error:
err = r
default:
err = fmt.Errorf("unknown error type: %v", r)
}
}
}()
// We check for DataPage limits only after we have inserted the values. If a user
// writes a large number of values, the DataPage size can grow well above the limit.
// The purpose of this chunking is to bound that growth: even if a user writes a
// large number of values, the chunking ensures that AddDataPage() is called at a
// reasonable page size limit.
var n int64
switch {
case defLevels != nil:
n = int64(len(defLevels))
case values != nil:
n = int64(len(values))
}
w.doBatches(n, repLevels, func(offset, batch int64) {
var vals []int64
toWrite := w.writeLevels(batch, levelSliceOrNil(defLevels, offset, batch), levelSliceOrNil(repLevels, offset, batch))
if values != nil {
vals = values[valueOffset : valueOffset+toWrite]
}
w.writeValues(vals, batch-toWrite)
if err := w.commitWriteAndCheckPageLimit(batch, toWrite); err != nil {
panic(err)
}
valueOffset += toWrite
w.checkDictionarySizeLimit()
})
return
}
// WriteBatchSpaced writes a batch of repetition levels, definition levels, and values to the
// column.
//
// In comparison to WriteBatch, the length of the repetition and definition levels
// is the same as the number of values when max_definition_level == 1.
// When max_definition_level > 1, the repetition and definition levels are longer
// than the values, but the values include the null entries with
// definition_level == (max_definition_level - 1). Thus the parameters of this
// function must make clear whether the input has the length of num_values or of
// the number of rows in the lowest nesting level.
//
// If the innermost node in the Parquet schema is required, the number of rows
// in the lowest nesting level is equal to the number of non-null values. If the
// innermost schema node is optional, the number of rows in the lowest nesting level
// also includes all values with definition_level == (max_definition_level - 1).
func (w *Int64ColumnChunkWriter) WriteBatchSpaced(values []int64, defLevels, repLevels []int16, validBits []byte, validBitsOffset int64) {
valueOffset := int64(0)
length := len(defLevels)
if defLevels == nil {
length = len(values)
}
doBatches(int64(length), w.props.WriteBatchSize(), func(offset, batch int64) {
var vals []int64
info := w.maybeCalculateValidityBits(levelSliceOrNil(defLevels, offset, batch), batch)
w.writeLevelsSpaced(batch, levelSliceOrNil(defLevels, offset, batch), levelSliceOrNil(repLevels, offset, batch))
if values != nil {
vals = values[valueOffset : valueOffset+info.numSpaced()]
}
if w.bitsBuffer != nil {
w.writeValuesSpaced(vals, info.batchNum, w.bitsBuffer.Bytes(), 0)
} else {
w.writeValuesSpaced(vals, info.batchNum, validBits, validBitsOffset+valueOffset)
}
w.commitWriteAndCheckPageLimit(batch, info.numSpaced())
valueOffset += info.numSpaced()
w.checkDictionarySizeLimit()
})
}
func (w *Int64ColumnChunkWriter) writeValues(values []int64, numNulls int64) {
w.currentEncoder.(encoding.Int64Encoder).Put(values)
if w.pageStatistics != nil {
w.pageStatistics.(*metadata.Int64Statistics).Update(values, numNulls)
}
}
func (w *Int64ColumnChunkWriter) writeValuesSpaced(spacedValues []int64, numValues int64, validBits []byte, validBitsOffset int64) {
if len(spacedValues) != int(numValues) {
w.currentEncoder.(encoding.Int64Encoder).PutSpaced(spacedValues, validBits, validBitsOffset)
} else {
w.currentEncoder.(encoding.Int64Encoder).Put(spacedValues)
}
if w.pageStatistics != nil {
nulls := int64(len(spacedValues)) - numValues
w.pageStatistics.(*metadata.Int64Statistics).UpdateSpaced(spacedValues, validBits, validBitsOffset, nulls)
}
}
func (w *Int64ColumnChunkWriter) checkDictionarySizeLimit() {
if !w.hasDict || w.fallbackToNonDict {
return
}
if w.currentEncoder.(encoding.DictEncoder).DictEncodedSize() >= int(w.props.DictionaryPageSizeLimit()) {
w.fallbackToPlain()
}
}
func (w *Int64ColumnChunkWriter) fallbackToPlain() {
if w.currentEncoder.Encoding() == parquet.Encodings.PlainDict {
w.WriteDictionaryPage()
w.FlushBufferedDataPages()
w.fallbackToNonDict = true
w.currentEncoder = encoding.Int64EncoderTraits.Encoder(format.Encoding(parquet.Encodings.Plain), false, w.descr, w.mem)
w.encoding = parquet.Encodings.Plain
}
}
// Int96ColumnChunkWriter is the typed interface for writing columns to a parquet
// file for Int96 columns.
type Int96ColumnChunkWriter struct {
columnWriter
}
// NewInt96ColumnChunkWriter constructs a new column writer using the given metadata chunk builder,
// the provided PageWriter, and the desired encoding and properties.
//
// Consumers will rarely call this directly; it is primarily used internally.
//
// ColumnChunkWriters should be acquired by using fileWriter and RowGroupWriter objects.
func NewInt96ColumnChunkWriter(meta *metadata.ColumnChunkMetaDataBuilder, pager PageWriter, useDict bool, enc parquet.Encoding, props *parquet.WriterProperties) *Int96ColumnChunkWriter {
ret := &Int96ColumnChunkWriter{columnWriter: newColumnWriterBase(meta, pager, useDict, enc, props)}
ret.currentEncoder = encoding.Int96EncoderTraits.Encoder(format.Encoding(enc), useDict, meta.Descr(), props.Allocator())
return ret
}
// WriteBatch writes a batch of repetition levels, definition levels, and values to the
// column.
//
// defLevels (resp. repLevels) may be nil if the column's max definition level
// (resp. max repetition level) is 0. If not nil, defLevels and repLevels must each
// contain at least len(values) entries.
//
// The number of physical values written (taken from values) is returned.
// It can be smaller than len(values) if there are some undefined (null) values.
//
// When using DataPageV2 to write a repeated column, rows cannot cross data
// page boundaries. To guarantee this, the writer ensures that every batch of
// w.props.BatchSize begins and ends on a row boundary. As a consequence, when
// repLevels is not nil and DataPageV2 is in use, the first value passed to
// WriteBatch must always be the beginning of a row (repLevels[0] must be 0).
func (w *Int96ColumnChunkWriter) WriteBatch(values []parquet.Int96, defLevels, repLevels []int16) (valueOffset int64, err error) {
defer func() {
if r := recover(); r != nil {
switch r := r.(type) {
case string:
err = xerrors.New(r)
case error:
err = r
default:
err = fmt.Errorf("unknown error type: %v", r)
}
}
}()
// We check for DataPage limits only after we have inserted the values. If a user
// writes a large number of values, the DataPage size can grow well above the limit.
// The purpose of this chunking is to bound that growth: even if a user writes a
// large number of values, the chunking ensures that AddDataPage() is called at a
// reasonable page size limit.
var n int64
switch {
case defLevels != nil:
n = int64(len(defLevels))
case values != nil:
n = int64(len(values))
}
w.doBatches(n, repLevels, func(offset, batch int64) {
var vals []parquet.Int96
toWrite := w.writeLevels(batch, levelSliceOrNil(defLevels, offset, batch), levelSliceOrNil(repLevels, offset, batch))
if values != nil {
vals = values[valueOffset : valueOffset+toWrite]
}
w.writeValues(vals, batch-toWrite)
if err := w.commitWriteAndCheckPageLimit(batch, toWrite); err != nil {
panic(err)
}
valueOffset += toWrite
w.checkDictionarySizeLimit()
})
return
}
// WriteBatchSpaced writes a batch of repetition levels, definition levels, and values to the
// column.
//
// In comparison to WriteBatch, the length of the repetition and definition levels
// is the same as the number of values when max_definition_level == 1.
// When max_definition_level > 1, the repetition and definition levels are longer
// than the values, but the values include the null entries with
// definition_level == (max_definition_level - 1). Thus the parameters of this
// function must make clear whether the input has the length of num_values or of
// the number of rows in the lowest nesting level.
//
// If the innermost node in the Parquet schema is required, the number of rows
// in the lowest nesting level is equal to the number of non-null values. If the
// innermost schema node is optional, the number of rows in the lowest nesting level
// also includes all values with definition_level == (max_definition_level - 1).
func (w *Int96ColumnChunkWriter) WriteBatchSpaced(values []parquet.Int96, defLevels, repLevels []int16, validBits []byte, validBitsOffset int64) {
valueOffset := int64(0)
length := len(defLevels)
if defLevels == nil {
length = len(values)
}
doBatches(int64(length), w.props.WriteBatchSize(), func(offset, batch int64) {
var vals []parquet.Int96
info := w.maybeCalculateValidityBits(levelSliceOrNil(defLevels, offset, batch), batch)
w.writeLevelsSpaced(batch, levelSliceOrNil(defLevels, offset, batch), levelSliceOrNil(repLevels, offset, batch))
if values != nil {
vals = values[valueOffset : valueOffset+info.numSpaced()]
}
if w.bitsBuffer != nil {
w.writeValuesSpaced(vals, info.batchNum, w.bitsBuffer.Bytes(), 0)
} else {
w.writeValuesSpaced(vals, info.batchNum, validBits, validBitsOffset+valueOffset)
}
w.commitWriteAndCheckPageLimit(batch, info.numSpaced())
valueOffset += info.numSpaced()
w.checkDictionarySizeLimit()
})
}
func (w *Int96ColumnChunkWriter) writeValues(values []parquet.Int96, numNulls int64) {
w.currentEncoder.(encoding.Int96Encoder).Put(values)
if w.pageStatistics != nil {
w.pageStatistics.(*metadata.Int96Statistics).Update(values, numNulls)
}
}
func (w *Int96ColumnChunkWriter) writeValuesSpaced(spacedValues []parquet.Int96, numValues int64, validBits []byte, validBitsOffset int64) {
if len(spacedValues) != int(numValues) {
w.currentEncoder.(encoding.Int96Encoder).PutSpaced(spacedValues, validBits, validBitsOffset)
} else {
w.currentEncoder.(encoding.Int96Encoder).Put(spacedValues)
}
if w.pageStatistics != nil {
nulls := int64(len(spacedValues)) - numValues
w.pageStatistics.(*metadata.Int96Statistics).UpdateSpaced(spacedValues, validBits, validBitsOffset, nulls)
}
}
func (w *Int96ColumnChunkWriter) checkDictionarySizeLimit() {
if !w.hasDict || w.fallbackToNonDict {
return
}
if w.currentEncoder.(encoding.DictEncoder).DictEncodedSize() >= int(w.props.DictionaryPageSizeLimit()) {
w.fallbackToPlain()
}
}
func (w *Int96ColumnChunkWriter) fallbackToPlain() {
if w.currentEncoder.Encoding() == parquet.Encodings.PlainDict {
w.WriteDictionaryPage()
w.FlushBufferedDataPages()
w.fallbackToNonDict = true
w.currentEncoder = encoding.Int96EncoderTraits.Encoder(format.Encoding(parquet.Encodings.Plain), false, w.descr, w.mem)
w.encoding = parquet.Encodings.Plain
}
}
// Float32ColumnChunkWriter is the typed interface for writing columns to a parquet
// file for Float32 columns.
type Float32ColumnChunkWriter struct {
columnWriter
}
// NewFloat32ColumnChunkWriter constructs a new column writer using the given metadata chunk builder,
// the provided PageWriter, and the desired encoding and properties.
//
// Consumers will rarely call this directly; it is primarily used internally.
//
// ColumnChunkWriters should be acquired by using fileWriter and RowGroupWriter objects.
func NewFloat32ColumnChunkWriter(meta *metadata.ColumnChunkMetaDataBuilder, pager PageWriter, useDict bool, enc parquet.Encoding, props *parquet.WriterProperties) *Float32ColumnChunkWriter {
ret := &Float32ColumnChunkWriter{columnWriter: newColumnWriterBase(meta, pager, useDict, enc, props)}
ret.currentEncoder = encoding.Float32EncoderTraits.Encoder(format.Encoding(enc), useDict, meta.Descr(), props.Allocator())
return ret
}
// WriteBatch writes a batch of repetition levels, definition levels, and values to the
// column.
//
// defLevels (resp. repLevels) may be nil if the column's max definition level
// (resp. max repetition level) is 0. If not nil, defLevels and repLevels must each
// contain at least len(values) entries.
//
// The number of physical values written (taken from values) is returned.
// It can be smaller than len(values) if there are some undefined (null) values.
//
// When using DataPageV2 to write a repeated column, rows cannot cross data
// page boundaries. To guarantee this, the writer ensures that every batch of
// w.props.BatchSize begins and ends on a row boundary. As a consequence, when
// repLevels is not nil and DataPageV2 is in use, the first value passed to
// WriteBatch must always be the beginning of a row (repLevels[0] must be 0).
func (w *Float32ColumnChunkWriter) WriteBatch(values []float32, defLevels, repLevels []int16) (valueOffset int64, err error) {
defer func() {
if r := recover(); r != nil {
switch r := r.(type) {
case string:
err = xerrors.New(r)
case error:
err = r
default:
err = fmt.Errorf("unknown error type: %v", r)
}
}
}()
// We check for DataPage limits only after we have inserted the values. If a user
// writes a large number of values, the DataPage size can grow well above the limit.
// The purpose of this chunking is to bound that growth: even if a user writes a
// large number of values, the chunking ensures that AddDataPage() is called at a
// reasonable page size limit.
var n int64
switch {
case defLevels != nil:
n = int64(len(defLevels))
case values != nil:
n = int64(len(values))
}
w.doBatches(n, repLevels, func(offset, batch int64) {
var vals []float32
toWrite := w.writeLevels(batch, levelSliceOrNil(defLevels, offset, batch), levelSliceOrNil(repLevels, offset, batch))
if values != nil {
vals = values[valueOffset : valueOffset+toWrite]
}
w.writeValues(vals, batch-toWrite)
if err := w.commitWriteAndCheckPageLimit(batch, toWrite); err != nil {
panic(err)
}
valueOffset += toWrite
w.checkDictionarySizeLimit()
})
return
}
// WriteBatchSpaced writes a batch of repetition levels, definition levels, and values to the
// column.
//
// In comparison to WriteBatch, the length of the repetition and definition levels
// is the same as the number of values when max_definition_level == 1.
// When max_definition_level > 1, the repetition and definition levels are longer
// than the values, but the values include the null entries with
// definition_level == (max_definition_level - 1). Thus the parameters of this
// function must make clear whether the input has the length of num_values or of
// the number of rows in the lowest nesting level.
//
// If the innermost node in the Parquet schema is required, the number of rows
// in the lowest nesting level is equal to the number of non-null values. If the
// innermost schema node is optional, the number of rows in the lowest nesting level
// also includes all values with definition_level == (max_definition_level - 1).
func (w *Float32ColumnChunkWriter) WriteBatchSpaced(values []float32, defLevels, repLevels []int16, validBits []byte, validBitsOffset int64) {
valueOffset := int64(0)
length := len(defLevels)
if defLevels == nil {
length = len(values)
}
doBatches(int64(length), w.props.WriteBatchSize(), func(offset, batch int64) {
var vals []float32
info := w.maybeCalculateValidityBits(levelSliceOrNil(defLevels, offset, batch), batch)
w.writeLevelsSpaced(batch, levelSliceOrNil(defLevels, offset, batch), levelSliceOrNil(repLevels, offset, batch))
if values != nil {
vals = values[valueOffset : valueOffset+info.numSpaced()]
}
if w.bitsBuffer != nil {
w.writeValuesSpaced(vals, info.batchNum, w.bitsBuffer.Bytes(), 0)
} else {
w.writeValuesSpaced(vals, info.batchNum, validBits, validBitsOffset+valueOffset)
}
w.commitWriteAndCheckPageLimit(batch, info.numSpaced())
valueOffset += info.numSpaced()
w.checkDictionarySizeLimit()
})
}
func (w *Float32ColumnChunkWriter) writeValues(values []float32, numNulls int64) {
w.currentEncoder.(encoding.Float32Encoder).Put(values)
if w.pageStatistics != nil {
w.pageStatistics.(*metadata.Float32Statistics).Update(values, numNulls)
}
}
func (w *Float32ColumnChunkWriter) writeValuesSpaced(spacedValues []float32, numValues int64, validBits []byte, validBitsOffset int64) {
if len(spacedValues) != int(numValues) {
w.currentEncoder.(encoding.Float32Encoder).PutSpaced(spacedValues, validBits, validBitsOffset)
} else {
w.currentEncoder.(encoding.Float32Encoder).Put(spacedValues)
}
if w.pageStatistics != nil {
nulls := int64(len(spacedValues)) - numValues
w.pageStatistics.(*metadata.Float32Statistics).UpdateSpaced(spacedValues, validBits, validBitsOffset, nulls)
}
}
func (w *Float32ColumnChunkWriter) checkDictionarySizeLimit() {
if !w.hasDict || w.fallbackToNonDict {
return
}
if w.currentEncoder.(encoding.DictEncoder).DictEncodedSize() >= int(w.props.DictionaryPageSizeLimit()) {
w.fallbackToPlain()
}
}
func (w *Float32ColumnChunkWriter) fallbackToPlain() {
if w.currentEncoder.Encoding() == parquet.Encodings.PlainDict {
w.WriteDictionaryPage()
w.FlushBufferedDataPages()
w.fallbackToNonDict = true
w.currentEncoder = encoding.Float32EncoderTraits.Encoder(format.Encoding(parquet.Encodings.Plain), false, w.descr, w.mem)
w.encoding = parquet.Encodings.Plain
}
}
// Float64ColumnChunkWriter is the typed interface for writing columns to a parquet
// file for Float64 columns.
type Float64ColumnChunkWriter struct {
columnWriter
}
// NewFloat64ColumnChunkWriter constructs a new column writer using the given metadata chunk builder,
// the provided PageWriter, and the desired encoding and properties.
//
// Consumers will rarely call this directly; it is primarily used internally.
//
// ColumnChunkWriters should be acquired by using fileWriter and RowGroupWriter objects.
func NewFloat64ColumnChunkWriter(meta *metadata.ColumnChunkMetaDataBuilder, pager PageWriter, useDict bool, enc parquet.Encoding, props *parquet.WriterProperties) *Float64ColumnChunkWriter {
ret := &Float64ColumnChunkWriter{columnWriter: newColumnWriterBase(meta, pager, useDict, enc, props)}
ret.currentEncoder = encoding.Float64EncoderTraits.Encoder(format.Encoding(enc), useDict, meta.Descr(), props.Allocator())
return ret
}
// WriteBatch writes a batch of repetition levels, definition levels, and values to the
// column.
//
// defLevels (resp. repLevels) may be nil if the column's max definition level
// (resp. max repetition level) is 0. If not nil, defLevels and repLevels must each
// contain at least len(values) entries.
//
// The number of physical values written (taken from values) is returned.
// It can be smaller than len(values) if there are some undefined (null) values.
//
// When using DataPageV2 to write a repeated column, rows cannot cross data
// page boundaries. To guarantee this, the writer ensures that every batch of
// w.props.BatchSize begins and ends on a row boundary. As a consequence, when
// repLevels is not nil and DataPageV2 is in use, the first value passed to
// WriteBatch must always be the beginning of a row (repLevels[0] must be 0).
func (w *Float64ColumnChunkWriter) WriteBatch(values []float64, defLevels, repLevels []int16) (valueOffset int64, err error) {
defer func() {
if r := recover(); r != nil {
switch r := r.(type) {
case string:
err = xerrors.New(r)
case error:
err = r
default:
err = fmt.Errorf("unknown error type: %v", r)
}
}
}()
// We check for DataPage limits only after we have inserted the values. If a user
// writes a large number of values, the DataPage size can grow well above the limit.
// The purpose of this chunking is to bound that growth: even if a user writes a
// large number of values, the chunking ensures that AddDataPage() is called at a
// reasonable page size limit.
var n int64
switch {
case defLevels != nil:
n = int64(len(defLevels))
case values != nil:
n = int64(len(values))
}
w.doBatches(n, repLevels, func(offset, batch int64) {
var vals []float64
toWrite := w.writeLevels(batch, levelSliceOrNil(defLevels, offset, batch), levelSliceOrNil(repLevels, offset, batch))
if values != nil {
vals = values[valueOffset : valueOffset+toWrite]
}
w.writeValues(vals, batch-toWrite)
if err := w.commitWriteAndCheckPageLimit(batch, toWrite); err != nil {
panic(err)
}
valueOffset += toWrite
w.checkDictionarySizeLimit()
})
return
}
// WriteBatchSpaced writes a batch of repetition levels, definition levels, and values to the
// column.
//
// In comparison to WriteBatch, the length of the repetition and definition levels
// is the same as the number of values when max_definition_level == 1.
// When max_definition_level > 1, the repetition and definition levels are longer
// than the values, but the values include the null entries with
// definition_level == (max_definition_level - 1). Thus the parameters of this
// function must make clear whether the input has the length of num_values or of
// the number of rows in the lowest nesting level.
//
// If the innermost node in the Parquet schema is required, the number of rows
// in the lowest nesting level is equal to the number of non-null values. If the
// innermost schema node is optional, the number of rows in the lowest nesting level
// also includes all values with definition_level == (max_definition_level - 1).
func (w *Float64ColumnChunkWriter) WriteBatchSpaced(values []float64, defLevels, repLevels []int16, validBits []byte, validBitsOffset int64) {
valueOffset := int64(0)
length := len(defLevels)
if defLevels == nil {
length = len(values)
}
doBatches(int64(length), w.props.WriteBatchSize(), func(offset, batch int64) {
var vals []float64
info := w.maybeCalculateValidityBits(levelSliceOrNil(defLevels, offset, batch), batch)
w.writeLevelsSpaced(batch, levelSliceOrNil(defLevels, offset, batch), levelSliceOrNil(repLevels, offset, batch))
if values != nil {
vals = values[valueOffset : valueOffset+info.numSpaced()]
}
if w.bitsBuffer != nil {
w.writeValuesSpaced(vals, info.batchNum, w.bitsBuffer.Bytes(), 0)
} else {
w.writeValuesSpaced(vals, info.batchNum, validBits, validBitsOffset+valueOffset)
}
w.commitWriteAndCheckPageLimit(batch, info.numSpaced())
valueOffset += info.numSpaced()
w.checkDictionarySizeLimit()
})
}
func (w *Float64ColumnChunkWriter) writeValues(values []float64, numNulls int64) {
w.currentEncoder.(encoding.Float64Encoder).Put(values)
if w.pageStatistics != nil {
w.pageStatistics.(*metadata.Float64Statistics).Update(values, numNulls)
}
}
func (w *Float64ColumnChunkWriter) writeValuesSpaced(spacedValues []float64, numValues int64, validBits []byte, validBitsOffset int64) {
if len(spacedValues) != int(numValues) {
w.currentEncoder.(encoding.Float64Encoder).PutSpaced(spacedValues, validBits, validBitsOffset)
} else {
w.currentEncoder.(encoding.Float64Encoder).Put(spacedValues)
}
if w.pageStatistics != nil {
nulls := int64(len(spacedValues)) - numValues
w.pageStatistics.(*metadata.Float64Statistics).UpdateSpaced(spacedValues, validBits, validBitsOffset, nulls)
}
}
func (w *Float64ColumnChunkWriter) checkDictionarySizeLimit() {
if !w.hasDict || w.fallbackToNonDict {
return
}
if w.currentEncoder.(encoding.DictEncoder).DictEncodedSize() >= int(w.props.DictionaryPageSizeLimit()) {
w.fallbackToPlain()
}
}
func (w *Float64ColumnChunkWriter) fallbackToPlain() {
if w.currentEncoder.Encoding() == parquet.Encodings.PlainDict {
w.WriteDictionaryPage()
w.FlushBufferedDataPages()
w.fallbackToNonDict = true
w.currentEncoder = encoding.Float64EncoderTraits.Encoder(format.Encoding(parquet.Encodings.Plain), false, w.descr, w.mem)
w.encoding = parquet.Encodings.Plain
}
}
// BooleanColumnChunkWriter is the typed interface for writing columns to a parquet
// file for Boolean columns.
type BooleanColumnChunkWriter struct {
columnWriter
}
// NewBooleanColumnChunkWriter constructs a new column writer using the given metadata chunk builder,
// the provided PageWriter, and the desired encoding and properties.
//
// Consumers will rarely call this directly; it is primarily used internally.
//
// ColumnChunkWriters should be acquired by using fileWriter and RowGroupWriter objects.
func NewBooleanColumnChunkWriter(meta *metadata.ColumnChunkMetaDataBuilder, pager PageWriter, useDict bool, enc parquet.Encoding, props *parquet.WriterProperties) *BooleanColumnChunkWriter {
if useDict {
panic("cannot use dictionary for boolean writer")
}
ret := &BooleanColumnChunkWriter{columnWriter: newColumnWriterBase(meta, pager, useDict, enc, props)}
ret.currentEncoder = encoding.BooleanEncoderTraits.Encoder(format.Encoding(enc), useDict, meta.Descr(), props.Allocator())
return ret
}
// WriteBatch writes a batch of repetition levels, definition levels, and values to the
// column.
//
// defLevels (resp. repLevels) may be nil if the column's max definition level
// (resp. max repetition level) is 0. If not nil, defLevels and repLevels must each
// contain at least len(values) entries.
//
// The number of physical values written (taken from values) is returned.
// It can be smaller than len(values) if there are some undefined (null) values.
//
// When using DataPageV2 to write a repeated column, rows cannot cross data
// page boundaries. To guarantee this, the writer ensures that every batch of
// w.props.BatchSize begins and ends on a row boundary. As a consequence, when
// repLevels is not nil and DataPageV2 is in use, the first value passed to
// WriteBatch must always be the beginning of a row (repLevels[0] must be 0).
func (w *BooleanColumnChunkWriter) WriteBatch(values []bool, defLevels, repLevels []int16) (valueOffset int64, err error) {
defer func() {
if r := recover(); r != nil {
switch r := r.(type) {
case string:
err = xerrors.New(r)
case error:
err = r
default:
err = fmt.Errorf("unknown error type: %v", r)
}
}
}()
// We check for DataPage limits only after we have inserted the values. If a user
// writes a large number of values, the DataPage size can grow well above the limit.
// The purpose of this chunking is to bound that growth: even if a user writes a
// large number of values, the chunking ensures that AddDataPage() is called at a
// reasonable page size limit.
var n int64
switch {
case defLevels != nil:
n = int64(len(defLevels))
case values != nil:
n = int64(len(values))
}
w.doBatches(n, repLevels, func(offset, batch int64) {
var vals []bool
toWrite := w.writeLevels(batch, levelSliceOrNil(defLevels, offset, batch), levelSliceOrNil(repLevels, offset, batch))
if values != nil {
vals = values[valueOffset : valueOffset+toWrite]
}
w.writeValues(vals, batch-toWrite)
if err := w.commitWriteAndCheckPageLimit(batch, toWrite); err != nil {
panic(err)
}
valueOffset += toWrite
w.checkDictionarySizeLimit()
})
return
}
// WriteBatchSpaced writes a batch of repetition levels, definition levels, and values to the
// column.
//
// In comparison to WriteBatch, the length of the repetition and definition levels
// is the same as the number of values when max_definition_level == 1.
// When max_definition_level > 1, the repetition and definition levels are longer
// than the values, but the values include the null entries with
// definition_level == (max_definition_level - 1). Thus the parameters of this
// function must make clear whether the input has the length of num_values or of
// the number of rows in the lowest nesting level.
//
// If the innermost node in the Parquet schema is required, the number of rows
// in the lowest nesting level is equal to the number of non-null values. If the
// innermost schema node is optional, the number of rows in the lowest nesting level
// also includes all values with definition_level == (max_definition_level - 1).
func (w *BooleanColumnChunkWriter) WriteBatchSpaced(values []bool, defLevels, repLevels []int16, validBits []byte, validBitsOffset int64) {
valueOffset := int64(0)
length := len(defLevels)
if defLevels == nil {
length = len(values)
}
doBatches(int64(length), w.props.WriteBatchSize(), func(offset, batch int64) {
var vals []bool
info := w.maybeCalculateValidityBits(levelSliceOrNil(defLevels, offset, batch), batch)
w.writeLevelsSpaced(batch, levelSliceOrNil(defLevels, offset, batch), levelSliceOrNil(repLevels, offset, batch))
if values != nil {
vals = values[valueOffset : valueOffset+info.numSpaced()]
}
if w.bitsBuffer != nil {
w.writeValuesSpaced(vals, info.batchNum, w.bitsBuffer.Bytes(), 0)
} else {
w.writeValuesSpaced(vals, info.batchNum, validBits, validBitsOffset+valueOffset)
}
w.commitWriteAndCheckPageLimit(batch, info.numSpaced())
valueOffset += info.numSpaced()
w.checkDictionarySizeLimit()
})
}
func (w *BooleanColumnChunkWriter) writeValues(values []bool, numNulls int64) {
w.currentEncoder.(encoding.BooleanEncoder).Put(values)
if w.pageStatistics != nil {
w.pageStatistics.(*metadata.BooleanStatistics).Update(values, numNulls)
}
}
func (w *BooleanColumnChunkWriter) writeValuesSpaced(spacedValues []bool, numValues int64, validBits []byte, validBitsOffset int64) {
if len(spacedValues) != int(numValues) {
w.currentEncoder.(encoding.BooleanEncoder).PutSpaced(spacedValues, validBits, validBitsOffset)
} else {
w.currentEncoder.(encoding.BooleanEncoder).Put(spacedValues)
}
if w.pageStatistics != nil {
nulls := int64(len(spacedValues)) - numValues
w.pageStatistics.(*metadata.BooleanStatistics).UpdateSpaced(spacedValues, validBits, validBitsOffset, nulls)
}
}
func (w *BooleanColumnChunkWriter) checkDictionarySizeLimit() {
if !w.hasDict || w.fallbackToNonDict {
return
}
if w.currentEncoder.(encoding.DictEncoder).DictEncodedSize() >= int(w.props.DictionaryPageSizeLimit()) {
w.fallbackToPlain()
}
}
func (w *BooleanColumnChunkWriter) fallbackToPlain() {
if w.currentEncoder.Encoding() == parquet.Encodings.PlainDict {
w.WriteDictionaryPage()
w.FlushBufferedDataPages()
w.fallbackToNonDict = true
w.currentEncoder = encoding.BooleanEncoderTraits.Encoder(format.Encoding(parquet.Encodings.Plain), false, w.descr, w.mem)
w.encoding = parquet.Encodings.Plain
}
}
// ByteArrayColumnChunkWriter is the typed interface for writing columns to a parquet
// file for ByteArray columns.
type ByteArrayColumnChunkWriter struct {
columnWriter
}
// NewByteArrayColumnChunkWriter constructs a new column writer using the given metadata chunk builder,
// the provided PageWriter, and the desired encoding and properties.
//
// Consumers will rarely call this directly; it is primarily used internally.
//
// ColumnChunkWriters should be acquired by using fileWriter and RowGroupWriter objects.
func NewByteArrayColumnChunkWriter(meta *metadata.ColumnChunkMetaDataBuilder, pager PageWriter, useDict bool, enc parquet.Encoding, props *parquet.WriterProperties) *ByteArrayColumnChunkWriter {
ret := &ByteArrayColumnChunkWriter{columnWriter: newColumnWriterBase(meta, pager, useDict, enc, props)}
ret.currentEncoder = encoding.ByteArrayEncoderTraits.Encoder(format.Encoding(enc), useDict, meta.Descr(), props.Allocator())
return ret
}
// WriteBatch writes a batch of repetition levels, definition levels, and values to the
// column.
//
// defLevels (resp. repLevels) may be nil if the column's max definition level
// (resp. max repetition level) is 0. If not nil, defLevels and repLevels must each
// contain at least len(values) entries.
//
// The number of physical values written (taken from values) is returned.
// It can be smaller than len(values) if there are some undefined (null) values.
//
// When using DataPageV2 to write a repeated column, rows cannot cross data
// page boundaries. To guarantee this, the writer ensures that every batch of
// w.props.BatchSize begins and ends on a row boundary. As a consequence, when
// repLevels is not nil and DataPageV2 is in use, the first value passed to
// WriteBatch must always be the beginning of a row (repLevels[0] must be 0).
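//
// As an illustrative sketch (an assumption for documentation, not taken from this
// package's tests): for a required, non-repeated ByteArray column (max definition
// and repetition levels 0), values can be written without any levels:
//
//	vals := []parquet.ByteArray{
//		parquet.ByteArray("hello"),
//		parquet.ByteArray("parquet"),
//	}
//	written, err := w.WriteBatch(vals, nil, nil)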
func (w *ByteArrayColumnChunkWriter) WriteBatch(values []parquet.ByteArray, defLevels, repLevels []int16) (valueOffset int64, err error) {
defer func() {
if r := recover(); r != nil {
switch r := r.(type) {
case string:
err = xerrors.New(r)
case error:
err = r
default:
err = fmt.Errorf("unknown error type: %v", r)
}
}
}()
// We check for DataPage limits only after we have inserted the values. If a user
// writes a large number of values, the DataPage size can grow well above the limit.
// The purpose of this chunking is to bound that growth: even if a user writes a
// large number of values, the chunking ensures that AddDataPage() is called at a
// reasonable page size limit.
var n int64
switch {
case defLevels != nil:
n = int64(len(defLevels))
case values != nil:
n = int64(len(values))
}
w.doBatches(n, repLevels, func(offset, batch int64) {
var vals []parquet.ByteArray
toWrite := w.writeLevels(batch, levelSliceOrNil(defLevels, offset, batch), levelSliceOrNil(repLevels, offset, batch))
if values != nil {
vals = values[valueOffset : valueOffset+toWrite]
}
w.writeValues(vals, batch-toWrite)
if err := w.commitWriteAndCheckPageLimit(batch, toWrite); err != nil {
panic(err)
}
valueOffset += toWrite
w.checkDictionarySizeLimit()
})
return
}
// WriteBatchSpaced writes a batch of repetition levels, definition levels, and values to the
// column.
//
// In comparison to WriteBatch, the length of the repetition and definition levels
// is the same as the number of values when max_definition_level == 1.
// When max_definition_level > 1, the repetition and definition levels are longer
// than the values, but the values include the null entries with
// definition_level == (max_definition_level - 1). Thus the parameters of this
// function must make clear whether the input has the length of num_values or of
// the number of rows in the lowest nesting level.
//
// If the innermost node in the Parquet schema is required, the number of rows
// in the lowest nesting level is equal to the number of non-null values. If the
// innermost schema node is optional, the number of rows in the lowest nesting level
// also includes all values with definition_level == (max_definition_level - 1).
func (w *ByteArrayColumnChunkWriter) WriteBatchSpaced(values []parquet.ByteArray, defLevels, repLevels []int16, validBits []byte, validBitsOffset int64) {
valueOffset := int64(0)
length := len(defLevels)
if defLevels == nil {
length = len(values)
}
doBatches(int64(length), w.props.WriteBatchSize(), func(offset, batch int64) {
var vals []parquet.ByteArray
info := w.maybeCalculateValidityBits(levelSliceOrNil(defLevels, offset, batch), batch)
w.writeLevelsSpaced(batch, levelSliceOrNil(defLevels, offset, batch), levelSliceOrNil(repLevels, offset, batch))
if values != nil {
vals = values[valueOffset : valueOffset+info.numSpaced()]
}
if w.bitsBuffer != nil {
w.writeValuesSpaced(vals, info.batchNum, w.bitsBuffer.Bytes(), 0)
} else {
w.writeValuesSpaced(vals, info.batchNum, validBits, validBitsOffset+valueOffset)
}
w.commitWriteAndCheckPageLimit(batch, info.numSpaced())
valueOffset += info.numSpaced()
w.checkDictionarySizeLimit()
})
}
func (w *ByteArrayColumnChunkWriter) writeValues(values []parquet.ByteArray, numNulls int64) {
w.currentEncoder.(encoding.ByteArrayEncoder).Put(values)
if w.pageStatistics != nil {
w.pageStatistics.(*metadata.ByteArrayStatistics).Update(values, numNulls)
}
}
func (w *ByteArrayColumnChunkWriter) writeValuesSpaced(spacedValues []parquet.ByteArray, numValues int64, validBits []byte, validBitsOffset int64) {
if len(spacedValues) != int(numValues) {
w.currentEncoder.(encoding.ByteArrayEncoder).PutSpaced(spacedValues, validBits, validBitsOffset)
} else {
w.currentEncoder.(encoding.ByteArrayEncoder).Put(spacedValues)
}
if w.pageStatistics != nil {
nulls := int64(len(spacedValues)) - numValues
w.pageStatistics.(*metadata.ByteArrayStatistics).UpdateSpaced(spacedValues, validBits, validBitsOffset, nulls)
}
}
func (w *ByteArrayColumnChunkWriter) checkDictionarySizeLimit() {
if !w.hasDict || w.fallbackToNonDict {
return
}
if w.currentEncoder.(encoding.DictEncoder).DictEncodedSize() >= int(w.props.DictionaryPageSizeLimit()) {
w.fallbackToPlain()
}
}
func (w *ByteArrayColumnChunkWriter) fallbackToPlain() {
if w.currentEncoder.Encoding() == parquet.Encodings.PlainDict {
w.WriteDictionaryPage()
w.FlushBufferedDataPages()
w.fallbackToNonDict = true
w.currentEncoder = encoding.ByteArrayEncoderTraits.Encoder(format.Encoding(parquet.Encodings.Plain), false, w.descr, w.mem)
w.encoding = parquet.Encodings.Plain
}
}
// FixedLenByteArrayColumnChunkWriter is the typed interface for writing columns to a parquet
// file for FixedLenByteArray columns.
type FixedLenByteArrayColumnChunkWriter struct {
columnWriter
}
// NewFixedLenByteArrayColumnChunkWriter constructs a new column writer using the given metadata chunk builder,
// the provided PageWriter, and the desired encoding and properties.
//
// Consumers will rarely call this directly; it is primarily used internally.
//
// ColumnChunkWriters should be acquired by using fileWriter and RowGroupWriter objects.
func NewFixedLenByteArrayColumnChunkWriter(meta *metadata.ColumnChunkMetaDataBuilder, pager PageWriter, useDict bool, enc parquet.Encoding, props *parquet.WriterProperties) *FixedLenByteArrayColumnChunkWriter {
ret := &FixedLenByteArrayColumnChunkWriter{columnWriter: newColumnWriterBase(meta, pager, useDict, enc, props)}
ret.currentEncoder = encoding.FixedLenByteArrayEncoderTraits.Encoder(format.Encoding(enc), useDict, meta.Descr(), props.Allocator())
return ret
}
// WriteBatch writes a batch of repetition levels, definition levels, and values to the
// column.
//
// defLevels (resp. repLevels) may be nil if the column's max definition level
// (resp. max repetition level) is 0. If not nil, defLevels and repLevels must each
// contain at least len(values) entries.
//
// The number of physical values written (taken from values) is returned.
// It can be smaller than len(values) if there are some undefined (null) values.
//
// When using DataPageV2 to write a repeated column, rows cannot cross data
// page boundaries. To guarantee this, the writer ensures that every batch of
// w.props.BatchSize begins and ends on a row boundary. As a consequence, when
// repLevels is not nil and DataPageV2 is in use, the first value passed to
// WriteBatch must always be the beginning of a row (repLevels[0] must be 0).
func (w *FixedLenByteArrayColumnChunkWriter) WriteBatch(values []parquet.FixedLenByteArray, defLevels, repLevels []int16) (valueOffset int64, err error) {
defer func() {
if r := recover(); r != nil {
switch r := r.(type) {
case string:
err = xerrors.New(r)
case error:
err = r
default:
err = fmt.Errorf("unknown error type: %v", r)
}
}
}()
// We check for DataPage limits only after we have inserted the values. If a user
// writes a large number of values, the DataPage size can grow well above the limit.
// The purpose of this chunking is to bound that growth: even if a user writes a
// large number of values, the chunking ensures that AddDataPage() is called at a
// reasonable page size limit.
var n int64
switch {
case defLevels != nil:
n = int64(len(defLevels))
case values != nil:
n = int64(len(values))
}
w.doBatches(n, repLevels, func(offset, batch int64) {
var vals []parquet.FixedLenByteArray
toWrite := w.writeLevels(batch, levelSliceOrNil(defLevels, offset, batch), levelSliceOrNil(repLevels, offset, batch))
if values != nil {
vals = values[valueOffset : valueOffset+toWrite]
}
w.writeValues(vals, batch-toWrite)
if err := w.commitWriteAndCheckPageLimit(batch, toWrite); err != nil {
panic(err)
}
valueOffset += toWrite
w.checkDictionarySizeLimit()
})
return
}
// WriteBatchSpaced writes a batch of repetition levels, definition levels, and values to the
// column.
//
// In comparison to WriteBatch, the length of the repetition and definition levels
// is the same as the number of values when max_definition_level == 1.
// When max_definition_level > 1, the repetition and definition levels are longer
// than the values, but the values include the null entries with
// definition_level == (max_definition_level - 1). Thus the parameters of this
// function must make clear whether the input has the length of num_values or of
// the number of rows in the lowest nesting level.
//
// If the innermost node in the Parquet schema is required, the number of rows
// in the lowest nesting level is equal to the number of non-null values. If the
// innermost schema node is optional, the number of rows in the lowest nesting level
// also includes all values with definition_level == (max_definition_level - 1).
func (w *FixedLenByteArrayColumnChunkWriter) WriteBatchSpaced(values []parquet.FixedLenByteArray, defLevels, repLevels []int16, validBits []byte, validBitsOffset int64) {
valueOffset := int64(0)
length := len(defLevels)
if defLevels == nil {
length = len(values)
}
doBatches(int64(length), w.props.WriteBatchSize(), func(offset, batch int64) {
var vals []parquet.FixedLenByteArray
info := w.maybeCalculateValidityBits(levelSliceOrNil(defLevels, offset, batch), batch)
w.writeLevelsSpaced(batch, levelSliceOrNil(defLevels, offset, batch), levelSliceOrNil(repLevels, offset, batch))
if values != nil {
vals = values[valueOffset : valueOffset+info.numSpaced()]
}
if w.bitsBuffer != nil {
w.writeValuesSpaced(vals, info.batchNum, w.bitsBuffer.Bytes(), 0)
} else {
w.writeValuesSpaced(vals, info.batchNum, validBits, validBitsOffset+valueOffset)
}
w.commitWriteAndCheckPageLimit(batch, info.numSpaced())
valueOffset += info.numSpaced()
w.checkDictionarySizeLimit()
})
}
func (w *FixedLenByteArrayColumnChunkWriter) writeValues(values []parquet.FixedLenByteArray, numNulls int64) {
w.currentEncoder.(encoding.FixedLenByteArrayEncoder).Put(values)
if w.pageStatistics != nil {
w.pageStatistics.(*metadata.FixedLenByteArrayStatistics).Update(values, numNulls)
}
}
func (w *FixedLenByteArrayColumnChunkWriter) writeValuesSpaced(spacedValues []parquet.FixedLenByteArray, numValues int64, validBits []byte, validBitsOffset int64) {
if len(spacedValues) != int(numValues) {
w.currentEncoder.(encoding.FixedLenByteArrayEncoder).PutSpaced(spacedValues, validBits, validBitsOffset)
} else {
w.currentEncoder.(encoding.FixedLenByteArrayEncoder).Put(spacedValues)
}
if w.pageStatistics != nil {
nulls := int64(len(spacedValues)) - numValues
w.pageStatistics.(*metadata.FixedLenByteArrayStatistics).UpdateSpaced(spacedValues, validBits, validBitsOffset, nulls)
}
}
func (w *FixedLenByteArrayColumnChunkWriter) checkDictionarySizeLimit() {
if !w.hasDict || w.fallbackToNonDict {
return
}
if w.currentEncoder.(encoding.DictEncoder).DictEncodedSize() >= int(w.props.DictionaryPageSizeLimit()) {
w.fallbackToPlain()
}
}
func (w *FixedLenByteArrayColumnChunkWriter) fallbackToPlain() {
if w.currentEncoder.Encoding() == parquet.Encodings.PlainDict {
w.WriteDictionaryPage()
w.FlushBufferedDataPages()
w.fallbackToNonDict = true
w.currentEncoder = encoding.FixedLenByteArrayEncoderTraits.Encoder(format.Encoding(parquet.Encodings.Plain), false, w.descr, w.mem)
w.encoding = parquet.Encodings.Plain
}
}
// NewColumnChunkWriter constructs a column writer of the appropriate concrete type,
// using the metadata builder and writer properties to determine the column's
// physical type and whether or not to use dictionary encoding.
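//
// The concrete type of the returned ColumnChunkWriter matches the column's physical
// type, so callers typically type-assert before writing. An illustrative sketch
// (meta, pager, and props are assumed to come from the surrounding file writer):
//
//	cw := NewColumnChunkWriter(meta, pager, props)
//	if w, ok := cw.(*Int32ColumnChunkWriter); ok {
//		_, err := w.WriteBatch([]int32{1, 2, 3}, nil, nil)
//		_ = err // handle the error in real code
//	}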
func NewColumnChunkWriter(meta *metadata.ColumnChunkMetaDataBuilder, pager PageWriter, props *parquet.WriterProperties) ColumnChunkWriter {
descr := meta.Descr()
useDict := props.DictionaryEnabledFor(descr.Path()) && descr.PhysicalType() != parquet.Types.Boolean && descr.PhysicalType() != parquet.Types.Int96
enc := props.EncodingFor(descr.Path())
if useDict {
enc = props.DictionaryIndexEncoding()
}
switch descr.PhysicalType() {
case parquet.Types.Int32:
return NewInt32ColumnChunkWriter(meta, pager, useDict, enc, props)
case parquet.Types.Int64:
return NewInt64ColumnChunkWriter(meta, pager, useDict, enc, props)
case parquet.Types.Int96:
return NewInt96ColumnChunkWriter(meta, pager, useDict, enc, props)
case parquet.Types.Float:
return NewFloat32ColumnChunkWriter(meta, pager, useDict, enc, props)
case parquet.Types.Double:
return NewFloat64ColumnChunkWriter(meta, pager, useDict, enc, props)
case parquet.Types.Boolean:
return NewBooleanColumnChunkWriter(meta, pager, useDict, enc, props)
case parquet.Types.ByteArray:
return NewByteArrayColumnChunkWriter(meta, pager, useDict, enc, props)
case parquet.Types.FixedLenByteArray:
return NewFixedLenByteArrayColumnChunkWriter(meta, pager, useDict, enc, props)
default:
panic("unimplemented")
}
}