blob: 12f987ea0433426b5be77b735e9f74618eea272f [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Utility functions for working with secrets."""
from __future__ import division, absolute_import, print_function, unicode_literals
import json
import os
import time
import math
from pbkdf2 import PBKDF2
import tornado
from tornado.httputil import url_concat
from tornado.log import app_log as log
from tornado.options import define, options
import crypto
from mpin_utils.common import (
fetchConfig,
Keys,
SIGNATURE_EXPIRES_OFFSET_SECONDS,
signMessage,
Time,
)
today = crypto.today
def generate_aes_key(passphrase, salt):
"""Return AES key (128 bit) using a pass-phrase.
It uses the passphrase argument or in its absence asks the user for
a pass-phrase to derive an AES key. The algorithm used is
Password-Based Key Derivation Function 2 (PBKDF2)
Keyword arguments:
passphrase -- A string used to generate an AES key
salt -- Salt value for the PBKDF2 Algorithm. 64 bits hex encoded
"""
return PBKDF2(passphrase, str(salt)).read(16)
def backup_master_secret(master_secret, encrypt_master_secret, passphrase, salt, backup_file, time, rng):
"""Write the master secret to file."""
aes_key = generate_aes_key(passphrase, salt)
data = {
'startTime': time.strftime('%Y-%m-%dT%H:%M:%SZ')
}
if encrypt_master_secret:
ciphertext_hex, iv_hex, tag_hex = crypto.aes_gcm_encrypt(
master_secret, aes_key, rng, time.strftime('%Y-%m-%dT%H:%M:%SZ'))
data.update({
'IV': iv_hex,
'ciphertext': ciphertext_hex,
'tag': tag_hex})
else:
data['master_secret_hex'] = master_secret.encode('hex'),
with open(backup_file, 'w') as json_file:
json.dump(data, json_file)
def generate_random_number(rng, length):
"""Return random number with predefined length."""
return crypto.random_generate(rng, length)
def generate_ott(length, rng, encoding=None):
"""Generate a one time token (OTT).
Uses the Random Number Generator to generate a value
of length OTTLength set in the config file. This is
then encoded.
"""
ott_hex = generate_random_number(rng, length)
if encoding == 'hex':
return ott_hex
ott = ott_hex.decode('hex')
if encoding:
ott = ott.encode(encoding)
return ott
def generate_otp(rng):
"""Generate a one time password (OTP).
Uses the Random Number Generator to generate 6 long value
"""
return crypto.generate_otp(rng)
def get_checksum(num, length):
"""Return checksum."""
sum_digits = sum([
int(digit) * (length + 1 - i)
for i, digit in enumerate(str(num).zfill(length))])
checksum = (11 - (sum_digits % 11)) % 11
return checksum if checksum != 10 else None
def generate_random_webid(rng, use_checksum=True):
"""Generate a web identifier for mobile login.
Generates a random six digit integer. This is
appended with a one digit checksum.
"""
num = generate_otp(rng)
checksum = get_checksum(num, 6) if use_checksum else ''
if not checksum and use_checksum:
return None
return "{0:06d}{1}".format(num, checksum)
def generate_auth_ott(rng):
"""Return auth OTT."""
return generate_random_number(rng, crypto.PAS)
def get_random_bytes(rng, byte_length = 4):
"""Return random byte-array."""
if type(byte_length) is not int:
return None
if byte_length <= 0:
return None
r_hex = crypto.random_generate(rng, byte_length)
bytes = []
for idx in range(len(r_hex) - 2, -1, -2):
bytes.append(int(r_hex[idx:idx + 2], 16))
return bytes
def get_random_integer(rng, max_digit = 4):
"""Return random integer."""
if type(max_digit) is not int:
return None
if max_digit <= 0:
return None
mod_val = 10 ** max_digit
byte_length = int(math.ceil(max_digit * math.log(10, 2)/8))
bytes = get_random_bytes(rng, byte_length)
r = 0
for idx in range(byte_length - 1, -1, -1):
r = (r << 8) + bytes[idx]
r = r % mod_val
return r
class SecretsError(Exception):
"""Exception raises by secrets module."""
pass
class MasterSecret(object):
"""Master Secret."""
master_secret = None
start_time = None
def __init__(self, passphrase, salt, seed, time, backup_file=None, encrypt_master_secret=True):
"""Constructor."""
self.rng = crypto.get_random_generator(seed)
self.master_secret, self.start_time = self._get_master_secret(
passphrase, salt, time, backup_file, encrypt_master_secret)
def _get_master_secret(self, passphrase, salt, time, backup_file=None, encrypt_master_secret=True):
"""Restore/generate master secret.
Restore from backup_file if such is provided, generate new otherwise.
Set backup_file=None for in memory master_secret.
"""
if not backup_file:
log.info('Master Secret Share not backed up to file')
return self._generate_master_secret(), time
if not os.path.exists(backup_file):
log.info('Master Secret backup file doesn\'t exists. Generate new.')
master_secret = self._generate_master_secret()
backup_master_secret(
master_secret, encrypt_master_secret, passphrase, salt, backup_file, time, self.rng)
return master_secret, time
log.info('Restore Master Secret Share from file')
return self._restore_master_secret(
backup_file,
encrypt_master_secret,
passphrase,
salt)
def _generate_master_secret(self):
"""Generate the M-Pin Master Secret."""
try:
return crypto.mpin_random_generate(self.rng)
except crypto.CryptoError as e:
log.error(e)
raise SecretsError('M-Pin Master Secret Generation Failed')
def _restore_master_secret(self, backup_file, encrypt_master_secret, passphrase, salt):
"""Restore secret from file.
Decode secret if encrypted.
"""
try:
with open(backup_file) as json_file:
backup = json.load(json_file)
except ValueError:
raise SecretsError('Master Secret backup file is corrupted.')
if encrypt_master_secret:
tag, plaintext = crypto.aes_gcm_decrypt(
aes_key=generate_aes_key(passphrase, salt),
iv=str(backup['IV'].decode('hex')),
header=str(backup['startTime']),
ciphertext=str(backup['ciphertext'].decode('hex')))
# Check authentication tag
if backup['tag'] != tag:
raise SecretsError('AES-GSM Decryption Failed. Authentication tag is not correct')
self.start_time = Time.ISOtoDateTime(str(backup['startTime']))
master_secret = plaintext.decode('hex')
else:
self.start_time = Time.ISOtoDateTime(backup['startTime'])
master_secret = backup['master_secret_hex'].decode('hex')
return master_secret, self.start_time
def get_server_secret(self):
"""Generate server secret."""
try:
return crypto.get_server_secret(self.master_secret)
except crypto.CryptoError as e:
log.error(e)
raise SecretsError('Server Secret generation failed')
def get_client_secret(self, mpin_id):
"""Generate client secret."""
try:
return crypto.get_client_multiple(self.master_secret, mpin_id)
except crypto.CryptoError as e:
log.error(e)
raise SecretsError('Client secret generation failed')
def get_time_permits(self, mpin_id, count):
"""Generate client time permit."""
start_date = crypto.today()
try:
return dict(
(date, crypto.get_time_permit(self.master_secret, mpin_id, date))
for date in range(start_date, start_date + count))
except crypto.CryptoError as e:
log.error(e)
raise SecretsError('M-Pin Time Permit Generation Failed')
define("certivoxServerSecret", default='dta', type=unicode)
class ServerSecret(object):
"""Server Secret."""
server_secret = None
def __init__(self, seed, app_id, app_key):
"""Constructor."""
self.rng = crypto.get_random_generator(seed)
self.app_id = app_id
self.app_key = app_key
self.server_secret = self._get_server_secret()
def _get_certivox_server_secret_share_dta(self, expires):
path = 'serverSecret'
url_params = url_concat('{0}{1}'.format(Keys.certivoxServer(), path), {
'app_id': self.app_id,
'expires': expires,
'signature': signMessage('{0}{1}{2}'.format(path, self.app_id, expires), self.app_key)
})
log.debug('MIRACL server secret request: {0}'.format(url_params))
httpclient = tornado.httpclient.HTTPClient()
try:
response = httpclient.fetch(url_params, **fetchConfig(url_params))
except tornado.httpclient.HTTPError as e:
log.error(e)
raise SecretsError('Unable to get Server Secret from the MIRACL TA server')
httpclient.close()
try:
data = json.loads(response.body)
except ValueError as e:
log.error(e)
raise SecretsError('Invalid response from TA server')
if 'serverSecret' not in data:
raise SecretsError('serverSecret not in response from TA server')
return data["serverSecret"]
def _get_certivox_server_secret_share_credentials(self, expires):
if not hasattr(Keys, 'certivox_server_secret'):
raise SecretsError(
'MIRACL server secret share is not in the credentials.json. '
'You can get it by: \n'
'scripts/getServerSecretShare.py credentials.json > credentials_with_secret.json')
return Keys.certivox_server_secret
def _get_certivox_server_secret_share(self, expires):
method = options.certivoxServerSecret
methods = {
'dta': self._get_certivox_server_secret_share_dta,
'credentials.json': self._get_certivox_server_secret_share_credentials,
'manual': lambda x: raw_input('MIRACL server secret share:'),
'config': lambda x: options.certivoxServerSecret
}
func = methods[method if method in methods else 'config']
certivox_server_secret_hex = func(expires)
try:
return certivox_server_secret_hex.decode("hex")
except TypeError as e:
log.error(e)
raise SecretsError('Invalid CertiVox server secret share')
def _get_customer_server_secret_share(self, expires):
path = 'serverSecret'
url_params = url_concat(
'{0}/{1}'.format(options.DTALocalURL, path),
{
'app_id': self.app_id,
'expires': expires,
'signature': signMessage('{0}{1}{2}'.format(path, self.app_id, expires), self.app_key)
})
log.debug('customer server secret request: {0}'.format(url_params))
httpclient = tornado.httpclient.HTTPClient()
import socket
# Make at most 30 attempts to get server secret from local TA
for attempt in range(30):
try:
response = httpclient.fetch(url_params)
except (tornado.httpclient.HTTPError, socket.error) as e:
log.error(e)
log.error(
'Unable to get Server Secret from the customer TA server. '
'Retying...')
time.sleep(2)
continue
httpclient.close()
break
else:
# Max attempts reached
raise SecretsError(
'Unable to get Server Secret from the customer TA server.')
try:
data = json.loads(response.body)
except ValueError:
raise SecretsError('TA server response contains invalid JSON')
if 'serverSecret' not in data:
raise SecretsError('serverSecret not in response from TA server')
return data["serverSecret"].decode("hex")
def _get_server_secret(self):
expires = Time.syncedISO(seconds=SIGNATURE_EXPIRES_OFFSET_SECONDS)
certivox_server_secret = self._get_certivox_server_secret_share(expires)
customer_server_secret = self._get_customer_server_secret_share(expires)
try:
server_secret_hex = crypto.mpin_recombine_g2(certivox_server_secret, customer_server_secret)
except crypto.CryptoError as e:
log.error(e)
raise SecretsError('M-Pin Server Secret Generation Failed')
return server_secret_hex.decode("hex")
def get_pass1_value(self):
"""Return pass1 value."""
try:
random_number = crypto.mpin_random_generate(self.rng)
except crypto.CryptoError as e:
log.error(e)
raise SecretsError('Pass 1 - failed to generate Y')
return random_number.encode('hex')
def validate_pass2_value(self, mpin_id, u, ut, y, v):
"""Validate pass2 value.
y - pass 1 values
v - pass 2 value in question
"""
date = crypto.today()
check_dates = [date]
if Time.syncedNow().hour < 1:
check_dates.append(date - 1)
for date in check_dates:
hid, htid = crypto.mpin_server_1(mpin_id, date)
success, _, _ = crypto.mpin_server_2(self.server_secret, v, date, hid, htid, y, u, ut)
if success != -19:
break
return success