blob: e6579f73f091fae1640c0235a18ca355a0122bf8 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store.journal;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activeio.journal.InvalidRecordLocationException;
import org.apache.activeio.journal.Journal;
import org.apache.activeio.journal.JournalEventListener;
import org.apache.activeio.journal.RecordLocation;
import org.apache.activeio.packet.ByteArrayPacket;
import org.apache.activeio.packet.Packet;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.DataStructure;
import org.apache.activemq.command.JournalQueueAck;
import org.apache.activemq.command.JournalTopicAck;
import org.apache.activemq.command.JournalTrace;
import org.apache.activemq.command.JournalTransaction;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.TransactionStore;
import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
import org.apache.activemq.store.journal.JournalTransactionStore.Tx;
import org.apache.activemq.store.journal.JournalTransactionStore.TxOperation;
import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.thread.Task;
import org.apache.activemq.thread.TaskRunner;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.usage.Usage;
import org.apache.activemq.usage.UsageListener;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.wireformat.WireFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* An implementation of {@link PersistenceAdapter} designed for use with a
* {@link Journal} and then check pointing asynchronously on a timeout with some
* other long term persistent storage.
*
* @org.apache.xbean.XBean
*
*/
public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEventListener, UsageListener, BrokerServiceAware {
private BrokerService brokerService;
protected Scheduler scheduler;
private static final Logger LOG = LoggerFactory.getLogger(JournalPersistenceAdapter.class);
private Journal journal;
private PersistenceAdapter longTermPersistence;
private final WireFormat wireFormat = new OpenWireFormat();
private final ConcurrentHashMap<ActiveMQQueue, JournalMessageStore> queues = new ConcurrentHashMap<ActiveMQQueue, JournalMessageStore>();
private final ConcurrentHashMap<ActiveMQTopic, JournalTopicMessageStore> topics = new ConcurrentHashMap<ActiveMQTopic, JournalTopicMessageStore>();
private SystemUsage usageManager;
private final long checkpointInterval = 1000 * 60 * 5;
private long lastCheckpointRequest = System.currentTimeMillis();
private long lastCleanup = System.currentTimeMillis();
private int maxCheckpointWorkers = 10;
private int maxCheckpointMessageAddSize = 1024 * 1024;
private final JournalTransactionStore transactionStore = new JournalTransactionStore(this);
private ThreadPoolExecutor checkpointExecutor;
private TaskRunner checkpointTask;
private CountDownLatch nextCheckpointCountDownLatch = new CountDownLatch(1);
private boolean fullCheckPoint;
private final AtomicBoolean started = new AtomicBoolean(false);
private final Runnable periodicCheckpointTask = createPeriodicCheckpointTask();
private TaskRunnerFactory taskRunnerFactory;
public JournalPersistenceAdapter() {
}
public JournalPersistenceAdapter(Journal journal, PersistenceAdapter longTermPersistence, TaskRunnerFactory taskRunnerFactory) throws IOException {
setJournal(journal);
setTaskRunnerFactory(taskRunnerFactory);
setPersistenceAdapter(longTermPersistence);
}
public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) {
this.taskRunnerFactory = taskRunnerFactory;
}
public void setJournal(Journal journal) {
this.journal = journal;
journal.setJournalEventListener(this);
}
public void setPersistenceAdapter(PersistenceAdapter longTermPersistence) {
this.longTermPersistence = longTermPersistence;
}
final Runnable createPeriodicCheckpointTask() {
return new Runnable() {
public void run() {
long lastTime = 0;
synchronized (this) {
lastTime = lastCheckpointRequest;
}
if (System.currentTimeMillis() > lastTime + checkpointInterval) {
checkpoint(false, true);
}
}
};
}
/**
* @param usageManager The UsageManager that is controlling the
* destination's memory usage.
*/
public void setUsageManager(SystemUsage usageManager) {
this.usageManager = usageManager;
longTermPersistence.setUsageManager(usageManager);
}
public Set<ActiveMQDestination> getDestinations() {
Set<ActiveMQDestination> destinations = new HashSet<ActiveMQDestination>(longTermPersistence.getDestinations());
destinations.addAll(queues.keySet());
destinations.addAll(topics.keySet());
return destinations;
}
private MessageStore createMessageStore(ActiveMQDestination destination) throws IOException {
if (destination.isQueue()) {
return createQueueMessageStore((ActiveMQQueue)destination);
} else {
return createTopicMessageStore((ActiveMQTopic)destination);
}
}
public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
JournalMessageStore store = queues.get(destination);
if (store == null) {
MessageStore checkpointStore = longTermPersistence.createQueueMessageStore(destination);
store = new JournalMessageStore(this, checkpointStore, destination);
queues.put(destination, store);
}
return store;
}
public TopicMessageStore createTopicMessageStore(ActiveMQTopic destinationName) throws IOException {
JournalTopicMessageStore store = topics.get(destinationName);
if (store == null) {
TopicMessageStore checkpointStore = longTermPersistence.createTopicMessageStore(destinationName);
store = new JournalTopicMessageStore(this, checkpointStore, destinationName);
topics.put(destinationName, store);
}
return store;
}
/**
* Cleanup method to remove any state associated with the given destination
*
* @param destination Destination to forget
*/
public void removeQueueMessageStore(ActiveMQQueue destination) {
queues.remove(destination);
}
/**
* Cleanup method to remove any state associated with the given destination
*
* @param destination Destination to forget
*/
public void removeTopicMessageStore(ActiveMQTopic destination) {
topics.remove(destination);
}
public TransactionStore createTransactionStore() throws IOException {
return transactionStore;
}
public long getLastMessageBrokerSequenceId() throws IOException {
return longTermPersistence.getLastMessageBrokerSequenceId();
}
public void beginTransaction(ConnectionContext context) throws IOException {
longTermPersistence.beginTransaction(context);
}
public void commitTransaction(ConnectionContext context) throws IOException {
longTermPersistence.commitTransaction(context);
}
public void rollbackTransaction(ConnectionContext context) throws IOException {
longTermPersistence.rollbackTransaction(context);
}
public synchronized void start() throws Exception {
if (!started.compareAndSet(false, true)) {
return;
}
checkpointTask = taskRunnerFactory.createTaskRunner(new Task() {
public boolean iterate() {
return doCheckpoint();
}
}, "ActiveMQ Journal Checkpoint Worker");
checkpointExecutor = new ThreadPoolExecutor(maxCheckpointWorkers, maxCheckpointWorkers, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
public Thread newThread(Runnable runable) {
Thread t = new Thread(runable, "Journal checkpoint worker");
t.setPriority(7);
return t;
}
});
// checkpointExecutor.allowCoreThreadTimeOut(true);
this.usageManager.getMemoryUsage().addUsageListener(this);
if (longTermPersistence instanceof JDBCPersistenceAdapter) {
// Disabled periodic clean up as it deadlocks with the checkpoint
// operations.
((JDBCPersistenceAdapter)longTermPersistence).setCleanupPeriod(0);
}
longTermPersistence.start();
createTransactionStore();
recover();
// Do a checkpoint periodically.
this.scheduler = new Scheduler("Journal Scheduler");
this.scheduler.start();
this.scheduler.executePeriodically(periodicCheckpointTask, checkpointInterval / 10);
}
public void stop() throws Exception {
this.usageManager.getMemoryUsage().removeUsageListener(this);
if (!started.compareAndSet(true, false)) {
return;
}
this.scheduler.cancel(periodicCheckpointTask);
this.scheduler.stop();
// Take one final checkpoint and stop checkpoint processing.
checkpoint(true, true);
checkpointTask.shutdown();
checkpointExecutor.shutdown();
queues.clear();
topics.clear();
IOException firstException = null;
try {
journal.close();
} catch (Exception e) {
firstException = IOExceptionSupport.create("Failed to close journals: " + e, e);
}
longTermPersistence.stop();
if (firstException != null) {
throw firstException;
}
}
// Properties
// -------------------------------------------------------------------------
public PersistenceAdapter getLongTermPersistence() {
return longTermPersistence;
}
/**
* @return Returns the wireFormat.
*/
public WireFormat getWireFormat() {
return wireFormat;
}
// Implementation methods
// -------------------------------------------------------------------------
/**
* The Journal give us a call back so that we can move old data out of the
* journal. Taking a checkpoint does this for us.
*
* @see org.apache.activemq.journal.JournalEventListener#overflowNotification(org.apache.activemq.journal.RecordLocation)
*/
public void overflowNotification(RecordLocation safeLocation) {
checkpoint(false, true);
}
/**
* When we checkpoint we move all the journalled data to long term storage.
*
*/
public void checkpoint(boolean sync, boolean fullCheckpoint) {
try {
if (journal == null) {
throw new IllegalStateException("Journal is closed.");
}
long now = System.currentTimeMillis();
CountDownLatch latch = null;
synchronized (this) {
latch = nextCheckpointCountDownLatch;
lastCheckpointRequest = now;
if (fullCheckpoint) {
this.fullCheckPoint = true;
}
}
checkpointTask.wakeup();
if (sync) {
LOG.debug("Waking for checkpoint to complete.");
latch.await();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOG.warn("Request to start checkpoint failed: " + e, e);
}
}
public void checkpoint(boolean sync) {
checkpoint(sync, sync);
}
/**
* This does the actual checkpoint.
*
* @return
*/
public boolean doCheckpoint() {
CountDownLatch latch = null;
boolean fullCheckpoint;
synchronized (this) {
latch = nextCheckpointCountDownLatch;
nextCheckpointCountDownLatch = new CountDownLatch(1);
fullCheckpoint = this.fullCheckPoint;
this.fullCheckPoint = false;
}
try {
LOG.debug("Checkpoint started.");
RecordLocation newMark = null;
ArrayList<FutureTask<RecordLocation>> futureTasks = new ArrayList<FutureTask<RecordLocation>>(queues.size() + topics.size());
//
// We do many partial checkpoints (fullCheckpoint==false) to move
// topic messages
// to long term store as soon as possible.
//
// We want to avoid doing that for queue messages since removes the
// come in the same
// checkpoint cycle will nullify the previous message add.
// Therefore, we only
// checkpoint queues on the fullCheckpoint cycles.
//
if (fullCheckpoint) {
Iterator<JournalMessageStore> iterator = queues.values().iterator();
while (iterator.hasNext()) {
try {
final JournalMessageStore ms = iterator.next();
FutureTask<RecordLocation> task = new FutureTask<RecordLocation>(new Callable<RecordLocation>() {
public RecordLocation call() throws Exception {
return ms.checkpoint();
}
});
futureTasks.add(task);
checkpointExecutor.execute(task);
} catch (Exception e) {
LOG.error("Failed to checkpoint a message store: " + e, e);
}
}
}
Iterator<JournalTopicMessageStore> iterator = topics.values().iterator();
while (iterator.hasNext()) {
try {
final JournalTopicMessageStore ms = iterator.next();
FutureTask<RecordLocation> task = new FutureTask<RecordLocation>(new Callable<RecordLocation>() {
public RecordLocation call() throws Exception {
return ms.checkpoint();
}
});
futureTasks.add(task);
checkpointExecutor.execute(task);
} catch (Exception e) {
LOG.error("Failed to checkpoint a message store: " + e, e);
}
}
try {
for (Iterator<FutureTask<RecordLocation>> iter = futureTasks.iterator(); iter.hasNext();) {
FutureTask<RecordLocation> ft = iter.next();
RecordLocation mark = ft.get();
// We only set a newMark on full checkpoints.
if (fullCheckpoint) {
if (mark != null && (newMark == null || newMark.compareTo(mark) < 0)) {
newMark = mark;
}
}
}
} catch (Throwable e) {
LOG.error("Failed to checkpoint a message store: " + e, e);
}
if (fullCheckpoint) {
try {
if (newMark != null) {
LOG.debug("Marking journal at: " + newMark);
journal.setMark(newMark, true);
}
} catch (Exception e) {
LOG.error("Failed to mark the Journal: " + e, e);
}
if (longTermPersistence instanceof JDBCPersistenceAdapter) {
// We may be check pointing more often than the
// checkpointInterval if under high use
// But we don't want to clean up the db that often.
long now = System.currentTimeMillis();
if (now > lastCleanup + checkpointInterval) {
lastCleanup = now;
((JDBCPersistenceAdapter)longTermPersistence).cleanup();
}
}
}
LOG.debug("Checkpoint done.");
} finally {
latch.countDown();
}
synchronized (this) {
return this.fullCheckPoint;
}
}
/**
* @param location
* @return
* @throws IOException
*/
public DataStructure readCommand(RecordLocation location) throws IOException {
try {
Packet packet = journal.read(location);
return (DataStructure)wireFormat.unmarshal(toByteSequence(packet));
} catch (InvalidRecordLocationException e) {
throw createReadException(location, e);
} catch (IOException e) {
throw createReadException(location, e);
}
}
/**
* Move all the messages that were in the journal into long term storage. We
* just replay and do a checkpoint.
*
* @throws IOException
* @throws IOException
* @throws InvalidRecordLocationException
* @throws IllegalStateException
*/
private void recover() throws IllegalStateException, InvalidRecordLocationException, IOException, IOException {
RecordLocation pos = null;
int transactionCounter = 0;
LOG.info("Journal Recovery Started from: " + journal);
ConnectionContext context = new ConnectionContext(new NonCachedMessageEvaluationContext());
// While we have records in the journal.
while ((pos = journal.getNextRecordLocation(pos)) != null) {
Packet data = journal.read(pos);
DataStructure c = (DataStructure)wireFormat.unmarshal(toByteSequence(data));
if (c instanceof Message) {
Message message = (Message)c;
JournalMessageStore store = (JournalMessageStore)createMessageStore(message.getDestination());
if (message.isInTransaction()) {
transactionStore.addMessage(store, message, pos);
} else {
store.replayAddMessage(context, message);
transactionCounter++;
}
} else {
switch (c.getDataStructureType()) {
case JournalQueueAck.DATA_STRUCTURE_TYPE: {
JournalQueueAck command = (JournalQueueAck)c;
JournalMessageStore store = (JournalMessageStore)createMessageStore(command.getDestination());
if (command.getMessageAck().isInTransaction()) {
transactionStore.removeMessage(store, command.getMessageAck(), pos);
} else {
store.replayRemoveMessage(context, command.getMessageAck());
transactionCounter++;
}
}
break;
case JournalTopicAck.DATA_STRUCTURE_TYPE: {
JournalTopicAck command = (JournalTopicAck)c;
JournalTopicMessageStore store = (JournalTopicMessageStore)createMessageStore(command.getDestination());
if (command.getTransactionId() != null) {
transactionStore.acknowledge(store, command, pos);
} else {
store.replayAcknowledge(context, command.getClientId(), command.getSubscritionName(), command.getMessageId());
transactionCounter++;
}
}
break;
case JournalTransaction.DATA_STRUCTURE_TYPE: {
JournalTransaction command = (JournalTransaction)c;
try {
// Try to replay the packet.
switch (command.getType()) {
case JournalTransaction.XA_PREPARE:
transactionStore.replayPrepare(command.getTransactionId());
break;
case JournalTransaction.XA_COMMIT:
case JournalTransaction.LOCAL_COMMIT:
Tx tx = transactionStore.replayCommit(command.getTransactionId(), command.getWasPrepared());
if (tx == null) {
break; // We may be trying to replay a commit
}
// that
// was already committed.
// Replay the committed operations.
tx.getOperations();
for (Iterator iter = tx.getOperations().iterator(); iter.hasNext();) {
TxOperation op = (TxOperation)iter.next();
if (op.operationType == TxOperation.ADD_OPERATION_TYPE) {
op.store.replayAddMessage(context, (Message)op.data);
}
if (op.operationType == TxOperation.REMOVE_OPERATION_TYPE) {
op.store.replayRemoveMessage(context, (MessageAck)op.data);
}
if (op.operationType == TxOperation.ACK_OPERATION_TYPE) {
JournalTopicAck ack = (JournalTopicAck)op.data;
((JournalTopicMessageStore)op.store).replayAcknowledge(context, ack.getClientId(), ack.getSubscritionName(), ack.getMessageId());
}
}
transactionCounter++;
break;
case JournalTransaction.LOCAL_ROLLBACK:
case JournalTransaction.XA_ROLLBACK:
transactionStore.replayRollback(command.getTransactionId());
break;
default:
throw new IOException("Invalid journal command type: " + command.getType());
}
} catch (IOException e) {
LOG.error("Recovery Failure: Could not replay: " + c + ", reason: " + e, e);
}
}
break;
case JournalTrace.DATA_STRUCTURE_TYPE:
JournalTrace trace = (JournalTrace)c;
LOG.debug("TRACE Entry: " + trace.getMessage());
break;
default:
LOG.error("Unknown type of record in transaction log which will be discarded: " + c);
}
}
}
RecordLocation location = writeTraceMessage("RECOVERED", true);
journal.setMark(location, true);
LOG.info("Journal Recovered: " + transactionCounter + " message(s) in transactions recovered.");
}
private IOException createReadException(RecordLocation location, Exception e) {
return IOExceptionSupport.create("Failed to read to journal for: " + location + ". Reason: " + e, e);
}
protected IOException createWriteException(DataStructure packet, Exception e) {
return IOExceptionSupport.create("Failed to write to journal for: " + packet + ". Reason: " + e, e);
}
protected IOException createWriteException(String command, Exception e) {
return IOExceptionSupport.create("Failed to write to journal for command: " + command + ". Reason: " + e, e);
}
protected IOException createRecoveryFailedException(Exception e) {
return IOExceptionSupport.create("Failed to recover from journal. Reason: " + e, e);
}
/**
* @param command
* @param sync
* @return
* @throws IOException
*/
public RecordLocation writeCommand(DataStructure command, boolean sync) throws IOException {
if (started.get()) {
try {
return journal.write(toPacket(wireFormat.marshal(command)), sync);
} catch (IOException ioe) {
LOG.error("Cannot write to the journal", ioe);
brokerService.handleIOException(ioe);
throw ioe;
}
}
throw new IOException("closed");
}
private RecordLocation writeTraceMessage(String message, boolean sync) throws IOException {
JournalTrace trace = new JournalTrace();
trace.setMessage(message);
return writeCommand(trace, sync);
}
public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) {
newPercentUsage = (newPercentUsage / 10) * 10;
oldPercentUsage = (oldPercentUsage / 10) * 10;
if (newPercentUsage >= 70 && oldPercentUsage < newPercentUsage) {
boolean sync = newPercentUsage >= 90;
checkpoint(sync, true);
}
}
public JournalTransactionStore getTransactionStore() {
return transactionStore;
}
public void deleteAllMessages() throws IOException {
try {
JournalTrace trace = new JournalTrace();
trace.setMessage("DELETED");
RecordLocation location = journal.write(toPacket(wireFormat.marshal(trace)), false);
journal.setMark(location, true);
LOG.info("Journal deleted: ");
} catch (IOException e) {
throw e;
} catch (Throwable e) {
throw IOExceptionSupport.create(e);
}
longTermPersistence.deleteAllMessages();
}
public SystemUsage getUsageManager() {
return usageManager;
}
public int getMaxCheckpointMessageAddSize() {
return maxCheckpointMessageAddSize;
}
public void setMaxCheckpointMessageAddSize(int maxCheckpointMessageAddSize) {
this.maxCheckpointMessageAddSize = maxCheckpointMessageAddSize;
}
public int getMaxCheckpointWorkers() {
return maxCheckpointWorkers;
}
public void setMaxCheckpointWorkers(int maxCheckpointWorkers) {
this.maxCheckpointWorkers = maxCheckpointWorkers;
}
public boolean isUseExternalMessageReferences() {
return false;
}
public void setUseExternalMessageReferences(boolean enable) {
if (enable) {
throw new IllegalArgumentException("The journal does not support message references.");
}
}
public Packet toPacket(ByteSequence sequence) {
return new ByteArrayPacket(new org.apache.activeio.packet.ByteSequence(sequence.data, sequence.offset, sequence.length));
}
public ByteSequence toByteSequence(Packet packet) {
org.apache.activeio.packet.ByteSequence sequence = packet.asByteSequence();
return new ByteSequence(sequence.getData(), sequence.getOffset(), sequence.getLength());
}
public void setBrokerName(String brokerName) {
longTermPersistence.setBrokerName(brokerName);
}
@Override
public String toString() {
return "JournalPersistenceAdapator(" + longTermPersistence + ")";
}
public void setDirectory(File dir) {
}
public long size(){
return 0;
}
public void setBrokerService(BrokerService brokerService) {
this.brokerService = brokerService;
PersistenceAdapter pa = getLongTermPersistence();
if( pa instanceof BrokerServiceAware ) {
((BrokerServiceAware)pa).setBrokerService(brokerService);
}
}
public long getLastProducerSequenceId(ProducerId id) {
return -1;
}
}