| #! /usr/local/bin/python |
| |
| import sys |
| import datetime |
| |
| import pdb |
| #pdb.set_trace() |
| |
| # Test and declaration generator for linear_interpolation |
| |
| # Utility |
| |
| def kwot(s): |
| """Single quote a string, doubling contained quotes as needed.""" |
| return "'" + "''".join(s.split("'")) + "'" |
| |
| # Following section is just fragile helper functions to produce triples |
| # of values (v0, v, v1) such that, for any choice of scale and transform |
| # (scale applied first, transform last): |
| # |
| # (v-v0)/(v1-v) = p1/p2 |
| # |
| # The idea is that, by keeping p1 and p2 uniform in a test set, we can |
| # know the result of linear interpolation independently of the actual |
| # function linear_interpolate and include it in the regressions to make |
| # it easy to identify issues. |
| |
| def int_triple(p1, p2, scale, transform): |
| return tuple( [x*scale + transform for x in [0, p1, p1+p2]] ) |
| |
| def num_triple(p1, p2, scale, transform): |
| return tuple( [str(x) for x in int_triple(p1, p2, scale, transform)] ) |
| |
| def date_triple(p1, p2, scale, transform, basedate): |
| i = int_triple(p1, p2, scale, transform) |
| g = basedate.toordinal() |
| d = [datetime.date.fromordinal(x + g) for x in i] |
| return tuple( [kwot(x.isoformat()) for x in d] ) |
| |
| def time_offset_by_minutes(t, d): |
| x = t.hour * 60 + t.minute + d |
| h = x / 60 |
| m = x - 60 * h |
| h = h - 24 * (h / 24) |
| return datetime.time(hour = h, minute = m) |
| |
| def time_triple(p1, p2, scale, transform, basetime): |
| i = int_triple(p1, p2, scale, transform) |
| t = [ time_offset_by_minutes(basetime, x) for x in i] |
| return tuple( [kwot(x.isoformat()) for x in t] ) |
| |
| def datetime_triple(p1, p2, scale, transform, basestamp): |
| i = int_triple(p1, p2, scale, transform) |
| d = [ datetime.timedelta(minutes=x) for x in i ] |
| return [ kwot( (basestamp + x).isoformat() ) for x in d] |
| |
| def interval_triple(p1, p2, scale, transform, baseminutes): |
| i = int_triple(p1, p2, scale, transform+baseminutes) |
| return tuple( [ kwot(str(x) + ' minutes') for x in i ] ) |
| |
| |
| # The following table drives tests and declarations per data type. |
| # The keys are type SQL type names that are known to cattulus.pl. |
| # The values are tuples of |
| # - a 3-tuple of values in SQL form (no casts): low, middle, high. |
| # - a 3-tuple of proportionally spread values like the first. |
| # - an appropriate C type declarator (beware Interval's pointer-ness). |
| # - position as in index into array of types. |
| # |
| # The position value in important to ensure OID consistency. |
| # |
| # The delta values fix the proportions of the 3-tuple values. |
| |
| delta1 = 1 |
| delta2 = 4 |
| |
| type_dict = { |
| 'int8' : ( |
| num_triple(delta1, delta2, 100, 100), |
| num_triple(delta1, delta2, 50, 2000), |
| 'int64', |
| 0), |
| 'int4' : ( |
| num_triple(delta1, delta2, 100, 100), |
| num_triple(delta1, delta2, 50, 2000), |
| 'int32', |
| 1), |
| 'int2' : ( |
| num_triple(delta1, delta2, 100, 100), |
| num_triple(delta1, delta2, 50, 2000), |
| 'int2', |
| 2), |
| 'float8' : ( |
| num_triple(delta1, delta2, 100, 100), |
| num_triple(delta1, delta2, 50, 2000), |
| 'float8', |
| 3), |
| 'float4' : ( |
| num_triple(delta1, delta2, 100, 100), |
| num_triple(delta1, delta2, 50, 2000), |
| 'float4', |
| 4), |
| 'date' : ( |
| date_triple(delta1, delta2, 10, 10, datetime.date(2001, 1, 1)), |
| date_triple(delta1, delta2, 10, 20, datetime.date(2010, 1, 1)), |
| 'DateADT', |
| 5), |
| 'time' : ( |
| time_triple(delta1, delta2, 5, 20, datetime.time(hour=10)), |
| time_triple(delta1, delta2, 10, 300, datetime.time(hour=10)), |
| 'TimeADT', |
| 6), |
| 'timestamp' : ( |
| datetime_triple(delta1, delta2, 1000, 2000, datetime.datetime(2010, 1, 1)), |
| datetime_triple(delta1, delta2, 5000, 1000, datetime.datetime(2012, 6, 1)), |
| 'Timestamp', |
| 7), |
| 'timestamptz' : ( |
| datetime_triple(delta1, delta2, 1000, 2000, datetime.datetime(2010, 1, 1)), |
| datetime_triple(delta1, delta2, 5000, 1000, datetime.datetime(2012, 6, 1)), |
| 'TimestampTz', |
| 8), |
| 'interval' : ( |
| interval_triple(delta1, delta2, 20, 10, 55), |
| interval_triple(delta1, delta2, 10, 20, 30), |
| 'Interval', |
| 9), |
| 'numeric' : ( |
| num_triple(delta1, delta2, 100, 100), |
| num_triple(delta1, delta2, 50, 2000), |
| 'Numeric', |
| 10), |
| } |
| |
| |
| # For OID assignment we choose to order the types and assign ascending |
| # OID values starting from a base. The caller is responsible for setting |
| # base and maintaining the order of assignment. |
| |
| oid_base = 6072 |
| |
| def ordered_types(): |
| """List of types in canonical order of OID assignment.""" |
| keys = type_dict.keys() |
| ordered_keys = [None for k in keys] |
| for key in keys: |
| pos = type_dict[key][3] |
| ordered_keys[pos] = key |
| assert not None in ordered_keys |
| return ordered_keys |
| |
| # Convenient wrapper for one of the table's 3-tuples. |
| class Coordinate(object): |
| |
| def __init__(self, ctype, lmhtuple): |
| |
| self._type = ctype |
| (self._low, self._mid, self._high) = lmhtuple |
| |
| def low(self): |
| return self._low + '::' + self._type |
| |
| def mid(self): |
| return self._mid + '::' + self._type |
| |
| def high(self): |
| return self._high + '::' + self._type |
| |
| |
| def linear_interpolate(x, y, x0, y0, x1, y1): |
| """Format a call to linear_interpolate for the given arguments and expected result. |
| """ |
| |
| fmt = """ |
| select |
| linear_interpolate({x}, {x0}, {y0}, {x1}, {y1}), |
| {y} as answer, |
| {y} = linear_interpolate({x}, {x0}, {y0}, {x1}, {y1}) as match ;""" |
| |
| return fmt.format(x=x, y=y, x0=x0, y0=y0, x1=x1, y1=y1) |
| |
| def preliminary_tests(verbose): |
| """ Preliminary tests (SQL), optionally verbose, as '\n'-delimited string. |
| """ |
| |
| prelim = """ |
| -- A "generic" unsupported type. |
| |
| select linear_interpolate('x'::text, 'x0'::text, 0, 'x1'::text, 1000); |
| select linear_interpolate(5, 0, 'y0'::text, 100, 'y1'::text); |
| |
| |
| -- Check that "divide by zero" returns null""" |
| |
| prelim = prelim.split('\n') |
| fmt = """select linear_interpolate({x}, {x0}, {y0}, {x1}, {y1});""" |
| |
| for t in type_dict: |
| |
| a = Coordinate(t, type_dict[t][0]) |
| o = Coordinate('int4', type_dict['int4'][1]) |
| s = fmt.format(x=a.mid(), x0=a.low(), y0=o.low(), x1=a.low(), y1=o.high()) |
| prelim = prelim + s.split('\n') |
| |
| return '\n'.join(prelim) |
| |
| def basic_tests(abscissa_type, ordinate_type, verbose): |
| """Per-type tests (SQL), optionally verbose, as '\n'-delimited string. |
| """ |
| |
| abscissa = Coordinate(abscissa_type, type_dict[abscissa_type][0]) |
| ordinate = Coordinate(ordinate_type, type_dict[ordinate_type][1]) |
| |
| if verbose: |
| lst = [ |
| '', |
| '\qecho', |
| '\qecho Check interpolation correctness: %s --> %s' % (abscissa_type, ordinate_type), |
| ] |
| prolog = [] |
| else: |
| lst = [] |
| prolog = [ |
| '', |
| '', |
| '-- Abscissa: %s, Ordinate: %s' % (abscissa_type, ordinate_type), |
| '-- Check correctness - all results should have match = t' |
| ] |
| |
| # Use the triples in all combinations to test outlying cases as well as "sweet |
| # spot" cases. |
| |
| tst = linear_interpolate( |
| abscissa.mid(), ordinate.mid(), |
| abscissa.low(), ordinate.low(), |
| abscissa.high(), ordinate.high() ) |
| lst = lst + tst.split('\n') |
| |
| tst = linear_interpolate( |
| abscissa.mid(), ordinate.mid(), |
| abscissa.high(), ordinate.high(), |
| abscissa.low(), ordinate.low() ) |
| lst = lst + tst.split('\n') |
| |
| tst = linear_interpolate( |
| abscissa.low(), ordinate.low(), |
| abscissa.mid(), ordinate.mid(), |
| abscissa.high(), ordinate.high() ) |
| lst = lst + tst.split('\n') |
| |
| tst = linear_interpolate( |
| abscissa.low(), ordinate.low(), |
| abscissa.high(), ordinate.high(), |
| abscissa.mid(), ordinate.mid() ) |
| lst = lst + tst.split('\n') |
| |
| tst = linear_interpolate( |
| abscissa.high(), ordinate.high(), |
| abscissa.mid(), ordinate.mid(), |
| abscissa.low(), ordinate.low() ) |
| lst = lst + tst.split('\n') |
| |
| tst = linear_interpolate( |
| abscissa.high(), ordinate.high(), |
| abscissa.low(), ordinate.low(), |
| abscissa.mid(), ordinate.mid() ) |
| lst = lst + tst.split('\n') |
| |
| # Include one trivial case |
| tst = linear_interpolate( |
| abscissa.mid(), ordinate.mid(), |
| abscissa.mid(), ordinate.mid(), |
| abscissa.mid(), ordinate.mid() ) |
| lst = lst + tst.split('\n') |
| |
| return '\n'.join(prolog + lst) |
| |
| def all_tests(verbose): |
| result = preliminary_tests(verbose) |
| |
| for abscissa_type in type_dict: |
| result = result + basic_tests(abscissa_type, 'int4', verbose) |
| |
| for ordinate_type in type_dict: |
| result = result + basic_tests('int4', ordinate_type, verbose) |
| |
| return result |
| |
| def regression_tests(): |
| return all_tests(False) |
| |
| def readable_tests(): |
| return all_tests(True) |
| |
| declared_description = 'linear interpolation: x, x0,y0, x1,y1' |
| |
| def pg_proc_declarations(): |
| template = """ |
| CREATE FUNCTION linear_interpolate( |
| anyelement, |
| anyelement, |
| {T}, |
| anyelement, |
| {T} |
| ) |
| RETURNS {T} |
| LANGUAGE internal IMMUTABLE STRICT |
| AS 'linterp_{t}' |
| WITH (OID={oid}, DESCRIPTION="{desc}");""" |
| |
| |
| |
| result = ['-- for cdb_pg/src/include/catalog/pg_proc.sql', ''] |
| |
| fmt = ' '.join([x.strip() for x in template.split('\n')]) |
| next_oid = oid_base |
| |
| all_types = ordered_types() |
| |
| for sql_type in all_types: |
| c_type = type_dict[sql_type][2] |
| result = result + [fmt.format(T=sql_type, t=c_type, oid=str(next_oid), desc=declared_description)] |
| next_oid = next_oid + 1 |
| |
| return '\n'.join(result) |
| |
| def upgrade_declarations(): |
| upgrade_1 = """ |
| CREATE FUNCTION @gpupgradeschemaname@.linear_interpolate( |
| anyelement, |
| anyelement, |
| {T}, |
| anyelement, |
| {T} |
| ) |
| RETURNS {T} |
| LANGUAGE internal IMMUTABLE STRICT |
| AS 'linterp_{t}' |
| WITH (OID={oid}, DESCRIPTION="{desc}");""" |
| |
| upgrade_2 = """ |
| COMMENT ON FUNCTION @gpupgradeschemaname@.linear_interpolate( |
| anyelement, |
| anyelement, |
| {T}, |
| anyelement, |
| {T} |
| ) |
| IS '{desc}';""" |
| |
| result = ['-- for src/test/regress/data/upgradeXX/upg2_catupgrade_XXX.sql.in', ''] |
| |
| upg_1 = ' '.join([x.strip() for x in upgrade_1.split('\n')]) |
| upg_2 = ' '.join([x.strip() for x in upgrade_2.split('\n')]) |
| next_oid = oid_base |
| |
| all_types = ordered_types() |
| |
| for sql_type in all_types: |
| c_type = type_dict[sql_type][2] |
| result.append( upg_1.format(T=sql_type, t=c_type, oid=str(next_oid), desc=declared_description) ) |
| result.append( upg_2.format(T=sql_type, t=c_type, oid=str(next_oid), desc=declared_description) ) |
| result.append( '' ) |
| next_oid = next_oid + 1 |
| |
| return '\n'.join(result) |
| |
| # |
| # Interpret the arguments, write result to standard out. |
| |
| def main(): |
| argmap = { |
| 'readable' : readable_tests, |
| 'regression' : regression_tests, |
| 'pg_proc' : pg_proc_declarations, |
| 'upgrade' : upgrade_declarations |
| } |
| efmt = 'argument must be one of (%s), not "%s"' % (', '.join(argmap.keys()), "%s") |
| |
| if len(sys.argv) == 1: |
| fn = argmap['readable'] |
| elif len(sys.argv) == 2 and sys.argv[1] in argmap: |
| fn = argmap[sys.argv[1]] |
| else: |
| sys.exit(efmt % ' '.join(sys.argv[1:])) |
| |
| print fn() |
| |
| if __name__ == '__main__': |
| main() |
| |