blob: 3156d42d24b1d90c045169e2ee37d45150baf422 [file]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from typing import AsyncGenerator, Generator
from burr.common.async_utils import arealize, asyncify_generator
async def test_asyncify_generator_with_sync_generator():
def sync_gen() -> Generator[int, None, None]:
for i in range(5):
yield i
generator = sync_gen()
async_generator = asyncify_generator(generator)
result = [item async for item in async_generator]
assert result == [0, 1, 2, 3, 4], "Should convert sync generator to async generator"
async def test_asyncify_generator_with_async_generator():
async def async_gen() -> AsyncGenerator[int, None]:
for i in range(5):
yield i
generator = async_gen()
async_generator = asyncify_generator(generator)
result = [item async for item in async_generator]
assert result == [0, 1, 2, 3, 4], "Should pass through async generator unchanged"
# Test cases for arealize
async def test_arealize_with_sync_generator():
def sync_gen() -> Generator[int, None, None]:
for i in range(5):
yield i
generator = sync_gen()
result = await arealize(generator)
assert result == [0, 1, 2, 3, 4], "Should convert sync generator to list"
async def test_arealize_with_async_generator():
async def async_gen() -> AsyncGenerator[int, None]:
for i in range(5):
yield i
generator = async_gen()
result = await arealize(generator)
assert result == [0, 1, 2, 3, 4], "Should realize async generator to list"
# Edge cases
async def test_asyncify_generator_empty_sync_generator():
def sync_gen() -> Generator[int, None, None]:
if False: # No items to yield
yield
generator = sync_gen()
async_generator = asyncify_generator(generator)
result = [item async for item in async_generator]
assert result == [], "Should handle empty sync generator"
async def test_arealize_empty_sync_generator():
def sync_gen() -> Generator[int, None, None]:
if False: # No items to yield
yield
generator = sync_gen()
result = await arealize(generator)
assert result == [], "Should handle empty sync generator to list"
async def test_asyncify_generator_empty_async_generator():
async def async_gen() -> AsyncGenerator[int, None]:
if False: # No items to yield
yield
generator = async_gen()
async_generator = asyncify_generator(generator)
result = [item async for item in async_generator]
assert result == [], "Should handle empty async generator"
async def test_arealize_empty_async_generator():
async def async_gen() -> AsyncGenerator[int, None]:
if False: # No items to yield
yield
generator = async_gen()
result = await arealize(generator)
assert result == [], "Should handle empty async generator to list"