blob: 34b11b816e889801f50eef4a990b7966faddd6c2 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include "qpid/framing/FieldTable.h"
#include "qpid/types/Variant.h"
#include "qpid/log/Statement.h"
#include "qpid/broker/Queue.h"
#include "qpid/broker/MessageGroupManager.h"
using namespace qpid::broker;
namespace {
    // Keys of the map that query() adds to a queue's management status.
    const std::string GROUP_QUERY_KEY  = "qpid.message_group_queue";  // top-level entry
    const std::string GROUP_HEADER_KEY = "group_header_key";          // header used to pick the group
    const std::string GROUP_STATE_KEY  = "group_state";               // list of per-group maps
    // Keys of each per-group map.
    const std::string GROUP_ID_KEY     = "group_id";
    const std::string GROUP_MSG_COUNT  = "msg_count";
    const std::string GROUP_TIMESTAMP  = "timestamp";
    const std::string GROUP_CONSUMER   = "consumer";
}
// Names of the queue settings that create() looks up.
// Setting whose string value names the message header that carries the group id.
const std::string MessageGroupManager::qpidMessageGroupKey = "qpid.group_header_key";
// Setting that must be present: create() refuses to build a manager without it.
const std::string MessageGroupManager::qpidSharedGroup = "qpid.shared_msg_group";
// Setting read as an integer and handed to the manager's constructor.
const std::string MessageGroupManager::qpidMessageGroupTimestamp = "qpid.group_timestamp";
/** class ConsumerState **/
/** The consumer becomes the owner of the given group: bump the owned-group
 *  count and add the group's not-yet-acquired messages (total - acquired) to
 *  this consumer's pending message count. */
void MessageGroupManager::ConsumerState::addGroup(const GroupState& group)
{
    ++ownedGroups;
    pendingMsgs += group.totalMsgs() - group.acquiredMsgs();
}
/** The consumer gives up the given group: drop the owned-group count and
 *  subtract the group's not-yet-acquired messages from the pending count.
 *  Both counters are asserted not to underflow. */
void MessageGroupManager::ConsumerState::removeGroup(const GroupState& group)
{
    assert(ownedGroups != 0);
    --ownedGroups;

    const uint32_t unacquired = group.totalMsgs() - group.acquiredMsgs();
    assert(unacquired <= pendingMsgs);
    pendingMsgs -= unacquired;
}
/** Notify the consumer that a message has become available in one of its
 *  owned groups. Neither argument is inspected; only the pending count grows. */
void MessageGroupManager::ConsumerState::msgAvailable(const GroupState& /*group*/,
                                                      const QueuedMessage& /*msg*/)
{
    assert(ownedGroups != 0);   // must own at least one group to be notified
    ++pendingMsgs;
}
/** Notify the consumer that one of its available messages has been acquired.
 *  Neither argument is inspected; only the pending count shrinks. */
void MessageGroupManager::ConsumerState::msgAcquired(const GroupState& /*group*/,
                                                     const QueuedMessage& /*msg*/)
{
    assert(pendingMsgs != 0);   // there must be something pending to acquire
    --pendingMsgs;
}
void MessageGroupManager::consumerAdded( const Consumer& c)
{
const std::string& name = c.getName();
ConsumerState& state = consumers[name];
state.setName(name);
state.uncancel(); // just in case old consumer resubcribed
QPID_LOG( trace, "group queue " << qName << ": consumer " << name << " added.");
}
void MessageGroupManager::consumerRemoved( const Consumer& c)
{
const std::string& name = c.getName();
ConsumerMap::iterator cs = consumers.find( name );
assert(cs != consumers.end());
ConsumerState& state = cs->second;
state.cancel();
if (state.groupCount() == 0) {
assert(state.remainingMsgs() == 0);
consumers.erase(cs);
QPID_LOG( trace, "group queue " << qName << ": consumer " << name << " removed.");
}
}
/** GroupFifo */
void MessageGroupManager::GroupFifo::addGroup(const GroupState& group)
{
assert(group.totalMsgs() != 0);
const framing::SequenceNumber& next = group.nextMsg();
assert(fifo.find(next) == fifo.end());
fifo[next] = &group;
}
void MessageGroupManager::GroupFifo::removeGroup(const GroupState& group)
{
Fifo::iterator pos = fifo.find( group.nextMsg() );
assert( pos != fifo.end() && pos->second == &group );
fifo.erase( pos );
}
/** Return the group stored at the front of the fifo, i.e. the entry with the
 *  lowest head-message position key.
 *
 *  Precondition: the fifo is not empty. Dereferencing begin() of an empty
 *  container is undefined, so the precondition is asserted in the same way the
 *  other accessors in this file assert theirs. */
const MessageGroupManager::GroupState& MessageGroupManager::GroupFifo::nextGroup() const
{
    assert(!fifo.empty());
    return *(fifo.begin()->second);
}
/** GroupState */
/** Assign a consumer as the owner of this (currently unowned) group and let
 *  the consumer account for the group's messages. */
void MessageGroupManager::GroupState::setOwner( ConsumerState& consumer )
{
    assert(owner == 0);     // must not already be owned
    owner = &consumer;
    consumer.addGroup( *this );
}
/** Detach this group from its current owner: the owner stops accounting for
 *  the group's messages, then the owner pointer is cleared. */
void MessageGroupManager::GroupState::resetOwner()
{
    assert(owner != 0);     // must currently be owned
    owner->removeGroup( *this );
    owner = 0;
}
/** Position of the message at the front of this group's member list.
 *  The group must not be empty. */
const qpid::framing::SequenceNumber& MessageGroupManager::GroupState::nextMsg() const
{
    assert(!members.empty());
    return members.front();
}
/** Append a newly enqueued message's position to the group and, if the group
 *  is owned, tell the owner that another message is available. */
void MessageGroupManager::GroupState::enqueueMsg(const QueuedMessage& msg)
{
    members.push_back(msg.position);
    if (owner != 0)
        owner->msgAvailable(*this, msg);
}
/** Count one more acquired message in this group and, if the group is owned,
 *  tell the owner that one of its available messages was taken. */
void MessageGroupManager::GroupState::acquireMsg(const QueuedMessage& msg)
{
    assert(members.size());     // there are msgs present
    ++acquired;
    if (owner != 0)
        owner->msgAcquired(*this, msg);
}
/** A previously acquired message was put back: decrement the acquired count
 *  and, if the group is owned, tell the owner the message is available again. */
void MessageGroupManager::GroupState::requeueMsg(const QueuedMessage& msg)
{
    assert(acquired != 0);
    --acquired;
    if (owner != 0)
        owner->msgAvailable(*this, msg);
}
/** Remove a dequeued message from the group: decrement the acquired count and
 *  erase the message's position from the member list.
 *
 *  The binary search used for the non-head case relies on `members` being
 *  ordered by position (NOTE(review): presumably guaranteed because positions
 *  are appended in enqueue order - confirm). */
void MessageGroupManager::GroupState::dequeueMsg(const QueuedMessage& msg)
{
    assert( !members.empty() );
    assert( acquired != 0 );
    --acquired;

    // fast path: likely to be at or near begin() if dequeued in order
    if (members.front() == msg.position) {
        members.pop_front();
        return;
    }

    // Limit the search window: the distance between the wanted position and
    // the head position caps how far from the front the entry can be.
    const unsigned long diff = msg.position.getValue() - members.front().getValue();
    const long maxEnd = diff < members.size() ? (diff + 1) : members.size();
    GroupState::PositionFifo::iterator i =
        std::lower_bound(members.begin(), members.begin()+maxEnd, msg.position);
    assert(i != members.end() && *i == msg.position);
    members.erase(i);
}
/** Append every member position of this group, in list order, to the given
 *  array, each wrapped in a framing::IntegerValue. */
void MessageGroupManager::GroupState::getPositions(framing::Array& positions) const
{
    PositionFifo::const_iterator pos = members.begin();
    while (pos != members.end()) {
        positions.push_back(framing::Array::ValuePtr(new framing::IntegerValue( *pos )));
        ++pos;
    }
}
/** Replace this group's member positions with the values held in the given
 *  array (each element is read as a 32-bit unsigned integer). */
void MessageGroupManager::GroupState::setPositions(const framing::Array& positions)
{
    members.clear();
    framing::Array::const_iterator src = positions.begin();
    while (src != positions.end()) {
        members.push_back((*src)->getIntegerValue<uint32_t, 4>());
        ++src;
    }
}
/** Return the GroupState for the group the given message belongs to, creating
 *  it on first use.
 *
 *  The group id is the string value of the `groupIdHeader` application header;
 *  a missing, non-string or empty header selects `defaultGroupId`. The most
 *  recently returned group is cached together with the message position and
 *  group id that produced it, and `hits` / `misses` count cache outcomes. */
MessageGroupManager::GroupState& MessageGroupManager::findGroup(const QueuedMessage& qm)
{
    const uint32_t position = qm.position.getValue();

    // Same message as last time: no need to look at the headers at all.
    if (cachedGroup && lastMsg == position) {
        ++hits;
        return *cachedGroup;
    }

    // Work out the group id from the message's application headers.
    std::string groupId = defaultGroupId;
    const qpid::framing::FieldTable* headers = qm.payload->getApplicationHeaders();
    if (headers) {
        qpid::framing::FieldTable::ValuePtr value = headers->get( groupIdHeader );
        if (value && value->convertsTo<std::string>()) {
            std::string fromHeader = value->get<std::string>();
            if (!fromHeader.empty())   // empty group is reserved
                groupId = fromHeader;
        }
    }

    // Different message, but same group as last time.
    if (cachedGroup && groupId == lastGroup) {
        ++hits;
        lastMsg = position;
        return *cachedGroup;
    }

    // Cache miss: look up (or create) the group and remember it.
    ++misses;
    cachedGroup = &messageGroups[groupId];
    if (cachedGroup->getName().empty())
        cachedGroup->setName(groupId);   // new group, assign name
    lastMsg = position;
    lastGroup = groupId;
    return *cachedGroup;
}
/** Remove a group from the group map, dropping the lookup cache first if it
 *  points at this group. */
void MessageGroupManager::deleteGroup(GroupState& group)
{
    if (cachedGroup == &group)
        cachedGroup = 0;

    // Take a copy of the name before erasing the map entry.
    const std::string groupName(group.getName());
    messageGroups.erase(groupName);
}
void MessageGroupManager::enqueued( const QueuedMessage& qm )
{
// @todo KAG optimization - store reference to group state in QueuedMessage
// issue: const-ness??
GroupState& state = findGroup(qm);
state.enqueueMsg(qm);
uint32_t total = state.totalMsgs();
QPID_LOG( trace, "group queue " << qName <<
": added message to group id=" << state.getName() << " total=" << total );
if (total == 1) {
// newly created group, no owner
freeGroups.addGroup(state);
}
}
void MessageGroupManager::acquired( const QueuedMessage& qm )
{
GroupState& state = findGroup(qm);
state.acquireMsg(qm);
QPID_LOG( trace, "group queue " << qName <<
": acquired message in group id=" << state.getName() << " acquired=" << state.acquiredMsgs());
}
void MessageGroupManager::requeued( const QueuedMessage& qm )
{
// @todo KAG avoid lookup: retrieve direct reference to group state from QueuedMessage
// issue: const-ness??
GroupState& state = findGroup(qm);
state.requeueMsg(qm);
if (state.acquiredMsgs() == 0 && state.getOwner()) {
disownGroup(state);
freeGroups.addGroup(state);
}
QPID_LOG( trace, "group queue " << qName <<
": requeued message to group id=" << state.getName() << " acquired=" << state.acquiredMsgs());
}
/** Queue observer hook: a message was dequeued.
 *
 *  Removes the message from its group, then:
 *   - releases the group from its owner if no acquired messages remain,
 *   - deletes the group if it is now empty, or
 *   - (re)inserts the group into the free-group fifo if it became free or if
 *     its head message - the fifo index - changed.
 *
 *  The dequeue is traced exactly once per call.
 */
void MessageGroupManager::dequeued( const QueuedMessage& qm )
{
    GroupState& group = findGroup(qm);
    bool freeNeeded = false;

    if (group.isFree()) {   // dequeue is occurring via mgmt, not subscriber!
        const framing::SequenceNumber next = group.nextMsg();
        if (next == qm.position) {
            /* we are about to remove the head message of this group. This message is
             * used to index the freeGroups fifo, so we must temporarily remove it from
             * the fifo until we are done updating the head message.
             */
            freeGroups.removeGroup(group);
            freeNeeded = true;
        }
    }

    group.dequeueMsg(qm);
    uint32_t total = group.totalMsgs();
    QPID_LOG( trace, "group queue " << qName <<
              ": dequeued message from group id=" << group.getName() << " total=" << total );

    // if no more outstanding acquired messages, free the group from the consumer
    if (group.acquiredMsgs() == 0 && group.getOwner()) {
        // group is now available again
        disownGroup(group);
        freeNeeded = true;
    }

    if (total == 0) {
        QPID_LOG( trace, "group queue " << qName << ": deleting group id=" << group.getName());
        deleteGroup(group);
    } else if (freeNeeded) {
        freeGroups.addGroup(group);
    }
}
/** Remove the owner of the group.
 *
 *  The group must currently have an owner. If that owner has already been
 *  cancelled (unsubscribed) and this was its last owned group, its
 *  ConsumerState entry is erased from the consumer map as well. This is a
 *  routine event and is traced at the same level as the equivalent removal in
 *  consumerRemoved().
 */
void MessageGroupManager::disownGroup(GroupState& group)
{
    ConsumerState& owner = *group.getOwner();
    QPID_LOG( trace, "group queue " << qName <<
              ": consumer name=" << owner.getName() << " released group id=" << group.getName());
    group.resetOwner();

    if (owner.cancelled() && owner.groupCount() == 0) {
        // this owner has unsubscribed, we can release it now.
        // Copy the name first: `owner` refers into the map entry being erased.
        std::string name = owner.getName();
        consumers.erase(name);
        QPID_LOG( trace, "group queue " << qName << ": consumer " << name << " removed.");
    }
}
namespace {
    // File-wide statistics updated by nextConsumableMessage().
    unsigned long found(0);      // searches that returned a message
    unsigned long failed(0);     // searches that scanned the queue and found nothing
    unsigned long missCount(0);  // updated alongside `failed` on unsuccessful searches
    unsigned long earlyRet(0);   // searches abandoned before scanning the queue
}
/** Destructor: logs the findGroup() cache hit/miss counters at debug level. */
MessageGroupManager::~MessageGroupManager()
{
    QPID_LOG( debug,
              "group queue " << qName
              << " cache results: hits=" << hits
              << " misses=" << misses );
}
/** Find the next message the given consumer is allowed to consume.
 *
 *  A message qualifies if its group is owned by this consumer, or if its group
 *  is free and the message is that group's head message.
 *
 *  @param c     the consumer; must already be present in the consumer map
 *               (asserted, as in allocate() and consumerRemoved()).
 *  @param next  in/out: receives the message found. On failure its contents
 *               are whatever the last scan step left there.
 *  @return true if a consumable message was stored in `next`.
 */
bool MessageGroupManager::nextConsumableMessage( Consumer::shared_ptr& c, QueuedMessage& next )
{
    if (messages.empty())
        return false;

    ConsumerMap::iterator cs = consumers.find(c->getName());
    assert(cs != consumers.end());
    ConsumerState& cState = cs->second;

    next.position = c->position;
    if (freeGroups.groupCount() != 0) {
        const framing::SequenceNumber& nextFree = freeGroups.nextGroup().nextMsg();
        if (nextFree <= next.position) {  // a free message is older than current
            // back up to just before the free message so the scan below reaches it
            next.position = nextFree;
            --next.position;
        }
    } else if (cState.remainingMsgs() == 0) {  // no more msgs from owned groups
        earlyRet += 1;
        return false;
    }

    int count = 1;   // scan-length tally, added to missCount if nothing is found
    while (messages.next( next.position, next )) {
        GroupState& group = findGroup(next);
        if (group.getOwner() == &cState) {
            found += 1;
            return true;
        } else if (group.isFree()) {
            if (group.nextMsg() == next.position) {  // only take from head!
                found += 1;
                return true;
            }
            QPID_LOG(debug, "Skipping " << next.position << " since group " << group.getName()
                     << "'s head message still pending. pos=" << group.nextMsg());
        }
        count += 1;
    }

    failed += 1;
    // Accumulate the scan length rather than duplicating `failed`.
    missCount += count;
    return false;
}
bool MessageGroupManager::allocate(const std::string& consumer, const QueuedMessage& qm)
{
// @todo KAG avoid lookup: retrieve direct reference to group state from QueuedMessage
GroupState& state = findGroup(qm);
if (state.isFree()) {
freeGroups.removeGroup(state);
ConsumerMap::iterator cs = consumers.find( consumer );
assert(cs != consumers.end());
ConsumerState& owner = cs->second;
state.setOwner( owner );
QPID_LOG( trace, "group queue " << qName <<
": consumer name=" << consumer << " has acquired group id=" << state.getName());
return true;
}
return state.getOwner()->getName() == consumer;
}
/** Find the next message for a browsing consumer: simply the next message
 *  after the consumer's position, with no group ownership check.
 *
 *  @return true if a message was stored in `next`.
 */
bool MessageGroupManager::nextBrowsableMessage( Consumer::shared_ptr& c, QueuedMessage& next )
{
    // browse: allow access to any available msg, regardless of group ownership (?ok?)
    if (messages.empty())
        return false;
    return messages.next(c->position, next);
}
/** Add a description of the current state of the message groups for this
 *  queue to the given status map.
 *
 *  FORMAT:
 *  { "qpid.message_group_queue":
 *      { "group_header_key" : "<KEY>",
 *        "group_state" :
 *             [ { "group_id"  : "<name>",
 *                 "msg_count" : <int>,
 *                 "timestamp" : <absTime>,
 *                 "consumer"  : <consumer name> },
 *               {...} // one for each known group
 *             ]
 *      }
 *  }
 *
 *  The status map must not already contain the "qpid.message_group_queue" key.
 *  "consumer" is the empty string for a group without an owner, and
 *  "timestamp" is currently always 0.
 */
void MessageGroupManager::query(qpid::types::Variant::Map& status) const
{
    assert(status.find(GROUP_QUERY_KEY) == status.end());

    qpid::types::Variant::List groups;
    GroupMap::const_iterator g = messageGroups.begin();
    while (g != messageGroups.end()) {
        qpid::types::Variant::Map info;
        info[GROUP_ID_KEY] = g->first;
        info[GROUP_MSG_COUNT] = g->second.totalMsgs();
        info[GROUP_TIMESTAMP] = 0;   /** @todo KAG - NEED HEAD MSG TIMESTAMP */
        if (!g->second.getOwner()) {
            info[GROUP_CONSUMER] = std::string("");
        } else {
            info[GROUP_CONSUMER] = g->second.getOwner()->getName();
        }
        groups.push_back(info);
        ++g;
    }

    qpid::types::Variant::Map state;
    state[GROUP_HEADER_KEY] = groupIdHeader;
    state[GROUP_STATE_KEY] = groups;
    status[GROUP_QUERY_KEY] = state;
}
/** Factory: build a MessageGroupManager for a queue from the queue's settings.
 *
 *  @param qName     name of the queue (used for logging and passed to the manager)
 *  @param messages  the queue's message container, passed to the manager
 *  @param settings  queue settings; see the qpid* setting-name constants
 *  @return the new manager, or an empty pointer when the queue is not
 *          configured for message groups (no group header key setting) or the
 *          configuration is rejected (shared-group setting absent, or the
 *          header key is empty).
 */
boost::shared_ptr<MessageGroupManager> MessageGroupManager::create( const std::string& qName,
                                                                    Messages& messages,
                                                                    const qpid::framing::FieldTable& settings )
{
    boost::shared_ptr<MessageGroupManager> empty;

    if (!settings.isSet(qpidMessageGroupKey))
        return empty;   // message grouping not requested for this queue

    // @todo: remove once "sticky" consumers are supported - see QPID-3347
    if (!settings.isSet(qpidSharedGroup)) {
        QPID_LOG( error, "Only shared groups are supported in this version of the broker. Use '--shared-groups' in qpid-config." );
        return empty;
    }

    std::string headerKey = settings.getAsString(qpidMessageGroupKey);
    if (headerKey.empty()) {
        QPID_LOG( error, "A Message Group header key must be configured, queue=" << qName);
        return empty;
    }

    unsigned int timestamp = settings.getAsInt(qpidMessageGroupTimestamp);
    boost::shared_ptr<MessageGroupManager> manager( new MessageGroupManager( headerKey, qName, messages, timestamp ) );
    QPID_LOG( debug, "Configured Queue '" << qName <<
              "' for message grouping using header key '" << headerKey << "'" <<
              " (timestamp=" << timestamp << ")");
    return manager;
}
/** Group id that findGroup() falls back to when a message has no usable
 *  (string, non-empty) group header. Default-constructed here, i.e. empty
 *  until setDefaults() is called. */
std::string MessageGroupManager::defaultGroupId;
/** Set the class-wide default group id (static member function).
 *  @param groupId  value stored in defaultGroupId.
 */
void MessageGroupManager::setDefaults(const std::string& groupId) // static
{
defaultGroupId = groupId;
}
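/** Cluster replication:
state map format (as written by getState() and read back by setState()):
{ "group-state": [ {"name": <group-name>,
"owner": <consumer-name>-or-empty,
"acquired-ct": <acquired count>,
"positions": [Seqnumbers, ... ]},
{...}
],
"owner-state": [ {"name": <consumer-name>,
"cancelled": "yes"-or-"no"},
{...}
]
}
*/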
namespace {
    // Keys of the per-group tables in the replicated state.
    const std::string GROUP_NAME        = "name";
    const std::string GROUP_OWNER       = "owner";
    const std::string GROUP_ACQUIRED_CT = "acquired-ct";
    const std::string GROUP_POSITIONS   = "positions";
    // Keys of the two top-level arrays in the replicated state.
    const std::string GROUP_STATE       = "group-state";
    const std::string OWNER_STATE       = "owner-state";
    // Keys and values of the per-consumer tables in the replicated state.
    const std::string CANCELLED         = "cancelled";
    const std::string YES               = "yes";
    const std::string NO                = "no";
    const std::string OWNER_NAME        = "name";
}
/** Runs on UPDATER to snapshot current state.
 *
 *  Clears `state` and fills it with two arrays of tables: one table per known
 *  group (name, owner name or empty string, acquired count, member positions)
 *  and one table per known consumer (name, cancelled yes/no).
 */
void MessageGroupManager::getState(qpid::framing::FieldTable& state ) const
{
    using namespace qpid::framing;
    state.clear();

    // --- one table per message group ---
    framing::Array groupState(TYPE_CODE_MAP);
    GroupMap::const_iterator g = messageGroups.begin();
    while (g != messageGroups.end()) {
        framing::FieldTable group;
        group.setString(GROUP_NAME, g->first);
        if (!g->second.getOwner()) {
            group.setString(GROUP_OWNER, std::string(""));
        } else {
            group.setString(GROUP_OWNER, g->second.getOwner()->getName());
        }
        group.setInt(GROUP_ACQUIRED_CT, g->second.acquiredMsgs());

        framing::Array positions(TYPE_CODE_UINT32);
        g->second.getPositions(positions);
        group.setArray(GROUP_POSITIONS, positions);

        groupState.push_back(framing::Array::ValuePtr(new FieldTableValue(group)));
        ++g;
    }
    state.setArray(GROUP_STATE, groupState);

    // --- one table per consumer ---
    framing::Array ownerState(TYPE_CODE_MAP);
    ConsumerMap::const_iterator c = consumers.begin();
    while (c != consumers.end()) {
        framing::FieldTable owner;
        owner.setString(OWNER_NAME, c->first);
        owner.setString(CANCELLED, c->second.cancelled() ? YES : NO);
        ownerState.push_back(framing::Array::ValuePtr(new FieldTableValue(owner)));
        ++c;
    }
    state.setArray(OWNER_STATE, ownerState);

    QPID_LOG(debug, "Queue \"" << qName << "\": replicating message group state, key=" << groupIdHeader);
}
/** Called on UPDATEE to set state from snapshot.
 *
 *  Discards all current consumer, group, free-group and cache state, then
 *  rebuilds it from the "owner-state" and "group-state" arrays in `state`
 *  (the layout produced by getState()).
 *
 *  On any decoding problem an error is logged and the function returns
 *  immediately; whatever had been rebuilt up to that point is left in place.
 */
void MessageGroupManager::setState(const qpid::framing::FieldTable& state)
{
    using namespace qpid::framing;
    consumers.clear();
    messageGroups.clear();
    freeGroups.clear();
    cachedGroup = 0;

    // set up the known owners
    framing::Array ownerState(TYPE_CODE_MAP);
    bool ok = state.getArray(OWNER_STATE, ownerState);
    if (!ok) {
        QPID_LOG(error, "Unable to find message group owner state information for queue \"" <<
                 qName << "\": cluster inconsistency error!");
        return;
    }
    for (framing::Array::const_iterator c = ownerState.begin(); c != ownerState.end(); ++c) {
        framing::FieldTable ownerMap;
        ok = framing::getEncodedValue<FieldTable>(*c, ownerMap);
        if (!ok) {
            QPID_LOG(error, "Invalid message group owner information for queue \"" <<
                     qName << "\": table encoding error!");
            return;
        }
        if (!ownerMap.isSet(OWNER_NAME) || !ownerMap.isSet(CANCELLED)) {
            QPID_LOG(error, "Invalid message group owner information for queue \"" <<
                     qName << "\": fields missing error!");
            return;
        }
        const std::string name = ownerMap.getAsString(OWNER_NAME);
        ConsumerState& owner = consumers[name];
        owner.setName(name);
        if (ownerMap.getAsString(CANCELLED) == YES) {
            owner.cancel();
        }
    }

    // set up the known groups
    framing::Array groupState(TYPE_CODE_MAP);
    ok = state.getArray(GROUP_STATE, groupState);
    if (!ok) {
        QPID_LOG(error, "Unable to find message group state information for queue \"" <<
                 qName << "\": cluster inconsistency error!");
        return;
    }
    for (framing::Array::const_iterator g = groupState.begin(); g != groupState.end(); ++g) {
        framing::FieldTable groupMap;
        ok = framing::getEncodedValue<FieldTable>(*g, groupMap);
        if (!ok) {
            QPID_LOG(error, "Invalid message group state information for queue \"" <<
                     qName << "\": table encoding error!");
            return;
        }
        if (!groupMap.isSet(GROUP_NAME) || !groupMap.isSet(GROUP_OWNER) || !groupMap.isSet(GROUP_ACQUIRED_CT)) {
            QPID_LOG(error, "Invalid message group state information for queue \"" <<
                     qName << "\": fields missing error!");
            return;
        }

        // replicate the group state
        std::string name = groupMap.getAsString(GROUP_NAME);
        MessageGroupManager::GroupState& group = messageGroups[name];
        assert(group.getName().empty());    // each group may appear only once in the snapshot
        group.setName(name);
        group.setAcquired(groupMap.getAsInt(GROUP_ACQUIRED_CT));
        framing::Array positions(TYPE_CODE_UINT32);
        ok = groupMap.getArray(GROUP_POSITIONS, positions);
        if (!ok) {
            QPID_LOG(error, "Invalid message group state information for queue \"" <<
                     qName << "\": position encoding error!");
            return;
        }
        group.setPositions(positions);

        // an empty owner name marks a free group
        const std::string ownerName = groupMap.getAsString(GROUP_OWNER);
        if (!ownerName.empty()) {
            ConsumerState& owner = consumers[ownerName];
            group.setOwner(owner);
        } else {
            freeGroups.addGroup(group);
        }
    }

    QPID_LOG(debug, "Queue \"" << qName << "\": message group state replicated, key =" << groupIdHeader);
}