| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "common/init.h" |
| |
#include <cerrno>
#include <csignal>
#include <regex>
#include <boost/filesystem.hpp>
#include <third_party/lss/linux_syscall_support.h>
| |
| #include "common/global-flags.h" |
| #include "common/logging.h" |
| #include "common/status.h" |
| #include "exec/kudu/kudu-util.h" |
| #include "exprs/scalar-expr-evaluator.h" |
| #include "exprs/string-functions.h" |
| #include "exprs/timezone_db.h" |
| #include "gutil/atomicops.h" |
| #include "gutil/strings/substitute.h" |
| #include "rpc/authentication.h" |
| #include "rpc/thrift-util.h" |
| #include "runtime/bufferpool/buffer-pool.h" |
| #include "runtime/datetime-simple-date-format-parser.h" |
| #include "runtime/exec-env.h" |
| #include "runtime/hdfs-fs-cache.h" |
| #include "runtime/lib-cache.h" |
| #include "runtime/mem-tracker.h" |
| #include "service/impala-server.h" |
| #include "kudu/util/debug-util.h" |
| #include "util/cgroup-util.h" |
| #include "util/cpu-info.h" |
| #include "util/debug-util.h" |
| #include "util/disk-info.h" |
| #include "util/filesystem-util.h" |
| #include "util/jni-util.h" |
| #include "util/logging-support.h" |
| #include "util/malloc-util.h" |
| #include "util/mem-info.h" |
| #include "util/memory-metrics.h" |
| #include "util/minidump.h" |
| #include "util/network-util.h" |
| #include "util/openssl-util.h" |
| #include "util/os-info.h" |
| #include "util/os-util.h" |
| #include "util/parse-util.h" |
| #include "util/periodic-counter-updater.h" |
| #include "util/pretty-printer.h" |
| #include "util/redactor.h" |
| #include "util/test-info.h" |
| #include "util/thread.h" |
| #include "util/time.h" |
| #include "util/zip-util.h" |
| |
| #include "common/names.h" |
| |
| using namespace impala; |
| namespace filesystem = boost::filesystem; |
| |
| DECLARE_bool(enable_process_lifetime_heap_profiling); |
| DECLARE_string(heap_profile_dir); |
| DECLARE_string(hostname); |
| DECLARE_bool(use_resolved_hostname); |
| // TODO: rename this to be more generic when we have a good CM release to do so. |
| DECLARE_int32(logbufsecs); |
| DECLARE_int32(max_log_files); |
| DECLARE_int32(max_minidumps); |
| DECLARE_string(redaction_rules_file); |
| DECLARE_bool(redirect_stdout_stderr); |
| DECLARE_string(re2_mem_limit); |
| DECLARE_string(reserved_words_version); |
| DECLARE_bool(symbolize_stacktrace); |
| DECLARE_string(debug_actions); |
| DECLARE_int64(thrift_rpc_max_message_size); |
| DECLARE_int64(thrift_external_rpc_max_message_size); |
| DECLARE_double(hms_event_polling_interval_s); |
| DECLARE_bool(catalogd_ha_reset_metadata_on_failover); |
| DECLARE_bool(enable_insert_events); |
| DECLARE_bool(enable_reload_events); |
| |
| DEFINE_int32(memory_maintenance_sleep_time_ms, 10000, "Sleep time in milliseconds " |
| "between memory maintenance iterations"); |
| |
| DEFINE_int64(pause_monitor_sleep_time_ms, 500, "Sleep time in milliseconds for " |
| "pause monitor thread."); |
| |
| DEFINE_int64(pause_monitor_warn_threshold_ms, 10000, "If the pause monitor sleeps " |
| "more than this time period, a warning is logged. If set to 0 or less, pause monitor" |
| " is disabled."); |
| |
| DEFINE_string(local_library_dir, "/tmp", |
| "Scratch space for local fs operations. Currently used for copying " |
| "UDF binaries locally from HDFS and also for initializing the timezone db"); |
| |
| DEFINE_bool(jvm_automatic_add_opens, true, |
| "Adds necessary --add-opens options for core Java modules necessary to correctly " |
| "calculate catalog metadata cache object sizes."); |
| |
| // Defined by glog. This allows users to specify the log level using a glob. For |
| // example -vmodule=*scanner*=3 would enable full logging for scanners. If redaction |
| // is enabled, this option won't be allowed because some logging dumps table data |
| // in ways the authors of redaction rules can't anticipate. |
| DECLARE_string(vmodule); |
| |
| using std::string; |
| |
| // Log maintenance thread that runs periodically. It flushes glog every logbufsecs sec. |
| // glog only automatically flushes the log file if logbufsecs has passed since the |
| // previous flush when a new log is written. That means that on a quiet system, logs |
| // will be buffered indefinitely. It also rotates log files. |
| static unique_ptr<impala::Thread> log_maintenance_thread; |
| |
| // Memory Maintenance thread that runs periodically to free up memory. It does the |
| // following things every memory_maintenance_sleep_time_ms secs: |
| // 1) Releases BufferPool memory that is not currently in use. |
| // 2) Frees excess memory that TCMalloc has left in its pageheap. |
| static unique_ptr<impala::Thread> memory_maintenance_thread; |
| |
| // Shutdown signal handler thread that calls sigwait() on IMPALA_SHUTDOWN_SIGNAL and |
| // initiates a graceful shutdown with a virtually unlimited deadline (one year). |
| static unique_ptr<impala::Thread> shutdown_signal_handler_thread; |
| |
| // A pause monitor thread to monitor process pauses in impala daemons. The thread sleeps |
| // for a short interval of time (THREAD_SLEEP_TIME_MS), wakes up and calculates the actual |
| // time slept. If that exceeds PAUSE_WARN_THRESHOLD_MS, a warning is logged. |
| static unique_ptr<impala::Thread> pause_monitor; |
| |
| // Thread only used in backend tests to implement a test timeout. |
| static unique_ptr<impala::Thread> be_timeout_thread; |
| |
| // Fault injection thread that is spawned if FLAGS_debug_actions has label |
| // 'LOG_MAINTENANCE_STDERR'. |
| static unique_ptr<impala::Thread> log_fault_inject_thread; |
| |
| // Timeout after 2 hours - backend tests should generally run in minutes or tens of |
| // minutes at worst. |
| #if defined(UNDEFINED_SANITIZER_FULL) |
| static const int64_t BE_TEST_TIMEOUT_S = 60L * 60L * 4L; |
| #else |
| static const int64_t BE_TEST_TIMEOUT_S = 60L * 60L * 2L; |
| #endif |
| |
| #ifdef CODE_COVERAGE_ENABLED |
| extern "C" { void __gcov_flush(); } |
| #endif |
| |
| [[noreturn]] static void LogFaultInjectionThread() { |
| const int64_t sleep_duration = 1; |
| while (true) { |
| sleep(sleep_duration); |
| |
| const int64_t now = MonotonicMillis(); |
| Status status = DebugAction(FLAGS_debug_actions, "LOG_MAINTENANCE_STDERR"); |
| if (!status.ok()) { |
| // Fault injection activated. Print the error message several times to cerr. |
| for (int i = 0; i < 128; i++) { |
| std::cerr << now << " " << i << " " |
| << " LOG_MAINTENANCE_STDERR " << status.msg().msg() << endl; |
| } |
| |
| // Check that impalad can always find INFO and ERROR log path. |
| DCHECK(impala::HasLog(google::INFO)); |
| DCHECK(impala::HasLog(google::ERROR)); |
| } |
| } |
| } |
| |
| [[noreturn]] static void LogMaintenanceThread() { |
| int64_t last_flush = MonotonicMillis(); |
| const int64_t sleep_duration = std::min(1, FLAGS_logbufsecs); |
| while (true) { |
| sleep(sleep_duration); |
| |
| const int64_t now = MonotonicMillis(); |
| bool max_log_file_exceeded = RedirectStdoutStderr() && impala::CheckLogSize(false); |
| if ((now - last_flush) / 1000 < FLAGS_logbufsecs && !max_log_file_exceeded) { |
| continue; |
| } |
| |
| google::FlushLogFiles(google::GLOG_INFO); |
| |
| // Check log size again and force log rotation this time if they still big after |
| // FlushLogFiles. |
| if (max_log_file_exceeded && impala::CheckLogSize(true)) impala::ForceRotateLog(); |
| |
| // No need to rotate log files in tests. |
| if (impala::TestInfo::is_test()) continue; |
| // Reattach stdout and stderr if necessary. |
| if (impala::RedirectStdoutStderr()) impala::AttachStdoutStderr(); |
| // Check for log rotation in every interval of the maintenance thread |
| impala::CheckAndRotateLogFiles(FLAGS_max_log_files); |
| // Check for minidump rotation in every interval of the maintenance thread. This is |
| // necessary since an arbitrary number of minidumps can be written by sending SIGUSR1 |
| // to the process. |
| impala::CheckAndRotateMinidumps(FLAGS_max_minidumps); |
| |
| // update last_flush. |
| last_flush = MonotonicMillis(); |
| } |
| } |
| |
| [[noreturn]] static void MemoryMaintenanceThread() { |
| while (true) { |
| SleepForMs(FLAGS_memory_maintenance_sleep_time_ms); |
| impala::ExecEnv* env = impala::ExecEnv::GetInstance(); |
| // ExecEnv may not have been created yet or this may be the catalogd or statestored, |
| // which don't have ExecEnvs. |
| if (env != nullptr) { |
| BufferPool* buffer_pool = env->buffer_pool(); |
| if (buffer_pool != nullptr) buffer_pool->Maintenance(); |
| |
| // The process limit as measured by our trackers may get out of sync with the |
| // process usage if memory is allocated or freed without updating a MemTracker. |
| // The metric is refreshed whenever memory is consumed or released via a MemTracker, |
| // so on a system with queries executing it will be refreshed frequently. However |
| // if the system is idle, we need to refresh the tracker occasionally since |
| // untracked memory may be allocated or freed, e.g. by background threads. |
| if (env->process_mem_tracker() != nullptr) { |
| env->process_mem_tracker()->RefreshConsumptionFromMetric(); |
| } |
| } |
| // Periodically refresh values of the aggregate memory metrics to ensure they are |
| // somewhat up-to-date. |
| AggregateMemoryMetrics::Refresh(); |
| } |
| } |
| |
| [[noreturn]] static void ImpalaShutdownSignalHandler() { |
| sigset_t signals; |
| CHECK_EQ(0, sigemptyset(&signals)); |
| CHECK_EQ(0, sigaddset(&signals, IMPALA_SHUTDOWN_SIGNAL)); |
| DCHECK(ExecEnv::GetInstance() != nullptr); |
| DCHECK(ExecEnv::GetInstance()->impala_server() != nullptr); |
| ImpalaServer* impala_server = ExecEnv::GetInstance()->impala_server(); |
| while (true) { |
| int signal; |
| int err = sigwait(&signals, &signal); |
| CHECK(err == 0) << "sigwait(): " << GetStrErrMsg(err) << ": " << err; |
| CHECK_EQ(IMPALA_SHUTDOWN_SIGNAL, signal); |
| ShutdownStatusPB shutdown_status; |
| Status status = impala_server->StartShutdown(-1, &shutdown_status); |
| if (!status.ok()) { |
| LOG(ERROR) << "Shutdown signal received but unable to initiate shutdown. Status: " |
| << status.GetDetail(); |
| continue; |
| } |
| LOG(INFO) << "Shutdown signal received. Current Shutdown Status: " |
| << ImpalaServer::ShutdownStatusToString(shutdown_status); |
| } |
| } |
| |
| static void PauseMonitorLoop() { |
| if (FLAGS_pause_monitor_warn_threshold_ms <= 0) return; |
| int64_t time_before_sleep = MonotonicMillis(); |
| while (true) { |
| SleepForMs(FLAGS_pause_monitor_sleep_time_ms); |
| int64_t sleep_time = MonotonicMillis() - time_before_sleep; |
| time_before_sleep += sleep_time; |
| if (sleep_time > FLAGS_pause_monitor_warn_threshold_ms) { |
| LOG(WARNING) << "A process pause was detected for approximately " << |
| PrettyPrinter::Print(sleep_time, TUnit::TIME_MS); |
| } |
| } |
| } |
| |
// Signal handler for SIGTERM: writes a fixed message to stdout and then exits with status
// 0 via _exit(). It runs in signal context, so it must stick to async-signal-safe calls:
// sys_write() (from linux_syscall_support.h, presumably a direct write(2) syscall),
// strlen() and _exit(). 'signum', 'info' and 'context' are unused.
// In CODE_COVERAGE_ENABLED builds it additionally calls __gcov_flush() before exiting;
// that is a development aid only and is not guaranteed to be async-signal-safe.
[[noreturn]] static void HandleSigTerm(int signum, siginfo_t* info, void* context) {
  const char* msg = "Caught signal: SIGTERM. Daemon will exit.\n";
  sys_write(STDOUT_FILENO, msg, strlen(msg));
#ifdef CODE_COVERAGE_ENABLED
  // On some systems __gcov_flush() only flushes a small subset of the coverage data.
  // If you run into this problem, there is a workaround that you can use at your own
  // risk: instead of calling __gcov_flush() and _exit(0) try to invoke exit(0) (no
  // underscore). You should only do this in your dev environment.
  __gcov_flush();
#endif
  // _exit() is async signal safe and is equivalent to the behaviour of the default
  // SIGTERM handler. exit() can run arbitrary code (atexit handlers, static destructors)
  // and is *not* safe to use here.
  _exit(0);
}
| |
| // Helper method that checks the return value of a syscall passed through |
| // 'syscall_ret_val'. If it indicates an error, it writes an error message to stderr along |
| // with the error string fetched via errno and calls exit(). |
| void AbortIfError(const int syscall_ret_val, const string& msg) { |
| if (syscall_ret_val == 0) return; |
| cerr << Substitute("$0 Error: $1", msg, GetStrErrMsg()); |
| exit(1); |
| } |
| |
| // Blocks the IMPALA_SHUTDOWN_SIGNAL signal. Should be called by the process before |
| // spawning any other threads to make sure it gets blocked in all threads and will only be |
| // caught by the thread waiting on it. |
| void BlockImpalaShutdownSignal() { |
| const string error_msg = "Failed to block IMPALA_SHUTDOWN_SIGNAL for all threads."; |
| sigset_t signals; |
| AbortIfError(sigemptyset(&signals), error_msg); |
| AbortIfError(sigaddset(&signals, IMPALA_SHUTDOWN_SIGNAL), error_msg); |
| AbortIfError(pthread_sigmask(SIG_BLOCK, &signals, nullptr), error_msg); |
| } |
| |
| // Returns Java major version, such as 8, 11, or 17. |
| static int GetJavaMajorVersion() { |
| string cmd = "java"; |
| const char* java_home = getenv("JAVA_HOME"); |
| if (java_home != NULL) { |
| cmd = (filesystem::path(java_home) / "bin" / "java").string(); |
| } |
| cmd += " -version 2>&1"; |
| string msg; |
| if (!RunShellProcess(cmd, &msg, false, {"JAVA_TOOL_OPTIONS"})) { |
| LOG(INFO) << Substitute("Unable to determine Java version (default to 8): $0", msg); |
| return 8; |
| } |
| |
| // Find a version string in the first line. |
| string first_line; |
| std::getline(istringstream(msg), first_line); |
| // Need to allow for a wide variety of formats for different JDK implementations. |
| // Example: openjdk version "11.0.19" 2023-04-18 |
| std::regex java_version_pattern("\"([0-9]{1,3})\\.[0-9]+\\.[0-9]+[^\"]*\""); |
| std::smatch matches; |
| if (!std::regex_search(first_line, matches, java_version_pattern)) { |
| LOG(INFO) << Substitute("Unable to determine Java version (default to 8): $0", msg); |
| return 8; |
| } |
| DCHECK_EQ(matches.size(), 2); |
| return std::stoi(matches.str(1)); |
| } |
| |
| // Append the javaagent arg to JAVA_TOOL_OPTIONS to load jamm. |
| static Status JavaAddJammAgent() { |
| stringstream val_out; |
| char* current_val_c = getenv("JAVA_TOOL_OPTIONS"); |
| if (current_val_c != NULL) { |
| val_out << current_val_c << " "; |
| } |
| |
| istringstream classpath {getenv("CLASSPATH")}; |
| string jamm_path, test_path; |
| while (getline(classpath, test_path, ':')) { |
| Status status = FileSystemUtil::FindFileInPath(test_path, "jamm-.*.jar", &jamm_path); |
| // Error during FindFileInPath is not fatal if jamm path is found in another path. |
| if (!status.ok()) { |
| LOG(ERROR) << "Error when processing class path: " << status.msg().msg(); |
| } |
| if (!jamm_path.empty()) break; |
| } |
| if (jamm_path.empty()) { |
| return Status("Could not find jamm-*.jar in Java CLASSPATH"); |
| } |
| val_out << "-javaagent:" << jamm_path; |
| |
| if (setenv("JAVA_TOOL_OPTIONS", val_out.str().c_str(), 1) < 0) { |
| return Status(Substitute("Could not update JAVA_TOOL_OPTIONS: $0", GetStrErrMsg())); |
| } |
| return Status::OK(); |
| } |
| |
| // Append add-opens args to JAVA_TOOL_OPTIONS for jamm. |
| static Status JavaAddOpens() { |
| if (!FLAGS_jvm_automatic_add_opens) return Status::OK(); |
| |
| stringstream val_out; |
| char* current_val_c = getenv("JAVA_TOOL_OPTIONS"); |
| if (current_val_c != NULL) { |
| val_out << current_val_c; |
| } |
| |
| for (const string& param : { |
| // Needed for jamm to access lambdas. |
| "--add-opens=java.base/java.lang=ALL-UNNAMED", |
| "--add-opens=java.base/java.nio=ALL-UNNAMED", |
| "--add-opens=java.base/java.util.regex=ALL-UNNAMED", |
| "--add-opens=java.base/sun.nio.ch=ALL-UNNAMED" |
| }) { |
| val_out << " " << param; |
| } |
| |
| if (setenv("JAVA_TOOL_OPTIONS", val_out.str().c_str(), 1) < 0) { |
| return Status(Substitute("Could not update JAVA_TOOL_OPTIONS: $0", GetStrErrMsg())); |
| } |
| return Status::OK(); |
| } |
| |
| static Status InitializeJavaWeigher() { |
| int version = GetJavaMajorVersion(); |
| DCHECK_GE(version, 17) << "Unsupported Java version: " << version; |
| LOG(INFO) << "Using Java weigher jamm"; |
| RETURN_IF_ERROR(JavaAddJammAgent()); |
| RETURN_IF_ERROR(JavaAddOpens()); |
| return Status::OK(); |
| } |
| |
| static Status JavaSetProcessName(const string& name) { |
| string current_val; |
| char* current_val_c = getenv("JAVA_TOOL_OPTIONS"); |
| if (current_val_c != NULL) { |
| current_val = current_val_c; |
| } |
| |
| if (!current_val.empty() && current_val.find("-Dsun.java.command") != string::npos) { |
| LOG(WARNING) << "Overriding sun.java.command in JAVA_TOOL_OPTIONS to " << name; |
| } |
| |
| stringstream val_out; |
| if (!current_val.empty()) { |
| val_out << current_val << " "; |
| } |
| // Set sun.java.command so jps reports the name correctly, and ThreadNameAnnotator can |
| // use the process name for the main thread (and correctly restore the process name). |
| val_out << "-Dsun.java.command=" << name; |
| |
| if (setenv("JAVA_TOOL_OPTIONS", val_out.str().c_str(), 1) < 0) { |
| return Status(Substitute("Could not update JAVA_TOOL_OPTIONS: $0", GetStrErrMsg())); |
| } |
| return Status::OK(); |
| } |
| |
// Process-wide initialization shared by the Impala daemons, tests and the external
// frontend. In order, it:
//  - blocks IMPALA_SHUTDOWN_SIGNAL;
//  - sets flag defaults and parses the command line;
//  - initializes CpuInfo/DiskInfo/MemInfo/OsInfo/TestInfo;
//  - validates several flags (redaction, read_size, re2_mem_limit,
//    reserved_words_version, thrift message size limits, catalogd HA requirements);
//  - sets up the stack trace signal, glog, minidumps (unless 'external_fe'), threading,
//    OpenSSL and auth;
//  - starts the common background threads and logs machine information;
//  - initializes LibCache and HdfsFsCache, and optionally the JVM;
//  - installs the SIGTERM handler;
//  - for external frontends and FE tests, loads the timezone database.
//
// 'argc'/'argv': the process arguments; flags are parsed from them.
// 'init_jvm': whether to configure and start the JVM (jamm weigher, process name and
//     libhdfs for non-external-FE processes, then JNI, JVM logging, the JVM pause monitor
//     for non-external-FE processes, and ZipUtil).
// 'test_mode': passed to TestInfo::Init(); FE_TEST also triggers the timezone db load.
// 'external_fe': true when loaded as part of an external frontend. This changes the
//     logging defaults and skips minidump registration and the daemon-only JVM setup.
//
// Returns void: failures are handled in place via CLEAN_EXIT_WITH_ERROR /
// ABORT_IF_ERROR.
void impala::InitCommonRuntime(int argc, char** argv, bool init_jvm,
    TestInfo::Mode test_mode, bool external_fe) {
  srand(time(NULL));
  // Must happen before any other thread is spawned so every thread inherits the mask.
  BlockImpalaShutdownSignal();

  // Set the default hostname. The user can override this with the hostname flag.
  ABORT_IF_ERROR(GetHostname(&FLAGS_hostname));

  // Default --symbolize_stacktrace: off in release (NDEBUG) builds, on in debug builds.
  // This only sets the default; the command line parsed below may override it.
#ifdef NDEBUG
  FLAGS_symbolize_stacktrace = false;
# else
  FLAGS_symbolize_stacktrace = true;
#endif

  if (external_fe) {
    // Change defaults for flags when loaded as part of external frontend.
    // Write logs to stderr by default (otherwise logs get written to
    // FeSupport.INFO/ERROR).
    FLAGS_logtostderr = true;
    // Do not redirect stdout/stderr by default.
    FLAGS_redirect_stdout_stderr = false;
  }

  google::SetVersionString(impala::GetBuildVersion());
  google::ParseCommandLineFlags(&argc, &argv, true);

  CpuInfo::Init();
  DiskInfo::Init();
  MemInfo::Init();
  OsInfo::Init();
  TestInfo::Init(test_mode);

  // Redaction cannot be combined with row-level (VLOG_ROW) logging or -vmodule.
  if (!FLAGS_redaction_rules_file.empty()) {
    if (VLOG_ROW_IS_ON || !FLAGS_vmodule.empty()) {
      CLEAN_EXIT_WITH_ERROR("Redaction cannot be used in combination with log level 3 or "
          "higher or the -vmodule option because these log levels may log data in "
          "ways redaction rules may not anticipate.");
    }
    const string& error_message = SetRedactionRulesFromFile(FLAGS_redaction_rules_file);
    if (!error_message.empty()) CLEAN_EXIT_WITH_ERROR(error_message);
  }
  if (FLAGS_read_size < READ_SIZE_MIN_VALUE) {
    CLEAN_EXIT_WITH_ERROR(Substitute("read_size can not be lower than $0",
        READ_SIZE_MIN_VALUE));
  }

  bool is_percent = false; // not used
  int64_t re2_mem_limit = ParseUtil::ParseMemSpec(FLAGS_re2_mem_limit, &is_percent, 0);
  if (re2_mem_limit <= 0) {
    CLEAN_EXIT_WITH_ERROR(
        Substitute("Invalid mem limit for re2's regex engine: $0", FLAGS_re2_mem_limit));
  } else {
    StringFunctions::SetRE2MemLimit(re2_mem_limit);
  }

  if (FLAGS_reserved_words_version != "2.11.0" && FLAGS_reserved_words_version != "3.0.0")
  {
    CLEAN_EXIT_WITH_ERROR(Substitute("Invalid flag reserved_words_version. The value must"
        " be one of [\"2.11.0\", \"3.0.0\"], while the provided value is $0.",
        FLAGS_reserved_words_version));
  }

  // Enforce a minimum value for thrift_rpc_max_message_size, as configuring the limit to
  // a small value is very unlikely to work. Skipped in tests and for values <= 0.
  if (!impala::TestInfo::is_test() && FLAGS_thrift_rpc_max_message_size > 0
      && FLAGS_thrift_rpc_max_message_size < ThriftDefaultMaxMessageSize()) {
    CLEAN_EXIT_WITH_ERROR(
        Substitute("Invalid $0: $1 is less than the minimum value of $2.",
            "thrift_rpc_max_message_size", FLAGS_thrift_rpc_max_message_size,
            ThriftDefaultMaxMessageSize()));
  }

  // Enforce a minimum value for thrift_external_rpc_max_message_size, as configuring the
  // limit to a small value is very unlikely to work. Skipped in tests and for values <= 0.
  if (!impala::TestInfo::is_test() && FLAGS_thrift_external_rpc_max_message_size > 0
      && FLAGS_thrift_external_rpc_max_message_size < ThriftDefaultMaxMessageSize()) {
    CLEAN_EXIT_WITH_ERROR(
        Substitute("Invalid $0: $1 is less than the minimum value of $2.",
            "thrift_external_rpc_max_message_size",
            FLAGS_thrift_external_rpc_max_message_size, ThriftDefaultMaxMessageSize()));
  }

  // When catalogd HA does not reset metadata on failover, the standby must be kept up to
  // date via HMS events, so event polling and insert/reload events are required.
  if (!FLAGS_catalogd_ha_reset_metadata_on_failover) {
    if (FLAGS_hms_event_polling_interval_s <= 0) {
      CLEAN_EXIT_WITH_ERROR(Substitute(
          "Invalid hms_event_polling_interval_s: $0. It should be larger than 0 when "
          "--catalogd_ha_reset_metadata_on_failover is false",
          FLAGS_hms_event_polling_interval_s));
    }
    if (!FLAGS_enable_insert_events) {
      CLEAN_EXIT_WITH_ERROR(Substitute(
          "--enable_insert_events should be true when "
          "--catalogd_ha_reset_metadata_on_failover is false"));
    }
    if (!FLAGS_enable_reload_events) {
      CLEAN_EXIT_WITH_ERROR(Substitute(
          "--enable_reload_events should be true when "
          "--catalogd_ha_reset_metadata_on_failover is false"));
    }
  }

  // Initialize the signal handler for stack trace collection BEFORE
  // InitGoogleLoggingSafe. This must happen before
  // google::InstallFailureSignalHandler() is called (which happens
  // inside InitGoogleLoggingSafe), otherwise that might block our signal.
  //
  // We use SIGRTMIN+10 instead of SIGUSR1/SIGUSR2 because:
  // - SIGUSR1 is used by minidump handling
  // - SIGUSR2 crashes the embedded JVM
  //
  // IMPORTANT: This signal handler is used INTERNALLY by the /stacks web endpoint.
  // It is NOT meant to be triggered manually via kill/pkill. The handler expects
  // specific data structures to be set up when triggered, which only happens during
  // a web request to /stacks. Manually sending this signal will crash the process.
  // If you want to see thread stacks, visit the /stacks web UI page instead.
  const int stack_trace_signal = SIGRTMIN + 10;
  kudu::Status stack_trace_status = kudu::SetStackTraceSignal(stack_trace_signal);
  if (!stack_trace_status.ok()) {
    // Using fprintf since LOG isn't available yet
    fprintf(stderr, "WARNING: Failed to initialize stack trace signal handler with "
        "signal %d: %s. The /stacks endpoint will not work.\n", stack_trace_signal,
        stack_trace_status.ToString().c_str());
  } else {
    fprintf(stderr, "INFO: Stack trace signal handler initialized with signal %d "
        "(SIGRTMIN+10). Access thread stacks via the /stacks web UI (DO NOT manually "
        "send this signal).\n", stack_trace_signal);
  }

  impala::InitGoogleLoggingSafe(argv[0]);
  // Breakpad needs flags and logging to initialize.
  if (!external_fe) {
    ABORT_IF_ERROR(RegisterMinidump(argv[0]));
  }
  impala::InitThreading();
  impala::datetime_parse_util::SimpleDateFormatTokenizer::InitCtx();
  impala::SeedOpenSSLRNG();
  ABORT_IF_ERROR(impala::InitAuth(argv[0]));

  // Start the common background threads only after InitGoogleLoggingSafe and
  // InitThreading.
  Status thread_spawn_status = Thread::Create("common", "log-maintenance-thread",
      &LogMaintenanceThread, &log_maintenance_thread);
  if (!thread_spawn_status.ok()) CLEAN_EXIT_WITH_ERROR(thread_spawn_status.GetDetail());

  thread_spawn_status =
      Thread::Create("common", "pause-monitor", &PauseMonitorLoop, &pause_monitor);
  if (!thread_spawn_status.ok()) CLEAN_EXIT_WITH_ERROR(thread_spawn_status.GetDetail());

  // Initialize log fault injection if such a debug action exists.
  if (strstr(FLAGS_debug_actions.c_str(), "LOG_MAINTENANCE_STDERR") != NULL) {
    thread_spawn_status = Thread::Create("common", "log-fault-inject-thread",
        &LogFaultInjectionThread, &log_fault_inject_thread);
    if (!thread_spawn_status.ok()) CLEAN_EXIT_WITH_ERROR(thread_spawn_status.GetDetail());
  }

  // Implement timeout for backend tests: the thread sleeps BE_TEST_TIMEOUT_S and then
  // crashes the process with LOG(FATAL).
  if (impala::TestInfo::is_be_test()) {
    thread_spawn_status = Thread::Create("common", "be-test-timeout-thread",
        []() {
          SleepForMs(BE_TEST_TIMEOUT_S * 1000L);
          LOG(FATAL) << "Backend test timed out after " << BE_TEST_TIMEOUT_S << "s";
        },
        &be_timeout_thread);
    if (!thread_spawn_status.ok()) CLEAN_EXIT_WITH_ERROR(thread_spawn_status.GetDetail());
  }

  PeriodicCounterUpdater::Init();

  LOG(INFO) << impala::GetVersionString();
  LOG(INFO) << "Using hostname: " << FLAGS_hostname;
  // NOTE(review): std::locale("") throws std::runtime_error when the environment names
  // a locale that is not installed, which would abort startup here — consider guarding.
  LOG(INFO) << "Using locale: " << std::locale("").name();
  impala::LogCommandLineFlags();

  // When a process calls send(2) on a socket closed on the other end, linux generates
  // SIGPIPE. MSG_NOSIGNAL can be passed to send(2) to disable it, which thrift does. But
  // OpenSSL doesn't have place for this parameter so the signal must be disabled
  // manually.
  signal(SIGPIPE, SIG_IGN);
  InitThriftLogging();

  LOG(INFO) << CpuInfo::DebugString();
  LOG(INFO) << DiskInfo::DebugString();
  LOG(INFO) << MemInfo::DebugString();
  LOG(INFO) << OsInfo::DebugString();
  LOG(INFO) << CGroupUtil::DebugString();
  LOG(INFO) << "Process ID: " << getpid();
  LOG(INFO) << "Default AES cipher mode for spill-to-disk: "
            << EncryptionKey::ModeToString(EncryptionKey::GetSupportedDefaultMode());

  // After initializing logging and printing the machine information, verify the
  // minimal CPU requirements and exit if they are not met.
  Status cpu_requirement_status = CpuInfo::EnforceCpuRequirements();
  if (!cpu_requirement_status.ok()) {
    CLEAN_EXIT_WITH_ERROR(cpu_requirement_status.GetDetail());
  }

  // Optionally replace the hostname with its resolved IP address.
  if (FLAGS_use_resolved_hostname) {
    IpAddr ip_address;
    Status status = HostnameToIpAddr(FLAGS_hostname, &ip_address);
    if (!status.ok()) CLEAN_EXIT_WITH_ERROR(status.GetDetail());
    LOG(INFO) << Substitute("Resolved hostname $0 to $1", FLAGS_hostname, ip_address);
    FLAGS_hostname = ip_address;
  }

  // Required for the FE's Catalog
  ABORT_IF_ERROR(impala::LibCache::Init(external_fe));
  Status fs_cache_init_status = impala::HdfsFsCache::Init();
  if (!fs_cache_init_status.ok()) CLEAN_EXIT_WITH_ERROR(fs_cache_init_status.GetDetail());

  if (init_jvm) {
    // JAVA_TOOL_OPTIONS must be fully set up before JniUtil::Init() starts the JVM.
    if (!external_fe) {
      ABORT_IF_ERROR(InitializeJavaWeigher());
      ABORT_IF_ERROR(JavaSetProcessName(filesystem::path(argv[0]).filename().string()));
      JniUtil::InitLibhdfs();
    }
    ABORT_IF_ERROR(JniUtil::Init());
    InitJvmLoggingSupport();
    if (!external_fe) {
      ABORT_IF_ERROR(JniUtil::InitJvmPauseMonitor());
    }
    ZipUtil::InitJvm();
  }

  if (argc == -1) {
    // Should not be called. We need BuiltinsInit() so the builtin symbols are
    // not stripped.
    DCHECK(false);
    ScalarExprEvaluator::InitBuiltinsDummy();
  }

  if (impala::KuduIsAvailable()) impala::InitKuduLogging();

  // Start lifetime heap profiling if it is supported and configured
  if (MallocUtil::GetInstance()->SupportsHeapProfiling() &&
      FLAGS_enable_process_lifetime_heap_profiling) {
    MallocUtil::GetInstance()->HeapProfilerStart(FLAGS_heap_profile_dir.c_str());
  }

  // Signal handler for handling the SIGTERM. We want to log a message when catalogd or
  // impalad or statestored is being shutdown using a SIGTERM.
  struct sigaction action;
  memset(&action, 0, sizeof(struct sigaction));
  action.sa_sigaction = &HandleSigTerm;
  action.sa_flags = SA_SIGINFO;
  if (sigaction(SIGTERM, &action, nullptr) == -1) {
    stringstream error_msg;
    error_msg << "Failed to register action for SIGTERM: " << GetStrErrMsg();
    CLEAN_EXIT_WITH_ERROR(error_msg.str());
  }

  if (external_fe || test_mode == TestInfo::FE_TEST) {
    // Explicitly load the timezone database for external FEs and FE tests.
    // Impala daemons load it through ImpaladMain
    ABORT_IF_ERROR(TimezoneDatabase::Initialize());
  }
}
| |
| Status impala::StartMemoryMaintenanceThread() { |
| DCHECK(AggregateMemoryMetrics::TOTAL_USED != nullptr) << "Mem metrics not registered."; |
| return Thread::Create("common", "memory-maintenance-thread", |
| &MemoryMaintenanceThread, &memory_maintenance_thread); |
| } |
| |
| Status impala::StartImpalaShutdownSignalHandlerThread() { |
| return Thread::Create("common", "shutdown-signal-handler", &ImpalaShutdownSignalHandler, |
| &shutdown_signal_handler_thread); |
| } |
| |
#if defined(ADDRESS_SANITIZER)
// Default ASAN_OPTIONS, picked up by the ASAN runtime; setting the $ASAN_OPTIONS
// environment variable overrides them.
// IMPALA-2746: backend tests don't pass with leak sanitizer enabled, hence detect_leaks=0.
extern "C" const char *__asan_default_options() {
  return "handle_segv=0 "
         "detect_leaks=0 "
         "allocator_may_return_null=1";
}
#endif
| |
#if defined(THREAD_SANITIZER)
// Default TSAN_OPTIONS; setting the $TSAN_OPTIONS environment variable overrides them.
// TSAN and Java don't play nicely together. TSAN requires every library to be compiled
// with '-fsanitize=thread' (see
// https://github.com/google/sanitizers/wiki/ThreadSanitizerCppManual#non-instrumented-code),
// which is not currently possible for JVM code. See
// https://wiki.openjdk.java.net/display/tsan/Main and JDK-8208520 for efforts to get TSAN
// to run against Java code. ignore_noninstrumented_modules=1 tells TSAN to ignore all
// interceptors called from non-instrumented libraries (e.g. Java); full TSAN builds set it
// to 0 instead.
extern "C" const char* __tsan_default_options() {
#if defined(THREAD_SANITIZER_FULL)
  return "ignore_noninstrumented_modules=0 "
         "halt_on_error=1 history_size=7 allocator_may_return_null=1 "
         "suppressions=" THREAD_SANITIZER_SUPPRESSIONS;
#else
  return "ignore_noninstrumented_modules=1 "
         "halt_on_error=1 history_size=7 allocator_may_return_null=1 "
         "suppressions=" THREAD_SANITIZER_SUPPRESSIONS;
#endif
}
#endif
| |
#if defined(UNDEFINED_SANITIZER)
// Default UBSAN_OPTIONS: print a stack trace for each report and load the suppressions
// file configured at build time. Setting $UBSAN_OPTIONS overrides these defaults.
extern "C" const char *__ubsan_default_options() {
  return "print_stacktrace=1 "
         "suppressions=" UNDEFINED_SANITIZER_SUPPRESSIONS;
}
#endif