blob: 36c1a068b5a8bfc5b4d02deb14363b3244f68d3f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.client.solrj.impl;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.solr.SolrJettyTestBase;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.embedded.JettyConfig;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.SolrNamedThreadFactory;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.servlet.ServletHolder;
import org.junit.BeforeClass;
import org.junit.Test;
public class ConcurrentUpdateHttp2SolrClientTest extends SolrJettyTestBase {
@BeforeClass
public static void beforeTest() throws Exception {
JettyConfig jettyConfig = JettyConfig.builder()
.withServlet(new ServletHolder(ConcurrentUpdateSolrClientTest.TestServlet.class), "/cuss/*")
.withSSLConfig(sslConfig.buildServerSSLConfig())
.build();
createAndStartJetty(legacyExampleCollection1SolrHome(), jettyConfig);
}
@Test
public void testConcurrentUpdate() throws Exception {
ConcurrentUpdateSolrClientTest.TestServlet.clear();
String serverUrl = jetty.getBaseUrl().toString() + "/cuss/foo";
int cussThreadCount = 2;
int cussQueueSize = 100;
// for tracking callbacks from CUSS
final AtomicInteger successCounter = new AtomicInteger(0);
final AtomicInteger errorCounter = new AtomicInteger(0);
final StringBuilder errors = new StringBuilder();
try (Http2SolrClient http2Client = new Http2SolrClient.Builder().build();
ConcurrentUpdateHttp2SolrClient concurrentClient = new OutcomeCountingConcurrentUpdateSolrClient.Builder(serverUrl, http2Client, successCounter, errorCounter, errors)
.withQueueSize(cussQueueSize)
.withThreadCount(cussThreadCount)
.build()) {
concurrentClient.setPollQueueTime(0);
// ensure it doesn't block where there's nothing to do yet
concurrentClient.blockUntilFinished();
int poolSize = 5;
ExecutorService threadPool = ExecutorUtil.newMDCAwareFixedThreadPool(poolSize, new SolrNamedThreadFactory("testCUSS"));
int numDocs = 100;
int numRunnables = 5;
for (int r=0; r < numRunnables; r++)
threadPool.execute(new ConcurrentUpdateSolrClientTest.SendDocsRunnable(String.valueOf(r), numDocs, concurrentClient));
// ensure all docs are sent
threadPool.awaitTermination(5, TimeUnit.SECONDS);
threadPool.shutdown();
// wait until all requests are processed by CUSS
concurrentClient.blockUntilFinished();
concurrentClient.shutdownNow();
assertEquals("post", ConcurrentUpdateSolrClientTest.TestServlet.lastMethod);
// expect all requests to be successful
int expectedSuccesses = ConcurrentUpdateSolrClientTest.TestServlet.numReqsRcvd.get();
assertTrue(expectedSuccesses > 0); // at least one request must have been sent
assertTrue("Expected no errors but got "+errorCounter.get()+
", due to: "+errors.toString(), errorCounter.get() == 0);
assertTrue("Expected "+expectedSuccesses+" successes, but got "+successCounter.get(),
successCounter.get() == expectedSuccesses);
int expectedDocs = numDocs * numRunnables;
assertTrue("Expected CUSS to send "+expectedDocs+" but got "+ ConcurrentUpdateSolrClientTest.TestServlet.numDocsRcvd.get(),
ConcurrentUpdateSolrClientTest.TestServlet.numDocsRcvd.get() == expectedDocs);
}
}
@Test
public void testCollectionParameters() throws IOException, SolrServerException {
int cussThreadCount = 2;
int cussQueueSize = 10;
try (Http2SolrClient http2Client = new Http2SolrClient.Builder().build();
ConcurrentUpdateHttp2SolrClient concurrentClient
= (new ConcurrentUpdateHttp2SolrClient.Builder(jetty.getBaseUrl().toString(), http2Client))
.withQueueSize(cussQueueSize)
.withThreadCount(cussThreadCount).build()) {
SolrInputDocument doc = new SolrInputDocument();
doc.addField("id", "collection");
concurrentClient.add("collection1", doc);
concurrentClient.commit("collection1");
assertEquals(1, concurrentClient.query("collection1", new SolrQuery("id:collection")).getResults().getNumFound());
}
try (Http2SolrClient http2Client = new Http2SolrClient.Builder().build();
ConcurrentUpdateHttp2SolrClient concurrentClient
= new ConcurrentUpdateHttp2SolrClient.Builder(jetty.getBaseUrl().toString() + "/collection1", http2Client)
.withQueueSize(cussQueueSize)
.withThreadCount(cussThreadCount).build()) {
assertEquals(1, concurrentClient.query(new SolrQuery("id:collection")).getResults().getNumFound());
}
}
@Test
public void testConcurrentCollectionUpdate() throws Exception {
int cussThreadCount = 2;
int cussQueueSize = 100;
int numDocs = 100;
int numRunnables = 5;
int expected = numDocs * numRunnables;
try (Http2SolrClient http2Client = new Http2SolrClient.Builder().build();
ConcurrentUpdateHttp2SolrClient concurrentClient
= new ConcurrentUpdateHttp2SolrClient.Builder(jetty.getBaseUrl().toString(), http2Client)
.withQueueSize(cussQueueSize)
.withThreadCount(cussThreadCount).build()) {
concurrentClient.setPollQueueTime(0);
// ensure it doesn't block where there's nothing to do yet
concurrentClient.blockUntilFinished();
// Delete all existing documents.
concurrentClient.deleteByQuery("collection1", "*:*");
int poolSize = 5;
ExecutorService threadPool = ExecutorUtil.newMDCAwareFixedThreadPool(poolSize, new SolrNamedThreadFactory("testCUSS"));
for (int r=0; r < numRunnables; r++)
threadPool.execute(new ConcurrentUpdateSolrClientTest.SendDocsRunnable(String.valueOf(r), numDocs, concurrentClient, "collection1"));
// ensure all docs are sent
threadPool.awaitTermination(5, TimeUnit.SECONDS);
threadPool.shutdown();
concurrentClient.commit("collection1");
assertEquals(expected, concurrentClient.query("collection1", new SolrQuery("*:*")).getResults().getNumFound());
// wait until all requests are processed by CUSS
concurrentClient.blockUntilFinished();
concurrentClient.shutdownNow();
}
try (Http2SolrClient http2Client = new Http2SolrClient.Builder().build();
ConcurrentUpdateHttp2SolrClient concurrentClient
= new ConcurrentUpdateHttp2SolrClient.Builder(jetty.getBaseUrl().toString() + "/collection1", http2Client)
.withQueueSize(cussQueueSize)
.withThreadCount(cussThreadCount).build()) {
assertEquals(expected, concurrentClient.query(new SolrQuery("*:*")).getResults().getNumFound());
}
}
static class OutcomeCountingConcurrentUpdateSolrClient extends ConcurrentUpdateHttp2SolrClient {
private final AtomicInteger successCounter;
private final AtomicInteger failureCounter;
private final StringBuilder errors;
public OutcomeCountingConcurrentUpdateSolrClient(OutcomeCountingConcurrentUpdateSolrClient.Builder builder) {
super(builder);
this.successCounter = builder.successCounter;
this.failureCounter = builder.failureCounter;
this.errors = builder.errors;
}
@Override
public void handleError(Throwable ex) {
failureCounter.incrementAndGet();
errors.append(" "+ex);
}
@Override
public void onSuccess(Response resp, InputStream respBody) {
successCounter.incrementAndGet();
}
static class Builder extends ConcurrentUpdateHttp2SolrClient.Builder {
protected final AtomicInteger successCounter;
protected final AtomicInteger failureCounter;
protected final StringBuilder errors;
public Builder(String baseSolrUrl, Http2SolrClient http2Client, AtomicInteger successCounter, AtomicInteger failureCounter, StringBuilder errors) {
super(baseSolrUrl, http2Client);
this.successCounter = successCounter;
this.failureCounter = failureCounter;
this.errors = errors;
}
public OutcomeCountingConcurrentUpdateSolrClient build() {
return new OutcomeCountingConcurrentUpdateSolrClient(this);
}
}
}
}