| % Licensed under the Apache License, Version 2.0 (the "License"); you may not |
| % use this file except in compliance with the License. You may obtain a copy of |
| % the License at |
| % |
| % http://www.apache.org/licenses/LICENSE-2.0 |
| % |
| % Unless required by applicable law or agreed to in writing, software |
| % distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| % WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| % License for the specific language governing permissions and limitations under |
| % the License. |
| |
| -module(couch_replicator_connection). |
| |
| -behavior(gen_server). |
| -behavior(config_listener). |
| |
| -export([ |
| start_link/0 |
| ]). |
| |
| -export([ |
| init/1, |
| terminate/2, |
| handle_call/3, |
| handle_info/2, |
| handle_cast/2, |
| code_change/3 |
| ]). |
| |
| -export([ |
| acquire/1, |
| acquire/2, |
| release/1 |
| ]). |
| |
| -export([ |
| handle_config_change/5, |
| handle_config_terminate/3 |
| ]). |
| |
| -include_lib("ibrowse/include/ibrowse.hrl"). |
| |
% Default interval, in milliseconds, between sweeps that close idle
% connections. Used when "replicator"/"connection_close_interval" is not set.
-define(DEFAULT_CLOSE_INTERVAL, 90000).
% Delay, in milliseconds, before asking the server to re-subscribe to config
% changes after the config listener terminated abnormally.
-define(RELISTEN_DELAY, 5000).
| |
% gen_server state of the connection pool.
-record(state, {
    % Interval, in milliseconds, between idle-connection sweeps; also pushed
    % to ibrowse as its inactivity_timeout.
    close_interval,
    % Timer reference of the pending close_idle_connections message.
    timer
}).
| |
% One row of the ?MODULE ETS table; the table is keyed on the worker field.
-record(connection, {
    % Pid of the ibrowse worker process (ETS key).
    worker,
    % Host and port parsed from the endpoint URL.
    host,
    port,
    % Host and port parsed from the proxy URL; both are undefined when no
    % proxy is used.
    proxy_host,
    proxy_port,
    % Monitor reference on the process that currently owns the connection,
    % or undefined while the connection is idle.
    mref
}).
| |
% Start the connection pool server, linked to the caller and registered
% locally under the module name.
start_link() ->
    ServerName = {local, ?MODULE},
    gen_server:start_link(ServerName, ?MODULE, [], []).
| |
% gen_server callback. Creates the named ETS table that holds the pooled
% connections, subscribes to config changes, arms the idle-connection sweep
% timer and passes the same interval to ibrowse as its inactivity timeout.
init([]) ->
    % Exits of linked workers must arrive as 'EXIT' messages so their rows
    % can be removed from the table.
    process_flag(trap_exit, true),
    TableOpts = [named_table, public, {keypos, #connection.worker}],
    ?MODULE = ets:new(?MODULE, TableOpts),
    ok = config:listen_for_changes(?MODULE, nil),
    CloseInterval = config:get_integer(
        "replicator",
        "connection_close_interval",
        ?DEFAULT_CLOSE_INTERVAL
    ),
    TRef = erlang:send_after(CloseInterval, self(), close_idle_connections),
    IbrowseOpts = [
        {inactivity_timeout, CloseInterval},
        {worker_trap_exits, false}
    ],
    ibrowse:add_config(IbrowseOpts),
    {ok, #state{close_interval = CloseInterval, timer = TRef}}.
| |
% Acquire a connection to Url without going through a proxy.
% Equivalent to acquire(Url, undefined).
acquire(Url) ->
    NoProxy = undefined,
    acquire(Url, NoProxy).
| |
% Acquire a worker for Url, optionally through ProxyUrl (undefined means no
% proxy). Binary URLs are converted to strings first. Passwords are stripped
% from both URLs before they are sent to the pool server.
%
% Returns {ok, WorkerPid} with the worker linked to the caller, or
% {error, Reason} as reported by the pool server. When every matching
% connection is in use, a new ibrowse worker is spawned and registered
% with the pool.
acquire(Url, ProxyUrl) when is_binary(Url) ->
    acquire(binary_to_list(Url), ProxyUrl);
acquire(Url, ProxyUrl) when is_binary(ProxyUrl) ->
    acquire(Url, binary_to_list(ProxyUrl));
acquire(Url0, ProxyUrl0) ->
    Url = couch_util:url_strip_password(Url0),
    ProxyUrl = strip_proxy_password(ProxyUrl0),
    case gen_server:call(?MODULE, {acquire, Url, ProxyUrl}) of
        {ok, Worker} = Acquired ->
            % Tie the pooled worker to the caller for as long as it is held.
            link(Worker),
            Acquired;
        {error, all_allocated} ->
            {ok, NewWorker} = Spawned = ibrowse:spawn_link_worker_process(Url),
            ok = gen_server:call(?MODULE, {create, Url, ProxyUrl, NewWorker}),
            Spawned;
        {error, _Reason} = Error ->
            Error
    end.

% Strip the password from a proxy URL, passing `undefined` (no proxy) through.
strip_proxy_password(undefined) ->
    undefined;
strip_proxy_password(ProxyUrl) ->
    couch_util:url_strip_password(ProxyUrl).
| |
% Hand a worker back to the pool: unlink it from the caller, then tell the
% pool server (asynchronously) that the connection is idle again.
release(WorkerPid) ->
    unlink(WorkerPid),
    gen_server:cast(?MODULE, {release, WorkerPid}).
| |
% gen_server callback.
%
% {acquire, Url, ProxyUrl}: look for one idle connection (mref = undefined)
% with the same host, port, proxy host and proxy port. On a hit the caller is
% monitored, the row is marked as owned and {ok, WorkerPid} is returned.
% Otherwise the reply is {error, all_allocated}, or {error, invalid_uri} when
% either URL fails to parse.
%
% {create, Url, ProxyUrl, Worker}: link to a freshly spawned worker and store
% it in the table as already owned by the caller. Replies ok.
handle_call({acquire, Url, ProxyUrl}, {OwnerPid, _Tag}, State) ->
    case {ibrowse_lib:parse_url(Url), parse_proxy_url(ProxyUrl)} of
        {#url{host = Host, port = Port}, #url{host = ProxyHost, port = ProxyPort}} ->
            IdlePattern = #connection{
                host = Host,
                port = Port,
                proxy_host = ProxyHost,
                proxy_port = ProxyPort,
                mref = undefined,
                _ = '_'
            },
            % A limit of 1 is enough: any idle match will do.
            case ets:match_object(?MODULE, IdlePattern, 1) of
                {[Conn], _Continuation} ->
                    couch_stats:increment_counter([couch_replicator, connection, acquires]),
                    OwnerRef = monitor(process, OwnerPid),
                    ets:insert(?MODULE, Conn#connection{mref = OwnerRef}),
                    {reply, {ok, Conn#connection.worker}, State};
                '$end_of_table' ->
                    {reply, {error, all_allocated}, State}
            end;
        {UrlResult, ProxyResult} when
            UrlResult =:= {error, invalid_uri}; ProxyResult =:= {error, invalid_uri}
        ->
            {reply, {error, invalid_uri}, State}
    end;
handle_call({create, Url, ProxyUrl, Worker}, {OwnerPid, _Tag}, State) ->
    case {ibrowse_lib:parse_url(Url), parse_proxy_url(ProxyUrl)} of
        {#url{host = Host, port = Port}, #url{host = ProxyHost, port = ProxyPort}} ->
            % Link so that the worker's exit reaches this server as 'EXIT'.
            link(Worker),
            couch_stats:increment_counter([couch_replicator, connection, creates]),
            NewConn = #connection{
                worker = Worker,
                host = Host,
                port = Port,
                proxy_host = ProxyHost,
                proxy_port = ProxyPort,
                mref = monitor(process, OwnerPid)
            },
            true = ets:insert_new(?MODULE, NewConn),
            {reply, ok, State}
    end.
| |
% gen_server callback.
%
% {release, WorkerPid}: stop monitoring the owner (if any) and mark the
% connection as idle. Unknown workers are ignored.
%
% {connection_close_interval, V}: restart the sweep timer with the new
% interval and forward the value to ibrowse as its inactivity timeout.
handle_cast({release, WorkerPid}, State) ->
    couch_stats:increment_counter([couch_replicator, connection, releases]),
    case ets:lookup(?MODULE, WorkerPid) of
        [] ->
            ok;
        [#connection{mref = OwnerRef} = Conn] ->
            case OwnerRef of
                undefined -> ok;
                _ when is_reference(OwnerRef) -> demonitor(OwnerRef, [flush])
            end,
            ets:insert(?MODULE, Conn#connection{mref = undefined})
    end,
    {noreply, State};
handle_cast({connection_close_interval, Interval}, #state{timer = OldTimer} = State) ->
    erlang:cancel_timer(OldTimer),
    Timer = erlang:send_after(Interval, self(), close_idle_connections),
    ibrowse:add_config([{inactivity_timeout, Interval}]),
    {noreply, State#state{close_interval = Interval, timer = Timer}}.
| |
% gen_server callback for plain messages.
%
% 'DOWN': a process that owned connections went away; close every
% connection that was monitored through that reference.
handle_info({'DOWN', OwnerRef, process, _Pid, _Reason}, State) ->
    couch_stats:increment_counter([couch_replicator, connection, owner_crashes]),
    Owned = ets:match_object(?MODULE, #connection{mref = OwnerRef, _ = '_'}),
    lists:foreach(fun close_connection/1, Owned),
    {noreply, State};
% 'EXIT': a linked worker terminated; drop its row and stop monitoring
% the owner, if it had one.
handle_info({'EXIT', WorkerPid, Reason}, State) ->
    couch_stats:increment_counter([couch_replicator, connection, worker_crashes]),
    case ets:lookup(?MODULE, WorkerPid) of
        [#connection{host = Host, port = Port, mref = OwnerRef}] ->
            maybe_log_worker_death(Host, Port, Reason),
            case OwnerRef of
                undefined -> ok;
                _ when is_reference(OwnerRef) -> demonitor(OwnerRef, [flush])
            end,
            ets:delete(?MODULE, WorkerPid);
        [] ->
            ok
    end,
    {noreply, State};
% Periodic sweep: close every connection that currently has no owner,
% then re-arm the timer.
handle_info(
    close_idle_connections,
    #state{close_interval = Interval, timer = OldTimer} = State
) ->
    Idle = ets:match_object(?MODULE, #connection{mref = undefined, _ = '_'}),
    lists:foreach(fun close_connection/1, Idle),
    erlang:cancel_timer(OldTimer),
    Timer = erlang:send_after(Interval, self(), close_idle_connections),
    {noreply, State#state{timer = Timer}};
% Sent by handle_config_terminate/3 to re-subscribe to config changes.
handle_info(restart_config_listener, State) ->
    ok = config:listen_for_changes(?MODULE, nil),
    {noreply, State}.

% Count a close and shut the connection's worker down.
close_connection(Conn) ->
    couch_stats:increment_counter([couch_replicator, connection, closes]),
    delete_worker(Conn).
| |
% gen_server callback. The state needs no conversion between versions, so it
% is returned unchanged.
code_change(_FromVsn, CurrentState, _Extra) ->
    {ok, CurrentState}.
| |
% gen_server callback. Performs no explicit cleanup.
terminate(_Why, _FinalState) ->
    ok.
| |
% Log the death of a worker connected to Host:Port, unless it exited with
% reason `normal`, which is not worth reporting.
maybe_log_worker_death(_Host, _Port, normal) ->
    ok;
maybe_log_worker_death(Host, Port, Reason) ->
    couch_log:info(
        "Replication connection to: ~p:~p died with reason ~p",
        [Host, Port, Reason]
    ).
| |
% Remove a connection from the table and stop its worker.
%
% The worker is unlinked first so that its exit does not come back to this
% server as an 'EXIT' message. The stop call runs in a separate, unlinked
% process so the server does not wait for it.
-spec delete_worker(#connection{}) -> ok.
delete_worker(#connection{worker = WorkerPid}) ->
    ets:delete(?MODULE, WorkerPid),
    unlink(WorkerPid),
    spawn(fun() -> ibrowse_http_client:stop(WorkerPid) end),
    ok.
| |
% config_listener callback.
%
% Reacts to changes of "replicator"/"connection_close_interval" by telling
% the pool server to use the new interval (milliseconds). All other config
% changes are ignored. Always returns {ok, S} so the listener stays
% subscribed.
%
% - When the key is removed, the config application reports the value as the
%   atom `deleted`; the server is then reset to ?DEFAULT_CLOSE_INTERVAL.
% - A value that is not a positive integer is logged and ignored. It must not
%   reach the server, where it would be passed to erlang:send_after/3.
handle_config_change("replicator", "connection_close_interval", deleted, _, S) ->
    ok = gen_server:cast(?MODULE, {connection_close_interval, ?DEFAULT_CLOSE_INTERVAL}),
    {ok, S};
handle_config_change("replicator", "connection_close_interval", V, _, S) ->
    try list_to_integer(V) of
        Interval when Interval > 0 ->
            ok = gen_server:cast(?MODULE, {connection_close_interval, Interval});
        _NonPositive ->
            log_invalid_close_interval(V)
    catch
        error:badarg ->
            log_invalid_close_interval(V)
    end,
    {ok, S};
handle_config_change(_, _, _, _, S) ->
    {ok, S}.

% Report a rejected connection_close_interval value; the current interval
% stays in effect.
log_invalid_close_interval(V) ->
    couch_log:warning(
        "Ignoring invalid replicator connection_close_interval value: ~p",
        [V]
    ).
| |
% config_listener callback, invoked when the listener terminates.
%
% On a regular `stop` nothing is done. For any other reason the pool server
% (looked up by its registered name) is sent restart_config_listener after
% ?RELISTEN_DELAY milliseconds so that it subscribes again.
handle_config_terminate(_Server, stop, _State) ->
    ok;
handle_config_terminate(_Server, _Reason, _State) ->
    erlang:send_after(?RELISTEN_DELAY, whereis(?MODULE), restart_config_listener).
| |
% Parse a proxy URL with ibrowse. `undefined` (no proxy) yields a #url{}
% whose host and port are both undefined, so callers can match on #url{}
% whether or not a proxy is configured.
parse_proxy_url(undefined) ->
    NoProxy = #url{host = undefined, port = undefined},
    NoProxy;
parse_proxy_url(ProxyUrl) ->
    ibrowse_lib:parse_url(ProxyUrl).