blob: 5a54f95a6c9c400dfd79a5e15970a0a26470e6a4 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.openwhisk.standalone
import java.io.File
import java.net.URLEncoder
import java.nio.charset.StandardCharsets.UTF_8
import akka.Done
import akka.actor.ActorSystem
import akka.http.scaladsl.model.headers.{Accept, Authorization, BasicHttpCredentials}
import akka.http.scaladsl.model.{HttpHeader, HttpMethods, HttpRequest, MediaTypes, StatusCode, StatusCodes, Uri}
import akka.http.scaladsl.unmarshalling.Unmarshal
import com.typesafe.config.ConfigFactory
import org.apache.commons.io.IOUtils
import org.apache.openwhisk.common.{Logging, TransactionId}
import org.apache.openwhisk.core.database.CouchDbRestClient
import org.apache.openwhisk.http.PoolingRestClient
import org.apache.openwhisk.http.PoolingRestClient._
import org.apache.openwhisk.standalone.StandaloneDockerSupport.{containerName, createRunCmd}
import pureconfig._
import pureconfig.generic.auto._
import spray.json.DefaultJsonProtocol._
import spray.json._
import scala.concurrent.{ExecutionContext, Future}
/**
 * Starts a CouchDB Docker container for standalone mode and prepares it for use.
 *
 * The launcher runs the configured image with the host `port` mapped to container port 5984, waits for
 * the server to respond, creates the `_users` database plus the subjects/whisks/activations databases
 * (names are prefixed with the configured `prefix`), uploads the configured view documents from classpath
 * resources under `/couch/`, and finally publishes the connection settings as `whisk.couchdb.*` system
 * properties.
 *
 * @param docker  client used to start the container
 * @param port    host port on which CouchDB is exposed
 * @param dataDir host directory mounted at `/opt/couchdb/data` when volumes are enabled in the config
 */
class CouchDBLauncher(docker: StandaloneDockerClient, port: Int, dataDir: File)(implicit logging: Logging,
                                                                                 ec: ExecutionContext,
                                                                                 actorSystem: ActorSystem,
                                                                                 tid: TransactionId) {

  /**
   * Launcher settings, loaded from the config section named by `StandaloneConfigKeys.couchDBConfigKey`.
   * The three `*Views` lists hold resource file names (relative to `/couch/`) of the JSON documents
   * uploaded to the respective database.
   */
  case class CouchDBConfig(image: String,
                           user: String,
                           password: String,
                           prefix: String,
                           volumesEnabled: Boolean,
                           subjectViews: List[String],
                           whiskViews: List[String],
                           activationViews: List[String])

  private val dbConfig = loadConfigOrThrow[CouchDBConfig](StandaloneConfigKeys.couchDBConfigKey)

  // Client used for database-level requests (existence check / creation); document-level requests go
  // through a per-database client created by createDbClient.
  private val couchClient = new PoolingRestClient("http", StandaloneDockerSupport.getLocalHostName(), port, 100)

  private val baseHeaders: List[HttpHeader] =
    List(Authorization(BasicHttpCredentials(dbConfig.user, dbConfig.password)), Accept(MediaTypes.`application/json`))

  private val subjectDb = dbConfig.prefix + "subjects"
  private val activationsDb = dbConfig.prefix + "activations"
  private val whisksDb = dbConfig.prefix + "whisks"

  // Classpath folder holding the JSON documents referenced by the *Views config entries.
  private val resourcePrefix = "couch"

  /**
   * Starts CouchDB, waits until it is reachable, ensures all databases and view documents exist and then
   * exposes the connection settings via system properties.
   *
   * @return the [[ServiceContainer]] describing the started CouchDB instance; the future fails if any
   *         of the setup steps fails
   */
  def run(): Future[ServiceContainer] = {
    // File.list() returns null when the path does not exist or is not a directory; treat that as "empty"
    // instead of failing with a NullPointerException.
    val hasExistingData = Option(dataDir.list()).exists(_.nonEmpty)
    if (hasExistingData) {
      logging.info(this, s"Using pre-existing database from ${dataDir.getAbsolutePath}")
    } else {
      logging.info(this, s"Creating new database at ${dataDir.getAbsolutePath}")
    }

    for {
      (_, dbSvcs) <- runCouch()
      _ <- waitForCouchDB()
      _ <- createDbIfNotExist("_users")
      _ <- createDbWithViews(subjectDb, dbConfig.subjectViews)
      _ <- createDbWithViews(whisksDb, dbConfig.whiskViews)
      _ <- createDbWithViews(activationsDb, dbConfig.activationViews)
    } yield {
      // Only publish the settings once every database is in place.
      updateConfig()
      // NOTE: this intentionally prints the configured credentials so the user can log into the UI.
      logging.info(
        this,
        s"CouchDB started successfully at http://${StandaloneDockerSupport
          .getLocalHostName()}:$port/_utils . Username: [${dbConfig.user}], Password: [${dbConfig.password}]")
      dbSvcs
    }
  }

  /**
   * Starts the CouchDB container in detached mode.
   *
   * @return the started container together with the [[ServiceContainer]] descriptor built for it
   */
  def runCouch(): Future[(StandaloneDockerContainer, ServiceContainer)] = {
    logging.info(this, s"Starting CouchDB at $port")
    val baseParams = Map("-p" -> Set(s"$port:5984"))
    val params =
      if (dbConfig.volumesEnabled) baseParams + ("-v" -> Set(s"${dataDir.getAbsolutePath}:/opt/couchdb/data"))
      else baseParams
    val env = Map("COUCHDB_USER" -> dbConfig.user, "COUCHDB_PASSWORD" -> dbConfig.password)
    val name = containerName("couch")
    val args = createRunCmd(name, env, params)
    val f = docker.runDetached(dbConfig.image, args, shouldPull = true)
    val sc = ServiceContainer(
      port,
      s"http://localhost:$port/_utils",
      s"$name, Username: [${dbConfig.user}], Password: [${dbConfig.password}]")
    f.map(c => (c, sc))
  }

  /**
   * Waits for the CouchDB `_utils/` endpoint to come up.
   *
   * The startup check is evaluated inside a `Future` so that a failure is reported through the returned
   * future rather than thrown at the caller. The call is marked as `blocking` because it is presumed to
   * wait synchronously (its result is not a value this method could chain on) -- TODO confirm against
   * ServerStartupCheck.
   */
  def waitForCouchDB(): Future[Done] = Future {
    scala.concurrent.blocking {
      new ServerStartupCheck(Uri(s"http://${StandaloneDockerSupport.getLocalHostName()}:$port/_utils/"), "CouchDB")
        .waitForServerToStart()
    }
    Done
  }

  /** Ensures the database exists and then uploads the given view documents to it. */
  private def createDbWithViews(dbName: String, views: List[String]): Future[Done] = {
    for {
      _ <- createDbIfNotExist(dbName)
      _ <- createDocs(views, dbName)
    } yield Done
  }

  /** Creates the database unless a HEAD request shows that it is already present. */
  private def createDbIfNotExist(dbName: String): Future[Done] =
    doesDbExist(dbName).flatMap {
      case true  => Future.successful(Done)
      case false => createDb(dbName)
    }

  /**
   * Checks for the database with a HEAD request.
   *
   * @return `true` on a successful response, `false` on 404; any other status fails the future with an
   *         `IllegalStateException`
   */
  private def doesDbExist(dbName: String): Future[Boolean] = {
    requestString(mkRequest(HttpMethods.HEAD, uri(dbName), headers = baseHeaders))
      .map {
        case Right(_)                   => true
        case Left(StatusCodes.NotFound) => false
        case Left(s) =>
          throw new IllegalStateException(s"Unknown status code while checking db $dbName: $s")
      }
  }

  /** Creates the database with a PUT request; any non-success status fails the future. */
  private def createDb(dbName: String): Future[Done] = {
    requestString(mkRequest(HttpMethods.PUT, uri(dbName), headers = baseHeaders))
      .map {
        case Right(_) => Done
        case Left(s) =>
          throw new IllegalStateException(s"Unknown status code while creating db $dbName: $s")
      }
  }

  /**
   * Uploads the JSON documents stored in the given resource files to the database.
   *
   * @param jsonFiles resource file names relative to `/couch/`
   * @param dbName    target database
   */
  private def createDocs(jsonFiles: List[String], dbName: String): Future[Done] = {
    // Load and parse every resource before a client is created, so that a missing or malformed
    // resource cannot leave an open client behind.
    val docs = jsonFiles.map { p =>
      p -> IOUtils.resourceToString(s"/$resourcePrefix/$p", UTF_8).parseJson.asJsObject
    }

    val client = createDbClient(dbName)
    val uploads = docs.map {
      case (p, js) =>
        logging.info(this, s"Creating view doc from file $p for db $dbName")
        createDocIfRequired(js, dbName, client)
    }

    // andThen runs for successful and failed uploads alike, so the client is always released.
    Future
      .sequence(uploads)
      .andThen { case _ => client.shutdown() }
      .map(_ => Done)
  }

  /**
   * Creates the document if it is absent, or updates it if the stored version differs from `doc`.
   * The document must carry an `_id` field; a stored document is expected to carry `_rev`.
   */
  private def createDocIfRequired(doc: JsObject, dbName: String, client: CouchDbRestClient): Future[Done] = {
    val id = doc.fields("_id").convertTo[String]
    getDoc(id, client).flatMap {
      case Some(existing) =>
        val rev = existing.fields("_rev").convertTo[String]
        // The resource file has no revision; add the stored one so both sides are comparable.
        val docWithRev = JsObject(doc.fields + ("_rev" -> JsString(rev)))
        if (docWithRev != existing) {
          logging.info(this, s"Updating doc $id for db $dbName")
          createDoc(id, Some(rev), doc, client)
        } else Future.successful(Done)
      case None =>
        logging.info(this, s"Creating doc $id for db $dbName")
        createDoc(id, None, doc, client)
    }
  }

  /** Fetches a document; `None` on 404, failed future on any other non-success status. */
  private def getDoc(id: String, client: CouchDbRestClient): Future[Option[JsObject]] = {
    client.getDoc(id).map {
      case Right(js)                  => Some(js)
      case Left(StatusCodes.NotFound) => None
      case Left(s) =>
        throw new IllegalStateException(s"Unknown status code while fetching doc $id: $s")
    }
  }

  /** Stores the document, passing the revision along when an existing document is being replaced. */
  private def createDoc(id: String, rev: Option[String], doc: JsObject, client: CouchDbRestClient): Future[Done] = {
    val f = rev match {
      case Some(r) => client.putDoc(id, r, doc)
      case None    => client.putDoc(id, doc)
    }
    f.map {
      case Right(_) => Done
      case Left(s) =>
        throw new IllegalStateException(s"Unknown status code while creating doc $id: $s")
    }
  }

  // URL-encodes every segment (so a slash inside a segment does not split the path) and joins them
  // into an absolute path.
  private def uri(segments: Any*): Uri = {
    val encodedSegments = segments.map(s => URLEncoder.encode(s.toString, UTF_8.name))
    Uri(s"/${encodedSegments.mkString("/")}")
  }

  /** Creates a document-level client for the given database; callers are responsible for shutting it down. */
  private def createDbClient(dbName: String): CouchDbRestClient =
    new NonEscapingClient(
      "http",
      StandaloneDockerSupport.getLocalHostName(),
      port,
      dbConfig.user,
      dbConfig.password,
      dbName)(actorSystem, logging)

  private def updateConfig(): Unit = {
    // The config needs to be pushed via system properties and then the Typesafe ConfigFactory cache
    // should be purged such that the config gets read again and hence picks up these system properties
    setp("host", StandaloneDockerSupport.getLocalHostName())
    setp("port", port.toString)
    setp("password", dbConfig.password)
    setp("username", dbConfig.user)
    setp("protocol", "http")
    setp("provider", "CouchDB")
    setp("databases.WhiskActivation", activationsDb)
    setp("databases.WhiskAuth", subjectDb)
    setp("databases.WhiskEntity", whisksDb)
    System.setProperty("whisk.spi.ArtifactStoreProvider", "org.apache.openwhisk.core.database.CouchDbStoreProvider")
    ConfigFactory.invalidateCaches()
    logging.info(this, "Invalidated config cached")
  }

  /** Sets the system property `whisk.couchdb.<key>`. */
  private def setp(key: String, value: String): Unit = {
    System.setProperty(s"whisk.couchdb.$key", value)
  }

  /**
   * This is similar to PoolingRestClient#requestJson just that here we materialize to String, as some of
   * the db related operations return with an empty body.
   *
   * @return `Right(body)` for a successful status; otherwise `Left` with a status code that keeps the
   *         original int value and carries the response body (if any) in its reason
   */
  private def requestString(futureRequest: Future[HttpRequest]): Future[Either[StatusCode, String]] = {
    couchClient.request(futureRequest).flatMap { response =>
      if (response.status.isSuccess) {
        Unmarshal(response.entity.withoutSizeLimit).to[String].map(Right.apply)
      } else {
        Unmarshal(response.entity).to[String].flatMap { body =>
          val statusCode = response.status
          val reason =
            if (body.nonEmpty) s"${statusCode.reason} (details: $body)" else statusCode.reason
          val customStatusCode = StatusCodes
            .custom(intValue = statusCode.intValue, reason = reason, defaultMessage = statusCode.defaultMessage)
          // This is important, as it drains the entity stream.
          // Otherwise the connection stays open and the pool dries up.
          response.discardEntityBytes().future.map(_ => Left(customStatusCode))
        }
      }
    }
  }
}
/**
 * A [[CouchDbRestClient]] whose request paths are assembled from the raw segments, without any
 * escaping, so that ids containing a slash are sent unchanged.
 */
private class NonEscapingClient(protocol: String,
                                host: String,
                                port: Int,
                                username: String,
                                password: String,
                                db: String)(implicit system: ActorSystem, logging: Logging)
    extends CouchDbRestClient(protocol, host, port, username, password, db) {

  // Design doc ids such as _design/subjects must keep their slash, hence the segments are joined verbatim.
  override protected def uri(segments: Any*): Uri = {
    val rawPath = segments.mkString("/", "/", "")
    Uri(rawPath)
  }
}