blob: 300ea652c05263bfbbb5f0df116408fc947772cc [file] [log] [blame]
#! /usr/bin/env python
# coding=utf-8
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import time
from subprocess import check_output, CalledProcessError, STDOUT
from twisted.trial import unittest
from pypegasus.pgclient import *
def getstatusoutput(cmd):
    """Run ``cmd`` through the shell and return ``(status, output)``.

    stderr is merged into stdout and the output is returned as text.
    ``status`` is 0 when the command succeeds, otherwise the return code
    carried by ``CalledProcessError``.  At most one trailing newline is
    removed from the output.
    """
    exit_code = 0
    try:
        text = check_output(cmd, shell=True, universal_newlines=True, stderr=STDOUT)
    except CalledProcessError as err:
        exit_code, text = err.returncode, err.output
    # Drop a single trailing newline only; earlier newlines are kept.
    if text.endswith('\n'):
        text = text[:-1]
    return exit_code, text
class ServerOperator(object):
    """Control a local Pegasus onebox cluster through its ``run.sh`` script.

    Every method shells out with ``getstatusoutput`` after changing into
    ``shell_path``.  Exit statuses of the ``run.sh`` commands are ignored,
    except while polling in ``wait_until_cluster_health``.
    """

    # Directory that contains ``run.sh``; placeholder that must be edited
    # to point at a real pegasus shell directory before running the tests.
    shell_path = '/your/pegasus-shell/dir'

    @classmethod
    def modify_conf(cls, old_conf, new_conf):
        """Rewrite ``old_conf`` to ``new_conf`` in ``config-server.ini``.

        Both strings are spliced into a ``sed -i "s/old/new/"`` expression,
        so they are interpreted as a sed pattern/replacement and must not
        contain an unescaped ``/``.
        """
        origin_conf_file = cls.shell_path + '/src/server/config-server.ini'
        getstatusoutput('sed -i "s/%s/%s/" %s'
                        % (old_conf, new_conf, origin_conf_file))

    @classmethod
    def start_cluster(cls, meta_count, replica_count, check_health):
        """Start a onebox cluster with the given meta/replica counts.

        :param meta_count: value passed to ``run.sh start_onebox -m``.
        :param replica_count: value passed to ``run.sh start_onebox -r``.
        :param check_health: when true, block until
            ``wait_until_cluster_health`` succeeds; otherwise just sleep
            one second.
        """
        getstatusoutput('cd %s && ./run.sh start_onebox -m %s -r %s'
                        % (cls.shell_path, meta_count, replica_count))
        if check_health:
            cls.wait_until_cluster_health()
        else:
            time.sleep(1)  # wait a while for meta ready

    @classmethod
    def stop_and_clear_cluster(cls):
        """Run ``stop_onebox`` followed by ``clear_onebox``."""
        getstatusoutput('cd %s && ./run.sh stop_onebox' % cls.shell_path)
        getstatusoutput('cd %s && ./run.sh clear_onebox' % cls.shell_path)

    @classmethod
    def operate_1_server(cls, op_type, server_type, index):
        """Run ``./run.sh <op_type>_onebox_instance -<server_type> <index>``.

        The callers in this class use ``op_type`` values 'start', 'stop'
        and 'restart', and ``server_type`` values 'r' (replica) and
        'm' (meta).
        """
        getstatusoutput('cd %s && ./run.sh %s_onebox_instance -%s %s'
                        % (cls.shell_path, op_type, server_type, index))

    @classmethod
    def stop_1_replica(cls, index):
        """Stop the replica server with the given index."""
        cls.operate_1_server('stop', 'r', index)

    @classmethod
    def start_1_replica(cls, index):
        """Start the replica server with the given index."""
        cls.operate_1_server('start', 'r', index)

    @classmethod
    def restart_1_replica(cls, index):
        """Restart the replica server with the given index."""
        cls.operate_1_server('restart', 'r', index)

    @classmethod
    def stop_1_meta(cls, index):
        """Stop the meta server with the given index."""
        cls.operate_1_server('stop', 'm', index)

    @classmethod
    def start_1_meta(cls, index):
        """Start the meta server with the given index."""
        cls.operate_1_server('start', 'm', index)

    @classmethod
    def wait_until_cluster_health(cls, partition_count=8, timeout=300, interval=1):
        """Block until table ``temp`` reports all partitions fully healthy.

        Polls ``app temp -d`` in the pegasus shell and compares the last
        field of the ``fully_healthy_partition_count`` line with
        ``partition_count``.

        :param partition_count: expected fully healthy partition count;
            the default 8 is the value this check always used.
        :param timeout: seconds to wait before giving up, or ``None`` to
            wait forever.
        :param interval: seconds to sleep between two polls.
        :raises RuntimeError: if the cluster is not healthy within
            ``timeout`` seconds.
        """
        cmd = ('cd %s && echo "app temp -d" | ./run.sh shell |'
               ' grep fully_healthy_partition_count | awk \'{print $NF}\''
               % cls.shell_path)
        expected = str(partition_count)
        deadline = None if timeout is None else time.time() + timeout
        while True:
            status, output = getstatusoutput(cmd)
            # status 0 means the pipeline succeeded; output is the count.
            if status == 0 and output == expected:
                return
            # Bounded wait: a cluster that never recovers must fail the
            # test run instead of hanging it forever.
            if deadline is not None and time.time() >= deadline:
                raise RuntimeError(
                    'cluster not healthy after %s seconds '
                    '(status=%s, fully_healthy_partition_count=%r, expected %s)'
                    % (timeout, status, output, expected))
            time.sleep(interval)
class TestIntegration(unittest.TestCase):
    """Integration tests of the Pegasus client against a local onebox cluster.

    ``setUp``/``tearDown`` stop and clear the cluster around every test.
    Most tests start a cluster with ``init``, write ``DATA_COUNT`` rows and
    then check the rows stay readable while servers are stopped, started or
    restarted through ``ServerOperator``.
    """

    TEST_HKEY = 'test_hkey_1'
    TEST_SKEY = 'test_skey_1'
    TEST_VALUE = 'test_value_1'
    DATA_COUNT = 500       # number of rows written and read back
    MAX_RETRY_COUNT = 30   # default number of read passes in check_data()
    # Passed to ServerOperator.start_cluster(); a test may set it to False
    # on the instance to skip the health wait.
    check_health = True
    # Client handle, assigned by init() or directly by a test.  Defaults to
    # None so tearDown() is safe when a test fails before creating it.
    c = None

    def setUp(self):
        """Make sure no cluster from a previous run is left over."""
        ServerOperator.stop_and_clear_cluster()

    def tearDown(self):
        """Close the client if one was created, then remove the cluster."""
        try:
            if self.c is not None:
                self.c.close()
        finally:
            # Always clean up the cluster, even if closing the client fails.
            ServerOperator.stop_and_clear_cluster()

    @inlineCallbacks
    def init(self, meta_count, replica_count, confs=None):
        """Start the cluster and connect the client to table ``temp``.

        :param meta_count: number of meta servers to start.
        :param replica_count: number of replica servers to start.
        :param confs: optional dict mapping an existing config string to its
            replacement; applied with ServerOperator.modify_conf() before
            the cluster starts.  Anything that is not a dict is ignored.
        """
        if isinstance(confs, dict):
            for old_conf, new_conf in confs.items():
                ServerOperator.modify_conf(old_conf, new_conf)
        ServerOperator.start_cluster(meta_count, replica_count, self.check_health)
        self.c = Pegasus(['127.0.0.1:34601', '127.0.0.1:34602', '127.0.0.1:34603'], 'temp')
        ret = yield self.c.init()
        self.assertTrue(ret)

    @inlineCallbacks
    def _write_test_data(self):
        """Write DATA_COUNT rows and assert that every set succeeds."""
        for i in range(self.DATA_COUNT):
            (ret, ign) = yield self.c.set(self.TEST_HKEY + str(i), self.TEST_SKEY, self.TEST_VALUE)
            self.assertEqual(ret, error_types.ERR_OK.value)

    @inlineCallbacks
    def loop_op(self):
        """Read back every row once.

        Fires with True when all DATA_COUNT rows return ERR_OK and
        TEST_VALUE, and with False at the first row that does not.
        """
        for i in range(self.DATA_COUNT):
            # 1000 is presumably the per-request timeout in ms -- TODO confirm
            ret = yield self.c.get(self.TEST_HKEY + str(i), self.TEST_SKEY, 1000)
            if (not isinstance(ret, tuple)
                    or ret[0] != error_types.ERR_OK.value
                    or bytes.decode(ret[1]) != self.TEST_VALUE):
                defer.returnValue(False)
        defer.returnValue(True)

    @inlineCallbacks
    def check_data(self, max_retry_count=None, linear_backoff=False):
        """Retry loop_op() until it succeeds or the retry budget is spent.

        :param max_retry_count: number of failed passes after which the test
            fails; defaults to MAX_RETRY_COUNT.
        :param linear_backoff: when true, sleep 1s, 2s, 3s, ... between
            passes instead of a constant 1s.
        """
        if max_retry_count is None:
            max_retry_count = self.MAX_RETRY_COUNT
        wait_times = 0
        while True:
            ok = yield self.loop_op()
            if ok:
                break
            wait_times += 1
            # Fail before sleeping: no further attempt follows, so another
            # sleep would only delay the failure.
            if wait_times >= max_retry_count:
                self.fail('data not readable after %d read passes' % wait_times)
            time.sleep(wait_times if linear_backoff else 1)

    @inlineCallbacks
    def test_replica_start(self):
        """A single set succeeds on a freshly started 3 meta / 3 replica cluster."""
        yield self.init(3, 3)
        (ret, ign) = yield self.c.set(self.TEST_HKEY, self.TEST_SKEY, self.TEST_VALUE)
        self.assertEqual(ret, error_types.ERR_OK.value)

    @inlineCallbacks
    def test_can_not_connect(self):
        """Client init() yields None when no cluster has been started."""
        # No cluster is running here (setUp stopped it), and the first
        # address is 127.0.1.1 rather than 127.0.0.1.
        self.c = Pegasus(['127.0.1.1:34601', '127.0.0.1:34602', '127.0.0.1:34603'], 'temp')
        ret = yield self.c.init()
        self.assertEqual(ret, None)

    @inlineCallbacks
    def test_1of3_replica_restart(self):
        """Data stays readable after restarting 1 of 3 replicas."""
        yield self.init(3, 3)
        yield self._write_test_data()
        ServerOperator.restart_1_replica(1)
        yield self.check_data()

    @inlineCallbacks
    def test_3of3_replica_restart(self):
        """Data stays readable while all 3 replicas are restarted in turn."""
        yield self.init(3, 3)
        ServerOperator.wait_until_cluster_health()
        yield self._write_test_data()
        # Rolling restart: verify the data after each replica is restarted.
        for i in range(1, 4):
            ServerOperator.restart_1_replica(i)
            yield self.check_data()

    @inlineCallbacks
    def test_1of5_replica_restart(self):
        """Data stays readable after restarting 1 of 5 replicas."""
        yield self.init(3, 5)
        yield self._write_test_data()
        ServerOperator.restart_1_replica(1)
        yield self.check_data()

    @inlineCallbacks
    def test_1of5_replica_stop_and_start(self):
        """Data stays readable after stopping, then starting, 1 of 5 replicas."""
        yield self.init(3, 5)
        yield self._write_test_data()
        ServerOperator.stop_1_replica(1)
        yield self.check_data()
        ServerOperator.start_1_replica(1)
        yield self.check_data()

    @inlineCallbacks
    def test_5of5_replica_restart(self):
        """Data stays readable while all 5 replicas are restarted in turn."""
        yield self.init(3, 5)
        ServerOperator.wait_until_cluster_health()
        yield self._write_test_data()
        # Rolling restart: verify the data after each replica is restarted.
        for i in range(1, 6):
            ServerOperator.restart_1_replica(i)
            yield self.check_data()

    @inlineCallbacks
    def test_1of5_replica_stop(self):
        """Data becomes readable again after 1 of 5 replicas is stopped."""
        yield self.init(3, 5)
        yield self._write_test_data()
        ServerOperator.stop_1_replica(1)
        # Up to 20 read passes with a growing pause between them.
        yield self.check_data(max_retry_count=20, linear_backoff=True)

    @inlineCallbacks
    def test_2of6_replica_stop(self):
        """Data stays readable after stopping 2 of 6 replicas."""
        yield self.init(3, 6)
        yield self._write_test_data()
        # Stop the replicas one at a time with a pause after each stop.
        for i in range(1, 3):
            ServerOperator.stop_1_replica(i)
            time.sleep(5)
        yield self.check_data()

    @inlineCallbacks
    def test_1of5_replica_stop_and_start_with_meta_stop_and_start(self):
        """Data stays readable while a replica and each meta are cycled."""
        # NOTE(review): this edits config-server.ini in place and nothing in
        # this class restores the old value afterwards -- confirm intended.
        confs = {'timeout_ms = 60000': 'timeout_ms = 2000'}
        yield self.init(2, 5, confs)
        yield self._write_test_data()
        for i in range(2):
            # Take replica 1 and meta i+1 down together, then verify.
            ServerOperator.stop_1_replica(1)
            ServerOperator.stop_1_meta(i + 1)
            time.sleep(30)
            yield self.check_data()
            # Bring both back and verify again.
            ServerOperator.start_1_meta(i + 1)
            ServerOperator.start_1_replica(1)
            time.sleep(3)
            yield self.check_data()

    @inlineCallbacks
    def test_0_replica_scan_exception(self):
        """Scanning with zero replicas yields None or a session/packet error."""
        # With no replicas the health wait could never succeed, so skip it.
        self.check_health = False
        yield self.init(3, 0)
        o = ScanOptions()
        s = self.c.get_scanner(self.TEST_HKEY, b'\x00\x00', b'\xFF\xFF', o)
        # Only get_next() is guarded, so that a failing assertion is not
        # itself caught by the handler and reported as a message mismatch.
        try:
            ret = yield s.get_next()
        except Exception as e:
            self.assertEqual(e.args[0], 'session or packet error!')
        else:
            self.assertEqual(ret, None)
            s.close()