JAMES-3539 Backend should clean expired Push subscriptions
diff --git a/server/protocols/jmap-rfc-8621-integration-tests/distributed-jmap-rfc-8621-integration-tests/src/test/java/org/apache/james/jmap/rfc8621/distributed/DistributedPushSubscriptionSetMethodTest.java b/server/protocols/jmap-rfc-8621-integration-tests/distributed-jmap-rfc-8621-integration-tests/src/test/java/org/apache/james/jmap/rfc8621/distributed/DistributedPushSubscriptionSetMethodTest.java
index dbcb157..8df62b6 100644
--- a/server/protocols/jmap-rfc-8621-integration-tests/distributed-jmap-rfc-8621-integration-tests/src/test/java/org/apache/james/jmap/rfc8621/distributed/DistributedPushSubscriptionSetMethodTest.java
+++ b/server/protocols/jmap-rfc-8621-integration-tests/distributed-jmap-rfc-8621-integration-tests/src/test/java/org/apache/james/jmap/rfc8621/distributed/DistributedPushSubscriptionSetMethodTest.java
@@ -22,6 +22,7 @@
import org.apache.james.CassandraExtension;
import org.apache.james.CassandraRabbitMQJamesConfiguration;
import org.apache.james.CassandraRabbitMQJamesServerMain;
+import org.apache.james.ClockExtension;
import org.apache.james.DockerOpenSearchExtension;
import org.apache.james.JamesServerBuilder;
import org.apache.james.JamesServerExtension;
@@ -51,6 +52,7 @@
.extension(new CassandraExtension())
.extension(new RabbitMQExtension())
.extension(new AwsS3BlobStoreExtension())
+ .extension(new ClockExtension())
.server(configuration -> CassandraRabbitMQJamesServerMain.createServer(configuration)
.overrideWith(new TestJMAPServerModule(), new PushSubscriptionProbeModule())
.overrideWith(binder -> binder.bind(PushClientConfiguration.class).toInstance(PushClientConfiguration.UNSAFE_DEFAULT())))
diff --git a/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/PushSubscriptionSetMethodContract.scala b/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/PushSubscriptionSetMethodContract.scala
index 58a091a..2c02505 100644
--- a/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/PushSubscriptionSetMethodContract.scala
+++ b/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/PushSubscriptionSetMethodContract.scala
@@ -47,7 +47,7 @@
import org.apache.james.jmap.http.UserCredential
import org.apache.james.jmap.rfc8621.contract.Fixture.{ACCEPT_RFC8621_VERSION_HEADER, BOB, BOB_PASSWORD, DOMAIN, authScheme, baseRequestSpecBuilder}
import org.apache.james.jmap.rfc8621.contract.PushSubscriptionSetMethodContract.TIME_FORMATTER
-import org.apache.james.utils.{DataProbeImpl, GuiceProbe}
+import org.apache.james.utils.{DataProbeImpl, GuiceProbe, UpdatableTickingClock}
import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.SoftAssertions
import org.junit.jupiter.api.{BeforeEach, Test}
@@ -687,6 +687,49 @@
}
@Test
+ def getShouldNotReturnExpiredSubscriptionAndTriggerTheDeletion(server: GuiceJamesServer, clock: UpdatableTickingClock): Unit = {
+ val probe = server.getProbe(classOf[PushSubscriptionProbe])
+ val pushSubscription1 = probe
+ .createPushSubscription(username = BOB,
+ url = PushSubscriptionServerURL(new URI("https://example.com/push/?device=X8980fc&client=12c6d086").toURL()),
+ deviceId = DeviceClientId("12c6d086"),
+ types = Seq(MailboxTypeName))
+ probe.validatePushSubscription(BOB, pushSubscription1.id)
+
+ clock.setInstant(ZonedDateTime.now().plusDays(100).toInstant)
+
+ assertThatJson(`given`
+ .body("""{
+ | "using": ["urn:ietf:params:jmap:core"],
+ | "methodCalls": [ [ "PushSubscription/get", { }, "c1" ] ]
+ |}""".stripMargin)
+ .when
+ .post
+ .`then`
+ .statusCode(SC_OK)
+ .contentType(JSON)
+ .extract
+ .body
+ .asString)
+ .isEqualTo(
+ s"""{
+ | "sessionState": "${SESSION_STATE.value}",
+ | "methodResponses": [
+ | [
+ | "PushSubscription/get",
+ | {
+ | "list": []
+ | },
+ | "c1"
+ | ]
+ | ]
+ |}""".stripMargin)
+
+ assertThat(probe.retrievePushSubscription(BOB, pushSubscription1.id))
+ .isNull()
+ }
+
+ @Test
def getByIdShouldReturnRecords(server: GuiceJamesServer): Unit = {
val probe = server.getProbe(classOf[PushSubscriptionProbe])
val pushSubscription1 = probe
diff --git a/server/protocols/jmap-rfc-8621-integration-tests/memory-jmap-rfc-8621-integration-tests/src/test/java/org/apache/james/jmap/rfc8621/memory/MemoryPushSubscriptionSetMethodTest.java b/server/protocols/jmap-rfc-8621-integration-tests/memory-jmap-rfc-8621-integration-tests/src/test/java/org/apache/james/jmap/rfc8621/memory/MemoryPushSubscriptionSetMethodTest.java
index 93f3e03..96f12a6 100644
--- a/server/protocols/jmap-rfc-8621-integration-tests/memory-jmap-rfc-8621-integration-tests/src/test/java/org/apache/james/jmap/rfc8621/memory/MemoryPushSubscriptionSetMethodTest.java
+++ b/server/protocols/jmap-rfc-8621-integration-tests/memory-jmap-rfc-8621-integration-tests/src/test/java/org/apache/james/jmap/rfc8621/memory/MemoryPushSubscriptionSetMethodTest.java
@@ -21,6 +21,7 @@
import static org.apache.james.data.UsersRepositoryModuleChooser.Implementation.DEFAULT;
+import org.apache.james.ClockExtension;
import org.apache.james.JamesServerBuilder;
import org.apache.james.JamesServerExtension;
import org.apache.james.MemoryJamesConfiguration;
@@ -40,6 +41,7 @@
.configurationFromClasspath()
.usersRepository(DEFAULT)
.build())
+ .extension(new ClockExtension())
.server(configuration -> MemoryJamesServerMain.createServer(configuration)
.overrideWith(new TestJMAPServerModule(), new PushSubscriptionProbeModule())
.overrideWith(binder -> binder.bind(PushClientConfiguration.class).toInstance(PushClientConfiguration.UNSAFE_DEFAULT())))
diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/PushSubscriptionGetMethod.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/PushSubscriptionGetMethod.scala
index e46b8cb..e1dc360 100644
--- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/PushSubscriptionGetMethod.scala
+++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/PushSubscriptionGetMethod.scala
@@ -19,9 +19,12 @@
package org.apache.james.jmap.method
+import java.time.Clock
+
import eu.timepit.refined.auto._
import jakarta.inject.Inject
-import org.apache.james.jmap.api.model.PushSubscriptionId
+import org.apache.james.jmap.api.model.{PushSubscription, PushSubscriptionId}
+import org.apache.james.jmap.api.pushsubscription.PushSubscriptionHelpers.isInThePast
import org.apache.james.jmap.api.pushsubscription.PushSubscriptionRepository
import org.apache.james.jmap.core.CapabilityIdentifier.{CapabilityIdentifier, JMAP_CORE}
import org.apache.james.jmap.core.Invocation.{Arguments, MethodName}
@@ -50,7 +53,8 @@
val pushSubscriptionRepository: PushSubscriptionRepository,
val metricFactory: MetricFactory,
val sessionSupplier: SessionSupplier,
- val sessionTranslator: SessionTranslator) extends MethodWithoutAccountId[PushSubscriptionGetRequest] with Startable {
+ val sessionTranslator: SessionTranslator,
+ val clock: Clock) extends MethodWithoutAccountId[PushSubscriptionGetRequest] with Startable {
override val methodName: Invocation.MethodName = MethodName("PushSubscription/get")
override val requiredCapabilities: Set[CapabilityIdentifier] = Set(JMAP_CORE)
@@ -76,16 +80,26 @@
private def retrieveAllRecords(session: MailboxSession): SMono[PushSubscriptionGetResults] =
SFlux(pushSubscriptionRepository.list(session.getUser))
+ .flatMap(cleanExpiredSubscriptionIfNeeded(session, _))
.map(PushSubscriptionDTO.from)
.collectSeq
.map(dtos => PushSubscriptionGetResults(dtos, Set()))
+ private def cleanExpiredSubscriptionIfNeeded(session: MailboxSession, subscription: PushSubscription): SMono[PushSubscription] =
+ if (isInThePast(subscription.expires, clock)) {
+ SMono(pushSubscriptionRepository.revoke(session.getUser, subscription.id))
+ .`then`(SMono.empty)
+ } else {
+ SMono.fromCallable(() => subscription)
+ }
+
private def retrieveRecords(unparsedIds: Ids, session: MailboxSession): SMono[PushSubscriptionGetResults] = {
val ids: Set[PushSubscriptionId] = unparsedIds.value
.flatMap(unparsedId => PushSubscriptionId.parse(unparsedId.serialise).toOption)
.toSet
SFlux(pushSubscriptionRepository.get(session.getUser, ids.asJava))
+ .flatMap(cleanExpiredSubscriptionIfNeeded(session, _))
.map(PushSubscriptionDTO.from)
.collectSeq()
.map(dtos => PushSubscriptionGetResults(dtos, unparsedIds.value.toSet -- dtos.map(dto => UnparsedPushSubscriptionId.of(dto.id))))