/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.beam.sdk.io.solr;

import static org.apache.beam.sdk.io.solr.SolrIO.RetryConfiguration.DEFAULT_RETRY_PREDICATE;
import static org.apache.beam.sdk.io.solr.SolrIOTestUtils.namedThreadIsAlive;

import com.carrotsearch.randomizedtesting.RandomizedRunner;
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.List;
import java.util.Set;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.solr.SolrIOTestUtils.LenientRetryPredicate;
import org.apache.beam.sdk.testing.ExpectedLogs;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFnTester;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableSet;
import org.apache.beam.vendor.guava.v20_0.com.google.common.io.BaseEncoding;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.cloud.SolrCloudTestCase;
import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.util.ObjectReleaseTracker;
import org.apache.solr.security.Sha256AuthenticationProvider;
import org.joda.time.Duration;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A test of {@link SolrIO} against an embedded SolrCloud cluster secured with basic
 * authentication.
 */
@ThreadLeakScope(value = ThreadLeakScope.Scope.NONE)
@SolrTestCaseJ4.SuppressSSL
@RunWith(RandomizedRunner.class)
public class SolrIOTest extends SolrCloudTestCase {
private static final Logger LOG = LoggerFactory.getLogger(SolrIOTest.class);
private static final String SOLR_COLLECTION = "beam";
private static final int NUM_SHARDS = 3;
private static final long NUM_DOCS = 400L;
private static final int NUM_SCIENTISTS = 10;
private static final int BATCH_SIZE = 200;
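  // mirrors SolrIO.Write's default maximum batch size (asserted in testBatchSize)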
private static final int DEFAULT_BATCH_SIZE = 1000;
private static AuthorizedSolrClient<CloudSolrClient> solrClient;
private static SolrIO.ConnectionConfiguration connectionConfiguration;
@Rule public TestPipeline pipeline = TestPipeline.create();
  @Rule public final transient ExpectedLogs expectedLogs = ExpectedLogs.none(SolrIO.class);

@BeforeClass
public static void beforeClass() throws Exception {
    // set up credentials for the solr user,
    // see https://cwiki.apache.org/confluence/display/solr/Basic+Authentication+Plugin
String password = "SolrRocks";
    // the salt's size can be arbitrary
byte[] salt = new byte[random().nextInt(30) + 1];
random().nextBytes(salt);
String base64Salt = BaseEncoding.base64().encode(salt);
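    // Solr's BasicAuthPlugin stores credentials in the form "<hashed password> <base64 salt>"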
    String sha256 = Sha256AuthenticationProvider.sha256(password, base64Salt);
    String credential = sha256 + " " + base64Salt;
String securityJson =
"{"
+ "'authentication':{"
+ " 'blockUnknown': true,"
+ " 'class':'solr.BasicAuthPlugin',"
+ " 'credentials':{'solr':'"
+ credential
+ "'}}"
+ "}";
configureCluster(3).addConfig("conf", getFile("cloud-minimal/conf").toPath()).configure();
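    // upload security.json to ZooKeeper so that basic authentication is enforced cluster-wide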
ZkStateReader zkStateReader = cluster.getSolrClient().getZkStateReader();
zkStateReader
.getZkClient()
.setData("/security.json", securityJson.getBytes(Charset.defaultCharset()), true);
String zkAddress = cluster.getZkServer().getZkAddress();
connectionConfiguration =
SolrIO.ConnectionConfiguration.create(zkAddress).withBasicCredentials("solr", password);
solrClient = connectionConfiguration.createClient();
SolrIOTestUtils.createCollection(SOLR_COLLECTION, NUM_SHARDS, 1, solrClient);
  }

@AfterClass
public static void afterClass() throws Exception {
solrClient.close();
  }

@Before
public void before() throws Exception {
SolrIOTestUtils.clearCollection(SOLR_COLLECTION, solrClient);
  }

  @Rule public ExpectedException thrown = ExpectedException.none();

@Test
public void testBadCredentials() throws IOException {
thrown.expect(SolrException.class);
String zkAddress = cluster.getZkServer().getZkAddress();
SolrIO.ConnectionConfiguration connectionConfiguration =
SolrIO.ConnectionConfiguration.create(zkAddress)
.withBasicCredentials("solr", "wrongpassword");
    try (AuthorizedSolrClient<CloudSolrClient> solrClient =
        connectionConfiguration.createClient()) {
SolrIOTestUtils.insertTestDocuments(SOLR_COLLECTION, NUM_DOCS, solrClient);
}
  }

@Test
public void testRead() throws Exception {
SolrIOTestUtils.insertTestDocuments(SOLR_COLLECTION, NUM_DOCS, solrClient);
PCollection<SolrDocument> output =
pipeline.apply(
SolrIO.read()
.withConnectionConfiguration(connectionConfiguration)
.from(SOLR_COLLECTION)
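                // note: 101 does not divide NUM_DOCS (400) evenly, so the final batch is partial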
.withBatchSize(101));
PAssert.thatSingleton(output.apply("Count", Count.globally())).isEqualTo(NUM_DOCS);
pipeline.run();
  }

@Test
public void testReadWithQuery() throws Exception {
SolrIOTestUtils.insertTestDocuments(SOLR_COLLECTION, NUM_DOCS, solrClient);
PCollection<SolrDocument> output =
pipeline.apply(
SolrIO.read()
.withConnectionConfiguration(connectionConfiguration)
.from(SOLR_COLLECTION)
.withQuery("scientist:Franklin"));
PAssert.thatSingleton(output.apply("Count", Count.globally()))
.isEqualTo(NUM_DOCS / NUM_SCIENTISTS);
pipeline.run();
  }

@Test
public void testWrite() throws Exception {
List<SolrInputDocument> data = SolrIOTestUtils.createDocuments(NUM_DOCS);
SolrIO.Write write =
SolrIO.write().withConnectionConfiguration(connectionConfiguration).to(SOLR_COLLECTION);
pipeline.apply(Create.of(data)).apply(write);
pipeline.run();
long currentNumDocs = SolrIOTestUtils.commitAndGetCurrentNumDocs(SOLR_COLLECTION, solrClient);
assertEquals(NUM_DOCS, currentNumDocs);
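    // the test documents cycle through NUM_SCIENTISTS scientist names, so each name should
    // match exactly NUM_DOCS / NUM_SCIENTISTS documents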
QueryResponse response = solrClient.query(SOLR_COLLECTION, new SolrQuery("scientist:Lovelace"));
assertEquals(NUM_DOCS / NUM_SCIENTISTS, response.getResults().getNumFound());
  }

@Test
public void testWriteWithMaxBatchSize() throws Exception {
SolrIO.Write write =
SolrIO.write()
.withConnectionConfiguration(connectionConfiguration)
.to(SOLR_COLLECTION)
.withMaxBatchSize(BATCH_SIZE);
    // The bundle size used for writes is the runner's decision; we cannot force a bundle size,
    // so we test the Writer as a DoFn outside of a runner.
try (DoFnTester<SolrInputDocument, Void> fnTester =
DoFnTester.of(new SolrIO.Write.WriteFn(write))) {
List<SolrInputDocument> input = SolrIOTestUtils.createDocuments(NUM_DOCS);
long numDocsProcessed = 0;
long numDocsInserted = 0;
for (SolrInputDocument document : input) {
fnTester.processElement(document);
numDocsProcessed++;
        // check every 100 docs to avoid overloading Solr
if ((numDocsProcessed % 100) == 0) {
          // force a commit so that the documents inserted so far are searchable immediately
long currentNumDocs =
SolrIOTestUtils.commitAndGetCurrentNumDocs(SOLR_COLLECTION, solrClient);
if ((numDocsProcessed % BATCH_SIZE) == 0) {
/* bundle end */
assertEquals(
"we are at the end of a bundle, we should have inserted all processed documents",
numDocsProcessed,
currentNumDocs);
numDocsInserted = currentNumDocs;
} else {
/* not bundle end */
assertEquals(
"we are not at the end of a bundle, we should have inserted no more documents",
numDocsInserted,
currentNumDocs);
}
}
}
}
  }

  /**
   * Tests that retries are invoked when Solr returns an error. We trigger this by writing to a
   * nonexistent collection, using a retry strategy that retries on any SolrException. The logger
   * is inspected to verify the expected behavior.
   */
@Test
public void testWriteRetry() throws Throwable {
thrown.expect(IOException.class);
thrown.expectMessage("Error writing to Solr");
    // snapshot the release tracker's entry state so we only unregister objects created by this test
    Set<Object> entryState = ImmutableSet.copyOf(ObjectReleaseTracker.OBJECTS.keySet());
SolrIO.Write write =
SolrIO.write()
.withConnectionConfiguration(connectionConfiguration)
.withRetryConfiguration(
SolrIO.RetryConfiguration.create(3, Duration.standardMinutes(3))
.withRetryPredicate(new LenientRetryPredicate()))
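            // a nonexistent collection: every request fails, triggering the retry path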
.to("wrong-collection");
List<SolrInputDocument> data = SolrIOTestUtils.createDocuments(NUM_DOCS);
pipeline.apply(Create.of(data)).apply(write);
try {
pipeline.run();
} catch (final Pipeline.PipelineExecutionException e) {
      // Hack: wait for all worker threads to complete (BEAM-4040)
int waitAttempts = 30; // defensive coding
while (namedThreadIsAlive("direct-runner-worker") && waitAttempts-- >= 0) {
LOG.info("Pausing to allow direct-runner-worker threads to finish");
Thread.sleep(1000);
}
      // release SolrZkClients created by this test, since there is no guarantee that Teardown ran
for (Object o : ObjectReleaseTracker.OBJECTS.keySet()) {
if (o instanceof SolrZkClient && !entryState.contains(o)) {
LOG.info("Removing unreleased SolrZkClient");
ObjectReleaseTracker.release(o);
}
}
      // verify that 2 retry attempts were logged before rethrowing the cause
expectedLogs.verifyWarn(String.format(SolrIO.Write.WriteFn.RETRY_ATTEMPT_LOG, 1));
expectedLogs.verifyWarn(String.format(SolrIO.Write.WriteFn.RETRY_ATTEMPT_LOG, 2));
throw e.getCause();
}
fail("Pipeline should not have run to completion");
  }

  /** Tests that the default retry predicate performs as documented. */
@Test
public void testDefaultRetryPredicate() {
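    // IO errors, Solr server errors, and transient Solr error codes should be retried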
assertTrue(DEFAULT_RETRY_PREDICATE.test(new IOException("test")));
assertTrue(DEFAULT_RETRY_PREDICATE.test(new SolrServerException("test")));
assertTrue(
DEFAULT_RETRY_PREDICATE.test(new SolrException(SolrException.ErrorCode.CONFLICT, "test")));
assertTrue(
DEFAULT_RETRY_PREDICATE.test(
new SolrException(SolrException.ErrorCode.SERVER_ERROR, "test")));
assertTrue(
DEFAULT_RETRY_PREDICATE.test(
new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE, "test")));
assertTrue(
DEFAULT_RETRY_PREDICATE.test(
new SolrException(SolrException.ErrorCode.INVALID_STATE, "test")));
assertTrue(
DEFAULT_RETRY_PREDICATE.test(new SolrException(SolrException.ErrorCode.UNKNOWN, "test")));
assertTrue(
DEFAULT_RETRY_PREDICATE.test(
new HttpSolrClient.RemoteSolrException(
"localhost",
SolrException.ErrorCode.SERVICE_UNAVAILABLE.code,
"test",
new Exception())));
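    // non-transient client errors should not be retried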
assertFalse(
DEFAULT_RETRY_PREDICATE.test(
new SolrException(SolrException.ErrorCode.BAD_REQUEST, "test")));
assertFalse(
DEFAULT_RETRY_PREDICATE.test(new SolrException(SolrException.ErrorCode.FORBIDDEN, "test")));
assertFalse(
DEFAULT_RETRY_PREDICATE.test(new SolrException(SolrException.ErrorCode.NOT_FOUND, "test")));
assertFalse(
DEFAULT_RETRY_PREDICATE.test(
new SolrException(SolrException.ErrorCode.UNAUTHORIZED, "test")));
assertFalse(
DEFAULT_RETRY_PREDICATE.test(
new SolrException(SolrException.ErrorCode.UNSUPPORTED_MEDIA_TYPE, "test")));
  }

  /** Tests the default batch size and an explicitly set value. */
@Test
public void testBatchSize() {
SolrIO.Write write1 =
SolrIO.write()
.withConnectionConfiguration(connectionConfiguration)
.withMaxBatchSize(BATCH_SIZE);
    assertEquals(BATCH_SIZE, write1.getMaxBatchSize());
SolrIO.Write write2 = SolrIO.write().withConnectionConfiguration(connectionConfiguration);
    assertEquals(DEFAULT_BATCH_SIZE, write2.getMaxBatchSize());
}
}