Merge pull request #4 from jingpeicomp/master
支持权重和provider disable
diff --git a/.gitignore b/.gitignore
index 2f13ce3..3cb9027 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,3 +1,3 @@
*.pyc
-build
-*.egg-info
\ No newline at end of file
+.idea
+*.log
\ No newline at end of file
diff --git a/dist/dubbo-client-1.0.0b2.tar.gz b/dist/dubbo-client-1.0.0b2.tar.gz
deleted file mode 100644
index a889cff..0000000
--- a/dist/dubbo-client-1.0.0b2.tar.gz
+++ /dev/null
Binary files differ
diff --git a/dist/dubbo-client-1.0.0b3.tar.gz b/dist/dubbo-client-1.0.0b3.tar.gz
deleted file mode 100644
index 3f611d6..0000000
--- a/dist/dubbo-client-1.0.0b3.tar.gz
+++ /dev/null
Binary files differ
diff --git a/dist/dubbo-client-1.0.0b4.tar.gz b/dist/dubbo-client-1.0.0b4.tar.gz
deleted file mode 100644
index acfa439..0000000
--- a/dist/dubbo-client-1.0.0b4.tar.gz
+++ /dev/null
Binary files differ
diff --git a/dist/dubbo-client-1.0.0b5.tar.gz b/dist/dubbo-client-1.0.0b5.tar.gz
deleted file mode 100644
index 5c0a555..0000000
--- a/dist/dubbo-client-1.0.0b5.tar.gz
+++ /dev/null
Binary files differ
diff --git a/dist/dubbo_client-1.0.0b2-py2.7.egg b/dist/dubbo_client-1.0.0b2-py2.7.egg
deleted file mode 100644
index f6f452f..0000000
--- a/dist/dubbo_client-1.0.0b2-py2.7.egg
+++ /dev/null
Binary files differ
diff --git a/dist/dubbo_client-1.0.0b3-py2.7.egg b/dist/dubbo_client-1.0.0b3-py2.7.egg
deleted file mode 100644
index 1a1a312..0000000
--- a/dist/dubbo_client-1.0.0b3-py2.7.egg
+++ /dev/null
Binary files differ
diff --git a/dist/dubbo_client-1.0.0b4-py2.7.egg b/dist/dubbo_client-1.0.0b4-py2.7.egg
deleted file mode 100644
index a6d546a..0000000
--- a/dist/dubbo_client-1.0.0b4-py2.7.egg
+++ /dev/null
Binary files differ
diff --git a/dist/dubbo_client-1.0.0b5-py2.7.egg b/dist/dubbo_client-1.0.0b5-py2.7.egg
deleted file mode 100644
index de72a93..0000000
--- a/dist/dubbo_client-1.0.0b5-py2.7.egg
+++ /dev/null
Binary files differ
diff --git a/dubbo_client/common.py b/dubbo_client/common.py
index bc03749..a015eb6 100644
--- a/dubbo_client/common.py
+++ b/dubbo_client/common.py
@@ -12,6 +12,10 @@
port = '9090'
version = ''
group = ''
+ disabled = False
+ weight = 100
+ has_disable_value = False
+ has_weight_value = False
def __init__(self, url):
result = urlparse(url)
@@ -28,4 +32,50 @@
if pos > -1:
key = key[pos + 1:]
# print key
+ if key == 'disabled':
+ value = value.lower() == 'true' if value else False
+ self.has_disable_value = True
+ elif key == 'weight':
+ value = int(value) if value else 100
+ self.has_weight_value = True
self.__dict__[key] = value
+
+ def __repr__(self):
+ return str(self.__dict__)
+
+ def init_default_config(self):
+ """
+ 恢复默认设置,dubbo配置是覆盖形式,如果恢复默认值,那么configurators下的配置会被清空
+ :return:
+ """
+ self.disabled = False
+ self.weight = 100
+
+ def set_config(self, url_list):
+ """
+ 设置自定义dubbo配置
+ :param url_list:
+ :return:
+ """
+ if not url_list:
+ return
+
+ param_list = []
+ for configuration_url in url_list:
+ result = urlparse(configuration_url)
+ params = parse_qsl(result[4])
+ param_list.extend(params)
+ has_disable_value = False
+ has_weight_value = False
+ for key, value in param_list:
+ if key == 'disabled':
+ self.disabled = value.lower() == 'true' if value else False
+ has_disable_value = True
+ if key == 'weight':
+ self.weight = int(value) if value else 100
+ has_weight_value = True
+
+ if not has_disable_value:
+ self.disabled = False
+ if not has_weight_value:
+ self.weight = 100
diff --git a/dubbo_client/registry.py b/dubbo_client/registry.py
index c3b2671..6a09ba5 100644
--- a/dubbo_client/registry.py
+++ b/dubbo_client/registry.py
@@ -1,20 +1,21 @@
# coding=utf-8
-import time
+import logging.config
import os
+import os.path
+import random
import socket
import struct
-from threading import Thread
+import threading
+import time
import urllib
-import logging
-import logging.config
-import os.path
-
-from kazoo.protocol.states import KazooState
+from threading import Thread
from kazoo.client import KazooClient
+from kazoo.protocol.states import KazooState
-from dubbo_client.config import ApplicationConfig
from dubbo_client.common import ServiceURL
+from dubbo_client.config import ApplicationConfig
+from dubbo_client.rpcerror import NoProvider
__author__ = 'caozupeng'
@@ -35,7 +36,10 @@
dict 格式为{interface:{providername:{ip+port:service_url}}}
"""
- _service_provides = {}
+
+ def __init__(self):
+ self._service_providers = {}
+ self._mutex = threading.Lock()
def _do_event(self, event):
"""
@@ -71,7 +75,7 @@
"""
pass
- def get_provides(self, interface, **kwargs):
+ def get_providers(self, interface, **kwargs):
"""
获取已经注册的服务URL对象
:param interface: com.ofpay.demo.api.UserProvider
@@ -84,6 +88,40 @@
second = self._service_provides.get(interface, {})
return second.get(key, {})
+ def get_random_provider(self, interface, **kwargs):
+ """
+ 根据权重和是否禁用获取一个provider
+ :param interface:
+ :param kwargs:
+ :return:
+ """
+ group = kwargs.get('group', '')
+ version = kwargs.get('version', '')
+ key = self._to_key(interface, version, group)
+ second_dict = self._service_providers.get(interface, {})
+ service_url_list = [service_url for service_url in second_dict.get(key, {}).itervalues() if
+ not service_url.disabled and service_url.weight > 0]
+ if not service_url_list:
+ raise NoProvider('can not find provider', interface)
+
+ total_weight = 0
+ same_weight = True
+ last_service_url = None
+ for service_url in service_url_list:
+ total_weight += service_url.weight
+ if same_weight and last_service_url and last_service_url.weight != service_url.weight:
+ same_weight = False
+ last_service_url = service_url
+
+ if total_weight > 0 and not same_weight:
+ offset = random.randint(0, total_weight - 1)
+ for service_url in service_url_list:
+ offset -= service_url.weight
+ if offset < 0:
+ return service_url
+
+ return random.choice(service_url_list)
+
def event_listener(self, event):
"""
node provides上下线的监听回调函数
@@ -100,19 +138,19 @@
"""
self._do_config_event(event)
- def _to_key(self, interface, versioin, group):
+ def _to_key(self, interface, version, group):
"""
计算存放在内存中的服务的key,以接口、版本、分组计算
:param interface: 接口 类似com.ofpay.demo.DemoProvider
- :param versioin: 版本 1.0
+ :param version: 版本 1.0
:param group: 分组 product
:return: key 字符串
"""
- return '{0}|{1}|{2}'.format(interface, versioin, group)
+ return '{0}|{1}|{2}'.format(interface, version, group)
def _add_node(self, interface, service_url):
key = self._to_key(service_url.interface, service_url.version, service_url.group)
- second_dict = self._service_provides.get(interface)
+ second_dict = self._service_providers.get(interface)
if second_dict:
# 获取最内层的nest的dict
inner_dict = second_dict.get(key)
@@ -122,11 +160,11 @@
second_dict[key] = {service_url.location: service_url}
else:
# create the second dict
- self._service_provides[interface] = {key: {service_url.location: service_url}}
+ self._service_providers[interface] = {key: {service_url.location: service_url}}
def _remove_node(self, interface, service_url):
key = self._to_key(service_url.interface, service_url.version, service_url.group)
- second_dict = self._service_provides.get(interface)
+ second_dict = self._service_providers.get(interface)
if second_dict:
inner_dict = second_dict.get(key)
if inner_dict:
@@ -144,16 +182,59 @@
:param nodes: 节点列表
:return: 不需要返回
"""
- # 如果已经存在,首先删除原有的服务的集合
- if interface in self._service_provides:
- del self._service_provides[interface]
- logger.debug("delete node {0}".format(interface))
- for child_node in nodes:
- node = urllib.unquote(child_node).decode('utf8')
- logger.debug('child of node is {0}'.format(node))
- if node.startswith('jsonrpc'):
- service_url = ServiceURL(node)
- self._add_node(interface, service_url)
+ if self._mutex.acquire():
+ # 存在并发问题,需要线程锁
+ try:
+ # 如果已经存在,首先删除原有的服务的集合
+ if interface in self._service_providers:
+ del self._service_providers[interface]
+ logger.debug("delete node {0}".format(interface))
+ for child_node in nodes:
+ node = urllib.unquote(child_node).decode('utf8')
+ logger.debug('child of node is {0}'.format(node))
+ if node.startswith('jsonrpc'):
+ service_url = ServiceURL(node)
+ self._add_node(interface, service_url)
+ except Exception as e:
+ logger.warn('swap json-rpc provider error %s', str(e))
+ finally:
+ self._mutex.release()
+
+ def _set_provider_configuration(self, interface, nodes):
+ """
+ 设置provider配置
+ :param interface:
+ :param nodes:
+ :return:
+ """
+ if not nodes:
+ return
+ try:
+ configuration_dict = {}
+ for _child_node in nodes:
+ _node = urllib.unquote(_child_node).decode('utf8')
+ if _node.startswith('override'):
+ service_url = ServiceURL(_node)
+ key = self._to_key(interface, service_url.version, service_url.group)
+
+ if key not in configuration_dict:
+ configuration_dict[key] = {}
+ if service_url.location not in configuration_dict[key]:
+ configuration_dict[key][service_url.location] = []
+ configuration_dict[key][service_url.location].append(_node)
+
+ if interface in self._service_providers:
+ provider_dict = self._service_providers.get(interface)
+ for provider_key, second_dict in provider_dict.iteritems():
+ for service_location, service_url in second_dict.iteritems():
+ configuration_service_urls = configuration_dict.get(provider_key, {}).get(service_location)
+ if not configuration_service_urls:
+ service_url.init_default_config()
+ else:
+ service_url.set_config(configuration_service_urls)
+
+ except Exception as e:
+ logger.warn('set provider configuration error %s', str(e))
class ZookeeperRegistry(Registry):
@@ -161,6 +242,7 @@
_connect_state = 'UNCONNECT'
def __init__(self, zk_hosts, application_config=None):
+ Registry.__init__(self)
if application_config:
self._app_config = application_config
self.__zk = KazooClient(hosts=zk_hosts)
@@ -188,17 +270,27 @@
# 如果要删除,必须先把/dubbo/和最后的/providers去掉
# 将zookeeper中查询到的服务节点列表加入到一个dict中
# zookeeper中保持的节点url类似如下
- logger.debug("receive event is {0}, event state is {1}".format(event, event.state))
+ logger.info("receive event is {0}, event state is {1}".format(event, event.state))
provide_name = event.path[7:event.path.rfind('/')]
- if event.state == 'CONNECTED':
+ if event.state in ['CONNECTED', 'DELETED']:
children = self.__zk.get_children(event.path, watch=self.event_listener)
self._compare_swap_nodes(provide_name, self.__unquote(children))
- if event.state == 'DELETED':
- children = self.__zk.get_children(event.path, watch=self.event_listener)
- self._compare_swap_nodes(provide_name, self.__unquote(children))
+ configurators_nodes = self._get_provider_configuration(provide_name)
+ self._set_provider_configuration(provide_name, configurators_nodes)
+ print self._service_providers
def _do_config_event(self, event):
- print event
+ """
+ zk的目录路径为 /dubbo/com.qianmi.pc.api.es.item.EsGoodsQueryProvider/configurators
+ :param event:
+ :return:
+ """
+ logger.info("receive config event is {0}, event state is {1}".format(event, event.state))
+ provide_name = event.path[7:event.path.rfind('/')]
+ configurators_nodes = self._get_provider_configuration(provide_name)
+ self._set_provider_configuration(provide_name, configurators_nodes)
+
+ print self._service_providers
def register(self, interface, **kwargs):
ip = self.__zk._connection._socket.getsockname()[0]
@@ -233,11 +325,28 @@
providers_children = self.__zk.get_children('{0}/{1}/{2}'.format('dubbo', interface, 'providers'),
watch=self.event_listener)
logger.debug("watch node is {0}".format(providers_children))
- configurators_children = self.__zk.get_children('{0}/{1}/{2}'.format('dubbo', interface, 'configurators'),
- watch=self.configuration_listener)
+ self.__zk.get_children('{0}/{1}/{2}'.format('dubbo', interface, 'configurators'),
+ watch=self.configuration_listener)
# 全部重新添加
self._compare_swap_nodes(interface, self.__unquote(providers_children))
+ configurators_nodes = self._get_provider_configuration(interface)
+ self._set_provider_configuration(interface, configurators_nodes)
+
+ def _get_provider_configuration(self, interface):
+ """
+ 获取dubbo自定义配置数据,从"/dubbo/{interface}/configurators" 路径下获取配置
+ :param interface:
+ :return:
+ """
+ try:
+ configurators_nodes = self.__zk.get_children('{0}/{1}/{2}'.format('dubbo', interface, 'configurators'),
+ watch=self.configuration_listener)
+ logger.debug("configurators node is {0}".format(configurators_nodes))
+ return self.__unquote(configurators_nodes)
+ except Exception as e:
+ logger.warn("get provider %s configuration error %s", interface, str(e))
+
class MulticastRegistry(Registry):
class _Loop(Thread):
@@ -262,6 +371,7 @@
self.sock.sendto(msg, (self.multicast_group, int(self.multicast_port)))
def __init__(self, address, application_config=None):
+ Registry.__init__(self)
if application_config:
self._app_config = application_config
self.event_loop = self._Loop(address, self.event_listener)
@@ -303,6 +413,6 @@
# registry = MulticastRegistry('224.5.6.7:1234')
registry = ZookeeperRegistry('zookeeper:2181')
registry.subscribe('com.ofpay.demo.api.UserProvider')
- print registry.get_provides('com.ofpay.demo.api.UserProvider')
+ print registry.get_providers('com.ofpay.demo.api.UserProvider')
time.sleep(500)
diff --git a/dubbo_client/rpclib.py b/dubbo_client/rpclib.py
index e9e49f3..03e3719 100644
--- a/dubbo_client/rpclib.py
+++ b/dubbo_client/rpclib.py
@@ -1,11 +1,10 @@
# coding=utf-8
-import random
from urllib2 import HTTPError
from pyjsonrpc import HttpClient, JsonRpcError
from dubbo_client.registry import Registry
-from dubbo_client.rpcerror import NoProvider, ConnectionFail, dubbo_client_errors, InternalError, DubboClientError
+from dubbo_client.rpcerror import ConnectionFail, dubbo_client_errors, InternalError, DubboClientError
__author__ = 'caozupeng'
@@ -34,12 +33,9 @@
self.registry.register(interface)
def call(self, method, *args, **kwargs):
- provides = self.registry.get_provides(self.interface, version=self.version, group=self.group)
- if len(provides) == 0:
- raise NoProvider('can not find provide', self.interface)
- ip_port, service_url = random.choice(provides.items())
+ provider = self.registry.get_random_provider(self.interface, version=self.version, group=self.group)
# print service_url.location
- client = HttpClient(url="http://{0}{1}".format(ip_port, service_url.path))
+ client = HttpClient(url="http://{0}{1}".format(provider.location, provider.path))
try:
return client.call(method, *args, **kwargs)
except HTTPError, e:
diff --git a/tests/test_register_config.py b/tests/test_register_config.py
new file mode 100644
index 0000000..57ec186
--- /dev/null
+++ b/tests/test_register_config.py
@@ -0,0 +1,22 @@
+# coding=utf-8
+import time
+
+from dubbo_client import ApplicationConfig
+from dubbo_client import DubboClient, DubboClientError
+from dubbo_client import ZookeeperRegistry
+
+
+def test_config_init():
+ config = ApplicationConfig('test_register_config')
+ service_interface = 'com.ofpay.demo.api.UserProvider'
+ registry = ZookeeperRegistry('172.19.66.49:2181', config)
+ user_provider = DubboClient(service_interface, registry, version='1.0.0')
+ for i in range(10000):
+ try:
+ print user_provider.findOne()
+ except DubboClientError, client_error:
+ print client_error
+ time.sleep(1)
+
+if __name__ == '__main__':
+ test_config_init()
diff --git a/tests/test_registry.py b/tests/test_registry.py
index 9a7ef75..180a00c 100644
--- a/tests/test_registry.py
+++ b/tests/test_registry.py
@@ -6,13 +6,13 @@
def multicat():
registry = MulticastRegistry('224.5.6.7:1234')
registry.subscribe('com.ofpay.demo.api.UserProvider')
- print registry.get_provides('com.ofpay.demo.api.UserProvider')
+ print registry.get_providers('com.ofpay.demo.api.UserProvider')
def zookeeper():
registry = ZookeeperRegistry('172.19.65.33:2181')
registry.subscribe('com.ofpay.demo.api.UserProvider')
- print registry.get_provides('com.ofpay.demo.api.UserProvider')
+ print registry.get_providers('com.ofpay.demo.api.UserProvider')
def test_registry():
@@ -29,7 +29,7 @@
"dubbo=2.4.10&environment=product&interface=com.ofpay.demo.api.UserProvider&"
"methods=getUser,queryAll,isLimit,queryUser&owner=wenwu&pid=60402&revision=2.0&"
"side=provider×tamp=1429105028153&version=1.0")
- assert registry._service_provides
+ assert registry._service_providers
if __name__ == '__main__':