blob: 9fa25979b335f443925d5e0faa3a8fdda786fd01 [file] [log] [blame]
# -*- coding: utf-8 -*-
import json
import unittest
from dubbo.client import DubboClient, ZkRegister
from dubbo.codec.encoder import Object
from dubbo.common.loggers import init_log
class TestDubbo(unittest.TestCase):
def setUp(self):
init_log() # 初始化日志配置,调用端需要自己配置日志属性
zk = ZkRegister('172.21.4.71:2181')
self.spu_query_provider = DubboClient('com.qianmi.pc.item.api.spu.SpuQueryProvider', zk_register=zk)
# self.dubbo = DubboClient('com.qianmi.pc.item.api.spu', host='172.21.36.82:20880')
def test_run(self):
channel = Object('com.qianmi.pc.base.api.constants.ChannelEnum')
channel['name'] = 'D2C'
spu_query_request = Object('com.qianmi.pc.item.api.spu.request.SpuQueryRequest')
spu_query_request['chainMasterId'] = 'A000000'
spu_query_request['channel'] = channel
spu_query_request['pageSize'] = 2000
result = self.spu_query_provider.call('query', spu_query_request)
pretty_print(result)
print len(result['dataList'])
def pretty_print(value):
print json.dumps(value, ensure_ascii=False, indent=4, sort_keys=True)
def run(_dubbo):
for j in range(100000):
_dubbo.call('echo18')
if __name__ == '__main__':
unittest.main()