blob: ef2e0d97132c1c4072403ac4a74da605dfc373fe [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store.kahadb;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Map.Entry;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTempQueue;
import org.apache.activemq.command.ActiveMQTempTopic;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.LocalTransactionId;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.XATransactionId;
import org.apache.activemq.filter.BooleanExpression;
import org.apache.activemq.filter.MessageEvaluationContext;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.protobuf.Buffer;
import org.apache.activemq.selector.SelectorParser;
import org.apache.activemq.store.AbstractMessageStore;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.TransactionStore;
import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
import org.apache.activemq.store.kahadb.data.KahaDestination;
import org.apache.activemq.store.kahadb.data.KahaLocalTransactionId;
import org.apache.activemq.store.kahadb.data.KahaLocation;
import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand;
import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand;
import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand;
import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
import org.apache.activemq.store.kahadb.data.KahaXATransactionId;
import org.apache.activemq.store.kahadb.data.KahaDestination.DestinationType;
import org.apache.activemq.usage.MemoryUsage;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.wireformat.WireFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.kahadb.journal.Location;
import org.apache.kahadb.page.Transaction;
public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
static final Logger LOG = LoggerFactory.getLogger(KahaDBStore.class);
private static final int MAX_ASYNC_JOBS = 10000;
public static final String PROPERTY_CANCELED_TASK_MOD_METRIC = "org.apache.activemq.store.kahadb.CANCELED_TASK_MOD_METRIC";
public static final int cancelledTaskModMetric = Integer.parseInt(System.getProperty(
PROPERTY_CANCELED_TASK_MOD_METRIC, "0"), 10);
public static final String PROPERTY_ASYNC_EXECUTOR_MAX_THREADS = "org.apache.activemq.store.kahadb.ASYNC_EXECUTOR_MAX_THREADS";
private static final int asyncExecutorMaxThreads = Integer.parseInt(System.getProperty(
PROPERTY_ASYNC_EXECUTOR_MAX_THREADS, "1"), 10);;
protected ExecutorService queueExecutor;
protected ExecutorService topicExecutor;
protected final List<Map<AsyncJobKey, StoreTask>> asyncQueueMaps = new LinkedList<Map<AsyncJobKey, StoreTask>>();
protected final List<Map<AsyncJobKey, StoreTask>> asyncTopicMaps = new LinkedList<Map<AsyncJobKey, StoreTask>>();
final WireFormat wireFormat = new OpenWireFormat();
private SystemUsage usageManager;
private LinkedBlockingQueue<Runnable> asyncQueueJobQueue;
private LinkedBlockingQueue<Runnable> asyncTopicJobQueue;
Semaphore globalQueueSemaphore;
Semaphore globalTopicSemaphore;
private boolean concurrentStoreAndDispatchQueues = true;
// when true, message order may be compromised when cache is exhausted if store is out
// or order w.r.t cache
private boolean concurrentStoreAndDispatchTopics = false;
private boolean concurrentStoreAndDispatchTransactions = false;
private int maxAsyncJobs = MAX_ASYNC_JOBS;
private final KahaDBTransactionStore transactionStore;
public KahaDBStore() {
this.transactionStore = new KahaDBTransactionStore(this);
}
public void setBrokerName(String brokerName) {
}
public void setUsageManager(SystemUsage usageManager) {
this.usageManager = usageManager;
}
public SystemUsage getUsageManager() {
return this.usageManager;
}
/**
* @return the concurrentStoreAndDispatch
*/
public boolean isConcurrentStoreAndDispatchQueues() {
return this.concurrentStoreAndDispatchQueues;
}
/**
* @param concurrentStoreAndDispatch
* the concurrentStoreAndDispatch to set
*/
public void setConcurrentStoreAndDispatchQueues(boolean concurrentStoreAndDispatch) {
this.concurrentStoreAndDispatchQueues = concurrentStoreAndDispatch;
}
/**
* @return the concurrentStoreAndDispatch
*/
public boolean isConcurrentStoreAndDispatchTopics() {
return this.concurrentStoreAndDispatchTopics;
}
/**
* @param concurrentStoreAndDispatch
* the concurrentStoreAndDispatch to set
*/
public void setConcurrentStoreAndDispatchTopics(boolean concurrentStoreAndDispatch) {
this.concurrentStoreAndDispatchTopics = concurrentStoreAndDispatch;
}
public boolean isConcurrentStoreAndDispatchTransactions() {
return this.concurrentStoreAndDispatchTransactions;
}
/**
* @return the maxAsyncJobs
*/
public int getMaxAsyncJobs() {
return this.maxAsyncJobs;
}
/**
* @param maxAsyncJobs
* the maxAsyncJobs to set
*/
public void setMaxAsyncJobs(int maxAsyncJobs) {
this.maxAsyncJobs = maxAsyncJobs;
}
@Override
public void doStart() throws Exception {
super.doStart();
this.globalQueueSemaphore = new Semaphore(getMaxAsyncJobs());
this.globalTopicSemaphore = new Semaphore(getMaxAsyncJobs());
this.asyncQueueJobQueue = new LinkedBlockingQueue<Runnable>(getMaxAsyncJobs());
this.asyncTopicJobQueue = new LinkedBlockingQueue<Runnable>(getMaxAsyncJobs());
this.queueExecutor = new ThreadPoolExecutor(1, asyncExecutorMaxThreads, 0L, TimeUnit.MILLISECONDS,
asyncQueueJobQueue, new ThreadFactory() {
public Thread newThread(Runnable runnable) {
Thread thread = new Thread(runnable, "ConcurrentQueueStoreAndDispatch");
thread.setDaemon(true);
return thread;
}
});
this.topicExecutor = new ThreadPoolExecutor(1, asyncExecutorMaxThreads, 0L, TimeUnit.MILLISECONDS,
asyncTopicJobQueue, new ThreadFactory() {
public Thread newThread(Runnable runnable) {
Thread thread = new Thread(runnable, "ConcurrentTopicStoreAndDispatch");
thread.setDaemon(true);
return thread;
}
});
}
@Override
public void doStop(ServiceStopper stopper) throws Exception {
// drain down async jobs
LOG.info("Stopping async queue tasks");
if (this.globalQueueSemaphore != null) {
this.globalQueueSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS);
}
synchronized (this.asyncQueueMaps) {
for (Map<AsyncJobKey, StoreTask> m : asyncQueueMaps) {
synchronized (m) {
for (StoreTask task : m.values()) {
task.cancel();
}
}
}
this.asyncQueueMaps.clear();
}
LOG.info("Stopping async topic tasks");
if (this.globalTopicSemaphore != null) {
this.globalTopicSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS);
}
synchronized (this.asyncTopicMaps) {
for (Map<AsyncJobKey, StoreTask> m : asyncTopicMaps) {
synchronized (m) {
for (StoreTask task : m.values()) {
task.cancel();
}
}
}
this.asyncTopicMaps.clear();
}
if (this.globalQueueSemaphore != null) {
this.globalQueueSemaphore.drainPermits();
}
if (this.globalTopicSemaphore != null) {
this.globalTopicSemaphore.drainPermits();
}
if (this.queueExecutor != null) {
this.queueExecutor.shutdownNow();
}
if (this.topicExecutor != null) {
this.topicExecutor.shutdownNow();
}
LOG.info("Stopped KahaDB");
super.doStop(stopper);
}
protected StoreQueueTask removeQueueTask(KahaDBMessageStore store, MessageId id) {
StoreQueueTask task = null;
synchronized (store.asyncTaskMap) {
task = (StoreQueueTask) store.asyncTaskMap.remove(new AsyncJobKey(id, store.getDestination()));
}
return task;
}
protected void addQueueTask(KahaDBMessageStore store, StoreQueueTask task) throws IOException {
synchronized (store.asyncTaskMap) {
store.asyncTaskMap.put(new AsyncJobKey(task.getMessage().getMessageId(), store.getDestination()), task);
}
this.queueExecutor.execute(task);
}
protected StoreTopicTask removeTopicTask(KahaDBTopicMessageStore store, MessageId id) {
StoreTopicTask task = null;
synchronized (store.asyncTaskMap) {
task = (StoreTopicTask) store.asyncTaskMap.remove(new AsyncJobKey(id, store.getDestination()));
}
return task;
}
protected void addTopicTask(KahaDBTopicMessageStore store, StoreTopicTask task) throws IOException {
synchronized (store.asyncTaskMap) {
store.asyncTaskMap.put(new AsyncJobKey(task.getMessage().getMessageId(), store.getDestination()), task);
}
this.topicExecutor.execute(task);
}
public TransactionStore createTransactionStore() throws IOException {
return this.transactionStore;
}
public boolean getForceRecoverIndex() {
return this.forceRecoverIndex;
}
public void setForceRecoverIndex(boolean forceRecoverIndex) {
this.forceRecoverIndex = forceRecoverIndex;
}
public class KahaDBMessageStore extends AbstractMessageStore {
protected final Map<AsyncJobKey, StoreTask> asyncTaskMap = new HashMap<AsyncJobKey, StoreTask>();
protected KahaDestination dest;
private final int maxAsyncJobs;
private final Semaphore localDestinationSemaphore;
double doneTasks, canceledTasks = 0;
public KahaDBMessageStore(ActiveMQDestination destination) {
super(destination);
this.dest = convert(destination);
this.maxAsyncJobs = getMaxAsyncJobs();
this.localDestinationSemaphore = new Semaphore(this.maxAsyncJobs);
}
@Override
public ActiveMQDestination getDestination() {
return destination;
}
@Override
public Future<Object> asyncAddQueueMessage(final ConnectionContext context, final Message message)
throws IOException {
if (isConcurrentStoreAndDispatchQueues()) {
StoreQueueTask result = new StoreQueueTask(this, context, message);
result.aquireLocks();
addQueueTask(this, result);
return result.getFuture();
} else {
return super.asyncAddQueueMessage(context, message);
}
}
@Override
public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
if (isConcurrentStoreAndDispatchQueues()) {
AsyncJobKey key = new AsyncJobKey(ack.getLastMessageId(), getDestination());
StoreQueueTask task = null;
synchronized (asyncTaskMap) {
task = (StoreQueueTask) asyncTaskMap.get(key);
}
if (task != null) {
if (!task.cancel()) {
try {
task.future.get();
} catch (InterruptedException e) {
throw new InterruptedIOException(e.toString());
} catch (Exception ignored) {
LOG.debug("removeAsync: cannot cancel, waiting for add resulted in ex", ignored);
}
removeMessage(context, ack);
} else {
synchronized (asyncTaskMap) {
asyncTaskMap.remove(key);
}
}
} else {
removeMessage(context, ack);
}
} else {
removeMessage(context, ack);
}
}
public void addMessage(ConnectionContext context, Message message) throws IOException {
KahaAddMessageCommand command = new KahaAddMessageCommand();
command.setDestination(dest);
command.setMessageId(message.getMessageId().toString());
command.setTransactionInfo(createTransactionInfo(message.getTransactionId()));
command.setPriority(message.getPriority());
command.setPrioritySupported(isPrioritizedMessages());
org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message);
command.setMessage(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
store(command, isEnableJournalDiskSyncs() && message.isResponseRequired(), null, null);
}
public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
KahaRemoveMessageCommand command = new KahaRemoveMessageCommand();
command.setDestination(dest);
command.setMessageId(ack.getLastMessageId().toString());
command.setTransactionInfo(createTransactionInfo(ack.getTransactionId()));
org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(ack);
command.setAck(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
store(command, isEnableJournalDiskSyncs() && ack.isResponseRequired(), null, null);
}
public void removeAllMessages(ConnectionContext context) throws IOException {
KahaRemoveDestinationCommand command = new KahaRemoveDestinationCommand();
command.setDestination(dest);
store(command, true, null, null);
}
public Message getMessage(MessageId identity) throws IOException {
final String key = identity.toString();
// Hopefully one day the page file supports concurrent read
// operations... but for now we must
// externally synchronize...
Location location;
indexLock.readLock().lock();
try {
location = pageFile.tx().execute(new Transaction.CallableClosure<Location, IOException>() {
public Location execute(Transaction tx) throws IOException {
StoredDestination sd = getStoredDestination(dest, tx);
Long sequence = sd.messageIdIndex.get(tx, key);
if (sequence == null) {
return null;
}
return sd.orderIndex.get(tx, sequence).location;
}
});
}finally {
indexLock.readLock().unlock();
}
if (location == null) {
return null;
}
return loadMessage(location);
}
public int getMessageCount() throws IOException {
try {
lockAsyncJobQueue();
indexLock.readLock().lock();
try {
return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>() {
public Integer execute(Transaction tx) throws IOException {
// Iterate through all index entries to get a count
// of
// messages in the destination.
StoredDestination sd = getStoredDestination(dest, tx);
int rc = 0;
for (Iterator<Entry<Location, Long>> iterator = sd.locationIndex.iterator(tx); iterator
.hasNext();) {
iterator.next();
rc++;
}
return rc;
}
});
}finally {
indexLock.readLock().unlock();
}
} finally {
unlockAsyncJobQueue();
}
}
@Override
public boolean isEmpty() throws IOException {
indexLock.readLock().lock();
try {
return pageFile.tx().execute(new Transaction.CallableClosure<Boolean, IOException>() {
public Boolean execute(Transaction tx) throws IOException {
// Iterate through all index entries to get a count of
// messages in the destination.
StoredDestination sd = getStoredDestination(dest, tx);
return sd.locationIndex.isEmpty(tx);
}
});
}finally {
indexLock.readLock().unlock();
}
}
public void recover(final MessageRecoveryListener listener) throws Exception {
indexLock.readLock().lock();
try {
pageFile.tx().execute(new Transaction.Closure<Exception>() {
public void execute(Transaction tx) throws Exception {
StoredDestination sd = getStoredDestination(dest, tx);
sd.orderIndex.resetCursorPosition();
for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator
.hasNext();) {
Entry<Long, MessageKeys> entry = iterator.next();
Message msg = loadMessage(entry.getValue().location);
listener.recoverMessage(msg);
}
}
});
}finally {
indexLock.readLock().unlock();
}
}
public void recoverNextMessages(final int maxReturned, final MessageRecoveryListener listener) throws Exception {
indexLock.readLock().lock();
try {
pageFile.tx().execute(new Transaction.Closure<Exception>() {
public void execute(Transaction tx) throws Exception {
StoredDestination sd = getStoredDestination(dest, tx);
Entry<Long, MessageKeys> entry = null;
int counter = 0;
for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx);
listener.hasSpace() && iterator.hasNext(); ) {
entry = iterator.next();
Message msg = loadMessage(entry.getValue().location);
listener.recoverMessage(msg);
counter++;
if (counter >= maxReturned) {
break;
}
}
sd.orderIndex.stoppedIterating();
}
});
}finally {
indexLock.readLock().unlock();
}
}
public void resetBatching() {
try {
pageFile.tx().execute(new Transaction.Closure<Exception>() {
public void execute(Transaction tx) throws Exception {
StoredDestination sd = getExistingStoredDestination(dest, tx);
if (sd != null) {
sd.orderIndex.resetCursorPosition();}
}
});
} catch (Exception e) {
LOG.error("Failed to reset batching",e);
}
}
@Override
public void setBatch(MessageId identity) throws IOException {
try {
final String key = identity.toString();
lockAsyncJobQueue();
// Hopefully one day the page file supports concurrent read
// operations... but for now we must
// externally synchronize...
indexLock.writeLock().lock();
try {
pageFile.tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
StoredDestination sd = getStoredDestination(dest, tx);
Long location = sd.messageIdIndex.get(tx, key);
if (location != null) {
sd.orderIndex.setBatch(tx, location);
}
}
});
}finally {
indexLock.writeLock().unlock();
}
} finally {
unlockAsyncJobQueue();
}
}
@Override
public void setMemoryUsage(MemoryUsage memoeyUSage) {
}
@Override
public void start() throws Exception {
super.start();
}
@Override
public void stop() throws Exception {
super.stop();
}
protected void lockAsyncJobQueue() {
try {
this.localDestinationSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS);
} catch (Exception e) {
LOG.error("Failed to lock async jobs for " + this.destination, e);
}
}
protected void unlockAsyncJobQueue() {
this.localDestinationSemaphore.release(this.maxAsyncJobs);
}
protected void acquireLocalAsyncLock() {
try {
this.localDestinationSemaphore.acquire();
} catch (InterruptedException e) {
LOG.error("Failed to aquire async lock for " + this.destination, e);
}
}
protected void releaseLocalAsyncLock() {
this.localDestinationSemaphore.release();
}
}
class KahaDBTopicMessageStore extends KahaDBMessageStore implements TopicMessageStore {
private final AtomicInteger subscriptionCount = new AtomicInteger();
public KahaDBTopicMessageStore(ActiveMQTopic destination) throws IOException {
super(destination);
this.subscriptionCount.set(getAllSubscriptions().length);
asyncTopicMaps.add(asyncTaskMap);
}
@Override
public Future<Object> asyncAddTopicMessage(final ConnectionContext context, final Message message)
throws IOException {
if (isConcurrentStoreAndDispatchTopics()) {
StoreTopicTask result = new StoreTopicTask(this, context, message, subscriptionCount.get());
result.aquireLocks();
addTopicTask(this, result);
return result.getFuture();
} else {
return super.asyncAddTopicMessage(context, message);
}
}
public void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
MessageId messageId, MessageAck ack)
throws IOException {
String subscriptionKey = subscriptionKey(clientId, subscriptionName);
if (isConcurrentStoreAndDispatchTopics()) {
AsyncJobKey key = new AsyncJobKey(messageId, getDestination());
StoreTopicTask task = null;
synchronized (asyncTaskMap) {
task = (StoreTopicTask) asyncTaskMap.get(key);
}
if (task != null) {
if (task.addSubscriptionKey(subscriptionKey)) {
removeTopicTask(this, messageId);
if (task.cancel()) {
synchronized (asyncTaskMap) {
asyncTaskMap.remove(key);
}
}
}
} else {
doAcknowledge(context, subscriptionKey, messageId, ack);
}
} else {
doAcknowledge(context, subscriptionKey, messageId, ack);
}
}
protected void doAcknowledge(ConnectionContext context, String subscriptionKey, MessageId messageId, MessageAck ack)
throws IOException {
KahaRemoveMessageCommand command = new KahaRemoveMessageCommand();
command.setDestination(dest);
command.setSubscriptionKey(subscriptionKey);
command.setMessageId(messageId.toString());
command.setTransactionInfo(createTransactionInfo(ack.getTransactionId()));
if (ack != null && ack.isUnmatchedAck()) {
command.setAck(UNMATCHED);
}
store(command, false, null, null);
}
public void addSubsciption(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException {
String subscriptionKey = subscriptionKey(subscriptionInfo.getClientId(), subscriptionInfo
.getSubscriptionName());
KahaSubscriptionCommand command = new KahaSubscriptionCommand();
command.setDestination(dest);
command.setSubscriptionKey(subscriptionKey);
command.setRetroactive(retroactive);
org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(subscriptionInfo);
command.setSubscriptionInfo(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
store(command, isEnableJournalDiskSyncs() && true, null, null);
this.subscriptionCount.incrementAndGet();
}
public void deleteSubscription(String clientId, String subscriptionName) throws IOException {
KahaSubscriptionCommand command = new KahaSubscriptionCommand();
command.setDestination(dest);
command.setSubscriptionKey(subscriptionKey(clientId, subscriptionName));
store(command, isEnableJournalDiskSyncs() && true, null, null);
this.subscriptionCount.decrementAndGet();
}
public SubscriptionInfo[] getAllSubscriptions() throws IOException {
final ArrayList<SubscriptionInfo> subscriptions = new ArrayList<SubscriptionInfo>();
indexLock.readLock().lock();
try {
pageFile.tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
StoredDestination sd = getStoredDestination(dest, tx);
for (Iterator<Entry<String, KahaSubscriptionCommand>> iterator = sd.subscriptions.iterator(tx); iterator
.hasNext();) {
Entry<String, KahaSubscriptionCommand> entry = iterator.next();
SubscriptionInfo info = (SubscriptionInfo) wireFormat.unmarshal(new DataInputStream(entry
.getValue().getSubscriptionInfo().newInput()));
subscriptions.add(info);
}
}
});
}finally {
indexLock.readLock().unlock();
}
SubscriptionInfo[] rc = new SubscriptionInfo[subscriptions.size()];
subscriptions.toArray(rc);
return rc;
}
public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
indexLock.readLock().lock();
try {
return pageFile.tx().execute(new Transaction.CallableClosure<SubscriptionInfo, IOException>() {
public SubscriptionInfo execute(Transaction tx) throws IOException {
StoredDestination sd = getStoredDestination(dest, tx);
KahaSubscriptionCommand command = sd.subscriptions.get(tx, subscriptionKey);
if (command == null) {
return null;
}
return (SubscriptionInfo) wireFormat.unmarshal(new DataInputStream(command
.getSubscriptionInfo().newInput()));
}
});
}finally {
indexLock.readLock().unlock();
}
}
public int getMessageCount(String clientId, String subscriptionName) throws IOException {
final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
indexLock.writeLock().lock();
try {
return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>() {
public Integer execute(Transaction tx) throws IOException {
StoredDestination sd = getStoredDestination(dest, tx);
LastAck cursorPos = sd.subscriptionAcks.get(tx, subscriptionKey);
if (cursorPos == null) {
// The subscription might not exist.
return 0;
}
int counter = 0;
for (Iterator<Entry<Long, HashSet<String>>> iterator =
sd.ackPositions.iterator(tx, cursorPos.lastAckedSequence); iterator.hasNext();) {
Entry<Long, HashSet<String>> entry = iterator.next();
if (entry.getValue().contains(subscriptionKey)) {
counter++;
}
}
return counter;
}
});
}finally {
indexLock.writeLock().unlock();
}
}
public void recoverSubscription(String clientId, String subscriptionName, final MessageRecoveryListener listener)
throws Exception {
final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
final SubscriptionInfo info = lookupSubscription(clientId, subscriptionName);
indexLock.writeLock().lock();
try {
pageFile.tx().execute(new Transaction.Closure<Exception>() {
public void execute(Transaction tx) throws Exception {
StoredDestination sd = getStoredDestination(dest, tx);
LastAck cursorPos = sd.subscriptionAcks.get(tx, subscriptionKey);
sd.orderIndex.setBatch(tx, cursorPos);
for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator
.hasNext();) {
Entry<Long, MessageKeys> entry = iterator.next();
listener.recoverMessage(loadMessage(entry.getValue().location));
}
sd.orderIndex.resetCursorPosition();
}
});
}finally {
indexLock.writeLock().unlock();
}
}
public void recoverNextMessages(String clientId, String subscriptionName, final int maxReturned,
final MessageRecoveryListener listener) throws Exception {
final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
final SubscriptionInfo info = lookupSubscription(clientId, subscriptionName);
indexLock.writeLock().lock();
try {
pageFile.tx().execute(new Transaction.Closure<Exception>() {
public void execute(Transaction tx) throws Exception {
StoredDestination sd = getStoredDestination(dest, tx);
sd.orderIndex.resetCursorPosition();
MessageOrderCursor moc = sd.subscriptionCursors.get(subscriptionKey);
if (moc == null) {
LastAck pos = sd.subscriptionAcks.get(tx, subscriptionKey);
if (pos == null) {
// sub deleted
return;
}
sd.orderIndex.setBatch(tx, pos);
moc = sd.orderIndex.cursor;
} else {
sd.orderIndex.cursor.sync(moc);
}
Entry<Long, MessageKeys> entry = null;
int counter = 0;
for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx, moc); iterator
.hasNext();) {
entry = iterator.next();
if (listener.recoverMessage(loadMessage(entry.getValue().location))) {
counter++;
}
if (counter >= maxReturned || listener.hasSpace() == false) {
break;
}
}
sd.orderIndex.stoppedIterating();
if (entry != null) {
MessageOrderCursor copy = sd.orderIndex.cursor.copy();
sd.subscriptionCursors.put(subscriptionKey, copy);
}
}
});
}finally {
indexLock.writeLock().unlock();
}
}
public void resetBatching(String clientId, String subscriptionName) {
try {
final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
indexLock.writeLock().lock();
try {
pageFile.tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
StoredDestination sd = getStoredDestination(dest, tx);
sd.subscriptionCursors.remove(subscriptionKey);
}
});
}finally {
indexLock.writeLock().unlock();
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
String subscriptionKey(String clientId, String subscriptionName) {
return clientId + ":" + subscriptionName;
}
public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
return this.transactionStore.proxy(new KahaDBMessageStore(destination));
}
public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
return this.transactionStore.proxy(new KahaDBTopicMessageStore(destination));
}
/**
* Cleanup method to remove any state associated with the given destination.
* This method does not stop the message store (it might not be cached).
*
* @param destination
* Destination to forget
*/
public void removeQueueMessageStore(ActiveMQQueue destination) {
}
/**
* Cleanup method to remove any state associated with the given destination
* This method does not stop the message store (it might not be cached).
*
* @param destination
* Destination to forget
*/
public void removeTopicMessageStore(ActiveMQTopic destination) {
}
public void deleteAllMessages() throws IOException {
deleteAllMessages = true;
}
public Set<ActiveMQDestination> getDestinations() {
try {
final HashSet<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>();
indexLock.readLock().lock();
try {
pageFile.tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
for (Iterator<Entry<String, StoredDestination>> iterator = metadata.destinations.iterator(tx); iterator
.hasNext();) {
Entry<String, StoredDestination> entry = iterator.next();
if (!isEmptyTopic(entry, tx)) {
rc.add(convert(entry.getKey()));
}
}
}
private boolean isEmptyTopic(Entry<String, StoredDestination> entry, Transaction tx)
throws IOException {
boolean isEmptyTopic = false;
ActiveMQDestination dest = convert(entry.getKey());
if (dest.isTopic()) {
StoredDestination loadedStore = getStoredDestination(convert(dest), tx);
if (loadedStore.subscriptionAcks.isEmpty(tx)) {
isEmptyTopic = true;
}
}
return isEmptyTopic;
}
});
}finally {
indexLock.readLock().unlock();
}
return rc;
} catch (IOException e) {
throw new RuntimeException(e);
}
}
public long getLastMessageBrokerSequenceId() throws IOException {
return 0;
}
public long getLastProducerSequenceId(ProducerId id) {
indexLock.readLock().lock();
try {
return metadata.producerSequenceIdTracker.getLastSeqId(id);
} finally {
indexLock.readLock().unlock();
}
}
public long size() {
return storeSize.get();
}
public void beginTransaction(ConnectionContext context) throws IOException {
throw new IOException("Not yet implemented.");
}
public void commitTransaction(ConnectionContext context) throws IOException {
throw new IOException("Not yet implemented.");
}
public void rollbackTransaction(ConnectionContext context) throws IOException {
throw new IOException("Not yet implemented.");
}
public void checkpoint(boolean sync) throws IOException {
super.checkpointCleanup(false);
}
// /////////////////////////////////////////////////////////////////
// Internal helper methods.
// /////////////////////////////////////////////////////////////////
/**
* @param location
* @return
* @throws IOException
*/
Message loadMessage(Location location) throws IOException {
KahaAddMessageCommand addMessage = (KahaAddMessageCommand) load(location);
Message msg = (Message) wireFormat.unmarshal(new DataInputStream(addMessage.getMessage().newInput()));
return msg;
}
// /////////////////////////////////////////////////////////////////
// Internal conversion methods.
// /////////////////////////////////////////////////////////////////
KahaLocation convert(Location location) {
KahaLocation rc = new KahaLocation();
rc.setLogId(location.getDataFileId());
rc.setOffset(location.getOffset());
return rc;
}
KahaDestination convert(ActiveMQDestination dest) {
KahaDestination rc = new KahaDestination();
rc.setName(dest.getPhysicalName());
switch (dest.getDestinationType()) {
case ActiveMQDestination.QUEUE_TYPE:
rc.setType(DestinationType.QUEUE);
return rc;
case ActiveMQDestination.TOPIC_TYPE:
rc.setType(DestinationType.TOPIC);
return rc;
case ActiveMQDestination.TEMP_QUEUE_TYPE:
rc.setType(DestinationType.TEMP_QUEUE);
return rc;
case ActiveMQDestination.TEMP_TOPIC_TYPE:
rc.setType(DestinationType.TEMP_TOPIC);
return rc;
default:
return null;
}
}
ActiveMQDestination convert(String dest) {
int p = dest.indexOf(":");
if (p < 0) {
throw new IllegalArgumentException("Not in the valid destination format");
}
int type = Integer.parseInt(dest.substring(0, p));
String name = dest.substring(p + 1);
switch (KahaDestination.DestinationType.valueOf(type)) {
case QUEUE:
return new ActiveMQQueue(name);
case TOPIC:
return new ActiveMQTopic(name);
case TEMP_QUEUE:
return new ActiveMQTempQueue(name);
case TEMP_TOPIC:
return new ActiveMQTempTopic(name);
default:
throw new IllegalArgumentException("Not in the valid destination format");
}
}
static class AsyncJobKey {
MessageId id;
ActiveMQDestination destination;
AsyncJobKey(MessageId id, ActiveMQDestination destination) {
this.id = id;
this.destination = destination;
}
@Override
public boolean equals(Object obj) {
if (obj == this) {
return true;
}
return obj instanceof AsyncJobKey && id.equals(((AsyncJobKey) obj).id)
&& destination.equals(((AsyncJobKey) obj).destination);
}
@Override
public int hashCode() {
return id.hashCode() + destination.hashCode();
}
@Override
public String toString() {
return destination.getPhysicalName() + "-" + id;
}
}
interface StoreTask {
public boolean cancel();
}
class StoreQueueTask implements Runnable, StoreTask {
protected final Message message;
protected final ConnectionContext context;
protected final KahaDBMessageStore store;
protected final InnerFutureTask future;
protected final AtomicBoolean done = new AtomicBoolean();
protected final AtomicBoolean locked = new AtomicBoolean();
public StoreQueueTask(KahaDBMessageStore store, ConnectionContext context, Message message) {
this.store = store;
this.context = context;
this.message = message;
this.future = new InnerFutureTask(this);
}
public Future<Object> getFuture() {
return this.future;
}
public boolean cancel() {
releaseLocks();
if (this.done.compareAndSet(false, true)) {
return this.future.cancel(false);
}
return false;
}
void aquireLocks() {
if (this.locked.compareAndSet(false, true)) {
try {
globalQueueSemaphore.acquire();
store.acquireLocalAsyncLock();
message.incrementReferenceCount();
} catch (InterruptedException e) {
LOG.warn("Failed to aquire lock", e);
}
}
}
void releaseLocks() {
if (this.locked.compareAndSet(true, false)) {
store.releaseLocalAsyncLock();
globalQueueSemaphore.release();
message.decrementReferenceCount();
}
}
public void run() {
this.store.doneTasks++;
try {
if (this.done.compareAndSet(false, true)) {
this.store.addMessage(context, message);
removeQueueTask(this.store, this.message.getMessageId());
this.future.complete();
} else if (cancelledTaskModMetric > 0 && this.store.canceledTasks++ % cancelledTaskModMetric == 0) {
System.err.println(this.store.dest.getName() + " cancelled: "
+ (this.store.canceledTasks / this.store.doneTasks) * 100);
this.store.canceledTasks = this.store.doneTasks = 0;
}
} catch (Exception e) {
this.future.setException(e);
} finally {
releaseLocks();
}
}
protected Message getMessage() {
return this.message;
}
private class InnerFutureTask extends FutureTask<Object> {
public InnerFutureTask(Runnable runnable) {
super(runnable, null);
}
public void setException(final Exception e) {
super.setException(e);
}
public void complete() {
super.set(null);
}
}
}
class StoreTopicTask extends StoreQueueTask {
private final int subscriptionCount;
private final List<String> subscriptionKeys = new ArrayList<String>(1);
private final KahaDBTopicMessageStore topicStore;
public StoreTopicTask(KahaDBTopicMessageStore store, ConnectionContext context, Message message,
int subscriptionCount) {
super(store, context, message);
this.topicStore = store;
this.subscriptionCount = subscriptionCount;
}
@Override
void aquireLocks() {
if (this.locked.compareAndSet(false, true)) {
try {
globalTopicSemaphore.acquire();
store.acquireLocalAsyncLock();
message.incrementReferenceCount();
} catch (InterruptedException e) {
LOG.warn("Failed to aquire lock", e);
}
}
}
@Override
void releaseLocks() {
if (this.locked.compareAndSet(true, false)) {
message.decrementReferenceCount();
store.releaseLocalAsyncLock();
globalTopicSemaphore.release();
}
}
/**
* add a key
*
* @param key
* @return true if all acknowledgements received
*/
public boolean addSubscriptionKey(String key) {
synchronized (this.subscriptionKeys) {
this.subscriptionKeys.add(key);
}
return this.subscriptionKeys.size() >= this.subscriptionCount;
}
@Override
public void run() {
this.store.doneTasks++;
try {
if (this.done.compareAndSet(false, true)) {
this.topicStore.addMessage(context, message);
// apply any acks we have
synchronized (this.subscriptionKeys) {
for (String key : this.subscriptionKeys) {
this.topicStore.doAcknowledge(context, key, this.message.getMessageId(), null);
}
}
removeTopicTask(this.topicStore, this.message.getMessageId());
this.future.complete();
} else if (cancelledTaskModMetric > 0 && this.store.canceledTasks++ % cancelledTaskModMetric == 0) {
System.err.println(this.store.dest.getName() + " cancelled: "
+ (this.store.canceledTasks / this.store.doneTasks) * 100);
this.store.canceledTasks = this.store.doneTasks = 0;
}
} catch (Exception e) {
this.future.setException(e);
} finally {
releaseLocks();
}
}
}
}