#!/usr/bin/python
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
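
# Drives the Accumulo scalability test suite: for each tserver count listed
# in TEST_CASES in conf/site.conf, re-initialize Accumulo on that many
# tservers, run the named test's clients in parallel via pssh, and collect
# per-client timing results from HDFS into <testDir>/scale.dat.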
import logging
import time
import os
import sys
from ConfigParser import ConfigParser
from subprocess import Popen, PIPE

class JavaConfig:
    '''Enable access to properties in java siteConfig file'''
    def __init__(self, fname):
        self.prop_d = {}
        for line in open(fname):
            line = line.strip()
            if line.startswith('#') or len(line) == 0:
                continue
            # split on the first '=' only so property values may contain '='
            pair = line.split('=', 1)
            if len(pair) != 2:
                logging.error("Invalid property (%s)", line)
                continue
            self.prop_d[pair[0].strip()] = pair[1].strip()

    def get(self, prop):
        return self.prop_d[prop]
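
# Count the lines in a file; used to learn how many tservers are listed in
# the TSERVERS file.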
def file_len(fname):
    i = 0
    for line in open(fname):
        i += 1
    return i
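
# Run one test case: stop the cluster, trim the tservers file to numNodes
# entries, re-initialize Accumulo, launch the test clients via pssh, wait for
# their result files in /accumulo-scale/results, then summarize the timings.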
def runTest(testName, siteConfig, testDir, numNodes, fdata):
    log('Stopping accumulo')
    syscall('$ACCUMULO_HOME/bin/accumulo-cluster stop')
    log('Creating tservers file for this test')
    tserversPath = siteConfig.get('TSERVERS')
    nodesPath = testDir+'/nodes/%d' % numNodes
    syscall('head -n %d %s > %s' % (numNodes, tserversPath, nodesPath))
    log('Copying tservers file to accumulo config')
    syscall('cp '+nodesPath+' $ACCUMULO_CONF_DIR/tservers')
    log('Removing /accumulo directory in HDFS')
    syscall("hadoop fs -rmr /accumulo")
    log('Initializing new Accumulo instance')
    instance = siteConfig.get('INSTANCE_NAME')
    passwd = siteConfig.get('PASSWORD')
    syscall('printf "%s\nY\n%s\n%s\n" | $ACCUMULO_HOME/bin/accumulo init' % (instance, passwd, passwd))
    log('Starting new Accumulo instance')
    syscall('$ACCUMULO_HOME/bin/accumulo-cluster start')
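    # Give tservers time to report in; scale the wait with cluster size.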
    sleepTime = 30
    if numNodes > 120:
        sleepTime = int(numNodes / 4)
    log('Sleeping for %d seconds' % sleepTime)
    time.sleep(sleepTime)
    log('Setting up %s test' % testName)
    syscall('$ACCUMULO_HOME/bin/accumulo org.apache.accumulo.test.scalability.Run %s setup %s' % (testName, numNodes))
    log('Sleeping for 5 seconds')
    time.sleep(5)
    log('Starting %s clients' % testName)
    numThreads = numNodes
    if numNodes > 128:
        numThreads = 128
    syscall('pssh -P -h %s -p %s "$ACCUMULO_HOME/bin/accumulo org.apache.accumulo.test.scalability.Run %s client %s >/tmp/scale.out 2>/tmp/scale.err &" < /dev/null' % (nodesPath, numThreads, testName, numNodes))
    log('Sleeping for 30 sec before checking how many clients started...')
    time.sleep(30)
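    # 'hadoop fs -ls' prints a leading 'Found N items' line; field 1 is the
    # number of client marker files written so far.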
    output = Popen(["hadoop fs -ls /accumulo-scale/clients"], stdout=PIPE, shell=True).communicate()[0]
    num_clients = int(output.split()[1])
    log('%s clients started!' % num_clients)
    log('Waiting until %d clients finish.' % num_clients)
    last = 0
    done = 0
    while done < num_clients:
        time.sleep(5)
        output = Popen(["hadoop fs -ls /accumulo-scale/results"], stdout=PIPE, shell=True).communicate()[0]
        if not output:
            sys.stdout.write('.')
            sys.stdout.flush()
            continue
        done = int(output.split()[1])
        if done != last:
            sys.stdout.write('.%s' % done)
        else:
            sys.stdout.write('.')
        sys.stdout.flush()
        last = done
    sys.stdout.flush()
    log('\nAll clients are finished!')
    log('Copying results from HDFS')
    resultsDir = "%s/results/%s" % (testDir, numNodes)
    syscall('hadoop fs -copyToLocal /accumulo-scale/results %s' % resultsDir)
    log('Calculating results from clients')
    times = []
    totalMs = 0L
    totalEntries = 0L
    totalBytes = 0L
    for fn in os.listdir(resultsDir):
        for line in open('%s/%s' % (resultsDir, fn)):
            words = line.split()
            # guard against blank lines before checking the record type
            if words and words[0] == 'ELAPSEDMS':
                ms = long(words[1].strip())
                totalMs += ms
                times.append(ms)
                totalEntries += long(words[2].strip())
                totalBytes += long(words[3].strip())
    times.sort()
    print times
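    # Summarize per-client elapsed times; division by 1000 converts ms to
    # whole seconds.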
    numClients = len(times)
    minTime = times[0] / 1000
    avg = (float(totalMs) / numClients) / 1000
    median = times[int(numClients/2)] / 1000
    maxTime = times[numClients-1] / 1000
    q1 = times[int(numClients/4)] / 1000
    q3 = times[int((3*numClients)/4)] / 1000
    log('Tservs\tClients\tMin\tQ1\tAvg\tMed\tQ3\tMax\tEntries\tMB')
    resultStr = '%d\t%d\t%d\t%d\t%d\t%d\t%d\t%d\t%dM\t%d' % (numNodes, numClients, minTime, q1, avg, median, q3, maxTime, totalEntries / 1000000, totalBytes / 1000000)
    log(resultStr)
    fdata.write(resultStr + '\n')
    fdata.flush()
    time.sleep(5)
    log('Tearing down %s test' % testName)
    syscall('$ACCUMULO_HOME/bin/accumulo org.apache.accumulo.test.scalability.Run %s teardown %s' % (testName, numNodes))
    time.sleep(10)
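
# Echo a shell command, then execute it with os.system.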
def syscall(cmd):
    log('> %s' % cmd)
    os.system(cmd)
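
# subprocess-based helper that captures output and logs the result; not
# currently called in this script, but kept alongside syscall().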
def run(cmd, **kwargs):
    logging.debug("Running %s", ' '.join(cmd))
    handle = Popen(cmd, stdout=PIPE, **kwargs)
    out, err = handle.communicate()
    logging.debug("Result %d (%r, %r)", handle.returncode, out, err)
    return handle.returncode
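
# Print a message and flush so progress is visible immediately.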
def log(msg):
    print msg
    sys.stdout.flush()
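
# Entry point: check the environment, create local test/result directories,
# reset /accumulo-scale in HDFS, then run every TEST_CASES entry, e.g.:
#   ./run.py Ingest
# (the test name is resolved by org.apache.accumulo.test.scalability.Run;
# "Ingest" here is illustrative).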
def main():
    # string exceptions are invalid; raise real Exception objects instead
    if not os.getenv('ACCUMULO_HOME'):
        raise Exception('ACCUMULO_HOME needs to be set!')
    if not os.getenv('ACCUMULO_CONF_DIR'):
        raise Exception('ACCUMULO_CONF_DIR needs to be set!')
    if not os.getenv('HADOOP_HOME'):
        raise Exception('HADOOP_HOME needs to be set!')
    if len(sys.argv) != 2:
        log('Usage: run.py <testName>')
        sys.exit()
    testName = sys.argv[1]
    logging.basicConfig(level=logging.DEBUG)
    log('Creating test directory structure')
    testDir = 'test-%d' % time.time()
    nodesDir = testDir+'/nodes'
    resultsDir = testDir+'/results'
    syscall('mkdir %s' % testDir)
    syscall('mkdir %s' % nodesDir)
    syscall('mkdir %s' % resultsDir)
    log('Removing current /accumulo-scale directory')
    syscall('hadoop fs -rmr /accumulo-scale')
    log('Creating new /accumulo-scale directory structure')
    syscall('hadoop fs -mkdir /accumulo-scale')
    syscall('hadoop fs -mkdir /accumulo-scale/clients')
    syscall('hadoop fs -mkdir /accumulo-scale/results')
    syscall('hadoop fs -chmod -R 777 /accumulo-scale')
    log('Copying config to HDFS')
    syscall('hadoop fs -copyFromLocal ./conf /accumulo-scale/conf')
    siteConfig = JavaConfig('conf/site.conf')
    tserversPath = siteConfig.get('TSERVERS')
    maxNodes = file_len(tserversPath)
    fdata = open('%s/scale.dat' % testDir, 'w')
    # header must match the ten columns written by runTest()
    fdata.write('Tservs\tClients\tMin\tQ1\tAvg\tMed\tQ3\tMax\tEntries\tMB\n')
    for numNodes in siteConfig.get('TEST_CASES').split(','):
        log('Running %s test with %s nodes' % (testName, numNodes))
        if int(numNodes) > maxNodes:
            logging.error('Skipping %r test case as tservers file %r contains only %r nodes', numNodes, tserversPath, maxNodes)
            continue
        runTest(testName, siteConfig, testDir, int(numNodes), fdata)
        sys.stdout.flush()

if __name__ == '__main__':
    main()