/*
 * Copyright 2015-2016 IBM Corporation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package whisk.core.container

import whisk.common.Logging
import whisk.common.SimpleExec
import whisk.common.TransactionId
import whisk.core.entity.ActionLimits
import java.io.File
import scala.util.Try
import scala.language.postfixOps
import whisk.common.LoggingMarkers
import akka.event.Logging.ErrorLevel
import whisk.common.PrintStreamEmitter

/**
 * Information from docker ps.
 */
case class ContainerState(id: String, image: String, name: String)

trait ContainerUtils extends Logging {

    /** Defines the docker host, optional **/
    val dockerhost: String

    def makeEnvVars(env: Map[String, String]): Array[String] = {
        env.map {
            kv => s"-e ${kv._1}=${kv._2}"
        }.mkString(" ").split(" ").filter { x => x.nonEmpty }
    }

    /**
     * Creates a container instance and runs it.
     *
     * @param image the docker image to run
     * @return container id and container host
     */
    def bringup(name: Option[String], image: String, network: String, cpuShare:Int, env: Map[String, String], args: Array[String], limits: ActionLimits, policy: Option[String])(implicit transid: TransactionId): (ContainerId, ContainerIP) = {
        val id = makeContainer(name, image, network, cpuShare, env, args, limits, policy)
        val host = id.flatMap(_ => getContainerHostAndPort(name))
        (id, host)
    }

    /**
     * Pulls container images.
     */
    def pullImage(image: String)(implicit transid: TransactionId): DockerOutput = ContainerUtils.pullImage(dockerhost, image)

    /*
     * TODO: The file handle and process limits should be moved to some global limits config.
     */
    def makeContainer(name: Option[String], image: String, network: String, cpuShare: Int, env: Map[String, String], args: Array[String], limits: ActionLimits, policy: Option[String])(implicit transid: TransactionId): ContainerId = {
        val nameOption = name.map(n => Array("--name", n)).getOrElse(Array.empty[String])
        val cpuArg = Array("-c", cpuShare.toString)
        val memoryArg = Array("-m", s"${limits.memory()}m")
        val capabilityArg = Array("--cap-drop", "NET_RAW", "--cap-drop", "NET_ADMIN")
        val consulServiceIgnore = Array("-e", "SERVICE_IGNORE=true")
        val fileHandleLimit = Array("--ulimit", "nofile=64:64")
        val processLimit = Array("--ulimit", "nproc=512:512")
        val securityOpts = policy map { p => Array("--security-opt", s"apparmor:${p}") } getOrElse (Array.empty[String])
        val containerNetwork = Array("--net", network)
        val cmd = Array("run") ++ makeEnvVars(env) ++ consulServiceIgnore ++ nameOption ++ cpuArg ++ memoryArg ++
            capabilityArg ++ fileHandleLimit ++ processLimit ++ securityOpts ++ containerNetwork ++ Array("-d", image) ++ args
        runDockerCmd(cmd: _*)
    }

    def killContainer(name: String)(implicit transid: TransactionId): DockerOutput = killContainer(Some(name))

    def killContainer(container: ContainerName)(implicit transid: TransactionId): DockerOutput = {
        container flatMap { name => runDockerCmd("kill", name) }
    }

    def getContainerLogs(container: ContainerName)(implicit transid: TransactionId): DockerOutput = {
        container flatMap { name => runDockerCmd("logs", name) }
    }

    def pauseContainer(name: String)(implicit transid: TransactionId): DockerOutput = {
        runDockerCmd(true, Array("pause", name))
    }

    def unpauseContainer(name: String)(implicit transid: TransactionId): DockerOutput = {
        runDockerCmd(true, Array("unpause", name))
    }

    def rmContainer(container: String)(implicit transid: TransactionId): DockerOutput = rmContainer(Some(container))

    /**
     * Forcefully removes a container, can be used on a running container but not a paused one.
     */
    def rmContainer(container: ContainerName)(implicit transid: TransactionId): DockerOutput = {
        container flatMap { name => runDockerCmd("rm", "-f", name) }
    }

    /*
     * List containers (-a if all).
     */
    def listContainers(all: Boolean)(implicit transid: TransactionId): Array[ContainerState] = {
        val tmp = Array("ps", "--no-trunc")
        val cmd = if (all) tmp :+ "-a" else tmp
        runDockerCmd(cmd: _*) map { output =>
            val lines = output.split("\n").drop(1) // skip the header
            lines.map(parsePsOutput)
        } getOrElse Array()
    }

    def getDockerLogSize(containerId: String, mounted: Boolean)(implicit transid: TransactionId): Long = {
        try {
            getDockerLogFile(containerId, mounted).length
        } catch {
            case e: Exception =>
                error(this, s"getDockerLogSize failed on $containerId")
                0
        }
    }

    /**
     * Reads the contents of the file at the given position.
     * It is assumed that the contents does exist and that region is not changing concurrently.
     */
    def getDockerLogContent(containerId: String, start: Long, end: Long, mounted: Boolean)(implicit transid: TransactionId): Array[Byte] = {
        var fis: java.io.FileInputStream = null
        try {
            val file = getDockerLogFile(containerId, mounted)
            fis = new java.io.FileInputStream(file)
            val channel = fis.getChannel().position(start)
            var remain = (end - start).toInt
            val buffer = java.nio.ByteBuffer.allocate(remain)
            while (remain > 0) {
                val read = channel.read(buffer)
                if (read > 0)
                    remain = read - read.toInt
            }
            buffer.array
        } catch {
            case e: Exception =>
                error(this, s"getDockerLogContent failed on $containerId")
                Array()
        } finally {
            if (fis != null) fis.close()
        }

    }

    def getContainerHostAndPort(container: ContainerName)(implicit transid: TransactionId): ContainerIP = {
        container map { name =>
            runDockerCmd("inspect", "--format", "'{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}'", name) map {
                output => appendPort(output.substring(1, output.length - 1))
            }
        } getOrElse None
    }

    def runDockerCmd(args: String*)(implicit transid: TransactionId): DockerOutput = runDockerCmd(false, args)

    /**
     * Synchronously runs the given docker command returning stdout if successful.
     */
    def runDockerCmd(skipLogError: Boolean, args: Seq[String])(implicit transid: TransactionId): DockerOutput =
        ContainerUtils.runDockerCmd(dockerhost, skipLogError, args)(transid)

    // If running outside a container, then logs files are in docker's own
    // /var/lib/docker/containers.  If running inside a container, is mounted at /containers.
    // Root access is needed when running outside the container.
    private def dockerContainerDir(mounted: Boolean) = {
        if (mounted) "/containers" else "/var/lib/docker/containers"
    }

    /**
     * Gets the filename of the docker logs of other containers that is mapped back into the invoker.
     */
    private def getDockerLogFile(containerId: String, mounted: Boolean) = {
        new java.io.File(s"""${dockerContainerDir(mounted)}/$containerId/$containerId-json.log""").getCanonicalFile()
    }

    private def parsePsOutput(line: String) = {
        val tokens = line.split("\\s+")
        ContainerState(tokens(0), tokens(1), tokens(tokens.length - 1))
    }

    protected def appendPort(host: String) = s"$host:8080"
}

object ContainerUtils extends Logging {

    private implicit val emitter: PrintStreamEmitter = this

    /**
     * Synchronously runs the given docker command returning stdout if successful.
     */
    def runDockerCmd(dockerhost: String, skipLogError: Boolean, args: Seq[String])(implicit transid: TransactionId): DockerOutput = {
        val start = transid.started(this, LoggingMarkers.INVOKER_DOCKER_CMD(args(0)))
        getDockerCmd(dockerhost) map { _ ++ args } map {
            SimpleExec.syncRunCmd(_)(transid)
        } map {
            case (stdout, stderr, exitCode) =>
                if (exitCode == 0) {
                    transid.finished(this, start)
                    Some(stdout.trim)
                } else {
                    if (!skipLogError) {
                        transid.failed(this, start, s"stdout:\n$stdout\nstderr:\n$stderr", ErrorLevel)
                    } else {
                        transid.failed(this, start)
                    }
                    None
                }
        } getOrElse {
            transid.failed(this, start, "docker executable not found", ErrorLevel)
            None
        }
    }

    private def file(path: String) = Try { new File(path) } filter { _.exists } toOption

    def getDockerCmd(dockerhost: String) = {
        val dockerLoc = file("/usr/bin/docker") orElse file("/usr/local/bin/docker")
        if (dockerhost == "localhost") {
            dockerLoc map { f => Array(f.toString) }
        } else {
            dockerLoc map { f => Array(f.toString, "--host", s"tcp://$dockerhost") }
        }
    }

    /**
     * Pulls container images.
     */
    def pullImage(dockerhost: String, image: String)(implicit transid: TransactionId): DockerOutput = {
        val cmd = Array("pull", image)
        runDockerCmd(dockerhost, false, cmd)
    }

}
