blob: 5d2422220e6230c36ce3f901e29e989a7117f401 [file] [log] [blame]
/****************************************************************
* Licensed to the Apache Software Foundation (ASF) under one *
* or more contributor license agreements. See the NOTICE file *
* distributed with this work for additional information *
* regarding copyright ownership. The ASF licenses this file *
* to you under the Apache License, Version 2.0 (the *
* "License"); you may not use this file except in compliance *
* with the License. You may obtain a copy of the License at *
* *
* http://www.apache.org/licenses/LICENSE-2.0 *
* *
* Unless required by applicable law or agreed to in writing, *
* software distributed under the License is distributed on an *
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
* KIND, either express or implied. See the License for the *
* specific language governing permissions and limitations *
* under the License. *
****************************************************************/
package org.apache.james.mailbox.elasticsearch.events;
import static org.elasticsearch.index.query.QueryBuilders.termQuery;
import java.util.Collection;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import javax.inject.Inject;
import org.apache.james.mailbox.MailboxManager.SearchCapabilities;
import org.apache.james.mailbox.MailboxSession;
import org.apache.james.mailbox.elasticsearch.ElasticSearchIndexer;
import org.apache.james.mailbox.elasticsearch.json.JsonMessageConstants;
import org.apache.james.mailbox.elasticsearch.json.MessageToElasticSearchJson;
import org.apache.james.mailbox.elasticsearch.search.ElasticSearchSearcher;
import org.apache.james.mailbox.exception.MailboxException;
import org.apache.james.mailbox.model.MailboxId;
import org.apache.james.mailbox.model.MultimailboxesSearchQuery;
import org.apache.james.mailbox.model.SearchQuery;
import org.apache.james.mailbox.model.UpdatedFlags;
import org.apache.james.mailbox.store.mail.MessageMapperFactory;
import org.apache.james.mailbox.store.mail.model.Mailbox;
import org.apache.james.mailbox.store.mail.model.MailboxMessage;
import org.apache.james.mailbox.store.search.ListeningMessageSearchIndex;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
public class ElasticSearchListeningMessageSearchIndex extends ListeningMessageSearchIndex {
private final static Logger LOGGER = LoggerFactory.getLogger(ElasticSearchListeningMessageSearchIndex.class);
private final static String ID_SEPARATOR = ":";
private final ElasticSearchIndexer indexer;
private final ElasticSearchSearcher searcher;
private final MessageToElasticSearchJson messageToElasticSearchJson;
@Inject
public ElasticSearchListeningMessageSearchIndex(MessageMapperFactory factory, ElasticSearchIndexer indexer,
ElasticSearchSearcher searcher, MessageToElasticSearchJson messageToElasticSearchJson) {
super(factory);
this.indexer = indexer;
this.messageToElasticSearchJson = messageToElasticSearchJson;
this.searcher = searcher;
}
@Override
public ListenerType getType() {
return ListenerType.ONCE;
}
@Override
public EnumSet<SearchCapabilities> getSupportedCapabilities() {
return EnumSet.of(SearchCapabilities.MultimailboxSearch, SearchCapabilities.Text);
}
@Override
public Iterator<Long> search(MailboxSession session, Mailbox mailbox, SearchQuery searchQuery) throws MailboxException {
Preconditions.checkArgument(session != null, "'session' is mandatory");
MailboxId mailboxId = mailbox.getMailboxId();
MultimailboxesSearchQuery query = MultimailboxesSearchQuery.from(searchQuery).inMailboxes(mailboxId).build();
return searcher
.search(ImmutableList.of(session.getUser()), query)
.get(mailboxId)
.iterator();
}
@Override
public Map<MailboxId, Collection<Long>> search(MailboxSession session, MultimailboxesSearchQuery searchQuery)
throws MailboxException {
Preconditions.checkArgument(session != null, "'session' is mandatory");
return searcher.search(ImmutableList.of(session.getUser()), searchQuery).asMap();
}
@Override
public void add(MailboxSession session, Mailbox mailbox, MailboxMessage message) throws MailboxException {
try {
indexer.indexMessage(indexIdFor(mailbox, message.getUid()), messageToElasticSearchJson.convertToJson(message, ImmutableList.of(session.getUser())));
} catch (Exception e) {
LOGGER.error("Error when indexing message " + message.getUid(), e);
}
}
@Override
public void delete(MailboxSession session, Mailbox mailbox, List<Long> expungedUids) throws MailboxException {
try {
indexer.deleteMessages(expungedUids.stream()
.map(uid -> indexIdFor(mailbox, uid))
.collect(Collectors.toList()));
} catch (Exception e) {
LOGGER.error("Error when deleting messages {} in mailbox {} from index", mailbox.getMailboxId().serialize(), expungedUids, e);
}
}
@Override
public void deleteAll(MailboxSession session, Mailbox mailbox) throws MailboxException {
try {
indexer.deleteAllMatchingQuery(
termQuery(
JsonMessageConstants.MAILBOX_ID,
mailbox.getMailboxId().serialize()));
} catch (Exception e) {
LOGGER.error("Error when deleting all messages in mailbox {}", mailbox.getMailboxId().serialize(), e);
}
}
@Override
public void update(MailboxSession session, Mailbox mailbox, List<UpdatedFlags> updatedFlagsList) throws MailboxException {
try {
indexer.updateMessages(updatedFlagsList.stream()
.map(updatedFlags -> createUpdatedDocumentPartFromUpdatedFlags(mailbox, updatedFlags))
.collect(Collectors.toList()));
} catch (Exception e) {
LOGGER.error("Error when updating index on mailbox {}", mailbox.getMailboxId().serialize(), e);
}
}
private ElasticSearchIndexer.UpdatedRepresentation createUpdatedDocumentPartFromUpdatedFlags(Mailbox mailbox, UpdatedFlags updatedFlags) {
try {
return new ElasticSearchIndexer.UpdatedRepresentation(
indexIdFor(mailbox, updatedFlags.getUid()),
messageToElasticSearchJson.getUpdatedJsonMessagePart(
updatedFlags.getNewFlags(),
updatedFlags.getModSeq()));
} catch (JsonProcessingException e) {
throw new RuntimeException("Error while creating updatedDocumentParts", e);
}
}
private String indexIdFor(Mailbox mailbox, long messageId) {
return String.join(ID_SEPARATOR, mailbox.getMailboxId().serialize(), String.valueOf(messageId));
}
}