blob: 915bb71ef2f3207b560f71250798796087b12cba [file] [log] [blame]
%%------------------------------------------------------------------------------
%% Licensed to the Apache Software Foundation (ASF) under one or more
%% contributor license agreements. See the NOTICE file distributed with
%% this work for additional information regarding copyright ownership.
%% The ASF licenses this file to You under the Apache License, Version 2.0
%% (the "License"); you may not use this file except in compliance with
%% the License. You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%------------------------------------------------------------------------------
-module(dubbo_registry_zookeeper).
-behaviour(gen_server).
-behaviour(dubbo_registry).
-include("dubbo.hrl").
-include("dubboerl.hrl").
%% API
-export([start_link/0, register_provider/1, provider_watcher/1]).
-export([start/1, register/1, unregister/1, subscribe/2]).
%% gen_server callbacks
-export([init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
code_change/3]).
-define(SERVER, ?MODULE).
-record(state, {zk_pid, provider_notify_fun}).
%%%===================================================================
%%% API
%%%===================================================================
%%--------------------------------------------------------------------
%% @doc
%% Starts the server
%%
%% @end
%%--------------------------------------------------------------------
-spec(start_link() ->
{ok, Pid :: pid()} | ignore | {error, Reason :: term()}).
start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
%%%===================================================================
%%% gen_server callbacks
%%%===================================================================
%%--------------------------------------------------------------------
%% @private
%% @doc
%% Initializes the server
%%
%% @spec init(Args) -> {ok, State} |
%% {ok, State, Timeout} |
%% ignore |
%% {stop, Reason}
%% @end
%%--------------------------------------------------------------------
-spec(init(Args :: term()) ->
{ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} |
{stop, Reason :: term()} | ignore).
init([]) ->
{ok, Pid} = connection(),
{ok, #state{zk_pid = Pid}}.
%%--------------------------------------------------------------------
%% @private
%% @doc
%% Handling call messages
%%
%% @end
%%--------------------------------------------------------------------
-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()},
State :: #state{}) ->
{reply, Reply :: term(), NewState :: #state{}} |
{reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} |
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_call({do_register, Url}, _From, State) ->
do_register(State#state.zk_pid, Url),
{reply, ok, State};
handle_call({do_unregister, Url}, _From, State) ->
do_unregister(State#state.zk_pid, Url),
{reply, ok, State};
handle_call({subscribe_provider, InterfaceName, NotifyFun}, _From, #state{zk_pid = ZkPid} = State) ->
logger:debug("subscribe provider ~p notify fun ~p",[InterfaceName,NotifyFun]),
NewState = State#state{provider_notify_fun = NotifyFun},
List = get_provider_list(InterfaceName, ZkPid),
notify_provider_change(NotifyFun, InterfaceName, List),
{reply, ok, NewState};
handle_call(_Request, _From, State) ->
{reply, ok, State}.
%%--------------------------------------------------------------------
%% @private
%% @doc
%% Handling cast messages
%%
%% @end
%%--------------------------------------------------------------------
-spec(handle_cast(Request :: term(), State :: #state{}) ->
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_cast({provider_node_change, Interface, Path}, #state{zk_pid = Pid, provider_notify_fun = NotifyFun} = State) ->
ProviderList = get_provider_and_start(Pid, Interface, Path),
notify_provider_change(NotifyFun, Interface, ProviderList),
{noreply, State};
handle_cast(_Request, State) ->
{noreply, State}.
%%--------------------------------------------------------------------
%% @private
%% @doc
%% Handling all non call/cast messages
%%
%% @spec handle_info(Info, State) -> {noreply, State} |
%% {noreply, State, Timeout} |
%% {stop, Reason, State}
%% @end
%%--------------------------------------------------------------------
-spec(handle_info(Info :: timeout() | term(), State :: #state{}) ->
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_info(_Info, State) ->
{noreply, State}.
%%--------------------------------------------------------------------
%% @private
%% @doc
%% This function is called by a gen_server when it is about to
%% terminate. It should be the opposite of Module:init/1 and do any
%% necessary cleaning up. When it returns, the gen_server terminates
%% with Reason. The return value is ignored.
%%
%% @spec terminate(Reason, State) -> void()
%% @end
%%--------------------------------------------------------------------
-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()),
State :: #state{}) -> term()).
terminate(_Reason, _State) ->
ok.
%%--------------------------------------------------------------------
%% @private
%% @doc
%% Convert process state when code is changed
%%
%% @spec code_change(OldVsn, State, Extra) -> {ok, NewState}
%% @end
%%--------------------------------------------------------------------
-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{},
Extra :: term()) ->
{ok, NewState :: #state{}} | {error, Reason :: term()}).
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%----------------------------------------------
%% dubbo_registry
%%----------------------------------------------
start(Url) ->
ok.
%%register(Url) ->
%% {ok, UrlInfo} = dubbo_common_fun:parse_url(Url),
%% InterfaceName = maps:get(<<"interface">>, UrlInfo#dubbo_url.parameters),
%% register(UrlInfo#dubbo_url.scheme, InterfaceName, Url),
%% ok.
%%register(<<"consumer">>, InterfaceName, Url) ->
%% gen_server:call(?SERVER, {add_consumer, InterfaceName, Url}),
%% ok.
register(Url) ->
gen_server:call(?SERVER, {do_register, Url}, 10000),
ok.
unregister(Url) ->
gen_server:call(?SERVER, {do_unregister, Url}, 10000),
ok.
do_register(Pid, Url) ->
case dubbo_common_fun:parse_url(Url) of
{ok, UrlInfo} ->
CreateNodeList = [{get_register_node(Item, UrlInfo), p} || Item <- [root, service, category]],
UrlNode = {list_to_binary(edoc_lib:escape_uri(binary_to_list(Url))), get_dynamic(UrlInfo)},
CreateNodeList2 = CreateNodeList ++ [UrlNode],
RetFullNode = check_and_create_path(Pid, <<"">>, CreateNodeList2),
{ok, RetFullNode};
Reason ->
logger:error("zk parse url fail reason ~p", [Reason]),
{error, Reason}
end.
do_unregister(Pid, Url) ->
case dubbo_common_fun:parse_url(Url) of
{ok, UrlInfo} ->
CreateNodeList = [get_register_node(Item, UrlInfo) || Item <- [root, service, category]],
UrlNode = list_to_binary(edoc_lib:escape_uri(binary_to_list(Url))),
CreateNodeList2 = CreateNodeList ++ [UrlNode],
Path = dubbo_common_fun:binary_list_join(CreateNodeList2, <<"/">>),
FullPath = <<<<"/">>/binary, Path/binary>>,
del_path(Pid, FullPath);
Reason ->
logger:error("zk parse url fail reason ~p", [Reason]),
{error, Reason}
end.
get_dynamic(UrlInfo) ->
case maps:get(<<"dynamic">>, UrlInfo#dubbo_url.parameters, <<"true">>) of
<<"true">> ->
e;
_ ->
p
end.
get_register_node(root, _UrlInfo) ->
<<"dubbo">>;
get_register_node(service, UrlInfo) ->
maps:get(<<"interface">>, UrlInfo#dubbo_url.parameters);
get_register_node(category, UrlInfo) ->
maps:get(<<"category">>, UrlInfo#dubbo_url.parameters, <<"providers">>).
subscribe(SubcribeUrl, NotifyFun) ->
{ok, UrlInfo} = dubbo_common_fun:parse_url(SubcribeUrl),
InterfaceName = maps:get(<<"interface">>, UrlInfo#dubbo_url.parameters),
try gen_server:call(?SERVER, {subscribe_provider, InterfaceName, NotifyFun}, 5000) of
ok ->
ok
catch
_Error:Reason ->
%%todo improve error type
{error, Reason}
end.
register_provider(Provider) ->
gen_server:call(?SERVER, {add_provider, Provider}),
ok.
%%%===================================================================
%%% Internal functions
%%%===================================================================
connection() ->
{ok, List} = application:get_env(dubboerl, zookeeper_list),
{ok, Pid} = erlzk:connect(List, 30000, [
{chroot, "/"},
{monitor, self()}]),
{ok, Pid}.
add_consumer(InterfaceName, ConsumerUrl, State) ->
Pid = State#state.zk_pid,
ConsumerNode2 = list_to_binary(edoc_lib:escape_uri(binary_to_list(ConsumerUrl))),
check_and_create_path(Pid, <<"">>, [{<<"dubbo">>, p}, {InterfaceName, p}, {<<"consumers">>, p}, {ConsumerNode2, e}]),
%% todo
%% get_provider_list(Consumer, State),
ok.
register_provider_path(Provider, State) ->
#state{zk_pid = Pid} = State,
ProviderNode = dubbo_node_config_util:gen_provider_info(Provider),
check_and_create_path(Pid, <<"">>, [{<<"dubbo">>, p}, {Provider#provider_config.interface, p}, {<<"providers">>, p}, {ProviderNode, e}]),
ok.
get_provider_list(InterfaceName, ZkPid) ->
InterfacePath = <<<<"/dubbo/">>/binary, InterfaceName/binary, <<"/providers">>/binary>>,
ChildList = get_provider_and_start(ZkPid, InterfaceName, InterfacePath),
ChildList.
get_provider_and_start(Pid, Interface, Path) ->
case erlzk:get_children(Pid, Path, spawn(dubbo_registry_zookeeper, provider_watcher, [Interface])) of
{ok, ChildList} ->
logger:debug("get provider list ~p", [ChildList]),
ChildList;
{error, no_node} ->
logger:warning("interface ~p provide zk node unexist", [Interface]),
check_and_create_path(Pid, <<"">>, [{<<"dubbo">>, p}, {Interface, p}, {<<"providers">>, p}]),
get_provider_and_start(Pid, Interface, Path);
{error, R1} ->
logger:debug("[add_consumer] get_provider_list error ~p", [R1]),
[]
end.
provider_watcher(Interface) ->
receive
{node_children_changed, Path} ->
gen_server:cast(?SERVER, {provider_node_change, Interface, Path}),
logger:debug("provider_watcher get event ~p ~p", [node_children_changed, Path]);
{Event, Path} ->
logger:debug("provider_watcher get event ~p ~p", [Event, Path])
end,
ok.
notify_provider_change(Fun, Interface, []) ->
UrlInfo = #dubbo_url{
scheme = <<"empty">>,
host = <<"127.0.0.1">>,
path = Interface,
port = 80,
parameters = #{
<<"interface">> => Interface
}
},
UrlInfoBin = dubbo_common_fun:url_to_binary(UrlInfo),
logger:debug("notify provider change fun ~p", [Fun]),
Fun(Interface, [UrlInfoBin]),
ok;
notify_provider_change(Fun, Interface, List) ->
List2 = [http_uri:decode(Item) || Item <- List],
logger:debug("notify provider change fun ~p", [Fun]),
Fun(Interface, List2),
ok.
del_path(Pid, Path) ->
case erlzk:delete(Pid, Path) of
ok ->
ok;
{error, Reason} ->
logger:warning("zookeeper registry del path error ~p path ~p", [Reason, Path]),
{error, Reason}
end.
create_path(Pid, Path, CreateType) ->
case erlzk:create(Pid, Path, CreateType) of
{ok, ActualPath} ->
logger:debug("create zk path success ~p", [ActualPath]),
ok;
{error, R1} ->
logger:debug("create zk path error ~p ~p", [Path, R1])
end,
ok.
check_and_create_path(_Pid, RootPath, []) ->
RootPath;
check_and_create_path(Pid, RootPath, [{Item, CreateType} | Rst]) ->
CheckPath = <<RootPath/binary, <<"/">>/binary, Item/binary>>,
case erlzk:exists(Pid, CheckPath) of
{ok, Stat} ->
check_and_create_path(Pid, CheckPath, Rst);
{error, no_node} ->
logger:debug("check_and_create_path unexist no_node ~p", [CheckPath]),
create_path(Pid, CheckPath, CreateType),
check_and_create_path(Pid, CheckPath, Rst);
{error, R1} ->
logger:debug("check_and_create_path unexist ~p", [R1]),
check_and_create_path(Pid, CheckPath, Rst)
end.