将Registry代码合并为一个Class
diff --git a/dubbo_client/__init__.py b/dubbo_client/__init__.py
index 7feb134..0d2608a 100644
--- a/dubbo_client/__init__.py
+++ b/dubbo_client/__init__.py
@@ -3,4 +3,9 @@
from rpc import (
DubboClient,
)
-from rpcerror import *
\ No newline at end of file
+from rpcerror import *
+
+from registry import (
+ Registry,
+ ZookeeperRegistry,
+)
\ No newline at end of file
diff --git a/dubbo_client/common.py b/dubbo_client/common.py
index ff103ba..2b49e6c 100644
--- a/dubbo_client/common.py
+++ b/dubbo_client/common.py
@@ -25,5 +25,5 @@
pos = key.find('.')
if pos > -1:
key = key[pos + 1:]
- print key
+ # print key
self.__dict__[key] = value
\ No newline at end of file
diff --git a/dubbo_client/registry.py b/dubbo_client/registry.py
index 025782a..d64be6e 100644
--- a/dubbo_client/registry.py
+++ b/dubbo_client/registry.py
@@ -1,6 +1,4 @@
# coding=utf-8
-import Queue
-from threading import Thread
import urllib
from kazoo.protocol.states import KazooState
@@ -10,96 +8,105 @@
__author__ = 'caozupeng'
from kazoo.client import KazooClient
-import logging
-
-logging.basicConfig()
-zk = KazooClient(hosts='172.19.65.33:2181', read_only=True)
-"""
-所有注册过的服务端将在这里
-格式为{providername:{ip+port:service}}
-providername = group_version_servicename
-"""
-service_provides = {}
-def state_listener(state):
- if state == KazooState.LOST:
- # Register somewhere that the session was lost
- print 'session lost'
- elif state == KazooState.SUSPENDED:
- # Handle being disconnected from Zookeeper
- print 'disconnect from zookeeper'
- else:
- # Handle being connected/reconnected to Zookeeper
- print 'connected'
+class Registry(object):
+ def add_provider_listener(self, provide_name):
+ """
+ 监听注册中心的服务上下线
+ :param provide_name: 类似com.ofpay.demo.api.UserProvider这样的服务名
+ :return: 无返回
+ """
+ pass
+
+ def get_provides(self, provide_name, default=None):
+ """
+ 获取已经注册的服务URL对象
+ :param provide_name: com.ofpay.demo.api.UserProvider
+ :param default:
+ :return: 返回一个dict的服务集合
+ """
+ pass
-zk.add_listener(state_listener)
+class ZookeeperRegistry(Registry):
+ """
+ 所有注册过的服务端将在这里
+ 格式为{providername:{ip+port:service}}
+ providername = group_version_servicename
+ """
+ __service_provides = {}
+ __connect_state = 'UNCONNECT'
+ def __init__(self, hosts):
+ self.__zk = KazooClient(hosts=hosts, read_only=True)
+ self.__zk.add_listener(self.__state_listener)
+ self.__zk.start()
-def node_listener(event):
- # print event
- event_queue.put(event)
+ def __state_listener(self, state):
+ if state == KazooState.LOST:
+ # Register somewhere that the session was lost
+ self.__connect_state = state
+ elif state == KazooState.SUSPENDED:
+ # Handle being disconnected from Zookeeper
+ print 'disconnect from zookeeper'
+ self.__connect_state = state
+ else:
+ # Handle being connected/reconnected to Zookeeper
+ print 'connected'
+ self.__connect_state = state
+ def __event_listener(self, event):
+ self.__do_event(event)
-def handler_urls(urls):
- for child_node in urls:
- url = urllib.unquote(child_node).decode('utf8')
- if url.startswith('jsonrpc'):
- provide = ServiceProvider(url)
- service_key = service_provides.get(provide.interface)
- if service_key:
- service_key[provide.location] = provide
- else:
- service_provides[provide.interface] = {provide.location: provide}
+ def __handler_urls(self, urls):
+ for child_node in urls:
+ url = urllib.unquote(child_node).decode('utf8')
+ if url.startswith('jsonrpc'):
+ provide = ServiceProvider(url)
+ service_key = self.__service_provides.get(provide.interface)
+ if service_key:
+ service_key[provide.location] = provide
+ else:
+ self.__service_provides[provide.interface] = {provide.location: provide}
+ def __do_event(self, event):
+ # event.path 是类似/dubbo/com.ofpay.demo.api.UserProvider/providers 这样的
+ # 如果要删除,必须先把/dubbo/和最后的/providers去掉
+ provide_name = event.path[7:event.path.rfind('/')]
+ if provide_name in self.__service_provides:
+ del self.__service_provides[provide_name]
+ if event.state == 'CONNECTED':
+ children = self.__zk.get_children(event.path, watch=self.__event_listener)
+ self.__handler_urls(children)
+ if event.state == 'DELETED':
+ children = self.__zk.get_children(event.path, watch=self.__event_listener)
+ self.__handler_urls(children)
-def add_provider_listener(provide_name):
- #如果已经存在,首先删除原有的服务的集合
- if provide_name in service_provides:
- del service_provides[provide_name]
- children = zk.get_children('{0}/{1}/{2}'.format('dubbo', provide_name, 'providers'), watch=node_listener)
- #全部重新添加
- handler_urls(children)
+ def add_provider_listener(self, provide_name):
+ """
+ 监听注册中心的服务上下线
+ :param provide_name: 类似com.ofpay.demo.api.UserProvider这样的服务名
+ :return: 无返回
+ """
+ # 如果已经存在,首先删除原有的服务的集合
+ if provide_name in self.__service_provides:
+ del self.__service_provides[provide_name]
+ children = self.__zk.get_children('{0}/{1}/{2}'.format('dubbo', provide_name, 'providers'),
+ watch=self.__event_listener)
+ # 全部重新添加
+ self.__handler_urls(children)
+ def get_provides(self, provide_name, default=None):
+ """
+ 获取已经注册的服务URL对象
+ :param provide_name: com.ofpay.demo.api.UserProvider
+ :param default:
+ :return: 返回一个dict的服务集合
+ """
+ return self.__service_provides.get(provide_name, default)
-num_worker_threads = 2
-
-
-def worker():
- while True:
- event = event_queue.get()
- do_event(event)
- event_queue.task_done()
-
-
-event_queue = Queue.Queue()
-
-
-def do_event(event):
- # event.path 是类似/dubbo/com.ofpay.demo.api.UserProvider/providers 这样的
- # 如果要删除,必须先把/dubbo/和最后的/providers去掉
- provide_name = event.path[7:event.path.rfind('/')]
- if provide_name in service_provides:
- del service_provides[provide_name]
- if event.state == 'CONNECTED':
- children = zk.get_children(event.path, watch=node_listener)
- handler_urls(children)
- if event.state == 'DELETED':
- children = zk.get_children(event.path, watch=node_listener)
- handler_urls(children)
-
-for i in range(num_worker_threads):
- t = Thread(target=worker)
- t.daemon = True
- t.start()
-
-
-zk.start()
-event_queue.join()
if __name__ == '__main__':
- zk.start()
- if zk.exists("/dubbo"):
- add_provider_listener('com.ofpay.demo.api.UserProvider')
+ pass
diff --git a/dubbo_client/rpc.py b/dubbo_client/rpc.py
index 6bbd3ba..888d985 100644
--- a/dubbo_client/rpc.py
+++ b/dubbo_client/rpc.py
@@ -1,30 +1,15 @@
-import httplib
-import json
import random
from urllib2 import HTTPError
+
from pyjsonrpc import HttpClient, JsonRpcError
-from dubbo_client.registry import service_provides, add_provider_listener
+
from dubbo_client.rpcerror import NoProvider, ConnectionFail, dubbo_client_errors
__author__ = 'caozupeng'
-def raw_client(service_interface, app_params):
- headers = {"Content-type": "application/json-rpc",
- "Accept": "text/json"}
- provides = service_provides.get(service_interface, ())
- if len(provides) > 0:
- location, first = provides.items().pop()
- h1 = httplib.HTTPConnection(first.ip, port=int(first.port))
- h1.request("POST", first.path, json.dumps(app_params), headers)
- response = h1.getresponse()
- return response.read(), None
- else:
- return None, 'can not find the provide of {0}'.format(service_interface)
-
-
class DubboClient(object):
interface = ''
@@ -37,12 +22,13 @@
def __call__(self, *args, **kwargs):
return self.client_instance.call(self.method, *args, **kwargs)
- def __init__(self, interface):
+ def __init__(self, interface, registry):
self.interface = interface
- add_provider_listener(interface)
+ self.registry = registry
+ self.registry.add_provider_listener(interface)
def call(self, method, *args, **kwargs):
- provides = service_provides.get(self.interface, {})
+ provides = self.registry.get_provides(self.interface, {})
if len(provides) == 0:
raise NoProvider('can not find provide', self.interface)
location, provide = random.choice(provides.items())
@@ -70,14 +56,4 @@
if __name__ == '__main__':
- app_params = {
- "jsonrpc": "2.0",
- "method": "getUser",
- "params": ["A001"],
- "id": 1
- }
- service_interface = 'com.ofpay.demo.api.UserProvider'
- add_provider_listener(service_interface)
- ret, error = raw_client(service_interface, app_params)
- if not error:
- print json.loads(ret, encoding='utf-8')
\ No newline at end of file
+ pass
\ No newline at end of file
diff --git a/test_dubbo_client/test_registry.py b/test_dubbo_client/test_registry.py
index b07546a..8cef5b2 100644
--- a/test_dubbo_client/test_registry.py
+++ b/test_dubbo_client/test_registry.py
@@ -1,11 +1,8 @@
-from dubbo_client.registry import zk
+from dubbo_client import ZookeeperRegistry
__author__ = 'caozupeng'
if __name__ == '__main__':
- if zk.exists("/dubbo"):
- # Print the version of a node and its data
- children = zk.get_children("/dubbo")
- print "There are {0} children".format(len(children))
- for node in children:
- print node
\ No newline at end of file
+ registry = ZookeeperRegistry('172.19.65.33:2181')
+ registry.add_provider_listener('com.ofpay.demo.api.UserProvider')
+ print registry.get_provides('com.ofpay.demo.api.UserProvider')
\ No newline at end of file
diff --git a/test_dubbo_client/test_rpclib.py b/test_dubbo_client/test_rpclib.py
index 2346359..553dce9 100644
--- a/test_dubbo_client/test_rpclib.py
+++ b/test_dubbo_client/test_rpclib.py
@@ -2,13 +2,15 @@
import time
from dubbo_client import DubboClient, DubboClientError
+from dubbo_client import ZookeeperRegistry
__author__ = 'caozupeng'
if __name__ == '__main__':
service_interface = 'com.ofpay.demo.api.UserProvider'
- dubbo_client = DubboClient(service_interface)
+ registry = ZookeeperRegistry('172.19.65.33:2181')
+ dubbo_client = DubboClient(service_interface, registry)
for i in range(1000):
try:
print dubbo_client.getUser('A003')