| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| import time |
| import pyarrow as pa |
| |
| |
class HighLatencyReader(object):
    """Readable file-like wrapper that delays every ``read`` call.

    Each :meth:`read` sleeps for ``latency`` seconds before delegating to the
    wrapped ``raw`` object, emulating a high-latency source. ``close`` and
    ``closed`` are forwarded unchanged.
    """

    def __init__(self, raw, latency):
        # raw: object exposing read/close/closed; latency: seconds per read.
        self.raw, self.latency = raw, latency

    def close(self):
        """Close the wrapped stream."""
        self.raw.close()

    @property
    def closed(self):
        """Return the wrapped stream's ``closed`` flag."""
        return self.raw.closed

    def read(self, nbytes=None):
        """Sleep ``latency`` seconds, then return ``raw.read(nbytes)``."""
        time.sleep(self.latency)
        chunk = self.raw.read(nbytes)
        return chunk
| |
| |
class HighLatencyWriter(object):
    """Writable file-like wrapper that delays every ``write`` call.

    Each :meth:`write` sleeps for ``latency`` seconds before delegating to the
    wrapped ``raw`` object, emulating a high-latency sink. ``close`` and
    ``closed`` are forwarded unchanged.
    """

    def __init__(self, raw, latency):
        # raw: object exposing write/close/closed; latency: seconds per write.
        self.raw = raw
        self.latency = latency

    def close(self):
        """Close the wrapped stream."""
        self.raw.close()

    @property
    def closed(self):
        """Return the wrapped stream's ``closed`` flag."""
        return self.raw.closed

    def write(self, data):
        """Sleep ``latency`` seconds, then write *data* to ``raw``.

        Returns whatever ``raw.write`` returns. For standard binary streams
        and pyarrow output streams this is the number of bytes written, as
        the file-object protocol expects of ``write``.
        """
        time.sleep(self.latency)
        return self.raw.write(data)
| |
| |
class BufferedIOHighLatency(object):
    """Benchmark pyarrow buffered streams over a high-latency raw stream.

    Each benchmark wraps an in-memory pyarrow stream in a Python file-like
    object that sleeps ``latency`` seconds on every raw ``read``/``write``.
    It then moves ``total_size`` bytes in ``increment``-sized operations
    through a pyarrow buffered stream with a ``buffer_size``-byte buffer, so
    the timing reflects how well buffering amortizes the per-call latency.
    """

    increment = 1024
    total_size = 16 * (1 << 20)  # 16 MB
    buffer_size = 1 << 20  # 1 MB
    # Not read by the benchmarks: asv passes each value from ``params`` as
    # the ``latency`` argument instead. Kept for backward compatibility.
    latency = 0.1  # 100ms

    param_names = ('latency',)
    params = [0, 0.01, 0.1]

    def setup(self, latency):
        """Build the payloads once, outside the timed methods.

        ``latency`` is accepted because asv passes parameter values to
        ``setup``; it is unused here.
        """
        # Allocating the 16 MB source inside ``time_buffered_reads`` would
        # add a large allocation + copy to every measurement.
        self.test_data = b'x' * self.increment
        self.source_data = pa.py_buffer(b'x' * self.total_size)

    def time_buffered_writes(self, latency):
        """Write ``total_size`` bytes in ``increment``-byte chunks."""
        bytes_written = 0
        out = pa.BufferOutputStream()
        slow_out = HighLatencyWriter(out, latency)
        # ``with`` closes the buffered stream (which also closes the wrapped
        # raw writer), so nothing is left open after the benchmark.
        with pa.output_stream(slow_out,
                              buffer_size=self.buffer_size) as buffered_out:
            while bytes_written < self.total_size:
                buffered_out.write(self.test_data)
                bytes_written += self.increment
            buffered_out.flush()

    def time_buffered_reads(self, latency):
        """Read ``total_size`` bytes in ``increment``-byte chunks."""
        bytes_read = 0
        # A fresh reader per call: the buffer from ``setup`` is reused
        # without copying, but each stream is consumed by one run.
        reader = pa.input_stream(self.source_data)
        slow_reader = HighLatencyReader(reader, latency)
        with pa.input_stream(slow_reader,
                             buffer_size=self.buffer_size) as buffered_reader:
            while bytes_read < self.total_size:
                buffered_reader.read(self.increment)
                bytes_read += self.increment