blob: 8594a6f8787b23e0062734df519c947ebc3e5af7 [file] [log] [blame]
#!/usr/bin/env escript
%% -*- erlang -*-
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
% http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.
main(_) ->
test_util:init_code_path(),
etap:plan(155),
case (catch test()) of
ok ->
etap:end_tests();
Other ->
etap:diag(io_lib:format("Test died abnormally: ~p", [Other])),
etap:bail(Other)
end,
ok.
test() ->
ok = crypto:start(),
test_single_consumer_max_item_count(),
test_single_consumer_max_size(),
test_single_consumer_max_item_count_and_size(),
test_multiple_consumers(),
ok.
test_single_consumer_max_item_count() ->
etap:diag("Spawning a queue with 3 max items, 1 producer and 1 consumer"),
{ok, Q} = couch_work_queue:new([{max_items, 3}]),
Producer = spawn_producer(Q),
Consumer = spawn_consumer(Q),
etap:is(couch_work_queue:item_count(Q), 0, "Queue item count is 0"),
consume(Consumer, 1),
etap:is(ping(Consumer), timeout,
"Consumer blocked when attempting to dequeue 1 item from empty queue"),
Item1 = produce(Producer, 10),
etap:is(ping(Producer), ok, "Producer not blocked"),
etap:is(ping(Consumer), ok, "Consumer unblocked"),
etap:is(last_consumer_items(Consumer), {ok, [Item1]},
"Consumer received the right item"),
Item2 = produce(Producer, 20),
etap:is(ping(Producer), ok, "Producer not blocked with non full queue"),
etap:is(couch_work_queue:item_count(Q), 1, "Queue item count is 1"),
Item3 = produce(Producer, 15),
etap:is(ping(Producer), ok, "Producer not blocked with non full queue"),
etap:is(couch_work_queue:item_count(Q), 2, "Queue item count is 2"),
Item4 = produce(Producer, 3),
etap:is(couch_work_queue:item_count(Q), 3, "Queue item count is 3"),
etap:is(ping(Producer), timeout, "Producer blocked with full queue"),
consume(Consumer, 2),
etap:is(ping(Consumer), ok,
"Consumer not blocked when attempting to dequeue 2 items from queue"),
etap:is(last_consumer_items(Consumer), {ok, [Item2, Item3]},
"Consumer received the right items"),
etap:is(couch_work_queue:item_count(Q), 1, "Queue item count is 1"),
consume(Consumer, 2),
etap:is(ping(Consumer), ok,
"Consumer not blocked when attempting to dequeue 2 items from queue"),
etap:is(last_consumer_items(Consumer), {ok, [Item4]},
"Consumer received the right item"),
etap:is(couch_work_queue:item_count(Q), 0, "Queue item count is 0"),
consume(Consumer, 100),
etap:is(ping(Consumer), timeout,
"Consumer blocked when attempting to dequeue 100 items from empty queue"),
etap:is(couch_work_queue:item_count(Q), 0, "Queue item count is 0"),
Item5 = produce(Producer, 11),
etap:is(ping(Producer), ok, "Producer not blocked with empty queue"),
etap:is(couch_work_queue:item_count(Q), 0, "Queue item count is 0"),
Item6 = produce(Producer, 19),
etap:is(ping(Producer), ok, "Producer not blocked with non full queue"),
etap:is(couch_work_queue:item_count(Q), 1, "Queue item count is 1"),
Item7 = produce(Producer, 2),
etap:is(ping(Producer), ok, "Producer not blocked with non full queue"),
etap:is(couch_work_queue:item_count(Q), 2, "Queue item count is 2"),
Item8 = produce(Producer, 33),
etap:is(ping(Producer), timeout, "Producer blocked with full queue"),
etap:is(couch_work_queue:item_count(Q), 3, "Queue item count is 3"),
etap:is(ping(Consumer), ok, "Consumer unblocked"),
etap:is(last_consumer_items(Consumer), {ok, [Item5]},
"Consumer received the first queued item"),
etap:is(couch_work_queue:item_count(Q), 3, "Queue item count is 3"),
consume(Consumer, all),
etap:is(ping(Consumer), ok,
"Consumer not blocked when attempting to dequeue all items from queue"),
etap:is(last_consumer_items(Consumer), {ok, [Item6, Item7, Item8]},
"Consumer received all queued items"),
etap:is(couch_work_queue:item_count(Q), 0, "Queue item count is 0"),
etap:is(close_queue(Q), ok, "Closed queue"),
consume(Consumer, 1),
etap:is(last_consumer_items(Consumer), closed, "Consumer got closed queue"),
etap:is(couch_work_queue:item_count(Q), closed, "Queue closed"),
etap:is(couch_work_queue:size(Q), closed, "Queue closed"),
stop(Producer, "producer"),
stop(Consumer, "consumer").
test_single_consumer_max_size() ->
etap:diag("Spawning a queue with max size of 160 bytes, "
"1 producer and 1 consumer"),
{ok, Q} = couch_work_queue:new([{max_size, 160}]),
Producer = spawn_producer(Q),
Consumer = spawn_consumer(Q),
etap:is(couch_work_queue:item_count(Q), 0, "Queue item count is 0"),
etap:is(couch_work_queue:size(Q), 0, "Queue size is 0 bytes"),
consume(Consumer, 1),
etap:is(ping(Consumer), timeout,
"Consumer blocked when attempting to dequeue 1 item from empty queue"),
Item1 = produce(Producer, 50),
etap:is(ping(Producer), ok, "Producer not blocked"),
etap:is(ping(Consumer), ok, "Consumer unblocked"),
etap:is(last_consumer_items(Consumer), {ok, [Item1]},
"Consumer received the right item"),
etap:is(couch_work_queue:item_count(Q), 0, "Queue item count is 0"),
etap:is(couch_work_queue:size(Q), 0, "Queue size is 0 bytes"),
Item2 = produce(Producer, 50),
etap:is(ping(Producer), ok, "Producer not blocked"),
etap:is(couch_work_queue:item_count(Q), 1, "Queue item count is 1"),
etap:is(couch_work_queue:size(Q), 50, "Queue size is 50 bytes"),
Item3 = produce(Producer, 50),
etap:is(ping(Producer), ok, "Producer not blocked"),
etap:is(couch_work_queue:item_count(Q), 2, "Queue item count is 2"),
etap:is(couch_work_queue:size(Q), 100, "Queue size is 100 bytes"),
Item4 = produce(Producer, 61),
etap:is(ping(Producer), timeout, "Producer blocked"),
etap:is(couch_work_queue:item_count(Q), 3, "Queue item count is 3"),
etap:is(couch_work_queue:size(Q), 161, "Queue size is 161 bytes"),
consume(Consumer, 1),
etap:is(ping(Consumer), ok,
"Consumer not blocked when attempting to dequeue 1 item from full queue"),
etap:is(last_consumer_items(Consumer), {ok, [Item2]},
"Consumer received the right item"),
etap:is(couch_work_queue:item_count(Q), 2, "Queue item count is 2"),
etap:is(couch_work_queue:size(Q), 111, "Queue size is 111 bytes"),
Item5 = produce(Producer, 20),
etap:is(ping(Producer), ok, "Producer not blocked"),
etap:is(couch_work_queue:item_count(Q), 3, "Queue item count is 3"),
etap:is(couch_work_queue:size(Q), 131, "Queue size is 131 bytes"),
Item6 = produce(Producer, 40),
etap:is(ping(Producer), timeout, "Producer blocked"),
etap:is(couch_work_queue:item_count(Q), 4, "Queue item count is 4"),
etap:is(couch_work_queue:size(Q), 171, "Queue size is 171 bytes"),
etap:is(close_queue(Q), timeout,
"Timeout when trying to close non-empty queue"),
consume(Consumer, 2),
etap:is(ping(Consumer), ok,
"Consumer not blocked when attempting to dequeue 2 items from full queue"),
etap:is(last_consumer_items(Consumer), {ok, [Item3, Item4]},
"Consumer received the right items"),
etap:is(couch_work_queue:item_count(Q), 2, "Queue item count is 2"),
etap:is(couch_work_queue:size(Q), 60, "Queue size is 60 bytes"),
etap:is(close_queue(Q), timeout,
"Timeout when trying to close non-empty queue"),
consume(Consumer, all),
etap:is(ping(Consumer), ok,
"Consumer not blocked when attempting to dequeue all items from queue"),
etap:is(last_consumer_items(Consumer), {ok, [Item5, Item6]},
"Consumer received the right items"),
etap:is(couch_work_queue:item_count(Q), closed, "Queue closed"),
etap:is(couch_work_queue:size(Q), closed, "Queue closed"),
consume(Consumer, all),
etap:is(last_consumer_items(Consumer), closed, "Consumer got closed queue"),
stop(Producer, "producer"),
stop(Consumer, "consumer").
test_single_consumer_max_item_count_and_size() ->
etap:diag("Spawning a queue with 3 max items, max size of 200 bytes, "
"1 producer and 1 consumer"),
{ok, Q} = couch_work_queue:new([{max_items, 3}, {max_size, 200}]),
Producer = spawn_producer(Q),
Consumer = spawn_consumer(Q),
etap:is(couch_work_queue:item_count(Q), 0, "Queue item count is 0"),
etap:is(couch_work_queue:size(Q), 0, "Queue size is 0 bytes"),
Item1 = produce(Producer, 100),
etap:is(ping(Producer), ok, "Producer not blocked"),
etap:is(couch_work_queue:item_count(Q), 1, "Queue item count is 1"),
etap:is(couch_work_queue:size(Q), 100, "Queue size is 100 bytes"),
Item2 = produce(Producer, 110),
etap:is(ping(Producer), timeout,
"Producer blocked when queue size >= max_size"),
etap:is(couch_work_queue:item_count(Q), 2, "Queue item count is 2"),
etap:is(couch_work_queue:size(Q), 210, "Queue size is 210 bytes"),
consume(Consumer, all),
etap:is(ping(Consumer), ok,
"Consumer not blocked when attempting to dequeue all items from queue"),
etap:is(last_consumer_items(Consumer), {ok, [Item1, Item2]},
"Consumer received the right items"),
etap:is(couch_work_queue:item_count(Q), 0, "Queue item count is 0"),
etap:is(couch_work_queue:size(Q), 0, "Queue size is 0 bytes"),
etap:is(ping(Producer), ok, "Producer not blocked anymore"),
Item3 = produce(Producer, 10),
etap:is(ping(Producer), ok, "Producer not blocked"),
etap:is(couch_work_queue:item_count(Q), 1, "Queue item count is 1"),
etap:is(couch_work_queue:size(Q), 10, "Queue size is 10 bytes"),
Item4 = produce(Producer, 4),
etap:is(ping(Producer), ok, "Producer not blocked"),
etap:is(couch_work_queue:item_count(Q), 2, "Queue item count is 2"),
etap:is(couch_work_queue:size(Q), 14, "Queue size is 14 bytes"),
Item5 = produce(Producer, 2),
etap:is(ping(Producer), timeout,
"Producer blocked when queue item count = max_items"),
etap:is(couch_work_queue:item_count(Q), 3, "Queue item count is 3"),
etap:is(couch_work_queue:size(Q), 16, "Queue size is 16 bytes"),
consume(Consumer, 1),
etap:is(ping(Consumer), ok,
"Consumer not blocked when attempting to dequeue 1 item from queue"),
etap:is(last_consumer_items(Consumer), {ok, [Item3]},
"Consumer received 1 item"),
etap:is(couch_work_queue:item_count(Q), 2, "Queue item count is 2"),
etap:is(couch_work_queue:size(Q), 6, "Queue size is 6 bytes"),
etap:is(close_queue(Q), timeout,
"Timeout when trying to close non-empty queue"),
consume(Consumer, 1),
etap:is(ping(Consumer), ok,
"Consumer not blocked when attempting to dequeue 1 item from queue"),
etap:is(last_consumer_items(Consumer), {ok, [Item4]},
"Consumer received 1 item"),
etap:is(couch_work_queue:item_count(Q), 1, "Queue item count is 1"),
etap:is(couch_work_queue:size(Q), 2, "Queue size is 2 bytes"),
Item6 = produce(Producer, 50),
etap:is(ping(Producer), ok,
"Producer not blocked when queue is not full and already received"
" a close request"),
etap:is(couch_work_queue:item_count(Q), 2, "Queue item count is 2"),
etap:is(couch_work_queue:size(Q), 52, "Queue size is 52 bytes"),
consume(Consumer, all),
etap:is(ping(Consumer), ok,
"Consumer not blocked when attempting to dequeue all items from queue"),
etap:is(last_consumer_items(Consumer), {ok, [Item5, Item6]},
"Consumer received all queued items"),
etap:is(couch_work_queue:item_count(Q), closed, "Queue closed"),
etap:is(couch_work_queue:size(Q), closed, "Queue closed"),
consume(Consumer, 1),
etap:is(last_consumer_items(Consumer), closed, "Consumer got closed queue"),
stop(Producer, "producer"),
stop(Consumer, "consumer").
test_multiple_consumers() ->
etap:diag("Spawning a queue with 3 max items, max size of 200 bytes, "
"1 producer and 3 consumers"),
{ok, Q} = couch_work_queue:new(
[{max_items, 3}, {max_size, 200}, {multi_workers, true}]),
Producer = spawn_producer(Q),
Consumer1 = spawn_consumer(Q),
Consumer2 = spawn_consumer(Q),
Consumer3 = spawn_consumer(Q),
etap:is(couch_work_queue:item_count(Q), 0, "Queue item count is 0"),
etap:is(couch_work_queue:size(Q), 0, "Queue size is 0 bytes"),
consume(Consumer1, 1),
etap:is(ping(Consumer1), timeout,
"Consumer 1 blocked when attempting to dequeue 1 item from empty queue"),
consume(Consumer2, 2),
etap:is(ping(Consumer2), timeout,
"Consumer 2 blocked when attempting to dequeue 2 items from empty queue"),
consume(Consumer3, 1),
etap:is(ping(Consumer3), timeout,
"Consumer 3 blocked when attempting to dequeue 1 item from empty queue"),
Item1 = produce(Producer, 50),
etap:is(ping(Producer), ok, "Producer not blocked"),
etap:is(couch_work_queue:item_count(Q), 0, "Queue item count is 0"),
etap:is(couch_work_queue:size(Q), 0, "Queue size is 0 bytes"),
Item2 = produce(Producer, 50),
etap:is(ping(Producer), ok, "Producer not blocked"),
etap:is(couch_work_queue:item_count(Q), 0, "Queue item count is 0"),
etap:is(couch_work_queue:size(Q), 0, "Queue size is 0 bytes"),
Item3 = produce(Producer, 50),
etap:is(ping(Producer), ok, "Producer not blocked"),
etap:is(couch_work_queue:item_count(Q), 0, "Queue item count is 0"),
etap:is(couch_work_queue:size(Q), 0, "Queue size is 0 bytes"),
etap:is(ping(Consumer1), ok, "Consumer 1 unblocked"),
etap:is(last_consumer_items(Consumer1), {ok, [Item1]},
"Consumer 1 received 1 item"),
etap:is(couch_work_queue:item_count(Q), 0, "Queue item count is 0"),
etap:is(couch_work_queue:size(Q), 0, "Queue size is 0 bytes"),
etap:is(ping(Consumer2), ok, "Consumer 2 unblocked"),
etap:is(last_consumer_items(Consumer2), {ok, [Item2]},
"Consumer 2 received 1 item"),
etap:is(couch_work_queue:item_count(Q), 0, "Queue item count is 0"),
etap:is(couch_work_queue:size(Q), 0, "Queue size is 0 bytes"),
etap:is(ping(Consumer3), ok, "Consumer 3 unblocked"),
etap:is(last_consumer_items(Consumer3), {ok, [Item3]},
"Consumer 3 received 1 item"),
etap:is(couch_work_queue:item_count(Q), 0, "Queue item count is 0"),
etap:is(couch_work_queue:size(Q), 0, "Queue size is 0 bytes"),
consume(Consumer1, 1),
etap:is(ping(Consumer1), timeout,
"Consumer 1 blocked when attempting to dequeue 1 item from empty queue"),
consume(Consumer2, 2),
etap:is(ping(Consumer2), timeout,
"Consumer 2 blocked when attempting to dequeue 1 item from empty queue"),
consume(Consumer3, 1),
etap:is(ping(Consumer3), timeout,
"Consumer 3 blocked when attempting to dequeue 1 item from empty queue"),
Item4 = produce(Producer, 50),
etap:is(ping(Producer), ok, "Producer not blocked"),
etap:is(couch_work_queue:item_count(Q), 0, "Queue item count is 0"),
etap:is(couch_work_queue:size(Q), 0, "Queue size is 0 bytes"),
etap:is(close_queue(Q), ok, "Closed queue"),
etap:is(ping(Consumer1), ok, "Consumer 1 unblocked"),
etap:is(last_consumer_items(Consumer1), {ok, [Item4]},
"Consumer 1 received 1 item"),
etap:is(couch_work_queue:item_count(Q), closed, "Queue closed"),
etap:is(couch_work_queue:size(Q), closed, "Queue closed"),
etap:is(ping(Consumer2), ok, "Consumer 2 unblocked"),
etap:is(last_consumer_items(Consumer2), closed,
"Consumer 2 received 'closed' atom"),
etap:is(ping(Consumer3), ok, "Consumer 3 unblocked"),
etap:is(last_consumer_items(Consumer3), closed,
"Consumer 3 received 'closed' atom"),
stop(Producer, "producer"),
stop(Consumer1, "consumer 1"),
stop(Consumer2, "consumer 2"),
stop(Consumer3, "consumer 3").
close_queue(Q) ->
ok = couch_work_queue:close(Q),
MonRef = erlang:monitor(process, Q),
receive
{'DOWN', MonRef, process, Q, _Reason} ->
etap:diag("Queue closed")
after 3000 ->
erlang:demonitor(MonRef),
timeout
end.
spawn_consumer(Q) ->
Parent = self(),
spawn(fun() -> consumer_loop(Parent, Q, nil) end).
consumer_loop(Parent, Q, PrevItem) ->
receive
{stop, Ref} ->
Parent ! {ok, Ref};
{ping, Ref} ->
Parent ! {pong, Ref},
consumer_loop(Parent, Q, PrevItem);
{last_item, Ref} ->
Parent ! {item, Ref, PrevItem},
consumer_loop(Parent, Q, PrevItem);
{consume, N} ->
Result = couch_work_queue:dequeue(Q, N),
consumer_loop(Parent, Q, Result)
end.
spawn_producer(Q) ->
Parent = self(),
spawn(fun() -> producer_loop(Parent, Q) end).
producer_loop(Parent, Q) ->
receive
{stop, Ref} ->
Parent ! {ok, Ref};
{ping, Ref} ->
Parent ! {pong, Ref},
producer_loop(Parent, Q);
{produce, Ref, Size} ->
Item = crypto:rand_bytes(Size),
Parent ! {item, Ref, Item},
ok = couch_work_queue:queue(Q, Item),
producer_loop(Parent, Q)
end.
consume(Consumer, N) ->
Consumer ! {consume, N}.
last_consumer_items(Consumer) ->
Ref = make_ref(),
Consumer ! {last_item, Ref},
receive
{item, Ref, Items} ->
Items
after 3000 ->
timeout
end.
produce(Producer, Size) ->
Ref = make_ref(),
Producer ! {produce, Ref, Size},
receive
{item, Ref, Item} ->
Item
after 3000 ->
etap:bail("Timeout asking producer to produce an item")
end.
ping(Pid) ->
Ref = make_ref(),
Pid ! {ping, Ref},
receive
{pong, Ref} ->
ok
after 3000 ->
timeout
end.
stop(Pid, Name) ->
Ref = make_ref(),
Pid ! {stop, Ref},
receive
{ok, Ref} ->
etap:diag("Stopped " ++ Name)
after 3000 ->
etap:bail("Timeout stopping " ++ Name)
end.