blob: 829b79225e0915a4b219b3ba7ccb964df58887dc [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <memory>
#include <mutex>
#include "common/status.h"
#include "gen-cpp/ErrorCodes_types.h"
#include "gen-cpp/Query_types.h"
#include "gen-cpp/Types_types.h"
#include "rpc/thrift-server.h"
#include "runtime/query-driver.h"
#include "service/client-request-state.h"
#include "service/impala-server.h"
#include "util/debug-util.h"
#include "util/uid-util.h"
using namespace std;
namespace impala {
// Opens a new in-process session on behalf of 'user_name'.
//
// A synthetic connection context is built and passed to ConnectionStart(); the first
// session id recorded for that connection is written to 'new_session_id' and the
// context is remembered in internal_server_connections_. Every option in 'query_opts'
// whose string value is non-empty is then applied to the session through
// SetQueryOption().
//
// Returns the first error produced by SetQueryOption(); in that case the session is
// not marked active, but the connection and session registered above stay in place.
// Otherwise marks the session active and returns OK.
Status ImpalaServer::OpenSession(const string& user_name, TUniqueId& new_session_id,
    const TQueryOptions& query_opts) {
  auto internal_conn = make_shared<ThriftServer::ConnectionContext>();
  internal_conn->connection_id = RandomUniqueID();
  internal_conn->server_name = ImpalaServer::INTERNAL_SERVER_NAME;
  internal_conn->username = user_name;
  internal_conn->network_address.hostname = "in-memory.localhost";

  ConnectionStart(*internal_conn);

  {
    // Pick up the first session id registered for the connection just started.
    lock_guard<mutex> sessions_guard(connection_to_sessions_map_lock_);
    new_session_id =
        *connection_to_sessions_map_[internal_conn->connection_id].cbegin();
  }

  {
    lock_guard<mutex> connections_guard(internal_server_connections_lock_);
    internal_server_connections_.insert(make_pair(new_session_id, internal_conn));
  }

  shared_ptr<ImpalaServer::SessionState> session_state;
  {
    lock_guard<mutex> state_guard(session_state_map_lock_);
    session_state = session_state_map_[new_session_id];

    std::map<string, string> requested_opts;
    TQueryOptionsToMap(query_opts, &requested_opts);
    for (const auto& opt : requested_opts) {
      // Options without a value are skipped rather than applied.
      if (opt.second.empty()) continue;
      RETURN_IF_ERROR(SetQueryOption(opt.first, opt.second,
          &session_state->set_query_options, &session_state->set_query_options_mask));
    }
  }

  MarkSessionActive(session_state);
  return Status::OK();
} // ImpalaServer::OpenSession
bool ImpalaServer::CloseSession(const TUniqueId& session_id) {
{
lock_guard<mutex> l(session_state_map_lock_);
auto iter = session_state_map_.find(session_id);
if (iter == session_state_map_.end()) {
return false;
}
MarkSessionInactive(iter->second);
}
{
lock_guard<mutex> l(internal_server_connections_lock_, adopt_lock);
internal_server_connections_lock_.lock();
const auto iter = internal_server_connections_.find(session_id);
if (iter != internal_server_connections_.end()) {
internal_server_connections_lock_.unlock();
ConnectionEnd(*iter->second.get());
internal_server_connections_lock_.lock();
internal_server_connections_.erase(iter);
}
}
return true;
} // ImpalaServer::CloseSession
// Runs 'sql' as 'user_name' in a freshly opened internal session, waits for it and
// discards whatever it produces.
//
// 'query_opts' and 'persist_in_db' are forwarded unchanged to SubmitAndWait(). When
// 'query_id' is non-null it receives the query id reported by SubmitAndWait() (which
// stays default-constructed if no query was submitted). Before returning, the query is
// closed if UUIDEmpty() says an id was assigned, and CloseSession() is always called.
// The returned status is the one produced by SubmitAndWait().
Status ImpalaServer::ExecuteIgnoreResults(const string& user_name, const string& sql,
    const TQueryOptions& query_opts, const bool persist_in_db, TUniqueId* query_id) {
  TUniqueId session_id;
  TUniqueId submitted_query_id;

  Status outcome = SubmitAndWait(
      user_name, sql, session_id, submitted_query_id, query_opts, persist_in_db);

  // Report the id to the caller regardless of whether the query succeeded.
  if (query_id != nullptr) *query_id = submitted_query_id;

  const bool query_was_registered = !UUIDEmpty(submitted_query_id);
  if (query_was_registered) CloseQuery(submitted_query_id);

  CloseSession(session_id);
  return outcome;
} // ImpalaServer::ExecuteIgnoreResults
// Runs 'sql' as 'user_name' in a freshly opened internal session using default query
// options, then pulls every result row through FetchAllRows().
//
// 'results' and 'columns' are handed straight to FetchAllRows(); rows are only fetched
// when submission and waiting succeeded. When 'query_id' is non-null it receives the
// query id reported by SubmitAndWait(). The query (if UUIDEmpty() says one was
// assigned) and the session are closed before returning. Returns the SubmitAndWait()
// status on failure, otherwise the FetchAllRows() status.
Status ImpalaServer::ExecuteAndFetchAllText(const std::string& user_name,
    const std::string& sql, query_results& results, results_columns* columns,
    TUniqueId* query_id) {
  TUniqueId session_id;
  TUniqueId submitted_query_id;

  Status outcome =
      SubmitAndWait(user_name, sql, session_id, submitted_query_id, TQueryOptions());

  if (query_id != nullptr) *query_id = submitted_query_id;

  // Fetching only makes sense once the query has been submitted and waited on.
  if (outcome.ok()) outcome = FetchAllRows(submitted_query_id, results, columns);

  const bool query_was_registered = !UUIDEmpty(submitted_query_id);
  if (query_was_registered) CloseQuery(submitted_query_id);

  CloseSession(session_id);
  return outcome;
} // ImpalaServer::ExecuteAndFetchAllText
// Convenience wrapper chaining the three steps needed to run a statement:
//   1. OpenSession() for 'user_name' with 'query_opts' (id stored in 'new_session_id'),
//   2. SubmitQuery() of 'sql' in that session (id stored in 'new_query_id'),
//   3. WaitForResults() on the submitted query.
// Stops at the first failing step and returns its status. Nothing is closed here; the
// ids written to the out-parameters are what the caller uses for clean-up.
Status ImpalaServer::SubmitAndWait(const string& user_name, const string& sql,
    TUniqueId& new_session_id, TUniqueId& new_query_id, const TQueryOptions& query_opts,
    const bool persist_in_db) {
  RETURN_IF_ERROR(
      OpenSession(user_name, new_session_id, query_opts));
  RETURN_IF_ERROR(
      SubmitQuery(sql, new_session_id, new_query_id, persist_in_db));

  Status wait_status = WaitForResults(new_query_id);
  return wait_status;
} // ImpalaServer::SubmitAndWait
// Blocks until the query identified by 'query_id' is ready to have results fetched.
//
// Resolves the active query handle, calls WaitAsync() on it and then delegates to the
// four-argument WaitForResults() overload. Once that overload returns OK, 'query_id' is
// overwritten with the id the handle reports at that point (this happens before the
// timeout check, so also when a timeout is reported). Returns any error from the steps
// above, an expected-error status if the overload signals a timeout, and OK otherwise.
Status ImpalaServer::WaitForResults(TUniqueId& query_id) {
  QueryHandle handle;
  RETURN_IF_ERROR(GetActiveQueryHandle(query_id, &handle));
  RETURN_IF_ERROR(handle->WaitAsync());

  // Out-parameters for the overload below; presumably both are always written by it
  // on success -- TODO confirm.
  int64_t waited;
  bool wait_timed_out;
  RETURN_IF_ERROR(
      WaitForResults(handle->query_id(), &handle, &waited, &wait_timed_out));

  query_id = handle->query_id();

  if (!wait_timed_out) {
    return Status::OK();
  }
  return Status::Expected("query timed out waiting for results");
} // ImpalaServer::WaitForResults
// Submits 'sql' for execution inside the already-open session 'session_id'.
//
// Returns an expected GENERAL error carrying the printed session id when the session
// is not found in session_state_map_. Otherwise builds a query context from the
// session, hands it to Execute() (forwarding 'persist_in_db'), stores the resulting
// query id in 'new_query_id' and returns the status of SetQueryInflight().
Status ImpalaServer::SubmitQuery(const string& sql, const TUniqueId& session_id,
    TUniqueId& new_query_id, const bool persist_in_db) {
  // Start the query context with the statement text.
  TQueryCtx query_ctx;
  query_ctx.client_request.stmt = sql;

  // Find the session this query belongs to.
  shared_ptr<SessionState> session;
  {
    lock_guard<mutex> state_guard(session_state_map_lock_);
    const auto entry = session_state_map_.find(session_id);
    if (entry == session_state_map_.end()) {
      return Status::Expected(TErrorCode::GENERAL, PrintId(session_id));
    }
    session = entry->second;
  }

  // Copy session details and the session's query options into the context.
  session->ToThrift(session->session_id, &query_ctx.session);
  query_ctx.client_request.query_options = session->QueryOptions();

  // The complement of the session's explicitly-set mask is what gets passed on;
  // presumably this limits pool configuration to options the session did not set
  // itself -- TODO confirm against AddPoolConfiguration().
  const QueryOptionsMask explicitly_set = session->set_query_options_mask;
  AddPoolConfiguration(&query_ctx, ~explicitly_set);

  QueryHandle handle;
  RETURN_IF_ERROR(
      Execute(&query_ctx, session, &handle, nullptr, persist_in_db));

  new_query_id = handle->query_id();
  return SetQueryInflight(session, handle);
} // ImpalaServer::SubmitQuery
// Fetches every remaining row of the active query 'query_id' as text.
//
// Rows are appended to the container that 'results' points at, in fetch order. When
// 'columns' is non-null, one (column name, Beeswax type string) pair per result column
// is appended to it first. Returns the error from GetActiveQueryHandle() or from the
// first failing FetchRows() call; rows appended before a failure are left in place.
Status ImpalaServer::FetchAllRows(const TUniqueId& query_id, query_results& results,
    results_columns* columns) {
  // Scratch buffer the result set writes each fetched batch into. It is declared before
  // 'result_set' so that it outlives the object holding a pointer to it.
  vector<string> row_set;
  QueryHandle query_handle;
  RETURN_IF_ERROR(GetActiveQueryHandle(query_id, &query_handle));

  // CreateAsciiQueryResultSet() hands back a raw pointer that nothing else in this
  // function releases; owning it here frees it on every exit path, including the early
  // returns taken by RETURN_IF_ERROR in the fetch loop.
  unique_ptr<QueryResultSet> result_set;
  {
    lock_guard<mutex> l1(*query_handle->fetch_rows_lock());
    lock_guard<mutex> l2(*query_handle->lock());

    if (query_handle->num_rows_fetched() == 0) {
      query_handle->set_fetched_rows();
    }

    const TResultSetMetadata* results_metadata = query_handle->result_metadata();

    // populate column vector if provided by the user
    if (columns != nullptr) {
      for (const auto& column : results_metadata->columns) {
        // TODO: As of today, the ODBC driver does not support boolean and timestamp data
        // type but it should. This is tracked by ODBC-189. We should verify that our
        // boolean and timestamp type are correctly recognized when ODBC-189 is closed.
        // TODO: Handle complex types.
        const TColumnType& type = column.columnType;
        columns->emplace_back(
            make_pair(column.columnName, ColumnTypeToBeeswaxTypeString(type)));
      }
    }

    result_set.reset(QueryResultSet::CreateAsciiQueryResultSet(
        *results_metadata, &row_set, true));
  }

  // Passed as the wait argument of every FetchRows() call; presumably microseconds
  // (i.e. 30 seconds) -- TODO confirm against FetchRows().
  int64_t block_wait_time = 30000000;
  while (!query_handle->eos()) {
    // Both locks are re-acquired per batch, in the same order as above.
    lock_guard<mutex> l1(*query_handle->fetch_rows_lock());
    lock_guard<mutex> l2(*query_handle->lock());

    // The scratch buffer only ever holds the current batch.
    row_set.clear();

    RETURN_IF_ERROR(query_handle->FetchRows(10, result_set.get(), block_wait_time));
    results->insert(results->cend(), row_set.cbegin(), row_set.cend());
  }

  return Status::OK();
} // ImpalaServer::FetchAllRows
void ImpalaServer::CloseQuery(const TUniqueId& query_id) {
QueryHandle query_handle;
Status found = GetActiveQueryHandle(query_id, &query_handle);
if (!found.ok()) {
return;
}
UnregisterQueryDiscardResult(query_handle->query_id(), false);
} // ImpalaServer::CloseQuery
// Appends the connection context of every entry currently held in
// internal_server_connections_ to '*connection_contexts'. The map is read while
// internal_server_connections_lock_ is held; existing elements of the output list are
// left untouched.
void ImpalaServer::GetConnectionContextList(
    ThriftServer::ConnectionContextList* connection_contexts) {
  lock_guard<mutex> guard(internal_server_connections_lock_);
  for (const auto& entry : internal_server_connections_) {
    connection_contexts->push_back(entry.second);
  }
} // ImpalaServer::GetConnectionContextList
} // namespace impala