blob: b462241105523593007f5e7ff28a393b1149cf4a [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "otel.h"
#include <chrono>
#include <functional>
#include <memory>
#include <regex>
#include <string>
#include <string_view>
#include <utility>
#include <boost/algorithm/string/case_conv.hpp>
#include <boost/algorithm/string/predicate.hpp>
#include <boost/algorithm/string/trim.hpp>
#include <gflags/gflags_declare.h>
#include <glog/logging.h>
#include <gutil/strings/split.h>
#include <opentelemetry/exporters/otlp/otlp_file_exporter.h>
#include <opentelemetry/exporters/otlp/otlp_file_exporter_factory.h>
#include <opentelemetry/exporters/otlp/otlp_file_exporter_options.h>
#include <opentelemetry/exporters/otlp/otlp_http.h>
#include <opentelemetry/exporters/otlp/otlp_http_exporter.h>
#include <opentelemetry/exporters/otlp/otlp_http_exporter_factory.h>
#include <opentelemetry/exporters/otlp/otlp_http_exporter_options.h>
#include <opentelemetry/exporters/otlp/otlp_http_exporter_runtime_options.h>
#include <opentelemetry/nostd/shared_ptr.h>
#include <opentelemetry/sdk/common/global_log_handler.h>
#include <opentelemetry/sdk/resource/resource.h>
#include <opentelemetry/sdk/trace/batch_span_processor.h>
#include <opentelemetry/sdk/trace/batch_span_processor_factory.h>
#include <opentelemetry/sdk/trace/batch_span_processor_options.h>
#include <opentelemetry/sdk/trace/batch_span_processor_runtime_options.h>
#include <opentelemetry/sdk/trace/exporter.h>
#include <opentelemetry/sdk/trace/processor.h>
#include <opentelemetry/sdk/trace/tracer_provider.h>
#include <opentelemetry/sdk/trace/tracer_provider_factory.h>
#include <opentelemetry/sdk/trace/simple_processor.h>
#include <opentelemetry/trace/provider.h>
#include <opentelemetry/trace/tracer.h>
#include <opentelemetry/version.h>
#include "common/compiler-util.h"
#include "common/status.h"
#include "common/version.h"
#include "gen-cpp/Query_types.h"
#include "observe/otel-log-handler.h"
#include "observe/span-manager.h"
#include "service/client-request-state.h"
using namespace boost::algorithm;
using namespace opentelemetry;
using namespace opentelemetry::exporter::otlp;
using namespace opentelemetry::sdk::common::internal_log;
using namespace opentelemetry::sdk::trace;
using namespace std;
// OTel related flags
DECLARE_string(otel_trace_additional_headers);
DECLARE_int32(otel_trace_batch_queue_size);
DECLARE_int32(otel_trace_batch_max_batch_size);
DECLARE_int32(otel_trace_batch_schedule_delay_ms);
DECLARE_bool(otel_trace_beeswax);
DECLARE_string(otel_trace_ca_cert_path);
DECLARE_string(otel_trace_ca_cert_string);
DECLARE_string(otel_trace_collector_url);
DECLARE_bool(otel_trace_compression);
DECLARE_bool(otel_debug);
DECLARE_string(otel_trace_exporter);
DECLARE_string(otel_file_pattern);
DECLARE_string(otel_file_alias_pattern);
DECLARE_int32(otel_file_flush_interval_ms);
DECLARE_int32(otel_file_flush_count);
DECLARE_int32(otel_file_max_file_size);
DECLARE_int32(otel_file_max_file_count);
DECLARE_double(otel_trace_retry_policy_backoff_multiplier);
DECLARE_double(otel_trace_retry_policy_initial_backoff_s);
DECLARE_int32(otel_trace_retry_policy_max_attempts);
DECLARE_int32(otel_trace_retry_policy_max_backoff_s);
DECLARE_string(otel_trace_span_processor);
DECLARE_string(otel_trace_ssl_ciphers);
DECLARE_int32(otel_trace_timeout_s);
DECLARE_string(otel_trace_tls_cipher_suites);
DECLARE_bool(otel_trace_tls_insecure_skip_verify);
DECLARE_string(otel_trace_tls_minimum_version);
// Other flags
DECLARE_string(ssl_cipher_list);
DECLARE_string(tls_ciphersuites);
DECLARE_string(ssl_minimum_version);
// Constants
static const string SCOPE_SPAN_NAME = "org.apache.impala.impalad.query";
static const regex query_newline(
"(select|alter|compute|create|delete|drop|insert|invalidate|update|with)\\s*"
"(\n|\\s*\\\\*\\/)", regex::icase | regex::optimize | regex::nosubs);
// Holds the custom log handler for OpenTelemetry internal logs.
static nostd::shared_ptr<LogHandler> otel_log_handler_;
// Lambda function to check if SQL starts with relevant keywords for tracing
static const function<bool(std::string_view)> is_traceable_sql =
[](std::string_view sql_str) -> bool {
return
LIKELY(boost::algorithm::istarts_with(sql_str, "select ")
|| boost::algorithm::istarts_with(sql_str, "alter ")
|| boost::algorithm::istarts_with(sql_str, "compute ")
|| boost::algorithm::istarts_with(sql_str, "create ")
|| boost::algorithm::istarts_with(sql_str, "delete ")
|| boost::algorithm::istarts_with(sql_str, "drop ")
|| boost::algorithm::istarts_with(sql_str, "insert ")
|| boost::algorithm::istarts_with(sql_str, "invalidate ")
|| boost::algorithm::istarts_with(sql_str, "update ")
|| boost::algorithm::istarts_with(sql_str, "with "))
|| regex_search(sql_str.cbegin(), sql_str.cend(), query_newline);
};
namespace impala {
// TraceProvider is a singleton that provides access to the OpenTelemetry TracerProvider.
// Not shared globally via opentelemetry::trace::Provider::SetTracerProvider to enforce
// all tracing to go through the Impala-specific code interfaces to OpenTelemetry tracing.
static unique_ptr<trace::TracerProvider> provider_;
// Returns true if any TLS configuration flags are set for the OTel exporter.
static inline bool otel_tls_enabled() {
return boost::algorithm::istarts_with(FLAGS_otel_trace_collector_url, "https://");
} // function otel_tls_enabled
bool should_otel_trace_query(std::string_view sql,
const TSessionType::type& session_type) {
if (LIKELY(!FLAGS_otel_trace_beeswax) && session_type == TSessionType::BEESWAX) {
return false;
}
if (LIKELY(is_traceable_sql(sql))) {
return true;
}
// Loop until all leading comments and whitespace are skipped.
while (true) {
if (boost::algorithm::istarts_with(sql, "/*")) {
// Handle leading inline comments
size_t end_comment = sql.find("*/");
if (end_comment != string_view::npos) {
sql = sql.substr(end_comment + 2);
continue;
}
} else if (boost::algorithm::istarts_with(sql, "--")) {
// Handle leading comment lines
size_t end_comment = sql.find("\n");
if (end_comment != string_view::npos) {
sql = sql.substr(end_comment + 1);
continue;
}
} else if (UNLIKELY(boost::algorithm::istarts_with(sql, " "))) {
// Handle leading whitespace. Since Impala removes leading whitespace from the SQL
// statement, this case only happens if the sql statement starts with inline
// comments or there is a leading space on the first non-comment line.
size_t end_comment = sql.find_first_not_of(" ");
if (end_comment != string_view::npos) {
sql = sql.substr(end_comment);
continue;
}
} else if (boost::algorithm::istarts_with(sql, "\n")) {
// Handline newlines after inline comments.
size_t end_comment = sql.find_first_not_of("\n");
if (end_comment != string_view::npos) {
sql = sql.substr(end_comment);
continue;
}
}
// Check if the SQL statement starts with any of the keywords we want to trace
if (LIKELY(is_traceable_sql(sql))) {
return true;
}
// No more patterns to check
break;
}
return false;
} // function should_otel_trace_query
// Initializes an OtlpHttpExporter instance with configuration from global flags. The
// OtlpHttpExporter instance implements the SpanExporter interface. The function parameter
// `exporter` is an in-out parameter that will be populated with the created
// OtlpHttpExporter instance. Returns Status::OK() on success, or an error Status if
// configuration fails.
static Status init_exporter_http(unique_ptr<SpanExporter>& exporter) {
// Configure OTLP HTTP exporter
OtlpHttpExporterOptions opts;
opts.url = FLAGS_otel_trace_collector_url;
opts.content_type = HttpRequestContentType::kJson;
opts.timeout = chrono::seconds(FLAGS_otel_trace_timeout_s);
opts.console_debug = FLAGS_otel_debug;
// Retry settings
opts.retry_policy_max_attempts = FLAGS_otel_trace_retry_policy_max_attempts;
opts.retry_policy_initial_backoff =
chrono::duration<float>(FLAGS_otel_trace_retry_policy_initial_backoff_s);
if (FLAGS_otel_trace_retry_policy_max_backoff_s > 0) {
opts.retry_policy_max_backoff = chrono::duration<float>(
chrono::seconds(FLAGS_otel_trace_retry_policy_max_backoff_s));
}
opts.retry_policy_backoff_multiplier = FLAGS_otel_trace_retry_policy_backoff_multiplier;
// Compression Type
if (FLAGS_otel_trace_compression) {
opts.compression = "zlib";
}
// TLS Configurations
if (otel_tls_enabled()) {
if (FLAGS_otel_trace_tls_minimum_version.empty()) {
// Set minimum TLS version to the value of the global ssl_minimum_version flag.
// Since this flag is in the format "tlv1.2" or "tlsv1.3", we need to
// convert it to the format expected by OtlpHttpExporterOptions.
if (!FLAGS_ssl_minimum_version.empty()) {
opts.ssl_min_tls = FLAGS_ssl_minimum_version.substr(4); // Remove "tlsv" prefix
}
} else {
opts.ssl_min_tls = FLAGS_otel_trace_tls_minimum_version;
}
opts.ssl_insecure_skip_verify = FLAGS_otel_trace_tls_insecure_skip_verify;
opts.ssl_ca_cert_path = FLAGS_otel_trace_ca_cert_path;
opts.ssl_ca_cert_string = FLAGS_otel_trace_ca_cert_string;
opts.ssl_max_tls = "1.3";
opts.ssl_cipher = FLAGS_otel_trace_ssl_ciphers.empty() ? FLAGS_ssl_cipher_list :
FLAGS_otel_trace_ssl_ciphers;
opts.ssl_cipher_suite = FLAGS_otel_trace_tls_cipher_suites.empty() ?
FLAGS_tls_ciphersuites : FLAGS_otel_trace_tls_cipher_suites;
}
// Additional HTTP headers
if (!FLAGS_otel_trace_additional_headers.empty()) {
for (auto header : strings::Split(FLAGS_otel_trace_additional_headers, ":::")) {
auto pos = header.find('=');
const string key = trim_copy(header.substr(0, pos).as_string());
const string value = trim_copy(header.substr(pos + 1).as_string());
VLOG(2) << "Adding additional OTel header: " << key << " = " << value;
opts.http_headers.emplace(key, value);
}
}
exporter = OtlpHttpExporterFactory::Create(opts);
return Status::OK();
} // function init_exporter_http
// Initializes an OtlpFileExporter instance with configuration from global flags. The
// OtlpFileExporter instance implements the SpanExporter interface. Returns a unique_ptr
// which will always be initialized with the created OtlpHttpExporter instance.
//
// The file exporter is for test use only.
static unique_ptr<SpanExporter> init_exporter_file() {
OtlpFileClientFileSystemOptions file_client_opts;
file_client_opts.file_pattern = FLAGS_otel_file_pattern;
file_client_opts.alias_pattern = FLAGS_otel_file_alias_pattern;
file_client_opts.flush_interval = chrono::microseconds(chrono::milliseconds(
FLAGS_otel_file_flush_interval_ms));
file_client_opts.flush_count = FLAGS_otel_file_flush_count;
file_client_opts.file_size = FLAGS_otel_file_max_file_size;
file_client_opts.rotate_size = FLAGS_otel_file_max_file_count;
OtlpFileExporterOptions exporter_opts;
exporter_opts.backend_options = file_client_opts;
exporter_opts.console_debug = FLAGS_otel_debug;
return OtlpFileExporterFactory::Create(exporter_opts);
} // function init_exporter_file
// Initializes the OpenTelemetry Tracer singleton with the configuration defined in the
// coordinator startup flags. Returns Status::OK() on success, or an error Status if
// configuration fails.
Status init_otel_tracer() {
LOG(INFO) << "Initializing OpenTelemetry tracing.";
VLOG(2) << "OpenTelemetry version: " << OPENTELEMETRY_VERSION;
VLOG(2) << "OpenTelemetry ABI version: " << OPENTELEMETRY_ABI_VERSION;
VLOG(2) << "OpenTelemetry namespace: "
<< OPENTELEMETRY_STRINGIFY(OPENTELEMETRY_NAMESPACE);
otel_log_handler_ = nostd::shared_ptr<LogHandler>(new OtelLogHandler());
GlobalLogHandler::SetLogHandler(otel_log_handler_);
// Set the OpenTelemetry SDK internal log level based on the current glog level. The SDK
// does not support changing the log level once a Provider has been created.
if (FLAGS_otel_debug) {
GlobalLogHandler::SetLogLevel(LogLevel::Debug);
} else if (VLOG_IS_ON(1)) {
GlobalLogHandler::SetLogLevel(LogLevel::Info);
} else {
GlobalLogHandler::SetLogLevel(LogLevel::None);
}
unique_ptr<SpanExporter> exporter;
if(FLAGS_otel_trace_exporter == OTEL_EXPORTER_OTLP_HTTP) {
RETURN_IF_ERROR(init_exporter_http(exporter));
} else {
exporter = init_exporter_file();
}
VLOG(2) << "OpenTelemetry exporter: " << FLAGS_otel_trace_exporter;
// Set up tracer provider
unique_ptr<SpanProcessor> processor;
if (boost::iequals(trim_copy(FLAGS_otel_trace_span_processor), SPAN_PROCESSOR_BATCH)) {
VLOG(2) << "Using BatchSpanProcessor for OpenTelemetry spans";
BatchSpanProcessorOptions batch_opts;
batch_opts.max_queue_size = FLAGS_otel_trace_batch_queue_size;
batch_opts.max_export_batch_size = FLAGS_otel_trace_batch_max_batch_size;
batch_opts.schedule_delay_millis =
chrono::milliseconds(FLAGS_otel_trace_batch_schedule_delay_ms);
processor = BatchSpanProcessorFactory::Create(move(exporter), batch_opts);
} else {
VLOG(2) << "Using SimpleSpanProcessor for OTel spans";
LOG(WARNING) << "Setting --otel_trace_span_processor=simple blocks the query "
"processing thread while exporting spans to the OTel collector. This will cause "
"significant performance degradation and is not recommended for production use.";
processor = make_unique<SimpleSpanProcessor>(move(exporter));
}
provider_ = TracerProviderFactory::Create(move(processor),
sdk::resource::Resource::Create({
{"service.name", "Impala"},
{"service.version", GetDaemonBuildVersion()}
}));
return Status::OK();
} // function init_otel_tracer
void shutdown_otel_tracer() {
LOG(INFO) << "Shutting down OpenTelemetry tracing.";
DCHECK(provider_) << "OpenTelemetry tracer was not initialized.";
// Force a reset of the provider_ shared_ptr to ensure that the
// TracerProvider destructor is called, which will flush any remaining spans.
provider_.reset();
}
shared_ptr<SpanManager> build_span_manager(ClientRequestState* crs) {
DCHECK(provider_) << "OpenTelemetry tracer was not initialized.";
return make_shared<SpanManager>(
provider_->GetTracer(SCOPE_SPAN_NAME, SCOPE_SPAN_SPEC_VERSION), crs);
} // function build_span_manager
namespace test {
bool otel_tls_enabled_for_testing() {
return otel_tls_enabled();
}
} // namespace test
} // namespace impala