blob: 5b56da1eb820a1de1fbc1c78d9f25f66668df148 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma warning disable 618
namespace Apache.Ignite.Core.Tests.Cache.Query.Continuous
{
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.Linq;
using System.Runtime.Serialization;
using System.Threading;
using Apache.Ignite.Core.Binary;
using Apache.Ignite.Core.Cache;
using Apache.Ignite.Core.Cache.Event;
using Apache.Ignite.Core.Cache.Query;
using Apache.Ignite.Core.Cache.Query.Continuous;
using Apache.Ignite.Core.Common;
using Apache.Ignite.Core.Impl.Cache.Event;
using Apache.Ignite.Core.Resource;
using NUnit.Framework;
/// <summary>
/// Tests for continuous query.
/// </summary>
[SuppressMessage("ReSharper", "InconsistentNaming")]
[SuppressMessage("ReSharper", "PossibleNullReferenceException")]
[SuppressMessage("ReSharper", "StaticMemberInGenericType")]
public abstract class ContinuousQueryAbstractTest
{
/** Cache name: ATOMIC, backup. */
protected const string CACHE_ATOMIC_BACKUP = "atomic_backup";
/** Cache name: ATOMIC, no backup. */
protected const string CACHE_ATOMIC_NO_BACKUP = "atomic_no_backup";
/** Cache name: TRANSACTIONAL, backup. */
protected const string CACHE_TX_BACKUP = "transactional_backup";
/** Cache name: TRANSACTIONAL, no backup. */
protected const string CACHE_TX_NO_BACKUP = "transactional_no_backup";
/** Listener events. */
public static BlockingCollection<CallbackEvent> CB_EVTS = new BlockingCollection<CallbackEvent>();
/** Listener events. */
public static BlockingCollection<FilterEvent> FILTER_EVTS = new BlockingCollection<FilterEvent>();
/** First node. */
private IIgnite grid1;
/** Second node. */
private IIgnite grid2;
/** Cache on the first node. */
private ICache<int, BinarizableEntry> cache1;
/** Cache on the second node. */
private ICache<int, BinarizableEntry> cache2;
/** Cache name. */
private readonly string cacheName;
/// <summary>
/// Constructor.
/// </summary>
/// <param name="cacheName">Cache name.</param>
protected ContinuousQueryAbstractTest(string cacheName)
{
this.cacheName = cacheName;
}
/// <summary>
/// Set-up routine.
/// </summary>
[TestFixtureSetUp]
public void SetUp()
{
var cfg = new IgniteConfiguration(TestUtils.GetTestConfiguration())
{
BinaryConfiguration = new BinaryConfiguration
{
TypeConfigurations = new List<BinaryTypeConfiguration>
{
new BinaryTypeConfiguration(typeof(BinarizableEntry)),
new BinaryTypeConfiguration(typeof(BinarizableFilter)),
new BinaryTypeConfiguration(typeof(KeepBinaryFilter))
}
},
SpringConfigUrl = "config\\cache-query-continuous.xml",
IgniteInstanceName = "grid-1"
};
grid1 = Ignition.Start(cfg);
cache1 = grid1.GetCache<int, BinarizableEntry>(cacheName);
cfg.IgniteInstanceName = "grid-2";
grid2 = Ignition.Start(cfg);
cache2 = grid2.GetCache<int, BinarizableEntry>(cacheName);
}
/// <summary>
/// Tear-down routine.
/// </summary>
[TestFixtureTearDown]
public void TearDown()
{
Ignition.StopAll(true);
}
/// <summary>
/// Before-test routine.
/// </summary>
[SetUp]
public void BeforeTest()
{
CB_EVTS = new BlockingCollection<CallbackEvent>();
FILTER_EVTS = new BlockingCollection<FilterEvent>();
AbstractFilter<BinarizableEntry>.res = true;
AbstractFilter<BinarizableEntry>.err = false;
AbstractFilter<BinarizableEntry>.marshErr = false;
AbstractFilter<BinarizableEntry>.unmarshErr = false;
cache1.Remove(PrimaryKey(cache1));
cache1.Remove(PrimaryKey(cache2));
Assert.AreEqual(0, cache1.GetSize());
Assert.AreEqual(0, cache2.GetSize());
Console.WriteLine("Test started: " + TestContext.CurrentContext.Test.Name);
}
/// <summary>
/// Test arguments validation.
/// </summary>
[Test]
public void TestValidation()
{
Assert.Throws<ArgumentException>(() => { cache1.QueryContinuous(new ContinuousQuery<int, BinarizableEntry>(null)); });
}
/// <summary>
/// Test multiple closes.
/// </summary>
[Test]
public void TestMultipleClose()
{
int key1 = PrimaryKey(cache1);
int key2 = PrimaryKey(cache2);
Assert.AreNotEqual(key1, key2);
ContinuousQuery<int, BinarizableEntry> qry =
new ContinuousQuery<int, BinarizableEntry>(new Listener<BinarizableEntry>());
IDisposable qryHnd;
using (qryHnd = cache1.QueryContinuous(qry))
{
// Put from local node.
cache1.GetAndPut(key1, Entry(key1));
CheckCallbackSingle(key1, null, Entry(key1), CacheEntryEventType.Created);
// Put from remote node.
cache2.GetAndPut(key2, Entry(key2));
CheckCallbackSingle(key2, null, Entry(key2), CacheEntryEventType.Created);
}
qryHnd.Dispose();
}
/// <summary>
/// Test regular callback operations.
/// </summary>
[Test]
public void TestCallback()
{
CheckCallback(false);
}
/// <summary>
/// Check regular callback execution.
/// </summary>
/// <param name="loc"></param>
protected void CheckCallback(bool loc)
{
int key1 = PrimaryKey(cache1);
int key2 = PrimaryKey(cache2);
ContinuousQuery<int, BinarizableEntry> qry = loc ?
new ContinuousQuery<int, BinarizableEntry>(new Listener<BinarizableEntry>(), true) :
new ContinuousQuery<int, BinarizableEntry>(new Listener<BinarizableEntry>());
using (cache1.QueryContinuous(qry))
{
// Put from local node.
cache1.GetAndPut(key1, Entry(key1));
CheckCallbackSingle(key1, null, Entry(key1), CacheEntryEventType.Created);
cache1.GetAndPut(key1, Entry(key1 + 1));
CheckCallbackSingle(key1, Entry(key1), Entry(key1 + 1), CacheEntryEventType.Updated);
cache1.Remove(key1);
CheckCallbackSingle(key1, Entry(key1 + 1), Entry(key1 + 1), CacheEntryEventType.Removed);
// Put from remote node.
cache2.GetAndPut(key2, Entry(key2));
if (loc)
CheckNoCallback(100);
else
CheckCallbackSingle(key2, null, Entry(key2), CacheEntryEventType.Created);
cache1.GetAndPut(key2, Entry(key2 + 1));
if (loc)
CheckNoCallback(100);
else
CheckCallbackSingle(key2, Entry(key2), Entry(key2 + 1), CacheEntryEventType.Updated);
cache1.Remove(key2);
if (loc)
CheckNoCallback(100);
else
CheckCallbackSingle(key2, Entry(key2 + 1), Entry(key2 + 1), CacheEntryEventType.Removed);
}
cache1.Put(key1, Entry(key1));
CheckNoCallback(100);
cache1.Put(key2, Entry(key2));
CheckNoCallback(100);
}
/// <summary>
/// Test Ignite injection into callback.
/// </summary>
[Test]
public void TestCallbackInjection()
{
Listener<BinarizableEntry> cb = new Listener<BinarizableEntry>();
Assert.IsNull(cb.ignite);
using (cache1.QueryContinuous(new ContinuousQuery<int, BinarizableEntry>(cb)))
{
Assert.IsNotNull(cb.ignite);
}
}
/// <summary>
/// Test binarizable filter logic.
/// </summary>
[Test]
public void TestFilterBinarizable()
{
CheckFilter(true, false);
}
/// <summary>
/// Test serializable filter logic.
/// </summary>
[Test]
public void TestFilterSerializable()
{
CheckFilter(false, false);
}
/// <summary>
/// Tests the defaults.
/// </summary>
[Test]
public void TestDefaults()
{
var qry = new ContinuousQuery<int, int>(null);
Assert.AreEqual(ContinuousQuery.DefaultAutoUnsubscribe, qry.AutoUnsubscribe);
Assert.AreEqual(ContinuousQuery.DefaultBufferSize, qry.BufferSize);
Assert.AreEqual(ContinuousQuery.DefaultTimeInterval, qry.TimeInterval);
Assert.IsFalse(qry.Local);
}
/// <summary>
/// Check filter.
/// </summary>
/// <param name="binarizable">Binarizable.</param>
/// <param name="loc">Local cache flag.</param>
protected void CheckFilter(bool binarizable, bool loc)
{
ICacheEntryEventListener<int, BinarizableEntry> lsnr = new Listener<BinarizableEntry>();
ICacheEntryEventFilter<int, BinarizableEntry> filter =
binarizable ? (AbstractFilter<BinarizableEntry>) new BinarizableFilter() : new SerializableFilter();
ContinuousQuery<int, BinarizableEntry> qry = loc ?
new ContinuousQuery<int, BinarizableEntry>(lsnr, filter, true) :
new ContinuousQuery<int, BinarizableEntry>(lsnr, filter);
using (cache1.QueryContinuous(qry))
{
// Put from local node.
int key1 = PrimaryKey(cache1);
cache1.GetAndPut(key1, Entry(key1));
CheckFilterSingle(key1, null, Entry(key1));
CheckCallbackSingle(key1, null, Entry(key1), CacheEntryEventType.Created);
// Put from remote node.
int key2 = PrimaryKey(cache2);
cache1.GetAndPut(key2, Entry(key2));
if (loc)
{
CheckNoFilter(key2);
CheckNoCallback(key2);
}
else
{
CheckFilterSingle(key2, null, Entry(key2));
CheckCallbackSingle(key2, null, Entry(key2), CacheEntryEventType.Created);
}
AbstractFilter<BinarizableEntry>.res = false;
// Ignored put from local node.
cache1.GetAndPut(key1, Entry(key1 + 1));
CheckFilterSingle(key1, Entry(key1), Entry(key1 + 1));
CheckNoCallback(100);
// Ignored put from remote node.
cache1.GetAndPut(key2, Entry(key2 + 1));
if (loc)
CheckNoFilter(100);
else
CheckFilterSingle(key2, Entry(key2), Entry(key2 + 1));
CheckNoCallback(100);
}
}
/// <summary>
/// Test binarizable filter error during invoke.
/// </summary>
[Ignore("IGNITE-521")]
[Test]
public void TestFilterInvokeErrorBinarizable()
{
CheckFilterInvokeError(true);
}
/// <summary>
/// Test serializable filter error during invoke.
/// </summary>
[Ignore("IGNITE-521")]
[Test]
public void TestFilterInvokeErrorSerializable()
{
CheckFilterInvokeError(false);
}
/// <summary>
/// Check filter error handling logic during invoke.
/// </summary>
private void CheckFilterInvokeError(bool binarizable)
{
AbstractFilter<BinarizableEntry>.err = true;
ICacheEntryEventListener<int, BinarizableEntry> lsnr = new Listener<BinarizableEntry>();
ICacheEntryEventFilter<int, BinarizableEntry> filter =
binarizable ? (AbstractFilter<BinarizableEntry>) new BinarizableFilter() : new SerializableFilter();
ContinuousQuery<int, BinarizableEntry> qry = new ContinuousQuery<int, BinarizableEntry>(lsnr, filter);
using (cache1.QueryContinuous(qry))
{
// Put from local node.
Assert.Throws<IgniteException>(() => cache1.GetAndPut(PrimaryKey(cache1), Entry(1)));
// Put from remote node.
Assert.Throws<IgniteException>(() => cache1.GetAndPut(PrimaryKey(cache2), Entry(1)));
}
}
/// <summary>
/// Test binarizable filter marshalling error.
/// </summary>
[Test]
public void TestFilterMarshalErrorBinarizable()
{
CheckFilterMarshalError(true);
}
/// <summary>
/// Test serializable filter marshalling error.
/// </summary>
[Test]
public void TestFilterMarshalErrorSerializable()
{
CheckFilterMarshalError(false);
}
/// <summary>
/// Check filter marshal error handling.
/// </summary>
/// <param name="binarizable">Binarizable flag.</param>
private void CheckFilterMarshalError(bool binarizable)
{
AbstractFilter<BinarizableEntry>.marshErr = true;
ICacheEntryEventListener<int, BinarizableEntry> lsnr = new Listener<BinarizableEntry>();
ICacheEntryEventFilter<int, BinarizableEntry> filter =
binarizable ? (AbstractFilter<BinarizableEntry>)new BinarizableFilter() : new SerializableFilter();
ContinuousQuery<int, BinarizableEntry> qry = new ContinuousQuery<int, BinarizableEntry>(lsnr, filter);
Assert.Throws<Exception>(() =>
{
using (cache1.QueryContinuous(qry))
{
// No-op.
}
});
}
/// <summary>
/// Test non-serializable filter error.
/// </summary>
[Test]
public void TestFilterNonSerializable()
{
CheckFilterNonSerializable(false);
}
/// <summary>
/// Test non-serializable filter behavior.
/// </summary>
/// <param name="loc"></param>
protected void CheckFilterNonSerializable(bool loc)
{
AbstractFilter<BinarizableEntry>.unmarshErr = true;
ICacheEntryEventListener<int, BinarizableEntry> lsnr = new Listener<BinarizableEntry>();
ICacheEntryEventFilter<int, BinarizableEntry> filter = new LocalFilter();
ContinuousQuery<int, BinarizableEntry> qry = loc
? new ContinuousQuery<int, BinarizableEntry>(lsnr, filter, true)
: new ContinuousQuery<int, BinarizableEntry>(lsnr, filter);
if (loc)
{
using (cache1.QueryContinuous(qry))
{
// Local put must be fine.
int key1 = PrimaryKey(cache1);
cache1.GetAndPut(key1, Entry(key1));
CheckFilterSingle(key1, null, Entry(key1));
}
}
else
{
Assert.Throws<BinaryObjectException>(() =>
{
using (cache1.QueryContinuous(qry))
{
// No-op.
}
});
}
}
/// <summary>
/// Test binarizable filter unmarshalling error.
/// </summary>
[Ignore("IGNITE-521")]
[Test]
public void TestFilterUnmarshalErrorBinarizable()
{
CheckFilterUnmarshalError(true);
}
/// <summary>
/// Test serializable filter unmarshalling error.
/// </summary>
[Ignore("IGNITE-521")]
[Test]
public void TestFilterUnmarshalErrorSerializable()
{
CheckFilterUnmarshalError(false);
}
/// <summary>
/// Check filter unmarshal error handling.
/// </summary>
/// <param name="binarizable">Binarizable flag.</param>
private void CheckFilterUnmarshalError(bool binarizable)
{
AbstractFilter<BinarizableEntry>.unmarshErr = true;
ICacheEntryEventListener<int, BinarizableEntry> lsnr = new Listener<BinarizableEntry>();
ICacheEntryEventFilter<int, BinarizableEntry> filter =
binarizable ? (AbstractFilter<BinarizableEntry>) new BinarizableFilter() : new SerializableFilter();
ContinuousQuery<int, BinarizableEntry> qry = new ContinuousQuery<int, BinarizableEntry>(lsnr, filter);
using (cache1.QueryContinuous(qry))
{
// Local put must be fine.
int key1 = PrimaryKey(cache1);
cache1.GetAndPut(key1, Entry(key1));
CheckFilterSingle(key1, null, Entry(key1));
// Remote put must fail.
Assert.Throws<IgniteException>(() => cache1.GetAndPut(PrimaryKey(cache2), Entry(1)));
}
}
/// <summary>
/// Test Ignite injection into filters.
/// </summary>
[Test]
public void TestFilterInjection()
{
Listener<BinarizableEntry> cb = new Listener<BinarizableEntry>();
BinarizableFilter filter = new BinarizableFilter();
Assert.IsNull(filter.ignite);
using (cache1.QueryContinuous(new ContinuousQuery<int, BinarizableEntry>(cb, filter)))
{
// Local injection.
Assert.IsNotNull(filter.ignite);
// Remote injection.
cache1.GetAndPut(PrimaryKey(cache2), Entry(1));
FilterEvent evt;
Assert.IsTrue(FILTER_EVTS.TryTake(out evt, 500));
Assert.IsNotNull(evt.ignite);
}
}
/// <summary>
/// Test "keep-binary" scenario.
/// </summary>
[Test]
public void TestKeepBinary()
{
var cache = cache1.WithKeepBinary<int, IBinaryObject>();
ContinuousQuery<int, IBinaryObject> qry = new ContinuousQuery<int, IBinaryObject>(
new Listener<IBinaryObject>(), new KeepBinaryFilter());
using (cache.QueryContinuous(qry))
{
// 1. Local put.
cache1.GetAndPut(PrimaryKey(cache1), Entry(1));
CallbackEvent cbEvt;
FilterEvent filterEvt;
Assert.IsTrue(FILTER_EVTS.TryTake(out filterEvt, 500));
Assert.AreEqual(PrimaryKey(cache1), filterEvt.entry.Key);
Assert.AreEqual(null, filterEvt.entry.OldValue);
Assert.AreEqual(Entry(1), (filterEvt.entry.Value as IBinaryObject)
.Deserialize<BinarizableEntry>());
Assert.IsTrue(CB_EVTS.TryTake(out cbEvt, 500));
Assert.AreEqual(1, cbEvt.entries.Count);
Assert.AreEqual(PrimaryKey(cache1), cbEvt.entries.First().Key);
Assert.AreEqual(null, cbEvt.entries.First().OldValue);
Assert.AreEqual(Entry(1), (cbEvt.entries.First().Value as IBinaryObject)
.Deserialize<BinarizableEntry>());
// 2. Remote put.
ClearEvents();
cache1.GetAndPut(PrimaryKey(cache2), Entry(2));
Assert.IsTrue(FILTER_EVTS.TryTake(out filterEvt, 500));
Assert.AreEqual(PrimaryKey(cache2), filterEvt.entry.Key);
Assert.AreEqual(null, filterEvt.entry.OldValue);
Assert.AreEqual(Entry(2), (filterEvt.entry.Value as IBinaryObject)
.Deserialize<BinarizableEntry>());
Assert.IsTrue(CB_EVTS.TryTake(out cbEvt, 500));
Assert.AreEqual(1, cbEvt.entries.Count);
Assert.AreEqual(PrimaryKey(cache2), cbEvt.entries.First().Key);
Assert.AreEqual(null, cbEvt.entries.First().OldValue);
Assert.AreEqual(Entry(2),
(cbEvt.entries.First().Value as IBinaryObject).Deserialize<BinarizableEntry>());
}
}
/// <summary>
/// Test value types (special handling is required for nulls).
/// </summary>
[Test]
public void TestValueTypes()
{
var cache = grid1.GetCache<int, int>(cacheName);
var qry = new ContinuousQuery<int, int>(new Listener<int>());
var key = PrimaryKey(cache);
using (cache.QueryContinuous(qry))
{
// First update
cache.Put(key, 1);
CallbackEvent cbEvt;
Assert.IsTrue(CB_EVTS.TryTake(out cbEvt, 500));
var cbEntry = cbEvt.entries.Single();
Assert.IsFalse(cbEntry.HasOldValue);
Assert.IsTrue(cbEntry.HasValue);
Assert.AreEqual(key, cbEntry.Key);
Assert.AreEqual(null, cbEntry.OldValue);
Assert.AreEqual(1, cbEntry.Value);
// Second update
cache.Put(key, 2);
Assert.IsTrue(CB_EVTS.TryTake(out cbEvt, 500));
cbEntry = cbEvt.entries.Single();
Assert.IsTrue(cbEntry.HasOldValue);
Assert.IsTrue(cbEntry.HasValue);
Assert.AreEqual(key, cbEntry.Key);
Assert.AreEqual(1, cbEntry.OldValue);
Assert.AreEqual(2, cbEntry.Value);
// Remove
cache.Remove(key);
Assert.IsTrue(CB_EVTS.TryTake(out cbEvt, 500));
cbEntry = cbEvt.entries.Single();
Assert.IsTrue(cbEntry.HasOldValue);
Assert.IsTrue(cbEntry.HasValue);
Assert.AreEqual(key, cbEntry.Key);
Assert.AreEqual(2, cbEntry.OldValue);
Assert.AreEqual(2, cbEntry.Value);
}
}
/// <summary>
/// Test whether buffer size works fine.
/// </summary>
[Test]
public void TestBufferSize()
{
// Put two remote keys in advance.
var rmtKeys = TestUtils.GetPrimaryKeys(cache2.Ignite, cache2.Name).Take(2).ToList();
ContinuousQuery<int, BinarizableEntry> qry = new ContinuousQuery<int, BinarizableEntry>(new Listener<BinarizableEntry>());
qry.BufferSize = 2;
qry.TimeInterval = TimeSpan.FromMilliseconds(1000000);
using (cache1.QueryContinuous(qry))
{
qry.BufferSize = 2;
cache1.GetAndPut(rmtKeys[0], Entry(rmtKeys[0]));
CheckNoCallback(100);
cache1.GetAndPut(rmtKeys[1], Entry(rmtKeys[1]));
CallbackEvent evt;
Assert.IsTrue(CB_EVTS.TryTake(out evt, 1000));
Assert.AreEqual(2, evt.entries.Count);
var entryRmt0 = evt.entries.Single(entry => { return entry.Key.Equals(rmtKeys[0]); });
var entryRmt1 = evt.entries.Single(entry => { return entry.Key.Equals(rmtKeys[1]); });
Assert.AreEqual(rmtKeys[0], entryRmt0.Key);
Assert.IsNull(entryRmt0.OldValue);
Assert.AreEqual(Entry(rmtKeys[0]), entryRmt0.Value);
Assert.AreEqual(rmtKeys[1], entryRmt1.Key);
Assert.IsNull(entryRmt1.OldValue);
Assert.AreEqual(Entry(rmtKeys[1]), entryRmt1.Value);
}
cache1.Remove(rmtKeys[0]);
cache1.Remove(rmtKeys[1]);
}
/// <summary>
/// Test whether timeout works fine.
/// </summary>
[Test]
public void TestTimeout()
{
int key1 = PrimaryKey(cache1);
int key2 = PrimaryKey(cache2);
ContinuousQuery<int, BinarizableEntry> qry =
new ContinuousQuery<int, BinarizableEntry>(new Listener<BinarizableEntry>());
qry.BufferSize = 2;
qry.TimeInterval = TimeSpan.FromMilliseconds(500);
using (cache1.QueryContinuous(qry))
{
// Put from local node.
cache1.GetAndPut(key1, Entry(key1));
CheckCallbackSingle(key1, null, Entry(key1), CacheEntryEventType.Created);
// Put from remote node.
cache1.GetAndPut(key2, Entry(key2));
CheckNoCallback(100);
CheckCallbackSingle(key2, null, Entry(key2), CacheEntryEventType.Created);
}
}
/// <summary>
/// Test whether nested Ignite API call from callback works fine.
/// </summary>
[Test]
public void TestNestedCallFromCallback()
{
var cache = cache1.WithKeepBinary<int, IBinaryObject>();
int key = PrimaryKey(cache1);
NestedCallListener cb = new NestedCallListener();
using (cache.QueryContinuous(new ContinuousQuery<int, IBinaryObject>(cb)))
{
cache1.GetAndPut(key, Entry(key));
cb.countDown.Wait();
}
cache.Remove(key);
}
/// <summary>
/// Tests the initial query.
/// </summary>
[Test]
public void TestInitialQuery()
{
// Scan query, GetAll
TestInitialQuery(new ScanQuery<int, BinarizableEntry>(new InitialQueryScanFilter()), cur => cur.GetAll());
// Scan query, iterator
TestInitialQuery(new ScanQuery<int, BinarizableEntry>(new InitialQueryScanFilter()), cur => cur.ToList());
// Sql query, GetAll
TestInitialQuery(new SqlQuery(typeof(BinarizableEntry), "val < 33"), cur => cur.GetAll());
// Sql query, iterator
TestInitialQuery(new SqlQuery(typeof(BinarizableEntry), "val < 33"), cur => cur.ToList());
// Text query, GetAll
TestInitialQuery(new TextQuery(typeof(BinarizableEntry), "1*"), cur => cur.GetAll());
// Text query, iterator
TestInitialQuery(new TextQuery(typeof(BinarizableEntry), "1*"), cur => cur.ToList());
// Test exception: invalid initial query
var ex = Assert.Throws<IgniteException>(
() => TestInitialQuery(new TextQuery(typeof (BinarizableEntry), "*"), cur => cur.GetAll()));
Assert.AreEqual("Cannot parse '*': '*' or '?' not allowed as first character in WildcardQuery", ex.Message);
}
/// <summary>
/// Tests the initial query.
/// </summary>
private void TestInitialQuery(QueryBase initialQry, Func<IQueryCursor<ICacheEntry<int, BinarizableEntry>>,
IEnumerable<ICacheEntry<int, BinarizableEntry>>> getAllFunc)
{
var qry = new ContinuousQuery<int, BinarizableEntry>(new Listener<BinarizableEntry>());
cache1.Put(11, Entry(11));
cache1.Put(12, Entry(12));
cache1.Put(33, Entry(33));
try
{
IContinuousQueryHandle<ICacheEntry<int, BinarizableEntry>> contQry;
using (contQry = cache1.QueryContinuous(qry, initialQry))
{
// Check initial query
var initialEntries =
getAllFunc(contQry.GetInitialQueryCursor()).Distinct().OrderBy(x => x.Key).ToList();
Assert.Throws<InvalidOperationException>(() => contQry.GetInitialQueryCursor());
Assert.AreEqual(2, initialEntries.Count);
for (int i = 0; i < initialEntries.Count; i++)
{
Assert.AreEqual(i + 11, initialEntries[i].Key);
Assert.AreEqual(i + 11, initialEntries[i].Value.val);
}
// Check continuous query
cache1.Put(44, Entry(44));
CheckCallbackSingle(44, null, Entry(44), CacheEntryEventType.Created);
}
Assert.Throws<ObjectDisposedException>(() => contQry.GetInitialQueryCursor());
contQry.Dispose(); // multiple dispose calls are ok
}
finally
{
cache1.Clear();
}
}
/// <summary>
/// Check single filter event.
/// </summary>
/// <param name="expKey">Expected key.</param>
/// <param name="expOldVal">Expected old value.</param>
/// <param name="expVal">Expected value.</param>
private void CheckFilterSingle(int expKey, BinarizableEntry expOldVal, BinarizableEntry expVal)
{
CheckFilterSingle(expKey, expOldVal, expVal, 1000);
ClearEvents();
}
/// <summary>
/// Check single filter event.
/// </summary>
/// <param name="expKey">Expected key.</param>
/// <param name="expOldVal">Expected old value.</param>
/// <param name="expVal">Expected value.</param>
/// <param name="timeout">Timeout.</param>
private static void CheckFilterSingle(int expKey, BinarizableEntry expOldVal, BinarizableEntry expVal, int timeout)
{
FilterEvent evt;
Assert.IsTrue(FILTER_EVTS.TryTake(out evt, timeout));
Assert.AreEqual(expKey, evt.entry.Key);
Assert.AreEqual(expOldVal, evt.entry.OldValue);
Assert.AreEqual(expVal, evt.entry.Value);
ClearEvents();
}
/// <summary>
/// Clears the events collection.
/// </summary>
private static void ClearEvents()
{
while (FILTER_EVTS.Count > 0)
FILTER_EVTS.Take();
}
/// <summary>
/// Ensure that no filter events are logged.
/// </summary>
/// <param name="timeout">Timeout.</param>
private static void CheckNoFilter(int timeout)
{
FilterEvent evt;
Assert.IsFalse(FILTER_EVTS.TryTake(out evt, timeout));
}
/// <summary>
/// Check single callback event.
/// </summary>
/// <param name="expKey">Expected key.</param>
/// <param name="expOldVal">Expected old value.</param>
/// <param name="expVal">Expected new value.</param>
/// <param name="expType">Expected type.</param>
/// <param name="timeout">Timeout.</param>
private static void CheckCallbackSingle(int expKey, BinarizableEntry expOldVal, BinarizableEntry expVal,
CacheEntryEventType expType, int timeout = 1000)
{
CallbackEvent evt;
Assert.IsTrue(CB_EVTS.TryTake(out evt, timeout));
Assert.AreEqual(0, CB_EVTS.Count);
var e = evt.entries.Single();
Assert.AreEqual(expKey, e.Key);
Assert.AreEqual(expOldVal, e.OldValue);
Assert.AreEqual(expVal, e.Value);
Assert.AreEqual(expType, e.EventType);
}
/// <summary>
/// Ensure that no callback events are logged.
/// </summary>
/// <param name="timeout">Timeout.</param>
private void CheckNoCallback(int timeout)
{
CallbackEvent evt;
Assert.IsFalse(CB_EVTS.TryTake(out evt, timeout));
}
/// <summary>
/// Create entry.
/// </summary>
/// <param name="val">Value.</param>
/// <returns>Entry.</returns>
private static BinarizableEntry Entry(int val)
{
return new BinarizableEntry(val);
}
/// <summary>
/// Get primary key for cache.
/// </summary>
/// <param name="cache">Cache.</param>
/// <returns>Primary key.</returns>
private static int PrimaryKey<T>(ICache<int, T> cache)
{
return TestUtils.GetPrimaryKey(cache.Ignite, cache.Name);
}
/// <summary>
/// Creates object-typed event.
/// </summary>
private static ICacheEntryEvent<object, object> CreateEvent<T, V>(ICacheEntryEvent<T,V> e)
{
switch (e.EventType)
{
case CacheEntryEventType.Created:
return new CacheEntryCreateEvent<object, object>(e.Key, e.Value);
case CacheEntryEventType.Updated:
return new CacheEntryUpdateEvent<object, object>(e.Key, e.OldValue, e.Value);
default:
return new CacheEntryRemoveEvent<object, object>(e.Key, e.OldValue);
}
}
/// <summary>
/// Binarizable entry.
/// </summary>
public class BinarizableEntry
{
/** Value. */
public readonly int val;
/** <inheritDot /> */
public override int GetHashCode()
{
return val;
}
/// <summary>
/// Constructor.
/// </summary>
/// <param name="val">Value.</param>
public BinarizableEntry(int val)
{
this.val = val;
}
/** <inheritDoc /> */
public override bool Equals(object obj)
{
return obj != null && obj is BinarizableEntry && ((BinarizableEntry)obj).val == val;
}
/** <inheritDoc /> */
public override string ToString()
{
return string.Format("BinarizableEntry [Val: {0}]", val);
}
}
/// <summary>
/// Abstract filter.
/// </summary>
[Serializable]
public abstract class AbstractFilter<V> : ICacheEntryEventFilter<int, V>
{
/** Result. */
public static volatile bool res = true;
/** Throw error on invocation. */
public static volatile bool err;
/** Throw error during marshalling. */
public static volatile bool marshErr;
/** Throw error during unmarshalling. */
public static volatile bool unmarshErr;
/** Grid. */
[InstanceResource]
public IIgnite ignite;
/** <inheritDoc /> */
public bool Evaluate(ICacheEntryEvent<int, V> evt)
{
if (err)
throw new Exception("Filter error.");
FILTER_EVTS.Add(new FilterEvent(ignite, CreateEvent(evt)));
return res;
}
}
/// <summary>
/// Filter which cannot be serialized.
/// </summary>
public class LocalFilter : AbstractFilter<BinarizableEntry>, IBinarizable
{
/** <inheritDoc /> */
public void WriteBinary(IBinaryWriter writer)
{
throw new BinaryObjectException("Expected");
}
/** <inheritDoc /> */
public void ReadBinary(IBinaryReader reader)
{
throw new BinaryObjectException("Expected");
}
}
/// <summary>
/// Binarizable filter.
/// </summary>
public class BinarizableFilter : AbstractFilter<BinarizableEntry>, IBinarizable
{
/** <inheritDoc /> */
public void WriteBinary(IBinaryWriter writer)
{
if (marshErr)
throw new Exception("Filter marshalling error.");
}
/** <inheritDoc /> */
public void ReadBinary(IBinaryReader reader)
{
if (unmarshErr)
throw new Exception("Filter unmarshalling error.");
}
}
/// <summary>
/// Serializable filter.
/// </summary>
[Serializable]
public class SerializableFilter : AbstractFilter<BinarizableEntry>, ISerializable
{
/// <summary>
/// Constructor.
/// </summary>
public SerializableFilter()
{
// No-op.
}
/// <summary>
/// Serialization constructor.
/// </summary>
/// <param name="info">Info.</param>
/// <param name="context">Context.</param>
protected SerializableFilter(SerializationInfo info, StreamingContext context)
{
if (unmarshErr)
throw new Exception("Filter unmarshalling error.");
}
/** <inheritDoc /> */
public void GetObjectData(SerializationInfo info, StreamingContext context)
{
if (marshErr)
throw new Exception("Filter marshalling error.");
}
}
/// <summary>
/// Filter for "keep-binary" scenario.
/// </summary>
public class KeepBinaryFilter : AbstractFilter<IBinaryObject>
{
// No-op.
}
/// <summary>
/// Listener.
/// </summary>
public class Listener<V> : ICacheEntryEventListener<int, V>
{
[InstanceResource]
public IIgnite ignite;
/** <inheritDoc /> */
public void OnEvent(IEnumerable<ICacheEntryEvent<int, V>> evts)
{
CB_EVTS.Add(new CallbackEvent(evts.Select(CreateEvent).ToList()));
}
}
/// <summary>
/// Listener with nested Ignite API call.
/// </summary>
public class NestedCallListener : ICacheEntryEventListener<int, IBinaryObject>
{
/** Event. */
public readonly CountdownEvent countDown = new CountdownEvent(1);
public void OnEvent(IEnumerable<ICacheEntryEvent<int, IBinaryObject>> evts)
{
foreach (ICacheEntryEvent<int, IBinaryObject> evt in evts)
{
IBinaryObject val = evt.Value;
IBinaryType meta = val.GetBinaryType();
Assert.AreEqual(typeof(BinarizableEntry).FullName, meta.TypeName);
}
countDown.Signal();
}
}
/// <summary>
/// Filter event.
/// </summary>
public class FilterEvent
{
/** Grid. */
public IIgnite ignite;
/** Entry. */
public ICacheEntryEvent<object, object> entry;
/// <summary>
/// Constructor.
/// </summary>
/// <param name="ignite">Grid.</param>
/// <param name="entry">Entry.</param>
public FilterEvent(IIgnite ignite, ICacheEntryEvent<object, object> entry)
{
this.ignite = ignite;
this.entry = entry;
}
}
/// <summary>
/// Callbakc event.
/// </summary>
public class CallbackEvent
{
/** Entries. */
public ICollection<ICacheEntryEvent<object, object>> entries;
/// <summary>
/// Constructor.
/// </summary>
/// <param name="entries">Entries.</param>
public CallbackEvent(ICollection<ICacheEntryEvent<object, object>> entries)
{
this.entries = entries;
}
}
/// <summary>
/// ScanQuery filter for InitialQuery test.
/// </summary>
[Serializable]
private class InitialQueryScanFilter : ICacheEntryFilter<int, BinarizableEntry>
{
/** <inheritdoc /> */
public bool Invoke(ICacheEntry<int, BinarizableEntry> entry)
{
return entry.Key < 33;
}
}
}
}