blob: 805119c311aa9c91892db174187198ee27919c95 [file] [log] [blame]
%%------------------------------------------------------------------------------
%% Licensed to the Apache Software Foundation (ASF) under one or more
%% contributor license agreements. See the NOTICE file distributed with
%% this work for additional information regarding copyright ownership.
%% The ASF licenses this file to You under the Apache License, Version 2.0
%% (the "License"); you may not use this file except in compliance with
%% the License. You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%------------------------------------------------------------------------------
%% @doc Provider-side connection handler for the Dubbo protocol.
%% A process of this module is spawned through start_link/4 for a socket and
%% then runs as a gen_server (entered via gen_server:enter_loop/4 in init/1).
%% The module also exposes a small ETS-backed lookup that maps an interface
%% to its registered implementation.
-module(dubbo_provider_protocol).
-behaviour(gen_server).
-behaviour(ranch_protocol).
-include("dubboerl.hrl").
-include("dubbo.hrl").
%% API
-export([start_link/4, register_impl_provider/2, select_impl_provider/1]).
%% gen_server callbacks
-export([init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
code_change/3]).
-define(SERVER, ?MODULE).
%% gen_server timeout (milliseconds) passed to enter_loop/4 and re-armed after
%% inbound socket data; when it fires, handle_info(timeout, ...) stops the
%% connection.
-define(IDLE_TIMEOUT, 70000).
%% Heartbeat bookkeeping. NOTE(review): none of these fields is read or updated
%% by the code visible in this module, and max_timeout is smaller than
%% timeout -- confirm the intended units and meaning.
-record(heartbeat, {last_write = 0, last_read = 0, timeout = 50000, max_timeout = 9000}).
%% Per-connection state.
%%   transport       - transport module handed to start_link/4
%%   provider_config - not set anywhere in the visible code (TODO confirm usage)
%%   socket          - the connection socket
%%   heartbeat       - #heartbeat{} record, see the note above
%%   recv_buffer     - bytes received so far that do not yet form a full frame
-record(state, {transport, provider_config, socket = undefined,
heartbeat = #heartbeat{},
recv_buffer = <<>> %% data received from the client
}).
%%%===================================================================
%%% API
%%%===================================================================
%% @doc Spawn a linked connection process for one socket.
%% The new process begins in init/1 with `{Ref, Socket, Transport, Opts}' as
%% its single argument; this call returns `{ok, Pid}' right away without
%% waiting for the initialisation to finish.
start_link(Ref, Socket, Transport, Opts) ->
    InitArg = {Ref, Socket, Transport, Opts},
    Pid = proc_lib:spawn_link(?MODULE, init, [InitArg]),
    {ok, Pid}.
%% gen_server.
%% init/1 is not driven by gen_server:start_link/3,4 in this module. It is the
%% entry function of the process spawned by start_link/4 and turns that
%% process into a gen_server by calling gen_server:enter_loop/4.
%%
%% Steps: log the peer address, acknowledge the connection with ranch, switch
%% the socket to active mode with no packet framing, then enter the server
%% loop with the idle timeout armed. Any failing step crashes the process
%% through the asserting matches.
init({Ref, Socket, Transport, _Opts}) ->
    {ok, {PeerIp, PeerPort}} = inet:peername(Socket),
    logger:info("consumer ~p:~p connect the server", [PeerIp, PeerPort]),
    ok = ranch:accept_ack(Ref),
    SocketOpts = [{active, true}, {packet, 0}],
    ok = Transport:setopts(Socket, SocketOpts),
    InitialState = #state{socket = Socket, transport = Transport},
    gen_server:enter_loop(?MODULE, [], InitialState, ?IDLE_TIMEOUT).
%% @doc Handle raw messages delivered to the connection process.
%%
%% `{tcp, _, Data}'        - append Data to the receive buffer, process every
%%                           complete frame it now contains and keep the
%%                           unconsumed remainder; the idle timer is re-armed.
%% `{tcp_closed, _}'       - the peer closed the socket: stop with `normal'.
%% `{tcp_error, _, Reason}'- stop with the socket error as the reason.
%% `timeout'               - nothing happened for ?IDLE_TIMEOUT ms: stop with
%%                           `{shutdown, idle_timeout}'.
%% anything else           - logged and ignored. A stray message must not tear
%%                           down a healthy connection without a trace.
handle_info({tcp, _Port, Data}, #state{recv_buffer = RecvBuffer} = State) ->
    NowBuffer = <<RecvBuffer/binary, Data/binary>>,
    %% check_recv_data/2 only ever answers with the next_buffer tuple; assert it.
    {next_buffer, NextBuffer, NewState} = check_recv_data(NowBuffer, State),
    case NextBuffer of
        <<>> ->
            ok;
        _ ->
            logger:debug("[INFO] recv one data state wait next_buffer")
    end,
    {noreply, NewState#state{recv_buffer = NextBuffer}, ?IDLE_TIMEOUT};
handle_info({tcp_closed, _Socket}, State) ->
    logger:warning("provider socket is closed"),
    {stop, normal, State};
handle_info({tcp_error, _, Reason}, State) ->
    {stop, Reason, State};
handle_info(timeout, State) ->
    logger:info("dubbo provider connection idle timeout"),
    {stop, {shutdown, idle_timeout}, State};
handle_info(Info, State) ->
    logger:warning("provider connection ignore unexpected message ~p", [Info]),
    {noreply, State, ?IDLE_TIMEOUT}.
%% @doc No synchronous requests are defined for this process; every call is
%% answered with `ok'.
%% The idle timeout is passed back explicitly: a gen_server return value
%% without a timeout means `infinity', which would silently disarm the idle
%% timer until the next socket data arrives.
handle_call(_Request, _From, State) ->
    {reply, ok, State, ?IDLE_TIMEOUT}.
%% @doc Handle asynchronous requests.
%%
%% `{send_response, Data}' writes Data to the connection socket. A failed send
%% is logged and otherwise ignored (best effort); the connection keeps running.
%% Any other cast is ignored.
%%
%% Both clauses hand ?IDLE_TIMEOUT back to gen_server. Returning without a
%% timeout means `infinity', so after sending a response the connection would
%% never idle out unless the peer sent more data.
handle_cast({send_response, Data}, #state{socket = Socket} = State) ->
    case gen_tcp:send(Socket, Data) of
        ok ->
            ok;
        Other ->
            logger:warning("response error ~p", [Other])
    end,
    {noreply, State, ?IDLE_TIMEOUT};
handle_cast(_Msg, State) ->
    {noreply, State, ?IDLE_TIMEOUT}.
%% @doc gen_server shutdown hook: records why the connection process is
%% ending. Always returns `ok'; the state is not inspected.
terminate(StopReason, _FinalState) ->
    logger:info("proviver connection terminal reason ~p", [StopReason]),
    ok.
%% @doc Hot code upgrade hook. The state needs no conversion, so it is handed
%% back unchanged.
code_change(_FromVersion, CurrentState, _UpgradeExtra) ->
    {ok, CurrentState}.
%% @doc Record ImplModuleName as the implementation of Interface by inserting
%% an `{Interface, ImplModuleName}' entry into ?PROVIDER_IMPL_TABLE.
%% Always returns `ok'.
register_impl_provider(Interface, ImplModuleName) ->
    Entry = {Interface, ImplModuleName},
    ets:insert(?PROVIDER_IMPL_TABLE, Entry),
    ok.
%% @doc Look up the implementation registered for Interface in
%% ?PROVIDER_IMPL_TABLE.
%% Returns `{ok, Impl}' when exactly one entry is stored under the key and
%% `{error, no_provider}' when there is none.
-spec select_impl_provider(Interface :: binary()) -> {ok, binary()} | {error, term()}.
select_impl_provider(Interface) ->
    Entries = ets:lookup(?PROVIDER_IMPL_TABLE, Interface),
    case Entries of
        [{Interface, Impl}] ->
            {ok, Impl};
        [] ->
            {error, no_provider}
    end.
%%%=================================================================
%%% Receive-side data handling
%%%=================================================================
%% @doc Consume every complete frame at the front of the receive buffer.
%%
%% A frame is a 16-byte header (two magic bytes, 80 bits of flags/ids that are
%% not inspected here, and a 32-bit body length) followed by that many body
%% bytes. Each complete frame is handed to process_data/2; the function then
%% continues with whatever follows it.
%%
%% Returns `{next_buffer, Remainder, NewState}' where Remainder is the
%% still-incomplete tail that must be kept until more bytes arrive (`<<>>'
%% when everything was consumed).
%%
%% Bytes that cannot start a frame are logged and skipped one at a time until
%% a magic prefix is found.
-spec check_recv_data(Data :: binary(), State :: #state{}) ->
    {next_buffer, NextBuffer :: binary(), NewState :: #state{}}.
check_recv_data(<<>>, State) ->
    {next_buffer, <<>>, State};
check_recv_data(<<?DUBBO_MEGIC_HIGH>> = Data, State) ->
    %% Only the first magic byte has arrived. Keep it: the matching low byte
    %% may be at the start of the next TCP segment. Treating it as a bad
    %% header would throw away the beginning of a valid frame.
    {next_buffer, Data, State};
check_recv_data(<<?DUBBO_MEGIC_HIGH, ?DUBBO_MEGIC_LOW, Rest/binary>> = Data, State) when byte_size(Rest) < 14 ->
    %% Magic seen but the 16-byte header is not complete yet.
    {next_buffer, Data, State};
check_recv_data(<<?DUBBO_MEGIC_HIGH, ?DUBBO_MEGIC_LOW, _Flags:80, DataLen:32, Rest/binary>> = Data, State) ->
    RestSize = byte_size(Rest),
    if
        DataLen > RestSize ->
            %% Body still incomplete: keep the whole frame prefix buffered.
            logger:warning("need wait next buffer data ~p", [Data]),
            {next_buffer, Data, State};
        DataLen =:= RestSize ->
            %% Exactly one frame in the buffer.
            {ok, State2} = process_data(Data, State),
            {next_buffer, <<>>, State2};
        true ->
            %% One full frame followed by more bytes: slice the frame straight
            %% out of the buffer (16 header bytes + body) and carry on with
            %% the remainder.
            FrameLen = 16 + DataLen,
            <<OneData:FrameLen/binary, NextBuffer/binary>> = Data,
            {ok, State3} = process_data(OneData, State),
            logger:warning("recevi more data ~w ", [NextBuffer]),
            check_recv_data(NextBuffer, State3)
    end;
check_recv_data(<<Error/integer, Data/binary>>, State) ->
    %% Not a frame start: drop one byte and resynchronise on the next one.
    logger:error("recv bad header data,Begin Byte:~p", [Error]),
    check_recv_data(Data, State).
%% @doc Decode one complete frame and dispatch it.
%% The first 16 bytes are decoded as the header; depending on the result the
%% body is decoded as a request or a response and passed to
%% process_request/3 or process_response/3. A header decode error is logged
%% and the frame is dropped. Always returns `{ok, State}' (possibly updated).
process_data(Data, State) ->
    <<Header:16/binary, Body/binary>> = Data,
    case dubbo_codec:decode_header(Header) of
        {ok, request, RequestInfo} ->
            {ok, Request} = dubbo_codec:decode_request(RequestInfo, Body),
            logger:info("dubbo process one request mid ~p, is_event ~p", [Request#dubbo_request.mid, Request#dubbo_request.is_event]),
            {ok, _} = RequestOutcome = process_request(Request#dubbo_request.is_event, Request, State),
            RequestOutcome;
        {ok, response, ResponseInfo} ->
            {ok, Response} = dubbo_codec:decode_response(ResponseInfo, Body),
            logger:info("get one response mid ~p, is_event ~p state ~p", [Response#dubbo_response.mid, Response#dubbo_response.is_event, Response#dubbo_response.state]),
            {ok, _} = ResponseOutcome = process_response(Response#dubbo_response.is_event, Response, State),
            ResponseOutcome;
        {error, Type, RelData} ->
            logger:error("process_data error type ~p RelData ~p", [Type, RelData]),
            {ok, State}
    end.
%% @doc Handle a decoded response frame received on a provider connection.
%% Neither event responses (`true') nor ordinary responses (`false') need any
%% action here, so the state is returned unchanged in both cases.
%% The spec reflects the real return value, `{ok, State}', which is what
%% process_data/2 matches on.
-spec process_response(IsEvent :: boolean(), #dubbo_response{}, #state{}) -> {ok, #state{}}.
process_response(true, _Response, State) ->
    {ok, State};
process_response(false, _Response, State) ->
    {ok, State}.
%% @doc Handle a decoded request frame.
%% Event requests (`true') are answered directly from this process with a
%% heartbeat message carrying the request's mid. Ordinary requests (`false')
%% are passed to dubbo_provider_worker together with this process' pid.
%% Returns `{ok, State}'.
process_request(true, Request, State) ->
    Mid = Request#dubbo_request.mid,
    {ok, State1} = send_heartbeat_msg(Mid, false, State),
    logger:debug("process request event ~p", [Request]),
    {ok, State1};
process_request(false, Request, State) ->
    logger:info("process request ~p", [Request]),
    ConnectionPid = self(),
    dubbo_provider_worker:process_request(Request, ConnectionPid),
    {ok, State}.
%% @doc Build a heartbeat message for Mid with dubbo_heartbeat and write it to
%% the connection socket. A send failure is only logged; the state is returned
%% unchanged as `{ok, State}' either way.
send_heartbeat_msg(Mid, NeedResponse, State) ->
    {ok, Frame} = dubbo_heartbeat:generate_request(Mid, NeedResponse),
    case send_msg(Frame, State#state.socket) of
        {error, Reason} ->
            logger:warning("dubbo connection send heartbeat error ~p", [Reason]);
        ok ->
            ok
    end,
    {ok, State}.
%% @doc Write Msg to Socket.
%% Returns `ok' on success. With no socket (`undefined') nothing is sent and
%% `{error, closed}' is returned; a failed send is logged and its
%% `{error, Reason}' is passed back to the caller.
send_msg(_Msg, undefined) ->
    {error, closed};
send_msg(Msg, Socket) ->
    case gen_tcp:send(Socket, Msg) of
        {error, Reason} = Failure ->
            logger:error("protocol socket send error,reason:~p", [Reason]),
            Failure;
        ok ->
            ok
    end.