import hashlib
import json
import os
import random
import time
from lib import constants
import elasticsearch
class SteveESWrapper(object):
    """
       Class for rewriting old-style queries to the new ones,
       where doc_type is an integral part of the DB name.

       Every call that takes an (index, doc_type) pair is forwarded to the
       wrapped client with the index renamed to "<index>_<doc_type>" and the
       doc_type fixed to '_doc'.
    """

    def __init__(self, ES):
        "Wrap ES, the client object that all calls are forwarded to"
        self.ES = ES

    def _target(self, index, doc_type):
        "Build the rewritten index name for an old-style (index, doc_type) pair"
        return index + '_' + doc_type

    def get(self, index, doc_type, id):
        "Fetch a document by id from the rewritten index"
        return self.ES.get(index = self._target(index, doc_type), doc_type = '_doc', id = id)

    def exists(self, index, doc_type, id):
        "Ask the wrapped client whether a document with this id exists"
        return self.ES.exists(index = self._target(index, doc_type), doc_type = '_doc', id = id)

    def delete(self, index, doc_type, id):
        "Delete a document by id from the rewritten index"
        return self.ES.delete(index = self._target(index, doc_type), doc_type = '_doc', id = id)

    def index(self, index, doc_type, id = None, body = None):
        "Store body in the rewritten index; id is passed through as given (may be None)"
        return self.ES.index(index = self._target(index, doc_type), doc_type = '_doc', id = id, body = body)

    def update(self, index, doc_type, id, body):
        "Forward an update for the document with this id"
        return self.ES.update(index = self._target(index, doc_type), doc_type = '_doc', id = id, body = body)

    def scroll(self, scroll_id, scroll):
        "Forward a scroll request unchanged"
        return self.ES.scroll(scroll_id = scroll_id, scroll = scroll)

    def delete_by_query(self, **kwargs):
        "Forward delete_by_query unchanged - NOTE: no index rewriting happens here"
        return self.ES.delete_by_query(**kwargs)

    def search(self, index, doc_type, size = 100, scroll = None, _source_include = None, body = None, q = None, sort = None):
        """
           Search the rewritten index.

           If q is given, it replaces body with a query_string query.
           If sort is given (and there is a body to attach it to), results are
           sorted ascending on that field, with '.keyword' appended to the
           field name unless it already contains it. A sort without any
           body/q is ignored.
        """
        if q:
            body = {
                "query": {
                    "query_string": {
                        "query": q
                    }
                }
            }
        if sort and body:
            if '.keyword' not in sort:
                sort = sort + ".keyword"
            # Work on a shallow copy so a caller-supplied body is not modified
            body = dict(body)
            body['sort'] = [
                { sort: 'asc'}
            ]
        return self.ES.search(
            index = self._target(index, doc_type),
            doc_type = '_doc',
            size = size,
            scroll = scroll,
            _source_include = _source_include,
            body = body
            )

    def count(self, index, doc_type = '*', body = None):
        "Count documents; the default doc_type of '*' yields the index pattern '<index>_*'"
        return self.ES.count(
            index = self._target(index, doc_type),
            doc_type = '_doc',
            body = body
            )
class SteveESWrapperSeven(object):
    """
       Class for rewriting old-style queries to the >= 7.x ones,
       where doc_type is an integral part of the DB name and NO DOC_TYPE!

       Every call that takes an (index, doc_type) pair is forwarded to the
       wrapped client with the index renamed to "<index>_<doc_type>"; no
       doc_type argument is sent at all.
    """

    def __init__(self, ES):
        "Wrap ES, the client object that all calls are forwarded to"
        self.ES = ES

    def _target(self, index, doc_type):
        "Build the rewritten index name for an old-style (index, doc_type) pair"
        return index + '_' + doc_type

    def get(self, index, doc_type, id):
        "Fetch a document by id from the rewritten index"
        return self.ES.get(index = self._target(index, doc_type), id = id)

    def exists(self, index, doc_type, id):
        "Ask the wrapped client whether a document with this id exists"
        return self.ES.exists(index = self._target(index, doc_type), id = id)

    def delete(self, index, doc_type, id):
        "Delete a document by id from the rewritten index"
        return self.ES.delete(index = self._target(index, doc_type), id = id)

    def index(self, index, doc_type, id = None, body = None):
        "Store body in the rewritten index; id is passed through as given (may be None)"
        return self.ES.index(index = self._target(index, doc_type), id = id, body = body)

    def update(self, index, doc_type, id, body):
        "Forward an update for the document with this id"
        return self.ES.update(index = self._target(index, doc_type), id = id, body = body)

    def scroll(self, scroll_id, scroll):
        "Forward a scroll request unchanged"
        return self.ES.scroll(scroll_id = scroll_id, scroll = scroll)

    def delete_by_query(self, **kwargs):
        "Forward delete_by_query unchanged - NOTE: no index rewriting happens here"
        return self.ES.delete_by_query(**kwargs)

    def search(self, index, doc_type, size = 100, scroll = None, _source_include = None, body = None, q = None, sort = None):
        """
           Search the rewritten index.

           If q is given, it replaces body with a query_string query.
           If sort is given (and there is a body to attach it to), results are
           sorted ascending on that field, with '.keyword' appended to the
           field name unless it already contains it. A sort without any
           body/q is ignored.
           The _source_include argument is forwarded under the name
           _source_includes.
        """
        if q:
            body = {
                "query": {
                    "query_string": {
                        "query": q
                    }
                }
            }
        if sort and body:
            if '.keyword' not in sort:
                sort = sort + ".keyword"
            # Work on a shallow copy so a caller-supplied body is not modified
            body = dict(body)
            body['sort'] = [
                { sort: 'asc'}
            ]
        return self.ES.search(
            index = self._target(index, doc_type),
            size = size,
            scroll = scroll,
            _source_includes = _source_include,
            body = body
            )

    def count(self, index, doc_type = '*', body = None):
        "Count documents; the default doc_type of '*' yields the index pattern '<index>_*'"
        return self.ES.count(
            index = self._target(index, doc_type),
            body = body
            )
class SteveDatabase(object):
    """
       Holds the Elasticsearch client used by the backend.

       After construction, self.ES is either the raw client (server major
       version below 6) or one of the wrapper classes that rewrite
       (index, doc_type) pairs for newer servers.
    """

    def __init__(self, config):
        """
           Connect using the [elasticsearch] section of config.

           config must provide get(section, option); the options read here are
           'index', 'host', 'port' and 'secure'.
        """
        self.config = config
        self.dbname = config.get('elasticsearch','index')
        self.ES = elasticsearch.Elasticsearch([{
                'host': config.get('elasticsearch', 'host'),
                'port': int(config.get('elasticsearch','port')),
                # NOTE(review): config.get() on a ConfigParser returns a string, so
                # any non-empty value (even "false") would be truthy - confirm.
                'use_ssl': config.get('elasticsearch', 'secure'),
                # Certificate verification is switched off for this connection.
                'verify_certs': False,
            }])

        # IMPORTANT BIT: Figure out if this is ES < 6.x, 6.x or >= 7.x.
        # If it is 6.x or newer, we're using the new ES DB mappings, and need
        # to adjust ALL ES calls to match this.
        self.ESversion = int(self.ES.info()['version']['number'].split('.')[0])
        if self.ESversion >= 7:
            self.ES = SteveESWrapperSeven(self.ES)
        elif self.ESversion >= 6:
            self.ES = SteveESWrapper(self.ES)
class ElasticSearchBackend:
    """
       Storage backend that keeps elections, issues, votes, vote history and
       voters as Elasticsearch documents, all addressed through self.DB.ES
       with self.index as the base index name.
    """
    es = None

    def __init__(self, config):
        " Init - get config and turn it into an ES instance"
        self.index = config.get("elasticsearch", "index") if config.has_option("elasticsearch", "index") else "steve"
        self.DB = SteveDatabase(config)

        # The index management API lives on the raw client. self.DB.ES is a
        # wrapper (exposing the raw client as .ES) only for ES >= 6, so fall
        # back to self.DB.ES itself when it is not wrapped.
        raw_client = getattr(self.DB.ES, 'ES', self.DB.ES)

        # Check that we have a 'steve' index. If not, create it.
        if not raw_client.indices.exists(index = self.index):
            raw_client.indices.create(index = self.index, body = {
                    "settings": {
                        "number_of_shards" : 3,
                        "number_of_replicas" : 1
                    }
                }
            )

    @staticmethod
    def _sha224(text):
        "Hex SHA-224 digest of text; non-bytes input is encoded as UTF-8 first"
        if not isinstance(text, bytes):
            text = text.encode('utf-8')
        return hashlib.sha224(text).hexdigest()

    def _hits(self, doc_type, **kwargs):
        "Run a search against doc_type and return the list of raw hits"
        res = self.DB.ES.search(index=self.index, doc_type=doc_type, **kwargs)
        return res['hits']['hits']

    def document_exists(self, election, *issue):
        "Does this election or issue exist?"
        doc = "elections"
        eid = election
        # A non-empty first extra argument switches the check to that issue
        if issue and issue[0]:
            doc = "issues"
            eid = self._sha224(election + "/" + issue[0])
        return self.DB.ES.exists(index=self.index, doc_type=doc, id=eid)

    def get_basedata(self, election):
        "Get base data from an election (None if the lookup returns nothing)"
        res = self.DB.ES.get(index=self.index, doc_type="elections", id=election)
        if res:
            return res['_source']
        return None

    def close(self, election, reopen = False):
        "Mark an election as closed, or as open again when reopen is True"
        basedata = self.get_basedata(election)
        if reopen == True:
            basedata['closed'] = False
        else:
            basedata['closed'] = True
        self.DB.ES.index(index=self.index, doc_type="elections", id=election, body = basedata )

    def issue_get(self, electionID, issueID):
        "Get JSON data from an issue, plus the SHA-224 hex digest of its JSON dump"
        issuedata = None
        ihash = ""
        iid = self._sha224(electionID + "/" + issueID)
        res = self.DB.ES.get(index=self.index, doc_type="issues", id=iid)
        if res:
            issuedata = res['_source']
            ihash = self._sha224(json.dumps(issuedata))
        return issuedata, ihash

    def votes_get(self, electionID, issueID):
        "Read votes and return as a dict mapping each vote's key to its vote"
        hits = self._hits("votes", q = "election:%s AND issue:%s" % (electionID, issueID), size = 9999)
        return dict((entry['_source']['key'], entry['_source']['data']['vote']) for entry in hits)

    def votes_get_raw(self, electionID, issueID):
        "Read votes and return raw format (list of vote documents)"
        hits = self._hits("votes", q = "election:%s AND issue:%s" % (electionID, issueID), size = 9999)
        if hits:
            return [entry['_source'] for entry in hits]
        # NOTE(review): the empty result is a dict while the non-empty one is
        # a list; kept as-is so existing callers see the same value.
        return {}

    def vote_history(self, electionID, issueID):
        "Read vote history and return raw format, sorted on data.timestamp"
        hits = self._hits("vote_history", sort = "data.timestamp", q = "election:%s AND issue:%s" % (electionID, issueID), size = 9999)
        return [entry['_source'] for entry in hits]

    def election_create(self,electionID, basedata):
        "Create a new election"
        self.DB.ES.index(index=self.index, doc_type="elections", id=electionID, body = basedata)

    def election_update(self,electionID, basedata):
        "Update an election with new data"
        self.DB.ES.index(index = self.index, doc_type = "elections", id=electionID, body = basedata)

    def issue_update(self,electionID, issueID, issueData):
        "Update an issue with new data"
        self.DB.ES.index(index = self.index, doc_type = "issues", id=self._sha224(electionID + "/" + issueID), body = issueData)

    def issue_list(self, election):
        "List the ids of all issues in an election"
        hits = self._hits("issues", sort = "id", q = "election:%s" % election, size = 999, _source_include = 'id')
        return [entry['_source']['id'] for entry in hits]

    def election_list(self):
        "List the ids of all elections"
        elections = []
        try:
            for entry in self._hits("elections", sort = "id", q = "*", size = 9999):
                source = entry['_source']
                elections.append(source['id'])
        except Exception:
            pass # THIS IS OKAY! On initial setup, this WILL fail until an election has been created
        return elections

    def vote(self,electionID, issueID, uid, vote, vhash = None):
        "Casts a vote on an issue"
        eid = self._sha224(electionID + ":" + issueID + ":" + uid)
        now = time.time()
        # An explicit vote hash, when given, replaces the derived document id
        if vhash:
            eid = vhash
        ballot = {
            'issue': issueID,
            'election': electionID,
            'key': uid,
            'data': {
                'timestamp': now,
                'vote': vote
            }
        }
        self.DB.ES.index(index=self.index, doc_type="votes", id=eid, body = ballot)
        # Backlog of changesets (same payload, stored without an explicit id)
        self.DB.ES.index(index=self.index, doc_type="vote_history", body = ballot)

    def issue_delete(self, electionID, issueID):
        "Deletes an issue if it exists"
        self.DB.ES.delete(index=self.index, doc_type="issues", id=self._sha224(electionID + "/" + issueID))

    def issue_create(self,electionID, issueID, data):
        "Create an issue"
        self.DB.ES.index(index=self.index, doc_type="issues", id=self._sha224(electionID + "/" + issueID), body = data)

    def voter_get_uid(self, electionID, votekey):
        """
           Get the UID/email for a voter given the vote key hash.

           Returns the uid when found, None when no voter matches, and False
           when the fallback search itself fails.
        """
        # First, try the raw hash as an ID
        try:
            res = self.DB.ES.get(index=self.index, doc_type="voters", id=votekey)
            if res:
                return res['_source']['uid']
        except Exception:
            # A failed direct lookup is expected here; fall through to the search
            pass

        # Now, look for it as hash inside the doc
        try:
            for entry in self._hits("voters", q = "election:%s" % electionID, size = 9999):
                voter = entry['_source']
                if voter['hash'] == votekey:
                    return voter['uid']
        except Exception:
            return False # ES Error, probably not seeded the voters doc yet
        return None

    def voter_add(self,election, PID, xhash):
        "Add a voter to the DB"
        eid = self._sha224(election + ":" + PID)
        self.DB.ES.index(index=self.index, doc_type="voters", id=eid, body = {
                'election': election,
                'hash': xhash,
                'uid': PID
            }
        )

    def ballot_scrub(self,election, xhash, uid = None):
        """
           Scrub a ballot.

           Returns None when no voter matches the ballot, True once the vote
           documents derived from the ballot hash have been removed.
        """
        if uid:
            xhash = self._sha224(election + ":" + uid)

        # Find ballots and votes matching
        bid = self.voter_get_uid(election, xhash)
        if not bid:
            return None
        for issue in self.issue_list(election):
            vhash = self._sha224(xhash + issue)
            try:
                self.DB.ES.delete(index=self.index, doc_type="votes", id=vhash)
            except Exception:
                # No vote stored for this issue - keep scrubbing the rest
                pass
        return True

    def voter_remove(self,election, UID):
        "Remove the voter with the given UID"
        votehash = self._sha224(election + ":" + UID)
        self.DB.ES.delete(index=self.index, doc_type="voters", id=votehash)

    def voter_has_voted(self,election, issue, uid):
        "Return true if the voter has voted on this issue, otherwise false"
        eid = self._sha224(election + ":" + issue + ":" + uid)
        try:
            return self.DB.ES.exists(index=self.index, doc_type="votes", id=eid)
        except Exception:
            # A failed lookup is reported as "has not voted"
            return False

    def voter_ballots(self, UID):
        """Find all elections (and ballots) this user has participated in"""

        # First, get all elections
        elections = {}
        for entry in self._hits("elections", sort = "id", q = "*", size = 9999):
            election = entry['_source']
            # Mark election open or closed
            elections[election['id']] = {
                'title': election['title'],
                'open': False if election['closed'] else True
            }

        # Then, get all ballots and note whether they still apply or not
        ballots = {}
        voter_query = {
            "query": {
                "match": {
                    "uid": UID
                }
            }
        }
        for entry in self._hits("voters", body = voter_query, size = 999):
            ballot = entry['_source']
            ballots[ballot['election']] = {
                'ballot': entry['_id'],
                'metadata': elections[ballot['election']]
            }
        return ballots
# Register this backend class with lib.constants under the name "elasticsearch"
# (presumably so it can be selected by that name - see constants.appendBackend).
constants.appendBackend("elasticsearch", ElasticSearchBackend)