blob: cc508e3d704558740bef2343e27a363dec24c6e0 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <limits>
#include "ignite/odbc/system/odbc_constants.h"
#include "ignite/odbc/query/batch_query.h"
#include "ignite/odbc/query/data_query.h"
#include "ignite/odbc/query/column_metadata_query.h"
#include "ignite/odbc/query/table_metadata_query.h"
#include "ignite/odbc/query/foreign_keys_query.h"
#include "ignite/odbc/query/primary_keys_query.h"
#include "ignite/odbc/query/type_info_query.h"
#include "ignite/odbc/query/special_columns_query.h"
#include "ignite/odbc/query/streaming_query.h"
#include "ignite/odbc/query/internal_query.h"
#include "ignite/odbc/connection.h"
#include "ignite/odbc/utility.h"
#include "ignite/odbc/message.h"
#include "ignite/odbc/statement.h"
#include "ignite/odbc/log.h"
#include "ignite/odbc/odbc_error.h"
#include "ignite/odbc/sql/sql_utils.h"
#include "ignite/odbc/sql/sql_parser.h"
#include "ignite/odbc/sql/sql_set_streaming_command.h"
namespace ignite
{
namespace odbc
{
Statement::Statement(Connection& parent) :
connection(parent),
columnBindings(),
currentQuery(),
rowsFetched(0),
rowStatuses(0),
columnBindOffset(0),
rowArraySize(1),
parameters(),
timeout(0)
{
// No-op.
}
Statement::~Statement()
{
// No-op.
}
void Statement::BindColumn(uint16_t columnIdx, int16_t targetType, void* targetValue, SqlLen bufferLength, SqlLen* strLengthOrIndicator)
{
IGNITE_ODBC_API_CALL(InternalBindColumn(columnIdx, targetType, targetValue, bufferLength, strLengthOrIndicator));
}
SqlResult::Type Statement::InternalBindColumn(uint16_t columnIdx, int16_t targetType, void* targetValue, SqlLen bufferLength, SqlLen* strLengthOrIndicator)
{
using namespace type_traits;
OdbcNativeType::Type driverType = ToDriverType(targetType);
if (driverType == OdbcNativeType::AI_UNSUPPORTED)
{
AddStatusRecord(SqlState::SHY003_INVALID_APPLICATION_BUFFER_TYPE, "The argument TargetType was not a valid data type.");
return SqlResult::AI_ERROR;
}
if (bufferLength < 0)
{
AddStatusRecord(SqlState::SHY090_INVALID_STRING_OR_BUFFER_LENGTH,
"The value specified for the argument BufferLength was less than 0.");
return SqlResult::AI_ERROR;
}
if (targetValue || strLengthOrIndicator)
{
app::ApplicationDataBuffer dataBuffer(driverType, targetValue, bufferLength, strLengthOrIndicator);
SafeBindColumn(columnIdx, dataBuffer);
}
else
SafeUnbindColumn(columnIdx);
return SqlResult::AI_SUCCESS;
}
void Statement::SafeBindColumn(uint16_t columnIdx, const app::ApplicationDataBuffer& buffer)
{
columnBindings[columnIdx] = buffer;
}
void Statement::SafeUnbindColumn(uint16_t columnIdx)
{
columnBindings.erase(columnIdx);
}
void Statement::SafeUnbindAllColumns()
{
columnBindings.clear();
}
void Statement::SetColumnBindOffsetPtr(int * ptr)
{
columnBindOffset = ptr;
}
int* Statement::GetColumnBindOffsetPtr()
{
return columnBindOffset;
}
int32_t Statement::GetColumnNumber()
{
int32_t res;
IGNITE_ODBC_API_CALL(InternalGetColumnNumber(res));
return res;
}
SqlResult::Type Statement::InternalGetColumnNumber(int32_t &res)
{
const meta::ColumnMetaVector* meta = GetMeta();
if (!meta)
return SqlResult::AI_ERROR;
res = static_cast<int32_t>(meta->size());
return SqlResult::AI_SUCCESS;
}
void Statement::BindParameter(uint16_t paramIdx, int16_t ioType, int16_t bufferType, int16_t paramSqlType,
SqlUlen columnSize, int16_t decDigits, void* buffer, SqlLen bufferLen, SqlLen* resLen)
{
IGNITE_ODBC_API_CALL(InternalBindParameter(paramIdx, ioType, bufferType, paramSqlType, columnSize,
decDigits, buffer, bufferLen, resLen));
}
SqlResult::Type Statement::InternalBindParameter(uint16_t paramIdx, int16_t ioType, int16_t bufferType,
int16_t paramSqlType, SqlUlen columnSize, int16_t decDigits, void* buffer, SqlLen bufferLen, SqlLen* resLen)
{
using namespace type_traits;
using app::ApplicationDataBuffer;
using app::Parameter;
if (paramIdx == 0)
{
std::stringstream builder;
builder << "The value specified for the argument ParameterNumber was less than 1. [ParameterNumber="
<< paramIdx << ']';
AddStatusRecord(SqlState::S24000_INVALID_CURSOR_STATE, builder.str());
return SqlResult::AI_ERROR;
}
if (ioType != SQL_PARAM_INPUT)
{
std::stringstream builder;
builder << "The value specified for the argument InputOutputType was not SQL_PARAM_INPUT. [ioType="
<< ioType << ']';
AddStatusRecord(SqlState::SHY105_INVALID_PARAMETER_TYPE, builder.str());
return SqlResult::AI_ERROR;
}
if (!IsSqlTypeSupported(paramSqlType))
{
std::stringstream builder;
builder << "Data type is not supported. [typeId=" << paramSqlType << ']';
AddStatusRecord(SqlState::SHYC00_OPTIONAL_FEATURE_NOT_IMPLEMENTED, builder.str());
return SqlResult::AI_ERROR;
}
OdbcNativeType::Type driverType = ToDriverType(bufferType);
if (driverType == OdbcNativeType::AI_UNSUPPORTED)
{
std::stringstream builder;
builder << "The argument TargetType was not a valid data type. [TargetType=" << bufferType << ']';
AddStatusRecord(SqlState::SHY003_INVALID_APPLICATION_BUFFER_TYPE, builder.str());
return SqlResult::AI_ERROR;
}
if (!buffer && !resLen)
{
AddStatusRecord(SqlState::SHY009_INVALID_USE_OF_NULL_POINTER,
"ParameterValuePtr and StrLen_or_IndPtr are both null pointers");
return SqlResult::AI_ERROR;
}
ApplicationDataBuffer dataBuffer(driverType, buffer, bufferLen, resLen);
Parameter param(dataBuffer, paramSqlType, columnSize, decDigits);
parameters.BindParameter(paramIdx, param);
return SqlResult::AI_SUCCESS;
}
void Statement::SetAttribute(int attr, void* value, SQLINTEGER valueLen)
{
IGNITE_ODBC_API_CALL(InternalSetAttribute(attr, value, valueLen));
}
SqlResult::Type Statement::InternalSetAttribute(int attr, void* value, SQLINTEGER)
{
switch (attr)
{
case SQL_ATTR_ROW_ARRAY_SIZE:
{
SqlUlen val = reinterpret_cast<SqlUlen>(value);
LOG_MSG("SQL_ATTR_ROW_ARRAY_SIZE: " << val);
if (val < 1)
{
AddStatusRecord(SqlState::SHY092_OPTION_TYPE_OUT_OF_RANGE,
"Array size value can not be less than 1");
return SqlResult::AI_ERROR;
}
rowArraySize = val;
break;
}
case SQL_ATTR_ROW_BIND_TYPE:
{
SqlUlen rowBindType = reinterpret_cast<SqlUlen>(value);
if (rowBindType != SQL_BIND_BY_COLUMN)
{
AddStatusRecord(SqlState::SHYC00_OPTIONAL_FEATURE_NOT_IMPLEMENTED,
"Only binding by column is currently supported");
return SqlResult::AI_ERROR;
}
break;
}
case SQL_ATTR_ROWS_FETCHED_PTR:
{
SetRowsFetchedPtr(reinterpret_cast<SQLINTEGER*>(value));
break;
}
case SQL_ATTR_ROW_STATUS_PTR:
{
SetRowStatusesPtr(reinterpret_cast<SQLUSMALLINT*>(value));
break;
}
case SQL_ATTR_PARAM_BIND_TYPE:
{
SqlUlen paramBindType = reinterpret_cast<SqlUlen>(value);
if (paramBindType != SQL_PARAM_BIND_BY_COLUMN)
{
AddStatusRecord(SqlState::SHYC00_OPTIONAL_FEATURE_NOT_IMPLEMENTED,
"Only binding by column is currently supported");
return SqlResult::AI_ERROR;
}
break;
}
case SQL_ATTR_PARAM_BIND_OFFSET_PTR:
{
SetParamBindOffsetPtr(reinterpret_cast<int*>(value));
break;
}
case SQL_ATTR_ROW_BIND_OFFSET_PTR:
{
SetColumnBindOffsetPtr(reinterpret_cast<int*>(value));
break;
}
case SQL_ATTR_PARAMSET_SIZE:
{
SqlUlen size = reinterpret_cast<SqlUlen>(value);
if (size > 1 && IsStreamingActive())
{
AddStatusRecord(SqlState::SHYC00_OPTIONAL_FEATURE_NOT_IMPLEMENTED,
"Batching is not supported in streaming mode.");
return SqlResult::AI_ERROR;
}
parameters.SetParamSetSize(size);
break;
}
case SQL_ATTR_PARAMS_PROCESSED_PTR:
{
parameters.SetParamsProcessedPtr(reinterpret_cast<SqlUlen*>(value));
break;
}
case SQL_ATTR_PARAM_STATUS_PTR:
{
parameters.SetParamsStatusPtr(reinterpret_cast<SQLUSMALLINT*>(value));
break;
}
case SQL_ATTR_QUERY_TIMEOUT:
{
SqlUlen uTimeout = reinterpret_cast<SqlUlen>(value);
if (uTimeout > INT32_MAX)
{
timeout = INT32_MAX;
std::stringstream ss;
ss << "Value is too big: " << uTimeout << ", changing to " << timeout << ".";
std::string msg = ss.str();
AddStatusRecord(SqlState::S01S02_OPTION_VALUE_CHANGED, msg);
return SqlResult::AI_SUCCESS_WITH_INFO;
}
timeout = static_cast<int32_t>(uTimeout);
break;
}
default:
{
AddStatusRecord(SqlState::SHYC00_OPTIONAL_FEATURE_NOT_IMPLEMENTED,
"Specified attribute is not supported.");
return SqlResult::AI_ERROR;
}
}
return SqlResult::AI_SUCCESS;
}
void Statement::GetAttribute(int attr, void* buf, SQLINTEGER bufLen, SQLINTEGER* valueLen)
{
IGNITE_ODBC_API_CALL(InternalGetAttribute(attr, buf, bufLen, valueLen));
}
SqlResult::Type Statement::InternalGetAttribute(int attr, void* buf, SQLINTEGER, SQLINTEGER* valueLen)
{
if (!buf)
{
AddStatusRecord("Data buffer is NULL.");
return SqlResult::AI_ERROR;
}
switch (attr)
{
case SQL_ATTR_APP_ROW_DESC:
case SQL_ATTR_APP_PARAM_DESC:
case SQL_ATTR_IMP_ROW_DESC:
case SQL_ATTR_IMP_PARAM_DESC:
{
SQLPOINTER *val = reinterpret_cast<SQLPOINTER*>(buf);
*val = static_cast<SQLPOINTER>(this);
if (valueLen)
*valueLen = SQL_IS_POINTER;
break;
}
case SQL_ATTR_ROW_BIND_TYPE:
{
SqlUlen* val = reinterpret_cast<SqlUlen*>(buf);
*val = SQL_BIND_BY_COLUMN;
break;
}
case SQL_ATTR_ROW_ARRAY_SIZE:
{
SQLINTEGER *val = reinterpret_cast<SQLINTEGER*>(buf);
*val = static_cast<SQLINTEGER>(1);
if (valueLen)
*valueLen = SQL_IS_INTEGER;
break;
}
case SQL_ATTR_ROWS_FETCHED_PTR:
{
SqlUlen** val = reinterpret_cast<SqlUlen**>(buf);
*val = reinterpret_cast<SqlUlen*>(GetRowsFetchedPtr());
if (valueLen)
*valueLen = SQL_IS_POINTER;
break;
}
case SQL_ATTR_ROW_STATUS_PTR:
{
SQLUSMALLINT** val = reinterpret_cast<SQLUSMALLINT**>(buf);
*val = reinterpret_cast<SQLUSMALLINT*>(GetRowStatusesPtr());
if (valueLen)
*valueLen = SQL_IS_POINTER;
break;
}
case SQL_ATTR_PARAM_BIND_TYPE:
{
SqlUlen* val = reinterpret_cast<SqlUlen*>(buf);
*val = SQL_PARAM_BIND_BY_COLUMN;
break;
}
case SQL_ATTR_PARAM_BIND_OFFSET_PTR:
{
SQLULEN** val = reinterpret_cast<SQLULEN**>(buf);
*val = reinterpret_cast<SQLULEN*>(parameters.GetParamBindOffsetPtr());
if (valueLen)
*valueLen = SQL_IS_POINTER;
break;
}
case SQL_ATTR_ROW_BIND_OFFSET_PTR:
{
SqlUlen** val = reinterpret_cast<SqlUlen**>(buf);
*val = reinterpret_cast<SqlUlen*>(GetColumnBindOffsetPtr());
if (valueLen)
*valueLen = SQL_IS_POINTER;
break;
}
case SQL_ATTR_PARAMSET_SIZE:
{
SqlUlen* val = reinterpret_cast<SqlUlen*>(buf);
*val = static_cast<SqlUlen>(parameters.GetParamSetSize());
if (valueLen)
*valueLen = SQL_IS_UINTEGER;
break;
}
case SQL_ATTR_PARAMS_PROCESSED_PTR:
{
SqlUlen** val = reinterpret_cast<SqlUlen**>(buf);
*val = parameters.GetParamsProcessedPtr();
if (valueLen)
*valueLen = SQL_IS_POINTER;
break;
}
case SQL_ATTR_PARAM_STATUS_PTR:
{
SQLUSMALLINT** val = reinterpret_cast<SQLUSMALLINT**>(buf);
*val = parameters.GetParamsStatusPtr();
if (valueLen)
*valueLen = SQL_IS_POINTER;
break;
}
case SQL_ATTR_QUERY_TIMEOUT:
{
SqlUlen *uTimeout = reinterpret_cast<SqlUlen*>(buf);
*uTimeout = static_cast<SqlUlen>(timeout);
break;
}
default:
{
AddStatusRecord(SqlState::SHYC00_OPTIONAL_FEATURE_NOT_IMPLEMENTED,
"Specified attribute is not supported.");
return SqlResult::AI_ERROR;
}
}
return SqlResult::AI_SUCCESS;
}
void Statement::GetParametersNumber(uint16_t& paramNum)
{
IGNITE_ODBC_API_CALL(InternalGetParametersNumber(paramNum));
}
SqlResult::Type Statement::InternalGetParametersNumber(uint16_t& paramNum)
{
if (!currentQuery.get())
{
AddStatusRecord(SqlState::SHY010_SEQUENCE_ERROR, "Query is not prepared.");
return SqlResult::AI_ERROR;
}
if (currentQuery->GetType() != query::QueryType::DATA)
{
paramNum = 0;
return SqlResult::AI_SUCCESS;
}
if (!parameters.IsMetadataSet())
{
SqlResult::Type res = UpdateParamsMeta();
if (res != SqlResult::AI_SUCCESS)
return res;
}
paramNum = parameters.GetExpectedParamNum();
return SqlResult::AI_SUCCESS;
}
void Statement::SetParamBindOffsetPtr(int* ptr)
{
IGNITE_ODBC_API_CALL_ALWAYS_SUCCESS;
parameters.SetParamBindOffsetPtr(ptr);
}
void Statement::GetColumnData(uint16_t columnIdx, app::ApplicationDataBuffer& buffer)
{
IGNITE_ODBC_API_CALL(InternalGetColumnData(columnIdx, buffer));
}
SqlResult::Type Statement::InternalGetColumnData(uint16_t columnIdx,
app::ApplicationDataBuffer& buffer)
{
if (!currentQuery.get())
{
AddStatusRecord(SqlState::S24000_INVALID_CURSOR_STATE,
"Cursor is not in the open state.");
return SqlResult::AI_ERROR;
}
SqlResult::Type res = currentQuery->GetColumn(columnIdx, buffer);
return res;
}
void Statement::PrepareSqlQuery(const std::string& query)
{
IGNITE_ODBC_API_CALL(InternalPrepareSqlQuery(query));
}
SqlResult::Type Statement::ProcessInternalCommand(const std::string& query)
{
try
{
SqlParser parser(query);
std::auto_ptr<SqlCommand> cmd = parser.GetNextCommand();
assert(cmd.get() != 0);
parameters.Prepare();
currentQuery.reset(new query::InternalQuery(*this, query, cmd));
return SqlResult::AI_SUCCESS;
}
catch (const OdbcError& err)
{
AddStatusRecord(err);
return SqlResult::AI_ERROR;
}
}
bool Statement::IsStreamingActive() const
{
return connection.GetStreamingContext().IsEnabled();
}
SqlResult::Type Statement::InternalPrepareSqlQuery(const std::string& query)
{
if (sql_utils::IsInternalCommand(query))
return ProcessInternalCommand(query);
// Resetting parameters types as we are changing the query.
parameters.Prepare();
if (IsStreamingActive())
{
if (!currentQuery.get())
currentQuery.reset(new query::StreamingQuery(*this, connection, parameters));
query::StreamingQuery* currentQuery0 = static_cast<query::StreamingQuery*>(currentQuery.get());
currentQuery0->PrepareQuery(query);
return SqlResult::AI_SUCCESS;
}
if (currentQuery.get())
currentQuery->Close();
currentQuery.reset(new query::DataQuery(*this, connection, query, parameters, timeout));
return SqlResult::AI_SUCCESS;
}
void Statement::ExecuteSqlQuery(const std::string& query)
{
IGNITE_ODBC_API_CALL(InternalExecuteSqlQuery(query));
}
SqlResult::Type Statement::InternalExecuteSqlQuery(const std::string& query)
{
SqlResult::Type result = InternalPrepareSqlQuery(query);
if (result != SqlResult::AI_SUCCESS)
return result;
return InternalExecuteSqlQuery();
}
void Statement::ExecuteSqlQuery()
{
IGNITE_ODBC_API_CALL(InternalExecuteSqlQuery());
}
SqlResult::Type Statement::InternalExecuteSqlQuery()
{
if (!currentQuery.get())
{
AddStatusRecord(SqlState::SHY010_SEQUENCE_ERROR, "Query is not prepared.");
return SqlResult::AI_ERROR;
}
if (currentQuery->GetType() == query::QueryType::INTERNAL)
{
ProcessInternalQuery();
return SqlResult::AI_SUCCESS;
}
if (parameters.GetParamSetSize() > 1 && currentQuery->GetType() == query::QueryType::DATA)
{
query::DataQuery& qry = static_cast<query::DataQuery&>(*currentQuery);
currentQuery.reset(new query::BatchQuery(*this, connection, qry.GetSql(), parameters, timeout));
}
else if (parameters.GetParamSetSize() == 1 && currentQuery->GetType() == query::QueryType::BATCH)
{
query::BatchQuery& qry = static_cast<query::BatchQuery&>(*currentQuery);
currentQuery.reset(new query::DataQuery(*this, connection, qry.GetSql(), parameters, timeout));
}
if (parameters.GetParamSetSize() > 1 && currentQuery->GetType() == query::QueryType::STREAMING)
{
AddStatusRecord(SqlState::SHYC00_OPTIONAL_FEATURE_NOT_IMPLEMENTED,
"Batching is not supported in streaming mode.");
return SqlResult::AI_ERROR;
}
if (parameters.IsDataAtExecNeeded())
{
if (currentQuery->GetType() == query::QueryType::BATCH ||
currentQuery->GetType() == query::QueryType::STREAMING)
{
AddStatusRecord(SqlState::SHYC00_OPTIONAL_FEATURE_NOT_IMPLEMENTED,
"Data-at-execution is not supported with batching.");
return SqlResult::AI_ERROR;
}
return SqlResult::AI_NEED_DATA;
}
return currentQuery->Execute();
}
SqlResult::Type Statement::ProcessInternalQuery()
{
assert(currentQuery->GetType() == query::QueryType::INTERNAL);
query::InternalQuery* qry = static_cast<query::InternalQuery*>(currentQuery.get());
LOG_MSG("Processing internal query: " << qry->GetQuery());
assert(qry->GetCommand().GetType() == SqlCommandType::SET_STREAMING);
SqlSetStreamingCommand& cmd = static_cast<SqlSetStreamingCommand&>(qry->GetCommand());
StopStreaming();
if (!cmd.IsEnabled())
return SqlResult::AI_SUCCESS;
LOG_MSG("Sending start streaming command");
query::DataQuery enablingQuery(*this, connection, qry->GetQuery(), parameters, timeout);
SqlResult::Type res = enablingQuery.Execute();
if (res != SqlResult::AI_SUCCESS)
return res;
LOG_MSG("Preparing streaming context on client");
connection.GetStreamingContext().Enable(cmd);
std::auto_ptr<query::Query> newQry(new query::StreamingQuery(*this, connection, parameters));
std::swap(currentQuery, newQry);
return SqlResult::AI_SUCCESS;
}
void Statement::ExecuteGetColumnsMetaQuery(const std::string& schema,
const std::string& table, const std::string& column)
{
IGNITE_ODBC_API_CALL(InternalExecuteGetColumnsMetaQuery(schema, table, column));
}
SqlResult::Type Statement::InternalExecuteGetColumnsMetaQuery(const std::string& schema,
const std::string& table, const std::string& column)
{
if (currentQuery.get())
currentQuery->Close();
std::string schema0(schema);
if (schema0.empty())
schema0 = connection.GetSchema();
currentQuery.reset(new query::ColumnMetadataQuery(*this,
connection, schema, table, column));
return currentQuery->Execute();
}
void Statement::ExecuteGetTablesMetaQuery(const std::string& catalog,
const std::string& schema, const std::string& table, const std::string& tableType)
{
IGNITE_ODBC_API_CALL(InternalExecuteGetTablesMetaQuery(
catalog, schema, table, tableType));
}
SqlResult::Type Statement::InternalExecuteGetTablesMetaQuery(const std::string& catalog,
const std::string& schema, const std::string& table, const std::string& tableType)
{
if (currentQuery.get())
currentQuery->Close();
currentQuery.reset(new query::TableMetadataQuery(*this,
connection, catalog, schema, table, tableType));
return currentQuery->Execute();
}
void Statement::ExecuteGetForeignKeysQuery(const std::string& primaryCatalog,
const std::string& primarySchema, const std::string& primaryTable,
const std::string& foreignCatalog, const std::string& foreignSchema,
const std::string& foreignTable)
{
IGNITE_ODBC_API_CALL(InternalExecuteGetForeignKeysQuery(primaryCatalog,
primarySchema, primaryTable, foreignCatalog, foreignSchema, foreignTable));
}
SqlResult::Type Statement::InternalExecuteGetForeignKeysQuery(const std::string& primaryCatalog,
const std::string& primarySchema, const std::string& primaryTable,
const std::string& foreignCatalog, const std::string& foreignSchema,
const std::string& foreignTable)
{
if (currentQuery.get())
currentQuery->Close();
currentQuery.reset(new query::ForeignKeysQuery(*this, connection, primaryCatalog,
primarySchema, primaryTable, foreignCatalog, foreignSchema, foreignTable));
return currentQuery->Execute();
}
void Statement::ExecuteGetPrimaryKeysQuery(const std::string& catalog,
const std::string& schema, const std::string& table)
{
IGNITE_ODBC_API_CALL(InternalExecuteGetPrimaryKeysQuery(catalog, schema, table));
}
SqlResult::Type Statement::InternalExecuteGetPrimaryKeysQuery(const std::string& catalog,
const std::string& schema, const std::string& table)
{
if (currentQuery.get())
currentQuery->Close();
currentQuery.reset(new query::PrimaryKeysQuery(*this,
connection, catalog, schema, table));
return currentQuery->Execute();
}
void Statement::ExecuteSpecialColumnsQuery(int16_t type,
const std::string& catalog, const std::string& schema,
const std::string& table, int16_t scope, int16_t nullable)
{
IGNITE_ODBC_API_CALL(InternalExecuteSpecialColumnsQuery(type,
catalog, schema, table, scope, nullable));
}
SqlResult::Type Statement::InternalExecuteSpecialColumnsQuery(int16_t type,
const std::string& catalog, const std::string& schema,
const std::string& table, int16_t scope, int16_t nullable)
{
if (type != SQL_BEST_ROWID && type != SQL_ROWVER)
{
AddStatusRecord(SqlState::SHY097_COLUMN_TYPE_OUT_OF_RANGE,
"An invalid IdentifierType value was specified.");
return SqlResult::AI_ERROR;
}
if (currentQuery.get())
currentQuery->Close();
currentQuery.reset(new query::SpecialColumnsQuery(*this, type,
catalog, schema, table, scope, nullable));
return currentQuery->Execute();
}
void Statement::ExecuteGetTypeInfoQuery(int16_t sqlType)
{
IGNITE_ODBC_API_CALL(InternalExecuteGetTypeInfoQuery(sqlType));
}
SqlResult::Type Statement::InternalExecuteGetTypeInfoQuery(int16_t sqlType)
{
if (sqlType != SQL_ALL_TYPES && !type_traits::IsSqlTypeSupported(sqlType))
{
std::stringstream builder;
builder << "Data type is not supported. [typeId=" << sqlType << ']';
AddStatusRecord(SqlState::SHYC00_OPTIONAL_FEATURE_NOT_IMPLEMENTED, builder.str());
return SqlResult::AI_ERROR;
}
if (currentQuery.get())
currentQuery->Close();
currentQuery.reset(new query::TypeInfoQuery(*this, sqlType));
return currentQuery->Execute();
}
void Statement::FreeResources(int16_t option)
{
IGNITE_ODBC_API_CALL(InternalFreeResources(option));
}
SqlResult::Type Statement::InternalFreeResources(int16_t option)
{
switch (option)
{
case SQL_DROP:
{
AddStatusRecord("Deprecated, call SQLFreeHandle instead");
return SqlResult::AI_ERROR;
}
case SQL_CLOSE:
{
return InternalClose();
}
case SQL_UNBIND:
{
SafeUnbindAllColumns();
break;
}
case SQL_RESET_PARAMS:
{
parameters.UnbindAll();
break;
}
default:
{
AddStatusRecord(SqlState::SHY092_OPTION_TYPE_OUT_OF_RANGE, "The value specified for the argument Option was invalid");
return SqlResult::AI_ERROR;
}
}
return SqlResult::AI_SUCCESS;
}
void Statement::Close()
{
IGNITE_ODBC_API_CALL(InternalClose());
}
SqlResult::Type Statement::InternalClose()
{
if (!currentQuery.get())
return SqlResult::AI_SUCCESS;
SqlResult::Type result = currentQuery->Close();
return result;
}
SqlResult::Type Statement::StopStreaming()
{
if (!IsStreamingActive())
return SqlResult::AI_SUCCESS;
LOG_MSG("Stopping streaming");
SqlResult::Type result = connection.GetStreamingContext().Disable();
return result;
}
void Statement::FetchScroll(int16_t orientation, int64_t offset)
{
IGNITE_ODBC_API_CALL(InternalFetchScroll(orientation, offset));
}
SqlResult::Type Statement::InternalFetchScroll(int16_t orientation, int64_t offset)
{
UNREFERENCED_PARAMETER(offset);
if (orientation != SQL_FETCH_NEXT)
{
AddStatusRecord(SqlState::SHYC00_OPTIONAL_FEATURE_NOT_IMPLEMENTED,
"Only SQL_FETCH_NEXT FetchOrientation type is supported");
return SqlResult::AI_ERROR;
}
return InternalFetchRow();
}
void Statement::FetchRow()
{
IGNITE_ODBC_API_CALL(InternalFetchRow());
}
SqlResult::Type Statement::InternalFetchRow()
{
if (rowsFetched)
*rowsFetched = 0;
if (!currentQuery.get())
{
AddStatusRecord(SqlState::S24000_INVALID_CURSOR_STATE, "Cursor is not in the open state");
return SqlResult::AI_ERROR;
}
if (columnBindOffset)
{
for (app::ColumnBindingMap::iterator it = columnBindings.begin(); it != columnBindings.end(); ++it)
it->second.SetByteOffset(*columnBindOffset);
}
SQLINTEGER fetched = 0;
SQLINTEGER errors = 0;
for (SqlUlen i = 0; i < rowArraySize; ++i)
{
for (app::ColumnBindingMap::iterator it = columnBindings.begin(); it != columnBindings.end(); ++it)
it->second.SetElementOffset(i);
SqlResult::Type res = currentQuery->FetchNextRow(columnBindings);
if (res == SqlResult::AI_SUCCESS || res == SqlResult::AI_SUCCESS_WITH_INFO)
++fetched;
else if (res != SqlResult::AI_NO_DATA)
++errors;
if (rowStatuses)
rowStatuses[i] = SqlResultToRowResult(res);
}
if (rowsFetched)
*rowsFetched = fetched < 0 ? static_cast<SQLINTEGER>(rowArraySize) : fetched;
if (fetched > 0)
return errors == 0 ? SqlResult::AI_SUCCESS : SqlResult::AI_SUCCESS_WITH_INFO;
return errors == 0 ? SqlResult::AI_NO_DATA : SqlResult::AI_ERROR;
}
const meta::ColumnMetaVector* Statement::GetMeta()
{
if (!currentQuery.get())
{
AddStatusRecord(SqlState::SHY010_SEQUENCE_ERROR, "Query is not executed.");
return 0;
}
return currentQuery->GetMeta();
}
bool Statement::DataAvailable() const
{
return currentQuery.get() && currentQuery->DataAvailable();
}
void Statement::MoreResults()
{
IGNITE_ODBC_API_CALL(InternalMoreResults());
}
SqlResult::Type Statement::InternalMoreResults()
{
if (!currentQuery.get())
{
AddStatusRecord(SqlState::SHY010_SEQUENCE_ERROR, "Query is not executed.");
return SqlResult::AI_ERROR;
}
return currentQuery->NextResultSet();
}
void Statement::GetColumnAttribute(uint16_t colIdx, uint16_t attrId,
char* strbuf, int16_t buflen, int16_t* reslen, SqlLen* numbuf)
{
IGNITE_ODBC_API_CALL(InternalGetColumnAttribute(colIdx, attrId,
strbuf, buflen, reslen, numbuf));
}
SqlResult::Type Statement::InternalGetColumnAttribute(uint16_t colIdx, uint16_t attrId, char* strbuf,
int16_t buflen, int16_t* reslen, SqlLen* numbuf)
{
const meta::ColumnMetaVector *meta = GetMeta();
LOG_MSG("Collumn ID: " << colIdx << ", Attribute ID: " << attrId);
if (!meta)
return SqlResult::AI_ERROR;
if (colIdx > meta->size() || colIdx < 1)
{
AddStatusRecord(SqlState::SHY000_GENERAL_ERROR,
"Column index is out of range.", 0, colIdx);
return SqlResult::AI_ERROR;
}
const meta::ColumnMeta& columnMeta = meta->at(colIdx - 1);
bool found = false;
if (numbuf)
found = columnMeta.GetAttribute(attrId, *numbuf);
if (!found)
{
std::string out;
found = columnMeta.GetAttribute(attrId, out);
size_t outSize = out.size();
if (found && strbuf)
outSize = utility::CopyStringToBuffer(out, strbuf, buflen);
if (found && reslen)
*reslen = static_cast<int16_t>(outSize);
}
if (!found)
{
AddStatusRecord(SqlState::SHYC00_OPTIONAL_FEATURE_NOT_IMPLEMENTED,
"Unknown attribute.");
return SqlResult::AI_ERROR;
}
return SqlResult::AI_SUCCESS;
}
int64_t Statement::AffectedRows()
{
int64_t rowCnt = 0;
IGNITE_ODBC_API_CALL(InternalAffectedRows(rowCnt));
return rowCnt;
}
SqlResult::Type Statement::InternalAffectedRows(int64_t& rowCnt)
{
if (!currentQuery.get())
{
AddStatusRecord(SqlState::SHY010_SEQUENCE_ERROR, "Query is not executed.");
return SqlResult::AI_ERROR;
}
rowCnt = currentQuery->AffectedRows();
return SqlResult::AI_SUCCESS;
}
void Statement::SetRowsFetchedPtr(SQLINTEGER* ptr)
{
rowsFetched = ptr;
}
SQLINTEGER* Statement::GetRowsFetchedPtr()
{
return rowsFetched;
}
void Statement::SetRowStatusesPtr(SQLUSMALLINT* ptr)
{
rowStatuses = ptr;
}
SQLUSMALLINT * Statement::GetRowStatusesPtr()
{
return rowStatuses;
}
void Statement::SelectParam(void** paramPtr)
{
IGNITE_ODBC_API_CALL(InternalSelectParam(paramPtr));
}
SqlResult::Type Statement::InternalSelectParam(void** paramPtr)
{
if (!paramPtr)
{
AddStatusRecord(SqlState::SHY000_GENERAL_ERROR,
"Invalid parameter: ValuePtrPtr is null.");
return SqlResult::AI_ERROR;
}
if (!currentQuery.get())
{
AddStatusRecord(SqlState::SHY010_SEQUENCE_ERROR, "Query is not prepared.");
return SqlResult::AI_ERROR;
}
app::Parameter *selected = parameters.GetSelectedParameter();
if (selected && !selected->IsDataReady())
{
AddStatusRecord(SqlState::S22026_DATA_LENGTH_MISMATCH,
"Less data was sent for a parameter than was specified with "
"the StrLen_or_IndPtr argument in SQLBindParameter.");
return SqlResult::AI_ERROR;
}
selected = parameters.SelectNextParameter();
if (selected)
{
*paramPtr = selected->GetBuffer().GetData();
return SqlResult::AI_NEED_DATA;
}
SqlResult::Type res = currentQuery->Execute();
if (res != SqlResult::AI_SUCCESS)
res = SqlResult::AI_SUCCESS_WITH_INFO;
return res;
}
void Statement::PutData(void* data, SqlLen len)
{
IGNITE_ODBC_API_CALL(InternalPutData(data, len));
}
SqlResult::Type Statement::InternalPutData(void* data, SqlLen len)
{
if (!data && len != 0 && len != SQL_DEFAULT_PARAM && len != SQL_NULL_DATA)
{
AddStatusRecord(SqlState::SHY009_INVALID_USE_OF_NULL_POINTER,
"Invalid parameter: DataPtr is null StrLen_or_Ind is not 0, "
"SQL_DEFAULT_PARAM, or SQL_NULL_DATA.");
return SqlResult::AI_ERROR;
}
if (!parameters.IsParameterSelected())
{
AddStatusRecord(SqlState::SHY010_SEQUENCE_ERROR,
"Parameter is not selected with the SQLParamData.");
return SqlResult::AI_ERROR;
}
app::Parameter* param = parameters.GetSelectedParameter();
if (!param)
{
AddStatusRecord(SqlState::SHY000_GENERAL_ERROR,
"Selected parameter has been unbound.");
return SqlResult::AI_ERROR;
}
param->PutData(data, len);
return SqlResult::AI_SUCCESS;
}
void Statement::DescribeParam(int16_t paramNum, int16_t* dataType,
SqlUlen* paramSize, int16_t* decimalDigits, int16_t* nullable)
{
IGNITE_ODBC_API_CALL(InternalDescribeParam(paramNum,
dataType, paramSize, decimalDigits, nullable));
}
SqlResult::Type Statement::InternalDescribeParam(int16_t paramNum, int16_t* dataType,
SqlUlen* paramSize, int16_t* decimalDigits, int16_t* nullable)
{
query::Query *qry = currentQuery.get();
if (!qry)
{
AddStatusRecord(SqlState::SHY010_SEQUENCE_ERROR, "Query is not prepared.");
return SqlResult::AI_ERROR;
}
if (qry->GetType() != query::QueryType::DATA)
{
AddStatusRecord(SqlState::SHY010_SEQUENCE_ERROR, "Query is not SQL data query.");
return SqlResult::AI_ERROR;
}
int8_t type = parameters.GetParamType(paramNum, 0);
LOG_MSG("Type: " << type);
if (!type)
{
SqlResult::Type res = UpdateParamsMeta();
if (res != SqlResult::AI_SUCCESS)
return res;
type = parameters.GetParamType(paramNum, impl::binary::IGNITE_HDR_NULL);
}
if (dataType)
*dataType = type_traits::BinaryToSqlType(type);
if (paramSize)
*paramSize = type_traits::BinaryTypeColumnSize(type);
if (decimalDigits)
*decimalDigits = type_traits::BinaryTypeDecimalDigits(type);
if (nullable)
*nullable = type_traits::BinaryTypeNullability(type);
return SqlResult::AI_SUCCESS;
}
SqlResult::Type Statement::UpdateParamsMeta()
{
query::Query *qry0 = currentQuery.get();
assert(qry0 != 0);
assert(qry0->GetType() == query::QueryType::DATA);
query::DataQuery* qry = static_cast<query::DataQuery*>(qry0);
const std::string& schema = connection.GetSchema();
const std::string& sql = qry->GetSql();
QueryGetParamsMetaRequest req(schema, sql);
QueryGetParamsMetaResponse rsp;
try
{
connection.SyncMessage(req, rsp);
}
catch (const OdbcError& err)
{
AddStatusRecord(err);
return SqlResult::AI_ERROR;
}
catch (const IgniteError& err)
{
AddStatusRecord(err.GetText());
return SqlResult::AI_ERROR;
}
if (rsp.GetStatus() != ResponseStatus::SUCCESS)
{
LOG_MSG("Error: " << rsp.GetError());
AddStatusRecord(ResponseStatusToSqlState(rsp.GetStatus()), rsp.GetError());
return SqlResult::AI_ERROR;
}
parameters.UpdateParamsTypes(rsp.GetTypeIds());
for (size_t i = 0; i < rsp.GetTypeIds().size(); ++i)
{
LOG_MSG("[" << i << "] Parameter type: " << rsp.GetTypeIds()[i]);
}
return SqlResult::AI_SUCCESS;
}
uint16_t Statement::SqlResultToRowResult(SqlResult::Type value)
{
switch (value)
{
case SqlResult::AI_NO_DATA:
return SQL_ROW_NOROW;
case SqlResult::AI_SUCCESS:
return SQL_ROW_SUCCESS;
case SqlResult::AI_SUCCESS_WITH_INFO:
return SQL_ROW_SUCCESS_WITH_INFO;
default:
return SQL_ROW_ERROR;
}
}
}
}