/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.amaterasu.leader.yarn

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory
import com.fasterxml.jackson.module.kotlin.KotlinModule

import kotlinx.coroutines.runBlocking

import org.apache.activemq.broker.BrokerService

import org.apache.amaterasu.common.configuration.ClusterConfig
import org.apache.amaterasu.common.dataobjects.ActionData
import org.apache.amaterasu.common.logging.KLogging
import org.apache.amaterasu.common.utils.ActiveNotifier
import org.apache.amaterasu.common.configuration.ConfigManager
import org.apache.amaterasu.leader.common.execution.JobLoader
import org.apache.amaterasu.leader.common.execution.JobManager
import org.apache.amaterasu.leader.common.execution.frameworks.FrameworkProvidersFactory
import org.apache.amaterasu.leader.common.launcher.AmaOpts
import org.apache.amaterasu.leader.common.utilities.DataLoader
import org.apache.amaterasu.leader.common.utilities.MessagingClientUtil
import org.apache.amaterasu.sdk.frameworks.FrameworkSetupProvider
import org.apache.amaterasu.sdk.frameworks.RunnerSetupProvider
import org.apache.commons.lang.exception.ExceptionUtils

import org.apache.curator.framework.CuratorFramework
import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.framework.recipes.barriers.DistributedBarrier
import org.apache.curator.retry.ExponentialBackoffRetry

import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.DataOutputBuffer
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.yarn.api.records.*
import org.apache.hadoop.yarn.client.api.AMRMClient
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync
import org.apache.hadoop.yarn.client.api.async.NMClientAsync
import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.exceptions.YarnException
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier
import org.apache.hadoop.yarn.util.Records

import org.apache.zookeeper.CreateMode
import java.io.*

import java.nio.ByteBuffer
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.LinkedBlockingQueue

import javax.jms.MessageConsumer

class ApplicationMaster : KLogging(), AMRMClientAsync.CallbackHandler {


    lateinit var address: String

    val broker: BrokerService = BrokerService()
    private val conf = YarnConfiguration()


    private val actionsBuffer = ConcurrentLinkedQueue<ActionData>()
    private val completedContainersAndTaskIds = ConcurrentHashMap<Long, String>()
    private val containersIdsToTask = ConcurrentHashMap<Long, ActionData>()
    private val yamlMapper = ObjectMapper(YAMLFactory())

    private lateinit var propPath: String
    private lateinit var props: FileInputStream
    private lateinit var jobManager: JobManager
    private lateinit var zkClient: CuratorFramework
    private lateinit var env: String
    private lateinit var rmClient: AMRMClientAsync<AMRMClient.ContainerRequest>
    private lateinit var frameworkFactory: FrameworkProvidersFactory
    private lateinit var config: ClusterConfig
    private lateinit var fs: FileSystem
    private lateinit var consumer: MessageConsumer
    private lateinit var configManager: ConfigManager
    private lateinit var notifier: ActiveNotifier
    private lateinit var nmClient: NMClientAsync

    init {
        yamlMapper.registerModule(KotlinModule())
    }

    fun execute(opts: AmaOpts) {

        propPath = System.getenv("PWD") + "/amaterasu.properties"
        props = FileInputStream(File(propPath))

        // no need for HDFS double check (nod to Aaron Rodgers)
        // jars on HDFS should have been verified by the YARN zkClient
        config = ClusterConfig.apply(props)
        fs = FileSystem.get(conf)

        initJob(opts)

        // now that the job was initiated, the curator zkClient is Started and we can
        // register the broker's address
        zkClient.create().withMode(CreateMode.PERSISTENT).forPath("/${jobManager.jobId}/broker")
        zkClient.setData().forPath("/${jobManager.jobId}/broker", address.toByteArray())

        // once the broker is registered, we can remove the barrier so clients can connect
        log.info("/${jobManager.jobId}-report-barrier")
        val barrier = DistributedBarrier(zkClient, "/${jobManager.jobId}-report-barrier")
        barrier.removeBarrier()

        consumer = MessagingClientUtil.setupMessaging(address)
        notifier = ActiveNotifier(address)

        log.info("number of messages ${broker.adminView.totalMessageCount}")

        // Initialize clients to ResourceManager and NodeManagers
        nmClient = NMClientAsyncImpl(YarnNMCallbackHandler(notifier))
        nmClient.init(conf)
        nmClient.start()

        rmClient = startRMClient()

        val items = mutableListOf<FrameworkSetupProvider>()

        for (provider in frameworkFactory.providers.values) {
            items.add(provider)
        }
        val configItems = items.flatMap { it.configurationItems.asIterable() }
        configManager = ConfigManager(env, "repo", configItems)


        val registrationResponse = rmClient.registerApplicationMaster("", 0, "")
        val maxMem = registrationResponse.maximumResourceCapability.memorySize
        val maxVCores = registrationResponse.maximumResourceCapability.virtualCores

        while (!jobManager.outOfActions) {
            val capability = Records.newRecord(Resource::class.java)

            val actionData = jobManager.nextActionData
            if (actionData != null) {

                notifier.info("requesting container fo ${actionData.name}")
                val frameworkProvider = frameworkFactory.getFramework(actionData.groupId)
                val driverConfiguration = frameworkProvider.getDriverConfiguration(configManager)

                var mem: Long = driverConfiguration.memory.toLong()
                mem = Math.min(mem, maxMem)
                capability.memorySize = mem

                var cpu = driverConfiguration.cpus
                cpu = Math.min(cpu, maxVCores)
                capability.virtualCores = cpu

                createTaskConfiguration(actionData)
                requestContainer(actionData, capability)

            }
        }

        log.info("Finished requesting containers")
        readLine()
    }

    private fun initJob(opts: AmaOpts) {

        this.env = opts.env
        frameworkFactory = FrameworkProvidersFactory(env, config)

        try {
            val retryPolicy = ExponentialBackoffRetry(1000, 3)
            zkClient = CuratorFrameworkFactory.newClient(config.zk(), retryPolicy)
            zkClient.start()
        } catch (e: Exception) {
            log.error("Error connecting to zookeeper", e)
            throw e
        }

        val zkPath = zkClient.checkExists().forPath("/${opts.newJobId}")

        log.info("zkPath is $zkPath")
        if (zkPath != null) {
            log.info("resuming job" + opts.newJobId)
            jobManager = JobLoader.reloadJob(
                    opts.newJobId,
                    opts.userName,
                    opts.password,
                    zkClient,
                    config.Jobs().tasks().attempts(),
                    LinkedBlockingQueue<ActionData>())

        } else {
            log.info("new job is being created")
            try {

                jobManager = JobLoader.loadJob(
                        opts.repo,
                        opts.branch,
                        opts.newJobId,
                        opts.userName,
                        opts.password,
                        zkClient,
                        config.Jobs().tasks().attempts(),
                        LinkedBlockingQueue<ActionData>())
            } catch (e: Exception) {
                log.error("Error creating JobManager.", e)
                throw e
            }

        }

        jobManager.start()
        log.info("Started jobManager")
    }

    override fun onContainersAllocated(containers: MutableList<Container>?) = runBlocking {
        containers?.let {
            for (container in it) {

                log.info("container ${container.id} allocated")
                if (actionsBuffer.isNotEmpty()) {
                    val actionData = actionsBuffer.poll()
                    //val cd = async {
                    try {
                        log.info("container ${container.id} allocated")

                        val framework = frameworkFactory.getFramework(actionData.groupId)
                        val runnerProvider = framework.getRunnerProvider(actionData.typeId)
                        val ctx = Records.newRecord(ContainerLaunchContext::class.java)

                        val envConf = configManager.getActionConfiguration(actionData.name, actionData.config)
                        val commands: List<String> = listOf(runnerProvider.getCommand(jobManager.jobId, actionData, envConf, "${actionData.id}-${container.id.containerId}", address))

                        notifier.info("container command ${commands.joinToString(prefix = " ", postfix = " ")}")
                        ctx.commands = commands
                        ctx.tokens = allTokens()

                        ctx.localResources = setupContainerResources(framework, runnerProvider, actionData)
                        ctx.environment = framework.environmentVariables

                        nmClient.startContainerAsync(container, ctx)

                        jobManager.actionStarted(actionData.id)
                        containersIdsToTask[container.id.containerId] = actionData
                        notifier.info("created container for ${actionData.name} created")

                        log.info("launching container succeeded: ${container.id.containerId}; task: ${actionData.id}")
                    } catch (e: Exception) {
                        notifier.error("", "error launching container with ${e.message} in ${ExceptionUtils.getStackTrace(e)}")
                        requestContainer(actionData, container.resource)
                    }
                }
            }
        }
    }!!

    private fun allTokens(): ByteBuffer {
        // creating the credentials for container execution
        val credentials = UserGroupInformation.getCurrentUser().credentials
        val dob = DataOutputBuffer()
        credentials.writeTokenStorageToStream(dob)

        // removing the AM->RM token so that containers cannot access it.
        val iter = credentials.allTokens.iterator()
        log.info("Executing with tokens:")
        for (token in iter) {
            log.info(token.toString())
            if (token.kind == AMRMTokenIdentifier.KIND_NAME) iter.remove()
        }
        return ByteBuffer.wrap(dob.data, 0, dob.length)
    }

    /**
     * Creates the map of resources to be copied into the container
     * @framework The frameworkSetupProvider for the action
     * @runnerProvider the actions runner provider
     */
    private fun setupContainerResources(framework: FrameworkSetupProvider, runnerProvider: RunnerSetupProvider, actionData: ActionData): Map<String, LocalResource> {

        val yarnJarPath = Path(config.yarn().hdfsJarsPath())

        // Getting framework (group) resources
        val result = framework.groupResources.map { it.path to createLocalResourceFromPath(Path.mergePaths(yarnJarPath, createDistPath(it.path))) }.toMap().toMutableMap()

        // Getting runner resources
        result.putAll(runnerProvider.runnerResources.map { it to createLocalResourceFromPath(Path.mergePaths(yarnJarPath, createDistPath(it))) }.toMap())

        // getting the action specific resources
        result.putAll(runnerProvider.getActionResources(jobManager.jobId, actionData).map { it.removePrefix("${jobManager.jobId}/${actionData.name}/") to createLocalResourceFromPath(Path.mergePaths(yarnJarPath, createDistPath(it))) })

        // getting the action specific dependencies
        runnerProvider.getActionDependencies(jobManager.jobId, actionData).forEach { distributeFile(it, "${jobManager.jobId}/${actionData.name}/") }

        result.putAll(runnerProvider.getActionDependencies(jobManager.jobId, actionData).map { File(it).name to createLocalResourceFromPath(Path.mergePaths(yarnJarPath, createDistPath("${jobManager.jobId}/${actionData.name}/$it"))) })

        // Adding the Amaterasu configuration files
        result["amaterasu.properties"] = createLocalResourceFromPath(Path.mergePaths(yarnJarPath, Path("/amaterasu.properties")))
        result["log4j.properties"] = createLocalResourceFromPath(Path.mergePaths(yarnJarPath, Path("/log4j.properties")))

        // getting the action executable
        val executable = runnerProvider.getActionExecutable(jobManager.jobId, actionData)

        // setting the action executable
        distributeFile(executable, "${jobManager.jobId}/${actionData.name}/")
        result[File(executable).name] = createLocalResourceFromPath(Path.mergePaths(yarnJarPath, createDistPath("${jobManager.jobId}/${actionData.name}/$executable")))

        result.forEach { log.debug("entry ${it.key} with value ${it.value}") }

        return result.map { x -> x.key.removePrefix("/") to x.value }.toMap()
    }

    private fun createTaskConfiguration(actionData: ActionData) {

        // setting up the configuration files for the container
        val envYaml = configManager.getActionConfigContent(actionData.name, actionData.config)
        writeConfigFile(envYaml, jobManager.jobId, actionData.name, "env.yaml")

        val dataStores = DataLoader.getTaskData(actionData, env).exports
        val dataStoresYaml = yamlMapper.writeValueAsString(dataStores)
        writeConfigFile(dataStoresYaml, jobManager.jobId, actionData.name, "datastores.yaml")

        val datasets = DataLoader.getDatasets(env)
        writeConfigFile(datasets, jobManager.jobId, actionData.name, "datasets.yaml")

        writeConfigFile("jobId: ${jobManager.jobId}\nactionName: ${actionData.name}", jobManager.jobId, actionData.name, "runtime.yaml")

    }

    private fun writeConfigFile(content: String, jobId: String, actionName: String, fileName: String) {

        val actionDistPath = createDistPath("$jobId/$actionName/$fileName")
        val yarnJarPath = Path(config.yarn().hdfsJarsPath())
        val targetPath = Path.mergePaths(yarnJarPath, actionDistPath)

        val outputStream = fs.create(targetPath)
        outputStream.write(content.toByteArray())
        outputStream.close()
        log.info("written file $targetPath")

    }

    private fun distributeFile(file: String, distributionPath: String) {


        val actionDistPath = createDistPath("$distributionPath/$file")
        val yarnJarPath = Path(config.yarn().hdfsJarsPath())
        val targetPath = Path.mergePaths(yarnJarPath, actionDistPath)

        notifier.info("copying file $file, file status ${File(file).exists()} to $targetPath")

        log.info("target is $targetPath")

        fs.copyFromLocalFile(false, true, Path(file), targetPath)

    }

    private fun createDistPath(path: String): Path = Path("/dist/$path")

    private fun startRMClient(): AMRMClientAsync<AMRMClient.ContainerRequest> {
        val client = AMRMClientAsync.createAMRMClientAsync<AMRMClient.ContainerRequest>(1000, this)
        client.init(conf)
        client.start()
        return client
    }

    private fun createLocalResourceFromPath(path: Path): LocalResource {

        val stat = fs.getFileStatus(path)
        val fileResource = Records.newRecord(LocalResource::class.java)

        fileResource.shouldBeUploadedToSharedCache = true
        fileResource.visibility = LocalResourceVisibility.PUBLIC
        fileResource.resource = URL.fromPath(path)
        fileResource.size = stat.len
        fileResource.timestamp = stat.modificationTime
        fileResource.type = LocalResourceType.FILE
        fileResource.visibility = LocalResourceVisibility.PUBLIC
        return fileResource

    }

    private fun requestContainer(actionData: ActionData, capability: Resource) {

        actionsBuffer.add(actionData)
        log.info("About to ask container for action ${actionData.id} with mem ${capability.memorySize} and cores ${capability.virtualCores}. Action buffer size is: ${actionsBuffer.size}")

        // we have an action to schedule, let's request a container
        val priority: Priority = Records.newRecord(Priority::class.java)
        priority.priority = 1
        val containerReq = AMRMClient.ContainerRequest(capability, null, null, priority)
        rmClient.addContainerRequest(containerReq)
        log.info("Asked container for action ${actionData.id}")

    }

    override fun onNodesUpdated(updatedNodes: MutableList<NodeReport>?) {
        log.info("Nodes change. Nothing to report.")
    }

    override fun onShutdownRequest() {
        log.error("Shutdown requested.")
        stopApplication(FinalApplicationStatus.KILLED, "Shutdown requested")
    }

    override fun getProgress(): Float {
        return jobManager.registeredActions.size.toFloat() / completedContainersAndTaskIds.size
    }

    override fun onError(e: Throwable?) {
        notifier.error("Error running a container ${e!!.message!!}", ExceptionUtils.getStackTrace(e))
        stopApplication(FinalApplicationStatus.FAILED, "Error on AM")
    }

    override fun onContainersCompleted(statuses: MutableList<ContainerStatus>?) {
        for (status in statuses!!) {
            if (status.state == ContainerState.COMPLETE) {

                val containerId = status.containerId.containerId
                val task = containersIdsToTask[containerId]
                rmClient.releaseAssignedContainer(status.containerId)

                val taskId = task!!.id
                if (status.exitStatus == 0) {

                    //completedContainersAndTaskIds.put(containerId, task.id)
                    jobManager.actionComplete(taskId)
                    notifier.info("Container $containerId Complete with task $taskId with success.")
                } else {
                    // TODO: Check the getDiagnostics value and see if appropriate
                    jobManager.actionFailed(taskId, status.diagnostics)
                    notifier.error("Container $containerId Complete with task $taskId with Failed status code (${status.exitStatus})", status.diagnostics)
                }
            }
        }

        if (jobManager.outOfActions) {
            log.info("Finished all tasks successfully! Wow!")
            jobManager.actionsCount()
            stopApplication(FinalApplicationStatus.SUCCEEDED, "SUCCESS")
        } else {
            log.info("jobManager.registeredActions.size: ${jobManager.registeredActions.size}; completedContainersAndTaskIds.size: ${completedContainersAndTaskIds.size}")
        }
    }

    private fun stopApplication(finalApplicationStatus: FinalApplicationStatus, appMessage: String) {

        try {
            rmClient.unregisterApplicationMaster(finalApplicationStatus, appMessage, null)
        } catch (ex: YarnException) {

            log.error("Failed to unregister application", ex)
        } catch (e: IOException) {
            log.error("Failed to unregister application", e)
        }
        rmClient.stop()
        nmClient.stop()
    }

    companion object {
        @JvmStatic
        fun main(args: Array<String>) = AppMasterArgsParser().main(args)

    }
}
