blob: 482d3d49d064dde170b0731f3457a25099c287cb [file] [log] [blame]
/*
* Copyright 2015 Twitter, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef HERON_COMMON_SRC_CPP_NETWORK_CONNECTION_H_
#define HERON_COMMON_SRC_CPP_NETWORK_CONNECTION_H_
#include <deque>
#include <functional>
#include <queue>
#include <utility>
#include "network/packet.h"
#include "network/event_loop.h"
#include "network/baseconnection.h"
#include "network/network_error.h"
#include "basics/basics.h"
/**
* A Connection is a placeholder for all network and io related
* information about a socket descriptor. It takes care of async read/write
* of Packets. Typical ways of instantiating connections are described
* below in a client and server settings.
*
* SERVER
* After an accept event, the server creates a Connection object giving it
* an established end point. It then registers for events on this connection
* like handling a new packet and handling close. It then calls the Start method
* of the Connection to begin reads/writes from the connection.
* int fd = accept(...);
* ConnectionEndPoint* endpoint = new ConnectionEndPoint;
* ....init endpoint with fd....
* Connection* conn = new Connection(endpoint, options, ss);
* conn->registerForNewPacket(...);
* conn->RegisterForClose(...);
* conn->Start();
* After this point, the connection invokes the appropriate callbacks when
* events occur.
*
* CLIENT
* After a successful connect, the client creates a Connection object giving it
* an established end point. It register callbacks for events and then invokes
* Start.
* int fd = connect(...);
* ConnectionEndPoint* endpoint = new ConnectionEndPoint;
* .... init the fd and sockaddr of the endpoint ....
* Connection* conn = new Connection(endpoint, options, ss);
* conn->registerForNewPacket(...)
* conn->RegisterForClose(...);
* conn->Start();
* After this point the connection invokes the appropriate callbacks when
* events occur.
*/
class Connection : public BaseConnection {
public:
/**
* `endpoint` is created by the caller, but now the Connection owns it.
* `options` is also created by the caller and the caller owns it. options
* should be active throught the lifetime of the Connection object.
*/
Connection(ConnectionEndPoint* endpoint, ConnectionOptions* options, EventLoop* eventLoop);
virtual ~Connection();
/**
* Add this packet to the list of packets to be sent. The packet in itself can be sent
* later. A zero return value indicates that the packet has been successfully queued to be
* sent. It does not indicate that the packet was sent successfully.
* A negative value of sendPacket indicates an error. The most likely error is improperly
* formatted packet.
*/
sp_int32 sendPacket(OutgoingPacket* packet);
/**
* Invoke the callback cb when a new packet arrives. A pointer to the packet is passed
* to the callback cb. That packet is now owned by the callback and is responsible for
* deleting it.
*/
void registerForNewPacket(VCallback<IncomingPacket*> cb);
/**
* The back pressure starter and reliever are used to communicate to the
* server whether this connection is under a queue build up or not
*/
sp_int32 registerForBackPressure(VCallback<Connection*> cbStarter,
VCallback<Connection*> cbReliever);
sp_int32 getOutstandingPackets() const { return mOutstandingPackets.size(); }
sp_int32 getOutstandingBytes() const { return mNumOutstandingBytes; }
sp_int32 getWriteBatchSize() const { return mWriteBatchsize; }
void setCausedBackPressure() { mCausedBackPressure = true; }
void unsetCausedBackPressure() { mCausedBackPressure = false; }
bool hasCausedBackPressure() const { return mCausedBackPressure; }
bool isUnderBackPressure() const { return mUnderBackPressure; }
sp_int32 putBackPressure();
sp_int32 removeBackPressure();
private:
virtual sp_int32 writeIntoEndPoint(sp_int32 fd);
sp_int32 writeIntoIOVector(sp_int32 maxWrite, sp_int32* toWrite);
void afterWriteIntoIOVector(sp_int32 simumWrites, ssize_t numWritten);
virtual bool stillHaveDataToWrite();
virtual void handleDataWritten();
virtual sp_int32 readFromEndPoint(sp_int32 _fd);
virtual void handleDataRead();
// The queue of outstanding packets that need to be sent. C++11 requires all containers'
// size() should be O(1).
std::deque<OutgoingPacket*> mOutstandingPackets;
sp_int64 mNumOutstandingBytes;
// The queue of packets that have been sent but not yet been reported to the higher layer
std::queue<OutgoingPacket*> mSentPackets;
// The queue of packets that have been received but not yet delivered to the higher layer
std::queue<IncomingPacket*> mReceivedPackets;
// Incompletely read next packet
IncomingPacket* mIncomingPacket;
// The user registered callbacks
VCallback<IncomingPacket*> mOnNewPacket;
// This call back gets registered from the Server and gets called once the conneciton pipe
// becomes free (outstanding bytes go to 0)
VCallback<Connection*> mOnConnectionBufferEmpty;
// This call back gets registered from the Server and gets called once the conneciton pipe
// becomes full (outstanding bytes exceed threshold)
VCallback<Connection*> mOnConnectionBufferFull;
sp_int32 mIOVectorSize;
struct iovec* mIOVector;
// How many bytes do we want to write in one batch
sp_int32 mWriteBatchsize;
// Have we caused back pressure?
bool mCausedBackPressure;
// Are our reads being throttled?
bool mUnderBackPressure;
// How many times have we enqueued data and found that we had outstanding bytes >
// HWM of back pressure threshold
sp_uint8 mNumEnqueuesWithBufferFull;
};
#endif // HERON_COMMON_SRC_CPP_NETWORK_CONNECTION_H_