blob: 87a46eb7dfaf3c04cbf78594c36ae8d210b95ea7 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.unomi.persistence.elasticsearch;
import com.hazelcast.core.HazelcastInstance;
import org.apache.commons.lang3.StringUtils;
import org.apache.unomi.api.Item;
import org.apache.unomi.api.PartialList;
import org.apache.unomi.api.TimestampedItem;
import org.apache.unomi.api.conditions.Condition;
import org.apache.unomi.api.query.DateRange;
import org.apache.unomi.api.query.IpRange;
import org.apache.unomi.api.query.NumericRange;
import org.apache.unomi.metrics.MetricAdapter;
import org.apache.unomi.metrics.MetricsService;
import org.apache.unomi.persistence.elasticsearch.conditions.*;
import org.apache.unomi.persistence.spi.PersistenceService;
import org.apache.unomi.persistence.spi.aggregate.*;
import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
import org.elasticsearch.action.admin.indices.template.delete.DeleteIndexTemplateRequest;
import org.elasticsearch.action.admin.indices.template.delete.DeleteIndexTemplateResponse;
import org.elasticsearch.action.admin.indices.template.get.GetIndexTemplatesResponse;
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest;
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateResponse;
import org.elasticsearch.action.bulk.*;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.Requests;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.DistanceUnit;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.query.IdsQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.RangeQueryBuilder;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.UpdateByQueryAction;
import org.elasticsearch.index.reindex.UpdateByQueryRequestBuilder;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptException;
import org.elasticsearch.script.ScriptType;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.*;
import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation;
import org.elasticsearch.search.aggregations.bucket.SingleBucketAggregation;
import org.elasticsearch.search.aggregations.bucket.filter.Filter;
import org.elasticsearch.search.aggregations.bucket.global.Global;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
import org.elasticsearch.search.aggregations.bucket.missing.MissingAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.range.RangeAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.range.date.DateRangeAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.range.ip.IpRangeAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.terms.support.IncludeExclude;
import org.elasticsearch.search.aggregations.metrics.InternalNumericMetricsAggregation;
import org.elasticsearch.search.sort.GeoDistanceSortBuilder;
import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import org.osgi.framework.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.net.InetAddress;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.UnknownHostException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.index.query.QueryBuilders.termQuery;
@SuppressWarnings("rawtypes")
public class ElasticSearchPersistenceServiceImpl implements PersistenceService, SynchronousBundleListener {
public static final String NUMBER_OF_SHARDS = "number_of_shards";
public static final String NUMBER_OF_REPLICAS = "number_of_replicas";
public static final String CLUSTER_NAME = "cluster.name";
public static final String BULK_PROCESSOR_NAME = "bulkProcessor.name";
public static final String BULK_PROCESSOR_CONCURRENT_REQUESTS = "bulkProcessor.concurrentRequests";
public static final String BULK_PROCESSOR_BULK_ACTIONS = "bulkProcessor.bulkActions";
public static final String BULK_PROCESSOR_BULK_SIZE = "bulkProcessor.bulkSize";
public static final String BULK_PROCESSOR_FLUSH_INTERVAL = "bulkProcessor.flushInterval";
public static final String BULK_PROCESSOR_BACKOFF_POLICY = "bulkProcessor.backoffPolicy";
private static final Logger logger = LoggerFactory.getLogger(ElasticSearchPersistenceServiceImpl.class.getName());
private TransportClient client;
private BulkProcessor bulkProcessor;
private String elasticSearchAddresses;
private List<String> elasticSearchAddressList = new ArrayList<>();
private String clusterName;
private String indexName;
private String monthlyIndexNumberOfShards;
private String monthlyIndexNumberOfReplicas;
private String numberOfShards;
private String numberOfReplicas;
private BundleContext bundleContext;
private Map<String, String> mappings = new HashMap<String, String>();
private ConditionEvaluatorDispatcher conditionEvaluatorDispatcher;
private ConditionESQueryBuilderDispatcher conditionESQueryBuilderDispatcher;
private Map<String, String> indexNames;
private List<String> itemsMonthlyIndexed;
private Map<String, String> routingByType;
private Integer defaultQueryLimit = 10;
private String bulkProcessorConcurrentRequests = "1";
private String bulkProcessorBulkActions = "1000";
private String bulkProcessorBulkSize = "5MB";
private String bulkProcessorFlushInterval = "5s";
private String bulkProcessorBackoffPolicy = "exponential";
private String minimalElasticSearchVersion = "5.0.0";
private String maximalElasticSearchVersion = "5.7.0";
private String aggregateQueryBucketSize = "5000";
private String transportClientClassName = null;
private String transportClientProperties = null;
private String transportClientJarDirectory = null;
private MetricsService metricsService;
private HazelcastInstance hazelcastInstance;
private Set<String> itemClassesToCacheSet = new HashSet<>();
private String itemClassesToCache;
private boolean useBatchingForSave = false;
private Map<String, Map<String, Map<String, Object>>> knownMappings = new HashMap<>();
public void setBundleContext(BundleContext bundleContext) {
this.bundleContext = bundleContext;
}
public void setClusterName(String clusterName) {
this.clusterName = clusterName;
}
public void setElasticSearchAddresses(String elasticSearchAddresses) {
this.elasticSearchAddresses = elasticSearchAddresses;
String[] elasticSearchAddressesArray = elasticSearchAddresses.split(",");
elasticSearchAddressList.clear();
for (String elasticSearchAddress : elasticSearchAddressesArray) {
elasticSearchAddressList.add(elasticSearchAddress.trim());
}
}
public void setIndexName(String indexName) {
this.indexName = indexName;
}
public void setMonthlyIndexNumberOfShards(String monthlyIndexNumberOfShards) {
this.monthlyIndexNumberOfShards = monthlyIndexNumberOfShards;
}
public void setMonthlyIndexNumberOfReplicas(String monthlyIndexNumberOfReplicas) {
this.monthlyIndexNumberOfReplicas = monthlyIndexNumberOfReplicas;
}
public void setNumberOfShards(String numberOfShards) {
this.numberOfShards = numberOfShards;
}
public void setNumberOfReplicas(String numberOfReplicas) {
this.numberOfReplicas = numberOfReplicas;
}
public void setDefaultQueryLimit(Integer defaultQueryLimit) {
this.defaultQueryLimit = defaultQueryLimit;
}
public void setItemsMonthlyIndexed(List<String> itemsMonthlyIndexed) {
this.itemsMonthlyIndexed = itemsMonthlyIndexed;
}
public void setIndexNames(Map<String, String> indexNames) {
this.indexNames = indexNames;
}
public void setRoutingByType(Map<String, String> routingByType) {
this.routingByType = routingByType;
}
public void setConditionEvaluatorDispatcher(ConditionEvaluatorDispatcher conditionEvaluatorDispatcher) {
this.conditionEvaluatorDispatcher = conditionEvaluatorDispatcher;
}
public void setConditionESQueryBuilderDispatcher(ConditionESQueryBuilderDispatcher conditionESQueryBuilderDispatcher) {
this.conditionESQueryBuilderDispatcher = conditionESQueryBuilderDispatcher;
}
public void setBulkProcessorConcurrentRequests(String bulkProcessorConcurrentRequests) {
this.bulkProcessorConcurrentRequests = bulkProcessorConcurrentRequests;
}
public void setBulkProcessorBulkActions(String bulkProcessorBulkActions) {
this.bulkProcessorBulkActions = bulkProcessorBulkActions;
}
public void setBulkProcessorBulkSize(String bulkProcessorBulkSize) {
this.bulkProcessorBulkSize = bulkProcessorBulkSize;
}
public void setBulkProcessorFlushInterval(String bulkProcessorFlushInterval) {
this.bulkProcessorFlushInterval = bulkProcessorFlushInterval;
}
public void setBulkProcessorBackoffPolicy(String bulkProcessorBackoffPolicy) {
this.bulkProcessorBackoffPolicy = bulkProcessorBackoffPolicy;
}
public void setMinimalElasticSearchVersion(String minimalElasticSearchVersion) {
this.minimalElasticSearchVersion = minimalElasticSearchVersion;
}
public void setMaximalElasticSearchVersion(String maximalElasticSearchVersion) {
this.maximalElasticSearchVersion = maximalElasticSearchVersion;
}
public void setAggregateQueryBucketSize(String aggregateQueryBucketSize) {
this.aggregateQueryBucketSize = aggregateQueryBucketSize;
}
public void setTransportClientClassName(String transportClientClassName) {
this.transportClientClassName = transportClientClassName;
}
public void setTransportClientProperties(String transportClientProperties) {
this.transportClientProperties = transportClientProperties;
}
public void setTransportClientJarDirectory(String transportClientJarDirectory) {
this.transportClientJarDirectory = transportClientJarDirectory;
}
public void setMetricsService(MetricsService metricsService) {
this.metricsService = metricsService;
}
public void setHazelcastInstance(HazelcastInstance hazelcastInstance) {
this.hazelcastInstance = hazelcastInstance;
}
public void setItemClassesToCache(String itemClassesToCache) {
this.itemClassesToCache = itemClassesToCache;
if (StringUtils.isNotBlank(itemClassesToCache)) {
String[] itemClassesToCacheParts = itemClassesToCache.split(",");
if (itemClassesToCacheParts != null) {
itemClassesToCacheSet.clear();
for (String itemClassToCache : itemClassesToCacheParts) {
itemClassesToCacheSet.add(itemClassToCache.trim());
}
}
}
}
public void setUseBatchingForSave(boolean useBatchingForSave) {
this.useBatchingForSave = useBatchingForSave;
}
public void start() throws Exception {
// on startup
new InClassLoaderExecute<Object>(null, null) {
public Object execute(Object... args) throws Exception {
bulkProcessorConcurrentRequests = System.getProperty(BULK_PROCESSOR_CONCURRENT_REQUESTS, bulkProcessorConcurrentRequests);
bulkProcessorBulkActions = System.getProperty(BULK_PROCESSOR_BULK_ACTIONS, bulkProcessorBulkActions);
bulkProcessorBulkSize = System.getProperty(BULK_PROCESSOR_BULK_SIZE, bulkProcessorBulkSize);
bulkProcessorFlushInterval = System.getProperty(BULK_PROCESSOR_FLUSH_INTERVAL, bulkProcessorFlushInterval);
bulkProcessorBackoffPolicy = System.getProperty(BULK_PROCESSOR_BACKOFF_POLICY, bulkProcessorBackoffPolicy);
// this property is used for integration tests, to make sure we don't conflict with an already running ElasticSearch instance.
if (System.getProperty("org.apache.unomi.itests.elasticsearch.transport.port") != null) {
elasticSearchAddressList.clear();
elasticSearchAddressList.add("localhost:" + System.getProperty("org.apache.unomi.itests.elasticsearch.transport.port"));
logger.info("Overriding ElasticSearch address list from system property=" + elasticSearchAddressList);
}
// this property is used for integration tests, to make sure we don't conflict with an already running ElasticSearch instance.
if (System.getProperty("org.apache.unomi.itests.elasticsearch.cluster.name") != null) {
clusterName = System.getProperty("org.apache.unomi.itests.elasticsearch.cluster.name");
logger.info("Overriding cluster name from system property=" + clusterName);
}
Settings.Builder transportSettings = Settings.builder()
.put(CLUSTER_NAME, clusterName);
if (StringUtils.isNotBlank(transportClientClassName) && StringUtils.isNotBlank(transportClientJarDirectory)) {
logger.info("Connecting to ElasticSearch persistence backend using transport class " + transportClientClassName +
" with JAR directory "+transportClientJarDirectory +
" using cluster name " + clusterName + " and index name " + indexName + "...");
client = newTransportClient(transportSettings, transportClientClassName, transportClientJarDirectory, transportClientProperties);
} else {
logger.info("Connecting to ElasticSearch persistence backend using cluster name " + clusterName + " and index name " + indexName + "...");
client = new PreBuiltTransportClient(transportSettings.build());
}
for (String elasticSearchAddress : elasticSearchAddressList) {
String[] elasticSearchAddressParts = elasticSearchAddress.split(":");
String elasticSearchHostName = elasticSearchAddressParts[0];
int elasticSearchPort = Integer.parseInt(elasticSearchAddressParts[1]);
try {
client.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(elasticSearchHostName), elasticSearchPort));
} catch (UnknownHostException e) {
String message = "Error resolving address " + elasticSearchAddress + " ElasticSearch transport client not connected";
throw new Exception(message, e);
}
}
// let's now check the versions of all the nodes in the cluster, to make sure they are as expected.
try {
NodesInfoResponse nodesInfoResponse = client.admin().cluster().prepareNodesInfo()
.all().execute().get();
org.elasticsearch.Version minimalVersion = org.elasticsearch.Version.fromString(minimalElasticSearchVersion);
org.elasticsearch.Version maximalVersion = org.elasticsearch.Version.fromString(maximalElasticSearchVersion);
for (NodeInfo nodeInfo : nodesInfoResponse.getNodes()) {
org.elasticsearch.Version version = nodeInfo.getVersion();
if (version.before(minimalVersion) ||
version.equals(maximalVersion) ||
version.after(maximalVersion)) {
throw new Exception("ElasticSearch version on node " + nodeInfo.getHostname() + " is not within [" + minimalVersion + "," + maximalVersion + "), aborting startup !");
}
}
} catch (InterruptedException e) {
throw new Exception("Error checking ElasticSearch versions", e);
} catch (ExecutionException e) {
throw new Exception("Error checking ElasticSearch versions", e);
}
loadPredefinedMappings(bundleContext, false);
// load predefined mappings and condition dispatchers of any bundles that were started before this one.
for (Bundle existingBundle : bundleContext.getBundles()) {
if (existingBundle.getBundleContext() != null) {
loadPredefinedMappings(existingBundle.getBundleContext(), false);
}
}
// @todo is there a better way to detect index existence than to wait for it to startup ?
boolean indexExists = false;
int tries = 0;
while (!indexExists && tries < 20) {
IndicesExistsResponse indicesExistsResponse = client.admin().indices().prepareExists(indexName).execute().actionGet();
indexExists = indicesExistsResponse.isExists();
tries++;
try {
Thread.sleep(100);
} catch (InterruptedException e) {
logger.error("Interrupted", e);
}
}
if (!indexExists) {
logger.info("{} index doesn't exist yet, creating it...", indexName);
Map<String, String> indexMappings = new HashMap<String, String>();
indexMappings.put("_default_", mappings.get("_default_"));
for (Map.Entry<String, String> entry : mappings.entrySet()) {
if (!itemsMonthlyIndexed.contains(entry.getKey()) && !indexNames.containsKey(entry.getKey())) {
indexMappings.put(entry.getKey(), entry.getValue());
}
}
internalCreateIndex(indexName, indexMappings);
} else {
logger.info("Found index {}, ElasticSearch started successfully.", indexName);
for (Map.Entry<String, String> entry : mappings.entrySet()) {
createMapping(entry.getKey(), entry.getValue());
}
}
createMonthlyIndexTemplate();
if (client != null && bulkProcessor == null) {
bulkProcessor = getBulkProcessor();
}
logger.info("Waiting for GREEN cluster status...");
client.admin().cluster().prepareHealth()
.setWaitForGreenStatus()
.get();
logger.info("Cluster status is GREEN");
return true;
}
}.executeInClassLoader();
bundleContext.addBundleListener(this);
logger.info(this.getClass().getName() + " service started successfully.");
}
public BulkProcessor getBulkProcessor() {
if (bulkProcessor != null) {
return bulkProcessor;
}
BulkProcessor.Builder bulkProcessorBuilder = BulkProcessor.builder(
client,
new BulkProcessor.Listener() {
@Override
public void beforeBulk(long executionId,
BulkRequest request) {
logger.debug("Before Bulk");
}
@Override
public void afterBulk(long executionId,
BulkRequest request,
BulkResponse response) {
logger.debug("After Bulk");
}
@Override
public void afterBulk(long executionId,
BulkRequest request,
Throwable failure) {
logger.error("After Bulk (failure)", failure);
}
});
if (bulkProcessorConcurrentRequests != null) {
int concurrentRequests = Integer.parseInt(bulkProcessorConcurrentRequests);
if (concurrentRequests > 1) {
bulkProcessorBuilder.setConcurrentRequests(concurrentRequests);
}
}
if (bulkProcessorBulkActions != null) {
int bulkActions = Integer.parseInt(bulkProcessorBulkActions);
bulkProcessorBuilder.setBulkActions(bulkActions);
}
if (bulkProcessorBulkSize != null) {
bulkProcessorBuilder.setBulkSize(ByteSizeValue.parseBytesSizeValue(bulkProcessorBulkSize, new ByteSizeValue(5, ByteSizeUnit.MB), BULK_PROCESSOR_BULK_SIZE));
}
if (bulkProcessorFlushInterval != null) {
bulkProcessorBuilder.setFlushInterval(TimeValue.parseTimeValue(bulkProcessorFlushInterval, null, BULK_PROCESSOR_FLUSH_INTERVAL));
} else {
// in ElasticSearch this defaults to null, but we would like to set a value to 5 seconds by default
bulkProcessorBuilder.setFlushInterval(new TimeValue(5, TimeUnit.SECONDS));
}
if (bulkProcessorBackoffPolicy != null) {
String backoffPolicyStr = bulkProcessorBackoffPolicy;
if (backoffPolicyStr != null && backoffPolicyStr.length() > 0) {
backoffPolicyStr = backoffPolicyStr.toLowerCase();
if ("nobackoff".equals(backoffPolicyStr)) {
bulkProcessorBuilder.setBackoffPolicy(BackoffPolicy.noBackoff());
} else if (backoffPolicyStr.startsWith("constant(")) {
int paramStartPos = backoffPolicyStr.indexOf("constant(" + "constant(".length());
int paramEndPos = backoffPolicyStr.indexOf(")", paramStartPos);
int paramSeparatorPos = backoffPolicyStr.indexOf(",", paramStartPos);
TimeValue delay = TimeValue.parseTimeValue(backoffPolicyStr.substring(paramStartPos, paramSeparatorPos), new TimeValue(5, TimeUnit.SECONDS), BULK_PROCESSOR_BACKOFF_POLICY);
int maxNumberOfRetries = Integer.parseInt(backoffPolicyStr.substring(paramSeparatorPos + 1, paramEndPos));
bulkProcessorBuilder.setBackoffPolicy(BackoffPolicy.constantBackoff(delay, maxNumberOfRetries));
} else if (backoffPolicyStr.startsWith("exponential")) {
if (!backoffPolicyStr.contains("(")) {
bulkProcessorBuilder.setBackoffPolicy(BackoffPolicy.exponentialBackoff());
} else {
// we detected parameters, must process them.
int paramStartPos = backoffPolicyStr.indexOf("exponential(" + "exponential(".length());
int paramEndPos = backoffPolicyStr.indexOf(")", paramStartPos);
int paramSeparatorPos = backoffPolicyStr.indexOf(",", paramStartPos);
TimeValue delay = TimeValue.parseTimeValue(backoffPolicyStr.substring(paramStartPos, paramSeparatorPos), new TimeValue(5, TimeUnit.SECONDS), BULK_PROCESSOR_BACKOFF_POLICY);
int maxNumberOfRetries = Integer.parseInt(backoffPolicyStr.substring(paramSeparatorPos + 1, paramEndPos));
bulkProcessorBuilder.setBackoffPolicy(BackoffPolicy.exponentialBackoff(delay, maxNumberOfRetries));
}
}
}
}
bulkProcessor = bulkProcessorBuilder.build();
return bulkProcessor;
}
public void stop() {
new InClassLoaderExecute<Object>(null, null) {
protected Object execute(Object... args) {
logger.info("Closing ElasticSearch persistence backend...");
if (bulkProcessor != null) {
try {
bulkProcessor.awaitClose(2, TimeUnit.MINUTES);
} catch (InterruptedException e) {
logger.error("Error waiting for bulk operations to flush !", e);
}
}
if (client != null) {
client.close();
}
return null;
}
}.catchingExecuteInClassLoader(true);
bundleContext.removeBundleListener(this);
}
public void bindConditionEvaluator(ServiceReference<ConditionEvaluator> conditionEvaluatorServiceReference) {
ConditionEvaluator conditionEvaluator = bundleContext.getService(conditionEvaluatorServiceReference);
conditionEvaluatorDispatcher.addEvaluator(conditionEvaluatorServiceReference.getProperty("conditionEvaluatorId").toString(), conditionEvaluator);
}
public void unbindConditionEvaluator(ServiceReference<ConditionEvaluator> conditionEvaluatorServiceReference) {
if (conditionEvaluatorServiceReference == null) {
return;
}
conditionEvaluatorDispatcher.removeEvaluator(conditionEvaluatorServiceReference.getProperty("conditionEvaluatorId").toString());
}
public void bindConditionESQueryBuilder(ServiceReference<ConditionESQueryBuilder> conditionESQueryBuilderServiceReference) {
ConditionESQueryBuilder conditionESQueryBuilder = bundleContext.getService(conditionESQueryBuilderServiceReference);
conditionESQueryBuilderDispatcher.addQueryBuilder(conditionESQueryBuilderServiceReference.getProperty("queryBuilderId").toString(), conditionESQueryBuilder);
}
public void unbindConditionESQueryBuilder(ServiceReference<ConditionESQueryBuilder> conditionESQueryBuilderServiceReference) {
if (conditionESQueryBuilderServiceReference == null) {
return;
}
conditionESQueryBuilderDispatcher.removeQueryBuilder(conditionESQueryBuilderServiceReference.getProperty("queryBuilderId").toString());
}
@Override
public void bundleChanged(BundleEvent event) {
switch (event.getType()) {
case BundleEvent.STARTING:
loadPredefinedMappings(event.getBundle().getBundleContext(), true);
break;
}
}
private String getMonthlyIndexName(Date date) {
String d = new SimpleDateFormat("-yyyy-MM").format(date);
String monthlyIndexName = indexName + d;
return monthlyIndexName;
}
private void loadPredefinedMappings(BundleContext bundleContext, boolean createMapping) {
Enumeration<URL> predefinedMappings = bundleContext.getBundle().findEntries("META-INF/cxs/mappings", "*.json", true);
if (predefinedMappings == null) {
return;
}
while (predefinedMappings.hasMoreElements()) {
URL predefinedMappingURL = predefinedMappings.nextElement();
logger.info("Found mapping at " + predefinedMappingURL + ", loading... ");
try {
final String path = predefinedMappingURL.getPath();
String name = path.substring(path.lastIndexOf('/') + 1, path.lastIndexOf('.'));
BufferedReader reader = new BufferedReader(new InputStreamReader(predefinedMappingURL.openStream()));
StringBuilder content = new StringBuilder();
String l;
while ((l = reader.readLine()) != null) {
content.append(l);
}
String mappingSource = content.toString();
mappings.put(name, mappingSource);
if (createMapping) {
createMapping(name, mappingSource);
}
} catch (Exception e) {
logger.error("Error while loading mapping definition " + predefinedMappingURL, e);
}
}
}
@Override
public <T extends Item> List<T> getAllItems(final Class<T> clazz) {
return getAllItems(clazz, 0, -1, null).getList();
}
@Override
public long getAllItemsCount(String itemType) {
return queryCount(QueryBuilders.matchAllQuery(), itemType);
}
@Override
public <T extends Item> PartialList<T> getAllItems(final Class<T> clazz, int offset, int size, String sortBy) {
long startTime = System.currentTimeMillis();
try {
return query(QueryBuilders.matchAllQuery(), sortBy, clazz, offset, size, null, null);
} finally {
if (metricsService != null && metricsService.isActivated()) {
metricsService.updateTimer(this.getClass().getName() + ".getAllItems", startTime);
}
}
}
@Override
public <T extends Item> T load(final String itemId, final Class<T> clazz) {
return load(itemId, null, clazz);
}
@Override
public <T extends Item> T load(final String itemId, final Date dateHint, final Class<T> clazz) {
return new InClassLoaderExecute<T>(metricsService, this.getClass().getName() + ".loadItem") {
protected T execute(Object... args) throws Exception {
try {
String itemType = Item.getItemType(clazz);
T itemFromCache = getFromCache(itemId, clazz);
if (itemFromCache != null) {
return itemFromCache;
}
if (itemsMonthlyIndexed.contains(itemType) && dateHint == null) {
return new MetricAdapter<T>(metricsService, ".loadItemWithQuery") {
@Override
public T execute(Object... args) throws Exception {
PartialList<T> r = query(QueryBuilders.idsQuery(itemType).addIds(itemId), null, clazz, 0, 1, null, null);
if (r.size() > 0) {
return r.get(0);
}
return null;
}
}.execute();
} else {
String index = indexNames.containsKey(itemType) ? indexNames.get(itemType) :
(itemsMonthlyIndexed.contains(itemType) ? getMonthlyIndexName(dateHint) : indexName);
GetResponse response = client.prepareGet(index, itemType, itemId)
.execute()
.actionGet();
if (response.isExists()) {
String sourceAsString = response.getSourceAsString();
final T value = ESCustomObjectMapper.getObjectMapper().readValue(sourceAsString, clazz);
value.setItemId(response.getId());
value.setVersion(response.getVersion());
putInCache(itemId, value);
return value;
} else {
return null;
}
}
} catch (IndexNotFoundException e) {
// this can happen if we are just testing the existence of the item, it is not always an error.
return null;
} catch (Exception ex) {
throw new Exception("Error loading itemType=" + clazz.getName() + " itemId=" + itemId, ex);
}
}
}.catchingExecuteInClassLoader(true);
}
@Override
public boolean save(final Item item) {
return save(item, useBatchingForSave);
}
@Override
public boolean save(final Item item, final boolean useBatching) {
Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".saveItem") {
protected Boolean execute(Object... args) throws Exception {
try {
String source = ESCustomObjectMapper.getObjectMapper().writeValueAsString(item);
String itemType = item.getItemType();
String itemId = item.getItemId();
putInCache(itemId, item);
String index = indexNames.containsKey(itemType) ? indexNames.get(itemType) :
(itemsMonthlyIndexed.contains(itemType) ? getMonthlyIndexName(((TimestampedItem) item).getTimeStamp()) : indexName);
IndexRequestBuilder indexBuilder = client.prepareIndex(index, itemType, itemId)
.setSource(source, XContentType.JSON);
if (routingByType.containsKey(itemType)) {
indexBuilder = indexBuilder.setRouting(routingByType.get(itemType));
}
try {
if (bulkProcessor == null || !useBatching) {
indexBuilder.execute().actionGet();
} else {
bulkProcessor.add(indexBuilder.request());
}
} catch (IndexNotFoundException e) {
logger.error("Could not find index {}, could not register item type {} with id {} ",
index, itemType, itemId, e);
return false;
}
return true;
} catch (IOException e) {
throw new Exception("Error saving item " + item, e);
}
}
}.catchingExecuteInClassLoader(true);
if (result == null) {
return false;
} else {
return result;
}
}
@Override
public boolean update(final String itemId, final Date dateHint, final Class clazz, final String propertyName, final Object propertyValue) {
return update(itemId, dateHint, clazz, Collections.singletonMap(propertyName, propertyValue));
}
@Override
public boolean update(final String itemId, final Date dateHint, final Class clazz, final Map source) {
Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".updateItem") {
protected Boolean execute(Object... args) throws Exception {
try {
String itemType = Item.getItemType(clazz);
String index = indexNames.containsKey(itemType) ? indexNames.get(itemType) :
(itemsMonthlyIndexed.contains(itemType) && dateHint != null ? getMonthlyIndexName(dateHint) : indexName);
if (bulkProcessor == null) {
client.prepareUpdate(index, itemType, itemId).setDoc(source)
.execute()
.actionGet();
} else {
UpdateRequest updateRequest = client.prepareUpdate(index, itemType, itemId).setDoc(source).request();
bulkProcessor.add(updateRequest);
}
return true;
} catch (IndexNotFoundException e) {
throw new Exception("No index found for itemType=" + clazz.getName() + "itemId=" + itemId, e);
}
}
}.catchingExecuteInClassLoader(true);
if (result == null) {
return false;
} else {
return result;
}
}
@Override
public boolean updateWithQueryAndScript(final Date dateHint, final Class<?> clazz, final String[] scripts, final Map<String, Object>[] scriptParams, final Condition[] conditions) {
Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".updateWithQueryAndScript") {
protected Boolean execute(Object... args) throws Exception {
try {
String itemType = Item.getItemType(clazz);
String index = indexNames.containsKey(itemType) ? indexNames.get(itemType) :
(itemsMonthlyIndexed.contains(itemType) && dateHint != null ? getMonthlyIndexName(dateHint) : indexName);
for (int i = 0; i < scripts.length; i++) {
Script actualScript = new Script(ScriptType.INLINE, "painless", scripts[i], scriptParams[i]);
client.admin().indices().prepareRefresh(index).get();
UpdateByQueryRequestBuilder ubqrb = UpdateByQueryAction.INSTANCE.newRequestBuilder(client);
ubqrb.source(index).source().setTypes(itemType);
BulkByScrollResponse response = ubqrb.setSlices(2)
.setMaxRetries(1000).abortOnVersionConflict(false).script(actualScript)
.filter(conditionESQueryBuilderDispatcher.buildFilter(conditions[i])).get();
if (response.getBulkFailures().size() > 0) {
for (BulkItemResponse.Failure failure : response.getBulkFailures()) {
logger.error("Failure : cause={} , message={}", failure.getCause(), failure.getMessage());
}
} else {
logger.info("Update By Query has processed {} in {}.", response.getUpdated(), response.getTook().toString());
}
if (response.isTimedOut()) {
logger.error("Update By Query ended with timeout!");
}
if (response.getVersionConflicts() > 0) {
logger.warn("Update By Query ended with {} Version Conflicts!", response.getVersionConflicts());
}
if (response.getNoops() > 0) {
logger.warn("Update By Query ended with {} noops!", response.getNoops());
}
}
return true;
} catch (IndexNotFoundException e) {
throw new Exception("No index found for itemType=" + clazz.getName(), e);
} catch (ScriptException e) {
logger.error("Error in the update script : {}\n{}\n{}", e.getScript(), e.getDetailedMessage(), e.getScriptStack());
throw new Exception("Error in the update script");
} finally {
return false;
}
}
}.catchingExecuteInClassLoader(true);
if (result == null) {
return false;
} else {
return result;
}
}
@Override
public boolean updateWithScript(final String itemId, final Date dateHint, final Class<?> clazz, final String script, final Map<String, Object> scriptParams) {
Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".updateWithScript") {
protected Boolean execute(Object... args) throws Exception {
try {
String itemType = Item.getItemType(clazz);
String index = indexNames.containsKey(itemType) ? indexNames.get(itemType) :
(itemsMonthlyIndexed.contains(itemType) && dateHint != null ? getMonthlyIndexName(dateHint) : indexName);
Script actualScript = new Script(ScriptType.INLINE, "painless", script, scriptParams);
if (bulkProcessor == null) {
client.prepareUpdate(index, itemType, itemId).setScript(actualScript)
.execute()
.actionGet();
} else {
UpdateRequest updateRequest = client.prepareUpdate(index, itemType, itemId).setScript(actualScript).request();
bulkProcessor.add(updateRequest);
}
return true;
} catch (IndexNotFoundException e) {
throw new Exception("No index found for itemType=" + clazz.getName() + "itemId=" + itemId, e);
}
}
}.catchingExecuteInClassLoader(true);
if (result == null) {
return false;
} else {
return result;
}
}
@Override
public <T extends Item> boolean remove(final String itemId, final Class<T> clazz) {
Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".removeItem") {
protected Boolean execute(Object... args) throws Exception {
try {
String itemType = Item.getItemType(clazz);
client.prepareDelete(getIndexNameForQuery(itemType), itemType, itemId)
.execute().actionGet();
return true;
} catch (Exception e) {
throw new Exception("Cannot remove", e);
}
}
}.catchingExecuteInClassLoader(true);
if (result == null) {
return false;
} else {
return result;
}
}
public <T extends Item> boolean removeByQuery(final Condition query, final Class<T> clazz) {
Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".removeByQuery") {
protected Boolean execute(Object... args) throws Exception {
try {
String itemType = Item.getItemType(clazz);
BulkRequestBuilder deleteByScope = client.prepareBulk();
final TimeValue keepAlive = TimeValue.timeValueHours(1);
SearchResponse response = client.prepareSearch(indexName + "*")
.setIndices(getIndexNameForQuery(itemType))
.setTypes(itemType)
.setScroll(keepAlive)
.setQuery(conditionESQueryBuilderDispatcher.getQueryBuilder(query))
.setSize(100).execute().actionGet();
// Scroll until no more hits are returned
while (true) {
for (SearchHit hit : response.getHits().getHits()) {
// add hit to bulk delete
deleteFromCache(hit.getId(), clazz);
deleteByScope.add(Requests.deleteRequest(hit.getIndex()).type(hit.getType()).id(hit.getId()));
}
response = client.prepareSearchScroll(response.getScrollId()).setScroll(keepAlive).execute().actionGet();
// If we have no more hits, exit
if (response.getHits().getHits().length == 0) {
break;
}
}
client.prepareClearScroll().addScrollId(response.getScrollId()).execute().actionGet();
// we're done with the scrolling, delete now
if (deleteByScope.numberOfActions() > 0) {
final BulkResponse deleteResponse = deleteByScope.get();
if (deleteResponse.hasFailures()) {
// do something
logger.debug("Couldn't remove by query " + query + ":\n{}", deleteResponse.buildFailureMessage());
}
}
return true;
} catch (Exception e) {
throw new Exception("Cannot remove by query", e);
}
}
}.catchingExecuteInClassLoader(true);
if (result == null) {
return false;
} else {
return result;
}
}
public boolean indexTemplateExists(final String templateName) {
Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".indexTemplateExists") {
protected Boolean execute(Object... args) {
GetIndexTemplatesResponse getIndexTemplatesResponse = client.admin().indices().prepareGetTemplates(templateName).execute().actionGet();
return getIndexTemplatesResponse.getIndexTemplates().size() == 1;
}
}.catchingExecuteInClassLoader(true);
if (result == null) {
return false;
} else {
return result;
}
}
public boolean removeIndexTemplate(final String templateName) {
Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".removeIndexTemplate") {
protected Boolean execute(Object... args) {
DeleteIndexTemplateResponse deleteIndexTemplateResponse = client.admin().indices().deleteTemplate(new DeleteIndexTemplateRequest(templateName)).actionGet();
return deleteIndexTemplateResponse.isAcknowledged();
}
}.catchingExecuteInClassLoader(true);
if (result == null) {
return false;
} else {
return result;
}
}
public boolean createMonthlyIndexTemplate() {
Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".createMonthlyIndexTemplate") {
protected Boolean execute(Object... args) {
PutIndexTemplateRequest putIndexTemplateRequest = new PutIndexTemplateRequest("context-monthly-indices")
.template(indexName + "-*")
.settings("{\n" +
" \"index\" : {\n" +
" \"number_of_shards\" : " + monthlyIndexNumberOfShards + ",\n" +
" \"number_of_replicas\" : " + monthlyIndexNumberOfReplicas + "\n" +
" },\n" +
" \"analysis\": {\n" +
" \"analyzer\": {\n" +
" \"folding\": {\n" +
" \"type\":\"custom\",\n" +
" \"tokenizer\": \"keyword\",\n" +
" \"filter\": [ \"lowercase\", \"asciifolding\" ]\n" +
" }\n" +
" }\n" +
" }\n" +
"}\n", XContentType.JSON);
Map<String, String> indexMappings = new HashMap<String, String>();
indexMappings.put("_default_", mappings.get("_default_"));
for (Map.Entry<String, String> entry : mappings.entrySet()) {
if (itemsMonthlyIndexed.contains(entry.getKey())) {
indexMappings.put(entry.getKey(), entry.getValue());
}
}
putIndexTemplateRequest.mappings().putAll(indexMappings);
PutIndexTemplateResponse putIndexTemplateResponse = client.admin().indices().putTemplate(putIndexTemplateRequest).actionGet();
return putIndexTemplateResponse.isAcknowledged();
}
}.catchingExecuteInClassLoader(true);
if (result == null) {
return false;
} else {
return result;
}
}
public boolean createIndex(final String indexName) {
Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".createItem") {
protected Boolean execute(Object... args) {
IndicesExistsResponse indicesExistsResponse = client.admin().indices().prepareExists(indexName).execute().actionGet();
boolean indexExists = indicesExistsResponse.isExists();
if (!indexExists) {
Map<String, String> indexMappings = new HashMap<String, String>();
indexMappings.put("_default_", mappings.get("_default_"));
for (Map.Entry<String, String> entry : mappings.entrySet()) {
if (indexNames.containsKey(entry.getKey()) && indexNames.get(entry.getKey()).equals(indexName)) {
indexMappings.put(entry.getKey(), entry.getValue());
}
}
internalCreateIndex(indexName, indexMappings);
}
return !indexExists;
}
}.catchingExecuteInClassLoader(true);
if (result == null) {
return false;
} else {
return result;
}
}
public boolean removeIndex(final String indexName) {
Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".removeIndex") {
protected Boolean execute(Object... args) {
IndicesExistsResponse indicesExistsResponse = client.admin().indices().prepareExists(indexName).execute().actionGet();
boolean indexExists = indicesExistsResponse.isExists();
if (indexExists) {
client.admin().indices().prepareDelete(indexName).execute().actionGet();
}
return indexExists;
}
}.catchingExecuteInClassLoader(true);
if (result == null) {
return false;
} else {
return result;
}
}
private void internalCreateIndex(String indexName, Map<String, String> mappings) {
CreateIndexRequestBuilder builder = client.admin().indices().prepareCreate(indexName)
.setSettings("{\n" +
" \"index\" : {\n" +
" \"number_of_shards\" : " + numberOfShards + ",\n" +
" \"number_of_replicas\" : " + numberOfReplicas + "\n" +
" },\n" +
" \"analysis\": {\n" +
" \"analyzer\": {\n" +
" \"folding\": {\n" +
" \"type\":\"custom\",\n" +
" \"tokenizer\": \"keyword\",\n" +
" \"filter\": [ \"lowercase\", \"asciifolding\" ]\n" +
" }\n" +
" }\n" +
" }\n" +
"}\n", XContentType.JSON);
for (Map.Entry<String, String> entry : mappings.entrySet()) {
builder.addMapping(entry.getKey(), entry.getValue(), XContentType.JSON);
}
builder.execute().actionGet();
}
private void createMapping(final String type, final String source, final String indexName) {
client.admin().indices()
.preparePutMapping(indexName)
.setType(type)
.setSource(source, XContentType.JSON)
.execute().actionGet();
}
@Override
public void createMapping(String type, String source) {
if (type.equals("_default_")) {
return;
}
if (itemsMonthlyIndexed.contains(type)) {
createMonthlyIndexTemplate();
if (client.admin().indices().prepareExists(indexName + "-*").execute().actionGet().isExists()) {
createMapping(type, source, indexName + "-*");
}
} else if (indexNames.containsKey(type)) {
if (client.admin().indices().prepareExists(indexNames.get(type)).execute().actionGet().isExists()) {
createMapping(type, source, indexNames.get(type));
}
} else {
createMapping(type, source, indexName);
}
}
@Override
public Map<String, Map<String, Object>> getPropertiesMapping(final String itemType) {
return new InClassLoaderExecute<Map<String, Map<String, Object>>>(metricsService, this.getClass().getName() + ".getPropertiesMapping") {
@SuppressWarnings("unchecked")
protected Map<String, Map<String, Object>> execute(Object... args) throws Exception {
GetMappingsResponse getMappingsResponse = client.admin().indices().prepareGetMappings().setTypes(itemType).execute().actionGet();
ImmutableOpenMap<String, ImmutableOpenMap<String, MappingMetaData>> mappings = getMappingsResponse.getMappings();
Map<String, Map<String, Object>> propertyMap = new HashMap<>();
try {
Iterator<ImmutableOpenMap<String, MappingMetaData>> it = mappings.valuesIt();
while (it.hasNext()) {
ImmutableOpenMap<String, MappingMetaData> next = it.next();
Map<String, Map<String, Object>> properties = (Map<String, Map<String, Object>>) next.get(itemType).getSourceAsMap().get("properties");
for (Map.Entry<String, Map<String, Object>> entry : properties.entrySet()) {
if (propertyMap.containsKey(entry.getKey())) {
Map<String, Object> subPropMap = propertyMap.get(entry.getKey());
for (Map.Entry<String, Object> subentry : entry.getValue().entrySet()) {
if (subPropMap.containsKey(subentry.getKey()) && subPropMap.get(subentry.getKey()) instanceof Map && subentry.getValue() instanceof Map) {
((Map) subPropMap.get(subentry.getKey())).putAll((Map) subentry.getValue());
} else {
subPropMap.put(subentry.getKey(), subentry.getValue());
}
}
} else {
propertyMap.put(entry.getKey(), entry.getValue());
}
}
}
} catch (Throwable t) {
throw new Exception("Cannot get mapping for itemType="+ itemType, t);
}
return propertyMap;
}
}.catchingExecuteInClassLoader(true);
}
public Map<String, Object> getPropertyMapping(String property, String itemType) {
Map<String, Map<String, Object>> mappings = knownMappings.get(itemType);
Map<String, Object> result = getPropertyMapping(property, mappings);
if (result == null) {
mappings = getPropertiesMapping(itemType);
knownMappings.put(itemType, mappings);
result = getPropertyMapping(property, mappings);
}
return result;
}
private Map<String, Object> getPropertyMapping(String property, Map<String, Map<String, Object>> mappings) {
Map<String, Object> propMapping = null;
String[] properties = StringUtils.split(property, '.');
for (int i = 0; i < properties.length && mappings != null; i++) {
String s = properties[i];
propMapping = mappings.get(s);
if (i == properties.length - 1) {
return propMapping;
} else {
mappings = propMapping != null ? ((Map<String, Map<String, Object>>) propMapping.get("properties")) : null;
}
}
return null;
}
private String getPropertyNameWithData(String name, String itemType) {
Map<String, Object> propertyMapping = getPropertyMapping(name, itemType);
if (propertyMapping == null) {
return null;
}
if (propertyMapping != null
&& "text".equals(propertyMapping.get("type"))
&& propertyMapping.containsKey("fields")
&& ((Map) propertyMapping.get("fields")).containsKey("keyword")) {
name += ".keyword";
}
return name;
}
public boolean saveQuery(final String queryName, final String query) {
Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".saveQuery") {
protected Boolean execute(Object... args) throws Exception {
//Index the query = register it in the percolator
try {
logger.info("Saving query : " + queryName);
client.prepareIndex(indexName, ".percolator", queryName)
.setSource(query, XContentType.JSON)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.execute().actionGet();
return true;
} catch (Exception e) {
throw new Exception("Cannot save query", e);
}
}
}.catchingExecuteInClassLoader(true);
if (result == null) {
return false;
} else {
return result;
}
}
@Override
public boolean saveQuery(String queryName, Condition query) {
if (query == null) {
return false;
}
saveQuery(queryName, conditionESQueryBuilderDispatcher.getQuery(query));
return true;
}
@Override
public boolean removeQuery(final String queryName) {
Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".removeQuery") {
protected Boolean execute(Object... args) throws Exception {
//Index the query = register it in the percolator
try {
client.prepareDelete(indexName, ".percolator", queryName)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.execute().actionGet();
return true;
} catch (Exception e) {
throw new Exception("Cannot delete query", e);
}
}
}.catchingExecuteInClassLoader(true);
if (result == null) {
return false;
} else {
return result;
}
}
@Override
public boolean testMatch(Condition query, Item item) {
long startTime = System.currentTimeMillis();
try {
return conditionEvaluatorDispatcher.eval(query, item);
} catch (UnsupportedOperationException e) {
logger.error("Eval not supported, continue with query", e);
} finally {
if (metricsService != null && metricsService.isActivated()) {
metricsService.updateTimer(this.getClass().getName() + ".testMatchLocally", startTime);
}
}
startTime = System.currentTimeMillis();
try {
final Class<? extends Item> clazz = item.getClass();
String itemType = Item.getItemType(clazz);
QueryBuilder builder = QueryBuilders.boolQuery()
.must(QueryBuilders.idsQuery(itemType).addIds(item.getItemId()))
.must(conditionESQueryBuilderDispatcher.buildFilter(query));
return queryCount(builder, itemType) > 0;
} finally {
if (metricsService != null && metricsService.isActivated()) {
metricsService.updateTimer(this.getClass().getName() + ".testMatchInElasticSearch", startTime);
}
}
}
@Override
public <T extends Item> List<T> query(final Condition query, String sortBy, final Class<T> clazz) {
return query(query, sortBy, clazz, 0, -1).getList();
}
@Override
public <T extends Item> PartialList<T> query(final Condition query, String sortBy, final Class<T> clazz, final int offset, final int size) {
return query(conditionESQueryBuilderDispatcher.getQueryBuilder(query), sortBy, clazz, offset, size, null, null);
}
@Override
public <T extends Item> PartialList<T> query(final Condition query, String sortBy, final Class<T> clazz, final int offset, final int size, final String scrollTimeValidity) {
return query(conditionESQueryBuilderDispatcher.getQueryBuilder(query), sortBy, clazz, offset, size, null, scrollTimeValidity);
}
@Override
public <T extends Item> PartialList<T> queryFullText(final String fulltext, final Condition query, String sortBy, final Class<T> clazz, final int offset, final int size) {
return query(QueryBuilders.boolQuery().must(QueryBuilders.queryStringQuery(fulltext).defaultField("_all")).must(conditionESQueryBuilderDispatcher.getQueryBuilder(query)), sortBy, clazz, offset, size, null, null);
}
@Override
public <T extends Item> List<T> query(final String fieldName, final String fieldValue, String sortBy, final Class<T> clazz) {
return query(fieldName, fieldValue, sortBy, clazz, 0, -1).getList();
}
@Override
public <T extends Item> List<T> query(final String fieldName, final String[] fieldValues, String sortBy, final Class<T> clazz) {
return query(QueryBuilders.termsQuery(fieldName, ConditionContextHelper.foldToASCII(fieldValues)), sortBy, clazz, 0, -1, getRouting(fieldName, fieldValues, clazz), null).getList();
}
@Override
public <T extends Item> PartialList<T> query(String fieldName, String fieldValue, String sortBy, Class<T> clazz, int offset, int size) {
return query(termQuery(fieldName, ConditionContextHelper.foldToASCII(fieldValue)), sortBy, clazz, offset, size, getRouting(fieldName, new String[]{fieldValue}, clazz), null);
}
@Override
public <T extends Item> PartialList<T> queryFullText(String fieldName, String fieldValue, String fulltext, String sortBy, Class<T> clazz, int offset, int size) {
return query(QueryBuilders.boolQuery().must(QueryBuilders.queryStringQuery(fulltext).defaultField("_all")).must(termQuery(fieldName, fieldValue)), sortBy, clazz, offset, size, getRouting(fieldName, new String[]{fieldValue}, clazz), null);
}
@Override
public <T extends Item> PartialList<T> queryFullText(String fulltext, String sortBy, Class<T> clazz, int offset, int size) {
return query(QueryBuilders.queryStringQuery(fulltext).defaultField("_all"), sortBy, clazz, offset, size, getRouting("_all", new String[]{fulltext}, clazz), null);
}
@Override
public <T extends Item> PartialList<T> rangeQuery(String fieldName, String from, String to, String sortBy, Class<T> clazz, int offset, int size) {
RangeQueryBuilder builder = QueryBuilders.rangeQuery(fieldName);
builder.from(from);
builder.to(to);
return query(builder, sortBy, clazz, offset, size, null, null);
}
@Override
public long queryCount(Condition query, String itemType) {
try {
return conditionESQueryBuilderDispatcher.count(query);
} catch (UnsupportedOperationException e) {
try {
QueryBuilder filter = conditionESQueryBuilderDispatcher.buildFilter(query);
if (filter instanceof IdsQueryBuilder) {
return ((IdsQueryBuilder) filter).ids().size();
}
return queryCount(filter, itemType);
} catch (UnsupportedOperationException e1) {
return -1;
}
}
}
private long queryCount(final QueryBuilder filter, final String itemType) {
return new InClassLoaderExecute<Long>(metricsService, this.getClass().getName() + ".queryCount") {
@Override
protected Long execute(Object... args) {
SearchResponse response = client.prepareSearch(getIndexNameForQuery(itemType))
.setTypes(itemType)
.setSize(0)
.setQuery(filter)
.execute()
.actionGet();
return response.getHits().getTotalHits();
}
}.catchingExecuteInClassLoader(true);
}
private <T extends Item> PartialList<T> query(final QueryBuilder query, final String sortBy, final Class<T> clazz, final int offset, final int size, final String[] routing, final String scrollTimeValidity) {
return new InClassLoaderExecute<PartialList<T>>(metricsService, this.getClass().getName() + ".query") {
@Override
protected PartialList<T> execute(Object... args) throws Exception {
List<T> results = new ArrayList<T>();
String scrollIdentifier = null;
long totalHits = 0;
try {
String itemType = Item.getItemType(clazz);
TimeValue keepAlive = TimeValue.timeValueHours(1);
SearchRequestBuilder requestBuilder = null;
if (scrollTimeValidity != null) {
keepAlive = TimeValue.parseTimeValue(scrollTimeValidity, TimeValue.timeValueHours(1), "scrollTimeValidity");
requestBuilder = client.prepareSearch(getIndexNameForQuery(itemType))
.setTypes(itemType)
.setFetchSource(true)
.setScroll(keepAlive)
.setFrom(offset)
.setQuery(query)
.setSize(size);
} else {
requestBuilder = client.prepareSearch(getIndexNameForQuery(itemType))
.setTypes(itemType)
.setFetchSource(true)
.setQuery(query)
.setFrom(offset);
}
if (size == Integer.MIN_VALUE) {
requestBuilder.setSize(defaultQueryLimit);
} else if (size != -1) {
requestBuilder.setSize(size);
} else {
// size == -1, use scroll query to retrieve all the results
requestBuilder = client.prepareSearch(getIndexNameForQuery(itemType))
.setTypes(itemType)
.setFetchSource(true)
.setScroll(keepAlive)
.setFrom(offset)
.setQuery(query)
.setSize(100);
}
if (routing != null) {
requestBuilder.setRouting(routing);
}
if (sortBy != null) {
String[] sortByArray = sortBy.split(",");
for (String sortByElement : sortByArray) {
if (sortByElement.startsWith("geo:")) {
String[] elements = sortByElement.split(":");
GeoDistanceSortBuilder distanceSortBuilder = SortBuilders.geoDistanceSort(elements[1], Double.parseDouble(elements[2]), Double.parseDouble(elements[3])).unit(DistanceUnit.KILOMETERS);
if (elements.length > 4 && elements[4].equals("desc")) {
requestBuilder = requestBuilder.addSort(distanceSortBuilder.order(SortOrder.DESC));
} else {
requestBuilder = requestBuilder.addSort(distanceSortBuilder.order(SortOrder.ASC));
}
} else {
String name = getPropertyNameWithData(StringUtils.substringBeforeLast(sortByElement, ":"), itemType);
if (name != null) {
if (sortByElement.endsWith(":desc")) {
requestBuilder = requestBuilder.addSort(name, SortOrder.DESC);
} else {
requestBuilder = requestBuilder.addSort(name, SortOrder.ASC);
}
} else {
// in the case of no data existing for the property, we will not add the sorting to the request.
}
}
}
}
SearchResponse response = requestBuilder
.setVersion(true)
.execute()
.actionGet();
if (size == -1) {
// Scroll until no more hits are returned
while (true) {
for (SearchHit searchHit : response.getHits().getHits()) {
// add hit to results
String sourceAsString = searchHit.getSourceAsString();
final T value = ESCustomObjectMapper.getObjectMapper().readValue(sourceAsString, clazz);
value.setItemId(searchHit.getId());
value.setVersion(searchHit.getVersion());
results.add(value);
}
response = client.prepareSearchScroll(response.getScrollId()).setScroll(keepAlive).execute().actionGet();
// If we have no more hits, exit
if (response.getHits().getHits().length == 0) {
break;
}
}
client.prepareClearScroll().addScrollId(response.getScrollId()).execute().actionGet();
} else {
SearchHits searchHits = response.getHits();
scrollIdentifier = response.getScrollId();
totalHits = searchHits.getTotalHits();
for (SearchHit searchHit : searchHits) {
String sourceAsString = searchHit.getSourceAsString();
final T value = ESCustomObjectMapper.getObjectMapper().readValue(sourceAsString, clazz);
value.setItemId(searchHit.getId());
value.setVersion(searchHit.getVersion());
results.add(value);
}
}
} catch (Exception t) {
throw new Exception("Error loading itemType=" + clazz.getName() + " query=" + query + " sortBy=" + sortBy, t);
}
PartialList<T> result = new PartialList<T>(results, offset, size, totalHits);
if (scrollIdentifier != null && totalHits != 0) {
result.setScrollIdentifier(scrollIdentifier);
result.setScrollTimeValidity(scrollTimeValidity);
}
return result;
}
}.catchingExecuteInClassLoader(true);
}
@Override
public <T extends Item> PartialList<T> continueScrollQuery(final Class<T> clazz, final String scrollIdentifier, final String scrollTimeValidity) {
return new InClassLoaderExecute<PartialList<T>>(metricsService, this.getClass().getName() + ".continueScrollQuery") {
@Override
protected PartialList<T> execute(Object... args) throws Exception {
List<T> results = new ArrayList<T>();
long totalHits = 0;
try {
TimeValue keepAlive = TimeValue.parseTimeValue(scrollTimeValidity, TimeValue.timeValueMinutes(10), "scrollTimeValidity");
SearchResponse response = client.prepareSearchScroll(scrollIdentifier).setScroll(keepAlive).execute().actionGet();
if (response.getHits().getHits().length == 0) {
client.prepareClearScroll().addScrollId(response.getScrollId()).execute().actionGet();
} else {
for (SearchHit searchHit : response.getHits().getHits()) {
// add hit to results
String sourceAsString = searchHit.getSourceAsString();
final T value = ESCustomObjectMapper.getObjectMapper().readValue(sourceAsString, clazz);
value.setItemId(searchHit.getId());
value.setVersion(searchHit.getVersion());
results.add(value);
}
}
PartialList<T> result = new PartialList<T>(results, 0, response.getHits().getHits().length, response.getHits().getTotalHits());
if (scrollIdentifier != null) {
result.setScrollIdentifier(scrollIdentifier);
result.setScrollTimeValidity(scrollTimeValidity);
}
return result;
} catch (Exception t) {
throw new Exception("Error continuing scrolling query for itemType=" + clazz.getName() + " scrollIdentifier=" + scrollIdentifier + " scrollTimeValidity=" + scrollTimeValidity, t);
}
}
}.catchingExecuteInClassLoader(true);
}
/**
* @deprecated As of version 1.3.0-incubating, use {@link #aggregateWithOptimizedQuery(Condition, BaseAggregate, String)} instead
*/
@Deprecated
@Override
public Map<String, Long> aggregateQuery(Condition filter, BaseAggregate aggregate, String itemType) {
return aggregateQuery(filter, aggregate, itemType, false);
}
@Override
public Map<String, Long> aggregateWithOptimizedQuery(Condition filter, BaseAggregate aggregate, String itemType) {
return aggregateQuery(filter, aggregate, itemType, true);
}
private Map<String, Long> aggregateQuery(final Condition filter, final BaseAggregate aggregate, final String itemType,
final boolean optimizedQuery) {
return new InClassLoaderExecute<Map<String, Long>>(metricsService, this.getClass().getName() + ".aggregateQuery") {
@Override
protected Map<String, Long> execute(Object... args) {
Map<String, Long> results = new LinkedHashMap<String, Long>();
SearchRequestBuilder builder = client.prepareSearch(getIndexNameForQuery(itemType))
.setTypes(itemType)
.setSize(0)
.setQuery(QueryBuilders.matchAllQuery());
List<AggregationBuilder> lastAggregation = new ArrayList<AggregationBuilder>();
if (aggregate != null) {
AggregationBuilder bucketsAggregation = null;
String fieldName = aggregate.getField();
if (aggregate instanceof DateAggregate) {
DateAggregate dateAggregate = (DateAggregate) aggregate;
DateHistogramAggregationBuilder dateHistogramBuilder = AggregationBuilders.dateHistogram("buckets").field(fieldName).dateHistogramInterval(new DateHistogramInterval((dateAggregate.getInterval())));
if (dateAggregate.getFormat() != null) {
dateHistogramBuilder.format(dateAggregate.getFormat());
}
bucketsAggregation = dateHistogramBuilder;
} else if (aggregate instanceof NumericRangeAggregate) {
RangeAggregationBuilder rangebuilder = AggregationBuilders.range("buckets").field(fieldName);
for (NumericRange range : ((NumericRangeAggregate) aggregate).getRanges()) {
if (range != null) {
if (range.getFrom() != null && range.getTo() != null) {
rangebuilder.addRange(range.getKey(), range.getFrom(), range.getTo());
} else if (range.getFrom() != null) {
rangebuilder.addUnboundedFrom(range.getKey(), range.getFrom());
} else if (range.getTo() != null) {
rangebuilder.addUnboundedTo(range.getKey(), range.getTo());
}
}
}
bucketsAggregation = rangebuilder;
} else if (aggregate instanceof DateRangeAggregate) {
DateRangeAggregate dateRangeAggregate = (DateRangeAggregate) aggregate;
DateRangeAggregationBuilder rangebuilder = AggregationBuilders.dateRange("buckets").field(fieldName);
if (dateRangeAggregate.getFormat() != null) {
rangebuilder.format(dateRangeAggregate.getFormat());
}
for (DateRange range : dateRangeAggregate.getDateRanges()) {
if (range != null) {
rangebuilder.addRange(range.getKey(), range.getFrom() != null ? range.getFrom().toString() : null, range.getTo() != null ? range.getTo().toString() : null);
}
}
bucketsAggregation = rangebuilder;
} else if (aggregate instanceof IpRangeAggregate) {
IpRangeAggregate ipRangeAggregate = (IpRangeAggregate) aggregate;
IpRangeAggregationBuilder rangebuilder = AggregationBuilders.ipRange("buckets").field(fieldName);
for (IpRange range : ipRangeAggregate.getRanges()) {
if (range != null) {
rangebuilder.addRange(range.getKey(), range.getFrom(), range.getTo());
}
}
bucketsAggregation = rangebuilder;
} else {
fieldName = getPropertyNameWithData(fieldName, itemType);
//default
if (fieldName != null) {
bucketsAggregation = AggregationBuilders.terms("buckets").field(fieldName).size(Integer.parseInt(aggregateQueryBucketSize));
if (aggregate instanceof TermsAggregate) {
TermsAggregate termsAggregate = (TermsAggregate) aggregate;
if (termsAggregate.getPartition() > -1 && termsAggregate.getNumPartitions() > -1) {
((TermsAggregationBuilder) bucketsAggregation).includeExclude(new IncludeExclude(termsAggregate.getPartition(), termsAggregate.getNumPartitions()));
}
}
} else {
// field name could be null if no existing data exists
}
}
if (bucketsAggregation != null) {
final MissingAggregationBuilder missingBucketsAggregation = AggregationBuilders.missing("missing").field(fieldName);
for (AggregationBuilder aggregationBuilder : lastAggregation) {
bucketsAggregation.subAggregation(aggregationBuilder);
missingBucketsAggregation.subAggregation(aggregationBuilder);
}
lastAggregation = Arrays.asList(bucketsAggregation, missingBucketsAggregation);
}
}
// If the request is optimized then we don't need a global aggregation which is very slow and we can put the query with a
// filter on range items in the query block so we don't retrieve all the document before filtering the whole
if (optimizedQuery) {
for (AggregationBuilder aggregationBuilder : lastAggregation) {
builder.addAggregation(aggregationBuilder);
}
if (filter != null) {
builder.setQuery(conditionESQueryBuilderDispatcher.buildFilter(filter));
}
} else {
if (filter != null) {
AggregationBuilder filterAggregation = AggregationBuilders.filter("filter", conditionESQueryBuilderDispatcher.buildFilter(filter));
for (AggregationBuilder aggregationBuilder : lastAggregation) {
filterAggregation.subAggregation(aggregationBuilder);
}
lastAggregation = Collections.singletonList(filterAggregation);
}
AggregationBuilder globalAggregation = AggregationBuilders.global("global");
for (AggregationBuilder aggregationBuilder : lastAggregation) {
globalAggregation.subAggregation(aggregationBuilder);
}
builder.addAggregation(globalAggregation);
}
SearchResponse response = builder.execute().actionGet();
Aggregations aggregations = response.getAggregations();
if (aggregations != null) {
if (optimizedQuery) {
if (response.getHits() != null) {
results.put("_filtered", response.getHits().getTotalHits());
}
} else {
Global globalAgg = aggregations.get("global");
results.put("_all", globalAgg.getDocCount());
aggregations = globalAgg.getAggregations();
if (aggregations.get("filter") != null) {
Filter filterAgg = aggregations.get("filter");
results.put("_filtered", filterAgg.getDocCount());
aggregations = filterAgg.getAggregations();
}
}
if (aggregations.get("buckets") != null) {
MultiBucketsAggregation terms = aggregations.get("buckets");
for (MultiBucketsAggregation.Bucket bucket : terms.getBuckets()) {
results.put(bucket.getKeyAsString(), bucket.getDocCount());
}
SingleBucketAggregation missing = aggregations.get("missing");
if (missing.getDocCount() > 0) {
results.put("_missing", missing.getDocCount());
}
}
}
return results;
}
}.catchingExecuteInClassLoader(true);
}
private <T extends Item> String[] getRouting(String fieldName, String[] fieldValues, Class<T> clazz) {
String itemType = Item.getItemType(clazz);
String[] routing = null;
if (routingByType.containsKey(itemType) && routingByType.get(itemType).equals(fieldName)) {
routing = fieldValues;
}
return routing;
}
@Override
public void refresh() {
new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".refresh") {
protected Boolean execute(Object... args) {
if (bulkProcessor != null) {
bulkProcessor.flush();
}
client.admin().indices().refresh(Requests.refreshRequest()).actionGet();
return true;
}
}.catchingExecuteInClassLoader(true);
}
@Override
public void purge(final Date date) {
new InClassLoaderExecute<Object>(metricsService, this.getClass().getName() + ".purgeWithDate") {
@Override
protected Object execute(Object... args) throws Exception {
IndicesStatsResponse statsResponse = client.admin().indices().prepareStats(indexName + "-*")
.setIndexing(false)
.setGet(false)
.setSearch(false)
.setWarmer(false)
.setMerge(false)
.setFieldData(false)
.setFlush(false)
.setCompletion(false)
.setRefresh(false)
.execute()
.actionGet();
SimpleDateFormat d = new SimpleDateFormat("yyyy-MM");
List<String> toDelete = new ArrayList<String>();
for (String currentIndexName : statsResponse.getIndices().keySet()) {
if (currentIndexName.startsWith(indexName + "-")) {
try {
Date indexDate = d.parse(currentIndexName.substring(indexName.length() + 1));
if (indexDate.before(date)) {
toDelete.add(currentIndexName);
}
} catch (ParseException e) {
throw new Exception("Cannot parse index name " + currentIndexName, e);
}
}
}
if (!toDelete.isEmpty()) {
client.admin().indices().prepareDelete(toDelete.toArray(new String[toDelete.size()])).execute().actionGet();
}
return null;
}
}.catchingExecuteInClassLoader(true);
}
@Override
public void purge(final String scope) {
new InClassLoaderExecute<Void>(metricsService, this.getClass().getName() + ".purgeWithScope") {
@Override
protected Void execute(Object... args) {
QueryBuilder query = termQuery("scope", scope);
BulkRequestBuilder deleteByScope = client.prepareBulk();
final TimeValue keepAlive = TimeValue.timeValueHours(1);
SearchResponse response = client.prepareSearch(indexName + "*")
.setScroll(keepAlive)
.setQuery(query)
.setSize(100).execute().actionGet();
// Scroll until no more hits are returned
while (true) {
for (SearchHit hit : response.getHits().getHits()) {
// add hit to bulk delete
deleteByScope.add(Requests.deleteRequest(hit.index()).type(hit.type()).id(hit.id()));
}
response = client.prepareSearchScroll(response.getScrollId()).setScroll(keepAlive).execute().actionGet();
// If we have no more hits, exit
if (response.getHits().getHits().length == 0) {
break;
}
}
// we're done with the scrolling, delete now
if (deleteByScope.numberOfActions() > 0) {
final BulkResponse deleteResponse = deleteByScope.get();
if (deleteResponse.hasFailures()) {
// do something
logger.debug("Couldn't delete from scope " + scope + ":\n{}", deleteResponse.buildFailureMessage());
}
}
return null;
}
}.catchingExecuteInClassLoader(true);
}
@Override
public Map<String, Double> getSingleValuesMetrics(final Condition condition, final String[] metrics, final String field, final String itemType) {
return new InClassLoaderExecute<Map<String, Double>>(metricsService, this.getClass().getName() + ".getSingleValuesMetrics") {
@Override
protected Map<String, Double> execute(Object... args) {
Map<String, Double> results = new LinkedHashMap<String, Double>();
SearchRequestBuilder builder = client.prepareSearch(getIndexNameForQuery(itemType))
.setTypes(itemType)
.setSize(0)
.setQuery(QueryBuilders.matchAllQuery());
AggregationBuilder filterAggregation = AggregationBuilders.filter("metrics", conditionESQueryBuilderDispatcher.buildFilter(condition));
if (metrics != null) {
for (String metric : metrics) {
switch (metric) {
case "sum":
filterAggregation.subAggregation(AggregationBuilders.sum("sum").field(field));
break;
case "avg":
filterAggregation.subAggregation(AggregationBuilders.avg("avg").field(field));
break;
case "min":
filterAggregation.subAggregation(AggregationBuilders.min("min").field(field));
break;
case "max":
filterAggregation.subAggregation(AggregationBuilders.max("max").field(field));
break;
case "card":
filterAggregation.subAggregation(AggregationBuilders.cardinality("card").field(field));
break;
case "count":
filterAggregation.subAggregation(AggregationBuilders.count("count").field(field));
break;
}
}
}
builder.addAggregation(filterAggregation);
SearchResponse response = builder.execute().actionGet();
Aggregations aggregations = response.getAggregations();
if (aggregations != null) {
Aggregation metricsResults = aggregations.get("metrics");
if (metricsResults instanceof HasAggregations) {
aggregations = ((HasAggregations) metricsResults).getAggregations();
for (Aggregation aggregation : aggregations) {
InternalNumericMetricsAggregation.SingleValue singleValue = (InternalNumericMetricsAggregation.SingleValue) aggregation;
results.put("_" + singleValue.getName(), singleValue.value());
}
}
}
return results;
}
}.catchingExecuteInClassLoader(true);
}
private String getIndexNameForQuery(String itemType) {
return indexNames.containsKey(itemType) ? indexNames.get(itemType) :
(itemsMonthlyIndexed.contains(itemType) ? indexName + "-*" : indexName);
}
private String getConfig(Map<String, String> settings, String key,
String defaultValue) {
if (settings != null && settings.get(key) != null) {
return settings.get(key);
}
return defaultValue;
}
public abstract static class InClassLoaderExecute<T> {
private String timerName;
private MetricsService metricsService;
public InClassLoaderExecute(MetricsService metricsService, String timerName) {
this.timerName = timerName;
this.metricsService = metricsService;
}
protected abstract T execute(Object... args) throws Exception;
public T executeInClassLoader(Object... args) throws Exception {
long startTime = System.currentTimeMillis();
ClassLoader tccl = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
return execute(args);
} finally {
if (metricsService != null && metricsService.isActivated()) {
metricsService.updateTimer(timerName, startTime);
}
Thread.currentThread().setContextClassLoader(tccl);
}
}
public T catchingExecuteInClassLoader(boolean logError, Object... args) {
try {
return executeInClassLoader(timerName, args);
} catch (Exception e) {
if (logError) {
logger.error("Error while executing in class loader", e);
}
}
return null;
}
}
public TransportClient newTransportClient(Settings.Builder settingsBuilder,
String transportClientClassName,
String transportClientJarDirectory,
String transportClientProperties) {
ArrayList<URL> urls = new ArrayList<>();
File pluginLocationFile = new File(transportClientJarDirectory);
File[] pluginLocationFiles = pluginLocationFile.listFiles();
for (File pluginFile : pluginLocationFiles) {
if (pluginFile.getName().toLowerCase().endsWith(".jar")) {
try {
urls.add(pluginFile.toURI().toURL());
} catch (MalformedURLException e) {
e.printStackTrace();
}
}
}
ChildFirstClassLoader childFirstClassLoader = new ChildFirstClassLoader(this.getClass().getClassLoader(), urls.toArray(new URL[urls.size()]));
if (StringUtils.isNotBlank(transportClientProperties)) {
String[] clientProperties = transportClientProperties.split(",");
if (clientProperties.length > 0) {
for (String clientProperty : clientProperties) {
String[] clientPropertyParts = clientProperty.split("=");
settingsBuilder.put(clientPropertyParts[0], clientPropertyParts[1]);
}
}
}
try {
Class<?> transportClientClass = childFirstClassLoader.loadClass(transportClientClassName);
Constructor<?> transportClientConstructor = transportClientClass.getConstructor(Settings.class, Class[].class);
return (TransportClient) transportClientConstructor.newInstance(settingsBuilder.build(), new Class[0]);
} catch (ClassNotFoundException e) {
logger.error("Couldn't find class " + transportClientClassName, e);
} catch (NoSuchMethodException e) {
logger.error("Error creating transport client with class" + transportClientClassName, e);
} catch (IllegalAccessException e) {
logger.error("Error creating transport client with class" + transportClientClassName, e);
} catch (InstantiationException e) {
logger.error("Error creating transport client with class" + transportClientClassName, e);
} catch (InvocationTargetException e) {
logger.error("Error creating transport client with class" + transportClientClassName, e);
}
return null;
}
private <T extends Item> boolean isCacheActiveForClass(String className) {
if (itemClassesToCacheSet.contains("*")) {
return true;
}
if (itemClassesToCacheSet.contains(className)) {
return true;
}
return false;
}
private <T extends Item> T getFromCache(String itemId, Class<T> clazz) {
String className = clazz.getName();
if (!isCacheActiveForClass(className)) {
return null;
}
Map<String,T> itemCache = hazelcastInstance.getMap(className);
return itemCache.get(itemId);
}
private <T extends Item> T putInCache(String itemId, T item) {
String className = item.getClass().getName();
if (!isCacheActiveForClass(className)) {
return null;
}
Map<String,T> itemCache = hazelcastInstance.getMap(className);
return itemCache.put(itemId, item);
}
private <T extends Item> T deleteFromCache(String itemId, Class clazz) {
String className = clazz.getName();
if (!isCacheActiveForClass(className)) {
return null;
}
Map<String,T> itemCache = hazelcastInstance.getMap(className);
return itemCache.remove(itemId);
}
}