blob: 76271c3ba84033f2cf46dc9d316e891674264112 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cloud;
import java.lang.invoke.MethodHandles;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.solr.client.solrj.cloud.SolrCloudManager;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.schema.SchemaRequest;
import org.apache.solr.client.solrj.response.CollectionAdminResponse;
import org.apache.solr.client.solrj.response.schema.SchemaResponse;
import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.SolrDocumentList;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.params.CollectionAdminParams;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.RetryUtil;
import org.apache.solr.logging.LogWatcher;
import org.apache.solr.logging.LogWatcherConfig;
import org.apache.solr.util.IdUtils;
import org.apache.solr.util.TimeOut;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
*
*/
public class SystemCollectionCompatTest extends SolrCloudTestCase {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@BeforeClass
public static void setupCluster() throws Exception {
System.setProperty("managed.schema.mutable", "true");
configureCluster(2)
.addConfig("conf1", configset("cloud-managed"))
.configure();
if (! log.isWarnEnabled()) {
fail("Test requires that log-level is at-least WARN, but WARN is disabled");
}
}
private SolrCloudManager cloudManager;
private CloudSolrClient solrClient;
@Before
public void setupSystemCollection() throws Exception {
CollectionAdminRequest.createCollection(CollectionAdminParams.SYSTEM_COLL, null, 1, 2)
.process(cluster.getSolrClient());
cluster.waitForActiveCollection(CollectionAdminParams.SYSTEM_COLL, 1, 2);
ZkController zkController = cluster.getJettySolrRunner(0).getCoreContainer().getZkController();
cloudManager = zkController.getSolrCloudManager();
solrClient = new CloudSolrClientBuilder(Collections.singletonList(zkController.getZkServerAddress()),
Optional.empty()).build();
// send a dummy doc to the .system collection
SolrInputDocument doc = new SolrInputDocument(
"id", IdUtils.timeRandomId(),
CommonParams.TYPE, "dummy");
doc.addField("time_l", cloudManager.getTimeSource().getEpochTimeNs());
doc.addField("timestamp", new Date());
solrClient.add(CollectionAdminParams.SYSTEM_COLL, doc);
solrClient.commit(CollectionAdminParams.SYSTEM_COLL);
Replica leader
= solrClient.getZkStateReader().getLeaderRetry(CollectionAdminParams.SYSTEM_COLL, "shard1", DEFAULT_TIMEOUT);
final AtomicReference<Long> coreStartTime = new AtomicReference<>(getCoreStatus(leader).getCoreStartTime().getTime());
// trigger compat report by changing the schema
SchemaRequest req = new SchemaRequest();
SchemaResponse rsp = req.process(solrClient, CollectionAdminParams.SYSTEM_COLL);
Map<String, Object> field = getSchemaField("timestamp", rsp);
// make some obviously incompatible changes
field.put("type", "string");
field.put("docValues", false);
SchemaRequest.ReplaceField replaceFieldRequest = new SchemaRequest.ReplaceField(field);
SchemaResponse.UpdateResponse replaceFieldResponse = replaceFieldRequest.process(solrClient, CollectionAdminParams.SYSTEM_COLL);
assertEquals(replaceFieldResponse.toString(), 0, replaceFieldResponse.getStatus());
CollectionAdminRequest.Reload reloadRequest = CollectionAdminRequest.reloadCollection(CollectionAdminParams.SYSTEM_COLL);
CollectionAdminResponse response = reloadRequest.process(solrClient);
assertEquals(0, response.getStatus());
assertTrue(response.isSuccess());
// wait for the reload to complete
RetryUtil.retryUntil("Timed out waiting for core to reload", 30, 1000, TimeUnit.MILLISECONDS, () -> {
long restartTime = 0;
try {
restartTime = getCoreStatus(leader).getCoreStartTime().getTime();
} catch (Exception e) {
log.warn("Exception getting core start time: {}", e.getMessage());
return false;
}
return restartTime > coreStartTime.get();
});
cluster.waitForActiveCollection(CollectionAdminParams.SYSTEM_COLL, 1, 2);
}
@After
public void doAfter() throws Exception {
log.info("doAfter: deleting all collections...");
cluster.deleteAllCollections();
if (null != solrClient) {
solrClient.close();
solrClient = null;
}
}
private Map<String, Object> getSchemaField(String name, SchemaResponse schemaResponse) {
List<Map<String, Object>> fields = schemaResponse.getSchemaRepresentation().getFields();
for (Map<String, Object> field : fields) {
if (name.equals(field.get("name"))) {
return field;
}
}
return null;
}
@Test
public void testBackCompat() throws Exception {
CollectionAdminRequest.OverseerStatus status = new CollectionAdminRequest.OverseerStatus();
CloudSolrClient solrClient = cluster.getSolrClient();
CollectionAdminResponse adminResponse = status.process(solrClient);
NamedList<Object> response = adminResponse.getResponse();
String leader = (String) response.get("leader");
log.info("Overseer Status indicates that the overseer is: {}", leader);
JettySolrRunner overseerNode = null;
List<JettySolrRunner> jettySolrRunners = cluster.getJettySolrRunners();
for (int i = 0; i < jettySolrRunners.size(); i++) {
JettySolrRunner runner = jettySolrRunners.get(i);
if (runner.getNodeName().equals(leader)) {
overseerNode = runner;
break;
}
}
assertNotNull(overseerNode);
LogWatcherConfig watcherCfg = new LogWatcherConfig(true, null, "WARN", 100);
@SuppressWarnings({"rawtypes"})
LogWatcher watcher = LogWatcher.newRegisteredLogWatcher(watcherCfg, null);
watcher.reset();
// restart Overseer to trigger the back-compat check
if (log.isInfoEnabled()) {
log.info("Stopping Overseer Node: {} ({})", overseerNode.getNodeName(), overseerNode.getLocalPort());
}
cluster.stopJettySolrRunner(overseerNode);
log.info("Waiting for new overseer election...");
TimeOut timeOut = new TimeOut(30, TimeUnit.SECONDS, cloudManager.getTimeSource());
while (!timeOut.hasTimedOut()) {
adminResponse = status.process(solrClient);
response = adminResponse.getResponse();
String newLeader = (String) response.get("leader");
if (newLeader != null && !leader.equals(newLeader)) {
log.info("...new overseer is: {}", newLeader);
break;
}
timeOut.sleep(200);
}
if (timeOut.hasTimedOut()) {
fail("time out waiting for new Overseer leader");
}
TimeOut timeOut1 = new TimeOut(60, TimeUnit.SECONDS, cloudManager.getTimeSource());
boolean foundWarning = false;
boolean foundSchemaWarning = false;
// TODO: replace this polling logic with a LogWatcher that uses a queue we can await() on...
log.info("Polling for log watcher to detect expected log messages...");
while (!timeOut1.hasTimedOut()) {
timeOut1.sleep(1000);
SolrDocumentList history = watcher.getHistory(-1, null);
for (SolrDocument doc : history) {
if (!Overseer.class.getName().equals(doc.getFieldValue("logger"))) {
continue;
}
if (doc.getFieldValue("message").toString().contains("re-indexing")) {
if (log.isInfoEnabled()) {
log.info("Found re-indexing message: {}", doc.getFieldValue("message"));
}
foundWarning = true;
}
if (doc.getFieldValue("message").toString().contains("timestamp")) {
if (log.isInfoEnabled()) {
log.info("Found timestamp message: {}", doc.getFieldValue("message"));
}
foundSchemaWarning = true;
}
}
if (foundWarning && foundSchemaWarning) {
break;
}
}
log.info("Done polling log watcher: foundWarning={} foundSchemaWarning={}", foundWarning, foundSchemaWarning);
assertTrue("re-indexing warning not found", foundWarning);
assertTrue("timestamp field incompatibility warning not found", foundSchemaWarning);
}
}