/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.openwhisk.core.database.test

import java.io.File
import java.time.Instant

import scala.concurrent.duration._
import scala.language.implicitConversions

import org.junit.runner.RunWith
import org.scalatest.FlatSpec
import org.scalatest.Matchers
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.junit.JUnitRunner

import akka.http.scaladsl.model.StatusCodes
import common.StreamLogging
import common.TestUtils
import common.WaitFor
import common.WhiskProperties
import common.WskActorSystem
import spray.json._
import spray.json.DefaultJsonProtocol._

/**
 * Tests for the database replication script `tools/db/replicateDbs.py`.
 *
 * Every test runs the script as an external process and then inspects the resulting databases and
 * `_replicator` documents through REST clients. All databases created here carry `testDbPrefix` in
 * their name so that leftovers from an aborted run can be found and removed when the suite is
 * instantiated.
 */
@RunWith(classOf[JUnitRunner])
class ReplicatorTests
    extends FlatSpec
    with Matchers
    with ScalaFutures
    with WskActorSystem
    with WaitFor
    with StreamLogging
    with DatabaseScriptTestUtils {

  val testDbPrefix = s"replicatortest_$dbPrefix"

  // Client for the `_replicator` database; the database is created eagerly so the tests can rely on it.
  val replicatorClient =
    new ExtendedCouchDbRestClient(dbProtocol, dbHost, dbPort.toInt, dbUsername, dbPassword, "_replicator")
  replicatorClient.createDb().futureValue

  // Absolute paths of the script under test and of the design document holding the replication filter.
  val replicator = WhiskProperties.getFileRelativeToWhiskHome("tools/db/replicateDbs.py").getAbsolutePath
  val designDocPath =
    WhiskProperties.getFileRelativeToWhiskHome("ansible/files/filter_design_document.json").getAbsolutePath

  /** Lets a Scala `FiniteDuration` be used where a `java.time.Duration` is expected (e.g. `Instant.minus`). */
  implicit def toDuration(dur: FiniteDuration): java.time.Duration = java.time.Duration.ofMillis(dur.toMillis)

  /** Converts an instant to whole seconds since the epoch (milliseconds divided by 1000). */
  def toEpochSeconds(i: Instant): Long = i.toEpochMilli / 1000

  /**
   * Removes a document from the _replicator-database.
   *
   * The current revision is looked up first; if the document cannot be fetched nothing is deleted.
   *
   * @param id id of the replication document
   * @return the result of the delete request, or `None` if the document could not be fetched
   */
  def removeReplicationDoc(id: String) = {
    println(s"Removing replication doc: $id")
    val response = replicatorClient.getDoc(id).futureValue
    val rev = response.right.toOption.map(_.fields("_rev").convertTo[String])
    rev.map(replicatorClient.deleteDoc(id, _).futureValue)
  }

  /**
   * Runs the replicator script to replicate databases.
   *
   * @param sourceDbUrl url passed as `--sourceDbUrl`
   * @param targetDbUrl url passed as `--targetDbUrl`
   * @param dbPrefix value passed as `--dbPrefix`
   * @param expires value passed (in seconds) as `--expires`
   * @param continuous adds `--continuous` when true
   * @param exclude names passed comma-separated as `--exclude`; the flag is omitted if empty
   * @param excludeBaseName names passed comma-separated as `--excludeBaseName`; the flag is omitted if empty
   * @return the names the script reported as (created backups, deleted backup documents, deleted backups)
   */
  def runReplicator(sourceDbUrl: DatabaseUrl,
                    targetDbUrl: DatabaseUrl,
                    dbPrefix: String,
                    expires: FiniteDuration,
                    continuous: Boolean = false,
                    exclude: List[String] = List.empty,
                    excludeBaseName: List[String] = List.empty) = {
    println(
      s"Running replicator: ${sourceDbUrl.safeUrl}, ${targetDbUrl.safeUrl}, $dbPrefix, $expires, $continuous, $exclude, $excludeBaseName")

    val continuousFlag = if (continuous) Some("--continuous") else None
    val excludeFlag = Seq(exclude.mkString(",")).filter(_.nonEmpty).flatMap(ex => Seq("--exclude", ex))
    val excludeBaseNameFlag =
      Seq(excludeBaseName.mkString(",")).filter(_.nonEmpty).flatMap(ex => Seq("--excludeBaseName", ex))
    val cmd = Seq(
      python,
      replicator,
      "--sourceDbUrl",
      sourceDbUrl.url,
      "--targetDbUrl",
      targetDbUrl.url,
      "replicate",
      "--dbPrefix",
      dbPrefix,
      "--expires",
      expires.toSeconds.toString) ++ continuousFlag ++ excludeFlag ++ excludeBaseNameFlag
    val rr = TestUtils.runCmd(0, new File("."), cmd: _*)

    // The script reports its actions one per line; collect the names following each known line prefix.
    val Seq(created, deletedDoc, deleted) =
      Seq("create backup: ", "deleting backup document: ", "deleting backup: ").map { prefix =>
        rr.stdout.linesIterator.collect {
          // Only the leading marker is stripped, so the reported name is kept verbatim.
          case line if line.startsWith(prefix) => line.stripPrefix(prefix)
        }.toList
      }

    println(s"Created: $created")
    println(s"DeletedDocs: $deletedDoc")
    println(s"Deleted: $deleted")

    (created, deletedDoc, deleted)
  }

  /**
   * Runs the replicator script to replay databases.
   *
   * @param sourceDbUrl url passed as `--sourceDbUrl`
   * @param targetDbUrl url passed as `--targetDbUrl`
   * @param dbPrefix value passed as `--dbPrefix`
   * @return one `(backup, target, id)` triple per output line of the form `backup -> target (id)`
   */
  def runReplay(sourceDbUrl: DatabaseUrl, targetDbUrl: DatabaseUrl, dbPrefix: String) = {
    println(s"Running replay: ${sourceDbUrl.safeUrl}, ${targetDbUrl.safeUrl}, $dbPrefix")
    val rr = TestUtils.runCmd(
      0,
      new File("."),
      WhiskProperties.python,
      replicator,
      "--sourceDbUrl",
      sourceDbUrl.url,
      "--targetDbUrl",
      targetDbUrl.url,
      "replay",
      "--dbPrefix",
      dbPrefix)

    val line = """([\w-]+) -> ([\w-]+) \(([\w-]+)\)""".r.unanchored
    val replays = rr.stdout.linesIterator.collect {
      case line(backup, target, id) => (backup, target, id)
    }.toList

    println(s"Replays created: $replays")

    replays
  }

  /**
   * Wait for a replication to finish.
   *
   * Polls the replication document until its `_replication_state` is `completed` and fails the
   * test if that does not happen within five minutes.
   *
   * @param dbName id of the replication document to watch
   */
  def waitForReplication(dbName: String) = {
    val timeout = 5.minutes
    val replicationResult = waitfor(() => {
      val replicatorDoc = replicatorClient.getDoc(dbName).futureValue
      replicatorDoc shouldBe 'right

      val state = replicatorDoc.right.get.fields.get("_replication_state")
      println(s"Waiting for replication, state: $state")

      state.contains("completed".toJson)
    }, totalWait = timeout)

    assert(replicationResult, s"replication did not finish in $timeout")
  }

  /**
   * Compares two databases for full equality.
   *
   * @param sourceDb name of the original database
   * @param targetDb name of the replicated database
   * @param filterUsed if true, rows whose id starts with `_design/` are dropped from the original
   *                   before the row lists are compared; otherwise the complete responses must match
   */
  def compareDatabases(sourceDb: String, targetDb: String, filterUsed: Boolean) = {
    val originalDocs = getAllDocs(sourceDb)
    val replicatedDocs = getAllDocs(targetDb)

    if (!filterUsed) {
      replicatedDocs shouldBe originalDocs
    } else {
      val filteredReplicatedDocs = replicatedDocs.fields("rows").convertTo[List[JsObject]]
      val filteredOriginalDocs = originalDocs
        .fields("rows")
        .convertTo[List[JsObject]]
        .filterNot(_.fields("id").convertTo[String].startsWith("_design/"))

      filteredReplicatedDocs shouldBe filteredOriginalDocs
    }
  }

  // Do cleanups of possibly existing databases
  val dbs = replicatorClient.dbs().futureValue
  dbs shouldBe 'right
  dbs.right.get.filter(dbname => dbname.contains(testDbPrefix)).foreach(dbName => removeDatabase(dbName))

  // Remove replication documents left over from earlier runs. Each delete is awaited so that no stale
  // document is still around when the first test starts.
  val docs = replicatorClient.getAllDocs().futureValue
  docs shouldBe 'right
  docs.right.get
    .fields("rows")
    .convertTo[List[JsObject]]
    .filter(_.fields("id").convertTo[String].contains(testDbPrefix))
    .foreach { doc =>
      val id = doc.fields("id").convertTo[String]
      val rev = doc.fields("value").asJsObject.fields("rev").convertTo[String]
      replicatorClient.deleteDoc(id, rev).futureValue
    }

  behavior of "Database replication script"

  it should "replicate a database (snapshot)" in {
    // Create a database to backup
    val dbName = testDbPrefix + "database_for_single_replication"
    val client = createDatabase(dbName, Some(designDocPath))

    println("Creating testdocument")
    val testDocument = JsObject("testKey" -> "testValue".toJson)
    client.putDoc("testId", testDocument).futureValue

    // Trigger replication and verify the created databases have the correct format
    val (createdBackupDbs, _, _) = runReplicator(dbUrl, dbUrl, testDbPrefix, 10.minutes)
    createdBackupDbs should have size 1
    val backupDbName = createdBackupDbs.head
    backupDbName should fullyMatch regex s"backup_\\d+_$dbName"

    // Wait for the replication to finish
    waitForReplication(backupDbName)

    // Verify the replicated database is equal to the original database
    compareDatabases(dbName, backupDbName, filterUsed = true)

    // Remove all created databases
    createdBackupDbs.foreach(removeDatabase(_))
    createdBackupDbs.foreach(removeReplicationDoc)
    removeDatabase(dbName)
  }

  it should "do not replicate a database that is excluded" in {
    // Create a database to backup
    val dbNameToBackup = testDbPrefix + "database_for_single_replication_with_exclude"
    createDatabase(dbNameToBackup, Some(designDocPath))

    // Create a second database whose name is on the exclude list
    val excludedName = "some_excluded_name"
    createDatabase(testDbPrefix + excludedName, Some(designDocPath))

    // Trigger replication and verify the created databases have the correct format
    val (createdBackupDbs, _, _) = runReplicator(dbUrl, dbUrl, testDbPrefix, 10.minutes, exclude = List(excludedName))
    createdBackupDbs should have size 1
    val backupDbName = createdBackupDbs.head
    backupDbName should fullyMatch regex s"backup_\\d+_$dbNameToBackup"

    // Wait for the replication to finish
    waitForReplication(backupDbName)

    // Remove all created databases
    createdBackupDbs.foreach(removeDatabase(_))
    createdBackupDbs.foreach(removeReplicationDoc)
    removeDatabase(dbNameToBackup)
    removeDatabase(testDbPrefix + excludedName)
  }

  it should "not replicate a database that basename is excluded" in {
    // Create a database to backup
    val dbNameToBackup = testDbPrefix + "database_for_single_replication_with_exclude_basename"
    createDatabase(dbNameToBackup, Some(designDocPath))

    // Create a second database whose base name (the part before the postfix) is on the exclude list
    val excludedName = "some_excluded_name"
    createDatabase(testDbPrefix + excludedName + "-postfix123", Some(designDocPath))

    // Trigger replication and verify the created databases have the correct format
    val (createdBackupDbs, _, _) =
      runReplicator(dbUrl, dbUrl, testDbPrefix, 10.minutes, excludeBaseName = List(excludedName))
    createdBackupDbs should have size 1
    val backupDbName = createdBackupDbs.head
    backupDbName should fullyMatch regex s"backup_\\d+_$dbNameToBackup"

    // Wait for the replication to finish
    waitForReplication(backupDbName)

    // Remove all created databases
    createdBackupDbs.foreach(removeDatabase(_))
    createdBackupDbs.foreach(removeReplicationDoc)
    removeDatabase(dbNameToBackup)
    removeDatabase(testDbPrefix + excludedName + "-postfix123")
  }

  it should "replicate a database (snapshot) even if the filter is not available" in {
    // Create a db to backup
    val dbName = testDbPrefix + "database_for_snapshout_without_filter"
    val client = createDatabase(dbName, None)

    println("Creating testdocuments")
    val testDocuments = Seq(
      JsObject("testKey" -> "testValue".toJson, "_id" -> "doc1".toJson),
      JsObject("testKey" -> "testValue".toJson, "_id" -> "_design/doc1".toJson))
    testDocuments.foreach { doc =>
      client.putDoc(doc.fields("_id").convertTo[String], doc).futureValue shouldBe 'right
    }

    // Trigger replication and verify the created databases have the correct format
    val (createdBackupDbs, _, _) = runReplicator(dbUrl, dbUrl, testDbPrefix, 10.minutes)
    createdBackupDbs should have size 1
    val backupDbName = createdBackupDbs.head
    backupDbName should fullyMatch regex s"backup_\\d+_$dbName"

    // Wait for the replication to finish
    waitForReplication(backupDbName)

    // Verify the replicated database is equal to the original database
    compareDatabases(dbName, backupDbName, filterUsed = false)

    // Remove all created databases
    createdBackupDbs.foreach(removeDatabase(_))
    createdBackupDbs.foreach(removeReplicationDoc)
    removeDatabase(dbName)
  }

  it should "replicate a database (snapshot) and deleted documents and design documents should not be in the snapshot" in {
    // Create a database to backup
    val dbName = testDbPrefix + "database_for_single_replication_design_and_deleted_docs"
    val client = createDatabase(dbName, Some(designDocPath))

    println("Creating testdocument")
    val testDocuments = Seq(
      JsObject("testKey" -> "testValue".toJson, "_id" -> "doc1".toJson),
      JsObject("testKey" -> "testValue".toJson, "_id" -> "doc2".toJson),
      JsObject("testKey" -> "testValue".toJson, "_id" -> "_design/doc1".toJson))
    val documents = testDocuments.map { doc =>
      val res = client.putDoc(doc.fields("_id").convertTo[String], doc).futureValue
      res shouldBe 'right
      res.right.get
    }

    // Delete second document again. The deletion is awaited: the replication triggered below must
    // not start before the document is actually gone from the source database.
    val indexOfDocumentToDelete = 1
    val idOfDeletedDocument = documents(indexOfDocumentToDelete).fields("id").convertTo[String]
    client
      .deleteDoc(idOfDeletedDocument, documents(indexOfDocumentToDelete).fields("rev").convertTo[String])
      .futureValue

    // Trigger replication and verify the created databases have the correct format
    val (createdBackupDbs, _, _) = runReplicator(dbUrl, dbUrl, testDbPrefix, 10.minutes)
    createdBackupDbs should have size 1
    val backupDbName = createdBackupDbs.head
    backupDbName should fullyMatch regex s"backup_\\d+_$dbName"

    // Wait for the replication to finish
    waitForReplication(backupDbName)

    // Verify the replicated database is equal to the original database
    compareDatabases(dbName, backupDbName, filterUsed = true)

    // Check that deleted doc has not been copied to snapshot
    val snapshotClient =
      new ExtendedCouchDbRestClient(dbProtocol, dbHost, dbPort.toInt, dbUsername, dbPassword, backupDbName)
    val snapshotResponse = snapshotClient.getAllDocs(keys = Some(List(idOfDeletedDocument))).futureValue
    snapshotResponse shouldBe 'right
    val results = snapshotResponse.right.get.fields("rows").convertTo[List[JsObject]]
    results should have size 1
    // If deleted doc would be in db, the document id and rev would have been returned
    results.head shouldBe JsObject("key" -> idOfDeletedDocument.toJson, "error" -> "not_found".toJson)

    // Remove all created databases
    createdBackupDbs.foreach(removeDatabase(_))
    createdBackupDbs.foreach(removeReplicationDoc)
    removeDatabase(dbName)
  }

  it should "continuously update a database" in {
    // Create a database to backup
    val dbName = testDbPrefix + "database_for_continuous_replication"
    val backupDbName = s"continuous_$dbName"

    // Pre-test cleanup of previously created entities
    removeDatabase(backupDbName, true)
    removeReplicationDoc(backupDbName)

    val client = createDatabase(dbName, Some(designDocPath))

    // Trigger replication and verify the created databases have the correct format
    val (createdBackupDbs, _, _) = runReplicator(dbUrl, dbUrl, testDbPrefix, 10.minutes, continuous = true)
    createdBackupDbs should have size 1
    createdBackupDbs.head shouldBe backupDbName

    // Wait for the replicated database to appear
    val backupClient = waitForDatabase(backupDbName)

    // Create a document in the old database
    println("Creating testdocument")
    val docId = "testId"
    val testDocument = JsObject("testKey" -> "testValue".toJson)
    client.putDoc(docId, testDocument).futureValue

    // Wait for the document to appear
    waitForDocument(backupClient, docId)

    // Verify the replicated database is equal to the original database
    compareDatabases(backupDbName, dbName, filterUsed = false)

    // Stop the replication. The delete is awaited so the replication document is gone before the
    // databases it refers to are removed below.
    val replication = replicatorClient.getDoc(backupDbName).futureValue
    replication shouldBe 'right
    val replicationDoc = replication.right.get
    replicatorClient
      .deleteDoc(replicationDoc.fields("_id").convertTo[String], replicationDoc.fields("_rev").convertTo[String])
      .futureValue

    // Remove all created databases
    createdBackupDbs.foreach(removeDatabase(_))
    createdBackupDbs.foreach(removeReplicationDoc)
    removeDatabase(dbName)
  }

  it should "remove outdated databases and replicationDocs" in {
    val now = Instant.now()
    val expires = 10.minutes

    println(s"Now is: ${toEpochSeconds(now)}")

    // Create a database that is already expired
    val expired = now.minus(expires + 5.minutes)
    val expiredName = s"backup_${toEpochSeconds(expired)}_${testDbPrefix}expired_backup"
    val expiredClient = createDatabase(expiredName, Some(designDocPath))
    replicatorClient.putDoc(expiredName, JsObject("source" -> "".toJson, "target" -> "".toJson)).futureValue

    // Create a database that is not yet expired
    val notExpired = now.plus(expires - 5.minutes)
    val notExpiredName = s"backup_${toEpochSeconds(notExpired)}_${testDbPrefix}notexpired_backup"
    val notExpiredClient = createDatabase(notExpiredName, Some(designDocPath))
    replicatorClient.putDoc(notExpiredName, JsObject("source" -> "".toJson, "target" -> "".toJson)).futureValue

    // Trigger replication and verify the expired database is deleted while the unexpired one is kept
    val (createdDatabases, deletedReplicationDocs, deletedDatabases) =
      runReplicator(dbUrl, dbUrl, testDbPrefix, expires)
    deletedReplicationDocs should (contain(expiredName) and not contain notExpiredName)
    deletedDatabases should (contain(expiredName) and not contain notExpiredName)

    expiredClient.getAllDocs().futureValue shouldBe Left(StatusCodes.NotFound)
    notExpiredClient.getAllDocs().futureValue shouldBe 'right

    // Cleanup backup database
    createdDatabases.foreach(removeDatabase(_))
    createdDatabases.foreach(removeReplicationDoc)
    removeReplicationDoc(notExpiredName)
    removeDatabase(notExpiredName)
  }

  it should "not remove outdated databases with other prefix" in {
    val now = Instant.now()
    val expires = 10.minutes

    println(s"Now is: ${toEpochSeconds(now)}")

    val expired = now.minus(expires + 5.minutes)

    // Create a database that is expired with correct prefix
    val correctPrefixName = s"backup_${toEpochSeconds(expired)}_${testDbPrefix}expired_backup_correct_prefix"
    val correctPrefixClient = createDatabase(correctPrefixName, Some(designDocPath))
    replicatorClient.putDoc(correctPrefixName, JsObject("source" -> "".toJson, "target" -> "".toJson)).futureValue

    // Create a database that is expired with wrong prefix
    val wrongPrefix = s"replicatortest_wrongprefix_$dbPrefix"
    val wrongPrefixName = s"backup_${toEpochSeconds(expired)}_${wrongPrefix}expired_backup_wrong_prefix"
    val wrongPrefixClient = createDatabase(wrongPrefixName, Some(designDocPath))
    replicatorClient.putDoc(wrongPrefixName, JsObject("source" -> "".toJson, "target" -> "".toJson)).futureValue

    // Trigger replication and verify the expired database with correct prefix is deleted while the db with the wrong prefix is kept
    val (createdDatabases, deletedReplicationDocs, deletedDatabases) =
      runReplicator(dbUrl, dbUrl, testDbPrefix, expires)
    deletedReplicationDocs should (contain(correctPrefixName) and not contain wrongPrefixName)
    deletedDatabases should (contain(correctPrefixName) and not contain wrongPrefixName)

    correctPrefixClient.getAllDocs().futureValue shouldBe Left(StatusCodes.NotFound)
    wrongPrefixClient.getAllDocs().futureValue shouldBe 'right

    // Cleanup backup database
    createdDatabases.foreach(removeDatabase(_))
    createdDatabases.foreach(removeReplicationDoc)
    removeReplicationDoc(wrongPrefixName)
    removeDatabase(wrongPrefixName)
  }

  it should "replay a database" in {
    val now = Instant.now()
    val dbName = testDbPrefix + "database_to_be_restored"
    val backupPrefix = s"backup_${toEpochSeconds(now)}_"
    val backupDbName = backupPrefix + dbName

    // Create a database that looks like a backup
    val backupClient = createDatabase(backupDbName, Some(designDocPath))
    println("Creating testdocument")
    backupClient.putDoc("testId", JsObject("testKey" -> "testValue".toJson)).futureValue

    // Run the replay script; fail with a readable message if it did not report any replay
    val replays = runReplay(dbUrl, dbUrl, backupPrefix)
    replays should not be empty
    val (_, _, replicationId) = replays.head

    // Wait for the replication to finish
    waitForReplication(replicationId)

    // Verify the replicated database is equal to the original database
    compareDatabases(backupDbName, dbName, filterUsed = false)

    // Cleanup databases
    removeReplicationDoc(replicationId)
    removeDatabase(backupDbName)
    removeDatabase(dbName)
  }
}
