| % Licensed under the Apache License, Version 2.0 (the "License"); you may not |
| % use this file except in compliance with the License. You may obtain a copy of |
| % the License at |
| % |
| % http://www.apache.org/licenses/LICENSE-2.0 |
| % |
| % Unless required by applicable law or agreed to in writing, software |
| % distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| % WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| % License for the specific language governing permissions and limitations under |
| % the License. |
| |
| -module(couch_replicator_connection). |
| |
| -behavior(gen_server). |
| -behavior(config_listener). |
| |
| -export([start_link/0]). |
| -export([init/1, handle_call/3, handle_cast/2, handle_info/2]). |
| -export([code_change/3, terminate/2]). |
| |
| -export([acquire/1, relinquish/1]). |
| |
| -export([handle_config_change/5, handle_config_terminate/3]). |
| |
| -define(DEFAULT_CLOSE_INTERVAL, 90000). |
| -define(RELISTEN_DELAY, 5000). |
| |
| -record(state, { |
| close_interval, |
| timer |
| }). |
| |
| -record(connection, { |
| worker, |
| host, |
| port, |
| mref |
| }). |
| |
| -include_lib("ibrowse/include/ibrowse.hrl"). |
| |
| |
| start_link() -> |
| gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). |
| |
| |
| init([]) -> |
| process_flag(trap_exit, true), |
| ?MODULE = ets:new(?MODULE, [named_table, public, {keypos, #connection.worker}]), |
| ok = config:listen_for_changes(?MODULE, nil), |
| Interval = config:get_integer("replicator", "connection_close_interval", ?DEFAULT_CLOSE_INTERVAL), |
| {ok, Timer} = timer:send_after(Interval, close_idle_connections), |
| ibrowse:add_config([{inactivity_timeout, Interval}]), |
| {ok, #state{close_interval=Interval, timer=Timer}}. |
| |
| |
| acquire(URL) when is_binary(URL) -> |
| acquire(binary_to_list(URL)); |
| |
| acquire(URL) -> |
| case gen_server:call(?MODULE, {acquire, URL}) of |
| {ok, Worker} -> |
| link(Worker), |
| {ok, Worker}; |
| {error, all_allocated} -> |
| {ok, Pid} = ibrowse:spawn_link_worker_process(URL), |
| ok = gen_server:call(?MODULE, {create, URL, Pid}), |
| {ok, Pid}; |
| {error, Reason} -> |
| {error, Reason} |
| end. |
| |
| |
| relinquish(Worker) -> |
| unlink(Worker), |
| gen_server:cast(?MODULE, {relinquish, Worker}). |
| |
| |
| handle_call({acquire, URL}, From, State) -> |
| {Pid, _Ref} = From, |
| case ibrowse_lib:parse_url(URL) of |
| #url{host=Host, port=Port} -> |
| case ets:match_object(?MODULE, #connection{host=Host, port=Port, mref=undefined, _='_'}, 1) of |
| '$end_of_table' -> |
| {reply, {error, all_allocated}, State}; |
| {[Worker], _Cont} -> |
| couch_stats:increment_counter([couch_replicator, connection, acquires]), |
| ets:insert(?MODULE, Worker#connection{mref=monitor(process, Pid)}), |
| {reply, {ok, Worker#connection.worker}, State} |
| end; |
| {error, invalid_uri} -> |
| {reply, {error, invalid_uri}, State} |
| end; |
| |
| handle_call({create, URL, Worker}, From, State) -> |
| {Pid, _Ref} = From, |
| case ibrowse_lib:parse_url(URL) of |
| #url{host=Host, port=Port} -> |
| link(Worker), |
| couch_stats:increment_counter([couch_replicator, connection, creates]), |
| true = ets:insert_new( |
| ?MODULE, |
| #connection{host=Host, port=Port, worker=Worker, mref=monitor(process, Pid)} |
| ), |
| {reply, ok, State} |
| end. |
| |
| |
| handle_cast({relinquish, WorkerPid}, State) -> |
| couch_stats:increment_counter([couch_replicator, connection, relinquishes]), |
| case ets:lookup(?MODULE, WorkerPid) of |
| [Worker] -> |
| case Worker#connection.mref of |
| MRef when is_reference(MRef) -> demonitor(MRef, [flush]); |
| undefined -> ok |
| end, |
| ets:insert(?MODULE, Worker#connection{mref=undefined}); |
| [] -> |
| ok |
| end, |
| {noreply, State}; |
| |
| handle_cast({connection_close_interval, V}, State) -> |
| {ok, cancel} = timer:cancel(State#state.timer), |
| {ok, NewTimer} = timer:send_after(V, close_idle_connections), |
| ibrowse:add_config([{inactivity_timeout, V}]), |
| {noreply, State#state{close_interval=V, timer=NewTimer}}. |
| |
| |
| % owner crashed |
| handle_info({'DOWN', Ref, process, _Pid, _Reason}, State) -> |
| couch_stats:increment_counter([couch_replicator, connection, owner_crashes]), |
| ets:match_delete(?MODULE, #connection{mref=Ref, _='_'}), |
| {noreply, State}; |
| |
| % worker crashed |
| handle_info({'EXIT', Pid, Reason}, State) -> |
| couch_stats:increment_counter([couch_replicator, connection, worker_crashes]), |
| case ets:lookup(?MODULE, Pid) of |
| [] -> |
| ok; |
| [Worker] -> |
| #connection{host=Host, port=Port} = Worker, |
| maybe_log_worker_death(Host, Port, Reason), |
| case Worker#connection.mref of |
| MRef when is_reference(MRef) -> demonitor(MRef, [flush]); |
| undefined -> ok |
| end, |
| ets:delete(?MODULE, Pid) |
| end, |
| {noreply, State}; |
| |
| handle_info(close_idle_connections, State) -> |
| #state{ |
| close_interval=Interval, |
| timer=Timer |
| } = State, |
| Conns = ets:match_object(?MODULE, #connection{mref=undefined, _='_'}), |
| lists:foreach(fun(Conn) -> |
| couch_stats:increment_counter([couch_replicator, connection, closes]), |
| delete_worker(Conn) |
| end, Conns), |
| {ok, cancel} = timer:cancel(Timer), |
| {ok, NewTimer} = timer:send_after(Interval, close_idle_connections), |
| {noreply, State#state{timer=NewTimer}}; |
| |
| handle_info(restart_config_listener, State) -> |
| ok = config:listen_for_changes(?MODULE, nil), |
| {noreply, State}. |
| |
| |
| code_change(_OldVsn, State, _Extra) -> |
| {ok, State}. |
| |
| |
| terminate(_Reason, _State) -> |
| ok. |
| |
| |
| maybe_log_worker_death(_Host, _Port, normal) -> |
| ok; |
| |
| maybe_log_worker_death(Host, Port, Reason) -> |
| ErrMsg = "Replication connection to: ~p:~p died with reason ~p", |
| couch_log:info(ErrMsg, [Host, Port, Reason]). |
| |
| |
| -spec delete_worker(#connection{}) -> ok. |
| delete_worker(Worker) -> |
| ets:delete(?MODULE, Worker#connection.worker), |
| unlink(Worker#connection.worker), |
| spawn(fun() -> ibrowse_http_client:stop(Worker#connection.worker) end), |
| ok. |
| |
| |
| handle_config_change("replicator", "connection_close_interval", V, _, S) -> |
| ok = gen_server:cast(?MODULE, {connection_close_interval, |
| list_to_integer(V)}), |
| {ok, S}; |
| |
| handle_config_change(_, _, _, _, S) -> |
| {ok, S}. |
| |
| |
| handle_config_terminate(_, stop, _) -> |
| ok; |
| |
| handle_config_terminate(_, _, _) -> |
| Pid = whereis(?MODULE), |
| erlang:send_after(?RELISTEN_DELAY, Pid, restart_config_listener). |