| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| using System; |
| using System.IO; |
| using Xunit; |
| using System.Diagnostics; |
| using System.Threading; |
| using Xunit.Abstractions; |
| |
| namespace Apache.Geode.Client.IntegrationTests |
| { |
| public class MyOrder : IPdxSerializable |
| { |
| private const string ORDER_ID_KEY_ = "order_id"; |
| private const string NAME_KEY_ = "name"; |
| private const string QUANTITY_KEY_ = "quantity"; |
| public long OrderId { get; set; } |
| public string Name { get; set; } |
| public short Quantity { get; set; } |
| // A default constructor is required for deserialization |
| public MyOrder() { } |
| public MyOrder(int orderId, string name, short quantity) |
| { |
| OrderId = orderId; |
| Name = name; |
| Quantity = quantity; |
| } |
| public override string ToString() |
| { |
| return string.Format("Order: [{0}, {1}, {2}]", OrderId, Name, Quantity); |
| } |
| public void ToData(IPdxWriter output) |
| { |
| output.WriteLong(ORDER_ID_KEY_, OrderId); |
| output.MarkIdentityField(ORDER_ID_KEY_); |
| output.WriteString(NAME_KEY_, Name); |
| output.MarkIdentityField(NAME_KEY_); |
| output.WriteInt(QUANTITY_KEY_, Quantity); |
| output.MarkIdentityField(QUANTITY_KEY_); |
| } |
| public void FromData(IPdxReader input) |
| { |
| OrderId = input.ReadLong(ORDER_ID_KEY_); |
| Name = input.ReadString(NAME_KEY_); |
| Quantity = (short)input.ReadInt(QUANTITY_KEY_); |
| } |
| public static IPdxSerializable CreateDeserializable() |
| { |
| return new MyOrder(); |
| } |
| } |
| |
| public abstract class CqListener<TKey, TResult> : ICqListener<TKey, TResult> |
| { |
| public AutoResetEvent RegionClearEvent { get; private set; } |
| public AutoResetEvent CreatedEvent { get; private set; } |
| public AutoResetEvent UpdatedEvent { get; private set; } |
| public AutoResetEvent DestroyedNonNullEvent { get; private set; } |
| public AutoResetEvent DestroyedNullEvent { get; private set; } |
| public AutoResetEvent InvalidatedEvent { get; private set; } |
| public bool ReceivedUnknownEventType { get; internal set; } |
| |
| public CqListener() |
| { |
| CreatedEvent = new AutoResetEvent(false); |
| UpdatedEvent = new AutoResetEvent(false); |
| DestroyedNullEvent = new AutoResetEvent(false); |
| DestroyedNonNullEvent = new AutoResetEvent(false); |
| RegionClearEvent = new AutoResetEvent(false); |
| InvalidatedEvent = new AutoResetEvent(false); |
| ReceivedUnknownEventType = false; |
| } |
| |
| public abstract void OnEvent(CqEvent<TKey, TResult> ev); |
| |
| public virtual void OnError(CqEvent<TKey, TResult> ev) |
| { |
| } |
| public virtual void Close() |
| { |
| |
| } |
| } |
| |
| public class PdxCqListener<TKey, TResult> : CqListener<TKey, TResult> |
| { |
| public override void OnEvent(CqEvent<TKey, TResult> ev) |
| { |
| Debug.WriteLine("PdxCqListener::OnEvent called"); |
| var val = ev.getNewValue() as MyOrder; |
| TKey key = ev.getKey(); |
| |
| switch (ev.getQueryOperation()) |
| { |
| case CqOperation.OP_TYPE_REGION_CLEAR: |
| RegionClearEvent.Set(); |
| break; |
| case CqOperation.OP_TYPE_CREATE: |
| CreatedEvent.Set(); |
| break; |
| case CqOperation.OP_TYPE_UPDATE: |
| UpdatedEvent.Set(); |
| break; |
| case CqOperation.OP_TYPE_INVALIDATE: |
| InvalidatedEvent.Set(); |
| break; |
| case CqOperation.OP_TYPE_DESTROY: |
| if (val == null) |
| { |
| DestroyedNullEvent.Set(); |
| } |
| else |
| { |
| DestroyedNonNullEvent.Set(); |
| } |
| break; |
| default: |
| ReceivedUnknownEventType = true; |
| break; |
| } |
| } |
| } |
| |
| public class DataCqListener<TKey, TResult> : CqListener<TKey, TResult> |
| { |
| public override void OnEvent(CqEvent<TKey, TResult> ev) |
| { |
| Debug.WriteLine("CqListener::OnEvent called"); |
| var val = ev.getNewValue() as Position; |
| TKey key = ev.getKey(); |
| |
| switch (ev.getQueryOperation()) |
| { |
| case CqOperation.OP_TYPE_REGION_CLEAR: |
| RegionClearEvent.Set(); |
| break; |
| case CqOperation.OP_TYPE_CREATE: |
| CreatedEvent.Set(); |
| break; |
| case CqOperation.OP_TYPE_UPDATE: |
| UpdatedEvent.Set(); |
| break; |
| case CqOperation.OP_TYPE_INVALIDATE: |
| InvalidatedEvent.Set(); |
| break; |
| case CqOperation.OP_TYPE_DESTROY: |
| if (val == null) |
| { |
| DestroyedNullEvent.Set(); |
| } |
| else |
| { |
| DestroyedNonNullEvent.Set(); |
| } |
| break; |
| default: |
| ReceivedUnknownEventType = true; |
| break; |
| } |
| } |
| } |
| |
| [Trait("Category", "Integration")] |
| public class CqOperationTest : TestBase |
| { |
| private static int waitInterval_ = 1000; |
| |
| public CqOperationTest(ITestOutputHelper testOutputHelper) : base(testOutputHelper) |
| { |
| } |
| |
| [Fact] |
| public void PdxSerializableNotificationsHaveCorrectValues() |
| { |
| using (var cluster = new Cluster(output, CreateTestCaseDirectoryName(), 1, 1)) |
| { |
| Assert.True(cluster.Start()); |
| Assert.Equal(0, cluster.Gfsh.create() |
| .region() |
| .withName("cqTestRegion") |
| .withType("REPLICATE") |
| .execute()); |
| |
| var cache = cluster.CreateCache(); |
| |
| cache.TypeRegistry.RegisterPdxType(MyOrder.CreateDeserializable); |
| |
| var pool = cluster.ApplyLocators(cache.GetPoolFactory()) |
| .SetSubscriptionEnabled(true) |
| .Create("pool"); |
| |
| var regionFactory = cache.CreateRegionFactory(RegionShortcut.PROXY) |
| .SetPoolName("pool"); |
| |
| var region = regionFactory.Create<string, MyOrder>("cqTestRegion"); |
| |
| var queryService = pool.GetQueryService(); |
| var cqAttributesFactory = new CqAttributesFactory<string, MyOrder>(); |
| var cqListener = new PdxCqListener<string, MyOrder>(); |
| cqAttributesFactory.AddCqListener(cqListener); |
| var cqAttributes = cqAttributesFactory.Create(); |
| |
| var query = queryService.NewCq("MyCq", "SELECT * FROM /cqTestRegion WHERE quantity > 30", cqAttributes, false); |
| Debug.WriteLine("Executing continuous query"); |
| query.Execute(); |
| |
| Debug.WriteLine("Putting and changing Position objects in the region"); |
| var order1 = new MyOrder(1, "product x", 23); |
| var order2 = new MyOrder(2, "product y", 37); |
| var order3 = new MyOrder(3, "product z", 101); |
| |
| region.Put("order1", order1); |
| |
| region.Put("order2", order2); |
| Assert.True(cqListener.CreatedEvent.WaitOne(waitInterval_), "Didn't receive expected CREATE event"); |
| |
| order1.Quantity = 60; |
| region.Put("order1", order1); |
| Assert.True(cqListener.CreatedEvent.WaitOne(waitInterval_), "Didn't receive expected CREATE event"); |
| |
| order2.Quantity = 45; |
| region.Put("order2", order2); |
| Assert.True(cqListener.UpdatedEvent.WaitOne(waitInterval_), "Didn't receive expected UPDATE event"); |
| |
| order2.Quantity = 11; |
| region.Put("order2", order2); |
| Assert.True(cqListener.DestroyedNonNullEvent.WaitOne(waitInterval_), "Didn't receive expected DESTROY event"); |
| |
| region.Remove("order1"); |
| Assert.True(cqListener.DestroyedNullEvent.WaitOne(waitInterval_), "Didn't receive expected DESTROY event"); |
| |
| region.Put("order3", order3); |
| Assert.True(cqListener.CreatedEvent.WaitOne(waitInterval_), "Didn't receive expected CREATE event"); |
| |
| region.Clear(); |
| Assert.True(cqListener.RegionClearEvent.WaitOne(waitInterval_), "Didn't receive expected CLEAR event"); |
| |
| Assert.False(cqListener.ReceivedUnknownEventType, "An unknown event was received by CQ listener"); |
| |
| cache.Close(); |
| } |
| } |
| |
| [Fact] |
| public void DataSerializableNotificationsHaveCorrectValues() |
| { |
| using (var cluster = new Cluster(output, CreateTestCaseDirectoryName(), 1, 1)) |
| { |
| Assert.True(cluster.Start()); |
| Assert.Equal(0, cluster.Gfsh.deploy() |
| .withJar(Config.JavaobjectJarPath) |
| .execute()); |
| Assert.Equal(0, cluster.Gfsh.create() |
| .region() |
| .withName("cqTestRegion") |
| .withType("REPLICATE") |
| .execute()); |
| Assert.Equal(0, cluster.Gfsh.executeFunction() |
| .withId("InstantiateDataSerializable") |
| .withMember("DataSerializableNotificationsHaveCorrectValues_server_0") |
| .execute()); |
| |
| var cache = cluster.CreateCache(); |
| |
| cache.TypeRegistry.RegisterType(Position.CreateDeserializable, 22); |
| |
| var pool = cluster.ApplyLocators(cache.GetPoolFactory()) |
| .SetSubscriptionEnabled(true) |
| .Create("pool"); |
| |
| var regionFactory = cache.CreateRegionFactory(RegionShortcut.PROXY) |
| .SetPoolName("pool"); |
| |
| var region = regionFactory.Create<string, Position>("cqTestRegion"); |
| |
| var queryService = pool.GetQueryService(); |
| var cqAttributesFactory = new CqAttributesFactory<string, Position>(); |
| var cqListener = new DataCqListener<string, Position>(); |
| cqAttributesFactory.AddCqListener(cqListener); |
| var cqAttributes = cqAttributesFactory.Create(); |
| |
| var query = queryService.NewCq("MyCq", "SELECT * FROM /cqTestRegion WHERE sharesOutstanding > 30", cqAttributes, false); |
| Debug.WriteLine("Executing continuous query"); |
| query.Execute(); |
| |
| Debug.WriteLine("Putting and changing Position objects in the region"); |
| var order1 = new Position("GOOG", 23); |
| var order2 = new Position("IBM", 37); |
| var order3 = new Position("PVTL", 101); |
| |
| region.Put("order1", order1); |
| var Value = region["order1"]; |
| |
| region.Put("order2", order2); |
| Assert.True(cqListener.CreatedEvent.WaitOne(waitInterval_), "Didn't receive expected CREATE event"); |
| |
| order1.SharesOutstanding = 55; |
| region.Put("order1", order1); |
| Assert.True(cqListener.CreatedEvent.WaitOne(waitInterval_), "Didn't receive expected CREATE event"); |
| |
| order2.SharesOutstanding = 77; |
| region.Put("order2", order2); |
| Assert.True(cqListener.UpdatedEvent.WaitOne(waitInterval_), "Didn't receive expected UPDATE event"); |
| |
| order2.SharesOutstanding = 11; |
| region.Put("order2", order2); |
| Assert.True(cqListener.DestroyedNonNullEvent.WaitOne(waitInterval_), "Didn't receive expected DESTROY event"); |
| |
| region.Remove("order1"); |
| Assert.True(cqListener.DestroyedNullEvent.WaitOne(waitInterval_), "Didn't receive expected DESTROY event"); |
| |
| region.Put("order3", order3); |
| Assert.True(cqListener.CreatedEvent.WaitOne(waitInterval_), "Didn't receive expected CREATE event"); |
| |
| region.Clear(); |
| Assert.True(cqListener.RegionClearEvent.WaitOne(waitInterval_), "Didn't receive expected CLEAR event"); |
| |
| Assert.False(cqListener.ReceivedUnknownEventType, "An unknown event was received by CQ listener"); |
| |
| cache.Close(); |
| } |
| } |
| } |
| } |