blob: c083e4ee0f2488944a10018049fcee2afa9c1fe0 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include "qpid/broker/MessageGroupManager.h"
#include "qpid/broker/Message.h"
#include "qpid/broker/Messages.h"
#include "qpid/broker/MessageDeque.h"
#include "qpid/broker/QueueSettings.h"
#include "qpid/framing/Array.h"
#include "qpid/framing/DeliveryProperties.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/framing/FieldValue.h"
#include "qpid/framing/TypeCode.h"
#include "qpid/types/Variant.h"
#include "qpid/log/Statement.h"
#include "qpid/types/Variant.h"
using namespace qpid::broker;
namespace {
const std::string GROUP_QUERY_KEY("qpid.message_group_queue");
const std::string GROUP_HEADER_KEY("group_header_key");
const std::string GROUP_STATE_KEY("group_state");
const std::string GROUP_ID_KEY("group_id");
const std::string GROUP_MSG_COUNT("msg_count");
const std::string GROUP_TIMESTAMP("timestamp");
const std::string GROUP_CONSUMER("consumer");
}
const std::string MessageGroupManager::qpidMessageGroupKey("qpid.group_header_key");
const std::string MessageGroupManager::qpidSharedGroup("qpid.shared_msg_group");
const std::string MessageGroupManager::qpidMessageGroupTimestamp("qpid.group_timestamp");
/** return an iterator to the message at position, or members.end() if not found */
MessageGroupManager::GroupState::MessageFifo::iterator
MessageGroupManager::GroupState::findMsg(const qpid::framing::SequenceNumber &position)
{
MessageState mState(position);
MessageFifo::iterator found = std::lower_bound(members.begin(), members.end(), mState);
return (found->position == position) ? found : members.end();
}
void MessageGroupManager::unFree( const GroupState& state )
{
GroupFifo::iterator pos = freeGroups.find( state.members.front().position );
assert( pos != freeGroups.end() && pos->second == &state );
freeGroups.erase( pos );
}
void MessageGroupManager::own( GroupState& state, const std::string& owner )
{
state.owner = owner;
unFree( state );
}
void MessageGroupManager::disown( GroupState& state )
{
state.owner.clear();
assert(state.members.size());
assert(freeGroups.find(state.members.front().position) == freeGroups.end());
freeGroups[state.members.front().position] = &state;
}
MessageGroupManager::GroupState& MessageGroupManager::findGroup( const Message& m )
{
uint32_t thisMsg = m.getSequence().getValue();
if (cachedGroup && lastMsg == thisMsg) {
hits++;
return *cachedGroup;
}
std::string group = m.getPropertyAsString(groupIdHeader);
if (group.empty()) group = defaultGroupId; //empty group is reserved
if (cachedGroup && group == lastGroup) {
hits++;
lastMsg = thisMsg;
return *cachedGroup;
}
misses++;
GroupState& found = messageGroups[group];
if (found.group.empty())
found.group = group; // new group, assign name
lastMsg = thisMsg;
lastGroup = group;
cachedGroup = &found;
return found;
}
void MessageGroupManager::enqueued( const Message& m )
{
// @todo KAG optimization - store reference to group state in QueuedMessage
// issue: const-ness??
GroupState& state = findGroup(m);
GroupState::MessageState mState(m.getSequence());
state.members.push_back(mState);
uint32_t total = state.members.size();
QPID_LOG( trace, "group queue " << qName <<
": added message to group id=" << state.group << " total=" << total );
if (total == 1) {
// newly created group, no owner
assert(freeGroups.find(m.getSequence()) == freeGroups.end());
freeGroups[m.getSequence()] = &state;
}
}
void MessageGroupManager::acquired( const Message& m )
{
// @todo KAG avoid lookup: retrieve direct reference to group state from QueuedMessage
// issue: const-ness??
GroupState& state = findGroup(m);
GroupState::MessageFifo::iterator gm = state.findMsg(m.getSequence());
assert(gm != state.members.end());
gm->acquired = true;
state.acquired += 1;
QPID_LOG( trace, "group queue " << qName <<
": acquired message in group id=" << state.group << " acquired=" << state.acquired );
}
void MessageGroupManager::requeued( const Message& m )
{
// @todo KAG avoid lookup: retrieve direct reference to group state from QueuedMessage
// issue: const-ness??
GroupState& state = findGroup(m);
assert( state.acquired != 0 );
state.acquired -= 1;
GroupState::MessageFifo::iterator i = state.findMsg(m.getSequence());
assert(i != state.members.end());
i->acquired = false;
if (state.acquired == 0 && state.owned()) {
QPID_LOG( trace, "group queue " << qName <<
": consumer name=" << state.owner << " released group id=" << state.group);
disown(state);
}
QPID_LOG( trace, "group queue " << qName <<
": requeued message to group id=" << state.group << " acquired=" << state.acquired );
}
void MessageGroupManager::dequeued( const Message& m )
{
// @todo KAG avoid lookup: retrieve direct reference to group state from QueuedMessage
// issue: const-ness??
GroupState& state = findGroup(m);
GroupState::MessageFifo::iterator i = state.findMsg(m.getSequence());
assert(i != state.members.end());
if (i->acquired) {
assert( state.acquired != 0 );
state.acquired -= 1;
}
// special case if qm is first (oldest) message in the group:
// may need to re-insert it back on the freeGroups list, as the index will change
bool reFreeNeeded = false;
if (i == state.members.begin()) {
if (!state.owned()) {
// will be on the freeGroups list if mgmt is dequeueing rather than a consumer!
// if on freelist, it is indexed by first member, which is about to be removed!
unFree(state);
reFreeNeeded = true;
}
state.members.pop_front();
} else {
state.members.erase(i);
}
uint32_t total = state.members.size();
QPID_LOG( trace, "group queue " << qName <<
": dequeued message from group id=" << state.group << " total=" << total );
if (total == 0) {
QPID_LOG( trace, "group queue " << qName << ": deleting group id=" << state.group);
if (cachedGroup == &state) {
cachedGroup = 0;
}
std::string key(state.group);
messageGroups.erase( key );
} else if (state.acquired == 0 && state.owned()) {
QPID_LOG( trace, "group queue " << qName <<
": consumer name=" << state.owner << " released group id=" << state.group);
disown(state);
MessageDeque* md = dynamic_cast<MessageDeque*>(&messages);
if (md) {
md->resetCursors();
} else {
QPID_LOG(warning, "Could not reset cursors for message group, unexpected container type");
}
} else if (reFreeNeeded) {
disown(state);
}
}
MessageGroupManager::~MessageGroupManager()
{
QPID_LOG( debug, "group queue " << qName << " cache results: hits=" << hits << " misses=" << misses );
}
bool MessageGroupManager::acquire(const std::string& consumer, Message& m)
{
if (m.getState() == AVAILABLE) {
// @todo KAG avoid lookup: retrieve direct reference to group state from QueuedMessage
GroupState& state = findGroup(m);
if (!state.owned()) {
own( state, consumer );
QPID_LOG( trace, "group queue " << qName <<
": consumer name=" << consumer << " has acquired group id=" << state.group);
}
if (state.owner == consumer) {
m.setState(ACQUIRED);
return true;
} else {
return false;
}
} else {
return false;
}
}
void MessageGroupManager::query(qpid::types::Variant::Map& status) const
{
/** Add a description of the current state of the message groups for this queue.
FORMAT:
{ "qpid.message_group_queue":
{ "group_header_key" : "<KEY>",
"group_state" :
[ { "group_id" : "<name>",
"msg_count" : <int>,
"timestamp" : <absTime>,
"consumer" : <consumer name> },
{...} // one for each known group
]
}
}
**/
assert(status.find(GROUP_QUERY_KEY) == status.end());
qpid::types::Variant::Map state;
qpid::types::Variant::List groups;
state[GROUP_HEADER_KEY] = groupIdHeader;
for (GroupMap::const_iterator g = messageGroups.begin();
g != messageGroups.end(); ++g) {
qpid::types::Variant::Map info;
info[GROUP_ID_KEY] = g->first;
info[GROUP_MSG_COUNT] = (uint64_t) g->second.members.size();
// set the timestamp to the arrival timestamp of the oldest (HEAD) message, if present
info[GROUP_TIMESTAMP] = 0;
if (g->second.members.size() != 0) {
Message* m = messages.find(g->second.members.front().position, 0);
if (m && m->getTimestamp()) {
info[GROUP_TIMESTAMP] = m->getTimestamp();
}
}
info[GROUP_CONSUMER] = g->second.owner;
groups.push_back(info);
}
state[GROUP_STATE_KEY] = groups;
status[GROUP_QUERY_KEY] = state;
}
boost::shared_ptr<MessageGroupManager> MessageGroupManager::create( const std::string& qName,
Messages& messages,
const QueueSettings& settings )
{
boost::shared_ptr<MessageGroupManager> manager( new MessageGroupManager( settings.groupKey, qName, messages, settings.addTimestamp ) );
QPID_LOG( debug, "Configured Queue '" << qName <<
"' for message grouping using header key '" << settings.groupKey << "'" <<
" (timestamp=" << settings.addTimestamp << ")");
return manager;
}
std::string MessageGroupManager::defaultGroupId;
void MessageGroupManager::setDefaults(const std::string& groupId) // static
{
defaultGroupId = groupId;
}
namespace {
const std::string GROUP_NAME("name");
const std::string GROUP_OWNER("owner");
const std::string GROUP_ACQUIRED_CT("acquired-ct");
const std::string GROUP_POSITIONS("positions");
const std::string GROUP_ACQUIRED_MSGS("acquired-msgs");
const std::string GROUP_STATE("group-state");
}