| /* |
| * Copyright 2015-2016 IBM Corporation |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package whisk.core.container |
| |
| import whisk.common.Logging |
| import whisk.common.SimpleExec |
| import whisk.common.TransactionId |
| import whisk.core.entity.ActionLimits |
| import java.io.File |
| import scala.util.Try |
| import scala.language.postfixOps |
| import whisk.common.LoggingMarkers |
| import akka.event.Logging.ErrorLevel |
| import whisk.common.PrintStreamEmitter |
| |
/**
 * One row of `docker ps` output.
 *
 * @param id the container id column
 * @param image the image column
 * @param name the container name (the last column of the row)
 */
case class ContainerState(
    id: String,
    image: String,
    name: String)
| |
/**
 * Helpers that drive the docker command line client: creating, pausing, killing and removing
 * containers, listing them, and reading their json log files from disk.
 *
 * All docker invocations are delegated to `ContainerUtils.runDockerCmd` using `dockerhost`.
 */
trait ContainerUtils extends Logging {

    /** Defines the docker host, optional **/
    val dockerhost: String

    /**
     * Renders environment variables as docker command line arguments.
     *
     * Every entry contributes exactly two arguments: "-e" followed by "key=value". The pair is
     * never joined and re-split on whitespace, so a value (or key) containing spaces stays a
     * single argument.
     *
     * @param env the environment variables to pass to the container
     * @return the flat argument array, empty if env is empty
     */
    def makeEnvVars(env: Map[String, String]): Array[String] = {
        env.toSeq.flatMap { case (key, value) => Seq("-e", s"$key=$value") }.toArray
    }

    /**
     * Creates a container instance and runs it.
     *
     * The host is looked up by container name, and only if the container was created; it is
     * therefore None when the creation failed or when no name was given.
     *
     * @param image the docker image to run
     * @return container id and container host
     */
    def bringup(name: Option[String], image: String, network: String, cpuShare: Int, env: Map[String, String], args: Array[String], limits: ActionLimits, policy: Option[String])(implicit transid: TransactionId): (ContainerId, ContainerIP) = {
        val id = makeContainer(name, image, network, cpuShare, env, args, limits, policy)
        val host = id.flatMap(_ => getContainerHostAndPort(name))
        (id, host)
    }

    /**
     * Pulls container images.
     */
    def pullImage(image: String)(implicit transid: TransactionId): DockerOutput = ContainerUtils.pullImage(dockerhost, image)

    /**
     * Assembles and executes a detached `docker run` for the given image.
     *
     * Besides the caller supplied environment, name, cpu share, memory limit and network, the
     * command drops the NET_RAW and NET_ADMIN capabilities, sets SERVICE_IGNORE=true, limits
     * open files and processes via --ulimit, and applies an apparmor profile when a policy is given.
     *
     * TODO: The file handle and process limits should be moved to some global limits config.
     *
     * @return the output of the docker command if it succeeded
     */
    def makeContainer(name: Option[String], image: String, network: String, cpuShare: Int, env: Map[String, String], args: Array[String], limits: ActionLimits, policy: Option[String])(implicit transid: TransactionId): ContainerId = {
        val nameOption = name.map(n => Array("--name", n)).getOrElse(Array.empty[String])
        val cpuArg = Array("-c", cpuShare.toString)
        val memoryArg = Array("-m", s"${limits.memory()}m")
        val capabilityArg = Array("--cap-drop", "NET_RAW", "--cap-drop", "NET_ADMIN")
        val consulServiceIgnore = Array("-e", "SERVICE_IGNORE=true")
        val fileHandleLimit = Array("--ulimit", "nofile=64:64")
        val processLimit = Array("--ulimit", "nproc=512:512")
        val securityOpts = policy.map(p => Array("--security-opt", s"apparmor:${p}")).getOrElse(Array.empty[String])
        val containerNetwork = Array("--net", network)
        // docker options must precede the image; everything after the image is passed to the container
        val cmd = Array("run") ++ makeEnvVars(env) ++ consulServiceIgnore ++ nameOption ++ cpuArg ++ memoryArg ++
            capabilityArg ++ fileHandleLimit ++ processLimit ++ securityOpts ++ containerNetwork ++ Array("-d", image) ++ args
        runDockerCmd(cmd: _*)
    }

    /** Kills the named container. */
    def killContainer(name: String)(implicit transid: TransactionId): DockerOutput = killContainer(Some(name))

    /** Kills the container if a name is given, otherwise does nothing and yields None. */
    def killContainer(container: ContainerName)(implicit transid: TransactionId): DockerOutput = {
        container flatMap { name => runDockerCmd("kill", name) }
    }

    /** Runs `docker logs` for the container if a name is given, otherwise yields None. */
    def getContainerLogs(container: ContainerName)(implicit transid: TransactionId): DockerOutput = {
        container flatMap { name => runDockerCmd("logs", name) }
    }

    /** Pauses the named container; a failure of the docker command is reported without its output. */
    def pauseContainer(name: String)(implicit transid: TransactionId): DockerOutput = {
        runDockerCmd(true, Array("pause", name))
    }

    /** Unpauses the named container; a failure of the docker command is reported without its output. */
    def unpauseContainer(name: String)(implicit transid: TransactionId): DockerOutput = {
        runDockerCmd(true, Array("unpause", name))
    }

    /** Forcefully removes the named container. */
    def rmContainer(container: String)(implicit transid: TransactionId): DockerOutput = rmContainer(Some(container))

    /**
     * Forcefully removes a container, can be used on a running container but not a paused one.
     */
    def rmContainer(container: ContainerName)(implicit transid: TransactionId): DockerOutput = {
        container flatMap { name => runDockerCmd("rm", "-f", name) }
    }

    /**
     * List containers (-a if all).
     *
     * @return one ContainerState per output row, or an empty array if the docker command failed
     */
    def listContainers(all: Boolean)(implicit transid: TransactionId): Array[ContainerState] = {
        val tmp = Array("ps", "--no-trunc")
        val cmd = if (all) tmp :+ "-a" else tmp
        runDockerCmd(cmd: _*) map { output =>
            val lines = output.split("\n").drop(1) // skip the header
            lines.map(parsePsOutput)
        } getOrElse Array()
    }

    /**
     * Gets the current size in bytes of the container's json log file.
     *
     * @return the file length, or 0 if determining it raised an exception
     */
    def getDockerLogSize(containerId: String, mounted: Boolean)(implicit transid: TransactionId): Long = {
        try {
            getDockerLogFile(containerId, mounted).length
        } catch {
            case _: Exception =>
                error(this, s"getDockerLogSize failed on $containerId")
                0
        }
    }

    /**
     * Reads the contents of the file at the given position.
     * It is assumed that the contents does exist and that region is not changing concurrently.
     *
     * Reading continues until the requested region [start, end) is complete or the end of the
     * file is reached. If the file ends early, only the bytes actually read are returned.
     *
     * @return the bytes read, or an empty array if an exception was raised
     */
    def getDockerLogContent(containerId: String, start: Long, end: Long, mounted: Boolean)(implicit transid: TransactionId): Array[Byte] = {
        var fis: java.io.FileInputStream = null
        try {
            val file = getDockerLogFile(containerId, mounted)
            fis = new java.io.FileInputStream(file)
            val channel = fis.getChannel().position(start)
            // a negative size makes allocate throw, which is reported by the catch clause below
            val buffer = java.nio.ByteBuffer.allocate((end - start).toInt)
            var eof = false
            // a single read may deliver fewer bytes than requested; a negative count signals end of file
            while (buffer.hasRemaining() && !eof) {
                eof = channel.read(buffer) < 0
            }
            if (buffer.hasRemaining()) {
                // file ended before the region was complete: do not hand out the unfilled tail
                java.util.Arrays.copyOf(buffer.array, buffer.position())
            } else {
                buffer.array
            }
        } catch {
            case _: Exception =>
                error(this, s"getDockerLogContent failed on $containerId")
                Array.empty[Byte]
        } finally {
            if (fis != null) fis.close()
        }

    }

    /**
     * Looks up the IP address of the container via `docker inspect` and appends the port.
     *
     * @return "address:port" if a name is given and the docker command succeeded, else None
     */
    def getContainerHostAndPort(container: ContainerName)(implicit transid: TransactionId): ContainerIP = {
        container flatMap { name =>
            runDockerCmd("inspect", "--format", "'{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}'", name) map {
                // the format string carries literal single quotes which are stripped from the output here
                output => appendPort(output.substring(1, output.length - 1))
            }
        }
    }

    /** Runs the given docker command, reporting its output when it fails. */
    def runDockerCmd(args: String*)(implicit transid: TransactionId): DockerOutput = runDockerCmd(false, args)

    /**
     * Synchronously runs the given docker command returning stdout if successful.
     */
    def runDockerCmd(skipLogError: Boolean, args: Seq[String])(implicit transid: TransactionId): DockerOutput =
        ContainerUtils.runDockerCmd(dockerhost, skipLogError, args)(transid)

    // If running outside a container, then logs files are in docker's own
    // /var/lib/docker/containers. If running inside a container, is mounted at /containers.
    // Root access is needed when running outside the container.
    private def dockerContainerDir(mounted: Boolean) = {
        if (mounted) "/containers" else "/var/lib/docker/containers"
    }

    /**
     * Gets the filename of the docker logs of other containers that is mapped back into the invoker.
     */
    private def getDockerLogFile(containerId: String, mounted: Boolean) = {
        new java.io.File(s"""${dockerContainerDir(mounted)}/$containerId/$containerId-json.log""").getCanonicalFile()
    }

    /** Splits a `docker ps` row on whitespace; takes the first, second and last column. */
    private def parsePsOutput(line: String) = {
        val tokens = line.split("\\s+")
        ContainerState(tokens(0), tokens(1), tokens(tokens.length - 1))
    }

    /** Appends the fixed port 8080 to the host. */
    protected def appendPort(host: String) = s"$host:8080"
}
| |
/**
 * Static helpers for locating the docker executable and running docker commands
 * against a given docker host.
 */
object ContainerUtils extends Logging {

    private implicit val emitter: PrintStreamEmitter = this

    /**
     * Synchronously runs the given docker command returning stdout if successful.
     *
     * The first argument names the docker sub-command and is used for the log marker.
     * When the command exits with a non-zero code, its stdout and stderr are included in
     * the failure report unless skipLogError is set.
     *
     * @return the trimmed stdout on exit code 0, otherwise None (also when docker is not installed)
     */
    def runDockerCmd(dockerhost: String, skipLogError: Boolean, args: Seq[String])(implicit transid: TransactionId): DockerOutput = {
        val start = transid.started(this, LoggingMarkers.INVOKER_DOCKER_CMD(args(0)))
        getDockerCmd(dockerhost) match {
            case Some(docker) =>
                val (stdout, stderr, exitCode) = SimpleExec.syncRunCmd(docker ++ args)(transid)
                if (exitCode == 0) {
                    transid.finished(this, start)
                    Some(stdout.trim)
                } else {
                    if (skipLogError) {
                        transid.failed(this, start)
                    } else {
                        transid.failed(this, start, s"stdout:\n$stdout\nstderr:\n$stderr", ErrorLevel)
                    }
                    None
                }

            case None =>
                transid.failed(this, start, "docker executable not found", ErrorLevel)
                None
        }
    }

    /** Yields the file at the given path if it exists. */
    private def file(path: String): Option[File] = Try(new File(path)).filter(_.exists).toOption

    /**
     * Builds the command prefix for invoking docker: the path of the executable, followed by
     * a --host option unless the docker host is "localhost".
     *
     * @return the command prefix, or None if no docker executable exists at the known locations
     */
    def getDockerCmd(dockerhost: String): Option[Array[String]] = {
        val executable = file("/usr/bin/docker").orElse(file("/usr/local/bin/docker"))
        val hostArgs =
            if (dockerhost == "localhost") Array.empty[String]
            else Array("--host", s"tcp://$dockerhost")
        executable.map(f => Array(f.toString) ++ hostArgs)
    }

    /**
     * Pulls container images.
     */
    def pullImage(dockerhost: String, image: String)(implicit transid: TransactionId): DockerOutput =
        runDockerCmd(dockerhost, false, Array("pull", image))

}