blob: d1ca384115f07c51ca8b8243f12c654c44721412 [file] [log] [blame]
#!/usr/bin/env python3
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
Python package `teaclave` is the client SDK for Python developers, providing
some essential data structures, service, and client classes to establish
trusted TLS channel and communicate with Teaclave services (e.g., the
authentication service and frontend service) through RPC protocols.
"""
import json
import base64
import toml
import time
import os
import ssl
import cryptography
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from google.protobuf.json_format import MessageToDict
from google.protobuf.empty_pb2 import Empty
from grpclib.client import Channel, _ChannelState
from grpclib.protocol import H2Protocol
from OpenSSL.crypto import load_certificate, FILETYPE_PEM, FILETYPE_ASN1
from OpenSSL.crypto import X509Store, X509StoreContext
from OpenSSL import crypto
import teaclave_authentication_service_pb2 as auth
import teaclave_frontend_service_pb2 as fe
from teaclave_authentication_service_grpc import TeaclaveAuthenticationApiStub
from teaclave_frontend_service_grpc import TeaclaveFrontendStub
from teaclave_common_pb2 import TaskStatus, FileCryptoInfo
from typing import Tuple, Dict, List, Any
__all__ = [
'FrontendService', 'AuthenticationService', 'FunctionArgument',
'FunctionInput', 'FunctionOutput', 'OwnerList', 'DataMap'
]
Metadata = Dict[str, str]
class Request:
message = None
def __init__(self, method, response, metadata=dict()):
self.method = method
self.metadata = metadata
self.response = response
class TeaclaveException(Exception):
pass
class TeaclaveService:
metadata = None
stub = None
def __init__(self,
name: str,
address: Tuple[str, int],
as_root_ca_cert_path: str,
enclave_info_path: str,
dump_report=False):
self._name = name
self._address = address
self._as_root_ca_cert_path = as_root_ca_cert_path
self._enclave_info_path = enclave_info_path
self._dump_report = dump_report
self._channel = TeaclaveChannel(self._name, self._address,
self._as_root_ca_cert_path,
self._enclave_info_path)
self._loop = self._channel._loop
def call_method(self, request):
return self._loop.run_until_complete(
getattr(self.stub, request.method)(request.message,
metadata=request.metadata))
def __enter__(self):
return self
def __exit__(self, *exc):
self.close()
def close(self):
if self._channel: self._channel.close()
def __del__(self) -> None:
self.close()
def check_metadata(self):
if not self.metadata: raise TeaclaveException("Metadata is None")
def check_channel(self):
self._channel.check_channel()
def get_metadata(self):
return self.metadata
def create_context() -> ssl.SSLContext:
ctx = ssl._create_unverified_context()
ctx.options |= ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1
ctx.set_ciphers('ECDHE+AESGCM:ECDHE+CHACHA20:DHE+AESGCM:DHE+CHACHA20')
ctx.set_alpn_protocols(['h2'])
try:
ctx.set_npn_protocols(['h2'])
except NotImplementedError:
pass
return ctx
class TeaclaveChannel(Channel):
def __init__(self,
name: str,
address: Tuple[str, int],
as_root_ca_cert_path: str,
enclave_info_path: str,
dump_report=False):
context = create_context()
super().__init__(host=address[0], port=address[1], ssl=context)
self._name = name
self._as_root_ca_cert_path = as_root_ca_cert_path
self._enclave_info_path = enclave_info_path
self._dump_report = dump_report
def check_channel(self):
if self._state == _ChannelState.TRANSIENT_FAILURE:
raise TeaclaveException("Channel is None")
async def __connect__(self) -> H2Protocol:
protocol = await super().__connect__()
sslobj = protocol.connection._transport.get_extra_info('ssl_object')
cert = sslobj.getpeercert(binary_form=True)
if not cert: raise TeaclaveException("Peer cert is None")
try:
self._verify_report(self._as_root_ca_cert_path,
self._enclave_info_path, cert, self._name)
except Exception as e:
raise TeaclaveException(
f"Failed to verify attestation report: {e}")
return protocol
def _verify_report(self, as_root_ca_cert_path: str, enclave_info_path: str,
cert: Dict[str, Any], endpoint_name: str):
def load_certificates(pem_bytes):
start_line = b'-----BEGIN CERTIFICATE-----'
result = []
cert_slots = pem_bytes.split(start_line)
for single_pem_cert in cert_slots[1:]:
cert = load_certificate(FILETYPE_ASN1,
start_line + single_pem_cert)
result.append(cert)
return result
if os.environ.get('SGX_MODE') == 'SW':
return
cert = x509.load_der_x509_certificate(cert, default_backend())
if self._dump_report:
try:
with open(self._name + "_attestation_report.pem", "wb") as f:
f.write(
cert.public_bytes(cryptography.hazmat.primitives.
serialization.Encoding.PEM))
except:
raise TeaclaveException("Failed to dump attestation report")
try:
ext = json.loads(cert.extensions[0].value.value)
except:
raise TeaclaveException("Failed to load extensions")
report = bytes(ext["report"])
signature = bytes(ext["signature"])
try:
certs = [
load_certificate(FILETYPE_ASN1, bytes(c)) for c in ext["certs"]
]
except:
raise TeaclaveException(
"Failed to load singing certificate of the report")
# verify signing cert with AS root cert
try:
with open(as_root_ca_cert_path) as f:
as_root_ca_cert = f.read()
except:
raise TeaclaveException(
"Failed to open attestation service root certificate")
try:
as_root_ca_cert = load_certificate(FILETYPE_PEM, as_root_ca_cert)
except:
raise TeaclaveException(
"Failed to load attestation service root certificate")
store = X509Store()
store.add_cert(as_root_ca_cert)
client_cert = certs[0]
if len(certs) > 1:
for c in certs[1:]:
store.add_cert(c)
store_ctx = X509StoreContext(store, client_cert)
try:
store_ctx.verify_certificate()
# verify report's signature
crypto.verify(certs[0], signature, bytes(ext["report"]), 'sha256')
except:
raise TeaclaveException("Failed to verify report signature")
report = json.loads(report)
quote = report['isvEnclaveQuoteBody']
quote = base64.b64decode(quote)
# get report_data from the quote
report_data = quote[368:368 + 64]
# get EC pub key from the certificate
pub_key = cert.public_key().public_bytes(
cryptography.hazmat.primitives.serialization.Encoding.X962,
cryptography.hazmat.primitives.serialization.PublicFormat.
UncompressedPoint)
# verify whether the certificate is bound to the quote
assert (pub_key[0] == 4)
if pub_key[1:] != report_data:
raise TeaclaveException(
"Failed to verify the certificate agaist the report data in the quote"
)
# get mr_enclave and mr_signer from the quote
mr_enclave = quote[112:112 + 32].hex()
mr_signer = quote[176:176 + 32].hex()
# get enclave_info
try:
enclave_info = toml.load(enclave_info_path)
except:
raise TeaclaveException("Failed to load enclave info")
# verify mr_enclave and mr_signer
enclave_name = "teaclave_" + endpoint_name + "_service"
if mr_enclave != enclave_info[enclave_name]["mr_enclave"]:
raise Exception("Failed to verify mr_enclave")
if mr_signer != enclave_info[enclave_name]["mr_signer"]:
raise Exception("Failed to verify mr_signer")
class FunctionInput:
"""Function input for registering.
Args:
name: Name of input data.
description: Description of the input data.
optional: [Default: False] Data owners do not need to register the data.
"""
def __init__(self, name: str, description: str, optional=False):
self.message = fe.FunctionInput(name=name,
description=description,
optional=optional)
class FunctionOutput:
"""Function output for registering.
Args:
name: Name of output data.
description: Description of the output data.
optional: [Default: False] Data owners do not need to register the data.
"""
def __init__(self, name: str, description: str, optional=False):
self.message = fe.FunctionOutput(name=name,
description=description,
optional=optional)
class FunctionArgument:
"""Function argument for registring.
Args:
key: Name of the argument.
default_value: A default value of the argument. The default value is "".
allow_overwrite: If allow_overwrite flag is set to be true. The service
will allow the task creator to overwrite the arguement
value when creating tasks.
"""
def __init__(self,
key: str,
default_value: str = "",
allow_overwrite=True):
self.message = fe.FunctionArgument(key=key,
default_value=default_value,
allow_overwrite=allow_overwrite)
class OwnerList:
"""Defines data ownership.
Args:
data_name: Name of output data.
uids: A list of user id which own this data.
"""
def __init__(self, data_name: str, uids: List[str]):
self.message = fe.OwnerList(data_name=data_name, uids=uids)
class DataMap:
"""Assign data id to input or output data.
Args:
data_name: Name of output data.
data_id: Id for the data name.
"""
def __init__(self, data_name, data_id):
self.message = fe.DataMap(data_name=data_name, data_id=data_id)
class CryptoInfo:
"""Cryptographic information for the input/output data.
Args:
schema: Encryption algorithms for the input/output data.
key: Key for encryption and decryption, bytes in list.
iv: IV, bytes in list.
"""
def __init__(self, schema: str, key: List[int], iv: List[int]):
self.message = FileCryptoInfo(schema=schema,
key=bytes(key),
iv=bytes(iv))
class UserRegisterRequest(Request):
def __init__(self, metadata: Metadata, user_id: str, user_password: str,
role: str, attribute: str):
super().__init__("UserRegister", Empty, metadata)
self.message = auth.UserRegisterRequest(id=user_id,
password=user_password,
role=role,
attribute=attribute)
class UserUpdateRequest(Request):
def __init__(self, metadata: Metadata, user_id: str, user_password: str,
role: str, attribute: str):
super().__init__("UserUpdate", Empty)
self.message = auth.UserUpdateRequest(id=user_id,
password=user_password,
role=role,
attribute=attribute)
class UserLoginRequest(Request):
def __init__(self, user_id: str, user_password: str):
super().__init__("UserLogin", auth.UserLoginResponse)
self.message = auth.UserLoginRequest(id=user_id,
password=user_password)
class UserChangePasswordRequest(Request):
def __init__(self, metadata: Metadata, password: str):
super().__init__("UserChangePassword", Empty, metadata)
self.message = auth.UserChangePasswordRequest(password=password)
class ResetUserPasswordRequest(Request):
def __init__(self, metadata: Metadata, user_id: str):
super().__init__("ResetUserPassword", auth.ResetUserPasswordResponse,
metadata)
self.message = auth.ResetUserPasswordRequest(id=user_id)
class DeleteUserRequest(Request):
def __init__(self, metadata: Metadata, user_id: str):
super().__init__("DeleteUser", Empty, metadata)
self.message = auth.DeleteUserRequest(id=user_id)
class ListUsersRequest(Request):
def __init__(self, metadata: Metadata, user_id: str):
super().__init__("ListUsers", auth.ListUsersResponse, metadata)
self.message = auth.ListUsersRequest(id=user_id)
class AuthenticationService(TeaclaveService):
"""
Establish trusted channel with the authentication service and provide
clients to send request through RPC.
Args:
address: The address of the remote services in tuple.
as_root_ca_cert_path: Root CA certification of the attestation services
to verify the attestation report.
enclave_info_path: Path of enclave info to verify the remote service in
the attestation report.
"""
def __init__(self,
address: Tuple[str, int],
as_root_ca_cert_path: str,
enclave_info_path: str,
dump_report=False):
super().__init__("authentication", address, as_root_ca_cert_path,
enclave_info_path, dump_report)
self.stub = TeaclaveAuthenticationApiStub(self._channel)
def user_register(self,
user_id: str,
user_password: str,
role="",
attribute=""):
"""Register a new user.
Args:
user_id: User ID.
user_password: Password.
role: Role of user.
attribute: Attribute related to the role.
"""
self.check_channel()
self.check_metadata()
request = UserRegisterRequest(self.metadata, user_id, user_password,
role, attribute)
try:
response = self.call_method(request)
return response
except Exception as e:
raise TeaclaveException(f"Failed to register user {str(e)}")
def user_update(self,
user_id: str,
user_password: str,
role: str,
attribute=""):
"""Update an existing user.
Args:
user_id: User ID.
user_password: Password.
role: Role of user.
attribute: Attribute related to the role.
"""
self.check_channel()
self.check_metadata()
request = UserUpdateRequest(self.metadata, user_id, user_password,
role, attribute)
try:
response = self.call_method(request)
return response
except Exception as e:
raise TeaclaveException(f"Failed to update user {str(e)}")
def user_login(self, user_id: str, user_password: str) -> str:
"""Login and get a session token.
Args:
user_id: User ID.
user_password: Password.
Returns:
str: User login token.
"""
self._channel.check_channel()
request = UserLoginRequest(user_id, user_password)
try:
response = self.call_method(request)
self.metadata = {"id": user_id, "token": response.token}
return response.token
except Exception as e:
raise TeaclaveException(f"Failed to login user {str(e)}")
def user_change_password(self, user_password: str):
"""Change password.
Args:
user_password: New password.
"""
self.check_channel()
self.check_metadata()
request = UserChangePasswordRequest(self.metadata, user_password)
try:
response = self.call_method(request)
return response
except Exception as e:
raise TeaclaveException(f"Failed to change password {str(e)}")
def reset_user_password(self, user_id: str) -> str:
"""Reset password of a managed user.
Args:
user_id: User ID.
Returns:
str: New password.
"""
self.check_channel()
self.check_metadata()
request = ResetUserPasswordRequest(self.metadata, user_id)
try:
response = self.call_method(request)
return response
except Exception as e:
reason = str(e)
raise TeaclaveException(f"Failed to reset password {reason}")
def delete_user(self, user_id: str) -> str:
"""Delete a user.
Args:
user_id: User ID.
"""
self.check_channel()
self.check_metadata()
request = DeleteUserRequest(self.metadata, user_id)
try:
response = self.call_method(request)
return response
except Exception as e:
reason = str(e)
raise TeaclaveException(f"Failed to delete user ({reason})")
def list_users(self, user_id: str) -> str:
"""List managed users
Args:
user_id: User ID.
Returns:
str: User list
"""
self.check_channel()
self.check_metadata()
request = ListUsersRequest(self.metadata, user_id)
try:
response = self.call_method(request)
return response
except Exception as e:
reason = str(e)
raise TeaclaveException(f"Failed to list user ({reason})")
class RegisterFunctionRequest(Request):
def __init__(self, metadata: Metadata, name: str, description: str,
executor_type: str, public: bool, payload: List[int],
arguments: List[FunctionArgument],
inputs: List[FunctionInput], outputs: List[FunctionOutput],
user_allowlist: List[str], usage_quota: int):
super().__init__("RegisterFunction", fe.RegisterFunctionResponse,
metadata)
arguments = [x.message for x in arguments]
inputs = [x.message for x in inputs]
outputs = [x.message for x in outputs]
self.message = fe.RegisterFunctionRequest(
name=name,
description=description,
executor_type=executor_type,
public=public,
payload=bytes(payload),
arguments=arguments,
inputs=inputs,
outputs=outputs,
user_allowlist=user_allowlist,
usage_quota=usage_quota)
class UpdateFunctionRequest(Request):
def __init__(self, metadata: Metadata, function_id: str, name: str,
description: str, executor_type: str, public: bool,
payload: List[int], arguments: List[FunctionArgument],
inputs: List[FunctionInput], outputs: List[FunctionOutput],
user_allowlist: List[str], usage_quota: int):
super().__init__("UpdateFunction", fe.UpdateFunctionResponse, metadata)
arguments = [x.message for x in arguments]
inputs = [x.message for x in inputs]
outputs = [x.message for x in outputs]
self.message = fe.UpdateFunctionRequest(function_id, name, description,
executor_type, public, payload,
arguments, inputs, outputs,
user_allowlist, usage_quota)
class ListFunctionsRequest(Request):
def __init__(self, metadata: Metadata, user_id: str):
super().__init__("ListFunctions", fe.ListFunctionsResponse, metadata)
self.message = fe.ListFunctionsRequest(user_id=user_id)
class DeleteFunctionRequest(Request):
def __init__(self, metadata: Metadata, function_id: str):
super().__init__("ListFunctions", Empty, metadata)
self.message = fe.DeleteFunctionRequest(function_id=function_id)
class DisableFunctionRequest(Request):
def __init__(self, metadata: Metadata, function_id: str):
super().__init__("DisableFunction", Empty, metadata)
self.message = fe.DisableFunctionRequest(function_id=function_id)
class GetFunctionRequest(Request):
def __init__(self, metadata: Metadata, function_id: str):
super().__init__("GetFunction", fe.GetFunctionResponse, metadata)
self.message = fe.GetFunctionRequest(function_id=function_id)
class GetFunctionUsageStatsRequest(Request):
def __init__(self, metadata: Metadata, function_id: str):
super().__init__("GetFunctionUsageStats",
fe.GetFunctionUsageStatsResponse, metadata)
self.message = fe.GetFunctionUsageStatsRequest(function_id=function_id)
class RegisterInputFileRequest(Request):
def __init__(self, metadata: Metadata, url: str, cmac: List[int],
crypto_info: CryptoInfo):
super().__init__("RegisterInputFile", fe.RegisterInputFileResponse,
metadata)
self.message = fe.RegisterInputFileRequest(
url=url, cmac=bytes(cmac), crypto_info=crypto_info.message)
class RegisterOutputFileRequest(Request):
def __init__(self, metadata: Metadata, url: str, crypto_info: CryptoInfo):
super().__init__("RegisterOutputFile", fe.RegisterOutputFileResponse,
metadata)
self.message = fe.RegisterOutputFileRequest(
url=url, crypto_info=crypto_info.message)
class UpdateInputFileRequest(Request):
def __init__(self, metadata: Metadata, data_id: str, url: str):
super().__init__("UpdateInputFile", fe.UpdateInputFileResponse,
metadata)
self.message = fe.UpdateInputFileRequest(data_id=data_id, url=url)
class UpdateOutputFileRequest(Request):
def __init__(self, metadata: Metadata, data_id: str, url: str):
super().__init__("UpdateInputFile", fe.UpdateOutputFileResponse,
metadata)
self.message = fe.UpdateOutputFileRequest(data_id=data_id, url=url)
class CreateTaskRequest(Request):
def __init__(self, metadata: Metadata, function_id: str,
function_arguments: Dict[str, Any], executor: str,
inputs_ownership: List[OwnerList],
outputs_ownership: List[OwnerList]):
super().__init__("CreateTask", fe.CreateTaskResponse, metadata)
inputs_ownership = [x.message for x in inputs_ownership]
outputs_ownership = [x.message for x in outputs_ownership]
self.message = fe.CreateTaskRequest(
function_id=function_id,
function_arguments=function_arguments,
executor=executor,
inputs_ownership=inputs_ownership,
outputs_ownership=outputs_ownership)
class AssignDataRequest(Request):
def __init__(self, metadata: Metadata, task_id: str, inputs: List[DataMap],
outputs: List[DataMap]):
super().__init__("AssignData", Empty, metadata)
inputs = [x.message for x in inputs]
outputs = [x.message for x in outputs]
self.message = fe.AssignDataRequest(task_id=task_id,
inputs=inputs,
outputs=outputs)
class ApproveTaskRequest(Request):
def __init__(self, metadata: Metadata, task_id: str):
super().__init__("ApproveTask", Empty, metadata)
self.message = fe.ApproveTaskRequest(task_id=task_id)
class InvokeTaskRequest(Request):
def __init__(self, metadata: Metadata, task_id: str):
super().__init__("InvokeTask", Empty, metadata)
self.message = fe.InvokeTaskRequest(task_id=task_id)
class CancelTaskRequest(Request):
def __init__(self, metadata: Metadata, task_id: str):
super().__init__("CancelTask", Empty, metadata)
self.message = fe.CancelTaskRequest(task_id=task_id)
class GetTaskRequest(Request):
def __init__(self, metadata: Metadata, task_id: str):
super().__init__("GetTask", fe.GetTaskResponse, metadata)
self.message = fe.GetTaskRequest(task_id=task_id)
class QueryAuditLogsRequest(Request):
def __init__(self, metadata: Metadata, message: str, limit: int):
super().__init__("QueryAuditLogs", fe.QueryAuditLogsResponse, metadata)
self.message = fe.QueryAuditLogsReqeust(message=message, limit=limit)
class FrontendService(TeaclaveService):
"""Establish trusted channel with the frontend service and provide
clients to send request through RPC.
Args:
address: The address of the remote services in tuple.
as_root_ca_cert_path: Root CA certification of the attestation services
to verify the attestation report.
enclave_info_path: Path of enclave info to verify the remote service in
the attestation report.
"""
def __init__(self,
address: Tuple[str, int],
as_root_ca_cert_path: str,
enclave_info_path: str,
dump_report=False):
super().__init__("frontend", address, as_root_ca_cert_path,
enclave_info_path, dump_report)
self.stub = TeaclaveFrontendStub(self._channel)
def register_function(
self,
name: str,
description: str,
executor_type: str,
public: bool = True,
payload: List[int] = [],
arguments: List[FunctionArgument] = [],
inputs: List[FunctionInput] = [],
outputs: List[FunctionOutput] = [],
user_allowlist: List[str] = [],
usage_quota: int = -1,
):
self.check_metadata()
self.check_channel()
request = RegisterFunctionRequest(self.metadata, name, description,
executor_type, public, payload,
arguments, inputs, outputs,
user_allowlist, usage_quota)
try:
response = self.call_method(request)
return response.function_id
except Exception as e:
reason = str(e)
raise TeaclaveException(f"Failed to register function ({reason})")
def update_function(
self,
function_id: str,
name: str,
description: str,
executor_type: str,
public: bool = True,
payload: List[int] = [],
arguments: List[FunctionArgument] = [],
inputs: List[FunctionInput] = [],
outputs: List[FunctionOutput] = [],
user_allowlist: List[str] = [],
usage_quota: int = -1,
):
self.check_metadata()
self.check_channel()
request = UpdateFunctionRequest(self.metadata, function_id, name,
description, executor_type, public,
payload, arguments, inputs, outputs,
user_allowlist, usage_quota)
try:
response = self.call_method(request)
return response.function_id
except Exception as e:
reason = str(e)
raise TeaclaveException(f"Failed to register function ({reason})")
def list_functions(self, user_id: str):
self.check_metadata()
self.check_channel()
request = ListFunctionsRequest(self.metadata, user_id)
try:
response = self.call_method(request)
except Exception as e:
raise TeaclaveException(f"Failed to list functions ({str(e)})")
return MessageToDict(response,
preserving_proto_field_name=True,
use_integers_for_enums=True)
def get_function(self, function_id: str):
self.check_metadata()
self.check_channel()
request = GetFunctionRequest(self.metadata, function_id)
try:
response = self.call_method(request)
return response
except Exception as e:
reason = str(e)
raise TeaclaveException(f"Failed to get function ({reason})")
def get_function_usage_stats(self, function_id: str):
self.check_metadata()
self.check_channel()
request = GetFunctionUsageStatsRequest(self.metadata, function_id)
try:
response = self.call_method(request)
return response
except Exception as e:
reason = str(e)
raise TeaclaveException(
f"Failed to get function usage statistics ({reason})")
def delete_function(self, function_id: str):
self.check_metadata()
self.check_channel()
request = DeleteFunctionRequest(self.metadata, function_id)
try:
response = self.call_method(request)
return response
except Exception as e:
reason = str(e)
raise TeaclaveException(f"Failed to delete function ({reason})")
def disable_function(self, function_id: str):
self.check_metadata()
self.check_channel()
request = DisableFunctionRequest(self.metadata, function_id)
try:
response = self.call_method(request)
return response
except Exception as e:
reason = str(e)
raise TeaclaveException(f"Failed to disable function ({reason})")
def register_input_file(self, url: str, schema: str, key: List[int],
iv: List[int], cmac: List[int]):
self.check_metadata()
self.check_channel()
request = RegisterInputFileRequest(self.metadata, url, cmac,
CryptoInfo(schema, key, iv))
try:
response = self.call_method(request)
return response.data_id
except Exception as e:
reason = str(e)
raise TeaclaveException(
f"Failed to register input file ({reason})")
def register_output_file(self, url: str, schema: str, key: List[int],
iv: List[int]):
self.check_metadata()
self.check_channel()
request = RegisterOutputFileRequest(self.metadata, url,
CryptoInfo(schema, key, iv))
try:
response = self.call_method(request)
return response.data_id
except Exception as e:
reason = str(e)
raise TeaclaveException(
f"Failed to register output file ({reason})")
def create_task(self,
function_id: str,
function_arguments: Dict[str, Any],
executor: str,
inputs_ownership: List[OwnerList] = [],
outputs_ownership: List[OwnerList] = []):
self.check_metadata()
self.check_channel()
function_arguments = json.dumps(function_arguments)
request = CreateTaskRequest(self.metadata, function_id,
function_arguments, executor,
inputs_ownership, outputs_ownership)
try:
response = self.call_method(request)
return response.task_id
except Exception as e:
reason = str(e)
raise TeaclaveException(f"Failed to create task ({reason})")
def assign_data_to_task(self, task_id: str, inputs: List[DataMap],
outputs: List[DataMap]):
self.check_metadata()
self.check_channel()
request = AssignDataRequest(self.metadata, task_id, inputs, outputs)
try:
self.call_method(request)
except Exception as e:
reason = str(e)
raise TeaclaveException(
f"Failed to assign data to task ({reason})")
def approve_task(self, task_id: str):
self.check_metadata()
self.check_channel()
request = ApproveTaskRequest(self.metadata, task_id)
try:
self.call_method(request)
except Exception as e:
reason = str(e)
raise TeaclaveException(f"Failed to approve task ({reason})")
def invoke_task(self, task_id: str):
self.check_metadata()
self.check_channel()
request = InvokeTaskRequest(self.metadata, task_id)
try:
self.call_method(request)
except Exception as e:
reason = str(e)
raise TeaclaveException(f"Failed to invoke task ({reason})")
def cancel_task(self, task_id: str):
self.check_metadata()
self.check_channel()
request = CancelTaskRequest(self.metadata, task_id)
try:
self.call_method(request)
except Exception as e:
reason = str(e)
raise TeaclaveException(f"Failed to cancel task ({reason})")
def get_task(self, task_id: str):
self.check_metadata()
self.check_channel()
request = GetTaskRequest(self.metadata, task_id)
try:
response = self.call_method(request)
return MessageToDict(response,
preserving_proto_field_name=True,
use_integers_for_enums=True)
except Exception as e:
reason = str(e)
raise TeaclaveException(f"Failed to get task result ({reason})")
def get_task_result(self, task_id: str):
self.check_metadata()
self.check_channel()
request = GetTaskRequest(self.metadata, task_id)
while True:
try:
time.sleep(1)
response = self.call_method(request)
if response.status == TaskStatus.Finished:
break
elif response.status == TaskStatus.Canceled:
raise TeaclaveException("Task Canceled, Error: " +
response.result.Err.reason)
elif response.status == TaskStatus.Failed:
raise TeaclaveException("Task Failed, Error: " +
response.result.Err.reason)
except Exception as e:
reason = str(e)
raise TeaclaveException(
f"Failed to get task result ({reason})")
return response.result.Ok.return_value
def get_output_cmac_by_tag(self, task_id: str, tag: str):
self.check_metadata()
self.check_channel()
request = GetTaskRequest(self.metadata, task_id)
while True:
try:
time.sleep(1)
response = self.call_method(request)
if response.status == TaskStatus.Finished:
break
except Exception as e:
reason = str(e)
raise TeaclaveException(
f"Failed to get task result ({reason})")
response = MessageToDict(response,
preserving_proto_field_name=True,
use_integers_for_enums=True)
return base64.b64decode(response["result"]["Ok"]["tags_map"][tag])
def query_audit_logs(self, message: str, limit: int):
self.check_metadata()
self.check_channel()
request = QueryAuditLogsRequest(self.metadata, message, limit)
try:
response = self.call_method(request)
return MessageToDict(response,
preserving_proto_field_name=True,
use_integers_for_enums=True)
except Exception as e:
reason = str(e)
raise TeaclaveException(f"Failed to get audit logs ({reason})")