blob: 11fc65633f61b7c4f65d56007248d3c31b93cae1 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "UDPIpc.hpp"
#include <sstream>
#include <string>
#include <vector>
#include "FwkStrCvt.hpp"
#include "GsRandom.hpp"
#include "config.h"
namespace apache {
namespace geode {
namespace client {
namespace testframework {
bool UDPMessage::ping(ACE_SOCK_Dgram& io, ACE_INET_Addr& who) {
clear();
setCmd(ACK_REQUEST);
setSender(who);
return send(io);
}
std::string UDPMessage::dump(int32_t max) {
char buf[1024];
std::string dmp("Dump of message parts: ");
sprintf(buf, "\nTag: %u\nCmd: %u %s\nId: %u\nLength: %u\nMessage:\n",
m_hdr.tag, m_hdr.cmd, cmdString(m_hdr.cmd), m_hdr.id,
ntohl(m_hdr.length));
dmp += buf;
if (!m_msg.empty()) {
if (max > 0) {
dmp += m_msg.substr(0, max);
} else {
dmp += m_msg;
}
} else {
dmp += "No Message.";
}
dmp += "\nEnd of Dump";
return dmp;
}
bool UDPMessage::receiveFrom(ACE_SOCK_Dgram& io,
const ACE_Time_Value* timeout) {
bool ok = true;
ACE_Time_Value wait(2);
if (!timeout) {
timeout = &wait;
}
iovec buffs;
int32_t red = static_cast<int32_t>(io.recv(&buffs, m_sender, 0, timeout));
if (red < 0) {
if (errno != ETIME) {
FWKEXCEPTION("UDPMessage::receiveFrom: Failed, errno: " << errno);
}
return false;
}
uint32_t len = buffs.iov_len;
if (len < UDP_HEADER_SIZE) { // We must at least have a header
FWKEXCEPTION("UDPMessage::receiveFrom: Failed, header length: " << len);
}
clear();
memcpy(&m_hdr, buffs.iov_base, UDP_HEADER_SIZE);
if (m_hdr.tag != UDP_MSG_TAG) {
FWKEXCEPTION("UDPMessage::receiveFrom: Failed, invalid tag: " << m_hdr.tag);
}
char* ptr = reinterpret_cast<char*>(buffs.iov_base);
ptr += UDP_HEADER_SIZE;
len -= UDP_HEADER_SIZE;
uint32_t sent = ntohl(m_hdr.length);
if (sent != len) {
FWKEXCEPTION("UDPMessage::receiveFrom: Failed, expected "
<< sent << " bytes, received " << len);
}
if (len > 0) {
m_msg = std::string(ptr, len);
}
delete[] reinterpret_cast<char*>(buffs.iov_base);
// FWKINFO( "UDPMessage::receiveFrom: " << dump( 50 ) );;
if (needToAck()) {
UDPMessage ack(ACK);
ok = (ok && ack.sendTo(io, m_sender));
}
return ok;
}
bool UDPMessage::sendTo(ACE_SOCK_Dgram& io, ACE_INET_Addr& who) {
setSender(who);
return send(io);
}
bool UDPMessage::send(ACE_SOCK_Dgram& io) {
bool ok = true;
int32_t tot = 0;
int32_t vcnt = 1;
iovec buffs[2];
m_hdr.length = 0;
buffs[0].iov_base = reinterpret_cast<char*>(&m_hdr);
buffs[0].iov_len = UDP_HEADER_SIZE;
tot += UDP_HEADER_SIZE;
if (!m_msg.empty()) {
auto len = static_cast<uint32_t>(m_msg.size());
m_hdr.length = htonl(len);
buffs[1].iov_base = const_cast<char*>(m_msg.c_str());
buffs[1].iov_len = len;
vcnt = 2;
tot += len;
}
// FWKINFO( "UDPMessage::send: " << dump( 50 ) );;
int32_t sent = static_cast<int32_t>(io.send(buffs, vcnt, m_sender));
if (sent < 0) {
FWKEXCEPTION("UDPMessage::send: Failed, errno: " << errno);
}
if (sent != tot) {
ok = false;
FWKSEVERE("UDPMessage::send: Failed to completely send, " << sent << ", "
<< tot);
}
if (needToAck()) {
UDPMessage ack;
ok = (ok && ack.receiveFrom(io));
}
return ok;
}
UDPMessageClient::UDPMessageClient(std::string server)
: m_server(server.c_str()) {
int32_t result = -1;
int32_t tries = 100;
ACE_INET_Addr* client = new ACE_INET_Addr();
while ((result < 0) && (tries > 0)) {
uint32_t port = GsRandom::random(1111u, 31111u) + tries;
client->set(port, "localhost");
result = m_io.open(*client);
}
delete client;
if (result < 0) {
FWKEXCEPTION("Client failed to open io, " << errno);
}
const ACE_Time_Value timeout(20);
UDPMessage msg;
tries = 3;
bool pingMsg = false;
while (!pingMsg && tries-- > 0) {
try {
pingMsg = msg.ping(m_io, m_server);
} catch (...) {
continue;
}
}
if (pingMsg) {
// if ( msg.ping( m_io, m_server ) ) {
msg.setSender(m_server);
bool connectionOK = false;
tries = 10;
while (!connectionOK && (--tries > 0)) {
try {
msg.clear();
msg.setCmd(ADDR_REQUEST);
if (msg.sendTo(m_io, m_server)) {
if (msg.receiveFrom(m_io, &timeout)) {
std::string newConn = msg.what();
ACE_INET_Addr conn(newConn.c_str());
if (msg.ping(m_io, conn)) { // We have a working addr
m_server = conn;
connectionOK = true;
tries = 0;
}
}
}
} catch (...) {
continue;
}
}
if (!connectionOK) {
FWKEXCEPTION(
"UDPMessageClient failed to establish connection to server.");
}
} else {
FWKEXCEPTION("Failed to contact " << server);
}
}
int32_t Receiver::doTask() {
auto msg = std::unique_ptr<UDPMessage>(new UDPMessage());
UDPMessage cmsg;
try {
while (*m_run) {
if (isListener()) {
cmsg.clear();
ACE_Time_Value wait(30); // Timeout is relative time.
if (cmsg.receiveFrom(*m_io, &wait)) {
if (cmsg.getCmd() == ADDR_REQUEST) {
auto&& addr = m_addrs.front();
m_addrs.pop_front();
m_addrs.push_back(addr);
cmsg.clear();
cmsg.setMessage(addr);
cmsg.setCmd(ADDR_RESPONSE);
cmsg.send(*m_io);
}
}
} else {
msg->clear();
ACE_Time_Value timeout(2);
if (msg->receiveFrom(
*m_io, &timeout)) { // Timeout is relative time, send ack.
if (msg->getCmd() == ADDR_REQUEST) {
auto&& addr = m_addrs.front();
m_addrs.pop_front();
m_addrs.push_back(addr);
cmsg.clear();
cmsg.setMessage(addr);
cmsg.setCmd(ADDR_RESPONSE);
cmsg.send(*m_io);
}
if (msg->length() > 0) {
m_queues->putInbound(msg.release());
msg.reset(new UDPMessage());
}
}
}
}
} catch (FwkException& ex) {
FWKSEVERE("Receiver::doTask() caught exception: " << ex.what());
} catch (...) {
FWKSEVERE("Receiver::doTask() caught unknown exception");
}
return 0;
}
void Receiver::initialize() {
int32_t tries = 100;
uint16_t port = m_basePort;
int32_t lockResult = m_mutex.tryacquire();
int32_t result = -1;
if (lockResult != -1) { // The listener thread
ACE_INET_Addr addr(port, "localhost");
m_listener = ACE_Thread::self();
result = m_io->open(addr);
} else {
while ((result < 0) && (--tries > 0)) {
port += ++m_offset;
ACE_INET_Addr addr(port, "localhost");
result = m_io->open(addr);
if (result == 0) {
char hbuff[256];
char* hst = &hbuff[0];
char* fqdn = ACE_OS::getenv("GF_FQDN");
if (fqdn) {
hst = fqdn;
} else {
addr.get_host_name(hbuff, 255);
}
char buff[1024];
sprintf(buff, "%s:%u", hst, port);
m_addrs.push_back(buff);
}
}
}
if (result < 0) {
FWKEXCEPTION("Server failed to open io, " << errno << ", on port " << port);
}
}
int32_t STReceiver::doTask() {
auto msg = std::unique_ptr<UDPMessage>(new UDPMessage());
try {
while (*m_run) {
msg->clear();
ACE_Time_Value timeout(2); // Timeout is relative time
if (msg->receiveFrom(m_io, &timeout)) {
if (msg->getCmd() == ADDR_REQUEST) {
msg->clear();
msg->setMessage(m_addr);
msg->setCmd(ADDR_RESPONSE);
msg->send(m_io);
} else {
if (msg->length() > 0) {
m_queues->putInbound(msg.release());
msg.reset(new UDPMessage());
}
}
}
}
} catch (FwkException& ex) {
FWKSEVERE("STReceiver::doTask() caught exception: " << ex.what());
} catch (...) {
FWKSEVERE("STReceiver::doTask() caught unknown exception");
}
return 0;
}
void STReceiver::initialize() {
int32_t result = -1;
ACE_INET_Addr addr(m_basePort, "localhost");
result = m_io.open(addr);
if (result == 0) {
char hbuff[256];
char* hst = &hbuff[0];
char* fqdn = ACE_OS::getenv("GF_FQDN");
if (fqdn) {
hst = fqdn;
} else {
addr.get_host_name(hbuff, 255);
}
char buff[1024];
sprintf(buff, "%s:%u", hst, m_basePort);
m_addr = buff;
}
if (result < 0) {
FWKEXCEPTION("STReceiver::initialize failed to open io, "
<< errno << ", on port " << m_basePort);
}
}
int32_t Responder::doTask() {
try {
while (*m_run) {
UDPMessage* msg = m_queues->getOutbound();
if (msg) {
msg->send(*m_io);
delete msg;
}
}
} catch (FwkException& ex) {
FWKSEVERE("Responder::doTask() caught exception: " << ex.what());
} catch (...) {
FWKSEVERE("Responder::doTask() caught unknown exception");
}
return 0;
}
void Responder::initialize() {
int32_t result = -1;
int32_t tries = 100;
while ((result < 0) && (--tries > 0)) {
uint16_t port = ++m_offset + 111 + m_basePort;
result = m_io->open(ACE_INET_Addr(port, "localhost"));
if (result < 0) {
FWKWARN("Server failed to open io, " << errno << ", on port " << port);
}
}
if (result < 0) {
FWKEXCEPTION("Server failed to open io, " << errno);
}
}
} // namespace testframework
} // namespace client
} // namespace geode
} // namespace apache