GH-34216: [Python] Support for reading JSON Datasets With Python (#34586)
This PR adds support for reading JSON datasets with Python. As mentioned in [#34216](https://github.com/apache/arrow/issues/34216), only reading is supported.
Please compare my implementation of _json.pyx and _json.pxd with _csv.pyx and _csv.pxd: _csv.pxd holds its C++ options classes through pointers, whereas my implementation stores them directly (by value).
**What changes are included in this PR?**
C++: add an include of arrow/dataset/file_json.h (guarded by ARROW_JSON) to dataset/api.h
Python: expose the C++ JSON dataset classes and support reading JSON datasets (see the usage sketch below)
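A minimal usage sketch, mirroring what the new tests exercise (the file name is hypothetical, one JSON object per line):

```python
import pyarrow.dataset as ds

# "records.json" is a hypothetical newline-delimited JSON file.
dataset = ds.dataset("records.json", format="json")  # or format=ds.JsonFileFormat()
table = dataset.to_table()
```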
**Are these changes tested?**
Yes
Six test cases added in tests/test_dataset.py (a sketch of the options they exercise follows)
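For reference, a sketch of the option plumbing those tests cover (values are illustrative, the path is hypothetical):

```python
import pyarrow as pa
import pyarrow.dataset as ds
import pyarrow.json

# pyarrow.json parse/read options are forwarded to the dataset-level format.
fmt = ds.JsonFileFormat(
    parse_options=pa.json.ParseOptions(newlines_in_values=True,
                                       unexpected_field_behavior="ignore"),
    read_options=pa.json.ReadOptions(use_threads=False, block_size=2**20))
dataset = ds.dataset("records.json", format=fmt)  # hypothetical path

# Equivalently, scan options can be bundled as default_fragment_scan_options.
fmt = ds.JsonFileFormat(
    default_fragment_scan_options=ds.JsonFragmentScanOptions(
        read_options=pa.json.ReadOptions(block_size=2**16)))
```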
* Closes: #34216
Lead-authored-by: Junming Chen <junming.chen.r@outlook.com>
Co-authored-by: Weston Pace <weston.pace@gmail.com>
Signed-off-by: Weston Pace <weston.pace@gmail.com>
diff --git a/cpp/src/arrow/dataset/api.h b/cpp/src/arrow/dataset/api.h
index 6554dfc..c2ebd9d 100644
--- a/cpp/src/arrow/dataset/api.h
+++ b/cpp/src/arrow/dataset/api.h
@@ -26,6 +26,9 @@
#ifdef ARROW_CSV
#include "arrow/dataset/file_csv.h"
#endif
+#ifdef ARROW_JSON
+#include "arrow/dataset/file_json.h"
+#endif
#include "arrow/dataset/file_ipc.h"
#ifdef ARROW_ORC
#include "arrow/dataset/file_orc.h"
diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx
index ba0a286..d2b5554 100644
--- a/python/pyarrow/_dataset.pyx
+++ b/python/pyarrow/_dataset.pyx
@@ -36,6 +36,8 @@
from pyarrow._csv cimport (
ConvertOptions, ParseOptions, ReadOptions, WriteOptions)
from pyarrow.util import _is_iterable, _is_path_like, _stringify_path
+from pyarrow._json cimport ParseOptions as JsonParseOptions
+from pyarrow._json cimport ReadOptions as JsonReadOptions
_DEFAULT_BATCH_SIZE = 2**17
@@ -983,7 +985,7 @@
The top-level schema of the Dataset.
format : FileFormat
File format of the fragments, currently only ParquetFileFormat,
- IpcFileFormat, and CsvFileFormat are supported.
+ IpcFileFormat, CsvFileFormat, and JsonFileFormat are supported.
filesystem : FileSystem
FileSystem of the fragments.
root_partition : Expression, optional
@@ -1078,7 +1080,7 @@
The top-level schema of the DataDataset.
format : FileFormat
File format to create fragments from, currently only
- ParquetFileFormat, IpcFileFormat, and CsvFileFormat are supported.
+ ParquetFileFormat, IpcFileFormat, CsvFileFormat, and JsonFileFormat are supported.
filesystem : FileSystem
The filesystem which files are from.
partitions : list[Expression], optional
@@ -1179,6 +1181,7 @@
classes = {
'ipc': IpcFileFormat,
'csv': CsvFileFormat,
+ 'json': JsonFileFormat,
'parquet': _get_parquet_symbol('ParquetFileFormat'),
'orc': _get_orc_fileformat(),
}
@@ -1315,10 +1318,11 @@
type_name = frombytes(sp.get().type_name())
classes = {
- # IpcFileFormat, CsvFileFormat and OrcFileFormat do not have
+ # IpcFileFormat, CsvFileFormat, JsonFileFormat and OrcFileFormat do not have
# corresponding subclasses of FileFragment
'ipc': FileFragment,
'csv': FileFragment,
+ 'json': FileFragment,
'orc': FileFragment,
'parquet': _get_parquet_symbol('ParquetFileFragment'),
}
@@ -1928,6 +1932,7 @@
classes = {
'csv': CsvFragmentScanOptions,
+ 'json': JsonFragmentScanOptions,
'parquet': _get_parquet_symbol('ParquetFragmentScanOptions'),
}
@@ -2184,6 +2189,126 @@
self.csv_options = <CCsvFileWriteOptions*> sp.get()
+cdef class JsonFileFormat(FileFormat):
+ """
+ FileFormat for JSON files.
+
+ Parameters
+ ----------
+ default_fragment_scan_options : JsonFragmentScanOptions
+ Default options for fragments scan.
+ parse_options : pyarrow.json.ParseOptions
+ Options regarding json parsing.
+ read_options : pyarrow.json.ReadOptions
+ General read options.
+ """
+ cdef:
+ CJsonFileFormat* json_format
+
+ # Avoid mistakenly creating attributes
+ __slots__ = ()
+
+ def __init__(self, default_fragment_scan_options=None,
+ JsonParseOptions parse_options=None,
+ JsonReadOptions read_options=None):
+ self.init(shared_ptr[CFileFormat](new CJsonFileFormat()))
+ if parse_options is not None or read_options is not None:
+ if default_fragment_scan_options is not None:
+ raise ValueError('If `default_fragment_scan_options` is '
+ 'given, cannot specify read_options')
+ self.default_fragment_scan_options = JsonFragmentScanOptions(
+ parse_options=parse_options,
+ read_options=read_options)
+ elif isinstance(default_fragment_scan_options, dict):
+ self.default_fragment_scan_options = JsonFragmentScanOptions(
+ **default_fragment_scan_options)
+ elif isinstance(default_fragment_scan_options, JsonFragmentScanOptions):
+ self.default_fragment_scan_options = default_fragment_scan_options
+ elif default_fragment_scan_options is not None:
+ raise TypeError('`default_fragment_scan_options` must be either '
+ 'a dictionary or an instance of '
+ 'JsonFragmentScanOptions')
+
+ cdef void init(self, const shared_ptr[CFileFormat]& sp):
+ FileFormat.init(self, sp)
+ self.json_format = <CJsonFileFormat*> sp.get()
+
+ cdef _set_default_fragment_scan_options(self, FragmentScanOptions options):
+ if options.type_name == 'json':
+ self.json_format.default_fragment_scan_options = options.wrapped
+ self.default_fragment_scan_options.read_options = options.read_options
+ self.default_fragment_scan_options.parse_options = options.parse_options
+ else:
+ super()._set_default_fragment_scan_options(options)
+
+ def equals(self, JsonFileFormat other):
+ return (other and
+ self.default_fragment_scan_options ==
+ other.default_fragment_scan_options)
+
+ def __reduce__(self):
+ return JsonFileFormat, (self.default_fragment_scan_options,)
+
+ def __repr__(self):
+ return "<JsonFileFormat>"
+
+
+cdef class JsonFragmentScanOptions(FragmentScanOptions):
+ """
+ Scan-specific options for JSON fragments.
+
+ Parameters
+ ----------
+ parse_options : pyarrow.json.ParseOptions
+ Options regarding JSON parsing.
+ read_options : pyarrow.json.ReadOptions
+ General read options.
+ """
+ cdef:
+ CJsonFragmentScanOptions* json_options
+
+ # Avoid mistakenly creating attributes
+ __slots__ = ()
+
+ def __init__(self, JsonParseOptions parse_options=None,
+ JsonReadOptions read_options=None):
+ self.init(shared_ptr[CFragmentScanOptions](
+ new CJsonFragmentScanOptions()))
+ if parse_options is not None:
+ self.parse_options = parse_options
+ if read_options is not None:
+ self.read_options = read_options
+
+ cdef void init(self, const shared_ptr[CFragmentScanOptions]& sp):
+ FragmentScanOptions.init(self, sp)
+ self.json_options = <CJsonFragmentScanOptions*> sp.get()
+
+ @property
+ def parse_options(self):
+ return JsonParseOptions.wrap(self.json_options.parse_options)
+
+ @parse_options.setter
+ def parse_options(self, JsonParseOptions parse_options not None):
+ self.json_options.parse_options = parse_options.options
+
+ @property
+ def read_options(self):
+ return JsonReadOptions.wrap(self.json_options.read_options)
+
+ @read_options.setter
+ def read_options(self, JsonReadOptions read_options not None):
+ self.json_options.read_options = read_options.options
+
+ def equals(self, JsonFragmentScanOptions other):
+ return (
+ other and
+ self.read_options.equals(other.read_options) and
+ self.parse_options.equals(other.parse_options))
+
+ def __reduce__(self):
+ return JsonFragmentScanOptions, (self.parse_options, self.read_options)
+
+
cdef class Partitioning(_Weakrefable):
def __init__(self):
diff --git a/python/pyarrow/_json.pxd b/python/pyarrow/_json.pxd
new file mode 100644
index 0000000..42a0a67
--- /dev/null
+++ b/python/pyarrow/_json.pxd
@@ -0,0 +1,36 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# cython: language_level = 3
+
+from pyarrow.includes.libarrow cimport *
+from pyarrow.lib cimport _Weakrefable
+
+
+cdef class ParseOptions(_Weakrefable):
+ cdef:
+ CJSONParseOptions options
+
+ @staticmethod
+ cdef ParseOptions wrap(CJSONParseOptions options)
+
+cdef class ReadOptions(_Weakrefable):
+ cdef:
+ CJSONReadOptions options
+
+ @staticmethod
+ cdef ReadOptions wrap(CJSONReadOptions options)
diff --git a/python/pyarrow/_json.pyx b/python/pyarrow/_json.pyx
index 4c6d964..70cde6e 100644
--- a/python/pyarrow/_json.pyx
+++ b/python/pyarrow/_json.pyx
@@ -40,8 +40,6 @@
This will determine multi-threading granularity as well as
the size of individual chunks in the Table.
"""
- cdef:
- CJSONReadOptions options
# Avoid mistakingly creating attributes
__slots__ = ()
@@ -84,6 +82,24 @@
self.block_size
)
+ def equals(self, ReadOptions other):
+ return (
+ self.use_threads == other.use_threads and
+ self.block_size == other.block_size
+ )
+
+ def __eq__(self, other):
+ try:
+ return self.equals(other)
+ except TypeError:
+ return False
+
+ @staticmethod
+ cdef ReadOptions wrap(CJSONReadOptions options):
+ out = ReadOptions()
+ out.options = options # shallow copy
+ return out
+
cdef class ParseOptions(_Weakrefable):
"""
@@ -107,9 +123,6 @@
the output
"""
- cdef:
- CJSONParseOptions options
-
__slots__ = ()
def __init__(self, explicit_schema=None, newlines_in_values=None,
@@ -198,6 +211,25 @@
self.options.unexpected_field_behavior = v
+ def equals(self, ParseOptions other):
+ return (
+ self.explicit_schema == other.explicit_schema and
+ self.newlines_in_values == other.newlines_in_values and
+ self.unexpected_field_behavior == other.unexpected_field_behavior
+ )
+
+ def __eq__(self, other):
+ try:
+ return self.equals(other)
+ except TypeError:
+ return False
+
+ @staticmethod
+ cdef ParseOptions wrap(CJSONParseOptions options):
+ out = ParseOptions()
+ out.options = options # shallow copy
+ return out
+
cdef _get_reader(input_file, shared_ptr[CInputStream]* out):
use_memory_map = False
diff --git a/python/pyarrow/dataset.py b/python/pyarrow/dataset.py
index c4da968..8bec208 100644
--- a/python/pyarrow/dataset.py
+++ b/python/pyarrow/dataset.py
@@ -23,6 +23,8 @@
from pyarrow._dataset import ( # noqa
CsvFileFormat,
CsvFragmentScanOptions,
+ JsonFileFormat,
+ JsonFragmentScanOptions,
Dataset,
DatasetFactory,
DirectoryPartitioning,
@@ -297,6 +299,8 @@
if not _orc_available:
raise ValueError(_orc_msg)
return OrcFileFormat()
+ elif obj == "json":
+ return JsonFileFormat()
else:
raise ValueError("format '{}' is not supported".format(obj))
@@ -598,7 +602,7 @@
Optionally provide the Schema for the Dataset, in which case it will
not be inferred from the source.
format : FileFormat or str
- Currently "parquet", "ipc"/"arrow"/"feather", "csv", and "orc" are
+ Currently "parquet", "ipc"/"arrow"/"feather", "csv", "json", and "orc" are
supported. For Feather, only version 2 files are supported.
filesystem : FileSystem or URI string, default None
If a single path is given as source and filesystem is None, then the
diff --git a/python/pyarrow/includes/libarrow_dataset.pxd b/python/pyarrow/includes/libarrow_dataset.pxd
index b554633..201fb78 100644
--- a/python/pyarrow/includes/libarrow_dataset.pxd
+++ b/python/pyarrow/includes/libarrow_dataset.pxd
@@ -277,6 +277,13 @@
CCSVReadOptions read_options
function[StreamWrapFunc] stream_transform_func
+ cdef cppclass CJsonFileFormat "arrow::dataset::JsonFileFormat"(CFileFormat):
+ pass
+
+ cdef cppclass CJsonFragmentScanOptions "arrow::dataset::JsonFragmentScanOptions"(CFragmentScanOptions):
+ CJSONParseOptions parse_options
+ CJSONReadOptions read_options
+
cdef cppclass CPartitioning "arrow::dataset::Partitioning":
c_string type_name() const
CResult[CExpression] Parse(const c_string & path) const
diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py
index 20a1d51..66562b7 100644
--- a/python/pyarrow/tests/test_dataset.py
+++ b/python/pyarrow/tests/test_dataset.py
@@ -35,6 +35,7 @@
import pyarrow as pa
import pyarrow.compute as pc
import pyarrow.csv
+import pyarrow.json
import pyarrow.feather
import pyarrow.fs as fs
from pyarrow.tests.util import (change_cwd, _filesystem_uri,
@@ -805,6 +806,12 @@
skip_rows=3, column_names=['foo'])),
ds.CsvFileFormat(read_options=pa.csv.ReadOptions(
skip_rows=3, block_size=2**20)),
+ ds.JsonFileFormat(),
+ ds.JsonFileFormat(
+ parse_options=pa.json.ParseOptions(newlines_in_values=True,
+ unexpected_field_behavior="ignore")),
+ ds.JsonFileFormat(read_options=pa.json.ReadOptions(
+ use_threads=False, block_size=14)),
]
try:
formats.append(ds.OrcFileFormat())
@@ -835,6 +842,12 @@
convert_options=pa.csv.ConvertOptions(strings_can_be_null=True)),
ds.CsvFragmentScanOptions(
read_options=pa.csv.ReadOptions(block_size=2**16)),
+ ds.JsonFragmentScanOptions(),
+ ds.JsonFragmentScanOptions(
+ pa.json.ParseOptions(newlines_in_values=False,
+ unexpected_field_behavior="error")),
+ ds.JsonFragmentScanOptions(
+ read_options=pa.json.ReadOptions(use_threads=True, block_size=512)),
]
if pq is not None:
@@ -972,6 +985,28 @@
assert dataset_reader.to_table(pickled).equals(fragment.to_table())
+def test_make_json_fragment_from_buffer(dataset_reader):
+ content = '{"alpha" : "a", "num": 12, "animal" : "dog"}\n' + \
+ '{"alpha" : "b", "num": 11, "animal" : "cat"}\n' + \
+ '{"alpha" : "c", "num": 10, "animal" : "rabbit"}\n'
+ buffer = pa.py_buffer(content.encode('utf-8'))
+
+ json_format = ds.JsonFileFormat()
+ fragment = json_format.make_fragment(buffer)
+
+ # When buffer, fragment open returns a BufferReader, not NativeFile
+ assert isinstance(fragment.open(), pa.BufferReader)
+
+ expected = pa.table([['a', 'b', 'c'],
+ [12, 11, 10],
+ ['dog', 'cat', 'rabbit']],
+ names=['alpha', 'num', 'animal'])
+ assert dataset_reader.to_table(fragment).equals(expected)
+
+ pickled = pickle.loads(pickle.dumps(fragment))
+ assert dataset_reader.to_table(pickled).equals(fragment.to_table())
+
+
@pytest.mark.parquet
def test_make_parquet_fragment_from_buffer(dataset_reader):
arrays = [
@@ -3174,6 +3209,69 @@
pa.table({'col0': pa.array(['foo', 'spam', 'MYNULL'])}))
+@pytest.mark.pandas
+def test_json_format(tempdir, dataset_reader):
+ table = pa.table({'a': pa.array([1, 2, 3], type="int64"),
+ 'b': pa.array([.1, .2, .3], type="float64")})
+
+ path = str(tempdir / 'test.json')
+ out = table.to_pandas().to_json(orient='records')[1:-1].replace('},{', '}\n{')
+ with open(path, 'w') as f:
+ f.write(out)
+
+ dataset = ds.dataset(path, format=ds.JsonFileFormat())
+ result = dataset_reader.to_table(dataset)
+ assert result.equals(table)
+
+ assert_dataset_fragment_convenience_methods(dataset)
+
+ dataset = ds.dataset(path, format='json')
+ result = dataset_reader.to_table(dataset)
+ assert result.equals(table)
+
+
+def test_json_format_options(tempdir, dataset_reader):
+ table = pa.table({'a': pa.array([1, 2, 3], type="int64"),
+ 'b': pa.array([.1, .2, .3], type="float64")})
+
+ path = str(tempdir / 'test.json')
+ out = table.to_pandas().to_json(orient='records')[1:-1].replace('},{', '}\n{')
+ with open(path, 'w') as f:
+ f.write(out)
+
+ with pytest.raises(ValueError,
+ match="try to increase block size"):
+ dataset = ds.dataset(path, format=ds.JsonFileFormat(
+ read_options=pa.json.ReadOptions(block_size=4)))
+
+ dataset = ds.dataset(path, format=ds.JsonFileFormat(
+ read_options=pa.json.ReadOptions(block_size=64)))
+ result = dataset_reader.to_table(dataset)
+ assert result.equals(table)
+
+
+def test_json_fragment_options(tempdir, dataset_reader):
+ table = pa.table({'a': pa.array([1, 2, 3], type="int64"),
+ 'b': pa.array([.1, .2, .3], type="float64")})
+
+ path = str(tempdir / 'test.json')
+ out = table.to_pandas().to_json(orient='records')[1:-1].replace('},{', '}\n{')
+ with open(path, 'w') as f:
+ f.write(out)
+
+ with pytest.raises(ValueError,
+ match="try to increase block size"):
+ options = ds.JsonFragmentScanOptions(
+ read_options=pa.json.ReadOptions(block_size=4))
+ dataset = ds.dataset(path, format=ds.JsonFileFormat(options))
+
+ options = ds.JsonFragmentScanOptions(
+ read_options=pa.json.ReadOptions(block_size=64))
+ dataset = ds.dataset(path, format=ds.JsonFileFormat(options))
+ result = dataset_reader.to_table(dataset)
+ assert result.equals(table)
+
+
def test_encoding(tempdir, dataset_reader):
path = str(tempdir / 'test.csv')