blob: 22ffd5443c613b4bb09cbfefdceacb0318ae1a6e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// ReSharper disable UnusedAutoPropertyAccessor.Global
// ReSharper disable MemberCanBePrivate.Global
namespace Apache.Ignite.Core.Tests.Cache.Query
{
using System;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Apache.Ignite.Core.Binary;
using Apache.Ignite.Core.Cache;
using Apache.Ignite.Core.Cache.Configuration;
using Apache.Ignite.Core.Cache.Query;
using Apache.Ignite.Core.Common;
using Apache.Ignite.Core.Resource;
using NUnit.Framework;
/// <summary>
/// Queries tests.
/// </summary>
public class CacheQueriesTest
{
/** Number of Ignite nodes started by the fixture setup. */
private const int GridCnt = 2;
/** Name of the cache used by the tests. */
private const string CacheName = "cache";
/** Path to the Spring XML configuration passed to every started node. */
private const string CfgPath = "Config\\cache-query.xml";
/** Entry count passed to PopulateCache by most tests. */
private const int MaxItemCnt = 100;
/// <summary>
/// Fixture setup: starts <see cref="GridCnt"/> nodes named "grid-0", "grid-1", ...
/// Each node uses the test configuration, the Spring XML at <see cref="CfgPath"/>
/// and the name mapper returned by <see cref="GetNameMapper"/>.
/// </summary>
[TestFixtureSetUp]
public void StartGrids()
{
    var idx = 0;

    while (idx < GridCnt)
    {
        var cfg = new IgniteConfiguration(TestUtils.GetTestConfiguration());

        var binaryCfg = new BinaryConfiguration();
        binaryCfg.NameMapper = GetNameMapper();

        cfg.BinaryConfiguration = binaryCfg;
        cfg.SpringConfigUrl = CfgPath;
        cfg.IgniteInstanceName = "grid-" + idx;

        Ignition.Start(cfg);

        idx++;
    }
}
/// <summary>
/// Creates the binary name mapper used by every node of this fixture.
/// Virtual so that derived fixtures can run the same tests with a different mapper.
/// </summary>
/// <returns>A basic name mapper with <c>IsSimpleName</c> set to false.</returns>
protected virtual IBinaryNameMapper GetNameMapper()
{
    var mapper = new BinaryBasicNameMapper();
    mapper.IsSimpleName = false;

    return mapper;
}
/// <summary>
/// Fixture teardown: stops every started node.
/// </summary>
[TestFixtureTearDown]
public void StopGrids()
{
    // The flag is forwarded as-is to Ignition.StopAll.
    const bool cancel = true;

    Ignition.StopAll(cancel);
}
/// <summary>
/// Per-test setup: logs the name of the test that is about to run.
/// </summary>
[SetUp]
public void BeforeTest()
{
    var testName = TestContext.CurrentContext.Test.Name;

    Console.WriteLine("Test started: " + testName);
}
/// <summary>
/// Per-test teardown: clears the test cache, verifies it is empty as seen from every node,
/// checks that no handles leaked in any node's handle registry and logs the test name.
/// </summary>
[TearDown]
public void AfterTest()
{
    // Resolve the cache from each grid in turn; previously the loop repeated the very same
    // Clear/IsEmpty call on the grid-0 cache instance GridCnt times.
    for (int i = 0; i < GridCnt; i++)
    {
        var cache = Ignition.GetIgnite("grid-" + i).GetCache<int, QueryPerson>(CacheName);

        cache.Clear();

        Assert.IsTrue(cache.IsEmpty(), "Cache is not empty on grid-" + i);
    }

    TestUtils.AssertHandleRegistryIsEmpty(300,
        Enumerable.Range(0, GridCnt).Select(x => Ignition.GetIgnite("grid-" + x)).ToArray());

    Console.WriteLine("Test finished: " + TestContext.CurrentContext.Test.Name);
}
/// <summary>
/// Gets the first started node ("grid-0"), which the tests use as their entry point.
/// </summary>
/// <returns>Ignite instance named "grid-0".</returns>
private static IIgnite GetIgnite()
{
    const string firstGridName = "grid-0";

    return Ignition.GetIgnite(firstGridName);
}
/// <summary>
/// Gets the test cache (<see cref="CacheName"/>) from the first node.
/// </summary>
/// <returns>Cache of <see cref="QueryPerson"/> values keyed by int.</returns>
private static ICache<int, QueryPerson> Cache()
{
    var ignite = GetIgnite();

    return ignite.GetCache<int, QueryPerson>(CacheName);
}
/// <summary>
/// Verifies that an SQL query without SQL text or without a value type is rejected
/// with <see cref="ArgumentException"/>.
/// </summary>
[Test]
public void TestValidationSql()
{
#pragma warning disable 618
    // 1. SQL text is missing.
    TestDelegate noSql = () => { Cache().Query(new SqlQuery(typeof(QueryPerson), null)); };
    Assert.Throws<ArgumentException>(noSql);

    // 2. Query type is missing.
    TestDelegate noType = () => { Cache().Query(new SqlQuery((string)null, "age >= 50")); };
    Assert.Throws<ArgumentException>(noType);
#pragma warning restore 618
}
/// <summary>
/// Verifies that an SQL fields query without SQL text is rejected
/// with <see cref="ArgumentException"/>.
/// </summary>
[Test]
public void TestValidationSqlFields()
{
    // SQL text is missing.
    TestDelegate noSql = () => { Cache().Query(new SqlFieldsQuery(null)); };

    Assert.Throws<ArgumentException>(noSql);
}
/// <summary>
/// Verifies that a text query without search text or without a value type is rejected
/// with <see cref="ArgumentException"/>.
/// </summary>
[Test]
public void TestValidationText()
{
    // 1. Search text is missing.
    TestDelegate noText = () => { Cache().Query(new TextQuery(typeof(QueryPerson), null)); };
    Assert.Throws<ArgumentException>(noText);

    // 2. Query type is missing.
    TestDelegate noType = () => { Cache().Query(new TextQuery((string)null, "Ivanov")); };
    Assert.Throws<ArgumentException>(noType);
}
/// <summary>
/// Verifies that a query cursor can be consumed only once: after either
/// <c>GetAll</c> or <c>GetEnumerator</c> has been called, both methods throw
/// <see cref="InvalidOperationException"/>.
/// </summary>
[Test]
[SuppressMessage("ReSharper", "ReturnValueOfPureMethodIsNotUsed")]
public void TestCursor()
{
    // Distinct keys, so that all three persons are actually stored
    // (the same key used three times would leave a single entry).
    Cache().Put(1, new QueryPerson("Ivanov", 30));
    Cache().Put(2, new QueryPerson("Petrov", 40));
    Cache().Put(3, new QueryPerson("Sidorov", 50));

#pragma warning disable 618
    SqlQuery qry = new SqlQuery(typeof(QueryPerson), "age >= 20");
#pragma warning restore 618

    // 1. Test GetAll().
    using (IQueryCursor<ICacheEntry<int, QueryPerson>> cursor = Cache().Query(qry))
    {
        cursor.GetAll();

        Assert.Throws<InvalidOperationException>(() => { cursor.GetAll(); });
        Assert.Throws<InvalidOperationException>(() => { cursor.GetEnumerator(); });
    }

    // 2. Test GetEnumerator.
    using (IQueryCursor<ICacheEntry<int, QueryPerson>> cursor = Cache().Query(qry))
    {
        cursor.GetEnumerator();

        Assert.Throws<InvalidOperationException>(() => { cursor.GetAll(); });
        Assert.Throws<InvalidOperationException>(() => { cursor.GetEnumerator(); });
    }
}
/// <summary>
/// Verifies cursor enumerator behavior: access to <c>Current</c> outside of a valid position,
/// unsupported <c>Reset</c>, and correct iteration for page sizes above, equal to and below
/// the result set size.
/// </summary>
[Test]
[SuppressMessage("ReSharper", "UnusedVariable")]
public void TestEnumerator()
{
#pragma warning disable 618
    var personCache = Cache();

    personCache.Put(1, new QueryPerson("Ivanov", 30));
    personCache.Put(2, new QueryPerson("Petrov", 40));
    personCache.Put(3, new QueryPerson("Sidorov", 50));
    personCache.Put(4, new QueryPerson("Unknown", 60));

    // 1. Empty result set.
    var emptyQry = new SqlQuery(typeof(QueryPerson), "age = 100");

    using (var cursor = Cache().Query(emptyQry))
    {
        var enumerator = cursor.GetEnumerator();

        // Current is invalid before the first MoveNext ...
        Assert.Throws<InvalidOperationException>(() => { var unused = enumerator.Current; });

        Assert.IsFalse(enumerator.MoveNext());

        // ... and after the end has been reached.
        Assert.Throws<InvalidOperationException>(() => { var unused = enumerator.Current; });

        Assert.Throws<NotSupportedException>(() => enumerator.Reset());

        enumerator.Dispose();
    }

    var pagedQry = new SqlQuery(typeof(QueryPerson), "age < 60");

    Assert.AreEqual(QueryBase.DefaultPageSize, pagedQry.PageSize);

    // 2. Page size is bigger than the result set.
    pagedQry.PageSize = 4;
    CheckEnumeratorQuery(pagedQry);

    // 3. Page size is equal to the result set.
    pagedQry.PageSize = 3;
    CheckEnumeratorQuery(pagedQry);

    // 4. Page size is less than the result set.
    pagedQry.PageSize = 2;
    CheckEnumeratorQuery(pagedQry);
#pragma warning restore 618
}
/// <summary>
/// Verifies that positional arguments are passed to an SQL query: with three persons aged
/// 30, 40 and 50, the query "age &lt; ?" with argument 50 must return exactly keys 1 and 2.
/// </summary>
[Test]
public void TestSqlQueryArguments()
{
#pragma warning disable 618
    Cache().Put(1, new QueryPerson("Ivanov", 30));
    Cache().Put(2, new QueryPerson("Petrov", 40));
    Cache().Put(3, new QueryPerson("Sidorov", 50));

    using (var cursor = Cache().Query(new SqlQuery(typeof(QueryPerson), "age < ?", 50)))
    {
        var entries = cursor.GetAll();

        // Compare the whole key set: a per-entry check alone would also pass
        // if the query returned nothing at all.
        CollectionAssert.AreEquivalent(new[] {1, 2}, entries.Select(e => e.Key));
    }
#pragma warning restore 618
}
/// <summary>
/// Verifies that positional arguments are passed to an SQL fields query: with three persons
/// aged 30, 40 and 50, selecting ages below the argument 50 must return exactly 30 and 40.
/// </summary>
[Test]
public void TestSqlFieldsQueryArguments()
{
    Cache().Put(1, new QueryPerson("Ivanov", 30));
    Cache().Put(2, new QueryPerson("Petrov", 40));
    Cache().Put(3, new QueryPerson("Sidorov", 50));

    using (var cursor = Cache().Query(new SqlFieldsQuery("SELECT age FROM QueryPerson WHERE age < ?", 50)))
    {
        var rows = cursor.GetAll();

        // Compare the whole result: a per-row check alone would also pass
        // if the query returned nothing at all.
        CollectionAssert.AreEquivalent(new[] {30, 40}, rows.Select(row => (int) row[0]));
    }
}
/// <summary>
/// Runs the query and verifies that iteration yields exactly the three persons stored by
/// the enumerator test under keys 1, 2 and 3, with their expected names and ages.
/// </summary>
/// <param name="qry">Query.</param>
#pragma warning disable 618
private void CheckEnumeratorQuery(SqlQuery qry)
{
    using (var cursor = Cache().Query(qry))
    {
        var sawIvanov = false;
        var sawPetrov = false;
        var sawSidorov = false;

        foreach (var entry in cursor)
        {
            switch (entry.Key)
            {
                case 1:
                    sawIvanov = true;
                    Assert.AreEqual("Ivanov", entry.Value.Name);
                    Assert.AreEqual(30, entry.Value.Age);
                    break;

                case 2:
                    sawPetrov = true;
                    Assert.AreEqual("Petrov", entry.Value.Name);
                    Assert.AreEqual(40, entry.Value.Age);
                    break;

                case 3:
                    sawSidorov = true;
                    Assert.AreEqual("Sidorov", entry.Value.Name);
                    Assert.AreEqual(50, entry.Value.Age);
                    break;

                default:
                    Assert.Fail("Unexpected value: " + entry);
                    break;
            }
        }

        // Every expected entry must have been returned.
        Assert.IsTrue(sawIvanov && sawPetrov && sawSidorov);
    }
}
#pragma warning restore 618
/// <summary>
/// Checks an SQL query for all combinations of local, keep-binary and distributed-join flags:
/// validates the query's string representation and the returned entries.
/// </summary>
[Test]
public void TestSqlQuery([Values(true, false)] bool loc, [Values(true, false)] bool keepBinary,
    [Values(true, false)] bool distrJoin)
{
    var personCache = Cache();

    // 1. Populate the cache and compute the keys the query is expected to return.
    var expectedKeys = PopulateCache(personCache, loc, MaxItemCnt, x => x < 50);

    // 2. Build the query.
#pragma warning disable 618
    var sqlQry = new SqlQuery(typeof(QueryPerson), "age < 50", loc);
    sqlQry.EnableDistributedJoins = distrJoin;
    sqlQry.ReplicatedOnly = false;
    sqlQry.Timeout = TimeSpan.FromSeconds(3);
#pragma warning restore 618

    // 3. Validate the string representation and the results.
    var expectedStr = string.Format("SqlQuery [Sql=age < 50, Arguments=[], Local={0}, " +
                                    "PageSize=1024, EnableDistributedJoins={1}, Timeout={2}, " +
                                    "ReplicatedOnly=False]", loc, distrJoin, sqlQry.Timeout);

    Assert.AreEqual(expectedStr, sqlQry.ToString());

    ValidateQueryResults(personCache, sqlQry, expectedKeys, keepBinary);
}
/// <summary>
/// Checks an SQL fields query for all combinations of local, distributed-join,
/// enforce-join-order and lazy flags, through both the current and the obsolete API.
/// </summary>
[Test]
public void TestSqlFieldsQuery([Values(true, false)] bool loc, [Values(true, false)] bool distrJoin,
    [Values(true, false)] bool enforceJoinOrder, [Values(true, false)] bool lazy)
{
    var personCache = Cache();

    // 1. Populate the cache and compute the keys the query is expected to return.
    var expectedKeys = PopulateCache(personCache, loc, MaxItemCnt, x => x < 50);

    // 2. Build the query.
    var fieldsQry = new SqlFieldsQuery("SELECT name, age FROM QueryPerson WHERE age < 50");
    fieldsQry.EnableDistributedJoins = distrJoin;
    fieldsQry.EnforceJoinOrder = enforceJoinOrder;
    fieldsQry.Colocated = !distrJoin;
#pragma warning disable 618
    fieldsQry.ReplicatedOnly = false;
#pragma warning restore 618
    fieldsQry.Local = loc;
    fieldsQry.Timeout = TimeSpan.FromSeconds(2);
    fieldsQry.Lazy = lazy;

    // 3. Validate the results; PopulateCache stores name == age.ToString().
    using (var cursor = personCache.Query(fieldsQry))
    {
        var pending = new HashSet<int>(expectedKeys);

        foreach (var row in cursor.GetAll())
        {
            Assert.AreEqual(2, row.Count);
            Assert.AreEqual(row[0].ToString(), row[1].ToString());

            pending.Remove((int) row[1]);
        }

        Assert.AreEqual(0, pending.Count);
        Assert.AreEqual(new[] {"NAME", "AGE"}, cursor.FieldNames);
    }

    // 4. Same check through the old API.
#pragma warning disable 618
    using (var legacyCursor = personCache.QueryFields(fieldsQry))
    {
        var pending = new HashSet<int>(expectedKeys);

        foreach (var row in legacyCursor)
        {
            Assert.AreEqual(row[0].ToString(), row[1].ToString());

            pending.Remove((int) row[1]);
        }

        Assert.AreEqual(0, pending.Count);
    }
#pragma warning restore 618
}
/// <summary>
/// Tests that the query entity declared in the Spring XML (fields and indexes)
/// is reflected by the cache configuration.
/// </summary>
[Test]
public void TestQueryConfiguration()
{
    var entity = Cache().GetConfiguration().QueryEntities.Single();

    Assert.AreEqual(typeof(QueryPerson).FullName, entity.ValueTypeName);

    // Fields: "age" comes first, "name" comes last.
    var ageField = entity.Fields.First();
    var nameField = entity.Fields.Last();

    Assert.AreEqual("age", ageField.Name);
    Assert.AreEqual(typeof(int), ageField.FieldType);
    Assert.IsFalse(ageField.IsKeyField);

    Assert.AreEqual("name", nameField.Name);
    Assert.AreEqual(typeof(string), nameField.FieldType);
    Assert.IsFalse(nameField.IsKeyField);

    // Indexes: full-text index on "name" comes first, sorted index on "age" comes last.
    var fullTextIndex = entity.Indexes.First();
    var sortedIndex = entity.Indexes.Last();

    Assert.AreEqual(QueryIndexType.FullText, fullTextIndex.IndexType);
    Assert.AreEqual("name", fullTextIndex.Fields.Single().Name);
    Assert.AreEqual(QueryIndex.DefaultInlineSize, fullTextIndex.InlineSize);

    Assert.AreEqual(QueryIndexType.Sorted, sortedIndex.IndexType);
    Assert.AreEqual("age", sortedIndex.Fields.Single().Name);
    Assert.AreEqual(2345, sortedIndex.InlineSize);
}
/// <summary>
/// Checks a text query ("1*") for all combinations of local and keep-binary flags.
/// </summary>
[Test]
public void TestTextQuery([Values(true, false)] bool loc, [Values(true, false)] bool keepBinary)
{
    var personCache = Cache();

    // 1. Populate the cache; expected keys are those whose decimal form starts with "1"
    //    (PopulateCache stores the key's string form as the person name).
    Func<int, bool> startsWithOne = x => x.ToString().StartsWith("1");
    var expectedKeys = PopulateCache(personCache, loc, MaxItemCnt, startsWithOne);

    // 2. Validate the results.
    var textQry = new TextQuery(typeof(QueryPerson), "1*", loc);

    ValidateQueryResults(personCache, textQry, expectedKeys, keepBinary);
}
/// <summary>
/// Checks scan queries over deserialized <see cref="QueryPerson"/> values.
/// </summary>
[Test]
public void TestScanQuery([Values(true, false)] bool loc)
{
    CheckScanQuery<QueryPerson>(loc, keepBinary: false);
}
/// <summary>
/// Checks scan queries in binary mode (values are <see cref="IBinaryObject"/>).
/// </summary>
[Test]
public void TestScanQueryBinary([Values(true, false)] bool loc)
{
    CheckScanQuery<IBinaryObject>(loc, keepBinary: true);
}
/// <summary>
/// Checks per-partition scan queries over deserialized <see cref="QueryPerson"/> values.
/// </summary>
[Test]
public void TestScanQueryPartitions([Values(true, false)] bool loc)
{
    CheckScanQueryPartitions<QueryPerson>(loc, keepBinary: false);
}
/// <summary>
/// Checks per-partition scan queries in binary mode (values are <see cref="IBinaryObject"/>).
/// </summary>
[Test]
public void TestScanQueryPartitionsBinary([Values(true, false)] bool loc)
{
    CheckScanQueryPartitions<IBinaryObject>(loc, keepBinary: true);
}
/// <summary>
/// Checks that iterating a scan query cursor throws <see cref="ObjectDisposedException"/>
/// when the cursor is disposed from another thread in the middle of the iteration.
/// </summary>
[Test]
public void TestScanQueryDisposedFromAnotherThreadThrowsObjectDisposedException()
{
// Dedicated int-to-int cache named after the current test, filled with totalCount entries.
var cache = GetIgnite().GetOrCreateCache<int, int>(TestUtils.TestName);
const int totalCount = 10000;
cache.PutAll(Enumerable.Range(1, totalCount).ToDictionary(x => x, x => x));
// AcceptAll makes the filter pass every entry, not only keys below 50.
var scanQuery = new ScanQuery<int, int>
{
Filter = new ScanQueryFilter<int> {AcceptAll = true}
};
var cursor = cache.Query(scanQuery);
// Number of entries iterated so far; shared between the test thread and the background task.
long count = 0;
// Background task: busy-waits until a tenth of the entries has been iterated,
// then disposes the cursor while the test thread is still enumerating it.
Task.Factory.StartNew(() =>
{
// ReSharper disable once AccessToModifiedClosure
while (Interlocked.Read(ref count) < totalCount / 10) { }
cursor.Dispose();
});
// The enumeration on this thread is expected to fail once the cursor is disposed.
Assert.Throws<ObjectDisposedException>(() =>
{
foreach (var unused in cursor)
{
Interlocked.Increment(ref count);
}
});
}
/// <summary>
/// Tests that text and SQL queries against a cache without query configuration
/// fail with <see cref="IgniteException"/> and a descriptive message.
/// </summary>
[Test]
public void TestIndexingDisabledError()
{
    var plainCache = GetIgnite().GetOrCreateCache<int, QueryPerson>("nonindexed_cache");

    // Text query.
    var textQry = new TextQuery(typeof(QueryPerson), "1*");
    var textErr = Assert.Throws<IgniteException>(() => plainCache.Query(textQry));

    const string expectedTextMsg = "Indexing is disabled for cache: nonindexed_cache. " +
                                   "Use setIndexedTypes or setTypeMetadata methods on CacheConfiguration to enable.";

    Assert.AreEqual(expectedTextMsg, textErr.Message);

    // SQL query.
#pragma warning disable 618
    var sqlQry = new SqlQuery(typeof(QueryPerson), "age < 50");
#pragma warning restore 618
    var sqlErr = Assert.Throws<IgniteException>(() => plainCache.Query(sqlQry));

    Assert.AreEqual("Failed to find SQL table for type: QueryPerson", sqlErr.Message);
}
/// <summary>
/// Checks scan queries without a filter, with a serializable filter, with a binarizable
/// filter, with a filter that fails to serialize and with a filter that throws.
/// </summary>
/// <typeparam name="TV">Value type of the scan query.</typeparam>
/// <param name="loc">Local query flag.</param>
/// <param name="keepBinary">Keep binary flag.</param>
private static void CheckScanQuery<TV>(bool loc, bool keepBinary)
{
    const int itemCnt = MaxItemCnt;
    var personCache = Cache();

    // No predicate: every cached key is expected.
    var allKeys = PopulateCache(personCache, loc, itemCnt, x => true);
    var plainQry = new ScanQuery<int, TV>();
    ValidateQueryResults(personCache, plainQry, allKeys, keepBinary);

    // Serializable filter: accepts keys below 50.
    var serializableKeys = PopulateCache(personCache, loc, itemCnt, x => x < 50);
    var serializableQry = new ScanQuery<int, TV>(new ScanQueryFilter<TV>());
    ValidateQueryResults(personCache, serializableQry, serializableKeys, keepBinary);

    // Binarizable filter.
    var binarizableKeys = PopulateCache(personCache, loc, itemCnt, x => x < 50);
    var binarizableQry = new ScanQuery<int, TV>(new BinarizableScanQueryFilter<TV>());
    ValidateQueryResults(personCache, binarizableQry, binarizableKeys, keepBinary);

    // Invalid filter: serialization fails.
    var invalidKeys = PopulateCache(personCache, loc, itemCnt, x => x < 50);
    var invalidQry = new ScanQuery<int, TV>(new InvalidScanQueryFilter<TV>());
    Assert.Throws<BinaryObjectException>(
        () => ValidateQueryResults(personCache, invalidQry, invalidKeys, keepBinary));

    // Throwing filter: the error message must reach the caller.
    var throwingKeys = PopulateCache(personCache, loc, itemCnt, x => x < 50);
    var throwingFilter = new ScanQueryFilter<TV>();
    throwingFilter.ThrowErr = true;
    var throwingQry = new ScanQuery<int, TV>(throwingFilter);
    var error = Assert.Throws<IgniteException>(
        () => ValidateQueryResults(personCache, throwingQry, throwingKeys, keepBinary));

    Assert.AreEqual(ScanQueryFilter<TV>.ErrMessage, error.Message);
}
/// <summary>
/// Restarts the grids, then checks scan queries restricted to each single partition,
/// first without a filter and then with a filter that accepts keys below 50.
/// </summary>
/// <typeparam name="TV">Value type of the scan query.</typeparam>
/// <param name="loc">Local query flag.</param>
/// <param name="keepBinary">Keep binary flag.</param>
private void CheckScanQueryPartitions<TV>(bool loc, bool keepBinary)
{
    StopGrids();
    StartGrids();

    const int itemCnt = MaxItemCnt;
    var personCache = Cache();
    var affinity = personCache.Ignite.GetAffinity(CacheName);

    // Partitions without predicate. The cache is populated once, outside the loop (slow).
    var allKeys = PopulateCache(personCache, loc, itemCnt, x => true);

    for (var partition = 0; partition < affinity.Partitions; partition++)
    {
        var current = partition;

        // Only the keys that map to the current partition are expected.
        var partitionKeys = new HashSet<int>(allKeys.Where(key => affinity.GetPartition(key) == current));
        var partitionQry = new ScanQuery<int, TV>();
        partitionQry.Partition = current;

        ValidateQueryResults(personCache, partitionQry, partitionKeys, keepBinary);
    }

    // Partitions with predicate. Again populated once, outside the loop (slow).
    var filteredKeys = PopulateCache(personCache, loc, itemCnt, x => x < 50);

    for (var partition = 0; partition < affinity.Partitions; partition++)
    {
        var current = partition;

        var partitionKeys = new HashSet<int>(filteredKeys.Where(key => affinity.GetPartition(key) == current));
        var partitionQry = new ScanQuery<int, TV>(new ScanQueryFilter<TV>());
        partitionQry.Partition = current;

        ValidateQueryResults(personCache, partitionQry, partitionKeys, keepBinary);
    }
}
/// <summary>
/// Tests cross-cache joins with the default schema and with a schema set explicitly
/// through <c>SqlFieldsQuery.Schema</c>.
/// </summary>
[Test]
public void TestCustomSchema()
{
    var ignite = GetIgnite();

    var doublesCfg = new CacheConfiguration("doubles", new QueryEntity(typeof(int), typeof(double)));
    var doubles = ignite.GetOrCreateCache<int, double>(doublesCfg);

    var stringsCfg = new CacheConfiguration("strings", new QueryEntity(typeof(int), typeof(string)));
    var strings = GetIgnite().GetOrCreateCache<int, string>(stringsCfg);

    doubles[1] = 36.6;
    strings[1] = "foo";

    // Default schema: the other cache's table is qualified explicitly.
    var defaultSchemaQry = new SqlFieldsQuery(
        "select S._val from double as D join \"strings\".string as S on S._key = D._key");

    var defaultSchemaRes = doubles.Query(defaultSchemaQry)
        .Select(row => (string) row[0])
        .Single();

    Assert.AreEqual("foo", defaultSchemaRes);

    // Custom schema: the query runs on "doubles" but resolves unqualified tables in "strings".
    var customSchemaQry = new SqlFieldsQuery(
        "select S._val from \"doubles\".double as D join string as S on S._key = D._key");
    customSchemaQry.Schema = strings.Name;

    var customSchemaRes = doubles.Query(customSchemaQry)
        .Select(row => (string) row[0])
        .Single();

    Assert.AreEqual("foo", customSchemaRes);
}
/// <summary>
/// Tests the distributed joins flag: a self-join on non-collocated keys returns partial
/// results without the flag and complete results with it.
/// </summary>
[Test]
public void TestDistributedJoins()
{
    const int count = 100;

    var entity = new QueryEntity(typeof(int), typeof(QueryPerson));
    entity.Fields = new[] {new QueryField("age", "int")};

    var cacheCfg = new CacheConfiguration("replicatedCache");
    cacheCfg.QueryEntities = new[] {entity};

    var cache = GetIgnite().GetOrCreateCache<int, QueryPerson>(cacheCfg);

    var people = Enumerable.Range(0, count).ToDictionary(x => x, x => new QueryPerson("Name" + x, x));
    cache.PutAll(people);

    var sql = "select T0.Age from QueryPerson as T0 " +
              "inner join QueryPerson as T1 on ((? - T1.Age - 1) = T0._key)";

    // Non-distributed join: returns partial results.
    var localJoinQry = new SqlFieldsQuery(sql, count);
    var localJoinCnt = cache.Query(localJoinQry).GetAll().Distinct().Count();

    Assert.Greater(localJoinCnt, 0);
    Assert.Less(localJoinCnt, count);

    // Distributed join: returns complete results.
    var distributedJoinQry = new SqlFieldsQuery(sql, count);
    distributedJoinQry.EnableDistributedJoins = true;

    var distributedJoinCnt = cache.Query(distributedJoinQry).GetAll().Distinct().Count();

    Assert.AreEqual(count, distributedJoinCnt);
}
/// <summary>
/// Tests that field type, NotNull and IsKeyField settings of the query entity are returned
/// by the cache configuration.
/// </summary>
[Test]
public void TestGetConfiguration()
{
    var entity = Cache().GetConfiguration().QueryEntities.Single();

    // Looks up exactly one field by name.
    Func<string, QueryField> fieldByName = name => entity.Fields.Single(x => x.Name == name);

    var age = fieldByName("age");

    Assert.AreEqual(typeof(int), age.FieldType);
    Assert.IsFalse(age.NotNull);
    Assert.IsFalse(age.IsKeyField);

    var personName = fieldByName("name");

    Assert.AreEqual(typeof(string), personName.FieldType);
    Assert.IsTrue(personName.NotNull);
    Assert.IsFalse(personName.IsKeyField);
}
/// <summary>
/// Tests custom key and value field names: "select *" omits key and value by default,
/// and includes them when <c>KeyFieldName</c> / <c>ValueFieldName</c> are configured.
/// </summary>
[Test]
public void TestCustomKeyValueFieldNames()
{
    var selectAll = new SqlFieldsQuery("select * from QueryPerson");

    // Default config: select * does not include _key, _val.
    var defaultCache = Cache();
    defaultCache[1] = new QueryPerson("Joe", 48);

    var defaultRow = defaultCache.Query(selectAll).GetAll()[0];

    Assert.AreEqual(2, defaultRow.Count);
    Assert.AreEqual(48, defaultRow[0]);
    Assert.AreEqual("Joe", defaultRow[1]);

    // Custom names: key and value fields are included.
    var entity = new QueryEntity(typeof(int), typeof(QueryPerson));
    entity.Fields = new[]
    {
        new QueryField("age", "int"),
        new QueryField("FullKey", "int"),
        new QueryField("FullVal", "QueryPerson")
    };
    entity.KeyFieldName = "FullKey";
    entity.ValueFieldName = "FullVal";

    var customCfg = new CacheConfiguration("customKeyVal");
    customCfg.QueryEntities = new[] {entity};

    var customCache = GetIgnite().GetOrCreateCache<int, QueryPerson>(customCfg);
    customCache[1] = new QueryPerson("John", 33);

    var customRow = customCache.Query(new SqlFieldsQuery("select * from QueryPerson")).GetAll()[0];

    Assert.AreEqual(3, customRow.Count);
    Assert.AreEqual(33, customRow[0]);
    Assert.AreEqual(1, customRow[1]);

    var person = (QueryPerson) customRow[2];
    Assert.AreEqual("John", person.Name);

    // Explicit select of the custom key field.
    var keyRow = customCache.Query(new SqlFieldsQuery("select FullKey from QueryPerson")).GetAll()[0];

    Assert.AreEqual(1, keyRow[0]);
}
/// <summary>
/// Tests that an SQL query with a 1 ms timeout over a large data set is cancelled
/// and surfaces as <see cref="CacheException"/>.
/// </summary>
[Test]
[Category(TestUtils.CategoryIntensive)]
public void TestSqlQueryTimeout()
{
    var personCache = Cache();

    PopulateCache(personCache, false, 30000, x => true);

#pragma warning disable 618
    var slowQry = new SqlQuery(typeof(QueryPerson), "WHERE age < 2000");
    slowQry.Timeout = TimeSpan.FromMilliseconds(1);
#pragma warning restore 618

    // ReSharper disable once ReturnValueOfPureMethodIsNotUsed
    var error = Assert.Throws<CacheException>(() => personCache.Query(slowQry).ToArray());

    var errorText = error.ToString();

    Assert.IsTrue(errorText.Contains("QueryCancelledException: The query was cancelled while executing."));
}
/// <summary>
/// Tests that an SQL fields query with a 3 ms timeout over a large data set is cancelled
/// and surfaces as <see cref="CacheException"/>.
/// </summary>
[Test]
[Category(TestUtils.CategoryIntensive)]
public void TestSqlFieldsQueryTimeout()
{
    var personCache = Cache();

    PopulateCache(personCache, false, 20000, x => true);

    var slowQry = new SqlFieldsQuery("SELECT * FROM QueryPerson WHERE age < 5000 AND name like '%0%'");
    slowQry.Timeout = TimeSpan.FromMilliseconds(3);

    // ReSharper disable once ReturnValueOfPureMethodIsNotUsed
    var error = Assert.Throws<CacheException>(() => personCache.Query(slowQry).ToArray());

    var errorText = error.ToString();

    Assert.IsTrue(errorText.Contains("QueryCancelledException: The query was cancelled while executing."));
}
/// <summary>
/// Tests the FieldNames property of a fields query cursor: before iteration,
/// after iteration and after disposal.
/// </summary>
[Test]
public void TestFieldNames()
{
    var personCache = Cache();
    PopulateCache(personCache, false, 5, x => true);

    var fieldsQry = new SqlFieldsQuery("SELECT * FROM QueryPerson");

    // Get before iteration.
    var firstCursor = personCache.Query(fieldsQry);
    var firstNames = firstCursor.FieldNames;

    Assert.AreEqual(new[] {"AGE", "NAME" }, firstNames);

    firstCursor.Dispose();

    // The same read-only instance is returned after disposal.
    Assert.AreSame(firstNames, firstCursor.FieldNames);
    Assert.Throws<NotSupportedException>(() => firstCursor.FieldNames.Add("x"));

    // Custom order, key-val, get after iteration.
    fieldsQry.Sql = "SELECT NAME, _key, AGE, _val FROM QueryPerson";

    var iteratedCursor = personCache.Query(fieldsQry);
    iteratedCursor.GetAll();

    Assert.AreEqual(new[] { "NAME", "_KEY", "AGE", "_VAL" }, iteratedCursor.FieldNames);

    // Get after disposal.
    fieldsQry.Sql = "SELECT 1, AGE FROM QueryPerson";

    var disposedCursor = personCache.Query(fieldsQry);
    disposedCursor.Dispose();

    Assert.AreEqual(new[] { "1", "AGE" }, disposedCursor.FieldNames);
}
/// <summary>
/// Tests the Fields (metadata) property of a fields query cursor: before iteration,
/// after iteration and after disposal.
/// </summary>
[Test]
public void TestFieldsMetadata()
{
    var personCache = Cache();
    PopulateCache(personCache, false, 5, x => true);

    var fieldsQry = new SqlFieldsQuery("SELECT * FROM QueryPerson");

    // Get before iteration.
    var firstCursor = personCache.Query(fieldsQry);
    var firstMeta = firstCursor.Fields;

    ValidateFieldsMetadata(
        firstMeta,
        new[] {"AGE", "NAME"},
        new[] {typeof(int), typeof(string)},
        new[] {"java.lang.Integer", "java.lang.String"});

    firstCursor.Dispose();

    // The same read-only instance is returned after disposal.
    Assert.AreSame(firstMeta, firstCursor.Fields);
    Assert.Throws<NotSupportedException>(() => firstCursor.Fields.Add(default(IQueryCursorField)));

    // Custom order, key-val, get after iteration.
    fieldsQry.Sql = "SELECT NAME, _key, AGE, _val FROM QueryPerson";

    var iteratedCursor = personCache.Query(fieldsQry);
    iteratedCursor.GetAll();

    ValidateFieldsMetadata(
        iteratedCursor.Fields,
        new[] {"NAME", "_KEY", "AGE", "_VAL"},
        new[] {typeof(string), typeof(object), typeof(int), typeof(object)},
        new[] {"java.lang.String", "java.lang.Object", "java.lang.Integer", "java.lang.Object"});

    // Get after disposal.
    fieldsQry.Sql = "SELECT 1, AGE FROM QueryPerson";

    var disposedCursor = personCache.Query(fieldsQry);
    disposedCursor.Dispose();

    ValidateFieldsMetadata(
        disposedCursor.Fields,
        new[] {"1", "AGE"},
        new[] {typeof(int), typeof(int)},
        new[] {"java.lang.Integer", "java.lang.Integer"});
}
/// <summary>
/// Tests <see cref="SqlFieldsQuery.Partitions"/> argument propagation and validation:
/// an empty array and illegal partition numbers are both rejected.
/// </summary>
[Test]
public void TestPartitionsValidation()
{
    var personCache = Cache();
    var fieldsQry = new SqlFieldsQuery("SELECT * FROM QueryPerson");

    TestDelegate runQuery = () => personCache.Query(fieldsQry).GetAll();

    // Empty partitions array.
    fieldsQry.Partitions = new int[0];

    var emptyErr = Assert.Throws<ArgumentException>(runQuery);
    StringAssert.EndsWith("Partitions must not be empty.", emptyErr.Message);

    // Negative partition numbers.
    fieldsQry.Partitions = new[] {-1, -2};

    var illegalErr = Assert.Throws<ArgumentException>(runQuery);
    StringAssert.EndsWith("Illegal partition", illegalErr.Message);
}
/// <summary>
/// Tests <see cref="SqlFieldsQuery.UpdateBatchSize"/> argument propagation and validation:
/// a negative batch size is rejected.
/// </summary>
[Test]
public void TestUpdateBatchSizeValidation()
{
    var personCache = Cache();

    var fieldsQry = new SqlFieldsQuery("SELECT * FROM QueryPerson");
    fieldsQry.UpdateBatchSize = -1;

    TestDelegate runQuery = () => personCache.Query(fieldsQry).GetAll();

    var error = Assert.Throws<ArgumentException>(runQuery);

    StringAssert.EndsWith("updateBatchSize cannot be lower than 1", error.Message);
}
/// <summary>
/// Validates a fields metadata collection against the expected names, .NET types
/// and Java type names, compared element by element in order.
/// </summary>
/// <param name="metadata">Metadata.</param>
/// <param name="expectedNames">Expected field names.</param>
/// <param name="expectedTypes">Expected field types.</param>
/// <param name="expectedJavaTypeNames">Expected Java type names.</param>
private static void ValidateFieldsMetadata(
    IList<IQueryCursorField> metadata,
    string[] expectedNames,
    Type[] expectedTypes,
    string[] expectedJavaTypeNames
)
{
    var actualNames = from field in metadata select field.Name;
    Assert.AreEqual(expectedNames, actualNames);

    var actualTypes = from field in metadata select field.Type;
    Assert.AreEqual(expectedTypes, actualTypes);

    var actualJavaTypeNames = from field in metadata select field.JavaTypeName;
    Assert.AreEqual(expectedJavaTypeNames, actualJavaTypeNames);
}
/// <summary>
/// Runs the query twice (once through <c>GetAll</c>, once through enumeration) and validates
/// the results: each entry must have name == key.ToString() and age == key, and every
/// expected key must be returned. In binary mode the fields are read from binary objects.
/// </summary>
/// <param name="cache">Cache.</param>
/// <param name="qry">Query.</param>
/// <param name="exp">Expected keys.</param>
/// <param name="keepBinary">Keep binary flag.</param>
private static void ValidateQueryResults(ICache<int, QueryPerson> cache, QueryBase qry, HashSet<int> exp,
    bool keepBinary)
{
    if (!keepBinary)
    {
        // Deserialized mode, GetAll.
        using (var cursor = cache.Query(qry))
        {
            var received = new List<ICacheEntry<int, object>>();
            var pending = new HashSet<int>(exp);

            foreach (var entry in cursor.GetAll())
            {
                received.Add(entry);

                Assert.AreEqual(entry.Key.ToString(), entry.Value.Name);
                Assert.AreEqual(entry.Key, entry.Value.Age);

                pending.Remove(entry.Key);
            }

            AssertMissingExpectedKeys(pending, cache, received);
        }

        // Deserialized mode, enumeration.
        using (var cursor = cache.Query(qry))
        {
            var received = new List<ICacheEntry<int, object>>();
            var pending = new HashSet<int>(exp);

            foreach (var entry in cursor)
            {
                received.Add(entry);

                Assert.AreEqual(entry.Key.ToString(), entry.Value.Name);
                Assert.AreEqual(entry.Key, entry.Value.Age);

                pending.Remove(entry.Key);
            }

            AssertMissingExpectedKeys(pending, cache, received);
        }

        return;
    }

    var binaryCache = cache.WithKeepBinary<int, IBinaryObject>();

    // Binary mode, GetAll.
    using (var cursor = binaryCache.Query(qry))
    {
        var received = new List<ICacheEntry<int, object>>();
        var pending = new HashSet<int>(exp);

        foreach (var entry in cursor.GetAll())
        {
            received.Add(entry);

            Assert.AreEqual(entry.Key.ToString(), entry.Value.GetField<string>("name"));
            Assert.AreEqual(entry.Key, entry.Value.GetField<int>("age"));

            pending.Remove(entry.Key);
        }

        AssertMissingExpectedKeys(pending, cache, received);
    }

    // Binary mode, enumeration.
    using (var cursor = binaryCache.Query(qry))
    {
        var received = new List<ICacheEntry<int, object>>();
        var pending = new HashSet<int>(exp);

        foreach (var entry in cursor)
        {
            received.Add(entry);

            Assert.AreEqual(entry.Key.ToString(), entry.Value.GetField<string>("name"));
            Assert.AreEqual(entry.Key, entry.Value.GetField<int>("age"));

            pending.Remove(entry.Key);
        }

        AssertMissingExpectedKeys(pending, cache, received);
    }
}
/// <summary>
/// Asserts that all expected entries have been received. If <paramref name="exp"/> still
/// contains keys, fails the test with a diagnostic message that lists, for every missing key,
/// whether it exists in the cache, its partition and that partition's nodes, followed by
/// the keys that were actually returned.
/// </summary>
/// <param name="exp">Expected keys that the query did not return.</param>
/// <param name="cache">Cache that was queried.</param>
/// <param name="all">All entries returned by the query.</param>
private static void AssertMissingExpectedKeys(ICollection<int> exp, ICache<int, QueryPerson> cache,
    IList<ICacheEntry<int, object>> all)
{
    if (exp.Count == 0)
        return;

    var sb = new StringBuilder();
    var aff = cache.Ignite.GetAffinity(cache.Name);

    foreach (var key in exp)
    {
        var part = aff.GetPartition(key);

        // ContainsKey instead of Get(key) != null: Get throws KeyNotFoundException for
        // a missing key, which would replace the diagnostic message with an unrelated error.
        sb.AppendFormat(
            "Query did not return expected key '{0}' (exists: {1}), partition '{2}', partition nodes: ",
            key, cache.ContainsKey(key), part);

        var partNodes = aff.MapPartitionToPrimaryAndBackups(part);

        foreach (var node in partNodes)
            sb.Append(node).Append(" ");

        sb.AppendLine(";");
    }

    sb.Append("Returned keys: ");

    foreach (var e in all)
        sb.Append(e.Key).Append(' ');

    sb.AppendLine(";");

    Assert.Fail(sb.ToString());
}
/// <summary>
/// Puts <paramref name="cnt"/> randomly keyed entries into the cache (keys are drawn from
/// [0, cnt), so collisions overwrite each other) and returns the keys currently present
/// in the cache that satisfy the filter. Each entry is a person whose name is the key's
/// string form and whose age is the key.
/// </summary>
/// <param name="cache">The cache.</param>
/// <param name="loc">Local query flag: when set, only local primary entries are considered.</param>
/// <param name="cnt">Number of put operations to perform.</param>
/// <param name="expectedEntryFilter">The expected entry filter, applied to keys.</param>
/// <returns>Expected results set.</returns>
private static HashSet<int> PopulateCache(ICache<int, QueryPerson> cache, bool loc, int cnt,
    Func<int, bool> expectedEntryFilter)
{
    var random = new Random();

    for (var remaining = cnt; remaining > 0; remaining--)
    {
        var key = random.Next(cnt);
        var person = new QueryPerson(key.ToString(), key);

        cache.Put(key, person);
    }

    IEnumerable<ICacheEntry<int, QueryPerson>> candidates;

    if (loc)
        candidates = cache.GetLocalEntries(CachePeekMode.Primary);
    else
        candidates = cache;

    var expected = new HashSet<int>();

    foreach (var entry in candidates)
    {
        if (expectedEntryFilter(entry.Key))
            expected.Add(entry.Key);
    }

    return expected;
}
}
/// <summary>
/// Person: the value type stored in the test cache and queried by name and age.
/// </summary>
public class QueryPerson
{
/// <summary>
/// Constructor. Stores the name, the age reduced modulo 2000, and a birthday
/// computed as the current UTC time minus that many years.
/// </summary>
/// <param name="name">Name.</param>
/// <param name="age">Age; only <c>age % 2000</c> is stored.</param>
public QueryPerson(string name, int age)
{
Name = name;
// Some tests pass values far above 2000 as age; the modulo keeps the stored age below that.
Age = age % 2000;
Birthday = DateTime.UtcNow.AddYears(-Age);
}
/// <summary>
/// Name.
/// </summary>
public string Name { get; set; }
/// <summary>
/// Age (the constructor argument modulo 2000).
/// </summary>
public int Age { get; set; }
/// <summary>
/// Gets or sets the birthday.
/// </summary>
[QuerySqlField] // Enforce Timestamp serialization
public DateTime Birthday { get; set; }
}
/// <summary>
/// Serializable scan query filter: accepts entries with key below 50, or every entry
/// when <see cref="AcceptAll"/> is set; can be told to throw instead.
/// </summary>
[Serializable]
public class ScanQueryFilter<TV> : ICacheEntryFilter<int, TV>
{
// Message of the exception thrown by Invoke when ThrowErr is set.
public const string ErrMessage = "Error in ScanQueryFilter.Invoke";
// Error flag: when set, Invoke throws instead of filtering.
public bool ThrowErr { get; set; }
// Accept-all flag: when set, Invoke accepts every entry regardless of its key.
public bool AcceptAll { get; set; }
// Injection test: Invoke asserts that this property has been set.
[InstanceResource]
public IIgnite Ignite { get; set; }
/** <inheritdoc /> */
public bool Invoke(ICacheEntry<int, TV> entry)
{
// Fails if the Ignite instance was not injected.
Assert.IsNotNull(Ignite);
if (ThrowErr)
throw new Exception(ErrMessage);
return entry.Key < 50 || AcceptAll;
}
}
/// <summary>
/// Binarizable variant of <see cref="ScanQueryFilter{TV}"/>: the filter flags are written
/// and read explicitly through the raw binary writer and reader.
/// </summary>
public class BinarizableScanQueryFilter<TV> : ScanQueryFilter<TV>, IBinarizable
{
    /** <inheritdoc /> */
    public void WriteBinary(IBinaryWriter writer)
    {
        var w = writer.GetRawWriter();

        w.WriteBoolean(ThrowErr);

        // AcceptAll is part of the filter state as well; without it the flag would be
        // silently reset to false on the receiving side.
        w.WriteBoolean(AcceptAll);
    }

    /** <inheritdoc /> */
    public void ReadBinary(IBinaryReader reader)
    {
        var r = reader.GetRawReader();

        // Must mirror the order used in WriteBinary.
        ThrowErr = r.ReadBoolean();
        AcceptAll = r.ReadBoolean();
    }
}
/// <summary>
/// Filter that can't be serialized: both binary callbacks throw
/// <see cref="BinaryObjectException"/>.
/// </summary>
public class InvalidScanQueryFilter<TV> : ScanQueryFilter<TV>, IBinarizable
{
    /** Message of the exception thrown by both binary callbacks. */
    private const string FailureMessage = "Expected";

    /** <inheritdoc /> */
    public void WriteBinary(IBinaryWriter writer)
    {
        throw new BinaryObjectException(FailureMessage);
    }

    /** <inheritdoc /> */
    public void ReadBinary(IBinaryReader reader)
    {
        throw new BinaryObjectException(FailureMessage);
    }
}
}