# blob: 3aeca425bc8f0aa7b20a43139f1c33384e11c0e4 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import shutil
import tempfile
from pandas.util.testing import rands
import numpy as np
import pandas as pd
import pyarrow as pa
try:
import pyarrow.parquet as pq
except ImportError:
pq = None
class ParquetManifestCreation(object):
    """Benchmark creating a parquet manifest.

    ``setup`` writes a dataset of ``size`` rows, partitioned on the ``num1``
    column, into a fresh temporary directory.  The timed method then builds
    a ``pq.ParquetManifest`` over that directory with the requested number
    of metadata threads.
    """

    # Number of rows in the generated dataset.
    size = 10 ** 6
    # Scratch directory holding the dataset; remains None until setup
    # creates it, which lets teardown tell whether there is anything to
    # remove.
    tmpdir = None

    # One tuple of candidate values per entry of param_names.
    param_names = ('num_partitions', 'num_threads')
    params = [(10, 100, 1000), (1, 8)]

    def setup(self, num_partitions, num_threads):
        """Write the partitioned dataset that the timed method scans.

        Raises NotImplementedError when ``pyarrow.parquet`` could not be
        imported (``pq`` is None).
        """
        if pq is None:
            raise NotImplementedError("Parquet support not enabled")

        self.tmpdir = tempfile.mkdtemp(suffix='benchmark_parquet')

        # Fixed seed: the generated data is the same on every run.
        rng = np.random.RandomState(42)
        frame = pd.DataFrame({
            # 'num1' is the partition column; values lie in
            # [0, num_partitions).
            'num1': rng.randint(0, num_partitions, size=self.size),
            'num2': rng.randint(0, 1000, size=self.size),
        })
        pq.write_to_dataset(pa.Table.from_pandas(frame), self.tmpdir,
                            ['num1'])

    def teardown(self, num_partitions, num_threads):
        """Delete the scratch directory, if setup created one."""
        if self.tmpdir is None:
            return
        shutil.rmtree(self.tmpdir)

    def time_manifest_creation(self, num_partitions, num_threads):
        """Time building a ParquetManifest over the scratch directory."""
        pq.ParquetManifest(self.tmpdir, metadata_nthreads=num_threads)
class ParquetWriteBinary(object):
    """Benchmark writing a table of random string columns to Parquet.

    ``setup`` builds a 10-column table in which every column holds the same
    1,000,000 values, drawn from 100,000 distinct strings produced by
    ``rands(50)``.  Each timed method writes that table to an in-memory
    ``pa.BufferOutputStream`` with different writer options.
    """

    def setup(self):
        """Build the Arrow table and its pandas counterpart.

        Raises NotImplementedError when ``pyarrow.parquet`` could not be
        imported (``pq`` is None), matching ParquetManifestCreation.setup,
        so the timed methods are never entered with ``pq`` set to None.
        """
        if pq is None:
            raise NotImplementedError("Parquet support not enabled")

        nuniques = 100000
        value_size = 50
        length = 1000000
        num_cols = 10

        unique_values = np.array(
            [rands(value_size) for _ in range(nuniques)], dtype='O')
        values = unique_values[np.random.randint(0, nuniques, size=length)]

        # Every column carries the same data, so convert to Arrow once and
        # reuse the resulting array instead of converting once per column.
        column = pa.array(values)
        self.table = pa.table(
            [column] * num_cols,
            names=['f{}'.format(i) for i in range(num_cols)])
        self.table_df = self.table.to_pandas()

    def time_write_binary_table(self):
        """Time writing the table with default writer options."""
        out = pa.BufferOutputStream()
        pq.write_table(self.table, out)

    def time_write_binary_table_uncompressed(self):
        """Time writing the table with compression='none'."""
        out = pa.BufferOutputStream()
        pq.write_table(self.table, out, compression='none')

    def time_write_binary_table_no_dictionary(self):
        """Time writing the table with use_dictionary=False."""
        out = pa.BufferOutputStream()
        pq.write_table(self.table, out, use_dictionary=False)

    def time_convert_pandas_and_write_binary_table(self):
        """Time converting the DataFrame to Arrow and then writing it."""
        out = pa.BufferOutputStream()
        pq.write_table(pa.table(self.table_df), out)
def generate_dict_strings(string_size, nunique, length, random_order=True):
    """Build a dictionary-encoded Arrow array of random strings.

    Parameters
    ----------
    string_size : int
        Forwarded to ``rands`` for every dictionary entry (presumably the
        length of each generated string -- confirm against ``rands``).
    nunique : int
        Number of dictionary entries generated.
    length : int
        Number of indices in the result.
    random_order : bool, default True
        If true, indices are drawn with ``np.random.randint``.  Otherwise
        they are non-decreasing, so rows sharing a dictionary entry are
        contiguous.

    Returns
    -------
    The value of ``pa.DictionaryArray.from_arrays(indices, uniques)``.
    """
    uniques = np.array([rands(string_size) for _ in range(nunique)],
                       dtype='O')
    if random_order:
        indices = np.random.randint(0, nunique, size=length).astype('i4')
    else:
        # Map row i to entry floor(i * nunique / length).  When length is a
        # multiple of nunique this equals
        # np.arange(nunique).repeat(length // nunique); unlike that form it
        # still yields exactly `length` rows when length is not a multiple
        # of nunique (or is smaller than nunique).  The product is computed
        # in 64-bit so it cannot overflow before the division.
        positions = np.arange(length, dtype='i8')
        indices = (positions * nunique // length).astype('i4')
    return pa.DictionaryArray.from_arrays(indices, uniques)
def generate_dict_table(num_cols, string_size, nunique, length,
                        random_order=True):
    """Build a table of ``num_cols`` identical dictionary-encoded columns.

    A single array is produced by ``generate_dict_strings`` and reused for
    every column; the columns are named ``f0``, ``f1``, ... in order.
    """
    column = generate_dict_strings(string_size, nunique, length,
                                   random_order=random_order)
    field_names = ['f{}'.format(idx) for idx in range(num_cols)]
    return pa.table([column] * num_cols, names=field_names)
class ParquetWriteDictionaries(object):
    """Benchmark writing dictionary-encoded string tables to Parquet.

    Two 10-column tables of 10,000,000 rows are built in ``setup``: one
    whose dictionary indices are random and one whose indices are
    sequential (``random_order=False``).  Each timed method writes one of
    them to an in-memory ``pa.BufferOutputStream``.
    """

    param_names = ('nunique',)
    # Plain integers: the original `(1000)` and `(100000)` were
    # parenthesised ints, not tuples, so this is the same list.
    params = [1000, 100000]

    def setup(self, nunique):
        """Generate both tables for the given dictionary size.

        Raises NotImplementedError when ``pyarrow.parquet`` could not be
        imported (``pq`` is None), matching ParquetManifestCreation.setup,
        so the timed methods are never entered with ``pq`` set to None.
        """
        if pq is None:
            raise NotImplementedError("Parquet support not enabled")

        self.num_cols = 10
        self.value_size = 32
        self.nunique = nunique
        self.length = 10000000

        self.table = generate_dict_table(self.num_cols, self.value_size,
                                         self.nunique, self.length)
        self.table_sequential = generate_dict_table(self.num_cols,
                                                    self.value_size,
                                                    self.nunique,
                                                    self.length,
                                                    random_order=False)

    def time_write_random_order(self, nunique):
        """Time writing the table whose indices are in random order."""
        pq.write_table(self.table, pa.BufferOutputStream())

    def time_write_sequential(self, nunique):
        """Time writing the table whose indices are sequential."""
        pq.write_table(self.table_sequential, pa.BufferOutputStream())
class ParquetManyColumns(object):
    """Benchmark Parquet reads and writes of tables with many columns.

    The total number of cells is held at ``total_cells``; the row count is
    ``total_cells // num_cols``, so more columns means fewer rows.
    """

    total_cells = 10000000
    param_names = ('num_cols',)
    params = [100, 1000, 10000]

    def setup(self, num_cols):
        """Build the table and a serialized copy used by ``time_read``.

        Raises NotImplementedError when ``pyarrow.parquet`` could not be
        imported (``pq`` is None), matching ParquetManifestCreation.setup.
        Without this guard the ``pq.write_table`` call below would fail on
        None instead.
        """
        if pq is None:
            raise NotImplementedError("Parquet support not enabled")

        num_rows = self.total_cells // num_cols
        self.table = pa.table({'c' + str(i): np.random.randn(num_rows)
                               for i in range(num_cols)})

        # Serialize once up front so time_read measures reading only.
        out = pa.BufferOutputStream()
        pq.write_table(self.table, out)
        self.buf = out.getvalue()

    def time_write(self, num_cols):
        """Time writing the table to an in-memory output stream."""
        out = pa.BufferOutputStream()
        pq.write_table(self.table, out)

    def time_read(self, num_cols):
        """Time reading the table back from the buffer made in setup."""
        pq.read_table(self.buf)