/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.usergrid.corepersistence.asyncevents;
import com.codahale.metrics.Counter;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Timer;
import com.datastax.driver.core.exceptions.InvalidQueryException;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import org.apache.usergrid.corepersistence.asyncevents.model.*;
import org.apache.usergrid.corepersistence.index.EntityIndexOperation;
import org.apache.usergrid.corepersistence.index.IndexLocationStrategyFactory;
import org.apache.usergrid.corepersistence.index.IndexProcessorFig;
import org.apache.usergrid.corepersistence.index.ReplicatedIndexLocationStrategy;
import org.apache.usergrid.corepersistence.rx.impl.EdgeScope;
import org.apache.usergrid.corepersistence.util.CpNamingUtils;
import org.apache.usergrid.corepersistence.util.ObjectJsonSerializer;
import org.apache.usergrid.persistence.collection.EntityCollectionManager;
import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.graph.Edge;
import org.apache.usergrid.persistence.index.EntityIndex;
import org.apache.usergrid.persistence.index.EntityIndexFactory;
import org.apache.usergrid.persistence.index.IndexLocationStrategy;
import org.apache.usergrid.persistence.index.impl.IndexOperationMessage;
import org.apache.usergrid.persistence.index.impl.IndexProducer;
import org.apache.usergrid.persistence.index.impl.IndexingUtils;
import org.apache.usergrid.persistence.map.MapManager;
import org.apache.usergrid.persistence.map.MapManagerFactory;
import org.apache.usergrid.persistence.map.MapScope;
import org.apache.usergrid.persistence.map.impl.MapScopeImpl;
import org.apache.usergrid.persistence.model.entity.Entity;
import org.apache.usergrid.persistence.model.entity.Id;
import org.apache.usergrid.persistence.model.util.UUIDGenerator;
import org.apache.usergrid.persistence.queue.*;
import org.apache.usergrid.persistence.queue.impl.LegacyQueueScopeImpl;
import org.apache.usergrid.persistence.queue.impl.SNSQueueManagerImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Subscriber;
import rx.Subscription;
import rx.schedulers.Schedulers;
import java.io.IOException;
import java.io.Serializable;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.commons.lang.StringUtils.indexOf;
import static org.apache.commons.lang.StringUtils.isNotEmpty;
/**
 * TODO: this whole class is becoming a nightmare.
 * We need to remove all consumer logic from this class and refactor it in the following manner.
 *
 * 1. Produce: keep the code in the handlers as is.
 * 2. Consume: move the code into a refactored system.
 * 2.1 A central dispatcher.
 * 2.2 An interface that produces an observable of type BatchOperation. Any handler will be refactored into its own
 * impl that will then emit a stream of batch operations to perform.
 * 2.3 The central dispatcher will then subscribe to these events and merge them, handing them off to a batch handler.
 * 2.4 The batch handler will roll the operations up into a batch, then queue them.
 * 2.5 The receiving batch handler will execute the batch operations.
 *
 * TODO: determine how we handle errors.
 *
 */
@Singleton
public class AsyncEventServiceImpl implements AsyncEventService {
private static final Logger logger = LoggerFactory.getLogger(AsyncEventServiceImpl.class);
// SQS maximum receive messages is 10
public int MAX_TAKE = 10;
public static final String QUEUE_NAME = "index"; //keep this short as AWS limits queue name size to 80 chars
public static final String QUEUE_NAME_UTILITY = "utility"; //keep this short as AWS limits queue name size to 80 chars
public static final String QUEUE_NAME_DELETE = "delete";
public static final String DEAD_LETTER_SUFFIX = "_dead";
private final LegacyQueueManager indexQueue;
private final LegacyQueueManager utilityQueue;
private final LegacyQueueManager deleteQueue;
private final LegacyQueueManager indexQueueDead;
private final LegacyQueueManager utilityQueueDead;
private final LegacyQueueManager deleteQueueDead;
private final IndexProcessorFig indexProcessorFig;
private final LegacyQueueFig queueFig;
private final IndexProducer indexProducer;
private final EntityCollectionManagerFactory entityCollectionManagerFactory;
private final IndexLocationStrategyFactory indexLocationStrategyFactory;
private final EntityIndexFactory entityIndexFactory;
private final EventBuilder eventBuilder;
private final RxTaskScheduler rxTaskScheduler;
private final Timer readTimer;
private final Timer writeTimer;
private final Timer ackTimer;
/**
* This mutex is used to start/stop workers to ensure we're not concurrently modifying our subscriptions
*/
private final Object mutex = new Object();
private final Counter indexErrorCounter;
private final AtomicLong counter = new AtomicLong();
private final AtomicLong counterUtility = new AtomicLong();
private final AtomicLong counterDelete = new AtomicLong();
private final AtomicLong counterIndexDead = new AtomicLong();
private final AtomicLong counterUtilityDead = new AtomicLong();
private final AtomicLong counterDeleteDead = new AtomicLong();
private final AtomicLong inFlight = new AtomicLong();
private final Histogram messageCycle;
private final MapManager esMapPersistence;
//the actively running subscriptions
private List<Subscription> subscriptions = new ArrayList<>();
@Inject
public AsyncEventServiceImpl(final LegacyQueueManagerFactory queueManagerFactory,
final IndexProcessorFig indexProcessorFig,
final IndexProducer indexProducer,
final MetricsFactory metricsFactory,
final EntityCollectionManagerFactory entityCollectionManagerFactory,
final IndexLocationStrategyFactory indexLocationStrategyFactory,
final EntityIndexFactory entityIndexFactory,
final EventBuilder eventBuilder,
final MapManagerFactory mapManagerFactory,
final LegacyQueueFig queueFig,
@EventExecutionScheduler
final RxTaskScheduler rxTaskScheduler ) {
this.indexProducer = indexProducer;
this.entityCollectionManagerFactory = entityCollectionManagerFactory;
this.indexLocationStrategyFactory = indexLocationStrategyFactory;
this.entityIndexFactory = entityIndexFactory;
this.eventBuilder = eventBuilder;
final MapScope mapScope = new MapScopeImpl( CpNamingUtils.getManagementApplicationId(), "indexEvents");
this.esMapPersistence = mapManagerFactory.createMapManager( mapScope );
this.rxTaskScheduler = rxTaskScheduler;
LegacyQueueScope indexQueueScope =
new LegacyQueueScopeImpl(QUEUE_NAME, LegacyQueueScope.RegionImplementation.ALL);
LegacyQueueScope utilityQueueScope =
new LegacyQueueScopeImpl(QUEUE_NAME_UTILITY, LegacyQueueScope.RegionImplementation.ALL);
LegacyQueueScope deleteQueueScope =
new LegacyQueueScopeImpl(QUEUE_NAME_DELETE, LegacyQueueScope.RegionImplementation.ALL);
LegacyQueueScope indexQueueDeadScope =
new LegacyQueueScopeImpl(QUEUE_NAME, LegacyQueueScope.RegionImplementation.ALL, true);
LegacyQueueScope utilityQueueDeadScope =
new LegacyQueueScopeImpl(QUEUE_NAME_UTILITY, LegacyQueueScope.RegionImplementation.ALL, true);
LegacyQueueScope deleteQueueDeadScope =
new LegacyQueueScopeImpl(QUEUE_NAME_DELETE, LegacyQueueScope.RegionImplementation.ALL, true);
this.indexQueue = queueManagerFactory.getQueueManager(indexQueueScope);
this.utilityQueue = queueManagerFactory.getQueueManager(utilityQueueScope);
this.deleteQueue = queueManagerFactory.getQueueManager(deleteQueueScope);
this.indexQueueDead = queueManagerFactory.getQueueManager(indexQueueDeadScope);
this.utilityQueueDead = queueManagerFactory.getQueueManager(utilityQueueDeadScope);
this.deleteQueueDead = queueManagerFactory.getQueueManager(deleteQueueDeadScope);
this.indexProcessorFig = indexProcessorFig;
this.queueFig = queueFig;
this.writeTimer = metricsFactory.getTimer(AsyncEventServiceImpl.class, "async_event.write");
this.readTimer = metricsFactory.getTimer(AsyncEventServiceImpl.class, "async_event.read");
this.ackTimer = metricsFactory.getTimer(AsyncEventServiceImpl.class, "async_event.ack");
this.indexErrorCounter = metricsFactory.getCounter(AsyncEventServiceImpl.class, "async_event.error");
this.messageCycle = metricsFactory.getHistogram(AsyncEventServiceImpl.class, "async_event.message_cycle");
//wire up the gauge of in-flight messages
metricsFactory.addGauge(AsyncEventServiceImpl.class, "async-event.inflight", new Gauge<Long>() {
@Override
public Long getValue() {
return inFlight.longValue();
}
});
start();
}
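/**
 * Map a queue type to its queue name.
 */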
private String getQueueName(AsyncEventQueueType queueType) {
switch (queueType) {
case REGULAR:
return QUEUE_NAME;
case UTILITY:
return QUEUE_NAME_UTILITY;
case DELETE:
return QUEUE_NAME_DELETE;
default:
throw new IllegalArgumentException("Invalid queue type: " + queueType.toString());
}
}
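/**
 * Resolve the queue manager for the given queue type.
 */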
private LegacyQueueManager getQueue(AsyncEventQueueType queueType) {
return getQueue(queueType, false);
}
private LegacyQueueManager getQueue(AsyncEventQueueType queueType, boolean isDeadQueue) {
switch (queueType) {
case REGULAR:
return isDeadQueue ? indexQueueDead : indexQueue;
case UTILITY:
return isDeadQueue ? utilityQueueDead : utilityQueue;
case DELETE:
return isDeadQueue ? deleteQueueDead : deleteQueue;
default:
throw new IllegalArgumentException("Invalid queue type: " + queueType.toString());
}
}
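/**
 * Resolve the counter used to number consumer threads for the given queue type.
 */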
private AtomicLong getCounter(AsyncEventQueueType queueType, boolean isDeadQueue) {
switch (queueType) {
case REGULAR:
return isDeadQueue ? counterIndexDead : counter;
case UTILITY:
return isDeadQueue ? counterUtilityDead : counterUtility;
case DELETE:
return isDeadQueue ? counterDeleteDead : counterDelete;
default:
throw new IllegalArgumentException("Invalid queue type: " + queueType.toString());
}
}
/**
 * Offer the operation to SQS on the regular index queue
 */
private void offer(final Serializable operation) {
offer(operation, AsyncEventQueueType.REGULAR);
}
private void offer(final Serializable operation, AsyncEventQueueType queueType) {
final Timer.Context timer = this.writeTimer.time();
try {
//signal to SQS
getQueue(queueType).sendMessageToLocalRegion(operation);
} catch (IOException e) {
throw new RuntimeException("Unable to queue message", e);
} finally {
timer.stop();
}
}
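/**
 * Offer the operation to the queue's topic so it is delivered to all regions.
 */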
private void offerTopic(final Serializable operation, AsyncEventQueueType queueType) {
final Timer.Context timer = this.writeTimer.time();
try {
//signal to SQS
getQueue(queueType).sendMessageToAllRegions(operation);
}
catch ( IOException e ) {
throw new RuntimeException( "Unable to queue message", e );
}
finally {
timer.stop();
}
}
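/**
 * Offer a batch of operations to the given queue.
 */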
private void offerBatch(final List operations, AsyncEventQueueType queueType){
final Timer.Context timer = this.writeTimer.time();
try {
//signal to SQS
getQueue(queueType).sendMessages(operations);
} catch (IOException e) {
throw new RuntimeException("Unable to queue message", e);
} finally {
timer.stop();
}
}
/**
 * Take up to MAX_TAKE messages from the queue
 */
private List<LegacyQueueMessage> take(AsyncEventQueueType queueType, boolean isDeadQueue) {
final Timer.Context timer = this.readTimer.time();
try {
return getQueue(queueType, isDeadQueue).getMessages(MAX_TAKE, AsyncEvent.class);
}
finally {
//stop our timer
timer.stop();
}
}
private List<LegacyQueueMessage> take(AsyncEventQueueType queueType) {
return take(queueType, false);
}
/**
 * Ack messages on the regular index queue
 */
public void ack(final List<LegacyQueueMessage> messages) {
final Timer.Context timer = this.ackTimer.time();
try {
for ( LegacyQueueMessage legacyQueueMessage : messages ) {
try {
indexQueue.commitMessage( legacyQueueMessage );
inFlight.decrementAndGet();
} catch ( Throwable t ) {
logger.error("Continuing after error acking message: " + legacyQueueMessage.getMessageId() );
}
}
} catch (Exception e) {
throw new RuntimeException( "Unable to ack messages", e );
} finally {
timer.stop();
}
}
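/**
 * Ack messages on the given queue type, or on its dead letter queue.
 */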
public void ack(final List<LegacyQueueMessage> messages, AsyncEventQueueType queueType, boolean isDeadQueue) {
if (queueType == AsyncEventQueueType.REGULAR && !isDeadQueue) {
// the regular index queue ack also tracks the in-flight count and ack timer, so delegate to it
ack(messages);
return;
}
try {
getQueue(queueType, isDeadQueue).commitMessages( messages );
}
catch (Exception e) {
throw new RuntimeException("Unable to ack messages", e);
}
}
/**
 * Calls the event handlers and returns, for each message, a result indicating whether
 * it needs to be ack'd and whether it produced index operations.
 * @param messages the queue messages to process
 * @return one IndexEventResult per message
 */
private List<IndexEventResult> callEventHandlers(final List<LegacyQueueMessage> messages) {
if (logger.isDebugEnabled()) {
logger.debug("callEventHandlers with {} message(s)", messages.size());
}
Stream<IndexEventResult> indexEventResults = messages.stream().map(message ->
{
if(logger.isDebugEnabled()){
logger.debug("Queue message with ID {} has been received {} time(s)",
message.getMessageId(),
message.getReceiveCount() );
}
AsyncEvent event = null;
try {
event = (AsyncEvent) message.getBody();
} catch (ClassCastException cce) {
logger.error("Failed to deserialize message body", cce);
return new IndexEventResult(Optional.absent(), Optional.absent(), System.currentTimeMillis());
}
if (event == null) {
logger.error("AsyncEvent type or event is null!");
return new IndexEventResult(Optional.absent(), Optional.absent(), System.currentTimeMillis());
}
final AsyncEvent thisEvent = event;
if (logger.isDebugEnabled()) {
logger.debug("Processing event with type {}", event.getClass().getSimpleName());
}
try {
IndexOperationMessage single = new IndexOperationMessage();
// normal indexing event for an entity
if ( event instanceof EntityIndexEvent ){
single = handleEntityIndexUpdate( message );
}
// normal indexing event for an edge
else if ( event instanceof EdgeIndexEvent ){
single = handleEdgeIndex( message );
}
// deletes are 2-part, actual IO to delete data, then queue up a de-index
else if ( event instanceof EdgeDeleteEvent ) {
single = handleEdgeDelete( message );
}
// deletes are 2-part, actual IO to delete data, then queue up a de-index
else if ( event instanceof EntityDeleteEvent ) {
single = handleEntityDelete( message );
}
// initialization has special logic, therefore a special event type and no index operation message
else if ( event instanceof InitializeApplicationIndexEvent ) {
handleInitializeApplicationIndex(event, message);
}
// this is the main event that pulls the index doc from map persistence and hands to the index producer
else if (event instanceof ElasticsearchIndexEvent) {
handleIndexOperation((ElasticsearchIndexEvent) event);
} else if (event instanceof DeIndexOldVersionsEvent) {
single = handleDeIndexOldVersionEvent((DeIndexOldVersionsEvent) event);
} else {
throw new Exception("Unknown EventType for message: "+ message.getStringBody().trim());
}
if( !(event instanceof ElasticsearchIndexEvent)
&& !(event instanceof InitializeApplicationIndexEvent)
&& single.isEmpty() ){
logger.warn("No index operation messages came back from event processing for eventType: {}, msgId: {}, msgBody: {}",
event.getClass().getSimpleName(), message.getMessageId(), message.getStringBody());
}
// if no exception happens and the QueueMessage is returned in these results, it will get ack'd
return new IndexEventResult(Optional.of(single), Optional.of(message), thisEvent.getCreationTime());
} catch (IndexDocNotFoundException e){
// this exception is thrown when we need to wait before retrying with a quorum read on map persistence.
// return an empty event result so the event's message doesn't get ack'd
if(logger.isDebugEnabled()){
logger.debug(e.getMessage());
}
return new IndexEventResult(Optional.absent(), Optional.absent(), thisEvent.getCreationTime());
} catch (Exception e) {
// NPEs don't have a detail message, so add something for our log statement to identify better
final String errorMessage;
if( e instanceof NullPointerException ) {
errorMessage = "NullPointerException";
}else{
errorMessage = e.getMessage();
}
// if the event fails to process, log and return empty message result so it doesn't get ack'd
logger.error("{}. Failed to process message: {}", errorMessage, message.getStringBody().trim() );
return new IndexEventResult(Optional.absent(), Optional.absent(), thisEvent.getCreationTime());
}
});
return indexEventResults.collect(Collectors.toList());
}
@Override
public void queueInitializeApplicationIndex( final ApplicationScope applicationScope) {
IndexLocationStrategy indexLocationStrategy = indexLocationStrategyFactory.getIndexLocationStrategy(
applicationScope);
logger.trace("Offering InitializeApplicationIndexEvent for {}:{}",
applicationScope.getApplication().getUuid(), applicationScope.getApplication().getType());
offerTopic( new InitializeApplicationIndexEvent( queueFig.getPrimaryRegion(),
new ReplicatedIndexLocationStrategy( indexLocationStrategy ) ), AsyncEventQueueType.REGULAR);
}
@Override
public void queueEntityIndexUpdate(final ApplicationScope applicationScope,
final Entity entity, long updatedAfter) {
logger.trace("Offering EntityIndexEvent for {}:{}",
entity.getId().getUuid(), entity.getId().getType());
offer(new EntityIndexEvent(queueFig.getPrimaryRegion(),
new EntityIdScope(applicationScope, entity.getId()), updatedAfter));
}
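/**
 * Handle an EntityIndexEvent by building the index operations for the entity referenced in the event.
 */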
private IndexOperationMessage handleEntityIndexUpdate(final LegacyQueueMessage message) {
Preconditions.checkNotNull( message, "Queue Message cannot be null for handleEntityIndexUpdate" );
final AsyncEvent event = ( AsyncEvent ) message.getBody();
Preconditions.checkNotNull(event, "QueueMessage Body cannot be null for handleEntityIndexUpdate");
Preconditions.checkArgument(event instanceof EntityIndexEvent,
String.format("Event Type for handleEntityIndexUpdate must be ENTITY_INDEX, got %s", event.getClass()));
final EntityIndexEvent entityIndexEvent = (EntityIndexEvent) event;
//process the entity immediately
//only process the same version, otherwise ignore
final EntityIdScope entityIdScope = entityIndexEvent.getEntityIdScope();
final ApplicationScope applicationScope = entityIdScope.getApplicationScope();
final Id entityId = entityIdScope.getId();
final long updatedAfter = entityIndexEvent.getUpdatedAfter();
final EntityIndexOperation entityIndexOperation =
new EntityIndexOperation( applicationScope, entityId, updatedAfter);
// default this observable's return to empty index operation message if nothing is emitted
return eventBuilder.buildEntityIndex( entityIndexOperation )
.toBlocking().lastOrDefault(new IndexOperationMessage());
}
@Override
public void queueNewEdge(final ApplicationScope applicationScope,
final Entity entity,
final Edge newEdge) {
logger.trace("Offering EdgeIndexEvent for edge type {} entity {}:{}",
newEdge.getType(), entity.getId().getUuid(), entity.getId().getType());
offer( new EdgeIndexEvent( queueFig.getPrimaryRegion(), applicationScope, entity.getId(), newEdge ));
}
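/**
 * Handle an EdgeIndexEvent by loading the entity referenced in the event and building the index
 * operations for the new edge.
 */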
private IndexOperationMessage handleEdgeIndex(final LegacyQueueMessage message) {
Preconditions.checkNotNull( message, "Queue Message cannot be null for handleEdgeIndex" );
final AsyncEvent event = (AsyncEvent) message.getBody();
Preconditions.checkNotNull( event, "QueueMessage Body cannot be null for handleEdgeIndex" );
Preconditions.checkArgument(event instanceof EdgeIndexEvent,
String.format("Event Type for handleEdgeIndex must be EDGE_INDEX, got %s", event.getClass()));
final EdgeIndexEvent edgeIndexEvent = ( EdgeIndexEvent ) event;
final EntityCollectionManager ecm =
entityCollectionManagerFactory.createCollectionManager( edgeIndexEvent.getApplicationScope() );
// default this observable's return to empty index operation message if nothing is emitted
return ecm.load( edgeIndexEvent.getEntityId() )
.flatMap( loadedEntity ->
eventBuilder.buildNewEdge(edgeIndexEvent.getApplicationScope(), loadedEntity, edgeIndexEvent.getEdge()))
.toBlocking().lastOrDefault(new IndexOperationMessage());
}
@Override
public void queueDeleteEdge(final ApplicationScope applicationScope,
final Edge edge) {
logger.trace("Offering EdgeDeleteEvent for type {} to target {}:{}",
edge.getType(), edge.getTargetNode().getUuid(), edge.getTargetNode().getType());
// sent in region (not offerTopic) as the delete IO happens in-region, then queues a multi-region de-index op
offer( new EdgeDeleteEvent( queueFig.getPrimaryRegion(), applicationScope, edge ), AsyncEventQueueType.DELETE );
}
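/**
 * Handle an EdgeDeleteEvent by building the de-index operations for the deleted edge.
 */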
private IndexOperationMessage handleEdgeDelete(final LegacyQueueMessage message) {
Preconditions.checkNotNull( message, "Queue Message cannot be null for handleEdgeDelete" );
final AsyncEvent event = (AsyncEvent) message.getBody();
Preconditions.checkNotNull( event, "QueueMessage Body cannot be null for handleEdgeDelete" );
Preconditions.checkArgument(event instanceof EdgeDeleteEvent,
String.format("Event Type for handleEdgeDelete must be EDGE_DELETE, got %s", event.getClass()));
final EdgeDeleteEvent edgeDeleteEvent = ( EdgeDeleteEvent ) event;
final ApplicationScope applicationScope = edgeDeleteEvent.getApplicationScope();
final Edge edge = edgeDeleteEvent.getEdge();
if (logger.isDebugEnabled()) {
logger.debug("Deleting in app scope {} with edge {}", applicationScope, edge);
}
// default this observable's return to empty index operation message if nothing is emitted
return eventBuilder.buildDeleteEdge(applicationScope, edge);
}
/**
 * Queue up an indexOperationMessage for multi-region execution
 * @param indexOperationMessage the batch of index operations to persist and queue
 * @param queueType the queue to which the resulting ElasticsearchIndexEvent is offered
 */
public void queueIndexOperationMessage(final IndexOperationMessage indexOperationMessage, AsyncEventQueueType queueType) {
// don't try to produce something with nothing
if(indexOperationMessage == null || indexOperationMessage.isEmpty()){
return;
}
final String jsonValue = ObjectJsonSerializer.INSTANCE.toString( indexOperationMessage );
final UUID newMessageId = UUIDGenerator.newTimeUUID();
final int expirationTimeInSeconds =
( int ) TimeUnit.MILLISECONDS.toSeconds( indexProcessorFig.getIndexMessageTtl() );
//write to the map in ES
esMapPersistence.putString( newMessageId.toString(), jsonValue, expirationTimeInSeconds );
//now queue up the index message
final ElasticsearchIndexEvent elasticsearchIndexEvent =
new ElasticsearchIndexEvent(queueFig.getPrimaryRegion(), newMessageId );
//send to the topic so all regions index the batch
logger.trace("Offering ElasticsearchIndexEvent for message {}", newMessageId );
offerTopic( elasticsearchIndexEvent, queueType );
}
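/**
 * Handle an ElasticsearchIndexEvent by reading the serialized IndexOperationMessage from map
 * persistence (falling back to a strongly consistent read when configured) and handing it to the
 * index producer.
 */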
private void handleIndexOperation(final ElasticsearchIndexEvent elasticsearchIndexEvent)
throws IndexDocNotFoundException {
Preconditions.checkNotNull( elasticsearchIndexEvent, "elasticsearchIndexEvent cannot be null" );
final UUID messageId = elasticsearchIndexEvent.getIndexBatchId();
Preconditions.checkNotNull( messageId, "messageId must not be null" );
final String message = esMapPersistence.getString( messageId.toString() );
final IndexOperationMessage indexOperationMessage;
if(message == null) {
// provide some time back pressure before performing a quorum read
if ( queueFig.getQuorumFallback() && System.currentTimeMillis() >
elasticsearchIndexEvent.getCreationTime() + queueFig.getLocalQuorumTimeout() ) {
if(logger.isDebugEnabled()){
logger.debug("ES batch with id {} not found, reading with strong consistency", messageId);
}
final String highConsistency = esMapPersistence.getStringHighConsistency(messageId.toString());
if (highConsistency == null) {
throw new RuntimeException("ES batch with id " +
messageId+" not found when reading with strong consistency");
}
indexOperationMessage =
ObjectJsonSerializer.INSTANCE.fromString(highConsistency, IndexOperationMessage.class);
} else if (System.currentTimeMillis() > elasticsearchIndexEvent.getCreationTime() + queueFig.getMapMessageTimeout()) {
// if esMapPersistence message hasn't been received yet, log and return (will be acked)
logger.error("ES map message never received, removing message from queue. indexBatchId={}", messageId);
return;
} else {
logger.warn("ES map message not received yet. indexBatchId={} elapsedTimeMsec={}", messageId, System.currentTimeMillis() - elasticsearchIndexEvent.getCreationTime());
throw new IndexDocNotFoundException(elasticsearchIndexEvent.getIndexBatchId());
}
} else {
indexOperationMessage = ObjectJsonSerializer.INSTANCE.fromString( message, IndexOperationMessage.class );
}
// don't let this continue if there's nothing to index
if (indexOperationMessage == null || indexOperationMessage.isEmpty()){
throw new RuntimeException(
"IndexOperationMessage cannot be null or empty after retrieving from map persistence");
}
// always do a check to ensure the indexes are initialized for the index requests
initializeEntityIndexes(indexOperationMessage);
// send it off to be indexed
indexProducer.put(indexOperationMessage).toBlocking().last();
}
@Override
public void queueDeIndexOldVersion(final ApplicationScope applicationScope, final Id entityId, UUID markedVersion) {
// queue the de-index of old versions to the topic so cleanup happens in all regions
logger.trace("Offering DeIndexOldVersionsEvent for app {} {}:{}",
applicationScope.getApplication().getUuid(), entityId.getUuid(), entityId.getType());
offerTopic( new DeIndexOldVersionsEvent( queueFig.getPrimaryRegion(),
new EntityIdScope( applicationScope, entityId), markedVersion), AsyncEventQueueType.DELETE );
}
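/**
 * Handle a DeIndexOldVersionsEvent by building the de-index operations for the entity's old versions.
 */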
public IndexOperationMessage handleDeIndexOldVersionEvent ( final DeIndexOldVersionsEvent deIndexOldVersionsEvent){
final ApplicationScope applicationScope = deIndexOldVersionsEvent.getEntityIdScope().getApplicationScope();
final Id entityId = deIndexOldVersionsEvent.getEntityIdScope().getId();
final UUID markedVersion = deIndexOldVersionsEvent.getMarkedVersion();
// default this observable's return to empty index operation message if nothing is emitted
return eventBuilder.deIndexOldVersions( applicationScope, entityId, markedVersion )
.toBlocking().lastOrDefault(new IndexOperationMessage());
}
/**
 * This method ensures the entity index exists for every application referenced by the message.
 * Since the entity indexes are cached, we don't worry about aggregating by app id across messages.
 * @param indexOperationMessage the batch whose index and de-index requests are inspected for app ids
 */
private void initializeEntityIndexes(final IndexOperationMessage indexOperationMessage) {
// create a set so we can have a unique list of appIds for which we call createEntityIndex
Set<UUID> appIds = new HashSet<>();
// loop through all indexRequests and add the appIds to the set
indexOperationMessage.getIndexRequests().forEach(req -> {
UUID appId = IndexingUtils.getApplicationIdFromIndexDocId(req.documentId);
appIds.add(appId);
});
// loop through all deindexRequests and add the appIds to the set
indexOperationMessage.getDeIndexRequests().forEach(req -> {
UUID appId = IndexingUtils.getApplicationIdFromIndexDocId(req.documentId);
appIds.add(appId);
});
// for each of the appIds in the unique set, call create entity index to ensure the aliases are created
appIds.forEach(appId -> {
ApplicationScope appScope = CpNamingUtils.getApplicationScope(appId);
entityIndexFactory.createEntityIndex(indexLocationStrategyFactory.getIndexLocationStrategy(appScope));
}
);
}
@Override
public long getQueueDepth() {
return indexQueue.getQueueDepth();
}
@Override
public void queueEntityDelete(final ApplicationScope applicationScope, final Id entityId) {
logger.trace("Offering EntityDeleteEvent for {}:{}", entityId.getUuid(), entityId.getType());
// sent in region (not offerTopic) as the delete IO happens in-region, then queues a multi-region de-index op
offer( new EntityDeleteEvent(queueFig.getPrimaryRegion(), new EntityIdScope( applicationScope, entityId ) ),
AsyncEventQueueType.DELETE );
}
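/**
 * Handle an EntityDeleteEvent by building the delete and de-index operations for the entity.
 */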
private IndexOperationMessage handleEntityDelete(final LegacyQueueMessage message) {
Preconditions.checkNotNull(message, "Queue Message cannot be null for handleEntityDelete");
final AsyncEvent event = (AsyncEvent) message.getBody();
Preconditions.checkNotNull( event, "QueueMessage Body cannot be null for handleEntityDelete" );
Preconditions.checkArgument( event instanceof EntityDeleteEvent,
String.format( "Event Type for handleEntityDelete must be ENTITY_DELETE, got %s", event.getClass() ) );
final EntityDeleteEvent entityDeleteEvent = ( EntityDeleteEvent ) event;
final ApplicationScope applicationScope = entityDeleteEvent.getEntityIdScope().getApplicationScope();
final Id entityId = entityDeleteEvent.getEntityIdScope().getId();
if (logger.isDebugEnabled()) {
logger.debug("Deleting entity id from index in app scope {} with entityId {}", applicationScope, entityId);
}
return eventBuilder.buildEntityDelete( applicationScope, entityId );
}
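/**
 * Handle an InitializeApplicationIndexEvent by creating and initializing the application's entity index.
 */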
private void handleInitializeApplicationIndex(final AsyncEvent event, final LegacyQueueMessage message) {
Preconditions.checkNotNull(message, "Queue Message cannot be null for handleInitializeApplicationIndex");
Preconditions.checkArgument(event instanceof InitializeApplicationIndexEvent,
String.format("Event Type for handleInitializeApplicationIndex must be APPLICATION_INDEX, got %s",
event.getClass()));
final InitializeApplicationIndexEvent initializeApplicationIndexEvent =
( InitializeApplicationIndexEvent ) event;
final IndexLocationStrategy indexLocationStrategy = initializeApplicationIndexEvent.getIndexLocationStrategy();
final EntityIndex index = entityIndexFactory.createEntityIndex( indexLocationStrategy );
index.initialize();
}
/**
* Loop through and start the workers
*/
public void start() {
final int indexCount = indexProcessorFig.getWorkerCount();
final int utilityCount = indexProcessorFig.getWorkerCountUtility();
final int deleteCount = indexProcessorFig.getWorkerCountDelete();
final int indexDeadCount = indexProcessorFig.getWorkerCountDeadLetter();
final int utilityDeadCount = indexProcessorFig.getWorkerCountUtilityDeadLetter();
final int deleteDeadCount = indexProcessorFig.getWorkerCountDeleteDeadLetter();
logger.info("Starting queue workers for indexing: index={} indexDLQ={} utility={} utilityDLQ={} delete={} deleteDLQ={}",
indexCount, indexDeadCount, utilityCount, utilityDeadCount, deleteCount, deleteDeadCount);
for (int i = 0; i < indexCount; i++) {
startWorker(AsyncEventQueueType.REGULAR);
}
for (int i = 0; i < utilityCount; i++) {
startWorker(AsyncEventQueueType.UTILITY);
}
if( indexQueue instanceof SNSQueueManagerImpl) {
logger.info("Queue manager implementation supports dead letters, start dead letter queue workers.");
for (int i = 0; i < indexDeadCount; i++) {
startDeadQueueWorker(AsyncEventQueueType.REGULAR);
}
for (int i = 0; i < utilityDeadCount; i++) {
startDeadQueueWorker(AsyncEventQueueType.UTILITY);
}
for (int i = 0; i < deleteDeadCount; i++) {
startDeadQueueWorker(AsyncEventQueueType.DELETE);
}
}else{
logger.info("Queue manager implementation does NOT support dead letters, NOT starting dead letter queue workers.");
}
}
/**
* Stop the workers
*/
public void stop() {
synchronized (mutex) {
//stop consuming
for (final Subscription subscription : subscriptions) {
subscription.unsubscribe();
}
}
}
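/**
 * Start a single queue consumer for the given queue type. The worker reads messages in a loop,
 * hands them to the event handlers, submits the resulting index operations, and acks the messages
 * that were processed successfully.
 */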
private void startWorker(final AsyncEventQueueType queueType) {
synchronized (mutex) {
String type = getQueueName(queueType);
Observable<List<LegacyQueueMessage>> consumer =
Observable.create( new Observable.OnSubscribe<List<LegacyQueueMessage>>() {
@Override
public void call( final Subscriber<? super List<LegacyQueueMessage>> subscriber ) {
//name our thread so it's easy to see
long threadNum = getCounter(queueType, false).incrementAndGet();
Thread.currentThread().setName( "QueueConsumer_" + type + "_" + threadNum );
List<LegacyQueueMessage> drainList = null;
do {
try {
drainList = take(queueType);
//emit our list in its entirety to hand off to a worker pool
subscriber.onNext(drainList);
//track the messages that are now in flight
inFlight.addAndGet( drainList.size() );
} catch ( Throwable t ) {
final long sleepTime = indexProcessorFig.getFailureRetryTime();
// there might be an error here during tests, just clean the cache
indexQueue.clearQueueNameCache();
if ( t instanceof InvalidQueryException ) {
// don't fill up log with exceptions when keyspace and column
// families are not ready during bootstrap/setup
logger.warn( "Failed to dequeue due to '{}'. Sleeping for {} ms",
t.getMessage(), sleepTime );
} else {
logger.error( "Failed to dequeue. Sleeping for {} ms", sleepTime, t);
}
if ( drainList != null ) {
inFlight.addAndGet( -1 * drainList.size() );
}
try { Thread.sleep( sleepTime ); } catch ( InterruptedException ie ) {}
indexErrorCounter.inc();
}
}
while ( true );
}
} ) //this won't block our read loop, just reads and proceeds
.flatMap( sqsMessages -> {
//do this on a different schedule, and introduce concurrency
// with flatmap for faster processing
return Observable.just( sqsMessages )
.map( messages -> {
if ( messages == null || messages.size() == 0 ) {
// no messages came from the queue, move on
return null;
}
try {
// process the messages
List<IndexEventResult> indexEventResults =
callEventHandlers( messages );
// submit the processed messages to index producer
List<LegacyQueueMessage> messagesToAck =
submitToIndex( indexEventResults, queueType );
if ( messagesToAck.size() < messages.size() ) {
logger.warn(
"Missing {} message(s) from index processing",
messages.size() - messagesToAck.size() );
}
// ack each message if making it to this point
if( messagesToAck.size() > 0 ){
ack(messagesToAck, queueType, false);
}
return messagesToAck;
}
catch ( Exception e ) {
logger.error( "Failed to ack messages", e );
return null;
//do not rethrow so we can process all of them
}
} ).subscribeOn( rxTaskScheduler.getAsyncIOScheduler() );
//end flatMap
}, indexProcessorFig.getEventConcurrencyFactor() );
//start in the background
final Subscription subscription = consumer.subscribeOn(Schedulers.newThread()).subscribe();
subscriptions.add(subscription);
}
}
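/**
 * Start a single dead letter queue consumer for the given queue type. The worker drains the dead
 * letter queue, re-sends the messages to the originating queue, and acks the messages that were
 * re-sent successfully.
 */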
private void startDeadQueueWorker(final AsyncEventQueueType queueType) {
String type = getQueueName(queueType);
synchronized (mutex) {
Observable<List<LegacyQueueMessage>> consumer =
Observable.create( new Observable.OnSubscribe<List<LegacyQueueMessage>>() {
@Override
public void call( final Subscriber<? super List<LegacyQueueMessage>> subscriber ) {
//name our thread so it's easy to see
long threadNum = getCounter(queueType, true).incrementAndGet();
Thread.currentThread().setName( "QueueDeadLetterConsumer_" + type + "_" + threadNum );
List<LegacyQueueMessage> drainList = null;
do {
try {
drainList = take(queueType, true);
//emit our list in its entirety to hand off to a worker pool
subscriber.onNext(drainList);
//track the messages that are now in flight
inFlight.addAndGet( drainList.size() );
} catch ( Throwable t ) {
final long sleepTime = indexProcessorFig.getDeadLetterFailureRetryTime();
// there might be an error here during tests, just clean the cache
indexQueueDead.clearQueueNameCache();
if ( t instanceof InvalidQueryException ) {
// don't fill up log with exceptions when keyspace and column
// families are not ready during bootstrap/setup
logger.warn( "Failed to dequeue dead letters due to '{}'. Sleeping for {} ms",
t.getMessage(), sleepTime );
} else {
logger.error( "Failed to dequeue dead letters. Sleeping for {} ms", sleepTime, t);
}
if ( drainList != null ) {
inFlight.addAndGet( -1 * drainList.size() );
}
try { Thread.sleep( sleepTime ); } catch ( InterruptedException ie ) {}
}
}
while ( true );
}
} ) //this won't block our read loop, just reads and proceeds
.flatMap( sqsMessages -> {
//do this on a different schedule, and introduce concurrency
// with flatmap for faster processing
return Observable.just( sqsMessages )
.map( messages -> {
if ( messages == null || messages.size() == 0 ) {
// no messages came from the queue, move on
return null;
}
try {
// put the dead letter messages back in the appropriate queue
LegacyQueueManager returnQueue = getQueue(queueType, false);
List<LegacyQueueMessage> successMessages = returnQueue.sendQueueMessages(messages);
for (LegacyQueueMessage msg : successMessages) {
logger.warn("Returning message to {} queue: type:{}, messageId:{} body: {}", queueType.toString(), msg.getType(), msg.getMessageId(), msg.getStringBody());
}
int unsuccessfulMessagesSize = messages.size() - successMessages.size();
if (unsuccessfulMessagesSize > 0) {
// some messages couldn't be sent to originating queue, log
Set<String> successMessageIds = new HashSet<>();
for (LegacyQueueMessage msg : successMessages) {
String messageId = msg.getMessageId();
if (successMessageIds.contains(messageId)) {
logger.warn("Found duplicate messageId in returned messages: {}", messageId);
} else {
successMessageIds.add(messageId);
}
}
for (LegacyQueueMessage msg : messages) {
String messageId = msg.getMessageId();
if (!successMessageIds.contains(messageId)) {
logger.warn("Failed to return message to {} queue: type:{} messageId:{} body: {}", queueType.toString(), msg.getType(), messageId, msg.getStringBody());
}
}
}
ack(successMessages, queueType, true);
return messages;
}
catch ( Exception e ) {
logger.error( "Failed to ack messages", e );
return null;
//do not rethrow so we can process all of them
}
} ).subscribeOn( rxTaskScheduler.getAsyncIOScheduler() );
//end flatMap
}, indexProcessorFig.getEventConcurrencyFactor() );
//start in the background
final Subscription subscription = consumer.subscribeOn(Schedulers.newThread()).subscribe();
subscriptions.add(subscription);
}
}
/**
* Submit results to index and return the queue messages to be ack'd
*
*/
private List<LegacyQueueMessage> submitToIndex(List<IndexEventResult> indexEventResults, AsyncEventQueueType queueType) {
// if nothing came back then return empty list
if(indexEventResults==null){
return new ArrayList<>(0);
}
IndexOperationMessage combined = new IndexOperationMessage();
List<LegacyQueueMessage> queueMessages = indexEventResults.stream()
// filter out results without a queue message; those messages were not processed successfully
.filter( result -> result.getQueueMessage().isPresent() )
.map(indexEventResult -> {
//record the cycle time
messageCycle.update(System.currentTimeMillis() - indexEventResult.getCreationTime());
// ingest each index op into our combined, single index op for the index producer
if(indexEventResult.getIndexOperationMessage().isPresent()){
combined.ingest(indexEventResult.getIndexOperationMessage().get());
}
return indexEventResult.getQueueMessage().get();
})
// collect into a list of QueueMessages that can be ack'd later
.collect(Collectors.toList());
queueIndexOperationMessage(combined, queueType);
return queueMessages;
}
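/**
 * Build the index operations for a single entity and queue them for execution.
 */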
public void index(final ApplicationScope applicationScope, final Id id, final long updatedSince) {
EntityIndexOperation entityIndexOperation =
new EntityIndexOperation( applicationScope, id, updatedSince);
queueIndexOperationMessage(
eventBuilder.buildEntityIndex( entityIndexOperation ).toBlocking().lastOrDefault(null), AsyncEventQueueType.REGULAR );
}
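/**
 * Queue an EntityIndexEvent for the target node of each edge in the batch.
 */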
public void indexBatch(final List<EdgeScope> edges, final long updatedSince, AsyncEventQueueType queueType) {
final List<EntityIndexEvent> batch = new ArrayList<>();
edges.forEach(e -> {
//change to id scope to avoid serialization issues
batch.add(new EntityIndexEvent(queueFig.getPrimaryRegion(),
new EntityIdScope(e.getApplicationScope(), e.getEdge().getTargetNode()), updatedSince));
});
logger.trace("Offering batch of EntityIndexEvent of size {}", batch.size());
offerBatch( batch, queueType );
}
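/**
 * Result of processing a single queue message: the index operations it produced (if any), the
 * queue message to ack (if processing succeeded), and the event creation time.
 */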
public class IndexEventResult{
private final Optional<IndexOperationMessage> indexOperationMessage;
private final Optional<LegacyQueueMessage> queueMessage;
private final long creationTime;
public IndexEventResult(Optional<IndexOperationMessage> indexOperationMessage,
Optional<LegacyQueueMessage> queueMessage, long creationTime){
this.queueMessage = queueMessage;
this.creationTime = creationTime;
this.indexOperationMessage = indexOperationMessage;
}
public Optional<IndexOperationMessage> getIndexOperationMessage() {
return indexOperationMessage;
}
public Optional<LegacyQueueMessage> getQueueMessage() {
return queueMessage;
}
public long getCreationTime() {
return creationTime;
}
}
public String getQueueManagerClass() {
return indexQueue.getClass().getSimpleName();
}
}