import os
import pytest
import re
import logging

from cassandra import ConsistencyLevel
from cassandra.query import SimpleStatement
from ccmlib.node import ToolError

from dtest import Tester, create_ks
from tools.assertions import assert_all, assert_invalid, assert_none
from tools.jmxutils import JolokiaAgent, make_mbean, remove_perf_disable_shared_mem

since = pytest.mark.since
logger = logging.getLogger(__name__)


class TestNodetool(Tester):

    def test_decommission_after_drain_is_invalid(self):
        """
        @jira_ticket CASSANDRA-8741

        Running a decommission after a drain should generate
        an unsupported operation message and exit with an error
        code (which we receive as a ToolError exception).
        """
        cluster = self.cluster
        cluster.populate([3]).start()

        node = cluster.nodelist()[0]
        node.drain(block_on_log=True)

        try:
            node.decommission()
            assert not "Expected nodetool error"
        except ToolError as e:
            assert '' == e.stderr
            assert 'Unsupported operation' in e.stdout

    def test_correct_dc_rack_in_nodetool_info(self):
        """
        @jira_ticket CASSANDRA-10382

        Test that nodetool info returns the correct rack and dc
        """

        cluster = self.cluster
        cluster.populate([2, 2])
        cluster.set_configuration_options(values={'endpoint_snitch': 'org.apache.cassandra.locator.GossipingPropertyFileSnitch'})

        for i, node in enumerate(cluster.nodelist()):
            with open(os.path.join(node.get_conf_dir(), 'cassandra-rackdc.properties'), 'w') as snitch_file:
                for line in ["dc={}".format(node.data_center), "rack=rack{}".format(i % 2)]:
                    snitch_file.write(line + os.linesep)

        cluster.start(wait_for_binary_proto=True)

        for i, node in enumerate(cluster.nodelist()):
            out, err, _ = node.nodetool('info')
            assert 0 == len(err), err
            out_str = out
            if isinstance(out, (bytes, bytearray)):
                out_str = out.decode("utf-8")
            logger.debug(out_str)
            for line in out_str.split(os.linesep):
                if line.startswith('Data Center'):
                    assert line.endswith(node.data_center), \
                        "Expected dc {} for {} but got {}".format(node.data_center, node.address(), line.rsplit(None, 1)[-1])
                elif line.startswith('Rack'):
                    rack = "rack{}".format(i % 2)
                    assert line.endswith(rack), \
                        "Expected rack {} for {} but got {}".format(rack, node.address(), line.rsplit(None, 1)[-1])

    @since('3.4')
    def test_nodetool_timeout_commands(self):
        """
        @jira_ticket CASSANDRA-10953

        Test that nodetool gettimeout and settimeout work at a basic level
        """
        cluster = self.cluster
        cluster.populate([1]).start()
        node = cluster.nodelist()[0]

        types = ['read', 'range', 'write', 'counterwrite', 'cascontention',
                 'truncate', 'misc']
        if cluster.version() < '4.0':
            types.append('streamingsocket')

        # read all of the timeouts, make sure we get a sane response
        for timeout_type in types:
            out, err, _ = node.nodetool('gettimeout {}'.format(timeout_type))
            assert 0 == len(err), err
            logger.debug(out)
            assert re.search(r'.* \d+ ms', out)

        # set all of the timeouts to 123
        for timeout_type in types:
            _, err, _ = node.nodetool('settimeout {} 123'.format(timeout_type))
            assert 0 == len(err), err

        # verify that they're all reported as 123
        for timeout_type in types:
            out, err, _ = node.nodetool('gettimeout {}'.format(timeout_type))
            assert 0 == len(err), err
            logger.debug(out)
            assert re.search(r'.* 123 ms', out)

    @since('3.0')
    def test_cleanup_when_no_replica_with_index(self):
        self._cleanup_when_no_replica(True)

    @since('3.0')
    def test_cleanup_when_no_replica_without_index(self):
        self._cleanup_when_no_replica(False)

    def _cleanup_when_no_replica(self, with_index=False):
        """
        @jira_ticket CASSANDRA-13526
        Test nodetool cleanup KS to remove old data when new replicas in current node instead of directly returning success.
        """
        self.cluster.populate([1, 1]).start(wait_for_binary_proto=True, wait_other_notice=True)

        node_dc1 = self.cluster.nodelist()[0]
        node_dc2 = self.cluster.nodelist()[1]

        # init schema with rf on both data centers
        replication_factor = {'dc1': 1, 'dc2': 1}
        session = self.patient_exclusive_cql_connection(node_dc1, consistency_level=ConsistencyLevel.ALL)
        session_dc2 = self.patient_exclusive_cql_connection(node_dc2, consistency_level=ConsistencyLevel.LOCAL_ONE)
        create_ks(session, 'ks', replication_factor)

        if self.cluster.version() < '4.0':
            session.execute('CREATE TABLE ks.cf (id int PRIMARY KEY, value text) with dclocal_read_repair_chance = 0 AND read_repair_chance = 0;', trace=False)
        else:
            session.execute('CREATE TABLE ks.cf (id int PRIMARY KEY, value text);', trace=False)

        if with_index:
            session.execute('CREATE INDEX value_by_key on ks.cf(value)', trace=False)

        # populate data
        for i in range(0, 100):
            session.execute(SimpleStatement("INSERT INTO ks.cf(id, value) VALUES({}, 'value');".format(i), consistency_level=ConsistencyLevel.ALL))

        # generate sstable
        self.cluster.flush()

        for node in self.cluster.nodelist():
            assert 0 != len(node.get_sstables('ks', 'cf'))
        if with_index:
            assert 100 == len(list(session_dc2.execute("SELECT * FROM ks.cf WHERE value = 'value'"))), 100

        # alter rf to only dc1
        session.execute("ALTER KEYSPACE ks WITH REPLICATION = {'class' : 'NetworkTopologyStrategy', 'dc1' : 1, 'dc2' : 0};")

        # nodetool cleanup on dc2
        node_dc2.nodetool("cleanup ks cf")
        node_dc2.nodetool("compact ks cf")

        # check local data on dc2
        for node in self.cluster.nodelist():
            if node.data_center == 'dc2':
                assert 0 == len(node.get_sstables('ks', 'cf'))
            else:
                assert 0 != len(node.get_sstables('ks', 'cf'))

        # dc1 data remains
        statement = SimpleStatement("SELECT * FROM ks.cf", consistency_level=ConsistencyLevel.LOCAL_ONE)
        assert 100 == len(list(session.execute(statement)))
        if with_index:
            statement = SimpleStatement("SELECT * FROM ks.cf WHERE value = 'value'", consistency_level=ConsistencyLevel.LOCAL_ONE)
            assert len(list(session.execute(statement))) == 100

        # alter rf back to query dc2, no data, no index
        session.execute("ALTER KEYSPACE ks WITH REPLICATION = {'class' : 'NetworkTopologyStrategy', 'dc1' : 0, 'dc2' : 1};")
        assert_none(session_dc2, "SELECT * FROM ks.cf")
        if with_index:
            assert_none(session_dc2, "SELECT * FROM ks.cf WHERE value = 'value'")

    def test_meaningless_notice_in_status(self):
        """
        @jira_ticket CASSANDRA-10176

        nodetool status don't return ownership when there is more than one user keyspace
        define (since they likely have different replication infos making ownership
        meaningless in general) and shows a helpful notice as to why it does that.
        This test checks that said notice is only printed is there is indeed more than
        one user keyspace.
        """
        cluster = self.cluster
        cluster.populate([3]).start()

        node = cluster.nodelist()[0]

        notice_message = r'effective ownership information is meaningless'

        # Do a first try without any keypace, we shouldn't have the notice
        out, err, _ = node.nodetool('status')
        assert 0 == len(err), err
        assert not re.search(notice_message, out)

        session = self.patient_cql_connection(node)
        session.execute("CREATE KEYSPACE ks1 WITH replication = { 'class':'SimpleStrategy', 'replication_factor':1}")

        # With 1 keyspace, we should still not get the notice
        out, err, _ = node.nodetool('status')
        assert 0 == len(err), err
        assert not re.search(notice_message, out)

        session.execute("CREATE KEYSPACE ks2 WITH replication = { 'class':'SimpleStrategy', 'replication_factor':1}")

        # With 2 keyspaces with the same settings, we should not get the notice
        out, err, _ = node.nodetool('status')
        assert 0 == len(err), err
        assert not re.search(notice_message, out)

        session.execute("CREATE KEYSPACE ks3 WITH replication = { 'class':'SimpleStrategy', 'replication_factor':3}")

        # With a keyspace without the same replication factor, we should get the notice
        out, err, _ = node.nodetool('status')
        assert 0 == len(err), err
        assert re.search(notice_message, out)

    @since('4.0')
    def test_set_get_batchlog_replay_throttle(self):
        """
        @jira_ticket CASSANDRA-13614

        Test that batchlog replay throttle can be set and get through nodetool
        """
        cluster = self.cluster
        cluster.populate(2)
        node = cluster.nodelist()[0]
        cluster.start()

        # Test that nodetool help messages are displayed
        assert 'Set batchlog replay throttle' in node.nodetool('help setbatchlogreplaythrottle').stdout
        assert 'Print batchlog replay throttle' in node.nodetool('help getbatchlogreplaythrottle').stdout

        # Set and get throttle with nodetool, ensuring that the rate change is logged
        node.nodetool('setbatchlogreplaythrottle 2048')
        assert len(node.grep_log('Updating batchlog replay throttle to 2048 KB/s, 1024 KB/s per endpoint',
                                 filename='debug.log')) >= 0
        assert 'Batchlog replay throttle: 2048 KB/s' in node.nodetool('getbatchlogreplaythrottle').stdout

    @since('3.0')
    def test_reloadlocalschema(self):
        """
        @jira_ticket CASSANDRA-13954

        Test that `nodetool reloadlocalschema` works as intended
        """
        cluster = self.cluster
        cluster.populate(1)
        node = cluster.nodelist()[0]
        remove_perf_disable_shared_mem(node)  # for jmx
        cluster.start()

        session = self.patient_cql_connection(node)

        query = "CREATE KEYSPACE IF NOT EXISTS test WITH replication " \
                "= {'class': 'NetworkTopologyStrategy', 'datacenter1': 2};"
        session.execute(query)

        query = 'CREATE TABLE test.test (pk int, ck int, PRIMARY KEY (pk, ck));'
        session.execute(query)

        ss = make_mbean('db', type='StorageService')

        # get initial schema version
        with JolokiaAgent(node) as jmx:
            schema_version = jmx.read_attribute(ss, 'SchemaVersion')

        # manually add a regular column 'val' to test.test
        query = """
            INSERT INTO system_schema.columns
                (keyspace_name, table_name, column_name, clustering_order,
                 column_name_bytes, kind, position, type)
            VALUES
                ('test', 'test', 'val', 'none',
                 0x76616c, 'regular', -1, 'int');"""
        session.execute(query)

        # validate that schema version wasn't automatically updated
        with JolokiaAgent(node) as jmx:
            assert schema_version == jmx.read_attribute(ss, 'SchemaVersion')

        # make sure the new column wasn't automagically picked up
        assert_invalid(session, 'INSERT INTO test.test (pk, ck, val) VALUES (0, 1, 2);')

        # force the node to reload schema from disk
        node.nodetool('reloadlocalschema')

        # validate that schema version changed
        with JolokiaAgent(node) as jmx:
            assert schema_version != jmx.read_attribute(ss, 'SchemaVersion')

        # try an insert with the new column again and validate it succeeds this time
        session.execute('INSERT INTO test.test (pk, ck, val) VALUES (0, 1, 2);')
        assert_all(session, 'SELECT pk, ck, val FROM test.test;', [[0, 1, 2]])

    @since('3.0')
    def test_refresh_size_estimates_clears_invalid_entries(self):
        """
        @jira_ticket CASSANDRA-14905
         nodetool refreshsizeestimates should clear up entries for tables that no longer exist
        """
        cluster = self.cluster
        cluster.populate(1)
        node = cluster.nodelist()[0]
        cluster.start()
        session = self.patient_exclusive_cql_connection(node)
        session.execute("USE system;")
        # Valid keyspace but invalid table
        session.execute("INSERT INTO size_estimates (keyspace_name, table_name, range_start, range_end, mean_partition_size, partitions_count) VALUES ('system_auth', 'bad_table', '-5', '5', 0, 0);")
        # Invalid keyspace and table
        session.execute("INSERT INTO size_estimates (keyspace_name, table_name, range_start, range_end, mean_partition_size, partitions_count) VALUES ('bad_keyspace', 'bad_table', '-5', '5', 0, 0);")
        node.nodetool('refreshsizeestimates')
        assert_none(session, "SELECT * FROM size_estimates WHERE keyspace_name='system_auth' AND table_name='bad_table'")
        assert_none(session, "SELECT * FROM size_estimates WHERE keyspace_name='bad_keyspace'")

    @since('4.0')
    def test_set_get_concurrent_view_builders(self):
        """
        @jira_ticket CASSANDRA-12245

        Test that the number of concurrent view builders can be set and get through nodetool
        """
        cluster = self.cluster
        cluster.populate(2)
        node = cluster.nodelist()[0]
        cluster.start()

        # Test that nodetool help messages are displayed
        assert 'Set the number of concurrent view' in node.nodetool('help setconcurrentviewbuilders').stdout
        assert 'Get the number of concurrent view' in node.nodetool('help getconcurrentviewbuilders').stdout

        # Set and get throttle with nodetool, ensuring that the rate change is logged
        node.nodetool('setconcurrentviewbuilders 4')
        assert 'Current number of concurrent view builders in the system is: \n4' \
               in node.nodetool('getconcurrentviewbuilders').stdout

        # Try to set an invalid zero value
        try:
            node.nodetool('setconcurrentviewbuilders 0')
        except ToolError as e:
            assert 'concurrent_view_builders should be great than 0.' in e.stdout
            assert 'Number of concurrent view builders should be greater than 0.', e.message
        else:
            self.fail("Expected error when setting and invalid value")

    @since('4.0')
    def test_describecluster_more_information_three_datacenters(self):
        """
        nodetool describecluster should be more informative. It should include detailes
        for total node count, list of datacenters, RF, number of nodes per dc, how many
        are down and version(s).
        @jira_ticket CASSANDRA-13853
        @expected_result This test invokes nodetool describecluster and matches the output with the expected one
        """
        cluster = self.cluster
        cluster.populate([1, 2, 1]).start(wait_for_binary_proto=True)

        node1_dc1, node1_dc2, node2_dc2, node1_dc3 = cluster.nodelist()

        session_dc1 = self.patient_cql_connection(node1_dc1)
        session_dc1.execute("create KEYSPACE ks1 WITH replication = {'class': 'NetworkTopologyStrategy', 'dc1': 3, 'dc2':5, 'dc3':1}")

        session_dc3 = self.patient_cql_connection(node1_dc3)
        session_dc3.execute("create KEYSPACE ks2 WITH replication = {'class': 'NetworkTopologyStrategy', 'dc1': 3, 'dc2':5, 'dc3':1}")

        all_nodes = cluster.nodelist()

        out_node1_dc1, node1_dc1_sorted = self._describe(all_nodes.pop())

        for node in all_nodes:
            out, out_sorted = self._describe(node)
            assert node1_dc1_sorted == out_sorted

        logger.debug(out_node1_dc1)
        assert 'Live: 4' in out_node1_dc1
        assert 'Joining: 0' in out_node1_dc1
        assert 'Moving: 0' in out_node1_dc1
        assert 'Leaving: 0' in out_node1_dc1
        assert 'Unreachable: 0' in out_node1_dc1
        assert 'Data Centers:' in out_node1_dc1
        assert 'dc1 #Nodes: 1 #Down: 0' in out_node1_dc1
        assert 'dc2 #Nodes: 2 #Down: 0' in out_node1_dc1
        assert 'dc3 #Nodes: 1 #Down: 0' in out_node1_dc1
        assert 'Keyspaces:' in out_node1_dc1

        expected_keyspaces = [('system_schema', 'LocalStrategy', {''}),
                              ('system', 'LocalStrategy', {''}),
                              ('system_traces', 'SimpleStrategy', {'replication_factor=2'}),
                              ('system_distributed', 'SimpleStrategy', {'replication_factor=3'}),
                              ('system_auth', 'SimpleStrategy', {'replication_factor=1'}),
                              ('ks1', 'NetworkTopologyStrategy', {'dc1=3','dc2=5','dc3=1'}),
                              ('ks2', 'NetworkTopologyStrategy', {'dc1=3','dc2=5','dc3=1'})]

        for (ks, strategy, _) in expected_keyspaces:
            assert "{} -> Replication class: {}".format(ks, strategy) in out_node1_dc1 # replication factor is verified below

        # now check db versions & replication factor:
        # Database versions:
        #       4.0.0: [127.0.0.6:7000, 127.0.0.5:7000, 127.0.0.4:7000, 127.0.0.3:7000, 127.0.0.2:7000, 127.0.0.1:7000]

        lines = out_node1_dc1.splitlines()
        rex = r'(\S+)\s\[(.*)\]'
        found_keyspaces = False
        found_database = False
        verified_rfs = 0
        for i in range(0, len(lines)):
            if 'Keyspaces' in lines[i]:
                found_keyspaces = True
                for x in range(i+1, len(lines)):
                    for (ks, strategy, replication) in expected_keyspaces:
                        if "{} ->".format(ks) in lines[x]:
                            verified_rfs += 1
                            assert strategy in lines[x]
                            assert replication == self._get_replication(lines[x])

            if 'Database versions:' in lines[i]:
                found_database = True
                m = re.search(rex, lines[i+1])
                # group(1) is the version, and all nodes are on the same version
                assert "{}".format(node1_dc1.get_cassandra_version()) in m.group(1)
                nodestring = m.group(2)
                for n in cluster.nodelist():
                    assert n.address_and_port() in nodestring

        assert found_keyspaces
        assert found_database
        assert verified_rfs == len(expected_keyspaces)

    def _get_replication(self, line):
        # ks1 -> Replication class: NetworkTopologyStrategy {dc2=5, dc1=3, dc3=1}
        repl_rex = r'{(.*)}'
        repl_m = re.search(repl_rex, line)
        return {x.strip() for x in repl_m.group(1).split(",")}

    def _describe(self, node):
        node_describe, err, _ = node.nodetool('describecluster')
        assert 0 == len(err), err
        out_sorted = node_describe.split()
        out_sorted.sort()
        return (node_describe, out_sorted)

    @since('4.0')
    def test_sjk(self):
        """
        Verify that SJK generally works.
        """

        cluster = self.cluster
        cluster.populate([1]).start()
        node = cluster.nodelist()[0]

        out, err, _ = node.nodetool('sjk --help')
        logger.debug(out)
        hasPattern = False
        for line in out.split(os.linesep):
            if "    ttop      [Thread Top] Displays threads from JVM process" == line:
                hasPattern = True
        assert hasPattern == True, "Expected help about SJK ttop"

        out, err, _ = node.nodetool('sjk')
        logger.debug(out)
        hasPattern = False
        for line in out.split(os.linesep):
            if "    ttop      [Thread Top] Displays threads from JVM process" == line:
                hasPattern = True
        assert hasPattern == True, "Expected help about SJK ttop"

        out, err, _ = node.nodetool('sjk hh -n 10 --live')
        logger.debug(out)
        hasPattern = False
        for line in out.split(os.linesep):
            if re.match('.*Instances.*Bytes.*Type.*', line):
                hasPattern = True
        assert hasPattern == True, "Expected 'SJK hh' output"
