blob: cb0184448213c67323ef8ab44379dcc5e984c9df [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package csv
import (
"encoding/csv"
"fmt"
"io"
"strconv"
"sync"
"sync/atomic"
"github.com/apache/arrow/go/v6/arrow"
"github.com/apache/arrow/go/v6/arrow/array"
"github.com/apache/arrow/go/v6/arrow/internal/debug"
"github.com/apache/arrow/go/v6/arrow/memory"
"golang.org/x/xerrors"
)
// Reader wraps encoding/csv.Reader and creates array.Records from a schema.
// Reader wraps encoding/csv.Reader and creates array.Records from a schema.
type Reader struct {
	r      *csv.Reader
	schema *arrow.Schema

	refs int64 // reference count, managed atomically by Retain/Release

	bld   *array.RecordBuilder
	cur   array.Record // current record; valid until the next call to Next
	err   error        // first error encountered during iteration
	chunk int          // rows per Record: 1 (default), >1 fixed chunks, <0 whole file
	done  bool         // set once the underlying CSV stream is exhausted or failed
	next  func() bool  // chunking strategy selected in NewReader (next1/nextn/nextall)

	mem    memory.Allocator
	header bool      // when true, Next consumes the first row as a header
	once   sync.Once // guards the one-time header read

	// fieldConverter holds one pre-selected parsing function per column so the
	// per-value hot path does not branch on the field type.
	fieldConverter   []func(field array.Builder, val string)
	stringsCanBeNull bool     // when true, string values matching nulls become nulls
	nulls            []string // sentinel values treated as null
}
// NewReader returns a reader that reads from the CSV file and creates
// array.Records from the given schema.
//
// NewReader panics if the given schema contains fields that have types that are not
// primitive types.
// NewReader returns a reader that reads from the CSV file and creates
// array.Records from the given schema.
//
// NewReader panics if the given schema contains fields that have types that are not
// primitive types.
func NewReader(r io.Reader, schema *arrow.Schema, opts ...Option) *Reader {
	validate(schema)

	rd := &Reader{
		r:      csv.NewReader(r),
		schema: schema,
		refs:   1,
		chunk:  1,
	}
	rd.r.ReuseRecord = true

	for _, opt := range opts {
		opt(rd)
	}
	if rd.mem == nil {
		rd.mem = memory.DefaultAllocator
	}
	rd.bld = array.NewRecordBuilder(rd.mem, rd.schema)

	// Pick the chunking strategy once, up front.
	if rd.chunk < 0 {
		rd.next = rd.nextall
	} else if rd.chunk > 1 {
		rd.next = rd.nextn
	} else {
		rd.next = rd.next1
	}

	// Build a table of per-column parsing functions. This optimization lets
	// each column's decoding be specialized once, hoisting the type-based
	// branches out of the inner loop.
	rd.fieldConverter = make([]func(array.Builder, string), 0, len(schema.Fields()))
	for i := range schema.Fields() {
		f := schema.Field(i)
		rd.fieldConverter = append(rd.fieldConverter, rd.initFieldConverter(&f))
	}

	return rd
}
// readHeader consumes the first CSV row and uses it to rename the schema's
// fields, rebuilding the record builder against the renamed schema. Field
// types and order are unchanged, so the previously built column converters
// remain valid.
func (r *Reader) readHeader() error {
	records, err := r.r.Read()
	if err != nil {
		return xerrors.Errorf("arrow/csv: could not read header from file: %w", err)
	}

	if len(records) != len(r.schema.Fields()) {
		return ErrMismatchFields
	}

	fields := make([]arrow.Field, len(records))
	for idx, name := range records {
		fields[idx] = r.schema.Field(idx)
		fields[idx].Name = name
	}

	meta := r.schema.Metadata()
	r.schema = arrow.NewSchema(fields, &meta)
	// Release the builder created in NewReader before replacing it, otherwise
	// the buffers it holds from the allocator are leaked.
	r.bld.Release()
	r.bld = array.NewRecordBuilder(r.mem, r.schema)
	return nil
}
// Err returns the last error encountered during the iteration over the
// underlying CSV file.
func (r *Reader) Err() error { return r.err }

// Schema returns the schema of the records created by this reader.
// If header handling is enabled, the field names reflect the header row
// once it has been read.
func (r *Reader) Schema() *arrow.Schema { return r.schema }

// Record returns the current record that has been extracted from the
// underlying CSV file.
// It is valid until the next call to Next.
func (r *Reader) Record() array.Record { return r.cur }
// Next returns whether a Record could be extracted from the underlying CSV file.
//
// Next panics if the number of records extracted from a CSV row does not match
// the number of fields of the associated schema.
// Next returns whether a Record could be extracted from the underlying CSV file.
//
// Next panics if the number of records extracted from a CSV row does not match
// the number of fields of the associated schema.
func (r *Reader) Next() bool {
	// Consume the header row exactly once, on the first call.
	if r.header {
		r.once.Do(func() { r.err = r.readHeader() })
	}

	// Drop the previous record before producing a new one.
	if cur := r.cur; cur != nil {
		cur.Release()
		r.cur = nil
	}

	if r.done || r.err != nil {
		return false
	}
	return r.next()
}
// next1 reads one row from the CSV file and creates a single Record
// from that row.
// next1 reads one row from the CSV file and creates a single Record
// from that row.
func (r *Reader) next1() bool {
	row, err := r.r.Read()
	if err != nil {
		r.done = true
		// EOF is the normal end of iteration, not an error for the caller.
		if err != io.EOF {
			r.err = err
		}
		return false
	}

	r.validate(row)
	r.read(row)
	r.cur = r.bld.NewRecord()
	return true
}
// nextall reads the whole CSV file into memory and creates one single
// Record from all the CSV rows.
// nextall reads the whole CSV file into memory and creates one single
// Record from all the CSV rows.
func (r *Reader) nextall() bool {
	// A single shot: whatever happens, there is nothing left to read after.
	defer func() { r.done = true }()

	rows, err := r.r.ReadAll()
	if err != nil {
		r.err = err
		return false
	}

	for _, row := range rows {
		r.validate(row)
		r.read(row)
	}
	r.cur = r.bld.NewRecord()
	return true
}
// nextn reads n rows from the CSV file, where n is the chunk size, and creates
// a Record from these rows.
// nextn reads up to chunk rows from the CSV file and creates a Record from
// these rows. It returns true when at least one row was read.
func (r *Reader) nextn() bool {
	var (
		recs []string
		n    = 0
	)

	// The r.err == nil condition stops the loop as soon as a row fails
	// validation; without it, the next successful Read would overwrite r.err
	// with nil and the mismatch error would be silently lost.
	for i := 0; i < r.chunk && !r.done && r.err == nil; i++ {
		recs, r.err = r.r.Read()
		if r.err != nil {
			r.done = true
			break
		}
		r.validate(recs)
		r.read(recs)
		n++
	}

	if r.err != nil {
		r.done = true
		// EOF is the normal end of iteration, not an error for the caller.
		if r.err == io.EOF {
			r.err = nil
		}
	}

	r.cur = r.bld.NewRecord()
	return n > 0
}
// validate records ErrMismatchFields when the row's column count differs from
// the schema's field count. It is a no-op once an error has been recorded.
func (r *Reader) validate(recs []string) {
	if r.err != nil {
		return
	}
	if len(recs) == len(r.schema.Fields()) {
		return
	}
	r.err = ErrMismatchFields
}
// isNull reports whether val matches one of the configured null sentinels.
func (r *Reader) isNull(val string) bool {
	for i := range r.nulls {
		if r.nulls[i] == val {
			return true
		}
	}
	return false
}
// read appends each value of the row to its column builder using the
// pre-selected per-column converter.
func (r *Reader) read(recs []string) {
	for i := range recs {
		r.fieldConverter[i](r.bld.Field(i), recs[i])
	}
}
// initFieldConverter returns the parsing function for a single column. The
// type switch runs once per field here, so the per-value hot path carries no
// type-based branching.
//
// It panics for any type outside the supported primitive set.
func (r *Reader) initFieldConverter(field *arrow.Field) func(array.Builder, string) {
	switch field.Type.(type) {
	case *arrow.BooleanType:
		return r.parseBool
	case *arrow.Int8Type:
		return r.parseInt8
	case *arrow.Int16Type:
		return r.parseInt16
	case *arrow.Int32Type:
		return r.parseInt32
	case *arrow.Int64Type:
		return r.parseInt64
	case *arrow.Uint8Type:
		return r.parseUint8
	case *arrow.Uint16Type:
		return r.parseUint16
	case *arrow.Uint32Type:
		return r.parseUint32
	case *arrow.Uint64Type:
		return r.parseUint64
	case *arrow.Float32Type:
		return r.parseFloat32
	case *arrow.Float64Type:
		return r.parseFloat64
	case *arrow.StringType:
		// Specialize: when strings can never be null, skip the sentinel lookup.
		if !r.stringsCanBeNull {
			return func(field array.Builder, str string) {
				field.(*array.StringBuilder).Append(str)
			}
		}
		return func(field array.Builder, str string) {
			if r.isNull(str) {
				field.AppendNull()
				return
			}
			field.(*array.StringBuilder).Append(str)
		}
	default:
		panic(fmt.Errorf("arrow/csv: unhandled field type %T", field.Type))
	}
}
// parseBool appends the boolean encoded in str to field, or a null when str
// matches a configured null sentinel. Accepted spellings are
// "false"/"False"/"0" and "true"/"True"/"1".
func (r *Reader) parseBool(field array.Builder, str string) {
	if r.isNull(str) {
		field.AppendNull()
		return
	}

	var v bool
	switch str {
	case "false", "False", "0":
		v = false
	case "true", "True", "1":
		v = true
	default:
		// Keep only the first error encountered, matching the behavior of the
		// other column parsers (the original overwrote any earlier error).
		if r.err == nil {
			r.err = fmt.Errorf("Unrecognized boolean: %s", str)
		}
		field.AppendNull()
		return
	}
	field.(*array.BooleanBuilder).Append(v)
}
// parseInt8 appends the value in str to field as an int8, or a null when str
// matches a configured null sentinel.
func (r *Reader) parseInt8(field array.Builder, str string) {
	if r.isNull(str) {
		field.AppendNull()
		return
	}

	v, err := strconv.ParseInt(str, 10, 8)
	if err != nil {
		// Always append a null on a parse failure so the column stays aligned
		// with the row; the original fell through and appended 0 when an
		// earlier error had already been recorded. Keep only the first error.
		if r.err == nil {
			r.err = err
		}
		field.AppendNull()
		return
	}
	field.(*array.Int8Builder).Append(int8(v))
}
// parseInt16 appends the value in str to field as an int16, or a null when str
// matches a configured null sentinel.
func (r *Reader) parseInt16(field array.Builder, str string) {
	if r.isNull(str) {
		field.AppendNull()
		return
	}

	v, err := strconv.ParseInt(str, 10, 16)
	if err != nil {
		// Always append a null on a parse failure so the column stays aligned
		// with the row; the original fell through and appended 0 when an
		// earlier error had already been recorded. Keep only the first error.
		if r.err == nil {
			r.err = err
		}
		field.AppendNull()
		return
	}
	field.(*array.Int16Builder).Append(int16(v))
}
// parseInt32 appends the value in str to field as an int32, or a null when str
// matches a configured null sentinel.
func (r *Reader) parseInt32(field array.Builder, str string) {
	if r.isNull(str) {
		field.AppendNull()
		return
	}

	v, err := strconv.ParseInt(str, 10, 32)
	if err != nil {
		// Always append a null on a parse failure so the column stays aligned
		// with the row; the original fell through and appended 0 when an
		// earlier error had already been recorded. Keep only the first error.
		if r.err == nil {
			r.err = err
		}
		field.AppendNull()
		return
	}
	field.(*array.Int32Builder).Append(int32(v))
}
// parseInt64 appends the value in str to field as an int64, or a null when str
// matches a configured null sentinel.
func (r *Reader) parseInt64(field array.Builder, str string) {
	if r.isNull(str) {
		field.AppendNull()
		return
	}

	v, err := strconv.ParseInt(str, 10, 64)
	if err != nil {
		// Always append a null on a parse failure so the column stays aligned
		// with the row; the original fell through and appended 0 when an
		// earlier error had already been recorded. Keep only the first error.
		if r.err == nil {
			r.err = err
		}
		field.AppendNull()
		return
	}
	field.(*array.Int64Builder).Append(v)
}
// parseUint8 appends the value in str to field as a uint8, or a null when str
// matches a configured null sentinel.
func (r *Reader) parseUint8(field array.Builder, str string) {
	if r.isNull(str) {
		field.AppendNull()
		return
	}

	v, err := strconv.ParseUint(str, 10, 8)
	if err != nil {
		// Always append a null on a parse failure so the column stays aligned
		// with the row; the original fell through and appended 0 when an
		// earlier error had already been recorded. Keep only the first error.
		if r.err == nil {
			r.err = err
		}
		field.AppendNull()
		return
	}
	field.(*array.Uint8Builder).Append(uint8(v))
}
// parseUint16 appends the value in str to field as a uint16, or a null when
// str matches a configured null sentinel.
func (r *Reader) parseUint16(field array.Builder, str string) {
	if r.isNull(str) {
		field.AppendNull()
		return
	}

	v, err := strconv.ParseUint(str, 10, 16)
	if err != nil {
		// Always append a null on a parse failure so the column stays aligned
		// with the row; the original fell through and appended 0 when an
		// earlier error had already been recorded. Keep only the first error.
		if r.err == nil {
			r.err = err
		}
		field.AppendNull()
		return
	}
	field.(*array.Uint16Builder).Append(uint16(v))
}
// parseUint32 appends the value in str to field as a uint32, or a null when
// str matches a configured null sentinel.
func (r *Reader) parseUint32(field array.Builder, str string) {
	if r.isNull(str) {
		field.AppendNull()
		return
	}

	v, err := strconv.ParseUint(str, 10, 32)
	if err != nil {
		// Always append a null on a parse failure so the column stays aligned
		// with the row; the original fell through and appended 0 when an
		// earlier error had already been recorded. Keep only the first error.
		if r.err == nil {
			r.err = err
		}
		field.AppendNull()
		return
	}
	field.(*array.Uint32Builder).Append(uint32(v))
}
// parseUint64 appends the value in str to field as a uint64, or a null when
// str matches a configured null sentinel.
func (r *Reader) parseUint64(field array.Builder, str string) {
	if r.isNull(str) {
		field.AppendNull()
		return
	}

	v, err := strconv.ParseUint(str, 10, 64)
	if err != nil {
		// Always append a null on a parse failure so the column stays aligned
		// with the row; the original fell through and appended 0 when an
		// earlier error had already been recorded. Keep only the first error.
		if r.err == nil {
			r.err = err
		}
		field.AppendNull()
		return
	}
	field.(*array.Uint64Builder).Append(v)
}
// parseFloat32 appends the value in str to field as a float32, or a null when
// str matches a configured null sentinel.
func (r *Reader) parseFloat32(field array.Builder, str string) {
	if r.isNull(str) {
		field.AppendNull()
		return
	}

	v, err := strconv.ParseFloat(str, 32)
	if err != nil {
		// Always append a null on a parse failure so the column stays aligned
		// with the row; the original fell through and appended 0 when an
		// earlier error had already been recorded. Keep only the first error.
		if r.err == nil {
			r.err = err
		}
		field.AppendNull()
		return
	}
	field.(*array.Float32Builder).Append(float32(v))
}
// parseFloat64 appends the value in str to field as a float64, or a null when
// str matches a configured null sentinel.
func (r *Reader) parseFloat64(field array.Builder, str string) {
	if r.isNull(str) {
		field.AppendNull()
		return
	}

	v, err := strconv.ParseFloat(str, 64)
	if err != nil {
		// Always append a null on a parse failure so the column stays aligned
		// with the row; the original fell through and appended 0 when an
		// earlier error had already been recorded. Keep only the first error.
		if r.err == nil {
			r.err = err
		}
		field.AppendNull()
		return
	}
	field.(*array.Float64Builder).Append(v)
}
// Retain increases the reference count by 1.
// Retain may be called simultaneously from multiple goroutines.
func (r *Reader) Retain() {
	// Atomic so Retain/Release are safe without external locking.
	atomic.AddInt64(&r.refs, 1)
}
// Release decreases the reference count by 1.
// When the reference count goes to zero, the memory is freed.
// Release may be called simultaneously from multiple goroutines.
func (r *Reader) Release() {
	debug.Assert(atomic.LoadInt64(&r.refs) > 0, "too many releases")
	if atomic.AddInt64(&r.refs, -1) == 0 {
		if r.cur != nil {
			r.cur.Release()
			r.cur = nil // guard against a double release of the record
		}
		if r.bld != nil {
			// Release the record builder as well: it holds buffers obtained
			// from the allocator that would otherwise leak.
			r.bld.Release()
			r.bld = nil
		}
	}
}
// Compile-time check that Reader implements array.RecordReader.
var (
	_ array.RecordReader = (*Reader)(nil)
)