%% blob: 25120a74516cfae663bf189196a0aea2c36b7218 [file] [log] [blame]
%%------------------------------------------------------------------------------
%% Licensed to the Apache Software Foundation (ASF) under one or more
%% contributor license agreements. See the NOTICE file distributed with
%% this work for additional information regarding copyright ownership.
%% The ASF licenses this file to You under the Apache License, Version 2.0
%% (the "License"); you may not use this file except in compliance with
%% the License. You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%------------------------------------------------------------------------------
-module(dubbo_provider_worker).
-behaviour(gen_server).
%% API
-export([start_link/1, process_request/2]).
%% gen_server callbacks
-export([init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
code_change/3]).
-define(SERVER, ?MODULE).
-include("dubbo.hrl").
-include("dubboerl.hrl").
-include("dubbo_type.hrl").
%% Heartbeat bookkeeping embedded in the worker state.
%% NOTE(review): none of these fields is read in the visible code, so their
%% units and exact meaning cannot be confirmed here -- presumably timestamps
%% and timeouts in milliseconds; verify against the code that uses them.
-record(heartbeat, {last_write = 0, last_read = 0, timeout = 50000, max_timeout = 9000}).
%% gen_server state of a provider worker. init/1 starts from these defaults.
-record(state, {provider_config, socket = undefined,
heartbeat = #heartbeat{},
recv_buffer = <<>> %% data received from the client
}).
%%%===================================================================
%%% API
%%%===================================================================
%%--------------------------------------------------------------------
%% @doc
%% Starts an unregistered provider worker linked to the calling process.
%%
%% Args is handed unchanged to init/1 of this module; the gen_server is
%% started with an empty option list.
%% @end
%%--------------------------------------------------------------------
-spec start_link(term()) ->
    {ok, Pid :: pid()} | ignore | {error, Reason :: term()}.
start_link(Args) ->
    StartOptions = [],
    gen_server:start_link(?MODULE, Args, StartOptions).
%%%===================================================================
%%% gen_server callbacks
%%%===================================================================
%%--------------------------------------------------------------------
%% @private
%% @doc
%% Initializes the worker.
%%
%% The start argument is not used. The process traps exits, so exit
%% signals from linked processes are delivered as messages instead of
%% terminating the worker. The initial state is a `#state{}' record with
%% all fields at their defaults.
%% @end
%%--------------------------------------------------------------------
-spec init(Args :: term()) ->
    {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} |
    {stop, Reason :: term()} | ignore.
init(_Args) ->
    process_flag(trap_exit, true),
    {ok, #state{}}.
%%--------------------------------------------------------------------
%% @private
%% @doc
%% Handles synchronous calls.
%%
%% No call protocol is defined for this worker: every request is answered
%% with `ok' and the state is returned untouched.
%% @end
%%--------------------------------------------------------------------
-spec handle_call(Request :: term(), From :: {pid(), Tag :: term()},
                  State :: #state{}) ->
    {reply, Reply :: term(), NewState :: #state{}} |
    {reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} |
    {noreply, NewState :: #state{}} |
    {noreply, NewState :: #state{}, timeout() | hibernate} |
    {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
    {stop, Reason :: term(), NewState :: #state{}}.
handle_call(_Msg, _Caller, State) ->
    Reply = ok,
    {reply, Reply, State}.
%%--------------------------------------------------------------------
%% @private
%% @doc
%% Handles asynchronous messages.
%%
%% `{request, Request, SourcePid}' runs Request through
%% invoker_implement/1. When that yields `{ok, Content}', the encoded
%% response is cast to SourcePid as `{send_response, Content}'. Any other
%% result is logged and dropped, so a failed invocation no longer takes
%% the worker down with a badmatch. Exceptions raised while invoking the
%% provider are not caught and still crash the worker.
%%
%% Every other message is ignored. The state is never modified.
%% @end
%%--------------------------------------------------------------------
-spec handle_cast(Request :: term(), State :: #state{}) ->
    {noreply, NewState :: #state{}} |
    {noreply, NewState :: #state{}, timeout() | hibernate} |
    {stop, Reason :: term(), NewState :: #state{}}.
handle_cast({request, Request, SourcePid}, State) ->
    case invoker_implement(Request) of
        {ok, Content} ->
            gen_server:cast(SourcePid, {send_response, Content});
        Failure ->
            %% Only the failure term is logged; the request payload is left
            %% out on purpose because it may carry caller data.
            error_logger:error_msg(
                "dubbo provider worker: request could not be served: ~p~n",
                [Failure])
    end,
    {noreply, State};
handle_cast(_Request, State) ->
    {noreply, State}.
%%--------------------------------------------------------------------
%% @private
%% @doc
%% Handles every message that is neither a call nor a cast.
%%
%% All such messages are discarded and the state is kept as is. Because
%% init/1 enables trap_exit, this includes `EXIT' messages from linked
%% processes.
%% @end
%%--------------------------------------------------------------------
-spec handle_info(Info :: timeout() | term(), State :: #state{}) ->
    {noreply, NewState :: #state{}} |
    {noreply, NewState :: #state{}, timeout() | hibernate} |
    {stop, Reason :: term(), NewState :: #state{}}.
handle_info(_Message, State) ->
    {noreply, State}.
%%--------------------------------------------------------------------
%% @private
%% @doc
%% Called by gen_server when the worker is about to stop.
%%
%% The worker performs no cleanup here; the function simply returns `ok'.
%% @end
%%--------------------------------------------------------------------
-spec terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()),
                State :: #state{}) -> term().
terminate(_Why, _FinalState) ->
    ok.
%%--------------------------------------------------------------------
%% @private
%% @doc
%% Converts the process state during a code upgrade or downgrade.
%%
%% No conversion is performed: the current state is returned unchanged.
%% @end
%%--------------------------------------------------------------------
-spec code_change(OldVsn :: term() | {down, term()}, State :: #state{},
                  Extra :: term()) ->
    {ok, NewState :: #state{}} | {error, Reason :: term()}.
code_change(_FromVsn, State, _Extra) ->
    {ok, State}.
%%--------------------------------------------------------------------
%% @doc
%% Hands Request to a worker taken from the ?PROVIDER_WORKER pool.
%%
%% The worker receives `{request, Request, SourcePid}' as a cast and is
%% expected to cast the response back to SourcePid. The worker is checked
%% in again even if sending the cast raises.
%%
%% NOTE(review): the cast is asynchronous, so the worker is returned to
%% the pool as soon as the message has been sent, not when the request has
%% been processed -- confirm that this is the intended pool semantics.
%% @end
%%--------------------------------------------------------------------
-spec process_request(Request :: term(), SourcePid :: term()) -> ok.
process_request(Request, SourcePid) ->
    WorkerPid = poolboy:checkout(?PROVIDER_WORKER),
    Message = {request, Request, SourcePid},
    try
        gen_server:cast(WorkerPid, Message)
    after
        ok = poolboy:checkin(?PROVIDER_WORKER, WorkerPid)
    end.
%%%===================================================================
%%% Internal functions
%%%===================================================================
%%--------------------------------------------------------------------
%% @private
%% @doc
%% Invokes the provider implementation addressed by Request and encodes
%% its result as a response.
%%
%% The interface name, method name and parameter list are taken from the
%% `#dubbo_rpc_invocation{}' stored in the request's data field. The
%% implementing module is looked up with
%% dubbo_provider_protocol:select_impl_provider/1.
%%
%% Returns `{ok, Content}' with the encoded response, or `{error, Reason}'
%% when no implementation is registered, the method name does not match an
%% existing atom, or the implementation returned `{error}'. Exceptions
%% raised by the implementation itself are not caught.
%% @end
%%--------------------------------------------------------------------
-spec invoker_implement(#dubbo_request{}) ->
    {ok, response_content()} | {error, Reason :: term()}.
invoker_implement(Request) ->
    #dubbo_rpc_invocation{className = Interface,
                          methodName = MethodName,
                          parameters = Parameters} = Request#dubbo_request.data,
    case dubbo_provider_protocol:select_impl_provider(Interface) of
        {ok, ImplModule} ->
            invoke_method(ImplModule, MethodName, Parameters, Request);
        {error, Reason} ->
            {error, Reason}
    end.

%% Calls ImplModule:MethodName(Parameters...) and encodes whatever it returns.
invoke_method(ImplModule, MethodName, Parameters, Request) ->
    case resolve_method(ImplModule, MethodName) of
        {ok, Function} ->
            encode_result(apply(ImplModule, Function, Parameters), Request);
        {error, _Reason} = Error ->
            Error
    end.

%% Turns the method name received from the peer into an atom without ever
%% creating a new one: the name comes off the network, and atoms are never
%% garbage collected, so binary_to_atom/2 would let a peer exhaust the atom
%% table.
resolve_method(ImplModule, MethodName) ->
    %% Loading the module first makes sure the atoms of its functions are in
    %% the atom table; a load failure simply surfaces as an unknown method
    %% or as an undef error on the subsequent call.
    _ = code:ensure_loaded(ImplModule),
    try
        {ok, binary_to_existing_atom(MethodName, latin1)}
    catch
        error:badarg ->
            {error, {unknown_method, MethodName}}
    end.

%% Encodes the value returned by the provider implementation as a response
%% to Request. A bare result is first wrapped into an invocation record.
encode_result({error}, _Request) ->
    {error, provider_failed};
encode_result(#dubbo_rpc_invocation{} = ResultInvocation, Request) ->
    Response = #dubbo_response{
        serialize_type = Request#dubbo_request.serialize_type,
        mid = Request#dubbo_request.mid,
        is_event = false,
        data = ResultInvocation},
    %% Assertive match: an encoding failure is a bug and should crash.
    {ok, Content} = dubbo_codec:encode_response(Response),
    {ok, Content};
encode_result(ResultObj, Request) ->
    encode_result(#dubbo_rpc_invocation{parameters = [ResultObj]}, Request).