blob: d4e7f4204211d927dedfbd7d760cd7fd209e5035 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store.journal;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.activeio.journal.RecordLocation;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.JournalQueueAck;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.AbstractMessageStore;
import org.apache.activemq.transaction.Synchronization;
import org.apache.activemq.usage.MemoryUsage;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.util.Callback;
import org.apache.activemq.util.TransactionTemplate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A MessageStore that uses a Journal to store it's messages.
*
*
*/
public class JournalMessageStore extends AbstractMessageStore {
private static final Logger LOG = LoggerFactory.getLogger(JournalMessageStore.class);
protected final JournalPersistenceAdapter peristenceAdapter;
protected final JournalTransactionStore transactionStore;
protected final MessageStore longTermStore;
protected final TransactionTemplate transactionTemplate;
protected RecordLocation lastLocation;
protected Set<RecordLocation> inFlightTxLocations = new HashSet<RecordLocation>();
private Map<MessageId, Message> messages = new LinkedHashMap<MessageId, Message>();
private List<MessageAck> messageAcks = new ArrayList<MessageAck>();
/** A MessageStore that we can use to retrieve messages quickly. */
private Map<MessageId, Message> cpAddedMessageIds;
private MemoryUsage memoryUsage;
public JournalMessageStore(JournalPersistenceAdapter adapter, MessageStore checkpointStore, ActiveMQDestination destination) {
super(destination);
this.peristenceAdapter = adapter;
this.transactionStore = adapter.getTransactionStore();
this.longTermStore = checkpointStore;
this.transactionTemplate = new TransactionTemplate(adapter, new ConnectionContext(new NonCachedMessageEvaluationContext()));
}
public void setMemoryUsage(MemoryUsage memoryUsage) {
this.memoryUsage=memoryUsage;
longTermStore.setMemoryUsage(memoryUsage);
}
/**
* Not synchronized since the Journal has better throughput if you increase
* the number of concurrent writes that it is doing.
*/
public void addMessage(ConnectionContext context, final Message message) throws IOException {
final MessageId id = message.getMessageId();
final boolean debug = LOG.isDebugEnabled();
message.incrementReferenceCount();
final RecordLocation location = peristenceAdapter.writeCommand(message, message.isResponseRequired());
if (!context.isInTransaction()) {
if (debug) {
LOG.debug("Journalled message add for: " + id + ", at: " + location);
}
addMessage(message, location);
} else {
if (debug) {
LOG.debug("Journalled transacted message add for: " + id + ", at: " + location);
}
synchronized (this) {
inFlightTxLocations.add(location);
}
transactionStore.addMessage(this, message, location);
context.getTransaction().addSynchronization(new Synchronization() {
public void afterCommit() throws Exception {
if (debug) {
LOG.debug("Transacted message add commit for: " + id + ", at: " + location);
}
synchronized (JournalMessageStore.this) {
inFlightTxLocations.remove(location);
addMessage(message, location);
}
}
public void afterRollback() throws Exception {
if (debug) {
LOG.debug("Transacted message add rollback for: " + id + ", at: " + location);
}
synchronized (JournalMessageStore.this) {
inFlightTxLocations.remove(location);
}
message.decrementReferenceCount();
}
});
}
}
void addMessage(final Message message, final RecordLocation location) {
synchronized (this) {
lastLocation = location;
MessageId id = message.getMessageId();
messages.put(id, message);
}
}
public void replayAddMessage(ConnectionContext context, Message message) {
try {
// Only add the message if it has not already been added.
Message t = longTermStore.getMessage(message.getMessageId());
if (t == null) {
longTermStore.addMessage(context, message);
}
} catch (Throwable e) {
LOG.warn("Could not replay add for message '" + message.getMessageId() + "'. Message may have already been added. reason: " + e);
}
}
/**
*/
public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
final boolean debug = LOG.isDebugEnabled();
JournalQueueAck remove = new JournalQueueAck();
remove.setDestination(destination);
remove.setMessageAck(ack);
final RecordLocation location = peristenceAdapter.writeCommand(remove, ack.isResponseRequired());
if (!context.isInTransaction()) {
if (debug) {
LOG.debug("Journalled message remove for: " + ack.getLastMessageId() + ", at: " + location);
}
removeMessage(ack, location);
} else {
if (debug) {
LOG.debug("Journalled transacted message remove for: " + ack.getLastMessageId() + ", at: " + location);
}
synchronized (this) {
inFlightTxLocations.add(location);
}
transactionStore.removeMessage(this, ack, location);
context.getTransaction().addSynchronization(new Synchronization() {
public void afterCommit() throws Exception {
if (debug) {
LOG.debug("Transacted message remove commit for: " + ack.getLastMessageId() + ", at: " + location);
}
synchronized (JournalMessageStore.this) {
inFlightTxLocations.remove(location);
removeMessage(ack, location);
}
}
public void afterRollback() throws Exception {
if (debug) {
LOG.debug("Transacted message remove rollback for: " + ack.getLastMessageId() + ", at: " + location);
}
synchronized (JournalMessageStore.this) {
inFlightTxLocations.remove(location);
}
}
});
}
}
final void removeMessage(final MessageAck ack, final RecordLocation location) {
synchronized (this) {
lastLocation = location;
MessageId id = ack.getLastMessageId();
Message message = messages.remove(id);
if (message == null) {
messageAcks.add(ack);
} else {
message.decrementReferenceCount();
}
}
}
public void replayRemoveMessage(ConnectionContext context, MessageAck messageAck) {
try {
// Only remove the message if it has not already been removed.
Message t = longTermStore.getMessage(messageAck.getLastMessageId());
if (t != null) {
longTermStore.removeMessage(context, messageAck);
}
} catch (Throwable e) {
LOG.warn("Could not replay acknowledge for message '" + messageAck.getLastMessageId() + "'. Message may have already been acknowledged. reason: " + e);
}
}
/**
* @return
* @throws IOException
*/
public RecordLocation checkpoint() throws IOException {
return checkpoint(null);
}
/**
* @return
* @throws IOException
*/
@SuppressWarnings("unchecked")
public RecordLocation checkpoint(final Callback postCheckpointTest) throws IOException {
final List<MessageAck> cpRemovedMessageLocations;
final List<RecordLocation> cpActiveJournalLocations;
final int maxCheckpointMessageAddSize = peristenceAdapter.getMaxCheckpointMessageAddSize();
// swap out the message hash maps..
synchronized (this) {
cpAddedMessageIds = this.messages;
cpRemovedMessageLocations = this.messageAcks;
cpActiveJournalLocations = new ArrayList<RecordLocation>(inFlightTxLocations);
this.messages = new LinkedHashMap<MessageId, Message>();
this.messageAcks = new ArrayList<MessageAck>();
}
transactionTemplate.run(new Callback() {
public void execute() throws Exception {
int size = 0;
PersistenceAdapter persitanceAdapter = transactionTemplate.getPersistenceAdapter();
ConnectionContext context = transactionTemplate.getContext();
// Checkpoint the added messages.
synchronized (JournalMessageStore.this) {
Iterator<Message> iterator = cpAddedMessageIds.values().iterator();
while (iterator.hasNext()) {
Message message = iterator.next();
try {
longTermStore.addMessage(context, message);
} catch (Throwable e) {
LOG.warn("Message could not be added to long term store: " + e.getMessage(), e);
}
size += message.getSize();
message.decrementReferenceCount();
// Commit the batch if it's getting too big
if (size >= maxCheckpointMessageAddSize) {
persitanceAdapter.commitTransaction(context);
persitanceAdapter.beginTransaction(context);
size = 0;
}
}
}
persitanceAdapter.commitTransaction(context);
persitanceAdapter.beginTransaction(context);
// Checkpoint the removed messages.
Iterator<MessageAck> iterator = cpRemovedMessageLocations.iterator();
while (iterator.hasNext()) {
try {
MessageAck ack = iterator.next();
longTermStore.removeMessage(transactionTemplate.getContext(), ack);
} catch (Throwable e) {
LOG.debug("Message could not be removed from long term store: " + e.getMessage(), e);
}
}
if (postCheckpointTest != null) {
postCheckpointTest.execute();
}
}
});
synchronized (this) {
cpAddedMessageIds = null;
}
if (cpActiveJournalLocations.size() > 0) {
Collections.sort(cpActiveJournalLocations);
return cpActiveJournalLocations.get(0);
}
synchronized (this) {
return lastLocation;
}
}
/**
*
*/
public Message getMessage(MessageId identity) throws IOException {
Message answer = null;
synchronized (this) {
// Do we have a still have it in the journal?
answer = messages.get(identity);
if (answer == null && cpAddedMessageIds != null) {
answer = cpAddedMessageIds.get(identity);
}
}
if (answer != null) {
return answer;
}
// If all else fails try the long term message store.
return longTermStore.getMessage(identity);
}
/**
* Replays the checkpointStore first as those messages are the oldest ones,
* then messages are replayed from the transaction log and then the cache is
* updated.
*
* @param listener
* @throws Exception
*/
public void recover(final MessageRecoveryListener listener) throws Exception {
peristenceAdapter.checkpoint(true, true);
longTermStore.recover(listener);
}
public void start() throws Exception {
if (this.memoryUsage != null) {
this.memoryUsage.addUsageListener(peristenceAdapter);
}
longTermStore.start();
}
public void stop() throws Exception {
longTermStore.stop();
if (this.memoryUsage != null) {
this.memoryUsage.removeUsageListener(peristenceAdapter);
}
}
/**
* @return Returns the longTermStore.
*/
public MessageStore getLongTermMessageStore() {
return longTermStore;
}
/**
* @see org.apache.activemq.store.MessageStore#removeAllMessages(ConnectionContext)
*/
public void removeAllMessages(ConnectionContext context) throws IOException {
peristenceAdapter.checkpoint(true, true);
longTermStore.removeAllMessages(context);
}
public void addMessageReference(ConnectionContext context, MessageId messageId, long expirationTime, String messageRef) throws IOException {
throw new IOException("The journal does not support message references.");
}
public String getMessageReference(MessageId identity) throws IOException {
throw new IOException("The journal does not support message references.");
}
/**
* @return
* @throws IOException
* @see org.apache.activemq.store.MessageStore#getMessageCount()
*/
public int getMessageCount() throws IOException {
peristenceAdapter.checkpoint(true, true);
return longTermStore.getMessageCount();
}
public void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception {
peristenceAdapter.checkpoint(true, true);
longTermStore.recoverNextMessages(maxReturned, listener);
}
public void resetBatching() {
longTermStore.resetBatching();
}
@Override
public void setBatch(MessageId messageId) throws Exception {
peristenceAdapter.checkpoint(true, true);
longTermStore.setBatch(messageId);
}
}