blob: ab942fa68381fa2e803f26c97204de1e8c4c363c [file] [log] [blame]
#!/usr/bin/env python3
# -*- encoding: utf-8 -*-
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
''' Unit tests for the explorer command-line tool. '''
import json
import os
import unittest2 as unittest
from mock import Mock
import as tracker_access
import as topologies
import as main
import as args
# pylint: disable=missing-docstring, no-self-use
class ExplorerTest(unittest.TestCase):
''' unit tests '''
def setUp(self):
base_path = os.path.dirname(os.path.realpath(__file__))
info_path = os.path.join(base_path, 'info.json')
lp_path = os.path.join(base_path, 'logicalplan.json')
topo_path = os.path.join(base_path, 'topologies.json')
metrics_path = os.path.join(base_path, 'metrics.json')
with open(info_path) as f:
tracker_access.get_topology_info = Mock(return_value=json.load(f))
with open(lp_path) as f:
tracker_access.get_logical_plan = Mock(return_value=json.load(f))
with open(topo_path) as f:
j = json.load(f)
tracker_access.get_cluster_topologies = Mock(return_value=j)
tracker_access.get_cluster_role_topologies = Mock(return_value=j)
tracker_access.get_cluster_role_env_topologies = Mock(return_value=j)
with open(metrics_path) as f:
tracker_access.get_topology_metrics = Mock(return_value=json.load(f))
clusters = ['nyc', 'london', 'tokyo']
tracker_access.get_clusters = Mock(return_value=clusters)
def sample_topo_result(self):
info = []
info.append(['a1', 'a2', 'a3'])
info.append(['a1', 'a3', 'a5'])
info.append(['a1', 'a3', 'a6'])
info.append(['a2', 'a3', 'a6'])
info.append(['b1', 'b2', 'b3'])
info.append(['c1', 'c2', 'c3'])
d = {}
for row in info:
tmp = d
if row[0] not in tmp:
tmp[row[0]] = {}
tmp = tmp[row[0]]
if row[1] not in tmp:
tmp[row[1]] = []
return d, info
def acc_with_optional_args(self, cl, acc):
clv = cl + ['--verbose']
clt = cl + ['--tracker-url=""']
cltv = clv + ['--tracker-url=""']
for cl in clv, clt, cltv:
def sample_topo_cls(self):
clt1 = ['topologies', 'local']
clt2 = ['topologies', 'local/foo']
clt3 = ['topologies', 'local/foo/bar']
all_cl = []
for cl in clt1, clt2, clt3:
self.acc_with_optional_args(cl, all_cl)
return all_cl
def sample_lp_cls(self):
clco = ['components', 'local/rli/default', 'ExclamationTopology']
clsp = ['components', 'local/rli/default', 'ExclamationTopology', '--spout']
clbo = ['components', 'local/rli/default', 'ExclamationTopology', '--bolt']
good_cls = []
for cl in clsp, clbo, clco:
self.acc_with_optional_args(cl, good_cls)
bad_cl1 = ['spouts', 'local/rli/defult', 'ExclamationTopology']
return good_cls, bad_cl1
def sample_pp_cls(self):
clsp = ['metrics', 'local/rli/default', 'ExclamationTopology']
clco = ['containers', 'local/rli/default', 'ExclamationTopology']
good_cls = []
for cl in clsp, clco:
self.acc_with_optional_args(cl, good_cls)
good_cls.append(clco + ['--cid 1'])
bad_cl1 = ['metrics', 'local/rli/defult', 'ExclamationTopology']
return good_cls, bad_cl1
def sample_cluster_cls(self):
cl = ['clusters']
good_cls = []
self.acc_with_optional_args(cl, good_cls)
return good_cls
def sample_cli(self):
clt1 = ['topologies', 'local']
clt2 = ['topologies', 'local/foo']
clt3 = ['topologies', 'local/foo/bar']
clb2 = ['metrics', 'local/foo/bar', 'topo']
clc1 = ['components', 'local/foo/bar', 'topo']
clc1 = ['components', 'local/foo/bar', 'topo', '--spout']
clc1 = ['components', 'local/foo/bar', 'topo', '--bolt']
clc2 = ['containers', 'local/foo/bar', 'topo']
cll = [clt1, clt2, clt3, clb2, clc1, clc2]
all_cl = []
for cl in cll:
self.acc_with_optional_args(cl, all_cl)
# help subcommand
for sub in ['clusters', 'topologies',
'metrics', 'components', 'containers', 'help']:
all_cl.append(['help', sub])
return all_cl
def test_topo(self):
for cl in self.sample_topo_cls():
self.assertEqual(0, main.main(cl))
def test_lp(self):
good, _ = self.sample_lp_cls()
for cl in good:
self.assertEqual(0, main.main(cl))
#self.assertEqual(1, main.main(bad))
def test_help(self):
all_cl = []
for sub in ['clusters', 'topologies', 'metrics',
'components', 'containers', 'help']:
all_cl.append(['help', sub])
for cl in all_cl:
self.assertEqual(0, main.main(cl))
def test_pp(self):
good, bad = self.sample_pp_cls()
for cl in good:
self.assertEqual(0, main.main(cl))
self.assertEqual(1, main.main(bad))
def test_sample_clusters(self):
good = self.sample_cluster_cls()
for cl in good:
self.assertEqual(0, main.main(cl))
def test_topo_result_to_table(self):
d, told = self.sample_topo_result()
tnew, _, _ = topologies.to_table(d)
self.assertEqual(told, tnew)
def test_cli_parsing(self):
parser = main.create_parser()
clis = self.sample_cli()
for cli in clis:
_, unknown_args = parser.parse_known_args(cli)
self.assertTrue(unknown_args is not None)