mod defined for plugin design
diff --git a/config_example/sys.config b/config_example/sys.config
index b100088..0ed8a87 100644
--- a/config_example/sys.config
+++ b/config_example/sys.config
@@ -23,7 +23,7 @@
{protocol,hessian},
{port,20881},
{consumer,[
- {<<"org.apache.dubbo.erlang.sample.service.facade.UserOperator">>,[]}
+ % {<<"org.apache.dubbo.erlang.sample.service.facade.UserOperator">>,[]}
]},
{provider,[
{dubbo_service_user_impl,userOperator,<<"org.apache.dubbo.erlang.sample.service.facade.UserOperator">>,[]}
diff --git a/include/dubboerl.hrl b/include/dubboerl.hrl
index 31204a5..ce6baf1 100644
--- a/include/dubboerl.hrl
+++ b/include/dubboerl.hrl
@@ -18,4 +18,7 @@
-define(PROVIDER_WORKER,provider_worker).
--define(TRAFFIC_CONTROL,traffic_control).
\ No newline at end of file
+-define(TRAFFIC_CONTROL,traffic_control).
+
+
+-record(dubbo_url,{scheme,user_info,host,port,path,parameters,fragment}).
\ No newline at end of file
diff --git a/rebar.config b/rebar.config
index aebfaa3..2eb91f2 100644
--- a/rebar.config
+++ b/rebar.config
@@ -21,9 +21,10 @@
{deps, [
{erlzk, ".*", {git, "https://github.com/huaban/erlzk.git", {tag, "v0.6.2"}}},
- {ranch, ".*", {git, "https://github.com/ninenines/ranch.git", {tag, "1.4.0"}}},
+ {ranch, ".*", {git, "https://github.com/ninenines/ranch.git", {tag, "1.4.0"}}},
{poolboy, ".*", {git, "https://github.com/devinus/poolboy.git", {tag, "1.5.1"}}},
- {jiffy, "0.15.1"}
+ {jiffy, "0.15.1"},
+ {hooks,{git,"https://github.com/benoitc/hooks.git",{tag,"2.1.0"}}}
]}.
diff --git a/rebar.lock b/rebar.lock
index f57b258..cc2515b 100644
--- a/rebar.lock
+++ b/rebar.lock
@@ -3,6 +3,10 @@
{git,"https://github.com/huaban/erlzk.git",
{ref,"aa7190ee2343ac1341cea3edc9b9eea36c591708"}},
0},
+ {<<"hooks">>,
+ {git,"https://github.com/benoitc/hooks.git",
+ {ref,"d4872554a27c0ee9c2166d18000f725f8c3dc8a8"}},
+ 0},
{<<"jiffy">>,{pkg,<<"jiffy">>,<<"0.15.1">>},0},
{<<"poolboy">>,
{git,"https://github.com/devinus/poolboy.git",
diff --git a/samples/dubbo-sample-service/src/main/resources/applicationProvider.xml b/samples/dubbo-sample-service/src/main/resources/applicationProvider.xml
index 974fabc..425e41e 100644
--- a/samples/dubbo-sample-service/src/main/resources/applicationProvider.xml
+++ b/samples/dubbo-sample-service/src/main/resources/applicationProvider.xml
@@ -10,8 +10,8 @@
<dubbo:consumer check="false" timeout="300000" id="dubboConsumerConfig" retries="0"/>
- <bean id="userService" class="org.apache.dubbo.erlang.sample.service.impl.UserOperatorImpl"/>
- <dubbo:service interface="org.apache.dubbo.erlang.sample.service.facade.UserOperator" ref="userService"/>
+<!-- <bean id="userService" class="org.apache.dubbo.erlang.sample.service.impl.UserOperatorImpl"/>-->
+<!-- <dubbo:service interface="org.apache.dubbo.erlang.sample.service.facade.UserOperator" ref="userService"/>-->
- <!-- <dubbo:reference id="userInterface" interface="UserOperator" retries="0" />-->
+ <dubbo:reference id="userInterface" interface="org.apache.dubbo.erlang.sample.service.facade.UserOperator" retries="0" />
</beans>
diff --git a/samples/dubboerl_demo/apps/dubbo_sample_service/src/userOperator.erl b/samples/dubboerl_demo/apps/dubbo_sample_service/src/userOperator.erl
index b92ca62..d7e0809 100644
--- a/samples/dubboerl_demo/apps/dubbo_sample_service/src/userOperator.erl
+++ b/samples/dubboerl_demo/apps/dubbo_sample_service/src/userOperator.erl
@@ -67,7 +67,7 @@
]
},
Request = dubbo_adapter:reference(Data),
- dubbo_invoker:invoke_request(?CURRENT_CLASS_NAME,Request,RequestOption).
+ dubbo_invoker_old:invoke_request(?CURRENT_CLASS_NAME,Request,RequestOption).
-spec genUserId()->
@@ -96,7 +96,7 @@
]
},
Request = dubbo_adapter:reference(Data),
- dubbo_invoker:invoke_request(?CURRENT_CLASS_NAME,Request,RequestOption).
+ dubbo_invoker_old:invoke_request(?CURRENT_CLASS_NAME,Request,RequestOption).
-spec queryUserInfo(Arg0::#userInfoRequest{})->
@@ -127,7 +127,7 @@
]
},
Request = dubbo_adapter:reference(Data),
- dubbo_invoker:invoke_request(?CURRENT_CLASS_NAME,Request,RequestOption).
+ dubbo_invoker_old:invoke_request(?CURRENT_CLASS_NAME,Request,RequestOption).
-spec queryUserList(Arg0::list())->
@@ -158,5 +158,5 @@
]
},
Request = dubbo_adapter:reference(Data),
- dubbo_invoker:invoke_request(?CURRENT_CLASS_NAME,Request,RequestOption).
+ dubbo_invoker_old:invoke_request(?CURRENT_CLASS_NAME,Request,RequestOption).
diff --git a/src/dubbo_cluster.erl b/src/dubbo_cluster.erl
new file mode 100644
index 0000000..da4031e
--- /dev/null
+++ b/src/dubbo_cluster.erl
@@ -0,0 +1,21 @@
+%%------------------------------------------------------------------------------
+%% Licensed to the Apache Software Foundation (ASF) under one or more
+%% contributor license agreements. See the NOTICE file distributed with
+%% this work for additional information regarding copyright ownership.
+%% The ASF licenses this file to You under the Apache License, Version 2.0
+%% (the "License"); you may not use this file except in compliance with
+%% the License. You may obtain a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%------------------------------------------------------------------------------
+-module(dubbo_cluster).
+-author("dlive").
+
+%% API
+-export([]).
diff --git a/src/dubbo_cluster_failfast.erl b/src/dubbo_cluster_failfast.erl
new file mode 100644
index 0000000..6078517
--- /dev/null
+++ b/src/dubbo_cluster_failfast.erl
@@ -0,0 +1,21 @@
+%%------------------------------------------------------------------------------
+%% Licensed to the Apache Software Foundation (ASF) under one or more
+%% contributor license agreements. See the NOTICE file distributed with
+%% this work for additional information regarding copyright ownership.
+%% The ASF licenses this file to You under the Apache License, Version 2.0
+%% (the "License"); you may not use this file except in compliance with
+%% the License. You may obtain a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%------------------------------------------------------------------------------
+-module(dubbo_cluster_failfast).
+-author("dlive").
+
+%% API
+-export([]).
diff --git a/src/dubbo_common_fun.erl b/src/dubbo_common_fun.erl
index 6744171..e74f57f 100644
--- a/src/dubbo_common_fun.erl
+++ b/src/dubbo_common_fun.erl
@@ -16,8 +16,9 @@
%%------------------------------------------------------------------------------
-module(dubbo_common_fun).
+-include("dubboerl.hrl").
%% API
--export([local_ip_v4/0, local_ip_v4_str/0]).
+-export([local_ip_v4/0, local_ip_v4_str/0, parse_url/1, map_to_url/1]).
local_ip_v4() ->
{ok, Addrs} = inet:getifaddrs(),
@@ -29,3 +30,60 @@
local_ip_v4_str() ->
{V1, V2, V3, V4} = local_ip_v4(),
list_to_binary(io_lib:format("~p.~p.~p.~p", [V1, V2, V3, V4])).
+
+
+-spec(parse_url(Url :: binary()|list()) -> {ok, map()}).
+parse_url(Url) when is_binary(Url) ->
+ parse_url(binary_to_list(Url));
+parse_url(Url) ->
+ case http_uri:parse(Url, []) of
+ {ok, {Scheme, _UserInfo, Host, Port, _Path, Query}} ->
+ QueryStr = case lists:prefix("?", Query) of
+ true ->
+ [_ | Query2] = Query,
+ Query2;
+ false ->
+ Query
+ end,
+ QueryListTmp = string:tokens(QueryStr, "&"),
+ Parameters = parse_url_parameter(QueryListTmp, #{}),
+ Result = #dubbo_url{scheme = Scheme, host = Host, port = Port, parameters = Parameters},
+ {ok, Result};
+ {error, R1} ->
+ {error, R1}
+ end.
+
+
+parse_url_parameter([], Parameters) ->
+ Parameters;
+parse_url_parameter([Item | Rest], Parameters) ->
+ case string:tokens(Item, "=") of
+ KeyPair when length(KeyPair) == 2 ->
+ [Key, Value] = KeyPair,
+ parse_url_parameter(Rest, maps:put(Key, Value, Parameters));
+ KeyPair2 ->
+ logger:error("parse parameter error, keypair ~p", [KeyPair2]),
+ parse_url_parameter(Rest, Parameters)
+ end.
+
+
+map_to_url(UrlInfo) ->
+ ParameterStr =
+ case UrlInfo#dubbo_url.parameters of
+ undefined ->
+ "";
+ Parameter ->
+ KeyValues = maps:to_list(Parameter),
+ KeyValues2 = [io_lib:format("~s=~s", [Key, http_uri:encode(Value)]) || {Key, Value} <= KeyValues],
+ ParameterStr1 = string:join(KeyValues2, "&"),
+ ParameterStr2 = ["?" | ParameterStr1],
+ list_to_binary(ParameterStr2)
+ end,
+ Value = io_lib:format(<<"~s://~s/~s?~s">>,
+ [
+ UrlInfo#dubbo_url.scheme,
+ UrlInfo#dubbo_url.host,
+ UrlInfo#dubbo_url.path,
+ ParameterStr
+ ]),
+ list_to_binary(Value).
\ No newline at end of file
diff --git a/src/dubbo_directory.erl b/src/dubbo_directory.erl
new file mode 100644
index 0000000..2356e7c
--- /dev/null
+++ b/src/dubbo_directory.erl
@@ -0,0 +1,173 @@
+%%------------------------------------------------------------------------------
+%% Licensed to the Apache Software Foundation (ASF) under one or more
+%% contributor license agreements. See the NOTICE file distributed with
+%% this work for additional information regarding copyright ownership.
+%% The ASF licenses this file to You under the Apache License, Version 2.0
+%% (the "License"); you may not use this file except in compliance with
+%% the License. You may obtain a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%------------------------------------------------------------------------------
+-module(dubbo_directory).
+
+-behaviour(gen_server).
+
+-export([subscribe/2,notify/1]).
+%% API
+-export([start_link/0]).
+
+%% gen_server callbacks
+-export([init/1,
+ handle_call/3,
+ handle_cast/2,
+ handle_info/2,
+ terminate/2,
+ code_change/3]).
+
+-define(SERVER, ?MODULE).
+
+-record(state, {}).
+
+%%%===================================================================
+%%% API
+%%%===================================================================
+
+%%--------------------------------------------------------------------
+%% @doc
+%% Starts the server
+%%
+%% @end
+%%--------------------------------------------------------------------
+-spec(start_link() ->
+ {ok, Pid :: pid()} | ignore | {error, Reason :: term()}).
+start_link() ->
+ gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
+
+%%%===================================================================
+%%% gen_server callbacks
+%%%===================================================================
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% Initializes the server
+%%
+%% @spec init(Args) -> {ok, State} |
+%% {ok, State, Timeout} |
+%% ignore |
+%% {stop, Reason}
+%% @end
+%%--------------------------------------------------------------------
+-spec(init(Args :: term()) ->
+ {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} |
+ {stop, Reason :: term()} | ignore).
+init([]) ->
+ {ok, #state{}}.
+
+subscribe(RegistryName,SubcribeUrl)->
+ try gen_server:call(?SERVER,{subscribe,RegistryName,SubcribeUrl},5000) of
+ ok->
+ ok
+ catch
+ Error:Reason->
+ %% todo improve erro type
+ {error,Reason}
+ end.
+
+notify(UrlList)->
+ dubbo_consumer_pool:start_consumer(Interface, UrlList),
+ ok.
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% Handling call messages
+%%
+%% @end
+%%--------------------------------------------------------------------
+-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()},
+ State :: #state{}) ->
+ {reply, Reply :: term(), NewState :: #state{}} |
+ {reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} |
+ {noreply, NewState :: #state{}} |
+ {noreply, NewState :: #state{}, timeout() | hibernate} |
+ {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
+ {stop, Reason :: term(), NewState :: #state{}}).
+handle_call({subscribe,RegistryName,SubcribeUrl}, _From, State) ->
+ NotifyFun= fun dubbo_directory:notify/1,
+ apply(RegistryName,subscribe,[SubcribeUrl,NotifyFun]),
+ {reply, ok, State};
+handle_call(_Request, _From, State) ->
+ {reply, ok, State}.
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% Handling cast messages
+%%
+%% @end
+%%--------------------------------------------------------------------
+-spec(handle_cast(Request :: term(), State :: #state{}) ->
+ {noreply, NewState :: #state{}} |
+ {noreply, NewState :: #state{}, timeout() | hibernate} |
+ {stop, Reason :: term(), NewState :: #state{}}).
+handle_cast(_Request, State) ->
+ {noreply, State}.
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% Handling all non call/cast messages
+%%
+%% @spec handle_info(Info, State) -> {noreply, State} |
+%% {noreply, State, Timeout} |
+%% {stop, Reason, State}
+%% @end
+%%--------------------------------------------------------------------
+-spec(handle_info(Info :: timeout() | term(), State :: #state{}) ->
+ {noreply, NewState :: #state{}} |
+ {noreply, NewState :: #state{}, timeout() | hibernate} |
+ {stop, Reason :: term(), NewState :: #state{}}).
+handle_info(_Info, State) ->
+ {noreply, State}.
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% This function is called by a gen_server when it is about to
+%% terminate. It should be the opposite of Module:init/1 and do any
+%% necessary cleaning up. When it returns, the gen_server terminates
+%% with Reason. The return value is ignored.
+%%
+%% @spec terminate(Reason, State) -> void()
+%% @end
+%%--------------------------------------------------------------------
+-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()),
+ State :: #state{}) -> term()).
+terminate(_Reason, _State) ->
+ ok.
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% Convert process state when code is changed
+%%
+%% @spec code_change(OldVsn, State, Extra) -> {ok, NewState}
+%% @end
+%%--------------------------------------------------------------------
+-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{},
+ Extra :: term()) ->
+ {ok, NewState :: #state{}} | {error, Reason :: term()}).
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+%%%===================================================================
+%%% Internal functions
+%%%===================================================================
+
diff --git a/src/dubbo_extension.erl b/src/dubbo_extension.erl
new file mode 100644
index 0000000..4e5eb0c
--- /dev/null
+++ b/src/dubbo_extension.erl
@@ -0,0 +1,189 @@
+%%------------------------------------------------------------------------------
+%% Licensed to the Apache Software Foundation (ASF) under one or more
+%% contributor license agreements. See the NOTICE file distributed with
+%% this work for additional information regarding copyright ownership.
+%% The ASF licenses this file to You under the Apache License, Version 2.0
+%% (the "License"); you may not use this file except in compliance with
+%% the License. You may obtain a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%------------------------------------------------------------------------------
+-module(dubbo_extension).
+-behaviour(gen_server).
+
+%% API
+-export([run/3,run_fold/4,register/3,unregister/3]).
+
+
+-export([start_link/0]).
+
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
+
+
+-define(TAB, ?MODULE).
+
+-record(state, {}).
+
+
+-spec reg(HookName::hookname(), Module::atom(),Priority::integer()) -> ok | {error, term()}.
+register(HookName, Module,Priority) ->
+ gen_server:call(?MODULE, {register, HookName, {Priority, {Module}}}).
+
+-spec unregister(HookName::hookname(), Module::atom(),Priority::integer()) -> ok.
+unregister(HookName, Module,Priority) ->
+ gen_server:call(?MODULE, {unregister, HookName, {Priority, Module}}).
+
+%% @doc run all hooks registered for the HookName.
+%% Execution can be interrupted if an hook return the atom `stop'.
+-spec run(HookName::hookname(), Args::list()) -> ok.
+run(HookName,Fun, Args) ->
+ case find_hooks(HookName) of
+ no_hook -> ok;
+ Hooks -> run1(Hooks, HookName,Fun, Args)
+ end.
+
+run1([], _HookName,_Fun, _Args) ->
+ ok;
+run1([M | Rest], HookName, Fun, Args) ->
+ Ret = (catch apply(M, Fun, Args)),
+ case Ret of
+ {'EXIT', Reason} ->
+ logger:error("~p~n error running extension: ~p~n", [HookName, Reason]),
+ run1(Rest, HookName,Fun, Args);
+ stop ->
+ ok;
+ _ ->
+ run1(Rest, HookName,Fun, Args)
+ end.
+
+-spec run_fold(HookName::hookname(), Args::list(), Acc::any()) -> Acc2::any().
+run_fold(HookName, Fun, Args, Acc) ->
+ case find_hooks(HookName) of
+ no_hook -> Acc;
+ Hooks -> run_fold1(Hooks,HookName, Fun, Args, Acc)
+ end.
+
+
+run_fold1([], _HookName,_Fun, _Args, Acc) ->
+ Acc;
+run_fold1([M | Rest], HookName,Fun, Args0, Acc) ->
+ Args = Args0 ++ [Acc],
+ Ret = (catch apply(M, Fun, Args)),
+ case Ret of
+ {'EXIT', Reason} ->
+ error_logger:error_msg("~p~n error running hook: ~p~n",
+ [HookName, Reason]),
+ run_fold1(Rest, HookName,Fun,Args0, Acc);
+ stop ->
+ Acc;
+ {stop, NewAcc} ->
+ NewAcc;
+ _ ->
+ run_fold1(Rest, HookName,Fun,Args0, Ret)
+ end.
+
+
+
+
+%% @doc retrieve the lists of registered functions for an hook.
+-spec find(HookName::hookname()) -> {ok, [{atom(), atom()}]} | error.
+find(HookName) ->
+ case ?find_hook(HookName) of
+ no_hook -> error;
+ Hooks -> {ok, Hooks}
+ end.
+
+%% @hidden
+start_link() ->
+ _ = init_tabs(),
+ gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+
+init_tabs() ->
+ case ets:info(?TAB, name) of
+ undefined ->
+ ets:new(?TAB, [ordered_set, public, named_table,
+ {read_concurrency, true},
+ {write_concurrency, true}]);
+ _ ->
+ true
+ end.
+
+%% @hidden
+init([]) ->
+ {ok, #state{}}.
+
+%% @hidden
+handle_call({register, HookName, {Priority, Module}}, _From, State) ->
+ do_register(HookName, {Priority, Module}),
+ {reply, ok, State};
+handle_call({unregister, HookName, {Priority, Module}}, _From, State) ->
+ do_unregister(HookName, {Priority, Module}),
+ {reply, ok, State};
+handle_call(_Msg, _From, State) ->
+ {reply, badarg, State}.
+
+%% @hidden
+handle_cast(_Msg, State) ->
+ {noreply, State}.
+
+handle_info(_Info, State) ->
+ {noreply, State}.
+
+%% @hidden
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+%% @hidden
+terminate(_Reason, _Srv) ->
+ ok.
+
+do_register(HookName, {_Priority, ModuleName}=Hook) ->
+ check_module(ModuleName),
+ update_hooks(HookName, [Hook]).
+
+
+do_unregister(HookName, Hook) ->
+ remove_hooks(HookName, [Hook]),
+ ok.
+
+update_hooks(HookName, HookFuns) ->
+ case ets:lookup(?TAB, HookName) of
+ [] ->
+ true = ets:insert(?TAB, {HookName, HookFuns});
+ [{_, Funs}] ->
+ Funs2 = lists:keysort(1, Funs ++ HookFuns),
+ true = ets:insert(?TAB, {HookName, Funs2})
+ end.
+
+remove_hooks(HookName, HookFuns) ->
+ case ets:lookup(?TAB, HookName) of
+ [] ->
+ ok;
+ [{_, Funs}] ->
+ Funs2 = Funs -- HookFuns,
+ case Funs2 of
+ [] ->
+ ets:delete(?TAB, HookName);
+ _ ->
+ ets:insert(?TAB, {HookName, Funs2})
+ end
+ end.
+
+check_module(ModuleName) ->
+ _ = code:ensure_loaded(ModuleName),
+ ok.
+
+find_hooks(HookName)->
+ case ets:lookup(?TAB,HookName) of
+ []->
+ no_hook;
+ [{_, Modules}]->
+ Modules
+ end.
diff --git a/src/dubbo_filter.erl b/src/dubbo_filter.erl
new file mode 100644
index 0000000..7fa7950
--- /dev/null
+++ b/src/dubbo_filter.erl
@@ -0,0 +1,21 @@
+%%------------------------------------------------------------------------------
+%% Licensed to the Apache Software Foundation (ASF) under one or more
+%% contributor license agreements. See the NOTICE file distributed with
+%% this work for additional information regarding copyright ownership.
+%% The ASF licenses this file to You under the Apache License, Version 2.0
+%% (the "License"); you may not use this file except in compliance with
+%% the License. You may obtain a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%------------------------------------------------------------------------------
+-module(dubbo_filter).
+-author("dlive").
+
+%% API
+-export([]).
diff --git a/src/dubbo_invoker.erl b/src/dubbo_invoker.erl
index b29751c..0a3527b 100644
--- a/src/dubbo_invoker.erl
+++ b/src/dubbo_invoker.erl
@@ -16,82 +16,8 @@
%%------------------------------------------------------------------------------
-module(dubbo_invoker).
--include("dubbo.hrl").
%% API
--export([invoke_request/2, invoke_request/3, invoke_request/5]).
-
--spec invoke_request(Interface :: binary(), Request :: #dubbo_request{}) ->
- {ok, reference()}|
- {ok, reference(), Data :: any(), RpcContent :: list()}|
- {error, Reason :: timeout|no_provider|any()}.
-invoke_request(Interface, Request) ->
- invoke_request(Interface, Request, [], #{}, self()).
-
--spec invoke_request(Interface :: binary(), Request :: #dubbo_request{}, RequestOption :: map()) ->
- {ok, reference()}|
- {ok, reference(), Data :: any(), RpcContent :: list()}|
- {error, Reason :: timeout|no_provider|any()}.
-invoke_request(Interface, Request, RequestOption) ->
- invoke_request(Interface, Request, maps:get(ctx, RequestOption, []), RequestOption, self()).
+-export([]).
--spec invoke_request(Interface :: binary(), Request :: #dubbo_request{}, RpcContext :: list(), RequestState :: map(), CallBackPid :: pid()) ->
- {ok, reference()}|
- {ok, reference(), Data :: any(), RpcContent :: list()}|
- {error, Reason :: timeout|no_provider|request_full|any()}.
-invoke_request(Interface, Request, RpcContext, RequestState, CallBackPid) ->
- case dubbo_consumer_pool:select_connection(Interface, Request#dubbo_request.mid) of
- {ok, #connection_info{pid = Pid, host_flag = HostFlag}} ->
- case dubbo_traffic_control:check_goon(HostFlag, 199) of
- ok ->
- Request2 = merge_attachments(Request, RpcContext),
- {ok, RequestData} = dubbo_codec:encode_request(Request2),
- Ref = get_ref(RequestState),
- gen_server:cast(Pid, {send_request, Ref, Request2, RequestData, CallBackPid, RequestState}),
- case is_sync(RequestState) of
- true ->
- sync_receive(Ref, get_timeout(RequestState));
- false -> {ok, Ref}
- end;
- full ->
- {error, request_full}
- end;
- {error, none} ->
- logger:error("[INVOKE] ~p error Reason no_provider", [Interface]),
- {error, no_provider}
- end.
-
-
-is_sync(Option) ->
- maps:is_key(sync, Option).
-get_ref(Option) ->
- maps:get(ref, Option, make_ref()).
-
-get_timeout(Option) ->
- maps:get(timeout, Option, ?REQUEST_TIME_OUT).
-
-
-sync_receive(Ref, TimeOut) ->
- receive
- {'$gen_cast', {response_process, Ref, RpcContent, Response}} ->
- {ok, Ref, Response, RpcContent}
- after
- TimeOut ->
- {error, timeout}
- end.
-merge_attachments(#dubbo_request{data = null} = Request, _Option) ->
- Request;
-merge_attachments(Request, Option) ->
- Attachements = Request#dubbo_request.data#dubbo_rpc_invocation.attachments,
- case lists:keyfind(attachments, 1, Option) of
- false -> OptionAttachments = [];
- {attachments, OptionAttachments} ->
- OptionAttachments
- end,
- List = [
- {<<"version">>, <<"0.0.0">>},
- {<<"timeout">>, <<"5000">>}
- ],
- Attachements2 = lists:merge3(Attachements, OptionAttachments, List),
- Data2 = Request#dubbo_request.data#dubbo_rpc_invocation{attachments = Attachements2},
- Request#dubbo_request{data = Data2}.
+-callback(invoke(Invoker,Invocation) -> ok).
\ No newline at end of file
diff --git a/src/dubbo_invoker_cluster.erl b/src/dubbo_invoker_cluster.erl
new file mode 100644
index 0000000..906a0fa
--- /dev/null
+++ b/src/dubbo_invoker_cluster.erl
@@ -0,0 +1,21 @@
+%%------------------------------------------------------------------------------
+%% Licensed to the Apache Software Foundation (ASF) under one or more
+%% contributor license agreements. See the NOTICE file distributed with
+%% this work for additional information regarding copyright ownership.
+%% The ASF licenses this file to You under the Apache License, Version 2.0
+%% (the "License"); you may not use this file except in compliance with
+%% the License. You may obtain a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%------------------------------------------------------------------------------
+-module(dubbo_invoker_cluster).
+-author("dlive").
+
+%% API
+-export([]).
diff --git a/src/dubbo_invoker_old.erl b/src/dubbo_invoker_old.erl
new file mode 100644
index 0000000..c878656
--- /dev/null
+++ b/src/dubbo_invoker_old.erl
@@ -0,0 +1,97 @@
+%%------------------------------------------------------------------------------
+%% Licensed to the Apache Software Foundation (ASF) under one or more
+%% contributor license agreements. See the NOTICE file distributed with
+%% this work for additional information regarding copyright ownership.
+%% The ASF licenses this file to You under the Apache License, Version 2.0
+%% (the "License"); you may not use this file except in compliance with
+%% the License. You may obtain a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%------------------------------------------------------------------------------
+-module(dubbo_invoker_old).
+
+-include("dubbo.hrl").
+%% API
+-export([invoke_request/2, invoke_request/3, invoke_request/5]).
+
+-spec invoke_request(Interface :: binary(), Request :: #dubbo_request{}) ->
+ {ok, reference()}|
+ {ok, reference(), Data :: any(), RpcContent :: list()}|
+ {error, Reason :: timeout|no_provider|any()}.
+invoke_request(Interface, Request) ->
+ invoke_request(Interface, Request, [], #{}, self()).
+
+-spec invoke_request(Interface :: binary(), Request :: #dubbo_request{}, RequestOption :: map()) ->
+ {ok, reference()}|
+ {ok, reference(), Data :: any(), RpcContent :: list()}|
+ {error, Reason :: timeout|no_provider|any()}.
+invoke_request(Interface, Request, RequestOption) ->
+ invoke_request(Interface, Request, maps:get(ctx, RequestOption, []), RequestOption, self()).
+
+
+-spec invoke_request(Interface :: binary(), Request :: #dubbo_request{}, RpcContext :: list(), RequestState :: map(), CallBackPid :: pid()) ->
+ {ok, reference()}|
+ {ok, reference(), Data :: any(), RpcContent :: list()}|
+ {error, Reason :: timeout|no_provider|request_full|any()}.
+invoke_request(Interface, Request, RpcContext, RequestState, CallBackPid) ->
+ case dubbo_consumer_pool:select_connection(Interface, Request#dubbo_request.mid) of
+ {ok, #connection_info{pid = Pid, host_flag = HostFlag}} ->
+ case dubbo_traffic_control:check_goon(HostFlag, 199) of
+ ok ->
+ Request2 = merge_attachments(Request, RpcContext),
+ {ok, RequestData} = dubbo_codec:encode_request(Request2),
+ Ref = get_ref(RequestState),
+ gen_server:cast(Pid, {send_request, Ref, Request2, RequestData, CallBackPid, RequestState}),
+ case is_sync(RequestState) of
+ true ->
+ sync_receive(Ref, get_timeout(RequestState));
+ false -> {ok, Ref}
+ end;
+ full ->
+ {error, request_full}
+ end;
+ {error, none} ->
+ logger:error("[INVOKE] ~p error Reason no_provider", [Interface]),
+ {error, no_provider}
+ end.
+
+
+is_sync(Option) ->
+ maps:is_key(sync, Option).
+get_ref(Option) ->
+ maps:get(ref, Option, make_ref()).
+
+get_timeout(Option) ->
+ maps:get(timeout, Option, ?REQUEST_TIME_OUT).
+
+
+sync_receive(Ref, TimeOut) ->
+ receive
+ {'$gen_cast', {response_process, Ref, RpcContent, Response}} ->
+ {ok, Ref, Response, RpcContent}
+ after
+ TimeOut ->
+ {error, timeout}
+ end.
+merge_attachments(#dubbo_request{data = null} = Request, _Option) ->
+ Request;
+merge_attachments(Request, Option) ->
+ Attachements = Request#dubbo_request.data#dubbo_rpc_invocation.attachments,
+ case lists:keyfind(attachments, 1, Option) of
+ false -> OptionAttachments = [];
+ {attachments, OptionAttachments} ->
+ OptionAttachments
+ end,
+ List = [
+ {<<"version">>, <<"0.0.0">>},
+ {<<"timeout">>, <<"5000">>}
+ ],
+ Attachements2 = lists:merge3(Attachements, OptionAttachments, List),
+ Data2 = Request#dubbo_request.data#dubbo_rpc_invocation{attachments = Attachements2},
+ Request#dubbo_request{data = Data2}.
diff --git a/src/dubbo_protocol.erl b/src/dubbo_protocol.erl
new file mode 100644
index 0000000..3c82119
--- /dev/null
+++ b/src/dubbo_protocol.erl
@@ -0,0 +1,26 @@
+%%------------------------------------------------------------------------------
+%% Licensed to the Apache Software Foundation (ASF) under one or more
+%% contributor license agreements. See the NOTICE file distributed with
+%% this work for additional information regarding copyright ownership.
+%% The ASF licenses this file to You under the Apache License, Version 2.0
+%% (the "License"); you may not use this file except in compliance with
+%% the License. You may obtain a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%------------------------------------------------------------------------------
+-module(dubbo_protocol).
+
+-callback refer(InterfaceClassInfo,Url)->ok.
+
+%% API
+-export([refer/2]).
+
+
+refer(InterfaceClassInfo,Url)->
+ dubbo_hooker(protocol_wapper,refer,[InterfaceClassInfo,Url]).
\ No newline at end of file
diff --git a/src/dubbo_protocol_dubbo.erl b/src/dubbo_protocol_dubbo.erl
new file mode 100644
index 0000000..2242939
--- /dev/null
+++ b/src/dubbo_protocol_dubbo.erl
@@ -0,0 +1,21 @@
+%%------------------------------------------------------------------------------
+%% Licensed to the Apache Software Foundation (ASF) under one or more
+%% contributor license agreements. See the NOTICE file distributed with
+%% this work for additional information regarding copyright ownership.
+%% The ASF licenses this file to You under the Apache License, Version 2.0
+%% (the "License"); you may not use this file except in compliance with
+%% the License. You may obtain a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%------------------------------------------------------------------------------
+-module(dubbo_protocol_dubbo).
+-author("dlive").
+
+%% API
+-export([]).
diff --git a/src/dubbo_protocol_registry.erl b/src/dubbo_protocol_registry.erl
new file mode 100644
index 0000000..f1d9bc4
--- /dev/null
+++ b/src/dubbo_protocol_registry.erl
@@ -0,0 +1,53 @@
+%%------------------------------------------------------------------------------
+%% Licensed to the Apache Software Foundation (ASF) under one or more
+%% contributor license agreements. See the NOTICE file distributed with
+%% this work for additional information regarding copyright ownership.
+%% The ASF licenses this file to You under the Apache License, Version 2.0
+%% (the "License"); you may not use this file except in compliance with
+%% the License. You may obtain a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%------------------------------------------------------------------------------
+-module(dubbo_protocol_registry).
+-behaviour(dubbo_protocol).
+
+-include("dubboerl.hrl").
+
+%% API
+-export([]).
+
+refer(InterfaceClassInfo,Url)->
+ {ok,UrlInfo} = dubbo_common_fun:parse_url(Url),
+
+ {ok,RegistryName} = dubbo_registry:setup_register(UrlInfo),
+
+ ConsumerUrl = gen_consumer_url(UrlInfo),
+ %% 通知directory
+ dubbo_registry:register(RegistryName,ConsumerUrl),
+
+ dubbo_directory:subscribe(RegistryName,ConsumerUrl),
+
+ %% return
+ ok.
+
+
+gen_consumer_url(UrlInfo)->
+ Parameters = UrlInfo#dubbo_url.parameters,
+ #{<<"refer">> := Refer} = Parameters,
+ Refer2 = http_uri:decode(Refer),
+ Parameters2 = dubbo_common_fun:parse_url(Refer2,#{}),
+ #{<<"interface">> := Interface} = Parameters2,
+ ConsumerUrlInfo = UrlInfo#dubbo_url{
+ scheme = <<"consumer">>,
+ host = dubbo_common_fun:local_ip_v4_str(),
+ path = Interface,
+ parameters = Parameters2
+ },
+ ConsumerUrl = dubbo_common_fun:map_to_url(ConsumerUrlInfo),
+ ConsumerUrl.
\ No newline at end of file
diff --git a/src/dubbo_provider_consumer_reg_table.erl b/src/dubbo_provider_consumer_reg_table.erl
new file mode 100644
index 0000000..3386cdc
--- /dev/null
+++ b/src/dubbo_provider_consumer_reg_table.erl
@@ -0,0 +1,21 @@
+%%------------------------------------------------------------------------------
+%% Licensed to the Apache Software Foundation (ASF) under one or more
+%% contributor license agreements. See the NOTICE file distributed with
+%% this work for additional information regarding copyright ownership.
+%% The ASF licenses this file to You under the Apache License, Version 2.0
+%% (the "License"); you may not use this file except in compliance with
+%% the License. You may obtain a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%------------------------------------------------------------------------------
+-module(dubbo_provider_consumer_reg_table).
+-author("dlive").
+
+%% API
+-export([]).
diff --git a/src/dubbo_reference_config.erl b/src/dubbo_reference_config.erl
new file mode 100644
index 0000000..f6e8bdd
--- /dev/null
+++ b/src/dubbo_reference_config.erl
@@ -0,0 +1,74 @@
+%%------------------------------------------------------------------------------
+%% Licensed to the Apache Software Foundation (ASF) under one or more
+%% contributor license agreements. See the NOTICE file distributed with
+%% this work for additional information regarding copyright ownership.
+%% The ASF licenses this file to You under the Apache License, Version 2.0
+%% (the "License"); you may not use this file except in compliance with
+%% the License. You may obtain a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%------------------------------------------------------------------------------
+-module(dubbo_reference_config).
+
+-record(dubbo_interface_info,{}).
+
+%% API
+-export([]).
+
+init_reference()->
+ InitConfigMap= #{
+
+ },
+ %% 组装各类需要数据
+ ok.
+
+
+create_proxy(InitConfigMap)->
+
+
+ InterfaceClassInfo = #{},
+ Para = gen_parameter(),
+ Url = gen_registry_url(Para),
+ dubbo_extension:run(protoco_wapper,refer,[InterfaceClassInfo,Url]),
+ ok.
+
+ %%application=hello-world&dubbo=2.0.2&pid=68901&refer=application=hello-world&default.check=false&default.lazy=false&default.retries=0&default.sticky=false&default.timeout=300000&dubbo=2.0.2&interface=org.apache.dubbo.erlang.sample.service.facade.UserOperator&lazy=false&methods=queryUserInfo,queryUserList,genUserId,getUserInfo&pid=68901®ister.ip=127.0.0.1&release=2.7.1&retries=0&side=consumer&sticky=false×tamp=1559727789953®istry=zookeeper&release=2.7.1×tamp=1559727842451
+
+
+gen_registry_url(Para)->
+ %%todo 组装para & url
+
+ Url = "registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=hello-world&dubbo=2.0.2&pid=68901&refer=application%3Dhello-world%26default.check%3Dfalse%26default.lazy%3Dfalse%26default.retries%3D0%26default.sticky%3Dfalse%26default.timeout%3D300000%26dubbo%3D2.0.2%26interface%3Dorg.apache.dubbo.erlang.sample.service.facade.UserOperator%26lazy%3Dfalse%26methods%3DqueryUserInfo%2CqueryUserList%2CgenUserId%2CgetUserInfo%26pid%3D68901%26register.ip%3D127.0.0.1%26release%3D2.7.1%26retries%3D0%26side%3Dconsumer%26sticky%3Dfalse%26timestamp%3D1559727789953®istry=zookeeper&release=2.7.1×tamp=1559727842451",
+ Url.
+gen_parameter()->
+ Para = #{
+ <<"application">> => get_appname(),
+ <<"dubbo">> => <<"2.0.2">>,
+ <<"pid">> => get_pid(),
+ <<"refer">> => get_refinfo(),
+ <<"registry">> => get_registry_type(),
+ <<"release">> => <<"2.7.1">>,
+ <<"timestamp">> => <<"1559727842451">>
+ },
+
+ Para.
+
+get_appname()->
+ %%todo
+ <<"hello-world">>.
+get_pid()->
+ %%todo
+ <<"68901">>.
+get_refinfo()->
+ %%todo
+ <<"application%3Dhello-world%26default.check%3Dfalse%26default.lazy%3Dfalse%26default.retries%3D0%26default.sticky%3Dfalse%26default.timeout%3D300000%26dubbo%3D2.0.2%26interface%3Dorg.apache.dubbo.erlang.sample.service.facade.UserOperator%26lazy%3Dfalse%26methods%3DqueryUserInfo%2CqueryUserList%2CgenUserId%2CgetUserInfo%26pid%3D68901%26register.ip%3D127..0.1%26release%3D2.7.1%26retries%3D0%26side%3Dconsumer%26sticky%3Dfalse%26timestamp%3D1559727789953">>.
+
+get_registry_type()->
+ %%todo
+ <<"zookeeper">>.
\ No newline at end of file
diff --git a/src/dubbo_registry.erl b/src/dubbo_registry.erl
new file mode 100644
index 0000000..89eb64b
--- /dev/null
+++ b/src/dubbo_registry.erl
@@ -0,0 +1,47 @@
+%%------------------------------------------------------------------------------
+%% Licensed to the Apache Software Foundation (ASF) under one or more
+%% contributor license agreements. See the NOTICE file distributed with
+%% this work for additional information regarding copyright ownership.
+%% The ASF licenses this file to You under the Apache License, Version 2.0
+%% (the "License"); you may not use this file except in compliance with
+%% the License. You may obtain a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%------------------------------------------------------------------------------
+-module(dubbo_registry).
+-include("dubboerl.hrl").
+
+-callback start(Url :: binary) -> ok.
+-callback register(Url::binary())-> term().
+-callback subscribe(SubcribeUrl::binary(),NotifyFun::function())->ok.
+
+%% API
+-export([setup_register/1,register/2]).
+
+-spec(setup_register(UrlInfo :: map()) -> {ok, RegistryProcessName :: atom()}|{error, term()}).
+setup_register(UrlInfo) ->
+ RegistryModuleName = get_registry_module(UrlInfo),
+ case whereis(RegistryModuleName) of
+ undefined ->
+ apply(RegistryModuleName, start, [UrlInfo]),
+ {ok, RegistryModuleName};
+ _ ->
+ {ok, RegistryModuleName}
+ end.
+
+register(RegistryName,Url) ->
+ logger:info("call ~p register url ~p",[RegistryName,Url]),
+ Result = apply(RegistryName,register,[Url]),
+ Result.
+
+
+get_registry_module(Info) ->
+ RegistryName = Info#dubbo_url.scheme,
+ FullName = << <<"dubbo_registry_">>, RegistryName/binary>>,
+ binary_to_existing_atom(FullName).
\ No newline at end of file
diff --git a/src/dubbo_registry_zookeeper.erl b/src/dubbo_registry_zookeeper.erl
new file mode 100644
index 0000000..ea9ef62
--- /dev/null
+++ b/src/dubbo_registry_zookeeper.erl
@@ -0,0 +1,314 @@
+%%------------------------------------------------------------------------------
+%% Licensed to the Apache Software Foundation (ASF) under one or more
+%% contributor license agreements. See the NOTICE file distributed with
+%% this work for additional information regarding copyright ownership.
+%% The ASF licenses this file to You under the Apache License, Version 2.0
+%% (the "License"); you may not use this file except in compliance with
+%% the License. You may obtain a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%------------------------------------------------------------------------------
+-module(dubbo_registry_zookeeper).
+-behaviour(gen_server).
+-behaviour(dubbo_registry).
+
+-include("dubbo.hrl").
+-include("dubboerl.hrl").
+%% API
+-export([start_link/0, register_consumer/1, register_consumer/2, gen_consumer_node_info/1, register_provider/1, provider_watcher/1]).
+
+-export([start/1,register/1,subscribe/2]).
+%% gen_server callbacks
+-export([init/1,
+ handle_call/3,
+ handle_cast/2,
+ handle_info/2,
+ terminate/2,
+ code_change/3]).
+
+-define(SERVER, ?MODULE).
+
+-record(state, {zk_pid,notify_fun}).
+
+%%%===================================================================
+%%% API
+%%%===================================================================
+
+%%--------------------------------------------------------------------
+%% @doc
+%% Starts the server
+%%
+%% @end
+%%--------------------------------------------------------------------
+-spec(start_link() ->
+ {ok, Pid :: pid()} | ignore | {error, Reason :: term()}).
+start_link() ->
+ gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
+
+%%%===================================================================
+%%% gen_server callbacks
+%%%===================================================================
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% Initializes the server
+%%
+%% @spec init(Args) -> {ok, State} |
+%% {ok, State, Timeout} |
+%% ignore |
+%% {stop, Reason}
+%% @end
+%%--------------------------------------------------------------------
+-spec(init(Args :: term()) ->
+ {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} |
+ {stop, Reason :: term()} | ignore).
+init([]) ->
+ {ok, Pid} = connection(),
+ {ok, #state{zk_pid = Pid}}.
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% Handling call messages
+%%
+%% @end
+%%--------------------------------------------------------------------
+-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()},
+ State :: #state{}) ->
+ {reply, Reply :: term(), NewState :: #state{}} |
+ {reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} |
+ {noreply, NewState :: #state{}} |
+ {noreply, NewState :: #state{}, timeout() | hibernate} |
+ {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
+ {stop, Reason :: term(), NewState :: #state{}}).
+
+handle_call({add_consumer, Interface,ConsumerUrl}, _From, State) ->
+ add_consumer(Interface,ConsumerUrl, State),
+ {reply, ok, State};
+handle_call({add_provider, Provider}, _From, State) ->
+ register_provider_path(Provider, State),
+ {reply, ok, State};
+handle_call({subscribe_provider,InterfaceName,NotifyFun}, _From, #state{zk_pid = ZkPid} = State) ->
+ NewState=State#state{notify_fun = NotifyFun},
+ get_provider_list(InterfaceName,ZkPid,NotifyFun),
+ {reply, ok, NewState};
+
+handle_call(_Request, _From, State) ->
+ {reply, ok, State}.
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% Handling cast messages
+%%
+%% @end
+%%--------------------------------------------------------------------
+-spec(handle_cast(Request :: term(), State :: #state{}) ->
+ {noreply, NewState :: #state{}} |
+ {noreply, NewState :: #state{}, timeout() | hibernate} |
+ {stop, Reason :: term(), NewState :: #state{}}).
+handle_cast({provider_node_change, Interface, Path}, #state{zk_pid = Pid} = State) ->
+ get_provider_and_start(Pid, Interface, Path),
+ {noreply, State};
+handle_cast(_Request, State) ->
+ {noreply, State}.
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% Handling all non call/cast messages
+%%
+%% @spec handle_info(Info, State) -> {noreply, State} |
+%% {noreply, State, Timeout} |
+%% {stop, Reason, State}
+%% @end
+%%--------------------------------------------------------------------
+-spec(handle_info(Info :: timeout() | term(), State :: #state{}) ->
+ {noreply, NewState :: #state{}} |
+ {noreply, NewState :: #state{}, timeout() | hibernate} |
+ {stop, Reason :: term(), NewState :: #state{}}).
+handle_info(_Info, State) ->
+ logger:info("zk server recv msg:~p", [_Info]),
+ {noreply, State}.
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% This function is called by a gen_server when it is about to
+%% terminate. It should be the opposite of Module:init/1 and do any
+%% necessary cleaning up. When it returns, the gen_server terminates
+%% with Reason. The return value is ignored.
+%%
+%% @spec terminate(Reason, State) -> void()
+%% @end
+%%--------------------------------------------------------------------
+-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()),
+ State :: #state{}) -> term()).
+terminate(_Reason, _State) ->
+ ok.
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% Convert process state when code is changed
+%%
+%% @spec code_change(OldVsn, State, Extra) -> {ok, NewState}
+%% @end
+%%--------------------------------------------------------------------
+-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{},
+ Extra :: term()) ->
+ {ok, NewState :: #state{}} | {error, Reason :: term()}).
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+
+
+%%----------------------------------------------
+%% dubbo_registry
+%%----------------------------------------------
+start(Url) ->
+ ok.
+register(Url)->
+ {ok,UrlInfo} = dubbo_common_fun:parse_url(Url),
+ InterfaceName = maps:get(<<"interface">>,UrlInfo#dubbo_url.parameters),
+ register(UrlInfo#dubbo_url.scheme,InterfaceName,Url),
+ ok.
+
+register(<<"consumer">>,InterfaceName,Url)->
+ gen_server:call(?SERVER, {add_consumer,InterfaceName, Url}),
+ ok;
+register(<<"provider">>,InterfaceName,Url)->
+
+ ok.
+
+subscribe(SubcribeUrl,NotifyFun)->
+ {ok,UrlInfo} = dubbo_common_fun:parse_url(SubcribeUrl),
+ InterfaceName = maps:get(<<"interface">>,UrlInfo#dubbo_url.parameters),
+ try gen_server:call(?SERVER,{subscribe_provider,InterfaceName,NotifyFun},5000) of
+ ok->
+ ok
+ catch
+ Error:Reason->
+ %%todo improve error type
+ {error,Reason}
+ end.
+
+register_consumer(Consumer) ->
+ gen_server:call(?SERVER, {add_consumer, Consumer}),
+ ok.
+register_consumer(Name, Option) ->
+ Consumer = #consumer_config{interface = Name, methods = [<<"testa">>, <<"testb">>]},
+ register_consumer(Consumer),
+ ok.
+register_provider(Provider) ->
+ gen_server:call(?SERVER, {add_provider, Provider}),
+ ok.
+
+%%%===================================================================
+%%% Internal functions
+%%%===================================================================
+
+connection() ->
+ {ok, List} = application:get_env(dubboerl, zookeeper_list),
+ {ok, Pid} = erlzk:connect(List, 30000, [
+ {chroot, "/"},
+ {monitor, self()}]),
+ {ok, Pid}.
+
+add_consumer(InterfaceName,ConsumerUrl, State) ->
+ Pid = State#state.zk_pid,
+%% ConsumerNode = gen_consumer_node_info(Consumer),
+ ConsumerNode2 = list_to_binary(edoc_lib:escape_uri(binary_to_list(ConsumerUrl))),
+ check_and_create_path(Pid, <<"">>, [{<<"dubbo">>, p}, {InterfaceName, p}, {<<"consumers">>, p}, {ConsumerNode2, e}]),
+ %% todo
+%% get_provider_list(Consumer, State),
+ ok.
+register_provider_path(Provider, State) ->
+ #state{zk_pid = Pid} = State,
+ ProviderNode = dubbo_node_config_util:gen_provider_info(Provider),
+ check_and_create_path(Pid, <<"">>, [{<<"dubbo">>, p}, {Provider#provider_config.interface, p}, {<<"providers">>, p}, {ProviderNode, e}]),
+ ok.
+
+
+get_provider_list(InterfaceName,ZkPid,NotifyFun) ->
+ InterfacePath = <<<<"/dubbo/">>/binary, InterfaceName/binary, <<"/providers">>/binary>>,
+ ChildList= get_provider_and_start(ZkPid, InterfaceName, InterfacePath),
+ NotifyFun(ChildList),
+ ok.
+get_provider_and_start(Pid, Interface, Path) ->
+ case erlzk:get_children(Pid, Path, spawn(dubbo_registry_zookeeper, provider_watcher, [Interface])) of
+ {ok, ChildList} ->
+ logger:debug("get provider list ~p", [ChildList]),
+%% start_provider_process(Interface, ChildList),
+ ChildList;
+ {error, R1} ->
+ logger:debug("[add_consumer] get_provider_list error ~p ~p", [R1]),
+ []
+ end.
+
+provider_watcher(Interface) ->
+ receive
+ {node_children_changed, Path} ->
+ gen_server:cast(?SERVER, {provider_node_change, Interface, Path}),
+ logger:debug("provider_watcher get event ~p ~p", [node_children_changed, Path]);
+ {Event, Path} ->
+%% Path = "/a",
+%% Event = node_created
+ logger:debug("provider_watcher get event ~p ~p", [Event, Path])
+ end,
+ ok.
+
+
+create_path(Pid, Path, CreateType) ->
+ case erlzk:create(Pid, Path, CreateType) of
+ {ok, ActualPath} ->
+ logger:debug("[add_consumer] create zk path success ~p", [ActualPath]),
+ ok;
+ {error, R1} ->
+ logger:debug("[add_consumer] create zk path error ~p ~p", [Path, R1])
+ end,
+ ok.
+check_and_create_path(_Pid, _RootPath, []) ->
+ ok;
+check_and_create_path(Pid, RootPath, [{Item, CreateType} | Rst]) ->
+ CheckPath = <<RootPath/binary, <<"/">>/binary, Item/binary>>,
+ case erlzk:exists(Pid, CheckPath) of
+ {ok, Stat} ->
+ check_and_create_path(Pid, CheckPath, Rst);
+ {error, no_node} ->
+ logger:debug("[add_consumer] check_and_create_path unexist no_node ~p", [CheckPath]),
+ create_path(Pid, CheckPath, CreateType),
+ check_and_create_path(Pid, CheckPath, Rst);
+ {error, R1} ->
+ logger:debug("[add_consumer] check_and_create_path unexist ~p", [R1]),
+ check_and_create_path(Pid, CheckPath, Rst)
+ end.
+
+gen_consumer_node_info(Consumer) ->
+ %% revision参数字段的作用是什么? 暂时不添加
+ Methods = dubbo_lists_util:join(Consumer#consumer_config.methods, <<",">>),
+ Value = io_lib:format(<<"consumer://~s/~s?application=~s&category=~s&check=~p&default.timeout=~p&dubbo=~s&interface=~s&methods=~s&side=~s×tamp=~p">>,
+ [dubbo_common_fun:local_ip_v4_str(),
+ Consumer#consumer_config.interface,
+ Consumer#consumer_config.application,
+ Consumer#consumer_config.category,
+ Consumer#consumer_config.check,
+ Consumer#consumer_config.default_timeout,
+ Consumer#consumer_config.dubbo_version,
+ Consumer#consumer_config.interface,
+ Methods,
+ Consumer#consumer_config.side,
+ dubbo_time_util:timestamp_ms()
+ ]),
+ list_to_binary(Value).
+
+%%dubbo_zookeeper:register_consumer(<<"com.ifcoder.abcd">>,[]).
+start_provider_process(Interface, ProviderList) ->
+ dubbo_consumer_pool:start_consumer(Interface, ProviderList).
\ No newline at end of file
diff --git a/test/userOperator.erl b/test/userOperator.erl
index 0d35917..e8567ae 100644
--- a/test/userOperator.erl
+++ b/test/userOperator.erl
@@ -67,7 +67,7 @@
]
},
Request = dubbo_adapter:reference(Data),
- dubbo_invoker:invoke_request(?CURRENT_CLASS_NAME,Request,RequestOption).
+ dubbo_invoker_old:invoke_request(?CURRENT_CLASS_NAME,Request,RequestOption).
-spec genUserId()->
@@ -96,7 +96,7 @@
]
},
Request = dubbo_adapter:reference(Data),
- dubbo_invoker:invoke_request(?CURRENT_CLASS_NAME,Request,RequestOption).
+ dubbo_invoker_old:invoke_request(?CURRENT_CLASS_NAME,Request,RequestOption).
-spec queryUserInfo(Arg0::#userInfoRequest{})->
@@ -127,7 +127,7 @@
]
},
Request = dubbo_adapter:reference(Data),
- dubbo_invoker:invoke_request(?CURRENT_CLASS_NAME,Request,RequestOption).
+ dubbo_invoker_old:invoke_request(?CURRENT_CLASS_NAME,Request,RequestOption).
-spec queryUserList(Arg0::list())->
@@ -158,7 +158,7 @@
]
},
Request = dubbo_adapter:reference(Data),
- dubbo_invoker:invoke_request(?CURRENT_CLASS_NAME,Request,RequestOption).
+ dubbo_invoker_old:invoke_request(?CURRENT_CLASS_NAME,Request,RequestOption).
test() ->
queryUserInfo(#userInfoRequest{username = "name", requestId = "111"}, #{sync=> true}).
\ No newline at end of file