| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.usergrid.corepersistence.asyncevents; |
| |
| |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.List; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicLong; |
| |
| import com.google.common.base.Optional; |
| import org.apache.usergrid.persistence.index.impl.IndexProducer; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import org.apache.usergrid.corepersistence.asyncevents.model.AsyncEvent; |
| import org.apache.usergrid.corepersistence.asyncevents.model.EdgeDeleteEvent; |
| import org.apache.usergrid.corepersistence.asyncevents.model.EdgeIndexEvent; |
| import org.apache.usergrid.corepersistence.asyncevents.model.EntityDeleteEvent; |
| import org.apache.usergrid.corepersistence.asyncevents.model.EntityIndexEvent; |
| import org.apache.usergrid.corepersistence.asyncevents.model.InitializeApplicationIndexEvent; |
| import org.apache.usergrid.corepersistence.index.EntityIndexOperation; |
| import org.apache.usergrid.corepersistence.index.IndexLocationStrategyFactory; |
| import org.apache.usergrid.corepersistence.index.IndexProcessorFig; |
| import org.apache.usergrid.corepersistence.index.ReplicatedIndexLocationStrategy; |
| import org.apache.usergrid.corepersistence.rx.impl.EdgeScope; |
| import org.apache.usergrid.persistence.collection.EntityCollectionManager; |
| import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory; |
| import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope; |
| import org.apache.usergrid.persistence.core.metrics.MetricsFactory; |
| import org.apache.usergrid.persistence.core.rx.RxTaskScheduler; |
| import org.apache.usergrid.persistence.core.scope.ApplicationScope; |
| import org.apache.usergrid.persistence.graph.Edge; |
| import org.apache.usergrid.persistence.index.EntityIndex; |
| import org.apache.usergrid.persistence.index.EntityIndexFactory; |
| import org.apache.usergrid.persistence.index.IndexLocationStrategy; |
| import org.apache.usergrid.persistence.index.impl.IndexOperationMessage; |
| import org.apache.usergrid.persistence.model.entity.Entity; |
| import org.apache.usergrid.persistence.model.entity.Id; |
| import org.apache.usergrid.persistence.queue.QueueManager; |
| import org.apache.usergrid.persistence.queue.QueueManagerFactory; |
| import org.apache.usergrid.persistence.queue.QueueMessage; |
| import org.apache.usergrid.persistence.queue.QueueScope; |
| import org.apache.usergrid.persistence.queue.impl.QueueScopeImpl; |
| |
| import com.codahale.metrics.Counter; |
| import com.codahale.metrics.Gauge; |
| import com.codahale.metrics.Histogram; |
| import com.codahale.metrics.Timer; |
| import com.google.common.base.Preconditions; |
| import com.google.inject.Inject; |
| import com.google.inject.Singleton; |
| |
| import rx.Observable; |
| import rx.Subscriber; |
| import rx.Subscription; |
| import rx.functions.Func1; |
| import rx.schedulers.Schedulers; |
| |
| |
| @Singleton |
| public class AmazonAsyncEventService implements AsyncEventService { |
| |
| |
| private static final Logger logger = LoggerFactory.getLogger(AmazonAsyncEventService.class); |
| |
| // SQS maximum receive messages is 10 |
| private static final int MAX_TAKE = 10; |
| public static final String QUEUE_NAME = "index"; //keep this short as AWS limits queue name size to 80 chars |
| |
| private final QueueManager queue; |
| private final QueueScope queueScope; |
| private final IndexProcessorFig indexProcessorFig; |
| private final IndexProducer indexProducer; |
| private final EntityCollectionManagerFactory entityCollectionManagerFactory; |
| private final IndexLocationStrategyFactory indexLocationStrategyFactory; |
| private final EntityIndexFactory entityIndexFactory; |
| private final EventBuilder eventBuilder; |
| private final RxTaskScheduler rxTaskScheduler; |
| |
| private final Timer readTimer; |
| private final Timer writeTimer; |
| private final Timer ackTimer; |
| |
| /** |
| * This mutex is used to start/stop workers to ensure we're not concurrently modifying our subscriptions |
| */ |
| private final Object mutex = new Object(); |
| |
| private final Counter indexErrorCounter; |
| private final AtomicLong counter = new AtomicLong(); |
| private final AtomicLong inFlight = new AtomicLong(); |
| private final Histogram messageCycle; |
| |
| //the actively running subscription |
| private List<Subscription> subscriptions = new ArrayList<>(); |
| |
| |
| @Inject |
| public AmazonAsyncEventService( final QueueManagerFactory queueManagerFactory, |
| final IndexProcessorFig indexProcessorFig, |
| final IndexProducer indexProducer, |
| final MetricsFactory metricsFactory, |
| final EntityCollectionManagerFactory entityCollectionManagerFactory, |
| final IndexLocationStrategyFactory indexLocationStrategyFactory, |
| final EntityIndexFactory entityIndexFactory, |
| final EventBuilder eventBuilder, |
| final RxTaskScheduler rxTaskScheduler ) { |
| this.indexProducer = indexProducer; |
| |
| this.entityCollectionManagerFactory = entityCollectionManagerFactory; |
| this.indexLocationStrategyFactory = indexLocationStrategyFactory; |
| this.entityIndexFactory = entityIndexFactory; |
| this.eventBuilder = eventBuilder; |
| this.rxTaskScheduler = rxTaskScheduler; |
| |
| this.queueScope = new QueueScopeImpl(QUEUE_NAME, QueueScope.RegionImplementation.ALL); |
| this.queue = queueManagerFactory.getQueueManager(queueScope); |
| this.indexProcessorFig = indexProcessorFig; |
| |
| this.writeTimer = metricsFactory.getTimer(AmazonAsyncEventService.class, "async_event.write"); |
| this.readTimer = metricsFactory.getTimer(AmazonAsyncEventService.class, "async_event.read"); |
| this.ackTimer = metricsFactory.getTimer(AmazonAsyncEventService.class, "async_event.ack"); |
| this.indexErrorCounter = metricsFactory.getCounter(AmazonAsyncEventService.class, "async_event.error"); |
| this.messageCycle = metricsFactory.getHistogram(AmazonAsyncEventService.class, "async_event.message_cycle"); |
| |
| |
| //wire up the gauge of inflight message |
| metricsFactory.addGauge(AmazonAsyncEventService.class, "async-event.inflight", new Gauge<Long>() { |
| @Override |
| public Long getValue() { |
| return inFlight.longValue(); |
| } |
| }); |
| |
| start(); |
| } |
| |
| |
| /** |
| * Offer the EntityIdScope to SQS |
| */ |
| private void offer(final Object operation) { |
| final Timer.Context timer = this.writeTimer.time(); |
| |
| try { |
| //signal to SQS |
| this.queue.sendMessage( operation ); |
| } catch (IOException e) { |
| throw new RuntimeException("Unable to queue message", e); |
| } finally { |
| timer.stop(); |
| } |
| } |
| |
| private void offerBatch(final List operations){ |
| final Timer.Context timer = this.writeTimer.time(); |
| |
| try { |
| //signal to SQS |
| this.queue.sendMessages( operations ); |
| } catch (IOException e) { |
| throw new RuntimeException("Unable to queue message", e); |
| } finally { |
| timer.stop(); |
| } |
| } |
| |
| |
| /** |
| * Take message from SQS |
| */ |
| private Observable<QueueMessage> take() { |
| |
| final Timer.Context timer = this.readTimer.time(); |
| |
| try { |
| return queue.getMessages(MAX_TAKE, |
| indexProcessorFig.getIndexQueueVisibilityTimeout(), |
| indexProcessorFig.getIndexQueueTimeout(), |
| AsyncEvent.class); |
| } |
| //stop our timer |
| finally { |
| timer.stop(); |
| } |
| } |
| |
| |
| /** |
| * Ack message in SQS |
| */ |
| public void ack(final QueueMessage message) { |
| |
| final Timer.Context timer = this.ackTimer.time(); |
| |
| try{ |
| queue.commitMessage(message); |
| |
| //decrement our in-flight counter |
| inFlight.decrementAndGet(); |
| |
| }catch(Exception e){ |
| throw new RuntimeException("Unable to ack messages", e); |
| }finally { |
| timer.stop(); |
| } |
| |
| |
| } |
| |
| /** |
| * Ack message in SQS |
| */ |
| public void ack(final List<QueueMessage> messages) { |
| |
| final Timer.Context timer = this.ackTimer.time(); |
| |
| try{ |
| queue.commitMessages(messages); |
| |
| //decrement our in-flight counter |
| inFlight.decrementAndGet(); |
| |
| }catch(Exception e){ |
| throw new RuntimeException("Unable to ack messages", e); |
| }finally { |
| timer.stop(); |
| } |
| |
| |
| } |
| |
| |
| private List<QueueMessage> handleMessages( final List<QueueMessage> messages ) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("handleMessages with {} message", messages.size()); |
| } |
| |
| final int bufferSize = messages.size(); |
| Observable<IndexEventResult> masterObservable = Observable.from(messages).flatMap(message -> { |
| AsyncEvent event = null; |
| try{ |
| event = (AsyncEvent) message.getBody(); |
| }catch (ClassCastException cce){ |
| logger.error("Failed to deserialize message body",cce); |
| } |
| |
| if (event == null) { |
| logger.error("AsyncEvent type or event is null!"); |
| return Observable.just(new IndexEventResult(Optional.fromNullable(message), Optional.<IndexOperationMessage>absent(),System.currentTimeMillis())); |
| } |
| |
| final AsyncEvent thisEvent = event; |
| if(logger.isDebugEnabled()) { |
| logger.debug("Processing {} event", event); |
| } |
| |
| try { |
| Observable<IndexOperationMessage> indexoperationObservable; |
| //merge each operation to a master observable; |
| if (event instanceof EdgeDeleteEvent) { |
| indexoperationObservable = handleEdgeDelete(message); |
| } else if (event instanceof EdgeIndexEvent) { |
| indexoperationObservable = handleEdgeIndex(message); |
| } else if (event instanceof EntityDeleteEvent) { |
| indexoperationObservable = handleEntityDelete(message); |
| } else if (event instanceof EntityIndexEvent) { |
| indexoperationObservable = handleEntityIndexUpdate(message); |
| } else if (event instanceof InitializeApplicationIndexEvent) { |
| //does not return observable |
| handleInitializeApplicationIndex(event,message); |
| indexoperationObservable = Observable.just(new IndexOperationMessage()); |
| } else { |
| throw new Exception("Unknown EventType");//TODO: print json instead |
| } |
| |
| //return type that can be indexed and ack'd later |
| return indexoperationObservable |
| .map(indexOperationMessage -> |
| new IndexEventResult(Optional.fromNullable(message), Optional.fromNullable(indexOperationMessage),thisEvent.getCreationTime()) |
| ); |
| } catch (Exception e) { |
| logger.error("Failed to index message: " + message.getMessageId(), e, message); |
| return Observable.just(new IndexEventResult(Optional.absent(), Optional.<IndexOperationMessage>absent(), event.getCreationTime())); |
| } |
| }); |
| |
| //filter for success, send to the index(optional), ack |
| return masterObservable |
| //take the max |
| .buffer(bufferSize) |
| //map them to index results and return them |
| .flatMap(indexEventResults -> { |
| IndexOperationMessage combined = new IndexOperationMessage(); |
| indexEventResults.stream().forEach( |
| indexEventResult -> { |
| if (indexEventResult.getIndexOperationMessage().isPresent()) { |
| combined.ingest(indexEventResult.getIndexOperationMessage().get()); |
| } |
| }); |
| |
| |
| //ack after successful completion of the operation. |
| return indexProducer.put(combined) |
| //change observable type |
| .flatMap(indexOperationMessage -> Observable.from(indexEventResults)) |
| //remove unsuccessful |
| .filter(indexEventResult -> indexEventResult.getQueueMessage().isPresent()) |
| //measure |
| .doOnNext(indexEventResult -> messageCycle.update(System.currentTimeMillis() - indexEventResult.getCreationTime())) |
| //return the queue messages to ack |
| .map(result -> result.getQueueMessage().get()) |
| .toList(); |
| |
| }) |
| .doOnError(t -> logger.error("Failed to process queuemessages",t)) |
| .toBlocking().lastOrDefault(null); |
| } |
| |
| |
| @Override |
| public void queueInitializeApplicationIndex( final ApplicationScope applicationScope) { |
| IndexLocationStrategy indexLocationStrategy = indexLocationStrategyFactory.getIndexLocationStrategy( |
| applicationScope ); |
| offer(new InitializeApplicationIndexEvent(new ReplicatedIndexLocationStrategy(indexLocationStrategy))); |
| } |
| |
| |
| @Override |
| public void queueEntityIndexUpdate(final ApplicationScope applicationScope, |
| final Entity entity) { |
| |
| offer(new EntityIndexEvent(new EntityIdScope(applicationScope, entity.getId()), 0)); |
| } |
| |
| |
| public Observable<IndexOperationMessage> handleEntityIndexUpdate(final QueueMessage message) { |
| |
| Preconditions.checkNotNull( message, "Queue Message cannot be null for handleEntityIndexUpdate" ); |
| |
| final AsyncEvent event = ( AsyncEvent ) message.getBody(); |
| |
| Preconditions.checkNotNull(message, "QueueMessage Body cannot be null for handleEntityIndexUpdate"); |
| Preconditions.checkArgument(event instanceof EntityIndexEvent, String.format("Event Type for handleEntityIndexUpdate must be ENTITY_INDEX, got %s", event.getClass())); |
| |
| final EntityIndexEvent entityIndexEvent = (EntityIndexEvent) event; |
| |
| |
| //process the entity immediately |
| //only process the same version, otherwise ignore |
| final EntityIdScope entityIdScope = entityIndexEvent.getEntityIdScope(); |
| final ApplicationScope applicationScope = entityIdScope.getApplicationScope(); |
| final Id entityId = entityIdScope.getId(); |
| final long updatedAfter = entityIndexEvent.getUpdatedAfter(); |
| |
| final EntityIndexOperation entityIndexOperation = new EntityIndexOperation( applicationScope, entityId, updatedAfter); |
| |
| final Observable<IndexOperationMessage> observable = eventBuilder.buildEntityIndex( entityIndexOperation ); |
| return observable; |
| } |
| |
| |
| @Override |
| public void queueNewEdge(final ApplicationScope applicationScope, |
| final Entity entity, |
| final Edge newEdge) { |
| |
| EdgeIndexEvent operation = new EdgeIndexEvent(applicationScope, entity.getId(), newEdge); |
| |
| offer( operation ); |
| } |
| |
| public Observable<IndexOperationMessage> handleEdgeIndex(final QueueMessage message) { |
| |
| Preconditions.checkNotNull( message, "Queue Message cannot be null for handleEdgeIndex" ); |
| |
| final AsyncEvent event = (AsyncEvent) message.getBody(); |
| |
| Preconditions.checkNotNull( message, "QueueMessage Body cannot be null for handleEdgeIndex" ); |
| Preconditions.checkArgument(event instanceof EdgeIndexEvent, String.format("Event Type for handleEdgeIndex must be EDGE_INDEX, got %s", event.getClass())); |
| |
| final EdgeIndexEvent edgeIndexEvent = ( EdgeIndexEvent ) event; |
| |
| final ApplicationScope applicationScope = edgeIndexEvent.getApplicationScope(); |
| final Edge edge = edgeIndexEvent.getEdge(); |
| |
| |
| |
| final EntityCollectionManager ecm = entityCollectionManagerFactory.createCollectionManager( applicationScope ); |
| |
| final Observable<IndexOperationMessage> edgeIndexObservable = ecm.load(edgeIndexEvent.getEntityId()).flatMap(entity -> eventBuilder.buildNewEdge( |
| applicationScope, entity, edge)); |
| return edgeIndexObservable; |
| } |
| |
| @Override |
| public void queueDeleteEdge(final ApplicationScope applicationScope, |
| final Edge edge) { |
| |
| offer( new EdgeDeleteEvent( applicationScope, edge ) ); |
| } |
| |
| public Observable<IndexOperationMessage> handleEdgeDelete(final QueueMessage message) { |
| |
| Preconditions.checkNotNull( message, "Queue Message cannot be null for handleEdgeDelete" ); |
| |
| final AsyncEvent event = (AsyncEvent) message.getBody(); |
| |
| Preconditions.checkNotNull( message, "QueueMessage Body cannot be null for handleEdgeDelete" ); |
| Preconditions.checkArgument(event instanceof EdgeDeleteEvent, String.format("Event Type for handleEdgeDelete must be EDGE_DELETE, got %s", event.getClass())); |
| |
| |
| final EdgeDeleteEvent edgeDeleteEvent = ( EdgeDeleteEvent ) event; |
| |
| final ApplicationScope applicationScope = edgeDeleteEvent.getApplicationScope(); |
| final Edge edge = edgeDeleteEvent.getEdge(); |
| |
| if (logger.isDebugEnabled()) logger.debug("Deleting in app scope {} with edge {}", applicationScope, edge); |
| |
| final Observable<IndexOperationMessage> observable = eventBuilder.buildDeleteEdge( applicationScope, edge ); |
| return observable; |
| } |
| |
| |
| @Override |
| public void queueEntityDelete(final ApplicationScope applicationScope, final Id entityId) { |
| |
| offer( new EntityDeleteEvent( new EntityIdScope( applicationScope, entityId ) ) ); |
| } |
| |
| @Override |
| public long getQueueDepth() { |
| return queue.getQueueDepth(); |
| } |
| |
| public Observable<IndexOperationMessage> handleEntityDelete(final QueueMessage message) { |
| |
| Preconditions.checkNotNull(message, "Queue Message cannot be null for handleEntityDelete"); |
| |
| final AsyncEvent event = (AsyncEvent) message.getBody(); |
| Preconditions.checkNotNull( message, "QueueMessage Body cannot be null for handleEntityDelete" ); |
| Preconditions.checkArgument( event instanceof EntityDeleteEvent, |
| String.format( "Event Type for handleEntityDelete must be ENTITY_DELETE, got %s", event.getClass() ) ); |
| |
| |
| final EntityDeleteEvent entityDeleteEvent = ( EntityDeleteEvent ) event; |
| final ApplicationScope applicationScope = entityDeleteEvent.getEntityIdScope().getApplicationScope(); |
| final Id entityId = entityDeleteEvent.getEntityIdScope().getId(); |
| |
| if (logger.isDebugEnabled()) |
| logger.debug("Deleting entity id from index in app scope {} with entityId {}", applicationScope, entityId); |
| |
| final EventBuilderImpl.EntityDeleteResults |
| entityDeleteResults = eventBuilder.buildEntityDelete( applicationScope, entityId ); |
| |
| |
| final Observable<IndexOperationMessage> merged = entityDeleteResults |
| .getEntitiesCompacted() |
| .collect(() -> new ArrayList<>(), (list, item) -> list.add(item)) |
| .flatMap(collected -> entityDeleteResults.getIndexObservable()) ; |
| return merged; |
| } |
| |
| |
| public void handleInitializeApplicationIndex(final AsyncEvent event, final QueueMessage message) { |
| Preconditions.checkNotNull(message, "Queue Message cannot be null for handleInitializeApplicationIndex"); |
| Preconditions.checkArgument(event instanceof InitializeApplicationIndexEvent, String.format("Event Type for handleInitializeApplicationIndex must be APPLICATION_INDEX, got %s", event.getClass())); |
| |
| final InitializeApplicationIndexEvent initializeApplicationIndexEvent = |
| ( InitializeApplicationIndexEvent ) event; |
| |
| final IndexLocationStrategy indexLocationStrategy = initializeApplicationIndexEvent.getIndexLocationStrategy(); |
| final EntityIndex index = entityIndexFactory.createEntityIndex( indexLocationStrategy ); |
| index.initialize(); |
| } |
| |
| /** |
| * Loop through and start the workers |
| */ |
| public void start() { |
| final int count = indexProcessorFig.getWorkerCount(); |
| |
| for (int i = 0; i < count; i++) { |
| startWorker(); |
| } |
| } |
| |
| |
| /** |
| * Stop the workers |
| */ |
| public void stop() { |
| synchronized (mutex) { |
| //stop consuming |
| |
| for (final Subscription subscription : subscriptions) { |
| subscription.unsubscribe(); |
| } |
| } |
| } |
| |
| |
| private void startWorker() { |
| synchronized (mutex) { |
| |
| Observable<List<QueueMessage>> consumer = |
| Observable.create(new Observable.OnSubscribe<List<QueueMessage>>() { |
| @Override |
| public void call(final Subscriber<? super List<QueueMessage>> subscriber) { |
| |
| //name our thread so it's easy to see |
| Thread.currentThread().setName("QueueConsumer_" + counter.incrementAndGet()); |
| |
| List<QueueMessage> drainList = null; |
| |
| do { |
| try { |
| drainList = take().toList().toBlocking().lastOrDefault(null); |
| //emit our list in it's entity to hand off to a worker pool |
| subscriber.onNext(drainList); |
| |
| //take since we're in flight |
| inFlight.addAndGet(drainList.size()); |
| } catch (Throwable t) { |
| final long sleepTime = indexProcessorFig.getFailureRetryTime(); |
| |
| logger.error("Failed to dequeue. Sleeping for {} milliseconds", sleepTime, t); |
| |
| if (drainList != null) { |
| inFlight.addAndGet(-1 * drainList.size()); |
| } |
| |
| |
| try { |
| Thread.sleep(sleepTime); |
| } catch (InterruptedException ie) { |
| //swallow |
| } |
| |
| indexErrorCounter.inc(); |
| } |
| } |
| while (true); |
| } |
| }) |
| //this won't block our read loop, just reads and proceeds |
| .map(messages -> |
| { |
| if (messages == null || messages.size() == 0) { |
| return null; |
| } |
| |
| try { |
| |
| List<QueueMessage> messagesToAck = handleMessages(messages); |
| |
| if (messagesToAck == null || messagesToAck.size() == 0) { |
| return messagesToAck; |
| } |
| //ack each message, but only if we didn't error. |
| ack(messagesToAck); |
| return messagesToAck; |
| } catch (Exception e) { |
| logger.error("failed to ack messages to sqs", messages.get(0).getMessageId(), e); |
| return null; |
| //do not rethrow so we can process all of them |
| } |
| }); |
| |
| //start in the background |
| |
| final Subscription subscription = consumer.subscribeOn(Schedulers.newThread()).subscribe(); |
| |
| subscriptions.add(subscription); |
| } |
| } |
| |
| public void index(final ApplicationScope applicationScope, final Id id, final long updatedSince) { |
| //change to id scope to avoid serialization issues |
| offer( new EntityIndexEvent( new EntityIdScope( applicationScope, id ), updatedSince ) ); |
| } |
| |
| public void indexBatch(final List<EdgeScope> edges, final long updatedSince) { |
| |
| List batch = new ArrayList<EdgeScope>(); |
| for ( EdgeScope e : edges){ |
| //change to id scope to avoid serialization issues |
| batch.add(new EntityIndexEvent(new EntityIdScope(e.getApplicationScope(), e.getEdge().getTargetNode()), updatedSince)); |
| } |
| offerBatch( batch ); |
| } |
| |
| |
| public class IndexEventResult{ |
| private final Optional<QueueMessage> queueMessage; |
| private final Optional<IndexOperationMessage> indexOperationMessage; |
| private final long creationTime; |
| |
| |
| public IndexEventResult(Optional<QueueMessage> queueMessage, Optional<IndexOperationMessage> indexOperationMessage, long creationTime){ |
| |
| this.queueMessage = queueMessage; |
| this.indexOperationMessage = indexOperationMessage; |
| |
| this.creationTime = creationTime; |
| } |
| |
| |
| public Optional<QueueMessage> getQueueMessage() { |
| return queueMessage; |
| } |
| |
| public Optional<IndexOperationMessage> getIndexOperationMessage() { |
| return indexOperationMessage; |
| } |
| |
| public long getCreationTime() { |
| return creationTime; |
| } |
| } |
| } |