// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package pqarrow
import (
"context"
"github.com/apache/arrow/go/v10/arrow"
"github.com/apache/arrow/go/v10/arrow/memory"
"github.com/apache/arrow/go/v10/parquet/internal/encoding"
)
// ArrowWriterProperties are used to determine how to manipulate the arrow data
// when writing it to a parquet file.
type ArrowWriterProperties struct {
mem memory.Allocator
timestampAsInt96 bool
coerceTimestamps bool
coerceTimestampUnit arrow.TimeUnit
allowTruncatedTimestamps bool
storeSchema bool
noMapLogicalType bool
// compliantNestedTypes bool
}
// DefaultWriterProps returns the default properties for the arrow writer:
// memory.DefaultAllocator for allocations, with arrow.Second as the coerced timestamp unit.
func DefaultWriterProps() ArrowWriterProperties {
return ArrowWriterProperties{
mem: memory.DefaultAllocator,
coerceTimestampUnit: arrow.Second,
}
}
type config struct {
props ArrowWriterProperties
}
// WriterOption is a convenience for building up arrow writer properties.
type WriterOption func(*config)
// NewArrowWriterProperties creates a new writer properties object from the
// passed set of options, which control the individual properties. Once created,
// an instance of ArrowWriterProperties is immutable.
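//
// A minimal usage sketch (the options chosen here are purely illustrative):
//
//	props := pqarrow.NewArrowWriterProperties(
//		pqarrow.WithAllocator(memory.DefaultAllocator),
//		pqarrow.WithCoerceTimestamps(arrow.Millisecond),
//	)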
func NewArrowWriterProperties(opts ...WriterOption) ArrowWriterProperties {
cfg := config{DefaultWriterProps()}
for _, o := range opts {
o(&cfg)
}
return cfg.props
}
// WithAllocator specifies the allocator to be used by the writer whenever allocating
// buffers and memory.
func WithAllocator(mem memory.Allocator) WriterOption {
return func(c *config) {
c.props.mem = mem
}
}
// WithDeprecatedInt96Timestamps enables conversion of arrow timestamps to int96
// columns when constructing the schema. Int96 is the Impala standard; it is
// technically deprecated in the parquet format but is sometimes still needed
// for compatibility.
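//
// For example, to write all timestamps as int96 for legacy readers:
//
//	props := pqarrow.NewArrowWriterProperties(
//		pqarrow.WithDeprecatedInt96Timestamps(true),
//	)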
func WithDeprecatedInt96Timestamps(enabled bool) WriterOption {
return func(c *config) {
c.props.timestampAsInt96 = enabled
}
}
// WithCoerceTimestamps enables coercion of timestamps to a specific time unit
// when constructing the schema and writing data, so that regardless of the unit
// used by the datatypes being written, they will be converted to the desired unit.
func WithCoerceTimestamps(unit arrow.TimeUnit) WriterOption {
return func(c *config) {
c.props.coerceTimestamps = true
c.props.coerceTimestampUnit = unit
}
}
// WithTruncatedTimestamps, when called with true, disables the error that would
// otherwise be returned if coercing a timestamp unit would cause a loss of data,
// such as converting from nanoseconds to seconds.
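//
// For example, to coerce everything to milliseconds while tolerating the
// precision loss when finer-grained units are written (a sketch; the option
// values are illustrative):
//
//	props := pqarrow.NewArrowWriterProperties(
//		pqarrow.WithCoerceTimestamps(arrow.Millisecond),
//		pqarrow.WithTruncatedTimestamps(true),
//	)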
func WithTruncatedTimestamps(allow bool) WriterOption {
return func(c *config) {
c.props.allowTruncatedTimestamps = allow
}
}
// WithStoreSchema enables writing a binary serialized arrow schema to the file's
// metadata so that certain read options (like "read_dictionary") can be set automatically.
//
// If called, the arrow schema is serialized and base64 encoded before being added to the
// metadata of the parquet file with the key "ARROW:schema". If the key exists when
// opening a file for read with pqarrow.FileReader, the schema will be used to choose
// types and options when constructing the arrow schema of the resulting data.
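//
// A sketch of enabling this when writing a table; the exact signature of
// pqarrow.WriteTable is assumed here, and tbl / f stand in for an arrow.Table
// and an io.Writer:
//
//	arrProps := pqarrow.NewArrowWriterProperties(pqarrow.WithStoreSchema())
//	err := pqarrow.WriteTable(tbl, f, 1024, parquet.NewWriterProperties(), arrProps)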
func WithStoreSchema() WriterOption {
return func(c *config) {
c.props.storeSchema = true
}
}
// WithNoMapLogicalType sets the writer to not use the Map logical type when
// writing map columns to the parquet schema.
func WithNoMapLogicalType() WriterOption {
return func(c *config) {
c.props.noMapLogicalType = true
}
}
// func WithCompliantNestedTypes(enabled bool) WriterOption {
// return func(c *config) {
// c.props.compliantNestedTypes = enabled
// }
// }
type arrowWriteContext struct {
props ArrowWriterProperties
dataBuffer *memory.Buffer
defLevelsBuffer encoding.Buffer
repLevelsBuffer encoding.Buffer
}
type arrowCtxKey struct{}
// NewArrowWriteContext wraps the given context with a reusable object containing
// the writer properties and shared buffers for writing. The resulting context
// should not be used to write multiple columns concurrently. If props is nil,
// DefaultWriterProps will be used.
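//
// A minimal sketch:
//
//	props := pqarrow.NewArrowWriterProperties(pqarrow.WithAllocator(memory.DefaultAllocator))
//	ctx := pqarrow.NewArrowWriteContext(context.Background(), &props)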
func NewArrowWriteContext(ctx context.Context, props *ArrowWriterProperties) context.Context {
if props == nil {
p := DefaultWriterProps()
props = &p
}
return context.WithValue(ctx, arrowCtxKey{}, &arrowWriteContext{props: *props})
}
func arrowCtxFromContext(ctx context.Context) *arrowWriteContext {
awc := ctx.Value(arrowCtxKey{})
if awc != nil {
return awc.(*arrowWriteContext)
}
return &arrowWriteContext{
props: DefaultWriterProps(),
}
}
// ArrowReadProperties defines the properties for reading a parquet file
// into arrow arrays.
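//
// A short sketch (pqarrow.NewFileReader and its exact signature are assumed
// here; pf stands in for an already-opened parquet file.Reader):
//
//	props := pqarrow.ArrowReadProperties{Parallel: true, BatchSize: 1024}
//	rdr, err := pqarrow.NewFileReader(pf, props, memory.DefaultAllocator)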
type ArrowReadProperties struct {
// If Parallel is true, then functions which read multiple columns will read
// those columns in parallel from the file with a number of readers equal
// to the number of columns. Otherwise columns are read serially.
Parallel bool
// BatchSize is the size used for calls to NextBatch when reading whole columns.
BatchSize int64
}