blob: 98c2b990606fb2b04f92fac63240efd0cfb4be6e [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
import static;
import static;
import static;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.function.Predicate;
import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.util.BackOff;
import org.apache.beam.sdk.util.BackOffUtils;
import org.apache.beam.sdk.util.FluentBackoff;
import org.apache.beam.sdk.util.Sleeper;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.apache.http.client.HttpClient;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.impl.HttpClientUtil;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.client.solrj.request.schema.SchemaRequest;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.client.solrj.response.schema.SchemaResponse;
import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.params.CursorMarkParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
* Transforms for reading and writing data from/to Solr.
* <h3>Reading from Solr</h3>
* <p>{@link SolrIO#read} returns a bounded {@link PCollection
* PCollection&lt;SolrDocument&gt;} representing Solr documents.
* <p>To configure the {@link SolrIO#read}, you have to provide a connection configuration
* containing the Zookeeper address of the Solr cluster, and the collection name. The following
* example illustrates options for configuring the source:
* <pre>{@code
* SolrIO.ConnectionConfiguration conn = SolrIO.ConnectionConfiguration.create("");
* // Optionally: .withBasicCredentials(username, password)
* PCollection<SolrDocument> docs = p.apply(
* }</pre>
* <p>You can specify a query on the {@code read()} using {@code withQuery()}.
* <h3>Writing to Solr</h3>
* <p>To write documents to Solr, use {@link SolrIO#write SolrIO.write()}, which writes Solr
* documents from a {@link PCollection PCollection&lt;SolrInputDocument&gt;} (which can be bounded
* or unbounded).
* <p>To configure {@link SolrIO#write SolrIO.write()}, similar to the read, you have to provide a
* connection configuration, and a collection name. For instance:
* <pre>{@code
* PCollection<SolrInputDocument> inputDocs = ...;
* inputDocs.apply(SolrIO.write().to("my-collection").withConnectionConfiguration(conn));
* }</pre>
* <p>When writing it is possible to customize the retry behavior if an error is encountered. By
* default this is disabled and only one attempt will be made.
public class SolrIO {
// Shared logger; WriteFn.flushBatch() uses it to warn on each retry attempt.
private static final Logger LOG = LoggerFactory.getLogger(SolrIO.class);
// Creates a Read transform preconfigured with a batch size of 1000 and the query "*:*".
// The connection configuration and collection still have to be supplied by the caller.
public static Read read() {
// 1000 for batch size is good enough in many cases,
// ex: if document size is large, around 10KB, the response's size will be around 10MB
// if document size is small, around 1KB, the response's size will be around 1MB
return new AutoValue_SolrIO_Read.Builder().setBatchSize(1000).setQuery("*:*").build();
// Creates a Write transform preconfigured with a maximum batch size of 1000 documents.
// The connection configuration and collection still have to be supplied by the caller.
public static Write write() {
// 1000 for batch size is good enough in many cases,
// ex: if document size is large, around 10KB, the request's size will be around 10MB
// if document size is small, around 1KB, the request's size will be around 1MB
return new AutoValue_SolrIO_Write.Builder().setMaxBatchSize(1000).build();
// Private constructor: SolrIO is only used through its static read() and write() factories.
private SolrIO() {}
/** A POJO describing a connection configuration to Solr. */
public abstract static class ConnectionConfiguration implements Serializable {
// ZooKeeper host string; handed to CloudSolrClient in createClient().
abstract String getZkHost();
// Basic-auth user name, set through withBasicCredentials().
abstract String getUsername();
// Basic-auth password, set through withBasicCredentials().
abstract String getPassword();
// Builder used by withBasicCredentials() to derive a new configuration
// (presumably pre-populated from this instance, AutoValue toBuilder style - confirm).
abstract Builder builder();
// Builder contract; create() instantiates AutoValue_SolrIO_ConnectionConfiguration.Builder.
abstract static class Builder {
abstract Builder setZkHost(String zkHost);
abstract Builder setUsername(String username);
abstract Builder setPassword(String password);
abstract ConnectionConfiguration build();
* Creates a new Solr connection configuration.
* @param zkHost host of zookeeper
* @return the connection configuration object
// Rejects a null zkHost with IllegalArgumentException (via checkArgument), then builds a
// configuration with only the ZooKeeper host set.
public static ConnectionConfiguration create(String zkHost) {
checkArgument(zkHost != null, "zkHost can not be null");
return new AutoValue_SolrIO_ConnectionConfiguration.Builder().setZkHost(zkHost).build();
/** If Solr basic authentication is enabled, provide the username and password. */
public ConnectionConfiguration withBasicCredentials(String username, String password) {
// Both values must be non-null and non-empty; each check has its own message so the
// caller can tell which argument was rejected.
checkArgument(username != null, "username can not be null");
checkArgument(!username.isEmpty(), "username can not be empty");
checkArgument(password != null, "password can not be null");
checkArgument(!password.isEmpty(), "password can not be empty");
// Returns a newly built configuration rather than mutating this one.
return builder().setUsername(username).setPassword(password).build();
// Adds the ZooKeeper host and, when non-null, the username to the display data.
// The password is never added.
private void populateDisplayData(DisplayData.Builder builder) {
builder.add(DisplayData.item("zkHost", getZkHost()));
builder.addIfNotNull(DisplayData.item("username", getUsername()));
// Builds an HttpClient configured with this configuration's basic-auth user name and password.
private HttpClient createHttpClient() {
// Works around a bug in Solr: without a customized HttpClient,
// an UpdateRequest with the commit flag throws an authentication error.
ModifiableSolrParams params = new ModifiableSolrParams();
params.set(HttpClientUtil.PROP_BASIC_AUTH_USER, getUsername());
params.set(HttpClientUtil.PROP_BASIC_AUTH_PASS, getPassword());
return HttpClientUtil.createClient(params);
// Creates a CloudSolrClient for the configured ZooKeeper host, using the HttpClient from
// createHttpClient(), and wraps it in an AuthorizedSolrClient together with this configuration.
AuthorizedSolrClient<CloudSolrClient> createClient() {
CloudSolrClient solrClient = new CloudSolrClient(getZkHost(), createHttpClient());
return new AuthorizedSolrClient<>(solrClient, this);
// Creates an HttpSolrClient bound to the given URL (ReadFn passes a replica's base URL) and
// wraps it in an AuthorizedSolrClient together with this configuration.
AuthorizedSolrClient<HttpSolrClient> createClient(String shardUrl) {
HttpSolrClient solrClient = new HttpSolrClient(shardUrl, createHttpClient());
return new AuthorizedSolrClient<>(solrClient, this);
* A POJO encapsulating a configuration for retry behavior when issuing requests to Solr. A retry
* will be attempted until the maxAttempts or maxDuration is exceeded, whichever comes first, for
* any of the following exceptions:
* <ul>
* <li>{@link IOException}
* <li>{@link SolrServerException}
* <li>{@link SolrException} where the {@link SolrException.ErrorCode} is one of:
* <ul>
* <li>{@link SolrException.ErrorCode#CONFLICT}
* <li>{@link SolrException.ErrorCode#SERVER_ERROR}
* <li>{@link SolrException.ErrorCode#SERVICE_UNAVAILABLE}
* <li>{@link SolrException.ErrorCode#INVALID_STATE}
* <li>{@link SolrException.ErrorCode#UNKNOWN}
* </ul>
* </ul>
public abstract static class RetryConfiguration implements Serializable {
// Stock predicate instance (see DefaultRetryPredicate); presumably installed by create() - confirm.
static final RetryPredicate DEFAULT_RETRY_PREDICATE = new DefaultRetryPredicate();
// Total number of attempts including the first one; WriteFn.setup() converts it to
// (maxAttempts - 1) retries.
abstract int getMaxAttempts();
// Maximum duration allowed for retrying; create() requires it to be longer than zero.
abstract Duration getMaxDuration();
// Decides whether a given failure is eligible for a retry.
abstract RetryPredicate getRetryPredicate();
// Builder used by withRetryPredicate() to derive a new configuration.
abstract Builder builder();
// Builder contract; create() instantiates AutoValue_SolrIO_RetryConfiguration.Builder.
abstract static class Builder {
abstract SolrIO.RetryConfiguration.Builder setMaxAttempts(int maxAttempts);
abstract SolrIO.RetryConfiguration.Builder setMaxDuration(Duration maxDuration);
abstract SolrIO.RetryConfiguration.Builder setRetryPredicate(RetryPredicate retryPredicate);
abstract SolrIO.RetryConfiguration build();
// Creates a retry configuration from an attempt limit and a duration limit.
public static RetryConfiguration create(int maxAttempts, Duration maxDuration) {
// At least one attempt is required.
checkArgument(maxAttempts > 0, "maxAttempts must be greater than 0");
// The duration limit must be present and strictly positive.
maxDuration != null && maxDuration.isLongerThan(Duration.ZERO),
"maxDuration must be greater than 0");
return new AutoValue_SolrIO_RetryConfiguration.Builder()
// Exposed only to allow tests to easily simulate server errors.
// Returns a newly built configuration with the given non-null predicate.
RetryConfiguration withRetryPredicate(RetryPredicate predicate) {
checkArgument(predicate != null, "predicate must be provided");
return builder().setRetryPredicate(predicate).build();
* An interface used to control if we retry the Solr call when a {@link Throwable} occurs. If
* {@link RetryPredicate#test(Object)} returns true, {@link Write} tries to resend the requests
* to the Solr server if the {@link RetryConfiguration} permits it.
// Serializable Predicate<Throwable>; WriteFn.flushBatch() calls test(exception) on it to decide
// whether a failed write may be retried.
interface RetryPredicate extends Predicate<Throwable>, Serializable {}
/** This is the default predicate used to test if a failed Solr operation should be retried. */
private static class DefaultRetryPredicate implements RetryPredicate {
// Solr error codes for which a SolrException is considered retryable.
private static final ImmutableSet<Integer> ELIGIBLE_CODES =
// Retryable when t is an IOException, a SolrServerException, or a SolrException whose code()
// is contained in ELIGIBLE_CODES; everything else is not retried.
public boolean test(Throwable t) {
return (t instanceof IOException
|| t instanceof SolrServerException
|| (t instanceof SolrException && ELIGIBLE_CODES.contains(((SolrException) t).code())));
/** A {@link PTransform} reading data from Solr. */
public abstract static class Read extends PTransform<PBegin, PCollection<SolrDocument>> {
// Exclusive upper bound for the batch size accepted by withBatchSize().
private static final long MAX_BATCH_SIZE = 10000L;
// Connection settings; required, checked in expand().
abstract ConnectionConfiguration getConnectionConfiguration();
// Collection to read from; required, checked in expand().
abstract String getCollection();
// Query string; read() sets "*:*" and ReadFn falls back to "*:*" when it is null.
abstract String getQuery();
// Batch size configured through read() / withBatchSize().
abstract int getBatchSize();
// Builder used by the with*() / from() methods to derive a new Read.
abstract Builder builder();
// Builder contract; read() instantiates AutoValue_SolrIO_Read.Builder.
abstract static class Builder {
abstract Builder setConnectionConfiguration(ConnectionConfiguration connectionConfiguration);
abstract Builder setQuery(String query);
abstract Builder setBatchSize(int batchSize);
abstract Builder setCollection(String collection);
abstract Read build();
/** Provide the Solr connection configuration object. */
public Read withConnectionConfiguration(ConnectionConfiguration connectionConfiguration) {
// A null configuration is rejected here, at construction time.
checkArgument(connectionConfiguration != null, "connectionConfiguration can not be null");
return builder().setConnectionConfiguration(connectionConfiguration).build();
* Provide name of collection while reading from Solr.
* @param collection the collection toward which the requests will be issued
// Returns a newly built Read targeting the given, non-null collection.
public Read from(String collection) {
checkArgument(collection != null, "collection can not be null");
return builder().setCollection(collection).build();
* Provide a query used while reading from Solr.
* @param query the query. See <a
* href="">Solr
* Query </a>
// Returns a newly built Read using the given query, which must be neither null nor empty.
public Read withQuery(String query) {
checkArgument(query != null, "query can not be null");
checkArgument(!query.isEmpty(), "query can not be empty");
return builder().setQuery(query).build();
* Provide a size for the cursor read. See <a
* href="">cursor API</a>
* Default is 1000. Valid values are 1 (inclusive) to 10 000 (exclusive). If documents are small,
* increasing batch size might improve read performance. If documents are big, you might need to
* decrease batchSize.
* @param batchSize number of documents read in each scroll read
// Package-private: returns a newly built Read with the given batch size.
Read withBatchSize(int batchSize) {
// TODO remove this configuration, we can figure out the best number
// by tuning batchSize when pipelines run.
// Accepted range is 1 <= batchSize < MAX_BATCH_SIZE.
batchSize > 0 && batchSize < MAX_BATCH_SIZE,
"Valid values for batchSize are 1 (inclusize) to %s (exclusive), but was: %s ",
return builder().setBatchSize(batchSize).build();
// Expands the read into four steps: emit this spec as a single element, split it into
// (spec, replica) pairs (SplitFn), reshuffle the pairs via a random key, then read the
// documents of each pair (ReadFn).
public PCollection<SolrDocument> expand(PBegin input) {
// Both the connection configuration and the collection are mandatory.
getConnectionConfiguration() != null, "withConnectionConfiguration() is required");
checkArgument(getCollection() != null, "from() is required");
return input
.apply("Create", Create.of(this))
.apply("Split", ParDo.of(new SplitFn()))
.apply("Reshuffle", Reshuffle.viaRandomKey())
.apply("Read", ParDo.of(new ReadFn()));
// Adds the query, when non-null, to the transform's display data.
public void populateDisplayData(DisplayData.Builder builder) {
builder.addIfNotNull(DisplayData.item("query", getQuery()));
/** A POJO describing a replica of Solr. */
abstract static class ReplicaInfo implements Serializable {
// Core name; ReadFn passes it as the first argument of client.query().
public abstract String coreName();
// Core URL of the replica (not read by the code visible in this class).
public abstract String coreUrl();
// Base URL; ReadFn uses it to create the HttpSolrClient for this replica.
public abstract String baseUrl();
// Builds a ReplicaInfo from a Solr Replica.
static ReplicaInfo create(Replica replica) {
return new AutoValue_SolrIO_ReplicaInfo(
// Splits a Read spec by slice: for each slice of the collection, outputs the spec paired with
// one active replica of that slice.
static class SplitFn extends DoFn<SolrIO.Read, KV<Read, ReplicaInfo>> {
public void process(@Element SolrIO.Read spec, OutputReceiver<KV<Read, ReplicaInfo>> out)
throws IOException {
ConnectionConfiguration connectionConfig = spec.getConnectionConfiguration();
// The cloud client is only needed to look up the cluster state; try-with-resources closes it.
try (AuthorizedSolrClient<CloudSolrClient> client = connectionConfig.createClient()) {
String collection = spec.getCollection();
final ClusterState clusterState = AuthorizedSolrClient.getClusterState(client);
DocCollection docCollection = clusterState.getCollection(collection);
for (Slice slice : docCollection.getSlices()) {
// Copy the slice's replicas into a local, mutable list.
ArrayList<Replica> replicas = new ArrayList<>(slice.getReplicas());
// Load balancing by randomly picking an active replica
Replica randomActiveReplica = null;
for (Replica replica : replicas) {
// We need to check both state of the replica and live nodes
// to make sure that the replica is alive
if (replica.getState() == Replica.State.ACTIVE
&& clusterState.getLiveNodes().contains(replica.getNodeName())) {
randomActiveReplica = replica;
// TODO handle the case where this replica goes inactive while the pipeline runs.
// We should pick another active replica of this shard.
// A slice without any active replica cannot be read.
randomActiveReplica != null,
"Can not found an active replica for slice %s",
out.output(KV.of(spec, ReplicaInfo.create(checkNotNull(randomActiveReplica))));
// Reads the documents of one replica by paging through the query results with a Solr cursor.
static class ReadFn extends DoFn<KV<SolrIO.Read, ReplicaInfo>, SolrDocument> {
public void process(
@Element KV<SolrIO.Read, ReplicaInfo> specAndReplica, OutputReceiver<SolrDocument> out)
throws IOException {
Read spec = specAndReplica.getKey();
ReplicaInfo replica = specAndReplica.getValue();
// Start from the beginning of the result set.
String cursorMark = CursorMarkParams.CURSOR_MARK_START;
String query = spec.getQuery();
// Fall back to the "*:*" query when none is configured.
if (query == null) {
query = "*:*";
SolrQuery solrQuery = new SolrQuery(query);
// Client bound to the replica's base URL; try-with-resources closes it.
try (AuthorizedSolrClient<HttpSolrClient> client =
spec.getConnectionConfiguration().createClient(replica.baseUrl())) {
// Ask the collection's schema for its unique key field and sort ascending on it;
// per the Solr cursor API, cursor paging requires a sort that includes the unique key.
SchemaRequest.UniqueKey request = new SchemaRequest.UniqueKey();
try {
SchemaResponse.UniqueKeyResponse response = client.process(spec.getCollection(), request);
solrQuery.addSort(response.getUniqueKey(), SolrQuery.ORDER.asc);
} catch (SolrServerException e) {
throw new IOException("Can not get unique key from solr", e);
while (true) {
// Request the page that starts at the current cursor position.
solrQuery.set(CursorMarkParams.CURSOR_MARK_PARAM, cursorMark);
try {
QueryResponse response;
// The query is addressed to the replica's core.
response = client.query(replica.coreName(), solrQuery);
// An unchanged cursor mark: per the Solr cursor API this signals the end of the results.
if (cursorMark.equals(response.getNextCursorMark())) {
cursorMark = response.getNextCursorMark();
for (SolrDocument doc : response.getResults()) {
} catch (SolrServerException e) {
// Surface Solr failures as IOException, keeping the cause.
throw new IOException(e);
/** A {@link PTransform} writing data to Solr. */
public abstract static class Write extends PTransform<PCollection<SolrInputDocument>, PDone> {
// Connection settings; required, checked in expand().
abstract ConnectionConfiguration getConnectionConfiguration();
// Target collection; required, checked in expand().
abstract String getCollection();
// Maximum number of documents buffered before a flush; write() sets 1000.
abstract int getMaxBatchSize();
// Builder used by the with*() / to() methods to derive a new Write.
abstract Builder builder();
// Retry rules; null means no retrying (WriteFn checks for null).
abstract RetryConfiguration getRetryConfiguration();
// Builder contract; write() instantiates AutoValue_SolrIO_Write.Builder.
abstract static class Builder {
abstract Builder setConnectionConfiguration(ConnectionConfiguration connectionConfiguration);
abstract Builder setCollection(String collection);
abstract Builder setMaxBatchSize(int maxBatchSize);
abstract Builder setRetryConfiguration(RetryConfiguration retryConfiguration);
abstract Write build();
/** Provide the Solr connection configuration object. */
public Write withConnectionConfiguration(ConnectionConfiguration connectionConfiguration) {
// A null configuration is rejected here, at construction time.
checkArgument(connectionConfiguration != null, "connectionConfiguration can not be null");
return builder().setConnectionConfiguration(connectionConfiguration).build();
* Provide name of collection while writing to Solr.
* @param collection the collection toward which the requests will be issued
// Returns a newly built Write targeting the given, non-null collection.
public Write to(String collection) {
checkArgument(collection != null, "collection can not be null");
return builder().setCollection(collection).build();
* Provide a maximum size in number of documents for the batch. Depending on the execution
* engine, size of bundles may vary, this sets the maximum size. Change this if you need to have
* smaller batch. Default max batch size is 1000.
* @param batchSize maximum batch size in number of documents
// Returns a newly built Write with the given maximum batch size, which must be positive.
public Write withMaxBatchSize(int batchSize) {
checkArgument(batchSize > 0, "batchSize must be larger than 0, but was: %s", batchSize);
return builder().setMaxBatchSize(batchSize).build();
* Provides configuration to retry a failed batch call to Solr. A batch is considered as failed
* if the underlying {@link CloudSolrClient} surfaces {@link
* org.apache.solr.client.solrj.impl.HttpSolrClient.RemoteSolrException}, {@link
* SolrServerException} or {@link IOException}. Users should consider that retrying might
* compound the underlying problem which caused the initial failure. Users should also be aware
* that once retrying is exhausted the error is surfaced to the runner which <em>may</em> then
* opt to retry the current partition in entirety or abort if the max number of retries of the
* runner is completed. Retrying uses an exponential backoff algorithm with an initial backoff of
* 5 seconds; the error is surfaced once the maximum number of attempts or the maximum configured
* duration is exceeded.
* <p>Example use:
* <pre>{@code
* SolrIO.write()
* .withRetryConfiguration(SolrIO.RetryConfiguration.create(10, Duration.standardMinutes(3))
* ...
* }</pre>
* @param retryConfiguration the rules which govern the retry behavior
* @return the {@link Write} with retrying configured
// Returns a newly built Write using the given, non-null retry configuration.
public Write withRetryConfiguration(RetryConfiguration retryConfiguration) {
checkArgument(retryConfiguration != null, "retryConfiguration is required");
return builder().setRetryConfiguration(retryConfiguration).build();
// Applies WriteFn to the input documents after verifying the mandatory settings.
public PDone expand(PCollection<SolrInputDocument> input) {
// Missing settings are reported with checkState, i.e. as IllegalStateException.
checkState(getConnectionConfiguration() != null, "withConnectionConfiguration() is required");
checkState(getCollection() != null, "to() is required");
input.apply(ParDo.of(new WriteFn(this)));
// DoFn that buffers incoming documents into a batch and sends it to Solr, with optional retries.
static class WriteFn extends DoFn<SolrInputDocument, Void> {
// Format of the warning logged on each retry; takes the attempt number.
static final String RETRY_ATTEMPT_LOG = "Error writing to Solr. Retry attempt[%d]";
// Initial backoff used between retries.
private static final Duration RETRY_INITIAL_BACKOFF = Duration.standardSeconds(5);
// Transient: rebuilt in setup() rather than serialized with the DoFn.
private transient FluentBackoff retryBackoff; // defaults to no retrying
// The Write configuration this function executes.
private final Write spec;
// Transient: created in setup() rather than serialized with the DoFn.
private transient AuthorizedSolrClient solrClient;
// Documents buffered for the current bundle; re-created in startBundle().
private Collection<SolrInputDocument> batch;
WriteFn(Write spec) {
this.spec = spec;
// Creates the Solr client and the backoff policy for this DoFn instance.
public void setup() throws Exception {
solrClient = spec.getConnectionConfiguration().createClient();
// Baseline policy: zero retries.
retryBackoff =
.withMaxRetries(0) // default to no retrying
// Replace the baseline when the user supplied a retry configuration.
if (spec.getRetryConfiguration() != null) {
// FluentBackoff counts retries excluding the original while we count attempts
// to remove ambiguity (hence the -1)
retryBackoff =
.withMaxRetries(spec.getRetryConfiguration().getMaxAttempts() - 1)
// Starts every bundle with a fresh, empty batch.
public void startBundle(StartBundleContext context) {
batch = new ArrayList<>();
// Handles one input document.
public void processElement(ProcessContext context) throws Exception {
SolrInputDocument document = context.element();
// Threshold check: the batch has reached the configured maximum size
// (presumably triggers flushBatch() - confirm).
if (batch.size() >= spec.getMaxBatchSize()) {
// End-of-bundle hook (presumably flushes whatever is left in the batch - confirm).
public void finishBundle(FinishBundleContext context) throws Exception {
// Flushes the batch, implementing the retry mechanism as configured in the spec.
// Failures that are not retried, or that exhaust the retries, are rethrown as IOException.
private void flushBatch() throws IOException, InterruptedException {
// Nothing buffered: nothing to send.
if (batch.isEmpty()) {
try {
UpdateRequest updateRequest = new UpdateRequest();
Sleeper sleeper = Sleeper.DEFAULT;
// A fresh BackOff per flush, so every batch gets the full retry budget.
BackOff backoff = retryBackoff.backoff();
int attempt = 0;
while (true) {
try {
solrClient.process(spec.getCollection(), updateRequest);
} catch (Exception exception) {
// fail immediately if there is no retry configuration or its predicate rejects this exception
if (spec.getRetryConfiguration() == null
|| !spec.getRetryConfiguration().getRetryPredicate().test(exception)) {
throw new IOException(
"Error writing to Solr (no attempt made to retry)", exception);
// see if we can pause and try again
if (!, backoff)) {
throw new IOException(
"Error writing to Solr after %d attempt(s). No more attempts allowed",
} else {
// Note: this is used in test cases to verify behavior
LOG.warn(String.format(RETRY_ATTEMPT_LOG, attempt), exception);
} finally {
// Releases the Solr client; the null check covers the case where setup() never assigned it.
public void closeClient() throws IOException {
if (solrClient != null) {