blob: 9487241b7dc5a283d228e2f601d995017bfc9042 [file] [log] [blame]
/**
* *************************************************************
* Licensed to the Apache Software Foundation (ASF) under one *
* or more contributor license agreements. See the NOTICE file *
* distributed with this work for additional information *
* regarding copyright ownership. The ASF licenses this file *
* to you under the Apache License, Version 2.0 (the *
* "License"); you may not use this file except in compliance *
* with the License. You may obtain a copy of the License at *
* *
* http://www.apache.org/licenses/LICENSE-2.0 *
* *
* Unless required by applicable law or agreed to in writing, *
* software distributed under the License is distributed on an *
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
* KIND, either express or implied. See the License for the *
* specific language governing permissions and limitations *
* under the License. *
***************************************************************/
package org.apache.james.mailbox.opensearch.search;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Durations.ONE_HUNDRED_MILLISECONDS;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.ZoneId;
import java.util.List;
import java.util.stream.IntStream;
import org.apache.james.backends.opensearch.DockerElasticSearchExtension;
import org.apache.james.backends.opensearch.ElasticSearchIndexer;
import org.apache.james.backends.opensearch.ReactorElasticSearchClient;
import org.apache.james.core.Username;
import org.apache.james.mailbox.MailboxSession;
import org.apache.james.mailbox.MailboxSessionUtil;
import org.apache.james.mailbox.MessageManager;
import org.apache.james.mailbox.opensearch.IndexAttachments;
import org.apache.james.mailbox.opensearch.MailboxElasticSearchConstants;
import org.apache.james.mailbox.opensearch.MailboxIdRoutingKeyFactory;
import org.apache.james.mailbox.opensearch.MailboxIndexCreationUtil;
import org.apache.james.mailbox.opensearch.events.ElasticSearchListeningMessageSearchIndex;
import org.apache.james.mailbox.opensearch.json.MessageToElasticSearchJson;
import org.apache.james.mailbox.opensearch.query.CriterionConverter;
import org.apache.james.mailbox.opensearch.query.QueryConverter;
import org.apache.james.mailbox.inmemory.InMemoryMailboxManager;
import org.apache.james.mailbox.inmemory.InMemoryMessageId;
import org.apache.james.mailbox.inmemory.manager.InMemoryIntegrationResources;
import org.apache.james.mailbox.model.ComposedMessageId;
import org.apache.james.mailbox.model.MailboxId;
import org.apache.james.mailbox.model.MailboxPath;
import org.apache.james.mailbox.model.MessageId;
import org.apache.james.mailbox.model.MultimailboxesSearchQuery;
import org.apache.james.mailbox.model.SearchQuery;
import org.apache.james.mailbox.tika.TikaConfiguration;
import org.apache.james.mailbox.tika.TikaExtension;
import org.apache.james.mailbox.tika.TikaHttpClientImpl;
import org.apache.james.mailbox.tika.TikaTextExtractor;
import org.apache.james.metrics.tests.RecordingMetricFactory;
import org.apache.james.mime4j.dom.Message;
import org.awaitility.Awaitility;
import org.awaitility.Durations;
import org.awaitility.core.ConditionFactory;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.client.RequestOptions;
import org.opensearch.index.query.QueryBuilder;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import com.github.fge.lambdas.Throwing;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
/**
 * Integration test for {@link ElasticSearchSearcher}, exercised through an
 * {@link InMemoryMailboxManager} whose listening search index is an
 * {@link ElasticSearchListeningMessageSearchIndex}.
 *
 * <p>The searcher is built with {@link #SEARCH_SIZE} set to {@code 1} while the single
 * test spreads one message over each of {@value #MAILBOX_COUNT} mailboxes.
 * NOTE(review): {@code SEARCH_SIZE} is presumably the per-request page size, which would
 * make this a test of result paging across many mailboxes - confirm against
 * {@code ElasticSearchSearcher}.
 */
class ElasticSearchSearcherTest {
    /** Size argument handed to the {@link ElasticSearchSearcher} constructor. */
    static final int SEARCH_SIZE = 1;

    private static final Username USERNAME = Username.of("user");
    /** Number of mailboxes (and therefore messages) created by the test. */
    private static final int MAILBOX_COUNT = 700;
    /** "To" address put on every appended message. */
    private static final String RECIPIENT = "user@example.com";
    /** Polls every 100 ms, after an initial 100 ms delay. */
    private static final ConditionFactory CALMLY_AWAIT = Awaitility
        .with().pollInterval(ONE_HUNDRED_MILLISECONDS)
        .and().pollDelay(ONE_HUNDRED_MILLISECONDS)
        .await();

    @RegisterExtension
    static TikaExtension tika = new TikaExtension();

    @RegisterExtension
    DockerElasticSearchExtension elasticSearch = new DockerElasticSearchExtension();

    TikaTextExtractor textExtractor;
    ReactorElasticSearchClient client;
    private InMemoryMailboxManager storeMailboxManager;

    /**
     * Builds the text extractor, the search client and an in-memory mailbox manager
     * wired to the search index under test.
     */
    @BeforeEach
    void setUp() throws Exception {
        textExtractor = newTextExtractor();
        client = MailboxIndexCreationUtil.prepareDefaultClient(
            elasticSearch.getDockerElasticSearch().clientProvider().get(),
            elasticSearch.getDockerElasticSearch().configuration());

        InMemoryMessageId.Factory messageIdFactory = new InMemoryMessageId.Factory();
        MailboxIdRoutingKeyFactory routingKeyFactory = new MailboxIdRoutingKeyFactory();

        storeMailboxManager = InMemoryIntegrationResources.builder()
            .preProvisionnedFakeAuthenticator()
            .fakeAuthorizator()
            .inVmEventBus()
            .defaultAnnotationLimits()
            .defaultMessageParser()
            // The collaborators are created inside the factory lambda, in constructor-argument order.
            .listeningSearchIndex(stage -> new ElasticSearchListeningMessageSearchIndex(
                stage.getMapperFactory(),
                ImmutableSet.of(),
                newIndexer(),
                newSearcher(routingKeyFactory),
                newJsonConverter(),
                stage.getSessionProvider(),
                routingKeyFactory,
                messageIdFactory))
            .noPreDeletionHooks()
            .storeQuotaManager()
            .build()
            .getMailboxManager();
    }

    /** Closes the search client opened in {@link #setUp()}. */
    @AfterEach
    void tearDown() throws IOException {
        client.close();
    }

    /**
     * Creates {@value #MAILBOX_COUNT} mailboxes holding one message each, waits for the
     * index to report that many documents, then checks that a multi-mailbox "all" search
     * returns exactly the appended message ids.
     */
    @Test
    void searchingInALargeNumberOfMailboxesShouldReturnAllMailboxesMessagesUid() throws Exception {
        MailboxSession session = MailboxSessionUtil.create(USERNAME);

        List<MailboxPath> paths = IntStream.range(0, MAILBOX_COUNT)
            .mapToObj(position -> MailboxPath.forUser(USERNAME, "mailbox" + position))
            .collect(ImmutableList.toImmutableList());
        List<MailboxId> mailboxIds = createMailboxes(paths, session);
        List<ComposedMessageId> appendedIds = appendOneMessagePerMailbox(paths, session);

        awaitIndexedDocumentCount(QueryBuilders.matchAllQuery(), appendedIds.size());

        MultimailboxesSearchQuery searchAllInMailboxes = MultimailboxesSearchQuery
            .from(SearchQuery.of(SearchQuery.all()))
            .inMailboxes(mailboxIds)
            .build();
        List<MessageId> expected = appendedIds.stream()
            .map(ComposedMessageId::getMessageId)
            .collect(ImmutableList.toImmutableList());

        // The limit is one above the number of messages, so it cannot truncate the result.
        List<MessageId> found = storeMailboxManager
            .search(searchAllInMailboxes, session, MAILBOX_COUNT + 1)
            .collectList()
            .block();

        assertThat(found).containsExactlyInAnyOrderElementsOf(expected);
    }

    /** Builds a Tika-backed text extractor from the connection details exposed by {@link #tika}. */
    private TikaTextExtractor newTextExtractor() throws Exception {
        RecordingMetricFactory metricFactory = new RecordingMetricFactory();
        TikaConfiguration tikaConfiguration = TikaConfiguration.builder()
            .host(tika.getIp())
            .port(tika.getPort())
            .timeoutInMillis(tika.getTimeoutInMillis())
            .build();
        TikaHttpClientImpl tikaHttpClient = new TikaHttpClientImpl(tikaConfiguration);
        return new TikaTextExtractor(metricFactory, tikaHttpClient);
    }

    /** Indexer writing through the default mailbox write alias. */
    private ElasticSearchIndexer newIndexer() {
        return new ElasticSearchIndexer(client,
            MailboxElasticSearchConstants.DEFAULT_MAILBOX_WRITE_ALIAS);
    }

    /** Searcher under test, reading through the default mailbox read alias with {@link #SEARCH_SIZE}. */
    private ElasticSearchSearcher newSearcher(MailboxIdRoutingKeyFactory routingKeyFactory) {
        QueryConverter queryConverter = new QueryConverter(new CriterionConverter());
        return new ElasticSearchSearcher(client, queryConverter, SEARCH_SIZE,
            MailboxElasticSearchConstants.DEFAULT_MAILBOX_READ_ALIAS, routingKeyFactory);
    }

    /** Message-to-JSON converter using the "Europe/Paris" zone with attachment indexing enabled. */
    private MessageToElasticSearchJson newJsonConverter() {
        return new MessageToElasticSearchJson(textExtractor, ZoneId.of("Europe/Paris"), IndexAttachments.YES);
    }

    /** Creates every given mailbox in order and returns the resulting ids. */
    private List<MailboxId> createMailboxes(List<MailboxPath> paths, MailboxSession session) {
        return paths.stream()
            .map(Throwing.<MailboxPath, MailboxId>function(
                path -> storeMailboxManager.createMailbox(path, session).get()).sneakyThrow())
            .collect(ImmutableList.toImmutableList());
    }

    /** Appends one message to each given mailbox, in order, and returns the composed ids. */
    private List<ComposedMessageId> appendOneMessagePerMailbox(List<MailboxPath> paths, MailboxSession session) {
        return paths.stream()
            .map(Throwing.<MailboxPath, ComposedMessageId>function(
                path -> appendMessage(session, path)).sneakyThrow())
            .collect(ImmutableList.toImmutableList());
    }

    /** Appends a small "Hello" message addressed to {@link #RECIPIENT} into the given mailbox. */
    private ComposedMessageId appendMessage(MailboxSession session, MailboxPath mailboxPath) throws Exception {
        MessageManager mailbox = storeMailboxManager.getMailbox(mailboxPath, session);
        Message.Builder content = Message.Builder.of()
            .setTo(RECIPIENT)
            .setBody("Hello", StandardCharsets.UTF_8);
        return mailbox
            .appendMessage(MessageManager.AppendCommand.from(content), session)
            .getId();
    }

    /**
     * Polls for at most ten seconds until {@code query}, run against the default mailbox
     * index, reports exactly {@code totalHits} hits.
     */
    private void awaitIndexedDocumentCount(QueryBuilder query, long totalHits) {
        CALMLY_AWAIT.atMost(Durations.TEN_SECONDS)
            .untilAsserted(() -> assertThat(countHits(query)).isEqualTo(totalHits));
    }

    /** Runs {@code query} once against the default mailbox index and returns the reported total hit count. */
    private long countHits(QueryBuilder query) {
        SearchSourceBuilder source = new SearchSourceBuilder().query(query);
        SearchRequest request = new SearchRequest(MailboxElasticSearchConstants.DEFAULT_MAILBOX_INDEX.getValue())
            .source(source);
        return client.search(request, RequestOptions.DEFAULT)
            .block()
            .getHits()
            .getTotalHits()
            .value;
    }
}