package org.apache.solr.crossdc.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.solr.crossdc.common.MirroredSolrRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.lang.invoke.MethodHandles;
import java.util.*;
import java.util.concurrent.*;
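
/**
 * Tracks the asynchronous work in flight for each Kafka {@link TopicPartition} and commits the
 * offset for a partition only once every {@link Future} queued ahead of that offset has completed.
 *
 * <p>A hypothetical driving loop (illustration only; {@code executor} and {@code process} are not
 * part of this class) might look like:
 *
 * <pre>{@code
 * PartitionManager partitionManager = new PartitionManager(consumer);
 * for (ConsumerRecord<String, MirroredSolrRequest> record : records) {
 *   TopicPartition partition = new TopicPartition(record.topic(), record.partition());
 *   PartitionManager.PartitionWork work = partitionManager.getPartitionWork(partition);
 *   PartitionManager.WorkUnit workUnit = new PartitionManager.WorkUnit(partition);
 *   workUnit.nextOffset = record.offset() + 1;
 *   workUnit.workItems.add(executor.submit(() -> process(record)));
 *   work.partitionQueue.add(workUnit);
 * }
 * // later, from the polling thread
 * partitionManager.checkOffsetUpdates();
 * }</pre>
 */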
public class PartitionManager {
  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

  final ConcurrentHashMap<TopicPartition, PartitionWork> partitionWorkMap = new ConcurrentHashMap<>();

  private final KafkaConsumer<String, MirroredSolrRequest> consumer;
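
  /** Per-partition FIFO of {@link WorkUnit}s, polled in order as their futures complete. */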
  static class PartitionWork {
    final Queue<WorkUnit> partitionQueue = new LinkedList<>();
  }
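
  /**
   * A batch of futures for one {@link TopicPartition}, plus the offset that can be committed once
   * every future in the batch has completed.
   */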
  static class WorkUnit {
    final TopicPartition partition;
    Set<Future<?>> workItems = new HashSet<>();
    long nextOffset;

    public WorkUnit(TopicPartition partition) {
      this.partition = partition;
    }
  }

  PartitionManager(KafkaConsumer<String, MirroredSolrRequest> consumer) {
    this.consumer = consumer;
  }
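
  /**
   * Returns the {@link PartitionWork} for the given partition, creating and registering an empty
   * one if none exists yet.
   */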
  public PartitionWork getPartitionWork(TopicPartition partition) {
    return partitionWorkMap.compute(partition, (k, v) -> {
      if (v == null) {
        return new PartitionWork();
      }
      return v;
    });
  }
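
  /** Checks every tracked partition and commits any offsets whose queued work has finished. */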
  public void checkOffsetUpdates() throws Throwable {
    for (TopicPartition partition : partitionWorkMap.keySet()) {
      checkForOffsetUpdates(partition);
    }
  }
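
  /**
   * Inspects the head {@link WorkUnit} for the given partition; if all of its futures are done,
   * removes it from the queue and commits its {@code nextOffset}. Failures from the futures are
   * rethrown. Note that synchronization is on the {@code partition} argument object itself, so
   * callers should pass the same instances that are stored in {@code partitionWorkMap}.
   */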
  void checkForOffsetUpdates(TopicPartition partition) throws Throwable {
    synchronized (partition) {
      PartitionWork work;
      if ((work = partitionWorkMap.get(partition)) != null) {
        WorkUnit workUnit = work.partitionQueue.peek();
        if (workUnit != null) {
          boolean allFuturesDone = true;
          for (Future<?> future : workUnit.workItems) {
            if (!future.isDone()) {
              if (log.isTraceEnabled()) {
                log.trace("Future for update is not done topic={}", partition.topic());
              }
              allFuturesDone = false;
              break;
            }
            try {
              future.get();
            } catch (InterruptedException e) {
              log.error("Error updating offset for partition: {}", partition, e);
              throw e;
            } catch (ExecutionException e) {
              log.error("Error updating offset for partition: {}", partition, e);
              throw e.getCause();
            }
            if (log.isTraceEnabled()) {
              log.trace("Future for update is done topic={}", partition.topic());
            }
          }
          if (allFuturesDone) {
            work.partitionQueue.poll();
            updateOffset(partition, workUnit.nextOffset);
          }
        }
      }
    }
  }

  /**
   * Reset the local offset so that the consumer reads the records from Kafka again.
   *
   * @param partition The TopicPartition to reset the offset for
   * @param partitionRecords The records read from the specified partition, in offset order
   */
  private void resetOffsetForPartition(TopicPartition partition,
      List<ConsumerRecord<String, MirroredSolrRequest>> partitionRecords) {
    if (log.isTraceEnabled()) {
      log.trace("Resetting offset to: {}", partitionRecords.get(0).offset());
    }
    long resetOffset = partitionRecords.get(0).offset();
    consumer.seek(partition, resetOffset);
  }

  /**
   * Logs and updates the commit point for the partition that has been processed.
   *
   * @param partition The TopicPartition to update the offset for
   * @param nextOffset The next offset to commit for this partition
   */
  private void updateOffset(TopicPartition partition, long nextOffset) {
    if (log.isTraceEnabled()) {
      log.trace("Updated offset for topic={} partition={} to offset={}", partition.topic(),
          partition.partition(), nextOffset);
    }
    consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(nextOffset)));
  }
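
  /**
   * Returns the offset to commit after processing the given records: one past the offset of the
   * last record in the batch.
   */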
  static long getOffsetForPartition(List<ConsumerRecord<String, MirroredSolrRequest>> partitionRecords) {
    return partitionRecords.get(partitionRecords.size() - 1).offset() + 1;
  }
}