| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| using System; |
| using System.Collections.Generic; |
| using System.Threading; |
| |
| namespace Apache.Geode.Client.UnitTests |
| { |
| using NUnit.Framework; |
| using Apache.Geode.DUnitFramework; |
| using Apache.Geode.Client.Tests; |
| using Apache.Geode.Client; |
| |
| [TestFixture] |
| [Category("group3")] |
| [Category("unicast_only")] |
| [Category("generics")] |
| |
| public class ThinClientCqPdxTest : ThinClientRegionSteps |
| { |
/// <summary>
/// CQ listener that tallies delivered events and errors. Counts are added to the
/// "before" counters until <see cref="failedOver"/> has been called and to the
/// "after" counters from then on.
/// </summary>
public class MyCqListener<TKey, TResult> : ICqListener<TKey, TResult>
{
  #region Private members

  // Selects which pair of counters OnEvent/OnError increment.
  private bool m_failedOver = false;
  private UInt32 m_eventCountBefore = 0;
  private UInt32 m_errorCountBefore = 0;
  private UInt32 m_eventCountAfter = 0;
  private UInt32 m_errorCountAfter = 0;

  #endregion

  #region Public accessors

  /// <summary>Switches counting from the "before" to the "after" counters.</summary>
  public void failedOver()
  {
    m_failedOver = true;
  }

  /// <summary>Number of events seen before <see cref="failedOver"/> was called.</summary>
  public UInt32 getEventCountBefore()
  {
    return m_eventCountBefore;
  }

  /// <summary>Number of errors seen before <see cref="failedOver"/> was called.</summary>
  public UInt32 getErrorCountBefore()
  {
    return m_errorCountBefore;
  }

  /// <summary>Number of events seen after <see cref="failedOver"/> was called.</summary>
  public UInt32 getEventCountAfter()
  {
    return m_eventCountAfter;
  }

  /// <summary>Number of errors seen after <see cref="failedOver"/> was called.</summary>
  public UInt32 getErrorCountAfter()
  {
    return m_errorCountAfter;
  }

  #endregion

  /// <summary>
  /// Counts the event, reads its value, key and operation, and asserts that the
  /// new value is either a <c>Portfolio</c> or a <c>PortfolioPdx</c>.
  /// </summary>
  public virtual void OnEvent(CqEvent<TKey, TResult> ev)
  {
    Util.Log("MyCqListener::OnEvent called");
    if (!m_failedOver)
    {
      m_eventCountBefore++;
    }
    else
    {
      m_eventCountAfter++;
    }

    // The key text and operation are fetched but not inspected further.
    TResult newValue = (TResult)ev.getNewValue();
    TKey eventKey = ev.getKey();
    CqOperation operation = ev.getQueryOperation();
    string keyText = eventKey.ToString();

    Portfolio plainPortfolio = newValue as Portfolio;
    PortfolioPdx pdxPortfolio = newValue as PortfolioPdx;
    Assert.IsTrue((pdxPortfolio != null) || (plainPortfolio != null));
  }

  /// <summary>Counts the error in the "before" or "after" error counter.</summary>
  public virtual void OnError(CqEvent<TKey, TResult> ev)
  {
    Util.Log("MyCqListener::OnError called");
    if (!m_failedOver)
    {
      m_errorCountBefore++;
    }
    else
    {
      m_errorCountAfter++;
    }
  }

  /// <summary>Logs that the listener was closed.</summary>
  public virtual void Close()
  {
    Util.Log("MyCqListener::close called");
  }

  /// <summary>
  /// Resets all four counters to zero. The failed-over flag is left unchanged.
  /// </summary>
  public virtual void Clear()
  {
    Util.Log("MyCqListener::Clear called");
    m_eventCountBefore = 0;
    m_errorCountBefore = 0;
    m_eventCountAfter = 0;
    m_errorCountAfter = 0;
  }
}
| |
/// <summary>
/// CQ listener that expects integer keys and values. It counts every event in
/// the static <see cref="m_cntEvents"/> field and logs the operation, key and value.
/// </summary>
public class MyCqListener1<TKey, TResult> : ICqListener<TKey, TResult>
{
  // Static field of a generic class: each closed generic type has its own counter.
  public static UInt32 m_cntEvents = 0;

  /// <summary>
  /// Counts the event, unboxes its value and key as <c>int</c>, and logs them
  /// together with a label for the operation ("CREATE", "UPDATE" or "Default").
  /// </summary>
  public virtual void OnEvent(CqEvent<TKey, TResult> ev)
  {
    m_cntEvents++;
    Util.Log("MyCqListener1::OnEvent called");

    object boxedValue = ev.getNewValue();
    object boxedKey = ev.getKey();
    int value = (int)boxedValue;
    int key = (int)boxedKey;

    CqOperation operation = ev.getQueryOperation();
    string operationLabel =
      (operation == CqOperation.OP_TYPE_CREATE) ? "CREATE"
      : (operation == CqOperation.OP_TYPE_UPDATE) ? "UPDATE"
      : "Default";

    Util.Log("MyCqListener1::OnEvent called with {0} , key = {1}, value = {2} ",
      operationLabel, key, value);
  }

  /// <summary>Logs that an error callback was received.</summary>
  public virtual void OnError(CqEvent<TKey, TResult> ev)
  {
    Util.Log("MyCqListener1::OnError called");
  }

  /// <summary>Logs that the listener was closed.</summary>
  public virtual void Close()
  {
    Util.Log("MyCqListener1::close called");
  }
}
| |
/// <summary>
/// CQ status listener that tallies events, errors, and connect/disconnect
/// notifications. Event and error counts go to the "before" counters until
/// <see cref="failedOver"/> has been called and to the "after" counters afterwards.
/// </summary>
public class MyCqStatusListener<TKey, TResult> : ICqStatusListener<TKey, TResult>
{
  #region Private members

  // Selects which pair of counters OnEvent/OnError increment.
  private bool m_failedOver = false;
  private UInt32 m_eventCountBefore = 0;
  private UInt32 m_errorCountBefore = 0;
  private UInt32 m_eventCountAfter = 0;
  private UInt32 m_errorCountAfter = 0;
  private UInt32 m_CqConnectedCount = 0;
  private UInt32 m_CqDisConnectedCount = 0;

  #endregion

  #region Public accessors

  /// <summary>Creates the listener.</summary>
  /// <param name="id">Accepted for the caller's convenience; not used by this class.</param>
  public MyCqStatusListener(int id)
  {
  }

  /// <summary>Switches counting from the "before" to the "after" counters.</summary>
  public void failedOver()
  {
    m_failedOver = true;
  }

  /// <summary>Number of events seen before <see cref="failedOver"/> was called.</summary>
  public UInt32 getEventCountBefore()
  {
    return m_eventCountBefore;
  }

  /// <summary>Number of errors seen before <see cref="failedOver"/> was called.</summary>
  public UInt32 getErrorCountBefore()
  {
    return m_errorCountBefore;
  }

  /// <summary>Number of events seen after <see cref="failedOver"/> was called.</summary>
  public UInt32 getEventCountAfter()
  {
    return m_eventCountAfter;
  }

  /// <summary>Number of errors seen after <see cref="failedOver"/> was called.</summary>
  public UInt32 getErrorCountAfter()
  {
    return m_errorCountAfter;
  }

  /// <summary>Number of OnCqConnected notifications received.</summary>
  public UInt32 getCqConnectedCount()
  {
    return m_CqConnectedCount;
  }

  /// <summary>Number of OnCqDisconnected notifications received.</summary>
  public UInt32 getCqDisConnectedCount()
  {
    return m_CqDisConnectedCount;
  }

  #endregion

  /// <summary>
  /// Counts the event and reads its value, key and operation; nothing is
  /// validated beyond those reads succeeding.
  /// </summary>
  public virtual void OnEvent(CqEvent<TKey, TResult> ev)
  {
    Util.Log("MyCqStatusListener::OnEvent called");
    if (!m_failedOver)
    {
      m_eventCountBefore++;
    }
    else
    {
      m_eventCountAfter++;
    }

    TResult newValue = (TResult)ev.getNewValue();
    TKey eventKey = ev.getKey();
    CqOperation operation = ev.getQueryOperation();
    string keyText = eventKey.ToString();
  }

  /// <summary>Counts the error in the "before" or "after" error counter.</summary>
  public virtual void OnError(CqEvent<TKey, TResult> ev)
  {
    Util.Log("MyCqStatusListener::OnError called");
    if (!m_failedOver)
    {
      m_errorCountBefore++;
    }
    else
    {
      m_errorCountAfter++;
    }
  }

  /// <summary>Logs that the listener was closed.</summary>
  public virtual void Close()
  {
    Util.Log("MyCqStatusListener::close called");
  }

  /// <summary>Counts a connect notification, then logs it.</summary>
  public virtual void OnCqConnected()
  {
    m_CqConnectedCount++;
    Util.Log("MyCqStatusListener::OnCqConnected called");
  }

  /// <summary>Counts a disconnect notification, then logs it.</summary>
  public virtual void OnCqDisconnected()
  {
    m_CqDisConnectedCount++;
    Util.Log("MyCqStatusListener::OnCqDisconnected called");
  }

  /// <summary>
  /// Resets all six counters to zero. The failed-over flag is left unchanged.
  /// </summary>
  public virtual void Clear()
  {
    Util.Log("MyCqStatusListener::Clear called");
    m_eventCountBefore = 0;
    m_errorCountBefore = 0;
    m_eventCountAfter = 0;
    m_errorCountAfter = 0;
    m_CqConnectedCount = 0;
    m_CqDisConnectedCount = 0;
  }
}
| |
| |
#region Private members

// Set by CqQueryPdxTest and read by runCqQueryPdxTest to choose between the
// PDX and non-PDX variants of the scenario.
private static bool m_usePdxObjects = false;

// Client processes created in GetClients().
private UnitProcess m_client1;
private UnitProcess m_client2;

// Regions created by StepOne; index 1 is also used as a sub-region name.
private static string[] QueryRegionNames =
{
  "Portfolios", "Positions", "Portfolios2", "Portfolios3"
};

// Region and CQ name used by the query-execution steps.
private static string QERegionName = "Portfolios";
private static string CqName = "MyCq";

// CQ names and queries that the active code visible in this file does not reference.
private static string CqName1 = "testCQAllServersLeave";
private static string CqName2 = "testCQAllServersLeave1";
private static string CqQuery1 = "select * from /DistRegionAck";
private static string CqQuery2 = "select * from /DistRegionAck1";

#endregion
| |
/// <summary>
/// Creates the two client processes used by this fixture, stores them in
/// <c>m_client1</c> and <c>m_client2</c>, and returns them in that order.
/// </summary>
protected override ClientBase[] GetClients()
{
  UnitProcess first = new UnitProcess();
  UnitProcess second = new UnitProcess();
  m_client1 = first;
  m_client2 = second;

  ClientBase[] clients = new ClientBase[2];
  clients[0] = first;
  clients[1] = second;
  return clients;
}
| |
/// <summary>
/// Fixture set-up: runs the base initialization, then invokes
/// <see cref="InitClient"/> on client 1 followed by client 2.
/// </summary>
[TestFixtureSetUp]
public override void InitTests()
{
  base.InitTests();

  UnitProcess[] clients = new UnitProcess[] { m_client1, m_client2 };
  foreach (UnitProcess client in clients)
  {
    client.Call(InitClient);
  }
}
| |
/// <summary>
/// Per-test tear-down: stops the Java cache servers first and then runs the
/// base class tear-down.
/// </summary>
[TearDown]
public override void EndTest()
{
  CacheHelper.StopJavaServers();
  base.EndTest();
}
| |
| |
/// <summary>
/// Initializes the cache helper and registers the <c>Portfolio</c> (id 8) and
/// <c>Position</c> (id 7) deserializers with the cache's type registry.
/// </summary>
public void InitClient()
{
  CacheHelper.Init();
  try
  {
    var typeRegistry = CacheHelper.DCache.TypeRegistry;
    typeRegistry.RegisterType(Portfolio.CreateDeserializable, 8);
    typeRegistry.RegisterType(Position.CreateDeserializable, 7);
  }
  catch (IllegalStateException)
  {
    // Deliberately ignored: the types may already be registered because
    // multiple iterations run for pool and non-pool configurations.
  }
}
/// <summary>
/// Creates every region in <c>QueryRegionNames</c> plus "DistRegionAck" on pool
/// "__TESTPOOL1_", then creates a sub-region named after the second query
/// region under the first one, reusing the parent's attributes.
/// </summary>
/// <param name="locators">Locator list passed to the region/pool creation helper.</param>
public void StepOne(string locators)
{
  // The four query regions, in declaration order.
  foreach (string queryRegionName in QueryRegionNames)
  {
    CacheHelper.CreateTCRegion_Pool<object, object>(queryRegionName, true, true,
      null, locators, "__TESTPOOL1_", true);
  }

  CacheHelper.CreateTCRegion_Pool<object, object>("DistRegionAck", true, true,
    null, locators, "__TESTPOOL1_", true);

  IRegion<object, object> parent = CacheHelper.GetRegion<object, object>(QueryRegionNames[0]);
  parent.CreateSubRegion(QueryRegionNames[1], parent.Attributes);
}
| |
/// <summary>
/// Populates the query regions (and the sub-region of the first one) with test
/// data, using either the non-PDX or the PDX populate helpers.
/// </summary>
/// <param name="usePdxObject">
/// When true, registers the PDX portfolio/position types and populates every
/// region with PDX portfolio data; when false, populates with non-PDX
/// portfolio and position data.
/// </param>
public void StepTwo(bool usePdxObject)
{
  IRegion<object, object> region0 = CacheHelper.GetRegion<object, object>(QueryRegionNames[0]);
  IRegion<object, object> subRegion0 = region0.GetSubRegion(QueryRegionNames[1]);
  IRegion<object, object> region1 = CacheHelper.GetRegion<object, object>(QueryRegionNames[1]);
  IRegion<object, object> region2 = CacheHelper.GetRegion<object, object>(QueryRegionNames[2]);
  IRegion<object, object> region3 = CacheHelper.GetRegion<object, object>(QueryRegionNames[3]);

  QueryHelper<object, object> qh = QueryHelper<object, object>.GetHelper(CacheHelper.DCache);

  // Log the flag that actually drives the branch below. The caller passes the
  // mode as an argument, so the static m_usePdxObjects field is not guaranteed
  // to hold the same value where this step runs.
  Util.Log("Object type is pdx = " + usePdxObject);

  Util.Log("SetSize {0}, NumSets {1}.", qh.PortfolioSetSize,
    qh.PortfolioNumSets);

  if (usePdxObject)
  {
    CacheHelper.DCache.TypeRegistry.RegisterPdxType(PortfolioPdx.CreateDeserializable);
    CacheHelper.DCache.TypeRegistry.RegisterPdxType(PositionPdx.CreateDeserializable);

    // NOTE(review): unlike the non-PDX branch, the two "Positions" regions are
    // filled with portfolio data here - confirm this is intended.
    qh.PopulatePortfolioPdxData(region0, qh.PortfolioSetSize,
      qh.PortfolioNumSets);
    qh.PopulatePortfolioPdxData(subRegion0, qh.PortfolioSetSize,
      qh.PortfolioNumSets);
    qh.PopulatePortfolioPdxData(region1, qh.PortfolioSetSize,
      qh.PortfolioNumSets);
    qh.PopulatePortfolioPdxData(region2, qh.PortfolioSetSize,
      qh.PortfolioNumSets);
    qh.PopulatePortfolioPdxData(region3, qh.PortfolioSetSize,
      qh.PortfolioNumSets);
  }
  else
  {
    qh.PopulatePortfolioData(region0, qh.PortfolioSetSize,
      qh.PortfolioNumSets);
    qh.PopulatePositionData(subRegion0, qh.PortfolioSetSize,
      qh.PortfolioNumSets);
    qh.PopulatePositionData(region1, qh.PortfolioSetSize,
      qh.PortfolioNumSets);
    qh.PopulatePortfolioData(region2, qh.PortfolioSetSize,
      qh.PortfolioNumSets);
    qh.PopulatePortfolioData(region3, qh.PortfolioSetSize,
      qh.PortfolioNumSets);
  }
}
| |
/// <summary>
/// Populates the first query region with portfolio data (arguments 100, 20, 100)
/// and its sub-region with position data (arguments 100, 20).
/// </summary>
public void StepTwoQT()
{
  IRegion<object, object> portfolios = CacheHelper.GetRegion<object, object>(QueryRegionNames[0]);
  IRegion<object, object> positions = portfolios.GetSubRegion(QueryRegionNames[1]);

  QueryHelper<object, object> helper = QueryHelper<object, object>.GetHelper(CacheHelper.DCache);
  helper.PopulatePortfolioData(portfolios, 100, 20, 100);
  helper.PopulatePositionData(positions, 100, 20);
}
| |
/// <summary>
/// Runs the CQ execution scenario against the query-execution region using
/// non-PDX <c>Portfolio</c> values.
/// </summary>
/// <param name="locators">Locator list passed to the region/pool creation helper.</param>
public void StepOneQE(string locators)
{
  IRegion<object, object> region = CreateQueryExecutionRegion(locators);
  RunCqAndVerifyStatistics(region,
    new Portfolio(1, 100), new Portfolio(2, 100),
    new Portfolio(3, 100), new Portfolio(4, 100));
}

/// <summary>
/// Runs the CQ execution scenario against the query-execution region using
/// <c>PortfolioPdx</c> values.
/// </summary>
/// <param name="locators">Locator list passed to the region/pool creation helper.</param>
public void StepOnePdxQE(string locators)
{
  IRegion<object, object> region = CreateQueryExecutionRegion(locators);
  RunCqAndVerifyStatistics(region,
    new PortfolioPdx(1, 100), new PortfolioPdx(2, 100),
    new PortfolioPdx(3, 100), new PortfolioPdx(4, 100));
}

// Creates the query-execution region on pool "__TESTPOOL1_" and returns the verified region.
private static IRegion<object, object> CreateQueryExecutionRegion(string locators)
{
  CacheHelper.CreateTCRegion_Pool<object, object>(QERegionName, true, true,
    null, locators, "__TESTPOOL1_", true);
  return CacheHelper.GetVerifyRegion<object, object>(QERegionName);
}

// Shared body of StepOneQE/StepOnePdxQE: seeds the region, executes a CQ with a
// MyCqListener attached, re-puts the values under swapped keys, then checks the
// CQ service statistics, the listener's counters, and the stop/close statistics
// before destroying the region locally.
private static void RunCqAndVerifyStatistics(IRegion<object, object> region,
  object p1, object p2, object p3, object p4)
{
  // Fixed wait (18 s) used to let the server-side CQ processing finish.
  const int settleMillis = 18000;

  region["1"] = p1;
  region["2"] = p2;
  region["3"] = p3;
  region["4"] = p4;

  var qs = CacheHelper.DCache.GetPoolManager().Find("__TESTPOOL1_").GetQueryService();
  CqAttributesFactory<object, object> cqFac = new CqAttributesFactory<object, object>();
  ICqListener<object, object> cqLstner = new MyCqListener<object, object>();
  cqFac.AddCqListener(cqLstner);
  CqAttributes<object, object> cqAttr = cqFac.Create();
  CqQuery<object, object> qry = qs.NewCq(CqName, "select * from /" + QERegionName + " p where p.ID!=2", cqAttr, false);
  qry.Execute();
  Thread.Sleep(settleMillis);

  // Re-put the same values under reversed keys so the CQ has updates to report.
  region["4"] = p1;
  region["3"] = p2;
  region["2"] = p3;
  region["1"] = p4;
  Thread.Sleep(settleMillis);

  // Look the CQ up again by name rather than reusing the original handle.
  qry = qs.GetCq<object, object>(CqName);

  CqServiceStatistics cqSvcStats = qs.GetCqStatistics();
  Assert.AreEqual(1, cqSvcStats.numCqsActive());
  Assert.AreEqual(1, cqSvcStats.numCqsCreated());
  Assert.AreEqual(1, cqSvcStats.numCqsOnClient());

  cqAttr = qry.GetCqAttributes();
  ICqListener<object, object>[] vl = cqAttr.getCqListeners();
  Assert.IsNotNull(vl);
  Assert.AreEqual(1, vl.Length);
  cqLstner = vl[0];
  Assert.IsNotNull(cqLstner);
  MyCqListener<object, object> myListener = (MyCqListener<object, object>)cqLstner;
  Util.Log("event count:{0}, error count {1}.", myListener.getEventCountBefore(), myListener.getErrorCountBefore());

  // The listener must have seen exactly the events the CQ reports, and at
  // least one event or error in total.
  CqStatistics cqStats = qry.GetStatistics();
  Assert.AreEqual(cqStats.numEvents(), myListener.getEventCountBefore());
  if (myListener.getEventCountBefore() + myListener.getErrorCountBefore() == 0)
  {
    Assert.Fail("cq before count zero");
  }

  qry.Stop();
  Assert.AreEqual(1, cqSvcStats.numCqsStopped());
  qry.Close();
  Assert.AreEqual(1, cqSvcStats.numCqsClosed());

  // Bring down the region
  region.GetLocalView().DestroyRegion();
}
/// <summary>Stops Java cache server 1 and logs that it was stopped.</summary>
public void KillServer()
{
  CacheHelper.StopJavaServer(1);
  Util.Log("Cacheserver 1 stopped.");
}

/// <summary>Parameterless delegate whose signature matches <see cref="KillServer"/>.</summary>
public delegate void KillServerDelegate();
| |
| /* |
| public void StepOneFailover() |
| { |
| // This is here so that Client1 registers information of the cacheserver |
| // that has been already started |
| CacheHelper.SetupJavaServers("remotequery.xml", |
| "cqqueryfailover.xml"); |
| CacheHelper.StartJavaServer(1, "GFECS1"); |
| Util.Log("Cacheserver 1 started."); |
| |
| CacheHelper.CreateTCRegion(QueryRegionNames[0], true, true, null, true); |
| |
| Region region = CacheHelper.GetVerifyRegion(QueryRegionNames[0]); |
| Portfolio p1 = new Portfolio(1, 100); |
| Portfolio p2 = new Portfolio(2, 200); |
| Portfolio p3 = new Portfolio(3, 300); |
| Portfolio p4 = new Portfolio(4, 400); |
| |
| region.Put("1", p1); |
| region.Put("2", p2); |
| region.Put("3", p3); |
| region.Put("4", p4); |
| } |
| */ |
| /* |
| public void StepTwoFailover() |
| { |
| CacheHelper.StartJavaServer(2, "GFECS2"); |
| Util.Log("Cacheserver 2 started."); |
| |
| IAsyncResult killRes = null; |
| KillServerDelegate ksd = new KillServerDelegate(KillServer); |
| CacheHelper.CreateTCRegion(QueryRegionNames[0], true, true, null, true); |
| |
| IRegion<object, object> region = CacheHelper.GetVerifyRegion<object, object>(QueryRegionNames[0]); |
| |
| var qs = CacheHelper.DCache.GetQueryService(); |
| CqAttributesFactory cqFac = new CqAttributesFactory(); |
| ICqListener cqLstner = new MyCqListener(); |
| cqFac.AddCqListener(cqLstner); |
| CqAttributes cqAttr = cqFac.Create(); |
| CqQuery qry = qs.NewCq(CqName1, "select * from /" + QERegionName + " p where p.ID!<4", cqAttr, true); |
| qry.Execute(); |
| Thread.Sleep(18000); // sleep 0.3min to allow server c query to complete |
| qry = qs.GetCq(CqName1); |
| cqAttr = qry.GetCqAttributes(); |
| ICqListener[] vl = cqAttr.getCqListeners(); |
| Assert.IsNotNull(vl); |
| Assert.AreEqual(1, vl.Length); |
| cqLstner = vl[0]; |
| Assert.IsNotNull(cqLstner); |
| MyCqListener myLisner = cqLstner as MyCqListener; |
| if (myLisner.getEventCountAfter() + myLisner.getErrorCountAfter() != 0) |
| { |
| Assert.Fail("cq after count not zero"); |
| } |
| |
| killRes = ksd.BeginInvoke(null, null); |
| Thread.Sleep(18000); // sleep 0.3min to allow failover complete |
| myLisner.failedOver(); |
| |
| Portfolio p1 = new Portfolio(1, 100); |
| Portfolio p2 = new Portfolio(2, 200); |
| Portfolio p3 = new Portfolio(3, 300); |
| Portfolio p4 = new Portfolio(4, 400); |
| |
| region.Put("4", p1); |
| region.Put("3", p2); |
| region.Put("2", p3); |
| region.Put("1", p4); |
| Thread.Sleep(18000); // sleep 0.3min to allow server c query to complete |
| |
| qry = qs.GetCq(CqName1); |
| cqAttr = qry.GetCqAttributes(); |
| vl = cqAttr.getCqListeners(); |
| cqLstner = vl[0]; |
| Assert.IsNotNull(vl); |
| Assert.AreEqual(1, vl.Length); |
| cqLstner = vl[0]; |
| Assert.IsNotNull(cqLstner); |
| myLisner = cqLstner as MyCqListener; |
| if (myLisner.getEventCountAfter() + myLisner.getErrorCountAfter() == 0) |
| { |
| Assert.Fail("no cq after failover"); |
| } |
| |
| killRes.AsyncWaitHandle.WaitOne(); |
| ksd.EndInvoke(killRes); |
| qry.Stop(); |
| qry.Close(); |
| } |
| */ |
| |
/// <summary>
/// Exercises CQ listener management on a running CQ: installs a plain and a
/// status listener through the attributes factory, verifies their event counts,
/// removes both through the attributes mutator, re-installs them with
/// <c>SetCqListeners</c>, verifies counts again, removes them again, and finally
/// confirms that the CQ reports no listeners.
/// </summary>
/// <param name="locators">Locator list passed to the region/pool creation helper.</param>
public void ProcessCQ(string locators)
{
  // Fixed wait (18 s) used to let the server-side CQ processing finish.
  const int settleMillis = 18000;

  CacheHelper.CreateTCRegion_Pool<object, object>(QERegionName, true, true,
    null, locators, "__TESTPOOL1_", true);

  IRegion<object, object> region = CacheHelper.GetVerifyRegion<object, object>(QERegionName);
  Portfolio p1 = new Portfolio(1, 100);
  Portfolio p2 = new Portfolio(2, 100);
  Portfolio p3 = new Portfolio(3, 100);
  Portfolio p4 = new Portfolio(4, 100);

  region["1"] = p1;
  region["2"] = p2;
  region["3"] = p3;
  region["4"] = p4;

  var qs = CacheHelper.DCache.GetPoolManager().Find("__TESTPOOL1_").GetQueryService();

  CqAttributesFactory<object, object> cqFac = new CqAttributesFactory<object, object>();
  ICqListener<object, object> cqLstner = new MyCqListener<object, object>();
  ICqStatusListener<object, object> cqStatusLstner = new MyCqStatusListener<object, object>(1);

  // Both AddCqListener and InitCqListeners are called on the factory.
  ICqListener<object, object>[] v = new ICqListener<object, object>[2];
  cqFac.AddCqListener(cqLstner);
  v[0] = cqLstner;
  v[1] = cqStatusLstner;
  cqFac.InitCqListeners(v);
  Util.Log("InitCqListeners called");
  CqAttributes<object, object> cqAttr = cqFac.Create();
  CqQuery<object, object> qry1 = qs.NewCq("CQ1", "select * from /" + QERegionName + " p where p.ID >= 1", cqAttr, false);
  qry1.Execute();

  Thread.Sleep(settleMillis);
  PutPortfoliosUnderReversedKeys(region, p1, p2, p3, p4);
  Thread.Sleep(settleMillis);

  // --- Phase 1: listeners installed through the factory ---
  qry1 = qs.GetCq<object, object>("CQ1");
  cqAttr = qry1.GetCqAttributes();
  ICqListener<object, object>[] vl = cqAttr.getCqListeners();
  Assert.IsNotNull(vl);
  Assert.AreEqual(2, vl.Length);
  cqLstner = vl[0];
  Assert.IsNotNull(cqLstner);
  MyCqListener<object, object> myListener = (MyCqListener<object, object>)cqLstner;
  Util.Log("event count:{0}, error count {1}.", myListener.getEventCountBefore(), myListener.getErrorCountBefore());
  Assert.AreEqual(4, myListener.getEventCountBefore());

  cqStatusLstner = (ICqStatusListener<object, object>)vl[1];
  Assert.IsNotNull(cqStatusLstner);
  MyCqStatusListener<object, object> myStatusListener = (MyCqStatusListener<object, object>)cqStatusLstner;
  Util.Log("event count:{0}, error count {1}.", myStatusListener.getEventCountBefore(), myStatusListener.getErrorCountBefore());
  Assert.AreEqual(1, myStatusListener.getCqConnectedCount());
  Assert.AreEqual(4, myStatusListener.getEventCountBefore());

  // Remove the listeners one at a time and check the remaining count.
  CqAttributesMutator<object, object> mutator = qry1.GetCqAttributesMutator();
  mutator.RemoveCqListener(cqLstner);
  cqAttr = qry1.GetCqAttributes();
  Util.Log("cqAttr.getCqListeners().Length = {0}", cqAttr.getCqListeners().Length);
  Assert.AreEqual(1, cqAttr.getCqListeners().Length);

  mutator.RemoveCqListener(cqStatusLstner);
  cqAttr = qry1.GetCqAttributes();
  Util.Log("1 cqAttr.getCqListeners().Length = {0}", cqAttr.getCqListeners().Length);
  Assert.AreEqual(0, cqAttr.getCqListeners().Length);

  // --- Phase 2: reset the counters and re-install both listeners via the mutator ---
  ICqListener<object, object>[] v2 = new ICqListener<object, object>[2];
  v2[0] = cqLstner;
  v2[1] = cqStatusLstner;
  MyCqListener<object, object> myListener2 = (MyCqListener<object, object>)cqLstner;
  myListener2.Clear();
  MyCqStatusListener<object, object> myStatusListener2 = (MyCqStatusListener<object, object>)cqStatusLstner;
  myStatusListener2.Clear();
  mutator.SetCqListeners(v2);
  cqAttr = qry1.GetCqAttributes();
  Assert.AreEqual(2, cqAttr.getCqListeners().Length);

  PutPortfoliosUnderReversedKeys(region, p1, p2, p3, p4);
  Thread.Sleep(settleMillis);

  qry1 = qs.GetCq<object, object>("CQ1");
  cqAttr = qry1.GetCqAttributes();
  ICqListener<object, object>[] v3 = cqAttr.getCqListeners();
  Assert.IsNotNull(v3);
  // Check the freshly fetched array (the original checked the stale 'vl').
  Assert.AreEqual(2, v3.Length);
  cqLstner = v3[0];
  Assert.IsNotNull(cqLstner);
  myListener2 = (MyCqListener<object, object>)cqLstner;
  Util.Log("event count:{0}, error count {1}.", myListener2.getEventCountBefore(), myListener2.getErrorCountBefore());
  Assert.AreEqual(4, myListener2.getEventCountBefore());

  cqStatusLstner = (ICqStatusListener<object, object>)v3[1];
  Assert.IsNotNull(cqStatusLstner);
  myStatusListener2 = (MyCqStatusListener<object, object>)cqStatusLstner;
  Util.Log("event count:{0}, error count {1}.", myStatusListener2.getEventCountBefore(), myStatusListener2.getErrorCountBefore());
  // The connected counter was cleared above and is expected to stay at zero.
  Assert.AreEqual(0, myStatusListener2.getCqConnectedCount());
  Assert.AreEqual(4, myStatusListener2.getEventCountBefore());

  mutator = qry1.GetCqAttributesMutator();
  mutator.RemoveCqListener(cqLstner);
  cqAttr = qry1.GetCqAttributes();
  Util.Log("cqAttr.getCqListeners().Length = {0}", cqAttr.getCqListeners().Length);
  Assert.AreEqual(1, cqAttr.getCqListeners().Length);

  mutator.RemoveCqListener(cqStatusLstner);
  cqAttr = qry1.GetCqAttributes();
  Util.Log("1 cqAttr.getCqListeners().Length = {0}", cqAttr.getCqListeners().Length);
  Assert.AreEqual(0, cqAttr.getCqListeners().Length);

  // --- Phase 3: with no listeners attached, further puts leave the list empty ---
  PutPortfoliosUnderReversedKeys(region, p1, p2, p3, p4);
  Thread.Sleep(settleMillis);

  qry1 = qs.GetCq<object, object>("CQ1");
  cqAttr = qry1.GetCqAttributes();
  ICqListener<object, object>[] v4 = cqAttr.getCqListeners();
  Assert.IsNotNull(v4);
  Assert.AreEqual(0, v4.Length);
  Util.Log("cqAttr.getCqListeners() done");
}

// Puts the four values under keys "4", "3", "2", "1" (in that order).
private static void PutPortfoliosUnderReversedKeys(IRegion<object, object> region,
  object p1, object p2, object p3, object p4)
{
  region["4"] = p1;
  region["3"] = p2;
  region["2"] = p3;
  region["1"] = p4;
}
| |
/// <summary>
/// Creates a CQ with a single <c>MyCqStatusListener</c> attached, executes it,
/// and then waits 18 seconds.
/// </summary>
/// <param name="poolName">Name of the pool whose query service is used.</param>
/// <param name="cqName">Name to register the CQ under.</param>
/// <param name="cqQuery">Query string for the CQ.</param>
/// <param name="id">Passed to the status listener's constructor.</param>
public void CreateAndExecuteCQ_StatusListener(string poolName, string cqName, string cqQuery, int id)
{
  var pool = CacheHelper.DCache.GetPoolManager().Find(poolName);
  var queryService = pool.GetQueryService();

  CqAttributesFactory<object, object> attributesFactory = new CqAttributesFactory<object, object>();
  attributesFactory.AddCqListener(new MyCqStatusListener<object, object>(id));
  CqAttributes<object, object> attributes = attributesFactory.Create();

  CqQuery<object, object> continuousQuery = queryService.NewCq(cqName, cqQuery, attributes, false);
  continuousQuery.Execute();

  // Fixed wait to let the server finish processing the CQ.
  Thread.Sleep(18000);
}
| |
/// <summary>
/// Creates a CQ with a single <c>MyCqListener</c> attached, executes it, and
/// then waits 18 seconds.
/// </summary>
/// <param name="poolName">Name of the pool whose query service is used.</param>
/// <param name="cqName">Name to register the CQ under.</param>
/// <param name="cqQuery">Query string for the CQ.</param>
/// <param name="id">Not used by this method.</param>
public void CreateAndExecuteCQ_Listener(string poolName, string cqName, string cqQuery, int id)
{
  var pool = CacheHelper.DCache.GetPoolManager().Find(poolName);
  var queryService = pool.GetQueryService();

  CqAttributesFactory<object, object> attributesFactory = new CqAttributesFactory<object, object>();
  attributesFactory.AddCqListener(new MyCqListener<object, object>());
  CqAttributes<object, object> attributes = attributesFactory.Create();

  CqQuery<object, object> continuousQuery = queryService.NewCq(cqName, cqQuery, attributes, false);
  continuousQuery.Execute();

  // Fixed wait to let the server finish processing the CQ.
  Thread.Sleep(18000);
}
| |
/// <summary>
/// Looks up the named CQ, takes its first listener as a
/// <c>MyCqStatusListener</c>, and asserts its connected-notification count.
/// </summary>
/// <param name="poolName">Name of the pool whose query service is used.</param>
/// <param name="cqName">Name of the CQ to inspect.</param>
/// <param name="onCqStatusConnect">Expected number of connect notifications.</param>
public void CheckCQStatusOnConnect(string poolName, string cqName, int onCqStatusConnect)
{
  var queryService = CacheHelper.DCache.GetPoolManager().Find(poolName).GetQueryService();
  ICqListener<object, object>[] listeners =
    queryService.GetCq<object, object>(cqName).GetCqAttributes().getCqListeners();

  MyCqStatusListener<object, object> statusListener = (MyCqStatusListener<object, object>)listeners[0];
  Util.Log("CheckCQStatusOnConnect = {0} ", statusListener.getCqConnectedCount());
  Assert.AreEqual(onCqStatusConnect, statusListener.getCqConnectedCount());
}
| |
/// <summary>
/// Looks up the named CQ, takes its first listener as a
/// <c>MyCqStatusListener</c>, and asserts its disconnected-notification count.
/// </summary>
/// <param name="poolName">Name of the pool whose query service is used.</param>
/// <param name="cqName">Name of the CQ to inspect.</param>
/// <param name="onCqStatusDisConnect">Expected number of disconnect notifications.</param>
public void CheckCQStatusOnDisConnect(string poolName, string cqName, int onCqStatusDisConnect)
{
  var queryService = CacheHelper.DCache.GetPoolManager().Find(poolName).GetQueryService();
  ICqListener<object, object>[] listeners =
    queryService.GetCq<object, object>(cqName).GetCqAttributes().getCqListeners();

  MyCqStatusListener<object, object> statusListener = (MyCqStatusListener<object, object>)listeners[0];
  Util.Log("CheckCQStatusOnDisConnect = {0} ", statusListener.getCqDisConnectedCount());
  Assert.AreEqual(onCqStatusDisConnect, statusListener.getCqDisConnectedCount());
}
| |
/// <summary>
/// Puts ten string entries ("key-1" -> "val-1" through "key-10" -> "val-10")
/// into the named region and then waits 18 seconds.
/// </summary>
/// <param name="regionName">Name of the region to write to.</param>
public void PutEntries(string regionName)
{
  IRegion<object, object> target = CacheHelper.GetVerifyRegion<object, object>(regionName);

  int suffix = 1;
  while (suffix <= 10)
  {
    target["key-" + suffix] = "val-" + suffix;
    suffix++;
  }

  // Fixed wait to let the server finish processing the resulting CQ events.
  Thread.Sleep(18000);
}
| |
/// <summary>
/// Looks up the named CQ, takes its first listener as a
/// <c>MyCqStatusListener</c>, and asserts its "before" event count.
/// </summary>
/// <param name="poolName">Name of the pool whose query service is used.</param>
/// <param name="cqName">Name of the CQ to inspect.</param>
/// <param name="onCreateCount">Expected number of events seen by the listener.</param>
public void CheckCQStatusOnPutEvent(string poolName, string cqName, int onCreateCount)
{
  var queryService = CacheHelper.DCache.GetPoolManager().Find(poolName).GetQueryService();
  ICqListener<object, object>[] listeners =
    queryService.GetCq<object, object>(cqName).GetCqAttributes().getCqListeners();

  MyCqStatusListener<object, object> statusListener = (MyCqStatusListener<object, object>)listeners[0];
  Util.Log("CheckCQStatusOnPutEvent = {0} ", statusListener.getEventCountBefore());
  Assert.AreEqual(onCreateCount, statusListener.getEventCountBefore());
}
| |
/// <summary>
/// Creates a region on the given pool by delegating to
/// <c>CacheHelper.CreateTCRegion_Pool</c> with a server group.
/// </summary>
/// <param name="locators">Locator list passed to the helper.</param>
/// <param name="servergroup">Server group passed to the helper.</param>
/// <param name="regionName">Name of the region to create.</param>
/// <param name="poolName">Name of the pool to create the region on.</param>
public void CreateRegion(string locators, string servergroup, string regionName, string poolName)
{
  CacheHelper.CreateTCRegion_Pool<object, object>(
    regionName, true, true, null, locators, poolName, true, servergroup);
}
| |
/// <summary>
/// Drives the CQ scenario end to end: starts a locator and one cache server,
/// runs StepOne and StepTwo on client 1, runs the PDX or non-PDX
/// query-execution step depending on <c>m_usePdxObjects</c>, closes the client,
/// and stops the server and locator.
/// </summary>
void runCqQueryPdxTest()
{
  CacheHelper.SetupJavaServers(true, "remotequeryN.xml");
  CacheHelper.StartJavaLocator(1, "GFELOC");
  Util.Log("Locator started");
  CacheHelper.StartJavaServerWithLocators(1, "GFECS1", 1);
  Util.Log("Cacheserver 1 started.");

  m_client1.Call(StepOne, CacheHelper.Locators);
  Util.Log("StepOne complete.");

  // The flag is passed explicitly so the client step sees the chosen mode.
  m_client1.Call(StepTwo, m_usePdxObjects);
  Util.Log("StepTwo complete.");

  // Log the step that actually ran (the original repeated "StepOne complete."
  // here, which misidentified the step).
  if (!m_usePdxObjects)
  {
    m_client1.Call(StepOneQE, CacheHelper.Locators);
    Util.Log("StepOneQE complete.");
  }
  else
  {
    m_client1.Call(StepOnePdxQE, CacheHelper.Locators);
    Util.Log("StepOnePdxQE complete.");
  }

  m_client1.Call(Close);

  CacheHelper.StopJavaServer(1);
  Util.Log("Cacheserver 1 stopped.");

  CacheHelper.StopJavaLocator(1);
  Util.Log("Locator stopped");
}
| |
/// <summary>
/// Runs the CQ scenario with PDX objects enabled.
/// </summary>
[Test]
public void CqQueryPdxTest()
{
  m_usePdxObjects = true;
  try
  {
    runCqQueryPdxTest();
  }
  finally
  {
    // m_usePdxObjects is static; reset it even when the scenario throws so a
    // failure here cannot leave PDX mode switched on for later tests.
    m_usePdxObjects = false;
  }
}
| } |
| } |