#!/usr/bin/env python
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
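"""Manual memory-leak checks for pyarrow <-> pandas conversions and I/O.

Run directly (e.g. ``python test_leak.py``); each test repeatedly exercises
a conversion or I/O path and raises if process memory grows beyond a
tolerance.
"""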
import gc
import io
import uuid

import memory_profiler
import numpy as np
import pandas as pd
import pyarrow as pa

MEGABYTE = 1 << 20


def assert_does_not_leak(f, iterations=10, check_interval=1, tolerance=5):
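    """Call ``f`` ``iterations`` times, sampling process memory with
    memory_profiler every ``check_interval`` calls, and raise if usage
    grows more than ``tolerance`` MiB above the starting baseline.
    """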
    gc.collect()
    baseline = memory_profiler.memory_usage()[0]
    for i in range(iterations):
        f()
        if i % check_interval == 0:
            gc.collect()
            usage = memory_profiler.memory_usage()[0]
            diff = usage - baseline
            print("{0}: {1}\r".format(i, diff), end="")
            if diff > tolerance:
                raise Exception("Memory increased by {0} megabytes after {1} "
                                "iterations".format(diff, i + 1))

    gc.collect()
    usage = memory_profiler.memory_usage()[0]
    diff = usage - baseline
    print("\nMemory increased by {0} megabytes after {1} "
          "iterations".format(diff, iterations))


def test_leak1():
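    # Repeatedly convert a large single-column table (100M doubles, ~0.8 GB)
    # to pandas; a leak in to_pandas shows up as steady memory growth.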
    data = [pa.array(np.concatenate([np.random.randn(100000)] * 1000))]
    table = pa.Table.from_arrays(data, ['foo'])

    def func():
        table.to_pandas()
    assert_does_not_leak(func)


def test_leak2():
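    # Round-trip a table through pandas and the Arrow IPC file format:
    # pandas -> RecordBatch -> in-memory file -> read_all, checking that
    # intermediate buffers are not retained across iterations.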
    data = [pa.array(np.concatenate([np.random.randn(100000)] * 10))]
    table = pa.Table.from_arrays(data, ['foo'])

    def func():
        df = table.to_pandas()
        batch = pa.RecordBatch.from_pandas(df)

        sink = io.BytesIO()
        writer = pa.RecordBatchFileWriter(sink, batch.schema)
        writer.write_batch(batch)
        writer.close()

        buf_reader = pa.BufferReader(sink.getvalue())
        # pa.open_file was deprecated and later removed from the top-level
        # namespace; pa.ipc.open_file is the current spelling
        reader = pa.ipc.open_file(buf_reader)
        reader.read_all()
    assert_does_not_leak(func, iterations=50, tolerance=50)


def test_leak3():
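    # Append the same table as a new row group to an open ParquetWriter
    # many times; per-row-group writer state should not accumulate.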
    import pyarrow.parquet as pq

    df = pd.DataFrame({'a{0}'.format(i): [1, 2, 3, 4]
                       for i in range(50)})
    table = pa.Table.from_pandas(df, preserve_index=False)

    # uuid stands in for pandas.util.testing.rands, which newer pandas
    # releases no longer ship
    writer = pq.ParquetWriter('leak_test_' + uuid.uuid4().hex[:5] + '.parquet',
                              table.schema)

    def func():
        writer.write_table(table, row_group_size=len(table))

    # This does not "leak" per se, but we do want it to use as little
    # memory as possible
    assert_does_not_leak(func, iterations=500,
                         check_interval=50, tolerance=20)


if __name__ == '__main__':
    test_leak3()