.. Licensed to the Apache Software Foundation (ASF) under one
.. or more contributor license agreements. See the NOTICE file
.. distributed with this work for additional information
.. regarding copyright ownership. The ASF licenses this file
.. to you under the Apache License, Version 2.0 (the
.. "License"); you may not use this file except in compliance
.. with the License. You may obtain a copy of the License at
.. http://www.apache.org/licenses/LICENSE-2.0
.. Unless required by applicable law or agreed to in writing,
.. software distributed under the License is distributed on an
.. "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
.. KIND, either express or implied. See the License for the
.. specific language governing permissions and limitations
.. under the License.
========================
Reading and Writing Data
========================
Recipes related to reading and writing data from disk using
Apache Arrow.
.. contents::
.. testsetup::
import pyarrow as pa
Write a Parquet file
====================
Given an array with 100 numbers, from 0 to 99
.. testcode::
import numpy as np
import pyarrow as pa
arr = pa.array(np.arange(100))
print(f"{arr[0]} .. {arr[-1]}")
.. testoutput::
0 .. 99
Parquet stores data as named columns, so the array must first be
wrapped in a :class:`pyarrow.Table`. This gives a single-column
table that can then be written to a Parquet file.
.. testcode::
table = pa.Table.from_arrays([arr], names=["col1"])
Once we have a table, it can be written to a Parquet file
using the functions provided by the ``pyarrow.parquet`` module:
.. testcode::
import pyarrow.parquet as pq
pq.write_table(table, "example.parquet", compression=None)
Reading a Parquet file
======================
Given a Parquet file, it can be read back into a :class:`pyarrow.Table`
using the :func:`pyarrow.parquet.read_table` function:
.. testcode::
import pyarrow.parquet as pq
table = pq.read_table("example.parquet")
The resulting table will contain the same columns that existed in
the Parquet file, each as a :class:`pyarrow.ChunkedArray`:
.. testcode::
print(table)
.. testoutput::
pyarrow.Table
col1: int64
----
col1: [[0,1,2,3,4,...,95,96,97,98,99]]
Reading a subset of Parquet data
================================
When reading a Parquet file with :func:`pyarrow.parquet.read_table`,
the ``columns`` and ``filters`` arguments restrict which columns and
rows are read into memory:
.. testcode::
import pyarrow.parquet as pq
table = pq.read_table("example.parquet",
columns=["col1"],
filters=[
("col1", ">", 5),
("col1", "<", 10),
])
The resulting table will contain only the projected columns
and filtered rows. Refer to :func:`pyarrow.parquet.read_table`
documentation for details about the syntax for filters.
.. testcode::
print(table)
.. testoutput::
pyarrow.Table
col1: int64
----
col1: [[6,7,8,9]]
Saving Arrow Arrays to disk
===========================
Apart from using Arrow to read and save common file formats like Parquet,
it is possible to dump data in the raw Arrow format, which allows
direct memory mapping of data from disk. This format is called
the Arrow IPC format.
Given an array with 100 numbers, from 0 to 99
.. testcode::
import numpy as np
import pyarrow as pa
arr = pa.array(np.arange(100))
print(f"{arr[0]} .. {arr[-1]}")
.. testoutput::
0 .. 99
We can save the array by making a :class:`pyarrow.RecordBatch` out
of it and writing the record batch to disk.
.. testcode::

    schema = pa.schema([
        pa.field('nums', arr.type)
    ])

    with pa.OSFile('arraydata.arrow', 'wb') as sink:
        with pa.ipc.new_file(sink, schema=schema) as writer:
            batch = pa.record_batch([arr], schema=schema)
            writer.write(batch)

If we were to save multiple arrays into the same file,
we would just have to adapt the ``schema`` accordingly and add
them all to the ``record_batch`` call.
Memory Mapping Arrow Arrays from disk
=====================================
Arrow arrays that have been written to disk in the Arrow IPC
format can be memory mapped back directly from the disk.
.. testcode::

    with pa.memory_map('arraydata.arrow', 'r') as source:
        loaded_arrays = pa.ipc.open_file(source).read_all()

.. testcode::
arr = loaded_arrays[0]
print(f"{arr[0]} .. {arr[-1]}")
.. testoutput::
0 .. 99
Writing CSV files
=================
It is possible to write an Arrow :class:`pyarrow.Table` to
a CSV file using the :func:`pyarrow.csv.write_csv` function
.. testcode::
arr = pa.array(range(100))
table = pa.Table.from_arrays([arr], names=["col1"])
import pyarrow.csv
pa.csv.write_csv(table, "table.csv",
write_options=pa.csv.WriteOptions(include_header=True))
Writing CSV files incrementally
===============================
If you need to write data to a CSV file incrementally as you
generate or retrieve it, and you don't want to keep the whole
table in memory to write it at once, you can use
:class:`pyarrow.csv.CSVWriter`:
.. testcode::

    schema = pa.schema([("col1", pa.int32())])
    with pa.csv.CSVWriter("table.csv", schema=schema) as writer:
        for chunk in range(10):
            datachunk = range(chunk*10, (chunk+1)*10)
            table = pa.Table.from_arrays([pa.array(datachunk)], schema=schema)
            writer.write(table)

It's equally possible to write :class:`pyarrow.RecordBatch` objects
by passing them to ``writer.write`` as you would for tables.
Reading CSV files
=================
Arrow can read :class:`pyarrow.Table` entities from CSV using an
optimized codepath that can leverage multiple threads.
.. testcode::
import pyarrow.csv
table = pa.csv.read_csv("table.csv")
Arrow will do its best to infer data types. To control the
conversion, pass a :class:`pyarrow.csv.ConvertOptions` instance to
:func:`pyarrow.csv.read_csv` through its ``convert_options`` argument.
.. testcode::
print(table)
.. testoutput::
pyarrow.Table
col1: int64
----
col1: [[0,1,2,3,4,...,95,96,97,98,99]]
Writing Partitioned Datasets
============================
When your dataset is big, it usually makes sense to split it into
multiple separate files. You can do this manually or use
:func:`pyarrow.dataset.write_dataset` and let Arrow split the data
into chunks for you.

The ``partitioning`` argument tells :func:`pyarrow.dataset.write_dataset`
which columns the data should be split by.

For example, given 100 birthdays between 2000 and 2009:
.. testcode::

    import numpy.random

    # The upper bound of randint is exclusive, so use 13 to include December.
    data = pa.table({"day": numpy.random.randint(1, 31, size=100),
                     "month": numpy.random.randint(1, 13, size=100),
                     "year": [2000 + x // 10 for x in range(100)]})

Then we could partition the data by the year column so that it
gets saved in 10 different files:
.. testcode::
import pyarrow as pa
import pyarrow.dataset as ds
ds.write_dataset(data, "./partitioned", format="parquet",
partitioning=ds.partitioning(pa.schema([("year", pa.int16())])))
Arrow will partition datasets in subdirectories by default, which will
result in 10 different directories, each named after a value of the
partitioning column and each containing a file with the subset of the
data for that partition:
.. testcode::

    from pyarrow import fs

    localfs = fs.LocalFileSystem()
    partitioned_dir_content = localfs.get_file_info(fs.FileSelector("./partitioned", recursive=True))
    files = sorted((f.path for f in partitioned_dir_content if f.type == fs.FileType.File))

    for file in files:
        print(file)

.. testoutput::
./partitioned/2000/part-0.parquet
./partitioned/2001/part-0.parquet
./partitioned/2002/part-0.parquet
./partitioned/2003/part-0.parquet
./partitioned/2004/part-0.parquet
./partitioned/2005/part-0.parquet
./partitioned/2006/part-0.parquet
./partitioned/2007/part-0.parquet
./partitioned/2008/part-0.parquet
./partitioned/2009/part-0.parquet
Reading Partitioned data
========================
In some cases, your dataset might be composed of multiple separate
files, each containing a piece of the data.
.. testsetup::
import pathlib
import pyarrow.parquet as pq
examples = pathlib.Path("examples")
examples.mkdir(exist_ok=True)
pq.write_table(pa.table({"col1": range(10)}),
examples / "dataset1.parquet", compression=None)
pq.write_table(pa.table({"col1": range(10, 20)}),
examples / "dataset2.parquet", compression=None)
pq.write_table(pa.table({"col1": range(20, 30)}),
examples / "dataset3.parquet", compression=None)
In this case the :func:`pyarrow.dataset.dataset` function provides
an interface to discover and read all those files as a single big dataset.
For example if we have a structure like:
.. code-block::

    examples/
    ├── dataset1.parquet
    ├── dataset2.parquet
    └── dataset3.parquet

Then, pointing the :func:`pyarrow.dataset.dataset` function to the ``examples`` directory
will discover those parquet files and will expose them all as a single
:class:`pyarrow.dataset.Dataset`:
.. testcode::
import pyarrow.dataset as ds
dataset = ds.dataset("./examples", format="parquet")
print(dataset.files)
.. testoutput::
['./examples/dataset1.parquet', './examples/dataset2.parquet', './examples/dataset3.parquet']
The whole dataset can be viewed as a single big table using
:meth:`pyarrow.dataset.Dataset.to_table`. While each parquet file
contains only 10 rows, converting the dataset to a table will
expose them as a single Table.
.. testcode::
table = dataset.to_table()
print(table)
.. testoutput::
pyarrow.Table
col1: int64
----
col1: [[0,1,2,3,4,5,6,7,8,9],[10,11,12,13,14,15,16,17,18,19],[20,21,22,23,24,25,26,27,28,29]]
Notice that converting to a table will force all data to be loaded
in memory. For big datasets this is usually not what you want.

For this reason, it might be better to rely on the
:meth:`pyarrow.dataset.Dataset.to_batches` method, which iteratively
loads the dataset one chunk at a time, returning a
:class:`pyarrow.RecordBatch` for each chunk.
.. testcode::

    for record_batch in dataset.to_batches():
        col1 = record_batch.column("col1")
        # Print the column name directly instead of relying on a private attribute.
        print(f"col1 = {col1[0]} .. {col1[-1]}")

.. testoutput::
col1 = 0 .. 9
col1 = 10 .. 19
col1 = 20 .. 29
Reading Partitioned Data from S3
================================
The :class:`pyarrow.dataset.Dataset` is also able to abstract
partitioned data coming from remote sources like S3 or HDFS.
.. testcode::

    from pyarrow import fs

    # List content of s3://ursa-labs-taxi-data/2011
    s3 = fs.SubTreeFileSystem(
        "ursa-labs-taxi-data",
        fs.S3FileSystem(region="us-east-2", anonymous=True)
    )
    for entry in s3.get_file_info(fs.FileSelector("2011", recursive=True)):
        if entry.type == fs.FileType.File:
            print(entry.path)

.. testoutput::
2011/01/data.parquet
2011/02/data.parquet
2011/03/data.parquet
2011/04/data.parquet
2011/05/data.parquet
2011/06/data.parquet
2011/07/data.parquet
2011/08/data.parquet
2011/09/data.parquet
2011/10/data.parquet
2011/11/data.parquet
2011/12/data.parquet
The data in the bucket can be loaded as a single big dataset partitioned
by ``month`` using
.. testcode::

    dataset = ds.dataset("s3://ursa-labs-taxi-data/2011",
                         partitioning=["month"])
    for f in dataset.files[:10]:
        print(f)
    print("...")

.. testoutput::
ursa-labs-taxi-data/2011/01/data.parquet
ursa-labs-taxi-data/2011/02/data.parquet
ursa-labs-taxi-data/2011/03/data.parquet
ursa-labs-taxi-data/2011/04/data.parquet
ursa-labs-taxi-data/2011/05/data.parquet
ursa-labs-taxi-data/2011/06/data.parquet
ursa-labs-taxi-data/2011/07/data.parquet
ursa-labs-taxi-data/2011/08/data.parquet
ursa-labs-taxi-data/2011/09/data.parquet
ursa-labs-taxi-data/2011/10/data.parquet
...
The dataset can then be used with :meth:`pyarrow.dataset.Dataset.to_table`
or :meth:`pyarrow.dataset.Dataset.to_batches` like you would for a local one.
.. note::

    It is also possible to load partitioned data stored in the Arrow IPC
    format or in the Feather format.

.. warning::

    If the above code throws an error, the most likely reason is that your
    AWS credentials are not set. Follow these instructions to get an
    ``AWS Access Key Id`` and ``AWS Secret Access Key``:
    `AWS Credentials <https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_access-keys.html>`_.

    The credentials are normally stored in the ``~/.aws/credentials`` file (on Mac or Linux)
    or in the ``C:\Users\<USERNAME>\.aws\credentials`` file (on Windows).
    You will need to either create or update this file in the appropriate location.

    The contents of the file should look like this:

    .. code-block:: ini

        [default]
        aws_access_key_id=<YOUR_AWS_ACCESS_KEY_ID>
        aws_secret_access_key=<YOUR_AWS_SECRET_ACCESS_KEY>

Write a Feather file
====================
.. testsetup::
import numpy as np
import pyarrow as pa
arr = pa.array(np.arange(100))
Given an array with 100 numbers, from 0 to 99
.. testcode::
import numpy as np
import pyarrow as pa
arr = pa.array(np.arange(100))
print(f"{arr[0]} .. {arr[-1]}")
.. testoutput::
0 .. 99
Feather stores data as named columns, so the array must first be
wrapped in a :class:`pyarrow.Table`. This gives a single-column
table that can then be written to a Feather file.
.. testcode::
table = pa.Table.from_arrays([arr], names=["col1"])
Once we have a table, it can be written to a Feather file
using the functions provided by the ``pyarrow.feather`` module:
.. testcode::
import pyarrow.feather as ft
ft.write_feather(table, 'example.feather')
Reading a Feather file
======================
Given a Feather file, it can be read back into a :class:`pyarrow.Table`
using the :func:`pyarrow.feather.read_table` function:
.. testcode::
import pyarrow.feather as ft
table = ft.read_table("example.feather")
The resulting table will contain the same columns that existed in
the Feather file, each as a :class:`pyarrow.ChunkedArray`:
.. testcode::
print(table)
.. testoutput::
pyarrow.Table
col1: int64
----
col1: [[0,1,2,3,4,...,95,96,97,98,99]]
Reading Line Delimited JSON
===========================
Arrow has built-in support for line-delimited JSON.
Each line represents a row of data as a JSON object.
Given some data in a file where each line is a JSON object
containing a row of data:
.. testcode::

    import tempfile

    with tempfile.NamedTemporaryFile(delete=False, mode="w+") as f:
        f.write('{"a": 1, "b": 2.0, "c": 1}\n')
        f.write('{"a": 3, "b": 3.0, "c": 2}\n')
        f.write('{"a": 5, "b": 4.0, "c": 3}\n')
        f.write('{"a": 7, "b": 5.0, "c": 4}\n')

The content of the file can be read back to a :class:`pyarrow.Table` using
:func:`pyarrow.json.read_json`:
.. testcode::
import pyarrow as pa
import pyarrow.json
table = pa.json.read_json(f.name)
.. testcode::
print(table.to_pydict())
.. testoutput::
{'a': [1, 3, 5, 7], 'b': [2.0, 3.0, 4.0, 5.0], 'c': [1, 2, 3, 4]}
Writing Compressed Data
=======================
Arrow provides support for writing files in compressed formats,
both for formats that provide compression natively like Parquet or Feather,
and for formats that don't support compression out of the box like CSV.
Given a table:
.. testcode::
table = pa.table([
pa.array([1, 2, 3, 4, 5])
], names=["numbers"])
Writing compressed Parquet or Feather data is driven by the
``compression`` argument to the :func:`pyarrow.feather.write_feather` and
:func:`pyarrow.parquet.write_table` functions:
.. testcode::

    import pyarrow.feather
    import pyarrow.parquet

    pa.feather.write_feather(table, "compressed.feather",
                             compression="lz4")
    pa.parquet.write_table(table, "compressed.parquet",
                           compression="lz4")

You can refer to each of those functions' documentation for a complete
list of supported compression formats.
.. note::
Arrow actually uses compression by default when writing
Parquet or Feather files. Feather is compressed using ``lz4``
by default and Parquet uses ``snappy`` by default.
For formats that don't support compression natively, like CSV,
it's possible to save compressed data using
:class:`pyarrow.CompressedOutputStream`:
.. testcode::

    with pa.CompressedOutputStream("compressed.csv.gz", "gzip") as out:
        pa.csv.write_csv(table, out)

This requires decompressing the file when reading it back,
which can be done using :class:`pyarrow.CompressedInputStream`
as explained in the next recipe.
Reading Compressed Data
=======================
Arrow provides support for reading compressed files,
both for formats that provide it natively like Parquet or Feather,
and for files in formats that don't support compression natively,
like CSV, but have been compressed by an application.
Reading compressed formats that have native support for compression
doesn't require any special handling. For example, we can read back
the Parquet and Feather files we wrote in the previous recipe
by simply invoking :func:`pyarrow.feather.read_table` and
:func:`pyarrow.parquet.read_table`:
.. testcode::
table_feather = pa.feather.read_table("compressed.feather")
print(table_feather)
.. testoutput::
pyarrow.Table
numbers: int64
----
numbers: [[1,2,3,4,5]]
.. testcode::
table_parquet = pa.parquet.read_table("compressed.parquet")
print(table_parquet)
.. testoutput::
pyarrow.Table
numbers: int64
----
numbers: [[1,2,3,4,5]]
Reading data from formats that don't have native support for
compression instead involves decompressing them before decoding them.
This can be done using the :class:`pyarrow.CompressedInputStream` class
which wraps files with a decompress operation before the result is
provided to the actual read function.
For example to read a compressed CSV file:
.. testcode::

    # Use a name other than "input" to avoid shadowing the Python builtin.
    with pa.CompressedInputStream(pa.OSFile("compressed.csv.gz"), "gzip") as source:
        table_csv = pa.csv.read_csv(source)
        print(table_csv)

.. testoutput::
pyarrow.Table
numbers: int64
----
numbers: [[1,2,3,4,5]]
.. note::

    In the case of CSV, Arrow is actually smart enough to try detecting
    compressed files using the file extension. So if your file is named
    ``*.gz`` or ``*.bz2``, the :func:`pyarrow.csv.read_csv` function will
    try to decompress it accordingly:

.. testcode::
table_csv2 = pa.csv.read_csv("compressed.csv.gz")
print(table_csv2)
.. testoutput::
pyarrow.Table
numbers: int64
----
numbers: [[1,2,3,4,5]]