blob: b90882052507a64635b397a734151cd29385b88b [file]
#include "UDSConnector.h"
#include "Config.h"
#include "YagpStat.h"
#include <string>
#include <unistd.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <sys/types.h>
#include <sys/fcntl.h>
#include <chrono>
#include <thread>
extern "C" {
#include "postgres.h"
#include "cdb/cdbvars.h"
}
UDSConnector::UDSConnector() { GOOGLE_PROTOBUF_VERIFY_VERSION; }
static void inline log_tracing_failure(const yagpcc::SetQueryReq &req,
const std::string &event) {
ereport(LOG,
(errmsg("Query {%d-%d-%d} %s tracing failed with error %s",
req.query_key().tmid(), req.query_key().ssid(),
req.query_key().ccnt(), event.c_str(), strerror(errno))));
}
bool UDSConnector::report_query(const yagpcc::SetQueryReq &req,
const std::string &event) {
sockaddr_un address;
address.sun_family = AF_UNIX;
strcpy(address.sun_path, Config::uds_path().c_str());
bool success = true;
auto sockfd = socket(AF_UNIX, SOCK_STREAM, 0);
if (sockfd != -1) {
if (fcntl(sockfd, F_SETFL, O_NONBLOCK) != -1) {
if (connect(sockfd, (sockaddr *)&address, sizeof(address)) != -1) {
auto data_size = req.ByteSize();
auto total_size = data_size + sizeof(uint32_t);
uint8_t *buf = (uint8_t *)palloc(total_size);
uint32_t *size_payload = (uint32_t *)buf;
*size_payload = data_size;
req.SerializeWithCachedSizesToArray(buf + sizeof(uint32_t));
int64_t sent = 0, sent_total = 0;
do {
sent = send(sockfd, buf + sent_total, total_size - sent_total,
MSG_DONTWAIT);
sent_total += sent;
} while (
sent > 0 && size_t(sent_total) != total_size &&
// the line below is a small throttling hack:
// if a message does not fit a single packet, we take a nap
// before sending the next one.
// Otherwise, MSG_DONTWAIT send might overflow the UDS
(std::this_thread::sleep_for(std::chrono::milliseconds(1)), true));
if (sent < 0) {
log_tracing_failure(req, event);
success = false;
YagpStat::report_bad_send(total_size);
} else {
YagpStat::report_send(total_size);
}
pfree(buf);
} else {
// log the error and go on
log_tracing_failure(req, event);
success = false;
YagpStat::report_bad_connection();
}
} else {
// That's a very important error that should never happen, so make it
// visible to an end-user and admins.
ereport(WARNING,
(errmsg("Unable to create non-blocking socket connection %s",
strerror(errno))));
success = false;
YagpStat::report_error();
}
close(sockfd);
} else {
// log the error and go on
log_tracing_failure(req, event);
success = false;
YagpStat::report_error();
}
return success;
}