JAMES-3539 Backend should clean expired Push subscriptions
diff --git a/server/protocols/jmap-rfc-8621-integration-tests/distributed-jmap-rfc-8621-integration-tests/src/test/java/org/apache/james/jmap/rfc8621/distributed/DistributedPushSubscriptionSetMethodTest.java b/server/protocols/jmap-rfc-8621-integration-tests/distributed-jmap-rfc-8621-integration-tests/src/test/java/org/apache/james/jmap/rfc8621/distributed/DistributedPushSubscriptionSetMethodTest.java
index dbcb157..8df62b6 100644
--- a/server/protocols/jmap-rfc-8621-integration-tests/distributed-jmap-rfc-8621-integration-tests/src/test/java/org/apache/james/jmap/rfc8621/distributed/DistributedPushSubscriptionSetMethodTest.java
+++ b/server/protocols/jmap-rfc-8621-integration-tests/distributed-jmap-rfc-8621-integration-tests/src/test/java/org/apache/james/jmap/rfc8621/distributed/DistributedPushSubscriptionSetMethodTest.java
@@ -22,6 +22,7 @@
 import org.apache.james.CassandraExtension;
 import org.apache.james.CassandraRabbitMQJamesConfiguration;
 import org.apache.james.CassandraRabbitMQJamesServerMain;
+import org.apache.james.ClockExtension;
 import org.apache.james.DockerOpenSearchExtension;
 import org.apache.james.JamesServerBuilder;
 import org.apache.james.JamesServerExtension;
@@ -51,6 +52,7 @@
         .extension(new CassandraExtension())
         .extension(new RabbitMQExtension())
         .extension(new AwsS3BlobStoreExtension())
+        .extension(new ClockExtension())
         .server(configuration -> CassandraRabbitMQJamesServerMain.createServer(configuration)
             .overrideWith(new TestJMAPServerModule(), new PushSubscriptionProbeModule())
             .overrideWith(binder -> binder.bind(PushClientConfiguration.class).toInstance(PushClientConfiguration.UNSAFE_DEFAULT())))
diff --git a/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/PushSubscriptionSetMethodContract.scala b/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/PushSubscriptionSetMethodContract.scala
index 58a091a..2c02505 100644
--- a/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/PushSubscriptionSetMethodContract.scala
+++ b/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/PushSubscriptionSetMethodContract.scala
@@ -47,7 +47,7 @@
 import org.apache.james.jmap.http.UserCredential
 import org.apache.james.jmap.rfc8621.contract.Fixture.{ACCEPT_RFC8621_VERSION_HEADER, BOB, BOB_PASSWORD, DOMAIN, authScheme, baseRequestSpecBuilder}
 import org.apache.james.jmap.rfc8621.contract.PushSubscriptionSetMethodContract.TIME_FORMATTER
-import org.apache.james.utils.{DataProbeImpl, GuiceProbe}
+import org.apache.james.utils.{DataProbeImpl, GuiceProbe, UpdatableTickingClock}
 import org.assertj.core.api.Assertions.assertThat
 import org.assertj.core.api.SoftAssertions
 import org.junit.jupiter.api.{BeforeEach, Test}
@@ -687,6 +687,49 @@
   }
 
   @Test
+  def getShouldNotReturnExpiredSubscriptionAndTriggerTheDeletion(server: GuiceJamesServer, clock: UpdatableTickingClock): Unit = {
+    val probe = server.getProbe(classOf[PushSubscriptionProbe])
+    val pushSubscription1 = probe
+      .createPushSubscription(username = BOB,
+        url = PushSubscriptionServerURL(new URI("https://example.com/push/?device=X8980fc&client=12c6d086").toURL()),
+        deviceId = DeviceClientId("12c6d086"),
+        types = Seq(MailboxTypeName))
+    probe.validatePushSubscription(BOB, pushSubscription1.id)
+
+    clock.setInstant(ZonedDateTime.now().plusDays(100).toInstant)
+
+    assertThatJson(`given`
+      .body("""{
+              |    "using": ["urn:ietf:params:jmap:core"],
+              |    "methodCalls": [ [ "PushSubscription/get", { }, "c1" ] ]
+              |}""".stripMargin)
+      .when
+        .post
+      .`then`
+        .statusCode(SC_OK)
+        .contentType(JSON)
+        .extract
+        .body
+        .asString)
+        .isEqualTo(
+          s"""{
+             |    "sessionState": "${SESSION_STATE.value}",
+             |    "methodResponses": [
+             |        [
+             |            "PushSubscription/get",
+             |            {
+             |                "list": []
+             |            },
+             |            "c1"
+             |        ]
+             |    ]
+             |}""".stripMargin)
+
+    assertThat(probe.retrievePushSubscription(BOB, pushSubscription1.id))
+      .isNull()
+  }
+
+  @Test
   def getByIdShouldReturnRecords(server: GuiceJamesServer): Unit = {
     val probe = server.getProbe(classOf[PushSubscriptionProbe])
     val pushSubscription1 = probe
diff --git a/server/protocols/jmap-rfc-8621-integration-tests/memory-jmap-rfc-8621-integration-tests/src/test/java/org/apache/james/jmap/rfc8621/memory/MemoryPushSubscriptionSetMethodTest.java b/server/protocols/jmap-rfc-8621-integration-tests/memory-jmap-rfc-8621-integration-tests/src/test/java/org/apache/james/jmap/rfc8621/memory/MemoryPushSubscriptionSetMethodTest.java
index 93f3e03..96f12a6 100644
--- a/server/protocols/jmap-rfc-8621-integration-tests/memory-jmap-rfc-8621-integration-tests/src/test/java/org/apache/james/jmap/rfc8621/memory/MemoryPushSubscriptionSetMethodTest.java
+++ b/server/protocols/jmap-rfc-8621-integration-tests/memory-jmap-rfc-8621-integration-tests/src/test/java/org/apache/james/jmap/rfc8621/memory/MemoryPushSubscriptionSetMethodTest.java
@@ -21,6 +21,7 @@
 
 import static org.apache.james.data.UsersRepositoryModuleChooser.Implementation.DEFAULT;
 
+import org.apache.james.ClockExtension;
 import org.apache.james.JamesServerBuilder;
 import org.apache.james.JamesServerExtension;
 import org.apache.james.MemoryJamesConfiguration;
@@ -40,6 +41,7 @@
             .configurationFromClasspath()
             .usersRepository(DEFAULT)
             .build())
+        .extension(new ClockExtension())
         .server(configuration -> MemoryJamesServerMain.createServer(configuration)
             .overrideWith(new TestJMAPServerModule(), new PushSubscriptionProbeModule())
             .overrideWith(binder -> binder.bind(PushClientConfiguration.class).toInstance(PushClientConfiguration.UNSAFE_DEFAULT())))
diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/PushSubscriptionGetMethod.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/PushSubscriptionGetMethod.scala
index e46b8cb..e1dc360 100644
--- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/PushSubscriptionGetMethod.scala
+++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/PushSubscriptionGetMethod.scala
@@ -19,9 +19,12 @@
 
 package org.apache.james.jmap.method
 
+import java.time.Clock
+
 import eu.timepit.refined.auto._
 import jakarta.inject.Inject
-import org.apache.james.jmap.api.model.PushSubscriptionId
+import org.apache.james.jmap.api.model.{PushSubscription, PushSubscriptionId}
+import org.apache.james.jmap.api.pushsubscription.PushSubscriptionHelpers.isInThePast
 import org.apache.james.jmap.api.pushsubscription.PushSubscriptionRepository
 import org.apache.james.jmap.core.CapabilityIdentifier.{CapabilityIdentifier, JMAP_CORE}
 import org.apache.james.jmap.core.Invocation.{Arguments, MethodName}
@@ -50,7 +53,8 @@
                                           val pushSubscriptionRepository: PushSubscriptionRepository,
                                           val metricFactory: MetricFactory,
                                           val sessionSupplier: SessionSupplier,
-                                          val sessionTranslator: SessionTranslator) extends MethodWithoutAccountId[PushSubscriptionGetRequest] with Startable {
+                                          val sessionTranslator: SessionTranslator,
+                                          val clock: Clock) extends MethodWithoutAccountId[PushSubscriptionGetRequest] with Startable {
   override val methodName: Invocation.MethodName = MethodName("PushSubscription/get")
   override val requiredCapabilities: Set[CapabilityIdentifier] = Set(JMAP_CORE)
 
@@ -76,16 +80,26 @@
 
   private def retrieveAllRecords(session: MailboxSession): SMono[PushSubscriptionGetResults] =
     SFlux(pushSubscriptionRepository.list(session.getUser))
+      .flatMap(cleanExpiredSubscriptionIfNeeded(session, _))
       .map(PushSubscriptionDTO.from)
       .collectSeq
       .map(dtos => PushSubscriptionGetResults(dtos, Set()))
 
+  private def cleanExpiredSubscriptionIfNeeded(session: MailboxSession, subscription: PushSubscription): SMono[PushSubscription] =
+    if (isInThePast(subscription.expires, clock)) {
+      SMono(pushSubscriptionRepository.revoke(session.getUser, subscription.id))
+        .`then`(SMono.empty)
+    } else {
+      SMono.fromCallable(() => subscription)
+    }
+
   private def retrieveRecords(unparsedIds: Ids, session: MailboxSession): SMono[PushSubscriptionGetResults] = {
     val ids: Set[PushSubscriptionId] = unparsedIds.value
       .flatMap(unparsedId => PushSubscriptionId.parse(unparsedId.serialise).toOption)
       .toSet
 
     SFlux(pushSubscriptionRepository.get(session.getUser, ids.asJava))
+      .flatMap(cleanExpiredSubscriptionIfNeeded(session, _))
       .map(PushSubscriptionDTO.from)
       .collectSeq()
       .map(dtos => PushSubscriptionGetResults(dtos, unparsedIds.value.toSet -- dtos.map(dto => UnparsedPushSubscriptionId.of(dto.id))))