/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.internal.processors.cache.persistence.wal.filehandle;

import java.io.FileDescriptor;
import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.WALMode;
import org.apache.ignite.internal.pagemem.wal.record.CheckpointRecord;
import org.apache.ignite.internal.pagemem.wal.record.SwitchSegmentRecord;
import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.persistence.DataStorageMetricsImpl;
import org.apache.ignite.internal.processors.cache.persistence.StorageException;
import org.apache.ignite.internal.processors.cache.persistence.cdc.CdcProcessor;
import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
import org.apache.ignite.internal.processors.cache.persistence.wal.SegmentedRingByteBuffer;
import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
import org.apache.ignite.internal.processors.cache.persistence.wal.io.SegmentIO;
import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializer;
import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializerFactoryImpl;
import org.apache.ignite.internal.util.GridUnsafe;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.jetbrains.annotations.Nullable;

import static org.apache.ignite.IgniteSystemProperties.IGNITE_WAL_SERIALIZER_VERSION;
import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.SWITCH_SEGMENT_RECORD;
import static org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager.prepareSerializerVersionBuffer;
import static org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializerFactory.LATEST_SERIALIZER_VERSION;
import static org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer.HEADER_RECORD_SIZE;
import static org.apache.ignite.internal.util.IgniteUtils.findField;
import static org.apache.ignite.internal.util.IgniteUtils.findNonPublicMethod;
import static org.apache.ignite.internal.util.IgniteUtils.jdkVersion;
import static org.apache.ignite.internal.util.IgniteUtils.majorJavaVersion;

/**
 * Write handle for a single WAL segment (one log segment file).
 * <p>
 * Records are serialized into a {@link SegmentedRingByteBuffer}. Depending on the {@code mmap} flag, data reaches the
 * segment either through a memory-mapped buffer (fsync'ed via {@link MMapFSyncer}) or through the
 * {@link FileHandleManagerImpl.WALWriter} worker, to which flushing, forcing and closing are delegated.
 * <p>
 * An internal {@code lock} guards {@link #position()}, {@link #fsync(WALPointer)}, {@link #close(boolean)},
 * {@link #signalNextAvailable()} and {@link #awaitNext()}; {@link #addRecord(WALRecord)} does not take it.
 */
@SuppressWarnings("SignalWithoutCorrespondingAwait")
class FileWriteHandleImpl extends AbstractFileHandle implements FileWriteHandle {
    /** Memory mapped buffer fsync operation runner, chosen once for the running JRE by {@link #pickFsyncer()}. */
    private static final MMapFSyncer FSYNCER = pickFsyncer();

    /** {@link FileWriteHandleImpl#written} atomic field updater, used to advance {@code written} in mmap mode. */
    private static final AtomicLongFieldUpdater<FileWriteHandleImpl> WRITTEN_UPD =
        AtomicLongFieldUpdater.newUpdater(FileWriteHandleImpl.class, "written");

    /** Memory page size as reported by {@link GridUnsafe#pageSize()}; used for fsync address alignment. */
    private static final int PAGE_SIZE = GridUnsafe.pageSize();

    /**
     * Serializer version written into the segment header and used to size the switch segment record.
     * Read from the {@code IGNITE_WAL_SERIALIZER_VERSION} system property, defaulting to the latest version.
     */
    private final int serializerVer =
        IgniteSystemProperties.getInteger(IGNITE_WAL_SERIALIZER_VERSION, LATEST_SERIALIZER_VERSION);

    /** Use mapped byte buffer. If {@code false}, forcing and closing go through {@link #walWriter}. */
    private final boolean mmap;

    /**
     * Created on resume logging. While {@code true}, a switch segment record is offered with the regular
     * {@code offer} instead of {@code offerSafe}. Cleared by {@link #finishResumeLogging()}.
     */
    private volatile boolean resume;

    /**
     * Position in current file after the end of last written record (incremented after file channel write operation).
     * In mmap mode {@link #addRecord(WALRecord)} advances it with a CAS loop so that it never decreases.
     */
    volatile long written;

    /**
     * Position in the current file up to which data was last fsync'ed. Initialized to the start position and
     * updated by {@link #fsync(WALPointer)} and {@link #close(boolean)}.
     */
    protected volatile long lastFsyncPos;

    /** Stop guard to provide warranty that only one thread will be successful in calling {@link #close(boolean)}. */
    protected final AtomicBoolean stop = new AtomicBoolean(false);

    /**
     * Guards position reads, fsync, close and next-segment signalling; owner of the {@link #fsync} and
     * {@link #nextSegment} conditions.
     */
    private final Lock lock = new ReentrantLock();

    /** Condition for timed wait of several threads, see {@link DataStorageConfiguration#getWalFsyncDelayNanos()}. */
    private final Condition fsync = lock.newCondition();

    /**
     * Next segment available condition. Protection from "spurious wakeup" is provided by predicate {@link
     * #fileIO}=<code>null</code>.
     */
    private final Condition nextSegment = lock.newCondition();

    /** Ring buffer the records are serialized into. */
    protected final SegmentedRingByteBuffer buf;

    /** CDC processor that is handed fsync'ed mmap data, {@code null} if CDC is disabled. */
    private final @Nullable CdcProcessor cdcProc;

    /**
     * WAL mode. {@link WALMode#BACKGROUND} flushes after checkpoint records, {@link WALMode#NONE} skips the final
     * fsync on close, and {@link WALMode#FSYNC} is checked by an assertion in {@link #signalNextAvailable()}.
     */
    private final WALMode mode;

    /** Fsync delay in nanoseconds; if positive, fsync first waits up to this long so that callers can share one fsync. */
    private final long fsyncDelay;

    /** Persistence metrics tracker. */
    private final DataStorageMetricsImpl metrics;

    /** WAL segment size in bytes. This is maximum value, actual segments may be shorter. */
    private final long maxWalSegmentSize;

    /** Logger. */
    protected final IgniteLogger log;

    /** Serializer used to write records into the ring buffer. */
    private final RecordSerializer serializer;

    /** Context. */
    protected final GridCacheSharedContext cctx;

    /** WAL writer worker: used for flushing, and in non-mmap mode also for force and close. */
    private final FileHandleManagerImpl.WALWriter walWriter;

    /**
     * Offset right after the switch segment record. Set by {@link #close(boolean)} when that record was written,
     * otherwise stays {@code 0}.
     */
    private int switchSegmentRecordOffset;

    /**
     * Creates a write handle for a segment.
     *
     * @param cctx Context.
     * @param fileIO I/O file interface to use
     * @param rbuf Ring buffer records are serialized into.
     * @param serializer Serializer.
     * @param metrics Data storage metrics.
     * @param writer WAL writer.
     * @param cdcProc CDC processor, {@code null} if CDC is disabled.
     * @param pos Initial position.
     * @param mode WAL mode.
     * @param mmap Mmap.
     * @param resume Created on resume logging flag.
     * @param fsyncDelay Fsync delay.
     * @param maxWalSegmentSize Max WAL segment size.
     * @throws IOException If failed.
     */
    FileWriteHandleImpl(
        GridCacheSharedContext cctx, SegmentIO fileIO, SegmentedRingByteBuffer rbuf, RecordSerializer serializer,
        DataStorageMetricsImpl metrics, FileHandleManagerImpl.WALWriter writer, CdcProcessor cdcProc,
        long pos, WALMode mode, boolean mmap, boolean resume, long fsyncDelay, long maxWalSegmentSize) throws IOException {
        super(fileIO);
        assert serializer != null;

        this.mmap = mmap;
        this.mode = mode;
        this.fsyncDelay = fsyncDelay;
        this.metrics = metrics;
        this.maxWalSegmentSize = maxWalSegmentSize;
        this.log = cctx.logger(FileWriteHandleImpl.class);
        this.cctx = cctx;
        this.walWriter = writer;
        this.cdcProc = cdcProc;
        this.serializer = serializer;
        this.written = pos;
        this.lastFsyncPos = pos;
        this.resume = resume;
        this.buf = rbuf;

        // The file I/O position is only set up in non-mmap mode; mmap mode works on the mapped buffer instead.
        if (!mmap)
            fileIO.position(pos);
    }

    /** {@inheritDoc} */
    @Override public int serializerVersion() {
        return serializer.version();
    }

    /** {@inheritDoc} */
    @Override public void finishResumeLogging() {
        resume = false;
    }

    /**
     * Fails fast if the kernal context has been invalidated by an earlier error.
     *
     * @throws StorageException If node is no longer valid and we missed a WAL operation.
     */
    private void checkNode() throws StorageException {
        if (cctx.kernalContext().invalid())
            throw new StorageException("Failed to perform WAL operation (environment was invalidated by a " +
                "previous error)");
    }

    /**
     * Write serializer version to current handle.
     * <p>
     * Reserves {@code HEADER_RECORD_SIZE} bytes in the ring buffer and fills them with the header for this segment
     * and {@link #serializerVer}.
     */
    @Override public void writeHeader() {
        SegmentedRingByteBuffer.WriteSegment seg = buf.offer(HEADER_RECORD_SIZE);

        assert seg != null && seg.position() > 0;

        prepareSerializerVersionBuffer(getSegmentId(), serializerVer, false, seg.buffer());

        seg.release();
    }

    /**
     * Serializes a record into the ring buffer of this segment.
     * <p>
     * If the ring buffer cannot accept the record right now ({@code offer} returned {@code null}), all pending data is
     * flushed through the WAL writer and the attempt is repeated. In {@link WALMode#BACKGROUND} mode a
     * {@link CheckpointRecord} is additionally flushed before this method returns.
     *
     * @param rec Record to be added to write queue.
     * @return Pointer or null if roll over to next segment is required or already started by other thread.
     * @throws StorageException If failed.
     * @throws IgniteCheckedException If failed (including a record larger than the buffer's capacity).
     */
    @Override @Nullable public WALPointer addRecord(WALRecord rec) throws StorageException, IgniteCheckedException {
        assert rec.size() > 0 : rec;

        for (; ; ) {
            checkNode();

            SegmentedRingByteBuffer.WriteSegment seg;

            try {
                // Buffer can be in open state in case of resuming with different serializer version.
                if (rec.type() == SWITCH_SEGMENT_RECORD && !resume)
                    seg = buf.offerSafe(rec.size());
                else
                    seg = buf.offer(rec.size());
            }
            catch (IgniteException e) {
                // WAL record size is greater than the buffer's capacity.
                throw new IgniteCheckedException(e);
            }

            WALPointer ptr = null;

            if (seg != null) {
                try {
                    // seg.position() is the end of the reserved space, so the record starts rec.size() bytes earlier.
                    int pos = (int)(seg.position() - rec.size());

                    ByteBuffer buf = seg.buffer();

                    if (buf == null)
                        return null; // Can not write to this segment, need to switch to the next one.

                    ptr = new WALPointer(getSegmentId(), pos, rec.size());

                    rec.position(ptr);

                    fillBuffer(buf, rec);

                    if (mmap) {
                        // written field must grow only, but segment with greater position can be serialized
                        // earlier than segment with smaller position.
                        while (true) {
                            long written0 = written;

                            if (seg.position() > written0) {
                                if (WRITTEN_UPD.compareAndSet(this, written0, seg.position()))
                                    break;
                            }
                            else
                                break;
                        }
                    }

                    return ptr;
                }
                finally {
                    seg.release();

                    // Also runs when ptr is still null (e.g. the segment buffer was unavailable); flushOrWait(null)
                    // then performs an unconditional flush.
                    if (mode == WALMode.BACKGROUND && rec instanceof CheckpointRecord)
                        flushOrWait(ptr);
                }
            }
            // The ring buffer could not accept the record: flush pending data and try again.
            else
                walWriter.flushAll();
        }
    }

    /**
     * Flush or wait for concurrent flush completion.
     * <p>
     * A pointer into a segment other than this handle's is ignored; {@code null} requests an unconditional flush.
     *
     * @param ptr Pointer, or {@code null} to flush everything.
     * @throws IgniteCheckedException If flushing failed.
     */
    public void flushOrWait(WALPointer ptr) throws IgniteCheckedException {
        if (ptr != null) {
            // If requested obsolete file index, it must be already flushed by close.
            if (ptr.index() != getSegmentId())
                return;
        }

        flush(ptr);
    }

    /** {@inheritDoc} */
    @Override public void flushAll() throws IgniteCheckedException {
        flush(null);
    }

    /**
     * Flushes buffered data through the WAL writer.
     *
     * @param ptr Pointer into this segment: data is flushed up to the end of the pointed record. {@code null} flushes
     *      everything.
     * @throws IgniteCheckedException If flushing failed.
     */
    public void flush(WALPointer ptr) throws IgniteCheckedException {
        if (ptr == null) { // Unconditional flush.
            walWriter.flushAll();

            return;
        }

        assert ptr.index() == getSegmentId() : "Pointer segment idx is not equals to current write segment idx. " +
            "ptr=" + ptr + " segmetntId=" + getSegmentId();

        walWriter.flushBuffer(ptr.fileOffset() + ptr.length());
    }

    /**
     * Serializes a record into the given buffer. Runtime exceptions from the serializer are rethrown as
     * {@link IllegalStateException} that names the record.
     *
     * @param buf Buffer.
     * @param rec WAL record.
     * @throws IgniteCheckedException If failed.
     */
    private void fillBuffer(ByteBuffer buf, WALRecord rec) throws IgniteCheckedException {
        try {
            serializer.writeRecord(rec, buf);
        }
        catch (RuntimeException e) {
            throw new IllegalStateException("Failed to write record: " + rec, e);
        }
    }

    /**
     * Non-blocking check if this pointer needs to be sync'ed.
     *
     * @param ptr WAL pointer to check.
     * @return {@code False} if this pointer has been already sync'ed.
     */
    @Override public boolean needFsync(WALPointer ptr) {
        // If index has changed, it means that the log was rolled over and already sync'ed.
        // If requested position is smaller than last sync'ed, it also means all is good.
        // If position is equal, then our record is the last not synced.
        return getSegmentId() == ptr.index() && lastFsyncPos <= ptr.fileOffset();
    }

    /**
     * Reads the current write position under the lock.
     *
     * @return Pointer to the end of the last written record (probably not fsync-ed).
     */
    @Override public WALPointer position() {
        lock.lock();

        try {
            return new WALPointer(getSegmentId(), (int)written, 0);
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Flushes and fsyncs data written to this segment.
     * <p>
     * When {@code fsyncDelay} is positive and the handle is not stopped, the call first waits up to that delay on the
     * {@code fsync} condition so that several callers can share a single fsync. It returns early if the pointer no
     * longer needs fsync, or if the handle was stopped after flushing. The whole operation runs under the lock.
     *
     * @param ptr Pointer to sync, or {@code null} for an unconditional flush and fsync.
     * @throws StorageException If failed.
     * @throws IgniteCheckedException If failed.
     */
    @Override public void fsync(WALPointer ptr) throws StorageException, IgniteCheckedException {
        lock.lock();

        try {
            if (ptr != null) {
                if (!needFsync(ptr))
                    return;

                if (fsyncDelay > 0 && !stop.get()) {
                    // Delay fsync to collect as many updates as possible: trade latency for throughput.
                    U.await(fsync, fsyncDelay, TimeUnit.NANOSECONDS);

                    if (!needFsync(ptr))
                        return;
                }
            }

            flushOrWait(ptr);

            if (stop.get())
                return;

            long lastFsyncPos0 = lastFsyncPos;
            long written0 = written;

            if (lastFsyncPos0 != written0) {
                // Fsync position must be behind.
                assert lastFsyncPos0 < written0 : "lastFsyncPos=" + lastFsyncPos0 + ", written=" + written0;

                boolean metricsEnabled = metrics.metricsEnabled();

                long start = metricsEnabled ? System.nanoTime() : 0;

                if (mmap) {
                    long pos = ptr == null ? -1 : ptr.fileOffset();

                    List<SegmentedRingByteBuffer.ReadSegment> segs = buf.poll(pos);

                    if (segs != null) {
                        assert segs.size() == 1;

                        SegmentedRingByteBuffer.ReadSegment seg = segs.get(0);

                        // Fsync exactly the range returned by poll: [off, off + len) of the mapped buffer.
                        int off = seg.buffer().position();
                        int len = seg.buffer().limit() - off;

                        fsync((MappedByteBuffer)buf.buf, off, len);

                        // Hand the same fsync'ed range to CDC through an independent view with the same byte order.
                        if (cdcProc != null) {
                            ByteBuffer cdcBuf = buf.buf.duplicate();
                            cdcBuf.position(off);
                            cdcBuf.limit(off + len);
                            cdcBuf.order(buf.buf.order());

                            cdcProc.collect(cdcBuf);
                        }

                        // NOTE(review): not in a finally block, so an exception from fsync/collect leaves the
                        // read segment unreleased - confirm this is intended.
                        seg.release();
                    }
                }
                else
                    walWriter.force();

                // NOTE(review): this re-reads the volatile 'written' instead of using written0. In mmap mode
                // addRecord() can advance it without holding the lock, so a record written after the fsync above
                // may be counted as synced. Confirm this is intended.
                lastFsyncPos = written;

                // Wake up callers that are delaying their fsync in the timed wait above.
                if (fsyncDelay > 0)
                    fsync.signalAll();

                long end = metricsEnabled ? System.nanoTime() : 0;

                if (metricsEnabled)
                    metrics.onFsync(end - start);
            }
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Fsyncs a range of a memory-mapped buffer using the JRE-specific {@link #FSYNCER}.
     *
     * @param buf Mapped byte buffer.
     * @param off Offset.
     * @param len Length.
     * @throws IgniteCheckedException If the reflective fsync call failed.
     */
    private static void fsync(MappedByteBuffer buf, int off, int len) throws IgniteCheckedException {
        FSYNCER.fsync(buf, off, len);
    }

    /** {@inheritDoc} */
    @Override public void closeBuffer() {
        buf.close();
    }

    /**
     * Closes this handle. Only the first caller that flips {@link #stop} does the work; later callers return
     * {@code false} immediately.
     * <p>
     * Under the lock, the first caller:
     * <ol>
     *     <li>flushes pending data;</li>
     *     <li>on roll over, appends and fsyncs a {@link SwitchSegmentRecord} if it still fits into the segment;</li>
     *     <li>flushes the tail and, in mmap mode, drains the ring buffer;</li>
     *     <li>does a final fsync unless the mode is {@link WALMode#NONE};</li>
     *     <li>closes the file (mmap) or the WAL writer (non-mmap) and frees the ring buffer where applicable.</li>
     * </ol>
     *
     * @param rollOver {@code true} to try to write a switch segment record. In non-mmap mode the ring buffer is then
     *      not freed.
     * @return {@code true} If this thread actually closed the segment.
     * @throws IgniteCheckedException If failed.
     * @throws StorageException If failed.
     */
    @Override public boolean close(boolean rollOver) throws IgniteCheckedException, StorageException {
        if (stop.compareAndSet(false, true)) {
            lock.lock();

            try {
                flushOrWait(null);

                // The switch record is sized with the configured serializer version.
                RecordSerializer backwardSerializer = new RecordSerializerFactoryImpl(cctx)
                    .createSerializer(serializerVer);

                SwitchSegmentRecord segmentRecord = new SwitchSegmentRecord();

                int switchSegmentRecSize = backwardSerializer.size(segmentRecord);

                if (rollOver && written + switchSegmentRecSize < maxWalSegmentSize) {
                    segmentRecord.size(switchSegmentRecSize);

                    WALPointer segRecPtr = addRecord(segmentRecord);

                    if (segRecPtr != null) {
                        fsync(segRecPtr);

                        switchSegmentRecordOffset = segRecPtr.fileOffset() + switchSegmentRecSize;
                    }
                    else {
                        if (log.isDebugEnabled())
                            log.debug("Not enough space in wal segment to write segment switch");
                    }
                }
                else {
                    if (log.isDebugEnabled()) {
                        log.debug("Not enough space in wal segment to write segment switch, written="
                            + written + ", switchSegmentRecSize=" + switchSegmentRecSize);
                    }
                }

                // Unconditional flush (tail of the buffer)
                flushOrWait(null);

                // Drain the mapped ring buffer: poll up to the maximum segment size and release what was returned.
                if (mmap) {
                    List<SegmentedRingByteBuffer.ReadSegment> segs = buf.poll(maxWalSegmentSize);

                    if (segs != null) {
                        assert segs.size() == 1;

                        segs.get(0).release();
                    }
                }

                // Do the final fsync.
                if (mode != WALMode.NONE) {
                    if (mmap)
                        ((MappedByteBuffer)buf.buf).force();
                    else
                        walWriter.force();

                    lastFsyncPos = written;
                }

                if (mmap)
                    U.closeQuiet(fileIO);
                else
                    walWriter.close();

                // In non-mmap mode the ring buffer is freed only when not rolling over.
                if (!mmap && !rollOver)
                    buf.free();

                if (log.isDebugEnabled())
                    log.debug("Closed WAL write handle [idx=" + getSegmentId() + "]");

                return true;
            }
            finally {
                // In mmap mode the ring buffer is freed even if closing failed.
                if (mmap)
                    buf.free();

                lock.unlock();
            }
        }
        else
            return false;
    }

    /**
     * Signals next segment available to wake up other worker threads waiting for WAL to write.
     * <p>
     * Clears {@code fileIO}, which is the predicate {@link #awaitNext()} waits on, and signals all waiters.
     */
    @Override public void signalNextAvailable() {
        lock.lock();

        try {
            // In FSYNC mode everything written must already be fsync'ed here, unless the node has been invalidated.
            assert cctx.kernalContext().invalid() ||
                written == lastFsyncPos || mode != WALMode.FSYNC :
                "fsync [written=" + written + ", lastFsync=" + lastFsyncPos + ", idx=" + getSegmentId() + ']';

            fileIO = null;

            nextSegment.signalAll();
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * {@inheritDoc}
     * <p>
     * Blocks until {@link #signalNextAvailable()} has cleared {@code fileIO}. The loop re-checks that predicate to
     * guard against spurious wakeups.
     */
    @Override public void awaitNext() {
        lock.lock();

        try {
            while (fileIO != null)
                U.awaitQuiet(nextSegment);
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Reads the file position for diagnostics without throwing.
     *
     * @return Safely reads current position of the file channel as String. Will return "null" if channel is null.
     */
    public String safePosition() {
        // Read the field once: it may be cleared concurrently by signalNextAvailable().
        FileIO io = fileIO;

        if (io == null)
            return "null";

        try {
            return String.valueOf(io.position());
        }
        catch (IOException e) {
            return "{Failed to read channel position: " + e.getMessage() + '}';
        }
    }

    /**
     * {@inheritDoc}
     * <p>
     * Set by {@link #close(boolean)} when a switch segment record was written, {@code 0} otherwise.
     */
    @Override public int getSwitchSegmentRecordOffset() {
        return switchSegmentRecordOffset;
    }

    /**
     * Interface for performing {@link MappedByteBuffer} fsync.
     */
    private interface MMapFSyncer {
        /** Non-public {@code MappedByteBuffer.fd} field (the file descriptor), read reflectively by implementations. */
        static final Field fd = findField(MappedByteBuffer.class, "fd");

        /**
         * Performs fsync.
         *
         * @param buf Mmapped byte buffer.
         * @param index Index of the syncing segment in the buffer.
         * @param len Length of the syncing segment part.
         * @throws IgniteCheckedException If failed.
         */
        void fsync(MappedByteBuffer buf, int index, int len) throws IgniteCheckedException;
    }

    /**
     * Picks the fsync implementation by major Java version: {@link JDK15FSyncer} for Java 15 and higher, otherwise
     * {@link LegacyFSyncer}.
     *
     * @return FSyncer suitable for the current JRE.
     */
    private static MMapFSyncer pickFsyncer() {
        int javaVersion = majorJavaVersion(jdkVersion());

        if (javaVersion >= 15)
            return new JDK15FSyncer();

        return new LegacyFSyncer();
    }

    /**
     * Runs fsync operation on JRE15 and higher using MappedMemoryUtils which provides aligned fsync.
     */
    private static class JDK15FSyncer implements MMapFSyncer {
        /** Non-public buffer {@code address} field, looked up reflectively via {@link MappedByteBuffer}. */
        private static final Field address = findField(MappedByteBuffer.class, "address");

        /**
         * A flag true if this buffer is mapped against non-volatile
         * memory using one of the extended FileChannel.MapMode modes.
         */
        private static final Field isSync = findField(MappedByteBuffer.class, "isSync");

        /**
         * Mapped memory utils class. For compatibility reasons it might only be accessed via
         * {@link Class#forName(String)}.
         */
        private final Class<?> mappedMemoryUtils;

        /**
         * MappedMemoryUtils#force method.
         */
        private final Method force;

        /**
         * Constructor. Resolves {@code java.nio.MappedMemoryUtils} and its non-public {@code force} method.
         *
         * @throws IgniteException If the class cannot be found.
         */
        public JDK15FSyncer() {
            try {
                mappedMemoryUtils = Class.forName("java.nio.MappedMemoryUtils");

                force = findNonPublicMethod(
                    mappedMemoryUtils,
                    "force",
                    FileDescriptor.class,
                    long.class, // Address
                    boolean.class, // Is sync?
                    long.class, // Index
                    long.class // Length
                );
            }
            catch (ClassNotFoundException e) {
                throw new IgniteException(e);
            }
        }

        /** {@inheritDoc} */
        @Override public void fsync(MappedByteBuffer buf, int index, int len) throws IgniteCheckedException {
            try {
                boolean isSync = (boolean)JDK15FSyncer.isSync.get(buf);
                long address = (long)JDK15FSyncer.address.get(buf);

                assert address % PAGE_SIZE == 0 : "Buffer's address is not aligned: " + address;

                // Don't need to align manually as MappedMemoryUtils does the alignment
                // NOTE(review): Method#invoke ignores the receiver for static methods; passing the Class object
                // here presumes that MappedMemoryUtils#force is static.
                force.invoke(mappedMemoryUtils, fd.get(buf), address, isSync, index, len);
            }
            catch (IllegalAccessException | InvocationTargetException e) {
                throw new IgniteCheckedException(e);
            }
        }
    }

    /**
     * Runs fsync on pre-java15 JVMs that don't offer a possibility to fsync mapped byte buffers in an aligned way.
     */
    private static class LegacyFSyncer implements MMapFSyncer {
        /** Non-public {@code MappedByteBuffer.force0(FileDescriptor, long, long)} method. */
        private static final Method force0 = findNonPublicMethod(
            MappedByteBuffer.class, "force0",
            java.io.FileDescriptor.class, long.class, long.class
        );

        /** Non-public {@code MappedByteBuffer.mappingOffset()} method. */
        private static final Method mappingOffset = findNonPublicMethod(MappedByteBuffer.class, "mappingOffset");

        /** Non-public {@code MappedByteBuffer.mappingAddress(long)} method. */
        private static final Method mappingAddress = findNonPublicMethod(
            MappedByteBuffer.class, "mappingAddress", long.class
        );

        /** {@inheritDoc} */
        @Override public void fsync(MappedByteBuffer buf, int index, int len) throws IgniteCheckedException {
            try {
                long mappedOff = (Long)mappingOffset.invoke(buf);

                // Only buffers whose mapping offset is zero are expected here.
                assert mappedOff == 0 : mappedOff;

                long addr = (Long)mappingAddress.invoke(buf, mappedOff);

                long alignmentDelta = (addr + index) % PAGE_SIZE;

                // Given an alignment delta calculate the largest page aligned address
                // of the mapping less than or equal to the address of the buffer
                // element identified by the index.
                long alignedAddr = (addr + index) - alignmentDelta;

                // The length grows by the delta so that the synced range still ends at addr + index + len.
                force0.invoke(buf, fd.get(buf), alignedAddr, len + alignmentDelta);
            }
            catch (IllegalAccessException | InvocationTargetException e) {
                throw new IgniteCheckedException(e);
            }
        }
    }
}
