| %%%------------------------------------------------------------------- |
| %%% @author dlive |
| %%% @copyright (C) 2018, <COMPANY> |
| %%% @doc |
| %%% |
| %%% @end |
| %%% Created : 20. Mar 2018 7:19 PM |
| %%%------------------------------------------------------------------- |
| -module(dubbo_provider_protocol). |
| -author("dlive"). |
| |
| -behaviour(gen_server). |
| -behaviour(ranch_protocol). |
| -include("dubboerl.hrl"). |
| -include("dubbo.hrl"). |
| |
| |
| %% API |
| -export([start_link/4,register_impl_provider/3, select_impl_provider/1]). |
| |
| %% gen_server callbacks |
| -export([init/1, |
| handle_call/3, |
| handle_cast/2, |
| handle_info/2, |
| terminate/2, |
| code_change/3]). |
| |
| -define(SERVER, ?MODULE). |
| |
| -define(TIMEOUT, 5000). |
| |
| -record(heartbeat,{last_write=0,last_read=0,timeout=50000,max_timeout=9000}). |
| |
| -record(state, {transport,provider_config,socket =undefined, |
| heartbeat=#heartbeat{}, |
| recv_buffer= <<>> %%从客户端接收的数据 |
| }). |
| |
| %%%=================================================================== |
| %%% API |
| %%%=================================================================== |
| |
| |
| start_link(Ref, Socket, Transport, Opts) -> |
| {ok, proc_lib:spawn_link(?MODULE, init, [{Ref, Socket, Transport, Opts}])}. |
| |
| %% gen_server. |
| |
| %% This function is never called. We only define it so that |
| %% we can use the -behaviour(gen_server) attribute. |
| %init([]) -> {ok, undefined}. |
| |
| init({Ref, Socket, Transport, _Opts = []}) -> |
| lager:info("provider connected"), |
| ok = ranch:accept_ack(Ref), |
| ok = Transport:setopts(Socket, [{active, once}]), |
| gen_server:enter_loop(?MODULE, [], |
| #state{socket=Socket, transport=Transport}, |
| ?TIMEOUT). |
| |
| handle_info({tcp,_Port,Data}, #state{recv_buffer = RecvBuffer,socket=Socket, transport=Transport} = State) -> |
| Transport:setopts(Socket, [{active, once}]), |
| NowBuffer = << RecvBuffer/binary,Data/binary >>, |
| |
| {ok,NextBuffer,NewState} = case check_recv_data(NowBuffer,State) of |
| {next_buffer,<<>>,State2} -> |
| {ok,<<>>,State2}; |
| {next_buffer,NextBuffer2,State3}-> |
| lager:debug("[INFO] recv one data state wait next_buffer"), |
| {ok,NextBuffer2,State3} |
| end, |
| %% HeartbeatInfo =update_heartbeat(write,NewState#state.heartbeat), |
| {noreply, NewState#state{recv_buffer = NextBuffer}}; |
| handle_info({tcp_closed,Port},State)-> |
| %% NewState=reconnect(State), |
| {noreply, State}; |
| %%handle_info({timeout, _TimerRef, {reconnect}},State)-> |
| %% NewState=reconnect(State), |
| %% {noreply, NewState}; |
| |
| handle_info({tcp_closed, _Socket}, State) -> |
| {stop, normal, State}; |
| handle_info({tcp_error, _, Reason}, State) -> |
| {stop, Reason, State}; |
| handle_info(timeout, State) -> |
| {stop, normal, State}; |
| handle_info(_Info, State) -> |
| {stop, normal, State}. |
| |
| handle_call(_Request, _From, State) -> |
| {reply, ok, State}. |
| |
| handle_cast({send_response,Data}, #state{socket = Socket} = State) -> |
| case gen_tcp:send(Socket,Data) of |
| ok -> |
| ok; |
| Other -> |
| lager:warning("response error ~p",[Other]) |
| end, |
| {noreply, State}; |
| handle_cast(_Msg, State) -> |
| {noreply, State}. |
| |
| terminate(_Reason, _State) -> |
| ok. |
| |
| code_change(_OldVsn, State, _Extra) -> |
| {ok, State}. |
| |
| |
| register_impl_provider(Interface,ImplModuleName,ModuleName)-> |
| ets:insert(?PROVIDER_IMPL_TABLE,{Interface,ImplModuleName,ModuleName}), |
| ok. |
| |
| -spec select_impl_provider(Interface::binary()) ->{ok,binary()} | {error,term()}. |
| select_impl_provider(Interface) -> |
| case ets:lookup(?PROVIDER_IMPL_TABLE,Interface) of |
| []-> |
| {error,no_provider}; |
| [{Interface,ImplModuleName,ModuleName}] -> |
| {ok,ImplModuleName} |
| end. |
| |
| |
| |
| %%%================================================================= |
| %%% 接收数据处理 |
| %%%================================================================= |
| -spec check_recv_data(Data::binary(),State::#state{})->{ready,ReadyData::binary()} | {ready,ReadyData::binary(),NextBuffer::binary()}. |
| check_recv_data(<<?DUBBO_MEGIC_HIGH,?DUBBO_MEGIC_LOW,Rest/binary>> = Data,State) when byte_size(Rest)<14 -> |
| {next_buffer,Data,State}; |
| check_recv_data(<<?DUBBO_MEGIC_HIGH,?DUBBO_MEGIC_LOW,_OtherFlag:80,DataLen:32,Rest/binary>> = Data,State) -> |
| RestSize = byte_size(Rest), |
| if |
| DataLen==RestSize -> |
| {ok,State2} = process_data(Data,State), |
| {next_buffer,<<>>,State2}; |
| DataLen>RestSize -> |
| lager:warning("need wait next buffer data ~p",[Data]), |
| {next_buffer,Data,State}; |
| DataLen<RestSize -> |
| <<ReadyData:DataLen/binary,NextBuffer/binary>> = Rest, |
| OneData = <<?DUBBO_MEGIC_HIGH:8,?DUBBO_MEGIC_LOW:8,_OtherFlag:80,DataLen:32,ReadyData/binary>>, |
| {ok,State3} = process_data(OneData,State), |
| lager:warning("recevi more data ~w ",[NextBuffer]), |
| check_recv_data(NextBuffer,State3) |
| end; |
| check_recv_data(<<Error/integer,Data/binary>>,State)-> |
| lager:error("recv bad header data,Begin Byte:~p",[Error]), |
| check_recv_data(Data,State); |
| check_recv_data(<<>>,State)-> |
| {next_buffer,<<>>,State}. |
| |
| |
| process_data(Data,State)-> |
| <<Header:16/binary,RestData/binary>> = Data, |
| case de_codec:decode_header(Header) of |
| {ok,request,RequestInfo}-> |
| {ok,Req} = de_codec:decode_request(RequestInfo,RestData), |
| lager:info("get one request mid ~p, is_event ~p",[Req#dubbo_request.mid,Req#dubbo_request.is_event]), |
| {ok,State2} = process_request(Req#dubbo_request.is_event,Req,State), |
| {ok,State2}; |
| {ok,response,ResponseInfo}-> |
| {ok,Res} = de_codec:decode_response(ResponseInfo,RestData), |
| lager:info("get one response mid ~p, is_event ~p state ~p",[Res#dubbo_response.mid,Res#dubbo_response.is_event,Res#dubbo_response.state]), |
| {ok,State3} =process_response(Res#dubbo_response.is_event,Res,State), |
| {ok,State3}; |
| {error,Type,RelData}-> |
| lager:error("process_data error type ~p RelData ~p",[Type,RelData]), |
| {ok,State} |
| end. |
| |
| |
| %% @doc process event |
| -spec process_response(IsEvent::boolean(),#dubbo_response{},#state{})->ok. |
| process_response(true,Response,State)-> |
| |
| {ok,State}; |
| |
| process_response(false,Response,State)-> |
| %% case get_earse_request_info(Response#dubbo_response.mid) of |
| %% undefined-> |
| %% lager:error("dubbo response can't find request data,response ~p",[Response]); |
| %% {SourcePid,Ref,Request} -> |
| %% lager:debug("will cast mid ~p to source process SourcePid ~p",[Response#dubbo_response.mid,SourcePid]), |
| %% RpcContent=[], |
| %% ResponseData = de_type_transfer:response_to_native(Response), |
| %% gen_server:cast(SourcePid,{msg_back,Ref,ResponseData,RpcContent}) |
| %% end, |
| {ok,State}. |
| |
| process_request(true,Request,State)-> |
| %% {ok,NewState} = send_heartbeat_msg(Request#dubbo_request.mid,State), |
| lager:info("process request event ~p",[Request]), |
| {ok,State}; |
| process_request(false,Request,State)-> |
| lager:info("process request ~p",[Request]), |
| dubbo_provider_worker:process_request(Request,self()), |
| {ok,State}. |