add tests for network auth (CASSANDRA-13985)
diff --git a/auth_test.py b/auth_test.py
index 34f7212..e7aef05 100644
--- a/auth_test.py
+++ b/auth_test.py
@@ -1,3 +1,5 @@
+import random
+import string
 import time
 from collections import namedtuple
 from datetime import datetime, timedelta
@@ -395,20 +397,20 @@
         self.prepare()
         session = self.get_session(user='cassandra', password='cassandra')
 
-        assert_one(session, "LIST USERS", ['cassandra', True])
+        assert_one(session, "LIST USERS", ['cassandra', True] + all_dcs)
 
         session.execute("CREATE USER IF NOT EXISTS aleksey WITH PASSWORD 'sup'")
         session.execute("CREATE USER IF NOT EXISTS aleksey WITH PASSWORD 'ignored'")
 
         self.get_session(user='aleksey', password='sup')
 
-        assert_all(session, "LIST USERS", [['aleksey', False], ['cassandra', True]])
+        assert_all(session, "LIST USERS", [['aleksey', False] + all_dcs, ['cassandra', True] + all_dcs])
 
         session.execute("DROP USER IF EXISTS aleksey")
-        assert_one(session, "LIST USERS", ['cassandra', True])
+        assert_one(session, "LIST USERS", ['cassandra', True] + all_dcs)
 
         session.execute("DROP USER IF EXISTS aleksey")
-        assert_one(session, "LIST USERS", ['cassandra', True])
+        assert_one(session, "LIST USERS", ['cassandra', True] + all_dcs)
 
     def test_create_ks_auth(self):
         """
@@ -1008,13 +1010,15 @@
 
         self.cluster.stop()
         config = {'authenticator': 'org.apache.cassandra.auth.AllowAllAuthenticator',
-                  'authorizer': 'org.apache.cassandra.auth.AllowAllAuthorizer'}
+                  'authorizer': 'org.apache.cassandra.auth.AllowAllAuthorizer',
+                  'network_authorizer': 'org.apache.cassandra.auth.AllowAllNetworkAuthorizer'}
         self.cluster.set_configuration_options(values=config)
         self.cluster.start(wait_for_binary_proto=True)
 
         self.cluster.stop()
         config = {'authenticator': 'org.apache.cassandra.auth.PasswordAuthenticator',
-                  'authorizer': 'org.apache.cassandra.auth.CassandraAuthorizer'}
+                  'authorizer': 'org.apache.cassandra.auth.CassandraAuthorizer',
+                  'network_authorizer': 'org.apache.cassandra.auth.CassandraNetworkAuthorizer'}
         self.cluster.set_configuration_options(values=config)
         self.cluster.start(wait_for_binary_proto=True)
 
@@ -1073,6 +1077,7 @@
         """
         config = {'authenticator': 'org.apache.cassandra.auth.PasswordAuthenticator',
                   'authorizer': 'org.apache.cassandra.auth.CassandraAuthorizer',
+                  'network_authorizer': 'org.apache.cassandra.auth.CassandraNetworkAuthorizer',
                   'permissions_validity_in_ms': permissions_validity}
         self.cluster.set_configuration_options(values=config)
         self.cluster.populate(nodes).start()
@@ -1129,12 +1134,15 @@
 # Third value is login status
 # Fourth value is role options
 # See CASSANDRA-7653 for explanations of these
-Role = namedtuple('Role', ['name', 'superuser', 'login', 'options'])
+dcs_field = [] if CASSANDRA_VERSION_FROM_BUILD < '4.0' else ['dcs']
+Role = namedtuple('Role', ['name', 'superuser', 'login', 'options'] + dcs_field)
 
-mike_role = Role('mike', False, True, {})
-role1_role = Role('role1', False, False, {})
-role2_role = Role('role2', False, False, {})
-cassandra_role = Role('cassandra', True, True, {})
+all_dcs = [] if CASSANDRA_VERSION_FROM_BUILD < '4.0' else ['ALL']
+na_dcs = [] if CASSANDRA_VERSION_FROM_BUILD < '4.0' else ['n/a']
+mike_role = Role('mike', False, True, {}, *all_dcs)
+role1_role = Role('role1', False, False, {}, *na_dcs)
+role2_role = Role('role2', False, False, {}, *na_dcs)
+cassandra_role = Role('cassandra', True, True, {}, *all_dcs)
 
 
 @since('2.2')
@@ -1275,9 +1283,9 @@
 
         # roles with roleadmin can drop roles
         mike.execute("DROP ROLE role1")
-        assert_all(cassandra, "LIST ROLES", [['administrator', False, False, {}],
+        assert_all(cassandra, "LIST ROLES", [['administrator', False, False, {}] + na_dcs,
                                              list(cassandra_role),
-                                             ['klaus', False, True, {}],
+                                             ['klaus', False, True, {}] + all_dcs,
                                              list(mike_role)])
 
         # revoking role admin removes its privileges
@@ -1353,9 +1361,9 @@
         mike.execute("GRANT another_superuser TO role1")
         assert_unauthorized(mike, "CREATE ROLE role2 WITH SUPERUSER = true",
                             "Only superusers can create a role with superuser status")
-        assert_all(cassandra, "LIST ROLES OF role1", [['another_superuser', True, False, {}],
-                                                      ['non_superuser', False, False, {}],
-                                                      ['role1', False, False, {}]])
+        assert_all(cassandra, "LIST ROLES OF role1", [['another_superuser', True, False, {}] + na_dcs,
+                                                      ['non_superuser', False, False, {}] + na_dcs,
+                                                      ['role1', False, False, {}] + na_dcs])
 
     def test_drop_and_revoke_roles_with_superuser_status(self):
         """
@@ -1928,7 +1936,7 @@
         assert_one(cassandra, "LIST ROLES OF mike", list(mike_role))
 
         cassandra.execute("CREATE USER super_user WITH PASSWORD '12345' SUPERUSER")
-        assert_one(cassandra, "LIST ROLES OF super_user", ["super_user", True, True, {}])
+        assert_one(cassandra, "LIST ROLES OF super_user", ["super_user", True, True, {}] + all_dcs)
 
     def test_role_name(self):
         """
@@ -1985,11 +1993,11 @@
         self.get_session(user='mike', password='12345')
 
         cassandra.execute("ALTER ROLE mike WITH LOGIN = false")
-        assert_one(cassandra, "LIST ROLES OF mike", ["mike", False, False, {}])
+        assert_one(cassandra, "LIST ROLES OF mike", ["mike", False, False, {}] + na_dcs)
         self.assert_login_not_allowed('mike', '12345')
 
         cassandra.execute("ALTER ROLE mike WITH LOGIN = true")
-        assert_one(cassandra, "LIST ROLES OF mike", ["mike", False, True, {}])
+        assert_one(cassandra, "LIST ROLES OF mike", ["mike", False, True, {}] + all_dcs)
         self.get_session(user='mike', password='12345')
 
     def test_roles_do_not_inherit_login_privilege(self):
@@ -2006,9 +2014,9 @@
         cassandra.execute("CREATE ROLE with_login WITH PASSWORD = '54321' AND SUPERUSER = false AND LOGIN = true")
         cassandra.execute("GRANT with_login to mike")
 
-        assert_all(cassandra, "LIST ROLES OF mike", [["mike", False, False, {}],
-                                                     ["with_login", False, True, {}]])
-        assert_one(cassandra, "LIST ROLES OF with_login", ["with_login", False, True, {}])
+        assert_all(cassandra, "LIST ROLES OF mike", [["mike", False, False, {}] + na_dcs,
+                                                     ["with_login", False, True, {}] + all_dcs])
+        assert_one(cassandra, "LIST ROLES OF with_login", ["with_login", False, True, {}] + all_dcs)
 
         self.assert_login_not_allowed("mike", "12345")
 
@@ -2053,9 +2061,9 @@
 
         cassandra.execute("GRANT db_admin TO mike")
         mike.execute("CREATE ROLE another_role WITH SUPERUSER = false AND LOGIN = false")
-        assert_all(mike, "LIST ROLES", [["another_role", False, False, {}],
+        assert_all(mike, "LIST ROLES", [["another_role", False, False, {}] + na_dcs,
                                         list(cassandra_role),
-                                        ["db_admin", True, False, {}],
+                                        ["db_admin", True, False, {}] + na_dcs,
                                         list(mike_role)])
 
     def test_list_users_considers_inherited_superuser_status(self):
@@ -2071,8 +2079,8 @@
         cassandra.execute("CREATE ROLE db_admin WITH SUPERUSER = true")
         cassandra.execute("CREATE ROLE mike WITH PASSWORD = '12345' AND SUPERUSER = false AND LOGIN = true")
         cassandra.execute("GRANT db_admin TO mike")
-        assert_all(cassandra, "LIST USERS", [['cassandra', True],
-                                             ["mike", True]])
+        assert_all(cassandra, "LIST USERS", [['cassandra', True] + all_dcs,
+                                             ["mike", True] + all_dcs])
 
     # UDF permissions tests # TODO move to separate fixture & refactor this + auth_test.py
     def test_grant_revoke_udf_permissions(self):
@@ -2683,6 +2691,7 @@
         config = {'authenticator': 'org.apache.cassandra.auth.PasswordAuthenticator',
                   'authorizer': 'org.apache.cassandra.auth.CassandraAuthorizer',
                   'role_manager': 'org.apache.cassandra.auth.CassandraRoleManager',
+                  'network_authorizer': 'org.apache.cassandra.auth.CassandraNetworkAuthorizer',
                   'permissions_validity_in_ms': 0,
                   'roles_validity_in_ms': roles_expiry}
         self.cluster.set_configuration_options(values=config)
@@ -2699,6 +2708,123 @@
         assert list(session.execute(query)) == []
 
 
+@since('4.0')
+class TestNetworkAuth(Tester):
+
+    @pytest.fixture(autouse=True)
+    def fixture_setup_auth(self, fixture_dtest_setup):
+        fixture_dtest_setup.cluster.set_configuration_options(values={
+            'authenticator': 'org.apache.cassandra.auth.PasswordAuthenticator',
+            'authorizer': 'org.apache.cassandra.auth.CassandraAuthorizer',
+            'role_manager': 'org.apache.cassandra.auth.CassandraRoleManager',
+            'network_authorizer': 'org.apache.cassandra.auth.CassandraNetworkAuthorizer',
+            'num_tokens': 1
+        })
+        fixture_dtest_setup.cluster.populate([1, 1], debug=True).start(wait_for_binary_proto=True, jvm_args=['-XX:-PerfDisableSharedMem'])
+        fixture_dtest_setup.dc1_node, fixture_dtest_setup.dc2_node = fixture_dtest_setup.cluster.nodelist()
+        fixture_dtest_setup.superuser = fixture_dtest_setup.patient_exclusive_cql_connection(fixture_dtest_setup.dc1_node, user='cassandra', password='cassandra')
+
+        fixture_dtest_setup.superuser.execute("ALTER KEYSPACE system_auth WITH REPLICATION={'class': 'NetworkTopologyStrategy', 'dc1': 1, 'dc2': 1}")
+        fixture_dtest_setup.superuser.execute("CREATE KEYSPACE ks WITH REPLICATION={'class': 'NetworkTopologyStrategy', 'dc1': 1, 'dc2': 1}")
+        fixture_dtest_setup.superuser.execute("CREATE TABLE ks.tbl (k int primary key, v int)")
+
+    def username(self):
+        return ''.join(random.choice(string.ascii_lowercase) for _ in range(8));
+
+
+    def create_user(self, query_fmt, username):
+        """
+        formats and runs the given auth query and grants permissions to the created user
+        """
+        self.superuser.execute(query_fmt % username)
+        self.superuser.execute("GRANT ALL PERMISSIONS ON ks.tbl TO %s" % username)
+
+    def assertConnectsTo(self, username, node):
+        session = self.exclusive_cql_connection(node, user=username, password='password')
+        session.execute("SELECT * FROM ks.tbl")
+
+    def assertUnauthorized(self, func):
+        try:
+            func()
+            pytest.fail("Expecting Unauthorized exception")
+        except Unauthorized as _:
+            pass
+        except NoHostAvailable as e:
+            cause = list(e.errors.values())[0]
+            assert isinstance(cause, Unauthorized)
+
+    def assertWontConnectTo(self, username, node):
+        self.assertUnauthorized(lambda: self.exclusive_cql_connection(node, user=username, password='password'))
+
+    def clear_network_auth_cache(self, node):
+        mbean = make_mbean('auth', type='NetworkAuthCache')
+        with JolokiaAgent(node) as jmx:
+            jmx.execute_method(mbean, 'invalidate')
+
+    def test_full_dc_access(self):
+        username = self.username()
+        self.create_user("CREATE ROLE %s WITH password = 'password' AND LOGIN = true", username)
+        self.assertConnectsTo(username, self.dc1_node)
+        self.assertConnectsTo(username, self.dc2_node)
+
+    def test_single_dc_access(self):
+        username = self.username()
+        self.create_user("CREATE ROLE %s WITH password = 'password' AND LOGIN = true AND ACCESS TO DATACENTERS {'dc1'}", username)
+        self.assertConnectsTo(username, self.dc1_node)
+        self.assertWontConnectTo(username, self.dc2_node)
+
+    def test_revoked_dc_access(self):
+        """
+        if a user's access to a dc is revoked while they're connected,
+        all of their requests should fail once the cache is cleared
+        """
+        username = self.username()
+        self.create_user("CREATE ROLE %s WITH password = 'password' AND LOGIN = true", username)
+        self.assertConnectsTo(username, self.dc1_node)
+        self.assertConnectsTo(username, self.dc2_node)
+
+        # connect to the dc2 node, then remove permission for it
+        session = self.exclusive_cql_connection(self.dc2_node, user=username, password='password')
+        self.superuser.execute("ALTER ROLE %s WITH ACCESS TO DATACENTERS {'dc1'}" % username)
+        self.clear_network_auth_cache(self.dc2_node)
+        self.assertUnauthorized(lambda: session.execute("SELECT * FROM ks.tbl"))
+
+    def test_create_dc_validation(self):
+        """
+        trying to give a user access to a dc that doesn't exist should fail
+        """
+        username = self.username()
+        with pytest.raises(InvalidRequest):
+            self.create_user("CREATE ROLE %s WITH password = 'password' AND LOGIN = true AND ACCESS TO DATACENTERS {'dc1000'}", username)
+
+    def test_alter_dc_validation(self):
+        """
+        trying to give a user access to a dc that doesn't exist should fail
+        """
+        username = self.username()
+        self.create_user("CREATE ROLE %s WITH password = 'password' AND LOGIN = true", username)
+        with pytest.raises(InvalidRequest):
+            self.create_user("ALTER ROLE %s WITH ACCESS TO DATACENTERS {'dc1000'}", username)
+
+    def test_revoked_login(self):
+        """
+        If the login flag is set to false for a user with a current connection,
+        all their requests should fail once the cache is cleared. Here because it has
+        more in common with these tests that the other auth tests
+        """
+        username = self.username()
+        superuser = self.patient_exclusive_cql_connection(self.dc1_node, user='cassandra', password='cassandra')
+        self.create_user("CREATE ROLE %s WITH password = 'password' AND LOGIN = true", username)
+        self.assertConnectsTo(username, self.dc1_node)
+        self.assertConnectsTo(username, self.dc2_node)
+
+        # connect to the dc2 node, then remove permission for it
+        session = self.exclusive_cql_connection(self.dc2_node, user=username, password='password')
+        superuser.execute("ALTER ROLE %s WITH LOGIN=false" % username)
+        self.clear_network_auth_cache(self.dc2_node)
+        self.assertUnauthorized(lambda: session.execute("SELECT * FROM ks.tbl"))
+
+
 def role_creator_permissions(creator, role):
     return [(creator, role, perm) for perm in ('ALTER', 'DROP', 'AUTHORIZE')]