Merge branch 'deps-updates-2' into trunk
diff --git a/CHANGES.rst b/CHANGES.rst
index cffa856..970d5f6 100644
--- a/CHANGES.rst
+++ b/CHANGES.rst
@@ -4,6 +4,16 @@
Changes in Apache Libcloud in development
-----------------------------------------
+Storage
+~~~~~~~
+
+- Optimize ``read_in_chunks()`` function implementation.
+
+ This should result in large performance speedups and lower memory usage when
+ uploading or downloading a large file with a mismatching chunk size.
+ (GITHUB-1847)
+ [Tobias Biester - @Tobi995]
+
Changes in Apache Libcloud 3.7.0
--------------------------------
diff --git a/libcloud/test/benchmarks/test_read_in_chunks.py b/libcloud/test/benchmarks/test_read_in_chunks.py
new file mode 100644
index 0000000..694e9fb
--- /dev/null
+++ b/libcloud/test/benchmarks/test_read_in_chunks.py
@@ -0,0 +1,153 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from typing import Tuple, Callable
+
+import pytest
+
+from libcloud.utils.py3 import b, next
+from libcloud.utils.files import CHUNK_SIZE, read_in_chunks
+
+
+def _old_read_in_chunks(iterator, chunk_size=None, fill_size=False, yield_empty=False):
+ """
+ Old implementation of read_in_chunks without performance optimizations from #1847.
+
+ It's only here so we can directly measure and compare the performance of the old and the new version.
+ """
+ chunk_size = chunk_size or CHUNK_SIZE
+
+ try:
+ get_data = iterator.read
+ args = (chunk_size,)
+ except AttributeError:
+ get_data = next
+ args = (iterator,)
+
+ data = b("")
+ empty = False
+
+ while not empty or len(data) > 0:
+ if not empty:
+ try:
+ chunk = b(get_data(*args))
+ if len(chunk) > 0:
+ data += chunk
+ else:
+ empty = True
+ except StopIteration:
+ empty = True
+
+ if len(data) == 0:
+ if empty and yield_empty:
+ yield b("")
+
+ return
+
+ if fill_size:
+ if empty or len(data) >= chunk_size:
+ yield data[:chunk_size]
+ data = data[chunk_size:]
+ else:
+ yield data
+ data = b("")
+
+
+# fmt: off
+@pytest.mark.parametrize(
+ "data_chunk_size_tuple",
+ [
+ (b"c" * (40 * 1024 * 1024), 1 * 1024 * 1024),
+ (b"c" * (40 * 1024 * 1024), 5 * 1024 * 1024),
+ (b"c" * (80 * 1024 * 1024), 1 * 1024 * 1024),
+ ],
+ ids=[
+ "40mb_data_1mb_chunk_size",
+ "40mb_data_5mb_chunk_size",
+ "80mb_data_1mb_chunk_size",
+ ],
+)
+@pytest.mark.parametrize(
+ "read_in_chunks_func",
+ [
+ _old_read_in_chunks,
+ read_in_chunks,
+ ],
+ ids=[
+ "old",
+ "new",
+ ],
+)
+# fmt: on
+def test_scenario_1(
+ benchmark, data_chunk_size_tuple: Tuple[bytes, int], read_in_chunks_func: Callable
+):
+ # similar to calling _upload_multipart_chunks with one large array of bytes
+ data, chunk_size = data_chunk_size_tuple
+
+ def run_benchmark():
+ for _ in read_in_chunks_func(iter([data]), chunk_size=chunk_size, fill_size=True):
+ pass
+
+ benchmark(run_benchmark)
+
+
+# fmt: off
+# NOTE: Because the old implementation is very slow when there is a chunk size mismatch, we need to
+# use smaller total objects size to prevent this benchmark from running for a very long time.
+@pytest.mark.parametrize(
+ "data_chunk_size_tuple",
+ [
+ (b"c" * (10 * 1024 * 1024), 8 * 1024),
+ (b"c" * (20 * 1024 * 1024), 1 * 1024 * 1024),
+ (b"c" * (30 * 1024 * 1024), 1 * 1024 * 1024),
+ ],
+ ids=[
+ "10mb_data_8k_chunk_size",
+ "20mb_data_1mb_chunk_size",
+ "30mb_data_1mb_chunk_size",
+ ],
+)
+@pytest.mark.parametrize(
+ "read_in_chunks_func",
+ [
+ _old_read_in_chunks,
+ read_in_chunks,
+ ],
+ ids=[
+ "old",
+ "new",
+ ],
+)
+# fmt: on
+def test_scenario_2(
+ benchmark, data_chunk_size_tuple: Tuple[bytes, int], read_in_chunks_func: Callable
+):
+ # similar to calling _upload_multipart_chunks with one large array of bytes
+ data, chunk_size = data_chunk_size_tuple
+ response_chunk = 5 * 1024 * 1024
+
+ # NOTE: It would be nice if we could also assert that data has been correctly read, but this
+ # would add additional overhead (since we would also measure accumulating this data) so we have
+ # those checks done separately in the unit tests.
+ def run_benchmark():
+ for a in read_in_chunks_func(
+ iter([data[i : i + response_chunk] for i in range(0, len(data), response_chunk)]),
+ chunk_size=chunk_size,
+ fill_size=True,
+ ):
+ pass
+
+ benchmark(run_benchmark)
diff --git a/libcloud/test/test_utils.py b/libcloud/test/test_utils.py
index c6fc161..395bfea 100644
--- a/libcloud/test/test_utils.py
+++ b/libcloud/test/test_utils.py
@@ -202,15 +202,42 @@
for x in range(0, 1000):
yield "aa"
+ chunk_count = 0
for result in libcloud.utils.files.read_in_chunks(
iterator(), chunk_size=10, fill_size=False
):
+ chunk_count += 1
self.assertEqual(result, b("aa"))
+ self.assertEqual(chunk_count, 1000)
+ chunk_count = 0
for result in libcloud.utils.files.read_in_chunks(
iterator(), chunk_size=10, fill_size=True
):
- self.assertEqual(result, b("aaaaaaaaaa"))
+ chunk_count += 1
+ self.assertEqual(result, b("a") * 10)
+ self.assertEqual(chunk_count, 200)
+
+ def test_read_in_chunks_large_iterator_batches(self):
+ def iterator():
+ for x in range(0, 10):
+ yield "a" * 10_000
+
+ chunk_count = 0
+ for result in libcloud.utils.files.read_in_chunks(
+ iterator(), chunk_size=10, fill_size=False
+ ):
+ chunk_count += 1
+ self.assertEqual(result, b("a") * 10_000)
+ self.assertEqual(chunk_count, 10)
+
+ chunk_count = 0
+ for result in libcloud.utils.files.read_in_chunks(
+ iterator(), chunk_size=10, fill_size=True
+ ):
+ chunk_count += 1
+ self.assertEqual(result, b("a") * 10)
+ self.assertEqual(chunk_count, 10_000)
def test_read_in_chunks_filelike(self):
class FakeFile(file):
diff --git a/libcloud/utils/files.py b/libcloud/utils/files.py
index 3ab0bbc..fdf1ef7 100644
--- a/libcloud/utils/files.py
+++ b/libcloud/utils/files.py
@@ -15,6 +15,7 @@
import os
import mimetypes
+from typing import Generator
from libcloud.utils.py3 import b, next
@@ -75,7 +76,9 @@
return
if fill_size:
- if empty or len(data) >= chunk_size:
+ data = yield from _optimized_chunked_generator(data=data, chunk_size=chunk_size)
+ if empty:
+ # Yield the last chunk, which may not be completely filled
yield data[:chunk_size]
data = data[chunk_size:]
else:
@@ -83,6 +86,18 @@
data = b("")
+def _optimized_chunked_generator(data: bytes, chunk_size: int) -> Generator[bytes, None, bytes]:
+ # We want to emit chunk_size large chunks, but chunk_size can be larger or smaller than the chunks returned
+ # by get_data. We need to yield in a loop to avoid large amounts of data piling up.
+ # The loop also avoids copying all of the data once per emitted chunk by keeping the original data as is.
+ chunk_start = 0
+ while chunk_start + chunk_size < len(data):
+ yield data[chunk_start : chunk_start + chunk_size]
+ chunk_start += chunk_size
+ data = data[chunk_start:]
+ return data
+
+
def exhaust_iterator(iterator):
"""
Exhaust an iterator and return all data returned by it.
diff --git a/tox.ini b/tox.ini
index 47db058..efdf9ec 100644
--- a/tox.ini
+++ b/tox.ini
@@ -387,6 +387,7 @@
commands =
cp libcloud/test/secrets.py-dist libcloud/test/secrets.py
pytest -s -v --timeout 60 --benchmark-only --benchmark-name=short --benchmark-columns=min,max,mean,stddev,median,ops,rounds --benchmark-histogram=benchmark_histograms/benchmark --benchmark-group-by=group,param:sort_objects libcloud/test/benchmarks/test_list_objects_filtering_performance.py
+ pytest -s -v --timeout 60 --benchmark-only --benchmark-name=short --benchmark-columns=min,max,mean,stddev,median,ops,rounds --benchmark-histogram=benchmark_histograms/benchmark --benchmark-group-by=group,func,param:read_in_chunks_func libcloud/test/benchmarks/test_read_in_chunks.py
[testenv:import-timings]
setenv =