blob: 3e105a6fa3fb55a40ef32d9ffc35e59d7f9b3498 [file] [log] [blame]
/****************************************************************
* Licensed to the Apache Software Foundation (ASF) under one *
* or more contributor license agreements. See the NOTICE file *
* distributed with this work for additional information *
* regarding copyright ownership. The ASF licenses this file *
* to you under the Apache License, Version 2.0 (the *
* "License"); you may not use this file except in compliance *
* with the License. You may obtain a copy of the License at *
* *
* http://www.apache.org/licenses/LICENSE-2.0 *
* *
* Unless required by applicable law or agreed to in writing, *
* software distributed under the License is distributed on an *
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
* KIND, either express or implied. See the License for the *
* specific language governing permissions and limitations *
* under the License. *
****************************************************************/
package org.apache.james.jmap.rfc8621.contract
import java.net.{ProtocolException, URI}
import java.nio.charset.StandardCharsets
import java.util.concurrent.TimeUnit
import net.javacrumbs.jsonunit.assertj.JsonAssertions.assertThatJson
import org.apache.james.GuiceJamesServer
import org.apache.james.core.Username
import org.apache.james.jmap.JmapGuiceProbe
import org.apache.james.jmap.api.change.State
import org.apache.james.jmap.api.model.AccountId
import org.apache.james.jmap.core.{PushState, UuidState}
import org.apache.james.jmap.rfc8621.contract.Fixture._
import org.apache.james.jmap.rfc8621.contract.probe.DelegationProbe
import org.apache.james.mailbox.MessageManager.AppendCommand
import org.apache.james.mailbox.model.MailboxACL.Right
import org.apache.james.mailbox.model.{MailboxACL, MailboxPath}
import org.apache.james.mime4j.dom.Message
import org.apache.james.modules.protocols.SmtpGuiceProbe
import org.apache.james.modules.{ACLProbeImpl, MailboxProbeImpl}
import org.apache.james.utils.{DataProbeImpl, SMTPMessageSender, SpoolerProbe}
import org.assertj.core.api.Assertions.{assertThat, assertThatThrownBy}
import org.awaitility.Awaitility
import org.awaitility.Durations.ONE_HUNDRED_MILLISECONDS
import org.awaitility.core.ConditionFactory
import org.junit.jupiter.api.{BeforeEach, Test, Timeout}
import play.api.libs.json.{JsString, Json}
import reactor.core.scala.publisher.SMono
import reactor.core.scheduler.Schedulers
import sttp.capabilities.WebSockets
import sttp.client3.monad.IdMonad
import sttp.client3.okhttp.OkHttpSyncBackend
import sttp.client3.{Identity, RequestT, SttpBackend, asWebSocket, basicRequest}
import sttp.model.Uri
import sttp.monad.MonadError
import sttp.ws.{WebSocket, WebSocketFrame}
import scala.jdk.CollectionConverters._
import scala.util.Try
trait WebSocketContract {
private lazy val awaitAtMostTenSeconds: ConditionFactory = Awaitility.`with`
.pollInterval(ONE_HUNDRED_MILLISECONDS)
.and.`with`.pollDelay(ONE_HUNDRED_MILLISECONDS)
.await
.atMost(10, TimeUnit.SECONDS)
private lazy val backend: SttpBackend[Identity, WebSockets] = OkHttpSyncBackend()
private lazy implicit val monadError: MonadError[Identity] = IdMonad
@BeforeEach
def setUp(server: GuiceJamesServer): Unit = {
server.getProbe(classOf[DataProbeImpl])
.fluent()
.addDomain(DOMAIN.asString())
.addUser(ANDRE.asString(), ANDRE_PASSWORD)
.addUser(BOB.asString(), BOB_PASSWORD)
.addUser(DAVID.asString(), "secret")
}
@Test
@Timeout(180)
def apiRequestsShouldBeProcessed(server: GuiceJamesServer): Unit = {
val response: Either[String, String] =
authenticatedRequest(server)
.response(asWebSocket[Identity, String] {
ws =>
ws.send(WebSocketFrame.text(
"""{
| "@type": "Request",
| "id": "req-36",
| "using": [ "urn:ietf:params:jmap:core"],
| "methodCalls": [
| [
| "Core/echo",
| {
| "arg1": "arg1data",
| "arg2": "arg2data"
| },
| "c1"
| ]
| ]
|}""".stripMargin))
ws.receive().asPayload
})
.send(backend)
.body
assertThatJson(response.toOption.get)
.isEqualTo("""{
| "@type":"Response",
| "requestId":"req-36",
| "sessionState":"2c9f1b12-b35a-43e6-9af2-0106fb53a943",
| "methodResponses":[
| ["Core/echo",
| {
| "arg1":"arg1data",
| "arg2":"arg2data"
| },"c1"]
| ]
|}
|""".stripMargin)
}
@Test
@Timeout(180)
def executingSeveralAPICallsShouldBePossible(server: GuiceJamesServer): Unit = {
val response: Either[String, List[String]] =
authenticatedRequest(server)
.response(asWebSocket[Identity, List[String]] {
ws =>
List({
ws.send(WebSocketFrame.text(
"""{
| "@type": "Request",
| "id": "req-36",
| "using": [ "urn:ietf:params:jmap:core"],
| "methodCalls": [
| [
| "Core/echo",
| {
| "arg1": "1",
| "arg2": "arg2data"
| },
| "c1"
| ]
| ]
|}""".stripMargin))
ws.receive().asPayload
}, {
Thread.sleep(200)
ws.send(WebSocketFrame.text(
"""{
| "@type": "Request",
| "id": "req-36",
| "using": [ "urn:ietf:params:jmap:core"],
| "methodCalls": [
| [
| "Core/echo",
| {
| "arg1": "2",
| "arg2": "arg2data"
| },
| "c1"
| ]
| ]
|}""".stripMargin))
ws.receive().asPayload
})
})
.send(backend)
.body
assertThat(response.toOption.get.asJava)
.hasSize(2)
}
@Test
@Timeout(180)
def nonJsonPayloadShouldTriggerError(server: GuiceJamesServer): Unit = {
val response: Either[String, String] =
authenticatedRequest(server)
.response(asWebSocket[Identity, String] {
ws =>
ws.send(WebSocketFrame.text("The quick brown fox"))
ws.receive().asPayload
})
.send(backend)
.body
assertThatJson(response.toOption.get)
.isEqualTo("""{
| "status":400,
| "detail":"The request was successfully parsed as JSON but did not match the type signature of the Request object: List((,List(JsonValidationError(List(Unrecognized token 'The': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')\n at [Source: (String)\"The quick brown fox\"; line: 1, column: 4]),List()))))",
| "type":"urn:ietf:params:jmap:error:notRequest",
| "requestId":null,
| "@type":"RequestError"
|}""".stripMargin)
}
@Test
@Timeout(180)
def handshakeShouldBeAuthenticated(server: GuiceJamesServer): Unit = {
assertThatThrownBy(() =>
unauthenticatedRequest(server)
.response(asWebSocket[Identity, String] {
ws =>
ws.send(WebSocketFrame.text("The quick brown fox"))
ws.receive().asPayload
})
.send(backend)
.body)
.hasRootCause(new ProtocolException("Expected HTTP 101 response but was '401 Unauthorized'"))
}
@Test
@Timeout(180)
def noTypeFieldShouldTriggerError(server: GuiceJamesServer): Unit = {
val response: Either[String, String] =
authenticatedRequest(server)
.response(asWebSocket[Identity, String] {
ws =>
ws.send(WebSocketFrame.text(
"""{
| "id": "req-36",
| "using": [ "urn:ietf:params:jmap:core"],
| "methodCalls": [
| [
| "Core/echo",
| {
| "arg1": "arg1data",
| "arg2": "arg2data"
| },
| "c1"
| ]
| ]
|}""".stripMargin))
ws.receive().asPayload
})
.send(backend)
.body
assertThatJson(response.toOption.get)
.isEqualTo("""{
| "status":400,
| "detail":"The request was successfully parsed as JSON but did not match the type signature of the Request object: List((,List(JsonValidationError(List(Missing @type field on a webSocket inbound message),List()))))",
| "type":"urn:ietf:params:jmap:error:notRequest",
| "requestId":null,
| "@type":"RequestError"
|}""".stripMargin)
}
@Test
@Timeout(180)
def badTypeFieldShouldTriggerError(server: GuiceJamesServer): Unit = {
val response: Either[String, String] =
authenticatedRequest(server)
.response(asWebSocket[Identity, String] {
ws =>
ws.send(WebSocketFrame.text(
"""{
| "@type": 42,
| "id": "req-36",
| "using": [ "urn:ietf:params:jmap:core"],
| "methodCalls": [
| [
| "Core/echo",
| {
| "arg1": "arg1data",
| "arg2": "arg2data"
| },
| "c1"
| ]
| ]
|}""".stripMargin))
ws.receive().asPayload
})
.send(backend)
.body
assertThatJson(response.toOption.get)
.isEqualTo("""{
| "status":400,
| "detail":"The request was successfully parsed as JSON but did not match the type signature of the Request object: List((,List(JsonValidationError(List(Invalid @type field on a webSocket inbound message: expecting a JsString, got 42),List()))))",
| "type":"urn:ietf:params:jmap:error:notRequest",
| "requestId":null,
| "@type":"RequestError"
|}""".stripMargin)
}
@Test
@Timeout(180)
def unknownTypeFieldShouldTriggerError(server: GuiceJamesServer): Unit = {
val response: Either[String, String] =
authenticatedRequest(server)
.response(asWebSocket[Identity, String] {
ws =>
ws.send(WebSocketFrame.text(
"""{
| "@type": "unknown",
| "id": "req-36",
| "using": [ "urn:ietf:params:jmap:core"],
| "methodCalls": [
| [
| "Core/echo",
| {
| "arg1": "arg1data",
| "arg2": "arg2data"
| },
| "c1"
| ]
| ]
|}""".stripMargin))
ws.receive().asPayload
})
.send(backend)
.body
assertThatJson(response.toOption.get)
.isEqualTo("""{
| "status":400,
| "detail":"The request was successfully parsed as JSON but did not match the type signature of the Request object: List((,List(JsonValidationError(List(Unknown @type field on a webSocket inbound message: unknown),List()))))",
| "type":"urn:ietf:params:jmap:error:notRequest",
| "requestId":null,
| "@type":"RequestError"
|}""".stripMargin)
}
@Test
@Timeout(180)
def clientSendingARespondTypeFieldShouldTriggerError(server: GuiceJamesServer): Unit = {
val response: Either[String, String] =
authenticatedRequest(server)
.response(asWebSocket[Identity, String] {
ws =>
ws.send(WebSocketFrame.text(
"""{
| "@type": "Response",
| "id": "req-36",
| "using": [ "urn:ietf:params:jmap:core"],
| "methodCalls": [
| [
| "Core/echo",
| {
| "arg1": "arg1data",
| "arg2": "arg2data"
| },
| "c1"
| ]
| ]
|}""".stripMargin))
ws.receive().asPayload
})
.send(backend)
.body
assertThatJson(response.toOption.get)
.isEqualTo("""{
| "status":400,
| "detail":"The request was successfully parsed as JSON but did not match the type signature of the Request object: List((,List(JsonValidationError(List(Unknown @type field on a webSocket inbound message: Response),List()))))",
| "type":"urn:ietf:params:jmap:error:notRequest",
| "requestId":null,
| "@type":"RequestError"
|}""".stripMargin)
}
@Test
@Timeout(180)
def requestLevelErrorShouldReturnAPIError(server: GuiceJamesServer): Unit = {
val response: Either[String, String] =
authenticatedRequest(server)
.response(asWebSocket[Identity, String] {
ws =>
ws.send(WebSocketFrame.text(
"""{
| "@type": "Request",
| "using": [
| "urn:ietf:params:jmap:core",
| "urn:ietf:params:jmap:mail"],
| "methodCalls": [[
| "Mailbox/get",
| {
| "accountId": "29883977c13473ae7cb7678ef767cbfbaffc8a44a6e463d971d23a65c1dc4af6",
| "properties": ["invalidProperty"]
| },
| "c1"]]
|}""".stripMargin))
ws.receive().asPayload
})
.send(backend)
.body
assertThatJson(response.toOption.get)
.isEqualTo("""{
| "@type": "Response",
| "requestId": null,
| "sessionState": "2c9f1b12-b35a-43e6-9af2-0106fb53a943",
| "methodResponses": [["error",{"type":"invalidArguments","description":"The following properties [invalidProperty] do not exist."},"c1"]]
|}""".stripMargin)
}
@Test
@Timeout(180)
def pushEnableRequestsShouldBeProcessed(server: GuiceJamesServer): Unit = {
val bobPath = MailboxPath.inbox(BOB)
val accountId: AccountId = AccountId.fromUsername(BOB)
val mailboxId = server.getProbe(classOf[MailboxProbeImpl]).createMailbox(bobPath)
Thread.sleep(100)
val response: Either[String, List[String]] =
authenticatedRequest(server)
.response(asWebSocket[Identity, List[String]] {
ws =>
ws.send(WebSocketFrame.text(
"""{
| "@type": "WebSocketPushEnable",
| "dataTypes": ["Mailbox", "Email"]
|}""".stripMargin))
Thread.sleep(100)
ws.send(WebSocketFrame.text(
s"""{
| "@type": "Request",
| "id": "req-36",
| "using": ["urn:ietf:params:jmap:core", "urn:ietf:params:jmap:mail"],
| "methodCalls": [
| ["Email/set", {
| "accountId": "$ACCOUNT_ID",
| "create": {
| "aaaaaa":{
| "mailboxIds": {
| "${mailboxId.serialize}": true
| }
| }
| }
| }, "c1"]]
|}""".stripMargin))
List(
ws.receive().asPayload,
ws.receive().asPayload)
})
.send(backend)
.body
Thread.sleep(100)
val jmapGuiceProbe: JmapGuiceProbe = server.getProbe(classOf[JmapGuiceProbe])
val emailState: State = jmapGuiceProbe.getLatestEmailState(accountId)
val mailboxState: State = jmapGuiceProbe.getLatestMailboxState(accountId)
val globalState: String = PushState.fromOption(Some(UuidState.fromJava(mailboxState)), Some(UuidState.fromJava(emailState))).get.value
val stateChange: String = s"""{"@type":"StateChange","changed":{"$ACCOUNT_ID":{"Email":"${emailState.getValue}","Mailbox":"${mailboxState.getValue}"}},"pushState":"$globalState"}""".stripMargin
assertThat(response.toOption.get.asJava)
.hasSize(2) // state change notification + API response
.contains(stateChange)
}
@Test
@Timeout(180)
def shouldPushChangesToDelegatedUser(server: GuiceJamesServer): Unit = {
val davidPath = MailboxPath.inbox(DAVID)
server.getProbe(classOf[MailboxProbeImpl]).createMailbox(davidPath)
// DAVID delegates BOB to access his account
server.getProbe(classOf[DelegationProbe]).addAuthorizedUser(DAVID, BOB)
Thread.sleep(100)
val response: Either[String, List[String]] =
authenticatedRequest(server)
.response(asWebSocket[Identity, List[String]] {
ws =>
ws.send(WebSocketFrame.text(
"""{
| "@type": "WebSocketPushEnable",
| "dataTypes": ["EmailDelivery"]
|}""".stripMargin))
Thread.sleep(100)
// DAVID has a new mail therefore EmailDelivery change
sendEmailTo(server, DAVID)
List(
ws.receive().asPayload)
})
.send(backend)
.body
Thread.sleep(100)
// Bob should receive DAVID's EmailDelivery state change
assertThat(response.toOption.get.asJava)
.hasSize(1)
assertThatJson(response.toOption.get.asJava.get(0))
.isEqualTo(s"""{"@type":"StateChange","changed":{"$DAVID_ACCOUNT_ID":{"EmailDelivery":"$${json-unit.ignore}"}},"pushState":"$${json-unit.ignore}"}""".stripMargin)
}
@Test
@Timeout(180)
def ownerUserShouldStillReceiveHisChangesWhenHeDelegatesHisAccountToOtherUsers(server: GuiceJamesServer): Unit = {
val bobPath = MailboxPath.inbox(BOB)
server.getProbe(classOf[MailboxProbeImpl]).createMailbox(bobPath)
// BOB delegates DAVID to access his account
server.getProbe(classOf[DelegationProbe]).addAuthorizedUser(BOB, DAVID)
Thread.sleep(100)
val response: Either[String, List[String]] =
authenticatedRequest(server)
.response(asWebSocket[Identity, List[String]] {
ws =>
ws.send(WebSocketFrame.text(
"""{
| "@type": "WebSocketPushEnable",
| "dataTypes": ["EmailDelivery"]
|}""".stripMargin))
Thread.sleep(100)
// BOB has a new mail therefore EmailDelivery change
sendEmailTo(server, BOB)
List(
ws.receive().asPayload)
})
.send(backend)
.body
Thread.sleep(100)
// Bob should receive his EmailDelivery state change
assertThat(response.toOption.get.asJava)
.hasSize(1)
assertThatJson(response.toOption.get.asJava.get(0))
.isEqualTo(s"""{"@type":"StateChange","changed":{"$ACCOUNT_ID":{"EmailDelivery":"$${json-unit.ignore}"}},"pushState":"$${json-unit.ignore}"}""".stripMargin)
}
@Test
@Timeout(180)
def bobShouldReceiveHisChangesAndHisDelegatedAccountChanges(server: GuiceJamesServer): Unit = {
val davidPath = MailboxPath.inbox(DAVID)
server.getProbe(classOf[MailboxProbeImpl]).createMailbox(davidPath)
// DAVID delegates BOB to access his account
server.getProbe(classOf[DelegationProbe]).addAuthorizedUser(DAVID, BOB)
Thread.sleep(100)
val response: Either[String, List[String]] =
authenticatedRequest(server)
.response(asWebSocket[Identity, List[String]] {
ws =>
ws.send(WebSocketFrame.text(
"""{
| "@type": "WebSocketPushEnable",
| "dataTypes": ["EmailDelivery"]
|}""".stripMargin))
Thread.sleep(100)
sendEmailTo(server, DAVID)
sendEmailTo(server, BOB)
sendEmailTo(server, DAVID)
sendEmailTo(server, BOB)
List(
ws.receive().asPayload,
ws.receive().asPayload,
ws.receive().asPayload,
ws.receive().asPayload)
})
.send(backend)
.body
Thread.sleep(100)
// Bob should receive DAVID's change and his changes
assertThat(response.toOption.get.asJava)
.hasSize(4)
assertThatJson(response.toOption.get.asJava.get(0))
.isEqualTo(s"""{"@type":"StateChange","changed":{"$DAVID_ACCOUNT_ID":{"EmailDelivery":"$${json-unit.ignore}"}},"pushState":"$${json-unit.ignore}"}""".stripMargin)
assertThatJson(response.toOption.get.asJava.get(1))
.isEqualTo(s"""{"@type":"StateChange","changed":{"$ACCOUNT_ID":{"EmailDelivery":"$${json-unit.ignore}"}},"pushState":"$${json-unit.ignore}"}""".stripMargin)
assertThatJson(response.toOption.get.asJava.get(2))
.isEqualTo(s"""{"@type":"StateChange","changed":{"$DAVID_ACCOUNT_ID":{"EmailDelivery":"$${json-unit.ignore}"}},"pushState":"$${json-unit.ignore}"}""".stripMargin)
assertThatJson(response.toOption.get.asJava.get(1))
.isEqualTo(s"""{"@type":"StateChange","changed":{"$ACCOUNT_ID":{"EmailDelivery":"$${json-unit.ignore}"}},"pushState":"$${json-unit.ignore}"}""".stripMargin)
}
@Test
@Timeout(180)
def mixingPushAndResponsesShouldBeSupported(server: GuiceJamesServer): Unit = {
val bobPath = MailboxPath.inbox(BOB)
val mailboxId = server.getProbe(classOf[MailboxProbeImpl]).createMailbox(bobPath)
Thread.sleep(100)
def createEmail(ws: WebSocket[Identity]): Identity[Unit] = ws.send(WebSocketFrame.text(
s"""{
| "@type": "Request",
| "id": "req-36",
| "using": ["urn:ietf:params:jmap:core", "urn:ietf:params:jmap:mail"],
| "methodCalls": [
| ["Email/set", {
| "accountId": "$ACCOUNT_ID",
| "create": {
| "aaaaaa":{
| "mailboxIds": {
| "${mailboxId.serialize}": true
| }
| }
| }
| }, "c1"]]
|}""".stripMargin))
val response: Either[String, List[String]] =
authenticatedRequest(server)
.response(asWebSocket[Identity, List[String]] {
ws =>
ws.send(WebSocketFrame.text(
"""{
| "@type": "WebSocketPushEnable",
| "dataTypes": ["Mailbox", "Email"]
|}""".stripMargin))
Thread.sleep(100)
createEmail(ws)
createEmail(ws)
createEmail(ws)
createEmail(ws)
createEmail(ws)
List.range(0, 10)
.map(i => ws.receive().asPayload)
})
.send(backend)
.body
// 5 changes, each one generate one response, one state change
assertThat(response.toOption.get.asJava).hasSize(10)
}
@Test
@Timeout(180)
def pushShouldHandleVacationResponses(server: GuiceJamesServer): Unit = {
val response: Either[String, List[String]] =
authenticatedRequest(server)
.response(asWebSocket[Identity, List[String]] {
ws =>
ws.send(WebSocketFrame.text(
"""{
| "@type": "WebSocketPushEnable",
| "dataTypes": ["VacationResponse"]
|}""".stripMargin))
Thread.sleep(100)
ws.send(WebSocketFrame.text(
s"""{
| "@type": "Request",
| "id": "req-36",
| "using": ["urn:ietf:params:jmap:core", "urn:ietf:params:jmap:mail", "urn:ietf:params:jmap:vacationresponse"],
| "methodCalls": [
| ["VacationResponse/set", {
| "accountId": "29883977c13473ae7cb7678ef767cbfbaffc8a44a6e463d971d23a65c1dc4af6",
| "update": {
| "singleton": {
| "isEnabled": true,
| "fromDate": "2014-10-30T14:12:00Z",
| "toDate": "2014-11-30T14:12:00Z",
| "subject": "I am in vacation",
| "textBody": "I'm currently enjoying life. Please disturb me later",
| "htmlBody": "I'm currently enjoying <b>life</b>. <br/>Please disturb me later"
| }
| }
| }, "c1"]]
|}""".stripMargin))
List(
ws.receive().asPayload,
ws.receive().asPayload)
})
.send(backend)
.body
assertThat(response.toOption.get.asJava).hasSize(2) // vacation notification + API response
assertThat(response.toOption.get.filter(s => s.contains(""""@type":"StateChange"""")).asJava)
.hasSize(1)
.allMatch(s => s.contains("VacationResponse"))
}
@Test
@Timeout(180)
// For client compatibility purposes
def specifiedUnHandledDataTypesShouldNotBeRejected(server: GuiceJamesServer): Unit = {
val bobPath = MailboxPath.inbox(BOB)
val accountId: AccountId = AccountId.fromUsername(BOB)
val mailboxId = server.getProbe(classOf[MailboxProbeImpl]).createMailbox(bobPath)
Thread.sleep(100)
val response: Either[String, List[String]] =
authenticatedRequest(server)
.response(asWebSocket[Identity, List[String]] {
ws =>
ws.send(WebSocketFrame.text(
"""{
| "@type": "WebSocketPushEnable",
| "dataTypes": ["Mailbox", "Email", "VacationResponse", "Thread", "Identity", "EmailSubmission", "EmailDelivery"]
|}""".stripMargin))
Thread.sleep(100)
ws.send(WebSocketFrame.text(
s"""{
| "@type": "Request",
| "id": "req-36",
| "using": ["urn:ietf:params:jmap:core", "urn:ietf:params:jmap:mail"],
| "methodCalls": [
| ["Email/set", {
| "accountId": "$ACCOUNT_ID",
| "create": {
| "aaaaaa":{
| "mailboxIds": {
| "${mailboxId.serialize}": true
| }
| }
| }
| }, "c1"]]
|}""".stripMargin))
List(
ws.receive().asPayload,
ws.receive().asPayload)
})
.send(backend)
.body
Thread.sleep(100)
val jmapGuiceProbe: JmapGuiceProbe = server.getProbe(classOf[JmapGuiceProbe])
val mailboxState: State = jmapGuiceProbe.getLatestMailboxState(accountId)
val emailState: State = jmapGuiceProbe.getLatestEmailState(accountId)
val globalState: String = PushState.fromOption(Some(UuidState.fromJava(mailboxState)), Some(UuidState.fromJava(emailState))).get.value
val stateChange: String = s"""{"@type":"StateChange","changed":{"$ACCOUNT_ID":{"Email":"${emailState.getValue}","Mailbox":"${mailboxState.getValue}"}},"pushState":"$globalState"}""".stripMargin
assertThat(response.toOption.get.asJava)
.hasSize(2) // state change notification + API response
.contains(stateChange)
}
@Test
@Timeout(180)
// For client compatibility purposes
def emailDeliveryShouldNotIncludeFlagUpdatesAndDeletes(server: GuiceJamesServer): Unit = {
val bobPath = MailboxPath.inbox(BOB)
val mailboxId = server.getProbe(classOf[MailboxProbeImpl]).createMailbox(bobPath)
Thread.sleep(100)
val response: Either[String, List[String]] =
authenticatedRequest(server)
.response(asWebSocket[Identity, List[String]] {
ws =>
ws.send(WebSocketFrame.text(
s"""{
| "@type": "Request",
| "id": "req-36",
| "using": ["urn:ietf:params:jmap:core", "urn:ietf:params:jmap:mail"],
| "methodCalls": [
| ["Email/set", {
| "accountId": "$ACCOUNT_ID",
| "create": {
| "aaaaaa":{
| "mailboxIds": {
| "${mailboxId.serialize}": true
| }
| }
| }
| }, "c1"]]
|}""".stripMargin))
val responseAsJson = Json.parse(ws.receive().asPayload)
.\("methodResponses")
.\(0).\(1)
.\("created")
.\("aaaaaa")
val messageId = responseAsJson
.\("id")
.get.asInstanceOf[JsString].value
Thread.sleep(100)
ws.send(WebSocketFrame.text(
"""{
| "@type": "WebSocketPushEnable",
| "dataTypes": ["Mailbox", "Email", "VacationResponse", "Thread", "Identity", "EmailSubmission", "EmailDelivery"]
|}""".stripMargin))
Thread.sleep(100)
ws.send(WebSocketFrame.text(
s"""{
| "using": ["urn:ietf:params:jmap:core", "urn:ietf:params:jmap:mail"],
| "@type": "Request",
| "methodCalls": [
| ["Email/set", {
| "accountId": "$ACCOUNT_ID",
| "update": {
| "$messageId":{
| "keywords": {
| "music": true
| }
| }
| }
| }, "c1"]]
|}""".stripMargin))
val stateChange1 = ws.receive().asPayload
val response1 = ws.receive().asPayload
Thread.sleep(100)
ws.send(WebSocketFrame.text(
s"""{
| "using": ["urn:ietf:params:jmap:core", "urn:ietf:params:jmap:mail"],
| "@type": "Request",
| "methodCalls": [
| ["Email/set", {
| "accountId": "$ACCOUNT_ID",
| "destroy": ["$messageId"]
| }, "c1"]]
|}""".stripMargin))
Thread.sleep(100)
val stateChange2 = ws.receive().asPayload
val response2 = ws.receive().asPayload
List(response1, response2, stateChange1, stateChange2)
})
.send(backend)
.body
assertThat(response.toOption.get.asJava)
.hasSize(4) // update flags response + email state change notif + destroy response + email state change notif + mailbox state change notif (count)
assertThat(response.toOption.get.filter(s => s.startsWith("{\"@type\":\"StateChange\"")).asJava)
.hasSize(2)
.noneMatch(s => s.contains("EmailDelivery"))
}
@Test
@Timeout(180)
def dataTypesShouldDefaultToAll(server: GuiceJamesServer): Unit = {
val bobPath = MailboxPath.inbox(BOB)
val accountId: AccountId = AccountId.fromUsername(BOB)
val mailboxId = server.getProbe(classOf[MailboxProbeImpl]).createMailbox(bobPath)
Thread.sleep(100)
val response: Either[String, List[String]] =
authenticatedRequest(server)
.response(asWebSocket[Identity, List[String]] {
ws =>
ws.send(WebSocketFrame.text(
"""{
| "@type": "WebSocketPushEnable",
| "dataTypes": null
|}""".stripMargin))
Thread.sleep(100)
ws.send(WebSocketFrame.text(
s"""{
| "@type": "Request",
| "id": "req-36",
| "using": ["urn:ietf:params:jmap:core", "urn:ietf:params:jmap:mail"],
| "methodCalls": [
| ["Email/set", {
| "accountId": "$ACCOUNT_ID",
| "create": {
| "aaaaaa":{
| "mailboxIds": {
| "${mailboxId.serialize}": true
| }
| }
| }
| }, "c1"]]
|}""".stripMargin))
List(
ws.receive().asPayload,
ws.receive().asPayload)
})
.send(backend)
.body
Thread.sleep(100)
val jmapGuiceProbe: JmapGuiceProbe = server.getProbe(classOf[JmapGuiceProbe])
val emailState: State = jmapGuiceProbe.getLatestEmailState(accountId)
val mailboxState: State = jmapGuiceProbe.getLatestMailboxState(accountId)
val globalState: String = PushState.fromOption(Some(UuidState.fromJava(mailboxState)), Some(UuidState.fromJava(emailState))).get.value
val stateChange: String = s"""{"@type":"StateChange","changed":{"$ACCOUNT_ID":{"Email":"${emailState.getValue}","Mailbox":"${mailboxState.getValue}"}},"pushState":"$globalState"}""".stripMargin
assertThat(response.toOption.get.asJava)
.hasSize(2) // state change notification + API response
.contains(stateChange)
}
@Test
@Timeout(180)
def shouldPushEmailDeliveryChangeWhenUserReceivesEmail(server: GuiceJamesServer): Unit = {
server.getProbe(classOf[MailboxProbeImpl]).createMailbox(MailboxPath.inbox(BOB))
val response: Either[String, List[String]] =
authenticatedRequest(server)
.response(asWebSocket[Identity, List[String]] {
ws =>
ws.send(WebSocketFrame.text(
"""{
| "@type": "WebSocketPushEnable",
| "dataTypes": null
|}""".stripMargin))
Thread.sleep(100)
// Andre send mail to Bob
sendEmailTo(server, BOB)
List(
ws.receive().asPayload)
})
.send(backend)
.body
// Bob should receive EmailDelivery state change
assertThat(response.toOption.get.asJava)
.hasSize(1) // state change notification
.allMatch(s => s.contains("EmailDelivery"))
}
@Test
@Timeout(180)
def shouldNotPushEmailDeliveryChangeWhenCreateDraftMail(server: GuiceJamesServer): Unit = {
val mailboxId = server.getProbe(classOf[MailboxProbeImpl]).createMailbox(MailboxPath.inbox(BOB))
val response: Either[String, List[String]] =
authenticatedRequest(server)
.response(asWebSocket[Identity, List[String]] {
ws =>
ws.send(WebSocketFrame.text(
"""{
| "@type": "WebSocketPushEnable",
| "dataTypes": null
|}""".stripMargin))
Thread.sleep(100)
ws.send(WebSocketFrame.text(
s"""{
| "@type": "Request",
| "id": "req-36",
| "using": ["urn:ietf:params:jmap:core", "urn:ietf:params:jmap:mail"],
| "methodCalls": [
| ["Email/set", {
| "accountId": "$ACCOUNT_ID",
| "create": {
| "aaaaaa":{
| "mailboxIds": {
| "${mailboxId.serialize}": true
| }
| }
| }
| }, "c1"]]
|}""".stripMargin))
List(
ws.receive().asPayload,
ws.receive().asPayload)
})
.send(backend)
.body
assertThat(response.toOption.get.asJava)
.hasSize(2) // state change notification + API response
.noneMatch(s => s.contains("EmailDelivery"))
}
@Test
@Timeout(180)
def pushEnableShouldUpdatePreviousSubscriptions(server: GuiceJamesServer): Unit = {
val bobPath = MailboxPath.inbox(BOB)
val mailboxId = server.getProbe(classOf[MailboxProbeImpl]).createMailbox(bobPath)
val accountId: AccountId = AccountId.fromUsername(BOB)
Thread.sleep(100)
val response: Either[String, List[String]] =
authenticatedRequest(server)
.response(asWebSocket[Identity, List[String]] {
ws =>
ws.send(WebSocketFrame.text(
"""{
| "@type": "WebSocketPushEnable",
| "dataTypes": ["Mailbox", "Email"]
|}""".stripMargin))
Thread.sleep(100)
ws.send(WebSocketFrame.text(
"""{
| "@type": "WebSocketPushEnable",
| "dataTypes": ["Mailbox"]
|}""".stripMargin))
Thread.sleep(100)
ws.send(WebSocketFrame.text(
s"""{
| "@type": "Request",
| "id": "req-36",
| "using": ["urn:ietf:params:jmap:core", "urn:ietf:params:jmap:mail"],
| "methodCalls": [
| ["Email/set", {
| "accountId": "$ACCOUNT_ID",
| "create": {
| "aaaaaa":{
| "mailboxIds": {
| "${mailboxId.serialize}": true
| }
| }
| }
| }, "c1"]]
|}""".stripMargin))
List(ws.receive().asPayload,
ws.receive().asPayload)
})
.send(backend)
.body
Thread.sleep(100)
val jmapGuiceProbe: JmapGuiceProbe = server.getProbe(classOf[JmapGuiceProbe])
val emailState: State = jmapGuiceProbe.getLatestEmailState(accountId)
val mailboxState: State = jmapGuiceProbe.getLatestMailboxState(accountId)
val globalState: String = PushState.fromOption(Some(UuidState.fromJava(mailboxState)), Some(UuidState.fromJava(emailState))).get.value
val mailboxStateChange: String = s"""{"@type":"StateChange","changed":{"$ACCOUNT_ID":{"Mailbox":"${mailboxState.getValue}"}},"pushState":"$globalState"}"""
assertThat(response.toOption.get.asJava)
.hasSize(2) // Method response + Mailbox state change, no Email notification
.contains(mailboxStateChange)
}
@Test
@Timeout(180)
def pushShouldSupportDelegation(server: GuiceJamesServer): Unit = {
val mailboxProbe: MailboxProbeImpl = server.getProbe(classOf[MailboxProbeImpl])
val andrePath = MailboxPath.inbox(ANDRE)
mailboxProbe.createMailbox(andrePath)
server.getProbe(classOf[ACLProbeImpl])
.replaceRights(andrePath, BOB.asString, new MailboxACL.Rfc4314Rights(Right.Lookup, Right.Read))
Thread.sleep(100)
val response: Either[String, List[String]] =
authenticatedRequest(server)
.response(asWebSocket[Identity, List[String]] {
ws =>
ws.send(WebSocketFrame.text(
"""{
| "@type": "WebSocketPushEnable",
| "dataTypes": ["Mailbox", "Email"]
|}""".stripMargin))
Thread.sleep(100)
val message: Message = Message.Builder
.of
.setSubject("test")
.setBody("testmail", StandardCharsets.UTF_8)
.build
mailboxProbe.appendMessage(ANDRE.asString(), andrePath, AppendCommand.from(message))
Thread.sleep(100)
List(ws.receive().asPayload)
})
.send(backend)
.body
Thread.sleep(100)
val jmapGuiceProbe: JmapGuiceProbe = server.getProbe(classOf[JmapGuiceProbe])
val accountId: AccountId = AccountId.fromUsername(BOB)
val emailState: State = jmapGuiceProbe.getLatestEmailStateWithDelegation(accountId)
val mailboxState: State = jmapGuiceProbe.getLatestMailboxStateWithDelegation(accountId)
val globalState: String = PushState.fromOption(Some(UuidState.fromJava(mailboxState)), Some(UuidState.fromJava(emailState))).get.value
val stateChange: String = s"""{"@type":"StateChange","changed":{"$ACCOUNT_ID":{"Email":"${emailState.getValue}","Mailbox":"${mailboxState.getValue}"}},"pushState":"$globalState"}""".stripMargin
assertThat(response.toOption.get.asJava)
.hasSize(1)
.contains(stateChange)
}
@Test
@Timeout(180)
def pushCancelRequestsShouldDisableNotification(server: GuiceJamesServer): Unit = {
val bobPath = MailboxPath.inbox(BOB)
val accountId: AccountId = AccountId.fromUsername(BOB)
val mailboxId = server.getProbe(classOf[MailboxProbeImpl]).createMailbox(bobPath)
Thread.sleep(100)
val response: Either[String, List[String]] =
authenticatedRequest(server)
.response(asWebSocket[Identity, List[String]] {
ws =>
ws.send(WebSocketFrame.text(
"""{
| "@type": "WebSocketPushEnable",
| "dataTypes": ["Mailbox", "Email"]
|}""".stripMargin))
Thread.sleep(100)
ws.send(WebSocketFrame.text(
"""{
| "@type": "WebSocketPushDisable"
|}""".stripMargin))
Thread.sleep(100)
ws.send(WebSocketFrame.text(
s"""{
| "@type": "Request",
| "id": "req-36",
| "using": ["urn:ietf:params:jmap:core", "urn:ietf:params:jmap:mail"],
| "methodCalls": [
| ["Email/set", {
| "accountId": "$ACCOUNT_ID",
| "create": {
| "aaaaaa":{
| "mailboxIds": {
| "${mailboxId.serialize}": true
| }
| }
| }
| }, "c1"]]
|}""".stripMargin))
val response = ws.receive().asPayload
val maybeNotification: String = Try(SMono.fromCallable(() =>
ws.receive().asPayload)
.subscribeOn(Schedulers.newSingle("test"))
.block(scala.concurrent.duration.Duration.fromNanos(100000000)))
.fold(e => "No notification received", s => s)
List(response, maybeNotification)
})
.send(backend)
.body
Thread.sleep(100)
val jmapGuiceProbe: JmapGuiceProbe = server.getProbe(classOf[JmapGuiceProbe])
val emailState: String = jmapGuiceProbe.getLatestEmailState(accountId).getValue.toString
val mailboxState: String = jmapGuiceProbe.getLatestMailboxState(accountId).getValue.toString
val mailboxStateChange: String = s"""{"@type":"StateChange","changed":{"$ACCOUNT_ID":{"Mailbox":"$mailboxState"}}}"""
val emailStateChange: String = s"""{"@type":"StateChange","changed":{"$ACCOUNT_ID":{"Email":"$emailState"}}}"""
assertThat(response.toOption.get.asJava)
.hasSize(2) // Email create response + no notification message
.contains("No notification received")
.doesNotContain(mailboxStateChange, emailStateChange)
}
@Test
@Timeout(180)
def pushCancelRequestAsFirstMessageShouldBeProcessedNormally(server: GuiceJamesServer): Unit = {
Thread.sleep(100)
authenticatedRequest(server)
.response(asWebSocket[Identity, Unit] {
ws =>
ws.send(WebSocketFrame.text(
"""{
| "@type": "WebSocketPushDisable"
|}""".stripMargin))
Thread.sleep(100)
assertThat(ws.isOpen()).isTrue
})
.send(backend)
.body
}
@Test
@Timeout(180)
def pushEnableRequestWithPushStateShouldReturnServerState(server: GuiceJamesServer): Unit = {
val bobPath = MailboxPath.inbox(BOB)
val accountId: AccountId = AccountId.fromUsername(BOB)
server.getProbe(classOf[MailboxProbeImpl]).createMailbox(bobPath)
Thread.sleep(100)
val response: Either[String, String] =
authenticatedRequest(server)
.response(asWebSocket[Identity, String] {
ws =>
ws.send(WebSocketFrame.text(
"""{
| "@type": "WebSocketPushEnable",
| "dataTypes": ["Mailbox", "Email"],
| "pushState": "aaa"
|}""".stripMargin))
Thread.sleep(100)
ws.receive().asPayload})
.send(backend)
.body
Thread.sleep(100)
val jmapGuiceProbe: JmapGuiceProbe = server.getProbe(classOf[JmapGuiceProbe])
val emailState: State = jmapGuiceProbe.getLatestEmailState(accountId)
val mailboxState: State = jmapGuiceProbe.getLatestMailboxState(accountId)
val globalState: PushState = PushState.from(UuidState(mailboxState.getValue), UuidState(emailState.getValue))
val pushEnableResponse: String = s"""{"@type":"StateChange","changed":{"$ACCOUNT_ID":{"Mailbox":"${mailboxState.getValue}","Email":"${emailState.getValue}"}},"pushState":"${globalState.value}"}"""
assertThat(response.toOption.get)
.isEqualTo(pushEnableResponse)
}
private def authenticatedRequest(server: GuiceJamesServer): RequestT[Identity, Either[String, String], Any] = {
val port = server.getProbe(classOf[JmapGuiceProbe])
.getJmapPort
.getValue
basicRequest.get(Uri.apply(new URI(s"ws://127.0.0.1:$port/jmap/ws")))
.header("Authorization", "Basic Ym9iQGRvbWFpbi50bGQ6Ym9icGFzc3dvcmQ=")
.header("Accept", ACCEPT_RFC8621_VERSION_HEADER)
}
private def unauthenticatedRequest(server: GuiceJamesServer): RequestT[Identity, Either[String, String], Any] = {
val port = server.getProbe(classOf[JmapGuiceProbe])
.getJmapPort
.getValue
basicRequest.get(Uri.apply(new URI(s"ws://127.0.0.1:$port/jmap/ws")))
.header("Accept", ACCEPT_RFC8621_VERSION_HEADER)
}
private def sendEmailTo(server: GuiceJamesServer, recipient: Username): Unit = {
val smtpMessageSender: SMTPMessageSender = new SMTPMessageSender(DOMAIN.asString())
smtpMessageSender.connect("127.0.0.1", server.getProbe(classOf[SmtpGuiceProbe]).getSmtpPort)
.authenticate(ANDRE.asString, ANDRE_PASSWORD)
.sendMessage(ANDRE.asString, recipient.asString())
smtpMessageSender.close()
awaitAtMostTenSeconds.until(() => server.getProbe(classOf[SpoolerProbe]).processingFinished())
}
}