blob: f2116f1e4d918a7a8e6c1d35f007cb10beea7adf [file] [log] [blame]
/****************************************************************
* Licensed to the Apache Software Foundation (ASF) under one *
* or more contributor license agreements. See the NOTICE file *
* distributed with this work for additional information *
* regarding copyright ownership. The ASF licenses this file *
* to you under the Apache License, Version 2.0 (the *
* "License"); you may not use this file except in compliance *
* with the License. You may obtain a copy of the License at *
* *
* http://www.apache.org/licenses/LICENSE-2.0 *
* *
* Unless required by applicable law or agreed to in writing, *
* software distributed under the License is distributed on an *
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
* KIND, either express or implied. See the License for the *
* specific language governing permissions and limitations *
* under the License. *
****************************************************************/
package org.apache.james.backends.es.v7;
import static org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest.AliasActions;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import java.io.IOException;
import java.util.Optional;
import javax.inject.Inject;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.github.fge.lambdas.Throwing;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
public class IndexCreationFactory {
public static class AliasSpecificationStep {
private final int nbShards;
private final int nbReplica;
private final int waitForActiveShards;
private final IndexName indexName;
private final ImmutableList.Builder<AliasName> aliases;
AliasSpecificationStep(int nbShards, int nbReplica, int waitForActiveShards, IndexName indexName) {
this.nbShards = nbShards;
this.nbReplica = nbReplica;
this.waitForActiveShards = waitForActiveShards;
this.indexName = indexName;
this.aliases = ImmutableList.builder();
}
public AliasSpecificationStep addAlias(AliasName aliasName) {
Preconditions.checkNotNull(aliasName);
this.aliases.add(aliasName);
return this;
}
public ReactorElasticSearchClient createIndexAndAliases(ReactorElasticSearchClient client) {
return new IndexCreationPerformer(nbShards, nbReplica, waitForActiveShards, indexName, aliases.build()).createIndexAndAliases(client, Optional.empty());
}
public ReactorElasticSearchClient createIndexAndAliases(ReactorElasticSearchClient client, XContentBuilder mappingContent) {
return new IndexCreationPerformer(nbShards, nbReplica, waitForActiveShards, indexName, aliases.build()).createIndexAndAliases(client, Optional.of(mappingContent));
}
}
static class IndexCreationPerformer {
private final int nbShards;
private final int nbReplica;
private final int waitForActiveShards;
private final IndexName indexName;
private final ImmutableList<AliasName> aliases;
public IndexCreationPerformer(int nbShards, int nbReplica, int waitForActiveShards, IndexName indexName, ImmutableList<AliasName> aliases) {
this.nbShards = nbShards;
this.nbReplica = nbReplica;
this.waitForActiveShards = waitForActiveShards;
this.indexName = indexName;
this.aliases = aliases;
}
public ReactorElasticSearchClient createIndexAndAliases(ReactorElasticSearchClient client, Optional<XContentBuilder> mappingContent) {
Preconditions.checkNotNull(indexName);
try {
createIndexIfNeeded(client, indexName, generateSetting(nbShards, nbReplica, waitForActiveShards), mappingContent);
aliases.forEach(Throwing.<AliasName>consumer(alias -> createAliasIfNeeded(client, indexName, alias))
.sneakyThrow());
} catch (IOException e) {
LOGGER.error("Error while creating index : ", e);
}
return client;
}
private void createAliasIfNeeded(ReactorElasticSearchClient client, IndexName indexName, AliasName aliasName) throws IOException {
if (!aliasExist(client, aliasName)) {
client.indices()
.updateAliases(
new IndicesAliasesRequest().addAliasAction(
new AliasActions(AliasActions.Type.ADD)
.index(indexName.getValue())
.alias(aliasName.getValue())),
RequestOptions.DEFAULT);
}
}
private boolean aliasExist(ReactorElasticSearchClient client, AliasName aliasName) throws IOException {
return client.indices()
.existsAlias(new GetAliasesRequest().aliases(aliasName.getValue()), RequestOptions.DEFAULT);
}
private void createIndexIfNeeded(ReactorElasticSearchClient client, IndexName indexName, XContentBuilder settings, Optional<XContentBuilder> mappingContent) throws IOException {
try {
CreateIndexRequest request = new CreateIndexRequest(indexName.getValue()).source(settings);
mappingContent.ifPresent(request::mapping);
client.indices().create(
request,
RequestOptions.DEFAULT);
} catch (ElasticsearchStatusException exception) {
if (exception.getMessage().contains(INDEX_ALREADY_EXISTS_EXCEPTION_MESSAGE)) {
LOGGER.info("Index [{}] already exists", indexName.getValue());
} else {
throw exception;
}
}
}
private XContentBuilder generateSetting(int nbShards, int nbReplica, int waitForActiveShards) throws IOException {
return jsonBuilder()
.startObject()
.startObject("settings")
.field("number_of_shards", nbShards)
.field("number_of_replicas", nbReplica)
.field("index.write.wait_for_active_shards", waitForActiveShards)
.startObject("analysis")
.startObject("normalizer")
.startObject(CASE_INSENSITIVE)
.field("type", "custom")
.startArray("char_filter")
.endArray()
.startArray("filter")
.value("lowercase")
.value("asciifolding")
.endArray()
.endObject()
.endObject()
.startObject("analyzer")
.startObject(KEEP_MAIL_AND_URL)
.field("tokenizer", "uax_url_email")
.startArray("filter")
.value("lowercase")
.value("stop")
.endArray()
.endObject()
.startObject(SNOWBALL_KEEP_MAIL_AND_URL)
.field("tokenizer", "uax_url_email")
.startArray("filter")
.value("lowercase")
.value("stop")
.value(ENGLISH_SNOWBALL)
.endArray()
.endObject()
.endObject()
.startObject("filter")
.startObject(ENGLISH_SNOWBALL)
.field("type", "snowball")
.field("language", "English")
.endObject()
.endObject()
.endObject()
.endObject()
.endObject();
}
}
private static final Logger LOGGER = LoggerFactory.getLogger(IndexCreationFactory.class);
private static final String INDEX_ALREADY_EXISTS_EXCEPTION_MESSAGE = "type=resource_already_exists_exception";
private final int nbShards;
private final int nbReplica;
private final int waitForActiveShards;
public static final String CASE_INSENSITIVE = "case_insensitive";
public static final String KEEP_MAIL_AND_URL = "keep_mail_and_url";
public static final String SNOWBALL_KEEP_MAIL_AND_URL = "snowball_keep_mail_and_token";
public static final String ENGLISH_SNOWBALL = "english_snowball";
public static final String BOOLEAN = "boolean";
public static final String TYPE = "type";
public static final String LONG = "long";
public static final String DOUBLE = "double";
public static final String KEYWORD = "keyword";
public static final String PROPERTIES = "properties";
public static final String ROUTING = "_routing";
public static final String REQUIRED = "required";
public static final String DATE = "date";
public static final String FORMAT = "format";
public static final String NESTED = "nested";
public static final String FIELDS = "fields";
public static final String RAW = "raw";
public static final String ANALYZER = "analyzer";
public static final String NORMALIZER = "normalizer";
public static final String SEARCH_ANALYZER = "search_analyzer";
@Inject
public IndexCreationFactory(ElasticSearchConfiguration configuration) {
this.nbShards = configuration.getNbShards();
this.nbReplica = configuration.getNbReplica();
this.waitForActiveShards = configuration.getWaitForActiveShards();
}
public AliasSpecificationStep useIndex(IndexName indexName) {
Preconditions.checkNotNull(indexName);
return new AliasSpecificationStep(nbShards, nbReplica, waitForActiveShards, indexName);
}
}