blob: 4304ce3b5a7a428f1a957b6d61e72cefe6ffe8f8 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "service/impala-server.h"
#include "common/logging.h"
#include "gen-cpp/Frontend_types.h"
#include "rpc/thrift-util.h"
#include "runtime/coordinator.h"
#include "runtime/exec-env.h"
#include "runtime/raw-value.inline.h"
#include "runtime/timestamp-value.h"
#include "service/client-request-state.h"
#include "service/frontend.h"
#include "service/query-options.h"
#include "service/query-result-set.h"
#include "util/auth-util.h"
#include "util/webserver.h"
#include "util/runtime-profile.h"
#include "util/runtime-profile-counters.h"
#include "common/names.h"
using namespace apache::thrift;
using namespace apache::hive::service::cli::thrift;
using namespace beeswax;
#define RAISE_IF_ERROR(stmt, ex_type) \
do { \
const Status& _status = (stmt); \
if (UNLIKELY(!_status.ok())) { \
RaiseBeeswaxException(_status.GetDetail(), ex_type); \
} \
} while (false)
namespace impala {
void ImpalaServer::query(QueryHandle& query_handle, const Query& query) {
VLOG_QUERY << "query(): query=" << query.query;
RAISE_IF_ERROR(CheckNotShuttingDown(), SQLSTATE_GENERAL_ERROR);
ScopedSessionState session_handle(this);
shared_ptr<SessionState> session;
RAISE_IF_ERROR(
session_handle.WithBeeswaxSession(ThriftServer::GetThreadConnectionId(), &session),
SQLSTATE_GENERAL_ERROR);
TQueryCtx query_ctx;
// raise general error for request conversion error;
RAISE_IF_ERROR(QueryToTQueryContext(query, &query_ctx), SQLSTATE_GENERAL_ERROR);
// raise Syntax error or access violation; it's likely to be syntax/analysis error
// TODO: that may not be true; fix this
shared_ptr<ClientRequestState> request_state;
RAISE_IF_ERROR(Execute(&query_ctx, session, &request_state),
SQLSTATE_SYNTAX_ERROR_OR_ACCESS_VIOLATION);
// start thread to wait for results to become available, which will allow
// us to advance query state to FINISHED or EXCEPTION
Status status = request_state->WaitAsync();
if (!status.ok()) {
discard_result(UnregisterQuery(request_state->query_id(), false, &status));
RaiseBeeswaxException(status.GetDetail(), SQLSTATE_GENERAL_ERROR);
}
// Once the query is running do a final check for session closure and add it to the
// set of in-flight queries.
status = SetQueryInflight(session, request_state);
if (!status.ok()) {
discard_result(UnregisterQuery(request_state->query_id(), false, &status));
RaiseBeeswaxException(status.GetDetail(), SQLSTATE_GENERAL_ERROR);
}
TUniqueIdToQueryHandle(request_state->query_id(), &query_handle);
}
void ImpalaServer::executeAndWait(QueryHandle& query_handle, const Query& query,
const LogContextId& client_ctx) {
VLOG_QUERY << "executeAndWait(): query=" << query.query;
RAISE_IF_ERROR(CheckNotShuttingDown(), SQLSTATE_GENERAL_ERROR);
ScopedSessionState session_handle(this);
shared_ptr<SessionState> session;
RAISE_IF_ERROR(
session_handle.WithBeeswaxSession(ThriftServer::GetThreadConnectionId(), &session),
SQLSTATE_GENERAL_ERROR);
TQueryCtx query_ctx;
// raise general error for request conversion error;
RAISE_IF_ERROR(QueryToTQueryContext(query, &query_ctx), SQLSTATE_GENERAL_ERROR);
shared_ptr<ClientRequestState> request_state;
DCHECK(session != nullptr); // The session should exist.
{
// The session is created when the client connects. Depending on the underlying
// transport, the username may be known at that time. If the username hasn't been set
// yet, set it now.
lock_guard<mutex> l(session->lock);
if (session->connected_user.empty()) session->connected_user = query.hadoop_user;
}
// raise Syntax error or access violation; it's likely to be syntax/analysis error
// TODO: that may not be true; fix this
RAISE_IF_ERROR(Execute(&query_ctx, session, &request_state),
SQLSTATE_SYNTAX_ERROR_OR_ACCESS_VIOLATION);
// Once the query is running do a final check for session closure and add it to the
// set of in-flight queries.
Status status = SetQueryInflight(session, request_state);
if (!status.ok()) {
discard_result(UnregisterQuery(request_state->query_id(), false, &status));
RaiseBeeswaxException(status.GetDetail(), SQLSTATE_GENERAL_ERROR);
}
// block until results are ready
request_state->Wait();
{
lock_guard<mutex> l(*request_state->lock());
status = request_state->query_status();
}
if (!status.ok()) {
discard_result(UnregisterQuery(request_state->query_id(), false, &status));
RaiseBeeswaxException(status.GetDetail(), SQLSTATE_GENERAL_ERROR);
}
TUniqueIdToQueryHandle(request_state->query_id(), &query_handle);
// If the input log context id is an empty string, then create a new number and
// set it to _return. Otherwise, set _return with the input log context
query_handle.log_context = client_ctx.empty() ? query_handle.id : client_ctx;
}
void ImpalaServer::explain(QueryExplanation& query_explanation, const Query& query) {
// Translate Beeswax Query to Impala's QueryRequest and then set the explain plan bool
// before shipping to FE
VLOG_QUERY << "explain(): query=" << query.query;
RAISE_IF_ERROR(CheckNotShuttingDown(), SQLSTATE_GENERAL_ERROR);
ScopedSessionState session_handle(this);
RAISE_IF_ERROR(session_handle.WithBeeswaxSession(ThriftServer::GetThreadConnectionId()),
SQLSTATE_GENERAL_ERROR);
TQueryCtx query_ctx;
RAISE_IF_ERROR(QueryToTQueryContext(query, &query_ctx), SQLSTATE_GENERAL_ERROR);
RAISE_IF_ERROR(
exec_env_->frontend()->GetExplainPlan(query_ctx, &query_explanation.textual),
SQLSTATE_SYNTAX_ERROR_OR_ACCESS_VIOLATION);
query_explanation.__isset.textual = true;
VLOG_QUERY << "explain():\nstmt=" << query_ctx.client_request.stmt
<< "\nplan: " << query_explanation.textual;
}
void ImpalaServer::fetch(Results& query_results, const QueryHandle& query_handle,
const bool start_over, const int32_t fetch_size) {
ScopedSessionState session_handle(this);
shared_ptr<SessionState> session;
RAISE_IF_ERROR(session_handle.WithBeeswaxSession(
ThriftServer::GetThreadConnectionId(), &session), SQLSTATE_GENERAL_ERROR);
if (start_over) {
// We can't start over. Raise "Optional feature not implemented"
RaiseBeeswaxException(
"Does not support start over", SQLSTATE_OPTIONAL_FEATURE_NOT_IMPLEMENTED);
}
TUniqueId query_id;
QueryHandleToTUniqueId(query_handle, &query_id);
VLOG_ROW << "fetch(): query_id=" << PrintId(query_id) << " fetch_size=" << fetch_size;
shared_ptr<ClientRequestState> request_state = GetClientRequestState(query_id);
if (UNLIKELY(request_state == nullptr)) {
string err_msg = Substitute("Invalid query handle: $0", PrintId(query_id));
VLOG(1) << err_msg;
RaiseBeeswaxException(err_msg, SQLSTATE_GENERAL_ERROR);
}
// Validate that query can be accessed by user.
RAISE_IF_ERROR(CheckClientRequestSession(session.get(), request_state->effective_user(),
query_id), SQLSTATE_GENERAL_ERROR);
Status status =
FetchInternal(request_state.get(), start_over, fetch_size, &query_results);
VLOG_ROW << "fetch result: #results=" << query_results.data.size()
<< " has_more=" << (query_results.has_more ? "true" : "false");
if (!status.ok()) {
discard_result(UnregisterQuery(query_id, false, &status));
RaiseBeeswaxException(status.GetDetail(), SQLSTATE_GENERAL_ERROR);
}
}
// TODO: Handle complex types.
void ImpalaServer::get_results_metadata(ResultsMetadata& results_metadata,
const QueryHandle& handle) {
ScopedSessionState session_handle(this);
shared_ptr<SessionState> session;
RAISE_IF_ERROR(session_handle.WithBeeswaxSession(
ThriftServer::GetThreadConnectionId(), &session), SQLSTATE_GENERAL_ERROR);
// Convert QueryHandle to TUniqueId and get the query exec state.
TUniqueId query_id;
QueryHandleToTUniqueId(handle, &query_id);
VLOG_QUERY << "get_results_metadata(): query_id=" << PrintId(query_id);
shared_ptr<ClientRequestState> request_state = GetClientRequestState(query_id);
if (UNLIKELY(request_state.get() == nullptr)) {
RaiseBeeswaxException(Substitute("Invalid query handle: $0", PrintId(query_id)),
SQLSTATE_GENERAL_ERROR);
}
// Validate that query can be accessed by user.
RAISE_IF_ERROR(CheckClientRequestSession(session.get(), request_state->effective_user(),
query_id), SQLSTATE_GENERAL_ERROR);
{
lock_guard<mutex> l(*request_state->lock());
// Convert TResultSetMetadata to Beeswax.ResultsMetadata
const TResultSetMetadata* result_set_md = request_state->result_metadata();
results_metadata.__isset.schema = true;
results_metadata.schema.__isset.fieldSchemas = true;
results_metadata.schema.fieldSchemas.resize(result_set_md->columns.size());
for (int i = 0; i < results_metadata.schema.fieldSchemas.size(); ++i) {
const TColumnType& type = result_set_md->columns[i].columnType;
DCHECK_EQ(1, type.types.size());
DCHECK_EQ(TTypeNodeType::SCALAR, type.types[0].type);
DCHECK(type.types[0].__isset.scalar_type);
TPrimitiveType::type col_type = type.types[0].scalar_type.type;
results_metadata.schema.fieldSchemas[i].__set_type(
TypeToOdbcString(ThriftToType(col_type)));
// Fill column name
results_metadata.schema.fieldSchemas[i].__set_name(
result_set_md->columns[i].columnName);
}
}
// ODBC-187 - ODBC can only take "\t" as the delimiter and ignores whatever is set here.
results_metadata.__set_delim("\t");
// results_metadata.table_dir and in_tablename are not applicable.
}
void ImpalaServer::close(const QueryHandle& handle) {
ScopedSessionState session_handle(this);
RAISE_IF_ERROR(session_handle.WithBeeswaxSession(ThriftServer::GetThreadConnectionId()),
SQLSTATE_GENERAL_ERROR);
TUniqueId query_id;
QueryHandleToTUniqueId(handle, &query_id);
// Impala-shell and administrative tools can call this from a different connection,
// e.g. to allow an admin to force-terminate queries. We should allow the operation to
// proceed without validating the session/query relation so that workflows don't
// get broken. In future we could check that the users match OR that the user has
// admin priviliges on the server.
VLOG_QUERY << "close(): query_id=" << PrintId(query_id);
// TODO: do we need to raise an exception if the query state is EXCEPTION?
// TODO: use timeout to get rid of unwanted request_state.
RAISE_IF_ERROR(UnregisterQuery(query_id, true), SQLSTATE_GENERAL_ERROR);
}
beeswax::QueryState::type ImpalaServer::get_state(const QueryHandle& handle) {
ScopedSessionState session_handle(this);
shared_ptr<SessionState> session;
RAISE_IF_ERROR(session_handle.WithBeeswaxSession(
ThriftServer::GetThreadConnectionId(), &session), SQLSTATE_GENERAL_ERROR);
TUniqueId query_id;
QueryHandleToTUniqueId(handle, &query_id);
VLOG_ROW << "get_state(): query_id=" << PrintId(query_id);
shared_ptr<ClientRequestState> request_state = GetClientRequestState(query_id);
if (UNLIKELY(request_state == nullptr)) {
VLOG_QUERY << "ImpalaServer::get_state invalid handle";
RaiseBeeswaxException(Substitute("Invalid query handle: $0", PrintId(query_id)),
SQLSTATE_GENERAL_ERROR);
}
// Validate that query can be accessed by user.
RAISE_IF_ERROR(CheckClientRequestSession(session.get(), request_state->effective_user(),
query_id), SQLSTATE_GENERAL_ERROR);
// Take the lock to ensure that if the client sees a query_state == EXCEPTION, it is
// guaranteed to see the error query_status.
lock_guard<mutex> l(*request_state->lock());
beeswax::QueryState::type query_state = request_state->BeeswaxQueryState();
DCHECK_EQ(query_state == beeswax::QueryState::EXCEPTION,
!request_state->query_status().ok());
return query_state;
}
void ImpalaServer::echo(string& echo_string, const string& input_string) {
ScopedSessionState session_handle(this);
RAISE_IF_ERROR(session_handle.WithBeeswaxSession(ThriftServer::GetThreadConnectionId()),
SQLSTATE_GENERAL_ERROR);
echo_string = input_string;
}
void ImpalaServer::clean(const LogContextId& log_context) {
}
void ImpalaServer::get_log(string& log, const LogContextId& context) {
shared_ptr<SessionState> session;
ScopedSessionState session_handle(this);
RAISE_IF_ERROR(session_handle.WithBeeswaxSession(
ThriftServer::GetThreadConnectionId(), &session), SQLSTATE_GENERAL_ERROR);
// LogContextId is the same as QueryHandle.id
QueryHandle handle;
handle.__set_id(context);
TUniqueId query_id;
QueryHandleToTUniqueId(handle, &query_id);
shared_ptr<ClientRequestState> request_state = GetClientRequestState(query_id);
if (request_state.get() == nullptr) {
stringstream str;
str << "unknown query id: " << PrintId(query_id);
LOG(ERROR) << str.str();
return;
}
// Validate that query can be accessed by user.
RAISE_IF_ERROR(CheckClientRequestSession(session.get(), request_state->effective_user(),
query_id), SQLSTATE_GENERAL_ERROR);
stringstream error_log_ss;
{
// Take the lock to ensure that if the client sees a query_state == EXCEPTION, it is
// guaranteed to see the error query_status.
lock_guard<mutex> l(*request_state->lock());
DCHECK_EQ(request_state->BeeswaxQueryState() == beeswax::QueryState::EXCEPTION,
!request_state->query_status().ok());
// If the query status is !ok, include the status error message at the top of the log.
if (!request_state->query_status().ok()) {
error_log_ss << request_state->query_status().GetDetail() << "\n";
}
}
// Add warnings from analysis
for (const string& warning : request_state->GetAnalysisWarnings()) {
error_log_ss << warning << "\n";
}
// Add warnings from execution
if (request_state->GetCoordinator() != nullptr) {
const std::string coord_errors = request_state->GetCoordinator()->GetErrorLog();
if (!coord_errors.empty()) error_log_ss << coord_errors << "\n";
}
log = error_log_ss.str();
}
void ImpalaServer::get_default_configuration(vector<ConfigVariable>& configurations,
const bool include_hadoop) {
ScopedSessionState session_handle(this);
RAISE_IF_ERROR(session_handle.WithBeeswaxSession(ThriftServer::GetThreadConnectionId()),
SQLSTATE_GENERAL_ERROR);
configurations.insert(configurations.end(), default_configs_.begin(),
default_configs_.end());
}
void ImpalaServer::dump_config(string& config) {
ScopedSessionState session_handle(this);
RAISE_IF_ERROR(session_handle.WithBeeswaxSession(ThriftServer::GetThreadConnectionId()),
SQLSTATE_GENERAL_ERROR);
config = "";
}
void ImpalaServer::Cancel(impala::TStatus& tstatus,
const beeswax::QueryHandle& query_handle) {
ScopedSessionState session_handle(this);
RAISE_IF_ERROR(session_handle.WithBeeswaxSession(ThriftServer::GetThreadConnectionId()),
SQLSTATE_GENERAL_ERROR);
// Convert QueryHandle to TUniqueId and get the query exec state.
TUniqueId query_id;
QueryHandleToTUniqueId(query_handle, &query_id);
// Impala-shell and administrative tools can call this from a different connection,
// e.g. to allow an admin to force-terminate queries. We should allow the operation to
// proceed without validating the session/query relation so that workflows don't
// get broken. In future we could check that the users match OR that the user has
// admin priviliges on the server.
RAISE_IF_ERROR(CancelInternal(query_id, true), SQLSTATE_GENERAL_ERROR);
tstatus.status_code = TErrorCode::OK;
}
void ImpalaServer::CloseInsert(TDmlResult& dml_result,
const QueryHandle& query_handle) {
ScopedSessionState session_handle(this);
shared_ptr<SessionState> session;
RAISE_IF_ERROR(session_handle.WithBeeswaxSession(
ThriftServer::GetThreadConnectionId(), &session), SQLSTATE_GENERAL_ERROR);
TUniqueId query_id;
QueryHandleToTUniqueId(query_handle, &query_id);
VLOG_QUERY << "CloseInsert(): query_id=" << PrintId(query_id);
// CloseInsertInternal() will validates that 'session' has access to 'query_id'.
Status status = CloseInsertInternal(session.get(), query_id, &dml_result);
if (!status.ok()) {
RaiseBeeswaxException(status.GetDetail(), SQLSTATE_GENERAL_ERROR);
}
}
// Gets the runtime profile string for the given query handle and stores the result in
// the profile_output parameter. Raises a BeeswaxException if there are any errors
// getting the profile, such as no matching queries found.
void ImpalaServer::GetRuntimeProfile(string& profile_output, const QueryHandle& handle) {
ScopedSessionState session_handle(this);
const TUniqueId& session_id = ThriftServer::GetThreadConnectionId();
stringstream ss;
shared_ptr<SessionState> session;
RAISE_IF_ERROR(
session_handle.WithBeeswaxSession(session_id, &session), SQLSTATE_GENERAL_ERROR);
if (session == NULL) {
ss << Substitute("Invalid session id: $0", PrintId(session_id));
RaiseBeeswaxException(ss.str(), SQLSTATE_GENERAL_ERROR);
}
TUniqueId query_id;
QueryHandleToTUniqueId(handle, &query_id);
// GetRuntimeProfile() will validate that the user has access to 'query_id'.
VLOG_RPC << "GetRuntimeProfile(): query_id=" << PrintId(query_id);
Status status = GetRuntimeProfileOutput(
query_id, GetEffectiveUser(*session), TRuntimeProfileFormat::STRING,
&ss, nullptr, nullptr);
if (!status.ok()) {
ss << "GetRuntimeProfile error: " << status.GetDetail();
RaiseBeeswaxException(ss.str(), SQLSTATE_GENERAL_ERROR);
}
profile_output = ss.str();
}
void ImpalaServer::GetExecSummary(impala::TExecSummary& result,
const beeswax::QueryHandle& handle) {
ScopedSessionState session_handle(this);
const TUniqueId& session_id = ThriftServer::GetThreadConnectionId();
shared_ptr<SessionState> session;
RAISE_IF_ERROR(
session_handle.WithBeeswaxSession(session_id, &session), SQLSTATE_GENERAL_ERROR);
if (session == NULL) {
stringstream ss;
ss << Substitute("Invalid session id: $0", PrintId(session_id));
RaiseBeeswaxException(ss.str(), SQLSTATE_GENERAL_ERROR);
}
TUniqueId query_id;
QueryHandleToTUniqueId(handle, &query_id);
VLOG_RPC << "GetExecSummary(): query_id=" << PrintId(query_id);
// GetExecSummary() will validate that the user has access to 'query_id'.
Status status = GetExecSummary(query_id, GetEffectiveUser(*session), &result);
if (!status.ok()) RaiseBeeswaxException(status.GetDetail(), SQLSTATE_GENERAL_ERROR);
}
void ImpalaServer::PingImpalaService(TPingImpalaServiceResp& return_val) {
ScopedSessionState session_handle(this);
RAISE_IF_ERROR(session_handle.WithBeeswaxSession(ThriftServer::GetThreadConnectionId()),
SQLSTATE_GENERAL_ERROR);
VLOG_RPC << "PingImpalaService()";
return_val.version = GetVersionString(true);
return_val.webserver_address = ExecEnv::GetInstance()->webserver()->url();
VLOG_RPC << "PingImpalaService(): return_val=" << ThriftDebugString(return_val);
}
void ImpalaServer::ResetCatalog(impala::TStatus& status) {
Status::DEPRECATED_RPC.ToThrift(&status);
}
void ImpalaServer::ResetTable(impala::TStatus& status, const TResetTableReq& request) {
Status::DEPRECATED_RPC.ToThrift(&status);
}
Status ImpalaServer::QueryToTQueryContext(const Query& query,
TQueryCtx* query_ctx) {
query_ctx->client_request.stmt = query.query;
VLOG_QUERY << "query: " << ThriftDebugString(query);
QueryOptionsMask set_query_options_mask;
{
shared_ptr<SessionState> session;
const TUniqueId& session_id = ThriftServer::GetThreadConnectionId();
// OK to skip secret validation since 'session_id' comes from connection
// and is trusted.
RETURN_IF_ERROR(GetSessionState(session_id, SecretArg::SkipSecretCheck(), &session,
/* mark_active= */ false));
DCHECK(session != nullptr);
{
// The session is created when the client connects. Depending on the underlying
// transport, the username may be known at that time. If the username hasn't been
// set yet, set it now.
lock_guard<mutex> l(session->lock);
if (session->connected_user.empty()) session->connected_user = query.hadoop_user;
query_ctx->client_request.query_options = session->QueryOptions();
set_query_options_mask = session->set_query_options_mask;
}
session->ToThrift(session_id, &query_ctx->session);
}
// Override default query options with Query.Configuration
if (query.__isset.configuration) {
TQueryOptions overlay;
QueryOptionsMask overlay_mask;
for (const string& option: query.configuration) {
RETURN_IF_ERROR(ParseQueryOptions(option, &overlay, &overlay_mask));
}
OverlayQueryOptions(overlay, overlay_mask, &query_ctx->client_request.query_options);
RETURN_IF_ERROR(ValidateQueryOptions(&overlay));
set_query_options_mask |= overlay_mask;
}
// Only query options not set in the session or confOverlay can be overridden by the
// pool options.
AddPoolConfiguration(query_ctx, ~set_query_options_mask);
VLOG_QUERY << "TClientRequest.queryOptions: "
<< ThriftDebugString(query_ctx->client_request.query_options);
return Status::OK();
}
inline void ImpalaServer::TUniqueIdToQueryHandle(const TUniqueId& query_id,
QueryHandle* handle) {
string query_id_str = PrintId(query_id);
handle->__set_id(query_id_str);
handle->__set_log_context(query_id_str);
}
inline void ImpalaServer::QueryHandleToTUniqueId(const QueryHandle& handle,
TUniqueId* query_id) {
ParseId(handle.id, query_id);
}
[[noreturn]] void ImpalaServer::RaiseBeeswaxException(
const string& msg, const char* sql_state) {
BeeswaxException exc;
exc.__set_message(msg);
exc.__set_SQLState(sql_state);
throw exc;
}
Status ImpalaServer::FetchInternal(ClientRequestState* request_state,
const bool start_over, const int32_t fetch_size, beeswax::Results* query_results) {
// Make sure ClientRequestState::Wait() has completed before fetching rows. Wait()
// ensures that rows are ready to be fetched (e.g., Wait() opens
// ClientRequestState::output_exprs_, which are evaluated in
// ClientRequestState::FetchRows() below).
int64_t block_on_wait_time_us = 0;
if (!request_state->BlockOnWait(
request_state->fetch_rows_timeout_us(), &block_on_wait_time_us)) {
query_results->__set_ready(false);
query_results->__set_has_more(true);
query_results->__isset.columns = false;
query_results->__isset.data = false;
return Status::OK();
}
lock_guard<mutex> frl(*request_state->fetch_rows_lock());
lock_guard<mutex> l(*request_state->lock());
if (request_state->num_rows_fetched() == 0) {
request_state->set_fetched_rows();
}
// Check for cancellation or an error.
RETURN_IF_ERROR(request_state->query_status());
// ODBC-190: set Beeswax's Results.columns to work around bug ODBC-190;
// TODO: remove the block of code when ODBC-190 is resolved.
const TResultSetMetadata* result_metadata = request_state->result_metadata();
query_results->columns.resize(result_metadata->columns.size());
for (int i = 0; i < result_metadata->columns.size(); ++i) {
// TODO: As of today, the ODBC driver does not support boolean and timestamp data
// type but it should. This is tracked by ODBC-189. We should verify that our
// boolean and timestamp type are correctly recognized when ODBC-189 is closed.
// TODO: Handle complex types.
const TColumnType& type = result_metadata->columns[i].columnType;
DCHECK_EQ(1, type.types.size());
DCHECK_EQ(TTypeNodeType::SCALAR, type.types[0].type);
DCHECK(type.types[0].__isset.scalar_type);
TPrimitiveType::type col_type = type.types[0].scalar_type.type;
query_results->columns[i] = TypeToOdbcString(ThriftToType(col_type));
}
query_results->__isset.columns = true;
// Results are always ready because we're blocking.
query_results->__set_ready(true);
// It's likely that ODBC doesn't care about start_row, but Hue needs it. For Hue,
// start_row starts from zero, not one.
query_results->__set_start_row(request_state->num_rows_fetched());
Status fetch_rows_status;
query_results->data.clear();
if (!request_state->eos()) {
scoped_ptr<QueryResultSet> result_set(QueryResultSet::CreateAsciiQueryResultSet(
*request_state->result_metadata(), &query_results->data));
fetch_rows_status =
request_state->FetchRows(fetch_size, result_set.get(), block_on_wait_time_us);
}
query_results->__set_has_more(!request_state->eos());
query_results->__isset.data = true;
return fetch_rows_status;
}
Status ImpalaServer::CloseInsertInternal(SessionState* session, const TUniqueId& query_id,
TDmlResult* dml_result) {
shared_ptr<ClientRequestState> request_state = GetClientRequestState(query_id);
if (UNLIKELY(request_state == nullptr)) {
string err_msg = Substitute("Invalid query handle: $0", PrintId(query_id));
VLOG(1) << err_msg;
return Status::Expected(err_msg);
}
RETURN_IF_ERROR(
CheckClientRequestSession(session, request_state->effective_user(), query_id));
Status query_status;
request_state->GetDmlStats(dml_result, &query_status);
RETURN_IF_ERROR(UnregisterQuery(query_id, true));
return query_status;
}
}