// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
package com.cloud.storage.upload;

import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Timer;

import javax.inject.Inject;

import org.apache.cloudstack.api.BaseCmd;

import org.apache.cloudstack.api.command.user.iso.ExtractIsoCmd;
import org.apache.cloudstack.api.command.user.template.ExtractTemplateCmd;
import org.apache.cloudstack.api.command.user.volume.ExtractVolumeCmd;
import org.apache.cloudstack.api.response.ExtractResponse;
import org.apache.cloudstack.engine.subsystem.api.storage.DataStore;
import org.apache.cloudstack.engine.subsystem.api.storage.EndPoint;
import org.apache.cloudstack.engine.subsystem.api.storage.EndPointSelector;
import org.apache.cloudstack.framework.async.AsyncCompletionCallback;
import org.apache.cloudstack.framework.jobs.AsyncJobManager;
import org.apache.cloudstack.jobs.JobInfo;
import org.apache.cloudstack.managed.context.ManagedContextTimerTask;

import com.cloud.agent.Listener;
import com.cloud.agent.api.AgentControlAnswer;
import com.cloud.agent.api.AgentControlCommand;
import com.cloud.agent.api.Answer;
import com.cloud.agent.api.Command;
import com.cloud.agent.api.StartupCommand;
import com.cloud.agent.api.StartupStorageCommand;
import com.cloud.agent.api.storage.UploadAnswer;
import com.cloud.agent.api.storage.UploadCommand;
import com.cloud.agent.api.storage.UploadProgressCommand;
import com.cloud.agent.api.storage.UploadProgressCommand.RequestType;
import com.cloud.api.ApiDBUtils;
import com.cloud.api.ApiSerializerHelper;
import com.cloud.host.Host;
import com.cloud.storage.Storage;
import com.cloud.storage.Upload.Status;
import com.cloud.storage.Upload.Type;
import com.cloud.storage.UploadVO;
import com.cloud.storage.dao.UploadDao;
import com.cloud.storage.upload.UploadState.UploadEvent;
import com.cloud.utils.exception.CloudRuntimeException;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;

/**
 * Agent {@link Listener} that follows one upload (extract) job.
 *
 * <p>The listener keeps a small state machine of {@link UploadState} objects keyed by
 * {@link Status} name. Agent answers, timer-driven timeout checks and disconnects are fed
 * into {@link #transition(UploadEvent, Object)}; the states call back into this class to
 * schedule timer tasks and to record progress through {@link UploadDao} and
 * {@link AsyncJobManager}.
 *
 * <p>{@link #transition(UploadEvent, Object)} and {@link #updateDatabase(UploadAnswer)} are
 * {@code synchronized} on this instance; the remaining methods are not.
 */
public class UploadListener implements Listener {

    /** Timer task that asks the owning listener to send a progress request of a fixed type. */
    private static final class StatusTask extends ManagedContextTimerTask {
        private final UploadListener ul;
        private final RequestType reqType;

        public StatusTask(UploadListener ul, RequestType req) {
            this.ul = ul;
            this.reqType = req;
        }

        @Override
        protected void runInContext() {
            ul.sendCommand(reqType);
        }
    }

    /** Timer task that asks the owning listener to run its timeout check. */
    private static final class TimeoutTask extends ManagedContextTimerTask {
        private final UploadListener ul;

        public TimeoutTask(UploadListener ul) {
            this.ul = ul;
        }

        @Override
        protected void runInContext() {
            ul.checkProgress();
        }
    }

    protected Logger logger = LogManager.getLogger(getClass());

    /** Delay, in milliseconds, used by {@link #scheduleImmediateStatusCheck(RequestType)}. */
    public static final int SMALL_DELAY = 100;
    /** Delay, in milliseconds, used by {@link #scheduleStatusCheck(RequestType)}. */
    public static final long STATUS_POLL_INTERVAL = 10000L;

    public static final String UPLOADED = Status.UPLOADED.toString();
    public static final String NOT_UPLOADED = Status.NOT_UPLOADED.toString();
    public static final String UPLOAD_ERROR = Status.UPLOAD_ERROR.toString();
    public static final String UPLOAD_IN_PROGRESS = Status.UPLOAD_IN_PROGRESS.toString();
    public static final String UPLOAD_ABANDONED = Status.ABANDONED.toString();

    /** Unmodifiable map from upload {@link Type} name (plus {@code "DEFAULT"}) to API response name. */
    public static final Map<String, String> responseNameMap;
    static {
        Map<String, String> names = new HashMap<>();
        names.put(Type.ISO.toString(), BaseCmd.getResponseNameByClass(ExtractIsoCmd.class));
        names.put(Type.TEMPLATE.toString(), BaseCmd.getResponseNameByClass(ExtractTemplateCmd.class));
        names.put(Type.VOLUME.toString(), BaseCmd.getResponseNameByClass(ExtractVolumeCmd.class));
        names.put("DEFAULT", "extractresponse");
        responseNameMap = Collections.unmodifiableMap(names);
    }

    private DataStore sserver;

    private UploadDao uploadDao;

    private final UploadMonitorImpl uploadMonitor;

    /** Current state; stays {@code null} when the monitor-only constructor was used. */
    private UploadState currState;

    private UploadCommand cmd;

    private Timer timer;

    private StatusTask statusTask;
    private TimeoutTask timeoutTask;
    private Date lastUpdated = new Date();
    private String jobId;
    private Long accountId;
    private String typeName;
    private Type type;
    private long asyncJobId;
    private long eventId;
    private AsyncJobManager asyncMgr;
    private ExtractResponse resultObj;

    // NOTE(review): both constructors visible in this file are plain constructors, so this
    // field is only populated if something outside this file performs injection on the
    // instance. If it is left null, sendCommand() hits a NullPointerException that its
    // catch block turns into a disconnect. Confirm how instances are wired.
    @Inject
    EndPointSelector _epSelector;

    private final Map<String, UploadState> stateMap = new HashMap<>();
    private Long uploadId;

    /**
     * Creates a listener for one upload, arms the initial timeout task and records the
     * {@link Status#NOT_UPLOADED} state in the database and on the async job.
     *
     * @param host          data store the upload is served from
     * @param timerInput    timer used for status polling and timeout tasks
     * @param uploadDao     DAO used to persist upload progress
     * @param uploadObj     upload record; its id and type id are read here
     * @param uploadMonitor monitor notified about upload events
     * @param cmd           upload command; its URL is stored with the initial state
     * @param accountId     id of the owning account
     * @param typeName      name of the object being uploaded, used in responses and logs
     * @param type          kind of object; {@link Type#VOLUME} is resolved as a volume,
     *                      anything else as a template
     * @param eventId       event id forwarded to the monitor
     * @param asyncJobId    id of the async job that receives progress updates
     * @param asyncMgr      async job manager used to publish progress
     */
    public UploadListener(DataStore host, Timer timerInput, UploadDao uploadDao, UploadVO uploadObj, UploadMonitorImpl uploadMonitor, UploadCommand cmd, Long accountId,
            String typeName, Type type, long eventId, long asyncJobId, AsyncJobManager asyncMgr) {
        sserver = host;
        this.uploadDao = uploadDao;
        this.uploadMonitor = uploadMonitor;
        this.cmd = cmd;
        uploadId = uploadObj.getId();
        this.accountId = accountId;
        this.typeName = typeName;
        this.type = type;
        initStateMachine();
        currState = getState(Status.NOT_UPLOADED.toString());
        timer = timerInput;
        timeoutTask = new TimeoutTask(this);
        timer.schedule(timeoutTask, 3 * STATUS_POLL_INTERVAL);
        this.eventId = eventId;
        this.asyncJobId = asyncJobId;
        this.asyncMgr = asyncMgr;
        final String extractId;
        if (type == Type.VOLUME) {
            extractId = ApiDBUtils.findVolumeById(uploadObj.getTypeId()).getUuid();
        } else {
            extractId = ApiDBUtils.findTemplateById(uploadObj.getTypeId()).getUuid();
        }
        resultObj =
            new ExtractResponse(extractId, typeName, ApiDBUtils.findAccountById(accountId).getUuid(), Status.NOT_UPLOADED.toString(), ApiDBUtils.findUploadById(uploadId)
                .getUuid());
        resultObj.setResponseName(responseNameMap.get(type.toString()));
        updateDatabase(Status.NOT_UPLOADED, cmd.getUrl(), "");
    }

    /**
     * Creates a listener that is bound to a monitor only. No state machine is initialised,
     * so {@link #transition(UploadEvent, Object)} is a no-op for such an instance.
     *
     * @param monitor monitor to notify from {@link #processConnect(Host, StartupCommand, boolean)}
     */
    public UploadListener(UploadMonitorImpl monitor) {
        uploadMonitor = monitor;
    }

    public AsyncJobManager getAsyncMgr() {
        return asyncMgr;
    }

    public void setAsyncMgr(AsyncJobManager asyncMgr) {
        this.asyncMgr = asyncMgr;
    }

    public long getAsyncJobId() {
        return asyncJobId;
    }

    public void setAsyncJobId(long asyncJobId) {
        this.asyncJobId = asyncJobId;
    }

    public long getEventId() {
        return eventId;
    }

    public void setEventId(long eventId) {
        this.eventId = eventId;
    }

    /** Feeds a {@link UploadEvent#TIMEOUT_CHECK} event into the state machine. */
    public void checkProgress() {
        transition(UploadEvent.TIMEOUT_CHECK, null);
    }

    @Override
    public int getTimeout() {
        return -1;
    }

    @Override
    public boolean isRecurring() {
        return false;
    }

    public void setCommand(UploadCommand cmd) {
        this.cmd = cmd;
    }

    public void setJobId(String jobId) {
        this.jobId = jobId;
    }

    public String getJobId() {
        return jobId;
    }

    /**
     * Handles an agent answer. Only the first element is inspected, and only when it is an
     * {@link UploadAnswer}. The first such answer fixes this listener's job id; later answers
     * carrying a different job id (compared case-insensitively) are rejected.
     *
     * @param agentId id of the answering agent (unused)
     * @param seq     sequence number of the request (unused)
     * @param answers answers received; may be {@code null} or empty
     * @return {@code true} if an {@link UploadAnswer} was fed into the state machine,
     *         {@code false} otherwise
     */
    @Override
    public boolean processAnswers(long agentId, long seq, Answer[] answers) {
        // Short-circuit evaluation is required here: a null array must never be dereferenced.
        if (answers == null || answers.length == 0 || !(answers[0] instanceof UploadAnswer)) {
            return false;
        }
        final UploadAnswer answer = (UploadAnswer)answers[0];
        final String knownJobId = getJobId();
        if (knownJobId == null) {
            setJobId(answer.getJobId());
        } else if (!knownJobId.equalsIgnoreCase(answer.getJobId())) {
            return false;//TODO: answer belongs to a different job; decide how to handle it
        }
        transition(UploadEvent.UPLOAD_ANSWER, answer);
        return true;
    }

    @Override
    public boolean processCommands(long agentId, long seq, Command[] commands) {
        return false;
    }

    @Override
    public void processHostAdded(long hostId) {
    }

    /**
     * Triggers an upload sync on the monitor when the connecting agent reports a
     * {@code STORAGE_HOST} or {@code SECONDARY_STORAGE} resource; other startup commands
     * are ignored.
     */
    @Override
    public void processConnect(Host agent, StartupCommand cmd, boolean forRebalance) {
        if (!(cmd instanceof StartupStorageCommand)) {
            return;
        }

        final long agentId = agent.getId();
        final Storage.StorageResourceType resourceType = ((StartupStorageCommand)cmd).getResourceType();
        if (resourceType == Storage.StorageResourceType.STORAGE_HOST || resourceType == Storage.StorageResourceType.SECONDARY_STORAGE) {
            uploadMonitor.handleUploadSync(agentId);
        }
    }

    @Override
    public AgentControlAnswer processControlCommand(long agentId, AgentControlCommand cmd) {
        return null;
    }

    /**
     * Reports this upload to the monitor with the given status.
     *
     * @param reason status passed on to the monitor
     */
    public void setUploadInactive(Status reason) {
        uploadMonitor.handleUploadEvent(accountId, typeName, type, uploadId, reason, eventId);
    }

    /** Intentionally a no-op: the event-logging call below is disabled. */
    public void logUploadStart() {
        //uploadMonitor.logEvent(accountId, event, "Storage server " + sserver.getName() + " started upload of " +type.toString() + " " + typeName, EventVO.LEVEL_INFO, eventId);
    }

    /** Cancels the pending timeout task, if any. */
    public void cancelTimeoutTask() {
        if (timeoutTask != null) {
            timeoutTask.cancel();
        }
    }

    /** Cancels the pending status task, if any. */
    public void cancelStatusTask() {
        if (statusTask != null) {
            statusTask.cancel();
        }
    }

    /** Feeds a disconnect event into the state machine and reports the event as handled. */
    @Override
    public boolean processDisconnect(long agentId, com.cloud.host.Status state) {
        setDisconnected();
        return true;
    }

    @Override
    public void processHostAboutToBeRemoved(long hostId) {
    }

    @Override
    public void processHostRemoved(long hostId, long clusterId) {
    }

    @Override
    public boolean processTimeout(long agentId, long seq) {
        return true;
    }

    /** Registers one {@link UploadState} per {@link Status}, keyed by the status name. */
    private void initStateMachine() {
        stateMap.put(Status.NOT_UPLOADED.toString(), new NotUploadedState(this));
        stateMap.put(Status.UPLOADED.toString(), new UploadCompleteState(this));
        stateMap.put(Status.UPLOAD_ERROR.toString(), new UploadErrorState(this));
        stateMap.put(Status.UPLOAD_IN_PROGRESS.toString(), new UploadInProgressState(this));
        stateMap.put(Status.ABANDONED.toString(), new UploadAbandonedState(this));
    }

    /** Returns the state registered under {@code stateName}, or {@code null} if there is none. */
    private UploadState getState(String stateName) {
        return stateMap.get(stateName);
    }

    /**
     * Lets the current state handle {@code event} and enters the state it names.
     * Does nothing when there is no current state.
     *
     * @param event  event to dispatch
     * @param evtObj optional payload handed to the state
     * @throws CloudRuntimeException if the current state returns no next state, or names a
     *                               state that is not registered
     */
    private synchronized void transition(UploadEvent event, Object evtObj) {
        if (currState == null) {
            return;
        }
        final String prevName = currState.getName();
        final String nextState = currState.handleEvent(event, evtObj);
        if (nextState == null) {
            throw new CloudRuntimeException("Unhandled event transition: currState=" + prevName + ", evt=" + event);
        }
        // The field is assigned before validation, as before: an unknown state name leaves
        // the listener without a current state.
        currState = getState(nextState);
        if (currState == null) {
            throw new CloudRuntimeException("Invalid next state: currState=" + prevName + ", evt=" + event + ", next=" + nextState);
        }
        currState.onEntry(prevName, event, evtObj);
    }

    public Date getLastUpdated() {
        return lastUpdated;
    }

    /** Sets the last-updated timestamp to the current time. */
    public void setLastUpdated() {
        lastUpdated = new Date();
    }

    /**
     * Logs {@code message} at {@code level}, suffixed with the upload type, type name and
     * data store name.
     */
    public void log(String message, Level level) {
        logger.log(level, message + ", " + type.toString() + " = " + typeName + " at host " + sserver.getName());
    }

    /** Feeds a {@link UploadEvent#DISCONNECT} event into the state machine. */
    public void setDisconnected() {
        transition(UploadEvent.DISCONNECT, null);
    }

    /**
     * Replaces any pending status task with one that fires after {@link #STATUS_POLL_INTERVAL}.
     *
     * @param getStatus request type the task will send
     */
    public void scheduleStatusCheck(RequestType getStatus) {
        rescheduleStatusTask(getStatus, STATUS_POLL_INTERVAL);
    }

    /**
     * Replaces any pending status task with one that fires after {@link #SMALL_DELAY}.
     *
     * @param request request type the task will send
     */
    public void scheduleImmediateStatusCheck(RequestType request) {
        rescheduleStatusTask(request, SMALL_DELAY);
    }

    /** Cancels the current status task, if any, and schedules a new one after {@code delayMs}. */
    private void rescheduleStatusTask(RequestType request, long delayMs) {
        if (statusTask != null) {
            statusTask.cancel();
        }
        statusTask = new StatusTask(this, request);
        timer.schedule(statusTask, delayMs);
    }

    /**
     * Replaces any pending timeout task with one that fires after {@code delay} milliseconds.
     *
     * @param delay delay in milliseconds
     */
    public void scheduleTimeoutTask(long delay) {
        if (timeoutTask != null) {
            timeoutTask.cancel();
        }

        timeoutTask = new TimeoutTask(this);
        timer.schedule(timeoutTask, delay);
        if (logger.isDebugEnabled()) {
            log("Scheduling timeout at " + delay + " ms", Level.DEBUG);
        }
    }

    /**
     * Publishes {@code state} on the async job as an in-progress update and persists the
     * state, error string and a fresh timestamp on the upload record.
     *
     * @param state             state to record
     * @param uploadErrorString error text to record
     */
    public void updateDatabase(Status state, String uploadErrorString) {
        publishInProgress(state, uploadErrorString);

        UploadVO vo = uploadDao.createForUpdate();
        vo.setUploadState(state);
        vo.setLastUpdated(new Date());
        vo.setErrorString(uploadErrorString);
        uploadDao.update(getUploadId(), vo);
    }

    /**
     * Same as {@link #updateDatabase(Status, String)}, but additionally stores the upload
     * URL and resets the persisted job id to {@code null} and the percentage to 0.
     *
     * @param state             state to record
     * @param uploadUrl         URL to record
     * @param uploadErrorString error text to record
     */
    public void updateDatabase(Status state, String uploadUrl, String uploadErrorString) {
        publishInProgress(state, uploadErrorString);

        UploadVO vo = uploadDao.createForUpdate();
        vo.setUploadState(state);
        vo.setLastUpdated(new Date());
        vo.setUploadUrl(uploadUrl);
        vo.setJobId(null);
        vo.setUploadPercent(0);
        vo.setErrorString(uploadErrorString);

        uploadDao.update(getUploadId(), vo);
    }

    /** Copies state and error text into the response and pushes it to the async job as in-progress. */
    private void publishInProgress(Status state, String uploadErrorString) {
        resultObj.setResultString(uploadErrorString);
        resultObj.setState(state.toString());
        asyncMgr.updateAsyncJobAttachment(asyncJobId, type.toString(), 1L);
        asyncMgr.updateAsyncJobStatus(asyncJobId, JobInfo.Status.IN_PROGRESS.ordinal(), ApiSerializerHelper.toSerializedString(resultObj));
    }

    private Long getUploadId() {
        return uploadId;
    }

    /**
     * Records an agent answer: updates the response object, then either updates the async
     * job (upload in progress) or completes it as succeeded (uploaded) or failed (any
     * other status), and finally persists the answer's fields on the upload record.
     *
     * <p>An error string starting with {@code "553"} gets a hint about an existing file
     * name appended; the answer object itself is modified in that case.
     *
     * @param answer answer reported by the agent
     */
    public synchronized void updateDatabase(UploadAnswer answer) {
        // presumably "553" is the FTP "file name not allowed" reply code; confirm with the agent side
        final String rawError = answer.getErrorString();
        // An answer without an error string must not break progress recording.
        if (rawError != null && rawError.startsWith("553")) {
            answer.setErrorString(rawError.concat("Please check if the file name already exists."));
        }
        resultObj.setResultString(answer.getErrorString());
        resultObj.setState(answer.getUploadStatus().toString());
        resultObj.setUploadPercent(answer.getUploadPct());

        if (answer.getUploadStatus() == Status.UPLOAD_IN_PROGRESS) {
            asyncMgr.updateAsyncJobAttachment(asyncJobId, type.toString(), 1L);
            asyncMgr.updateAsyncJobStatus(asyncJobId, JobInfo.Status.IN_PROGRESS.ordinal(), ApiSerializerHelper.toSerializedString(resultObj));
        } else if (answer.getUploadStatus() == Status.UPLOADED) {
            resultObj.setResultString("Success");
            asyncMgr.completeAsyncJob(asyncJobId, JobInfo.Status.SUCCEEDED, 1, ApiSerializerHelper.toSerializedString(resultObj));
        } else {
            asyncMgr.completeAsyncJob(asyncJobId, JobInfo.Status.FAILED, 2, ApiSerializerHelper.toSerializedString(resultObj));
        }

        UploadVO updateBuilder = uploadDao.createForUpdate();
        updateBuilder.setUploadPercent(answer.getUploadPct());
        updateBuilder.setUploadState(answer.getUploadStatus());
        updateBuilder.setLastUpdated(new Date());
        updateBuilder.setErrorString(answer.getErrorString());
        updateBuilder.setJobId(answer.getJobId());

        uploadDao.update(getUploadId(), updateBuilder);
    }

    /**
     * Sends an {@link UploadProgressCommand} for the current job to an endpoint selected
     * for the data store. Does nothing while no job id is known. If no endpoint is
     * available the error is logged and nothing is sent; any exception while sending is
     * logged and treated as a disconnect.
     *
     * @param reqType type of progress request to send
     */
    public void sendCommand(RequestType reqType) {
        if (getJobId() == null) {
            return;
        }
        if (logger.isTraceEnabled()) {
            log("Sending progress command ", Level.TRACE);
        }
        try {
            EndPoint ep = _epSelector.select(sserver);
            if (ep == null) {
                String errMsg = "No remote endpoint to send command, check if host or ssvm is down?";
                logger.error(errMsg);
                return;
            }
            ep.sendMessageAsync(new UploadProgressCommand(getCommand(), getJobId(), reqType), new Callback(ep.getId(), this));
        } catch (Exception e) {
            logger.debug("Send command failed", e);
            setDisconnected();
        }
    }

    private UploadCommand getCommand() {
        return cmd;
    }

    /** Logs a warning that upload progress can no longer be monitored. */
    public void logDisconnect() {
        logger.warn("Unable to monitor upload progress of " + typeName + " at host " + sserver.getName());
    }

    /**
     * Makes the state registered for {@code uploadState} the current state, without
     * dispatching an event or invoking the state's entry hook.
     *
     * @param uploadState status whose state object becomes current
     */
    public void setCurrState(Status uploadState) {
        // Look up the requested status; the previous code looked up the current state's own
        // toString() and ignored the argument.
        currState = getState(uploadState.toString());
    }

    /** Completion callback that forwards an asynchronous answer to a {@link Listener}. */
    public static class Callback implements AsyncCompletionCallback<Answer> {
        long id;
        Listener listener;

        /**
         * @param id       id passed to the listener as the agent id
         * @param listener listener that receives the answer
         */
        public Callback(long id, Listener listener) {
            this.id = id;
            this.listener = listener;
        }

        /** Wraps {@code answer} in a one-element array and hands it to the listener with sequence -1. */
        @Override
        public void complete(Answer answer) {
            listener.processAnswers(id, -1, new Answer[] {answer});
        }
    }
}
