blob: 28929d4acf0ab164d052553587dd7ef446945c04 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ambari.server.bootstrap;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.ambari.server.bootstrap.BootStrapStatus.BSStat;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author ncole
*
*/
class BSRunner extends Thread {
private static final Logger LOG = LoggerFactory.getLogger(BSRunner.class);
private static final String DEFAULT_USER = "root";
private static final String DEFAULT_SSHPORT = "22";
private boolean finished = false;
private SshHostInfo sshHostInfo;
private File bootDir;
private String bsScript;
private File requestIdDir;
private File sshKeyFile;
private File passwordFile;
private int requestId;
private String agentSetupScript;
private String agentSetupPassword;
private String ambariHostname;
private boolean verbose;
private BootStrapImpl bsImpl;
private final String clusterOsFamily;
private String projectVersion;
private int serverPort;
public BSRunner(BootStrapImpl impl, SshHostInfo sshHostInfo, String bootDir,
String bsScript, String agentSetupScript, String agentSetupPassword,
int requestId, long timeout, String hostName, boolean isVerbose, String clusterOsFamily,
String projectVersion, int serverPort)
{
this.requestId = requestId;
this.sshHostInfo = sshHostInfo;
this.bsScript = bsScript;
this.bootDir = new File(bootDir);
this.requestIdDir = new File(bootDir, Integer.toString(requestId));
this.sshKeyFile = new File(this.requestIdDir, "sshKey");
this.agentSetupScript = agentSetupScript;
this.agentSetupPassword = agentSetupPassword;
this.ambariHostname = hostName;
this.verbose = isVerbose;
this.clusterOsFamily = clusterOsFamily;
this.projectVersion = projectVersion;
this.bsImpl = impl;
this.serverPort = serverPort;
BootStrapStatus status = new BootStrapStatus();
status.setLog("RUNNING");
status.setStatus(BSStat.RUNNING);
bsImpl.updateStatus(requestId, status);
}
/**
* Update the gathered data from reading output
*
*/
private class BSStatusCollector implements Runnable {
@Override
public void run() {
BSHostStatusCollector collector = new BSHostStatusCollector(requestIdDir,
sshHostInfo.getHosts());
collector.run();
List<BSHostStatus> hostStatus = collector.getHostStatus();
BootStrapStatus status = new BootStrapStatus();
status.setHostsStatus(hostStatus);
status.setLog("");
status.setStatus(BSStat.RUNNING);
bsImpl.updateStatus(requestId, status);
}
}
private String createHostString(List<String> list) {
return list != null ? String.join(",", list) : StringUtils.EMPTY;
}
/** Create request id dir for each bootstrap call **/
private void createRunDir() throws IOException {
if (!bootDir.exists()) {
// create the bootdir directory.
if (! bootDir.mkdirs()) {
throw new IOException("Cannot create " + bootDir);
}
}
/* create the request id directory */
if (requestIdDir.exists()) {
/* delete the directory and make sure we start back */
FileUtils.deleteDirectory(requestIdDir);
}
/* create the directory for the run dir */
if (! requestIdDir.mkdirs()) {
throw new IOException("Cannot create " + requestIdDir);
}
}
private void writeSshKeyFile(String data) throws IOException {
FileUtils.writeStringToFile(sshKeyFile, data);
}
private void writePasswordFile(String data) throws IOException {
FileUtils.writeStringToFile(passwordFile, data);
}
/**
* Waits until the process has terminated or waiting time elapses.
* @param timeout time to wait in miliseconds
* @return true if process has exited, false otherwise
*/
private boolean waitForProcessTermination(Process process, long timeout) throws InterruptedException {
long startTime = System.currentTimeMillis();
do {
try {
process.exitValue();
return true;
} catch (IllegalThreadStateException ignored) {}
// Check if process has terminated once per second
Thread.sleep(1000);
} while (System.currentTimeMillis() - startTime < timeout);
return false;
}
/**
* Calculates bootstrap timeout as a function of number of hosts.
* @return timeout in milliseconds
*/
private long calculateBSTimeout(int hostCount) {
final int PARALLEL_BS_COUNT = 20; // bootstrap.py bootstraps 20 hosts in parallel
final long HOST_BS_TIMEOUT = 300000L; // 5 minutes timeout for a host (average). Same as in bootstrap.py
return Math.max(HOST_BS_TIMEOUT, HOST_BS_TIMEOUT * hostCount / PARALLEL_BS_COUNT);
}
public synchronized void finished() {
this.finished = true;
}
@Override
public void run() {
String hostString = createHostString(sshHostInfo.getHosts());
long bootstrapTimeout = calculateBSTimeout(sshHostInfo.getHosts().size());
// Startup a scheduled executor service to look through the logs
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
BSStatusCollector statusCollector = new BSStatusCollector();
ScheduledFuture<?> handle = null;
LOG.info("Kicking off the scheduler for polling on logs in " +
this.requestIdDir);
String user = sshHostInfo.getUser();
String userRunAs = sshHostInfo.getUserRunAs();
if (user == null || user.isEmpty()) {
user = DEFAULT_USER;
}
String sshPort = sshHostInfo.getSshPort();
if(sshPort == null || sshPort.isEmpty()){
sshPort = DEFAULT_SSHPORT;
}
String command[] = new String[13];
BSStat stat = BSStat.RUNNING;
String scriptlog = "";
try {
createRunDir();
handle = scheduler.scheduleWithFixedDelay(statusCollector,
0, 10, TimeUnit.SECONDS);
if (LOG.isDebugEnabled()) {
// FIXME needs to be removed later
// security hole
LOG.debug("Using ssh key=\"{}\"", sshHostInfo.getSshKey());
}
String password = sshHostInfo.getPassword();
if (password != null && !password.isEmpty()) {
this.passwordFile = new File(this.requestIdDir, "host_pass");
// TODO : line separator should be changed
// if we are going to support multi platform server-agent solution
String lineSeparator = System.getProperty("line.separator");
password = password + lineSeparator;
writePasswordFile(password);
}
writeSshKeyFile(sshHostInfo.getSshKey());
/* Running command:
* script hostlist bsdir user sshkeyfile
*/
command[0] = this.bsScript;
command[1] = hostString;
command[2] = this.requestIdDir.toString();
command[3] = user;
command[4] = sshPort;
command[5] = this.sshKeyFile.toString();
command[6] = this.agentSetupScript.toString();
command[7] = this.ambariHostname;
command[8] = this.clusterOsFamily;
command[9] = this.projectVersion;
command[10] = this.serverPort+"";
command[11] = userRunAs;
command[12] = (this.passwordFile==null) ? "null" : this.passwordFile.toString();
Map<String, String> envVariables = new HashMap<>();
if (System.getProperty("os.name").contains("Windows")) {
String command2[] = new String[command.length + 1];
command2[0] = "python";
System.arraycopy(command, 0, command2, 1, command.length);
command = command2;
Map<String, String> envVarsWin = System.getenv();
if (envVarsWin != null) {
envVariables.putAll(envVarsWin); //envVarsWin is non-modifiable
}
}
LOG.info("Host= " + hostString + " bs=" + this.bsScript + " requestDir=" +
requestIdDir + " user=" + user + " sshPort=" + sshPort + " keyfile=" + this.sshKeyFile +
" passwordFile " + this.passwordFile + " server=" + this.ambariHostname +
" version=" + projectVersion + " serverPort=" + this.serverPort + " userRunAs=" + userRunAs +
" timeout=" + bootstrapTimeout / 1000);
envVariables.put("AMBARI_PASSPHRASE", agentSetupPassword);
if (this.verbose)
envVariables.put("BS_VERBOSE", "\"-vvv\"");
if (LOG.isDebugEnabled()) {
LOG.debug(Arrays.toString(command));
}
String bootStrapOutputFilePath = requestIdDir + File.separator + "bootstrap.out";
String bootStrapErrorFilePath = requestIdDir + File.separator + "bootstrap.err";
ProcessBuilder pb = new ProcessBuilder(command);
pb.redirectOutput(new File(bootStrapOutputFilePath));
pb.redirectError(new File(bootStrapErrorFilePath));
Map<String, String> env = pb.environment();
env.putAll(envVariables);
Process process = pb.start();
try {
String logInfoMessage = "Bootstrap output, log="
+ bootStrapErrorFilePath + " " + bootStrapOutputFilePath + " at " + this.ambariHostname;
LOG.info(logInfoMessage);
int exitCode = 1;
boolean timedOut = false;
if (waitForProcessTermination(process, bootstrapTimeout)){
exitCode = process.exitValue();
} else {
LOG.warn("Bootstrap process timed out. It will be destroyed.");
process.destroy();
timedOut = true;
}
String outMesg = "";
String errMesg = "";
try {
outMesg = FileUtils.readFileToString(new File(bootStrapOutputFilePath));
errMesg = FileUtils.readFileToString(new File(bootStrapErrorFilePath));
} catch(IOException io) {
LOG.info("Error in reading files ", io);
}
scriptlog = outMesg + "\n\n" + errMesg;
if (timedOut) {
scriptlog += "\n\n Bootstrap process timed out. It was destroyed.";
}
LOG.info("Script log Mesg " + scriptlog);
if (exitCode != 0) {
stat = BSStat.ERROR;
interuptSetupAgent(99, scriptlog);
} else {
stat = BSStat.SUCCESS;
}
scheduler.schedule(new BSStatusCollector(), 0, TimeUnit.SECONDS);
long startTime = System.currentTimeMillis();
while (true) {
if (LOG.isDebugEnabled()) {
LOG.debug("Waiting for hosts status to be updated");
}
boolean pendingHosts = false;
BootStrapStatus tmpStatus = bsImpl.getStatus(requestId);
List <BSHostStatus> hostStatusList = tmpStatus.getHostsStatus();
if (hostStatusList != null) {
for (BSHostStatus status : hostStatusList) {
if (status.getStatus().equals("RUNNING")) {
pendingHosts = true;
}
}
} else {
//Failed to get host status, waiting for hosts status to be updated
pendingHosts = true;
}
if (LOG.isDebugEnabled()) {
LOG.debug("Whether hosts status yet to be updated, pending={}", pendingHosts);
}
if (!pendingHosts) {
break;
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// continue
}
long now = System.currentTimeMillis();
if (now >= (startTime+15000)) {
LOG.warn("Gave up waiting for hosts status to be updated");
break;
}
}
} catch (InterruptedException e) {
throw new IOException(e);
} finally {
if (handle != null) {
handle.cancel(true);
}
/* schedule a last update */
scheduler.schedule(new BSStatusCollector(), 0, TimeUnit.SECONDS);
scheduler.shutdownNow();
try {
scheduler.awaitTermination(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
LOG.info("Interruped while waiting for scheduler");
}
process.destroy();
}
} catch(IOException io) {
LOG.info("Error executing bootstrap " + io.getMessage());
stat = BSStat.ERROR;
interuptSetupAgent(99, io.getMessage());
}
finally {
/* get the bstatus */
BootStrapStatus tmpStatus = bsImpl.getStatus(requestId);
List <BSHostStatus> hostStatusList = tmpStatus.getHostsStatus();
if (hostStatusList != null) {
for (BSHostStatus hostStatus : hostStatusList) {
if ("FAILED".equals(hostStatus.getStatus())) {
stat = BSStat.ERROR;
break;
}
}
} else {
stat = BSStat.ERROR;
}
// creating new status instance to avoid modifying exposed object
BootStrapStatus newStat = new BootStrapStatus();
newStat.setHostsStatus(hostStatusList);
newStat.setLog(scriptlog);
newStat.setStatus(stat);
// Remove private ssh key after bootstrap is complete
try {
FileUtils.forceDelete(sshKeyFile);
} catch (IOException io) {
LOG.warn(io.getMessage());
}
if (passwordFile != null) {
// Remove password file after bootstrap is complete
try {
FileUtils.forceDelete(passwordFile);
} catch (IOException io) {
LOG.warn(io.getMessage());
}
}
bsImpl.updateStatus(requestId, newStat);
bsImpl.reset();
finished();
}
}
public synchronized void interuptSetupAgent(int exitCode, String errMesg){
PrintWriter setupAgentDoneWriter = null;
PrintWriter setupAgentLogWriter = null;
try {
for (String host : sshHostInfo.getHosts()) {
File doneFile = new File(requestIdDir, host + BSHostStatusCollector.doneFileFilter);
// Do not rewrite finished host statuses
if (!doneFile.exists()) {
setupAgentDoneWriter = new PrintWriter(doneFile);
setupAgentDoneWriter.print(exitCode);
setupAgentDoneWriter.close();
}
File logFile = new File(requestIdDir, host + BSHostStatusCollector.logFileFilter);
if (!logFile.exists()) {
setupAgentLogWriter = new PrintWriter(logFile);
setupAgentLogWriter.print("Error while bootstrapping:\n" + errMesg);
setupAgentLogWriter.close();
}
}
} catch (FileNotFoundException ex) {
LOG.error(ex.toString());
} finally {
if (setupAgentDoneWriter != null) {
setupAgentDoneWriter.close();
}
if (setupAgentLogWriter != null) {
setupAgentLogWriter.close();
}
}
}
public synchronized boolean isRunning() {
return !this.finished;
}
}