blob: be1045933b6d9c567325a1ae1314abd3f9bbe323 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.openwhisk.core.database
import akka.actor.ActorSystem
import akka.event.Logging.ErrorLevel
import akka.http.scaladsl.model._
import akka.stream.scaladsl._
import akka.util.ByteString
import scala.concurrent.Await
import scala.concurrent.duration._
import spray.json._
import org.apache.openwhisk.common.{Logging, LoggingMarkers, MetricEmitter, TransactionId}
import org.apache.openwhisk.core.database.StoreUtils._
import org.apache.openwhisk.core.entity.Attachments.Attached
import org.apache.openwhisk.core.entity.{BulkEntityResult, DocInfo, DocumentReader, UUID}
import org.apache.openwhisk.http.Messages
import scala.concurrent.Future
import scala.util.Try
/**
* Basic client to put and delete artifacts in a data store.
*
* @param dbProtocol the protocol to access the database with (http/https)
* @param dbHost the host to access database from
* @param dbPort the port on the host
* @param dbUserName the user name to access database as
* @param dbPassword the secret for the user name required to access the database
* @param dbName the name of the database to operate on
* @param serializerEvidence confirms the document abstraction is serializable to a Document with an id
*/
class CouchDbRestStore[DocumentAbstraction <: DocumentSerializer](dbProtocol: String,
dbHost: String,
dbPort: Int,
dbUsername: String,
dbPassword: String,
dbName: String,
useBatching: Boolean = false,
val inliningConfig: InliningConfig,
val attachmentStore: Option[AttachmentStore])(
implicit system: ActorSystem,
val logging: Logging,
jsonFormat: RootJsonFormat[DocumentAbstraction],
docReader: DocumentReader)
extends ArtifactStore[DocumentAbstraction]
with DefaultJsonProtocol
with AttachmentSupport[DocumentAbstraction] {
protected[core] implicit val executionContext = system.dispatchers.lookup("dispatchers.couch-dispatcher")
private val couchScheme = "couch"
val attachmentScheme: String = attachmentStore.map(_.scheme).getOrElse(couchScheme)
private val client: CouchDbRestClient =
new CouchDbRestClient(dbProtocol, dbHost, dbPort.toInt, dbUsername, dbPassword, dbName)
// This the the amount of allowed parallel requests for each entity, before batching starts. If there are already maxOpenDbRequests
// and more documents need to be stored, then all arriving documents will be put into batches (if enabled) to avoid a long queue.
private val maxOpenDbRequests = system.settings.config.getInt("akka.http.host-connection-pool.max-connections") / 2
private val batcher: Batcher[JsObject, Either[ArtifactStoreException, DocInfo]] =
new Batcher(500, maxOpenDbRequests)(put(_)(TransactionId.dbBatcher))
override protected[database] def put(d: DocumentAbstraction)(implicit transid: TransactionId): Future[DocInfo] = {
val asJson = d.toDocumentRecord
val id: String = asJson.fields("_id").convertTo[String].trim
val rev: Option[String] = asJson.fields.get("_rev").map(_.convertTo[String])
require(!id.isEmpty, "document id must be defined")
val docinfoStr = s"id: $id, rev: ${rev.getOrElse("null")}"
val start = transid.started(this, LoggingMarkers.DATABASE_SAVE, s"[PUT] '$dbName' saving document: '${docinfoStr}'")
val f = if (useBatching) {
batcher.put(asJson).map { e =>
e match {
case Right(response) =>
transid.finished(this, start, s"[PUT] '$dbName' completed document: '${docinfoStr}', response: '$response'")
response
case Left(e: DocumentConflictException) =>
transid.finished(this, start, s"[PUT] '$dbName', document: '${docinfoStr}'; conflict.")
// For compatibility.
throw DocumentConflictException("conflict on 'put'")
case Left(e: ArtifactStoreException) =>
transid.finished(this, start, s"[PUT] '$dbName', document: '${docinfoStr}'; ${e.getMessage}.")
throw PutException("error on 'put'")
}
}
} else {
val request: CouchDbRestClient => Future[Either[StatusCode, JsObject]] = rev match {
case Some(r) =>
client =>
client.putDoc(id, r, asJson)
case None =>
client =>
client.putDoc(id, asJson)
}
request(client).map {
case Right(response) =>
transid.finished(this, start, s"[PUT] '$dbName' completed document: '${docinfoStr}', response: '$response'")
response.convertTo[DocInfo]
case Left(StatusCodes.Conflict) =>
transid.finished(this, start, s"[PUT] '$dbName', document: '${docinfoStr}'; conflict.")
// For compatibility.
throw DocumentConflictException("conflict on 'put'")
case Left(code) =>
transid.failed(
this,
start,
s"[PUT] '$dbName' failed to put document: '${docinfoStr}'; http status: '${code}'",
ErrorLevel)
throw new PutException("Unexpected http response code: " + code)
}
}
reportFailure(
f,
failure =>
transid.failed(this, start, s"[PUT] '$dbName' internal error, failure: '${failure.getMessage}'", ErrorLevel))
}
private def put(ds: Seq[JsObject])(
implicit transid: TransactionId): Future[Seq[Either[ArtifactStoreException, DocInfo]]] = {
val count = ds.size
val start = transid.started(this, LoggingMarkers.DATABASE_BULK_SAVE, s"'$dbName' saving $count documents")
MetricEmitter.emitHistogramMetric(LoggingMarkers.DATABASE_BATCH_SIZE, ds.size)
val f = client.putDocs(ds).map {
_ match {
case Right(response) =>
transid.finished(this, start, s"'$dbName' completed $count documents")
response.convertTo[Seq[BulkEntityResult]].map { singleResult =>
singleResult.error
.map {
case "conflict" => Left(DocumentConflictException("conflict on 'bulk_put'"))
case e => Left(PutException(s"Unexpected $e: ${singleResult.reason.getOrElse("")} on 'bulk_put'"))
}
.getOrElse {
Right(singleResult.toDocInfo)
}
}
case Left(code) =>
transid.failed(this, start, s"'$dbName' failed to put documents, http status: '${code}'", ErrorLevel)
throw new PutException("Unexpected http response code: " + code)
}
}
reportFailure(
f,
failure =>
transid.failed(this, start, s"[PUT] '$dbName' internal error, failure: '${failure.getMessage}'", ErrorLevel))
}
override protected[database] def del(doc: DocInfo)(implicit transid: TransactionId): Future[Boolean] = {
require(doc != null && doc.rev.asString != null, "doc revision required for delete")
val start = transid.started(this, LoggingMarkers.DATABASE_DELETE, s"[DEL] '$dbName' deleting document: '$doc'")
val f = client.deleteDoc(doc.id.id, doc.rev.rev).map { e =>
e match {
case Right(response) =>
transid.finished(this, start, s"[DEL] '$dbName' completed document: '$doc', response: $response")
response.fields("ok").convertTo[Boolean]
case Left(StatusCodes.NotFound) =>
transid.finished(this, start, s"[DEL] '$dbName', document: '${doc}'; not found.")
// for compatibility
throw NoDocumentException("not found on 'delete'")
case Left(StatusCodes.Conflict) =>
transid.finished(this, start, s"[DEL] '$dbName', document: '${doc}'; conflict.")
throw DocumentConflictException("conflict on 'delete'")
case Left(code) =>
transid.failed(
this,
start,
s"[DEL] '$dbName' failed to delete document: '${doc}'; http status: '${code}'",
ErrorLevel)
throw new DeleteException("Unexpected http response code: " + code)
}
}
reportFailure(
f,
failure =>
transid.failed(
this,
start,
s"[DEL] '$dbName' internal error, doc: '$doc', failure: '${failure.getMessage}'",
ErrorLevel))
}
override protected[database] def get[A <: DocumentAbstraction](doc: DocInfo,
attachmentHandler: Option[(A, Attached) => A] = None)(
implicit transid: TransactionId,
ma: Manifest[A]): Future[A] = {
val start = transid.started(this, LoggingMarkers.DATABASE_GET, s"[GET] '$dbName' finding document: '$doc'")
require(doc != null, "doc undefined")
val request: CouchDbRestClient => Future[Either[StatusCode, JsObject]] = if (doc.rev.rev != null) { client =>
client.getDoc(doc.id.id, doc.rev.rev)
} else { client =>
client.getDoc(doc.id.id)
}
val f = request(client).map { e =>
e match {
case Right(response) =>
transid.finished(this, start, s"[GET] '$dbName' completed: found document '$doc'")
val deserializedDoc = deserialize[A, DocumentAbstraction](doc, response)
attachmentHandler.map(processAttachments(deserializedDoc, response, _)).getOrElse(deserializedDoc)
case Left(StatusCodes.NotFound) =>
transid.finished(this, start, s"[GET] '$dbName', document: '${doc}'; not found.")
// for compatibility
throw NoDocumentException("not found on 'get'")
case Left(code) =>
transid.failed(this, start, s"[GET] '$dbName' failed to get document: '${doc}'; http status: '${code}'")
throw new GetException("Unexpected http response code: " + code)
}
} recoverWith {
case e: DeserializationException => throw DocumentUnreadable(Messages.corruptedEntity)
}
reportFailure(
f,
failure =>
transid.failed(
this,
start,
s"[GET] '$dbName' internal error, doc: '$doc', failure: '${failure.getMessage}'",
ErrorLevel))
}
override protected[core] def query(table: String,
startKey: List[Any],
endKey: List[Any],
skip: Int,
limit: Int,
includeDocs: Boolean,
descending: Boolean,
reduce: Boolean,
stale: StaleParameter)(implicit transid: TransactionId): Future[List[JsObject]] = {
require(!(reduce && includeDocs), "reduce and includeDocs cannot both be true")
require(skip >= 0, "skip should be non negative")
require(limit >= 0, "limit should be non negative")
// Apparently you have to do that in addition to setting "descending"
val (realStartKey, realEndKey) = if (descending) {
(endKey, startKey)
} else {
(startKey, endKey)
}
val Array(firstPart, secondPart) = table.split("/")
val start = transid.started(this, LoggingMarkers.DATABASE_QUERY, s"[QUERY] '$dbName' searching '$table")
val f = client
.executeView(firstPart, secondPart)(
startKey = realStartKey,
endKey = realEndKey,
skip = Some(skip),
limit = Some(limit),
stale = stale,
includeDocs = includeDocs,
descending = descending,
reduce = reduce)
.map {
case Right(response) =>
val rows = response.fields("rows").convertTo[List[JsObject]]
val out = if (reduce && !rows.isEmpty) {
assert(rows.length == 1, s"result of reduced view contains more than one value: '$rows'")
rows.head.fields("value").convertTo[List[JsObject]]
} else if (reduce) {
List(JsObject.empty)
} else {
rows
}
transid.finished(this, start, s"[QUERY] '$dbName' completed: matched ${out.size}")
out
case Left(code) =>
transid.failed(this, start, s"Unexpected http response code: $code", ErrorLevel)
throw new QueryException("Unexpected http response code: " + code)
}
reportFailure(
f,
failure =>
transid.failed(this, start, s"[QUERY] '$dbName' internal error, failure: '${failure.getMessage}'", ErrorLevel))
}
protected[core] def count(table: String, startKey: List[Any], endKey: List[Any], skip: Int, stale: StaleParameter)(
implicit transid: TransactionId): Future[Long] = {
require(skip >= 0, "skip should be non negative")
val Array(firstPart, secondPart) = table.split("/")
val start = transid.started(this, LoggingMarkers.DATABASE_QUERY, s"[COUNT] '$dbName' searching '$table")
val f = client
.executeView(firstPart, secondPart)(startKey = startKey, endKey = endKey, stale = stale, reduce = true)
.map {
case Right(response) =>
val rows = response.fields("rows").convertTo[List[JsObject]]
val out = if (rows.nonEmpty) {
assert(rows.length == 1, s"result of reduced view contains more than one value: '$rows'")
val count = rows.head.fields("value").convertTo[Long]
if (count > skip) count - skip else 0L
} else 0L
transid.finished(this, start, s"[COUNT] '$dbName' completed: count $out")
out
case Left(code) =>
transid.failed(this, start, s"Unexpected http response code: $code", ErrorLevel)
throw new QueryException("Unexpected http response code: " + code)
}
reportFailure(
f,
failure =>
transid.failed(this, start, s"[COUNT] '$dbName' internal error, failure: '${failure.getMessage}'", ErrorLevel))
}
override protected[database] def putAndAttach[A <: DocumentAbstraction](
doc: A,
update: (A, Attached) => A,
contentType: ContentType,
docStream: Source[ByteString, _],
oldAttachment: Option[Attached])(implicit transid: TransactionId): Future[(DocInfo, Attached)] = {
attachmentStore match {
case Some(as) =>
attachToExternalStore(doc, update, contentType, docStream, oldAttachment, as)
case None =>
attachToCouch(doc, update, contentType, docStream)
}
}
private def attachToCouch[A <: DocumentAbstraction](
doc: A,
update: (A, Attached) => A,
contentType: ContentType,
docStream: Source[ByteString, _])(implicit transid: TransactionId): Future[(DocInfo, Attached)] = {
if (maxInlineSize.toBytes == 0) {
val uri = uriFrom(scheme = attachmentScheme, path = UUID().asString)
for {
attached <- Future.successful(Attached(uri.toString, contentType))
i1 <- put(update(doc, attached))
i2 <- attach(i1, uri.path.toString, attached.attachmentType, docStream)
} yield (i2, attached)
} else {
for {
bytesOrSource <- inlineOrAttach(docStream)
uri = uriOf(bytesOrSource, UUID().asString)
attached <- {
val a = bytesOrSource match {
case Left(bytes) => Attached(uri.toString, contentType, Some(bytes.size), Some(digest(bytes)))
case Right(_) => Attached(uri.toString, contentType)
}
Future.successful(a)
}
i1 <- put(update(doc, attached))
i2 <- bytesOrSource match {
case Left(_) => Future.successful(i1)
case Right(s) => attach(i1, uri.path.toString, attached.attachmentType, s)
}
} yield (i2, attached)
}
}
private def attach(doc: DocInfo, name: String, contentType: ContentType, docStream: Source[ByteString, _])(
implicit transid: TransactionId): Future[DocInfo] = {
val start = transid.started(
this,
LoggingMarkers.DATABASE_ATT_SAVE,
s"[ATT_PUT] '$dbName' uploading attachment '$name' of document '$doc'")
require(doc != null, "doc undefined")
require(doc.rev.rev != null, "doc revision must be specified")
val f = client.putAttachment(doc.id.id, doc.rev.rev, name, contentType, docStream).map { e =>
e match {
case Right(response) =>
transid
.finished(this, start, s"[ATT_PUT] '$dbName' completed uploading attachment '$name' of document '$doc'")
val id = response.fields("id").convertTo[String]
val rev = response.fields("rev").convertTo[String]
DocInfo ! (id, rev)
case Left(StatusCodes.NotFound) =>
transid
.finished(this, start, s"[ATT_PUT] '$dbName' uploading attachment '$name' of document '$doc'; not found")
throw NoDocumentException("Not found on 'readAttachment'.")
case Left(code) =>
transid.failed(
this,
start,
s"[ATT_PUT] '$dbName' failed to upload attachment '$name' of document '$doc'; http status '$code'")
throw new PutException("Unexpected http response code: " + code)
}
}
reportFailure(
f,
failure =>
transid.failed(
this,
start,
s"[ATT_PUT] '$dbName' internal error, name: '$name', doc: '$doc', failure: '${failure.getMessage}'",
ErrorLevel))
}
override protected[core] def readAttachment[T](doc: DocInfo, attached: Attached, sink: Sink[ByteString, Future[T]])(
implicit transid: TransactionId): Future[T] = {
val name = attached.attachmentName
val attachmentUri = Uri(name)
attachmentUri.scheme match {
case AttachmentSupport.MemScheme =>
memorySource(attachmentUri).runWith(sink)
case s if s == couchScheme || attachmentUri.isRelative =>
//relative case is for compatibility with earlier naming approach where attachment name would be like 'jarfile'
//Compared to current approach of '<scheme>:<name>'
readAttachmentFromCouch(doc, attachmentUri, sink)
case s if attachmentStore.isDefined && attachmentStore.get.scheme == s =>
attachmentStore.get.readAttachment(doc.id, attachmentUri.path.toString, sink)
case _ =>
throw new IllegalArgumentException(s"Unknown attachment scheme in attachment uri $attachmentUri")
}
}
private def readAttachmentFromCouch[T](doc: DocInfo, attachmentUri: Uri, sink: Sink[ByteString, Future[T]])(
implicit transid: TransactionId): Future[T] = {
val name = attachmentUri.path
val start = transid.started(
this,
LoggingMarkers.DATABASE_ATT_GET,
s"[ATT_GET] '$dbName' finding attachment '$name' of document '$doc'")
require(doc != null, "doc undefined")
require(doc.rev.rev != null, "doc revision must be specified")
val g =
client
.getAttachment[T](doc.id.id, doc.rev.rev, attachmentUri.path.toString, sink)
.map {
case Right((_, result)) =>
transid.finished(this, start, s"[ATT_GET] '$dbName' completed: found attachment '$name' of document '$doc'")
result
case Left(StatusCodes.NotFound) =>
transid.finished(
this,
start,
s"[ATT_GET] '$dbName', retrieving attachment '$name' of document '$doc'; not found.")
throw NoDocumentException("Not found on 'readAttachment'.")
case Left(code) =>
transid.failed(
this,
start,
s"[ATT_GET] '$dbName' failed to get attachment '$name' of document '$doc'; http status: '$code'")
throw new GetException("Unexpected http response code: " + code)
}
reportFailure(
g,
failure =>
transid.failed(
this,
start,
s"[ATT_GET] '$dbName' internal error, name: '$name', doc: '$doc', failure: '${failure.getMessage}'",
ErrorLevel))
}
override protected[core] def deleteAttachments[T](doc: DocInfo)(implicit transid: TransactionId): Future[Boolean] =
attachmentStore
.map(as => as.deleteAttachments(doc.id))
.getOrElse(Future.successful(true)) // For CouchDB it is expected that the entire document is deleted.
override def shutdown(): Unit = {
Await.result(client.shutdown(), 30.seconds)
attachmentStore.foreach(_.shutdown())
}
private def processAttachments[A <: DocumentAbstraction](doc: A,
js: JsObject,
attachmentHandler: (A, Attached) => A): A = {
js.fields
.get("_attachments")
.map {
case JsObject(fields) if fields.size == 1 =>
val (name, value) = fields.head
value.asJsObject.getFields("content_type", "digest", "length") match {
case Seq(JsString(contentTypeValue), JsString(digest), JsNumber(length)) =>
val contentType = ContentType.parse(contentTypeValue) match {
case Right(ct) => ct
case Left(_) => ContentTypes.NoContentType //Should not happen
}
attachmentHandler(
doc,
Attached(getAttachmentName(name), contentType, Some(length.longValue), Some(digest)))
case x =>
throw DeserializationException("Attachment json does not have required fields" + x)
}
case x => throw DeserializationException("Multiple attachments found" + x)
}
.getOrElse(doc)
}
/**
* Determines if the attachment scheme confirms to new UUID based scheme or not
* and generates the name based on that
*/
private def getAttachmentName(name: String): String = {
Try(java.util.UUID.fromString(name))
.map(_ => uriFrom(scheme = attachmentScheme, path = name).toString)
.getOrElse(name)
}
private def reportFailure[T, U](f: Future[T], onFailure: Throwable => U): Future[T] = {
f.failed.foreach {
case _: ArtifactStoreException => // These failures are intentional and shouldn't trigger the catcher.
case x => onFailure(x)
}
f
}
}