blob: cb9266834cabdaae43015d4ff6dac7a0df21dbc2 [file] [log] [blame]
#!/usr/bin/env python
import BaseHTTPServer
import cgi
import bson
try:
from hashlib import sha1
except ImportError:
import sha as sha1
import hmac
import os
import ssl
import sys
import time
import urllib
import socket
IKEY = 'DIXYZV6YM8IFYVWBINCA'
SKEY = 'yWHSMhWucAcp7qvuH3HWTaSaKABs8Gaddiv1NIRo'
# Used to check if the FQDN is set to either the ipv4 or ipv6 address
IPV6_LOOPBACK_ADDR = '1.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.ip6.arpa'
IPV4_LOOPBACK_ADDR = '1.0.0.127.in-addr.arpa'
tx_msgs = {
'txPUSH1': [ '0:Pushed a login request to your phone.',
'1:Success. Logging you in...' ],
'txVOICE1': [ '0:Dialing XXX-XXX-1234...',
"1:Answered. Press '#' on your phone to log in.",
'1:Success. Logging you in...' ],
'txSMSREFRESH1': [ '0:New SMS passcodes sent' ],
'txVOICE2': [ '0:Dialing XXX-XXX-5678...',
"1:Answered. Press '#' on your phone to log in.",
'2:Authentication timed out.' ],
}
class MockDuoHandler(BaseHTTPServer.BaseHTTPRequestHandler):
server_version = 'MockDuo/1.0'
protocol_version = 'HTTP/1.1'
def _verify_sig(self):
authz = self.headers['Authorization'].split()[1].decode('base64')
ikey, sig = authz.split(':')
if ikey != IKEY:
return False
canon = [ self.method,
self.headers['Host'].split(':')[0].lower(),
self.path ]
l = []
for k in sorted(self.args.keys()):
l.append('%s=%s' % (urllib.quote(k, '~'),
urllib.quote(self.args[k], '~')))
canon.append('&'.join(l))
h = hmac.new(SKEY, '\n'.join(canon), sha1)
return sig == h.hexdigest()
def _get_args(self):
if self.method == 'POST':
env = { 'REQUEST_METHOD': 'POST',
'CONTENT_TYPE': self.headers['Content-Type'] }
fs = cgi.FieldStorage(fp=self.rfile, headers=self.headers,
environ=env)
args = {}
for k in fs.keys():
args[k] = fs[k].value
else:
args = dict(cgi.parse_qsl(self.qs))
print 'got %s %s args: %s' % (self.method, self.path, args)
return args
def _get_tx_response(self, txid, async):
last = True
if txid not in tx_msgs:
secs, msg = 0, 'Invalid passcode, please try again.'
elif async:
secs, msg = tx_msgs[txid].pop(0).split(':', 1)
last = not tx_msgs[txid]
else:
secs, msg = tx_msgs[txid][-1].split(':', 1)
if msg.startswith('Success'):
rsp = { 'result': 'allow', 'status': msg }
elif async and not last:
rsp = { 'status': msg }
else:
rsp = { 'result': 'deny', 'status': msg }
time.sleep(int(secs))
return rsp
def _send(self, code, buf=''):
self.send_response(code)
self.send_header("Content-length", str(len(buf)))
if buf:
self.send_header("Content-type", "application/bson")
self.end_headers()
self.wfile.write(buf)
else:
self.end_headers()
def do_GET(self):
self.method = 'GET'
self.path, self.qs = self.path.split('?', 1)
self.args = self._get_args()
if not self._verify_sig():
return self._send(401)
ret = { 'stat': 'OK' }
if self.path == '/rest/v1/status.bson':
ret['response'] = self._get_tx_response(self.args['txid'], 1)
buf = bson.dumps(ret)
return self._send(200, buf)
self._send(404)
def hostname_check(self, hostname):
domain_fqdn = socket.getfqdn().lower()
if hostname == domain_fqdn.lower() or hostname == socket.gethostname().lower():
return True
#Check if socket.getfqdn() is the loopback address for ipv4 or ipv6 then check the hostname of the machine
if domain_fqdn == IPV6_LOOPBACK_ADDR or domain_fqdn == IPV4_LOOPBACK_ADDR:
if hostname == socket.gethostbyaddr(socket.gethostname())[0].lower():
return True
return False
def do_POST(self):
self.method = 'POST'
self.args = self._get_args()
if not self._verify_sig():
return self._send(401)
try:
return self._send(int(self.args['user']))
except:
ret = { 'stat': 'OK' }
if self.path == '/rest/v1/preauth.bson':
if self.args['user'] == 'preauth-ok-missing_response':
pass
elif self.args['user'] == 'preauth-fail-missing_response':
ret['stat'] = 'FAIL'
elif self.args['user'] == 'preauth-bad-stat':
ret['stat'] = 'FFFFUUUU'
elif self.args['user'] == 'preauth-fail':
d = { 'stat': 'FAIL', 'code': 666, 'message': 'you fail' }
elif self.args['user'] == 'preauth-deny':
ret['response'] = { 'result': 'deny', 'status': 'you suck' }
elif self.args['user'] == 'preauth-allow':
ret['response'] = { 'result': 'allow', 'status': 'you rock' }
elif self.args['user'] == 'preauth-allow-bad_response':
ret['response'] = { 'result': 'allow', 'xxx': 'you rock' }
elif (self.args['user'] == 'hostname'):
if (self.hostname_check(self.args['hostname'].lower())):
ret['response'] = { 'result': 'deny', 'status': 'correct hostname' }
else:
response = "hostname recieved: " + self.args['hostname'] + " found: " + socket.getfqdn()
ret['response'] = { 'result': 'deny', 'status': response }
else:
ret['response'] = {
'result': 'auth',
'prompt': 'Duo login for %s\n\n' % self.args['user'] + \
'Choose or lose:\n\n' + \
' 1. Push 1\n 2. Phone 1\n' + \
' 3. SMS 1 (deny)\n 4. Phone 2 (deny)\n\n' + \
'Passcode or option (1-4): ',
'factors': {
'default': 'push1',
'1': 'push1',
'2': 'voice1',
'3': 'smsrefresh1',
'4': 'voice2',
}
}
elif self.path == '/rest/v1/auth.bson':
if self.args['factor'] == 'auto':
txid = 'tx' + self.args['auto'].upper()
if self.args['async'] == '1':
ret['response'] = { 'txid': txid }
else:
ret['response'] = self._get_tx_response(txid, 0)
else:
ret['response'] = { 'result': 'deny',
'status': 'no %s' % self.args['factor'] }
if (self.args['user'] == 'auth_timeout'):
return self._send(500)
else:
return self._send(404)
buf = bson.dumps(ret)
return self._send(200, buf)
def main():
port = 4443
host = 'localhost'
if len(sys.argv) == 1:
cafile = os.path.realpath('%s/certs/mockduo.pem' %
os.path.dirname(__file__))
elif len(sys.argv) == 2:
cafile = sys.argv[1]
else:
print >>sys.stderr, 'Usage: %s [certfile]\n' % sys.argv[0]
sys.exit(1)
httpd = BaseHTTPServer.HTTPServer((host, port), MockDuoHandler)
httpd.socket = ssl.wrap_socket(
httpd.socket,
certfile=cafile,
server_side=True
)
httpd.serve_forever()
if __name__ == '__main__':
main()