blob: ffe8d10a0754fb193d38f9187e3ce732f6e3fc58 [file] [log] [blame]
#!/usr/bin/env python3
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
#
# The algorithms in this file are lifted from:
# http://votesystem.cvs.sourceforge.net/viewvc/votesystem/votesystem/src/VoteMain.java?view=markup
# ... there is possibly some room for a "more Pythonic" approach. The
# conversion below was done in a straight-forward manner to ensure a
# true and correct algorithm conversion. The debug output precisely
# matches the output of VoteMain.java.
#
import sys
import os.path
import random
import argparse
import configparser
import re
import functools
ELECTED = 1
HOPEFUL = 2
ELIMINATED = 4
ALMOST = 8
ERROR_MARGIN = 0.00001
BILLIONTH = 0.000000001
RE_VOTE = re.compile(r'\[.{19}\]\s+'
r'(?P<voterhash>[\w\d]{32})\s+'
r'(?P<votes>[a-z]{1,26})',
re.I)
VERBOSE = False
#@deprecated
def load_votes(fname):
line = open(fname).readline()
if line.strip() == 'rank order':
lines = open(fname).readlines()
# The input file was processed by nstv-rank.py, or somehow otherwise
# converted to the standard input for VoteMain.jar
names = [s.strip() for s in lines[1].strip().split(',')][1:]
labels = [s.strip() for s in lines[2].strip().split(',')][1:]
assert len(names) == len(labels)
remap = dict(zip(labels, names))
votes = { }
for line in lines[3:]:
parts = line.strip().split(',')
votes[parts[0]] = [remap[l] for l in parts[1:]]
return names, votes
ini_fname = os.path.join(os.path.dirname(fname),
'board_nominations.ini')
labelmap = read_labelmap(ini_fname)
# Construct a label-sorted list of names from the labelmap.
names = [name for _, name in sorted(labelmap.items())]
# Load the raw votes that were recorded.
votes_by_label = read_votefile(fname)
# Remap all labels to names in the votes.
# NOTE: v represents the voter hash. (### why return this?)
votes = dict((v, [labelmap[l] for l in vote])
for v, vote in votes_by_label.items())
return names, votes
def read_votefile(fname):
votes = { }
for line in open(fname).readlines():
match = RE_VOTE.match(line)
if match:
# For a given voter hashcode, record their latest set of votes.
votes[match.group('voterhash')] = match.group('votes')
### we should discard voterhash, and just return .values()
return votes
#@deprecated
def read_nominees(votefile):
ini_fname = os.path.join(os.path.dirname(votefile),
'board_nominations.ini')
return read_labelmap(ini_fname)
def read_labelmap(fname):
if not os.path.exists(fname):
print(f'ERROR: "{fname}" could not be found.', file=sys.stderr)
sys.exit(2)
config = configparser.ConfigParser()
config.read(fname)
try:
return dict(config.items('nominees'))
except:
print(f'ERROR: could not process input file, "{fname}".', file=sys.stderr)
sys.exit(2)
#@deprecated
def run_vote(names, votes, num_seats):
# List of votestrings, each as a list of ordered name choices.
ordered_votes = votes.values()
return run_stv(names, ordered_votes, num_seats)
def run_stv(names: list, ordered_votes: list, num_seats: int):
# NOTE: NAMES must be a list for repeatability purposes. It does not
# need to obey any particular ordering rules, but when re-running
# tallies, NAMES must be presented with the same ordering.
assert len(set(names)) == len(names), "duplicates present!"
assert num_seats > 0
candidates = CandidateList(names)
# name -> Candidate
remap = dict((c.name, c) for c in candidates.l)
# We can test that ordering of voters has no bearing. Perform runs
# and alter the .seed() value.
#ordered_votes = list(ordered_votes); random.seed(1); random.shuffle(ordered_votes); print('VOTE:', ordered_votes[0])
# VOTES is a list of ordered-lists of Candidate objects
votes = [[remap[n] for n in choices] for choices in ordered_votes]
if candidates.count(ELECTED + HOPEFUL) <= num_seats:
dbg('All candidates elected')
candidates.change_state(HOPEFUL, ELECTED)
return candidates
quota = None # not used on first pass
iteration = 1
while candidates.count(ELECTED) < num_seats:
dbg('Iteration %d', iteration)
iteration += 1
quota = iterate_one(quota, votes, candidates, num_seats)
candidates.reverse_random()
dbg('All seats full')
candidates.change_state(HOPEFUL, ELIMINATED)
return candidates
class CandidateList(object):
def __init__(self, names):
num_cand = len(names)
randset = generate_random(num_cand)
#dbg('RANDSET: %s', randset)
self.l = [ ]
for n, r in zip(names, randset):
c = Candidate(n, r, num_cand-1)
self.l.append(c)
def count(self, state):
return sum(int((c.status & state) != 0) for c in self.l)
def change_state(self, from_state, to_state):
any_changed = False
for c in self.l:
if (c.status & from_state) != 0:
c.status = to_state
if to_state == ELECTED:
c.elect()
elif to_state == ELIMINATED:
c.eliminate()
any_changed = True
return any_changed
def reverse_random(self):
# Flip the values to create a different ordering among candidates. Note
# that this will alternate the domain between [0.0, 1.0) and (0.0, 1.0]
for c in self.l:
c.rand = 1.0 - c.rand
def adjust_weights(self, quota):
for c in self.l:
if c.status == ELECTED:
c.adjust_weight(quota)
def print_results(self):
for c in self.l:
print('%-40s%selected' % (c.name, c.status == ELECTED and ' ' or ' not '))
def dbg_display_tables(self, excess):
total = excess
for c in self.l:
dbg('%-20s %15.9f %15.9f', c.name, c.weight, c.vote)
total += c.vote
dbg('%-20s %15s %15.9f', 'Non-transferable', ' ', excess)
dbg('%-20s %15s %15.9f', 'Total', ' ', total)
def calc_aheads(self):
def compare(c1, c2):
if c1.ahead < c2.ahead:
return -1
if c1.ahead == c2.ahead:
### expression for removed cmp() function
return (c1.vote > c2.vote) - (c1.vote < c2.vote)
return 1
### the algorithm below seems very obtuse, to produce very little
### change. It alters .ahead on the first iteration, then does
### minor tweaks afterwards. And it seems to only work pair-wise.
### It feels this could be simplified.
### refer to STV.java::calcAheads()
c_sorted = sorted(self.l, key=functools.cmp_to_key(compare))
last = 0
for i in range(1, len(c_sorted)+1):
if i == len(c_sorted) or c_sorted[last].vote != c_sorted[i].vote:
for c in c_sorted[last:i]:
c.ahead = (i - 1) + last
last = i
def apply_votes(self, votes):
# Reset each candidates vote 0.0
for c in self.l:
c.vote = 0.0
# Each voter has 1 vote. Due to candidate weighting, it might not
# get fully-assigned to candidates. We need to remember this excess.
excess = 0.0
# Now, process that 1 vote.
for choices in votes:
vote = 1.0
# Distribute the vote, according to their ordered wishes.
for c in choices:
if c.status == HOPEFUL:
c.vote += vote
vote = 0.0
break
if c.status != ELIMINATED:
wv = c.weight * vote # weighted vote
c.vote += wv
vote -= wv
# Note: should probably test for floating point margins, but
# it's fine to just let this spill into the EXCESS value.
if vote == 0.0: # nothing left to distribute
break
excess += vote
# Done. Tell caller what we could not distribute.
return excess
class Candidate(object):
def __init__(self, name, rand, ahead):
self.name = name
self.rand = rand
self.ahead = ahead
self.status = HOPEFUL
self.weight = 1.0
self.vote = None # calculated later
def elect(self):
self.status = ELECTED
dbg('Elected: %s', self.name)
def eliminate(self):
self.status = ELIMINATED
self.weight = 0.0
dbg('Eliminated: %s', self.name)
def adjust_weight(self, quota):
assert quota is not None
self.weight = (self.weight * quota) / self.vote
def iterate_one(quota, votes, candidates, num_seats):
# assume that: count(ELECTED) < num_seats
if candidates.count(ELECTED + HOPEFUL) <= num_seats:
dbg('All remaining candidates elected')
candidates.change_state(HOPEFUL, ELECTED)
return None
candidates.adjust_weights(quota)
changed, new_quota, surplus = recalc(votes, candidates, num_seats)
if not changed and surplus < ERROR_MARGIN:
dbg('Remove Lowest (forced)')
exclude_lowest(candidates.l)
return new_quota
def recalc(votes, candidates, num_seats):
excess = candidates.apply_votes(votes)
a1 = [c.ahead for c in candidates.l]
candidates.calc_aheads()
a2 = [c.ahead for c in candidates.l]
if a1 != a2:
pass #print(f'CHANGE:\n {a1}\n {a2}')
candidates.dbg_display_tables(excess)
quota = calc_quota(len(votes), excess, num_seats)
any_changed = elect(quota, candidates, num_seats)
surplus = calc_surplus(quota, candidates)
any_changed |= try_remove_lowest(surplus, candidates)
return any_changed, quota, surplus
#@deprecated
def calc_totals(votes, candidates):
return candidates.apply_votes(votes)
#@deprecated
def calc_aheads(candidates):
candidates.calc_aheads()
def calc_quota(num_voters, excess, num_seats):
if num_seats > 2:
quota = (float(num_voters) - excess) / (float(num_seats + 1) + BILLIONTH)
else:
quota = (float(num_voters) - excess) / float(num_seats + 1)
if quota < ERROR_MARGIN:
raise Exception('Internal Error - very low quota')
dbg('Quota = %.9f', quota)
return quota
def elect(quota, candidates, num_seats):
for c in candidates.l:
if c.status == HOPEFUL and c.vote >= quota:
c.status = ALMOST
any_changed = False
while candidates.count(ELECTED + ALMOST) > num_seats:
dbg('Vote tiebreaker! voters: %d seats: %d',
candidates.count(ELECTED + ALMOST), num_seats)
candidates.change_state(HOPEFUL, ELIMINATED)
exclude_lowest(candidates.l)
any_changed = True # we changed the candidates via exclude_lowest()
any_changed |= candidates.change_state(ALMOST, ELECTED)
return any_changed
def calc_surplus(quota, candidates):
surplus = sum(c.vote - quota for c in candidates.l if c.status == ELECTED)
dbg('Total Surplus = %.9f', surplus)
return surplus
def try_remove_lowest(surplus, candidates):
lowest1 = 1e18
lowest2 = 1e18
which = None
for c in candidates.l:
if c.status == HOPEFUL and c.vote < lowest1:
lowest1 = c.vote
which = c
for c in candidates.l:
if c.status != ELIMINATED and c.vote > lowest1 and c.vote < lowest2:
lowest2 = c.vote
diff = lowest2 - lowest1
if diff >= 0.0:
dbg('Lowest Difference = %.9f - %.9f = %.9f', lowest2, lowest1, diff)
if diff > surplus:
dbg('Remove Lowest (unforced)')
which.eliminate()
return True
return False
def exclude_lowest(candidates):
which = None
use_rand = False
for c in candidates:
if c.status == HOPEFUL or c.status == ALMOST:
if which is None or c.ahead < which.ahead:
which = c
use_rand = False
elif c.ahead == which.ahead:
use_rand = True
if c.rand < which.rand:
which = c
if use_rand:
dbg('Random choice used!')
assert which
which.eliminate()
def generate_random(count):
random.seed(0) ### choose a seed based on input? for now: repeatable.
while True:
# Generate COUNT values in [0.0, 1.0)
# NOTE: use a list (not a set or dict) for repeatable ordering.
values = [random.random() for x in range(count)]
# Use a set() to check for dups. If no dups, then return the values.
if len(set(values)) == count:
return values
def dbg(fmt, *args):
if VERBOSE:
print(fmt % args)
def main(argv):
parser = argparse.ArgumentParser(description="Calculate a winner for a vote")
parser.add_argument('raw_file')
parser.add_argument("-s", "--seats", dest="seats", type=int,
help="Number of seats available, default 9",
default=9)
parser.add_argument("-v", "--verbose", dest="verbose", action="store_true",
help="Enable verbose logging", default=False)
args = parser.parse_args(argv)
global VERBOSE
VERBOSE = args.verbose
if not os.path.exists(args.raw_file):
parser.print_help()
sys.exit(1)
ini_fname = os.path.join(os.path.dirname(args.raw_file),
'board_nominations.ini')
labelmap = read_labelmap(ini_fname)
# Construct a label-sorted list of names from the labelmap.
names = [name for _, name in sorted(labelmap.items())]
# Turn votes using labels into by-name.
votes_by_label = read_votefile(args.raw_file).values()
votes = [[labelmap[l] for l in vote] for vote in votes_by_label]
candidates = run_stv(names, votes, args.seats)
candidates.print_results()
print('Done!')
if __name__ == '__main__':
main(sys.argv[1:])