blob: 84b87744a6eff1ba9a87e8cef6d9d39e3641649f [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.server.virtualhost;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import com.google.common.base.Function;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.logging.messages.TransactionLogMessages;
import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.plugin.MessageMetaDataType;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.queue.RecoverableBaseQueue;
import org.apache.qpid.server.store.MessageEnqueueRecord;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.store.Transaction;
import org.apache.qpid.server.store.handler.DistributedTransactionHandler;
import org.apache.qpid.server.store.handler.MessageHandler;
import org.apache.qpid.server.store.handler.MessageInstanceHandler;
import org.apache.qpid.server.txn.DtxBranch;
import org.apache.qpid.server.txn.DtxRegistry;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
import org.apache.qpid.transport.Xid;
import org.apache.qpid.transport.util.Functions;
public class AsynchronousMessageStoreRecoverer implements MessageStoreRecoverer
{
private static final Logger _logger = LoggerFactory.getLogger(AsynchronousMessageStoreRecoverer.class);
private AsynchronousRecoverer _asynchronousRecoverer;
@Override
public ListenableFuture<Void> recover(final VirtualHost<?> virtualHost)
{
_asynchronousRecoverer = new AsynchronousRecoverer(virtualHost);
return _asynchronousRecoverer.recover();
}
@Override
public void cancel()
{
if (_asynchronousRecoverer != null)
{
_asynchronousRecoverer.cancel();
}
}
private static class AsynchronousRecoverer
{
private static final Logger LOGGER = LoggerFactory.getLogger(AsynchronousRecoverer.class);
public static final int THREAD_POOL_SHUTDOWN_TIMEOUT = 5000;
private final VirtualHost<?> _virtualHost;
private final EventLogger _eventLogger;
private final MessageStore _store;
private final MessageStoreLogSubject _logSubject;
private final long _maxMessageId;
private final Set<Queue<?>> _recoveringQueues = new CopyOnWriteArraySet<>();
private final AtomicBoolean _recoveryComplete = new AtomicBoolean();
private final Map<Long, MessageReference<? extends ServerMessage<?>>> _recoveredMessages = new HashMap<>();
private final ListeningExecutorService _queueRecoveryExecutor = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
private final MessageStore.MessageStoreReader _storeReader;
private AtomicBoolean _continueRecovery = new AtomicBoolean(true);
private AsynchronousRecoverer(final VirtualHost<?> virtualHost)
{
_virtualHost = virtualHost;
_eventLogger = virtualHost.getEventLogger();
_store = virtualHost.getMessageStore();
_storeReader = _store.newMessageStoreReader();
_logSubject = new MessageStoreLogSubject(virtualHost.getName(), _store.getClass().getSimpleName());
_maxMessageId = _store.getNextMessageId();
Collection children = _virtualHost.getChildren(Queue.class);
_recoveringQueues.addAll((Collection<? extends Queue<?>>) children);
}
public ListenableFuture<Void> recover()
{
getStoreReader().visitDistributedTransactions(new DistributedTransactionVisitor());
List<ListenableFuture<Void>> queueRecoveryFutures = new ArrayList<>();
for(Queue<?> queue : _recoveringQueues)
{
ListenableFuture<Void> result = _queueRecoveryExecutor.submit(new QueueRecoveringTask(queue), null);
queueRecoveryFutures.add(result);
}
ListenableFuture<Void> result = _queueRecoveryExecutor.submit(new QueueRecoveringTask(_virtualHost.getTransferQueue()), null);
queueRecoveryFutures.add(result);
ListenableFuture<List<Void>> combinedFuture = Futures.allAsList(queueRecoveryFutures);
return Futures.transform(combinedFuture, new Function<List<?>, Void>()
{
@Override
public Void apply(List<?> voids)
{
return null;
}
});
}
public VirtualHost<?> getVirtualHost()
{
return _virtualHost;
}
public EventLogger getEventLogger()
{
return _eventLogger;
}
public MessageStore.MessageStoreReader getStoreReader()
{
return _storeReader;
}
public MessageStoreLogSubject getLogSubject()
{
return _logSubject;
}
private boolean isRecovering(Queue<?> queue)
{
return _recoveringQueues.contains(queue);
}
private void recoverQueue(RecoverableBaseQueue queue)
{
MessageInstanceVisitor handler = new MessageInstanceVisitor(queue);
_storeReader.visitMessageInstances(queue, handler);
getEventLogger().message(getLogSubject(), TransactionLogMessages.RECOVERED(handler.getRecoveredCount(), queue.getName()));
getEventLogger().message(getLogSubject(), TransactionLogMessages.RECOVERY_COMPLETE(queue.getName(), true));
queue.completeRecovery();
_recoveringQueues.remove(queue);
if (_recoveringQueues.isEmpty() && _recoveryComplete.compareAndSet(false, true))
{
completeRecovery();
}
}
private synchronized void completeRecovery()
{
// at this point nothing should be writing to the map of recovered messages
for (Map.Entry<Long,MessageReference<? extends ServerMessage<?>>> entry : _recoveredMessages.entrySet())
{
entry.getValue().release();
entry.setValue(null); // free up any memory associated with the reference object
}
final List<StoredMessage<?>> messagesToDelete = new ArrayList<>();
getStoreReader().visitMessages(new MessageHandler()
{
@Override
public boolean handle(final StoredMessage<?> storedMessage)
{
long messageNumber = storedMessage.getMessageNumber();
if (!_recoveredMessages.containsKey(messageNumber))
{
messagesToDelete.add(storedMessage);
}
return _continueRecovery.get() && messageNumber < _maxMessageId - 1;
}
});
for(StoredMessage<?> storedMessage : messagesToDelete)
{
if (_continueRecovery.get())
{
_logger.info("Message id "
+ storedMessage.getMessageNumber()
+ " in store, but not in any queue - removing....");
storedMessage.remove();
}
}
messagesToDelete.clear();
_recoveredMessages.clear();
_storeReader.close();
}
private synchronized ServerMessage<?> getRecoveredMessage(final long messageId)
{
MessageReference<? extends ServerMessage<?>> ref = _recoveredMessages.get(messageId);
if (ref == null)
{
StoredMessage<?> message = _storeReader.getMessage(messageId);
if(message != null)
{
StorableMessageMetaData metaData = message.getMetaData();
@SuppressWarnings("rawtypes")
MessageMetaDataType type = metaData.getType();
@SuppressWarnings("unchecked")
ServerMessage<?> serverMessage = type.createMessage(message);
ref = serverMessage.newReference();
_recoveredMessages.put(messageId, ref);
}
}
return ref == null ? null : ref.getMessage();
}
public void cancel()
{
_continueRecovery.set(false);
_queueRecoveryExecutor.shutdown();
try
{
boolean wasShutdown = _queueRecoveryExecutor.awaitTermination(THREAD_POOL_SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS);
if (!wasShutdown)
{
_logger.warn("Failed to gracefully shutdown queue recovery executor within permitted time period");
_queueRecoveryExecutor.shutdownNow();
}
}
catch (InterruptedException e)
{
Thread.currentThread().interrupt();
}
_storeReader.close();
}
private class DistributedTransactionVisitor implements DistributedTransactionHandler
{
@Override
public boolean handle(final Transaction.StoredXidRecord storedXid,
final Transaction.EnqueueRecord[] enqueues,
final Transaction.DequeueRecord[] dequeues)
{
Xid id = new Xid(storedXid.getFormat(), storedXid.getGlobalId(), storedXid.getBranchId());
DtxRegistry dtxRegistry = getVirtualHost().getDtxRegistry();
DtxBranch branch = dtxRegistry.getBranch(id);
if (branch == null)
{
branch = new DtxBranch(storedXid, dtxRegistry);
dtxRegistry.registerBranch(branch);
}
for (Transaction.EnqueueRecord record : enqueues)
{
final Queue<?> queue = getVirtualHost().getAttainedQueue(record.getResource().getId());
if (queue != null)
{
final long messageId = record.getMessage().getMessageNumber();
final ServerMessage<?> message = getRecoveredMessage(messageId);
if (message != null)
{
final MessageReference<?> ref = message.newReference();
final MessageEnqueueRecord[] records = new MessageEnqueueRecord[1];
branch.enqueue(queue, message, new Action<MessageEnqueueRecord>()
{
@Override
public void performAction(final MessageEnqueueRecord record)
{
records[0] = record;
}
});
branch.addPostTransactionAction(new ServerTransaction.Action()
{
@Override
public void postCommit()
{
queue.enqueue(message, null, records[0]);
ref.release();
}
@Override
public void onRollback()
{
ref.release();
}
});
}
else
{
StringBuilder xidString = xidAsString(id);
getEventLogger().message(getLogSubject(),
TransactionLogMessages.XA_INCOMPLETE_MESSAGE(xidString.toString(),
Long.toString(
messageId)));
}
}
else
{
StringBuilder xidString = xidAsString(id);
getEventLogger().message(getLogSubject(),
TransactionLogMessages.XA_INCOMPLETE_QUEUE(xidString.toString(),
record.getResource()
.getId()
.toString()));
}
}
for (Transaction.DequeueRecord record : dequeues)
{
final Queue<?> queue = getVirtualHost().getAttainedQueue(record.getEnqueueRecord().getQueueId());
if (queue != null)
{
// For DTX to work correctly the queues which have uncommitted branches with dequeues
// must be synchronously recovered
if (isRecovering(queue))
{
recoverQueue(queue);
}
final long messageId = record.getEnqueueRecord().getMessageNumber();
final ServerMessage<?> message = getRecoveredMessage(messageId);
if (message != null)
{
final QueueEntry entry = queue.getMessageOnTheQueue(messageId);
if (entry.acquire())
{
branch.dequeue(entry.getEnqueueRecord());
branch.addPostTransactionAction(new ServerTransaction.Action()
{
public void postCommit()
{
entry.delete();
}
public void onRollback()
{
entry.release();
}
});
}
else
{
// Should never happen - dtx recovery is always synchronous and occurs before
// any other message actors are allowed to act on the virtualhost.
throw new ServerScopedRuntimeException(
"Distributed transaction dequeue handler failed to acquire " + entry +
" during recovery of queue " + queue);
}
}
else
{
StringBuilder xidString = xidAsString(id);
getEventLogger().message(getLogSubject(),
TransactionLogMessages.XA_INCOMPLETE_MESSAGE(xidString.toString(),
Long.toString(
messageId)));
}
}
else
{
StringBuilder xidString = xidAsString(id);
getEventLogger().message(getLogSubject(),
TransactionLogMessages.XA_INCOMPLETE_QUEUE(xidString.toString(),
record.getEnqueueRecord()
.getQueueId()
.toString()));
}
}
branch.setState(DtxBranch.State.PREPARED);
branch.prePrepareTransaction();
return _continueRecovery.get();
}
private StringBuilder xidAsString(Xid id)
{
return new StringBuilder("(")
.append(id.getFormat())
.append(',')
.append(Functions.str(id.getGlobalId()))
.append(',')
.append(Functions.str(id.getBranchId()))
.append(')');
}
}
private class QueueRecoveringTask implements Runnable
{
private final RecoverableBaseQueue _queue;
public QueueRecoveringTask(final RecoverableBaseQueue queue)
{
_queue = queue;
}
@Override
public void run()
{
String originalThreadName = Thread.currentThread().getName();
Thread.currentThread().setName("Queue Recoverer : " + _queue.getName() + " (vh: " + getVirtualHost().getName() + ")");
try
{
recoverQueue(_queue);
}
finally
{
Thread.currentThread().setName(originalThreadName);
}
}
}
private class MessageInstanceVisitor implements MessageInstanceHandler
{
private final RecoverableBaseQueue _queue;
long _recoveredCount;
private MessageInstanceVisitor(RecoverableBaseQueue queue)
{
_queue = queue;
}
@Override
public boolean handle(final MessageEnqueueRecord record)
{
long messageId = record.getMessageNumber();
String queueName = _queue.getName();
if(messageId < _maxMessageId)
{
ServerMessage<?> message = getRecoveredMessage(messageId);
if (message != null)
{
if (_logger.isDebugEnabled())
{
_logger.debug("On recovery, delivering " + message.getMessageNumber() + " to " + queueName);
}
_queue.recover(message, record);
_recoveredCount++;
}
else
{
_logger.warn("Message id "
+ messageId
+ " referenced in log as enqueued in queue "
+ queueName
+ " is unknown, entry will be discarded");
Transaction txn = _store.newTransaction();
txn.dequeueMessage(record);
txn.commitTranAsync((Void) null);
}
return _continueRecovery.get();
}
else
{
return false;
}
}
public long getRecoveredCount()
{
return _recoveredCount;
}
}
}
}