/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  The ASF licenses this file to You
 * under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.  For additional information regarding
 * copyright in this work, please see the NOTICE file in the top level
 * directory of this distribution.
 */

package org.apache.usergrid.persistence.index.impl;
import com.codahale.metrics.Counter;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.Meter;
import com.codahale.metrics.Timer;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import org.apache.usergrid.persistence.core.future.BetterFuture;
import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
import org.apache.usergrid.persistence.index.IndexBufferConsumer;
import org.apache.usergrid.persistence.index.IndexFig;
import org.apache.usergrid.persistence.index.IndexOperationMessage;
import org.elasticsearch.action.WriteConsistencyLevel;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.Client;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Subscriber;
import rx.Subscription;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.functions.Func2;
import rx.schedulers.Schedulers;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Consumer for IndexOperationMessages. The constructor starts a configurable number of
 * worker threads; each worker drains the buffer queue, flushes the drained operations to
 * Elasticsearch as a single bulk request, then acks the messages back to the queue.
 */
@Singleton
public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
private static final Logger log = LoggerFactory.getLogger(EsIndexBufferConsumerImpl.class);
private final IndexFig config;
private final FailureMonitorImpl failureMonitor;
private final Client client;
private final Timer flushTimer;
private final Counter indexSizeCounter;
private final Counter indexErrorCounter;
private final Meter flushMeter;
private final Timer produceTimer;
private final BufferQueue bufferQueue;
private final IndexFig indexFig;
//monotonically increasing counter used to name worker threads
private final AtomicLong counter = new AtomicLong( );
//the actively running subscriptions, one per worker
private List<Subscription> subscriptions;
//guards mutation of the subscriptions list
private final Object mutex = new Object();
//number of messages dequeued but not yet acked, exposed as a gauge
private final AtomicLong inFlight = new AtomicLong( );
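/**
 * @param config index configuration used for buffer sizes, timeouts, retries and the write consistency level
 * @param provider provides the Elasticsearch client
 * @param metricsFactory source of the timers, meters, counters and the in flight gauge
 * @param bufferQueue the queue the workers drain
 * @param indexFig index configuration used for the worker count
 */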
@Inject
public EsIndexBufferConsumerImpl( final IndexFig config, final EsProvider provider, final MetricsFactory
metricsFactory, final BufferQueue bufferQueue, final IndexFig indexFig ){
this.flushTimer = metricsFactory.getTimer(EsIndexBufferConsumerImpl.class, "buffer.flush");
this.flushMeter = metricsFactory.getMeter(EsIndexBufferConsumerImpl.class, "buffer.meter");
this.indexSizeCounter = metricsFactory.getCounter(EsIndexBufferConsumerImpl.class, "buffer.size");
this.indexErrorCounter = metricsFactory.getCounter(EsIndexBufferConsumerImpl.class, "error.count");
//wire up the gauge of inflight messages
metricsFactory.addGauge( EsIndexBufferConsumerImpl.class, "inflight.meter", new Gauge<Long>() {
@Override
public Long getValue() {
return inFlight.longValue();
}
} );
this.config = config;
this.failureMonitor = new FailureMonitorImpl(config,provider);
this.client = provider.getClient();
this.produceTimer = metricsFactory.getTimer(EsIndexBufferConsumerImpl.class,"index.buffer.consumer.messageFetch");
this.bufferQueue = bufferQueue;
this.indexFig = indexFig;
subscriptions = new ArrayList<>( indexFig.getWorkerCount() );
//start the workers, which batch up operations and send them in bulk
start();
}
/**
 * Loop through and start the workers
 */
public void start() {
final int count = indexFig.getWorkerCount();
for(int i = 0; i < count; i ++){
startWorker();
}
}
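/*
 * A minimal lifecycle sketch, assuming Guice wiring (the "IndexModule" name below is a
 * hypothetical placeholder, not part of this class):
 *
 *   Injector injector = Guice.createInjector( new IndexModule() );
 *   EsIndexBufferConsumerImpl consumer = injector.getInstance( EsIndexBufferConsumerImpl.class );
 *   //the constructor has already called start(), so the workers are running
 *   consumer.stop(); //unsubscribe all workers on shutdown
 */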
/**
* Stop the workers
*/
public void stop() {
synchronized ( mutex ) {
//stop consuming
for(final Subscription subscription: subscriptions){
subscription.unsubscribe();
}
}
}
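/**
 * Start a single worker: an Rx pipeline on its own thread that repeatedly drains the
 * buffer queue, flushes each drained batch to Elasticsearch, acks the batch, and logs
 * any error rather than propagating it
 */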
private void startWorker(){
synchronized ( mutex) {
Observable<List<IndexOperationMessage>> consumer = Observable.create(
new Observable.OnSubscribe<List<IndexOperationMessage>>() {
@Override
public void call( final Subscriber<? super List<IndexOperationMessage>> subscriber ) {
//name our thread so it's easy to see
Thread.currentThread().setName( "QueueConsumer_" + counter.incrementAndGet() );
List<IndexOperationMessage> drainList;
do {
try {
final Timer.Context timer = produceTimer.time();
drainList = bufferQueue
.take( config.getIndexBufferSize(), config.getIndexBufferTimeout(),
TimeUnit.MILLISECONDS );
//count the drained messages as in flight before emitting them, otherwise the
//downstream ack would decrement the gauge before we increment it
inFlight.addAndGet( drainList.size() );
subscriber.onNext( drainList );
timer.stop();
}
catch ( Exception e ) {
final long sleepTime = config.getFailureRetryTime();
log.error( "Failed to dequeue. Sleeping for {} milliseconds", sleepTime, e );
try {
Thread.sleep( sleepTime );
}
catch ( InterruptedException ie ) {
//restore the interrupt flag rather than swallowing it
Thread.currentThread().interrupt();
}
indexErrorCounter.inc();
}
}
//keep draining until stop() unsubscribes this worker
while ( !subscriber.isUnsubscribed() );
}
} ).subscribeOn( Schedulers.newThread() ).doOnNext( new Action1<List<IndexOperationMessage>>() {
@Override
public void call( List<IndexOperationMessage> containerList ) {
if ( containerList.isEmpty() ) {
return;
}
flushMeter.mark( containerList.size() );
Timer.Context time = flushTimer.time();
execute( containerList );
time.stop();
}
} )
//ack after we process
.doOnNext( new Action1<List<IndexOperationMessage>>() {
@Override
public void call( final List<IndexOperationMessage> indexOperationMessages ) {
bufferQueue.ack( indexOperationMessages );
//decrement the in flight count now that processing is complete
inFlight.addAndGet( -1 * indexOperationMessages.size() );
}
} ).doOnError( new Action1<Throwable>() {
@Override
public void call( final Throwable throwable ) {
log.error( "An exception occurred when trying to deque and write to elasticsearch. Ignoring",
throwable );
indexErrorCounter.inc();
}
} );
//start in the background
final Subscription subscription = consumer.subscribe();
subscriptions.add(subscription );
}
}
/**
 * Flatten the operation messages into a single bulk request, send it to Elasticsearch,
 * check for errors, then complete each message's future
 */
private void execute(final List<IndexOperationMessage> operationMessages) {
if (operationMessages == null || operationMessages.isEmpty()) {
return;
}
//process and flatten all the messages to builder requests
//batch shard operations into a bulk request
Observable.from( operationMessages ).flatMap( new Func1<IndexOperationMessage, Observable<BatchRequest>>() {
@Override
public Observable<BatchRequest> call( final IndexOperationMessage indexOperationMessage ) {
final Observable<IndexRequest> index = Observable.from( indexOperationMessage.getIndexRequests() );
final Observable<DeIndexRequest> deIndex =
Observable.from( indexOperationMessage.getDeIndexRequests() );
indexSizeCounter.dec( indexOperationMessage.getDeIndexRequests().size() );
indexSizeCounter.dec( indexOperationMessage.getIndexRequests().size() );
return Observable.merge( index, deIndex );
}
} )
//collect all the operations into a single bulk request
.reduce( initRequest(), new Func2<BulkRequestBuilder, BatchRequest, BulkRequestBuilder>() {
@Override
public BulkRequestBuilder call( final BulkRequestBuilder bulkRequestBuilder,
final BatchRequest batchRequest ) {
batchRequest.doOperation( client, bulkRequestBuilder );
return bulkRequestBuilder;
}
} )
//send the request off to ES
.doOnNext( new Action1<BulkRequestBuilder>() {
@Override
public void call( final BulkRequestBuilder bulkRequestBuilder ) {
sendRequest( bulkRequestBuilder );
}
} ).toBlocking().lastOrDefault(null);
//call back all futures
Observable.from(operationMessages)
.doOnNext(new Action1<IndexOperationMessage>() {
@Override
public void call(IndexOperationMessage operationMessage) {
operationMessage.getFuture().done();
}
})
.toBlocking().lastOrDefault(null);
}
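/*
 * A hypothetical sketch of the contract the reduce step above assumes of BatchRequest
 * implementations (IndexRequest and DeIndexRequest): each appends its operation to the
 * shared bulk builder, along the lines of
 *
 *   public void doOperation( final Client client, final BulkRequestBuilder builder ) {
 *       builder.add( client.prepareIndex( index, type, id ).setSource( source ) );
 *   }
 *
 * The index, type, id and source fields here are illustrative; see the actual
 * BatchRequest implementations in this package for the real logic.
 */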
/**
 * Initialize a bulk request with the configured write consistency level and refresh behavior
 * @return a new BulkRequestBuilder ready to have operations added
 */
private BulkRequestBuilder initRequest() {
BulkRequestBuilder bulkRequest = client.prepareBulk();
bulkRequest.setConsistencyLevel(WriteConsistencyLevel.fromString(config.getWriteConsistencyLevel()));
bulkRequest.setRefresh(config.isForcedRefresh());
return bulkRequest;
}
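/*
 * WriteConsistencyLevel.fromString accepts "default", "one", "quorum" or "all", so
 * config.getWriteConsistencyLevel() is expected to return one of those values.
 */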
/**
 * Send the bulk request and fail fast if any item in the response failed
 * @param bulkRequest the request to execute; a no-op if it contains no actions
 */
private void sendRequest(BulkRequestBuilder bulkRequest) {
//nothing to do, we haven't added anything to the index
if (bulkRequest.numberOfActions() == 0) {
return;
}
final BulkResponse responses;
try {
responses = bulkRequest.execute().actionGet();
} catch (Throwable t) {
log.error("Unable to communicate with elasticsearch");
failureMonitor.fail("Unable to execute batch", t);
throw t;
}
failureMonitor.success();
for (BulkItemResponse response : responses) {
if (response.isFailed()) {
final BulkItemResponse.Failure failure = response.getFailure();
final String message;
if ( failure != null ) {
message = "Unable to index documents. Errors are: " + failure.getMessage();
}
else {
message = "Unable to index documents. Response is: " + response.getResponse();
}
throw new RuntimeException(message);
}
}
}
}