| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| #include "ignite_runner_suite.h" |
| |
| #include "ignite/client/ignite_client.h" |
| #include "ignite/client/ignite_client_configuration.h" |
| |
| #include <gmock/gmock-matchers.h> |
| #include <gtest/gtest.h> |
| |
| #include <chrono> |
| #include <limits> |
| |
| using namespace ignite; |
| |
| /** |
| * Test suite. |
| */ |
| class compute_test : public ignite_runner_suite { |
| protected: |
| void SetUp() override { |
| ignite_client_configuration cfg{get_node_addrs()}; |
| cfg.set_logger(get_logger()); |
| |
| m_client = ignite_client::start(cfg, std::chrono::seconds(30)); |
| } |
| |
| /** |
| * Get specific node. |
| * @param id Node id. |
| * @return Node. |
| */ |
| cluster_node get_node(size_t id) { |
| auto nodes = m_client.get_cluster_nodes(); |
| std::sort( |
| nodes.begin(), nodes.end(), [](const auto &n1, const auto &n2) { return n1.get_name() < n2.get_name(); }); |
| return nodes[id]; |
| } |
| |
| /** |
| * Get nodes as set. |
| * @return Nodes in set. |
| */ |
| std::set<cluster_node> get_node_set() { |
| auto nodes = m_client.get_cluster_nodes(); |
| return {nodes.begin(), nodes.end()}; |
| } |
| |
| /** |
| * Check that passed argument returned in a specific string form. |
| * |
| * @tparam T Type of the argument. |
| * @param value Argument. |
| * @param expected_str Expected string form. |
| */ |
| template<typename T> |
| void check_argument(T value, const std::string &expected_str) { |
| auto cluster_nodes = m_client.get_cluster_nodes(); |
| auto execution = m_client.get_compute().submit(cluster_nodes, {}, ECHO_JOB, {value, expected_str}, {}); |
| auto result = execution.get_result(); |
| |
| ASSERT_TRUE(result.has_value()); |
| EXPECT_EQ(result.value().template get<T>(), value); |
| } |
| |
| /** |
| * Check that passed argument returned in an expected string form. |
| * |
| * @tparam T Type of the argument. |
| * @param value Argument. |
| */ |
| template<typename T> |
| void check_argument(T value) { |
| check_argument(std::move(value), std::to_string(value)); |
| } |
| |
| /** Ignite client. */ |
| ignite_client m_client; |
| }; |
| |
| TEST_F(compute_test, get_cluster_nodes) { |
| auto cluster_nodes = m_client.get_cluster_nodes(); |
| |
| std::sort(cluster_nodes.begin(), cluster_nodes.end(), |
| [](const auto &n1, const auto &n2) { return n1.get_name() < n2.get_name(); }); |
| |
| ASSERT_EQ(4, cluster_nodes.size()); |
| |
| EXPECT_FALSE(cluster_nodes[0].get_id().empty()); |
| EXPECT_FALSE(cluster_nodes[1].get_id().empty()); |
| |
| EXPECT_EQ(3344, cluster_nodes[0].get_address().port); |
| EXPECT_EQ(3345, cluster_nodes[1].get_address().port); |
| |
| EXPECT_FALSE(cluster_nodes[0].get_address().host.empty()); |
| EXPECT_FALSE(cluster_nodes[1].get_address().host.empty()); |
| |
| EXPECT_EQ(cluster_nodes[0].get_address().host, cluster_nodes[1].get_address().host); |
| } |
| |
| TEST_F(compute_test, execute_on_random_node) { |
| auto cluster_nodes = m_client.get_cluster_nodes(); |
| |
| auto execution = m_client.get_compute().submit(cluster_nodes, {}, NODE_NAME_JOB, {}, {}); |
| auto result = execution.get_result(); |
| |
| ASSERT_TRUE(result.has_value()); |
| EXPECT_THAT(result.value().get<std::string>(), ::testing::StartsWith(PLATFORM_TEST_NODE_RUNNER)); |
| } |
| |
| TEST_F(compute_test, execute_on_specific_node) { |
| auto execution1 = m_client.get_compute().submit({get_node(0)}, {}, NODE_NAME_JOB, {"-", 11}, {}); |
| auto execution2 = m_client.get_compute().submit({get_node(1)}, {}, NODE_NAME_JOB, {":", 22}, {}); |
| |
| auto res1 = execution1.get_result(); |
| auto res2 = execution2.get_result(); |
| |
| ASSERT_TRUE(res1.has_value()); |
| ASSERT_TRUE(res2.has_value()); |
| |
| EXPECT_EQ(res1.value().get<std::string>(), PLATFORM_TEST_NODE_RUNNER + "-_11"); |
| EXPECT_EQ(res2.value().get<std::string>(), PLATFORM_TEST_NODE_RUNNER + "_2:_22"); |
| } |
| |
| TEST_F(compute_test, execute_broadcast_one_node) { |
| auto res = m_client.get_compute().submit_broadcast({get_node(1)}, {}, NODE_NAME_JOB, {"42"}, {}); |
| |
| ASSERT_EQ(res.size(), 1); |
| |
| EXPECT_EQ(res.begin()->first, get_node(1)); |
| |
| ASSERT_TRUE(res.begin()->second.has_value()); |
| EXPECT_EQ(res.begin()->second.value().get_result(), PLATFORM_TEST_NODE_RUNNER + "_242"); |
| } |
| |
| TEST_F(compute_test, execute_broadcast_all_nodes) { |
| auto res = m_client.get_compute().submit_broadcast(get_node_set(), {}, NODE_NAME_JOB, {"42"}, {}); |
| |
| ASSERT_EQ(res.size(), 4); |
| |
| EXPECT_EQ(res[get_node(0)].value().get_result(), get_node(0).get_name() + "42"); |
| EXPECT_EQ(res[get_node(1)].value().get_result(), get_node(1).get_name() + "42"); |
| EXPECT_EQ(res[get_node(2)].value().get_result(), get_node(2).get_name() + "42"); |
| EXPECT_EQ(res[get_node(3)].value().get_result(), get_node(3).get_name() + "42"); |
| } |
| |
| TEST_F(compute_test, execute_with_args) { |
| auto cluster_nodes = m_client.get_cluster_nodes(); |
| |
| auto execution = m_client.get_compute().submit(cluster_nodes, {}, CONCAT_JOB, {5.3, uuid(), "42", nullptr}, {}); |
| auto result = execution.get_result(); |
| |
| ASSERT_TRUE(result.has_value()); |
| EXPECT_EQ(result.value().get<std::string>(), "5.3_00000000-0000-0000-0000-000000000000_42_null"); |
| } |
| |
| TEST_F(compute_test, job_error_propagates_to_client) { |
| auto cluster_nodes = m_client.get_cluster_nodes(); |
| |
| EXPECT_THROW( |
| { |
| try { |
| m_client.get_compute().submit(cluster_nodes, {}, ERROR_JOB, {"unused"}, {}).get_result(); |
| } catch (const ignite_error &e) { |
| EXPECT_THAT(e.what_str(), testing::HasSubstr("Custom job error")); |
| // TODO https://issues.apache.org/jira/browse/IGNITE-19603 |
| // EXPECT_THAT(e.what_str(), |
| // testing::HasSubstr( |
| // "org.apache.ignite.internal.runner.app.client.ItThinClientComputeTest$CustomException")); |
| // EXPECT_THAT(e.what_str(), testing::HasSubstr("IGN-TBL-3")); |
| throw; |
| } |
| }, |
| ignite_error); |
| } |
| |
| TEST_F(compute_test, unknown_node_execute_throws) { |
| auto unknown_node = cluster_node("some", "random", {"127.0.0.1", 1234}); |
| |
| EXPECT_THROW( |
| { |
| try { |
| m_client.get_compute().submit({unknown_node}, {}, ECHO_JOB, {"unused"}, {}); |
| } catch (const ignite_error &e) { |
| EXPECT_THAT(e.what_str(), |
| testing::HasSubstr("None of the specified nodes are present in the cluster: [random]")); |
| throw; |
| } |
| }, |
| ignite_error); |
| } |
| |
| // TODO https://issues.apache.org/jira/browse/IGNITE-21553 |
| TEST_F(compute_test, DISABLED_unknown_node_broadcast_throws) { |
| auto unknown_node = cluster_node("some", "random", {"127.0.0.1", 1234}); |
| |
| EXPECT_THROW( |
| { |
| try { |
| m_client.get_compute().submit_broadcast({unknown_node}, {}, ECHO_JOB, {"unused"}, {}); |
| } catch (const ignite_error &e) { |
| EXPECT_THAT(e.what_str(), |
| testing::HasSubstr("None of the specified nodes are present in the cluster: [random]")); |
| throw; |
| } |
| }, |
| ignite_error); |
| } |
| |
| TEST_F(compute_test, all_arg_types) { |
| check_argument<std::int8_t>(42); |
| check_argument<std::int8_t>(std::numeric_limits<std::int8_t>::max()); |
| check_argument<std::int8_t>(std::numeric_limits<std::int8_t>::min()); |
| |
| check_argument<std::int16_t>(4242); |
| check_argument<std::int16_t>(std::numeric_limits<std::int16_t>::max()); |
| check_argument<std::int16_t>(std::numeric_limits<std::int16_t>::min()); |
| |
| check_argument<std::int32_t>(424242); |
| check_argument<std::int32_t>(std::numeric_limits<std::int32_t>::max()); |
| check_argument<std::int32_t>(std::numeric_limits<std::int32_t>::min()); |
| |
| check_argument<std::int64_t>(424242424242); |
| check_argument<std::int64_t>(std::numeric_limits<std::int64_t>::max()); |
| check_argument<std::int64_t>(std::numeric_limits<std::int64_t>::min()); |
| |
| check_argument<float>(0.123456f); |
| check_argument<float>(std::numeric_limits<float>::max(), "3.4028235E38"); |
| check_argument<float>(std::numeric_limits<float>::min(), "1.17549435E-38"); |
| |
| check_argument<double>(0.987654); |
| check_argument<double>(std::numeric_limits<double>::max(), "1.7976931348623157E308"); |
| check_argument<double>(std::numeric_limits<double>::min(), "2.2250738585072014E-308"); |
| |
| check_argument<big_decimal>({123456, 3}, "123.456"); |
| check_argument<big_decimal>({}, "0"); |
| check_argument<big_decimal>({1, 0}, "1"); |
| |
| auto str_dec = "12345678909876543211234567890.987654321"; |
| check_argument<big_decimal>(big_decimal(str_dec), str_dec); |
| |
| check_argument<ignite_date>({2021, 11, 18}, "2021-11-18"); |
| check_argument<ignite_time>({13, 8, 55, 266574889}, "13:08:55.266574889"); |
| check_argument<ignite_date_time>({{2021, 11, 18}, {13, 8, 55, 266574889}}, "2021-11-18T13:08:55.266574889"); |
| |
| check_argument<uuid>({0, 0}, "00000000-0000-0000-0000-000000000000"); |
| check_argument<uuid>({0x123e4567e89b12d3, 0x7456426614174000}, "123e4567-e89b-12d3-7456-426614174000"); |
| } |
| |
| TEST_F(compute_test, submit_colocated) { |
| std::map<std::int32_t, std::string> nodes_for_values = {{1, "_2"}, {5, "_4"}, {9, ""}, {10, "_2"}, {11, "_4"}}; |
| |
| for (const auto &var : nodes_for_values) { |
| SCOPED_TRACE("key=" + std::to_string(var.first) + ", node=" + var.second); |
| auto key = get_tuple(var.first); |
| |
| auto execution = m_client.get_compute().submit_colocated(TABLE_1, key, {}, NODE_NAME_JOB, {}, {}); |
| auto res_node_name = execution.get_result(); |
| auto expected_node_name = PLATFORM_TEST_NODE_RUNNER + var.second; |
| |
| EXPECT_EQ(expected_node_name, res_node_name.value().get<std::string>()); |
| } |
| } |
| |
| TEST_F(compute_test, execute_colocated_throws_when_table_does_not_exist) { |
| EXPECT_THROW( |
| { |
| try { |
| (void) m_client.get_compute().submit_colocated("unknownTable", get_tuple(42), {}, ECHO_JOB, {}, {}); |
| } catch (const ignite_error &e) { |
| EXPECT_STREQ("Table does not exist: 'unknownTable'", e.what()); |
| throw; |
| } |
| }, |
| ignite_error); |
| } |
| |
| TEST_F(compute_test, execute_colocated_throws_when_key_column_is_missing) { |
| EXPECT_THROW( |
| { |
| try { |
| (void) m_client.get_compute().submit_colocated(TABLE_1, get_tuple("some"), {}, ECHO_JOB, {}, {}); |
| } catch (const ignite_error &e) { |
| EXPECT_THAT(e.what_str(), ::testing::HasSubstr("Missed key column: KEY")); |
| throw; |
| } |
| }, |
| ignite_error); |
| } |
| |
| TEST_F(compute_test, execute_colocated_throws_when_key_is_empty) { |
| EXPECT_THROW( |
| { |
| try { |
| (void) m_client.get_compute().submit_colocated(TABLE_1, {}, {}, ECHO_JOB, {}, {}); |
| } catch (const ignite_error &e) { |
| EXPECT_EQ("Key tuple can not be empty", e.what_str()); |
| throw; |
| } |
| }, |
| ignite_error); |
| } |
| |
| TEST_F(compute_test, unknown_unit) { |
| EXPECT_THROW( |
| { |
| try { |
| auto cluster_nodes = m_client.get_cluster_nodes(); |
| (void) m_client.get_compute().submit(cluster_nodes, {{"unknown"}}, NODE_NAME_JOB, {}, {}); |
| } catch (const ignite_error &e) { |
| EXPECT_THAT(e.what_str(), ::testing::HasSubstr("Deployment unit unknown:latest doesn't exist")); |
| throw; |
| } |
| }, |
| ignite_error); |
| } |
| |
| TEST_F(compute_test, execute_unknown_unit_and_version) { |
| EXPECT_THROW( |
| { |
| try { |
| auto cluster_nodes = m_client.get_cluster_nodes(); |
| (void) m_client.get_compute().submit(cluster_nodes, {{"unknown", "1.2.3"}}, NODE_NAME_JOB, {}, {}); |
| } catch (const ignite_error &e) { |
| EXPECT_THAT(e.what_str(), ::testing::HasSubstr("Deployment unit unknown:1.2.3 doesn't exist")); |
| throw; |
| } |
| }, |
| ignite_error); |
| } |
| |
| TEST_F(compute_test, execute_colocated_unknown_unit_and_version) { |
| EXPECT_THROW( |
| { |
| try { |
| auto comp = m_client.get_compute(); |
| (void) comp.submit_colocated(TABLE_1, get_tuple(1), {{"unknown", "1.2.3"}}, NODE_NAME_JOB, {}, {}); |
| } catch (const ignite_error &e) { |
| EXPECT_THAT(e.what_str(), ::testing::HasSubstr("Deployment unit unknown:1.2.3 doesn't exist")); |
| throw; |
| } |
| }, |
| ignite_error); |
| } |
| |
| TEST_F(compute_test, broadcast_unknown_unit_and_version) { |
| auto res = m_client.get_compute().submit_broadcast({get_node(1)}, {{"unknown", "1.2.3"}}, NODE_NAME_JOB, {}, {}); |
| |
| ASSERT_EQ(res.size(), 1); |
| |
| auto &res1 = res[get_node(1)]; |
| ASSERT_TRUE(res1.has_error()); |
| EXPECT_THAT(res1.error().what_str(), ::testing::HasSubstr("Deployment unit unknown:1.2.3 doesn't exist")); |
| } |
| |
| TEST_F(compute_test, execute_empty_unit_name) { |
| EXPECT_THROW( |
| { |
| try { |
| (void) m_client.get_compute().submit({get_node(1)}, {{""}}, NODE_NAME_JOB, {}, {}); |
| } catch (const ignite_error &e) { |
| EXPECT_EQ("Deployment unit name can not be empty", e.what_str()); |
| throw; |
| } |
| }, |
| ignite_error); |
| } |
| |
| TEST_F(compute_test, execute_empty_unit_version) { |
| EXPECT_THROW( |
| { |
| try { |
| (void) m_client.get_compute().submit({get_node(1)}, {{"some", ""}}, NODE_NAME_JOB, {}, {}); |
| } catch (const ignite_error &e) { |
| EXPECT_EQ("Deployment unit version can not be empty", e.what_str()); |
| throw; |
| } |
| }, |
| ignite_error); |
| } |
| |
| TEST_F(compute_test, broadcast_empty_unit_name) { |
| EXPECT_THROW( |
| { |
| try { |
| (void) m_client.get_compute().submit_broadcast({get_node(1)}, {{""}}, NODE_NAME_JOB, {}, {}); |
| } catch (const ignite_error &e) { |
| EXPECT_EQ("Deployment unit name can not be empty", e.what_str()); |
| throw; |
| } |
| }, |
| ignite_error); |
| } |
| |
| TEST_F(compute_test, broadcast_empty_unit_version) { |
| EXPECT_THROW( |
| { |
| try { |
| (void) m_client.get_compute().submit_broadcast({get_node(1)}, {{"some", ""}}, NODE_NAME_JOB, {}, {}); |
| } catch (const ignite_error &e) { |
| EXPECT_EQ("Deployment unit version can not be empty", e.what_str()); |
| throw; |
| } |
| }, |
| ignite_error); |
| } |
| |
| TEST_F(compute_test, execute_colocated_empty_unit_name) { |
| EXPECT_THROW( |
| { |
| try { |
| (void) m_client.get_compute().submit_colocated(TABLE_1, get_tuple(1), {{""}}, NODE_NAME_JOB, {}, {}); |
| } catch (const ignite_error &e) { |
| EXPECT_EQ("Deployment unit name can not be empty", e.what_str()); |
| throw; |
| } |
| }, |
| ignite_error); |
| } |
| |
| TEST_F(compute_test, execute_colocated_empty_unit_version) { |
| EXPECT_THROW( |
| { |
| try { |
| auto comp = m_client.get_compute(); |
| comp.submit_colocated(TABLE_1, get_tuple(1), {{"some", ""}}, NODE_NAME_JOB, {}, {}); |
| } catch (const ignite_error &e) { |
| EXPECT_EQ("Deployment unit version can not be empty", e.what_str()); |
| throw; |
| } |
| }, |
| ignite_error); |
| } |
| |
| TEST_F(compute_test, job_execution_status_executing) { |
| const std::int32_t sleep_ms = 3000; |
| |
| auto execution = m_client.get_compute().submit({get_node(1)}, {}, SLEEP_JOB, {sleep_ms}, {}); |
| |
| auto status = execution.get_status(); |
| |
| ASSERT_TRUE(status.has_value()); |
| EXPECT_EQ(job_state::EXECUTING, status->state); |
| } |
| |
| TEST_F(compute_test, DISABLED_job_execution_status_completed) { |
| const std::int32_t sleep_ms = 1; |
| |
| auto execution = m_client.get_compute().submit({get_node(1)}, {}, SLEEP_JOB, {sleep_ms}, {}); |
| execution.get_result(); |
| |
| auto status = execution.get_status(); |
| |
| ASSERT_TRUE(status.has_value()); |
| EXPECT_EQ(job_state::COMPLETED, status->state); |
| } |
| |
| TEST_F(compute_test, job_execution_status_failed) { |
| auto execution = m_client.get_compute().submit({get_node(1)}, {}, ERROR_JOB, {"unused"}, {}); |
| |
| EXPECT_THROW( |
| { |
| try { |
| execution.get_result(); |
| } catch (const ignite_error &e) { |
| EXPECT_THAT(e.what_str(), testing::HasSubstr("Custom job error")); |
| throw; |
| } |
| }, |
| ignite_error); |
| |
| auto status = execution.get_status(); |
| |
| ASSERT_TRUE(status.has_value()); |
| EXPECT_EQ(job_state::FAILED, status->state); |
| } |
| |
| TEST_F(compute_test, job_execution_cancel) { |
| const std::int32_t sleep_ms = 5000; |
| |
| auto execution = m_client.get_compute().submit({get_node(1)}, {}, SLEEP_JOB, {sleep_ms}, {}); |
| execution.cancel(); |
| |
| auto status = execution.get_status(); |
| |
| ASSERT_TRUE(status.has_value()); |
| EXPECT_EQ(job_state::CANCELED, status->state); |
| } |
| |
| TEST_F(compute_test, job_execution_change_priority) { |
| const std::int32_t sleep_ms = 5000; |
| |
| auto execution = m_client.get_compute().submit({get_node(1)}, {}, SLEEP_JOB, {sleep_ms}, {}); |
| auto res = execution.change_priority(123); |
| |
| EXPECT_EQ(res, job_execution::operation_result::INVALID_STATE); |
| } |