refactor: 优化debug日志
diff --git a/packages/dubbo/src/consumer/directly-dubbo.ts b/packages/dubbo/src/consumer/directly-dubbo.ts
index 3c8b19f..aeb43e5 100644
--- a/packages/dubbo/src/consumer/directly-dubbo.ts
+++ b/packages/dubbo/src/consumer/directly-dubbo.ts
@@ -187,7 +187,7 @@
this._socketWorker.write(ctx);
break;
case SOCKET_STATUS.CLOSED:
- this.fail(requestId, new Error(`SocketWorker had closed.`));
+ this.fail(requestId, new Error(`socket-worker had closed.`));
break;
}
}
@@ -210,7 +210,7 @@
};
private onClose = () => {
- log('SocketWorker was closed');
+ log('socket-worker was closed');
this._socketStatus = SOCKET_STATUS.CLOSED;
//failed all
for (let ctx of this._queue.values()) {
diff --git a/packages/dubbo/src/consumer/scheduler.ts b/packages/dubbo/src/consumer/scheduler.ts
index 6f2a16e..c1e485d 100644
--- a/packages/dubbo/src/consumer/scheduler.ts
+++ b/packages/dubbo/src/consumer/scheduler.ts
@@ -210,11 +210,11 @@
}
private _handleOnConnect = ({pid, host, port}) => {
- log(`scheduler receive SocketWorker connect pid#${pid} ${host}:${port}`);
+ log(`scheduler receive socket-worker connect pid#${pid} ${host}:${port}`);
const agentHost = `${host}:${port}`;
this._status = STATUS.READY;
traceInfo(
- `scheduler receive SocketWorker connect pid#${pid} ${host}:${port}`,
+ `scheduler receive socket-worker connect pid#${pid} ${host}:${port}`,
);
for (let ctx of this._queue.requestQueue.values()) {
@@ -249,7 +249,7 @@
* 处理某一个SocketWorker被关闭的状态
*/
private _handleOnClose = ({pid}) => {
- log(`SocketWorker#${pid} was close`);
+ log(`socket-worker#${pid} was close`);
//查询之前哪些接口的方法被pid调用, 然后直接failfast
const {requestQueue} = this._queue;
@@ -257,7 +257,7 @@
if (ctx.pid === pid) {
this._handleFailed(
requestId,
- new SocketError(`SocketWorker#${pid} had closed.`),
+ new SocketError(`socket-worker#${pid} had closed.`),
);
}
}
diff --git a/packages/dubbo/src/consumer/socket-worker.ts b/packages/dubbo/src/consumer/socket-worker.ts
index 63fe0f0..f22188b 100644
--- a/packages/dubbo/src/consumer/socket-worker.ts
+++ b/packages/dubbo/src/consumer/socket-worker.ts
@@ -49,7 +49,7 @@
this._retry = RETRY_NUM;
this._status = SOCKET_STATUS.PADDING;
- log('new SocketWorker#%d|> %s %s', pid, host + ':' + port, this._status);
+ log('new socket-worker#%d|> %s %s', pid, host + ':' + port, this._status);
//init subscriber
this._subscriber = {
@@ -89,7 +89,7 @@
* @param ctx dubbo context
*/
write(ctx: RequestContext) {
- log(`SocketWorker#${this.pid} =invoked=> ${ctx.requestId}`);
+ log(`socket-worker#${this.pid} =invoked=> ${ctx.requestId}`);
statistics['pid#' + this.pid] = ++statistics['pid#' + this.pid];
// update heartbeat lastWriteTimestamp
@@ -141,7 +141,7 @@
//==========================private method================================
private _initSocket() {
- log(`SocketWorker#${this.pid} =connecting=> ${this.host}:${this.port}`);
+ log(`socket-worker#${this.pid} =connecting=> ${this.host}:${this.port}`);
if (this._socket) {
this._socket.destroy();
@@ -158,26 +158,30 @@
)
.on('data', () => {
log(
- `SocketWorker#${this.pid} =receive data=> ${this.host}:${this.port}`,
+ `socket-worker#${this.pid} =receive data=> ${this.host}:${
+ this.port
+ }`,
);
})
.on('error', this._onError)
.on('close', this._onClose);
// init decode
- DecodeBuffer.from(this._socket).subscribe(data => {
- if (HeartBeat.isHeartBeat(data)) {
- log(`SocketWorker#${this.pid} <=receive= heartbeat data.`);
- } else {
- const json = decodeDubboResponse(data);
- log(`SocketWorker#${this.pid} <=received=> dubbo result %O`, json);
- this._subscriber.onData(json);
- }
- });
+ DecodeBuffer.from(this._socket, `socket-worker#${this.pid}`).subscribe(
+ data => {
+ if (HeartBeat.isHeartBeat(data)) {
+ log(`socket-worker#${this.pid} <=receive= heartbeat data.`);
+ } else {
+ const json = decodeDubboResponse(data);
+ log(`socket-worker#${this.pid} <=received=> dubbo result %O`, json);
+ this._subscriber.onData(json);
+ }
+ },
+ );
}
private _onConnected = () => {
- log(`SocketWorker#${this.pid} <=connected=> ${this.host}:${this.port}`);
+ log(`socket-worker#${this.pid} <=connected=> ${this.host}:${this.port}`);
//set current status
this._status = SOCKET_STATUS.CONNECTED;
@@ -200,7 +204,7 @@
private _onError = (error: Error) => {
log(
- `SocketWorker#${this.pid} <=occur error=> ${this.host}:${
+ `socket-worker#${this.pid} <=occur error=> ${this.host}:${
this.port
} ${error}`,
);
@@ -208,7 +212,7 @@
private _onClose = (hadError: boolean) => {
log(
- `SocketWorker#${this.pid} <=closed=> ${this.host}:${
+ `socket-worker#${this.pid} <=closed=> ${this.host}:${
this.port
} hasError: ${hadError} retry: ${this._retry}`,
);
diff --git a/packages/dubbo/src/serialization/decode-buffer.ts b/packages/dubbo/src/serialization/decode-buffer.ts
index 9701ae0..6b9b265 100644
--- a/packages/dubbo/src/serialization/decode-buffer.ts
+++ b/packages/dubbo/src/serialization/decode-buffer.ts
@@ -39,8 +39,8 @@
private _transport: Socket;
private _subscriber: Function;
- constructor(transport: Socket) {
- log('new DecodeBuffer');
+ constructor(transport: Socket, private flag: string) {
+ log('%s new DecodeBuffer', this.flag);
this._subscriber = noop;
this._buffer = Buffer.alloc(0);
this._transport = transport;
@@ -54,8 +54,8 @@
});
}
- static from(transport: Socket) {
- return new DecodeBuffer(transport);
+ static from(transport: Socket, flag: string) {
+ return new DecodeBuffer(transport, flag);
}
receive(data: Buffer) {
@@ -70,17 +70,16 @@
//如果不是magichight magiclow 做个容错
if (magicHigh != DUBBO_MAGIC_HIGH || magicLow != DUBBO_MAGIC_LOW) {
- log(this._buffer);
-
log(
- `receive server data error, buffer[0] is 0xda ${magicHigh ==
+ `%s receive server data error, buffer[0] is 0xda ${magicHigh ==
0xda} buffer[1] is 0xbb ${magicLow == 0xbb}`,
+ this.flag,
);
const magicHighIndex = this._buffer.indexOf(DUBBO_MAGIC_HIGH);
const magicLowIndex = this._buffer.indexOf(DUBBO_MAGIC_LOW);
- log(`magicHigh index#${magicHighIndex}`);
- log(`magicLow index#${magicLowIndex}`);
+ log(`%s magicHigh index#${magicHighIndex}`, this.flag);
+ log(`%s magicLow index#${magicLowIndex}`, this.flag);
if (magicHighIndex === -1) {
// 没有找到magicHigh,则将整个buffer清空
@@ -110,7 +109,7 @@
//数据量还不够头部的长度
if (bufferLength < DUBBO_HEADER_LENGTH) {
//waiting
- log('bufferLength < header length');
+ log('%s bufferLength < header length', this.flag);
return;
}
@@ -124,11 +123,11 @@
header[15],
]);
const bodyLength = fromBytes4(bodyLengthBuff);
- log('body length', bodyLength);
+ log('%s body length %d', this.flag, bodyLength);
if (DUBBO_HEADER_LENGTH + bodyLength > bufferLength) {
//waiting
- log('header length + body length > buffer length');
+ log('%s header length + body length > buffer length', this.flag);
return;
}
const dataBuffer = this._buffer.slice(
diff --git a/packages/dubbo/src/serialization/heartbeat.ts b/packages/dubbo/src/serialization/heartbeat.ts
index 583ff0f..7609c29 100644
--- a/packages/dubbo/src/serialization/heartbeat.ts
+++ b/packages/dubbo/src/serialization/heartbeat.ts
@@ -59,6 +59,9 @@
this._transport = transport;
this._onTimeout = onTimeout || noop;
+ const who = this._type === 'request' ? 'dubbo-consumer' : 'dubbo-server';
+ log('%s init heartbeat manager', who);
+
// init heartbaet
this.init();
}
@@ -123,7 +126,8 @@
* encode heartbeat
*/
encode(): Buffer {
- log('encode heartbeat');
+ const who = this._type === 'request' ? 'dubbo-consumer' : 'dubbo-server';
+ log('%s encode heartbeat', who);
const buffer = Buffer.alloc(DUBBO_HEADER_LENGTH + 1);
diff --git a/packages/dubbo/src/server/server.ts b/packages/dubbo/src/server/server.ts
index 5338658..97dbbce 100644
--- a/packages/dubbo/src/server/server.ts
+++ b/packages/dubbo/src/server/server.ts
@@ -100,6 +100,7 @@
}
private _handleSocketRequest = (socket: Socket) => {
+ log('tcp socket establish connection');
// init heartbeat
const heartbeat = HeartBeat.from({
type: 'response',
@@ -107,7 +108,7 @@
onTimeout: () => socket.destroy(),
});
- DecodeBuffer.from(socket).subscribe(async data => {
+ DecodeBuffer.from(socket, 'dubbo-server').subscribe(async data => {
if (HeartBeat.isHeartBeat(data)) {
log(`receive socket client heartbeat`);
heartbeat.emit();