| package org.apache.solr.cloud; |
| |
| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| import java.io.File; |
| import java.net.ServerSocket; |
| import java.net.URI; |
| import java.net.URL; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.concurrent.TimeUnit; |
| |
| import org.apache.lucene.util.LuceneTestCase.AwaitsFix; |
| import org.apache.lucene.util.LuceneTestCase.Slow; |
| import org.apache.solr.JSONTestUtil; |
| import org.apache.solr.SolrTestCaseJ4.SuppressSSL; |
| import org.apache.solr.client.solrj.embedded.JettySolrRunner; |
| import org.apache.solr.client.solrj.impl.HttpSolrServer; |
| import org.apache.solr.client.solrj.request.CollectionAdminRequest; |
| import org.apache.solr.client.solrj.request.QueryRequest; |
| import org.apache.solr.common.SolrInputDocument; |
| import org.apache.solr.common.cloud.ClusterState; |
| import org.apache.solr.common.cloud.Replica; |
| import org.apache.solr.common.cloud.Slice; |
| import org.apache.solr.common.cloud.ZkCoreNodeProps; |
| import org.apache.solr.common.cloud.ZkStateReader; |
| import org.apache.solr.common.util.NamedList; |
| import org.junit.After; |
| import org.junit.Before; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * Simulates HTTP partitions between a leader and replica but the replica does |
| * not lose its ZooKeeper connection. |
| */ |
| @Slow |
| @SuppressSSL(bugUrl = "https://issues.apache.org/jira/browse/SOLR-5776") |
| @AwaitsFix(bugUrl = "https://issues.apache.org/jira/browse/SOLR-6241") |
| public class HttpPartitionTest extends AbstractFullDistribZkTestBase { |
| |
| private static final transient Logger log = |
| LoggerFactory.getLogger(HttpPartitionTest.class); |
| |
| // To prevent the test assertions firing too fast before cluster state |
| // recognizes (and propagates) partitions |
| private static final long sleepMsBeforeHealPartition = 2000L; |
| |
| private static final int maxWaitSecsToSeeAllActive = 30; |
| |
| private Map<URI,SocketProxy> proxies = new HashMap<URI,SocketProxy>(); |
| |
/**
 * Sets both sliceCount and shardCount to 2 for this test.
 */
public HttpPartitionTest() {
  // the no-arg parent constructor is invoked implicitly before these run
  shardCount = 2;
  sliceCount = 2;
}
| |
| @Before |
| @Override |
| public void setUp() throws Exception { |
| super.setUp(); |
| System.setProperty("numShards", Integer.toString(sliceCount)); |
| } |
| |
| @Override |
| @After |
| public void tearDown() throws Exception { |
| System.clearProperty("numShards"); |
| |
| try { |
| super.tearDown(); |
| } catch (Exception exc) {} |
| |
| resetExceptionIgnores(); |
| |
| // close socket proxies after super.tearDown |
| if (!proxies.isEmpty()) { |
| for (SocketProxy proxy : proxies.values()) { |
| proxy.close(); |
| } |
| } |
| } |
| |
| /** |
| * Overrides the parent implementation so that we can configure a socket proxy |
| * to sit infront of each Jetty server, which gives us the ability to simulate |
| * network partitions without having to fuss with IPTables (which is not very |
| * cross platform friendly). |
| */ |
| @Override |
| public JettySolrRunner createJetty(File solrHome, String dataDir, |
| String shardList, String solrConfigOverride, String schemaOverride) |
| throws Exception { |
| |
| JettySolrRunner jetty = new JettySolrRunner(solrHome.getPath(), context, |
| 0, solrConfigOverride, schemaOverride, false, |
| getExtraServlets(), sslConfig, getExtraRequestFilters()); |
| jetty.setShards(shardList); |
| jetty.setDataDir(getDataDir(dataDir)); |
| |
| // setup to proxy Http requests to this server unless it is the control |
| // server |
| int proxyPort = getNextAvailablePort(); |
| jetty.setProxyPort(proxyPort); |
| jetty.start(); |
| |
| // create a socket proxy for the jetty server ... |
| SocketProxy proxy = new SocketProxy(proxyPort, jetty.getBaseUrl().toURI()); |
| proxies.put(proxy.getUrl(), proxy); |
| |
| return jetty; |
| } |
| |
| protected int getNextAvailablePort() throws Exception { |
| int port = -1; |
| try (ServerSocket s = new ServerSocket(0)) { |
| port = s.getLocalPort(); |
| } |
| return port; |
| } |
| |
| @Override |
| public void doTest() throws Exception { |
| waitForThingsToLevelOut(30000); |
| |
| // test a 1x2 collection |
| testRf2(); |
| |
| // now do similar for a 1x3 collection while taking 2 replicas on-and-off |
| // each time |
| testRf3(); |
| |
| // kill a leader and make sure recovery occurs as expected |
| testRf3WithLeaderFailover(); |
| } |
| |
| protected void testRf2() throws Exception { |
| // create a collection that has 1 shard but 2 replicas |
| String testCollectionName = "c8n_1x2"; |
| createCollection(testCollectionName, 1, 2, 1); |
| cloudClient.setDefaultCollection(testCollectionName); |
| |
| sendDoc(1); |
| |
| Replica notLeader = |
| ensureAllReplicasAreActive(testCollectionName, 2, maxWaitSecsToSeeAllActive).get(0); |
| |
| // ok, now introduce a network partition between the leader and the replica |
| SocketProxy proxy = getProxyForReplica(notLeader); |
| |
| proxy.close(); |
| |
| // indexing during a partition |
| sendDoc(2); |
| |
| // Have the partition last at least 1 sec |
| // While this gives the impression that recovery is timing related, this is |
| // really only |
| // to give time for the state to be written to ZK before the test completes. |
| // In other words, |
| // without a brief pause, the test finishes so quickly that it doesn't give |
| // time for the recovery process to kick-in |
| Thread.sleep(sleepMsBeforeHealPartition); |
| |
| proxy.reopen(); |
| |
| List<Replica> notLeaders = |
| ensureAllReplicasAreActive(testCollectionName, 2, maxWaitSecsToSeeAllActive); |
| |
| sendDoc(3); |
| |
| // sent 3 docs in so far, verify they are on the leader and replica |
| assertDocsExistInAllReplicas(notLeaders, testCollectionName, 1, 3); |
| |
| // now up the stakes and do more docs |
| int numDocs = 1000; |
| boolean hasPartition = false; |
| for (int d = 0; d < numDocs; d++) { |
| // create / restore partition every 100 docs |
| if (d % 100 == 0) { |
| if (hasPartition) { |
| proxy.reopen(); |
| hasPartition = false; |
| } else { |
| if (d >= 100) { |
| proxy.close(); |
| hasPartition = true; |
| Thread.sleep(sleepMsBeforeHealPartition); |
| } |
| } |
| } |
| sendDoc(d + 4); // 4 is offset as we've already indexed 1-3 |
| } |
| |
| // restore connectivity if lost |
| if (hasPartition) { |
| proxy.reopen(); |
| } |
| |
| notLeaders = ensureAllReplicasAreActive(testCollectionName, 2, maxWaitSecsToSeeAllActive); |
| |
| // verify all docs received |
| assertDocsExistInAllReplicas(notLeaders, testCollectionName, 1, numDocs + 3); |
| |
| // try to clean up |
| try { |
| CollectionAdminRequest req = new CollectionAdminRequest.Delete(); |
| req.setCollectionName(testCollectionName); |
| req.process(cloudClient); |
| } catch (Exception e) { |
| // don't fail the test |
| log.warn("Could not delete collection {} after test completed", testCollectionName); |
| } |
| } |
| |
| protected void testRf3() throws Exception { |
| // create a collection that has 1 shard but 2 replicas |
| String testCollectionName = "c8n_1x3"; |
| createCollection(testCollectionName, 1, 3, 1); |
| cloudClient.setDefaultCollection(testCollectionName); |
| |
| sendDoc(1); |
| |
| List<Replica> notLeaders = |
| ensureAllReplicasAreActive(testCollectionName, 3, maxWaitSecsToSeeAllActive); |
| assertTrue("Expected 2 replicas for collection " + testCollectionName |
| + " but found " + notLeaders.size() + "; clusterState: " |
| + printClusterStateInfo(), |
| notLeaders.size() == 2); |
| |
| sendDoc(1); |
| |
| // ok, now introduce a network partition between the leader and the replica |
| SocketProxy proxy0 = getProxyForReplica(notLeaders.get(0)); |
| |
| proxy0.close(); |
| |
| // indexing during a partition |
| sendDoc(2); |
| |
| Thread.sleep(sleepMsBeforeHealPartition); |
| |
| proxy0.reopen(); |
| |
| SocketProxy proxy1 = getProxyForReplica(notLeaders.get(1)); |
| |
| proxy1.close(); |
| |
| sendDoc(3); |
| |
| Thread.sleep(sleepMsBeforeHealPartition); |
| proxy1.reopen(); |
| |
| // sent 4 docs in so far, verify they are on the leader and replica |
| notLeaders = ensureAllReplicasAreActive(testCollectionName, 3, maxWaitSecsToSeeAllActive); |
| |
| sendDoc(4); |
| |
| assertDocsExistInAllReplicas(notLeaders, testCollectionName, 1, 4); |
| // try to clean up |
| try { |
| CollectionAdminRequest req = new CollectionAdminRequest.Delete(); |
| req.setCollectionName(testCollectionName); |
| req.process(cloudClient); |
| } catch (Exception e) { |
| // don't fail the test |
| log.warn("Could not delete collection {} after test completed", testCollectionName); |
| } |
| } |
| |
| protected void testRf3WithLeaderFailover() throws Exception { |
| // now let's create a partition in one of the replicas and outright |
| // kill the leader ... see what happens |
| // create a collection that has 1 shard but 3 replicas |
| String testCollectionName = "c8n_1x3_lf"; // _lf is leader fails |
| createCollection(testCollectionName, 1, 3, 1); |
| cloudClient.setDefaultCollection(testCollectionName); |
| |
| sendDoc(1); |
| |
| List<Replica> notLeaders = |
| ensureAllReplicasAreActive(testCollectionName, 3, maxWaitSecsToSeeAllActive); |
| assertTrue("Expected 2 replicas for collection " + testCollectionName |
| + " but found " + notLeaders.size() + "; clusterState: " |
| + printClusterStateInfo(), |
| notLeaders.size() == 2); |
| |
| sendDoc(1); |
| |
| // ok, now introduce a network partition between the leader and the replica |
| SocketProxy proxy0 = null; |
| proxy0 = getProxyForReplica(notLeaders.get(0)); |
| |
| proxy0.close(); |
| |
| // indexing during a partition |
| sendDoc(2); |
| |
| Thread.sleep(sleepMsBeforeHealPartition); |
| |
| proxy0.reopen(); |
| |
| SocketProxy proxy1 = getProxyForReplica(notLeaders.get(1)); |
| |
| proxy1.close(); |
| |
| sendDoc(3); |
| |
| Thread.sleep(sleepMsBeforeHealPartition); |
| proxy1.reopen(); |
| |
| // sent 4 docs in so far, verify they are on the leader and replica |
| notLeaders = ensureAllReplicasAreActive(testCollectionName, 3, maxWaitSecsToSeeAllActive); |
| |
| sendDoc(4); |
| |
| assertDocsExistInAllReplicas(notLeaders, testCollectionName, 1, 4); |
| |
| Replica leader = |
| cloudClient.getZkStateReader().getLeaderRetry(testCollectionName, "shard1"); |
| String leaderNode = leader.getNodeName(); |
| assertNotNull("Could not find leader for shard1 of "+ |
| testCollectionName+"; clusterState: "+printClusterStateInfo(), leader); |
| JettySolrRunner leaderJetty = getJettyOnPort(getReplicaPort(leader)); |
| |
| // since maxShardsPerNode is 1, we're safe to kill the leader |
| notLeaders = ensureAllReplicasAreActive(testCollectionName, 3, maxWaitSecsToSeeAllActive); |
| proxy0 = getProxyForReplica(notLeaders.get(0)); |
| proxy0.close(); |
| |
| // indexing during a partition |
| // doc should be on leader and 1 replica |
| sendDoc(5); |
| |
| Thread.sleep(sleepMsBeforeHealPartition); |
| |
| String shouldNotBeNewLeaderNode = notLeaders.get(0).getNodeName(); |
| |
| // kill the leader |
| leaderJetty.stop(); |
| |
| if (leaderJetty.isRunning()) |
| fail("Failed to stop the leader on "+leaderNode); |
| |
| SocketProxy oldLeaderProxy = getProxyForReplica(leader); |
| if (oldLeaderProxy != null) { |
| oldLeaderProxy.close(); |
| } else { |
| log.warn("No SocketProxy found for old leader node "+leaderNode); |
| } |
| |
| Thread.sleep(10000); // give chance for new leader to be elected. |
| |
| Replica newLeader = |
| cloudClient.getZkStateReader().getLeaderRetry(testCollectionName, "shard1", 60000); |
| |
| assertNotNull("No new leader was elected after 60 seconds; clusterState: "+ |
| printClusterStateInfo(),newLeader); |
| |
| assertTrue("Expected node "+shouldNotBeNewLeaderNode+ |
| " to NOT be the new leader b/c it was out-of-sync with the old leader! ClusterState: "+ |
| printClusterStateInfo(), |
| !shouldNotBeNewLeaderNode.equals(newLeader.getNodeName())); |
| |
| proxy0.reopen(); |
| |
| long timeout = System.nanoTime() + TimeUnit.NANOSECONDS.convert(60, TimeUnit.SECONDS); |
| while (System.nanoTime() < timeout) { |
| cloudClient.getZkStateReader().updateClusterState(true); |
| |
| List<Replica> activeReps = getActiveOrRecoveringReplicas(testCollectionName, "shard1"); |
| if (activeReps.size() == 2) break; |
| Thread.sleep(1000); |
| } |
| |
| List<Replica> activeReps = getActiveOrRecoveringReplicas(testCollectionName, "shard1"); |
| assertTrue("Expected 2 of 3 replicas to be active but only found "+ |
| activeReps.size()+"; "+activeReps+"; clusterState: "+printClusterStateInfo(), |
| activeReps.size() == 2); |
| |
| sendDoc(6); |
| |
| assertDocsExistInAllReplicas(activeReps, testCollectionName, 1, 6); |
| } |
| |
| protected String printClusterStateInfo() throws Exception { |
| cloudClient.getZkStateReader().updateClusterState(true); |
| return String.valueOf(cloudClient.getZkStateReader().getClusterState()); |
| } |
| |
| protected List<Replica> getActiveOrRecoveringReplicas(String testCollectionName, String shardId) throws Exception { |
| Map<String,Replica> activeReplicas = new HashMap<String,Replica>(); |
| ZkStateReader zkr = cloudClient.getZkStateReader(); |
| ClusterState cs = zkr.getClusterState(); |
| assertNotNull(cs); |
| for (Slice shard : cs.getActiveSlices(testCollectionName)) { |
| if (shard.getName().equals(shardId)) { |
| for (Replica replica : shard.getReplicas()) { |
| String replicaState = replica.getStr(ZkStateReader.STATE_PROP); |
| if (ZkStateReader.ACTIVE.equals(replicaState) || ZkStateReader.RECOVERING.equals(replicaState)) { |
| activeReplicas.put(replica.getName(), replica); |
| } |
| } |
| } |
| } |
| List<Replica> replicas = new ArrayList<Replica>(); |
| replicas.addAll(activeReplicas.values()); |
| return replicas; |
| } |
| |
| protected SocketProxy getProxyForReplica(Replica replica) throws Exception { |
| String replicaBaseUrl = replica.getStr(ZkStateReader.BASE_URL_PROP); |
| assertNotNull(replicaBaseUrl); |
| URL baseUrl = new URL(replicaBaseUrl); |
| |
| SocketProxy proxy = proxies.get(baseUrl.toURI()); |
| if (proxy == null && !baseUrl.toExternalForm().endsWith("/")) { |
| baseUrl = new URL(baseUrl.toExternalForm() + "/"); |
| proxy = proxies.get(baseUrl.toURI()); |
| } |
| assertNotNull("No proxy found for " + baseUrl + "!", proxy); |
| return proxy; |
| } |
| |
| protected void assertDocsExistInAllReplicas(List<Replica> notLeaders, |
| String testCollectionName, int firstDocId, int lastDocId) |
| throws Exception { |
| Replica leader = |
| cloudClient.getZkStateReader().getLeaderRetry(testCollectionName, "shard1", 10000); |
| HttpSolrServer leaderSolr = getHttpSolrServer(leader, testCollectionName); |
| List<HttpSolrServer> replicas = |
| new ArrayList<HttpSolrServer>(notLeaders.size()); |
| |
| for (Replica r : notLeaders) { |
| replicas.add(getHttpSolrServer(r, testCollectionName)); |
| } |
| try { |
| for (int d = firstDocId; d <= lastDocId; d++) { |
| String docId = String.valueOf(d); |
| assertDocExists(leaderSolr, testCollectionName, docId); |
| for (HttpSolrServer replicaSolr : replicas) { |
| assertDocExists(replicaSolr, testCollectionName, docId); |
| } |
| } |
| } finally { |
| if (leaderSolr != null) { |
| leaderSolr.shutdown(); |
| } |
| for (HttpSolrServer replicaSolr : replicas) { |
| replicaSolr.shutdown(); |
| } |
| } |
| } |
| |
| protected HttpSolrServer getHttpSolrServer(Replica replica, String coll) throws Exception { |
| ZkCoreNodeProps zkProps = new ZkCoreNodeProps(replica); |
| String url = zkProps.getBaseUrl() + "/" + coll; |
| return new HttpSolrServer(url); |
| } |
| |
| protected void sendDoc(int docId) throws Exception { |
| SolrInputDocument doc = new SolrInputDocument(); |
| doc.addField(id, String.valueOf(docId)); |
| doc.addField("a_t", "hello" + docId); |
| cloudClient.add(doc); |
| } |
| |
| protected List<Replica> ensureAllReplicasAreActive(String testCollectionName, int rf, int maxWaitSecs) throws Exception { |
| long startMs = System.currentTimeMillis(); |
| |
| Map<String,Replica> notLeaders = new HashMap<String,Replica>(); |
| |
| ZkStateReader zkr = cloudClient.getZkStateReader(); |
| ClusterState cs = zkr.getClusterState(); |
| Collection<Slice> slices = cs.getActiveSlices(testCollectionName); |
| assertTrue(slices.size() == 1); // shards == 1 |
| boolean allReplicasUp = false; |
| long waitMs = 0L; |
| long maxWaitMs = maxWaitSecs * 1000L; |
| Replica leader = null; |
| while (waitMs < maxWaitMs && !allReplicasUp) { |
| cs = zkr.getClusterState(); |
| assertNotNull(cs); |
| for (Slice shard : cs.getActiveSlices(testCollectionName)) { |
| allReplicasUp = true; // assume true |
| Collection<Replica> replicas = shard.getReplicas(); |
| assertTrue(replicas.size() == rf); |
| leader = shard.getLeader(); |
| assertNotNull(leader); |
| |
| // ensure all replicas are "active" and identify the non-leader replica |
| for (Replica replica : replicas) { |
| String replicaState = replica.getStr(ZkStateReader.STATE_PROP); |
| if (!ZkStateReader.ACTIVE.equals(replicaState)) { |
| log.info("Replica " + replica.getName() + " is currently " + replicaState); |
| allReplicasUp = false; |
| } |
| |
| if (!leader.equals(replica)) |
| notLeaders.put(replica.getName(), replica); |
| } |
| |
| if (!allReplicasUp) { |
| try { |
| Thread.sleep(500L); |
| } catch (Exception ignoreMe) {} |
| waitMs += 500L; |
| } |
| } |
| } // end while |
| |
| if (!allReplicasUp) |
| fail("Didn't see all replicas come up within " + maxWaitMs + |
| " ms! ClusterState: " + printClusterStateInfo()); |
| |
| if (notLeaders.isEmpty()) |
| fail("Didn't isolate any replicas that are not the leader! ClusterState: " + |
| printClusterStateInfo()); |
| |
| long diffMs = (System.currentTimeMillis() - startMs); |
| log.info("Took " + diffMs + " ms to see all replicas become active."); |
| |
| List<Replica> replicas = new ArrayList<Replica>(); |
| replicas.addAll(notLeaders.values()); |
| return replicas; |
| } |
| |
| /** |
| * Query the real-time get handler for a specific doc by ID to verify it |
| * exists in the provided server. |
| */ |
| @SuppressWarnings("rawtypes") |
| protected void assertDocExists(HttpSolrServer solr, String coll, String docId) throws Exception { |
| QueryRequest qr = new QueryRequest(params("qt", "/get", "id", docId)); |
| NamedList rsp = solr.request(qr); |
| String match = |
| JSONTestUtil.matchObj("/id", rsp.get("doc"), new Integer(docId)); |
| assertTrue("Doc with id=" + docId + " not found in " + solr.getBaseURL() |
| + " due to: " + match, match == null); |
| } |
| |
| protected JettySolrRunner getJettyOnPort(int port) { |
| JettySolrRunner theJetty = null; |
| for (JettySolrRunner jetty : jettys) { |
| if (port == jetty.getLocalPort()) { |
| theJetty = jetty; |
| break; |
| } |
| } |
| |
| if (theJetty == null) { |
| if (controlJetty.getLocalPort() == port) { |
| theJetty = controlJetty; |
| } |
| } |
| |
| if (theJetty == null) |
| fail("Not able to find JettySolrRunner for port: "+port); |
| |
| return theJetty; |
| } |
| |
| protected int getReplicaPort(Replica replica) { |
| String replicaNode = replica.getNodeName(); |
| String tmp = replicaNode.substring(replicaNode.indexOf(':')+1); |
| if (tmp.indexOf('_') != -1) |
| tmp = tmp.substring(0,tmp.indexOf('_')); |
| return Integer.parseInt(tmp); |
| } |
| } |