#!/usr/bin/python
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Schedule FailMon execution for nodes of file hosts.list, according to
# the properties file conf/global.config.
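#
# The hosts.list file is read one node name per line (leading/trailing
# whitespace is stripped). A sketch of its layout, with illustrative host
# names only:
#
#   node01.example.com
#   node02.example.com
#   node03.example.com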
import time
import ConfigParser
import subprocess
import threading
import random

jobs = []
username = "user"
connections = 10
failmonDir = ""
maxFiles = 100
# This class represents a thread that connects to a set of cluster
# nodes to locally execute monitoring jobs. These jobs are specified
# as a shell command in the constructor.
class sshThread (threading.Thread):

    def __init__(self, threadname, username, command, failmonDir):
        threading.Thread.__init__(self)
        self.name = threadname
        self.username = username
        self.command = command
        self.failmonDir = failmonDir
        self.hosts = []

    def addHost(self, host):
        self.hosts.append(host)

    def run (self):
        for host in self.hosts:
            toRun = ["ssh", self.username + "@" + host, "cd " + self.failmonDir + " ; " + self.command]
            print "Thread", self.name, "invoking command on", host, ":\t", toRun, "...",
            subprocess.check_call(toRun)
            print "Done!"
# This class represents a monitoring job. The param member is a string
# that can be passed in the '--only' list of jobs given to the Java
# class org.apache.hadoop.contrib.failmon.RunOnce for execution on a
# node.
class Job:

    def __init__(self, param, interval):
        self.param = param
        self.interval = interval
        self.counter = interval
        return

    def reset(self):
        self.counter = self.interval
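
# For example (illustrative intervals, not project defaults), Job("cpu", 300)
# and Job("disks", 600) expiring in the same scheduling tick are collapsed by
# main() into one remote invocation:
#
#   bin/failmon.sh --only cpu,disks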
# This function reads the configuration file to get the values of the
# configuration parameters.
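# The file is an INI-style file parsed with ConfigParser.SafeConfigParser, so
# all keys live under a [Default] section. A sketch of the expected layout
# (the values shown are illustrative, not shipped defaults):
#
#   [Default]
#   ssh.username = hadoop
#   max.connections = 10
#   failmon.dir = /home/hadoop/failmon
#   hdfs.files.max = 100
#   log.hadoop.interval = 3600
#   log.system.interval = 3600
#   nics.interval = 0
#   cpu.interval = 300
#   disks.interval = 300
#   sensors.interval = 600
#   upload.interval = 3600
#
# An interval of 0 disables the corresponding job.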
def getJobs(file):
    global username
    global connections
    global jobs
    global failmonDir
    global maxFiles

    conf = ConfigParser.SafeConfigParser()
    conf.read(file)

    username = conf.get("Default", "ssh.username")
    connections = int(conf.get("Default", "max.connections"))
    failmonDir = conf.get("Default", "failmon.dir")
    maxFiles = int(conf.get("Default", "hdfs.files.max"))

    # Hadoop Log
    interval = int(conf.get("Default", "log.hadoop.interval"))
    if interval != 0:
        jobs.append(Job("hadoopLog", interval))

    # System Log
    interval = int(conf.get("Default", "log.system.interval"))
    if interval != 0:
        jobs.append(Job("systemLog", interval))

    # NICs
    interval = int(conf.get("Default", "nics.interval"))
    if interval != 0:
        jobs.append(Job("nics", interval))

    # CPU
    interval = int(conf.get("Default", "cpu.interval"))
    if interval != 0:
        jobs.append(Job("cpu", interval))

    # Disks
    interval = int(conf.get("Default", "disks.interval"))
    if interval != 0:
        jobs.append(Job("disks", interval))

    # Sensors
    interval = int(conf.get("Default", "sensors.interval"))
    if interval != 0:
        jobs.append(Job("sensors", interval))

    # Upload
    interval = int(conf.get("Default", "upload.interval"))
    if interval != 0:
        jobs.append(Job("upload", interval))

    return
# Compute the gcd (Greatest Common Divisor) of two integers
def GCD(a, b):
    assert isinstance(a, int)
    assert isinstance(b, int)

    while a:
        a, b = b % a, a

    return b
# Compute the gcd (Greatest Common Divisor) of a list of integers
def listGCD(joblist):
    assert isinstance(joblist, list)

    if (len(joblist) == 1):
        return joblist[0].interval

    g = GCD(joblist[0].interval, joblist[1].interval)

    for i in range(2, len(joblist)):
        g = GCD(g, joblist[i].interval)

    return g
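
# Worked example (intervals are illustrative): for jobs configured to run
# every 300, 600 and 3600 seconds, listGCD() returns 300, so the loop in
# main() sleeps 300 seconds per tick and each job fires whenever its
# countdown reaches zero.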
# Merge all failmon files created on the HDFS into a single file
def mergeFiles():
    global username
    global failmonDir

    hostList = []
    hosts = open('./conf/hosts.list', 'r')

    for host in hosts:
        hostList.append(host.strip())

    randomHost = random.sample(hostList, 1)

    mergeCommand = "bin/failmon.sh --mergeFiles"
    toRun = ["ssh", username + "@" + randomHost[0], "cd " + failmonDir + " ; " + mergeCommand]
    print "Invoking command on", randomHost, ":\t", mergeCommand, "...",
    subprocess.check_call(toRun)
    print "Done!"
    return
# The actual scheduling is done here
def main():
    getJobs("./conf/global.config")

    for job in jobs:
        print "Configuration: ", job.param, "every", job.interval, "seconds"

    globalInterval = listGCD(jobs)

    while True:
        time.sleep(globalInterval)
        params = []

        # decrement every job's countdown and collect the jobs due to run
        for job in jobs:
            job.counter -= globalInterval

            if (job.counter <= 0):
                params.append(job.param)
                job.reset()

        if (len(params) == 0):
            continue

        onlyStr = "--only " + params[0]
        for i in range(1, len(params)):
            onlyStr += ',' + params[i]

        command = "bin/failmon.sh " + onlyStr

        # execute on all nodes
        hosts = open('./conf/hosts.list', 'r')
        threadList = []

        # create a thread for every connection
        for i in range(0, connections):
            threadList.append(sshThread(i, username, command, failmonDir))

        # assign hosts to the connections/threads in a round-robin fashion
        cur = 0
        for host in hosts:
            threadList[cur].addHost(host.strip())
            cur += 1
            if (cur == len(threadList)):
                cur = 0

        for ready in threadList:
            ready.start()

        for ssht in threading.enumerate():
            if ssht != threading.currentThread():
                ssht.join()

        # if an upload has been done, then maybe we need to merge the
        # HDFS files
        if "upload" in params:
            mergeFiles()

    return
if __name__ == '__main__':
    main()