| %%------------------------------------------------------------------------------ |
| %% Licensed to the Apache Software Foundation (ASF) under one or more |
| %% contributor license agreements. See the NOTICE file distributed with |
| %% this work for additional information regarding copyright ownership. |
| %% The ASF licenses this file to You under the Apache License, Version 2.0 |
| %% (the "License"); you may not use this file except in compliance with |
| %% the License. You may obtain a copy of the License at |
| %% |
| %% http://www.apache.org/licenses/LICENSE-2.0 |
| %% |
| %% Unless required by applicable law or agreed to in writing, software |
| %% distributed under the License is distributed on an "AS IS" BASIS, |
| %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| %% See the License for the specific language governing permissions and |
| %% limitations under the License. |
| %%------------------------------------------------------------------------------ |
-module(dubbo_zookeeper).
-behaviour(gen_server).

-include("dubbo.hrl").

%% API
-export([
    start_link/0,
    register_consumer/1,
    register_consumer/2,
    gen_consumer_node_info/1,
    register_provider/1,
    provider_watcher/1
]).

%% gen_server callbacks
-export([
    init/1,
    handle_call/3,
    handle_cast/2,
    handle_info/2,
    terminate/2,
    code_change/3
]).

%% Name under which the server process is registered locally.
-define(SERVER, ?MODULE).

%% Server state. zk_pid holds the pid returned by erlzk:connect/3.
-record(state, {zk_pid}).
| |
| %%%=================================================================== |
| %%% API |
| %%%=================================================================== |
| |
%%--------------------------------------------------------------------
%% @doc
%% Starts the server as a linked gen_server process registered
%% locally under ?SERVER, with an empty argument list for init/1.
%% @end
%%--------------------------------------------------------------------
-spec start_link() -> {ok, Pid :: pid()} | ignore | {error, Reason :: term()}.
start_link() ->
    ServerName = {local, ?SERVER},
    gen_server:start_link(ServerName, ?MODULE, [], []).
| |
| %%%=================================================================== |
| %%% gen_server callbacks |
| %%%=================================================================== |
| |
%%--------------------------------------------------------------------
%% @private
%% @doc
%% Initializes the server: opens the ZooKeeper connection through
%% connection/0 and keeps the resulting pid in the state record.
%% Any result other than {ok, Pid} from connection/0 fails the match
%% and therefore fails the start.
%% @end
%%--------------------------------------------------------------------
-spec init(Args :: term()) ->
    {ok, State :: #state{}}
    | {ok, State :: #state{}, timeout() | hibernate}
    | {stop, Reason :: term()}
    | ignore.
init([]) ->
    {ok, ZkPid} = connection(),
    InitialState = #state{zk_pid = ZkPid},
    {ok, InitialState}.
| |
%%--------------------------------------------------------------------
%% @private
%% @doc
%% Handles synchronous requests.
%%
%% {add_consumer, Consumer} registers the consumer and looks up its
%% providers; {add_provider, Provider} registers the provider node.
%% Every request, including unrecognised ones, is answered with `ok'
%% and leaves the state untouched.
%% @end
%%--------------------------------------------------------------------
-spec handle_call(Request :: term(), From :: {pid(), Tag :: term()}, State :: #state{}) ->
    {reply, Reply :: term(), NewState :: #state{}}
    | {reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate}
    | {noreply, NewState :: #state{}}
    | {noreply, NewState :: #state{}, timeout() | hibernate}
    | {stop, Reason :: term(), Reply :: term(), NewState :: #state{}}
    | {stop, Reason :: term(), NewState :: #state{}}.
handle_call(Request, _From, State) ->
    handle_request(Request, State),
    {reply, ok, State}.

%% Runs the side effect belonging to a call request; requests that are
%% not recognised do nothing.
handle_request({add_consumer, Consumer}, State) ->
    add_consumer(Consumer, State);
handle_request({add_provider, Provider}, State) ->
    register_provider_path(Provider, State);
handle_request(_Request, _State) ->
    ok.
| |
%%--------------------------------------------------------------------
%% @private
%% @doc
%% Handles asynchronous requests.
%%
%% {provider_node_change, Interface, Path} (sent by provider_watcher/1)
%% re-reads the children of Path and re-installs a watcher. Any other
%% cast is dropped.
%% @end
%%--------------------------------------------------------------------
-spec handle_cast(Request :: term(), State :: #state{}) ->
    {noreply, NewState :: #state{}}
    | {noreply, NewState :: #state{}, timeout() | hibernate}
    | {stop, Reason :: term(), NewState :: #state{}}.
handle_cast({provider_node_change, Interface, Path}, #state{} = State) ->
    ZkPid = State#state.zk_pid,
    get_provider_and_start(ZkPid, Interface, Path),
    {noreply, State};
handle_cast(_Request, State) ->
    {noreply, State}.
| |
%%--------------------------------------------------------------------
%% @private
%% @doc
%% Handles every message that is neither a call nor a cast by logging
%% it at info level; the state is returned unchanged.
%% NOTE(review): presumably this is where the notifications requested
%% via {monitor, self()} in connection/0 arrive - confirm against erlzk.
%% @end
%%--------------------------------------------------------------------
-spec handle_info(Info :: timeout() | term(), State :: #state{}) ->
    {noreply, NewState :: #state{}}
    | {noreply, NewState :: #state{}, timeout() | hibernate}
    | {stop, Reason :: term(), NewState :: #state{}}.
handle_info(Message, State) ->
    logger:info("zk server recv msg:~p", [Message]),
    {noreply, State}.
| |
%%--------------------------------------------------------------------
%% @private
%% @doc
%% Called by gen_server when the server is about to terminate.
%%
%% Undoes init/1 by closing the ZooKeeper connection stored in the
%% state, so that the connection process does not outlive this server.
%% A failure to close (for example because the connection process is
%% already gone) is logged and otherwise ignored, since the server is
%% terminating anyway. The return value is ignored by gen_server.
%% @end
%%--------------------------------------------------------------------
-spec terminate(
    Reason :: (normal | shutdown | {shutdown, term()} | term()),
    State :: #state{}
) -> term().
terminate(_Reason, #state{zk_pid = ZkPid}) when is_pid(ZkPid) ->
    try
        erlzk:close(ZkPid)
    catch
        %% erlzk:close/1 talks to the connection process; if that process
        %% is already dead the call exits instead of returning.
        exit:CloseReason ->
            logger:debug("zk server close connection failed ~p", [CloseReason])
    end,
    ok;
terminate(_Reason, _State) ->
    ok.
| |
%%--------------------------------------------------------------------
%% @private
%% @doc
%% Converts the process state on a code upgrade or downgrade.
%% No conversion is needed: the current state is handed back as is.
%% @end
%%--------------------------------------------------------------------
-spec code_change(OldVsn :: term() | {down, term()}, State :: #state{}, Extra :: term()) ->
    {ok, NewState :: #state{}} | {error, Reason :: term()}.
code_change(_OldVsn, CurrentState, _Extra) ->
    {ok, CurrentState}.
| |
| |
%% @doc Registers a consumer with the registry server.
%% Sends a synchronous {add_consumer, Consumer} request to ?SERVER and
%% returns `ok' once the server has replied.
register_consumer(Consumer) ->
    Request = {add_consumer, Consumer},
    gen_server:call(?SERVER, Request),
    ok.
%% @doc Registers a consumer for the interface `Name'.
%% Builds a #consumer_config{} for the interface and delegates to
%% register_consumer/1. Always returns `ok'.
%%
%% The second argument is accepted for interface compatibility but is
%% not used.
%% NOTE(review): the method list is hard-coded to testa/testb, which
%% looks like placeholder data - confirm what callers expect here.
register_consumer(Name, _Option) ->
    Consumer = #consumer_config{interface = Name, methods = [<<"testa">>, <<"testb">>]},
    register_consumer(Consumer),
    ok.
%% @doc Registers a provider with the registry server.
%% Sends a synchronous {add_provider, Provider} request to ?SERVER and
%% returns `ok' once the server has replied.
register_provider(Provider) ->
    Request = {add_provider, Provider},
    gen_server:call(?SERVER, Request),
    ok.
| |
| %%%=================================================================== |
| %%% Internal functions |
| %%%=================================================================== |
| |
%% Opens the ZooKeeper connection.
%% Reads the server list from the `zookeeper_list' key of the `dubboerl'
%% application environment and connects with a timeout argument of 30000,
%% chroot "/" and the calling process as monitor. Returns {ok, Pid};
%% a missing environment key or a failed connect crashes with badmatch.
connection() ->
    {ok, ServerList} = application:get_env(dubboerl, zookeeper_list),
    Options = [{chroot, "/"}, {monitor, self()}],
    {ok, _ZkPid} = erlzk:connect(ServerList, 30000, Options).
| |
%% Registers a consumer node in ZooKeeper and then looks up its providers.
%% The node name is the URI-escaped consumer URL produced by
%% gen_consumer_node_info/1; it is created below
%% /dubbo/<interface>/consumers, with the parent segments tagged `p'
%% and the consumer node itself tagged `e'. Always returns `ok'.
add_consumer(Consumer, State) ->
    ZkPid = State#state.zk_pid,
    NodeUrl = binary_to_list(gen_consumer_node_info(Consumer)),
    EscapedNode = list_to_binary(edoc_lib:escape_uri(NodeUrl)),
    Segments = [
        {<<"dubbo">>, p},
        {Consumer#consumer_config.interface, p},
        {<<"consumers">>, p},
        {EscapedNode, e}
    ],
    check_and_create_path(ZkPid, <<"">>, Segments),
    get_provider_list(Consumer, State),
    ok.
%% Registers a provider node in ZooKeeper.
%% The node name comes from dubbo_node_config_util:gen_provider_info/1
%% and is created below /dubbo/<interface>/providers, with the parent
%% segments tagged `p' and the provider node itself tagged `e'.
%% Always returns `ok'.
register_provider_path(Provider, State) ->
    #state{zk_pid = ZkPid} = State,
    NodeName = dubbo_node_config_util:gen_provider_info(Provider),
    Segments = [
        {<<"dubbo">>, p},
        {Provider#provider_config.interface, p},
        {<<"providers">>, p},
        {NodeName, e}
    ],
    check_and_create_path(ZkPid, <<"">>, Segments),
    ok.
| |
| |
%% Looks up the providers registered for the consumer's interface.
%% Builds the path /dubbo/<interface>/providers and hands it to
%% get_provider_and_start/3. Always returns `ok'.
get_provider_list(Consumer, State) ->
    Interface = Consumer#consumer_config.interface,
    ProvidersPath = <<"/dubbo/", Interface/binary, "/providers">>,
    get_provider_and_start(State#state.zk_pid, Interface, ProvidersPath),
    ok.
%% Reads the children of Path and starts consumer processes for them.
%%
%% A fresh provider_watcher/1 process is spawned and passed to
%% erlzk:get_children/3 as the watcher for Path. On success the child
%% list is handed to start_provider_process/2. On error the failure is
%% logged together with the path. Always returns `ok'.
get_provider_and_start(Pid, Interface, Path) ->
    Watcher = spawn(?MODULE, provider_watcher, [Interface]),
    case erlzk:get_children(Pid, Path, Watcher) of
        {ok, ChildList} ->
            logger:debug("get provider list ~p", [ChildList]),
            start_provider_process(Interface, ChildList),
            ok;
        {error, Reason} ->
            %% The watcher blocks in a receive without timeout; after a
            %% failed lookup it is presumably never notified, so stop it
            %% rather than leave it waiting forever.
            exit(Watcher, kill),
            %% Two ~p directives, so two arguments: the path and the reason.
            logger:debug("[add_consumer] get_provider_list error ~p ~p", [Path, Reason]),
            ok
    end.
| |
%% @doc Body of the one-shot watcher process spawned by
%% get_provider_and_start/3.
%% Waits (without timeout) for a single {Event, Path} message. A
%% `node_children_changed' event is forwarded to ?SERVER as a
%% {provider_node_change, Interface, Path} cast; every event is logged.
%% Returns `ok', which ends the watcher process.
provider_watcher(Interface) ->
    receive
        {Event, Path} ->
            notify_provider_change(Event, Interface, Path),
            logger:debug("provider_watcher get event ~p ~p", [Event, Path])
    end,
    ok.

%% Forwards a children-changed event to the server; other events are ignored.
notify_provider_change(node_children_changed, Interface, Path) ->
    gen_server:cast(?SERVER, {provider_node_change, Interface, Path});
notify_provider_change(_Event, _Interface, _Path) ->
    ok.
| |
| |
%% Creates a single ZooKeeper node via erlzk:create/3 and logs the outcome.
%% Errors are only logged, not propagated: the function returns `ok'
%% whether the node was created or not.
create_path(Pid, Path, CreateType) ->
    Result = erlzk:create(Pid, Path, CreateType),
    case Result of
        {error, Reason} ->
            logger:debug("[add_consumer] create zk path error ~p ~p", [Path, Reason]);
        {ok, CreatedPath} ->
            logger:debug("[add_consumer] create zk path success ~p", [CreatedPath])
    end,
    ok.
%% Walks a list of {Segment, CreateType} pairs, extending RootPath by
%% one "/Segment" per step and making sure each resulting node exists.
%%
%% For every step the node is checked with erlzk:exists/2:
%%   - it exists: nothing to do;
%%   - {error, no_node}: it is created through create_path/3 with the
%%     segment's CreateType;
%%   - any other error: it is logged.
%% In all three cases the walk continues with the remaining segments,
%% so the function is best-effort and returns `ok' when the list is
%% exhausted.
check_and_create_path(_Pid, _RootPath, []) ->
    ok;
check_and_create_path(Pid, RootPath, [{Item, CreateType} | Rest]) ->
    CheckPath = <<RootPath/binary, "/", Item/binary>>,
    case erlzk:exists(Pid, CheckPath) of
        {ok, _Stat} ->
            ok;
        {error, no_node} ->
            logger:debug("[add_consumer] check_and_create_path unexist no_node ~p", [CheckPath]),
            create_path(Pid, CheckPath, CreateType);
        {error, Reason} ->
            logger:debug("[add_consumer] check_and_create_path unexist ~p", [Reason])
    end,
    %% Every branch continues with the next segment, so recurse once here.
    check_and_create_path(Pid, CheckPath, Rest).
| |
%% @doc Builds the consumer URL that is registered as the consumer's
%% ZooKeeper node name.
%%
%% The URL has the form
%%   consumer://<local ip>/<interface>?application=..&category=..&...
%% and is assembled from the fields of the #consumer_config{} record,
%% the local IPv4 address string and the current timestamp in
%% milliseconds. The method names are joined with ",". The result is
%% returned as a binary (not URI-escaped).
gen_consumer_node_info(Consumer) ->
    %% What is the purpose of the `revision' parameter? Not added for now.
    Methods = dubbo_lists_util:join(Consumer#consumer_config.methods, <<",">>),
    %% The last query parameter must be introduced by "&timestamp=";
    %% it had been corrupted into a multiplication sign followed by "tamp=".
    Value = io_lib:format(
        <<"consumer://~s/~s?application=~s&category=~s&check=~p&default.timeout=~p&dubbo=~s&interface=~s&methods=~s&side=~s&timestamp=~p">>,
        [
            dubbo_common_fun:local_ip_v4_str(),
            Consumer#consumer_config.interface,
            Consumer#consumer_config.application,
            Consumer#consumer_config.category,
            Consumer#consumer_config.check,
            Consumer#consumer_config.default_timeout,
            Consumer#consumer_config.dubbo_version,
            Consumer#consumer_config.interface,
            Methods,
            Consumer#consumer_config.side,
            dubbo_time_util:timestamp_ms()
        ]
    ),
    list_to_binary(Value).
| |
%% Hands the provider nodes found for Interface to
%% dubbo_consumer_pool:start_consumer/2 and returns its result.
%%
%% Manual entry point for trying the whole flow from a shell:
%%   dubbo_zookeeper:register_consumer(<<"com.ifcoder.abcd">>, []).
start_provider_process(Interface, ProviderNodes) ->
    dubbo_consumer_pool:start_consumer(Interface, ProviderNodes).
| |