增加解码返回数据类型, 修复当网络断开后, 心跳无法检测是否成功
diff --git a/packages/dubbo/src/__tests__/socket-worker-test.ts b/packages/dubbo/src/__tests__/socket-worker-test.ts
new file mode 100644
index 0000000..a25aeca
--- /dev/null
+++ b/packages/dubbo/src/__tests__/socket-worker-test.ts
@@ -0,0 +1,4 @@
+import SocketWorker from '../socket-worker';
+
+const worker = SocketWorker.from('47.110.39.117:8888');
+console.log(worker);
diff --git a/packages/dubbo/src/decode-buffer.ts b/packages/dubbo/src/decode-buffer.ts
index 54d4dc5..1df3e46 100644
--- a/packages/dubbo/src/decode-buffer.ts
+++ b/packages/dubbo/src/decode-buffer.ts
@@ -25,6 +25,11 @@
 const MAGIC_LOW = 0xbb;
 const HEADER_LENGTH = 16;
 const log = debug('dubbo:decode-buffer');
+export const enum DataType {
+  Noop,
+  HeardBeat,
+  Data,
+}
 
 /**
  * 在并发的tcp数据传输中,会出现少包,粘包的现象
@@ -52,7 +57,7 @@
     return new DecodeBuffer(pid);
   }
 
-  receive(data: Buffer) {
+  receive(data: Buffer): DataType {
     //concat bytes
     this._buffer = Buffer.concat([this._buffer, data]);
     let bufferLength = this._buffer.length;
@@ -78,7 +83,7 @@
 
         //没有找到magicHigh或者magicLow
         if (magicHighIndex === -1 || magicLowIndex === -1) {
-          return;
+          return DataType.Noop;
         }
 
         if (
@@ -89,7 +94,7 @@
           this._buffer = this._buffer.slice(magicHighIndex);
           bufferLength = this._buffer.length;
         }
-        return;
+        return DataType.Noop;
       }
 
       if (magicHigh === MAGIC_HIGH && magicLow === MAGIC_LOW) {
@@ -97,7 +102,7 @@
         if (bufferLength < HEADER_LENGTH) {
           //waiting
           log('bufferLength < header length');
-          return;
+          return DataType.Noop;
         }
 
         //取出头部字节
@@ -117,18 +122,19 @@
           log(`SocketWorker#${this._pid} <=receive= heartbeat data.`);
           this._buffer = this._buffer.slice(HEADER_LENGTH + bodyLength);
           bufferLength = this._buffer.length;
-          return;
+          return DataType.HeardBeat;
         }
 
         if (HEADER_LENGTH + bodyLength > bufferLength) {
           //waiting
           log('header length + body length > buffer length');
-          return;
+          return DataType.Noop;
         }
         const dataBuffer = this._buffer.slice(0, HEADER_LENGTH + bodyLength);
         this._buffer = this._buffer.slice(HEADER_LENGTH + bodyLength);
         bufferLength = this._buffer.length;
         this._subscriber(dataBuffer);
+        return DataType.Data;
       }
     }
   }
diff --git a/packages/dubbo/src/socket-worker.ts b/packages/dubbo/src/socket-worker.ts
index 23c872b..f5cdd90 100644
--- a/packages/dubbo/src/socket-worker.ts
+++ b/packages/dubbo/src/socket-worker.ts
@@ -19,7 +19,7 @@
 import net from 'net';
 import Context from './context';
 import {decode} from './decode';
-import DecodeBuffer from './decode-buffer';
+import DecodeBuffer, {DataType} from './decode-buffer';
 import DubboEncoder from './encode';
 import HeartBeat from './heartbeat';
 import {SOCKET_STATUS} from './socket-status';
@@ -33,7 +33,8 @@
 //重试频率
 const RETRY_TIME = 3000;
 //心跳频率
-const HEART_BEAT = 180 * 1000;
+const HEART_BEAT = 2000;
+const RETRY_HEARD_BEAT_TIME = 20;
 const log = debug('dubbo:socket-worker');
 
 /**
@@ -51,6 +52,7 @@
     this.host = host;
     this.port = port;
     this._retry = RETRY_NUM;
+    this._retryHeartBeat = RETRY_HEARD_BEAT_TIME;
     this._status = SOCKET_STATUS.PADDING;
 
     log('new SocketWorker#%d|> %s %s', pid, host + ':' + port, this._status);
@@ -77,6 +79,8 @@
   public readonly port: number;
 
   private _retry: number;
+  private _retryTimeoutId: NodeJS.Timer;
+  private _retryHeartBeat: number;
   private _heartBeatTimer: NodeJS.Timer;
   private _socket: net.Socket;
   private _status: SOCKET_STATUS;
@@ -153,10 +157,14 @@
     //   `SocketWorker#${this.pid} =connecting=> ${this.host}:${this.port}`,
     // );
 
+    if (this._socket) {
+      this._socket.destroy();
+    }
+
     this._socket = new net.Socket();
     // Disable the Nagle algorithm.
-    this._socket.setNoDelay();
-
+    // this._socket.setTimeout(10 * 1000)
+    // this._socket.setKeepAlive(true)
     this._socket
       .connect(
         this.port,
@@ -179,6 +187,7 @@
 
     //reset retry number
     this._retry = RETRY_NUM;
+    this._retryHeartBeat = RETRY_HEARD_BEAT_TIME;
 
     //notifiy subscriber, the socketworker was connected successfully
     this._subscriber.onConnect({
@@ -188,15 +197,26 @@
     });
 
     //heartbeart
+    //when network is close, the connection maybe not close, so check the heart beat times
     this._heartBeatTimer = setInterval(() => {
-      log('emit heartbeat');
-      this._socket.write(HeartBeat.encode());
+      if (this._retryHeartBeat > 0) {
+        log('emit heartbeat');
+        this._retryHeartBeat--;
+        this._socket.write(HeartBeat.encode());
+      } else {
+        this._onClose(false);
+      }
     }, HEART_BEAT);
   };
 
   private _onData = data => {
     log(`SocketWorker#${this.pid}  =receive data=> ${this.host}:${this.port}`);
-    this._decodeBuff.receive(data);
+    const dataType = this._decodeBuff.receive(data);
+    switch (dataType) {
+      case DataType.HeardBeat:
+        this._retryHeartBeat = RETRY_HEARD_BEAT_TIME; //reset heart beat times
+        break;
+    }
   };
 
   private _onError = (error: Error) => {
@@ -238,7 +258,8 @@
       //set current status
       this._status = SOCKET_STATUS.RETRY;
       //retry when delay RETRY_TIME
-      setTimeout(() => {
+      clearTimeout(this._retryTimeoutId);
+      this._retryTimeoutId = setTimeout(() => {
         this._retry--;
         this._initSocket();
       }, RETRY_TIME);