Merge pull request #151 from AtarisMio/patch-filter-private-field
过滤private field不翻译
diff --git a/packages/dubbo/src/decode-buffer.ts b/packages/dubbo/src/decode-buffer.ts
index 3760b46..3fdb578 100644
--- a/packages/dubbo/src/decode-buffer.ts
+++ b/packages/dubbo/src/decode-buffer.ts
@@ -17,7 +17,6 @@
import debug from 'debug';
import {fromBytes4} from './byte';
-import HeartBeat from './heartbeat';
import {IObservable, TDecodeBuffSubscriber} from './types';
import {noop} from './util';
@@ -25,11 +24,6 @@
const MAGIC_LOW = 0xbb;
const HEADER_LENGTH = 16;
const log = debug('dubbo:decode-buffer');
-export const enum DataType {
- Noop,
- HeardBeat,
- Data,
-}
/**
* 在并发的tcp数据传输中,会出现少包,粘包的现象
@@ -40,24 +34,17 @@
implements IObservable<TDecodeBuffSubscriber> {
/**
* 初始化一个DecodeBuffer
- * @param pid socket-worker的pid
*/
- private constructor(pid: number) {
+ constructor() {
log('new DecodeBuffer');
- this._pid = pid;
this._buffer = Buffer.alloc(0);
this._subscriber = noop;
}
- private readonly _pid: number;
private _buffer: Buffer;
private _subscriber: Function;
- static from(pid: number) {
- return new DecodeBuffer(pid);
- }
-
- receive(data: Buffer): DataType {
+ receive(data: Buffer) {
//concat bytes
this._buffer = Buffer.concat([this._buffer, data]);
let bufferLength = this._buffer.length;
@@ -76,33 +63,41 @@
0xda} buffer[1] is 0xbb ${magicLow == 0xbb}`,
);
- const magicHighIndex = this._buffer.indexOf(magicHigh);
- const magicLowIndex = this._buffer.indexOf(magicLow);
+ const magicHighIndex = this._buffer.indexOf(MAGIC_HIGH);
+ const magicLowIndex = this._buffer.indexOf(MAGIC_LOW);
log(`magicHigh index#${magicHighIndex}`);
log(`magicLow index#${magicLowIndex}`);
- //没有找到magicHigh或者magicLow
- if (magicHighIndex === -1 || magicLowIndex === -1) {
- return DataType.Noop;
+ if (magicHighIndex === -1) {
+ // magicHigh was not found anywhere, so discard the whole buffer
+ this._buffer = this._buffer.slice(bufferLength);
+ } else if (magicLowIndex === -1) {
+ if (magicHighIndex === bufferLength - 1) {
+ // magicHigh is the last byte of the buffer: keep only that last byte
+ this._buffer = this._buffer.slice(magicHighIndex);
+ } else {
+ // magicHigh is not the last byte and the buffer contains no magicLow: discard the whole buffer
+ this._buffer = this._buffer.slice(bufferLength);
+ }
+ } else {
+ if (magicLowIndex - magicHighIndex === 1) {
+ // magicHigh is immediately followed by magicLow: advance the buffer to magicHigh
+ this._buffer = this._buffer.slice(magicHighIndex);
+ } else {
+ // magicHigh and magicLow are not adjacent: advance the buffer to the byte after magicHigh
+ this._buffer = this._buffer.slice(magicHighIndex + 1);
+ }
}
-
- if (
- magicHighIndex !== -1 &&
- magicLowIndex !== -1 &&
- magicLowIndex - magicHighIndex === 1
- ) {
- this._buffer = this._buffer.slice(magicHighIndex);
- bufferLength = this._buffer.length;
+ bufferLength = this._buffer.length;
+ if (bufferLength < HEADER_LENGTH) {
+ return;
}
- return DataType.Noop;
- }
-
- if (magicHigh === MAGIC_HIGH && magicLow === MAGIC_LOW) {
+ } else {
//数据量还不够头部的长度
if (bufferLength < HEADER_LENGTH) {
//waiting
log('bufferLength < header length');
- return DataType.Noop;
+ return;
}
//取出头部字节
@@ -117,18 +112,10 @@
const bodyLength = fromBytes4(bodyLengthBuff);
log('body length', bodyLength);
- //判断是不是心跳
- if (HeartBeat.isHeartBeat(header)) {
- log(`SocketWorker#${this._pid} <=receive= heartbeat data.`);
- this._buffer = this._buffer.slice(HEADER_LENGTH + bodyLength);
- bufferLength = this._buffer.length;
- return DataType.HeardBeat;
- }
-
if (HEADER_LENGTH + bodyLength > bufferLength) {
//waiting
log('header length + body length > buffer length');
- return DataType.Noop;
+ return;
}
const dataBuffer = this._buffer.slice(0, HEADER_LENGTH + bodyLength);
this._buffer = this._buffer.slice(HEADER_LENGTH + bodyLength);
@@ -136,7 +123,6 @@
this._subscriber(dataBuffer);
}
}
- return DataType.Data;
}
clearBuffer() {
diff --git a/packages/dubbo/src/dubbo.ts b/packages/dubbo/src/dubbo.ts
index b2a14ed..0791199 100644
--- a/packages/dubbo/src/dubbo.ts
+++ b/packages/dubbo/src/dubbo.ts
@@ -89,7 +89,9 @@
this._registryService(props.service);
log('interfaces:|>', this._interfaces);
- this._readyResolve = noop;
+ this._readyPromise = new Promise(resolve => {
+ this._readyResolve = resolve;
+ });
this._subscriber = {
onTrace: noop,
};
@@ -116,6 +118,7 @@
}
private _interfaces: Array<string>;
+ private _readyPromise: Promise<void>;
private _readyResolve: Function;
private _subscriber: IDubboSubscriber;
private readonly _queue: Queue;
@@ -237,9 +240,7 @@
* 其他的框架类似
*/
ready() {
- return new Promise(resolve => {
- this._readyResolve = resolve;
- });
+ return this._readyPromise;
}
subscribe(subscriber: IDubboSubscriber) {
diff --git a/packages/dubbo/src/registry/zookeeper.ts b/packages/dubbo/src/registry/zookeeper.ts
index fe60f1b..53fdc90 100644
--- a/packages/dubbo/src/registry/zookeeper.ts
+++ b/packages/dubbo/src/registry/zookeeper.ts
@@ -114,6 +114,7 @@
* 重连
*/
private _reconnect() {
+ clearInterval(this._checkTimer);
if (this._client) {
this._client.close();
}
@@ -213,23 +214,26 @@
});
//the connection between client and server is dropped.
- this._client.on('disconnected', () => {
+ this._client.once('disconnected', () => {
log(`zk ${register} had disconnected`);
clearTimeout(timeId);
- callback(
+ traceErr(
new ZookeeperDisconnectedError(
`ZooKeeper was disconnected. current state is ${this._client.getState()} `,
),
);
+ this._reconnect();
});
- this._client.on('expired', () => {
+ this._client.once('expired', () => {
+ clearTimeout(timeId);
log(`zk ${register} had session expired`);
- callback(
+ traceErr(
new ZookeeperExpiredError(
`Zookeeper was session Expired Error current state ${this._client.getState()}`,
),
);
+ this._client.close();
});
//connect
@@ -254,21 +258,13 @@
return;
}
- //clear current dubbo interface
- const agentAddrList = [];
- const urls = [];
- for (let serviceUrl of dubboServiceUrls) {
- const url = DubboUrl.from(serviceUrl);
- const {host, port} = url;
- agentAddrList.push(`${host}:${port}`);
- urls.push(url);
- }
-
- this._dubboServiceUrlMap.set(dubboInterface, urls);
-
- if (agentAddrList.length === 0) {
+ const urls = dubboServiceUrls.map(serviceUrl => DubboUrl.from(serviceUrl));
+ if (urls.length === 0) {
traceErr(new Error(`trigger watch ${e} agentList is empty`));
+ return;
}
+ //replace the stored urls for this dubbo interface
+ this._dubboServiceUrlMap.set(dubboInterface, urls);
if (isDevEnv) {
log('agentSet:|> %O', this._allAgentAddrSet);
diff --git a/packages/dubbo/src/socket-worker.ts b/packages/dubbo/src/socket-worker.ts
index 66ba5b7..d1a9400 100644
--- a/packages/dubbo/src/socket-worker.ts
+++ b/packages/dubbo/src/socket-worker.ts
@@ -19,7 +19,7 @@
import net from 'net';
import Context from './context';
import {decode} from './decode';
-import DecodeBuffer, {DataType} from './decode-buffer';
+import DecodeBuffer from './decode-buffer';
import DubboEncoder from './encode';
import HeartBeat from './heartbeat';
import {SOCKET_STATUS} from './socket-status';
@@ -66,7 +66,7 @@
};
//init decodeBuffer
- this._decodeBuff = DecodeBuffer.from(pid).subscribe(
+ this._decodeBuff = new DecodeBuffer().subscribe(
this._onSubscribeDecodeBuff,
);
@@ -212,12 +212,7 @@
private _onData = data => {
log(`SocketWorker#${this.pid} =receive data=> ${this.host}:${this.port}`);
- const dataType = this._decodeBuff.receive(data);
- switch (dataType) {
- case DataType.HeardBeat:
- this._retryHeartBeat = RETRY_HEARD_BEAT_TIME; //reset heart beat times
- break;
- }
+ this._decodeBuff.receive(data);
};
private _onError = (error: Error) => {
@@ -277,8 +272,13 @@
};
private _onSubscribeDecodeBuff = (data: Buffer) => {
- const json = decode(data);
- log(`SocketWorker#${this.pid} <=received=> dubbo result %O`, json);
- this._subscriber.onData(json);
+ if (HeartBeat.isHeartBeat(data)) {
+ log(`SocketWorker#${this.pid} <=receive= heartbeat data.`);
+ this._retryHeartBeat = RETRY_HEARD_BEAT_TIME; //reset heart beat times
+ } else {
+ const json = decode(data);
+ log(`SocketWorker#${this.pid} <=received=> dubbo result %O`, json);
+ this._subscriber.onData(json);
+ }
};
}