blob: 7c96f232bb224013788c22ef89f198393baa9238 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store.kahadaptor;
import java.io.File;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.kaha.CommandMarshaller;
import org.apache.activemq.kaha.ContainerId;
import org.apache.activemq.kaha.ListContainer;
import org.apache.activemq.kaha.MapContainer;
import org.apache.activemq.kaha.Marshaller;
import org.apache.activemq.kaha.MessageIdMarshaller;
import org.apache.activemq.kaha.MessageMarshaller;
import org.apache.activemq.kaha.Store;
import org.apache.activemq.kaha.StoreFactory;
import org.apache.activemq.kaha.impl.StoreLockedExcpetion;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.TransactionStore;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.util.IOHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * A {@link PersistenceAdapter} that keeps queue messages, topic messages,
 * topic subscriptions/acks and prepared transactions in containers of a Kaha
 * {@link Store}.
 * <p>
 * The store is opened lazily (see {@link #getStore()}) in the configured
 * directory. When no directory has been set, a
 * <code>&lt;brokerName&gt;-kahastore</code> directory below the default data
 * directory is used.
 *
 * @org.apache.xbean.XBean
 *
 */
public class KahaPersistenceAdapter implements PersistenceAdapter, BrokerServiceAware {

    /** Delay, in milliseconds, between attempts to open a locked store. */
    private static final int STORE_LOCKED_WAIT_DELAY = 10 * 1000;
    private static final Logger LOG = LoggerFactory.getLogger(KahaPersistenceAdapter.class);
    /** Id of the map container that holds prepared transactions. */
    private static final String PREPARED_TRANSACTIONS_NAME = "PreparedTransactions";

    protected OpenWireFormat wireFormat = new OpenWireFormat();
    protected KahaTransactionStore transactionStore;
    /** Topic stores handed out to callers (transaction proxies once a transaction store exists). */
    protected ConcurrentHashMap<ActiveMQTopic, TopicMessageStore> topics = new ConcurrentHashMap<ActiveMQTopic, TopicMessageStore>();
    /** Queue stores handed out to callers (transaction proxies once a transaction store exists). */
    protected ConcurrentHashMap<ActiveMQQueue, MessageStore> queues = new ConcurrentHashMap<ActiveMQQueue, MessageStore>();
    /** The underlying (un-proxied) stores, looked up by {@link #retrieveMessageStore(Object)}. */
    protected ConcurrentHashMap<ActiveMQDestination, MessageStore> messageStores = new ConcurrentHashMap<ActiveMQDestination, MessageStore>();

    private long maxDataFileLength = 32 * 1024 * 1024;
    private File directory;
    private String brokerName;
    private Store theStore;
    private boolean initialized;
    private final AtomicLong storeSize;
    private boolean persistentIndex = true;
    private BrokerService brokerService;

    /**
     * Creates an adapter that shares the given size counter with the store.
     *
     * @param size counter passed to the store when it is opened and reported
     *            by {@link #size()}
     */
    public KahaPersistenceAdapter(AtomicLong size) {
        this.storeSize = size;
    }

    /**
     * Creates an adapter with its own, initially zero, size counter.
     */
    public KahaPersistenceAdapter() {
        this(new AtomicLong());
    }

    /**
     * Collects every destination that has a map container in the store.
     * <p>
     * An {@link IOException} while reading the store is logged and whatever
     * was collected up to that point (possibly nothing) is returned.
     *
     * @return a new, mutable set of the destinations found
     */
    public Set<ActiveMQDestination> getDestinations() {
        Set<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>();
        try {
            Store store = getStore();
            for (Object element : store.getMapContainerIds()) {
                ContainerId id = (ContainerId)element;
                Object key = id.getKey();
                // Container keys are not all destinations; skip the others.
                if (key instanceof ActiveMQDestination) {
                    rc.add((ActiveMQDestination)key);
                }
            }
        } catch (IOException e) {
            LOG.error("Failed to get destinations ", e);
        }
        return rc;
    }

    /**
     * Returns the message store for the given queue, creating and caching it
     * on first use.
     *
     * @param destination the queue
     * @return the cached store; wrapped by the transaction store's proxy if a
     *         transaction store existed when it was created
     * @throws IOException if the backing container cannot be obtained
     */
    public synchronized MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
        MessageStore rc = queues.get(destination);
        if (rc == null) {
            rc = new KahaMessageStore(getMapContainer(destination, "queue-data"), destination);
            // Register the raw store before it is (possibly) wrapped below.
            messageStores.put(destination, rc);
            if (transactionStore != null) {
                rc = transactionStore.proxy(rc);
            }
            queues.put(destination, rc);
        }
        return rc;
    }

    /**
     * Returns the message store for the given topic, creating and caching it
     * on first use.
     *
     * @param destination the topic
     * @return the cached store; wrapped by the transaction store's proxy if a
     *         transaction store existed when it was created
     * @throws IOException if a backing container cannot be obtained
     */
    public synchronized TopicMessageStore createTopicMessageStore(ActiveMQTopic destination)
        throws IOException {
        TopicMessageStore rc = topics.get(destination);
        if (rc == null) {
            Store store = getStore();
            MapContainer messageContainer = getMapContainer(destination, "topic-data");
            MapContainer subsContainer = getSubsMapContainer(destination.toString() + "-Subscriptions",
                                                             "topic-subs");
            ListContainer<TopicSubAck> ackContainer = store.getListContainer(destination.toString(),
                                                                             "topic-acks");
            ackContainer.setMarshaller(new TopicSubAckMarshaller());
            rc = new KahaTopicMessageStore(store, messageContainer, ackContainer, subsContainer, destination);
            // Register the raw store before it is (possibly) wrapped below.
            messageStores.put(destination, rc);
            if (transactionStore != null) {
                rc = transactionStore.proxy(rc);
            }
            topics.put(destination, rc);
        }
        return rc;
    }

    /**
     * Cleanup method to remove any state associated with the given destination.
     * Drops the cached queue store and, when the store has been opened,
     * deletes the queue's "queue-data" map container. A failure to delete the
     * container is logged, not propagated.
     *
     * @param destination Destination to forget
     */
    public void removeQueueMessageStore(ActiveMQQueue destination) {
        // NOTE(review): the entry in messageStores is left in place - confirm
        // whether that is intentional.
        queues.remove(destination);
        try {
            if (theStore != null) {
                theStore.deleteMapContainer(destination, "queue-data");
            }
        } catch (IOException e) {
            LOG.error("Failed to remove store map container for queue:" + destination, e);
        }
    }

    /**
     * Cleanup method to remove any state associated with the given destination.
     * Only the cached topic store is dropped.
     *
     * @param destination Destination to forget
     */
    public void removeTopicMessageStore(ActiveMQTopic destination) {
        // NOTE(review): unlike the queue variant, no store containers are
        // deleted here - confirm whether that is intentional.
        topics.remove(destination);
    }

    /**
     * Looks up the underlying (un-proxied) message store registered for an id.
     *
     * @param id the key the store was registered under (a destination)
     * @return the store, or <code>null</code> if none is registered
     */
    protected MessageStore retrieveMessageStore(Object id) {
        return messageStores.get(id);
    }

    /**
     * Returns the transaction store, creating it on first use.
     * <p>
     * While the Kaha store is locked, creation is retried every
     * {@link #STORE_LOCKED_WAIT_DELAY} milliseconds.
     *
     * @return the transaction store
     * @throws IOException if the store or the transaction container cannot be
     *             opened; an {@link InterruptedIOException} if the calling
     *             thread is interrupted while waiting for a locked store
     */
    public TransactionStore createTransactionStore() throws IOException {
        if (transactionStore == null) {
            while (true) {
                try {
                    Store store = getStore();
                    MapContainer container = store
                        .getMapContainer(PREPARED_TRANSACTIONS_NAME, "transactions");
                    container.setKeyMarshaller(new CommandMarshaller(wireFormat));
                    container.setValueMarshaller(new TransactionMarshaller(wireFormat));
                    container.load();
                    transactionStore = new KahaTransactionStore(this, container);
                    transactionStore.setBrokerService(brokerService);
                    break;
                } catch (StoreLockedExcpetion e) {
                    LOG.info("Store is locked... waiting {} seconds for the Store to be unlocked.",
                             STORE_LOCKED_WAIT_DELAY / 1000);
                    try {
                        Thread.sleep(STORE_LOCKED_WAIT_DELAY);
                    } catch (InterruptedException e1) {
                        // Keep the interrupt visible to callers and stop
                        // retrying instead of silently looping on.
                        Thread.currentThread().interrupt();
                        InterruptedIOException interrupted = new InterruptedIOException(
                            "Interrupted while waiting for the Store to be unlocked");
                        interrupted.initCause(e1);
                        throw interrupted;
                    }
                }
            }
        }
        return transactionStore;
    }

    /**
     * Does nothing.
     *
     * @param context ignored
     */
    public void beginTransaction(ConnectionContext context) {
    }

    /**
     * Forces the store, if it has been opened.
     *
     * @param context ignored
     * @throws IOException if forcing the store fails
     */
    public void commitTransaction(ConnectionContext context) throws IOException {
        if (theStore != null) {
            theStore.force();
        }
    }

    /**
     * Does nothing.
     *
     * @param context ignored
     */
    public void rollbackTransaction(ConnectionContext context) {
    }

    /**
     * Resolves and creates the store directory and configures the wire
     * format. The store itself is still opened lazily.
     *
     * @throws Exception if initialization fails
     */
    public void start() throws Exception {
        initialize();
    }

    /**
     * Closes the store, if it has been opened.
     *
     * @throws Exception if closing the store fails
     */
    public void stop() throws Exception {
        if (theStore != null) {
            theStore.close();
        }
    }

    /**
     * @return always 0
     * @throws IOException never thrown by this implementation
     */
    public long getLastMessageBrokerSequenceId() throws IOException {
        return 0;
    }

    /**
     * Removes all stored data: an opened store is cleared if it is
     * initialized and deleted otherwise; when no store has been opened the
     * store directory is deleted through {@link StoreFactory}.
     *
     * @throws IOException if the data cannot be removed
     */
    public void deleteAllMessages() throws IOException {
        if (theStore == null) {
            StoreFactory.delete(getStoreDirectory());
            return;
        }
        if (theStore.isInitialized()) {
            theStore.clear();
        } else {
            theStore.delete();
        }
    }

    /**
     * Obtains and loads a message map container keyed by {@link MessageId}.
     *
     * @param id container id
     * @param containerName name of the container group
     * @return the loaded container
     * @throws IOException if the store or container cannot be opened
     */
    protected MapContainer<MessageId, Message> getMapContainer(Object id, String containerName)
        throws IOException {
        Store store = getStore();
        MapContainer<MessageId, Message> container = store.getMapContainer(id, containerName);
        container.setKeyMarshaller(new MessageIdMarshaller());
        container.setValueMarshaller(new MessageMarshaller(wireFormat));
        container.load();
        return container;
    }

    /**
     * Obtains and loads a map container with string keys and values
     * marshalled by {@link #createMessageMarshaller()}.
     *
     * @param id container id
     * @param containerName name of the container group
     * @return the loaded container
     * @throws IOException if the store or container cannot be opened
     */
    protected MapContainer getSubsMapContainer(Object id, String containerName)
        throws IOException {
        Store store = getStore();
        MapContainer container = store.getMapContainer(id, containerName);
        container.setKeyMarshaller(Store.STRING_MARSHALLER);
        container.setValueMarshaller(createMessageMarshaller());
        container.load();
        return container;
    }

    /**
     * @return a new {@link CommandMarshaller} using this adapter's wire format
     */
    protected Marshaller<Object> createMessageMarshaller() {
        return new CommandMarshaller(wireFormat);
    }

    /**
     * Obtains and loads a list container whose entries are marshalled by
     * {@link #createMessageMarshaller()}.
     *
     * @param id container id
     * @param containerName name of the container group
     * @return the loaded container
     * @throws IOException if the store or container cannot be opened
     */
    protected ListContainer<TopicSubAck> getListContainer(Object id, String containerName) throws IOException {
        Store store = getStore();
        ListContainer<TopicSubAck> container = store.getListContainer(id, containerName);
        container.setMarshaller(createMessageMarshaller());
        container.load();
        return container;
    }

    /**
     * Ignored by this adapter.
     *
     * @param usageManager The UsageManager that is controlling the broker's
     *                memory usage.
     */
    public void setUsageManager(SystemUsage usageManager) {
    }

    /**
     * @return the maxDataFileLength
     */
    public long getMaxDataFileLength() {
        return maxDataFileLength;
    }

    /**
     * @return the persistent-index setting applied to the store when it is
     *         created
     */
    public boolean isPersistentIndex() {
        return persistentIndex;
    }

    /**
     * @param persistentIndex the persistent-index setting to apply to the
     *            store when it is created
     */
    public void setPersistentIndex(boolean persistentIndex) {
        this.persistentIndex = persistentIndex;
    }

    /**
     * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
     */
    public void setMaxDataFileLength(long maxDataFileLength) {
        this.maxDataFileLength = maxDataFileLength;
    }

    /**
     * Returns the store, creating it on first use.
     *
     * @return the store
     * @throws IOException if the store cannot be opened
     */
    protected final synchronized Store getStore() throws IOException {
        if (theStore == null) {
            theStore = createStore();
        }
        return theStore;
    }

    /**
     * Opens a store in read/write mode in the store directory and applies
     * this adapter's settings to it.
     *
     * @return the newly opened store
     * @throws IOException if the store cannot be opened
     */
    protected final Store createStore() throws IOException {
        Store result = StoreFactory.open(getStoreDirectory(), "rw", storeSize);
        result.setMaxDataFileLength(maxDataFileLength);
        result.setPersistentIndex(isPersistentIndex());
        result.setDefaultContainerName("container-roots");
        return result;
    }

    /** @return absolute path of the store directory, initializing it if needed */
    private String getStoreName() {
        initialize();
        return directory.getAbsolutePath();
    }

    /** @return the store directory, initializing it if needed */
    private File getStoreDirectory() {
        initialize();
        return directory;
    }

    /**
     * @return the adapter name followed by the absolute store path; resolving
     *         the path triggers initialization if it has not happened yet
     */
    @Override
    public String toString() {
        return "KahaPersistenceAdapter(" + getStoreName() + ")";
    }

    /**
     * @param brokerName broker name used to derive the default store
     *            directory when none is set explicitly
     */
    public void setBrokerName(String brokerName) {
        this.brokerName = brokerName;
    }

    /**
     * @return the broker name
     */
    public String getBrokerName() {
        return brokerName;
    }

    /**
     * @return the configured store directory; <code>null</code> until one is
     *         set or the adapter is initialized
     */
    public File getDirectory() {
        return this.directory;
    }

    /**
     * @param directory the directory in which the store is kept
     */
    public void setDirectory(File directory) {
        this.directory = directory;
    }

    /**
     * Forces the store when a synchronous checkpoint is requested; otherwise
     * does nothing.
     *
     * @param sync whether to force the store
     * @throws IOException if the store cannot be opened or forced
     */
    public void checkpoint(boolean sync) throws IOException {
        if (sync) {
            getStore().force();
        }
    }

    /**
     * @return the current value of the size counter shared with the store
     */
    public long size() {
        return storeSize.get();
    }

    /**
     * One-time setup: picks the default directory when none was configured,
     * creates the directory and configures the wire format.
     *
     * @throws RuntimeException wrapping the {@link IOException} if the
     *             directory cannot be created
     */
    private void initialize() {
        if (initialized) {
            return;
        }
        // Set first, so a failure below is not retried on later calls.
        initialized = true;
        if (this.directory == null) {
            File dataDir = new File(IOHelper.getDefaultDataDirectory());
            setDirectory(new File(dataDir, IOHelper.toFileSystemSafeName(brokerName) + "-kahastore"));
        }
        try {
            IOHelper.mkdirs(this.directory);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
        wireFormat.setCacheEnabled(false);
        wireFormat.setTightEncodingEnabled(true);
    }

    /**
     * @param brokerService broker service handed to the transaction store
     *            when that store is created
     */
    public void setBrokerService(BrokerService brokerService) {
        this.brokerService = brokerService;
    }

    /**
     * @param id ignored
     * @return always -1
     */
    public long getLastProducerSequenceId(ProducerId id) {
        // reference store send has adequate duplicate suppression
        return -1;
    }
}