将与和注册中心类型无关的代码抽象到父类
diff --git a/dubbo_client/registry.py b/dubbo_client/registry.py
index 7c42a7d..2ec862a 100644
--- a/dubbo_client/registry.py
+++ b/dubbo_client/registry.py
@@ -1,4 +1,6 @@
# coding=utf-8
+import time
+
__author__ = 'caozupeng'
import os
import socket
@@ -16,9 +18,22 @@
class Registry(object):
+ """
+ 所有注册过的服务端将在这里
+ interface=com.ofpay.demo.DemoService
+ location = ip:port/url 比如 172.19.20.111:38080/com.ofpay.demo.DemoService2
+ providername = servicename|version|group
+ dict 格式为{interface:{providername:{ip+port:service_url}}}
+
+ """
_service_provides = {}
def _do_event(self, event):
+ """
+ protect方法,处理回调,留给子类实现
+ :param event:
+ :return:
+ """
pass
def register(self, interface, **kwargs):
@@ -61,12 +76,40 @@
self._do_event(event)
def _to_key(self, interface, versioin, group):
+ """
+ 计算存放在内存中的服务的key,以接口、版本、分组计算
+ :param interface: 接口 类似com.ofpay.demo.DemoProvider
+ :param versioin: 版本 1.0
+ :param group: 分组 product
+ :return: key 字符串
+ """
return '{0}|{1}|{2}'.format(interface, versioin, group)
- def _handler_nodes(self, interface, nodes):
+ def _add_node(self, interface, service_url):
+ key = self._to_key(service_url.interface, service_url.version, service_url.group)
+ second_dict = self._service_provides.get(interface)
+ if second_dict:
+ # 获取最内层的nest的dict
+ inner_dict = second_dict.get(key)
+ if inner_dict:
+ inner_dict[service_url.location] = service_url
+ else:
+ second_dict[key] = {service_url.location: service_url}
+ else:
+ # create the second dict
+ self._service_provides[interface] = {key: {service_url.location: service_url}}
+
+ def _remove_node(self, interface, service_url):
+ key = self._to_key(service_url.interface, service_url.version, service_url.group)
+ second_dict = self._service_provides.get(interface)
+ if second_dict:
+ inner_dict = second_dict.get(key)
+ if inner_dict:
+ del inner_dict[service_url.location]
+
+ def _compare_swap_nodes(self, interface, nodes):
"""
- 将zookeeper中查询到的服务节点列表加入到一个dict中
- zookeeper中保持的节点url类似如下
+ 比较,替换现有内存中的节点信息,节点url类似如下
jsonrpc://192.168.2.1:38080/com.ofpay.demo.api.UserProvider?
anyhost=true&application=demo-provider&default.timeout=10000&dubbo=2.4.10&
environment=product&interface=com.ofpay.demo.api.UserProvider&
@@ -83,29 +126,10 @@
node = urllib.unquote(child_node).decode('utf8')
if node.startswith('jsonrpc'):
service_url = ServiceURL(node)
- key = self._to_key(service_url.interface, service_url.version, service_url.group)
- second_dict = self._service_provides.get(interface)
- if second_dict:
- # 获取最内层的nest的dict
- inner_dict = second_dict.get(key)
- if inner_dict:
- inner_dict[service_url.location] = service_url
- else:
- second_dict[key] = {service_url.location: service_url}
- else:
- # create the second dict
- self._service_provides[interface] = {key: {service_url.location: service_url}}
+ self._add_node(interface, service_url)
class ZookeeperRegistry(Registry):
- """
- 所有注册过的服务端将在这里
- interface=com.ofpay.demo.DemoService
- location = ip:port/url 比如 172.19.20.111:38080/com.ofpay.demo.DemoService2
- providername = servicename|version|group
- dict 格式为{interface:{providername:{ip+port:service_url}}}
-
- """
_app_config = ApplicationConfig('default_app')
_connect_state = 'UNCONNECT'
@@ -129,16 +153,21 @@
# print 'connected'
self._connect_state = state
+ def __unquote(self, origin_nodes):
+ return (urllib.unquote(child_node).decode('utf8') for child_node in origin_nodes if child_node)
+
def _do_event(self, event):
# event.path 是类似/dubbo/com.ofpay.demo.api.UserProvider/providers 这样的
# 如果要删除,必须先把/dubbo/和最后的/providers去掉
+ # 将zookeeper中查询到的服务节点列表加入到一个dict中
+ # zookeeper中保持的节点url类似如下
provide_name = event.path[7:event.path.rfind('/')]
if event.state == 'CONNECTED':
children = self.__zk.get_children(event.path, watch=self.event_listener)
- self._handler_nodes(provide_name, children)
+ self._compare_swap_nodes(provide_name, self.__unquote(children))
if event.state == 'DELETED':
children = self.__zk.get_children(event.path, watch=self.event_listener)
- self._handler_nodes(provide_name, children)
+ self._compare_swap_nodes(provide_name, self.__unquote(children))
def register(self, interface, **kwargs):
ip = self.__zk._connection._socket.getsockname()[0]
@@ -160,6 +189,7 @@
# print urllib.quote(url, safe='')
consumer_path = '{0}/{1}/{2}'.format('dubbo', interface, 'consumers')
+ self.__zk.ensure_path(consumer_path)
self.__zk.create(consumer_path + '/' + urllib.quote(url, safe=''), ephemeral=True)
def subscribe(self, interface, **kwargs):
@@ -173,51 +203,61 @@
children = self.__zk.get_children('{0}/{1}/{2}'.format('dubbo', interface, 'providers'),
watch=self.event_listener)
# 全部重新添加
- self._handler_nodes(interface, children)
+ self._compare_swap_nodes(interface, self.__unquote(children))
class MulticastRegistry(Registry):
-
class _Loop(Thread):
def __init__(self, address, callback):
Thread.__init__(self)
- multicast_group, multicast_port = address.split(':')
+ self.multicast_group, self.multicast_port = address.split(':')
self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
+ # in osx we should use SO_REUSEPORT instead of SO_REUSEADDRESS
self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
- self.sock.bind(('', int(multicast_port)))
- mreq = struct.pack("4sl", socket.inet_aton(multicast_group), socket.INADDR_ANY)
+ self.sock.bind(('', int(self.multicast_port)))
+ mreq = struct.pack("4sl", socket.inet_aton(self.multicast_group), socket.INADDR_ANY)
self.sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)
self.callback = callback
def run(self):
while True:
event = self.sock.recv(10240)
- # print event
+ print event
self.callback(event.rstrip())
+ def set_mssage(self, msg):
+ self.sock.sendto(msg, (self.multicast_group, int(self.multicast_port)))
+
+
def __init__(self, address, application_config=None):
if application_config:
self._app_config = application_config
- self._Loop(address, self.event_listener).start()
+ self.event_loop = self._Loop(address, self.event_listener)
+ self.event_loop.setDaemon(True)
+ self.event_loop.start()
+ # self.event_loop.set_mssage('subscribe provider://172.19.3.111:38081/com.ofpay.demo.api.UserProvider?anyhost=true&application=jsonrpcdemo&category=configurators&check=false&default.timeout=10000&dubbo=2.4.10&environment=product&interface=com.ofpay.demo.api.UserProvider&methods=getUser,queryAll,queryUser,isLimit&owner=wenwu&pid=63590&side=provider×tamp=1429149716694')
def _do_event(self, event):
if event.startswith('register'):
url = event[9:]
- # print url
- service_provide = ServiceURL(url)
- self._handler_nodes(service_provide.interface, (url,))
- # print self._service_provides
-
+ if url.startswith('jsonrpc'):
+ service_provide = ServiceURL(url)
+ self._add_node(service_provide.interface, service_provide)
+ if event.startswith('unregister'):
+ url = event[11:]
+ if url.startswith('jsonrpc'):
+ service_provide = ServiceURL(url)
+ self._remove_node(service_provide.interface, service_provide)
if __name__ == '__main__':
# zk = KazooClient(hosts='192.168.59.103:2181')
# zk.start()
- # parent_node = '{0}/{1}/{2}'.format('dubbo', 'com.ofpay.demo.api.UserProvider', 'consumers')
+ # parent_node = '{0}/{1}/{2}'.format('dubbo', 'com.ofpay.demo.api.UserProvider', '')
# nodes = zk.get_children(parent_node)
# for child_node in nodes:
# node = urllib.unquote(child_node).decode('utf8')
# print node
# zk.delete(parent_node+'/'+child_node, recursive=True)
- # registry = MulticastRegistry('224.5.6.7:1234')
- pass
\ No newline at end of file
+ registry = MulticastRegistry('224.5.6.7:1234')
+ time.sleep(50)
\ No newline at end of file