| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.activemq.artemis.tests.integration.client; |
| |
| import javax.transaction.xa.Xid; |
| import java.io.File; |
| import java.lang.invoke.MethodHandles; |
| import java.lang.management.ManagementFactory; |
| import java.nio.ByteBuffer; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.concurrent.Executor; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.function.Consumer; |
| |
| import org.apache.activemq.artemis.api.core.ActiveMQBuffer; |
| import org.apache.activemq.artemis.api.core.ActiveMQException; |
| import org.apache.activemq.artemis.api.core.Message; |
| import org.apache.activemq.artemis.api.core.Pair; |
| import org.apache.activemq.artemis.api.core.QueueConfiguration; |
| import org.apache.activemq.artemis.api.core.RoutingType; |
| import org.apache.activemq.artemis.api.core.SimpleString; |
| import org.apache.activemq.artemis.api.core.client.ClientConsumer; |
| import org.apache.activemq.artemis.api.core.client.ClientMessage; |
| import org.apache.activemq.artemis.api.core.client.ClientProducer; |
| import org.apache.activemq.artemis.api.core.client.ClientSession; |
| import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; |
| import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler; |
| import org.apache.activemq.artemis.api.core.client.ServerLocator; |
| import org.apache.activemq.artemis.core.config.Configuration; |
| import org.apache.activemq.artemis.core.config.impl.SecurityConfiguration; |
| import org.apache.activemq.artemis.core.io.IOCallback; |
| import org.apache.activemq.artemis.core.io.SequentialFile; |
| import org.apache.activemq.artemis.core.journal.Journal; |
| import org.apache.activemq.artemis.core.journal.JournalLoadInformation; |
| import org.apache.activemq.artemis.core.journal.RecordInfo; |
| import org.apache.activemq.artemis.core.paging.PageTransactionInfo; |
| import org.apache.activemq.artemis.core.paging.PagedMessage; |
| import org.apache.activemq.artemis.core.paging.PagingManager; |
| import org.apache.activemq.artemis.core.paging.PagingStore; |
| import org.apache.activemq.artemis.core.paging.cursor.PagePosition; |
| import org.apache.activemq.artemis.core.persistence.AddressBindingInfo; |
| import org.apache.activemq.artemis.core.persistence.GroupingInfo; |
| import org.apache.activemq.artemis.core.persistence.OperationContext; |
| import org.apache.activemq.artemis.core.persistence.QueueBindingInfo; |
| import org.apache.activemq.artemis.core.persistence.AddressQueueStatus; |
| import org.apache.activemq.artemis.core.persistence.StorageManager; |
| import org.apache.activemq.artemis.core.persistence.config.AbstractPersistedAddressSetting; |
| import org.apache.activemq.artemis.core.persistence.config.PersistedAddressSettingJSON; |
| import org.apache.activemq.artemis.core.persistence.config.PersistedBridgeConfiguration; |
| import org.apache.activemq.artemis.core.persistence.config.PersistedConnector; |
| import org.apache.activemq.artemis.core.persistence.config.PersistedDivertConfiguration; |
| import org.apache.activemq.artemis.core.persistence.config.PersistedKeyValuePair; |
| import org.apache.activemq.artemis.core.persistence.config.PersistedRole; |
| import org.apache.activemq.artemis.core.persistence.config.PersistedSecuritySetting; |
| import org.apache.activemq.artemis.core.persistence.config.PersistedUser; |
| import org.apache.activemq.artemis.core.persistence.impl.PageCountPending; |
| import org.apache.activemq.artemis.core.postoffice.Binding; |
| import org.apache.activemq.artemis.core.postoffice.PostOffice; |
| import org.apache.activemq.artemis.core.replication.ReplicationManager; |
| import org.apache.activemq.artemis.core.server.ActiveMQServer; |
| import org.apache.activemq.artemis.core.server.LargeServerMessage; |
| import org.apache.activemq.artemis.core.server.MessageReference; |
| import org.apache.activemq.artemis.core.server.RouteContextList; |
| import org.apache.activemq.artemis.core.server.files.FileStoreMonitor; |
| import org.apache.activemq.artemis.core.server.group.impl.GroupBinding; |
| import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl; |
| import org.apache.activemq.artemis.core.server.impl.AddressInfo; |
| import org.apache.activemq.artemis.core.server.impl.JournalLoader; |
| import org.apache.activemq.artemis.core.transaction.ResourceManager; |
| import org.apache.activemq.artemis.core.transaction.Transaction; |
| import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; |
| import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager; |
| import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager; |
| import org.apache.activemq.artemis.spi.core.security.jaas.InVMLoginModule; |
| import org.apache.activemq.artemis.tests.util.SpawnedTestBase; |
| import org.apache.activemq.artemis.utils.ArtemisCloseable; |
| import org.apache.activemq.artemis.utils.SpawnedVMSupport; |
| import org.apache.activemq.artemis.tests.util.Wait; |
| import org.junit.After; |
| import org.junit.Assert; |
| import org.junit.Before; |
| import org.junit.Test; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| public class SendAckFailTest extends SpawnedTestBase { |
| |
| private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); |
| |
| @Before |
| @After |
| public void deleteDirectory() throws Exception { |
| deleteDirectory(new File("./target/send-ack")); |
| } |
| |
| @Override |
| public String getJournalDir(final int index, final boolean backup) { |
| return "./target/send-ack/journal"; |
| } |
| |
| @Override |
| protected String getBindingsDir(final int index, final boolean backup) { |
| return "./target/send-ack/binding"; |
| } |
| |
| @Override |
| protected String getPageDir(final int index, final boolean backup) { |
| return "./target/send-ack/page"; |
| } |
| |
| @Override |
| protected String getLargeMessagesDir(final int index, final boolean backup) { |
| return "./target/send-ack/large-message"; |
| } |
| |
| @Test |
| public void testSend() throws Exception { |
| Process process = SpawnedVMSupport.spawnVM(SendAckFailTest.class.getName()); |
| ActiveMQServer server = null; |
| |
| try { |
| |
| HashSet<Integer> listSent = new HashSet<>(); |
| |
| Thread t = null; |
| { |
| ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(); |
| ServerLocator locator = factory.getServerLocator(); |
| |
| locator.setConfirmationWindowSize(0).setInitialConnectAttempts(10000).setRetryInterval(10).setBlockOnDurableSend(false).setReconnectAttempts(0); |
| |
| ClientSessionFactory sf = locator.createSessionFactory(); |
| |
| ClientSession session = sf.createSession(); |
| session.createAddress(SimpleString.toSimpleString("T1"), RoutingType.ANYCAST, true); |
| session.createQueue(new QueueConfiguration("T1").setRoutingType(RoutingType.ANYCAST)); |
| |
| ClientProducer producer = session.createProducer("T1"); |
| |
| session.setSendAcknowledgementHandler(new SendAcknowledgementHandler() { |
| @Override |
| public void sendAcknowledged(Message message) { |
| listSent.add(message.getIntProperty("myid")); |
| } |
| }); |
| |
| t = new Thread() { |
| @Override |
| public void run() { |
| for (int i = 0; i < 5000; i++) { |
| try { |
| producer.send(session.createMessage(true).putIntProperty("myid", i)); |
| } catch (Exception e) { |
| e.printStackTrace(); |
| break; |
| } |
| } |
| } |
| }; |
| t.start(); |
| } |
| |
| Wait.waitFor(() -> listSent.size() > 100, 5000, 10); |
| |
| Assert.assertTrue(process.waitFor(1, TimeUnit.MINUTES)); |
| |
| server = startServer(false); |
| |
| { |
| ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(); |
| ServerLocator locator = factory.getServerLocator(); |
| |
| ClientSessionFactory sf = locator.createSessionFactory(); |
| |
| ClientSession session = sf.createSession(); |
| |
| ClientConsumer consumer = session.createConsumer("T1"); |
| |
| session.start(); |
| |
| for (int i = 0; i < listSent.size(); i++) { |
| ClientMessage message = consumer.receive(1000); |
| if (message == null) { |
| for (Integer msgi : listSent) { |
| logger.debug("Message {} was lost", msgi); |
| } |
| fail("missed messages!"); |
| } |
| message.acknowledge(); |
| |
| if (!listSent.remove(message.getIntProperty("myid"))) { |
| logger.debug("Message {} with id {} received in duplicate", message, message.getIntProperty("myid")); |
| fail("Message " + message + " with id " + message.getIntProperty("myid") + " received in duplicate"); |
| } |
| } |
| } |
| |
| } finally { |
| if (process != null) { |
| process.destroy(); |
| } |
| if (server != null) { |
| server.stop(); |
| } |
| } |
| } |
| |
| public static void main(String[] arg) { |
| SendAckFailTest test = new SendAckFailTest(); |
| test.startServer(true); |
| } |
| |
| public ActiveMQServer startServer(boolean fail) { |
| try { |
| AtomicInteger count = new AtomicInteger(0); |
| |
| ActiveMQSecurityManager securityManager = new ActiveMQJAASSecurityManager(InVMLoginModule.class.getName(), new SecurityConfiguration()); |
| |
| Configuration configuration = createDefaultConfig(true); |
| |
| |
| if (fail) { |
| new Thread() { |
| @Override |
| public void run() { |
| try { |
| |
| // this is a protection, if the process is left forgoten for any amount of time, |
| // this will kill it |
| // This is to avoid rogue processes on the CI |
| Thread.sleep(10000); |
| System.err.println("Halting process, protecting the CI from rogue processes"); |
| Runtime.getRuntime().halt(-1); |
| } catch (Throwable e) { |
| } |
| } |
| }.start(); |
| } |
| |
| ActiveMQServer server = new ActiveMQServerImpl(configuration, ManagementFactory.getPlatformMBeanServer(), securityManager) { |
| @Override |
| public StorageManager createStorageManager() { |
| StorageManager original = super.createStorageManager(); |
| |
| return new StorageManagerDelegate(original) { |
| @Override |
| public void storeMessage(Message message) throws Exception { |
| |
| if (fail) { |
| if (count.incrementAndGet() == 110) { |
| Thread.sleep(100); |
| Runtime.getRuntime().halt(-1); |
| } |
| } |
| super.storeMessage(message); |
| |
| } |
| }; |
| |
| } |
| }; |
| |
| |
| |
| logger.debug("Location::{}", server.getConfiguration().getJournalLocation().getAbsolutePath()); |
| addServer(server); |
| server.start(); |
| return server; |
| } catch (Exception e) { |
| e.printStackTrace(); |
| return null; |
| } |
| } |
| |
| |
| private class StorageManagerDelegate implements StorageManager { |
| |
| @Override |
| public void start() throws Exception { |
| manager.start(); |
| } |
| |
| @Override |
| public LargeServerMessage onLargeMessageCreate(long id, LargeServerMessage largeMessage) throws Exception { |
| return manager.onLargeMessageCreate(id, largeMessage); |
| } |
| |
| @Override |
| public AbstractPersistedAddressSetting recoverAddressSettings(SimpleString address) { |
| return null; |
| } |
| |
| @Override |
| public void stop() throws Exception { |
| manager.stop(); |
| } |
| |
| @Override |
| public void asyncCommit(long txID) throws Exception { |
| |
| } |
| |
| @Override |
| public void updateQueueBinding(long tx, Binding binding) throws Exception { |
| manager.updateQueueBinding(tx, binding); |
| } |
| |
| @Override |
| public void storeAddressSetting(PersistedAddressSettingJSON addressSetting) throws Exception { |
| } |
| |
| @Override |
| public boolean isStarted() { |
| return manager.isStarted(); |
| } |
| |
| @Override |
| public long generateID() { |
| return manager.generateID(); |
| } |
| |
| @Override |
| public long getCurrentID() { |
| return manager.getCurrentID(); |
| } |
| |
| @Override |
| public void criticalError(Throwable error) { |
| manager.criticalError(error); |
| } |
| |
| @Override |
| public OperationContext getContext() { |
| return manager.getContext(); |
| } |
| |
| @Override |
| public void lineUpContext() { |
| manager.lineUpContext(); |
| } |
| |
| @Override |
| public OperationContext newContext(Executor executor) { |
| return manager.newContext(executor); |
| } |
| |
| @Override |
| public OperationContext newSingleThreadContext() { |
| return manager.newSingleThreadContext(); |
| } |
| |
| @Override |
| public void setContext(OperationContext context) { |
| manager.setContext(context); |
| } |
| |
| @Override |
| public void stop(boolean ioCriticalError, boolean sendFailover) throws Exception { |
| manager.stop(ioCriticalError, sendFailover); |
| } |
| |
| @Override |
| public void pageClosed(SimpleString storeName, long pageNumber) { |
| manager.pageClosed(storeName, pageNumber); |
| } |
| |
| @Override |
| public void pageDeleted(SimpleString storeName, long pageNumber) { |
| manager.pageDeleted(storeName, pageNumber); |
| } |
| |
| @Override |
| public void pageWrite(PagedMessage message, long pageNumber) { |
| manager.pageWrite(message, pageNumber); |
| } |
| |
| @Override |
| public void afterCompleteOperations(IOCallback run) { |
| manager.afterCompleteOperations(run); |
| } |
| |
| @Override |
| public void afterStoreOperations(IOCallback run) { |
| manager.afterStoreOperations(run); |
| } |
| |
| @Override |
| public boolean waitOnOperations(long timeout) throws Exception { |
| return manager.waitOnOperations(timeout); |
| } |
| |
| @Override |
| public void waitOnOperations() throws Exception { |
| manager.waitOnOperations(); |
| } |
| |
| @Override |
| public ByteBuffer allocateDirectBuffer(int size) { |
| return manager.allocateDirectBuffer(size); |
| } |
| |
| @Override |
| public void freeDirectBuffer(ByteBuffer buffer) { |
| manager.freeDirectBuffer(buffer); |
| } |
| |
| @Override |
| public void clearContext() { |
| manager.clearContext(); |
| } |
| |
| @Override |
| public void confirmPendingLargeMessageTX(Transaction transaction, |
| long messageID, |
| long recordID) throws Exception { |
| manager.confirmPendingLargeMessageTX(transaction, messageID, recordID); |
| } |
| |
| @Override |
| public void confirmPendingLargeMessage(long recordID) throws Exception { |
| manager.confirmPendingLargeMessage(recordID); |
| } |
| |
| @Override |
| public void storeMessage(Message message) throws Exception { |
| manager.storeMessage(message); |
| } |
| |
| @Override |
| public void storeReference(long queueID, long messageID, boolean last) throws Exception { |
| manager.storeReference(queueID, messageID, last); |
| } |
| |
| @Override |
| public void deleteMessage(long messageID) throws Exception { |
| manager.deleteMessage(messageID); |
| } |
| |
| @Override |
| public void storeAcknowledge(long queueID, long messageID) throws Exception { |
| manager.storeAcknowledge(queueID, messageID); |
| } |
| |
| @Override |
| public void storeCursorAcknowledge(long queueID, PagePosition position) throws Exception { |
| manager.storeCursorAcknowledge(queueID, position); |
| } |
| |
| @Override |
| public void updateDeliveryCount(MessageReference ref) throws Exception { |
| manager.updateDeliveryCount(ref); |
| } |
| |
| @Override |
| public void updateScheduledDeliveryTime(MessageReference ref) throws Exception { |
| manager.updateScheduledDeliveryTime(ref); |
| } |
| |
| @Override |
| public void storeDuplicateID(SimpleString address, byte[] duplID, long recordID) throws Exception { |
| manager.storeDuplicateID(address, duplID, recordID); |
| } |
| |
| @Override |
| public void deleteDuplicateID(long recordID) throws Exception { |
| manager.deleteDuplicateID(recordID); |
| } |
| |
| @Override |
| public void storeMessageTransactional(long txID, Message message) throws Exception { |
| manager.storeMessageTransactional(txID, message); |
| } |
| |
| @Override |
| public void storeReferenceTransactional(long txID, long queueID, long messageID) throws Exception { |
| manager.storeReferenceTransactional(txID, queueID, messageID); |
| } |
| |
| @Override |
| public void storeAcknowledgeTransactional(long txID, long queueID, long messageID) throws Exception { |
| manager.storeAcknowledgeTransactional(txID, queueID, messageID); |
| } |
| |
| @Override |
| public void storeCursorAcknowledgeTransactional(long txID, long queueID, PagePosition position) throws Exception { |
| manager.storeCursorAcknowledgeTransactional(txID, queueID, position); |
| } |
| |
| @Override |
| public void deleteCursorAcknowledgeTransactional(long txID, long ackID) throws Exception { |
| manager.deleteCursorAcknowledgeTransactional(txID, ackID); |
| } |
| |
| @Override |
| public void deleteCursorAcknowledge(long ackID) throws Exception { |
| manager.deleteCursorAcknowledge(ackID); |
| } |
| |
| @Override |
| public void storePageCompleteTransactional(long txID, long queueID, PagePosition position) throws Exception { |
| manager.storePageCompleteTransactional(txID, queueID, position); |
| } |
| |
| @Override |
| public void deletePageComplete(long ackID) throws Exception { |
| manager.deletePageComplete(ackID); |
| } |
| |
| @Override |
| public void updateScheduledDeliveryTimeTransactional(long txID, MessageReference ref) throws Exception { |
| manager.updateScheduledDeliveryTimeTransactional(txID, ref); |
| } |
| |
| @Override |
| public void storeDuplicateIDTransactional(long txID, |
| SimpleString address, |
| byte[] duplID, |
| long recordID) throws Exception { |
| manager.storeDuplicateIDTransactional(txID, address, duplID, recordID); |
| } |
| |
| @Override |
| public void updateDuplicateIDTransactional(long txID, |
| SimpleString address, |
| byte[] duplID, |
| long recordID) throws Exception { |
| manager.updateDuplicateIDTransactional(txID, address, duplID, recordID); |
| } |
| |
| @Override |
| public void deleteDuplicateIDTransactional(long txID, long recordID) throws Exception { |
| manager.deleteDuplicateIDTransactional(txID, recordID); |
| } |
| |
| @Override |
| public LargeServerMessage createCoreLargeMessage() { |
| return manager.createCoreLargeMessage(); |
| } |
| |
| @Override |
| public LargeServerMessage createCoreLargeMessage(long id, Message message) throws Exception { |
| return manager.createCoreLargeMessage(id, message); |
| } |
| |
| @Override |
| public SequentialFile createFileForLargeMessage(long messageID, LargeMessageExtension extension) { |
| return manager.createFileForLargeMessage(messageID, extension); |
| } |
| |
| @Override |
| public void prepare(long txID, Xid xid) throws Exception { |
| manager.prepare(txID, xid); |
| } |
| |
| @Override |
| public void commit(long txID) throws Exception { |
| manager.commit(txID); |
| } |
| |
| @Override |
| public void commit(long txID, boolean lineUpContext) throws Exception { |
| manager.commit(txID, lineUpContext); |
| } |
| |
| @Override |
| public void rollback(long txID) throws Exception { |
| manager.rollback(txID); |
| } |
| |
| @Override |
| public void rollbackBindings(long txID) throws Exception { |
| manager.rollbackBindings(txID); |
| } |
| |
| @Override |
| public void commitBindings(long txID) throws Exception { |
| manager.commitBindings(txID); |
| } |
| |
| @Override |
| public void storePageTransaction(long txID, PageTransactionInfo pageTransaction) throws Exception { |
| manager.storePageTransaction(txID, pageTransaction); |
| } |
| |
| @Override |
| public void updatePageTransaction(long txID, PageTransactionInfo pageTransaction, int depage) throws Exception { |
| manager.updatePageTransaction(txID, pageTransaction, depage); |
| } |
| |
| @Override |
| public void deletePageTransactional(long recordID) throws Exception { |
| manager.deletePageTransactional(recordID); |
| } |
| |
| @Override |
| public JournalLoadInformation loadMessageJournal(PostOffice postOffice, |
| PagingManager pagingManager, |
| ResourceManager resourceManager, |
| Map<Long, QueueBindingInfo> queueInfos, |
| Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap, |
| Set<Pair<Long, Long>> pendingLargeMessages, |
| Set<Long> largeMessagesInFolder, |
| List<PageCountPending> pendingNonTXPageCounter, |
| JournalLoader journalLoader, |
| List<Consumer<RecordInfo>> extraLoaders) throws Exception { |
| return manager.loadMessageJournal(postOffice, pagingManager, resourceManager, queueInfos, duplicateIDMap, pendingLargeMessages, largeMessagesInFolder, pendingNonTXPageCounter, journalLoader, extraLoaders); |
| } |
| |
| @Override |
| public long storeHeuristicCompletion(Xid xid, boolean isCommit) throws Exception { |
| return manager.storeHeuristicCompletion(xid, isCommit); |
| } |
| |
| @Override |
| public void deleteHeuristicCompletion(long id) throws Exception { |
| manager.deleteHeuristicCompletion(id); |
| } |
| |
| @Override |
| public void addQueueBinding(long tx, Binding binding) throws Exception { |
| manager.addQueueBinding(tx, binding); |
| } |
| |
| @Override |
| public void deleteQueueBinding(long tx, long queueBindingID) throws Exception { |
| manager.deleteQueueBinding(tx, queueBindingID); |
| } |
| |
| @Override |
| public long storeQueueStatus(long queueID, AddressQueueStatus status) throws Exception { |
| return manager.storeQueueStatus(queueID, status); |
| } |
| |
| @Override |
| public void deleteQueueStatus(long recordID) throws Exception { |
| manager.deleteQueueStatus(recordID); |
| } |
| |
| @Override |
| public long storeAddressStatus(long addressID, AddressQueueStatus status) throws Exception { |
| return manager.storeAddressStatus(addressID, status); |
| } |
| |
| |
| @Override |
| public void deleteAddressStatus(long recordID) throws Exception { |
| manager.deleteAddressStatus(recordID); |
| } |
| |
| @Override |
| public void addAddressBinding(long tx, AddressInfo addressInfo) throws Exception { |
| manager.addAddressBinding(tx, addressInfo); |
| } |
| |
| @Override |
| public void deleteAddressBinding(long tx, long addressBindingID) throws Exception { |
| manager.deleteAddressBinding(tx, addressBindingID); |
| } |
| |
| @Override |
| public JournalLoadInformation loadBindingJournal(List<QueueBindingInfo> queueBindingInfos, |
| List<GroupingInfo> groupingInfos, |
| List<AddressBindingInfo> addressBindingInfos) throws Exception { |
| return manager.loadBindingJournal(queueBindingInfos, groupingInfos, addressBindingInfos); |
| } |
| |
| @Override |
| public void addGrouping(GroupBinding groupBinding) throws Exception { |
| manager.addGrouping(groupBinding); |
| } |
| |
| @Override |
| public void deleteGrouping(long tx, GroupBinding groupBinding) throws Exception { |
| manager.deleteGrouping(tx, groupBinding); |
| } |
| |
| @Override |
| public void deleteAddressSetting(SimpleString addressMatch) throws Exception { |
| manager.deleteAddressSetting(addressMatch); |
| } |
| |
| @Override |
| public List<AbstractPersistedAddressSetting> recoverAddressSettings() throws Exception { |
| return manager.recoverAddressSettings(); |
| } |
| |
| @Override |
| public void storeSecuritySetting(PersistedSecuritySetting persistedRoles) throws Exception { |
| manager.storeSecuritySetting(persistedRoles); |
| } |
| |
| @Override |
| public void deleteSecuritySetting(SimpleString addressMatch) throws Exception { |
| manager.deleteSecuritySetting(addressMatch); |
| } |
| |
| @Override |
| public List<PersistedSecuritySetting> recoverSecuritySettings() throws Exception { |
| return manager.recoverSecuritySettings(); |
| } |
| |
| @Override |
| public void storeDivertConfiguration(PersistedDivertConfiguration persistedDivertConfiguration) throws Exception { |
| |
| } |
| |
| @Override |
| public void deleteDivertConfiguration(String divertName) throws Exception { |
| |
| } |
| |
| @Override |
| public List<PersistedDivertConfiguration> recoverDivertConfigurations() { |
| return null; |
| } |
| |
| @Override |
| public void storeBridgeConfiguration(PersistedBridgeConfiguration persistedBridgeConfiguration) throws Exception { |
| } |
| |
| @Override |
| public void deleteBridgeConfiguration(String bridgeName) throws Exception { |
| } |
| |
| @Override |
| public List<PersistedBridgeConfiguration> recoverBridgeConfigurations() { |
| return null; |
| } |
| |
| @Override |
| public void storeConnector(PersistedConnector persistedConnector) throws Exception { |
| } |
| |
| @Override |
| public void deleteConnector(String connectorName) throws Exception { |
| } |
| |
| @Override |
| public List<PersistedConnector> recoverConnectors() { |
| return null; |
| } |
| |
| @Override |
| public void storeUser(PersistedUser persistedUser) throws Exception { |
| manager.storeUser(persistedUser); |
| } |
| |
| @Override |
| public void deleteUser(String username) throws Exception { |
| manager.deleteUser(username); |
| } |
| |
| @Override |
| public Map<String, PersistedUser> getPersistedUsers() { |
| return manager.getPersistedUsers(); |
| } |
| |
| @Override |
| public void storeRole(PersistedRole persistedRole) throws Exception { |
| manager.storeRole(persistedRole); |
| } |
| |
| @Override |
| public void deleteRole(String role) throws Exception { |
| manager.deleteRole(role); |
| } |
| |
| @Override |
| public Map<String, PersistedRole> getPersistedRoles() { |
| return manager.getPersistedRoles(); |
| } |
| |
| @Override |
| public void storeKeyValuePair(PersistedKeyValuePair persistedKeyValuePair) throws Exception { |
| manager.storeKeyValuePair(persistedKeyValuePair); |
| } |
| |
| @Override |
| public void deleteKeyValuePair(String mapId, String key) throws Exception { |
| manager.deleteKeyValuePair(mapId, key); |
| } |
| |
| @Override |
| public Map<String, PersistedKeyValuePair> getPersistedKeyValuePairs(String mapId) { |
| return manager.getPersistedKeyValuePairs(mapId); |
| } |
| |
| @Override |
| public long storePageCounter(long txID, long queueID, long value, long size) throws Exception { |
| return manager.storePageCounter(txID, queueID, value, size); |
| } |
| |
| @Override |
| public long storePendingCounter(long queueID, long pageID) throws Exception { |
| return manager.storePendingCounter(queueID, pageID); |
| } |
| |
| @Override |
| public void deleteIncrementRecord(long txID, long recordID) throws Exception { |
| manager.deleteIncrementRecord(txID, recordID); |
| } |
| |
| @Override |
| public void deletePageCounter(long txID, long recordID) throws Exception { |
| manager.deletePageCounter(txID, recordID); |
| } |
| |
| @Override |
| public void deletePendingPageCounter(long txID, long recordID) throws Exception { |
| manager.deletePendingPageCounter(txID, recordID); |
| } |
| |
| @Override |
| public long storePageCounterInc(long txID, long queueID, int add, long size) throws Exception { |
| return manager.storePageCounterInc(txID, queueID, add, size); |
| } |
| |
| @Override |
| public long storePageCounterInc(long queueID, int add, long size) throws Exception { |
| return manager.storePageCounterInc(queueID, add, size); |
| } |
| |
| @Override |
| public Journal getBindingsJournal() { |
| return manager.getBindingsJournal(); |
| } |
| |
| @Override |
| public Journal getMessageJournal() { |
| return manager.getMessageJournal(); |
| } |
| |
| @Override |
| public void startReplication(ReplicationManager replicationManager, |
| PagingManager pagingManager, |
| String nodeID, |
| boolean autoFailBack, |
| long initialReplicationSyncTimeout) throws Exception { |
| manager.startReplication(replicationManager, pagingManager, nodeID, autoFailBack, initialReplicationSyncTimeout); |
| } |
| |
| @Override |
| public boolean addToPage(PagingStore store, |
| Message msg, |
| Transaction tx, |
| RouteContextList listCtx) throws Exception { |
| return manager.addToPage(store, msg, tx, listCtx); |
| } |
| |
| @Override |
| public void stopReplication() { |
| manager.stopReplication(); |
| } |
| |
| @Override |
| public void addBytesToLargeMessage(SequentialFile appendFile, long messageID, byte[] bytes) throws Exception { |
| manager.addBytesToLargeMessage(appendFile, messageID, bytes); |
| } |
| |
| @Override |
| public void storeID(long journalID, long id) throws Exception { |
| manager.storeID(journalID, id); |
| } |
| |
| @Override |
| public void deleteID(long journalD) throws Exception { |
| manager.deleteID(journalD); |
| } |
| |
| @Override |
| public ArtemisCloseable closeableReadLock() { |
| return manager.closeableReadLock(); |
| } |
| |
| @Override |
| public void persistIdGenerator() { |
| manager.persistIdGenerator(); |
| } |
| |
| @Override |
| public void injectMonitor(FileStoreMonitor monitor) throws Exception { |
| manager.injectMonitor(monitor); |
| } |
| |
| @Override |
| public void deleteLargeMessageBody(LargeServerMessage largeServerMessage) throws ActiveMQException { |
| manager.deleteLargeMessageBody(largeServerMessage); |
| } |
| |
| @Override |
| public void largeMessageClosed(LargeServerMessage largeServerMessage) throws ActiveMQException { |
| manager.largeMessageClosed(largeServerMessage); |
| } |
| |
| @Override |
| public void addBytesToLargeMessage(SequentialFile file, long messageId, ActiveMQBuffer bytes) throws Exception { |
| manager.addBytesToLargeMessage(file, messageId, bytes); |
| } |
| |
| private final StorageManager manager; |
| |
| StorageManagerDelegate(StorageManager manager) { |
| this.manager = manager; |
| } |
| } |
| |
| } |