| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "common/thread-debug-info.h" |
| #include "service/impala-server.h" |
| #include "service/impala-server.inline.h" |
| |
| #include <algorithm> |
| #include <type_traits> |
| |
| #include <boost/algorithm/string.hpp> |
| #include <boost/algorithm/string/join.hpp> |
| #include <boost/bind.hpp> |
| #include <boost/date_time/posix_time/posix_time_types.hpp> |
| #include <boost/unordered_set.hpp> |
| #include <jni.h> |
| #include <gperftools/heap-profiler.h> |
| #include <gperftools/malloc_extension.h> |
| #include <gtest/gtest.h> |
| #include <gutil/strings/substitute.h> |
| #include <rapidjson/stringbuffer.h> |
| #include <rapidjson/prettywriter.h> |
| #include <thrift/protocol/TDebugProtocol.h> |
| |
| #include "common/logging.h" |
| #include "common/version.h" |
| #include "rpc/thrift-util.h" |
| #include "runtime/coordinator.h" |
| #include "runtime/exec-env.h" |
| #include "runtime/raw-value.h" |
| #include "scheduling/admission-controller.h" |
| #include "service/client-request-state.h" |
| #include "service/hs2-util.h" |
| #include "service/query-options.h" |
| #include "service/query-result-set.h" |
| #include "util/auth-util.h" |
| #include "util/debug-util.h" |
| #include "util/impalad-metrics.h" |
| #include "util/metrics.h" |
| #include "util/pretty-printer.h" |
| #include "util/runtime-profile-counters.h" |
| #include "util/string-parser.h" |
| |
| #include "common/names.h" |
| |
| using boost::adopt_lock_t; |
| using boost::algorithm::join; |
| using boost::algorithm::iequals; |
| using boost::uuids::uuid; |
| using namespace apache::hive::service::cli::thrift; |
| using namespace apache::hive::service::cli; |
| using namespace apache::thrift; |
| using namespace beeswax; // Converting QueryState |
| using namespace strings; |
| |
| const TProtocolVersion::type MAX_SUPPORTED_HS2_VERSION = |
| TProtocolVersion::HIVE_CLI_SERVICE_PROTOCOL_V6; |
| |
// HiveServer2 error-returning macros. Both may only be used inside functions (or
// lambdas) that return void, because they expand to a bare 'return;'.
//
// HS2_RETURN_ERROR(return_val, error_msg, error_state):
//   Marks 'return_val.status' as ERROR_STATUS with the given error message and SQL
//   state, then returns from the enclosing function.
#define HS2_RETURN_ERROR(return_val, error_msg, error_state) \
  do { \
    (return_val).status.__set_statusCode(thrift::TStatusCode::ERROR_STATUS); \
    (return_val).status.__set_errorMessage((error_msg)); \
    (return_val).status.__set_sqlState((error_state)); \
    return; \
  } while (false)

// HS2_RETURN_IF_ERROR(return_val, status, error_state):
//   Evaluates 'status' exactly once. If it is not ok(), reports its detail through
//   HS2_RETURN_ERROR (which returns from the enclosing function); otherwise falls
//   through. No explicit 'return' is needed after HS2_RETURN_ERROR.
#define HS2_RETURN_IF_ERROR(return_val, status, error_state) \
  do { \
    const Status& _status = (status); \
    if (UNLIKELY(!_status.ok())) { \
      HS2_RETURN_ERROR(return_val, _status.GetDetail(), (error_state)); \
    } \
  } while (false)
| |
| DECLARE_string(hostname); |
| DECLARE_int32(webserver_port); |
| DECLARE_int32(idle_session_timeout); |
| DECLARE_int32(disconnected_session_timeout); |
| |
| namespace impala { |
| |
| const string IMPALA_RESULT_CACHING_OPT = "impala.resultset.cache.size"; |
| |
| void ImpalaServer::ExecuteMetadataOp(const THandleIdentifier& session_handle, |
| TMetadataOpRequest* request, TOperationHandle* handle, thrift::TStatus* status) { |
| TUniqueId session_id; |
| TUniqueId secret; |
| Status unique_id_status = |
| THandleIdentifierToTUniqueId(session_handle, &session_id, &secret); |
| if (!unique_id_status.ok()) { |
| status->__set_statusCode(thrift::TStatusCode::ERROR_STATUS); |
| status->__set_errorMessage(unique_id_status.GetDetail()); |
| status->__set_sqlState(SQLSTATE_GENERAL_ERROR); |
| return; |
| } |
| ScopedSessionState scoped_session(this); |
| shared_ptr<SessionState> session; |
| Status get_session_status = |
| scoped_session.WithSession(session_id, SecretArg::Session(secret), &session); |
| if (!get_session_status.ok()) { |
| status->__set_statusCode(thrift::TStatusCode::ERROR_STATUS); |
| status->__set_errorMessage(get_session_status.GetDetail()); |
| // TODO: (here and elsewhere) - differentiate between invalid session ID and timeout |
| // when setting the error code. |
| status->__set_sqlState(SQLSTATE_GENERAL_ERROR); |
| return; |
| } |
| |
| if (session == NULL) { |
| status->__set_statusCode(thrift::TStatusCode::ERROR_STATUS); |
| status->__set_errorMessage(Substitute("Invalid session id: $0", |
| PrintId(session_id))); |
| status->__set_sqlState(SQLSTATE_GENERAL_ERROR); |
| return; |
| } |
| TQueryCtx query_ctx; |
| PrepareQueryContext(&query_ctx); |
| ScopedThreadContext tdi_context(GetThreadDebugInfo(), query_ctx.query_id); |
| session->ToThrift(session_id, &query_ctx.session); |
| request->__set_session(query_ctx.session); |
| |
| shared_ptr<ClientRequestState> request_state; |
| // There is no user-supplied query text available because this metadata operation comes |
| // from an RPC. As a best effort, we use the type of the operation. |
| map<int, const char*>::const_iterator query_text_it = |
| _TMetadataOpcode_VALUES_TO_NAMES.find(request->opcode); |
| const string& query_text = query_text_it == _TMetadataOpcode_VALUES_TO_NAMES.end() ? |
| "N/A" : query_text_it->second; |
| query_ctx.client_request.stmt = query_text; |
| request_state.reset(new ClientRequestState(query_ctx, exec_env_, |
| exec_env_->frontend(), this, session)); |
| Status register_status = RegisterQuery(session, request_state); |
| if (!register_status.ok()) { |
| status->__set_statusCode(thrift::TStatusCode::ERROR_STATUS); |
| status->__set_errorMessage(register_status.GetDetail()); |
| status->__set_sqlState(SQLSTATE_GENERAL_ERROR); |
| return; |
| } |
| |
| Status exec_status = request_state->Exec(*request); |
| if (!exec_status.ok()) { |
| discard_result(UnregisterQuery(request_state->query_id(), false, &exec_status)); |
| status->__set_statusCode(thrift::TStatusCode::ERROR_STATUS); |
| status->__set_errorMessage(exec_status.GetDetail()); |
| status->__set_sqlState(SQLSTATE_GENERAL_ERROR); |
| return; |
| } |
| |
| request_state->UpdateNonErrorOperationState(TOperationState::FINISHED_STATE); |
| |
| Status inflight_status = SetQueryInflight(session, request_state); |
| if (!inflight_status.ok()) { |
| discard_result(UnregisterQuery(request_state->query_id(), false, &inflight_status)); |
| status->__set_statusCode(thrift::TStatusCode::ERROR_STATUS); |
| status->__set_errorMessage(inflight_status.GetDetail()); |
| status->__set_sqlState(SQLSTATE_GENERAL_ERROR); |
| return; |
| } |
| handle->__set_hasResultSet(true); |
| // Secret is inherited from session. |
| TUniqueId operation_id = request_state->query_id(); |
| TUniqueIdToTHandleIdentifier(operation_id, secret, &(handle->operationId)); |
| status->__set_statusCode(thrift::TStatusCode::SUCCESS_STATUS); |
| } |
| |
| Status ImpalaServer::FetchInternal(ClientRequestState* request_state, |
| SessionState* session, int32_t fetch_size, bool fetch_first, |
| TFetchResultsResp* fetch_results, int32_t* num_results) { |
| // Make sure ClientRequestState::Wait() has completed before fetching rows. Wait() |
| // ensures that rows are ready to be fetched (e.g., Wait() opens |
| // ClientRequestState::output_exprs_, which are evaluated in |
| // ClientRequestState::FetchRows() below). |
| int64_t block_on_wait_time_us = 0; |
| if (!request_state->BlockOnWait( |
| request_state->fetch_rows_timeout_us(), &block_on_wait_time_us)) { |
| fetch_results->status.__set_statusCode(thrift::TStatusCode::STILL_EXECUTING_STATUS); |
| fetch_results->__set_hasMoreRows(true); |
| fetch_results->__isset.results = false; |
| return Status::OK(); |
| } |
| |
| lock_guard<mutex> frl(*request_state->fetch_rows_lock()); |
| lock_guard<mutex> l(*request_state->lock()); |
| |
| // Check for cancellation or an error. |
| RETURN_IF_ERROR(request_state->query_status()); |
| |
| if (request_state->num_rows_fetched() == 0) { |
| request_state->set_fetched_rows(); |
| } |
| |
| if (fetch_first) RETURN_IF_ERROR(request_state->RestartFetch()); |
| |
| fetch_results->results.__set_startRowOffset(request_state->num_rows_fetched()); |
| |
| // Child queries should always return their results in row-major format, rather than |
| // inheriting the parent session's setting. |
| bool is_child_query = request_state->parent_query_id() != TUniqueId(); |
| TProtocolVersion::type version = is_child_query ? |
| TProtocolVersion::HIVE_CLI_SERVICE_PROTOCOL_V1 : session->hs2_version; |
| scoped_ptr<QueryResultSet> result_set(QueryResultSet::CreateHS2ResultSet( |
| version, *(request_state->result_metadata()), &(fetch_results->results))); |
| RETURN_IF_ERROR( |
| request_state->FetchRows(fetch_size, result_set.get(), block_on_wait_time_us)); |
| *num_results = result_set->size(); |
| fetch_results->__isset.results = true; |
| fetch_results->__set_hasMoreRows(!request_state->eos()); |
| return Status::OK(); |
| } |
| |
| Status ImpalaServer::TExecuteStatementReqToTQueryContext( |
| const TExecuteStatementReq execute_request, TQueryCtx* query_ctx) { |
| query_ctx->client_request.stmt = execute_request.statement; |
| VLOG_QUERY << "TExecuteStatementReq: " << ThriftDebugString(execute_request); |
| QueryOptionsMask set_query_options_mask; |
| { |
| shared_ptr<SessionState> session_state; |
| TUniqueId session_id; |
| TUniqueId secret; |
| RETURN_IF_ERROR(THandleIdentifierToTUniqueId(execute_request.sessionHandle.sessionId, |
| &session_id, &secret)); |
| |
| RETURN_IF_ERROR( |
| GetSessionState(session_id, SecretArg::Session(secret), &session_state)); |
| session_state->ToThrift(session_id, &query_ctx->session); |
| lock_guard<mutex> l(session_state->lock); |
| query_ctx->client_request.query_options = session_state->QueryOptions(); |
| set_query_options_mask = session_state->set_query_options_mask; |
| } |
| |
| if (execute_request.__isset.confOverlay) { |
| TQueryOptions overlay; |
| QueryOptionsMask overlay_mask; |
| map<string, string>::const_iterator conf_itr = execute_request.confOverlay.begin(); |
| for (; conf_itr != execute_request.confOverlay.end(); ++conf_itr) { |
| if (conf_itr->first == IMPALA_RESULT_CACHING_OPT) continue; |
| if (conf_itr->first == ChildQuery::PARENT_QUERY_OPT) { |
| if (ParseId(conf_itr->second, &query_ctx->parent_query_id)) { |
| query_ctx->__isset.parent_query_id = true; |
| } |
| continue; |
| } |
| RETURN_IF_ERROR(SetQueryOption(conf_itr->first, conf_itr->second, |
| &overlay, &overlay_mask)); |
| } |
| OverlayQueryOptions(overlay, overlay_mask, &query_ctx->client_request.query_options); |
| set_query_options_mask |= overlay_mask; |
| } |
| // Only query options not set in the session or confOverlay can be overridden by the |
| // pool options. |
| AddPoolConfiguration(query_ctx, ~set_query_options_mask); |
| VLOG_QUERY << "TClientRequest.queryOptions: " |
| << ThriftDebugString(query_ctx->client_request.query_options); |
| return Status::OK(); |
| } |
| |
| // HiveServer2 API |
| void ImpalaServer::OpenSession(TOpenSessionResp& return_val, |
| const TOpenSessionReq& request) { |
| HS2_RETURN_IF_ERROR(return_val, CheckNotShuttingDown(), SQLSTATE_GENERAL_ERROR); |
| |
| // Generate session ID and the secret |
| uuid secret_uuid; |
| uuid session_uuid; |
| { |
| lock_guard<mutex> l(uuid_lock_); |
| secret_uuid = crypto_uuid_generator_(); |
| session_uuid = crypto_uuid_generator_(); |
| } |
| return_val.sessionHandle.sessionId.guid.assign( |
| session_uuid.begin(), session_uuid.end()); |
| return_val.sessionHandle.sessionId.secret.assign( |
| secret_uuid.begin(), secret_uuid.end()); |
| DCHECK_EQ(return_val.sessionHandle.sessionId.guid.size(), 16); |
| DCHECK_EQ(return_val.sessionHandle.sessionId.secret.size(), 16); |
| return_val.__isset.sessionHandle = true; |
| TUniqueId session_id, secret; |
| UUIDToTUniqueId(session_uuid, &session_id); |
| UUIDToTUniqueId(secret_uuid, &secret); |
| |
| // DO NOT log this Thrift struct in its entirety, in case a bad client sets the |
| // password. |
| VLOG_QUERY << "Opening session: " << PrintId(session_id) << " username: " |
| << request.username; |
| |
| // create a session state: initialize start time, session type, database and default |
| // query options. |
| // TODO: Fix duplication of code between here and ConnectionStart(). |
| shared_ptr<SessionState> state = make_shared<SessionState>(this, session_id, secret); |
| state->closed = false; |
| state->start_time_ms = UnixMillis(); |
| state->session_type = TSessionType::HIVESERVER2; |
| state->network_address = ThriftServer::GetThreadConnectionContext()->network_address; |
| state->last_accessed_ms = UnixMillis(); |
| // request.client_protocol is not guaranteed to be a valid TProtocolVersion::type, so |
| // loading it can cause undefined behavior. Instead, we copy it to a value of the |
| // "underlying type" of the enum, then copy it back to state->hs_version only once we |
| // have clamped it to be at most MAX_SUPPORTED_HS2_VERSION. |
| std::underlying_type_t<decltype(request.client_protocol)> protocol_integer; |
| memcpy(&protocol_integer, &request.client_protocol, sizeof(request.client_protocol)); |
| state->hs2_version = static_cast<TProtocolVersion::type>( |
| min<decltype(protocol_integer)>(MAX_SUPPORTED_HS2_VERSION, protocol_integer)); |
| state->kudu_latest_observed_ts = 0; |
| |
| // If the username was set by a lower-level transport, use it. |
| const ThriftServer::ConnectionContext* connection_context = |
| ThriftServer::GetThreadConnectionContext(); |
| if (!connection_context->username.empty()) { |
| state->connected_user = connection_context->username; |
| if (!connection_context->do_as_user.empty()) { |
| state->do_as_user = connection_context->do_as_user; |
| Status status = AuthorizeProxyUser(state->connected_user, state->do_as_user); |
| HS2_RETURN_IF_ERROR(return_val, status, SQLSTATE_GENERAL_ERROR); |
| } |
| } else { |
| state->connected_user = request.username; |
| } |
| |
| // Process the supplied configuration map. |
| state->database = "default"; |
| state->server_default_query_options = &default_query_options_; |
| state->session_timeout = FLAGS_idle_session_timeout; |
| if (request.__isset.configuration) { |
| typedef map<string, string> ConfigurationMap; |
| for (const ConfigurationMap::value_type& v: request.configuration) { |
| if (iequals(v.first, "impala.doas.user")) { |
| // If the current user is a valid proxy user, he/she can optionally perform |
| // authorization requests on behalf of another user. This is done by setting |
| // the 'impala.doas.user' Hive Server 2 configuration property. |
| state->do_as_user = v.second; |
| Status status = AuthorizeProxyUser(state->connected_user, state->do_as_user); |
| HS2_RETURN_IF_ERROR(return_val, status, SQLSTATE_GENERAL_ERROR); |
| } else if (iequals(v.first, "use:database")) { |
| state->database = v.second; |
| } else { |
| // Normal configuration key. Use it to set session default query options. |
| // Ignore failure (failures will be logged in SetQueryOption()). |
| Status status = SetQueryOption(v.first, v.second, &state->set_query_options, |
| &state->set_query_options_mask); |
| if (status.ok() && iequals(v.first, "idle_session_timeout")) { |
| state->session_timeout = state->set_query_options.idle_session_timeout; |
| VLOG_QUERY << "OpenSession(): session: " << PrintId(session_id) |
| << " idle_session_timeout=" |
| << PrettyPrinter::Print(state->session_timeout, TUnit::TIME_S); |
| } |
| } |
| } |
| } |
| RegisterSessionTimeout(state->session_timeout); |
| TQueryOptionsToMap(state->QueryOptions(), &return_val.configuration); |
| |
| // OpenSession() should return the coordinator's HTTP server address. |
| const string& http_addr = TNetworkAddressToString(MakeNetworkAddress( |
| FLAGS_hostname, FLAGS_webserver_port)); |
| return_val.configuration.insert(make_pair("http_addr", http_addr)); |
| |
| { |
| lock_guard<mutex> l(connection_to_sessions_map_lock_); |
| const TUniqueId& connection_id = ThriftServer::GetThreadConnectionId(); |
| connection_to_sessions_map_[connection_id].insert(session_id); |
| state->connections.insert(connection_id); |
| } |
| |
| // Put the session state in session_state_map_ |
| { |
| lock_guard<mutex> l(session_state_map_lock_); |
| session_state_map_.insert(make_pair(session_id, state)); |
| } |
| |
| ImpaladMetrics::IMPALA_SERVER_NUM_OPEN_HS2_SESSIONS->Increment(1); |
| |
| return_val.__isset.configuration = true; |
| return_val.status.__set_statusCode(thrift::TStatusCode::SUCCESS_STATUS); |
| return_val.serverProtocolVersion = state->hs2_version; |
| VLOG_QUERY << "Opened session: " << PrintId(session_id) << " username: " |
| << request.username; |
| } |
| |
| void ImpalaServer::CloseSession(TCloseSessionResp& return_val, |
| const TCloseSessionReq& request) { |
| VLOG_QUERY << "CloseSession(): request=" << ThriftDebugString(request); |
| |
| TUniqueId session_id; |
| TUniqueId secret; |
| HS2_RETURN_IF_ERROR(return_val, THandleIdentifierToTUniqueId( |
| request.sessionHandle.sessionId, &session_id, &secret), SQLSTATE_GENERAL_ERROR); |
| HS2_RETURN_IF_ERROR(return_val, |
| CloseSessionInternal( |
| session_id, SecretArg::Session(secret), /* ignore_if_absent= */ false), |
| SQLSTATE_GENERAL_ERROR); |
| return_val.status.__set_statusCode(thrift::TStatusCode::SUCCESS_STATUS); |
| } |
| |
| void ImpalaServer::GetInfo(TGetInfoResp& return_val, |
| const TGetInfoReq& request) { |
| VLOG_QUERY << "GetInfo(): request=" << ThriftDebugString(request); |
| HS2_RETURN_IF_ERROR(return_val, CheckNotShuttingDown(), SQLSTATE_GENERAL_ERROR); |
| |
| TUniqueId session_id; |
| TUniqueId secret; |
| HS2_RETURN_IF_ERROR(return_val, THandleIdentifierToTUniqueId( |
| request.sessionHandle.sessionId, &session_id, &secret), SQLSTATE_GENERAL_ERROR); |
| ScopedSessionState session_handle(this); |
| shared_ptr<SessionState> session; |
| HS2_RETURN_IF_ERROR(return_val, |
| session_handle.WithSession(session_id, SecretArg::Session(secret), &session), |
| SQLSTATE_GENERAL_ERROR); |
| |
| switch (request.infoType) { |
| case TGetInfoType::CLI_SERVER_NAME: |
| case TGetInfoType::CLI_DBMS_NAME: |
| return_val.infoValue.__set_stringValue("Impala"); |
| break; |
| case TGetInfoType::CLI_DBMS_VER: |
| return_val.infoValue.__set_stringValue(GetDaemonBuildVersion()); |
| break; |
| default: |
| HS2_RETURN_ERROR(return_val, "Unsupported operation", |
| SQLSTATE_OPTIONAL_FEATURE_NOT_IMPLEMENTED); |
| } |
| return_val.status.__set_statusCode(thrift::TStatusCode::SUCCESS_STATUS); |
| } |
| |
| void ImpalaServer::ExecuteStatement(TExecuteStatementResp& return_val, |
| const TExecuteStatementReq& request) { |
| VLOG_QUERY << "ExecuteStatement(): request=" << ThriftDebugString(request); |
| HS2_RETURN_IF_ERROR(return_val, CheckNotShuttingDown(), SQLSTATE_GENERAL_ERROR); |
| // We ignore the runAsync flag here: Impala's queries will always run asynchronously, |
| // and will block on fetch. To the client, this looks like Hive's synchronous mode; the |
| // difference is that rows are not available when ExecuteStatement() returns. |
| TQueryCtx query_ctx; |
| Status status = TExecuteStatementReqToTQueryContext(request, &query_ctx); |
| HS2_RETURN_IF_ERROR(return_val, status, SQLSTATE_GENERAL_ERROR); |
| |
| TUniqueId session_id; |
| TUniqueId secret; |
| HS2_RETURN_IF_ERROR(return_val, THandleIdentifierToTUniqueId( |
| request.sessionHandle.sessionId, &session_id, &secret), SQLSTATE_GENERAL_ERROR); |
| ScopedSessionState session_handle(this); |
| shared_ptr<SessionState> session; |
| HS2_RETURN_IF_ERROR(return_val, |
| session_handle.WithSession(session_id, SecretArg::Session(secret), &session), |
| SQLSTATE_GENERAL_ERROR); |
| if (session == NULL) { |
| string err_msg = Substitute("Invalid session id: $0", PrintId(session_id)); |
| VLOG(1) << err_msg; |
| HS2_RETURN_IF_ERROR(return_val, Status::Expected(err_msg), SQLSTATE_GENERAL_ERROR); |
| } |
| |
| // Optionally enable result caching to allow restarting fetches. |
| int64_t cache_num_rows = -1; |
| if (request.__isset.confOverlay) { |
| map<string, string>::const_iterator iter = |
| request.confOverlay.find(IMPALA_RESULT_CACHING_OPT); |
| if (iter != request.confOverlay.end()) { |
| StringParser::ParseResult parse_result; |
| cache_num_rows = StringParser::StringToInt<int64_t>( |
| iter->second.c_str(), iter->second.size(), &parse_result); |
| if (parse_result != StringParser::PARSE_SUCCESS) { |
| HS2_RETURN_IF_ERROR( |
| return_val, Status::Expected(Substitute("Invalid value '$0' for '$1' option.", |
| iter->second, IMPALA_RESULT_CACHING_OPT)), SQLSTATE_GENERAL_ERROR); |
| } |
| } |
| } |
| |
| shared_ptr<ClientRequestState> request_state; |
| status = Execute(&query_ctx, session, &request_state); |
| HS2_RETURN_IF_ERROR(return_val, status, SQLSTATE_GENERAL_ERROR); |
| |
| // Start thread to wait for results to become available. |
| status = request_state->WaitAsync(); |
| if (!status.ok()) goto return_error; |
| |
| // Optionally enable result caching on the ClientRequestState. |
| if (cache_num_rows > 0) { |
| status = request_state->SetResultCache( |
| QueryResultSet::CreateHS2ResultSet( |
| session->hs2_version, *request_state->result_metadata(), nullptr), |
| cache_num_rows); |
| if (!status.ok()) goto return_error; |
| } |
| // Once the query is running do a final check for session closure and add it to the |
| // set of in-flight queries. |
| status = SetQueryInflight(session, request_state); |
| if (!status.ok()) goto return_error; |
| return_val.__isset.operationHandle = true; |
| return_val.operationHandle.__set_operationType(TOperationType::EXECUTE_STATEMENT); |
| return_val.operationHandle.__set_hasResultSet(request_state->returns_result_set()); |
| // Secret is inherited from session. |
| TUniqueIdToTHandleIdentifier(request_state->query_id(), secret, |
| &return_val.operationHandle.operationId); |
| return_val.status.__set_statusCode( |
| apache::hive::service::cli::thrift::TStatusCode::SUCCESS_STATUS); |
| |
| VLOG_QUERY << "ExecuteStatement(): return_val=" << ThriftDebugString(return_val); |
| return; |
| |
| return_error: |
| discard_result(UnregisterQuery(request_state->query_id(), false, &status)); |
| HS2_RETURN_ERROR(return_val, status.GetDetail(), SQLSTATE_GENERAL_ERROR); |
| } |
| |
| void ImpalaServer::GetTypeInfo(TGetTypeInfoResp& return_val, |
| const TGetTypeInfoReq& request) { |
| VLOG_QUERY << "GetTypeInfo(): request=" << ThriftDebugString(request); |
| HS2_RETURN_IF_ERROR(return_val, CheckNotShuttingDown(), SQLSTATE_GENERAL_ERROR); |
| |
| TMetadataOpRequest req; |
| req.__set_opcode(TMetadataOpcode::GET_TYPE_INFO); |
| req.__set_get_type_info_req(request); |
| |
| TOperationHandle handle; |
| thrift::TStatus status; |
| ExecuteMetadataOp(request.sessionHandle.sessionId, &req, &handle, &status); |
| handle.__set_operationType(TOperationType::GET_TYPE_INFO); |
| return_val.__set_operationHandle(handle); |
| return_val.__set_status(status); |
| |
| VLOG_QUERY << "GetTypeInfo(): return_val=" << ThriftDebugString(return_val); |
| } |
| |
| void ImpalaServer::GetCatalogs(TGetCatalogsResp& return_val, |
| const TGetCatalogsReq& request) { |
| VLOG_QUERY << "GetCatalogs(): request=" << ThriftDebugString(request); |
| HS2_RETURN_IF_ERROR(return_val, CheckNotShuttingDown(), SQLSTATE_GENERAL_ERROR); |
| |
| TMetadataOpRequest req; |
| req.__set_opcode(TMetadataOpcode::GET_CATALOGS); |
| req.__set_get_catalogs_req(request); |
| |
| TOperationHandle handle; |
| thrift::TStatus status; |
| ExecuteMetadataOp(request.sessionHandle.sessionId, &req, &handle, &status); |
| handle.__set_operationType(TOperationType::GET_CATALOGS); |
| return_val.__set_operationHandle(handle); |
| return_val.__set_status(status); |
| |
| VLOG_QUERY << "GetCatalogs(): return_val=" << ThriftDebugString(return_val); |
| } |
| |
| void ImpalaServer::GetSchemas(TGetSchemasResp& return_val, |
| const TGetSchemasReq& request) { |
| VLOG_QUERY << "GetSchemas(): request=" << ThriftDebugString(request); |
| HS2_RETURN_IF_ERROR(return_val, CheckNotShuttingDown(), SQLSTATE_GENERAL_ERROR); |
| |
| TMetadataOpRequest req; |
| req.__set_opcode(TMetadataOpcode::GET_SCHEMAS); |
| req.__set_get_schemas_req(request); |
| |
| TOperationHandle handle; |
| thrift::TStatus status; |
| ExecuteMetadataOp(request.sessionHandle.sessionId, &req, &handle, &status); |
| handle.__set_operationType(TOperationType::GET_SCHEMAS); |
| return_val.__set_operationHandle(handle); |
| return_val.__set_status(status); |
| |
| VLOG_QUERY << "GetSchemas(): return_val=" << ThriftDebugString(return_val); |
| } |
| |
| void ImpalaServer::GetTables(TGetTablesResp& return_val, |
| const TGetTablesReq& request) { |
| VLOG_QUERY << "GetTables(): request=" << ThriftDebugString(request); |
| HS2_RETURN_IF_ERROR(return_val, CheckNotShuttingDown(), SQLSTATE_GENERAL_ERROR); |
| |
| TMetadataOpRequest req; |
| req.__set_opcode(TMetadataOpcode::GET_TABLES); |
| req.__set_get_tables_req(request); |
| |
| TOperationHandle handle; |
| thrift::TStatus status; |
| ExecuteMetadataOp(request.sessionHandle.sessionId, &req, &handle, &status); |
| handle.__set_operationType(TOperationType::GET_TABLES); |
| return_val.__set_operationHandle(handle); |
| return_val.__set_status(status); |
| |
| VLOG_QUERY << "GetTables(): return_val=" << ThriftDebugString(return_val); |
| } |
| |
| void ImpalaServer::GetTableTypes(TGetTableTypesResp& return_val, |
| const TGetTableTypesReq& request) { |
| VLOG_QUERY << "GetTableTypes(): request=" << ThriftDebugString(request); |
| HS2_RETURN_IF_ERROR(return_val, CheckNotShuttingDown(), SQLSTATE_GENERAL_ERROR); |
| |
| TMetadataOpRequest req; |
| req.__set_opcode(TMetadataOpcode::GET_TABLE_TYPES); |
| req.__set_get_table_types_req(request); |
| |
| TOperationHandle handle; |
| thrift::TStatus status; |
| ExecuteMetadataOp(request.sessionHandle.sessionId, &req, &handle, &status); |
| handle.__set_operationType(TOperationType::GET_TABLE_TYPES); |
| return_val.__set_operationHandle(handle); |
| return_val.__set_status(status); |
| |
| VLOG_QUERY << "GetTableTypes(): return_val=" << ThriftDebugString(return_val); |
| |
| } |
| |
| void ImpalaServer::GetColumns(TGetColumnsResp& return_val, |
| const TGetColumnsReq& request) { |
| VLOG_QUERY << "GetColumns(): request=" << ThriftDebugString(request); |
| HS2_RETURN_IF_ERROR(return_val, CheckNotShuttingDown(), SQLSTATE_GENERAL_ERROR); |
| |
| TMetadataOpRequest req; |
| req.__set_opcode(TMetadataOpcode::GET_COLUMNS); |
| req.__set_get_columns_req(request); |
| |
| TOperationHandle handle; |
| thrift::TStatus status; |
| ExecuteMetadataOp(request.sessionHandle.sessionId, &req, &handle, &status); |
| handle.__set_operationType(TOperationType::GET_COLUMNS); |
| return_val.__set_operationHandle(handle); |
| return_val.__set_status(status); |
| |
| VLOG_QUERY << "GetColumns(): return_val=" << ThriftDebugString(return_val); |
| } |
| |
| void ImpalaServer::GetFunctions(TGetFunctionsResp& return_val, |
| const TGetFunctionsReq& request) { |
| VLOG_QUERY << "GetFunctions(): request=" << ThriftDebugString(request); |
| HS2_RETURN_IF_ERROR(return_val, CheckNotShuttingDown(), SQLSTATE_GENERAL_ERROR); |
| |
| TMetadataOpRequest req; |
| req.__set_opcode(TMetadataOpcode::GET_FUNCTIONS); |
| req.__set_get_functions_req(request); |
| |
| TOperationHandle handle; |
| thrift::TStatus status; |
| ExecuteMetadataOp(request.sessionHandle.sessionId, &req, &handle, &status); |
| handle.__set_operationType(TOperationType::GET_FUNCTIONS); |
| return_val.__set_operationHandle(handle); |
| return_val.__set_status(status); |
| |
| VLOG_QUERY << "GetFunctions(): return_val=" << ThriftDebugString(return_val); |
| } |
| |
| void ImpalaServer::GetPrimaryKeys(TGetPrimaryKeysResp& return_val, |
| const TGetPrimaryKeysReq& request) { |
| VLOG_QUERY << "GetPrimaryKeys(): request=" << ThriftDebugString(request); |
| HS2_RETURN_IF_ERROR(return_val, CheckNotShuttingDown(), SQLSTATE_GENERAL_ERROR); |
| |
| TMetadataOpRequest req; |
| req.__set_opcode(TMetadataOpcode::GET_PRIMARY_KEYS); |
| req.__set_get_primary_keys_req(request); |
| |
| TOperationHandle handle; |
| thrift::TStatus status; |
| ExecuteMetadataOp(request.sessionHandle.sessionId, &req, &handle, &status); |
| return_val.__set_operationHandle(handle); |
| return_val.__set_status(status); |
| |
| VLOG_QUERY << "GetPrimaryKeys(): return_val=" << ThriftDebugString(return_val); |
| } |
| |
| void ImpalaServer::GetCrossReference(TGetCrossReferenceResp& return_val, |
| const TGetCrossReferenceReq& request) { |
| VLOG_QUERY << "GetCrossReference(): request=" << ThriftDebugString(request); |
| HS2_RETURN_IF_ERROR(return_val, CheckNotShuttingDown(), SQLSTATE_GENERAL_ERROR); |
| |
| TMetadataOpRequest req; |
| req.__set_opcode(TMetadataOpcode::GET_CROSS_REFERENCE); |
| req.__set_get_cross_reference_req(request); |
| |
| TOperationHandle handle; |
| thrift::TStatus status; |
| ExecuteMetadataOp(request.sessionHandle.sessionId, &req, &handle, &status); |
| return_val.__set_operationHandle(handle); |
| return_val.__set_status(status); |
| |
| VLOG_QUERY << "GetCrossReference(): return_val=" << ThriftDebugString(return_val); |
| } |
| |
| void ImpalaServer::GetOperationStatus(TGetOperationStatusResp& return_val, |
| const TGetOperationStatusReq& request) { |
| if (request.operationHandle.operationId.guid.size() == 0) { |
| // An empty operation handle identifier means no execution and no result for this |
| // query (USE <database>). |
| VLOG_ROW << "GetOperationStatus(): guid size 0"; |
| return_val.operationState = TOperationState::FINISHED_STATE; |
| return_val.status.__set_statusCode(thrift::TStatusCode::SUCCESS_STATUS); |
| return; |
| } |
| |
| // Secret is inherited from session. |
| TUniqueId query_id; |
| TUniqueId op_secret; |
| HS2_RETURN_IF_ERROR(return_val, THandleIdentifierToTUniqueId( |
| request.operationHandle.operationId, &query_id, &op_secret), |
| SQLSTATE_GENERAL_ERROR); |
| VLOG_ROW << "GetOperationStatus(): query_id=" << PrintId(query_id); |
| |
| shared_ptr<ClientRequestState> request_state = GetClientRequestState(query_id); |
| if (UNLIKELY(request_state.get() == nullptr)) { |
| // No handle was found |
| HS2_RETURN_ERROR(return_val, |
| Substitute("Invalid query handle: $0", PrintId(query_id)), SQLSTATE_GENERAL_ERROR); |
| } |
| |
| ScopedSessionState session_handle(this); |
| const TUniqueId session_id = request_state->session_id(); |
| shared_ptr<SessionState> session; |
| HS2_RETURN_IF_ERROR(return_val, |
| session_handle.WithSession( |
| session_id, SecretArg::Operation(op_secret, query_id), &session), |
| SQLSTATE_GENERAL_ERROR); |
| |
| { |
| lock_guard<mutex> l(*request_state->lock()); |
| TOperationState::type operation_state = request_state->operation_state(); |
| return_val.__set_operationState(operation_state); |
| if (operation_state == TOperationState::ERROR_STATE) { |
| DCHECK(!request_state->query_status().ok()); |
| return_val.__set_errorMessage(request_state->query_status().GetDetail()); |
| return_val.__set_sqlState(SQLSTATE_GENERAL_ERROR); |
| } else { |
| DCHECK(request_state->query_status().ok()); |
| } |
| } |
| } |
| |
| void ImpalaServer::CancelOperation(TCancelOperationResp& return_val, |
| const TCancelOperationReq& request) { |
| TUniqueId query_id; |
| TUniqueId op_secret; |
| HS2_RETURN_IF_ERROR(return_val, THandleIdentifierToTUniqueId( |
| request.operationHandle.operationId, &query_id, &op_secret), |
| SQLSTATE_GENERAL_ERROR); |
| VLOG_QUERY << "CancelOperation(): query_id=" << PrintId(query_id); |
| |
| shared_ptr<ClientRequestState> request_state = GetClientRequestState(query_id); |
| if (UNLIKELY(request_state.get() == nullptr)) { |
| // No handle was found |
| HS2_RETURN_ERROR(return_val, |
| Substitute("Invalid query handle: $0", PrintId(query_id)), SQLSTATE_GENERAL_ERROR); |
| } |
| ScopedSessionState session_handle(this); |
| const TUniqueId session_id = request_state->session_id(); |
| HS2_RETURN_IF_ERROR(return_val, |
| session_handle.WithSession(session_id, SecretArg::Operation(op_secret, query_id)), |
| SQLSTATE_GENERAL_ERROR); |
| HS2_RETURN_IF_ERROR(return_val, CancelInternal(query_id, true), SQLSTATE_GENERAL_ERROR); |
| return_val.status.__set_statusCode(thrift::TStatusCode::SUCCESS_STATUS); |
| } |
| |
| void ImpalaServer::CloseOperation(TCloseOperationResp& return_val, |
| const TCloseOperationReq& request) { |
| TCloseImpalaOperationReq request2; |
| request2.operationHandle = request.operationHandle; |
| TCloseImpalaOperationResp tmp_resp; |
| CloseImpalaOperation(tmp_resp, request2); |
| return_val.status = tmp_resp.status; |
| } |
| |
| void ImpalaServer::CloseImpalaOperation(TCloseImpalaOperationResp& return_val, |
| const TCloseImpalaOperationReq& request) { |
| TUniqueId query_id; |
| TUniqueId op_secret; |
| HS2_RETURN_IF_ERROR(return_val, THandleIdentifierToTUniqueId( |
| request.operationHandle.operationId, &query_id, &op_secret), |
| SQLSTATE_GENERAL_ERROR); |
| VLOG_QUERY << "CloseOperation(): query_id=" << PrintId(query_id); |
| |
| shared_ptr<ClientRequestState> request_state = GetClientRequestState(query_id); |
| if (UNLIKELY(request_state.get() == nullptr)) { |
| // No handle was found |
| HS2_RETURN_ERROR(return_val, |
| Substitute("Invalid query handle: $0", PrintId(query_id)), SQLSTATE_GENERAL_ERROR); |
| } |
| ScopedSessionState session_handle(this); |
| const TUniqueId session_id = request_state->session_id(); |
| HS2_RETURN_IF_ERROR(return_val, |
| session_handle.WithSession(session_id, SecretArg::Operation(op_secret, query_id)), |
| SQLSTATE_GENERAL_ERROR); |
| if (request_state->stmt_type() == TStmtType::DML) { |
| Status query_status; |
| if (request_state->GetDmlStats(&return_val.dml_result, &query_status)) { |
| return_val.__isset.dml_result = true; |
| } |
| } |
| |
| // TODO: use timeout to get rid of unwanted request_state. |
| HS2_RETURN_IF_ERROR(return_val, UnregisterQuery(query_id, true), |
| SQLSTATE_GENERAL_ERROR); |
| return_val.status.__set_statusCode(thrift::TStatusCode::SUCCESS_STATUS); |
| } |
| |
| void ImpalaServer::GetResultSetMetadata(TGetResultSetMetadataResp& return_val, |
| const TGetResultSetMetadataReq& request) { |
| // Convert Operation id to TUniqueId and get the query exec state. |
| TUniqueId query_id; |
| TUniqueId op_secret; |
| HS2_RETURN_IF_ERROR(return_val, THandleIdentifierToTUniqueId( |
| request.operationHandle.operationId, &query_id, &op_secret), |
| SQLSTATE_GENERAL_ERROR); |
| VLOG_QUERY << "GetResultSetMetadata(): query_id=" << PrintId(query_id); |
| |
| shared_ptr<ClientRequestState> request_state = GetClientRequestState(query_id); |
| if (UNLIKELY(request_state.get() == nullptr)) { |
| VLOG_QUERY << "GetResultSetMetadata(): invalid query handle"; |
| // No handle was found |
| HS2_RETURN_ERROR(return_val, |
| Substitute("Invalid query handle: $0", PrintId(query_id)), SQLSTATE_GENERAL_ERROR); |
| } |
| ScopedSessionState session_handle(this); |
| const TUniqueId session_id = request_state->session_id(); |
| HS2_RETURN_IF_ERROR(return_val, |
| session_handle.WithSession(session_id, SecretArg::Operation(op_secret, query_id)), |
| SQLSTATE_GENERAL_ERROR); |
| { |
| lock_guard<mutex> l(*request_state->lock()); |
| |
| // Convert TResultSetMetadata to TGetResultSetMetadataResp |
| const TResultSetMetadata* result_set_md = request_state->result_metadata(); |
| DCHECK(result_set_md != NULL); |
| if (result_set_md->columns.size() > 0) { |
| return_val.__isset.schema = true; |
| return_val.schema.columns.resize(result_set_md->columns.size()); |
| for (int i = 0; i < result_set_md->columns.size(); ++i) { |
| return_val.schema.columns[i].__set_columnName( |
| result_set_md->columns[i].columnName); |
| return_val.schema.columns[i].position = i; |
| return_val.schema.columns[i].typeDesc.types.resize(1); |
| ColumnType t = ColumnType::FromThrift(result_set_md->columns[i].columnType); |
| return_val.schema.columns[i].typeDesc.types[0] = t.ToHs2Type(); |
| } |
| } |
| } |
| |
| return_val.status.__set_statusCode(thrift::TStatusCode::SUCCESS_STATUS); |
| VLOG_QUERY << "GetResultSetMetadata(): return_val=" << ThriftDebugString(return_val); |
| } |
| |
| void ImpalaServer::FetchResults(TFetchResultsResp& return_val, |
| const TFetchResultsReq& request) { |
| if (request.orientation != TFetchOrientation::FETCH_NEXT |
| && request.orientation != TFetchOrientation::FETCH_FIRST) { |
| HS2_RETURN_ERROR(return_val, "Unsupported operation", |
| SQLSTATE_OPTIONAL_FEATURE_NOT_IMPLEMENTED); |
| } |
| bool fetch_first = request.orientation == TFetchOrientation::FETCH_FIRST; |
| |
| // Convert Operation id to TUniqueId and get the query exec state. |
| TUniqueId query_id; |
| TUniqueId op_secret; |
| HS2_RETURN_IF_ERROR(return_val, THandleIdentifierToTUniqueId( |
| request.operationHandle.operationId, &query_id, &op_secret), |
| SQLSTATE_GENERAL_ERROR); |
| VLOG_ROW << "FetchResults(): query_id=" << PrintId(query_id) |
| << " fetch_size=" << request.maxRows; |
| |
| shared_ptr<ClientRequestState> request_state = GetClientRequestState(query_id); |
| if (UNLIKELY(request_state == nullptr)) { |
| string err_msg = Substitute("Invalid query handle: $0", PrintId(query_id)); |
| VLOG(1) << err_msg; |
| HS2_RETURN_ERROR(return_val, err_msg, SQLSTATE_GENERAL_ERROR); |
| } |
| |
| // Validate the secret and keep the session that originated the query alive. |
| ScopedSessionState session_handle(this); |
| const TUniqueId session_id = request_state->session_id(); |
| shared_ptr<SessionState> session; |
| HS2_RETURN_IF_ERROR(return_val, |
| session_handle.WithSession( |
| session_id, SecretArg::Operation(op_secret, query_id), &session), |
| SQLSTATE_GENERAL_ERROR); |
| |
| int32_t num_results = 0; |
| Status status = FetchInternal(request_state.get(), session.get(), request.maxRows, |
| fetch_first, &return_val, &num_results); |
| |
| VLOG_ROW << "FetchResults(): query_id=" << PrintId(query_id) |
| << " #results=" << num_results |
| << " has_more=" << (return_val.hasMoreRows ? "true" : "false"); |
| if (!status.ok()) { |
| // Only unregister the query if the underlying error is unrecoverable. |
| // Clients are expected to understand that a failed FETCH_FIRST is recoverable, |
| // and hence, the query must eventually be closed by the client. |
| // It is important to ensure FETCH_NEXT does not return recoverable errors to |
| // preserve compatibility with clients written against Impala versions < 1.3. |
| if (status.IsRecoverableError()) { |
| DCHECK(fetch_first); |
| } else { |
| discard_result(UnregisterQuery(query_id, false, &status)); |
| } |
| HS2_RETURN_ERROR(return_val, status.GetDetail(), SQLSTATE_GENERAL_ERROR); |
| return_val.status.__set_statusCode(thrift::TStatusCode::SUCCESS_STATUS); |
| } |
| } |
| |
| void ImpalaServer::GetLog(TGetLogResp& return_val, const TGetLogReq& request) { |
| TUniqueId query_id; |
| TUniqueId op_secret; |
| HS2_RETURN_IF_ERROR(return_val, THandleIdentifierToTUniqueId( |
| request.operationHandle.operationId, &query_id, &op_secret), |
| SQLSTATE_GENERAL_ERROR); |
| |
| shared_ptr<ClientRequestState> request_state = GetClientRequestState(query_id); |
| if (UNLIKELY(request_state.get() == nullptr)) { |
| // No handle was found |
| HS2_RETURN_ERROR(return_val, |
| Substitute("Invalid query handle: $0", PrintId(query_id)), SQLSTATE_GENERAL_ERROR); |
| } |
| |
| // GetLog doesn't have an associated session handle, so we presume that this request |
| // should keep alive the same session that orignated the query. |
| ScopedSessionState session_handle(this); |
| const TUniqueId session_id = request_state->session_id(); |
| HS2_RETURN_IF_ERROR(return_val, |
| session_handle.WithSession(session_id, SecretArg::Operation(op_secret, query_id)), |
| SQLSTATE_GENERAL_ERROR); |
| |
| stringstream ss; |
| Coordinator* coord = request_state->GetCoordinator(); |
| if (coord != nullptr) { |
| // Report progress |
| ss << coord->progress().ToString() << "\n"; |
| } |
| // Report the query status, if the query failed. |
| { |
| // Take the lock to ensure that if the client sees a query_state == EXCEPTION, it is |
| // guaranteed to see the error query_status. |
| lock_guard<mutex> l(*request_state->lock()); |
| Status query_status = request_state->query_status(); |
| DCHECK_EQ(request_state->operation_state() == TOperationState::ERROR_STATE, |
| !query_status.ok()); |
| // If the query status is !ok, include the status error message at the top of the log. |
| if (!query_status.ok()) ss << query_status.GetDetail(); |
| } |
| |
| // Report analysis errors |
| ss << join(request_state->GetAnalysisWarnings(), "\n"); |
| // Report queuing reason if the admission controller queued the query. |
| const string* admission_result = request_state->summary_profile()->GetInfoString( |
| AdmissionController::PROFILE_INFO_KEY_ADMISSION_RESULT); |
| if (admission_result != nullptr) { |
| if (*admission_result == AdmissionController::PROFILE_INFO_VAL_QUEUED) { |
| ss << AdmissionController::PROFILE_INFO_KEY_ADMISSION_RESULT << " : " |
| << *admission_result << "\n"; |
| const string* queued_reason = request_state->summary_profile()->GetInfoString( |
| AdmissionController::PROFILE_INFO_KEY_LAST_QUEUED_REASON); |
| if (queued_reason != nullptr) { |
| ss << AdmissionController::PROFILE_INFO_KEY_LAST_QUEUED_REASON << " : " |
| << *queued_reason << "\n"; |
| } |
| } |
| } |
| if (coord != nullptr) { |
| // Report execution errors |
| ss << coord->GetErrorLog(); |
| } |
| return_val.log = ss.str(); |
| return_val.status.__set_statusCode(thrift::TStatusCode::SUCCESS_STATUS); |
| } |
| |
| void ImpalaServer::GetExecSummary(TGetExecSummaryResp& return_val, |
| const TGetExecSummaryReq& request) { |
| TUniqueId session_id; |
| TUniqueId secret; |
| HS2_RETURN_IF_ERROR(return_val, THandleIdentifierToTUniqueId( |
| request.sessionHandle.sessionId, &session_id, &secret), |
| SQLSTATE_GENERAL_ERROR); |
| ScopedSessionState session_handle(this); |
| shared_ptr<SessionState> session; |
| HS2_RETURN_IF_ERROR(return_val, |
| session_handle.WithSession( |
| session_id, SecretArg::Session(secret), &session), |
| SQLSTATE_GENERAL_ERROR); |
| if (session == NULL) { |
| HS2_RETURN_ERROR(return_val, Substitute("Invalid session id: $0", |
| PrintId(session_id)), SQLSTATE_GENERAL_ERROR); |
| } |
| |
| TUniqueId query_id, op_secret; |
| HS2_RETURN_IF_ERROR(return_val, |
| THandleIdentifierToTUniqueId( |
| request.operationHandle.operationId, &query_id, &op_secret), |
| SQLSTATE_GENERAL_ERROR); |
| |
| TExecSummary summary; |
| Status status = GetExecSummary(query_id, GetEffectiveUser(*session), &summary); |
| HS2_RETURN_IF_ERROR(return_val, status, SQLSTATE_GENERAL_ERROR); |
| return_val.__set_summary(summary); |
| return_val.status.__set_statusCode(thrift::TStatusCode::SUCCESS_STATUS); |
| } |
| |
| void ImpalaServer::GetRuntimeProfile( |
| TGetRuntimeProfileResp& return_val, const TGetRuntimeProfileReq& request) { |
| TUniqueId session_id; |
| TUniqueId secret; |
| HS2_RETURN_IF_ERROR(return_val, THandleIdentifierToTUniqueId( |
| request.sessionHandle.sessionId, &session_id, &secret), SQLSTATE_GENERAL_ERROR); |
| ScopedSessionState session_handle(this); |
| shared_ptr<SessionState> session; |
| HS2_RETURN_IF_ERROR(return_val, |
| session_handle.WithSession( |
| session_id, SecretArg::Session(secret), &session), |
| SQLSTATE_GENERAL_ERROR); |
| if (session == NULL) { |
| HS2_RETURN_ERROR(return_val, Substitute("Invalid session id: $0", |
| PrintId(session_id)), SQLSTATE_GENERAL_ERROR); |
| } |
| |
| TUniqueId query_id, op_secret; |
| HS2_RETURN_IF_ERROR(return_val, THandleIdentifierToTUniqueId( |
| request.operationHandle.operationId, &query_id, &op_secret), |
| SQLSTATE_GENERAL_ERROR); |
| |
| stringstream ss; |
| TRuntimeProfileTree thrift_profile; |
| rapidjson::Document json_profile(rapidjson::kObjectType); |
| HS2_RETURN_IF_ERROR(return_val, |
| GetRuntimeProfileOutput(query_id, GetEffectiveUser(*session), request.format, &ss, |
| &thrift_profile, &json_profile), |
| SQLSTATE_GENERAL_ERROR); |
| if (request.format == TRuntimeProfileFormat::THRIFT) { |
| return_val.__set_thrift_profile(thrift_profile); |
| } else if (request.format == TRuntimeProfileFormat::JSON) { |
| rapidjson::StringBuffer sb; |
| rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(sb); |
| json_profile.Accept(writer); |
| ss << sb.GetString(); |
| return_val.__set_profile(ss.str()); |
| } else { |
| DCHECK(request.format == TRuntimeProfileFormat::STRING |
| || request.format == TRuntimeProfileFormat::BASE64); |
| return_val.__set_profile(ss.str()); |
| } |
| return_val.status.__set_statusCode(thrift::TStatusCode::SUCCESS_STATUS); |
| } |
| |
| void ImpalaServer::GetDelegationToken(TGetDelegationTokenResp& return_val, |
| const TGetDelegationTokenReq& req) { |
| return_val.status.__set_statusCode(thrift::TStatusCode::ERROR_STATUS); |
| return_val.status.__set_errorMessage("Not implemented"); |
| } |
| |
| void ImpalaServer::CancelDelegationToken(TCancelDelegationTokenResp& return_val, |
| const TCancelDelegationTokenReq& req) { |
| return_val.status.__set_statusCode(thrift::TStatusCode::ERROR_STATUS); |
| return_val.status.__set_errorMessage("Not implemented"); |
| } |
| |
| void ImpalaServer::RenewDelegationToken(TRenewDelegationTokenResp& return_val, |
| const TRenewDelegationTokenReq& req) { |
| return_val.status.__set_statusCode(thrift::TStatusCode::ERROR_STATUS); |
| return_val.status.__set_errorMessage("Not implemented"); |
| } |
| |
| void ImpalaServer::AddSessionToConnection( |
| const TUniqueId& session_id, SessionState* session) { |
| const TUniqueId& connection_id = ThriftServer::GetThreadConnectionId(); |
| { |
| boost::lock_guard<boost::mutex> l(connection_to_sessions_map_lock_); |
| connection_to_sessions_map_[connection_id].insert(session_id); |
| } |
| |
| boost::lock_guard<boost::mutex> session_lock(session->lock); |
| if (session->connections.empty()) { |
| // This session was previously disconnected but now has an associated |
| // connection. It should no longer be considered for the disconnected timeout. |
| UnregisterSessionTimeout(FLAGS_disconnected_session_timeout); |
| } |
| session->connections.insert(connection_id); |
| } |
| |
| void ImpalaServer::PingImpalaHS2Service(TPingImpalaHS2ServiceResp& return_val, |
| const TPingImpalaHS2ServiceReq& req) { |
| VLOG_QUERY << "PingImpalaHS2Service(): request=" << ThriftDebugString(req); |
| TUniqueId session_id; |
| TUniqueId secret; |
| HS2_RETURN_IF_ERROR(return_val, |
| THandleIdentifierToTUniqueId(req.sessionHandle.sessionId, &session_id, &secret), |
| SQLSTATE_GENERAL_ERROR); |
| ScopedSessionState session_handle(this); |
| shared_ptr<SessionState> session; |
| HS2_RETURN_IF_ERROR(return_val, |
| session_handle.WithSession(session_id, SecretArg::Session(secret), &session), |
| SQLSTATE_GENERAL_ERROR); |
| if (session == NULL) { |
| HS2_RETURN_ERROR(return_val, |
| Substitute("Invalid session id: $0", PrintId(session_id)), |
| SQLSTATE_GENERAL_ERROR); |
| } |
| |
| return_val.__set_version(GetVersionString(true)); |
| return_val.__set_webserver_address(ExecEnv::GetInstance()->webserver()->url()); |
| VLOG_RPC << "PingImpalaHS2Service(): return_val=" << ThriftDebugString(return_val); |
| } |
| } |