/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.solr.util;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Reader;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Predicate;
import java.util.zip.GZIPInputStream;

import org.apache.lucene.util.TestUtil;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.JavaBinUpdateRequestCodec;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.cloud.SolrCloudTestCase;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.util.FastInputStream;
import org.apache.solr.common.util.JsonRecordReader;
import org.junit.Test;
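
/**
 * Tests for {@link ExportTool}: output-format handling in {@code setOutFormat()} and end-to-end
 * export of documents from a SolrCloud collection as jsonl, gzipped jsonl and javabin.
 */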
@SolrTestCaseJ4.SuppressSSL
public class TestExportTool extends SolrCloudTestCase {
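
  // setOutFormat() accepts "jsonl" (the default) and "javabin", optionally gzipped; anything else is rejected.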
  @Test(expected = IllegalArgumentException.class)
  public void testJsonNotValidOutputFileFormat() throws Exception {
    ExportTool.Info info = new ExportTool.MultiThreadedRunner("http://somesolr/mycollection/");
    info.setOutFormat(null, "json");
  }

  public void testJsonlDefaultOutputFileFormat() throws Exception {
    ExportTool.Info info = new ExportTool.MultiThreadedRunner("http://somesolr/mycollection/");
    info.setOutFormat(null, null);
    assertEquals("jsonl", info.format);
  }

  public void testJsonlValidOutputFileFormat() throws Exception {
    ExportTool.Info info = new ExportTool.MultiThreadedRunner("http://somesolr/mycollection/");
    info.setOutFormat(null, "jsonl");
  }

  public void testJavabinValidOutputFileFormat() throws Exception {
    ExportTool.Info info = new ExportTool.MultiThreadedRunner("http://somesolr/mycollection/");
    info.setOutFormat(null, "javabin");
  }

  public void testGZJsonlValidOutputFileFormat() throws Exception {
    ExportTool.Info info = new ExportTool.MultiThreadedRunner("http://somesolr/mycollection/");
    info.setOutFormat("/somedir.jsonl.gz", "jsonl");
  }

  public void testGZJavabinValidOutputFileFormat() throws Exception {
    ExportTool.Info info = new ExportTool.MultiThreadedRunner("http://somesolr/mycollection/");
    info.setOutFormat("/somedir.javabin.gz", "javabin");
  }
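
  /**
   * Indexes 1000 docs into a 2-shard collection and exports them as jsonl, gzipped jsonl and
   * javabin, both with an explicit limit and with limit=-1 (all docs).
   */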
  public void testBasic() throws Exception {
    String COLLECTION_NAME = "globalLoaderColl";
    configureCluster(4)
        .addConfig("conf", configset("cloud-dynamic"))
        .configure();
    try {
      CollectionAdminRequest
          .createCollection(COLLECTION_NAME, "conf", 2, 1)
          .process(cluster.getSolrClient());
      cluster.waitForActiveCollection(COLLECTION_NAME, 2, 2);
      String tmpFileLoc = new File(cluster.getBaseDir().toFile().getAbsolutePath() +
          File.separator).getPath();
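
      // Index 1000 docs and commit in the same request.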
      UpdateRequest ur = new UpdateRequest();
      ur.setAction(AbstractUpdateRequest.ACTION.COMMIT, true, true);
      int docCount = 1000;
      for (int i = 0; i < docCount; i++) {
        ur.add("id", String.valueOf(i),
            "desc_s", TestUtil.randomSimpleString(random(), 10, 50),
            "a_dt", "2019-09-30T05:58:03Z");
      }
      cluster.getSolrClient().request(ur, COLLECTION_NAME);

      QueryResponse qr = cluster.getSolrClient().query(COLLECTION_NAME, new SolrQuery("*:*").setRows(0));
      assertEquals(docCount, qr.getResults().getNumFound());

      String url = cluster.getRandomJetty(random()).getBaseUrl() + "/" + COLLECTION_NAME;
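
      // jsonl export limited to 200 docs.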
      ExportTool.Info info = new ExportTool.MultiThreadedRunner(url);
      String absolutePath = tmpFileLoc + COLLECTION_NAME + random().nextInt(100000) + ".jsonl";
      info.setOutFormat(absolutePath, "jsonl");
      info.setLimit("200");
      info.fields = "id,desc_s,a_dt";
      info.exportDocs();
      assertJsonDocsCount(info, 200, record -> "2019-09-30T05:58:03Z".equals(record.get("a_dt")));
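
      // Same export, written as gzipped jsonl.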
      info = new ExportTool.MultiThreadedRunner(url);
      absolutePath = tmpFileLoc + COLLECTION_NAME + random().nextInt(100000) + ".jsonl.gz";
      info.setOutFormat(absolutePath, "jsonl");
      info.setLimit("200");
      info.fields = "id,desc_s,a_dt";
      info.exportDocs();
      assertJsonDocsCount(info, 200, record -> "2019-09-30T05:58:03Z".equals(record.get("a_dt")));
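
      // Export all docs (limit -1) as jsonl.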
      info = new ExportTool.MultiThreadedRunner(url);
      absolutePath = tmpFileLoc + COLLECTION_NAME + random().nextInt(100000) + ".jsonl";
      info.setOutFormat(absolutePath, "jsonl");
      info.setLimit("-1");
      info.fields = "id,desc_s";
      info.exportDocs();
      assertJsonDocsCount(info, 1000, null);
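
      // javabin export limited to 200 docs.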
      info = new ExportTool.MultiThreadedRunner(url);
      absolutePath = tmpFileLoc + COLLECTION_NAME + random().nextInt(100000) + ".javabin";
      info.setOutFormat(absolutePath, "javabin");
      info.setLimit("200");
      info.fields = "id,desc_s";
      info.exportDocs();
      assertJavabinDocsCount(info, 200);
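
      // Export all docs (limit -1) as javabin.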
      info = new ExportTool.MultiThreadedRunner(url);
      absolutePath = tmpFileLoc + COLLECTION_NAME + random().nextInt(100000) + ".javabin";
      info.setOutFormat(absolutePath, "javabin");
      info.setLimit("-1");
      info.fields = "id,desc_s";
      info.exportDocs();
      assertJavabinDocsCount(info, 1000);
    } finally {
      cluster.shutdown();
    }
  }
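
  /**
   * Indexes 40k docs into an 8-shard collection, exports them all as javabin and jsonl, and
   * cross-checks the per-core document counts against what the tool reports.
   */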
  @Nightly
  public void testVeryLargeCluster() throws Exception {
    String COLLECTION_NAME = "veryLargeColl";
    configureCluster(4)
        .addConfig("conf", configset("cloud-minimal"))
        .configure();
    try {
      CollectionAdminRequest
          .createCollection(COLLECTION_NAME, "conf", 8, 1)
          .process(cluster.getSolrClient());
      cluster.waitForActiveCollection(COLLECTION_NAME, 8, 8);
      String tmpFileLoc = new File(cluster.getBaseDir().toFile().getAbsolutePath() +
          File.separator).getPath();
      String url = cluster.getRandomJetty(random()).getBaseUrl() + "/" + COLLECTION_NAME;
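
      // Index 4 batches of 10k docs, committing after each batch.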
      int docCount = 0;
      for (int j = 0; j < 4; j++) {
        int bsz = 10000;
        UpdateRequest ur = new UpdateRequest();
        ur.setAction(AbstractUpdateRequest.ACTION.COMMIT, true, true);
        for (int i = 0; i < bsz; i++) {
          ur.add("id", String.valueOf((j * bsz) + i), "desc_s", TestUtil.randomSimpleString(random(), 10, 50));
        }
        cluster.getSolrClient().request(ur, COLLECTION_NAME);
        docCount += bsz;
      }

      QueryResponse qr = cluster.getSolrClient().query(COLLECTION_NAME, new SolrQuery("*:*").setRows(0));
      assertEquals(docCount, qr.getResults().getNumFound());
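
      // Collect the doc count of each shard leader's core; used below to verify the tool's per-core counts.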
      DocCollection coll = cluster.getSolrClient().getClusterStateProvider().getCollection(COLLECTION_NAME);
      HashMap<String, Long> docCounts = new HashMap<>();
      long totalDocsFromCores = 0;
      for (Slice slice : coll.getSlices()) {
        Replica replica = slice.getLeader();
        try (HttpSolrClient client = new HttpSolrClient.Builder(replica.getBaseUrl()).build()) {
          long count = ExportTool.getDocCount(replica.getCoreName(), client);
          docCounts.put(replica.getCoreName(), count);
          totalDocsFromCores += count;
        }
      }
      assertEquals(docCount, totalDocsFromCores);
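
      // Export everything as javabin and check the count each core handler received.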
      ExportTool.MultiThreadedRunner info = null;
      String absolutePath = null;

      info = new ExportTool.MultiThreadedRunner(url);
      info.output = System.out;
      absolutePath = tmpFileLoc + COLLECTION_NAME + random().nextInt(100000) + ".javabin";
      info.setOutFormat(absolutePath, "javabin");
      info.setLimit("-1");
      info.exportDocs();
      assertJavabinDocsCount(info, docCount);
      for (Map.Entry<String, Long> e : docCounts.entrySet()) {
        assertEquals(e.getValue().longValue(), info.corehandlers.get(e.getKey()).receivedDocs.get());
      }
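
      // Export everything as jsonl and verify the JsonSink wrote at least docCount records.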
      info = new ExportTool.MultiThreadedRunner(url);
      info.output = System.out;
      absolutePath = tmpFileLoc + COLLECTION_NAME + random().nextInt(100000) + ".jsonl";
      info.setOutFormat(absolutePath, "jsonl");
      info.fields = "id,desc_s";
      info.setLimit("-1");
      info.exportDocs();
      long actual = ((ExportTool.JsonSink) info.sink).info.docsWritten.get();
      assertTrue("docs written : " + actual + ", docs produced : " + info.docsWritten.get(), actual >= docCount);
      assertJsonDocsCount(info, docCount, null);
    } finally {
      cluster.shutdown();
    }
  }
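
  /** Re-reads the exported javabin file and asserts that at least {@code expected} documents were written. */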
  private void assertJavabinDocsCount(ExportTool.Info info, int expected) throws IOException {
    assertTrue("docs written : " + info.docsWritten.get() + ", expected : " + expected, info.docsWritten.get() >= expected);
    FileInputStream fis = new FileInputStream(info.out);
    try {
      int[] count = new int[]{0};
      FastInputStream in = FastInputStream.wrap(fis);
      new JavaBinUpdateRequestCodec()
          .unmarshal(in, (document, req, commitWithin, override) -> {
            assertEquals(2, document.size());
            count[0]++;
          });
      assertTrue(count[0] >= expected);
    } finally {
      fis.close();
    }
  }
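
  /**
   * Re-reads the exported jsonl (optionally gzipped) file, asserts the doc count and applies
   * {@code predicate} to every record when it is non-null.
   */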
  private void assertJsonDocsCount(ExportTool.Info info, int expected, Predicate<Map<String, Object>> predicate) throws IOException {
    assertTrue("docs written : " + info.docsWritten.get() + ", expected : " + expected, info.docsWritten.get() >= expected);
    JsonRecordReader jsonReader = JsonRecordReader.getInst("/", Arrays.asList("$FQN:/**"));
    InputStream is = new FileInputStream(info.out);
    if (info.out.endsWith(".jsonl.gz")) {
      is = new GZIPInputStream(is);
    }
    Reader rdr = new InputStreamReader(is, StandardCharsets.UTF_8);
    try {
      int[] count = new int[]{0};
      jsonReader.streamRecords(rdr, (record, path) -> {
        if (predicate != null) {
          assertTrue(predicate.test(record));
        }
        count[0]++;
      });
      assertTrue(count[0] >= expected);
    } finally {
      rdr.close();
    }
  }
}