blob: 13387f55fc270035986a0681442147ee2645564d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.openwhisk.core.database.test
import java.io.File
import java.time.Instant
import scala.concurrent.duration._
import scala.language.implicitConversions
import org.junit.runner.RunWith
import org.scalatest.FlatSpec
import org.scalatest.Matchers
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.junit.JUnitRunner
import akka.http.scaladsl.model.StatusCodes
import common.StreamLogging
import common.TestUtils
import common.WaitFor
import common.WhiskProperties
import common.WskActorSystem
import spray.json._
import spray.json.DefaultJsonProtocol._
@RunWith(classOf[JUnitRunner])
class ReplicatorTests
extends FlatSpec
with Matchers
with ScalaFutures
with WskActorSystem
with WaitFor
with StreamLogging
with DatabaseScriptTestUtils {
val testDbPrefix = s"replicatortest_$dbPrefix"
val replicatorClient =
new ExtendedCouchDbRestClient(dbProtocol, dbHost, dbPort.toInt, dbUsername, dbPassword, "_replicator")
replicatorClient.createDb().futureValue
val replicator = WhiskProperties.getFileRelativeToWhiskHome("tools/db/replicateDbs.py").getAbsolutePath
val designDocPath =
WhiskProperties.getFileRelativeToWhiskHome("ansible/files/filter_design_document.json").getAbsolutePath
implicit def toDuration(dur: FiniteDuration) = java.time.Duration.ofMillis(dur.toMillis)
def toEpochSeconds(i: Instant) = i.toEpochMilli / 1000
/** Removes a document from the _replicator-database */
def removeReplicationDoc(id: String) = {
println(s"Removing replication doc: $id")
val response = replicatorClient.getDoc(id).futureValue
val rev = response.right.toOption.map(_.fields("_rev").convertTo[String])
rev.map(replicatorClient.deleteDoc(id, _).futureValue)
}
/** Runs the replicator script to replicate databases */
def runReplicator(sourceDbUrl: DatabaseUrl,
targetDbUrl: DatabaseUrl,
dbPrefix: String,
expires: FiniteDuration,
continuous: Boolean = false,
exclude: List[String] = List.empty,
excludeBaseName: List[String] = List.empty) = {
println(
s"Running replicator: ${sourceDbUrl.safeUrl}, ${targetDbUrl.safeUrl}, $dbPrefix, $expires, $continuous, $exclude, $excludeBaseName")
val continuousFlag = if (continuous) Some("--continuous") else None
val excludeFlag = Seq(exclude.mkString(",")).filter(_.nonEmpty).flatMap(ex => Seq("--exclude", ex))
val excludeBaseNameFlag =
Seq(excludeBaseName.mkString(",")).filter(_.nonEmpty).flatMap(ex => Seq("--excludeBaseName", ex))
val cmd = Seq(
python,
replicator,
"--sourceDbUrl",
sourceDbUrl.url,
"--targetDbUrl",
targetDbUrl.url,
"replicate",
"--dbPrefix",
dbPrefix,
"--expires",
expires.toSeconds.toString) ++ continuousFlag ++ excludeFlag ++ excludeBaseNameFlag
val rr = TestUtils.runCmd(0, new File("."), cmd: _*)
val Seq(created, deletedDoc, deleted) =
Seq("create backup: ", "deleting backup document: ", "deleting backup: ").map { prefix =>
rr.stdout.linesIterator.collect {
case line if line.startsWith(prefix) => line.replace(prefix, "")
}.toList
}
println(s"Created: $created")
println(s"DeletedDocs: $deletedDoc")
println(s"Deleted: $deleted")
(created, deletedDoc, deleted)
}
/** Runs the replicator script to replay databases */
def runReplay(sourceDbUrl: DatabaseUrl, targetDbUrl: DatabaseUrl, dbPrefix: String) = {
println(s"Running replay: ${sourceDbUrl.safeUrl}, ${targetDbUrl.safeUrl}, $dbPrefix")
val rr = TestUtils.runCmd(
0,
new File("."),
WhiskProperties.python,
WhiskProperties.getFileRelativeToWhiskHome("tools/db/replicateDbs.py").getAbsolutePath,
"--sourceDbUrl",
sourceDbUrl.url,
"--targetDbUrl",
targetDbUrl.url,
"replay",
"--dbPrefix",
dbPrefix)
val line = """([\w-]+) -> ([\w-]+) \(([\w-]+)\)""".r.unanchored
val replays = rr.stdout.linesIterator.collect {
case line(backup, target, id) => (backup, target, id)
}.toList
println(s"Replays created: $replays")
replays
}
/** Wait for a replication to finish */
def waitForReplication(dbName: String) = {
val timeout = 5.minutes
val replicationResult = waitfor(() => {
val replicatorDoc = replicatorClient.getDoc(dbName).futureValue
replicatorDoc shouldBe 'right
val state = replicatorDoc.right.get.fields.get("_replication_state")
println(s"Waiting for replication, state: $state")
state.contains("completed".toJson)
}, totalWait = timeout)
assert(replicationResult, s"replication did not finish in $timeout")
}
/** Compares to databases to full equality */
def compareDatabases(sourceDb: String, targetDb: String, filterUsed: Boolean) = {
val originalDocs = getAllDocs(sourceDb)
val replicatedDocs = getAllDocs(targetDb)
if (!filterUsed) {
replicatedDocs shouldBe originalDocs
} else {
val filteredReplicatedDocs = replicatedDocs.fields("rows").convertTo[List[JsObject]]
val filteredOriginalDocs = originalDocs
.fields("rows")
.convertTo[List[JsObject]]
.filterNot(_.fields("id").convertTo[String].startsWith("_design/"))
filteredReplicatedDocs shouldBe filteredOriginalDocs
}
}
// Do cleanups of possibly existing databases
val dbs = replicatorClient.dbs().futureValue
dbs shouldBe 'right
dbs.right.get.filter(dbname => dbname.contains(testDbPrefix)).map(dbName => removeDatabase(dbName))
val docs = replicatorClient.getAllDocs().futureValue
docs shouldBe 'right
docs.right.get
.fields("rows")
.convertTo[List[JsObject]]
.filter(_.fields("id").convertTo[String].contains(testDbPrefix))
.map(doc =>
replicatorClient
.deleteDoc(doc.fields("id").convertTo[String], doc.fields("value").asJsObject.fields("rev").convertTo[String]))
behavior of "Database replication script"
it should "replicate a database (snapshot)" in {
// Create a database to backup
val dbName = testDbPrefix + "database_for_single_replication"
val client = createDatabase(dbName, Some(designDocPath))
println(s"Creating testdocument")
val testDocument = JsObject("testKey" -> "testValue".toJson)
client.putDoc("testId", testDocument).futureValue
// Trigger replication and verify the created databases have the correct format
val (createdBackupDbs, _, _) = runReplicator(dbUrl, dbUrl, testDbPrefix, 10.minutes)
createdBackupDbs should have size 1
val backupDbName = createdBackupDbs.head
backupDbName should fullyMatch regex s"backup_\\d+_$dbName"
// Wait for the replication to finish
waitForReplication(backupDbName)
// Verify the replicated database is equal to the original database
compareDatabases(dbName, backupDbName, filterUsed = true)
// Remove all created databases
createdBackupDbs.foreach(removeDatabase(_))
createdBackupDbs.foreach(removeReplicationDoc)
removeDatabase(dbName)
}
it should "do not replicate a database that is excluded" in {
// Create a database to backup
val dbNameToBackup = testDbPrefix + "database_for_single_replication_with_exclude"
val nExClient = createDatabase(dbNameToBackup, Some(designDocPath))
val excludedName = "some_excluded_name"
val exClient = createDatabase(testDbPrefix + excludedName, Some(designDocPath))
// Trigger replication and verify the created databases have the correct format
val (createdBackupDbs, _, _) = runReplicator(dbUrl, dbUrl, testDbPrefix, 10.minutes, exclude = List(excludedName))
createdBackupDbs should have size 1
val backupDbName = createdBackupDbs.head
backupDbName should fullyMatch regex s"backup_\\d+_$dbNameToBackup"
// Wait for the replication to finish
waitForReplication(backupDbName)
// Remove all created databases
createdBackupDbs.foreach(removeDatabase(_))
createdBackupDbs.foreach(removeReplicationDoc)
removeDatabase(dbNameToBackup)
removeDatabase(testDbPrefix + excludedName)
}
it should "not replicate a database that basename is excluded" in {
// Create a database to backup
val dbNameToBackup = testDbPrefix + "database_for_single_replication_with_exclude_basename"
createDatabase(dbNameToBackup, Some(designDocPath))
val excludedName = "some_excluded_name"
createDatabase(testDbPrefix + excludedName + "-postfix123", Some(designDocPath))
// Trigger replication and verify the created databases have the correct format
val (createdBackupDbs, _, _) =
runReplicator(dbUrl, dbUrl, testDbPrefix, 10.minutes, excludeBaseName = List(excludedName))
createdBackupDbs should have size 1
val backupDbName = createdBackupDbs.head
backupDbName should fullyMatch regex s"backup_\\d+_$dbNameToBackup"
// Wait for the replication to finish
waitForReplication(backupDbName)
// Remove all created databases
createdBackupDbs.foreach(removeDatabase(_))
createdBackupDbs.foreach(removeReplicationDoc)
removeDatabase(dbNameToBackup)
removeDatabase(testDbPrefix + excludedName + "-postfix123")
}
it should "replicate a database (snapshot) even if the filter is not available" in {
// Create a db to backup
val dbName = testDbPrefix + "database_for_snapshout_without_filter"
val client = createDatabase(dbName, None)
println("Creating testdocuments")
val testDocuments = Seq(
JsObject("testKey" -> "testValue".toJson, "_id" -> "doc1".toJson),
JsObject("testKey" -> "testValue".toJson, "_id" -> "_design/doc1".toJson))
val documents = testDocuments.map { doc =>
val res = client.putDoc(doc.fields("_id").convertTo[String], doc).futureValue
res shouldBe 'right
res.right.get
}
// Trigger replication and verify the created databases have the correct format
val (createdBackupDbs, _, _) = runReplicator(dbUrl, dbUrl, testDbPrefix, 10.minutes)
createdBackupDbs should have size 1
val backupDbName = createdBackupDbs.head
backupDbName should fullyMatch regex s"backup_\\d+_$dbName"
// Wait for the replication to finish
waitForReplication(backupDbName)
// Verify the replicated database is equal to the original database
compareDatabases(dbName, backupDbName, filterUsed = false)
// Remove all created databases
createdBackupDbs.foreach(removeDatabase(_))
createdBackupDbs.foreach(removeReplicationDoc)
removeDatabase(dbName)
}
it should "replicate a database (snapshot) and deleted documents and design documents should not be in the snapshot" in {
// Create a database to backup
val dbName = testDbPrefix + "database_for_single_replication_design_and_deleted_docs"
val client = createDatabase(dbName, Some(designDocPath))
println(s"Creating testdocument")
val testDocuments = Seq(
JsObject("testKey" -> "testValue".toJson, "_id" -> "doc1".toJson),
JsObject("testKey" -> "testValue".toJson, "_id" -> "doc2".toJson),
JsObject("testKey" -> "testValue".toJson, "_id" -> "_design/doc1".toJson))
val documents = testDocuments.map { doc =>
val res = client.putDoc(doc.fields("_id").convertTo[String], doc).futureValue
res shouldBe 'right
res.right.get
}
// Delete second document again
val indexOfDocumentToDelete = 1
val idOfDeletedDocument = documents(indexOfDocumentToDelete).fields("id").convertTo[String]
client.deleteDoc(idOfDeletedDocument, documents(indexOfDocumentToDelete).fields("rev").convertTo[String])
// Trigger replication and verify the created databases have the correct format
val (createdBackupDbs, _, _) = runReplicator(dbUrl, dbUrl, testDbPrefix, 10.minutes)
createdBackupDbs should have size 1
val backupDbName = createdBackupDbs.head
backupDbName should fullyMatch regex s"backup_\\d+_$dbName"
// Wait for the replication to finish
waitForReplication(backupDbName)
// Verify the replicated database is equal to the original database
compareDatabases(dbName, backupDbName, filterUsed = true)
// Check that deleted doc has not been copied to snapshot
val snapshotClient =
new ExtendedCouchDbRestClient(dbProtocol, dbHost, dbPort.toInt, dbUsername, dbPassword, backupDbName)
val snapshotResponse = snapshotClient.getAllDocs(keys = Some(List(idOfDeletedDocument))).futureValue
snapshotResponse shouldBe 'right
val results = snapshotResponse.right.get.fields("rows").convertTo[List[JsObject]]
results should have size 1
// If deleted doc would be in db, the document id and rev would have been returned
results.head shouldBe JsObject("key" -> idOfDeletedDocument.toJson, "error" -> "not_found".toJson)
// Remove all created databases
createdBackupDbs.foreach(removeDatabase(_))
createdBackupDbs.foreach(removeReplicationDoc)
removeDatabase(dbName)
}
it should "continuously update a database" in {
// Create a database to backup
val dbName = testDbPrefix + "database_for_continuous_replication"
val backupDbName = s"continuous_$dbName"
// Pre-test cleanup of previously created entities
removeDatabase(backupDbName, true)
removeReplicationDoc(backupDbName)
val client = createDatabase(dbName, Some(designDocPath))
// Trigger replication and verify the created databases have the correct format
val (createdBackupDbs, _, _) = runReplicator(dbUrl, dbUrl, testDbPrefix, 10.minutes, continuous = true)
createdBackupDbs should have size 1
createdBackupDbs.head shouldBe backupDbName
// Wait for the replicated database to appear
val backupClient = waitForDatabase(backupDbName)
// Create a document in the old database
println(s"Creating testdocument")
val docId = "testId"
val testDocument = JsObject("testKey" -> "testValue".toJson)
client.putDoc(docId, testDocument).futureValue
// Wait for the document to appear
waitForDocument(backupClient, docId)
// Verify the replicated database is equal to the original database
compareDatabases(backupDbName, dbName, filterUsed = false)
// Stop the replication
val replication = replicatorClient.getDoc(backupDbName).futureValue
replication shouldBe 'right
val replicationDoc = replication.right.get
replicatorClient.deleteDoc(
replicationDoc.fields("_id").convertTo[String],
replicationDoc.fields("_rev").convertTo[String])
// Remove all created databases
createdBackupDbs.foreach(removeDatabase(_))
createdBackupDbs.foreach(removeReplicationDoc)
removeDatabase(dbName)
}
it should "remove outdated databases and replicationDocs" in {
val now = Instant.now()
val expires = 10.minutes
println(s"Now is: ${toEpochSeconds(now)}")
// Create a database that is already expired
val expired = now.minus(expires + 5.minutes)
val expiredName = s"backup_${toEpochSeconds(expired)}_${testDbPrefix}expired_backup"
val expiredClient = createDatabase(expiredName, Some(designDocPath))
replicatorClient.putDoc(expiredName, JsObject("source" -> "".toJson, "target" -> "".toJson)).futureValue
// Create a database that is not yet expired
val notExpired = now.plus(expires - 5.minutes)
val notExpiredName = s"backup_${toEpochSeconds(notExpired)}_${testDbPrefix}notexpired_backup"
val notExpiredClient = createDatabase(notExpiredName, Some(designDocPath))
replicatorClient.putDoc(notExpiredName, JsObject("source" -> "".toJson, "target" -> "".toJson)).futureValue
// Trigger replication and verify the expired database is deleted while the unexpired one is kept
val (createdDatabases, deletedReplicationDocs, deletedDatabases) =
runReplicator(dbUrl, dbUrl, testDbPrefix, expires)
deletedReplicationDocs should (contain(expiredName) and not contain notExpiredName)
deletedDatabases should (contain(expiredName) and not contain notExpiredName)
expiredClient.getAllDocs().futureValue shouldBe Left(StatusCodes.NotFound)
notExpiredClient.getAllDocs().futureValue shouldBe 'right
// Cleanup backup database
createdDatabases.foreach(removeDatabase(_))
createdDatabases.foreach(removeReplicationDoc)
removeReplicationDoc(notExpiredName)
removeDatabase(notExpiredName)
}
it should "not remove outdated databases with other prefix" in {
val now = Instant.now()
val expires = 10.minutes
println(s"Now is: ${toEpochSeconds(now)}")
val expired = now.minus(expires + 5.minutes)
// Create a database that is expired with correct prefix
val correctPrefixName = s"backup_${toEpochSeconds(expired)}_${testDbPrefix}expired_backup_correct_prefix"
val correctPrefixClient = createDatabase(correctPrefixName, Some(designDocPath))
replicatorClient.putDoc(correctPrefixName, JsObject("source" -> "".toJson, "target" -> "".toJson)).futureValue
// Create a database that is expired with wrong prefix
val wrongPrefix = s"replicatortest_wrongprefix_$dbPrefix"
val wrongPrefixName = s"backup_${toEpochSeconds(expired)}_${wrongPrefix}expired_backup_wrong_prefix"
val wrongPrefixClient = createDatabase(wrongPrefixName, Some(designDocPath))
replicatorClient.putDoc(wrongPrefixName, JsObject("source" -> "".toJson, "target" -> "".toJson)).futureValue
// Trigger replication and verify the expired database with correct prefix is deleted while the db with the wrong prefix is kept
val (createdDatabases, deletedReplicationDocs, deletedDatabases) =
runReplicator(dbUrl, dbUrl, testDbPrefix, expires)
deletedReplicationDocs should (contain(correctPrefixName) and not contain wrongPrefixName)
deletedDatabases should (contain(correctPrefixName) and not contain wrongPrefixName)
correctPrefixClient.getAllDocs().futureValue shouldBe Left(StatusCodes.NotFound)
wrongPrefixClient.getAllDocs().futureValue shouldBe 'right
// Cleanup backup database
createdDatabases.foreach(removeDatabase(_))
createdDatabases.foreach(removeReplicationDoc)
removeReplicationDoc(wrongPrefixName)
removeDatabase(wrongPrefixName)
}
it should "replay a database" in {
val now = Instant.now()
val dbName = testDbPrefix + "database_to_be_restored"
val backupPrefix = s"backup_${toEpochSeconds(now)}_"
val backupDbName = backupPrefix + dbName
// Create a database that looks like a backup
val backupClient = createDatabase(backupDbName, Some(designDocPath))
println(s"Creating testdocument")
backupClient.putDoc("testId", JsObject("testKey" -> "testValue".toJson)).futureValue
// Run the replay script
val (_, _, replicationId) = runReplay(dbUrl, dbUrl, backupPrefix).head
// Wait for the replication to finish
waitForReplication(replicationId)
// Verify the replicated database is equal to the original database
compareDatabases(backupDbName, dbName, filterUsed = false)
// Cleanup databases
removeReplicationDoc(replicationId)
removeDatabase(backupDbName)
removeDatabase(dbName)
}
}