| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| import debug from 'debug'; |
| import ip from 'ip'; |
| import zookeeper from 'node-zookeeper-client'; |
| import qs from 'querystring'; |
| import Context from './context'; |
| import DubboUrl from './dubbo-url'; |
| import { |
| ZookeeperDisconnectedError, |
| ZookeeperExpiredError, |
| ZookeeperTimeoutError, |
| } from './err'; |
| import {go} from './go'; |
| import { |
| ICreateConsumerParam, |
| IObservable, |
| IRegistrySubscriber, |
| IZkClientProps, |
| } from './types'; |
| import {delay, eqSet, isDevEnv, msg, noop, traceErr, traceInfo} from './util'; |
| |
// IP address reported by the `ip` package for this machine; it is embedded in
// the consumer:// urls this module writes to zookeeper.
const ipAddress = ip.address();
// Debug logger bound to the "dubbo:zookeeper" namespace.
const log = debug('dubbo:zookeeper');

/** Name of a dubbo interface; used as the key of the provider-url map. */
export type TDubboInterface = string;
/** Address of an agent (presumably a `host:port` string — confirm against callers). */
export type TAgentAddr = string;
| |
/**
 * Zookeeper-backed dubbo registry.
 *
 * For every configured interface it reads the provider urls stored under
 * `/<zkRoot>/<interface>/providers`, watches that node for changes, writes a
 * consumer node for this process, and notifies the subscriber with the
 * aggregated set of `host:port` agent addresses.
 */
export class ZkRegistry implements IObservable<IRegistrySubscriber> {
  private _agentAddrSet: Set<string>;
  private _client: zookeeper.Client;
  private _subscriber: IRegistrySubscriber;
  private readonly _props: IZkClientProps;
  private readonly _dubboServiceUrlMap: Map<TDubboInterface, Array<DubboUrl>>;

  private constructor(props: IZkClientProps) {
    log(`new:|> %O`, props);
    this._props = props;
    // Default the zookeeper root node to "dubbo" when none is configured.
    this._props.zkRoot = this._props.zkRoot || 'dubbo';
    // Mapping from dubbo interface name to the provider urls found for it.
    this._dubboServiceUrlMap = new Map();
    this._agentAddrSet = new Set();
    // Until subscribe() is called, notifications go to no-op handlers.
    this._subscriber = {
      onData: noop,
      onError: noop,
    };
    // Create the zookeeper client; _init runs on every (re)connect or failure.
    this._connect(this._init);
  }

  //===========================public method=============================
  /**
   * Factory: builds a registry and immediately starts connecting to zookeeper.
   * @param props registry configuration
   */
  static from(props: IZkClientProps) {
    return new ZkRegistry(props);
  }

  /**
   * Returns the `host:port` addresses of the providers that match the
   * interface, version and group of the invocation context.
   *
   * @param ctx dubbo invocation context
   * @returns matching addresses; an empty array when the interface is unknown
   *   (for example, providers have not been loaded yet)
   */
  getAgentAddrList(ctx: Context) {
    const {dubboInterface, version, group} = ctx;
    // Map#get yields undefined for unknown interfaces; treat that as "no providers".
    const serviceUrls = this._dubboServiceUrlMap.get(dubboInterface) || [];
    return serviceUrls
      .filter(serviceProp => {
        const isSameVersion = serviceProp.version === version;
        // An empty group on the context matches any provider group;
        // otherwise the provider group must be equal to it.
        const isSameGroup = !group || group === serviceProp.group;
        return isSameGroup && isSameVersion;
      })
      .map(({host, port}) => `${host}:${port}`);
  }

  /**
   * Finds the provider url that serves the invocation described by the context
   * (same host, port, version and, when set, group).
   *
   * @param ctx dubbo invocation context
   * @returns the matching provider url, or undefined when there is none
   */
  getDubboServiceProp(ctx: Context) {
    const {dubboInterface, version, group, invokeHost, invokePort} = ctx;
    // Guard against interfaces that are not (yet) present in the map.
    const dubboServicePropList =
      this._dubboServiceUrlMap.get(dubboInterface) || [];
    for (const prop of dubboServicePropList) {
      const isSameHost = prop.host === invokeHost;
      const isSamePort = prop.port === invokePort;
      const isSameVersion = prop.version === version;
      // An empty group on the context matches any provider group.
      const isSameGroup = !group || group === prop.group;

      if (isSameHost && isSamePort && isSameVersion && isSameGroup) {
        log('getProviderProps:|> %s', prop);
        return prop;
      }
    }
    return undefined;
  }

  /**
   * Registers the subscriber that receives agent-address updates and errors.
   * Replaces any previously registered subscriber.
   *
   * @param subscriber handlers for data and error notifications
   * @returns this registry, to allow chaining
   */
  subscribe(subscriber: IRegistrySubscriber) {
    this._subscriber = subscriber;
    return this;
  }

  /**
   * The agent address set most recently published to the subscriber.
   */
  get allAgentAddrSet() {
    return this._agentAddrSet;
  }

  //========================private method==========================
  /**
   * Connection callback handed to _connect.
   * With an error it reports the error to the subscriber; without one it
   * (re)loads the provider urls of every configured interface, registers this
   * process as a consumer for each provider and publishes the address set.
   *
   * @param err connection error, or null when zookeeper is connected
   */
  private _init = async (err: Error | null) => {
    // zookeeper reported an error
    if (err) {
      log(err);
      traceErr(err);
      this._subscriber.onError(err);
      return;
    }

    // zookeeper connected (this can happen more than once, e.g. on reconnect)
    const {
      zkRoot,
      application: {name},
      interfaces,
    } = this._props;

    for (const inf of interfaces) {
      // Path of the provider list of this interface in zookeeper.
      const dubboServicePath = `/${zkRoot}/${inf}/providers`;
      const dubboServiceUrls = await this._getDubboServiceUrls(
        dubboServicePath,
        inf,
      );

      // Start from an empty list so stale providers are dropped.
      const urls: Array<DubboUrl> = [];
      this._dubboServiceUrlMap.set(inf, urls);

      for (const serviceUrl of dubboServiceUrls) {
        const url = DubboUrl.from(serviceUrl);
        const {host, port, dubboVersion} = url;
        urls.push(url);

        // Write the consumer node. Fire-and-forget, but never leave the
        // rejection unhandled (_createConsumer throws on a missing setting).
        this._createConsumer({
          host: host,
          port: port,
          name: name,
          dubboInterface: inf,
          dubboVersion: dubboVersion,
        })
          .then(() => log('create Consumer finish'))
          .catch(e => {
            log('create consumer for %s failed %o', inf, e);
            traceErr(e);
          });
      }
    }

    if (isDevEnv) {
      log('agentAddrSet: %O', this._allAgentAddrSet);
      log('dubboServiceUrl:|> %O', this._dubboServiceUrlMap);
    }

    this._agentAddrSet = this._allAgentAddrSet;
    this._subscriber.onData(this._allAgentAddrSet);
  };

  /**
   * Builds a fresh set of every `host:port` found in the provider-url map.
   * Deriving it on demand means add/remove bookkeeping is never needed.
   */
  private get _allAgentAddrSet() {
    const agentSet = new Set<string>();
    for (const urlList of this._dubboServiceUrlMap.values()) {
      for (const url of urlList) {
        agentSet.add(url.host + ':' + url.port);
      }
    }
    return agentSet;
  }

  /**
   * Reads the children of a provider node, installs a watch on it and returns
   * the decoded children that are dubbo:// urls.
   *
   * @param dubboServicePath zookeeper path of the provider node
   * @param dubboInterface interface the path belongs to
   * @returns decoded provider urls; empty when the lookup fails or finds none
   */
  private async _getDubboServiceUrls(
    dubboServicePath: string,
    dubboInterface: string,
  ): Promise<Array<string>> {
    const {res, err} = await go(
      this._getChildren(
        dubboServicePath,
        this._watch(dubboServicePath, dubboInterface),
      ),
    );

    if (err) {
      log(`getChildren ${dubboServicePath} error ${err}`);
      traceErr(err);
      return [];
    }

    const children = res.children || [];
    if (children.length === 0) {
      traceErr(
        new Error(
          `zk get DubboSericeUrls result is empty with service path ${dubboServicePath} and interface ${dubboInterface}.`,
        ),
      );
    }

    return children
      .map(child => decodeURIComponent(child))
      .filter(child => child.startsWith('dubbo://'));
  }

  //========================zookeeper helper=========================
  /**
   * Creates the zookeeper client, wires its connection events and connects.
   *
   * @param callback invoked with null on every 'connected' event and with an
   *   error on timeout, disconnect or session expiry
   */
  private _connect = (callback: (err: Error | null) => void) => {
    const {register} = this._props;
    const retries = 10;
    const sessionTimeout = 60 * 1000;

    log(`connecting zkserver ${register}`);
    this._client = zookeeper.createClient(register, {
      retries,
      sessionTimeout,
    });

    // Manual connect timeout: per the original author's note,
    // node-zookeeper-client keeps retrying forever when the server
    // cannot be reached.
    const timeId = setTimeout(() => {
      log(`Could not connect zk ${register}, time out`);
      this._client.close();
      callback(
        new ZookeeperTimeoutError(
          `ZooKeeper was connected ${register} time out. `,
        ),
      );
    }, retries * sessionTimeout);

    // A single handler so the callback runs exactly once per 'connected'
    // event; the first connect additionally cancels the timeout and
    // announces readiness.
    let isFirstConnect = true;
    this._client.on('connected', () => {
      traceInfo(
        `connected to zkserver ${register} current state is ${this._client.getState()}`,
      );

      if (!isFirstConnect) {
        callback(null);
        return;
      }

      isFirstConnect = false;
      log(`connected to zkserver ${register}`);
      clearTimeout(timeId);
      callback(null);
      msg.emit('sys:ready');
    });

    // The connection between client and server was dropped.
    this._client.on('disconnected', () => {
      log(`zk ${register} had disconnected`);
      clearTimeout(timeId);
      callback(
        new ZookeeperDisconnectedError(
          `ZooKeeper was disconnected. current state is ${this._client.getState()} `,
        ),
      );
    });

    // The session expired.
    this._client.on('expired', () => {
      log(`zk ${register} session had expired`);
      callback(
        new ZookeeperExpiredError(
          `Zookeeper was session Expired Error current state ${this._client.getState()}`,
        ),
      );

      // FIXME (carried over from the original): reconnects with the same
      // client. NOTE(review): an expired session may need a brand-new client
      // instance — confirm against node-zookeeper-client.
      this._client.connect();
    });

    this._client.connect();
  };

  /**
   * Builds the watcher installed on a provider node. When it fires it reloads
   * the provider urls of the interface (which installs a new watch),
   * re-registers the consumer nodes and notifies the subscriber if the
   * aggregated address set changed.
   *
   * @param dubboServicePath zookeeper path of the provider node
   * @param dubboInterface interface the path belongs to
   */
  private _watch(dubboServicePath: string, dubboInterface: string) {
    //@ts-ignore
    return async (e: zookeeper.Event) => {
      log(`trigger watch ${e}`);

      // Reading right away occasionally returns an empty node list,
      // so wait a little before querying.
      await delay(2000);

      const dubboServiceUrls = await this._getDubboServiceUrls(
        dubboServicePath,
        dubboInterface,
      );

      // Rebuild the provider list of this interface from scratch.
      const agentAddrList: Array<string> = [];
      const urls: Array<DubboUrl> = [];
      for (const serviceUrl of dubboServiceUrls) {
        const url = DubboUrl.from(serviceUrl);
        const {host, port, dubboVersion} = url;
        agentAddrList.push(`${host}:${port}`);
        urls.push(url);

        // Fire-and-forget, but never leave the rejection unhandled.
        this._createConsumer({
          host: host,
          port: port,
          name: this._props.application.name,
          dubboInterface: dubboInterface,
          dubboVersion: dubboVersion,
        })
          .then(() => log('create consumer finish'))
          .catch(err => {
            log('create consumer for %s failed %o', dubboInterface, err);
            traceErr(err);
          });
      }
      this._dubboServiceUrlMap.set(dubboInterface, urls);

      if (agentAddrList.length === 0) {
        traceErr(new Error(`trigger watch ${e} agentList is empty`));
      } else {
        traceInfo(`trigger watch ${e} agentList ${agentAddrList.join(',')}`);
      }

      if (isDevEnv) {
        log('agentSet:|> %O', this._allAgentAddrSet);
        log(
          'update dubboInterface %s providerList %O',
          dubboInterface,
          this._dubboServiceUrlMap.get(dubboInterface),
        );
      }

      // Only notify the subscriber when the aggregated address set changed.
      if (!eqSet(this._agentAddrSet, this._allAgentAddrSet)) {
        this._agentAddrSet = this._allAgentAddrSet;
        this._subscriber.onData(this._allAgentAddrSet);
      } else {
        log('no agent change');
      }
    };
  }

  /**
   * Promise wrapper around the client's getChildren.
   *
   * @param path zookeeper node path
   * @param watch optional watcher for the node; a no-op is used when omitted
   * @returns the children names and the node stat
   */
  private _getChildren = (
    path: string,
    watch?: (e: zookeeper.Event) => void,
  ): Promise<{children: Array<string>; stat: zookeeper.Stat}> => {
    const watcher = watch || (() => {});
    return new Promise((resolve, reject) => {
      this._client.getChildren(path, watcher, (err, children, stat) => {
        if (err) {
          reject(err);
          return;
        }

        resolve({
          children,
          stat,
        });
      });
    });
  };

  /**
   * Writes the consumer node of this process for one provider, creating the
   * persistent `consumers` parent node first when it does not exist.
   * Zookeeper failures are logged and swallowed (best effort).
   *
   * Counterpart of com.alibaba.dubbo.registry.zookeeper.ZookeeperRegistry
   *
   * @param params provider host/port, application name, interface and dubbo version
   * @throws Error when no dubbo setting exists for the interface
   */
  private async _createConsumer(params: ICreateConsumerParam) {
    const {host, port, name, dubboInterface, dubboVersion} = params;

    const dubboSetting = this._props.dubboSetting.getDubboSetting(
      dubboInterface,
    );

    if (!dubboSetting) {
      throw new Error(
        `Could not find group, version for ${dubboInterface} please check your dubbo setting`,
      );
    }

    const queryParams = {
      host,
      port,
      interface: dubboInterface,
      application: name,
      category: 'consumers',
      dubbo: dubboVersion,
      method: '',
      revision: '',
      version: dubboSetting.version,
      group: dubboSetting.group,
      side: 'consumer',
      check: 'false',
    };

    // Create the consumers parent node under the same configured root the
    // providers are read from.
    const consumerRoot = `/${this._props.zkRoot}/${dubboInterface}/consumers`;
    const err = await this._createRootConsumer(consumerRoot);
    if (err) {
      log('create root consumer: error %o', err);
      return;
    }

    // Create the consumer node itself.
    const consumerUrl =
      consumerRoot +
      '/' +
      encodeURIComponent(
        `consumer://${ipAddress}/${dubboInterface}?${qs.stringify(
          queryParams,
        )}`,
      );
    const exist = await go(this._exists(consumerUrl));
    if (exist.err) {
      log(`check consumer url: ${consumerUrl} failed`);
      return;
    }

    if (exist.res) {
      log(`check consumer url: ${consumerUrl} was existed.`);
      return;
    }

    const create = await go(
      this._create(consumerUrl, zookeeper.CreateMode.EPHEMERAL),
    );

    if (create.err) {
      log(
        `check consumer url: ${decodeURIComponent(
          consumerUrl,
        )}创建consumer失败 %o`,
        create.err,
      );
      return;
    }

    log(`create successfully consumer url: ${decodeURIComponent(consumerUrl)}`);
  }

  /**
   * Ensures the persistent consumers parent node exists.
   *
   * @param consumer path of the consumers parent node
   * @returns the error of the failed exists/create call, or null on success
   */
  private async _createRootConsumer(consumer: string) {
    let {res, err} = await go(this._exists(consumer));
    if (err) {
      return err;
    }

    // The consumer root path already exists.
    if (res) {
      return null;
    }

    // Create the consumer root path.
    ({err} = await go(this._create(consumer, zookeeper.CreateMode.PERSISTENT)));
    if (err) {
      return err;
    }

    log('create root comsumer %s successfull', consumer);
    return null;
  }

  /**
   * Promise wrapper around the client's create.
   *
   * @param path node path to create
   * @param mode zookeeper create mode
   * @returns the path reported by the client for the created node
   */
  private _create = (path: string, mode: number): Promise<string> => {
    return new Promise((resolve, reject) => {
      this._client.create(path, mode, (err, createdPath) => {
        if (err) {
          reject(err);
          return;
        }
        resolve(createdPath);
      });
    });
  };

  /**
   * Promise wrapper around the client's exists.
   *
   * @param path node path to check
   * @returns the stat passed to the callback (falsy is treated as "does not exist" by callers)
   */
  private _exists = (path: string): Promise<zookeeper.Stat> => {
    return new Promise((resolve, reject) => {
      this._client.exists(path, (err, stat) => {
        if (err) {
          reject(err);
          return;
        }
        resolve(stat);
      });
    });
  };
}