blob: 812c18470013de275da52b3937a8426428d896b9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.unomi.persistence.elasticsearch;
import com.hazelcast.core.HazelcastInstance;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.conn.ssl.NoopHostnameVerifier;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.lucene.search.TotalHits;
import org.apache.unomi.api.Item;
import org.apache.unomi.api.PartialList;
import org.apache.unomi.api.TimestampedItem;
import org.apache.unomi.api.conditions.Condition;
import org.apache.unomi.api.query.DateRange;
import org.apache.unomi.api.query.IpRange;
import org.apache.unomi.api.query.NumericRange;
import org.apache.unomi.metrics.MetricAdapter;
import org.apache.unomi.metrics.MetricsService;
import org.apache.unomi.persistence.elasticsearch.conditions.*;
import org.apache.unomi.persistence.spi.PersistenceService;
import org.apache.unomi.persistence.spi.aggregate.*;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.admin.indices.template.delete.DeleteIndexTemplateRequest;
import org.elasticsearch.action.bulk.*;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.ClearScrollRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.*;
import org.elasticsearch.client.core.CountRequest;
import org.elasticsearch.client.core.CountResponse;
import org.elasticsearch.client.core.MainResponse;
import org.elasticsearch.client.indices.*;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.DistanceUnit;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.query.IdsQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.RangeQueryBuilder;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.UpdateByQueryRequest;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptException;
import org.elasticsearch.script.ScriptType;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.*;
import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation;
import org.elasticsearch.search.aggregations.bucket.SingleBucketAggregation;
import org.elasticsearch.search.aggregations.bucket.filter.Filter;
import org.elasticsearch.search.aggregations.bucket.global.Global;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
import org.elasticsearch.search.aggregations.bucket.missing.MissingAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.range.DateRangeAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.range.IpRangeAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.range.RangeAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.terms.IncludeExclude;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.NumericMetricsAggregation;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.GeoDistanceSortBuilder;
import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.search.sort.SortOrder;
import org.osgi.framework.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
import javax.net.ssl.X509TrustManager;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URL;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
import java.security.cert.X509Certificate;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.index.query.QueryBuilders.termQuery;
@SuppressWarnings("rawtypes")
public class ElasticSearchPersistenceServiceImpl implements PersistenceService, SynchronousBundleListener {
public static final String NUMBER_OF_SHARDS = "number_of_shards";
public static final String NUMBER_OF_REPLICAS = "number_of_replicas";
public static final String CLUSTER_NAME = "cluster.name";
public static final String BULK_PROCESSOR_NAME = "bulkProcessor.name";
public static final String BULK_PROCESSOR_CONCURRENT_REQUESTS = "bulkProcessor.concurrentRequests";
public static final String BULK_PROCESSOR_BULK_ACTIONS = "bulkProcessor.bulkActions";
public static final String BULK_PROCESSOR_BULK_SIZE = "bulkProcessor.bulkSize";
public static final String BULK_PROCESSOR_FLUSH_INTERVAL = "bulkProcessor.flushInterval";
public static final String BULK_PROCESSOR_BACKOFF_POLICY = "bulkProcessor.backoffPolicy";
public static final String INDEX_DATE_PREFIX = "date-";
private static final Logger logger = LoggerFactory.getLogger(ElasticSearchPersistenceServiceImpl.class.getName());
private RestHighLevelClient client;
private BulkProcessor bulkProcessor;
private String elasticSearchAddresses;
private List<String> elasticSearchAddressList = new ArrayList<>();
private String clusterName;
private String indexPrefix;
private String monthlyIndexNumberOfShards;
private String monthlyIndexNumberOfReplicas;
private String numberOfShards;
private String numberOfReplicas;
private BundleContext bundleContext;
private Map<String, String> mappings = new HashMap<String, String>();
private ConditionEvaluatorDispatcher conditionEvaluatorDispatcher;
private ConditionESQueryBuilderDispatcher conditionESQueryBuilderDispatcher;
private List<String> itemsMonthlyIndexed;
private Map<String, String> routingByType;
private Integer defaultQueryLimit = 10;
private String bulkProcessorConcurrentRequests = "1";
private String bulkProcessorBulkActions = "1000";
private String bulkProcessorBulkSize = "5MB";
private String bulkProcessorFlushInterval = "5s";
private String bulkProcessorBackoffPolicy = "exponential";
private String minimalElasticSearchVersion = "7.0.0";
private String maximalElasticSearchVersion = "8.0.0";
// authentication props
private String username;
private String password;
private boolean sslEnable = false;
private boolean sslTrustAllCertificates = false;
private int aggregateQueryBucketSize = 5000;
private MetricsService metricsService;
private HazelcastInstance hazelcastInstance;
private Set<String> itemClassesToCacheSet = new HashSet<>();
private String itemClassesToCache;
private boolean useBatchingForSave = false;
private Map<String, Map<String, Map<String, Object>>> knownMappings = new HashMap<>();
public void setBundleContext(BundleContext bundleContext) {
this.bundleContext = bundleContext;
}
public void setClusterName(String clusterName) {
this.clusterName = clusterName;
}
public void setElasticSearchAddresses(String elasticSearchAddresses) {
this.elasticSearchAddresses = elasticSearchAddresses;
String[] elasticSearchAddressesArray = elasticSearchAddresses.split(",");
elasticSearchAddressList.clear();
for (String elasticSearchAddress : elasticSearchAddressesArray) {
elasticSearchAddressList.add(elasticSearchAddress.trim());
}
}
public void setIndexPrefix(String indexPrefix) {
this.indexPrefix = indexPrefix;
}
public void setMonthlyIndexNumberOfShards(String monthlyIndexNumberOfShards) {
this.monthlyIndexNumberOfShards = monthlyIndexNumberOfShards;
}
public void setMonthlyIndexNumberOfReplicas(String monthlyIndexNumberOfReplicas) {
this.monthlyIndexNumberOfReplicas = monthlyIndexNumberOfReplicas;
}
public void setNumberOfShards(String numberOfShards) {
this.numberOfShards = numberOfShards;
}
public void setNumberOfReplicas(String numberOfReplicas) {
this.numberOfReplicas = numberOfReplicas;
}
public void setDefaultQueryLimit(Integer defaultQueryLimit) {
this.defaultQueryLimit = defaultQueryLimit;
}
public void setItemsMonthlyIndexed(List<String> itemsMonthlyIndexed) {
this.itemsMonthlyIndexed = itemsMonthlyIndexed;
}
public void setRoutingByType(Map<String, String> routingByType) {
this.routingByType = routingByType;
}
public void setConditionEvaluatorDispatcher(ConditionEvaluatorDispatcher conditionEvaluatorDispatcher) {
this.conditionEvaluatorDispatcher = conditionEvaluatorDispatcher;
}
public void setConditionESQueryBuilderDispatcher(ConditionESQueryBuilderDispatcher conditionESQueryBuilderDispatcher) {
this.conditionESQueryBuilderDispatcher = conditionESQueryBuilderDispatcher;
}
public void setBulkProcessorConcurrentRequests(String bulkProcessorConcurrentRequests) {
this.bulkProcessorConcurrentRequests = bulkProcessorConcurrentRequests;
}
public void setBulkProcessorBulkActions(String bulkProcessorBulkActions) {
this.bulkProcessorBulkActions = bulkProcessorBulkActions;
}
public void setBulkProcessorBulkSize(String bulkProcessorBulkSize) {
this.bulkProcessorBulkSize = bulkProcessorBulkSize;
}
public void setBulkProcessorFlushInterval(String bulkProcessorFlushInterval) {
this.bulkProcessorFlushInterval = bulkProcessorFlushInterval;
}
public void setBulkProcessorBackoffPolicy(String bulkProcessorBackoffPolicy) {
this.bulkProcessorBackoffPolicy = bulkProcessorBackoffPolicy;
}
public void setMinimalElasticSearchVersion(String minimalElasticSearchVersion) {
this.minimalElasticSearchVersion = minimalElasticSearchVersion;
}
public void setMaximalElasticSearchVersion(String maximalElasticSearchVersion) {
this.maximalElasticSearchVersion = maximalElasticSearchVersion;
}
public void setAggregateQueryBucketSize(int aggregateQueryBucketSize) {
this.aggregateQueryBucketSize = aggregateQueryBucketSize;
}
public void setMetricsService(MetricsService metricsService) {
this.metricsService = metricsService;
}
public void setHazelcastInstance(HazelcastInstance hazelcastInstance) {
this.hazelcastInstance = hazelcastInstance;
}
public void setItemClassesToCache(String itemClassesToCache) {
this.itemClassesToCache = itemClassesToCache;
if (StringUtils.isNotBlank(itemClassesToCache)) {
String[] itemClassesToCacheParts = itemClassesToCache.split(",");
if (itemClassesToCacheParts != null) {
itemClassesToCacheSet.clear();
for (String itemClassToCache : itemClassesToCacheParts) {
itemClassesToCacheSet.add(itemClassToCache.trim());
}
}
}
}
public void setUseBatchingForSave(boolean useBatchingForSave) {
this.useBatchingForSave = useBatchingForSave;
}
public void setUsername(String username) {
this.username = username;
}
public void setPassword(String password) {
this.password = password;
}
public void setSslEnable(boolean sslEnable) {
this.sslEnable = sslEnable;
}
public void setSslTrustAllCertificates(boolean sslTrustAllCertificates) {
this.sslTrustAllCertificates = sslTrustAllCertificates;
}
public void start() throws Exception {
// on startup
new InClassLoaderExecute<Object>(null, null) {
public Object execute(Object... args) throws Exception {
bulkProcessorConcurrentRequests = System.getProperty(BULK_PROCESSOR_CONCURRENT_REQUESTS, bulkProcessorConcurrentRequests);
bulkProcessorBulkActions = System.getProperty(BULK_PROCESSOR_BULK_ACTIONS, bulkProcessorBulkActions);
bulkProcessorBulkSize = System.getProperty(BULK_PROCESSOR_BULK_SIZE, bulkProcessorBulkSize);
bulkProcessorFlushInterval = System.getProperty(BULK_PROCESSOR_FLUSH_INTERVAL, bulkProcessorFlushInterval);
bulkProcessorBackoffPolicy = System.getProperty(BULK_PROCESSOR_BACKOFF_POLICY, bulkProcessorBackoffPolicy);
// this property is used for integration tests, to make sure we don't conflict with an already running ElasticSearch instance.
if (System.getProperty("org.apache.unomi.itests.elasticsearch.http.port") != null) {
elasticSearchAddressList.clear();
elasticSearchAddressList.add("localhost:" + System.getProperty("org.apache.unomi.itests.elasticsearch.http.port"));
logger.info("Overriding ElasticSearch address list from system property=" + elasticSearchAddressList);
}
// this property is used for integration tests, to make sure we don't conflict with an already running ElasticSearch instance.
if (System.getProperty("org.apache.unomi.itests.elasticsearch.cluster.name") != null) {
clusterName = System.getProperty("org.apache.unomi.itests.elasticsearch.cluster.name");
logger.info("Overriding cluster name from system property=" + clusterName);
}
buildClient();
MainResponse response = client.info(RequestOptions.DEFAULT);
org.elasticsearch.client.core.MainResponse.Version version = response.getVersion();
org.elasticsearch.Version clusterVersion = org.elasticsearch.Version.fromString(version.getNumber());
org.elasticsearch.Version minimalVersion = org.elasticsearch.Version.fromString(minimalElasticSearchVersion);
org.elasticsearch.Version maximalVersion = org.elasticsearch.Version.fromString(maximalElasticSearchVersion);
if (clusterVersion.before(minimalVersion) ||
clusterVersion.equals(maximalVersion) ||
clusterVersion.after(maximalVersion)) {
throw new Exception("ElasticSearch version is not within [" + minimalVersion + "," + maximalVersion + "), aborting startup !");
}
loadPredefinedMappings(bundleContext, false);
// load predefined mappings and condition dispatchers of any bundles that were started before this one.
for (Bundle existingBundle : bundleContext.getBundles()) {
if (existingBundle.getBundleContext() != null) {
loadPredefinedMappings(existingBundle.getBundleContext(), false);
}
}
createMonthlyIndexTemplate();
if (client != null && bulkProcessor == null) {
bulkProcessor = getBulkProcessor();
}
logger.info("Waiting for GREEN cluster status...");
client.cluster().health(new ClusterHealthRequest().waitForGreenStatus(), RequestOptions.DEFAULT);
logger.info("Cluster status is GREEN");
return true;
}
}.executeInClassLoader();
bundleContext.addBundleListener(this);
logger.info(this.getClass().getName() + " service started successfully.");
}
private void buildClient() {
List<Node> nodeList = new ArrayList<>();
for (String elasticSearchAddress : elasticSearchAddressList) {
String[] elasticSearchAddressParts = elasticSearchAddress.split(":");
String elasticSearchHostName = elasticSearchAddressParts[0];
int elasticSearchPort = Integer.parseInt(elasticSearchAddressParts[1]);
// configure authentication
nodeList.add(new Node(new HttpHost(elasticSearchHostName, elasticSearchPort, sslEnable ? "https" : "http")));
}
RestClientBuilder clientBuilder = RestClient.builder(nodeList.toArray(new Node[nodeList.size()]));
clientBuilder.setHttpClientConfigCallback(httpClientBuilder -> {
if (sslTrustAllCertificates) {
try {
final SSLContext sslContext = SSLContext.getInstance("SSL");
sslContext.init(null, new TrustManager[]{new X509TrustManager() {
public X509Certificate[] getAcceptedIssuers() {
return null;
}
public void checkClientTrusted(X509Certificate[] certs,
String authType) {
}
public void checkServerTrusted(X509Certificate[] certs,
String authType) {
}
}}, new SecureRandom());
httpClientBuilder.setSSLContext(sslContext).setSSLHostnameVerifier(new NoopHostnameVerifier());
} catch (NoSuchAlgorithmException | KeyManagementException e) {
logger.error("Error creating SSL Context for trust all certificates", e);
}
}
if (StringUtils.isNotBlank(username)) {
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
}
return httpClientBuilder;
});
logger.info("Connecting to ElasticSearch persistence backend using cluster name " + clusterName + " and index prefix " + indexPrefix + "...");
client = new RestHighLevelClient(clientBuilder);
}
public BulkProcessor getBulkProcessor() {
if (bulkProcessor != null) {
return bulkProcessor;
}
BulkProcessor.Listener bulkProcessorListener = new BulkProcessor.Listener() {
@Override
public void beforeBulk(long executionId,
BulkRequest request) {
logger.debug("Before Bulk");
}
@Override
public void afterBulk(long executionId,
BulkRequest request,
BulkResponse response) {
logger.debug("After Bulk");
}
@Override
public void afterBulk(long executionId,
BulkRequest request,
Throwable failure) {
logger.error("After Bulk (failure)", failure);
}
};
BulkProcessor.Builder bulkProcessorBuilder = BulkProcessor.builder(
(request, bulkListener) ->
client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener),
bulkProcessorListener);
if (bulkProcessorConcurrentRequests != null) {
int concurrentRequests = Integer.parseInt(bulkProcessorConcurrentRequests);
if (concurrentRequests > 1) {
bulkProcessorBuilder.setConcurrentRequests(concurrentRequests);
}
}
if (bulkProcessorBulkActions != null) {
int bulkActions = Integer.parseInt(bulkProcessorBulkActions);
bulkProcessorBuilder.setBulkActions(bulkActions);
}
if (bulkProcessorBulkSize != null) {
bulkProcessorBuilder.setBulkSize(ByteSizeValue.parseBytesSizeValue(bulkProcessorBulkSize, new ByteSizeValue(5, ByteSizeUnit.MB), BULK_PROCESSOR_BULK_SIZE));
}
if (bulkProcessorFlushInterval != null) {
bulkProcessorBuilder.setFlushInterval(TimeValue.parseTimeValue(bulkProcessorFlushInterval, null, BULK_PROCESSOR_FLUSH_INTERVAL));
} else {
// in ElasticSearch this defaults to null, but we would like to set a value to 5 seconds by default
bulkProcessorBuilder.setFlushInterval(new TimeValue(5, TimeUnit.SECONDS));
}
if (bulkProcessorBackoffPolicy != null) {
String backoffPolicyStr = bulkProcessorBackoffPolicy;
if (backoffPolicyStr != null && backoffPolicyStr.length() > 0) {
backoffPolicyStr = backoffPolicyStr.toLowerCase();
if ("nobackoff".equals(backoffPolicyStr)) {
bulkProcessorBuilder.setBackoffPolicy(BackoffPolicy.noBackoff());
} else if (backoffPolicyStr.startsWith("constant(")) {
int paramStartPos = backoffPolicyStr.indexOf("constant(" + "constant(".length());
int paramEndPos = backoffPolicyStr.indexOf(")", paramStartPos);
int paramSeparatorPos = backoffPolicyStr.indexOf(",", paramStartPos);
TimeValue delay = TimeValue.parseTimeValue(backoffPolicyStr.substring(paramStartPos, paramSeparatorPos), new TimeValue(5, TimeUnit.SECONDS), BULK_PROCESSOR_BACKOFF_POLICY);
int maxNumberOfRetries = Integer.parseInt(backoffPolicyStr.substring(paramSeparatorPos + 1, paramEndPos));
bulkProcessorBuilder.setBackoffPolicy(BackoffPolicy.constantBackoff(delay, maxNumberOfRetries));
} else if (backoffPolicyStr.startsWith("exponential")) {
if (!backoffPolicyStr.contains("(")) {
bulkProcessorBuilder.setBackoffPolicy(BackoffPolicy.exponentialBackoff());
} else {
// we detected parameters, must process them.
int paramStartPos = backoffPolicyStr.indexOf("exponential(" + "exponential(".length());
int paramEndPos = backoffPolicyStr.indexOf(")", paramStartPos);
int paramSeparatorPos = backoffPolicyStr.indexOf(",", paramStartPos);
TimeValue delay = TimeValue.parseTimeValue(backoffPolicyStr.substring(paramStartPos, paramSeparatorPos), new TimeValue(5, TimeUnit.SECONDS), BULK_PROCESSOR_BACKOFF_POLICY);
int maxNumberOfRetries = Integer.parseInt(backoffPolicyStr.substring(paramSeparatorPos + 1, paramEndPos));
bulkProcessorBuilder.setBackoffPolicy(BackoffPolicy.exponentialBackoff(delay, maxNumberOfRetries));
}
}
}
}
bulkProcessor = bulkProcessorBuilder.build();
return bulkProcessor;
}
public void stop() {
new InClassLoaderExecute<Object>(null, null) {
protected Object execute(Object... args) throws IOException {
logger.info("Closing ElasticSearch persistence backend...");
if (bulkProcessor != null) {
try {
bulkProcessor.awaitClose(2, TimeUnit.MINUTES);
} catch (InterruptedException e) {
logger.error("Error waiting for bulk operations to flush !", e);
}
}
if (client != null) {
client.close();
}
return null;
}
}.catchingExecuteInClassLoader(true);
bundleContext.removeBundleListener(this);
}
public void bindConditionEvaluator(ServiceReference<ConditionEvaluator> conditionEvaluatorServiceReference) {
ConditionEvaluator conditionEvaluator = bundleContext.getService(conditionEvaluatorServiceReference);
conditionEvaluatorDispatcher.addEvaluator(conditionEvaluatorServiceReference.getProperty("conditionEvaluatorId").toString(), conditionEvaluator);
}
public void unbindConditionEvaluator(ServiceReference<ConditionEvaluator> conditionEvaluatorServiceReference) {
if (conditionEvaluatorServiceReference == null) {
return;
}
conditionEvaluatorDispatcher.removeEvaluator(conditionEvaluatorServiceReference.getProperty("conditionEvaluatorId").toString());
}
public void bindConditionESQueryBuilder(ServiceReference<ConditionESQueryBuilder> conditionESQueryBuilderServiceReference) {
ConditionESQueryBuilder conditionESQueryBuilder = bundleContext.getService(conditionESQueryBuilderServiceReference);
conditionESQueryBuilderDispatcher.addQueryBuilder(conditionESQueryBuilderServiceReference.getProperty("queryBuilderId").toString(), conditionESQueryBuilder);
}
public void unbindConditionESQueryBuilder(ServiceReference<ConditionESQueryBuilder> conditionESQueryBuilderServiceReference) {
if (conditionESQueryBuilderServiceReference == null) {
return;
}
conditionESQueryBuilderDispatcher.removeQueryBuilder(conditionESQueryBuilderServiceReference.getProperty("queryBuilderId").toString());
}
@Override
public void bundleChanged(BundleEvent event) {
switch (event.getType()) {
case BundleEvent.STARTING:
loadPredefinedMappings(event.getBundle().getBundleContext(), true);
break;
}
}
private void loadPredefinedMappings(BundleContext bundleContext, boolean createMapping) {
Enumeration<URL> predefinedMappings = bundleContext.getBundle().findEntries("META-INF/cxs/mappings", "*.json", true);
if (predefinedMappings == null) {
return;
}
while (predefinedMappings.hasMoreElements()) {
URL predefinedMappingURL = predefinedMappings.nextElement();
logger.info("Found mapping at " + predefinedMappingURL + ", loading... ");
try {
final String path = predefinedMappingURL.getPath();
String name = path.substring(path.lastIndexOf('/') + 1, path.lastIndexOf('.'));
String mappingSource = loadMappingFile(predefinedMappingURL);
mappings.put(name, mappingSource);
String itemIndexName = getIndex(name, new Date());
if (!client.indices().exists(new GetIndexRequest(itemIndexName), RequestOptions.DEFAULT)) {
logger.info("{} index doesn't exist yet, creating it...", itemIndexName);
internalCreateIndex(itemIndexName, mappingSource);
} else {
logger.info("Found index {}", itemIndexName);
if (createMapping) {
logger.info("Updating mapping for {}", itemIndexName);
createMapping(name, mappingSource);
}
}
} catch (Exception e) {
logger.error("Error while loading mapping definition " + predefinedMappingURL, e);
}
}
}
private String loadMappingFile(URL predefinedMappingURL) throws IOException {
BufferedReader reader = new BufferedReader(new InputStreamReader(predefinedMappingURL.openStream()));
StringBuilder content = new StringBuilder();
String l;
while ((l = reader.readLine()) != null) {
content.append(l);
}
return content.toString();
}
@Override
public <T extends Item> List<T> getAllItems(final Class<T> clazz) {
return getAllItems(clazz, 0, -1, null).getList();
}
@Override
public long getAllItemsCount(String itemType) {
return queryCount(QueryBuilders.matchAllQuery(), itemType);
}
@Override
public <T extends Item> PartialList<T> getAllItems(final Class<T> clazz, int offset, int size, String sortBy) {
long startTime = System.currentTimeMillis();
try {
return query(QueryBuilders.matchAllQuery(), sortBy, clazz, offset, size, null, null);
} finally {
if (metricsService != null && metricsService.isActivated()) {
metricsService.updateTimer(this.getClass().getName() + ".getAllItems", startTime);
}
}
}
@Override
public <T extends Item> T load(final String itemId, final Class<T> clazz) {
return load(itemId, null, clazz);
}
@Override
public <T extends Item> T load(final String itemId, final Date dateHint, final Class<T> clazz) {
return new InClassLoaderExecute<T>(metricsService, this.getClass().getName() + ".loadItem") {
protected T execute(Object... args) throws Exception {
try {
String itemType = Item.getItemType(clazz);
T itemFromCache = getFromCache(itemId, clazz);
if (itemFromCache != null) {
return itemFromCache;
}
if (itemsMonthlyIndexed.contains(itemType) && dateHint == null) {
return new MetricAdapter<T>(metricsService, ".loadItemWithQuery") {
@Override
public T execute(Object... args) throws Exception {
PartialList<T> r = query(QueryBuilders.idsQuery(itemType).addIds(itemId), null, clazz, 0, 1, null, null);
if (r.size() > 0) {
return r.get(0);
}
return null;
}
}.execute();
} else {
GetRequest getRequest = new GetRequest(getIndex(itemType, dateHint), itemId);
GetResponse response = client.get(getRequest, RequestOptions.DEFAULT);
if (response.isExists()) {
String sourceAsString = response.getSourceAsString();
final T value = ESCustomObjectMapper.getObjectMapper().readValue(sourceAsString, clazz);
value.setItemId(response.getId());
value.setVersion(response.getVersion());
putInCache(itemId, value);
return value;
} else {
return null;
}
}
} catch (IndexNotFoundException e) {
// this can happen if we are just testing the existence of the item, it is not always an error.
return null;
} catch (Exception ex) {
throw new Exception("Error loading itemType=" + clazz.getName() + " itemId=" + itemId, ex);
}
}
}.catchingExecuteInClassLoader(true);
}
@Override
public boolean save(final Item item) {
return save(item, useBatchingForSave);
}
@Override
public boolean save(final Item item, final boolean useBatching) {
Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".saveItem") {
protected Boolean execute(Object... args) throws Exception {
try {
String source = ESCustomObjectMapper.getObjectMapper().writeValueAsString(item);
String itemType = item.getItemType();
String itemId = item.getItemId();
putInCache(itemId, item);
String index = getIndex(itemType, itemsMonthlyIndexed.contains(itemType) ? ((TimestampedItem) item).getTimeStamp() : null);
IndexRequest indexRequest = new IndexRequest(index);
indexRequest.id(itemId);
indexRequest.source(source, XContentType.JSON);
if (routingByType.containsKey(itemType)) {
indexRequest.routing(routingByType.get(itemType));
}
try {
if (bulkProcessor == null || !useBatching) {
client.index(indexRequest, RequestOptions.DEFAULT);
} else {
bulkProcessor.add(indexRequest);
}
} catch (IndexNotFoundException e) {
logger.error("Could not find index {}, could not register item type {} with id {} ",
index, itemType, itemId, e);
return false;
}
return true;
} catch (IOException e) {
throw new Exception("Error saving item " + item, e);
}
}
}.catchingExecuteInClassLoader(true);
if (result == null) {
return false;
} else {
return result;
}
}
@Override
public boolean update(final String itemId, final Date dateHint, final Class clazz, final String propertyName, final Object propertyValue) {
return update(itemId, dateHint, clazz, Collections.singletonMap(propertyName, propertyValue));
}
@Override
public boolean update(final String itemId, final Date dateHint, final Class clazz, final Map source) {
Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".updateItem") {
protected Boolean execute(Object... args) throws Exception {
try {
String itemType = Item.getItemType(clazz);
UpdateRequest updateRequest = new UpdateRequest(getIndex(itemType, dateHint), itemId);
updateRequest.doc(source);
if (bulkProcessor == null) {
client.update(updateRequest, RequestOptions.DEFAULT);
} else {
bulkProcessor.add(updateRequest);
}
return true;
} catch (IndexNotFoundException e) {
throw new Exception("No index found for itemType=" + clazz.getName() + "itemId=" + itemId, e);
}
}
}.catchingExecuteInClassLoader(true);
if (result == null) {
return false;
} else {
return result;
}
}
@Override
public boolean updateWithQueryAndScript(final Date dateHint, final Class<?> clazz, final String[] scripts, final Map<String, Object>[] scriptParams, final Condition[] conditions) {
Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".updateWithQueryAndScript") {
protected Boolean execute(Object... args) throws Exception {
try {
String itemType = Item.getItemType(clazz);
String index = getIndex(itemType, dateHint);
for (int i = 0; i < scripts.length; i++) {
Script actualScript = new Script(ScriptType.INLINE, "painless", scripts[i], scriptParams[i]);
RefreshRequest refreshRequest = new RefreshRequest(index);
client.indices().refresh(refreshRequest, RequestOptions.DEFAULT);
UpdateByQueryRequest updateByQueryRequest = new UpdateByQueryRequest(index);
updateByQueryRequest.setConflicts("proceed");
updateByQueryRequest.setMaxRetries(1000);
updateByQueryRequest.setSlices(2);
updateByQueryRequest.setScript(actualScript);
updateByQueryRequest.setQuery(conditionESQueryBuilderDispatcher.buildFilter(conditions[i]));
BulkByScrollResponse response = client.updateByQuery(updateByQueryRequest, RequestOptions.DEFAULT);
if (response.getBulkFailures().size() > 0) {
for (BulkItemResponse.Failure failure : response.getBulkFailures()) {
logger.error("Failure : cause={} , message={}", failure.getCause(), failure.getMessage());
}
} else {
logger.info("Update By Query has processed {} in {}.", response.getUpdated(), response.getTook().toString());
}
if (response.isTimedOut()) {
logger.error("Update By Query ended with timeout!");
}
if (response.getVersionConflicts() > 0) {
logger.warn("Update By Query ended with {} Version Conflicts!", response.getVersionConflicts());
}
if (response.getNoops() > 0) {
logger.warn("Update By Query ended with {} noops!", response.getNoops());
}
}
return true;
} catch (IndexNotFoundException e) {
throw new Exception("No index found for itemType=" + clazz.getName(), e);
} catch (ScriptException e) {
logger.error("Error in the update script : {}\n{}\n{}", e.getScript(), e.getDetailedMessage(), e.getScriptStack());
throw new Exception("Error in the update script");
} finally {
return false;
}
}
}.catchingExecuteInClassLoader(true);
if (result == null) {
return false;
} else {
return result;
}
}
@Override
public boolean updateWithScript(final String itemId, final Date dateHint, final Class<?> clazz, final String script, final Map<String, Object> scriptParams) {
Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".updateWithScript") {
protected Boolean execute(Object... args) throws Exception {
try {
String itemType = Item.getItemType(clazz);
String index = getIndex(itemType, dateHint);
Script actualScript = new Script(ScriptType.INLINE, "painless", script, scriptParams);
UpdateRequest updateRequest = new UpdateRequest(index, itemId);
updateRequest.script(actualScript);
if (bulkProcessor == null) {
client.update(updateRequest, RequestOptions.DEFAULT);
} else {
bulkProcessor.add(updateRequest);
}
return true;
} catch (IndexNotFoundException e) {
throw new Exception("No index found for itemType=" + clazz.getName() + "itemId=" + itemId, e);
}
}
}.catchingExecuteInClassLoader(true);
if (result == null) {
return false;
} else {
return result;
}
}
@Override
public <T extends Item> boolean remove(final String itemId, final Class<T> clazz) {
Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".removeItem") {
protected Boolean execute(Object... args) throws Exception {
try {
String itemType = Item.getItemType(clazz);
DeleteRequest deleteRequest = new DeleteRequest(getIndexNameForQuery(itemType), itemId);
client.delete(deleteRequest, RequestOptions.DEFAULT);
return true;
} catch (Exception e) {
throw new Exception("Cannot remove", e);
}
}
}.catchingExecuteInClassLoader(true);
if (result == null) {
return false;
} else {
return result;
}
}
public <T extends Item> boolean removeByQuery(final Condition query, final Class<T> clazz) {
Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".removeByQuery") {
protected Boolean execute(Object... args) throws Exception {
try {
String itemType = Item.getItemType(clazz);
BulkRequest deleteByScopeBulkRequest = new BulkRequest();
final TimeValue keepAlive = TimeValue.timeValueHours(1);
SearchRequest searchRequest = new SearchRequest(getAllIndexForQuery())
.indices(getIndexNameForQuery(itemType))
.scroll(keepAlive);
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder()
.query(conditionESQueryBuilderDispatcher.getQueryBuilder(query))
.size(100);
searchRequest.source(searchSourceBuilder);
SearchResponse response = client.search(searchRequest, RequestOptions.DEFAULT);
// Scroll until no more hits are returned
while (true) {
for (SearchHit hit : response.getHits().getHits()) {
// add hit to bulk delete
deleteFromCache(hit.getId(), clazz);
deleteByScopeBulkRequest.add(Requests.deleteRequest(hit.getIndex()).type(hit.getType()).id(hit.getId()));
}
SearchScrollRequest searchScrollRequest = new SearchScrollRequest(response.getScrollId());
searchScrollRequest.scroll(keepAlive);
response = client.scroll(searchScrollRequest, RequestOptions.DEFAULT);
// If we have no more hits, exit
if (response.getHits().getHits().length == 0) {
break;
}
}
ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
clearScrollRequest.addScrollId(response.getScrollId());
client.clearScroll(clearScrollRequest, RequestOptions.DEFAULT);
// we're done with the scrolling, delete now
if (deleteByScopeBulkRequest.numberOfActions() > 0) {
final BulkResponse deleteResponse = client.bulk(deleteByScopeBulkRequest, RequestOptions.DEFAULT);
if (deleteResponse.hasFailures()) {
// do something
logger.debug("Couldn't remove by query " + query + ":\n{}", deleteResponse.buildFailureMessage());
}
}
return true;
} catch (Exception e) {
throw new Exception("Cannot remove by query", e);
}
}
}.catchingExecuteInClassLoader(true);
if (result == null) {
return false;
} else {
return result;
}
}
public boolean indexTemplateExists(final String templateName) {
Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".indexTemplateExists") {
protected Boolean execute(Object... args) throws IOException {
IndexTemplatesExistRequest indexTemplatesExistRequest = new IndexTemplatesExistRequest(templateName);
return client.indices().existsTemplate(indexTemplatesExistRequest, RequestOptions.DEFAULT);
}
}.catchingExecuteInClassLoader(true);
if (result == null) {
return false;
} else {
return result;
}
}
public boolean removeIndexTemplate(final String templateName) {
Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".removeIndexTemplate") {
protected Boolean execute(Object... args) throws IOException {
DeleteIndexTemplateRequest deleteIndexTemplateRequest = new DeleteIndexTemplateRequest(templateName);
AcknowledgedResponse deleteIndexTemplateResponse = client.indices().deleteTemplate(deleteIndexTemplateRequest, RequestOptions.DEFAULT);
return deleteIndexTemplateResponse.isAcknowledged();
}
}.catchingExecuteInClassLoader(true);
if (result == null) {
return false;
} else {
return result;
}
}
public boolean createMonthlyIndexTemplate() {
Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".createMonthlyIndexTemplate") {
protected Boolean execute(Object... args) throws IOException {
boolean executedSuccessfully = true;
for (String itemName : itemsMonthlyIndexed) {
PutIndexTemplateRequest putIndexTemplateRequest = new PutIndexTemplateRequest("context-"+itemName+"-date-template")
.patterns(Collections.singletonList(getMonthlyIndexForQuery(itemName)))
.settings("{\n" +
" \"index\" : {\n" +
" \"number_of_shards\" : " + monthlyIndexNumberOfShards + ",\n" +
" \"number_of_replicas\" : " + monthlyIndexNumberOfReplicas + "\n" +
" },\n" +
" \"analysis\": {\n" +
" \"analyzer\": {\n" +
" \"folding\": {\n" +
" \"type\":\"custom\",\n" +
" \"tokenizer\": \"keyword\",\n" +
" \"filter\": [ \"lowercase\", \"asciifolding\" ]\n" +
" }\n" +
" }\n" +
" }\n" +
"}\n", XContentType.JSON);
putIndexTemplateRequest.mapping(mappings.get(itemName), XContentType.JSON);
AcknowledgedResponse putIndexTemplateResponse = client.indices().putTemplate(putIndexTemplateRequest, RequestOptions.DEFAULT);
executedSuccessfully &= putIndexTemplateResponse.isAcknowledged();
}
return executedSuccessfully;
}
}.catchingExecuteInClassLoader(true);
if (result == null) {
return false;
} else {
return result;
}
}
public boolean createIndex(final String itemType) {
String index = getIndex(itemType);
Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".createIndex") {
protected Boolean execute(Object... args) throws IOException {
GetIndexRequest getIndexRequest = new GetIndexRequest(index);
boolean indexExists = client.indices().exists(getIndexRequest, RequestOptions.DEFAULT);
if (!indexExists) {
internalCreateIndex(index, mappings.get(itemType));
}
return !indexExists;
}
}.catchingExecuteInClassLoader(true);
if (result == null) {
return false;
} else {
return result;
}
}
public boolean removeIndex(final String itemType) {
String index = getIndex(itemType);
Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".removeIndex") {
protected Boolean execute(Object... args) throws IOException {
GetIndexRequest getIndexRequest = new GetIndexRequest(index);
boolean indexExists = client.indices().exists(getIndexRequest, RequestOptions.DEFAULT);
if (indexExists) {
DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(index);
client.indices().delete(deleteIndexRequest, RequestOptions.DEFAULT);
}
return indexExists;
}
}.catchingExecuteInClassLoader(true);
if (result == null) {
return false;
} else {
return result;
}
}
private void internalCreateIndex(String indexName, String mappingSource) throws IOException {
CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName);
createIndexRequest.settings("{\n" +
" \"index\" : {\n" +
" \"number_of_shards\" : " + numberOfShards + ",\n" +
" \"number_of_replicas\" : " + numberOfReplicas + "\n" +
" },\n" +
" \"analysis\": {\n" +
" \"analyzer\": {\n" +
" \"folding\": {\n" +
" \"type\":\"custom\",\n" +
" \"tokenizer\": \"keyword\",\n" +
" \"filter\": [ \"lowercase\", \"asciifolding\" ]\n" +
" }\n" +
" }\n" +
" }\n" +
"}\n", XContentType.JSON);
createIndexRequest.mapping(mappingSource, XContentType.JSON);
CreateIndexResponse createIndexResponse = client.indices().create(createIndexRequest, RequestOptions.DEFAULT);
logger.info("Index created: [{}], acknowledge: [{}], shards acknowledge: [{}]", createIndexResponse.index(),
createIndexResponse.isAcknowledged(), createIndexResponse.isShardsAcknowledged());
}
@Override
public void createMapping(String type, String source) {
try {
if (itemsMonthlyIndexed.contains(type)) {
createMonthlyIndexTemplate();
String indexName = getIndex(type, new Date());
GetIndexRequest getIndexRequest = new GetIndexRequest(indexName);
if (client.indices().exists(getIndexRequest, RequestOptions.DEFAULT)) {
putMapping(source, indexName);
}
} else {
putMapping(source, getIndex(type));
}
} catch (IOException ioe) {
logger.error("Error while creating mapping for type " + type + " and source " + source, ioe);
}
}
private void putMapping(final String source, final String indexName) throws IOException {
PutMappingRequest putMappingRequest = new PutMappingRequest(indexName);
putMappingRequest.source(source, XContentType.JSON);
client.indices().putMapping(putMappingRequest, RequestOptions.DEFAULT);
}
@Override
public Map<String, Map<String, Object>> getPropertiesMapping(final String itemType) {
return new InClassLoaderExecute<Map<String, Map<String, Object>>>(metricsService, this.getClass().getName() + ".getPropertiesMapping") {
@SuppressWarnings("unchecked")
protected Map<String, Map<String, Object>> execute(Object... args) throws Exception {
// Get all mapping for current itemType
GetMappingsRequest getMappingsRequest = new GetMappingsRequest();
getMappingsRequest.indices(getIndexNameForQuery(itemType));
GetMappingsResponse getMappingsResponse = client.indices().getMapping(getMappingsRequest, RequestOptions.DEFAULT);
Map<String, MappingMetaData> mappings = getMappingsResponse.mappings();
// create a list of Keys to get the mappings in chronological order
// in case there is monthly context then the mapping will be added from the oldest to the most recent one
Set<String> orderedKeys = new TreeSet<>(mappings.keySet());
Map<String, Map<String, Object>> result = new HashMap<>();
try {
for (String key : orderedKeys) {
if (mappings.containsKey(key)) {
Map<String, Map<String, Object>> properties = (Map<String, Map<String, Object>>) mappings.get(key).getSourceAsMap().get("properties");
for (Map.Entry<String, Map<String, Object>> entry : properties.entrySet()) {
if (result.containsKey(entry.getKey())) {
Map<String, Object> subResult = result.get(entry.getKey());
for (Map.Entry<String, Object> subentry : entry.getValue().entrySet()) {
if (subResult.containsKey(subentry.getKey())
&& subResult.get(subentry.getKey()) instanceof Map
&& subentry.getValue() instanceof Map) {
mergePropertiesMapping((Map) subResult.get(subentry.getKey()), (Map) subentry.getValue());
} else {
subResult.put(subentry.getKey(), subentry.getValue());
}
}
} else {
result.put(entry.getKey(), entry.getValue());
}
}
}
}
} catch (Throwable t) {
throw new Exception("Cannot get mapping for itemType="+ itemType, t);
}
return result;
}
}.catchingExecuteInClassLoader(true);
}
private void mergePropertiesMapping(Map<String, Object> result, Map<String, Object> entry) {
for (Map.Entry<String, Object> subentry : entry.entrySet()) {
if (result.containsKey(subentry.getKey())
&& result.get(subentry.getKey()) instanceof Map
&& subentry.getValue() instanceof Map) {
mergePropertiesMapping((Map) result.get(subentry.getKey()), (Map) subentry.getValue());
} else {
result.put(subentry.getKey(), subentry.getValue());
}
}
}
public Map<String, Object> getPropertyMapping(String property, String itemType) {
Map<String, Map<String, Object>> mappings = knownMappings.get(itemType);
Map<String, Object> result = getPropertyMapping(property, mappings);
if (result == null) {
mappings = getPropertiesMapping(itemType);
knownMappings.put(itemType, mappings);
result = getPropertyMapping(property, mappings);
}
return result;
}
private Map<String, Object> getPropertyMapping(String property, Map<String, Map<String, Object>> mappings) {
Map<String, Object> propMapping = null;
String[] properties = StringUtils.split(property, '.');
for (int i = 0; i < properties.length && mappings != null; i++) {
String s = properties[i];
propMapping = mappings.get(s);
if (i == properties.length - 1) {
return propMapping;
} else {
mappings = propMapping != null ? ((Map<String, Map<String, Object>>) propMapping.get("properties")) : null;
}
}
return null;
}
private String getPropertyNameWithData(String name, String itemType) {
Map<String, Object> propertyMapping = getPropertyMapping(name, itemType);
if (propertyMapping == null) {
return null;
}
if (propertyMapping != null
&& "text".equals(propertyMapping.get("type"))
&& propertyMapping.containsKey("fields")
&& ((Map) propertyMapping.get("fields")).containsKey("keyword")) {
name += ".keyword";
}
return name;
}
public boolean saveQuery(final String queryName, final String query) {
Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".saveQuery") {
protected Boolean execute(Object... args) throws Exception {
//Index the query = register it in the percolator
try {
logger.info("Saving query : " + queryName);
String index = getIndex(".percolator", null);
IndexRequest indexRequest = new IndexRequest(index);
indexRequest.id(queryName);
indexRequest.source(query, XContentType.JSON);
indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
client.index(indexRequest, RequestOptions.DEFAULT);
return true;
} catch (Exception e) {
throw new Exception("Cannot save query", e);
}
}
}.catchingExecuteInClassLoader(true);
if (result == null) {
return false;
} else {
return result;
}
}
@Override
public boolean saveQuery(String queryName, Condition query) {
if (query == null) {
return false;
}
saveQuery(queryName, conditionESQueryBuilderDispatcher.getQuery(query));
return true;
}
@Override
public boolean removeQuery(final String queryName) {
Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".removeQuery") {
protected Boolean execute(Object... args) throws Exception {
//Index the query = register it in the percolator
try {
String index = getIndex(".percolator", null);
DeleteRequest deleteRequest = new DeleteRequest(index);
deleteRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
client.delete(deleteRequest, RequestOptions.DEFAULT);
return true;
} catch (Exception e) {
throw new Exception("Cannot delete query", e);
}
}
}.catchingExecuteInClassLoader(true);
if (result == null) {
return false;
} else {
return result;
}
}
@Override
public boolean testMatch(Condition query, Item item) {
long startTime = System.currentTimeMillis();
try {
return conditionEvaluatorDispatcher.eval(query, item);
} catch (UnsupportedOperationException e) {
logger.error("Eval not supported, continue with query", e);
} finally {
if (metricsService != null && metricsService.isActivated()) {
metricsService.updateTimer(this.getClass().getName() + ".testMatchLocally", startTime);
}
}
startTime = System.currentTimeMillis();
try {
final Class<? extends Item> clazz = item.getClass();
String itemType = Item.getItemType(clazz);
QueryBuilder builder = QueryBuilders.boolQuery()
.must(QueryBuilders.idsQuery(itemType).addIds(item.getItemId()))
.must(conditionESQueryBuilderDispatcher.buildFilter(query));
return queryCount(builder, itemType) > 0;
} finally {
if (metricsService != null && metricsService.isActivated()) {
metricsService.updateTimer(this.getClass().getName() + ".testMatchInElasticSearch", startTime);
}
}
}
@Override
public <T extends Item> List<T> query(final Condition query, String sortBy, final Class<T> clazz) {
return query(query, sortBy, clazz, 0, -1).getList();
}
@Override
public <T extends Item> PartialList<T> query(final Condition query, String sortBy, final Class<T> clazz, final int offset, final int size) {
return query(conditionESQueryBuilderDispatcher.getQueryBuilder(query), sortBy, clazz, offset, size, null, null);
}
@Override
public <T extends Item> PartialList<T> query(final Condition query, String sortBy, final Class<T> clazz, final int offset, final int size, final String scrollTimeValidity) {
return query(conditionESQueryBuilderDispatcher.getQueryBuilder(query), sortBy, clazz, offset, size, null, scrollTimeValidity);
}
@Override
public <T extends Item> PartialList<T> queryFullText(final String fulltext, final Condition query, String sortBy, final Class<T> clazz, final int offset, final int size) {
return query(QueryBuilders.boolQuery().must(QueryBuilders.queryStringQuery(fulltext)).must(conditionESQueryBuilderDispatcher.getQueryBuilder(query)), sortBy, clazz, offset, size, null, null);
}
@Override
public <T extends Item> List<T> query(final String fieldName, final String fieldValue, String sortBy, final Class<T> clazz) {
return query(fieldName, fieldValue, sortBy, clazz, 0, -1).getList();
}
@Override
public <T extends Item> List<T> query(final String fieldName, final String[] fieldValues, String sortBy, final Class<T> clazz) {
return query(QueryBuilders.termsQuery(fieldName, ConditionContextHelper.foldToASCII(fieldValues)), sortBy, clazz, 0, -1, getRouting(fieldName, fieldValues, clazz), null).getList();
}
@Override
public <T extends Item> PartialList<T> query(String fieldName, String fieldValue, String sortBy, Class<T> clazz, int offset, int size) {
return query(termQuery(fieldName, ConditionContextHelper.foldToASCII(fieldValue)), sortBy, clazz, offset, size, getRouting(fieldName, new String[]{fieldValue}, clazz), null);
}
@Override
public <T extends Item> PartialList<T> queryFullText(String fieldName, String fieldValue, String fulltext, String sortBy, Class<T> clazz, int offset, int size) {
return query(QueryBuilders.boolQuery().must(QueryBuilders.queryStringQuery(fulltext)).must(termQuery(fieldName, fieldValue)), sortBy, clazz, offset, size, getRouting(fieldName, new String[]{fieldValue}, clazz), null);
}
@Override
public <T extends Item> PartialList<T> queryFullText(String fulltext, String sortBy, Class<T> clazz, int offset, int size) {
return query(QueryBuilders.queryStringQuery(fulltext), sortBy, clazz, offset, size, null, null);
}
@Override
public <T extends Item> PartialList<T> rangeQuery(String fieldName, String from, String to, String sortBy, Class<T> clazz, int offset, int size) {
RangeQueryBuilder builder = QueryBuilders.rangeQuery(fieldName);
builder.from(from);
builder.to(to);
return query(builder, sortBy, clazz, offset, size, null, null);
}
@Override
public long queryCount(Condition query, String itemType) {
try {
return conditionESQueryBuilderDispatcher.count(query);
} catch (UnsupportedOperationException e) {
try {
QueryBuilder filter = conditionESQueryBuilderDispatcher.buildFilter(query);
if (filter instanceof IdsQueryBuilder) {
return ((IdsQueryBuilder) filter).ids().size();
}
return queryCount(filter, itemType);
} catch (UnsupportedOperationException e1) {
return -1;
}
}
}
private long queryCount(final QueryBuilder filter, final String itemType) {
return new InClassLoaderExecute<Long>(metricsService, this.getClass().getName() + ".queryCount") {
@Override
protected Long execute(Object... args) throws IOException {
CountRequest countRequest = new CountRequest(getIndexNameForQuery(itemType));
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(filter);
countRequest.source(searchSourceBuilder);
CountResponse response = client.count(countRequest, RequestOptions.DEFAULT);
return response.getCount();
}
}.catchingExecuteInClassLoader(true);
}
private <T extends Item> PartialList<T> query(final QueryBuilder query, final String sortBy, final Class<T> clazz, final int offset, final int size, final String[] routing, final String scrollTimeValidity) {
return new InClassLoaderExecute<PartialList<T>>(metricsService, this.getClass().getName() + ".query") {
@Override
protected PartialList<T> execute(Object... args) throws Exception {
List<T> results = new ArrayList<T>();
String scrollIdentifier = null;
long totalHits = 0;
PartialList.Relation totalHitsRelation = PartialList.Relation.EQUAL;
try {
String itemType = Item.getItemType(clazz);
TimeValue keepAlive = TimeValue.timeValueHours(1);
SearchRequest searchRequest = new SearchRequest(getIndexNameForQuery(itemType));
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder()
.fetchSource(true)
.query(query)
.size(size < 0 ? defaultQueryLimit : size)
.from(offset);
if (scrollTimeValidity != null) {
keepAlive = TimeValue.parseTimeValue(scrollTimeValidity, TimeValue.timeValueHours(1), "scrollTimeValidity");
searchRequest.scroll(keepAlive);
}
if (size == Integer.MIN_VALUE) {
searchSourceBuilder.size(defaultQueryLimit);
} else if (size != -1) {
searchSourceBuilder.size(size);
} else {
// size == -1, use scroll query to retrieve all the results
searchRequest.scroll(keepAlive);
}
if (routing != null) {
searchRequest.routing(routing);
}
if (sortBy != null) {
String[] sortByArray = sortBy.split(",");
for (String sortByElement : sortByArray) {
if (sortByElement.startsWith("geo:")) {
String[] elements = sortByElement.split(":");
GeoDistanceSortBuilder distanceSortBuilder = SortBuilders.geoDistanceSort(elements[1], Double.parseDouble(elements[2]), Double.parseDouble(elements[3])).unit(DistanceUnit.KILOMETERS);
if (elements.length > 4 && elements[4].equals("desc")) {
searchSourceBuilder.sort(distanceSortBuilder.order(SortOrder.DESC));
} else {
searchSourceBuilder.sort(distanceSortBuilder.order(SortOrder.ASC));
}
} else {
String name = getPropertyNameWithData(StringUtils.substringBeforeLast(sortByElement, ":"), itemType);
if (name != null) {
if (sortByElement.endsWith(":desc")) {
searchSourceBuilder.sort(name, SortOrder.DESC);
} else {
searchSourceBuilder.sort(name, SortOrder.ASC);
}
} else {
// in the case of no data existing for the property, we will not add the sorting to the request.
}
}
}
}
searchSourceBuilder.version(true);
searchRequest.source(searchSourceBuilder);
SearchResponse response = client.search(searchRequest, RequestOptions.DEFAULT);
if (size == -1) {
// Scroll until no more hits are returned
while (true) {
for (SearchHit searchHit : response.getHits().getHits()) {
// add hit to results
String sourceAsString = searchHit.getSourceAsString();
final T value = ESCustomObjectMapper.getObjectMapper().readValue(sourceAsString, clazz);
value.setItemId(searchHit.getId());
value.setVersion(searchHit.getVersion());
results.add(value);
}
SearchScrollRequest searchScrollRequest = new SearchScrollRequest(response.getScrollId());
searchScrollRequest.scroll(keepAlive);
response = client.scroll(searchScrollRequest, RequestOptions.DEFAULT);
// If we have no more hits, exit
if (response.getHits().getHits().length == 0) {
break;
}
}
ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
clearScrollRequest.addScrollId(response.getScrollId());
client.clearScroll(clearScrollRequest, RequestOptions.DEFAULT);
} else {
SearchHits searchHits = response.getHits();
scrollIdentifier = response.getScrollId();
totalHits = searchHits.getTotalHits().value;
totalHitsRelation = getTotalHitsRelation(searchHits.getTotalHits());
for (SearchHit searchHit : searchHits) {
String sourceAsString = searchHit.getSourceAsString();
final T value = ESCustomObjectMapper.getObjectMapper().readValue(sourceAsString, clazz);
value.setItemId(searchHit.getId());
value.setVersion(searchHit.getVersion());
results.add(value);
}
}
} catch (Exception t) {
throw new Exception("Error loading itemType=" + clazz.getName() + " query=" + query + " sortBy=" + sortBy, t);
}
PartialList<T> result = new PartialList<T>(results, offset, size, totalHits, totalHitsRelation);
if (scrollIdentifier != null && totalHits != 0) {
result.setScrollIdentifier(scrollIdentifier);
result.setScrollTimeValidity(scrollTimeValidity);
}
return result;
}
}.catchingExecuteInClassLoader(true);
}
private PartialList.Relation getTotalHitsRelation(TotalHits totalHits) {
return TotalHits.Relation.GREATER_THAN_OR_EQUAL_TO.equals(totalHits.relation) ? PartialList.Relation.GREATER_THAN_OR_EQUAL_TO : PartialList.Relation.EQUAL;
}
@Override
public <T extends Item> PartialList<T> continueScrollQuery(final Class<T> clazz, final String scrollIdentifier, final String scrollTimeValidity) {
return new InClassLoaderExecute<PartialList<T>>(metricsService, this.getClass().getName() + ".continueScrollQuery") {
@Override
protected PartialList<T> execute(Object... args) throws Exception {
List<T> results = new ArrayList<T>();
long totalHits = 0;
try {
TimeValue keepAlive = TimeValue.parseTimeValue(scrollTimeValidity, TimeValue.timeValueMinutes(10), "scrollTimeValidity");
SearchScrollRequest searchScrollRequest = new SearchScrollRequest(scrollIdentifier);
searchScrollRequest.scroll(keepAlive);
SearchResponse response = client.scroll(searchScrollRequest, RequestOptions.DEFAULT);
if (response.getHits().getHits().length == 0) {
ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
clearScrollRequest.addScrollId(response.getScrollId());
client.clearScroll(clearScrollRequest, RequestOptions.DEFAULT);
} else {
for (SearchHit searchHit : response.getHits().getHits()) {
// add hit to results
String sourceAsString = searchHit.getSourceAsString();
final T value = ESCustomObjectMapper.getObjectMapper().readValue(sourceAsString, clazz);
value.setItemId(searchHit.getId());
value.setVersion(searchHit.getVersion());
results.add(value);
}
}
PartialList<T> result = new PartialList<T>(results, 0, response.getHits().getHits().length, response.getHits().getTotalHits().value, getTotalHitsRelation(response.getHits().getTotalHits()));
if (scrollIdentifier != null) {
result.setScrollIdentifier(scrollIdentifier);
result.setScrollTimeValidity(scrollTimeValidity);
}
return result;
} catch (Exception t) {
throw new Exception("Error continuing scrolling query for itemType=" + clazz.getName() + " scrollIdentifier=" + scrollIdentifier + " scrollTimeValidity=" + scrollTimeValidity, t);
}
}
}.catchingExecuteInClassLoader(true);
}
/**
* @deprecated As of version 1.3.0-incubating, use {@link #aggregateWithOptimizedQuery(Condition, BaseAggregate, String)} instead
*/
@Deprecated
@Override
public Map<String, Long> aggregateQuery(Condition filter, BaseAggregate aggregate, String itemType) {
return aggregateQuery(filter, aggregate, itemType, false);
}
@Override
public Map<String, Long> aggregateWithOptimizedQuery(Condition filter, BaseAggregate aggregate, String itemType) {
return aggregateQuery(filter, aggregate, itemType, true);
}
private Map<String, Long> aggregateQuery(final Condition filter, final BaseAggregate aggregate, final String itemType,
final boolean optimizedQuery) {
return new InClassLoaderExecute<Map<String, Long>>(metricsService, this.getClass().getName() + ".aggregateQuery") {
@Override
protected Map<String, Long> execute(Object... args) throws IOException {
Map<String, Long> results = new LinkedHashMap<String, Long>();
SearchRequest searchRequest = new SearchRequest(getIndexNameForQuery(itemType));
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.size(0);
searchSourceBuilder.query(QueryBuilders.matchAllQuery());
List<AggregationBuilder> lastAggregation = new ArrayList<AggregationBuilder>();
if (aggregate != null) {
AggregationBuilder bucketsAggregation = null;
String fieldName = aggregate.getField();
if (aggregate instanceof DateAggregate) {
DateAggregate dateAggregate = (DateAggregate) aggregate;
DateHistogramAggregationBuilder dateHistogramBuilder = AggregationBuilders.dateHistogram("buckets").field(fieldName).calendarInterval(new DateHistogramInterval((dateAggregate.getInterval())));
if (dateAggregate.getFormat() != null) {
dateHistogramBuilder.format(dateAggregate.getFormat());
}
bucketsAggregation = dateHistogramBuilder;
} else if (aggregate instanceof NumericRangeAggregate) {
RangeAggregationBuilder rangebuilder = AggregationBuilders.range("buckets").field(fieldName);
for (NumericRange range : ((NumericRangeAggregate) aggregate).getRanges()) {
if (range != null) {
if (range.getFrom() != null && range.getTo() != null) {
rangebuilder.addRange(range.getKey(), range.getFrom(), range.getTo());
} else if (range.getFrom() != null) {
rangebuilder.addUnboundedFrom(range.getKey(), range.getFrom());
} else if (range.getTo() != null) {
rangebuilder.addUnboundedTo(range.getKey(), range.getTo());
}
}
}
bucketsAggregation = rangebuilder;
} else if (aggregate instanceof DateRangeAggregate) {
DateRangeAggregate dateRangeAggregate = (DateRangeAggregate) aggregate;
DateRangeAggregationBuilder rangebuilder = AggregationBuilders.dateRange("buckets").field(fieldName);
if (dateRangeAggregate.getFormat() != null) {
rangebuilder.format(dateRangeAggregate.getFormat());
}
for (DateRange range : dateRangeAggregate.getDateRanges()) {
if (range != null) {
rangebuilder.addRange(range.getKey(), range.getFrom() != null ? range.getFrom().toString() : null, range.getTo() != null ? range.getTo().toString() : null);
}
}
bucketsAggregation = rangebuilder;
} else if (aggregate instanceof IpRangeAggregate) {
IpRangeAggregate ipRangeAggregate = (IpRangeAggregate) aggregate;
IpRangeAggregationBuilder rangebuilder = AggregationBuilders.ipRange("buckets").field(fieldName);
for (IpRange range : ipRangeAggregate.getRanges()) {
if (range != null) {
rangebuilder.addRange(range.getKey(), range.getFrom(), range.getTo());
}
}
bucketsAggregation = rangebuilder;
} else {
fieldName = getPropertyNameWithData(fieldName, itemType);
//default
if (fieldName != null) {
bucketsAggregation = AggregationBuilders.terms("buckets").field(fieldName).size(aggregateQueryBucketSize);
if (aggregate instanceof TermsAggregate) {
TermsAggregate termsAggregate = (TermsAggregate) aggregate;
if (termsAggregate.getPartition() > -1 && termsAggregate.getNumPartitions() > -1) {
((TermsAggregationBuilder) bucketsAggregation).includeExclude(new IncludeExclude(termsAggregate.getPartition(), termsAggregate.getNumPartitions()));
}
}
} else {
// field name could be null if no existing data exists
}
}
if (bucketsAggregation != null) {
final MissingAggregationBuilder missingBucketsAggregation = AggregationBuilders.missing("missing").field(fieldName);
for (AggregationBuilder aggregationBuilder : lastAggregation) {
bucketsAggregation.subAggregation(aggregationBuilder);
missingBucketsAggregation.subAggregation(aggregationBuilder);
}
lastAggregation = Arrays.asList(bucketsAggregation, missingBucketsAggregation);
}
}
// If the request is optimized then we don't need a global aggregation which is very slow and we can put the query with a
// filter on range items in the query block so we don't retrieve all the document before filtering the whole
if (optimizedQuery) {
for (AggregationBuilder aggregationBuilder : lastAggregation) {
searchSourceBuilder.aggregation(aggregationBuilder);
}
if (filter != null) {
searchSourceBuilder.query(conditionESQueryBuilderDispatcher.buildFilter(filter));
}
} else {
if (filter != null) {
AggregationBuilder filterAggregation = AggregationBuilders.filter("filter", conditionESQueryBuilderDispatcher.buildFilter(filter));
for (AggregationBuilder aggregationBuilder : lastAggregation) {
filterAggregation.subAggregation(aggregationBuilder);
}
lastAggregation = Collections.singletonList(filterAggregation);
}
AggregationBuilder globalAggregation = AggregationBuilders.global("global");
for (AggregationBuilder aggregationBuilder : lastAggregation) {
globalAggregation.subAggregation(aggregationBuilder);
}
searchSourceBuilder.aggregation(globalAggregation);
}
searchRequest.source(searchSourceBuilder);
SearchResponse response = client.search(searchRequest, RequestOptions.DEFAULT);
Aggregations aggregations = response.getAggregations();
if (aggregations != null) {
if (optimizedQuery) {
if (response.getHits() != null) {
results.put("_filtered", response.getHits().getTotalHits().value);
}
} else {
Global globalAgg = aggregations.get("global");
results.put("_all", globalAgg.getDocCount());
aggregations = globalAgg.getAggregations();
if (aggregations.get("filter") != null) {
Filter filterAgg = aggregations.get("filter");
results.put("_filtered", filterAgg.getDocCount());
aggregations = filterAgg.getAggregations();
}
}
if (aggregations.get("buckets") != null) {
long totalDocCount = 0;
MultiBucketsAggregation terms = aggregations.get("buckets");
for (MultiBucketsAggregation.Bucket bucket : terms.getBuckets()) {
results.put(bucket.getKeyAsString(), bucket.getDocCount());
totalDocCount += bucket.getDocCount();
}
SingleBucketAggregation missing = aggregations.get("missing");
if (missing.getDocCount() > 0) {
results.put("_missing", missing.getDocCount());
totalDocCount += missing.getDocCount();
}
if (response.getHits() != null && TotalHits.Relation.GREATER_THAN_OR_EQUAL_TO.equals(response.getHits().getTotalHits().relation)) {
results.put("_filtered", totalDocCount);
}
}
}
return results;
}
}.catchingExecuteInClassLoader(true);
}
private <T extends Item> String[] getRouting(String fieldName, String[] fieldValues, Class<T> clazz) {
String itemType = Item.getItemType(clazz);
String[] routing = null;
if (routingByType.containsKey(itemType) && routingByType.get(itemType).equals(fieldName)) {
routing = fieldValues;
}
return routing;
}
@Override
public void refresh() {
new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".refresh") {
protected Boolean execute(Object... args) {
if (bulkProcessor != null) {
bulkProcessor.flush();
}
try {
client.indices().refresh(Requests.refreshRequest(), RequestOptions.DEFAULT);
} catch (IOException e) {
e.printStackTrace();//TODO manage ES7
}
return true;
}
}.catchingExecuteInClassLoader(true);
}
@Override
public void purge(final Date date) {
new InClassLoaderExecute<Object>(metricsService, this.getClass().getName() + ".purgeWithDate") {
@Override
protected Object execute(Object... args) throws Exception {
GetIndexRequest getIndexRequest = new GetIndexRequest(getAllIndexForQuery());
GetIndexResponse getIndexResponse = client.indices().get(getIndexRequest, RequestOptions.DEFAULT);
String[] indices = getIndexResponse.getIndices();
SimpleDateFormat d = new SimpleDateFormat("yyyy-MM");
List<String> toDelete = new ArrayList<String>();
for (String currentIndexName : indices) {
int indexDatePrefixPos = currentIndexName.indexOf(INDEX_DATE_PREFIX);
if (indexDatePrefixPos > -1) {
try {
Date indexDate = d.parse(currentIndexName.substring(indexDatePrefixPos + INDEX_DATE_PREFIX.length()));
if (indexDate.before(date)) {
toDelete.add(currentIndexName);
}
} catch (ParseException e) {
throw new Exception("Cannot parse index name " + currentIndexName, e);
}
}
}
if (!toDelete.isEmpty()) {
DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(toDelete.toArray(new String[toDelete.size()]));
client.indices().delete(deleteIndexRequest, RequestOptions.DEFAULT);
}
return null;
}
}.catchingExecuteInClassLoader(true);
}
@Override
public void purge(final String scope) {
new InClassLoaderExecute<Void>(metricsService, this.getClass().getName() + ".purgeWithScope") {
@Override
protected Void execute(Object... args) throws IOException {
QueryBuilder query = termQuery("scope", scope);
BulkRequest deleteByScopeBulkRequest = new BulkRequest();
final TimeValue keepAlive = TimeValue.timeValueHours(1);
SearchRequest searchRequest = new SearchRequest(getAllIndexForQuery()).scroll(keepAlive);
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder()
.query(query)
.size(100);
searchRequest.source(searchSourceBuilder);
SearchResponse response = client.search(searchRequest, RequestOptions.DEFAULT);
// Scroll until no more hits are returned
while (true) {
for (SearchHit hit : response.getHits().getHits()) {
// add hit to bulk delete
DeleteRequest deleteRequest = new DeleteRequest(hit.getIndex(), hit.getId());
deleteByScopeBulkRequest.add(deleteRequest);
}
SearchScrollRequest searchScrollRequest = new SearchScrollRequest(response.getScrollId());
searchScrollRequest.scroll(keepAlive);
response = client.scroll(searchScrollRequest, RequestOptions.DEFAULT);
// If we have no more hits, exit
if (response.getHits().getHits().length == 0) {
ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
clearScrollRequest.addScrollId(response.getScrollId());
client.clearScroll(clearScrollRequest, RequestOptions.DEFAULT);
break;
}
}
// we're done with the scrolling, delete now
if (deleteByScopeBulkRequest.numberOfActions() > 0) {
final BulkResponse deleteResponse = client.bulk(deleteByScopeBulkRequest, RequestOptions.DEFAULT);
if (deleteResponse.hasFailures()) {
// do something
logger.warn("Couldn't delete from scope " + scope + ":\n{}", deleteResponse.buildFailureMessage());
}
}
return null;
}
}.catchingExecuteInClassLoader(true);
}
@Override
public Map<String, Double> getSingleValuesMetrics(final Condition condition, final String[] metrics, final String field, final String itemType) {
return new InClassLoaderExecute<Map<String, Double>>(metricsService, this.getClass().getName() + ".getSingleValuesMetrics") {
@Override
protected Map<String, Double> execute(Object... args) throws IOException {
Map<String, Double> results = new LinkedHashMap<String, Double>();
SearchRequest searchRequest = new SearchRequest(getIndexNameForQuery(itemType));
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder()
.size(0)
.query(QueryBuilders.matchAllQuery());
AggregationBuilder filterAggregation = AggregationBuilders.filter("metrics", conditionESQueryBuilderDispatcher.buildFilter(condition));
if (metrics != null) {
for (String metric : metrics) {
switch (metric) {
case "sum":
filterAggregation.subAggregation(AggregationBuilders.sum("sum").field(field));
break;
case "avg":
filterAggregation.subAggregation(AggregationBuilders.avg("avg").field(field));
break;
case "min":
filterAggregation.subAggregation(AggregationBuilders.min("min").field(field));
break;
case "max":
filterAggregation.subAggregation(AggregationBuilders.max("max").field(field));
break;
case "card":
filterAggregation.subAggregation(AggregationBuilders.cardinality("card").field(field));
break;
case "count":
filterAggregation.subAggregation(AggregationBuilders.count("count").field(field));
break;
}
}
}
searchSourceBuilder.aggregation(filterAggregation);
searchRequest.source(searchSourceBuilder);
SearchResponse response = client.search(searchRequest, RequestOptions.DEFAULT);
Aggregations aggregations = response.getAggregations();
if (aggregations != null) {
Aggregation metricsResults = aggregations.get("metrics");
if (metricsResults instanceof HasAggregations) {
aggregations = ((HasAggregations) metricsResults).getAggregations();
for (Aggregation aggregation : aggregations) {
NumericMetricsAggregation.SingleValue singleValue = (NumericMetricsAggregation.SingleValue) aggregation;
results.put("_" + singleValue.getName(), singleValue.value());
}
}
}
return results;
}
}.catchingExecuteInClassLoader(true);
}
private String getConfig(Map<String, String> settings, String key,
String defaultValue) {
if (settings != null && settings.get(key) != null) {
return settings.get(key);
}
return defaultValue;
}
public abstract static class InClassLoaderExecute<T> {
private String timerName;
private MetricsService metricsService;
public InClassLoaderExecute(MetricsService metricsService, String timerName) {
this.timerName = timerName;
this.metricsService = metricsService;
}
protected abstract T execute(Object... args) throws Exception;
public T executeInClassLoader(Object... args) throws Exception {
long startTime = System.currentTimeMillis();
ClassLoader tccl = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
return execute(args);
} finally {
if (metricsService != null && metricsService.isActivated()) {
metricsService.updateTimer(timerName, startTime);
}
Thread.currentThread().setContextClassLoader(tccl);
}
}
public T catchingExecuteInClassLoader(boolean logError, Object... args) {
try {
return executeInClassLoader(timerName, args);
} catch (Exception e) {
if (logError) {
logger.error("Error while executing in class loader", e);
}
}
return null;
}
}
private <T extends Item> boolean isCacheActiveForClass(String className) {
if (itemClassesToCacheSet.contains("*")) {
return true;
}
if (itemClassesToCacheSet.contains(className)) {
return true;
}
return false;
}
private <T extends Item> T getFromCache(String itemId, Class<T> clazz) {
String className = clazz.getName();
if (!isCacheActiveForClass(className)) {
return null;
}
Map<String,T> itemCache = hazelcastInstance.getMap(className);
return itemCache.get(itemId);
}
private <T extends Item> T putInCache(String itemId, T item) {
String className = item.getClass().getName();
if (!isCacheActiveForClass(className)) {
return null;
}
Map<String,T> itemCache = hazelcastInstance.getMap(className);
return itemCache.put(itemId, item);
}
private <T extends Item> T deleteFromCache(String itemId, Class clazz) {
String className = clazz.getName();
if (!isCacheActiveForClass(className)) {
return null;
}
Map<String,T> itemCache = hazelcastInstance.getMap(className);
return itemCache.remove(itemId);
}
private String getAllIndexForQuery() {
return indexPrefix + "*";
}
private String getIndexNameForQuery(String itemType) {
return itemsMonthlyIndexed.contains(itemType) ? getMonthlyIndexForQuery(itemType) : getIndex(itemType, null);
}
private String getMonthlyIndexForQuery(String itemType) {
return indexPrefix + "-" + itemType.toLowerCase() + "-" + INDEX_DATE_PREFIX + "*";
}
private String getIndex(String itemType, Date dateHint) {
String indexItemTypePart = itemsMonthlyIndexed.contains(itemType) && dateHint != null ? itemType + "-" + getMonthlyIndexPart(dateHint) : itemType;
return getIndex(indexItemTypePart);
}
private String getIndex(String indexItemTypePart) {
return (indexPrefix + "-" + indexItemTypePart).toLowerCase();
}
private String getMonthlyIndexPart(Date date) {
String d = new SimpleDateFormat("yyyy-MM").format(date);
return INDEX_DATE_PREFIX + d;
}
}