blob: 2d93aefa79c514523d098d62212e1ce25c9658f9 [file] [log] [blame]
#!/usr/bin/env python
'''
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
'''
import socket
import time
import sys
import logging
import pprint
import os
import subprocess
import threading
import traceback
from pprint import pformat
# Name of the environment variable whose value BootStrap.runSetupAgent passes
# as the first command-line argument to the remote setupAgent.py.
AMBARI_PASSPHRASE_VAR_NAME = "AMBARI_PASSPHRASE"
class SCP(threading.Thread):
    """Copy one local file to one remote host with scp, in a separate thread.

    Until run() completes, getStatus() returns the placeholder
    {"exitstatus": -1, "log": "FAILED"}. Afterwards it holds scp's return
    code and its captured stdout/stderr. The log text is also appended to
    <bootdir>/<host>.log.
    """

    def __init__(self, sshKeyFile, host, inputFile, remote, bootdir):
        self.sshKeyFile = sshKeyFile
        self.host = host
        self.inputFile = inputFile
        self.remote = remote
        self.bootdir = bootdir
        self.ret = {"exitstatus": -1, "log": "FAILED"}
        threading.Thread.__init__(self)

    def getStatus(self):
        """Return the status dict ({"exitstatus": int, "log": str})."""
        return self.ret

    def getHost(self):
        """Return the target host name."""
        return self.host

    def run(self):
        """Run scp as root@host, record its result and append it to the host log."""
        scpcommand = ["scp",
                      "-o", "ConnectTimeout=60",
                      "-o", "BatchMode=yes",
                      "-o", "StrictHostKeyChecking=no",
                      "-i", self.sshKeyFile, self.inputFile, "root@" +
                      self.host + ":" + self.remote]
        logging.info("Running scp command " + ' '.join(scpcommand))
        scpstat = subprocess.Popen(scpcommand, stdout=subprocess.PIPE,
                                   stderr=subprocess.PIPE)
        log = scpstat.communicate()
        self.ret["exitstatus"] = scpstat.returncode
        self.ret["log"] = "STDOUT\n" + log[0] + "\nSTDERR\n" + log[1]
        logFilePath = os.path.join(self.bootdir, self.host + ".log")
        # Context manager guarantees the log is flushed and the handle closed.
        with open(logFilePath, "a+") as logFile:
            logFile.write(self.ret["log"])
        logging.info("scp " + self.inputFile + " done for host " + self.host + ", exitcode=" + str(scpstat.returncode))
class SSH(threading.Thread):
    """Run one command on one remote host over ssh, in a separate thread.

    Until run() completes, getStatus() returns the placeholder
    {"exitstatus": -1, "log": "FAILED"}. Afterwards it holds ssh's return
    code and its captured stdout/stderr. run() appends the log text to
    <bootdir>/<host>.log and then writes the return code to
    <bootdir>/<host>.done.
    """

    def __init__(self, sshKeyFile, host, command, bootdir):
        self.sshKeyFile = sshKeyFile
        self.host = host
        self.command = command
        self.bootdir = bootdir
        self.ret = {"exitstatus": -1, "log": "FAILED"}
        threading.Thread.__init__(self)

    def getHost(self):
        """Return the target host name."""
        return self.host

    def getStatus(self):
        """Return the status dict ({"exitstatus": int, "log": str})."""
        return self.ret

    def run(self):
        """Run the command as root@host, record the result, write .log and .done files."""
        sshcommand = ["ssh",
                      "-o", "ConnectTimeOut=60",
                      "-o", "StrictHostKeyChecking=no",
                      "-o", "BatchMode=yes",
                      "-tt",  # Should prevent "tput: No value for $TERM and no -T specified" warning
                      "-i", self.sshKeyFile,
                      "root@" + self.host, self.command]
        # NOTE(review): this logs self.command verbatim, including any secrets
        # the caller embedded in it.
        logging.info("Running ssh command " + ' '.join(sshcommand))
        sshstat = subprocess.Popen(sshcommand, stdout=subprocess.PIPE,
                                   stderr=subprocess.PIPE)
        log = sshstat.communicate()
        self.ret["exitstatus"] = sshstat.returncode
        self.ret["log"] = "STDOUT\n" + log[0] + "\nSTDERR\n" + log[1]
        logFilePath = os.path.join(self.bootdir, self.host + ".log")
        # Close (and thereby flush) the log before the .done file appears, so
        # a reader that waits for .done never sees a truncated log.
        with open(logFilePath, "a+") as logFile:
            logFile.write(self.ret["log"])
        doneFilePath = os.path.join(self.bootdir, self.host + ".done")
        with open(doneFilePath, "w+") as doneFile:
            doneFile.write(str(sshstat.returncode))
        logging.info("Setup agent done for host " + self.host + ", exitcode=" + str(sshstat.returncode))
def splitlist(hosts, n):
    """Cut *hosts* into consecutive slices of length *n*.

    The final slice holds whatever remains (it may be shorter than *n*);
    an empty input produces an empty list.
    """
    pieces = []
    for offset in range(0, len(hosts), n):
        pieces.append(hosts[offset:offset + n])
    return pieces
def skip_failed_hosts(statuses):
    """Return the hosts whose status reports success.

    :param statuses: dict mapping hostname -> status dict with an "exitstatus" key
    :return: list of hostnames whose "exitstatus" equals 0, in dict iteration order
    """
    # items() (rather than the Python-2-only iteritems()) works on Python 2 and 3.
    return [host for host, status in statuses.items() if status["exitstatus"] == 0]
def unite_statuses(statuses, update):
    """Merge two <hostname, hoststatus> dictionaries into a new dictionary.

    For a host present in both, "exitstatus" is the max of the two values and
    the logs are concatenated (statuses' log, a newline, then update's log).
    A host present in only one of the dictionaries keeps its status object
    unchanged. Neither input dictionary is modified.
    """
    result = dict(statuses)
    for host, upd_status in update.items():
        if host in result:
            prev_status = result[host]
            result[host] = {
                "exitstatus": max(prev_status["exitstatus"], upd_status["exitstatus"]),
                "log": prev_status["log"] + "\n" + upd_status["log"],
            }
        else:
            # Previously such hosts were silently dropped from the union.
            result[host] = upd_status
    return result
def get_difference(list1, list2):
    """Return the elements of list1 that do not occur in list2, without duplicates.

    Elements keep the order of their first appearance in list1, so the result
    is deterministic (it is printed in log messages). Elements must be hashable.
    """
    excluded = set(list2)
    result = []
    for item in list1:
        if item not in excluded:
            # Adding to the exclusion set also suppresses later duplicates.
            excluded.add(item)
            result.append(item)
    return result
class PSSH:
    """Execute a single command over ssh on many hosts, at most 20 at a time.

    run() collects one status per host into a dict {hostname: status}, where
    each status is what SSH.getStatus() reports; getstatus() returns that dict.
    """

    def __init__(self, hosts, sshKeyFile, command, bootdir):
        self.hosts = hosts
        self.sshKeyFile = sshKeyFile
        self.command = command
        self.bootdir = bootdir
        self.ret = {}

    def getstatus(self):
        """Return the {hostname: status} dict filled in by run()."""
        return self.ret

    def run(self):
        """Process hosts in batches of 20; each batch is fully joined before the next starts."""
        batch_size = 20
        for batch in splitlist(self.hosts, batch_size):
            workers = []
            for target in batch:
                worker = SSH(self.sshKeyFile, target, self.command, self.bootdir)
                worker.start()
                workers.append(worker)
            # Block on every thread of this batch and record its outcome.
            for worker in workers:
                worker.join()
                self.ret[worker.getHost()] = worker.getStatus()
class PSCP:
    """Copy one file with scp to many hosts, at most 20 transfers at a time.

    run() collects one status per host into a dict {hostname: status}, where
    each status is what SCP.getStatus() reports; getstatus() returns that dict.
    """

    def __init__(self, hosts, sshKeyFile, inputfile, remote, bootdir):
        self.hosts = hosts
        self.sshKeyFile = sshKeyFile
        self.inputfile = inputfile
        self.remote = remote
        self.bootdir = bootdir
        self.ret = {}

    def getstatus(self):
        """Return the {hostname: status} dict filled in by run()."""
        return self.ret

    def run(self):
        """Copy in batches of 20 hosts; each batch is fully joined before the next starts."""
        batch_size = 20
        for batch in splitlist(self.hosts, batch_size):
            transfers = []
            for target in batch:
                transfer = SCP(self.sshKeyFile, target, self.inputfile, self.remote, self.bootdir)
                transfer.start()
                transfers.append(transfer)
            # Block on every transfer of this batch and record its outcome.
            for transfer in transfers:
                transfer.join()
                self.ret[transfer.getHost()] = transfer.getStatus()
class BootStrap:
    """Bootstrap Ambari agents on a list of hosts.

    run() performs, in order:
      1. scp the Ambari repo file to every host (copyNeededFiles);
      2. scp the agent setup script to /tmp on hosts where step 1 succeeded;
      3. ssh to the hosts still healthy and run the setup script (runSetupAgent);
      4. write a <bootdir>/<host>.done file for every host (createDoneFiles),
         even if an earlier step raised.

    self.statuses accumulates {hostname: {"exitstatus": int, "log": str}}.
    self.successive_hostlist holds the hosts that succeeded at every step so far.
    """

    def __init__(self, hosts, sshkeyFile, scriptDir, boottmpdir, setupAgentFile, ambariServer):
        self.hostlist = hosts
        self.successive_hostlist = hosts
        self.sshkeyFile = sshkeyFile
        self.bootdir = boottmpdir
        self.scriptDir = scriptDir
        self.setupAgentFile = setupAgentFile
        self.ambariServer = ambariServer
        # Stays None until copyNeededFiles / runSetupAgent record results.
        self.statuses = None

    def is_suse(self):
        """Return True if /etc/issue exists and mentions "suse" (case-insensitive)."""
        if os.path.isfile("/etc/issue"):
            with open("/etc/issue") as issue:
                if "suse" in issue.read().lower():
                    return True
        return False

    def getRepoDir(self):
        """Directory holding package repo files on this machine (zypp on SUSE, yum otherwise)."""
        if self.is_suse():
            return "/etc/zypp/repos.d"
        else:
            return "/etc/yum.repos.d"

    def getRepoFile(self):
        """Path of the ambari.repo file inside getRepoDir()."""
        return os.path.join(self.getRepoDir(), "ambari.repo")

    def getSetupScript(self):
        """Path of setupAgent.py inside scriptDir."""
        return os.path.join(self.scriptDir, "setupAgent.py")

    def copyNeededFiles(self):
        """Copy the repo file, then the agent setup script, to the hosts.

        Returns 0 if every host received both files, otherwise 1. An unexpected
        exception is logged and also reported as 1.
        """
        try:
            # Step 1: repo file to every host, into the same repo dir as here.
            fileToCopy = self.getRepoFile()
            targetDir = self.getRepoDir()
            pscp = PSCP(self.hostlist, self.sshkeyFile, fileToCopy, targetDir, self.bootdir)
            pscp.run()
            out = pscp.getstatus()
            # Preparing report about failed hosts
            self.successive_hostlist = skip_failed_hosts(out)
            failed = get_difference(self.hostlist, self.successive_hostlist)
            logging.info("Parallel scp returns for repo file. Failed hosts are: " + str(failed))
            # updating statuses
            self.statuses = out
            # Step 2: setup script to /tmp, only on hosts where step 1 worked.
            pscp = PSCP(self.successive_hostlist, self.sshkeyFile, self.setupAgentFile, "/tmp", self.bootdir)
            pscp.run()
            out = pscp.getstatus()
            # Preparing report about failed hosts
            failed_current = get_difference(self.successive_hostlist, skip_failed_hosts(out))
            self.successive_hostlist = skip_failed_hosts(out)
            failed = get_difference(self.hostlist, self.successive_hostlist)
            logging.info("Parallel scp returns for agent script. All failed hosts are: " + str(failed) +
                         ". Failed on last step: " + str(failed_current))
            # updating statuses
            self.statuses = unite_statuses(self.statuses, out)
            return 1 if failed else 0
        except Exception:
            logging.error("Traceback " + traceback.format_exc())
            return 1

    def runSetupAgent(self):
        """Run /tmp/setupAgent.py over ssh on every host in successive_hostlist.

        Returns 0 if no host has failed at any step so far, otherwise 1.
        Raises KeyError if the AMBARI_PASSPHRASE_VAR_NAME environment variable is unset.
        """
        logging.info("Running setup agent...")
        # NOTE(review): the passphrase is placed on the remote command line and,
        # because SSH.run logs the full command, ends up in the log output too.
        command = "python /tmp/setupAgent.py " + os.environ[AMBARI_PASSPHRASE_VAR_NAME] + " " + self.ambariServer
        pssh = PSSH(self.successive_hostlist, self.sshkeyFile, command, self.bootdir)
        pssh.run()
        out = pssh.getstatus()
        # Preparing report about failed hosts
        failed_current = get_difference(self.successive_hostlist, skip_failed_hosts(out))
        self.successive_hostlist = skip_failed_hosts(out)
        failed = get_difference(self.hostlist, self.successive_hostlist)
        logging.info("Parallel ssh returns for setup agent. All failed hosts are: " + str(failed) +
                     ". Failed on last step: " + str(failed_current))
        # updating statuses; copyNeededFiles may have aborted before recording any
        if self.statuses is None:
            self.statuses = dict(out)
        else:
            self.statuses = unite_statuses(self.statuses, out)
        return 1 if failed else 0

    def createDoneFiles(self):
        """Create a <bootdir>/<host>.done file holding the exit status of every host.

        These files are later read from Java code; if the .done file for any
        host is missing, the bootstrap hangs or fails due to timeout. Existing
        .done files (e.g. those written by SSH.run) are left untouched. Hosts
        in self.hostlist with no recorded status get -1, the placeholder exit
        status SCP/SSH report before a command has completed.
        """
        statuses = self.statuses or {}
        exitstatuses = dict((host, status["exitstatus"]) for host, status in statuses.items())
        for host in self.hostlist:
            exitstatuses.setdefault(host, -1)
        for host, exitstatus in exitstatuses.items():
            doneFilePath = os.path.join(self.bootdir, host + ".done")
            if not os.path.exists(doneFilePath):
                with open(doneFilePath, "w+") as doneFile:
                    doneFile.write(str(exitstatus))

    def run(self):
        """Copy files and run commands on remote hosts.

        Returns the larger of the two step return codes (0 = all hosts OK).
        """
        ret1 = ret2 = 1
        try:
            ret1 = self.copyNeededFiles()
            logging.info("Copying files finished")
            ret2 = self.runSetupAgent()
            logging.info("Running ssh command finished")
        finally:
            # .done files must exist for every host even if a step raised,
            # otherwise the Java side waits until it times out.
            self.createDoneFiles()
        return max(ret1, ret2)
def main(argv=None):
    """Command-line entry point: bootstrap Ambari agents on the given hosts.

    argv: [script, <comma separated hosts>, <tmpdir for storage>, <sshkeyFile>,
           <agent setup script>, <ambari-server name>]; defaults to sys.argv.
    Writes usage to stderr and exits with status 2 if fewer than five
    arguments follow the script name. Always returns 0 ("Hack to comply with
    current usage"); the actual bootstrap result is only logged.
    """
    if argv is None:
        argv = sys.argv
    scriptDir = os.path.realpath(os.path.dirname(argv[0]))
    onlyargs = argv[1:]
    # Five positional arguments are read below, so require all of them.
    if len(onlyargs) < 5:
        sys.stderr.write("Usage: <comma separated hosts> "
                         "<tmpdir for storage> <sshkeyFile> <agent setup script> <ambari-server name>\n")
        sys.exit(2)
    # Parse the input
    hostList = onlyargs[0].split(",")
    bootdir = onlyargs[1]
    sshKeyFile = onlyargs[2]
    setupAgentFile = onlyargs[3]
    ambariServer = onlyargs[4]
    # ssh doesn't like open (group/world-readable) key files. Wait for chmod
    # to finish so the first scp/ssh does not race it.
    subprocess.call(["chmod", "600", sshKeyFile])
    logging.info("BootStrapping hosts " + pprint.pformat(hostList) +
                 " using " + scriptDir +
                 " with sshKey File " + sshKeyFile + " using tmp dir " + bootdir + " ambari: " + ambariServer)
    bootstrap = BootStrap(hostList, sshKeyFile, scriptDir, bootdir, setupAgentFile, ambariServer)
    ret = bootstrap.run()
    logging.info("Bootstrap finished with return code " + str(ret))
    return 0  # Hack to comply with current usage
if __name__ == '__main__':
    # Send all log records (DEBUG and above) to stderr via the root logger.
    logging.basicConfig(level=logging.DEBUG)
    # main() always returns 0, so its return value is not used as exit status.
    main(sys.argv)