| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| import numpy as np |
| import pandas as pd |
| import pyarrow as pa |
| |
| from . import common |
| from .common import KILOBYTE, MEGABYTE |
| |
| |
def generate_chunks(total_size, nchunks, ncols, dtype=np.dtype('int64')):
    """
    Build a list of Pandas dataframes filled with pseudo-random data.

    Parameters
    ----------
    total_size : int
        Number of bytes of column data to spread over all chunks.
    nchunks : int
        Number of dataframes to produce.
    ncols : int
        Number of columns per dataframe; they are named 'c0', 'c1', ...
    dtype : numpy dtype, or anything accepted by ``np.dtype``
        Element type of every column.

    Returns
    -------
    list of pandas.DataFrame
        ``nchunks`` dataframes, each with ``ncols`` columns.

    Raises
    ------
    AssertionError
        If the per-column byte count is not a whole number of ``dtype``
        elements.
    """
    # Accept scalar types / strings as well as np.dtype instances.
    dtype = np.dtype(dtype)
    # Number of bytes held by one column of one chunk.
    rowsize = total_size // nchunks // ncols
    assert rowsize % dtype.itemsize == 0

    def make_column(col, chunk):
        # The seed varies with both the column and the chunk index.
        # NOTE(review): get_random_bytes lives in the sibling `common`
        # module; presumably it returns `rowsize` bytes -- confirm there.
        raw = common.get_random_bytes(rowsize, seed=col + 997 * chunk)
        # Interpret the bytes as `dtype` directly.  Without an explicit
        # dtype, np.frombuffer reads float64 and so rejects any buffer
        # whose length is not a multiple of 8, whatever `dtype` is.
        return np.frombuffer(raw, dtype=dtype)

    return [
        pd.DataFrame({
            'c' + str(col): make_column(col, chunk)
            for col in range(ncols)})
        for chunk in range(nchunks)
    ]
| |
| |
class StreamReader(object):
    """
    Benchmark in-memory streaming to a Pandas dataframe.

    ``setup`` serializes a sequence of record batches into an in-memory
    Arrow stream; ``time_read_to_dataframe`` reads that stream back and
    converts it to a Pandas dataframe.

    NOTE(review): the ``params`` / ``param_names`` / ``setup`` / ``time_*``
    names look like the conventions of a benchmark harness such as asv --
    confirm against the benchmark runner configuration.
    """
    # Number of bytes of column data passed to generate_chunks().
    total_size = 64 * MEGABYTE
    # Number of columns in every generated dataframe.
    ncols = 8
    # Per-chunk byte sizes to benchmark; total_size // chunk_size gives the
    # number of chunks written to the stream.
    chunk_sizes = [16 * KILOBYTE, 256 * KILOBYTE, 8 * MEGABYTE]

    param_names = ['chunk_size']
    params = [chunk_sizes]

    def setup(self, chunk_size):
        """
        Serialize the generated chunks into ``self.source``.

        Parameters
        ----------
        chunk_size : int
            Number of bytes of column data per record batch.
        """
        # Note we're careful to stream different chunks instead of
        # streaming N times the same chunk, so that we avoid operating
        # entirely out of L1/L2.
        chunks = generate_chunks(self.total_size,
                                 nchunks=self.total_size // chunk_size,
                                 ncols=self.ncols)
        batches = [pa.RecordBatch.from_pandas(df)
                   for df in chunks]
        # All chunks share the same columns, so the first batch's schema
        # is used for the whole stream.
        schema = batches[0].schema
        sink = pa.BufferOutputStream()
        stream_writer = pa.RecordBatchStreamWriter(sink, schema)
        for batch in batches:
            stream_writer.write_batch(batch)
        # Close the writer so the stream is properly terminated before
        # the buffer is captured, rather than leaving it open.
        stream_writer.close()
        self.source = sink.getvalue()

    def time_read_to_dataframe(self, *args):
        """
        Read the whole serialized stream and convert it to a dataframe.

        The positional arguments (the benchmark parameters) are unused;
        the data was already prepared by ``setup``.
        """
        reader = pa.RecordBatchStreamReader(self.source)
        table = reader.read_all()
        # The result is intentionally discarded: only the conversion
        # itself is being exercised.
        df = table.to_pandas()  # noqa