| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| using System; |
| using System.Collections.Generic; |
| using System.Threading; |
| |
| namespace Apache.Geode.Client.UnitTests |
| { |
| using NUnit.Framework; |
| using Apache.Geode.DUnitFramework; |
| using Apache.Geode.Client; |
| |
| |
| using AssertionException = Apache.Geode.Client.AssertionException; |
| |
| [Ignore("broken")] |
| [TestFixture] |
| [Category("group2")] |
| [Category("unicast_only")] |
| [Category("generics")] |
| public class ThinClientDurableTests : ThinClientRegionSteps |
| { |
| #region Private members |
| |
| private UnitProcess m_client1, m_client2, m_feeder; |
| private string[] m_regexes = { "D-Key-.*", "Key-.*" }; |
| private string[] m_mixKeys = { "Key-1", "D-Key-1", "L-Key", "LD-Key" }; |
| private string[] keys = { "Key-1", "Key-2", "Key-3", "Key-4", "Key-5" }; |
| |
| private static string DurableClientId1 = "DurableClientId1"; |
| private static string DurableClientId2 = "DurableClientId2"; |
| |
| private static DurableListener<object, object> m_checker1, m_checker2; |
| |
| #endregion |
| |
| protected override ClientBase[] GetClients() |
| { |
| m_client1 = new UnitProcess(); |
| m_client2 = new UnitProcess(); |
| m_feeder = new UnitProcess(); |
| return new ClientBase[] { m_client1, m_client2, m_feeder }; |
| } |
| |
| [TestFixtureTearDown] |
| public override void EndTests() |
| { |
| CacheHelper.StopJavaServers(); |
| base.EndTests(); |
| } |
| |
| [TearDown] |
| public override void EndTest() |
| { |
| try |
| { |
| m_client1.Call(CacheHelper.Close); |
| m_client2.Call(CacheHelper.Close); |
| m_feeder.Call(CacheHelper.Close); |
| CacheHelper.ClearEndpoints(); |
| CacheHelper.ClearLocators(); |
| } |
| finally |
| { |
| CacheHelper.StopJavaServers(); |
| } |
| base.EndTest(); |
| } |
| |
| #region Common Functions |
| |
| public void InitFeeder(string locators, int redundancyLevel) |
| { |
| CacheHelper.CreatePool<object, object>("__TESTPOOL1_", locators, (string)null, redundancyLevel, false); |
| CacheHelper.CreateTCRegion_Pool<object, object>(RegionNames[0], false, true, null, |
| locators, "__TESTPOOL1_", false); |
| } |
| |
| public void InitFeeder2(string locators, int redundancyLevel) |
| { |
| CacheHelper.CreatePool<object, object>("__TESTPOOL1_", locators, (string)null, redundancyLevel, false); |
| CacheHelper.CreateTCRegion_Pool<object, object>(RegionNames[0], false, true, null, |
| locators, "__TESTPOOL1_", false); |
| |
| CacheHelper.CreatePool<object, object>("__TESTPOOL2_", locators, (string)null, redundancyLevel, false); |
| CacheHelper.CreateTCRegion_Pool<object, object>(RegionNames[1], false, true, null, |
| locators, "__TESTPOOL2_", false); |
| } |
| |
| public void InitDurableClientWithTwoPools(string locators, |
| int redundancyLevel, string durableClientId, TimeSpan durableTimeout, int expectedQ0, int expectedQ1) |
| { |
| DurableListener<object, object> checker = null; |
| CacheHelper.InitConfigForDurable_Pool2(locators, redundancyLevel, |
| durableClientId, durableTimeout, TimeSpan.FromSeconds(35), "__TESTPOOL1_"); |
| CacheHelper.CreateTCRegion_Pool(RegionNames[0], false, true, checker, |
| CacheHelper.Locators, "__TESTPOOL1_", true); |
| |
| CacheHelper.InitConfigForDurable_Pool2(locators, redundancyLevel, |
| durableClientId, durableTimeout, TimeSpan.FromSeconds(35), "__TESTPOOL2_"); |
| CacheHelper.CreateTCRegion_Pool(RegionNames[1], false, true, checker, |
| CacheHelper.Locators, "__TESTPOOL2_", true); |
| |
| IRegion<object, object> region0 = CacheHelper.GetVerifyRegion<object, object>(RegionNames[0]); |
| IRegion<object, object> region1 = CacheHelper.GetVerifyRegion<object, object>(RegionNames[1]); |
| |
| try |
| { |
| region0.GetSubscriptionService().RegisterAllKeys(true); |
| region1.GetSubscriptionService().RegisterAllKeys(true); |
| } |
| catch (Exception other) |
| { |
| Assert.Fail("RegisterAllKeys threw unexpected exception: {0}", other.Message); |
| } |
| |
| Pool pool0 = CacheHelper.DCache.GetPoolManager().Find(region0.Attributes.PoolName); |
| int pendingEventCount0 = pool0.PendingEventCount; |
| Util.Log("pendingEventCount0 for pool = {0} {1} ", pendingEventCount0, region0.Attributes.PoolName); |
| string msg = string.Format("Expected Value ={0}, Actual = {1}", expectedQ0, pendingEventCount0); |
| Assert.AreEqual(expectedQ0, pendingEventCount0, msg); |
| |
| Pool pool1 = CacheHelper.DCache.GetPoolManager().Find(region1.Attributes.PoolName); |
| int pendingEventCount1 = pool1.PendingEventCount; |
| Util.Log("pendingEventCount1 for pool = {0} {1} ", pendingEventCount1, region1.Attributes.PoolName); |
| string msg1 = string.Format("Expected Value ={0}, Actual = {1}", expectedQ1, pendingEventCount1); |
| Assert.AreEqual(expectedQ1, pendingEventCount1, msg1); |
| |
| CacheHelper.DCache.ReadyForEvents(); |
| Thread.Sleep(10000); |
| |
| CacheHelper.DCache.Close(true); |
| } |
| |
| public void ClearChecker(int client) |
| { |
| if (client == 1) |
| { |
| ThinClientDurableTests.m_checker1 = null; |
| } |
| else // client == 2 |
| { |
| ThinClientDurableTests.m_checker2 = null; |
| } |
| } |
| |
| public void InitDurableClient(int client, string locators, int redundancyLevel, |
| string durableClientId, TimeSpan durableTimeout) |
| { |
| // Create DurableListener for first time and use same afterward. |
| DurableListener<object, object> checker = null; |
| if (client == 1) |
| { |
| if (ThinClientDurableTests.m_checker1 == null) |
| { |
| ThinClientDurableTests.m_checker1 = DurableListener<object, object>.Create(); |
| } |
| checker = ThinClientDurableTests.m_checker1; |
| } |
| else // client == 2 |
| { |
| if (ThinClientDurableTests.m_checker2 == null) |
| { |
| ThinClientDurableTests.m_checker2 = DurableListener<object, object>.Create(); |
| } |
| checker = ThinClientDurableTests.m_checker2; |
| } |
| CacheHelper.InitConfigForDurable_Pool(locators, redundancyLevel, |
| durableClientId, durableTimeout); |
| CacheHelper.CreateTCRegion_Pool<object, object>(RegionNames[0], false, true, checker, |
| CacheHelper.Locators, "__TESTPOOL1_", true); |
| |
| CacheHelper.DCache.ReadyForEvents(); |
| IRegion<object, object> region1 = CacheHelper.GetVerifyRegion<object, object>(RegionNames[0]); |
| region1.GetSubscriptionService().RegisterRegex(m_regexes[0], true); |
| region1.GetSubscriptionService().RegisterRegex(m_regexes[1], false); |
| //CacheableKey[] ldkeys = { new CacheableString(m_mixKeys[3]) }; |
| ICollection<object> lkeys = new List<object>(); |
| lkeys.Add((object)m_mixKeys[3]); |
| region1.GetSubscriptionService().RegisterKeys(lkeys, true, false); |
| |
| ICollection<object> ldkeys = new List<object>(); ; |
| ldkeys.Add((object)m_mixKeys[2]); |
| region1.GetSubscriptionService().RegisterKeys(ldkeys, false, false); |
| } |
| |
| public void InitClientXml(string cacheXml) |
| { |
| CacheHelper.InitConfig(cacheXml); |
| } |
| |
| public void ReadyForEvents() |
| { |
| CacheHelper.DCache.ReadyForEvents(); |
| } |
| |
| public void PendingEventCount(IRegion<object, object> region, int expectedPendingQSize, bool exception) |
| { |
| Util.Log("PendingEventCount regionName = {0} ", region); |
| string poolName = region.Attributes.PoolName; |
| if (poolName != null) |
| { |
| Util.Log("PendingEventCount poolName = {0} ", poolName); |
| Pool pool = CacheHelper.DCache.GetPoolManager().Find(poolName); |
| if (exception) |
| { |
| try |
| { |
| int pendingEventCount = pool.PendingEventCount; |
| Util.Log("PendingEventCount Should have got exception "); |
| Assert.Fail("PendingEventCount Should have got exception"); |
| } |
| catch (IllegalStateException ex) |
| { |
| Util.Log("Got expected exception for PendingEventCount {0} ", ex.Message); |
| } |
| } |
| else |
| { |
| int pendingEventCount = pool.PendingEventCount; |
| Util.Log("pendingEventCount = {0} ", pendingEventCount); |
| string msg = string.Format("Expected Value ={0}, Actual = {1}", expectedPendingQSize, pendingEventCount); |
| Assert.AreEqual(expectedPendingQSize, pendingEventCount, msg); |
| } |
| } |
| } |
| |
| public void FeederUpdate(int value, int sleep) |
| { |
| IRegion<object, object> region1 = CacheHelper.GetVerifyRegion<object, object>(RegionNames[0]); |
| |
| region1[m_mixKeys[0]] = value; |
| Thread.Sleep(sleep); |
| region1[m_mixKeys[1]] = value; |
| Thread.Sleep(sleep); |
| region1[m_mixKeys[2]] = value; |
| Thread.Sleep(sleep); |
| region1[m_mixKeys[3]] = value; |
| Thread.Sleep(sleep); |
| |
| region1.Remove(m_mixKeys[0]); |
| Thread.Sleep(sleep); |
| region1.Remove(m_mixKeys[1]); |
| Thread.Sleep(sleep); |
| region1.Remove(m_mixKeys[2]); |
| Thread.Sleep(sleep); |
| region1.Remove(m_mixKeys[3]); |
| Thread.Sleep(sleep); |
| } |
| |
| public void FeederUpdate2(int pool1, int pool2) |
| { |
| IRegion<object, object> region0 = CacheHelper.GetVerifyRegion<object, object>(RegionNames[0]); |
| IRegion<object, object> region1 = CacheHelper.GetVerifyRegion<object, object>(RegionNames[1]); |
| |
| for (int i = 0; i < pool1; i++) |
| { |
| region0[i] = i; |
| } |
| |
| for (int i = 0; i < pool2; i++) |
| { |
| region1[i] = i; |
| } |
| } |
| |
| public void ClientDown(bool keepalive) |
| { |
| if (keepalive) |
| { |
| CacheHelper.CloseKeepAlive(); |
| } |
| else |
| { |
| CacheHelper.Close(); |
| } |
| } |
| |
| public void CrashClient() |
| { |
| // TODO: crash client here. |
| } |
| |
| public void KillServer() |
| { |
| CacheHelper.StopJavaServer(1); |
| Util.Log("Cacheserver 1 stopped."); |
| } |
| |
| public delegate void KillServerDelegate(); |
| |
| #endregion |
| |
| |
| public void VerifyTotal(int client, int keys, int total) |
| { |
| DurableListener<object, object> checker = null; |
| if (client == 1) |
| { |
| checker = ThinClientDurableTests.m_checker1; |
| } |
| else // client == 2 |
| { |
| checker = ThinClientDurableTests.m_checker2; |
| } |
| |
| if (checker != null) |
| { |
| checker.validate(keys, total); |
| } |
| else |
| { |
| Assert.Fail("Checker is NULL!"); |
| } |
| } |
| |
| public void VerifyBasic(int client, int keyCount, int eventCount, int durableValue, int nonDurableValue) |
| {//1 4 8 1 1 |
| DurableListener<object, object> checker = null; |
| if (client == 1) |
| { |
| checker = ThinClientDurableTests.m_checker1; |
| } |
| else // client == 2 |
| { |
| checker = ThinClientDurableTests.m_checker2; |
| } |
| |
| if (checker != null) |
| { |
| try |
| { |
| checker.validateBasic(keyCount, eventCount, durableValue, nonDurableValue);//4 8 1 1 |
| } |
| catch (AssertionException e) |
| { |
| Util.Log("VERIFICATION FAILED for client {0}: {1} ", client, e); |
| throw e; |
| } |
| } |
| else |
| { |
| Assert.Fail("Checker is NULL!"); |
| } |
| } |
| |
| #region Basic Durable Test |
| |
| |
| void runDurableAndNonDurableBasic() |
| { |
| CacheHelper.SetupJavaServers(true, |
| "cacheserver_notify_subscription.xml", "cacheserver_notify_subscription2.xml"); |
| CacheHelper.StartJavaLocator(1, "GFELOC"); |
| |
| for (int redundancy = 0; redundancy <= 1; redundancy++) |
| { |
| for (int closeType = 1; closeType <= 2; closeType++) |
| { |
| for (int downtime = 0; downtime <= 1; downtime++) // downtime updates |
| { |
| Util.Log("Starting loop with closeType = {0}, redundancy = {1}, downtime = {2} ", closeType, redundancy, downtime); |
| |
| CacheHelper.StartJavaServerWithLocators(1, "GFECS1", 1); |
| Util.Log("Cacheserver 1 started."); |
| |
| if (redundancy == 1) |
| { |
| CacheHelper.StartJavaServerWithLocators(2, "GFECS2", 1); |
| Util.Log("Cacheserver 2 started."); |
| } |
| |
| m_feeder.Call(InitFeeder, CacheHelper.Locators, 0); |
| Util.Log("Feeder initialized."); |
| |
| m_client1.Call(ClearChecker, 1); |
| m_client2.Call(ClearChecker, 2); |
| |
| m_client1.Call(InitDurableClient, 1, CacheHelper.Locators, redundancy, DurableClientId1, TimeSpan.FromSeconds(300)); |
| m_client2.Call(InitDurableClient, 2, CacheHelper.Locators, redundancy, DurableClientId2, TimeSpan.FromSeconds(3)); |
| |
| Util.Log("Clients initialized."); |
| |
| m_feeder.Call(FeederUpdate, 1, 10); |
| |
| Util.Log("Feeder performed first update."); |
| Thread.Sleep(45000); // wait for HA Q to drain and notify ack to go out. |
| |
| switch (closeType) |
| { |
| case 1: |
| |
| m_client1.Call(ClientDown, true); |
| m_client2.Call(ClientDown, true); |
| |
| Util.Log("Clients downed with keepalive true."); |
| break; |
| case 2: |
| |
| m_client1.Call(ClientDown, false); |
| m_client2.Call(ClientDown, false); |
| |
| Util.Log("Clients downed with keepalive false."); |
| break; |
| case 3: |
| |
| m_client1.Call(CrashClient); |
| |
| m_client2.Call(CrashClient); |
| |
| Util.Log("Clients downed as crash."); |
| break; |
| default: |
| break; |
| } |
| |
| if (downtime == 1) |
| { |
| m_feeder.Call(FeederUpdate, 2, 10); |
| |
| Util.Log("Feeder performed update during downtime."); |
| Thread.Sleep(20000); // wait for HA Q to drain and notify ack to go out. |
| } |
| |
| m_client1.Call(InitDurableClient, 1, CacheHelper.Locators, redundancy, DurableClientId1, TimeSpan.FromSeconds(300)); |
| |
| // Sleep for 45 seconds since durable timeout is 30 seconds so that client2 times out |
| Thread.Sleep(45000); |
| |
| m_client2.Call(InitDurableClient, 2, CacheHelper.Locators, redundancy, DurableClientId2, TimeSpan.FromSeconds(30)); |
| |
| Util.Log("Clients brought back up."); |
| |
| if (closeType != 2 && downtime == 1) |
| { |
| m_client1.Call(VerifyBasic, 1, 4, 12, 2, 1); |
| |
| m_client2.Call(VerifyBasic, 2, 4, 8, 1, 1); |
| |
| } |
| else |
| { |
| |
| m_client1.Call(VerifyBasic, 1, 4, 8, 1, 1); |
| |
| m_client2.Call(VerifyBasic, 2, 4, 8, 1, 1); |
| |
| } |
| |
| Util.Log("Verification completed."); |
| |
| m_feeder.Call(ClientDown, false); |
| |
| m_client1.Call(ClientDown, false); |
| |
| m_client2.Call(ClientDown, false); |
| |
| Util.Log("Feeder and Clients closed."); |
| |
| CacheHelper.StopJavaServer(1); |
| Util.Log("Cacheserver 1 stopped."); |
| |
| if (redundancy == 1) |
| { |
| CacheHelper.StopJavaServer(2); |
| Util.Log("Cacheserver 2 stopped."); |
| } |
| |
| Util.Log("Completed loop with closeType = {0}, redundancy = {1}, downtime = {2} ", closeType, redundancy, downtime); |
| |
| } // end for int downtime |
| } // end for int closeType |
| } // end for int redundancy |
| CacheHelper.StopJavaLocator(1); |
| } |
| |
| // Basic Durable Test to check durable event recieving for different combination |
| // of Close type ( Keep Alive = true / false ) , Intermediate update and rudundancy |
| |
| [Test] |
| public void DurableAndNonDurableBasic() |
| { |
| runDurableAndNonDurableBasic(); |
| } // end [Test] DurableAndNonDurableBasic |
| |
| #endregion |
| |
| #region Durable Intrest Test |
| |
| public void InitDurableClientRemoveInterest(int client, string locators, |
| int redundancyLevel, string durableClientId, TimeSpan durableTimeout) |
| { |
| // Client Registered Durable Intrest on two keys. We need to unregister them all here. |
| |
| DurableListener<object, object> checker = null; |
| if (client == 1) |
| { |
| if (ThinClientDurableTests.m_checker1 == null) |
| { |
| ThinClientDurableTests.m_checker1 = DurableListener<object, object>.Create(); |
| } |
| checker = ThinClientDurableTests.m_checker1; |
| } |
| else // client == 2 |
| { |
| if (ThinClientDurableTests.m_checker2 == null) |
| { |
| ThinClientDurableTests.m_checker2 = DurableListener<object, object>.Create(); |
| } |
| checker = ThinClientDurableTests.m_checker2; |
| } |
| CacheHelper.InitConfigForDurable_Pool(locators, redundancyLevel, |
| durableClientId, durableTimeout); |
| CacheHelper.CreateTCRegion_Pool(RegionNames[0], false, true, checker, |
| CacheHelper.Locators, "__TESTPOOL1_", true); |
| |
| CacheHelper.DCache.ReadyForEvents(); |
| IRegion<object, object> region1 = CacheHelper.GetVerifyRegion<object, object>(RegionNames[0]); |
| |
| // Unregister Regex only durable |
| region1.GetSubscriptionService().RegisterRegex(m_regexes[0], true); |
| region1.GetSubscriptionService().UnregisterRegex(m_regexes[0]); |
| |
| // Unregister list only durable |
| string[] ldkeys = new string[] { m_mixKeys[3] }; |
| region1.GetSubscriptionService().RegisterKeys(ldkeys, true, false); |
| region1.GetSubscriptionService().UnregisterKeys(ldkeys); |
| } |
| |
| public void InitDurableClientNoInterest(int client, string locators, |
| int redundancyLevel, string durableClientId, TimeSpan durableTimeout) |
| { |
| // we use "client" to either create a DurableListener or use the existing ones |
| // if the clients are initialized for the second time |
| DurableListener<object, object> checker = null; |
| if (client == 1) |
| { |
| if (ThinClientDurableTests.m_checker1 == null) |
| { |
| ThinClientDurableTests.m_checker1 = DurableListener<object, object>.Create(); |
| } |
| checker = ThinClientDurableTests.m_checker1; |
| } |
| else // client == 2 |
| { |
| if (ThinClientDurableTests.m_checker2 == null) |
| { |
| ThinClientDurableTests.m_checker2 = DurableListener<object, object>.Create(); |
| } |
| checker = ThinClientDurableTests.m_checker2; |
| } |
| CacheHelper.InitConfigForDurable_Pool(locators, redundancyLevel, |
| durableClientId, durableTimeout); |
| CacheHelper.CreateTCRegion_Pool(RegionNames[0], false, true, checker, |
| CacheHelper.Locators, "__TESTPOOL1_", true); |
| CacheHelper.DCache.ReadyForEvents(); |
| } |
| |
| void runDurableInterest() |
| { |
| CacheHelper.SetupJavaServers(true, "cacheserver_notify_subscription.xml"); |
| CacheHelper.StartJavaLocator(1, "GFELOC"); |
| Util.Log("Locator started"); |
| CacheHelper.StartJavaServerWithLocators(1, "GFECS1", 1); |
| Util.Log("Cacheserver 1 started."); |
| |
| m_feeder.Call(InitFeeder, CacheHelper.Locators, 0); |
| Util.Log("Feeder started."); |
| |
| m_client1.Call(ClearChecker, 1); |
| m_client2.Call(ClearChecker, 2); |
| m_client1.Call(InitDurableClient, 1, CacheHelper.Locators, |
| 0, DurableClientId1, TimeSpan.FromSeconds(60)); |
| m_client2.Call(InitDurableClient, 2, CacheHelper.Locators, |
| 0, DurableClientId2, TimeSpan.FromSeconds(60)); |
| Util.Log("Clients started."); |
| |
| m_feeder.Call(FeederUpdate, 1, 10); |
| Util.Log("Feeder performed first update."); |
| |
| Thread.Sleep(15000); |
| |
| m_client1.Call(ClientDown, true); |
| m_client2.Call(ClientDown, true); |
| Util.Log("Clients downed with keepalive true."); |
| |
| m_client1.Call(InitDurableClientNoInterest, 1, CacheHelper.Locators, |
| 0, DurableClientId1, TimeSpan.FromSeconds(60)); |
| Util.Log("Client 1 started with no interest."); |
| |
| m_client2.Call(InitDurableClientRemoveInterest, 2, CacheHelper.Locators, |
| 0, DurableClientId2, TimeSpan.FromSeconds(60)); |
| Util.Log("Client 2 started with remove interest."); |
| |
| m_feeder.Call(FeederUpdate, 2, 10); |
| Util.Log("Feeder performed second update."); |
| |
| Thread.Sleep(10000); |
| |
| // only durable Intrest will remain. |
| m_client1.Call(VerifyBasic, 1, 4, 12, 2, 1); |
| |
| // no second update should be recieved. |
| m_client2.Call(VerifyBasic, 2, 4, 8, 1, 1); |
| Util.Log("Verification completed."); |
| |
| m_feeder.Call(ClientDown, false); |
| m_client1.Call(ClientDown, false); |
| m_client2.Call(ClientDown, false); |
| Util.Log("Feeder and Clients closed."); |
| |
| CacheHelper.StopJavaServer(1); |
| Util.Log("Cacheserver 1 stopped."); |
| |
| CacheHelper.StopJavaLocator(1); |
| Util.Log("Locator stopped"); |
| |
| CacheHelper.ClearEndpoints(); |
| CacheHelper.ClearLocators(); |
| } |
| |
| //This is to test whether durable registered intrests remains on reconnect. and |
| // Unregister works on reconnect. |
| |
| [Test] |
| public void DurableInterest() |
| { |
| runDurableInterest(); |
| } // end [Test] DurableInterest |
| #endregion |
| |
| #region Durable Failover Test |
| |
| |
| public void InitDurableClientForFailover(int client, string locators, |
| int redundancyLevel, string durableClientId, TimeSpan durableTimeout) |
| { |
| // we use "client" to either create a DurableListener or use the existing ones |
| // if the clients are initialized for the second time |
| DurableListener<object, object> checker = null; |
| if (client == 1) |
| { |
| if (ThinClientDurableTests.m_checker1 == null) |
| { |
| ThinClientDurableTests.m_checker1 = DurableListener<object, object>.Create(); |
| } |
| checker = ThinClientDurableTests.m_checker1; |
| } |
| else // client == 2 |
| { |
| if (ThinClientDurableTests.m_checker2 == null) |
| { |
| ThinClientDurableTests.m_checker2 = DurableListener<object, object>.Create(); |
| } |
| checker = ThinClientDurableTests.m_checker2; |
| } |
| CacheHelper.InitConfigForDurable_Pool(locators, redundancyLevel, |
| durableClientId, durableTimeout, TimeSpan.FromSeconds(35)); |
| CacheHelper.CreateTCRegion_Pool(RegionNames[0], false, true, checker, |
| CacheHelper.Locators, "__TESTPOOL1_", true); |
| CacheHelper.DCache.ReadyForEvents(); |
| IRegion<object, object> region1 = CacheHelper.GetVerifyRegion<object, object>(RegionNames[0]); |
| |
| try |
| { |
| region1.GetSubscriptionService().RegisterRegex(m_regexes[0], true); |
| region1.GetSubscriptionService().RegisterRegex(m_regexes[1], false); |
| } |
| catch (Exception other) |
| { |
| Assert.Fail("RegisterKeys threw unexpected exception: {0}", other.Message); |
| } |
| } |
| |
| public void FeederUpdateForFailover(string region, int value, int sleep) |
| { |
| //update only 2 keys. |
| IRegion<object, object> region1 = CacheHelper.GetVerifyRegion<object, object>(region); |
| |
| region1[m_mixKeys[0]] = value; |
| Thread.Sleep(sleep); |
| region1[m_mixKeys[1]] = value; |
| Thread.Sleep(sleep); |
| |
| } |
| |
| void runDurableFailover() |
| { |
| CacheHelper.SetupJavaServers(true, |
| "cacheserver_notify_subscription.xml", "cacheserver_notify_subscription2.xml"); |
| |
| CacheHelper.StartJavaLocator(1, "GFELOC"); |
| Util.Log("Locator started"); |
| |
| for (int clientDown = 0; clientDown <= 1; clientDown++) |
| { |
| for (int redundancy = 0; redundancy <= 1; redundancy++) |
| { |
| Util.Log("Starting loop with clientDown = {0}, redundancy = {1}", clientDown, redundancy); |
| |
| CacheHelper.StartJavaServerWithLocators(1, "GFECS1", 1); |
| Util.Log("Cacheserver 1 started."); |
| |
| m_feeder.Call(InitFeeder, CacheHelper.Locators, 0); |
| Util.Log("Feeder started with redundancy level as 0."); |
| |
| m_client1.Call(ClearChecker, 1); |
| m_client1.Call(InitDurableClientForFailover, 1, CacheHelper.Locators, |
| redundancy, DurableClientId1, TimeSpan.FromSeconds(300)); |
| Util.Log("Client started with redundancy level as {0}.", redundancy); |
| |
| m_feeder.Call(FeederUpdateForFailover, RegionNames[0], 1, 10); |
| Util.Log("Feeder updates 1 completed."); |
| |
| CacheHelper.StartJavaServerWithLocators(2, "GFECS2", 1); |
| Util.Log("Cacheserver 2 started."); |
| |
| //Time for redundancy thread to detect. |
| Thread.Sleep(35000); |
| |
| if (clientDown == 1) |
| { |
| m_client1.Call(ClientDown, true); |
| } |
| |
| CacheHelper.StopJavaServer(1); |
| Util.Log("Cacheserver 1 stopped."); |
| |
| //Time for failover |
| Thread.Sleep(5000); |
| |
| m_feeder.Call(FeederUpdateForFailover, RegionNames[0], 2, 10); |
| Util.Log("Feeder updates 2 completed."); |
| |
| //Restart Client |
| if (clientDown == 1) |
| { |
| m_client1.Call(InitDurableClientForFailover, 1, CacheHelper.Locators, |
| redundancy, DurableClientId1, TimeSpan.FromSeconds(300)); |
| Util.Log("Client Restarted with redundancy level as {0}.", redundancy); |
| } |
| |
| //Verify |
| if (clientDown == 1) |
| { |
| if (redundancy == 0) // Events missed |
| { |
| m_client1.Call(VerifyBasic, 1, 2, 2, 1, 1); |
| } |
| else // redundancy == 1 Only Durable Events should be recieved. |
| { |
| m_client1.Call(VerifyBasic, 1, 2, 3, 2, 1); |
| } |
| } |
| else // In normal failover all events should be recieved. |
| { |
| m_client1.Call(VerifyBasic, 1, 2, 4, 2, 2); |
| } |
| |
| Util.Log("Verification completed."); |
| |
| m_feeder.Call(ClientDown, false); |
| m_client1.Call(ClientDown, false); |
| Util.Log("Feeder and Client closed."); |
| |
| CacheHelper.StopJavaServer(2); |
| Util.Log("Cacheserver 2 stopped."); |
| |
| Util.Log("Completed loop with clientDown = {0}, redundancy = {1}", clientDown, redundancy); |
| }// for redundancy |
| } // for clientDown |
| CacheHelper.StopJavaLocator(1); |
| Util.Log("Locator stopped"); |
| |
| CacheHelper.ClearEndpoints(); |
| CacheHelper.ClearLocators(); |
| } |
| |
| void RunDurableClient(int expectedPendingQSize) |
| { |
| var pp = Properties<string, string>.Create(); |
| pp.Insert("durable-client-id", "DurableClientId"); |
| pp.Insert("durable-timeout", "30s"); |
| |
| var cacheFactory = new CacheFactory(pp); |
| var cache = cacheFactory.Create(); |
| cache.GetPoolFactory().SetSubscriptionEnabled(true); |
| cache.GetPoolFactory().SetSubscriptionAckInterval(TimeSpan.FromMilliseconds(5000)); |
| cache.GetPoolFactory().SetSubscriptionMessageTrackingTimeout(TimeSpan.FromMilliseconds(5000)); |
| Util.Log("Created the Geode Cache Programmatically"); |
| |
| RegionFactory regionFactory = cache.CreateRegionFactory(RegionShortcut.CACHING_PROXY); |
| IRegion<object, object> region = regionFactory.Create<object, object>("DistRegionAck"); |
| Util.Log("Created the DistRegionAck Region Programmatically"); |
| |
| QueryService qService = cache.GetQueryService(); |
| CqAttributesFactory<object, object> cqFac = new CqAttributesFactory<object, object>(); |
| |
| ICqListener<object, object> cqLstner = new MyCqListener1<object, object>(); |
| cqFac.AddCqListener(cqLstner); |
| CqAttributes<object, object> cqAttr = cqFac.Create(); |
| Util.Log("Attached CqListener"); |
| String query = "select * from /DistRegionAck"; |
| CqQuery<object, object> qry = qService.NewCq("MyCq", query, cqAttr, true); |
| Util.Log("Created new CqQuery"); |
| |
| qry.Execute(); |
| Util.Log("Executed new CqQuery"); |
| Thread.Sleep(10000); |
| |
| PendingEventCount(region, expectedPendingQSize, false); |
| |
| //Send ready for Event message to Server( only for Durable Clients ). |
| //Server will send queued events to client after recieving this. |
| cache.ReadyForEvents(); |
| |
| Util.Log("Sent ReadyForEvents message to server"); |
| Thread.Sleep(10000); |
| // Close the Geode Cache with keepalive = true. Server will queue events for |
| // durable registered keys and will deliver all events when client will reconnect |
| // within timeout period and send "readyForEvents()" |
| |
| PendingEventCount(region, 0, true); |
| |
| cache.Close(true); |
| |
| Util.Log("Closed the Geode Cache with keepalive as true"); |
| } |
| |
| void runDurableClientWithTwoPools() |
| { |
| CacheHelper.SetupJavaServers(true, "cacheserver_notify_subscription.xml"); |
| CacheHelper.StartJavaLocator(1, "GFELOC"); |
| Util.Log("Locator started"); |
| CacheHelper.StartJavaServerWithLocators(1, "GFECS1", 1); |
| Util.Log("Cacheserver 1 started."); |
| |
| m_feeder.Call(InitFeeder2, CacheHelper.Locators, 0); |
| Util.Log("Feeder started."); |
| |
| m_client1.Call(InitDurableClientWithTwoPools, CacheHelper.Locators, 0, DurableClientId1, TimeSpan.FromSeconds(30), -2, -2); |
| Util.Log("DurableClient with Two Pools Initialized"); |
| |
| m_feeder.Call(FeederUpdate2, 5, 10); |
| Util.Log("Feeder performed first update."); |
| Thread.Sleep(15000); |
| |
| m_client1.Call(InitDurableClientWithTwoPools, CacheHelper.Locators, 0, DurableClientId1, TimeSpan.FromSeconds(30), 6, 11); //+1 for marker, so 5+1, 10+1 etc |
| Util.Log("DurableClient with Two Pools after first update"); |
| |
| m_feeder.Call(FeederUpdate2, 10, 5); |
| Util.Log("Feeder performed second update."); |
| Thread.Sleep(15000); |
| |
| m_client1.Call(InitDurableClientWithTwoPools, CacheHelper.Locators, 0, DurableClientId1, TimeSpan.FromSeconds(30), 16, 16); |
| Util.Log("DurableClient with Two Pools after second update"); |
| |
| Thread.Sleep(45000); //45 > 30 secs. |
| m_client1.Call(InitDurableClientWithTwoPools, CacheHelper.Locators, 0, DurableClientId1, TimeSpan.FromSeconds(30), -1, -1); |
| Util.Log("DurableClient with Two Pools after timeout"); |
| |
| m_feeder.Call(ClientDown, false); |
| Util.Log("Feeder and Clients closed."); |
| |
| CacheHelper.StopJavaServer(1); |
| Util.Log("Cacheserver 1 stopped."); |
| |
| CacheHelper.StopJavaLocator(1); |
| Util.Log("Locator stopped"); |
| |
| CacheHelper.ClearEndpoints(); |
| CacheHelper.ClearLocators(); |
| } |
| |
| void RunFeeder() |
| { |
| var cacheFactory = new CacheFactory(); |
| Util.Log("Feeder connected to the Geode Distributed System"); |
| |
| Cache cache = cacheFactory.Create(); |
| Util.Log("Created the Geode Cache"); |
| |
| RegionFactory regionFactory = cache.CreateRegionFactory(RegionShortcut.PROXY); |
| Util.Log("Created the RegionFactory"); |
| |
| // Create the Region Programmatically. |
| IRegion<object, object> region = regionFactory.Create<object, object>("DistRegionAck"); |
| Util.Log("Created the Region Programmatically."); |
| |
| PendingEventCount(region, 0, true); |
| |
| for (int i = 0; i < 10; i++) |
| { |
| region[i] = i; |
| } |
| Thread.Sleep(10000); |
| Util.Log("put on 0-10 keys done."); |
| |
| // Close the Geode Cache |
| cache.Close(); |
| Util.Log("Closed the Geode Cache"); |
| } |
| |
| void RunFeeder1() |
| { |
| CacheFactory cacheFactory = new CacheFactory(); |
| Util.Log("Feeder connected to the Geode Distributed System"); |
| |
| Cache cache = cacheFactory.Create(); |
| Util.Log("Created the Geode Cache"); |
| |
| RegionFactory regionFactory = cache.CreateRegionFactory(RegionShortcut.PROXY); |
| Util.Log("Created the RegionFactory"); |
| |
| // Create the Region Programmatically. |
| IRegion<object, object> region = regionFactory.Create<object, object>("DistRegionAck"); |
| Util.Log("Created the Region Programmatically."); |
| |
| PendingEventCount(region, 0, true); |
| |
| for (int i = 10; i < 20; i++) |
| { |
| region[i] = i; |
| } |
| Thread.Sleep(10000); |
| Util.Log("put on 10-20 keys done."); |
| |
| // Close the Geode Cache |
| cache.Close(); |
| Util.Log("Closed the Geode Cache"); |
| } |
| |
| void VerifyEvents() |
| { |
| Util.Log("MyCqListener1.m_cntEvents = {0} ", MyCqListener1<object, object>.m_cntEvents); |
| Assert.AreEqual(MyCqListener1<object, object>.m_cntEvents, 20, "Incorrect events, expected 20"); |
| } |
| |
| void runCQDurable() |
| { |
| CacheHelper.SetupJavaServers(false, "serverDurableClient.xml"); |
| CacheHelper.StartJavaServer(1, "GFECS1"); |
| m_client1.Call(RunDurableClient, -2); // 1st time no Q, hence check -2 as PendingEventCount. |
| m_client2.Call(RunFeeder); |
| m_client1.Call(RunDurableClient, 10); |
| m_client2.Call(RunFeeder1); |
| m_client1.Call(RunDurableClient, 10); |
| m_client1.Call(VerifyEvents); |
| Thread.Sleep(45 * 1000); // sleep 45 secs > 30 secs, check -1 as PendingEventCount. |
| m_client1.Call(RunDurableClient, -1); |
| CacheHelper.StopJavaServer(1); |
| } |
| |
| [Test] |
| public void DurableFailover() |
| { |
| runDurableFailover(); |
| } // end [Test] DurableFailover |
| |
| [Test] |
| public void CQDurable() |
| { |
| runCQDurable(); |
| |
| runDurableClientWithTwoPools(); |
| } |
| #endregion |
| } |
| } |