/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.internal.processors.cache.persistence.wal.serializer;

import java.io.DataInput;
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.internal.pagemem.wal.WALPointer;
import org.apache.ignite.internal.pagemem.wal.record.FilteredRecord;
import org.apache.ignite.internal.pagemem.wal.record.MarshalledRecord;
import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
import org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.CacheVersionIO;
import org.apache.ignite.internal.processors.cache.persistence.wal.ByteBufferBackedDataInput;
import org.apache.ignite.internal.processors.cache.persistence.wal.ByteBufferExpander;
import org.apache.ignite.internal.processors.cache.persistence.wal.io.FileInput;
import org.apache.ignite.internal.processors.cache.persistence.wal.io.SegmentFileInputFactory;
import org.apache.ignite.internal.processors.cache.persistence.wal.FileWALPointer;
import org.apache.ignite.internal.processors.cache.persistence.wal.SegmentEofException;
import org.apache.ignite.internal.processors.cache.persistence.wal.io.SegmentIO;
import org.apache.ignite.internal.processors.cache.persistence.wal.io.SimpleFileInput;
import org.apache.ignite.internal.processors.cache.persistence.wal.WalSegmentTailReachedException;
import org.apache.ignite.internal.processors.cache.persistence.wal.crc.PureJavaCrc32;
import org.apache.ignite.internal.processors.cache.persistence.wal.record.HeaderRecord;
import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.io.RecordIO;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.GridUnsafe;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiPredicate;

import static org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_SKIP_CRC;
import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.SWITCH_SEGMENT_RECORD;

/**
 * Record V1 serializer.
 * Stores records in following format:
 * <ul>
 *     <li>Record type from {@link RecordType#ordinal()} incremented by 1</li>
 *     <li>WAL pointer to double check consistency</li>
 *     <li>Data</li>
 *     <li>CRC or zero padding</li>
 * </ul>
 */
public class RecordV1Serializer implements RecordSerializer {
    /** Length of Type */
    public static final int REC_TYPE_SIZE = 1;

    /** Length of WAL Pointer: Index (8) + File offset (4). */
    public static final int FILE_WAL_POINTER_SIZE = 8 + 4;

    /** Length of CRC value */
    public static final int CRC_SIZE = 4;

    /** Total length of HEADER record. */
    public static final int HEADER_RECORD_SIZE = REC_TYPE_SIZE + FILE_WAL_POINTER_SIZE + CRC_SIZE + RecordDataV1Serializer.HEADER_RECORD_DATA_SIZE;

    /** Skip CRC calculation/check flag */
    public static boolean skipCrc = IgniteSystemProperties.getBoolean(IGNITE_PDS_SKIP_CRC, false);

    /** V1 data serializer. */
    private final RecordDataV1Serializer dataSerializer;

    /** Write pointer. */
    private final boolean writePointer;

    /**
     * Record type filter.
     * {@link FilteredRecord} is deserialized instead of original record if type doesn't match filter.
     */
    private final IgniteBiPredicate<RecordType, WALPointer> recordFilter;

    /** Skip position check flag. Should be set for reading compacted wal file with skipped physical records. */
    private final boolean skipPositionCheck;

    /**
     * Marshalled mode.
     * Records are not deserialized in this mode, {@link MarshalledRecord} with binary representation are read instead.
     */
    private final boolean marshalledMode;

    /** Thread-local heap byte buffer. */
    private final ThreadLocal<ByteBuffer> heapTlb = new ThreadLocal<ByteBuffer>() {
        @Override protected ByteBuffer initialValue() {
            ByteBuffer buf = ByteBuffer.allocate(4096);

            buf.order(GridUnsafe.NATIVE_BYTE_ORDER);

            return buf;
        }
    };

    /** Record read/write functional interface. */
    private final RecordIO recordIO = new RecordIO() {

        /** {@inheritDoc} */
        @Override public int sizeWithHeaders(WALRecord record) throws IgniteCheckedException {
            int recordSize = dataSerializer.size(record);

            int recordSizeWithType = recordSize + REC_TYPE_SIZE;

            // Why this condition here, see SWITCH_SEGMENT_RECORD doc.
            return record.type() != SWITCH_SEGMENT_RECORD ?
                recordSizeWithType + FILE_WAL_POINTER_SIZE + CRC_SIZE : recordSizeWithType;
        }

        /** {@inheritDoc} */
        @Override public WALRecord readWithHeaders(ByteBufferBackedDataInput in, WALPointer expPtr) throws IOException, IgniteCheckedException {
            RecordType recType = readRecordType(in);

            if (recType == RecordType.SWITCH_SEGMENT_RECORD)
                throw new SegmentEofException("Reached end of segment", null);

            FileWALPointer ptr = readPosition(in);

            if (!skipPositionCheck && !F.eq(ptr, expPtr))
                throw new SegmentEofException("WAL segment rollover detected (will end iteration) [expPtr=" + expPtr +
                        ", readPtr=" + ptr + ']', null);

            if (recType == null)
                throw new IOException("Unknown record type: " + recType);

            final WALRecord rec = dataSerializer.readRecord(recType, in);

            rec.position(ptr);

            if (recordFilter != null && !recordFilter.apply(rec.type(), ptr))
                return FilteredRecord.INSTANCE;
            else if (marshalledMode) {
                ByteBuffer buf = heapTlb.get();

                int recordSize = size(rec);

                if (buf.capacity() < recordSize)
                    heapTlb.set(buf = ByteBuffer.allocate(recordSize * 3 / 2).order(ByteOrder.nativeOrder()));
                else
                    buf.clear();

                writeRecord(rec, buf);

                buf.flip();

                assert buf.remaining() == recordSize;

                return new MarshalledRecord(rec.type(), rec.position(), buf);
            }
            else
                return rec;
        }

        /** {@inheritDoc} */
        @Override public void writeWithHeaders(WALRecord rec, ByteBuffer buf) throws IgniteCheckedException {
            // Write record type.
            putRecordType(buf, dataSerializer.recordType(rec));

            // SWITCH_SEGMENT_RECORD should have only type, no need to write pointer.
            if (rec.type() == SWITCH_SEGMENT_RECORD)
                return;

            // Write record file position.
            putPositionOfRecord(buf, rec);

            // Write record data.
            dataSerializer.writeRecord(rec, buf);
        }
    };

    /**
     * Create an instance of V1 serializer.
     * @param dataSerializer V1 data serializer.
     * @param writePointer Write pointer.
     * @param marshalledMode Marshalled mode.
     * @param skipPositionCheck Skip position check mode.
     * @param recordFilter Record type filter. {@link FilteredRecord} is deserialized instead of original record
     */
    public RecordV1Serializer(
        RecordDataV1Serializer dataSerializer,
        boolean writePointer,
        boolean marshalledMode,
        boolean skipPositionCheck,
        IgniteBiPredicate<RecordType, WALPointer> recordFilter
    ) {
        this.dataSerializer = dataSerializer;
        this.writePointer = writePointer;
        this.recordFilter = recordFilter;
        this.skipPositionCheck = skipPositionCheck;
        this.marshalledMode = marshalledMode;
    }

    /** {@inheritDoc} */
    @Override public int version() {
        return 1;
    }

    /** {@inheritDoc} */
    @Override public boolean writePointer() {
        return writePointer;
    }

    /** {@inheritDoc} */
    @SuppressWarnings("CastConflictsWithInstanceof")
    @Override public void writeRecord(WALRecord rec, ByteBuffer buf) throws IgniteCheckedException {
        writeWithCrc(rec, buf, recordIO);
    }

    /** {@inheritDoc} */
    @Override public WALRecord readRecord(FileInput in0, WALPointer expPtr) throws  IOException, IgniteCheckedException {
        return readWithCrc(in0, expPtr, recordIO);
    }

    /** {@inheritDoc} */
    @SuppressWarnings("CastConflictsWithInstanceof")
    @Override public int size(WALRecord record) throws IgniteCheckedException {
        return recordIO.sizeWithHeaders(record);
    }

    /**
     * Saves position, WAL pointer (requires {@link #FILE_WAL_POINTER_SIZE} bytes)
     * @param buf Byte buffer to serialize version to.
     * @param ptr File WAL pointer to write.
     */
    public static void putPosition(ByteBuffer buf, FileWALPointer ptr) {
        buf.putLong(ptr.index());
        buf.putInt(ptr.fileOffset());
    }

    /**
     * Reads stored record from provided {@code io}.
     * NOTE: Method mutates position of {@code io}.
     *
     * @param io I/O interface for file.
     * @param segmentFileInputFactory File input factory.
     * @return Instance of {@link SegmentHeader} extracted from the file.
     * @throws IgniteCheckedException If failed to read serializer version.
     */
    public static SegmentHeader readSegmentHeader(SegmentIO io, SegmentFileInputFactory segmentFileInputFactory)
        throws IgniteCheckedException, IOException {
        try (ByteBufferExpander buf = new ByteBufferExpander(HEADER_RECORD_SIZE, ByteOrder.nativeOrder())) {
            ByteBufferBackedDataInput in = segmentFileInputFactory.createFileInput(io, buf);

            in.ensure(HEADER_RECORD_SIZE);

            int recordType = in.readUnsignedByte();

            if (recordType == WALRecord.RecordType.STOP_ITERATION_RECORD_TYPE)
                throw new SegmentEofException("Reached logical end of the segment", null);

            WALRecord.RecordType type = WALRecord.RecordType.fromOrdinal(recordType - 1);

            if (type != WALRecord.RecordType.HEADER_RECORD)
                throw new IOException("Can't read serializer version", null);

            // Read file pointer.
            FileWALPointer ptr = readPosition(in);

            if (io.getSegmentId() != ptr.index())
                throw new SegmentEofException("Reached logical end of the segment by pointer", null);

            assert ptr.fileOffset() == 0 : "Header record should be placed at the beginning of file " + ptr;

            long hdrMagicNum = in.readLong();

            boolean compacted;

            if (hdrMagicNum == HeaderRecord.REGULAR_MAGIC)
                compacted = false;
            else if (hdrMagicNum == HeaderRecord.COMPACTED_MAGIC)
                compacted = true;
            else {
                throw new IOException("Magic is corrupted [exp=" + U.hexLong(HeaderRecord.REGULAR_MAGIC) +
                    ", actual=" + U.hexLong(hdrMagicNum) + ']');
            }

            // Read serializer version.
            int ver = in.readInt();

            // Read and skip CRC.
            in.readInt();

            return new SegmentHeader(ver, compacted);
        }
    }

    /**
     * @param in Data input to read pointer from.
     * @return Read file WAL pointer.
     * @throws IOException If failed to write.
     */
    public static FileWALPointer readPosition(DataInput in) throws IOException {
        long idx = in.readLong();
        int fileOff = in.readInt();

        return new FileWALPointer(idx, fileOff, 0);
    }

    /**
     * Writes record file position to given {@code buf}.
     *
     * @param buf Buffer to write record file position.
     * @param rec WAL record.
     */
    private static void putPositionOfRecord(ByteBuffer buf, WALRecord rec) {
        putPosition(buf, (FileWALPointer)rec.position());
    }

    /**
     * Writes record type to given {@code buf}.
     *
     * @param buf Buffer to write record type.
     * @param type WAL record type.
     */
    static void putRecordType(ByteBuffer buf, RecordType type) {
        buf.put((byte)(type.ordinal() + 1));
    }

    /**
     * Reads record type from given {@code in}.
     *
     * @param in Buffer to read record type.
     * @return Record type.
     * @throws IgniteCheckedException If logical end of segment is reached.
     * @throws IOException In case of I/O problems.
     */
    static RecordType readRecordType(DataInput in) throws IgniteCheckedException, IOException {
        int type = in.readUnsignedByte();

        if (type == WALRecord.RecordType.STOP_ITERATION_RECORD_TYPE)
            throw new SegmentEofException("Reached logical end of the segment", null);

        return RecordType.fromOrdinal(type - 1);
    }

    /**
     * Reads record from file {@code in0} and validates CRC of record.
     *
     * @param in0 File input.
     * @param expPtr Expected WAL pointer for record. Used to validate actual position against expected from the file.
     * @param reader Record reader I/O interface.
     * @return WAL record.
     * @throws EOFException In case of end of file.
     * @throws IgniteCheckedException If it's unable to read record.
     */
    static WALRecord readWithCrc(
        FileInput in0,
        WALPointer expPtr,
        RecordIO reader
    ) throws EOFException, IgniteCheckedException {
        long startPos = -1;

        try (SimpleFileInput.Crc32CheckingFileInput in = in0.startRead(skipCrc)) {
            startPos = in0.position();

            WALRecord res = reader.readWithHeaders(in, expPtr);

            assert res != null;

            res.size((int)(in0.position() - startPos + CRC_SIZE)); // Account for CRC which will be read afterwards.

            return res;
        }
        catch (EOFException | SegmentEofException | WalSegmentTailReachedException e) {
            throw e;
        }
        catch (Exception e) {
            long size = -1;

            try {
                size = in0.io().size();
            }
            catch (IOException ignore) {
                // No-op. It just for information. Fail calculate file size.
            }

            throw new IgniteCheckedException("Failed to read WAL record at position: " + startPos + " size: " + size, e);
        }
    }

    /**
     * Writes record with calculated CRC to buffer {@code buf}.
     *
     * @param rec WAL record.
     * @param buf Buffer to write.
     * @param writer Record write I/O interface.
     * @throws IgniteCheckedException If it's unable to write record.
     */
    static void writeWithCrc(WALRecord rec, ByteBuffer buf, RecordIO writer) throws IgniteCheckedException {
        assert rec.size() >= 0 && buf.remaining() >= rec.size() : rec.size();

        boolean switchSegmentRec = rec.type() == RecordType.SWITCH_SEGMENT_RECORD;

        int startPos = buf.position();

        writer.writeWithHeaders(rec, buf);

        // No need calculate and write CRC for SWITCH_SEGMENT_RECORD.
        if (switchSegmentRec)
            return;

        if (!skipCrc) {
            int curPos = buf.position();

            buf.position(startPos);

            // This call will move buffer position to the end of the record again.
            int crcVal = PureJavaCrc32.calcCrc32(buf, curPos - startPos);

            buf.putInt(crcVal);
        }
        else
            buf.putInt(0);
    }

    /**
     * @param buf Buffer.
     * @param ver Version to write.
     * @param allowNull Is {@code null}version allowed.
     */
    static void putVersion(ByteBuffer buf, GridCacheVersion ver, boolean allowNull) {
        CacheVersionIO.write(buf, ver, allowNull);
    }

    /**
     * Changes the buffer position by the number of read bytes.
     *
     * @param in Data input to read from.
     * @param allowNull Is {@code null}version allowed.
     * @return Read cache version.
     */
    static GridCacheVersion readVersion(ByteBufferBackedDataInput in, boolean allowNull) throws IOException {
        // To be able to read serialization protocol version.
        in.ensure(1);

        try {
            int size = CacheVersionIO.readSize(in.buffer(), allowNull);

            in.ensure(size);

            return CacheVersionIO.read(in.buffer(), allowNull);
        }
        catch (IgniteCheckedException e) {
            throw new IOException(e);
        }
    }
}
