| %%------------------------------------------------------------------------------ |
| %% Licensed to the Apache Software Foundation (ASF) under one or more |
| %% contributor license agreements. See the NOTICE file distributed with |
| %% this work for additional information regarding copyright ownership. |
| %% The ASF licenses this file to You under the Apache License, Version 2.0 |
| %% (the "License"); you may not use this file except in compliance with |
| %% the License. You may obtain a copy of the License at |
| %% |
| %% http://www.apache.org/licenses/LICENSE-2.0 |
| %% |
| %% Unless required by applicable law or agreed to in writing, software |
| %% distributed under the License is distributed on an "AS IS" BASIS, |
| %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| %% See the License for the specific language governing permissions and |
| %% limitations under the License. |
| %%------------------------------------------------------------------------------ |
| -module(dubbo_netty_client). |
| |
| -behaviour(gen_server). |
| |
| -include("dubbo.hrl"). |
| %% API |
| -export([start_link/4]). |
| |
| %% gen_server callbacks |
| -export([init/1, |
| handle_call/3, |
| handle_cast/2, |
| handle_info/2, |
| terminate/2, |
| code_change/3]). |
| -export([check_recv_data/2]). |
| |
| -define(SERVER, ?MODULE). |
| |
%% Heartbeat bookkeeping. The timestamps are produced by
%% dubbo_time_util:timestamp_ms/0; `timeout' is also used as the interval
%% handed to erlang:start_timer/3 for the heartbeat timer.
-record(heartbeat, {
    last_write = 0,         %% timestamp recorded by update_heartbeat(write, _)
    last_read = 0,          %% timestamp recorded by update_heartbeat(read, _)
    timeout = 60000,        %% idle threshold and heartbeat timer interval
    max_timeout = 180000    %% read-idle threshold meant to trigger a reconnect
}).

%% gen_server state of one provider connection.
-record(state, {
    provider_config,            %% #provider_config{} passed to start_link/4
    socket = undefined,         %% connected socket, or undefined when not connected
    heartbeat = #heartbeat{},
    recv_buffer = <<>>,         %% data received from the server that is not consumed yet
    host_flag,
    reconnection_timer          %% reference of a pending {reconnect} timer, if any
}).
| |
| %%%=================================================================== |
| %%% API |
| %%%=================================================================== |
| |
%%--------------------------------------------------------------------
%% @doc
%% Starts the connection process and registers it locally as `Name'.
%%
%% `Name' must be an atom because it is used in a `{local, Name}'
%% registration. `HostFlag', `ProviderConfig' and `Index' are passed to
%% init/1 unchanged.
%% @end
%%--------------------------------------------------------------------
-spec(start_link(Name :: atom(), HostFlag :: binary(), ProviderConfig :: #provider_config{}, Index :: integer()) ->
    {ok, Pid :: pid()} | ignore | {error, Reason :: term()}).
start_link(Name, HostFlag, ProviderConfig, Index) ->
    gen_server:start_link({local, Name}, ?MODULE, [HostFlag, ProviderConfig, Index], []).
| |
| %%%=================================================================== |
| %%% gen_server callbacks |
| %%%=================================================================== |
| |
%%--------------------------------------------------------------------
%% @private
%% @doc
%% Initializes the server.
%%
%% Tries to connect to the provider described by `ProviderConfig'. The
%% process starts even when the connection attempt fails; in that case a
%% `{reconnect}' timer is armed (same 2000 ms delay that reconnect/1 uses)
%% so the connection is retried shortly instead of only after the first
%% failed send or heartbeat. The heartbeat timer is started in both cases.
%%
%% The third list element (the index given to start_link/4) is not used.
%% @end
%%--------------------------------------------------------------------
-spec(init(Args :: term()) ->
    {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} |
    {stop, Reason :: term()} | ignore).
init([HostFlag, ProviderConfig, _Index]) ->
    erlang:process_flag(min_bin_vheap_size, 1024 * 1024),
    #provider_config{host = Host, port = Port} = ProviderConfig,
    State = case open(Host, Port) of
                {ok, Socket} ->
                    #state{socket = Socket};
                {error, _Reason} ->
                    %% open/2 already logged the failure; schedule a retry that is
                    %% handled by the {timeout, _, {reconnect}} clause of handle_info/2.
                    TimerRef = erlang:start_timer(2000, self(), {reconnect}),
                    #state{reconnection_timer = TimerRef}
            end,
    NowStamp = dubbo_time_util:timestamp_ms(),
    HeartBeatInfo = #heartbeat{last_read = NowStamp, last_write = NowStamp},
    logger:info("netty client start ~p", [HostFlag]),
    start_heartbeat_timer(HeartBeatInfo),
    {ok, State#state{provider_config = ProviderConfig, heartbeat = HeartBeatInfo, host_flag = HostFlag}}.
| |
%%--------------------------------------------------------------------
%% @private
%% @doc
%% Handles synchronous calls. No call protocol is defined for this
%% process: every request is answered with `ok' and the state is left
%% untouched.
%% @end
%%--------------------------------------------------------------------
-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()},
    State :: #state{}) ->
    {reply, Reply :: term(), NewState :: #state{}} |
    {reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} |
    {noreply, NewState :: #state{}} |
    {noreply, NewState :: #state{}, timeout() | hibernate} |
    {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
    {stop, Reason :: term(), NewState :: #state{}}).
handle_call(_Msg, _Caller, State) ->
    Reply = ok,
    {reply, Reply, State}.
| |
%%--------------------------------------------------------------------
%% @private
%% @doc
%% Handles asynchronous messages.
%%
%% `{send_request, Ref, Request, Data, SourcePid, RequestState}' writes the
%% already encoded `Data' to the provider socket. On success the request is
%% remembered under its mid so the response can be routed back later; when
%% the connection is closed a reconnect is attempted. The heartbeat write
%% timestamp is refreshed whatever the outcome. Any other cast is ignored.
%% @end
%%--------------------------------------------------------------------
-spec(handle_cast(Request :: term(), State :: #state{}) ->
    {noreply, NewState :: #state{}} |
    {noreply, NewState :: #state{}, timeout() | hibernate} |
    {stop, Reason :: term(), NewState :: #state{}}).

handle_cast({send_request, Ref, Request, Data, SourcePid, RequestState}, State0) ->
    logger:debug("[send_request begin] send data to provider consumer mid ~p pid ~p sourcePid ~p", [Request#dubbo_request.mid, self(), SourcePid]),
    SendResult = send_msg(Data, State0),
    State1 = on_request_sent(SendResult, {Request, SourcePid, Ref, RequestState}, State0),
    Heartbeat = update_heartbeat(write, State1#state.heartbeat),
    {noreply, State1#state{heartbeat = Heartbeat}};

handle_cast(_Other, State) ->
    {noreply, State}.

%% Applies the outcome of sending one request and returns the resulting state.
on_request_sent(ok, {Request, SourcePid, Ref, RequestState}, State) ->
    save_request_info(Request, SourcePid, Ref, RequestState),
    logger:debug("[send_request end] send data to provider consumer pid ~p state ok", [self()]),
    State;
on_request_sent({error, closed}, _Pending, State) ->
    logger:warning("send request error, connection is closed"),
    reconnect(State);
on_request_sent({error, Reason}, _Pending, State) ->
    logger:error("[send_request end] send data to provider consumer pid error ~p ~p", [self(), Reason]),
    State.
| |
%%--------------------------------------------------------------------
%% @private
%% @doc
%% Handles all non call/cast messages.
%%
%% <ul>
%% <li>`{tcp, Socket, Data}' for the current socket: appends `Data' to the
%%     receive buffer, processes every complete frame, stores the leftover
%%     bytes and records the read time in the heartbeat info.</li>
%% <li>`{tcp_closed, Socket}' for the current socket: reconnects.</li>
%% <li>`{tcp, _, _}' / `{tcp_closed, _}' for any other socket: ignored.
%%     Such messages can still be queued for a socket that reconnect/1 has
%%     already replaced; acting on them would mix foreign bytes into the
%%     buffer or tear down the fresh connection.</li>
%% <li>`{timeout, _, {reconnect}}': the reconnect timer fired, try again.</li>
%% <li>`{timeout, _, {heartbeat_timer}}': sends a heartbeat when
%%     check_heartbeat_state/1 asks for one, then re-arms the timer.</li>
%% <li>anything else: logged.</li>
%% </ul>
%% @end
%%--------------------------------------------------------------------
-spec(handle_info(Info :: timeout() | term(), State :: #state{}) ->
    {noreply, NewState :: #state{}} |
    {noreply, NewState :: #state{}, timeout() | hibernate} |
    {stop, Reason :: term(), NewState :: #state{}}).

handle_info({tcp, Socket, Data}, #state{socket = Socket, recv_buffer = RecvBuffer} = State) ->
    {next_buffer, NextBuffer, NewState} =
        check_recv_data(<<RecvBuffer/binary, Data/binary>>, State),
    logger:debug("[INFO] recv one data state wait next_buffer"),
    %% Data arrived from the provider, so the connection counts as read-active.
    HeartbeatInfo = update_heartbeat(read, NewState#state.heartbeat),
    {noreply, NewState#state{recv_buffer = NextBuffer, heartbeat = HeartbeatInfo}};
handle_info({tcp, StaleSocket, _Data}, State) ->
    logger:debug("ignore data from replaced socket ~p", [StaleSocket]),
    {noreply, State};
handle_info({tcp_closed, Socket}, #state{socket = Socket} = State) ->
    logger:info("dubbo connection closed ~p", [Socket]),
    NewState = reconnect(State),
    {noreply, NewState};
handle_info({tcp_closed, StaleSocket}, State) ->
    logger:debug("ignore close notification of replaced socket ~p", [StaleSocket]),
    {noreply, State};
handle_info({timeout, _TimerRef, {reconnect}}, State) ->
    %% Clear the timer first, otherwise reconnect/1 treats the attempt as pending.
    NewState = reconnect(State#state{reconnection_timer = undefined}),
    {noreply, NewState};
handle_info({timeout, _TimerRef, {heartbeat_timer}}, State) ->
    {ok, NewState} = case check_heartbeat_state(State) of
                         {normal} ->
                             {ok, State};
                         {send_heart} ->
                             send_heartbeat_msg(undefined, true, State);
                         {reconnect} ->
                             %% @todo reconnect
                             {ok, State}
                     end,
    HeartbeatInfo = update_heartbeat(write, NewState#state.heartbeat),
    start_heartbeat_timer(HeartbeatInfo),
    {noreply, NewState#state{heartbeat = HeartbeatInfo}};
handle_info(Info, State) ->
    logger:warning("[INFO] get one info:~p", [Info]),
    %% Kept from the previous implementation: an unexpected message also
    %% refreshes the write timestamp.
    HeartbeatInfo = update_heartbeat(write, State#state.heartbeat),
    {noreply, State#state{heartbeat = HeartbeatInfo}}.
| |
%%--------------------------------------------------------------------
%% @private
%% @doc
%% Called by gen_server when the process is about to terminate. Only the
%% termination reason is logged; no explicit cleanup is performed here.
%% The return value is ignored by gen_server.
%% @end
%%--------------------------------------------------------------------
-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()),
    State :: #state{}) -> term()).
terminate(Reason, _State) ->
    logger:warning("terminate reason:~p", [Reason]),
    ok.
| |
%%--------------------------------------------------------------------
%% @private
%% @doc
%% Converts the process state on a code upgrade. The state layout is not
%% transformed: the current state is returned as is.
%% @end
%%--------------------------------------------------------------------
-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{},
    Extra :: term()) ->
    {ok, NewState :: #state{}} | {error, Reason :: term()}).
code_change(_FromVsn, CurrentState, _Extra) ->
    {ok, CurrentState}.
| |
| %%%=================================================================== |
| %%% Internal functions |
| %%%=================================================================== |
| |
%% Opens a TCP connection to the provider at Host:Port.
%%
%% The socket is created passive and switched to `{active, true}' once the
%% connection is established, so incoming data is delivered to the calling
%% process as `{tcp, Socket, Data}' messages.
%%
%% Returns `{ok, Socket}' or `{error, Reason}', where `Reason' is the plain
%% reason reported by gen_tcp/inet (it is no longer wrapped in a second
%% `{error, _}' tuple).
%%
%% NOTE(review): gen_tcp:connect/3 is used without a timeout, so the caller
%% blocks until the connect attempt completes or fails.
open(Host, Port) ->
    logger:debug("will connect to provider ~p ~p", [Host, Port]),
    Options = [
        binary,
        {packet, 0}, {active, false},
        {reuseaddr, true},
        {delay_send, true},
        {nodelay, true},
        {high_watermark, 512 * 1024},
        {low_watermark, 256 * 1024},
        {sndbuf, 512 * 1024},
        {recbuf, 512 * 1024}
    ],
    case gen_tcp:connect(Host, Port, Options) of
        {ok, Socket} ->
            activate(Socket);
        {error, Reason} ->
            logger:error("start client connection error ~p", [Reason]),
            {error, Reason}
    end.

%% Switches a freshly connected socket to active mode; closes it when that fails.
activate(Socket) ->
    case inet:setopts(Socket, [{active, true}]) of
        ok ->
            {ok, Socket};
        {error, Reason} ->
            logger:error("start client connection error ~p", [Reason]),
            gen_tcp:close(Socket),
            {error, Reason}
    end.
| |
%% Re-establishes the provider connection and returns the updated state.
%%
%% Nothing happens while a reconnect timer is pending. Otherwise the current
%% socket (if any) is closed and a new connection is opened. On success the
%% receive buffer is reset; on failure the socket is cleared and a 2000 ms
%% `{reconnect}' timer is started.
reconnect(#state{reconnection_timer = undefined, provider_config = Config, socket = OldSocket} = State) ->
    #provider_config{host = Host, port = Port} = Config,
    OldSocket =:= undefined orelse gen_tcp:close(OldSocket),
    case open(Host, Port) of
        {ok, NewSocket} ->
            logger:warning("reconnect to provider ~p ~p success", [Host, Port]),
            State#state{socket = NewSocket, recv_buffer = <<>>};
        {error, Reason} ->
            logger:warning("connect to provider error ~p", [Reason]),
            Timer = erlang:start_timer(2000, self(), {reconnect}),
            State#state{socket = undefined, reconnection_timer = Timer}
    end;
reconnect(#state{} = State) ->
    %% A retry is already scheduled.
    State.
| |
%% Writes Msg to the provider socket held in the state.
%% Returns `ok', `{error, closed}' when no socket is open, or the
%% `{error, Reason}' reported by gen_tcp:send/2 (which is also logged).
send_msg(_Msg, #state{socket = undefined}) ->
    {error, closed};
send_msg(Msg, #state{socket = Socket}) ->
    case gen_tcp:send(Socket, Msg) of
        ok ->
            ok;
        {error, Reason} = Failure ->
            logger:error("send to server error,reason:~p", [Reason]),
            Failure
    end.
| |
%%%=================================================================
%%% Heartbeat detection
%%%=================================================================

%% Arms a one-shot timer that delivers `{timeout, Ref, {heartbeat_timer}}'
%% to this process after the configured heartbeat timeout. Always returns ok.
start_heartbeat_timer(#heartbeat{timeout = Interval}) ->
    _TimerRef = erlang:start_timer(Interval, self(), {heartbeat_timer}),
    ok.
%% Stamps the heartbeat info with the current time: `write' refreshes
%% last_write, `read' refreshes last_read. Returns the updated record.
update_heartbeat(write, Info) ->
    Now = dubbo_time_util:timestamp_ms(),
    Info#heartbeat{last_write = Now};
update_heartbeat(read, Info) ->
    Now = dubbo_time_util:timestamp_ms(),
    Info#heartbeat{last_read = Now}.
| |
| |
%% Decides what the heartbeat timer should do, based on how long ago the
%% connection was last read from / written to:
%%   {send_heart} - read or write idle time exceeds `timeout'
%%   {reconnect}  - read idle time exceeds `max_timeout'
%%   {normal}     - otherwise
%%
%% NOTE(review): with the record defaults (timeout 60000 < max_timeout 180000)
%% the {reconnect} condition is always preceded by a matching {send_heart}
%% condition, so it cannot be reached. The order is kept as is because the
%% caller does not act on {reconnect} yet.
check_heartbeat_state(#state{heartbeat = Heartbeat}) ->
    Now = dubbo_time_util:timestamp_ms(),
    #heartbeat{
        last_read = LastRead,
        last_write = LastWrite,
        timeout = Timeout,
        max_timeout = MaxTimeout
    } = Heartbeat,
    if
        (Now - LastRead) > Timeout; (Now - LastWrite) > Timeout ->
            {send_heart};
        (Now - LastRead) > MaxTimeout ->
            {reconnect};
        true ->
            {normal}
    end.
| |
| |
%% Builds a heartbeat packet for message id Mid and sends it to the provider.
%% NeedResponse is forwarded to dubbo_heartbeat:generate_request/2.
%% When sending fails a reconnect is attempted. Always returns {ok, State}.
send_heartbeat_msg(Mid, NeedResponse, State) ->
    {ok, Packet} = dubbo_heartbeat:generate_request(Mid, NeedResponse),
    case send_msg(Packet, State) of
        ok ->
            logger:info("send one heartbeat to server"),
            {ok, State};
        {error, Reason} ->
            logger:warning("dubbo connection send heartbeat error ~p", [Reason]),
            {ok, reconnect(State)}
    end.
| |
%%%=================================================================
%%% Received data handling
%%%=================================================================

%% Splits the receive buffer into frames and processes every complete one.
%%
%% A frame starts with the two magic bytes, followed by 10 further header
%% bytes, a 32-bit body length and the body itself (16 header bytes in
%% total). Complete frames are passed to process_data/2 one after another.
%% Bytes that do not start with the magic number are logged and skipped one
%% at a time until a frame start is found.
%%
%% Returns `{next_buffer, Rest, State}' where `Rest' is the unconsumed tail
%% (an incomplete frame, possibly empty) to be prepended to the next chunk.
-spec check_recv_data(Data :: binary(), State :: #state{}) ->
    {next_buffer, NextBuffer :: binary(), NewState :: #state{}}.
check_recv_data(<<>>, State) ->
    {next_buffer, <<>>, State};
check_recv_data(<<?DUBBO_MEGIC_HIGH>> = Data, State) ->
    %% Only the first magic byte has arrived. Keep it: dropping it as a bad
    %% header would make the rest of this frame unparseable.
    {next_buffer, Data, State};
check_recv_data(<<?DUBBO_MEGIC_HIGH, ?DUBBO_MEGIC_LOW, Rest/binary>> = Data, State) when byte_size(Rest) < 14 ->
    %% Header is still incomplete.
    {next_buffer, Data, State};
check_recv_data(<<?DUBBO_MEGIC_HIGH, ?DUBBO_MEGIC_LOW, _OtherFlag:80, DataLen:32, Rest/binary>> = Data, State) ->
    case byte_size(Rest) >= DataLen of
        true ->
            %% Slice header + body out of the buffer and continue with the
            %% remainder (which may be empty or hold further frames).
            FrameSize = 16 + DataLen,
            <<Frame:FrameSize/binary, NextBuffer/binary>> = Data,
            {ok, State2} = process_data(Frame, State),
            check_recv_data(NextBuffer, State2);
        false ->
            logger:warning("need wait next buffer data ~p", [Data]),
            {next_buffer, Data, State}
    end;
check_recv_data(<<Error/integer, Data/binary>>, State) ->
    logger:error("recv bad header data,Begin Byte:~p", [Error]),
    check_recv_data(Data, State).
| |
| |
%% Handles one complete frame (16-byte header followed by the body).
%% The header is decoded first; responses are handed to process_response/4,
%% requests are decoded and handed to process_request/3, and header decode
%% errors are logged. Returns {ok, State}.
process_data(<<Header:16/binary, Body/binary>>, State) ->
    Decoded = dubbo_codec:decode_header(Header),
    dispatch_frame(Decoded, Body, State).

%% Routes a decoded header together with its body to the matching handler.
dispatch_frame({ok, response, ResponseInfo}, Body, State) ->
    process_response(ResponseInfo#dubbo_response.is_event, ResponseInfo, Body, State);
dispatch_frame({ok, request, RequestInfo}, Body, State) ->
    {ok, Req} = dubbo_codec:decode_request(RequestInfo, Body),
    logger:info("get one request mid ~p, is_event ~p", [Req#dubbo_request.mid, Req#dubbo_request.is_event]),
    {ok, _NewState} = process_request(Req#dubbo_request.is_event, Req, State);
dispatch_frame({error, Type, RelData}, _Body, State) ->
    logger:error("process_data error type ~p RelData ~p", [Type, RelData]),
    {ok, State}.
| |
| |
%% @doc Handles a response frame received from the provider.
%%
%% Non-event responses: the traffic-control counter of this host is
%% decremented, the pending request stored under the response mid is looked
%% up (and removed), the body is decoded and the result is cast to the
%% process that issued the request as
%% `{response_process, Ref, RpcContent, ResponseData}'. A response without a
%% matching pending request is only logged.
%%
%% Event responses are ignored.
%%
%% Always returns `{ok, State}' with the state unchanged.
-spec process_response(IsEvent :: boolean(), ResponseInfo :: #dubbo_response{},
    RestData :: binary(), State :: #state{}) -> {ok, #state{}}.
process_response(false, ResponseInfo, RestData, State) ->
    dubbo_traffic_control:decr_count(State#state.host_flag),
    case get_earse_request_info(ResponseInfo#dubbo_response.mid) of
        undefined ->
            logger:error("dubbo response can't find request data,response ~p", [ResponseInfo]);
        {SourcePid, Ref, _RequestState} ->
            {ok, Res} = dubbo_codec:decode_response(ResponseInfo, RestData),
            logger:info("got one response mid ~p, is_event ~p state ~p", [Res#dubbo_response.mid, Res#dubbo_response.is_event, Res#dubbo_response.state]),
            case Res#dubbo_response.is_event of
                false ->
                    %% todo rpccontent need merge response with request
                    RpcContent = [],
                    ResponseData = dubbo_type_transfer:response_to_native(Res),
                    gen_server:cast(SourcePid, {response_process, Ref, RpcContent, ResponseData});
                _ ->
                    ok
            end
    end,
    {ok, State};
process_response(true, _ResponseInfo, _RestData, State) ->
    {ok, State}.
| |
%% Handles a request frame sent by the provider. Returns {ok, State}.
%%
%% - event request whose data is <<"R">>: this connection is flagged
%%   read-only in dubbo_consumer_pool.
%% - any other event request: answered with a heartbeat carrying the same
%%   mid (no response requested).
%% - non-event request: ignored.
process_request(true, #dubbo_request{data = <<"R">>}, State) ->
    {ok, _} = dubbo_consumer_pool:update_connection_readonly(self(), true),
    {ok, State};
process_request(true, Request, State) ->
    {ok, NewState} = send_heartbeat_msg(Request#dubbo_request.mid, false, State),
    {ok, NewState};
process_request(false, _Request, State) ->
    {ok, State}.
| |
| |
%% Remembers who is waiting for the response to Request. The entry is kept
%% in this process's dictionary, keyed by the request mid.
save_request_info(Request, SourcePid, Ref, RequestState) ->
    Mid = Request#dubbo_request.mid,
    Pending = {SourcePid, Ref, RequestState},
    erlang:put(Mid, Pending).
%% Fetches and removes the pending-request entry stored under MessageId.
%% Returns the stored value, or `undefined' when nothing is stored.
get_earse_request_info(MessageId) ->
    erlang:erase(MessageId).