blob: 3e1fa6b39d2185dd69944b9225b5037ad3524609 [file] [log] [blame]
/*
* Copyright 2015 Twitter, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "network/connection.h"
#include <algorithm>
#include <list>
#include <utility>
#if defined(__APPLE__)
#include <sys/uio.h>
#endif
#include "glog/logging.h"
// Read batching limit: readFromEndPoint() stops pulling packets off the socket
// once the packets completed in that call add up to at least this many bytes.
const sp_int32 __SYSTEM_NETWORK_READ_BATCH_SIZE__ = 1048576; // 1 MiB
// Initial value of mWriteBatchsize, the per-call byte budget that
// writeIntoEndPoint() honours before returning to its caller.
const sp_int32 __SYSTEM_NETWORK_DEFAULT_WRITE_BATCH_SIZE__ = 1048576; // 1 MiB
// How many times should we wait to see a buffer full while enqueueing data
// before declaring start of back pressure.
// sendPacket() fires the buffer-full callback only once its counter of
// enqueues seen at or above the high watermark EXCEEDS this value; the counter
// is reset whenever an enqueue finds the buffer below the high watermark.
const sp_uint8 __SYSTEM_MIN_NUM_ENQUEUES_WITH_BUFFER_FULL__ = 3;
// Builds a connection on top of BaseConnection. No callbacks are registered
// yet, nothing is queued for writing, and the back-pressure bookkeeping starts
// out cleared.
Connection::Connection(ConnectionEndPoint* endpoint, ConnectionOptions* options,
                       EventLoop* eventLoop)
    : BaseConnection(endpoint, options, eventLoop) {
  // Callbacks stay unset until registerForNewPacket() /
  // registerForBackPressure() are called.
  mOnNewPacket = nullptr;
  mOnConnectionBufferFull = nullptr;
  mOnConnectionBufferEmpty = nullptr;

  // Back-pressure state.
  mCausedBackPressure = false;
  mUnderBackPressure = false;
  mNumEnqueuesWithBufferFull = 0;

  // Outgoing side: nothing queued, default per-call write budget, and a
  // scatter/gather array that writeIntoIOVector() fills before each writev().
  mNumOutstandingBytes = 0;
  mWriteBatchsize = __SYSTEM_NETWORK_DEFAULT_WRITE_BATCH_SIZE__;
  mIOVectorSize = 1024;
  mIOVector = new struct iovec[mIOVectorSize];

  // Incoming side: the packet currently being assembled from the socket.
  mIncomingPacket = new IncomingPacket(mOptions->max_packet_size_);
}
// Releases everything the connection still owns: the packet being assembled,
// every queued / already-sent outgoing packet, every received packet that was
// not yet dispatched, and the iovec array.
//
// If this connection is flagged as having caused back pressure, the
// "buffer empty" callback is fired first so the flag's owner is told the
// pressure is gone.
Connection::~Connection() {
  // The callback is NULL until registerForBackPressure() has been called, so
  // check it before invoking (sendPacket() guards the buffer-full callback the
  // same way). Invoking an empty callback from a destructor would be fatal.
  if (hasCausedBackPressure() && mOnConnectionBufferEmpty) {
    mOnConnectionBufferEmpty(this);
  }

  delete mIncomingPacket;

  // Packets that were queued but never (fully) written.
  while (!mOutstandingPackets.empty()) {
    delete mOutstandingPackets.front();
    mOutstandingPackets.pop_front();
  }

  // Packets fully written but not yet reclaimed by handleDataWritten().
  while (!mSentPackets.empty()) {
    delete mSentPackets.front();
    mSentPackets.pop();
  }

  // Packets fully read but not yet dispatched by handleDataRead().
  while (!mReceivedPackets.empty()) {
    delete mReceivedPackets.front();
    mReceivedPackets.pop();
  }

  delete[] mIOVector;
}
// Queues `packet` for asynchronous transmission.
//
// Returns 0 once the packet is queued, or -1 if the connection could not be
// registered for write events (in which case the packet is NOT queued).
//
// After queueing, decides whether to signal the start of back pressure: the
// buffer-full callback fires when the outstanding byte count has been at or
// above the high watermark for more than
// __SYSTEM_MIN_NUM_ENQUEUES_WITH_BUFFER_FULL__ enqueues in a row.
sp_int32 Connection::sendPacket(OutgoingPacket* packet) {
  packet->PrepareForWriting();
  if (registerForWrite() != 0) return -1;

  mOutstandingPackets.push_back(packet);
  mNumOutstandingBytes += packet->GetTotalPacketSize();

  // Already flagged as the cause of back pressure: nothing more to evaluate.
  if (hasCausedBackPressure()) return 0;

  // Below the threshold: forget any earlier "buffer full" sightings.
  if (mNumOutstandingBytes < mOptions->high_watermark_) {
    mNumEnqueuesWithBufferFull = 0;
    return 0;
  }

  // Above the threshold, but not yet often enough to act on it.
  if (++mNumEnqueuesWithBufferFull <= __SYSTEM_MIN_NUM_ENQUEUES_WITH_BUFFER_FULL__) {
    return 0;
  }

  mNumEnqueuesWithBufferFull = 0;
  if (mOnConnectionBufferFull) {
    mOnConnectionBufferFull(this);
  }
  return 0;
}
// Installs the callback that handleDataRead() invokes once for every fully
// received packet. Replaces any previously installed callback.
void Connection::registerForNewPacket(VCallback<IncomingPacket*> cb) {
  // `cb` is a by-value sink parameter, so move it into place.
  mOnNewPacket = std::move(cb);
}
// Installs the pair of back-pressure callbacks:
//   cbStarter  - fired by sendPacket() when the write buffer is judged full.
//   cbReliever - fired when the buffer drains to the low watermark, or when
//                the connection is destroyed while flagged.
// Always returns 0.
sp_int32 Connection::registerForBackPressure(VCallback<Connection*> cbStarter,
                                             VCallback<Connection*> cbReliever) {
  mOnConnectionBufferEmpty = std::move(cbReliever);
  mOnConnectionBufferFull = std::move(cbStarter);
  return 0;
}
// Fills mIOVector with the unsent bytes of the packets at the head of the
// outstanding queue, in queue order, without exceeding `maxWrite` bytes in
// total and without using more than mIOVectorSize entries.
//
// maxWrite - byte budget for this batch.
// toWrite  - out: total number of bytes described by the filled entries.
// Returns the number of iovec entries that were filled.
sp_int32 Connection::writeIntoIOVector(sp_int32 maxWrite, sp_int32* toWrite) {
  sp_uint32 budget = maxWrite;
  const sp_int32 slotCount =
      std::min(mIOVectorSize, static_cast<sp_int32>(mOutstandingPackets.size()));
  *toWrite = 0;

  sp_int32 slot = 0;
  for (auto it = mOutstandingPackets.begin(); slot < slotCount; ++it, ++slot) {
    auto packet = *it;
    struct iovec& entry = mIOVector[slot];

    // Resume from wherever the previous write left this packet.
    entry.iov_base = packet->get_header() + packet->position_;
    entry.iov_len = PacketHeader::get_packet_size(packet->get_header()) +
                    PacketHeader::header_size() - packet->position_;

    // Clip the final entry so the batch never exceeds the budget.
    if (entry.iov_len >= budget) {
      entry.iov_len = budget;
    }
    budget -= entry.iov_len;
    *toWrite += entry.iov_len;

    if (budget == 0) {
      // Budget exhausted: this entry is the last one in the batch.
      return slot + 1;
    }
  }
  return slotCount;
}
// Reconciles the outstanding queue with the result of a writev() call.
//
// simulWrites - number of iovec entries that were handed to writev().
// numWritten  - number of bytes writev() reported as written.
//
// Walks the iovec entries in order. Packets that are now completely written
// move to mSentPackets; a partially written packet has its position_ advanced
// so the next batch resumes from there. Afterwards, if this connection is
// flagged as having caused back pressure and the backlog is at or below the
// low watermark, the buffer-empty callback is fired.
void Connection::afterWriteIntoIOVector(sp_int32 simulWrites, ssize_t numWritten) {
  mNumOutstandingBytes -= numWritten;

  for (sp_int32 i = 0; i < simulWrites; ++i) {
    auto pr = mOutstandingPackets.front();
    if (numWritten >= (ssize_t)mIOVector[i].iov_len) {
      // This iov structure was completely written as instructed
      sp_uint32 bytesLeftForThisPacket = PacketHeader::get_packet_size(pr->get_header()) +
                                         PacketHeader::header_size() - pr->position_;
      bytesLeftForThisPacket -= mIOVector[i].iov_len;
      if (bytesLeftForThisPacket == 0) {
        // This whole packet has been consumed
        mSentPackets.push(pr);
        mOutstandingPackets.pop_front();
      } else {
        // The iov entry had been clipped to the batch budget, so part of the
        // packet is still pending.
        pr->position_ += mIOVector[i].iov_len;
      }
      numWritten -= mIOVector[i].iov_len;
    } else {
      // This iov structure has been partially sent out
      pr->position_ += numWritten;
      numWritten = 0;
    }
    if (numWritten <= 0) break;
  }

  // Check if we reduced the write buffer to something below the back
  // pressure threshold. The reliever callback is NULL until
  // registerForBackPressure() has been called, so check it before invoking
  // (sendPacket() guards the buffer-full callback the same way).
  if (hasCausedBackPressure() && mNumOutstandingBytes <= mOptions->low_watermark_ &&
      mOnConnectionBufferEmpty) {
    // Signal pipe free
    mOnConnectionBufferEmpty(this);
  }
}
// Reports whether at least one packet is still waiting to be (fully) written.
bool Connection::stillHaveDataToWrite() {
  return !mOutstandingPackets.empty();
}
// Writes queued packets to `fd` with writev(), at most mWriteBatchsize bytes
// per call so that a single busy connection cannot monopolise the caller.
//
// Returns 0 when the call should simply end for now (queue drained, batch
// budget used up, or the socket cannot take more data right now) and -1 on an
// unrecoverable writev() error. A return of 0 does NOT mean the queue is
// empty; use stillHaveDataToWrite() for that.
sp_int32 Connection::writeIntoEndPoint(sp_int32 fd) {
  sp_int32 bytesWritten = 0;

  // Checking the queue up front avoids calling writev() with zero iovecs,
  // which POSIX allows to fail with EINVAL.
  while (stillHaveDataToWrite()) {
    sp_int32 toWrite = 0;
    const sp_int32 simulWrites = writeIntoIOVector(mWriteBatchsize - bytesWritten, &toWrite);
    const ssize_t numWritten = ::writev(fd, mIOVector, simulWrites);

    if (numWritten < 0) {
      // Capture errno before logging, which may itself modify it.
      const int err = errno;
      if (err == EINTR) {
        // Interrupted by a signal before anything was written: retrying
        // immediately is the right thing to do.
        LOG(INFO) << "writev said to try again\n";
        continue;
      }
      if (err == EAGAIN) {
        // The socket cannot accept more data right now. Retrying in a tight
        // loop would spin (and flood the log) until the peer drains, so give
        // up for now exactly like the short-write case below does.
        LOG(INFO) << "writev said to try again\n";
        return 0;
      }
      LOG(ERROR) << "error happened in writev " << err << "\n";
      return -1;
    }

    afterWriteIntoIOVector(simulWrites, numWritten);
    bytesWritten += numWritten;

    if (bytesWritten >= mWriteBatchsize) {
      // We write at most this many bytes at a time, so that others get a
      // chance.
      return 0;
    }
    if (numWritten < toWrite) {
      // Short write: a further writev would block.
      return 0;
    }
  }

  // No more packets to write
  return 0;
}
// Frees every packet that afterWriteIntoIOVector() moved to the sent queue.
void Connection::handleDataWritten() {
  for (; !mSentPackets.empty(); mSentPackets.pop()) {
    delete mSentPackets.front();
  }
}
// Reads as many complete packets from `fd` as are available, queueing each in
// mReceivedPackets, until roughly __SYSTEM_NETWORK_READ_BATCH_SIZE__ bytes of
// packets have been completed in this call.
//
// Returns 0 when reading stopped normally (batch limit reached, or the current
// packet is only partially available) and -1 when IncomingPacket::Read
// reported a failure (negative status).
sp_int32 Connection::readFromEndPoint(sp_int32 fd) {
  sp_int32 batchBytes = 0;
  do {
    const sp_int32 status = mIncomingPacket->Read(fd);
    if (status < 0) {
      return -1;
    }
    if (status > 0) {
      // Packet was read only partially; keep it and resume on the next call.
      return 0;
    }

    // Packet was successfully read: queue it and start assembling a new one.
    IncomingPacket* completed = mIncomingPacket;
    mIncomingPacket = new IncomingPacket(mOptions->max_packet_size_);
    mReceivedPackets.push(completed);
    batchBytes += completed->GetTotalPacketSize();
  } while (batchBytes < __SYSTEM_NETWORK_READ_BATCH_SIZE__);
  return 0;
}
void Connection::handleDataRead() {
while (!mReceivedPackets.empty()) {
IncomingPacket* packet = mReceivedPackets.front();
if (mOnNewPacket) {
mOnNewPacket(packet);
} else {
delete packet;
}
mReceivedPackets.pop();
}
}
// Marks this connection as being under back pressure and, for now, implements
// that by no longer listening for read events on its endpoint.
// Returns 0 on success, -1 if the endpoint could not be unregistered (the
// under-back-pressure flag is set either way).
sp_int32 Connection::putBackPressure() {
  mUnderBackPressure = true;

  const bool readsStopped = unregisterEndpointForRead() >= 0;
  if (readsStopped) {
    return 0;
  }
  LOG(ERROR) << "Could not start back pressure on connection";
  return -1;
}
// Clears the under-back-pressure mark and resumes listening for read events
// on this connection's endpoint.
// Returns 0 on success, -1 if the endpoint could not be re-registered (the
// flag is cleared either way).
sp_int32 Connection::removeBackPressure() {
  mUnderBackPressure = false;

  const bool readsResumed = registerEndpointForRead() >= 0;
  if (readsResumed) {
    return 0;
  }
  LOG(ERROR) << "Could not remove back pressure from connection";
  return -1;
}