# coding=utf-8
import time

__author__ = 'caozupeng'
import os
import socket
import struct
from threading import Thread

from dubbo_client.config import ApplicationConfig

import urllib

from kazoo.protocol.states import KazooState
from kazoo.client import KazooClient

from dubbo_client.common import ServiceURL


class Registry(object):
    """
    所有注册过的服务端将在这里
    interface=com.ofpay.demo.DemoService
    location = ip:port/url 比如 172.19.20.111:38080/com.ofpay.demo.DemoService2
    providername = servicename|version|group
    dict 格式为{interface:{providername:{ip+port:service_url}}}

    """
    _service_provides = {}

    def _do_event(self, event):
        """
        protect方法，处理回调，留给子类实现
        :param event:
        :return:
        """
        pass

    def register(self, interface, **kwargs):
        """
        客户端注册到注册中心，亮出自己的身份
        :param interface:
        :param kwargs:
        :return:
        """
        pass

    def subscribe(self, interface, **kwargs):
        """
        监听注册中心的服务上下线
        :param provide_name: 类似com.ofpay.demo.api.UserProvider这样的服务名
        :param kwargs: version , group
        :return: 无返回
        """
        pass

    def get_provides(self, interface, **kwargs):
        """
        获取已经注册的服务URL对象
        :param interface: com.ofpay.demo.api.UserProvider
        :param default:
        :return: 返回一个dict的服务集合
        """
        group = kwargs.get('group', '')
        version = kwargs.get('version', '')
        key = self._to_key(interface, version, group)
        second = self._service_provides.get(interface, {})
        return second.get(key, {})

    def event_listener(self, event):
        """
        node provides上下线的监听回调函数
        :param event:
        :return:
        """
        self._do_event(event)

    def _to_key(self, interface, versioin, group):
        """
        计算存放在内存中的服务的key，以接口、版本、分组计算
        :param interface: 接口 类似com.ofpay.demo.DemoProvider
        :param versioin: 版本 1.0
        :param group:  分组 product
        :return: key 字符串
        """
        return '{0}|{1}|{2}'.format(interface, versioin, group)

    def _add_node(self, interface, service_url):
        key = self._to_key(service_url.interface, service_url.version, service_url.group)
        second_dict = self._service_provides.get(interface)
        if second_dict:
            # 获取最内层的nest的dict
            inner_dict = second_dict.get(key)
            if inner_dict:
                inner_dict[service_url.location] = service_url
            else:
                second_dict[key] = {service_url.location: service_url}
        else:
            # create the second dict
            self._service_provides[interface] = {key: {service_url.location: service_url}}

    def _remove_node(self, interface, service_url):
        key = self._to_key(service_url.interface, service_url.version, service_url.group)
        second_dict = self._service_provides.get(interface)
        if second_dict:
            inner_dict = second_dict.get(key)
            if inner_dict:
                del inner_dict[service_url.location]

    def _compare_swap_nodes(self, interface, nodes):
        """
        比较，替换现有内存中的节点信息，节点url类似如下
        jsonrpc://192.168.2.1:38080/com.ofpay.demo.api.UserProvider?
        anyhost=true&application=demo-provider&default.timeout=10000&dubbo=2.4.10&
        environment=product&interface=com.ofpay.demo.api.UserProvider&
        methods=getUser,queryAll,queryUser,isLimit&owner=wenwu&pid=61578&
        side=provider&timestamp=1428904600188
        首先将url转为ServiceUrl对象，然保持到缓存中
        :param nodes: 节点列表
        :return: 不需要返回
        """
        # 如果已经存在，首先删除原有的服务的集合
        if interface in self._service_provides:
            del self._service_provides[interface]
        for child_node in nodes:
            node = urllib.unquote(child_node).decode('utf8')
            if node.startswith('jsonrpc'):
                service_url = ServiceURL(node)
                self._add_node(interface, service_url)


class ZookeeperRegistry(Registry):
    _app_config = ApplicationConfig('default_app')
    _connect_state = 'UNCONNECT'

    def __init__(self, zk_hosts, application_config=None):
        if application_config:
            self._app_config = application_config
        self.__zk = KazooClient(hosts=zk_hosts)
        self.__zk.add_listener(self.__state_listener)
        self.__zk.start()

    def __state_listener(self, state):
        if state == KazooState.LOST:
            # Register somewhere that the session was lost
            self._connect_state = state
        elif state == KazooState.SUSPENDED:
            # Handle being disconnected from Zookeeper
            # print 'disconnect from zookeeper'
            self._connect_state = state
        else:
            # Handle being connected/reconnected to Zookeeper
            # print 'connected'
            self._connect_state = state

    def __unquote(self, origin_nodes):
        return (urllib.unquote(child_node).decode('utf8') for child_node in origin_nodes if child_node)

    def _do_event(self, event):
        # event.path 是类似/dubbo/com.ofpay.demo.api.UserProvider/providers 这样的
        # 如果要删除，必须先把/dubbo/和最后的/providers去掉
        # 将zookeeper中查询到的服务节点列表加入到一个dict中
        # zookeeper中保持的节点url类似如下
        provide_name = event.path[7:event.path.rfind('/')]
        if event.state == 'CONNECTED':
            children = self.__zk.get_children(event.path, watch=self.event_listener)
            self._compare_swap_nodes(provide_name, self.__unquote(children))
        if event.state == 'DELETED':
            children = self.__zk.get_children(event.path, watch=self.event_listener)
            self._compare_swap_nodes(provide_name, self.__unquote(children))

    def register(self, interface, **kwargs):
        ip = self.__zk._connection._socket.getsockname()[0]
        params = {
            'interface': interface,
            'application': self._app_config.name,
            'application.version': self._app_config.version,
            'category': 'consumer',
            'dubbo': 'dubbo-client-py-1.0.0',
            'environment': self._app_config.environment,
            'interface': interface,
            'method': '',
            'owner': self._app_config.owner,
            'side': 'consumer',
            'pid': os.getpid(),
            'version': '1.0'
        }
        url = 'consumer://{0}/{1}?{2}'.format(ip, interface, urllib.urlencode(params))
        # print urllib.quote(url, safe='')

        consumer_path = '{0}/{1}/{2}'.format('dubbo', interface, 'consumers')
        self.__zk.ensure_path(consumer_path)
        self.__zk.create(consumer_path + '/' + urllib.quote(url, safe=''), ephemeral=True)

    def subscribe(self, interface, **kwargs):
        """
        监听注册中心的服务上下线
        :param interface: 类似com.ofpay.demo.api.UserProvider这样的服务名
        :return: 无返回
        """
        version = kwargs.get('version', '')
        group = kwargs.get('group', '')
        children = self.__zk.get_children('{0}/{1}/{2}'.format('dubbo', interface, 'providers'),
                                          watch=self.event_listener)
        # 全部重新添加
        self._compare_swap_nodes(interface, self.__unquote(children))


class MulticastRegistry(Registry):
    class _Loop(Thread):
        def __init__(self, address, callback):
            Thread.__init__(self)
            self.multicast_group, self.multicast_port = address.split(':')
            self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
            # in osx we should use SO_REUSEPORT instead of SO_REUSEADDRESS
            self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
            self.sock.bind(('', int(self.multicast_port)))
            mreq = struct.pack("4sl", socket.inet_aton(self.multicast_group), socket.INADDR_ANY)
            self.sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)
            self.callback = callback

        def run(self):
            while True:
                event = self.sock.recv(10240)
                print event
                self.callback(event.rstrip())

        def set_mssage(self, msg):
            self.sock.sendto(msg, (self.multicast_group, int(self.multicast_port)))


    def __init__(self, address, application_config=None):
        if application_config:
            self._app_config = application_config
        self.event_loop = self._Loop(address, self.event_listener)
        self.event_loop.setDaemon(True)
        self.event_loop.start()
        # self.event_loop.set_mssage('subscribe provider://172.19.3.111:38081/com.ofpay.demo.api.UserProvider?anyhost=true&application=jsonrpcdemo&category=configurators&check=false&default.timeout=10000&dubbo=2.4.10&environment=product&interface=com.ofpay.demo.api.UserProvider&methods=getUser,queryAll,queryUser,isLimit&owner=wenwu&pid=63590&side=provider&timestamp=1429149716694')

    def _do_event(self, event):
        if event.startswith('register'):
            url = event[9:]
            if url.startswith('jsonrpc'):
                service_provide = ServiceURL(url)
                self._add_node(service_provide.interface, service_provide)
        if event.startswith('unregister'):
            url = event[11:]
            if url.startswith('jsonrpc'):
                service_provide = ServiceURL(url)
                self._remove_node(service_provide.interface, service_provide)


if __name__ == '__main__':
    # zk = KazooClient(hosts='192.168.59.103:2181')
    # zk.start()
    # parent_node = '{0}/{1}/{2}'.format('dubbo', 'com.ofpay.demo.api.UserProvider', '')
    # nodes = zk.get_children(parent_node)
    # for child_node in nodes:
    #     node = urllib.unquote(child_node).decode('utf8')
    #     print node
        # zk.delete(parent_node+'/'+child_node, recursive=True)
    registry = MulticastRegistry('224.5.6.7:1234')
    time.sleep(50)