blob: ab7e65154ffd5f0b356c0a09e78ea71a6efcea52 [file] [log] [blame]
/** **************************************************************
* Licensed to the Apache Software Foundation (ASF) under one *
* or more contributor license agreements. See the NOTICE file *
* distributed with this work for additional information *
* regarding copyright ownership. The ASF licenses this file *
* to you under the Apache License, Version 2.0 (the *
* "License"); you may not use this file except in compliance *
* with the License. You may obtain a copy of the License at *
* *
* http://www.apache.org/licenses/LICENSE-2.0 *
* *
* Unless required by applicable law or agreed to in writing, *
* software distributed under the License is distributed on an *
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
* KIND, either express or implied. See the License for the *
* specific language governing permissions and limitations *
* under the License. *
* ***************************************************************/
package org.apache.james.task.eventsourcing.distributed
import java.util.Optional
import com.fasterxml.jackson.annotation.JsonProperty
import org.apache.james.eventsourcing.EventId
import org.apache.james.eventsourcing.eventstore.dto.EventDTO
import org.apache.james.json.DTOConverter
import org.apache.james.server.task.json.dto.{AdditionalInformationDTO, TaskDTO}
import org.apache.james.task.TaskExecutionDetails.AdditionalInformation
import org.apache.james.task.eventsourcing._
import org.apache.james.task.eventsourcing.distributed.distributed.{AdditionalInformationConverter, TaskConverter}
import org.apache.james.task.{Hostname, Task, TaskExecutionDetails, TaskId}
import scala.compat.java8.OptionConverters._
package object distributed {
type AdditionalInformationConverter = DTOConverter[TaskExecutionDetails.AdditionalInformation, AdditionalInformationDTO]
type TaskConverter = DTOConverter[Task, TaskDTO]
}
class NestedTaskDTOSerializerNotFound(val domainObject: Task) extends RuntimeException("Unable to find a serializer for " + domainObject) { }
class NestedTaskDTODeserializerNotFound(val dto: TaskDTO) extends RuntimeException("Unable to find a deserializer for " + dto) { }
class NestedAdditionalInformationDTODeserializerNotFound(val dto: AdditionalInformationDTO) extends RuntimeException("Unable to find a deserializer for " + dto) { }
class NestedAdditionalInformationDTOSerializerNotFound(val domainObject: AdditionalInformation) extends RuntimeException("Unable to find a serializer for " + domainObject) { }
sealed abstract class TaskEventDTO(val getType: String, val getAggregate: String, val getEvent: Int) extends EventDTO {
protected def domainAggregateId: TaskAggregateId = TaskAggregateId(TaskId.fromString(getAggregate))
protected def domainEventId: EventId = EventId.fromSerialized(getEvent)
}
case class CreatedDTO(@JsonProperty("type") typeName: String,
@JsonProperty("aggregate") aggregateId: String,
@JsonProperty("event") eventId: Int,
@JsonProperty("task") getTask: TaskDTO,
@JsonProperty("hostname") getHostname: String)
extends TaskEventDTO(typeName, aggregateId, eventId) {
def toDomainObject(taskConverter: TaskConverter): Created = {
val task: Task = taskConverter.toDomainObject(getTask).orElseThrow(() => new NestedTaskDTODeserializerNotFound(getTask))
Created(domainAggregateId, domainEventId, task, Hostname(getHostname))
}
}
object CreatedDTO {
def fromDomainObject(taskConverter: TaskConverter)(event: Created, typeName: String): CreatedDTO = {
val taskDTO = taskConverter.toDTO(event.task).orElseThrow(() => new NestedTaskDTOSerializerNotFound(event.task))
CreatedDTO(typeName, event.aggregateId.taskId.asString(), event.eventId.serialize, taskDTO, event.hostname.asString)
}
}
case class StartedDTO(@JsonProperty("type") typeName: String,
@JsonProperty("aggregate") aggregateId: String,
@JsonProperty("event") eventId: Int,
@JsonProperty("hostname") getHostname: String)
extends TaskEventDTO(typeName, aggregateId, eventId) {
def toDomainObject: Started = Started(domainAggregateId, domainEventId, Hostname(getHostname))
}
object StartedDTO {
def fromDomainObject(event: Started, typeName: String): StartedDTO =
StartedDTO(typeName, event.aggregateId.taskId.asString(), event.eventId.serialize, event.hostname.asString)
}
case class CancelRequestedDTO(@JsonProperty("type") typeName: String,
@JsonProperty("aggregate") aggregateId: String,
@JsonProperty("event") eventId: Int,
@JsonProperty("hostname") getHostname: String)
extends TaskEventDTO(typeName, aggregateId, eventId) {
def toDomainObject: CancelRequested = CancelRequested(domainAggregateId, domainEventId, Hostname(getHostname))
}
object CancelRequestedDTO {
def fromDomainObject(event: CancelRequested, typeName: String): CancelRequestedDTO =
CancelRequestedDTO(typeName, event.aggregateId.taskId.asString(), event.eventId.serialize, event.hostname.asString)
}
case class CompletedDTO(@JsonProperty("type") typeName: String,
@JsonProperty("aggregate") aggregateId: String,
@JsonProperty("event") eventId: Int,
@JsonProperty("result") getResult: String,
@JsonProperty("additionalInformation") getAdditionalInformation: Optional[AdditionalInformationDTO])
extends TaskEventDTO(typeName, aggregateId, eventId) {
def toDomainObject(additionalInformationConverter: AdditionalInformationConverter): Completed = {
val additionalInformation: Optional[TaskExecutionDetails.AdditionalInformation] = getAdditionalInformation
.map(dto => additionalInformationConverter.toDomainObject(dto).orElseThrow(() => new NestedAdditionalInformationDTODeserializerNotFound(dto)))
Completed(domainAggregateId, domainEventId, domainResult, additionalInformation.asScala)
}
private def domainResult: Task.Result = getResult match {
case "COMPLETED" => Task.Result.COMPLETED
case "PARTIAL" => Task.Result.PARTIAL
}
}
object CompletedDTO {
def fromDomainObject(dtoConverter: AdditionalInformationConverter)(event: Completed, typeName: String): CompletedDTO = {
val additionalInformationDTO: Optional[AdditionalInformationDTO] = event
.additionalInformation
.asJava
.map(domainObject => dtoConverter.toDTO(domainObject).orElseThrow(() => new NestedAdditionalInformationDTOSerializerNotFound(domainObject)))
CompletedDTO(typeName, event.aggregateId.taskId.asString(), event.eventId.serialize, resultToString(event.result), additionalInformationDTO)
}
private def resultToString(result: Task.Result): String = result match {
case Task.Result.COMPLETED => "COMPLETED"
case Task.Result.PARTIAL => "PARTIAL"
}
}
case class FailedDTO(@JsonProperty("type") typeName: String,
@JsonProperty("aggregate") aggregateId: String,
@JsonProperty("event") eventId: Int,
@JsonProperty("additionalInformation") getAdditionalInformation: Optional[AdditionalInformationDTO],
@JsonProperty("errorMessage") getErrorMessage: Optional[String],
@JsonProperty("exception") getException: Optional[String])
extends TaskEventDTO(typeName, aggregateId, eventId) {
def toDomainObject(additionalInformationConverter: AdditionalInformationConverter): Failed = {
val additionalInformation: Optional[TaskExecutionDetails.AdditionalInformation] = getAdditionalInformation
.map(dto => additionalInformationConverter.toDomainObject(dto).orElseThrow(() => new NestedAdditionalInformationDTODeserializerNotFound(dto)))
Failed(domainAggregateId, domainEventId, additionalInformation.asScala, getErrorMessage.asScala, getException.asScala)
}
}
object FailedDTO {
def fromDomainObject(dtoConverter: AdditionalInformationConverter)(event: Failed, typeName: String): FailedDTO = {
val additionalInformationDTO: Optional[AdditionalInformationDTO] = event
.additionalInformation
.asJava.map(domainObject => dtoConverter.toDTO(domainObject).orElseThrow(() => new NestedAdditionalInformationDTOSerializerNotFound(domainObject)))
FailedDTO(typeName, event.aggregateId.taskId.asString(), event.eventId.serialize, additionalInformationDTO, event.errorMessage.asJava, event.exception.asJava)
}
}
case class CancelledDTO(@JsonProperty("type") typeName: String,
@JsonProperty("aggregate") aggregateId: String,
@JsonProperty("event") eventId: Int,
@JsonProperty("additionalInformation") getAdditionalInformation: Optional[AdditionalInformationDTO])
extends TaskEventDTO(typeName, aggregateId, eventId) {
def toDomainObject(additionalInformationConverter: AdditionalInformationConverter): Cancelled = {
val additionalInformation: Optional[TaskExecutionDetails.AdditionalInformation] = getAdditionalInformation
.map(dto => additionalInformationConverter.toDomainObject(dto).orElseThrow(() => new NestedAdditionalInformationDTODeserializerNotFound(dto)))
Cancelled(domainAggregateId, domainEventId, additionalInformation.asScala)
}
}
object CancelledDTO {
def fromDomainObject(additionalInformationConverter: AdditionalInformationConverter)(event: Cancelled, typeName: String): CancelledDTO = {
val additionalInformationDTO: Optional[AdditionalInformationDTO] = event
.additionalInformation
.asJava
.map(domainObject => additionalInformationConverter.toDTO(domainObject).orElseThrow(() => new NestedAdditionalInformationDTOSerializerNotFound(domainObject)))
CancelledDTO(typeName, event.aggregateId.taskId.asString(), event.eventId.serialize, additionalInformationDTO)
}
}
case class AdditionalInformationUpdatedDTO(@JsonProperty("type") typeName: String,
@JsonProperty("aggregate") aggregateId: String,
@JsonProperty("event") eventId: Int,
@JsonProperty("additionalInformation") getAdditionalInformation: AdditionalInformationDTO)
extends TaskEventDTO(typeName, aggregateId, eventId) {
def toDomainObject(additionalInformationConverter: AdditionalInformationConverter): AdditionalInformationUpdated = {
val additionalInformation = additionalInformationConverter
.toDomainObject(getAdditionalInformation)
.orElseThrow(() => new NestedAdditionalInformationDTODeserializerNotFound(getAdditionalInformation))
AdditionalInformationUpdated(domainAggregateId, domainEventId, additionalInformation)
}
}
object AdditionalInformationUpdatedDTO {
def fromDomainObject(additionalInformationConverter: AdditionalInformationConverter)(event: AdditionalInformationUpdated, typeName: String): AdditionalInformationUpdatedDTO = {
val additionalInformationDTO = additionalInformationConverter
.toDTO(event.additionalInformation)
.orElseThrow(() => new NestedAdditionalInformationDTOSerializerNotFound(event.additionalInformation))
AdditionalInformationUpdatedDTO(typeName, event.aggregateId.taskId.asString(), event.eventId.serialize, additionalInformationDTO)
}
}