blob: 1432cc8fcab1c942bf7b24f0cfe4fb884a779059 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include <stdlib.h>
#include <string>
#include <windows.h>
#include <qpid/broker/RecoverableQueue.h>
#include <qpid/log/Statement.h>
#include <qpid/store/MessageStorePlugin.h>
#include <qpid/store/StorageProvider.h>
#include "AmqpTransaction.h"
#include "BlobAdapter.h"
#include "BlobRecordset.h"
#include "BindingRecordset.h"
#include "MessageMapRecordset.h"
#include "MessageRecordset.h"
#include "TplRecordset.h"
#include "DatabaseConnection.h"
#include "Exception.h"
#include "State.h"
#include "VariantHelper.h"
// Bring in ADO 2.8 (yes, I know it says "15", but that's it...)
#import "C:\Program Files\Common Files\System\ado\msado15.dll" \
no_namespace rename("EOF", "EndOfFile")
#include <comdef.h>
namespace {
inline void TESTHR(HRESULT x) {if FAILED(x) _com_issue_error(x);};
// Table names
const std::string TblBinding("tblBinding");
const std::string TblConfig("tblConfig");
const std::string TblExchange("tblExchange");
const std::string TblMessage("tblMessage");
const std::string TblMessageMap("tblMessageMap");
const std::string TblQueue("tblQueue");
const std::string TblTpl("tblTPL");
}
namespace qpid {
namespace store {
namespace ms_sql {
/**
* @class MSSqlProvider
*
* Implements a qpid::store::StorageProvider that uses Microsoft SQL Server as
* the backend data store for Qpid.
*/
class MSSqlProvider : public qpid::store::StorageProvider
{
protected:
void finalizeMe();
void dump();
public:
MSSqlProvider();
~MSSqlProvider();
virtual qpid::Options* getOptions() { return &options; }
virtual void earlyInitialize (Plugin::Target& target);
virtual void initialize(Plugin::Target& target);
/**
* Receive notification that this provider is the one that will actively
* handle provider storage for the target. If the provider is to be used,
* this method will be called after earlyInitialize() and before any
* recovery operations (recovery, in turn, precedes call to initialize()).
*/
virtual void activate(MessageStorePlugin &store);
/**
* @name Methods inherited from qpid::broker::MessageStore
*/
/**
* Record the existence of a durable queue
*/
virtual void create(PersistableQueue& queue,
const qpid::framing::FieldTable& args);
/**
* Destroy a durable queue
*/
virtual void destroy(PersistableQueue& queue);
/**
* Record the existence of a durable exchange
*/
virtual void create(const PersistableExchange& exchange,
const qpid::framing::FieldTable& args);
/**
* Destroy a durable exchange
*/
virtual void destroy(const PersistableExchange& exchange);
/**
* Record a binding
*/
virtual void bind(const PersistableExchange& exchange,
const PersistableQueue& queue,
const std::string& key,
const qpid::framing::FieldTable& args);
/**
* Forget a binding
*/
virtual void unbind(const PersistableExchange& exchange,
const PersistableQueue& queue,
const std::string& key,
const qpid::framing::FieldTable& args);
/**
* Record generic durable configuration
*/
virtual void create(const PersistableConfig& config);
/**
* Destroy generic durable configuration
*/
virtual void destroy(const PersistableConfig& config);
/**
* Stores a messages before it has been enqueued
* (enqueueing automatically stores the message so this is
* only required if storage is required prior to that
* point). If the message has not yet been stored it will
* store the headers as well as any content passed in. A
* persistence id will be set on the message which can be
* used to load the content or to append to it.
*/
virtual void stage(const boost::intrusive_ptr<PersistableMessage>& msg);
/**
* Destroys a previously staged message. This only needs
* to be called if the message is never enqueued. (Once
* enqueued, deletion will be automatic when the message
* is dequeued from all queues it was enqueued onto).
*/
virtual void destroy(PersistableMessage& msg);
/**
* Appends content to a previously staged message
*/
virtual void appendContent(const boost::intrusive_ptr<const PersistableMessage>& msg,
const std::string& data);
/**
* Loads (a section) of content data for the specified
* message (previously stored through a call to stage or
* enqueue) into data. The offset refers to the content
* only (i.e. an offset of 0 implies that the start of the
* content should be loaded, not the headers or related
* meta-data).
*/
virtual void loadContent(const qpid::broker::PersistableQueue& queue,
const boost::intrusive_ptr<const PersistableMessage>& msg,
std::string& data,
uint64_t offset,
uint32_t length);
/**
* Enqueues a message, storing the message if it has not
* been previously stored and recording that the given
* message is on the given queue.
*
* Note: that this is async so the return of the function does
* not mean the opperation is complete.
*
* @param msg the message to enqueue
* @param queue the name of the queue onto which it is to be enqueued
* @param xid (a pointer to) an identifier of the
* distributed transaction in which the operation takes
* place or null for 'local' transactions
*/
virtual void enqueue(qpid::broker::TransactionContext* ctxt,
const boost::intrusive_ptr<PersistableMessage>& msg,
const PersistableQueue& queue);
/**
* Dequeues a message, recording that the given message is
* no longer on the given queue and deleting the message
* if it is no longer on any other queue.
*
* Note: that this is async so the return of the function does
* not mean the opperation is complete.
*
* @param msg the message to dequeue
* @param queue the name of the queue from which it is to be dequeued
* @param xid (a pointer to) an identifier of the
* distributed transaction in which the operation takes
* place or null for 'local' transactions
*/
virtual void dequeue(qpid::broker::TransactionContext* ctxt,
const boost::intrusive_ptr<PersistableMessage>& msg,
const PersistableQueue& queue);
/**
* Flushes all async messages to disk for the specified queue
*
* Note: this is a no-op for this provider.
*
* @param queue the name of the queue from which it is to be dequeued
*/
virtual void flush(const PersistableQueue& queue) {};
/**
* Returns the number of outstanding AIO's for a given queue
*
* If 0, than all the enqueue / dequeues have been stored
* to disk
*
* @param queue the name of the queue to check for outstanding AIO
*/
virtual uint32_t outstandingQueueAIO(const PersistableQueue& queue)
{return 0;}
//@}
/**
* @name Methods inherited from qpid::broker::TransactionalStore
*/
//@{
virtual std::auto_ptr<qpid::broker::TransactionContext> begin();
virtual std::auto_ptr<qpid::broker::TPCTransactionContext> begin(const std::string& xid);
virtual void prepare(qpid::broker::TPCTransactionContext& txn);
virtual void commit(qpid::broker::TransactionContext& txn);
virtual void abort(qpid::broker::TransactionContext& txn);
virtual void collectPreparedXids(std::set<std::string>& xids);
//@}
virtual void recoverConfigs(qpid::broker::RecoveryManager& recoverer);
virtual void recoverExchanges(qpid::broker::RecoveryManager& recoverer,
ExchangeMap& exchangeMap);
virtual void recoverQueues(qpid::broker::RecoveryManager& recoverer,
QueueMap& queueMap);
virtual void recoverBindings(qpid::broker::RecoveryManager& recoverer,
const ExchangeMap& exchangeMap,
const QueueMap& queueMap);
virtual void recoverMessages(qpid::broker::RecoveryManager& recoverer,
MessageMap& messageMap,
MessageQueueMap& messageQueueMap);
virtual void recoverTransactions(qpid::broker::RecoveryManager& recoverer,
PreparedTransactionMap& dtxMap);
private:
struct ProviderOptions : public qpid::Options
{
std::string connectString;
std::string catalogName;
ProviderOptions(const std::string &name)
: qpid::Options(name),
catalogName("QpidStore")
{
const enum { NAMELEN = MAX_COMPUTERNAME_LENGTH + 1 };
TCHAR myName[NAMELEN];
DWORD myNameLen = NAMELEN;
GetComputerName(myName, &myNameLen);
connectString = "Data Source=";
connectString += myName;
connectString += "\\SQLEXPRESS;Integrated Security=SSPI";
addOptions()
("connect",
qpid::optValue(connectString, "STRING"),
"Connection string for the database to use. Will prepend "
"Provider=SQLOLEDB;")
("catalog",
qpid::optValue(catalogName, "DB NAME"),
"Catalog (database) name")
;
}
};
ProviderOptions options;
// Each thread has a separate connection to the database and also needs
// to manage its COM initialize/finalize individually. This is done by
// keeping a thread-specific State.
boost::thread_specific_ptr<State> dbState;
State *initState();
DatabaseConnection *initConnection(void);
void createDb(DatabaseConnection *db, const std::string &name);
};
static MSSqlProvider static_instance_registers_plugin;
void
MSSqlProvider::finalizeMe()
{
dbState.reset();
}
MSSqlProvider::MSSqlProvider()
: options("MS SQL Provider options")
{
}
MSSqlProvider::~MSSqlProvider()
{
}
void
MSSqlProvider::earlyInitialize(Plugin::Target &target)
{
MessageStorePlugin *store = dynamic_cast<MessageStorePlugin *>(&target);
if (store) {
// If the database init fails, report it and don't register; give
// the rest of the broker a chance to run.
//
// Don't try to initConnection() since that will fail if the
// database doesn't exist. Instead, try to open a connection without
// a database name, then search for the database. There's still a
// chance this provider won't be selected for the store too, so be
// be sure to close the database connection before return to avoid
// leaving a connection up that will not be used.
try {
initState(); // This initializes COM
std::auto_ptr<DatabaseConnection> db(new DatabaseConnection());
db->open(options.connectString, "");
_ConnectionPtr conn(*db);
_RecordsetPtr pCatalogs = NULL;
VariantHelper<std::string> catalogName(options.catalogName);
pCatalogs = conn->OpenSchema(adSchemaCatalogs, catalogName);
if (pCatalogs->EndOfFile) {
// Database doesn't exist; create it
QPID_LOG(notice,
"MSSQL: Creating database " + options.catalogName);
createDb(db.get(), options.catalogName);
}
else {
QPID_LOG(notice,
"MSSQL: Database located: " + options.catalogName);
}
if (pCatalogs) {
if (pCatalogs->State == adStateOpen)
pCatalogs->Close();
pCatalogs = 0;
}
db->close();
store->providerAvailable("MSSQL", this);
}
catch (qpid::Exception &e) {
QPID_LOG(error, e.what());
return;
}
store->addFinalizer(boost::bind(&MSSqlProvider::finalizeMe, this));
}
}
void
MSSqlProvider::initialize(Plugin::Target& target)
{
}
void
MSSqlProvider::activate(MessageStorePlugin &store)
{
QPID_LOG(info, "MS SQL Provider is up");
}
void
MSSqlProvider::create(PersistableQueue& queue,
const qpid::framing::FieldTable& /*args needed for jrnl*/)
{
DatabaseConnection *db = initConnection();
BlobRecordset rsQueues;
try {
db->beginTransaction();
rsQueues.open(db, TblQueue);
rsQueues.add(queue);
db->commitTransaction();
}
catch(_com_error &e) {
std::string errs = db->getErrors();
db->rollbackTransaction();
throw ADOException("Error creating queue " + queue.getName(), e, errs);
}
}
/**
* Destroy a durable queue
*/
void
MSSqlProvider::destroy(PersistableQueue& queue)
{
DatabaseConnection *db = initConnection();
BlobRecordset rsQueues;
BindingRecordset rsBindings;
MessageRecordset rsMessages;
MessageMapRecordset rsMessageMaps;
try {
db->beginTransaction();
rsQueues.open(db, TblQueue);
rsBindings.open(db, TblBinding);
rsMessages.open(db, TblMessage);
rsMessageMaps.open(db, TblMessageMap);
// Remove bindings first; the queue IDs can't be ripped out from
// under the references in the bindings table. Then remove the
// message->queue entries for the queue, also because the queue can't
// be deleted while there are references to it. If there are messages
// orphaned by removing the queue references, they're deleted by
// a trigger on the tblMessageMap table. Lastly, the queue record
// can be removed.
rsBindings.removeForQueue(queue.getPersistenceId());
rsMessageMaps.removeForQueue(queue.getPersistenceId());
rsQueues.remove(queue);
db->commitTransaction();
}
catch(_com_error &e) {
std::string errs = db->getErrors();
db->rollbackTransaction();
throw ADOException("Error deleting queue " + queue.getName(), e, errs);
}
}
/**
* Record the existence of a durable exchange
*/
void
MSSqlProvider::create(const PersistableExchange& exchange,
const qpid::framing::FieldTable& args)
{
DatabaseConnection *db = initConnection();
BlobRecordset rsExchanges;
try {
db->beginTransaction();
rsExchanges.open(db, TblExchange);
rsExchanges.add(exchange);
db->commitTransaction();
}
catch(_com_error &e) {
std::string errs = db->getErrors();
db->rollbackTransaction();
throw ADOException("Error creating exchange " + exchange.getName(),
e,
errs);
}
}
/**
* Destroy a durable exchange
*/
void
MSSqlProvider::destroy(const PersistableExchange& exchange)
{
DatabaseConnection *db = initConnection();
BlobRecordset rsExchanges;
BindingRecordset rsBindings;
try {
db->beginTransaction();
rsExchanges.open(db, TblExchange);
rsBindings.open(db, TblBinding);
// Remove bindings first; the exchange IDs can't be ripped out from
// under the references in the bindings table.
rsBindings.removeForExchange(exchange.getPersistenceId());
rsExchanges.remove(exchange);
db->commitTransaction();
}
catch(_com_error &e) {
std::string errs = db->getErrors();
db->rollbackTransaction();
throw ADOException("Error deleting exchange " + exchange.getName(),
e,
errs);
}
}
/**
* Record a binding
*/
void
MSSqlProvider::bind(const PersistableExchange& exchange,
const PersistableQueue& queue,
const std::string& key,
const qpid::framing::FieldTable& args)
{
DatabaseConnection *db = initConnection();
BindingRecordset rsBindings;
try {
db->beginTransaction();
rsBindings.open(db, TblBinding);
rsBindings.add(exchange.getPersistenceId(),
queue.getPersistenceId(),
key,
args);
db->commitTransaction();
}
catch(_com_error &e) {
std::string errs = db->getErrors();
db->rollbackTransaction();
throw ADOException("Error binding exchange " + exchange.getName() +
" to queue " + queue.getName(),
e,
errs);
}
}
/**
* Forget a binding
*/
void
MSSqlProvider::unbind(const PersistableExchange& exchange,
const PersistableQueue& queue,
const std::string& key,
const qpid::framing::FieldTable& args)
{
DatabaseConnection *db = initConnection();
BindingRecordset rsBindings;
try {
db->beginTransaction();
rsBindings.open(db, TblBinding);
rsBindings.remove(exchange.getPersistenceId(),
queue.getPersistenceId(),
key,
args);
db->commitTransaction();
}
catch(_com_error &e) {
std::string errs = db->getErrors();
db->rollbackTransaction();
throw ADOException("Error unbinding exchange " + exchange.getName() +
" from queue " + queue.getName(),
e,
errs);
}
}
/**
* Record generic durable configuration
*/
void
MSSqlProvider::create(const PersistableConfig& config)
{
DatabaseConnection *db = initConnection();
BlobRecordset rsConfigs;
try {
db->beginTransaction();
rsConfigs.open(db, TblConfig);
rsConfigs.add(config);
db->commitTransaction();
}
catch(_com_error &e) {
std::string errs = db->getErrors();
db->rollbackTransaction();
throw ADOException("Error creating config " + config.getName(), e, errs);
}
}
/**
* Destroy generic durable configuration
*/
void
MSSqlProvider::destroy(const PersistableConfig& config)
{
DatabaseConnection *db = initConnection();
BlobRecordset rsConfigs;
try {
db->beginTransaction();
rsConfigs.open(db, TblConfig);
rsConfigs.remove(config);
db->commitTransaction();
}
catch(_com_error &e) {
std::string errs = db->getErrors();
db->rollbackTransaction();
throw ADOException("Error deleting config " + config.getName(), e, errs);
}
}
/**
* Stores a messages before it has been enqueued
* (enqueueing automatically stores the message so this is
* only required if storage is required prior to that
* point). If the message has not yet been stored it will
* store the headers as well as any content passed in. A
* persistence id will be set on the message which can be
* used to load the content or to append to it.
*/
void
MSSqlProvider::stage(const boost::intrusive_ptr<PersistableMessage>& msg)
{
DatabaseConnection *db = initConnection();
MessageRecordset rsMessages;
try {
db->beginTransaction();
rsMessages.open(db, TblMessage);
rsMessages.add(msg);
db->commitTransaction();
}
catch(_com_error &e) {
std::string errs = db->getErrors();
db->rollbackTransaction();
throw ADOException("Error staging message", e, errs);
}
}
/**
* Destroys a previously staged message. This only needs
* to be called if the message is never enqueued. (Once
* enqueued, deletion will be automatic when the message
* is dequeued from all queues it was enqueued onto).
*/
void
MSSqlProvider::destroy(PersistableMessage& msg)
{
DatabaseConnection *db = initConnection();
BlobRecordset rsMessages;
try {
db->beginTransaction();
rsMessages.open(db, TblMessage);
rsMessages.remove(msg);
db->commitTransaction();
}
catch(_com_error &e) {
std::string errs = db->getErrors();
db->rollbackTransaction();
throw ADOException("Error deleting message", e, errs);
}
}
/**
* Appends content to a previously staged message
*/
void
MSSqlProvider::appendContent(const boost::intrusive_ptr<const PersistableMessage>& msg,
const std::string& data)
{
DatabaseConnection *db = initConnection();
MessageRecordset rsMessages;
try {
db->beginTransaction();
rsMessages.open(db, TblMessage);
rsMessages.append(msg, data);
db->commitTransaction();
}
catch(_com_error &e) {
std::string errs = db->getErrors();
db->rollbackTransaction();
throw ADOException("Error appending to message", e, errs);
}
}
/**
* Loads (a section) of content data for the specified
* message (previously stored through a call to stage or
* enqueue) into data. The offset refers to the content
* only (i.e. an offset of 0 implies that the start of the
* content should be loaded, not the headers or related
* meta-data).
*/
void
MSSqlProvider::loadContent(const qpid::broker::PersistableQueue& /*queue*/,
const boost::intrusive_ptr<const PersistableMessage>& msg,
std::string& data,
uint64_t offset,
uint32_t length)
{
// SQL store keeps all messages in one table, so we don't need the
// queue reference.
DatabaseConnection *db = initConnection();
MessageRecordset rsMessages;
try {
rsMessages.open(db, TblMessage);
rsMessages.loadContent(msg, data, offset, length);
}
catch(_com_error &e) {
std::string errs = db->getErrors();
throw ADOException("Error loading message content", e, errs);
}
}
/**
* Enqueues a message, storing the message if it has not
* been previously stored and recording that the given
* message is on the given queue.
*
* @param ctxt The transaction context under which this enqueue happens.
* @param msg The message to enqueue
* @param queue the name of the queue onto which it is to be enqueued
*/
void
MSSqlProvider::enqueue(qpid::broker::TransactionContext* ctxt,
const boost::intrusive_ptr<PersistableMessage>& msg,
const PersistableQueue& queue)
{
// If this enqueue is in the context of a transaction, use the specified
// transaction to nest a new transaction for this operation. However, if
// this is not in the context of a transaction, then just use the thread's
// DatabaseConnection with a ADO transaction.
DatabaseConnection *db = 0;
std::string xid;
AmqpTransaction *atxn = dynamic_cast<AmqpTransaction*> (ctxt);
if (atxn == 0) {
db = initConnection();
db->beginTransaction();
}
else {
(void)initState(); // Ensure this thread is initialized
// It's a transactional enqueue; if it's TPC, grab the xid.
AmqpTPCTransaction *tpcTxn = dynamic_cast<AmqpTPCTransaction*> (ctxt);
if (tpcTxn)
xid = tpcTxn->getXid();
db = atxn->dbConn();
try {
atxn->sqlBegin();
}
catch(_com_error &e) {
throw ADOException("Error queuing message", e, db->getErrors());
}
}
MessageRecordset rsMessages;
MessageMapRecordset rsMap;
try {
if (msg->getPersistenceId() == 0) { // Message itself not yet saved
rsMessages.open(db, TblMessage);
rsMessages.add(msg);
}
rsMap.open(db, TblMessageMap);
rsMap.add(msg->getPersistenceId(), queue.getPersistenceId(), xid);
if (atxn)
atxn->sqlCommit();
else
db->commitTransaction();
}
catch(_com_error &e) {
std::string errs = db->getErrors();
if (atxn)
atxn->sqlAbort();
else
db->rollbackTransaction();
throw ADOException("Error queuing message", e, errs);
}
msg->enqueueComplete();
}
/**
* Dequeues a message, recording that the given message is
* no longer on the given queue and deleting the message
* if it is no longer on any other queue.
*
* @param ctxt The transaction context under which this dequeue happens.
* @param msg The message to dequeue
* @param queue The queue from which it is to be dequeued
*/
void
MSSqlProvider::dequeue(qpid::broker::TransactionContext* ctxt,
const boost::intrusive_ptr<PersistableMessage>& msg,
const PersistableQueue& queue)
{
// If this dequeue is in the context of a transaction, use the specified
// transaction to nest a new transaction for this operation. However, if
// this is not in the context of a transaction, then just use the thread's
// DatabaseConnection with a ADO transaction.
DatabaseConnection *db = 0;
std::string xid;
AmqpTransaction *atxn = dynamic_cast<AmqpTransaction*> (ctxt);
if (atxn == 0) {
db = initConnection();
db->beginTransaction();
}
else {
(void)initState(); // Ensure this thread is initialized
// It's a transactional dequeue; if it's TPC, grab the xid.
AmqpTPCTransaction *tpcTxn = dynamic_cast<AmqpTPCTransaction*> (ctxt);
if (tpcTxn)
xid = tpcTxn->getXid();
db = atxn->dbConn();
try {
atxn->sqlBegin();
}
catch(_com_error &e) {
throw ADOException("Error queuing message", e, db->getErrors());
}
}
MessageMapRecordset rsMap;
try {
rsMap.open(db, TblMessageMap);
// TPC dequeues are just marked pending and will actually be removed
// when the transaction commits; Single-phase dequeues are removed
// now, relying on the SQL transaction to put it back if the
// transaction doesn't commit.
if (!xid.empty()) {
rsMap.pendingRemove(msg->getPersistenceId(),
queue.getPersistenceId(),
xid);
}
else {
rsMap.remove(msg->getPersistenceId(),
queue.getPersistenceId());
}
if (atxn)
atxn->sqlCommit();
else
db->commitTransaction();
}
catch(ms_sql::Exception&) {
if (atxn)
atxn->sqlAbort();
else
db->rollbackTransaction();
throw;
}
catch(_com_error &e) {
std::string errs = db->getErrors();
if (atxn)
atxn->sqlAbort();
else
db->rollbackTransaction();
throw ADOException("Error dequeuing message", e, errs);
}
msg->dequeueComplete();
}
std::auto_ptr<qpid::broker::TransactionContext>
MSSqlProvider::begin()
{
(void)initState(); // Ensure this thread is initialized
// Transactions are associated with the Connection, so this transaction
// context needs its own connection. At the time of writing, single-phase
// transactions are dealt with completely on one thread, so we really
// could just use the thread-specific DatabaseConnection for this.
// However, that would introduce an ugly, hidden coupling, so play
// it safe and handle this just like a TPC transaction, which actually
// can be prepared and committed/aborted from different threads,
// making it a bad idea to try using the thread-local DatabaseConnection.
boost::shared_ptr<DatabaseConnection> db(new DatabaseConnection);
db->open(options.connectString, options.catalogName);
std::auto_ptr<AmqpTransaction> tx(new AmqpTransaction(db));
tx->sqlBegin();
std::auto_ptr<qpid::broker::TransactionContext> tc(tx);
return tc;
}
std::auto_ptr<qpid::broker::TPCTransactionContext>
MSSqlProvider::begin(const std::string& xid)
{
(void)initState(); // Ensure this thread is initialized
boost::shared_ptr<DatabaseConnection> db(new DatabaseConnection);
db->open(options.connectString, options.catalogName);
std::auto_ptr<AmqpTPCTransaction> tx(new AmqpTPCTransaction(db, xid));
tx->sqlBegin();
TplRecordset rsTpl;
try {
tx->sqlBegin();
rsTpl.open(db.get(), TblTpl);
rsTpl.add(xid);
tx->sqlCommit();
}
catch(_com_error &e) {
std::string errs = db->getErrors();
tx->sqlAbort();
throw ADOException("Error adding TPL record", e, errs);
}
std::auto_ptr<qpid::broker::TPCTransactionContext> tc(tx);
return tc;
}
void
MSSqlProvider::prepare(qpid::broker::TPCTransactionContext& txn)
{
// Commit all the marked-up enqueue/dequeue ops and the TPL record.
// On commit/rollback the TPL will be removed and the TPL markups
// on the message map will be cleaned up as well.
(void)initState(); // Ensure this thread is initialized
AmqpTPCTransaction *atxn = dynamic_cast<AmqpTPCTransaction*> (&txn);
if (atxn == 0)
throw qpid::broker::InvalidTransactionContextException();
try {
atxn->sqlCommit();
}
catch(_com_error &e) {
throw ADOException("Error preparing", e, atxn->dbConn()->getErrors());
}
atxn->setPrepared();
}
void
MSSqlProvider::commit(qpid::broker::TransactionContext& txn)
{
(void)initState(); // Ensure this thread is initialized
/*
* One-phase transactions simply commit the outer SQL transaction
* that was begun on begin(). Two-phase transactions are different -
* the SQL transaction started on begin() was committed on prepare()
* so all the SQL records reflecting the enqueue/dequeue actions for
* the transaction are recorded but with xid markups on them to reflect
* that they are prepared but not committed. Now go back and remove
* the markups, deleting those marked for removal.
*/
AmqpTPCTransaction *p2txn = dynamic_cast<AmqpTPCTransaction*> (&txn);
if (p2txn == 0) {
AmqpTransaction *p1txn = dynamic_cast<AmqpTransaction*> (&txn);
if (p1txn == 0)
throw qpid::broker::InvalidTransactionContextException();
p1txn->sqlCommit();
return;
}
DatabaseConnection *db(p2txn->dbConn());
TplRecordset rsTpl;
MessageMapRecordset rsMessageMap;
try {
db->beginTransaction();
rsTpl.open(db, TblTpl);
rsMessageMap.open(db, TblMessageMap);
rsMessageMap.commitPrepared(p2txn->getXid());
rsTpl.remove(p2txn->getXid());
db->commitTransaction();
}
catch(_com_error &e) {
std::string errs = db->getErrors();
db->rollbackTransaction();
throw ADOException("Error committing transaction", e, errs);
}
}
void
MSSqlProvider::abort(qpid::broker::TransactionContext& txn)
{
(void)initState(); // Ensure this thread is initialized
/*
* One-phase and non-prepared two-phase transactions simply abort
* the outer SQL transaction that was begun on begin(). However, prepared
* two-phase transactions are different - the SQL transaction started
* on begin() was committed on prepare() so all the SQL records
* reflecting the enqueue/dequeue actions for the transaction are
* recorded but with xid markups on them to reflect that they are
* prepared but not committed. Now go back and remove the markups,
* deleting those marked for addition.
*/
AmqpTPCTransaction *p2txn = dynamic_cast<AmqpTPCTransaction*> (&txn);
if (p2txn == 0 || !p2txn->isPrepared()) {
AmqpTransaction *p1txn = dynamic_cast<AmqpTransaction*> (&txn);
if (p1txn == 0)
throw qpid::broker::InvalidTransactionContextException();
p1txn->sqlAbort();
return;
}
DatabaseConnection *db(p2txn->dbConn());
TplRecordset rsTpl;
MessageMapRecordset rsMessageMap;
try {
db->beginTransaction();
rsTpl.open(db, TblTpl);
rsMessageMap.open(db, TblMessageMap);
rsMessageMap.abortPrepared(p2txn->getXid());
rsTpl.remove(p2txn->getXid());
db->commitTransaction();
}
catch(_com_error &e) {
std::string errs = db->getErrors();
db->rollbackTransaction();
throw ADOException("Error committing transaction", e, errs);
}
(void)initState(); // Ensure this thread is initialized
AmqpTransaction *atxn = dynamic_cast<AmqpTransaction*> (&txn);
if (atxn == 0)
throw qpid::broker::InvalidTransactionContextException();
atxn->sqlAbort();
}
void
MSSqlProvider::collectPreparedXids(std::set<std::string>& xids)
{
DatabaseConnection *db = initConnection();
try {
TplRecordset rsTpl;
rsTpl.open(db, TblTpl);
rsTpl.recover(xids);
}
catch(_com_error &e) {
throw ADOException("Error reading TPL", e, db->getErrors());
}
}
// @TODO Much of this recovery code is way too similar... refactor to
// a recover template method on BlobRecordset.
void
MSSqlProvider::recoverConfigs(qpid::broker::RecoveryManager& recoverer)
{
DatabaseConnection *db = 0;
try {
db = initConnection();
BlobRecordset rsConfigs;
rsConfigs.open(db, TblConfig);
_RecordsetPtr p = (_RecordsetPtr)rsConfigs;
if (p->BOF && p->EndOfFile)
return; // Nothing to do
p->MoveFirst();
while (!p->EndOfFile) {
uint64_t id = p->Fields->Item["persistenceId"]->Value;
long blobSize = p->Fields->Item["fieldTableBlob"]->ActualSize;
BlobAdapter blob(blobSize);
blob = p->Fields->Item["fieldTableBlob"]->GetChunk(blobSize);
// Recreate the Config instance and reset its ID.
broker::RecoverableConfig::shared_ptr config =
recoverer.recoverConfig(blob);
config->setPersistenceId(id);
p->MoveNext();
}
}
catch(_com_error &e) {
throw ADOException("Error recovering configs",
e,
db ? db->getErrors() : "");
}
}
void
MSSqlProvider::recoverExchanges(qpid::broker::RecoveryManager& recoverer,
ExchangeMap& exchangeMap)
{
DatabaseConnection *db = 0;
try {
db = initConnection();
BlobRecordset rsExchanges;
rsExchanges.open(db, TblExchange);
_RecordsetPtr p = (_RecordsetPtr)rsExchanges;
if (p->BOF && p->EndOfFile)
return; // Nothing to do
p->MoveFirst();
while (!p->EndOfFile) {
uint64_t id = p->Fields->Item["persistenceId"]->Value;
long blobSize = p->Fields->Item["fieldTableBlob"]->ActualSize;
BlobAdapter blob(blobSize);
blob = p->Fields->Item["fieldTableBlob"]->GetChunk(blobSize);
// Recreate the Exchange instance, reset its ID, and remember the
// ones restored for matching up when recovering bindings.
broker::RecoverableExchange::shared_ptr exchange =
recoverer.recoverExchange(blob);
exchange->setPersistenceId(id);
exchangeMap[id] = exchange;
p->MoveNext();
}
}
catch(_com_error &e) {
throw ADOException("Error recovering exchanges",
e,
db ? db->getErrors() : "");
}
}
void
MSSqlProvider::recoverQueues(qpid::broker::RecoveryManager& recoverer,
QueueMap& queueMap)
{
DatabaseConnection *db = 0;
try {
db = initConnection();
BlobRecordset rsQueues;
rsQueues.open(db, TblQueue);
_RecordsetPtr p = (_RecordsetPtr)rsQueues;
if (p->BOF && p->EndOfFile)
return; // Nothing to do
p->MoveFirst();
while (!p->EndOfFile) {
uint64_t id = p->Fields->Item["persistenceId"]->Value;
long blobSize = p->Fields->Item["fieldTableBlob"]->ActualSize;
BlobAdapter blob(blobSize);
blob = p->Fields->Item["fieldTableBlob"]->GetChunk(blobSize);
// Recreate the Queue instance and reset its ID.
broker::RecoverableQueue::shared_ptr queue =
recoverer.recoverQueue(blob);
queue->setPersistenceId(id);
queueMap[id] = queue;
p->MoveNext();
}
}
catch(_com_error &e) {
throw ADOException("Error recovering queues",
e,
db ? db->getErrors() : "");
}
}
void
MSSqlProvider::recoverBindings(qpid::broker::RecoveryManager& recoverer,
const ExchangeMap& exchangeMap,
const QueueMap& queueMap)
{
DatabaseConnection *db = 0;
try {
db = initConnection();
BindingRecordset rsBindings;
rsBindings.open(db, TblBinding);
rsBindings.recover(recoverer, exchangeMap, queueMap);
}
catch(_com_error &e) {
throw ADOException("Error recovering bindings",
e,
db ? db->getErrors() : "");
}
}
void
MSSqlProvider::recoverMessages(qpid::broker::RecoveryManager& recoverer,
MessageMap& messageMap,
MessageQueueMap& messageQueueMap)
{
DatabaseConnection *db = 0;
try {
db = initConnection();
MessageRecordset rsMessages;
rsMessages.open(db, TblMessage);
rsMessages.recover(recoverer, messageMap);
MessageMapRecordset rsMessageMaps;
rsMessageMaps.open(db, TblMessageMap);
rsMessageMaps.recover(messageQueueMap);
}
catch(_com_error &e) {
throw ADOException("Error recovering messages",
e,
db ? db->getErrors() : "");
}
}
void
MSSqlProvider::recoverTransactions(qpid::broker::RecoveryManager& recoverer,
PreparedTransactionMap& dtxMap)
{
DatabaseConnection *db = initConnection();
std::set<std::string> xids;
try {
TplRecordset rsTpl;
rsTpl.open(db, TblTpl);
rsTpl.recover(xids);
}
catch(_com_error &e) {
throw ADOException("Error recovering TPL records", e, db->getErrors());
}
try {
// Rebuild the needed RecoverableTransactions.
for (std::set<std::string>::const_iterator iXid = xids.begin();
iXid != xids.end();
++iXid) {
boost::shared_ptr<DatabaseConnection> dbX(new DatabaseConnection);
dbX->open(options.connectString, options.catalogName);
std::auto_ptr<AmqpTPCTransaction> tx(new AmqpTPCTransaction(dbX,
*iXid));
tx->setPrepared();
std::auto_ptr<qpid::broker::TPCTransactionContext> tc(tx);
dtxMap[*iXid] = recoverer.recoverTransaction(*iXid, tc);
}
}
catch(_com_error &e) {
throw ADOException("Error recreating dtx connection", e);
}
}
////////////// Internal Methods
State *
MSSqlProvider::initState()
{
State *state = dbState.get(); // See if thread has initialized
if (!state) {
state = new State;
dbState.reset(state);
}
return state;
}
DatabaseConnection *
MSSqlProvider::initConnection(void)
{
State *state = initState();
if (state->dbConn != 0)
return state->dbConn; // And the DatabaseConnection is set up too
std::auto_ptr<DatabaseConnection> db(new DatabaseConnection);
db->open(options.connectString, options.catalogName);
state->dbConn = db.release();
return state->dbConn;
}
void
MSSqlProvider::createDb(DatabaseConnection *db, const std::string &name)
{
const std::string dbCmd = "CREATE DATABASE " + name;
const std::string useCmd = "USE " + name;
const std::string tableCmd = "CREATE TABLE ";
const std::string colSpecs =
" (persistenceId bigint PRIMARY KEY NOT NULL IDENTITY(1,1),"
" fieldTableBlob varbinary(MAX) NOT NULL)";
const std::string bindingSpecs =
" (exchangeId bigint REFERENCES tblExchange(persistenceId) NOT NULL,"
" queueId bigint REFERENCES tblQueue(persistenceId) NOT NULL,"
" routingKey varchar(255),"
" fieldTableBlob varbinary(MAX))";
const std::string messageMapSpecs =
" (messageId bigint REFERENCES tblMessage(persistenceId) NOT NULL,"
" queueId bigint REFERENCES tblQueue(persistenceId) NOT NULL,"
" prepareStatus tinyint CHECK (prepareStatus IS NULL OR "
" prepareStatus IN (1, 2)),"
" xid varbinary(512) REFERENCES tblTPL(xid)"
" CONSTRAINT CK_NoDups UNIQUE NONCLUSTERED (messageId, queueId) )";
const std::string tplSpecs = " (xid varbinary(512) PRIMARY KEY NOT NULL)";
// SET NOCOUNT ON added to prevent extra result sets from
// interfering with SELECT statements. (Added by SQL Management)
const std::string removeUnrefMsgsTrigger =
"CREATE TRIGGER dbo.RemoveUnreferencedMessages "
"ON tblMessageMap AFTER DELETE AS BEGIN "
"SET NOCOUNT ON; "
"DELETE FROM tblMessage "
"WHERE tblMessage.persistenceId IN "
" (SELECT messageId FROM deleted) AND"
" NOT EXISTS(SELECT * FROM tblMessageMap"
" WHERE tblMessageMap.messageId IN"
" (SELECT messageId FROM deleted)) "
"END";
_variant_t unused;
_bstr_t dbStr = dbCmd.c_str();
_ConnectionPtr conn(*db);
try {
conn->Execute(dbStr, &unused, adExecuteNoRecords);
_bstr_t useStr = useCmd.c_str();
conn->Execute(useStr, &unused, adExecuteNoRecords);
std::string makeTable = tableCmd + TblQueue + colSpecs;
_bstr_t makeTableStr = makeTable.c_str();
conn->Execute(makeTableStr, &unused, adExecuteNoRecords);
makeTable = tableCmd + TblExchange + colSpecs;
makeTableStr = makeTable.c_str();
conn->Execute(makeTableStr, &unused, adExecuteNoRecords);
makeTable = tableCmd + TblConfig + colSpecs;
makeTableStr = makeTable.c_str();
conn->Execute(makeTableStr, &unused, adExecuteNoRecords);
makeTable = tableCmd + TblMessage + colSpecs;
makeTableStr = makeTable.c_str();
conn->Execute(makeTableStr, &unused, adExecuteNoRecords);
makeTable = tableCmd + TblBinding + bindingSpecs;
makeTableStr = makeTable.c_str();
conn->Execute(makeTableStr, &unused, adExecuteNoRecords);
makeTable = tableCmd + TblTpl + tplSpecs;
makeTableStr = makeTable.c_str();
conn->Execute(makeTableStr, &unused, adExecuteNoRecords);
makeTable = tableCmd + TblMessageMap + messageMapSpecs;
makeTableStr = makeTable.c_str();
conn->Execute(makeTableStr, &unused, adExecuteNoRecords);
_bstr_t addTriggerStr = removeUnrefMsgsTrigger.c_str();
conn->Execute(addTriggerStr, &unused, adExecuteNoRecords);
}
catch(_com_error &e) {
throw ADOException("MSSQL can't create " + name, e, db->getErrors());
}
}
void
MSSqlProvider::dump()
{
// dump all db records to qpid_log
QPID_LOG(notice, "DB Dump: (not dumping anything)");
// rsQueues.dump();
}
}}} // namespace qpid::store::ms_sql