%%%-------------------------------------------------------------------
%%% @author dlive
%%% @copyright (C) 2017, <COMPANY>
%%% @doc
%%%
%%% @end
%%% Created : 28. Dec 2017 7:34 PM
%%%-------------------------------------------------------------------
-module(dubbo_consumer_pool).
-author("dlive").

-behaviour(gen_server).

%% API
-export([start_link/0, start_consumer/2]).

%% gen_server callbacks
-export([init/1,
    handle_call/3,
    handle_cast/2,
    handle_info/2,
    terminate/2,
    code_change/3]).

-export([select_connection/1,select_connection/2]).

-include("dubbo.hrl").
-define(SERVER, ?MODULE).

-define(INTERFCE_LIST_TABLE,interface_list).
-define(PROVIDER_NODE_LIST_TABLE,provider_node_list).

-record(state, {}).



%%%===================================================================
%%% API
%%%===================================================================

%%--------------------------------------------------------------------
%% @doc
%% Starts the server
%%
%% @end
%%--------------------------------------------------------------------
-spec(start_link() ->
    {ok, Pid :: pid()} | ignore | {error, Reason :: term()}).
start_link() ->
    gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).

%%%===================================================================
%%% gen_server callbacks
%%%===================================================================

%%--------------------------------------------------------------------
%% @private
%% @doc
%% Initializes the server
%%
%% @spec init(Args) -> {ok, State} |
%%                     {ok, State, Timeout} |
%%                     ignore |
%%                     {stop, Reason}
%% @end
%%--------------------------------------------------------------------
-spec(init(Args :: term()) ->
    {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} |
    {stop, Reason :: term()} | ignore).
init([]) ->
    init_ets_table(),
    {ok, #state{}}.
init_ets_table()->
    try ets:new(?INTERFCE_LIST_TABLE, [bag,public,named_table,{keypos,2}]) of
        ?INTERFCE_LIST_TABLE ->
            ok
    catch
        _Type:Reason ->
            lager:error("new ets table error ~p",[Reason]),
            error
    end,
    try ets:new(?PROVIDER_NODE_LIST_TABLE, [bag,public,named_table,{keypos,2}]) of
        ?PROVIDER_NODE_LIST_TABLE ->
            ok
    catch
        _Type1:Reason1 ->
            lager:error("new ets table error ~p",[Reason1]),
            error
    end,
    ok.
%%--------------------------------------------------------------------
%% @private
%% @doc
%% Handling call messages
%%
%% @end
%%--------------------------------------------------------------------
-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()},
    State :: #state{}) ->
    {reply, Reply :: term(), NewState :: #state{}} |
    {reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} |
    {noreply, NewState :: #state{}} |
    {noreply, NewState :: #state{}, timeout() | hibernate} |
    {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
    {stop, Reason :: term(), NewState :: #state{}}).

handle_call({add_consumer,Interface,ProviderNodeList}, _From, State) ->

    OldProviderList = get_interface_provider_node(Interface),
    NewProviderList = add_consumer(ProviderNodeList,[]),
    DeleteProverList = OldProviderList -- NewProviderList,
    clean_invalid_provider(DeleteProverList),
    {reply, ok, State};
handle_call(_Request, _From, State) ->
    {reply, ok, State}.

%%--------------------------------------------------------------------
%% @private
%% @doc
%% Handling cast messages
%%
%% @end
%%--------------------------------------------------------------------
-spec(handle_cast(Request :: term(), State :: #state{}) ->
    {noreply, NewState :: #state{}} |
    {noreply, NewState :: #state{}, timeout() | hibernate} |
    {stop, Reason :: term(), NewState :: #state{}}).
handle_cast(_Request, State) ->
    {noreply, State}.

%%--------------------------------------------------------------------
%% @private
%% @doc
%% Handling all non call/cast messages
%%
%% @spec handle_info(Info, State) -> {noreply, State} |
%%                                   {noreply, State, Timeout} |
%%                                   {stop, Reason, State}
%% @end
%%--------------------------------------------------------------------
-spec(handle_info(Info :: timeout() | term(), State :: #state{}) ->
    {noreply, NewState :: #state{}} |
    {noreply, NewState :: #state{}, timeout() | hibernate} |
    {stop, Reason :: term(), NewState :: #state{}}).
handle_info(_Info, State) ->
    {noreply, State}.

%%--------------------------------------------------------------------
%% @private
%% @doc
%% This function is called by a gen_se  rver when it is about to
%% terminate. It should be the opposite of Module:init/1 and do any
%% necessary cleaning up. When it returns, the gen_server terminates
%% with Reason. The return value is ignored.
%%
%% @spec terminate(Reason, State) -> void()
%% @end
%%--------------------------------------------------------------------
-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()),
    State :: #state{}) -> term()).
terminate(_Reason, _State) ->
    ok.

%%--------------------------------------------------------------------
%% @private
%% @doc
%% Convert process state when code is changed
%%
%% @spec code_change(OldVsn, State, Extra) -> {ok, NewState}
%% @end
%%--------------------------------------------------------------------
-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{},
    Extra :: term()) ->
    {ok, NewState :: #state{}} | {error, Reason :: term()}).
code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

start_consumer(Interface,ProviderNodeInfo)->
    gen_server:call(?SERVER,{add_consumer,Interface,ProviderNodeInfo}).


%%%===================================================================
%%% Internal functions
%%%===================================================================
add_consumer([],RegisterList)->
    RegisterList;
add_consumer([ProviderNodeInfo|ProviderList],RegisterList)->
    case dubbo_node_config_util:parse_provider_info(ProviderNodeInfo) of
        {ok,ProviderConfig} ->
            HostFlag=get_host_flag(ProviderConfig),
            case ets:lookup(?PROVIDER_NODE_LIST_TABLE,HostFlag) of
                []->
                    ConnectionList = start_provider_process(HostFlag,30,ProviderConfig),
                    ok = update_connection_info(ProviderConfig#provider_config.interface,HostFlag,ConnectionList,true),
                    ok;
                List ->
                    List2 = lists:map(fun(#provider_node_list{connection_info = ConnectionItem })->
                        ConnectionItem
                        end,List),
                    ok = update_connection_info(ProviderConfig#provider_config.interface,HostFlag,List2,false),
                    ok
            end,
            add_consumer(ProviderList,[HostFlag]++RegisterList);
        {error,R1} ->
            lager:error("parse provider info error reason ~p",[R1]),
            add_consumer(ProviderList,RegisterList)
    end.

start_provider_process(HostFlag,Weight,ProviderConfig) ->
    ExecutesList=lists:seq(1,ProviderConfig#provider_config.executes),
    ConnectionList= lists:map(fun(Item) ->
        ConnectionFlag= << HostFlag/binary,(integer_to_binary(Item))/binary >>,
        ConnectionFlagTerm= binary_to_atom(ConnectionFlag,utf8),
        AChild = {ConnectionFlagTerm,{dubbo_netty_client, start_link, [ConnectionFlagTerm,HostFlag,ProviderConfig,Item]}, permanent, 2000, worker, [dubbo_netty_client]},
        {ok,Pid} = dubbo_consumer_pool_sup:add_children(AChild),
        lager:info("start provider ~p pid info ~p~n",[HostFlag,Pid]),
        #connection_info{connection_id = ConnectionFlagTerm,pid = Pid,weight = Weight,host_flag = HostFlag}
        end,ExecutesList),
    ConnectionList.
get_host_flag(ProviderConfig)->
    HostFlag= << (list_to_binary(ProviderConfig#provider_config.host))/binary,<<"_">>/binary,(integer_to_binary(ProviderConfig#provider_config.port))/binary>>,
    HostFlag.

update_connection_info(Interface,HostFlag,ConnectionList,IsUpdateProvideNode)->
    lists:map(fun(Item) ->
        I1 = ets:insert(?INTERFCE_LIST_TABLE,#interface_list{interface = Interface,connection_info = Item}),
        lager:debug("save INTERFCE_LIST_TABLE ~p info:~p",[Interface,I1]),
        case IsUpdateProvideNode of
            true->
                I2 = ets:insert(?PROVIDER_NODE_LIST_TABLE,#provider_node_list{host_flag = HostFlag,connection_info = Item }),
                lager:debug("save PROVIDER_NODE_LIST_TABLE ~p info:~p",[HostFlag,I2]);
            false->
                ok
        end,
        ok
              end,ConnectionList),
    ok.

get_interface_provider_node(Interface)->
    case ets:lookup(?INTERFCE_LIST_TABLE,Interface) of
        []->
            [];
        List->
            ListRet = [Item#interface_list.connection_info#connection_info.host_flag || Item <- List],
            lists_util:del_duplicate(ListRet)
    end.

select_connection(Interface)->
    RandNum = rand:uniform(2048),
    select_connection(Interface,RandNum).
select_connection(Interface,RandNum)->
    case ets:lookup(?INTERFCE_LIST_TABLE,Interface) of
        []->
            {error,none};
        List->
            Len = length(List),
            RemNum = (RandNum rem Len)+1,
%%            RandNum2 = if
%%                           RandNum==Len -> RandNum-1;
%%                           true->RandNum
%%                       end,
            InterfaceListItem = lists:nth(RemNum,List),
            {ok,InterfaceListItem#interface_list.connection_info}
    end.

clean_invalid_provider([])->
    ok;
clean_invalid_provider([HostFlag | DeleteProverList])->
    case ets:lookup(?PROVIDER_NODE_LIST_TABLE,HostFlag) of
        []->
            ok;
        ProviderNodeList->
            io:format("ConnectionList ~p~n",[ProviderNodeList]),
            ProviderNodeList1 = lists_util:del_duplicate(ProviderNodeList),
            clean_connection_info(ProviderNodeList1)
    end,
    clean_invalid_provider(DeleteProverList).

clean_connection_info(ProviderNodeList)->
    lists:map(fun(Item)->
        Pid=Item#provider_node_list.connection_info#connection_info.pid,
        ConnectionId=Item#provider_node_list.connection_info#connection_info.connection_id,
        Pattern=#interface_list{pid=Pid,_='_'},
        ets:delete_object(?INTERFCE_LIST_TABLE,Pattern),
        dubbo_consumer_pool_sup:stop_children(ConnectionId)
              end,ProviderNodeList),
    ok.