blob: 96be7da59753e3684b090aaf48766ea39c467ba4 [file] [log] [blame]
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
% http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.
-module(couch_work_queue_tests).
-include_lib("couch/include/couch_eunit.hrl").
-define(TIMEOUT, 100).
setup(Opts) ->
{ok, Q} = couch_work_queue:new(Opts),
Producer = spawn_producer(Q),
Consumer = spawn_consumer(Q),
{Q, Producer, Consumer}.
setup_max_items() ->
setup([{max_items, 3}]).
setup_max_size() ->
setup([{max_size, 160}]).
setup_max_items_and_size() ->
setup([{max_size, 160}, {max_items, 3}]).
setup_multi_workers() ->
{Q, Producer, Consumer1} = setup([{max_size, 160},
{max_items, 3},
{multi_workers, true}]),
Consumer2 = spawn_consumer(Q),
Consumer3 = spawn_consumer(Q),
{Q, Producer, [Consumer1, Consumer2, Consumer3]}.
teardown({Q, Producer, Consumers}) when is_list(Consumers) ->
% consume all to unblock and let producer/consumer stop without timeout
[consume(Consumer, all) || Consumer <- Consumers],
ok = close_queue(Q),
ok = stop(Producer, "producer"),
R = [stop(Consumer, "consumer") || Consumer <- Consumers],
R = [ok || _ <- Consumers],
ok;
teardown({Q, Producer, Consumer}) ->
teardown({Q, Producer, [Consumer]}).
single_consumer_test_() ->
{
"Single producer and consumer",
[
{
"Queue with 3 max items",
{
foreach,
fun setup_max_items/0, fun teardown/1,
single_consumer_max_item_count() ++ common_cases()
}
},
{
"Queue with max size of 160 bytes",
{
foreach,
fun setup_max_size/0, fun teardown/1,
single_consumer_max_size() ++ common_cases()
}
},
{
"Queue with max size of 160 bytes and 3 max items",
{
foreach,
fun setup_max_items_and_size/0, fun teardown/1,
single_consumer_max_items_and_size() ++ common_cases()
}
}
]
}.
multiple_consumers_test_() ->
{
"Single producer and multiple consumers",
[
{
"Queue with max size of 160 bytes and 3 max items",
{
foreach,
fun setup_multi_workers/0, fun teardown/1,
common_cases() ++ multiple_consumers()
}
}
]
}.
common_cases()->
[
fun should_block_consumer_on_dequeue_from_empty_queue/1,
fun should_consume_right_item/1,
fun should_timeout_on_close_non_empty_queue/1,
fun should_not_block_producer_for_non_empty_queue_after_close/1,
fun should_be_closed/1
].
single_consumer_max_item_count()->
[
fun should_have_no_items_for_new_queue/1,
fun should_block_producer_on_full_queue_count/1,
fun should_receive_first_queued_item/1,
fun should_consume_multiple_items/1,
fun should_consume_all/1
].
single_consumer_max_size()->
[
fun should_have_zero_size_for_new_queue/1,
fun should_block_producer_on_full_queue_size/1,
fun should_increase_queue_size_on_produce/1,
fun should_receive_first_queued_item/1,
fun should_consume_multiple_items/1,
fun should_consume_all/1
].
single_consumer_max_items_and_size() ->
single_consumer_max_item_count() ++ single_consumer_max_size().
multiple_consumers() ->
[
fun should_have_zero_size_for_new_queue/1,
fun should_have_no_items_for_new_queue/1,
fun should_increase_queue_size_on_produce/1
].
should_have_no_items_for_new_queue({Q, _, _}) ->
?_assertEqual(0, couch_work_queue:item_count(Q)).
should_have_zero_size_for_new_queue({Q, _, _}) ->
?_assertEqual(0, couch_work_queue:size(Q)).
should_block_consumer_on_dequeue_from_empty_queue({_, _, Consumers}) when is_list(Consumers) ->
[consume(C, 2) || C <- Consumers],
Pongs = [ping(C) || C <- Consumers],
?_assertEqual([timeout, timeout, timeout], Pongs);
should_block_consumer_on_dequeue_from_empty_queue({_, _, Consumer}) ->
consume(Consumer, 1),
Pong = ping(Consumer),
?_assertEqual(timeout, Pong).
should_consume_right_item({Q, Producer, Consumers}) when is_list(Consumers) ->
[consume(C, 3) || C <- Consumers],
Item1 = produce(Q, Producer, 10, false),
ok = ping(Producer),
?assertEqual(0, couch_work_queue:item_count(Q)),
?assertEqual(0, couch_work_queue:size(Q)),
Item2 = produce(Q, Producer, 10, false),
ok = ping(Producer),
?assertEqual(0, couch_work_queue:item_count(Q)),
?assertEqual(0, couch_work_queue:size(Q)),
Item3 = produce(Q, Producer, 10, false),
ok = ping(Producer),
?assertEqual(0, couch_work_queue:item_count(Q)),
?assertEqual(0, couch_work_queue:size(Q)),
R = [{ping(C), Item}
|| {C, Item} <- lists:zip(Consumers, [Item1, Item2, Item3])],
?_assertEqual([{ok, Item1}, {ok, Item2}, {ok, Item3}], R);
should_consume_right_item({Q, Producer, Consumer}) ->
consume(Consumer, 1),
Item = produce(Q, Producer, 10, false),
produce(Q, Producer, 20, true),
ok = ping(Producer),
ok = ping(Consumer),
{ok, Items} = last_consumer_items(Consumer),
?_assertEqual([Item], Items).
should_increase_queue_size_on_produce({Q, Producer, _}) ->
produce(Q, Producer, 50, true),
ok = ping(Producer),
Count1 = couch_work_queue:item_count(Q),
Size1 = couch_work_queue:size(Q),
produce(Q, Producer, 10, true),
Count2 = couch_work_queue:item_count(Q),
Size2 = couch_work_queue:size(Q),
?_assertEqual([{Count1, Size1}, {Count2, Size2}], [{1, 50}, {2, 60}]).
should_block_producer_on_full_queue_count({Q, Producer, _}) ->
produce(Q, Producer, 10, true),
?assertEqual(1, couch_work_queue:item_count(Q)),
ok = ping(Producer),
produce(Q, Producer, 15, true),
?assertEqual(2, couch_work_queue:item_count(Q)),
ok = ping(Producer),
produce(Q, Producer, 20, false),
?assertEqual(3, couch_work_queue:item_count(Q)),
Pong = ping(Producer),
?_assertEqual(timeout, Pong).
should_block_producer_on_full_queue_size({Q, Producer, _}) ->
produce(Q, Producer, 100, true),
ok = ping(Producer),
?assertEqual(1, couch_work_queue:item_count(Q)),
?assertEqual(100, couch_work_queue:size(Q)),
produce(Q, Producer, 110, false),
Pong = ping(Producer),
?assertEqual(2, couch_work_queue:item_count(Q)),
?assertEqual(210, couch_work_queue:size(Q)),
?_assertEqual(timeout, Pong).
should_consume_multiple_items({Q, Producer, Consumer}) ->
Item1 = produce(Q, Producer, 10, true),
ok = ping(Producer),
Item2 = produce(Q, Producer, 15, true),
ok = ping(Producer),
consume(Consumer, 2),
{ok, Items} = last_consumer_items(Consumer),
?_assertEqual([Item1, Item2], Items).
should_receive_first_queued_item({Q, Producer, Consumer}) ->
consume(Consumer, 100),
timeout = ping(Consumer),
Item = produce(Q, Producer, 11, false),
ok = ping(Producer),
ok = ping(Consumer),
?assertEqual(0, couch_work_queue:item_count(Q)),
{ok, Items} = last_consumer_items(Consumer),
?_assertEqual([Item], Items).
should_consume_all({Q, Producer, Consumer}) ->
Item1 = produce(Q, Producer, 10, true),
Item2 = produce(Q, Producer, 15, true),
Item3 = produce(Q, Producer, 20, true),
consume(Consumer, all),
{ok, Items} = last_consumer_items(Consumer),
?_assertEqual([Item1, Item2, Item3], Items).
should_timeout_on_close_non_empty_queue({Q, Producer, _}) ->
produce(Q, Producer, 1, true),
Status = close_queue(Q),
?_assertEqual(timeout, Status).
should_not_block_producer_for_non_empty_queue_after_close({Q, Producer, _}) ->
produce(Q, Producer, 1, true),
close_queue(Q),
Pong = ping(Producer),
Size = couch_work_queue:size(Q),
Count = couch_work_queue:item_count(Q),
?_assertEqual({ok, 1, 1}, {Pong, Size, Count}).
should_be_closed({Q, _, Consumers}) when is_list(Consumers) ->
ok = close_queue(Q),
[consume(C, 1) || C <- Consumers],
LastConsumerItems = [last_consumer_items(C) || C <- Consumers],
ItemsCount = couch_work_queue:item_count(Q),
Size = couch_work_queue:size(Q),
?_assertEqual({[closed, closed, closed], closed, closed},
{LastConsumerItems, ItemsCount, Size});
should_be_closed({Q, _, Consumer}) ->
ok = close_queue(Q),
consume(Consumer, 1),
LastConsumerItems = last_consumer_items(Consumer),
ItemsCount = couch_work_queue:item_count(Q),
Size = couch_work_queue:size(Q),
?_assertEqual({closed, closed, closed},
{LastConsumerItems, ItemsCount, Size}).
close_queue(Q) ->
test_util:stop_sync(Q, fun() ->
ok = couch_work_queue:close(Q)
end, ?TIMEOUT).
spawn_consumer(Q) ->
Parent = self(),
spawn(fun() -> consumer_loop(Parent, Q, nil) end).
consumer_loop(Parent, Q, PrevItem) ->
receive
{stop, Ref} ->
Parent ! {ok, Ref};
{ping, Ref} ->
Parent ! {pong, Ref},
consumer_loop(Parent, Q, PrevItem);
{last_item, Ref} ->
Parent ! {item, Ref, PrevItem},
consumer_loop(Parent, Q, PrevItem);
{consume, N} ->
Result = couch_work_queue:dequeue(Q, N),
consumer_loop(Parent, Q, Result)
end.
spawn_producer(Q) ->
Parent = self(),
spawn(fun() -> producer_loop(Parent, Q) end).
producer_loop(Parent, Q) ->
receive
{stop, Ref} ->
Parent ! {ok, Ref};
{ping, Ref} ->
Parent ! {pong, Ref},
producer_loop(Parent, Q);
{produce, Ref, Size} ->
Item = crypto:rand_bytes(Size),
Parent ! {item, Ref, Item},
ok = couch_work_queue:queue(Q, Item),
producer_loop(Parent, Q)
end.
consume(Consumer, N) ->
Consumer ! {consume, N}.
last_consumer_items(Consumer) ->
Ref = make_ref(),
Consumer ! {last_item, Ref},
receive
{item, Ref, Items} ->
Items
after ?TIMEOUT ->
timeout
end.
produce(Q, Producer, Size, Wait) ->
Ref = make_ref(),
ItemsCount = couch_work_queue:item_count(Q),
Producer ! {produce, Ref, Size},
receive
{item, Ref, Item} when Wait ->
ok = wait_increment(Q, ItemsCount),
Item;
{item, Ref, Item} ->
Item
after ?TIMEOUT ->
erlang:error({assertion_failed,
[{module, ?MODULE},
{line, ?LINE},
{reason, "Timeout asking producer to produce an item"}]})
end.
ping(Pid) ->
Ref = make_ref(),
Pid ! {ping, Ref},
receive
{pong, Ref} ->
ok
after ?TIMEOUT ->
timeout
end.
stop(Pid, Name) ->
Ref = make_ref(),
Pid ! {stop, Ref},
receive
{ok, Ref} -> ok
after ?TIMEOUT ->
?debugMsg("Timeout stopping " ++ Name),
timeout
end.
wait_increment(Q, ItemsCount) ->
test_util:wait(fun() ->
case couch_work_queue:item_count(Q) > ItemsCount of
true ->
ok;
false ->
wait
end
end).