blob: 52d22f0f69ecacb8ebf1eb1943d29f9609d3237f [file] [log] [blame]
package org.apache.solr.cloud;
import java.io.IOException;
import java.util.Map;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.SolrCore;
import org.apache.solr.update.UpdateLog;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NoNodeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
public abstract class ElectionContext {
private static Logger log = LoggerFactory.getLogger(ElectionContext.class);
final String electionPath;
final ZkNodeProps leaderProps;
final String id;
final String leaderPath;
String leaderSeqPath;
private SolrZkClient zkClient;
public ElectionContext(final String coreNodeName,
final String electionPath, final String leaderPath, final ZkNodeProps leaderProps, final SolrZkClient zkClient) {
this.id = coreNodeName;
this.electionPath = electionPath;
this.leaderPath = leaderPath;
this.leaderProps = leaderProps;
this.zkClient = zkClient;
}
public void close() {}
public void cancelElection() throws InterruptedException, KeeperException {
try {
zkClient.delete(leaderSeqPath, -1, true);
} catch (NoNodeException e) {
// fine
log.warn("cancelElection did not find election node to remove");
}
}
abstract void runLeaderProcess(boolean weAreReplacement) throws KeeperException, InterruptedException, IOException;
}
class ShardLeaderElectionContextBase extends ElectionContext {
private static Logger log = LoggerFactory.getLogger(ShardLeaderElectionContextBase.class);
protected final SolrZkClient zkClient;
protected String shardId;
protected String collection;
protected LeaderElector leaderElector;
public ShardLeaderElectionContextBase(LeaderElector leaderElector, final String shardId,
final String collection, final String coreNodeName, ZkNodeProps props, ZkStateReader zkStateReader) {
super(coreNodeName, ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection + "/leader_elect/"
+ shardId, ZkStateReader.getShardLeadersPath(collection, shardId),
props, zkStateReader.getZkClient());
this.leaderElector = leaderElector;
this.zkClient = zkStateReader.getZkClient();
this.shardId = shardId;
this.collection = collection;
}
@Override
void runLeaderProcess(boolean weAreReplacement) throws KeeperException,
InterruptedException, IOException {
zkClient.makePath(leaderPath, ZkStateReader.toJSON(leaderProps),
CreateMode.EPHEMERAL, true);
assert shardId != null;
ZkNodeProps m = ZkNodeProps.fromKeyVals(Overseer.QUEUE_OPERATION, ZkStateReader.LEADER_PROP,
ZkStateReader.SHARD_ID_PROP, shardId, ZkStateReader.COLLECTION_PROP,
collection, ZkStateReader.BASE_URL_PROP, leaderProps.getProperties()
.get(ZkStateReader.BASE_URL_PROP), ZkStateReader.CORE_NAME_PROP,
leaderProps.getProperties().get(ZkStateReader.CORE_NAME_PROP),
ZkStateReader.STATE_PROP, ZkStateReader.ACTIVE);
Overseer.getInQueue(zkClient).offer(ZkStateReader.toJSON(m));
}
}
// add core container and stop passing core around...
final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
private static Logger log = LoggerFactory.getLogger(ShardLeaderElectionContext.class);
private ZkController zkController;
private CoreContainer cc;
private SyncStrategy syncStrategy = new SyncStrategy();
private volatile boolean isClosed = false;
public ShardLeaderElectionContext(LeaderElector leaderElector,
final String shardId, final String collection,
final String coreNodeName, ZkNodeProps props, ZkController zkController, CoreContainer cc) {
super(leaderElector, shardId, collection, coreNodeName, props,
zkController.getZkStateReader());
this.zkController = zkController;
this.cc = cc;
}
@Override
public void close() {
this.isClosed = true;
syncStrategy.close();
}
/*
* weAreReplacement: has someone else been the leader already?
*/
@Override
void runLeaderProcess(boolean weAreReplacement) throws KeeperException,
InterruptedException, IOException {
log.info("Running the leader process for shard " + shardId);
String coreName = leaderProps.getStr(ZkStateReader.CORE_NAME_PROP);
// clear the leader in clusterstate
ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, ZkStateReader.LEADER_PROP,
ZkStateReader.SHARD_ID_PROP, shardId, ZkStateReader.COLLECTION_PROP,
collection);
Overseer.getInQueue(zkClient).offer(ZkStateReader.toJSON(m));
String leaderVoteWait = cc.getZkController().getLeaderVoteWait();
if (!weAreReplacement && leaderVoteWait != null) {
waitForReplicasToComeUp(weAreReplacement, leaderVoteWait);
}
SolrCore core = null;
try {
core = cc.getCore(coreName);
if (core == null) {
cancelElection();
throw new SolrException(ErrorCode.SERVER_ERROR,
"Fatal Error, SolrCore not found:" + coreName + " in "
+ cc.getCoreNames());
}
// should I be leader?
if (weAreReplacement && !shouldIBeLeader(leaderProps, core, weAreReplacement)) {
rejoinLeaderElection(leaderSeqPath, core);
return;
}
log.info("I may be the new leader - try and sync");
// we are going to attempt to be the leader
// first cancel any current recovery
core.getUpdateHandler().getSolrCoreState().cancelRecovery();
boolean success = false;
try {
success = syncStrategy.sync(zkController, core, leaderProps);
} catch (Throwable t) {
SolrException.log(log, "Exception while trying to sync", t);
success = false;
}
UpdateLog ulog = core.getUpdateHandler().getUpdateLog();
if (!success) {
boolean hasRecentUpdates = false;
if (ulog != null) {
// TODO: we could optimize this if necessary
UpdateLog.RecentUpdates recentUpdates = ulog.getRecentUpdates();
try {
hasRecentUpdates = !recentUpdates.getVersions(1).isEmpty();
} finally {
recentUpdates.close();
}
}
if (!hasRecentUpdates) {
// we failed sync, but we have no versions - we can't sync in that case
// - we were active
// before, so become leader anyway
log.info("We failed sync, but we have no versions - we can't sync in that case - we were active before, so become leader anyway");
success = true;
}
}
// if !success but no one else is in active mode,
// we are the leader anyway
// TODO: should we also be leader if there is only one other active?
// if we couldn't sync with it, it shouldn't be able to sync with us
// TODO: this needs to be moved to the election context - the logic does
// not belong here.
if (!success
&& !areAnyOtherReplicasActive(zkController, leaderProps, collection,
shardId)) {
log.info("Sync was not a success but no one else is active! I am the leader");
success = true;
}
// solrcloud_debug
// try {
// RefCounted<SolrIndexSearcher> searchHolder =
// core.getNewestSearcher(false);
// SolrIndexSearcher searcher = searchHolder.get();
// try {
// System.out.println(core.getCoreDescriptor().getCoreContainer().getZkController().getNodeName()
// + " synched "
// + searcher.search(new MatchAllDocsQuery(), 1).totalHits);
// } finally {
// searchHolder.decref();
// }
// } catch (Exception e) {
//
// }
if (!success) {
rejoinLeaderElection(leaderSeqPath, core);
return;
}
log.info("I am the new leader: "
+ ZkCoreNodeProps.getCoreUrl(leaderProps) + " " + shardId);
core.getCoreDescriptor().getCloudDescriptor().setLeader(true);
} finally {
if (core != null) {
core.close();
}
}
try {
super.runLeaderProcess(weAreReplacement);
} catch (Throwable t) {
SolrException.log(log, "There was a problem trying to register as the leader", t);
cancelElection();
try {
core = cc.getCore(coreName);
if (core == null) {
throw new SolrException(ErrorCode.SERVER_ERROR,
"Fatal Error, SolrCore not found:" + coreName + " in "
+ cc.getCoreNames());
}
core.getCoreDescriptor().getCloudDescriptor().setLeader(false);
// we could not publish ourselves as leader - rejoin election
rejoinLeaderElection(leaderSeqPath, core);
} finally {
if (core != null) {
core.close();
}
}
}
}
private boolean areAnyOtherReplicasActive(ZkController zkController,
ZkNodeProps leaderProps, String collection, String shardId) {
ClusterState clusterState = zkController.getZkStateReader()
.getClusterState();
Map<String,Slice> slices = clusterState.getSlicesMap(collection);
Slice slice = slices.get(shardId);
if (!slice.getState().equals(Slice.ACTIVE)) {
//Return false if the Slice is not active yet.
return false;
}
Map<String,Replica> replicasMap = slice.getReplicasMap();
for (Map.Entry<String,Replica> shard : replicasMap.entrySet()) {
String state = shard.getValue().getStr(ZkStateReader.STATE_PROP);
// System.out.println("state:"
// + state
// + shard.getValue().get(ZkStateReader.NODE_NAME_PROP)
// + " live: "
// + clusterState.liveNodesContain(shard.getValue().get(
// ZkStateReader.NODE_NAME_PROP)));
if (state.equals(ZkStateReader.ACTIVE)
&& clusterState.liveNodesContain(shard.getValue().getStr(
ZkStateReader.NODE_NAME_PROP))
&& !new ZkCoreNodeProps(shard.getValue()).getCoreUrl().equals(
new ZkCoreNodeProps(leaderProps).getCoreUrl())) {
return true;
}
}
return false;
}
private void waitForReplicasToComeUp(boolean weAreReplacement,
String leaderVoteWait) throws InterruptedException {
int timeout = Integer.parseInt(leaderVoteWait);
long timeoutAt = System.currentTimeMillis() + timeout;
final String shardsElectZkPath = electionPath + LeaderElector.ELECTION_NODE;
Slice slices = zkController.getClusterState().getSlice(collection, shardId);
int cnt = 0;
while (true && !isClosed && !cc.isShutDown()) {
// wait for everyone to be up
if (slices != null) {
int found = 0;
try {
found = zkClient.getChildren(shardsElectZkPath, null, true).size();
} catch (KeeperException e) {
SolrException.log(log,
"Error checking for the number of election participants", e);
}
// on startup and after connection timeout, wait for all known shards
if (found >= slices.getReplicasMap().size()) {
log.info("Enough replicas found to continue.");
return;
} else {
if (cnt % 40 == 0) {
log.info("Waiting until we see more replicas up for shard " + shardId + ": total="
+ slices.getReplicasMap().size() + " found=" + found
+ " timeoutin=" + (timeoutAt - System.currentTimeMillis()));
}
}
if (System.currentTimeMillis() > timeoutAt) {
log.info("Was waiting for replicas to come up, but they are taking too long - assuming they won't come back till later");
return;
}
} else {
log.warn("Shard not found: " + shardId + " for collection " + collection);
return;
}
Thread.sleep(500);
slices = zkController.getClusterState().getSlice(collection, shardId);
// System.out.println("###### waitForReplicasToComeUp : slices=" + slices + " all=" + zkController.getClusterState().getCollectionStates() );
cnt++;
}
}
private void rejoinLeaderElection(String leaderSeqPath, SolrCore core)
throws InterruptedException, KeeperException, IOException {
// remove our ephemeral and re join the election
if (cc.isShutDown()) {
log.info("Not rejoining election because CoreContainer is shutdown");
return;
}
log.info("There may be a better leader candidate than us - going back into recovery");
cancelElection();
try {
core.getUpdateHandler().getSolrCoreState().doRecovery(cc, core.getCoreDescriptor());
} catch (Throwable t) {
SolrException.log(log, "Error trying to start recovery", t);
}
leaderElector.joinElection(this, true);
}
private boolean shouldIBeLeader(ZkNodeProps leaderProps, SolrCore core, boolean weAreReplacement) {
log.info("Checking if I should try and be the leader.");
if (isClosed) {
log.info("Bailing on leader process because we have been closed");
return false;
}
if (!weAreReplacement) {
// we are the first node starting in the shard - there is a configurable wait
// to make sure others participate in sync and leader election, we can be leader
return true;
}
if (core.getCoreDescriptor().getCloudDescriptor().getLastPublished()
.equals(ZkStateReader.ACTIVE)) {
log.info("My last published State was Active, it's okay to be the leader.");
return true;
}
log.info("My last published State was "
+ core.getCoreDescriptor().getCloudDescriptor().getLastPublished()
+ ", I won't be the leader.");
// TODO: and if no one is a good candidate?
return false;
}
}
final class OverseerElectionContext extends ElectionContext {
private final SolrZkClient zkClient;
private Overseer overseer;
public OverseerElectionContext(SolrZkClient zkClient, Overseer overseer, final String zkNodeName) {
super(zkNodeName, "/overseer_elect", "/overseer_elect/leader", null, zkClient);
this.overseer = overseer;
this.zkClient = zkClient;
}
@Override
void runLeaderProcess(boolean weAreReplacement) throws KeeperException,
InterruptedException {
final String id = leaderSeqPath
.substring(leaderSeqPath.lastIndexOf("/") + 1);
ZkNodeProps myProps = new ZkNodeProps("id", id);
zkClient.makePath(leaderPath, ZkStateReader.toJSON(myProps),
CreateMode.EPHEMERAL, true);
overseer.start(id);
}
}