blob: 35856b0ee9bdf1c842acb238eca817517f16c080 [file] [log] [blame]
/** **************************************************************
* Licensed to the Apache Software Foundation (ASF) under one *
* or more contributor license agreements. See the NOTICE file *
* distributed with this work for additional information *
* regarding copyright ownership. The ASF licenses this file *
* to you under the Apache License, Version 2.0 (the *
* "License"); you may not use this file except in compliance *
* with the License. You may obtain a copy of the License at *
* *
* http://www.apache.org/licenses/LICENSE-2.0 *
* *
* Unless required by applicable law or agreed to in writing, *
* software distributed under the License is distributed on an *
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
* KIND, either express or implied. See the License for the *
* specific language governing permissions and limitations *
* under the License. *
* ***************************************************************/
package org.apache.james.task.eventsourcing
import java.io.Closeable
import java.time.Duration
import java.util
import javax.inject.Inject
import org.apache.james.eventsourcing.eventstore.{EventStore, History}
import org.apache.james.eventsourcing.{AggregateId, Subscriber}
import org.apache.james.task.TaskManager.ReachedTimeoutException
import org.apache.james.task._
import org.apache.james.task.eventsourcing.TaskCommand._
import com.google.common.annotations.VisibleForTesting
import javax.annotation.PreDestroy
import reactor.core.publisher.{Flux, Mono}
import reactor.core.scheduler.Schedulers
class EventSourcingTaskManager @Inject @VisibleForTesting private[eventsourcing](
workQueueSupplier: WorkQueueSupplier,
val eventStore: EventStore,
val executionDetailsProjection: TaskExecutionDetailsProjection,
val hostname: Hostname,
val terminationSubscriber: TerminationSubscriber) extends TaskManager with Closeable {
private def workDispatcher: Subscriber = {
case Created(aggregateId, _, task, _) =>
val taskWithId = new TaskWithId(aggregateId.taskId, task)
workQueue.submit(taskWithId)
case CancelRequested(aggregateId, _, _) =>
workQueue.cancel(aggregateId.taskId)
case _ =>
}
import scala.collection.JavaConverters._
private val loadHistory: AggregateId => History = eventStore.getEventsOfAggregate _
private val eventSourcingSystem = ScalaEventSourcingSystem(
handlers = Set(
new CreateCommandHandler(loadHistory, hostname),
new StartCommandHandler(loadHistory, hostname),
new RequestCancelCommandHandler(loadHistory, hostname),
new CompleteCommandHandler(loadHistory),
new CancelCommandHandler(loadHistory),
new FailCommandHandler(loadHistory),
new UpdateCommandHandler(loadHistory)),
subscribers = Set(
executionDetailsProjection.asSubscriber(hostname),
workDispatcher,
terminationSubscriber),
eventStore = eventStore)
private val workQueue: WorkQueue = workQueueSupplier(eventSourcingSystem)
override def submit(task: Task): TaskId = {
val taskId = TaskId.generateTaskId
val command = Create(taskId, task)
eventSourcingSystem.dispatch(command)
taskId
}
override def getExecutionDetails(id: TaskId): TaskExecutionDetails = executionDetailsProjection.load(id)
.getOrElse(throw new TaskNotFoundException())
override def list: util.List[TaskExecutionDetails] = listScala.asJava
override def list(status: TaskManager.Status): util.List[TaskExecutionDetails] = listScala
.filter(details => details.getStatus == status)
.asJava
private def listScala: List[TaskExecutionDetails] = executionDetailsProjection
.list
.flatMap(details => executionDetailsProjection.load(details.taskId))
override def cancel(id: TaskId): Unit = {
val command = RequestCancel(id)
eventSourcingSystem.dispatch(command)
}
@throws(classOf[TaskNotFoundException])
@throws(classOf[ReachedTimeoutException])
override def await(id: TaskId, timeout: Duration): TaskExecutionDetails = {
try {
val details = Mono.fromSupplier[TaskExecutionDetails](() => getExecutionDetails(id))
.filter(_.getStatus.isFinished)
val findEvent = Flux.from(terminationSubscriber.listenEvents)
.filter {
case event: TaskEvent => event.getAggregateId.taskId == id
case _ => false
}
.next()
.then(details)
Flux.merge(findEvent, details)
.subscribeOn(Schedulers.boundedElastic)
.blockFirst(timeout)
} catch {
case _: IllegalStateException => throw new ReachedTimeoutException
}
}
@PreDestroy
override def close(): Unit = {
workQueue.close()
}
}