# blob: 350cccb6e3b1568a671d448acf2df775e6e9ce6c [file] [log] [blame]
#Licensed to the Apache Software Foundation (ASF) under one
#or more contributor license agreements. See the NOTICE file
#distributed with this work for additional information
#regarding copyright ownership. The ASF licenses this file
#to you under the Apache License, Version 2.0 (the
#"License"); you may not use this file except in compliance
#with the License. You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
#Unless required by applicable law or agreed to in writing, software
#distributed under the License is distributed on an "AS IS" BASIS,
#WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#See the License for the specific language governing permissions and
#limitations under the License.
import getpass
import os
import re
import stat
import sys
import threading
import time
import unittest
# Resolve the real path of the running script and cut everything from
# '/testing/' onwards; the remainder is appended to sys.path. This has to
# happen before the 'testing' and 'hodlib' imports below.
# NOTE(review): presumably the remainder is the HOD root directory that
# contains both packages - confirm against the source tree layout.
myDirectory = os.path.realpath(sys.argv[0])
rootDirectory = re.sub("/testing/.*", "", myDirectory)
sys.path.append(rootDirectory)
import tempfile
from testing.lib import BaseTestSuite, MockLogger, MockHadoopCluster
from hodlib.Hod.hod import hodRunner, hodState
from hodlib.Common.desc import NodePoolDesc
# Passed to BaseTestSuite.__init__ by HodTestSuite.
# NOTE(review): presumably a list of test names to skip - confirm against
# testing.lib.BaseTestSuite.
excludes = []
# Information about all clusters is written to a file called clusters.state.
# The tests build that file name as '%s.state' % TEST_CLUSTER_DATA_FILE.
from hodlib.Hod.hod import CLUSTER_DATA_FILE as TEST_CLUSTER_DATA_FILE, \
INVALID_STATE_FILE_MSGS
# Temp directory prefix: '/tmp/hod-<current user name>'. The cluster
# directories used by the tests are created underneath it.
TMP_DIR_PREFIX=os.path.join('/tmp', 'hod-%s' % (getpass.getuser()))
# build a config object with all required keys for initializing hod.
def setupConf():
    """Return a configuration dictionary holding the keys needed to initialize hod.

    The result has three entries: 'hod' (runner settings), 'resource_manager'
    (placeholder values only) and 'nodepooldesc' (a NodePoolDesc built from
    the 'resource_manager' section).
    """
    hodSection = {
        'original-dir' : os.getcwd(),
        'stream' : True,
        # store all the info about clusters in this directory
        'user_state' : '/tmp/hodtest',
        'debug' : 3,
        'java-home' : os.getenv('JAVA_HOME'),
        'cluster' : 'dummy',
        'cluster-factor' : 1.8,
        'xrs-port-range' : (32768,65536),
        'allocate-wait-time' : 3600,
        'temp-dir' : '/tmp/hod'
    }

    # just set everything to dummy. Need something to initialize the
    # node pool description object.
    resourceManagerSection = {
        'id' : 'dummy',
        'batch-home' : 'dummy',
        'queue' : 'dummy',
    }

    conf = {
        'hod' : hodSection,
        'resource_manager' : resourceManagerSection,
    }
    conf['nodepooldesc'] = NodePoolDesc(conf['resource_manager'])
    return conf
# Test class that defines methods to test invalid arguments to hod operations.
class test_InvalidArgsOperations(unittest.TestCase):
    """Tests hod operations that are given invalid cluster directory arguments.

    Each test drives a hodRunner wired to a MockLogger and a
    MockHadoopCluster, seeds the cluster state file through hodState, and
    then checks the logged messages and the operations recorded by the mock
    cluster.
    """

    def setUp(self):
        self.cfg = setupConf()
        # initialize the mock objects
        self.log = MockLogger()
        self.cluster = MockHadoopCluster()
        # Use the test logger. This will be used for test verification.
        self.client = hodRunner(self.cfg, log=self.log, cluster=self.cluster)
        # Create the hodState object to set the test state you want.
        self.state = hodState(self.cfg['hod']['user_state'])
        stateDir = self.cfg['hod']['user_state']
        if not os.path.exists(stateDir):
            # os.mkdir is the directory-creating call; os.path has no mkdir.
            os.mkdir(stateDir)
        # ensure cluster data file exists, so write works in the tests.
        open(self.__clusterDataFilePath(), 'w').close()

    def tearDown(self):
        # clean up cluster data file and directory
        os.remove(self.__clusterDataFilePath())
        os.rmdir(self.cfg['hod']['user_state'])

    # Test that list works with deleted cluster directories - more than one
    # entry which is invalid.
    def testListInvalidDirectory(self):
        userState = {
            os.path.join(TMP_DIR_PREFIX, 'testListInvalidDirectory1') : '123.dummy.id1',
            os.path.join(TMP_DIR_PREFIX, 'testListInvalidDirectory2') : '123.dummy.id2',
        }
        self.__verifyList(userState, True)
        # simulate a test where a directory is deleted, and created again,
        # without deallocation
        clusterDir = os.path.join(TMP_DIR_PREFIX, 'testListEmptyDirectory')
        os.makedirs(clusterDir)
        try:
            self.assertTrue(os.path.isdir(clusterDir))
            self.__verifyList({ clusterDir : '123.dummy.id3' }, False)
        finally:
            # remove the directory even when an assertion above fails, so a
            # later run can create it again.
            os.rmdir(clusterDir)

    # Test that info works with a deleted cluster directory
    def testInfoInvalidDirectory(self):
        clusterDir = os.path.join(TMP_DIR_PREFIX, 'testInfoInvalidDirectory')
        self.__verifyInfo(clusterDir, '456.dummy.id', True)
        # simulate a test where a directory is deleted, and created again,
        # without deallocation
        clusterDir = os.path.join(TMP_DIR_PREFIX, 'testInfoEmptyDirectory')
        os.makedirs(clusterDir)
        try:
            self.assertTrue(os.path.isdir(clusterDir))
            self.__verifyInfo(clusterDir, '456.dummy.id1', False)
        finally:
            os.rmdir(clusterDir)

    # Test info works with an invalid cluster directory
    def testInfoNonExistentDirectory(self):
        clusterDir = '/tmp/hod/testInfoNonExistentDirectory'
        self.client._op_info(['info', clusterDir])
        self.assertTrue(self.log.hasMessage(
            "Invalid hod.clusterdir(--hod.clusterdir or -d). %s : No such directory" % (clusterDir),
            'critical'))

    # Test that deallocation works on a deleted cluster directory
    # by clearing the job, and removing the state
    def testDeallocateInvalidDirectory(self):
        clusterDir = os.path.join(TMP_DIR_PREFIX, 'testDeallocateInvalidDirectory')
        self.__verifyDeallocate(clusterDir, '789.dummy.id', True)
        # simulate a test where a directory is deleted, and created again,
        # without deallocation
        clusterDir = os.path.join(TMP_DIR_PREFIX, 'testDeallocateEmptyDirectory')
        os.makedirs(clusterDir)
        try:
            self.assertTrue(os.path.isdir(clusterDir))
            self.__verifyDeallocate(clusterDir, '789.dummy.id1', False)
        finally:
            os.rmdir(clusterDir)

    # Test that deallocation works on a nonexistent directory.
    def testDeallocateNonExistentDirectory(self):
        clusterDir = os.path.join(TMP_DIR_PREFIX, 'testDeallocateNonExistentDirectory')
        self.client._op_deallocate(['deallocate', clusterDir])
        # there should be no call..
        self.assertFalse(self.cluster.wasOperationPerformed('delete_job', None))
        self.assertTrue(self.log.hasMessage(
            "Invalid hod.clusterdir(--hod.clusterdir or -d). %s : No such directory" % (clusterDir),
            'critical'))

    # Test that allocation on a previously deleted directory fails.
    def testAllocateOnDeletedDirectory(self):
        clusterDir = os.path.join(TMP_DIR_PREFIX, 'testAllocateOnDeletedDirectory')
        os.makedirs(clusterDir)
        try:
            self.assertTrue(os.path.isdir(clusterDir))
            jobid = '1234.abc.com'
            userState = { clusterDir : jobid }
            self.__setupClusterState(userState, False)
            self.client._op_allocate(['allocate', clusterDir, '3'])
            self.assertTrue(self.log.hasMessage(
                "Found a previously allocated cluster at "
                "cluster directory '%s'. HOD cannot determine if this cluster "
                "can be automatically deallocated. Deallocate the cluster if it "
                "is unused." % (clusterDir), 'critical'))
        finally:
            os.rmdir(clusterDir)

    def __clusterDataFilePath(self):
        """Return the path of the cluster data file inside hod.user_state."""
        return os.path.join(self.cfg['hod']['user_state'],
                            '%s.state' % TEST_CLUSTER_DATA_FILE)

    def __assertMissingClusterInfoLogged(self, jobid, clusterDir):
        """Assert the critical 'Cannot find information' message was logged."""
        self.assertTrue(self.log.hasMessage(
            "Cannot find information for cluster with id '%s' in previously allocated cluster directory '%s'."
            % (jobid, clusterDir), 'critical'))

    def __verifyList(self, userState, verifyDirIsAbsent):
        """Store userState, run 'list', and check every entry is reported as unknown."""
        self.__setupClusterState(userState, verifyDirIsAbsent)
        self.client._op_list(['list'])
        # assert that required errors are logged.
        for clusterDir, jobid in userState.items():
            self.assertTrue(self.log.hasMessage(
                'cluster state unknown\t%s\t%s' % (jobid, clusterDir), 'info'))

    def __verifyInfo(self, clusterDir, jobid, verifyDirIsAbsent):
        """Store {clusterDir: jobid}, run 'info', and check the logged error."""
        self.__setupClusterState({ clusterDir : jobid }, verifyDirIsAbsent)
        self.client._op_info(['info', clusterDir])
        self.__assertMissingClusterInfoLogged(jobid, clusterDir)

    def __verifyDeallocate(self, clusterDir, jobid, verifyDirIsAbsent):
        """Store {clusterDir: jobid}, run 'deallocate', and check the outcome.

        Checks that the mock cluster recorded a 'delete_job' for jobid, that
        both expected critical messages were logged, and that clusterDir is
        no longer present in the stored state.
        """
        self.__setupClusterState({ clusterDir : jobid }, verifyDirIsAbsent)
        self.client._op_deallocate(['deallocate', clusterDir])
        # verify job was deleted
        self.assertTrue(self.cluster.wasOperationPerformed('delete_job', jobid))
        # verify appropriate message was logged.
        self.__assertMissingClusterInfoLogged(jobid, clusterDir)
        self.assertTrue(self.log.hasMessage(
            "Freeing resources allocated to the cluster.", 'critical'))
        # verify that the state information was cleared.
        storedState = self.state.read(TEST_CLUSTER_DATA_FILE)
        self.assertFalse(clusterDir in storedState.keys())

    def __setupClusterState(self, clusterStateMap, verifyDirIsAbsent=True):
        """Write clusterStateMap to the cluster data file and read it back.

        clusterStateMap maps cluster directory paths to job ids. When
        verifyDirIsAbsent is true, each directory is first asserted not to
        exist on disk.
        """
        for clusterDir in clusterStateMap:
            # ensure directory doesn't exist, just in case.
            if verifyDirIsAbsent:
                self.assertFalse(os.path.exists(clusterDir))
        # set up required state.
        self.state.write(TEST_CLUSTER_DATA_FILE, clusterStateMap)
        # verify everything is stored correctly.
        state = self.state.read(TEST_CLUSTER_DATA_FILE)
        for clusterDir in clusterStateMap:
            self.assertTrue(clusterDir in state.keys())
            self.assertEqual(clusterStateMap[clusterDir], state[clusterDir])
class test_InvalidHodStateFiles(unittest.TestCase):
    """Tests hod operations when the user state file or its directory has restrictive permissions.

    Each test creates the state file, restricts the permissions of the file
    or of the hod.user_state directory, runs an operation through
    hodRunner.operation(), and expects a critical message from
    INVALID_STATE_FILE_MSGS together with a return value of 1.
    """

    def setUp(self):
        self.rootDir = '/tmp/hod-%s' % getpass.getuser()
        self.cfg = setupConf() # create a conf
        # mkdtemp does not create its parent directory, so make sure the
        # per-user root exists before asking for temp directories inside it.
        if not os.path.isdir(self.rootDir):
            os.makedirs(self.rootDir)
        # Modify hod.user_state
        self.cfg['hod']['user_state'] = tempfile.mkdtemp(
            dir=self.rootDir,
            prefix='HodTestSuite.test_InvalidHodStateFiles_')
        self.log = MockLogger() # mock logger
        self.cluster = MockHadoopCluster() # mock hadoop cluster
        self.client = hodRunner(self.cfg, log=self.log, cluster=self.cluster)
        self.state = hodState(self.cfg['hod']['user_state'])
        self.statePath = os.path.join(self.cfg['hod']['user_state'],
                                      '%s.state' % TEST_CLUSTER_DATA_FILE)
        self.clusterDir = tempfile.mkdtemp(
            dir=self.rootDir,
            prefix='HodTestSuite.test_InvalidHodStateFiles_')

    def testOperationWithInvalidStateFile(self):
        # state file with no read/write permissions at all
        ret = self.__runWithRestrictedMode(
            "info %s" % self.clusterDir, self.statePath, 0)
        self.__assertInvalidStateFile(ret, 0)

    def testAllocateWithInvalidStateFile(self):
        # state file that is readable but has no write permissions
        ret = self.__runWithRestrictedMode(
            "allocate %s %s" % (self.clusterDir, '3'),
            self.statePath, stat.S_IRUSR)
        self.__assertInvalidStateFile(ret, 2)

    def testAllocateWithInvalidStateStore(self):
        operation = "allocate %s %s" % (self.clusterDir, 3)
        stateDir = self.cfg['hod']['user_state']
        ###### check with no executable permissions ######
        ret = self.__runWithRestrictedMode(
            operation, stateDir, stat.S_IRUSR | stat.S_IWUSR)
        self.__assertInvalidStateFile(ret, 0)
        ###### check with no write permissions ######
        ret = self.__runWithRestrictedMode(
            operation, stateDir, stat.S_IRUSR | stat.S_IXUSR)
        self.__assertInvalidStateFile(ret, 0)

    def tearDown(self):
        if os.path.exists(self.clusterDir):
            os.rmdir(self.clusterDir)
        if os.path.exists(self.cfg['hod']['user_state']):
            os.rmdir(self.cfg['hod']['user_state'])

    def __runWithRestrictedMode(self, operation, path, mode):
        """Run a hod operation while `path` carries the permission bits `mode`.

        Creates the user state file, applies `mode` to `path` (the state
        file itself or the directory holding it), and runs `operation`
        through the client. Owner rwx permissions are restored on `path` and
        the state file is removed afterwards, also when the operation raises.

        Returns whatever self.client.operation() returns.
        """
        self.client._hodRunner__cfg['hod']['operation'] = operation
        open(self.statePath, "w").close() # create user state file
        try:
            os.chmod(path, mode)
            return self.client.operation()
        finally:
            # restore permissions. stat.S_IRWXU is octal 0700; a decimal 700
            # would set an unrelated permission pattern.
            os.chmod(path, stat.S_IRWXU)
            os.remove(self.statePath)

    def __assertInvalidStateFile(self, ret, msgIndex):
        """Assert the operation returned 1 and logged INVALID_STATE_FILE_MSGS[msgIndex]."""
        self.assertTrue(self.log.hasMessage(
            INVALID_STATE_FILE_MSGS[msgIndex] % os.path.realpath(self.statePath),
            'critical'))
        self.assertEqual(ret, 1)
class HodTestSuite(BaseTestSuite):
    """Test suite for this module, built on BaseTestSuite."""

    def __init__(self):
        """Suite setup: initialize the base suite with this module's name and `excludes`."""
        BaseTestSuite.__init__(self, __name__, excludes)

    def cleanUp(self):
        """Suite tearDown: nothing to do."""
        return None
def RunHodTests():
    """Build the HodTestSuite, run its tests, clean up, and return the result of runTests()."""
    # modulename_suite
    hodSuite = HodTestSuite()
    outcome = hodSuite.runTests()
    hodSuite.cleanUp()
    return outcome
if __name__ == "__main__":
    # Executed directly as a script: run the whole suite.
    RunHodTests()