blob: f746eacea492e677302ba1313bd507e4113f7b1f [file] [log] [blame]
* Copyright (c) 2006 The Apache Software Foundation
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
#include "Cpg.h"
#include "qpid/sys/Mutex.h"
#include "qpid/sys/Time.h"
#include "qpid/sys/posix/PrivatePosix.h"
#include "qpid/log/Statement.h"
#include <vector>
#include <limits>
#include <iterator>
#include <sstream>
#include <unistd.h>
namespace qpid {
namespace cluster {
using namespace std;
Cpg* Cpg::cpgFromHandle(cpg_handle_t handle) {
void* cpg=0;
check(cpg_context_get(handle, &cpg), "Cannot get CPG instance.");
if (!cpg) throw Exception("Cannot get CPG instance.");
return reinterpret_cast<Cpg*>(cpg);
// Global callback functions.
void Cpg::globalDeliver (
cpg_handle_t handle,
struct cpg_name *group,
uint32_t nodeid,
uint32_t pid,
void* msg,
int msg_len)
cpgFromHandle(handle)->handler.deliver(handle, group, nodeid, pid, msg, msg_len);
void Cpg::globalConfigChange(
cpg_handle_t handle,
struct cpg_name *group,
struct cpg_address *members, int nMembers,
struct cpg_address *left, int nLeft,
struct cpg_address *joined, int nJoined
cpgFromHandle(handle)->handler.configChange(handle, group, members, nMembers, left, nLeft, joined, nJoined);
int Cpg::getFd() {
int fd;
check(cpg_fd_get(handle, &fd), "Cannot get CPG file descriptor");
return fd;
Cpg::Cpg(Handler& h) : IOHandle(new sys::IOHandlePrivate), handler(h), isShutdown(false) {
cpg_callbacks_t callbacks;
::memset(&callbacks, sizeof(callbacks), 0);
callbacks.cpg_deliver_fn = &globalDeliver;
callbacks.cpg_confchg_fn = &globalConfigChange;
QPID_LOG(info, "Initializing CPG");
cpg_error_t err = cpg_initialize(&handle, &callbacks);
int retries = 6;
while (err == CPG_ERR_TRY_AGAIN && --retries) {
QPID_LOG(notice, "Re-trying CPG initialization.");
err = cpg_initialize(&handle, &callbacks);
check(err, "Failed to initialize CPG.");
check(cpg_context_set(handle, this), "Cannot set CPG context");
// Note: CPG is currently unix-specific. If CPG is ported to
// windows then this needs to be refactored into
// qpid::sys::<platform>
IOHandle::impl->fd = getFd();
Cpg::~Cpg() {
try {
} catch (const std::exception& e) {
QPID_LOG(error, "Exception in Cpg destructor: " << e.what());
void Cpg::join(const std::string& name) {
group = name;
check(cpg_join(handle, &group), cantJoinMsg(group));
void Cpg::leave() {
check(cpg_leave(handle, &group), cantLeaveMsg(group));
bool Cpg::mcast(const iovec* iov, int iovLen) {
// Check for flow control
cpg_flow_control_state_t flowState;
check(cpg_flow_control_state_get(handle, &flowState), "Cannot get CPG flow control status.");
return false;
cpg_error_t result;
do {
result = cpg_mcast_joined(handle, CPG_TYPE_AGREED, const_cast<iovec*>(iov), iovLen);
if (result != CPG_ERR_TRY_AGAIN) check(result, cantMcastMsg(group));
} while(result == CPG_ERR_TRY_AGAIN);
return true;
void Cpg::shutdown() {
if (!isShutdown) {
QPID_LOG(debug,"Shutting down CPG");
check(cpg_finalize(handle), "Error in shutdown of CPG");
void Cpg::dispatchOne() {
check(cpg_dispatch(handle,CPG_DISPATCH_ONE), "Error in CPG dispatch");
void Cpg::dispatchAll() {
check(cpg_dispatch(handle,CPG_DISPATCH_ALL), "Error in CPG dispatch");
void Cpg::dispatchBlocking() {
check(cpg_dispatch(handle,CPG_DISPATCH_BLOCKING), "Error in CPG dispatch");
string Cpg::errorStr(cpg_error_t err, const std::string& msg) {
std::ostringstream os;
os << msg << ": ";
// FIXME aconway 2009-03-11: The commented out cases below are
// because of mistakes in the latest corosync header files.
// The code should be re-instated when that is sorted out.
switch (err) {
case CPG_OK: os << "ok"; break;
// case CPG_ERR_LIBRARY: os << "library"; break;
case CPG_ERR_TIMEOUT: os << "timeout"; break;
case CPG_ERR_TRY_AGAIN: os << "try again"; break;
case CPG_ERR_INVALID_PARAM: os << "invalid param"; break;
case CPG_ERR_NO_MEMORY: os << "no memory"; break;
case CPG_ERR_BAD_HANDLE: os << "bad handle"; break;
case CPG_ERR_ACCESS: os << "access denied. You may need to set your group ID to 'ais'"; break;
case CPG_ERR_NOT_EXIST: os << "not exist"; break;
case CPG_ERR_EXIST: os << "exist"; break;
case CPG_ERR_NOT_SUPPORTED: os << "not supported"; break;
// case CPG_ERR_SECURITY: os << "security"; break;
// case CPG_ERR_TOO_MANY_GROUPS: os << "too many groups"; break;
default: os << ": unknown cpg error " << err;
os << " (" << err << ")";
return os.str();
std::string Cpg::cantJoinMsg(const Name& group) {
return "Cannot join CPG group "+group.str();
std::string Cpg::cantLeaveMsg(const Name& group) {
return "Cannot leave CPG group "+group.str();
std::string Cpg::cantMcastMsg(const Name& group) {
return "Cannot mcast to CPG group "+group.str();
MemberId Cpg::self() const {
unsigned int nodeid;
check(cpg_local_get(handle, &nodeid), "Cannot get local CPG identity");
return MemberId(nodeid, getpid());
namespace { int byte(uint32_t value, int i) { return (value >> (i*8)) & 0xff; } }
ostream& operator <<(ostream& out, const MemberId& id) {
out << byte(id.first, 0) << "."
<< byte(id.first, 1) << "."
<< byte(id.first, 2) << "."
<< byte(id.first, 3);
return out << ":" << id.second;
ostream& operator<<(ostream& o, const ConnectionId& c) {
return o << c.first << "-" << c.second;
std::string MemberId::str() const {
char s[8];
uint32_t x;
x = htonl(first);
::memcpy(s, &x, 4);
x = htonl(second);
::memcpy(s+4, &x, 4);
return std::string(s,8);
MemberId::MemberId(const std::string& s) {
uint32_t x;
memcpy(&x, &s[0], 4);
first = ntohl(x);
memcpy(&x, &s[4], 4);
second = ntohl(x);
}} // namespace qpid::cluster