blob: 53194cf064c91296e8d119d488c83924bb415358 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include "QueueSettings.h"
#include "QueueFlowLimit.h"
#include "MessageGroupManager.h"
#include "qpid/types/Variant.h"
#include "qpid/framing/reply_exceptions.h"
#include "qpid/log/Statement.h"
#include "qpid/amqp_0_10/Codecs.h"
namespace qpid {
namespace broker {
namespace {
const std::string MAX_COUNT("qpid.max_count");
const std::string MAX_SIZE("qpid.max_size");
const std::string MAX_FILE_COUNT("qpid.file_count");
const std::string MAX_FILE_SIZE("qpid.file_size");
const std::string POLICY_TYPE("qpid.policy_type");
const std::string POLICY_TYPE_REJECT("reject");
const std::string POLICY_TYPE_RING("ring");
const std::string POLICY_TYPE_SELF_DESTRUCT("self-destruct");
const std::string NO_LOCAL("no-local");
const std::string BROWSE_ONLY("qpid.browse-only");
const std::string TRACE_ID("qpid.trace.id");
const std::string TRACE_EXCLUDES("qpid.trace.exclude");
const std::string LVQ_KEY("qpid.last_value_queue_key");
const std::string AUTO_DELETE_TIMEOUT("qpid.auto_delete_timeout");
const std::string ALERT_REPEAT_GAP("qpid.alert_repeat_gap");
const std::string ALERT_COUNT("qpid.alert_count");
const std::string ALERT_SIZE("qpid.alert_size");
const std::string ALERT_COUNT_UP("qpid.alert_count_up");
const std::string ALERT_SIZE_UP("qpid.alert_size_up");
const std::string ALERT_COUNT_DOWN("qpid.alert_count_down");
const std::string ALERT_SIZE_DOWN("qpid.alert_size_down");
const std::string PRIORITIES("qpid.priorities");
const std::string FAIRSHARE("qpid.fairshare");
const std::string FAIRSHARE_ALIAS("x-qpid-fairshare");
const std::string PAGING("qpid.paging");
const std::string MAX_PAGES("qpid.max_pages_loaded");
const std::string PAGE_FACTOR("qpid.page_factor");
const std::string FILTER("qpid.filter");
const std::string LIFETIME_POLICY("qpid.lifetime-policy");
const std::string DELETE_IF_UNUSED_KEY("delete-if-unused");
const std::string DELETE_IF_UNUSED_AND_EMPTY_KEY("delete-if-unused-and-empty");
const std::string LVQ_LEGACY("qpid.last_value_queue");
const std::string LVQ_LEGACY_KEY("qpid.LVQ_key");
const std::string LVQ_LEGACY_NOBROWSE("qpid.last_value_queue_no_browse");
const std::string SEQUENCING("qpid.queue_msg_sequence");
bool handleFairshareSetting(const std::string& basename, const std::string& key, const qpid::types::Variant& value, QueueSettings& settings)
{
if (key.find(basename) == 0) {
qpid::types::Variant index(key.substr(basename.size()+1));
settings.fairshare[index] = value;
return true;
} else {
return false;
}
}
bool isFairshareSetting(const std::string& key, const qpid::types::Variant& value, QueueSettings& settings)
{
return handleFairshareSetting(FAIRSHARE, key, value, settings) || handleFairshareSetting(FAIRSHARE_ALIAS, key, value, settings);
}
}
const QueueSettings::Aliases QueueSettings::aliases;
QueueSettings::QueueSettings(bool d, bool a) :
durable(d),
autodelete(a),
lifetime(DELETE_IF_UNUSED),
isTemporary(false),
priorities(0),
defaultFairshare(0),
shareGroups(false),
addTimestamp(false),
dropMessagesAtLimit(false),
selfDestructAtLimit(false),
paging(false),
maxPages(0),
pageFactor(0),
noLocal(false),
isBrowseOnly(false),
autoDeleteDelay(0),
alertRepeatInterval(60),
maxFileSize(0),
maxFileCount(0),
sequencing(false)
{}
bool QueueSettings::handle(const std::string& key, const qpid::types::Variant& value)
{
if (key == MAX_COUNT) {
maxDepth.setCount(value);
return true;
} else if (key == MAX_SIZE) {
maxDepth.setSize(value);
return true;
} else if (key == POLICY_TYPE) {
if (value.getString() == POLICY_TYPE_RING) {
dropMessagesAtLimit = true;
return true;
} else if (value.getString() == POLICY_TYPE_SELF_DESTRUCT) {
selfDestructAtLimit = true;
return true;
} else if (value.getString() == POLICY_TYPE_REJECT) {
//do nothing, thats the default
return true;
} else {
QPID_LOG(warning, "Unrecognised policy option: " << value);
return false;
}
} else if (key == NO_LOCAL) {
noLocal = value;
return true;
} else if (key == BROWSE_ONLY) {
isBrowseOnly = value;
return true;
} else if (key == TRACE_ID) {
traceId = value.asString();
return true;
} else if (key == TRACE_EXCLUDES) {
traceExcludes = value.asString();
return true;
} else if (key == PRIORITIES) {
priorities = value;
return true;
} else if (key == FAIRSHARE) {
defaultFairshare = value;
return true;
} else if (isFairshareSetting(key, value, *this)) {
return true;
} else if (key == MessageGroupManager::qpidMessageGroupKey) {
groupKey = value.asString();
return true;
} else if (key == MessageGroupManager::qpidSharedGroup) {
shareGroups = value;
return true;
} else if (key == MessageGroupManager::qpidMessageGroupTimestamp) {
addTimestamp = value;
return true;
} else if (key == LVQ_KEY) {
lvqKey = value.asString();
return true;
} else if (key == LVQ_LEGACY) {
if (lvqKey.empty()) lvqKey = LVQ_LEGACY_KEY;
return true;
} else if (key == LVQ_LEGACY_NOBROWSE) {
QPID_LOG(warning, "Ignoring 'no-browse' directive for LVQ; it is no longer necessary");
if (lvqKey.empty()) lvqKey = LVQ_LEGACY_KEY;
return true;
} else if (key == AUTO_DELETE_TIMEOUT) {
autoDeleteDelay = value;
if (autoDeleteDelay) autodelete = true;
return true;
} else if (key == QueueFlowLimit::flowStopCountKey) {
flowStop.setCount(value);
return true;
} else if (key == QueueFlowLimit::flowResumeCountKey) {
flowResume.setCount(value);
return true;
} else if (key == QueueFlowLimit::flowStopSizeKey) {
flowStop.setSize(value);
return true;
} else if (key == QueueFlowLimit::flowResumeSizeKey) {
flowResume.setSize(value);
return true;
} else if (key == ALERT_REPEAT_GAP) {
alertRepeatInterval = value;
return true;
} else if ((key == ALERT_COUNT) || (key == ALERT_COUNT_UP)) {
alertThreshold.setCount(value);
return true;
} else if ((key == ALERT_SIZE) || (key == ALERT_SIZE_UP)) {
alertThreshold.setSize(value);
return true;
} else if (key == ALERT_COUNT_DOWN) {
alertThresholdDown.setCount(value);
return true;
} else if (key == ALERT_SIZE_DOWN) {
alertThresholdDown.setSize(value);
return true;
} else if (key == MAX_FILE_COUNT && value.asUint64() > 0) {
maxFileCount = value.asUint64();
return false; // 'handle' here and also pass to store
} else if (key == MAX_FILE_SIZE && value.asUint64() > 0) {
maxFileSize = value.asUint64();
return false; // 'handle' here and also pass to store
} else if (key == PAGING) {
paging = value;
return true;
} else if (key == MAX_PAGES) {
maxPages = value;
return true;
} else if (key == PAGE_FACTOR) {
pageFactor = value;
return true;
} else if (key == SEQUENCING) {
sequenceKey = value.getString();
sequencing = !sequenceKey.empty();
return true;
} else if (key == FILTER) {
filter = value.asString();
return true;
} else if (key == LIFETIME_POLICY) {
if (value.asString() == DELETE_IF_UNUSED_KEY) {
lifetime = DELETE_IF_UNUSED;
} else if (value.asString() == DELETE_IF_UNUSED_AND_EMPTY_KEY) {
lifetime = DELETE_IF_UNUSED_AND_EMPTY;
} else {
QPID_LOG(warning, "Invalid value for " << LIFETIME_POLICY << ": " << value);
}
return true;
} else {
return false;
}
}
void QueueSettings::validate() const
{
if (lvqKey.size() && priorities > 0)
throw qpid::framing::InvalidArgumentException(QPID_MSG("Cannot specify " << LVQ_KEY << " and " << PRIORITIES << " for the same queue"));
if ((fairshare.size() || defaultFairshare) && priorities == 0)
throw qpid::framing::InvalidArgumentException(QPID_MSG("Cannot specify fairshare settings when queue is not enabled for priorities"));
if (fairshare.size() > priorities)
throw qpid::framing::InvalidArgumentException(QPID_MSG("Cannot have fairshare set for priority levels greater than " << priorities));
if (groupKey.size() && lvqKey.size())
throw qpid::framing::InvalidArgumentException(QPID_MSG("Cannot specify " << LVQ_KEY << " and " << MessageGroupManager::qpidMessageGroupKey << " for the same queue"));
if (groupKey.size() && priorities)
throw qpid::framing::InvalidArgumentException(QPID_MSG("Cannot specify " << PRIORITIES << " and " << MessageGroupManager::qpidMessageGroupKey << " for the same queue"));
if (shareGroups && groupKey.empty()) {
throw qpid::framing::InvalidArgumentException(QPID_MSG("Can only specify " << MessageGroupManager::qpidSharedGroup
<< " if " << MessageGroupManager::qpidMessageGroupKey << " is set"));
}
if (addTimestamp && groupKey.empty()) {
throw qpid::framing::InvalidArgumentException(QPID_MSG("Can only specify " << MessageGroupManager::qpidMessageGroupTimestamp
<< " if " << MessageGroupManager::qpidMessageGroupKey << " is set"));
}
// @todo: remove once "sticky" consumers are supported - see QPID-3347
if (!shareGroups && groupKey.size()) {
throw qpid::framing::InvalidArgumentException(QPID_MSG("Only shared groups are supported at present; " << MessageGroupManager::qpidSharedGroup
<< " is required if " << MessageGroupManager::qpidMessageGroupKey << " is set"));
}
if (paging) {
if(lvqKey.size()) {
throw qpid::framing::InvalidArgumentException(QPID_MSG("Cannot specify " << LVQ_KEY << " and " << PAGING << " for the same queue"));
}
if(priorities) {
throw qpid::framing::InvalidArgumentException(QPID_MSG("Cannot specify " << PRIORITIES << " and " << PAGING << " for the same queue"));
}
if(groupKey.size()) {
throw qpid::framing::InvalidArgumentException(QPID_MSG("Cannot specify " << MessageGroupManager::qpidMessageGroupKey << " and " << PAGING << " for the same queue"));
}
} else {
if (maxPages) {
throw qpid::framing::InvalidArgumentException(QPID_MSG("Can only specify " << MAX_PAGES << " if " << PAGING << " is set"));
}
if (pageFactor) {
throw qpid::framing::InvalidArgumentException(QPID_MSG("Can only specify " << PAGE_FACTOR << " if " << PAGING << " is set"));
}
}
}
void QueueSettings::populate(const std::map<std::string, qpid::types::Variant>& inputs, std::map<std::string, qpid::types::Variant>& unused)
{
original = inputs;
for (qpid::types::Variant::Map::const_iterator i = inputs.begin(); i != inputs.end(); ++i) {
Aliases::const_iterator a = aliases.find(i->first);
if (!handle((a != aliases.end() ? a->second : i->first), i->second)) unused.insert(*i);
}
}
void QueueSettings::populate(const qpid::framing::FieldTable& inputs, qpid::framing::FieldTable& unused)
{
qpid::types::Variant::Map o;
qpid::amqp_0_10::translate(inputs, original);
populate(original, o);
qpid::amqp_0_10::translate(o, unused);
}
std::map<std::string, qpid::types::Variant> QueueSettings::asMap() const
{
return original;
}
QueueSettings::Aliases::Aliases()
{
insert(value_type("x-qpid-priorities", "qpid.priorities"));
insert(value_type("x-qpid-fairshare", "qpid.fairshare"));
insert(value_type("x-qpid-minimum-alert-repeat-gap", "qpid.alert_repeat_gap"));
insert(value_type("x-qpid-maximum-message-count", "qpid.alert_count"));
insert(value_type("x-qpid-maximum-message-size", "qpid.alert_size"));
}
std::string QueueSettings::getLimitPolicy() const
{
if (dropMessagesAtLimit) return POLICY_TYPE_RING;
else if (selfDestructAtLimit) return POLICY_TYPE_SELF_DESTRUCT;
else return POLICY_TYPE_REJECT;
}
}} // namespace qpid::broker