JAMES-2813 handle UpdateAdditionalInformation command into the aggregate
diff --git a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/Events.scala b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/Events.scala
index f31c33a..42b4cc0 100644
--- a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/Events.scala
+++ b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/Events.scala
@@ -33,6 +33,8 @@
case class Started(aggregateId: TaskAggregateId, override val eventId: EventId, hostname: Hostname) extends TaskEvent(aggregateId, eventId)
+case class AdditionalInformationUpdated(aggregateId: TaskAggregateId, override val eventId: EventId, additionalInformation: AdditionalInformation) extends TaskEvent(aggregateId, eventId)
+
case class CancelRequested(aggregateId: TaskAggregateId, override val eventId: EventId, hostname: Hostname) extends TaskEvent(aggregateId, eventId)
case class Completed(aggregateId: TaskAggregateId, override val eventId: EventId, result: Result, additionalInformation: Option[AdditionalInformation]) extends TerminalTaskEvent(aggregateId, eventId, additionalInformation)
diff --git a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/TaskAggregate.scala b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/TaskAggregate.scala
index 9b902d2..0c158ba 100644
--- a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/TaskAggregate.scala
+++ b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/TaskAggregate.scala
@@ -58,6 +58,18 @@
}
}
+ private[eventsourcing] def update(additionalInformation: AdditionalInformation): util.List[Event] = {
+ currentStatus match {
+ case Some(Status.IN_PROGRESS) => createEventWithId(AdditionalInformationUpdated(aggregateId, _, additionalInformation))
+ case Some(Status.CANCEL_REQUESTED) => createEventWithId(AdditionalInformationUpdated(aggregateId, _, additionalInformation))
+ case Some(Status.COMPLETED) => Nil.asJava
+ case Some(Status.FAILED) => Nil.asJava
+ case Some(Status.WAITING) => Nil.asJava
+ case Some(Status.CANCELLED) => Nil.asJava
+ case _ => Nil.asJava
+ }
+ }
+
private[eventsourcing] def complete(result: Result, additionalInformation: Option[AdditionalInformation]): util.List[Event] = {
currentStatus match {
case Some(status) if !status.isFinished => createEventWithId(Completed(aggregateId, _, result, additionalInformation))
diff --git a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/TaskCommand.scala b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/TaskCommand.scala
index 6c8b20e..15abba5 100644
--- a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/TaskCommand.scala
+++ b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/TaskCommand.scala
@@ -31,6 +31,8 @@
case class Start(id: TaskId) extends TaskCommand
+ case class UpdateAdditionalInformation(id: TaskId, additionalInformation: AdditionalInformation) extends TaskCommand
+
case class Complete(id: TaskId, result: Result, additionalInformation: Option[AdditionalInformation]) extends TaskCommand
case class RequestCancel(id: TaskId) extends TaskCommand
diff --git a/server/task/task-memory/src/test/java/org/apache/james/task/eventsourcing/TaskAggregateTest.java b/server/task/task-memory/src/test/java/org/apache/james/task/eventsourcing/TaskAggregateTest.java
new file mode 100644
index 0000000..03797da
--- /dev/null
+++ b/server/task/task-memory/src/test/java/org/apache/james/task/eventsourcing/TaskAggregateTest.java
@@ -0,0 +1,122 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+package org.apache.james.task.eventsourcing;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.Arrays;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.apache.james.eventsourcing.Event;
+import org.apache.james.eventsourcing.EventId;
+import org.apache.james.eventsourcing.eventstore.History;
+import org.apache.james.task.Hostname;
+import org.apache.james.task.MemoryReferenceWithCounterTask;
+import org.apache.james.task.Task;
+import org.apache.james.task.TaskId;
+import org.junit.jupiter.api.Test;
+
+import com.google.common.collect.Streams;
+import scala.None;
+import scala.None$;
+
+class TaskAggregateTest {
+
+ static final Hostname HOSTNAME = Hostname.apply("foo");
+ static final TaskAggregateId ID = TaskAggregateId.apply(TaskId.generateTaskId());
+
+ History buildHistory(Function<EventId, Event>... events) {
+ return History.of(
+ Streams.zip(
+ Stream.iterate(EventId.first(), EventId::next),
+ Arrays.stream(events),
+ (id, event) -> event.apply(id))
+ .collect(Collectors.toList()));
+ }
+
+ @Test
+ void givenNoStartedTaskEmitNoEventWhenUpdateAdditionalInformationCommand() {
+ History history = buildHistory(
+ eventId -> Created.apply(ID, eventId, new MemoryReferenceWithCounterTask((counter) -> Task.Result.COMPLETED), HOSTNAME)
+ );
+ TaskAggregate aggregate = TaskAggregate.fromHistory(ID, history);
+ assertThat(aggregate.update(new MemoryReferenceWithCounterTask.AdditionalInformation(3))).isEmpty();
+ }
+
+ @Test
+ void givenInProgressTaskEmitEventWhenUpdateAdditionalInformationCommand() {
+ History history = buildHistory(
+ eventId -> Created.apply(ID, eventId, new MemoryReferenceWithCounterTask((counter) -> Task.Result.COMPLETED), HOSTNAME),
+ eventId -> Started.apply(ID, eventId, HOSTNAME)
+ );
+ TaskAggregate aggregate = TaskAggregate.fromHistory(ID, history);
+ assertThat(aggregate.update(new MemoryReferenceWithCounterTask.AdditionalInformation(3)))
+ .containsExactly(AdditionalInformationUpdated.apply(ID, history.getNextEventId(), new MemoryReferenceWithCounterTask.AdditionalInformation(3)));
+ }
+
+ @Test
+ void givenCancelRequestedTaskEmitEventWhenUpdateAdditionalInformationCommand() {
+ History history = buildHistory(
+ eventId -> Created.apply(ID, eventId, new MemoryReferenceWithCounterTask((counter) -> Task.Result.COMPLETED), HOSTNAME),
+ eventId -> Started.apply(ID, eventId, HOSTNAME),
+ eventId -> CancelRequested.apply(ID, eventId, HOSTNAME)
+ );
+ TaskAggregate aggregate = TaskAggregate.fromHistory(ID, history);
+ assertThat(aggregate.update(new MemoryReferenceWithCounterTask.AdditionalInformation(3)))
+ .containsExactly(AdditionalInformationUpdated.apply(ID, history.getNextEventId(), new MemoryReferenceWithCounterTask.AdditionalInformation(3)));
+ }
+
+ @Test
+ void givenCompletedTaskEmitNoEventWhenUpdateAdditionalInformationCommand() {
+ MemoryReferenceWithCounterTask task = new MemoryReferenceWithCounterTask((counter) -> Task.Result.COMPLETED);
+ History history = buildHistory(
+ eventId -> Created.apply(ID, eventId, task, HOSTNAME),
+ eventId -> Started.apply(ID, eventId, HOSTNAME),
+ eventId -> Completed.apply(ID, eventId, Task.Result.COMPLETED, task.type(), None$.empty())
+ );
+ TaskAggregate aggregate = TaskAggregate.fromHistory(ID, history);
+ assertThat(aggregate.update(new MemoryReferenceWithCounterTask.AdditionalInformation(3))).isEmpty();
+ }
+
+ @Test
+ void givenFailedTaskEmitNoEventWhenUpdateAdditionalInformationCommand() {
+ MemoryReferenceWithCounterTask task = new MemoryReferenceWithCounterTask((counter) -> Task.Result.COMPLETED);
+ History history = buildHistory(
+ eventId -> Created.apply(ID, eventId, task, HOSTNAME),
+ eventId -> Started.apply(ID, eventId, HOSTNAME),
+ eventId -> Failed.apply(ID, eventId, task.type(), None$.empty())
+ );
+ TaskAggregate aggregate = TaskAggregate.fromHistory(ID, history);
+ assertThat(aggregate.update(new MemoryReferenceWithCounterTask.AdditionalInformation(3))).isEmpty();
+ }
+
+ @Test
+ void givenCancelTaskEmitNoEventWhenUpdateAdditionalInformationCommand() {
+ MemoryReferenceWithCounterTask task = new MemoryReferenceWithCounterTask((counter) -> Task.Result.COMPLETED);
+ History history = buildHistory(
+ eventId -> Created.apply(ID, eventId, task, HOSTNAME),
+ eventId -> Started.apply(ID, eventId, HOSTNAME),
+ eventId -> Cancelled.apply(ID, eventId, task.type(), None$.empty())
+ );
+ TaskAggregate aggregate = TaskAggregate.fromHistory(ID, history);
+ assertThat(aggregate.update(new MemoryReferenceWithCounterTask.AdditionalInformation(3))).isEmpty();
+ }
+}