%% blob: 05aea10d7081be9a3f0b126ef9b24e8f476e8f0c [file] [log] [blame]
%%------------------------------------------------------------------------------
%% Licensed to the Apache Software Foundation (ASF) under one or more
%% contributor license agreements. See the NOTICE file distributed with
%% this work for additional information regarding copyright ownership.
%% The ASF licenses this file to You under the Apache License, Version 2.0
%% (the "License"); you may not use this file except in compliance with
%% the License. You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%------------------------------------------------------------------------------
-module(dubbo_invoker).
-include("dubbo.hrl").
%% API
-export([]).
%% API
-export([invoke_request/2, invoke_request/3, invoke_request/5, invoke_response/2]).
%% @doc Issue `Request' against `Interface' on behalf of the calling process.
%% Delegates to invoke_request/5 with an empty RPC context list, an empty
%% request-state map and `self()' as the callback pid.
-spec invoke_request(Interface :: binary(), Request :: #dubbo_request{}) ->
    {ok, reference()} |
    {ok, reference(), Data :: any(), RpcContent :: list()} |
    {error, Reason :: timeout | no_provider | any()}.
invoke_request(Interface, Request) ->
    EmptyRpcContext = [],
    EmptyRequestState = #{},
    CallerPid = self(),
    invoke_request(Interface, Request, EmptyRpcContext, EmptyRequestState, CallerPid).
%% @doc Issue `Request' against `Interface' with caller-supplied options.
%% The calling process (`self()') is used as the callback pid; everything
%% else is handled by invoke_request/4, which reads the `sync', `ref' and
%% `ctx' keys of `RequestOption'.
-spec invoke_request(Interface :: binary(), Request :: #dubbo_request{}, RequestOption :: map()) ->
    {ok, reference()} |
    {ok, reference(), Data :: any(), RpcContent :: list()} |
    {error, Reason :: timeout | no_provider | any()}.
invoke_request(Interface, Request, RequestOption) ->
    CallerPid = self(),
    invoke_request(Interface, Request, RequestOption, CallerPid).
%% @doc Issue `Request' against `Interface', delivering the response to
%% `CallBackPid'.
%% The `RpcContext' and `RequestState' arguments are accepted but never read:
%% the call is forwarded to invoke_request/4 with an empty option map.
%% NOTE(review): the context passed here is dropped rather than forwarded as
%% the `ctx' option - confirm whether that is intentional.
-spec invoke_request(Interface :: binary(), Request :: #dubbo_request{}, RpcContext :: list(), RequestState :: map(), CallBackPid :: pid()) ->
    {ok, reference()} |
    {ok, reference(), Data :: any(), RpcContent :: list()} |
    {error, Reason :: timeout | no_provider | request_full | any()}.
invoke_request(Interface, Request, _RpcContext, _RequestState, CallBackPid) ->
    NoOptions = #{},
    invoke_request(Interface, Request, NoOptions, CallBackPid).
%% @doc Build the invocation for `Request' and hand it to the filter chain.
%% Looks up `Interface' in dubbo_provider_consumer_reg_table; when the lookup
%% yields `undefined' the result is `{error, no_provider}'. Otherwise the
%% invocation carried in the request is updated with the interface's load
%% balancer, the call reference, the reference config (sync flag), the
%% callback pid and the merged attachments, and is passed to
%% dubbo_extension:invoke/5 together with the default result `{ok, Ref}' and
%% the interface's protocol. Whatever that call returns is returned unchanged.
invoke_request(Interface, Request, RequestOption, CallBackPid) ->
    case dubbo_provider_consumer_reg_table:get_interface_info(Interface) of
        #interface_info{protocol = Protocol, loadbalance = LoadBalance} ->
            RefConfig = #reference_config{sync = is_sync(RequestOption)},
            CallRef = get_ref(RequestOption),
            OptionContext = get_ctx(RequestOption),
            MergedAttachments = merge_attachments(Request, OptionContext),
            BaseInvocation = Request#dubbo_request.data,
            Invocation = BaseInvocation#dubbo_rpc_invocation{
                loadbalance = LoadBalance,
                call_ref = CallRef,
                reference_ops = RefConfig,
                source_pid = CallBackPid,
                attachments = MergedAttachments
            },
            dubbo_extension:invoke(filter, invoke, [Invocation], {ok, CallRef}, [Protocol]);
        undefined ->
            {error, no_provider}
    end.
%% @doc Post-process a response and deliver it to the original requester.
%% `Result' is first passed through dubbo_extension:invoke_foldr/4 for the
%% `filter' extension's `on_response' callback; the outcome is then cast to
%% the invocation's `source_pid' as
%% `{response_process, CallRef, Attachments, FilteredResult}'.
%% Always returns `ok'.
invoke_response(Invocation, Result) ->
    Filtered = dubbo_extension:invoke_foldr(filter, on_response, [Invocation], Result),
    CallerPid = Invocation#dubbo_rpc_invocation.source_pid,
    CallRef = Invocation#dubbo_rpc_invocation.call_ref,
    Attachments = Invocation#dubbo_rpc_invocation.attachments,
    gen_server:cast(CallerPid, {response_process, CallRef, Attachments, Filtered}),
    ok.
%% @doc Tell whether the request options ask for a synchronous call.
%% Returns `false' when the `sync' key is absent or explicitly set to
%% `false'; any other value under `sync' counts as synchronous.
%% (Checking only for the key's presence would treat `#{sync => false}' as a
%% synchronous request.)
-spec is_sync(Option :: map()) -> boolean().
is_sync(Option) ->
    maps:get(sync, Option, false) =/= false.
%% @doc Return the call reference stored under `ref' in the options, or a
%% freshly created reference when the key is absent.
get_ref(Option) ->
    %% The fallback reference is created up front, whether or not it is used.
    FreshRef = make_ref(),
    maps:get(ref, Option, FreshRef).
%% @doc Return the value stored under `ctx' in the options, or the empty
%% list when the key is absent.
get_ctx(Option) ->
    case maps:find(ctx, Option) of
        {ok, Ctx} -> Ctx;
        error -> []
    end.
%% @doc Combine the attachments of the request's invocation, the attachments
%% supplied through the request options, and a default
%% `{<<"version">>, <<"0.0.0">>}' entry into a single list.
%% NOTE(review): lists:merge3/3 is documented for sorted input lists and
%% nothing here sorts them - confirm callers supply sorted attachments, or
%% that the resulting order does not matter.
merge_attachments(Request, OptionAttachments) ->
    Invocation = Request#dubbo_request.data,
    RequestAttachments = Invocation#dubbo_rpc_invocation.attachments,
    DefaultAttachments = [{<<"version">>, <<"0.0.0">>}],
    lists:merge3(RequestAttachments, OptionAttachments, DefaultAttachments).