抽离providers到commons对象,注册中心使用线程进行event处理
diff --git a/dubbo_client/common.py b/dubbo_client/common.py
new file mode 100644
index 0000000..ff103ba
--- /dev/null
+++ b/dubbo_client/common.py
@@ -0,0 +1,29 @@
+#encoding=utf-8
+from urlparse import urlparse, parse_qsl
+
+__author__ = 'caozupeng'
+
+
+class ServiceProvider(object):
+ protocol = 'jsonrpc'
+ location = '' # ip+port
+ path = '' # like /com.qianmi.dubbo.UserProvider
+ ip = '127.0.0.1'
+ port = '9090'
+
+ def __init__(self, url):
+ result = urlparse(url)
+ self.protocol = result[0]
+ self.location = result[1]
+ self.path = result[2]
+ if self.location.find(':') > -1:
+ self.ip, self.port = result[1].split(':')
+ params = parse_qsl(result[4])
+ for key, value in params:
+ # url has a default.timeout property, but it can not add in python object
+ # so keep the last one
+ pos = key.find('.')
+ if pos > -1:
+ key = key[pos + 1:]
+ print key
+ self.__dict__[key] = value
\ No newline at end of file
diff --git a/dubbo_client/registry.py b/dubbo_client/registry.py
index 5be84a7..fe15119 100644
--- a/dubbo_client/registry.py
+++ b/dubbo_client/registry.py
@@ -1,9 +1,12 @@
# coding=utf-8
+import Queue
+from threading import Thread
import urllib
-from urlparse import urlparse, parse_qsl
from kazoo.protocol.states import KazooState
+from dubbo_client.common import ServiceProvider
+
__author__ = 'caozupeng'
from kazoo.client import KazooClient
@@ -11,6 +14,11 @@
logging.basicConfig()
zk = KazooClient(hosts='172.19.65.33:2181', read_only=True)
+"""
+所有注册过的服务端将在这里
+格式为{providername:{ip+port:service}}
+providername = group_version_servicename
+"""
service_provides = {}
@@ -29,41 +37,16 @@
zk.add_listener(state_listener)
-class JsonProvide(object):
- protocol = 'jsonrpc'
- location = ''
- path = ''
- ip = '127.0.0.1'
- port = '9090'
-
- def __init__(self, url):
- result = urlparse(url)
- self.protocol = result[0]
- self.location = result[1]
- self.path = result[2]
- if self.location.find(':') > -1:
- self.ip, self.port = result[1].split(':')
- params = parse_qsl(result[4])
- for key, value in params:
- # url has a default.timeout property, but it can not add in python object
- # so keep the last one
- pos = key.find('.')
- if pos > -1:
- key = key[pos + 1:]
- print key
- self.__dict__[key] = value
-
-
def node_listener(event):
print event
+ event_queue.put(event)
-def add_provider_listener(provide_name):
- children = zk.get_children('{0}/{1}/{2}'.format('dubbo', provide_name, 'providers'), watch=node_listener)
- for child_node in children:
+def handler_urls(urls):
+ for child_node in urls:
url = urllib.unquote(child_node).decode('utf8')
if url.startswith('jsonrpc'):
- provide = JsonProvide(url)
+ provide = ServiceProvider(url)
service_key = service_provides.get(provide.interface)
if service_key:
service_key[provide.location] = provide
@@ -71,15 +54,52 @@
service_provides[provide.interface] = {provide.location: provide}
+def add_provider_listener(provide_name):
+ #如果已经存在,首先删除原有的服务的集合
+ if provide_name in service_provides:
+ del service_provides[provide_name]
+ children = zk.get_children('{0}/{1}/{2}'.format('dubbo', provide_name, 'providers'), watch=node_listener)
+ #全部重新添加
+ handler_urls(children)
+
+
+num_worker_threads = 2
+
+
+def worker():
+ while True:
+ event = event_queue.get()
+ do_event(event)
+ event_queue.task_done()
+
+
+event_queue = Queue.Queue()
+
+
+def do_event(event):
+ # event.path 是类似/dubbo/com.ofpay.demo.api.UserProvider/providers 这样的
+ # 如果要删除,必须先把/dubbo/和最后的/providers去掉
+ provide_name = event.path[7:event.path.rfind('/')]
+ if provide_name in service_provides:
+ del service_provides[provide_name]
+ if event.state == 'CONNECTED':
+ children = zk.get_children(event.path, watch=node_listener)
+ handler_urls(children)
+ if event.state == 'DELETED':
+ children = zk.get_children(event.path, watch=node_listener)
+ handler_urls(children)
+
+for i in range(num_worker_threads):
+ t = Thread(target=worker)
+ t.daemon = False
+ t.start()
+
+
zk.start()
+event_queue.join()
if __name__ == '__main__':
zk.start()
if zk.exists("/dubbo"):
- # Print the version of a node and its data
- # children = zk.get_children("/dubbo")
- # print "There are {0} children".format(len(children))
- # for node in children:
- # print node
add_provider_listener('com.ofpay.demo.api.UserProvider')
diff --git a/dubbo_client/rpc.py b/dubbo_client/rpc.py
index ed24b31..01d4e17 100644
--- a/dubbo_client/rpc.py
+++ b/dubbo_client/rpc.py
@@ -24,7 +24,7 @@
class DubboClient(object):
- clients = []
+ interface = ''
class _Method(object):
@@ -36,14 +36,16 @@
return self.client_instance.call(self.method, *args, **kwargs)
def __init__(self, interface):
+ self.interface = interface
add_provider_listener(interface)
- provides = service_provides.get(interface, ())
- if len(provides) > 0:
- for location, provide in provides.items():
- self.clients.append(HttpClient(url="http://{0}{1}".format(location, provide.path)))
def call(self, method, *args, **kwargs):
- client = random.choice(self.clients)
+ provides = service_provides.get(self.interface, ())
+ if len(provides) == 0:
+ return None
+ location, provide = random.choice(provides.items())
+ print 'location is {0}'.format(location)
+ client = HttpClient(url="http://{0}{1}".format(location, provide.path))
return client.call(method, *args, **kwargs)
def __call__(self, method, *args, **kwargs):
diff --git a/dubbo_client/tools.py b/dubbo_client/tools.py
deleted file mode 100644
index bfc1c30..0000000
--- a/dubbo_client/tools.py
+++ /dev/null
@@ -1,2 +0,0 @@
-#encoding=utf-8
-__author__ = 'caozupeng'
diff --git a/test_dubbo_client/test_rpclib.py b/test_dubbo_client/test_rpclib.py
index 4649c56..6bf7e04 100644
--- a/test_dubbo_client/test_rpclib.py
+++ b/test_dubbo_client/test_rpclib.py
@@ -1,13 +1,19 @@
# coding=utf-8
+import time
+
from dubbo_client import DubboClient
+
__author__ = 'caozupeng'
if __name__ == '__main__':
service_interface = 'com.ofpay.demo.api.UserProvider'
dubbo_client = DubboClient(service_interface)
- print dubbo_client.getUser('A003')
- print dubbo_client.queryUser(
- {u'age': 18, u'time': 1428463514153, u'sex': u'MAN', u'id': u'A003', u'name': u'zhangsan'})
- print dubbo_client.queryAll()
- print dubbo_client.isLimit('MAN', 'Joe')
\ No newline at end of file
+ for i in range(1000):
+ print dubbo_client.getUser('A003')
+ print dubbo_client.queryUser(
+ {u'age': 18, u'time': 1428463514153, u'sex': u'MAN', u'id': u'A003', u'name': u'zhangsan'})
+ print dubbo_client.queryAll()
+ print dubbo_client.isLimit('MAN', 'Joe')
+ print dubbo_client('getUser', 'A005')
+ time.sleep(5)