blob: 243cd89347d603f0fd3363c4634bf9db38dae57f [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.kaha.impl;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.channels.FileLock;
import java.util.Date;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.kaha.ContainerId;
import org.apache.activemq.kaha.ListContainer;
import org.apache.activemq.kaha.MapContainer;
import org.apache.activemq.kaha.Store;
import org.apache.activemq.kaha.StoreLocation;
import org.apache.activemq.kaha.impl.async.AsyncDataManager;
import org.apache.activemq.kaha.impl.async.DataManagerFacade;
import org.apache.activemq.kaha.impl.container.ListContainerImpl;
import org.apache.activemq.kaha.impl.container.MapContainerImpl;
import org.apache.activemq.kaha.impl.data.DataManagerImpl;
import org.apache.activemq.kaha.impl.data.Item;
import org.apache.activemq.kaha.impl.data.RedoListener;
import org.apache.activemq.kaha.impl.index.IndexItem;
import org.apache.activemq.kaha.impl.index.IndexManager;
import org.apache.activemq.kaha.impl.index.RedoStoreIndexItem;
import org.apache.activemq.util.IOHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Store Implementation
*
*
*/
public class KahaStore implements Store {
private static final String PROPERTY_PREFIX = "org.apache.activemq.kaha.Store";
private static final boolean BROKEN_FILE_LOCK = "true".equals(System.getProperty(PROPERTY_PREFIX
+ ".FileLockBroken",
"false"));
private static final boolean DISABLE_LOCKING = "true".equals(System.getProperty(PROPERTY_PREFIX
+ ".DisableLocking",
"false"));
//according to the String javadoc, all constant strings are interned so this will be the same object throughout the vm
//and we can use it as a monitor for the lockset.
private final static String LOCKSET_MONITOR = PROPERTY_PREFIX + ".Lock.Monitor";
private static final Logger LOG = LoggerFactory.getLogger(KahaStore.class);
private final File directory;
private final String mode;
private IndexRootContainer mapsContainer;
private IndexRootContainer listsContainer;
private final Map<ContainerId, ListContainerImpl> lists = new ConcurrentHashMap<ContainerId, ListContainerImpl>();
private final Map<ContainerId, MapContainerImpl> maps = new ConcurrentHashMap<ContainerId, MapContainerImpl>();
private final Map<String, DataManager> dataManagers = new ConcurrentHashMap<String, DataManager>();
private final Map<String, IndexManager> indexManagers = new ConcurrentHashMap<String, IndexManager>();
private boolean closed;
private boolean initialized;
private boolean logIndexChanges;
private boolean useAsyncDataManager;
private long maxDataFileLength = 1024 * 1024 * 32;
private FileLock lock;
private boolean persistentIndex = true;
private RandomAccessFile lockFile;
private final AtomicLong storeSize;
private String defaultContainerName = DEFAULT_CONTAINER_NAME;
public KahaStore(String name, String mode) throws IOException {
this(new File(IOHelper.toFileSystemDirectorySafeName(name)), mode, new AtomicLong());
}
public KahaStore(File directory, String mode) throws IOException {
this(directory, mode, new AtomicLong());
}
public KahaStore(String name, String mode,AtomicLong storeSize) throws IOException {
this(new File(IOHelper.toFileSystemDirectorySafeName(name)), mode, storeSize);
}
public KahaStore(File directory, String mode, AtomicLong storeSize) throws IOException {
this.mode = mode;
this.storeSize = storeSize;
this.directory = directory;
IOHelper.mkdirs(this.directory);
}
public synchronized void close() throws IOException {
if (!closed) {
closed = true;
if (initialized) {
unlock();
for (ListContainerImpl container : lists.values()) {
container.close();
}
lists.clear();
for (MapContainerImpl container : maps.values()) {
container.close();
}
maps.clear();
for (Iterator<IndexManager> iter = indexManagers.values().iterator(); iter.hasNext();) {
IndexManager im = iter.next();
im.close();
iter.remove();
}
for (Iterator<DataManager> iter = dataManagers.values().iterator(); iter.hasNext();) {
DataManager dm = iter.next();
dm.close();
iter.remove();
}
}
if (lockFile!=null) {
lockFile.close();
lockFile=null;
}
}
}
public synchronized void force() throws IOException {
if (initialized) {
for (Iterator<IndexManager> iter = indexManagers.values().iterator(); iter.hasNext();) {
IndexManager im = iter.next();
im.force();
}
for (Iterator<DataManager> iter = dataManagers.values().iterator(); iter.hasNext();) {
DataManager dm = iter.next();
dm.force();
}
}
}
public synchronized void clear() throws IOException {
initialize();
for (Iterator i = mapsContainer.getKeys().iterator(); i.hasNext();) {
ContainerId id = (ContainerId)i.next();
MapContainer container = getMapContainer(id.getKey(), id.getDataContainerName());
container.clear();
}
for (Iterator i = listsContainer.getKeys().iterator(); i.hasNext();) {
ContainerId id = (ContainerId)i.next();
ListContainer container = getListContainer(id.getKey(), id.getDataContainerName());
container.clear();
}
}
public synchronized boolean delete() throws IOException {
boolean result = true;
if (initialized) {
clear();
for (Iterator<IndexManager> iter = indexManagers.values().iterator(); iter.hasNext();) {
IndexManager im = iter.next();
result &= im.delete();
iter.remove();
}
for (Iterator<DataManager> iter = dataManagers.values().iterator(); iter.hasNext();) {
DataManager dm = iter.next();
result &= dm.delete();
iter.remove();
}
}
if (directory != null && directory.isDirectory()) {
result =IOHelper.deleteChildren(directory);
String str = result ? "successfully deleted" : "failed to delete";
LOG.info("Kaha Store " + str + " data directory " + directory);
}
return result;
}
public synchronized boolean isInitialized() {
return initialized;
}
public boolean doesMapContainerExist(Object id) throws IOException {
return doesMapContainerExist(id, defaultContainerName);
}
public synchronized boolean doesMapContainerExist(Object id, String containerName) throws IOException {
initialize();
ContainerId containerId = new ContainerId(id, containerName);
return maps.containsKey(containerId) || mapsContainer.doesRootExist(containerId);
}
public MapContainer getMapContainer(Object id) throws IOException {
return getMapContainer(id, defaultContainerName);
}
public MapContainer getMapContainer(Object id, String containerName) throws IOException {
return getMapContainer(id, containerName, persistentIndex);
}
public synchronized MapContainer getMapContainer(Object id, String containerName, boolean persistentIndex)
throws IOException {
initialize();
ContainerId containerId = new ContainerId(id, containerName);
MapContainerImpl result = maps.get(containerId);
if (result == null) {
DataManager dm = getDataManager(containerName);
IndexManager im = getIndexManager(dm, containerName);
IndexItem root = mapsContainer.getRoot(im, containerId);
if (root == null) {
root = mapsContainer.addRoot(im, containerId);
}
result = new MapContainerImpl(directory, containerId, root, im, dm, persistentIndex);
maps.put(containerId, result);
}
return result;
}
public void deleteMapContainer(Object id) throws IOException {
deleteMapContainer(id, defaultContainerName);
}
public void deleteMapContainer(Object id, String containerName) throws IOException {
ContainerId containerId = new ContainerId(id, containerName);
deleteMapContainer(containerId);
}
public synchronized void deleteMapContainer(ContainerId containerId) throws IOException {
initialize();
MapContainerImpl container = maps.remove(containerId);
if (container != null) {
container.clear();
mapsContainer.removeRoot(container.getIndexManager(), containerId);
container.close();
}
}
public synchronized Set<ContainerId> getMapContainerIds() throws IOException {
initialize();
Set<ContainerId> set = new HashSet<ContainerId>();
for (Iterator i = mapsContainer.getKeys().iterator(); i.hasNext();) {
ContainerId id = (ContainerId)i.next();
set.add(id);
}
return set;
}
public boolean doesListContainerExist(Object id) throws IOException {
return doesListContainerExist(id, defaultContainerName);
}
public synchronized boolean doesListContainerExist(Object id, String containerName) throws IOException {
initialize();
ContainerId containerId = new ContainerId(id, containerName);
return lists.containsKey(containerId) || listsContainer.doesRootExist(containerId);
}
public ListContainer getListContainer(Object id) throws IOException {
return getListContainer(id, defaultContainerName);
}
public ListContainer getListContainer(Object id, String containerName) throws IOException {
return getListContainer(id, containerName, persistentIndex);
}
public synchronized ListContainer getListContainer(Object id, String containerName,
boolean persistentIndex) throws IOException {
initialize();
ContainerId containerId = new ContainerId(id, containerName);
ListContainerImpl result = lists.get(containerId);
if (result == null) {
DataManager dm = getDataManager(containerName);
IndexManager im = getIndexManager(dm, containerName);
IndexItem root = listsContainer.getRoot(im, containerId);
if (root == null) {
root = listsContainer.addRoot(im, containerId);
}
result = new ListContainerImpl(containerId, root, im, dm, persistentIndex);
lists.put(containerId, result);
}
return result;
}
public void deleteListContainer(Object id) throws IOException {
deleteListContainer(id, defaultContainerName);
}
public synchronized void deleteListContainer(Object id, String containerName) throws IOException {
ContainerId containerId = new ContainerId(id, containerName);
deleteListContainer(containerId);
}
public synchronized void deleteListContainer(ContainerId containerId) throws IOException {
initialize();
ListContainerImpl container = lists.remove(containerId);
if (container != null) {
listsContainer.removeRoot(container.getIndexManager(), containerId);
container.clear();
container.close();
}
}
public synchronized Set<ContainerId> getListContainerIds() throws IOException {
initialize();
Set<ContainerId> set = new HashSet<ContainerId>();
for (Iterator i = listsContainer.getKeys().iterator(); i.hasNext();) {
ContainerId id = (ContainerId)i.next();
set.add(id);
}
return set;
}
/**
* @return the listsContainer
*/
public IndexRootContainer getListsContainer() {
return this.listsContainer;
}
/**
* @return the mapsContainer
*/
public IndexRootContainer getMapsContainer() {
return this.mapsContainer;
}
public synchronized DataManager getDataManager(String name) throws IOException {
DataManager dm = dataManagers.get(name);
if (dm == null) {
if (isUseAsyncDataManager()) {
AsyncDataManager t = new AsyncDataManager(storeSize);
t.setDirectory(directory);
t.setFilePrefix("async-data-" + name + "-");
t.setMaxFileLength((int)maxDataFileLength);
t.start();
dm = new DataManagerFacade(t, name);
} else {
DataManagerImpl t = new DataManagerImpl(directory, name,storeSize);
t.setMaxFileLength(maxDataFileLength);
dm = t;
}
if (logIndexChanges) {
recover(dm);
}
dataManagers.put(name, dm);
}
return dm;
}
public synchronized IndexManager getIndexManager(DataManager dm, String name) throws IOException {
IndexManager im = indexManagers.get(name);
if (im == null) {
im = new IndexManager(directory, name, mode, logIndexChanges ? dm : null,storeSize);
indexManagers.put(name, im);
}
return im;
}
private void recover(final DataManager dm) throws IOException {
dm.recoverRedoItems(new RedoListener() {
public void onRedoItem(StoreLocation item, Object o) throws Exception {
RedoStoreIndexItem redo = (RedoStoreIndexItem)o;
// IndexManager im = getIndexManager(dm, redo.getIndexName());
IndexManager im = getIndexManager(dm, dm.getName());
im.redo(redo);
}
});
}
public synchronized boolean isLogIndexChanges() {
return logIndexChanges;
}
public synchronized void setLogIndexChanges(boolean logIndexChanges) {
this.logIndexChanges = logIndexChanges;
}
/**
* @return the maxDataFileLength
*/
public synchronized long getMaxDataFileLength() {
return maxDataFileLength;
}
/**
* @param maxDataFileLength the maxDataFileLength to set
*/
public synchronized void setMaxDataFileLength(long maxDataFileLength) {
this.maxDataFileLength = maxDataFileLength;
}
/**
* @return the default index type
*/
public synchronized String getIndexTypeAsString() {
return persistentIndex ? "PERSISTENT" : "VM";
}
/**
* Set the default index type
*
* @param type "PERSISTENT" or "VM"
*/
public synchronized void setIndexTypeAsString(String type) {
if (type.equalsIgnoreCase("VM")) {
persistentIndex = false;
} else {
persistentIndex = true;
}
}
public boolean isPersistentIndex() {
return persistentIndex;
}
public void setPersistentIndex(boolean persistentIndex) {
this.persistentIndex = persistentIndex;
}
public synchronized boolean isUseAsyncDataManager() {
return useAsyncDataManager;
}
public synchronized void setUseAsyncDataManager(boolean useAsyncWriter) {
this.useAsyncDataManager = useAsyncWriter;
}
/**
* @return size of store
* @see org.apache.activemq.kaha.Store#size()
*/
public long size(){
return storeSize.get();
}
public String getDefaultContainerName() {
return defaultContainerName;
}
public void setDefaultContainerName(String defaultContainerName) {
this.defaultContainerName = defaultContainerName;
}
public synchronized void initialize() throws IOException {
if (closed) {
throw new IOException("Store has been closed.");
}
if (!initialized) {
LOG.info("Kaha Store using data directory " + directory);
lockFile = new RandomAccessFile(new File(directory, "lock"), "rw");
lock();
DataManager defaultDM = getDataManager(defaultContainerName);
IndexManager rootIndexManager = getIndexManager(defaultDM, defaultContainerName);
IndexItem mapRoot = new IndexItem();
IndexItem listRoot = new IndexItem();
if (rootIndexManager.isEmpty()) {
mapRoot.setOffset(0);
rootIndexManager.storeIndex(mapRoot);
listRoot.setOffset(IndexItem.INDEX_SIZE);
rootIndexManager.storeIndex(listRoot);
rootIndexManager.setLength(IndexItem.INDEX_SIZE * 2);
} else {
mapRoot = rootIndexManager.getIndex(0);
listRoot = rootIndexManager.getIndex(IndexItem.INDEX_SIZE);
}
initialized = true;
mapsContainer = new IndexRootContainer(mapRoot, rootIndexManager, defaultDM);
listsContainer = new IndexRootContainer(listRoot, rootIndexManager, defaultDM);
/**
* Add interest in data files - then consolidate them
*/
generateInterestInMapDataFiles();
generateInterestInListDataFiles();
for (Iterator<DataManager> i = dataManagers.values().iterator(); i.hasNext();) {
DataManager dm = i.next();
dm.consolidateDataFiles();
}
}
}
private void lock() throws IOException {
synchronized (LOCKSET_MONITOR) {
if (!DISABLE_LOCKING && directory != null && lock == null) {
String key = getPropertyKey();
String property = System.getProperty(key);
if (null == property) {
if (!BROKEN_FILE_LOCK) {
lock = lockFile.getChannel().tryLock(0, lockFile.getChannel().size(), false);
if (lock == null) {
throw new StoreLockedExcpetion("Kaha Store " + directory.getName() + " is already opened by another application");
} else
System.setProperty(key, new Date().toString());
}
} else { //already locked
throw new StoreLockedExcpetion("Kaha Store " + directory.getName() + " is already opened by this application.");
}
}
}
}
private void unlock() throws IOException {
synchronized (LOCKSET_MONITOR) {
if (!DISABLE_LOCKING && (null != directory) && (null != lock)) {
System.getProperties().remove(getPropertyKey());
if (lock.isValid()) {
lock.release();
}
lock = null;
}
}
}
private String getPropertyKey() throws IOException {
return getClass().getName() + ".lock." + directory.getCanonicalPath();
}
/**
* scans the directory and builds up the IndexManager and DataManager
*
* @throws IOException if there is a problem accessing an index or data file
*/
private void generateInterestInListDataFiles() throws IOException {
for (Iterator i = listsContainer.getKeys().iterator(); i.hasNext();) {
ContainerId id = (ContainerId)i.next();
DataManager dm = getDataManager(id.getDataContainerName());
IndexManager im = getIndexManager(dm, id.getDataContainerName());
IndexItem theRoot = listsContainer.getRoot(im, id);
long nextItem = theRoot.getNextItem();
while (nextItem != Item.POSITION_NOT_SET) {
IndexItem item = im.getIndex(nextItem);
item.setOffset(nextItem);
dm.addInterestInFile(item.getKeyFile());
dm.addInterestInFile(item.getValueFile());
nextItem = item.getNextItem();
}
}
}
/**
* scans the directory and builds up the IndexManager and DataManager
*
* @throws IOException if there is a problem accessing an index or data file
*/
private void generateInterestInMapDataFiles() throws IOException {
for (Iterator i = mapsContainer.getKeys().iterator(); i.hasNext();) {
ContainerId id = (ContainerId)i.next();
DataManager dm = getDataManager(id.getDataContainerName());
IndexManager im = getIndexManager(dm, id.getDataContainerName());
IndexItem theRoot = mapsContainer.getRoot(im, id);
long nextItem = theRoot.getNextItem();
while (nextItem != Item.POSITION_NOT_SET) {
IndexItem item = im.getIndex(nextItem);
item.setOffset(nextItem);
dm.addInterestInFile(item.getKeyFile());
dm.addInterestInFile(item.getValueFile());
nextItem = item.getNextItem();
}
}
}
}