| package org.apache.commons.jcs3.auxiliary.disk.block; |
| |
| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| import java.io.File; |
| import java.io.IOException; |
| import java.nio.ByteBuffer; |
| import java.nio.channels.FileChannel; |
| import java.nio.file.StandardOpenOption; |
| import java.util.concurrent.ConcurrentLinkedQueue; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.concurrent.atomic.AtomicLong; |
| |
| import org.apache.commons.jcs3.engine.behavior.IElementSerializer; |
| import org.apache.commons.jcs3.log.Log; |
| import org.apache.commons.jcs3.log.LogManager; |
| import org.apache.commons.jcs3.utils.serialization.StandardSerializer; |
| |
| /** |
| * This class manages reading and writing data to disk. When asked to write a value, it returns a |
| * block array. It can read an object from the block numbers in a byte array. |
| * <p> |
| * @author Aaron Smuts |
| */ |
| public class BlockDisk implements AutoCloseable |
| { |
| /** The logger */ |
| private static final Log log = LogManager.getLog(BlockDisk.class); |
| |
| /** The size of the header that indicates the amount of data stored in an occupied block. */ |
| public static final byte HEADER_SIZE_BYTES = 4; |
| // N.B. 4 bytes is the size used for ByteBuffer.putInt(int value) and ByteBuffer.getInt() |
| |
| /** defaults to 4kb */ |
| private static final int DEFAULT_BLOCK_SIZE_BYTES = 4 * 1024; |
| |
| /** Size of the blocks */ |
| private final int blockSizeBytes; |
| |
| /** |
| * the total number of blocks that have been used. If there are no free, we will use this to |
| * calculate the position of the next block. |
| */ |
| private final AtomicInteger numberOfBlocks = new AtomicInteger(0); |
| |
| /** Empty blocks that can be reused. */ |
| private final ConcurrentLinkedQueue<Integer> emptyBlocks = new ConcurrentLinkedQueue<>(); |
| |
| /** The serializer. */ |
| private final IElementSerializer elementSerializer; |
| |
| /** Location of the spot on disk */ |
| private final String filepath; |
| |
| /** File channel for multiple concurrent reads and writes */ |
| private final FileChannel fc; |
| |
| /** How many bytes have we put to disk */ |
| private final AtomicLong putBytes = new AtomicLong(0); |
| |
| /** How many items have we put to disk */ |
| private final AtomicLong putCount = new AtomicLong(0); |
| |
| /** |
| * Constructor for the Disk object |
| * <p> |
| * @param file |
| * @param elementSerializer |
| * @throws IOException |
| */ |
| public BlockDisk(final File file, final IElementSerializer elementSerializer) |
| throws IOException |
| { |
| this(file, DEFAULT_BLOCK_SIZE_BYTES, elementSerializer); |
| } |
| |
| /** |
| * Creates the file and set the block size in bytes. |
| * <p> |
| * @param file |
| * @param blockSizeBytes |
| * @throws IOException |
| */ |
| public BlockDisk(final File file, final int blockSizeBytes) |
| throws IOException |
| { |
| this(file, blockSizeBytes, new StandardSerializer()); |
| } |
| |
| /** |
| * Creates the file and set the block size in bytes. |
| * <p> |
| * @param file |
| * @param blockSizeBytes |
| * @param elementSerializer |
| * @throws IOException |
| */ |
| public BlockDisk(final File file, final int blockSizeBytes, final IElementSerializer elementSerializer) |
| throws IOException |
| { |
| this.filepath = file.getAbsolutePath(); |
| this.fc = FileChannel.open(file.toPath(), |
| StandardOpenOption.CREATE, |
| StandardOpenOption.READ, |
| StandardOpenOption.WRITE); |
| this.numberOfBlocks.set((int) Math.ceil(1f * this.fc.size() / blockSizeBytes)); |
| |
| log.info("Constructing BlockDisk, blockSizeBytes [{0}]", blockSizeBytes); |
| |
| this.blockSizeBytes = blockSizeBytes; |
| this.elementSerializer = elementSerializer; |
| } |
| |
| /** |
| * Allocate a given number of blocks from the available set |
| * |
| * @param numBlocksNeeded |
| * @return an array of allocated blocks |
| */ |
| private int[] allocateBlocks(final int numBlocksNeeded) |
| { |
| assert numBlocksNeeded >= 1; |
| |
| final int[] blocks = new int[numBlocksNeeded]; |
| // get them from the empty list or take the next one |
| for (int i = 0; i < numBlocksNeeded; i++) |
| { |
| Integer emptyBlock = emptyBlocks.poll(); |
| if (emptyBlock == null) |
| { |
| emptyBlock = Integer.valueOf(numberOfBlocks.getAndIncrement()); |
| } |
| blocks[i] = emptyBlock.intValue(); |
| } |
| |
| return blocks; |
| } |
| |
| /** |
| * This writes an object to disk and returns the blocks it was stored in. |
| * <p> |
| * The program flow is as follows: |
| * <ol> |
| * <li>Serialize the object.</li> |
| * <li>Determine the number of blocks needed.</li> |
| * <li>Look for free blocks in the emptyBlock list.</li> |
| * <li>If there were not enough in the empty list. Take the nextBlock and increment it.</li> |
| * <li>If the data will not fit in one block, create sub arrays.</li> |
| * <li>Write the subarrays to disk.</li> |
| * <li>If the process fails we should decrement the block count if we took from it.</li> |
| * </ol> |
| * @param object |
| * @return the blocks we used. |
| * @throws IOException |
| */ |
| protected <T> int[] write(final T object) |
| throws IOException |
| { |
| // serialize the object |
| final byte[] data = elementSerializer.serialize(object); |
| |
| log.debug("write, total pre-chunking data.length = {0}", data.length); |
| |
| this.putBytes.addAndGet(data.length); |
| this.putCount.incrementAndGet(); |
| |
| // figure out how many blocks we need. |
| final int numBlocksNeeded = calculateTheNumberOfBlocksNeeded(data); |
| |
| log.debug("numBlocksNeeded = {0}", numBlocksNeeded); |
| |
| // allocate blocks |
| final int[] blocks = allocateBlocks(numBlocksNeeded); |
| |
| int offset = 0; |
| final int maxChunkSize = blockSizeBytes - HEADER_SIZE_BYTES; |
| final ByteBuffer headerBuffer = ByteBuffer.allocate(HEADER_SIZE_BYTES); |
| final ByteBuffer dataBuffer = ByteBuffer.wrap(data); |
| |
| for (int i = 0; i < numBlocksNeeded; i++) |
| { |
| headerBuffer.clear(); |
| final int length = Math.min(maxChunkSize, data.length - offset); |
| headerBuffer.putInt(length); |
| headerBuffer.flip(); |
| |
| dataBuffer.position(offset).limit(offset + length); |
| final ByteBuffer slice = dataBuffer.slice(); |
| |
| final long position = calculateByteOffsetForBlockAsLong(blocks[i]); |
| // write the header |
| int written = fc.write(headerBuffer, position); |
| assert written == HEADER_SIZE_BYTES; |
| |
| //write the data |
| written = fc.write(slice, position + HEADER_SIZE_BYTES); |
| assert written == length; |
| |
| offset += length; |
| } |
| |
| //fc.force(false); |
| |
| return blocks; |
| } |
| |
| /** |
| * Return the amount to put in each block. Fill them all the way, minus the header. |
| * <p> |
| * @param complete |
| * @param numBlocksNeeded |
| * @return byte[][] |
| */ |
| protected byte[][] getBlockChunks(final byte[] complete, final int numBlocksNeeded) |
| { |
| final byte[][] chunks = new byte[numBlocksNeeded][]; |
| |
| if (numBlocksNeeded == 1) |
| { |
| chunks[0] = complete; |
| } |
| else |
| { |
| final int maxChunkSize = this.blockSizeBytes - HEADER_SIZE_BYTES; |
| final int totalBytes = complete.length; |
| int totalUsed = 0; |
| for (short i = 0; i < numBlocksNeeded; i++) |
| { |
| // use the max that can be written to a block or whatever is left in the original |
| // array |
| final int chunkSize = Math.min(maxChunkSize, totalBytes - totalUsed); |
| final byte[] chunk = new byte[chunkSize]; |
| // copy from the used position to the chunk size on the complete array to the chunk |
| // array. |
| System.arraycopy(complete, totalUsed, chunk, 0, chunkSize); |
| chunks[i] = chunk; |
| totalUsed += chunkSize; |
| } |
| } |
| |
| return chunks; |
| } |
| |
| /** |
| * Reads an object that is located in the specified blocks. |
| * <p> |
| * @param blockNumbers |
| * @return the object instance |
| * @throws IOException |
| * @throws ClassNotFoundException |
| */ |
| protected <T> T read(final int[] blockNumbers) |
| throws IOException, ClassNotFoundException |
| { |
| final ByteBuffer data; |
| |
| if (blockNumbers.length == 1) |
| { |
| data = readBlock(blockNumbers[0]); |
| } |
| else |
| { |
| data = ByteBuffer.allocate(blockNumbers.length * getBlockSizeBytes()); |
| // get all the blocks into data |
| for (short i = 0; i < blockNumbers.length; i++) |
| { |
| final ByteBuffer chunk = readBlock(blockNumbers[i]); |
| data.put(chunk); |
| } |
| |
| data.flip(); |
| } |
| |
| log.debug("read, total post combination data.length = {0}", () -> data.limit()); |
| |
| return elementSerializer.deSerialize(data.array(), null); |
| } |
| |
| /** |
| * This reads the occupied data in a block. |
| * <p> |
| * The first four bytes of the record should tell us how long it is. The data is read into a |
| * byte array and then an object is constructed from the byte array. |
| * <p> |
| * @return byte[] |
| * @param block |
| * @throws IOException |
| */ |
| private ByteBuffer readBlock(final int block) |
| throws IOException |
| { |
| int datalen = 0; |
| |
| String message = null; |
| boolean corrupted = false; |
| final long fileLength = fc.size(); |
| |
| final long position = calculateByteOffsetForBlockAsLong(block); |
| // if (position > fileLength) |
| // { |
| // corrupted = true; |
| // message = "Record " + position + " starts past EOF."; |
| // } |
| // else |
| { |
| final ByteBuffer datalength = ByteBuffer.allocate(HEADER_SIZE_BYTES); |
| fc.read(datalength, position); |
| datalength.flip(); |
| datalen = datalength.getInt(); |
| if (position + datalen > fileLength) |
| { |
| corrupted = true; |
| message = "Record " + position + " exceeds file length."; |
| } |
| } |
| |
| if (corrupted) |
| { |
| log.warn("\n The file is corrupt: \n {0}", message); |
| throw new IOException("The File Is Corrupt, need to reset"); |
| } |
| |
| final ByteBuffer data = ByteBuffer.allocate(datalen); |
| fc.read(data, position + HEADER_SIZE_BYTES); |
| data.flip(); |
| |
| return data; |
| } |
| |
| /** |
| * Add these blocks to the emptyBlock list. |
| * <p> |
| * @param blocksToFree |
| */ |
| protected void freeBlocks(final int[] blocksToFree) |
| { |
| if (blocksToFree != null) |
| { |
| for (short i = 0; i < blocksToFree.length; i++) |
| { |
| emptyBlocks.offer(Integer.valueOf(blocksToFree[i])); |
| } |
| } |
| } |
| |
| /** |
| * Calculates the file offset for a particular block. |
| * <p> |
| * @param block number |
| * @return the byte offset for this block in the file as a long |
| * @since 2.0 |
| */ |
| protected long calculateByteOffsetForBlockAsLong(final int block) |
| { |
| return (long) block * blockSizeBytes; |
| } |
| |
| /** |
| * The number of blocks needed. |
| * <p> |
| * @param data |
| * @return the number of blocks needed to store the byte array |
| */ |
| protected int calculateTheNumberOfBlocksNeeded(final byte[] data) |
| { |
| final int dataLength = data.length; |
| |
| final int oneBlock = blockSizeBytes - HEADER_SIZE_BYTES; |
| |
| // takes care of 0 = HEADER_SIZE_BYTES + blockSizeBytes |
| if (dataLength <= oneBlock) |
| { |
| return 1; |
| } |
| |
| int dividend = dataLength / oneBlock; |
| |
| if (dataLength % oneBlock != 0) |
| { |
| dividend++; |
| } |
| return dividend; |
| } |
| |
| /** |
| * Returns the file length. |
| * <p> |
| * @return the size of the file. |
| * @throws IOException |
| */ |
| protected long length() |
| throws IOException |
| { |
| return fc.size(); |
| } |
| |
| /** |
| * Closes the file. |
| * <p> |
| * @throws IOException |
| */ |
| @Override |
| public void close() |
| throws IOException |
| { |
| this.numberOfBlocks.set(0); |
| this.emptyBlocks.clear(); |
| fc.close(); |
| } |
| |
| /** |
| * Resets the file. |
| * <p> |
| * @throws IOException |
| */ |
| protected synchronized void reset() |
| throws IOException |
| { |
| this.numberOfBlocks.set(0); |
| this.emptyBlocks.clear(); |
| fc.truncate(0); |
| fc.force(true); |
| } |
| |
| /** |
| * @return Returns the numberOfBlocks. |
| */ |
| protected int getNumberOfBlocks() |
| { |
| return numberOfBlocks.get(); |
| } |
| |
| /** |
| * @return Returns the blockSizeBytes. |
| */ |
| protected int getBlockSizeBytes() |
| { |
| return blockSizeBytes; |
| } |
| |
| /** |
| * @return Returns the average size of the an element inserted. |
| */ |
| protected long getAveragePutSizeBytes() |
| { |
| final long count = this.putCount.get(); |
| |
| if (count == 0) |
| { |
| return 0; |
| } |
| return this.putBytes.get() / count; |
| } |
| |
| /** |
| * @return Returns the number of empty blocks. |
| */ |
| protected int getEmptyBlocks() |
| { |
| return this.emptyBlocks.size(); |
| } |
| |
| /** |
| * For debugging only. |
| * <p> |
| * @return String with details. |
| */ |
| @Override |
| public String toString() |
| { |
| final StringBuilder buf = new StringBuilder(); |
| buf.append("\nBlock Disk "); |
| buf.append("\n Filepath [" + filepath + "]"); |
| buf.append("\n NumberOfBlocks [" + this.numberOfBlocks.get() + "]"); |
| buf.append("\n BlockSizeBytes [" + this.blockSizeBytes + "]"); |
| buf.append("\n Put Bytes [" + this.putBytes + "]"); |
| buf.append("\n Put Count [" + this.putCount + "]"); |
| buf.append("\n Average Size [" + getAveragePutSizeBytes() + "]"); |
| buf.append("\n Empty Blocks [" + this.getEmptyBlocks() + "]"); |
| try |
| { |
| buf.append("\n Length [" + length() + "]"); |
| } |
| catch (final IOException e) |
| { |
| // swallow |
| } |
| return buf.toString(); |
| } |
| |
| /** |
| * This is used for debugging. |
| * <p> |
| * @return the file path. |
| */ |
| protected String getFilePath() |
| { |
| return filepath; |
| } |
| } |