package org.apache.solr.crossdc.consumer;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.SharedMetricRegistries;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.IOUtils;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.crossdc.common.*;
import org.apache.solr.crossdc.messageprocessor.SolrMessageProcessor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.invoke.MethodHandles;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.*;
/**
* Consumes messages from Kafka and mirrors them into a Solr cluster. A KafkaConsumer
* subscribes to one or more topics and receives ConsumerRecords containing MirroredSolrRequest
* objects. A SolrMessageProcessor handles each MirroredSolrRequest and sends the resulting
* UpdateRequest to a CloudSolrClient for indexing; a ThreadPoolExecutor processes the update
* requests asynchronously. The consumer also manages offsets, committing them to Kafka and
* seeking to specific offsets for error recovery. The class provides methods to start and
* stop the consumer thread.
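*
* <p>A minimal usage sketch (the KafkaCrossDcConf construction is elided because its exact
* keys depend on the deployment; the thread handling shown is one reasonable approach, not
* the only one):</p>
*
* <pre>{@code
* KafkaCrossDcConf conf = ...; // bootstrap servers, topic name(s), ZK connect string, etc.
* CountDownLatch started = new CountDownLatch(1);
* KafkaCrossDcConsumer consumer = new KafkaCrossDcConsumer(conf, started);
* new Thread(consumer, "KafkaCrossDcConsumer").start();
* started.await();       // returns once the consumer has subscribed and begun polling
* // ... later, from another thread:
* consumer.shutdown();   // wakes the poll loop and releases resources
* }</pre>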
*/
public class KafkaCrossDcConsumer extends Consumer.CrossDcConsumer {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private final MetricRegistry metrics = SharedMetricRegistries.getOrCreate("metrics");
private final KafkaConsumer<String,MirroredSolrRequest> kafkaConsumer;
private final CountDownLatch startLatch;
KafkaMirroringSink kafkaMirroringSink;
private static final int KAFKA_CONSUMER_POLL_TIMEOUT_MS = 5000;
private final String[] topicNames;
private final SolrMessageProcessor messageProcessor;
private final CloudSolrClient solrClient;
private final ThreadPoolExecutor executor;
private PartitionManager partitionManager;
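// Bounded queue gives the poll loop backpressure against slow Solr indexing. Note that with
// corePoolSize == maximumPoolSize, a full queue makes executor.submit() throw
// RejectedExecutionException (default AbortPolicy) rather than block.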
private final BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(10);
/**
* @param conf The Kafka cross-DC configuration
* @param startLatch latch counted down once the consumer has subscribed and started polling
*/
public KafkaCrossDcConsumer(KafkaCrossDcConf conf, CountDownLatch startLatch) {
this.topicNames = conf.get(KafkaCrossDcConf.TOPIC_NAME).split(",");
this.startLatch = startLatch;
final Properties kafkaConsumerProps = new Properties();
kafkaConsumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, conf.get(KafkaCrossDcConf.BOOTSTRAP_SERVERS));
kafkaConsumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, conf.get(KafkaCrossDcConf.GROUP_ID));
kafkaConsumerProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, conf.getInt(KafkaCrossDcConf.MAX_POLL_RECORDS));
kafkaConsumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
kafkaConsumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
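// Auto-commit is disabled; offsets are tracked and committed through PartitionManager once
// batches have been processed.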
kafkaConsumerProps.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, conf.getInt(KafkaCrossDcConf.FETCH_MIN_BYTES));
kafkaConsumerProps.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, conf.getInt(KafkaCrossDcConf.FETCH_MAX_WAIT_MS));
kafkaConsumerProps.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, conf.getInt(KafkaCrossDcConf.FETCH_MAX_BYTES));
kafkaConsumerProps.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, conf.getInt(KafkaCrossDcConf.MAX_PARTITION_FETCH_BYTES));
KafkaCrossDcConf.addSecurityProps(conf, kafkaConsumerProps);
kafkaConsumerProps.putAll(conf.getAdditionalProperties());
int threads = conf.getInt(KafkaCrossDcConf.CONSUMER_PROCESSING_THREADS);
executor = new ThreadPoolExecutor(threads, threads, 0L, TimeUnit.MILLISECONDS, queue, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("KafkaCrossDcConsumerWorker");
return t;
}
});
executor.prestartAllCoreThreads();
solrClient = createSolrClient(conf);
messageProcessor = createSolrMessageProcessor();
log.info("Creating Kafka consumer with configuration {}", kafkaConsumerProps);
kafkaConsumer = createKafkaConsumer(kafkaConsumerProps);
partitionManager = new PartitionManager(kafkaConsumer);
// Create producer for resubmitting failed requests
log.info("Creating Kafka resubmit producer");
this.kafkaMirroringSink = createKafkaMirroringSink(conf);
log.info("Created Kafka resubmit producer");
}
protected SolrMessageProcessor createSolrMessageProcessor() {
return new SolrMessageProcessor(solrClient, resubmitRequest -> 0L);
}
public KafkaConsumer<String,MirroredSolrRequest> createKafkaConsumer(Properties properties) {
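// MirroredSolrRequestSerializer also implements the Kafka Deserializer interface, so it is
// reused here to decode record values.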
return new KafkaConsumer<>(properties, new StringDeserializer(), new MirroredSolrRequestSerializer());
}
/**
* This is where the magic happens.
* 1. Polls and gets batches of records from Kafka.
* 2. Extracts the MirroredSolrRequest objects.
* 3. Sends each request to the SolrMessageProcessor, which holds the processing, retry, and error handling logic.
*/
@Override public void run() {
log.info("About to start Kafka consumer thread, topics={}", Arrays.asList(topicNames));
try {
kafkaConsumer.subscribe(Arrays.asList(topicNames));
log.info("Consumer started");
startLatch.countDown();
while (pollAndProcessRequests()) {
//no-op within this loop: everything is done in the pollAndProcessRequests method below.
}
log.info("Closed kafka consumer. Exiting now.");
try {
kafkaConsumer.close();
} catch (Exception e) {
log.warn("Failed to close kafka consumer", e);
}
try {
kafkaMirroringSink.close();
} catch (Exception e) {
log.warn("Failed to close kafka mirroring sink", e);
}
} finally {
IOUtils.closeQuietly(solrClient);
}
}
/**
* Polls and processes the requests from Kafka. This method returns false when the consumer needs to be
* shut down, i.e. on a wakeup exception or a non-retryable error.
*/
boolean pollAndProcessRequests() {
log.trace("Entered pollAndProcessRequests loop");
try {
try {
partitionManager.checkOffsetUpdates();
} catch (Throwable e) {
log.error("Error while checking offset updates, shutting down", e);
return false;
}
ConsumerRecords<String,MirroredSolrRequest> records = kafkaConsumer.poll(Duration.ofMillis(KAFKA_CONSUMER_POLL_TIMEOUT_MS));
if (log.isTraceEnabled()) {
log.trace("poll return {} records", records.count());
}
UpdateRequest solrReqBatch = null;
ConsumerRecord<String,MirroredSolrRequest> lastRecord = null;
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String,MirroredSolrRequest>> partitionRecords = records.records(partition);
PartitionManager.PartitionWork partitionWork = partitionManager.getPartitionWork(partition);
PartitionManager.WorkUnit workUnit = new PartitionManager.WorkUnit(partition);
workUnit.nextOffset = PartitionManager.getOffsetForPartition(partitionRecords);
partitionWork.partitionQueue.add(workUnit);
try {
ModifiableSolrParams lastParams = null;
NamedList<Object> lastParamsAsNamedList = null;
solrReqBatch = new UpdateRequest();
for (ConsumerRecord<String,MirroredSolrRequest> requestRecord : partitionRecords) {
if (log.isTraceEnabled()) {
log.trace("Fetched record from topic={} partition={} key={} value={}", requestRecord.topic(), requestRecord.partition(), requestRecord.key(),
requestRecord.value());
}
lastRecord = requestRecord;
MirroredSolrRequest req = requestRecord.value();
UpdateRequest solrReq = (UpdateRequest) req.getSolrRequest();
ModifiableSolrParams params = solrReq.getParams();
if (log.isTraceEnabled()) {
log.trace("params={}", params);
}
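// Updates can only be coalesced into a single UpdateRequest while their SolrParams are
// identical; when the params change, flush the current batch and start a new work unit.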
if (lastParams != null && !lastParams.toNamedList().equals(params.toNamedList())) {
if (log.isTraceEnabled()) {
log.trace("SolrParams have changed, starting new UpdateRequest, params={}", params);
}
lastParamsAsNamedList = null;
sendBatch(solrReqBatch, lastRecord, workUnit);
solrReqBatch = new UpdateRequest();
workUnit = new PartitionManager.WorkUnit(partition);
workUnit.nextOffset = PartitionManager.getOffsetForPartition(partitionRecords);
partitionWork.partitionQueue.add(workUnit);
}
lastParams = solrReq.getParams();
solrReqBatch.setParams(params);
if (lastParamsAsNamedList == null) {
lastParamsAsNamedList = lastParams.toNamedList();
}
List<SolrInputDocument> docs = solrReq.getDocuments();
if (docs != null) {
solrReqBatch.add(docs);
}
List<String> deletes = solrReq.getDeleteById();
if (deletes != null) {
solrReqBatch.deleteById(deletes);
}
List<String> deleteByQuery = solrReq.getDeleteQuery();
if (deleteByQuery != null) {
for (String delByQuery : deleteByQuery) {
solrReqBatch.deleteByQuery(delByQuery);
}
}
}
sendBatch(solrReqBatch, lastRecord, workUnit);
try {
partitionManager.checkForOffsetUpdates(partition);
} catch (Throwable e) {
log.error("Error while checking offset updates, shutting down", e);
return false;
}
// handleItem sets the thread interrupt flag; exit if an interrupt has been set
if (Thread.currentThread().isInterrupted()) {
log.info("Kafka Consumer thread interrupted, shutting down Kafka consumer.");
return false;
}
} catch (WakeupException e) {
log.info("Caught wakeup exception, shutting down KafkaSolrRequestConsumer.");
return false;
} catch (Exception e) {
// If there is any exception thrown by handleItem, don't set the offset.
if (e instanceof ClassCastException || e instanceof SerializationException) { // TODO: optional
log.error("Non retryable error", e);
return false;
}
log.error("Exception occurred in Kafka consumer thread, stopping the Consumer.", e);
return false;
}
}
try {
partitionManager.checkOffsetUpdates();
} catch (Throwable e) {
log.error("Error while checking offset updates, shutting down", e);
return false;
}
} catch (WakeupException e) {
log.info("Caught wakeup exception, shutting down KafkaSolrRequestConsumer");
return false;
} catch (Exception e) {
if (e instanceof ClassCastException || e instanceof SerializationException) { // TODO: optional
log.error("Non retryable error", e);
return false;
}
log.error("Exception occurred in Kafka consumer thread, but we will continue.", e);
}
return true;
}
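/**
* Submits the batch to the processing executor and tracks the resulting Future on the work
* unit. After each batch completes, offset updates for the partition are checked so that
* processed offsets can be committed.
*/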
public void sendBatch(UpdateRequest solrReqBatch, ConsumerRecord<String,MirroredSolrRequest> lastRecord, PartitionManager.WorkUnit workUnit) {
UpdateRequest finalSolrReqBatch = solrReqBatch;
Future<?> future = executor.submit(() -> {
try {
IQueueHandler.Result<MirroredSolrRequest> result = messageProcessor.handleItem(new MirroredSolrRequest(finalSolrReqBatch));
processResult(lastRecord, result);
} catch (MirroringException e) {
// We don't really know what to do here
log.error("Mirroring exception occurred while resubmitting to Kafka. We are going to stop the consumer thread now.", e);
throw new RuntimeException(e);
} finally {
executor.submit(() -> {
try {
partitionManager.checkForOffsetUpdates(workUnit.partition);
} catch (Throwable e) {
// already logged inside checkForOffsetUpdates
}
});
}
});
workUnit.workItems.add(future);
}
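/**
* Reacts to the outcome of processing a single batch: handled results are counted, failed
* results are resubmitted to Kafka through the mirroring sink, and unexpected statuses are
* logged as errors.
*/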
void processResult(ConsumerRecord<String,MirroredSolrRequest> record, IQueueHandler.Result<MirroredSolrRequest> result) throws MirroringException {
switch (result.status()) {
case FAILED_RESUBMIT:
if (log.isTraceEnabled()) {
log.trace("result=failed-resubmit");
}
metrics.counter("failed-resubmit").inc();
kafkaMirroringSink.submit(record.value());
break;
case HANDLED:
// no-op
if (log.isTraceEnabled()) {
log.trace("result=handled");
}
metrics.counter("handled").inc();
break;
case NOT_HANDLED_SHUTDOWN:
if (log.isTraceEnabled()) {
log.trace("result=nothandled_shutdown");
}
metrics.counter("nothandled_shutdown").inc();
case FAILED_RETRY:
log.error("Unexpected response while processing request. We never expect {}.", result.status().toString());
metrics.counter("failed-retry").inc();
break;
default:
if (log.isTraceEnabled()) {
log.trace("result=no matching case");
}
// no-op
}
}
/**
* Shuts down the consumer: wakes up the Kafka consumer, stops the worker executor, and
* closes the Solr client.
*/
public final void shutdown() {
kafkaConsumer.wakeup();
log.info("Shutdown called on KafkaCrossDcConsumer");
try {
if (!executor.isShutdown()) {
executor.shutdown();
executor.awaitTermination(30, TimeUnit.SECONDS);
}
solrClient.close();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.warn("Interrupted while waiting for executor to shutdown");
} catch (Exception e) {
log.warn("Exception closing Solr client on shutdown", e);
}
}
protected CloudSolrClient createSolrClient(KafkaCrossDcConf conf) {
return new CloudSolrClient.Builder(Collections.singletonList(conf.get(KafkaCrossDcConf.ZK_CONNECT_STRING)), Optional.empty()).build();
}
protected KafkaMirroringSink createKafkaMirroringSink(KafkaCrossDcConf conf) {
return new KafkaMirroringSink(conf);
}
}