# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Tuple, Callable

import pytest

from libcloud.utils.py3 import b, next
from libcloud.utils.files import CHUNK_SIZE, read_in_chunks


def _old_read_in_chunks(iterator, chunk_size=None, fill_size=False, yield_empty=False):
"""
Old implementation of read_in_chunks without performance optimizations from #1847.
It's only here so we can directly measure compare performance of old and the new version.
"""
chunk_size = chunk_size or CHUNK_SIZE
try:
get_data = iterator.read
args = (chunk_size,)
except AttributeError:
get_data = next
args = (iterator,)
data = b("")
empty = False
while not empty or len(data) > 0:
if not empty:
try:
chunk = b(get_data(*args))
if len(chunk) > 0:
data += chunk
else:
empty = True
except StopIteration:
empty = True
if len(data) == 0:
if empty and yield_empty:
yield b("")
return
if fill_size:
if empty or len(data) >= chunk_size:
yield data[:chunk_size]
data = data[chunk_size:]
else:
yield data
data = b("")


# fmt: off
@pytest.mark.parametrize(
"data_chunk_size_tuple",
[
(b"c" * (40 * 1024 * 1024), 1 * 1024 * 1024),
(b"c" * (40 * 1024 * 1024), 5 * 1024 * 1024),
(b"c" * (80 * 1024 * 1024), 1 * 1024 * 1024),
],
ids=[
"40mb_data_1mb_chunk_size",
"40mb_data_5mb_chunk_size",
"80mb_data_1mb_chunk_size",
],
)
@pytest.mark.parametrize(
"read_in_chunks_func",
[
_old_read_in_chunks,
read_in_chunks,
],
ids=[
"old",
"new",
],
)
# fmt: on
def test_scenario_1(
benchmark, data_chunk_size_tuple: Tuple[bytes, int], read_in_chunks_func: Callable
):
    # Similar to calling _upload_multipart_chunks with one large array of bytes.
    data, chunk_size = data_chunk_size_tuple

    def run_benchmark():
        for _ in read_in_chunks_func(iter([data]), chunk_size=chunk_size, fill_size=True):
            pass

    benchmark(run_benchmark)


# fmt: off
# NOTE: The old implementation is very slow when there is a chunk size mismatch, because each
# yielded chunk copies the whole remaining buffer (data = data[chunk_size:]), so the total
# amount of data copied grows quadratically. We therefore use a smaller total object size to
# prevent this benchmark from running for a very long time.
@pytest.mark.parametrize(
"data_chunk_size_tuple",
[
(b"c" * (10 * 1024 * 1024), 8 * 1024),
(b"c" * (20 * 1024 * 1024), 1 * 1024 * 1024),
(b"c" * (30 * 1024 * 1024), 1 * 1024 * 1024),
],
ids=[
"10mb_data_8k_chunk_size",
"20mb_data_1mb_chunk_size",
"30mb_data_1mb_chunk_size",
],
)
@pytest.mark.parametrize(
"read_in_chunks_func",
[
_old_read_in_chunks,
read_in_chunks,
],
ids=[
"old",
"new",
],
)
# fmt: on
def test_scenario_2(
benchmark, data_chunk_size_tuple: Tuple[bytes, int], read_in_chunks_func: Callable
):
    # Similar to calling _upload_multipart_chunks with data which arrives in smaller
    # (5 MB) pieces, one response chunk at a time.
    data, chunk_size = data_chunk_size_tuple
    response_chunk = 5 * 1024 * 1024

    # NOTE: It would be nice if we could also assert that the data has been accumulated
    # correctly, but that would add additional overhead (we would measure accumulating the
    # data as well), so those checks are done separately in the unit tests.
def run_benchmark():
        for _ in read_in_chunks_func(
            iter([data[i : i + response_chunk] for i in range(0, len(data), response_chunk)]),
            chunk_size=chunk_size,
            fill_size=True,
        ):
            pass

    benchmark(run_benchmark)
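

# Usage sketch (assumption: the pytest-benchmark plugin, which provides the "benchmark"
# fixture used above, is installed). Running this module with pytest prints timings for
# the "old" and "new" variant of each scenario, and grouping by the data / chunk size
# parameter puts both variants for the same workload into one comparison table:
#
#   pytest <path-to-this-file> --benchmark-group-by=param:data_chunk_size_tuple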