blob: 54d4dc5c76d0d7740be2bab9b8c5c0dd72e59151 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import debug from 'debug';
import {fromBytes4} from './byte';
import HeartBeat from './heartbeat';
import {IObservable, TDecodeBuffSubscriber} from './types';
import {noop} from './util';
const MAGIC_HIGH = 0xda;
const MAGIC_LOW = 0xbb;
const HEADER_LENGTH = 16;
const log = debug('dubbo:decode-buffer');
/**
* 在并发的tcp数据传输中,会出现少包,粘包的现象
* 好在tcp的传输是可以保证顺序的
* 我们需要抽取一个buff来统一处理这些数据
*/
export default class DecodeBuffer
implements IObservable<TDecodeBuffSubscriber> {
/**
* 初始化一个DecodeBuffer
* @param pid socket-worker的pid
*/
private constructor(pid: number) {
log('new DecodeBuffer');
this._pid = pid;
this._buffer = Buffer.alloc(0);
this._subscriber = noop;
}
private readonly _pid: number;
private _buffer: Buffer;
private _subscriber: Function;
static from(pid: number) {
return new DecodeBuffer(pid);
}
receive(data: Buffer) {
//concat bytes
this._buffer = Buffer.concat([this._buffer, data]);
let bufferLength = this._buffer.length;
while (bufferLength >= HEADER_LENGTH) {
//判断buffer 0, 1 是不是dubbo的magic high , magic low
const magicHigh = this._buffer[0];
const magicLow = this._buffer[1];
//如果不是magichight magiclow 做个容错
if (magicHigh != MAGIC_HIGH || magicLow != MAGIC_LOW) {
log(this._buffer);
log(
`receive server data error, buffer[0] is 0xda ${magicHigh ==
0xda} buffer[1] is 0xbb ${magicLow == 0xbb}`,
);
const magicHighIndex = this._buffer.indexOf(magicHigh);
const magicLowIndex = this._buffer.indexOf(magicLow);
log(`magicHigh index#${magicHighIndex}`);
log(`magicLow index#${magicLowIndex}`);
//没有找到magicHigh或者magicLow
if (magicHighIndex === -1 || magicLowIndex === -1) {
return;
}
if (
magicHighIndex !== -1 &&
magicLowIndex !== -1 &&
magicLowIndex - magicHighIndex === 1
) {
this._buffer = this._buffer.slice(magicHighIndex);
bufferLength = this._buffer.length;
}
return;
}
if (magicHigh === MAGIC_HIGH && magicLow === MAGIC_LOW) {
//数据量还不够头部的长度
if (bufferLength < HEADER_LENGTH) {
//waiting
log('bufferLength < header length');
return;
}
//取出头部字节
const header = this._buffer.slice(0, HEADER_LENGTH);
//计算body的长度字节位置[12-15]
const bodyLengthBuff = Buffer.from([
header[12],
header[13],
header[14],
header[15],
]);
const bodyLength = fromBytes4(bodyLengthBuff);
log('body length', bodyLength);
//判断是不是心跳
if (HeartBeat.isHeartBeat(header)) {
log(`SocketWorker#${this._pid} <=receive= heartbeat data.`);
this._buffer = this._buffer.slice(HEADER_LENGTH + bodyLength);
bufferLength = this._buffer.length;
return;
}
if (HEADER_LENGTH + bodyLength > bufferLength) {
//waiting
log('header length + body length > buffer length');
return;
}
const dataBuffer = this._buffer.slice(0, HEADER_LENGTH + bodyLength);
this._buffer = this._buffer.slice(HEADER_LENGTH + bodyLength);
bufferLength = this._buffer.length;
this._subscriber(dataBuffer);
}
}
}
clearBuffer() {
//reduce memory alloc
if (this._buffer.length > 0) {
this._buffer = Buffer.alloc(0);
}
}
subscribe(subscriber: TDecodeBuffSubscriber) {
this._subscriber = subscriber;
return this;
}
}