| // Code generated by column_writer_types.gen.go.tmpl. DO NOT EDIT. |
| |
| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package file |
| |
| import ( |
| "fmt" |
| |
| "github.com/apache/arrow/go/v9/parquet" |
| "github.com/apache/arrow/go/v9/parquet/internal/encoding" |
| format "github.com/apache/arrow/go/v9/parquet/internal/gen-go/parquet" |
| "github.com/apache/arrow/go/v9/parquet/metadata" |
| "golang.org/x/xerrors" |
| ) |
| |
| // Int32ColumnChunkWriter is the typed column chunk writer used to write |
| // Int32 columns to a parquet file. |
| type Int32ColumnChunkWriter struct { |
| columnWriter |
| } |
| |
| // NewInt32ColumnChunkWriter constructs a new column writer using the given metadata chunk builder, |
| // the provided PageWriter, and the desired encoding and properties. |
| // |
| // Consumers will rarely call this directly; it is primarily used internally. |
| // |
| // ColumnChunkWriters should be acquired from fileWriter and RowGroupWriter objects. |
| func NewInt32ColumnChunkWriter(meta *metadata.ColumnChunkMetaDataBuilder, pager PageWriter, useDict bool, enc parquet.Encoding, props *parquet.WriterProperties) *Int32ColumnChunkWriter { |
| ret := &Int32ColumnChunkWriter{columnWriter: newColumnWriterBase(meta, pager, useDict, enc, props)} |
| ret.currentEncoder = encoding.Int32EncoderTraits.Encoder(format.Encoding(enc), useDict, meta.Descr(), props.Allocator()) |
| return ret |
| } |
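| |
| // The sketch below is illustrative only and is not part of the generated code: it shows |
| // one way a typed column chunk writer is usually obtained, assuming `rgw` is a row group |
| // writer previously acquired from a parquet file writer (that setup is omitted and the |
| // variable names are hypothetical): |
| // |
| //    cw, err := rgw.NextColumn() // returns the generic ColumnChunkWriter interface |
| //    if err != nil { |
| //        // handle error |
| //    } |
| //    int32Writer, ok := cw.(*Int32ColumnChunkWriter) |
| //    if !ok { |
| //        // the column's physical type is not INT32 |
| //    } |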
| |
| // WriteBatch writes a batch of repetition levels, definition levels, and values to the |
| // column. |
| // `def_levels` (resp. `rep_levels`) can be null if the column's max definition level |
| // (resp. max repetition level) is 0. |
| // If not null, each of `def_levels` and `rep_levels` must have at least |
| // `len(values)` entries. |
| // |
| // The number of physical values written (taken from `values`) is returned. |
| // It can be smaller than `len(values)` if there are some undefined values. |
| // |
| // When using DataPageV2 to write a repeated column, rows cannot cross data |
| // page boundaries. To guarantee this, the writer ensures that every batch of |
| // w.props.WriteBatchSize values begins and ends on a row boundary. As a consequence, |
| // when using DataPageV2 and repLevels is not nil, the first value passed to |
| // WriteBatch must always begin a new row (repLevels[0] should always be 0). |
| func (w *Int32ColumnChunkWriter) WriteBatch(values []int32, defLevels, repLevels []int16) (valueOffset int64, err error) { |
| defer func() { |
| if r := recover(); r != nil { |
| switch r := r.(type) { |
| case string: |
| err = xerrors.New(r) |
| case error: |
| err = r |
| default: |
| err = fmt.Errorf("unknown error type: %s", r) |
| } |
| } |
| }() |
| // We check for DataPage limits only after we have inserted the values. If a user |
| // writes a large number of values, the DataPage size can end up well above the limit. |
| // The purpose of this chunking is to bound that: even if a user writes a large |
| // number of values, the chunking ensures that AddDataPage() is called at a |
| // reasonable page size limit. |
| var n int64 |
| switch { |
| case defLevels != nil: |
| n = int64(len(defLevels)) |
| case values != nil: |
| n = int64(len(values)) |
| } |
| w.doBatches(n, repLevels, func(offset, batch int64) { |
| var vals []int32 |
| |
| toWrite := w.writeLevels(batch, levelSliceOrNil(defLevels, offset, batch), levelSliceOrNil(repLevels, offset, batch)) |
| if values != nil { |
| vals = values[valueOffset : valueOffset+toWrite] |
| } |
| |
| w.writeValues(vals, batch-toWrite) |
| if err := w.commitWriteAndCheckPageLimit(batch, toWrite); err != nil { |
| panic(err) |
| } |
| |
| valueOffset += toWrite |
| w.checkDictionarySizeLimit() |
| }) |
| return |
| } |
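| |
| // Illustrative sketch (not generated code): writing an optional (max definition level 1, |
| // max repetition level 0) INT32 column with WriteBatch. Only the defined values are passed |
| // in `values`; nulls are indicated solely by a definition level of 0. The writer `w` is |
| // assumed to already exist: |
| // |
| //    values := []int32{7, 9, 11}      // the three non-null values |
| //    defLevels := []int16{1, 0, 1, 1} // the second slot is a null |
| //    written, err := w.WriteBatch(values, defLevels, nil) |
| //    // on success, written == 3: the number of physical values consumed from `values` |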
| |
| // WriteBatchSpaced writes a batch of repetition levels, definition levels, and values to the |
| // column. |
| // |
| // In comparison to WriteBatch, if max_definition_level == 1 the length of the |
| // repetition and definition levels is the same as the number of values. |
| // In the case of max_definition_level > 1, the repetition and definition |
| // levels are longer than the values, but the values include the null entries |
| // with definition_level == (max_definition_level - 1). Thus the parameters of |
| // this function must distinguish whether the input has the length of num_values |
| // or the _number of rows in the lowest nesting level_. |
| // |
| // If the innermost node in the Parquet schema is required, the _number of rows |
| // in the lowest nesting level_ is equal to the number of non-null values. If the |
| // innermost schema node is optional, the _number of rows in the lowest nesting level_ |
| // also includes all values with definition_level == (max_definition_level - 1). |
| func (w *Int32ColumnChunkWriter) WriteBatchSpaced(values []int32, defLevels, repLevels []int16, validBits []byte, validBitsOffset int64) { |
| valueOffset := int64(0) |
| length := len(defLevels) |
| if defLevels == nil { |
| length = len(values) |
| } |
| doBatches(int64(length), w.props.WriteBatchSize(), func(offset, batch int64) { |
| var vals []int32 |
| info := w.maybeCalculateValidityBits(levelSliceOrNil(defLevels, offset, batch), batch) |
| |
| w.writeLevelsSpaced(batch, levelSliceOrNil(defLevels, offset, batch), levelSliceOrNil(repLevels, offset, batch)) |
| if values != nil { |
| vals = values[valueOffset : valueOffset+info.numSpaced()] |
| } |
| |
| if w.bitsBuffer != nil { |
| w.writeValuesSpaced(vals, info.batchNum, w.bitsBuffer.Bytes(), 0) |
| } else { |
| w.writeValuesSpaced(vals, info.batchNum, validBits, validBitsOffset+valueOffset) |
| } |
| w.commitWriteAndCheckPageLimit(batch, info.numSpaced()) |
| valueOffset += info.numSpaced() |
| |
| w.checkDictionarySizeLimit() |
| }) |
| } |
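| |
| // Illustrative sketch (not generated code): the spaced variant expects `values` to contain |
| // a slot for every entry, including placeholders for nulls, with a validity bitmap marking |
| // which slots hold real data. Assuming the same optional INT32 column as above and an |
| // existing writer `w`: |
| // |
| //    spaced := []int32{7, 0, 9, 11}   // slot 1 is a null placeholder |
| //    defLevels := []int16{1, 0, 1, 1} |
| //    validBits := []byte{0x0D}        // bits 0, 2 and 3 set, so slots 0, 2 and 3 are valid |
| //    w.WriteBatchSpaced(spaced, defLevels, nil, validBits, 0) |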
| |
| func (w *Int32ColumnChunkWriter) writeValues(values []int32, numNulls int64) { |
| w.currentEncoder.(encoding.Int32Encoder).Put(values) |
| if w.pageStatistics != nil { |
| w.pageStatistics.(*metadata.Int32Statistics).Update(values, numNulls) |
| } |
| } |
| |
| func (w *Int32ColumnChunkWriter) writeValuesSpaced(spacedValues []int32, numValues int64, validBits []byte, validBitsOffset int64) { |
| if len(spacedValues) != int(numValues) { |
| w.currentEncoder.(encoding.Int32Encoder).PutSpaced(spacedValues, validBits, validBitsOffset) |
| } else { |
| w.currentEncoder.(encoding.Int32Encoder).Put(spacedValues) |
| } |
| if w.pageStatistics != nil { |
| nulls := int64(len(spacedValues)) - numValues |
| w.pageStatistics.(*metadata.Int32Statistics).UpdateSpaced(spacedValues, validBits, validBitsOffset, nulls) |
| } |
| } |
| |
| func (w *Int32ColumnChunkWriter) checkDictionarySizeLimit() { |
| if !w.hasDict || w.fallbackToNonDict { |
| return |
| } |
| |
| if w.currentEncoder.(encoding.DictEncoder).DictEncodedSize() >= int(w.props.DictionaryPageSizeLimit()) { |
| w.fallbackToPlain() |
| } |
| } |
| |
| func (w *Int32ColumnChunkWriter) fallbackToPlain() { |
| if w.currentEncoder.Encoding() == parquet.Encodings.PlainDict { |
| w.WriteDictionaryPage() |
| w.FlushBufferedDataPages() |
| w.fallbackToNonDict = true |
| w.currentEncoder = encoding.Int32EncoderTraits.Encoder(format.Encoding(parquet.Encodings.Plain), false, w.descr, w.mem) |
| w.encoding = parquet.Encodings.Plain |
| } |
| } |
| |
| // Int64ColumnChunkWriter is the typed column chunk writer used to write |
| // Int64 columns to a parquet file. |
| type Int64ColumnChunkWriter struct { |
| columnWriter |
| } |
| |
| // NewInt64ColumnChunkWriter constructs a new column writer using the given metadata chunk builder, |
| // the provided PageWriter, and the desired encoding and properties. |
| // |
| // Consumers will rarely call this directly; it is primarily used internally. |
| // |
| // ColumnChunkWriters should be acquired from fileWriter and RowGroupWriter objects. |
| func NewInt64ColumnChunkWriter(meta *metadata.ColumnChunkMetaDataBuilder, pager PageWriter, useDict bool, enc parquet.Encoding, props *parquet.WriterProperties) *Int64ColumnChunkWriter { |
| ret := &Int64ColumnChunkWriter{columnWriter: newColumnWriterBase(meta, pager, useDict, enc, props)} |
| ret.currentEncoder = encoding.Int64EncoderTraits.Encoder(format.Encoding(enc), useDict, meta.Descr(), props.Allocator()) |
| return ret |
| } |
| |
| // WriteBatch writes a batch of repetition levels, definition levels, and values to the |
| // column. |
| // `def_levels` (resp. `rep_levels`) can be null if the column's max definition level |
| // (resp. max repetition level) is 0. |
| // If not null, each of `def_levels` and `rep_levels` must have at least |
| // `len(values)` entries. |
| // |
| // The number of physical values written (taken from `values`) is returned. |
| // It can be smaller than `len(values)` if there are some undefined values. |
| // |
| // When using DataPageV2 to write a repeated column, rows cannot cross data |
| // page boundaries. To guarantee this, the writer ensures that every batch of |
| // w.props.WriteBatchSize values begins and ends on a row boundary. As a consequence, |
| // when using DataPageV2 and repLevels is not nil, the first value passed to |
| // WriteBatch must always begin a new row (repLevels[0] should always be 0). |
| func (w *Int64ColumnChunkWriter) WriteBatch(values []int64, defLevels, repLevels []int16) (valueOffset int64, err error) { |
| defer func() { |
| if r := recover(); r != nil { |
| switch r := r.(type) { |
| case string: |
| err = xerrors.New(r) |
| case error: |
| err = r |
| default: |
| err = fmt.Errorf("unknown error type: %s", r) |
| } |
| } |
| }() |
| // We check for DataPage limits only after we have inserted the values. If a user |
| // writes a large number of values, the DataPage size can end up well above the limit. |
| // The purpose of this chunking is to bound that: even if a user writes a large |
| // number of values, the chunking ensures that AddDataPage() is called at a |
| // reasonable page size limit. |
| var n int64 |
| switch { |
| case defLevels != nil: |
| n = int64(len(defLevels)) |
| case values != nil: |
| n = int64(len(values)) |
| } |
| w.doBatches(n, repLevels, func(offset, batch int64) { |
| var vals []int64 |
| |
| toWrite := w.writeLevels(batch, levelSliceOrNil(defLevels, offset, batch), levelSliceOrNil(repLevels, offset, batch)) |
| if values != nil { |
| vals = values[valueOffset : valueOffset+toWrite] |
| } |
| |
| w.writeValues(vals, batch-toWrite) |
| if err := w.commitWriteAndCheckPageLimit(batch, toWrite); err != nil { |
| panic(err) |
| } |
| |
| valueOffset += toWrite |
| w.checkDictionarySizeLimit() |
| }) |
| return |
| } |
| |
| // WriteBatchSpaced writes a batch of repetition levels, definition levels, and values to the |
| // column. |
| // |
| // In comparison to WriteBatch, if max_definition_level == 1 the length of the |
| // repetition and definition levels is the same as the number of values. |
| // In the case of max_definition_level > 1, the repetition and definition |
| // levels are longer than the values, but the values include the null entries |
| // with definition_level == (max_definition_level - 1). Thus the parameters of |
| // this function must distinguish whether the input has the length of num_values |
| // or the _number of rows in the lowest nesting level_. |
| // |
| // If the innermost node in the Parquet schema is required, the _number of rows |
| // in the lowest nesting level_ is equal to the number of non-null values. If the |
| // innermost schema node is optional, the _number of rows in the lowest nesting level_ |
| // also includes all values with definition_level == (max_definition_level - 1). |
| func (w *Int64ColumnChunkWriter) WriteBatchSpaced(values []int64, defLevels, repLevels []int16, validBits []byte, validBitsOffset int64) { |
| valueOffset := int64(0) |
| length := len(defLevels) |
| if defLevels == nil { |
| length = len(values) |
| } |
| doBatches(int64(length), w.props.WriteBatchSize(), func(offset, batch int64) { |
| var vals []int64 |
| info := w.maybeCalculateValidityBits(levelSliceOrNil(defLevels, offset, batch), batch) |
| |
| w.writeLevelsSpaced(batch, levelSliceOrNil(defLevels, offset, batch), levelSliceOrNil(repLevels, offset, batch)) |
| if values != nil { |
| vals = values[valueOffset : valueOffset+info.numSpaced()] |
| } |
| |
| if w.bitsBuffer != nil { |
| w.writeValuesSpaced(vals, info.batchNum, w.bitsBuffer.Bytes(), 0) |
| } else { |
| w.writeValuesSpaced(vals, info.batchNum, validBits, validBitsOffset+valueOffset) |
| } |
| w.commitWriteAndCheckPageLimit(batch, info.numSpaced()) |
| valueOffset += info.numSpaced() |
| |
| w.checkDictionarySizeLimit() |
| }) |
| } |
| |
| func (w *Int64ColumnChunkWriter) writeValues(values []int64, numNulls int64) { |
| w.currentEncoder.(encoding.Int64Encoder).Put(values) |
| if w.pageStatistics != nil { |
| w.pageStatistics.(*metadata.Int64Statistics).Update(values, numNulls) |
| } |
| } |
| |
| func (w *Int64ColumnChunkWriter) writeValuesSpaced(spacedValues []int64, numValues int64, validBits []byte, validBitsOffset int64) { |
| if len(spacedValues) != int(numValues) { |
| w.currentEncoder.(encoding.Int64Encoder).PutSpaced(spacedValues, validBits, validBitsOffset) |
| } else { |
| w.currentEncoder.(encoding.Int64Encoder).Put(spacedValues) |
| } |
| if w.pageStatistics != nil { |
| nulls := int64(len(spacedValues)) - numValues |
| w.pageStatistics.(*metadata.Int64Statistics).UpdateSpaced(spacedValues, validBits, validBitsOffset, nulls) |
| } |
| } |
| |
| func (w *Int64ColumnChunkWriter) checkDictionarySizeLimit() { |
| if !w.hasDict || w.fallbackToNonDict { |
| return |
| } |
| |
| if w.currentEncoder.(encoding.DictEncoder).DictEncodedSize() >= int(w.props.DictionaryPageSizeLimit()) { |
| w.fallbackToPlain() |
| } |
| } |
| |
| func (w *Int64ColumnChunkWriter) fallbackToPlain() { |
| if w.currentEncoder.Encoding() == parquet.Encodings.PlainDict { |
| w.WriteDictionaryPage() |
| w.FlushBufferedDataPages() |
| w.fallbackToNonDict = true |
| w.currentEncoder = encoding.Int64EncoderTraits.Encoder(format.Encoding(parquet.Encodings.Plain), false, w.descr, w.mem) |
| w.encoding = parquet.Encodings.Plain |
| } |
| } |
| |
| // Int96ColumnChunkWriter is the typed column chunk writer used to write |
| // Int96 columns to a parquet file. |
| type Int96ColumnChunkWriter struct { |
| columnWriter |
| } |
| |
| // NewInt96ColumnChunkWriter constructs a new column writer using the given metadata chunk builder, |
| // the provided PageWriter, and the desired encoding and properties. |
| // |
| // Consumers will rarely call this directly; it is primarily used internally. |
| // |
| // ColumnChunkWriters should be acquired from fileWriter and RowGroupWriter objects. |
| func NewInt96ColumnChunkWriter(meta *metadata.ColumnChunkMetaDataBuilder, pager PageWriter, useDict bool, enc parquet.Encoding, props *parquet.WriterProperties) *Int96ColumnChunkWriter { |
| ret := &Int96ColumnChunkWriter{columnWriter: newColumnWriterBase(meta, pager, useDict, enc, props)} |
| ret.currentEncoder = encoding.Int96EncoderTraits.Encoder(format.Encoding(enc), useDict, meta.Descr(), props.Allocator()) |
| return ret |
| } |
| |
| // WriteBatch writes a batch of repetition levels, definition levels, and values to the |
| // column. |
| // `def_levels` (resp. `rep_levels`) can be null if the column's max definition level |
| // (resp. max repetition level) is 0. |
| // If not null, each of `def_levels` and `rep_levels` must have at least |
| // `len(values)` entries. |
| // |
| // The number of physical values written (taken from `values`) is returned. |
| // It can be smaller than `len(values)` if there are some undefined values. |
| // |
| // When using DataPageV2 to write a repeated column, rows cannot cross data |
| // page boundaries. To guarantee this, the writer ensures that every batch of |
| // w.props.WriteBatchSize values begins and ends on a row boundary. As a consequence, |
| // when using DataPageV2 and repLevels is not nil, the first value passed to |
| // WriteBatch must always begin a new row (repLevels[0] should always be 0). |
| func (w *Int96ColumnChunkWriter) WriteBatch(values []parquet.Int96, defLevels, repLevels []int16) (valueOffset int64, err error) { |
| defer func() { |
| if r := recover(); r != nil { |
| switch r := r.(type) { |
| case string: |
| err = xerrors.New(r) |
| case error: |
| err = r |
| default: |
| err = fmt.Errorf("unknown error type: %s", r) |
| } |
| } |
| }() |
| // We check for DataPage limits only after we have inserted the values. If a user |
| // writes a large number of values, the DataPage size can end up well above the limit. |
| // The purpose of this chunking is to bound that: even if a user writes a large |
| // number of values, the chunking ensures that AddDataPage() is called at a |
| // reasonable page size limit. |
| var n int64 |
| switch { |
| case defLevels != nil: |
| n = int64(len(defLevels)) |
| case values != nil: |
| n = int64(len(values)) |
| } |
| w.doBatches(n, repLevels, func(offset, batch int64) { |
| var vals []parquet.Int96 |
| |
| toWrite := w.writeLevels(batch, levelSliceOrNil(defLevels, offset, batch), levelSliceOrNil(repLevels, offset, batch)) |
| if values != nil { |
| vals = values[valueOffset : valueOffset+toWrite] |
| } |
| |
| w.writeValues(vals, batch-toWrite) |
| if err := w.commitWriteAndCheckPageLimit(batch, toWrite); err != nil { |
| panic(err) |
| } |
| |
| valueOffset += toWrite |
| w.checkDictionarySizeLimit() |
| }) |
| return |
| } |
| |
| // WriteBatchSpaced writes a batch of repetition levels, definition levels, and values to the |
| // column. |
| // |
| // In comparison to WriteBatch, if max_definition_level == 1 the length of the |
| // repetition and definition levels is the same as the number of values. |
| // In the case of max_definition_level > 1, the repetition and definition |
| // levels are longer than the values, but the values include the null entries |
| // with definition_level == (max_definition_level - 1). Thus the parameters of |
| // this function must distinguish whether the input has the length of num_values |
| // or the _number of rows in the lowest nesting level_. |
| // |
| // If the innermost node in the Parquet schema is required, the _number of rows |
| // in the lowest nesting level_ is equal to the number of non-null values. If the |
| // innermost schema node is optional, the _number of rows in the lowest nesting level_ |
| // also includes all values with definition_level == (max_definition_level - 1). |
| func (w *Int96ColumnChunkWriter) WriteBatchSpaced(values []parquet.Int96, defLevels, repLevels []int16, validBits []byte, validBitsOffset int64) { |
| valueOffset := int64(0) |
| length := len(defLevels) |
| if defLevels == nil { |
| length = len(values) |
| } |
| doBatches(int64(length), w.props.WriteBatchSize(), func(offset, batch int64) { |
| var vals []parquet.Int96 |
| info := w.maybeCalculateValidityBits(levelSliceOrNil(defLevels, offset, batch), batch) |
| |
| w.writeLevelsSpaced(batch, levelSliceOrNil(defLevels, offset, batch), levelSliceOrNil(repLevels, offset, batch)) |
| if values != nil { |
| vals = values[valueOffset : valueOffset+info.numSpaced()] |
| } |
| |
| if w.bitsBuffer != nil { |
| w.writeValuesSpaced(vals, info.batchNum, w.bitsBuffer.Bytes(), 0) |
| } else { |
| w.writeValuesSpaced(vals, info.batchNum, validBits, validBitsOffset+valueOffset) |
| } |
| w.commitWriteAndCheckPageLimit(batch, info.numSpaced()) |
| valueOffset += info.numSpaced() |
| |
| w.checkDictionarySizeLimit() |
| }) |
| } |
| |
| func (w *Int96ColumnChunkWriter) writeValues(values []parquet.Int96, numNulls int64) { |
| w.currentEncoder.(encoding.Int96Encoder).Put(values) |
| if w.pageStatistics != nil { |
| w.pageStatistics.(*metadata.Int96Statistics).Update(values, numNulls) |
| } |
| } |
| |
| func (w *Int96ColumnChunkWriter) writeValuesSpaced(spacedValues []parquet.Int96, numValues int64, validBits []byte, validBitsOffset int64) { |
| if len(spacedValues) != int(numValues) { |
| w.currentEncoder.(encoding.Int96Encoder).PutSpaced(spacedValues, validBits, validBitsOffset) |
| } else { |
| w.currentEncoder.(encoding.Int96Encoder).Put(spacedValues) |
| } |
| if w.pageStatistics != nil { |
| nulls := int64(len(spacedValues)) - numValues |
| w.pageStatistics.(*metadata.Int96Statistics).UpdateSpaced(spacedValues, validBits, validBitsOffset, nulls) |
| } |
| } |
| |
| func (w *Int96ColumnChunkWriter) checkDictionarySizeLimit() { |
| if !w.hasDict || w.fallbackToNonDict { |
| return |
| } |
| |
| if w.currentEncoder.(encoding.DictEncoder).DictEncodedSize() >= int(w.props.DictionaryPageSizeLimit()) { |
| w.fallbackToPlain() |
| } |
| } |
| |
| func (w *Int96ColumnChunkWriter) fallbackToPlain() { |
| if w.currentEncoder.Encoding() == parquet.Encodings.PlainDict { |
| w.WriteDictionaryPage() |
| w.FlushBufferedDataPages() |
| w.fallbackToNonDict = true |
| w.currentEncoder = encoding.Int96EncoderTraits.Encoder(format.Encoding(parquet.Encodings.Plain), false, w.descr, w.mem) |
| w.encoding = parquet.Encodings.Plain |
| } |
| } |
| |
| // Float32ColumnChunkWriter is the typed column chunk writer used to write |
| // Float32 columns to a parquet file. |
| type Float32ColumnChunkWriter struct { |
| columnWriter |
| } |
| |
| // NewFloat32ColumnChunkWriter constructs a new column writer using the given metadata chunk builder, |
| // the provided PageWriter, and the desired encoding and properties. |
| // |
| // Consumers will rarely call this directly; it is primarily used internally. |
| // |
| // ColumnChunkWriters should be acquired from fileWriter and RowGroupWriter objects. |
| func NewFloat32ColumnChunkWriter(meta *metadata.ColumnChunkMetaDataBuilder, pager PageWriter, useDict bool, enc parquet.Encoding, props *parquet.WriterProperties) *Float32ColumnChunkWriter { |
| ret := &Float32ColumnChunkWriter{columnWriter: newColumnWriterBase(meta, pager, useDict, enc, props)} |
| ret.currentEncoder = encoding.Float32EncoderTraits.Encoder(format.Encoding(enc), useDict, meta.Descr(), props.Allocator()) |
| return ret |
| } |
| |
| // WriteBatch writes a batch of repetition levels, definition levels, and values to the |
| // column. |
| // `def_levels` (resp. `rep_levels`) can be null if the column's max definition level |
| // (resp. max repetition level) is 0. |
| // If not null, each of `def_levels` and `rep_levels` must have at least |
| // `len(values)` entries. |
| // |
| // The number of physical values written (taken from `values`) is returned. |
| // It can be smaller than `len(values)` if there are some undefined values. |
| // |
| // When using DataPageV2 to write a repeated column, rows cannot cross data |
| // page boundaries. To guarantee this, the writer ensures that every batch of |
| // w.props.WriteBatchSize values begins and ends on a row boundary. As a consequence, |
| // when using DataPageV2 and repLevels is not nil, the first value passed to |
| // WriteBatch must always begin a new row (repLevels[0] should always be 0). |
| func (w *Float32ColumnChunkWriter) WriteBatch(values []float32, defLevels, repLevels []int16) (valueOffset int64, err error) { |
| defer func() { |
| if r := recover(); r != nil { |
| switch r := r.(type) { |
| case string: |
| err = xerrors.New(r) |
| case error: |
| err = r |
| default: |
| err = fmt.Errorf("unknown error type: %s", r) |
| } |
| } |
| }() |
| // We check for DataPage limits only after we have inserted the values. If a user |
| // writes a large number of values, the DataPage size can end up well above the limit. |
| // The purpose of this chunking is to bound that: even if a user writes a large |
| // number of values, the chunking ensures that AddDataPage() is called at a |
| // reasonable page size limit. |
| var n int64 |
| switch { |
| case defLevels != nil: |
| n = int64(len(defLevels)) |
| case values != nil: |
| n = int64(len(values)) |
| } |
| w.doBatches(n, repLevels, func(offset, batch int64) { |
| var vals []float32 |
| |
| toWrite := w.writeLevels(batch, levelSliceOrNil(defLevels, offset, batch), levelSliceOrNil(repLevels, offset, batch)) |
| if values != nil { |
| vals = values[valueOffset : valueOffset+toWrite] |
| } |
| |
| w.writeValues(vals, batch-toWrite) |
| if err := w.commitWriteAndCheckPageLimit(batch, toWrite); err != nil { |
| panic(err) |
| } |
| |
| valueOffset += toWrite |
| w.checkDictionarySizeLimit() |
| }) |
| return |
| } |
| |
| // WriteBatchSpaced writes a batch of repetition levels, definition levels, and values to the |
| // column. |
| // |
| // In comparison to WriteBatch, if max_definition_level == 1 the length of the |
| // repetition and definition levels is the same as the number of values. |
| // In the case of max_definition_level > 1, the repetition and definition |
| // levels are longer than the values, but the values include the null entries |
| // with definition_level == (max_definition_level - 1). Thus the parameters of |
| // this function must distinguish whether the input has the length of num_values |
| // or the _number of rows in the lowest nesting level_. |
| // |
| // If the innermost node in the Parquet schema is required, the _number of rows |
| // in the lowest nesting level_ is equal to the number of non-null values. If the |
| // innermost schema node is optional, the _number of rows in the lowest nesting level_ |
| // also includes all values with definition_level == (max_definition_level - 1). |
| func (w *Float32ColumnChunkWriter) WriteBatchSpaced(values []float32, defLevels, repLevels []int16, validBits []byte, validBitsOffset int64) { |
| valueOffset := int64(0) |
| length := len(defLevels) |
| if defLevels == nil { |
| length = len(values) |
| } |
| doBatches(int64(length), w.props.WriteBatchSize(), func(offset, batch int64) { |
| var vals []float32 |
| info := w.maybeCalculateValidityBits(levelSliceOrNil(defLevels, offset, batch), batch) |
| |
| w.writeLevelsSpaced(batch, levelSliceOrNil(defLevels, offset, batch), levelSliceOrNil(repLevels, offset, batch)) |
| if values != nil { |
| vals = values[valueOffset : valueOffset+info.numSpaced()] |
| } |
| |
| if w.bitsBuffer != nil { |
| w.writeValuesSpaced(vals, info.batchNum, w.bitsBuffer.Bytes(), 0) |
| } else { |
| w.writeValuesSpaced(vals, info.batchNum, validBits, validBitsOffset+valueOffset) |
| } |
| w.commitWriteAndCheckPageLimit(batch, info.numSpaced()) |
| valueOffset += info.numSpaced() |
| |
| w.checkDictionarySizeLimit() |
| }) |
| } |
| |
| func (w *Float32ColumnChunkWriter) writeValues(values []float32, numNulls int64) { |
| w.currentEncoder.(encoding.Float32Encoder).Put(values) |
| if w.pageStatistics != nil { |
| w.pageStatistics.(*metadata.Float32Statistics).Update(values, numNulls) |
| } |
| } |
| |
| func (w *Float32ColumnChunkWriter) writeValuesSpaced(spacedValues []float32, numValues int64, validBits []byte, validBitsOffset int64) { |
| if len(spacedValues) != int(numValues) { |
| w.currentEncoder.(encoding.Float32Encoder).PutSpaced(spacedValues, validBits, validBitsOffset) |
| } else { |
| w.currentEncoder.(encoding.Float32Encoder).Put(spacedValues) |
| } |
| if w.pageStatistics != nil { |
| nulls := int64(len(spacedValues)) - numValues |
| w.pageStatistics.(*metadata.Float32Statistics).UpdateSpaced(spacedValues, validBits, validBitsOffset, nulls) |
| } |
| } |
| |
| func (w *Float32ColumnChunkWriter) checkDictionarySizeLimit() { |
| if !w.hasDict || w.fallbackToNonDict { |
| return |
| } |
| |
| if w.currentEncoder.(encoding.DictEncoder).DictEncodedSize() >= int(w.props.DictionaryPageSizeLimit()) { |
| w.fallbackToPlain() |
| } |
| } |
| |
| func (w *Float32ColumnChunkWriter) fallbackToPlain() { |
| if w.currentEncoder.Encoding() == parquet.Encodings.PlainDict { |
| w.WriteDictionaryPage() |
| w.FlushBufferedDataPages() |
| w.fallbackToNonDict = true |
| w.currentEncoder = encoding.Float32EncoderTraits.Encoder(format.Encoding(parquet.Encodings.Plain), false, w.descr, w.mem) |
| w.encoding = parquet.Encodings.Plain |
| } |
| } |
| |
| // Float64ColumnChunkWriter is the typed column chunk writer used to write |
| // Float64 columns to a parquet file. |
| type Float64ColumnChunkWriter struct { |
| columnWriter |
| } |
| |
| // NewFloat64ColumnChunkWriter constructs a new column writer using the given metadata chunk builder, |
| // the provided PageWriter, and the desired encoding and properties. |
| // |
| // Consumers will rarely call this directly; it is primarily used internally. |
| // |
| // ColumnChunkWriters should be acquired from fileWriter and RowGroupWriter objects. |
| func NewFloat64ColumnChunkWriter(meta *metadata.ColumnChunkMetaDataBuilder, pager PageWriter, useDict bool, enc parquet.Encoding, props *parquet.WriterProperties) *Float64ColumnChunkWriter { |
| ret := &Float64ColumnChunkWriter{columnWriter: newColumnWriterBase(meta, pager, useDict, enc, props)} |
| ret.currentEncoder = encoding.Float64EncoderTraits.Encoder(format.Encoding(enc), useDict, meta.Descr(), props.Allocator()) |
| return ret |
| } |
| |
| // WriteBatch writes a batch of repetition levels, definition levels, and values to the |
| // column. |
| // `def_levels` (resp. `rep_levels`) can be null if the column's max definition level |
| // (resp. max repetition level) is 0. |
| // If not null, each of `def_levels` and `rep_levels` must have at least |
| // `len(values)` entries. |
| // |
| // The number of physical values written (taken from `values`) is returned. |
| // It can be smaller than `len(values)` if there are some undefined values. |
| // |
| // When using DataPageV2 to write a repeated column, rows cannot cross data |
| // page boundaries. To guarantee this, the writer ensures that every batch of |
| // w.props.WriteBatchSize values begins and ends on a row boundary. As a consequence, |
| // when using DataPageV2 and repLevels is not nil, the first value passed to |
| // WriteBatch must always begin a new row (repLevels[0] should always be 0). |
| func (w *Float64ColumnChunkWriter) WriteBatch(values []float64, defLevels, repLevels []int16) (valueOffset int64, err error) { |
| defer func() { |
| if r := recover(); r != nil { |
| switch r := r.(type) { |
| case string: |
| err = xerrors.New(r) |
| case error: |
| err = r |
| default: |
| err = fmt.Errorf("unknown error type: %s", r) |
| } |
| } |
| }() |
| // We check for DataPage limits only after we have inserted the values. If a user |
| // writes a large number of values, the DataPage size can end up well above the limit. |
| // The purpose of this chunking is to bound that: even if a user writes a large |
| // number of values, the chunking ensures that AddDataPage() is called at a |
| // reasonable page size limit. |
| var n int64 |
| switch { |
| case defLevels != nil: |
| n = int64(len(defLevels)) |
| case values != nil: |
| n = int64(len(values)) |
| } |
| w.doBatches(n, repLevels, func(offset, batch int64) { |
| var vals []float64 |
| |
| toWrite := w.writeLevels(batch, levelSliceOrNil(defLevels, offset, batch), levelSliceOrNil(repLevels, offset, batch)) |
| if values != nil { |
| vals = values[valueOffset : valueOffset+toWrite] |
| } |
| |
| w.writeValues(vals, batch-toWrite) |
| if err := w.commitWriteAndCheckPageLimit(batch, toWrite); err != nil { |
| panic(err) |
| } |
| |
| valueOffset += toWrite |
| w.checkDictionarySizeLimit() |
| }) |
| return |
| } |
| |
| // WriteBatchSpaced writes a batch of repetition levels, definition levels, and values to the |
| // column. |
| // |
| // In comparison to WriteBatch, if max_definition_level == 1 the length of the |
| // repetition and definition levels is the same as the number of values. |
| // In the case of max_definition_level > 1, the repetition and definition |
| // levels are longer than the values, but the values include the null entries |
| // with definition_level == (max_definition_level - 1). Thus the parameters of |
| // this function must distinguish whether the input has the length of num_values |
| // or the _number of rows in the lowest nesting level_. |
| // |
| // If the innermost node in the Parquet schema is required, the _number of rows |
| // in the lowest nesting level_ is equal to the number of non-null values. If the |
| // innermost schema node is optional, the _number of rows in the lowest nesting level_ |
| // also includes all values with definition_level == (max_definition_level - 1). |
| func (w *Float64ColumnChunkWriter) WriteBatchSpaced(values []float64, defLevels, repLevels []int16, validBits []byte, validBitsOffset int64) { |
| valueOffset := int64(0) |
| length := len(defLevels) |
| if defLevels == nil { |
| length = len(values) |
| } |
| doBatches(int64(length), w.props.WriteBatchSize(), func(offset, batch int64) { |
| var vals []float64 |
| info := w.maybeCalculateValidityBits(levelSliceOrNil(defLevels, offset, batch), batch) |
| |
| w.writeLevelsSpaced(batch, levelSliceOrNil(defLevels, offset, batch), levelSliceOrNil(repLevels, offset, batch)) |
| if values != nil { |
| vals = values[valueOffset : valueOffset+info.numSpaced()] |
| } |
| |
| if w.bitsBuffer != nil { |
| w.writeValuesSpaced(vals, info.batchNum, w.bitsBuffer.Bytes(), 0) |
| } else { |
| w.writeValuesSpaced(vals, info.batchNum, validBits, validBitsOffset+valueOffset) |
| } |
| w.commitWriteAndCheckPageLimit(batch, info.numSpaced()) |
| valueOffset += info.numSpaced() |
| |
| w.checkDictionarySizeLimit() |
| }) |
| } |
| |
| func (w *Float64ColumnChunkWriter) writeValues(values []float64, numNulls int64) { |
| w.currentEncoder.(encoding.Float64Encoder).Put(values) |
| if w.pageStatistics != nil { |
| w.pageStatistics.(*metadata.Float64Statistics).Update(values, numNulls) |
| } |
| } |
| |
| func (w *Float64ColumnChunkWriter) writeValuesSpaced(spacedValues []float64, numValues int64, validBits []byte, validBitsOffset int64) { |
| if len(spacedValues) != int(numValues) { |
| w.currentEncoder.(encoding.Float64Encoder).PutSpaced(spacedValues, validBits, validBitsOffset) |
| } else { |
| w.currentEncoder.(encoding.Float64Encoder).Put(spacedValues) |
| } |
| if w.pageStatistics != nil { |
| nulls := int64(len(spacedValues)) - numValues |
| w.pageStatistics.(*metadata.Float64Statistics).UpdateSpaced(spacedValues, validBits, validBitsOffset, nulls) |
| } |
| } |
| |
| func (w *Float64ColumnChunkWriter) checkDictionarySizeLimit() { |
| if !w.hasDict || w.fallbackToNonDict { |
| return |
| } |
| |
| if w.currentEncoder.(encoding.DictEncoder).DictEncodedSize() >= int(w.props.DictionaryPageSizeLimit()) { |
| w.fallbackToPlain() |
| } |
| } |
| |
| func (w *Float64ColumnChunkWriter) fallbackToPlain() { |
| if w.currentEncoder.Encoding() == parquet.Encodings.PlainDict { |
| w.WriteDictionaryPage() |
| w.FlushBufferedDataPages() |
| w.fallbackToNonDict = true |
| w.currentEncoder = encoding.Float64EncoderTraits.Encoder(format.Encoding(parquet.Encodings.Plain), false, w.descr, w.mem) |
| w.encoding = parquet.Encodings.Plain |
| } |
| } |
| |
| // BooleanColumnChunkWriter is the typed column chunk writer used to write |
| // Boolean columns to a parquet file. |
| type BooleanColumnChunkWriter struct { |
| columnWriter |
| } |
| |
| // NewBooleanColumnChunkWriter constructs a new column writer using the given metadata chunk builder, |
| // the provided PageWriter, and the desired encoding and properties. |
| // |
| // Consumers will rarely call this directly; it is primarily used internally. |
| // Note that dictionary encoding is not supported for boolean columns; passing useDict=true panics. |
| // |
| // ColumnChunkWriters should be acquired from fileWriter and RowGroupWriter objects. |
| func NewBooleanColumnChunkWriter(meta *metadata.ColumnChunkMetaDataBuilder, pager PageWriter, useDict bool, enc parquet.Encoding, props *parquet.WriterProperties) *BooleanColumnChunkWriter { |
| if useDict { |
| panic("cannot use dictionary for boolean writer") |
| } |
| ret := &BooleanColumnChunkWriter{columnWriter: newColumnWriterBase(meta, pager, useDict, enc, props)} |
| ret.currentEncoder = encoding.BooleanEncoderTraits.Encoder(format.Encoding(enc), useDict, meta.Descr(), props.Allocator()) |
| return ret |
| } |
| |
| // WriteBatch writes a batch of repetition levels, definition levels, and values to the |
| // column. |
| // `def_levels` (resp. `rep_levels`) can be null if the column's max definition level |
| // (resp. max repetition level) is 0. |
| // If not null, each of `def_levels` and `rep_levels` must have at least |
| // `len(values)` entries. |
| // |
| // The number of physical values written (taken from `values`) is returned. |
| // It can be smaller than `len(values)` if there are some undefined values. |
| // |
| // When using DataPageV2 to write a repeated column, rows cannot cross data |
| // page boundaries. To guarantee this, the writer ensures that every batch of |
| // w.props.WriteBatchSize values begins and ends on a row boundary. As a consequence, |
| // when using DataPageV2 and repLevels is not nil, the first value passed to |
| // WriteBatch must always begin a new row (repLevels[0] should always be 0). |
| func (w *BooleanColumnChunkWriter) WriteBatch(values []bool, defLevels, repLevels []int16) (valueOffset int64, err error) { |
| defer func() { |
| if r := recover(); r != nil { |
| switch r := r.(type) { |
| case string: |
| err = xerrors.New(r) |
| case error: |
| err = r |
| default: |
| err = fmt.Errorf("unknown error type: %s", r) |
| } |
| } |
| }() |
| // We check for DataPage limits only after we have inserted the values. If a user |
| // writes a large number of values, the DataPage size can end up well above the limit. |
| // The purpose of this chunking is to bound that: even if a user writes a large |
| // number of values, the chunking ensures that AddDataPage() is called at a |
| // reasonable page size limit. |
| var n int64 |
| switch { |
| case defLevels != nil: |
| n = int64(len(defLevels)) |
| case values != nil: |
| n = int64(len(values)) |
| } |
| w.doBatches(n, repLevels, func(offset, batch int64) { |
| var vals []bool |
| |
| toWrite := w.writeLevels(batch, levelSliceOrNil(defLevels, offset, batch), levelSliceOrNil(repLevels, offset, batch)) |
| if values != nil { |
| vals = values[valueOffset : valueOffset+toWrite] |
| } |
| |
| w.writeValues(vals, batch-toWrite) |
| if err := w.commitWriteAndCheckPageLimit(batch, toWrite); err != nil { |
| panic(err) |
| } |
| |
| valueOffset += toWrite |
| w.checkDictionarySizeLimit() |
| }) |
| return |
| } |
| |
| // WriteBatchSpaced writes a batch of repetition levels, definition levels, and values to the |
| // column. |
| // |
| // In comparison to WriteBatch, if max_definition_level == 1 the length of the |
| // repetition and definition levels is the same as the number of values. |
| // In the case of max_definition_level > 1, the repetition and definition |
| // levels are longer than the values, but the values include the null entries |
| // with definition_level == (max_definition_level - 1). Thus the parameters of |
| // this function must distinguish whether the input has the length of num_values |
| // or the _number of rows in the lowest nesting level_. |
| // |
| // If the innermost node in the Parquet schema is required, the _number of rows |
| // in the lowest nesting level_ is equal to the number of non-null values. If the |
| // innermost schema node is optional, the _number of rows in the lowest nesting level_ |
| // also includes all values with definition_level == (max_definition_level - 1). |
| func (w *BooleanColumnChunkWriter) WriteBatchSpaced(values []bool, defLevels, repLevels []int16, validBits []byte, validBitsOffset int64) { |
| valueOffset := int64(0) |
| length := len(defLevels) |
| if defLevels == nil { |
| length = len(values) |
| } |
| doBatches(int64(length), w.props.WriteBatchSize(), func(offset, batch int64) { |
| var vals []bool |
| info := w.maybeCalculateValidityBits(levelSliceOrNil(defLevels, offset, batch), batch) |
| |
| w.writeLevelsSpaced(batch, levelSliceOrNil(defLevels, offset, batch), levelSliceOrNil(repLevels, offset, batch)) |
| if values != nil { |
| vals = values[valueOffset : valueOffset+info.numSpaced()] |
| } |
| |
| if w.bitsBuffer != nil { |
| w.writeValuesSpaced(vals, info.batchNum, w.bitsBuffer.Bytes(), 0) |
| } else { |
| w.writeValuesSpaced(vals, info.batchNum, validBits, validBitsOffset+valueOffset) |
| } |
| w.commitWriteAndCheckPageLimit(batch, info.numSpaced()) |
| valueOffset += info.numSpaced() |
| |
| w.checkDictionarySizeLimit() |
| }) |
| } |
| |
| func (w *BooleanColumnChunkWriter) writeValues(values []bool, numNulls int64) { |
| w.currentEncoder.(encoding.BooleanEncoder).Put(values) |
| if w.pageStatistics != nil { |
| w.pageStatistics.(*metadata.BooleanStatistics).Update(values, numNulls) |
| } |
| } |
| |
| func (w *BooleanColumnChunkWriter) writeValuesSpaced(spacedValues []bool, numValues int64, validBits []byte, validBitsOffset int64) { |
| if len(spacedValues) != int(numValues) { |
| w.currentEncoder.(encoding.BooleanEncoder).PutSpaced(spacedValues, validBits, validBitsOffset) |
| } else { |
| w.currentEncoder.(encoding.BooleanEncoder).Put(spacedValues) |
| } |
| if w.pageStatistics != nil { |
| nulls := int64(len(spacedValues)) - numValues |
| w.pageStatistics.(*metadata.BooleanStatistics).UpdateSpaced(spacedValues, validBits, validBitsOffset, nulls) |
| } |
| } |
| |
| func (w *BooleanColumnChunkWriter) checkDictionarySizeLimit() { |
| if !w.hasDict || w.fallbackToNonDict { |
| return |
| } |
| |
| if w.currentEncoder.(encoding.DictEncoder).DictEncodedSize() >= int(w.props.DictionaryPageSizeLimit()) { |
| w.fallbackToPlain() |
| } |
| } |
| |
| func (w *BooleanColumnChunkWriter) fallbackToPlain() { |
| if w.currentEncoder.Encoding() == parquet.Encodings.PlainDict { |
| w.WriteDictionaryPage() |
| w.FlushBufferedDataPages() |
| w.fallbackToNonDict = true |
| w.currentEncoder = encoding.BooleanEncoderTraits.Encoder(format.Encoding(parquet.Encodings.Plain), false, w.descr, w.mem) |
| w.encoding = parquet.Encodings.Plain |
| } |
| } |
| |
| // ByteArrayColumnChunkWriter is the typed column chunk writer used to write |
| // ByteArray columns to a parquet file. |
| type ByteArrayColumnChunkWriter struct { |
| columnWriter |
| } |
| |
| // NewByteArrayColumnChunkWriter constructs a new column writer using the given metadata chunk builder, |
| // the provided PageWriter, and the desired encoding and properties. |
| // |
| // Consumers will rarely call this directly; it is primarily used internally. |
| // |
| // ColumnChunkWriters should be acquired from fileWriter and RowGroupWriter objects. |
| func NewByteArrayColumnChunkWriter(meta *metadata.ColumnChunkMetaDataBuilder, pager PageWriter, useDict bool, enc parquet.Encoding, props *parquet.WriterProperties) *ByteArrayColumnChunkWriter { |
| ret := &ByteArrayColumnChunkWriter{columnWriter: newColumnWriterBase(meta, pager, useDict, enc, props)} |
| ret.currentEncoder = encoding.ByteArrayEncoderTraits.Encoder(format.Encoding(enc), useDict, meta.Descr(), props.Allocator()) |
| return ret |
| } |
| |
| // WriteBatch writes a batch of repetition levels, definition levels, and values to the |
| // column. |
| // `def_levels` (resp. `rep_levels`) can be null if the column's max definition level |
| // (resp. max repetition level) is 0. |
| // If not null, each of `def_levels` and `rep_levels` must have at least |
| // `len(values)` entries. |
| // |
| // The number of physical values written (taken from `values`) is returned. |
| // It can be smaller than `len(values)` if there are some undefined values. |
| // |
| // When using DataPageV2 to write a repeated column, rows cannot cross data |
| // page boundaries. To guarantee this, the writer ensures that every batch of |
| // w.props.WriteBatchSize values begins and ends on a row boundary. As a consequence, |
| // when using DataPageV2 and repLevels is not nil, the first value passed to |
| // WriteBatch must always begin a new row (repLevels[0] should always be 0). |
| func (w *ByteArrayColumnChunkWriter) WriteBatch(values []parquet.ByteArray, defLevels, repLevels []int16) (valueOffset int64, err error) { |
| defer func() { |
| if r := recover(); r != nil { |
| switch r := r.(type) { |
| case string: |
| err = xerrors.New(r) |
| case error: |
| err = r |
| default: |
| err = fmt.Errorf("unknown error type: %s", r) |
| } |
| } |
| }() |
| // We check for DataPage limits only after we have inserted the values. If a user |
| // writes a large number of values, the DataPage size can end up well above the limit. |
| // The purpose of this chunking is to bound that: even if a user writes a large |
| // number of values, the chunking ensures that AddDataPage() is called at a |
| // reasonable page size limit. |
| var n int64 |
| switch { |
| case defLevels != nil: |
| n = int64(len(defLevels)) |
| case values != nil: |
| n = int64(len(values)) |
| } |
| w.doBatches(n, repLevels, func(offset, batch int64) { |
| var vals []parquet.ByteArray |
| |
| toWrite := w.writeLevels(batch, levelSliceOrNil(defLevels, offset, batch), levelSliceOrNil(repLevels, offset, batch)) |
| if values != nil { |
| vals = values[valueOffset : valueOffset+toWrite] |
| } |
| |
| w.writeValues(vals, batch-toWrite) |
| if err := w.commitWriteAndCheckPageLimit(batch, toWrite); err != nil { |
| panic(err) |
| } |
| |
| valueOffset += toWrite |
| w.checkDictionarySizeLimit() |
| }) |
| return |
| } |
| |
| // WriteBatchSpaced writes a batch of repetition levels, definition levels, and values to the |
| // column. |
| // |
| // In comparison to WriteBatch, if max_definition_level == 1 the length of the |
| // repetition and definition levels is the same as the number of values. |
| // In the case of max_definition_level > 1, the repetition and definition |
| // levels are longer than the values, but the values include the null entries |
| // with definition_level == (max_definition_level - 1). Thus the parameters of |
| // this function must distinguish whether the input has the length of num_values |
| // or the _number of rows in the lowest nesting level_. |
| // |
| // If the innermost node in the Parquet schema is required, the _number of rows |
| // in the lowest nesting level_ is equal to the number of non-null values. If the |
| // innermost schema node is optional, the _number of rows in the lowest nesting level_ |
| // also includes all values with definition_level == (max_definition_level - 1). |
| func (w *ByteArrayColumnChunkWriter) WriteBatchSpaced(values []parquet.ByteArray, defLevels, repLevels []int16, validBits []byte, validBitsOffset int64) { |
| valueOffset := int64(0) |
| length := len(defLevels) |
| if defLevels == nil { |
| length = len(values) |
| } |
| doBatches(int64(length), w.props.WriteBatchSize(), func(offset, batch int64) { |
| var vals []parquet.ByteArray |
| info := w.maybeCalculateValidityBits(levelSliceOrNil(defLevels, offset, batch), batch) |
| |
| w.writeLevelsSpaced(batch, levelSliceOrNil(defLevels, offset, batch), levelSliceOrNil(repLevels, offset, batch)) |
| if values != nil { |
| vals = values[valueOffset : valueOffset+info.numSpaced()] |
| } |
| |
| if w.bitsBuffer != nil { |
| w.writeValuesSpaced(vals, info.batchNum, w.bitsBuffer.Bytes(), 0) |
| } else { |
| w.writeValuesSpaced(vals, info.batchNum, validBits, validBitsOffset+valueOffset) |
| } |
| w.commitWriteAndCheckPageLimit(batch, info.numSpaced()) |
| valueOffset += info.numSpaced() |
| |
| w.checkDictionarySizeLimit() |
| }) |
| } |
| |
| func (w *ByteArrayColumnChunkWriter) writeValues(values []parquet.ByteArray, numNulls int64) { |
| w.currentEncoder.(encoding.ByteArrayEncoder).Put(values) |
| if w.pageStatistics != nil { |
| w.pageStatistics.(*metadata.ByteArrayStatistics).Update(values, numNulls) |
| } |
| } |
| |
| func (w *ByteArrayColumnChunkWriter) writeValuesSpaced(spacedValues []parquet.ByteArray, numValues int64, validBits []byte, validBitsOffset int64) { |
| if len(spacedValues) != int(numValues) { |
| w.currentEncoder.(encoding.ByteArrayEncoder).PutSpaced(spacedValues, validBits, validBitsOffset) |
| } else { |
| w.currentEncoder.(encoding.ByteArrayEncoder).Put(spacedValues) |
| } |
| if w.pageStatistics != nil { |
| nulls := int64(len(spacedValues)) - numValues |
| w.pageStatistics.(*metadata.ByteArrayStatistics).UpdateSpaced(spacedValues, validBits, validBitsOffset, nulls) |
| } |
| } |
| |
| func (w *ByteArrayColumnChunkWriter) checkDictionarySizeLimit() { |
| if !w.hasDict || w.fallbackToNonDict { |
| return |
| } |
| |
| if w.currentEncoder.(encoding.DictEncoder).DictEncodedSize() >= int(w.props.DictionaryPageSizeLimit()) { |
| w.fallbackToPlain() |
| } |
| } |
| |
| func (w *ByteArrayColumnChunkWriter) fallbackToPlain() { |
| if w.currentEncoder.Encoding() == parquet.Encodings.PlainDict { |
| w.WriteDictionaryPage() |
| w.FlushBufferedDataPages() |
| w.fallbackToNonDict = true |
| w.currentEncoder = encoding.ByteArrayEncoderTraits.Encoder(format.Encoding(parquet.Encodings.Plain), false, w.descr, w.mem) |
| w.encoding = parquet.Encodings.Plain |
| } |
| } |
| |
| // FixedLenByteArrayColumnChunkWriter is the typed column chunk writer used to write |
| // FixedLenByteArray columns to a parquet file. |
| type FixedLenByteArrayColumnChunkWriter struct { |
| columnWriter |
| } |
| |
| // NewFixedLenByteArrayColumnChunkWriter constructs a new column writer using the given metadata chunk builder, |
| // the provided PageWriter, and the desired encoding and properties. |
| // |
| // Consumers will rarely call this directly; it is primarily used internally. |
| // |
| // ColumnChunkWriters should be acquired from fileWriter and RowGroupWriter objects. |
| func NewFixedLenByteArrayColumnChunkWriter(meta *metadata.ColumnChunkMetaDataBuilder, pager PageWriter, useDict bool, enc parquet.Encoding, props *parquet.WriterProperties) *FixedLenByteArrayColumnChunkWriter { |
| ret := &FixedLenByteArrayColumnChunkWriter{columnWriter: newColumnWriterBase(meta, pager, useDict, enc, props)} |
| ret.currentEncoder = encoding.FixedLenByteArrayEncoderTraits.Encoder(format.Encoding(enc), useDict, meta.Descr(), props.Allocator()) |
| return ret |
| } |
| |
| // WriteBatch writes a batch of repetition levels, definition levels, and values to the |
| // column. |
| // `def_levels` (resp. `rep_levels`) can be null if the column's max definition level |
| // (resp. max repetition level) is 0. |
| // If not null, each of `def_levels` and `rep_levels` must have at least |
| // `len(values)` entries. |
| // |
| // The number of physical values written (taken from `values`) is returned. |
| // It can be smaller than `len(values)` if there are some undefined values. |
| // |
| // When using DataPageV2 to write a repeated column, rows cannot cross data |
| // page boundaries. To guarantee this, the writer ensures that every batch of |
| // w.props.WriteBatchSize values begins and ends on a row boundary. As a consequence, |
| // when using DataPageV2 and repLevels is not nil, the first value passed to |
| // WriteBatch must always begin a new row (repLevels[0] should always be 0). |
| func (w *FixedLenByteArrayColumnChunkWriter) WriteBatch(values []parquet.FixedLenByteArray, defLevels, repLevels []int16) (valueOffset int64, err error) { |
| defer func() { |
| if r := recover(); r != nil { |
| switch r := r.(type) { |
| case string: |
| err = xerrors.New(r) |
| case error: |
| err = r |
| default: |
| err = fmt.Errorf("unknown error type: %s", r) |
| } |
| } |
| }() |
| // We check for DataPage limits only after we have inserted the values. If a user |
| // writes a large number of values, the DataPage size can end up well above the limit. |
| // The purpose of this chunking is to bound that: even if a user writes a large |
| // number of values, the chunking ensures that AddDataPage() is called at a |
| // reasonable page size limit. |
| var n int64 |
| switch { |
| case defLevels != nil: |
| n = int64(len(defLevels)) |
| case values != nil: |
| n = int64(len(values)) |
| } |
| w.doBatches(n, repLevels, func(offset, batch int64) { |
| var vals []parquet.FixedLenByteArray |
| |
| toWrite := w.writeLevels(batch, levelSliceOrNil(defLevels, offset, batch), levelSliceOrNil(repLevels, offset, batch)) |
| if values != nil { |
| vals = values[valueOffset : valueOffset+toWrite] |
| } |
| |
| w.writeValues(vals, batch-toWrite) |
| if err := w.commitWriteAndCheckPageLimit(batch, toWrite); err != nil { |
| panic(err) |
| } |
| |
| valueOffset += toWrite |
| w.checkDictionarySizeLimit() |
| }) |
| return |
| } |
| |
| // WriteBatchSpaced writes a batch of repetition levels, definition levels, and values to the |
| // column. |
| // |
| // In comparison to WriteBatch, if max_definition_level == 1 the length of the |
| // repetition and definition levels is the same as the number of values. |
| // In the case of max_definition_level > 1, the repetition and definition |
| // levels are longer than the values, but the values include the null entries |
| // with definition_level == (max_definition_level - 1). Thus the parameters of |
| // this function must distinguish whether the input has the length of num_values |
| // or the _number of rows in the lowest nesting level_. |
| // |
| // If the innermost node in the Parquet schema is required, the _number of rows |
| // in the lowest nesting level_ is equal to the number of non-null values. If the |
| // innermost schema node is optional, the _number of rows in the lowest nesting level_ |
| // also includes all values with definition_level == (max_definition_level - 1). |
| func (w *FixedLenByteArrayColumnChunkWriter) WriteBatchSpaced(values []parquet.FixedLenByteArray, defLevels, repLevels []int16, validBits []byte, validBitsOffset int64) { |
| valueOffset := int64(0) |
| length := len(defLevels) |
| if defLevels == nil { |
| length = len(values) |
| } |
| doBatches(int64(length), w.props.WriteBatchSize(), func(offset, batch int64) { |
| var vals []parquet.FixedLenByteArray |
| info := w.maybeCalculateValidityBits(levelSliceOrNil(defLevels, offset, batch), batch) |
| |
| w.writeLevelsSpaced(batch, levelSliceOrNil(defLevels, offset, batch), levelSliceOrNil(repLevels, offset, batch)) |
| if values != nil { |
| vals = values[valueOffset : valueOffset+info.numSpaced()] |
| } |
| |
| if w.bitsBuffer != nil { |
| w.writeValuesSpaced(vals, info.batchNum, w.bitsBuffer.Bytes(), 0) |
| } else { |
| w.writeValuesSpaced(vals, info.batchNum, validBits, validBitsOffset+valueOffset) |
| } |
| w.commitWriteAndCheckPageLimit(batch, info.numSpaced()) |
| valueOffset += info.numSpaced() |
| |
| w.checkDictionarySizeLimit() |
| }) |
| } |
| |
| func (w *FixedLenByteArrayColumnChunkWriter) writeValues(values []parquet.FixedLenByteArray, numNulls int64) { |
| w.currentEncoder.(encoding.FixedLenByteArrayEncoder).Put(values) |
| if w.pageStatistics != nil { |
| w.pageStatistics.(*metadata.FixedLenByteArrayStatistics).Update(values, numNulls) |
| } |
| } |
| |
| func (w *FixedLenByteArrayColumnChunkWriter) writeValuesSpaced(spacedValues []parquet.FixedLenByteArray, numValues int64, validBits []byte, validBitsOffset int64) { |
| if len(spacedValues) != int(numValues) { |
| w.currentEncoder.(encoding.FixedLenByteArrayEncoder).PutSpaced(spacedValues, validBits, validBitsOffset) |
| } else { |
| w.currentEncoder.(encoding.FixedLenByteArrayEncoder).Put(spacedValues) |
| } |
| if w.pageStatistics != nil { |
| nulls := int64(len(spacedValues)) - numValues |
| w.pageStatistics.(*metadata.FixedLenByteArrayStatistics).UpdateSpaced(spacedValues, validBits, validBitsOffset, nulls) |
| } |
| } |
| |
| func (w *FixedLenByteArrayColumnChunkWriter) checkDictionarySizeLimit() { |
| if !w.hasDict || w.fallbackToNonDict { |
| return |
| } |
| |
| if w.currentEncoder.(encoding.DictEncoder).DictEncodedSize() >= int(w.props.DictionaryPageSizeLimit()) { |
| w.fallbackToPlain() |
| } |
| } |
| |
| func (w *FixedLenByteArrayColumnChunkWriter) fallbackToPlain() { |
| if w.currentEncoder.Encoding() == parquet.Encodings.PlainDict { |
| w.WriteDictionaryPage() |
| w.FlushBufferedDataPages() |
| w.fallbackToNonDict = true |
| w.currentEncoder = encoding.FixedLenByteArrayEncoderTraits.Encoder(format.Encoding(parquet.Encodings.Plain), false, w.descr, w.mem) |
| w.encoding = parquet.Encodings.Plain |
| } |
| } |
| |
| // NewColumnChunkWriter constructs a column writer of the appropriate concrete type, using the |
| // metadata builder and writer properties to determine the column's physical type and whether |
| // or not to use dictionary encoding. |
| func NewColumnChunkWriter(meta *metadata.ColumnChunkMetaDataBuilder, pager PageWriter, props *parquet.WriterProperties) ColumnChunkWriter { |
| descr := meta.Descr() |
| useDict := props.DictionaryEnabledFor(descr.Path()) && descr.PhysicalType() != parquet.Types.Boolean && descr.PhysicalType() != parquet.Types.Int96 |
| enc := props.EncodingFor(descr.Path()) |
| if useDict { |
| enc = props.DictionaryIndexEncoding() |
| } |
| |
| switch descr.PhysicalType() { |
| case parquet.Types.Int32: |
| return NewInt32ColumnChunkWriter(meta, pager, useDict, enc, props) |
| case parquet.Types.Int64: |
| return NewInt64ColumnChunkWriter(meta, pager, useDict, enc, props) |
| case parquet.Types.Int96: |
| return NewInt96ColumnChunkWriter(meta, pager, useDict, enc, props) |
| case parquet.Types.Float: |
| return NewFloat32ColumnChunkWriter(meta, pager, useDict, enc, props) |
| case parquet.Types.Double: |
| return NewFloat64ColumnChunkWriter(meta, pager, useDict, enc, props) |
| case parquet.Types.Boolean: |
| return NewBooleanColumnChunkWriter(meta, pager, useDict, enc, props) |
| case parquet.Types.ByteArray: |
| return NewByteArrayColumnChunkWriter(meta, pager, useDict, enc, props) |
| case parquet.Types.FixedLenByteArray: |
| return NewFixedLenByteArrayColumnChunkWriter(meta, pager, useDict, enc, props) |
| default: |
| panic("unimplemented") |
| } |
| } |
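| |
| // Illustrative sketch (not generated code): NewColumnChunkWriter returns the generic |
| // ColumnChunkWriter interface, so callers typically switch on the concrete type to reach |
| // the typed WriteBatch methods. The value slices and level slices below are hypothetical: |
| // |
| //    switch cw := NewColumnChunkWriter(meta, pager, props).(type) { |
| //    case *Int32ColumnChunkWriter: |
| //        _, err = cw.WriteBatch(int32Values, defLevels, repLevels) |
| //    case *ByteArrayColumnChunkWriter: |
| //        _, err = cw.WriteBatch(byteArrayValues, defLevels, repLevels) |
| //    default: |
| //        // remaining physical types are handled the same way |
| //    } |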