blob: ce9fa6101041b5af94b4f9c8fec023c3267b37d7 [file]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include <qpid/Exception.h>
#include <qpid/log/Statement.h>
#include <qpid/store/StorageProvider.h>
#include "MessageMapRecordset.h"
#include "BlobEncoder.h"
#include "DatabaseConnection.h"
#include "Exception.h"
#include "VariantHelper.h"
namespace {
inline void TESTHR(HRESULT x) {if FAILED(x) _com_issue_error(x);};
}
namespace qpid {
namespace store {
namespace ms_sql {
void
MessageMapRecordset::open(DatabaseConnection* conn, const std::string& table)
{
init(conn, table);
}
void
MessageMapRecordset::add(uint64_t messageId,
uint64_t queueId,
const std::string& xid)
{
std::ostringstream command;
command << "INSERT INTO " << tableName
<< " (messageId, queueId";
if (!xid.empty())
command << ", prepareStatus, xid";
command << ") VALUES (" << messageId << "," << queueId;
if (!xid.empty())
command << "," << PREPARE_ADD << ",?";
command << ")" << std::ends;
_CommandPtr cmd = NULL;
_ParameterPtr xidVal = NULL;
TESTHR(cmd.CreateInstance(__uuidof(Command)));
_ConnectionPtr p = *dbConn;
cmd->ActiveConnection = p;
cmd->CommandText = command.str().c_str();
cmd->CommandType = adCmdText;
if (!xid.empty()) {
TESTHR(xidVal.CreateInstance(__uuidof(Parameter)));
xidVal->Name = "@xid";
xidVal->Type = adVarBinary;
xidVal->Size = xid.length();
xidVal->Direction = adParamInput;
xidVal->Value = BlobEncoder(xid);
cmd->Parameters->Append(xidVal);
}
cmd->Execute(NULL, NULL, adCmdText | adExecuteNoRecords);
}
void
MessageMapRecordset::remove(uint64_t messageId, uint64_t queueId)
{
std::ostringstream command;
command << "DELETE FROM " << tableName
<< " WHERE queueId = " << queueId
<< " AND messageId = " << messageId << std::ends;
_CommandPtr cmd = NULL;
TESTHR(cmd.CreateInstance(__uuidof(Command)));
_ConnectionPtr p = *dbConn;
cmd->ActiveConnection = p;
cmd->CommandText = command.str().c_str();
cmd->CommandType = adCmdText;
_variant_t deletedRecords;
cmd->Execute(&deletedRecords, NULL, adCmdText | adExecuteNoRecords);
if ((long)deletedRecords == 0)
throw ms_sql::Exception("Message does not exist in queue mapping");
// Trigger on deleting the mapping takes care of deleting orphaned
// message record from tblMessage.
}
void
MessageMapRecordset::pendingRemove(uint64_t messageId,
uint64_t queueId,
const std::string& xid)
{
// Look up the mapping for the specified message and queue. There
// should be only one because of the uniqueness constraint in the
// SQL table. Update it to reflect it's pending delete with
// the specified xid.
std::ostringstream command;
command << "UPDATE " << tableName
<< " SET prepareStatus=" << PREPARE_REMOVE
<< " , xid=?"
<< " WHERE queueId = " << queueId
<< " AND messageId = " << messageId << std::ends;
_CommandPtr cmd = NULL;
_ParameterPtr xidVal = NULL;
TESTHR(cmd.CreateInstance(__uuidof(Command)));
TESTHR(xidVal.CreateInstance(__uuidof(Parameter)));
_ConnectionPtr p = *dbConn;
cmd->ActiveConnection = p;
cmd->CommandText = command.str().c_str();
cmd->CommandType = adCmdText;
xidVal->Name = "@xid";
xidVal->Type = adVarBinary;
xidVal->Size = xid.length();
xidVal->Direction = adParamInput;
xidVal->Value = BlobEncoder(xid);
cmd->Parameters->Append(xidVal);
cmd->Execute(NULL, NULL, adCmdText | adExecuteNoRecords);
}
void
MessageMapRecordset::removeForQueue(uint64_t queueId)
{
std::ostringstream command;
command << "DELETE FROM " << tableName
<< " WHERE queueId = " << queueId << std::ends;
_CommandPtr cmd = NULL;
TESTHR(cmd.CreateInstance(__uuidof(Command)));
_ConnectionPtr p = *dbConn;
cmd->ActiveConnection = p;
cmd->CommandText = command.str().c_str();
cmd->CommandType = adCmdText;
cmd->Execute(NULL, NULL, adCmdText | adExecuteNoRecords);
}
void
MessageMapRecordset::commitPrepared(const std::string& xid)
{
// Find all the records for the specified xid. Records marked as adding
// are now permanent so remove the xid and prepareStatus. Records marked
// as removing are removed entirely.
openRs();
MessageMap m;
IADORecordBinding *piAdoRecordBinding;
rs->QueryInterface(__uuidof(IADORecordBinding),
(LPVOID *)&piAdoRecordBinding);
piAdoRecordBinding->BindToRecordset(&m);
for (; !rs->EndOfFile; rs->MoveNext()) {
if (m.xidStatus != adFldOK)
continue;
const std::string x(m.xid, m.xidLength);
if (x != xid)
continue;
if (m.prepareStatus == PREPARE_REMOVE) {
rs->Delete(adAffectCurrent);
}
else {
_variant_t dbNull;
dbNull.ChangeType(VT_NULL);
rs->Fields->GetItem("prepareStatus")->Value = dbNull;
rs->Fields->GetItem("xid")->Value = dbNull;
}
rs->Update();
}
piAdoRecordBinding->Release();
}
void
MessageMapRecordset::abortPrepared(const std::string& xid)
{
// Find all the records for the specified xid. Records marked as adding
// need to be removed while records marked as removing are put back to
// no xid and no prepareStatus.
openRs();
MessageMap m;
IADORecordBinding *piAdoRecordBinding;
rs->QueryInterface(__uuidof(IADORecordBinding),
(LPVOID *)&piAdoRecordBinding);
piAdoRecordBinding->BindToRecordset(&m);
for (; !rs->EndOfFile; rs->MoveNext()) {
if (m.xidStatus != adFldOK)
continue;
const std::string x(m.xid, m.xidLength);
if (x != xid)
continue;
if (m.prepareStatus == PREPARE_ADD) {
rs->Delete(adAffectCurrent);
}
else {
_variant_t dbNull;
dbNull.ChangeType(VT_NULL);
rs->Fields->GetItem("prepareStatus")->Value = dbNull;
rs->Fields->GetItem("xid")->Value = dbNull;
}
rs->Update();
}
piAdoRecordBinding->Release();
}
void
MessageMapRecordset::recover(MessageQueueMap& msgMap)
{
openRs();
if (rs->BOF && rs->EndOfFile)
return; // Nothing to do
rs->MoveFirst();
MessageMap b;
IADORecordBinding *piAdoRecordBinding;
rs->QueryInterface(__uuidof(IADORecordBinding),
(LPVOID *)&piAdoRecordBinding);
piAdoRecordBinding->BindToRecordset(&b);
while (!rs->EndOfFile) {
qpid::store::QueueEntry entry(b.queueId);
if (b.xidStatus == adFldOK && b.xidLength > 0) {
entry.xid.assign(b.xid, b.xidLength);
entry.tplStatus =
b.prepareStatus == PREPARE_ADD ? QueueEntry::ADDING
: QueueEntry::REMOVING;
}
else {
entry.tplStatus = QueueEntry::NONE;
}
msgMap[b.messageId].push_back(entry);
rs->MoveNext();
}
piAdoRecordBinding->Release();
}
void
MessageMapRecordset::dump()
{
openRs();
Recordset::dump();
if (rs->EndOfFile && rs->BOF) // No records
return;
rs->MoveFirst();
MessageMap m;
IADORecordBinding *piAdoRecordBinding;
rs->QueryInterface(__uuidof(IADORecordBinding),
(LPVOID *)&piAdoRecordBinding);
piAdoRecordBinding->BindToRecordset(&m);
while (!rs->EndOfFile) {
QPID_LOG(notice, "msg " << m.messageId << " on queue " << m.queueId);
rs->MoveNext();
}
piAdoRecordBinding->Release();
}
}}} // namespace qpid::store::ms_sql