blob: a3a81c53cb6d1b8bdb71ee74698277340618218e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cloud;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.net.ConnectException;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.http.client.HttpClient;
import org.apache.lucene.util.LuceneTestCase;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.impl.ConcurrentUpdateSolrClient;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.common.SolrInputDocument;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
class FullThrottleStoppableIndexingThread extends StoppableIndexingThread {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
/**
*
*/
private final HttpClient httpClient;
private volatile boolean stop = false;
int clientIndex = 0;
private ConcurrentUpdateSolrClient cusc;
private List<SolrClient> clients;
private AtomicInteger fails = new AtomicInteger();
public FullThrottleStoppableIndexingThread(HttpClient httpClient, SolrClient controlClient, CloudSolrClient cloudClient, List<SolrClient> clients,
String id, boolean doDeletes, int clientSoTimeout) {
super(controlClient, cloudClient, id, doDeletes);
setName("FullThrottleStopableIndexingThread");
setDaemon(true);
this.clients = clients;
this.httpClient = httpClient;
cusc = new ErrorLoggingConcurrentUpdateSolrClient.Builder(((HttpSolrClient) clients.get(0)).getBaseURL())
.withHttpClient(httpClient)
.withQueueSize(8)
.withThreadCount(2)
.withConnectionTimeout(10000)
.withSocketTimeout(clientSoTimeout)
.build();
}
@Override
public void run() {
int i = 0;
int numDeletes = 0;
int numAdds = 0;
while (true && !stop) {
String id = this.id + "-" + i;
++i;
if (doDeletes && LuceneTestCase.random().nextBoolean() && deletes.size() > 0) {
String delete = deletes.remove(0);
try {
numDeletes++;
cusc.deleteById(delete);
} catch (Exception e) {
changeUrlOnError(e);
fails.incrementAndGet();
}
}
try {
numAdds++;
if (numAdds > (LuceneTestCase.TEST_NIGHTLY ? 4002 : 197))
continue;
SolrInputDocument doc = AbstractFullDistribZkTestBase.getDoc(
"id",
id,
i1,
50,
t1,
"Saxon heptarchies that used to rip around so in old times and raise Cain. My, you ought to seen old Henry the Eight when he was in bloom. He WAS a blossom. He used to marry a new wife every day, and chop off her head next morning. And he would do it just as indifferent as if ");
cusc.add(doc);
} catch (Exception e) {
changeUrlOnError(e);
fails.incrementAndGet();
}
if (doDeletes && LuceneTestCase.random().nextBoolean()) {
deletes.add(id);
}
}
log.info("FT added docs:{} with {} fails deletes:{}", numAdds, fails, numDeletes);
}
private void changeUrlOnError(Exception e) {
if (e instanceof ConnectException) {
clientIndex++;
if (clientIndex > clients.size() - 1) {
clientIndex = 0;
}
cusc.shutdownNow();
cusc = new ErrorLoggingConcurrentUpdateSolrClient.Builder(((HttpSolrClient) clients.get(clientIndex)).getBaseURL())
.withHttpClient(httpClient)
.withQueueSize(30)
.withThreadCount(3)
.build();
}
}
@Override
public void safeStop() {
stop = true;
try {
cusc.blockUntilFinished();
} catch (IOException e) {
log.warn("Exception waiting for the indexing client to finish", e);
} finally {
cusc.shutdownNow();
}
}
@Override
public int getFailCount() {
return fails.get();
}
@Override
public Set<String> getAddFails() {
throw new UnsupportedOperationException();
}
@Override
public Set<String> getDeleteFails() {
throw new UnsupportedOperationException();
}
static class ErrorLoggingConcurrentUpdateSolrClient extends ConcurrentUpdateSolrClient {
public ErrorLoggingConcurrentUpdateSolrClient(Builder builder) {
super(builder);
}
@Override
public void handleError(Throwable ex) {
log.warn("cusc error", ex);
}
static class Builder extends ConcurrentUpdateSolrClient.Builder {
public Builder(String baseSolrUrl) {
super(baseSolrUrl);
}
public ErrorLoggingConcurrentUpdateSolrClient build() {
return new ErrorLoggingConcurrentUpdateSolrClient(this);
}
}
}
}