blob: 261a04b5ead933dfe1a6c61e74a048c4f6bd53d1 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store.kahadb;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.BaseDestination;
import org.apache.activemq.broker.scheduler.JobSchedulerStore;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTempQueue;
import org.apache.activemq.command.ActiveMQTempTopic;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.protobuf.Buffer;
import org.apache.activemq.store.AbstractMessageStore;
import org.apache.activemq.store.IndexListener;
import org.apache.activemq.store.ListenableFuture;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.MessageStoreStatistics;
import org.apache.activemq.store.MessageStoreSubscriptionStatistics;
import org.apache.activemq.store.NoLocalSubscriptionAware;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.ProxyMessageStore;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.TransactionIdTransformer;
import org.apache.activemq.store.TransactionStore;
import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
import org.apache.activemq.store.kahadb.data.KahaDestination;
import org.apache.activemq.store.kahadb.data.KahaDestination.DestinationType;
import org.apache.activemq.store.kahadb.data.KahaLocation;
import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand;
import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand;
import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand;
import org.apache.activemq.store.kahadb.data.KahaUpdateMessageCommand;
import org.apache.activemq.store.kahadb.disk.journal.Location;
import org.apache.activemq.store.kahadb.disk.page.Transaction;
import org.apache.activemq.store.kahadb.disk.util.SequenceSet;
import org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl;
import org.apache.activemq.usage.MemoryUsage;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.util.ThreadPoolUtils;
import org.apache.activemq.wireformat.WireFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class KahaDBStore extends MessageDatabase implements PersistenceAdapter, NoLocalSubscriptionAware {
// Logger shared by this class (package-visible).
static final Logger LOG = LoggerFactory.getLogger(KahaDBStore.class);
// Default for maxAsyncJobs, taken from BaseDestination.MAX_AUDIT_DEPTH.
private static final int MAX_ASYNC_JOBS = BaseDestination.MAX_AUDIT_DEPTH;
// Name of the system property that supplies cancelledTaskModMetric.
public static final String PROPERTY_CANCELED_TASK_MOD_METRIC = "org.apache.activemq.store.kahadb.CANCELED_TASK_MOD_METRIC";
// Parsed once at class initialisation as a base-10 int; "0" is used when the property is unset.
public static final int cancelledTaskModMetric = Integer.parseInt(System.getProperty(
PROPERTY_CANCELED_TASK_MOD_METRIC, "0"), 10);
// Name of the system property that supplies asyncExecutorMaxThreads.
public static final String PROPERTY_ASYNC_EXECUTOR_MAX_THREADS = "org.apache.activemq.store.kahadb.ASYNC_EXECUTOR_MAX_THREADS";
// Parsed once at class initialisation as a base-10 int; "1" is used when the property is unset.
// Passed as the second constructor argument of both StoreTaskExecutor instances built in doStart().
private static final int asyncExecutorMaxThreads = Integer.parseInt(System.getProperty(
PROPERTY_ASYNC_EXECUTOR_MAX_THREADS, "1"), 10);;
// Executors that run the async store tasks; created in doStart(), shut down and nulled in doStop().
protected ExecutorService queueExecutor;
protected ExecutorService topicExecutor;
// Async task maps of message stores. doStop() cancels every task found in them and then clears
// the lists; topic stores add their map from their constructor when concurrent topic dispatch is on.
protected final List<Map<AsyncJobKey, StoreTask>> asyncQueueMaps = new LinkedList<>();
protected final List<Map<AsyncJobKey, StoreTask>> asyncTopicMaps = new LinkedList<>();
// Wire format used to marshal messages, acks and subscription info; its version is set in
// configureMetadata() and, if the recovered store differs, again in doStart().
final WireFormat wireFormat = new OpenWireFormat();
private SystemUsage usageManager;
// Work queues handed to queueExecutor / topicExecutor; created in doStart() with a capacity of getMaxAsyncJobs().
private LinkedBlockingQueue<Runnable> asyncQueueJobQueue;
private LinkedBlockingQueue<Runnable> asyncTopicJobQueue;
// Created in doStart() with getMaxAsyncJobs() permits; doStop() tries to acquire all of them
// (waiting up to 60 seconds) before cancelling tasks and finally drains them.
Semaphore globalQueueSemaphore;
Semaphore globalTopicSemaphore;
// when true, asyncAddQueueMessage()/removeAsyncMessage() go through a StoreQueueTask instead of
// storing synchronously
private boolean concurrentStoreAndDispatchQueues = true;
// when true, message order may be compromised when cache is exhausted if store is out
// of order w.r.t. cache
private boolean concurrentStoreAndDispatchTopics = false;
private int maxAsyncJobs = MAX_ASYNC_JOBS;
private final KahaDBTransactionStore transactionStore;
// Applied to transaction ids before they are converted for add/remove commands; identity by default.
private TransactionIdTransformer transactionIdTransformer;
/**
 * Creates the store together with its transaction store and installs a
 * transaction-id transformer that returns every id unchanged.
 */
public KahaDBStore() {
    transactionStore = new KahaDBTransactionStore(this);
    transactionIdTransformer = id -> id;
}
/**
 * @return a label made of the fixed prefix {@code KahaDB:} and the absolute
 *         path of the store directory in square brackets
 */
@Override
public String toString() {
    final String absolutePath = directory.getAbsolutePath();
    return "KahaDB:[" + absolutePath + "]";
}
/**
 * Accepts the broker name required by the {@link PersistenceAdapter} contract.
 *
 * @param brokerName ignored by this implementation
 */
@Override
public void setBrokerName(String brokerName) {
    // intentionally empty: the broker name is not used by this store
}
/**
 * Remembers the usage manager so it can later be handed out by
 * {@link #getUsageManager()}.
 *
 * @param usageManager the system usage instance to keep
 */
@Override
public void setUsageManager(SystemUsage usageManager) {
    this.usageManager = usageManager;
}
/**
 * @return the usage manager last passed to {@link #setUsageManager(SystemUsage)},
 *         or {@code null} if none was set
 */
public SystemUsage getUsageManager() {
    return usageManager;
}
/**
 * @return {@code true} when queue messages are added and removed through
 *         asynchronous store tasks rather than stored synchronously
 */
public boolean isConcurrentStoreAndDispatchQueues() {
    return concurrentStoreAndDispatchQueues;
}
/**
 * Switches asynchronous store-and-dispatch for queues on or off.
 *
 * @param concurrentStoreAndDispatch
 *            the new value of the queue flag
 */
public void setConcurrentStoreAndDispatchQueues(boolean concurrentStoreAndDispatch) {
    concurrentStoreAndDispatchQueues = concurrentStoreAndDispatch;
}
/**
 * @return {@code true} when topic messages are added and acknowledged through
 *         asynchronous store tasks rather than stored synchronously
 */
public boolean isConcurrentStoreAndDispatchTopics() {
    return concurrentStoreAndDispatchTopics;
}
/**
 * Switches asynchronous store-and-dispatch for topics on or off.
 *
 * @param concurrentStoreAndDispatch
 *            the new value of the topic flag
 */
public void setConcurrentStoreAndDispatchTopics(boolean concurrentStoreAndDispatch) {
    concurrentStoreAndDispatchTopics = concurrentStoreAndDispatch;
}
/**
 * @return the configured limit used to size the async semaphores and job
 *         queues when the store is started
 */
public int getMaxAsyncJobs() {
    return maxAsyncJobs;
}
/**
 * Changes the async job limit. The value is read when {@code doStart()} builds
 * the semaphores and job queues and when a message store is constructed, so it
 * does not resize anything that already exists.
 *
 * @param maxAsyncJobs
 *            the new limit
 */
public void setMaxAsyncJobs(int maxAsyncJobs) {
    this.maxAsyncJobs = maxAsyncJobs;
}
/**
 * Copies the broker's store OpenWire version into the metadata and into the
 * wire format. Does nothing when no broker service is attached.
 */
@Override
protected void configureMetadata() {
    if (brokerService == null) {
        return;
    }
    metadata.openwireVersion = brokerService.getStoreOpenWireVersion();
    wireFormat.setVersion(metadata.openwireVersion);
    if (!LOG.isDebugEnabled()) {
        return;
    }
    LOG.debug("Store OpenWire version configured as: {}", metadata.openwireVersion);
}
/**
 * Starts the store: configures the metadata, starts the underlying message
 * database, reconciles the OpenWire version with the one found in the
 * recovered store, and finally builds the semaphores, job queues and
 * executors used for asynchronous store tasks.
 *
 * @throws Exception if the underlying database fails to start
 */
@Override
public void doStart() throws Exception {
    // configure the metadata before start, right now
    // this is just the open wire version
    configureMetadata();
    super.doStart();
    if (brokerService != null) {
        // In case the recovered store used a different OpenWire version log a warning
        // to assist in determining why journal reads fail.
        if (metadata.openwireVersion != brokerService.getStoreOpenWireVersion()) {
            // The segments are concatenated verbatim, so each one that is followed by
            // another word must end with a space.
            LOG.warn("Existing Store uses a different OpenWire version[{}] " +
                    "than the version configured[{}] reverting to the version " +
                    "used by this store, some newer broker features may not work " +
                    "as expected.",
                    metadata.openwireVersion, brokerService.getStoreOpenWireVersion());
            // Update the broker service instance to the actual version in use.
            wireFormat.setVersion(metadata.openwireVersion);
            brokerService.setStoreOpenWireVersion(metadata.openwireVersion);
        }
    }
    // One limit sizes the semaphores and the executor work queues alike.
    final int maxJobs = getMaxAsyncJobs();
    this.globalQueueSemaphore = new Semaphore(maxJobs);
    this.globalTopicSemaphore = new Semaphore(maxJobs);
    this.asyncQueueJobQueue = new LinkedBlockingQueue<>(maxJobs);
    this.asyncTopicJobQueue = new LinkedBlockingQueue<>(maxJobs);
    this.queueExecutor = new StoreTaskExecutor(1, asyncExecutorMaxThreads, 0L, TimeUnit.MILLISECONDS,
            asyncQueueJobQueue, new ThreadFactory() {
                @Override
                public Thread newThread(Runnable runnable) {
                    // daemon threads so pending store work never blocks JVM exit
                    Thread thread = new Thread(runnable, "ConcurrentQueueStoreAndDispatch");
                    thread.setDaemon(true);
                    return thread;
                }
            });
    this.topicExecutor = new StoreTaskExecutor(1, asyncExecutorMaxThreads, 0L, TimeUnit.MILLISECONDS,
            asyncTopicJobQueue, new ThreadFactory() {
                @Override
                public Thread newThread(Runnable runnable) {
                    Thread thread = new Thread(runnable, "ConcurrentTopicStoreAndDispatch");
                    thread.setDaemon(true);
                    return thread;
                }
            });
}
/**
 * Stops the store. For queues and then for topics it waits up to 60 seconds
 * to collect all async permits and cancels whatever tasks are still
 * registered. Afterwards the remaining permits are drained, both executors
 * are shut down, and the underlying database is stopped.
 *
 * @param stopper passed through to the superclass
 * @throws Exception if interrupted while waiting or if the superclass fails
 */
@Override
public void doStop(ServiceStopper stopper) throws Exception {
    // drain down async jobs
    LOG.info("Stopping async queue tasks");
    final Semaphore queuePermits = this.globalQueueSemaphore;
    if (queuePermits != null) {
        queuePermits.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS);
    }
    cancelAndClearAsyncTasks(this.asyncQueueMaps);

    LOG.info("Stopping async topic tasks");
    final Semaphore topicPermits = this.globalTopicSemaphore;
    if (topicPermits != null) {
        topicPermits.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS);
    }
    cancelAndClearAsyncTasks(this.asyncTopicMaps);

    if (queuePermits != null) {
        queuePermits.drainPermits();
    }
    if (topicPermits != null) {
        topicPermits.drainPermits();
    }
    if (queueExecutor != null) {
        ThreadPoolUtils.shutdownNow(queueExecutor);
        queueExecutor = null;
    }
    if (topicExecutor != null) {
        ThreadPoolUtils.shutdownNow(topicExecutor);
        topicExecutor = null;
    }
    LOG.info("Stopped KahaDB");
    super.doStop(stopper);
}

/**
 * Cancels every task in every registered map and then empties the list.
 * Locks the list first and each map in turn while its tasks are cancelled.
 */
private void cancelAndClearAsyncTasks(List<Map<AsyncJobKey, StoreTask>> taskMaps) {
    synchronized (taskMaps) {
        for (Map<AsyncJobKey, StoreTask> tasksOfStore : taskMaps) {
            synchronized (tasksOfStore) {
                for (StoreTask pending : tasksOfStore.values()) {
                    pending.cancel();
                }
            }
        }
        taskMaps.clear();
    }
}
/**
 * Resolves the journal location of a message inside a page-file transaction.
 *
 * @param key         producer key of the message
 * @param destination destination whose indexes are searched
 * @return the location recorded in the order index, or {@code null} when the
 *         message id index has no entry for {@code key}
 * @throws IOException if the index lookup fails
 */
private Location findMessageLocation(final String key, final KahaDestination destination) throws IOException {
    return pageFile.tx().execute(new Transaction.CallableClosure<Location, IOException>() {
        @Override
        public Location execute(Transaction tx) throws IOException {
            final StoredDestination stored = getStoredDestination(destination, tx);
            final Long orderSequence = stored.messageIdIndex.get(tx, key);
            if (orderSequence != null) {
                return stored.orderIndex.get(tx, orderSequence).location;
            }
            return null;
        }
    });
}
/**
 * Unregisters the pending queue store task for a message, if any.
 *
 * @param store store whose task map is consulted (and locked)
 * @param id    id of the message
 * @return the removed task, or {@code null} when none was registered
 */
protected StoreQueueTask removeQueueTask(KahaDBMessageStore store, MessageId id) {
    synchronized (store.asyncTaskMap) {
        final AsyncJobKey jobKey = new AsyncJobKey(id, store.getDestination());
        return (StoreQueueTask) store.asyncTaskMap.remove(jobKey);
    }
}
/**
 * Registers a queue store task under its message id and submits it to the
 * queue executor. This method takes no lock itself: the caller must already
 * hold the monitor of {@code store.asyncTaskMap}.
 *
 * @param store store owning the task map
 * @param task  task to register and execute
 * @throws IOException declared for callers; not thrown by the visible body
 */
protected void addQueueTask(KahaDBMessageStore store, StoreQueueTask task) throws IOException {
    final MessageId messageId = task.getMessage().getMessageId();
    final AsyncJobKey jobKey = new AsyncJobKey(messageId, store.getDestination());
    store.asyncTaskMap.put(jobKey, task);
    queueExecutor.execute(task);
}
/**
 * Unregisters the pending topic store task for a message, if any.
 *
 * @param store store whose task map is consulted (and locked)
 * @param id    id of the message
 * @return the removed task, or {@code null} when none was registered
 */
protected StoreTopicTask removeTopicTask(KahaDBTopicMessageStore store, MessageId id) {
    synchronized (store.asyncTaskMap) {
        final AsyncJobKey jobKey = new AsyncJobKey(id, store.getDestination());
        return (StoreTopicTask) store.asyncTaskMap.remove(jobKey);
    }
}
/**
 * Registers a topic store task under its message id while holding the task
 * map's monitor, then submits it to the topic executor outside the lock.
 *
 * @param store store owning the task map
 * @param task  task to register and execute
 * @throws IOException declared for callers; not thrown by the visible body
 */
protected void addTopicTask(KahaDBTopicMessageStore store, StoreTopicTask task) throws IOException {
    synchronized (store.asyncTaskMap) {
        final MessageId messageId = task.getMessage().getMessageId();
        final AsyncJobKey jobKey = new AsyncJobKey(messageId, store.getDestination());
        store.asyncTaskMap.put(jobKey, task);
    }
    topicExecutor.execute(task);
}
/**
 * @return the single transaction store instance created with this adapter;
 *         no new object is built per call
 * @throws IOException declared by the interface; not thrown here
 */
@Override
public TransactionStore createTransactionStore() throws IOException {
    return transactionStore;
}
/**
 * @return the current value of the {@code forceRecoverIndex} flag
 */
public boolean getForceRecoverIndex() {
    return forceRecoverIndex;
}
/**
 * Sets the {@code forceRecoverIndex} flag.
 *
 * @param forceRecoverIndex the new flag value
 */
public void setForceRecoverIndex(boolean forceRecoverIndex) {
    this.forceRecoverIndex = forceRecoverIndex;
}
/**
 * Routes recovered prepared acks to the message store of their destination so
 * that each store can forget them.
 *
 * <p>Acks are grouped by destination in a single pass, and the store for each
 * destination is looked up (or created) exactly once. All stores are resolved
 * before any of them is called. Within a destination the acks keep the order
 * they had in {@code preparedAcks}.
 *
 * @param preparedAcks acks to forget; {@code null} or empty is a no-op
 * @param isRollback   passed through to every store
 * @throws IOException if a store cannot be resolved or fails to process its acks
 */
public void forgetRecoveredAcks(ArrayList<MessageAck> preparedAcks, boolean isRollback) throws IOException {
    if (preparedAcks == null || preparedAcks.isEmpty()) {
        return;
    }
    final Map<ActiveMQDestination, KahaDBMessageStore> stores = new HashMap<>();
    final Map<ActiveMQDestination, ArrayList<MessageAck>> acksByDestination = new HashMap<>();
    for (MessageAck ack : preparedAcks) {
        final ActiveMQDestination ackDestination = ack.getDestination();
        if (!stores.containsKey(ackDestination)) {
            stores.put(ackDestination, findMatchingStore(ackDestination));
        }
        acksByDestination.computeIfAbsent(ackDestination, unused -> new ArrayList<>()).add(ack);
    }
    for (Entry<ActiveMQDestination, KahaDBMessageStore> entry : stores.entrySet()) {
        entry.getValue().forgetRecoveredAcks(acksByDestination.get(entry.getKey()), isRollback);
    }
}
/**
 * Routes recovered prepared acks to the message store of their destination so
 * that each store can track them.
 *
 * <p>Acks are grouped by destination in a single pass, and the store for each
 * destination is looked up (or created) exactly once. All stores are resolved
 * before any of them is called. Within a destination the acks keep the order
 * they had in {@code preparedAcks}.
 *
 * @param preparedAcks acks to track; {@code null} or empty is a no-op
 * @throws IOException if a store cannot be resolved
 */
public void trackRecoveredAcks(ArrayList<MessageAck> preparedAcks) throws IOException {
    // Same null tolerance as forgetRecoveredAcks.
    if (preparedAcks == null || preparedAcks.isEmpty()) {
        return;
    }
    final Map<ActiveMQDestination, KahaDBMessageStore> stores = new HashMap<>();
    final Map<ActiveMQDestination, ArrayList<MessageAck>> acksByDestination = new HashMap<>();
    for (MessageAck ack : preparedAcks) {
        final ActiveMQDestination ackDestination = ack.getDestination();
        if (!stores.containsKey(ackDestination)) {
            stores.put(ackDestination, findMatchingStore(ackDestination));
        }
        acksByDestination.computeIfAbsent(ackDestination, unused -> new ArrayList<>()).add(ack);
    }
    for (Entry<ActiveMQDestination, KahaDBMessageStore> entry : stores.entrySet()) {
        entry.getValue().trackRecoveredAcks(acksByDestination.get(entry.getKey()));
    }
}
/**
 * Finds the KahaDB message store behind a destination, creating a queue or
 * topic store when the store cache has no entry for it.
 *
 * @param activeMQDestination destination to resolve
 * @return the delegate of the cached or newly created proxy store
 * @throws IOException if a store has to be created and creation fails
 */
private KahaDBMessageStore findMatchingStore(ActiveMQDestination activeMQDestination) throws IOException {
    final ProxyMessageStore cached = (ProxyMessageStore) storeCache.get(key(convert(activeMQDestination)));
    if (cached != null) {
        return (KahaDBMessageStore) cached.getDelegate();
    }
    final ProxyMessageStore created;
    if (activeMQDestination.isQueue()) {
        created = (ProxyMessageStore) createQueueMessageStore((ActiveMQQueue) activeMQDestination);
    } else {
        created = (ProxyMessageStore) createTopicMessageStore((ActiveMQTopic) activeMQDestination);
    }
    return (KahaDBMessageStore) created.getDelegate();
}
/**
 * Message store for one destination, backed by the journal and page-file
 * indexes of the enclosing {@code KahaDBStore}.
 *
 * <p>Index reads and writes are performed while holding the enclosing store's
 * {@code indexLock} write lock. Async add tasks are tracked in
 * {@link #asyncTaskMap}, which is guarded by synchronizing on the map itself.
 */
public class KahaDBMessageStore extends AbstractMessageStore {
    /** Pending async store tasks keyed by message id and destination; lock the map before use. */
    protected final Map<AsyncJobKey, StoreTask> asyncTaskMap = new HashMap<AsyncJobKey, StoreTask>();
    /** KahaDB form of the destination, derived once in the constructor. */
    protected KahaDestination dest;
    /** Snapshot of the enclosing store's limit taken at construction; also the permit count below. */
    private final int maxAsyncJobs;
    private final Semaphore localDestinationSemaphore;
    /** Producer keys of messages with prepared (pending) acks, grouped by recovered-tx state key. */
    protected final HashMap<String, Set<String>> ackedAndPreparedMap = new HashMap<String, Set<String>>();
    /** Producer keys of messages whose prepared ack was rolled back, grouped by recovered-tx state key. */
    protected final HashMap<String, Set<String>> rolledBackAcksMap = new HashMap<String, Set<String>>();
    double doneTasks, canceledTasks = 0;

    /**
     * @param destination destination served by this store
     */
    public KahaDBMessageStore(ActiveMQDestination destination) {
        super(destination);
        this.dest = convert(destination);
        this.maxAsyncJobs = getMaxAsyncJobs();
        this.localDestinationSemaphore = new Semaphore(this.maxAsyncJobs);
    }

    @Override
    public ActiveMQDestination getDestination() {
        return destination;
    }

    /**
     * Key under which recovered transaction state is grouped: the physical
     * name for queues, the consumer's connection id otherwise.
     */
    private final String recoveredTxStateMapKey(ActiveMQDestination destination, MessageAck ack) {
        return destination.isQueue() ? destination.getPhysicalName() : ack.getConsumerId().getConnectionId();
    }

    // messages that have prepared (pending) acks cannot be re-dispatched unless the outcome is rollback,
    // till then they are skipped by the store.
    // 'at most once' XA guarantee
    /**
     * Records the last message id of every ack as acked-and-prepared.
     *
     * @param acks recovered prepared acks; must not be {@code null}
     */
    public void trackRecoveredAcks(ArrayList<MessageAck> acks) {
        indexLock.writeLock().lock();
        try {
            for (MessageAck ack : acks) {
                final String key = recoveredTxStateMapKey(destination, ack);
                ackedAndPreparedMap.computeIfAbsent(key, unused -> new LinkedHashSet<>())
                        .add(ack.getLastMessageId().toProducerKey());
            }
        } finally {
            indexLock.writeLock().unlock();
        }
    }

    /**
     * Removes the given acks from the acked-and-prepared bookkeeping. On
     * rollback the message ids are also queued in {@link #rolledBackAcksMap}
     * so that a later recovery pass can hand the messages out again, and the
     * store statistics are incremented by one message of size 0 per ack.
     *
     * @param acks     acks to forget; {@code null} is a no-op
     * @param rollback whether the owning transaction was rolled back
     * @throws IOException if updating the store statistics fails
     */
    public void forgetRecoveredAcks(ArrayList<MessageAck> acks, boolean rollback) throws IOException {
        if (acks == null) {
            return;
        }
        indexLock.writeLock().lock();
        try {
            for (MessageAck ack : acks) {
                final String id = ack.getLastMessageId().toProducerKey();
                final String key = recoveredTxStateMapKey(destination, ack);
                final Set<String> ackedAndPrepared = ackedAndPreparedMap.get(key);
                if (ackedAndPrepared != null) {
                    ackedAndPrepared.remove(id);
                    // Drop the per-key set once it is drained. Checking the outer map
                    // here can never succeed because it still contains this key.
                    if (ackedAndPrepared.isEmpty()) {
                        ackedAndPreparedMap.remove(key);
                    }
                }
                if (rollback) {
                    rolledBackAcksMap.computeIfAbsent(key, unused -> new LinkedHashSet<>()).add(id);
                    pageFile.tx().execute(tx -> {
                        incrementAndAddSizeToStoreStat(tx, dest, 0);
                    });
                }
            }
        } finally {
            indexLock.writeLock().unlock();
        }
    }

    /**
     * Adds a queue message. With concurrent store-and-dispatch enabled the
     * write is wrapped in a {@code StoreQueueTask} and handed to the queue
     * executor; otherwise the superclass implementation is used.
     *
     * @return the future of the store task, or the superclass result
     */
    @Override
    public ListenableFuture<Object> asyncAddQueueMessage(final ConnectionContext context, final Message message)
            throws IOException {
        if (isConcurrentStoreAndDispatchQueues()) {
            message.beforeMarshall(wireFormat);
            StoreQueueTask result = new StoreQueueTask(this, context, message);
            ListenableFuture<Object> future = result.getFuture();
            message.getMessageId().setFutureOrSequenceLong(future);
            message.setRecievedByDFBridge(true); // flag message as concurrentStoreAndDispatch
            result.aquireLocks();
            // addQueueTask expects the task map to be locked by the caller
            synchronized (asyncTaskMap) {
                addQueueTask(this, result);
                if (indexListener != null) {
                    indexListener.onAdd(new IndexListener.MessageContext(context, message, null));
                }
            }
            return future;
        } else {
            return super.asyncAddQueueMessage(context, message);
        }
    }

    /**
     * Removes a message that may still have a pending async add. If the add
     * can be cancelled and the ack is not transactional, nothing is written to
     * the journal; otherwise the add is awaited and a regular remove is stored.
     *
     * @throws IOException if interrupted while waiting for the add
     *                     ({@link InterruptedIOException}) or if the remove fails
     */
    @Override
    public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
        if (isConcurrentStoreAndDispatchQueues()) {
            AsyncJobKey key = new AsyncJobKey(ack.getLastMessageId(), getDestination());
            StoreQueueTask task = null;
            synchronized (asyncTaskMap) {
                task = (StoreQueueTask) asyncTaskMap.get(key);
            }
            if (task != null) {
                if (ack.isInTransaction() || !task.cancel()) {
                    try {
                        task.future.get();
                    } catch (InterruptedException e) {
                        throw new InterruptedIOException(e.toString());
                    } catch (Exception ignored) {
                        LOG.debug("removeAsync: cannot cancel, waiting for add resulted in ex", ignored);
                    }
                    removeMessage(context, ack);
                } else {
                    // add was cancelled before reaching the journal: only register the id
                    // with the producer sequence tracker and drop the task
                    indexLock.writeLock().lock();
                    try {
                        metadata.producerSequenceIdTracker.isDuplicate(ack.getLastMessageId());
                    } finally {
                        indexLock.writeLock().unlock();
                    }
                    synchronized (asyncTaskMap) {
                        asyncTaskMap.remove(key);
                    }
                }
            } else {
                removeMessage(context, ack);
            }
        } else {
            removeMessage(context, ack);
        }
    }

    /**
     * Marshals the message into a {@code KahaAddMessageCommand} and stores it.
     * The write is synced only when journal disk syncs are enabled and the
     * message requires a response.
     */
    @Override
    public void addMessage(final ConnectionContext context, final Message message) throws IOException {
        final KahaAddMessageCommand command = new KahaAddMessageCommand();
        command.setDestination(dest);
        command.setMessageId(message.getMessageId().toProducerKey());
        command.setTransactionInfo(TransactionIdConversion.convert(transactionIdTransformer.transform(message.getTransactionId())));
        command.setPriority(message.getPriority());
        command.setPrioritySupported(isPrioritizedMessages());
        org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message);
        command.setMessage(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
        store(command, isEnableJournalDiskSyncs() && message.isResponseRequired(), new IndexAware() {
            // sync add? (for async, future present from getFutureOrSequenceLong)
            Object possibleFuture = message.getMessageId().getFutureOrSequenceLong();

            @Override
            public void sequenceAssignedWithIndexLocked(final long sequence) {
                message.getMessageId().setFutureOrSequenceLong(sequence);
                if (indexListener != null) {
                    if (possibleFuture == null) {
                        trackPendingAdd(dest, sequence);
                        indexListener.onAdd(new IndexListener.MessageContext(context, message, new Runnable() {
                            @Override
                            public void run() {
                                trackPendingAddComplete(dest, sequence);
                            }
                        }));
                    }
                }
            }
        }, null);
    }

    /**
     * Stores an update command that carries the re-marshalled message.
     */
    @Override
    public void updateMessage(Message message) throws IOException {
        LOG.trace("updating: {} with deliveryCount: {}", message.getMessageId(), message.getRedeliveryCounter());
        KahaUpdateMessageCommand updateMessageCommand = new KahaUpdateMessageCommand();
        KahaAddMessageCommand command = new KahaAddMessageCommand();
        command.setDestination(dest);
        command.setMessageId(message.getMessageId().toProducerKey());
        command.setPriority(message.getPriority());
        command.setPrioritySupported(prioritizedMessages);
        org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message);
        command.setMessage(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
        updateMessageCommand.setMessage(command);
        store(updateMessageCommand, isEnableJournalDiskSyncs(), null, null);
    }

    /**
     * Stores a remove command for the ack's last message id, including the
     * marshalled ack itself.
     */
    @Override
    public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
        KahaRemoveMessageCommand command = new KahaRemoveMessageCommand();
        command.setDestination(dest);
        command.setMessageId(ack.getLastMessageId().toProducerKey());
        command.setTransactionInfo(TransactionIdConversion.convert(transactionIdTransformer.transform(ack.getTransactionId())));
        org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(ack);
        command.setAck(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
        store(command, isEnableJournalDiskSyncs() && ack.isResponseRequired(), null, null);
    }

    /**
     * Stores a synced remove-destination command for this destination.
     */
    @Override
    public void removeAllMessages(ConnectionContext context) throws IOException {
        KahaRemoveDestinationCommand command = new KahaRemoveDestinationCommand();
        command.setDestination(dest);
        store(command, true, null, null);
    }

    /**
     * @return the stored message, or {@code null} when the id is not indexed
     */
    @Override
    public Message getMessage(MessageId identity) throws IOException {
        final String key = identity.toProducerKey();
        // Hopefully one day the page file supports concurrent read
        // operations... but for now we must
        // externally synchronize...
        Location location;
        indexLock.writeLock().lock();
        try {
            location = findMessageLocation(key, dest);
        } finally {
            indexLock.writeLock().unlock();
        }
        if (location == null) {
            return null;
        }
        return loadMessage(location);
    }

    /**
     * @return {@code true} when the location index of this destination has no entries
     */
    @Override
    public boolean isEmpty() throws IOException {
        indexLock.writeLock().lock();
        try {
            return pageFile.tx().execute(new Transaction.CallableClosure<Boolean, IOException>() {
                @Override
                public Boolean execute(Transaction tx) throws IOException {
                    // Only asks the location index whether it is empty; nothing is iterated.
                    StoredDestination sd = getStoredDestination(dest, tx);
                    return sd.locationIndex.isEmpty(tx);
                }
            });
        } finally {
            indexLock.writeLock().unlock();
        }
    }

    /**
     * Replays rolled back acks and then the order index to the listener for as
     * long as the listener reports space, skipping messages that still have a
     * prepared ack.
     */
    @Override
    public void recover(final MessageRecoveryListener listener) throws Exception {
        // recovery may involve expiry which will modify
        indexLock.writeLock().lock();
        try {
            pageFile.tx().execute(new Transaction.Closure<Exception>() {
                @Override
                public void execute(Transaction tx) throws Exception {
                    StoredDestination sd = getStoredDestination(dest, tx);
                    recoverRolledBackAcks(destination.getPhysicalName(), sd, tx, Integer.MAX_VALUE, listener);
                    sd.orderIndex.resetCursorPosition();
                    // looked up once, as in recoverNextMessages
                    final Set<String> ackedAndPrepared = ackedAndPreparedMap.get(destination.getPhysicalName());
                    for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); listener.hasSpace() && iterator
                            .hasNext(); ) {
                        Entry<Long, MessageKeys> entry = iterator.next();
                        if (ackedAndPrepared != null && ackedAndPrepared.contains(entry.getValue().messageId)) {
                            continue;
                        }
                        Message msg = loadMessage(entry.getValue().location);
                        listener.recoverMessage(msg);
                    }
                }
            });
        } finally {
            indexLock.writeLock().unlock();
        }
    }

    /**
     * Hands the next batch of messages to the listener. Rolled back acks are
     * replayed first and count towards {@code maxReturned}. The batch ends
     * when that count is reached or the listener declines further messages.
     */
    @Override
    public void recoverNextMessages(final int maxReturned, final MessageRecoveryListener listener) throws Exception {
        indexLock.writeLock().lock();
        try {
            pageFile.tx().execute(new Transaction.Closure<Exception>() {
                @Override
                public void execute(Transaction tx) throws Exception {
                    StoredDestination sd = getStoredDestination(dest, tx);
                    Entry<Long, MessageKeys> entry = null;
                    int counter = recoverRolledBackAcks(destination.getPhysicalName(), sd, tx, maxReturned, listener);
                    final Set<String> ackedAndPrepared = ackedAndPreparedMap.get(destination.getPhysicalName());
                    for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator.hasNext(); ) {
                        entry = iterator.next();
                        if (ackedAndPrepared != null && ackedAndPrepared.contains(entry.getValue().messageId)) {
                            continue;
                        }
                        Message msg = loadMessage(entry.getValue().location);
                        msg.getMessageId().setFutureOrSequenceLong(entry.getKey());
                        listener.recoverMessage(msg);
                        counter++;
                        if (counter >= maxReturned || !listener.canRecoveryNextMessage()) {
                            break;
                        }
                    }
                    sd.orderIndex.stoppedIterating();
                }
            });
        } finally {
            indexLock.writeLock().unlock();
        }
    }

    /**
     * Replays messages whose prepared ack was rolled back. Each id is removed
     * from the pending set as it is visited. Only messages the order index
     * reports as already dispatched are handed to the listener; the rest are
     * left for a later batch.
     *
     * @return number of messages passed to the listener
     */
    protected int recoverRolledBackAcks(String recoveredTxStateMapKey, StoredDestination sd, Transaction tx, int maxReturned, MessageRecoveryListener listener) throws Exception {
        int counter = 0;
        String id;
        final Set<String> rolledBackAcks = rolledBackAcksMap.get(recoveredTxStateMapKey);
        if (rolledBackAcks == null) {
            return counter;
        }
        for (Iterator<String> iterator = rolledBackAcks.iterator(); iterator.hasNext(); ) {
            id = iterator.next();
            iterator.remove();
            Long sequence = sd.messageIdIndex.get(tx, id);
            if (sequence != null) {
                if (sd.orderIndex.alreadyDispatched(sequence)) {
                    listener.recoverMessage(loadMessage(sd.orderIndex.get(tx, sequence).location));
                    counter++;
                    if (counter >= maxReturned) {
                        break;
                    }
                } else {
                    LOG.debug("rolledback ack message {} with seq {} will be picked up in future batch {}", id, sequence, sd.orderIndex.cursor);
                }
            } else {
                LOG.warn("Failed to locate rolled back ack message {} in {}", id, sd);
            }
        }
        if (rolledBackAcks.isEmpty()) {
            rolledBackAcksMap.remove(recoveredTxStateMapKey);
        }
        return counter;
    }

    /**
     * Resets the order index cursor of this destination if the page file is
     * loaded and the destination already exists. Failures are logged, not thrown.
     */
    @Override
    public void resetBatching() {
        if (pageFile.isLoaded()) {
            indexLock.writeLock().lock();
            try {
                pageFile.tx().execute(new Transaction.Closure<Exception>() {
                    @Override
                    public void execute(Transaction tx) throws Exception {
                        StoredDestination sd = getExistingStoredDestination(dest, tx);
                        if (sd != null) {
                            sd.orderIndex.resetCursorPosition();
                        }
                    }
                });
            } catch (Exception e) {
                LOG.error("Failed to reset batching", e);
            } finally {
                indexLock.writeLock().unlock();
            }
        }
    }

    /**
     * Positions the batch cursor at the sequence carried by the message id,
     * capped just below the smallest sequence that still has a pending add.
     */
    @Override
    public void setBatch(final MessageId identity) throws IOException {
        indexLock.writeLock().lock();
        try {
            pageFile.tx().execute(new Transaction.Closure<IOException>() {
                @Override
                public void execute(Transaction tx) throws IOException {
                    StoredDestination sd = getStoredDestination(dest, tx);
                    Long location = (Long) identity.getFutureOrSequenceLong();
                    Long pending = sd.orderIndex.minPendingAdd();
                    if (pending != null) {
                        location = Math.min(location, pending - 1);
                    }
                    sd.orderIndex.setBatch(tx, location);
                }
            });
        } finally {
            indexLock.writeLock().unlock();
        }
    }

    @Override
    public void setMemoryUsage(MemoryUsage memoryUsage) {
    }

    @Override
    public void start() throws Exception {
        super.start();
    }

    @Override
    public void stop() throws Exception {
        super.stop();
    }

    /**
     * Tries for up to 60 seconds to take every permit of the local semaphore.
     * NOTE(review): a timeout or interrupt is only logged, so a later
     * {@link #unlockAsyncJobQueue()} still releases {@code maxAsyncJobs} permits.
     */
    protected void lockAsyncJobQueue() {
        try {
            if (!this.localDestinationSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS)) {
                throw new TimeoutException(this + " timeout waiting for localDestSem:" + this.localDestinationSemaphore);
            }
        } catch (Exception e) {
            LOG.error("Failed to lock async jobs for " + this.destination, e);
        }
    }

    /** Releases {@code maxAsyncJobs} permits of the local semaphore. */
    protected void unlockAsyncJobQueue() {
        this.localDestinationSemaphore.release(this.maxAsyncJobs);
    }

    /** Takes one permit of the local semaphore; an interrupt is logged and swallowed. */
    protected void acquireLocalAsyncLock() {
        try {
            this.localDestinationSemaphore.acquire();
        } catch (InterruptedException e) {
            LOG.error("Failed to aquire async lock for " + this.destination, e);
        }
    }

    /** Returns one permit to the local semaphore. */
    protected void releaseLocalAsyncLock() {
        this.localDestinationSemaphore.release();
    }

    @Override
    public String toString() {
        return "permits:" + this.localDestinationSemaphore.availablePermits() + ",sd=" + storedDestinations.get(key(dest));
    }

    /**
     * Rebuilds count and size statistics. Uses the statistics stored in the
     * index when present, otherwise walks the location index. Messages with a
     * prepared ack are subtracted from the count.
     */
    @Override
    protected void recoverMessageStoreStatistics() throws IOException {
        try {
            MessageStoreStatistics recoveredStatistics;
            lockAsyncJobQueue();
            indexLock.writeLock().lock();
            try {
                recoveredStatistics = pageFile.tx().execute(new Transaction.CallableClosure<MessageStoreStatistics, IOException>() {
                    @Override
                    public MessageStoreStatistics execute(Transaction tx) throws IOException {
                        MessageStoreStatistics statistics = getStoredMessageStoreStatistics(dest, tx);
                        // Iterate through all index entries to get the size of each message
                        if (statistics == null) {
                            StoredDestination sd = getStoredDestination(dest, tx);
                            statistics = new MessageStoreStatistics();
                            for (Iterator<Entry<Location, Long>> iterator = sd.locationIndex.iterator(tx); iterator.hasNext(); ) {
                                int locationSize = iterator.next().getKey().getSize();
                                statistics.getMessageCount().increment();
                                statistics.getMessageSize().addSize(locationSize > 0 ? locationSize : 0);
                            }
                        }
                        return statistics;
                    }
                });
                final Set<String> ackedAndPrepared = ackedAndPreparedMap.get(destination.getPhysicalName());
                if (ackedAndPrepared != null) {
                    recoveredStatistics.getMessageCount().subtract(ackedAndPrepared.size());
                }
                getMessageStoreStatistics().getMessageCount().setCount(recoveredStatistics.getMessageCount().getCount());
                getMessageStoreStatistics().getMessageSize().setTotalSize(recoveredStatistics.getMessageSize().getTotalSize());
            } finally {
                indexLock.writeLock().unlock();
            }
        } finally {
            unlockAsyncJobQueue();
        }
    }
}
class KahaDBTopicMessageStore extends KahaDBMessageStore implements TopicMessageStore {
// Number of durable subscriptions on this topic: seeded from getAllSubscriptions() in the
// constructor, incremented by addSubscription() and decremented by deleteSubscription().
private final AtomicInteger subscriptionCount = new AtomicInteger();
// Per-subscription statistics holder; constructed with the value of isEnableSubscriptionStatistics().
protected final MessageStoreSubscriptionStatistics messageStoreSubStats =
new MessageStoreSubscriptionStatistics(isEnableSubscriptionStatistics());
public KahaDBTopicMessageStore(ActiveMQTopic destination) throws IOException {
super(destination);
this.subscriptionCount.set(getAllSubscriptions().length);
if (isConcurrentStoreAndDispatchTopics()) {
asyncTopicMaps.add(asyncTaskMap);
}
}
/**
 * Recovers the destination-level statistics first and the per-subscription
 * metrics afterwards.
 *
 * @throws IOException if either recovery step fails
 */
@Override
protected void recoverMessageStoreStatistics() throws IOException {
    super.recoverMessageStoreStatistics();
    recoverMessageStoreSubMetrics();
}
/**
 * Adds a topic message. With concurrent topic dispatch enabled the write is
 * wrapped in a {@code StoreTopicTask}, sized with the current subscription
 * count, and handed to the topic executor. Otherwise the superclass
 * implementation is used.
 *
 * @return the future of the store task, or the superclass result
 */
@Override
public ListenableFuture<Object> asyncAddTopicMessage(final ConnectionContext context, final Message message)
        throws IOException {
    if (!isConcurrentStoreAndDispatchTopics()) {
        return super.asyncAddTopicMessage(context, message);
    }
    message.beforeMarshall(wireFormat);
    final StoreTopicTask storeTask = new StoreTopicTask(this, context, message, subscriptionCount.get());
    storeTask.aquireLocks();
    addTopicTask(this, storeTask);
    return storeTask.getFuture();
}
/**
 * Acknowledges a message for one durable subscription.
 *
 * <p>With concurrent topic dispatch enabled and a store task still pending
 * for the message, the subscription key is recorded on the task. When
 * {@code addSubscriptionKey} returns {@code true}, the task is unregistered
 * and cancelled. In every other case the acknowledgement is written to the
 * journal.
 */
@Override
public void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
        MessageId messageId, MessageAck ack) throws IOException {
    final String subscriptionKey = subscriptionKey(clientId, subscriptionName).toString();
    if (!isConcurrentStoreAndDispatchTopics()) {
        doAcknowledge(context, subscriptionKey, messageId, ack);
        return;
    }
    final AsyncJobKey jobKey = new AsyncJobKey(messageId, getDestination());
    final StoreTopicTask pending;
    synchronized (asyncTaskMap) {
        pending = (StoreTopicTask) asyncTaskMap.get(jobKey);
    }
    if (pending == null) {
        doAcknowledge(context, subscriptionKey, messageId, ack);
        return;
    }
    if (!pending.addSubscriptionKey(subscriptionKey)) {
        return;
    }
    removeTopicTask(this, messageId);
    if (pending.cancel()) {
        synchronized (asyncTaskMap) {
            asyncTaskMap.remove(jobKey);
        }
    }
}
/**
 * Writes an acknowledgement for one subscription to the journal without
 * requesting a sync. An unmatched ack is stored as the {@code UNMATCHED}
 * marker; any other ack (including {@code null}) is marshalled with the wire
 * format.
 *
 * @param ack the ack to persist; may be {@code null}, in which case no
 *            transaction info is set
 */
protected void doAcknowledge(ConnectionContext context, String subscriptionKey, MessageId messageId, MessageAck ack)
        throws IOException {
    final KahaRemoveMessageCommand removeCommand = new KahaRemoveMessageCommand();
    removeCommand.setDestination(dest);
    removeCommand.setSubscriptionKey(subscriptionKey);
    removeCommand.setMessageId(messageId.toProducerKey());
    removeCommand.setTransactionInfo(ack != null ? TransactionIdConversion.convert(transactionIdTransformer.transform(ack.getTransactionId())) : null);
    final boolean unmatched = ack != null && ack.isUnmatchedAck();
    if (!unmatched) {
        final org.apache.activemq.util.ByteSequence marshalledAck = wireFormat.marshal(ack);
        removeCommand.setAck(new Buffer(marshalledAck.getData(), marshalledAck.getOffset(), marshalledAck.getLength()));
    } else {
        removeCommand.setAck(UNMATCHED);
    }
    store(removeCommand, false, null, null);
}
/**
 * Journals a subscription command carrying the marshalled {@code SubscriptionInfo} and
 * then increments the in-memory subscription count.
 *
 * @param subscriptionInfo description of the subscription to add
 * @param retroactive      value copied onto the stored command
 * @throws IOException if marshalling or storing the command fails
 */
@Override
public void addSubscription(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException {
    final String subKey =
            subscriptionKey(subscriptionInfo.getClientId(), subscriptionInfo.getSubscriptionName());

    final KahaSubscriptionCommand addCommand = new KahaSubscriptionCommand();
    addCommand.setDestination(dest);
    addCommand.setSubscriptionKey(subKey);
    addCommand.setRetroactive(retroactive);

    final org.apache.activemq.util.ByteSequence marshalledInfo = wireFormat.marshal(subscriptionInfo);
    addCommand.setSubscriptionInfo(
            new Buffer(marshalledInfo.getData(), marshalledInfo.getOffset(), marshalledInfo.getLength()));

    // Sync flag follows the journal disk-sync setting.
    store(addCommand, isEnableJournalDiskSyncs(), null, null);
    this.subscriptionCount.incrementAndGet();
}
/**
 * Journals a subscription command that carries only the destination and subscription key
 * (no subscription info) and then decrements the in-memory subscription count.
 *
 * @param clientId         client id of the subscription
 * @param subscriptionName name of the subscription
 * @throws IOException if storing the command fails
 */
@Override
public void deleteSubscription(String clientId, String subscriptionName) throws IOException {
    final String subKey = subscriptionKey(clientId, subscriptionName);

    final KahaSubscriptionCommand deleteCommand = new KahaSubscriptionCommand();
    deleteCommand.setDestination(dest);
    deleteCommand.setSubscriptionKey(subKey);

    // Sync flag follows the journal disk-sync setting.
    store(deleteCommand, isEnableJournalDiskSyncs(), null, null);
    this.subscriptionCount.decrementAndGet();
}
/**
 * Reads every stored subscription of this destination from the index and unmarshals its
 * {@code SubscriptionInfo}. Runs inside a page-file transaction while holding the index
 * write lock.
 *
 * @return the subscriptions in index iteration order; empty when there are none
 * @throws IOException if reading or unmarshalling fails
 */
@Override
public SubscriptionInfo[] getAllSubscriptions() throws IOException {
    final List<SubscriptionInfo> found = new ArrayList<SubscriptionInfo>();
    indexLock.writeLock().lock();
    try {
        pageFile.tx().execute(new Transaction.Closure<IOException>() {
            @Override
            public void execute(Transaction tx) throws IOException {
                final StoredDestination sd = getStoredDestination(dest, tx);
                final Iterator<Entry<String, KahaSubscriptionCommand>> it = sd.subscriptions.iterator(tx);
                while (it.hasNext()) {
                    final KahaSubscriptionCommand stored = it.next().getValue();
                    final DataInputStream in = new DataInputStream(stored.getSubscriptionInfo().newInput());
                    found.add((SubscriptionInfo) wireFormat.unmarshal(in));
                }
            }
        });
    } finally {
        indexLock.writeLock().unlock();
    }
    return found.toArray(new SubscriptionInfo[found.size()]);
}
/**
 * Looks up a single stored subscription by client id and subscription name. Runs inside a
 * page-file transaction while holding the index write lock.
 *
 * @param clientId         client id of the subscription
 * @param subscriptionName name of the subscription
 * @return the unmarshalled subscription info, or {@code null} when no entry is stored
 * @throws IOException if reading or unmarshalling fails
 */
@Override
public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
    final String subKey = subscriptionKey(clientId, subscriptionName);
    indexLock.writeLock().lock();
    try {
        return pageFile.tx().execute(new Transaction.CallableClosure<SubscriptionInfo, IOException>() {
            @Override
            public SubscriptionInfo execute(Transaction tx) throws IOException {
                final StoredDestination sd = getStoredDestination(dest, tx);
                final KahaSubscriptionCommand stored = sd.subscriptions.get(tx, subKey);
                if (stored != null) {
                    final DataInputStream in = new DataInputStream(stored.getSubscriptionInfo().newInput());
                    return (SubscriptionInfo) wireFormat.unmarshal(in);
                }
                return null;
            }
        });
    } finally {
        indexLock.writeLock().unlock();
    }
}
/**
 * Returns the number of messages pending for a subscription.
 * <p>
 * When subscription statistics are enabled the value comes from the in-memory statistics;
 * otherwise it is computed from the index under the index write lock.
 *
 * @param clientId         client id of the subscription
 * @param subscriptionName name of the subscription
 * @return the pending message count (narrowed to {@code int}); {@code 0} when the
 *         subscription has no last-ack entry
 * @throws IOException if the index cannot be read
 */
@Override
public int getMessageCount(String clientId, String subscriptionName) throws IOException {
    final String subKey = subscriptionKey(clientId, subscriptionName);
    if (isEnableSubscriptionStatistics()) {
        return (int) this.messageStoreSubStats.getMessageCount(subKey).getCount();
    }

    indexLock.writeLock().lock();
    try {
        return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>() {
            @Override
            public Integer execute(Transaction tx) throws IOException {
                final StoredDestination sd = getStoredDestination(dest, tx);
                // The subscription might not exist.
                if (getLastAck(tx, sd, subKey) == null) {
                    return 0;
                }
                return (int) getStoredMessageCount(tx, sd, subKey);
            }
        });
    } finally {
        indexLock.writeLock().unlock();
    }
}
/**
 * Returns the total size of the messages pending for a subscription.
 * <p>
 * When subscription statistics are enabled the value comes from the in-memory statistics;
 * otherwise it is computed from the index under the index write lock.
 *
 * @param clientId         client id of the subscription
 * @param subscriptionName name of the subscription
 * @return the pending message size; {@code 0} when the subscription has no last-ack entry
 * @throws IOException if the index cannot be read
 */
@Override
public long getMessageSize(String clientId, String subscriptionName) throws IOException {
    final String subKey = subscriptionKey(clientId, subscriptionName);
    if (isEnableSubscriptionStatistics()) {
        return this.messageStoreSubStats.getMessageSize(subKey).getTotalSize();
    }

    indexLock.writeLock().lock();
    try {
        return pageFile.tx().execute(new Transaction.CallableClosure<Long, IOException>() {
            @Override
            public Long execute(Transaction tx) throws IOException {
                final StoredDestination sd = getStoredDestination(dest, tx);
                // The subscription might not exist.
                if (getLastAck(tx, sd, subKey) == null) {
                    return 0L;
                }
                return getStoredMessageSize(tx, sd, subKey);
            }
        });
    } finally {
        indexLock.writeLock().unlock();
    }
}
/**
 * Rebuilds the per-subscription statistics (pending count and pending size) from the index.
 * Does nothing unless subscription statistics are enabled.
 * <p>
 * Counts are set per subscription as the subscriptions are iterated. Sizes are obtained
 * with one {@code getStoredMessageSize} call covering all collected subscription keys.
 *
 * @throws IOException if the index cannot be read
 */
protected void recoverMessageStoreSubMetrics() throws IOException {
    if (!isEnableSubscriptionStatistics()) {
        return;
    }
    final MessageStoreSubscriptionStatistics stats = getMessageStoreSubStatistics();
    indexLock.writeLock().lock();
    try {
        pageFile.tx().execute(new Transaction.Closure<IOException>() {
            @Override
            public void execute(Transaction tx) throws IOException {
                final StoredDestination sd = getStoredDestination(dest, tx);
                final List<String> keysWithAck = new ArrayList<>();

                final Iterator<Entry<String, KahaSubscriptionCommand>> it = sd.subscriptions.iterator(tx);
                while (it.hasNext()) {
                    final String subKey = it.next().getKey();
                    if (getLastAck(tx, sd, subKey) == null) {
                        continue;
                    }
                    // Remember the key so its pending size can be recovered below.
                    keysWithAck.add(subKey);
                    // Recover just the count here as that is fast.
                    stats.getMessageCount(subKey).setCount(getStoredMessageCount(tx, sd, subKey));
                }

                // Recover the message sizes for each subscription by iterating only 1 time
                // over the order index to speed up recovery.
                final Map<String, AtomicLong> pendingSizes = getStoredMessageSize(tx, sd, keysWithAck);
                for (Entry<String, AtomicLong> pending : pendingSizes.entrySet()) {
                    final long bytes = pending.getValue().get();
                    // Non-positive sizes are recorded as zero.
                    stats.getMessageSize(pending.getKey()).addSize(bytes > 0 ? bytes : 0);
                }
            }
        });
    } finally {
        indexLock.writeLock().unlock();
    }
}
/**
 * Replays every message still pending for a subscription to the given listener.
 * <p>
 * The order index is positioned either just before the first tracked ack position (when
 * that lies before the last acked sequence) or at the subscription's last ack. Messages
 * are then handed to the listener, skipping those that are acked-and-prepared and, when
 * ack positions are being used, those no longer contained in the sequence set.
 * <p>
 * When the subscription has no last-ack entry nothing is recovered. This matches the
 * handling in {@code recoverNextMessages}, {@code getMessageCount} and
 * {@code getMessageSize}, and avoids dereferencing a {@code null} {@code LastAck}.
 *
 * @param clientId         client id of the subscription
 * @param subscriptionName name of the subscription
 * @param listener         receives each recovered message
 * @throws Exception if reading the index or journal fails, or the listener throws
 */
@Override
public void recoverSubscription(String clientId, String subscriptionName, final MessageRecoveryListener listener)
        throws Exception {
    final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
    @SuppressWarnings("unused")
    final SubscriptionInfo info = lookupSubscription(clientId, subscriptionName);
    indexLock.writeLock().lock();
    try {
        pageFile.tx().execute(new Transaction.Closure<Exception>() {
            @Override
            public void execute(Transaction tx) throws Exception {
                StoredDestination sd = getStoredDestination(dest, tx);
                LastAck cursorPos = getLastAck(tx, sd, subscriptionKey);
                if (cursorPos == null) {
                    // No last-ack entry: the subscription does not exist (or was deleted),
                    // so there is nothing to recover.
                    return;
                }
                SequenceSet subAckPositions = getSequenceSet(tx, sd, subscriptionKey);
                //If we have ackPositions tracked then compare the first one as individual acknowledge mode
                //may have bumped lastAck even though there are earlier messages to still consume
                if (subAckPositions != null && !subAckPositions.isEmpty()
                        && subAckPositions.getHead().getFirst() < cursorPos.lastAckedSequence) {
                    //we have messages to ack before lastAckedSequence
                    sd.orderIndex.setBatch(tx, subAckPositions.getHead().getFirst() - 1);
                } else {
                    subAckPositions = null;
                    sd.orderIndex.setBatch(tx, cursorPos);
                }
                recoverRolledBackAcks(subscriptionKey, sd, tx, Integer.MAX_VALUE, listener);
                Set<?> ackedAndPrepared = ackedAndPreparedMap.get(subscriptionKey);
                for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator
                        .hasNext();) {
                    Entry<Long, MessageKeys> entry = iterator.next();
                    if (ackedAndPrepared != null && ackedAndPrepared.contains(entry.getValue().messageId)) {
                        continue;
                    }
                    //If subAckPositions is set then verify the sequence set contains the message still
                    //and if it doesn't skip it
                    if (subAckPositions != null && !subAckPositions.contains(entry.getKey())) {
                        continue;
                    }
                    listener.recoverMessage(loadMessage(entry.getValue().location));
                }
                sd.orderIndex.resetCursorPosition();
            }
        });
    } finally {
        indexLock.writeLock().unlock();
    }
}
/**
 * Recovers the next batch of pending messages for a subscription, resuming from the
 * cursor remembered in {@code sd.subscriptionCursors} when one exists.
 * <p>
 * Runs inside a page-file transaction while holding the index write lock. Iteration stops
 * once the running counter reaches {@code maxReturned} or the listener reports it has no
 * space. The counter starts at the value returned by {@code recoverRolledBackAcks} and is
 * incremented for each message the listener accepts.
 *
 * @param clientId         client id of the subscription
 * @param subscriptionName name of the subscription
 * @param maxReturned      upper bound for the recovery counter
 * @param listener         receives each recovered message
 * @throws Exception if reading the index or journal fails, or the listener throws
 */
@Override
public void recoverNextMessages(String clientId, String subscriptionName, final int maxReturned,
final MessageRecoveryListener listener) throws Exception {
final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
// The looked-up info is never read; only the lookup call itself is performed.
@SuppressWarnings("unused")
final SubscriptionInfo info = lookupSubscription(clientId, subscriptionName);
indexLock.writeLock().lock();
try {
pageFile.tx().execute(new Transaction.Closure<Exception>() {
@Override
public void execute(Transaction tx) throws Exception {
StoredDestination sd = getStoredDestination(dest, tx);
sd.orderIndex.resetCursorPosition();
MessageOrderCursor moc = sd.subscriptionCursors.get(subscriptionKey);
SequenceSet subAckPositions = null;
if (moc == null) {
// No remembered cursor: position the order index from the subscription's ack state.
LastAck pos = getLastAck(tx, sd, subscriptionKey);
if (pos == null) {
// sub deleted
return;
}
subAckPositions = getSequenceSet(tx, sd, subscriptionKey);
//If we have ackPositions tracked then compare the first one as individual acknowledge mode
//may have bumped lastAck even though there are earlier messages to still consume
if (subAckPositions != null && !subAckPositions.isEmpty()
&& subAckPositions.getHead().getFirst() < pos.lastAckedSequence) {
//we have messages to ack before lastAckedSequence
sd.orderIndex.setBatch(tx, subAckPositions.getHead().getFirst() - 1);
} else {
subAckPositions = null;
sd.orderIndex.setBatch(tx, pos);
}
moc = sd.orderIndex.cursor;
} else {
// Resume from the cursor stored by a previous call.
sd.orderIndex.cursor.sync(moc);
}
Entry<Long, MessageKeys> entry = null;
int counter = recoverRolledBackAcks(subscriptionKey, sd, tx, maxReturned, listener);
Set ackedAndPrepared = ackedAndPreparedMap.get(subscriptionKey);
for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx, moc); iterator
.hasNext();) {
entry = iterator.next();
if (ackedAndPrepared != null && ackedAndPrepared.contains(entry.getValue().messageId)) {
continue;
}
//If subAckPositions is set then verify the sequence set contains the message still
//and if it doesn't skip it
if (subAckPositions != null && !subAckPositions.contains(entry.getKey())) {
continue;
}
// Only messages the listener accepts count towards maxReturned.
if (listener.recoverMessage(loadMessage(entry.getValue().location))) {
counter++;
}
if (counter >= maxReturned || listener.hasSpace() == false) {
break;
}
}
sd.orderIndex.stoppedIterating();
// Remember where this batch ended, but only if at least one entry was iterated.
if (entry != null) {
MessageOrderCursor copy = sd.orderIndex.cursor.copy();
sd.subscriptionCursors.put(subscriptionKey, copy);
}
}
});
} finally {
indexLock.writeLock().unlock();
}
}
/**
 * Forgets the remembered batch cursor of a subscription by removing its entry from
 * {@code sd.subscriptionCursors}. Runs inside a page-file transaction while holding the
 * index write lock.
 *
 * @param clientId         client id of the subscription
 * @param subscriptionName name of the subscription
 * @throws RuntimeException wrapping any {@link IOException} raised by the transaction
 */
@Override
public void resetBatching(String clientId, String subscriptionName) {
    final String subKey = subscriptionKey(clientId, subscriptionName);
    indexLock.writeLock().lock();
    try {
        pageFile.tx().execute(new Transaction.Closure<IOException>() {
            @Override
            public void execute(Transaction tx) throws IOException {
                getStoredDestination(dest, tx).subscriptionCursors.remove(subKey);
            }
        });
    } catch (IOException e) {
        throw new RuntimeException(e);
    } finally {
        indexLock.writeLock().unlock();
    }
}
/**
 * @return the {@code messageStoreSubStats} instance held by this store
 */
@Override
public MessageStoreSubscriptionStatistics getMessageStoreSubStatistics() {
return messageStoreSubStats;
}
}
/**
 * Builds the key under which a subscription is stored: the client id and the
 * subscription name joined by a colon.
 *
 * @param clientId         client id of the subscription
 * @param subscriptionName name of the subscription
 * @return {@code clientId + ":" + subscriptionName}
 */
String subscriptionKey(String clientId, String subscriptionName) {
    final StringBuilder key = new StringBuilder();
    key.append(clientId).append(":").append(subscriptionName);
    return key.toString();
}
/**
 * Returns the cached message store for a queue, creating and caching one on first use.
 * <p>
 * The cache key is computed once and reused for both the lookup and the insertion. If
 * {@code putIfAbsent} reports that another store is already registered under the key,
 * that store is returned and the freshly created one is discarded.
 *
 * @param destination the queue to obtain a store for
 * @return the store registered in {@code storeCache} for the destination
 * @throws IOException declared by the store interface
 */
@Override
public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
    final String key = key(convert(destination));
    MessageStore store = storeCache.get(key);
    if (store == null) {
        final MessageStore queueStore = this.transactionStore.proxy(new KahaDBMessageStore(destination));
        store = storeCache.putIfAbsent(key, queueStore);
        if (store == null) {
            // Nothing was registered in the meantime: our new store is the cached one.
            store = queueStore;
        }
    }
    return store;
}
/**
 * Returns the cached message store for a topic, creating and caching one on first use.
 * <p>
 * The cache key is computed once and reused for both the lookup and the insertion. If
 * {@code putIfAbsent} reports that another store is already registered under the key,
 * that store is returned and the freshly created one is discarded.
 *
 * @param destination the topic to obtain a store for
 * @return the store registered in {@code storeCache} for the destination, cast to
 *         {@code TopicMessageStore}
 * @throws IOException declared by the store interface
 */
@Override
public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
    final String key = key(convert(destination));
    MessageStore store = storeCache.get(key);
    if (store == null) {
        final TopicMessageStore topicStore = this.transactionStore.proxy(new KahaDBTopicMessageStore(destination));
        store = storeCache.putIfAbsent(key, topicStore);
        if (store == null) {
            // Nothing was registered in the meantime: our new store is the cached one.
            store = topicStore;
        }
    }
    return (TopicMessageStore) store;
}
/**
 * Cleanup method to remove any state associated with the given destination.
 * This method does not stop the message store (it might not be cached).
 * <p>
 * This implementation has an empty body: it performs no cleanup and leaves
 * any cached store for the destination untouched.
 *
 * @param destination
 * Destination to forget
 */
@Override
public void removeQueueMessageStore(ActiveMQQueue destination) {
}
/**
 * Cleanup method to remove any state associated with the given destination.
 * This method does not stop the message store (it might not be cached).
 * <p>
 * This implementation has an empty body: it performs no cleanup and leaves
 * any cached store for the destination untouched.
 *
 * @param destination
 * Destination to forget
 */
@Override
public void removeTopicMessageStore(ActiveMQTopic destination) {
}
/**
 * Sets the {@code deleteAllMessages} flag to {@code true}. Nothing is deleted by this
 * call itself.
 * NOTE(review): the flag is presumably acted on elsewhere (e.g. at start-up); that code
 * is not visible here, so confirm.
 *
 * @throws IOException declared by the interface; not thrown by this implementation
 */
@Override
public void deleteAllMessages() throws IOException {
deleteAllMessages = true;
}
/**
 * Lists every destination that has an entry in {@code metadata.destinations}. Runs inside
 * a page-file transaction while holding the index write lock.
 *
 * @return a new set with one destination per stored index key
 * @throws RuntimeException wrapping any {@link IOException} raised by the transaction
 */
@Override
public Set<ActiveMQDestination> getDestinations() {
    final HashSet<ActiveMQDestination> found = new HashSet<ActiveMQDestination>();
    indexLock.writeLock().lock();
    try {
        pageFile.tx().execute(new Transaction.Closure<IOException>() {
            @Override
            public void execute(Transaction tx) throws IOException {
                final Iterator<Entry<String, StoredDestination>> it = metadata.destinations.iterator(tx);
                while (it.hasNext()) {
                    //Removing isEmpty topic check - see AMQ-5875
                    found.add(convert(it.next().getKey()));
                }
            }
        });
    } catch (IOException e) {
        throw new RuntimeException(e);
    } finally {
        indexLock.writeLock().unlock();
    }
    return found;
}
/**
 * Always returns {@code 0}; this implementation does not look anything up.
 *
 * @return {@code 0}
 * @throws IOException declared by the interface; not thrown by this implementation
 */
@Override
public long getLastMessageBrokerSequenceId() throws IOException {
return 0;
}
/**
 * Returns the last sequence id recorded for a producer by
 * {@code metadata.producerSequenceIdTracker}. The tracker is read while holding the index
 * write lock.
 *
 * @param id the producer to look up
 * @return the value reported by the tracker for that producer
 */
@Override
public long getLastProducerSequenceId(ProducerId id) {
    final long lastSeqId;
    indexLock.writeLock().lock();
    try {
        lastSeqId = metadata.producerSequenceIdTracker.getLastSeqId(id);
    } finally {
        indexLock.writeLock().unlock();
    }
    return lastSeqId;
}
/**
 * Reports the store size as the journal size plus the page file's disk size.
 *
 * @return {@code journalSize} plus the page file disk size
 * @throws RuntimeException wrapping any {@link IOException} raised while querying the page file
 */
@Override
public long size() {
    try {
        final long journalBytes = journalSize.get();
        final long pageFileBytes = getPageFile().getDiskSize();
        return journalBytes + pageFileBytes;
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
}
/**
 * Not supported by this implementation: always throws.
 *
 * @param context the connection context (unused)
 * @throws IOException always, with the message "Not yet implemented."
 */
@Override
public void beginTransaction(ConnectionContext context) throws IOException {
throw new IOException("Not yet implemented.");
}
/**
 * Not supported by this implementation: always throws.
 *
 * @param context the connection context (unused)
 * @throws IOException always, with the message "Not yet implemented."
 */
@Override
public void commitTransaction(ConnectionContext context) throws IOException {
throw new IOException("Not yet implemented.");
}
/**
 * Not supported by this implementation: always throws.
 *
 * @param context the connection context (unused)
 * @throws IOException always, with the message "Not yet implemented."
 */
@Override
public void rollbackTransaction(ConnectionContext context) throws IOException {
throw new IOException("Not yet implemented.");
}
/**
 * Delegates to the superclass {@code checkpointCleanup(sync)}.
 *
 * @param sync passed through unchanged to {@code checkpointCleanup}
 * @throws IOException if the superclass call fails
 */
@Override
public void checkpoint(boolean sync) throws IOException {
super.checkpointCleanup(sync);
}
// /////////////////////////////////////////////////////////////////
// Internal helper methods.
// /////////////////////////////////////////////////////////////////
/**
 * Loads the journal record at the given location and unmarshals the message it carries.
 * <p>
 * Both add-message and update-message records are accepted; for an update the embedded
 * add-message command is used. Any failure, including the {@code IOException}s raised
 * inside this method, is wrapped into a new {@code IOException}, logged, reported to
 * {@code brokerService.handleIOException} and then thrown.
 *
 * @param location journal location of the record
 * @return the unmarshalled message
 * @throws IOException if the record cannot be loaded, has an unexpected type, has no
 *                     message content, or cannot be unmarshalled
 */
Message loadMessage(Location location) throws IOException {
    try {
        final JournalCommand<?> journalRecord = load(location);
        final KahaAddMessageCommand addCommand;
        switch (journalRecord.type()) {
            case KAHA_ADD_MESSAGE_COMMAND:
                addCommand = (KahaAddMessageCommand) journalRecord;
                break;
            case KAHA_UPDATE_MESSAGE_COMMAND:
                addCommand = ((KahaUpdateMessageCommand) journalRecord).getMessage();
                break;
            default:
                throw new IOException("Could not load journal record, unexpected command type: " + journalRecord.type() + " at location: " + location);
        }
        if (!addCommand.hasMessage()) {
            throw new IOException("Could not load journal record, null message content at location: " + location);
        }
        final DataInputStream payload = new DataInputStream(addCommand.getMessage().newInput());
        return (Message) wireFormat.unmarshal(payload);
    } catch (Throwable t) {
        final IOException wrapped = IOExceptionSupport.create("Unexpected error on journal read at: " + location , t);
        LOG.error("Failed to load message at: {}", location , wrapped);
        brokerService.handleIOException(wrapped);
        throw wrapped;
    }
}
// /////////////////////////////////////////////////////////////////
// Internal conversion methods.
// /////////////////////////////////////////////////////////////////
/**
 * Copies a journal {@code Location} into a new {@code KahaLocation}.
 *
 * @param location the journal location to copy
 * @return a new {@code KahaLocation} whose log id is the location's data file id and whose
 *         offset is the location's offset
 */
KahaLocation convert(Location location) {
    final KahaLocation kahaLocation = new KahaLocation();
    kahaLocation.setLogId(location.getDataFileId());
    kahaLocation.setOffset(location.getOffset());
    return kahaLocation;
}
/**
 * Converts a broker destination into its {@code KahaDestination} form, carrying over the
 * physical name and mapping the destination type.
 *
 * @param dest the broker destination
 * @return the converted destination, or {@code null} when the destination type is not one
 *         of queue, topic, temp queue or temp topic
 */
KahaDestination convert(ActiveMQDestination dest) {
    final DestinationType kahaType;
    switch (dest.getDestinationType()) {
        case ActiveMQDestination.QUEUE_TYPE:
            kahaType = DestinationType.QUEUE;
            break;
        case ActiveMQDestination.TOPIC_TYPE:
            kahaType = DestinationType.TOPIC;
            break;
        case ActiveMQDestination.TEMP_QUEUE_TYPE:
            kahaType = DestinationType.TEMP_QUEUE;
            break;
        case ActiveMQDestination.TEMP_TOPIC_TYPE:
            kahaType = DestinationType.TEMP_TOPIC;
            break;
        default:
            // Unknown destination type: callers receive null.
            return null;
    }
    final KahaDestination converted = new KahaDestination();
    converted.setName(dest.getPhysicalName());
    converted.setType(kahaType);
    return converted;
}
/**
 * Parses a stored destination key of the form {@code <type number>:<name>} back into a
 * broker destination. Only the first colon separates the two parts.
 *
 * @param dest the stored key
 * @return the destination described by the key
 * @throws IllegalArgumentException if the key contains no colon, the type part is not an
 *                                  integer, or the type number is not a known destination type
 */
ActiveMQDestination convert(String dest) {
    final int separator = dest.indexOf(":");
    if (separator < 0) {
        throw new IllegalArgumentException("Not in the valid destination format");
    }
    final String typePart = dest.substring(0, separator);
    final String namePart = dest.substring(separator + 1);
    return convert(Integer.parseInt(typePart), namePart);
}
/**
 * Converts a {@code KahaDestination} back into a broker destination using its type number
 * and name.
 *
 * @param commandDestination the stored destination
 * @return the matching broker destination
 */
private ActiveMQDestination convert(KahaDestination commandDestination) {
    final int typeNumber = commandDestination.getType().getNumber();
    final String name = commandDestination.getName();
    return convert(typeNumber, name);
}
/**
 * Creates the broker destination for a destination type number and a name.
 *
 * @param type numeric value of a {@code KahaDestination.DestinationType}
 * @param name physical name of the destination
 * @return a new queue, topic, temp queue or temp topic with the given name
 * @throws IllegalArgumentException if the type resolves to a value that is not handled
 */
private ActiveMQDestination convert(int type, String name) {
    final KahaDestination.DestinationType kind = KahaDestination.DestinationType.valueOf(type);
    switch (kind) {
        case QUEUE:
            return new ActiveMQQueue(name);
        case TOPIC:
            return new ActiveMQTopic(name);
        case TEMP_QUEUE:
            return new ActiveMQTempQueue(name);
        case TEMP_TOPIC:
            return new ActiveMQTempTopic(name);
        default:
            throw new IllegalArgumentException("Not in the valid destination format");
    }
}
/**
 * @return the transformer currently applied to transaction ids (see its use in
 *         {@code doAcknowledge})
 */
public TransactionIdTransformer getTransactionIdTransformer() {
return transactionIdTransformer;
}
/**
 * Replaces the transformer applied to transaction ids.
 *
 * @param transactionIdTransformer the transformer to use from now on
 */
public void setTransactionIdTransformer(TransactionIdTransformer transactionIdTransformer) {
this.transactionIdTransformer = transactionIdTransformer;
}
/**
 * Map key identifying an asynchronous store task by message id and destination.
 * Two keys are equal when both their ids and their destinations are equal.
 */
static class AsyncJobKey {
    MessageId id;
    ActiveMQDestination destination;

    AsyncJobKey(MessageId id, ActiveMQDestination destination) {
        this.id = id;
        this.destination = destination;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof AsyncJobKey)) {
            return false;
        }
        final AsyncJobKey other = (AsyncJobKey) obj;
        return id.equals(other.id) && destination.equals(other.destination);
    }

    @Override
    public int hashCode() {
        final int idHash = id.hashCode();
        final int destinationHash = destination.hashCode();
        return idHash + destinationHash;
    }

    /** Renders the key as {@code <physical destination name>-<message id>}. */
    @Override
    public String toString() {
        return destination.getPhysicalName() + "-" + id;
    }
}
/**
 * Contract shared by the asynchronous store tasks: they can be cancelled and they
 * acquire and release locks around their execution.
 */
public interface StoreTask {

    /**
     * Attempts to cancel the task.
     *
     * @return whether the cancellation took effect
     */
    boolean cancel();

    /** Acquires the locks the task holds while it is pending. */
    void aquireLocks();

    /** Releases what {@link #aquireLocks()} acquired. */
    void releaseLocks();
}
/**
 * Asynchronous store task for a single queue message.
 * <p>
 * The task either runs (persisting the message through the store) or is cancelled; the
 * {@code done} flag ensures that only one of the two takes effect. The {@code locked}
 * flag makes lock acquisition and release each happen at most once per acquire/release
 * cycle.
 */
class StoreQueueTask implements Runnable, StoreTask {
protected final Message message;
protected final ConnectionContext context;
protected final KahaDBMessageStore store;
protected final InnerFutureTask future;
// Set by whichever of run() / cancel() gets there first.
protected final AtomicBoolean done = new AtomicBoolean();
// True between a successful aquireLocks() flag flip and the matching releaseLocks().
protected final AtomicBoolean locked = new AtomicBoolean();
public StoreQueueTask(KahaDBMessageStore store, ConnectionContext context, Message message) {
this.store = store;
this.context = context;
this.message = message;
this.future = new InnerFutureTask(this);
}
/**
 * @return the future associated with this task
 */
public ListenableFuture<Object> getFuture() {
return this.future;
}
/**
 * Marks the task as done and cancels its future without interrupting.
 *
 * @return the result of cancelling the future, or {@code false} when the task was
 *         already marked done (by a previous cancel or by {@code run()})
 */
@Override
public boolean cancel() {
if (this.done.compareAndSet(false, true)) {
return this.future.cancel(false);
}
return false;
}
/**
 * Acquires a permit from the global queue semaphore and the store's local async lock,
 * and increments the message reference count. Does nothing when the locks are already
 * marked as held.
 * NOTE(review): when the acquire is interrupted the failure is only logged and
 * {@code locked} stays {@code true}, so a later {@code releaseLocks()} still releases
 * both locks and decrements the reference count. Confirm this is intended.
 */
@Override
public void aquireLocks() {
if (this.locked.compareAndSet(false, true)) {
try {
globalQueueSemaphore.acquire();
store.acquireLocalAsyncLock();
message.incrementReferenceCount();
} catch (InterruptedException e) {
LOG.warn("Failed to aquire lock", e);
}
}
}
/**
 * Releases the store's local async lock and the global queue semaphore permit, and
 * decrements the message reference count. Does nothing when the locks are not marked
 * as held.
 */
@Override
public void releaseLocks() {
if (this.locked.compareAndSet(true, false)) {
store.releaseLocalAsyncLock();
globalQueueSemaphore.release();
message.decrementReferenceCount();
}
}
/**
 * Persists the message unless the task was already cancelled, then unregisters the task
 * and completes the future. Any failure is recorded on the future and the task is
 * unregistered.
 */
@Override
public void run() {
this.store.doneTasks++;
try {
if (this.done.compareAndSet(false, true)) {
this.store.addMessage(context, message);
removeQueueTask(this.store, this.message.getMessageId());
this.future.complete();
} else if (cancelledTaskModMetric > 0 && (++this.store.canceledTasks) % cancelledTaskModMetric == 0) {
// Task was cancelled before it ran: periodically print the cancelled ratio and reset both counters.
System.err.println(this.store.dest.getName() + " cancelled: "
+ (this.store.canceledTasks / this.store.doneTasks) * 100);
this.store.canceledTasks = this.store.doneTasks = 0;
}
} catch (Throwable t) {
this.future.setException(t);
removeQueueTask(this.store, this.message.getMessageId());
}
}
protected Message getMessage() {
return this.message;
}
/**
 * Future for the enclosing task that supports a single completion listener. The
 * listener is fired at most once: it is cleared atomically before being run.
 */
private class InnerFutureTask extends FutureTask<Object> implements ListenableFuture<Object> {
private final AtomicReference<Runnable> listenerRef = new AtomicReference<>();
public InnerFutureTask(Runnable runnable) {
super(runnable, null);
}
// Overridden so the enclosing task can record a failure on this future.
@Override
public void setException(final Throwable e) {
super.setException(e);
}
/** Completes the future with a {@code null} result. */
public void complete() {
super.set(null);
}
// FutureTask completion hook: notify the listener, if one is registered.
@Override
public void done() {
fireListener();
}
/**
 * Registers the listener, replacing any previously registered one, and fires it
 * immediately when the future is already done.
 */
@Override
public void addListener(Runnable listener) {
this.listenerRef.set(listener);
if (isDone()) {
fireListener();
}
}
private void fireListener() {
// getAndSet(null) guarantees the listener runs at most once even if done() and addListener() race.
Runnable listener = listenerRef.getAndSet(null);
if (listener != null) {
try {
listener.run();
} catch (Exception ignored) {
LOG.warn("Unexpected exception from future {} listener callback {}", this, listener, ignored);
}
}
}
}
}
/**
 * Asynchronous store task for a single topic message.
 * <p>
 * In addition to what {@link StoreQueueTask} does, the task collects the keys of
 * subscriptions that acknowledge the message while the store is still pending. When the
 * task runs it persists the message and then applies those acknowledgements.
 * All access to {@code subscriptionKeys} happens under its own monitor.
 */
class StoreTopicTask extends StoreQueueTask {
    /** Number of subscriptions at the time the task was created. */
    private final int subscriptionCount;
    /** Keys of subscriptions that acknowledged the message so far; guarded by its own monitor. */
    private final List<String> subscriptionKeys = new ArrayList<String>(1);
    private final KahaDBTopicMessageStore topicStore;

    public StoreTopicTask(KahaDBTopicMessageStore store, ConnectionContext context, Message message,
            int subscriptionCount) {
        super(store, context, message);
        this.topicStore = store;
        this.subscriptionCount = subscriptionCount;
    }

    /**
     * Acquires a permit from the global topic semaphore and the store's local async lock,
     * and increments the message reference count. Does nothing when the locks are already
     * marked as held.
     */
    @Override
    public void aquireLocks() {
        if (this.locked.compareAndSet(false, true)) {
            try {
                globalTopicSemaphore.acquire();
                store.acquireLocalAsyncLock();
                message.incrementReferenceCount();
            } catch (InterruptedException e) {
                LOG.warn("Failed to aquire lock", e);
            }
        }
    }

    /**
     * Decrements the message reference count and releases the store's local async lock and
     * the global topic semaphore permit. Does nothing when the locks are not marked as held.
     */
    @Override
    public void releaseLocks() {
        if (this.locked.compareAndSet(true, false)) {
            message.decrementReferenceCount();
            store.releaseLocalAsyncLock();
            globalTopicSemaphore.release();
        }
    }

    /**
     * add a key
     *
     * @param key
     *            key of the subscription that acknowledged the message
     * @return true if all acknowledgements received
     */
    public boolean addSubscriptionKey(String key) {
        synchronized (this.subscriptionKeys) {
            this.subscriptionKeys.add(key);
            // Read the size under the same monitor that guards the list: the list is not
            // thread-safe and acknowledgements can arrive from several threads.
            return this.subscriptionKeys.size() >= this.subscriptionCount;
        }
    }

    /**
     * Persists the message unless the task was already cancelled, applies the
     * acknowledgements collected so far, then unregisters the task and completes the
     * future. Any failure is recorded on the future and the task is unregistered.
     */
    @Override
    public void run() {
        this.store.doneTasks++;
        try {
            if (this.done.compareAndSet(false, true)) {
                this.topicStore.addMessage(context, message);
                // apply any acks we have
                synchronized (this.subscriptionKeys) {
                    for (String key : this.subscriptionKeys) {
                        this.topicStore.doAcknowledge(context, key, this.message.getMessageId(), null);
                    }
                }
                removeTopicTask(this.topicStore, this.message.getMessageId());
                this.future.complete();
            } else if (cancelledTaskModMetric > 0 && this.store.canceledTasks++ % cancelledTaskModMetric == 0) {
                // Task was cancelled before it ran: periodically print the cancelled ratio
                // and reset both counters.
                System.err.println(this.store.dest.getName() + " cancelled: "
                        + (this.store.canceledTasks / this.store.doneTasks) * 100);
                this.store.canceledTasks = this.store.doneTasks = 0;
            }
        } catch (Throwable t) {
            this.future.setException(t);
            removeTopicTask(this.topicStore, this.message.getMessageId());
        }
    }
}
/**
 * Thread pool for store tasks that releases a task's locks once its execution has
 * finished, whether or not it threw.
 */
public class StoreTaskExecutor extends ThreadPoolExecutor {

    public StoreTaskExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit timeUnit, BlockingQueue<Runnable> queue, ThreadFactory threadFactory) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, timeUnit, queue, threadFactory);
    }

    /**
     * After the default hook has run, releases the locks of the executed runnable when
     * it is a {@link StoreTask}; other runnables are ignored.
     */
    @Override
    protected void afterExecute(Runnable runnable, Throwable throwable) {
        super.afterExecute(runnable, throwable);
        if (!(runnable instanceof StoreTask)) {
            return;
        }
        final StoreTask finished = (StoreTask) runnable;
        finished.releaseLocks();
    }
}
/**
 * Creates a new job scheduler store.
 *
 * @return a new {@code JobSchedulerStoreImpl} instance on every call
 * @throws IOException declared by the interface; not thrown by this implementation
 * @throws UnsupportedOperationException declared by the interface; not thrown by this implementation
 */
@Override
public JobSchedulerStore createJobSchedulerStore() throws IOException, UnsupportedOperationException {
return new JobSchedulerStoreImpl();
}
/**
 * Tells whether the noLocal value of durable subscriptions is persisted, which depends
 * on the OpenWire version the store is using.
 *
 * @return {@code true} when the store OpenWire version is 11 or higher
 * @see org.apache.activemq.store.NoLocalSubscriptionAware#isPersistNoLocal()
 */
@Override
public boolean isPersistNoLocal() {
    // Prior to v11 the broker did not store the noLocal value for durable subs.
    final int storeOpenWireVersion = brokerService.getStoreOpenWireVersion();
    return storeOpenWireVersion >= 11;
}
}