# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# cython: profile=False
# distutils: language = c++
# cython: embedsignature = True
# cython: language_level = 3
from cython.operator cimport dereference as deref

import codecs
from collections.abc import Mapping

from pyarrow.includes.common cimport *
from pyarrow.includes.libarrow cimport *
from pyarrow.lib cimport (check_status, Field, MemoryPool, Schema,
                          RecordBatchReader, ensure_type,
                          maybe_unbox_memory_pool, get_input_stream,
                          get_writer, native_transcoding_input_stream,
                          pyarrow_unwrap_batch, pyarrow_unwrap_table,
                          pyarrow_wrap_schema, pyarrow_wrap_table,
                          pyarrow_wrap_data_type, pyarrow_unwrap_data_type,
                          Table, RecordBatch, StopToken)
from pyarrow.lib import frombytes, tobytes, SignalStopHandler
from pyarrow.util import _stringify_path
cdef unsigned char _single_char(s) except 0:
    """
    Convert a one-character string to its ASCII code as an unsigned char.

    Raises ValueError when the character is NUL or outside the 7-bit
    ASCII range.  Because 0 is never a valid result, it doubles as the
    error sentinel declared by ``except 0``.
    """
    code_point = ord(s)
    if not 0 < code_point <= 127:
        raise ValueError("Expecting an ASCII character")
    return <unsigned char> code_point
cdef class ReadOptions(_Weakrefable):
    """
    Options for reading CSV files.

    Parameters
    ----------
    use_threads : bool, optional (default True)
        Whether to use multiple threads to accelerate reading
    block_size : int, optional
        How many bytes to process at a time from the input stream.
        This will determine multi-threading granularity as well as
        the size of individual chunks in the Table.
    skip_rows: int, optional (default 0)
        The number of rows to skip before the column names (if any)
        and the CSV data.
    column_names: list, optional
        The column names of the target table. If empty, fall back on
        `autogenerate_column_names`.
    autogenerate_column_names: bool, optional (default False)
        Whether to autogenerate column names if `column_names` is empty.
        If true, column names will be of the form "f0", "f1"...
        If false, column names will be read from the first CSV row
        after `skip_rows`.
    encoding: str, optional (default 'utf8')
        The character encoding of the CSV data. Columns that cannot
        decode using this encoding can still be read as Binary.
    """

    # NOTE(review): the `options` and `encoding` attributes are not declared
    # in this block; presumably they live in the accompanying .pxd -- confirm.

    # Avoid mistakenly creating attributes
    __slots__ = ()

    def __init__(self, *, use_threads=None, block_size=None, skip_rows=None,
                 column_names=None, autogenerate_column_names=None,
                 encoding='utf8'):
        # Start from the C++ defaults; only override what was passed.
        self.options = CCSVReadOptions.Defaults()
        if use_threads is not None:
            self.use_threads = use_threads
        if block_size is not None:
            self.block_size = block_size
        if skip_rows is not None:
            self.skip_rows = skip_rows
        if column_names is not None:
            self.column_names = column_names
        if autogenerate_column_names is not None:
            self.autogenerate_column_names = autogenerate_column_names
        # Python-specific option
        self.encoding = encoding

    @property
    def use_threads(self):
        """
        Whether to use multiple threads to accelerate reading.
        """
        return self.options.use_threads

    @use_threads.setter
    def use_threads(self, value):
        self.options.use_threads = value

    @property
    def block_size(self):
        """
        How many bytes to process at a time from the input stream.
        This will determine multi-threading granularity as well as
        the size of individual chunks in the Table.
        """
        return self.options.block_size

    @block_size.setter
    def block_size(self, value):
        self.options.block_size = value

    @property
    def skip_rows(self):
        """
        The number of rows to skip before the column names (if any)
        and the CSV data.
        """
        return self.options.skip_rows

    @skip_rows.setter
    def skip_rows(self, value):
        self.options.skip_rows = value

    @property
    def column_names(self):
        """
        The column names of the target table. If empty, fall back on
        `autogenerate_column_names`.
        """
        return [frombytes(s) for s in self.options.column_names]

    @column_names.setter
    def column_names(self, value):
        # Replace (rather than extend) any previously configured names.
        self.options.column_names.clear()
        for item in value:
            self.options.column_names.push_back(tobytes(item))

    @property
    def autogenerate_column_names(self):
        """
        Whether to autogenerate column names if `column_names` is empty.
        If true, column names will be of the form "f0", "f1"...
        If false, column names will be read from the first CSV row
        after `skip_rows`.
        """
        return self.options.autogenerate_column_names

    @autogenerate_column_names.setter
    def autogenerate_column_names(self, value):
        self.options.autogenerate_column_names = value

    def equals(self, ReadOptions other):
        """
        Return True if every option (including `encoding`) matches `other`.
        """
        return (
            self.use_threads == other.use_threads and
            self.block_size == other.block_size and
            self.skip_rows == other.skip_rows and
            self.column_names == other.column_names and
            self.autogenerate_column_names ==
            other.autogenerate_column_names and
            self.encoding == other.encoding
        )

    @staticmethod
    cdef ReadOptions wrap(CCSVReadOptions options):
        # Build a Python wrapper around an existing C++ options struct.
        out = ReadOptions()
        out.options = options
        out.encoding = 'utf8'  # No way to know this
        return out

    def __getstate__(self):
        # Tuple layout must stay in sync with __setstate__.
        return (self.use_threads, self.block_size, self.skip_rows,
                self.column_names, self.autogenerate_column_names,
                self.encoding)

    def __setstate__(self, state):
        (self.use_threads, self.block_size, self.skip_rows,
         self.column_names, self.autogenerate_column_names,
         self.encoding) = state

    def __eq__(self, other):
        # `equals` is typed; a non-ReadOptions argument raises TypeError,
        # which is reported as plain inequality.
        try:
            return self.equals(other)
        except TypeError:
            return False
cdef class ParseOptions(_Weakrefable):
    """
    Options for parsing CSV files.

    Parameters
    ----------
    delimiter: 1-character string, optional (default ',')
        The character delimiting individual cells in the CSV data.
    quote_char: 1-character string or False, optional (default '"')
        The character used optionally for quoting CSV values
        (False if quoting is not allowed).
    double_quote: bool, optional (default True)
        Whether two quotes in a quoted CSV value denote a single quote
        in the data.
    escape_char: 1-character string or False, optional (default False)
        The character used optionally for escaping special characters
        (False if escaping is not allowed).
    newlines_in_values: bool, optional (default False)
        Whether newline characters are allowed in CSV values.
        Setting this to True reduces the performance of multi-threaded
        CSV reading.
    ignore_empty_lines: bool, optional (default True)
        Whether empty lines are ignored in CSV input.
        If False, an empty line is interpreted as containing a single empty
        value (assuming a one-column CSV file).
    """

    # NOTE(review): the `options` attribute is not declared in this block;
    # presumably it lives in the accompanying .pxd -- confirm.

    # Avoid mistakenly creating attributes
    __slots__ = ()

    def __init__(self, *, delimiter=None, quote_char=None, double_quote=None,
                 escape_char=None, newlines_in_values=None,
                 ignore_empty_lines=None):
        # Start from the C++ defaults; only override what was passed.
        self.options = CCSVParseOptions.Defaults()
        if delimiter is not None:
            self.delimiter = delimiter
        if quote_char is not None:
            self.quote_char = quote_char
        if double_quote is not None:
            self.double_quote = double_quote
        if escape_char is not None:
            self.escape_char = escape_char
        if newlines_in_values is not None:
            self.newlines_in_values = newlines_in_values
        if ignore_empty_lines is not None:
            self.ignore_empty_lines = ignore_empty_lines

    @property
    def delimiter(self):
        """
        The character delimiting individual cells in the CSV data.
        """
        return chr(self.options.delimiter)

    @delimiter.setter
    def delimiter(self, value):
        self.options.delimiter = _single_char(value)

    @property
    def quote_char(self):
        """
        The character used optionally for quoting CSV values
        (False if quoting is not allowed).
        """
        if self.options.quoting:
            return chr(self.options.quote_char)
        return False

    @quote_char.setter
    def quote_char(self, value):
        if value is False:
            # Disable quoting; the stored quote character is left as is.
            self.options.quoting = False
        else:
            self.options.quote_char = _single_char(value)
            self.options.quoting = True

    @property
    def double_quote(self):
        """
        Whether two quotes in a quoted CSV value denote a single quote
        in the data.
        """
        return self.options.double_quote

    @double_quote.setter
    def double_quote(self, value):
        self.options.double_quote = value

    @property
    def escape_char(self):
        """
        The character used optionally for escaping special characters
        (False if escaping is not allowed).
        """
        if self.options.escaping:
            return chr(self.options.escape_char)
        return False

    @escape_char.setter
    def escape_char(self, value):
        if value is False:
            # Disable escaping; the stored escape character is left as is.
            self.options.escaping = False
        else:
            self.options.escape_char = _single_char(value)
            self.options.escaping = True

    @property
    def newlines_in_values(self):
        """
        Whether newline characters are allowed in CSV values.
        Setting this to True reduces the performance of multi-threaded
        CSV reading.
        """
        return self.options.newlines_in_values

    @newlines_in_values.setter
    def newlines_in_values(self, value):
        self.options.newlines_in_values = value

    @property
    def ignore_empty_lines(self):
        """
        Whether empty lines are ignored in CSV input.
        If False, an empty line is interpreted as containing a single empty
        value (assuming a one-column CSV file).
        """
        return self.options.ignore_empty_lines

    @ignore_empty_lines.setter
    def ignore_empty_lines(self, value):
        self.options.ignore_empty_lines = value

    def equals(self, ParseOptions other):
        """
        Return True if every option matches the corresponding one in `other`.
        """
        return (
            self.delimiter == other.delimiter and
            self.quote_char == other.quote_char and
            self.double_quote == other.double_quote and
            self.escape_char == other.escape_char and
            self.newlines_in_values == other.newlines_in_values and
            self.ignore_empty_lines == other.ignore_empty_lines
        )

    @staticmethod
    cdef ParseOptions wrap(CCSVParseOptions options):
        # Build a Python wrapper around an existing C++ options struct.
        out = ParseOptions()
        out.options = options
        return out

    def __getstate__(self):
        # Tuple layout must stay in sync with __setstate__.
        return (self.delimiter, self.quote_char, self.double_quote,
                self.escape_char, self.newlines_in_values,
                self.ignore_empty_lines)

    def __setstate__(self, state):
        (self.delimiter, self.quote_char, self.double_quote,
         self.escape_char, self.newlines_in_values,
         self.ignore_empty_lines) = state

    def __eq__(self, other):
        # `equals` is typed; a non-ParseOptions argument raises TypeError,
        # which is reported as plain inequality.
        try:
            return self.equals(other)
        except TypeError:
            return False
cdef class _ISO8601(_Weakrefable):
    """
    A special object indicating ISO-8601 parsing.
    """
    __slots__ = ()

    def __str__(self):
        return 'ISO8601'

    def __eq__(self, other):
        # Instances carry no state, so any two of them compare equal.
        return isinstance(other, _ISO8601)


# Module-level marker object users pass in `timestamp_parsers`.
ISO8601 = _ISO8601()
cdef class ConvertOptions(_Weakrefable):
    """
    Options for converting CSV data.

    Parameters
    ----------
    check_utf8 : bool, optional (default True)
        Whether to check UTF8 validity of string columns.
    column_types: pa.Schema or dict, optional
        Explicitly map column names to column types. Passing this argument
        disables type inference on the defined columns.
    null_values: list, optional
        A sequence of strings that denote nulls in the data
        (defaults are appropriate in most cases). Note that by default,
        string columns are not checked for null values. To enable
        null checking for those, specify ``strings_can_be_null=True``.
    true_values: list, optional
        A sequence of strings that denote true booleans in the data
        (defaults are appropriate in most cases).
    false_values: list, optional
        A sequence of strings that denote false booleans in the data
        (defaults are appropriate in most cases).
    timestamp_parsers: list, optional
        A sequence of strptime()-compatible format strings, tried in order
        when attempting to infer or convert timestamp values (the special
        value ISO8601 can also be given). By default, a fast built-in
        ISO-8601 parser is used.
    strings_can_be_null: bool, optional (default False)
        Whether string / binary columns can have null values.
        If true, then strings in null_values are considered null for
        string columns.
        If false, then all strings are valid string values.
    auto_dict_encode: bool, optional (default False)
        Whether to try to automatically dict-encode string / binary data.
        If true, then when type inference detects a string or binary column,
        it is dict-encoded up to `auto_dict_max_cardinality` distinct values
        (per chunk), after which it switches to regular encoding.
        This setting is ignored for non-inferred columns (those in
        `column_types`).
    auto_dict_max_cardinality: int, optional
        The maximum dictionary cardinality for `auto_dict_encode`.
        This value is per chunk.
    include_columns: list, optional
        The names of columns to include in the Table.
        If empty, the Table will include all columns from the CSV file.
        If not empty, only these columns will be included, in this order.
    include_missing_columns: bool, optional (default False)
        If false, columns in `include_columns` but not in the CSV file will
        error out.
        If true, columns in `include_columns` but not in the CSV file will
        produce a column of nulls (whose type is selected using
        `column_types`, or null by default).
        This option is ignored if `include_columns` is empty.
    """

    # NOTE(review): the `options` attribute is not declared in this block;
    # presumably it lives in the accompanying .pxd -- confirm.

    # Avoid mistakenly creating attributes
    __slots__ = ()

    def __init__(self, *, check_utf8=None, column_types=None, null_values=None,
                 true_values=None, false_values=None,
                 strings_can_be_null=None, include_columns=None,
                 include_missing_columns=None, auto_dict_encode=None,
                 auto_dict_max_cardinality=None, timestamp_parsers=None):
        # Start from the C++ defaults; only override what was passed.
        self.options = CCSVConvertOptions.Defaults()
        if check_utf8 is not None:
            self.check_utf8 = check_utf8
        if column_types is not None:
            self.column_types = column_types
        if null_values is not None:
            self.null_values = null_values
        if true_values is not None:
            self.true_values = true_values
        if false_values is not None:
            self.false_values = false_values
        if strings_can_be_null is not None:
            self.strings_can_be_null = strings_can_be_null
        if include_columns is not None:
            self.include_columns = include_columns
        if include_missing_columns is not None:
            self.include_missing_columns = include_missing_columns
        if auto_dict_encode is not None:
            self.auto_dict_encode = auto_dict_encode
        if auto_dict_max_cardinality is not None:
            self.auto_dict_max_cardinality = auto_dict_max_cardinality
        if timestamp_parsers is not None:
            self.timestamp_parsers = timestamp_parsers

    @property
    def check_utf8(self):
        """
        Whether to check UTF8 validity of string columns.
        """
        return self.options.check_utf8

    @check_utf8.setter
    def check_utf8(self, value):
        self.options.check_utf8 = value

    @property
    def strings_can_be_null(self):
        """
        Whether string / binary columns can have null values.
        """
        return self.options.strings_can_be_null

    @strings_can_be_null.setter
    def strings_can_be_null(self, value):
        self.options.strings_can_be_null = value

    @property
    def column_types(self):
        """
        Explicitly map column names to column types.
        """
        d = {frombytes(item.first): pyarrow_wrap_data_type(item.second)
             for item in self.options.column_types}
        return d

    @column_types.setter
    def column_types(self, value):
        cdef:
            shared_ptr[CDataType] typ

        # Accept a mapping, an iterable of (name, type) pairs, or an
        # iterable of Field objects (e.g. a Schema).
        if isinstance(value, Mapping):
            value = value.items()

        # Assignment replaces the previous mapping instead of merging.
        self.options.column_types.clear()
        for item in value:
            if isinstance(item, Field):
                k = item.name
                v = item.type
            else:
                k, v = item
            typ = pyarrow_unwrap_data_type(ensure_type(v))
            assert typ != NULL
            self.options.column_types[tobytes(k)] = typ

    @property
    def null_values(self):
        """
        A sequence of strings that denote nulls in the data.
        """
        return [frombytes(x) for x in self.options.null_values]

    @null_values.setter
    def null_values(self, value):
        self.options.null_values = [tobytes(x) for x in value]

    @property
    def true_values(self):
        """
        A sequence of strings that denote true booleans in the data.
        """
        return [frombytes(x) for x in self.options.true_values]

    @true_values.setter
    def true_values(self, value):
        self.options.true_values = [tobytes(x) for x in value]

    @property
    def false_values(self):
        """
        A sequence of strings that denote false booleans in the data.
        """
        return [frombytes(x) for x in self.options.false_values]

    @false_values.setter
    def false_values(self, value):
        self.options.false_values = [tobytes(x) for x in value]

    @property
    def auto_dict_encode(self):
        """
        Whether to try to automatically dict-encode string / binary data.
        """
        return self.options.auto_dict_encode

    @auto_dict_encode.setter
    def auto_dict_encode(self, value):
        self.options.auto_dict_encode = value

    @property
    def auto_dict_max_cardinality(self):
        """
        The maximum dictionary cardinality for `auto_dict_encode`.

        This value is per chunk.
        """
        return self.options.auto_dict_max_cardinality

    @auto_dict_max_cardinality.setter
    def auto_dict_max_cardinality(self, value):
        self.options.auto_dict_max_cardinality = value

    @property
    def include_columns(self):
        """
        The names of columns to include in the Table.

        If empty, the Table will include all columns from the CSV file.
        If not empty, only these columns will be included, in this order.
        """
        return [frombytes(s) for s in self.options.include_columns]

    @include_columns.setter
    def include_columns(self, value):
        # Replace (rather than extend) any previously configured names.
        self.options.include_columns.clear()
        for item in value:
            self.options.include_columns.push_back(tobytes(item))

    @property
    def include_missing_columns(self):
        """
        If false, columns in `include_columns` but not in the CSV file will
        error out.
        If true, columns in `include_columns` but not in the CSV file will
        produce a null column (whose type is selected using `column_types`,
        or null by default).
        This option is ignored if `include_columns` is empty.
        """
        return self.options.include_missing_columns

    @include_missing_columns.setter
    def include_missing_columns(self, value):
        self.options.include_missing_columns = value

    @property
    def timestamp_parsers(self):
        """
        A sequence of strptime()-compatible format strings, tried in order
        when attempting to infer or convert timestamp values (the special
        value ISO8601 can also be given). By default, a fast built-in
        ISO-8601 parser is used.
        """
        cdef:
            shared_ptr[CTimestampParser] c_parser
            c_string kind

        # NOTE(review): the two append() calls below were reconstructed;
        # confirm against the CTimestampParser declaration.
        parsers = []
        for c_parser in self.options.timestamp_parsers:
            kind = deref(c_parser).kind()
            if kind == b'strptime':
                parsers.append(frombytes(deref(c_parser).format()))
            else:
                assert kind == b'iso8601'
                parsers.append(ISO8601)

        return parsers

    @timestamp_parsers.setter
    def timestamp_parsers(self, value):
        cdef:
            vector[shared_ptr[CTimestampParser]] c_parsers

        # NOTE(review): the two push_back() calls below were reconstructed;
        # confirm against the CTimestampParser declaration.
        for v in value:
            if isinstance(v, str):
                c_parsers.push_back(CTimestampParser.MakeStrptime(tobytes(v)))
            elif v == ISO8601:
                c_parsers.push_back(CTimestampParser.MakeISO8601())
            else:
                raise TypeError("Expected list of str or ISO8601 objects")

        self.options.timestamp_parsers = move(c_parsers)

    @staticmethod
    cdef ConvertOptions wrap(CCSVConvertOptions options):
        # Build a Python wrapper around an existing C++ options struct.
        out = ConvertOptions()
        out.options = options
        return out

    def equals(self, ConvertOptions other):
        """
        Return True if every option matches the corresponding one in `other`.
        """
        return (
            self.check_utf8 == other.check_utf8 and
            self.column_types == other.column_types and
            self.null_values == other.null_values and
            self.true_values == other.true_values and
            self.false_values == other.false_values and
            self.timestamp_parsers == other.timestamp_parsers and
            self.strings_can_be_null == other.strings_can_be_null and
            self.auto_dict_encode == other.auto_dict_encode and
            self.auto_dict_max_cardinality ==
            other.auto_dict_max_cardinality and
            self.include_columns == other.include_columns and
            self.include_missing_columns == other.include_missing_columns
        )

    def __getstate__(self):
        # Tuple layout must stay in sync with __setstate__.
        return (self.check_utf8, self.column_types, self.null_values,
                self.true_values, self.false_values, self.timestamp_parsers,
                self.strings_can_be_null, self.auto_dict_encode,
                self.auto_dict_max_cardinality, self.include_columns,
                self.include_missing_columns)

    def __setstate__(self, state):
        (self.check_utf8, self.column_types, self.null_values,
         self.true_values, self.false_values, self.timestamp_parsers,
         self.strings_can_be_null, self.auto_dict_encode,
         self.auto_dict_max_cardinality, self.include_columns,
         self.include_missing_columns) = state

    def __eq__(self, other):
        # `equals` is typed; a non-ConvertOptions argument raises TypeError,
        # which is reported as plain inequality.
        try:
            return self.equals(other)
        except TypeError:
            return False
cdef _get_reader(input_file, ReadOptions read_options,
                 shared_ptr[CInputStream]* out):
    # Open `input_file` as an input stream (never memory-mapped) and store
    # it in `out`.  When read options are given, wrap the stream so that
    # data in `read_options.encoding` is transcoded to UTF-8.
    use_memory_map = False
    get_input_stream(input_file, use_memory_map, out)
    if read_options is not None:
        # NOTE(review): the trailing arguments of this call were
        # reconstructed (source encoding, then 'utf8') -- confirm.
        out[0] = native_transcoding_input_stream(out[0],
                                                 read_options.encoding,
                                                 'utf8')
cdef _get_read_options(ReadOptions read_options, CCSVReadOptions* out):
    # Copy the C++ read options into `out`, using the library defaults
    # when no Python-level options object was supplied.
    if read_options is None:
        out[0] = CCSVReadOptions.Defaults()
    else:
        out[0] = read_options.options
cdef _get_parse_options(ParseOptions parse_options, CCSVParseOptions* out):
    # Copy the C++ parse options into `out`, using the library defaults
    # when no Python-level options object was supplied.
    if parse_options is None:
        out[0] = CCSVParseOptions.Defaults()
    else:
        out[0] = parse_options.options
cdef _get_convert_options(ConvertOptions convert_options,
                          CCSVConvertOptions* out):
    # Copy the C++ convert options into `out`, using the library defaults
    # when no Python-level options object was supplied.
    if convert_options is None:
        out[0] = CCSVConvertOptions.Defaults()
    else:
        out[0] = convert_options.options
cdef class CSVStreamingReader(RecordBatchReader):
    """An object that reads record batches incrementally from a CSV file.

    Should not be instantiated directly by user code.
    """
    cdef readonly:
        Schema schema

    def __init__(self):
        raise TypeError("Do not call {}'s constructor directly, "
                        "use pyarrow.csv.open_csv() instead."
                        .format(self.__class__.__name__))

    # Note about cancellation: we cannot create a SignalStopHandler
    # by default here, as several CSVStreamingReader instances may be
    # created (including by the same thread).  Handling cancellation
    # would require having the user pass the SignalStopHandler.
    # (in addition to solving ARROW-11853)

    cdef _open(self, shared_ptr[CInputStream] stream,
               CCSVReadOptions c_read_options,
               CCSVParseOptions c_parse_options,
               CCSVConvertOptions c_convert_options,
               MemoryPool memory_pool):
        # Create the underlying C++ streaming reader over `stream` and
        # expose its schema through the `schema` attribute.
        cdef:
            shared_ptr[CSchema] c_schema
            CIOContext io_context

        io_context = CIOContext(maybe_unbox_memory_pool(memory_pool))

        with nogil:
            # NOTE(review): the factory name and the third moved option
            # were reconstructed -- confirm against the C++ declarations.
            self.reader = <shared_ptr[CRecordBatchReader]> GetResultValue(
                CCSVStreamingReader.Make(
                    io_context, stream,
                    move(c_read_options), move(c_parse_options),
                    move(c_convert_options)))
            c_schema = self.reader.get().schema()

        self.schema = pyarrow_wrap_schema(c_schema)
def read_csv(input_file, read_options=None, parse_options=None,
             convert_options=None, MemoryPool memory_pool=None):
    """
    Read a Table from a stream of CSV data.

    Parameters
    ----------
    input_file: string, path or file-like object
        The location of CSV data. If a string or path, and if it ends
        with a recognized compressed file extension (e.g. ".gz" or ".bz2"),
        the data is automatically decompressed when reading.
    read_options: pyarrow.csv.ReadOptions, optional
        Options for the CSV reader (see pyarrow.csv.ReadOptions constructor
        for defaults)
    parse_options: pyarrow.csv.ParseOptions, optional
        Options for the CSV parser
        (see pyarrow.csv.ParseOptions constructor for defaults)
    convert_options: pyarrow.csv.ConvertOptions, optional
        Options for converting CSV data
        (see pyarrow.csv.ConvertOptions constructor for defaults)
    memory_pool: MemoryPool, optional
        Pool to allocate Table memory from

    Returns
    -------
    :class:`pyarrow.Table`
        Contents of the CSV file as an in-memory table.
    """
    cdef:
        shared_ptr[CInputStream] stream
        CCSVReadOptions c_read_options
        CCSVParseOptions c_parse_options
        CCSVConvertOptions c_convert_options
        CIOContext io_context
        shared_ptr[CCSVReader] reader
        shared_ptr[CTable] table

    _get_reader(input_file, read_options, &stream)
    _get_read_options(read_options, &c_read_options)
    _get_parse_options(parse_options, &c_parse_options)
    _get_convert_options(convert_options, &c_convert_options)

    # The stop handler lets a signal (e.g. Ctrl-C) cancel the read through
    # the stop token carried by the IO context.
    with SignalStopHandler() as stop_handler:
        # Pass the caller's memory pool along with the stop token so that
        # the documented `memory_pool` argument is actually honoured.
        io_context = CIOContext(
            maybe_unbox_memory_pool(memory_pool),
            (<StopToken> stop_handler.stop_token).stop_token)
        reader = GetResultValue(CCSVReader.Make(
            io_context, stream,
            c_read_options, c_parse_options, c_convert_options))

        with nogil:
            table = GetResultValue(reader.get().Read())

    return pyarrow_wrap_table(table)
def open_csv(input_file, read_options=None, parse_options=None,
             convert_options=None, MemoryPool memory_pool=None):
    """
    Open a streaming reader of CSV data.

    Reading using this function is always single-threaded.

    Parameters
    ----------
    input_file: string, path or file-like object
        The location of CSV data. If a string or path, and if it ends
        with a recognized compressed file extension (e.g. ".gz" or ".bz2"),
        the data is automatically decompressed when reading.
    read_options: pyarrow.csv.ReadOptions, optional
        Options for the CSV reader (see pyarrow.csv.ReadOptions constructor
        for defaults)
    parse_options: pyarrow.csv.ParseOptions, optional
        Options for the CSV parser
        (see pyarrow.csv.ParseOptions constructor for defaults)
    convert_options: pyarrow.csv.ConvertOptions, optional
        Options for converting CSV data
        (see pyarrow.csv.ConvertOptions constructor for defaults)
    memory_pool: MemoryPool, optional
        Pool to allocate Table memory from

    Returns
    -------
    :class:`pyarrow.csv.CSVStreamingReader`
    """
    cdef:
        shared_ptr[CInputStream] stream
        CCSVReadOptions c_read_options
        CCSVParseOptions c_parse_options
        CCSVConvertOptions c_convert_options
        CSVStreamingReader reader

    _get_reader(input_file, read_options, &stream)
    _get_read_options(read_options, &c_read_options)
    _get_parse_options(parse_options, &c_parse_options)
    _get_convert_options(convert_options, &c_convert_options)

    # __init__ deliberately raises, so allocate with __new__ and
    # initialise through the private _open() hook instead.
    reader = CSVStreamingReader.__new__(CSVStreamingReader)
    reader._open(stream, move(c_read_options), move(c_parse_options),
                 move(c_convert_options), memory_pool)
    return reader
cdef class WriteOptions(_Weakrefable):
    """
    Options for writing CSV files.

    Parameters
    ----------
    include_header : bool, optional (default True)
        Whether to write an initial header line with column names
    batch_size : int, optional (default 1024)
        How many rows to process together when converting and writing
        CSV data
    """
    cdef:
        CCSVWriteOptions options

    # Avoid mistakenly creating attributes
    __slots__ = ()

    def __init__(self, *, include_header=None, batch_size=None):
        # Start from the C++ defaults; only override what was passed.
        self.options = CCSVWriteOptions.Defaults()
        if include_header is not None:
            self.include_header = include_header
        if batch_size is not None:
            self.batch_size = batch_size

    @property
    def include_header(self):
        """
        Whether to write an initial header line with column names.
        """
        return self.options.include_header

    @include_header.setter
    def include_header(self, value):
        self.options.include_header = value

    @property
    def batch_size(self):
        """
        How many rows to process together when converting and writing
        CSV data.
        """
        return self.options.batch_size

    @batch_size.setter
    def batch_size(self, value):
        self.options.batch_size = value
cdef _get_write_options(WriteOptions write_options, CCSVWriteOptions* out):
    # Copy the C++ write options into `out`, using the library defaults
    # when no Python-level options object was supplied.
    if write_options is None:
        out[0] = CCSVWriteOptions.Defaults()
    else:
        out[0] = write_options.options
def write_csv(data, output_file, write_options=None,
              MemoryPool memory_pool=None):
    """
    Write record batch or table to a CSV file.

    Parameters
    ----------
    data: pyarrow.RecordBatch or pyarrow.Table
        The data to write.
    output_file: string, path, pyarrow.OutputStream or file-like object
        The location where to write the CSV data.
    write_options: pyarrow.csv.WriteOptions
        Options to configure writing the CSV data.
    memory_pool: MemoryPool, optional
        Pool for temporary allocations.

    Raises
    ------
    TypeError
        If `data` is neither a RecordBatch nor a Table.
    """
    cdef:
        shared_ptr[COutputStream] stream
        CCSVWriteOptions c_write_options
        CMemoryPool* c_memory_pool
        CRecordBatch* batch
        CTable* table

    _get_write_options(write_options, &c_write_options)

    get_writer(output_file, &stream)
    c_memory_pool = maybe_unbox_memory_pool(memory_pool)
    # NOTE(review): the final WriteCSV argument (`stream.get()`) was
    # reconstructed -- confirm against the WriteCSV declaration.
    if isinstance(data, RecordBatch):
        batch = pyarrow_unwrap_batch(data).get()
        with nogil:
            check_status(WriteCSV(deref(batch), c_write_options, c_memory_pool,
                                  stream.get()))
    elif isinstance(data, Table):
        table = pyarrow_unwrap_table(data).get()
        with nogil:
            check_status(WriteCSV(deref(table), c_write_options, c_memory_pool,
                                  stream.get()))
    else:
        raise TypeError(f"Expected Table or RecordBatch, got '{type(data)}'")