| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| import abc |
| from collections import OrderedDict |
| from decimal import Decimal |
| import io |
| import itertools |
| import json |
| import string |
| import unittest |
| |
| try: |
| import numpy as np |
| except ImportError: |
| np = None |
| import pytest |
| |
| import pyarrow as pa |
| from pyarrow.json import read_json, open_json, ReadOptions, ParseOptions |
| |
| |
| def generate_col_names(): |
| # 'a', 'b'... 'z', then 'aa', 'ab'... |
| letters = string.ascii_lowercase |
| yield from letters |
| for first in letters: |
| for second in letters: |
| yield first + second |
| |
| |
| def make_random_json(num_cols=2, num_rows=10, linesep='\r\n'): |
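    # Build newline-delimited JSON from deterministic pseudo-random integers,
    # together with the Table the reader is expected to reconstruct.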
| arr = np.random.RandomState(42).randint(0, 1000, size=(num_cols, num_rows)) |
| col_names = list(itertools.islice(generate_col_names(), num_cols)) |
| lines = [] |
| for row in arr.T: |
| json_obj = OrderedDict([(k, int(v)) for (k, v) in zip(col_names, row)]) |
| lines.append(json.dumps(json_obj)) |
| data = linesep.join(lines).encode() |
| columns = [pa.array(col, type=pa.int64()) for col in arr] |
| expected = pa.Table.from_arrays(columns, col_names) |
| return data, expected |
| |
| |
| def check_options_class_pickling(cls, pickler, **attr_values): |
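    # Round-trip an options object through pickle and verify that each
    # constructor attribute survives unchanged.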
| opts = cls(**attr_values) |
| new_opts = pickler.loads(pickler.dumps(opts, |
| protocol=pickler.HIGHEST_PROTOCOL)) |
| for name, value in attr_values.items(): |
| assert getattr(new_opts, name) == value |
| |
| |
| def test_read_options(pickle_module): |
| cls = ReadOptions |
| opts = cls() |
| |
| assert opts.block_size > 0 |
| opts.block_size = 12345 |
| assert opts.block_size == 12345 |
| |
| assert opts.use_threads is True |
| opts.use_threads = False |
| assert opts.use_threads is False |
| |
| opts = cls(block_size=1234, use_threads=False) |
| assert opts.block_size == 1234 |
| assert opts.use_threads is False |
| |
| check_options_class_pickling(cls, pickler=pickle_module, |
| block_size=1234, |
| use_threads=False) |
| |
| |
| def test_parse_options(pickle_module): |
| cls = ParseOptions |
| opts = cls() |
| assert opts.newlines_in_values is False |
| assert opts.explicit_schema is None |
| |
| opts.newlines_in_values = True |
| assert opts.newlines_in_values is True |
| |
| schema = pa.schema([pa.field('foo', pa.int32())]) |
| opts.explicit_schema = schema |
| assert opts.explicit_schema == schema |
| |
| assert opts.unexpected_field_behavior == "infer" |
| for value in ["ignore", "error", "infer"]: |
| opts.unexpected_field_behavior = value |
| assert opts.unexpected_field_behavior == value |
| |
| with pytest.raises(ValueError): |
| opts.unexpected_field_behavior = "invalid-value" |
| |
| check_options_class_pickling(cls, pickler=pickle_module, |
| explicit_schema=schema, |
| newlines_in_values=False, |
| unexpected_field_behavior="ignore") |
| |
| |
| class BaseTestJSON(abc.ABC): |
| @abc.abstractmethod |
| def read_bytes(self, b, **kwargs): |
| """ |
| :param b: bytes to be parsed |
        :param kwargs: keyword arguments forwarded to the JSON reader
| :return: b parsed as a single Table |
| """ |
| raise NotImplementedError |
| |
| def check_names(self, table, names): |
| assert table.num_columns == len(names) |
| assert [c.name for c in table.columns] == names |
| |
| def test_block_sizes(self): |
| rows = b'{"a": 1}\n{"a": 2}\n{"a": 3}' |
| read_options = ReadOptions() |
| parse_options = ParseOptions() |
| |
| for data in [rows, rows + b'\n']: |
| for newlines_in_values in [False, True]: |
| parse_options.newlines_in_values = newlines_in_values |
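                # A 4-byte block cannot hold even one complete row, so the
                # reader refuses and hints to increase the block size.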
| read_options.block_size = 4 |
| with pytest.raises(ValueError, |
| match="try to increase block size"): |
| self.read_bytes(data, read_options=read_options, |
| parse_options=parse_options) |
| |
| # Validate reader behavior with various block sizes. |
| # There used to be bugs in this area. |
| for block_size in range(9, 20): |
| read_options.block_size = block_size |
| table = self.read_bytes(data, read_options=read_options, |
| parse_options=parse_options) |
| assert table.to_pydict() == {'a': [1, 2, 3]} |
| |
| def test_no_newline_at_end(self): |
| rows = b'{"a": 1,"b": 2, "c": 3}\n{"a": 4,"b": 5, "c": 6}' |
| table = self.read_bytes(rows) |
| assert table.to_pydict() == { |
| 'a': [1, 4], |
| 'b': [2, 5], |
| 'c': [3, 6], |
| } |
| |
| def test_simple_ints(self): |
| # Infer integer columns |
| rows = b'{"a": 1,"b": 2, "c": 3}\n{"a": 4,"b": 5, "c": 6}\n' |
| table = self.read_bytes(rows) |
| schema = pa.schema([('a', pa.int64()), |
| ('b', pa.int64()), |
| ('c', pa.int64())]) |
| assert table.schema == schema |
| assert table.to_pydict() == { |
| 'a': [1, 4], |
| 'b': [2, 5], |
| 'c': [3, 6], |
| } |
| |
| def test_simple_varied(self): |
| # Infer various kinds of data |
| rows = (b'{"a": 1,"b": 2, "c": "3", "d": false}\n' |
| b'{"a": 4.0, "b": -5, "c": "foo", "d": true}\n') |
| table = self.read_bytes(rows) |
| schema = pa.schema([('a', pa.float64()), |
| ('b', pa.int64()), |
| ('c', pa.string()), |
| ('d', pa.bool_())]) |
| assert table.schema == schema |
| assert table.to_pydict() == { |
| 'a': [1.0, 4.0], |
| 'b': [2, -5], |
| 'c': ["3", "foo"], |
| 'd': [False, True], |
| } |
| |
| def test_simple_nulls(self): |
| # Infer various kinds of data, with nulls |
| rows = (b'{"a": 1, "b": 2, "c": null, "d": null, "e": null}\n' |
| b'{"a": null, "b": -5, "c": "foo", "d": null, "e": true}\n' |
| b'{"a": 4.5, "b": null, "c": "nan", "d": null,"e": false}\n') |
| table = self.read_bytes(rows) |
| schema = pa.schema([('a', pa.float64()), |
| ('b', pa.int64()), |
| ('c', pa.string()), |
| ('d', pa.null()), |
| ('e', pa.bool_())]) |
| assert table.schema == schema |
| assert table.to_pydict() == { |
| 'a': [1.0, None, 4.5], |
| 'b': [2, -5, None], |
| 'c': [None, "foo", "nan"], |
| 'd': [None, None, None], |
| 'e': [None, True, False], |
| } |
| |
| def test_empty_lists(self): |
| # ARROW-10955: Infer list(null) |
| rows = b'{"a": []}' |
| table = self.read_bytes(rows) |
| schema = pa.schema([('a', pa.list_(pa.null()))]) |
| assert table.schema == schema |
| assert table.to_pydict() == {'a': [[]]} |
| |
| def test_empty_rows(self): |
| rows = b'{}\n{}\n' |
| table = self.read_bytes(rows) |
| schema = pa.schema([]) |
| assert table.schema == schema |
| assert table.num_columns == 0 |
| assert table.num_rows == 2 |
| |
| def test_explicit_schema_decimal(self): |
| rows = (b'{"a": 1}\n' |
| b'{"a": 1.45}\n' |
| b'{"a": -23.456}\n' |
| b'{}\n') |
| expected = { |
| 'a': [Decimal("1"), Decimal("1.45"), Decimal("-23.456"), None], |
| } |
| |
| decimal_types = (pa.decimal32, pa.decimal64, pa.decimal128, pa.decimal256) |
| for type_factory in decimal_types: |
| schema = pa.schema([('a', type_factory(9, 4))]) |
| opts = ParseOptions(explicit_schema=schema) |
| table = self.read_bytes(rows, parse_options=opts) |
| assert table.schema == schema |
| assert table.to_pydict() == expected |
| |
| def test_explicit_schema_with_unexpected_behaviour(self): |
| # infer by default |
| rows = (b'{"foo": "bar", "num": 0}\n' |
| b'{"foo": "baz", "num": 1}\n') |
| schema = pa.schema([ |
| ('foo', pa.binary()) |
| ]) |
| |
| opts = ParseOptions(explicit_schema=schema) |
| table = self.read_bytes(rows, parse_options=opts) |
| assert table.schema == pa.schema([ |
| ('foo', pa.binary()), |
| ('num', pa.int64()) |
| ]) |
| assert table.to_pydict() == { |
| 'foo': [b'bar', b'baz'], |
| 'num': [0, 1], |
| } |
| |
| # ignore the unexpected fields |
| opts = ParseOptions(explicit_schema=schema, |
| unexpected_field_behavior="ignore") |
| table = self.read_bytes(rows, parse_options=opts) |
| assert table.schema == pa.schema([ |
| ('foo', pa.binary()), |
| ]) |
| assert table.to_pydict() == { |
| 'foo': [b'bar', b'baz'], |
| } |
| |
| # raise error |
| opts = ParseOptions(explicit_schema=schema, |
| unexpected_field_behavior="error") |
| with pytest.raises(pa.ArrowInvalid, |
| match="JSON parse error: unexpected field"): |
| self.read_bytes(rows, parse_options=opts) |
| |
| @pytest.mark.numpy |
| def test_small_random_json(self): |
| data, expected = make_random_json(num_cols=2, num_rows=10) |
| table = self.read_bytes(data) |
| assert table.schema == expected.schema |
| assert table.equals(expected) |
| assert table.to_pydict() == expected.to_pydict() |
| |
| @pytest.mark.numpy |
| def test_load_large_json(self): |
| data, expected = make_random_json(num_cols=2, num_rows=100100) |
        # Use a 10 MB block size.
| read_options = ReadOptions(block_size=1024*1024*10) |
| table = self.read_bytes(data, read_options=read_options) |
| assert table.num_rows == 100100 |
| assert expected.num_rows == 100100 |
| |
| @pytest.mark.numpy |
| def test_stress_block_sizes(self): |
| # Test a number of small block sizes to stress block stitching |
| data_base, expected = make_random_json(num_cols=2, num_rows=100) |
| read_options = ReadOptions() |
| parse_options = ParseOptions() |
| |
| for data in [data_base, data_base.rstrip(b'\r\n')]: |
| for newlines_in_values in [False, True]: |
| parse_options.newlines_in_values = newlines_in_values |
| for block_size in [22, 23, 37]: |
| read_options.block_size = block_size |
| table = self.read_bytes(data, read_options=read_options, |
| parse_options=parse_options) |
| assert table.schema == expected.schema |
| if not table.equals(expected): |
| # Better error output |
| assert table.to_pydict() == expected.to_pydict() |
| |
| |
| class BaseTestJSONRead(BaseTestJSON): |
| |
| def read_bytes(self, b, **kwargs): |
| return self.read_json(pa.py_buffer(b), **kwargs) |
| |
| def test_file_object(self): |
| data = b'{"a": 1, "b": 2}\n' |
| expected_data = {'a': [1], 'b': [2]} |
| bio = io.BytesIO(data) |
| table = self.read_json(bio) |
| assert table.to_pydict() == expected_data |
| # Text files not allowed |
| sio = io.StringIO(data.decode()) |
| with pytest.raises(TypeError): |
| self.read_json(sio) |
| |
| def test_reconcile_across_blocks(self): |
| # ARROW-12065: reconciling inferred types across blocks |
| first_row = b'{ }\n' |
| read_options = ReadOptions(block_size=len(first_row)) |
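        # Sizing blocks to the first row pushes every later row into later
        # blocks, so inferred types must be reconciled across chunks.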
| for next_rows, expected_pylist in [ |
| (b'{"a": 0}', [None, 0]), |
| (b'{"a": []}', [None, []]), |
| (b'{"a": []}\n{"a": [[1]]}', [None, [], [[1]]]), |
| (b'{"a": {}}', [None, {}]), |
| (b'{"a": {}}\n{"a": {"b": {"c": 1}}}', |
| [None, {"b": None}, {"b": {"c": 1}}]), |
| ]: |
| table = self.read_bytes(first_row + next_rows, |
| read_options=read_options) |
| expected = {"a": expected_pylist} |
| assert table.to_pydict() == expected |
| # Check that the issue was exercised |
| assert table.column("a").num_chunks > 1 |
| |
| |
| class BaseTestStreamingJSONRead(BaseTestJSON): |
| def open_json(self, json, *args, **kwargs): |
| """ |
| Reads the JSON file into memory using pyarrow's open_json |
| json The JSON bytes |
| args Positional arguments to be forwarded to pyarrow's open_json |
| kwargs Keyword arguments to be forwarded to pyarrow's open_json |
| """ |
| read_options = kwargs.setdefault('read_options', ReadOptions()) |
| read_options.use_threads = self.use_threads |
| return open_json(json, *args, **kwargs) |
| |
| def open_bytes(self, b, **kwargs): |
| return self.open_json(pa.py_buffer(b), **kwargs) |
| |
| def check_reader(self, reader, expected_schema, expected_data): |
| assert reader.schema == expected_schema |
| batches = list(reader) |
| assert len(batches) == len(expected_data) |
| for batch, expected_batch in zip(batches, expected_data): |
| batch.validate(full=True) |
| assert batch.schema == expected_schema |
| assert batch.to_pydict() == expected_batch |
| |
| def read_bytes(self, b, **kwargs): |
| return self.open_bytes(b, **kwargs).read_all() |
| |
| def test_file_object(self): |
| data = b'{"a": 1, "b": 2}\n' |
| expected_data = {'a': [1], 'b': [2]} |
| bio = io.BytesIO(data) |
| reader = self.open_json(bio) |
| expected_schema = pa.schema([('a', pa.int64()), |
| ('b', pa.int64())]) |
| self.check_reader(reader, expected_schema, [expected_data]) |
| |
| def test_bad_first_chunk(self): |
| bad_first_chunk = b'{"i": 0 }\n{"i": 1}' |
| read_options = ReadOptions() |
| read_options.block_size = 3 |
| with pytest.raises( |
| pa.ArrowInvalid, |
| match="straddling object straddles two block boundaries*" |
| ): |
| self.open_bytes(bad_first_chunk, read_options=read_options) |
| |
| def test_bad_middle_chunk(self): |
| bad_middle_chunk = b'{"i": 0}\n{"i": 1}\n{"i": 2}' |
| read_options = ReadOptions() |
| read_options.block_size = 10 |
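        # Only the first object fits cleanly; the second straddles two
        # block boundaries, which the streaming reader reports mid-stream.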
| expected_schema = pa.schema([('i', pa.int64())]) |
| |
| reader = self.open_bytes(bad_middle_chunk, read_options=read_options) |
| assert reader.schema == expected_schema |
| assert reader.read_next_batch().to_pydict() == { |
| 'i': [0] |
| } |
| with pytest.raises( |
| pa.ArrowInvalid, |
| match="straddling object straddles two block boundaries*" |
| ): |
| reader.read_next_batch() |
| |
| with pytest.raises(StopIteration): |
| reader.read_next_batch() |
| |
| def test_bad_first_parse(self): |
| bad_first_block = b'{"n": }\n{"n": 10000}' |
| read_options = ReadOptions() |
| read_options.block_size = 16 |
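        # The syntax error sits in the very first block, so it surfaces
        # as soon as the reader is opened, during initial inference.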
| with pytest.raises(pa.ArrowInvalid, |
| match="JSON parse error: Invalid value.*"): |
| self.open_bytes(bad_first_block, read_options=read_options) |
| |
| def test_bad_middle_parse_after_empty(self): |
| bad_first_block = b'{ }{"n": }\n{"n": 10000}' |
| read_options = ReadOptions() |
| read_options.block_size = 16 |
| with pytest.raises(pa.ArrowInvalid, |
| match="JSON parse error: Invalid value.*"): |
| self.open_bytes(bad_first_block, read_options=read_options) |
| |
| def test_bad_middle_parse(self): |
| bad_middle_chunk = b'{"n": 1000}\n{"n": 200 00}\n{"n": 3000}' |
| read_options = ReadOptions() |
| read_options.block_size = 10 |
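        # The malformed second object fails only after the first batch has
        # been delivered; the reader is exhausted afterwards.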
| expected_schema = pa.schema([('n', pa.int64())]) |
| |
| reader = self.open_bytes(bad_middle_chunk, read_options=read_options) |
| assert reader.schema == expected_schema |
| assert reader.read_next_batch().to_pydict() == { |
| 'n': [1000] |
| } |
| with pytest.raises( |
| pa.ArrowInvalid, |
| match="JSON parse error:\ |
| Missing a comma or '}' after an object member*" |
| ): |
| reader.read_next_batch() |
| |
| with pytest.raises(StopIteration): |
| reader.read_next_batch() |
| |
    def test_non_linewise_chunker_bad_first_block(self):
| bad_middle_chunk = b'{"n": 0}{1}\n{"n": 2}' |
| read_options = ReadOptions(block_size=10) |
| parse_options = ParseOptions(newlines_in_values=True) |
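        # newlines_in_values=True exercises the non-linewise chunker; the
        # malformed {1} object must still be reported as a parse error.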
| expected_schema = pa.schema([('n', pa.int64())]) |
| |
| reader = self.open_bytes( |
            bad_first_chunk,
| read_options=read_options, |
| parse_options=parse_options) |
| assert reader.schema == expected_schema |
| assert reader.read_next_batch().to_pydict() == { |
| 'n': [0] |
| } |
| with pytest.raises(pa.ArrowInvalid, |
| match="JSON parse error *"): |
| reader.read_next_batch() |
| |
| with pytest.raises(StopIteration): |
| reader.read_next_batch() |
| |
| def test_non_linewise_chunker_bad_middle_block(self): |
| bad_middle_chunk = b'{"n": 0}\n{"n": 1}\n{}"n":2}\n{"n": 3}' |
| read_options = ReadOptions(block_size=10) |
| parse_options = ParseOptions(newlines_in_values=True) |
| expected_schema = pa.schema([('n', pa.int64())]) |
| |
| reader = self.open_bytes( |
| bad_middle_chunk, |
| read_options=read_options, |
| parse_options=parse_options) |
| assert reader.schema == expected_schema |
| assert reader.read_next_batch().to_pydict() == { |
| 'n': [0] |
| } |
| assert reader.read_next_batch().to_pydict() == { |
| 'n': [1] |
| } |
| |
| with pytest.raises(pa.ArrowInvalid, |
| match="JSON parse error *"): |
| reader.read_next_batch() |
| |
| with pytest.raises(StopIteration): |
| reader.read_next_batch() |
| |
| def test_ignore_leading_empty_blocks(self): |
| leading_empty_chunk = b' \n{"b": true, "s": "foo"}' |
| explicit_schema = pa.schema([ |
| ('b', pa.bool_()), |
| ('s', pa.utf8()) |
| ]) |
| read_options = ReadOptions(block_size=24) |
| parse_options = ParseOptions(explicit_schema=explicit_schema) |
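        # Whitespace-only leading data must be skipped rather than
        # surfaced as an empty first batch.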
| expected_data = { |
| 'b': [True], 's': ["foo"] |
| } |
| |
| reader = self.open_bytes( |
| leading_empty_chunk, |
| read_options=read_options, |
| parse_options=parse_options) |
| self.check_reader(reader, explicit_schema, [expected_data]) |
| |
| def test_inference(self): |
| rows = b'{"a": 0, "b": "foo" }\n\ |
| {"a": 1, "c": true }\n{"a": 2, "d": 4.0}' |
| expected_schema = pa.schema([ |
| ('a', pa.int64()), |
| ('b', pa.utf8()) |
| ]) |
| expected_data = {'a': [0], 'b': ["foo"]} |
| |
| read_options = ReadOptions(block_size=32) |
| parse_options = ParseOptions(unexpected_field_behavior="infer") |
| reader = self.open_bytes( |
| rows, |
| read_options=read_options, |
| parse_options=parse_options) |
| assert reader.schema == expected_schema |
| assert reader.read_next_batch().to_pydict() == expected_data |
| with pytest.raises(pa.ArrowInvalid, |
| match="JSON parse error: unexpected field"): |
| reader.read_next_batch() |
| |
| expected_schema = pa.schema([ |
| ('a', pa.int64()), |
| ('b', pa.utf8()), |
| ('c', pa.bool_()), |
| ]) |
| expected_data = {'a': [0, 1], 'b': ["foo", None], 'c': [None, True]} |
| read_options = ReadOptions(block_size=64) |
| reader = self.open_bytes(rows, read_options=read_options, |
| parse_options=parse_options) |
| assert reader.schema == expected_schema |
| assert reader.read_next_batch().to_pydict() == expected_data |
| with pytest.raises(pa.ArrowInvalid, |
| match="JSON parse error: unexpected field"): |
| reader.read_next_batch() |
| |
| expected_schema = pa.schema([ |
| ('a', pa.int64()), |
| ('b', pa.utf8()), |
| ('c', pa.bool_()), |
| ('d', pa.float64()), |
| ]) |
| expected_data = {'a': [0, 1, 2], 'b': ["foo", None, None], |
| 'c': [None, True, None], 'd': [None, None, 4.0]} |
| read_options = ReadOptions(block_size=96) |
| reader = self.open_bytes(rows, read_options=read_options, |
| parse_options=parse_options) |
| assert reader.schema == expected_schema |
| assert reader.read_next_batch().to_pydict() == expected_data |
| |
| |
| class TestSerialJSONRead(BaseTestJSONRead, unittest.TestCase): |
| |
| def read_json(self, *args, **kwargs): |
| read_options = kwargs.setdefault('read_options', ReadOptions()) |
| read_options.use_threads = False |
| table = read_json(*args, **kwargs) |
| table.validate(full=True) |
| return table |
| |
| |
| class TestParallelJSONRead(BaseTestJSONRead, unittest.TestCase): |
| |
| def read_json(self, *args, **kwargs): |
| read_options = kwargs.setdefault('read_options', ReadOptions()) |
| read_options.use_threads = True |
| table = read_json(*args, **kwargs) |
| table.validate(full=True) |
| return table |
| |
| |
| class TestSerialStreamingJSONRead(BaseTestStreamingJSONRead, unittest.TestCase): |
| |
| use_threads = False |
| |
| |
| @pytest.mark.threading |
| class TestThreadedStreamingJSONRead(BaseTestStreamingJSONRead, unittest.TestCase): |
| |
| use_threads = True |