blob: 9b71963b916bf23ad9adb885a07f95859cdccf4e [file]
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
from cli_test_utils import CLITestBase
class TestPrivilegesCommand(CLITestBase):
def test_privilege_commands_validation(self) -> None:
mock_client = self.build_mock_client()
# # Missing --catalog flag
for sub in ["catalog", "namespace", "table", "view", "list"]:
args = (
["privileges", sub, "grant", "PRIV"]
if sub != "list"
else ["privileges", "list"]
)
with self.subTest(subcommand=sub, error="missing catalog"):
self.check_exception(
lambda: self.mock_execute(
mock_client, args + ["--catalog-role", "r"]
),
"--catalog",
)
# Missing --catalog-role flag
for sub in ["catalog", "namespace", "table", "view", "list"]:
args = (
["privileges", sub, "grant", "PRIV"]
if sub != "list"
else ["privileges", "list"]
)
with self.subTest(subcommand=sub, error="missing role"):
self.check_exception(
lambda: self.mock_execute(mock_client, args + ["--catalog", "c"]),
"--catalog-role",
)
# Missing positional privilege
for sub in ["catalog", "namespace", "table", "view"]:
with self.subTest(subcommand=sub, error="missing positional"):
with self.assertRaises(SystemExit):
self.mock_execute(
mock_client,
[
"privileges",
sub,
"grant",
"--catalog",
"c",
"--catalog-role",
"r",
],
)
# Invalid privilege
self.check_exception(
lambda: self.mock_execute(
mock_client,
[
"privileges",
"catalog",
"grant",
"--catalog",
"foo",
"--catalog-role",
"bar",
"TABLE_READ_MORE_BOOKS",
],
),
"catalog privilege: TABLE_READ_MORE_BOOKS",
)
def test_privilege_grant(self) -> None:
mock_client = self.build_mock_client()
self.mock_execute(
mock_client,
[
"privileges",
"namespace",
"grant",
"--namespace",
"a.b.c",
"--catalog",
"foo",
"--catalog-role",
"bar",
"TABLE_READ_DATA",
],
)
call_args = mock_client.add_grant_to_catalog_role.call_args[0]
self.assertEqual(call_args[0], "foo")
self.assertEqual(call_args[1], "bar")
self.assertEqual(call_args[2].grant.privilege.value, "TABLE_READ_DATA")
self.assertEqual(call_args[2].grant.namespace, ["a", "b", "c"])
self.mock_execute(
mock_client,
[
"privileges",
"table",
"grant",
"--namespace",
"a.b.c",
"--table",
"t",
"--catalog",
"foo",
"--catalog-role",
"bar",
"TABLE_READ_DATA",
],
)
call_args = mock_client.add_grant_to_catalog_role.call_args[0]
self.assertEqual(call_args[0], "foo")
self.assertEqual(call_args[1], "bar")
self.assertEqual(call_args[2].grant.privilege.value, "TABLE_READ_DATA")
self.assertEqual(call_args[2].grant.namespace, ["a", "b", "c"])
self.assertEqual(call_args[2].grant.table_name, "t")
self.mock_execute(
mock_client,
[
"privileges",
"view",
"grant",
"--namespace",
"a.b.c",
"--catalog",
"foo",
"--catalog-role",
"bar",
"--view",
"v",
"VIEW_FULL_METADATA",
],
)
call_args = mock_client.add_grant_to_catalog_role.call_args[0]
self.assertEqual(call_args[0], "foo")
self.assertEqual(call_args[1], "bar")
self.assertEqual(call_args[2].grant.privilege.value, "VIEW_FULL_METADATA")
self.assertEqual(call_args[2].grant.namespace, ["a", "b", "c"])
self.assertEqual(call_args[2].grant.view_name, "v")
def test_privilege_revoke(self) -> None:
mock_client = self.build_mock_client()
self.mock_execute(
mock_client,
[
"privileges",
"catalog",
"revoke",
"--catalog",
"foo",
"--catalog-role",
"bar",
"TABLE_READ_DATA",
],
)
call_args = mock_client.revoke_grant_from_catalog_role.call_args[0]
self.assertEqual(call_args[0], "foo")
self.assertEqual(call_args[1], "bar")
self.assertEqual(call_args[2], False)
self.assertEqual(call_args[3].grant.privilege.value, "TABLE_READ_DATA")
self.mock_execute(
mock_client,
[
"privileges",
"table",
"revoke",
"--namespace",
"a.b.c",
"--catalog",
"foo",
"--catalog-role",
"bar",
"--table",
"t",
"--cascade",
"TABLE_READ_DATA",
],
)
call_args = mock_client.revoke_grant_from_catalog_role.call_args[0]
self.assertEqual(call_args[0], "foo")
self.assertEqual(call_args[1], "bar")
self.assertEqual(call_args[2], True)
self.assertEqual(call_args[3].grant.privilege.value, "TABLE_READ_DATA")
self.assertEqual(call_args[3].grant.namespace, ["a", "b", "c"])
self.assertEqual(call_args[3].grant.table_name, "t")
def test_privilege_list(self) -> None:
mock_client = self.build_mock_client()
mock_client.list_grants_for_catalog_role.return_value.grants = []
self.mock_execute(
mock_client,
["privileges", "list", "--catalog", "foo", "--catalog-role", "bar"],
)
mock_client.list_grants_for_catalog_role.assert_called_with("foo", "bar")
def test_privilege_catalog_grant(self) -> None:
mock_client = self.build_mock_client()
self.mock_execute(
mock_client,
[
"privileges",
"catalog",
"grant",
"--catalog",
"foo",
"--catalog-role",
"bar",
"TABLE_READ_DATA",
],
)
call_args = mock_client.add_grant_to_catalog_role.call_args[0]
self.assertEqual(call_args[0], "foo")
self.assertEqual(call_args[1], "bar")
self.assertEqual(call_args[2].grant.privilege.value, "TABLE_READ_DATA")
def test_privilege_namespace_revoke(self) -> None:
mock_client = self.build_mock_client()
self.mock_execute(
mock_client,
[
"privileges",
"namespace",
"revoke",
"--namespace",
"a.b.c",
"--catalog",
"foo",
"--catalog-role",
"bar",
"TABLE_READ_DATA",
],
)
call_args = mock_client.revoke_grant_from_catalog_role.call_args[0]
self.assertEqual(call_args[0], "foo")
self.assertEqual(call_args[1], "bar")
self.assertEqual(call_args[2], False)
self.assertEqual(call_args[3].grant.privilege.value, "TABLE_READ_DATA")
self.assertEqual(call_args[3].grant.namespace, ["a", "b", "c"])
def test_privilege_view_revoke(self) -> None:
mock_client = self.build_mock_client()
self.mock_execute(
mock_client,
[
"privileges",
"view",
"revoke",
"--namespace",
"a.b.c",
"--catalog",
"foo",
"--catalog-role",
"bar",
"--view",
"v",
"VIEW_FULL_METADATA",
],
)
call_args = mock_client.revoke_grant_from_catalog_role.call_args[0]
self.assertEqual(call_args[0], "foo")
self.assertEqual(call_args[1], "bar")
self.assertEqual(call_args[2], False)
self.assertEqual(call_args[3].grant.privilege.value, "VIEW_FULL_METADATA")
self.assertEqual(call_args[3].grant.namespace, ["a", "b", "c"])
self.assertEqual(call_args[3].grant.view_name, "v")