| /*------------------------------------------------------------------------- |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| * |
| * ProtoUtils.cpp |
| * |
| * IDENTIFICATION |
| * gpcontrib/gp_stats_collector/src/ProtoUtils.cpp |
| * |
| *------------------------------------------------------------------------- |
| */ |
| |
| #include "ProtoUtils.h" |
| #include "PgUtils.h" |
| #include "ProcStats.h" |
| #include "Config.h" |
| #include "memory/gpdbwrappers.h" |
| |
| #define typeid __typeid |
| #define operator __operator |
| extern "C" { |
| #include "postgres.h" |
| #include "access/hash.h" |
| #include "access/xact.h" |
| #include "cdb/cdbinterconnect.h" |
| #include "cdb/cdbvars.h" |
| #include "cdb/ml_ipc.h" |
| #ifdef IC_TEARDOWN_HOOK |
| #include "cdb/ic_udpifc.h" |
| #endif |
| #include "utils/workfile_mgr.h" |
| } |
| #undef typeid |
| #undef operator |
| |
| #include <ctime> |
| #include <string> |
| |
| extern void gp_gettmid(int32 *); |
| |
| namespace { |
| constexpr uint8_t UTF8_CONTINUATION_BYTE_MASK = (1 << 7) | (1 << 6); |
| constexpr uint8_t UTF8_CONTINUATION_BYTE = (1 << 7); |
| constexpr uint8_t UTF8_MAX_SYMBOL_BYTES = 4; |
| |
| // Returns true if byte is the starting byte of utf8 |
| // character, false if byte is the continuation (10xxxxxx). |
| inline bool utf8_start_byte(uint8_t byte) { |
| return (byte & UTF8_CONTINUATION_BYTE_MASK) != UTF8_CONTINUATION_BYTE; |
| } |
| } // namespace |
| |
| google::protobuf::Timestamp current_ts() { |
| google::protobuf::Timestamp current_ts; |
| struct timeval tv; |
| gettimeofday(&tv, nullptr); |
| current_ts.set_seconds(tv.tv_sec); |
| current_ts.set_nanos(static_cast<int32_t>(tv.tv_usec * 1000)); |
| return current_ts; |
| } |
| |
| void set_query_key(gpsc::QueryKey *key) { |
| key->set_ccnt(gp_command_count); |
| key->set_ssid(gp_session_id); |
| int32 tmid = 0; |
| gp_gettmid(&tmid); |
| key->set_tmid(tmid); |
| } |
| |
| void set_segment_key(gpsc::SegmentKey *key) { |
| key->set_dbid(GpIdentity.dbid); |
| key->set_segindex(GpIdentity.segindex); |
| } |
| |
| std::string trim_str_shrink_utf8(const char *str, size_t len, size_t lim) { |
| if (unlikely(str == nullptr)) { |
| return std::string(); |
| } |
| if (likely(len <= lim || GetDatabaseEncoding() != PG_UTF8)) { |
| return std::string(str, std::min(len, lim)); |
| } |
| |
| // Handle trimming of utf8 correctly, do not cut multi-byte characters. |
| size_t cut_pos = lim; |
| size_t visited_bytes = 1; |
| while (visited_bytes < UTF8_MAX_SYMBOL_BYTES && cut_pos > 0) { |
| if (utf8_start_byte(static_cast<uint8_t>(str[cut_pos]))) { |
| break; |
| } |
| ++visited_bytes; |
| --cut_pos; |
| } |
| |
| return std::string(str, cut_pos); |
| } |
| |
| void set_query_plan(gpsc::SetQueryReq *req, QueryDesc *query_desc, |
| const Config &config) { |
| if (Gp_role == GP_ROLE_DISPATCH && query_desc->plannedstmt) { |
| auto qi = req->mutable_query_info(); |
| qi->set_generator(query_desc->plannedstmt->planGen == PLANGEN_OPTIMIZER |
| ? gpsc::PlanGenerator::PLAN_GENERATOR_OPTIMIZER |
| : gpsc::PlanGenerator::PLAN_GENERATOR_PLANNER); |
| MemoryContext oldcxt = |
| gpdb::mem_ctx_switch_to(query_desc->estate->es_query_cxt); |
| ExplainState es = gpdb::get_explain_state(query_desc, true); |
| if (es.str) { |
| *qi->mutable_plan_text() = trim_str_shrink_utf8(es.str->data, es.str->len, |
| config.max_plan_size()); |
| StringInfo norm_plan = gpdb::gen_normplan(es.str->data); |
| if (norm_plan) { |
| *qi->mutable_template_plan_text() = trim_str_shrink_utf8( |
| norm_plan->data, norm_plan->len, config.max_plan_size()); |
| qi->set_plan_id( |
| hash_any((unsigned char *)norm_plan->data, norm_plan->len)); |
| gpdb::pfree(norm_plan->data); |
| } |
| qi->set_query_id(query_desc->plannedstmt->queryId); |
| gpdb::pfree(es.str->data); |
| } |
| gpdb::mem_ctx_switch_to(oldcxt); |
| } |
| } |
| |
| void set_query_text(gpsc::SetQueryReq *req, QueryDesc *query_desc, |
| const Config &config) { |
| if (Gp_role == GP_ROLE_DISPATCH && query_desc->sourceText) { |
| auto qi = req->mutable_query_info(); |
| *qi->mutable_query_text() = trim_str_shrink_utf8( |
| query_desc->sourceText, strlen(query_desc->sourceText), |
| config.max_text_size()); |
| char *norm_query = gpdb::gen_normquery(query_desc->sourceText); |
| if (norm_query) { |
| *qi->mutable_template_query_text() = trim_str_shrink_utf8( |
| norm_query, strlen(norm_query), config.max_text_size()); |
| gpdb::pfree(norm_query); |
| } |
| } |
| } |
| |
| void clear_big_fields(gpsc::SetQueryReq *req) { |
| if (Gp_role == GP_ROLE_DISPATCH) { |
| auto qi = req->mutable_query_info(); |
| qi->clear_plan_text(); |
| qi->clear_template_plan_text(); |
| qi->clear_query_text(); |
| qi->clear_template_query_text(); |
| qi->clear_analyze_text(); |
| } |
| } |
| |
| void set_query_info(gpsc::SetQueryReq *req) { |
| if (Gp_role == GP_ROLE_DISPATCH) { |
| auto qi = req->mutable_query_info(); |
| qi->set_username(get_user_name()); |
| if (IsTransactionState()) |
| qi->set_databasename(get_db_name()); |
| qi->set_rsgname(get_rg_name()); |
| } |
| } |
| |
| void set_qi_nesting_level(gpsc::SetQueryReq *req, int nesting_level) { |
| auto aqi = req->mutable_add_info(); |
| aqi->set_nested_level(nesting_level); |
| } |
| |
| void set_qi_slice_id(gpsc::SetQueryReq *req) { |
| auto aqi = req->mutable_add_info(); |
| aqi->set_slice_id(currentSliceId); |
| } |
| |
| void set_qi_error_message(gpsc::SetQueryReq *req, const char *err_msg, |
| const Config &config) { |
| auto aqi = req->mutable_add_info(); |
| *aqi->mutable_error_message() = |
| trim_str_shrink_utf8(err_msg, strlen(err_msg), config.max_text_size()); |
| } |
| |
| void set_metric_instrumentation(gpsc::MetricInstrumentation *metrics, |
| QueryDesc *query_desc, int nested_calls, |
| double nested_time) { |
| auto instrument = query_desc->planstate->instrument; |
| if (instrument) { |
| metrics->set_ntuples(instrument->ntuples); |
| metrics->set_nloops(instrument->nloops); |
| metrics->set_tuplecount(instrument->tuplecount); |
| metrics->set_firsttuple(instrument->firsttuple); |
| metrics->set_startup(instrument->startup); |
| metrics->set_total(instrument->total); |
| auto &buffusage = instrument->bufusage; |
| metrics->set_shared_blks_hit(buffusage.shared_blks_hit); |
| metrics->set_shared_blks_read(buffusage.shared_blks_read); |
| metrics->set_shared_blks_dirtied(buffusage.shared_blks_dirtied); |
| metrics->set_shared_blks_written(buffusage.shared_blks_written); |
| metrics->set_local_blks_hit(buffusage.local_blks_hit); |
| metrics->set_local_blks_read(buffusage.local_blks_read); |
| metrics->set_local_blks_dirtied(buffusage.local_blks_dirtied); |
| metrics->set_local_blks_written(buffusage.local_blks_written); |
| metrics->set_temp_blks_read(buffusage.temp_blks_read); |
| metrics->set_temp_blks_written(buffusage.temp_blks_written); |
| metrics->set_blk_read_time(INSTR_TIME_GET_DOUBLE(buffusage.blk_read_time)); |
| metrics->set_blk_write_time( |
| INSTR_TIME_GET_DOUBLE(buffusage.blk_write_time)); |
| } |
| if (query_desc->estate && query_desc->estate->motionlayer_context) { |
| MotionLayerState *mlstate = |
| (MotionLayerState *)query_desc->estate->motionlayer_context; |
| metrics->mutable_sent()->set_total_bytes(mlstate->stat_total_bytes_sent); |
| metrics->mutable_sent()->set_tuple_bytes(mlstate->stat_tuple_bytes_sent); |
| metrics->mutable_sent()->set_chunks(mlstate->stat_total_chunks_sent); |
| metrics->mutable_received()->set_total_bytes( |
| mlstate->stat_total_bytes_recvd); |
| metrics->mutable_received()->set_tuple_bytes( |
| mlstate->stat_tuple_bytes_recvd); |
| metrics->mutable_received()->set_chunks(mlstate->stat_total_chunks_recvd); |
| } |
| metrics->set_inherited_calls(nested_calls); |
| metrics->set_inherited_time(nested_time); |
| } |
| |
| void set_gp_metrics(gpsc::GPMetrics *metrics, QueryDesc *query_desc, |
| int nested_calls, double nested_time) { |
| if (query_desc->planstate && query_desc->planstate->instrument) { |
| set_metric_instrumentation(metrics->mutable_instrumentation(), query_desc, |
| nested_calls, nested_time); |
| } |
| fill_self_stats(metrics->mutable_systemstat()); |
| metrics->mutable_systemstat()->set_runningtimeseconds( |
| time(NULL) - metrics->mutable_systemstat()->runningtimeseconds()); |
| metrics->mutable_spill()->set_filecount( |
| WorkfileTotalFilesCreated() - metrics->mutable_spill()->filecount()); |
| metrics->mutable_spill()->set_totalbytes( |
| WorkfileTotalBytesWritten() - metrics->mutable_spill()->totalbytes()); |
| } |
| |
| #define UPDATE_IC_STATS(proto_name, stat_name) \ |
| metrics->mutable_interconnect()->set_##proto_name( \ |
| ic_statistics->stat_name - \ |
| metrics->mutable_interconnect()->proto_name()); \ |
| Assert(metrics->mutable_interconnect()->proto_name() >= 0 && \ |
| metrics->mutable_interconnect()->proto_name() <= \ |
| ic_statistics->stat_name) |
| |
| void set_ic_stats(gpsc::MetricInstrumentation *metrics, |
| const ICStatistics *ic_statistics) { |
| #ifdef IC_TEARDOWN_HOOK |
| UPDATE_IC_STATS(total_recv_queue_size, totalRecvQueueSize); |
| UPDATE_IC_STATS(recv_queue_size_counting_time, recvQueueSizeCountingTime); |
| UPDATE_IC_STATS(total_capacity, totalCapacity); |
| UPDATE_IC_STATS(capacity_counting_time, capacityCountingTime); |
| UPDATE_IC_STATS(total_buffers, totalBuffers); |
| UPDATE_IC_STATS(buffer_counting_time, bufferCountingTime); |
| UPDATE_IC_STATS(active_connections_num, activeConnectionsNum); |
| UPDATE_IC_STATS(retransmits, retransmits); |
| UPDATE_IC_STATS(startup_cached_pkt_num, startupCachedPktNum); |
| UPDATE_IC_STATS(mismatch_num, mismatchNum); |
| UPDATE_IC_STATS(crc_errors, crcErrors); |
| UPDATE_IC_STATS(snd_pkt_num, sndPktNum); |
| UPDATE_IC_STATS(recv_pkt_num, recvPktNum); |
| UPDATE_IC_STATS(disordered_pkt_num, disorderedPktNum); |
| UPDATE_IC_STATS(duplicated_pkt_num, duplicatedPktNum); |
| UPDATE_IC_STATS(recv_ack_num, recvAckNum); |
| UPDATE_IC_STATS(status_query_msg_num, statusQueryMsgNum); |
| #endif |
| } |
| |
| gpsc::SetQueryReq create_query_req(gpsc::QueryStatus status) { |
| gpsc::SetQueryReq req; |
| req.set_query_status(status); |
| *req.mutable_datetime() = current_ts(); |
| set_query_key(req.mutable_query_key()); |
| set_segment_key(req.mutable_segment_key()); |
| return req; |
| } |
| |
| double protots_to_double(const google::protobuf::Timestamp &ts) { |
| return double(ts.seconds()) + double(ts.nanos()) / 1000000000.0; |
| } |
| |
| void set_analyze_plan_text(QueryDesc *query_desc, gpsc::SetQueryReq *req, |
| const Config &config) { |
| // Make sure it is a valid txn and it is not an utility |
| // statement for ExplainPrintPlan() later. |
| if (!IsTransactionState() || !query_desc->plannedstmt) { |
| return; |
| } |
| MemoryContext oldcxt = |
| gpdb::mem_ctx_switch_to(query_desc->estate->es_query_cxt); |
| ExplainState es = gpdb::get_analyze_state( |
| query_desc, query_desc->instrument_options && config.enable_analyze()); |
| gpdb::mem_ctx_switch_to(oldcxt); |
| if (es.str) { |
| // Remove last line break. |
| if (es.str->len > 0 && es.str->data[es.str->len - 1] == '\n') { |
| es.str->data[--es.str->len] = '\0'; |
| } |
| auto trimmed_analyze = |
| trim_str_shrink_utf8(es.str->data, es.str->len, config.max_plan_size()); |
| req->mutable_query_info()->set_analyze_text(trimmed_analyze); |
| gpdb::pfree(es.str->data); |
| } |
| } |