blob: f7c96850dc148e1c38b5fe314c1fd257eb14a503 [file] [log] [blame]
%%%-------------------------------------------------------------------
%%% @author dlive
%%% @copyright (C) 2018, <COMPANY>
%%% @doc
%%%
%%% @end
%%% Created : 06. Mar 2018 12:17 AM
%%%-------------------------------------------------------------------
-module(dubbo_invoker).
-author("dlive").
-include("dubbo.hrl").
%% API
-export([invoke_request/2,invoke_request/3,invoke_request/5]).
-spec invoke_request(Interface::binary(),Request::#dubbo_request{})->
{ok,reference()}|
{ok,reference(),Data::any(),RpcContent::list()}|
{error,Reason::timeout|no_provider|any()}.
invoke_request(Interface,Request)->
invoke_request(Interface,Request,[],#{},self()).
-spec invoke_request(Interface::binary(),Request::#dubbo_request{},RequestOption::map())->
{ok,reference()}|
{ok,reference(),Data::any(),RpcContent::list()}|
{error,Reason::timeout|no_provider|any()}.
invoke_request(Interface,Request,RequestOption)->
invoke_request(Interface,Request,maps:get(ctx,RequestOption,[]),RequestOption,self()).
-spec invoke_request(Interface::binary(),Request::#dubbo_request{},RpcContext::list(),RequestState::map(),CallBackPid::pid())->
{ok,reference()}|
{ok,reference(),Data::any(),RpcContent::list()}|
{error,Reason::timeout|no_provider|request_full|any()}.
invoke_request(Interface,Request,RpcContext,RequestState,CallBackPid)->
case dubbo_consumer_pool:select_connection(Interface,Request#dubbo_request.mid) of
{ok,#connection_info{pid=Pid,host_flag = HostFlag}} ->
case dubbo_traffic_control:check_goon(HostFlag,199) of
ok ->
Request2 = merge_attachments(Request,RpcContext),
{ok,RequestData} = dubbo_codec:encode_request(Request2),
Ref=get_ref(RequestState),
RequestState2 = request_context:update(<<"t_agent_e">>,RequestState),
gen_server:cast(Pid,{send_request,Ref,Request2,RequestData,CallBackPid,RequestState2}),
case is_sync(RequestState) of
true->
sync_receive(Ref,get_timeout(RequestState));
false-> {ok, Ref}
end;
full ->
{error,request_full}
end;
{error,none}->
logger:error("[INVOKE] ~p error Reason no_provider",[Interface]),
{error,no_provider}
end.
is_sync(Option)->
maps:is_key(sync,Option).
get_ref(Option)->
maps:get(ref,Option,make_ref()).
get_timeout(Option)->
maps:get(timeout,Option,?REQUEST_TIME_OUT).
sync_receive(Ref,TimeOut)->
receive
{'$gen_cast',{response_process,Ref,RpcContent,Response}}->
{ok,Ref,Response,RpcContent}
after
TimeOut ->
{error,timeout}
end.
merge_attachments(#dubbo_request{data = null}=Request,_Option) ->
Request;
merge_attachments(Request,Option)->
Attachements= Request#dubbo_request.data#dubbo_rpc_invocation.attachments,
case lists:keyfind(attachments,1,Option) of
false->OptionAttachments=[];
{attachments,OptionAttachments}->
OptionAttachments
end,
List=[
{<<"version">>, <<"0.0.0">>},
{<<"timeout">>, <<"5000">>}
],
Attachements2= lists:merge3(Attachements,OptionAttachments,List),
Data2=Request#dubbo_request.data#dubbo_rpc_invocation{attachments = Attachements2},
Request#dubbo_request{data = Data2}.