| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.solr.cloud; |
| |
| import static org.apache.solr.cloud.AbstractDistribZkTestBase.verifyReplicaStatus; |
| import static org.mockito.Mockito.anyBoolean; |
| import static org.mockito.Mockito.anyInt; |
| import static org.mockito.Mockito.anyString; |
| import static org.mockito.Mockito.doNothing; |
| import static org.mockito.Mockito.doReturn; |
| import static org.mockito.Mockito.mock; |
| import static org.mockito.Mockito.when; |
| |
| import java.io.IOException; |
| import java.lang.invoke.MethodHandles; |
| import java.nio.file.Path; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Locale; |
| import java.util.Map; |
| import java.util.Optional; |
| import java.util.Set; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.TimeoutException; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.concurrent.atomic.AtomicInteger; |
| |
| import javax.xml.parsers.ParserConfigurationException; |
| |
| import org.apache.lucene.util.LuceneTestCase.Slow; |
| import org.apache.solr.SolrTestCaseJ4; |
| import org.apache.solr.client.solrj.SolrClient; |
| import org.apache.solr.client.solrj.cloud.DistributedQueue; |
| import org.apache.solr.client.solrj.cloud.SolrCloudManager; |
| import org.apache.solr.client.solrj.impl.CloudSolrClient; |
| import org.apache.solr.client.solrj.impl.SolrClientCloudManager; |
| import org.apache.solr.cloud.overseer.NodeMutator; |
| import org.apache.solr.cloud.overseer.OverseerAction; |
| import org.apache.solr.cloud.overseer.ZkWriteCommand; |
| import org.apache.solr.common.AlreadyClosedException; |
| import org.apache.solr.common.SolrException; |
| import org.apache.solr.common.cloud.ClusterState; |
| import org.apache.solr.common.cloud.DocCollection; |
| import org.apache.solr.common.cloud.Replica; |
| import org.apache.solr.common.cloud.Slice; |
| import org.apache.solr.common.cloud.SolrZkClient; |
| import org.apache.solr.common.cloud.ZkCoreNodeProps; |
| import org.apache.solr.common.cloud.ZkNodeProps; |
| import org.apache.solr.common.cloud.ZkStateReader; |
| import org.apache.solr.common.params.CollectionParams; |
| import org.apache.solr.common.util.ExecutorUtil; |
| import org.apache.solr.common.util.IOUtils; |
| import org.apache.solr.common.util.SolrNamedThreadFactory; |
| import org.apache.solr.common.util.TimeSource; |
| import org.apache.solr.common.util.Utils; |
| import org.apache.solr.core.CloudConfig; |
| import org.apache.solr.core.CoreContainer; |
| import org.apache.solr.core.PluginInfo; |
| import org.apache.solr.core.SolrResourceLoader; |
| import org.apache.solr.handler.component.HttpShardHandler; |
| import org.apache.solr.handler.component.HttpShardHandlerFactory; |
| import org.apache.solr.metrics.SolrMetricManager; |
| import org.apache.solr.update.UpdateShardHandler; |
| import org.apache.solr.update.UpdateShardHandlerConfig; |
| import org.apache.solr.util.TimeOut; |
| import org.apache.zookeeper.CreateMode; |
| import org.apache.zookeeper.KeeperException; |
| import org.apache.zookeeper.KeeperException.NoNodeException; |
| import org.apache.zookeeper.KeeperException.NodeExistsException; |
| import org.apache.zookeeper.KeeperException.SessionExpiredException; |
| import org.apache.zookeeper.WatchedEvent; |
| import org.apache.zookeeper.Watcher.Event; |
| import org.apache.zookeeper.data.Stat; |
| import org.apache.zookeeper.proto.WatcherEvent; |
| import org.junit.After; |
| import org.junit.AfterClass; |
| import org.junit.Before; |
| import org.junit.BeforeClass; |
| import org.junit.Ignore; |
| import org.junit.Test; |
| import org.mockito.Mockito; |
| import org.mockito.internal.util.reflection.FieldSetter; |
| import org.mockito.invocation.InvocationOnMock; |
| import org.mockito.stubbing.Answer; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| import org.xml.sax.SAXException; |
| |
| import com.codahale.metrics.Snapshot; |
| import com.codahale.metrics.Timer; |
| |
| @Slow |
| @SolrTestCaseJ4.SuppressSSL |
| public class OverseerTest extends SolrTestCaseJ4 { |
| |
  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

  // ZooKeeper client timeout (ms) used for every SolrZkClient these tests create.
  static final int TIMEOUT = 30000;

  // Embedded ZK test server shared by the whole class (started in beforeClass, stopped in afterClass).
  private static ZkTestServer server;

  // Client connected to the shared test server; obtained from the server itself.
  private static SolrZkClient zkClient;


  // Set to true in tearDown; presumably consulted by long-running test helpers so they can
  // bail out once the test is over — TODO confirm against the readers of this flag.
  private volatile boolean testDone = false;

  // Per-test registries of closeable resources; tearDown closes and clears every one of them.
  private final List<ZkController> zkControllers = Collections.synchronizedList(new ArrayList<>());
  private final List<Overseer> overseers = Collections.synchronizedList(new ArrayList<>());
  private final List<ZkStateReader> readers = Collections.synchronizedList(new ArrayList<>());
  private final List<SolrZkClient> zkClients = Collections.synchronizedList(new ArrayList<>());
  private final List<HttpShardHandlerFactory> httpShardHandlerFactorys = Collections.synchronizedList(new ArrayList<>());
  private final List<UpdateShardHandler> updateShardHandlers = Collections.synchronizedList(new ArrayList<>());
  private final List<CloudSolrClient> solrClients = Collections.synchronizedList(new ArrayList<>());

  // Collection name used by most tests in this class.
  private static final String COLLECTION = SolrTestCaseJ4.DEFAULT_TEST_COLLECTION_NAME;
| |
  /**
   * Minimal stand-in for a Solr node in ZooKeeper: it registers an ephemeral live node,
   * can create collections and publish replica state changes through the Overseer's
   * state-update queue, and can join shard leader elections — all without running a
   * real core container.
   */
  public static class MockZKController{

    private final SolrZkClient zkClient;
    private final ZkStateReader zkStateReader;
    // Node name this mock registers under /live_nodes (e.g. "127.0.0.1:8983_solr").
    private final String nodeName;
    // Leader-election contexts this mock has joined, keyed by core name; cancelled on close().
    private final Map<String, ElectionContext> electionContext = Collections.synchronizedMap(new HashMap<String, ElectionContext>());
    private List<Overseer> overseers;

    public MockZKController(String zkAddress, String nodeName, List<Overseer> overseers) throws InterruptedException, TimeoutException, IOException, KeeperException {
      this.overseers = overseers;
      this.nodeName = nodeName;
      zkClient = new SolrZkClient(zkAddress, TIMEOUT);

      ZkController.createClusterZkNodes(zkClient);

      zkStateReader = new ZkStateReader(zkClient);
      zkStateReader.createClusterStateWatchersAndUpdate();

      // live node — EPHEMERAL, so it disappears automatically when zkClient closes
      final String nodePath = ZkStateReader.LIVE_NODES_ZKNODE + "/" + nodeName;
      zkClient.makePath(nodePath, CreateMode.EPHEMERAL, true);
    }

    // Best-effort delete: a missing node is fine; any other ZK failure fails the test.
    private void deleteNode(final String path) {

      try {
        zkClient.delete(path, -1, true);
      } catch (NoNodeException e) {
        // fine
        log.warn("cancelElection did not find election node to remove");
      } catch (KeeperException e) {
        fail("Unexpected KeeperException!" + e);
      } catch (InterruptedException e) {
        fail("Unexpected InterruptedException!" + e);
      }
    }

    public void close() {
      // Cancel any elections this mock joined before dropping its live node.
      for (ElectionContext ec : electionContext.values()) {
        try {
          ec.cancelElection();
        } catch (Exception e) {
          log.warn(String.format(Locale.ROOT, "Error cancelling election for %s", ec.id), e);
        }
      }
      deleteNode(ZkStateReader.LIVE_NODES_ZKNODE + "/" + nodeName);
      // NOTE(review): the client is closed before the reader that was built on it —
      // looks harmless in practice, but confirm ZkStateReader.close() tolerates a closed client.
      zkClient.close();
      zkStateReader.close();
    }

    /**
     * Enqueues a CREATE message for {@code collection} with {@code numShards} shards,
     * replicationFactor=1 and an empty createNodeSet (no replicas are placed).
     */
    public void createCollection(String collection, int numShards) throws Exception {

      ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.CREATE.toLower(),
          "name", collection,
          ZkStateReader.REPLICATION_FACTOR, "1",
          ZkStateReader.NUM_SHARDS_PROP, numShards+"",
          "createNodeSet", "");
      ZkDistributedQueue q = MiniSolrCloudCluster.getOpenOverseer(overseers).getStateUpdateQueue();
      q.offer(Utils.toJSON(m));

    }

    /**
     * Publishes a replica state message to the given Overseer's state-update queue.
     * A null {@code stateName} publishes a DELETECORE instead (and cancels any election
     * held for that core). When {@code startElection} is true and the replica lands in a
     * shard, this mock also joins the shard leader election.
     *
     * @return the shard id the replica was assigned to, or null when no election was
     *         started (including the DELETECORE case and the empty-collection case)
     */
    public String publishState(String collection, String coreName, String coreNodeName, String shard, Replica.State stateName, int numShards, boolean startElection, Overseer overseer)
        throws Exception {
      if (stateName == null) {
        ElectionContext ec = electionContext.remove(coreName);
        if (ec != null) {
          ec.cancelElection();
        }
        ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.DELETECORE.toLower(),
            ZkStateReader.NODE_NAME_PROP, nodeName,
            ZkStateReader.CORE_NAME_PROP, coreName,
            ZkStateReader.CORE_NODE_NAME_PROP, coreNodeName,
            ZkStateReader.COLLECTION_PROP, collection);
        ZkDistributedQueue q = overseer.getStateUpdateQueue();
        q.offer(Utils.toJSON(m));
        return null;
      } else {
        ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.STATE.toLower(),
            ZkStateReader.STATE_PROP, stateName.toString(),
            ZkStateReader.NODE_NAME_PROP, nodeName,
            ZkStateReader.CORE_NAME_PROP, coreName,
            ZkStateReader.CORE_NODE_NAME_PROP, coreNodeName,
            ZkStateReader.COLLECTION_PROP, collection,
            ZkStateReader.SHARD_ID_PROP, shard,
            ZkStateReader.NUM_SHARDS_PROP, Integer.toString(numShards));
        ZkDistributedQueue q = overseer.getStateUpdateQueue();
        q.offer(Utils.toJSON(m));
      }

      if (startElection && collection.length() > 0) {
        // Wait until the Overseer has processed the message and the replica shows up in state.
        zkStateReader.waitForState(collection, 45000, TimeUnit.MILLISECONDS,
            (liveNodes, collectionState) -> getShardId(collectionState, coreNodeName) != null);
        String shardId = getShardId(collection, coreNodeName);
        if (shardId != null) {
          // Re-registering a core replaces its previous election context.
          ElectionContext prevContext = electionContext.get(coreName);
          if (prevContext != null) {
            prevContext.cancelElection();
          }

          try {
            zkClient.makePath("/collections/" + collection + "/leader_elect/"
                + shardId + "/election", true);
          } catch (NodeExistsException nee) {}
          ZkNodeProps props = new ZkNodeProps(ZkStateReader.NODE_NAME_PROP, nodeName,
              ZkStateReader.CORE_NAME_PROP, coreName,
              ZkStateReader.SHARD_ID_PROP, shardId,
              ZkStateReader.COLLECTION_PROP, collection,
              ZkStateReader.CORE_NODE_NAME_PROP, coreNodeName);
          LeaderElector elector = new LeaderElector(zkClient);
          ShardLeaderElectionContextBase ctx = new ShardLeaderElectionContextBase(
              elector, shardId, collection, nodeName + coreName, props,
              MockSolrSource.makeSimpleMock(overseer, zkStateReader, null));
          elector.setup(ctx);
          electionContext.put(coreName, ctx);
          elector.joinElection(ctx, false);
          return shardId;
        }
      }
      return null;
    }

    // Resolves the shard that contains coreNodeName using the reader's current cluster state.
    private String getShardId(String collection, String coreNodeName) {
      DocCollection dc = zkStateReader.getClusterState().getCollectionOrNull(collection);
      return getShardId(dc, coreNodeName);
    }

    // Scans every slice for a replica named coreNodeName; null when absent (or collection is null).
    private String getShardId(DocCollection collection, String coreNodeName) {
      if (collection == null) return null;
      Map<String,Slice> slices = collection.getSlicesMap();
      if (slices != null) {
        for (Slice slice : slices.values()) {
          for (Replica replica : slice.getReplicas()) {
            String cnn = replica.getName();
            if (coreNodeName.equals(cnn)) {
              return slice.getName();
            }
          }
        }
      }
      return null;
    }


    public ZkStateReader getZkReader() {
      return zkStateReader;
    }
  }
| |
| @BeforeClass |
| public static void beforeClass() throws Exception { |
| assumeWorkingMockito(); |
| |
| System.setProperty("solr.zkclienttimeout", "30000"); |
| |
| Path zkDir = createTempDir("zkData"); |
| |
| server = new ZkTestServer(zkDir); |
| server.run(); |
| |
| zkClient = server.getZkClient(); |
| |
| initCore(); |
| } |
| |
| |
  @Before
  public void setUp() throws Exception {
    // Reset the shutdown flag before the base class completes its per-test setup.
    testDone = false;
    super.setUp();
  }
| |
| @AfterClass |
| public static void afterClass() throws Exception { |
| if (null != zkClient) { |
| zkClient.printLayoutToStdOut(); |
| } |
| |
| System.clearProperty("solr.zkclienttimeout"); |
| |
| if (null != server) { |
| server.shutdown(); |
| } |
| |
| server = null; |
| } |
| |
| @After |
| public void tearDown() throws Exception { |
| testDone = true; |
| |
| ExecutorService customThreadPool = ExecutorUtil.newMDCAwareCachedThreadPool(new SolrNamedThreadFactory("closeThreadPool")); |
| |
| for (ZkController zkController : zkControllers) { |
| customThreadPool.submit(zkController::close); |
| } |
| |
| for (HttpShardHandlerFactory httpShardHandlerFactory : httpShardHandlerFactorys) { |
| customThreadPool.submit(httpShardHandlerFactory::close); |
| } |
| |
| for (UpdateShardHandler updateShardHandler : updateShardHandlers) { |
| customThreadPool.submit(updateShardHandler::close); |
| } |
| |
| for (SolrClient solrClient : solrClients) { |
| customThreadPool.submit( () -> IOUtils.closeQuietly(solrClient)); |
| } |
| |
| for (ZkStateReader reader : readers) { |
| customThreadPool.submit(reader::close); |
| } |
| |
| for (SolrZkClient solrZkClient : zkClients) { |
| customThreadPool.submit( () -> IOUtils.closeQuietly(solrZkClient)); |
| } |
| |
| ExecutorUtil.shutdownAndAwaitTermination(customThreadPool); |
| |
| customThreadPool = ExecutorUtil.newMDCAwareCachedThreadPool(new SolrNamedThreadFactory("closeThreadPool")); |
| |
| |
| for (Overseer overseer : overseers) { |
| customThreadPool.submit(overseer::close); |
| } |
| |
| ExecutorUtil.shutdownAndAwaitTermination(customThreadPool); |
| |
| overseers.clear(); |
| zkControllers.clear(); |
| httpShardHandlerFactorys.clear(); |
| updateShardHandlers.clear(); |
| solrClients.clear(); |
| readers.clear(); |
| zkClients.clear(); |
| |
| server.tryCleanSolrZkNode(); |
| server.makeSolrZkNode(); |
| |
| super.tearDown(); |
| } |
| |
  /**
   * Publishes 6 replicas into a 3-shard collection and verifies the Overseer assigns
   * them round-robin (2 replicas per shard) and that each shard elects a leader.
   */
  @Test
  public void testShardAssignment() throws Exception {

    MockZKController mockController = null;
    SolrZkClient overseerClient = null;

    try {


      ZkController.createClusterZkNodes(zkClient);

      overseerClient = electNewOverseer(server.getZkAddress());

      try (ZkStateReader reader = new ZkStateReader(zkClient)) {
        reader.createClusterStateWatchersAndUpdate();

        mockController = new MockZKController(server.getZkAddress(), "127.0.0.1:8983_solr", overseers);

        final int numShards = 6;

        // Create the collection with 3 shards and no initial replicas.
        ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.CREATE.toLower(),
            "name", COLLECTION,
            ZkStateReader.REPLICATION_FACTOR, "1",
            ZkStateReader.NUM_SHARDS_PROP, "3",
            "createNodeSet", "");
        ZkDistributedQueue q = overseers.get(0).getStateUpdateQueue();
        q.offer(Utils.toJSON(m));

        // Publish 6 cores, cycling the target shard 1..3 so each shard gets two replicas.
        for (int i = 0; i < numShards; i++) {
          assertNotNull("shard got no id?", mockController.publishState(COLLECTION, "core" + (i + 1), "node" + (i + 1), "shard" + ((i % 3) + 1), Replica.State.ACTIVE, 3, true, overseers.get(0)));
        }

        reader.waitForState(COLLECTION, 30, TimeUnit.SECONDS, MiniSolrCloudCluster.expectedShardsAndActiveReplicas(3, 6));

        // Each of the 3 shards should hold exactly 2 replicas.
        final Map<String, Replica> rmap = reader.getClusterState().getCollection(COLLECTION).getSlice("shard1").getReplicasMap();
        assertEquals(rmap.toString(), 2, rmap.size());
        assertEquals(rmap.toString(), 2, reader.getClusterState().getCollection(COLLECTION).getSlice("shard2").getReplicasMap().size());
        assertEquals(rmap.toString(), 2, reader.getClusterState().getCollection(COLLECTION).getSlice("shard3").getReplicasMap().size());

        //make sure leaders are in cloud state
        assertNotNull(reader.getLeaderUrl(COLLECTION, "shard1", 15000));
        assertNotNull(reader.getLeaderUrl(COLLECTION, "shard2", 15000));
        assertNotNull(reader.getLeaderUrl(COLLECTION, "shard3", 15000));
      }
    } finally {
      if (mockController != null) {
        mockController.close();
      }
      close(overseerClient);
    }
  }
| |
  /**
   * Verifies the Overseer survives malformed queue items: after a healthy collection is
   * built, two messages with an empty collection name are enqueued, and the Overseer must
   * still process a subsequent, valid collection ("collection2") correctly.
   */
  @Test
  public void testBadQueueItem() throws Exception {

    MockZKController mockController = null;
    SolrZkClient overseerClient = null;

    try {
      ZkController.createClusterZkNodes(zkClient);

      overseerClient = electNewOverseer(server.getZkAddress());

      try (ZkStateReader reader = new ZkStateReader(zkClient)) {
        reader.createClusterStateWatchersAndUpdate();

        mockController = new MockZKController(server.getZkAddress(), "127.0.0.1:8983_solr", overseers);

        // Healthy baseline: 3 shards, one active replica each.
        final int numShards = 3;
        mockController.createCollection(COLLECTION, 3);
        for (int i = 0; i < numShards; i++) {
          assertNotNull("shard got no id?", mockController.publishState(COLLECTION, "core" + (i + 1),
              "node" + (i + 1), "shard" + ((i % 3) + 1), Replica.State.ACTIVE, 3, true, overseers.get(0)));
        }

        reader.waitForState(COLLECTION, 30, TimeUnit.SECONDS, MiniSolrCloudCluster.expectedShardsAndActiveReplicas(3, 3));

        assertEquals(1, reader.getClusterState().getCollection(COLLECTION).getSlice("shard1").getReplicasMap().size());
        assertEquals(1, reader.getClusterState().getCollection(COLLECTION).getSlice("shard2").getReplicasMap().size());
        assertEquals(1, reader.getClusterState().getCollection(COLLECTION).getSlice("shard3").getReplicasMap().size());

        //make sure leaders are in cloud state
        assertNotNull(reader.getLeaderUrl(COLLECTION, "shard1", 15000));
        assertNotNull(reader.getLeaderUrl(COLLECTION, "shard2", 15000));
        assertNotNull(reader.getLeaderUrl(COLLECTION, "shard3", 15000));

        // publish a bad queue item
        String emptyCollectionName = "";
        mockController.publishState(emptyCollectionName, "core0", "node0", "shard1", Replica.State.ACTIVE, 1, true, overseers.get(0));
        mockController.publishState(emptyCollectionName, "core0", "node0", "shard1", null, 1, true, overseers.get(0));

        mockController.createCollection("collection2", 3);
        // make sure the Overseer is still processing items
        for (int i = 0; i < numShards; i++) {
          assertNotNull("shard got no id?", mockController.publishState("collection2",
              "core" + (i + 1), "node" + (i + 1), "shard" + ((i % 3) + 1), Replica.State.ACTIVE, 3, true, overseers.get(0)));
        }

        reader.waitForState("collection2", 30, TimeUnit.SECONDS, MiniSolrCloudCluster.expectedShardsAndActiveReplicas(3, 3));

        assertEquals(1, reader.getClusterState().getCollection("collection2").getSlice("shard1").getReplicasMap().size());
        assertEquals(1, reader.getClusterState().getCollection("collection2").getSlice("shard2").getReplicasMap().size());
        assertEquals(1, reader.getClusterState().getCollection("collection2").getSlice("shard3").getReplicasMap().size());

        //make sure leaders are in cloud state
        assertNotNull(reader.getLeaderUrl("collection2", "shard1", 15000));
        assertNotNull(reader.getLeaderUrl("collection2", "shard2", 15000));
        assertNotNull(reader.getLeaderUrl("collection2", "shard3", 15000));
      }

    } finally {
      if (mockController != null) {
        mockController.close();
      }
      close(overseerClient);
    }
  }
| |
  /**
   * Verifies DOWNNODE handling across an Overseer failover: 5 single-shard collections get
   * an active replica on one node, a DOWNNODE message is enqueued, the current Overseer is
   * killed, and a newly elected Overseer must still mark every replica DOWN.
   */
  @Test
  @SuppressWarnings({"try"})
  public void testDownNodeFailover() throws Exception {
    MockZKController mockController = null;
    SolrZkClient overseerClient = null;

    try {

      ZkController.createClusterZkNodes(zkClient);

      overseerClient = electNewOverseer(server.getZkAddress());

      try (ZkStateReader reader = new ZkStateReader(zkClient)) {
        reader.createClusterStateWatchersAndUpdate();

        mockController = new MockZKController(server.getZkAddress(), "127.0.0.1:8983_solr", overseers);

        try (ZkController zkController = createMockZkController(server.getZkAddress(), zkClient, reader)) {

          // Five single-shard collections, each with one active replica on this node.
          for (int i = 0; i < 5; i++) {
            mockController.createCollection("collection" + i, 1);
            assertNotNull("shard got no id?", mockController.publishState("collection" + i, "core1",
                "core_node1", "shard1", Replica.State.ACTIVE, 1, true, overseers.get(0)));
          }
        }
        // Mark the whole node down; NodeMutator computes the expected write commands locally
        // so we know which collection to check first.
        ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.DOWNNODE.toLower(),
            ZkStateReader.NODE_NAME_PROP, "127.0.0.1:8983_solr");
        List<ZkWriteCommand> commands = new NodeMutator(null).downNode(reader.getClusterState(), m);

        ZkDistributedQueue q = overseers.get(0).getStateUpdateQueue();

        q.offer(Utils.toJSON(m));

        verifyReplicaStatus(reader, commands.get(0).name, "shard1", "core_node1", Replica.State.DOWN);
        // Kill the current Overseer mid-processing and elect a fresh one.
        overseerClient.close();

        overseerClient = electNewOverseer(server.getZkAddress());
        // The replacement Overseer must finish marking every replica down.
        for (int i = 0; i < 5; i++) {
          verifyReplicaStatus(reader, "collection" + i, "shard1", "core_node1", Replica.State.DOWN);
        }
      }
    } finally {
      if (mockController != null) {
        mockController.close();
      }
      close(overseerClient);
    }
  }
| |
| //wait until collections are available |
| private void waitForCollections(ZkStateReader stateReader, String... collections) throws InterruptedException, KeeperException, TimeoutException { |
| int maxIterations = 100; |
| while (0 < maxIterations--) { |
| |
| final ClusterState state = stateReader.getClusterState(); |
| Set<String> availableCollections = state.getCollectionsMap().keySet(); |
| int availableCount = 0; |
| for(String requiredCollection: collections) { |
| stateReader.waitForState(requiredCollection, 30000, TimeUnit.MILLISECONDS, (liveNodes, collectionState) -> collectionState != null); |
| if(availableCollections.contains(requiredCollection)) { |
| availableCount++; |
| } |
| if(availableCount == collections.length) return; |
| |
| } |
| } |
| log.warn("Timeout waiting for collections: {} state: {}" |
| , Arrays.asList(collections), stateReader.getClusterState()); |
| } |
| |
| @Test |
| public void testStateChange() throws Exception { |
| |
| ZkStateReader reader = null; |
| SolrZkClient overseerClient = null; |
| |
| try { |
| |
| ZkController.createClusterZkNodes(zkClient); |
| |
| reader = new ZkStateReader(zkClient); |
| reader.createClusterStateWatchersAndUpdate(); |
| |
| overseerClient = electNewOverseer(server.getZkAddress()); |
| |
| ZkDistributedQueue q = overseers.get(0).getStateUpdateQueue(); |
| |
| ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.CREATE.toLower(), |
| "name", COLLECTION, |
| ZkStateReader.REPLICATION_FACTOR, "1", |
| ZkStateReader.NUM_SHARDS_PROP, "1", |
| "createNodeSet", ""); |
| q.offer(Utils.toJSON(m)); |
| |
| m = new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.STATE.toLower(), |
| ZkStateReader.NODE_NAME_PROP, "node1:8983_", |
| ZkStateReader.COLLECTION_PROP, COLLECTION, |
| ZkStateReader.SHARD_ID_PROP, "shard1", |
| ZkStateReader.CORE_NAME_PROP, "core1", |
| ZkStateReader.CORE_NODE_NAME_PROP, "core_node1", |
| ZkStateReader.ROLES_PROP, "", |
| ZkStateReader.STATE_PROP, Replica.State.RECOVERING.toString()); |
| |
| q.offer(Utils.toJSON(m)); |
| |
| waitForCollections(reader, COLLECTION); |
| verifyReplicaStatus(reader, "collection1", "shard1", "core_node1", Replica.State.RECOVERING); |
| |
| //publish node state (active) |
| m = new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.STATE.toLower(), |
| ZkStateReader.NODE_NAME_PROP, "node1:8983_", |
| ZkStateReader.COLLECTION_PROP, COLLECTION, |
| ZkStateReader.SHARD_ID_PROP, "shard1", |
| ZkStateReader.CORE_NAME_PROP, "core1", |
| ZkStateReader.ROLES_PROP, "", |
| ZkStateReader.STATE_PROP, Replica.State.ACTIVE.toString()); |
| |
| q.offer(Utils.toJSON(m)); |
| |
| verifyReplicaStatus(reader, "collection1", "shard1", "core_node1", Replica.State.ACTIVE); |
| |
| } finally { |
| |
| close(overseerClient); |
| |
| close(reader); |
| } |
| } |
| |
| private void verifyShardLeader(ZkStateReader reader, String collection, String shard, String expectedCore) |
| throws InterruptedException, KeeperException, TimeoutException { |
| |
| reader.waitForState(collection, 15000, TimeUnit.MILLISECONDS, |
| (liveNodes, collectionState) -> collectionState != null |
| && expectedCore.equals((collectionState.getLeader(shard) != null) |
| ? collectionState.getLeader(shard).getStr(ZkStateReader.CORE_NAME_PROP) : null)); |
| |
| DocCollection docCollection = reader.getClusterState().getCollection(collection); |
| assertEquals("Unexpected shard leader coll:" + collection + " shard:" + shard, expectedCore, |
| (docCollection.getLeader(shard) != null) ? docCollection.getLeader(shard).getStr(ZkStateReader.CORE_NAME_PROP) |
| : null); |
| } |
| |
  // Returns the currently-open (live) Overseer among those this test has created.
  private Overseer getOpenOverseer() {
    return MiniSolrCloudCluster.getOpenOverseer(overseers);
  }
| |
  /**
   * Verifies state processing across an Overseer restart: publishes RECOVERING/ACTIVE
   * transitions, kills the Overseer mid-RECOVERING, elects a new one, and checks the
   * pending state is applied. Finally publishes a null state (DELETECORE) and checks the
   * replica is removed while the collection itself remains (per SOLR-5209).
   */
  @Test
  public void testOverseerFailure() throws Exception {

    SolrZkClient overseerClient = null;
    ZkStateReader reader = null;
    MockZKController mockController = null;

    try {

      final String core = "core1";
      final String core_node = "core_node1";
      final String shard = "shard1";
      final int numShards = 1;

      ZkController.createClusterZkNodes(zkClient);

      reader = new ZkStateReader(zkClient);
      reader.createClusterStateWatchersAndUpdate();

      mockController = new MockZKController(server.getZkAddress(), "127.0.0.1:8983_solr", overseers);

      overseerClient = electNewOverseer(server.getZkAddress());

      mockController.createCollection(COLLECTION, 1);

      ZkController zkController = createMockZkController(server.getZkAddress(), zkClient, reader);

      mockController.publishState(COLLECTION, core, core_node, "shard1",
          Replica.State.RECOVERING, numShards, true, overseers.get(0));

      waitForCollections(reader, COLLECTION);
      verifyReplicaStatus(reader, COLLECTION, "shard1", "core_node1", Replica.State.RECOVERING);

      mockController.publishState(COLLECTION, core, core_node, "shard1", Replica.State.ACTIVE,
          numShards, true, overseers.get(0));

      verifyReplicaStatus(reader, COLLECTION, "shard1", "core_node1", Replica.State.ACTIVE);

      mockController.publishState(COLLECTION, core, core_node, "shard1",
          Replica.State.RECOVERING, numShards, true, overseers.get(0));

      // Kill the sitting Overseer; a replacement is elected below.
      overseerClient.close();

      overseerClient = electNewOverseer(server.getZkAddress());

      // The replacement Overseer must apply the state that was queued before the kill.
      verifyReplicaStatus(reader, COLLECTION, "shard1", "core_node1", Replica.State.RECOVERING);

      assertEquals("Live nodes count does not match", 1, reader
          .getClusterState().getLiveNodes().size());
      assertEquals(shard+" replica count does not match", 1, reader.getClusterState()
          .getCollection(COLLECTION).getSlice(shard).getReplicasMap().size());
      // overseers.get(1) is the freshly elected (second) Overseer instance.
      mockController.publishState(COLLECTION, core, core_node, "shard1", null, numShards, true, overseers.get(1));

      reader.waitForState(COLLECTION, 5000,
          TimeUnit.MILLISECONDS, (liveNodes, collectionState) -> collectionState != null && collectionState.getReplica(core_node) == null);

      reader.forceUpdateCollection(COLLECTION);
      // as of SOLR-5209 core removal does not cascade to remove the slice and collection
      assertTrue(COLLECTION +" should remain after removal of the last core",
          reader.getClusterState().hasCollection(COLLECTION));
      assertTrue(core_node+" should be gone after publishing the null state",
          null == reader.getClusterState().getCollection(COLLECTION).getReplica(core_node));
    } finally {
      close(mockController);
      close(overseerClient);
      close(reader);
    }
  }
| |
  /**
   * Verifies that Overseer operation stats are reset when an Overseer is closed and
   * re-elected: after processing a STATE message the success counter is positive, and
   * after a restart it reads zero again.
   */
  @Test
  public void testOverseerStatsReset() throws Exception {
    ZkStateReader reader = null;
    MockZKController mockController = null;

    try {

      ZkController.createClusterZkNodes(zkClient);

      reader = new ZkStateReader(zkClient);
      reader.createClusterStateWatchersAndUpdate();

      mockController = new MockZKController(server.getZkAddress(), "127.0.0.1:8983_solr", overseers);

      LeaderElector overseerElector = new LeaderElector(zkClient);
      // Make sure no previously created Overseer is still in the election.
      if (overseers.size() > 0) {
        overseers.get(overseers.size() -1).close();
        overseers.get(overseers.size() -1).getZkStateReader().getZkClient().close();
      }
      // NOTE(review): zkController is not referenced again — presumably created for its
      // registration side effects; confirm against createMockZkController.
      ZkController zkController = createMockZkController(server.getZkAddress(), zkClient, reader);

      // Build a real Overseer by hand (rather than via electNewOverseer) so we hold a
      // direct reference for stats assertions.
      UpdateShardHandler updateShardHandler = new UpdateShardHandler(UpdateShardHandlerConfig.DEFAULT);
      updateShardHandlers.add(updateShardHandler);
      HttpShardHandlerFactory httpShardHandlerFactory = new HttpShardHandlerFactory();
      httpShardHandlerFactory.init(new PluginInfo("shardHandlerFactory", Collections.emptyMap()));
      httpShardHandlerFactorys.add(httpShardHandlerFactory);
      Overseer overseer = new Overseer((HttpShardHandler) httpShardHandlerFactory.getShardHandler(), updateShardHandler, "/admin/cores", reader, zkController,
          new CloudConfig.CloudConfigBuilder("127.0.0.1", 8983, "").build());
      overseers.add(overseer);
      ElectionContext ec = new OverseerElectionContext(zkClient, overseer,
          server.getZkAddress().replaceAll("/", "_"));
      overseerElector.setup(ec);
      overseerElector.joinElection(ec, false);

      mockController.createCollection(COLLECTION, 1);

      mockController.publishState(COLLECTION, "core1", "core_node1", "shard1", Replica.State.ACTIVE, 1, true, overseers.get(0));

      // Having processed at least one STATE message, the success count must be positive.
      assertNotNull(overseer.getStats());
      assertTrue((overseer.getStats().getSuccessCount(OverseerAction.STATE.toLower())) > 0);

      // shut it down
      overseer.close();
      ec.cancelElection();

      // start it again
      overseerElector.setup(ec);
      overseerElector.joinElection(ec, false);
      // A restarted Overseer starts with fresh (zeroed) stats.
      assertNotNull(overseer.getStats());
      assertEquals(0, (overseer.getStats().getSuccessCount(OverseerAction.STATE.toLower())));

    } finally {
      close(mockController);
      close(reader);
    }
  }
| |
  // Number of pending Overseer "kills" requested by a test; OverseerRestarter decrements
  // this and restarts the Overseer once per requested kill.
  private AtomicInteger killCounter = new AtomicInteger();
| |
  /**
   * Background task used by chaos-style tests: elects an Overseer, then loops — whenever
   * {@link #killCounter} is positive it kills the current Overseer client and elects a
   * new one. Stopped by setting {@link #run} to false. Throwables are deliberately
   * swallowed since overseer churn during shutdown is expected here.
   */
  private class OverseerRestarter implements Runnable{
    SolrZkClient overseerClient = null;
    public volatile boolean run = true;
    private final String zkAddress;

    public OverseerRestarter(String zkAddress) {
      this.zkAddress = zkAddress;
    }

    @Override
    public void run() {
      try {
        overseerClient = electNewOverseer(zkAddress);
        while (run) {
          if (killCounter.get()>0) {
            try {
              killCounter.decrementAndGet();
              log.info("Killing overseer.");
              // Closing the client drops the election node; a new Overseer is then elected.
              overseerClient.close();
              overseerClient = electNewOverseer(zkAddress);
            } catch (Throwable e) {
              // e.printStackTrace();
            }
          }
          try {
            Thread.sleep(100);
          } catch (Throwable e) {
            // e.printStackTrace();
          }
        }
      } catch (Throwable t) {
        // ignore
      } finally {
        // NOTE(review): the close is commented out, so the last client is left for the
        // test's teardown to reap — presumably intentional; confirm before re-enabling.
        if (overseerClient != null) {
          try {
            // overseerClient.close();
          } catch (Throwable t) {
            // ignore
          }
        }
      }
    }
  }
| |
  /**
   * Verifies the Overseer keeps draining its queues even when flushing a message's state
   * fails: a "bad" CREATE (stateFormat=2 without /collections prepared) is enqueued ahead
   * of good ones, and the good collections must still be created with both the internal
   * work queue and the state-update queue ending up empty.
   */
  @Test
  public void testExceptionWhenFlushClusterState() throws Exception {

    SolrZkClient overseerClient = null;
    ZkStateReader reader = null;

    try {

      ZkController.createClusterZkNodes(zkClient);

      reader = new ZkStateReader(zkClient);
      reader.createClusterStateWatchersAndUpdate();

      // We did not create /collections -> this message will cause exception when Overseer try to flush the clusterstate
      ZkNodeProps badMessage = new ZkNodeProps(Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.CREATE.toLower(),
          "name", "collection1",
          ZkStateReader.REPLICATION_FACTOR, "1",
          ZkStateReader.NUM_SHARDS_PROP, "1",
          DocCollection.STATE_FORMAT, "2",
          "createNodeSet", "");
      ZkNodeProps goodMessage = new ZkNodeProps(Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.CREATE.toLower(),
          "name", "collection2",
          ZkStateReader.REPLICATION_FACTOR, "1",
          ZkStateReader.NUM_SHARDS_PROP, "1",
          DocCollection.STATE_FORMAT, "1",
          "createNodeSet", "");
      // Seed the INTERNAL work queue before any Overseer exists, then elect one.
      ZkDistributedQueue workQueue = Overseer.getInternalWorkQueue(zkClient, new Stats());
      workQueue.offer(Utils.toJSON(badMessage));
      workQueue.offer(Utils.toJSON(goodMessage));
      overseerClient = electNewOverseer(server.getZkAddress());
      waitForCollections(reader, "collection2");

      // Repeat via the public state-update queue of the live Overseer.
      ZkDistributedQueue q = getOpenOverseer().getStateUpdateQueue();
      q.offer(Utils.toJSON(badMessage));
      q.offer(Utils.toJSON(goodMessage.plus("name", "collection3")));
      waitForCollections(reader, "collection2", "collection3");
      assertNotNull(reader.getClusterState().getCollectionOrNull("collection2"));
      assertNotNull(reader.getClusterState().getCollectionOrNull("collection3"));

      // Poll (up to 10s) for the state-update queue to drain.
      TimeOut timeOut = new TimeOut(10, TimeUnit.SECONDS, TimeSource.NANO_TIME);
      while(!timeOut.hasTimedOut()) {
        if (q.peek() == null) {
          break;
        }
        Thread.sleep(50);
      }

      // Both queues must be empty — the bad message must not wedge either of them.
      assertTrue(showQpeek(workQueue), workQueue.peek() == null);
      assertTrue(showQpeek(q), q.peek() == null);
    } finally {
      close(overseerClient);
      close(reader);
    }
  }
| |
| private String showQpeek(ZkDistributedQueue q) throws KeeperException, InterruptedException { |
| if (q == null) { |
| return ""; |
| } |
| byte[] bytes = q.peek(); |
| if (bytes == null) { |
| return ""; |
| } |
| |
| ZkNodeProps json = ZkNodeProps.load(bytes); |
| return json.toString(); |
| } |
| |
| |
/**
 * Repeatedly kills the overseer (via {@link OverseerRestarter}) while replicas
 * change state, and verifies that shard leadership correctly moves from core1
 * to core4 once core1's controller session is closed. Each publish is wrapped
 * in a retry loop because the overseer may be mid-restart at any moment.
 */
@Test
public void testShardLeaderChange() throws Exception {
  ZkStateReader reader = null;
  MockZKController mockController = null;
  MockZKController mockController2 = null;
  OverseerRestarter killer = null;
  Thread killerThread = null;

  try {
    ZkController.createClusterZkNodes(zkClient);

    // Start the background overseer killer; it elects the first overseer itself
    killer = new OverseerRestarter(server.getZkAddress());
    killerThread = new Thread(killer);
    killerThread.start();

    reader = new ZkStateReader(zkClient);
    reader.createClusterStateWatchersAndUpdate();

    UpdateShardHandler updateShardHandler = new UpdateShardHandler(UpdateShardHandlerConfig.DEFAULT);
    updateShardHandlers.add(updateShardHandler);
    HttpShardHandlerFactory httpShardHandlerFactory = new HttpShardHandlerFactory();
    httpShardHandlerFactorys.add(httpShardHandlerFactory);

    electNewOverseer(server.getZkAddress());

    for (int i = 0; i < atLeast(4); i++) {
      killCounter.incrementAndGet(); // for each round allow 1 kill

      mockController = new MockZKController(server.getZkAddress(), "node1:8983_", overseers);

      // Retry collection creation until it sticks (overseer may be restarting)
      TimeOut timeout = new TimeOut(10, TimeUnit.SECONDS, TimeSource.NANO_TIME);
      while (!timeout.hasTimedOut()) {
        try {
          mockController.createCollection(COLLECTION, 1);
          break;
        } catch (SolrException | KeeperException | AlreadyClosedException e) {
          e.printStackTrace();
        }
      }

      // Publish core1 as ACTIVE so it can become shard leader
      timeout = new TimeOut(10, TimeUnit.SECONDS, TimeSource.NANO_TIME);
      while (!timeout.hasTimedOut()) {
        try {
          mockController.publishState(COLLECTION, "core1", "node1", "shard1", Replica.State.ACTIVE,
              1, true, getOpenOverseer());
          break;
        } catch (SolrException | KeeperException | AlreadyClosedException e) {
          e.printStackTrace();
        }
      }

      // Drop the previous round's second controller before starting a new one
      if (mockController2 != null) {
        mockController2.close();
        mockController2 = null;
      }

      Thread.sleep(100);

      // Briefly flip core1 to RECOVERING (short 1s window; best effort)
      timeout = new TimeOut(1, TimeUnit.SECONDS, TimeSource.NANO_TIME);
      while (!timeout.hasTimedOut()) {
        try {
          mockController.publishState(COLLECTION, "core1", "node1", "shard1",
              Replica.State.RECOVERING, 1, true, getOpenOverseer());
          break;
        } catch (SolrException | AlreadyClosedException e) {
          e.printStackTrace();
        }
      }

      mockController2 = new MockZKController(server.getZkAddress(), "node2:8984_", overseers);

      // Bring core1 back to ACTIVE; it should (re)claim leadership
      timeout = new TimeOut(10, TimeUnit.SECONDS, TimeSource.NANO_TIME);
      while (!timeout.hasTimedOut()) {
        try {
          mockController.publishState(COLLECTION, "core1", "node1", "shard1", Replica.State.ACTIVE,
              1, true, getOpenOverseer());
          break;
        } catch (SolrException | AlreadyClosedException e) {
          e.printStackTrace();
        }
      }

      verifyShardLeader(reader, COLLECTION, "shard1", "core1");


      // Register core4 on node2 as a second ACTIVE replica of shard1
      timeout = new TimeOut(10, TimeUnit.SECONDS, TimeSource.NANO_TIME);
      while (!timeout.hasTimedOut()) {
        try {
          mockController2.publishState(COLLECTION, "core4", "node2", "shard1", Replica.State.ACTIVE,
              1, true, getOpenOverseer());
          break;
        } catch (SolrException | AlreadyClosedException e) {
          e.printStackTrace();
        }
      }


      // Kill node1's session; leadership must fail over to core4 on node2
      mockController.close();
      mockController = null;

      ZkController zkController = createMockZkController(server.getZkAddress(), null, reader);
      zkControllers.add(zkController);

      TimeOut timeOut = new TimeOut(10, TimeUnit.SECONDS, TimeSource.NANO_TIME);
      timeOut.waitFor("Timed out waiting to see core4 as leader", () -> {

        ZkCoreNodeProps leaderProps;
        try {
          leaderProps = zkController.getLeaderProps(COLLECTION, "shard1", 1000, false);
        } catch (SolrException e) {
          return false; // no leader registered yet; keep polling
        } catch (InterruptedException e) {
          throw new RuntimeException(e);
        } catch (SessionExpiredException e) {
          return false; // session churn from the killer thread; keep polling
        }
        if (leaderProps.getCoreName().equals("core4")) {
          return true;
        }
        return false;

      });

    }

  } finally {
    // Stop the killer before closing anything it might be using
    if (killer != null) {
      killer.run = false;
      if (killerThread != null) {
        killerThread.join();
      }
    }
    close(mockController);

    close(mockController2);
    close(reader);
  }
}
| |
| @Test |
| public void testDoubleAssignment() throws Exception { |
| |
| SolrZkClient overseerClient = null; |
| ZkStateReader reader = null; |
| MockZKController mockController = null; |
| |
| try { |
| |
| ZkController.createClusterZkNodes(zkClient); |
| |
| reader = new ZkStateReader(zkClient); |
| reader.createClusterStateWatchersAndUpdate(); |
| |
| mockController = new MockZKController(server.getZkAddress(), "127.0.0.1:8983_solr", overseers); |
| |
| overseerClient = electNewOverseer(server.getZkAddress()); |
| |
| mockController.createCollection(COLLECTION, 1); |
| |
| ZkController zkController = createMockZkController(server.getZkAddress(), zkClient, reader); |
| |
| mockController.publishState(COLLECTION, "core1", "core_node1", "shard1", Replica.State.RECOVERING, 1, true, overseers.get(0)); |
| |
| waitForCollections(reader, COLLECTION); |
| |
| verifyReplicaStatus(reader, COLLECTION, "shard1", "core_node1", Replica.State.RECOVERING); |
| |
| mockController.close(); |
| |
| mockController = new MockZKController(server.getZkAddress(), "127.0.0.1:8983_solr", overseers); |
| |
| mockController.publishState(COLLECTION, "core1", "core_node1","shard1", Replica.State.RECOVERING, 1, true, overseers.get(0)); |
| |
| reader.forceUpdateCollection(COLLECTION); |
| ClusterState state = reader.getClusterState(); |
| |
| int numFound = 0; |
| Map<String, DocCollection> collectionsMap = state.getCollectionsMap(); |
| for (Map.Entry<String, DocCollection> entry : collectionsMap.entrySet()) { |
| DocCollection collection = entry.getValue(); |
| for (Slice slice : collection.getSlices()) { |
| if (slice.getReplicasMap().get("core_node1") != null) { |
| numFound++; |
| } |
| } |
| } |
| assertEquals("Shard was found more than once in ClusterState", 1, |
| numFound); |
| } finally { |
| close(overseerClient); |
| close(mockController); |
| close(reader); |
| } |
| } |
| |
/**
 * Manually-run benchmark (hence @Ignore): floods the overseer's state-update
 * queue with ~20k state changes across 10 collections, then measures how long
 * the overseer takes to process them, using a sentinel collection's appearance
 * as the completion signal. Logs timing stats for the interesting operations.
 */
@Test
@Ignore
public void testPerformance() throws Exception {

  SolrZkClient overseerClient = null;
  ZkStateReader reader = null;
  MockZKController mockController = null;

  try {

    ZkController.createClusterZkNodes(zkClient);

    reader = new ZkStateReader(zkClient);
    reader.createClusterStateWatchersAndUpdate();

    mockController = new MockZKController(server.getZkAddress(), "127.0.0.1:8983_solr", overseers);

    final int MAX_COLLECTIONS = 10, MAX_CORES = 10, MAX_STATE_CHANGES = 20000, STATE_FORMAT = 2;

    // Queue the collection CREATE messages (processed once the overseer starts)
    for (int i=0; i<MAX_COLLECTIONS; i++) {
      ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.CREATE.toLower(),
          "name", "perf" + i,
          ZkStateReader.NUM_SHARDS_PROP, "1",
          "stateFormat", String.valueOf(STATE_FORMAT),
          ZkStateReader.REPLICATION_FACTOR, "1",
          ZkStateReader.MAX_SHARDS_PER_NODE, "1"
          );
      ZkDistributedQueue q = overseers.get(0).getStateUpdateQueue();
      q.offer(Utils.toJSON(m));
      zkClient.makePath("/collections/perf" + i, true);
    }

    // Round-robin state changes over collections (j) and cores (k)
    for (int i = 0, j = 0, k = 0; i < MAX_STATE_CHANGES; i++, j++, k++) {
      ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.STATE.toLower(),
          ZkStateReader.STATE_PROP, Replica.State.RECOVERING.toString(),
          ZkStateReader.NODE_NAME_PROP,  "node1",
          ZkStateReader.CORE_NAME_PROP, "core" + k,
          ZkStateReader.CORE_NODE_NAME_PROP, "node1",
          ZkStateReader.COLLECTION_PROP, "perf" + j,
          ZkStateReader.NUM_SHARDS_PROP, "1");
      ZkDistributedQueue q = overseers.get(0).getStateUpdateQueue();
      q.offer(Utils.toJSON(m));
      if (j >= MAX_COLLECTIONS - 1) j = 0;
      if (k >= MAX_CORES - 1) k = 0;
      if (i > 0 && i % 100 == 0) log.info("Published {} items", i);
    }

    // let's publish a sentinel collection which we'll use to wait for overseer to complete operations
    ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.STATE.toLower(),
        ZkStateReader.STATE_PROP, Replica.State.ACTIVE.toString(),
        ZkStateReader.NODE_NAME_PROP, "node1",
        ZkStateReader.CORE_NAME_PROP, "core1",
        ZkStateReader.CORE_NODE_NAME_PROP, "node1",
        ZkStateReader.COLLECTION_PROP, "perf_sentinel",
        ZkStateReader.NUM_SHARDS_PROP, "1",
        ZkStateReader.BASE_URL_PROP, "http://" + "node1"
        + "/solr/");
    ZkDistributedQueue q = overseers.get(0).getStateUpdateQueue();
    q.offer(Utils.toJSON(m));

    // Time from overseer election until the sentinel collection materializes
    Timer t = new Timer();
    Timer.Context context = t.time();
    try {
      overseerClient = electNewOverseer(server.getZkAddress());
      assertTrue(overseers.size() > 0);

      reader.waitForState("perf_sentinel", 15000, TimeUnit.MILLISECONDS, (liveNodes, collectionState) -> collectionState != null);

    } finally {
      context.stop();
    }

    log.info("Overseer loop finished processing: ");
    printTimingStats(t);

    Overseer overseer = overseers.get(0);
    Stats stats = overseer.getStats();

    // Only report stats for this subset of operations
    String[] interestingOps = {"state", "update_state", "am_i_leader", ""};
    Arrays.sort(interestingOps);
    for (Map.Entry<String, Stats.Stat> entry : stats.getStats().entrySet()) {
      String op = entry.getKey();
      if (Arrays.binarySearch(interestingOps, op) < 0)
        continue;
      Stats.Stat stat = entry.getValue();
      if (log.isInfoEnabled()) {
        log.info("op: {}, success: {}, failure: {}", op, stat.success.get(), stat.errors.get());
      }
      Timer timer = stat.requestTime;
      printTimingStats(timer);
    }

  } finally {
    close(overseerClient);
    close(mockController);
    close(reader);
  }
}
| |
| private void printTimingStats(Timer timer) { |
| Snapshot snapshot = timer.getSnapshot(); |
| if (log.isInfoEnabled()) { |
| log.info("\t avgRequestsPerSecond: {}", timer.getMeanRate()); |
| log.info("\t 5minRateRequestsPerSecond: {}", timer.getFiveMinuteRate()); // nowarn |
| log.info("\t 15minRateRequestsPerSecond: {}", timer.getFifteenMinuteRate()); // nowarn |
| log.info("\t avgTimePerRequest: {}", nsToMs(snapshot.getMean())); // nowarn |
| log.info("\t medianRequestTime: {}", nsToMs(snapshot.getMedian())); // nowarn |
| log.info("\t 75thPcRequestTime: {}", nsToMs(snapshot.get75thPercentile())); // nowarn |
| log.info("\t 95thPcRequestTime: {}", nsToMs(snapshot.get95thPercentile())); // nowarn |
| log.info("\t 99thPcRequestTime: {}", nsToMs(snapshot.get99thPercentile())); // nowarn |
| log.info("\t 999thPcRequestTime: {}", nsToMs(snapshot.get999thPercentile())); // nowarn |
| } |
| } |
| |
| private static long nsToMs(double ns) { |
| return TimeUnit.MILLISECONDS.convert((long)ns, TimeUnit.NANOSECONDS); |
| } |
| |
| private void close(MockZKController mockController) { |
| if (mockController != null) { |
| mockController.close(); |
| } |
| } |
| |
| |
/**
 * Verifies the overseer replays its internal work queue on startup: messages
 * queued before election (simulating a previous overseer that died before
 * persisting state) plus one message submitted after election must all be
 * applied, yielding three replicas on shard1.
 */
@Test
public void testReplay() throws Exception{

  SolrZkClient overseerClient = null;
  ZkStateReader reader = null;

  try {

    ZkController.createClusterZkNodes(zkClient);

    reader = new ZkStateReader(zkClient);
    reader.createClusterStateWatchersAndUpdate();
    //prepopulate work queue with some items to emulate previous overseer died before persisting state
    DistributedQueue queue = Overseer.getInternalWorkQueue(zkClient, new Stats());

    ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.CREATE.toLower(),
        "name", COLLECTION,
        ZkStateReader.REPLICATION_FACTOR, "1",
        ZkStateReader.NUM_SHARDS_PROP, "1",
        "createNodeSet", "");
    queue.offer(Utils.toJSON(m));
    m = new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.STATE.toLower(),
        ZkStateReader.NODE_NAME_PROP, "127.0.0.1:8983_solr",
        ZkStateReader.SHARD_ID_PROP, "shard1",
        ZkStateReader.COLLECTION_PROP, COLLECTION,
        ZkStateReader.CORE_NAME_PROP, "core1",
        ZkStateReader.ROLES_PROP, "",
        ZkStateReader.STATE_PROP, Replica.State.RECOVERING.toString());
    queue.offer(Utils.toJSON(m));
    // Uses the raw "state" op string rather than OverseerAction — same effect
    m = new ZkNodeProps(Overseer.QUEUE_OPERATION, "state",
        ZkStateReader.NODE_NAME_PROP, "node1:8983_",
        ZkStateReader.SHARD_ID_PROP, "shard1",
        ZkStateReader.COLLECTION_PROP, COLLECTION,
        ZkStateReader.CORE_NAME_PROP, "core2",
        ZkStateReader.ROLES_PROP, "",
        ZkStateReader.STATE_PROP, Replica.State.RECOVERING.toString());
    queue.offer(Utils.toJSON(m));

    // Election triggers replay of the pre-populated work queue
    overseerClient = electNewOverseer(server.getZkAddress());

    //submit to proper queue
    queue = overseers.get(0).getStateUpdateQueue();
    m = new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.STATE.toLower(),
        ZkStateReader.NODE_NAME_PROP, "127.0.0.1:8983_solr",
        ZkStateReader.SHARD_ID_PROP, "shard1",
        ZkStateReader.COLLECTION_PROP, COLLECTION,
        ZkStateReader.CORE_NAME_PROP, "core3",
        ZkStateReader.ROLES_PROP, "",
        ZkStateReader.STATE_PROP, Replica.State.RECOVERING.toString());
    queue.offer(Utils.toJSON(m));

    // All three replicas (two replayed + one live) must appear on shard1
    reader.waitForState(COLLECTION, 1000, TimeUnit.MILLISECONDS,
        (liveNodes, collectionState) -> collectionState != null && collectionState.getSlice("shard1") != null
            && collectionState.getSlice("shard1").getReplicas().size() == 3);

    assertNotNull(reader.getClusterState().getCollection(COLLECTION).getSlice("shard1"));
    assertEquals(3, reader.getClusterState().getCollection(COLLECTION).getSlice("shard1").getReplicasMap().size());
  } finally {
    close(overseerClient);
    close(reader);
  }
}
| |
/**
 * Verifies the overseer tolerates an external (out-of-band) write to
 * /clusterstate.json: after simulating such a modification, subsequent
 * CREATE/CREATESHARD/ADDREPLICA operations must still be applied correctly,
 * and earlier queued state changes for "c1" must not be lost.
 */
@Test
public void testExternalClusterStateChangeBehavior() throws Exception {

  ZkStateReader reader = null;
  SolrZkClient overseerClient = null;

  try {

    ZkController.createClusterZkNodes(zkClient);

    // Pre-create the collection znode so the stateFormat=2 "test" collection can flush
    zkClient.create("/collections/test", null, CreateMode.PERSISTENT, true);

    reader = new ZkStateReader(zkClient);
    reader.createClusterStateWatchersAndUpdate();

    overseerClient = electNewOverseer(server.getZkAddress());

    ZkDistributedQueue q = overseers.get(0).getStateUpdateQueue();


    ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.CREATE.toLower(),
        "name", "c1",
        ZkStateReader.REPLICATION_FACTOR, "1",
        ZkStateReader.NUM_SHARDS_PROP, "1",
        "createNodeSet", "");
    q.offer(Utils.toJSON(m));

    // Register core1 on c1 as DOWN, then walk it through RECOVERING -> ACTIVE
    m = new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.STATE.toLower(),
        ZkStateReader.SHARD_ID_PROP, "shard1",
        ZkStateReader.NODE_NAME_PROP, "127.0.0.1:8983_solr",
        ZkStateReader.COLLECTION_PROP, "c1",
        ZkStateReader.CORE_NAME_PROP, "core1",
        ZkStateReader.CORE_NODE_NAME_PROP, "core_node1",
        ZkStateReader.ROLES_PROP, "",
        ZkStateReader.STATE_PROP, Replica.State.DOWN.toString());

    q.offer(Utils.toJSON(m));

    waitForCollections(reader, "c1");
    verifyReplicaStatus(reader, "c1", "shard1", "core_node1", Replica.State.DOWN);

    m = new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.STATE.toLower(),
        ZkStateReader.SHARD_ID_PROP, "shard1",
        ZkStateReader.NODE_NAME_PROP, "127.0.0.1:8983_solr",
        ZkStateReader.COLLECTION_PROP, "c1",
        ZkStateReader.CORE_NAME_PROP, "core1",
        ZkStateReader.ROLES_PROP, "",
        ZkStateReader.STATE_PROP, Replica.State.RECOVERING.toString());

    q.offer(Utils.toJSON(m));


    m = new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.STATE.toLower(),
        ZkStateReader.SHARD_ID_PROP, "shard1",
        ZkStateReader.NODE_NAME_PROP, "127.0.0.1:8983_solr",
        ZkStateReader.COLLECTION_PROP, "c1",
        ZkStateReader.CORE_NAME_PROP, "core1",
        ZkStateReader.ROLES_PROP, "",
        ZkStateReader.STATE_PROP, Replica.State.ACTIVE.toString());

    q.offer(Utils.toJSON(m));

    Stat stat = new Stat();
    byte[] data = zkClient.getData("/clusterstate.json", null, stat, true);
    // Simulate an external modification
    zkClient.setData("/clusterstate.json", data, true);

    m = new ZkNodeProps(Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.CREATE.toLower(),
        "name", "test",
        ZkStateReader.NUM_SHARDS_PROP, "1",
        ZkStateReader.REPLICATION_FACTOR, "1",
        DocCollection.STATE_FORMAT, "2"
        );
    q.offer(Utils.toJSON(m));

    m = new ZkNodeProps(Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.CREATESHARD.toLower(),
        "collection", "test",
        ZkStateReader.SHARD_ID_PROP, "x",
        ZkStateReader.REPLICATION_FACTOR, "1"
        );
    q.offer(Utils.toJSON(m));

    m = new ZkNodeProps(Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.ADDREPLICA.toLower(),
        "collection", "test",
        ZkStateReader.SHARD_ID_PROP, "x",
        ZkStateReader.CORE_NODE_NAME_PROP, "core_node1",
        ZkStateReader.NODE_NAME_PROP, "127.0.0.1:8983_solr",
        ZkStateReader.CORE_NAME_PROP, "core1",
        ZkStateReader.STATE_PROP, Replica.State.DOWN.toString()
        );
    q.offer(Utils.toJSON(m));

    // Operations after the external write must still take effect
    waitForCollections(reader, "test");
    verifyReplicaStatus(reader, "test", "x", "core_node1", Replica.State.DOWN);

    // ...and the earlier c1 state changes must not have been lost
    waitForCollections(reader, "c1");
    verifyReplicaStatus(reader, "c1", "shard1", "core_node1", Replica.State.ACTIVE);

  } finally {
    close(overseerClient);
    close(reader);
  }
}
| |
| private void close(ZkStateReader reader) { |
| if (reader != null) { |
| reader.close(); |
| } |
| } |
| |
| private void close(SolrZkClient client) throws InterruptedException { |
| if (client != null) { |
| client.close(); |
| } |
| } |
| |
/**
 * Creates a fresh Overseer (with its own ZK client, state reader, shard
 * handler and mock ZkController) and joins the overseer election with it.
 * If a previous overseer exists in {@code overseers}, it is closed first so
 * the new one can win. All created resources are registered in the
 * class-level tracking lists for later cleanup.
 *
 * @param address ZooKeeper connect string
 * @return the ZK client backing the new overseer's election session
 */
private SolrZkClient electNewOverseer(String address)
    throws InterruptedException, TimeoutException, IOException,
    KeeperException, ParserConfigurationException, SAXException, NoSuchFieldException, SecurityException {
  SolrZkClient zkClient = new SolrZkClient(address, TIMEOUT);
  zkClients.add(zkClient);
  ZkStateReader reader = new ZkStateReader(zkClient);
  readers.add(reader);
  LeaderElector overseerElector = new LeaderElector(zkClient);
  // Retire the previous overseer (and its ZK session) before electing a new one.
  // NOTE(review): only overseers.get(0) is closed here, not every tracked overseer;
  // presumably teardown handles the rest — confirm.
  if (overseers.size() > 0) {
    overseers.get(0).close();
    overseers.get(0).getZkStateReader().getZkClient().close();
  }
  UpdateShardHandler updateShardHandler = new UpdateShardHandler(UpdateShardHandlerConfig.DEFAULT);
  updateShardHandlers.add(updateShardHandler);
  HttpShardHandlerFactory httpShardHandlerFactory = new HttpShardHandlerFactory();
  httpShardHandlerFactory.init(new PluginInfo("shardHandlerFactory", Collections.emptyMap()));
  httpShardHandlerFactorys.add(httpShardHandlerFactory);

  ZkController zkController = createMockZkController(address, null, reader);
  zkControllers.add(zkController);
  Overseer overseer = new Overseer((HttpShardHandler) httpShardHandlerFactory.getShardHandler(), updateShardHandler, "/admin/cores", reader, zkController,
      new CloudConfig.CloudConfigBuilder("127.0.0.1", 8983, "").build());
  overseers.add(overseer);
  // The election node id is derived from the ZK address ("/" is not allowed in it)
  ElectionContext ec = new OverseerElectionContext(zkClient, overseer,
      address.replaceAll("/", "_"));
  overseerElector.setup(ec);
  overseerElector.joinElection(ec, false);
  return zkClient;
}
| |
/**
 * Builds a Mockito-mocked ZkController wired to a real ZK client and the given
 * state reader, backed by a CoreContainer that reports "not shut down" until
 * the test completes (so session-expiry retries keep working).
 *
 * @param zkAddress ZooKeeper connect string, used for the SolrCloudManager
 * @param zkClient  existing client to reuse, or null to create one owned by the
 *                  mock (closed when the mock's close() is invoked)
 * @param reader    state reader returned by the mock's getZkStateReader()
 */
private ZkController createMockZkController(String zkAddress, SolrZkClient zkClient, ZkStateReader reader) throws InterruptedException, NoSuchFieldException, SecurityException, SessionExpiredException {
  ZkController zkController = mock(ZkController.class);

  if (zkClient == null) {
    // Mock owns this client: close() on the mock closes it
    SolrZkClient newZkClient = new SolrZkClient(server.getZkAddress(), AbstractZkTestCase.TIMEOUT);
    Mockito.doAnswer(
        new Answer<Void>() {
          public Void answer(InvocationOnMock invocation) {
            newZkClient.close();
            return null;
          }}).when(zkController).close();
    zkClient = newZkClient;
  } else {
    // Caller owns the client: closing the mock must not close it
    doNothing().when(zkController).close();
  }

  // CoreContainer whose isShutDown() tracks testDone, so overseer loops retry
  // on session expiry instead of bailing out mid-test
  CoreContainer mockAlwaysUpCoreContainer = mock(CoreContainer.class,
      Mockito.withSettings().defaultAnswer(Mockito.CALLS_REAL_METHODS));
  SolrMetricManager mockMetricManager = mock(SolrMetricManager.class);
  when(mockAlwaysUpCoreContainer.getMetricManager()).thenReturn(mockMetricManager);
  when(mockAlwaysUpCoreContainer.isShutDown()).thenReturn(testDone);  // Allow retry on session expiry
  when(mockAlwaysUpCoreContainer.getResourceLoader()).thenReturn(new SolrResourceLoader(createTempDir()));
  // Inject the private fields the real ZkController methods below depend on
  FieldSetter.setField(zkController, ZkController.class.getDeclaredField("zkClient"), zkClient);
  FieldSetter.setField(zkController, ZkController.class.getDeclaredField("cc"), mockAlwaysUpCoreContainer);
  when(zkController.getCoreContainer()).thenReturn(mockAlwaysUpCoreContainer);
  when(zkController.getZkClient()).thenReturn(zkClient);
  when(zkController.getZkStateReader()).thenReturn(reader);

  // Leader lookups use the real implementations (they rely on the injected fields)
  when(zkController.getLeaderProps(anyString(), anyString(), anyInt())).thenCallRealMethod();
  when(zkController.getLeaderProps(anyString(), anyString(), anyInt(), anyBoolean())).thenCallRealMethod();
  doReturn(getCloudDataProvider(zkAddress, zkClient, reader))
      .when(zkController).getSolrCloudManager();
  return zkController;
}
| |
| private SolrCloudManager getCloudDataProvider(String zkAddress, SolrZkClient zkClient, ZkStateReader reader) { |
| CloudSolrClient client = new CloudSolrClient.Builder(Collections.singletonList(zkAddress), Optional.empty()).withSocketTimeout(30000).withConnectionTimeout(15000).build(); |
| solrClients.add(client); |
| SolrClientCloudManager sccm = new SolrClientCloudManager(new ZkDistributedQueueFactory(zkClient), client); |
| sccm.getClusterStateProvider().connect(); |
| return sccm; |
| } |
| |
/**
 * Creates a collection with a random number of shards and replicas, marches
 * every replica through RECOVERING -> ACTIVE, then deletes replicas one by one
 * and verifies after each deletion that the slice still exists (even once
 * empty) and holds exactly the expected number of remaining replicas.
 */
@Test
public void testRemovalOfLastReplica() throws Exception {

  final Integer numReplicas = 1+random().nextInt(4); // between 1 and 4 replicas
  final Integer numShards = 1+random().nextInt(4); // between 1 and 4 shards

  ZkStateReader zkStateReader = null;
  SolrZkClient overseerClient = null;
  try {

    ZkController.createClusterZkNodes(zkClient);

    zkStateReader = new ZkStateReader(zkClient);
    zkStateReader.createClusterStateWatchersAndUpdate();

    overseerClient = electNewOverseer(server.getZkAddress());

    ZkDistributedQueue q = overseers.get(0).getStateUpdateQueue();

    // create collection
    {
      final Integer maxShardsPerNode = numReplicas * numShards;
      ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.CREATE.toLower(),
          "name", COLLECTION,
          ZkStateReader.NUM_SHARDS_PROP, numShards.toString(),
          ZkStateReader.REPLICATION_FACTOR, "1",
          ZkStateReader.MAX_SHARDS_PER_NODE, maxShardsPerNode.toString()
          );
      q.offer(Utils.toJSON(m));
    }
    waitForCollections(zkStateReader, COLLECTION);

    // create nodes with state recovering
    // N numbers cores/core_nodes uniquely across the replica/shard grid
    for (int rr = 1; rr <= numReplicas; ++rr) {
      for (int ss = 1; ss <= numShards; ++ss) {
        final int N = (numReplicas-rr)*numShards + ss;
        ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.STATE.toLower(),
            ZkStateReader.SHARD_ID_PROP, "shard"+ss,
            ZkStateReader.NODE_NAME_PROP, "127.0.0.1:8983_solr",
            ZkStateReader.COLLECTION_PROP, COLLECTION,
            ZkStateReader.CORE_NAME_PROP, "core"+N,
            ZkStateReader.CORE_NODE_NAME_PROP, "core_node"+N,
            ZkStateReader.ROLES_PROP, "",
            ZkStateReader.STATE_PROP, Replica.State.RECOVERING.toString());

        q.offer(Utils.toJSON(m));
      }
    }
    // verify recovering
    for (int rr = 1; rr <= numReplicas; ++rr) {
      for (int ss = 1; ss <= numShards; ++ss) {
        final int N = (numReplicas-rr)*numShards + ss;
        verifyReplicaStatus(zkStateReader, COLLECTION, "shard"+ss, "core_node"+N, Replica.State.RECOVERING);
      }
    }

    // publish node states (active)
    for (int rr = 1; rr <= numReplicas; ++rr) {
      for (int ss = 1; ss <= numShards; ++ss) {
        final int N = (numReplicas-rr)*numShards + ss;
        ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.STATE.toLower(),
            ZkStateReader.SHARD_ID_PROP, "shard"+ss,
            ZkStateReader.NODE_NAME_PROP, "127.0.0.1:8983_solr",
            ZkStateReader.COLLECTION_PROP, COLLECTION,
            ZkStateReader.CORE_NAME_PROP, "core"+N,
            ZkStateReader.ROLES_PROP, "",
            ZkStateReader.STATE_PROP, Replica.State.ACTIVE.toString());

        q.offer(Utils.toJSON(m));
      }
    }
    // verify active
    for (int rr = 1; rr <= numReplicas; ++rr) {
      for (int ss = 1; ss <= numShards; ++ss) {
        final int N = (numReplicas-rr)*numShards + ss;
        verifyReplicaStatus(zkStateReader, COLLECTION, "shard"+ss, "core_node"+N, Replica.State.ACTIVE);
      }
    }

    // delete node
    for (int rr = 1; rr <= numReplicas; ++rr) {
      for (int ss = 1; ss <= numShards; ++ss) {
        final int N = (numReplicas-rr)*numShards + ss;
        ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.DELETECORE.toLower(),
            ZkStateReader.COLLECTION_PROP, COLLECTION,
            ZkStateReader.CORE_NODE_NAME_PROP, "core_node"+N);

        q.offer(Utils.toJSON(m));

        // Wait until the deletion is reflected in the cluster state
        {
          String shard = "shard"+ss;
          zkStateReader.waitForState(COLLECTION, 15000, TimeUnit.MILLISECONDS, (liveNodes, collectionState) -> collectionState != null && (collectionState.getSlice(shard) == null || collectionState.getSlice(shard).getReplicasMap().get("core_node"+N) == null));
        }

        final DocCollection docCollection = zkStateReader.getClusterState().getCollection(COLLECTION);
        assertTrue("found no "+ COLLECTION, (null != docCollection));

        // The slice must survive even after its last replica is removed
        final Slice slice = docCollection.getSlice("shard"+ss);
        assertTrue("found no "+ COLLECTION +" shard"+ss+" slice after removal of replica "+rr+" of "+numReplicas, (null != slice));

        final Collection<Replica> replicas = slice.getReplicas();
        assertEquals("wrong number of "+ COLLECTION +" shard"+ss+" replicas left, replicas="+replicas, numReplicas-rr, replicas.size());
      }
    }

  } finally {

    close(overseerClient);
    close(zkStateReader);
  }
}
| |
/**
 * Unit-tests OverseerTaskQueue.LatchWatcher: an untyped latch releases on any
 * event; a typed latch (NodeCreated) ignores other event types and only
 * releases on a matching one. Uses wall-clock bounds, so the assertions are
 * deliberately loose to tolerate scheduling jitter.
 */
@Test
public void testLatchWatcher() throws InterruptedException {
  OverseerTaskQueue.LatchWatcher latch1 = new OverseerTaskQueue.LatchWatcher();
  long before = System.nanoTime();
  latch1.await(100); // no event yet: should wait ~100ms then time out
  long after = System.nanoTime();
  assertTrue(TimeUnit.NANOSECONDS.toMillis(after-before) > 50);
  assertTrue(TimeUnit.NANOSECONDS.toMillis(after-before) < 500);// Mostly to make sure the millis->nanos->millis is not broken
  latch1.process(new WatchedEvent(new WatcherEvent(1, 1, "/foo/bar")));
  before = System.nanoTime();
  latch1.await(10000);// Expecting no wait
  after = System.nanoTime();
  assertTrue(TimeUnit.NANOSECONDS.toMillis(after-before) < 1000);

  final AtomicBoolean expectedEventProcessed = new AtomicBoolean(false);
  final AtomicBoolean doneWaiting = new AtomicBoolean(false);
  // Typed latch: only NodeCreated events may release it
  final OverseerTaskQueue.LatchWatcher latch2 = new OverseerTaskQueue.LatchWatcher(Event.EventType.NodeCreated);
  Thread t = new Thread(()->{
    //Process an event of a different type first, this shouldn't release the latch
    latch2.process(new WatchedEvent(new WatcherEvent(Event.EventType.NodeDeleted.getIntValue(), 1, "/foo/bar")));

    assertFalse("Latch shouldn't have been released", doneWaiting.get());
    // Now process the correct type of event
    expectedEventProcessed.set(true);
    latch2.process(new WatchedEvent(new WatcherEvent(Event.EventType.NodeCreated.getIntValue(), 1, "/foo/bar")));
  });
  t.start();
  before = System.nanoTime();
  latch2.await(10000); // It shouldn't wait this long, t should notify the lock
  after = System.nanoTime();
  doneWaiting.set(true);
  assertTrue(expectedEventProcessed.get());
  assertTrue(TimeUnit.NANOSECONDS.toMillis(after-before) < 1000);
}
| |
| } |