fix: heartbeat send fix
diff --git a/src/dubbo_netty_client.erl b/src/dubbo_netty_client.erl
index 318b793..9fd3019 100644
--- a/src/dubbo_netty_client.erl
+++ b/src/dubbo_netty_client.erl
@@ -30,7 +30,8 @@
-record(state, {provider_config,socket =undefined,
heartbeat=#heartbeat{},
recv_buffer= <<>> , %%从服务端接收的数据
- host_flag
+ host_flag,
+ reconnection_timer
}).
%%%===================================================================
@@ -78,7 +79,7 @@
NowStamp = time_util:timestamp_ms(),
HeartBeatInfo = #heartbeat{last_read = NowStamp,last_write = NowStamp},
logger:info("netty client start ~p",[HostFlag]),
-%% start_heartbeat_timer(HeartBeatInfo),
+ start_heartbeat_timer(HeartBeatInfo),
{ok, State#state{provider_config=ProviderConfig,heartbeat=HeartBeatInfo,host_flag = HostFlag}}.
%%--------------------------------------------------------------------
@@ -120,6 +121,7 @@
logger:debug("[send_request end] send data to provider consumer pid ~p state ok",[self()]),
State;
{error,closed}->
+ logger:warning("send request error, connection is closed"),
State2 = reconnect(State),
State2;
{error,R1}->
@@ -159,17 +161,17 @@
%% HeartbeatInfo =update_heartbeat(write,NewState#state.heartbeat),
{noreply, NewState#state{recv_buffer = NextBuffer}};
handle_info({tcp_closed,Port},State)->
+ logger:info("dubbo connection closed ~p",[Port]),
NewState=reconnect(State),
{noreply, NewState};
handle_info({timeout, _TimerRef, {reconnect}},State)->
- NewState=reconnect(State),
+ NewState=reconnect(State#state{reconnection_timer = undefined}),
{noreply, NewState};
handle_info({timeout, _TimerRef, {heartbeat_timer}},State) ->
{ok,NewState} = case check_heartbeat_state(State) of
{normal}-> {ok,State};
{send_heart}->
-%% @todo send_heartbeat_msg(undefined,true,State);
- {ok,State};
+ send_heartbeat_msg(undefined,true,State);
{reconnect} ->
%% @todo reconnect
{ok,State}
@@ -248,6 +250,8 @@
{error,Info}
end.
+reconnect(#state{reconnection_timer = Timer}=State) when Timer /= undefined ->
+ State;
reconnect(State)->
#provider_config{host = Host,port = Port} = State#state.provider_config,
case State#state.socket of
@@ -257,10 +261,12 @@
end,
case open(Host,Port) of
{ok,Socket2}->
+ logger:warning("reconnect to provider ~p ~p success",[Host,Port]),
State#state{socket = Socket2,recv_buffer = <<>>};
- {error,_Info}->
- erlang:start_timer(2000,self(),{reconnect}),
- State#state{socket = undefined}
+ {error,Reason}->
+ logger:warning("connect to provider error ~p",[Reason]),
+ TimerRef = erlang:start_timer(2000,self(),{reconnect}),
+ State#state{socket = undefined,reconnection_timer = TimerRef}
end.
send_msg(Msg,State) ->
@@ -310,7 +316,8 @@
ok ->
logger:info("send one heartbeat msg to server"),
State;
- {error,_Reason} ->
+ {error,Reason} ->
+ logger:warning("dubbo connection send heartbeat error ~p",[Reason]),
State2 = reconnect(State),
State2
end,
diff --git a/src/dubbo_provider_protocol.erl b/src/dubbo_provider_protocol.erl
index d787e12..eec62cb 100644
--- a/src/dubbo_provider_protocol.erl
+++ b/src/dubbo_provider_protocol.erl
@@ -28,7 +28,7 @@
-define(SERVER, ?MODULE).
--define(TIMEOUT, 5000).
+-define(IDLE_TIMEOUT, 70000).
-record(heartbeat,{last_write=0,last_read=0,timeout=50000,max_timeout=9000}).
@@ -54,13 +54,14 @@
init({Ref, Socket, Transport, _Opts = []}) ->
logger:info("provider connected"),
ok = ranch:accept_ack(Ref),
- ok = Transport:setopts(Socket, [{active, once}]),
+%% ok = Transport:setopts(Socket, [{active, once}]),
+ ok = Transport:setopts(Socket, [{active, true},{packet,0}]),
gen_server:enter_loop(?MODULE, [],
#state{socket=Socket, transport=Transport},
- ?TIMEOUT).
+ ?IDLE_TIMEOUT).
handle_info({tcp,_Port,Data}, #state{recv_buffer = RecvBuffer,socket=Socket, transport=Transport} = State) ->
- Transport:setopts(Socket, [{active, once}]),
+%% Transport:setopts(Socket, [{active, once}]),
NowBuffer = << RecvBuffer/binary,Data/binary >>,
{ok,NextBuffer,NewState} = case check_recv_data(NowBuffer,State) of
@@ -71,20 +72,16 @@
{ok,NextBuffer2,State3}
end,
%% HeartbeatInfo =update_heartbeat(write,NewState#state.heartbeat),
- {noreply, NewState#state{recv_buffer = NextBuffer}};
-handle_info({tcp_closed,Port},State)->
-%% NewState=reconnect(State),
- {noreply, State};
-%%handle_info({timeout, _TimerRef, {reconnect}},State)->
-%% NewState=reconnect(State),
-%% {noreply, NewState};
+ {noreply, NewState#state{recv_buffer = NextBuffer},?IDLE_TIMEOUT};
handle_info({tcp_closed, _Socket}, State) ->
+ logger:warning("provider socket is closed"),
{stop, normal, State};
handle_info({tcp_error, _, Reason}, State) ->
{stop, Reason, State};
handle_info(timeout, State) ->
- {stop, normal, State};
+ logger:info("dubbo provider connection idle timeout"),
+ {stop, {shutdown,idle_timeout}, State};
handle_info(_Info, State) ->
{stop, normal, State}.
@@ -102,7 +99,8 @@
handle_cast(_Msg, State) ->
{noreply, State}.
-terminate(_Reason, _State) ->
+terminate(Reason, _State) ->
+ logger:info("proviver connection terminal reason ~p",[Reason]),
ok.
code_change(_OldVsn, State, _Extra) ->
@@ -175,7 +173,7 @@
%% @doc process event
-spec process_response(IsEvent::boolean(),#dubbo_response{},#state{})->ok.
process_response(true,Response,State)->
-
+%%
{ok,State};
process_response(false,Response,State)->
@@ -192,7 +190,7 @@
process_request(true,Request,State)->
%% {ok,NewState} = send_heartbeat_msg(Request#dubbo_request.mid,State),
- logger:info("process request event ~p",[Request]),
+ logger:debug("process request event ~p",[Request]),
{ok,State};
process_request(false,Request,State)->
logger:info("process request ~p",[Request]),