| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| import shutil |
| import tempfile |
| |
| from pandas.util.testing import rands |
| import numpy as np |
| import pandas as pd |
| |
| import pyarrow as pa |
| try: |
| import pyarrow.parquet as pq |
| except ImportError: |
| pq = None |
| |
| |
| class ParquetManifestCreation(object): |
| """Benchmark creating a parquet manifest.""" |
| |
| size = 10 ** 6 |
| tmpdir = None |
| |
| param_names = ('num_partitions', 'num_threads') |
| params = [(10, 100, 1000), (1, 8)] |
| |
| def setup(self, num_partitions, num_threads): |
| if pq is None: |
| raise NotImplementedError("Parquet support not enabled") |
| |
| self.tmpdir = tempfile.mkdtemp('benchmark_parquet') |
| rnd = np.random.RandomState(42) |
| num1 = rnd.randint(0, num_partitions, size=self.size) |
| num2 = rnd.randint(0, 1000, size=self.size) |
| output_df = pd.DataFrame({'num1': num1, 'num2': num2}) |
| output_table = pa.Table.from_pandas(output_df) |
| pq.write_to_dataset(output_table, self.tmpdir, ['num1']) |
| |
| def teardown(self, num_partitions, num_threads): |
| if self.tmpdir is not None: |
| shutil.rmtree(self.tmpdir) |
| |
| def time_manifest_creation(self, num_partitions, num_threads): |
| pq.ParquetManifest(self.tmpdir, metadata_nthreads=num_threads) |
| |
| |
| class ParquetWriteBinary(object): |
| |
| def setup(self): |
| nuniques = 100000 |
| value_size = 50 |
| length = 1000000 |
| num_cols = 10 |
| |
| unique_values = np.array([rands(value_size) for |
| i in range(nuniques)], dtype='O') |
| values = unique_values[np.random.randint(0, nuniques, size=length)] |
| self.table = pa.table([pa.array(values) for i in range(num_cols)], |
| names=['f{}'.format(i) for i in range(num_cols)]) |
| self.table_df = self.table.to_pandas() |
| |
| def time_write_binary_table(self): |
| out = pa.BufferOutputStream() |
| pq.write_table(self.table, out) |
| |
| def time_write_binary_table_uncompressed(self): |
| out = pa.BufferOutputStream() |
| pq.write_table(self.table, out, compression='none') |
| |
| def time_write_binary_table_no_dictionary(self): |
| out = pa.BufferOutputStream() |
| pq.write_table(self.table, out, use_dictionary=False) |
| |
| def time_convert_pandas_and_write_binary_table(self): |
| out = pa.BufferOutputStream() |
| pq.write_table(pa.table(self.table_df), out) |
| |
| |
| def generate_dict_strings(string_size, nunique, length, random_order=True): |
| uniques = np.array([rands(string_size) for i in range(nunique)], dtype='O') |
| if random_order: |
| indices = np.random.randint(0, nunique, size=length).astype('i4') |
| else: |
| indices = np.arange(nunique).astype('i4').repeat(length // nunique) |
| return pa.DictionaryArray.from_arrays(indices, uniques) |
| |
| |
| def generate_dict_table(num_cols, string_size, nunique, length, |
| random_order=True): |
| data = generate_dict_strings(string_size, nunique, length, |
| random_order=random_order) |
| return pa.table([ |
| data for i in range(num_cols) |
| ], names=['f{}'.format(i) for i in range(num_cols)]) |
| |
| |
| class ParquetWriteDictionaries(object): |
| |
| param_names = ('nunique',) |
| params = [(1000), (100000)] |
| |
| def setup(self, nunique): |
| self.num_cols = 10 |
| self.value_size = 32 |
| self.nunique = nunique |
| self.length = 10000000 |
| |
| self.table = generate_dict_table(self.num_cols, self.value_size, |
| self.nunique, self.length) |
| self.table_sequential = generate_dict_table(self.num_cols, |
| self.value_size, |
| self.nunique, self.length, |
| random_order=False) |
| |
| def time_write_random_order(self, nunique): |
| pq.write_table(self.table, pa.BufferOutputStream()) |
| |
| def time_write_sequential(self, nunique): |
| pq.write_table(self.table_sequential, pa.BufferOutputStream()) |
| |
| |
| class ParquetManyColumns(object): |
| |
| total_cells = 10000000 |
| param_names = ('num_cols',) |
| params = [100, 1000, 10000] |
| |
| def setup(self, num_cols): |
| num_rows = self.total_cells // num_cols |
| self.table = pa.table({'c' + str(i): np.random.randn(num_rows) |
| for i in range(num_cols)}) |
| |
| out = pa.BufferOutputStream() |
| pq.write_table(self.table, out) |
| self.buf = out.getvalue() |
| |
| def time_write(self, num_cols): |
| out = pa.BufferOutputStream() |
| pq.write_table(self.table, out) |
| |
| def time_read(self, num_cols): |
| pq.read_table(self.buf) |