blob: a7e6c1cfa2b56e24ee95fa907efa711f50a950cf [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifdef _WIN32
# include <windows.h>
#endif
#include <sql.h>
#include <sqlext.h>
#include <vector>
#include <string>
#include <boost/test/unit_test.hpp>
#include "ignite/ignite.h"
#include "ignite/ignition.h"
#include "ignite/impl/binary/binary_utils.h"
#include "test_type.h"
#include "test_utils.h"
#include "odbc_test_suite.h"
using namespace ignite;
using namespace ignite::common;
using namespace ignite_test;
using namespace boost::unit_test;
/**
* Test setup fixture.
*/
struct StreamingTestSuiteFixture : odbc::OdbcTestSuite
{
/**
* Constructor.
*/
StreamingTestSuiteFixture() :
grid(StartPlatformNode("queries-test.xml", "NodeMain")),
cache(grid.GetCache<int32_t, TestType>("cache"))
{
// No-op.
}
/**
* Destructor.
*/
~StreamingTestSuiteFixture()
{
Ignition::StopAll(true);
}
void InsertTestStrings(int32_t begin, int32_t end)
{
InsertTestStrings(stmt, begin, end);
}
void InsertTestStrings2(int32_t begin, int32_t end)
{
InsertTestStrings2(stmt, begin, end);
}
void InsertTestStrings(SQLHSTMT stmt0, int32_t begin, int32_t end)
{
SQLCHAR req[] = "INSERT INTO TestType(_key, strField) VALUES(?, ?)";
InsertTestStrings(req, stmt0, begin, end);
}
void InsertTestStrings2(SQLHSTMT stmt0, int32_t begin, int32_t end)
{
SQLCHAR req[] = "INSERT INTO TestType(_key, i32Field, strField) VALUES(?, 42, ?)";
InsertTestStrings(req, stmt0, begin, end);
}
void InsertTestStrings(SQLCHAR* req, SQLHSTMT stmt0, int32_t begin, int32_t end)
{
SQLRETURN ret = SQLPrepare(stmt0, req, SQL_NTS);
if (!SQL_SUCCEEDED(ret))
BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt));
int64_t key = 0;
char strField[1024] = {0};
SQLLEN strFieldLen = 0;
// Binding parameters.
ret = SQLBindParameter(stmt0, 1, SQL_PARAM_INPUT, SQL_C_SLONG, SQL_BIGINT, 0, 0, &key, 0, 0);
if (!SQL_SUCCEEDED(ret))
BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt0));
ret = SQLBindParameter(stmt0, 2, SQL_PARAM_INPUT, SQL_C_CHAR, SQL_VARCHAR, sizeof(strField),
sizeof(strField), &strField, sizeof(strField), &strFieldLen);
if (!SQL_SUCCEEDED(ret))
BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt0));
// Inserting values.
for (int32_t i = begin; i < end; ++i)
{
key = i;
std::string val = GetTestString(i);
CopyStringToBuffer(strField, val, sizeof(strField));
strFieldLen = SQL_NTS;
ret = SQLExecute(stmt0);
if (!SQL_SUCCEEDED(ret))
BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt0));
}
// Resetting parameters.
ret = SQLFreeStmt(stmt0, SQL_RESET_PARAMS);
if (!SQL_SUCCEEDED(ret))
BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt0));
}
int32_t GetI32Field(int32_t key)
{
SQLCHAR req[] = "SELECT i32Field FROM TestType WHERE _key = ?";
SQLRETURN ret = SQLPrepare(stmt, req, SQL_NTS);
if (!SQL_SUCCEEDED(ret))
BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt));
int64_t p1 = key;
// Binding parameters.
ret = SQLBindParameter(stmt, 1, SQL_PARAM_INPUT, SQL_C_SLONG, SQL_BIGINT, 0, 0, &p1, 0, 0);
if (!SQL_SUCCEEDED(ret))
BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt));
int32_t val = 0;
ret = SQLBindCol(stmt, 1, SQL_C_SLONG, &val, 0, 0);
if (!SQL_SUCCEEDED(ret))
BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt));
ret = SQLExecute(stmt);
if (!SQL_SUCCEEDED(ret))
BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt));
// Fetching value.
ret = SQLFetch(stmt);
if (!SQL_SUCCEEDED(ret))
BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt));
// Resetting parameters.
ret = SQLFreeStmt(stmt, SQL_RESET_PARAMS);
if (!SQL_SUCCEEDED(ret))
BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt));
// Resetting columns.
ret = SQLFreeStmt(stmt, SQL_UNBIND);
if (!SQL_SUCCEEDED(ret))
BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt));
// Closing cursor.
ret = SQLFreeStmt(stmt, SQL_CLOSE);
if (!SQL_SUCCEEDED(ret))
BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt));
return val;
}
void CheckValues(int32_t begin, int32_t end)
{
SQLCHAR req[] = "SELECT _key, strField FROM TestType WHERE _key >= ? AND _key < ? ORDER BY _key";
SQLRETURN ret = SQLPrepare(stmt, req, SQL_NTS);
if (!SQL_SUCCEEDED(ret))
BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt));
int64_t p1 = begin;
int64_t p2 = end;
// Binding parameters.
ret = SQLBindParameter(stmt, 1, SQL_PARAM_INPUT, SQL_C_SLONG, SQL_BIGINT, 0, 0, &p1, 0, 0);
if (!SQL_SUCCEEDED(ret))
BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt));
ret = SQLBindParameter(stmt, 2, SQL_PARAM_INPUT, SQL_C_SLONG, SQL_BIGINT, 0, 0, &p2, 0, 0);
if (!SQL_SUCCEEDED(ret))
BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt));
int64_t keyVal = 0;
char strField[1024] = {0};
SQLLEN strFieldLen = 0;
ret = SQLBindCol(stmt, 1, SQL_C_SLONG, &keyVal, 0, 0);
if (!SQL_SUCCEEDED(ret))
BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt));
ret = SQLBindCol(stmt, 2, SQL_C_CHAR, &strField, sizeof(strField), &strFieldLen);
if (!SQL_SUCCEEDED(ret))
BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt));
ret = SQLExecute(stmt);
if (!SQL_SUCCEEDED(ret))
BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt));
// Fetching values.
for (int32_t i = begin; i < end; ++i)
{
ret = SQLFetch(stmt);
if (!SQL_SUCCEEDED(ret))
BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt));
BOOST_CHECK_EQUAL(i, keyVal);
BOOST_CHECK_EQUAL(GetTestString(i), std::string(strField, static_cast<size_t>(strFieldLen)));
}
// Resetting parameters.
ret = SQLFreeStmt(stmt, SQL_RESET_PARAMS);
if (!SQL_SUCCEEDED(ret))
BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt));
// Resetting columns.
ret = SQLFreeStmt(stmt, SQL_UNBIND);
if (!SQL_SUCCEEDED(ret))
BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt));
// Closing cursor.
ret = SQLFreeStmt(stmt, SQL_CLOSE);
if (!SQL_SUCCEEDED(ret))
BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt));
}
/** Node started during the test. */
Ignite grid;
/** Cache. */
cache::Cache<int32_t, TestType> cache;
};
BOOST_FIXTURE_TEST_SUITE(StreamingTestSuite, StreamingTestSuiteFixture)
BOOST_AUTO_TEST_CASE(TestStreamingSimple)
{
Connect("DRIVER={Apache Ignite};SERVER=127.0.0.1;PORT=11110;SCHEMA=cache");
SQLRETURN res = ExecQuery("set streaming on batch_size 100 flush_frequency 100");
if (res != SQL_SUCCESS)
BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt));
InsertTestStrings(0, 10);
BOOST_CHECK_EQUAL(cache.Size(), 0);
InsertTestStrings(10, 110);
res = ExecQuery("set streaming off");
if (res != SQL_SUCCESS)
BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt));
BOOST_CHECK_EQUAL(cache.Size(), 110);
CheckValues(0, 110);
}
BOOST_AUTO_TEST_CASE(TestStreamingAllOptions)
{
Connect("DRIVER={Apache Ignite};SERVER=127.0.0.1;PORT=11110;SCHEMA=cache");
SQLRETURN res = ExecQuery(
"set streaming 1 "
"allow_overwrite on "
"batch_size 512 "
"per_node_buffer_size 500 "
"per_node_parallel_operations 4 "
"flush_frequency 100 "
"ordered");
if (res != SQL_SUCCESS)
BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt));
InsertTestStrings(0, 10);
BOOST_CHECK_EQUAL(cache.Size(), 0);
InsertTestStrings(0, 512);
res = ExecQuery("set streaming off");
if (res != SQL_SUCCESS)
BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt));
BOOST_CHECK_EQUAL(cache.Size(), 512);
}
BOOST_AUTO_TEST_CASE(TestStreamingNotAllowedOverwrite)
{
Connect("DRIVER={Apache Ignite};SERVER=127.0.0.1;PORT=11110;SCHEMA=cache");
SQLRETURN res = ExecQuery("set streaming 1 allow_overwrite off batch_size 10");
if (res != SQL_SUCCESS)
BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt));
InsertTestStrings(0, 10);
BOOST_CHECK_EQUAL(cache.Size(), 0);
InsertTestStrings(0, 10);
res = ExecQuery("set streaming off");
if (res != SQL_SUCCESS)
BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt));
BOOST_CHECK_EQUAL(cache.Size(), 10);
}
BOOST_AUTO_TEST_CASE(TestStreamingReset)
{
Connect("DRIVER={Apache Ignite};SERVER=127.0.0.1;PORT=11110;SCHEMA=cache");
SQLRETURN res = ExecQuery("set streaming 1 batch_size 100 flush_frequency 1000");
if (res != SQL_SUCCESS)
BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt));
InsertTestStrings(0, 10);
BOOST_CHECK_EQUAL(cache.Size(), 0);
InsertTestStrings(10, 20);
BOOST_CHECK_EQUAL(cache.Size(), 0);
res = ExecQuery("set streaming 1 batch_size 10 flush_frequency 100");
if (res != SQL_SUCCESS)
BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt));
BOOST_CHECK_EQUAL(cache.Size(), 20);
InsertTestStrings(20, 50);
BOOST_CHECK_EQUAL(cache.Size(), 20);
res = ExecQuery("set streaming 0");
if (res != SQL_SUCCESS)
BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt));
BOOST_CHECK_EQUAL(cache.Size(), 50);
}
BOOST_AUTO_TEST_CASE(TestStreamingClosingStatement)
{
Connect("DRIVER={Apache Ignite};SERVER=127.0.0.1;PORT=11110;SCHEMA=cache");
SQLRETURN res = ExecQuery("set streaming 1 batch_size 100 flush_frequency 1000");
if (res != SQL_SUCCESS)
BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt));
InsertTestStrings(0, 10);
BOOST_CHECK_EQUAL(cache.Size(), 0);
SQLFreeHandle(SQL_HANDLE_STMT, stmt);
BOOST_CHECK_EQUAL(cache.Size(), 0);
res = SQLAllocHandle(SQL_HANDLE_STMT, dbc, &stmt);
if (res != SQL_SUCCESS)
BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt));
res = ExecQuery("set streaming 0");
if (res != SQL_SUCCESS)
BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt));
BOOST_CHECK_EQUAL(cache.Size(), 10);
}
BOOST_AUTO_TEST_CASE(TestStreamingSeveralStatements)
{
Connect("DRIVER={Apache Ignite};SERVER=127.0.0.1;PORT=11110;SCHEMA=cache");
SQLHSTMT stmt2;
SQLRETURN res = SQLAllocHandle(SQL_HANDLE_STMT, dbc, &stmt2);
if (res != SQL_SUCCESS)
BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt));
res = ExecQuery("set streaming 1 batch_size 100 flush_frequency 1000");
if (res != SQL_SUCCESS)
BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt));
InsertTestStrings(0, 10);
InsertTestStrings(stmt2, 10, 20);
InsertTestStrings(20, 30);
InsertTestStrings(stmt2, 30, 50);
BOOST_CHECK_EQUAL(cache.Size(), 0);
res = ExecQuery("set streaming 0");
if (res != SQL_SUCCESS)
BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt));
BOOST_CHECK_EQUAL(cache.Size(), 50);
res = SQLFreeHandle(SQL_HANDLE_STMT, stmt2);
if (res != SQL_SUCCESS)
BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt2));
}
BOOST_AUTO_TEST_CASE(TestStreamingSeveralStatementsClosing)
{
Connect("DRIVER={Apache Ignite};SERVER=127.0.0.1;PORT=11110;SCHEMA=cache");
SQLHSTMT stmt2;
SQLRETURN res = SQLAllocHandle(SQL_HANDLE_STMT, dbc, &stmt2);
if (res != SQL_SUCCESS)
BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt));
res = ExecQuery("set streaming 1 batch_size 100 flush_frequency 1000");
if (res != SQL_SUCCESS)
BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt));
InsertTestStrings(0, 10);
InsertTestStrings(stmt2, 10, 20);
InsertTestStrings(20, 30);
InsertTestStrings(stmt2, 30, 50);
BOOST_CHECK_EQUAL(cache.Size(), 0);
res = SQLFreeHandle(SQL_HANDLE_STMT, stmt);
if (res != SQL_SUCCESS)
BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt));
res = SQLFreeHandle(SQL_HANDLE_STMT, stmt2);
if (res != SQL_SUCCESS)
BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt2));
BOOST_CHECK_EQUAL(cache.Size(), 0);
res = SQLAllocHandle(SQL_HANDLE_STMT, dbc, &stmt);
if (res != SQL_SUCCESS)
BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt));
res = ExecQuery("set streaming 0");
if (res != SQL_SUCCESS)
BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt));
BOOST_CHECK_EQUAL(cache.Size(), 50);
}
BOOST_AUTO_TEST_CASE(TestStreamingDifferentStatements)
{
Connect("DRIVER={Apache Ignite};SERVER=127.0.0.1;PORT=11110;SCHEMA=cache");
SQLHSTMT stmt2;
SQLRETURN res = SQLAllocHandle(SQL_HANDLE_STMT, dbc, &stmt2);
if (res != SQL_SUCCESS)
BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt));
res = ExecQuery("set streaming 1 batch_size 100 flush_frequency 1000");
if (res != SQL_SUCCESS)
BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt));
InsertTestStrings2(0, 10);
InsertTestStrings(stmt2, 10, 20);
InsertTestStrings(20, 30);
InsertTestStrings2(stmt2, 30, 50);
BOOST_CHECK_EQUAL(cache.Size(), 0);
res = SQLFreeHandle(SQL_HANDLE_STMT, stmt2);
if (res != SQL_SUCCESS)
BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt2));
BOOST_CHECK_EQUAL(cache.Size(), 0);
res = ExecQuery("set streaming 0");
if (res != SQL_SUCCESS)
BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt));
BOOST_CHECK_EQUAL(cache.Size(), 50);
CheckValues(0, 50);
BOOST_CHECK_EQUAL(GetI32Field(8), 42);
BOOST_CHECK_EQUAL(GetI32Field(13), 0);
BOOST_CHECK_EQUAL(GetI32Field(42), 42);
}
BOOST_AUTO_TEST_CASE(TestStreamingManyObjects)
{
const static int32_t OBJECT_NUM = 100000;
Connect("DRIVER={Apache Ignite};SERVER=127.0.0.1;PORT=11110;SCHEMA=cache");
SQLRETURN res = ExecQuery("set streaming on");
if (res != SQL_SUCCESS)
BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt));
InsertTestStrings(0, OBJECT_NUM);
if (res != SQL_SUCCESS)
BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt));
res = ExecQuery("set streaming 0");
if (res != SQL_SUCCESS)
BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt));
BOOST_CHECK_EQUAL(cache.Size(), OBJECT_NUM);
CheckValues(0, OBJECT_NUM);
}
BOOST_AUTO_TEST_SUITE_END()