blob: bb4df8890ad419dd9132728a69c638f1fe7b9f2c [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include "UpdateClient.h"
#include "Cluster.h"
#include "ClusterMap.h"
#include "Connection.h"
#include "Decoder.h"
#include "ExpiryPolicy.h"
#include "qpid/client/SessionBase_0_10Access.h"
#include "qpid/client/ConnectionAccess.h"
#include "qpid/client/SessionImpl.h"
#include "qpid/client/ConnectionImpl.h"
#include "qpid/client/Future.h"
#include "qpid/broker/Broker.h"
#include "qpid/broker/Queue.h"
#include "qpid/broker/QueueRegistry.h"
#include "qpid/broker/Message.h"
#include "qpid/broker/Exchange.h"
#include "qpid/broker/ExchangeRegistry.h"
#include "qpid/broker/SessionHandler.h"
#include "qpid/broker/SessionState.h"
#include "qpid/broker/TxOpVisitor.h"
#include "qpid/broker/DtxAck.h"
#include "qpid/broker/TxAccept.h"
#include "qpid/broker/TxPublish.h"
#include "qpid/broker/RecoveredDequeue.h"
#include "qpid/broker/RecoveredEnqueue.h"
#include "qpid/framing/MessageTransferBody.h"
#include "qpid/framing/ClusterConnectionMembershipBody.h"
#include "qpid/framing/ClusterConnectionShadowReadyBody.h"
#include "qpid/framing/ClusterConnectionSessionStateBody.h"
#include "qpid/framing/ClusterConnectionConsumerStateBody.h"
#include "qpid/framing/enum.h"
#include "qpid/framing/ProtocolVersion.h"
#include "qpid/framing/TypeCode.h"
#include "qpid/log/Statement.h"
#include "qpid/Url.h"
#include <boost/bind.hpp>
#include <algorithm>
namespace qpid {
namespace cluster {
using broker::Broker;
using broker::Exchange;
using broker::Queue;
using broker::QueueBinding;
using broker::Message;
using namespace framing;
namespace arg=client::arg;
using client::SessionBase_0_10Access;
struct ClusterConnectionProxy : public AMQP_AllProxy::ClusterConnection {
ClusterConnectionProxy(client::Connection c) :
AMQP_AllProxy::ClusterConnection(*client::ConnectionAccess::getImpl(c)) {}
ClusterConnectionProxy(client::AsyncSession s) :
AMQP_AllProxy::ClusterConnection(SessionBase_0_10Access(s).get()->out) {}
};
// Create a connection with special version that marks it as a catch-up connection.
client::Connection catchUpConnection() {
client::Connection c;
client::ConnectionAccess::setVersion(c, ProtocolVersion(0x80 , 0x80 + 10));
return c;
}
// Send a control body directly to the session.
void send(client::AsyncSession& s, const AMQBody& body) {
client::SessionBase_0_10Access sb(s);
sb.get()->send(body);
}
// TODO aconway 2008-09-24: optimization: update connections/sessions in parallel.
UpdateClient::UpdateClient(const MemberId& updater, const MemberId& updatee, const Url& url,
broker::Broker& broker, const ClusterMap& m, ExpiryPolicy& expiry_,
const Cluster::ConnectionVector& cons, Decoder& decoder_,
const boost::function<void()>& ok,
const boost::function<void(const std::exception&)>& fail,
const client::ConnectionSettings& cs
)
: updaterId(updater), updateeId(updatee), updateeUrl(url), updaterBroker(broker), map(m),
expiry(expiry_), connections(cons), decoder(decoder_),
connection(catchUpConnection()), shadowConnection(catchUpConnection()),
done(ok), failed(fail), connectionSettings(cs)
{}
UpdateClient::~UpdateClient() {}
// Reserved exchange/queue name for catch-up, avoid clashes with user queues/exchanges.
const std::string UpdateClient::UPDATE("qpid.cluster-update");
void UpdateClient::run() {
try {
connection.open(updateeUrl, connectionSettings);
session = connection.newSession(UPDATE);
update();
done();
} catch (const std::exception& e) {
failed(e);
}
delete this;
}
void UpdateClient::update() {
QPID_LOG(debug, updaterId << " updating state to " << updateeId << " at " << updateeUrl);
Broker& b = updaterBroker;
b.getExchanges().eachExchange(boost::bind(&UpdateClient::updateExchange, this, _1));
b.getQueues().eachQueue(boost::bind(&UpdateClient::updateQueue, this, _1));
// Update queue is used to transfer acquired messages that are no longer on their original queue.
session.queueDeclare(arg::queue=UPDATE, arg::autoDelete=true);
session.sync();
std::for_each(connections.begin(), connections.end(), boost::bind(&UpdateClient::updateConnection, this, _1));
session.queueDelete(arg::queue=UPDATE);
session.close();
ClusterConnectionProxy(session).expiryId(expiry.getId());
ClusterConnectionMembershipBody membership;
map.toMethodBody(membership);
AMQFrame frame(membership);
client::ConnectionAccess::getImpl(connection)->handle(frame);
connection.close();
QPID_LOG(debug, updaterId << " updated state to " << updateeId << " at " << updateeUrl);
}
namespace {
template <class T> std::string encode(const T& t) {
std::string encoded;
encoded.resize(t.encodedSize());
framing::Buffer buf(const_cast<char*>(encoded.data()), encoded.size());
t.encode(buf);
return encoded;
}
} // namespace
void UpdateClient::updateExchange(const boost::shared_ptr<Exchange>& ex) {
QPID_LOG(debug, updaterId << " updating exchange " << ex->getName());
ClusterConnectionProxy(session).exchange(encode(*ex));
}
/** Bind a queue to the update exchange and update messges to it
* setting the message possition as needed.
*/
class MessageUpdater {
std::string queue;
bool haveLastPos;
framing::SequenceNumber lastPos;
client::AsyncSession session;
ExpiryPolicy& expiry;
public:
MessageUpdater(const string& q, const client::AsyncSession s, ExpiryPolicy& expiry_) : queue(q), haveLastPos(false), session(s), expiry(expiry_) {
session.exchangeBind(queue, UpdateClient::UPDATE);
}
~MessageUpdater() {
try {
session.exchangeUnbind(queue, UpdateClient::UPDATE);
}
catch (const std::exception& e) {
// Don't throw in a destructor.
QPID_LOG(error, "Unbinding update queue " << queue << ": " << e.what());
}
}
void updateQueuedMessage(const broker::QueuedMessage& message) {
// Send the queue position if necessary.
if (!haveLastPos || message.position - lastPos != 1) {
ClusterConnectionProxy(session).queuePosition(queue, message.position.getValue()-1);
haveLastPos = true;
}
lastPos = message.position;
// Send the expiry ID if necessary.
if (message.payload->getProperties<DeliveryProperties>()->getTtl()) {
boost::optional<uint64_t> expiryId = expiry.getId(*message.payload);
if (!expiryId) return; // Message already expired, don't replicate.
ClusterConnectionProxy(session).expiryId(*expiryId);
}
SessionBase_0_10Access sb(session);
framing::MessageTransferBody transfer(
framing::ProtocolVersion(), UpdateClient::UPDATE, message::ACCEPT_MODE_NONE, message::ACQUIRE_MODE_PRE_ACQUIRED);
sb.get()->send(transfer, message.payload->getFrames());
if (message.payload->isContentReleased()){
uint16_t maxFrameSize = sb.get()->getConnection()->getNegotiatedSettings().maxFrameSize;
uint16_t maxContentSize = maxFrameSize - AMQFrame::frameOverhead();
bool morecontent = true;
for (uint64_t offset = 0; morecontent; offset += maxContentSize)
{
AMQFrame frame((AMQContentBody()));
morecontent = message.payload->getContentFrame(*(message.queue), frame, maxContentSize, offset);
sb.get()->sendRawFrame(frame);
}
}
}
void updateMessage(const boost::intrusive_ptr<broker::Message>& message) {
updateQueuedMessage(broker::QueuedMessage(0, message, haveLastPos? lastPos.getValue()+1 : 1));
}
};
void UpdateClient::updateQueue(const boost::shared_ptr<Queue>& q) {
QPID_LOG(debug, updaterId << " updating queue " << q->getName());
ClusterConnectionProxy proxy(session);
proxy.queue(encode(*q));
MessageUpdater updater(q->getName(), session, expiry);
q->eachMessage(boost::bind(&MessageUpdater::updateQueuedMessage, &updater, _1));
q->eachBinding(boost::bind(&UpdateClient::updateBinding, this, q->getName(), _1));
}
void UpdateClient::updateBinding(const std::string& queue, const QueueBinding& binding) {
session.exchangeBind(queue, binding.exchange, binding.key, binding.args);
}
void UpdateClient::updateConnection(const boost::intrusive_ptr<Connection>& updateConnection) {
QPID_LOG(debug, updaterId << " updating connection " << *updateConnection);
shadowConnection = catchUpConnection();
broker::Connection& bc = updateConnection->getBrokerConnection();
connectionSettings.maxFrameSize = bc.getFrameMax();
shadowConnection.open(updateeUrl, connectionSettings);
bc.eachSessionHandler(boost::bind(&UpdateClient::updateSession, this, _1));
// Safe to use decoder here because we are stalled for update.
std::pair<const char*, size_t> fragment = decoder.get(updateConnection->getId()).getFragment();
ClusterConnectionProxy(shadowConnection).shadowReady(
updateConnection->getId().getMember(),
updateConnection->getId().getNumber(),
bc.getUserId(),
string(fragment.first, fragment.second)
);
shadowConnection.close();
QPID_LOG(debug, updaterId << " updated connection " << *updateConnection);
}
void UpdateClient::updateSession(broker::SessionHandler& sh) {
QPID_LOG(debug, updaterId << " updating session " << &sh.getConnection() << "[" << sh.getChannel() << "] = "
<< sh.getSession()->getId());
broker::SessionState* ss = sh.getSession();
if (!ss) return; // no session.
// Create a client session to update session state.
boost::shared_ptr<client::ConnectionImpl> cimpl = client::ConnectionAccess::getImpl(shadowConnection);
boost::shared_ptr<client::SessionImpl> simpl = cimpl->newSession(ss->getId().getName(), ss->getTimeout(), sh.getChannel());
client::SessionBase_0_10Access(shadowSession).set(simpl);
AMQP_AllProxy::ClusterConnection proxy(simpl->out);
// Re-create session state on remote connection.
// Update consumers. For reasons unknown, boost::bind does not work here with boost 1.33.
QPID_LOG(debug, updaterId << " updating consumers.");
ss->getSemanticState().eachConsumer(std::bind1st(std::mem_fun(&UpdateClient::updateConsumer),this));
QPID_LOG(debug, updaterId << " updating unacknowledged messages.");
broker::DeliveryRecords& drs = ss->getSemanticState().getUnacked();
std::for_each(drs.begin(), drs.end(), boost::bind(&UpdateClient::updateUnacked, this, _1));
updateTxState(ss->getSemanticState()); // Tx transaction state.
// Adjust for command counter for message in progress, will be sent after state update.
boost::intrusive_ptr<Message> inProgress = ss->getMessageInProgress();
SequenceNumber received = ss->receiverGetReceived().command;
if (inProgress)
--received;
// Reset command-sequence state.
proxy.sessionState(
ss->senderGetReplayPoint().command,
ss->senderGetCommandPoint().command,
ss->senderGetIncomplete(),
std::max(received, ss->receiverGetExpected().command),
received,
ss->receiverGetUnknownComplete(),
ss->receiverGetIncomplete()
);
// Send frames for partial message in progress.
if (inProgress) {
inProgress->getFrames().map(simpl->out);
}
QPID_LOG(debug, updaterId << " updated session " << sh.getSession()->getId());
}
void UpdateClient::updateConsumer(const broker::SemanticState::ConsumerImpl* ci) {
QPID_LOG(debug, updaterId << " updating consumer " << ci->getName() << " on " << shadowSession.getId());
using namespace message;
shadowSession.messageSubscribe(
arg::queue = ci->getQueue()->getName(),
arg::destination = ci->getName(),
arg::acceptMode = ci->isAckExpected() ? ACCEPT_MODE_EXPLICIT : ACCEPT_MODE_NONE,
arg::acquireMode = ci->isAcquire() ? ACQUIRE_MODE_PRE_ACQUIRED : ACQUIRE_MODE_NOT_ACQUIRED,
arg::exclusive = ci->isExclusive(),
arg::resumeId = ci->getResumeId(),
arg::resumeTtl = ci->getResumeTtl(),
arg::arguments = ci->getArguments()
);
shadowSession.messageSetFlowMode(ci->getName(), ci->isWindowing() ? FLOW_MODE_WINDOW : FLOW_MODE_CREDIT);
shadowSession.messageFlow(ci->getName(), CREDIT_UNIT_MESSAGE, ci->getMsgCredit());
shadowSession.messageFlow(ci->getName(), CREDIT_UNIT_BYTE, ci->getByteCredit());
ClusterConnectionConsumerStateBody state(
ProtocolVersion(),
ci->getName(),
ci->isBlocked(),
ci->isNotifyEnabled()
);
client::SessionBase_0_10Access(shadowSession).get()->send(state);
QPID_LOG(debug, updaterId << " updated consumer " << ci->getName() << " on " << shadowSession.getId());
}
void UpdateClient::updateUnacked(const broker::DeliveryRecord& dr) {
if (!dr.isEnded() && dr.isAcquired() && dr.getMessage().payload) {
// If the message is acquired then it is no longer on the
// updatees queue, put it on the update queue for updatee to pick up.
//
MessageUpdater(UPDATE, shadowSession, expiry).updateQueuedMessage(dr.getMessage());
}
ClusterConnectionProxy(shadowSession).deliveryRecord(
dr.getQueue()->getName(),
dr.getMessage().position,
dr.getTag(),
dr.getId(),
dr.isAcquired(),
dr.isAccepted(),
dr.isCancelled(),
dr.isComplete(),
dr.isEnded(),
dr.isWindowing(),
dr.getCredit()
);
}
class TxOpUpdater : public broker::TxOpConstVisitor, public MessageUpdater {
public:
TxOpUpdater(UpdateClient& dc, client::AsyncSession s, ExpiryPolicy& expiry)
: MessageUpdater(UpdateClient::UPDATE, s, expiry), parent(dc), session(s), proxy(s) {}
void operator()(const broker::DtxAck& ) {
throw InternalErrorException("DTX transactions not currently supported by cluster.");
}
void operator()(const broker::RecoveredDequeue& rdeq) {
updateMessage(rdeq.getMessage());
proxy.txEnqueue(rdeq.getQueue()->getName());
}
void operator()(const broker::RecoveredEnqueue& renq) {
updateMessage(renq.getMessage());
proxy.txEnqueue(renq.getQueue()->getName());
}
void operator()(const broker::TxAccept& txAccept) {
proxy.txAccept(txAccept.getAcked());
}
void operator()(const broker::TxPublish& txPub) {
updateMessage(txPub.getMessage());
typedef std::list<Queue::shared_ptr> QueueList;
const QueueList& qlist = txPub.getQueues();
Array qarray(TYPE_CODE_STR8);
for (QueueList::const_iterator i = qlist.begin(); i != qlist.end(); ++i)
qarray.push_back(Array::ValuePtr(new Str8Value((*i)->getName())));
proxy.txPublish(qarray, txPub.delivered);
}
private:
UpdateClient& parent;
client::AsyncSession session;
ClusterConnectionProxy proxy;
};
void UpdateClient::updateTxState(broker::SemanticState& s) {
QPID_LOG(debug, updaterId << " updating TX transaction state.");
ClusterConnectionProxy proxy(shadowSession);
proxy.accumulatedAck(s.getAccumulatedAck());
broker::TxBuffer::shared_ptr txBuffer = s.getTxBuffer();
if (txBuffer) {
proxy.txStart();
TxOpUpdater updater(*this, shadowSession, expiry);
txBuffer->accept(updater);
proxy.txEnd();
}
}
}} // namespace qpid::cluster