// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package file

import (
	"fmt"

	"github.com/apache/arrow/go/v9/parquet"
	"github.com/apache/arrow/go/v9/parquet/internal/encoding"
	format "github.com/apache/arrow/go/v9/parquet/internal/gen-go/parquet"
	"github.com/apache/arrow/go/v9/parquet/metadata"
	"golang.org/x/xerrors"
)

{{range .In}}
// {{.Name}}ColumnChunkWriter is the typed interface for writing {{.Name}}
// columns to a parquet file.
type {{.Name}}ColumnChunkWriter struct {
	columnWriter
}

// New{{.Name}}ColumnChunkWriter constructs a new column writer from the given
// metadata chunk builder, the provided PageWriter, and the desired encoding
// and properties.
//
// This will likely not often be called directly by consumers, but rather used internally.
//
// ColumnChunkWriters should be acquired by using fileWriter and RowGroupWriter objects.
func New{{.Name}}ColumnChunkWriter(meta *metadata.ColumnChunkMetaDataBuilder, pager PageWriter, useDict bool, enc parquet.Encoding, props *parquet.WriterProperties) *{{.Name}}ColumnChunkWriter {
{{- if eq .Name "Boolean"}}
	if useDict {
		panic("cannot use dictionary for boolean writer")
	}

{{- end}}
	ret := &{{.Name}}ColumnChunkWriter{columnWriter: newColumnWriterBase(meta, pager, useDict, enc, props)}
	ret.currentEncoder = encoding.{{.Name}}EncoderTraits.Encoder(format.Encoding(enc), useDict, meta.Descr(), props.Allocator())
	return ret
}
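
// A hedged usage sketch (the acquisition API shown here, AppendRowGroup and
// NextColumn, is assumed from the surrounding file package and is not part of
// the generated code): consumers normally obtain a typed writer from a row
// group writer instead of calling this constructor:
//
//	rgw := fw.AppendRowGroup()             // fw is an assumed *file.Writer
//	cw, err := rgw.NextColumn()            // next ColumnChunkWriter in schema order
//	w := cw.(*{{.Name}}ColumnChunkWriter)  // assert to the typed writer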

// WriteBatch writes a batch of repetition levels, definition levels, and values to the
// column.
// `defLevels` (resp. `repLevels`) can be nil if the column's max definition level
// (resp. max repetition level) is 0.
// If not nil, each of `defLevels` and `repLevels` must have at least
// `len(values)` entries.
//
// The number of physical values written (taken from `values`) is returned.
// It can be smaller than `len(values)` if there are some undefined values.
//
// When using DataPageV2 to write a repeated column, rows cannot cross data
// page boundaries. To ensure this, the writer makes every batch of
// w.props.BatchSize begin and end on a row boundary. As a consequence,
// if repLevels is not nil and DataPageV2 is in use, the first value passed to
// WriteBatch must always begin a row (repLevels[0] should always be 0).
func (w *{{.Name}}ColumnChunkWriter) WriteBatch(values []{{.name}}, defLevels, repLevels []int16) (valueOffset int64, err error) {
	defer func() {
		if r := recover(); r != nil {
			switch r := r.(type) {
			case string:
				err = xerrors.New(r)
			case error:
				err = r
			default:
				err = fmt.Errorf("unknown error type: %v", r)
			}
		}
	}()
	// We check the DataPage limits only after we have inserted the values. If a user
	// writes a large number of values in one call, the DataPage size can grow well
	// beyond the limit. The purpose of this chunking is to bound that: even if a user
	// writes a large number of values, the chunking ensures AddDataPage() is called
	// at a reasonable page size limit.
	var n int64
	switch {
	case defLevels != nil:
		n = int64(len(defLevels))
	case values != nil:
		n = int64(len(values))
	}
	w.doBatches(n, repLevels, func(offset, batch int64) {
		var vals []{{.name}}

		toWrite := w.writeLevels(batch, levelSliceOrNil(defLevels, offset, batch), levelSliceOrNil(repLevels, offset, batch))
		if values != nil {
			vals = values[valueOffset : valueOffset+toWrite]
		}

		w.writeValues(vals, batch-toWrite)
		if err := w.commitWriteAndCheckPageLimit(batch, toWrite); err != nil {
			panic(err)
		}

		valueOffset += toWrite
		w.checkDictionarySizeLimit()
	})
	return
}
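
// An illustrative sketch (hypothetical values, not generated code): for an
// optional, non-repeated column with max definition level 1, nulls are carried
// entirely by the definition levels and `values` holds only the non-null
// entries:
//
//	// three rows: 1, null, 2 -> three definition levels, two physical values
//	defLevels := []int16{1, 0, 1}
//	n, err := w.WriteBatch(values, defLevels, nil) // len(values) == 2
//	// on success, n == 2: the number of physical values consumed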

// WriteBatchSpaced writes a batch of repetition levels, definition levels, and values to the
// column.
//
// In comparison to WriteBatch, the length of the repetition and definition levels
// is the same as the number of values when the max definition level is 1.
// When the max definition level is greater than 1, the repetition and definition
// levels are longer than the values, but the values include the null entries
// with definition level == (max definition level - 1). Thus we have to differentiate
// in the parameters of this function whether the input has the length of the number
// of values or the _number of rows in the lowest nesting level_.
//
// If the innermost node in the Parquet schema is required, the _number of rows
// in the lowest nesting level_ is equal to the number of non-null values. If the
// innermost schema node is optional, the _number of rows in the lowest nesting level_
// also includes all values with definition level == (max definition level - 1).
func (w *{{.Name}}ColumnChunkWriter) WriteBatchSpaced(values []{{.name}}, defLevels, repLevels []int16, validBits []byte, validBitsOffset int64) {
	valueOffset := int64(0)
	length := len(defLevels)
	if defLevels == nil {
		length = len(values)
	}
	doBatches(int64(length), w.props.WriteBatchSize(), func(offset, batch int64) {
		var vals []{{.name}}
		info := w.maybeCalculateValidityBits(levelSliceOrNil(defLevels, offset, batch), batch)

		w.writeLevelsSpaced(batch, levelSliceOrNil(defLevels, offset, batch), levelSliceOrNil(repLevels, offset, batch))
		if values != nil {
			vals = values[valueOffset:]
		}

		if w.bitsBuffer != nil {
			w.writeValuesSpaced(vals[:info.numSpaced()], info.batchNum, w.bitsBuffer.Bytes(), 0)
		} else {
			w.writeValuesSpaced(vals[:info.numSpaced()], info.batchNum, validBits, validBitsOffset+valueOffset)
		}
		w.commitWriteAndCheckPageLimit(batch, info.numSpaced())
		valueOffset += info.numSpaced()

		w.checkDictionarySizeLimit()
	})
}
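
// An illustrative sketch (hypothetical values, not generated code): unlike
// WriteBatch, here `values` is "spaced", keeping one slot per level including
// the null slots, with validity tracked by an LSB-first bitmap:
//
//	// three rows: 1, null, 2 -> three spaced slots, slot 1 invalid
//	defLevels := []int16{1, 0, 1}
//	validBits := []byte{0x05} // 0b101: slots 0 and 2 are valid
//	w.WriteBatchSpaced(values, defLevels, nil, validBits, 0) // len(values) == 3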
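// writeValues feeds the values to the current encoder and, when page
// statistics are being tracked, updates them with the values and the
// number of nulls encountered alongside them.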
func (w *{{.Name}}ColumnChunkWriter) writeValues(values []{{.name}}, numNulls int64) {
	w.currentEncoder.(encoding.{{.Name}}Encoder).Put(values)
	if w.pageStatistics != nil {
		w.pageStatistics.(*metadata.{{.Name}}Statistics).Update(values, numNulls)
	}
}

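// writeValuesSpaced writes a spaced slice of values, using PutSpaced with the
// validity bitmap while the slice still contains null slots, and falling back
// to a plain Put when every slot is valid.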
func (w *{{.Name}}ColumnChunkWriter) writeValuesSpaced(spacedValues []{{.name}}, numValues int64, validBits []byte, validBitsOffset int64) {
	if len(spacedValues) != int(numValues) {
		w.currentEncoder.(encoding.{{.Name}}Encoder).PutSpaced(spacedValues, validBits, validBitsOffset)
	} else {
		w.currentEncoder.(encoding.{{.Name}}Encoder).Put(spacedValues)
	}
	if w.pageStatistics != nil {
		nulls := int64(len(spacedValues)) - numValues
		w.pageStatistics.(*metadata.{{.Name}}Statistics).UpdateSpaced(spacedValues, validBits, validBitsOffset, nulls)
	}
}

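// checkDictionarySizeLimit triggers a fallback to plain encoding once the
// dictionary grows past the configured dictionary page size limit.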
func (w *{{.Name}}ColumnChunkWriter) checkDictionarySizeLimit() {
	if !w.hasDict || w.fallbackToNonDict {
		return
	}

	if w.currentEncoder.(encoding.DictEncoder).DictEncodedSize() >= int(w.props.DictionaryPageSizeLimit()) {
		w.fallbackToPlain()
	}
}

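// fallbackToPlain writes out the current dictionary page and any buffered data
// pages, then switches the writer to plain encoding for subsequent values.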
func (w *{{.Name}}ColumnChunkWriter) fallbackToPlain() {
	if w.currentEncoder.Encoding() == parquet.Encodings.PlainDict {
		w.WriteDictionaryPage()
		w.FlushBufferedDataPages()
		w.fallbackToNonDict = true
		w.currentEncoder = encoding.{{.Name}}EncoderTraits.Encoder(format.Encoding(parquet.Encodings.Plain), false, w.descr, w.mem)
		w.encoding = parquet.Encodings.Plain
	}
}
{{end}}

// NewColumnChunkWriter constructs a column writer of the appropriate type, using
// the metadata builder to determine the column's physical type and the writer
// properties to decide whether or not to use dictionary encoding.
func NewColumnChunkWriter(meta *metadata.ColumnChunkMetaDataBuilder, pager PageWriter, props *parquet.WriterProperties) ColumnChunkWriter {
	descr := meta.Descr()
	useDict := props.DictionaryEnabledFor(descr.Path()) && descr.PhysicalType() != parquet.Types.Boolean && descr.PhysicalType() != parquet.Types.Int96
	enc := props.EncodingFor(descr.Path())
	if useDict {
		enc = props.DictionaryIndexEncoding()
	}

	switch descr.PhysicalType() {
{{- range .In}}
	case parquet.Types.{{if .physical}}{{.physical}}{{else}}{{.Name}}{{end}}:
		return New{{.Name}}ColumnChunkWriter(meta, pager, useDict, enc, props)
{{- end}}
	default:
		panic("unimplemented")
	}
}