Merge pull request #151 from AtarisMio/patch-filter-private-field

过滤private field不翻译
diff --git a/packages/dubbo/src/decode-buffer.ts b/packages/dubbo/src/decode-buffer.ts
index 3760b46..3fdb578 100644
--- a/packages/dubbo/src/decode-buffer.ts
+++ b/packages/dubbo/src/decode-buffer.ts
@@ -17,7 +17,6 @@
 
 import debug from 'debug';
 import {fromBytes4} from './byte';
-import HeartBeat from './heartbeat';
 import {IObservable, TDecodeBuffSubscriber} from './types';
 import {noop} from './util';
 
@@ -25,11 +24,6 @@
 const MAGIC_LOW = 0xbb;
 const HEADER_LENGTH = 16;
 const log = debug('dubbo:decode-buffer');
-export const enum DataType {
-  Noop,
-  HeardBeat,
-  Data,
-}
 
 /**
  * 在并发的tcp数据传输中,会出现少包,粘包的现象
@@ -40,24 +34,17 @@
   implements IObservable<TDecodeBuffSubscriber> {
   /**
    * 初始化一个DecodeBuffer
-   * @param pid socket-worker的pid
    */
-  private constructor(pid: number) {
+  constructor() {
     log('new DecodeBuffer');
-    this._pid = pid;
     this._buffer = Buffer.alloc(0);
     this._subscriber = noop;
   }
 
-  private readonly _pid: number;
   private _buffer: Buffer;
   private _subscriber: Function;
 
-  static from(pid: number) {
-    return new DecodeBuffer(pid);
-  }
-
-  receive(data: Buffer): DataType {
+  receive(data: Buffer) {
     //concat bytes
     this._buffer = Buffer.concat([this._buffer, data]);
     let bufferLength = this._buffer.length;
@@ -76,33 +63,41 @@
             0xda} buffer[1] is 0xbb ${magicLow == 0xbb}`,
         );
 
-        const magicHighIndex = this._buffer.indexOf(magicHigh);
-        const magicLowIndex = this._buffer.indexOf(magicLow);
+        const magicHighIndex = this._buffer.indexOf(MAGIC_HIGH);
+        const magicLowIndex = this._buffer.indexOf(MAGIC_LOW);
         log(`magicHigh index#${magicHighIndex}`);
         log(`magicLow index#${magicLowIndex}`);
 
-        //没有找到magicHigh或者magicLow
-        if (magicHighIndex === -1 || magicLowIndex === -1) {
-          return DataType.Noop;
+        if (magicHighIndex === -1) {
+          // 没有找到magicHigh,则将整个buffer清空
+          this._buffer = this._buffer.slice(bufferLength);
+        } else if (magicLowIndex === -1) {
+          if (magicHighIndex === bufferLength - 1) {
+            // 如果magicHigh是buffer最后一位,则整个buffer只保留最后一位
+            this._buffer = this._buffer.slice(magicHighIndex);
+          } else {
+            // 如果magicHigh不是buffer最后一位,而且整个buffer里没有magicLow,则清空buffer
+            this._buffer = this._buffer.slice(bufferLength);
+          }
+        } else {
+          if (magicLowIndex - magicHighIndex === 1) {
+            // magicHigh和magicLow在buffer中间相邻位置,则buffer移动到magicHigh的位置
+            this._buffer = this._buffer.slice(magicHighIndex);
+          } else {
+            // magicHigh和magicLow不相邻,则buffer移动到magicHigh的下一个位置
+            this._buffer = this._buffer.slice(magicHighIndex + 1);
+          }
         }
-
-        if (
-          magicHighIndex !== -1 &&
-          magicLowIndex !== -1 &&
-          magicLowIndex - magicHighIndex === 1
-        ) {
-          this._buffer = this._buffer.slice(magicHighIndex);
-          bufferLength = this._buffer.length;
+        bufferLength = this._buffer.length;
+        if (bufferLength < HEADER_LENGTH) {
+          return;
         }
-        return DataType.Noop;
-      }
-
-      if (magicHigh === MAGIC_HIGH && magicLow === MAGIC_LOW) {
+      } else {
         //数据量还不够头部的长度
         if (bufferLength < HEADER_LENGTH) {
           //waiting
           log('bufferLength < header length');
-          return DataType.Noop;
+          return;
         }
 
         //取出头部字节
@@ -117,18 +112,10 @@
         const bodyLength = fromBytes4(bodyLengthBuff);
         log('body length', bodyLength);
 
-        //判断是不是心跳
-        if (HeartBeat.isHeartBeat(header)) {
-          log(`SocketWorker#${this._pid} <=receive= heartbeat data.`);
-          this._buffer = this._buffer.slice(HEADER_LENGTH + bodyLength);
-          bufferLength = this._buffer.length;
-          return DataType.HeardBeat;
-        }
-
         if (HEADER_LENGTH + bodyLength > bufferLength) {
           //waiting
           log('header length + body length > buffer length');
-          return DataType.Noop;
+          return;
         }
         const dataBuffer = this._buffer.slice(0, HEADER_LENGTH + bodyLength);
         this._buffer = this._buffer.slice(HEADER_LENGTH + bodyLength);
@@ -136,7 +123,6 @@
         this._subscriber(dataBuffer);
       }
     }
-    return DataType.Data;
   }
 
   clearBuffer() {
diff --git a/packages/dubbo/src/dubbo.ts b/packages/dubbo/src/dubbo.ts
index b2a14ed..0791199 100644
--- a/packages/dubbo/src/dubbo.ts
+++ b/packages/dubbo/src/dubbo.ts
@@ -89,7 +89,9 @@
     this._registryService(props.service);
     log('interfaces:|>', this._interfaces);
 
-    this._readyResolve = noop;
+    this._readyPromise = new Promise(resolve => {
+      this._readyResolve = resolve;
+    });
     this._subscriber = {
       onTrace: noop,
     };
@@ -116,6 +118,7 @@
   }
 
   private _interfaces: Array<string>;
+  private _readyPromise: Promise<void>;
   private _readyResolve: Function;
   private _subscriber: IDubboSubscriber;
   private readonly _queue: Queue;
@@ -237,9 +240,7 @@
    * 其他的框架类似
    */
   ready() {
-    return new Promise(resolve => {
-      this._readyResolve = resolve;
-    });
+    return this._readyPromise;
   }
 
   subscribe(subscriber: IDubboSubscriber) {
diff --git a/packages/dubbo/src/registry/zookeeper.ts b/packages/dubbo/src/registry/zookeeper.ts
index fe60f1b..53fdc90 100644
--- a/packages/dubbo/src/registry/zookeeper.ts
+++ b/packages/dubbo/src/registry/zookeeper.ts
@@ -114,6 +114,7 @@
    * 重连
    */
   private _reconnect() {
+    clearInterval(this._checkTimer);
     if (this._client) {
       this._client.close();
     }
@@ -213,23 +214,26 @@
     });
 
     //the connection between client and server is dropped.
-    this._client.on('disconnected', () => {
+    this._client.once('disconnected', () => {
       log(`zk ${register} had disconnected`);
       clearTimeout(timeId);
-      callback(
+      traceErr(
         new ZookeeperDisconnectedError(
           `ZooKeeper was disconnected. current state is ${this._client.getState()} `,
         ),
       );
+      this._reconnect();
     });
 
-    this._client.on('expired', () => {
+    this._client.once('expired', () => {
+      clearTimeout(timeId);
       log(`zk ${register} had session expired`);
-      callback(
+      traceErr(
         new ZookeeperExpiredError(
           `Zookeeper was session Expired Error current state ${this._client.getState()}`,
         ),
       );
+      this._client.close();
     });
 
     //connect
@@ -254,21 +258,13 @@
         return;
       }
 
-      //clear current dubbo interface
-      const agentAddrList = [];
-      const urls = [];
-      for (let serviceUrl of dubboServiceUrls) {
-        const url = DubboUrl.from(serviceUrl);
-        const {host, port} = url;
-        agentAddrList.push(`${host}:${port}`);
-        urls.push(url);
-      }
-
-      this._dubboServiceUrlMap.set(dubboInterface, urls);
-
-      if (agentAddrList.length === 0) {
+      const urls = dubboServiceUrls.map(serviceUrl => DubboUrl.from(serviceUrl));
+      if (urls.length === 0) {
         traceErr(new Error(`trigger watch ${e} agentList is empty`));
+        return;
       }
+      //clear current dubbo interface
+      this._dubboServiceUrlMap.set(dubboInterface, urls);
 
       if (isDevEnv) {
         log('agentSet:|> %O', this._allAgentAddrSet);
diff --git a/packages/dubbo/src/socket-worker.ts b/packages/dubbo/src/socket-worker.ts
index 66ba5b7..d1a9400 100644
--- a/packages/dubbo/src/socket-worker.ts
+++ b/packages/dubbo/src/socket-worker.ts
@@ -19,7 +19,7 @@
 import net from 'net';
 import Context from './context';
 import {decode} from './decode';
-import DecodeBuffer, {DataType} from './decode-buffer';
+import DecodeBuffer from './decode-buffer';
 import DubboEncoder from './encode';
 import HeartBeat from './heartbeat';
 import {SOCKET_STATUS} from './socket-status';
@@ -66,7 +66,7 @@
     };
 
     //init decodeBuffer
-    this._decodeBuff = DecodeBuffer.from(pid).subscribe(
+    this._decodeBuff = new DecodeBuffer().subscribe(
       this._onSubscribeDecodeBuff,
     );
 
@@ -212,12 +212,7 @@
 
   private _onData = data => {
     log(`SocketWorker#${this.pid}  =receive data=> ${this.host}:${this.port}`);
-    const dataType = this._decodeBuff.receive(data);
-    switch (dataType) {
-      case DataType.HeardBeat:
-        this._retryHeartBeat = RETRY_HEARD_BEAT_TIME; //reset heart beat times
-        break;
-    }
+    this._decodeBuff.receive(data);
   };
 
   private _onError = (error: Error) => {
@@ -277,8 +272,13 @@
   };
 
   private _onSubscribeDecodeBuff = (data: Buffer) => {
-    const json = decode(data);
-    log(`SocketWorker#${this.pid} <=received=> dubbo result %O`, json);
-    this._subscriber.onData(json);
+    if (HeartBeat.isHeartBeat(data)) {
+      log(`SocketWorker#${this.pid} <=receive= heartbeat data.`);
+      this._retryHeartBeat = RETRY_HEARD_BEAT_TIME; //reset heart beat times
+    } else {
+      const json = decode(data);
+      log(`SocketWorker#${this.pid} <=received=> dubbo result %O`, json);
+      this._subscriber.onData(json);
+    }
   };
 }