blob: 81ca168588e42750af872494f4dda9fdafe10c25 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.persistence;
import javax.transaction.xa.Xid;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
import org.apache.activemq.artemis.core.journal.Journal;
import org.apache.activemq.artemis.core.journal.JournalLoadInformation;
import org.apache.activemq.artemis.core.journal.RecordInfo;
import org.apache.activemq.artemis.core.paging.PageTransactionInfo;
import org.apache.activemq.artemis.core.paging.PagedMessage;
import org.apache.activemq.artemis.core.paging.PagingManager;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.paging.cursor.PagePosition;
import org.apache.activemq.artemis.core.persistence.config.AbstractPersistedAddressSetting;
import org.apache.activemq.artemis.core.persistence.config.PersistedAddressSettingJSON;
import org.apache.activemq.artemis.core.persistence.config.PersistedBridgeConfiguration;
import org.apache.activemq.artemis.core.persistence.config.PersistedConnector;
import org.apache.activemq.artemis.core.persistence.config.PersistedDivertConfiguration;
import org.apache.activemq.artemis.core.persistence.config.PersistedKeyValuePair;
import org.apache.activemq.artemis.core.persistence.config.PersistedRole;
import org.apache.activemq.artemis.core.persistence.config.PersistedSecuritySetting;
import org.apache.activemq.artemis.core.persistence.config.PersistedUser;
import org.apache.activemq.artemis.core.persistence.impl.PageCountPending;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.PostOffice;
import org.apache.activemq.artemis.core.replication.ReplicationManager;
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.core.server.LargeServerMessage;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.RouteContextList;
import org.apache.activemq.artemis.core.server.files.FileStoreMonitor;
import org.apache.activemq.artemis.core.server.group.impl.GroupBinding;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.impl.JournalLoader;
import org.apache.activemq.artemis.core.transaction.ResourceManager;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.utils.ArtemisCloseable;
import org.apache.activemq.artemis.utils.IDGenerator;
/**
* A StorageManager
*
* Note about IDGEnerator
*
* I've changed StorageManager to extend IDGenerator, because in some places
* all we needed from the StorageManager was the idGeneration.
* I couldn't just get the IDGenerator from the inner part because the NullPersistent has its own sequence.
* So the best was to add the interface and adjust the callers for the method
*/
public interface StorageManager extends IDGenerator, ActiveMQComponent {
default long getMaxRecordSize() {
/** Null journal is pretty much memory */
return Long.MAX_VALUE;
}
default long getWarningRecordSize() {
/** Null journal is pretty much memory */
return Long.MAX_VALUE;
}
default void recoverLargeMessagesOnFolder(Set<Long> files) throws Exception {
}
default SequentialFileFactory getJournalSequentialFileFactory() {
return null;
}
void criticalError(Throwable error);
/**
* Get the context associated with the thread for later reuse
*/
OperationContext getContext();
void lineUpContext();
/**
* It just creates an OperationContext without associating it
*/
OperationContext newContext(Executor executor);
OperationContext newSingleThreadContext();
/**
* Set the context back to the thread
*/
void setContext(OperationContext context);
/**
*
* @param ioCriticalError is the server being stopped due to an IO critical error.
* @param sendFailover this is to send the replication stopping in case of replication.
* @throws Exception
*/
void stop(boolean ioCriticalError, boolean sendFailover) throws Exception;
// Message related operations
void pageClosed(SimpleString storeName, long pageNumber);
void pageDeleted(SimpleString storeName, long pageNumber);
void pageWrite(PagedMessage message, long pageNumber);
void afterCompleteOperations(IOCallback run);
/**
* This is similar to afterComplete, however this only cares about the journal part.
*/
void afterStoreOperations(IOCallback run);
/**
* Block until the operations are done.
* Warning: Don't use it inside an ordered executor, otherwise the system may lock up
* in case of the pools are full
*
* @throws Exception
*/
boolean waitOnOperations(long timeout) throws Exception;
/**
* Block until the operations are done.
* Warning: Don't use it inside an ordered executor, otherwise the system may lock up
* in case of the pools are full
*
* @throws Exception
*/
void waitOnOperations() throws Exception;
/**
* AIO has an optimized buffer which has a method to release it
* instead of the way NIO will release data based on GC.
* These methods will use that buffer if the inner method supports it
*/
ByteBuffer allocateDirectBuffer(int size);
/**
* AIO has an optimized buffer which has a method to release it
* instead of the way NIO will release data based on GC.
* These methods will use that buffer if the inner method supports it
*/
void freeDirectBuffer(ByteBuffer buffer);
void clearContext();
/**
* Confirms that a large message was finished
*/
void confirmPendingLargeMessageTX(Transaction transaction, long messageID, long recordID) throws Exception;
/**
* Confirms that a large message was finished
*/
void confirmPendingLargeMessage(long recordID) throws Exception;
void storeMessage(Message message) throws Exception;
void storeReference(long queueID, long messageID, boolean last) throws Exception;
void deleteMessage(long messageID) throws Exception;
void storeAcknowledge(long queueID, long messageID) throws Exception;
void storeCursorAcknowledge(long queueID, PagePosition position) throws Exception;
void updateDeliveryCount(MessageReference ref) throws Exception;
void updateScheduledDeliveryTime(MessageReference ref) throws Exception;
void storeDuplicateID(SimpleString address, byte[] duplID, long recordID) throws Exception;
void deleteDuplicateID(long recordID) throws Exception;
void storeMessageTransactional(long txID, Message message) throws Exception;
void storeReferenceTransactional(long txID, long queueID, long messageID) throws Exception;
void storeAcknowledgeTransactional(long txID, long queueID, long messageID) throws Exception;
void storeCursorAcknowledgeTransactional(long txID, long queueID, PagePosition position) throws Exception;
void deleteCursorAcknowledgeTransactional(long txID, long ackID) throws Exception;
void deleteCursorAcknowledge(long ackID) throws Exception;
void storePageCompleteTransactional(long txID, long queueID, PagePosition position) throws Exception;
void deletePageComplete(long ackID) throws Exception;
void updateScheduledDeliveryTimeTransactional(long txID, MessageReference ref) throws Exception;
void storeDuplicateIDTransactional(long txID, SimpleString address, byte[] duplID, long recordID) throws Exception;
void updateDuplicateIDTransactional(long txID, SimpleString address, byte[] duplID, long recordID) throws Exception;
void deleteDuplicateIDTransactional(long txID, long recordID) throws Exception;
LargeServerMessage createCoreLargeMessage();
/**
* Creates a new LargeServerMessage for the core Protocol with the given id.
*
* @param id
* @param message This is a temporary message that holds the parsed properties. The remoting
* layer can't create a ServerMessage directly, then this will be replaced.
* @return a large message object
* @throws Exception
*/
LargeServerMessage createCoreLargeMessage(long id, Message message) throws Exception;
/** Other protocols may inform the storage manager when a large message was created. */
LargeServerMessage onLargeMessageCreate(long id, LargeServerMessage largeMessage) throws Exception;
enum LargeMessageExtension {
DURABLE(".msg"), TEMPORARY(".tmp"), SYNC(".sync");
final String extension;
LargeMessageExtension(String extension) {
this.extension = extension;
}
public String getExtension() {
return extension;
}
}
/**
* Instantiates a SequentialFile to be used for storing a {@link LargeServerMessage}.
*
* @param messageID the id of the message
* @param extension the extension to add to the file
* @return
*/
SequentialFile createFileForLargeMessage(long messageID, LargeMessageExtension extension);
void largeMessageClosed(LargeServerMessage largeServerMessage) throws ActiveMQException;
void deleteLargeMessageBody(LargeServerMessage largeServerMessage) throws ActiveMQException;
default SequentialFile createFileForLargeMessage(long messageID, boolean durable) {
if (durable) {
return createFileForLargeMessage(messageID, LargeMessageExtension.DURABLE);
} else {
return createFileForLargeMessage(messageID, LargeMessageExtension.TEMPORARY);
}
}
void prepare(long txID, Xid xid) throws Exception;
void commit(long txID) throws Exception;
void commit(long txID, boolean lineUpContext) throws Exception;
void asyncCommit(long txID) throws Exception;
void rollback(long txID) throws Exception;
void rollbackBindings(long txID) throws Exception;
void commitBindings(long txID) throws Exception;
void storePageTransaction(long txID, PageTransactionInfo pageTransaction) throws Exception;
void updatePageTransaction(long txID, PageTransactionInfo pageTransaction, int depage) throws Exception;
void deletePageTransactional(long recordID) throws Exception;
default JournalLoadInformation loadMessageJournal(PostOffice postOffice,
PagingManager pagingManager,
ResourceManager resourceManager,
Map<Long, QueueBindingInfo> queueInfos,
Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap,
Set<Pair<Long, Long>> pendingLargeMessages,
List<PageCountPending> pendingNonTXPageCounter,
JournalLoader journalLoader) throws Exception {
return loadMessageJournal(postOffice, pagingManager, resourceManager, queueInfos, duplicateIDMap, pendingLargeMessages, null, pendingNonTXPageCounter, journalLoader);
}
default JournalLoadInformation loadMessageJournal(PostOffice postOffice,
PagingManager pagingManager,
ResourceManager resourceManager,
Map<Long, QueueBindingInfo> queueInfos,
Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap,
Set<Pair<Long, Long>> pendingLargeMessages,
Set<Long> largeMessagesInFolder,
List<PageCountPending> pendingNonTXPageCounter,
JournalLoader journalLoader) throws Exception {
return loadMessageJournal(postOffice, pagingManager, resourceManager, queueInfos, duplicateIDMap, pendingLargeMessages, largeMessagesInFolder, pendingNonTXPageCounter, journalLoader, null);
}
JournalLoadInformation loadMessageJournal(PostOffice postOffice,
PagingManager pagingManager,
ResourceManager resourceManager,
Map<Long, QueueBindingInfo> queueInfos,
Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap,
Set<Pair<Long, Long>> pendingLargeMessages,
Set<Long> largeMessagesInFolder,
List<PageCountPending> pendingNonTXPageCounter,
JournalLoader journalLoader,
List<Consumer<RecordInfo>> extraRecordsLoader) throws Exception;
long storeHeuristicCompletion(Xid xid, boolean isCommit) throws Exception;
void deleteHeuristicCompletion(long id) throws Exception;
// BindingsImpl related operations
void addQueueBinding(long tx, Binding binding) throws Exception;
void updateQueueBinding(long tx, Binding binding) throws Exception;
void deleteQueueBinding(long tx, long queueBindingID) throws Exception;
/**
*
* @param queueID The id of the queue
* @param status The current status of the queue. (Reserved for future use, ATM we only use this record for PAUSED)
* @return the id of the journal
* @throws Exception
*/
long storeQueueStatus(long queueID, AddressQueueStatus status) throws Exception;
void deleteQueueStatus(long recordID) throws Exception;
long storeAddressStatus(long addressID, AddressQueueStatus status) throws Exception;
void deleteAddressStatus(long recordID) throws Exception;
void addAddressBinding(long tx, AddressInfo addressInfo) throws Exception;
void deleteAddressBinding(long tx, long addressBindingID) throws Exception;
JournalLoadInformation loadBindingJournal(List<QueueBindingInfo> queueBindingInfos,
List<GroupingInfo> groupingInfos,
List<AddressBindingInfo> addressBindingInfos) throws Exception;
// grouping related operations
void addGrouping(GroupBinding groupBinding) throws Exception;
void deleteGrouping(long tx, GroupBinding groupBinding) throws Exception;
void storeAddressSetting(PersistedAddressSettingJSON addressSetting) throws Exception;
void deleteAddressSetting(SimpleString addressMatch) throws Exception;
List<AbstractPersistedAddressSetting> recoverAddressSettings() throws Exception;
AbstractPersistedAddressSetting recoverAddressSettings(SimpleString address);
void storeSecuritySetting(PersistedSecuritySetting persistedRoles) throws Exception;
void deleteSecuritySetting(SimpleString addressMatch) throws Exception;
List<PersistedSecuritySetting> recoverSecuritySettings() throws Exception;
void storeDivertConfiguration(PersistedDivertConfiguration persistedDivertConfiguration) throws Exception;
void deleteDivertConfiguration(String divertName) throws Exception;
List<PersistedDivertConfiguration> recoverDivertConfigurations();
void storeBridgeConfiguration(PersistedBridgeConfiguration persistedBridgeConfiguration) throws Exception;
void deleteBridgeConfiguration(String bridgeName) throws Exception;
List<PersistedBridgeConfiguration> recoverBridgeConfigurations();
void storeConnector(PersistedConnector persistedConnector) throws Exception;
void deleteConnector(String connectorName) throws Exception;
List<PersistedConnector> recoverConnectors();
void storeUser(PersistedUser persistedUser) throws Exception;
void deleteUser(String username) throws Exception;
Map<String, PersistedUser> getPersistedUsers();
void storeRole(PersistedRole persistedRole) throws Exception;
void deleteRole(String role) throws Exception;
Map<String, PersistedRole> getPersistedRoles();
void storeKeyValuePair(PersistedKeyValuePair persistedKeyValuePair) throws Exception;
void deleteKeyValuePair(String mapId, String key) throws Exception;
Map<String, PersistedKeyValuePair> getPersistedKeyValuePairs(String mapId);
/**
* @return The ID with the stored counter
*/
long storePageCounter(long txID, long queueID, long value, long persistentSize) throws Exception;
long storePendingCounter(long queueID, long pageID) throws Exception;
void deleteIncrementRecord(long txID, long recordID) throws Exception;
void deletePageCounter(long txID, long recordID) throws Exception;
void deletePendingPageCounter(long txID, long recordID) throws Exception;
/**
* @return the ID with the increment record
* @throws Exception
*/
long storePageCounterInc(long txID, long queueID, int add, long persistentSize) throws Exception;
/**
* @return the ID with the increment record
* @throws Exception
*/
long storePageCounterInc(long queueID, int add, long size) throws Exception;
/**
* @return the bindings journal
*/
Journal getBindingsJournal();
/**
* @return the message journal
*/
Journal getMessageJournal();
/**
* @see org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager#startReplication(org.apache.activemq.artemis.core.replication.ReplicationManager, org.apache.activemq.artemis.core.paging.PagingManager, String, boolean, long)
*/
void startReplication(ReplicationManager replicationManager,
PagingManager pagingManager,
String nodeID,
boolean autoFailBack,
long initialReplicationSyncTimeout) throws Exception;
/**
* Write message to page if we are paging.
*
* @return {@code true} if we are paging and have handled the data, {@code false} if the data
* needs to be sent to the journal
* @throws Exception
*/
boolean addToPage(PagingStore store, Message msg, Transaction tx, RouteContextList listCtx) throws Exception;
/**
* Stops the replication of data from the primary to the backup.
* <p>
* Typical scenario is a broken connection.
*/
void stopReplication();
/**
* @param appendFile
* @param messageID
* @param bytes
*/
void addBytesToLargeMessage(SequentialFile appendFile, long messageID, byte[] bytes) throws Exception;
void addBytesToLargeMessage(SequentialFile file,
long messageId,
ActiveMQBuffer bytes) throws Exception;
/**
* Stores the id from IDManager.
*
* @param journalID
* @param id
* @throws Exception
*/
void storeID(long journalID, long id) throws Exception;
/*
Deletes the ID from IDManager.
*/
void deleteID(long journalD) throws Exception;
/**
* Read lock the StorageManager. USE WITH CARE!
* <p>
* The main lock is used to write lock the whole manager when starting replication. Sub-systems,
* say Paging classes, that use locks of their own AND also write through the StorageManager MUST
* first read lock the storageManager before taking their own locks. Otherwise, we may dead-lock
* when starting replication sync.
*
*/
ArtemisCloseable closeableReadLock();
/**
* Closes the {@link IDGenerator} persisting the current record ID.
* <p>
* Effectively a "pre-stop" method. Necessary due to the "stop"-order at
* {@link org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl}
*/
void persistIdGenerator();
void injectMonitor(FileStoreMonitor monitor) throws Exception;
default int getAllowedPageSize(int pageSize) {
return pageSize;
}
}