blob: af9020ae5e499f5e3f8d63a0956f8456cb28b505 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.handler;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.update.CdcrUpdateLog;
import org.apache.solr.update.UpdateLog;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* The state of the replication with a target cluster.
*/
class CdcrReplicatorState {
private final String targetCollection;
private final String zkHost;
private final CloudSolrClient targetClient;
private CdcrUpdateLog.CdcrLogReader logReader;
private long consecutiveErrors = 0;
private final Map<ErrorType, Long> errorCounters = new HashMap<>();
private final FixedQueue<ErrorQueueEntry> errorsQueue = new FixedQueue<>(100); // keep the last 100 errors
private BenchmarkTimer benchmarkTimer;
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private final AtomicBoolean bootstrapInProgress = new AtomicBoolean(false);
private final AtomicInteger numBootstraps = new AtomicInteger();
CdcrReplicatorState(final String targetCollection, final String zkHost, final CloudSolrClient targetClient) {
this.targetCollection = targetCollection;
this.targetClient = targetClient;
this.zkHost = zkHost;
this.benchmarkTimer = new BenchmarkTimer();
}
/**
* Initialise the replicator state with a {@link org.apache.solr.update.CdcrUpdateLog.CdcrLogReader}
* that is positioned at the last target cluster checkpoint.
*/
void init(final CdcrUpdateLog.CdcrLogReader logReader) {
this.logReader = logReader;
}
void closeLogReader() {
if (logReader != null) {
logReader.close();
logReader = null;
}
}
CdcrUpdateLog.CdcrLogReader getLogReader() {
return logReader;
}
String getTargetCollection() {
return targetCollection;
}
String getZkHost() {
return zkHost;
}
CloudSolrClient getClient() {
return targetClient;
}
void shutdown() {
try {
targetClient.close();
} catch (IOException ioe) {
log.warn("Caught exception trying to close server: ", ioe);
}
this.closeLogReader();
}
void reportError(ErrorType error) {
if (!errorCounters.containsKey(error)) {
errorCounters.put(error, 0l);
}
errorCounters.put(error, errorCounters.get(error) + 1);
errorsQueue.add(new ErrorQueueEntry(error, new Date()));
consecutiveErrors++;
}
void resetConsecutiveErrors() {
consecutiveErrors = 0;
}
/**
* Returns the number of consecutive errors encountered while trying to forward updates to the target.
*/
long getConsecutiveErrors() {
return consecutiveErrors;
}
/**
* Gets the number of errors of a particular type.
*/
long getErrorCount(ErrorType type) {
if (errorCounters.containsKey(type)) {
return errorCounters.get(type);
} else {
return 0;
}
}
/**
* Gets the last errors ordered by timestamp (most recent first)
*/
List<String[]> getLastErrors() {
List<String[]> lastErrors = new ArrayList<>();
synchronized (errorsQueue) {
Iterator<ErrorQueueEntry> it = errorsQueue.iterator();
while (it.hasNext()) {
ErrorQueueEntry entry = it.next();
lastErrors.add(new String[]{entry.timestamp.toInstant().toString(), entry.type.toLower()});
}
}
return lastErrors;
}
/**
* Return the timestamp of the last processed operations
*/
String getTimestampOfLastProcessedOperation() {
if (logReader != null && logReader.getLastVersion() != -1) {
// Shift back to the right by 20 bits the version number - See VersionInfo#getNewClock
return Instant.ofEpochMilli(logReader.getLastVersion() >> 20).toString();
}
return "";
}
/**
* Gets the benchmark timer.
*/
BenchmarkTimer getBenchmarkTimer() {
return this.benchmarkTimer;
}
/**
* @return true if a bootstrap operation is in progress, false otherwise
*/
boolean isBootstrapInProgress() {
return bootstrapInProgress.get();
}
void setBootstrapInProgress(boolean inProgress) {
if (bootstrapInProgress.compareAndSet(true, false)) {
numBootstraps.incrementAndGet();
}
bootstrapInProgress.set(inProgress);
}
public int getNumBootstraps() {
return numBootstraps.get();
}
enum ErrorType {
INTERNAL,
BAD_REQUEST;
public String toLower() {
return toString().toLowerCase(Locale.ROOT);
}
}
static class BenchmarkTimer {
private long startTime;
private long runTime = 0;
private Map<Integer, Long> opCounters = new HashMap<>();
/**
* Start recording time.
*/
void start() {
startTime = System.nanoTime();
}
/**
* Stop recording time.
*/
void stop() {
runTime += System.nanoTime() - startTime;
startTime = -1;
}
void incrementCounter(final int operationType) {
switch (operationType) {
case UpdateLog.ADD:
case UpdateLog.DELETE:
case UpdateLog.DELETE_BY_QUERY: {
if (!opCounters.containsKey(operationType)) {
opCounters.put(operationType, 0l);
}
opCounters.put(operationType, opCounters.get(operationType) + 1);
return;
}
default:
}
}
long getRunTime() {
long totalRunTime = runTime;
if (startTime != -1) { // we are currently recording the time
totalRunTime += System.nanoTime() - startTime;
}
return totalRunTime;
}
double getOperationsPerSecond() {
long total = 0;
for (long counter : opCounters.values()) {
total += counter;
}
double elapsedTimeInSeconds = ((double) this.getRunTime() / 1E9);
return total / elapsedTimeInSeconds;
}
double getAddsPerSecond() {
long total = opCounters.get(UpdateLog.ADD) != null ? opCounters.get(UpdateLog.ADD) : 0;
double elapsedTimeInSeconds = ((double) this.getRunTime() / 1E9);
return total / elapsedTimeInSeconds;
}
double getDeletesPerSecond() {
long total = opCounters.get(UpdateLog.DELETE) != null ? opCounters.get(UpdateLog.DELETE) : 0;
total += opCounters.get(UpdateLog.DELETE_BY_QUERY) != null ? opCounters.get(UpdateLog.DELETE_BY_QUERY) : 0;
double elapsedTimeInSeconds = ((double) this.getRunTime() / 1E9);
return total / elapsedTimeInSeconds;
}
}
private static class ErrorQueueEntry {
private ErrorType type;
private Date timestamp;
private ErrorQueueEntry(ErrorType type, Date timestamp) {
this.type = type;
this.timestamp = timestamp;
}
}
private static class FixedQueue<E> extends LinkedList<E> {
private int maxSize;
public FixedQueue(int maxSize) {
this.maxSize = maxSize;
}
@Override
public synchronized boolean add(E e) {
super.addFirst(e);
if (size() > maxSize) {
removeLast();
}
return true;
}
}
}