// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#![allow(unused_imports)]
#![allow(unused_variables)]

use std::collections::VecDeque;
use std::convert::TryInto;
#[cfg(feature = "mesalock_sgx")]
use std::prelude::v1::*;
use std::sync::{Arc, SgxMutex as Mutex};

use std::collections::HashMap;
use teaclave_proto::teaclave_scheduler_service::*;
use teaclave_proto::teaclave_storage_service::*;
use teaclave_rpc::endpoint::Endpoint;
use teaclave_rpc::Request;
use teaclave_service_enclave_utils::teaclave_service;
use teaclave_types::*;
use uuid::Uuid;

use anyhow::anyhow;
use anyhow::Result;
use thiserror::Error;

/// Errors raised by the scheduler service.
///
/// Each variant's `#[error]` string is the text that ends up in the service
/// response when the error is converted into a `TeaclaveServiceResponseError`.
#[derive(Error, Debug)]
pub enum TeaclaveSchedulerError {
    /// Generic scheduler failure.
    #[error("scheduler service error")]
    SchedulerServiceErr,
    /// A value returned by the storage service could not be decoded.
    #[error("data error")]
    DataError,
    /// The storage client could not be locked.
    #[error("storage error")]
    StorageError,
}

impl From<TeaclaveSchedulerError> for TeaclaveServiceResponseError {
    fn from(error: TeaclaveSchedulerError) -> Self {
        TeaclaveServiceResponseError::RequestError(error.to_string())
    }
}

/// State held by the scheduler service.
///
/// Both fields are `Arc` handles, so the derived `Clone` produces a value that
/// refers to the same storage client and the same task queue.
#[teaclave_service(teaclave_scheduler_service, TeaclaveScheduler, TeaclaveSchedulerError)]
#[derive(Clone)]
pub(crate) struct TeaclaveSchedulerService {
    /// Client for the storage service, guarded by a mutex.
    storage_client: Arc<Mutex<TeaclaveStorageClient>>,
    /// In-memory queue of staged tasks; `publish_task` pushes onto its back.
    task_queue: Arc<Mutex<VecDeque<StagedTask>>>,
}

impl TeaclaveSchedulerService {
    /// Creates a scheduler service connected to the storage service at
    /// `storage_service_endpoint`.
    ///
    /// The connection is attempted in a loop: after each failed attempt the
    /// current thread sleeps for `RETRY_INTERVAL`, and at most `MAX_RETRIES`
    /// retries follow the initial attempt.
    ///
    /// # Errors
    ///
    /// Returns an error if the storage service is still unreachable once the
    /// retries are exhausted, or if `TeaclaveStorageClient::new` fails.
    pub(crate) fn new(storage_service_endpoint: Endpoint) -> Result<Self> {
        // Number of retries allowed after the first failed connection attempt.
        const MAX_RETRIES: u32 = 10;
        // Pause between two consecutive connection attempts.
        const RETRY_INTERVAL: std::time::Duration = std::time::Duration::from_secs(3);

        let mut retry = 0;
        let channel = loop {
            if let Ok(channel) = storage_service_endpoint.connect() {
                break channel;
            }
            anyhow::ensure!(retry < MAX_RETRIES, "failed to connect to storage service");
            log::debug!("Failed to connect to storage service, retry {}", retry);
            retry += 1;
            std::thread::sleep(RETRY_INTERVAL);
        };

        let storage_client = TeaclaveStorageClient::new(channel)?;
        Ok(Self {
            storage_client: Arc::new(Mutex::new(storage_client)),
            task_queue: Arc::new(Mutex::new(VecDeque::new())),
        })
    }

    /// Sends a dequeue request for `key` to the storage client and decodes the
    /// returned value as `T`.
    ///
    /// # Errors
    ///
    /// * `TeaclaveSchedulerError::StorageError` if the storage client cannot be
    ///   locked.
    /// * The storage client's own error if the dequeue call fails.
    /// * `TeaclaveSchedulerError::DataError` if the value cannot be decoded.
    fn pull_staged_task<T: Storable>(&self, key: &[u8]) -> TeaclaveServiceResponseResult<T> {
        let dequeue_request = DequeueRequest::new(key);
        // `Arc` derefs to the mutex, so the lock is taken without cloning the handle.
        let dequeue_response = self
            .storage_client
            .lock()
            .map_err(|_| TeaclaveSchedulerError::StorageError)?
            .dequeue(dequeue_request)?;
        T::from_slice(dequeue_response.value.as_slice())
            .map_err(|_| TeaclaveSchedulerError::DataError.into())
    }

    /// Loads the `TaskState` stored for `task_id`.
    ///
    /// # Errors
    ///
    /// Propagates any error from `get_from_db`.
    fn get_task_state(&self, task_id: &Uuid) -> Result<TaskState> {
        let key = ExternalID::new(TaskState::key_prefix(), *task_id);
        self.get_from_db(&key)
    }

    /// Fetches the value stored under `key` from the storage service and
    /// decodes it as `T`.
    ///
    /// # Errors
    ///
    /// Fails if the key's prefix does not match `T`, if the storage client
    /// cannot be locked, if the get call fails, or if decoding fails.
    fn get_from_db<T: Storable>(&self, key: &ExternalID) -> Result<T> {
        // Reject the lookup early when the key belongs to a different item type.
        anyhow::ensure!(T::match_prefix(&key.prefix), "Key prefix doesn't match.");
        let get_request = GetRequest::new(key.to_bytes());
        let response = self
            .storage_client
            .lock()
            .map_err(|_| anyhow!("Cannot lock storage client"))?
            .get(get_request)?;
        T::from_slice(response.value.as_slice())
    }

    /// Serializes `item` and stores it in the storage service under the item's
    /// own key.
    ///
    /// # Errors
    ///
    /// Fails if serialization fails, if the storage client cannot be locked,
    /// or if the put call fails.
    fn put_into_db(&self, item: &impl Storable) -> Result<()> {
        let k = item.key();
        let v = item.to_vec()?;
        let put_request = PutRequest::new(k.as_slice(), v.as_slice());
        let _put_response = self
            .storage_client
            .lock()
            .map_err(|_| anyhow!("Cannot lock storage client"))?
            .put(put_request)?;
        Ok(())
    }
}

impl TeaclaveScheduler for TeaclaveSchedulerService {
    // Publisher

    /// Appends the staged task carried by `request` to the in-memory task queue.
    ///
    /// NOTE(review): `pull_task` reads from the storage service rather than
    /// from this queue, so a task published here is not what `pull_task`
    /// returns.
    ///
    /// # Errors
    ///
    /// Fails if the task queue cannot be locked.
    fn publish_task(
        &self,
        request: Request<PublishTaskRequest>,
    ) -> TeaclaveServiceResponseResult<PublishTaskResponse> {
        // XXX: Publisher is not implemented
        let mut task_queue = self
            .task_queue
            .lock()
            .map_err(|_| anyhow!("Cannot lock task queue"))?;
        task_queue.push_back(request.message.staged_task);
        Ok(PublishTaskResponse {})
    }

    // Subscriber

    /// Subscription is not implemented yet.
    ///
    /// # Errors
    ///
    /// Always returns `TeaclaveSchedulerError::SchedulerServiceErr`. An error
    /// is returned instead of panicking so that a client sending this request
    /// cannot bring down the handler.
    fn subscribe(
        &self,
        _request: Request<SubscribeRequest>,
    ) -> TeaclaveServiceResponseResult<SubscribeResponse> {
        // TODO: subscribe a specific topic
        log::warn!("Subscribe was requested but is not implemented");
        Err(TeaclaveSchedulerError::SchedulerServiceErr.into())
    }

    /// Dequeues the next staged task from the storage service, using the key
    /// returned by `StagedTask::get_queue_key`.
    ///
    /// # Errors
    ///
    /// Propagates any error from `pull_staged_task`.
    fn pull_task(
        &self,
        _request: Request<PullTaskRequest>,
    ) -> TeaclaveServiceResponseResult<PullTaskResponse> {
        let key = StagedTask::get_queue_key().as_bytes();
        let staged_task = self.pull_staged_task(key)?;
        Ok(PullTaskResponse::new(staged_task))
    }

    /// Loads the stored state of the task named by the request, converts it to
    /// `Task<Run>`, and writes the resulting state back to storage.
    ///
    /// Only `task_id` is read from the request; no status value from the
    /// request is applied.
    ///
    /// # Errors
    ///
    /// Fails if the task state cannot be loaded, if the conversion to
    /// `Task<Run>` is rejected, or if the state cannot be stored.
    fn update_task_status(
        &self,
        request: Request<UpdateTaskStatusRequest>,
    ) -> TeaclaveServiceResponseResult<UpdateTaskStatusResponse> {
        let request = request.message;
        let ts = self.get_task_state(&request.task_id)?;
        let task: Task<Run> = ts.try_into()?;

        log::debug!("UpdateTaskStatus: Task {:?}", task);
        // Only TaskStatus::Running is implicitly allowed here.

        let ts = TaskState::from(task);
        self.put_into_db(&ts)?;
        Ok(UpdateTaskStatusResponse {})
    }

    /// Records the result of a task and stores the updated task state.
    ///
    /// When the result is `TaskResult::Ok`, every entry of its `tags_map` is
    /// first applied with `update_output_cmac`, and each output file returned
    /// by that call is written to storage before the task result itself is
    /// recorded.
    ///
    /// # Errors
    ///
    /// Fails if the task state cannot be loaded, if the conversion to
    /// `Task<Finish>` is rejected, if an output tag or the result cannot be
    /// applied, or if any write to storage fails.
    fn update_task_result(
        &self,
        request: Request<UpdateTaskResultRequest>,
    ) -> TeaclaveServiceResponseResult<UpdateTaskResultResponse> {
        let request = request.message;
        let ts = self.get_task_state(&request.task_id)?;
        let mut task: Task<Finish> = ts.try_into()?;

        if let TaskResult::Ok(outputs) = &request.task_result {
            for (key, auth_tag) in outputs.tags_map.iter() {
                let outfile = task.update_output_cmac(key, auth_tag)?;
                self.put_into_db(outfile)?;
            }
        }

        // Updating task result means we have finished execution
        task.update_result(request.task_result)?;
        log::debug!("UpdateTaskResult: Task {:?}", task);

        let ts = TaskState::from(task);
        self.put_into_db(&ts)?;
        Ok(UpdateTaskResultResponse {})
    }
}

// Built only when the custom `test_mode` cfg flag is set. The module currently
// contains no tests; it only brings the parent module's items into scope.
#[cfg(test_mode)]
mod test_mode {
    use super::*;
}
