blob: 9402ca3871f34eef1ebb1c0a7ed2dcb47a1b44a2 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.server.virtualhost;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;
import java.util.UUID;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.logging.messages.MessageStoreMessages;
import org.apache.qpid.server.logging.messages.TransactionLogMessages;
import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.plugin.MessageMetaDataType;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.store.MessageEnqueueRecord;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.store.Transaction;
import org.apache.qpid.server.store.Transaction.EnqueueRecord;
import org.apache.qpid.server.store.handler.DistributedTransactionHandler;
import org.apache.qpid.server.store.handler.MessageHandler;
import org.apache.qpid.server.store.handler.MessageInstanceHandler;
import org.apache.qpid.server.transport.util.Functions;
import org.apache.qpid.server.txn.DtxBranch;
import org.apache.qpid.server.txn.DtxRegistry;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.txn.Xid;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
public class SynchronousMessageStoreRecoverer implements MessageStoreRecoverer
{
private static final Logger LOGGER = LoggerFactory.getLogger(SynchronousMessageStoreRecoverer.class);
/**
 * Recovers the message store of the given virtual host synchronously on the calling thread.
 * <p>
 * The stored messages are read first, then the message instances (enqueue records) are replayed
 * onto their queues, every queue of the virtual host is told that its recovery is complete, and
 * finally the distributed transactions are replayed. Stored messages that were not referenced by
 * any message instance or distributed transaction are then removed from the store.
 *
 * @param virtualHost the virtual host whose message store is recovered
 * @return an already completed future, since all work is done before this method returns
 */
@Override
public ListenableFuture<Void> recover(QueueManagingVirtualHost<?> virtualHost)
{
    final EventLogger eventLogger = virtualHost.getEventLogger();
    final MessageStore store = virtualHost.getMessageStore();
    final MessageStoreLogSubject logSubject =
            new MessageStoreLogSubject(virtualHost.getName(), store.getClass().getSimpleName());

    // Number of message instances delivered to each queue.
    final Map<Queue<?>, Integer> queueRecoveries = new TreeMap<>();
    // Every message found in the store, keyed by message number.
    final Map<Long, ServerMessage<?>> recoveredMessages = new HashMap<>();
    // Messages not (yet) referenced by a message instance or a distributed transaction.
    final Map<Long, StoredMessage<?>> unusedMessages = new TreeMap<>();
    // Counters for the message instances that had to be discarded.
    final Map<UUID, Integer> unknownQueuesWithMessages = new HashMap<>();
    final Map<Queue<?>, Integer> queuesWithUnknownMessages = new HashMap<>();

    final MessageStore.MessageStoreReader storeReader = store.newMessageStoreReader();
    try
    {
        eventLogger.message(logSubject, MessageStoreMessages.RECOVERY_START());
        storeReader.visitMessages(new MessageVisitor(recoveredMessages, unusedMessages));

        eventLogger.message(logSubject, TransactionLogMessages.RECOVERY_START(null, false));
        try
        {
            storeReader.visitMessageInstances(new MessageInstanceVisitor(virtualHost,
                                                                         store,
                                                                         queueRecoveries,
                                                                         recoveredMessages,
                                                                         unusedMessages,
                                                                         unknownQueuesWithMessages,
                                                                         queuesWithUnknownMessages));
        }
        finally
        {
            // Report what was discarded even if the visit was interrupted by an exception.
            logDiscardedEntries(unknownQueuesWithMessages, queuesWithUnknownMessages);
        }

        completeQueueRecovery(virtualHost, eventLogger, logSubject, queueRecoveries);

        storeReader.visitDistributedTransactions(new DistributedTransactionVisitor(virtualHost,
                                                                                   eventLogger,
                                                                                   logSubject,
                                                                                   recoveredMessages,
                                                                                   unusedMessages));
    }
    finally
    {
        // Release the reader whether or not the visits succeeded.
        storeReader.close();
    }

    for (StoredMessage<?> orphan : unusedMessages.values())
    {
        LOGGER.debug("Message id '{}' is orphaned, removing", orphan.getMessageNumber());
        orphan.remove();
    }
    if (!unusedMessages.isEmpty())
    {
        LOGGER.info("Discarded {} orphaned message(s).", unusedMessages.size());
    }

    eventLogger.message(logSubject, TransactionLogMessages.RECOVERY_COMPLETE(null, false));
    // The orphans removed above are excluded from the reported recovered-message count.
    eventLogger.message(logSubject,
                        MessageStoreMessages.RECOVERED(recoveredMessages.size() - unusedMessages.size()));
    eventLogger.message(logSubject, MessageStoreMessages.RECOVERY_COMPLETE());
    return Futures.immediateFuture(null);
}

/** Logs one summary line per queue id / queue whose message instances were discarded. */
private static void logDiscardedEntries(final Map<UUID, Integer> unknownQueuesWithMessages,
                                        final Map<Queue<?>, Integer> queuesWithUnknownMessages)
{
    unknownQueuesWithMessages.forEach((queueId, count) ->
        LOGGER.info("Discarded {} entry(s) associated with queue id '{}' as a queue with this "
                    + "id does not appear in the configuration.",
                    count, queueId));
    queuesWithUnknownMessages.forEach((queue, count) ->
        LOGGER.info("Discarded {} entry(s) associated with queue '{}' as the referenced message "
                    + "does not exist.",
                    count, queue.getName()));
}

/**
 * Completes recovery on every queue of the virtual host, emitting the recovery log events for
 * those queues that had at least one message instance delivered to them.
 */
private static void completeQueueRecovery(final QueueManagingVirtualHost<?> virtualHost,
                                          final EventLogger eventLogger,
                                          final MessageStoreLogSubject logSubject,
                                          final Map<Queue<?>, Integer> queueRecoveries)
{
    for (Map.Entry<Queue<?>, Integer> entry : queueRecoveries.entrySet())
    {
        final Queue<?> queue = entry.getKey();
        final Integer deliveredCount = entry.getValue();
        eventLogger.message(logSubject, TransactionLogMessages.RECOVERED(deliveredCount, queue.getName()));
        eventLogger.message(logSubject, TransactionLogMessages.RECOVERY_COMPLETE(queue.getName(), true));
        queue.completeRecovery();
    }
    // Queues that received nothing still have to be told that recovery has finished.
    for (Queue<?> q : virtualHost.getChildren(Queue.class))
    {
        if (!queueRecoveries.containsKey(q))
        {
            q.completeRecovery();
        }
    }
}
/**
 * Cancels recovery. This implementation does nothing: {@link #recover} performs all of its
 * work before it returns, so there is never a pending recovery to cancel.
 */
@Override
public void cancel()
{
// No-op
}
/**
 * Handler for the stored-message pass of recovery. Each stored message is turned into a
 * {@link ServerMessage} and registered, by message number, in both maps supplied at construction.
 */
private static class MessageVisitor implements MessageHandler
{
    private final Map<Long, ServerMessage<?>> _recoveredMessages;
    private final Map<Long, StoredMessage<?>> _unusedMessages;

    /**
     * @param recoveredMessages receives the server message created for every stored message
     * @param unusedMessages receives every stored message; entries are removed elsewhere once
     *                       something is found to reference the message
     */
    MessageVisitor(final Map<Long, ServerMessage<?>> recoveredMessages,
                   final Map<Long, StoredMessage<?>> unusedMessages)
    {
        _unusedMessages = unusedMessages;
        _recoveredMessages = recoveredMessages;
    }

    /**
     * Records the given stored message.
     *
     * @param message the stored message being visited
     * @return always {@code true}
     */
    @Override
    public boolean handle(StoredMessage<?> message)
    {
        // The meta data type is obtained as a raw type, hence the unchecked factory call.
        @SuppressWarnings({"rawtypes", "unchecked"})
        final ServerMessage<?> recovered = message.getMetaData().getType().createMessage(message);

        final Long messageNumber = message.getMessageNumber();
        _recoveredMessages.put(messageNumber, recovered);
        _unusedMessages.put(messageNumber, message);
        return true;
    }
}
/**
 * Handler for the message-instance pass of recovery. Every enqueue record is either delivered to
 * its queue or, when the queue or the message cannot be found, dequeued from the store and
 * counted in the corresponding "discarded" map.
 */
private static class MessageInstanceVisitor implements MessageInstanceHandler
{
    private final QueueManagingVirtualHost<?> _virtualHost;
    private final MessageStore _store;
    private final Map<Queue<?>, Integer> _queueRecoveries;
    private final Map<Long, ServerMessage<?>> _recoveredMessages;
    private final Map<Long, StoredMessage<?>> _unusedMessages;
    private final Map<UUID, Integer> _unknownQueuesWithMessages;
    private final Map<Queue<?>, Integer> _queuesWithUnknownMessages;

    /**
     * @param virtualHost used to look up queues by id
     * @param store used to dequeue records that cannot be recovered
     * @param queueRecoveries per-queue count of delivered message instances (updated)
     * @param recoveredMessages messages found in the store, keyed by message number (read)
     * @param unusedMessages messages not yet referenced; referenced ids are removed (updated)
     * @param unknownQueuesWithMessages per-queue-id count of records whose queue is unknown (updated)
     * @param queuesWithUnknownMessages per-queue count of records whose message is unknown (updated)
     */
    private MessageInstanceVisitor(final QueueManagingVirtualHost<?> virtualHost,
                                   final MessageStore store,
                                   final Map<Queue<?>, Integer> queueRecoveries,
                                   final Map<Long, ServerMessage<?>> recoveredMessages,
                                   final Map<Long, StoredMessage<?>> unusedMessages,
                                   final Map<UUID, Integer> unknownQueuesWithMessages,
                                   final Map<Queue<?>, Integer> queuesWithUnknownMessages)
    {
        _virtualHost = virtualHost;
        _store = store;
        _queueRecoveries = queueRecoveries;
        _recoveredMessages = recoveredMessages;
        _unusedMessages = unusedMessages;
        _unknownQueuesWithMessages = unknownQueuesWithMessages;
        _queuesWithUnknownMessages = queuesWithUnknownMessages;
    }

    /**
     * Processes a single enqueue record.
     *
     * @param record the enqueue record being visited
     * @return always {@code true}
     */
    @Override
    public boolean handle(final MessageEnqueueRecord record)
    {
        final UUID queueId = record.getQueueId();
        final long messageId = record.getMessageNumber();
        final Queue<?> queue = _virtualHost.getAttainedQueue(queueId);

        if (queue == null)
        {
            LOGGER.debug(
                    "Message id '{}' in log references queue with id '{}' which is not in the configuration, entry will be discarded",
                    messageId, queueId);
            _unknownQueuesWithMessages.merge(queueId, 1, Integer::sum);
            discard(record);
            return true;
        }

        final String queueName = queue.getName();
        final ServerMessage<?> message = _recoveredMessages.get(messageId);
        // The id is referenced by a known queue, so it is no longer an orphan candidate.
        _unusedMessages.remove(messageId);

        if (message == null)
        {
            LOGGER.debug("Message id '{}' referenced in log as enqueued in queue '{}' is unknown, entry will be discarded",
                         messageId, queueName);
            _queuesWithUnknownMessages.merge(queue, 1, Integer::sum);
            discard(record);
            return true;
        }

        LOGGER.debug("Delivering message id '{}' to queue '{}'", message.getMessageNumber(), queueName);
        _queueRecoveries.merge(queue, 1, Integer::sum);
        queue.recover(message, record);
        return true;
    }

    /** Removes an unrecoverable enqueue record from the store in its own transaction. */
    private void discard(final MessageEnqueueRecord record)
    {
        final Transaction txn = _store.newTransaction();
        txn.dequeueMessage(record);
        txn.commitTranAsync((Void) null);
    }
}
private static class DistributedTransactionVisitor implements DistributedTransactionHandler
{
private final QueueManagingVirtualHost<?> _virtualHost;
private final EventLogger _eventLogger;
private final MessageStoreLogSubject _logSubject;
private final Map<Long, ServerMessage<?>> _recoveredMessages;
private final Map<Long, StoredMessage<?>> _unusedMessages;
private DistributedTransactionVisitor(final QueueManagingVirtualHost<?> virtualHost,
final EventLogger eventLogger,
final MessageStoreLogSubject logSubject,
final Map<Long, ServerMessage<?>> recoveredMessages,
final Map<Long, StoredMessage<?>> unusedMessages)
{
_virtualHost = virtualHost;
_eventLogger = eventLogger;
_logSubject = logSubject;
_recoveredMessages = recoveredMessages;
_unusedMessages = unusedMessages;
}
@Override
public boolean handle(final Transaction.StoredXidRecord storedXid,
final Transaction.EnqueueRecord[] enqueues,
final Transaction.DequeueRecord[] dequeues)
{
Xid id = new Xid(storedXid.getFormat(), storedXid.getGlobalId(), storedXid.getBranchId());
DtxRegistry dtxRegistry = _virtualHost.getDtxRegistry();
DtxBranch branch = dtxRegistry.getBranch(id);
if(branch == null)
{
branch = new DtxBranch(storedXid, dtxRegistry);
dtxRegistry.registerBranch(branch);
}
for(EnqueueRecord record : enqueues)
{
final Queue<?> queue = _virtualHost.getAttainedQueue(record.getResource().getId());
if(queue != null)
{
final long messageId = record.getMessage().getMessageNumber();
final ServerMessage<?> message = _recoveredMessages.get(messageId);
_unusedMessages.remove(messageId);
if(message != null)
{
final MessageReference<?> ref = message.newReference();
final MessageEnqueueRecord[] records = new MessageEnqueueRecord[1];
branch.enqueue(queue, message, new Action<MessageEnqueueRecord>()
{
@Override
public void performAction(final MessageEnqueueRecord record)
{
records[0] = record;
}
});
branch.addPostTransactionAction(new ServerTransaction.Action()
{
@Override
public void postCommit()
{
queue.enqueue(message, null, records[0]);
ref.release();
}
@Override
public void onRollback()
{
ref.release();
}
});
}
else
{
StringBuilder xidString = xidAsString(id);
_eventLogger.message(_logSubject,
TransactionLogMessages.XA_INCOMPLETE_MESSAGE(xidString.toString(),
Long.toString(messageId)));
}
}
else
{
StringBuilder xidString = xidAsString(id);
_eventLogger.message(_logSubject,
TransactionLogMessages.XA_INCOMPLETE_QUEUE(xidString.toString(),
record.getResource().getId().toString()));
}
}
for(Transaction.DequeueRecord record : dequeues)
{
final Queue<?> queue = _virtualHost.getAttainedQueue(record.getEnqueueRecord().getQueueId());
if(queue != null)
{
final long messageId = record.getEnqueueRecord().getMessageNumber();
final ServerMessage<?> message = _recoveredMessages.get(messageId);
_unusedMessages.remove(messageId);
if(message != null)
{
final QueueEntry entry = queue.getMessageOnTheQueue(messageId);
if (entry.acquire())
{
branch.dequeue(entry.getEnqueueRecord());
branch.addPostTransactionAction(new ServerTransaction.Action()
{
@Override
public void postCommit()
{
entry.delete();
}
@Override
public void onRollback()
{
entry.release();
}
});
}
else
{
// Should never happen - dtx recovery is always synchronous and occurs before
// any other message actors are allowed to act on the virtualhost.
throw new ServerScopedRuntimeException(
"Distributed transaction dequeue handler failed to acquire " + entry +
" during recovery of queue " + queue);
}
}
else
{
StringBuilder xidString = xidAsString(id);
_eventLogger.message(_logSubject,
TransactionLogMessages.XA_INCOMPLETE_MESSAGE(xidString.toString(),
Long.toString(messageId)));
}
}
else
{
StringBuilder xidString = xidAsString(id);
_eventLogger.message(_logSubject,
TransactionLogMessages.XA_INCOMPLETE_QUEUE(xidString.toString(),
record.getEnqueueRecord().getQueueId().toString()));
}
}
branch.setState(DtxBranch.State.PREPARED);
branch.prePrepareTransaction();
return true;
}
private StringBuilder xidAsString(Xid id)
{
return new StringBuilder("(")
.append(id.getFormat())
.append(',')
.append(Functions.str(id.getGlobalId()))
.append(',')
.append(Functions.str(id.getBranchId()))
.append(')');
}
}
}