blob: 0c158ba260d8eca72346fa36317a1aa103e14f91 [file] [log] [blame]
/** **************************************************************
* Licensed to the Apache Software Foundation (ASF) under one *
* or more contributor license agreements. See the NOTICE file *
* distributed with this work for additional information *
* regarding copyright ownership. The ASF licenses this file *
* to you under the Apache License, Version 2.0 (the *
* "License"); you may not use this file except in compliance *
* with the License. You may obtain a copy of the License at *
* *
* http://www.apache.org/licenses/LICENSE-2.0 *
* *
* Unless required by applicable law or agreed to in writing, *
* software distributed under the License is distributed on an *
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
* KIND, either express or implied. See the License for the *
* specific language governing permissions and limitations *
* under the License. *
* ***************************************************************/
package org.apache.james.task.eventsourcing
import java.util
import org.apache.james.eventsourcing.eventstore.History
import org.apache.james.eventsourcing.{Event, EventId}
import org.apache.james.task.Task.Result
import org.apache.james.task.TaskExecutionDetails.AdditionalInformation
import org.apache.james.task.TaskManager.Status
import org.apache.james.task.{Hostname, Task}
import scala.collection.JavaConverters._
class TaskAggregate private(val aggregateId: TaskAggregateId, private val history: History) {
private val currentStatus: Option[Status] = history
.getEvents
.asScala
.foldLeft(DecisionProjection.empty)((decision, event) => decision.update(event))
.status
def create(task: Task, hostname: Hostname): util.List[Event] = {
if (currentStatus.isEmpty) {
createEventWithId(Created(aggregateId, _, task, hostname))
} else Nil.asJava
}
private[eventsourcing] def start(hostname: Hostname): util.List[Event] = {
currentStatus match {
case Some(Status.WAITING) => createEventWithId(Started(aggregateId, _, hostname))
case _ => Nil.asJava
}
}
def requestCancel(hostname: Hostname): util.List[Event] = {
currentStatus match {
case Some(status) if !status.isFinished => createEventWithId(CancelRequested(aggregateId, _, hostname))
case _ => Nil.asJava
}
}
private[eventsourcing] def update(additionalInformation: AdditionalInformation): util.List[Event] = {
currentStatus match {
case Some(Status.IN_PROGRESS) => createEventWithId(AdditionalInformationUpdated(aggregateId, _, additionalInformation))
case Some(Status.CANCEL_REQUESTED) => createEventWithId(AdditionalInformationUpdated(aggregateId, _, additionalInformation))
case Some(Status.COMPLETED) => Nil.asJava
case Some(Status.FAILED) => Nil.asJava
case Some(Status.WAITING) => Nil.asJava
case Some(Status.CANCELLED) => Nil.asJava
case _ => Nil.asJava
}
}
private[eventsourcing] def complete(result: Result, additionalInformation: Option[AdditionalInformation]): util.List[Event] = {
currentStatus match {
case Some(status) if !status.isFinished => createEventWithId(Completed(aggregateId, _, result, additionalInformation))
case _ => Nil.asJava
}
}
private[eventsourcing] def fail(additionalInformation: Option[AdditionalInformation], errorMessage: Option[String], exception: Option[String]): util.List[Event] = {
currentStatus match {
case Some(status) if !status.isFinished => createEventWithId(Failed(aggregateId, _, additionalInformation, errorMessage, exception))
case _ => Nil.asJava
}
}
private[eventsourcing] def cancel(additionalInformation: Option[AdditionalInformation]): util.List[Event] = {
currentStatus match {
case Some(status) if !status.isFinished => createEventWithId(Cancelled(aggregateId, _, additionalInformation))
case _ => Nil.asJava
}
}
private def createEventWithId(event: EventId => Event): util.List[Event] =
List(history.getNextEventId)
.map({ eventId => event(eventId) })
.asJava
}
object TaskAggregate {
def fromHistory(aggregateId: TaskAggregateId, history: History) = new TaskAggregate(aggregateId, history)
}