添加group和version的支持
diff --git a/dubbo_client/common.py b/dubbo_client/common.py
index d2945ce..3633ac8 100644
--- a/dubbo_client/common.py
+++ b/dubbo_client/common.py
@@ -10,6 +10,8 @@
path = '' # like /com.qianmi.dubbo.UserProvider
ip = '127.0.0.1'
port = '9090'
+ version = ''
+ group = ''
def __init__(self, url):
result = urlparse(url)
diff --git a/dubbo_client/config.py b/dubbo_client/config.py
index 681fccd..880ec2b 100644
--- a/dubbo_client/config.py
+++ b/dubbo_client/config.py
@@ -1 +1,8 @@
+# coding=utf-8
__author__ = 'caozupeng'
+
+
+class ReferenceConfig(object):
+ registry = None
+ interface = ''
+ version = ''
\ No newline at end of file
diff --git a/dubbo_client/registry.py b/dubbo_client/registry.py
index 5ea57a0..6aa1b9e 100644
--- a/dubbo_client/registry.py
+++ b/dubbo_client/registry.py
@@ -2,8 +2,10 @@
__author__ = 'caozupeng'
import urllib
+
from kazoo.protocol.states import KazooState
from kazoo.client import KazooClient
+
from dubbo_client.common import ServiceURL
@@ -16,7 +18,7 @@
"""
pass
- def get_provides(self, provide_name, default=None):
+ def get_provides(self, provide_name, default=None, **kwargs):
"""
获取已经注册的服务URL对象
:param provide_name: com.ofpay.demo.api.UserProvider
@@ -29,8 +31,11 @@
class ZookeeperRegistry(Registry):
"""
所有注册过的服务端将在这里
- 格式为{providername:{ip+port:service}}
- providername = group_version_servicename
+ interface=com.ofpay.demo.DemoService
+ location = ip:port/url 比如 172.19.20.111:38080/com.ofpay.demo.DemoService2
+ providername = servicename|version|group
+ dict 格式为{interface:{providername:{ip+port:service_url}}}
+
"""
__service_provides = {}
__connect_state = 'UNCONNECT'
@@ -61,57 +66,71 @@
"""
self.__do_event(event)
- def __handler_nodes(self, nodes):
+ def __to_key(self, interface, versioin, group):
+ return '{0}|{1}|{2}'.format(interface, versioin, group)
+
+ def __handler_nodes(self, interface, nodes):
"""
将zookeeper中查询到的服务节点列表加入到一个dict中
:param nodes: 节点列表
:return: 不需要返回
"""
+ # 如果已经存在,首先删除原有的服务的集合
+ if interface in self.__service_provides:
+ del self.__service_provides[interface]
for child_node in nodes:
node = urllib.unquote(child_node).decode('utf8')
if node.startswith('jsonrpc'):
service_url = ServiceURL(node)
- service_key = self.__service_provides.get(service_url.interface)
- if service_key:
- service_key[service_url.location] = service_url
+ key = self.__to_key(service_url.interface, service_url.version, service_url.group)
+ second_dict = self.__service_provides.get(interface)
+ if second_dict:
+ # 获取最内层的nest的dict
+ inner_dict = second_dict.get(key)
+ if inner_dict:
+ inner_dict[service_url.location] = service_url
+ else:
+ second_dict[key] = {service_url.location: service_url}
else:
- self.__service_provides[service_url.interface] = {service_url.location: service_url}
+ # create the second dict
+ self.__service_provides[interface] = {key: {service_url.location: service_url}}
def __do_event(self, event):
# event.path 是类似/dubbo/com.ofpay.demo.api.UserProvider/providers 这样的
# 如果要删除,必须先把/dubbo/和最后的/providers去掉
provide_name = event.path[7:event.path.rfind('/')]
- if provide_name in self.__service_provides:
- del self.__service_provides[provide_name]
if event.state == 'CONNECTED':
children = self.__zk.get_children(event.path, watch=self.__event_listener)
- self.__handler_nodes(children)
+ self.__handler_nodes(provide_name, children)
if event.state == 'DELETED':
children = self.__zk.get_children(event.path, watch=self.__event_listener)
- self.__handler_nodes(children)
+ self.__handler_nodes(provide_name, children)
- def add_provider_listener(self, provide_name):
+ def add_provider_listener(self, interface, **kwargs):
"""
监听注册中心的服务上下线
- :param provide_name: 类似com.ofpay.demo.api.UserProvider这样的服务名
+ :param interface: 类似com.ofpay.demo.api.UserProvider这样的服务名
:return: 无返回
"""
- # 如果已经存在,首先删除原有的服务的集合
- if provide_name in self.__service_provides:
- del self.__service_provides[provide_name]
- children = self.__zk.get_children('{0}/{1}/{2}'.format('dubbo', provide_name, 'providers'),
+ version = kwargs.get('version', '')
+ group = kwargs.get('group', '')
+ children = self.__zk.get_children('{0}/{1}/{2}'.format('dubbo', interface, 'providers'),
watch=self.__event_listener)
# 全部重新添加
- self.__handler_nodes(children)
+ self.__handler_nodes(interface, children)
- def get_provides(self, provide_name, default=None):
+ def get_provides(self, interface, default=None, **kwargs):
"""
获取已经注册的服务URL对象
- :param provide_name: com.ofpay.demo.api.UserProvider
+ :param interface: com.ofpay.demo.api.UserProvider
:param default:
:return: 返回一个dict的服务集合
"""
- return self.__service_provides.get(provide_name, default)
+ group = kwargs.get('group', '')
+ version = kwargs.get('version', '')
+ key = self.__to_key(interface, version, group)
+ second = self.__service_provides.get(interface, {})
+ return second.get(key, default)
if __name__ == '__main__':
diff --git a/dubbo_client/rpclib.py b/dubbo_client/rpclib.py
index 001bfb4..30f2b54 100644
--- a/dubbo_client/rpclib.py
+++ b/dubbo_client/rpclib.py
@@ -1,13 +1,19 @@
# coding=utf-8
import random
from urllib2 import HTTPError
+
from pyjsonrpc import HttpClient, JsonRpcError
+
from dubbo_client.rpcerror import NoProvider, ConnectionFail, dubbo_client_errors, InternalError
+
+
__author__ = 'caozupeng'
class DubboClient(object):
interface = ''
+ group = ''
+ version = ''
class _Method(object):
@@ -18,17 +24,20 @@
def __call__(self, *args, **kwargs):
return self.client_instance.call(self.method, *args, **kwargs)
- def __init__(self, interface, registry):
+ def __init__(self, interface, registry, **kwargs):
self.interface = interface
self.registry = registry
+ self.group = kwargs.get('group', '')
+ self.version = kwargs.get('version', '')
self.registry.add_provider_listener(interface)
def call(self, method, *args, **kwargs):
- provides = self.registry.get_provides(self.interface, {})
+ provides = self.registry.get_provides(self.interface, {}, version=self.version, group=self.group)
if len(provides) == 0:
raise NoProvider('can not find provide', self.interface)
- location, provide = random.choice(provides.items())
- client = HttpClient(url="http://{0}{1}".format(location, provide.path))
+ ip_port, service_url = random.choice(provides.items())
+ print service_url.location
+ client = HttpClient(url="http://{0}{1}".format(ip_port, service_url.path))
try:
return client.call(method, *args, **kwargs)
except HTTPError, e:
@@ -48,7 +57,6 @@
"""
Allows the usage of attributes as *method* names.
"""
-
return self._Method(client_instance=self, method=method)
diff --git a/test_dubbo_client/test_rawclient.py b/test_dubbo_client/test_rawclient.py
new file mode 100644
index 0000000..179644b
--- /dev/null
+++ b/test_dubbo_client/test_rawclient.py
@@ -0,0 +1,16 @@
+from pyjsonrpc import HttpClient
+
+__author__ = 'caozupeng'
+
+def test_client_every_new():
+ user_provider = HttpClient(url="http://{0}{1}".format('172.19.3.111:38081/', 'com.ofpay.demo.api.UserProvider2'))
+ print user_provider.getUser('A003')
+ print user_provider.queryUser(
+ {u'age': 18, u'time': 1428463514153, u'sex': u'MAN', u'id': u'A003', u'name': u'zhangsan'})
+ print user_provider.queryAll()
+ print user_provider.isLimit('MAN', 'Joe')
+ print user_provider('getUser', 'A005')
+
+
+if __name__ == '__main__':
+ test_client_every_new()
\ No newline at end of file
diff --git a/test_dubbo_client/test_rpclib.py b/test_dubbo_client/test_rpclib.py
index 0db9b5d..d8b673e 100644
--- a/test_dubbo_client/test_rpclib.py
+++ b/test_dubbo_client/test_rpclib.py
@@ -1,16 +1,16 @@
# coding=utf-8
import time
-from user_provider import DubboClient, DubboClientError
-from user_provider import ZookeeperRegistry
+from dubbo_client import ZookeeperRegistry, DubboClient, DubboClientError
__author__ = 'caozupeng'
if __name__ == '__main__':
service_interface = 'com.ofpay.demo.api.UserProvider'
+ # 该对象较重,有zookeeper的连接,需要保存使用
registry = ZookeeperRegistry('172.19.65.33:2181')
- user_provider = DubboClient(service_interface, registry)
+ user_provider = DubboClient(service_interface, registry, version='3.0')
for i in range(1000):
try:
print user_provider.getUser('A003')