// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#include <memory>
#include <mutex>

#include "common/status.h"
#include "gen-cpp/ErrorCodes_types.h"
#include "gen-cpp/Query_types.h"
#include "gen-cpp/Types_types.h"
#include "rpc/thrift-server.h"
#include "runtime/query-driver.h"
#include "service/client-request-state.h"
#include "service/impala-server.h"
#include "util/debug-util.h"
#include "util/uid-util.h"

using namespace std;

namespace impala {

Status ImpalaServer::OpenSession(const string& user_name, TUniqueId& new_session_id,
    const TQueryOptions& query_opts) {
  shared_ptr<ThriftServer::ConnectionContext> conn_ctx =
      make_shared<ThriftServer::ConnectionContext>();
  conn_ctx->connection_id = RandomUniqueID();
  conn_ctx->server_name = ImpalaServer::INTERNAL_SERVER_NAME;
  conn_ctx->username = user_name;
  conn_ctx->network_address.hostname = "in-memory.localhost";

  ConnectionStart(*conn_ctx.get());

  {
    lock_guard<mutex> l(connection_to_sessions_map_lock_);
    new_session_id = *connection_to_sessions_map_[conn_ctx->connection_id].cbegin();
  }

  {
    lock_guard<mutex> l(internal_server_connections_lock_);
    internal_server_connections_.insert(make_pair(new_session_id, conn_ctx));
  }

  shared_ptr<ImpalaServer::SessionState> session_state;
  {
    lock_guard<mutex> l(session_state_map_lock_);
    session_state = session_state_map_[new_session_id];
    std::map<string, string> query_opts_map;
    TQueryOptionsToMap(query_opts, &query_opts_map);
    for (auto iter=query_opts_map.cbegin(); iter!=query_opts_map.cend(); iter++) {
      if (!iter->second.empty()) {
        RETURN_IF_ERROR(SetQueryOption(iter->first, iter->second,
        &session_state->set_query_options, &session_state->set_query_options_mask));
      }
    }
  }

  MarkSessionActive(session_state);

  return Status::OK();
} // ImpalaServer::OpenSession

bool ImpalaServer::CloseSession(const TUniqueId& session_id) {
  {
    lock_guard<mutex> l(session_state_map_lock_);

    auto iter = session_state_map_.find(session_id);
    if (iter == session_state_map_.end()) {
      return false;
    }

    MarkSessionInactive(iter->second);
  }

  {
    lock_guard<mutex> l(internal_server_connections_lock_, adopt_lock);
    internal_server_connections_lock_.lock();

    const auto iter = internal_server_connections_.find(session_id);
    if (iter != internal_server_connections_.end()) {
      internal_server_connections_lock_.unlock();
      ConnectionEnd(*iter->second.get());
      internal_server_connections_lock_.lock();
      internal_server_connections_.erase(iter);
    }
  }

  return true;
} // ImpalaServer::CloseSession

Status ImpalaServer::ExecuteIgnoreResults(const string& user_name, const string& sql,
    const TQueryOptions& query_opts, const bool persist_in_db, TUniqueId* query_id) {
  TUniqueId session_id;
  TUniqueId internal_query_id;
  Status result;

  result = SubmitAndWait(user_name, sql, session_id, internal_query_id, query_opts,
      persist_in_db);

  if (query_id != nullptr) {
    *query_id = internal_query_id;
  }

  if (!UUIDEmpty(internal_query_id)) {
    CloseQuery(internal_query_id);
  }

  CloseSession(session_id);

  return result;
} //ImpalaServer::ExecuteIgnoreResults

Status ImpalaServer::ExecuteAndFetchAllText(const std::string& user_name,
    const std::string& sql, query_results& results, results_columns* columns,
    TUniqueId* query_id){
  TUniqueId session_id;
  TUniqueId internal_query_id;
  Status result;

  result = SubmitAndWait(user_name, sql, session_id, internal_query_id, TQueryOptions());

  if (query_id != nullptr) {
    *query_id = internal_query_id;
  }

  if (result.ok()) {
    result = FetchAllRows(internal_query_id, results, columns);
  }

  if (!UUIDEmpty(internal_query_id)) {
    CloseQuery(internal_query_id);
  }

  CloseSession(session_id);

  return result;
} // ImpalaServer::ExecuteAndFetchAllText

Status ImpalaServer::SubmitAndWait(const string& user_name, const string& sql,
    TUniqueId& new_session_id, TUniqueId& new_query_id, const TQueryOptions& query_opts,
    const bool persist_in_db) {

  RETURN_IF_ERROR(OpenSession(user_name, new_session_id, query_opts));
  RETURN_IF_ERROR(SubmitQuery(sql, new_session_id, new_query_id, persist_in_db));

  return WaitForResults(new_query_id);
} // ImpalaServer::SubmitAndWait

Status ImpalaServer::WaitForResults(TUniqueId& query_id) {
  QueryHandle query_handle;
  RETURN_IF_ERROR(GetActiveQueryHandle(query_id, &query_handle));

  RETURN_IF_ERROR(query_handle->WaitAsync());

  int64_t block_wait_time;
  bool timed_out;
  RETURN_IF_ERROR(WaitForResults(query_handle->query_id(), &query_handle,
      &block_wait_time, &timed_out));
  query_id = query_handle->query_id();

  if (timed_out) {
    return Status::Expected("query timed out waiting for results");
  }

  return Status::OK();
} // ImpalaServer::WaitForResults

Status ImpalaServer::SubmitQuery(const string& sql, const TUniqueId& session_id,
    TUniqueId& new_query_id, const bool persist_in_db) {

  // build a query context
  TQueryCtx query_context;
  query_context.client_request.stmt = sql;

  // locate the previously opened session
  shared_ptr<SessionState> session_state;
  {
    lock_guard<mutex> l(session_state_map_lock_);

    const auto iter = session_state_map_.find(session_id);
    if (iter == session_state_map_.end()) {
      return Status::Expected(TErrorCode::GENERAL, PrintId(session_id));
    }

    session_state = iter->second;
  }

  session_state->ToThrift(session_state->session_id, &query_context.session);

  QueryOptionsMask set_query_options_mask;
  query_context.client_request.query_options = session_state->QueryOptions();
  set_query_options_mask = session_state->set_query_options_mask;

  AddPoolConfiguration(&query_context, ~set_query_options_mask);

  QueryHandle query_handle;
  RETURN_IF_ERROR(Execute(&query_context, session_state, &query_handle, nullptr,
      persist_in_db));
  new_query_id = query_handle->query_id();

  return SetQueryInflight(session_state, query_handle);
} // ImpalaServer::SubmitQuery

Status ImpalaServer::FetchAllRows(const TUniqueId& query_id, query_results& results,
    results_columns* columns) {
  QueryResultSet* result_set;
  const TResultSetMetadata* results_metadata;
  vector<string> row_set;

  QueryHandle query_handle;
  RETURN_IF_ERROR(GetActiveQueryHandle(query_id, &query_handle));

  {
    lock_guard<mutex> l1(*query_handle->fetch_rows_lock());
    lock_guard<mutex> l2(*query_handle->lock());

    if (query_handle->num_rows_fetched() == 0) {
      query_handle->set_fetched_rows();
    }

    results_metadata = query_handle->result_metadata();

    // populate column vector if provided by the user
    if (columns != nullptr) {
      for (int i=0; i<results_metadata->columns.size(); i++) {
        // TODO: As of today, the ODBC driver does not support boolean and timestamp data
        // type but it should. This is tracked by ODBC-189. We should verify that our
        // boolean and timestamp type are correctly recognized when ODBC-189 is closed.
        // TODO: Handle complex types.
        const TColumnType& type = results_metadata->columns[i].columnType;
        columns->emplace_back(make_pair(results_metadata->columns[i].columnName,
            ColumnTypeToBeeswaxTypeString(type)));
      }
    }

    result_set = QueryResultSet::CreateAsciiQueryResultSet(
        *results_metadata, &row_set, true);
  }

  int64_t block_wait_time = 30000000;
  while (!query_handle->eos()) {
    lock_guard<mutex> l1(*query_handle->fetch_rows_lock());
    lock_guard<mutex> l2(*query_handle->lock());

    row_set.clear();

    RETURN_IF_ERROR(query_handle->FetchRows(10, result_set, block_wait_time));
    results->insert(results->cend(), row_set.cbegin(), row_set.cend());
  }

  return Status::OK();
} // ImpalaServer::FetchAllRows

void ImpalaServer::CloseQuery(const TUniqueId& query_id) {
  QueryHandle query_handle;
  Status found = GetActiveQueryHandle(query_id, &query_handle);

  if (!found.ok()) {
    return;
  }

  UnregisterQueryDiscardResult(query_handle->query_id(), false);
} // ImpalaServer::CloseQuery

void ImpalaServer::GetConnectionContextList(
    ThriftServer::ConnectionContextList* connection_contexts) {
  lock_guard<mutex> l(internal_server_connections_lock_);

  for(auto iter = internal_server_connections_.cbegin();
      iter != internal_server_connections_.cend(); iter++) {
    connection_contexts->push_back(iter->second);
  }
} // ImpalaServer::GetConnectionContextList

} // namespace impala
