/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.airavata.mft.agent;

import org.apache.airavata.mft.admin.models.TransferState;
import org.apache.airavata.mft.api.service.TransferApiRequest;
import org.apache.airavata.mft.core.*;
import org.apache.airavata.mft.core.api.*;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;

/**
 * Moves the content of one resource from a source connector to a destination connector.
 *
 * <p>Two transfer modes are supported. When both ends resolve to chunked connectors the resource is split into
 * fixed size chunks that are moved in parallel on a shared worker pool; otherwise, when both ends resolve to
 * streaming connectors, the bytes are copied on the calling thread while a background task reports progress.
 */
public class TransportMediator {

    private static final Logger logger = LoggerFactory.getLogger(TransportMediator.class);

    /** Pause between two progress reports of a streaming transfer, in milliseconds. */
    private static final long STATUS_UPDATE_INTERVAL_MILLIS = 2000L;

    /** Size of the copy buffer used by streaming transfers, in bytes. */
    private static final int STREAM_BUFFER_SIZE = 128 * 1024;

    /**
     * Runs the progress monitors of streaming transfers. Sized with the maximum number of transfers
     * handled at a time.
     */
    private final ExecutorService monitorPool;

    /** Directory in which chunks are staged as files when chunk streaming is disabled. */
    private final String tempDataDir;

    /** Chunk size in megabytes. */
    private final int chunkedSize;

    /** When true chunks are piped through a stream, otherwise they are staged in a temporary file. */
    private final boolean doChunkStreaming;

    /** Worker pool shared by the chunks of all chunked transfers. */
    private final ExecutorService chunkedExecutorService;

    /**
     * Creates a mediator together with its two worker pools.
     *
     * @param tempDataDir              directory used to stage chunk files
     * @param concurrentTransfers      number of threads of the progress monitor pool
     * @param concurrentChunkedThreads number of threads of the chunk worker pool
     * @param chunkedSize              chunk size in megabytes
     * @param doChunkStreaming         whether chunks are streamed instead of being staged in a temporary file
     */
    public TransportMediator(String tempDataDir,
                             int concurrentTransfers,
                             int concurrentChunkedThreads,
                             int chunkedSize,
                             boolean doChunkStreaming) {
        this.tempDataDir = tempDataDir;
        monitorPool = Executors.newFixedThreadPool(concurrentTransfers);
        this.chunkedSize = chunkedSize;
        chunkedExecutorService = Executors.newFixedThreadPool(concurrentChunkedThreads);
        this.doChunkStreaming = doChunkStreaming;
    }

    /**
     * Runs one transfer to completion on the calling thread and reports its state through the callbacks.
     *
     * <p>A RUNNING state is published before the transfer starts, then either COMPLETED or FAILED once it ends.
     * Failures are never thrown to the caller; they are logged and published as a FAILED state.
     *
     * @param transferId       identifier of the transfer, used in logs, callbacks and temporary file names
     * @param request          transfer request providing the source / destination types and child resource paths
     * @param srcCC            configuration of the source connector; its metadata provides the resource size
     * @param dstCC            configuration of the destination connector
     * @param onStatusCallback receives every state update of the transfer
     * @param exitingCallback  invoked with {@code true} once the transfer succeeded; it is not invoked on failure
     */
    public void transferSingleThread(String transferId,
                                     TransferApiRequest request,
                                     ConnectorConfig srcCC,
                                     ConnectorConfig dstCC,
                                     BiConsumer<String, TransferState> onStatusCallback,
                                     BiConsumer<String, Boolean> exitingCallback) {

        final AtomicBoolean transferInProgress = new AtomicBoolean(true);

        try {

            logger.info("Stating transfer {}", transferId);

            Optional<IncomingStreamingConnector> inStreamingConnectorOp = ConnectorResolver
                    .resolveIncomingStreamingConnector(request.getSourceType());
            Optional<OutgoingStreamingConnector> outStreamingConnectorOp = ConnectorResolver
                    .resolveOutgoingStreamingConnector(request.getDestinationType());

            Optional<IncomingChunkedConnector> inChunkedConnectorOp = ConnectorResolver
                    .resolveIncomingChunkedConnector(request.getSourceType());
            Optional<OutgoingChunkedConnector> outChunkedConnectorOp = ConnectorResolver
                    .resolveOutgoingChunkedConnector(request.getDestinationType());

            onStatusCallback.accept(transferId, new TransferState()
                    .setPercentage(0)
                    .setState("RUNNING")
                    .setUpdateTimeMils(System.currentTimeMillis())
                    .setDescription("Transfer is ongoing"));

            long start = System.currentTimeMillis();

            // Give priority for chunked transfers.
            // TODO: Provide a preference at the API level
            if (inChunkedConnectorOp.isPresent() && outChunkedConnectorOp.isPresent()) {

                logger.info("Starting the chunked transfer for transfer {}", transferId);
                transferChunked(transferId, srcCC, dstCC, inChunkedConnectorOp.get(), outChunkedConnectorOp.get());
                logger.info("Completed chunked transfer for transfer {}", transferId);

            } else if (inStreamingConnectorOp.isPresent() && outStreamingConnectorOp.isPresent()) {

                logger.info("Starting streaming transfer for transfer {}", transferId);
                transferStreaming(transferId, request, srcCC, dstCC, inStreamingConnectorOp.get(),
                        outStreamingConnectorOp.get(), onStatusCallback, transferInProgress);
                logger.info("Completed streaming ransfer for transfer {}", transferId);

            } else {
                throw new Exception("No matching connector found to perform the transfer");
            }

            // Stop the progress monitor before the terminal state is published, so that a late RUNNING update
            // is unlikely to be emitted after COMPLETED.
            transferInProgress.set(false);

            long endTime = System.currentTimeMillis();

            double time = (endTime - start) / 1000.0;

            logger.info("Transfer {} completed. Time {} S.  Speed {} MB/s", transferId, time,
                    (srcCC.getMetadata().getResourceSize() * 1.0 / time) / (1024 * 1024));

            onStatusCallback.accept(transferId, new TransferState()
                    .setPercentage(100)
                    .setState("COMPLETED")
                    .setUpdateTimeMils(endTime)
                    .setDescription("Transfer successfully completed"));

            exitingCallback.accept(transferId, true);
        } catch (Exception e) {

            logger.error("Transfer {} failed with error", transferId, e);

            // NOTE(review): exitingCallback is not invoked on this path - confirm that callers do not need a
            // (transferId, false) notification to release the transfer.
            onStatusCallback.accept(transferId, new TransferState()
                    .setPercentage(0)
                    .setState("FAILED")
                    .setUpdateTimeMils(System.currentTimeMillis())
                    .setDescription("Transfer failed due to " + ExceptionUtils.getStackTrace(e)));

            if (e instanceof InterruptedException) {
                // Keep the interruption visible to the owner of this thread once the failure is reported
                Thread.currentThread().interrupt();
            }
        } finally {
            transferInProgress.set(false);
        }

    }

    /**
     * Splits the source resource into chunks, moves them in parallel and waits for all of them.
     *
     * @throws Exception if the chunk size is not positive, if any chunk fails or if the wait is interrupted
     */
    private void transferChunked(String transferId,
                                 ConnectorConfig srcCC,
                                 ConnectorConfig dstCC,
                                 IncomingChunkedConnector inConnector,
                                 OutgoingChunkedConnector outConnector) throws Exception {

        // Widen to long before multiplying, otherwise a large megabyte count overflows the int arithmetic
        long chunkSize = chunkedSize * 1024L * 1024L;
        if (chunkSize <= 0) {
            // A non positive chunk size would never advance the submit loop below
            throw new Exception("Chunk size should be a positive number of megabytes but was " + chunkedSize);
        }

        CompletionService<Integer> completionService = new ExecutorCompletionService<>(chunkedExecutorService);
        List<Future<Integer>> chunkFutures = new ArrayList<>();

        long fileLength = srcCC.getMetadata().getResourceSize();

        inConnector.init(srcCC);
        outConnector.init(dstCC);

        try {
            long uploadLength = 0L;
            int chunkIdx = 0;

            while (uploadLength < fileLength) {
                // The last chunk is shortened to end exactly at the end of the resource
                long endPos = Math.min(uploadLength + chunkSize, fileLength);

                chunkFutures.add(completionService.submit(new ChunkMover(inConnector,
                        outConnector, uploadLength, endPos, chunkIdx,
                        transferId, doChunkStreaming)));

                uploadLength = endPos;
                chunkIdx++;
            }

            for (int i = 0; i < chunkIdx; i++) {
                // get() rethrows the failure of a chunk; without it a broken chunk would go unnoticed and the
                // transfer would be reported as completed
                completionService.take().get();
            }
        } catch (Exception e) {
            // Do not start the chunks that are still queued; chunks already running are left to finish
            for (Future<Integer> chunkFuture : chunkFutures) {
                chunkFuture.cancel(false);
            }
            throw e;
        }

        inConnector.complete();
        outConnector.complete();
    }

    /**
     * Copies the source stream into the destination stream on the calling thread while a task on the monitor
     * pool periodically publishes the progress.
     *
     * @throws Exception if a connector fails or if reading / writing the streams fails
     */
    private void transferStreaming(String transferId,
                                   TransferApiRequest request,
                                   ConnectorConfig srcCC,
                                   ConnectorConfig dstCC,
                                   IncomingStreamingConnector inConnector,
                                   OutgoingStreamingConnector outConnector,
                                   BiConsumer<String, TransferState> onStatusCallback,
                                   AtomicBoolean transferInProgress) throws Exception {

        inConnector.init(srcCC);
        outConnector.init(dstCC);

        String srcChild = request.getSourceChildResourcePath();
        String dstChild = request.getDestinationChildResourcePath();

        InputStream inputStream = null;
        OutputStream outputStream = null;

        try {
            inputStream = (srcChild == null || srcChild.isEmpty())
                    ? inConnector.fetchInputStream() : inConnector.fetchInputStream(srcChild);
            outputStream = (dstChild == null || dstChild.isEmpty())
                    ? outConnector.fetchOutputStream() : outConnector.fetchOutputStream(dstChild);

            final AtomicLong countAtomic = new AtomicLong();

            monitorPool.submit(() -> {
                while (true) {
                    try {
                        Thread.sleep(STATUS_UPDATE_INTERVAL_MILLIS);
                    } catch (InterruptedException e) {
                        // Interrupted: restore the flag and stop reporting
                        Thread.currentThread().interrupt();
                        break;
                    }
                    if (!transferInProgress.get()) {
                        logger.info("Status monitor is exiting for transfer {}", transferId);
                        break;
                    }
                    long resourceSize = srcCC.getMetadata().getResourceSize();
                    // Guard the division so that an empty resource does not yield NaN
                    double transferPercentage = resourceSize > 0 ? countAtomic.get() * 100.0 / resourceSize : 0;
                    logger.info("Transfer percentage for transfer {} {}", transferId, transferPercentage);
                    onStatusCallback.accept(transferId, new TransferState()
                            .setPercentage(transferPercentage)
                            .setState("RUNNING")
                            .setUpdateTimeMils(System.currentTimeMillis())
                            .setDescription("Transfer Progress Updated"));
                }
            });

            int n;
            byte[] buffer = new byte[STREAM_BUFFER_SIZE];
            while (-1 != (n = inputStream.read(buffer))) {
                outputStream.write(buffer, 0, n);
                // Count the bytes that were just written so the reported progress does not lag one buffer behind
                countAtomic.addAndGet(n);
            }

            inConnector.complete();
            outConnector.complete();
        } finally {
            // Closed after complete() to keep the original call order. The streams may already have been closed
            // by the connectors, so a failure here is only logged.
            closeQuietly(inputStream, "input stream", transferId);
            closeQuietly(outputStream, "output stream", transferId);
        }
    }

    /**
     * Closes the resource when it is not null and logs, rather than propagates, a failure of the close call.
     */
    private static void closeQuietly(AutoCloseable resource, String description, String transferId) {
        if (resource == null) {
            return;
        }
        try {
            resource.close();
        } catch (Exception e) {
            logger.warn("Failed to close the {} of transfer {}", description, transferId, e);
        }
    }

    /**
     * Stops accepting new work on both worker pools. Tasks that were already submitted are allowed to finish.
     */
    public void destroy() {
        monitorPool.shutdown();
        chunkedExecutorService.shutdown();
    }

    /**
     * Moves the byte range {@code [startPos, endPos)} of the resource as one chunk and returns the chunk index.
     */
    private class ChunkMover implements Callable<Integer> {

        private final IncomingChunkedConnector downloader;
        private final OutgoingChunkedConnector uploader;
        private final long startPos;
        private final long endPos;
        private final int chunkIdx;
        private final String transferId;
        private final boolean useStreaming;

        public ChunkMover(IncomingChunkedConnector downloader, OutgoingChunkedConnector uploader, long startPos,
                          long endPos, int chunkIdx, String transferId, boolean useStreaming) {
            this.downloader = downloader;
            this.uploader = uploader;
            this.startPos = startPos;
            this.endPos = endPos;
            this.chunkIdx = chunkIdx;
            this.transferId = transferId;
            this.useStreaming = useStreaming;
        }

        /**
         * Downloads the chunk and uploads it, either through a stream or through a temporary file.
         *
         * @return the index of the chunk that was moved
         * @throws Exception the failure of the download or of the upload, after it has been logged
         */
        @Override
        public Integer call() throws Exception {
            try {
                if (useStreaming) {
                    InputStream inputStream = null;
                    try {
                        inputStream = downloader.downloadChunk(chunkIdx, startPos, endPos);
                        uploader.uploadChunk(chunkIdx, startPos, endPos, inputStream);
                    } finally {
                        // Assumes uploadChunk has consumed the stream when it returns - TODO confirm
                        closeQuietly(inputStream, "chunk input stream", transferId);
                    }
                } else {
                    String tempFile = tempDataDir + File.separator + transferId + "-" + chunkIdx;
                    try {
                        downloader.downloadChunk(chunkIdx, startPos, endPos, tempFile);
                        uploader.uploadChunk(chunkIdx, startPos, endPos, tempFile);
                    } finally {
                        // Remove the staged file on failure as well, so broken transfers do not fill the disk
                        File stagedChunk = new File(tempFile);
                        if (stagedChunk.exists() && !stagedChunk.delete()) {
                            logger.warn("Could not delete the temporary chunk file {}", tempFile);
                        }
                    }
                }
                return chunkIdx;
            } catch (Exception e) {
                logger.error("Failed to transfer ", e);
                throw e;
            }
        }
    }
}
