blob: bc2296a0a0730857f4f25b4a57aabe2b23df29aa [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
module Arrow
  # Writes a table to a path-like output or a {Buffer}.
  #
  # The on-disk format is chosen by `options[:format]`; when it is
  # missing it is inferred from the output path (see {#fill_options}).
  # Every supported format `xxx` is implemented by a private
  # `save_as_xxx` method, so subclasses can add formats by defining
  # more `save_as_*` methods.
  class TableSaver
    class << self
      # Convenience wrapper for `new(table, output, options).save`.
      #
      # @param table [Object] The table to save.
      # @param output [#to_path, String, Buffer] Where to write.
      # @param options [Hash] Save options (`:format`, `:compression`
      #   and format specific keys).
      # @return [Object] Whatever the selected `save_as_*` method returns.
      # @raise [ArgumentError] If the format is not supported.
      def save(table, output, options={})
        new(table, output, options).save
      end
    end

    # @param table [Object] The table to save.
    # @param output [#to_path, String, Buffer] Where to write. Objects
    #   that respond to `to_path` are converted with it.
    # @param options [Hash] Save options. The hash passed by the caller
    #   is not modified; missing `:format`/`:compression` entries are
    #   filled in on a copy.
    def initialize(table, output, options={})
      @table = table
      output = output.to_path if output.respond_to?(:to_path)
      @output = output
      @options = options
      fill_options
    end

    # Saves the table with the `save_as_<format>` method that matches
    # `@options[:format]`.
    #
    # @return [Object] Whatever the selected `save_as_*` method returns.
    # @raise [ArgumentError] If no `save_as_<format>` method exists. The
    #   message lists the available (non deprecated) formats.
    def save
      format = @options[:format]
      custom_save_method = "save_as_#{format}"
      unless respond_to?(custom_save_method, true)
        # Built by interpolation instead of appending to a string
        # literal, so this keeps raising ArgumentError (not FrozenError)
        # when string literals are frozen.
        message = "Arrow::Table save format must be one of " \
                  "[#{available_formats.join(", ")}]: #{format.inspect}"
        raise ArgumentError, message
      end
      if method(custom_save_method).arity.zero?
        __send__(custom_save_method)
      else
        # For backward compatibility: a save_as_* method that declares
        # parameters receives the output as its argument.
        __send__(custom_save_method, @output)
      end
    end

    private

    # Lists the format names that can be reported to users.
    #
    # Format names are discovered from the public and private
    # `save_as_*` methods of this object. Deprecated aliases
    # ("batch" and "stream") are left out.
    #
    # @return [Array<String>] Format names without the `save_as_` prefix.
    def available_formats
      prefix = "save_as_"
      deprecated_formats = ["batch", "stream"]
      method_names = (methods(true) | private_methods(true)).collect(&:to_s)
      formats = method_names.select {|name| name.start_with?(prefix)}
      formats = formats.collect {|name| name[prefix.size..-1]}
      formats - deprecated_formats
    end

    # Completes `@options[:format]` and `@options[:compression]`.
    #
    # Nothing is done when a format is given and a `:compression` key
    # is present. Otherwise the values are taken from
    # `PathExtension#extract` for the output (nothing can be inferred
    # for a {Buffer}). Explicit options always win. The format falls
    # back to `:arrow` when the inferred one has no matching
    # `save_as_*` method.
    #
    # @return [void]
    def fill_options
      return if @options[:format] && @options.key?(:compression)

      if @output.is_a?(Buffer)
        info = {}
      else
        info = PathExtension.new(@output).extract
      end
      format = info[:format]
      # Work on a copy so that the caller's hash stays untouched.
      @options = @options.dup
      if format && respond_to?("save_as_#{format}", true)
        @options[:format] ||= format.to_sym
      else
        @options[:format] ||= :arrow
      end
      unless @options.key?(:compression)
        @options[:compression] = info[:compression]
      end
    end

    # Opens an uncompressed output stream on `@output` and passes it
    # to the block.
    #
    # @yieldparam output [BufferOutputStream, FileOutputStream]
    def open_raw_output_stream(&block)
      if @output.is_a?(Buffer)
        BufferOutputStream.open(@output, &block)
      else
        # NOTE(review): the second argument is presumably the "append"
        # flag - confirm against FileOutputStream.open.
        FileOutputStream.open(@output, false, &block)
      end
    end

    # Opens the output stream for writing and passes it to the block.
    # The raw stream is wrapped by a {CompressedOutputStream} when
    # `@options[:compression]` is set.
    #
    # @yieldparam output [Object] The stream to write to.
    def open_output_stream(&block)
      compression = @options[:compression]
      if compression
        codec = Codec.new(compression)
        open_raw_output_stream do |raw_output|
          CompressedOutputStream.open(codec, raw_output) do |output|
            yield(output)
          end
        end
      else
        open_raw_output_stream(&block)
      end
    end

    # Writes the whole table with the given record batch writer class.
    #
    # @param writer_class [Class] A writer class that provides
    #   `open(output, schema)` and whose instances provide
    #   `write_table`.
    def save_raw(writer_class)
      open_output_stream do |output|
        writer_class.open(output, @table.schema) do |writer|
          writer.write_table(@table)
        end
      end
    end

    # Same as `format: :arrow_file`.
    def save_as_arrow
      save_as_arrow_file
    end

    # Saves with {RecordBatchFileWriter}.
    #
    # @since 1.0.0
    def save_as_arrow_file
      save_raw(RecordBatchFileWriter)
    end

    # @deprecated Use `format: :arrow_file` instead.
    def save_as_batch
      save_as_arrow_file
    end

    # Saves with {RecordBatchStreamWriter}.
    #
    # @since 1.0.0
    def save_as_arrow_streaming
      save_raw(RecordBatchStreamWriter)
    end

    # @deprecated Use `format: :arrow_streaming` instead.
    def save_as_stream
      save_as_arrow_streaming
    end

    # Writes the table as delimiter separated text: one header row
    # with the field names followed by one row per raw record.
    #
    # @param options [Hash] Keyword arguments passed to `CSV.new`.
    def csv_save(**options)
      open_output_stream do |output|
        csv = CSV.new(output, **options)
        names = @table.schema.fields.collect(&:name)
        csv << names
        @table.raw_records.each do |record|
          csv << record
        end
      end
    end

    # Saves as CSV with the default `CSV` options.
    def save_as_csv
      csv_save
    end

    # Saves as tab separated values.
    def save_as_tsv
      csv_save(col_sep: "\t")
    end

    # Saves as Feather.
    #
    # Every non-nil option whose key matches a
    # {FeatherWriteProperties} property name is copied to the write
    # properties. The raw output stream is used here, so no
    # {CompressedOutputStream} wrapping is applied (presumably because
    # Feather compression is configured through the properties).
    def save_as_feather
      properties = FeatherWriteProperties.new
      properties.class.properties.each do |name|
        value = @options[name.to_sym]
        next if value.nil?
        properties.__send__("#{name}=", value)
      end
      open_raw_output_stream do |output|
        @table.write_as_feather(output, properties)
      end
    end
  end
end