新增ApplicationConfig,明确客户端信息
将客户端信息注册到zookeeper的consumers下面
diff --git a/dubbo_client/__init__.py b/dubbo_client/__init__.py
index 2364b3f..ec42361 100644
--- a/dubbo_client/__init__.py
+++ b/dubbo_client/__init__.py
@@ -8,4 +8,7 @@
from registry import (
Registry,
ZookeeperRegistry,
+)
+from config import (
+ ApplicationConfig,
)
\ No newline at end of file
diff --git a/dubbo_client/config.py b/dubbo_client/config.py
index 880ec2b..c19e40b 100644
--- a/dubbo_client/config.py
+++ b/dubbo_client/config.py
@@ -2,7 +2,37 @@
__author__ = 'caozupeng'
+class ApplicationConfig(object):
+ # 应用名称
+ name = 'default'
+ # 模块版本
+ version = '1.0.0'
+ #应用负责人
+ owner = ''
+ #组织名(BU或部门)
+ organization = ''
+ #分层
+ architecture = 'web'
+ #环境,如:dev/test/run
+ environment = 'run'
+
+ def __init__(self, name, **kwargs):
+ self.name = name
+ object_property = dir(ApplicationConfig)
+ for key, value in kwargs.items():
+ if key in object_property:
+ setattr(self, key, value)
+
+ def __str__(self):
+ return 'ApplicationConfig is {0}'.format(",".join(k + ':' + v for k, v in vars(self).iteritems()))
+
+
class ReferenceConfig(object):
registry = None
interface = ''
- version = ''
\ No newline at end of file
+ version = ''
+
+
+if __name__ == '__main__':
+ application_config = ApplicationConfig('test_app', version='2.0.0', owner='caozupeng', error='ssd')
+ print application_config
\ No newline at end of file
diff --git a/dubbo_client/registry.py b/dubbo_client/registry.py
index 2501b1d..d635fe7 100644
--- a/dubbo_client/registry.py
+++ b/dubbo_client/registry.py
@@ -1,4 +1,8 @@
# coding=utf-8
+import os
+
+from dubbo_client.config import ApplicationConfig
+
__author__ = 'caozupeng'
import urllib
@@ -37,26 +41,30 @@
dict 格式为{interface:{providername:{ip+port:service_url}}}
"""
- __service_provides = {}
- __connect_state = 'UNCONNECT'
+ _service_provides = {}
+ _app_config = ApplicationConfig('default_app')
+ _connect_state = 'UNCONNECT'
- def __init__(self, zk_hosts):
- self.__zk = KazooClient(hosts=zk_hosts, read_only=True)
+
+ def __init__(self, zk_hosts, application_config=None):
+ if application_config:
+ self._app_config = application_config
+ self.__zk = KazooClient(hosts=zk_hosts)
self.__zk.add_listener(self.__state_listener)
self.__zk.start()
def __state_listener(self, state):
if state == KazooState.LOST:
# Register somewhere that the session was lost
- self.__connect_state = state
+ self._connect_state = state
elif state == KazooState.SUSPENDED:
# Handle being disconnected from Zookeeper
# print 'disconnect from zookeeper'
- self.__connect_state = state
+ self._connect_state = state
else:
# Handle being connected/reconnected to Zookeeper
# print 'connected'
- self.__connect_state = state
+ self._connect_state = state
def __event_listener(self, event):
"""
@@ -64,7 +72,7 @@
:param event:
:return:
"""
- self.__do_event(event)
+ self._do_event(event)
def __to_key(self, interface, versioin, group):
return '{0}|{1}|{2}'.format(interface, versioin, group)
@@ -83,14 +91,14 @@
:return: 不需要返回
"""
# 如果已经存在,首先删除原有的服务的集合
- if interface in self.__service_provides:
- del self.__service_provides[interface]
+ if interface in self._service_provides:
+ del self._service_provides[interface]
for child_node in nodes:
node = urllib.unquote(child_node).decode('utf8')
if node.startswith('jsonrpc'):
service_url = ServiceURL(node)
key = self.__to_key(service_url.interface, service_url.version, service_url.group)
- second_dict = self.__service_provides.get(interface)
+ second_dict = self._service_provides.get(interface)
if second_dict:
# 获取最内层的nest的dict
inner_dict = second_dict.get(key)
@@ -100,9 +108,9 @@
second_dict[key] = {service_url.location: service_url}
else:
# create the second dict
- self.__service_provides[interface] = {key: {service_url.location: service_url}}
+ self._service_provides[interface] = {key: {service_url.location: service_url}}
- def __do_event(self, event):
+ def _do_event(self, event):
# event.path 是类似/dubbo/com.ofpay.demo.api.UserProvider/providers 这样的
# 如果要删除,必须先把/dubbo/和最后的/providers去掉
provide_name = event.path[7:event.path.rfind('/')]
@@ -113,6 +121,28 @@
children = self.__zk.get_children(event.path, watch=self.__event_listener)
self.__handler_nodes(provide_name, children)
+ def register(self, interface, **kwargs):
+ ip = self.__zk._connection._socket.getsockname()[0]
+ params = {
+ 'interface': interface,
+ 'application': self._app_config.name,
+ 'application.version': self._app_config.version,
+ 'category': 'consumer',
+ 'dubbo': 'dubbo-client-py-1.0.0',
+ 'environment': self._app_config.environment,
+ 'interface': interface,
+ 'method': '',
+ 'owner': self._app_config.owner,
+ 'side': 'consumer',
+ 'pid': os.getpid(),
+ 'version': '1.0'
+ }
+ url = 'consumer://{0}/{1}?{2}'.format(ip, interface, urllib.urlencode(params))
+ # print urllib.quote(url, safe='')
+
+ consumer_path = '{0}/{1}/{2}'.format('dubbo', interface, 'consumers')
+ self.__zk.create(consumer_path + '/' + urllib.quote(url, safe=''), ephemeral=True)
+
def subscribe(self, interface, **kwargs):
"""
监听注册中心的服务上下线
@@ -136,10 +166,16 @@
group = kwargs.get('group', '')
version = kwargs.get('version', '')
key = self.__to_key(interface, version, group)
- second = self.__service_provides.get(interface, {})
+ second = self._service_provides.get(interface, {})
return second.get(key, {})
if __name__ == '__main__':
- pass
-
+ zk = KazooClient(hosts='192.168.59.103:2181')
+ zk.start()
+ parent_node = '{0}/{1}/{2}'.format('dubbo', 'com.ofpay.demo.api.UserProvider', 'consumers')
+ nodes = zk.get_children(parent_node)
+ for child_node in nodes:
+ node = urllib.unquote(child_node).decode('utf8')
+ print node
+ # zk.delete(parent_node+'/'+child_node, recursive=True)
diff --git a/dubbo_client/rpclib.py b/dubbo_client/rpclib.py
index 61b8a38..1dc3cd5 100644
--- a/dubbo_client/rpclib.py
+++ b/dubbo_client/rpclib.py
@@ -30,6 +30,7 @@
self.group = kwargs.get('group', '')
self.version = kwargs.get('version', '')
self.registry.subscribe(interface)
+ self.registry.register(interface)
def call(self, method, *args, **kwargs):
provides = self.registry.get_provides(self.interface, version=self.version, group=self.group)
diff --git a/test_dubbo_client/test_rpclib.py b/test_dubbo_client/test_rpclib.py
index 0168a9c..b20921a 100644
--- a/test_dubbo_client/test_rpclib.py
+++ b/test_dubbo_client/test_rpclib.py
@@ -1,15 +1,16 @@
# coding=utf-8
import time
-from dubbo_client import ZookeeperRegistry, DubboClient, DubboClientError
+from dubbo_client import ZookeeperRegistry, DubboClient, DubboClientError, ApplicationConfig
__author__ = 'caozupeng'
if __name__ == '__main__':
+ config = ApplicationConfig('test_rpclib')
service_interface = 'com.ofpay.demo.api.UserProvider'
# 该对象较重,有zookeeper的连接,需要保存使用
- registry = ZookeeperRegistry('192.168.59.103:2181')
+ registry = ZookeeperRegistry('192.168.59.103:2181', config)
user_provider = DubboClient(service_interface, registry, version='2.0')
for i in range(1000):
try: