# blob: d23431dfa744e0a62dee20f2839034f40f602ff2 [file] [log] [blame]
# -*- coding: utf-8 -*-
# This file is auto-generated, don't edit it. Thanks.
from rocketmq_eventbridge.client import Client as SDKClientClient
from alibabacloud_tea_openapi import models as open_api_models
from rocketmq_eventbridge import models as sdkclient_models
from alibabacloud_tea_console.client import Client as ConsoleClient
from alibabacloud_tea_util.client import Client as UtilClient
class Demo:
    """Demo driver for the Connection Controller APIs of the SDK client.

    Each ``test_*`` method builds one request, sends it through the SDK
    client and logs the outcome to the console. Every API has a synchronous
    method and an ``*_async`` coroutine counterpart. APIs covered:

    - createConnection
    - deleteConnection
    - updateConnection
    - getConnection
    - selectOneConnection
    - listConnections
    - listEnumsResponse
    """

    # SDK client used by every test method; created in __init__.
    _sdk_client: SDKClientClient = None
    # Endpoint the SDK client is configured with; set in __init__.
    _endpoint: str = None

    def __init__(self):
        """Create the SDK client, configured for the endpoint 127.0.0.1:7001."""
        self._endpoint = '127.0.0.1:7001'
        config = open_api_models.Config(
            endpoint=self._endpoint
        )
        self._sdk_client = SDKClientClient(config)

    @staticmethod
    def _log_error(err: Exception) -> None:
        """Log the failure marker followed by a description of *err*.

        The handlers in this class catch ``Exception``, and built-in Python 3
        exceptions have no ``message`` attribute. Reading ``err.message``
        unconditionally would raise ``AttributeError`` inside the handler and
        hide the real failure, so fall back to ``str(err)`` when it is absent.
        """
        ConsoleClient.log('err!')
        message = err.message if hasattr(err, 'message') else str(err)
        ConsoleClient.log(message)

    def test_create_connection(self) -> None:
        """Call ``create_connection`` for 'new-connection' and log the result.

        The request uses network type 'PublicNetwork'. On success the response
        body is logged via ``UtilClient.to_jsonstring``; any exception is
        logged instead of propagated. 'test end!' is always logged last.
        """
        request = sdkclient_models.CreateConnectionRequest(
            connection_name='new-connection',
            network_parameters=sdkclient_models.CreateConnectionRequestNetworkParameters(
                network_type='PublicNetwork'
            )
        )
        try:
            res = self._sdk_client.create_connection(request)
            ConsoleClient.log(UtilClient.to_jsonstring(res.body))
        except Exception as err:
            self._log_error(err)
        finally:
            ConsoleClient.log('test end!')

    async def test_create_connection_async(self) -> None:
        """Coroutine variant of ``test_create_connection``.

        Awaits ``create_connection_async`` with the same request and logs the
        response body, or the error, followed by 'test end!'.
        """
        request = sdkclient_models.CreateConnectionRequest(
            connection_name='new-connection',
            network_parameters=sdkclient_models.CreateConnectionRequestNetworkParameters(
                network_type='PublicNetwork'
            )
        )
        try:
            res = await self._sdk_client.create_connection_async(request)
            ConsoleClient.log(UtilClient.to_jsonstring(res.body))
        except Exception as err:
            self._log_error(err)
        finally:
            ConsoleClient.log('test end!')

    def test_delete_connection(self) -> None:
        """Call ``delete_connection`` for 'new-connection' and log the result.

        Any exception is logged instead of propagated; 'test end!' is always
        logged last.
        """
        request = sdkclient_models.DeleteConnectionRequest(
            connection_name='new-connection'
        )
        try:
            res = self._sdk_client.delete_connection(request)
            ConsoleClient.log(UtilClient.to_jsonstring(res.body))
        except Exception as err:
            self._log_error(err)
        finally:
            ConsoleClient.log('test end!')

    async def test_delete_connection_async(self) -> None:
        """Coroutine variant of ``test_delete_connection``.

        Awaits ``delete_connection_async`` with the same request.
        """
        request = sdkclient_models.DeleteConnectionRequest(
            connection_name='new-connection'
        )
        try:
            res = await self._sdk_client.delete_connection_async(request)
            ConsoleClient.log(UtilClient.to_jsonstring(res.body))
        except Exception as err:
            self._log_error(err)
        finally:
            ConsoleClient.log('test end!')

    def test_update_connection(self) -> None:
        """Call ``update_connection`` for 'new-connection' and log the result.

        The request switches the connection to network type 'PrivateNetwork'
        with sample security group, VPC and vswitch identifiers. Any exception
        is logged instead of propagated; 'test end!' is always logged last.
        """
        request = sdkclient_models.UpdateConnectionRequest(
            connection_name='new-connection',
            network_parameters=sdkclient_models.UpdateConnectionRequestNetworkParameters(
                network_type='PrivateNetwork',
                security_group_id='eb-167adad548759-security_grop/sg-bp1addad26peuh9qh9rtyb',
                vpc_id='eb-test/vpc-bp1symadadwnwgmqud',
                vswitche_id='vsw-bp1iu4x7aeradadown1og8,vsw-bp193sqmadadlaszpeqbt2c'
            )
        )
        try:
            res = self._sdk_client.update_connection(request)
            ConsoleClient.log(UtilClient.to_jsonstring(res.body))
        except Exception as err:
            self._log_error(err)
        finally:
            ConsoleClient.log('test end!')

    async def test_update_connection_async(self) -> None:
        """Coroutine variant of ``test_update_connection``.

        Awaits ``update_connection_async`` with the same request.
        """
        request = sdkclient_models.UpdateConnectionRequest(
            connection_name='new-connection',
            network_parameters=sdkclient_models.UpdateConnectionRequestNetworkParameters(
                network_type='PrivateNetwork',
                security_group_id='eb-167adad548759-security_grop/sg-bp1addad26peuh9qh9rtyb',
                vpc_id='eb-test/vpc-bp1symadadwnwgmqud',
                vswitche_id='vsw-bp1iu4x7aeradadown1og8,vsw-bp193sqmadadlaszpeqbt2c'
            )
        )
        try:
            res = await self._sdk_client.update_connection_async(request)
            ConsoleClient.log(UtilClient.to_jsonstring(res.body))
        except Exception as err:
            self._log_error(err)
        finally:
            ConsoleClient.log('test end!')

    def test_get_connections(self) -> None:
        """Call ``get_connection`` for 'new-connection' and log the result.

        Any exception is logged instead of propagated; 'test end!' is always
        logged last.
        """
        request = sdkclient_models.GetConnectionRequest(
            connection_name='new-connection'
        )
        try:
            res = self._sdk_client.get_connection(request)
            ConsoleClient.log(UtilClient.to_jsonstring(res.body))
        except Exception as err:
            self._log_error(err)
        finally:
            ConsoleClient.log('test end!')

    async def test_get_connections_async(self) -> None:
        """Coroutine variant of ``test_get_connections``.

        Awaits ``get_connection_async`` with the same request.
        """
        request = sdkclient_models.GetConnectionRequest(
            connection_name='new-connection'
        )
        try:
            res = await self._sdk_client.get_connection_async(request)
            ConsoleClient.log(UtilClient.to_jsonstring(res.body))
        except Exception as err:
            self._log_error(err)
        finally:
            ConsoleClient.log('test end!')

    def test_select_one_connection(self) -> None:
        """Call ``select_one_connection`` for 'new-connection' and log the result.

        Any exception is logged instead of propagated; 'test end!' is always
        logged last.
        """
        # NOTE(review): this reuses GetConnectionRequest for the select-one
        # call -- confirm the SDK has no dedicated request type for it.
        request = sdkclient_models.GetConnectionRequest(
            connection_name='new-connection'
        )
        try:
            res = self._sdk_client.select_one_connection(request)
            ConsoleClient.log(UtilClient.to_jsonstring(res.body))
        except Exception as err:
            self._log_error(err)
        finally:
            ConsoleClient.log('test end!')

    async def test_select_one_connection_async(self) -> None:
        """Coroutine variant of ``test_select_one_connection``.

        Awaits ``select_one_connection_async`` with the same request.
        """
        # NOTE(review): this reuses GetConnectionRequest for the select-one
        # call -- confirm the SDK has no dedicated request type for it.
        request = sdkclient_models.GetConnectionRequest(
            connection_name='new-connection'
        )
        try:
            res = await self._sdk_client.select_one_connection_async(request)
            ConsoleClient.log(UtilClient.to_jsonstring(res.body))
        except Exception as err:
            self._log_error(err)
        finally:
            ConsoleClient.log('test end!')

    def test_list_connections(self) -> None:
        """Call ``list_connections`` with ``max_results=2`` and log the result.

        Any exception is logged instead of propagated; 'test end!' is always
        logged last.
        """
        request = sdkclient_models.ListConnectionsRequest(
            max_results=2
        )
        try:
            res = self._sdk_client.list_connections(request)
            ConsoleClient.log(UtilClient.to_jsonstring(res.body))
        except Exception as err:
            self._log_error(err)
        finally:
            ConsoleClient.log('test end!')

    async def test_list_connections_async(self) -> None:
        """Coroutine variant of ``test_list_connections``.

        Awaits ``list_connections_async`` with the same request.
        """
        request = sdkclient_models.ListConnectionsRequest(
            max_results=2
        )
        try:
            res = await self._sdk_client.list_connections_async(request)
            ConsoleClient.log(UtilClient.to_jsonstring(res.body))
        except Exception as err:
            self._log_error(err)
        finally:
            ConsoleClient.log('test end!')

    def test_list_enums_response(self) -> None:
        """Call ``list_enums_response`` (no request object) and log the result.

        Any exception is logged instead of propagated; 'test end!' is always
        logged last.
        """
        try:
            res = self._sdk_client.list_enums_response()
            ConsoleClient.log(UtilClient.to_jsonstring(res.body))
        except Exception as err:
            self._log_error(err)
        finally:
            ConsoleClient.log('test end!')

    async def test_list_enums_response_async(self) -> None:
        """Coroutine variant of ``test_list_enums_response``.

        Awaits ``list_enums_response_async``.
        """
        try:
            res = await self._sdk_client.list_enums_response_async()
            ConsoleClient.log(UtilClient.to_jsonstring(res.body))
        except Exception as err:
            self._log_error(err)
        finally:
            ConsoleClient.log('test end!')
# Script entry: build the demo (which creates the SDK client) and run the
# synchronous list-connections check.
# NOTE(review): these statements are not behind an `if __name__ == "__main__":`
# guard, so they also run whenever this module is imported.
demo = Demo()
demo.test_list_connections()