package org.apache.solr.cloud;

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.http.client.HttpClient;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.HttpClientUtil;
import org.apache.solr.client.solrj.impl.HttpSolrServer;
import org.apache.solr.client.solrj.request.CoreAdminRequest.RequestRecovery;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CoreAdminParams.CoreAdminAction;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.core.CoreDescriptor;
import org.apache.solr.core.SolrCore;
import org.apache.solr.handler.component.HttpShardHandlerFactory;
import org.apache.solr.handler.component.ShardHandler;
import org.apache.solr.handler.component.ShardRequest;
import org.apache.solr.handler.component.ShardResponse;
import org.apache.solr.update.PeerSync;
import org.apache.solr.util.DefaultSolrThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
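/**
 * Strategy used by a shard leader candidate: PeerSync itself with the other replicas of the
 * shard, then ask those replicas to sync back to it, requesting recovery for any replica
 * whose sync fails.
 */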
public class SyncStrategy {
  protected final Logger log = LoggerFactory.getLogger(getClass());

  private final boolean SKIP_AUTO_RECOVERY = Boolean.getBoolean("solrcloud.skip.autorecovery");

  private final ShardHandler shardHandler;

  private ThreadPoolExecutor recoveryCmdExecutor = new ThreadPoolExecutor(
      0, Integer.MAX_VALUE, 5, TimeUnit.SECONDS,
      new SynchronousQueue<Runnable>(), new DefaultSolrThreadFactory(
          "recoveryCmdExecutor"));

  private volatile boolean isClosed;
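  // HttpClient shared by all SyncStrategy instances; used by the shard handler to send sync requests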
  private static final HttpClient client;
  static {
    ModifiableSolrParams params = new ModifiableSolrParams();
    params.set(HttpClientUtil.PROP_MAX_CONNECTIONS, 10000);
    params.set(HttpClientUtil.PROP_MAX_CONNECTIONS_PER_HOST, 20);
    params.set(HttpClientUtil.PROP_CONNECTION_TIMEOUT, 30000);
    params.set(HttpClientUtil.PROP_SO_TIMEOUT, 30000);
    params.set(HttpClientUtil.PROP_USE_RETRY, false);
    client = HttpClientUtil.createClient(params);
  }
  public SyncStrategy() {
    shardHandler = new HttpShardHandlerFactory().getShardHandler(client);
  }
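  /**
   * ShardRequest that also remembers the replica's core name and base URL so a failed sync
   * response can be mapped back to the core that should be asked to recover.
   */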
  private static class ShardCoreRequest extends ShardRequest {
    String coreName;
    public String baseUrl;
  }
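  /**
   * Sync this core (the potential shard leader) with its replicas. Returns true if the sync
   * succeeded or auto-recovery is disabled, false if the core has no update log or the sync failed.
   */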
  public boolean sync(ZkController zkController, SolrCore core,
      ZkNodeProps leaderProps) {
    if (SKIP_AUTO_RECOVERY) {
      return true;
    }
    log.info("Sync replicas to " + ZkCoreNodeProps.getCoreUrl(leaderProps));
    // TODO: look at our state usage of sync
    // zkController.publish(core, ZkStateReader.SYNC);

    // solrcloud_debug
    // System.out.println("SYNC UP");
    if (core.getUpdateHandler().getUpdateLog() == null) {
      log.error("No UpdateLog found - cannot sync");
      return false;
    }

    boolean success = syncReplicas(zkController, core, leaderProps);
    return success;
  }
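  /**
   * First PeerSync this core with its replicas; if that succeeds, ask each replica to sync
   * back to this core so the whole shard converges on the leader candidate.
   */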
  private boolean syncReplicas(ZkController zkController, SolrCore core,
      ZkNodeProps leaderProps) {
    boolean success = false;
    CloudDescriptor cloudDesc = core.getCoreDescriptor().getCloudDescriptor();
    String collection = cloudDesc.getCollectionName();
    String shardId = cloudDesc.getShardId();

    if (isClosed) {
      log.info("We have been closed, won't sync with replicas");
      return false;
    }

    // if no replica that is up is active, we are willing to wait...
    // we don't want a recovering node to become leader and then have
    // a better candidate pop up a second later.
//    int tries = 20;
//    while (!areAnyReplicasActive(zkController, collection, shardId)) {
//      if (tries-- == 0) {
//        break;
//      }
//      try {
//        Thread.sleep(500);
//      } catch (InterruptedException e) {
//        Thread.currentThread().interrupt();
//      }
//    }

    // first sync ourselves - we are the potential leader after all
    try {
      success = syncWithReplicas(zkController, core, leaderProps, collection,
          shardId);
    } catch (Exception e) {
      SolrException.log(log, "Sync Failed", e);
    }
    try {
      if (isClosed) {
        log.info("We have been closed, won't attempt to sync replicas back to leader");
        return false;
      }

      if (success) {
        log.info("Sync Success - now sync replicas to me");

        syncToMe(zkController, collection, shardId, leaderProps, core.getCoreDescriptor());
      } else {
        log.info("Leader's attempt to sync with shard failed, moving to the next candidate");
        // let's see who seems ahead...
      }
    } catch (Exception e) {
      SolrException.log(log, "Sync Failed", e);
    }

    return success;
  }
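  /**
   * PeerSync this core against the other live replicas of the shard. Returns true if there
   * are no replicas to sync with or the PeerSync succeeds.
   */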
  private boolean syncWithReplicas(ZkController zkController, SolrCore core,
      ZkNodeProps props, String collection, String shardId) {
    List<ZkCoreNodeProps> nodes = zkController.getZkStateReader()
        .getReplicaProps(collection, shardId,
            zkController.getCoreNodeName(core.getCoreDescriptor()),
            props.getStr(ZkStateReader.CORE_NAME_PROP));

    if (nodes == null) {
      // I have no replicas
      return true;
    }

    List<String> syncWith = new ArrayList<String>();
    for (ZkCoreNodeProps node : nodes) {
      syncWith.add(node.getCoreUrl());
    }

    // if we can't reach a replica for sync, we still consider the overall sync a success
    // TODO: as a further assurance, should we also tell the replicas we could not reach
    // to recover?
    PeerSync peerSync = new PeerSync(core, syncWith,
        core.getUpdateHandler().getUpdateLog().numRecordsToKeep, true, true);
    return peerSync.sync();
  }
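  /**
   * Ask every replica of the shard to sync against this core (the leader candidate) via a
   * realtime-get sync request; replicas whose sync fails are asked to recover.
   */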
  private void syncToMe(ZkController zkController, String collection,
      String shardId, ZkNodeProps leaderProps, CoreDescriptor cd) {

    // sync everyone else
    // TODO: we should do this in parallel at least
    List<ZkCoreNodeProps> nodes = zkController
        .getZkStateReader()
        .getReplicaProps(collection, shardId,
            zkController.getCoreNodeName(cd),
            leaderProps.getStr(ZkStateReader.CORE_NAME_PROP));
    if (nodes == null) {
      log.info(ZkCoreNodeProps.getCoreUrl(leaderProps) + " has no replicas");
      return;
    }

    ZkCoreNodeProps zkLeader = new ZkCoreNodeProps(leaderProps);
    for (ZkCoreNodeProps node : nodes) {
      try {
        log.info(ZkCoreNodeProps.getCoreUrl(leaderProps) + ": try and ask " + node.getCoreUrl() + " to sync");

        requestSync(node.getBaseUrl(), node.getCoreUrl(), zkLeader.getCoreUrl(), node.getCoreName());

      } catch (Exception e) {
        SolrException.log(log, "Error syncing replica to leader", e);
      }
    }

    for (;;) {
      ShardResponse srsp = shardHandler.takeCompletedOrError();
      if (srsp == null) break;
      boolean success = handleResponse(srsp);
      if (srsp.getException() != null) {
        SolrException.log(log, "Sync request error: " + srsp.getException());
      }

      if (!success) {
        try {
          log.info(ZkCoreNodeProps.getCoreUrl(leaderProps) + ": Sync failed - asking replica (" + srsp.getShardAddress() + ") to recover.");
          if (isClosed) {
            log.info("We have been closed, don't request that a replica recover");
          } else {
            requestRecovery(leaderProps, ((ShardCoreRequest) srsp.getShardRequest()).baseUrl,
                ((ShardCoreRequest) srsp.getShardRequest()).coreName);
          }
        } catch (Throwable t) {
          SolrException.log(log, ZkCoreNodeProps.getCoreUrl(leaderProps) + ": Could not tell a replica to recover", t);
        }
      } else {
        log.info(ZkCoreNodeProps.getCoreUrl(leaderProps) + ": sync completed with " + srsp.getShardAddress());
      }
    }
  }
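  /** Read the boolean "sync" flag from a replica's response; a missing response counts as failure. */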
  private boolean handleResponse(ShardResponse srsp) {
    NamedList<Object> response = srsp.getSolrResponse().getResponse();
    // TODO: why does this return null sometimes?
    if (response == null) {
      return false;
    }
    Boolean success = (Boolean) response.get("sync");

    if (success == null) {
      success = false;
    }

    return success;
  }
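  /**
   * Submit an asynchronous realtime-get sync request (qt=/get&sync=leaderUrl) to a replica
   * through the shard handler.
   */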
  private void requestSync(String baseUrl, String replica, String leaderUrl, String coreName) {
    ShardCoreRequest sreq = new ShardCoreRequest();
    sreq.coreName = coreName;
    sreq.baseUrl = baseUrl;
    sreq.purpose = 1;
    // TODO: this sucks
    if (replica.startsWith("http://"))
      replica = replica.substring(7);
    sreq.shards = new String[]{replica};
    sreq.actualShards = sreq.shards;
    sreq.params = new ModifiableSolrParams();
    sreq.params.set("qt", "/get");
    sreq.params.set("distrib", false);
    sreq.params.set("getVersions", Integer.toString(100));
    sreq.params.set("sync", leaderUrl);

    shardHandler.submit(sreq, replica, sreq.params);
  }
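  /** Mark this strategy closed and shut down the executor used to send recovery commands. */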
  public void close() {
    this.isClosed = true;
    try {
      ExecutorUtil.shutdownNowAndAwaitTermination(recoveryCmdExecutor);
    } catch (Throwable e) {
      SolrException.log(log, e);
    }
  }
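  /**
   * Send a CoreAdmin REQUESTRECOVERY command to the given replica. The request is executed
   * on recoveryCmdExecutor so the caller is not blocked.
   */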
  private void requestRecovery(final ZkNodeProps leaderProps, final String baseUrl,
      final String coreName) throws SolrServerException, IOException {
    // run on the recovery command executor so we don't block the caller
    Runnable recoveryCmd = new Runnable() {
      @Override
      public void run() {
        RequestRecovery recoverRequestCmd = new RequestRecovery();
        recoverRequestCmd.setAction(CoreAdminAction.REQUESTRECOVERY);
        recoverRequestCmd.setCoreName(coreName);

        HttpSolrServer server = new HttpSolrServer(baseUrl);
        server.setConnectionTimeout(45000);
        server.setSoTimeout(45000);
        try {
          server.request(recoverRequestCmd);
        } catch (Throwable t) {
          SolrException.log(log, ZkCoreNodeProps.getCoreUrl(leaderProps) + ": Could not tell a replica to recover", t);
        }
      }
    };
    recoveryCmdExecutor.execute(recoveryCmd);
  }
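  /** Build ModifiableSolrParams from alternating key/value strings, e.g. params("q", "*:*"). */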
  public static ModifiableSolrParams params(String... params) {
    ModifiableSolrParams msp = new ModifiableSolrParams();
    for (int i = 0; i < params.length; i += 2) {
      msp.add(params[i], params[i + 1]);
    }
    return msp;
  }
}