blob: 5de2b516d59615c2b3ef654ca1585aa027e0d6bf [file] [log] [blame]
#ifndef CPG_H
#define CPG_H
/*
*
* Copyright (c) 2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#include "qpid/Exception.h"
#include "qpid/cluster/Dispatchable.h"
#include "qpid/cluster/types.h"
#include "qpid/sys/IOHandle.h"
#include "qpid/sys/Mutex.h"
#include <boost/scoped_ptr.hpp>
#include <cassert>
#include <string.h>
namespace qpid {
namespace cluster {
/**
* Lightweight C++ interface to cpg.h operations.
*
* Manages a single CPG handle, initialized in ctor, finialzed in destructor.
* On error all functions throw Cpg::Exception.
*
*/
class Cpg : public sys::IOHandle {
public:
struct Exception : public ::qpid::Exception {
Exception(const std::string& msg) : ::qpid::Exception(msg) {}
};
struct Name : public cpg_name {
Name() { length = 0; }
Name(const char* s) { copy(s, strlen(s)); }
Name(const char* s, size_t n) { copy(s,n); }
Name(const std::string& s) { copy(s.data(), s.size()); }
void copy(const char* s, size_t n) {
assert(n < CPG_MAX_NAME_LENGTH);
memcpy(value, s, n);
length=n;
}
std::string str() const { return std::string(value, length); }
};
static std::string str(const cpg_name& n) {
return std::string(n.value, n.length);
}
struct Handler {
virtual ~Handler() {};
virtual void deliver(
cpg_handle_t /*handle*/,
struct cpg_name *group,
uint32_t /*nodeid*/,
uint32_t /*pid*/,
void* /*msg*/,
int /*msg_len*/) = 0;
virtual void configChange(
cpg_handle_t /*handle*/,
struct cpg_name */*group*/,
struct cpg_address */*members*/, int /*nMembers*/,
struct cpg_address */*left*/, int /*nLeft*/,
struct cpg_address */*joined*/, int /*nJoined*/
) = 0;
};
/** Open a CPG handle.
*@param handler for CPG events.
*/
Cpg(Handler&);
/** Destructor calls shutdown if not already calledx. */
~Cpg();
/** Disconnect from CPG */
void shutdown();
/** Dispatch CPG events.
*@param type one of
* - CPG_DISPATCH_ONE - dispatch exactly one event.
* - CPG_DISPATCH_ALL - dispatch all available events, don't wait.
* - CPG_DISPATCH_BLOCKING - blocking dispatch loop.
*/
void dispatch(cpg_dispatch_t type);
void dispatchOne() { dispatch(CPG_DISPATCH_ONE); }
void dispatchAll() { dispatch(CPG_DISPATCH_ALL); }
void dispatchBlocking() { dispatch(CPG_DISPATCH_BLOCKING); }
void join(const std::string& group);
void leave();
/** Multicast to the group. NB: must not be called concurrently.
*
*@return true if the message was multi-cast, false if
* it was not sent due to flow control.
*/
bool mcast(const iovec* iov, int iovLen);
cpg_handle_t getHandle() const { return handle; }
MemberId self() const;
int getFd();
bool isFlowControlEnabled();
private:
static std::string errorStr(cpg_error_t err, const std::string& msg);
static std::string cantJoinMsg(const Name&);
static std::string cantLeaveMsg(const Name&);
static std::string cantMcastMsg(const Name&);
static void check(cpg_error_t result, const std::string& msg) {
if (result != CPG_OK) throw Exception(errorStr(result, msg));
}
static Cpg* cpgFromHandle(cpg_handle_t);
static void globalDeliver(
cpg_handle_t handle,
struct cpg_name *group,
uint32_t nodeid,
uint32_t pid,
void* msg,
int msg_len);
static void globalConfigChange(
cpg_handle_t handle,
struct cpg_name *group,
struct cpg_address *members, int nMembers,
struct cpg_address *left, int nLeft,
struct cpg_address *joined, int nJoined
);
cpg_handle_t handle;
Handler& handler;
bool isShutdown;
Name group;
sys::Mutex dispatchLock;
};
inline bool operator==(const cpg_name& a, const cpg_name& b) {
return a.length==b.length && strncmp(a.value, b.value, a.length) == 0;
}
inline bool operator!=(const cpg_name& a, const cpg_name& b) { return !(a == b); }
}} // namespace qpid::cluster
#endif /*!CPG_H*/