| %%------------------------------------------------------------------------------ |
| %% Licensed to the Apache Software Foundation (ASF) under one or more |
| %% contributor license agreements. See the NOTICE file distributed with |
| %% this work for additional information regarding copyright ownership. |
| %% The ASF licenses this file to You under the Apache License, Version 2.0 |
| %% (the "License"); you may not use this file except in compliance with |
| %% the License. You may obtain a copy of the License at |
| %% |
| %% http://www.apache.org/licenses/LICENSE-2.0 |
| %% |
| %% Unless required by applicable law or agreed to in writing, software |
| %% distributed under the License is distributed on an "AS IS" BASIS, |
| %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| %% See the License for the specific language governing permissions and |
| %% limitations under the License. |
| %%------------------------------------------------------------------------------ |
| -module(dubbo_provider_consumer_reg_table). |
| |
| -behaviour(gen_server). |
| |
| %% API |
| -export([start_link/0, start_consumer/2]). |
| |
| %% gen_server callbacks |
| -export([init/1, |
| handle_call/3, |
| handle_cast/2, |
| handle_info/2, |
| terminate/2, |
| code_change/3]). |
| |
| -export([update_consumer_connections/2, update_node_conections/2, query_node_connections/1, get_interface_provider_node/1, get_host_connections/2, select_connection/1, |
| update_connection_readonly/2, get_host_flag/1, get_host_flag/2, clean_invalid_provider/1, update_interface_info/1, get_interface_info/1]). |
| |
| -include("dubbo.hrl"). |
| -define(SERVER, ?MODULE). |
| |
| -define(INTERFCE_LIST_TABLE, interface_list). |
| |
| -define(INTERFACE_INFO_TABLE, dubbo_interface_info). |
| |
| -define(PROVIDER_NODE_LIST_TABLE, provider_node_list). |
| |
| -record(state, {}). |
| |
| -ifdef(TEST). |
| -compile([export_all]). |
| -endif. |
| |
| |
| %%%=================================================================== |
| %%% API |
| %%%=================================================================== |
| |
%%--------------------------------------------------------------------
%% @doc
%% Starts the registry-table server, linked to the caller and
%% registered locally under the name ?SERVER. The server is started
%% with an empty argument list and no gen_server options.
%% @end
%%--------------------------------------------------------------------
-spec start_link() ->
    {ok, Pid :: pid()} | ignore | {error, Reason :: term()}.
start_link() ->
    RegisteredName = {local, ?SERVER},
    gen_server:start_link(RegisteredName, ?MODULE, [], []).
| |
| %%%=================================================================== |
| %%% gen_server callbacks |
| %%%=================================================================== |
| |
%%--------------------------------------------------------------------
%% @private
%% @doc
%% gen_server init callback. Only the empty argument list is accepted.
%% Creates the ETS tables through init_ets_table/0 and starts the
%% server with an empty #state{} record.
%% @end
%%--------------------------------------------------------------------
-spec init(Args :: term()) ->
    {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} |
    {stop, Reason :: term()} | ignore.
init([]) ->
    InitialState = #state{},
    init_ets_table(),
    {ok, InitialState}.
%% Creates the three public, named ETS tables used by this module, in order:
%% the interface list table and the provider node list table (both `bag'),
%% then the interface info table (no type option given, so the ETS default
%% applies). All of them use element 2 of the stored tuple as the key.
%% A failure to create a table is logged and otherwise ignored; the function
%% always returns `ok'.
init_ets_table() ->
    BagOptions = [bag, public, named_table, {keypos, 2}],
    DefaultTypeOptions = [public, named_table, {keypos, 2}],
    TableSpecs = [
        {?INTERFCE_LIST_TABLE, BagOptions,
            "new ets table INTERFCE_LIST_TABLE error ~p"},
        {?PROVIDER_NODE_LIST_TABLE, BagOptions,
            "new ets table PROVIDER_NODE_LIST_TABLE error ~p"},
        {?INTERFACE_INFO_TABLE, DefaultTypeOptions,
            "new ets table INTERFACE_INFO_TABLE error ~p"}
    ],
    lists:foreach(fun new_named_table/1, TableSpecs).

%% Creates one named table. Any exception raised by ets:new/2 is logged with
%% the supplied format string instead of being propagated.
new_named_table({TableName, Options, ErrorFormat}) ->
    try ets:new(TableName, Options) of
        TableName ->
            ok
    catch
        _Class:Reason ->
            logger:error(ErrorFormat, [Reason])
    end.
%%--------------------------------------------------------------------
%% @private
%% @doc
%% gen_server call callback. No request gets special treatment: every
%% synchronous call is answered with `ok' and the state is returned
%% unchanged.
%% @end
%%--------------------------------------------------------------------
-spec handle_call(Request :: term(), From :: {pid(), Tag :: term()},
                  State :: #state{}) ->
    {reply, Reply :: term(), NewState :: #state{}} |
    {reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} |
    {noreply, NewState :: #state{}} |
    {noreply, NewState :: #state{}, timeout() | hibernate} |
    {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
    {stop, Reason :: term(), NewState :: #state{}}.

%% Disabled clause, kept for reference. While it stays commented out, the
%% {add_consumer, Interface, ProviderNodeInfo} request sent by
%% start_consumer/2 is answered by the catch-all clause below.
%%handle_call({add_consumer, Interface, ProviderNodeList}, _From, State) ->
%%
%%    OldProviderList = get_interface_provider_node(Interface),
%%    NewProviderList = add_consumer(ProviderNodeList, []),
%%    DeleteProverList = OldProviderList -- NewProviderList,
%%    clean_invalid_provider(DeleteProverList),
%%    {reply, ok, State};
handle_call(_Msg, _Caller, ServerState) ->
    {reply, ok, ServerState}.
| |
%%--------------------------------------------------------------------
%% @private
%% @doc
%% gen_server cast callback. Every asynchronous request is ignored and
%% the state is passed through unchanged.
%% @end
%%--------------------------------------------------------------------
-spec handle_cast(Request :: term(), State :: #state{}) ->
    {noreply, NewState :: #state{}} |
    {noreply, NewState :: #state{}, timeout() | hibernate} |
    {stop, Reason :: term(), NewState :: #state{}}.
handle_cast(_Msg, ServerState) ->
    {noreply, ServerState}.
| |
%%--------------------------------------------------------------------
%% @private
%% @doc
%% gen_server info callback. Any message that is neither a call nor a
%% cast is dropped and the state is passed through unchanged.
%% @end
%%--------------------------------------------------------------------
-spec handle_info(Info :: timeout() | term(), State :: #state{}) ->
    {noreply, NewState :: #state{}} |
    {noreply, NewState :: #state{}, timeout() | hibernate} |
    {stop, Reason :: term(), NewState :: #state{}}.
handle_info(_Message, ServerState) ->
    {noreply, ServerState}.
| |
%%--------------------------------------------------------------------
%% @private
%% @doc
%% gen_server terminate callback. Performs no cleanup of its own and
%% simply returns `ok'.
%% @end
%%--------------------------------------------------------------------
-spec terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()),
                State :: #state{}) -> term().
terminate(_Why, _ServerState) ->
    ok.
| |
%%--------------------------------------------------------------------
%% @private
%% @doc
%% gen_server code_change callback. The state needs no conversion, so
%% it is returned as-is.
%% @end
%%--------------------------------------------------------------------
-spec code_change(OldVsn :: term() | {down, term()}, State :: #state{},
                  Extra :: term()) ->
    {ok, NewState :: #state{}} | {error, Reason :: term()}.
code_change(_PreviousVsn, ServerState, _ExtraArgs) ->
    {ok, ServerState}.
| |
%% Sends a synchronous {add_consumer, Interface, ProviderNodeInfo} request to
%% the locally registered server and returns its reply.
start_consumer(Interface, ProviderNodeInfo) ->
    Request = {add_consumer, Interface, ProviderNodeInfo},
    gen_server:call(?SERVER, Request).
| |
| |
| |
%% Returns every object stored in the provider node list table under the host
%% flag built from Host (a binary) and Port (an integer) by get_host_flag/2.
%% The result is an empty list when nothing is stored for that flag.
get_host_connections(Host, Port) ->
    ets:lookup(?PROVIDER_NODE_LIST_TABLE, get_host_flag(Host, Port)).
| |
| |
| |
%% Stores InterfaceInfo in the interface info table and returns the result of
%% ets:insert/2. The table is keyed on element 2 of the stored tuple.
update_interface_info(InterfaceInfo) ->
    Table = ?INTERFACE_INFO_TABLE,
    ets:insert(Table, InterfaceInfo).
| |
| |
%% Looks up the object stored for Interface in the interface info table.
%% Returns the stored object, or `undefined' when there is none.
get_interface_info(Interface) ->
    Found = ets:lookup(?INTERFACE_INFO_TABLE, Interface),
    case Found of
        [Info] ->
            Info;
        [] ->
            undefined
    end.
| |
%%%===================================================================
%%% Table access functions (exported API and internal helpers)
%%%===================================================================
%% Adds each #connection_info{} in Connections to the provider node list
%% table unless the table already holds an object with the same host_flag
%% and pid. Always returns `ok'.
%%
%% The first argument (the interface) is accepted for interface compatibility
%% but is not used, hence the underscore prefix.
update_node_conections(_Interface, Connections) ->
    %% foreach rather than map: the per-item results are never used.
    lists:foreach(
        fun(Item) ->
            HostFlag = Item#connection_info.host_flag,
            Pattern = #connection_info{host_flag = HostFlag, pid = Item#connection_info.pid, _ = '_'},
            case ets:match_object(?PROVIDER_NODE_LIST_TABLE, Pattern) of
                [] ->
                    I2 = ets:insert(?PROVIDER_NODE_LIST_TABLE, Item),
                    logger:debug("update_node_conections insert one record ~p result:~p", [HostFlag, I2]);
                _ ->
                    logger:debug("update_node_conections hostflag ~p already exit ", [HostFlag])
            end
        end, Connections),
    ok.
| |
%% Returns every object stored in the provider node list table under the
%% given host flag; the result is an empty list when there is none.
query_node_connections(Hostflag) ->
    Table = ?PROVIDER_NODE_LIST_TABLE,
    ets:lookup(Table, Hostflag).
| |
%% Records every #connection_info{} in Connections in the interface list
%% table as an #interface_list{} entry for Interface, carrying the
%% connection's pid and the connection record itself. Always returns `ok'.
update_consumer_connections(Interface, Connections) ->
    Register =
        fun(Connection) ->
            ConnectionPid = Connection#connection_info.pid,
            Entry = #interface_list{interface = Interface, pid = ConnectionPid, connection_info = Connection},
            Inserted = ets:insert(?INTERFCE_LIST_TABLE, Entry),
            logger:debug("insert interface conection info ~p ~p ~p", [Interface, ConnectionPid, Inserted])
        end,
    lists:foreach(Register, Connections).
| |
%% Builds the host flag for a #provider_config{} record from its host and
%% port fields, in the same <<Host, "_", Port>> form as get_host_flag/2.
get_host_flag(ProviderConfig) ->
    Host = ProviderConfig#provider_config.host,
    Port = ProviderConfig#provider_config.port,
    get_host_flag(Host, Port).
%% Builds the host flag binary: Host, an underscore, then Port in decimal.
%% Host must be a binary and Port an integer.
get_host_flag(Host, Port) ->
    PortBin = integer_to_binary(Port),
    <<Host/binary, "_", PortBin/binary>>.
| |
%% Records every #connection_info{} in ConnectionList in the interface list
%% table as an #interface_list{} entry for Interface. When
%% IsUpdateProvideNode is `true' the connection record is also inserted into
%% the provider node list table; when `false' that step is skipped.
%% HostFlag is used only in the debug log message. Always returns `ok'.
update_connection_info(Interface, HostFlag, ConnectionList, IsUpdateProvideNode) ->
    StoreOne =
        fun(Connection) ->
            ConnectionPid = Connection#connection_info.pid,
            Entry = #interface_list{interface = Interface, pid = ConnectionPid, connection_info = Connection},
            Inserted = ets:insert(?INTERFCE_LIST_TABLE, Entry),
            logger:debug("insert interface conection info ~p ~p ~p", [Interface, ConnectionPid, Inserted]),
            case IsUpdateProvideNode of
                false ->
                    ok;
                true ->
                    NodeInserted = ets:insert(?PROVIDER_NODE_LIST_TABLE, Connection),
                    logger:debug("insert PROVIDER_NODE_LIST_TABLE ~p info:~p", [HostFlag, NodeInserted])
            end
        end,
    lists:foreach(StoreOne, ConnectionList).
| |
%% Returns the host flags of the connections stored for Interface in the
%% interface list table, passed through dubbo_lists_util:del_duplicate/1.
%% Returns an empty list when nothing is stored for Interface.
get_interface_provider_node(Interface) ->
    Entries = ets:lookup(?INTERFCE_LIST_TABLE, Interface),
    case Entries of
        [] ->
            [];
        _ ->
            HostFlagOf =
                fun(Entry) ->
                    Connection = Entry#interface_list.connection_info,
                    Connection#connection_info.host_flag
                end,
            dubbo_lists_util:del_duplicate(lists:map(HostFlagOf, Entries))
    end.
| |
%% Returns {ok, Connections} with the connection_info field of every entry
%% stored for Interface in the interface list table, or {error, none} when
%% no entry is stored.
select_connection(Interface) ->
    case ets:lookup(?INTERFCE_LIST_TABLE, Interface) of
        [_ | _] = Entries ->
            ConnectionOf = fun(Entry) -> Entry#interface_list.connection_info end,
            {ok, lists:map(ConnectionOf, Entries)};
        [] ->
            {error, none}
    end.
| |
%% Sets the readonly flag of the connection info inside every interface list
%% entry whose pid is ConnectionPid.
%%
%% Returns {ok, Count}, where Count is the number of entries that matched.
%% The spec previously declared `ok', which did not match the value the
%% function has always returned; the return value itself is unchanged.
-spec update_connection_readonly(pid(), boolean()) -> {ok, non_neg_integer()}.
update_connection_readonly(ConnectionPid, Readonly) ->
    Pattern = #interface_list{pid = ConnectionPid, _ = '_'},
    Objects = ets:match_object(?INTERFCE_LIST_TABLE, Pattern),
    %% foreach rather than map: the per-entry results are never used.
    lists:foreach(
        fun(#interface_list{interface = Interface, pid = Pid, connection_info = ConnectionInfo} = OldObject) ->
            logger:debug("[dubbo] update interface ~p ~p readonly", [Interface, Pid]),
            NewConnectionInfo = ConnectionInfo#connection_info{readonly = Readonly},
            NewObject = OldObject#interface_list{connection_info = NewConnectionInfo},
            %% The table is a bag, so the old object has to be removed
            %% explicitly before the updated copy is inserted.
            ets:delete_object(?INTERFCE_LIST_TABLE, OldObject),
            ets:insert(?INTERFCE_LIST_TABLE, NewObject)
        end, Objects),
    {ok, length(Objects)}.
| |
%% For every host flag in the list, looks up the connections stored in the
%% provider node list table, removes duplicates with
%% dubbo_lists_util:del_duplicate/1 and hands them to clean_connection_info/1.
%% Host flags with no stored connection are skipped. Always returns `ok'.
%%
%% This function only reads the provider node list table; it does not delete
%% the objects it looked up.
clean_invalid_provider(HostFlags) ->
    lists:foreach(
        fun(HostFlag) ->
            case ets:lookup(?PROVIDER_NODE_LIST_TABLE, HostFlag) of
                [] ->
                    ok;
                Connections ->
                    clean_connection_info(dubbo_lists_util:del_duplicate(Connections))
            end
        end, HostFlags).
| |
%% For every #connection_info{} in ProviderNodeConnections, removes all
%% interface list entries that refer to the connection's pid and then asks
%% dubbo_transport_pool_sup to stop that pid. Always returns `ok'.
clean_connection_info(ProviderNodeConnections) ->
    lists:foreach(
        fun(Item) ->
            Pid = Item#connection_info.pid,
            Pattern = #interface_list{pid = Pid, _ = '_'},
            %% ets:match_delete/2 treats '_' as a wildcard. The previous
            %% ets:delete_object/2 call only removes an object that is exactly
            %% equal to its argument, so it never matched a stored entry and
            %% the entries for this pid were left behind.
            ets:match_delete(?INTERFCE_LIST_TABLE, Pattern),
            dubbo_transport_pool_sup:stop_children(Pid)
        end, ProviderNodeConnections),
    ok.