package org.apache.commons.jcs3.auxiliary.disk.block;

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.commons.jcs3.engine.behavior.IElementSerializer;
import org.apache.commons.jcs3.log.Log;
import org.apache.commons.jcs3.log.LogManager;
import org.apache.commons.jcs3.utils.serialization.StandardSerializer;

/**
 * This class manages reading an writing data to disk. When asked to write a value, it returns a
 * block array. It can read an object from the block numbers in a byte array.
 * <p>
 * @author Aaron Smuts
 */
public class BlockDisk implements AutoCloseable
{
    /** The logger */
    private static final Log log = LogManager.getLog(BlockDisk.class);

    /** The size of the header that indicates the amount of data stored in an occupied block. */
    public static final byte HEADER_SIZE_BYTES = 4;
    // N.B. 4 bytes is the size used for ByteBuffer.putInt(int value) and ByteBuffer.getInt()

    /** defaults to 4kb */
    private static final int DEFAULT_BLOCK_SIZE_BYTES = 4 * 1024;

    /** Size of the blocks */
    private final int blockSizeBytes;

    /**
     * the total number of blocks that have been used. If there are no free, we will use this to
     * calculate the position of the next block.
     */
    private final AtomicInteger numberOfBlocks = new AtomicInteger(0);

    /** Empty blocks that can be reused. */
    private final ConcurrentLinkedQueue<Integer> emptyBlocks = new ConcurrentLinkedQueue<>();

    /** The serializer. */
    private final IElementSerializer elementSerializer;

    /** Location of the spot on disk */
    private final String filepath;

    /** File channel for multiple concurrent reads and writes */
    private final FileChannel fc;

    /** How many bytes have we put to disk */
    private final AtomicLong putBytes = new AtomicLong(0);

    /** How many items have we put to disk */
    private final AtomicLong putCount = new AtomicLong(0);

    /**
     * Constructor for the Disk object
     * <p>
     * @param file
     * @param elementSerializer
     * @throws IOException
     */
    public BlockDisk(final File file, final IElementSerializer elementSerializer)
        throws IOException
    {
        this(file, DEFAULT_BLOCK_SIZE_BYTES, elementSerializer);
    }

    /**
     * Creates the file and set the block size in bytes.
     * <p>
     * @param file
     * @param blockSizeBytes
     * @throws IOException
     */
    public BlockDisk(final File file, final int blockSizeBytes)
        throws IOException
    {
        this(file, blockSizeBytes, new StandardSerializer());
    }

    /**
     * Creates the file and set the block size in bytes.
     * <p>
     * @param file
     * @param blockSizeBytes
     * @param elementSerializer
     * @throws IOException
     */
    public BlockDisk(final File file, final int blockSizeBytes, final IElementSerializer elementSerializer)
        throws IOException
    {
        this.filepath = file.getAbsolutePath();
        this.fc = FileChannel.open(file.toPath(),
                StandardOpenOption.CREATE,
                StandardOpenOption.READ,
                StandardOpenOption.WRITE);
        this.numberOfBlocks.set((int) Math.ceil(1f * this.fc.size() / blockSizeBytes));

        log.info("Constructing BlockDisk, blockSizeBytes [{0}]", blockSizeBytes);

        this.blockSizeBytes = blockSizeBytes;
        this.elementSerializer = elementSerializer;
    }

    /**
     * Allocate a given number of blocks from the available set
     *
     * @param numBlocksNeeded
     * @return an array of allocated blocks
     */
    private int[] allocateBlocks(final int numBlocksNeeded)
    {
        assert numBlocksNeeded >= 1;

        final int[] blocks = new int[numBlocksNeeded];
        // get them from the empty list or take the next one
        for (int i = 0; i < numBlocksNeeded; i++)
        {
            Integer emptyBlock = emptyBlocks.poll();
            if (emptyBlock == null)
            {
                emptyBlock = Integer.valueOf(numberOfBlocks.getAndIncrement());
            }
            blocks[i] = emptyBlock.intValue();
        }

        return blocks;
    }

    /**
     * This writes an object to disk and returns the blocks it was stored in.
     * <p>
     * The program flow is as follows:
     * <ol>
     * <li>Serialize the object.</li>
     * <li>Determine the number of blocks needed.</li>
     * <li>Look for free blocks in the emptyBlock list.</li>
     * <li>If there were not enough in the empty list. Take the nextBlock and increment it.</li>
     * <li>If the data will not fit in one block, create sub arrays.</li>
     * <li>Write the subarrays to disk.</li>
     * <li>If the process fails we should decrement the block count if we took from it.</li>
     * </ol>
     * @param object
     * @return the blocks we used.
     * @throws IOException
     */
    protected <T> int[] write(final T object)
        throws IOException
    {
        // serialize the object
        final byte[] data = elementSerializer.serialize(object);

        log.debug("write, total pre-chunking data.length = {0}", data.length);

        this.putBytes.addAndGet(data.length);
        this.putCount.incrementAndGet();

        // figure out how many blocks we need.
        final int numBlocksNeeded = calculateTheNumberOfBlocksNeeded(data);

        log.debug("numBlocksNeeded = {0}", numBlocksNeeded);

        // allocate blocks
        final int[] blocks = allocateBlocks(numBlocksNeeded);

        int offset = 0;
        final int maxChunkSize = blockSizeBytes - HEADER_SIZE_BYTES;
        final ByteBuffer headerBuffer = ByteBuffer.allocate(HEADER_SIZE_BYTES);
        final ByteBuffer dataBuffer = ByteBuffer.wrap(data);

        for (int i = 0; i < numBlocksNeeded; i++)
        {
            headerBuffer.clear();
            final int length = Math.min(maxChunkSize, data.length - offset);
            headerBuffer.putInt(length);
            headerBuffer.flip();

            dataBuffer.position(offset).limit(offset + length);
            final ByteBuffer slice = dataBuffer.slice();

            final long position = calculateByteOffsetForBlockAsLong(blocks[i]);
            // write the header
            int written = fc.write(headerBuffer, position);
            assert written == HEADER_SIZE_BYTES;

            //write the data
            written = fc.write(slice, position + HEADER_SIZE_BYTES);
            assert written == length;

            offset += length;
        }

        //fc.force(false);

        return blocks;
    }

    /**
     * Return the amount to put in each block. Fill them all the way, minus the header.
     * <p>
     * @param complete
     * @param numBlocksNeeded
     * @return byte[][]
     */
    protected byte[][] getBlockChunks(final byte[] complete, final int numBlocksNeeded)
    {
        final byte[][] chunks = new byte[numBlocksNeeded][];

        if (numBlocksNeeded == 1)
        {
            chunks[0] = complete;
        }
        else
        {
            final int maxChunkSize = this.blockSizeBytes - HEADER_SIZE_BYTES;
            final int totalBytes = complete.length;
            int totalUsed = 0;
            for (short i = 0; i < numBlocksNeeded; i++)
            {
                // use the max that can be written to a block or whatever is left in the original
                // array
                final int chunkSize = Math.min(maxChunkSize, totalBytes - totalUsed);
                final byte[] chunk = new byte[chunkSize];
                // copy from the used position to the chunk size on the complete array to the chunk
                // array.
                System.arraycopy(complete, totalUsed, chunk, 0, chunkSize);
                chunks[i] = chunk;
                totalUsed += chunkSize;
            }
        }

        return chunks;
    }

    /**
     * Reads an object that is located in the specified blocks.
     * <p>
     * @param blockNumbers
     * @return the object instance
     * @throws IOException
     * @throws ClassNotFoundException
     */
    protected <T> T read(final int[] blockNumbers)
        throws IOException, ClassNotFoundException
    {
        final ByteBuffer data;

        if (blockNumbers.length == 1)
        {
            data = readBlock(blockNumbers[0]);
        }
        else
        {
            data = ByteBuffer.allocate(blockNumbers.length * getBlockSizeBytes());
            // get all the blocks into data
            for (short i = 0; i < blockNumbers.length; i++)
            {
                final ByteBuffer chunk = readBlock(blockNumbers[i]);
                data.put(chunk);
            }

            data.flip();
        }

        log.debug("read, total post combination data.length = {0}", () -> data.limit());

        return elementSerializer.deSerialize(data.array(), null);
    }

    /**
     * This reads the occupied data in a block.
     * <p>
     * The first four bytes of the record should tell us how long it is. The data is read into a
     * byte array and then an object is constructed from the byte array.
     * <p>
     * @return byte[]
     * @param block
     * @throws IOException
     */
    private ByteBuffer readBlock(final int block)
        throws IOException
    {
        int datalen = 0;

        String message = null;
        boolean corrupted = false;
        final long fileLength = fc.size();

        final long position = calculateByteOffsetForBlockAsLong(block);
//        if (position > fileLength)
//        {
//            corrupted = true;
//            message = "Record " + position + " starts past EOF.";
//        }
//        else
        {
            final ByteBuffer datalength = ByteBuffer.allocate(HEADER_SIZE_BYTES);
            fc.read(datalength, position);
            datalength.flip();
            datalen = datalength.getInt();
            if (position + datalen > fileLength)
            {
                corrupted = true;
                message = "Record " + position + " exceeds file length.";
            }
        }

        if (corrupted)
        {
            log.warn("\n The file is corrupt: \n {0}", message);
            throw new IOException("The File Is Corrupt, need to reset");
        }

        final ByteBuffer data = ByteBuffer.allocate(datalen);
        fc.read(data, position + HEADER_SIZE_BYTES);
        data.flip();

        return data;
    }

    /**
     * Add these blocks to the emptyBlock list.
     * <p>
     * @param blocksToFree
     */
    protected void freeBlocks(final int[] blocksToFree)
    {
        if (blocksToFree != null)
        {
            for (short i = 0; i < blocksToFree.length; i++)
            {
                emptyBlocks.offer(Integer.valueOf(blocksToFree[i]));
            }
        }
    }

    /**
     * Calculates the file offset for a particular block.
     * <p>
     * @param block number
     * @return the byte offset for this block in the file as a long
     * @since 2.0
     */
    protected long calculateByteOffsetForBlockAsLong(final int block)
    {
        return (long) block * blockSizeBytes;
    }

    /**
     * The number of blocks needed.
     * <p>
     * @param data
     * @return the number of blocks needed to store the byte array
     */
    protected int calculateTheNumberOfBlocksNeeded(final byte[] data)
    {
        final int dataLength = data.length;

        final int oneBlock = blockSizeBytes - HEADER_SIZE_BYTES;

        // takes care of 0 = HEADER_SIZE_BYTES + blockSizeBytes
        if (dataLength <= oneBlock)
        {
            return 1;
        }

        int dividend = dataLength / oneBlock;

        if (dataLength % oneBlock != 0)
        {
            dividend++;
        }
        return dividend;
    }

    /**
     * Returns the file length.
     * <p>
     * @return the size of the file.
     * @throws IOException
     */
    protected long length()
        throws IOException
    {
        return fc.size();
    }

    /**
     * Closes the file.
     * <p>
     * @throws IOException
     */
    @Override
    public void close()
        throws IOException
    {
        this.numberOfBlocks.set(0);
        this.emptyBlocks.clear();
        fc.close();
    }

    /**
     * Resets the file.
     * <p>
     * @throws IOException
     */
    protected synchronized void reset()
        throws IOException
    {
        this.numberOfBlocks.set(0);
        this.emptyBlocks.clear();
        fc.truncate(0);
        fc.force(true);
    }

    /**
     * @return Returns the numberOfBlocks.
     */
    protected int getNumberOfBlocks()
    {
        return numberOfBlocks.get();
    }

    /**
     * @return Returns the blockSizeBytes.
     */
    protected int getBlockSizeBytes()
    {
        return blockSizeBytes;
    }

    /**
     * @return Returns the average size of the an element inserted.
     */
    protected long getAveragePutSizeBytes()
    {
        final long count = this.putCount.get();

        if (count == 0)
        {
            return 0;
        }
        return this.putBytes.get() / count;
    }

    /**
     * @return Returns the number of empty blocks.
     */
    protected int getEmptyBlocks()
    {
        return this.emptyBlocks.size();
    }

    /**
     * For debugging only.
     * <p>
     * @return String with details.
     */
    @Override
    public String toString()
    {
        final StringBuilder buf = new StringBuilder();
        buf.append("\nBlock Disk ");
        buf.append("\n  Filepath [" + filepath + "]");
        buf.append("\n  NumberOfBlocks [" + this.numberOfBlocks.get() + "]");
        buf.append("\n  BlockSizeBytes [" + this.blockSizeBytes + "]");
        buf.append("\n  Put Bytes [" + this.putBytes + "]");
        buf.append("\n  Put Count [" + this.putCount + "]");
        buf.append("\n  Average Size [" + getAveragePutSizeBytes() + "]");
        buf.append("\n  Empty Blocks [" + this.getEmptyBlocks() + "]");
        try
        {
            buf.append("\n  Length [" + length() + "]");
        }
        catch (final IOException e)
        {
            // swallow
        }
        return buf.toString();
    }

    /**
     * This is used for debugging.
     * <p>
     * @return the file path.
     */
    protected String getFilePath()
    {
        return filepath;
    }
}
