blob: ef11454a863481cafd29e18d03b9112415803fa9 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package parquet
import (
"github.com/apache/arrow/go/arrow/memory"
"github.com/apache/arrow/go/parquet/compress"
)
// Constants for default property values used for the default reader, writer and column props.
const (
// Default Buffer size used for the Reader
DefaultBufSize int64 = 4096 * 4
// Default data page size limit is 1K it's not guaranteed, but we will try to
// cut data pages off at this size where possible.
DefaultDataPageSize int64 = 1024 * 1024
// Default is for dictionary encoding to be turned on, use WithDictionaryDefault
// writer property to change that.
DefaultDictionaryEnabled = true
// If the dictionary reaches the size of this limitation, the writer will use
// the fallback encoding (usually plain) instead of continuing to build the
// dictionary index.
DefaultDictionaryPageSizeLimit = DefaultDataPageSize
// In order to attempt to facilitate data page size limits for writing,
// data is written in batches. Increasing the batch size may improve performance
// but the larger the batch size, the easier it is to overshoot the datapage limit.
DefaultWriteBatchSize int64 = 1024
// Default maximum number of rows for a single row group
DefaultMaxRowGroupLen int64 = 64 * 1024 * 1024
// Default is to have stats enabled for all columns, use writer properties to
// change the default, or to enable/disable for specific columns.
DefaultStatsEnabled = true
// If the stats are larger than 4K the writer will skip writing them out anyways.
DefaultMaxStatsSize int64 = 4096
DefaultCreatedBy = "parquet-go version 1.0.0"
)
// ColumnProperties defines the encoding, codec, and so on for a given column.
type ColumnProperties struct {
Encoding Encoding
Codec compress.Compression
DictionaryEnabled bool
StatsEnabled bool
MaxStatsSize int64
CompressionLevel int
}
// DefaultColumnProperties returns the default properties which get utilized for writing.
//
// The default column properties are the following constants:
// Encoding: Encodings.Plain
// Codec: compress.Codecs.Uncompressed
// DictionaryEnabled: DefaultDictionaryEnabled
// StatsEnabled: DefaultStatsEnabled
// MaxStatsSize: DefaultMaxStatsSize
// CompressionLevel: compress.DefaultCompressionLevel
func DefaultColumnProperties() ColumnProperties {
return ColumnProperties{
Encoding: Encodings.Plain,
Codec: compress.Codecs.Uncompressed,
DictionaryEnabled: DefaultDictionaryEnabled,
StatsEnabled: DefaultStatsEnabled,
MaxStatsSize: DefaultMaxStatsSize,
CompressionLevel: compress.DefaultCompressionLevel,
}
}
type writerPropConfig struct {
wr *WriterProperties
encodings map[string]Encoding
codecs map[string]compress.Compression
compressLevel map[string]int
dictEnabled map[string]bool
statsEnabled map[string]bool
}
// WriterProperty is used as the options for building a writer properties instance
type WriterProperty func(*writerPropConfig)
// WithAllocator specifies the writer to use the given allocator
func WithAllocator(mem memory.Allocator) WriterProperty {
return func(cfg *writerPropConfig) {
cfg.wr.mem = mem
}
}
// WithDictionaryDefault sets the default value for whether to enable dictionary encoding
func WithDictionaryDefault(dict bool) WriterProperty {
return func(cfg *writerPropConfig) {
cfg.wr.defColumnProps.DictionaryEnabled = dict
}
}
// WithDictionaryFor allows enabling or disabling dictionary encoding for a given column path string
func WithDictionaryFor(path string, dict bool) WriterProperty {
return func(cfg *writerPropConfig) {
cfg.dictEnabled[path] = true
}
}
// WithDictionaryPath is like WithDictionaryFor, but takes a ColumnPath type
func WithDictionaryPath(path ColumnPath, dict bool) WriterProperty {
return WithDictionaryFor(path.String(), dict)
}
// WithDictionaryPageSizeLimit is the limit of the dictionary at which the writer
// will fallback to plain encoding instead
func WithDictionaryPageSizeLimit(limit int64) WriterProperty {
return func(cfg *writerPropConfig) {
cfg.wr.dictPagesize = limit
}
}
// WithBatchSize specifies the number of rows to use for batch writes to columns
func WithBatchSize(batch int64) WriterProperty {
return func(cfg *writerPropConfig) {
cfg.wr.batchSize = batch
}
}
// WithMaxRowGroupLength specifies the number of rows as the maximum number of rows for a given row group in the writer.
func WithMaxRowGroupLength(nrows int64) WriterProperty {
return func(cfg *writerPropConfig) {
cfg.wr.maxRowGroupLen = nrows
}
}
// WithDataPageSize specifies the size to use for splitting data pages for column writing.
func WithDataPageSize(pgsize int64) WriterProperty {
return func(cfg *writerPropConfig) {
cfg.wr.pageSize = pgsize
}
}
// WithDataPageVersion specifies whether to use Version 1 or Version 2 of the DataPage spec
func WithDataPageVersion(version DataPageVersion) WriterProperty {
return func(cfg *writerPropConfig) {
cfg.wr.dataPageVersion = version
}
}
// WithVersion specifies which Parquet Spec version to utilize for writing.
func WithVersion(version Version) WriterProperty {
return func(cfg *writerPropConfig) {
cfg.wr.parquetVersion = version
}
}
// WithCreatedBy specifies the "created by" string to use for the writer
func WithCreatedBy(createdby string) WriterProperty {
return func(cfg *writerPropConfig) {
cfg.wr.createdBy = createdby
}
}
// WithEncoding defines the encoding that is used when we aren't using dictionary encoding.
//
// This is either applied if dictionary encoding is disabled, or if we fallback if the dictionary
// grew too large.
func WithEncoding(encoding Encoding) WriterProperty {
return func(cfg *writerPropConfig) {
if encoding == Encodings.PlainDict || encoding == Encodings.RLEDict {
panic("parquet: can't use dictionary encoding as fallback encoding")
}
cfg.wr.defColumnProps.Encoding = encoding
}
}
// WithEncodingFor is for defining the encoding only for a specific column path. This encoding will be used
// if dictionary encoding is disabled for the column or if we fallback because the dictionary grew too large
func WithEncodingFor(path string, encoding Encoding) WriterProperty {
return func(cfg *writerPropConfig) {
if encoding == Encodings.PlainDict || encoding == Encodings.RLEDict {
panic("parquet: can't use dictionary encoding as fallback encoding")
}
cfg.encodings[path] = encoding
}
}
// WithEncodingPath is the same as WithEncodingFor but takes a ColumnPath directly.
func WithEncodingPath(path ColumnPath, encoding Encoding) WriterProperty {
return WithEncodingFor(path.String(), encoding)
}
// WithCompression specifies the default compression type to use for column writing.
func WithCompression(codec compress.Compression) WriterProperty {
return func(cfg *writerPropConfig) {
cfg.wr.defColumnProps.Codec = codec
}
}
// WithCompressionFor specifies the compression type for the given column.
func WithCompressionFor(path string, codec compress.Compression) WriterProperty {
return func(cfg *writerPropConfig) {
cfg.codecs[path] = codec
}
}
// WithCompressionPath is the same as WithCompressionFor but takes a ColumnPath directly.
func WithCompressionPath(path ColumnPath, codec compress.Compression) WriterProperty {
return WithCompressionFor(path.String(), codec)
}
// WithMaxStatsSize sets a maximum size for the statistics before we decide not to include them.
func WithMaxStatsSize(maxStatsSize int64) WriterProperty {
return func(cfg *writerPropConfig) {
cfg.wr.defColumnProps.MaxStatsSize = maxStatsSize
}
}
// WithCompressionLevel specifies the default compression level for the compressor in every column.
//
// The provided compression level is compressor specific. The user would have to know what the available
// levels are for the selected compressor. If the compressor does not allow for selecting different
// compression levels, then this function will have no effect. Parquet and Arrow will not validate the
// passed compression level. If no level is selected by the user or if the special compress.DefaultCompressionLevel
// value is used, then parquet will select the compression level.
func WithCompressionLevel(level int) WriterProperty {
return func(cfg *writerPropConfig) {
cfg.wr.defColumnProps.CompressionLevel = level
}
}
// WithCompressionLevelFor is like WithCompressionLevel but only for the given column path.
func WithCompressionLevelFor(path string, level int) WriterProperty {
return func(cfg *writerPropConfig) {
cfg.compressLevel[path] = level
}
}
// WithCompressionLevelPath is the same as WithCompressionLevelFor but takes a ColumnPath
func WithCompressionLevelPath(path ColumnPath, level int) WriterProperty {
return WithCompressionLevelFor(path.String(), level)
}
// WithStats specifies a default for whether or not to enable column statistics.
func WithStats(enabled bool) WriterProperty {
return func(cfg *writerPropConfig) {
cfg.wr.defColumnProps.StatsEnabled = enabled
}
}
// WithStatsFor specifies a per column value as to enable or disable statistics in the resulting file.
func WithStatsFor(path string, enabled bool) WriterProperty {
return func(cfg *writerPropConfig) {
cfg.statsEnabled[path] = enabled
}
}
// WithStatsPath is the same as WithStatsFor but takes a ColumnPath
func WithStatsPath(path ColumnPath, enabled bool) WriterProperty {
return WithStatsFor(path.String(), enabled)
}
// WithEncryptionProperties specifies the file level encryption handling for writing the file.
func WithEncryptionProperties(props *FileEncryptionProperties) WriterProperty {
return func(cfg *writerPropConfig) {
cfg.wr.encryptionProps = props
}
}
// WriterProperties is the collection of properties to use for writing a parquet file. The values are
// read only once it has been constructed.
type WriterProperties struct {
mem memory.Allocator
dictPagesize int64
batchSize int64
maxRowGroupLen int64
pageSize int64
parquetVersion Version
createdBy string
dataPageVersion DataPageVersion
defColumnProps ColumnProperties
columnProps map[string]*ColumnProperties
encryptionProps *FileEncryptionProperties
}
func defaultWriterProperties() *WriterProperties {
return &WriterProperties{
mem: memory.DefaultAllocator,
dictPagesize: DefaultDictionaryPageSizeLimit,
batchSize: DefaultWriteBatchSize,
maxRowGroupLen: DefaultMaxRowGroupLen,
pageSize: DefaultDataPageSize,
parquetVersion: V1,
dataPageVersion: DataPageV1,
createdBy: DefaultCreatedBy,
defColumnProps: DefaultColumnProperties(),
}
}
// NewWriterProperties takes a list of options for building the properties. If multiple options are used which conflict
// then the last option is the one which will take effect. If no WriterProperty options are provided, then the default
// properties will be utilized for writing.
//
// The Default properties use the following constants:
// Allocator: memory.DefaultAllocator
// DictionaryPageSize: DefaultDictionaryPageSizeLimit
// BatchSize: DefaultWriteBatchSize
// MaxRowGroupLength: DefaultMaxRowGroupLen
// PageSize: DefaultDataPageSize
// ParquetVersion: V1
// DataPageVersion: DataPageV1
// CreatedBy: DefaultCreatedBy
func NewWriterProperties(opts ...WriterProperty) *WriterProperties {
cfg := writerPropConfig{
wr: defaultWriterProperties(),
encodings: make(map[string]Encoding),
codecs: make(map[string]compress.Compression),
compressLevel: make(map[string]int),
dictEnabled: make(map[string]bool),
statsEnabled: make(map[string]bool),
}
for _, o := range opts {
o(&cfg)
}
cfg.wr.columnProps = make(map[string]*ColumnProperties)
get := func(key string) *ColumnProperties {
if p, ok := cfg.wr.columnProps[key]; ok {
return p
}
cfg.wr.columnProps[key] = new(ColumnProperties)
*cfg.wr.columnProps[key] = cfg.wr.defColumnProps
return cfg.wr.columnProps[key]
}
for key, value := range cfg.encodings {
get(key).Encoding = value
}
for key, value := range cfg.codecs {
get(key).Codec = value
}
for key, value := range cfg.compressLevel {
get(key).CompressionLevel = value
}
for key, value := range cfg.dictEnabled {
get(key).DictionaryEnabled = value
}
for key, value := range cfg.statsEnabled {
get(key).StatsEnabled = value
}
return cfg.wr
}
// FileEncryptionProperties returns the current encryption properties that were
// used to create the writer properties.
func (w *WriterProperties) FileEncryptionProperties() *FileEncryptionProperties {
return w.encryptionProps
}
func (w *WriterProperties) Allocator() memory.Allocator { return w.mem }
func (w *WriterProperties) CreatedBy() string { return w.createdBy }
func (w *WriterProperties) WriteBatchSize() int64 { return w.batchSize }
func (w *WriterProperties) DataPageSize() int64 { return w.pageSize }
func (w *WriterProperties) DictionaryPageSizeLimit() int64 { return w.dictPagesize }
func (w *WriterProperties) Version() Version { return w.parquetVersion }
func (w *WriterProperties) DataPageVersion() DataPageVersion { return w.dataPageVersion }
func (w *WriterProperties) MaxRowGroupLength() int64 { return w.maxRowGroupLen }
// Compression returns the default compression type that will be used for any columns that don't
// have a specific compression defined.
func (w *WriterProperties) Compression() compress.Compression { return w.defColumnProps.Codec }
// CompressionFor will return the compression type that is specified for the given column path, or
// the default compression codec if there isn't one specific to this column.
func (w *WriterProperties) CompressionFor(path string) compress.Compression {
if p, ok := w.columnProps[path]; ok {
return p.Codec
}
return w.defColumnProps.Codec
}
//CompressionPath is the same as CompressionFor but takes a ColumnPath
func (w *WriterProperties) CompressionPath(path ColumnPath) compress.Compression {
return w.CompressionFor(path.String())
}
// CompressionLevel returns the default compression level that will be used for any column
// that doesn't have a compression level specified for it.
func (w *WriterProperties) CompressionLevel() int { return w.defColumnProps.CompressionLevel }
// CompressionLevelFor returns the compression level that will be utilized for the given column,
// or the default compression level if the column doesn't have a specific level specified.
func (w *WriterProperties) CompressionLevelFor(path string) int {
if p, ok := w.columnProps[path]; ok {
return p.CompressionLevel
}
return w.defColumnProps.CompressionLevel
}
// CompressionLevelPath is the same as CompressionLevelFor but takes a ColumnPath object
func (w *WriterProperties) CompressionLevelPath(path ColumnPath) int {
return w.CompressionLevelFor(path.String())
}
// Encoding returns the default encoding that will be utilized for any columns which don't have a different value
// specified.
func (w *WriterProperties) Encoding() Encoding { return w.defColumnProps.Encoding }
// EncodingFor returns the encoding that will be used for the given column path, or the default encoding if there
// isn't one specified for this column.
func (w *WriterProperties) EncodingFor(path string) Encoding {
if p, ok := w.columnProps[path]; ok {
return p.Encoding
}
return w.defColumnProps.Encoding
}
// EncodingPath is the same as EncodingFor but takes a ColumnPath object
func (w *WriterProperties) EncodingPath(path ColumnPath) Encoding {
return w.EncodingFor(path.String())
}
// DictionaryIndexEncoding returns which encoding will be used for the Dictionary Index values based on the
// parquet version. V1 uses PlainDict and V2 uses RLEDict
func (w *WriterProperties) DictionaryIndexEncoding() Encoding {
if w.parquetVersion == V1 {
return Encodings.PlainDict
}
return Encodings.RLEDict
}
// DictionaryPageEncoding returns the encoding that will be utilized for the DictionaryPage itself based on the parquet
// version. V1 uses PlainDict, v2 uses Plain
func (w *WriterProperties) DictionaryPageEncoding() Encoding {
if w.parquetVersion == V1 {
return Encodings.PlainDict
}
return Encodings.Plain
}
// DictionaryEnabled returns the default value as for whether or not dictionary encoding will be utilized for columns
// that aren't separately specified.
func (w *WriterProperties) DictionaryEnabled() bool { return w.defColumnProps.DictionaryEnabled }
// DictionaryEnabledFor returns whether or not dictionary encoding will be used for the specified column when writing
// or the default value if the column was not separately specified.
func (w *WriterProperties) DictionaryEnabledFor(path string) bool {
if p, ok := w.columnProps[path]; ok {
return p.DictionaryEnabled
}
return w.defColumnProps.DictionaryEnabled
}
// DictionaryEnabledPath is the same as DictionaryEnabledFor but takes a ColumnPath object.
func (w *WriterProperties) DictionaryEnabledPath(path ColumnPath) bool {
return w.DictionaryEnabledFor(path.String())
}
// StatisticsEnabled returns the default value for whether or not stats are enabled to be written for columns
// that aren't separately specified.
func (w *WriterProperties) StatisticsEnabled() bool { return w.defColumnProps.StatsEnabled }
// StatisticsEnabledFor returns whether stats will be written for the given column path, or the default value if
// it wasn't separately specified.
func (w *WriterProperties) StatisticsEnabledFor(path string) bool {
if p, ok := w.columnProps[path]; ok {
return p.StatsEnabled
}
return w.defColumnProps.StatsEnabled
}
// StatisticsEnabledPath is the same as StatisticsEnabledFor but takes a ColumnPath object.
func (w *WriterProperties) StatisticsEnabledPath(path ColumnPath) bool {
return w.StatisticsEnabledFor(path.String())
}
// MaxStatsSize returns the default maximum size for stats
func (w *WriterProperties) MaxStatsSize() int64 { return w.defColumnProps.MaxStatsSize }
// MaxStatsSizeFor returns the maximum stat size for the given column path
func (w *WriterProperties) MaxStatsSizeFor(path string) int64 {
if p, ok := w.columnProps[path]; ok {
return p.MaxStatsSize
}
return w.defColumnProps.MaxStatsSize
}
// MaxStatsSizePath is the same as MaxStatsSizeFor but takes a ColumnPath
func (w *WriterProperties) MaxStatsSizePath(path ColumnPath) int64 {
return w.MaxStatsSizeFor(path.String())
}
// ColumnEncryptionProperties returns the specific properties for encryption that will be used for the given column path
func (w *WriterProperties) ColumnEncryptionProperties(path string) *ColumnEncryptionProperties {
if w.encryptionProps != nil {
return w.encryptionProps.ColumnEncryptionProperties(path)
}
return nil
}