%% blob: 0a01d381ee1e7df17c3e645b33a43f1b99994fd1 [file] [log] [blame]
%%------------------------------------------------------------------------------
%% Licensed to the Apache Software Foundation (ASF) under one or more
%% contributor license agreements. See the NOTICE file distributed with
%% this work for additional information regarding copyright ownership.
%% The ASF licenses this file to You under the Apache License, Version 2.0
%% (the "License"); you may not use this file except in compliance with
%% the License. You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%------------------------------------------------------------------------------
-module(dubbo_consumer_pool).
-behaviour(gen_server).
%% API
-export([start_link/0, start_consumer/2]).
%% gen_server callbacks
-export([init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
code_change/3]).
-export([select_connection/1, select_connection/2, update_connection_readonly/2]).
-include("dubbo.hrl").
-define(SERVER, ?MODULE).
-define(INTERFCE_LIST_TABLE, interface_list).
-define(PROVIDER_NODE_LIST_TABLE, provider_node_list).
-record(state, {}).
-ifdef(TEST).
-compile([export_all]).
-endif.
%%%===================================================================
%%% API
%%%===================================================================
%%--------------------------------------------------------------------
%% @doc
%% Starts the consumer pool gen_server, linked to the caller and
%% registered locally under the name ?SERVER.
%%
%% @end
%%--------------------------------------------------------------------
-spec start_link() -> {ok, pid()} | ignore | {error, term()}.
start_link() ->
    ServerName = {local, ?SERVER},
    gen_server:start_link(ServerName, ?MODULE, [], []).
%%%===================================================================
%%% gen_server callbacks
%%%===================================================================
%%--------------------------------------------------------------------
%% @private
%% @doc
%% Initializes the server: creates the ETS tables used by this module
%% and returns an empty state record.
%%
%% @end
%%--------------------------------------------------------------------
-spec init(Args :: term()) ->
    {ok, State :: #state{}}
    | {ok, State :: #state{}, timeout() | hibernate}
    | {stop, Reason :: term()}
    | ignore.
init([]) ->
    %% The result is intentionally not matched: table creation failures
    %% are only logged by init_ets_table/0.
    _ = init_ets_table(),
    InitialState = #state{},
    {ok, InitialState}.
%% Creates the interface table and the provider node table as public,
%% named `bag' tables keyed on tuple position 2. An exception raised by
%% ets:new/2 is logged and otherwise ignored, so this always returns ok.
init_ets_table() ->
    TableOptions = [bag, public, named_table, {keypos, 2}],
    lists:foreach(
        fun(TableName) ->
            try ets:new(TableName, TableOptions) of
                %% For a named table ets:new/2 returns the table name.
                TableName ->
                    ok
            catch
                _Class:Error ->
                    logger:error("new ets table error ~p", [Error]),
                    error
            end
        end,
        [?INTERFCE_LIST_TABLE, ?PROVIDER_NODE_LIST_TABLE]
    ),
    ok.
%%--------------------------------------------------------------------
%% @private
%% @doc
%% Handles synchronous requests.
%%
%% `{add_consumer, Interface, ProviderNodeList}' registers connections
%% for every provider in ProviderNodeList, then cleans up the providers
%% that were associated with Interface before the call but are absent
%% from the new list. Every other request is answered with `ok'.
%%
%% @end
%%--------------------------------------------------------------------
-spec handle_call(Request :: term(), From :: {pid(), Tag :: term()}, State :: #state{}) ->
    {reply, Reply :: term(), NewState :: #state{}}
    | {reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate}
    | {noreply, NewState :: #state{}}
    | {noreply, NewState :: #state{}, timeout() | hibernate}
    | {stop, Reason :: term(), Reply :: term(), NewState :: #state{}}
    | {stop, Reason :: term(), NewState :: #state{}}.
handle_call({add_consumer, Interface, ProviderNodeList}, _From, State) ->
    %% Snapshot the hosts known for this interface *before* registering
    %% the new list, so that the difference identifies removed providers.
    PreviousHosts = get_interface_provider_node(Interface),
    CurrentHosts = add_consumer(ProviderNodeList, []),
    StaleHosts = PreviousHosts -- CurrentHosts,
    clean_invalid_provider(StaleHosts),
    {reply, ok, State};
handle_call(_Request, _From, State) ->
    {reply, ok, State}.
%%--------------------------------------------------------------------
%% @private
%% @doc
%% Handles asynchronous (cast) messages. No casts are defined for this
%% server; every message is ignored and the state is left unchanged.
%%
%% @end
%%--------------------------------------------------------------------
-spec handle_cast(Request :: term(), State :: #state{}) ->
    {noreply, NewState :: #state{}}
    | {noreply, NewState :: #state{}, timeout() | hibernate}
    | {stop, Reason :: term(), NewState :: #state{}}.
handle_cast(_Msg, State) ->
    {noreply, State}.
%%--------------------------------------------------------------------
%% @private
%% @doc
%% Handles all messages that are neither calls nor casts. Every such
%% message is ignored and the state is left unchanged.
%%
%% @end
%%--------------------------------------------------------------------
-spec handle_info(Info :: timeout() | term(), State :: #state{}) ->
    {noreply, NewState :: #state{}}
    | {noreply, NewState :: #state{}, timeout() | hibernate}
    | {stop, Reason :: term(), NewState :: #state{}}.
handle_info(_Message, State) ->
    {noreply, State}.
%%--------------------------------------------------------------------
%% @private
%% @doc
%% Called by gen_server when the process is about to terminate.
%% This module performs no cleanup here and simply returns `ok'.
%%
%% @end
%%--------------------------------------------------------------------
-spec terminate(
    Reason :: (normal | shutdown | {shutdown, term()} | term()),
    State :: #state{}
) -> term().
terminate(_Why, _State) ->
    ok.
%%--------------------------------------------------------------------
%% @private
%% @doc
%% Converts the process state on a code upgrade or downgrade.
%% The state is returned unchanged.
%%
%% @end
%%--------------------------------------------------------------------
-spec code_change(OldVsn :: term() | {down, term()}, State :: #state{}, Extra :: term()) ->
    {ok, NewState :: #state{}} | {error, Reason :: term()}.
code_change(_FromVsn, State, _Extra) ->
    {ok, State}.
%% @doc
%% Synchronously asks the pool server to register the providers in
%% ProviderNodeInfo for Interface. Returns the server's reply.
start_consumer(Interface, ProviderNodeInfo) ->
    Request = {add_consumer, Interface, ProviderNodeInfo},
    gen_server:call(?SERVER, Request).
%%%===================================================================
%%% Internal functions
%%%===================================================================
%% Walks the provider list, making sure each parsable provider has its
%% connections registered for its interface. Returns the accumulated
%% host flags (most recently processed first). Entries that fail to
%% parse are logged and skipped.
add_consumer([], RegisterList) ->
    RegisterList;
add_consumer([ProviderNodeInfo | ProviderList], RegisterList) ->
    case dubbo_node_config_util:parse_provider_info(ProviderNodeInfo) of
        {ok, ProviderConfig} ->
            HostFlag = get_host_flag(ProviderConfig),
            ok = register_provider_connections(HostFlag, ProviderConfig),
            add_consumer(ProviderList, [HostFlag | RegisterList]);
        {error, ParseError} ->
            logger:error("parse provider info error reason ~p", [ParseError]),
            add_consumer(ProviderList, RegisterList)
    end.

%% Registers the connections of one provider host for the interface in
%% ProviderConfig. If the host has no entry in the provider node table,
%% new connection processes are started and recorded in both tables;
%% otherwise the already recorded connections are reused and only the
%% interface table is updated.
register_provider_connections(HostFlag, ProviderConfig) ->
    Interface = ProviderConfig#provider_config.interface,
    case ets:lookup(?PROVIDER_NODE_LIST_TABLE, HostFlag) of
        [] ->
            StartedConnections = start_provider_process(HostFlag, 30, ProviderConfig),
            ok = update_connection_info(Interface, HostFlag, StartedConnections, true);
        KnownNodes ->
            ExistingConnections =
                [Node#provider_node_list.connection_info || Node <- KnownNodes],
            ok = update_connection_info(Interface, HostFlag, ExistingConnections, false)
    end.
%% Starts one connection process per configured `executes' slot of the
%% provider and returns the list of #connection_info{} records that
%% describe them, in slot order.
start_provider_process(HostFlag, Weight, ProviderConfig) ->
    SlotCount = ProviderConfig#provider_config.executes,
    [
        start_provider_connection(HostFlag, Weight, ProviderConfig, Slot)
     || Slot <- lists:seq(1, SlotCount)
    ].

%% Starts a single connection process under dubbo_consumer_pool_sup and
%% builds its #connection_info{} record. Crashes with badmatch if the
%% supervisor does not return {ok, Pid}.
start_provider_connection(HostFlag, Weight, ProviderConfig, Slot) ->
    %% NOTE(review): an atom is created from the host flag for every
    %% connection; confirm host flags come from a bounded, trusted set,
    %% since atoms are never garbage collected.
    ConnectionName = binary_to_atom(<<HostFlag/binary, (integer_to_binary(Slot))/binary>>, utf8),
    StartArgs = [ConnectionName, HostFlag, ProviderConfig, Slot],
    ChildSpec =
        {ConnectionName, {dubbo_netty_client, start_link, StartArgs}, permanent, 2000, worker, [
            dubbo_netty_client
        ]},
    {ok, Pid} = dubbo_consumer_pool_sup:add_children(ChildSpec),
    logger:info("start provider ~p pid info ~p~n", [HostFlag, Pid]),
    #connection_info{
        connection_id = ConnectionName,
        pid = Pid,
        weight = Weight,
        host_flag = HostFlag
    }.
%% Builds the binary host flag `<<"Host_Port">>' from the host (a
%% string) and port (an integer) of the provider configuration.
get_host_flag(ProviderConfig) ->
    Host = list_to_binary(ProviderConfig#provider_config.host),
    Port = integer_to_binary(ProviderConfig#provider_config.port),
    <<Host/binary, "_", Port/binary>>.
%% Records every connection in ConnectionList in the interface table
%% under Interface. When IsUpdateProvideNode is `true' the connection is
%% additionally recorded in the provider node table under HostFlag.
%% Always returns ok.
update_connection_info(Interface, HostFlag, ConnectionList, IsUpdateProvideNode) ->
    lists:foreach(
        fun(Connection) ->
            Pid = Connection#connection_info.pid,
            InterfaceEntry = #interface_list{
                interface = Interface,
                pid = Pid,
                connection_info = Connection
            },
            InterfaceInserted = ets:insert(?INTERFCE_LIST_TABLE, InterfaceEntry),
            logger:debug("insert interface conection info ~p ~p ~p", [
                Interface, Pid, InterfaceInserted
            ]),
            case IsUpdateProvideNode of
                true ->
                    NodeEntry = #provider_node_list{
                        host_flag = HostFlag,
                        connection_info = Connection
                    },
                    NodeInserted = ets:insert(?PROVIDER_NODE_LIST_TABLE, NodeEntry),
                    logger:debug("insert PROVIDER_NODE_LIST_TABLE ~p info:~p", [
                        HostFlag, NodeInserted
                    ]);
                false ->
                    ok
            end
        end,
        ConnectionList
    ),
    ok.
%% Returns the host flags of the connections currently recorded for
%% Interface in the interface table, with duplicates removed via
%% dubbo_lists_util:del_duplicate/1. Returns [] when none are recorded.
get_interface_provider_node(Interface) ->
    case ets:lookup(?INTERFCE_LIST_TABLE, Interface) of
        [] ->
            [];
        Entries ->
            HostFlags = lists:map(
                fun(Entry) ->
                    Connection = Entry#interface_list.connection_info,
                    Connection#connection_info.host_flag
                end,
                Entries
            ),
            dubbo_lists_util:del_duplicate(HostFlags)
    end.
%% @doc
%% Selects a connection for Interface using a random number drawn
%% uniformly from 1..2048. See select_connection/2 for the result shape.
select_connection(Interface) ->
    select_connection(Interface, rand:uniform(2048)).
%% @doc
%% Selects one of the connections recorded for Interface, indexed by
%% `RandNum rem Count'. Returns `{ok, ConnectionInfo}', or
%% `{error, none}' when no connection is recorded for the interface.
select_connection(Interface, RandNum) ->
    case ets:lookup(?INTERFCE_LIST_TABLE, Interface) of
        [] ->
            {error, none};
        Candidates ->
            %% lists:nth/2 is 1-based, hence the +1 after the remainder.
            Position = (RandNum rem length(Candidates)) + 1,
            Chosen = lists:nth(Position, Candidates),
            {ok, Chosen#interface_list.connection_info}
    end.
%% @doc
%% Sets the `readonly' flag of every interface table entry whose
%% connection process is ConnectionPid. Returns `{ok, Count}' where
%% Count is the number of entries that matched.
%%
%% The spec reflects the actual return value `{ok, Count}'; it
%% previously declared `ok', which did not match the implementation.
-spec update_connection_readonly(ConnectionPid :: pid(), Readonly :: boolean()) ->
    {ok, UpdatedCount :: non_neg_integer()}.
update_connection_readonly(ConnectionPid, Readonly) ->
    Pattern = #interface_list{pid = ConnectionPid, _ = '_'},
    Objects = ets:match_object(?INTERFCE_LIST_TABLE, Pattern),
    lists:foreach(
        fun(
            #interface_list{interface = Interface, pid = Pid, connection_info = ConnectionInfo} =
                InterferConnection
        ) ->
            logger:debug("[dubbo] update interface ~p ~p readonly", [Interface, Pid]),
            NewConnectionInfo = ConnectionInfo#connection_info{readonly = Readonly},
            NewObject = InterferConnection#interface_list{connection_info = NewConnectionInfo},
            %% The table is a bag, so the old object must be removed
            %% explicitly before the updated one is inserted.
            ets:delete_object(?INTERFCE_LIST_TABLE, InterferConnection),
            ets:insert(?INTERFCE_LIST_TABLE, NewObject)
        end,
        Objects
    ),
    {ok, length(Objects)}.
%% Cleans up every provider host in the given list: the connections
%% recorded for the host are torn down via clean_connection_info/1 and
%% the host's rows are then removed from the provider node table.
%% Hosts with no recorded connections are skipped. Always returns ok.
clean_invalid_provider([]) ->
    ok;
clean_invalid_provider([HostFlag | DeleteProverList]) ->
    case ets:lookup(?PROVIDER_NODE_LIST_TABLE, HostFlag) of
        [] ->
            ok;
        ProviderNodeList ->
            ProviderNodeList1 = dubbo_lists_util:del_duplicate(ProviderNodeList),
            clean_connection_info(ProviderNodeList1),
            %% Drop the host's rows as well. Otherwise a later
            %% add_consumer/2 for the same host would find these stale
            %% rows and re-register connections whose processes have
            %% been stopped instead of starting new ones.
            ets:delete(?PROVIDER_NODE_LIST_TABLE, HostFlag),
            ok
    end,
    clean_invalid_provider(DeleteProverList).
%% For every provider node entry: removes all interface table rows that
%% refer to the connection's pid and asks dubbo_consumer_pool_sup to
%% stop the connection's child process. Always returns ok.
clean_connection_info(ProviderNodeList) ->
    lists:foreach(
        fun(Item) ->
            ConnectionInfo = Item#provider_node_list.connection_info,
            Pid = ConnectionInfo#connection_info.pid,
            ConnectionId = ConnectionInfo#connection_info.connection_id,
            Pattern = #interface_list{pid = Pid, _ = '_'},
            %% ets:delete_object/2 only removes objects that are exactly
            %% equal to its argument, so a pattern containing '_' would
            %% never delete anything; ets:match_delete/2 treats the
            %% argument as a match pattern.
            ets:match_delete(?INTERFCE_LIST_TABLE, Pattern),
            dubbo_consumer_pool_sup:stop_children(ConnectionId)
        end,
        ProviderNodeList
    ),
    ok.