blob: b12fe28da448855a8f2a3a19b9fb3c7f7cac92d9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import debug from 'debug';
import config from '../common/config';
import RequestContext from './request-context';
import DubboUrl from './dubbo-url';
import {
DubboMethodParamHessianTypeError,
DubboTimeoutError,
} from '../common/err';
import SocketWorker from './socket-worker';
import statistics from './statistics';
import {IObservable, TQueueObserver, TRequestId} from '../types';
import {isDevEnv, noop, traceErr} from '../common/util';
import {DEFAULT_DUBBO_PROTOCOL_VERSION} from '../serialization/constants';
const log = debug('dubbo:queue');
/**
* Node的异步特性就会让我们在思考问题的时候,要转换一下思考问题的思维
* 无论是zookeeper的连接,还是socket的创建都是异步的特性。
* 但是请求的incoming的时候,整体可能还没有初始化结束,如果我们试图去阻塞
* 就会导致整个编程架构很痛苦。
* 所有简单的处理就是,每次处理请求incoming的时候先把请求参数推入队列,然后
* 等待后面的资源初始化结束进行处理,如果超过超时时间就自动进行timeout超时处理
*/
export default class Queue implements IObservable<TQueueObserver> {
private constructor() {
log('new Queue');
//调用队列-保持调用的requestId和参数
this._requestQueue = new Map();
//订阅者,当有新的dubbo请求添加到队列中,通知schedule进行处理
this._subscriber = noop;
}
//订阅者
private _subscriber: Function;
//请求队列
private readonly _requestQueue: Map<TRequestId, RequestContext>;
private _clear(requestId) {
log(`clear invoke and schedule queue #${requestId}`);
this._requestQueue.delete(requestId);
if (isDevEnv) {
log('current schedule queue', this.scheduleQueue);
this._showStatistics();
}
}
/**
* static factory method
*/
static create() {
return new Queue();
}
add = (ctx: RequestContext) => {
return new Promise((resolve, reject) => {
ctx.resolve = resolve;
ctx.reject = reject;
//hessian参数检测
if (!Queue._checkMethodArgsHessianType(ctx)) {
return;
}
//timeout超时检测
this._checkTimeout(ctx);
//add queue
const {requestId, dubboInterface} = ctx.request;
log(`add queue,requestId#${requestId}, interface: ${dubboInterface}`);
//设置调用队列
this._requestQueue.set(requestId, ctx);
if (isDevEnv) {
log(`current schedule queue =>`, this.scheduleQueue);
}
//通知scheduler
this._subscriber(requestId, ctx);
});
};
/**
* 获取当前请求队列
*/
get requestQueue() {
return this._requestQueue;
}
/**
* 获取当前调度队列
*/
get scheduleQueue() {
const schedule = {};
for (let [requestId, ctx] of this._requestQueue) {
schedule[requestId] = ctx.pid;
}
return schedule;
}
subscribe(cb: Function) {
this._subscriber = cb;
return this;
}
allFailed(err: Error) {
for (let [requestId, ctx] of this._requestQueue) {
const {
reject,
request: {dubboInterface, methodName},
} = ctx;
log(
'queue schedule failed requestId#%d, %s#%s err: %s',
requestId,
dubboInterface,
methodName,
err,
);
ctx.cleanTimeout();
reject(err);
}
this._requestQueue.clear();
}
failed(requestId: TRequestId, err: Error, attachments: Object = {}) {
const ctx = this._requestQueue.get(requestId);
if (!ctx) {
return;
}
const {
uuid,
request: {dubboInterface, methodName},
} = ctx;
log('queue schedule failed requestId#%d, err: %s', requestId, err);
err.message = `uuid: ${uuid} invoke ${dubboInterface}#${methodName} was error, ${
err.message
}`;
//删除该属性,不然会导致JSON.Stringify失败
if (err['cause']) {
delete err['cause']['cause'];
}
//dubbo2.6.3
ctx.providerAttachments = attachments;
ctx.cleanTimeout();
ctx.reject(err);
this._clear(requestId);
}
consume(requestId: TRequestId, node: SocketWorker, providerMeta: DubboUrl) {
const ctx = this._requestQueue.get(requestId);
if (!ctx) {
return;
}
const {request} = ctx;
const {dubboInterface, version} = request;
log(`staring schedule ${requestId}#${dubboInterface}#${version}`);
//merge dubboVersion timeout group
request.dubboVersion =
request.dubboVersion ||
providerMeta.dubboVersion ||
DEFAULT_DUBBO_PROTOCOL_VERSION;
request.group = request.group || providerMeta.group;
request.path = providerMeta.path;
try {
node.write(ctx);
} catch (err) {
this.failed(requestId, err);
traceErr(err);
}
if (isDevEnv) {
log(`current schedule queue ==>`, this.scheduleQueue);
}
}
resolve(requestId: TRequestId, res: any, attachments: Object = {}) {
const ctx = this._requestQueue.get(requestId);
if (!ctx) {
return;
}
log('resolve requestId#%d, res: %O', requestId, res);
//dubbo2.6.3
ctx.providerAttachments = attachments;
ctx.cleanTimeout();
ctx.resolve(res);
this._clear(requestId);
}
private _showStatistics() {
//调度完成,显示调度结果
if (this._requestQueue.size === 0) {
log('invoke statistics==>%o', statistics);
}
}
/**
* 检测方法参数是不是都是hessian格式
* @param ctx
*/
private static _checkMethodArgsHessianType(ctx: RequestContext) {
if (ctx.isMethodArgsHessianType) {
return true;
}
const {dubboInterface, methodArgs, methodName} = ctx.request;
statistics.paramCheckErrCount++;
log(
`${dubboInterface} method: ${methodName} not all arguments are valid hessian type arguments: => %O`,
methodArgs,
);
ctx.reject(
new DubboMethodParamHessianTypeError(
`err: ${dubboInterface}#${methodName} not all arguments are valid hessian type`,
),
);
return false;
}
/**
* 超时检测
* @param ctx
*/
private _checkTimeout(ctx: RequestContext) {
//先获取上下文设置的超时时间,如果没有设置就获取最大超时时间
const timeout = (ctx.timeout || config.dubboInvokeTimeout) * 1000;
log('check timeout: ctx.timeout-> %d @timeout: %d', ctx.timeout, timeout);
ctx.timeoutId = setTimeout(() => {
statistics.timeoutErrCount++;
const {requestId, dubboInterface, methodName} = ctx.request;
log(`err: ${dubboInterface}#${methodName} remote invoke timeout`);
this.failed(
requestId,
new DubboTimeoutError(
`err:${dubboInterface}#${methodName} remote invoke timeout`,
),
);
}, timeout);
}
}