| #!/usr/bin/env python |
| # ----------------------------------------------------------------------- |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| # ----------------------------------------------------------------------- |
| |
| # |
| # This script reads a test configuration and updates the test job descriptions |
| # accordingly. |
| # |
| # The job descriptions contain a collection of "elapsed times", one for each |
| # workitem. Each AE sleeps for this time to simulate computing. The driver |
| # program, "runducc", submits the jobs according to a schedule generated by |
| # this program. |
| # |
| # Each simulated job contains a simulated "start time" based on actual |
| # usage in a development cluster. This is used to calculate a total elapsed |
| # time for the run. The test configuration specifies a compression rate and |
| # spread, used thus: |
| # Each actual elapsed time for the job is divided by the compression rate. |
| # The set of jobs is submitted with random spacing over the time specified |
| # by the spread. |
| # Knowing the original time span for the jobs, it is possible to execute a |
| # compressed run somewhat similar to the original. |
| # |
| # The configuration file also allows weighted-random selection of job memories, |
| # job classes, and service assignments (for running service tests). |
| # |
| import os |
| import sys |
| import getopt |
| import random |
| |
# DUCC_HOME is the directory three path components above this script file
# (abspath collapses the '/../../..' suffix appended to __file__).
DUCC_HOME = os.path.abspath(__file__ + '/../../..')
# Make DUCC_HOME/admin importable; presumably that is where the ducc_util,
# properties and ducc modules imported below live -- TODO confirm.
sys.path.append(DUCC_HOME + '/admin')
| |
| from ducc_util import DuccUtil |
| from properties import Properties |
| from ducc import Ducc |
| |
class Prepare(DuccUtil):
    """
    Build a compressed test run from a directory of job descriptions.

    Reads a 'prepare' properties file, assigns memory sizes, scheduling classes
    and (optionally) services to every job description by weighted-random
    selection, writes the updated descriptions to the destination directory and
    writes 'job.ctl', the control file listing the jobs with random sleeps
    between them.

    Every print call passes one pre-formatted string, so the output is the same
    whether the interpreter treats print as a statement or as a function.
    """

    def usage(self, msg):
        """
        Print an optional message followed by the usage text, then exit with
        status 1.

        msg: text printed before the usage lines, or None to print usage only.
        """
        if msg is not None:
            print(msg)

        print('Usage:')
        print(' prepare.py <properties>')
        sys.exit(1)

    def error(self, *args):
        """
        Print the arguments separated by single blanks and exit with status 1.

        Each argument is formatted with '%s' first, so None, numbers or
        dictionaries in a message cannot raise a TypeError inside the error
        path itself.
        """
        print(' '.join(['%s' % (a,) for a in args]))
        sys.exit(1)

    def toint(self, props, name, dflt=None):
        """
        Return property 'name' from props as an int.

        If the property is absent, return dflt; if dflt is None as well the
        property is treated as required and the program exits via error().
        A value that int() cannot convert also exits via error().
        """
        val = props.get(name)
        if val is None:
            if dflt is None:
                self.error('Missing required property', name)
            return dflt

        try:
            return int(val)
        except (TypeError, ValueError):
            # Only conversion failures are reported; anything else propagates.
            self.error('Property', name, 'is not an int:', val)

    def toboolean(self, props, name, dflt=None):
        """
        Return property 'name' from props as a boolean.

        Returns dflt when the property is absent. Otherwise returns True only
        when the value is exactly one of the accepted "true" spellings listed
        below; any other value yields False.
        """
        val = props.get(name)
        if val is None:
            return dflt
        return val in ('t', 'T', 'true', 'True', 'TRUE',
                       'y', 'Y', 'yes', 'Yes', 'YES')

    def toarray(self, props, name):
        """
        Read 'name' from props, whose value is a list of blank-delimited
        strings. Each string is used as an associative index 'ndx': the
        property 'name.ndx' is looked up and the result is returned as a map
        (dictionary) from ndx to that property's value.

        e.g. the properties below produce this map: {'37': '50', '28': '50'}
           job.memory = 28 37
           job.memory.28 = 50
           job.memory.37 = 50

        Exits via error() if 'name' or any 'name.ndx' is not defined.
        """
        stem = props.get(name)
        if stem is None:
            self.error('No such property:', name)

        ret = {}
        for val in stem.split():
            key = name + '.' + val
            d = props.get(key)
            if d is None:
                # Report the key that was looked up, not the (None) result.
                self.error('No such property:', key)
            ret[val] = d

        return ret

    def show(self, vals, tag):
        """
        Print, for each distinct value in vals, how many times it occurs.

        tag labels the printed lines.
        """
        tally = {}
        for v in vals:
            tally[v] = tally.get(v, 0) + 1

        for k in tally:
            print('%10s tally: %12s --> %d' % (tag, k, tally[k]))

    def distributeParameters(self, parmset, count, tag):
        """
        Build a randomly ordered list of 'count' values drawn from parmset
        according to its weights.

        parmset is a map whose keys are the values to hand out and whose
        values are the integer weights used to distribute them.

        count is the number of things the values must be distributed over.

        tag labels the tally that is printed for the result.

        Returns a list of exactly 'count' keys of parmset (empty when count is
        not positive). Exits via error() when the weights do not sum to a
        positive number.
        """
        if count <= 0:
            return []

        denom = 0
        for v in parmset.values():
            denom = denom + int(v)
        if denom <= 0:
            self.error('Weights for', tag, 'must sum to a positive value:', parmset)

        tmp = []
        ovfl = []

        # Fill the list with each value in proportion to its configured weight.
        for k in parmset:
            ovfl.append(k)
            share = (float(parmset[k]) / denom) * count
            tmp.extend([k] * int(round(share)))

        # Rounding may leave the list short; top it up with random picks.
        while len(tmp) < count:
            tmp.append(ovfl[random.randint(0, len(ovfl) - 1)])

        random.shuffle(tmp)
        # Rounding may also overshoot. Trimming after the shuffle keeps the
        # first 'count' entries unchanged and makes the tally reflect what is
        # really handed out.
        del tmp[count:]
        self.show(tmp, tag)
        return tmp

    def writeControlFile(self, allfiles):
        """
        Write <testdir>/job.ctl.

        allfiles is a list of tuples (submit tod, filename), already sorted by
        tod, so the files are simply written in list order. After each file a
        sleep is written whose length is drawn uniformly from
        0 .. 2 * (spread // number of files); the mean delay is therefore about
        spread / number of files and the sum of the delays approximates the
        configured spread. The requested and the actual totals are printed.

        Exits via error() when allfiles is empty.
        """
        size = len(allfiles)
        if size == 0:
            self.error('No job files to schedule; job.ctl not written')

        # Explicit integer division: random.randint needs integer bounds.
        files_per_interval = self.spread // size
        max_delay = files_per_interval * 2
        total = 0
        print('Size %s files-per-interval %s range %s'
              % (size, files_per_interval, max_delay))

        # 'with' closes the file even if a write fails part way through.
        with open(self.testdir + '/job.ctl', 'w') as outf:
            for (tod, fname) in allfiles:
                outf.write('s -c ' + str(self.compression) + ' ' + fname + '\n')
                delay = random.randint(0, max_delay)
                outf.write('[sleep ' + str(delay) + 'S]\n\n')
                total += delay

        print('Spread: %s actual: %s' % (self.spread, total))

    def run(self):
        """
        Rewrite every job description found in srcdir into destdir and write
        the control file.

        Uses the attributes set up by main(): randomseed, srcdir, destdir,
        memory, classes, services, maxmem, maxproc and type.
        """
        print('Prepare starts...')

        if self.randomseed == 'TOD':
            random.seed()
        else:
            try:
                seed = int(self.randomseed)
            except (TypeError, ValueError):
                self.error("Property random.seed must be 'TOD' or an int:",
                           self.randomseed)
            random.seed(seed)

        # NOTE: os.listdir returns names in arbitrary order, so a fixed seed
        # reproduces the same assignments only for the same listing order.
        files = os.listdir(self.srcdir)
        count = len(files)
        if count == 0:
            self.error('No job descriptions found in', self.srcdir)

        mem_assignments = self.distributeParameters(self.memory, count, 'memory')
        class_assignments = self.distributeParameters(self.classes, count, 'class')
        service_assignments = None
        if self.services is not None:
            service_assignments = self.distributeParameters(self.services, count, 'services')

        if not os.path.exists(self.destdir):
            os.makedirs(self.destdir)

        allfiles = []
        for ndx, f in enumerate(files):
            props = Properties()
            props.load(self.srcdir + '/' + f)
            cls_assignment = class_assignments[ndx]

            # A per-class maxmem setting wins over the random memory pick.
            mem_override = self.maxmem[cls_assignment]
            if mem_override is None:
                props.put('memory', mem_assignments[ndx])
            else:
                props.put('memory', mem_override)

            props.put('class', cls_assignment)
            if service_assignments is not None:
                props.put('services', service_assignments[ndx])

            process_override = self.maxproc[cls_assignment]
            if process_override is not None:       # force override of max_processes
                props.put('machines', process_override)

            # Any configured scheduling.type for the class marks the job as
            # 'reserve'; the configured value itself is not used.
            if self.type[cls_assignment] is not None:
                props.put('type', 'reserve')

            fname = self.destdir + '/' + f
            # toint reports a missing or non-numeric 'tod' as a clean error.
            allfiles.append((self.toint(props, 'tod'), fname))
            props.write(fname)

        self.writeControlFile(sorted(allfiles))

    def from_keys(self, props, base, stem):
        """
        Given a base dictionary, "base", use its keyset to form a property
        name from the stem ('stem.key') and look for it in props.

        Returns a dictionary with the same keys as base; the value is the
        property's value, or None if the property is not found.
        """
        ret = {}
        for bk in base:
            k = stem + '.' + bk
            print('LOOKING FOR %s' % (k,))
            # props is a Properties object; its own has_key method is used.
            if props.has_key(k):
                ret[bk] = props.get(k)
            else:
                ret[bk] = None
        return ret

    def main(self, argv):
        """
        Entry point: load the 'prepare' properties named by argv[0], record
        the configuration on self, print it, and call run().

        argv: command-line arguments without the program name. A missing
        argument or one of the help flags prints the usage text and exits.
        """
        if len(argv) < 1:
            self.usage("Missing 'prepare' properties.")
        inprops = argv[0]

        if inprops in ('-h', '-?', '--help', '-help'):
            self.usage(None)

        # A bare file name has an empty dirname; fall back to '.' so the
        # directories derived below are not rooted at '/'.
        self.testdir = os.path.dirname(inprops) or '.'
        props = Properties()
        props.load(inprops)

        for required in ('src.dir', 'dest.dir'):
            if props.get(required) is None:
                self.error('Missing required property', required)

        self.srcdir = self.testdir + '/' + props.get('src.dir')
        self.destdir = self.testdir + '/' + props.get('dest.dir')
        self.spread = self.toint(props, 'submission.spread')
        self.compression = self.toint(props, 'compression', 1)
        self.classes = self.toarray(props, 'scheduling.classes')
        self.maxproc = self.from_keys(props, self.classes, 'scheduling.maxproc')
        self.maxmem = self.from_keys(props, self.classes, 'scheduling.maxmem')
        self.type = self.from_keys(props, self.classes, 'scheduling.type')
        self.memory = self.toarray(props, 'job.memory')
        self.randomseed = props.get('random.seed')

        if props.get('job.services') is None:
            self.services = None
        else:
            self.services = self.toarray(props, 'job.services')

        print('Running with')
        print(' properties : %s' % (inprops,))
        print(' testdir : %s' % (self.testdir,))
        print(' srcdir : %s' % (self.srcdir,))
        print(' destdir : %s' % (self.destdir,))
        print(' spread : %s' % (self.spread,))
        print(' compression : %s' % (self.compression,))
        print(' classes : %s' % (self.classes,))
        print(' maxproc : %s' % (self.maxproc,))
        print(' maxmem : %s' % (self.maxmem,))
        print(' type : %s' % (self.type,))
        print(' memory : %s' % (self.memory,))
        print(' services : %s' % (self.services,))
        print(' randomseed : %s' % (self.randomseed,))

        self.run()
| |
# Script entry point: pass the command-line arguments (without the program
# name) to a fresh Prepare instance.
if __name__ == '__main__':
    Prepare().main(sys.argv[1:])