blob: 0202730b3614a0d453ede21fb4a301ee44ff648b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import debug from 'debug';
import {SocketError} from './err';
import {MSG_TYPE, msg} from './msg';
import SocketPool from './socket-pool';
import {IObservable, ISocketSubscriber} from './types';
import {TAgentHostPort} from './zookeeper';
const noop = () => {};
const log = debug('dubbo:server-agent');
/**
* 机器agent和socket-pool的管理容器
*/
export class ServerAgent implements IObservable<ISocketSubscriber> {
constructor() {
log('new ServerAgent');
this._agentHostSet = new Set();
this._serverAgentMap = new Map();
this._subscriber = {
onConnect: noop,
onData: noop,
onClose: noop,
};
}
private _agentHostSet: Set<string>;
private _subscriber: ISocketSubscriber;
private readonly _serverAgentMap: Map<TAgentHostPort, SocketPool>;
from = (agentHostList: Set<string>) => {
//获取负载host:port列表
//根据负载创建连接池
process.nextTick(() => {
for (let agentHost of agentHostList) {
if (this._agentHostSet.has(agentHost)) {
continue;
}
log(`new ServerAgent with ${agentHost} -> socket pool`);
const socketPool = SocketPool.from(agentHost).subscribe({
onConnect: this._subscriber.onConnect,
onData: this._subscriber.onData,
onClose: ({pid}) => {
this.clearClosedPool();
this._subscriber.onClose({pid});
},
});
this._serverAgentMap.set(agentHost, socketPool);
}
log(
'current ServerAgent includes agentHosts: %O',
this._serverAgentMap.keys(),
);
});
return this;
};
/**
* 查询一组负载可用的agent
* @param agentHostPorts
*/
getAvailableSocketAgents(
agentHostPorts: Array<TAgentHostPort>,
): Array<SocketPool> {
let availableList = [];
for (let agentHostPort of agentHostPorts) {
const socketPool = this._serverAgentMap.get(agentHostPort);
if (socketPool && socketPool.hasAvaliableNodes) {
availableList.push(socketPool);
}
}
return availableList;
}
getAvailableSocketAgent(agentHostPorts: Array<TAgentHostPort>) {
const availableList = this.getAvailableSocketAgents(agentHostPorts);
const len = availableList.length;
if (len === 1) {
return availableList[0];
}
return availableList[Math.floor(Math.random() * len)];
}
hasAvailableSocketAgent(agentHostPorts: Array<TAgentHostPort>) {
return this.getAvailableSocketAgents(agentHostPorts).length > 0;
}
clearClosedPool = () => {
for (let [agentHost, socketPool] of this._serverAgentMap) {
if (socketPool.isAllClose) {
//如果全部关闭
log(
`${agentHost}'s pool socket-worker had all closed. delete ${agentHost}`,
);
//通知外部,销毁了这个socket-pool
msg.emit(
MSG_TYPE.SYS_ERR,
new SocketError(
`${agentHost}'s pool socket-worker had all closed. delete ${agentHost}`,
),
);
this._serverAgentMap.delete(agentHost);
}
}
log('SocketAgent current agentHost->', this._serverAgentMap.keys());
};
subscribe(subscriber: ISocketSubscriber) {
this._subscriber = subscriber;
return this;
}
}
export default new ServerAgent();