blob: 5053a14977a3c00970d62a3607ee9ad6c649248e [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package csv
import (
"encoding/csv"
"io"
"strconv"
"sync"
"sync/atomic"
"github.com/apache/arrow/go/arrow"
"github.com/apache/arrow/go/arrow/array"
"github.com/apache/arrow/go/arrow/internal/debug"
"github.com/apache/arrow/go/arrow/memory"
"github.com/pkg/errors"
)
// Reader wraps encoding/csv.Reader and creates array.Records from a schema.
//
// A Reader is reference counted (see Retain and Release) and is driven by
// repeatedly calling Next, then inspecting Record and Err.
type Reader struct {
// r is the underlying CSV reader the rows are pulled from.
r *csv.Reader
// schema describes the columns of the produced records. It is replaced by
// readHeader when header handling is enabled.
schema *arrow.Schema
// refs is the reference count; it is only modified through sync/atomic.
refs int64
// bld accumulates parsed rows until they are turned into a record.
bld *array.RecordBuilder
// cur is the record most recently produced by Next, or nil.
cur array.Record
// err is the error reported by Err; a non-nil value stops iteration.
err error
// chunk selects the batching strategy in NewReader: a negative value reads
// the whole input into one record, a value greater than 1 reads that many
// rows per record, and anything else reads one row per record.
chunk int
// done is set once the input is exhausted or reading has failed.
done bool
// next is the batching strategy (next1, nextn or nextall) chosen by NewReader.
next func() bool
// mem is the allocator handed to the record builder; NewReader falls back
// to memory.DefaultAllocator when it is left nil.
mem memory.Allocator
// header, when true, makes the first call to Next consume the first CSV row
// as column names. NOTE(review): presumably set through an Option — confirm.
header bool
// once guards the header read so that it happens a single time.
once sync.Once
}
// NewReader builds a Reader that pulls rows from the CSV data in r and turns
// them into array.Records laid out according to schema.
//
// The defaults (one row per record, reuse of the CSV row buffer) are set
// first, then the options are applied in the order given. If no option
// provided an allocator, memory.DefaultAllocator is used.
//
// NewReader panics if the given schema contains fields that have types that are not
// primitive types.
func NewReader(r io.Reader, schema *arrow.Schema, opts ...Option) *Reader {
	validate(schema)

	rdr := &Reader{
		r:      csv.NewReader(r),
		schema: schema,
		refs:   1,
		chunk:  1,
	}
	rdr.r.ReuseRecord = true

	for i := range opts {
		opts[i](rdr)
	}

	if rdr.mem == nil {
		rdr.mem = memory.DefaultAllocator
	}
	rdr.bld = array.NewRecordBuilder(rdr.mem, rdr.schema)

	// Select the batching strategy from the (possibly option-adjusted) chunk
	// size: negative reads everything at once, >1 reads fixed-size chunks,
	// anything else reads row by row.
	if rdr.chunk < 0 {
		rdr.next = rdr.nextall
	} else if rdr.chunk > 1 {
		rdr.next = rdr.nextn
	} else {
		rdr.next = rdr.next1
	}

	return rdr
}
// readHeader consumes one CSV row and uses its values as the column names.
//
// The row must hold exactly one entry per schema field, otherwise
// ErrMismatchFields is returned. On success the reader's schema is replaced
// by one whose fields are copies of the original fields with only the name
// changed (the schema metadata is carried over), and a new record builder
// is created for that schema.
func (r *Reader) readHeader() error {
	names, err := r.r.Read()
	if err != nil {
		return errors.Wrapf(err, "arrow/csv: could not read header from file")
	}

	nfields := len(r.schema.Fields())
	if len(names) != nfields {
		return ErrMismatchFields
	}

	renamed := make([]arrow.Field, 0, nfields)
	for i := range names {
		f := r.schema.Field(i)
		f.Name = names[i]
		renamed = append(renamed, f)
	}

	md := r.schema.Metadata()
	r.schema = arrow.NewSchema(renamed, &md)
	r.bld = array.NewRecordBuilder(r.mem, r.schema)

	return nil
}
// Err reports the last error encountered while iterating over the
// underlying CSV file, or nil if there was none.
func (r *Reader) Err() error {
	return r.err
}
// Schema returns the schema of the records produced by the reader.
//
// When header handling is enabled, the schema is replaced once the header
// row has been read, so the value may differ before and after the first
// call to Next.
func (r *Reader) Schema() *arrow.Schema {
	return r.schema
}
// Record returns the record produced by the most recent call to Next.
//
// The returned record is released by the following call to Next, so it is
// only valid until then.
func (r *Reader) Record() array.Record {
	return r.cur
}
// Next advances the reader and reports whether a new Record is available
// through Record.
//
// On the first call, and only when header handling is enabled, the header
// row is consumed before any data. The record from the previous call is
// released. Once an error has been recorded or the input is exhausted, Next
// keeps returning false.
//
// Next panics if the number of records extracted from a CSV row does not match
// the number of fields of the associated schema.
func (r *Reader) Next() bool {
	if r.header {
		r.once.Do(func() { r.err = r.readHeader() })
	}

	// Drop the previously returned record before producing a new one.
	if prev := r.cur; prev != nil {
		prev.Release()
		r.cur = nil
	}

	if r.done || r.err != nil {
		return false
	}
	return r.next()
}
// next1 reads a single row from the CSV file and turns it into a
// one-row Record.
//
// It returns false when no row could be read; reaching the end of the input
// is not reported as an error. A row that was read is always converted into
// a record, even if validating or parsing it recorded an error in r.err.
func (r *Reader) next1() bool {
	row, err := r.r.Read()
	r.err = err
	if err != nil {
		r.done = true
		if err == io.EOF {
			// End of input is the normal way for iteration to finish.
			r.err = nil
		}
		return false
	}

	r.validate(row)
	r.read(row)
	r.cur = r.bld.NewRecord()

	return true
}
// nextall loads every remaining row of the CSV file into memory and
// produces one single Record holding all of them.
//
// The reader is marked as done when nextall returns, whatever the outcome.
// It returns false only if reading the rows failed.
func (r *Reader) nextall() bool {
	defer func() { r.done = true }()

	rows, err := r.r.ReadAll()
	r.err = err
	if err != nil {
		return false
	}

	for i := range rows {
		r.validate(rows[i])
		r.read(rows[i])
	}
	r.cur = r.bld.NewRecord()

	return true
}
// nextn reads up to n rows from the CSV file, where n is the chunk size, and
// creates a Record from these rows.
//
// Reading stops early when the input is exhausted, when the CSV reader
// fails, or when validating/parsing a row records an error. In all of these
// cases the reader is marked as done; reaching the end of the input is not
// reported as an error. The rows consumed so far (including a row whose
// values failed to parse) are still turned into a record.
//
// It returns true if at least one row was consumed.
func (r *Reader) nextn() bool {
	n := 0

	for i := 0; i < r.chunk && !r.done; i++ {
		// Keep the read error in a local: assigning straight to r.err would
		// wipe out an error recorded for a previous row of this chunk.
		recs, err := r.r.Read()
		if err != nil {
			r.done = true
			if err != io.EOF {
				r.err = err
			}
			break
		}

		r.validate(recs)
		r.read(recs)
		n++

		// Stop at the first bad row so that its error is what Err reports.
		if r.err != nil {
			r.done = true
			break
		}
	}

	r.cur = r.bld.NewRecord()
	return n > 0
}
// validate checks that a CSV row has as many entries as the schema has
// fields and records ErrMismatchFields in r.err when it does not.
//
// An error that is already pending is left untouched.
func (r *Reader) validate(recs []string) {
	if r.err == nil && len(recs) != len(r.schema.Fields()) {
		r.err = ErrMismatchFields
	}
}
// read parses the values of one CSV row and appends each of them to the
// builder of the column at the same position.
//
// Every value is converted according to the type of the schema field with
// the same index. A value that cannot be parsed is appended as the zero
// value of its type, and the first such failure is stored in r.err. String
// columns take the text verbatim. Columns whose type is not handled below
// are skipped.
func (r *Reader) read(recs []string) {
	for i, str := range recs {
		switch r.schema.Field(i).Type.(type) {
		case *arrow.BooleanType:
			// strconv.ParseBool accepts the spellings handled historically
			// ("true", "True", "1", "false", "False", "0") and reports
			// anything it does not recognise instead of silently yielding
			// false.
			v, err := strconv.ParseBool(str)
			if err != nil && r.err == nil {
				r.err = err
			}
			r.bld.Field(i).(*array.BooleanBuilder).Append(v)
		case *arrow.Int8Type:
			v := r.readI8(str)
			r.bld.Field(i).(*array.Int8Builder).Append(v)
		case *arrow.Int16Type:
			v := r.readI16(str)
			r.bld.Field(i).(*array.Int16Builder).Append(v)
		case *arrow.Int32Type:
			v := r.readI32(str)
			r.bld.Field(i).(*array.Int32Builder).Append(v)
		case *arrow.Int64Type:
			v := r.readI64(str)
			r.bld.Field(i).(*array.Int64Builder).Append(v)
		case *arrow.Uint8Type:
			v := r.readU8(str)
			r.bld.Field(i).(*array.Uint8Builder).Append(v)
		case *arrow.Uint16Type:
			v := r.readU16(str)
			r.bld.Field(i).(*array.Uint16Builder).Append(v)
		case *arrow.Uint32Type:
			v := r.readU32(str)
			r.bld.Field(i).(*array.Uint32Builder).Append(v)
		case *arrow.Uint64Type:
			v := r.readU64(str)
			r.bld.Field(i).(*array.Uint64Builder).Append(v)
		case *arrow.Float32Type:
			v := r.readF32(str)
			r.bld.Field(i).(*array.Float32Builder).Append(v)
		case *arrow.Float64Type:
			v := r.readF64(str)
			r.bld.Field(i).(*array.Float64Builder).Append(v)
		case *arrow.StringType:
			r.bld.Field(i).(*array.StringBuilder).Append(str)
		}
	}
}
// readI8 parses str as a base-10 signed 8-bit integer.
//
// If str cannot be parsed (bad syntax or out of range) it returns 0, and the
// parse error is stored in r.err unless an earlier error is already pending.
func (r *Reader) readI8(str string) int8 {
	v, err := strconv.ParseInt(str, 10, 8)
	if err != nil {
		if r.err == nil {
			r.err = err
		}
		// Never leak strconv's clamped value for an out-of-range input.
		return 0
	}
	return int8(v)
}
// readI16 parses str as a base-10 signed 16-bit integer.
//
// If str cannot be parsed (bad syntax or out of range) it returns 0, and the
// parse error is stored in r.err unless an earlier error is already pending.
func (r *Reader) readI16(str string) int16 {
	v, err := strconv.ParseInt(str, 10, 16)
	if err != nil {
		if r.err == nil {
			r.err = err
		}
		// Never leak strconv's clamped value for an out-of-range input.
		return 0
	}
	return int16(v)
}
// readI32 parses str as a base-10 signed 32-bit integer.
//
// If str cannot be parsed (bad syntax or out of range) it returns 0, and the
// parse error is stored in r.err unless an earlier error is already pending.
func (r *Reader) readI32(str string) int32 {
	v, err := strconv.ParseInt(str, 10, 32)
	if err != nil {
		if r.err == nil {
			r.err = err
		}
		// Never leak strconv's clamped value for an out-of-range input.
		return 0
	}
	return int32(v)
}
// readI64 parses str as a base-10 signed 64-bit integer.
//
// If str cannot be parsed (bad syntax or out of range) it returns 0, and the
// parse error is stored in r.err unless an earlier error is already pending.
func (r *Reader) readI64(str string) int64 {
	v, err := strconv.ParseInt(str, 10, 64)
	if err != nil {
		if r.err == nil {
			r.err = err
		}
		// Never leak strconv's clamped value for an out-of-range input.
		return 0
	}
	return v
}
// readU8 parses str as a base-10 unsigned 8-bit integer.
//
// If str cannot be parsed (bad syntax or out of range) it returns 0, and the
// parse error is stored in r.err unless an earlier error is already pending.
func (r *Reader) readU8(str string) uint8 {
	v, err := strconv.ParseUint(str, 10, 8)
	if err != nil {
		if r.err == nil {
			r.err = err
		}
		// Never leak strconv's clamped value for an out-of-range input.
		return 0
	}
	return uint8(v)
}
// readU16 parses str as a base-10 unsigned 16-bit integer.
//
// If str cannot be parsed (bad syntax or out of range) it returns 0, and the
// parse error is stored in r.err unless an earlier error is already pending.
func (r *Reader) readU16(str string) uint16 {
	v, err := strconv.ParseUint(str, 10, 16)
	if err != nil {
		if r.err == nil {
			r.err = err
		}
		// Never leak strconv's clamped value for an out-of-range input.
		return 0
	}
	return uint16(v)
}
// readU32 parses str as a base-10 unsigned 32-bit integer.
//
// If str cannot be parsed (bad syntax or out of range) it returns 0, and the
// parse error is stored in r.err unless an earlier error is already pending.
func (r *Reader) readU32(str string) uint32 {
	v, err := strconv.ParseUint(str, 10, 32)
	if err != nil {
		if r.err == nil {
			r.err = err
		}
		// Never leak strconv's clamped value for an out-of-range input.
		return 0
	}
	return uint32(v)
}
// readU64 parses str as a base-10 unsigned 64-bit integer.
//
// If str cannot be parsed (bad syntax or out of range) it returns 0, and the
// parse error is stored in r.err unless an earlier error is already pending.
func (r *Reader) readU64(str string) uint64 {
	v, err := strconv.ParseUint(str, 10, 64)
	if err != nil {
		if r.err == nil {
			r.err = err
		}
		// Never leak strconv's clamped value for an out-of-range input.
		return 0
	}
	return v
}
// readF32 parses str as a 32-bit floating-point number.
//
// If str cannot be parsed (bad syntax or out of range) it returns 0, and the
// parse error is stored in r.err unless an earlier error is already pending.
func (r *Reader) readF32(str string) float32 {
	v, err := strconv.ParseFloat(str, 32)
	if err != nil {
		if r.err == nil {
			r.err = err
		}
		// Never leak the infinity strconv yields for an out-of-range input.
		return 0
	}
	return float32(v)
}
// readF64 parses str as a 64-bit floating-point number.
//
// If str cannot be parsed (bad syntax or out of range) it returns 0, and the
// parse error is stored in r.err unless an earlier error is already pending.
func (r *Reader) readF64(str string) float64 {
	v, err := strconv.ParseFloat(str, 64)
	if err != nil {
		if r.err == nil {
			r.err = err
		}
		// Never leak the infinity strconv yields for an out-of-range input.
		return 0
	}
	return v
}
// Retain adds one to the reader's reference count.
//
// The counter is updated atomically, so Retain may be called simultaneously
// from multiple goroutines.
func (r *Reader) Retain() {
	const delta int64 = 1
	atomic.AddInt64(&r.refs, delta)
}
// Release subtracts one from the reader's reference count.
//
// When the count reaches zero, the record currently held by the reader (if
// any) is released. The counter is updated atomically, so Release may be
// called simultaneously from multiple goroutines.
func (r *Reader) Release() {
	debug.Assert(atomic.LoadInt64(&r.refs) > 0, "too many releases")

	if remaining := atomic.AddInt64(&r.refs, -1); remaining != 0 {
		return
	}
	if r.cur != nil {
		r.cur.Release()
	}
}
// Compile-time check that *Reader implements array.RecordReader.
var _ array.RecordReader = (*Reader)(nil)