// blob: 6d2d5435708d9e590e59cbaaa6be751be5cca296 [file] [log] [blame]
package org.apache.solr.crossdc;
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakLingering;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.lucene.util.QuickPatchThreadsFilter;
import org.apache.solr.SolrIgnoredThreadsFilter;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.embedded.JettyConfig;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.impl.ZkClientClusterStateProvider;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.cloud.MiniSolrCloudCluster;
import org.apache.solr.cloud.SolrCloudTestCase;
import org.apache.solr.cloud.ZkTestServer;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.util.ObjectReleaseTracker;
import org.apache.solr.crossdc.common.KafkaCrossDcConf;
import org.apache.solr.crossdc.consumer.Consumer;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.invoke.MethodHandles;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
@ThreadLeakFilters(defaultFilters = true, filters = { SolrIgnoredThreadsFilter.class,
QuickPatchThreadsFilter.class, SolrKafkaTestsIgnoredThreadsFilter.class })
@ThreadLeakLingering(linger = 5000) public class RetryQueueIntegrationTest extends
SolrTestCaseJ4 {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
static final String VERSION_FIELD = "_version_";
private static final int NUM_BROKERS = 1;
public static EmbeddedKafkaCluster kafkaCluster;
protected static volatile MiniSolrCloudCluster solrCluster1;
protected static volatile MiniSolrCloudCluster solrCluster2;
protected static volatile Consumer consumer = new Consumer();
private static String TOPIC = "topic1";
private static String COLLECTION = "collection1";
private static Path baseDir1;
private static Path baseDir2;
private static ZkTestServer zkTestServer1;
private static ZkTestServer zkTestServer2;
@BeforeClass
public static void beforeRetryQueueIntegrationTestTest() throws Exception {
Properties config = new Properties();
config.put("unclean.leader.election.enable", "true");
config.put("enable.partition.eof", "false");
kafkaCluster = new EmbeddedKafkaCluster(NUM_BROKERS, config) {
public String bootstrapServers() {
return super.bootstrapServers().replaceAll("localhost", "127.0.0.1");
}
};
kafkaCluster.start();
kafkaCluster.createTopic(TOPIC, 1, 1);
System.setProperty("topicName", TOPIC);
System.setProperty("bootstrapServers", kafkaCluster.bootstrapServers());
baseDir1 = createTempDir();
Path zkDir1 = baseDir1.resolve("zookeeper/server1/data");
zkTestServer1 = new ZkTestServer(zkDir1);
try {
zkTestServer1.run();
} catch (Exception e) {
log.error("Error starting Zk Test Server, trying again ...");
zkTestServer1.shutdown();
zkTestServer1 = new ZkTestServer(zkDir1);
zkTestServer1.run();
}
baseDir2 = createTempDir();
Path zkDir2 = baseDir2.resolve("zookeeper/server1/data");
zkTestServer2 = new ZkTestServer(zkDir2);
try {
zkTestServer2.run();
} catch (Exception e) {
log.error("Error starting Zk Test Server, trying again ...");
zkTestServer2.shutdown();
zkTestServer2 = new ZkTestServer(zkDir2);
zkTestServer2.run();
}
solrCluster1 = startCluster(solrCluster1, zkTestServer1, baseDir1);
solrCluster2 = startCluster(solrCluster2, zkTestServer2, baseDir2);
CloudSolrClient client = solrCluster1.getSolrClient();
String bootstrapServers = kafkaCluster.bootstrapServers();
log.info("bootstrapServers={}", bootstrapServers);
Map<String,Object> properties = new HashMap<>();
properties.put(KafkaCrossDcConf.BOOTSTRAP_SERVERS, bootstrapServers);
properties.put(KafkaCrossDcConf.ZK_CONNECT_STRING, solrCluster2.getZkServer().getZkAddress());
properties.put(KafkaCrossDcConf.TOPIC_NAME, TOPIC);
properties.put(KafkaCrossDcConf.GROUP_ID, "group1");
consumer.start(properties);
}
private static MiniSolrCloudCluster startCluster(MiniSolrCloudCluster solrCluster, ZkTestServer zkTestServer, Path baseDir) throws Exception {
MiniSolrCloudCluster cluster =
new MiniSolrCloudCluster(1, baseDir, MiniSolrCloudCluster.DEFAULT_CLOUD_SOLR_XML,
JettyConfig.builder().setContext("/solr")
.withSSLConfig(sslConfig.buildServerSSLConfig()).build(), zkTestServer);
//new SolrCloudTestCase.Builder(1, baseDir).addConfig("conf",
//getFile("src/test/resources/configs/cloud-minimal/conf").toPath()).configure();
CloudSolrClient client = cluster.getSolrClient();
((ZkClientClusterStateProvider)client.getClusterStateProvider()).uploadConfig(getFile("src/test/resources/configs/cloud-minimal/conf").toPath(), "conf");
CollectionAdminRequest.Create create2 =
CollectionAdminRequest.createCollection(COLLECTION, "conf", 1, 1);
cluster.getSolrClient().request(create2);
cluster.waitForActiveCollection(COLLECTION, 1, 1);
cluster.getSolrClient().setDefaultCollection(COLLECTION);
return cluster;
}
@AfterClass
public static void afterRetryQueueIntegrationTest() throws Exception {
ObjectReleaseTracker.clear();
consumer.shutdown();
try {
kafkaCluster.stop();
} catch (Exception e) {
log.error("Exception stopping Kafka cluster", e);
}
if (solrCluster1 != null) {
solrCluster1.shutdown();
}
if (solrCluster2 != null) {
solrCluster2.shutdown();
}
if (zkTestServer1 != null) {
zkTestServer1.shutdown();
}
if (zkTestServer2 != null) {
zkTestServer2.shutdown();
}
consumer = null;
solrCluster1 = null;
solrCluster2 = null;
kafkaCluster = null;
zkTestServer1 = null;
zkTestServer2 = null;
}
@After
public void tearDown() throws Exception {
super.tearDown();
solrCluster1.getSolrClient().deleteByQuery("*:*");
solrCluster2.getSolrClient().deleteByQuery("*:*");
solrCluster1.getSolrClient().commit();
solrCluster2.getSolrClient().commit();
}
@Test
public void testRetryQueue() throws Exception {
Path zkDir = zkTestServer2.getZkDir();
int zkPort = zkTestServer2.getPort();
zkTestServer2.shutdown();
CloudSolrClient client = solrCluster1.getSolrClient();
SolrInputDocument doc = new SolrInputDocument();
doc.addField("id", String.valueOf(System.nanoTime()));
doc.addField("text", "some test");
client.add(doc);
SolrInputDocument doc2 = new SolrInputDocument();
doc2.addField("id", String.valueOf(System.nanoTime()));
doc2.addField("text", "some test");
client.add(doc2);
SolrInputDocument doc3 = new SolrInputDocument();
doc3.addField("id", String.valueOf(System.nanoTime()));
doc3.addField("text", "some test");
client.add(doc3);
client.commit(COLLECTION);
System.out.println("Sent producer record");
Thread.sleep(5000);
zkTestServer2 = new ZkTestServer(zkDir, zkPort);
zkTestServer2.run(false);
QueryResponse results = null;
boolean foundUpdates = false;
for (int i = 0; i < 200; i++) {
solrCluster2.getSolrClient().commit(COLLECTION);
solrCluster1.getSolrClient().query(COLLECTION, new SolrQuery("*:*"));
results = solrCluster2.getSolrClient().query(COLLECTION, new SolrQuery("*:*"));
if (results.getResults().getNumFound() == 3) {
foundUpdates = true;
} else {
Thread.sleep(100);
}
}
System.out.println("Closed producer");
assertTrue("results=" + results, foundUpdates);
System.out.println("Rest: " + results);
}
}