| /* |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| * |
| */ |
| package org.apache.qpid.server.virtualhost; |
| |
| import java.io.DataInputStream; |
| import java.io.IOException; |
| import java.nio.ByteBuffer; |
| import java.util.ArrayList; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.TreeMap; |
| import java.util.UUID; |
| import org.apache.log4j.Logger; |
| import org.apache.qpid.AMQException; |
| import org.apache.qpid.AMQStoreException; |
| import org.apache.qpid.framing.AMQShortString; |
| import org.apache.qpid.framing.FieldTable; |
| import org.apache.qpid.server.binding.BindingFactory; |
| import org.apache.qpid.server.exchange.Exchange; |
| import org.apache.qpid.server.federation.BrokerLink; |
| import org.apache.qpid.server.logging.actors.CurrentActor; |
| import org.apache.qpid.server.logging.messages.TransactionLogMessages; |
| import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject; |
| import org.apache.qpid.server.message.AMQMessage; |
| import org.apache.qpid.server.message.AbstractServerMessageImpl; |
| import org.apache.qpid.server.message.EnqueableMessage; |
| import org.apache.qpid.server.message.MessageTransferMessage; |
| import org.apache.qpid.server.message.ServerMessage; |
| import org.apache.qpid.server.queue.AMQQueue; |
| import org.apache.qpid.server.queue.AMQQueueFactory; |
| import org.apache.qpid.server.queue.QueueEntry; |
| import org.apache.qpid.server.store.ConfigurationRecoveryHandler; |
| import org.apache.qpid.server.store.MessageStore; |
| import org.apache.qpid.server.store.MessageStoreRecoveryHandler; |
| import org.apache.qpid.server.store.StoredMessage; |
| import org.apache.qpid.server.store.TransactionLogRecoveryHandler; |
| import org.apache.qpid.server.store.TransactionLogResource; |
| import org.apache.qpid.server.txn.DtxBranch; |
| import org.apache.qpid.server.txn.DtxRegistry; |
| import org.apache.qpid.server.txn.ServerTransaction; |
| import org.apache.qpid.transport.Xid; |
| import org.apache.qpid.transport.util.Functions; |
| import org.apache.qpid.util.ByteBufferInputStream; |
| |
| public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHandler, |
| ConfigurationRecoveryHandler.QueueRecoveryHandler, |
| ConfigurationRecoveryHandler.ExchangeRecoveryHandler, |
| ConfigurationRecoveryHandler.BindingRecoveryHandler, |
| ConfigurationRecoveryHandler.BrokerLinkRecoveryHandler, |
| MessageStoreRecoveryHandler, |
| MessageStoreRecoveryHandler.StoredMessageRecoveryHandler, |
| TransactionLogRecoveryHandler, |
| TransactionLogRecoveryHandler.QueueEntryRecoveryHandler, |
| TransactionLogRecoveryHandler.DtxRecordRecoveryHandler |
| { |
| private static final Logger _logger = Logger.getLogger(VirtualHostConfigRecoveryHandler.class); |
| |
| |
| private final VirtualHost _virtualHost; |
| |
| private MessageStoreLogSubject _logSubject; |
| private List<ProcessAction> _actions; |
| |
| private MessageStore _store; |
| |
| private final Map<String, Integer> _queueRecoveries = new TreeMap<String, Integer>(); |
| private Map<Long, AbstractServerMessageImpl> _recoveredMessages = new HashMap<Long, AbstractServerMessageImpl>(); |
| private Map<Long, StoredMessage> _unusedMessages = new HashMap<Long, StoredMessage>(); |
| |
| |
| |
| public VirtualHostConfigRecoveryHandler(VirtualHost virtualHost) |
| { |
| _virtualHost = virtualHost; |
| } |
| |
| public VirtualHostConfigRecoveryHandler begin(MessageStore store) |
| { |
| _logSubject = new MessageStoreLogSubject(_virtualHost,store); |
| _store = store; |
| CurrentActor.get().message(_logSubject, TransactionLogMessages.RECOVERY_START(null, false)); |
| |
| return this; |
| } |
| |
| public void queue(String queueName, String owner, boolean exclusive, FieldTable arguments) |
| { |
| try |
| { |
| AMQQueue q = _virtualHost.getQueueRegistry().getQueue(queueName); |
| |
| if (q == null) |
| { |
| q = AMQQueueFactory.createAMQQueueImpl(queueName, true, owner, false, exclusive, _virtualHost, |
| FieldTable.convertToMap(arguments)); |
| _virtualHost.getQueueRegistry().registerQueue(q); |
| } |
| |
| CurrentActor.get().message(_logSubject, TransactionLogMessages.RECOVERY_START(queueName, true)); |
| |
| //Record that we have a queue for recovery |
| _queueRecoveries.put(queueName, 0); |
| } |
| catch (AMQException e) |
| { |
| // TODO |
| throw new RuntimeException(e); |
| } |
| } |
| |
| public ExchangeRecoveryHandler completeQueueRecovery() |
| { |
| return this; |
| } |
| |
| public void exchange(String exchangeName, String type, boolean autoDelete) |
| { |
| try |
| { |
| Exchange exchange; |
| AMQShortString exchangeNameSS = new AMQShortString(exchangeName); |
| exchange = _virtualHost.getExchangeRegistry().getExchange(exchangeNameSS); |
| if (exchange == null) |
| { |
| exchange = _virtualHost.getExchangeFactory().createExchange(exchangeNameSS, new AMQShortString(type), true, autoDelete, 0); |
| _virtualHost.getExchangeRegistry().registerExchange(exchange); |
| } |
| } |
| catch (AMQException e) |
| { |
| // TODO |
| throw new RuntimeException(e); |
| } |
| } |
| |
| public BindingRecoveryHandler completeExchangeRecovery() |
| { |
| return this; |
| } |
| |
| public StoredMessageRecoveryHandler begin() |
| { |
| // TODO - log begin |
| return this; |
| } |
| |
| public void message(StoredMessage message) |
| { |
| AbstractServerMessageImpl serverMessage; |
| switch(message.getMetaData().getType()) |
| { |
| case META_DATA_0_8: |
| serverMessage = new AMQMessage(message); |
| break; |
| case META_DATA_0_10: |
| serverMessage = new MessageTransferMessage(message, null); |
| break; |
| default: |
| throw new RuntimeException("Unknown message type retrieved from store " + message.getMetaData().getClass()); |
| } |
| |
| _recoveredMessages.put(message.getMessageNumber(), serverMessage); |
| _unusedMessages.put(message.getMessageNumber(), message); |
| } |
| |
| public void completeMessageRecovery() |
| { |
| //TODO - log end |
| } |
| |
| public BridgeRecoveryHandler brokerLink(final UUID id, |
| final long createTime, |
| final Map<String, String> arguments) |
| { |
| BrokerLink blink = _virtualHost.createBrokerConnection(id, createTime, arguments); |
| return new BridgeRecoveryHandlerImpl(blink); |
| |
| } |
| |
| public void completeBrokerLinkRecovery() |
| { |
| } |
| |
| public void dtxRecord(long format, byte[] globalId, byte[] branchId, |
| MessageStore.Transaction.Record[] enqueues, |
| MessageStore.Transaction.Record[] dequeues) |
| { |
| Xid id = new Xid(format, globalId, branchId); |
| DtxRegistry dtxRegistry = _virtualHost.getDtxRegistry(); |
| DtxBranch branch = dtxRegistry.getBranch(id); |
| if(branch == null) |
| { |
| branch = new DtxBranch(id, _store, _virtualHost); |
| dtxRegistry.registerBranch(branch); |
| } |
| for(MessageStore.Transaction.Record record : enqueues) |
| { |
| final AMQQueue queue = _virtualHost.getQueueRegistry().getQueue(record.getQueue().getResourceName()); |
| if(queue != null) |
| { |
| final long messageId = record.getMessage().getMessageNumber(); |
| final AbstractServerMessageImpl message = _recoveredMessages.get(messageId); |
| _unusedMessages.remove(messageId); |
| |
| if(message != null) |
| { |
| message.incrementReference(); |
| |
| branch.enqueue(queue,message); |
| |
| branch.addPostTransactionAcion(new ServerTransaction.Action() |
| { |
| |
| public void postCommit() |
| { |
| try |
| { |
| |
| queue.enqueue(message, true, null); |
| message.decrementReference(); |
| } |
| catch (AMQException e) |
| { |
| _logger.error("Unable to enqueue message " + message.getMessageNumber() + " into " + |
| "queue " + queue.getName() + " (from XA transaction)", e); |
| throw new RuntimeException(e); |
| } |
| } |
| |
| public void onRollback() |
| { |
| message.decrementReference(); |
| } |
| }); |
| } |
| else |
| { |
| StringBuilder xidString = xidAsString(id); |
| String messageNumberString = String.valueOf(message.getMessageNumber()); |
| CurrentActor.get().message(_logSubject, |
| TransactionLogMessages.XA_INCOMPLETE_MESSAGE(xidString.toString(), |
| messageNumberString)); |
| |
| } |
| |
| } |
| else |
| { |
| StringBuilder xidString = xidAsString(id); |
| CurrentActor.get().message(_logSubject, |
| TransactionLogMessages.XA_INCOMPLETE_QUEUE(xidString.toString(), |
| record.getQueue().getResourceName())); |
| |
| } |
| } |
| for(MessageStore.Transaction.Record record : dequeues) |
| { |
| final AMQQueue queue = _virtualHost.getQueueRegistry().getQueue(record.getQueue().getResourceName()); |
| if(queue != null) |
| { |
| final long messageId = record.getMessage().getMessageNumber(); |
| final AbstractServerMessageImpl message = _recoveredMessages.get(messageId); |
| _unusedMessages.remove(messageId); |
| |
| if(message != null) |
| { |
| final QueueEntry entry = queue.getMessageOnTheQueue(messageId); |
| |
| entry.acquire(); |
| |
| branch.dequeue(queue, message); |
| |
| branch.addPostTransactionAcion(new ServerTransaction.Action() |
| { |
| |
| public void postCommit() |
| { |
| entry.discard(); |
| } |
| |
| public void onRollback() |
| { |
| entry.release(); |
| } |
| }); |
| } |
| else |
| { |
| StringBuilder xidString = xidAsString(id); |
| String messageNumberString = String.valueOf(message.getMessageNumber()); |
| CurrentActor.get().message(_logSubject, |
| TransactionLogMessages.XA_INCOMPLETE_MESSAGE(xidString.toString(), |
| messageNumberString)); |
| |
| } |
| |
| } |
| else |
| { |
| StringBuilder xidString = xidAsString(id); |
| CurrentActor.get().message(_logSubject, |
| TransactionLogMessages.XA_INCOMPLETE_QUEUE(xidString.toString(), |
| queue.getName())); |
| } |
| |
| } |
| |
| try |
| { |
| branch.setState(DtxBranch.State.PREPARED); |
| branch.prePrepareTransaction(); |
| } |
| catch (AMQStoreException e) |
| { |
| _logger.error("Unexpected database exception when attempting to prepare a recovered XA transaction " + |
| xidAsString(id), e); |
| throw new RuntimeException(e); |
| } |
| } |
| |
| private static StringBuilder xidAsString(Xid id) |
| { |
| return new StringBuilder("(") |
| .append(id.getFormat()) |
| .append(',') |
| .append(Functions.str(id.getGlobalId())) |
| .append(',') |
| .append(Functions.str(id.getBranchId())) |
| .append(')'); |
| } |
| |
| public void completeDtxRecordRecovery() |
| { |
| for(StoredMessage m : _unusedMessages.values()) |
| { |
| _logger.warn("Message id " + m.getMessageNumber() + " in store, but not in any queue - removing...."); |
| m.remove(); |
| } |
| CurrentActor.get().message(_logSubject, TransactionLogMessages.RECOVERY_COMPLETE(null, false)); |
| } |
| |
| private static final class ProcessAction |
| { |
| private final AMQQueue _queue; |
| private final AMQMessage _message; |
| |
| public ProcessAction(AMQQueue queue, AMQMessage message) |
| { |
| _queue = queue; |
| _message = message; |
| } |
| |
| public void process() |
| { |
| try |
| { |
| _queue.enqueue(_message); |
| } |
| catch(AMQException e) |
| { |
| throw new RuntimeException(e); |
| } |
| } |
| |
| } |
| |
| public void binding(String exchangeName, String queueName, String bindingKey, ByteBuffer buf) |
| { |
| _actions = new ArrayList<ProcessAction>(); |
| try |
| { |
| Exchange exchange = _virtualHost.getExchangeRegistry().getExchange(exchangeName); |
| if (exchange == null) |
| { |
| _logger.error("Unknown exchange: " + exchangeName + ", cannot bind queue : " + queueName); |
| return; |
| } |
| |
| AMQQueue queue = _virtualHost.getQueueRegistry().getQueue(new AMQShortString(queueName)); |
| if (queue == null) |
| { |
| _logger.error("Unknown queue: " + queueName + ", cannot be bound to exchange: " + exchangeName); |
| } |
| else |
| { |
| FieldTable argumentsFT = null; |
| if(buf != null) |
| { |
| try |
| { |
| argumentsFT = new FieldTable(new DataInputStream(new ByteBufferInputStream(buf)),buf.limit()); |
| } |
| catch (IOException e) |
| { |
| throw new RuntimeException("IOException should not be thrown here", e); |
| } |
| } |
| |
| BindingFactory bf = _virtualHost.getBindingFactory(); |
| |
| Map<String, Object> argumentMap = FieldTable.convertToMap(argumentsFT); |
| |
| if(bf.getBinding(bindingKey, queue, exchange, argumentMap) == null) |
| { |
| |
| _logger.info("Restoring binding: (Exchange: " + exchange.getNameShortString() + ", Queue: " + queueName |
| + ", Routing Key: " + bindingKey + ", Arguments: " + argumentsFT + ")"); |
| |
| bf.restoreBinding(bindingKey, queue, exchange, argumentMap); |
| } |
| } |
| } |
| catch (AMQException e) |
| { |
| throw new RuntimeException(e); |
| } |
| |
| } |
| |
| public BrokerLinkRecoveryHandler completeBindingRecovery() |
| { |
| return this; |
| } |
| |
| public void complete() |
| { |
| |
| |
| } |
| |
| public void queueEntry(final String queueName, long messageId) |
| { |
| AMQShortString queueNameShortString = new AMQShortString(queueName); |
| |
| AMQQueue queue = _virtualHost.getQueueRegistry().getQueue(queueNameShortString); |
| |
| try |
| { |
| if(queue != null) |
| { |
| ServerMessage message = _recoveredMessages.get(messageId); |
| _unusedMessages.remove(messageId); |
| |
| if(message != null) |
| { |
| |
| |
| if (_logger.isDebugEnabled()) |
| { |
| _logger.debug("On recovery, delivering " + message.getMessageNumber() + " to " + queue.getNameShortString()); |
| } |
| |
| Integer count = _queueRecoveries.get(queueName); |
| if (count == null) |
| { |
| count = 0; |
| } |
| |
| queue.enqueue(message); |
| |
| _queueRecoveries.put(queueName, ++count); |
| } |
| else |
| { |
| _logger.warn("Message id " + messageId + " referenced in log as enqueued in queue " + queue.getNameShortString() + " is unknown, entry will be discarded"); |
| MessageStore.Transaction txn = _store.newTransaction(); |
| txn.dequeueMessage(queue, new DummyMessage(messageId)); |
| txn.commitTranAsync(); |
| } |
| } |
| else |
| { |
| _logger.warn("Message id " + messageId + " in log references queue " + queueName + " which is not in the configuration, entry will be discarded"); |
| MessageStore.Transaction txn = _store.newTransaction(); |
| TransactionLogResource mockQueue = |
| new TransactionLogResource() |
| { |
| |
| public String getResourceName() |
| { |
| return queueName; |
| } |
| }; |
| txn.dequeueMessage(mockQueue, new DummyMessage(messageId)); |
| txn.commitTranAsync(); |
| } |
| |
| } |
| catch(AMQException e) |
| { |
| throw new RuntimeException(e); |
| } |
| |
| |
| |
| } |
| |
| public DtxRecordRecoveryHandler completeQueueEntryRecovery() |
| { |
| |
| for(Map.Entry<String,Integer> entry : _queueRecoveries.entrySet()) |
| { |
| CurrentActor.get().message(_logSubject, TransactionLogMessages.RECOVERED(entry.getValue(), entry.getKey())); |
| |
| CurrentActor.get().message(_logSubject, TransactionLogMessages.RECOVERY_COMPLETE(entry.getKey(), true)); |
| } |
| |
| |
| |
| return this; |
| } |
| |
| private static class DummyMessage implements EnqueableMessage |
| { |
| |
| |
| private final long _messageId; |
| |
| public DummyMessage(long messageId) |
| { |
| _messageId = messageId; |
| } |
| |
| public long getMessageNumber() |
| { |
| return _messageId; |
| } |
| |
| |
| public boolean isPersistent() |
| { |
| return true; |
| } |
| |
| |
| public StoredMessage getStoredMessage() |
| { |
| return null; |
| } |
| } |
| |
| private class BridgeRecoveryHandlerImpl implements BridgeRecoveryHandler |
| { |
| private final BrokerLink _blink; |
| |
| public BridgeRecoveryHandlerImpl(final BrokerLink blink) |
| { |
| _blink = blink; |
| } |
| |
| public void bridge(final UUID id, final long createTime, final Map<String, String> arguments) |
| { |
| _blink.createBridge(id, createTime, arguments); |
| } |
| |
| public void completeBridgeRecoveryForLink() |
| { |
| } |
| } |
| } |