| /* |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| * |
| */ |
| package org.apache.qpid.server.virtualhost; |
| |
| import java.util.HashMap; |
| import java.util.Map; |
| import java.util.TreeMap; |
| import java.util.UUID; |
| |
| import com.google.common.util.concurrent.Futures; |
| import com.google.common.util.concurrent.ListenableFuture; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import org.apache.qpid.server.logging.EventLogger; |
| import org.apache.qpid.server.logging.messages.MessageStoreMessages; |
| import org.apache.qpid.server.logging.messages.TransactionLogMessages; |
| import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject; |
| import org.apache.qpid.server.message.MessageReference; |
| import org.apache.qpid.server.message.ServerMessage; |
| import org.apache.qpid.server.model.Queue; |
| import org.apache.qpid.server.model.VirtualHost; |
| import org.apache.qpid.server.plugin.MessageMetaDataType; |
| import org.apache.qpid.server.queue.QueueEntry; |
| import org.apache.qpid.server.queue.RecoverableBaseQueue; |
| import org.apache.qpid.server.store.MessageEnqueueRecord; |
| import org.apache.qpid.server.store.MessageStore; |
| import org.apache.qpid.server.store.StorableMessageMetaData; |
| import org.apache.qpid.server.store.StoredMessage; |
| import org.apache.qpid.server.store.Transaction; |
| import org.apache.qpid.server.store.Transaction.EnqueueRecord; |
| import org.apache.qpid.server.store.handler.DistributedTransactionHandler; |
| import org.apache.qpid.server.store.handler.MessageHandler; |
| import org.apache.qpid.server.store.handler.MessageInstanceHandler; |
| import org.apache.qpid.server.transfer.TransferQueue; |
| import org.apache.qpid.server.txn.DtxBranch; |
| import org.apache.qpid.server.txn.DtxRegistry; |
| import org.apache.qpid.server.txn.ServerTransaction; |
| import org.apache.qpid.server.util.Action; |
| import org.apache.qpid.server.util.ServerScopedRuntimeException; |
| import org.apache.qpid.transport.Xid; |
| import org.apache.qpid.transport.util.Functions; |
| |
| public class SynchronousMessageStoreRecoverer implements MessageStoreRecoverer |
| { |
| private static final Logger _logger = LoggerFactory.getLogger(SynchronousMessageStoreRecoverer.class); |
| |
| @Override |
| public ListenableFuture<Void> recover(VirtualHost<?> virtualHost) |
| { |
| EventLogger eventLogger = virtualHost.getEventLogger(); |
| MessageStore store = virtualHost.getMessageStore(); |
| MessageStore.MessageStoreReader storeReader = store.newMessageStoreReader(); |
| MessageStoreLogSubject logSubject = new MessageStoreLogSubject(virtualHost.getName(), store.getClass().getSimpleName()); |
| |
| Map<String, Integer> queueRecoveries = new TreeMap<>(); |
| Map<Long, ServerMessage<?>> recoveredMessages = new HashMap<>(); |
| Map<Long, StoredMessage<?>> unusedMessages = new HashMap<>(); |
| |
| |
| eventLogger.message(logSubject, MessageStoreMessages.RECOVERY_START()); |
| |
| storeReader.visitMessages(new MessageVisitor(recoveredMessages, unusedMessages)); |
| |
| eventLogger.message(logSubject, TransactionLogMessages.RECOVERY_START(null, false)); |
| storeReader.visitMessageInstances(new MessageInstanceVisitor(virtualHost, store, queueRecoveries, |
| recoveredMessages, unusedMessages)); |
| for(Map.Entry<String,Integer> entry : queueRecoveries.entrySet()) |
| { |
| eventLogger.message(logSubject, TransactionLogMessages.RECOVERED(entry.getValue(), entry.getKey())); |
| eventLogger.message(logSubject, TransactionLogMessages.RECOVERY_COMPLETE(entry.getKey(), true)); |
| Queue queue = virtualHost.getAttainedChildFromAddress(Queue.class, entry.getKey()); |
| if(queue != null) |
| { |
| queue.completeRecovery(); |
| } |
| } |
| |
| for(Queue<?> q : virtualHost.getChildren(Queue.class)) |
| { |
| if(!queueRecoveries.containsKey(q.getName())) |
| { |
| q.completeRecovery(); |
| } |
| } |
| TransferQueue q = virtualHost.getTransferQueue(); |
| q.completeRecovery(); |
| |
| storeReader.visitDistributedTransactions(new DistributedTransactionVisitor(virtualHost, store, eventLogger, |
| logSubject, recoveredMessages, unusedMessages)); |
| |
| |
| |
| for(StoredMessage<?> m : unusedMessages.values()) |
| { |
| _logger.warn("Message id " + m.getMessageNumber() + " in store, but not in any queue - removing...."); |
| m.remove(); |
| } |
| eventLogger.message(logSubject, TransactionLogMessages.RECOVERY_COMPLETE(null, false)); |
| |
| eventLogger.message(logSubject, |
| MessageStoreMessages.RECOVERED(recoveredMessages.size() - unusedMessages.size())); |
| eventLogger.message(logSubject, MessageStoreMessages.RECOVERY_COMPLETE()); |
| |
| return Futures.immediateFuture(null); |
| } |
| |
| @Override |
| public void cancel() |
| { |
| // No-op |
| } |
| |
| private static class MessageVisitor implements MessageHandler |
| { |
| |
| private final Map<Long, ServerMessage<?>> _recoveredMessages; |
| private final Map<Long, StoredMessage<?>> _unusedMessages; |
| |
| public MessageVisitor(final Map<Long, ServerMessage<?>> recoveredMessages, |
| final Map<Long, StoredMessage<?>> unusedMessages) |
| { |
| _recoveredMessages = recoveredMessages; |
| _unusedMessages = unusedMessages; |
| } |
| |
| @Override |
| public boolean handle(StoredMessage<?> message) |
| { |
| StorableMessageMetaData metaData = message.getMetaData(); |
| |
| @SuppressWarnings("rawtypes") |
| MessageMetaDataType type = metaData.getType(); |
| |
| @SuppressWarnings("unchecked") |
| ServerMessage<?> serverMessage = type.createMessage(message); |
| |
| _recoveredMessages.put(message.getMessageNumber(), serverMessage); |
| _unusedMessages.put(message.getMessageNumber(), message); |
| return true; |
| } |
| |
| } |
| |
| private static class MessageInstanceVisitor implements MessageInstanceHandler |
| { |
| private final VirtualHost<?> _virtualHost; |
| private final MessageStore _store; |
| |
| private final Map<String, Integer> _queueRecoveries; |
| private final Map<Long, ServerMessage<?>> _recoveredMessages; |
| private final Map<Long, StoredMessage<?>> _unusedMessages; |
| |
| private MessageInstanceVisitor(final VirtualHost<?> virtualHost, |
| final MessageStore store, |
| final Map<String, Integer> queueRecoveries, |
| final Map<Long, ServerMessage<?>> recoveredMessages, |
| final Map<Long, StoredMessage<?>> unusedMessages) |
| { |
| _virtualHost = virtualHost; |
| _store = store; |
| _queueRecoveries = queueRecoveries; |
| _recoveredMessages = recoveredMessages; |
| _unusedMessages = unusedMessages; |
| } |
| |
| @Override |
| public boolean handle(final MessageEnqueueRecord record) |
| { |
| final UUID queueId = record.getQueueId(); |
| long messageId = record.getMessageNumber(); |
| RecoverableBaseQueue queue = _virtualHost.getAttainedQueue(queueId); |
| if(queue == null && _virtualHost.getTransferQueue().getId().equals(queueId)) |
| { |
| queue = _virtualHost.getTransferQueue(); |
| } |
| if(queue != null) |
| { |
| String queueName = queue.getName(); |
| ServerMessage<?> message = _recoveredMessages.get(messageId); |
| _unusedMessages.remove(messageId); |
| |
| if(message != null) |
| { |
| if (_logger.isDebugEnabled()) |
| { |
| _logger.debug("On recovery, delivering " + message.getMessageNumber() + " to " + queueName); |
| } |
| |
| Integer count = _queueRecoveries.get(queueName); |
| if (count == null) |
| { |
| count = 0; |
| } |
| |
| queue.recover(message, record); |
| |
| _queueRecoveries.put(queueName, ++count); |
| } |
| else |
| { |
| _logger.warn("Message id " + messageId + " referenced in log as enqueued in queue " + queueName + " is unknown, entry will be discarded"); |
| Transaction txn = _store.newTransaction(); |
| txn.dequeueMessage(record); |
| txn.commitTranAsync((Void) null); |
| } |
| } |
| else |
| { |
| _logger.warn("Message id " + messageId + " in log references queue with id " + queueId + " which is not in the configuration, entry will be discarded"); |
| Transaction txn = _store.newTransaction(); |
| txn.dequeueMessage(record); |
| txn.commitTranAsync((Void) null); |
| } |
| return true; |
| } |
| } |
| |
| private static class DistributedTransactionVisitor implements DistributedTransactionHandler |
| { |
| |
| private final VirtualHost<?> _virtualHost; |
| private final MessageStore _store; |
| private final EventLogger _eventLogger; |
| private final MessageStoreLogSubject _logSubject; |
| |
| private final Map<Long, ServerMessage<?>> _recoveredMessages; |
| private final Map<Long, StoredMessage<?>> _unusedMessages; |
| |
| private DistributedTransactionVisitor(final VirtualHost<?> virtualHost, |
| final MessageStore store, |
| final EventLogger eventLogger, |
| final MessageStoreLogSubject logSubject, |
| final Map<Long, ServerMessage<?>> recoveredMessages, |
| final Map<Long, StoredMessage<?>> unusedMessages) |
| { |
| _virtualHost = virtualHost; |
| _store = store; |
| _eventLogger = eventLogger; |
| _logSubject = logSubject; |
| _recoveredMessages = recoveredMessages; |
| _unusedMessages = unusedMessages; |
| } |
| |
| @Override |
| public boolean handle(final Transaction.StoredXidRecord storedXid, |
| final Transaction.EnqueueRecord[] enqueues, |
| final Transaction.DequeueRecord[] dequeues) |
| { |
| Xid id = new Xid(storedXid.getFormat(), storedXid.getGlobalId(), storedXid.getBranchId()); |
| DtxRegistry dtxRegistry = _virtualHost.getDtxRegistry(); |
| DtxBranch branch = dtxRegistry.getBranch(id); |
| if(branch == null) |
| { |
| branch = new DtxBranch(storedXid, dtxRegistry); |
| dtxRegistry.registerBranch(branch); |
| } |
| for(EnqueueRecord record : enqueues) |
| { |
| final Queue<?> queue = _virtualHost.getAttainedQueue(record.getResource().getId()); |
| if(queue != null) |
| { |
| final long messageId = record.getMessage().getMessageNumber(); |
| final ServerMessage<?> message = _recoveredMessages.get(messageId); |
| _unusedMessages.remove(messageId); |
| |
| if(message != null) |
| { |
| final MessageReference<?> ref = message.newReference(); |
| final MessageEnqueueRecord[] records = new MessageEnqueueRecord[1]; |
| |
| branch.enqueue(queue, message, new Action<MessageEnqueueRecord>() |
| { |
| @Override |
| public void performAction(final MessageEnqueueRecord record) |
| { |
| records[0] = record; |
| } |
| }); |
| branch.addPostTransactionAction(new ServerTransaction.Action() |
| { |
| @Override |
| public void postCommit() |
| { |
| queue.enqueue(message, null, records[0]); |
| ref.release(); |
| } |
| |
| @Override |
| public void onRollback() |
| { |
| ref.release(); |
| } |
| }); |
| |
| } |
| else |
| { |
| StringBuilder xidString = xidAsString(id); |
| _eventLogger.message(_logSubject, |
| TransactionLogMessages.XA_INCOMPLETE_MESSAGE(xidString.toString(), |
| Long.toString(messageId))); |
| } |
| } |
| else |
| { |
| StringBuilder xidString = xidAsString(id); |
| _eventLogger.message(_logSubject, |
| TransactionLogMessages.XA_INCOMPLETE_QUEUE(xidString.toString(), |
| record.getResource().getId().toString())); |
| |
| } |
| } |
| for(Transaction.DequeueRecord record : dequeues) |
| { |
| final Queue<?> queue = _virtualHost.getAttainedQueue(record.getEnqueueRecord().getQueueId()); |
| if(queue != null) |
| { |
| final long messageId = record.getEnqueueRecord().getMessageNumber(); |
| final ServerMessage<?> message = _recoveredMessages.get(messageId); |
| _unusedMessages.remove(messageId); |
| |
| if(message != null) |
| { |
| final QueueEntry entry = queue.getMessageOnTheQueue(messageId); |
| |
| if (entry.acquire()) |
| { |
| branch.dequeue(entry.getEnqueueRecord()); |
| |
| branch.addPostTransactionAction(new ServerTransaction.Action() |
| { |
| |
| public void postCommit() |
| { |
| entry.delete(); |
| } |
| |
| public void onRollback() |
| { |
| entry.release(); |
| } |
| }); |
| } |
| else |
| { |
| // Should never happen - dtx recovery is always synchronous and occurs before |
| // any other message actors are allowed to act on the virtualhost. |
| throw new ServerScopedRuntimeException( |
| "Distributed transaction dequeue handler failed to acquire " + entry + |
| " during recovery of queue " + queue); |
| } |
| |
| } |
| else |
| { |
| StringBuilder xidString = xidAsString(id); |
| _eventLogger.message(_logSubject, |
| TransactionLogMessages.XA_INCOMPLETE_MESSAGE(xidString.toString(), |
| Long.toString(messageId))); |
| |
| } |
| |
| } |
| else |
| { |
| StringBuilder xidString = xidAsString(id); |
| _eventLogger.message(_logSubject, |
| TransactionLogMessages.XA_INCOMPLETE_QUEUE(xidString.toString(), |
| record.getEnqueueRecord().getQueueId().toString())); |
| } |
| |
| } |
| |
| branch.setState(DtxBranch.State.PREPARED); |
| branch.prePrepareTransaction(); |
| return true; |
| } |
| |
| private StringBuilder xidAsString(Xid id) |
| { |
| return new StringBuilder("(") |
| .append(id.getFormat()) |
| .append(',') |
| .append(Functions.str(id.getGlobalId())) |
| .append(',') |
| .append(Functions.str(id.getBranchId())) |
| .append(')'); |
| } |
| |
| |
| } |
| |
| |
| } |