增加解码返回数据类型, 修复当网络断开后, 心跳无法检测是否成功
diff --git a/packages/dubbo/src/__tests__/socket-worker-test.ts b/packages/dubbo/src/__tests__/socket-worker-test.ts
new file mode 100644
index 0000000..a25aeca
--- /dev/null
+++ b/packages/dubbo/src/__tests__/socket-worker-test.ts
@@ -0,0 +1,4 @@
+import SocketWorker from '../socket-worker';
+
+const worker = SocketWorker.from('47.110.39.117:8888');
+console.log(worker);
diff --git a/packages/dubbo/src/decode-buffer.ts b/packages/dubbo/src/decode-buffer.ts
index 54d4dc5..1df3e46 100644
--- a/packages/dubbo/src/decode-buffer.ts
+++ b/packages/dubbo/src/decode-buffer.ts
@@ -25,6 +25,11 @@
const MAGIC_LOW = 0xbb;
const HEADER_LENGTH = 16;
const log = debug('dubbo:decode-buffer');
+export const enum DataType {
+ Noop,
+ HeardBeat,
+ Data,
+}
/**
* 在并发的tcp数据传输中,会出现少包,粘包的现象
@@ -52,7 +57,7 @@
return new DecodeBuffer(pid);
}
- receive(data: Buffer) {
+ receive(data: Buffer): DataType {
//concat bytes
this._buffer = Buffer.concat([this._buffer, data]);
let bufferLength = this._buffer.length;
@@ -78,7 +83,7 @@
//没有找到magicHigh或者magicLow
if (magicHighIndex === -1 || magicLowIndex === -1) {
- return;
+ return DataType.Noop;
}
if (
@@ -89,7 +94,7 @@
this._buffer = this._buffer.slice(magicHighIndex);
bufferLength = this._buffer.length;
}
- return;
+ return DataType.Noop;
}
if (magicHigh === MAGIC_HIGH && magicLow === MAGIC_LOW) {
@@ -97,7 +102,7 @@
if (bufferLength < HEADER_LENGTH) {
//waiting
log('bufferLength < header length');
- return;
+ return DataType.Noop;
}
//取出头部字节
@@ -117,18 +122,19 @@
log(`SocketWorker#${this._pid} <=receive= heartbeat data.`);
this._buffer = this._buffer.slice(HEADER_LENGTH + bodyLength);
bufferLength = this._buffer.length;
- return;
+ return DataType.HeardBeat;
}
if (HEADER_LENGTH + bodyLength > bufferLength) {
//waiting
log('header length + body length > buffer length');
- return;
+ return DataType.Noop;
}
const dataBuffer = this._buffer.slice(0, HEADER_LENGTH + bodyLength);
this._buffer = this._buffer.slice(HEADER_LENGTH + bodyLength);
bufferLength = this._buffer.length;
this._subscriber(dataBuffer);
+ return DataType.Data;
}
}
}
diff --git a/packages/dubbo/src/socket-worker.ts b/packages/dubbo/src/socket-worker.ts
index 23c872b..f5cdd90 100644
--- a/packages/dubbo/src/socket-worker.ts
+++ b/packages/dubbo/src/socket-worker.ts
@@ -19,7 +19,7 @@
import net from 'net';
import Context from './context';
import {decode} from './decode';
-import DecodeBuffer from './decode-buffer';
+import DecodeBuffer, {DataType} from './decode-buffer';
import DubboEncoder from './encode';
import HeartBeat from './heartbeat';
import {SOCKET_STATUS} from './socket-status';
@@ -33,7 +33,8 @@
//重试频率
const RETRY_TIME = 3000;
//心跳频率
-const HEART_BEAT = 180 * 1000;
+const HEART_BEAT = 2000;
+const RETRY_HEARD_BEAT_TIME = 20;
const log = debug('dubbo:socket-worker');
/**
@@ -51,6 +52,7 @@
this.host = host;
this.port = port;
this._retry = RETRY_NUM;
+ this._retryHeartBeat = RETRY_HEARD_BEAT_TIME;
this._status = SOCKET_STATUS.PADDING;
log('new SocketWorker#%d|> %s %s', pid, host + ':' + port, this._status);
@@ -77,6 +79,8 @@
public readonly port: number;
private _retry: number;
+ private _retryTimeoutId: NodeJS.Timer;
+ private _retryHeartBeat: number;
private _heartBeatTimer: NodeJS.Timer;
private _socket: net.Socket;
private _status: SOCKET_STATUS;
@@ -153,10 +157,14 @@
// `SocketWorker#${this.pid} =connecting=> ${this.host}:${this.port}`,
// );
+ if (this._socket) {
+ this._socket.destroy();
+ }
+
this._socket = new net.Socket();
// Disable the Nagle algorithm.
- this._socket.setNoDelay();
-
+ // this._socket.setTimeout(10 * 1000)
+ // this._socket.setKeepAlive(true)
this._socket
.connect(
this.port,
@@ -179,6 +187,7 @@
//reset retry number
this._retry = RETRY_NUM;
+ this._retryHeartBeat = RETRY_HEARD_BEAT_TIME;
//notifiy subscriber, the socketworker was connected successfully
this._subscriber.onConnect({
@@ -188,15 +197,26 @@
});
//heartbeart
+ //when network is close, the connection maybe not close, so check the heart beat times
this._heartBeatTimer = setInterval(() => {
- log('emit heartbeat');
- this._socket.write(HeartBeat.encode());
+ if (this._retryHeartBeat > 0) {
+ log('emit heartbeat');
+ this._retryHeartBeat--;
+ this._socket.write(HeartBeat.encode());
+ } else {
+ this._onClose(false);
+ }
}, HEART_BEAT);
};
private _onData = data => {
log(`SocketWorker#${this.pid} =receive data=> ${this.host}:${this.port}`);
- this._decodeBuff.receive(data);
+ const dataType = this._decodeBuff.receive(data);
+ switch (dataType) {
+ case DataType.HeardBeat:
+ this._retryHeartBeat = RETRY_HEARD_BEAT_TIME; //reset heart beat times
+ break;
+ }
};
private _onError = (error: Error) => {
@@ -238,7 +258,8 @@
//set current status
this._status = SOCKET_STATUS.RETRY;
//retry when delay RETRY_TIME
- setTimeout(() => {
+ clearTimeout(this._retryTimeoutId);
+ this._retryTimeoutId = setTimeout(() => {
this._retry--;
this._initSocket();
}, RETRY_TIME);