| % Licensed under the Apache License, Version 2.0 (the "License"); you may not |
| % use this file except in compliance with the License. You may obtain a copy of |
| % the License at |
| % |
| % http://www.apache.org/licenses/LICENSE-2.0 |
| % |
| % Unless required by applicable law or agreed to in writing, software |
| % distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| % WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| % License for the specific language governing permissions and limitations under |
| % the License. |
| |
| -module(global_changes_server). |
| -behaviour(gen_server). |
| -vsn(1). |
| |
| |
| -export([ |
| start_link/0 |
| ]). |
| |
| -export([ |
| init/1, |
| terminate/2, |
| handle_call/3, |
| handle_cast/2, |
| handle_info/2, |
| code_change/3 |
| ]). |
| |
| -export([ |
| update_docs/2 |
| ]). |
| |
| |
| -include_lib("couch/include/couch_db.hrl"). |
| -include_lib("mem3/include/mem3.hrl"). |
| |
| -record(state, { |
| update_db, |
| pending_update_count, |
| pending_updates, |
| max_write_delay, |
| dbname, |
| handler_ref |
| }). |
| |
| |
| start_link() -> |
| gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). |
| |
| |
| init([]) -> |
| {ok, Handler} = global_changes_listener:start(), |
| % get configs as strings |
| UpdateDb0 = config:get("global_changes", "update_db", "true"), |
| MaxWriteDelay0 = config:get("global_changes", "max_write_delay", "500"), |
| |
| % make config strings into other data types |
| UpdateDb = case UpdateDb0 of "false" -> false; _ -> true end, |
| MaxWriteDelay = list_to_integer(MaxWriteDelay0), |
| |
| % Start our write triggers |
| erlang:send_after(MaxWriteDelay, self(), flush_updates), |
| |
| State = #state{ |
| update_db=UpdateDb, |
| pending_update_count=0, |
| pending_updates=sets:new(), |
| max_write_delay=MaxWriteDelay, |
| dbname=global_changes_util:get_dbname(), |
| handler_ref=erlang:monitor(process, Handler) |
| }, |
| {ok, State}. |
| |
| |
| terminate(_Reason, _Srv) -> |
| ok. |
| |
| |
| handle_call(_Msg, _From, State) -> |
| {reply, ok, State}. |
| |
| |
| handle_cast(_Msg, #state{update_db=false}=State) -> |
| {noreply, State}; |
| handle_cast({update_docs, DocIds}, State) -> |
| Pending = sets:union(sets:from_list(DocIds), State#state.pending_updates), |
| PendingCount = sets:size(Pending), |
| couch_stats:update_gauge( |
| [global_changes, server_pending_updates], |
| PendingCount |
| ), |
| NewState = State#state{ |
| pending_updates=Pending, |
| pending_update_count=PendingCount |
| }, |
| {noreply, NewState}; |
| |
| handle_cast({set_max_write_delay, MaxWriteDelay}, State) -> |
| NewState = State#state{max_write_delay=MaxWriteDelay}, |
| {noreply, NewState}; |
| handle_cast({set_update_db, Boolean}, State0) -> |
| % If turning update_db off, clear out server state |
| State = case {Boolean, State0#state.update_db} of |
| {false, true} -> |
| State0#state{ |
| update_db=Boolean, |
| pending_updates=sets:new(), |
| pending_update_count=0 |
| }; |
| _ -> |
| State0#state{update_db=Boolean} |
| end, |
| {noreply, State}; |
| handle_cast(_Msg, State) -> |
| {noreply, State}. |
| |
| |
| handle_info(flush_updates, #state{pending_update_count=0}=State) -> |
| erlang:send_after(State#state.max_write_delay, self(), flush_updates), |
| {noreply, State}; |
| handle_info(flush_updates, #state{update_db=false}=State) -> |
| erlang:send_after(State#state.max_write_delay, self(), flush_updates), |
| {noreply, State}; |
| handle_info(flush_updates, State) -> |
| erlang:send_after(State#state.max_write_delay, self(), flush_updates), |
| flush_updates(State); |
| handle_info(start_listener, State) -> |
| {ok, Handler} = global_changes_listener:start(), |
| NewState = State#state{ |
| handler_ref=erlang:monitor(process, Handler) |
| }, |
| {noreply, NewState}; |
| handle_info({'DOWN', Ref, _, _, Reason}, #state{handler_ref=Ref}=State) -> |
| couch_log:error("global_changes_listener terminated: ~w", [Reason]), |
| erlang:send_after(5000, self(), start_listener), |
| {noreply, State}; |
| handle_info(_, State) -> |
| {noreply, State}. |
| |
| |
| code_change(_OldVsn, State, _Extra) -> |
| {ok, State}. |
| |
| |
| |
| flush_updates(State) -> |
| DocIds = sets:to_list(State#state.pending_updates), |
| try group_ids_by_shard(State#state.dbname, DocIds) of |
| GroupedIds -> |
| Docs = dict:fold(fun(ShardName, Ids, DocInfoAcc) -> |
| {ok, Shard} = couch_db:open(ShardName, [?ADMIN_CTX]), |
| try |
| GroupedDocs = get_docs_locally(Shard, Ids), |
| GroupedDocs ++ DocInfoAcc |
| after |
| couch_db:close(Shard) |
| end |
| end, [], GroupedIds), |
| |
| spawn(fun() -> |
| fabric:update_docs(State#state.dbname, Docs, []) |
| end), |
| |
| Count = State#state.pending_update_count, |
| couch_stats:increment_counter( |
| [global_changes, db_writes], |
| Count |
| ) |
| catch error:database_does_not_exist -> |
| {noreply, State} |
| end, |
| couch_stats:update_gauge( |
| [global_changes, server_pending_updates], |
| 0 |
| ), |
| {noreply, State#state{ |
| pending_updates=sets:new(), |
| pending_update_count=0 |
| }}. |
| |
| |
| update_docs(Node, Updates) -> |
| gen_server:cast({?MODULE, Node}, {update_docs, Updates}). |
| |
| |
| group_ids_by_shard(DbName, DocIds) -> |
| LocalNode = node(), |
| lists:foldl(fun(DocId, Acc) -> |
| Shards = mem3:shards(DbName, DocId), |
| lists:foldl(fun |
| (#shard{node=Node, name=Name}, Acc1) when Node == LocalNode -> |
| dict:append(Name, DocId, Acc1); |
| (_, Acc1) -> |
| Acc1 |
| end, Acc, Shards) |
| end, dict:new(), DocIds). |
| |
| |
| get_docs_locally(Shard, Ids) -> |
| lists:map(fun(Id) -> |
| DocInfo = couch_db:get_doc_info(Shard, Id), |
| #doc{id=Id, revs=get_rev(DocInfo)} |
| end, Ids). |
| |
| |
| get_rev(not_found) -> |
| {0, []}; |
| get_rev({ok, #doc_info{revs=[RevInfo]}}) -> |
| {Pos, Rev} = RevInfo#rev_info.rev, |
| {Pos, [Rev]}; |
| get_rev({ok, #doc_info{revs=[RevInfo|_]}}) -> |
| % couch_doc:to_doc_info/1 sorts things so that the first |
| % #rev_info in the list is the "winning" revision which is |
| % the one we'd want to base our edit off of. In theory |
| % global_changes should never encounter a conflict by design |
| % but we should record if it happens in case our design isn't |
| % quite right. |
| couch_stats:increment_counter([global_changes, event_doc_conflict]), |
| {Pos, Rev} = RevInfo#rev_info.rev, |
| {Pos, [Rev]}. |