| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| import bz2 |
| from datetime import datetime |
| from decimal import Decimal |
| import gc |
| import gzip |
| import io |
| import itertools |
| import os |
| import pickle |
| import shutil |
| import string |
| import tempfile |
| import time |
| import unittest |
| |
| import pytest |
| |
| import numpy as np |
| |
| import pyarrow as pa |
| from pyarrow.csv import ( |
| open_csv, read_csv, ReadOptions, ParseOptions, ConvertOptions, ISO8601) |
| |
| |
def generate_col_names():
    """
    Yield lowercase column names: 'a' through 'z' first, then every
    two-letter combination from 'aa' through 'zz'.
    """
    alphabet = string.ascii_lowercase
    # Single-letter names
    for letter in alphabet:
        yield letter
    # Two-letter names; the first letter varies slowest
    for pair in itertools.product(alphabet, repeat=2):
        yield "".join(pair)
| |
| |
def make_random_csv(num_cols=2, num_rows=10, linesep='\r\n'):
    """
    Build a CSV payload of pseudo-random integers together with the table
    that reading it is expected to produce.

    The random generator is created with a fixed seed on each call.
    Every line of the payload, header included, ends with `linesep`.

    Returns a ``(csv_bytes, expected_table)`` tuple; the table has one
    int64 column per generated column name.
    """
    rng = np.random.RandomState(42)
    # One array row per CSV column
    values = rng.randint(0, 1000, size=(num_cols, num_rows))
    names = list(itertools.islice(generate_col_names(), num_cols))

    lines = [",".join(names)]
    # Transposing yields one record per CSV line
    lines.extend(",".join(str(cell) for cell in record)
                 for record in values.T)
    payload = "".join(line + linesep for line in lines).encode()

    expected_columns = [pa.array(column, type=pa.int64())
                        for column in values]
    expected_table = pa.Table.from_arrays(expected_columns, names)
    return payload, expected_table
| |
| |
def make_empty_csv(column_names):
    """
    Return the bytes of a CSV file made of a single header line (the
    comma-joined `column_names` followed by a newline) and no data rows.
    """
    header_line = ",".join(column_names) + "\n"
    return header_line.encode()
| |
| |
def check_options_class(cls, **attr_values):
    """
    Exercise attribute access and keyword construction of an *Options class.

    Each keyword maps an attribute name to a list of values.  The first
    value must equal the attribute's default; the second one is the value
    later passed to the constructor as a keyword argument.
    """
    instance = cls()

    for attr, candidates in attr_values.items():
        default = candidates[0]
        assert getattr(instance, attr) == default, \
            "incorrect default value for " + attr
        # Every candidate must read back unchanged after being assigned
        for candidate in candidates:
            setattr(instance, attr, candidate)
            assert getattr(instance, attr) == candidate, \
                "failed setting value"

    # Assigning an unknown attribute must be rejected
    with pytest.raises(AttributeError):
        instance.zzz_non_existent = True

    # Check constructor named arguments
    ctor_kwargs = {}
    for attr, candidates in attr_values.items():
        ctor_kwargs[attr] = candidates[1]
    instance = cls(**ctor_kwargs)
    for attr in ctor_kwargs:
        assert getattr(instance, attr) == ctor_kwargs[attr]
| |
| |
def check_options_class_pickling(cls, **attr_values):
    """
    Check that an instance of `cls` built from `attr_values` keeps those
    attribute values after a pickle round-trip (highest protocol).
    """
    original = cls(**attr_values)
    payload = pickle.dumps(original, protocol=pickle.HIGHEST_PROTOCOL)
    restored = pickle.loads(payload)
    for attr, expected in attr_values.items():
        assert getattr(restored, attr) == expected
| |
| |
def test_read_options():
    # Attributes with known defaults go through the generic checker;
    # block_size is checked by hand further down.
    options_cls = ReadOptions
    options = options_cls()

    check_options_class(
        options_cls,
        use_threads=[True, False],
        skip_rows=[0, 3],
        column_names=[[], ["ab", "cd"]],
        autogenerate_column_names=[False, True],
        encoding=['utf8', 'utf16'])

    # Only positivity of the default block size is asserted
    assert options.block_size > 0
    options.block_size = 12345
    assert options.block_size == 12345

    # block_size is also accepted as a constructor keyword
    options = options_cls(block_size=1234)
    assert options.block_size == 1234
| |
| |
def test_parse_options():
    # Defaults, setters and constructor keywords
    options_cls = ParseOptions

    check_options_class(
        options_cls,
        delimiter=[',', 'x'],
        escape_char=[False, 'y'],
        quote_char=['"', 'z', False],
        double_quote=[True, False],
        newlines_in_values=[False, True],
        ignore_empty_lines=[True, False])

    # ParseOptions needs to be picklable for dataset
    pickled_attrs = dict(
        delimiter='x',
        escape_char='y',
        quote_char=False,
        double_quote=False,
        newlines_in_values=True,
        ignore_empty_lines=False)
    check_options_class_pickling(options_cls, **pickled_attrs)
| |
| |
def test_convert_options():
    # Attributes with known defaults go through the generic checker;
    # the remaining ones are checked by hand below.
    options_cls = ConvertOptions
    options = options_cls()

    check_options_class(
        options_cls,
        check_utf8=[True, False],
        strings_can_be_null=[False, True],
        include_columns=[[], ['def', 'abc']],
        include_missing_columns=[False, True],
        auto_dict_encode=[False, True],
        timestamp_parsers=[[], [ISO8601, '%y-%m']])

    assert options.auto_dict_max_cardinality > 0
    options.auto_dict_max_cardinality = 99999
    assert options.auto_dict_max_cardinality == 99999

    # column_types starts out empty and accepts several input forms
    assert options.column_types == {}
    # ... a mapping, with types given as DataType instances
    options.column_types = {'b': pa.int16(), 'c': pa.float32()}
    assert options.column_types == {'b': pa.int16(), 'c': pa.float32()}
    # ... a mapping, with types given as strings
    options.column_types = {'v': 'int16', 'w': 'null'}
    assert options.column_types == {'v': pa.int16(), 'w': pa.null()}
    # ... a schema
    as_schema = pa.schema([('a', pa.int32()), ('b', pa.string())])
    options.column_types = as_schema
    assert options.column_types == {'a': pa.int32(), 'b': pa.string()}
    # ... a sequence of (name, type) pairs
    options.column_types = [('x', pa.binary())]
    assert options.column_types == {'x': pa.binary()}

    # Invalid column_types values are rejected
    with pytest.raises(TypeError, match='DataType expected'):
        options.column_types = {'a': None}
    with pytest.raises(TypeError):
        options.column_types = 0

    # The default null spellings include '' and 'N/A'
    assert isinstance(options.null_values, list)
    assert '' in options.null_values
    assert 'N/A' in options.null_values
    options.null_values = ['xxx', 'yyy']
    assert options.null_values == ['xxx', 'yyy']

    assert isinstance(options.true_values, list)
    options.true_values = ['xxx', 'yyy']
    assert options.true_values == ['xxx', 'yyy']

    assert isinstance(options.false_values, list)
    options.false_values = ['xxx', 'yyy']
    assert options.false_values == ['xxx', 'yyy']

    assert options.timestamp_parsers == []
    options.timestamp_parsers = [ISO8601]
    assert options.timestamp_parsers == [ISO8601]

    # The same attributes given as constructor keywords
    options = options_cls(
        column_types={'a': pa.null()},
        null_values=['N', 'nn'],
        true_values=['T', 'tt'],
        false_values=['F', 'ff'],
        auto_dict_max_cardinality=999,
        timestamp_parsers=[ISO8601, '%Y-%m-%d'])
    assert options.column_types == {'a': pa.null()}
    assert options.null_values == ['N', 'nn']
    assert options.false_values == ['F', 'ff']
    assert options.true_values == ['T', 'tt']
    assert options.auto_dict_max_cardinality == 999
    assert options.timestamp_parsers == [ISO8601, '%Y-%m-%d']
| |
| |
class BaseTestCSVRead:
    # Reading tests shared by the concrete test cases.  Every test goes
    # through `self.read_csv`, which this class does not define itself:
    # the class it is mixed into has to provide it.

    def read_bytes(self, b, **kwargs):
        """Read CSV data given as the bytes-like object `b`."""
        buf = pa.py_buffer(b)
        return self.read_csv(buf, **kwargs)

    def check_names(self, table, names):
        """Assert that the columns of `table` are named exactly `names`."""
        assert table.num_columns == len(names)
        assert table.column_names == names

    def test_file_object(self):
        payload = b"a,b\n1,2\n"
        # Binary file objects are accepted
        table = self.read_csv(io.BytesIO(payload))
        assert table.to_pydict() == {'a': [1], 'b': [2]}
        # Text files not allowed
        text_file = io.StringIO(payload.decode())
        with pytest.raises(TypeError):
            self.read_csv(text_file)

    def test_header(self):
        # A header line alone gives a table with columns but no rows
        table = self.read_bytes(b"abc,def,gh\n")
        assert isinstance(table, pa.Table)
        self.check_names(table, ["abc", "def", "gh"])
        assert table.num_rows == 0

    def test_bom(self):
        # The three leading bytes are a UTF-8 byte order mark
        data = b"\xef\xbb\xbfa,b\n1,2\n"
        table = self.read_bytes(data)
        assert table.to_pydict() == {'a': [1], 'b': [2]}

    def test_one_chunk(self):
        # ARROW-7661: lack of newline at end of file should not produce
        # an additional chunk.
        lines = [b"a,b", b"1,2", b"3,4", b"56,78"]
        expected = {
            "a": [1, 3, 56],
            "b": [2, 4, 78],
        }
        for eol in (b'\n', b'\r', b'\r\n'):
            body = eol.join(lines)
            # Without, then with, a line ending after the last row
            for trailer in (b'', eol):
                table = self.read_bytes(body + trailer)
                assert len(table.to_batches()) == 1
                assert table.to_pydict() == expected

    def test_header_skip_rows(self):
        data = b"ab,cd\nef,gh\nij,kl\nmn,op\n"

        # The first row that is not skipped becomes the header
        read_options = ReadOptions()
        read_options.skip_rows = 1
        table = self.read_bytes(data, read_options=read_options)
        self.check_names(table, ["ef", "gh"])
        assert table.to_pydict() == {
            "ef": ["ij", "mn"],
            "gh": ["kl", "op"],
        }

        # Only the header row is left
        read_options.skip_rows = 3
        table = self.read_bytes(data, read_options=read_options)
        self.check_names(table, ["mn", "op"])
        assert table.to_pydict() == {
            "mn": [],
            "op": [],
        }

        # Not enough rows
        read_options.skip_rows = 4
        with pytest.raises(pa.ArrowInvalid):
            self.read_bytes(data, read_options=read_options)

        # Can skip rows with a different number of columns
        data = b"abcd\n,,,,,\nij,kl\nmn,op\n"
        read_options.skip_rows = 2
        table = self.read_bytes(data, read_options=read_options)
        self.check_names(table, ["ij", "kl"])
        assert table.to_pydict() == {
            "ij": ["mn"],
            "kl": ["op"],
        }

    def test_header_column_names(self):
        data = b"ab,cd\nef,gh\nij,kl\nmn,op\n"

        # With explicit column names, the first CSV row is data
        read_options = ReadOptions()
        read_options.column_names = ["x", "y"]
        table = self.read_bytes(data, read_options=read_options)
        self.check_names(table, ["x", "y"])
        assert table.to_pydict() == {
            "x": ["ab", "ef", "ij", "mn"],
            "y": ["cd", "gh", "kl", "op"],
        }

        read_options.skip_rows = 3
        table = self.read_bytes(data, read_options=read_options)
        self.check_names(table, ["x", "y"])
        assert table.to_pydict() == {
            "x": ["mn"],
            "y": ["op"],
        }

        # Skipping every row still gives an (empty) table
        read_options.skip_rows = 4
        table = self.read_bytes(data, read_options=read_options)
        self.check_names(table, ["x", "y"])
        assert table.to_pydict() == {
            "x": [],
            "y": [],
        }

        # Not enough rows
        read_options.skip_rows = 5
        with pytest.raises(pa.ArrowInvalid):
            self.read_bytes(data, read_options=read_options)

        # Unexpected number of columns
        read_options.skip_rows = 0
        read_options.column_names = ["x", "y", "z"]
        with pytest.raises(pa.ArrowInvalid,
                           match="Expected 3 columns, got 2"):
            self.read_bytes(data, read_options=read_options)

        # Can skip rows with a different number of columns
        data = b"abcd\n,,,,,\nij,kl\nmn,op\n"
        read_options.skip_rows = 2
        read_options.column_names = ["x", "y"]
        table = self.read_bytes(data, read_options=read_options)
        self.check_names(table, ["x", "y"])
        assert table.to_pydict() == {
            "x": ["ij", "mn"],
            "y": ["kl", "op"],
        }

    def test_header_autogenerate_column_names(self):
        data = b"ab,cd\nef,gh\nij,kl\nmn,op\n"

        # Generated names are "f0", "f1"...; the first CSV row is data
        read_options = ReadOptions()
        read_options.autogenerate_column_names = True
        table = self.read_bytes(data, read_options=read_options)
        self.check_names(table, ["f0", "f1"])
        assert table.to_pydict() == {
            "f0": ["ab", "ef", "ij", "mn"],
            "f1": ["cd", "gh", "kl", "op"],
        }

        read_options.skip_rows = 3
        table = self.read_bytes(data, read_options=read_options)
        self.check_names(table, ["f0", "f1"])
        assert table.to_pydict() == {
            "f0": ["mn"],
            "f1": ["op"],
        }

        # Not enough rows, impossible to infer number of columns
        read_options.skip_rows = 4
        with pytest.raises(pa.ArrowInvalid):
            self.read_bytes(data, read_options=read_options)

    def test_include_columns(self):
        data = b"ab,cd\nef,gh\nij,kl\nmn,op\n"

        # Select a single column
        convert_options = ConvertOptions()
        convert_options.include_columns = ['ab']
        table = self.read_bytes(data, convert_options=convert_options)
        self.check_names(table, ["ab"])
        assert table.to_pydict() == {
            "ab": ["ef", "ij", "mn"],
        }

        # Order of include_columns is respected, regardless of CSV order
        convert_options.include_columns = ['cd', 'ab']
        table = self.read_bytes(data, convert_options=convert_options)
        expected_schema = pa.schema([('cd', pa.string()),
                                     ('ab', pa.string())])
        assert table.schema == expected_schema
        assert table.to_pydict() == {
            "cd": ["gh", "kl", "op"],
            "ab": ["ef", "ij", "mn"],
        }

        # Include a column not in the CSV file => raises by default
        convert_options.include_columns = ['xx', 'ab', 'yy']
        expected_message = ("Column 'xx' in include_columns "
                            "does not exist in CSV file")
        with pytest.raises(KeyError, match=expected_message):
            self.read_bytes(data, convert_options=convert_options)

    def test_include_missing_columns(self):
        data = b"ab,cd\nef,gh\nij,kl\nmn,op\n"

        # Columns absent from the CSV file come out as all-null columns
        read_options = ReadOptions()
        convert_options = ConvertOptions()
        convert_options.include_columns = ['xx', 'ab', 'yy']
        convert_options.include_missing_columns = True
        table = self.read_bytes(data, read_options=read_options,
                                convert_options=convert_options)
        expected_schema = pa.schema([('xx', pa.null()),
                                     ('ab', pa.string()),
                                     ('yy', pa.null())])
        assert table.schema == expected_schema
        assert table.to_pydict() == {
            "xx": [None, None, None],
            "ab": ["ef", "ij", "mn"],
            "yy": [None, None, None],
        }

        # Combining with `column_names`
        read_options.column_names = ["xx", "yy"]
        convert_options.include_columns = ["yy", "cd"]
        table = self.read_bytes(data, read_options=read_options,
                                convert_options=convert_options)
        expected_schema = pa.schema([('yy', pa.string()),
                                     ('cd', pa.null())])
        assert table.schema == expected_schema
        assert table.to_pydict() == {
            "yy": ["cd", "gh", "kl", "op"],
            "cd": [None, None, None, None],
        }

        # And with `column_types` as well
        convert_options.column_types = {"yy": pa.binary(),
                                        "cd": pa.int32()}
        table = self.read_bytes(data, read_options=read_options,
                                convert_options=convert_options)
        expected_schema = pa.schema([('yy', pa.binary()),
                                     ('cd', pa.int32())])
        assert table.schema == expected_schema
        assert table.to_pydict() == {
            "yy": [b"cd", b"gh", b"kl", b"op"],
            "cd": [None, None, None, None],
        }

    def test_simple_ints(self):
        # Infer integer columns
        table = self.read_bytes(b"a,b,c\n1,2,3\n4,5,6\n")
        expected_schema = pa.schema([('a', pa.int64()),
                                     ('b', pa.int64()),
                                     ('c', pa.int64())])
        assert table.schema == expected_schema
        assert table.to_pydict() == {
            'a': [1, 4],
            'b': [2, 5],
            'c': [3, 6],
        }

    def test_simple_varied(self):
        # Infer various kinds of data
        table = self.read_bytes(b"a,b,c,d\n1,2,3,0\n4.0,-5,foo,True\n")
        expected_schema = pa.schema([('a', pa.float64()),
                                     ('b', pa.int64()),
                                     ('c', pa.string()),
                                     ('d', pa.bool_())])
        assert table.schema == expected_schema
        assert table.to_pydict() == {
            'a': [1.0, 4.0],
            'b': [2, -5],
            'c': ["3", "foo"],
            'd': [False, True],
        }

    def test_simple_nulls(self):
        # Infer various kinds of data, with nulls
        data = (b"a,b,c,d,e,f\n"
                b"1,2,,,3,N/A\n"
                b"nan,-5,foo,,nan,TRUE\n"
                b"4.5,#N/A,nan,,\xff,false\n")
        table = self.read_bytes(data)
        expected_schema = pa.schema([('a', pa.float64()),
                                     ('b', pa.int64()),
                                     ('c', pa.string()),
                                     ('d', pa.null()),
                                     ('e', pa.binary()),
                                     ('f', pa.bool_())])
        assert table.schema == expected_schema
        assert table.to_pydict() == {
            'a': [1.0, None, 4.5],
            'b': [2, -5, None],
            'c': ["", "foo", "nan"],
            'd': [None, None, None],
            'e': [b"3", b"nan", b"\xff"],
            'f': [None, True, False],
        }

    def test_simple_timestamps(self):
        # Infer a timestamp column
        table = self.read_bytes(b"a,b\n1970,1970-01-01\n1989,1989-07-14\n")
        expected_schema = pa.schema([('a', pa.int64()),
                                     ('b', pa.timestamp('s'))])
        assert table.schema == expected_schema
        assert table.to_pydict() == {
            'a': [1970, 1989],
            'b': [datetime(1970, 1, 1), datetime(1989, 7, 14)],
        }

    def test_timestamp_parsers(self):
        # Infer timestamps with custom parsers
        data = b"a,b\n1970/01/01,1980-01-01\n1970/01/02,1980-01-02\n"
        convert_options = ConvertOptions()

        # With default options, only column 'b' is read as timestamps
        table = self.read_bytes(data, convert_options=convert_options)
        expected_schema = pa.schema([('a', pa.string()),
                                     ('b', pa.timestamp('s'))])
        assert table.schema == expected_schema
        assert table.to_pydict() == {
            'a': ['1970/01/01', '1970/01/02'],
            'b': [datetime(1980, 1, 1), datetime(1980, 1, 2)],
        }

        # With only a custom format, column 'b' comes back as strings
        convert_options.timestamp_parsers = ['%Y/%m/%d']
        table = self.read_bytes(data, convert_options=convert_options)
        expected_schema = pa.schema([('a', pa.timestamp('s')),
                                     ('b', pa.string())])
        assert table.schema == expected_schema
        assert table.to_pydict() == {
            'a': [datetime(1970, 1, 1), datetime(1970, 1, 2)],
            'b': ['1980-01-01', '1980-01-02'],
        }

        # With both parsers, both columns are read as timestamps
        convert_options.timestamp_parsers = ['%Y/%m/%d', ISO8601]
        table = self.read_bytes(data, convert_options=convert_options)
        expected_schema = pa.schema([('a', pa.timestamp('s')),
                                     ('b', pa.timestamp('s'))])
        assert table.schema == expected_schema
        assert table.to_pydict() == {
            'a': [datetime(1970, 1, 1), datetime(1970, 1, 2)],
            'b': [datetime(1980, 1, 1), datetime(1980, 1, 2)],
        }

    def test_auto_dict_encode(self):
        convert_options = ConvertOptions(auto_dict_encode=True)
        data = "a,b\nab,1\ncdé,2\ncdé,3\nab,4".encode()
        table = self.read_bytes(data, convert_options=convert_options)
        dict_schema = pa.schema(
            [('a', pa.dictionary(pa.int32(), pa.string())),
             ('b', pa.int64())])
        expected = {
            'a': ["ab", "cdé", "cdé", "ab"],
            'b': [1, 2, 3, 4],
        }
        assert table.schema == dict_schema
        assert table.to_pydict() == expected

        # Cardinality equal to max => still dictionary-encoded
        convert_options.auto_dict_max_cardinality = 2
        table = self.read_bytes(data, convert_options=convert_options)
        assert table.schema == dict_schema
        assert table.to_pydict() == expected

        # Cardinality above max => plain-encoded
        convert_options.auto_dict_max_cardinality = 1
        table = self.read_bytes(data, convert_options=convert_options)
        plain_schema = pa.schema([('a', pa.string()),
                                  ('b', pa.int64())])
        assert table.schema == plain_schema
        assert table.to_pydict() == expected

        # With invalid UTF8, not checked
        convert_options.auto_dict_max_cardinality = 50
        convert_options.check_utf8 = False
        data = b"a,b\nab,1\ncd\xff,2\nab,3"
        table = self.read_bytes(data, convert_options=convert_options,
                                validate_full=False)
        assert table.schema == dict_schema
        dictionary = table['a'].chunk(0).dictionary
        assert len(dictionary) == 2
        assert dictionary[0].as_py() == "ab"
        assert dictionary[1].as_buffer() == b"cd\xff"

        # With invalid UTF8, checked
        convert_options.check_utf8 = True
        table = self.read_bytes(data, convert_options=convert_options)
        binary_dict_schema = pa.schema(
            [('a', pa.dictionary(pa.int32(), pa.binary())),
             ('b', pa.int64())])
        expected = {
            'a': [b"ab", b"cd\xff", b"ab"],
            'b': [1, 2, 3],
        }
        assert table.schema == binary_dict_schema
        assert table.to_pydict() == expected

    def test_custom_nulls(self):
        # Infer nulls with custom values
        convert_options = ConvertOptions(null_values=['Xxx', 'Zzz'])
        data = b"a,b,c,d\nZzz,Xxx,1,2\nXxx,#N/A,,Zzz\n"
        table = self.read_bytes(data, convert_options=convert_options)
        expected_schema = pa.schema([('a', pa.null()),
                                     ('b', pa.string()),
                                     ('c', pa.string()),
                                     ('d', pa.int64())])
        assert table.schema == expected_schema
        assert table.to_pydict() == {
            'a': [None, None],
            'b': ["Xxx", "#N/A"],
            'c': ["1", ""],
            'd': [2, None],
        }

        # Same data, this time allowing nulls in string columns
        convert_options = ConvertOptions(null_values=['Xxx', 'Zzz'],
                                         strings_can_be_null=True)
        table = self.read_bytes(data, convert_options=convert_options)
        assert table.to_pydict() == {
            'a': [None, None],
            'b': [None, "#N/A"],
            'c': ["1", ""],
            'd': [2, None],
        }

        # No null spellings at all
        convert_options = ConvertOptions(null_values=[])
        data = b"a,b\n#N/A,\n"
        table = self.read_bytes(data, convert_options=convert_options)
        expected_schema = pa.schema([('a', pa.string()),
                                     ('b', pa.string())])
        assert table.schema == expected_schema
        assert table.to_pydict() == {
            'a': ["#N/A"],
            'b': [""],
        }

    def test_custom_bools(self):
        # Infer booleans with custom values
        convert_options = ConvertOptions(true_values=['T', 'yes'],
                                         false_values=['F', 'no'])
        data = (b"a,b,c\n"
                b"True,T,t\n"
                b"False,F,f\n"
                b"True,yes,yes\n"
                b"False,no,no\n"
                b"N/A,N/A,N/A\n")
        table = self.read_bytes(data, convert_options=convert_options)
        expected_schema = pa.schema([('a', pa.string()),
                                     ('b', pa.bool_()),
                                     ('c', pa.string())])
        assert table.schema == expected_schema
        assert table.to_pydict() == {
            'a': ["True", "False", "True", "False", "N/A"],
            'b': [True, False, True, False, None],
            'c': ["t", "f", "yes", "no", "N/A"],
        }

    def test_column_types(self):
        # Ask for specific column types in ConvertOptions
        convert_options = ConvertOptions(
            column_types={'b': 'float32',
                          'c': 'string',
                          'd': 'boolean',
                          'e': pa.decimal128(11, 2),
                          'zz': 'null'})
        data = b"a,b,c,d,e\n1,2,3,true,1.0\n4,-5,6,false,0\n"
        table = self.read_bytes(data, convert_options=convert_options)
        expected_schema = pa.schema([('a', pa.int64()),
                                     ('b', pa.float32()),
                                     ('c', pa.string()),
                                     ('d', pa.bool_()),
                                     ('e', pa.decimal128(11, 2))])
        expected = {
            'a': [1, 4],
            'b': [2.0, -5.0],
            'c': ["3", "6"],
            'd': [True, False],
            'e': [Decimal("1.00"), Decimal("0.00")]
        }
        assert table.schema == expected_schema
        assert table.to_pydict() == expected

        # Pass column_types as schema
        types_schema = pa.schema([('b', pa.float32()),
                                  ('c', pa.string()),
                                  ('d', pa.bool_()),
                                  ('e', pa.decimal128(11, 2)),
                                  ('zz', pa.bool_())])
        convert_options = ConvertOptions(column_types=types_schema)
        table = self.read_bytes(data, convert_options=convert_options)
        assert table.schema == expected_schema
        assert table.to_pydict() == expected

        # One of the columns in column_types fails converting
        data = b"a,b,c,d,e\n1,XXX,3,true,5\n4,-5,6,false,7\n"
        with pytest.raises(pa.ArrowInvalid) as exc_info:
            self.read_bytes(data, convert_options=convert_options)
        message = str(exc_info.value)
        assert "In CSV column #1: " in message
        assert ("CSV conversion error to float: invalid value 'XXX'"
                in message)

    def test_column_types_with_column_names(self):
        # When both `column_names` and `column_types` are given, names
        # in `column_types` should refer to names in `column_names`
        data = b"a,b\nc,d\ne,f\n"
        read_options = ReadOptions(column_names=['x', 'y'])
        convert_options = ConvertOptions(column_types={'x': pa.binary()})
        table = self.read_bytes(data, read_options=read_options,
                                convert_options=convert_options)
        expected_schema = pa.schema([('x', pa.binary()),
                                     ('y', pa.string())])
        assert table.schema == expected_schema
        assert table.to_pydict() == {
            'x': [b'a', b'c', b'e'],
            'y': ['b', 'd', 'f'],
        }

    def test_no_ending_newline(self):
        # No \n after last line
        table = self.read_bytes(b"a,b,c\n1,2,3\n4,5,6")
        assert table.to_pydict() == {
            'a': [1, 4],
            'b': [2, 5],
            'c': [3, 6],
        }

    def test_trivial(self):
        # A bit pointless, but at least it shouldn't crash
        table = self.read_bytes(b",\n\n")
        assert table.to_pydict() == {'': []}

    def test_empty_lines(self):
        data = b"a,b\n\r1,2\r\n\r\n3,4\r\n"
        # By default, empty lines are left out of the result
        table = self.read_bytes(data)
        assert table.to_pydict() == {
            'a': [1, 3],
            'b': [2, 4],
        }
        # When not ignored, each empty line gives a row of nulls
        parse_options = ParseOptions(ignore_empty_lines=False)
        table = self.read_bytes(data, parse_options=parse_options)
        assert table.to_pydict() == {
            'a': [None, 1, None, 3],
            'b': [None, 2, None, 4],
        }
        # Empty lines also count when skipping rows
        read_options = ReadOptions(skip_rows=2)
        table = self.read_bytes(data, parse_options=parse_options,
                                read_options=read_options)
        assert table.to_pydict() == {
            '1': [None, 3],
            '2': [None, 4],
        }

    def test_invalid_csv(self):
        # Various CSV errors
        # Rows whose number of columns differs from the header's
        ragged_inputs = [
            (b"a,b,c\n1,2\n4,5,6\n", "Expected 3 columns, got 2"),
            (b"a,b,c\n1,2,3\n4", "Expected 3 columns, got 1"),
        ]
        for data, message in ragged_inputs:
            with pytest.raises(pa.ArrowInvalid, match=message):
                self.read_bytes(data)
        # Inputs that are empty or made of line endings only
        for data in (b"", b"\n", b"\r\n", b"\r", b"\n\n"):
            with pytest.raises(pa.ArrowInvalid, match="Empty CSV file"):
                self.read_bytes(data)

    def test_options_delimiter(self):
        data = b"a;b,c\nde,fg;eh\n"
        # Default options: split on ','
        table = self.read_bytes(data)
        assert table.to_pydict() == {
            'a;b': ['de'],
            'c': ['fg;eh'],
        }
        # Custom delimiter: split on ';' instead
        parse_options = ParseOptions(delimiter=';')
        table = self.read_bytes(data, parse_options=parse_options)
        assert table.to_pydict() == {
            'a': ['de,fg'],
            'b,c': ['eh'],
        }

    def test_small_random_csv(self):
        payload, expected = make_random_csv(num_cols=2, num_rows=10)
        table = self.read_bytes(payload)
        assert table.schema == expected.schema
        assert table.equals(expected)
        assert table.to_pydict() == expected.to_pydict()

    def test_stress_block_sizes(self):
        # Test a number of small block sizes to stress block stitching
        full_csv, expected = make_random_csv(num_cols=2, num_rows=500)
        block_sizes = [11, 12, 13, 17, 37, 111]
        # Second variant: same data without the trailing line ending
        variants = [full_csv, full_csv.rstrip(b'\r\n')]
        for payload, block_size in itertools.product(variants, block_sizes):
            read_options = ReadOptions(block_size=block_size)
            table = self.read_bytes(payload, read_options=read_options)
            assert table.schema == expected.schema
            if not table.equals(expected):
                # Better error output
                assert table.to_pydict() == expected.to_pydict()

    def test_stress_convert_options_blowup(self):
        # ARROW-6481: A convert_options with a very large number of columns
        # should not blow memory and CPU time.
        # Fall back on time.time if time.thread_time does not exist
        clock = getattr(time, 'thread_time', time.time)
        num_columns = 10000
        col_names = ["K" + str(i) for i in range(num_columns)]
        payload = make_empty_csv(col_names)
        started = clock()
        # Explicit types for every other column
        convert_options = ConvertOptions(
            column_types={name: pa.string() for name in col_names[::2]})
        table = self.read_bytes(payload, convert_options=convert_options)
        elapsed = clock() - started
        # Check that processing time didn't blow up.
        # This is a conservative check (it takes less than 300 ms
        # in debug mode on my local machine).
        assert elapsed <= 10.0
        # Check result
        assert table.num_columns == num_columns
        assert table.num_rows == 0
        assert table.column_names == col_names
| |
| |
class TestSerialCSVRead(BaseTestCSVRead, unittest.TestCase):
    # Runs the shared reading tests with `use_threads` set to False.

    def read_csv(self, *args, validate_full=True, **kwargs):
        """
        Call `pyarrow.csv.read_csv` with `use_threads` forced to False,
        then validate the resulting table before returning it.

        A `read_options` object passed by the caller is modified in place.
        """
        options = kwargs.setdefault('read_options', ReadOptions())
        options.use_threads = False
        result = read_csv(*args, **kwargs)
        result.validate(full=validate_full)
        return result
| |
| |
class TestParallelCSVRead(BaseTestCSVRead, unittest.TestCase):
    # Runs the shared reading tests with `use_threads` set to True.

    def read_csv(self, *args, validate_full=True, **kwargs):
        """
        Call `pyarrow.csv.read_csv` with `use_threads` forced to True,
        then validate the resulting table before returning it.

        A `read_options` object passed by the caller is modified in place.
        """
        options = kwargs.setdefault('read_options', ReadOptions())
        options.use_threads = True
        result = read_csv(*args, **kwargs)
        result.validate(full=validate_full)
        return result
| |
| |
| class BaseTestStreamingCSVRead: |
| |
def open_bytes(self, b, **kwargs):
    """Open a streaming reader over the bytes-like object `b`."""
    buf = pa.py_buffer(b)
    return self.open_csv(buf, **kwargs)
| |
def check_reader(self, reader, expected_schema, expected_data):
    """
    Consume `reader` entirely and compare what it produced with
    `expected_data`, a list holding one dict per expected batch.
    """
    assert reader.schema == expected_schema
    actual_batches = list(reader)
    assert len(actual_batches) == len(expected_data)
    for index, expected_batch in enumerate(expected_data):
        actual = actual_batches[index]
        actual.validate(full=True)
        assert actual.schema == expected_schema
        assert actual.to_pydict() == expected_batch
| |
def test_file_object(self):
    # Binary file objects are accepted
    payload = b"a,b\n1,2\n3,4\n"
    reader = self.open_csv(io.BytesIO(payload))
    expected_schema = pa.schema([('a', pa.int64()),
                                 ('b', pa.int64())])
    expected_batch = {'a': [1, 3], 'b': [2, 4]}
    self.check_reader(reader, expected_schema, [expected_batch])
| |
def test_header(self):
    # A header line alone: null-typed columns and no batch at all
    reader = self.open_bytes(b"abc,def,gh\n")
    expected_schema = pa.schema([('abc', pa.null()),
                                 ('def', pa.null()),
                                 ('gh', pa.null())])
    self.check_reader(reader, expected_schema, [])
| |
def test_inference(self):
    # Inference is done on first block
    data = b"a,b\n123,456\nabc,de\xff\ngh,ij\n"
    expected_schema = pa.schema([('a', pa.string()),
                                 ('b', pa.binary())])

    # Block as large as the data: a single batch
    read_options = ReadOptions()
    read_options.block_size = len(data)
    reader = self.open_bytes(data, read_options=read_options)
    single_batch = [{'a': ['123', 'abc', 'gh'],
                     'b': [b'456', b'de\xff', b'ij']}]
    self.check_reader(reader, expected_schema, single_batch)

    # Block one byte shorter: the last row lands in a second batch
    read_options.block_size = len(data) - 1
    reader = self.open_bytes(data, read_options=read_options)
    two_batches = [{'a': ['123', 'abc'],
                    'b': [b'456', b'de\xff']},
                   {'a': ['gh'],
                    'b': [b'ij']}]
    self.check_reader(reader, expected_schema, two_batches)
| |
def test_inference_failure(self):
    int_schema = pa.schema([('a', pa.int64()),
                            ('b', pa.int64())])

    # Inference on first block, then conversion failure on second block
    data = b"a,b\n123,456\nabc,de\xff\ngh,ij\n"
    read_options = ReadOptions()
    read_options.block_size = len(data) - 7
    reader = self.open_bytes(data, read_options=read_options)
    assert reader.schema == int_schema
    first = reader.read_next_batch()
    assert first.to_pydict() == {
        'a': [123], 'b': [456]
    }
    # Second block
    with pytest.raises(ValueError,
                       match="CSV conversion error to int64"):
        reader.read_next_batch()
    # EOF
    with pytest.raises(StopIteration):
        reader.read_next_batch()

    # Inference on first block, then conversion failure on second block,
    # then success on third block
    data = b"a,b\n1,2\nabc,def\n45,67\n"
    read_options.block_size = 8
    reader = self.open_bytes(data, read_options=read_options)
    assert reader.schema == int_schema
    first = reader.read_next_batch()
    assert first.to_pydict() == {'a': [1], 'b': [2]}
    # Second block
    with pytest.raises(ValueError,
                       match="CSV conversion error to int64"):
        reader.read_next_batch()
    # Third block
    third = reader.read_next_batch()
    assert third.to_pydict() == {'a': [45], 'b': [67]}
    # EOF
    with pytest.raises(StopIteration):
        reader.read_next_batch()
| |
def test_invalid_csv(self):
    column_count_error = "Expected 2 columns, got 3"

    # CSV errors on first block
    data = b"a,b\n1,2,3\n4,5\n6,7\n"
    read_options = ReadOptions()
    read_options.block_size = 10
    with pytest.raises(pa.ArrowInvalid, match=column_count_error):
        self.open_bytes(data, read_options=read_options)

    # CSV errors on second block
    data = b"a,b\n1,2\n3,4,5\n6,7\n"
    read_options.block_size = 8
    reader = self.open_bytes(data, read_options=read_options)
    first = reader.read_next_batch()
    assert first.to_pydict() == {'a': [1], 'b': [2]}
    with pytest.raises(pa.ArrowInvalid, match=column_count_error):
        reader.read_next_batch()
    # Cannot continue after a parse error
    with pytest.raises(StopIteration):
        reader.read_next_batch()
| |
def test_options_delimiter(self):
    data = b"a;b,c\nde,fg;eh\n"

    # Default options: split on ','
    reader = self.open_bytes(data)
    comma_schema = pa.schema([('a;b', pa.string()),
                              ('c', pa.string())])
    comma_batches = [{'a;b': ['de'],
                      'c': ['fg;eh']}]
    self.check_reader(reader, comma_schema, comma_batches)

    # Custom delimiter: split on ';' instead
    parse_options = ParseOptions(delimiter=';')
    reader = self.open_bytes(data, parse_options=parse_options)
    semicolon_schema = pa.schema([('a', pa.string()),
                                  ('b,c', pa.string())])
    semicolon_batches = [{'a': ['de,fg'],
                          'b,c': ['eh']}]
    self.check_reader(reader, semicolon_schema, semicolon_batches)
| |
def test_no_ending_newline(self):
    # No \n after last line
    reader = self.open_bytes(b"a,b,c\n1,2,3\n4,5,6")
    expected_schema = pa.schema([('a', pa.int64()),
                                 ('b', pa.int64()),
                                 ('c', pa.int64())])
    expected_batches = [{'a': [1, 4],
                         'b': [2, 5],
                         'c': [3, 6]}]
    self.check_reader(reader, expected_schema, expected_batches)
| |
def test_empty_file(self):
    # Opening empty input fails right away
    empty = b""
    with pytest.raises(ValueError, match="Empty CSV file"):
        self.open_bytes(empty)
| |
def test_column_options(self):
    data = b"1,2,3\n4,5,6"

    # With column_names
    read_options = ReadOptions()
    read_options.column_names = ['d', 'e', 'f']
    reader = self.open_bytes(data, read_options=read_options)
    expected_schema = pa.schema([('d', pa.int64()),
                                 ('e', pa.int64()),
                                 ('f', pa.int64())])
    expected_batches = [{'d': [1, 4],
                         'e': [2, 5],
                         'f': [3, 6]}]
    self.check_reader(reader, expected_schema, expected_batches)

    # With include_columns
    convert_options = ConvertOptions()
    convert_options.include_columns = ['f', 'e']
    reader = self.open_bytes(data, read_options=read_options,
                             convert_options=convert_options)
    expected_schema = pa.schema([('f', pa.int64()),
                                 ('e', pa.int64())])
    expected_batches = [{'e': [2, 5],
                         'f': [3, 6]}]
    self.check_reader(reader, expected_schema, expected_batches)

    # With column_types
    convert_options.column_types = {'e': pa.string()}
    reader = self.open_bytes(data, read_options=read_options,
                             convert_options=convert_options)
    expected_schema = pa.schema([('f', pa.int64()),
                                 ('e', pa.string())])
    expected_batches = [{'e': ["2", "5"],
                         'f': [3, 6]}]
    self.check_reader(reader, expected_schema, expected_batches)

    # Missing columns in include_columns
    convert_options.include_columns = ['g', 'f', 'e']
    with pytest.raises(
            KeyError,
            match="Column 'g' in include_columns does not exist"):
        self.open_bytes(data, read_options=read_options,
                        convert_options=convert_options)

    # Once allowed, the missing column is read as nulls
    convert_options.include_missing_columns = True
    reader = self.open_bytes(data, read_options=read_options,
                             convert_options=convert_options)
    expected_schema = pa.schema([('g', pa.null()),
                                 ('f', pa.int64()),
                                 ('e', pa.string())])
    expected_batches = [{'g': [None, None],
                         'e': ["2", "5"],
                         'f': [3, 6]}]
    self.check_reader(reader, expected_schema, expected_batches)

    # An explicit type can be given for the missing column too
    convert_options.column_types = {'e': pa.string(), 'g': pa.float64()}
    reader = self.open_bytes(data, read_options=read_options,
                             convert_options=convert_options)
    expected_schema = pa.schema([('g', pa.float64()),
                                 ('f', pa.int64()),
                                 ('e', pa.string())])
    expected_batches = [{'g': [None, None],
                         'e': ["2", "5"],
                         'f': [3, 6]}]
    self.check_reader(reader, expected_schema, expected_batches)
| |
def test_encoding(self):
    """Check reading of non-UTF-8 input with and without ``encoding``.

    Latin-1 bytes read without an explicit encoding are expected to come
    back as a binary column. With ``read_options.encoding`` set to
    'latin1' (and to 'utf16' for the UTF-16 input) the same text is
    expected as decoded strings.
    """
    read_options = ReadOptions()
    text_schema = pa.schema([('a', pa.string()),
                             ('b', pa.string())])

    # latin-1 (invalid utf-8)
    latin1_rows = b"a,b\nun,\xe9l\xe9phant"
    raw_schema = pa.schema([('a', pa.string()),
                            ('b', pa.binary())])
    reader = self.open_bytes(latin1_rows, read_options=read_options)
    self.check_reader(reader, raw_schema,
                      [{'a': ["un"], 'b': [b"\xe9l\xe9phant"]}])

    read_options.encoding = 'latin1'
    reader = self.open_bytes(latin1_rows, read_options=read_options)
    self.check_reader(reader, text_schema,
                      [{'a': ["un"], 'b': ["éléphant"]}])

    # utf-16
    utf16_rows = (b'\xff\xfea\x00,\x00b\x00\n\x00u\x00n\x00,'
                  b'\x00\xe9\x00l\x00\xe9\x00p\x00h\x00a\x00n\x00t\x00')
    read_options.encoding = 'utf16'
    reader = self.open_bytes(utf16_rows, read_options=read_options)
    self.check_reader(reader, text_schema,
                      [{'a': ["un"], 'b': ["éléphant"]}])
| |
def test_small_random_csv(self):
    """Read a small generated CSV and compare it with the expected table."""
    payload, reference = make_random_csv(num_cols=2, num_rows=10)
    result = self.open_bytes(payload).read_all()
    assert result.schema == reference.schema
    assert result.equals(reference)
    assert result.to_pydict() == reference.to_pydict()
| |
def test_stress_block_sizes(self):
    """Read one generated CSV with many small block sizes.

    The input is read both with and without its trailing line separator,
    once per block size, and each resulting table must equal the
    expected one.
    """
    # Test a number of small block sizes to stress block stitching
    csv_base, expected = make_random_csv(num_cols=2, num_rows=500)
    block_sizes = [19, 21, 23, 26, 37, 111]
    csvs = [csv_base, csv_base.rstrip(b'\r\n')]
    for csv, block_size in itertools.product(csvs, block_sizes):
        # Need at least two lines for type inference
        assert csv[:block_size].count(b'\n') >= 2
        read_options = ReadOptions(block_size=block_size)
        reader = self.open_bytes(csv, read_options=read_options)
        table = reader.read_all()
        assert table.schema == expected.schema
        if not table.equals(expected):
            # Better error output
            assert table.to_pydict() == expected.to_pydict()
            # The Python values matched although the tables differ:
            # fail explicitly instead of letting the mismatch pass.
            pytest.fail(
                "table differs from expected (block_size={})".format(
                    block_size))
| |
| |
class TestSerialStreamingCSVRead(BaseTestStreamingCSVRead, unittest.TestCase):
    """Streaming CSV reader tests run with ``use_threads`` disabled."""

    def open_csv(self, *args, **kwargs):
        """Call ``open_csv`` with ``read_options.use_threads`` forced off.

        A fresh ``ReadOptions`` is supplied when the caller passes none;
        a caller-provided instance is modified in place.
        """
        if 'read_options' not in kwargs:
            kwargs['read_options'] = ReadOptions()
        kwargs['read_options'].use_threads = False
        return open_csv(*args, **kwargs)

    def test_batch_lifetime(self):
        """Check that allocated bytes stay flat while batches are read.

        The allocation counter is sampled after the first batch and must
        be unchanged after each following batch; once the reader is
        exhausted it must be back to its value from before the test.
        """
        gc.collect()
        baseline_bytes = pa.total_allocated_bytes()

        # Memory occupation should not grow with CSV file size
        def check_one_batch(reader, expected):
            # NOTE(review): the batch is kept local to this helper,
            # presumably so it is released before the caller samples
            # the allocation counter -- keep it that way.
            batch = reader.read_next_batch()
            assert batch.to_pydict() == expected

        rows = b"10,11\n12,13\n14,15\n16,17\n"
        read_options = ReadOptions()
        read_options.column_names = ['a', 'b']
        # Small enough that each batch is expected to hold a single row
        read_options.block_size = 6
        reader = self.open_bytes(rows, read_options=read_options)

        check_one_batch(reader, {'a': [10], 'b': [11]})
        steady_bytes = pa.total_allocated_bytes()
        for expected in ({'a': [12], 'b': [13]},
                         {'a': [14], 'b': [15]},
                         {'a': [16], 'b': [17]}):
            check_one_batch(reader, expected)
            assert pa.total_allocated_bytes() == steady_bytes

        with pytest.raises(StopIteration):
            reader.read_next_batch()
        assert pa.total_allocated_bytes() == baseline_bytes
        reader = None
        assert pa.total_allocated_bytes() == baseline_bytes
| |
| |
class BaseTestCompressedCSVRead:
    """Shared tests for reading compressed CSV files.

    The tests here use ``self.csv_filename`` and
    ``self.write_file(path, contents)``, neither of which is defined on
    this class; concrete subclasses are expected to provide them.
    """

    def setUp(self):
        """Create the temporary directory that holds the test files."""
        self.tmpdir = tempfile.mkdtemp(prefix='arrow-csv-test-')

    def tearDown(self):
        """Remove the temporary directory and everything inside it."""
        shutil.rmtree(self.tmpdir)

    def read_csv(self, csv_path):
        """Read *csv_path*, skipping the test on ArrowNotImplementedError."""
        try:
            table = read_csv(csv_path)
        except pa.ArrowNotImplementedError as exc:
            pytest.skip(str(exc))
        else:
            return table

    def test_random_csv(self):
        """Write a generated CSV through ``write_file`` and read it back."""
        payload, reference = make_random_csv(num_cols=2, num_rows=100)
        target = os.path.join(self.tmpdir, self.csv_filename)
        self.write_file(target, payload)
        result = self.read_csv(target)
        result.validate(full=True)
        assert result.schema == reference.schema
        assert result.equals(reference)
        assert result.to_pydict() == reference.to_pydict()
| |
| |
class TestGZipCSVRead(BaseTestCompressedCSVRead, unittest.TestCase):
    """Compressed CSV reading tests for gzip files."""
    csv_filename = "compressed.csv.gz"

    def write_file(self, path, contents):
        """Write *contents* to *path*, gzip-compressed at level 3."""
        with gzip.open(path, mode='wb', compresslevel=3) as sink:
            sink.write(contents)

    def test_concatenated(self):
        """Read a gzip file written in two steps (ARROW-5974).

        The file is first created, then reopened in append mode for a
        second write; all rows from both writes are expected back.
        """
        # ARROW-5974
        csv_path = os.path.join(self.tmpdir, self.csv_filename)
        chunks = (('wb', b"ab,cd\nef,gh\n"),
                  ('ab', b"ij,kl\nmn,op\n"))
        for mode, chunk in chunks:
            with gzip.open(csv_path, mode, 3) as sink:
                sink.write(chunk)
        expected = {
            'ab': ['ef', 'ij', 'mn'],
            'cd': ['gh', 'kl', 'op'],
        }
        assert self.read_csv(csv_path).to_pydict() == expected
| |
| |
class TestBZ2CSVRead(BaseTestCompressedCSVRead, unittest.TestCase):
    """Compressed CSV reading tests for bzip2 files."""
    csv_filename = "compressed.csv.bz2"

    def write_file(self, path, contents):
        """Write *contents* to *path*, bzip2-compressed."""
        with bz2.open(path, 'wb') as sink:
            sink.write(contents)
| |
| |
def test_read_csv_does_not_close_passed_file_handles():
    """Check that ``read_csv`` leaves a caller-supplied file object open."""
    # ARROW-4823
    stream = io.BytesIO(b"a,b,c\n1,2,3\n4,5,6")
    read_csv(stream)
    still_open = not stream.closed
    assert still_open