Merge pull request #4 from jingpeicomp/master

支持权重和provider disable
diff --git a/.gitignore b/.gitignore
index 2f13ce3..3cb9027 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,3 +1,3 @@
 *.pyc
-build
-*.egg-info
\ No newline at end of file
+.idea
+*.log
\ No newline at end of file
diff --git a/dist/dubbo-client-1.0.0b2.tar.gz b/dist/dubbo-client-1.0.0b2.tar.gz
deleted file mode 100644
index a889cff..0000000
--- a/dist/dubbo-client-1.0.0b2.tar.gz
+++ /dev/null
Binary files differ
diff --git a/dist/dubbo-client-1.0.0b3.tar.gz b/dist/dubbo-client-1.0.0b3.tar.gz
deleted file mode 100644
index 3f611d6..0000000
--- a/dist/dubbo-client-1.0.0b3.tar.gz
+++ /dev/null
Binary files differ
diff --git a/dist/dubbo-client-1.0.0b4.tar.gz b/dist/dubbo-client-1.0.0b4.tar.gz
deleted file mode 100644
index acfa439..0000000
--- a/dist/dubbo-client-1.0.0b4.tar.gz
+++ /dev/null
Binary files differ
diff --git a/dist/dubbo-client-1.0.0b5.tar.gz b/dist/dubbo-client-1.0.0b5.tar.gz
deleted file mode 100644
index 5c0a555..0000000
--- a/dist/dubbo-client-1.0.0b5.tar.gz
+++ /dev/null
Binary files differ
diff --git a/dist/dubbo_client-1.0.0b2-py2.7.egg b/dist/dubbo_client-1.0.0b2-py2.7.egg
deleted file mode 100644
index f6f452f..0000000
--- a/dist/dubbo_client-1.0.0b2-py2.7.egg
+++ /dev/null
Binary files differ
diff --git a/dist/dubbo_client-1.0.0b3-py2.7.egg b/dist/dubbo_client-1.0.0b3-py2.7.egg
deleted file mode 100644
index 1a1a312..0000000
--- a/dist/dubbo_client-1.0.0b3-py2.7.egg
+++ /dev/null
Binary files differ
diff --git a/dist/dubbo_client-1.0.0b4-py2.7.egg b/dist/dubbo_client-1.0.0b4-py2.7.egg
deleted file mode 100644
index a6d546a..0000000
--- a/dist/dubbo_client-1.0.0b4-py2.7.egg
+++ /dev/null
Binary files differ
diff --git a/dist/dubbo_client-1.0.0b5-py2.7.egg b/dist/dubbo_client-1.0.0b5-py2.7.egg
deleted file mode 100644
index de72a93..0000000
--- a/dist/dubbo_client-1.0.0b5-py2.7.egg
+++ /dev/null
Binary files differ
diff --git a/dubbo_client/common.py b/dubbo_client/common.py
index bc03749..a015eb6 100644
--- a/dubbo_client/common.py
+++ b/dubbo_client/common.py
@@ -12,6 +12,10 @@
     port = '9090'
     version = ''
     group = ''
+    disabled = False
+    weight = 100
+    has_disable_value = False
+    has_weight_value = False
 
     def __init__(self, url):
         result = urlparse(url)
@@ -28,4 +32,50 @@
             if pos > -1:
                 key = key[pos + 1:]
             # print key
+            if key == 'disabled':
+                value = value.lower() == 'true' if value else False
+                self.has_disable_value = True
+            elif key == 'weight':
+                value = int(value) if value else 100
+                self.has_weight_value = True
             self.__dict__[key] = value
+
+    def __repr__(self):
+        return str(self.__dict__)
+
+    def init_default_config(self):
+        """
+        恢复默认设置,dubbo配置是覆盖形式,如果恢复默认值,那么configurators下的配置会被清空
+        :return:
+        """
+        self.disabled = False
+        self.weight = 100
+
+    def set_config(self, url_list):
+        """
+        设置自定义dubbo配置
+        :param url_list:
+        :return:
+        """
+        if not url_list:
+            return
+
+        param_list = []
+        for configuration_url in url_list:
+            result = urlparse(configuration_url)
+            params = parse_qsl(result[4])
+            param_list.extend(params)
+        has_disable_value = False
+        has_weight_value = False
+        for key, value in param_list:
+            if key == 'disabled':
+                self.disabled = value.lower() == 'true' if value else False
+                has_disable_value = True
+            if key == 'weight':
+                self.weight = int(value) if value else 100
+                has_weight_value = True
+
+        if not has_disable_value:
+            self.disabled = False
+        if not has_weight_value:
+            self.weight = 100
diff --git a/dubbo_client/registry.py b/dubbo_client/registry.py
index c3b2671..6a09ba5 100644
--- a/dubbo_client/registry.py
+++ b/dubbo_client/registry.py
@@ -1,20 +1,21 @@
 # coding=utf-8
-import time
+import logging.config
 import os
+import os.path
+import random
 import socket
 import struct
-from threading import Thread
+import threading
+import time
 import urllib
-import logging
-import logging.config
-import os.path
-
-from kazoo.protocol.states import KazooState
+from threading import Thread
 
 from kazoo.client import KazooClient
+from kazoo.protocol.states import KazooState
 
-from dubbo_client.config import ApplicationConfig
 from dubbo_client.common import ServiceURL
+from dubbo_client.config import ApplicationConfig
+from dubbo_client.rpcerror import NoProvider
 
 __author__ = 'caozupeng'
 
@@ -35,7 +36,10 @@
     dict 格式为{interface:{providername:{ip+port:service_url}}}
 
     """
-    _service_provides = {}
+
+    def __init__(self):
+        self._service_providers = {}
+        self._mutex = threading.Lock()
 
     def _do_event(self, event):
         """
@@ -71,7 +75,7 @@
         """
         pass
 
-    def get_provides(self, interface, **kwargs):
+    def get_providers(self, interface, **kwargs):
         """
         获取已经注册的服务URL对象
         :param interface: com.ofpay.demo.api.UserProvider
@@ -84,6 +88,40 @@
         second = self._service_provides.get(interface, {})
         return second.get(key, {})
 
+    def get_random_provider(self, interface, **kwargs):
+        """
+        根据权重和是否禁用获取一个provider
+        :param interface:
+        :param kwargs:
+        :return:
+        """
+        group = kwargs.get('group', '')
+        version = kwargs.get('version', '')
+        key = self._to_key(interface, version, group)
+        second_dict = self._service_providers.get(interface, {})
+        service_url_list = [service_url for service_url in second_dict.get(key, {}).itervalues() if
+                            not service_url.disabled and service_url.weight > 0]
+        if not service_url_list:
+            raise NoProvider('can not find provider', interface)
+
+        total_weight = 0
+        same_weight = True
+        last_service_url = None
+        for service_url in service_url_list:
+            total_weight += service_url.weight
+            if same_weight and last_service_url and last_service_url.weight != service_url.weight:
+                same_weight = False
+            last_service_url = service_url
+
+        if total_weight > 0 and not same_weight:
+            offset = random.randint(0, total_weight - 1)
+            for service_url in service_url_list:
+                offset -= service_url.weight
+                if offset < 0:
+                    return service_url
+
+        return random.choice(service_url_list)
+
     def event_listener(self, event):
         """
         node provides上下线的监听回调函数
@@ -100,19 +138,19 @@
         """
         self._do_config_event(event)
 
-    def _to_key(self, interface, versioin, group):
+    def _to_key(self, interface, version, group):
         """
         计算存放在内存中的服务的key,以接口、版本、分组计算
         :param interface: 接口 类似com.ofpay.demo.DemoProvider
-        :param versioin: 版本 1.0
+        :param version: 版本 1.0
         :param group:  分组 product
         :return: key 字符串
         """
-        return '{0}|{1}|{2}'.format(interface, versioin, group)
+        return '{0}|{1}|{2}'.format(interface, version, group)
 
     def _add_node(self, interface, service_url):
         key = self._to_key(service_url.interface, service_url.version, service_url.group)
-        second_dict = self._service_provides.get(interface)
+        second_dict = self._service_providers.get(interface)
         if second_dict:
             # 获取最内层的nest的dict
             inner_dict = second_dict.get(key)
@@ -122,11 +160,11 @@
                 second_dict[key] = {service_url.location: service_url}
         else:
             # create the second dict
-            self._service_provides[interface] = {key: {service_url.location: service_url}}
+            self._service_providers[interface] = {key: {service_url.location: service_url}}
 
     def _remove_node(self, interface, service_url):
         key = self._to_key(service_url.interface, service_url.version, service_url.group)
-        second_dict = self._service_provides.get(interface)
+        second_dict = self._service_providers.get(interface)
         if second_dict:
             inner_dict = second_dict.get(key)
             if inner_dict:
@@ -144,16 +182,59 @@
         :param nodes: 节点列表
         :return: 不需要返回
         """
-        # 如果已经存在,首先删除原有的服务的集合
-        if interface in self._service_provides:
-            del self._service_provides[interface]
-            logger.debug("delete node {0}".format(interface))
-        for child_node in nodes:
-            node = urllib.unquote(child_node).decode('utf8')
-            logger.debug('child of node is {0}'.format(node))
-            if node.startswith('jsonrpc'):
-                service_url = ServiceURL(node)
-                self._add_node(interface, service_url)
+        if self._mutex.acquire():
+            # 存在并发问题,需要线程锁
+            try:
+                # 如果已经存在,首先删除原有的服务的集合
+                if interface in self._service_providers:
+                    del self._service_providers[interface]
+                    logger.debug("delete node {0}".format(interface))
+                for child_node in nodes:
+                    node = urllib.unquote(child_node).decode('utf8')
+                    logger.debug('child of node is {0}'.format(node))
+                    if node.startswith('jsonrpc'):
+                        service_url = ServiceURL(node)
+                        self._add_node(interface, service_url)
+            except Exception as e:
+                logger.warn('swap json-rpc provider error %s', str(e))
+            finally:
+                self._mutex.release()
+
+    def _set_provider_configuration(self, interface, nodes):
+        """
+        设置provider配置
+        :param interface:
+        :param nodes:
+        :return:
+        """
+        if not nodes:
+            return
+        try:
+            configuration_dict = {}
+            for _child_node in nodes:
+                _node = urllib.unquote(_child_node).decode('utf8')
+                if _node.startswith('override'):
+                    service_url = ServiceURL(_node)
+                    key = self._to_key(interface, service_url.version, service_url.group)
+
+                    if key not in configuration_dict:
+                        configuration_dict[key] = {}
+                    if service_url.location not in configuration_dict[key]:
+                        configuration_dict[key][service_url.location] = []
+                    configuration_dict[key][service_url.location].append(_node)
+
+            if interface in self._service_providers:
+                provider_dict = self._service_providers.get(interface)
+                for provider_key, second_dict in provider_dict.iteritems():
+                    for service_location, service_url in second_dict.iteritems():
+                        configuration_service_urls = configuration_dict.get(provider_key, {}).get(service_location)
+                        if not configuration_service_urls:
+                            service_url.init_default_config()
+                        else:
+                            service_url.set_config(configuration_service_urls)
+
+        except Exception as e:
+            logger.warn('set provider configuration error %s', str(e))
 
 
 class ZookeeperRegistry(Registry):
@@ -161,6 +242,7 @@
     _connect_state = 'UNCONNECT'
 
     def __init__(self, zk_hosts, application_config=None):
+        Registry.__init__(self)
         if application_config:
             self._app_config = application_config
         self.__zk = KazooClient(hosts=zk_hosts)
@@ -188,17 +270,27 @@
         # 如果要删除,必须先把/dubbo/和最后的/providers去掉
         # 将zookeeper中查询到的服务节点列表加入到一个dict中
         # zookeeper中保持的节点url类似如下
-        logger.debug("receive event is {0}, event state is {1}".format(event, event.state))
+        logger.info("receive event is {0}, event state is {1}".format(event, event.state))
         provide_name = event.path[7:event.path.rfind('/')]
-        if event.state == 'CONNECTED':
+        if event.state in ['CONNECTED', 'DELETED']:
             children = self.__zk.get_children(event.path, watch=self.event_listener)
             self._compare_swap_nodes(provide_name, self.__unquote(children))
-        if event.state == 'DELETED':
-            children = self.__zk.get_children(event.path, watch=self.event_listener)
-            self._compare_swap_nodes(provide_name, self.__unquote(children))
+            configurators_nodes = self._get_provider_configuration(provide_name)
+            self._set_provider_configuration(provide_name, configurators_nodes)
+        print self._service_providers
 
     def _do_config_event(self, event):
-        print event
+        """
+        zk的目录路径为 /dubbo/com.qianmi.pc.api.es.item.EsGoodsQueryProvider/configurators
+        :param event:
+        :return:
+        """
+        logger.info("receive config event is {0}, event state is {1}".format(event, event.state))
+        provide_name = event.path[7:event.path.rfind('/')]
+        configurators_nodes = self._get_provider_configuration(provide_name)
+        self._set_provider_configuration(provide_name, configurators_nodes)
+
+        print self._service_providers
 
     def register(self, interface, **kwargs):
         ip = self.__zk._connection._socket.getsockname()[0]
@@ -233,11 +325,28 @@
         providers_children = self.__zk.get_children('{0}/{1}/{2}'.format('dubbo', interface, 'providers'),
                                                     watch=self.event_listener)
         logger.debug("watch node is {0}".format(providers_children))
-        configurators_children = self.__zk.get_children('{0}/{1}/{2}'.format('dubbo', interface, 'configurators'),
-                                                        watch=self.configuration_listener)
+        self.__zk.get_children('{0}/{1}/{2}'.format('dubbo', interface, 'configurators'),
+                               watch=self.configuration_listener)
         # 全部重新添加
         self._compare_swap_nodes(interface, self.__unquote(providers_children))
 
+        configurators_nodes = self._get_provider_configuration(interface)
+        self._set_provider_configuration(interface, configurators_nodes)
+
+    def _get_provider_configuration(self, interface):
+        """
+        获取dubbo自定义配置数据,从"/dubbo/{interface}/configurators" 路径下获取配置
+        :param interface:
+        :return:
+        """
+        try:
+            configurators_nodes = self.__zk.get_children('{0}/{1}/{2}'.format('dubbo', interface, 'configurators'),
+                                                         watch=self.configuration_listener)
+            logger.debug("configurators node is {0}".format(configurators_nodes))
+            return self.__unquote(configurators_nodes)
+        except Exception as e:
+            logger.warn("get provider %s configuration error %s", interface, str(e))
+
 
 class MulticastRegistry(Registry):
     class _Loop(Thread):
@@ -262,6 +371,7 @@
             self.sock.sendto(msg, (self.multicast_group, int(self.multicast_port)))
 
     def __init__(self, address, application_config=None):
+        Registry.__init__(self)
         if application_config:
             self._app_config = application_config
         self.event_loop = self._Loop(address, self.event_listener)
@@ -303,6 +413,6 @@
     # registry = MulticastRegistry('224.5.6.7:1234')
     registry = ZookeeperRegistry('zookeeper:2181')
     registry.subscribe('com.ofpay.demo.api.UserProvider')
-    print registry.get_provides('com.ofpay.demo.api.UserProvider')
+    print registry.get_providers('com.ofpay.demo.api.UserProvider')
 
     time.sleep(500)
diff --git a/dubbo_client/rpclib.py b/dubbo_client/rpclib.py
index e9e49f3..03e3719 100644
--- a/dubbo_client/rpclib.py
+++ b/dubbo_client/rpclib.py
@@ -1,11 +1,10 @@
 # coding=utf-8
-import random
 from urllib2 import HTTPError
 
 from pyjsonrpc import HttpClient, JsonRpcError
 
 from dubbo_client.registry import Registry
-from dubbo_client.rpcerror import NoProvider, ConnectionFail, dubbo_client_errors, InternalError, DubboClientError
+from dubbo_client.rpcerror import ConnectionFail, dubbo_client_errors, InternalError, DubboClientError
 
 __author__ = 'caozupeng'
 
@@ -34,12 +33,9 @@
         self.registry.register(interface)
 
     def call(self, method, *args, **kwargs):
-        provides = self.registry.get_provides(self.interface, version=self.version, group=self.group)
-        if len(provides) == 0:
-            raise NoProvider('can not find provide', self.interface)
-        ip_port, service_url = random.choice(provides.items())
+        provider = self.registry.get_random_provider(self.interface, version=self.version, group=self.group)
         # print service_url.location
-        client = HttpClient(url="http://{0}{1}".format(ip_port, service_url.path))
+        client = HttpClient(url="http://{0}{1}".format(provider.location, provider.path))
         try:
             return client.call(method, *args, **kwargs)
         except HTTPError, e:
diff --git a/tests/test_register_config.py b/tests/test_register_config.py
new file mode 100644
index 0000000..57ec186
--- /dev/null
+++ b/tests/test_register_config.py
@@ -0,0 +1,22 @@
+# coding=utf-8
+import time
+
+from dubbo_client import ApplicationConfig
+from dubbo_client import DubboClient, DubboClientError
+from dubbo_client import ZookeeperRegistry
+
+
+def test_config_init():
+    config = ApplicationConfig('test_register_config')
+    service_interface = 'com.ofpay.demo.api.UserProvider'
+    registry = ZookeeperRegistry('172.19.66.49:2181', config)
+    user_provider = DubboClient(service_interface, registry, version='1.0.0')
+    for i in range(10000):
+        try:
+            print user_provider.findOne()
+        except DubboClientError, client_error:
+            print client_error
+        time.sleep(1)
+
+if __name__ == '__main__':
+    test_config_init()
diff --git a/tests/test_registry.py b/tests/test_registry.py
index 9a7ef75..180a00c 100644
--- a/tests/test_registry.py
+++ b/tests/test_registry.py
@@ -6,13 +6,13 @@
 def multicat():
     registry = MulticastRegistry('224.5.6.7:1234')
     registry.subscribe('com.ofpay.demo.api.UserProvider')
-    print registry.get_provides('com.ofpay.demo.api.UserProvider')
+    print registry.get_providers('com.ofpay.demo.api.UserProvider')
 
 
 def zookeeper():
     registry = ZookeeperRegistry('172.19.65.33:2181')
     registry.subscribe('com.ofpay.demo.api.UserProvider')
-    print registry.get_provides('com.ofpay.demo.api.UserProvider')
+    print registry.get_providers('com.ofpay.demo.api.UserProvider')
 
 
 def test_registry():
@@ -29,7 +29,7 @@
                        "dubbo=2.4.10&environment=product&interface=com.ofpay.demo.api.UserProvider&"
                        "methods=getUser,queryAll,isLimit,queryUser&owner=wenwu&pid=60402&revision=2.0&"
                        "side=provider&timestamp=1429105028153&version=1.0")
-    assert registry._service_provides
+    assert registry._service_providers
 
 
 if __name__ == '__main__':