抽离providers到commons对象,注册中心使用线程进行event处理
diff --git a/dubbo_client/common.py b/dubbo_client/common.py
new file mode 100644
index 0000000..ff103ba
--- /dev/null
+++ b/dubbo_client/common.py
@@ -0,0 +1,29 @@
+#encoding=utf-8
+from urlparse import urlparse, parse_qsl
+
+__author__ = 'caozupeng'
+
+
+class ServiceProvider(object):
+    protocol = 'jsonrpc'
+    location = ''  # ip+port
+    path = ''  # like /com.qianmi.dubbo.UserProvider
+    ip = '127.0.0.1'
+    port = '9090'
+
+    def __init__(self, url):
+        result = urlparse(url)
+        self.protocol = result[0]
+        self.location = result[1]
+        self.path = result[2]
+        if self.location.find(':') > -1:
+            self.ip, self.port = result[1].split(':')
+        params = parse_qsl(result[4])
+        for key, value in params:
+            # url has a default.timeout property, but it can not add in python object
+            # so keep the last one
+            pos = key.find('.')
+            if pos > -1:
+                key = key[pos + 1:]
+            print key
+            self.__dict__[key] = value
\ No newline at end of file
diff --git a/dubbo_client/registry.py b/dubbo_client/registry.py
index 5be84a7..fe15119 100644
--- a/dubbo_client/registry.py
+++ b/dubbo_client/registry.py
@@ -1,9 +1,12 @@
 # coding=utf-8
+import Queue
+from threading import Thread
 import urllib
-from urlparse import urlparse, parse_qsl
 
 from kazoo.protocol.states import KazooState
 
+from dubbo_client.common import ServiceProvider
+
 
 __author__ = 'caozupeng'
 from kazoo.client import KazooClient
@@ -11,6 +14,11 @@
 
 logging.basicConfig()
 zk = KazooClient(hosts='172.19.65.33:2181', read_only=True)
+"""
+所有注册过的服务端将在这里
+格式为{providername:{ip+port:service}}
+providername = group_version_servicename
+"""
 service_provides = {}
 
 
@@ -29,41 +37,16 @@
 zk.add_listener(state_listener)
 
 
-class JsonProvide(object):
-    protocol = 'jsonrpc'
-    location = ''
-    path = ''
-    ip = '127.0.0.1'
-    port = '9090'
-
-    def __init__(self, url):
-        result = urlparse(url)
-        self.protocol = result[0]
-        self.location = result[1]
-        self.path = result[2]
-        if self.location.find(':') > -1:
-            self.ip, self.port = result[1].split(':')
-        params = parse_qsl(result[4])
-        for key, value in params:
-            # url has a default.timeout property, but it can not add in python object
-            # so keep the last one
-            pos = key.find('.')
-            if pos > -1:
-                key = key[pos + 1:]
-            print key
-            self.__dict__[key] = value
-
-
 def node_listener(event):
     print event
+    event_queue.put(event)
 
 
-def add_provider_listener(provide_name):
-    children = zk.get_children('{0}/{1}/{2}'.format('dubbo', provide_name, 'providers'), watch=node_listener)
-    for child_node in children:
+def handler_urls(urls):
+    for child_node in urls:
         url = urllib.unquote(child_node).decode('utf8')
         if url.startswith('jsonrpc'):
-            provide = JsonProvide(url)
+            provide = ServiceProvider(url)
             service_key = service_provides.get(provide.interface)
             if service_key:
                 service_key[provide.location] = provide
@@ -71,15 +54,52 @@
                 service_provides[provide.interface] = {provide.location: provide}
 
 
+def add_provider_listener(provide_name):
+    #如果已经存在,首先删除原有的服务的集合
+    if provide_name in service_provides:
+        del service_provides[provide_name]
+    children = zk.get_children('{0}/{1}/{2}'.format('dubbo', provide_name, 'providers'), watch=node_listener)
+    #全部重新添加
+    handler_urls(children)
+
+
+num_worker_threads = 2
+
+
+def worker():
+    while True:
+        event = event_queue.get()
+        do_event(event)
+        event_queue.task_done()
+
+
+event_queue = Queue.Queue()
+
+
+def do_event(event):
+    # event.path 是类似/dubbo/com.ofpay.demo.api.UserProvider/providers 这样的
+    # 如果要删除,必须先把/dubbo/和最后的/providers去掉
+    provide_name = event.path[7:event.path.rfind('/')]
+    if provide_name in service_provides:
+        del service_provides[provide_name]
+    if event.state == 'CONNECTED':
+        children = zk.get_children(event.path, watch=node_listener)
+        handler_urls(children)
+    if event.state == 'DELETED':
+        children = zk.get_children(event.path, watch=node_listener)
+        handler_urls(children)
+
+for i in range(num_worker_threads):
+    t = Thread(target=worker)
+    t.daemon = False
+    t.start()
+
+
 zk.start()
+event_queue.join()
 
 if __name__ == '__main__':
     zk.start()
     if zk.exists("/dubbo"):
-        # Print the version of a node and its data
-        # children = zk.get_children("/dubbo")
-        # print "There are {0} children".format(len(children))
-        # for node in children:
-        # print node
         add_provider_listener('com.ofpay.demo.api.UserProvider')
 
diff --git a/dubbo_client/rpc.py b/dubbo_client/rpc.py
index ed24b31..01d4e17 100644
--- a/dubbo_client/rpc.py
+++ b/dubbo_client/rpc.py
@@ -24,7 +24,7 @@
 
 
 class DubboClient(object):
-    clients = []
+    interface = ''
 
     class _Method(object):
 
@@ -36,14 +36,16 @@
             return self.client_instance.call(self.method, *args, **kwargs)
 
     def __init__(self, interface):
+        self.interface = interface
         add_provider_listener(interface)
-        provides = service_provides.get(interface, ())
-        if len(provides) > 0:
-            for location, provide in provides.items():
-                self.clients.append(HttpClient(url="http://{0}{1}".format(location, provide.path)))
 
     def call(self, method, *args, **kwargs):
-        client = random.choice(self.clients)
+        provides = service_provides.get(self.interface, ())
+        if len(provides) == 0:
+            return None
+        location, provide = random.choice(provides.items())
+        print 'location is {0}'.format(location)
+        client = HttpClient(url="http://{0}{1}".format(location, provide.path))
         return client.call(method, *args, **kwargs)
 
     def __call__(self, method, *args, **kwargs):
diff --git a/dubbo_client/tools.py b/dubbo_client/tools.py
deleted file mode 100644
index bfc1c30..0000000
--- a/dubbo_client/tools.py
+++ /dev/null
@@ -1,2 +0,0 @@
-#encoding=utf-8
-__author__ = 'caozupeng'
diff --git a/test_dubbo_client/test_rpclib.py b/test_dubbo_client/test_rpclib.py
index 4649c56..6bf7e04 100644
--- a/test_dubbo_client/test_rpclib.py
+++ b/test_dubbo_client/test_rpclib.py
@@ -1,13 +1,19 @@
 # coding=utf-8
+import time
+
 from dubbo_client import DubboClient
 
+
 __author__ = 'caozupeng'
 
 if __name__ == '__main__':
     service_interface = 'com.ofpay.demo.api.UserProvider'
     dubbo_client = DubboClient(service_interface)
-    print dubbo_client.getUser('A003')
-    print dubbo_client.queryUser(
-        {u'age': 18, u'time': 1428463514153, u'sex': u'MAN', u'id': u'A003', u'name': u'zhangsan'})
-    print dubbo_client.queryAll()
-    print dubbo_client.isLimit('MAN', 'Joe')
\ No newline at end of file
+    for i in range(1000):
+        print dubbo_client.getUser('A003')
+        print dubbo_client.queryUser(
+            {u'age': 18, u'time': 1428463514153, u'sex': u'MAN', u'id': u'A003', u'name': u'zhangsan'})
+        print dubbo_client.queryAll()
+        print dubbo_client.isLimit('MAN', 'Joe')
+        print dubbo_client('getUser', 'A005')
+        time.sleep(5)