blob: f82263e469d12c8676276313c622496406b0a48f [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
require "csv"
require "pathname"
require "time"
module Arrow
class CSVLoader
# Convenience entry point: constructs a loader for +path_or_data+ with the
# given +options+ and immediately runs it.
#
# @param path_or_data [Pathname, String] a CSV file path or in-memory CSV data
# @param options [Hash] loader options, forwarded unchanged to #initialize
# @return whatever CSVLoader#load returns for this input
def self.load(path_or_data, **options)
  loader = new(path_or_data, **options)
  loader.load
end
# @param path_or_data [Pathname, String] a CSV file path or in-memory CSV data
# @param options [Hash] loader options. +:delimiter+ is accepted as an alias
#   and stored under Ruby CSV's +:col_sep+ key; +:compression+ is removed
#   from the option hash and kept on its own.
def initialize(path_or_data, **options)
  @path_or_data = path_or_data
  @options = options
  @options[:col_sep] = @options.delete(:delimiter) if @options.key?(:delimiter)
  @compression = @options.delete(:compression)
end
# Dispatches on the input given to the constructor. A Pathname, or a
# single-line string ending in ".csv" (case-insensitive), is read from disk.
# Anything else is treated as in-memory CSV data.
def load
  source = @path_or_data
  if source.is_a?(Pathname)
    load_from_path(source.to_path)
  elsif /\A.+\.csv\z/i === source # Regexp#=== is false for non-string inputs
    load_from_path(source)
  else
    load_data(source)
  end
end
private

# Opens the CSV file at +path+ with Ruby's CSV library using +options+ and
# yields the CSV object. Because the block form of CSV.open is used, the file
# is closed once the block finishes.
#
# @return the value of the given block
def open_csv(path, **options)
  CSV.open(path, **options) { |csv| yield(csv) }
end
# Wraps in-memory +data+ in a Ruby CSV object built with +options+ and yields
# it. The CSV object is closed afterwards, even if the block raises.
#
# @return the value of the given block
def parse_csv_data(data, **options)
  parser = CSV.new(data, **options)
  yield(parser)
ensure
  # parser is nil when CSV.new itself raised; nothing to close then.
  parser&.close
end
# Converts every row produced by the Ruby CSV object +csv+ into a Table with
# one column per field position.
#
# Rows may have different numbers of fields. A field missing from a short
# row is stored as nil. A column that first appears on a later row is
# back-filled with nil for the earlier rows. As a result, every column holds
# exactly one value per row and values stay aligned across columns.
# Blank lines (empty rows) carry no values and are skipped.
#
# Column names come from +csv.headers+ when it is set. Otherwise columns are
# named "0", "1", ... by position.
#
# @param csv [CSV] an open CSV object; rows may be Arrays or CSV::Row
# @return [Table, nil] the loaded table, or nil when +csv+ yields no values
def read_csv(csv)
  values_set = []
  n_rows = 0
  csv.each do |row|
    row = row.fields if row.is_a?(CSV::Row)
    next if row.empty?
    row.each_with_index do |value, i|
      # A column first seen on this row starts with one nil per earlier row.
      (values_set[i] ||= Array.new(n_rows)) << value
    end
    n_rows += 1
    # Pad columns this (shorter) row did not reach.
    values_set.each do |values|
      values << nil if values.size < n_rows
    end
  end
  return nil if values_set.empty?

  arrays = values_set.collect { |values| ArrayBuilder.build(values) }
  names = csv.headers || arrays.size.times.collect(&:to_s)
  Table.new(names.zip(arrays).to_h)
end
# Translates the loader options into a CSVReadOptions for Arrow's CSVReader.
#
# Returns nil when some option cannot be expressed that way. This happens
# when +:headers+ is a String, or when an unknown key has no matching
# "key=" setter. In that case the caller must use Ruby's CSV parser instead.
#
# @return [CSVReadOptions, nil]
def reader_options
  read_options = CSVReadOptions.new
  @options.each_pair do |name, setting|
    case name
    when :headers
      case setting
      when ::Array then read_options.column_names = setting
      when String then return nil
      else read_options.generate_column_names = !setting
      end
    when :column_types
      setting.each { |column, type| read_options.add_column_type(column, type) }
    when :schema
      read_options.add_schema(setting)
    when :encoding
      # Handled when the input stream is opened, not by the reader options.
    when :col_sep
      read_options.delimiter = setting
    else
      writer = "#{name}="
      return nil unless read_options.respond_to?(writer)
      read_options.__send__(writer, setting)
    end
  end
  read_options
end
# Yields +raw_input+ unchanged when no +:compression+ option was given.
# Otherwise it yields a CompressedInputStream that reads +raw_input+ through
# a Codec for that compression.
#
# @return the value of the given block
def open_decompress_input(raw_input)
  return yield(raw_input) unless @compression

  codec = Codec.new(@compression)
  CompressedInputStream.open(codec, raw_input) { |input| yield(input) }
end
# Yields +raw_input+ unchanged unless an +:encoding+ option is set. In that
# case it yields a stream that converts from that encoding to UTF-8, using
# Gio's CharsetConverter wrapped in a GIOInputStream.
#
# @return the value of the given block
def open_encoding_convert_stream(raw_input, &block)
  source_encoding = @options[:encoding]
  return yield(raw_input) unless source_encoding

  charset_converter = Gio::CharsetConverter.new("UTF-8", source_encoding)
  converted = Gio::ConverterInputStream.new(raw_input, charset_converter)
  GIOInputStream.open(converted, &block)
end
# Stacks the optional input transformations: decompression first, then
# charset conversion. It yields the resulting stream.
#
# @return the value of the given block
def wrap_input(raw_input)
  open_decompress_input(raw_input) do |decompressed|
    open_encoding_convert_stream(decompressed) { |converted| yield(converted) }
  end
end
# Loads the CSV file at +path+.
#
# When the options map onto CSVReadOptions, the file is first read with
# Arrow's CSVReader through a memory-mapped stream, after #wrap_input has
# applied any decompression or re-encoding. If that reader raises
# Arrow::Error::Invalid or Gio::Error, or the options do not map, the file
# is parsed with Ruby's CSV library instead.
def load_from_path(path)
  arrow_options = reader_options
  if arrow_options
    begin
      MemoryMappedInputStream.open(path) do |raw_input|
        wrap_input(raw_input) do |input|
          return CSVReader.new(input, arrow_options).read
        end
      end
    rescue Arrow::Error::Invalid, Gio::Error
      # Deliberately ignored: fall through to the pure-Ruby parser below.
    end
  end
  csv_options = update_csv_parse_options(@options, :open_csv, path)
  open_csv(path, **csv_options) { |csv| read_csv(csv) }
end
# Loads CSV from the in-memory +data+.
#
# The strategy mirrors the file-based path. Arrow's CSVReader is tried first
# on a BufferInputStream over +data+, after #wrap_input has applied any
# decompression or re-encoding. If that reader raises Arrow::Error::Invalid
# or Gio::Error, or the options do not map onto CSVReadOptions, the data is
# parsed with Ruby's CSV library instead.
def load_data(data)
  arrow_options = reader_options
  if arrow_options
    begin
      BufferInputStream.open(Buffer.new(data)) do |raw_input|
        wrap_input(raw_input) do |input|
          return CSVReader.new(input, arrow_options).read
        end
      end
    rescue Arrow::Error::Invalid, Gio::Error
      # Deliberately ignored: fall through to the pure-Ruby parser below.
    end
  end
  csv_options = update_csv_parse_options(@options, :parse_csv_data, data)
  parse_csv_data(data, **csv_options) { |csv| read_csv(csv) }
end
# Builds a two-argument CSV converter lambda taking (field, field_info).
# The lambda applies the given block only to fields whose
# +field_info.index+ equals +target_index+, or to every field when
# +target_index+ is nil. All other fields are returned unchanged.
def selective_converter(target_index)
  lambda do |field, field_info|
    applies = target_index.nil? || field_info.index == target_index
    applies ? yield(field) : field
  end
end
# CSV field converter: maps exactly "true" / "false" to true / false.
# Any other field is returned unchanged, including fields that cannot be
# encoded to CSV::ConverterEncoding.
BOOLEAN_CONVERTER = lambda do |field|
  begin
    text = field.encode(CSV::ConverterEncoding)
  rescue EncodingError
    return field
  end
  if text == "true"
    true
  elsif text == "false"
    false
  else
    field
  end
end
# CSV field converter: parses the field with ::Time.iso8601. The field is
# returned unchanged when it cannot be encoded to CSV::ConverterEncoding or
# when Time.iso8601 rejects it with ArgumentError.
ISO8601_CONVERTER = lambda do |field|
  begin
    text = field.encode(CSV::ConverterEncoding)
  rescue EncodingError
    return field
  end
  begin
    ::Time.iso8601(text)
  rescue ArgumentError
    field
  end
end
# Set (as a Hash of name => true) of the optional keyword parameters
# accepted by CSV#initialize. It is used to drop loader-only options before
# they are passed to Ruby's CSV.
AVAILABLE_CSV_PARSE_OPTIONS =
  CSV.instance_method(:initialize).parameters.each_with_object({}) do |(type, name), available|
    available[name] = true if type == :key
  end
# Builds the keyword options for Ruby's CSV parser from the loader +options+.
#
# Options CSV#initialize does not accept are dropped. When the caller gave
# no +:converters+, the default chain (:all, BOOLEAN_CONVERTER,
# ISO8601_CONVERTER) is used for inspection passes. When +:headers+ or
# +:converters+ were not given, the input is parsed again (by calling the
# method named +create_csv+ with +args+) to guess them. The header guess
# comes from #have_header? and the per-column converters from
# #detect_robust_converters.
#
# @return [Hash] a new options hash; +options+ itself is not modified
def update_csv_parse_options(options, create_csv, *args)
  user_converters = options.key?(:converters)
  parse_options =
    if user_converters
      options.dup
    else
      options.merge(converters: [:all, BOOLEAN_CONVERTER, ISO8601_CONVERTER])
    end
  # TODO: Support :schema and :column_types
  unless AVAILABLE_CSV_PARSE_OPTIONS.empty?
    parse_options.keep_if { |key, _value| AVAILABLE_CSV_PARSE_OPTIONS.key?(key) }
  end
  unless options.key?(:headers)
    __send__(create_csv, *args, **parse_options) do |csv|
      parse_options[:headers] = have_header?(csv)
    end
  end
  unless user_converters
    __send__(create_csv, *args, **parse_options) do |csv|
      parse_options[:converters] = detect_robust_converters(csv)
    end
  end
  parse_options
end
# Heuristically decides whether the first row of +csv+ is a header row.
# It consumes up to two rows through +csv.shift+.
#
# An explicit +:headers+ option always wins. Otherwise:
# * no rows, or a nil in the first row             -> false
# * only one row                                   -> nil (undecided)
# * a nil in the second row                        -> true
# * a non-String value in the first row            -> false
# * the two rows' value classes differ             -> true
# * anything else                                  -> nil (undecided)
def have_header?(csv)
  return @options[:headers] if @options.key?(:headers)

  first_row = csv.shift
  return false if first_row.nil? || first_row.any?(&:nil?)

  second_row = csv.shift
  return nil if second_row.nil?
  return true if second_row.any?(&:nil?)
  return false unless first_row.all? { |value| value.is_a?(String) }

  first_row.collect(&:class) == second_row.collect(&:class) ? nil : true
end
# Scans every row of +csv+, infers one value type per column, and returns
# CSV field converters (built with #selective_converter) for each typed
# column.
#
# Values are classified after whatever converters +csv+ was opened with. In
# this loader, that is the default chain (:all, BOOLEAN_CONVERTER,
# ISO8601_CONVERTER). Integer and Float columns widen to :float, and any
# other type disagreement marks the column :string, which gets no
# converter. Columns holding only nil or empty strings get no converter
# either.
#
# @return [Array] converters, one per typed column, in column order
def detect_robust_converters(csv)
  column_types = []
  csv.each do |row|
    values = row.is_a?(CSV::Row) ? row.fields : row
    values.each_with_index do |value, index|
      known_type = column_types[index]
      next if known_type == :string # column already given up on
      value_type = robust_value_type(value)
      next if value_type.nil? # nil / empty string carries no type information
      column_types[index] = merge_robust_types(known_type, value_type)
    end
  end

  converters = []
  column_types.each_with_index do |type, index|
    converter = robust_converter_for(type, index)
    converters << converter unless converter.nil?
  end
  converters
end

# Maps one (already converted) CSV value to a type tag. Returns nil when the
# value says nothing about the column type (nil or an empty String).
def robust_value_type(value)
  case value
  when nil
    nil
  when "true", "false", true, false
    :boolean
  when Integer
    :integer
  when Float
    :float
  when ::Time
    :time
  when DateTime # must precede Date: DateTime is a subclass of Date
    :date_time
  when Date
    :date
  when String
    value.empty? ? nil : :string
  else
    :string
  end
end

# Combines a column's type so far with the type of a newly seen value.
def merge_robust_types(known_type, value_type)
  return value_type if known_type.nil?
  return known_type if known_type == value_type

  numeric = [:integer, :float]
  return :float if numeric.include?(known_type) && numeric.include?(value_type)

  :string
end

# Builds the converter for column +index+ of inferred +type+, or returns nil
# when the type needs no conversion.
def robust_converter_for(type, index)
  case type
  when :boolean
    selective_converter(index, &BOOLEAN_CONVERTER)
  when :integer, :float
    selective_converter(index) do |field|
      field.nil? || field.empty? ? nil : CSV::Converters[type].call(field)
    end
  when :time
    selective_converter(index, &ISO8601_CONVERTER)
  when :date_time
    selective_converter(index, &CSV::Converters[:date_time])
  when :date
    selective_converter(index, &CSV::Converters[:date])
  end
end
end
end