blob: c92b2991f8050247440190556fba514fbb6a46e0 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. The ASF licenses this file to You
* under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License. For additional information regarding
* copyright in this work, please see the NOTICE file in the top level
* directory of this distribution.
*/
package org.apache.usergrid.persistence.index.impl;
import com.codahale.metrics.Meter;
import com.codahale.metrics.Timer;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
import com.yammer.metrics.core.Clock;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.usergrid.persistence.core.future.BetterFuture;
import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.core.util.Health;
import org.apache.usergrid.persistence.core.util.ValidationUtils;
import org.apache.usergrid.persistence.index.*;
import org.apache.usergrid.persistence.index.exceptions.IndexException;
import org.apache.usergrid.persistence.index.query.CandidateResult;
import org.apache.usergrid.persistence.index.query.CandidateResults;
import org.apache.usergrid.persistence.index.query.Query;
import org.apache.usergrid.persistence.index.utils.UUIDUtils;
import org.apache.usergrid.persistence.map.MapManager;
import org.apache.usergrid.persistence.map.MapManagerFactory;
import org.apache.usergrid.persistence.map.MapScope;
import org.apache.usergrid.persistence.map.impl.MapScopeImpl;
import org.apache.usergrid.persistence.model.entity.Id;
import org.apache.usergrid.persistence.model.entity.SimpleId;
import org.apache.usergrid.persistence.model.util.UUIDGenerator;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ListenableActionFuture;
import org.elasticsearch.action.ShardOperationFailedException;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.tasks.PendingClusterTasksRequest;
import org.elasticsearch.action.admin.cluster.tasks.PendingClusterTasksResponse;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse;
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateResponse;
import org.elasticsearch.action.deletebyquery.DeleteByQueryResponse;
import org.elasticsearch.action.deletebyquery.IndexDeleteByQueryResponse;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequestBuilder;
import org.elasticsearch.client.AdminClient;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.query.*;
import org.elasticsearch.indices.IndexAlreadyExistsException;
import org.elasticsearch.indices.IndexMissingException;
import org.elasticsearch.indices.InvalidAliasNameException;
import org.elasticsearch.rest.action.admin.indices.alias.delete.AliasesMissingException;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.sort.FieldSortBuilder;
import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.search.sort.SortOrder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.functions.Func1;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.apache.usergrid.persistence.index.impl.IndexingUtils.*;
/**
* Implements index using ElasticSearch Java API.
*/
public class EsEntityIndexImpl implements AliasedEntityIndex {
private static final Logger logger = LoggerFactory.getLogger( EsEntityIndexImpl.class );
private static final AtomicBoolean mappingsCreated = new AtomicBoolean( false );
public static final String DEFAULT_TYPE = "_default_";
private final IndexIdentifier.IndexAlias alias;
private final IndexIdentifier indexIdentifier;
private final IndexBufferProducer indexBatchBufferProducer;
private final IndexFig indexFig;
private final Timer addTimer;
private final Timer addWriteAliasTimer;
private final Timer addReadAliasTimer;
private final Timer searchTimer;
/**
* We purposefully make this per instance. Some indexes may work, while others may fail
*/
private FailureMonitor failureMonitor;
private final ApplicationScope applicationScope;
private final EsProvider esProvider;
private final int cursorTimeout;
private final IndexFig config;
private final MetricsFactory metricsFactory;
//number of times to wait for the index to refresh properly.
private static final int MAX_WAITS = 10;
//number of milliseconds to try again before sleeping
private static final int WAIT_TIME = 250;
private static final String VERIFY_TYPE = "verification";
private static final ImmutableMap<String, Object> DEFAULT_PAYLOAD =
ImmutableMap.<String, Object>builder().put( "field", "test" ).put(IndexingUtils.ENTITYID_ID_FIELDNAME, UUIDGenerator.newTimeUUID().toString()).build();
private static final MatchAllQueryBuilder MATCH_ALL_QUERY_BUILDER = QueryBuilders.matchAllQuery();
private EsIndexCache aliasCache;
private Timer removeAliasTimer;
private Timer mappingTimer;
private Timer refreshTimer;
private Timer cursorTimer;
private Timer getVersionsTimer;
private Timer allVersionsTimer;
private Timer deletePreviousTimer;
private final MapManager mapManager;
// private final Timer indexTimer;
@Inject
public EsEntityIndexImpl( @Assisted final ApplicationScope appScope, final IndexFig config,
final IndexBufferProducer indexBatchBufferProducer, final EsProvider provider,
final EsIndexCache indexCache, final MetricsFactory metricsFactory,
final MapManagerFactory mapManagerFactory, final IndexFig indexFig ) {
this.indexBatchBufferProducer = indexBatchBufferProducer;
this.indexFig = indexFig;
ValidationUtils.validateApplicationScope( appScope );
this.applicationScope = appScope;
this.esProvider = provider;
this.config = config;
this.cursorTimeout = config.getQueryCursorTimeout();
this.indexIdentifier = IndexingUtils.createIndexIdentifier(config, appScope);
this.alias = indexIdentifier.getAlias();
this.failureMonitor = new FailureMonitorImpl( config, provider );
this.aliasCache = indexCache;
this.metricsFactory = metricsFactory;
this.addTimer = metricsFactory
.getTimer( EsEntityIndexImpl.class, "es.entity.index.add.index.timer" );
this.removeAliasTimer = metricsFactory
.getTimer( EsEntityIndexImpl.class, "es.entity.index.remove.index.alias.timer" );
this.addReadAliasTimer = metricsFactory
.getTimer( EsEntityIndexImpl.class, "es.entity.index.add.read.alias.timer" );
this.addWriteAliasTimer = metricsFactory
.getTimer( EsEntityIndexImpl.class, "es.entity.index.add.write.alias.timer" );
this.mappingTimer = metricsFactory
.getTimer( EsEntityIndexImpl.class, "es.entity.index.create.mapping.timer" );
this.refreshTimer = metricsFactory
.getTimer( EsEntityIndexImpl.class, "es.entity.index.refresh.timer" );
this.searchTimer =metricsFactory
.getTimer( EsEntityIndexImpl.class, "es.entity.index.search.timer" );
this.cursorTimer = metricsFactory
.getTimer( EsEntityIndexImpl.class, "es.entity.index.search.cursor.timer" );
this.getVersionsTimer =metricsFactory
.getTimer( EsEntityIndexImpl.class, "es.entity.index.get.versions.timer" );
this.allVersionsTimer = metricsFactory
.getTimer( EsEntityIndexImpl.class, "es.entity.index.delete.all.versions.timer" );
this.deletePreviousTimer = metricsFactory
.getTimer( EsEntityIndexImpl.class, "es.entity.index.delete.previous.versions.timer" );
final MapScope mapScope = new MapScopeImpl( appScope.getApplication(), "cursorcache" );
mapManager = mapManagerFactory.createMapManager( mapScope );
}
@Override
public void initializeIndex() {
final int numberOfShards = config.getNumberOfShards();
final int numberOfReplicas = config.getNumberOfReplicas();
addIndex(null, numberOfShards, numberOfReplicas,config.getWriteConsistencyLevel());
}
@Override
public void addIndex(final String indexSuffix,final int numberOfShards, final int numberOfReplicas, final String writeConsistency) {
String normalizedSuffix = StringUtils.isNotEmpty(indexSuffix) ? indexSuffix : null;
try {
//get index name with suffix attached
String indexName = indexIdentifier.getIndex(normalizedSuffix);
//Create index
try {
final AdminClient admin = esProvider.getClient().admin();
Settings settings = ImmutableSettings.settingsBuilder()
.put("index.number_of_shards", numberOfShards)
.put("index.number_of_replicas", numberOfReplicas)
.put("action.write_consistency", writeConsistency )
.build();
//Added For Graphite Metrics
Timer.Context timeNewIndexCreation = addTimer.time();
final CreateIndexResponse cir = admin.indices().prepareCreate(indexName)
.setSettings(settings)
.execute()
.actionGet();
timeNewIndexCreation.stop();
//create the mappings
createMappings( indexName );
//ONLY add the alias if we create the index, otherwise we're going to overwrite production settings
/**
* DO NOT MOVE THIS LINE OF CODE UNLESS YOU REALLY KNOW WHAT YOU'RE DOING!!!!
*/
//We do NOT want to create an alias if the index already exists, we'll overwrite the indexes that
//may have been set via other administrative endpoint
addAlias(normalizedSuffix);
testNewIndex();
logger.info("Created new Index Name [{}] ACK=[{}]", indexName, cir.isAcknowledged());
} catch (IndexAlreadyExistsException e) {
logger.info("Index Name [{}] already exists", indexName);
}
} catch (IndexAlreadyExistsException expected) {
// this is expected to happen if index already exists, it's a no-op and swallow
} catch (IOException e) {
throw new RuntimeException("Unable to initialize index", e);
}
}
@Override
public void addAlias(final String indexSuffix) {
try {
Boolean isAck;
String indexName = indexIdentifier.getIndex(indexSuffix);
final AdminClient adminClient = esProvider.getClient().admin();
String[] indexNames = getIndexes(AliasType.Write);
for ( String currentIndex : indexNames ) {
final Timer.Context timeRemoveAlias = removeAliasTimer.time();
try {
//Added For Graphite Metrics
isAck = adminClient.indices().prepareAliases().removeAlias( currentIndex, alias.getWriteAlias() )
.execute().actionGet().isAcknowledged();
logger.info( "Removed Index Name [{}] from Alias=[{}] ACK=[{}]", currentIndex, alias, isAck );
}
catch ( AliasesMissingException aie ) {
logger.info( "Alias does not exist Index Name [{}] from Alias=[{}] ACK=[{}]", currentIndex, alias,
aie.getMessage() );
continue;
}
catch ( InvalidAliasNameException iane ) {
logger.info( "Alias does not exist Index Name [{}] from Alias=[{}] ACK=[{}]", currentIndex, alias,
iane.getMessage() );
continue;
}
finally {
timeRemoveAlias.stop();
}
}
//Added For Graphite Metrics
Timer.Context timeAddReadAlias = addReadAliasTimer.time();
// add read alias
isAck = adminClient.indices().prepareAliases().addAlias(
indexName, alias.getReadAlias()).execute().actionGet().isAcknowledged();
timeAddReadAlias.stop();
logger.info("Created new read Alias Name [{}] ACK=[{}]", alias.getReadAlias(), isAck);
//Added For Graphite Metrics
Timer.Context timeAddWriteAlias = addWriteAliasTimer.time();
//add write alias
isAck = adminClient.indices().prepareAliases().addAlias(
indexName, alias.getWriteAlias()).execute().actionGet().isAcknowledged();
timeAddWriteAlias.stop();
logger.info("Created new write Alias Name [{}] ACK=[{}]", alias.getWriteAlias(), isAck);
aliasCache.invalidate(alias);
} catch (Exception e) {
logger.warn("Failed to create alias ", e);
}
}
@Override
public String[] getIndexes(final AliasType aliasType) {
return aliasCache.getIndexes(alias, aliasType);
}
/**
* Tests writing a document to a new index to ensure it's working correctly. See this post:
* http://s.apache.org/index-missing-exception
*/
private void testNewIndex() {
// create the document, this ensures the index is ready
// Immediately create a document and remove it to ensure the entire cluster is ready
// to receive documents. Occasionally we see errors.
// See this post: http://s.apache.org/index-missing-exception
logger.debug( "Testing new index name: read {} write {}", alias.getReadAlias(), alias.getWriteAlias());
final RetryOperation retryOperation = new RetryOperation() {
@Override
public boolean doOp() {
final String tempId = UUIDGenerator.newTimeUUID().toString();
esProvider.getClient().prepareIndex( alias.getWriteAlias(), VERIFY_TYPE, tempId )
.setSource(DEFAULT_PAYLOAD).get();
logger.info( "Successfully created new document with docId {} "
+ "in index read {} write {} and type {}",
tempId, alias.getReadAlias(), alias.getWriteAlias(), VERIFY_TYPE );
// delete all types, this way if we miss one it will get cleaned up
esProvider.getClient().prepareDeleteByQuery( alias.getWriteAlias())
.setTypes(VERIFY_TYPE)
.setQuery(MATCH_ALL_QUERY_BUILDER).get();
logger.info( "Successfully deleted all documents in index {} read {} write {} and type {}",
alias.getReadAlias(), alias.getWriteAlias(), VERIFY_TYPE );
return true;
}
};
doInRetry( retryOperation );
}
/**
* Setup ElasticSearch type mappings as a template that applies to all new indexes.
* Applies to all indexes that* start with our prefix.
*/
private void createMappings(final String indexName) throws IOException {
XContentBuilder xcb = IndexingUtils.createDoubleStringIndexMapping(
XContentFactory.jsonBuilder(), DEFAULT_TYPE );
//Added For Graphite Metrics
Timer.Context timePutIndex = mappingTimer.time();
PutMappingResponse pitr = esProvider.getClient().admin().indices().preparePutMapping( indexName ).setType(
DEFAULT_TYPE ).setSource( xcb ).execute().actionGet();
timePutIndex.stop();
if ( !pitr.isAcknowledged() ) {
throw new IndexException( "Unable to create default mappings" );
}
}
@Override
public EntityIndexBatch createBatch() {
EntityIndexBatch batch = new EsEntityIndexBatchImpl(
applicationScope, esProvider.getClient(),indexBatchBufferProducer, config, this, metricsFactory );
return batch;
}
@Override
public CandidateResults search( final IndexScope indexScope, final SearchTypes searchTypes,
final Query query ) {
final String context = IndexingUtils.createContextName(indexScope);
final String[] entityTypes = searchTypes.getTypeNames();
QueryBuilder qb = query.createQueryBuilder(context);
SearchResponse searchResponse;
if ( query.getCursor() == null ) {
SearchRequestBuilder srb = esProvider.getClient().prepareSearch( alias.getReadAlias() )
.setTypes(entityTypes)
.setScroll(cursorTimeout + "m")
.setQuery(qb);
final FilterBuilder fb = query.createFilterBuilder();
//we have post filters, apply them
if ( fb != null ) {
logger.debug( " Filter: {} ", fb.toString() );
srb = srb.setPostFilter( fb );
}
srb = srb.setFrom( 0 ).setSize( query.getLimit() );
for ( Query.SortPredicate sp : query.getSortPredicates() ) {
final SortOrder order;
if ( sp.getDirection().equals( Query.SortDirection.ASCENDING ) ) {
order = SortOrder.ASC;
}
else {
order = SortOrder.DESC;
}
// we do not know the type of the "order by" property and so we do not know what
// type prefix to use. So, here we add an order by clause for every possible type
// that you can order by: string, number and boolean and we ask ElasticSearch
// to ignore any fields that are not present.
final String stringFieldName = STRING_PREFIX + sp.getPropertyName();
final FieldSortBuilder stringSort = SortBuilders.fieldSort( stringFieldName )
.order( order ).ignoreUnmapped( true );
srb.addSort( stringSort );
logger.debug( " Sort: {} order by {}", stringFieldName, order.toString() );
final String numberFieldName = NUMBER_PREFIX + sp.getPropertyName();
final FieldSortBuilder numberSort = SortBuilders.fieldSort( numberFieldName )
.order( order ).ignoreUnmapped( true );
srb.addSort( numberSort );
logger.debug( " Sort: {} order by {}", numberFieldName, order.toString() );
final String booleanFieldName = BOOLEAN_PREFIX + sp.getPropertyName();
final FieldSortBuilder booleanSort = SortBuilders.fieldSort( booleanFieldName )
.order( order ).ignoreUnmapped( true );
srb.addSort( booleanSort );
logger.debug( " Sort: {} order by {}", booleanFieldName, order.toString() );
}
if ( logger.isDebugEnabled() ) {
logger.debug( "Searching index (read alias): {}\n scope: {} \n type: {}\n query: {} ",
this.alias.getReadAlias(), context, entityTypes, srb );
}
try {
//Added For Graphite Metrics
Timer.Context timeSearch = searchTimer.time();
searchResponse = srb.execute().actionGet();
timeSearch.stop();
}
catch ( Throwable t ) {
logger.error( "Unable to communicate with Elasticsearch", t );
failureMonitor.fail( "Unable to execute batch", t );
throw t;
}
failureMonitor.success();
}
else {
String userCursorString = query.getCursor();
if ( userCursorString.startsWith( "\"" ) ) {
userCursorString = userCursorString.substring( 1 );
}
if ( userCursorString.endsWith( "\"" ) ) {
userCursorString = userCursorString.substring( 0, userCursorString.length() - 1 );
}
//now get the cursor from the map and validate
final String esScrollCursor = mapManager.getString( userCursorString );
Preconditions.checkArgument(esScrollCursor != null, "Could not find a cursor for the value '{}' ", esScrollCursor);
logger.debug( "Executing query with cursor: {} ", esScrollCursor );
SearchScrollRequestBuilder ssrb = esProvider.getClient()
.prepareSearchScroll(esScrollCursor).setScroll( cursorTimeout + "m" );
try {
//Added For Graphite Metrics
Timer.Context timeSearchCursor = cursorTimer.time();
searchResponse = ssrb.execute().actionGet();
timeSearchCursor.stop();
}
catch ( Throwable t ) {
logger.error( "Unable to communicate with elasticsearch", t );
failureMonitor.fail( "Unable to execute batch", t );
throw t;
}
failureMonitor.success();
}
return parseResults(searchResponse, query);
}
private CandidateResults parseResults( final SearchResponse searchResponse, final Query query ) {
final SearchHits searchHits = searchResponse.getHits();
final SearchHit[] hits = searchHits.getHits();
final int length = hits.length;
logger.debug(" Hit count: {} Total hits: {}", length, searchHits.getTotalHits());
List<CandidateResult> candidates = new ArrayList<>( length );
for ( SearchHit hit : hits ) {
String[] idparts = hit.getId().split( SPLITTER );
String id = idparts[0];
String type = idparts[1];
String version = idparts[2];
Id entityId = new SimpleId( UUID.fromString( id ), type );
candidates.add( new CandidateResult( entityId, UUID.fromString( version ) ) );
}
CandidateResults candidateResults = new CandidateResults( query, candidates );
if ( candidates.size() >= query.getLimit() ) {
//USERGRID-461 our cursor is getting too large, map it to a new time UUID
final String userCursorString = org.apache.usergrid.persistence.index.utils.StringUtils.sanitizeUUID( UUIDGenerator.newTimeUUID() );
final String esScrollCursor = searchResponse.getScrollId();
//now set this into our map module
final int minutes = indexFig.getQueryCursorTimeout();
//just truncate it, we'll never hit a long value anyway
mapManager.putString( userCursorString, esScrollCursor, ( int ) TimeUnit.MINUTES.toSeconds( minutes ) );
candidateResults.setCursor( userCursorString );
logger.debug(" User cursor = {}, Cursor = {} ", userCursorString, esScrollCursor);
}
return candidateResults;
}
public void refresh() {
BetterFuture future = indexBatchBufferProducer.put(new IndexOperationMessage());
future.get();
//loop through all batches and retrieve promises and call get
final RetryOperation retryOperation = new RetryOperation() {
@Override
public boolean doOp() {
try {
String[] indexes = ArrayUtils.addAll(
getIndexes(AliasType.Read),
getIndexes(AliasType.Write)
);
if ( indexes.length == 0 ) {
logger.debug( "Not refreshing indexes, none found for app {}",
applicationScope.getApplication().getUuid() );
return true;
}
//Added For Graphite Metrics
Timer.Context timeRefreshIndex = refreshTimer.time();
esProvider.getClient().admin().indices().prepareRefresh( indexes ).execute().actionGet();
timeRefreshIndex.stop();
logger.debug("Refreshed indexes: {}", StringUtils.join(indexes, ", "));
return true;
}
catch ( IndexMissingException e ) {
logger.error( "Unable to refresh index. Waiting before sleeping.", e );
throw e;
}
}
};
doInRetry( retryOperation );
}
@Override
public int getPendingTasks() {
final PendingClusterTasksResponse tasksResponse = esProvider.getClient().admin()
.cluster().pendingClusterTasks(new PendingClusterTasksRequest()).actionGet();
return tasksResponse.pendingTasks().size();
}
@Override
public CandidateResults getEntityVersions( final IndexScope scope, final Id id ) {
//since we don't have paging inputs, there's no point in executing a query for paging.
final String context = IndexingUtils.createContextName(scope);
final SearchTypes searchTypes = SearchTypes.fromTypes(id.getType());
final QueryBuilder queryBuilder =
QueryBuilders.termQuery( IndexingUtils.ENTITY_CONTEXT_FIELDNAME, context );
final SearchRequestBuilder srb = esProvider.getClient().prepareSearch( alias.getReadAlias() )
.setTypes(searchTypes.getTypeNames())
.setScroll(cursorTimeout + "m")
.setQuery(queryBuilder);
final SearchResponse searchResponse;
try {
//Added For Graphite Metrics
Timer.Context timeEntityIndex = getVersionsTimer.time();
searchResponse = srb.execute().actionGet();
timeEntityIndex.stop();
}
catch ( Throwable t ) {
logger.error( "Unable to communicate with elasticsearch" );
failureMonitor.fail( "Unable to execute batch", t );
throw t;
}
failureMonitor.success();
return parseResults(searchResponse, new Query());
}
@Override
public ListenableActionFuture deleteAllVersionsOfEntity(final Id entityId ) {
String idString = IndexingUtils.idString(entityId).toLowerCase();
final TermQueryBuilder tqb = QueryBuilders.termQuery(ENTITYID_ID_FIELDNAME, idString);
//Added For Graphite Metrics
final Timer.Context timeDeleteAllVersions =allVersionsTimer.time();
final ListenableActionFuture<DeleteByQueryResponse> response = esProvider.getClient()
.prepareDeleteByQuery( alias.getWriteAlias() ).setQuery( tqb ).execute();
response.addListener( new ActionListener<DeleteByQueryResponse>() {
@Override
public void onResponse( DeleteByQueryResponse response) {
timeDeleteAllVersions.stop();
logger
.debug( "Deleted entity {}:{} from all index scopes with response status = {}", entityId.getType(),
entityId.getUuid(), response.status().toString() );
checkDeleteByQueryResponse(tqb, response);
}
@Override
public void onFailure( Throwable e ) {
logger.error( "Deleted entity {}:{} from all index scopes with error {}", entityId.getType(),
entityId.getUuid(), e);
}
});
return response;
}
@Override
public ListenableActionFuture deletePreviousVersions( final Id entityId, final UUID version ) {
String idString = IndexingUtils.idString( entityId ).toLowerCase();
final FilteredQueryBuilder fqb = QueryBuilders.filteredQuery(
QueryBuilders.termQuery(ENTITYID_ID_FIELDNAME, idString),
FilterBuilders.rangeFilter(ENTITY_VERSION_FIELDNAME).lt(version.timestamp())
);
//Added For Graphite Metrics
//Checks the time from the execute to the response below
final Timer.Context timeDeletePreviousVersions = deletePreviousTimer.time();
final ListenableActionFuture<DeleteByQueryResponse> response = esProvider.getClient()
.prepareDeleteByQuery(alias.getWriteAlias()).setQuery(fqb).execute();
//Added For Graphite Metrics
response.addListener( new ActionListener<DeleteByQueryResponse>() {
@Override
public void onResponse( DeleteByQueryResponse response ) {
timeDeletePreviousVersions.stop();
//error message needs to be retooled so that it describes the entity more throughly
logger
.debug( "Deleted entity {}:{} with version {} from all " + "index scopes with response status = {}",
entityId.getType(), entityId.getUuid(), version, response.status().toString() );
checkDeleteByQueryResponse( fqb, response );
}
@Override
public void onFailure( Throwable e ) {
logger.error( "Deleted entity {}:{} from all index scopes with error {}", entityId.getType(),
entityId.getUuid(), e );
}
} );
return response;
}
/**
* Validate the response doesn't contain errors, if it does, fail fast at the first error we encounter
*/
private void checkDeleteByQueryResponse(
final QueryBuilder query, final DeleteByQueryResponse response ) {
for ( IndexDeleteByQueryResponse indexDeleteByQueryResponse : response ) {
final ShardOperationFailedException[] failures = indexDeleteByQueryResponse.getFailures();
for ( ShardOperationFailedException failedException : failures ) {
logger.error( String.format("Unable to delete by query %s. "
+ "Failed with code %d and reason %s on shard %s in index %s",
query.toString(),
failedException.status().getStatus(),
failedException.reason(),
failedException.shardId(),
failedException.index() )
);
}
}
}
/**
* Completely delete an index.
*/
public void deleteIndex() {
AdminClient adminClient = esProvider.getClient().admin();
DeleteIndexResponse response = adminClient.indices()
.prepareDelete( indexIdentifier.getIndex(null) ).get();
if ( response.isAcknowledged() ) {
logger.info( "Deleted index: read {} write {}", alias.getReadAlias(), alias.getWriteAlias());
//invlaidate the alias
aliasCache.invalidate(alias);
}
else {
logger.info( "Failed to delete index: read {} write {}", alias.getReadAlias(), alias.getWriteAlias());
}
}
/**
* Do the retry operation
*/
private void doInRetry( final RetryOperation operation ) {
for ( int i = 0; i < MAX_WAITS; i++ ) {
try {
if ( operation.doOp() ) {
return;
}
}
catch ( Exception e ) {
logger.error( "Unable to execute operation, retrying", e );
}
try {
Thread.sleep( WAIT_TIME );
}
catch ( InterruptedException e ) {
//swallow it
}
}
}
/**
* Check health of cluster.
*/
@Override
public Health getClusterHealth() {
try {
ClusterHealthResponse chr = esProvider.getClient().admin()
.cluster().health(new ClusterHealthRequest()).get();
return Health.valueOf( chr.getStatus().name() );
}
catch ( Exception ex ) {
logger.error( "Error connecting to ElasticSearch", ex );
}
// this is bad, red alert!
return Health.RED;
}
/**
* Check health of this specific index.
*/
@Override
public Health getIndexHealth() {
try {
ClusterHealthResponse chr = esProvider.getClient().admin().cluster().health(
new ClusterHealthRequest(new String[]{indexIdentifier.getIndex(null)})).get();
return Health.valueOf( chr.getStatus().name() );
}
catch ( Exception ex ) {
logger.error( "Error connecting to ElasticSearch", ex );
}
// this is bad, red alert!
return Health.RED;
}
/**
* Interface for operations.
*/
private static interface RetryOperation {
/**
* Return true if done, false if there should be a retry.
*/
public boolean doOp();
}
}