blob: e26093a30850356123a92adec8d0c738fa32577d [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# RECIPE STARTS HERE
#: ADBC makes it easy to load PyArrow datasets into your datastore.
import os
import tempfile
from pathlib import Path
import pyarrow
import pyarrow.csv
import pyarrow.dataset
import pyarrow.feather
import pyarrow.parquet
import adbc_driver_postgresql.dbapi
uri = os.environ["ADBC_POSTGRESQL_TEST_URI"]
conn = adbc_driver_postgresql.dbapi.connect(uri)
#: For the purposes of testing, we'll first make sure the tables we're about
#: to use don't exist.
with conn.cursor() as cur:
cur.execute("DROP TABLE IF EXISTS csvtable")
cur.execute("DROP TABLE IF EXISTS ipctable")
cur.execute("DROP TABLE IF EXISTS pqtable")
cur.execute("DROP TABLE IF EXISTS csvdataset")
cur.execute("DROP TABLE IF EXISTS ipcdataset")
cur.execute("DROP TABLE IF EXISTS pqdataset")
conn.commit()
#: Generating sample data
#: ~~~~~~~~~~~~~~~~~~~~~~
tempdir = tempfile.TemporaryDirectory(
prefix="adbc-docs-",
ignore_cleanup_errors=True,
)
root = Path(tempdir.name)
table = pyarrow.table(
[
[1, 1, 2],
["foo", "bar", "baz"],
],
names=["ints", "strs"],
)
#: First we'll write single files.
csv_file = root / "example.csv"
pyarrow.csv.write_csv(table, csv_file)
ipc_file = root / "example.arrow"
pyarrow.feather.write_feather(table, ipc_file)
parquet_file = root / "example.parquet"
pyarrow.parquet.write_table(table, parquet_file)
#: We'll also generate some partitioned datasets.
csv_dataset = root / "csv_dataset"
pyarrow.dataset.write_dataset(
table,
csv_dataset,
format="csv",
partitioning=["ints"],
)
ipc_dataset = root / "ipc_dataset"
pyarrow.dataset.write_dataset(
table,
ipc_dataset,
format="feather",
partitioning=["ints"],
)
parquet_dataset = root / "parquet_dataset"
pyarrow.dataset.write_dataset(
table,
parquet_dataset,
format="parquet",
partitioning=["ints"],
)
#: Loading CSV Files into PostgreSQL
#: ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
#: We can directly pass a :py:class:`pyarrow.RecordBatchReader` (from
#: ``open_csv``) to ``adbc_ingest``. We can also pass a
#: :py:class:`pyarrow.dataset.Dataset`, or a
#: :py:class:`pyarrow.dataset.Scanner`.
with conn.cursor() as cur:
reader = pyarrow.csv.open_csv(csv_file)
cur.adbc_ingest("csvtable", reader, mode="create")
reader = pyarrow.dataset.dataset(
csv_dataset,
format="csv",
partitioning=["ints"],
)
cur.adbc_ingest("csvdataset", reader, mode="create")
conn.commit()
with conn.cursor() as cur:
cur.execute("SELECT ints, strs FROM csvtable ORDER BY ints, strs ASC")
assert cur.fetchall() == [(1, "bar"), (1, "foo"), (2, "baz")]
cur.execute("SELECT ints, strs FROM csvdataset ORDER BY ints, strs ASC")
assert cur.fetchall() == [(1, "bar"), (1, "foo"), (2, "baz")]
#: Loading Arrow IPC (Feather) Files into PostgreSQL
#: ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
with conn.cursor() as cur:
reader = pyarrow.ipc.RecordBatchFileReader(ipc_file)
#: Because of quirks in the PyArrow API, we have to read the file into
#: memory.
cur.adbc_ingest("ipctable", reader.read_all(), mode="create")
#: The Dataset API will stream the data into memory and then into
#: PostgreSQL, though.
reader = pyarrow.dataset.dataset(
ipc_dataset,
format="feather",
partitioning=["ints"],
)
cur.adbc_ingest("ipcdataset", reader, mode="create")
conn.commit()
with conn.cursor() as cur:
cur.execute("SELECT ints, strs FROM ipctable ORDER BY ints, strs ASC")
assert cur.fetchall() == [(1, "bar"), (1, "foo"), (2, "baz")]
cur.execute("SELECT ints, strs FROM ipcdataset ORDER BY ints, strs ASC")
assert cur.fetchall() == [(1, "bar"), (1, "foo"), (2, "baz")]
#: Loading Parquet Files into PostgreSQL
#: ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
with conn.cursor() as cur:
reader = pyarrow.parquet.ParquetFile(parquet_file)
cur.adbc_ingest("pqtable", reader.iter_batches(), mode="create")
reader = pyarrow.dataset.dataset(
parquet_dataset,
format="parquet",
partitioning=["ints"],
)
cur.adbc_ingest("pqdataset", reader, mode="create")
conn.commit()
with conn.cursor() as cur:
cur.execute("SELECT ints, strs FROM pqtable ORDER BY ints, strs ASC")
assert cur.fetchall() == [(1, "bar"), (1, "foo"), (2, "baz")]
cur.execute("SELECT ints, strs FROM pqdataset ORDER BY ints, strs ASC")
assert cur.fetchall() == [(1, "bar"), (1, "foo"), (2, "baz")]
#: Cleanup
#: ~~~~~~~
conn.close()
tempdir.cleanup()