blob: dcea1d00b733d4c30df55f66ee3df93362845d19 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.crossdc.messageprocessor;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.SharedMetricRegistries;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.client.solrj.response.SolrResponseBase;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.SolrInputField;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.crossdc.common.ResubmitBackoffPolicy;
import org.apache.solr.crossdc.common.CrossDcConstants;
import org.apache.solr.crossdc.common.IQueueHandler;
import org.apache.solr.crossdc.common.MirroredSolrRequest;
import org.apache.solr.crossdc.common.SolrExceptionUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.invoke.MethodHandles;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
* Message processor implements all the logic to process a MirroredSolrRequest.
* It handles:
* 1. Sending the update request to Solr
* 2. Discarding or retrying failed requests
* 3. Flagging requests for resubmission by the underlying consumer implementation.
*/
public class SolrMessageProcessor extends MessageProcessor implements IQueueHandler<MirroredSolrRequest> {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private final MetricRegistry metrics = SharedMetricRegistries.getOrCreate("metrics");
final CloudSolrClient client;
private static final String VERSION_FIELD = "_version_";
public SolrMessageProcessor(CloudSolrClient client, ResubmitBackoffPolicy resubmitBackoffPolicy) {
super(resubmitBackoffPolicy);
this.client = client;
}
@Override
public Result<MirroredSolrRequest> handleItem(MirroredSolrRequest mirroredSolrRequest) {
connectToSolrIfNeeded();
// preventCircularMirroring(mirroredSolrRequest); TODO: isn't this handled by the mirroring handler?
return processMirroredRequest(mirroredSolrRequest);
}
private Result<MirroredSolrRequest> processMirroredRequest(MirroredSolrRequest request) {
final Result<MirroredSolrRequest> result = handleSolrRequest(request);
// Back-off before returning
backoffIfNeeded(result);
return result;
}
private Result<MirroredSolrRequest> handleSolrRequest(MirroredSolrRequest mirroredSolrRequest) {
SolrRequest request = mirroredSolrRequest.getSolrRequest();
final SolrParams requestParams = request.getParams();
if (log.isTraceEnabled()) {
log.trace("handleSolrRequest params={}", requestParams);
}
// TODO: isn't this handled by the mirroring handler?
// final String shouldMirror = requestParams.get("shouldMirror");
//
// if ("false".equalsIgnoreCase(shouldMirror)) {
// log.warn("Skipping mirrored request because shouldMirror is set to false. request={}", requestParams);
// return new Result<>(ResultStatus.FAILED_NO_RETRY);
// }
logFirstAttemptLatency(mirroredSolrRequest);
Result<MirroredSolrRequest> result;
try {
prepareIfUpdateRequest(request);
logRequest(request);
result = processMirroredSolrRequest(request);
} catch (Exception e) {
result = handleException(mirroredSolrRequest, e);
}
return result;
}
private Result<MirroredSolrRequest> handleException(MirroredSolrRequest mirroredSolrRequest, Exception e) {
final SolrException solrException = SolrExceptionUtil.asSolrException(e);
logIf4xxException(solrException);
if (!isRetryable(e)) {
logFailure(mirroredSolrRequest, e, solrException, false);
return new Result<>(ResultStatus.FAILED_NO_RETRY, e);
} else {
logFailure(mirroredSolrRequest, e, solrException, true);
mirroredSolrRequest.setAttempt(mirroredSolrRequest.getAttempt() + 1);
maybeBackoff(solrException);
return new Result<>(ResultStatus.FAILED_RESUBMIT, e, mirroredSolrRequest);
}
}
private void maybeBackoff(SolrException solrException) {
if (solrException == null) {
return;
}
long sleepTimeMs = 1000;
String backoffTimeSuggested = solrException.getMetadata("backoffTime-ms");
if (backoffTimeSuggested != null && !"0".equals(backoffTimeSuggested)) {
// If backoff policy is not configured (returns "0" by default), then sleep 1 second. If configured, do as it says.
sleepTimeMs = Math.max(1, Long.parseLong(backoffTimeSuggested));
}
log.info("Consumer backoff. sleepTimeMs={}", sleepTimeMs);
uncheckedSleep(sleepTimeMs);
}
private boolean isRetryable(Exception e) {
SolrException se = SolrExceptionUtil.asSolrException(e);
if (se != null) {
int code = se.code();
if (code == SolrException.ErrorCode.CONFLICT.code) {
return false;
}
}
// Everything other than version conflict exceptions should be retried.
log.warn("Unexpected exception, will resubmit the request to the queue", e);
return true;
}
private void logIf4xxException(SolrException solrException) {
// This shouldn't really happen but if it doesn, it most likely requires fixing in the return code from Solr.
if (solrException != null && 400 <= solrException.code() && solrException.code() < 500) {
log.error("Exception occurred with 4xx response. {}", solrException.code(), solrException);
}
}
private void logFailure(MirroredSolrRequest mirroredSolrRequest, Exception e, SolrException solrException, boolean retryable) {
// This shouldn't really happen.
if (solrException != null && 400 <= solrException.code() && solrException.code() < 500) {
log.error("Exception occurred with 4xx response. {}", solrException.code(), solrException);
return;
}
log.warn("Resubmitting mirrored solr request after failure errorCode={} retryCount={}", solrException != null ? solrException.code() : -1, mirroredSolrRequest.getAttempt(), e);
}
/**
*
* Process the SolrRequest. If not, this method throws an exception.
*/
private Result<MirroredSolrRequest> processMirroredSolrRequest(SolrRequest request) throws Exception {
if (log.isTraceEnabled()) {
log.trace("Sending request to Solr at ZK address={} with params {}", client.getZkStateReader().getZkClient().getZkServerAddress(), request.getParams());
}
Result<MirroredSolrRequest> result;
SolrResponseBase response = (SolrResponseBase) request.process(client);
int status = response.getStatus();
if (log.isTraceEnabled()) {
log.trace("result status={}", status);
}
if (status != 0) {
metrics.counter("processedErrors").inc();
throw new SolrException(SolrException.ErrorCode.getErrorCode(status), "response=" + response);
}
metrics.counter("processed").inc();
result = new Result<>(ResultStatus.HANDLED);
return result;
}
private void logRequest(SolrRequest request) {
if(request instanceof UpdateRequest) {
final StringBuilder rmsg = new StringBuilder(64);
String collection = request.getCollection();
rmsg.append("Submitting update request for collection=").append(collection != null ? collection : request.getParams().get("collection"));
if(((UpdateRequest) request).getDeleteById() != null) {
final int numDeleteByIds = ((UpdateRequest) request).getDeleteById().size();
metrics.counter("numDeleteByIds").inc(numDeleteByIds);
rmsg.append(" numDeleteByIds=").append(numDeleteByIds);
}
if(((UpdateRequest) request).getDocuments() != null) {
final int numUpdates = ((UpdateRequest) request).getDocuments().size();
metrics.counter("numUpdates").inc(numUpdates);
rmsg.append(" numUpdates=").append(numUpdates);
}
if(((UpdateRequest) request).getDeleteQuery() != null) {
final int numDeleteByQuery = ((UpdateRequest) request).getDeleteQuery().size();
metrics.counter("numDeleteByQuery").inc(numDeleteByQuery);
rmsg.append(" numDeleteByQuery=").append(numDeleteByQuery);
}
log.info(rmsg.toString());
}
}
/**
* Clean up the Solr request to be submitted locally.
* @param request The SolrRequest to be cleaned up for submitting locally.
*/
private void prepareIfUpdateRequest(SolrRequest request) {
if (request instanceof UpdateRequest) {
// Remove versions from add requests
UpdateRequest updateRequest = (UpdateRequest) request;
List<SolrInputDocument> documents = updateRequest.getDocuments();
if (log.isTraceEnabled()) {
log.trace("update request docs={} deletebyid={} deletebyquery={}", documents, updateRequest.getDeleteById(), updateRequest.getDeleteQuery());
}
if (documents != null) {
for (SolrInputDocument doc : documents) {
sanitizeDocument(doc);
}
}
removeVersionFromDeleteByIds(updateRequest);
}
}
/**
* Strips fields that are problematic for replication.
*/
private void sanitizeDocument(SolrInputDocument doc) {
SolrInputField field = doc.getField(VERSION_FIELD);
if (log.isTraceEnabled()) {
log.trace("Removing {} value={}", VERSION_FIELD,
field == null ? "null" : field.getValue());
}
doc.remove(VERSION_FIELD);
}
private void removeVersionFromDeleteByIds(UpdateRequest updateRequest) {
if (log.isTraceEnabled()) {
log.trace("remove versions from deletebyids");
}
Map<String, Map<String, Object>> deleteIds = updateRequest.getDeleteByIdMap();
if (deleteIds != null) {
for (Map<String, Object> idParams : deleteIds.values()) {
if (idParams != null) {
idParams.put(UpdateRequest.VER, null);
}
}
}
}
private void logFirstAttemptLatency(MirroredSolrRequest mirroredSolrRequest) {
// Only record the latency of the first attempt, essentially measuring the latency from submitting on the
// primary side until the request is eligible to be consumed on the buddy side (or vice versa).
if (mirroredSolrRequest.getAttempt() == 1) {
final long latency = System.currentTimeMillis() - TimeUnit.NANOSECONDS.toMillis(mirroredSolrRequest.getSubmitTimeNanos());
log.debug("First attempt latency = {}", latency);
metrics.timer("latency").update(latency, TimeUnit.MILLISECONDS);
}
}
/**
* Adds {@link CrossDcConstants#SHOULD_MIRROR}=false to the params if it's not already specified.
* Logs a warning if it is specified and NOT set to false. (i.e. circular mirror may occur)
*
* @param mirroredSolrRequest MirroredSolrRequest object that is being processed.
*/
void preventCircularMirroring(MirroredSolrRequest mirroredSolrRequest) {
if (mirroredSolrRequest.getSolrRequest() instanceof UpdateRequest) {
UpdateRequest updateRequest = (UpdateRequest) mirroredSolrRequest.getSolrRequest();
ModifiableSolrParams params = updateRequest.getParams();
String shouldMirror = (params == null ? null : params.get(CrossDcConstants.SHOULD_MIRROR));
if (shouldMirror == null) {
log.warn(CrossDcConstants.SHOULD_MIRROR + " param is missing - setting to false. Request={}", mirroredSolrRequest);
updateRequest.setParam(CrossDcConstants.SHOULD_MIRROR, "false");
} else if (!"false".equalsIgnoreCase(shouldMirror)) {
log.warn(CrossDcConstants.SHOULD_MIRROR + " param equal to " + shouldMirror);
}
} else {
SolrParams params = mirroredSolrRequest.getSolrRequest().getParams();
String shouldMirror = (params == null ? null : params.get(CrossDcConstants.SHOULD_MIRROR));
if (shouldMirror == null) {
if (params instanceof ModifiableSolrParams) {
log.warn("{} {}", CrossDcConstants.SHOULD_MIRROR, "param is missing - setting to false");
((ModifiableSolrParams) params).set(CrossDcConstants.SHOULD_MIRROR, "false");
} else {
log.warn("{} {}", CrossDcConstants.SHOULD_MIRROR, "param is missing and params are not modifiable");
}
} else if (!"false".equalsIgnoreCase(shouldMirror)) {
log.warn("{} {}", CrossDcConstants.SHOULD_MIRROR, "param is present and set to " + shouldMirror);
}
}
}
private void connectToSolrIfNeeded() {
// Don't try to consume anything if we can't connect to the solr server
boolean connected = false;
while (!connected) {
try {
client.connect(); // volatile null-check if already connected
connected = true;
} catch (Exception e) {
log.error("Unable to connect to solr server. Not consuming.", e);
uncheckedSleep(5000);
}
}
}
public void uncheckedSleep(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
}
private void backoffIfNeeded(Result<MirroredSolrRequest> result) {
if (result.status().equals(ResultStatus.FAILED_RESUBMIT)) {
final long backoffMs = getResubmitBackoffPolicy().getBackoffTimeMs(result.newItem());
if (backoffMs > 0L) {
try {
Thread.sleep(backoffMs);
} catch (final InterruptedException ex) {
// we're about to exit the method anyway, so just log this and return the item. Let the caller
// handle it.
log.warn("Thread interrupted while backing off before retry");
Thread.currentThread().interrupt();
}
}
}
}
}