| % Licensed under the Apache License, Version 2.0 (the "License"); you may not |
| % use this file except in compliance with the License. You may obtain a copy of |
| % the License at |
| % |
| % http://www.apache.org/licenses/LICENSE-2.0 |
| % |
| % Unless required by applicable law or agreed to in writing, software |
| % distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| % WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| % License for the specific language governing permissions and limitations under |
| % the License. |
| |
| -module(couch_work_queue_tests). |
| |
| -include_lib("couch/include/couch_eunit.hrl"). |
| |
| -define(TIMEOUT, 100). |
| |
| |
%% Creates a work queue from Opts and spawns one producer and one consumer
%% process bound to it.
%% Returns {Queue, ProducerPid, ConsumerPid}; this is the fixture term handed
%% to every test instantiator and to teardown/1.
setup(Opts) ->
    {ok, Queue} = couch_work_queue:new(Opts),
    ProducerPid = spawn_producer(Queue),
    ConsumerPid = spawn_consumer(Queue),
    {Queue, ProducerPid, ConsumerPid}.
| |
%% Fixture: queue created with only the {max_items, 3} option.
setup_max_items() ->
    Opts = [{max_items, 3}],
    setup(Opts).
| |
%% Fixture: queue created with only the {max_size, 160} option.
setup_max_size() ->
    Opts = [{max_size, 160}],
    setup(Opts).
| |
%% Fixture: queue created with both the {max_size, 160} and {max_items, 3}
%% options.
setup_max_items_and_size() ->
    Opts = [{max_size, 160}, {max_items, 3}],
    setup(Opts).
| |
%% Fixture: queue with size and item limits plus {multi_workers, true},
%% served by three consumer processes.
%% Returns {Queue, ProducerPid, [Consumer1, Consumer2, Consumer3]}; the list
%% in third position is what lets test clauses and teardown/1 tell this
%% fixture apart from the single-consumer ones.
setup_multi_workers() ->
    Opts = [{max_size, 160}, {max_items, 3}, {multi_workers, true}],
    {Queue, ProducerPid, FirstConsumer} = setup(Opts),
    % setup/1 already spawned one consumer; add the remaining two
    ExtraConsumers = [spawn_consumer(Queue) || _ <- [2, 3]],
    {Queue, ProducerPid, [FirstConsumer | ExtraConsumers]}.
| |
%% Fixture cleanup. Accepts either a single consumer pid or a list of them
%% in third position; the single-pid form is wrapped into a list.
%% Crashes with badmatch if the queue, the producer or any consumer fails to
%% stop within ?TIMEOUT.
teardown({Queue, ProducerPid, ConsumerPids}) when is_list(ConsumerPids) ->
    % drain the queue first so blocked producer/consumers can stop without
    % running into the timeout
    lists:foreach(fun(Pid) -> consume(Pid, all) end, ConsumerPids),

    ok = close_queue(Queue),
    ok = stop(ProducerPid, "producer"),
    StopResults = lists:map(fun(Pid) -> stop(Pid, "consumer") end,
                            ConsumerPids),
    % every consumer must have acknowledged the stop request
    StopResults = lists:duplicate(length(ConsumerPids), ok),
    ok;
teardown({Queue, ProducerPid, ConsumerPid}) ->
    teardown({Queue, ProducerPid, [ConsumerPid]}).
| |
| |
%% EUnit generator for the single producer / single consumer scenarios.
%% Each group is a foreach fixture: a fresh queue, producer and consumer are
%% set up for every test case and torn down afterwards.
single_consumer_test_() ->
    MaxItemsGroup = {
        "Queue with 3 max items",
        {
            foreach,
            fun setup_max_items/0, fun teardown/1,
            single_consumer_max_item_count() ++ common_cases()
        }
    },
    MaxSizeGroup = {
        "Queue with max size of 160 bytes",
        {
            foreach,
            fun setup_max_size/0, fun teardown/1,
            single_consumer_max_size() ++ common_cases()
        }
    },
    MaxItemsAndSizeGroup = {
        "Queue with max size of 160 bytes and 3 max items",
        {
            foreach,
            fun setup_max_items_and_size/0, fun teardown/1,
            single_consumer_max_items_and_size() ++ common_cases()
        }
    },
    {
        "Single producer and consumer",
        [MaxItemsGroup, MaxSizeGroup, MaxItemsAndSizeGroup]
    }.
| |
%% EUnit generator for the single producer / multiple consumers scenario.
%% Uses a foreach fixture built on setup_multi_workers/0, so every case gets
%% a fresh queue with three consumers.
multiple_consumers_test_() ->
    Cases = common_cases() ++ multiple_consumers(),
    Fixture = {foreach, fun setup_multi_workers/0, fun teardown/1, Cases},
    {
        "Single producer and multiple consumers",
        [
            {"Queue with max size of 160 bytes and 3 max items", Fixture}
        ]
    }.
| |
%% Test instantiators shared by every fixture in this module, both the
%% single-consumer and the multi-consumer ones.
common_cases() ->
    [fun should_block_consumer_on_dequeue_from_empty_queue/1,
     fun should_consume_right_item/1,
     fun should_timeout_on_close_non_empty_queue/1,
     fun should_not_block_producer_for_non_empty_queue_after_close/1,
     fun should_be_closed/1].
| |
%% Test instantiators for single-consumer fixtures whose queue is bounded by
%% item count.
single_consumer_max_item_count() ->
    [fun should_have_no_items_for_new_queue/1,
     fun should_block_producer_on_full_queue_count/1,
     fun should_receive_first_queued_item/1,
     fun should_consume_multiple_items/1,
     fun should_consume_all/1].
| |
%% Test instantiators for single-consumer fixtures whose queue is bounded by
%% total size.
single_consumer_max_size() ->
    [fun should_have_zero_size_for_new_queue/1,
     fun should_block_producer_on_full_queue_size/1,
     fun should_increase_queue_size_on_produce/1,
     fun should_receive_first_queued_item/1,
     fun should_consume_multiple_items/1,
     fun should_consume_all/1].
| |
%% Test instantiators for the fixture bounded by both item count and size:
%% the item-count cases followed by the size cases. Cases present in both
%% lists therefore appear (and run) twice.
single_consumer_max_items_and_size() ->
    lists:append([single_consumer_max_item_count(),
                  single_consumer_max_size()]).
| |
%% Test instantiators that run only against the multi-consumer fixture, in
%% addition to common_cases/0.
multiple_consumers() ->
    [fun should_have_zero_size_for_new_queue/1,
     fun should_have_no_items_for_new_queue/1,
     fun should_increase_queue_size_on_produce/1].
| |
| |
%% A queue nothing has been produced into must report an item count of 0.
should_have_no_items_for_new_queue({Queue, _Producer, _Consumer}) ->
    ?_assertEqual(0, couch_work_queue:item_count(Queue)).
| |
%% A queue nothing has been produced into must report a size of 0.
should_have_zero_size_for_new_queue({Queue, _Producer, _Consumer}) ->
    ?_assertEqual(0, couch_work_queue:size(Queue)).
| |
%% Asks every consumer to dequeue from a queue nothing was produced into and
%% then pings it. The test passes when no consumer answers the ping within
%% ?TIMEOUT, i.e. each one is still busy inside its dequeue call.
%%
%% The list clause serves the multi-consumer fixture. Its expected value is
%% derived from the number of consumers rather than spelled out for exactly
%% three, the same way teardown/1 builds its expectation, so the test does
%% not silently depend on how many consumers the fixture spawns.
should_block_consumer_on_dequeue_from_empty_queue({_, _, Consumers}) when is_list(Consumers) ->
    [consume(C, 2) || C <- Consumers],
    Pongs = [ping(C) || C <- Consumers],
    Expected = [timeout || _ <- Consumers],
    ?_assertEqual(Expected, Pongs);
should_block_consumer_on_dequeue_from_empty_queue({_, _, Consumer}) ->
    consume(Consumer, 1),
    Pong = ping(Consumer),
    ?_assertEqual(timeout, Pong).
| |
%% Consumers that are already waiting in a dequeue call must be handed
%% produced items directly.
%%
%% Multi-consumer clause: every consumer asks for 3 items, then three items
%% are produced one at a time. After each one the producer must answer a
%% ping and the queue must report zero items and zero size. Finally every
%% consumer must answer a ping.
%% NOTE(review): the final assertion pairs each ping result with the item
%% that was produced, not with what the consumer actually dequeued, so only
%% the ping results are really being checked - confirm whether that is
%% intended.
%%
%% Single-consumer clause: the consumer asks for 1 item, two items are
%% produced, and the consumer's last dequeue result must be exactly the
%% first one.
should_consume_right_item({Queue, ProducerPid, ConsumerPids}) when is_list(ConsumerPids) ->
    [consume(Pid, 3) || Pid <- ConsumerPids],

    ProduceAndCheckDrained = fun() ->
        Produced = produce(Queue, ProducerPid, 10, false),
        ok = ping(ProducerPid),
        ?assertEqual(0, couch_work_queue:item_count(Queue)),
        ?assertEqual(0, couch_work_queue:size(Queue)),
        Produced
    end,
    Items = [ProduceAndCheckDrained() || _ <- [1, 2, 3]],

    Pinged = [{ping(Pid), Item}
              || {Pid, Item} <- lists:zip(ConsumerPids, Items)],

    ?_assertEqual([{ok, Item} || Item <- Items], Pinged);
should_consume_right_item({Queue, ProducerPid, ConsumerPid}) ->
    consume(ConsumerPid, 1),
    FirstItem = produce(Queue, ProducerPid, 10, false),
    _SecondItem = produce(Queue, ProducerPid, 20, true),
    ok = ping(ProducerPid),
    ok = ping(ConsumerPid),
    {ok, Dequeued} = last_consumer_items(ConsumerPid),
    ?_assertEqual([FirstItem], Dequeued).
| |
%% Produces a 50-byte and then a 10-byte item without consuming anything and
%% checks that the queue reports {1 item, 50 bytes} and then
%% {2 items, 60 bytes}.
%%
%% ?_assertEqual takes the expected value first and the tested expression
%% second; the literals are therefore passed first so that a failure report
%% labels "expected" and "value" correctly.
should_increase_queue_size_on_produce({Q, Producer, _}) ->
    produce(Q, Producer, 50, true),
    ok = ping(Producer),
    Count1 = couch_work_queue:item_count(Q),
    Size1 = couch_work_queue:size(Q),

    % Wait = true makes produce/4 return only once the item count has grown
    produce(Q, Producer, 10, true),
    Count2 = couch_work_queue:item_count(Q),
    Size2 = couch_work_queue:size(Q),

    ?_assertEqual([{1, 50}, {2, 60}], [{Count1, Size1}, {Count2, Size2}]).
| |
%% Produces three items into a queue nobody consumes from. The producer must
%% still answer a ping after the first and second item, and must fail to
%% answer within ?TIMEOUT after the third, at which point the queue reports
%% 3 items.
should_block_producer_on_full_queue_count({Queue, ProducerPid, _}) ->
    ProduceAndCount = fun(Size, ExpectedCount) ->
        produce(Queue, ProducerPid, Size, true),
        ?assertEqual(ExpectedCount, couch_work_queue:item_count(Queue))
    end,

    ProduceAndCount(10, 1),
    ok = ping(ProducerPid),

    ProduceAndCount(15, 2),
    ok = ping(ProducerPid),

    ProduceAndCount(20, 3),
    LastPong = ping(ProducerPid),

    ?_assertEqual(timeout, LastPong).
| |
%% Produces a 100-byte item (producer must stay responsive) and then a
%% 110-byte item into a queue nobody consumes from. After the second item
%% the queue must report 2 items / 210 bytes and the producer must fail to
%% answer a ping within ?TIMEOUT.
should_block_producer_on_full_queue_size({Queue, ProducerPid, _}) ->
    produce(Queue, ProducerPid, 100, true),
    ok = ping(ProducerPid),
    ?assertEqual(1, couch_work_queue:item_count(Queue)),
    ?assertEqual(100, couch_work_queue:size(Queue)),

    % not waiting for the count to change here; the timed-out ping below
    % runs before the counters are read
    produce(Queue, ProducerPid, 110, false),
    PongAfterOverflow = ping(ProducerPid),
    ?assertEqual(2, couch_work_queue:item_count(Queue)),
    ?assertEqual(210, couch_work_queue:size(Queue)),

    ?_assertEqual(timeout, PongAfterOverflow).
| |
%% Queues two items, then has the consumer dequeue 2 at once; the consumer's
%% last dequeue result must be both items in production order.
should_consume_multiple_items({Queue, ProducerPid, ConsumerPid}) ->
    First = produce(Queue, ProducerPid, 10, true),
    ok = ping(ProducerPid),

    Second = produce(Queue, ProducerPid, 15, true),
    ok = ping(ProducerPid),

    consume(ConsumerPid, 2),

    {ok, Dequeued} = last_consumer_items(ConsumerPid),
    ?_assertEqual([First, Second], Dequeued).
| |
%% The consumer asks for up to 100 items while the queue is empty and must
%% not answer a ping. Once a single item is produced, producer and consumer
%% must both answer pings, the queue must be empty again, and the consumer's
%% last dequeue result must be just that one item.
should_receive_first_queued_item({Queue, ProducerPid, ConsumerPid}) ->
    consume(ConsumerPid, 100),
    timeout = ping(ConsumerPid),

    Produced = produce(Queue, ProducerPid, 11, false),
    ok = ping(ProducerPid),

    ok = ping(ConsumerPid),
    ?assertEqual(0, couch_work_queue:item_count(Queue)),

    {ok, Dequeued} = last_consumer_items(ConsumerPid),
    ?_assertEqual([Produced], Dequeued).
| |
%% Queues three items (10, 15 and 20 bytes), then has the consumer dequeue
%% with `all`; the result must be the three items in production order.
should_consume_all({Queue, ProducerPid, ConsumerPid}) ->
    Produced = [produce(Queue, ProducerPid, Size, true)
                || Size <- [10, 15, 20]],

    consume(ConsumerPid, all),

    {ok, Dequeued} = last_consumer_items(ConsumerPid),
    ?_assertEqual(Produced, Dequeued).
| |
%% Closing a queue that still holds an item must not complete within
%% ?TIMEOUT: close_queue/1 is expected to return `timeout`.
should_timeout_on_close_non_empty_queue({Queue, ProducerPid, _}) ->
    produce(Queue, ProducerPid, 1, true),
    CloseResult = close_queue(Queue),

    ?_assertEqual(timeout, CloseResult).
| |
%% After a close request on a queue that still holds one 1-byte item, the
%% producer must answer a ping and the queue must still report size 1 and
%% item count 1. The result of the close request itself is ignored here.
should_not_block_producer_for_non_empty_queue_after_close({Queue, ProducerPid, _}) ->
    produce(Queue, ProducerPid, 1, true),
    close_queue(Queue),
    ProducerPong = ping(ProducerPid),
    QueueSize = couch_work_queue:size(Queue),
    QueueCount = couch_work_queue:item_count(Queue),

    ?_assertEqual({ok, 1, 1}, {ProducerPong, QueueSize, QueueCount}).
| |
%% Closes an empty queue (which must succeed) and checks that afterwards
%% every dequeue attempt, as well as item_count/1 and size/1, yields
%% `closed`.
%%
%% The list clause serves the multi-consumer fixture. Its expected value is
%% derived from the number of consumers rather than spelled out for exactly
%% three, the same way teardown/1 builds its expectation.
should_be_closed({Q, _, Consumers}) when is_list(Consumers) ->
    ok = close_queue(Q),

    [consume(C, 1) || C <- Consumers],

    LastConsumerItems = [last_consumer_items(C) || C <- Consumers],
    ItemsCount = couch_work_queue:item_count(Q),
    Size = couch_work_queue:size(Q),

    ExpectedItems = [closed || _ <- Consumers],
    ?_assertEqual({ExpectedItems, closed, closed},
                  {LastConsumerItems, ItemsCount, Size});
should_be_closed({Q, _, Consumer}) ->
    ok = close_queue(Q),

    consume(Consumer, 1),

    LastConsumerItems = last_consumer_items(Consumer),
    ItemsCount = couch_work_queue:item_count(Q),
    Size = couch_work_queue:size(Q),

    ?_assertEqual({closed, closed, closed},
                  {LastConsumerItems, ItemsCount, Size}).
| |
| |
%% Requests the queue to close via test_util:stop_sync/3, bounded by
%% ?TIMEOUT. Returns whatever stop_sync/3 returns; callers in this module
%% match it against `ok` and `timeout`.
close_queue(Q) ->
    RequestClose = fun() ->
        ok = couch_work_queue:close(Q)
    end,
    test_util:stop_sync(Q, RequestClose, ?TIMEOUT).
| |
%% Spawns a consumer process for queue Q. The calling process is recorded as
%% the recipient of the consumer's replies; the initial "last dequeue
%% result" is `nil`.
spawn_consumer(Q) ->
    Owner = self(),
    Loop = fun() -> consumer_loop(Owner, Q, nil) end,
    spawn(Loop).
| |
%% Message loop run by every consumer process. PrevItem holds the result of
%% the most recent dequeue call.
%%   {consume, N}     - calls couch_work_queue:dequeue(Q, N) and keeps the
%%                      result as the new state; no other message is handled
%%                      until that call returns
%%   {last_item, Ref} - sends {item, Ref, PrevItem} to Parent
%%   {ping, Ref}      - sends {pong, Ref} to Parent
%%   {stop, Ref}      - sends {ok, Ref} to Parent and ends the loop
consumer_loop(Parent, Q, PrevItem) ->
    receive
        {consume, Count} ->
            consumer_loop(Parent, Q, couch_work_queue:dequeue(Q, Count));
        {last_item, Tag} ->
            Parent ! {item, Tag, PrevItem},
            consumer_loop(Parent, Q, PrevItem);
        {ping, Tag} ->
            Parent ! {pong, Tag},
            consumer_loop(Parent, Q, PrevItem);
        {stop, Tag} ->
            Parent ! {ok, Tag}
    end.
| |
%% Spawns a producer process for queue Q. The calling process is recorded as
%% the recipient of the producer's replies.
spawn_producer(Q) ->
    Owner = self(),
    Loop = fun() -> producer_loop(Owner, Q) end,
    spawn(Loop).
| |
%% Message loop run by the producer process.
%%   {stop, Ref}          - sends {ok, Ref} to Parent and ends the loop
%%   {ping, Ref}          - sends {pong, Ref} to Parent
%%   {produce, Ref, Size} - generates Size random bytes, reports them to
%%                          Parent as {item, Ref, Item} and only then queues
%%                          them; no other message is handled until
%%                          couch_work_queue:queue/2 returns
%%
%% Random payloads come from crypto:strong_rand_bytes/1. The previously used
%% crypto:rand_bytes/1 is deprecated and no longer exists in OTP 20 and
%% later, where calling it fails with undef.
producer_loop(Parent, Q) ->
    receive
        {stop, Ref} ->
            Parent ! {ok, Ref};
        {ping, Ref} ->
            Parent ! {pong, Ref},
            producer_loop(Parent, Q);
        {produce, Ref, Size} ->
            Item = crypto:strong_rand_bytes(Size),
            % report the item before queueing it: the queue call may not
            % return for a while and the test still needs to know the item
            Parent ! {item, Ref, Item},
            ok = couch_work_queue:queue(Q, Item),
            producer_loop(Parent, Q)
    end.
| |
%% Asynchronously asks Consumer to dequeue N items (N may also be `all`).
%% Returns the message that was sent; nothing is awaited.
consume(Consumer, N) ->
    erlang:send(Consumer, {consume, N}).
| |
%% Asks Consumer for the result of its most recent dequeue call.
%% Returns that result as stored by the consumer, or `timeout` if no reply
%% arrives within ?TIMEOUT.
last_consumer_items(Consumer) ->
    Tag = make_ref(),
    erlang:send(Consumer, {last_item, Tag}),
    receive
        {item, Tag, LastResult} -> LastResult
    after ?TIMEOUT ->
        timeout
    end.
| |
%% Asks Producer to generate and queue an item of Size bytes and returns the
%% generated item.
%% When Wait is `true`, additionally waits until the queue's item count has
%% grown past the value sampled before the request was sent (crashing with
%% badmatch if wait_increment/2 does not return ok).
%% Raises an assertion_failed error if the producer does not report the item
%% within ?TIMEOUT.
produce(Q, Producer, Size, Wait) ->
    Tag = make_ref(),
    % sample the count before sending so the increment can be detected
    CountBefore = couch_work_queue:item_count(Q),
    erlang:send(Producer, {produce, Tag, Size}),
    receive
        {item, Tag, Item} ->
            case Wait of
                true ->
                    ok = wait_increment(Q, CountBefore),
                    Item;
                _ ->
                    Item
            end
    after ?TIMEOUT ->
        erlang:error({assertion_failed,
                      [{module, ?MODULE},
                       {line, ?LINE},
                       {reason, "Timeout asking producer to produce an item"}]})
    end.
| |
%% Liveness probe for a producer or consumer process.
%% Returns `ok` if Pid answers with a matching pong within ?TIMEOUT,
%% otherwise `timeout`. The tests use a timeout as evidence that the process
%% is busy in a queue call.
ping(Pid) ->
    Tag = make_ref(),
    erlang:send(Pid, {ping, Tag}),
    receive
        {pong, Tag} -> ok
    after ?TIMEOUT ->
        timeout
    end.
| |
%% Asks a producer or consumer process to leave its loop.
%% Returns `ok` once the process acknowledges, or `timeout` (after printing
%% a debug message that includes Name) if no acknowledgement arrives within
%% ?TIMEOUT.
stop(Pid, Name) ->
    Tag = make_ref(),
    erlang:send(Pid, {stop, Tag}),
    receive
        {ok, Tag} ->
            ok
    after ?TIMEOUT ->
        ?debugMsg("Timeout stopping " ++ Name),
        timeout
    end.
| |
%% Waits, via test_util:wait/1, until the queue's item count compares
%% greater than ItemsCount. The check fun returns `wait` while the count has
%% not grown and `ok` once it has; the overall result is whatever
%% test_util:wait/1 returns (the caller in this module matches it against
%% `ok`).
wait_increment(Q, ItemsCount) ->
    HasGrown = fun() ->
        Current = couch_work_queue:item_count(Q),
        if
            Current > ItemsCount -> ok;
            true -> wait
        end
    end,
    test_util:wait(HasGrown).