blob: a743237e31a007a5833159f4c6a896f04754bd71 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.unomi.persistence.elasticsearch;
import org.apache.unomi.api.ClusterNode;
import org.apache.unomi.api.Item;
import org.apache.unomi.api.PartialList;
import org.apache.unomi.api.TimestampedItem;
import org.apache.unomi.api.conditions.Condition;
import org.apache.unomi.api.query.DateRange;
import org.apache.unomi.api.query.IpRange;
import org.apache.unomi.api.query.NumericRange;
import org.apache.unomi.api.services.ClusterService;
import org.apache.unomi.persistence.elasticsearch.conditions.*;
import org.apache.unomi.persistence.spi.CustomObjectMapper;
import org.apache.unomi.persistence.spi.PersistenceService;
import org.apache.unomi.persistence.spi.aggregate.*;
import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.percolate.PercolateResponse;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.action.support.nodes.NodesOperationRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.Requests;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.collect.UnmodifiableIterator;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsException;
import org.elasticsearch.common.unit.DistanceUnit;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.*;
import org.elasticsearch.indices.IndexMissingException;
import org.elasticsearch.node.Node;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.*;
import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation;
import org.elasticsearch.search.aggregations.bucket.SingleBucketAggregation;
import org.elasticsearch.search.aggregations.bucket.filter.Filter;
import org.elasticsearch.search.aggregations.bucket.global.Global;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogram;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramBuilder;
import org.elasticsearch.search.aggregations.bucket.missing.MissingBuilder;
import org.elasticsearch.search.aggregations.bucket.range.RangeBuilder;
import org.elasticsearch.search.aggregations.bucket.range.date.DateRangeBuilder;
import org.elasticsearch.search.aggregations.bucket.range.ipv4.IPv4RangeBuilder;
import org.elasticsearch.search.aggregations.metrics.InternalNumericMetricsAggregation;
import org.elasticsearch.search.sort.GeoDistanceSortBuilder;
import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.search.sort.SortOrder;
import org.osgi.framework.BundleContext;
import org.osgi.framework.BundleEvent;
import org.osgi.framework.ServiceReference;
import org.osgi.framework.SynchronousBundleListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.MalformedURLException;
import java.net.URL;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.*;
import static org.elasticsearch.node.NodeBuilder.nodeBuilder;
@SuppressWarnings("rawtypes")
public class ElasticSearchPersistenceServiceImpl implements PersistenceService, ClusterService, SynchronousBundleListener {
/** Number of milliseconds in one day. */
public static final long MILLIS_PER_DAY = 24L * 60L * 60L * 1000L;
private static final Logger logger = LoggerFactory.getLogger(ElasticSearchPersistenceServiceImpl.class.getName());
/** Embedded ElasticSearch node, built in start() and closed in stop(). */
private Node node;
/** Client obtained from the embedded node; used for all index, search and admin requests. */
private Client client;
/** Value written to the "cluster.name" node setting. */
private String clusterName;
/** Name of the main index; also the prefix of the monthly indices (indexName + "-yyyy-MM"). */
private String indexName;
/** Parsed as an integer and used as "number_of_shards" in the monthly index template. */
private String monthlyIndexNumberOfShards;
/** Parsed as an integer and used as "number_of_replicas" in the monthly index template. */
private String monthlyIndexNumberOfReplicas;
/** Value written to the "index.number_of_shards" node setting. */
private String numberOfShards;
/** Value written to the "index.number_of_replicas" node setting. */
private String numberOfReplicas;
/** Value written to the "node.data" node setting. */
private Boolean nodeData;
/** Value written to the "discovery.zen.ping.multicast.enabled" node setting. */
private Boolean discoveryEnabled;
/** Optional URL of an ElasticSearch settings file loaded at startup; ignored when null or empty. */
private String elasticSearchConfig = null;
private BundleContext bundleContext;
/** Mapping definitions (type name -> JSON source) loaded from META-INF/cxs/mappings of the bundles. */
private Map<String, String> mappings = new HashMap<String, String>();
private ConditionEvaluatorDispatcher conditionEvaluatorDispatcher;
private ConditionESQueryBuilderDispatcher conditionESQueryBuilderDispatcher;
/** Item type -> name of the dedicated index holding that type (takes precedence over the main index). */
private Map<String,String> indexNames;
/** Item types that are stored in per-month indices instead of the main index. */
private List<String> itemsMonthlyIndexed;
/** Item type -> routing value applied when indexing items of that type. */
private Map<String, String> routingByType;
// The four values below are published as "node.contextserver.*" node settings; each one can be
// overridden at startup by the matching "contextserver.*" system property.
private String address;
private String port;
private String secureAddress;
private String securePort;
/** Timer running the daily task that creates the next monthly index ahead of time. */
private Timer timer;
/** Sets the bundle context used to look up services, load mappings and register the bundle listener. */
public void setBundleContext(BundleContext bundleContext) {
this.bundleContext = bundleContext;
}
/** Sets the ElasticSearch cluster name ("cluster.name" node setting). */
public void setClusterName(String clusterName) {
this.clusterName = clusterName;
}
/** Sets the name of the main index, which is also the prefix of the monthly indices. */
public void setIndexName(String indexName) {
this.indexName = indexName;
}
/** Sets the shard count of monthly indices; the value must be parseable as an integer. */
public void setMonthlyIndexNumberOfShards(String monthlyIndexNumberOfShards) {
this.monthlyIndexNumberOfShards = monthlyIndexNumberOfShards;
}
/** Sets the replica count of monthly indices; the value must be parseable as an integer. */
public void setMonthlyIndexNumberOfReplicas(String monthlyIndexNumberOfReplicas) {
this.monthlyIndexNumberOfReplicas = monthlyIndexNumberOfReplicas;
}
/** Sets the value of the "discovery.zen.ping.multicast.enabled" node setting. */
public void setDiscoveryEnabled(Boolean discoveryEnabled) {
this.discoveryEnabled = discoveryEnabled;
}
/** Sets the value of the "index.number_of_shards" node setting. */
public void setNumberOfShards(String numberOfShards) {
this.numberOfShards = numberOfShards;
}
/** Sets the value of the "index.number_of_replicas" node setting. */
public void setNumberOfReplicas(String numberOfReplicas) {
this.numberOfReplicas = numberOfReplicas;
}
/** Sets the value of the "node.data" node setting. */
public void setNodeData(Boolean nodeData) {
this.nodeData = nodeData;
}
/** Sets the default for "node.contextserver.address" (overridable by system property "contextserver.address"). */
public void setAddress(String address) {
this.address = address;
}
/** Sets the default for "node.contextserver.port" (overridable by system property "contextserver.port"). */
public void setPort(String port) {
this.port = port;
}
/** Sets the default for "node.contextserver.secureAddress" (overridable by system property "contextserver.secureAddress"). */
public void setSecureAddress(String secureAddress) {
this.secureAddress = secureAddress;
}
/** Sets the default for "node.contextserver.securePort" (overridable by system property "contextserver.securePort"). */
public void setSecurePort(String securePort) {
this.securePort = securePort;
}
/** Sets the item types that are stored in monthly indices. */
public void setItemsMonthlyIndexed(List<String> itemsMonthlyIndexed) {
this.itemsMonthlyIndexed = itemsMonthlyIndexed;
}
/** Sets the item type -> dedicated index name map. */
public void setIndexNames(Map<String, String> indexNames) {
this.indexNames = indexNames;
}
/** Sets the item type -> routing value map applied when items are indexed. */
public void setRoutingByType(Map<String, String> routingByType) {
this.routingByType = routingByType;
}
/** Sets the URL of an optional ElasticSearch settings file loaded by start(). */
public void setElasticSearchConfig(String elasticSearchConfig) {
this.elasticSearchConfig = elasticSearchConfig;
}
/** Sets the dispatcher used to evaluate conditions in memory (see testMatch). */
public void setConditionEvaluatorDispatcher(ConditionEvaluatorDispatcher conditionEvaluatorDispatcher) {
this.conditionEvaluatorDispatcher = conditionEvaluatorDispatcher;
}
/** Sets the dispatcher used to turn conditions into ElasticSearch queries and filters. */
public void setConditionESQueryBuilderDispatcher(ConditionESQueryBuilderDispatcher conditionESQueryBuilderDispatcher) {
this.conditionESQueryBuilderDispatcher = conditionESQueryBuilderDispatcher;
}
/**
 * Starts the persistence backend.
 * <p>
 * In order: loads the mappings shipped with this bundle, builds and starts the embedded
 * ElasticSearch node, makes sure the main index, the monthly index template and the current
 * monthly index exist, registers this instance as a bundle listener, registers the already
 * published {@link ConditionEvaluator} and {@link ConditionESQueryBuilder} services, and finally
 * schedules a daily task that creates the next monthly index on the last day of each month.
 */
public void start() {
    loadPredefinedMappings(bundleContext, false);

    // on startup
    new InClassLoaderExecute<Object>() {
        public Object execute(Object... args) {
            logger.info("Starting ElasticSearch persistence backend using cluster name " + clusterName + " and index name " + indexName + "...");

            // Optional settings file; failures are logged and the node starts with the built-in settings only.
            Map<String, String> settings = null;
            if (elasticSearchConfig != null && elasticSearchConfig.length() > 0) {
                try {
                    URL elasticSearchConfigURL = new URL(elasticSearchConfig);
                    Settings.Builder settingsBuilder = ImmutableSettings.builder().loadFromUrl(elasticSearchConfigURL);
                    settings = settingsBuilder.build().getAsMap();
                    logger.info("Successfully loaded ElasticSearch configuration from " + elasticSearchConfigURL);
                } catch (MalformedURLException e) {
                    logger.error("Error in ElasticSearch configuration URL ", e);
                } catch (SettingsException se) {
                    logger.info("Error trying to load settings from " + elasticSearchConfig + ": " + se.getMessage() + " (activate debug mode for exception details)");
                    if (logger.isDebugEnabled()) {
                        logger.debug("Exception details", se);
                    }
                }
            }

            // System properties take precedence over the configured values.
            address = System.getProperty("contextserver.address", address);
            port = System.getProperty("contextserver.port", port);
            secureAddress = System.getProperty("contextserver.secureAddress", secureAddress);
            securePort = System.getProperty("contextserver.securePort", securePort);

            ImmutableSettings.Builder settingsBuilder = ImmutableSettings.builder();
            if (settings != null) {
                settingsBuilder.put(settings);
            }
            // Explicit values are applied last so they override anything loaded from the settings file.
            settingsBuilder.put("cluster.name", clusterName)
                    .put("node.data", nodeData)
                    .put("discovery.zen.ping.multicast.enabled", discoveryEnabled)
                    .put("index.number_of_replicas", numberOfReplicas)
                    .put("index.number_of_shards", numberOfShards)
                    .put("node.contextserver.address", address)
                    .put("node.contextserver.port", port)
                    .put("node.contextserver.secureAddress", secureAddress)
                    .put("node.contextserver.securePort", securePort);

            node = nodeBuilder().settings(settingsBuilder).node();
            client = node.client();

            // @todo is there a better way to detect index existence than to wait for it to startup ?
            boolean indexExists = false;
            int tries = 0;
            while (!indexExists && tries < 20) {
                IndicesExistsResponse indicesExistsResponse = client.admin().indices().prepareExists(indexName).execute().actionGet();
                indexExists = indicesExistsResponse.isExists();
                tries++;
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    logger.error("Interrupted", e);
                    // Keep the interruption visible to callers and stop polling: with the flag
                    // set, every further sleep() would fail immediately anyway.
                    Thread.currentThread().interrupt();
                    break;
                }
            }
            if (!indexExists) {
                logger.info("{} index doesn't exist yet, creating it...", indexName);
                // The main index only receives the mappings of types that live neither in a
                // monthly index nor in a dedicated index.
                Map<String,String> indexMappings = new HashMap<String,String>();
                for (Map.Entry<String, String> entry : mappings.entrySet()) {
                    if (!itemsMonthlyIndexed.contains(entry.getKey()) && !indexNames.containsKey(entry.getKey())) {
                        indexMappings.put(entry.getKey(), entry.getValue());
                    }
                }
                internalCreateIndex(indexName, indexMappings);
            }

            // Template applied to every index named "<indexName>-*", i.e. the monthly indices.
            client.admin().indices().preparePutTemplate(indexName + "_monthlyindex")
                    .setTemplate(indexName + "-*")
                    .setOrder(1)
                    .setSettings(ImmutableSettings.settingsBuilder()
                            .put("number_of_shards", Integer.parseInt(monthlyIndexNumberOfShards))
                            .put("number_of_replicas", Integer.parseInt(monthlyIndexNumberOfReplicas))
                            .build()).execute().actionGet();

            getMonthlyIndex(new Date(), true);
            return null;
        }
    }.executeInClassLoader();

    bundleContext.addBundleListener(this);

    try {
        for (ServiceReference<ConditionEvaluator> reference : bundleContext.getServiceReferences(ConditionEvaluator.class, null)) {
            ConditionEvaluator service = bundleContext.getService(reference);
            conditionEvaluatorDispatcher.addEvaluator(reference.getProperty("conditionEvaluatorId").toString(), reference.getBundle().getBundleId(), service);
        }
        for (ServiceReference<ConditionESQueryBuilder> reference : bundleContext.getServiceReferences(ConditionESQueryBuilder.class, null)) {
            ConditionESQueryBuilder service = bundleContext.getService(reference);
            conditionESQueryBuilderDispatcher.addQueryBuilder(reference.getProperty("queryBuilderId").toString(), reference.getBundle().getBundleId(), service);
        }
    } catch (Exception e) {
        logger.error("Cannot get services", e);
    }

    timer = new Timer();
    timer.scheduleAtFixedRate(new TimerTask() {
        @Override
        public void run() {
            // If tomorrow falls in another month, create that month's index today.
            GregorianCalendar gc = new GregorianCalendar();
            int thisMonth = gc.get(Calendar.MONTH);
            gc.add(Calendar.DAY_OF_MONTH, 1);
            if (gc.get(Calendar.MONTH) != thisMonth) {
                getMonthlyIndex(gc.getTime(), true);
            }
        }
    }, 10000L, MILLIS_PER_DAY);
}
/**
 * Stops the persistence backend: cancels the monthly-index timer, closes the embedded
 * ElasticSearch node and unregisters this instance as a bundle listener.
 * <p>
 * The timer is cancelled before the node is closed so the scheduled task cannot use a closed
 * client. The method also copes with a start() that failed before the node was created: the
 * node is only closed when it exists, and the bundle listener is removed in all cases.
 */
public void stop() {
    if (timer != null) {
        timer.cancel();
        timer = null;
    }

    try {
        new InClassLoaderExecute<Object>() {
            protected Object execute(Object... args) {
                logger.info("Closing ElasticSearch persistence backend...");
                if (node != null) {
                    node.close();
                }
                return null;
            }
        }.executeInClassLoader();
    } finally {
        bundleContext.removeBundleListener(this);
    }
}
/**
 * Keeps the dispatchers and mappings in sync with the bundle life cycle.
 * <ul>
 * <li>STARTING: loads the mappings shipped with the bundle and pushes them to ElasticSearch;</li>
 * <li>STARTED: registers every {@link ConditionEvaluator} / {@link ConditionESQueryBuilder}
 * service registered by the bundle;</li>
 * <li>STOPPING: unregisters everything contributed by the bundle.</li>
 * </ul>
 * Any other event type is ignored.
 */
@Override
public void bundleChanged(BundleEvent event) {
    final int eventType = event.getType();

    if (eventType == BundleEvent.STARTING) {
        loadPredefinedMappings(event.getBundle().getBundleContext(), true);
        return;
    }

    if (eventType == BundleEvent.STOPPING) {
        conditionEvaluatorDispatcher.removeEvaluators(event.getBundle().getBundleId());
        conditionESQueryBuilderDispatcher.removeQueryBuilders(event.getBundle().getBundleId());
        return;
    }

    if (eventType != BundleEvent.STARTED) {
        return;
    }
    if (event.getBundle() == null || event.getBundle().getRegisteredServices() == null) {
        return;
    }

    for (ServiceReference<?> serviceRef : event.getBundle().getRegisteredServices()) {
        final Object candidate = bundleContext.getService(serviceRef);
        if (candidate instanceof ConditionEvaluator) {
            final String evaluatorId = serviceRef.getProperty("conditionEvaluatorId").toString();
            conditionEvaluatorDispatcher.addEvaluator(evaluatorId, event.getBundle().getBundleId(), (ConditionEvaluator) candidate);
        }
        if (candidate instanceof ConditionESQueryBuilder) {
            final String builderId = serviceRef.getProperty("queryBuilderId").toString();
            conditionESQueryBuilderDispatcher.addQueryBuilder(builderId, event.getBundle().getBundleId(), (ConditionESQueryBuilder) candidate);
        }
    }
}
/**
 * Returns the name of the monthly index covering the given date, without checking that the
 * index exists.
 *
 * @param date the date whose month selects the index
 * @return the monthly index name
 */
private String getMonthlyIndex(Date date) {
    final boolean checkAndCreate = false;
    return getMonthlyIndex(date, checkAndCreate);
}
/**
 * Returns the name of the monthly index covering the given date, i.e.
 * {@code indexName + "-yyyy-MM"}, optionally creating the index when it does not exist yet.
 *
 * @param date           the date whose year and month select the index
 * @param checkAndCreate when {@code true}, the index is created (with the mappings of all
 *                       monthly-indexed item types) if it is missing
 * @return the monthly index name
 */
private String getMonthlyIndex(Date date, boolean checkAndCreate) {
    // "yyyy" is the calendar year. The previous "YYYY" is the week-based year, which differs
    // from the calendar year for some days around New Year and produced names such as
    // "-2015-12" for a date in December 2014.
    String d = new SimpleDateFormat("-yyyy-MM").format(date);
    String monthlyIndexName = indexName + d;

    if (checkAndCreate) {
        IndicesExistsResponse indicesExistsResponse = client.admin().indices().prepareExists(monthlyIndexName).execute().actionGet();
        boolean indexExists = indicesExistsResponse.isExists();
        if (!indexExists) {
            logger.info("{} index doesn't exist yet, creating it...", monthlyIndexName);

            // Only the types stored in monthly indices get their mapping in this index.
            Map<String,String> indexMappings = new HashMap<String,String>();
            for (Map.Entry<String, String> entry : mappings.entrySet()) {
                if (itemsMonthlyIndexed.contains(entry.getKey())) {
                    indexMappings.put(entry.getKey(), entry.getValue());
                }
            }

            internalCreateIndex(monthlyIndexName, indexMappings);
            logger.info("{} index created.", monthlyIndexName);
        }
    }

    return monthlyIndexName;
}
/**
 * Loads every {@code *.json} mapping definition found under {@code META-INF/cxs/mappings} in
 * the bundle owning the given context and stores it in {@link #mappings}, keyed by the file
 * name without extension.
 * <p>
 * A failure on one mapping file is logged and does not prevent the remaining files from being
 * processed.
 *
 * @param bundleContext the context of the bundle to scan
 * @param createMapping when {@code true}, each loaded mapping is also pushed to ElasticSearch:
 *                      to all monthly indices for monthly-indexed types, to the dedicated index
 *                      (only if it already exists) for types listed in {@link #indexNames},
 *                      and to the main index otherwise
 */
private void loadPredefinedMappings(BundleContext bundleContext, boolean createMapping) {
    Enumeration<URL> predefinedMappings = bundleContext.getBundle().findEntries("META-INF/cxs/mappings", "*.json", true);
    if (predefinedMappings == null) {
        return;
    }
    while (predefinedMappings.hasMoreElements()) {
        URL predefinedMappingURL = predefinedMappings.nextElement();
        logger.debug("Found mapping at " + predefinedMappingURL + ", loading... ");
        try {
            final String path = predefinedMappingURL.getPath();
            String name = path.substring(path.lastIndexOf('/') + 1, path.lastIndexOf('.'));

            // try-with-resources closes the stream even when reading fails. The charset is
            // given explicitly (JSON is UTF-8) so the result does not depend on the platform.
            StringBuilder content = new StringBuilder();
            try (BufferedReader reader = new BufferedReader(
                    new InputStreamReader(predefinedMappingURL.openStream(), java.nio.charset.StandardCharsets.UTF_8))) {
                String l;
                while ((l = reader.readLine()) != null) {
                    content.append(l);
                }
            }

            String mappingSource = content.toString();
            mappings.put(name, mappingSource);

            if (createMapping) {
                if (itemsMonthlyIndexed.contains(name)) {
                    createMapping(name, mappingSource, indexName + "-*");
                } else if (indexNames.containsKey(name)) {
                    if (client.admin().indices().prepareExists(indexNames.get(name)).execute().actionGet().isExists()) {
                        createMapping(name, mappingSource, indexNames.get(name));
                    }
                } else {
                    createMapping(name, mappingSource, indexName);
                }
            }
        } catch (Exception e) {
            logger.error("Error while loading mapping definition " + predefinedMappingURL, e);
        }
    }
}
/**
 * Returns every stored item of the given class, unsorted.
 *
 * @param clazz the item class
 * @return all items of that class
 */
@Override
public <T extends Item> List<T> getAllItems(final Class<T> clazz) {
    final PartialList<T> everything = getAllItems(clazz, 0, -1, null);
    return everything.getList();
}
/**
 * Counts all stored items of the given type.
 *
 * @param itemType the item type name
 * @return the number of documents of that type
 */
@Override
public long getAllItemsCount(String itemType) {
    final FilterBuilder acceptAll = FilterBuilders.matchAllFilter();
    return queryCount(acceptAll, itemType);
}
/**
 * Returns one page of all stored items of the given class.
 *
 * @param clazz  the item class
 * @param offset index of the first item to return
 * @param size   maximum number of items, or -1 for no limit
 * @param sortBy comma separated sort specification, or {@code null}
 * @return the requested page
 */
@Override
public <T extends Item> PartialList<T> getAllItems(final Class<T> clazz, int offset, int size, String sortBy) {
    final QueryBuilder matchEverything = QueryBuilders.matchAllQuery();
    final String[] noRouting = null;
    return query(matchEverything, sortBy, clazz, offset, size, noRouting);
}
/**
 * Loads an item by identifier without any date hint.
 *
 * @param itemId the item identifier
 * @param clazz  the item class
 * @return the item, or {@code null} when it cannot be found or loaded
 */
@Override
public <T extends Item> T load(final String itemId, final Class<T> clazz) {
    final Date noDateHint = null;
    return load(itemId, noDateHint, clazz);
}
/**
 * Loads an item by identifier.
 * <p>
 * The item type is read from the static {@code ITEM_TYPE} field of the class. For a
 * monthly-indexed type without date hint the item is looked up with an ids query; otherwise it
 * is fetched directly from the dedicated index, the monthly index of the hint, or the main index.
 *
 * @param itemId   the item identifier
 * @param dateHint date used to select the monthly index, may be {@code null}
 * @param clazz    the item class
 * @return the item, or {@code null} when it does not exist or an error occurred (errors are logged)
 */
@Override
public <T extends Item> T load(final String itemId, final Date dateHint, final Class<T> clazz) {
    return new InClassLoaderExecute<T>() {
        @Override
        protected T execute(Object... args) {
            try {
                final String itemType = (String) clazz.getField("ITEM_TYPE").get(null);
                final boolean monthlyIndexed = itemsMonthlyIndexed.contains(itemType);

                if (monthlyIndexed && dateHint == null) {
                    // The month is unknown: search for the id instead of a direct get.
                    PartialList<T> found = query(QueryBuilders.idsQuery(itemType).ids(itemId), null, clazz, 0, 1, null);
                    return found.size() > 0 ? found.get(0) : null;
                }

                final String targetIndex;
                if (indexNames.containsKey(itemType)) {
                    targetIndex = indexNames.get(itemType);
                } else if (monthlyIndexed) {
                    targetIndex = getMonthlyIndex(dateHint);
                } else {
                    targetIndex = indexName;
                }

                GetResponse fetched = client.prepareGet(targetIndex, itemType, itemId).execute().actionGet();
                if (!fetched.isExists()) {
                    return null;
                }
                final T loaded = CustomObjectMapper.getObjectMapper().readValue(fetched.getSourceAsString(), clazz);
                loaded.setItemId(fetched.getId());
                return loaded;
            } catch (IndexMissingException e) {
                logger.debug("No index found for itemType=" + clazz.getName() + "itemId=" + itemId, e);
            } catch (Exception t) {
                logger.error("Error loading itemType=" + clazz.getName() + "itemId=" + itemId, t);
            }
            return null;
        }
    }.executeInClassLoader();
}
/**
 * Stores (indexes) the given item.
 * <p>
 * The target index is the dedicated index of the item type if one is configured, the monthly
 * index matching the item's timestamp for monthly-indexed types, or the main index otherwise.
 * When the monthly index is missing it is created and the request is retried once.
 *
 * @param item the item to save; must be a {@link TimestampedItem} when its type is monthly-indexed
 * @return {@code true} if the item was indexed, {@code false} if it could not be serialized or
 *         if its (non monthly) target index does not exist
 */
@Override
public boolean save(final Item item) {
    return new InClassLoaderExecute<Boolean>() {
        protected Boolean execute(Object... args) {
            try {
                String source = CustomObjectMapper.getObjectMapper().writeValueAsString(item);
                String itemType = item.getItemType();
                String index = indexNames.containsKey(itemType) ? indexNames.get(itemType) :
                        (itemsMonthlyIndexed.contains(itemType) ? getMonthlyIndex(((TimestampedItem) item).getTimeStamp()) : indexName);
                IndexRequestBuilder indexBuilder = client.prepareIndex(index, itemType, item.getItemId())
                        .setSource(source);
                if (routingByType.containsKey(itemType)) {
                    indexBuilder = indexBuilder.setRouting(routingByType.get(itemType));
                }
                try {
                    indexBuilder.execute().actionGet();
                } catch (IndexMissingException e) {
                    if (!itemsMonthlyIndexed.contains(itemType)) {
                        // Nothing was stored: report the failure instead of pretending success.
                        logger.error("Error saving item " + item + ", index " + index + " does not exist", e);
                        return false;
                    }
                    // Monthly index not created yet: create it and retry once.
                    getMonthlyIndex(((TimestampedItem) item).getTimeStamp(), true);
                    indexBuilder.execute().actionGet();
                }
                return true;
            } catch (IOException e) {
                logger.error("Error saving item " + item, e);
            }
            return false;
        }
    }.executeInClassLoader();
}
/**
 * Updates a single property of a stored item.
 *
 * @param itemId        the item identifier
 * @param dateHint      date used to select the monthly index, may be {@code null}
 * @param clazz         the item class
 * @param propertyName  name of the property to update
 * @param propertyValue new value of the property
 * @return {@code true} if the update request succeeded
 */
@Override
public boolean update(final String itemId, final Date dateHint, final Class clazz, final String propertyName, final Object propertyValue) {
    final Map<String, Object> singleProperty = Collections.singletonMap(propertyName, propertyValue);
    return update(itemId, dateHint, clazz, singleProperty);
}
/**
 * Applies a partial document update to a stored item.
 * <p>
 * The item type is read from the static {@code ITEM_TYPE} field of the class. The target index
 * is the dedicated index of the type if configured, the monthly index of the hint for
 * monthly-indexed types with a hint, and the main index otherwise.
 *
 * @param itemId   the item identifier
 * @param dateHint date used to select the monthly index, may be {@code null}
 * @param clazz    the item class
 * @param source   the properties to merge into the stored document
 * @return {@code true} if the update request succeeded, {@code false} if the index is missing or
 *         the item type could not be determined
 */
@Override
public boolean update(final String itemId, final Date dateHint, final Class clazz, final Map source) {
    return new InClassLoaderExecute<Boolean>() {
        @Override
        protected Boolean execute(Object... args) {
            try {
                final String itemType = (String) clazz.getField("ITEM_TYPE").get(null);

                final String targetIndex;
                if (indexNames.containsKey(itemType)) {
                    targetIndex = indexNames.get(itemType);
                } else if (itemsMonthlyIndexed.contains(itemType) && dateHint != null) {
                    targetIndex = getMonthlyIndex(dateHint);
                } else {
                    targetIndex = indexName;
                }

                client.prepareUpdate(targetIndex, itemType, itemId).setDoc(source).execute().actionGet();
                return true;
            } catch (IndexMissingException e) {
                logger.debug("No index found for itemType=" + clazz.getName() + "itemId=" + itemId, e);
            } catch (NoSuchFieldException | IllegalAccessException e) {
                logger.error("Error updating item " + itemId, e);
            }
            return false;
        }
    }.executeInClassLoader();
}
/**
 * Updates a stored item by running an inline script on it.
 * <p>
 * Index selection follows the same rules as the document based update: dedicated index, then
 * monthly index (when a date hint is given), then main index.
 *
 * @param itemId       the item identifier
 * @param dateHint     date used to select the monthly index, may be {@code null}
 * @param clazz        the item class, exposing a static {@code ITEM_TYPE} field
 * @param script       the inline script source
 * @param scriptParams parameters passed to the script
 * @return {@code true} if the update request succeeded, {@code false} if the index is missing or
 *         the item type could not be determined
 */
@Override
public boolean updateWithScript(final String itemId, final Date dateHint, final Class<?> clazz, final String script, final Map<String, Object> scriptParams) {
    return new InClassLoaderExecute<Boolean>() {
        @Override
        protected Boolean execute(Object... args) {
            try {
                final String itemType = (String) clazz.getField("ITEM_TYPE").get(null);

                final String targetIndex;
                if (indexNames.containsKey(itemType)) {
                    targetIndex = indexNames.get(itemType);
                } else if (itemsMonthlyIndexed.contains(itemType) && dateHint != null) {
                    targetIndex = getMonthlyIndex(dateHint);
                } else {
                    targetIndex = indexName;
                }

                client.prepareUpdate(targetIndex, itemType, itemId)
                        .setScript(script, ScriptService.ScriptType.INLINE)
                        .setScriptParams(scriptParams)
                        .execute()
                        .actionGet();
                return true;
            } catch (IndexMissingException e) {
                logger.debug("No index found for itemType=" + clazz.getName() + "itemId=" + itemId, e);
            } catch (NoSuchFieldException | IllegalAccessException e) {
                logger.error("Error updating item " + itemId, e);
            }
            return false;
        }
    }.executeInClassLoader();
}
/**
 * Deletes the item with the given identifier.
 *
 * @param itemId the item identifier
 * @param clazz  the item class, exposing a static {@code ITEM_TYPE} field
 * @return {@code true} if the delete request was executed, {@code false} if any error occurred
 *         (the error is logged)
 */
@Override
public <T extends Item> boolean remove(final String itemId, final Class<T> clazz) {
    return new InClassLoaderExecute<Boolean>() {
        @Override
        protected Boolean execute(Object... args) {
            boolean removed = false;
            try {
                final String itemType = (String) clazz.getField("ITEM_TYPE").get(null);
                final String targetIndex = getIndexNameForQuery(itemType);
                client.prepareDelete(targetIndex, itemType, itemId).execute().actionGet();
                removed = true;
            } catch (Exception e) {
                logger.error("Cannot remove", e);
            }
            return removed;
        }
    }.executeInClassLoader();
}
/**
 * Deletes every item of the given class that matches the condition.
 * <p>
 * The deletion is restricted to the item type of the class, consistently with the search and
 * count methods of this service, so documents of other types stored in the same index are
 * never removed even if they match the condition.
 *
 * @param query the condition selecting the items to delete
 * @param clazz the item class, exposing a static {@code ITEM_TYPE} field
 * @return {@code true} if the delete request was executed, {@code false} if any error occurred
 *         (the error is logged)
 */
public <T extends Item> boolean removeByQuery(final Condition query, final Class<T> clazz) {
    return new InClassLoaderExecute<Boolean>() {
        protected Boolean execute(Object... args) {
            try {
                String itemType = (String) clazz.getField("ITEM_TYPE").get(null);

                client.prepareDeleteByQuery(getIndexNameForQuery(itemType))
                        .setTypes(itemType)
                        .setQuery(conditionESQueryBuilderDispatcher.getQueryBuilder(query))
                        .execute().actionGet();
                return true;
            } catch (Exception e) {
                logger.error("Cannot remove by query", e);
            }
            return false;
        }
    }.executeInClassLoader();
}
/**
 * Creates the given index if it does not exist yet, with the mappings of every item type whose
 * dedicated index (see {@link #indexNames}) is that index.
 *
 * @param indexName the name of the index to create
 * @return {@code true} if the index was created, {@code false} if it already existed
 */
public boolean createIndex(final String indexName) {
    return new InClassLoaderExecute<Boolean>() {
        @Override
        protected Boolean execute(Object... args) {
            final boolean alreadyPresent = client.admin().indices().prepareExists(indexName).execute().actionGet().isExists();
            if (alreadyPresent) {
                return false;
            }

            final Map<String,String> typeMappings = new HashMap<String,String>();
            for (Map.Entry<String, String> known : mappings.entrySet()) {
                final String type = known.getKey();
                if (indexNames.containsKey(type) && indexNames.get(type).equals(indexName)) {
                    typeMappings.put(type, known.getValue());
                }
            }
            internalCreateIndex(indexName, typeMappings);
            return true;
        }
    }.executeInClassLoader();
}
/**
 * Deletes the given index if it exists.
 *
 * @param indexName the name of the index to delete
 * @return {@code true} if the index existed and a delete request was issued, {@code false} if
 *         there was no such index
 */
public boolean removeIndex(final String indexName) {
    return new InClassLoaderExecute<Boolean>() {
        @Override
        protected Boolean execute(Object... args) {
            final boolean present = client.admin().indices().prepareExists(indexName).execute().actionGet().isExists();
            if (!present) {
                return false;
            }
            client.admin().indices().prepareDelete(indexName).execute().actionGet();
            return true;
        }
    }.executeInClassLoader();
}
/**
 * Creates an index with the analysis settings used by this service and the given type mappings.
 * <p>
 * The settings declare a pattern tokenizer named "myTokenizer" and a custom analyzer named
 * "folding" that applies the "lowercase" and "asciifolding" filters.
 *
 * @param indexName the name of the index to create
 * @param mappings  type name -> mapping JSON source to register in the new index
 */
private void internalCreateIndex(String indexName, Map<String,String> mappings) {
    final String analysisSettings = "{\n" +
            " \"analysis\": {\n" +
            " \"tokenizer\": {\n" +
            " \"myTokenizer\": {\n" +
            " \"type\":\"pattern\",\n" +
            " \"pattern\":\".*\",\n" +
            " \"group\":0\n" +
            " }\n" +
            " },\n" +
            " \"analyzer\": {\n" +
            " \"folding\": {\n" +
            " \"type\":\"custom\",\n" +
            " \"tokenizer\": \"myTokenizer\",\n" +
            " \"filter\": [ \"lowercase\", \"asciifolding\" ]\n" +
            " }\n" +
            " }\n" +
            " }\n" +
            "}\n";

    final CreateIndexRequestBuilder createRequest = client.admin().indices().prepareCreate(indexName);
    createRequest.setSettings(analysisSettings);

    for (Map.Entry<String, String> typeMapping : mappings.entrySet()) {
        createRequest.addMapping(typeMapping.getKey(), typeMapping.getValue());
    }

    createRequest.execute().actionGet();
}
/**
 * Registers (puts) the mapping of a type in the given index or index pattern.
 *
 * @param type      the type name
 * @param source    the mapping JSON source
 * @param indexName the target index name or pattern
 * @return always {@code true}; failures surface as exceptions thrown by the request
 */
private boolean createMapping(final String type, final String source, final String indexName) {
    client.admin().indices().preparePutMapping(indexName).setType(type).setSource(source)
            .execute()
            .actionGet();
    return true;
}
/**
 * Returns the property definitions of the given item type, merged over every index that holds
 * a mapping for that type.
 * <p>
 * When a property is defined in several indices, its attributes are merged: attributes that
 * are maps on both sides are combined with {@code putAll}, any other attribute is overwritten
 * by the last definition seen. Indices whose mapping for the type has no "properties" section
 * are skipped.
 *
 * @param itemType the item type name
 * @return property name -> property definition; empty when no mapping is found
 */
@Override
public Map<String, Map<String, Object>> getMapping(final String itemType) {
    return new InClassLoaderExecute<Map<String, Map<String, Object>>>() {
        @SuppressWarnings("unchecked")
        protected Map<String, Map<String, Object>> execute(Object... args) {
            GetMappingsResponse getMappingsResponse = client.admin().indices().prepareGetMappings().setTypes(itemType).execute().actionGet();
            ImmutableOpenMap<String, ImmutableOpenMap<String, MappingMetaData>> mappingsByIndex = getMappingsResponse.getMappings();
            Map<String, Map<String, Object>> propertyMap = new HashMap<>();
            try {
                UnmodifiableIterator<ImmutableOpenMap<String, MappingMetaData>> it = mappingsByIndex.valuesIt();
                while (it.hasNext()) {
                    ImmutableOpenMap<String, MappingMetaData> next = it.next();
                    MappingMetaData typeMapping = next.get(itemType);
                    if (typeMapping == null) {
                        continue;
                    }
                    Map<String, Map<String, Object>> properties = (Map<String, Map<String, Object>>) typeMapping.getSourceAsMap().get("properties");
                    if (properties == null) {
                        // A mapping without declared properties contributes nothing.
                        continue;
                    }
                    for (Map.Entry<String, Map<String, Object>> entry : properties.entrySet()) {
                        if (propertyMap.containsKey(entry.getKey())) {
                            Map<String, Object> subPropMap = propertyMap.get(entry.getKey());
                            for (Map.Entry<String, Object> subentry : entry.getValue().entrySet()) {
                                if (subPropMap.containsKey(subentry.getKey()) && subPropMap.get(subentry.getKey()) instanceof Map && subentry.getValue() instanceof Map) {
                                    ((Map) subPropMap.get(subentry.getKey())).putAll((Map) subentry.getValue());
                                } else {
                                    subPropMap.put(subentry.getKey(), subentry.getValue());
                                }
                            }
                        } else {
                            propertyMap.put(entry.getKey(), entry.getValue());
                        }
                    }
                }
            } catch (IOException e) {
                logger.error("Cannot get mapping", e);
            }
            return propertyMap;
        }
    }.executeInClassLoader();
}
/**
 * Registers a query in the percolator of the main index under the given name.
 *
 * @param queryName the identifier under which the query is stored
 * @param query     the query source
 * @return {@code true} if the query was stored, {@code false} if any error occurred (the error
 *         is logged)
 */
public boolean saveQuery(final String queryName, final String query) {
    return new InClassLoaderExecute<Boolean>() {
        @Override
        protected Boolean execute(Object... args) {
            boolean stored = false;
            try {
                logger.info("Saving query : " + queryName);
                // Refresh so the query can be matched immediately after this call.
                client.prepareIndex(indexName, ".percolator", queryName)
                        .setSource(query)
                        .setRefresh(true)
                        .execute().actionGet();
                stored = true;
            } catch (Exception e) {
                logger.error("Cannot save query", e);
            }
            return stored;
        }
    }.executeInClassLoader();
}
/**
 * Registers a condition as a percolator query under the given name.
 *
 * @param queryName the identifier under which the query is stored
 * @param query     the condition to register; may be {@code null}
 * @return {@code true} if the query was stored, {@code false} if the condition is {@code null}
 *         or if storing it failed
 */
@Override
public boolean saveQuery(String queryName, Condition query) {
    if (query == null) {
        return false;
    }
    // Propagate the outcome of the actual save instead of always reporting success.
    return saveQuery(queryName, conditionESQueryBuilderDispatcher.getQuery(query));
}
/**
 * Removes a previously saved query from the percolator of the main index.
 *
 * @param queryName the identifier of the query to remove
 * @return {@code true} if the delete request was executed, {@code false} if any error occurred
 *         (the error is logged)
 */
@Override
public boolean removeQuery(final String queryName) {
    return new InClassLoaderExecute<Boolean>() {
        @Override
        protected Boolean execute(Object... args) {
            boolean deleted = false;
            try {
                // Refresh so the removal is effective immediately after this call.
                client.prepareDelete(indexName, ".percolator", queryName)
                        .setRefresh(true)
                        .execute().actionGet();
                deleted = true;
            } catch (Exception e) {
                logger.error("Cannot delete query", e);
            }
            return deleted;
        }
    }.executeInClassLoader();
}
/**
 * Percolates the given item against the queries saved in the main index and returns the names
 * of the queries it matches.
 *
 * @param item the item to test
 * @return the identifiers of the matching saved queries; empty if none match or if the item
 *         could not be serialized (the error is logged)
 */
@Override
public List<String> getMatchingSavedQueries(final Item item) {
    return new InClassLoaderExecute<List<String>>() {
        @Override
        protected List<String> execute(Object... args) {
            final List<String> queryIds = new ArrayList<String>();
            try {
                final String json = CustomObjectMapper.getObjectMapper().writeValueAsString(item);
                final String itemType = item.getItemType();

                final PercolateResponse percolated = client.preparePercolate()
                        .setIndices(indexName)
                        .setDocumentType(itemType)
                        .setSource("{doc:" + json + "}")
                        .execute().actionGet();

                // Each match id is the name under which the query was saved.
                final Iterator<PercolateResponse.Match> matches = percolated.iterator();
                while (matches.hasNext()) {
                    queryIds.add(matches.next().getId().string());
                }
            } catch (IOException e) {
                logger.error("Error getting matching saved queries for item=" + item, e);
            }
            return queryIds;
        }
    }.executeInClassLoader();
}
/**
 * Tells whether the item satisfies the condition.
 * <p>
 * The condition is first evaluated in memory. If the evaluator reports the condition as
 * unsupported, the check falls back to counting stored documents that have the item's id and
 * match the condition.
 *
 * @param query the condition to test
 * @param item  the item to test against
 * @return {@code true} if the item matches; {@code false} otherwise, or if the item type could
 *         not be read from the item class
 */
@Override
public boolean testMatch(Condition query, Item item) {
    try {
        return conditionEvaluatorDispatcher.eval(query, item);
    } catch (UnsupportedOperationException e) {
        logger.error("Eval not supported, continue with query", e);
    }

    try {
        final Class<? extends Item> itemClass = item.getClass();
        final String itemType = (String) itemClass.getField("ITEM_TYPE").get(null);

        final FilterBuilder sameId = FilterBuilders.idsFilter(itemType).ids(item.getItemId());
        final FilterBuilder matchesCondition = conditionESQueryBuilderDispatcher.buildFilter(query);
        final FilterBuilder both = FilterBuilders.andFilter(sameId, matchesCondition);

        return queryCount(both, itemType) > 0;
    } catch (IllegalAccessException | NoSuchFieldException e) {
        logger.error("Error getting query for item=" + item, e);
    }
    return false;
}
/**
 * Returns all items of the given class matching the condition.
 *
 * @param query  the condition to match
 * @param sortBy comma separated sort specification, or {@code null}
 * @param clazz  the item class
 * @return the matching items
 */
@Override
public <T extends Item> List<T> query(final Condition query, String sortBy, final Class<T> clazz) {
    final PartialList<T> allMatches = query(query, sortBy, clazz, 0, -1);
    return allMatches.getList();
}
/**
 * Returns one page of the items of the given class matching the condition.
 *
 * @param query  the condition to match
 * @param sortBy comma separated sort specification, or {@code null}
 * @param clazz  the item class
 * @param offset index of the first item to return
 * @param size   maximum number of items, or -1 for no limit
 * @return the requested page
 */
@Override
public <T extends Item> PartialList<T> query(final Condition query, String sortBy, final Class<T> clazz, final int offset, final int size) {
    final QueryBuilder conditionQuery = conditionESQueryBuilderDispatcher.getQueryBuilder(query);
    final String[] noRouting = null;
    return query(conditionQuery, sortBy, clazz, offset, size, noRouting);
}
/**
 * Returns one page of the items matching both a full text search (on the "_all" field) and a
 * condition.
 *
 * @param fulltext the query string searched in "_all"
 * @param query    the condition that must also match
 * @param sortBy   comma separated sort specification, or {@code null}
 * @param clazz    the item class
 * @param offset   index of the first item to return
 * @param size     maximum number of items, or -1 for no limit
 * @return the requested page
 */
@Override
public <T extends Item> PartialList<T> queryFullText(final String fulltext, final Condition query, String sortBy, final Class<T> clazz, final int offset, final int size) {
    final BoolQueryBuilder combined = QueryBuilders.boolQuery();
    combined.must(QueryBuilders.queryStringQuery(fulltext).defaultField("_all"));
    combined.must(conditionESQueryBuilderDispatcher.getQueryBuilder(query));
    return query(combined, sortBy, clazz, offset, size, null);
}
/**
 * Returns all items of the given class whose field has the given value.
 *
 * @param fieldName  the field to match
 * @param fieldValue the expected value
 * @param sortBy     comma separated sort specification, or {@code null}
 * @param clazz      the item class
 * @return the matching items
 */
@Override
public <T extends Item> List<T> query(final String fieldName, final String fieldValue, String sortBy, final Class<T> clazz) {
    final PartialList<T> allMatches = query(fieldName, fieldValue, sortBy, clazz, 0, -1);
    return allMatches.getList();
}
/**
 * Returns all items of the given class whose field has one of the given values. The values are
 * ASCII-folded before being matched.
 *
 * @param fieldName   the field to match
 * @param fieldValues the accepted values
 * @param sortBy      comma separated sort specification, or {@code null}
 * @param clazz       the item class
 * @return the matching items
 */
@Override
public <T extends Item> List<T> query(final String fieldName, final String[] fieldValues, String sortBy, final Class<T> clazz) {
    final QueryBuilder anyOfValues = QueryBuilders.termsQuery(fieldName, ConditionContextHelper.foldToASCII(fieldValues));
    final String[] routing = getRouting(fieldName, fieldValues, clazz);
    final PartialList<T> allMatches = query(anyOfValues, sortBy, clazz, 0, -1, routing);
    return allMatches.getList();
}
/**
 * Returns one page of the items of the given class whose field has the given value. The value
 * is ASCII-folded before being matched.
 *
 * @param fieldName  the field to match
 * @param fieldValue the expected value
 * @param sortBy     comma separated sort specification, or {@code null}
 * @param clazz      the item class
 * @param offset     index of the first item to return
 * @param size       maximum number of items, or -1 for no limit
 * @return the requested page
 */
@Override
public <T extends Item> PartialList<T> query(String fieldName, String fieldValue, String sortBy, Class<T> clazz, int offset, int size) {
    final QueryBuilder exactValue = QueryBuilders.termQuery(fieldName, ConditionContextHelper.foldToASCII(fieldValue));
    final String[] routing = getRouting(fieldName, new String[]{fieldValue}, clazz);
    return query(exactValue, sortBy, clazz, offset, size, routing);
}
/**
 * Returns one page of the items matching both a full text search (on the "_all" field) and an
 * exact field value.
 *
 * @param fieldName  the field to match
 * @param fieldValue the expected value of the field
 * @param fulltext   the query string searched in "_all"
 * @param sortBy     comma separated sort specification, or {@code null}
 * @param clazz      the item class
 * @param offset     index of the first item to return
 * @param size       maximum number of items, or -1 for no limit
 * @return the requested page
 */
@Override
public <T extends Item> PartialList<T> queryFullText(String fieldName, String fieldValue, String fulltext, String sortBy, Class<T> clazz, int offset, int size) {
    final BoolQueryBuilder combined = QueryBuilders.boolQuery();
    combined.must(QueryBuilders.queryStringQuery(fulltext).defaultField("_all"));
    combined.must(QueryBuilders.termQuery(fieldName, fieldValue));
    final String[] routing = getRouting(fieldName, new String[]{fieldValue}, clazz);
    return query(combined, sortBy, clazz, offset, size, routing);
}
/**
 * Returns one page of the items of the given class matching a full text search on the "_all"
 * field.
 *
 * @param fulltext the query string searched in "_all"
 * @param sortBy   comma separated sort specification, or {@code null}
 * @param clazz    the item class
 * @param offset   index of the first item to return
 * @param size     maximum number of items, or -1 for no limit
 * @return the requested page
 */
@Override
public <T extends Item> PartialList<T> queryFullText(String fulltext, String sortBy, Class<T> clazz, int offset, int size) {
    final QueryBuilder fullTextQuery = QueryBuilders.queryStringQuery(fulltext).defaultField("_all");
    final String[] routing = getRouting("_all", new String[]{fulltext}, clazz);
    return query(fullTextQuery, sortBy, clazz, offset, size, routing);
}
/**
 * Returns one page of the items of the given class whose field lies between two bounds.
 *
 * @param fieldName the field to test
 * @param from      the lower bound
 * @param to        the upper bound
 * @param sortBy    comma separated sort specification, or {@code null}
 * @param clazz     the item class
 * @param offset    index of the first item to return
 * @param size      maximum number of items, or -1 for no limit
 * @return the requested page
 */
@Override
public <T extends Item> PartialList<T> rangeQuery(String fieldName, String from, String to, String sortBy, Class<T> clazz, int offset, int size) {
    final RangeQueryBuilder between = QueryBuilders.rangeQuery(fieldName).from(from).to(to);
    return query(between, sortBy, clazz, offset, size, null);
}
/**
 * Counts the items of the given type matching the condition.
 *
 * @param query    the condition to match
 * @param itemType the item type name
 * @return the number of matching documents
 */
@Override
public long queryCount(Condition query, String itemType) {
    final FilterBuilder conditionFilter = conditionESQueryBuilderDispatcher.buildFilter(query);
    return queryCount(conditionFilter, itemType);
}
/**
 * Counts the documents of the given type accepted by the filter. The search is a COUNT-type match-all request
 * carrying a single filter aggregation named "filter"; the result is that aggregation's document count.
 *
 * @param filter   the Elasticsearch filter to apply
 * @param itemType the item type to count
 * @return the document count of the filter aggregation
 */
private long queryCount(final FilterBuilder filter, final String itemType) {
    return new InClassLoaderExecute<Long>() {
        @Override
        protected Long execute(Object... args) {
            final SearchRequestBuilder countRequest = client.prepareSearch(getIndexNameForQuery(itemType))
                    .setTypes(itemType)
                    .setSearchType(SearchType.COUNT)
                    .setQuery(QueryBuilders.matchAllQuery())
                    .addAggregation(AggregationBuilders.filter("filter").filter(filter));
            final SearchResponse countResponse = countRequest.execute().actionGet();
            final Filter filteredDocs = countResponse.getAggregations().get("filter");
            return filteredDocs.getDocCount();
        }
    }.executeInClassLoader();
}
/**
 * Executes the given Elasticsearch query for the item type declared by {@code clazz} and deserializes every hit
 * into an instance of that class, setting the item id from the hit id.
 * <p>
 * {@code sortBy} is a comma-separated list of sort elements. Each element is either
 * {@code geo:<field>:<lat>:<lon>[:desc]} (distance sort in kilometers, ascending unless {@code desc} is given) or
 * a field name optionally suffixed with {@code :asc} or {@code :desc} (ascending when there is no suffix).
 * <p>
 * Any exception raised while building, executing or reading the search is logged and not rethrown; the returned
 * list then contains whatever had been collected up to that point.
 *
 * @param query   the query to execute
 * @param sortBy  sort specification as described above, or {@code null} for no explicit sort
 * @param clazz   the item class to load
 * @param offset  index of the first hit to return
 * @param size    number of hits to return, or -1 to request {@code Integer.MAX_VALUE} hits
 * @param routing routing values to set on the request, or {@code null} for none
 * @return the loaded items together with the requested offset, the requested size and the total hit count
 */
private <T extends Item> PartialList<T> query(final QueryBuilder query, final String sortBy, final Class<T> clazz, final int offset, final int size, final String[] routing) {
    return new InClassLoaderExecute<PartialList<T>>() {
        @Override
        protected PartialList<T> execute(Object... args) {
            List<T> results = new ArrayList<T>();
            long totalHits = 0;
            try {
                String itemType = getItemType(clazz);
                SearchRequestBuilder requestBuilder = client.prepareSearch(getIndexNameForQuery(itemType))
                        .setTypes(itemType)
                        .setFetchSource(true)
                        .setQuery(query)
                        .setFrom(offset)
                        // -1 is the "no limit" marker used by the unpaged overloads
                        .setSize(size != -1 ? size : Integer.MAX_VALUE);
                if (routing != null) {
                    requestBuilder.setRouting(routing);
                }
                if (sortBy != null) {
                    for (String sortByElement : sortBy.split(",")) {
                        if (sortByElement.startsWith("geo:")) {
                            // geo:<field>:<lat>:<lon>[:desc]
                            String[] elements = sortByElement.split(":");
                            GeoDistanceSortBuilder distanceSortBuilder = SortBuilders.geoDistanceSort(elements[1])
                                    .point(Double.parseDouble(elements[2]), Double.parseDouble(elements[3]))
                                    .unit(DistanceUnit.KILOMETERS);
                            boolean descending = elements.length > 4 && elements[4].equals("desc");
                            requestBuilder = requestBuilder.addSort(distanceSortBuilder.order(descending ? SortOrder.DESC : SortOrder.ASC));
                        } else if (sortByElement.endsWith(":desc")) {
                            requestBuilder = requestBuilder.addSort(sortByElement.substring(0, sortByElement.length() - ":desc".length()), SortOrder.DESC);
                        } else if (sortByElement.endsWith(":asc")) {
                            requestBuilder = requestBuilder.addSort(sortByElement.substring(0, sortByElement.length() - ":asc".length()), SortOrder.ASC);
                        } else {
                            requestBuilder = requestBuilder.addSort(sortByElement, SortOrder.ASC);
                        }
                    }
                }
                SearchResponse response = requestBuilder
                        .execute()
                        .actionGet();
                SearchHits searchHits = response.getHits();
                totalHits = searchHits.getTotalHits();
                for (SearchHit searchHit : searchHits) {
                    String sourceAsString = searchHit.getSourceAsString();
                    final T value = CustomObjectMapper.getObjectMapper().readValue(sourceAsString, clazz);
                    value.setItemId(searchHit.getId());
                    results.add(value);
                }
            } catch (Exception t) {
                // Keep the class name and the query apart in the message so both stay readable in the log.
                logger.error("Error loading itemType=" + clazz.getName() + " query=" + query, t);
            }
            return new PartialList<T>(results, offset, size, totalHits);
        }
    }.executeInClassLoader();
}
/**
 * Computes document counts for the given item type, optionally bucketed by {@code aggregate} and optionally
 * restricted by {@code filter}.
 * <p>
 * The returned map (insertion-ordered) may contain the following keys:
 * <ul>
 * <li>{@code _all}: the document count of the global aggregation;</li>
 * <li>{@code _filtered}: the document count of the filter aggregation, present only when a filter was given;</li>
 * <li>one entry per bucket key with that bucket's document count, when an aggregate was given;</li>
 * <li>{@code _missing}: the document count of the "missing" aggregation on the aggregate's field, only when it
 * is greater than zero.</li>
 * </ul>
 *
 * @param filter    condition restricting the aggregated documents, or {@code null} for none
 * @param aggregate bucket definition (date histogram, numeric/date/IP ranges, or terms by default), or
 *                  {@code null} for no bucketing
 * @param itemType  the item type to aggregate on
 * @return the counts described above; empty if the response carries no aggregations
 */
@Override
public Map<String, Long> aggregateQuery(final Condition filter, final BaseAggregate aggregate, final String itemType) {
return new InClassLoaderExecute<Map<String, Long>>() {
@Override
protected Map<String, Long> execute(Object... args) {
Map<String, Long> results = new LinkedHashMap<String, Long>();
// COUNT search type with a match-all query: only the aggregations of the response are read below.
SearchRequestBuilder builder = client.prepareSearch(getIndexNameForQuery(itemType))
.setTypes(itemType)
.setSearchType(SearchType.COUNT)
.setQuery(QueryBuilders.matchAllQuery());
// Innermost aggregations built so far; each following stage nests them as its sub-aggregations.
List<AggregationBuilder> lastAggregation = new ArrayList<AggregationBuilder>();
if (aggregate != null) {
AggregationBuilder bucketsAggregation = null;
// Pick the bucket aggregation (always named "buckets") matching the concrete aggregate type.
if (aggregate instanceof DateAggregate) {
DateAggregate dateAggregate = (DateAggregate) aggregate;
DateHistogramBuilder dateHistogramBuilder = AggregationBuilders.dateHistogram("buckets").field(aggregate.getField()).interval(new DateHistogram.Interval((dateAggregate.getInterval())));
if (dateAggregate.getFormat() != null) {
dateHistogramBuilder.format(dateAggregate.getFormat());
}
bucketsAggregation = dateHistogramBuilder;
} else if (aggregate instanceof NumericRangeAggregate) {
RangeBuilder rangebuilder = AggregationBuilders.range("buckets").field(aggregate.getField());
for (NumericRange range : ((NumericRangeAggregate) aggregate).getRanges()) {
if (range != null) {
// Bounded, lower-bounded only or upper-bounded only; a range with neither bound is skipped.
if (range.getFrom() != null && range.getTo() != null) {
rangebuilder.addRange(range.getKey(), range.getFrom(), range.getTo());
} else if (range.getFrom() != null) {
rangebuilder.addUnboundedFrom(range.getKey(), range.getFrom());
} else if (range.getTo() != null) {
rangebuilder.addUnboundedTo(range.getKey(), range.getTo());
}
}
}
bucketsAggregation = rangebuilder;
} else if (aggregate instanceof DateRangeAggregate) {
DateRangeAggregate dateRangeAggregate = (DateRangeAggregate) aggregate;
DateRangeBuilder rangebuilder = AggregationBuilders.dateRange("buckets").field(aggregate.getField());
if (dateRangeAggregate.getFormat() != null) {
rangebuilder.format(dateRangeAggregate.getFormat());
}
for (DateRange range : dateRangeAggregate.getDateRanges()) {
if (range != null) {
rangebuilder.addRange(range.getKey(), range.getFrom(), range.getTo());
}
}
bucketsAggregation = rangebuilder;
} else if (aggregate instanceof IpRangeAggregate) {
IpRangeAggregate ipRangeAggregate = (IpRangeAggregate) aggregate;
IPv4RangeBuilder rangebuilder = AggregationBuilders.ipRange("buckets").field(aggregate.getField());
for (IpRange range : ipRangeAggregate.getRanges()) {
if (range != null) {
rangebuilder.addRange(range.getKey(), range.getFrom(), range.getTo());
}
}
bucketsAggregation = rangebuilder;
} else {
// default: terms aggregation on the aggregate's field, requesting up to Integer.MAX_VALUE buckets
bucketsAggregation = AggregationBuilders.terms("buckets").field(aggregate.getField()).size(Integer.MAX_VALUE);
}
if (bucketsAggregation != null) {
// A sibling "missing" aggregation on the same field counts documents that have no value for it.
final MissingBuilder missingBucketsAggregation = AggregationBuilders.missing("missing").field(aggregate.getField());
for (AggregationBuilder aggregationBuilder : lastAggregation) {
bucketsAggregation.subAggregation(aggregationBuilder);
missingBucketsAggregation.subAggregation(aggregationBuilder);
}
lastAggregation = Arrays.asList(bucketsAggregation, missingBucketsAggregation);
}
}
if (filter != null) {
// Wrap whatever was built so far inside a filter aggregation named "filter".
AggregationBuilder filterAggregation = AggregationBuilders.filter("filter").filter(conditionESQueryBuilderDispatcher.buildFilter(filter));
for (AggregationBuilder aggregationBuilder : lastAggregation) {
filterAggregation.subAggregation(aggregationBuilder);
}
lastAggregation = Collections.singletonList(filterAggregation);
}
// Everything is finally nested under a single "global" aggregation, the only one added to the request.
AggregationBuilder globalAggregation = AggregationBuilders.global("global");
for (AggregationBuilder aggregationBuilder : lastAggregation) {
globalAggregation.subAggregation(aggregationBuilder);
}
builder.addAggregation(globalAggregation);
SearchResponse response = builder.execute().actionGet();
Aggregations aggregations = response.getAggregations();
if (aggregations != null) {
// Unwrap the response in the same order the request was nested: global -> filter (optional) -> buckets/missing.
Global globalAgg = aggregations.get("global");
results.put("_all", globalAgg.getDocCount());
aggregations = globalAgg.getAggregations();
if (aggregations.get("filter") != null) {
Filter filterAgg = aggregations.get("filter");
results.put("_filtered", filterAgg.getDocCount());
aggregations = filterAgg.getAggregations();
}
if (aggregations.get("buckets") != null) {
MultiBucketsAggregation terms = aggregations.get("buckets");
for (MultiBucketsAggregation.Bucket bucket : terms.getBuckets()) {
results.put(bucket.getKey(), bucket.getDocCount());
}
// "missing" is always requested together with "buckets" (see above), so it is read without a null check.
SingleBucketAggregation missing = aggregations.get("missing");
if (missing.getDocCount() > 0) {
results.put("_missing", missing.getDocCount());
}
}
}
return results;
}
}.executeInClassLoader();
}
/**
 * Reads the value of the public static {@code ITEM_TYPE} field of the given item class via reflection.
 *
 * @param clazz the item class
 * @return the item type, or {@code null} (after logging an error) if the field does not exist or is not accessible
 */
private <T extends Item> String getItemType(Class<T> clazz) {
    String itemType = null;
    try {
        itemType = (String) clazz.getField("ITEM_TYPE").get(null);
    } catch (NoSuchFieldException e) {
        logger.error("Class " + clazz.getName() + " doesn't define a publicly accessible ITEM_TYPE field", e);
    } catch (IllegalAccessException e) {
        logger.error("Error loading itemType=" + clazz.getName(), e);
    }
    return itemType;
}
/**
 * Determines the routing values to use for a search on the given field.
 *
 * @param fieldName   the field being queried
 * @param fieldValues the queried values
 * @param clazz       the item class being queried
 * @return {@code fieldValues} when {@code routingByType} maps the class's item type to {@code fieldName},
 *         {@code null} otherwise
 */
private <T extends Item> String[] getRouting(String fieldName, String[] fieldValues, Class<T> clazz) {
    final String itemType = getItemType(clazz);
    final boolean routedByThisField = routingByType.containsKey(itemType) && routingByType.get(itemType).equals(fieldName);
    return routedByThisField ? fieldValues : null;
}
/**
 * Lists the Elasticsearch nodes that declare a {@code node.contextserver.address} setting, described by their
 * host name, addresses, ports and master/data roles, and completed with CPU load, load average and uptime when
 * the corresponding node statistics are available.
 *
 * @return the cluster nodes, in the order the node information was returned
 */
@Override
public List<ClusterNode> getClusterNodes() {
    return new InClassLoaderExecute<List<ClusterNode>>() {
        @Override
        protected List<ClusterNode> execute(Object... args) {
            final Map<String, ClusterNode> nodesById = new LinkedHashMap<String, ClusterNode>();

            // First pass: node settings give the static description of every context server node.
            final NodesInfoResponse infoResponse = client.admin().cluster().prepareNodesInfo(NodesOperationRequest.ALL_NODES)
                    .setSettings(true)
                    .execute()
                    .actionGet();
            for (NodeInfo info : infoResponse.getNodes()) {
                final String contextServerAddress = info.getSettings().get("node.contextserver.address");
                if (contextServerAddress == null) {
                    continue;
                }
                final ClusterNode described = new ClusterNode();
                described.setHostName(info.getHostname());
                described.setHostAddress(contextServerAddress);
                described.setPublicPort(Integer.parseInt(info.getSettings().get("node.contextserver.port")));
                described.setSecureHostAddress(info.getSettings().get("node.contextserver.secureAddress"));
                described.setSecurePort(Integer.parseInt(info.getSettings().get("node.contextserver.securePort")));
                described.setMaster(info.getNode().isMasterNode());
                described.setData(info.getNode().isDataNode());
                nodesById.put(info.getNode().getId(), described);
            }

            // Second pass: OS and process statistics complete the nodes registered above.
            final NodesStatsResponse statsResponse = client.admin().cluster().prepareNodesStats(NodesOperationRequest.ALL_NODES)
                    .setOs(true)
                    .setProcess(true)
                    .execute()
                    .actionGet();
            for (NodeStats stats : statsResponse.getNodes()) {
                final ClusterNode known = nodesById.get(stats.getNode().getId());
                if (known == null) {
                    continue;
                }
                // The statistics below may be null when Sigar didn't initialize properly, for example because
                // the native libraries were not installed, or because the OSGi bundle was redeployed: Sigar then
                // tries to reload the native libraries, generates an error and doesn't initialize properly.
                if (stats.getProcess() != null && stats.getProcess().getCpu() != null) {
                    known.setCpuLoad(stats.getProcess().getCpu().getPercent());
                }
                if (stats.getOs() != null) {
                    known.setLoadAverage(stats.getOs().getLoadAverage());
                    known.setUptime(stats.getOs().getUptime().getMillis());
                }
            }
            return new ArrayList<ClusterNode>(nodesById.values());
        }
    }.executeInClassLoader();
}
/**
 * Issues a refresh request built with no explicit index names and waits for it to complete.
 */
@Override
public void refresh() {
    new InClassLoaderExecute<Boolean>() {
        @Override
        protected Boolean execute(Object... args) {
            client.admin().indices().refresh(Requests.refreshRequest()).actionGet();
            return Boolean.TRUE;
        }
    }.executeInClassLoader();
}
/**
 * Deletes the indices named {@code <indexName>-<suffix>} whose suffix, parsed with the {@code yyyy-MM} pattern,
 * denotes a date strictly before the given one. Index names whose suffix cannot be parsed are logged and left
 * untouched.
 *
 * @param date the cut-off date
 */
@Override
public void purge(final Date date) {
    new InClassLoaderExecute<Object>() {
        @Override
        protected Object execute(Object... args) {
            // Only the index names are needed, so every optional statistic is switched off.
            final IndicesStatsResponse datedIndicesStats = client.admin().indices().prepareStats(indexName + "-*")
                    .setIndexing(false)
                    .setGet(false)
                    .setSearch(false)
                    .setWarmer(false)
                    .setMerge(false)
                    .setFieldData(false)
                    .setFlush(false)
                    .setCompletion(false)
                    .setRefresh(false)
                    .setSuggest(false)
                    .execute()
                    .actionGet();

            final SimpleDateFormat monthFormat = new SimpleDateFormat("yyyy-MM");
            final List<String> expiredIndices = new ArrayList<String>();
            for (String candidate : datedIndicesStats.getIndices().keySet()) {
                if (!candidate.startsWith(indexName + "-")) {
                    continue;
                }
                try {
                    final Date indexMonth = monthFormat.parse(candidate.substring(indexName.length() + 1));
                    if (indexMonth.before(date)) {
                        expiredIndices.add(candidate);
                    }
                } catch (ParseException e) {
                    logger.error("Cannot parse index name " + candidate, e);
                }
            }

            if (expiredIndices.isEmpty()) {
                return null;
            }
            client.admin().indices().prepareDelete(expiredIndices.toArray(new String[expiredIndices.size()])).execute().actionGet();
            return null;
        }
    }.executeInClassLoader();
}
/**
 * Deletes every document whose {@code scope} field matches the given scope (after ASCII folding), across all
 * indices whose name starts with {@code indexName}.
 * <p>
 * Matching documents are collected with a scan/scroll search (size 100, one-hour keep-alive) and then removed
 * with a single bulk request. Bulk failures do not raise an exception; they are reported at error level so that
 * an incomplete purge is visible in the logs.
 *
 * @param scope the scope whose documents must be removed
 */
@Override
public void purge(final String scope) {
    new InClassLoaderExecute<Void>() {
        @Override
        protected Void execute(Object... args) {
            QueryBuilder query = QueryBuilders.termQuery("scope", ConditionContextHelper.foldToASCII(scope));
            BulkRequestBuilder deleteByScope = client.prepareBulk();
            final TimeValue keepAlive = TimeValue.timeValueHours(1);
            SearchResponse response = client.prepareSearch(indexName + "*")
                    .setSearchType(SearchType.SCAN)
                    .setScroll(keepAlive)
                    .setQuery(query)
                    .setSize(100).execute().actionGet();

            // Queue the hits of the current page for deletion, then fetch the next page; stop on the first
            // page that comes back empty.
            do {
                for (SearchHit hit : response.getHits().getHits()) {
                    deleteByScope.add(Requests.deleteRequest(hit.index()).type(hit.type()).id(hit.id()));
                }
                response = client.prepareSearchScroll(response.getScrollId()).setScroll(keepAlive).execute().actionGet();
            } while (response.getHits().getHits().length != 0);

            // Scrolling is over: run the bulk delete, if there is anything to delete.
            if (deleteByScope.numberOfActions() > 0) {
                final BulkResponse deleteResponse = deleteByScope.get();
                if (deleteResponse.hasFailures()) {
                    // A failed delete leaves documents of the purged scope behind, so it must not be hidden
                    // at debug level.
                    logger.error("Couldn't delete from scope {}:\n{}", scope, deleteResponse.buildFailureMessage());
                }
            }
            return null;
        }
    }.executeInClassLoader();
}
/**
 * Computes single-value metrics of {@code field} over the documents of the given type that satisfy the condition.
 * <p>
 * Supported metric names are {@code sum}, {@code avg}, {@code min} and {@code max}; any other name is ignored.
 * Each computed metric is returned under its name prefixed with an underscore (for example {@code _sum}).
 *
 * @param condition the condition selecting the documents, translated to a filter aggregation named "metrics"
 * @param metrics   the names of the metrics to compute, or {@code null} for none
 * @param field     the field the metrics are computed on
 * @param itemType  the item type to search
 * @return the metric values keyed by prefixed metric name; empty when the response carries no usable aggregations
 */
@Override
public Map<String, Double> getSingleValuesMetrics(final Condition condition, final String[] metrics, final String field, final String itemType) {
    return new InClassLoaderExecute<Map<String, Double>>() {
        @Override
        protected Map<String, Double> execute(Object... args) {
            final Map<String, Double> metricValues = new LinkedHashMap<String, Double>();

            final SearchRequestBuilder request = client.prepareSearch(getIndexNameForQuery(itemType))
                    .setTypes(itemType)
                    .setSearchType(SearchType.COUNT)
                    .setQuery(QueryBuilders.matchAllQuery());
            final AggregationBuilder metricsAggregation = AggregationBuilders.filter("metrics").filter(conditionESQueryBuilderDispatcher.buildFilter(condition));
            if (metrics != null) {
                // One sub-aggregation per requested metric, named after the metric itself.
                for (String metric : metrics) {
                    switch (metric) {
                        case "sum":
                            metricsAggregation.subAggregation(AggregationBuilders.sum("sum").field(field));
                            break;
                        case "avg":
                            metricsAggregation.subAggregation(AggregationBuilders.avg("avg").field(field));
                            break;
                        case "min":
                            metricsAggregation.subAggregation(AggregationBuilders.min("min").field(field));
                            break;
                        case "max":
                            metricsAggregation.subAggregation(AggregationBuilders.max("max").field(field));
                            break;
                    }
                }
            }
            request.addAggregation(metricsAggregation);

            final SearchResponse response = request.execute().actionGet();
            final Aggregations topLevel = response.getAggregations();
            if (topLevel == null) {
                return metricValues;
            }
            final Aggregation metricsResult = topLevel.get("metrics");
            if (!(metricsResult instanceof HasAggregations)) {
                return metricValues;
            }
            for (Aggregation computed : ((HasAggregations) metricsResult).getAggregations()) {
                final InternalNumericMetricsAggregation.SingleValue single = (InternalNumericMetricsAggregation.SingleValue) computed;
                metricValues.put("_" + single.getName(), single.value());
            }
            return metricValues;
        }
    }.executeInClassLoader();
}
/**
 * Resolves the index name (or index pattern) to search for the given item type.
 *
 * @param itemType the item type being queried
 * @return the entry of {@code indexNames} for this type if there is one; otherwise {@code <indexName>-*} when
 *         the type is listed in {@code itemsMonthlyIndexed}, or plain {@code indexName} when it is not
 */
private String getIndexNameForQuery(String itemType) {
    if (indexNames.containsKey(itemType)) {
        return indexNames.get(itemType);
    }
    if (itemsMonthlyIndexed.contains(itemType)) {
        return indexName + "-*";
    }
    return indexName;
}
/**
 * Template for running a piece of code with the thread context class loader temporarily switched to the class
 * loader of the concrete subclass. The previous context class loader is restored afterwards, whether the code
 * returns normally or throws.
 *
 * @param <T> the type of the value produced by the executed code
 */
public abstract static class InClassLoaderExecute<T> {

    /**
     * The code to run while the context class loader is switched.
     *
     * @param args the arguments given to {@link #executeInClassLoader(Object...)}
     * @return the result to hand back to the caller
     */
    protected abstract T execute(Object... args);

    /**
     * Runs {@link #execute(Object...)} with the context class loader of the current thread set to the class
     * loader of this object's class, then puts the previous context class loader back.
     *
     * @param args arguments forwarded unchanged to {@link #execute(Object...)}
     * @return whatever {@link #execute(Object...)} returned
     */
    public T executeInClassLoader(Object... args) {
        final Thread currentThread = Thread.currentThread();
        final ClassLoader previousLoader = currentThread.getContextClassLoader();
        try {
            currentThread.setContextClassLoader(getClass().getClassLoader());
            return execute(args);
        } finally {
            currentThread.setContextClassLoader(previousLoader);
        }
    }
}
}