| /** |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.activemq.kaha.impl.data; |
| |
| import java.io.File; |
| import java.io.FilenameFilter; |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.HashMap; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.concurrent.atomic.AtomicLong; |
| |
| import org.apache.activemq.kaha.Marshaller; |
| import org.apache.activemq.kaha.StoreLocation; |
| import org.apache.activemq.kaha.impl.DataManager; |
| import org.apache.activemq.kaha.impl.index.RedoStoreIndexItem; |
| import org.apache.activemq.util.IOExceptionSupport; |
| import org.apache.activemq.util.IOHelper; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * Manages DataFiles |
| * |
| * |
| */ |
| public final class DataManagerImpl implements DataManager { |
| |
| public static final int ITEM_HEAD_SIZE = 5; // type + length |
| public static final byte DATA_ITEM_TYPE = 1; |
| public static final byte REDO_ITEM_TYPE = 2; |
| public static final long MAX_FILE_LENGTH = 1024 * 1024 * 32; |
| |
| private static final Logger LOG = LoggerFactory.getLogger(DataManagerImpl.class); |
| private static final String NAME_PREFIX = "data-"; |
| |
| private final File directory; |
| private final String name; |
| private SyncDataFileReader reader; |
| private SyncDataFileWriter writer; |
| private DataFile currentWriteFile; |
| private long maxFileLength = MAX_FILE_LENGTH; |
| private Map<Integer, DataFile> fileMap = new HashMap<Integer, DataFile>(); |
| private Marshaller redoMarshaller = RedoStoreIndexItem.MARSHALLER; |
| private String dataFilePrefix; |
| private final AtomicLong storeSize; |
| |
| public DataManagerImpl(File dir, final String name,AtomicLong storeSize) { |
| this.directory = dir; |
| this.name = name; |
| this.storeSize=storeSize; |
| |
| dataFilePrefix = IOHelper.toFileSystemSafeName(NAME_PREFIX + name + "-"); |
| // build up list of current dataFiles |
| File[] files = dir.listFiles(new FilenameFilter() { |
| public boolean accept(File dir, String n) { |
| return dir.equals(directory) && n.startsWith(dataFilePrefix); |
| } |
| }); |
| if (files != null) { |
| for (int i = 0; i < files.length; i++) { |
| File file = files[i]; |
| String n = file.getName(); |
| String numStr = n.substring(dataFilePrefix.length(), n.length()); |
| int num = Integer.parseInt(numStr); |
| DataFile dataFile = new DataFile(file, num); |
| storeSize.addAndGet(dataFile.getLength()); |
| fileMap.put(dataFile.getNumber(), dataFile); |
| if (currentWriteFile == null || currentWriteFile.getNumber().intValue() < num) { |
| currentWriteFile = dataFile; |
| } |
| } |
| } |
| } |
| |
| private DataFile createAndAddDataFile(int num) { |
| String fileName = dataFilePrefix + num; |
| File file = new File(directory, fileName); |
| DataFile result = new DataFile(file, num); |
| fileMap.put(result.getNumber(), result); |
| return result; |
| } |
| |
| /* |
| * (non-Javadoc) |
| * |
| * @see org.apache.activemq.kaha.impl.data.IDataManager#getName() |
| */ |
| public String getName() { |
| return name; |
| } |
| |
| synchronized DataFile findSpaceForData(DataItem item) throws IOException { |
| if (currentWriteFile == null || ((currentWriteFile.getLength() + item.getSize()) > maxFileLength)) { |
| int nextNum = currentWriteFile != null ? currentWriteFile.getNumber().intValue() + 1 : 1; |
| if (currentWriteFile != null && currentWriteFile.isUnused()) { |
| removeDataFile(currentWriteFile); |
| } |
| currentWriteFile = createAndAddDataFile(nextNum); |
| } |
| item.setOffset(currentWriteFile.getLength()); |
| item.setFile(currentWriteFile.getNumber().intValue()); |
| int len = item.getSize() + ITEM_HEAD_SIZE; |
| currentWriteFile.incrementLength(len); |
| storeSize.addAndGet(len); |
| return currentWriteFile; |
| } |
| |
| DataFile getDataFile(StoreLocation item) throws IOException { |
| Integer key = Integer.valueOf(item.getFile()); |
| DataFile dataFile = fileMap.get(key); |
| if (dataFile == null) { |
| LOG.error("Looking for key " + key + " but not found in fileMap: " + fileMap); |
| throw new IOException("Could not locate data file " + NAME_PREFIX + name + "-" + item.getFile()); |
| } |
| return dataFile; |
| } |
| |
| /* |
| * (non-Javadoc) |
| * |
| * @see org.apache.activemq.kaha.impl.data.IDataManager#readItem(org.apache.activemq.kaha.Marshaller, |
| * org.apache.activemq.kaha.StoreLocation) |
| */ |
| public synchronized Object readItem(Marshaller marshaller, StoreLocation item) throws IOException { |
| return getReader().readItem(marshaller, item); |
| } |
| |
| /* |
| * (non-Javadoc) |
| * |
| * @see org.apache.activemq.kaha.impl.data.IDataManager#storeDataItem(org.apache.activemq.kaha.Marshaller, |
| * java.lang.Object) |
| */ |
| public synchronized StoreLocation storeDataItem(Marshaller marshaller, Object payload) throws IOException { |
| return getWriter().storeItem(marshaller, payload, DATA_ITEM_TYPE); |
| } |
| |
| /* |
| * (non-Javadoc) |
| * |
| * @see org.apache.activemq.kaha.impl.data.IDataManager#storeRedoItem(java.lang.Object) |
| */ |
| public synchronized StoreLocation storeRedoItem(Object payload) throws IOException { |
| return getWriter().storeItem(redoMarshaller, payload, REDO_ITEM_TYPE); |
| } |
| |
| /* |
| * (non-Javadoc) |
| * |
| * @see org.apache.activemq.kaha.impl.data.IDataManager#updateItem(org.apache.activemq.kaha.StoreLocation, |
| * org.apache.activemq.kaha.Marshaller, java.lang.Object) |
| */ |
| public synchronized void updateItem(StoreLocation location, Marshaller marshaller, Object payload) |
| throws IOException { |
| getWriter().updateItem((DataItem)location, marshaller, payload, DATA_ITEM_TYPE); |
| } |
| |
| /* |
| * (non-Javadoc) |
| * |
| * @see org.apache.activemq.kaha.impl.data.IDataManager#recoverRedoItems(org.apache.activemq.kaha.impl.data.RedoListener) |
| */ |
| public synchronized void recoverRedoItems(RedoListener listener) throws IOException { |
| |
| // Nothing to recover if there is no current file. |
| if (currentWriteFile == null) { |
| return; |
| } |
| |
| DataItem item = new DataItem(); |
| item.setFile(currentWriteFile.getNumber().intValue()); |
| item.setOffset(0); |
| while (true) { |
| byte type; |
| try { |
| type = getReader().readDataItemSize(item); |
| } catch (IOException ignore) { |
| LOG.trace("End of data file reached at (header was invalid): " + item); |
| return; |
| } |
| if (type == REDO_ITEM_TYPE) { |
| // Un-marshal the redo item |
| Object object; |
| try { |
| object = readItem(redoMarshaller, item); |
| } catch (IOException e1) { |
| LOG.trace("End of data file reached at (payload was invalid): " + item); |
| return; |
| } |
| try { |
| |
| listener.onRedoItem(item, object); |
| // in case the listener is holding on to item references, |
| // copy it |
| // so we don't change it behind the listener's back. |
| item = item.copy(); |
| |
| } catch (Exception e) { |
| throw IOExceptionSupport.create("Recovery handler failed: " + e, e); |
| } |
| } |
| // Move to the next item. |
| item.setOffset(item.getOffset() + ITEM_HEAD_SIZE + item.getSize()); |
| } |
| } |
| |
| /* |
| * (non-Javadoc) |
| * |
| * @see org.apache.activemq.kaha.impl.data.IDataManager#close() |
| */ |
| public synchronized void close() throws IOException { |
| getWriter().close(); |
| for (Iterator<DataFile> i = fileMap.values().iterator(); i.hasNext();) { |
| DataFile dataFile = i.next(); |
| getWriter().force(dataFile); |
| dataFile.close(); |
| } |
| fileMap.clear(); |
| } |
| |
| /* |
| * (non-Javadoc) |
| * |
| * @see org.apache.activemq.kaha.impl.data.IDataManager#force() |
| */ |
| public synchronized void force() throws IOException { |
| for (Iterator<DataFile> i = fileMap.values().iterator(); i.hasNext();) { |
| DataFile dataFile = i.next(); |
| getWriter().force(dataFile); |
| } |
| } |
| |
| /* |
| * (non-Javadoc) |
| * |
| * @see org.apache.activemq.kaha.impl.data.IDataManager#delete() |
| */ |
| public synchronized boolean delete() throws IOException { |
| boolean result = true; |
| for (Iterator<DataFile> i = fileMap.values().iterator(); i.hasNext();) { |
| DataFile dataFile = i.next(); |
| storeSize.addAndGet(-dataFile.getLength()); |
| result &= dataFile.delete(); |
| } |
| fileMap.clear(); |
| return result; |
| } |
| |
| /* |
| * (non-Javadoc) |
| * |
| * @see org.apache.activemq.kaha.impl.data.IDataManager#addInterestInFile(int) |
| */ |
| public synchronized void addInterestInFile(int file) throws IOException { |
| if (file >= 0) { |
| Integer key = Integer.valueOf(file); |
| DataFile dataFile = fileMap.get(key); |
| if (dataFile == null) { |
| dataFile = createAndAddDataFile(file); |
| } |
| addInterestInFile(dataFile); |
| } |
| } |
| |
| synchronized void addInterestInFile(DataFile dataFile) { |
| if (dataFile != null) { |
| dataFile.increment(); |
| } |
| } |
| |
| /* |
| * (non-Javadoc) |
| * |
| * @see org.apache.activemq.kaha.impl.data.IDataManager#removeInterestInFile(int) |
| */ |
| public synchronized void removeInterestInFile(int file) throws IOException { |
| if (file >= 0) { |
| Integer key = Integer.valueOf(file); |
| DataFile dataFile = fileMap.get(key); |
| removeInterestInFile(dataFile); |
| } |
| } |
| |
| synchronized void removeInterestInFile(DataFile dataFile) throws IOException { |
| if (dataFile != null) { |
| |
| if (dataFile.decrement() <= 0) { |
| if (dataFile != currentWriteFile) { |
| removeDataFile(dataFile); |
| } |
| } |
| } |
| } |
| |
| /* |
| * (non-Javadoc) |
| * |
| * @see org.apache.activemq.kaha.impl.data.IDataManager#consolidateDataFiles() |
| */ |
| public synchronized void consolidateDataFiles() throws IOException { |
| List<DataFile> purgeList = new ArrayList<DataFile>(); |
| for (Iterator<DataFile> i = fileMap.values().iterator(); i.hasNext();) { |
| DataFile dataFile = i.next(); |
| if (dataFile.isUnused() && dataFile != currentWriteFile) { |
| purgeList.add(dataFile); |
| } |
| } |
| for (int i = 0; i < purgeList.size(); i++) { |
| DataFile dataFile = purgeList.get(i); |
| removeDataFile(dataFile); |
| } |
| } |
| |
| private void removeDataFile(DataFile dataFile) throws IOException { |
| fileMap.remove(dataFile.getNumber()); |
| if (writer != null) { |
| writer.force(dataFile); |
| } |
| storeSize.addAndGet(-dataFile.getLength()); |
| boolean result = dataFile.delete(); |
| LOG.debug("discarding data file " + dataFile + (result ? "successful " : "failed")); |
| } |
| |
| /* |
| * (non-Javadoc) |
| * |
| * @see org.apache.activemq.kaha.impl.data.IDataManager#getRedoMarshaller() |
| */ |
| public Marshaller getRedoMarshaller() { |
| return redoMarshaller; |
| } |
| |
| /* |
| * (non-Javadoc) |
| * |
| * @see org.apache.activemq.kaha.impl.data.IDataManager#setRedoMarshaller(org.apache.activemq.kaha.Marshaller) |
| */ |
| public void setRedoMarshaller(Marshaller redoMarshaller) { |
| this.redoMarshaller = redoMarshaller; |
| } |
| |
| /** |
| * @return the maxFileLength |
| */ |
| public long getMaxFileLength() { |
| return maxFileLength; |
| } |
| |
| /** |
| * @param maxFileLength the maxFileLength to set |
| */ |
| public void setMaxFileLength(long maxFileLength) { |
| this.maxFileLength = maxFileLength; |
| } |
| |
| public String toString() { |
| return "DataManager:(" + NAME_PREFIX + name + ")"; |
| } |
| |
| public synchronized SyncDataFileReader getReader() { |
| if (reader == null) { |
| reader = createReader(); |
| } |
| return reader; |
| } |
| |
| protected synchronized SyncDataFileReader createReader() { |
| return new SyncDataFileReader(this); |
| } |
| |
| public synchronized void setReader(SyncDataFileReader reader) { |
| this.reader = reader; |
| } |
| |
| public synchronized SyncDataFileWriter getWriter() { |
| if (writer == null) { |
| writer = createWriter(); |
| } |
| return writer; |
| } |
| |
| private SyncDataFileWriter createWriter() { |
| return new SyncDataFileWriter(this); |
| } |
| |
| public synchronized void setWriter(SyncDataFileWriter writer) { |
| this.writer = writer; |
| } |
| |
| } |