blob: 9adf0e13dd282d741512b81b4e036b636593e3b1 [file] [log] [blame]
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
% http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.
-module(global_changes_listener).
-behavior(couch_event_listener).
-export([
start/0
]).
-export([
init/1,
terminate/2,
handle_event/3,
handle_cast/2,
handle_info/2
]).
-record(state, {
update_db,
pending_update_count,
pending_updates,
last_update_time,
max_event_delay,
dbname
}).
-include_lib("mem3/include/mem3.hrl").
start() ->
couch_event_listener:start(?MODULE, nil, [all_dbs]).
init(_) ->
% get configs as strings
UpdateDb0 = config:get("global_changes", "update_db", "true"),
MaxEventDelay0 = config:get("global_changes", "max_event_delay", "25"),
% make config strings into other data types
UpdateDb = case UpdateDb0 of "false" -> false; _ -> true end,
MaxEventDelay = list_to_integer(MaxEventDelay0),
State = #state{
update_db=UpdateDb,
pending_update_count=0,
pending_updates=sets:new(),
max_event_delay=MaxEventDelay,
dbname=global_changes_util:get_dbname()
},
{ok, State}.
terminate(_Reason, _State) ->
ok.
handle_event(_ShardName, _Event, #state{update_db=false}=State) ->
{ok, State};
handle_event(ShardName, Event, State0)
when Event =:= updated orelse Event =:= deleted
orelse Event =:= created ->
#state{dbname=ChangesDbName} = State0,
State = case mem3:dbname(ShardName) of
ChangesDbName ->
State0;
DbName ->
#state{pending_update_count=Count} = State0,
EventBin = erlang:atom_to_binary(Event, latin1),
Key = <<EventBin/binary, <<":">>/binary, DbName/binary>>,
Pending = sets:add_element(Key, State0#state.pending_updates),
couch_stats:update_gauge(
[global_changes, listener_pending_updates],
Count + 1
),
State0#state{pending_updates=Pending, pending_update_count=Count+1}
end,
maybe_send_updates(State);
handle_event(_DbName, _Event, State) ->
maybe_send_updates(State).
handle_cast({set_max_event_delay, MaxEventDelay}, State) ->
maybe_send_updates(State#state{max_event_delay=MaxEventDelay});
handle_cast({set_update_db, Boolean}, State0) ->
% If turning update_db off, clear out server state
State = case {Boolean, State0#state.update_db} of
{false, true} ->
State0#state{
update_db=Boolean,
pending_updates=sets:new(),
pending_update_count=0,
last_update_time=undefined
};
_ ->
State0#state{update_db=Boolean}
end,
maybe_send_updates(State);
handle_cast(_Msg, State) ->
maybe_send_updates(State).
maybe_send_updates(#state{pending_update_count=0}=State) ->
{ok, State};
maybe_send_updates(#state{update_db=true}=State) ->
#state{max_event_delay=MaxEventDelay, last_update_time=LastUpdateTime} = State,
Now = os:timestamp(),
case LastUpdateTime of
undefined ->
{ok, State#state{last_update_time=Now}, MaxEventDelay};
_ ->
Delta = timer:now_diff(Now, LastUpdateTime) div 1000,
if Delta >= MaxEventDelay ->
Updates = sets:to_list(State#state.pending_updates),
try group_updates_by_node(State#state.dbname, Updates) of
Grouped ->
dict:map(fun(Node, Docs) ->
couch_stats:increment_counter([global_changes, rpcs]),
global_changes_server:update_docs(Node, Docs)
end, Grouped)
catch error:database_does_not_exist ->
ok
end,
couch_stats:update_gauge(
[global_changes, listener_pending_updates],
0
),
State1 = State#state{
pending_updates=sets:new(),
pending_update_count=0,
last_update_time=undefined
},
{ok, State1};
true ->
{ok, State, MaxEventDelay-Delta}
end
end;
maybe_send_updates(State) ->
{ok, State}.
handle_info(_Msg, State) ->
maybe_send_updates(State).
%% restore spec when R14 support is dropped
%% -spec group_updates_by_node(binary(), [binary()]) -> dict:dict().
group_updates_by_node(DbName, Updates) ->
lists:foldl(fun(Key, OuterAcc) ->
Shards = mem3:shards(DbName, Key),
lists:foldl(fun(#shard{node=Node}, InnerAcc) ->
dict:append(Node, Key, InnerAcc)
end, OuterAcc, Shards)
end, dict:new(), Updates).