blob: 26ff70bbc66cbed51e73aad7c09fb220f6e287f0 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.ambari.logsearch.configurer;
import org.apache.ambari.logsearch.conf.SolrClientsHolder;
import org.apache.ambari.logsearch.conf.SolrPropsConfig;
import org.apache.ambari.logsearch.conf.global.SolrCollectionState;
import org.apache.ambari.logsearch.dao.SolrDaoBase;
import org.apache.ambari.logsearch.handler.ACLHandler;
import org.apache.ambari.logsearch.handler.CreateCollectionHandler;
import org.apache.ambari.logsearch.handler.ListCollectionHandler;
import org.apache.ambari.logsearch.handler.ReloadCollectionHandler;
import org.apache.ambari.logsearch.handler.UploadConfigurationHandler;
import org.apache.commons.lang.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.springframework.data.solr.core.SolrTemplate;
import java.io.File;
import java.io.IOException;
import java.nio.file.FileSystems;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
public class SolrCollectionConfigurer implements Configurer {
private Logger logger = LogManager.getLogger(SolrCollectionConfigurer.class);
private static final int SETUP_RETRY_SECOND = 10;
private static final int SESSION_TIMEOUT = 15000;
private static final int CONNECTION_TIMEOUT = 30000;
private static final String JAVA_SECURITY_AUTH_LOGIN_CONFIG = "java.security.auth.login.config";
private static final String SOLR_HTTPCLIENT_BUILDER_FACTORY = "solr.httpclient.builder.factory";
private final SolrDaoBase solrDaoBase;
private final boolean hasEnumConfig; // enumConfig.xml for solr collection
private final SolrClientsHolder solrClientsHolder;
private final SolrClientsHolder.CollectionType collectionType;
public SolrCollectionConfigurer(SolrDaoBase solrDaoBase, boolean hasEnumConfig,
SolrClientsHolder solrClientsHolder, SolrClientsHolder.CollectionType collectionType) {
this.solrDaoBase = solrDaoBase;
this.hasEnumConfig = hasEnumConfig;
this.solrClientsHolder = solrClientsHolder;
this.collectionType = collectionType;
}
@Override
public void start() {
setupSecurity();
final SolrPropsConfig solrPropsConfig = solrDaoBase.getSolrPropsConfig();
final SolrCollectionState state = solrDaoBase.getSolrCollectionState();
final String separator = FileSystems.getDefault().getSeparator();
final String localConfigSetLocation = String.format("%s%s%s%sconf", solrPropsConfig.getConfigSetFolder(), separator,
solrPropsConfig.getConfigName(), separator);
final File configSetFolder = new File(localConfigSetLocation);
if (!configSetFolder.exists()) { // show exception only once during startup
throw new RuntimeException(String.format("Cannot load config set location: %s", localConfigSetLocation));
}
Thread setupThread = new Thread("setup_collection_" + solrPropsConfig.getCollection()) {
@Override
public void run() {
logger.info("Started monitoring thread to check availability of Solr server. collection=" + solrPropsConfig.getCollection());
while (!stopSetupCondition(state)) {
int retryCount = 0;
try {
retryCount++;
Thread.sleep(SETUP_RETRY_SECOND * 1000);
openZkConnectionAndUpdateStatus(state, solrPropsConfig);
if (solrDaoBase.getSolrTemplate() == null) {
solrDaoBase.setSolrTemplate(createSolrTemplate(solrPropsConfig));
}
CloudSolrClient cloudSolrClient = (CloudSolrClient) solrClientsHolder.getSolrClient(collectionType);
boolean reloadCollectionNeeded = uploadConfigurationsIfNeeded(cloudSolrClient, configSetFolder, state, solrPropsConfig);
checkSolrStatus(cloudSolrClient);
createCollectionsIfNeeded(cloudSolrClient, state, solrPropsConfig, reloadCollectionNeeded);
} catch (Exception e) {
retryCount++;
logger.error("Error setting collection. collection=" + solrPropsConfig.getCollection() + ", retryCount=" + retryCount, e);
}
}
}
};
setupThread.setDaemon(true);
setupThread.start();
}
private boolean uploadConfigurationsIfNeeded(CloudSolrClient cloudSolrClient, File configSetFolder, SolrCollectionState state, SolrPropsConfig solrPropsConfig) throws Exception {
boolean reloadCollectionNeeded = new UploadConfigurationHandler(configSetFolder, hasEnumConfig).handle(cloudSolrClient, solrPropsConfig);
if (!state.isConfigurationUploaded()) {
state.setConfigurationUploaded(true);
}
return reloadCollectionNeeded;
}
public boolean stopSetupCondition(SolrCollectionState state) {
return state.isSolrCollectionReady();
}
public SolrTemplate createSolrTemplate(SolrPropsConfig solrPropsConfig) {
SolrClient solrClient = createClient(
solrPropsConfig.getSolrUrl(),
solrPropsConfig.getZkConnectString(),
solrPropsConfig.getCollection());
solrClientsHolder.setSolrClient(solrClient, collectionType);
return new SolrTemplate(solrClient);
}
private CloudSolrClient createClient(String solrUrl, String zookeeperConnectString, String defaultCollection) {
if (StringUtils.isNotEmpty(zookeeperConnectString)) {
CloudSolrClient cloudSolrClient = new CloudSolrClient.Builder().withZkHost(zookeeperConnectString).build();
cloudSolrClient.setDefaultCollection(defaultCollection);
return cloudSolrClient;
} else if (StringUtils.isNotEmpty(solrUrl)) {
throw new UnsupportedOperationException("Currently only cloud mode is supported. Set zookeeper connect string.");
}
throw new IllegalStateException(
"Solr url or zookeeper connection string is missing. collection: " + defaultCollection);
}
private void setupSecurity() {
boolean securityEnabled = solrDaoBase.getSolrKerberosConfig().isEnabled();
if (securityEnabled) {
String javaSecurityConfig = System.getProperty(JAVA_SECURITY_AUTH_LOGIN_CONFIG);
String solrHttpBuilderFactory = System.getProperty(SOLR_HTTPCLIENT_BUILDER_FACTORY);
logger.info("setupSecurity() called for kerberos configuration, jaas file: {}, solr http client factory: {}",
javaSecurityConfig, solrHttpBuilderFactory);
}
}
private void openZkConnectionAndUpdateStatus(final SolrCollectionState state, final SolrPropsConfig solrPropsConfig) throws Exception {
ZooKeeper zkClient = null;
try {
logger.info("Checking that Znode ('{}') is ready or not... ", solrPropsConfig.getZkConnectString());
zkClient = openZookeeperConnection(solrPropsConfig);
if (!state.isZnodeReady()) {
logger.info("State change: Zookeeper ZNode is available for {}", solrPropsConfig.getZkConnectString());
state.setZnodeReady(true);
}
} catch (Exception e) {
logger.error("Error occurred during the creation of zk client (connection string: {})", solrPropsConfig.getZkConnectString());
throw e;
} finally {
try {
if (zkClient != null) {
zkClient.close();
}
} catch (Exception e) {
logger.error("Could not close zk connection properly.", e);
}
}
}
private ZooKeeper openZookeeperConnection(final SolrPropsConfig solrPropsConfig) throws InterruptedException, IOException {
final CountDownLatch connSignal = new CountDownLatch(1);
ZooKeeper zooKeeper = new ZooKeeper(solrPropsConfig.getZkConnectString(), SESSION_TIMEOUT, new Watcher() {
public void process(WatchedEvent event) {
if (event.getState() == Event.KeeperState.SyncConnected) {
connSignal.countDown();
}
}
});
connSignal.await(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS);
return zooKeeper;
}
private boolean checkSolrStatus(CloudSolrClient cloudSolrClient) {
int waitDurationMS = 3 * 60 * 1000;
boolean status = false;
try {
long beginTimeMS = System.currentTimeMillis();
long waitIntervalMS = 2000;
int pingCount = 0;
while (true) {
pingCount++;
try {
List<String> collectionList = new ListCollectionHandler().handle(cloudSolrClient, null);
if (collectionList != null) {
logger.info("checkSolrStatus(): Solr getCollections() is success. collectionList=" + collectionList);
status = true;
break;
}
} catch (Exception ex) {
logger.error("Error while doing Solr check", ex);
}
if (System.currentTimeMillis() - beginTimeMS > waitDurationMS) {
logger.error("Solr is not reachable even after " + (System.currentTimeMillis() - beginTimeMS) + " ms. " +
"If you are using alias, then you might have to restart LogSearch after Solr is up and running.");
break;
} else {
logger.warn("Solr is not not reachable yet. getCollections() attempt count=" + pingCount + ". " +
"Will sleep for " + waitIntervalMS + " ms and try again.");
}
Thread.sleep(waitIntervalMS);
}
} catch (Throwable t) {
logger.error("Seems Solr is not up.");
}
return status;
}
private void createCollectionsIfNeeded(CloudSolrClient solrClient, SolrCollectionState state, SolrPropsConfig solrPropsConfig,
boolean reloadCollectionNeeded) {
try {
List<String> allCollectionList = new ListCollectionHandler().handle(solrClient, null);
solrDaoBase.waitForLogSearchConfig();
CreateCollectionHandler handler = new CreateCollectionHandler(allCollectionList);
boolean collectionCreated = handler.handle(solrClient, solrPropsConfig);
boolean collectionReloaded = true;
if (reloadCollectionNeeded) {
collectionReloaded = new ReloadCollectionHandler().handle(solrClient, solrPropsConfig);
}
boolean aclsUpdated = new ACLHandler().handle(solrClient, solrPropsConfig);
if (!state.isSolrCollectionReady() && collectionCreated && collectionReloaded && aclsUpdated) {
state.setSolrCollectionReady(true);
}
} catch (Exception ex) {
logger.error("Error during creating/updating collection. collectionName=" + solrPropsConfig.getCollection(), ex);
}
}
}