JAMES-3378 email query filter by keyword
diff --git a/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/EmailQueryMethodContract.scala b/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/EmailQueryMethodContract.scala
index a4a7068..6afcdcf 100644
--- a/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/EmailQueryMethodContract.scala
+++ b/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/EmailQueryMethodContract.scala
@@ -24,16 +24,18 @@
 import java.time.format.DateTimeFormatter
 import java.util.Date
 import java.util.concurrent.TimeUnit
+import java.util.stream.Stream
 
 import com.google.common.hash.Hashing
 import io.netty.handler.codec.http.HttpHeaderNames.ACCEPT
 import io.restassured.RestAssured.{`given`, requestSpecification}
 import io.restassured.http.ContentType.JSON
+import javax.mail.Flags
 import net.javacrumbs.jsonunit.assertj.JsonAssertions.assertThatJson
 import org.apache.http.HttpStatus.SC_OK
 import org.apache.james.GuiceJamesServer
 import org.apache.james.jmap.http.UserCredential
-import org.apache.james.jmap.model.UTCDate
+import org.apache.james.jmap.model.{Keyword, UTCDate}
 import org.apache.james.jmap.rfc8621.contract.Fixture.{ACCEPT_RFC8621_VERSION_HEADER, ANDRE, ANDRE_PASSWORD, BOB, BOB_PASSWORD, DOMAIN, authScheme, baseRequestSpecBuilder}
 import org.apache.james.mailbox.MessageManager.AppendCommand
 import org.apache.james.mailbox.model.{MailboxPath, MessageId}
@@ -43,6 +45,20 @@
 import org.awaitility.Awaitility
 import org.awaitility.Duration.ONE_HUNDRED_MILLISECONDS
 import org.junit.jupiter.api.{BeforeEach, Test}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.{Arguments, MethodSource}
+
+object EmailQueryMethodContract{
+  def jmapSystemKeywords : Stream[Arguments] = {
+    Stream.of(
+      Arguments.of(new Flags(Flags.Flag.SEEN), "$Seen"),
+      Arguments.of(new Flags(Flags.Flag.ANSWERED), "$Answered"),
+      Arguments.of(new Flags(Flags.Flag.FLAGGED), "$Flagged"),
+      Arguments.of(new Flags(Flags.Flag.DRAFT), "$Draft"),
+      Arguments.of(new Flags("$Forwarded"), "$Forwarded")
+    )
+  }
+}
 
 trait EmailQueryMethodContract {
 
@@ -858,6 +874,107 @@
     }
   }
 
+  @ParameterizedTest
+  @MethodSource(value = Array("jmapSystemKeywords"))
+  def listMailsBySystemKeywordShouldReturnOnlyMailsWithThisSystemKeyword(keywordFlag: Flags, keywordName: String, server: GuiceJamesServer): Unit = {
+    val message: Message = Message.Builder
+      .of
+      .setSubject("test")
+      .setBody("testmail", StandardCharsets.UTF_8)
+      .build
+    server.getProbe(classOf[MailboxProbeImpl]).createMailbox(MailboxPath.inbox(BOB))
+    val messageId = server.getProbe(classOf[MailboxProbeImpl])
+      .appendMessage(BOB.asString, MailboxPath.inbox(BOB), AppendCommand.builder().withFlags(keywordFlag).build(message))
+      .getMessageId
+    server.getProbe(classOf[MailboxProbeImpl])
+      .appendMessage(BOB.asString, MailboxPath.inbox(BOB), AppendCommand.builder().build(message))
+      .getMessageId
+
+    val request =
+      s"""{
+         |  "using": [
+         |    "urn:ietf:params:jmap:core",
+         |    "urn:ietf:params:jmap:mail"],
+         |  "methodCalls": [[
+         |    "Email/query",
+         |    {
+         |      "accountId": "29883977c13473ae7cb7678ef767cbfbaffc8a44a6e463d971d23a65c1dc4af6",
+         |      "filter" : {
+         |        "hasKeyword": "$keywordName"
+         |      }
+         |    },
+         |    "c1"]]
+         |}""".stripMargin
+
+    awaitAtMostTenSeconds.untilAsserted { () =>
+      val response = `given`
+        .header(ACCEPT.toString, ACCEPT_RFC8621_VERSION_HEADER)
+        .body(request)
+        .when
+        .post
+        .`then`
+        .statusCode(SC_OK)
+        .contentType(JSON)
+        .extract
+        .body
+        .asString
+
+      assertThatJson(response)
+        .inPath("$.methodResponses[0][1].ids")
+        .isEqualTo(s"""["${messageId.serialize()}"]""")
+    }
+  }
+
+  @Test
+  def listMailsByCustomKeywordShouldReturnOnlyMailsWithThisCustomKeyword(server: GuiceJamesServer): Unit = {
+    val message: Message = Message.Builder
+      .of
+      .setSubject("test")
+      .setBody("testmail", StandardCharsets.UTF_8)
+      .build
+    server.getProbe(classOf[MailboxProbeImpl]).createMailbox(MailboxPath.inbox(BOB))
+    val messageId = server.getProbe(classOf[MailboxProbeImpl])
+      .appendMessage(BOB.asString, MailboxPath.inbox(BOB), AppendCommand.builder().withFlags(new Flags("custom")).build(message))
+      .getMessageId
+    server.getProbe(classOf[MailboxProbeImpl])
+      .appendMessage(BOB.asString, MailboxPath.inbox(BOB), AppendCommand.builder().build(message))
+      .getMessageId
+
+    val request =
+      s"""{
+         |  "using": [
+         |    "urn:ietf:params:jmap:core",
+         |    "urn:ietf:params:jmap:mail"],
+         |  "methodCalls": [[
+         |    "Email/query",
+         |    {
+         |      "accountId": "29883977c13473ae7cb7678ef767cbfbaffc8a44a6e463d971d23a65c1dc4af6",
+         |      "filter" : {
+         |        "hasKeyword": "custom"
+         |      }
+         |    },
+         |    "c1"]]
+         |}""".stripMargin
+
+    awaitAtMostTenSeconds.untilAsserted { () =>
+      val response = `given`
+        .header(ACCEPT.toString, ACCEPT_RFC8621_VERSION_HEADER)
+        .body(request)
+        .when
+        .post
+        .`then`
+        .statusCode(SC_OK)
+        .contentType(JSON)
+        .extract
+        .body
+        .asString
+
+      assertThatJson(response)
+        .inPath("$.methodResponses[0][1].ids")
+        .isEqualTo(s"""["${messageId.serialize()}"]""")
+    }
+  }
+
   private def generateQueryState(messages: MessageId*): String = {
     Hashing.murmur3_32()
       .hashUnencodedChars(messages.toList.map(_.serialize()).mkString(" "))
diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/json/EmailQuerySerializer.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/json/EmailQuerySerializer.scala
index 335f574..955a11c 100644
--- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/json/EmailQuerySerializer.scala
+++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/json/EmailQuerySerializer.scala
@@ -37,6 +37,7 @@
     case _ => JsError()
   }
 
+  private implicit val keywordReads: Reads[Keyword] = Json.valueReads[Keyword]
   private implicit val filterConditionReads: Reads[FilterCondition] = Json.reads[FilterCondition]
   private implicit val emailQueryRequestReads: Reads[EmailQueryRequest] = Json.reads[EmailQueryRequest]
   private implicit val canCalculateChangeWrites: Writes[CanCalculateChange] = Json.valueWrites[CanCalculateChange]
diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/mail/EmailQuery.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/mail/EmailQuery.scala
index d5a0302..81fc4c2 100644
--- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/mail/EmailQuery.scala
+++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/mail/EmailQuery.scala
@@ -19,13 +19,14 @@
 
 package org.apache.james.jmap.mail
 
-import org.apache.james.jmap.model.{AccountId, CanCalculateChange, Limit, Position, QueryState, UTCDate}
+import org.apache.james.jmap.model.{AccountId, CanCalculateChange, Keyword, Limit, Position, QueryState, UTCDate}
 import org.apache.james.mailbox.model.{MailboxId, MessageId}
 
 case class FilterCondition(inMailbox: Option[MailboxId],
                            inMailboxOtherThan: Option[Seq[MailboxId]],
                            before: Option[UTCDate],
-                           after: Option[UTCDate])
+                           after: Option[UTCDate],
+                           hasKeyword: Option[Keyword])
 
 case class EmailQueryRequest(accountId: AccountId, filter: Option[FilterCondition])
 
diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/utils/search/MailboxFilter.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/utils/search/MailboxFilter.scala
index 3bb4cf8..49ce892 100644
--- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/utils/search/MailboxFilter.scala
+++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/utils/search/MailboxFilter.scala
@@ -20,8 +20,9 @@
 
 import java.util.Date
 
+import javax.mail.Flags
 import org.apache.james.jmap.mail.EmailQueryRequest
-import org.apache.james.mailbox.model.SearchQuery.{Conjunction, ConjunctionCriterion, Criterion, DateComparator, DateOperator, DateResolution, InternalDateCriterion}
+import org.apache.james.mailbox.model.SearchQuery.{Conjunction, ConjunctionCriterion, Criterion, DateComparator, DateOperator, DateResolution, FlagCriterion, InternalDateCriterion}
 import org.apache.james.mailbox.model.{MultimailboxesSearchQuery, SearchQuery}
 
 import scala.jdk.CollectionConverters._
@@ -60,7 +61,7 @@
 
   object QueryFilter {
     def buildQuery(request: EmailQueryRequest): SearchQuery.Builder = {
-      List(ReceivedBefore, ReceivedAfter).foldLeft(new SearchQuery.Builder())((builder, filter) => filter.toQuery(builder, request))
+      List(ReceivedBefore, ReceivedAfter, HasKeyWord).foldLeft(new SearchQuery.Builder())((builder, filter) => filter.toQuery(builder, request))
     }
   }
 
@@ -84,4 +85,15 @@
       case None => builder
     }
   }
+
+  case object HasKeyWord extends QueryFilter {
+    override def toQuery(builder: SearchQuery.Builder, request: EmailQueryRequest): SearchQuery.Builder =  request.filter.flatMap(_.hasKeyword) match {
+      case Some(keyword) =>
+        keyword.asSystemFlag match {
+          case Some(systemFlag) => builder.andCriteria(SearchQuery.flagIsSet(systemFlag))
+          case None => builder.andCriteria(SearchQuery.flagIsSet(keyword.flagName))
+        }
+      case None => builder
+    }
+  }
 }