%%------------------------------------------------------------------------------
%% Licensed to the Apache Software Foundation (ASF) under one or more
%% contributor license agreements.  See the NOTICE file distributed with
%% this work for additional information regarding copyright ownership.
%% The ASF licenses this file to You under the Apache License, Version 2.0
%% (the "License"); you may not use this file except in compliance with
%% the License.  You may obtain a copy of the License at
%%
%%     http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%------------------------------------------------------------------------------
-module(dubbo_protocol_dubbo).
-behaviour(dubbo_protocol).

-include("dubboerl.hrl").
-include("dubbo.hrl").

%% API
-export([refer/2, invoke/2, data_receive/1]).
-export([export/2]).

%% @doc Build an invoker for Url when its scheme is dubbo.
%% The url is parsed first; for the dubbo scheme the result of do_refer/1 is
%% returned as {ok, Invoker}. For any other scheme the accumulator is handed
%% back untouched as {ok, Acc}.
%% Crashes with badmatch when the url cannot be parsed or do_refer/1 does not
%% return {ok, Invoker}.
refer(Url, Acc) ->
    {ok, ParsedUrl} = dubbo_common_fun:parse_url(Url),
    Scheme = ParsedUrl#dubbo_url.scheme,
    if
        Scheme =:= <<"dubbo">> ->
            %% The value of the match expression is the {ok, Invoker} tuple itself.
            {ok, _Invoker} = do_refer(ParsedUrl);
        true ->
            {ok, Acc}
    end.

%% @doc Turn a parsed provider url into a dubbo invoker.
%% Derives the provider config from UrlInfo, obtains connections through
%% getClients/1, records them for the provider interface in the registration
%% table and returns {ok, Invoker} carrying the host flag and this module as
%% handle. Returns {error, Reason} when the provider info cannot be parsed
%% (which is also logged) or when no connection could be obtained.
do_refer(UrlInfo) ->
    case dubbo_node_config_util:parse_provider_info(UrlInfo) of
        {error, ParseReason} ->
            logger:error("parse provider info error reason ~p", [ParseReason]),
            {error, ParseReason};
        {ok, ProviderConfig} ->
            case getClients(ProviderConfig) of
                {error, ConnectReason} ->
                    {error, ConnectReason};
                {ok, Connections} ->
                    Interface = ProviderConfig#provider_config.interface,
                    dubbo_provider_consumer_reg_table:update_node_conections(Interface, Connections),
                    HostFlag = dubbo_provider_consumer_reg_table:get_host_flag(ProviderConfig),
                    {ok, #dubbo_invoker{host_flag = HostFlag, handle = ?MODULE}}
            end
    end.

%% @doc Export a provider invoker.
%% Registers the invoker handler for its interface, makes sure the provider
%% listener is running for the port found in the invoker url, and returns
%% {ok, Invoker}. The accumulator is not used.
%% Crashes with badmatch when the listener check does not return ok.
export(Invoker, _Acc) ->
    registry_provider_impl_module(Invoker),
    ListenUrl = Invoker#invoker.url,
    ok = service_listen_check_start(ListenUrl),
    {ok, Invoker}.

%% @doc Obtain the connection list used to reach the given provider.
%% Currently a thin layer over new_transport/1: both the {ok, Connections}
%% and the {error, Reason} results are passed through unchanged.
getClients(ProviderConfig) ->
    %% @todo if connections parameter > 1, need new spec transport
    case new_transport(ProviderConfig) of
        {ok, _Connections} = Connected ->
            Connected;
        {error, _Reason} = Failed ->
            Failed
    end.


%% @doc Reuse or create the connections for the provider host and port.
%% When the registration table already knows connections for that host and
%% port they are returned as they are. Otherwise a single new connection is
%% opened through dubbo_exchanger:connect/2 and returned as a one element
%% list. A failed connect is logged as a warning and returned as
%% {error, Reason}.
new_transport(ProviderConfig) ->
    Host = ProviderConfig#provider_config.host,
    Port = ProviderConfig#provider_config.port,
    case dubbo_provider_consumer_reg_table:get_host_connections(Host, Port) of
        [] ->
            case dubbo_exchanger:connect(ProviderConfig, ?MODULE) of
                {ok, NewConnection} ->
                    {ok, [NewConnection]};
                {error, Reason} ->
                    logger:warning("start client fail ~p ~p ~p", [Reason, Host, Port]),
                    {error, Reason}
            end;
        ExistingConnections ->
            {ok, ExistingConnections}
    end.




%% @doc Send one rpc invocation over its transport process.
%% The invocation is converted to a request by dubbo_adapter:reference/1,
%% encoded, and handed to the transport process with an asynchronous cast.
%% Returns {ok, Invocation, Acc} right after the cast; this function does not
%% wait for the response itself.
%% Crashes with badmatch when the request cannot be encoded.
invoke(Invocation = #dubbo_rpc_invocation{call_ref = CallRef,
                                          transport_pid = Transport,
                                          source_pid = Caller}, Acc) ->
    %% @todo need add rpc context to attachment
    Request = dubbo_adapter:reference(Invocation),
    {ok, Encoded} = dubbo_codec:encode_request(Request),
    SendMsg = {send_request, CallRef, Request, Encoded, Caller, Invocation},
    gen_server:cast(Transport, SendMsg),
    {ok, Invocation, Acc}.



%% @doc Dispatch received protocol data.
%% The first 16 bytes of Data are decoded as the header and the remainder is
%% treated as the body. Responses go to process_response/3 and requests to
%% process_request/2; a header that fails to decode is logged and ok is
%% returned. Data shorter than 16 bytes crashes with badmatch, as does a
%% request body that cannot be decoded.
-spec(data_receive(binary()) -> ok|{do_heartbeat, Mid :: integer()}).
data_receive(Data) ->
    <<Header:16/binary, Body/binary>> = Data,
    case dubbo_codec:decode_header(Header) of
        {error, Type, RelData} ->
            logger:error("process_data error type ~p RelData ~p", [Type, RelData]),
            ok;
        {ok, request, RequestInfo} ->
            {ok, Request} = dubbo_codec:decode_request(RequestInfo, Body),
            Mid = Request#dubbo_request.mid,
            IsEvent = Request#dubbo_request.is_event,
            logger:info("get one request mid ~p, is_event ~p", [Mid, IsEvent]),
            process_request(IsEvent, Request);
        {ok, response, ResponseInfo} ->
            process_response(ResponseInfo#dubbo_response.is_event, ResponseInfo, Body)
    end.


%% @doc Handle a response received on the connection.
%% Event responses are ignored. For a normal response the pending request
%% entry stored under the response mid is fetched and removed through
%% get_earse_request_info/1. When no entry exists an error is logged.
%% Otherwise the body is decoded and, unless the decoded response turns out
%% to be an event, its native form is passed to
%% dubbo_invoker:invoke_response/2 together with the stored invocation.
%% Always returns ok; crashes with badmatch when the body cannot be decoded.
-spec process_response(IsEvent :: boolean(), #dubbo_response{}, RestData :: binary()) -> ok.
process_response(false, ResponseInfo, RestData) ->
    %% @todo traffic need move limit filter
    case get_earse_request_info(ResponseInfo#dubbo_response.mid) of
        undefined ->
            logger:error("dubbo response can't find request data,response ~p", [ResponseInfo]);
        %% Only the invocation is needed here; the other two elements of the
        %% stored entry are deliberately ignored.
        {_SourcePid, _Ref, Invocation} ->
            {ok, Res} = dubbo_codec:decode_response(ResponseInfo, RestData),
            logger:info("got one response mid ~p, is_event ~p state ~p", [Res#dubbo_response.mid, Res#dubbo_response.is_event, Res#dubbo_response.state]),
            case Res#dubbo_response.is_event of
                false ->
                    %% @todo rpccontent need merge response with request
                    ResponseData = dubbo_type_transfer:response_to_native(Res),
                    dubbo_invoker:invoke_response(Invocation, ResponseData);
                _ ->
                    ok
            end
    end,
    ok;
process_response(true, _ResponseInfo, _RestData) ->
    ok.

%% @doc Handle an incoming request.
%% An event request whose data is the binary R asks the registration table to
%% flag the connection of the calling process as read-only and returns ok.
%% Any other event request is answered with {do_heartbeat, Mid} so the caller
%% can reply to it. Non-event requests are currently ignored and return ok.
process_request(true, #dubbo_request{data = <<"R">>}) ->
    {ok, _} = dubbo_provider_consumer_reg_table:update_connection_readonly(self(), true),
    ok;
process_request(true, Request) ->
    {do_heartbeat, Request#dubbo_request.mid};
process_request(false, _Request) ->
    ok.

%% @doc Take the pending request entry stored under Mid.
%% The entry lives in the process dictionary of the calling process; it is
%% removed and returned, or undefined is returned when nothing is stored
%% under that key.
get_earse_request_info(Mid) ->
    erlang:erase(Mid).


%% @doc Register the invoker handler as the implementation of its interface.
%% The interface name is read from the interface parameter of the invoker
%% url. Returns ok; crashes when the url cannot be parsed, when the parameter
%% is missing, or when the registration does not return ok.
registry_provider_impl_module(Invoker) ->
    case dubbo_common_fun:parse_url(Invoker#invoker.url) of
        {ok, ParsedUrl} ->
            Params = ParsedUrl#dubbo_url.parameters,
            Interface = maps:get(<<"interface">>, Params),
            Handler = Invoker#invoker.handler,
            ok = dubbo_provider_protocol:register_impl_provider(Interface, Handler)
    end.


%% @doc Make sure the provider listener is running.
%% Parses Url to find the port. When the dubbo_provider listener is not yet
%% running it is started on that port with ranch; when it is already running
%% nothing is done, so the port of Url is not applied in that case.
%% Returns ok, or {error, Reason} when the url cannot be parsed. Crashes with
%% badmatch when ranch fails to start the listener.
service_listen_check_start(Url) ->
    case dubbo_common_fun:parse_url(Url) of
        {error, Reason} ->
            {error, Reason};
        {ok, #dubbo_url{port = ListenPort}} ->
            case server_is_start() of
                false ->
                    TransportOpts = [{port, ListenPort}],
                    {ok, _} = ranch:start_listener(dubbo_provider, ranch_tcp, TransportOpts, dubbo_provider_protocol, []),
                    ok;
                true ->
                    ok
            end
    end.

%% @doc Tell whether the dubbo_provider listener is already known to ranch.
%% Probes by asking ranch for the listener protocol options: any successful
%% answer means true, any exception raised by the lookup means false.
server_is_start() ->
    try
        _ = ranch:get_protocol_options(dubbo_provider),
        true
    catch
        %% Deliberately broad: every failure of the probe is read as "not started".
        _:_ ->
            false
    end.