blob: 91e0e178debce95acea0d6d6613662c0a370a8f5 [file] [log] [blame]
/***************************************************************
* Licensed to the Apache Software Foundation (ASF) under one *
* or more contributor license agreements. See the NOTICE file *
* distributed with this work for additional information *
* regarding copyright ownership. The ASF licenses this file *
* to you under the Apache License, Version 2.0 (the *
* "License"); you may not use this file except in compliance *
* with the License. You may obtain a copy of the License at *
* *
* http://www.apache.org/licenses/LICENSE-2.0 *
* *
* Unless required by applicable law or agreed to in writing, *
* software distributed under the License is distributed on an *
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
* KIND, either express or implied. See the License for the *
* specific language governing permissions and limitations *
* under the License. *
****************************************************************/
package org.apache.james.task.eventsourcing.cassandra
import com.datastax.oss.driver.api.core.CqlSession
import com.datastax.oss.driver.api.core.cql.{BoundStatement, Row}
import com.datastax.oss.driver.api.core.data.UdtValue
import com.datastax.oss.driver.api.querybuilder.QueryBuilder.{bindMarker, deleteFrom, insertInto, selectFrom}
import org.apache.james.backends.cassandra.init.{CassandraTypesProvider, CassandraZonedDateTimeModule}
import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor
import org.apache.james.server.task.json.JsonTaskAdditionalInformationSerializer
import org.apache.james.task._
import org.apache.james.task.eventsourcing.cassandra.CassandraTaskExecutionDetailsProjectionTable._
import org.apache.james.util.ReactorUtils
import reactor.core.publisher.{Flux, Mono}
import reactor.core.scala.publisher.SMono
import java.time.Instant
import java.util.Optional
import jakarta.inject.Inject
import scala.compat.java8.OptionConverters._
class CassandraTaskExecutionDetailsProjectionDAO @Inject()(session: CqlSession, typesProvider: CassandraTypesProvider, jsonTaskAdditionalInformationSerializer: JsonTaskAdditionalInformationSerializer) {
private val cassandraAsyncExecutor = new CassandraAsyncExecutor(session)
private val dateType = typesProvider.getDefinedUserType(CassandraZonedDateTimeModule.ZONED_DATE_TIME)
private val insertStatement = session.prepare(insertInto(TABLE_NAME)
.value(TASK_ID, bindMarker(TASK_ID))
.value(TYPE, bindMarker(TYPE))
.value(STATUS, bindMarker(STATUS))
.value(SUBMITTED_DATE, bindMarker(SUBMITTED_DATE))
.value(SUBMITTED_NODE, bindMarker(SUBMITTED_NODE))
.value(STARTED_DATE, bindMarker(STARTED_DATE))
.value(RAN_NODE, bindMarker(RAN_NODE))
.value(COMPLETED_DATE, bindMarker(COMPLETED_DATE))
.value(CANCELED_DATE, bindMarker(CANCELED_DATE))
.value(CANCEL_REQUESTED_NODE, bindMarker(CANCEL_REQUESTED_NODE))
.value(FAILED_DATE, bindMarker(FAILED_DATE))
.value(ADDITIONAL_INFORMATION, bindMarker(ADDITIONAL_INFORMATION))
.build())
private val selectStatement = session.prepare(selectFrom(TABLE_NAME)
.all()
.whereColumn(TASK_ID).isEqualTo(bindMarker(TASK_ID))
.build())
private val listStatement = session.prepare(selectFrom(TABLE_NAME).all().build())
private val removeStatement = session.prepare(deleteFrom(TABLE_NAME)
.whereColumn(TASK_ID).isEqualTo(bindMarker(TASK_ID))
.build())
def saveDetails(details: TaskExecutionDetails): Mono[Void] =
Mono.from(serializeAdditionalInformation(details)
.flatMap(serializeAdditionalInformation => {
val boundStatement = insertStatement.bind()
.setUuid(TASK_ID, details.getTaskId.getValue)
.setString(TYPE, details.getType.asString())
.setString(STATUS, details.getStatus.getValue)
.setUdtValue(SUBMITTED_DATE, CassandraZonedDateTimeModule.toUDT(dateType, details.getSubmittedDate))
.setString(SUBMITTED_NODE, details.getSubmittedNode.asString)
val bindOptionalFieldOperations = List(
(statement: BoundStatement) => bindOptionalUDTValue(statement, STARTED_DATE, CassandraZonedDateTimeModule.toUDT(dateType, details.getStartedDate)),
(statement: BoundStatement) => bindOptionalStringValue(statement, RAN_NODE, details.getRanNode.map[String](_.asString)),
(statement: BoundStatement) => bindOptionalUDTValue(statement, COMPLETED_DATE, CassandraZonedDateTimeModule.toUDT(dateType, details.getCompletedDate)),
(statement: BoundStatement) => bindOptionalUDTValue(statement, CANCELED_DATE, CassandraZonedDateTimeModule.toUDT(dateType, details.getCanceledDate)),
(statement: BoundStatement) => bindOptionalStringValue(statement, CANCEL_REQUESTED_NODE, details.getCancelRequestedNode.map[String](_.asString)),
(statement: BoundStatement) => bindOptionalUDTValue(statement, FAILED_DATE, CassandraZonedDateTimeModule.toUDT(dateType, details.getFailedDate)),
(statement: BoundStatement) => bindOptionalStringValue(statement, ADDITIONAL_INFORMATION, serializeAdditionalInformation),
)
val fullyBoundStatement = bindOptionalFieldOperations.foldLeft(boundStatement)((statement, bindFieldOperation) => {
bindFieldOperation(statement)
})
SMono(cassandraAsyncExecutor.executeVoid(fullyBoundStatement))
}))
private def bindOptionalStringValue(statement: BoundStatement, fieldName: String, fieldValue: Optional[String]) =
fieldValue.asScala match {
case Some(value) => statement.setString(fieldName, value)
case None => statement
}
private def bindOptionalUDTValue(statement: BoundStatement, fieldName: String, fieldValue: Optional[UdtValue]) =
fieldValue.asScala match {
case Some(value) => statement.setUdtValue(fieldName, value)
case None => statement
}
private def serializeAdditionalInformation(details: TaskExecutionDetails): SMono[Optional[String]] = SMono.fromCallable(() =>details
.getAdditionalInformation
.map(jsonTaskAdditionalInformationSerializer.serialize(_)))
.subscribeOn(ReactorUtils.BLOCKING_CALL_WRAPPER)
def readDetails(taskId: TaskId): Mono[TaskExecutionDetails] = cassandraAsyncExecutor
.executeSingleRow(selectStatement.bind().setUuid(TASK_ID, taskId.getValue))
.map(readRow)
def listDetails(): Flux[TaskExecutionDetails] = cassandraAsyncExecutor
.executeRows(listStatement.bind())
.map(readRow)
def listDetailsByBeforeDate(beforeDate: Instant): Flux[TaskExecutionDetails] =
listDetails()
.filter(detail => detail.getSubmittedDate.toInstant.isBefore(beforeDate))
def remove(details: TaskExecutionDetails) : Mono[Void] =
cassandraAsyncExecutor.executeVoid(removeStatement.bind()
.setUuid(TASK_ID, details.getTaskId.getValue))
private def readRow(row: Row): TaskExecutionDetails = {
val taskType = TaskType.of(row.getString(TYPE))
new TaskExecutionDetails(
taskId = TaskId.fromUUID(row.getUuid(TASK_ID)),
`type` = TaskType.of(row.getString(TYPE)),
status = TaskManager.Status.fromString(row.getString(STATUS)),
submittedDate = CassandraZonedDateTimeModule.fromUDT(row.getUdtValue(SUBMITTED_DATE)),
submittedNode = Hostname(row.getString(SUBMITTED_NODE)),
startedDate = CassandraZonedDateTimeModule.fromUDTOptional(row.getUdtValue(STARTED_DATE)),
ranNode = Optional.ofNullable(row.getString(RAN_NODE)).map(Hostname(_)),
completedDate = CassandraZonedDateTimeModule.fromUDTOptional(row.getUdtValue(COMPLETED_DATE)),
canceledDate = CassandraZonedDateTimeModule.fromUDTOptional(row.getUdtValue(CANCELED_DATE)),
cancelRequestedNode = Optional.ofNullable(row.getString(CANCEL_REQUESTED_NODE)).map(Hostname(_)),
failedDate = CassandraZonedDateTimeModule.fromUDTOptional(row.getUdtValue(FAILED_DATE)),
additionalInformation = () => deserializeAdditionalInformation(taskType, row))
}
private def deserializeAdditionalInformation(taskType: TaskType, row: Row): Optional[TaskExecutionDetails.AdditionalInformation] =
Optional.ofNullable(row.getString(ADDITIONAL_INFORMATION))
.map(additionalInformation => jsonTaskAdditionalInformationSerializer.deserialize(additionalInformation))
}