blob: ecd60ab4a6e211ec35dbc469bab38637fd03bfbb [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "network/connection.h"
#include <algorithm>
#include <list>
#include <utility>
#if defined(__APPLE__)
#include <sys/uio.h>
#endif
#include "glog/logging.h"
#include "network/regevent.h"
// How many times should we wait to see a buffer full while enqueueing data
// before declaring start of back pressure
const sp_uint8 __SYSTEM_MIN_NUM_ENQUEUES_WITH_BUFFER_FULL__ = 3;
Connection::Connection(ConnectionEndPoint* endpoint, ConnectionOptions* options,
std::shared_ptr<EventLoop> eventLoop)
: BaseConnection(endpoint, options, eventLoop) {
mIncomingPacket = new IncomingPacket(mOptions->max_packet_size_);
mOnNewPacket = NULL;
mOnConnectionBufferEmpty = NULL;
mOnConnectionBufferFull = NULL;
mCausedBackPressure = false;
mUnderBackPressure = false;
mNumEnqueuesWithBufferFull = 0;
}
Connection::~Connection() {
if (hasCausedBackPressure()) {
mOnConnectionBufferEmpty(this);
}
delete mIncomingPacket;
}
sp_int32 Connection::sendPacket(OutgoingPacket* packet) {
struct evbuffer* packet_buffer = packet->release_buffer();
delete packet;
if (write(packet_buffer) < 0) return -1;
if (!hasCausedBackPressure()) {
// Are we above the threshold?
if (getOutstandingBytes() >= mOptions->high_watermark_) {
// Have we been above the threshold enough number of times?
if (++mNumEnqueuesWithBufferFull > __SYSTEM_MIN_NUM_ENQUEUES_WITH_BUFFER_FULL__) {
mNumEnqueuesWithBufferFull = 0;
if (mOnConnectionBufferFull) {
mOnConnectionBufferFull(this);
}
}
} else {
mNumEnqueuesWithBufferFull = 0;
}
}
return 0;
}
void Connection::registerForNewPacket(VCallback<IncomingPacket*> cb) {
mOnNewPacket = std::move(cb);
}
sp_int32 Connection::registerForBackPressure(VCallback<Connection*> cbStarter,
VCallback<Connection*> cbReliever) {
mOnConnectionBufferFull = std::move(cbStarter);
mOnConnectionBufferEmpty = std::move(cbReliever);
return 0;
}
void Connection::releiveBackPressure() {
// Check if we reduced the write buffer to something below the back
// pressure threshold
if (hasCausedBackPressure()) {
mOnConnectionBufferEmpty(this);
}
}
sp_int32 Connection::readFromEndPoint(struct bufferevent* _buffer) {
std::queue<IncomingPacket*> received_packets;
while (1) {
sp_int32 read_status = mIncomingPacket->Read(_buffer);
if (read_status == 0) {
// Packet was succcessfully read.
IncomingPacket* packet = mIncomingPacket;
mIncomingPacket = new IncomingPacket(mOptions->max_packet_size_);
received_packets.push(packet);
} else if (read_status > 0) {
// packet was read partially
break;
} else {
return -1;
}
}
while (!received_packets.empty()) {
IncomingPacket* packet = received_packets.front();
if (mOnNewPacket) {
mOnNewPacket(packet);
} else {
delete packet;
}
received_packets.pop();
}
return 0;
}
sp_int32 Connection::putBackPressure() {
mUnderBackPressure = true;
// For now stop reads from this connection
if (unregisterEndpointForRead() < 0) {
LOG(ERROR) << "Could not start back pressure on connection";
return -1;
}
return 0;
}
sp_int32 Connection::removeBackPressure() {
mUnderBackPressure = false;
// Resume reading from this connection
if (registerEndpointForRead() < 0) {
LOG(ERROR) << "Could not remove back pressure from connection";
return -1;
}
return 0;
}