blob: fe0734df52c058a3996fa558d43281164e15ee0d [file] [log] [blame]
#!/usr/bin/env python3
"""
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
"""
"""
bls
This module use cffi to access the c functions in the BLS library.
There is also an example usage program in this file.
"""
import cffi
import platform
import os
ffi = cffi.FFI()
ffi.cdef("""
typedef struct {
unsigned int ira[21]; /* random number... */
int rndptr; /* ...array & pointer */
unsigned int borrow;
int pool_ptr;
char pool[32]; /* random pool */
} csprng;
typedef struct
{
int len;
int max;
char *val;
} octet;
extern void CREATE_CSPRNG(csprng *R,octet *S);
extern void KILL_CSPRNG(csprng *R);
extern void OCT_clear(octet *O);
extern int BLS_ZZZ_KEY_PAIR_GENERATE(csprng *RNG,octet* S,octet *W);
extern int BLS_ZZZ_SIGN(octet *SIG,char *m,octet *S);
extern int BLS_ZZZ_VERIFY(octet *SIG,char *m,octet *W);
extern int BLS_ZZZ_ADD_G1(octet *R1,octet *R2,octet *R);
extern int BLS_ZZZ_ADD_G2(octet *W1,octet *W2,octet *W);
""")
if (platform.system() == 'Windows'):
libamcl_bls_ZZZ = ffi.dlopen("libamcl_bls_ZZZ.dll")
libamcl_core = ffi.dlopen("libamcl_core.dll")
elif (platform.system() == 'Darwin'):
libamcl_bls_ZZZ = ffi.dlopen("libamcl_bls_ZZZ.dylib")
libamcl_core = ffi.dlopen("libamcl_core.dylib")
else:
libamcl_bls_ZZZ = ffi.dlopen("libamcl_bls_ZZZ.so")
libamcl_core = ffi.dlopen("libamcl_core.so")
# Group Size
BGS = @NB@
# Field Size
BFS = @NB@
CURVE_SECURITY = @CS@
G1LEN = BFS + 1
if CURVE_SECURITY == 128:
G2LEN = 4 * BFS
if CURVE_SECURITY == 192:
G2LEN = 8 * BFS
if CURVE_SECURITY == 256:
G2LEN = 16 * BFS
def to_hex(octet_value):
"""Converts an octet type into a string
Add all the values in an octet into an array. This arrays is then
converted to a string and hex encoded.
Args::
octet_value. An octet pointer type
Returns::
String
Raises:
Exception
"""
i = 0
val = []
while i < octet_value.len:
val.append(octet_value.val[i])
i = i + 1
out = b''
for x in val:
out = out + x
return out.hex()
def make_octet(length, value=None):
"""Generates an octet pointer
Generates an empty octet or one filled with the input value
Args::
length: Length of empty octet
value: Data to assign to octet
Returns::
oct_ptr: octet pointer
val: data associated with octet to prevent garbage collection
Raises:
"""
oct_ptr = ffi.new("octet*")
if value:
val = ffi.new("char [%s]" % len(value), value)
oct_ptr.val = val
oct_ptr.max = len(value)
oct_ptr.len = len(value)
else:
val = ffi.new("char []", length)
oct_ptr.val = val
oct_ptr.max = length
oct_ptr.len = length
return oct_ptr, val
def create_csprng(seed):
"""Make a Cryptographically secure pseudo-random number generator instance
Make a Cryptographically secure pseudo-random number generator instance
Args::
seed: random seed value
Returns::
rng: Pointer to cryptographically secure pseudo-random number generator instance
Raises:
"""
seed_oct, seed_val = make_octet(None, seed)
# random number generator
rng = ffi.new('csprng*')
libamcl_core.CREATE_CSPRNG(rng, seed_oct)
libamcl_core.OCT_clear(seed_oct)
return rng
def kill_csprng(rng):
"""Kill a random number generator
Deletes all internal state
Args::
rng: Pointer to cryptographically secure pseudo-random number generator instance
Returns::
Raises:
"""
libamcl_core.KILL_CSPRNG(rng)
return 0
def key_pair_generate(rng, sk=None):
"""Generate key pair
Generate key pair
Args::
rng: Pointer to cryptographically secure pseudo-random number generator instance
sk: secret key. Externally generated
Returns::
error_code: error from the C function
sk: secret key
pk: public key
Raises:
"""
if sk:
sk1, sk1_val = make_octet(None, sk)
rng = ffi.NULL
else:
sk1, sk1val = make_octet(BGS)
pk1, pk1val = make_octet(G2LEN)
error_code = libamcl_bls_ZZZ.BLS_ZZZ_KEY_PAIR_GENERATE(rng, sk1, pk1)
sk_hex = to_hex(sk1)
pk_hex = to_hex(pk1)
# clear memory
libamcl_core.OCT_clear(sk1)
libamcl_core.OCT_clear(pk1)
sk = bytes.fromhex(sk_hex)
pk = bytes.fromhex(pk_hex)
return error_code, sk, pk
def sign(message, sk):
"""Calculate a signature
Generate key pair
Args::
message: Message to sign
sk: BLS secret key
Returns::
error_code: Zero for success or else an error code
signature: BLS signature
Raises:
"""
sk1, sk1_val = make_octet(None, sk)
signature1, signature1_val = make_octet(G1LEN)
error_code = libamcl_bls_ZZZ.BLS_ZZZ_SIGN(signature1, message, sk1)
signature_hex = to_hex(signature1)
# clear memory
libamcl_core.OCT_clear(sk1)
libamcl_core.OCT_clear(signature1)
signature = bytes.fromhex(signature_hex)
return error_code, signature
def verify(signature, message, pk):
"""Verify a signature
Verify a signature
Args::
message: Message to verify
signature: BLS signature
pk: BLS public key
Returns::
error_code: Zero for success or else an error code
Raises:
"""
pk1, pk1_val = make_octet(None, pk)
signature1, signature1_val = make_octet(None, signature)
error_code = libamcl_bls_ZZZ.BLS_ZZZ_VERIFY(signature1, message, pk1)
# clear memory
libamcl_core.OCT_clear(pk1)
libamcl_core.OCT_clear(signature1)
return error_code
def add_G1(R1, R2):
"""Add two members from the group G1
Add two members from the group G1
Args::
R1: member of G1
R2: member of G1
Returns::
R: member of G1. R = R1+R2
error_code: Zero for success or else an error code
Raises:
"""
R11, R11_val = make_octet(None, R1)
R21, R21_val = make_octet(None, R2)
R1, R1_val = make_octet(G1LEN)
error_code = libamcl_bls_ZZZ.BLS_ZZZ_ADD_G1(R11, R21, R1)
R_hex = to_hex(R1)
# clear memory
libamcl_core.OCT_clear(R11)
libamcl_core.OCT_clear(R21)
libamcl_core.OCT_clear(R1)
R = bytes.fromhex(R_hex)
return error_code, R
def add_G2(R1, R2):
"""Add two members from the group G2
Add two members from the group G2
Args::
R1: member of G2
R2: member of G2
Returns::
R: member of G2. R = R1+R2
error_code: Zero for success or else an error code
Raises:
"""
R11, R11_val = make_octet(None, R1)
R21, R21_val = make_octet(None, R2)
R1, R1_val = make_octet(G2LEN)
error_code = libamcl_bls_ZZZ.BLS_ZZZ_ADD_G2(R11, R21, R1)
R_hex = to_hex(R1)
# clear memory
libamcl_core.OCT_clear(R11)
libamcl_core.OCT_clear(R21)
libamcl_core.OCT_clear(R1)
R = bytes.fromhex(R_hex)
return error_code, R
if __name__ == "__main__":
# Print hex values
DEBUG = False
# Seed
seed_hex = "78d0fb6705ce77dee47d03eb5b9c5d30"
seed = bytes.fromhex(seed_hex)
# Message
message = b"test message"
# random number generator
rng = create_csprng(seed)
# Generate key pairs
rtn, sk1, pktmp = key_pair_generate(rng)
if rtn != 0:
print("Error: key_pair_generate {}".format(rtn))
raise SystemExit(0)
print("sk1: {}".format(sk1.hex()))
print("pktmp: {}".format(pktmp.hex()))
rtn, sk1, pk1 = key_pair_generate(rng, sk1)
if rtn != 0:
print("Error: key_pair_generate {}".format(rtn))
raise SystemExit(0)
print("sk1: {}".format(sk1.hex()))
print("pk1: {}".format(pk1.hex()))
rtn, sk2, pk2 = key_pair_generate(rng)
if rtn != 0:
print("Error: key_pair_generate {}".format(rtn))
raise SystemExit(0)
print("sk2: {}".format(sk2.hex()))
print("pk2: {}".format(pk2.hex()))
rtn, sk3, pk3 = key_pair_generate(rng)
if rtn != 0:
print("Error: key_pair_generate {}".format(rtn))
raise SystemExit(0)
print("sk3: {}".format(sk3.hex()))
print("pk3: {}".format(pk3.hex()))
# Sign and verify
rtn, sig1 = sign(message, sk1)
if rtn != 0:
print("Error: sign {}".format(rtn))
raise SystemExit(0)
print("sig1: {}".format(sig1.hex()))
rtn = verify(sig1, message, pk1)
if rtn != 0:
print("Error: Invalid signature {}".format(rtn))
raise SystemExit(0)
print("Success: Signature is valid")
rtn, sig2 = sign(message, sk2)
if rtn != 0:
print("Error: sign {}".format(rtn))
raise SystemExit(0)
print("sig2: {}".format(sig2.hex()))
rtn = verify(sig2, message, pk2)
if rtn != 0:
print("Error: Invalid signature {}".format(rtn))
raise SystemExit(0)
print("Success: Signature is valid")
rtn, sig3 = sign(message, sk3)
if rtn != 0:
print("Error: sign {}".format(rtn))
raise SystemExit(0)
print("sig3: {}".format(sig3.hex()))
rtn = verify(sig3, message, pk3)
if rtn != 0:
print("Error: Invalid signature {}".format(rtn))
raise SystemExit(0)
print("Success: Signature is valid")
# Add Signatures
rtn, sig12 = add_G1(sig1, sig2)
if rtn != 0:
print("Error: add_G1 {}".format(rtn))
raise SystemExit(0)
print("sig12: {}".format(sig12.hex()))
rtn, sig123 = add_G1(sig12, sig3)
if rtn != 0:
print("Error: add_G1 {}".format(rtn))
raise SystemExit(0)
print("sig123: {}".format(sig123.hex()))
# Add Public keys
rtn, pk12 = add_G2(pk1, pk2)
if rtn != 0:
print("Error: add_G2 {}".format(rtn))
raise SystemExit(0)
print("pk12: {}".format(pk12.hex()))
rtn, pk123 = add_G2(pk12, pk3)
if rtn != 0:
print("Error: add_G2 {}".format(rtn))
raise SystemExit(0)
print("pk123: {}".format(pk123.hex()))
# Verify aggretated values
rtn = verify(sig123, message, pk123)
if rtn != 0:
print("Error: Invalid aggregated signature {}".format(rtn))
raise SystemExit(0)
print("Success: Aggregated signature is valid")
# Clear memory
kill_csprng(rng)
del sk1
del pk1
del sk2
del pk2
del sk3
del pk3