// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package file

import (
	"fmt"

	"golang.org/x/xerrors"

	"github.com/apache/arrow/go/v9/parquet"
	"github.com/apache/arrow/go/v9/parquet/internal/encoding"
	"github.com/apache/arrow/go/v9/parquet/metadata"
	format "github.com/apache/arrow/go/v9/parquet/internal/gen-go/parquet"
)

{{range .In}}

// {{.Name}}ColumnChunkWriter is the typed interface for writing columns to a parquet
// file for {{.Name}} columns.
type {{.Name}}ColumnChunkWriter struct {
	columnWriter
}

// New{{.Name}}ColumnChunkWriter constructs a new column writer using the given
// metadata chunk builder, the provided PageWriter, and the desired encoding
// and properties.
//
// Consumers will rarely need to call this directly; it is primarily used
// internally. ColumnChunkWriters should instead be acquired via fileWriter
// and RowGroupWriter objects.
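//
// A minimal construction sketch (meta, pager, and props are assumed to be
// supplied by the surrounding file-writer machinery; names are illustrative):
//
//	w := New{{.Name}}ColumnChunkWriter(meta, pager, false, parquet.Encodings.Plain, props)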
func New{{.Name}}ColumnChunkWriter(meta *metadata.ColumnChunkMetaDataBuilder, pager PageWriter, useDict bool, enc parquet.Encoding, props *parquet.WriterProperties) *{{.Name}}ColumnChunkWriter {
	{{- if eq .Name "Boolean"}}
	if useDict {
		panic("cannot use dictionary for boolean writer")
	}
	{{- end}}
	ret := &{{.Name}}ColumnChunkWriter{columnWriter: newColumnWriterBase(meta, pager, useDict, enc, props)}
	ret.currentEncoder = encoding.{{.Name}}EncoderTraits.Encoder(format.Encoding(enc), useDict, meta.Descr(), props.Allocator())
	return ret
}

// WriteBatch writes a batch of repetition levels, definition levels, and values to the
// column.
//
// `defLevels` (resp. `repLevels`) can be nil if the column's max definition level
// (resp. max repetition level) is 0. If not nil, each of `defLevels` and
// `repLevels` must have at least `len(values)` elements.
//
// The number of physical values written (taken from `values`) is returned.
// It can be smaller than `len(values)` if some values are undefined.
//
// When using DataPageV2 to write a repeated column, rows cannot cross data
// page boundaries. To ensure this, the writer guarantees that every batch of
// w.props.BatchSize begins and ends on a row boundary. As a consequence, when
// repLevels is not nil and DataPageV2 is in use, the first value passed to
// WriteBatch must always be the beginning of a row (repLevels[0] must be 0).
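//
// A minimal usage sketch for an optional, non-repeated column (maxDefLevel of
// 1, so a definition level of 0 marks a null and values holds only the
// non-null entries; names are illustrative):
//
//	defLevels := []int16{1, 0, 1} // row 2 is null
//	values := []{{.name}}{v0, v2} // only the two defined values
//	written, err := w.WriteBatch(values, defLevels, nil)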
func (w *{{.Name}}ColumnChunkWriter) WriteBatch(values []{{.name}}, defLevels, repLevels []int16) (valueOffset int64, err error) {
	defer func() {
		if r := recover(); r != nil {
			switch r := r.(type) {
			case string:
				err = xerrors.New(r)
			case error:
				err = r
			default:
				err = fmt.Errorf("unknown error type: %v", r)
			}
		}
	}()
	// We check the DataPage limits only after we have inserted the values. If a
	// user writes a large number of values in one call, the DataPage size can
	// exceed the limit considerably. The purpose of this chunking is to bound
	// that overshoot: even for a very large write, the chunking ensures that
	// AddDataPage() is called at a reasonable page size limit.
	var n int64
	switch {
	case defLevels != nil:
		n = int64(len(defLevels))
	case values != nil:
		n = int64(len(values))
	}
	w.doBatches(n, repLevels, func(offset, batch int64) {
		var vals []{{.name}}

		toWrite := w.writeLevels(batch, levelSliceOrNil(defLevels, offset, batch), levelSliceOrNil(repLevels, offset, batch))
		if values != nil {
			vals = values[valueOffset : valueOffset+toWrite]
		}
		w.writeValues(vals, batch-toWrite)
		if err := w.commitWriteAndCheckPageLimit(batch, toWrite); err != nil {
			panic(err)
		}
		valueOffset += toWrite

		w.checkDictionarySizeLimit()
	})
	return
}

// WriteBatchSpaced writes a batch of repetition levels, definition levels, and
// values to the column.
//
// In comparison to WriteBatch, the length of the repetition and definition
// levels is the same as the number of values when max_definition_level == 1.
// When max_definition_level > 1, the repetition and definition levels are
// longer than the values, because the values include the null entries with
// definition_level == (max_definition_level - 1). Thus the parameters of this
// function must distinguish whether the input has the length of num_values or
// the _number of rows in the lowest nesting level_.
//
// If the innermost node in the Parquet schema is required, the _number of rows
// in the lowest nesting level_ is equal to the number of non-null values. If
// the innermost schema node is optional, the _number of rows in the lowest
// nesting level_ also includes all values with
// definition_level == (max_definition_level - 1).
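//
// A minimal usage sketch for an optional, non-repeated column (values includes
// placeholder slots for nulls and validBits is a bitmap with one bit per slot;
// names are illustrative):
//
//	defLevels := []int16{1, 0, 1}
//	values := []{{.name}}{v0, zeroVal, v2} // slot 1 is a null placeholder
//	validBits := []byte{0x05}              // bits 0 and 2 set
//	w.WriteBatchSpaced(values, defLevels, nil, validBits, 0)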
func (w *{{.Name}}ColumnChunkWriter) WriteBatchSpaced(values []{{.name}}, defLevels, repLevels []int16, validBits []byte, validBitsOffset int64) {
	valueOffset := int64(0)
	length := len(defLevels)
	if defLevels == nil {
		length = len(values)
	}

	doBatches(int64(length), w.props.WriteBatchSize(), func(offset, batch int64) {
		var vals []{{.name}}
		info := w.maybeCalculateValidityBits(levelSliceOrNil(defLevels, offset, batch), batch)

		w.writeLevelsSpaced(batch, levelSliceOrNil(defLevels, offset, batch), levelSliceOrNil(repLevels, offset, batch))
		if values != nil {
			vals = values[valueOffset:]
		}
		if w.bitsBuffer != nil {
			w.writeValuesSpaced(vals[:info.numSpaced()], info.batchNum, w.bitsBuffer.Bytes(), 0)
		} else {
			w.writeValuesSpaced(vals[:info.numSpaced()], info.batchNum, validBits, validBitsOffset+valueOffset)
		}
		w.commitWriteAndCheckPageLimit(batch, info.numSpaced())
		valueOffset += info.numSpaced()

		w.checkDictionarySizeLimit()
	})
}
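
// writeValues feeds a dense batch to the current encoder and, when page
// statistics are being collected, updates them with the values and the
// number of nulls encountered at this level.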
func (w *{{.Name}}ColumnChunkWriter) writeValues(values []{{.name}}, numNulls int64) {
	w.currentEncoder.(encoding.{{.Name}}Encoder).Put(values)
	if w.pageStatistics != nil {
		w.pageStatistics.(*metadata.{{.Name}}Statistics).Update(values, numNulls)
	}
}
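
// writeValuesSpaced encodes a batch that may contain placeholder slots for
// nulls: when the slice is longer than numValues, the validity bitmap is used
// to skip the null slots via PutSpaced; otherwise the batch is dense and is
// encoded directly. Page statistics, when collected, are updated from the
// spaced form.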
func (w *{{.Name}}ColumnChunkWriter) writeValuesSpaced(spacedValues []{{.name}}, numValues int64, validBits []byte, validBitsOffset int64) {
	if len(spacedValues) != int(numValues) {
		w.currentEncoder.(encoding.{{.Name}}Encoder).PutSpaced(spacedValues, validBits, validBitsOffset)
	} else {
		w.currentEncoder.(encoding.{{.Name}}Encoder).Put(spacedValues)
	}
	if w.pageStatistics != nil {
		nulls := int64(len(spacedValues)) - numValues
		w.pageStatistics.(*metadata.{{.Name}}Statistics).UpdateSpaced(spacedValues, validBits, validBitsOffset, nulls)
	}
}
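
// checkDictionarySizeLimit falls back to plain encoding once the accumulated
// dictionary grows past the configured DictionaryPageSizeLimit; it is a no-op
// if dictionary encoding is not in use or the fallback has already happened.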
func (w *{{.Name}}ColumnChunkWriter) checkDictionarySizeLimit() {
	if !w.hasDict || w.fallbackToNonDict {
		return
	}

	if w.currentEncoder.(encoding.DictEncoder).DictEncodedSize() >= int(w.props.DictionaryPageSizeLimit()) {
		w.fallbackToPlain()
	}
}
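
// fallbackToPlain writes out the dictionary page and any buffered data pages
// accumulated so far, then swaps the current encoder for a plain encoder that
// is used for the remainder of the column chunk.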
func (w *{{.Name}}ColumnChunkWriter) fallbackToPlain() {
	if w.currentEncoder.Encoding() == parquet.Encodings.PlainDict {
		w.WriteDictionaryPage()
		w.FlushBufferedDataPages()
		w.fallbackToNonDict = true
		w.currentEncoder = encoding.{{.Name}}EncoderTraits.Encoder(format.Encoding(parquet.Encodings.Plain), false, w.descr, w.mem)
		w.encoding = parquet.Encodings.Plain
	}
}
{{end}}

// NewColumnChunkWriter constructs a column writer of the appropriate concrete
// type, using the metadata builder and writer properties to determine which
// typed writer to build and whether or not to use dictionary encoding.
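//
// A minimal dispatch sketch (meta and pager are assumed to be built by the
// row group writer internals):
//
//	ccw := NewColumnChunkWriter(meta, pager, props)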
func NewColumnChunkWriter(meta *metadata.ColumnChunkMetaDataBuilder, pager PageWriter, props *parquet.WriterProperties) ColumnChunkWriter {
	descr := meta.Descr()
	useDict := props.DictionaryEnabledFor(descr.Path()) && descr.PhysicalType() != parquet.Types.Boolean && descr.PhysicalType() != parquet.Types.Int96
	enc := props.EncodingFor(descr.Path())
	if useDict {
		enc = props.DictionaryIndexEncoding()
	}

	switch descr.PhysicalType() {
	{{- range .In}}
	case parquet.Types.{{if .physical}}{{.physical}}{{else}}{{.Name}}{{end}}:
		return New{{.Name}}ColumnChunkWriter(meta, pager, useDict, enc, props)
	{{- end}}
	default:
		panic("unimplemented")
	}
}