blob: c3d09cf625f72d2e9a3f9feca7a74efe17a3999d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#ifndef GEODE_TCRCONNECTION_H_
#define GEODE_TCRCONNECTION_H_
#include <atomic>
#include <chrono>
#include <ace/Semaphore.h>
#include <geode/CacheableBuiltins.hpp>
#include <geode/ExceptionTypes.hpp>
#include <geode/internal/geode_globals.hpp>
#include "Connector.hpp"
#include "DiffieHellman.hpp"
#include "TcrMessage.hpp"
#include "util/synchronized_set.hpp"
// Retry count used when a timeout occurs.
// NOTE(review): presumably consumed by the header-timeout retry path (see the
// doHeaderTimeoutRetries parameter of readMessage) - confirm in the .cpp.
#define DEFAULT_TIMEOUT_RETRIES 12
// Handshake mode bytes. The initTcrConnection() documentation below describes
// CLIENT_TO_SERVER as the first byte of a request handshake and a
// "SERVER_TO_CLIENT" byte for the notification handshake.
// NOTE(review): the PRIMARY/SECONDARY variants presumably select the
// subscription-queue role, and SUCCESSFUL/UNSUCCESSFUL look like server
// acknowledgements - confirm against the handshake implementation.
#define PRIMARY_SERVER_TO_CLIENT 101
#define SECONDARY_SERVER_TO_CLIENT 102
#define SUCCESSFUL_SERVER_TO_CLIENT 105
#define UNSUCCESSFUL_SERVER_TO_CLIENT 106
#define CLIENT_TO_SERVER 100
// Handshake reply / acceptance codes.
// NOTE(review): meanings are inferred from the names only; the values are
// protocol constants and must not be renumbered.
#define REPLY_OK 59
#define REPLY_OK_CS43 58
#define REPLY_REFUSED 60
#define REPLY_INVALID 61
#define REPLY_SSL_ENABLED 21
#define REPLY_AUTHENTICATION_REQUIRED 62
#define REPLY_AUTHENTICATION_FAILED 63
#define REPLY_DUPLICATE_DURABLE_CLIENT 64
// Security credential modes.
// NOTE(review): presumably written into the handshake to tell the server how
// credentials are encoded (none / plain / Diffie-Hellman encrypted / multi-user
// notification channel) - confirm against the handshake implementation.
#define SECURITY_CREDENTIALS_NONE 0
#define SECURITY_CREDENTIALS_NORMAL 1
#define SECURITY_CREDENTIALS_DHENCRYPT 2
#define SECURITY_MULTIUSER_NOTIFICATIONCHANNEL 3
/**
 * Closes and deletes the connection pointed to by `x`, then resets `x` to
 * nullptr. Does nothing when `x` is already nullptr, so it is safe to invoke
 * on a connection that was never created or was already released.
 *
 * `x` must be a modifiable pointer lvalue whose pointee has a `close()`
 * method. The argument is evaluated several times, so do not pass an
 * expression with side effects.
 */
#define GF_SAFE_DELETE_CON(x) \
  do {                        \
    if ((x) != nullptr) {     \
      (x)->close();           \
      delete (x);             \
      (x) = nullptr;          \
    }                         \
  } while (0)
namespace apache {
namespace geode {
namespace client {
/**
 * Header that precedes each chunk of a chunked response.
 * Plain aggregate; members are left uninitialized unless the caller sets them.
 */
struct chunkHeader {
  /** Length of the chunk that follows (NOTE(review): presumably in bytes). */
  int32_t chunkLength;
  /**
   * Per-chunk flag bits. processChunk() names the corresponding argument
   * "lastChunkAndSecurityFlags"; the exact bit layout is not visible here.
   */
  int8_t flags;
};
/**
 * Leading header of a chunked response: three 32-bit message fields followed
 * by the header of the first chunk. Plain aggregate; members are left
 * uninitialized unless the caller sets them.
 */
struct chunkedResponseHeader {
  /** Message type code of the response. */
  int32_t messageType;
  /** Number of parts in the response message. */
  int32_t numberOfParts;
  /** Transaction id carried by the response. */
  int32_t transactionId;
  /** Header of the chunk that immediately follows. */
  chunkHeader header;
};
/**
 * Result codes for the low-level socket send/receive helpers
 * (returned by sendData()/receiveData() and reported through `opErr`).
 *
 * NOTE(review): the values look like bit flags, but CONN_TIMEOUT (0x03)
 * overlaps the CONN_NODATA bit (0x01) - confirm whether that is intentional
 * before testing these values with bitwise operators.
 */
enum ConnErrType {
  CONN_NOERR = 0x00,    // no error
  CONN_NODATA = 0x01,   // no data (name-based reading; confirm in the .cpp)
  CONN_TIMEOUT = 0x03,  // operation timed out
  CONN_IOERR = 0x04,    // I/O error
  CONN_OTHERERR = 0x08  // any other failure
};
/**
 * Queue role recorded for a connection's server, as returned by
 * TcrConnection::getServerQueueStatus().
 *
 * NOTE(review): presumably describes whether the server holds no queue, a
 * redundant (backup) queue, or the primary queue for this client - confirm
 * against the handshake code that sets it.
 */
enum ServerQueueStatus {
  NON_REDUNDANT_SERVER = 0,
  REDUNDANT_SERVER = 1,
  PRIMARY_SERVER = 2
};
// Forward declarations: this header only uses these types through pointers
// and references, so their full definitions are not needed here.
class TcrEndpoint;
class SystemProperties;
class ThinClientPoolDM;
class TcrConnectionManager;
class APACHE_GEODE_EXPORT TcrConnection {
public:
using clock = std::chrono::steady_clock;
using time_point = clock::time_point;
/** Create one connection, endpoint is in format of hostname:portno
* It will do handshake with j-server. There're 2 types of handshakes:
* 1) handshake for request
* send following bytes:
* CLIENT_TO_SERVER
* REPLY_OK
* 2 bytes for the length of idenfifier
* a string with "hostname:processId" as identifier
*
* if send succeeds, handshake succeeds. Otherwise, construction
* fails.
*
* 2) handshake for client notification
* send following bytes:
* SERVER_TO_CLIENT
* 1 (4 bytes, we can hard-code)
* 12345 (4 bytes, we can hard-code)
*
* So the total bytes to send are 9
* read one byte from server, it should be CLIENT_TO_SERVER
* Otherwise, construction fails.
* @param ports List of local ports for connections to endpoint
* @param numPorts Size of ports list
*/
bool initTcrConnection(
TcrEndpoint* endpointObj, const char* endpoint,
synchronized_set<std::unordered_set<uint16_t>>& ports,
bool isClientNotification = false, bool isSecondary = false,
std::chrono::microseconds connectTimeout = DEFAULT_CONNECT_TIMEOUT);
TcrConnection(const TcrConnectionManager& connectionManager,
volatile const bool& isConnected)
: connectionId(0),
m_connectionManager(&connectionManager),
m_dh(nullptr),
m_endpoint(nullptr),
m_endpointObj(nullptr),
m_connected(isConnected),
m_conn(nullptr),
m_hasServerQueue(NON_REDUNDANT_SERVER),
m_queueSize(0),
m_port(0),
m_chunksProcessSema(0),
m_isBeingUsed(false),
m_isUsed(0),
m_poolDM(nullptr) {}
/* destroy the connection */
~TcrConnection();
/**
* send a synchronized request to server.
*
* It will send the buffer, then wait to receive 17 bytes and save in
* msg_header.
* msg_header[0] is message type.
* msg_header[1],msg_header[2],msg_header[3],msg_header[4] will be a 4 bytes
* integer,
* let's say, msgLen, which specifies the length of next read. byteReads some
* number of
* call read again for msgLen bytes, and save the bytes into msg_body.
* concatenate the msg_header and msg_body into buffer, msg. The msg should be
* a '0' ended
* string. i.e. If the msg_header plus msg_body has 100 chars, msg should be a
* 101 char array
* to contain the '0' in the end. We need it to get length of the msg.
* Return the msg.
*
* @param buffer the buffer to send
* @param len length of the data to send
* @param sendTimeoutSec write timeout in sec
* @param recvLen output parameter for length of the received message
* @param receiveTimeoutSec read timeout in sec
* @return byte arrary of response. '0' ended.
* @exception GeodeIOException if an I/O error occurs (socket failure).
* @exception TimeoutException if timeout happens at any of the 3 socket
* operation: 1 write, 2 read
*/
char* sendRequest(
const char* buffer, size_t len, size_t* recvLen,
std::chrono::microseconds sendTimeoutSec = DEFAULT_WRITE_TIMEOUT,
std::chrono::microseconds receiveTimeoutSec = DEFAULT_READ_TIMEOUT_SECS,
int32_t request = -1);
/**
* send a synchronized request to server for REGISTER_INTEREST_LIST.
*
* @param buffer the buffer to send
* len length of the data to send
* message vector, which will return chunked TcrMessage.
* sendTimeoutSec write timeout in sec
* receiveTimeoutSec read timeout in sec
* @exception GeodeIOException if an I/O error occurs (socket failure).
* @exception TimeoutException if timeout happens at any of the 3 socket
* operation: 1 write, 2 read
*/
void sendRequestForChunkedResponse(
const TcrMessage& request, size_t len, TcrMessageReply& message,
std::chrono::microseconds sendTimeoutSec = DEFAULT_WRITE_TIMEOUT,
std::chrono::microseconds receiveTimeoutSec = DEFAULT_READ_TIMEOUT_SECS);
/**
* send an asynchronized request to server. No response is expected.
* we need to use it to send CLOSE_CONNECTION msg
*
* @param buffer the buffer to send
* len length of the data to send
* sendTimeoutSec write timeout in sec
* @return no return. Because it either succeeds, or throw exception.
* @exception GeodeIOException if an I/O error occurs (socket failure).
* @exception TimeoutException if timeout happens at any of the 3 socket
* operation: 1 write, 2 read
*/
void send(const char* buffer, size_t len,
std::chrono::microseconds sendTimeoutSec = DEFAULT_WRITE_TIMEOUT,
bool checkConnected = true);
void send(std::chrono::microseconds& timeSpent, const char* buffer,
size_t len,
std::chrono::microseconds sendTimeoutSec = DEFAULT_WRITE_TIMEOUT,
bool checkConnected = true);
/**
* This method is for receiving client notification. It will read 2 times as
* reading reply in sendRequest()
*
* @param recvLen output parameter for length of the received message
* @param receiveTimeoutSec read timeout in sec
* @return byte arrary of response. '0' ended.
* @exception GeodeIOException if an I/O error occurs (socket failure).
* @exception TimeoutException if timeout happens at any of the 3 socket
* operation: 1 write, 2 read
*/
char* receive(
size_t* recvLen, ConnErrType* opErr,
std::chrono::microseconds receiveTimeoutSec = DEFAULT_READ_TIMEOUT_SECS);
// readMessage is now public
/**
* This method reads a message from the socket connection and returns the byte
* array of response.
* @param recvLen output parameter for length of the received message
* @param receiveTimeoutSec read timeout in seconds
* @param doHeaderTimeoutRetries retry when header receive times out
* @return byte array of response. '0' ended.
* @exception GeodeIOException if an I/O error occurs (socket failure).
* @exception TimeoutException if timeout happens during read
*/
char* readMessage(size_t* recvLen,
std::chrono::microseconds receiveTimeoutSec,
bool doHeaderTimeoutRetries, ConnErrType* opErr,
bool isNotificationMessage = false, int32_t request = -1);
/**
* This method reads an interest list response message from the socket
* connection and sets the reply message
* parameter.
* @param reply response message
* @param receiveTimeout read timeout
* @param doHeaderTimeoutRetries retry when header receive times out
* @exception GeodeIOException if an I/O error occurs (socket failure).
* @exception TimeoutException if timeout happens during read
*/
void readMessageChunked(TcrMessageReply& reply,
std::chrono::microseconds receiveTimeout,
bool doHeaderTimeoutRetries);
/**
* Send close connection message to the server.
*/
void close();
// Durable clients: return true if server has HA queue.
ServerQueueStatus inline getServerQueueStatus(int32_t& queueSize) {
queueSize = m_queueSize;
return m_hasServerQueue;
}
uint16_t inline getPort() { return m_port; }
TcrEndpoint* getEndpointObject() const { return m_endpointObj; }
bool isBeingUsed() { return m_isBeingUsed; }
bool setAndGetBeingUsed(
volatile bool isBeingUsed,
bool forTransaction); // { m_isBeingUsed = isBeingUsed ;}
// helpers for pool connection manager
void touch();
bool hasExpired(const std::chrono::milliseconds& expiryTime);
bool isIdle(const std::chrono::milliseconds& idleTime);
time_point getLastAccessed();
void updateCreationTime();
int64_t getConnectionId() {
LOGDEBUG("TcrConnection::getConnectionId() = %" PRId64, connectionId);
return connectionId;
}
void setConnectionId(int64_t id) {
LOGDEBUG("Tcrconnection:setConnectionId() = %" PRId64, id);
connectionId = id;
}
const TcrConnectionManager& getConnectionManager() {
return *m_connectionManager;
}
std::shared_ptr<CacheableBytes> encryptBytes(
std::shared_ptr<CacheableBytes> data) {
if (m_dh != nullptr) {
return m_dh->encrypt(data);
} else {
return data;
}
}
std::shared_ptr<CacheableBytes> decryptBytes(
std::shared_ptr<CacheableBytes> data) {
if (m_dh != nullptr) {
return m_dh->decrypt(data);
} else {
return data;
}
}
private:
int64_t connectionId;
const TcrConnectionManager* m_connectionManager;
DiffieHellman* m_dh;
std::chrono::microseconds calculateHeaderTimeout(
std::chrono::microseconds receiveTimeout, bool retry);
chunkedResponseHeader readResponseHeader(std::chrono::microseconds timeout);
chunkHeader readChunkHeader(std::chrono::microseconds timeout);
std::vector<uint8_t> readChunkBody(std::chrono::microseconds timeout,
int32_t chunkLength);
bool processChunk(TcrMessageReply& reply, std::chrono::microseconds timeout,
int32_t chunkLength, int8_t lastChunkAndSecurityFlags);
/**
* To read Intantiator message(which meant for java client), here we are
* ignoring it
*/
void readHandshakeInstantiatorMsg(std::chrono::microseconds connectTimeout);
/**
* Packs the override settings bits into bytes - currently a single byte for
* conflation, remove-unresponsive-client and notify-by-subscription.
*/
uint8_t getOverrides(const SystemProperties* props);
/**
* To read the from stream
*/
int32_t readHandShakeInt(std::chrono::microseconds connectTimeout);
/*
* To read the arraysize
*/
int32_t readHandshakeArraySize(std::chrono::microseconds connectTimeout);
/*
* This function reads "numberOfBytes" and ignores it.
*/
void readHandShakeBytes(int numberOfBytes,
std::chrono::microseconds connectTimeout);
/** Create a normal or SSL connection */
Connector* createConnection(
const char* ipaddr,
std::chrono::microseconds waitSeconds = DEFAULT_CONNECT_TIMEOUT,
int32_t maxBuffSizePool = 0);
/**
* Reads bytes from socket and handles error conditions in case of Handshake.
*/
std::vector<int8_t> readHandshakeData(
int32_t msgLength, std::chrono::microseconds connectTimeout);
/**
* Reads raw bytes (without appending nullptr terminator) from socket and
* handles error conditions in case of Handshake.
*/
std::shared_ptr<CacheableBytes> readHandshakeRawData(
int32_t msgLength, std::chrono::microseconds connectTimeout);
/**
* Reads a string from socket and handles error conditions in case of
* Handshake.
*/
std::shared_ptr<CacheableString> readHandshakeString(
std::chrono::microseconds connectTimeout);
/**
* Reads a byte array (using initial length) from socket and handles error
* conditions in case of Handshake.
*/
std::shared_ptr<CacheableBytes> readHandshakeByteArray(
std::chrono::microseconds connectTimeout);
/**
* Send data to the connection till sendTimeout
*/
ConnErrType sendData(const char* buffer, size_t length,
std::chrono::microseconds sendTimeout,
bool checkConnected = true);
ConnErrType sendData(std::chrono::microseconds& timeSpent, const char* buffer,
size_t length, std::chrono::microseconds sendTimeout,
bool checkConnected = true);
/**
* Read data from the connection till receiveTimeoutSec
*/
ConnErrType receiveData(char* buffer, size_t length,
std::chrono::microseconds receiveTimeoutSec,
bool checkConnected = true,
bool isNotificationMessage = false);
const char* m_endpoint;
TcrEndpoint* m_endpointObj;
volatile const bool& m_connected;
Connector* m_conn;
ServerQueueStatus m_hasServerQueue;
int32_t m_queueSize;
uint16_t m_port;
// semaphore to synchronize with the chunked response processing thread
ACE_Semaphore m_chunksProcessSema;
time_point m_creationTime;
time_point m_lastAccessed;
// Disallow copy constructor and assignment operator.
TcrConnection(const TcrConnection&);
TcrConnection& operator=(const TcrConnection&);
volatile bool m_isBeingUsed;
std::atomic<uint32_t> m_isUsed;
ThinClientPoolDM* m_poolDM;
bool useReplyTimeout(const TcrMessage& request) const;
std::chrono::microseconds sendWithTimeouts(
const char* data, size_t len, std::chrono::microseconds sendTimeout,
std::chrono::microseconds receiveTimeout);
bool replyHasResult(const TcrMessage& request, TcrMessageReply& reply);
};
} // namespace client
} // namespace geode
} // namespace apache
#endif // GEODE_TCRCONNECTION_H_