| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include <stdlib.h> |
| #include <algorithm> |
| #include <iostream> |
| #include <random> |
| |
| #include <boost/bind.hpp> |
| |
| #include "common/object-pool.h" |
| #include "testutil/gtest-util.h" |
| #include "testutil/rand-util.h" |
| #include "testutil/scoped-flag-setter.h" |
| #include "util/container-util.h" |
| #include "util/periodic-counter-updater.h" |
| #include "util/runtime-profile-counters.h" |
| #include "util/thread.h" |
| |
| #include "common/names.h" |
| |
| DECLARE_bool(gen_experimental_profile); |
| DECLARE_int32(status_report_interval_ms); |
| DECLARE_int32(periodic_counter_update_period_ms); |
| DECLARE_uint64(json_profile_event_timestamp_limit); |
| |
| using std::mt19937; |
| using std::shuffle; |
| |
| namespace impala { |
| |
| /// Return true if this is one of the counters automatically added to profiles, |
| /// e.g. TotalTime. |
static bool IsDefaultCounter(const string& counter_name) {
  // Profiles automatically get these two timing counters; everything else in
  // these tests was added explicitly by the test itself.
  if (counter_name == "TotalTime") return true;
  return counter_name == "InactiveTotalTime";
}
| |
// Exercises the basic RuntimeProfile counter lifecycle on a three-node tree:
// adding/updating counters, Thrift round-trips, archive-string and compressed
// serialization, per-instance aggregation, and idempotence of repeated updates.
TEST(CountersTest, Basic) {
  ObjectPool pool;
  RuntimeProfile* profile_a = RuntimeProfile::Create(&pool, "ProfileA");
  RuntimeProfile* profile_a1 = RuntimeProfile::Create(&pool, "ProfileA1");
  RuntimeProfile* profile_a2 = RuntimeProfile::Create(&pool, "ProfileAb");

  TRuntimeProfileTree thrift_profile;

  profile_a->AddChild(profile_a1);
  profile_a->AddChild(profile_a2);

  // Test Empty: serializing the counter-less tree still emits one node per profile.
  profile_a->ToThrift(&thrift_profile);
  EXPECT_EQ(thrift_profile.nodes.size(), 3);
  thrift_profile.nodes.clear();

  RuntimeProfile::Counter* counter_a;
  RuntimeProfile::Counter* counter_b;
  RuntimeProfile::Counter* counter_merged;

  // Updating/setting counter: Add() accumulates (including negative deltas),
  // Set() overwrites the accumulated value.
  counter_a = profile_a->AddCounter("A", TUnit::UNIT);
  EXPECT_TRUE(counter_a != NULL);
  counter_a->Add(10);
  counter_a->Add(-5);
  EXPECT_EQ(counter_a->value(), 5);
  counter_a->Set(1);
  EXPECT_EQ(counter_a->value(), 1);

  counter_b = profile_a2->AddCounter("B", TUnit::BYTES);
  EXPECT_TRUE(counter_b != NULL);

  // Update status to be included in ExecSummary
  TExecSummary exec_summary;
  TStatus status;
  status.__set_status_code(TErrorCode::CANCELLED);
  exec_summary.__set_status(status);
  profile_a->SetTExecSummary(exec_summary);

  // Serialize/deserialize to thrift. Counter values and the exec summary must
  // survive the round trip; lookups of unknown counters must return NULL.
  profile_a->ToThrift(&thrift_profile);
  RuntimeProfile* from_thrift = RuntimeProfile::CreateFromThrift(&pool, thrift_profile);
  counter_merged = from_thrift->GetCounter("A");
  EXPECT_EQ(counter_merged->value(), 1);
  EXPECT_TRUE(from_thrift->GetCounter("Not there") == NULL);
  EXPECT_TRUE(from_thrift->GetCounter("Not there") == nullptr);
  TExecSummary exec_summary_result;
  from_thrift->GetExecSummary(&exec_summary_result);
  EXPECT_EQ(exec_summary_result.status, status);

  // Serialize/deserialize to archive string
  string archive_str;
  EXPECT_OK(profile_a->SerializeToArchiveString(&archive_str));
  TRuntimeProfileTree deserialized_thrift_profile;
  EXPECT_OK(RuntimeProfile::DeserializeFromArchiveString(
      archive_str, &deserialized_thrift_profile));
  RuntimeProfile* deserialized_profile =
      RuntimeProfile::CreateFromThrift(&pool, deserialized_thrift_profile);
  counter_merged = deserialized_profile->GetCounter("A");
  EXPECT_EQ(counter_merged->value(), 1);
  EXPECT_TRUE(deserialized_profile->GetCounter("Not there") == NULL);
  EXPECT_TRUE(deserialized_profile->GetCounter("Not there") == nullptr);
  deserialized_profile->GetExecSummary(&exec_summary_result);
  EXPECT_EQ(exec_summary_result.status, status);

  // Serialize/deserialize to compressed binary
  vector<uint8_t> compressed;
  EXPECT_OK(profile_a->Compress(&compressed));
  RuntimeProfile* deserialized_profile2;
  EXPECT_OK(
      RuntimeProfile::DecompressToProfile(compressed, &pool, &deserialized_profile2));
  counter_merged = deserialized_profile2->GetCounter("A");
  EXPECT_EQ(counter_merged->value(), 1);
  EXPECT_TRUE(deserialized_profile2->GetCounter("Not there") == NULL);
  EXPECT_TRUE(deserialized_profile2->GetCounter("Not there") == nullptr);
  deserialized_profile2->GetExecSummary(&exec_summary_result);
  EXPECT_EQ(exec_summary_result.status, status);

  // Averaged: aggregate a single instance into slot 0; the aggregate reflects it.
  AggregatedRuntimeProfile* averaged_profile =
      AggregatedRuntimeProfile::Create(&pool, "Merged", 2, true);
  averaged_profile->UpdateAggregatedFromInstance(from_thrift, 0);
  counter_merged = averaged_profile->GetCounter("A");
  EXPECT_EQ(counter_merged->value(), 1);

  // Update again, there should be no change.
  averaged_profile->UpdateAggregatedFromInstance(from_thrift, 0);
  EXPECT_EQ(counter_merged->value(), 1);

  // Aggregating a second instance with value 3 gives the average (1 + 3) / 2 = 2.
  counter_a = profile_a2->AddCounter("A", TUnit::UNIT);
  counter_a->Set(3);
  averaged_profile->UpdateAggregatedFromInstance(profile_a2, 1);
  EXPECT_EQ(counter_merged->value(), 2);

  // Update
  RuntimeProfile* updated_profile = RuntimeProfile::Create(&pool, "Updated");
  updated_profile->Update(thrift_profile);
  RuntimeProfile::Counter* counter_updated = updated_profile->GetCounter("A");
  EXPECT_EQ(counter_updated->value(), 1);

  // Update 2 more times, counters should stay the same
  updated_profile->Update(thrift_profile);
  updated_profile->Update(thrift_profile);
  EXPECT_EQ(counter_updated->value(), 1);
}
| |
| void ValidateCounter(RuntimeProfileBase* profile, const string& name, int64_t value) { |
| RuntimeProfile::Counter* counter = profile->GetCounter(name); |
| EXPECT_TRUE(counter != NULL); |
| EXPECT_EQ(counter->value(), value); |
| } |
| |
// Aggregates two profile trees that share one child name and verifies counter
// averaging, then applies Update() and verifies its overwrite semantics.
TEST(CountersTest, MergeAndUpdate) {
  // Create two trees. Each tree has two children, one of which has the
  // same name in both trees. Merging the two trees should result in 3
  // children, with the counters from the shared child aggregated.

  ObjectPool pool;
  RuntimeProfile* profile1 = RuntimeProfile::Create(&pool, "Parent1");
  RuntimeProfile* p1_child1 = RuntimeProfile::Create(&pool, "Child1");
  RuntimeProfile* p1_child2 = RuntimeProfile::Create(&pool, "Child2");
  profile1->AddChild(p1_child1);
  profile1->AddChild(p1_child2);

  RuntimeProfile* profile2 = RuntimeProfile::Create(&pool, "Parent2");
  RuntimeProfile* p2_child1 = RuntimeProfile::Create(&pool, "Child1");
  RuntimeProfile* p2_child3 = RuntimeProfile::Create(&pool, "Child3");
  profile2->AddChild(p2_child1);
  profile2->AddChild(p2_child3);

  // Create parent level counters
  RuntimeProfile::Counter* parent1_shared =
      profile1->AddCounter("Parent Shared", TUnit::UNIT);
  RuntimeProfile::Counter* parent2_shared =
      profile2->AddCounter("Parent Shared", TUnit::UNIT);
  RuntimeProfile::Counter* parent1_only =
      profile1->AddCounter("Parent 1 Only", TUnit::UNIT);
  RuntimeProfile::Counter* parent2_only =
      profile2->AddCounter("Parent 2 Only", TUnit::UNIT);
  parent1_shared->Add(1);
  parent2_shared->Add(3);
  parent1_only->Add(2);
  parent2_only->Add(5);

  // Create child level counters
  RuntimeProfile::Counter* p1_c1_shared =
      p1_child1->AddCounter("Child1 Shared", TUnit::UNIT);
  RuntimeProfile::Counter* p1_c1_only =
      p1_child1->AddCounter("Child1 Parent 1 Only", TUnit::UNIT);
  RuntimeProfile::Counter* p1_c2 =
      p1_child2->AddCounter("Child2", TUnit::UNIT);
  RuntimeProfile::Counter* p2_c1_shared =
      p2_child1->AddCounter("Child1 Shared", TUnit::UNIT);
  // NOTE(review): despite its name, 'p2_c1_only' is added to p1_child1
  // (profile1's child), not p2_child1. The expectations below are consistent
  // with this placement, but confirm whether p2_child1 was intended.
  RuntimeProfile::Counter* p2_c1_only =
      p1_child1->AddCounter("Child1 Parent 2 Only", TUnit::UNIT);
  RuntimeProfile::Counter* p2_c3 =
      p2_child3->AddCounter("Child3", TUnit::UNIT);
  p1_c1_shared->Add(10);
  p1_c1_only->Add(50);
  p2_c1_shared->Add(20);
  p2_c1_only->Add(100);
  p2_c3->Add(30);
  p1_c2->Add(40);

  // Merge the two and validate
  TRuntimeProfileTree tprofile1;
  profile1->ToThrift(&tprofile1);
  AggregatedRuntimeProfile* averaged_profile =
      AggregatedRuntimeProfile::Create(&pool, "merged", 2, true);
  averaged_profile->UpdateAggregatedFromInstance(profile1, 0);
  averaged_profile->UpdateAggregatedFromInstance(profile2, 1);
  // 3 test counters plus the automatically-added default counters (see
  // IsDefaultCounter()) account for the expected count of 5.
  EXPECT_EQ(5, averaged_profile->num_counters());
  // "Parent Shared" is the average of 1 and 3; counters present in only one
  // instance keep that instance's value.
  ValidateCounter(averaged_profile, "Parent Shared", 2);
  ValidateCounter(averaged_profile, "Parent 1 Only", 2);
  ValidateCounter(averaged_profile, "Parent 2 Only", 5);

  vector<RuntimeProfileBase*> children;
  averaged_profile->GetChildren(&children);
  EXPECT_EQ(children.size(), 3);

  for (int i = 0; i < 3; ++i) {
    RuntimeProfileBase* profile = children[i];
    if (profile->name().compare("Child1") == 0) {
      EXPECT_EQ(5, profile->num_counters());
      // Average of 10 (instance 0) and 20 (instance 1).
      ValidateCounter(profile, "Child1 Shared", 15);
      ValidateCounter(profile, "Child1 Parent 1 Only", 50);
      ValidateCounter(profile, "Child1 Parent 2 Only", 100);
    } else if (profile->name().compare("Child2") == 0) {
      EXPECT_EQ(3, profile->num_counters());
      ValidateCounter(profile, "Child2", 40);
    } else if (profile->name().compare("Child3") == 0) {
      EXPECT_EQ(3, profile->num_counters());
      ValidateCounter(profile, "Child3", 30);
    } else {
      FAIL();
    }
  }

  // make sure we can print
  stringstream dummy;
  averaged_profile->PrettyPrint(&dummy);

  // Update profile2 w/ profile1 and validate. Update() overwrites matching
  // counters with profile1's values rather than averaging, so "Parent Shared"
  // becomes 1.
  profile2->Update(tprofile1);
  EXPECT_EQ(5, profile2->num_counters());
  ValidateCounter(profile2, "Parent Shared", 1);
  ValidateCounter(profile2, "Parent 1 Only", 2);
  ValidateCounter(profile2, "Parent 2 Only", 5);

  profile2->GetChildren(&children);
  EXPECT_EQ(children.size(), 3);

  for (int i = 0; i < 3; ++i) {
    RuntimeProfileBase* profile = children[i];
    if (profile->name().compare("Child1") == 0) {
      EXPECT_EQ(5, profile->num_counters());
      ValidateCounter(profile, "Child1 Shared", 10);
      ValidateCounter(profile, "Child1 Parent 1 Only", 50);
      ValidateCounter(profile, "Child1 Parent 2 Only", 100);
    } else if (profile->name().compare("Child2") == 0) {
      EXPECT_EQ(3, profile->num_counters());
      ValidateCounter(profile, "Child2", 40);
    } else if (profile->name().compare("Child3") == 0) {
      EXPECT_EQ(3, profile->num_counters());
      ValidateCounter(profile, "Child3", 30);
    } else {
      FAIL();
    }
  }

  // make sure we can print
  profile2->PrettyPrint(&dummy);
}
| |
| // Regression test for IMPALA-6694 - child order isn't preserved if a child |
| // is prepended between updates. |
| TEST(CountersTest, MergeAndUpdateChildOrder) { |
| ObjectPool pool; |
| // Add Child2 first. |
| RuntimeProfile* profile1 = RuntimeProfile::Create(&pool, "Parent"); |
| RuntimeProfile* p1_child2 = RuntimeProfile::Create(&pool, "Child2"); |
| profile1->AddChild(p1_child2); |
| TRuntimeProfileTree tprofile1_v1, tprofile1_v2, tprofile1_v3; |
| profile1->ToThrift(&tprofile1_v1); |
| |
| // Update averaged and deserialized profiles from the serialized profile. |
| AggregatedRuntimeProfile* averaged_profile = |
| AggregatedRuntimeProfile::Create(&pool, "merged", 2, true); |
| RuntimeProfile* deserialized_profile = RuntimeProfile::Create(&pool, "Parent"); |
| averaged_profile->UpdateAggregatedFromInstance(profile1, 0); |
| deserialized_profile->Update(tprofile1_v1); |
| |
| std::vector<RuntimeProfileBase*> tmp_children; |
| averaged_profile->GetChildren(&tmp_children); |
| EXPECT_EQ(1, tmp_children.size()); |
| EXPECT_EQ("Child2", tmp_children[0]->name()); |
| deserialized_profile->GetChildren(&tmp_children); |
| EXPECT_EQ(1, tmp_children.size()); |
| EXPECT_EQ("Child2", tmp_children[0]->name()); |
| |
| // Prepend Child1 and update profiles. |
| RuntimeProfile* p1_child1 = RuntimeProfile::Create(&pool, "Child1"); |
| profile1->PrependChild(p1_child1); |
| profile1->ToThrift(&tprofile1_v2); |
| averaged_profile->UpdateAggregatedFromInstance(profile1, 0); |
| deserialized_profile->Update(tprofile1_v2); |
| |
| averaged_profile->GetChildren(&tmp_children); |
| EXPECT_EQ(2, tmp_children.size()); |
| EXPECT_EQ("Child1", tmp_children[0]->name()); |
| EXPECT_EQ("Child2", tmp_children[1]->name()); |
| deserialized_profile->GetChildren(&tmp_children); |
| EXPECT_EQ(2, tmp_children.size()); |
| EXPECT_EQ("Child1", tmp_children[0]->name()); |
| EXPECT_EQ("Child2", tmp_children[1]->name()); |
| |
| // Test that changes in order of children is handled gracefully by preserving the |
| // order from the previous update. Sorting puts the children in descending total time |
| // order. |
| p1_child1->total_time_counter()->Set(1); |
| p1_child2->total_time_counter()->Set(2); |
| profile1->SortChildrenByTotalTime(); |
| profile1->GetChildren(&tmp_children); |
| EXPECT_EQ("Child2", tmp_children[0]->name()); |
| EXPECT_EQ("Child1", tmp_children[1]->name()); |
| profile1->ToThrift(&tprofile1_v3); |
| averaged_profile->UpdateAggregatedFromInstance(profile1, 0); |
| deserialized_profile->Update(tprofile1_v2); |
| |
| // The previous order of children that were already present is preserved. |
| averaged_profile->GetChildren(&tmp_children); |
| EXPECT_EQ(2, tmp_children.size()); |
| EXPECT_EQ("Child1", tmp_children[0]->name()); |
| EXPECT_EQ("Child2", tmp_children[1]->name()); |
| deserialized_profile->GetChildren(&tmp_children); |
| EXPECT_EQ(2, tmp_children.size()); |
| EXPECT_EQ("Child1", tmp_children[0]->name()); |
| EXPECT_EQ("Child2", tmp_children[1]->name()); |
| |
| // Make sure we can print the profiles. |
| stringstream dummy; |
| averaged_profile->PrettyPrint(&dummy); |
| deserialized_profile->PrettyPrint(&dummy); |
| } |
| |
| TEST(CountersTest, TotalTimeCounters) { |
| ObjectPool pool; |
| |
| // Set up a three layer profile: parent -> child1 -> child2 |
| RuntimeProfile* parent = RuntimeProfile::Create(&pool, "Parent"); |
| RuntimeProfile* child1 = RuntimeProfile::Create(&pool, "Child1"); |
| RuntimeProfile* child2 = RuntimeProfile::Create(&pool, "Child2"); |
| child1->AddChild(child2); |
| parent->AddChild(child1); |
| |
| // Part 1: Test accumulation of time up from child2 to child1 to parent |
| // One millisecond passes in child2 |
| int64_t one_milli_ns = 1 * NANOS_PER_MICRO * MICROS_PER_MILLI; |
| child2->total_time_counter()->Add(1 * NANOS_PER_MICRO * MICROS_PER_MILLI); |
| parent->ComputeTimeInProfile(); |
| EXPECT_EQ(child2->total_time(), one_milli_ns); |
| EXPECT_EQ(child2->local_time(), one_milli_ns); |
| |
| // Child1 is a parent of child2, so it is expected to contain at least as much time |
| // as in child2. In this case, it is equal. However, none of the time is local. |
| EXPECT_EQ(child1->total_time(), one_milli_ns); |
| EXPECT_EQ(child1->local_time(), 0); |
| |
| // The parent is in the same situation as child1 |
| EXPECT_EQ(parent->total_time(), one_milli_ns); |
| EXPECT_EQ(parent->local_time(), 0); |
| |
| // Time now accumulates up to child1 |
| child1->total_time_counter()->Add(child2->total_time()); |
| parent->ComputeTimeInProfile(); |
| |
| // This doesn't change anything for anyone |
| EXPECT_EQ(child2->total_time(), one_milli_ns); |
| EXPECT_EQ(child2->local_time(), one_milli_ns); |
| EXPECT_EQ(child1->total_time(), one_milli_ns); |
| EXPECT_EQ(child1->local_time(), 0); |
| EXPECT_EQ(parent->total_time(), one_milli_ns); |
| EXPECT_EQ(parent->local_time(), 0); |
| |
| // Time now accumulates up to parent |
| parent->total_time_counter()->Add(child1->total_time()); |
| parent->ComputeTimeInProfile(); |
| |
| // This doesn't change anything for the parent |
| EXPECT_EQ(child2->total_time(), one_milli_ns); |
| EXPECT_EQ(child2->local_time(), one_milli_ns); |
| EXPECT_EQ(child1->total_time(), one_milli_ns); |
| EXPECT_EQ(child1->local_time(), 0); |
| EXPECT_EQ(parent->total_time(), one_milli_ns); |
| EXPECT_EQ(parent->local_time(), 0); |
| |
| // Part 2: Time accumulated in middle child |
| // Add 1ms to the middle child |
| child1->total_time_counter()->Add(one_milli_ns); |
| parent->ComputeTimeInProfile(); |
| |
| // Child2 did not change |
| EXPECT_EQ(child2->total_time(), one_milli_ns); |
| EXPECT_EQ(child2->local_time(), one_milli_ns); |
| |
| // Child1 has 1ms more of total time and local time |
| EXPECT_EQ(child1->total_time(), 2 * one_milli_ns); |
| EXPECT_EQ(child1->local_time(), one_milli_ns); |
| |
| // Parent has more total time, but no local time |
| EXPECT_EQ(parent->total_time(), 2 * one_milli_ns); |
| EXPECT_EQ(parent->local_time(), 0); |
| |
| // Accumulate the middle child up to the parent |
| parent->total_time_counter()->Add(one_milli_ns); |
| parent->ComputeTimeInProfile(); |
| |
| // Doesn't change anything |
| EXPECT_EQ(child2->total_time(), one_milli_ns); |
| EXPECT_EQ(child2->local_time(), one_milli_ns); |
| EXPECT_EQ(child1->total_time(), 2 * one_milli_ns); |
| EXPECT_EQ(child1->local_time(), one_milli_ns); |
| EXPECT_EQ(parent->total_time(), 2 * one_milli_ns); |
| EXPECT_EQ(parent->local_time(), 0); |
| |
| // Part 3: Time accumulated at parent |
| // Add 1ms to the parent |
| parent->total_time_counter()->Add(one_milli_ns); |
| parent->ComputeTimeInProfile(); |
| |
| // Child1 and child2 don't change |
| EXPECT_EQ(child2->total_time(), one_milli_ns); |
| EXPECT_EQ(child2->local_time(), one_milli_ns); |
| EXPECT_EQ(child1->total_time(), 2 * one_milli_ns); |
| EXPECT_EQ(child1->local_time(), one_milli_ns); |
| |
| // Parent has 1ms more total time and local time |
| EXPECT_EQ(parent->total_time(), 3 * one_milli_ns); |
| EXPECT_EQ(parent->local_time(), one_milli_ns); |
| } |
| |
| TEST(CountersTest, HighWaterMarkCounters) { |
| ObjectPool pool; |
| RuntimeProfile* profile = RuntimeProfile::Create(&pool, "Profile"); |
| RuntimeProfile::HighWaterMarkCounter* bytes_counter = |
| profile->AddHighWaterMarkCounter("bytes", TUnit::BYTES); |
| |
| bytes_counter->Set(10); |
| EXPECT_EQ(bytes_counter->current_value(), 10); |
| EXPECT_EQ(bytes_counter->value(), 10); |
| |
| bytes_counter->Add(5); |
| EXPECT_EQ(bytes_counter->current_value(), 15); |
| EXPECT_EQ(bytes_counter->value(), 15); |
| |
| bytes_counter->Set(5); |
| EXPECT_EQ(bytes_counter->current_value(), 5); |
| EXPECT_EQ(bytes_counter->value(), 15); |
| |
| bytes_counter->Add(3); |
| EXPECT_EQ(bytes_counter->current_value(), 8); |
| EXPECT_EQ(bytes_counter->value(), 15); |
| |
| bool success = bytes_counter->TryAdd(20, 30); |
| EXPECT_TRUE(success); |
| EXPECT_EQ(bytes_counter->current_value(), 28); |
| EXPECT_EQ(bytes_counter->value(), 28); |
| |
| success = bytes_counter->TryAdd(5, 30); |
| EXPECT_FALSE(success); |
| EXPECT_EQ(bytes_counter->current_value(), 28); |
| EXPECT_EQ(bytes_counter->value(), 28); |
| } |
| |
// Exercises SummaryStatsCounter: value() tracks the running average of all
// samples while MinValue()/MaxValue() track extremes; Update() overwrites the
// destination counter's stats wholesale.
TEST(CountersTest, SummaryStatsCounters) {
  ObjectPool pool;
  RuntimeProfile* profile1 = RuntimeProfile::Create(&pool, "Profile 1");
  RuntimeProfile::SummaryStatsCounter* summary_stats_counter_1 =
      profile1->AddSummaryStatsCounter("summary_stats", TUnit::UNIT);

  // With no samples, value() is 0 and min/max hold sentinel extremes.
  EXPECT_EQ(summary_stats_counter_1->value(), 0);
  EXPECT_EQ(summary_stats_counter_1->MinValue(), numeric_limits<int64_t>::max());
  EXPECT_EQ(summary_stats_counter_1->MaxValue(), numeric_limits<int64_t>::min());

  summary_stats_counter_1->UpdateCounter(10);
  EXPECT_EQ(summary_stats_counter_1->value(), 10);
  EXPECT_EQ(summary_stats_counter_1->MinValue(), 10);
  EXPECT_EQ(summary_stats_counter_1->MaxValue(), 10);

  // Check that the average stays the same when updating with the same number.
  summary_stats_counter_1->UpdateCounter(10);
  EXPECT_EQ(summary_stats_counter_1->value(), 10);
  EXPECT_EQ(summary_stats_counter_1->MinValue(), 10);
  EXPECT_EQ(summary_stats_counter_1->MaxValue(), 10);

  // Average of {10, 10, 40} is 20.
  summary_stats_counter_1->UpdateCounter(40);
  EXPECT_EQ(summary_stats_counter_1->value(), 20);
  EXPECT_EQ(summary_stats_counter_1->MinValue(), 10);
  EXPECT_EQ(summary_stats_counter_1->MaxValue(), 40);

  // Verify an update with 0. This should still change the average as the number of
  // samples increase: {10, 10, 40, 0} averages to 15.
  summary_stats_counter_1->UpdateCounter(0);
  EXPECT_EQ(summary_stats_counter_1->value(), 15);
  EXPECT_EQ(summary_stats_counter_1->MinValue(), 0);
  EXPECT_EQ(summary_stats_counter_1->MaxValue(), 40);

  // Verify a negative update: {10, 10, 40, 0, -40} averages to 4.
  summary_stats_counter_1->UpdateCounter(-40);
  EXPECT_EQ(summary_stats_counter_1->value(), 4);
  EXPECT_EQ(summary_stats_counter_1->MinValue(), -40);
  EXPECT_EQ(summary_stats_counter_1->MaxValue(), 40);

  RuntimeProfile* profile2 = RuntimeProfile::Create(&pool, "Profile 2");
  RuntimeProfile::SummaryStatsCounter* summary_stats_counter_2 =
      profile2->AddSummaryStatsCounter("summary_stats", TUnit::UNIT);

  summary_stats_counter_2->UpdateCounter(100);
  EXPECT_EQ(summary_stats_counter_2->value(), 100);
  EXPECT_EQ(summary_stats_counter_2->MinValue(), 100);
  EXPECT_EQ(summary_stats_counter_2->MaxValue(), 100);

  TRuntimeProfileTree tprofile1;
  profile1->ToThrift(&tprofile1);

  // Merge profile1 and profile2 and check that profile2 is overwritten.
  profile2->Update(tprofile1);
  EXPECT_EQ(summary_stats_counter_2->value(), 4);
  EXPECT_EQ(summary_stats_counter_2->MinValue(), -40);
  EXPECT_EQ(summary_stats_counter_2->MaxValue(), 40);

}
| |
// Helper for AggregateSummaryStats that verifies the encoded summary stats
// counter in the thrift representation when it was merged into the profile at
// instance offset 'offset'.
static void VerifyThriftSummaryStats(
    const TRuntimeProfileNode& tnode, int offset, int total_instances) {
  // The caller populated exactly 3 instances, each with the two samples
  // {k, k + 1} for window position k, starting at instance 'offset'. Everything
  // outside that window must be marked as having no value.
  const int NUM_VALID_INSTANCES = 3;
  DCHECK_LE(offset + NUM_VALID_INSTANCES, total_instances);
  ASSERT_TRUE(tnode.__isset.aggregated);

  const TAggregatedRuntimeProfileNode& agg_node = tnode.aggregated;
  ASSERT_TRUE(agg_node.__isset.summary_stats_counters);

  const TAggSummaryStatsCounter& tcounter = agg_node.summary_stats_counters[0];
  EXPECT_EQ("test ss", tcounter.name);
  EXPECT_EQ(TUnit::UNIT, tcounter.unit);

  // Each per-instance vector must be large enough to cover the valid window.
  EXPECT_LE(offset + NUM_VALID_INSTANCES, tcounter.has_value.size());
  EXPECT_LE(offset + NUM_VALID_INSTANCES, tcounter.sum.size());
  EXPECT_LE(offset + NUM_VALID_INSTANCES, tcounter.total_num_values.size());
  EXPECT_LE(offset + NUM_VALID_INSTANCES, tcounter.min_value.size());
  EXPECT_LE(offset + NUM_VALID_INSTANCES, tcounter.max_value.size());

  for (int i = 0; i < total_instances; ++i) {
    if (i < offset || i >= offset + NUM_VALID_INSTANCES) {
      EXPECT_FALSE(tcounter.has_value[i]);
      continue;
    }
    EXPECT_TRUE(tcounter.has_value[i]);
    // Window position k carries samples {k, k + 1}:
    // sum = 2k + 1, count = 2, min = k, max = k + 1.
    int min_val = i - offset;
    EXPECT_EQ(min_val * 2 + 1, tcounter.sum[i]);
    EXPECT_EQ(2, tcounter.total_num_values[i]);
    EXPECT_EQ(min_val, tcounter.min_value[i]);
    EXPECT_EQ(min_val + 1, tcounter.max_value[i]);
  }
}
| |
| // Test handling of summary statistics in the aggregated profile. |
| TEST(CountersTest, AggregateSummaryStats) { |
| auto cert = ScopedFlagSetter<bool>::Make(&FLAGS_gen_experimental_profile, true); |
| const int NUM_PROFILES = 3; |
| // Create a profile with event sequences with some shared event keys. |
| ObjectPool pool; |
| RuntimeProfile* profiles[NUM_PROFILES]; |
| RuntimeProfile::SummaryStatsCounter* counters[NUM_PROFILES]; |
| for (int i = 0; i < NUM_PROFILES; ++i) { |
| profiles[i] = RuntimeProfile::Create(&pool, "Profile"); |
| counters[i] = profiles[i]->AddSummaryStatsCounter("test ss", TUnit::UNIT); |
| counters[i]->UpdateCounter(i); |
| counters[i]->UpdateCounter(i + 1); |
| } |
| |
| AggregatedRuntimeProfile* averaged_profile = |
| AggregatedRuntimeProfile::Create(&pool, "Merged", 3, true); |
| for (int i = 0; i < NUM_PROFILES; ++i) { |
| averaged_profile->UpdateAggregatedFromInstance(profiles[i], i); |
| } |
| |
| TRuntimeProfileTree ttree; |
| averaged_profile->ToThrift(&ttree); |
| VerifyThriftSummaryStats(ttree.nodes[0], 0, NUM_PROFILES); |
| |
| // Test merging into another averaged profile at an offset |
| const int NUM_UNINIT_PROFILES = 2; |
| const int OFFSET = 1; |
| AggregatedRuntimeProfile* averaged_profile2 = AggregatedRuntimeProfile::Create( |
| &pool, "Merged 2", NUM_PROFILES + NUM_UNINIT_PROFILES, true); |
| averaged_profile2->UpdateAggregatedFromInstances(ttree, OFFSET); |
| TRuntimeProfileTree ttree2; |
| averaged_profile2->ToThrift(&ttree2); |
| VerifyThriftSummaryStats(ttree2.nodes[0], OFFSET, NUM_PROFILES + NUM_UNINIT_PROFILES); |
| } |
| |
| TEST(CountersTest, DerivedCounters) { |
| ObjectPool pool; |
| RuntimeProfile* profile = RuntimeProfile::Create(&pool, "Profile"); |
| RuntimeProfile::Counter* bytes_counter = |
| profile->AddCounter("bytes", TUnit::BYTES); |
| RuntimeProfile::Counter* ticks_counter = |
| profile->AddCounter("ticks", TUnit::TIME_NS); |
| // set to 1 sec |
| ticks_counter->Set(1000L * 1000L * 1000L); |
| |
| RuntimeProfile::DerivedCounter* throughput_counter = |
| profile->AddDerivedCounter("throughput", TUnit::BYTES, |
| bind<int64_t>(&RuntimeProfile::UnitsPerSecond, bytes_counter, ticks_counter)); |
| |
| bytes_counter->Set(10); |
| EXPECT_EQ(throughput_counter->value(), 10); |
| bytes_counter->Set(20); |
| EXPECT_EQ(throughput_counter->value(), 20); |
| ticks_counter->Set(ticks_counter->value() / 2); |
| EXPECT_EQ(throughput_counter->value(), 40); |
| } |
| |
// Exercises AveragedCounter::UpdateCounter() for both integer (BYTES) and
// DOUBLE_VALUE source counters. Re-updating the same instance slot replaces
// that instance's contribution rather than accumulating it.
TEST(CountersTest, AverageSetCounters) {
  ObjectPool pool;
  RuntimeProfile* profile = RuntimeProfile::Create(&pool, "Profile");
  RuntimeProfile::Counter* bytes_1_counter =
      profile->AddCounter("bytes 1", TUnit::BYTES);
  RuntimeProfile::Counter* bytes_2_counter =
      profile->AddCounter("bytes 2", TUnit::BYTES);

  bytes_1_counter->Set(10);
  RuntimeProfile::AveragedCounter bytes_avg(TUnit::BYTES, 2);
  bytes_avg.UpdateCounter(bytes_1_counter, 0);
  // Avg of 10L
  EXPECT_EQ(bytes_avg.value(), 10);
  // Re-updating slot 0 replaces the old sample (10 -> 20).
  bytes_1_counter->Set(20L);
  bytes_avg.UpdateCounter(bytes_1_counter, 0);
  // Avg of 20L
  EXPECT_EQ(bytes_avg.value(), 20);
  bytes_2_counter->Set(40L);
  bytes_avg.UpdateCounter(bytes_2_counter, 1);
  // Avg of 20L and 40L
  EXPECT_EQ(bytes_avg.value(), 30);
  bytes_2_counter->Set(30L);
  bytes_avg.UpdateCounter(bytes_2_counter, 1);
  // Avg of 20L and 30L
  EXPECT_EQ(bytes_avg.value(), 25);

  // Same replace-then-average behavior for floating-point counters.
  RuntimeProfile::Counter* double_1_counter =
      profile->AddCounter("double 1", TUnit::DOUBLE_VALUE);
  RuntimeProfile::Counter* double_2_counter =
      profile->AddCounter("double 2", TUnit::DOUBLE_VALUE);
  double_1_counter->Set(1.0f);
  RuntimeProfile::AveragedCounter double_avg(TUnit::DOUBLE_VALUE, 2);
  double_avg.UpdateCounter(double_1_counter, 0);
  // Avg of 1.0f
  EXPECT_EQ(double_avg.double_value(), 1.0f);
  double_1_counter->Set(2.0f);
  double_avg.UpdateCounter(double_1_counter, 0);
  // Avg of 2.0f
  EXPECT_EQ(double_avg.double_value(), 2.0f);
  double_2_counter->Set(4.0f);
  double_avg.UpdateCounter(double_2_counter, 1);
  // Avg of 2.0f and 4.0f
  EXPECT_EQ(double_avg.double_value(), 3.0f);
  double_2_counter->Set(3.0f);
  double_avg.UpdateCounter(double_2_counter, 1);
  // Avg of 2.0f and 3.0f
  EXPECT_EQ(double_avg.double_value(), 2.5f);
}
| |
// Verifies the summary statistics (min/max/mean/percentiles) computed by
// AveragedCounter over many instances, that they are order-independent, and
// that they survive a Thrift round trip.
TEST(CountersTest, AveragedCounterStats) {
  ObjectPool pool;
  RuntimeProfile* profile = RuntimeProfile::Create(&pool, "Profile");
  // Average 100 input counters with values 100-199.
  const int NUM_COUNTERS = 100;
  vector<RuntimeProfile::Counter*> counters;
  for (int i = 0; i < NUM_COUNTERS; ++i) {
    counters.push_back(
        profile->AddCounter(Substitute("c$0", i), TUnit::BYTES));
    counters.back()->Set(100 + i);
  }
  // Randomize counter order - computed stats shouldn't depend on order.
  mt19937 rng;
  RandTestUtil::SeedRng("RUNTIME_PROFILE_TEST_SEED", &rng);
  shuffle(counters.begin(), counters.end(), rng);

  RuntimeProfile::AveragedCounter bytes_avg(TUnit::BYTES, NUM_COUNTERS);
  for (int i = 0; i < NUM_COUNTERS; ++i) {
    bytes_avg.UpdateCounter(counters[i], i);
  }
  // Expected stats for the uniform range [100, 199]; the true mean of 149.5 is
  // reported as the integer 149.
  RuntimeProfile::AveragedCounter::Stats<int64_t> stats = bytes_avg.GetStats<int64_t>();
  EXPECT_EQ(NUM_COUNTERS, stats.num_vals);
  EXPECT_EQ(100, stats.min);
  EXPECT_EQ(199, stats.max);
  EXPECT_EQ(149, stats.mean);
  EXPECT_EQ(149, stats.p50);
  EXPECT_EQ(174, stats.p75);
  EXPECT_EQ(189, stats.p90);
  EXPECT_EQ(194, stats.p95);

  // Round-trip via thrift and confirm values are all the same.
  TAggCounter tcounter;
  bytes_avg.ToThrift("", &tcounter);
  RuntimeProfile::AveragedCounter bytes_avg2(
      TUnit::BYTES, tcounter.has_value, tcounter.values);
  stats = bytes_avg2.GetStats<int64_t>();
  EXPECT_EQ(NUM_COUNTERS, stats.num_vals);
  EXPECT_EQ(100, stats.min);
  EXPECT_EQ(199, stats.max);
  EXPECT_EQ(149, stats.mean);
  EXPECT_EQ(149, stats.p50);
  EXPECT_EQ(174, stats.p75);
  EXPECT_EQ(189, stats.p90);
  EXPECT_EQ(194, stats.p95);
}
| |
// Tests info-string storage and propagation: lookup, Thrift round-trip, and
// Update() semantics (new keys added, existing keys overwritten).
TEST(CountersTest, InfoStringTest) {
  ObjectPool pool;
  RuntimeProfile* profile = RuntimeProfile::Create(&pool, "Profile");
  // Lookup of a key that was never added returns NULL.
  EXPECT_TRUE(profile->GetInfoString("Key") == NULL);

  profile->AddInfoString("Key", "Value");
  const string* value = profile->GetInfoString("Key");
  EXPECT_TRUE(value != NULL);
  EXPECT_EQ(*value, "Value");

  // Convert it to thrift
  TRuntimeProfileTree tprofile;
  profile->ToThrift(&tprofile);

  // Convert it back
  RuntimeProfile* from_thrift = RuntimeProfile::CreateFromThrift(
      &pool, tprofile);
  value = from_thrift->GetInfoString("Key");
  EXPECT_TRUE(value != NULL);
  EXPECT_EQ(*value, "Value");

  // Test update.
  RuntimeProfile* update_dst_profile = RuntimeProfile::Create(&pool, "Profile2");
  update_dst_profile->Update(tprofile);
  value = update_dst_profile->GetInfoString("Key");
  EXPECT_TRUE(value != NULL);
  EXPECT_EQ(*value, "Value");

  // Update the original profile, convert it to thrift and update from the dst
  // profile. AddInfoString() with an existing key overwrites the old value.
  profile->AddInfoString("Key", "NewValue");
  profile->AddInfoString("Foo", "Bar");
  EXPECT_EQ(*profile->GetInfoString("Key"), "NewValue");
  EXPECT_EQ(*profile->GetInfoString("Foo"), "Bar");
  profile->ToThrift(&tprofile);

  update_dst_profile->Update(tprofile);
  EXPECT_EQ(*update_dst_profile->GetInfoString("Key"), "NewValue");
  EXPECT_EQ(*update_dst_profile->GetInfoString("Foo"), "Bar");
}
| |
// Helper for AggregateInfoStrings that verifies the encoded info strings in
// the thrift representation when they were merged into the profile at instance
// offset 'offset'.
static void VerifyThriftInfoStrings(
    const TRuntimeProfileNode& tnode, int offset, int total_instances) {
  ASSERT_TRUE(tnode.__isset.aggregated);
  // The caller populated exactly 3 instances starting at 'offset': all share
  // the "shared" value, and each has its own "val<i>" under "distinct".
  const int NUM_VALID_INSTANCES = 3;
  DCHECK_LE(offset + NUM_VALID_INSTANCES, total_instances);

  const TAggregatedRuntimeProfileNode& agg_node = tnode.aggregated;
  ASSERT_TRUE(agg_node.__isset.info_strings);
  // Encoding: key -> (distinct value -> indices of instances with that value).
  const map<string, map<string, vector<int32_t>>>& info_strings = agg_node.info_strings;
  auto it = info_strings.find("shared");
  EXPECT_TRUE(it != info_strings.end());
  EXPECT_EQ(1, it->second.size()) << "Only one distinct value for shared";

  // Same value should be present in all instances, i.e. all indices should be present.
  auto it2 = it->second.find("same value");
  EXPECT_TRUE(it2 != it->second.end());
  EXPECT_EQ(it2->second, vector<int32_t>({offset + 0, offset + 1, offset + 2}));

  // Distinct value should have different value per instance.
  it = info_strings.find("distinct");
  EXPECT_TRUE(it != info_strings.end());
  EXPECT_EQ(NUM_VALID_INSTANCES, it->second.size()) << "One distinct value per instance";
  it2 = it->second.find("val0");
  EXPECT_TRUE(it2 != it->second.end());
  EXPECT_EQ(it2->second, vector<int32_t>({offset + 0}));
  it2 = it->second.find("val1");
  EXPECT_TRUE(it2 != it->second.end());
  EXPECT_EQ(it2->second, vector<int32_t>({offset + 1}));
  it2 = it->second.find("val2");
  EXPECT_TRUE(it2 != it->second.end());
  EXPECT_EQ(it2->second, vector<int32_t>({offset + 2}));
}
| |
| // Test handling of info strings in the aggregated profile |
| TEST(CountersTest, AggregateInfoStrings) { |
| auto cert = ScopedFlagSetter<bool>::Make(&FLAGS_gen_experimental_profile, true); |
| const int NUM_PROFILES = 3; |
| // Create a profile with info strings that are shared across instances and then |
| // distinct across instances to test that they are deduplicated appropriately. |
| ObjectPool pool; |
| RuntimeProfile* profiles[NUM_PROFILES]; |
| for (int i = 0; i < NUM_PROFILES; ++i) { |
| profiles[i] = RuntimeProfile::Create(&pool, "Profile"); |
| profiles[i]->AddInfoString("shared", "same value"); |
| profiles[i]->AddInfoString("distinct", Substitute("val$0", i)); |
| } |
| |
| AggregatedRuntimeProfile* averaged_profile = |
| AggregatedRuntimeProfile::Create(&pool, "Merged", 3, true); |
| for (int i = 0; i < NUM_PROFILES; ++i) { |
| averaged_profile->UpdateAggregatedFromInstance(profiles[i], i); |
| } |
| |
| TRuntimeProfileTree ttree; |
| averaged_profile->ToThrift(&ttree); |
| VerifyThriftInfoStrings(ttree.nodes[0], 0, NUM_PROFILES); |
| |
| // Test merging into another averaged profile at an offset |
| const int NUM_UNINIT_PROFILES = 2; |
| const int OFFSET = 1; |
| AggregatedRuntimeProfile* averaged_profile2 = AggregatedRuntimeProfile::Create( |
| &pool, "Merged 2", NUM_PROFILES + NUM_UNINIT_PROFILES, true); |
| averaged_profile2->UpdateAggregatedFromInstances(ttree, OFFSET); |
| TRuntimeProfileTree ttree2; |
| averaged_profile2->ToThrift(&ttree2); |
| VerifyThriftInfoStrings(ttree2.nodes[0], OFFSET, NUM_PROFILES + NUM_UNINIT_PROFILES); |
| } |
| |
TEST(CountersTest, RateCounters) {
  ObjectPool pool;
  RuntimeProfile* profile = RuntimeProfile::Create(&pool, "Profile");

  RuntimeProfile::Counter* bytes_counter =
      profile->AddCounter("bytes", TUnit::BYTES);

  // A rate counter derived from 'bytes_counter' reports bytes per second of
  // elapsed time, updated periodically in the background.
  RuntimeProfile::Counter* rate_counter =
      profile->AddRateCounter("RateCounter", bytes_counter);
  EXPECT_TRUE(rate_counter->unit() == TUnit::BYTES_PER_SECOND);

  EXPECT_EQ(rate_counter->value(), 0);
  // set to 100MB. Use bigger units to avoid truncating to 0 after divides.
  bytes_counter->Set(100L * 1024L * 1024L);

  // Wait one second.
  sleep(1);

  int64_t rate = rate_counter->value();

  // Stop the counter so it no longer gets updates
  profile->StopPeriodicCounters();

  // The rate counter is not perfectly accurate since it is updated on a periodic
  // timer, so accept anything in the (66, 200] MB/s band around the nominal
  // 100 MB/s.
  EXPECT_GT(rate, 66 * 1024 * 1024);
  EXPECT_LE(rate, 200 * 1024 * 1024);

  // Wait another two seconds. Periodic updates have been stopped above, so the value
  // should not have changed (much).
  sleep(2);

  rate = rate_counter->value();
  EXPECT_GT(rate, 66 * 1024 * 1024);
  EXPECT_LE(rate, 200 * 1024 * 1024);
}
| |
TEST(CountersTest, BucketCounters) {
  ObjectPool pool;
  RuntimeProfile* profile = RuntimeProfile::Create(&pool, "Profile");

  RuntimeProfile::Counter* unit_counter =
      profile->AddCounter("unit", TUnit::UNIT);

  // Set the unit to 1 before sampling
  unit_counter->Set(1);

  // Create the bucket counters and start sampling
  vector<RuntimeProfile::Counter*>* buckets =
      profile->AddBucketingCounters(unit_counter, 2);

  // Wait two seconds.
  sleep(2);

  // Stop sampling
  profile->StopPeriodicCounters();

  // TODO: change the value to double
  // Bucket counters report the percentage of samples that fell into each bucket.
  // Since the source counter was always 1, 0% of samples land in bucket 0 and
  // 100% in bucket 1.
  double val0 = (*buckets)[0]->double_value();
  double val1 = (*buckets)[1]->double_value();
  EXPECT_EQ(0, val0);
  EXPECT_EQ(100, val1);

  // Wait another two seconds. Sampling has been stopped above, so the values should
  // not have changed (much).
  sleep(2);
  EXPECT_EQ(val0, (*buckets)[0]->double_value());
  EXPECT_EQ(val1, (*buckets)[1]->double_value());
}
| |
| TEST(CountersTest, EventSequences) { |
| ObjectPool pool; |
| RuntimeProfile* profile = RuntimeProfile::Create(&pool, "Profile"); |
| RuntimeProfile::EventSequence* seq = profile->AddEventSequence("event sequence"); |
| seq->MarkEvent("aaaa"); |
| seq->MarkEvent("bbbb"); |
| seq->MarkEvent("cccc"); |
| |
| vector<RuntimeProfile::EventSequence::Event> events; |
| seq->GetEvents(&events); |
| EXPECT_EQ(3, events.size()); |
| |
| uint64_t last_timestamp = 0; |
| string last_string = ""; |
| for (const RuntimeProfile::EventSequence::Event& ev: events) { |
| EXPECT_TRUE(ev.second >= last_timestamp); |
| last_timestamp = ev.second; |
| EXPECT_TRUE(ev.first > last_string); |
| last_string = ev.first; |
| } |
| |
| TRuntimeProfileTree thrift_profile; |
| profile->ToThrift(&thrift_profile); |
| EXPECT_TRUE(thrift_profile.nodes[0].__isset.event_sequences); |
| EXPECT_EQ(1, thrift_profile.nodes[0].event_sequences.size()); |
| |
| RuntimeProfile* reconstructed_profile = |
| RuntimeProfile::CreateFromThrift(&pool, thrift_profile); |
| |
| last_timestamp = 0; |
| last_string = ""; |
| EXPECT_EQ(NULL, reconstructed_profile->GetEventSequence("doesn't exist")); |
| seq = reconstructed_profile->GetEventSequence("event sequence"); |
| EXPECT_TRUE(seq != NULL); |
| seq->GetEvents(&events); |
| EXPECT_EQ(3, events.size()); |
| for (const RuntimeProfile::EventSequence::Event& ev: events) { |
| EXPECT_TRUE(ev.second >= last_timestamp); |
| last_timestamp = ev.second; |
| EXPECT_TRUE(ev.first > last_string); |
| last_string = ev.first; |
| } |
| } |
| |
| static void CheckAscending(const vector<int64_t>& v) { |
| if (v.empty()) return; |
| int64_t prev = v[0]; |
| for (int i = 1; i < v.size(); ++i) { |
| EXPECT_LE(prev, v[i]); |
| prev = v[i]; |
| } |
| } |
| |
| // Helper for the AggregateEventSequences that verifies the encoded event sequence |
| // in the thrift representation when it was merged into the profile at instance offset |
| // 'offset'. |
static void VerifyThriftEventSequences(
    const TRuntimeProfileNode& tnode, int offset, int total_instances) {
  ASSERT_TRUE(tnode.__isset.aggregated);
  const int NUM_VALID_INSTANCES = 3;
  DCHECK_LE(offset + NUM_VALID_INSTANCES, total_instances);

  const TAggregatedRuntimeProfileNode& agg_node = tnode.aggregated;
  ASSERT_TRUE(agg_node.__isset.event_sequences);
  ASSERT_EQ(1, agg_node.event_sequences.size());
  // Check that the dictionary encoding worked: the four distinct labels marked by
  // the test instances each get exactly one dictionary slot.
  const TAggEventSequence& tseq = agg_node.event_sequences[0];
  EXPECT_EQ("event sequence", tseq.name);
  EXPECT_EQ("aaaa", tseq.label_dict[0]);
  EXPECT_EQ("bbbb", tseq.label_dict[1]);
  EXPECT_EQ("cccc", tseq.label_dict[2]);
  EXPECT_EQ("dddd", tseq.label_dict[3]);

  // Validate that the right number of instances are present and that the invalid
  // instances (outside [offset, offset + NUM_VALID_INSTANCES)) do not have any data
  // associated with them.
  EXPECT_EQ(total_instances, tseq.label_idxs.size());
  EXPECT_EQ(total_instances, tseq.timestamps.size());
  for (int i = 0; i < total_instances; ++i) {
    if (i < offset || i >= offset + NUM_VALID_INSTANCES) {
      EXPECT_EQ(0, tseq.label_idxs[i].size());
      EXPECT_EQ(0, tseq.timestamps[i].size());
    }
  }

  // Validate the label/timestamp values for the valid instances.
  // Instance 0 marked "aaaa", "bbbb", "cccc" -> dictionary indices 0, 1, 2.
  EXPECT_EQ(3, tseq.label_idxs[offset + 0].size());
  EXPECT_EQ(0, tseq.label_idxs[offset + 0][0]);
  EXPECT_EQ(1, tseq.label_idxs[offset + 0][1]);
  EXPECT_EQ(2, tseq.label_idxs[offset + 0][2]);
  EXPECT_EQ(3, tseq.timestamps[offset + 0].size());
  CheckAscending(tseq.timestamps[offset + 0]);

  // Instance 1 marked "aaaa", "cccc" -> dictionary indices 0, 2.
  EXPECT_EQ(2, tseq.label_idxs[offset + 1].size());
  EXPECT_EQ(0, tseq.label_idxs[offset + 1][0]);
  EXPECT_EQ(2, tseq.label_idxs[offset + 1][1]);
  EXPECT_EQ(2, tseq.timestamps[offset + 1].size());
  CheckAscending(tseq.timestamps[offset + 1]);

  // Instance 2 marked "aaaa", "dddd", "bbbb" -> dictionary indices 0, 3, 1.
  EXPECT_EQ(3, tseq.label_idxs[offset + 2].size());
  EXPECT_EQ(0, tseq.label_idxs[offset + 2][0]);
  EXPECT_EQ(3, tseq.label_idxs[offset + 2][1]);
  EXPECT_EQ(1, tseq.label_idxs[offset + 2][2]);
  EXPECT_EQ(3, tseq.timestamps[offset + 2].size());
  CheckAscending(tseq.timestamps[offset + 2]);
}
| |
| // Test handling of event sequences in the aggregated profile. |
| TEST(CountersTest, AggregateEventSequences) { |
| auto cert = ScopedFlagSetter<bool>::Make(&FLAGS_gen_experimental_profile, true); |
| const int NUM_PROFILES = 3; |
| // Create a profile with event sequences with some shared event keys. |
| ObjectPool pool; |
| RuntimeProfile* profiles[NUM_PROFILES]; |
| RuntimeProfile::EventSequence* seqs[NUM_PROFILES]; |
| for (int i = 0; i < NUM_PROFILES; ++i) { |
| profiles[i] = RuntimeProfile::Create(&pool, "Profile"); |
| seqs[i] = profiles[i]->AddEventSequence("event sequence"); |
| seqs[i]->MarkEvent("aaaa"); |
| } |
| seqs[0]->MarkEvent("bbbb"); |
| seqs[0]->MarkEvent("cccc"); |
| seqs[1]->MarkEvent("cccc"); |
| seqs[2]->MarkEvent("dddd"); |
| seqs[2]->MarkEvent("bbbb"); |
| |
| AggregatedRuntimeProfile* averaged_profile = |
| AggregatedRuntimeProfile::Create(&pool, "Merged", 3, true); |
| for (int i = 0; i < NUM_PROFILES; ++i) { |
| averaged_profile->UpdateAggregatedFromInstance(profiles[i], i); |
| } |
| |
| TRuntimeProfileTree ttree; |
| averaged_profile->ToThrift(&ttree); |
| VerifyThriftEventSequences(ttree.nodes[0], 0, NUM_PROFILES); |
| |
| // Test merging into another averaged profile at an offset |
| const int NUM_UNINIT_PROFILES = 2; |
| const int OFFSET = 1; |
| AggregatedRuntimeProfile* averaged_profile2 = AggregatedRuntimeProfile::Create( |
| &pool, "Merged 2", NUM_PROFILES + NUM_UNINIT_PROFILES, true); |
| averaged_profile2->UpdateAggregatedFromInstances(ttree, OFFSET); |
| TRuntimeProfileTree ttree2; |
| averaged_profile2->ToThrift(&ttree2); |
| VerifyThriftEventSequences(ttree2.nodes[0], OFFSET, NUM_PROFILES + NUM_UNINIT_PROFILES); |
| } |
| |
| TEST(CountersTest, UpdateEmptyEventSequence) { |
| // IMPALA-6824: This test makes sure that adding events to an empty event sequence does |
| // not crash. |
| ObjectPool pool; |
| |
| // Create the profile to send in the update and add some events. |
| RuntimeProfile* profile = RuntimeProfile::Create(&pool, "Profile"); |
| RuntimeProfile::EventSequence* seq = profile->AddEventSequence("event sequence"); |
| seq->Start(); |
| // Sleep for 10ms to make sure the events are logged at a time > 0. |
| SleepForMs(10); |
| seq->MarkEvent("aaaa"); |
| seq->MarkEvent("bbbb"); |
| |
| vector<RuntimeProfile::EventSequence::Event> events; |
| seq->GetEvents(&events); |
| EXPECT_EQ(2, events.size()); |
| |
| TRuntimeProfileTree thrift_profile; |
| profile->ToThrift(&thrift_profile); |
| |
| // Create the profile that will be updated and add the empty event sequence to it. |
| RuntimeProfile* updated_profile = RuntimeProfile::Create(&pool, "Updated Profile"); |
| seq = updated_profile->AddEventSequence("event sequence"); |
| updated_profile->Update(thrift_profile); |
| |
| // Verify that the events have been updated successfully. |
| events.clear(); |
| seq->GetEvents(&events); |
| EXPECT_EQ(2, events.size()); |
| } |
| |
| void ValidateSampler(const StreamingSampler<int, 10>& sampler, int expected_num, |
| int expected_period, int expected_delta) { |
| const int* samples = NULL; |
| int num_samples; |
| int period; |
| |
| samples = sampler.GetSamples(&num_samples, &period); |
| EXPECT_TRUE(samples != NULL); |
| EXPECT_EQ(num_samples, expected_num); |
| EXPECT_EQ(period, expected_period); |
| |
| for (int i = 0; i < expected_num - 1; ++i) { |
| EXPECT_EQ(samples[i] + expected_delta, samples[i + 1]) << i; |
| } |
| } |
| |
TEST(CountersTest, StreamingSampler) {
  // Sampler with a capacity of 10 samples, starting at a 500ms period.
  StreamingSampler<int, 10> sampler(500);

  int idx = 0;
  for (int i = 0; i < 3; ++i) {
    sampler.AddSample(idx++, 500);
  }
  ValidateSampler(sampler, 3, 500, 1);

  for (int i = 0; i < 3; ++i) {
    sampler.AddSample(idx++, 500);
  }
  ValidateSampler(sampler, 6, 500, 1);

  for (int i = 0; i < 3; ++i) {
    sampler.AddSample(idx++, 500);
  }
  ValidateSampler(sampler, 9, 500, 1);

  // Added enough to cause a collapse
  for (int i = 0; i < 3; ++i) {
    sampler.AddSample(idx++, 500);
  }
  // After the collapse, the 12 samples taken at a 500ms period have become 6 samples
  // at a 1000ms period, so consecutive values now differ by 2.
  ValidateSampler(sampler, 6, 1000, 2);

  for (int i = 0; i < 3; ++i) {
    sampler.AddSample(idx++, 500);
  }
  // Three more 500ms samples fold into the collapsed 1000ms series, growing it to 7.
  ValidateSampler(sampler, 7, 1000, 2);
}
| |
| // Test class to test ConcurrentStopWatch and RuntimeProfile::ConcurrentTimerCounter |
| // don't double count in multithread environment. |
class TimerCounterTest {
 public:
  TimerCounterTest()
    : timercounter_(TUnit::TIME_NS) {}

  // A worker owning a thread that runs TimerCounterTest::Run() until Stop() is
  // called (or the worker is destroyed).
  struct DummyWorker {
    thread* thread_handle;
    AtomicBool done;

    DummyWorker()
      : thread_handle(NULL), done(false) {}

    ~DummyWorker() {
      Stop();
    }

    // Copy constructor, needed so workers can be stored in a vector. Note that it
    // copies the raw thread handle, so at most one of the copies may Stop() (join
    // and delete) the thread.
    DummyWorker(const DummyWorker& dummy_worker)
      : thread_handle(dummy_worker.thread_handle), done(dummy_worker.done.Load()) {}

    // Signals the thread to exit, joins it and releases the handle. Subsequent calls
    // are no-ops because 'done' is already set.
    void Stop() {
      if (!done.Load() && thread_handle != NULL) {
        done.Store(true);
        thread_handle->join();
        delete thread_handle;
        thread_handle = NULL;
      }
    }
  };

  // Worker thread body: keeps the concurrent stop watch and timer counter running
  // (via the scoped helpers) until 'worker->done' is set.
  void Run(DummyWorker* worker) {
    SCOPED_CONCURRENT_STOP_WATCH(&csw_);
    SCOPED_CONCURRENT_COUNTER(&timercounter_);
    while (!worker->done.Load()) {
      SleepForMs(10);
      // Each test case should be no more than one second.
      // Consider the test failed if the total running time exceeds 6 seconds
      // (6000000000 ns).
      if (csw_.TotalRunningTime() > 6000000000) {
        FAIL();
      }
    }
  }

  // Start certain number of worker threads. If interval is set, it will add some delay
  // between creating worker thread.
  void StartWorkers(int num, int interval) {
    // Reserving up front ensures push_back never reallocates, so the 'worker'
    // reference passed to the thread below stays valid.
    workers_.reserve(num);
    for (int i = 0; i < num; ++i) {
      workers_.push_back(DummyWorker());
      DummyWorker& worker = workers_.back();
      worker.thread_handle = new thread(&TimerCounterTest::Run, this, &worker);
      SleepForMs(interval);
    }
  }

  // Stop specified thread by index. if index is -1, stop all threads
  void StopWorkers(int thread_index = -1) {
    if (thread_index >= 0) {
      workers_[thread_index].Stop();
    } else {
      for (int i = 0; i < workers_.size(); ++i) {
        workers_[i].Stop();
      }
    }
  }

  // Destroys all workers; ~DummyWorker() stops any still-running threads.
  void Reset() {
    workers_.clear();
  }

  // Allow some timer inaccuracy (30ms) since thread join could take some time.
  static const int MAX_TIMER_ERROR_NS = 30000000;
  vector<DummyWorker> workers_;
  ConcurrentStopWatch csw_;
  RuntimeProfile::ConcurrentTimerCounter timercounter_;
};
| |
| void ValidateTimerValue(const TimerCounterTest& timer, int64_t start) { |
| int64_t expected_value = MonotonicStopWatch::Now() - start; |
| int64_t stopwatch_value = timer.csw_.TotalRunningTime(); |
| EXPECT_GE(stopwatch_value, expected_value - TimerCounterTest::MAX_TIMER_ERROR_NS); |
| EXPECT_LE(stopwatch_value, expected_value + TimerCounterTest::MAX_TIMER_ERROR_NS); |
| |
| int64_t timer_value = timer.timercounter_.value(); |
| EXPECT_GE(timer_value, expected_value - TimerCounterTest::MAX_TIMER_ERROR_NS); |
| EXPECT_LE(timer_value, expected_value + TimerCounterTest::MAX_TIMER_ERROR_NS); |
| } |
| |
| void ValidateLapTime(TimerCounterTest* timer, int64_t expected_value) { |
| int64_t stopwatch_value = timer->csw_.LapTime(); |
| EXPECT_GE(stopwatch_value, expected_value - TimerCounterTest::MAX_TIMER_ERROR_NS); |
| EXPECT_LE(stopwatch_value, expected_value + TimerCounterTest::MAX_TIMER_ERROR_NS); |
| |
| int64_t timer_value = timer->timercounter_.LapTime(); |
| EXPECT_GE(timer_value, expected_value - TimerCounterTest::MAX_TIMER_ERROR_NS); |
| EXPECT_LE(timer_value, expected_value + TimerCounterTest::MAX_TIMER_ERROR_NS); |
| } |
| |
| TEST(TimerCounterTest, CountersTestOneThread) { |
| TimerCounterTest tester; |
| int64_t start = MonotonicStopWatch::Now(); |
| tester.StartWorkers(1, 0); |
| SleepForMs(500); |
| ValidateTimerValue(tester, start); |
| tester.StopWorkers(-1); |
| ValidateTimerValue(tester, start); |
| } |
| |
| TEST(TimerCounterTest, CountersTestTwoThreads) { |
| TimerCounterTest tester; |
| int64_t start = MonotonicStopWatch::Now(); |
| tester.StartWorkers(2, 10); |
| SleepForMs(500); |
| ValidateTimerValue(tester, start); |
| tester.StopWorkers(-1); |
| ValidateTimerValue(tester, start); |
| } |
| |
TEST(TimerCounterTest, CountersTestRandom) {
  TimerCounterTest tester;
  int64_t start = MonotonicStopWatch::Now();
  ValidateTimerValue(tester, start);
  // First working period
  tester.StartWorkers(5, 10);
  ValidateTimerValue(tester, start);
  SleepForMs(400);
  tester.StopWorkers(2);
  ValidateTimerValue(tester, start);
  SleepForMs(100);
  tester.StopWorkers(4);
  ValidateTimerValue(tester, start);
  SleepForMs(600);
  tester.StopWorkers(-1);
  ValidateTimerValue(tester, start);
  tester.Reset();

  ValidateLapTime(&tester, MonotonicStopWatch::Now() - start);
  int64_t first_run_end = MonotonicStopWatch::Now();
  // Add some idle time; the concurrent stopwatch and timer should not count it,
  // so shift 'start' forward by the idle duration.
  SleepForMs(200);
  start += MonotonicStopWatch::Now() - first_run_end;

  // Second working period
  tester.StartWorkers(2, 0);
  // The lap time was consumed at the end of the first run, so at the start of the
  // second run the lap time is expected to be 0.
  ValidateLapTime(&tester, 0);
  int64_t lap_time_start = MonotonicStopWatch::Now();
  SleepForMs(200);
  ValidateTimerValue(tester, start);
  SleepForMs(200);
  tester.StopWorkers(-1);
  ValidateTimerValue(tester, start);
  ValidateLapTime(&tester, MonotonicStopWatch::Now() - lap_time_start);
}
| |
| // Don't run TestAddClearRace against TSAN builds as it is expected to have race |
| // conditions. |
| #ifndef THREAD_SANITIZER |
| |
TEST(TimeSeriesCounterTest, TestAddClearRace) {
  ObjectPool pool;
  RuntimeProfile* profile = RuntimeProfile::Create(&pool, "Profile");
  int i = 0;
  // Return and increment i
  auto f = [&i]() { return i++; };
  RuntimeProfile::TimeSeriesCounter* counter =
      profile->AddChunkedTimeSeriesCounter("Counter", TUnit::UNIT, f);
  // Sleep 1 second for some values to accumulate.
  sleep(1);
  int num_samples, period;
  counter->GetSamplesTest(&num_samples, &period);
  EXPECT_GT(num_samples, 0);

  // Wait for more values to show up
  sleep(1);

  // Stop the counters. The rest of the test assumes that no new values will be added.
  profile->StopPeriodicCounters();

  // Clear the counter
  profile->ClearChunkedTimeSeriesCounters();

  // Check that clearing multiple times doesn't affect values that have not been
  // retrieved: the values added after the GetSamplesTest() call above survive both
  // clears.
  profile->ClearChunkedTimeSeriesCounters();

  // Make sure that it still has values in it.
  counter->GetSamplesTest(&num_samples, &period);
  EXPECT_GT(num_samples, 0);

  // Clear it again - this time the remaining values have been retrieved.
  profile->ClearChunkedTimeSeriesCounters();

  // Make sure the values are gone.
  counter->GetSamplesTest(&num_samples, &period);
  EXPECT_EQ(num_samples, 0);
}
| |
| #endif |
| |
| /// Stops the periodic counter updater in 'profile' and then clears the samples in |
| /// 'counter'. |
| void StopAndClearCounter(RuntimeProfile* profile, |
| RuntimeProfile::TimeSeriesCounter* counter) { |
| // There's a race between adding the counter and calling StopPeriodicCounters so we |
| // sleep here to make sure we exercise the code that handles the race. |
| sleep(1); |
| profile->StopPeriodicCounters(); |
| |
| // Reset the counter state by reading and clearing its samples. |
| int num_samples = 0; |
| int result_period_unused = 0; |
| counter->GetSamplesTest(&num_samples, &result_period_unused); |
| ASSERT_GT(num_samples, 0); |
| profile->ClearChunkedTimeSeriesCounters(); |
| // Ensure clean state. |
| counter->GetSamplesTest(&num_samples, &result_period_unused); |
| ASSERT_EQ(num_samples, 0); |
| } |
| |
| /// Tests that ChunkedTimeSeriesCounters are bounded by a maximum size. |
TEST(TimeSeriesCounterTest, TestMaximumSize) {
  ObjectPool pool;
  RuntimeProfile* profile = RuntimeProfile::Create(&pool, "Profile");

  const int test_period = FLAGS_periodic_counter_update_period_ms;

  // Add a counter with a sample function that counts up, starting from 0.
  int value = 0;
  auto sample_fn = [&value]() { return value++; };
  RuntimeProfile::TimeSeriesCounter* counter =
      profile->AddChunkedTimeSeriesCounter("TestCounter", TUnit::UNIT, sample_fn);

  // Stop counter updates from interfering with the rest of the test.
  StopAndClearCounter(profile, counter);

  // Reset value after previous values have been retrieved.
  value = 0;

  // The maximum size is 10x the number of samples per status report interval; add 10
  // more samples than that so the oldest 10 should be dropped.
  int64_t max_size = 10 * FLAGS_status_report_interval_ms / test_period;
  for (int i = 0; i < 10 + max_size; ++i) counter->AddSample(test_period);

  int num_samples = 0;
  int result_period = 0;
  // Retrieve and validate samples.
  const int64_t* samples = counter->GetSamplesTest(&num_samples, &result_period);
  ASSERT_EQ(num_samples, max_size);
  // No resampling happens with ChunkedTimeSeriesCounter.
  ASSERT_EQ(result_period, test_period);

  // First 10 samples have been truncated, so the oldest remaining value is 10.
  ASSERT_EQ(samples[0], 10);
}
| |
// Helper for the AggregateTimeSeries test that verifies the encoded time series
// counter in the thrift representation when it was merged into the profile at
// instance offset 'offset'.
static void VerifyThriftTimeSeries(
    const TRuntimeProfileNode& tnode, int offset, int total_instances) {
  ASSERT_TRUE(tnode.__isset.aggregated);
  const int NUM_VALID_INSTANCES = 3;
  DCHECK_LE(offset + NUM_VALID_INSTANCES, total_instances);

  const TAggregatedRuntimeProfileNode& agg_node = tnode.aggregated;
  ASSERT_TRUE(agg_node.__isset.time_series_counters);

  const TAggTimeSeriesCounter& tcounter = agg_node.time_series_counters[0];
  EXPECT_EQ("TestCounter", tcounter.name);
  const int test_period = FLAGS_periodic_counter_update_period_ms;
  for (int i = 0; i < total_instances; ++i) {
    // Instances outside [offset, offset + NUM_VALID_INSTANCES) never reported, so
    // they must carry no period, no values and a zero start index.
    if (i < offset || i >= offset + NUM_VALID_INSTANCES) {
      EXPECT_EQ(0, tcounter.period_ms[i]);
      EXPECT_EQ(0, tcounter.values[i].size());
      EXPECT_EQ(0, tcounter.start_index[i]);
      continue;
    }
    // Each valid instance reported ten counting-up samples at the test period.
    EXPECT_EQ(test_period, tcounter.period_ms[i]);
    EXPECT_GE(tcounter.start_index[i], 0);
    EXPECT_EQ(vector<int64_t>({0, 1, 2, 3, 4, 5, 6, 7, 8, 9}), tcounter.values[i]);
  }
}
| |
| // Test handling of time series counters in the aggregated profile. |
| TEST(TimeSeriesCounterTest, AggregateTimeSeries) { |
| auto cert = ScopedFlagSetter<bool>::Make(&FLAGS_gen_experimental_profile, true); |
| const int NUM_PROFILES = 3; |
| // Create a profile with event sequences with some shared event keys. |
| ObjectPool pool; |
| RuntimeProfile* profiles[NUM_PROFILES]; |
| RuntimeProfile::TimeSeriesCounter* counters[NUM_PROFILES]; |
| const int test_period = FLAGS_periodic_counter_update_period_ms; |
| for (int i = 0; i < NUM_PROFILES; ++i) { |
| profiles[i] = RuntimeProfile::Create(&pool, "Profile"); |
| // Add a counter with a sample function that counts up, starting from 0. |
| int value = 0; |
| auto sample_fn = [&value]() { return value++; }; |
| counters[i] = |
| profiles[i]->AddChunkedTimeSeriesCounter("TestCounter", TUnit::UNIT, sample_fn); |
| |
| // Stop counter updates from interfering with the rest of the test. |
| StopAndClearCounter(profiles[i], counters[i]); |
| |
| // Reset value after previous values have been retrieved. |
| value = 0; |
| |
| for (int j = 0; j < 10; ++j) counters[i]->AddSample(test_period); |
| } |
| |
| AggregatedRuntimeProfile* averaged_profile = |
| AggregatedRuntimeProfile::Create(&pool, "Merged", 3, true); |
| for (int i = 0; i < NUM_PROFILES; ++i) { |
| averaged_profile->UpdateAggregatedFromInstance(profiles[i], i); |
| } |
| TRuntimeProfileTree ttree; |
| averaged_profile->ToThrift(&ttree); |
| VerifyThriftTimeSeries(ttree.nodes[0], 0, NUM_PROFILES); |
| |
| // Test merging into another averaged profile at an offset |
| const int NUM_UNINIT_PROFILES = 2; |
| const int OFFSET = 1; |
| AggregatedRuntimeProfile* averaged_profile2 = AggregatedRuntimeProfile::Create( |
| &pool, "Merged 2", NUM_PROFILES + NUM_UNINIT_PROFILES, true); |
| averaged_profile2->UpdateAggregatedFromInstances(ttree, OFFSET); |
| TRuntimeProfileTree ttree2; |
| averaged_profile2->ToThrift(&ttree2); |
| VerifyThriftTimeSeries(ttree2.nodes[0], OFFSET, NUM_PROFILES + NUM_UNINIT_PROFILES); |
| } |
| |
// Helper that verifies the encoded TAggCounter in the thrift representation when it
// was merged into the profile at instance offset 'offset'.
| static void VerifyThriftCounters( |
| const TRuntimeProfileNode& tnode, int offset, int total_instances) { |
| ASSERT_TRUE(tnode.__isset.aggregated); |
| const int NUM_VALID_INSTANCES = 3; |
| DCHECK_LE(offset + NUM_VALID_INSTANCES, total_instances); |
| |
| const TAggregatedRuntimeProfileNode& agg_node = tnode.aggregated; |
| ASSERT_TRUE(agg_node.__isset.time_series_counters); |
| |
| const TAggCounter& tcounter = agg_node.counters[2]; |
| EXPECT_EQ("simple_counter", tcounter.name); |
| EXPECT_EQ(TUnit::BYTES, tcounter.unit); |
| for (int i = 0; i < total_instances; ++i) { |
| if (i < offset || i >= offset + NUM_VALID_INSTANCES) { |
| EXPECT_EQ(false, tcounter.has_value[i]); |
| EXPECT_EQ(0, tcounter.values[i]); |
| continue; |
| } |
| EXPECT_EQ(true, tcounter.has_value[i]); |
| EXPECT_EQ((i - offset + 1) * 11, tcounter.values[i]); |
| } |
| } |
| |
| // Test handling aggregation of two profile update, where the second profile update is a |
| // partial update. |
TEST(CountersTest, PartialUpdate) {
  auto cert = ScopedFlagSetter<bool>::Make(&FLAGS_gen_experimental_profile, true);
  const int NUM_PROFILES = 3;
  // Create profiles covering every aggregated item: counters, summary stats, info
  // strings, event sequences and time series counters. Instance #0 initially gets
  // incomplete data that is completed by the second (partial) update below.
  ObjectPool pool;
  RuntimeProfile* profiles[NUM_PROFILES];

  // Create Profiles and Counters.
  RuntimeProfile::Counter* counters[NUM_PROFILES];
  for (int i = 0; i < NUM_PROFILES; ++i) {
    profiles[i] = RuntimeProfile::Create(&pool, strings::Substitute("Profile $0", i));
    counters[i] = profiles[i]->AddCounter("simple_counter", TUnit::BYTES);
    // Instance #0 gets a placeholder value (5) that is replaced by the partial
    // update; the others get their final (i + 1) * 11 values.
    counters[i]->Set((i + 1) * (i > 0 ? 11 : 5));
  }

  // Create SummaryStatsCounters. Instance #0 gets only one sample for now.
  RuntimeProfile::SummaryStatsCounter* ss_counters[NUM_PROFILES];
  for (int i = 0; i < NUM_PROFILES; ++i) {
    ss_counters[i] = profiles[i]->AddSummaryStatsCounter("test ss", TUnit::UNIT);
    ss_counters[i]->UpdateCounter(i);
    if (i > 0) ss_counters[i]->UpdateCounter(i + 1);
  }

  // Create InfoStrings. Instance #0's "distinct" value is added later.
  for (int i = 0; i < NUM_PROFILES; ++i) {
    profiles[i]->AddInfoString("shared", "same value");
    if (i > 0) profiles[i]->AddInfoString("distinct", Substitute("val$0", i));
  }

  // Create EventSequences. Instance #0's "cccc" event is marked later.
  RuntimeProfile::EventSequence* seqs[NUM_PROFILES];
  for (int i = 0; i < NUM_PROFILES; ++i) {
    seqs[i] = profiles[i]->AddEventSequence("event sequence");
    seqs[i]->MarkEvent("aaaa");
  }
  seqs[0]->MarkEvent("bbbb");
  seqs[1]->MarkEvent("cccc");
  seqs[2]->MarkEvent("dddd");
  seqs[2]->MarkEvent("bbbb");

  // Create TimeSeriesCounters. Instance #0 gets only 9 of its 10 samples here.
  RuntimeProfile::TimeSeriesCounter* ts_counters[NUM_PROFILES];
  const int test_period = FLAGS_periodic_counter_update_period_ms;
  // Add a counter with a sample function that counts up, starting from 0.
  int ts_value = 0;
  for (int i = NUM_PROFILES - 1; i >= 0; --i) {
    auto sample_fn = [&ts_value]() { return ts_value++; };
    ts_counters[i] =
        profiles[i]->AddChunkedTimeSeriesCounter("TestCounter", TUnit::UNIT, sample_fn);

    // Stop counter updates from interfering with the rest of the test.
    StopAndClearCounter(profiles[i], ts_counters[i]);

    // Reset value after previous values have been retrieved.
    ts_value = 0;

    for (int j = 0; j < (i == 0 ? 9 : 10); ++j) ts_counters[i]->AddSample(test_period);
  }

  // Update 1 has instance #1 and #2 reporting final update, while instance #0 reporting
  // its first update.
  AggregatedRuntimeProfile* aggregated_profile_1 =
      AggregatedRuntimeProfile::Create(&pool, "Update 1", NUM_PROFILES, true);
  for (int i = 0; i < NUM_PROFILES; ++i) {
    aggregated_profile_1->UpdateAggregatedFromInstance(profiles[i], i);
  }
  TRuntimeProfileTree ttree1;
  aggregated_profile_1->ToThrift(&ttree1);

  // Update 2 has only instance #0 reporting its final update, which has 1 more sample in
  // its counter, event sequence, and time series counter.
  counters[0]->Set(11);
  ss_counters[0]->UpdateCounter(1);
  profiles[0]->AddInfoString("distinct", "val0");
  seqs[0]->MarkEvent("cccc");
  ts_counters[0]->AddSample(test_period);
  AggregatedRuntimeProfile* aggregated_profile_2 =
      AggregatedRuntimeProfile::Create(&pool, "Update 2", NUM_PROFILES, true);
  aggregated_profile_2->UpdateAggregatedFromInstance(profiles[0], 0);
  TRuntimeProfileTree ttree2;
  aggregated_profile_2->ToThrift(&ttree2);

  // Test merging both update into larger aggregated profile (size of 9) at an offset 3.
  const int MERGE_SIZE = NUM_PROFILES * 3;
  const int OFFSET = NUM_PROFILES;
  AggregatedRuntimeProfile* merged_profile =
      AggregatedRuntimeProfile::Create(&pool, "Merged", MERGE_SIZE, true);
  merged_profile->UpdateAggregatedFromInstances(ttree1, OFFSET);
  merged_profile->UpdateAggregatedFromInstances(ttree2, OFFSET);
  TRuntimeProfileTree ttree_merged;
  merged_profile->ToThrift(&ttree_merged);

  // Verify merged SummaryStats, InfoStrings, EventSequences, and TimeSeries.
  VerifyThriftCounters(ttree_merged.nodes[0], OFFSET, MERGE_SIZE);
  VerifyThriftSummaryStats(ttree_merged.nodes[0], OFFSET, MERGE_SIZE);
  VerifyThriftInfoStrings(ttree_merged.nodes[0], OFFSET, MERGE_SIZE);
  VerifyThriftEventSequences(ttree_merged.nodes[0], OFFSET, MERGE_SIZE);
  VerifyThriftTimeSeries(ttree_merged.nodes[0], OFFSET, MERGE_SIZE);

  // Verify that all profile name match.
  ASSERT_EQ(ttree_merged.nodes[0].aggregated.num_instances, MERGE_SIZE);
  for (int i = 0; i < NUM_PROFILES; ++i) {
    ASSERT_STREQ(profiles[i]->name().c_str(),
        ttree_merged.nodes[0].aggregated.input_profiles[i + OFFSET].c_str());
  }
}
| |
| /// Test parameter class that helps to test time series resampling during profile pretty |
| /// printing with a varying number of test samples. |
struct TimeSeriesTestParam {
  /// 'num_samples' is the number of samples the test adds; 'expected' holds the
  /// fragments that must appear in the pretty-printed profile output.
  TimeSeriesTestParam(int num_samples, std::vector<const char*> expected)
    : num_samples(num_samples), expected(std::move(expected)) {}

  int num_samples;
  std::vector<const char*> expected;

  // Used by gtest to print values of this struct
  friend std::ostream& operator<<(std::ostream& os, const TimeSeriesTestParam& p) {
    os << "num_samples: " << p.num_samples << std::endl;
    return os;
  }
};
| |
/// Parameterized fixture for TestPrettyPrint below; the parameter supplies the number
/// of samples to add and the expected pretty-printed output fragments.
class TimeSeriesCounterResampleTest : public testing::TestWithParam<TimeSeriesTestParam> {
};
| |
/// Tests that pretty-printing a ChunkedTimeSeriesCounter limits the number of printed
/// samples to 64 or fewer.
TEST_P(TimeSeriesCounterResampleTest, TestPrettyPrint) {
  ObjectPool pool;
  RuntimeProfile* profile = RuntimeProfile::Create(&pool, "Profile");

  const TimeSeriesTestParam& param = GetParam();
  // Use a fixed update period so the periods in the expected strings match.
  FLAGS_periodic_counter_update_period_ms = 500;
  const int test_period = FLAGS_periodic_counter_update_period_ms;

  // Add a counter with a sample function that counts up, starting from 0.
  int value = 0;
  auto sample_fn = [&value]() { return value++; };
  // We increase the value of this flag to allow the counter to store enough samples.
  FLAGS_status_report_interval_ms = 50000;
  RuntimeProfile::TimeSeriesCounter* counter =
      profile->AddChunkedTimeSeriesCounter("TestCounter", TUnit::UNIT, sample_fn);

  // Stop counter updates from interfering with the rest of the test.
  StopAndClearCounter(profile, counter);

  // Reset value after previous values have been retrieved.
  value = 0;
  // Record the parameterized number of samples, one per update period.
  for (int i = 0; i < param.num_samples; ++i) counter->AddSample(test_period);

  int num_samples = 0;
  int result_period = 0;
  // Retrieve and validate samples.
  const int64_t* samples = counter->GetSamplesTest(&num_samples, &result_period);
  ASSERT_EQ(num_samples, param.num_samples);
  // No resampling happens with ChunkedTimeSeriesCounter.
  ASSERT_EQ(result_period, test_period);

  // All stored samples must be the verbatim values 0..num_samples-1.
  for (int i = 0; i < param.num_samples; ++i) ASSERT_EQ(samples[i], i);

  // Pretty-printing is where resampling down to at most 64 printed values
  // happens; check the parameterized expected substrings against its output.
  stringstream pretty;
  profile->PrettyPrint(&pretty);
  const string pretty_str = pretty.str();

  for (const char* e : param.expected) EXPECT_STR_CONTAINS(pretty_str, e);
}
| |
// Instantiate the pretty-printing test with sample counts around the resampling
// boundaries. Up to 64 samples are printed verbatim; larger counts are printed
// with every 2nd (then every 3rd, ...) value so that at most 64 values appear,
// with the printed period scaled to match.
INSTANTIATE_TEST_SUITE_P(VariousNumbers, TimeSeriesCounterResampleTest,
    ::testing::Values(
        // Exactly 64 samples: printed verbatim at the original 500ms period.
        TimeSeriesTestParam(64, {"TestCounter (500.000ms): 0, 1, 2, 3", "61, 62, 63"}),

        // 65..128 samples: every other value is printed, period doubles to 1s.
        TimeSeriesTestParam(65, {"TestCounter (1s000ms): 0, 2, 4, 6,",
            "60, 62, 64 (Showing 33 of 65 values from Thrift Profile)"}),

        TimeSeriesTestParam(80, {"TestCounter (1s000ms): 0, 2, 4, 6,",
            "74, 76, 78 (Showing 40 of 80 values from Thrift Profile)"}),

        TimeSeriesTestParam(127, {"TestCounter (1s000ms): 0, 2, 4, 6,",
            "122, 124, 126 (Showing 64 of 127 values from Thrift Profile)"}),

        TimeSeriesTestParam(128, {"TestCounter (1s000ms): 0, 2, 4, 6,",
            "122, 124, 126 (Showing 64 of 128 values from Thrift Profile)"}),

        // 129+ samples: every third value is printed, period triples to 1.5s.
        TimeSeriesTestParam(129, {"TestCounter (1s500ms): 0, 3, 6, 9,",
            "120, 123, 126 (Showing 43 of 129 values from Thrift Profile)"})
    ));
| |
| // Tests that the __isset field for TRuntimeProfileNode.node_metadata is set correctly |
| // (IMPALA-8252). |
TEST(ToThrift, NodeMetadataIsSetCorrectly) {
  ObjectPool pool;
  RuntimeProfile* profile = RuntimeProfile::Create(&pool, "Profile");
  TRuntimeProfileTree thrift_profile;
  profile->ToThrift(&thrift_profile);
  // Even an empty profile serializes its own root node, so expect exactly one
  // node, with node_metadata left unset.
  EXPECT_EQ(thrift_profile.nodes.size(), 1);
  EXPECT_FALSE(thrift_profile.nodes[0].__isset.node_metadata);

  // Set the plan node ID and make sure that the __isset flag for node_metadata
  // is now marked on the serialized root node.
  profile->SetPlanNodeId(1);
  profile->ToThrift(&thrift_profile);
  EXPECT_TRUE(thrift_profile.nodes[0].__isset.node_metadata);
}
| |
// Tests basic ToJson() output: child profile names, info strings, regular /
// high water mark / summary stats counters.
TEST(ToJson, RuntimeProfileToJsonTest) {
  ObjectPool pool;
  RuntimeProfile* profile_a = RuntimeProfile::Create(&pool, "ProfileA");
  RuntimeProfile* profile_a1 = RuntimeProfile::Create(&pool, "ProfileA1");
  RuntimeProfile* profile_ab = RuntimeProfile::Create(&pool, "ProfileAb");
  RuntimeProfile::Counter* counter_a;

  // Initialize for further validation.
  profile_a->AddChild(profile_a1);
  profile_a->AddChild(profile_ab);
  profile_a->AddInfoString("Key", "Value");

  counter_a = profile_a->AddCounter("A", TUnit::UNIT);
  counter_a->Set(1);
  // Set(10) then Add(10) raises the counter to 20; the final Set(10) does not
  // lower the reported value below the high-water mark of 20 (checked below).
  RuntimeProfile::HighWaterMarkCounter* high_water_counter =
      profile_a->AddHighWaterMarkCounter("high_water_counter", TUnit::BYTES);
  high_water_counter->Set(10);
  high_water_counter->Add(10);
  high_water_counter->Set(10);

  // Two samples (10 and 20) give min=10, max=20, avg=15, count=2.
  RuntimeProfile::SummaryStatsCounter* summary_stats_counter =
      profile_a->AddSummaryStatsCounter("summary_stats_counter", TUnit::TIME_NS);
  summary_stats_counter->UpdateCounter(10);
  summary_stats_counter->UpdateCounter(20);

  // Serialize to json.
  rapidjson::Document doc(rapidjson::kObjectType);
  profile_a->ToJson(&doc);
  rapidjson::Value& content = doc["contents"];

  // Check that root and child profile names are correct.
  EXPECT_EQ("ProfileA", content["profile_name"]);
  EXPECT_EQ("ProfileA1", content["child_profiles"][0]["profile_name"]);
  EXPECT_EQ("ProfileAb", content["child_profiles"][1]["profile_name"]);

  // Check that the info string is correct.
  EXPECT_EQ(1, content["info_strings"].Size());
  EXPECT_EQ("Key", content["info_strings"][0]["key"]);
  EXPECT_EQ("Value", content["info_strings"][0]["value"]);

  // Check counter values: counter "A", the high water mark counter and the two
  // automatically-added default counters (TotalTime, InactiveTotalTime).
  EXPECT_EQ(4, content["counters"].Size());
  for (auto& itr : content["counters"].GetArray()) {
    // Check the normal Counter.
    if (itr["counter_name"] == "A") {
      EXPECT_EQ(1, itr["value"].GetInt());
      EXPECT_EQ("UNIT", itr["unit"]);
    }
    // Check the HighWaterMarkCounter.
    else if (itr["counter_name"] == "high_water_counter") {
      EXPECT_EQ(20, itr["value"].GetInt());
      EXPECT_EQ("BYTES", itr["unit"]);
    } else {
      // Anything else must be one of the default counters.
      EXPECT_TRUE(IsDefaultCounter(itr["counter_name"].GetString()))
          << itr["counter_name"].GetString();
    }
  }

  // Check the SummaryStatsCounter aggregates.
  EXPECT_EQ(1, content["summary_stats_counters"].Size());
  for (auto& itr : content["summary_stats_counters"].GetArray()) {
    if (itr["counter_name"] == "summary_stats_counter") {
      EXPECT_EQ(10, itr["min"].GetInt());
      EXPECT_EQ(20, itr["max"].GetInt());
      EXPECT_EQ(15, itr["avg"].GetInt());
      EXPECT_EQ(2, itr["num_of_samples"].GetInt());
      EXPECT_EQ("TIME_NS", itr["unit"]);
    }
  }
}
| |
| // Test when some fields are not set. ToJson will not add them as a member |
| TEST(ToJson, EmptyTest) { |
| ObjectPool pool; |
| RuntimeProfile* profile = RuntimeProfile::Create(&pool, "Profile"); |
| |
| // Serialize to json |
| rapidjson::Document doc(rapidjson::kObjectType); |
| profile->ToJson(&doc); |
| rapidjson::Value& content = doc["contents"]; |
| |
| EXPECT_EQ("Profile", content["profile_name"]); |
| EXPECT_TRUE(content.HasMember("num_children")); |
| |
| // Empty profile should not have following members |
| EXPECT_TRUE(!content.HasMember("info_strings")); |
| EXPECT_TRUE(!content.HasMember("event_sequences")); |
| EXPECT_TRUE(!content.HasMember("summary_stats_counters")); |
| EXPECT_TRUE(!content.HasMember("time_series_counters")); |
| EXPECT_TRUE(!content.HasMember("child_profiles")); |
| |
| // Only default counters should be present. |
| EXPECT_EQ(2, content["counters"].Size()); |
| for (auto& itr : content["counters"].GetArray()) { |
| EXPECT_TRUE(IsDefaultCounter(itr["counter_name"].GetString())) |
| << itr["counter_name"].GetString(); |
| } |
| } |
| |
| TEST(ToJson, EventSequenceToJsonTest) { |
| ObjectPool pool; |
| RuntimeProfile* profile = RuntimeProfile::Create(&pool, "Profile"); |
| RuntimeProfile::EventSequence* seq = profile->AddEventSequence("event sequence"); |
| seq->MarkEvent("aaaa"); |
| seq->MarkEvent("bbbb"); |
| seq->MarkEvent("cccc"); |
| |
| // Serialize to json |
| rapidjson::Document doc(rapidjson::kObjectType); |
| rapidjson::Value event_sequence_json(rapidjson::kObjectType); |
| seq->ToJson(RuntimeProfile::Verbosity::DEFAULT, doc, &event_sequence_json); |
| |
| EXPECT_EQ(0, event_sequence_json["offset"].GetInt()); |
| |
| uint64_t last_timestamp = 0; |
| string last_string = ""; |
| for (auto& itr : event_sequence_json["events"].GetArray()) { |
| EXPECT_TRUE(itr["timestamp"].GetInt() >= last_timestamp); |
| last_timestamp = itr["timestamp"].GetInt(); |
| string label = string(itr["label"].GetString()); |
| EXPECT_TRUE(label > last_string); |
| last_string = label; |
| } |
| } |
| |
TEST(ToJson, TimeSeriesCounterToJsonTest) {
  ObjectPool pool;
  RuntimeProfile* profile = RuntimeProfile::Create(&pool, "Profile");

  // 1. With no time series counters added, the member should be absent.
  rapidjson::Document doc(rapidjson::kObjectType);
  profile->ToJson(&doc);
  EXPECT_TRUE(!doc["contents"].HasMember("time_series_counters"));

  // 2. Check serialization to json with both counter flavors present.
  const int test_period = FLAGS_periodic_counter_update_period_ms;

  // Add a counter with a sample function that counts up, starting from 0.
  int value = 0;
  auto sample_fn = [&value]() { return value++; };

  // We increase the value of this flag to allow the counter to store enough samples.
  FLAGS_status_report_interval_ms = 50000;
  RuntimeProfile::TimeSeriesCounter* counter =
      profile->AddChunkedTimeSeriesCounter("TimeSeriesCounter", TUnit::UNIT, sample_fn);
  auto counter2 = static_cast<RuntimeProfile::SamplingTimeSeriesCounter*>(
      profile->AddSamplingTimeSeriesCounter("SamplingCounter", TUnit::UNIT, sample_fn));

  // Stop counter updates from interfering with the rest of the test.
  StopAndClearCounter(profile, counter);
  // ChunkedTimeSeriesCounters are stopped and cleared above, but a
  // SamplingTimeSeriesCounter needs an explicit Reset() to return to its
  // initial state.
  counter2->Reset();

  // Reset value after previous values have been retrieved.
  value = 0;
  // 64 samples fit in the chunked counter verbatim: values 0..63.
  for (int i = 0; i < 64; ++i) counter->AddSample(test_period);

  value = 0;
  // 80 samples go into the sampling counter; the expectations below show it
  // ends up holding every other value (0, 2, ..., 78) after resampling.
  for (int i = 0; i < 80; ++i) counter2->AddSample(test_period);

  profile->ToJson(&doc);
  // The sampling counter is serialized at index 0, the chunked counter at
  // index 1 (note: this is the order observed, not the insertion order).
  EXPECT_EQ(doc["contents"]["time_series_counters"][1]["counter_name"],
      "TimeSeriesCounter");
  EXPECT_STR_CONTAINS(
      doc["contents"]["time_series_counters"][1]["data"].GetString(), "0,1,2,3,4");
  EXPECT_STR_CONTAINS(
      doc["contents"]["time_series_counters"][1]["data"].GetString(), "60,61,62,63");

  EXPECT_EQ(doc["contents"]["time_series_counters"][0]["counter_name"],
      "SamplingCounter");
  EXPECT_STR_CONTAINS(
      doc["contents"]["time_series_counters"][0]["data"].GetString(), "0,2,4,6");
  EXPECT_STR_CONTAINS(
      doc["contents"]["time_series_counters"][0]["data"].GetString(), "72,74,76,78");
}
| |
| // Test handling of info strings in aggregated JSON profile |
// Test handling of info strings in aggregated JSON profile.
TEST(ToJson, AggregatedInfoStringsToJsonTest) {
  // Aggregated profiles require the experimental profile format.
  auto s1 = ScopedFlagSetter<bool>::Make(&FLAGS_gen_experimental_profile, true);
  const size_t NUM_PROFILES = 3;

  ObjectPool pool;

  RuntimeProfile* profiles[NUM_PROFILES];
  for (int i = 0; i < NUM_PROFILES; ++i) {
    profiles[i] = RuntimeProfile::Create(&pool, strings::Substitute("Instance $0",
        i + 1));
  }

  // Add dummy info strings to the profiles: same key, distinct values.
  profiles[0]->AddInfoString("Table Name", "A_TABLE");
  profiles[1]->AddInfoString("Table Name", "B_TABLE");
  profiles[2]->AddInfoString("Table Name", "C_TABLE");

  AggregatedRuntimeProfile* aggregated_profile = AggregatedRuntimeProfile::Create(
      &pool, "PlanNode", NUM_PROFILES, true, false);
  for (int i = 0; i < NUM_PROFILES; ++i) {
    aggregated_profile->UpdateAggregatedFromInstance(profiles[i], i);
  }

  rapidjson::Document doc(rapidjson::kObjectType);
  rapidjson::Value plan_node_profile(rapidjson::kObjectType);

  aggregated_profile->ToJsonSubclass(RuntimeProfile::Verbosity::DEFAULT,
      &plan_node_profile, &doc);

  // Verify the structure of aggregated info strings,
  // {...
  //   "info_strings" :
  //   [{
  //      "key": "<info string's key>",
  //      "values": [<distinct info string values>]
  //   }]
  // }

  EXPECT_TRUE(plan_node_profile.HasMember("info_strings"));

  // A single key was used, so a single entry is expected.
  EXPECT_EQ(plan_node_profile["info_strings"].Size(), 1);

  EXPECT_TRUE(plan_node_profile["info_strings"][0].HasMember("key"));
  EXPECT_STREQ(plan_node_profile["info_strings"][0]["key"].GetString(), "Table Name");

  // All three distinct values must be preserved, in instance order.
  EXPECT_TRUE(plan_node_profile["info_strings"][0].HasMember("values"));
  EXPECT_EQ(plan_node_profile["info_strings"][0]["values"].Size(), 3);
  EXPECT_STREQ(plan_node_profile["info_strings"][0]["values"][0].GetString(), "A_TABLE");
  EXPECT_STREQ(plan_node_profile["info_strings"][0]["values"][1].GetString(), "B_TABLE");
  EXPECT_STREQ(plan_node_profile["info_strings"][0]["values"][2].GetString(), "C_TABLE");
}
| |
| class AggregatedEventSequenceToJsonTest : public ::testing::Test { |
| protected: |
| // Common event timeline names and event labels |
| const string NODE_LIFECYCLE_EVENT_TIMELINE; |
| const string NODE_LIFECYCLE_EVENT_LABELS[6]; |
| const size_t MAX_SETUP_RETRIES; |
| // Maintain rapidjson::Document across methods, with support for tearing down |
| rapidjson::Document doc; |
| rapidjson::Value plan_node_profile; |
| // Randomly generated sizes, with constraints based on setup parameters |
| size_t NUM_PROFILES; |
| size_t BUCKET_SIZE; |
| bool EVENT_COMPLETENESS; |
| // Record instances with missing instances to validate |
| std::unordered_set<size_t> missing_event_instances_expected; |
| // Keep track of maximum retries for missing events generation |
| size_t setup_retries; |
| |
| AggregatedEventSequenceToJsonTest() : |
| NODE_LIFECYCLE_EVENT_TIMELINE{"Node Lifecycle Event Timeline"}, |
| NODE_LIFECYCLE_EVENT_LABELS{"Open Started", "Open Finished", "First Batch Requested" |
| , "First Batch Returned", "Last Batch Returned", "Closed"}, |
| MAX_SETUP_RETRIES{3}, |
| doc{rapidjson::Document(rapidjson::kObjectType)}, |
| plan_node_profile(rapidjson::Value(rapidjson::kObjectType)), |
| setup_retries{0} { |
| } |
| |
| void SetUp() override {} |
| |
| void SetUp(float events_completeness, bool timestamp_aggregation, |
| bool is_zero_bucket_size = false) { |
| std::random_device rd; |
| std::mt19937 gen(rd()); |
| |
| std::uniform_int_distribution<> uni_dis_instances(1, 30); |
| |
| NUM_PROFILES = uni_dis_instances(gen); |
| if (timestamp_aggregation) { |
| // Generate a bucket size smaller than number of instances |
| if (NUM_PROFILES <= 1) ++NUM_PROFILES; |
| std::uniform_int_distribution<> uni_dis_granularity(1, NUM_PROFILES - 1); |
| BUCKET_SIZE = uni_dis_granularity(gen); |
| } else if (is_zero_bucket_size) { |
| // Set bucket size equal to the 0 |
| BUCKET_SIZE = 0; |
| } else { |
| // Generate a bucket size greater than or equal to number of instances |
| std::uniform_int_distribution<> uni_dis_granularity(NUM_PROFILES, |
| NUM_PROFILES * 2); |
| BUCKET_SIZE = uni_dis_granularity(gen); |
| } |
| |
| VLOG(1) << "Number of profiles :" << NUM_PROFILES << endl; |
| VLOG(1) << "Bucket size :" << BUCKET_SIZE << endl; |
| |
| auto s1 = ScopedFlagSetter<bool>::Make(&FLAGS_gen_experimental_profile, true); |
| auto s2 = ScopedFlagSetter<uint64>::Make( |
| &FLAGS_json_profile_event_timestamp_limit, BUCKET_SIZE); |
| |
| ObjectPool pool; |
| // Profiles containing a complete event sequence |
| RuntimeProfile* profiles[NUM_PROFILES]; |
| // Create EventSequences |
| RuntimeProfile::EventSequence* seqs[NUM_PROFILES]; |
| |
| for (int i = 0; i < NUM_PROFILES; ++i) { |
| profiles[i] = RuntimeProfile::Create(&pool, strings::Substitute("Instance $0", |
| i + 1)); |
| seqs[i] = profiles[i]->AddEventSequence(NODE_LIFECYCLE_EVENT_TIMELINE); |
| // Begin timer for the event sequence |
| seqs[i]->Start(); |
| } |
| |
| int64_t dummy_event_duration = 50; |
| |
| // Simulate parallel execution of events across instances |
| if (events_completeness >= 1.0f) { |
| EVENT_COMPLETENESS = true; |
| // Add complete set of events to the event sequence |
| for (const string& event : NODE_LIFECYCLE_EVENT_LABELS) { |
| SleepForMs(dummy_event_duration); |
| for (int i = 0; i < NUM_PROFILES; ++i) { |
| seqs[i]->MarkEvent(event); |
| } |
| } |
| } else { |
| EVENT_COMPLETENESS = false; |
| // Add partial set of events to the event sequence |
| std::bernoulli_distribution ber_dis_event_probability(events_completeness); |
| for (const string& event : NODE_LIFECYCLE_EVENT_LABELS) { |
| SleepForMs(dummy_event_duration); |
| for (int i = 0; i < NUM_PROFILES - 1; ++i) { |
| if (ber_dis_event_probability(gen)){ |
| seqs[i]->MarkEvent(event); |
| } else { |
| missing_event_instances_expected.insert(i); |
| } |
| } |
| seqs[NUM_PROFILES - 1]->MarkEvent(event); |
| } |
| } |
| |
| // Retry until maximum setup retries |
| if (!EVENT_COMPLETENESS && missing_event_instances_expected.size() <= 0 |
| && setup_retries < MAX_SETUP_RETRIES) { |
| ++setup_retries; |
| SetUp(events_completeness, timestamp_aggregation, is_zero_bucket_size); |
| return; |
| } else if (setup_retries >= MAX_SETUP_RETRIES) { |
| ASSERT_TRUE(false) << "SetUp failed to create desired test case after " |
| << MAX_SETUP_RETRIES << "retries."; |
| } |
| |
| AggregatedRuntimeProfile* aggregated_profile = AggregatedRuntimeProfile::Create( |
| &pool, "PlanNode", NUM_PROFILES, true, false); |
| for (int i = 0; i < NUM_PROFILES; ++i) { |
| aggregated_profile->UpdateAggregatedFromInstance(profiles[i], i); |
| } |
| |
| // This test fixture class has been added as a friend within RuntimeProfileBase |
| // AggEventSequence::ToJson() method is invoked and tested through the ToJsonHelper() |
| aggregated_profile->ToJsonHelper(RuntimeProfile::Verbosity::DEFAULT, |
| &plan_node_profile, &doc); |
| |
| // Validate the basic structure of event sequences |
| ASSERT_TRUE(plan_node_profile.HasMember("event_sequences")); |
| ASSERT_EQ(plan_node_profile["event_sequences"].Size(), 1); |
| ASSERT_TRUE(plan_node_profile["event_sequences"][0].HasMember("events")); |
| |
| RecordProperty("NumberOfProfiles", NUM_PROFILES); |
| RecordProperty("BucketSize", BUCKET_SIZE); |
| RecordProperty("EventCompletenessFactor", events_completeness); |
| } |
| |
| void TearDown() override { |
| // Reset the rapidjson document |
| setup_retries = 0; |
| plan_node_profile.RemoveAllMembers(); |
| doc.RemoveAllMembers(); |
| |
| // Clear missing_event_instances |
| missing_event_instances_expected.clear(); |
| } |
| |
| // Structure of event sequence JSON |
| // { |
| // profile_name : <PLAN_NODE_NAME>, |
| // num_children : <NUM_CHILDREN> |
| // node_metadata : <NODE_METADATA_OBJECT> |
| // event_sequences : |
| // [{ |
| // events : // An example event |
| // [{ |
| // label : "Open Started"" |
| // ts_list : [ 2257887941, <other instances' timestamps> ] |
| // // OR |
| // ts_stat : |
| // { |
| // min : [ 2257887941, ...<other divisions' minimum timestamps> ], |
| // max : [ 3257887941, ...<other divisions' maximum timestamps> ], |
| // avg : [ 2757887941, ...<other divisions' average timestamps> ] |
| // count : [ 2, ... <other counts of divisions' no. of instances> ] |
| // } |
| // }, <...other plan node's events> |
| // ], |
| // // This field is only included, if there are missing / unreported events |
| // unreported_event_instance_idxs : [ 3, 5, 0 ] |
| // }], |
| // counters : <COUNTERS_OBJECT_ARRAY>, |
| // child_profiles : <CHILD_PROFILES> |
| // } |
| |
| void Validate() { |
| // When at least one set of complete events are present from one of the instances, |
| // the labels and timestamps of the events are rebuilt with proper ordering and |
| // alignment. |
| |
| // In some rare cases, when all instances contain missing events, it becomes |
| // functionally impossible to identify the order of events across instances. |
| |
| // Please check IMPALA-13555 for further details. |
| |
| rapidjson::Value& events_json = plan_node_profile["event_sequences"][0]["events"]; |
| // Validate the number of recorded events |
| EXPECT_EQ(events_json.Size(), sizeof(NODE_LIFECYCLE_EVENT_LABELS) / |
| sizeof(NODE_LIFECYCLE_EVENT_LABELS[0])); |
| |
| // Validate the order of event labels in the event sequence |
| for (size_t i = 0; i < events_json.Size(); ++i) { |
| EXPECT_STREQ(events_json[i]["label"].GetString(), |
| NODE_LIFECYCLE_EVENT_LABELS[i].c_str()); |
| } |
| |
| // Parse and verify reported instances with missing events |
| std::unordered_set<size_t> missing_event_instances; |
| if (plan_node_profile["event_sequences"][0].HasMember( |
| "unreported_event_instance_idxs")) { |
| rapidjson::Value& missing_events_json = plan_node_profile["event_sequences"][0] |
| ["unreported_event_instance_idxs"]; |
| for (size_t i = 0; i < missing_events_json.Size(); ++i) { |
| missing_event_instances.insert(missing_events_json[i].GetUint64()); |
| EXPECT_EQ(missing_event_instances_expected.count( |
| missing_events_json[i].GetUint64()), 1); |
| } |
| } |
| EXPECT_TRUE(EVENT_COMPLETENESS == missing_event_instances_expected.empty()); |
| EXPECT_EQ(missing_event_instances_expected.size(), missing_event_instances.size()); |
| |
| if (NUM_PROFILES <= BUCKET_SIZE || BUCKET_SIZE == 0) { |
| // Verify the structure of timestamps |
| EXPECT_TRUE(events_json[0].HasMember("ts_list")); |
| rapidjson::Value& ts_list_prev_json = events_json[0]["ts_list"]; |
| for (size_t j = 0; j < ts_list_prev_json.Size(); ++j) { |
| if (ts_list_prev_json[j].GetInt64() == 0) { |
| EXPECT_EQ(missing_event_instances.count(j), 1); |
| } |
| } |
| int64_t ts_cur; |
| for (size_t i = 1; i < events_json.Size(); ++i) { |
| EXPECT_TRUE(events_json[i].HasMember("ts_list")); |
| rapidjson::Value& ts_list_cur_json = events_json[i]["ts_list"]; |
| // Number of instance timestamps across events should be same |
| EXPECT_EQ(ts_list_cur_json.Size(), ts_list_prev_json.Size()); |
| // Each instance must have event timestamps in increasing order |
| for (size_t j = 0; j < ts_list_cur_json.Size(); ++j) { |
| ts_cur = ts_list_cur_json[j].GetInt64(); |
| // Skip -1 s inserted to handle missing event timestamps |
| if (ts_cur == -1) { |
| EXPECT_EQ(missing_event_instances.count(j), 1); |
| } else { |
| EXPECT_GT(ts_cur, ts_list_prev_json[j].GetInt64()); |
| } |
| } |
| ts_list_prev_json = ts_list_cur_json; |
| } |
| } else { |
| // Verify the structure of timestamps |
| for (size_t i = 0; i < events_json.Size(); ++i) { |
| EXPECT_TRUE(events_json[i].HasMember("ts_stat")); |
| VerifyAggregatedTimestamps(events_json[i]["ts_stat"], NUM_PROFILES, |
| BUCKET_SIZE, missing_event_instances.size()); |
| } |
| |
| if (missing_event_instances.size() == 0) { |
| rapidjson::Value& ts_stat_prev_json = events_json[0]["ts_stat"]; |
| for (size_t i = 1; i < events_json.Size(); ++i) { |
| // Each event's timestamp aggregate should be in increasing order |
| rapidjson::Value& ts_stat_cur_json = events_json[i]["ts_stat"]; |
| EXPECT_GT(GetFirstNonZeroValueUint64(ts_stat_cur_json["min"]), |
| GetFirstNonZeroValueUint64(ts_stat_prev_json["min"])); |
| EXPECT_GT(GetFirstNonZeroValueDouble(ts_stat_cur_json["avg"]), |
| GetFirstNonZeroValueDouble(ts_stat_prev_json["avg"])); |
| EXPECT_GT(GetFirstNonZeroValueUint64(ts_stat_cur_json["max"]), |
| GetFirstNonZeroValueUint64(ts_stat_prev_json["max"])); |
| ts_stat_prev_json = ts_stat_cur_json; |
| } |
| } |
| } |
| } |
| |
| static inline void VerifyAggregatedTimestamps(const rapidjson::Value& ts_stat_json, |
| size_t inst_count, size_t bucket_size, size_t missing_events_instance_count = 0) { |
| // Verify the structure of aggregated values |
| EXPECT_TRUE(ts_stat_json.HasMember("min")); |
| EXPECT_TRUE(ts_stat_json.HasMember("max")); |
| EXPECT_TRUE(ts_stat_json.HasMember("avg")); |
| EXPECT_TRUE(ts_stat_json.HasMember("count")); |
| EXPECT_EQ(bucket_size, ts_stat_json["min"].Size()); |
| EXPECT_EQ(bucket_size, ts_stat_json["max"].Size()); |
| EXPECT_EQ(bucket_size, ts_stat_json["avg"].Size()); |
| EXPECT_EQ(bucket_size, ts_stat_json["count"].Size()); |
| uint64_t inst_count_cur = 0; |
| // Validate quantitative relationships between aggregated values |
| for (size_t i = 0; i < bucket_size; ++i) { |
| EXPECT_GE(ts_stat_json["max"][i].GetUint64(), ts_stat_json["avg"][i].GetDouble()); |
| EXPECT_GE(ts_stat_json["avg"][i].GetDouble(), ts_stat_json["min"][i].GetUint64()); |
| inst_count_cur += ts_stat_json["count"][i].GetUint64(); |
| } |
| EXPECT_GE(inst_count_cur, inst_count - missing_events_instance_count); |
| } |
| |
| static inline int64_t GetFirstNonZeroValueUint64(const rapidjson::Value& json_array) { |
| auto it = std::find_if(json_array.GetArray().Begin(), json_array.GetArray().End(), |
| [](const rapidjson::Value& cur_val) { return cur_val.GetInt64() != 0;}); |
| if (it != json_array.GetArray().End()) { |
| return it->GetInt64(); |
| } else { |
| return 0; |
| } |
| } |
| |
| static inline double GetFirstNonZeroValueDouble(const rapidjson::Value& json_array) { |
| auto it = std::find_if(json_array.GetArray().Begin(), json_array.GetArray().End(), |
| [](const rapidjson::Value& cur_val) { return cur_val.GetDouble() != 0;}); |
| if (it != json_array.GetArray().End()) { |
| return it->GetDouble(); |
| } else { |
| return 0; |
| } |
| } |
| }; |
| |
| // Test well-formed event sequences, during generation of JSON profiles |
TEST_F(AggregatedEventSequenceToJsonTest, CompleteEvents_GroupedAndAggregatedCase) {
  // To simulate an aggregated case with a complete set of events:
  // bucket size < number of instances.

  // Run 2 repetitions with randomized numbers of instances and bucket sizes.
  SetUp(1.0f, true);
  Validate();
  TearDown();

  SetUp(1.0f, true);
  Validate();
  // TearDown is implicitly invoked by the gtest fixture.
}
| |
// NOTE(review): "Unggregated" in the test name appears to be a typo for
// "Unaggregated"; renaming would change the test ID used by --gtest_filter,
// so it is only flagged here.
TEST_F(AggregatedEventSequenceToJsonTest, CompleteEvents_GroupedAndUnggregatedCase) {
  // To simulate a grouped case with a complete set of events:
  // number of instances <= bucket size, OR bucket size == 0.

  // Run 3 repetitions with randomized numbers of instances and bucket sizes.
  SetUp(1.0f, false);
  Validate();
  TearDown();

  SetUp(1.0f, false);
  Validate();
  TearDown();

  // The third repetition exercises the special bucket size of 0.
  SetUp(1.0f, false, true);
  Validate();
  // TearDown is implicitly invoked by the gtest fixture.
}
| |
| |
TEST_F(AggregatedEventSequenceToJsonTest, MissingEvents_GroupedAndAggregatedCase) {
  // To simulate an aggregated case with possible missing events:
  // bucket size < number of instances.

  // Run 2 repetitions with randomized sizes and different completeness factors.
  SetUp(0.5f, true);
  Validate();
  TearDown();

  SetUp(0.8f, true);
  Validate();
  // TearDown is implicitly invoked by the gtest fixture.
}
| |
TEST_F(AggregatedEventSequenceToJsonTest, MissingEvents_GroupedAndUnaggregatedCase) {
  // To simulate a grouped case with possible missing events:
  // number of instances <= bucket size, OR bucket size == 0.

  // Run 3 repetitions with randomized sizes and different completeness factors.
  SetUp(0.5f, false);
  Validate();
  TearDown();

  SetUp(0.8f, false);
  Validate();
  TearDown();

  // The third repetition exercises the special bucket size of 0.
  SetUp(0.5f, false, true);
  Validate();
  // TearDown is implicitly invoked by the gtest fixture.
}
| |
| } // namespace impala |
| |