blob: 4898e5bdd5a56308bcbf7b69c4c7b8268836cb84 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "ignite_runner_suite.h"
#include "ignite/client/ignite_client.h"
#include "ignite/client/ignite_client_configuration.h"
#include <gmock/gmock-matchers.h>
#include <gtest/gtest.h>
#include <chrono>
#include <limits>
using namespace ignite;
/**
* Test suite.
*/
class compute_test : public ignite_runner_suite {
protected:
void SetUp() override {
ignite_client_configuration cfg{get_node_addrs()};
cfg.set_logger(get_logger());
m_client = ignite_client::start(cfg, std::chrono::seconds(30));
}
/**
* Get specific node.
* @param id Node id.
* @return Node.
*/
cluster_node get_node(size_t id) {
auto nodes = m_client.get_cluster_nodes();
std::sort(
nodes.begin(), nodes.end(), [](const auto &n1, const auto &n2) { return n1.get_name() < n2.get_name(); });
return nodes[id];
}
/**
* Get nodes as set.
* @return Nodes in set.
*/
std::set<cluster_node> get_node_set() {
auto nodes = m_client.get_cluster_nodes();
return {nodes.begin(), nodes.end()};
}
/**
* Check that passed argument returned in a specific string form.
*
* @tparam T Type of the argument.
* @param value Argument.
* @param expected_str Expected string form.
*/
template<typename T>
void check_argument(T value, const std::string &expected_str) {
auto cluster_nodes = m_client.get_cluster_nodes();
auto execution = m_client.get_compute().submit(job_target::any_node(cluster_nodes), m_echo_job, {value});
auto result = execution.get_result();
ASSERT_TRUE(result.has_value());
EXPECT_EQ(result.value().get_primitive().template get<T>(), value);
execution = m_client.get_compute().submit(job_target::any_node(cluster_nodes), m_to_string_job, {value});
result = execution.get_result();
ASSERT_TRUE(result.has_value());
auto res_str = result.value().get_primitive().template get<std::string>();
if (res_str != expected_str)
throw ignite_error("Expected equality of these values: '" + res_str + "' and '" + expected_str + "'");
}
/**
* Check that passed argument returned in an expected string form.
*
* @tparam T Type of the argument.
* @param value Argument.
*/
template<typename T>
void check_argument(T value) {
check_argument(std::move(value), std::to_string(value));
}
/** Ignite client. */
ignite_client m_client;
/** Node name job. */
std::shared_ptr<job_descriptor> m_node_name_job{job_descriptor::builder(NODE_NAME_JOB).build()};
/** Echo job. */
std::shared_ptr<job_descriptor> m_echo_job{job_descriptor::builder(ECHO_JOB).build()};
/** Concat job. */
std::shared_ptr<job_descriptor> m_concat_job{job_descriptor::builder(CONCAT_JOB).build()};
/** Error job. */
std::shared_ptr<job_descriptor> m_error_job{job_descriptor::builder(ERROR_JOB).build()};
/** Sleep job. */
std::shared_ptr<job_descriptor> m_sleep_job{job_descriptor::builder(SLEEP_JOB).build()};
/** ToString job. */
std::shared_ptr<job_descriptor> m_to_string_job{job_descriptor::builder(TO_STRING_JOB).build()};
};
TEST_F(compute_test, get_cluster_nodes) {
auto cluster_nodes = m_client.get_cluster_nodes();
std::sort(cluster_nodes.begin(), cluster_nodes.end(),
[](const auto &n1, const auto &n2) { return n1.get_name() < n2.get_name(); });
ASSERT_EQ(4, cluster_nodes.size());
EXPECT_EQ(3344, cluster_nodes[0].get_address().port);
EXPECT_EQ(3345, cluster_nodes[1].get_address().port);
EXPECT_FALSE(cluster_nodes[0].get_address().host.empty());
EXPECT_FALSE(cluster_nodes[1].get_address().host.empty());
EXPECT_EQ(cluster_nodes[0].get_address().host, cluster_nodes[1].get_address().host);
}
TEST_F(compute_test, execute_on_random_node) {
auto cluster_nodes = m_client.get_cluster_nodes();
auto execution = m_client.get_compute().submit(job_target::any_node(cluster_nodes), m_node_name_job, {});
auto result = execution.get_result();
ASSERT_TRUE(result.has_value());
EXPECT_THAT(result.value().get_primitive().get<std::string>(), ::testing::StartsWith(PLATFORM_TEST_NODE_RUNNER));
}
TEST_F(compute_test, execute_on_specific_node) {
auto execution1 = m_client.get_compute().submit(job_target::node(get_node(0)), m_node_name_job, {"-11"});
auto execution2 = m_client.get_compute().submit(job_target::node(get_node(1)), m_node_name_job, {42});
auto res1 = execution1.get_result();
auto res2 = execution2.get_result();
ASSERT_TRUE(res1.has_value());
ASSERT_TRUE(res2.has_value());
EXPECT_EQ(res1.value().get_primitive().get<std::string>(), PLATFORM_TEST_NODE_RUNNER + "-11");
EXPECT_EQ(res2.value().get_primitive().get<std::string>(), PLATFORM_TEST_NODE_RUNNER + "_242");
}
TEST_F(compute_test, execute_broadcast_one_node) {
auto res = m_client.get_compute().submit_broadcast(broadcast_job_target::node(get_node(1)), m_node_name_job, {"42"});
auto execs = res.get_job_executions();
ASSERT_EQ(execs.size(), 1);
ASSERT_TRUE(execs.front().has_value());
EXPECT_EQ(execs.front().value().get_result()->get_primitive(), PLATFORM_TEST_NODE_RUNNER + "_242");
}
TEST_F(compute_test, execute_broadcast_all_nodes) {
auto res = m_client.get_compute().submit_broadcast(broadcast_job_target::nodes(get_node_set()), m_node_name_job, {"42"});
auto execs = res.get_job_executions();
ASSERT_EQ(execs.size(), 4);
std::sort(execs.begin(), execs.end(), [] (auto &n1, auto &n2) {
return n1.value().get_node().get_name() < n2.value().get_node().get_name();
});
EXPECT_EQ(execs[0].value().get_result()->get_primitive().get<std::string>(), get_node(0).get_name() + "42");
EXPECT_EQ(execs[1].value().get_result()->get_primitive().get<std::string>(), get_node(1).get_name() + "42");
EXPECT_EQ(execs[2].value().get_result()->get_primitive().get<std::string>(), get_node(2).get_name() + "42");
EXPECT_EQ(execs[3].value().get_result()->get_primitive().get<std::string>(), get_node(3).get_name() + "42");
}
TEST_F(compute_test, job_error_propagates_to_client) {
auto cluster_nodes = m_client.get_cluster_nodes();
EXPECT_THROW(
{
try {
m_client.get_compute().submit(job_target::any_node(cluster_nodes), m_error_job, {"unused"}).get_result();
} catch (const ignite_error &e) {
EXPECT_THAT(e.what_str(), testing::HasSubstr("Custom job error"));
// TODO https://issues.apache.org/jira/browse/IGNITE-19603
// EXPECT_THAT(e.what_str(),
// testing::HasSubstr(
// "org.apache.ignite.internal.runner.app.client.ItThinClientComputeTest$CustomException"));
// EXPECT_THAT(e.what_str(), testing::HasSubstr("IGN-TBL-3"));
throw;
}
},
ignite_error);
}
TEST_F(compute_test, unknown_node_execute_throws) {
auto unknown_node = cluster_node(uuid(1, 2), "random", {"127.0.0.1", 1234});
EXPECT_THROW(
{
try {
m_client.get_compute().submit(job_target::node(unknown_node), m_echo_job, {"unused"});
} catch (const ignite_error &e) {
EXPECT_THAT(e.what_str(),
testing::HasSubstr("None of the specified nodes are present in the cluster: [random]"));
throw;
}
},
ignite_error);
}
TEST_F(compute_test, unknown_node_broadcast_throws) {
auto unknown_node = cluster_node(uuid(1, 2), "random", {"127.0.0.1", 1234});
auto results =
m_client.get_compute().submit_broadcast(broadcast_job_target::node(unknown_node), m_echo_job, {"unused"});
EXPECT_TRUE(results.get_job_executions()[0].has_error());
auto& e = results.get_job_executions()[0].error();
EXPECT_THAT(e.what_str(), testing::HasSubstr("None of the specified nodes are present in the cluster: [random]"));
}
TEST_F(compute_test, all_arg_types) {
check_argument<std::int8_t>(42);
check_argument<std::int8_t>(std::numeric_limits<std::int8_t>::max());
check_argument<std::int8_t>(std::numeric_limits<std::int8_t>::min());
check_argument<std::int16_t>(4242);
check_argument<std::int16_t>(std::numeric_limits<std::int16_t>::max());
check_argument<std::int16_t>(std::numeric_limits<std::int16_t>::min());
check_argument<std::int32_t>(424242);
check_argument<std::int32_t>(std::numeric_limits<std::int32_t>::max());
check_argument<std::int32_t>(std::numeric_limits<std::int32_t>::min());
check_argument<std::int64_t>(424242424242);
check_argument<std::int64_t>(std::numeric_limits<std::int64_t>::max());
check_argument<std::int64_t>(std::numeric_limits<std::int64_t>::min());
check_argument<float>(0.123456f);
check_argument<float>(std::numeric_limits<float>::max(), "3.4028235E38");
try {
check_argument<float>(std::numeric_limits<float>::min(), "1.17549435E-38");
} catch (ignite_error &) {
check_argument<float>(std::numeric_limits<float>::min(), "1.1754944E-38");
}
check_argument<double>(0.987654);
check_argument<double>(std::numeric_limits<double>::max(), "1.7976931348623157E308");
check_argument<double>(std::numeric_limits<double>::min(), "2.2250738585072014E-308");
check_argument<big_decimal>({123456, 3}, "123.456");
check_argument<big_decimal>({}, "0");
check_argument<big_decimal>({1, 0}, "1");
auto str_dec = "12345678909876543211234567890.987654321";
check_argument<big_decimal>(big_decimal(str_dec), str_dec);
check_argument<ignite_date>({2021, 11, 18}, "2021-11-18");
check_argument<ignite_time>({13, 8, 55, 266574889}, "13:08:55.266574889");
check_argument<ignite_date_time>({{2021, 11, 18}, {13, 8, 55, 266574889}}, "2021-11-18T13:08:55.266574889");
check_argument<uuid>({0, 0}, "00000000-0000-0000-0000-000000000000");
check_argument<uuid>({0x123e4567e89b12d3, 0x7456426614174000}, "123e4567-e89b-12d3-7456-426614174000");
}
TEST_F(compute_test, submit_colocated) {
std::map<std::int32_t, std::string> nodes_for_values = {{1, "_2"}, {5, "_4"}, {9, ""}, {10, "_2"}, {11, "_4"}};
for (const auto &var : nodes_for_values) {
SCOPED_TRACE("key=" + std::to_string(var.first) + ", node=" + var.second);
auto key = get_tuple(var.first);
auto execution = m_client.get_compute().submit(job_target::colocated(TABLE_1, key), m_node_name_job, {});
auto res_node_name = execution.get_result();
auto expected_node_name = PLATFORM_TEST_NODE_RUNNER + var.second;
EXPECT_EQ(expected_node_name, res_node_name.value().get_primitive().get<std::string>());
}
}
TEST_F(compute_test, execute_colocated_throws_when_table_does_not_exist) {
EXPECT_THROW(
{
try {
(void) m_client.get_compute().submit(job_target::colocated("UNKNOWN_TABLE", get_tuple(42)), m_echo_job, {});
} catch (const ignite_error &e) {
EXPECT_STREQ("Table does not exist: 'PUBLIC.UNKNOWN_TABLE'", e.what());
throw;
}
},
ignite_error);
}
TEST_F(compute_test, execute_colocated_throws_when_key_column_is_missing) {
EXPECT_THROW(
{
try {
(void) m_client.get_compute().submit(job_target::colocated(TABLE_1, get_tuple("some")), m_echo_job, {});
} catch (const ignite_error &e) {
EXPECT_THAT(e.what_str(), ::testing::HasSubstr("Missed key column: KEY"));
throw;
}
},
ignite_error);
}
TEST_F(compute_test, execute_colocated_throws_when_key_is_empty) {
EXPECT_THROW(
{
try {
(void) m_client.get_compute().submit(job_target::colocated(TABLE_1, {}), m_echo_job, {});
} catch (const ignite_error &e) {
EXPECT_EQ("Key tuple can not be empty", e.what_str());
throw;
}
},
ignite_error);
}
TEST_F(compute_test, unknown_unit) {
EXPECT_THROW(
{
try {
auto cluster_nodes = m_client.get_cluster_nodes();
auto job_desc = job_descriptor::builder(NODE_NAME_JOB)
.deployment_units({{"unknown"}})
.build();
(void) m_client.get_compute().submit(job_target::any_node(cluster_nodes), job_desc, {});
} catch (const ignite_error &e) {
EXPECT_THAT(e.what_str(), ::testing::HasSubstr("Deployment unit unknown:latest doesn't exist"));
throw;
}
},
ignite_error);
}
TEST_F(compute_test, execute_unknown_unit_and_version) {
EXPECT_THROW(
{
try {
auto cluster_nodes = m_client.get_cluster_nodes();
auto job_desc = job_descriptor::builder(NODE_NAME_JOB)
.deployment_units({{"unknown", "1.2.3"}})
.build();
(void) m_client.get_compute().submit(job_target::any_node(cluster_nodes), job_desc, {});
} catch (const ignite_error &e) {
EXPECT_THAT(e.what_str(), ::testing::HasSubstr("Deployment unit unknown:1.2.3 doesn't exist"));
throw;
}
},
ignite_error);
}
TEST_F(compute_test, execute_colocated_unknown_unit_and_version) {
EXPECT_THROW(
{
try {
auto comp = m_client.get_compute();
auto job_desc = job_descriptor::builder(NODE_NAME_JOB)
.deployment_units({{"unknown", "1.2.3"}})
.build();
(void) comp.submit(job_target::colocated(TABLE_1, get_tuple(1)), job_desc, {});
} catch (const ignite_error &e) {
EXPECT_THAT(e.what_str(), ::testing::HasSubstr("Deployment unit unknown:1.2.3 doesn't exist"));
throw;
}
},
ignite_error);
}
TEST_F(compute_test, broadcast_unknown_unit_and_version) {
auto job_desc = job_descriptor::builder(NODE_NAME_JOB)
.deployment_units({{"unknown", "1.2.3"}})
.build();
auto res = m_client.get_compute().submit_broadcast(broadcast_job_target::node(get_node(1)), job_desc, {});
auto execs = res.get_job_executions();
ASSERT_EQ(execs.size(), 1);
auto &exec1 = execs.front();
ASSERT_TRUE(exec1.has_error());
EXPECT_THAT(exec1.error().what_str(), ::testing::HasSubstr("Deployment unit unknown:1.2.3 doesn't exist"));
}
TEST_F(compute_test, execute_empty_unit_name) {
EXPECT_THROW(
{
try {
auto job_desc = job_descriptor::builder(NODE_NAME_JOB)
.deployment_units({{""}})
.build();
(void) m_client.get_compute().submit(job_target::node(get_node(1)), job_desc, {});
} catch (const ignite_error &e) {
EXPECT_EQ("Deployment unit name can not be empty", e.what_str());
throw;
}
},
ignite_error);
}
TEST_F(compute_test, execute_empty_unit_version) {
EXPECT_THROW(
{
try {
auto job_desc = job_descriptor::builder(NODE_NAME_JOB)
.deployment_units({{"some", ""}})
.build();
(void) m_client.get_compute().submit(job_target::node(get_node(1)), job_desc, {});
} catch (const ignite_error &e) {
EXPECT_EQ("Deployment unit version can not be empty", e.what_str());
throw;
}
},
ignite_error);
}
TEST_F(compute_test, broadcast_empty_unit_name) {
EXPECT_THROW(
{
try {
auto job_desc = job_descriptor::builder(NODE_NAME_JOB)
.deployment_units({{""}})
.build();
(void) m_client.get_compute().submit_broadcast(broadcast_job_target::node(get_node(1)), job_desc, {});
} catch (const ignite_error &e) {
EXPECT_EQ("Deployment unit name can not be empty", e.what_str());
throw;
}
},
ignite_error);
}
TEST_F(compute_test, broadcast_empty_unit_version) {
EXPECT_THROW(
{
try {
auto job_desc = job_descriptor::builder(NODE_NAME_JOB)
.deployment_units({{"some", ""}})
.build();
(void) m_client.get_compute().submit_broadcast(broadcast_job_target::node(get_node(1)), job_desc, {});
} catch (const ignite_error &e) {
EXPECT_EQ("Deployment unit version can not be empty", e.what_str());
throw;
}
},
ignite_error);
}
TEST_F(compute_test, execute_colocated_empty_unit_name) {
EXPECT_THROW(
{
try {
auto job_desc = job_descriptor::builder(NODE_NAME_JOB)
.deployment_units({{""}})
.build();
(void) m_client.get_compute().submit(job_target::colocated(TABLE_1, get_tuple(1)), job_desc, {});
} catch (const ignite_error &e) {
EXPECT_EQ("Deployment unit name can not be empty", e.what_str());
throw;
}
},
ignite_error);
}
TEST_F(compute_test, execute_colocated_empty_unit_version) {
EXPECT_THROW(
{
try {
auto comp = m_client.get_compute();
auto job_desc = job_descriptor::builder(NODE_NAME_JOB)
.deployment_units({{"some", ""}})
.build();
comp.submit(job_target::colocated(TABLE_1, get_tuple(1)), job_desc, {});
} catch (const ignite_error &e) {
EXPECT_EQ("Deployment unit version can not be empty", e.what_str());
throw;
}
},
ignite_error);
}
TEST_F(compute_test, job_execution_status_executing) {
const std::int32_t sleep_ms = 3000;
auto execution = m_client.get_compute().submit(job_target::node(get_node(1)), m_sleep_job, {sleep_ms});
auto state = execution.get_state();
ASSERT_TRUE(state.has_value());
EXPECT_EQ(job_status::EXECUTING, state->status);
}
TEST_F(compute_test, DISABLED_job_execution_status_completed) {
const std::int32_t sleep_ms = 1;
auto execution = m_client.get_compute().submit(job_target::node(get_node(1)), m_sleep_job, {sleep_ms});
execution.get_result();
auto state = execution.get_state();
ASSERT_TRUE(state.has_value());
EXPECT_EQ(job_status::COMPLETED, state->status);
}
TEST_F(compute_test, job_execution_status_failed) {
auto execution = m_client.get_compute().submit(job_target::node(get_node(1)), m_error_job, {"unused"});
EXPECT_THROW(
{
try {
execution.get_result();
} catch (const ignite_error &e) {
EXPECT_THAT(e.what_str(), testing::HasSubstr("Custom job error"));
throw;
}
},
ignite_error);
auto state = execution.get_state();
ASSERT_TRUE(state.has_value());
EXPECT_EQ(job_status::FAILED, state->status);
}
TEST_F(compute_test, job_execution_cancel) {
constexpr std::int32_t sleep_ms = 5000;
auto execution = m_client.get_compute().submit(job_target::node(get_node(1)), m_sleep_job, {sleep_ms});
execution.cancel();
auto state = execution.get_state();
ASSERT_TRUE(state.has_value());
EXPECT_EQ(job_status::CANCELED, state->status);
}
TEST_F(compute_test, job_execution_change_priority) {
constexpr std::int32_t sleep_ms = 5000;
auto execution = m_client.get_compute().submit(job_target::node(get_node(1)), m_sleep_job, {sleep_ms});
auto res = execution.change_priority(123);
EXPECT_EQ(res, job_execution::operation_result::INVALID_STATE);
}