blob: 84a95d620dc7c47b24db446713e137c6c0591419 [file] [log] [blame]
%%------------------------------------------------------------------------------
%% Licensed to the Apache Software Foundation (ASF) under one or more
%% contributor license agreements. See the NOTICE file distributed with
%% this work for additional information regarding copyright ownership.
%% The ASF licenses this file to You under the Apache License, Version 2.0
%% (the "License"); you may not use this file except in compliance with
%% the License. You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%------------------------------------------------------------------------------
-module(dubbo_zookeeper).
-behaviour(gen_server).
-include("dubbo.hrl").
%% API
-export([start_link/0, register_consumer/1, register_consumer/2, gen_consumer_node_info/1, register_provider/1, provider_watcher/1]).
%% gen_server callbacks
-export([init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
code_change/3]).
-define(SERVER, ?MODULE).
-record(state, {zk_pid}).
%%%===================================================================
%%% API
%%%===================================================================
%%--------------------------------------------------------------------
%% @doc
%% Starts the server
%%
%% @end
%%--------------------------------------------------------------------
-spec(start_link() ->
{ok, Pid :: pid()} | ignore | {error, Reason :: term()}).
start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
%%%===================================================================
%%% gen_server callbacks
%%%===================================================================
%%--------------------------------------------------------------------
%% @private
%% @doc
%% Initializes the server
%%
%% @spec init(Args) -> {ok, State} |
%% {ok, State, Timeout} |
%% ignore |
%% {stop, Reason}
%% @end
%%--------------------------------------------------------------------
-spec(init(Args :: term()) ->
{ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} |
{stop, Reason :: term()} | ignore).
init([]) ->
{ok, Pid} = connection(),
{ok, #state{zk_pid = Pid}}.
%%--------------------------------------------------------------------
%% @private
%% @doc
%% Handling call messages
%%
%% @end
%%--------------------------------------------------------------------
-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()},
State :: #state{}) ->
{reply, Reply :: term(), NewState :: #state{}} |
{reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} |
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_call({add_consumer, Consumer}, _From, State) ->
add_consumer(Consumer, State),
{reply, ok, State};
handle_call({add_provider, Provider}, _From, State) ->
register_provider_path(Provider, State),
{reply, ok, State};
handle_call(_Request, _From, State) ->
{reply, ok, State}.
%%--------------------------------------------------------------------
%% @private
%% @doc
%% Handling cast messages
%%
%% @end
%%--------------------------------------------------------------------
-spec(handle_cast(Request :: term(), State :: #state{}) ->
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_cast({provider_node_change, Interface, Path}, #state{zk_pid = Pid} = State) ->
get_provider_and_start(Pid, Interface, Path),
{noreply, State};
handle_cast(_Request, State) ->
{noreply, State}.
%%--------------------------------------------------------------------
%% @private
%% @doc
%% Handling all non call/cast messages
%%
%% @spec handle_info(Info, State) -> {noreply, State} |
%% {noreply, State, Timeout} |
%% {stop, Reason, State}
%% @end
%%--------------------------------------------------------------------
-spec(handle_info(Info :: timeout() | term(), State :: #state{}) ->
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_info(_Info, State) ->
logger:info("zk server recv msg:~p", [_Info]),
{noreply, State}.
%%--------------------------------------------------------------------
%% @private
%% @doc
%% This function is called by a gen_server when it is about to
%% terminate. It should be the opposite of Module:init/1 and do any
%% necessary cleaning up. When it returns, the gen_server terminates
%% with Reason. The return value is ignored.
%%
%% @spec terminate(Reason, State) -> void()
%% @end
%%--------------------------------------------------------------------
-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()),
State :: #state{}) -> term()).
terminate(_Reason, _State) ->
ok.
%%--------------------------------------------------------------------
%% @private
%% @doc
%% Convert process state when code is changed
%%
%% @spec code_change(OldVsn, State, Extra) -> {ok, NewState}
%% @end
%%--------------------------------------------------------------------
-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{},
Extra :: term()) ->
{ok, NewState :: #state{}} | {error, Reason :: term()}).
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
register_consumer(Consumer) ->
gen_server:call(?SERVER, {add_consumer, Consumer}),
ok.
register_consumer(Name, Option) ->
Consumer = #consumer_config{interface = Name, methods = [<<"testa">>, <<"testb">>]},
register_consumer(Consumer),
ok.
register_provider(Provider) ->
gen_server:call(?SERVER, {add_provider, Provider}),
ok.
%%%===================================================================
%%% Internal functions
%%%===================================================================
connection() ->
{ok, List} = application:get_env(dubboerl, zookeeper_list),
{ok, Pid} = erlzk:connect(List, 30000, [
{chroot, "/"},
{monitor, self()}]),
{ok, Pid}.
add_consumer(Consumer, State) ->
Pid = State#state.zk_pid,
%% InterfacePath= << <<"/dubbo/">>/binary,Name/binary ,<<"consumers">>/binary >>,
ConsumerNode = gen_consumer_node_info(Consumer),
ConsumerNode2 = list_to_binary(edoc_lib:escape_uri(binary_to_list(ConsumerNode))),
check_and_create_path(Pid, <<"">>, [{<<"dubbo">>, p}, {Consumer#consumer_config.interface, p}, {<<"consumers">>, p}, {ConsumerNode2, e}]),
get_provider_list(Consumer, State),
ok.
register_provider_path(Provider, State) ->
#state{zk_pid = Pid} = State,
ProviderNode = dubbo_node_config_util:gen_provider_info(Provider),
check_and_create_path(Pid, <<"">>, [{<<"dubbo">>, p}, {Provider#provider_config.interface, p}, {<<"providers">>, p}, {ProviderNode, e}]),
ok.
get_provider_list(Consumer, State) ->
Pid = State#state.zk_pid,
InterfacePath = <<<<"/dubbo/">>/binary, (Consumer#consumer_config.interface)/binary, <<"/providers">>/binary>>,
get_provider_and_start(Pid, Consumer#consumer_config.interface, InterfacePath),
ok.
get_provider_and_start(Pid, Interface, Path) ->
case erlzk:get_children(Pid, Path, spawn(dubbo_zookeeper, provider_watcher, [Interface])) of
{ok, ChildList} ->
logger:debug("get provider list ~p", [ChildList]),
start_provider_process(Interface, ChildList),
ok;
{error, R1} ->
logger:debug("[add_consumer] get_provider_list error ~p ~p", [R1]),
ok
end.
provider_watcher(Interface) ->
receive
{node_children_changed, Path} ->
gen_server:cast(?SERVER, {provider_node_change, Interface, Path}),
logger:debug("provider_watcher get event ~p ~p", [node_children_changed, Path]);
{Event, Path} ->
%% Path = "/a",
%% Event = node_created
logger:debug("provider_watcher get event ~p ~p", [Event, Path])
end,
ok.
create_path(Pid, Path, CreateType) ->
case erlzk:create(Pid, Path, CreateType) of
{ok, ActualPath} ->
logger:debug("[add_consumer] create zk path success ~p", [ActualPath]),
ok;
{error, R1} ->
logger:debug("[add_consumer] create zk path error ~p ~p", [Path, R1])
end,
ok.
check_and_create_path(_Pid, _RootPath, []) ->
ok;
check_and_create_path(Pid, RootPath, [{Item, CreateType} | Rst]) ->
CheckPath = <<RootPath/binary, <<"/">>/binary, Item/binary>>,
case erlzk:exists(Pid, CheckPath) of
{ok, Stat} ->
check_and_create_path(Pid, CheckPath, Rst);
{error, no_node} ->
logger:debug("[add_consumer] check_and_create_path unexist no_node ~p", [CheckPath]),
create_path(Pid, CheckPath, CreateType),
check_and_create_path(Pid, CheckPath, Rst);
{error, R1} ->
logger:debug("[add_consumer] check_and_create_path unexist ~p", [R1]),
check_and_create_path(Pid, CheckPath, Rst)
end.
gen_consumer_node_info(Consumer) ->
%% revision参数字段的作用是什么? 暂时不添加
Methods = dubbo_lists_util:join(Consumer#consumer_config.methods, <<",">>),
Value = io_lib:format(<<"consumer://~s/~s?application=~s&category=~s&check=~p&default.timeout=~p&dubbo=~s&interface=~s&methods=~s&side=~s&timestamp=~p">>,
[dubbo_common_fun:local_ip_v4_str(),
Consumer#consumer_config.interface,
Consumer#consumer_config.application,
Consumer#consumer_config.category,
Consumer#consumer_config.check,
Consumer#consumer_config.default_timeout,
Consumer#consumer_config.dubbo_version,
Consumer#consumer_config.interface,
Methods,
Consumer#consumer_config.side,
dubbo_time_util:timestamp_ms()
]),
list_to_binary(Value).
%%dubbo_zookeeper:register_consumer(<<"com.ifcoder.abcd">>,[]).
start_provider_process(Interface, ProviderList) ->
dubbo_provider_consumer_reg_table:start_consumer(Interface, ProviderList).