% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
%   http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.

-module(couch_work_queue_tests).

-include_lib("couch/include/couch_eunit.hrl").

-define(TIMEOUT, 100).


% Fixture constructor: creates a work queue configured with Opts and spawns
% one producer and one consumer process bound to it.
% Returns {Queue, ProducerPid, ConsumerPid}.
setup(Opts) ->
    {ok, Queue} = couch_work_queue:new(Opts),
    ProducerPid = spawn_producer(Queue),
    ConsumerPid = spawn_consumer(Queue),
    {Queue, ProducerPid, ConsumerPid}.

% Fixture for a queue whose only limit is max_items = 3.
setup_max_items() ->
    QueueOpts = [{max_items, 3}],
    setup(QueueOpts).

% Fixture for a queue whose only limit is max_size = 160.
setup_max_size() ->
    QueueOpts = [{max_size, 160}],
    setup(QueueOpts).

% Fixture for a queue limited by both max_size = 160 and max_items = 3.
setup_max_items_and_size() ->
    QueueOpts = [{max_size, 160}, {max_items, 3}],
    setup(QueueOpts).

% Fixture for the multi-consumer scenario: a queue with both limits and the
% multi_workers option enabled, one producer and three consumers.
% Returns {Queue, ProducerPid, [ConsumerPid]}.
setup_multi_workers() ->
    Opts = [{max_size, 160}, {max_items, 3}, {multi_workers, true}],
    {Q, Producer, First} = setup(Opts),
    Second = spawn_consumer(Q),
    Third = spawn_consumer(Q),
    {Q, Producer, [First, Second, Third]}.

% Fixture cleanup. Accepts either a single consumer pid or a list of them;
% the single-pid form is normalised to a one-element list.
% Crashes with badmatch if the queue, the producer or any consumer fails to
% stop cleanly.
teardown({Q, Producer, Consumers}) when is_list(Consumers) ->
    % drain the queue first so that a blocked producer/consumer gets
    % unblocked and can react to the stop request before the timeout
    lists:foreach(fun(Pid) -> consume(Pid, all) end, Consumers),

    ok = close_queue(Q),
    ok = stop(Producer, "producer"),
    StopResults = lists:map(fun(Pid) -> stop(Pid, "consumer") end, Consumers),
    % every consumer must have answered with ok
    StopResults = lists:duplicate(length(Consumers), ok),
    ok;
teardown({Q, Producer, Consumer}) ->
    teardown({Q, Producer, [Consumer]}).


% EUnit generator for the single producer / single consumer scenarios.
% Each group is a foreach fixture, so every instantiator gets the
% {Q, Producer, Consumer} tuple built by the group's setup fun, and
% teardown/1 is run for it afterwards.
single_consumer_test_() ->
    MaxItemsGroup = {
        "Queue with 3 max items",
        {
            foreach,
            fun setup_max_items/0, fun teardown/1,
            single_consumer_max_item_count() ++ common_cases()
        }
    },
    MaxSizeGroup = {
        "Queue with max size of 160 bytes",
        {
            foreach,
            fun setup_max_size/0, fun teardown/1,
            single_consumer_max_size() ++ common_cases()
        }
    },
    BothLimitsGroup = {
        "Queue with max size of 160 bytes and 3 max items",
        {
            foreach,
            fun setup_max_items_and_size/0, fun teardown/1,
            single_consumer_max_items_and_size() ++ common_cases()
        }
    },
    {
        "Single producer and consumer",
        [MaxItemsGroup, MaxSizeGroup, BothLimitsGroup]
    }.

% EUnit generator for the single producer / multiple consumers scenario.
% Uses the setup_multi_workers/0 fixture, so instantiators receive a list of
% consumer pids as the third tuple element.
multiple_consumers_test_() ->
    Cases = common_cases() ++ multiple_consumers(),
    Group = {
        "Queue with max size of 160 bytes and 3 max items",
        {foreach, fun setup_multi_workers/0, fun teardown/1, Cases}
    },
    {"Single producer and multiple consumers", [Group]}.

% Instantiators shared by every fixture, single- and multi-consumer alike.
common_cases() ->
    ConsumeCases = [
        fun should_block_consumer_on_dequeue_from_empty_queue/1,
        fun should_consume_right_item/1
    ],
    CloseCases = [
        fun should_timeout_on_close_non_empty_queue/1,
        fun should_not_block_producer_for_non_empty_queue_after_close/1,
        fun should_be_closed/1
    ],
    ConsumeCases ++ CloseCases.

% Instantiators for single-consumer fixtures with an item-count limit.
single_consumer_max_item_count() ->
    ProduceCases = [
        fun should_have_no_items_for_new_queue/1,
        fun should_block_producer_on_full_queue_count/1
    ],
    ConsumeCases = [
        fun should_receive_first_queued_item/1,
        fun should_consume_multiple_items/1,
        fun should_consume_all/1
    ],
    ProduceCases ++ ConsumeCases.

% Instantiators for single-consumer fixtures with a size limit.
single_consumer_max_size() ->
    ProduceCases = [
        fun should_have_zero_size_for_new_queue/1,
        fun should_block_producer_on_full_queue_size/1,
        fun should_increase_queue_size_on_produce/1
    ],
    ConsumeCases = [
        fun should_receive_first_queued_item/1,
        fun should_consume_multiple_items/1,
        fun should_consume_all/1
    ],
    ProduceCases ++ ConsumeCases.

% Instantiators for the fixture with both limits: the item-count cases
% followed by the size cases. Cases present in both lists appear twice.
single_consumer_max_items_and_size() ->
    ItemCountCases = single_consumer_max_item_count(),
    SizeCases = single_consumer_max_size(),
    lists:append(ItemCountCases, SizeCases).

% Extra instantiators run only for the multi-consumer fixture.
multiple_consumers() ->
    NewQueueCases = [
        fun should_have_zero_size_for_new_queue/1,
        fun should_have_no_items_for_new_queue/1
    ],
    ProduceCases = [
        fun should_increase_queue_size_on_produce/1
    ],
    NewQueueCases ++ ProduceCases.


% A queue nothing has been produced into must report an item count of 0.
should_have_no_items_for_new_queue({Queue, _Producer, _Consumers}) ->
    ?_assertEqual(0, couch_work_queue:item_count(Queue)).

% A queue nothing has been produced into must report a size of 0.
should_have_zero_size_for_new_queue({Queue, _Producer, _Consumers}) ->
    ?_assertEqual(0, couch_work_queue:size(Queue)).

% Asks every consumer to dequeue from a queue nothing was produced into and
% then pings it. A consumer handles one message at a time and calls
% couch_work_queue:dequeue/2 synchronously, so a consumer stuck in that call
% cannot answer the ping and ping/1 returns timeout.
%
% Accepts the fixture tuple with either a list of consumer pids
% (multi-worker fixture) or a single consumer pid.
should_block_consumer_on_dequeue_from_empty_queue({_, _, Consumers})
        when is_list(Consumers) ->
    [consume(C, 2) || C <- Consumers],
    Pongs = [ping(C) || C <- Consumers],
    % One expected timeout per consumer, derived from the fixture rather
    % than assuming it always holds exactly three consumers (teardown/1
    % builds its expected list the same way).
    Expected = [timeout || _ <- Consumers],
    ?_assertEqual(Expected, Pongs);
should_block_consumer_on_dequeue_from_empty_queue({_, _, Consumer}) ->
    consume(Consumer, 1),
    Pong = ping(Consumer),
    ?_assertEqual(timeout, Pong).

% Checks that items handed to already-waiting consumers do not linger in the
% queue and that the consumers become responsive again afterwards.
%
% Multi-consumer clause: every consumer first requests up to 3 items, then
% three 10-byte items are produced one at a time. After each one the producer
% must still answer a ping and the queue must report 0 items / 0 size.
% NOTE(review): the final comprehension pairs each consumer's ping result
% with the item that was *produced*; it never reads what the consumer
% actually dequeued (last_consumer_items/1 is not called here), so this
% clause only verifies that all three consumers answer the ping.
%
% Single-consumer clause: the consumer requests 1 item, two items are
% produced, and the consumer's last dequeue result must be exactly the first
% produced item.
should_consume_right_item({Q, Producer, Consumers}) when is_list(Consumers) ->
    % send the dequeue requests before anything is produced
    [consume(C, 3) || C <- Consumers],

    Item1 = produce(Q, Producer, 10, false),
    ok = ping(Producer),
    ?assertEqual(0, couch_work_queue:item_count(Q)),
    ?assertEqual(0, couch_work_queue:size(Q)),

    Item2 = produce(Q, Producer, 10, false),
    ok = ping(Producer),
    ?assertEqual(0, couch_work_queue:item_count(Q)),
    ?assertEqual(0, couch_work_queue:size(Q)),

    Item3 = produce(Q, Producer, 10, false),
    ok = ping(Producer),
    ?assertEqual(0, couch_work_queue:item_count(Q)),
    ?assertEqual(0, couch_work_queue:size(Q)),

    % ping result of each consumer, tagged with the corresponding produced item
    R = [{ping(C), Item}
         || {C, Item} <- lists:zip(Consumers, [Item1, Item2, Item3])],

    ?_assertEqual([{ok, Item1}, {ok, Item2}, {ok, Item3}], R);
should_consume_right_item({Q, Producer, Consumer}) ->
    consume(Consumer, 1),
    Item = produce(Q, Producer, 10, false),
    % second item: produce/4 waits here until the queue's item count grows
    produce(Q, Producer, 20, true),
    ok = ping(Producer),
    ok = ping(Consumer),
    {ok, Items} = last_consumer_items(Consumer),
    ?_assertEqual([Item], Items).

% Produces a 50-byte item followed by a 10-byte item and records the queue's
% item count and size after each one. Nothing is consumed in between, so the
% expected readings are {1, 50} and then {2, 60}.
%
% Both produce/4 calls use Wait = true, i.e. they return only once the
% queue's item count has grown, so the readings are taken after the item has
% actually been queued.
should_increase_queue_size_on_produce({Q, Producer, _}) ->
    produce(Q, Producer, 50, true),
    ok = ping(Producer),
    Count1 = couch_work_queue:item_count(Q),
    Size1 = couch_work_queue:size(Q),

    produce(Q, Producer, 10, true),
    Count2 = couch_work_queue:item_count(Q),
    Size2 = couch_work_queue:size(Q),

    % ?_assertEqual takes (Expected, Actual); keeping that order makes the
    % "expected"/"value" fields of a failure report point at the right terms.
    ?_assertEqual([{1, 50}, {2, 60}], [{Count1, Size1}, {Count2, Size2}]).

% Fills a queue up to 3 items. The producer must stay responsive after the
% first two items and must stop answering pings after the third one.
should_block_producer_on_full_queue_count({Queue, ProducerPid, _Consumer}) ->
    % first item: queue not full, producer still answers
    produce(Queue, ProducerPid, 10, true),
    ?assertEqual(1, couch_work_queue:item_count(Queue)),
    ok = ping(ProducerPid),

    % second item: same expectation
    produce(Queue, ProducerPid, 15, true),
    ?assertEqual(2, couch_work_queue:item_count(Queue)),
    ok = ping(ProducerPid),

    % third item brings the count to 3; from here on the producer is
    % expected not to answer within ?TIMEOUT
    produce(Queue, ProducerPid, 20, false),
    ?assertEqual(3, couch_work_queue:item_count(Queue)),
    ProducerStatus = ping(ProducerPid),

    ?_assertEqual(timeout, ProducerStatus).

% Pushes the queue past its size limit: a 100-byte item leaves the producer
% responsive, a further 110-byte item (210 bytes in total) must leave it
% unable to answer a ping.
should_block_producer_on_full_queue_size({Queue, ProducerPid, _Consumer}) ->
    % 100 bytes: producer still answers
    produce(Queue, ProducerPid, 100, true),
    ok = ping(ProducerPid),
    ?assertEqual(1, couch_work_queue:item_count(Queue)),
    ?assertEqual(100, couch_work_queue:size(Queue)),

    % 110 more bytes: the item is accepted, but the producer is expected
    % not to answer within ?TIMEOUT
    produce(Queue, ProducerPid, 110, false),
    ProducerStatus = ping(ProducerPid),
    ?assertEqual(2, couch_work_queue:item_count(Queue)),
    ?assertEqual(210, couch_work_queue:size(Queue)),

    ?_assertEqual(timeout, ProducerStatus).

% Queues two items, then asks the consumer for 2 items at once; the consumer
% must report both of them, in production order.
should_consume_multiple_items({Queue, ProducerPid, ConsumerPid}) ->
    First = produce(Queue, ProducerPid, 10, true),
    ok = ping(ProducerPid),

    Second = produce(Queue, ProducerPid, 15, true),
    ok = ping(ProducerPid),

    consume(ConsumerPid, 2),

    {ok, Dequeued} = last_consumer_items(ConsumerPid),
    ?_assertEqual([First, Second], Dequeued).

% A consumer that asked for up to 100 items while nothing was queued must get
% the very first item produced afterwards, leaving the queue empty.
should_receive_first_queued_item({Queue, ProducerPid, ConsumerPid}) ->
    % nothing queued yet: the consumer must not answer the ping
    consume(ConsumerPid, 100),
    timeout = ping(ConsumerPid),

    Produced = produce(Queue, ProducerPid, 11, false),
    ok = ping(ProducerPid),

    % the consumer answers again and the item is no longer in the queue
    ok = ping(ConsumerPid),
    ?assertEqual(0, couch_work_queue:item_count(Queue)),

    {ok, Dequeued} = last_consumer_items(ConsumerPid),
    ?_assertEqual([Produced], Dequeued).

% Queues three items and dequeues with the atom all; the consumer must report
% all three, in production order.
should_consume_all({Queue, ProducerPid, ConsumerPid}) ->
    First = produce(Queue, ProducerPid, 10, true),
    Second = produce(Queue, ProducerPid, 15, true),
    Third = produce(Queue, ProducerPid, 20, true),

    consume(ConsumerPid, all),

    {ok, Dequeued} = last_consumer_items(ConsumerPid),
    ?_assertEqual([First, Second, Third], Dequeued).

% Closing a queue that still holds an item must not complete within
% ?TIMEOUT: close_queue/1 is expected to return timeout.
should_timeout_on_close_non_empty_queue({Queue, ProducerPid, _Consumer}) ->
    produce(Queue, ProducerPid, 1, true),
    CloseResult = close_queue(Queue),

    ?_assertEqual(timeout, CloseResult).

% After a close attempt on a queue that still holds one 1-byte item, the
% producer must keep answering pings and the queue must still report that
% item (size 1, count 1).
should_not_block_producer_for_non_empty_queue_after_close(
        {Queue, ProducerPid, _Consumer}) ->
    produce(Queue, ProducerPid, 1, true),
    % result deliberately ignored; only the aftermath is checked
    close_queue(Queue),
    ProducerStatus = ping(ProducerPid),
    QueueSize = couch_work_queue:size(Queue),
    QueueCount = couch_work_queue:item_count(Queue),

    ?_assertEqual({ok, 1, 1}, {ProducerStatus, QueueSize, QueueCount}).

% Closes an empty queue (which must succeed) and then checks that every
% later interaction reports closed: each consumer's dequeue result as well as
% couch_work_queue:item_count/1 and couch_work_queue:size/1.
%
% Accepts the fixture tuple with either a list of consumer pids
% (multi-worker fixture) or a single consumer pid.
should_be_closed({Q, _, Consumers}) when is_list(Consumers) ->
    ok = close_queue(Q),

    [consume(C, 1) || C <- Consumers],

    LastConsumerItems = [last_consumer_items(C) || C <- Consumers],
    ItemsCount = couch_work_queue:item_count(Q),
    Size = couch_work_queue:size(Q),

    % One expected closed per consumer, derived from the fixture rather than
    % assuming it always holds exactly three consumers (teardown/1 builds
    % its expected list the same way).
    ExpectedItems = [closed || _ <- Consumers],
    ?_assertEqual({ExpectedItems, closed, closed},
                  {LastConsumerItems, ItemsCount, Size});
should_be_closed({Q, _, Consumer}) ->
    ok = close_queue(Q),

    consume(Consumer, 1),

    LastConsumerItems = last_consumer_items(Consumer),
    ItemsCount = couch_work_queue:item_count(Q),
    Size = couch_work_queue:size(Q),

    ?_assertEqual({closed, closed, closed},
                  {LastConsumerItems, ItemsCount, Size}).


% Closes the queue through test_util:stop_sync/3, giving it ?TIMEOUT to
% finish. Returns whatever stop_sync returns; callers in this module match
% on ok and timeout.
close_queue(Q) ->
    CloseFun = fun() -> ok = couch_work_queue:close(Q) end,
    test_util:stop_sync(Q, CloseFun, ?TIMEOUT).

% Starts a consumer process for queue Q that replies to the calling process.
% Its "last dequeue result" starts out as nil. Returns the new pid.
spawn_consumer(Q) ->
    TestPid = self(),
    Loop = fun() -> consumer_loop(TestPid, Q, nil) end,
    spawn(Loop).

% Message loop of a consumer process. PrevItem holds the result of the most
% recent couch_work_queue:dequeue/2 call (nil before the first one).
%
% Protocol:
%   {consume, N}     - dequeue N items (blocking call) and remember the result
%   {last_item, Ref} - reply {item, Ref, PrevItem} to Parent
%   {ping, Ref}      - reply {pong, Ref} to Parent
%   {stop, Ref}      - reply {ok, Ref} to Parent and terminate
consumer_loop(Parent, Q, PrevItem) ->
    receive
        {consume, Count} ->
            % while this call blocks, no other message is handled
            Dequeued = couch_work_queue:dequeue(Q, Count),
            consumer_loop(Parent, Q, Dequeued);
        {last_item, Tag} ->
            Parent ! {item, Tag, PrevItem},
            consumer_loop(Parent, Q, PrevItem);
        {ping, Tag} ->
            Parent ! {pong, Tag},
            consumer_loop(Parent, Q, PrevItem);
        {stop, Tag} ->
            % no recursion: the process ends after acknowledging
            Parent ! {ok, Tag}
    end.

% Starts a producer process for queue Q that replies to the calling process.
% Returns the new pid.
spawn_producer(Q) ->
    TestPid = self(),
    Loop = fun() -> producer_loop(TestPid, Q) end,
    spawn(Loop).

% Message loop of a producer process.
%
% Protocol:
%   {produce, Ref, Size} - generate Size random bytes, report them to Parent
%                          as {item, Ref, Item}, then queue them
%   {ping, Ref}          - reply {pong, Ref} to Parent
%   {stop, Ref}          - reply {ok, Ref} to Parent and terminate
%
% The item is reported to Parent *before* couch_work_queue:queue/2 is called,
% so the test process learns the item even when the queue call blocks. While
% it blocks, no ping/stop message is handled.
producer_loop(Parent, Q) ->
    receive
        {stop, Ref} ->
            Parent ! {ok, Ref};
        {ping, Ref} ->
            Parent ! {pong, Ref},
            producer_loop(Parent, Q);
        {produce, Ref, Size} ->
            % crypto:rand_bytes/1 was deprecated and later removed from OTP;
            % strong_rand_bytes/1 is the supported replacement and likewise
            % returns a binary of exactly Size bytes.
            Item = crypto:strong_rand_bytes(Size),
            Parent ! {item, Ref, Item},
            ok = couch_work_queue:queue(Q, Item),
            producer_loop(Parent, Q)
    end.

% Asynchronously asks Consumer to dequeue N items (N may also be the atom
% all). Does not wait for the dequeue to happen; returns the message sent.
consume(Consumer, N) ->
    Request = {consume, N},
    Consumer ! Request.

% Asks Consumer for the result of its most recent dequeue.
% Returns that result, or timeout if no reply arrives within ?TIMEOUT.
last_consumer_items(Consumer) ->
    Tag = make_ref(),
    Consumer ! {last_item, Tag},
    receive
        {item, Tag, LastResult} -> LastResult
    after ?TIMEOUT ->
        timeout
    end.

% Asks Producer to generate and queue an item of Size bytes and returns the
% generated item.
%
% When Wait is true, additionally blocks until the queue's item count has
% grown beyond the value it had before the request was sent. Raises an
% assertion_failed error if the producer does not report the item within
% ?TIMEOUT.
produce(Q, Producer, Size, Wait) ->
    Tag = make_ref(),
    % snapshot taken before the request, used as baseline by wait_increment/2
    CountBefore = couch_work_queue:item_count(Q),
    Producer ! {produce, Tag, Size},
    receive
        {item, Tag, Produced} ->
            case Wait of
                true ->
                    ok = wait_increment(Q, CountBefore),
                    Produced;
                _ ->
                    Produced
            end
    after ?TIMEOUT ->
        erlang:error({assertion_failed,
                      [{module, ?MODULE},
                       {line, ?LINE},
                       {reason, "Timeout asking producer to produce an item"}]})
    end.

% Liveness probe for a producer or consumer process.
% Returns ok if Pid answers with a matching pong within ?TIMEOUT, otherwise
% timeout.
ping(Pid) ->
    Tag = make_ref(),
    Pid ! {ping, Tag},
    receive
        {pong, Tag} -> ok
    after ?TIMEOUT ->
        timeout
    end.

% Asks a producer or consumer process to terminate.
% Returns ok once the process acknowledges; if no acknowledgement arrives
% within ?TIMEOUT, prints a debug message mentioning Name and returns timeout.
stop(Pid, Name) ->
    Tag = make_ref(),
    Pid ! {stop, Tag},
    receive
        {ok, Tag} ->
            ok
    after ?TIMEOUT ->
        ?debugMsg("Timeout stopping " ++ Name),
        timeout
    end.

% Waits, through test_util:wait/1, until the queue's item count exceeds
% ItemsCount. The predicate returns ok once it does and wait otherwise.
% NOTE(review): presumably test_util:wait/1 re-invokes the fun while it
% returns wait and gives up after its own timeout - confirm in test_util.
wait_increment(Q, ItemsCount) ->
    HasGrown = fun() ->
        Current = couch_work_queue:item_count(Q),
        if
            Current > ItemsCount -> ok;
            true -> wait
        end
    end,
    test_util:wait(HasGrown).
