Merge branch 'deps-updates-2' into trunk
diff --git a/CHANGES.rst b/CHANGES.rst
index cffa856..970d5f6 100644
--- a/CHANGES.rst
+++ b/CHANGES.rst
@@ -4,6 +4,16 @@
 Changes in Apache Libcloud in development
 -----------------------------------------
 
+Storage
+~~~~~~~
+
+- Optimize ``read_in_chunks()`` function implementation.
+
+  This should result in a large performance speedup and lower memory usage when
+  uploading or downloading a large file with a mismatched chunk size.
+  (GITHUB-1847)
+  [Tobias Biester - @Tobi995]
+
 Changes in Apache Libcloud 3.7.0
 --------------------------------
 
diff --git a/libcloud/test/benchmarks/test_read_in_chunks.py b/libcloud/test/benchmarks/test_read_in_chunks.py
new file mode 100644
index 0000000..694e9fb
--- /dev/null
+++ b/libcloud/test/benchmarks/test_read_in_chunks.py
@@ -0,0 +1,155 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from typing import Tuple, Callable
+
+import pytest
+
+from libcloud.utils.py3 import b, next
+from libcloud.utils.files import CHUNK_SIZE, read_in_chunks
+
+
+def _old_read_in_chunks(iterator, chunk_size=None, fill_size=False, yield_empty=False):
+    """
+    Old implementation of read_in_chunks without performance optimizations from #1847.
+
+    It's only here so we can directly compare the performance of the old and the new version.
+    """
+    chunk_size = chunk_size or CHUNK_SIZE
+
+    try:
+        get_data = iterator.read
+        args = (chunk_size,)
+    except AttributeError:
+        get_data = next
+        args = (iterator,)
+
+    data = b("")
+    empty = False
+
+    while not empty or len(data) > 0:
+        if not empty:
+            try:
+                chunk = b(get_data(*args))
+                if len(chunk) > 0:
+                    data += chunk
+                else:
+                    empty = True
+            except StopIteration:
+                empty = True
+
+        if len(data) == 0:
+            if empty and yield_empty:
+                yield b("")
+
+            return
+
+        if fill_size:
+            if empty or len(data) >= chunk_size:
+                yield data[:chunk_size]
+                data = data[chunk_size:]
+        else:
+            yield data
+            data = b("")
+
+
+# fmt: off
+@pytest.mark.parametrize(
+    "data_chunk_size_tuple",
+    [
+        (b"c" * (40 * 1024 * 1024), 1 * 1024 * 1024),
+        (b"c" * (40 * 1024 * 1024), 5 * 1024 * 1024),
+        (b"c" * (80 * 1024 * 1024), 1 * 1024 * 1024),
+    ],
+    ids=[
+        "40mb_data_1mb_chunk_size",
+        "40mb_data_5mb_chunk_size",
+        "80mb_data_1mb_chunk_size",
+    ],
+)
+@pytest.mark.parametrize(
+    "read_in_chunks_func",
+    [
+        _old_read_in_chunks,
+        read_in_chunks,
+    ],
+    ids=[
+        "old",
+        "new",
+    ],
+)
+# fmt: on
+def test_scenario_1(
+    benchmark, data_chunk_size_tuple: Tuple[bytes, int], read_in_chunks_func: Callable
+):
+    # similar to calling _upload_multipart_chunks with one large array of bytes
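+    # (the whole payload is passed as a single iterator element, so the function under test has
+    # to split it into chunk_size sized pieces itself)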
+    data, chunk_size = data_chunk_size_tuple
+
+    def run_benchmark():
+        for _ in read_in_chunks_func(iter([data]), chunk_size=chunk_size, fill_size=True):
+            pass
+
+    benchmark(run_benchmark)
+
+
+# fmt: off
+# NOTE: Because the old implementation is very slow when there is a chunk size mismatch, we need to
+# use a smaller total object size to prevent this benchmark from running for a very long time.
+@pytest.mark.parametrize(
+    "data_chunk_size_tuple",
+    [
+        (b"c" * (10 * 1024 * 1024), 8 * 1024),
+        (b"c" * (20 * 1024 * 1024), 1 * 1024 * 1024),
+        (b"c" * (30 * 1024 * 1024), 1 * 1024 * 1024),
+    ],
+    ids=[
+        "10mb_data_8k_chunk_size",
+        "20mb_data_1mb_chunk_size",
+        "30mb_data_1mb_chunk_size",
+    ],
+)
+@pytest.mark.parametrize(
+    "read_in_chunks_func",
+    [
+        _old_read_in_chunks,
+        read_in_chunks,
+    ],
+    ids=[
+        "old",
+        "new",
+    ],
+)
+# fmt: on
+def test_scenario_2(
+    benchmark, data_chunk_size_tuple: Tuple[bytes, int], read_in_chunks_func: Callable
+):
+    # similar to calling _upload_multipart_chunks with data arriving from the iterator in 5 MB pieces
+    data, chunk_size = data_chunk_size_tuple
+    response_chunk = 5 * 1024 * 1024
+
+    # NOTE: It would be nice if we could also assert that the data has been chunked correctly,
+    # but that would add additional overhead (since we would also measure accumulating the data),
+    # so those checks are done separately in the unit tests.
+    def run_benchmark():
+        for a in read_in_chunks_func(
+            iter([data[i : i + response_chunk] for i in range(0, len(data), response_chunk)]),
+            chunk_size=chunk_size,
+            fill_size=True,
+        ):
+            pass
+
+    benchmark(run_benchmark)
diff --git a/libcloud/test/test_utils.py b/libcloud/test/test_utils.py
index c6fc161..395bfea 100644
--- a/libcloud/test/test_utils.py
+++ b/libcloud/test/test_utils.py
@@ -202,15 +202,42 @@
             for x in range(0, 1000):
                 yield "aa"
 
+        chunk_count = 0
         for result in libcloud.utils.files.read_in_chunks(
             iterator(), chunk_size=10, fill_size=False
         ):
+            chunk_count += 1
             self.assertEqual(result, b("aa"))
+        self.assertEqual(chunk_count, 1000)
 
+        chunk_count = 0
         for result in libcloud.utils.files.read_in_chunks(
             iterator(), chunk_size=10, fill_size=True
         ):
-            self.assertEqual(result, b("aaaaaaaaaa"))
+            chunk_count += 1
+            self.assertEqual(result, b("a") * 10)
+        self.assertEqual(chunk_count, 200)
+
+    def test_read_in_chunks_large_iterator_batches(self):
+        def iterator():
+            for x in range(0, 10):
+                yield "a" * 10_000
+
+        chunk_count = 0
+        for result in libcloud.utils.files.read_in_chunks(
+            iterator(), chunk_size=10, fill_size=False
+        ):
+            chunk_count += 1
+            self.assertEqual(result, b("a") * 10_000)
+        self.assertEqual(chunk_count, 10)
+
+        chunk_count = 0
+        for result in libcloud.utils.files.read_in_chunks(
+            iterator(), chunk_size=10, fill_size=True
+        ):
+            chunk_count += 1
+            self.assertEqual(result, b("a") * 10)
+        self.assertEqual(chunk_count, 10_000)
 
     def test_read_in_chunks_filelike(self):
         class FakeFile(file):
diff --git a/libcloud/utils/files.py b/libcloud/utils/files.py
index 3ab0bbc..fdf1ef7 100644
--- a/libcloud/utils/files.py
+++ b/libcloud/utils/files.py
@@ -15,6 +15,7 @@
 
 import os
 import mimetypes
+from typing import Generator
 
 from libcloud.utils.py3 import b, next
 
@@ -75,7 +76,11 @@
             return
 
         if fill_size:
-            if empty or len(data) >= chunk_size:
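+            # "yield from" delegates yielding of the full chunks to the helper below and binds
+            # its return value (the leftover tail of at most chunk_size bytes) back to data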
+            data = yield from _optimized_chunked_generator(data=data, chunk_size=chunk_size)
+            if empty:
+                # Yield the last, not completely filled chunk
                 yield data[:chunk_size]
                 data = data[chunk_size:]
         else:
@@ -83,6 +88,21 @@
             data = b("")
 
 
+def _optimized_chunked_generator(data: bytes, chunk_size: int) -> Generator[bytes, None, bytes]:
+    # We want to emit chunks of chunk_size bytes, but chunk_size can be larger or smaller than
+    # the pieces returned by get_data, so we yield in a loop to avoid large amounts of data
+    # piling up in memory. Tracking chunk_start instead of re-slicing the front of the buffer
+    # also avoids copying the remaining data once per emitted chunk.
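+    # For example, with data=b"abcdefg" and chunk_size=3 this yields b"abc" and b"def" and
+    # returns the leftover b"g", which the caller uses as the start of its next buffer.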
+    chunk_start = 0
+    while chunk_start + chunk_size < len(data):
+        yield data[chunk_start : chunk_start + chunk_size]
+        chunk_start += chunk_size
+    data = data[chunk_start:]
+    return data
+
+
 def exhaust_iterator(iterator):
     """
     Exhaust an iterator and return all data returned by it.
diff --git a/tox.ini b/tox.ini
index 47db058..efdf9ec 100644
--- a/tox.ini
+++ b/tox.ini
@@ -387,6 +387,7 @@
 commands =
     cp libcloud/test/secrets.py-dist libcloud/test/secrets.py
     pytest -s -v --timeout 60 --benchmark-only --benchmark-name=short --benchmark-columns=min,max,mean,stddev,median,ops,rounds --benchmark-histogram=benchmark_histograms/benchmark  --benchmark-group-by=group,param:sort_objects libcloud/test/benchmarks/test_list_objects_filtering_performance.py
+    pytest -s -v --timeout 60 --benchmark-only --benchmark-name=short --benchmark-columns=min,max,mean,stddev,median,ops,rounds --benchmark-histogram=benchmark_histograms/benchmark --benchmark-group-by=group,func,param:read_in_chunks_func libcloud/test/benchmarks/test_read_in_chunks.py
 
 [testenv:import-timings]
 setenv =