blob: 2eb4cf18027a7bad8a78d3f54456cb77edc3280a [file] [log] [blame]
/****************************************************************
* Licensed to the Apache Software Foundation (ASF) under one *
* or more contributor license agreements. See the NOTICE file *
* distributed with this work for additional information *
* regarding copyright ownership. The ASF licenses this file *
* to you under the Apache License, Version 2.0 (the *
* "License"); you may not use this file except in compliance *
* with the License. You may obtain a copy of the License at *
* *
* http://www.apache.org/licenses/LICENSE-2.0 *
* *
* Unless required by applicable law or agreed to in writing, *
* software distributed under the License is distributed on an *
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
* KIND, either express or implied. See the License for the *
* specific language governing permissions and limitations *
* under the License. *
****************************************************************/
package org.apache.james.mailbox.elasticsearch.search;
import java.util.List;
import java.util.Optional;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import javax.inject.Inject;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.james.mailbox.MailboxSession.User;
import org.apache.james.mailbox.elasticsearch.ElasticSearchIndexer;
import org.apache.james.mailbox.elasticsearch.json.JsonMessageConstants;
import org.apache.james.mailbox.elasticsearch.query.QueryConverter;
import org.apache.james.mailbox.elasticsearch.query.SortConverter;
import org.apache.james.mailbox.exception.MailboxException;
import org.apache.james.mailbox.model.MailboxId;
import org.apache.james.mailbox.model.MailboxId.Factory;
import org.apache.james.mailbox.model.MultimailboxesSearchQuery;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHitField;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.github.steveash.guavate.Guavate;
import com.google.common.collect.Multimap;
public class ElasticSearchSearcher {
private static final Logger LOGGER = LoggerFactory.getLogger(ElasticSearchSearcher.class);
private static final TimeValue TIMEOUT = new TimeValue(60000);
public static final int DEFAULT_SIZE = 100;
private final Client client;
private final QueryConverter queryConverter;
private final int size;
private final Factory mailboxIdFactory;
@Inject
public ElasticSearchSearcher(Client client, QueryConverter queryConverter, MailboxId.Factory mailboxIdFactory) {
this(client, queryConverter, DEFAULT_SIZE, mailboxIdFactory);
}
public ElasticSearchSearcher(Client client, QueryConverter queryConverter, int size, MailboxId.Factory mailboxIdFactory) {
this.client = client;
this.queryConverter = queryConverter;
this.size = size;
this.mailboxIdFactory = mailboxIdFactory;
}
public Multimap<MailboxId, Long> search(List<User> users, MultimailboxesSearchQuery query) throws MailboxException {
return new ScrollIterable(client, getSearchRequestBuilder(client, users, query)).stream()
.flatMap(this::transformResponseToUidStream)
.collect(Guavate.toImmutableListMultimap(Pair::getLeft, Pair::getRight));
}
private SearchRequestBuilder getSearchRequestBuilder(Client client, List<User> users, MultimailboxesSearchQuery query) {
return query.getSearchQuery().getSorts()
.stream()
.reduce(
client.prepareSearch(ElasticSearchIndexer.MAILBOX_INDEX)
.setTypes(ElasticSearchIndexer.MESSAGE_TYPE)
.setScroll(TIMEOUT)
.addFields(JsonMessageConstants.ID, JsonMessageConstants.MAILBOX_ID)
.setQuery(queryConverter.from(users, query))
.setSize(size),
(searchBuilder, sort) -> searchBuilder.addSort(SortConverter.convertSort(sort)),
(partialResult1, partialResult2) -> partialResult1);
}
private Stream<Pair<MailboxId, Long>> transformResponseToUidStream(SearchResponse searchResponse) {
return StreamSupport.stream(searchResponse.getHits().spliterator(), false)
.map(this::extractContentFromHit)
.filter(Optional::isPresent)
.map(Optional::get);
}
private Optional<Pair<MailboxId, Long>> extractContentFromHit(SearchHit hit) {
SearchHitField mailboxId = hit.field(JsonMessageConstants.MAILBOX_ID);
SearchHitField uid = hit.field(JsonMessageConstants.ID);
if (mailboxId != null && uid != null) {
Number uidAsNumber = uid.getValue();
return Optional.of(Pair.of(mailboxIdFactory.fromString(mailboxId.getValue()), uidAsNumber.longValue()));
} else {
LOGGER.warn("Can not extract UID and/or MailboxId for search result " + hit.getId());
return Optional.empty();
}
}
}