add RpcContext feature
diff --git a/samples/dubbo-sample-service/src/main/java/org/apache/dubbo/erlang/sample/service/impl/UserOperatorImpl.java b/samples/dubbo-sample-service/src/main/java/org/apache/dubbo/erlang/sample/service/impl/UserOperatorImpl.java
index c62913d..0670887 100644
--- a/samples/dubbo-sample-service/src/main/java/org/apache/dubbo/erlang/sample/service/impl/UserOperatorImpl.java
+++ b/samples/dubbo-sample-service/src/main/java/org/apache/dubbo/erlang/sample/service/impl/UserOperatorImpl.java
@@ -21,6 +21,7 @@
import org.apache.dubbo.erlang.sample.service.bean.UserInfoRequest;
import org.apache.dubbo.erlang.sample.service.bean.UserRes;
import org.apache.dubbo.erlang.sample.service.facade.UserOperator;
+import org.apache.dubbo.rpc.RpcContext;
public class UserOperatorImpl implements UserOperator {
@@ -46,6 +47,7 @@
info.setUserAge(99);
info.setUserId("id123");
info.setUserName("中文姓名");
+ RpcContext.getContext().setAttachment("attachment_test_key","attachment_test_value");
return info;
}
diff --git a/src/dubbo_invoker.erl b/src/dubbo_invoker.erl
index e9bbc4d..05aea10 100644
--- a/src/dubbo_invoker.erl
+++ b/src/dubbo_invoker.erl
@@ -37,34 +37,14 @@
{error, Reason :: timeout|no_provider|any()}.
invoke_request(Interface, Request, RequestOption) ->
invoke_request(Interface, Request, RequestOption, self()).
-%% invoke_request(Interface, Request, maps:get(ctx, RequestOption, []), RequestOption, self()).
-spec invoke_request(Interface :: binary(), Request :: #dubbo_request{}, RpcContext :: list(), RequestState :: map(), CallBackPid :: pid()) ->
{ok, reference()}|
{ok, reference(), Data :: any(), RpcContent :: list()}|
{error, Reason :: timeout|no_provider|request_full|any()}.
-invoke_request(Interface, Request, RpcContext, RequestState, CallBackPid) ->
- case dubbo_provider_consumer_reg_table:select_connection(Interface, Request#dubbo_request.mid) of
- {ok, #connection_info{pid = Pid, host_flag = HostFlag}} ->
- case dubbo_traffic_control:check_goon(HostFlag, 199) of
- ok ->
- Request2 = merge_attachments(Request, RpcContext),
- {ok, RequestData} = dubbo_codec:encode_request(Request2),
- Ref = get_ref(RequestState),
- gen_server:cast(Pid, {send_request, Ref, Request2, RequestData, CallBackPid, RequestState}),
- case is_sync(RequestState) of
- true ->
- sync_receive(Ref, get_timeout(RequestState));
- false -> {ok, Ref}
- end;
- full ->
- {error, request_full}
- end;
- {error, none} ->
- logger:error("[INVOKE] ~p error Reason no_provider", [Interface]),
- {error, no_provider}
- end.
+invoke_request(Interface, Request, _RpcContext, _RequestState, CallBackPid) ->
+ invoke_request(Interface,Request,#{},CallBackPid).
invoke_request(Interface, Request, RequestOption, CallBackPid) ->
case dubbo_provider_consumer_reg_table:get_interface_info(Interface) of
@@ -73,11 +53,14 @@
#interface_info{protocol = Protocol, loadbalance = LoadBalance} ->
ReferenceConfig = #reference_config{sync = is_sync(RequestOption)},
Ref = get_ref(RequestOption),
+ RpcContext = get_ctx(RequestOption),
+ Attachments = merge_attachments(Request,RpcContext),
Invocation = Request#dubbo_request.data#dubbo_rpc_invocation{
loadbalance = LoadBalance,
call_ref = Ref,
reference_ops = ReferenceConfig,
- source_pid = CallBackPid
+ source_pid = CallBackPid,
+ attachments = Attachments
},
Result = dubbo_extension:invoke(filter, invoke, [Invocation], {ok, Ref}, [Protocol]),
Result
@@ -92,32 +75,12 @@
maps:is_key(sync, Option).
get_ref(Option) ->
maps:get(ref, Option, make_ref()).
+get_ctx(Option)->
+ maps:get(ctx, Option, []).
-get_timeout(Option) ->
- maps:get(timeout, Option, ?REQUEST_TIME_OUT).
-
-
-sync_receive(Ref, TimeOut) ->
- receive
- {'$gen_cast', {response_process, Ref, RpcContent, Response}} ->
- {ok, Ref, Response, RpcContent}
- after
- TimeOut ->
- {error, timeout}
- end.
-merge_attachments(#dubbo_request{data = null} = Request, _Option) ->
- Request;
-merge_attachments(Request, Option) ->
- Attachements = Request#dubbo_request.data#dubbo_rpc_invocation.attachments,
- case lists:keyfind(attachments, 1, Option) of
- false -> OptionAttachments = [];
- {attachments, OptionAttachments} ->
- OptionAttachments
- end,
+merge_attachments(Request, OptionAttachments) ->
+ Attachments = Request#dubbo_request.data#dubbo_rpc_invocation.attachments,
List = [
- {<<"version">>, <<"0.0.0">>},
- {<<"timeout">>, <<"5000">>}
+ {<<"version">>, <<"0.0.0">>}
],
- Attachements2 = lists:merge3(Attachements, OptionAttachments, List),
- Data2 = Request#dubbo_request.data#dubbo_rpc_invocation{attachments = Attachements2},
- Request#dubbo_request{data = Data2}.
+ lists:merge3(Attachments, OptionAttachments, List).
diff --git a/src/dubbo_protocol_dubbo.erl b/src/dubbo_protocol_dubbo.erl
index 21470a5..f559e35 100644
--- a/src/dubbo_protocol_dubbo.erl
+++ b/src/dubbo_protocol_dubbo.erl
@@ -84,8 +84,6 @@
invoke(#dubbo_rpc_invocation{source_pid = CallBackPid, transport_pid = TransportPid, call_ref = Ref} = Invocation, Acc) ->
-
-%% Request2 = merge_attachments(Request, RpcContext), %% @todo need add rpc context to attachment
Request = dubbo_adapter:reference(Invocation),
{ok, RequestData} = dubbo_codec:encode_request(Request),
gen_server:cast(TransportPid, {send_request, Ref, Request, RequestData, CallBackPid, Invocation}),
@@ -128,7 +126,6 @@
logger:info("got one response mid ~p, is_event ~p state ~p", [Res#dubbo_response.mid, Res#dubbo_response.is_event, Res#dubbo_response.state]),
case Res#dubbo_response.is_event of
false ->
- %% @todo rpccontent need merge response with request
ResponseData = dubbo_type_transfer:response_to_native(Res),
dubbo_invoker:invoke_response(Invocation, ResponseData);
_ ->