| % Licensed under the Apache License, Version 2.0 (the "License"); you may not |
| % use this file except in compliance with the License. You may obtain a copy of |
| % the License at |
| % |
| % http://www.apache.org/licenses/LICENSE-2.0 |
| % |
| % Unless required by applicable law or agreed to in writing, software |
| % distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| % WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| % License for the specific language governing permissions and limitations under |
| % the License. |
| |
| -module(couch_work_queue). |
| -behaviour(gen_server). |
| -vsn(1). |
| |
| -include_lib("couch/include/couch_db.hrl"). |
| |
| % public API |
| -export([new/1, queue/2, dequeue/1, dequeue/2, close/1, item_count/1, size/1]). |
| |
| % gen_server callbacks |
| -export([init/1, terminate/2]). |
| -export([handle_call/3, handle_cast/2, code_change/3, handle_info/2]). |
| |
% State held by one work queue server process.
-record(q, {
    % FIFO of {Item, Size} pairs that no consumer has taken yet
    queue = queue:new(),
    % `From` handles of producers whose queue/2 call is still unanswered
    blocked = [],
    % limits on the summed item size and on the item count; init/1 stores
    % the atom nil in them when the corresponding option is not given
    max_size,
    max_items,
    % number of queued items and the sum of their recorded sizes
    items = 0,
    size = 0,
    % [{From, Max}] for consumers parked inside dequeue, oldest first
    work_waiters = [],
    % set by a close cast while items remain; the server then stops as soon
    % as a dequeue empties the queue
    close_on_dequeue = false,
    % whether more than one consumer may be parked in dequeue at once
    multi_workers = false
}).
| |
% Start a work queue server linked to the calling process.
%
% Options is handed to init/1, which reads the keys max_size, max_items and
% multi_workers from it. The result is whatever gen_server:start_link/3
% returns.
new(Options) ->
    gen_server:start_link(?MODULE, Options, []).
| |
% Append Item to the work queue Wq.
%
% The size sent along with the item is byte_size/1 for binaries and
% ?term_size/1 for every other term. The call has no timeout: the server
% answers `ok` immediately unless the queue has reached one of its limits,
% in which case the answer is withheld until a consumer dequeues.
queue(Wq, Item) ->
    ItemSize =
        case is_binary(Item) of
            true -> byte_size(Item);
            false -> ?term_size(Item)
        end,
    gen_server:call(Wq, {queue, Item, ItemSize}, infinity).
| |
% Take every item currently in the queue; same as dequeue(Wq, all).
dequeue(Wq) ->
    dequeue(Wq, all).

% Take up to MaxItems items from the queue Wq.
%
% MaxItems is the atom `all` or a non-negative integer. Returns the server's
% reply, `{ok, Items}`, or `closed` when the call fails (for example because
% the queue process is gone). When the queue is empty the server parks the
% caller until an item arrives or the queue terminates.
%
% The guard rejects any other MaxItems in the caller with function_clause.
% Without it a negative or non-integer count reaches the server's item loop,
% which counts down towards an exact 0 it can never hit, runs off the end of
% the queue and crashes the server shared by all producers and consumers.
dequeue(Wq, MaxItems) when MaxItems =:= all; is_integer(MaxItems), MaxItems >= 0 ->
    try
        gen_server:call(Wq, {dequeue, MaxItems}, infinity)
    catch
        % deliberate best effort: any failure of the call is reported as closed
        _:_ -> closed
    end.
| |
% Ask the queue Wq how many items it currently holds.
%
% Returns the server's reply, or `closed` when the call fails for any reason
% (for example because the queue process no longer exists).
item_count(Wq) ->
    try gen_server:call(Wq, item_count, infinity) of
        Count -> Count
    catch
        _:_ -> closed
    end.
| |
% Ask the queue Wq for the summed size of the items it currently holds.
%
% Returns the server's reply, or `closed` when the call fails for any reason
% (for example because the queue process no longer exists).
size(Wq) ->
    try gen_server:call(Wq, size, infinity) of
        Total -> Total
    catch
        _:_ -> closed
    end.
| |
% Ask the queue Wq to shut down.
%
% This is an asynchronous cast, so the function returns `ok` without waiting.
% The server stops at once when it holds no items; otherwise it keeps running
% until a dequeue has emptied it.
close(Wq) ->
    Request = close,
    gen_server:cast(Wq, Request).
| |
% gen_server init callback: build the initial state from the option list.
%
% max_size and max_items default to the atom nil (no limit configured) and
% multi_workers defaults to false. The process hibernates right after
% initialisation.
init(Options) ->
    MaxSize = couch_util:get_value(max_size, Options, nil),
    MaxItems = couch_util:get_value(max_items, Options, nil),
    MultiWorkers = couch_util:get_value(multi_workers, Options, false),
    State = #q{
        max_size = MaxSize,
        max_items = MaxItems,
        multi_workers = MultiWorkers
    },
    {ok, State, hibernate}.
| |
% gen_server terminate callback: every consumer still parked in dequeue is
% answered with `closed` so that it does not wait on a dead queue.
terminate(_Reason, #q{work_waiters = Waiters}) ->
    notify_closed(Waiters).

% Reply `closed` to each {From, Max} waiter in list order.
notify_closed([]) ->
    ok;
notify_closed([{From, _Max} | Rest]) ->
    gen_server:reply(From, closed),
    notify_closed(Rest).
| |
% gen_server call handler.
%
% {queue, Item, Size}
%   No consumer waiting: the item is appended and the counters are updated.
%   The producer is answered `ok` unless a limit has been reached, in which
%   case its `From` is remembered in `blocked` and answered later by a
%   dequeue.
%   Consumer waiting: the item bypasses the queue and is handed to the
%   oldest waiter as {ok, [Item]}; the producer is answered `ok`.
%
% {dequeue, Max}
%   Parks the caller when consumers are already waiting (multi_workers) or
%   when the queue is empty; otherwise delivers items. A second waiter
%   without multi_workers makes the server exit.
%
% item_count / size
%   Report the current counters.
handle_call({queue, Item, Size}, From, #q{work_waiters = []} = Q0) ->
    #q{max_size = MaxSize, max_items = MaxItems, blocked = Blocked} = Q0,
    NewSize = Q0#q.size + Size,
    NewCount = Q0#q.items + 1,
    Q = Q0#q{
        size = NewSize,
        items = NewCount,
        queue = queue:in({Item, Size}, Q0#q.queue)
    },
    % An unset limit is the atom nil. Numbers sort before atoms in Erlang
    % term order, so a comparison against nil is never true.
    if
        NewSize >= MaxSize; NewCount >= MaxItems ->
            {noreply, Q#q{blocked = [From | Blocked]}, hibernate};
        true ->
            {reply, ok, Q, hibernate}
    end;
handle_call({queue, Item, _Size}, _From, #q{work_waiters = [{Waiter, _Max} | Others]} = Q) ->
    gen_server:reply(Waiter, {ok, [Item]}),
    {reply, ok, Q#q{work_waiters = Others}, hibernate};
handle_call({dequeue, Max}, From, Q) ->
    #q{work_waiters = Waiters, multi_workers = Multi, items = Count} = Q,
    case {Waiters, Multi} of
        {[_ | _], false} ->
            exit("Only one caller allowed to wait for this work at a time");
        {[_ | _], true} ->
            park_waiter(From, Max, Q);
        _ when Count =:= 0 ->
            park_waiter(From, Max, Q);
        _ when Count > 0 ->
            deliver_queue_items(Max, Q)
    end;
handle_call(item_count, _From, #q{items = Count} = Q) ->
    {reply, Count, Q};
handle_call(size, _From, #q{size = Size} = Q) ->
    {reply, Size, Q}.

% Put a consumer at the tail of the waiter list and leave its call unanswered.
park_waiter(From, Max, #q{work_waiters = Waiters} = Q) ->
    {noreply, Q#q{work_waiters = Waiters ++ [{From, Max}]}}.
| |
% Build the handle_call result for a dequeue of Max items from a non-empty
% queue.
%
% First clause: Max is `all` or covers the whole queue. Every blocked
% producer is answered `ok`, all items are returned, and the state is reset
% to empty. If a close was requested earlier the server stops after
% replying.
%
% Second clause: only the first Max items are taken; dequeue_items/5 also
% answers one blocked producer per item removed.
deliver_queue_items(Max, #q{items = Count} = Q) when Max =:= all; Max >= Count ->
    #q{queue = Queue, blocked = Blocked, close_on_dequeue = Close} = Q,
    lists:foreach(fun(Producer) -> gen_server:reply(Producer, ok) end, Blocked),
    Emptied = Q#q{items = 0, size = 0, blocked = [], queue = queue:new()},
    Reply = {ok, [Item || {Item, _Size} <- queue:to_list(Queue)]},
    case Close of
        true ->
            {stop, normal, Reply, Emptied};
        false ->
            {reply, Reply, Emptied}
    end;
deliver_queue_items(Max, Q) ->
    #q{queue = Queue, items = Count, size = Size, blocked = Blocked} = Q,
    {Items, SizeLeft, QueueLeft, StillBlocked} =
        dequeue_items(Max, Size, Queue, Blocked, []),
    Remaining = Q#q{
        items = Count - Max,
        size = SizeLeft,
        blocked = StillBlocked,
        queue = QueueLeft
    },
    {reply, {ok, Items}, Remaining}.
| |
% Remove NumItems entries from the front of Queue.
%
% For every entry removed, the entry's size is subtracted from Size and, if
% any producer is blocked, the first one in Blocked is answered `ok` and
% dropped from the list. Returns {Items, Size, Queue, Blocked} with Items in
% queue order. The caller must make sure the queue holds at least NumItems
% entries.
dequeue_items(0, Size, Queue, Blocked, DequeuedAcc) ->
    {lists:reverse(DequeuedAcc), Size, Queue, Blocked};
dequeue_items(NumItems, Size, Queue, Blocked, DequeuedAcc) ->
    {{value, {Item, ItemSize}}, Remaining} = queue:out(Queue),
    StillBlocked =
        case Blocked of
            [] ->
                [];
            [Producer | Others] ->
                gen_server:reply(Producer, ok),
                Others
        end,
    dequeue_items(
        NumItems - 1, Size - ItemSize, Remaining, StillBlocked, [Item | DequeuedAcc]
    ).
| |
% gen_server cast handler for `close`.
%
% An empty queue stops the server right away. A queue that still holds items
% keeps running and is flagged so that the dequeue which empties it also
% stops the server.
handle_cast(close, Q) ->
    case Q#q.items of
        0 ->
            {stop, normal, Q};
        _Pending ->
            {noreply, Q#q{close_on_dequeue = true}}
    end.
| |
% gen_server code_change callback: the state is carried over unchanged.
code_change(_FromVsn, Q, _Extra) ->
    {ok, Q}.
| |
% gen_server handle_info callback: the queue expects no plain messages, so
% any message that arrives stops the server with that message as the reason.
handle_info(Msg, State) ->
    {stop, Msg, State}.