| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| using System; |
| using System.Collections; |
| using System.Collections.Generic; |
| using System.Text; |
| using System.Threading; |
| |
| namespace Apache.Geode.Client.FwkLib |
| { |
| using Apache.Geode.DUnitFramework; |
| using Apache.Geode.Client.Tests; |
| using Apache.Geode.Client; |
| |
| public class EntryTxTask<TKey, TVal> : ClientTask |
| { |
#region Private members

// Action codes selected by doTransactions() for one scheduling step.
static protected int BEGIN_TX = 1001;
static protected int EXECUTE_TX_OPS = 1002;
static protected int EXECUTE_NONTX_OPS = 1003;
static protected int COMMIT_TX = 1004;
static protected int ROLLBACK_TX = 1005;

// Region the entry operations of this task run against.
private IRegion<TKey, TVal> m_region;
// Counter behind GetNewKey(); static, so shared by all tasks of the same closed generic type.
private static int keyCount = 0;
// Counter behind the key that begin() writes into each new transaction.
private static int kCount = 0;
// Upper bound handed to Util.Rand() in DoTask().
private int m_MaxKeys;
// Suspended transactions by id; the values are TxInfo instances. Assigned by the constructor.
private static Dictionary<TransactionId, object> activeTxns;
// Last key/value written per region, checked by doEdgeClientValidation(). Assigned by the constructor.
private static Dictionary<IRegion<TKey, TVal>, Dictionary<TKey, TVal>> OpMap;
// Held while the action chosen in doTransactions() runs and while DoTask() begins a transaction.
// readonly: a lock object must never be swapped while threads may be waiting on it.
private readonly object txlock = new object();
// Snapshot of the transaction ids currently being walked.
private List<TransactionId> keyList ;
// Per-task operation counters, updated with Interlocked in doEntryOps().
private Int32 m_create;
private Int32 m_update;
private Int32 m_destroy;
// Number of DoTask() invocations started on this task.
private Int32 m_cnt;
// Mode switches supplied through the constructor and read by DoTask().
bool m_finish;
bool m_beginTx;
bool m_commitTx;
// Held while choosing an action, during serial execution and in finishAllActiveTx().
private readonly object CLASS_LOCK = new object();
// Configuration keys.
private const string SerialExc = "isSerialExecution";
private const string ConcurrentExc = "isConcurrentExecution";
private const string WorkTime = "workTime";

#endregion
| |
/// <summary>
/// Builds an identifier out of Util.ClientId and the managed id of the calling thread.
/// </summary>
/// <returns>A string of the form "Cl_Id" + client id + "_thr_" + thread id.</returns>
private static String getClientIdString()
{
  int managedThreadId = System.Threading.Thread.CurrentThread.ManagedThreadId;
  return String.Concat("Cl_Id", Util.ClientId, "_thr_", managedThreadId.ToString());
}
| |
/// <summary>
/// Begins a transaction, writes one freshly numbered entry into <paramref name="region"/>,
/// suspends the transaction and returns a TxInfo carrying the suspended id.
/// </summary>
/// <param name="region">Region that receives the initial entry.</param>
/// <returns>A new TxInfo whose transaction id is the value returned by Suspend().</returns>
public TxInfo begin(IRegion<TKey,TVal> region)
{
  CacheTransactionManager txManager = CacheHelper<TKey, TVal>.DCache.CacheTransactionManager;
  TransactionId id = null;
  TxInfo txInfo = new TxInfo();
  try
  {
    txManager.Begin();
    try
    {
      // kCount is static and begin() may run on several threads, so the increment
      // has to be atomic; a plain kCount++ can hand the same key to two transactions.
      int nextKey = Interlocked.Increment(ref kCount);
      TKey key = (TKey)(object)nextKey;
      TVal value = GetValue();
      region[key] = value;
      Util.Log("The Key is={0} and value={1}", key.ToString(),value.ToString());
    }
    catch (Exception e)
    {
      FwkTest<TKey, TVal>.CurrentTest.FwkException("Caught {0}", e);
    }
  }
  catch (IllegalStateException e)
  {
    FwkTest<TKey, TVal>.CurrentTest.FwkException("Caught {0}", e);
  }
  try
  {
    id = txManager.Suspend();
    Util.Log("Suspend() complete ");
  }
  catch (IllegalStateException e)
  {
    FwkTest<TKey, TVal>.CurrentTest.FwkException("Got {0}", e);
  }
  Util.Log("BEGIN_TX returning txId = {0}",id);
  try
  {
    txInfo.setTxId(id);
  }
  catch (Exception e)
  {
    FwkTest<TKey, TVal>.CurrentTest.FwkException("Got this {0}", e);
  }

  return txInfo;
}
| |
/// <summary>
/// Resumes the suspended transaction described by <paramref name="txInfo"/> (waiting up to
/// 30 seconds) and commits it.
/// </summary>
/// <param name="txInfo">Holder of the transaction id to commit.</param>
/// <returns>
/// true when Commit() completed; false when the transaction could not be resumed or the
/// commit raised a CommitConflictException.
/// </returns>
public bool commit(TxInfo txInfo)
{
  TransactionId id = (TransactionId)(object)txInfo.getTxId();
  bool commited = false;
  CacheTransactionManager txManager = CacheHelper<TKey, TVal>.DCache.CacheTransactionManager;
  if (txManager.TryResume(id, TimeSpan.FromMilliseconds(30000)))
  {
    try
    {
      txManager.Commit();
      commited = true;
    }
    catch (CommitConflictException ex)
    {
      //Expected exception with concurrent transactions.
      Util.Log("Got expected exception {0}", ex);
    }
    catch (TransactionDataRebalancedException ex)
    {
      FwkTest<TKey, TVal>.CurrentTest.FwkException("Got {0}", ex);
    }
    catch (TransactionDataNodeHasDepartedException ex)
    {
      FwkTest<TKey, TVal>.CurrentTest.FwkException("Got {0}", ex);
    }
  }
  else
  {
    // The message has a {0} placeholder; the id was previously never supplied.
    Util.Log("TxId {0} is not suspended in this member with tryResume time limit, cannot commit. Expected, continuing test.", id);
  }
  Util.Log("Commited returning {0}",commited);
  return commited;
}
| |
/// <summary>
/// Resumes the suspended transaction described by <paramref name="txInfo"/> (waiting up to
/// 30 seconds) and rolls it back. When the transaction cannot be resumed only a log line
/// is written.
/// </summary>
/// <param name="txInfo">Holder of the transaction id to roll back.</param>
public void rollback(TxInfo txInfo)
{
  TransactionId id = (TransactionId)(object)txInfo.getTxId();
  bool isRollBack = false;
  CacheTransactionManager txManager = CacheHelper<TKey, TVal>.DCache.CacheTransactionManager;
  if (txManager.TryResume(id, TimeSpan.FromMilliseconds(30000)))
  {
    txManager.Rollback();
    isRollBack = true;
  }
  else
  {
    // The message has a {0} placeholder; the id was previously never supplied.
    Util.Log("TxId {0} is not suspended in this member with tryResume time limit, cannot rollback. Expected with concurrent execution, continuing test.", id);
  }
  Util.Log("RollbackTx returning {0}",isRollBack);
}
| |
/// <summary>
/// Resume + entry operations + suspend for the transaction held by <paramref name="txInfo"/>.
/// The transaction is suspended again in a finally block, whether or not the operations
/// succeeded. Nothing but a log line happens when the transaction cannot be resumed
/// within 30 seconds.
/// </summary>
/// <param name="txInfo">Holder of the suspended transaction id.</param>
public void executeTxOps(TxInfo txInfo)
{
  const int opsPerExecution = 5;
  TransactionId suspendedId = (TransactionId)(object)txInfo.getTxId();
  CacheTransactionManager manager = CacheHelper<TKey, TVal>.DCache.CacheTransactionManager;

  if (!manager.TryResume(suspendedId, TimeSpan.FromMilliseconds(30000)))
  {
    Util.Log("EXECUTE_TX returned {0}", false);
    return;
  }

  bool opsRan = false;
  try
  {
    doEntryOps(opsPerExecution);
    opsRan = true;
  }
  catch (TransactionDataNodeHasDepartedException departed)
  {
    FwkTest<TKey, TVal>.CurrentTest.FwkException("Caught {0}", departed);
  }
  catch (TransactionDataRebalancedException rebalanced)
  {
    FwkTest<TKey, TVal>.CurrentTest.FwkException("Caught {0}", rebalanced);
  }
  catch (Exception unexpected)
  {
    FwkTest<TKey, TVal>.CurrentTest.FwkException("Caught unexpected exception during doEntryOps task {0}", unexpected.Message);
  }
  finally
  {
    // Always hand the transaction back so another thread can resume it.
    suspendedId = manager.Suspend();
    Util.Log("Suspend() complete txId is {0}", suspendedId);
  }
  Util.Log("EXECUTE_TX returned {0}", opsRan);
}
| |
/// <summary>
/// Performs one entry operation on the task's region. The operation name comes from the
/// "entryOps" configuration value; an empty region always gets a "create" regardless of
/// the configured name.
/// </summary>
/// <param name="numOfOpsToDo">
/// Not read by this method: exactly one operation is performed per call.
/// </param>
public void doEntryOps(int numOfOpsToDo)
{
  IRegion<TKey, TVal> target = m_region;
  string opcode = null;
  int completed = 0;
  int gets = 0;
  int invalidates = 0;
  try
  {
    int regionSize = target.Count;
    opcode = FwkTest<TKey, TVal>.CurrentTest.GetStringValue("entryOps");
    Util.Log("Op code is {0} and numOf opsCompleted is {1} and Regionsize is {2}", opcode, completed, regionSize);
    if (opcode == null)
    {
      opcode = "no-opcode";
    }
    if (regionSize < 1)
    {
      // Nothing to update/destroy/get yet, so seed the region first.
      opcode = "create";
    }

    switch (opcode)
    {
      case "create":
        addEntry(target);
        Interlocked.Increment(ref m_create);
        break;
      case "update":
        updateEntry(target);
        Interlocked.Increment(ref m_update);
        break;
      case "destroy":
        destroyEntry(target, false);
        Interlocked.Increment(ref m_destroy);
        break;
      case "localDestroy":
        destroyEntry(target, true);
        break;
      case "get":
        getKey(target);
        gets++;
        break;
      case "invalidate":
        invalidateEntry(target, false);
        invalidates++;
        break;
      default:
        FwkTest<TKey, TVal>.CurrentTest.FwkException("CacheServer.doEntryOps() Invalid operation specified: {0}", opcode);
        break;
    }
    completed++;
  }
  catch (Exception ex)
  {
    FwkTest<TKey, TVal>.CurrentTest.FwkException("CacheServer.doEntryOps() Caught unexpected exception during entry '{0}' operation: {1}.", opcode, ex);
  }
  Util.Log("DoEntryOP: create = {0}, update = {1}, destroy = {2},get = {3} invalidate={4}, num of Ops ={5}",
    m_create, m_update, m_destroy, gets, invalidates, completed);
}
| |
/// <summary>
/// Adds a new key/value pair to the given region, records it in OpMap as that region's
/// latest operation and runs doEdgeClientValidation().
/// </summary>
/// <param name="m_region">Region to add to (the parameter shadows the field of the same name).</param>
/// <returns>The key that was generated, whether or not the add succeeded.</returns>
protected TKey addEntry(IRegion<TKey, TVal> m_region)
{
  TKey key = GetNewKey();
  TVal value = GetValue();
  // Hard-coded to false here, so the rethrow branch below is never taken.
  bool isSerialExecution = false;
  try
  {
    m_region.Add(key, value);
    Dictionary<TKey, TVal> addMap = new Dictionary<TKey, TVal>();
    addMap[key] = value;
    // doEdgeClientValidation() enumerates OpMap under lock (OpMap); writing without the
    // same lock can break that enumeration on another thread.
    lock (OpMap)
    {
      OpMap[m_region] = addMap;
    }
    doEdgeClientValidation();
  }
  catch (EntryExistsException ex)
  {
    if (isSerialExecution)
    {
      // cannot get this exception; nobody else can have this key
      throw new Exception(ex.StackTrace, ex);
    }
    else
    {
      Util.Log("Caught {0} (expected with concurrent execution); continuing with test", ex);
    }
  }
  return key;
}
| |
/// <summary>
/// Overwrites a randomly chosen existing key of <paramref name="r"/> with a fresh value,
/// records the pair in OpMap under the task's region and runs doEdgeClientValidation().
/// </summary>
/// <param name="r">Region whose entry is updated.</param>
protected void updateEntry(IRegion<TKey, TVal> r)
{
  TKey key = GetExistingKey(true,r);
  TVal value = GetValue();
  r[key] = value;
  Dictionary<TKey, TVal> updateMap = new Dictionary<TKey, TVal>();
  updateMap[key] = value;
  // doEdgeClientValidation() enumerates OpMap under lock (OpMap); writing without the
  // same lock can break that enumeration on another thread.
  lock (OpMap)
  {
    OpMap[m_region] = updateMap;
  }
  doEdgeClientValidation();
}
| |
/// <summary>
/// Invalidates a randomly chosen existing key of <paramref name="r"/>, either only in the
/// local view or through the region itself. An EntryNotFoundException is logged and ignored.
/// </summary>
/// <param name="r">Region to pick the key from and to invalidate in.</param>
/// <param name="isLocalInvalidate">true to invalidate through GetLocalView() only.</param>
protected void invalidateEntry(IRegion<TKey, TVal> r, bool isLocalInvalidate)
{
  // The key is drawn from r, so every following call has to address r as well; the
  // earlier code mixed in the m_region field and silently ignored the parameter.
  TKey key = GetExistingKey(true,r);
  bool containsKey = r.GetLocalView().ContainsKey(key);
  bool containsValueForKey = r.GetLocalView().ContainsValueForKey(key);
  Util.Log("containsKey for " + key + ": " + containsKey);
  Util.Log("containsValueForKey for " + key + ": " + containsValueForKey);
  try
  {
    if (isLocalInvalidate)
    { // do a local invalidate
      r.GetLocalView().Invalidate(key);
    }
    else
    { // do a distributed invalidate
      r.Invalidate(key);
    }
  }
  catch (EntryNotFoundException e)
  {
    Util.Log("Caught {0} (expected); continuing with test", e);
  }
}
| |
/// <summary>
/// Removes a randomly chosen existing key, either from the local view only or through the
/// region itself. An EntryNotFoundException is logged and ignored.
/// </summary>
/// <param name="m_region">Region to remove from (the parameter shadows the field of the same name).</param>
/// <param name="isLocalDestroy">true to remove through GetLocalView() only.</param>
protected void destroyEntry(IRegion<TKey, TVal> m_region, bool isLocalDestroy)
{
  // Hard-coded to false here, so the rethrow at the end of the handler is never reached.
  bool isSerialExecution = false;
  TKey victim = GetExistingKey(true, m_region);
  try
  {
    // Local destroy goes through the local view, distributed destroy through the region.
    IRegion<TKey, TVal> target = isLocalDestroy ? m_region.GetLocalView() : m_region;
    target.Remove(victim);
  }
  catch (EntryNotFoundException notFound)
  {
    if (!isSerialExecution)
    {
      Util.Log("Caught {0} (expected with concurrent execution); continuing with test", notFound);
      return;
    }
    throw new Exception(notFound.StackTrace);
  }
}
| |
/// <summary>
/// Reads the value of a randomly chosen existing key from <paramref name="aRegion"/>.
/// A KeyNotFoundException is swallowed as long as the local result still holds its
/// default value, which is always the case when the indexer itself threw.
/// </summary>
/// <param name="aRegion">Region to read from.</param>
protected void getKey(IRegion<TKey, TVal> aRegion)
{
  TKey existing = GetExistingKey(true, aRegion);
  TVal fetched = default(TVal);
  try
  {
    fetched = aRegion[existing];
  }
  catch (Apache.Geode.Client.KeyNotFoundException)
  {
    bool stillDefault = EqualityComparer<TVal>.Default.Equals(fetched, default(TVal));
    if (stillDefault)
    {
      return;
    }
    throw new Apache.Geode.Client.KeyNotFoundException();
  }
}
| |
/// <summary>
/// Picks a random key that currently exists in <paramref name="region"/>.
/// </summary>
/// <param name="useServerKeys">
/// true to choose from region.Keys, false to choose from the keys of region.GetLocalView().
/// </param>
/// <param name="region">Region to take the key from.</param>
/// <returns>One of the keys of the selected key collection.</returns>
protected TKey GetExistingKey(bool useServerKeys,IRegion<TKey,TVal> region)
{
  TKey[] keys = useServerKeys
    ? (TKey[])region.Keys
    : (TKey[])region.GetLocalView().Keys;
  // Bound the index by the array actually fetched. The size used to come from a separate
  // Count call, which can disagree with the key array when entries are added or removed
  // between the two calls.
  return keys[Util.Rand(0, keys.Length)];
}
| |
/// <summary>
/// Produces the next key by advancing the static key counter and converting the number
/// to TKey through an object cast.
/// </summary>
/// <returns>The new key.</returns>
protected TKey GetNewKey()
{
  // keyCount is static and shared across threads; a plain keyCount++ is not atomic and
  // can hand the same number to two callers.
  int next = Interlocked.Increment(ref keyCount);
  // The configuration reads are kept as before; their result is not used here.
  FwkTest<TKey, TVal>.CurrentTest.ResetKey("distinctKeys");
  FwkTest<TKey, TVal>.CurrentTest.GetUIntValue("distinctKeys");
  return (TKey)(object)next;
}
| |
/// <summary>
/// Creates a random value whose length is taken from the "valueSizes" configuration value.
/// The characters are drawn from code points 65 upwards (26 possibilities).
/// </summary>
/// <returns>
/// The text itself when TVal is string, its ASCII bytes when TVal is byte[], and
/// default(TVal) for any other type.
/// </returns>
protected TVal GetValue()
{
  FwkTest<TKey,TVal>.CurrentTest.ResetKey("valueSizes");
  int length = FwkTest<TKey, TVal>.CurrentTest.GetUIntValue("valueSizes");

  Random rng = new Random();
  StringBuilder text = new StringBuilder();
  for (int remaining = length; remaining > 0; remaining--)
  {
    int code = Convert.ToInt32(Math.Floor(26 * rng.NextDouble() + 65));
    text.Append(Convert.ToChar(code));
  }

  if (typeof(TVal) == typeof(string))
  {
    return (TVal)(object)text.ToString();
  }
  if (typeof(TVal) == typeof(byte[]))
  {
    return (TVal)(object)(Encoding.ASCII.GetBytes(text.ToString()));
  }
  return default(TVal);
}
| |
/// <summary>
/// Performs one step of the concurrent resumable-transaction workload.
/// While fewer transactions are active than the configured "numThreads", a new one is
/// begun. Otherwise the first transaction whose execution count exceeds "minExecutions"
/// is committed or rolled back (chosen by the "TxBool" value); if there is none, a
/// random active transaction gets another round of entry operations.
/// </summary>
/// <param name="region">Region used when a new transaction is begun.</param>
public void doTransactions(IRegion<TKey,TVal> region)
{
  Util.Log("Inside ResumableTx:doTransactions()");

  FwkTest<TKey, TVal>.CurrentTest.ResetKey("minExecutions");
  int minExecutions = FwkTest<TKey, TVal>.CurrentTest.GetUIntValue("minExecutions");
  FwkTest<TKey, TVal>.CurrentTest.ResetKey("numThreads");
  int numActiveThd = FwkTest<TKey, TVal>.CurrentTest.GetUIntValue("numThreads");

  int action = 0;
  TransactionId txId = null;
  TxInfo txInfo = null;

  lock (CLASS_LOCK)
  {
    // Snapshot the ids inside the lock and into a local. The shared keyList field used
    // to be filled outside the lock and could be null or stale by the time it was read.
    List<TransactionId> txIds = new List<TransactionId>(activeTxns.Keys);
    int numActiveTx = txIds.Count;
    Util.Log("activeTx map count is {0}", numActiveTx);
    Util.Log("numAvtiveTx is {0} and no of AvtiveThread = {1} and activemap count= {2}", numActiveTx, numActiveThd, activeTxns.Count);

    if (numActiveTx < numActiveThd)
    {
      action = BEGIN_TX;
    }
    else if (numActiveTx > 0)
    {
      foreach (TransactionId candidateId in txIds)
      {
        TxInfo candidate = (TxInfo)activeTxns[candidateId];
        if (candidate.getNumExecutions() > minExecutions)
        {
          txId = candidateId;
          txInfo = candidate;
          bool isCommit = FwkTest<TKey, TVal>.CurrentTest.GetBoolValue("TxBool");
          action = isCommit ? COMMIT_TX : ROLLBACK_TX;
          break;
        }
      }
      if (action == 0)
      {
        // Random.Next's upper bound is exclusive: Next(0, Count - 1) could never select
        // the last transaction and threw for an empty list.
        int pick = new Random().Next(txIds.Count);
        txId = txIds[pick];
        txInfo = (TxInfo)activeTxns[txId];
        action = EXECUTE_TX_OPS;
      }
    }
  }

  if (action == 0)
  {
    // No transaction is active and none may be started; nothing to do this round.
    Util.Log("doTransactions: no active transactions and no new transaction allowed");
    return;
  }

  try
  {
    lock (txlock)
    {
      if (action == BEGIN_TX)
      {
        txInfo = begin(region);
        txId = txInfo.getTxId();
        // Writes to the shared map take the same lock the selection above reads under.
        lock (CLASS_LOCK)
        {
          activeTxns[txId] = (Object)txInfo;
        }
      }
      else if (action == EXECUTE_TX_OPS)
      {
        executeTxOps(txInfo);
        lock (CLASS_LOCK)
        {
          // The entry may have been removed meanwhile (tx finished elsewhere); the
          // indexer would throw KeyNotFoundException in that case.
          object current;
          if (activeTxns.TryGetValue(txId, out current) && current != null)
          {
            TxInfo live = (TxInfo)current;
            live.incrementNumExecutions();
            activeTxns[txId] = (Object)live;
          }
        }
      }
      else if (action == COMMIT_TX)
      {
        // A committed transaction is finished. Leaving it in the map made every later
        // round try to resume it again and wait for the resume timeout.
        if (commit(txInfo))
        {
          lock (CLASS_LOCK)
          {
            activeTxns.Remove(txId);
          }
        }
      }
      else if (action == ROLLBACK_TX)
      {
        rollback(txInfo);
        lock (CLASS_LOCK)
        {
          activeTxns.Remove(txId);
        }
      }
    }
  }
  catch (Exception e)
  {
    FwkTest<TKey, TVal>.CurrentTest.FwkException("The Test Threw this exception {0}", e);
  }
}
| |
/// <summary>
/// Runs executeTxOps() once for every transaction currently in activeTxns and bumps each
/// transaction's execution counter, also when its operations threw.
/// </summary>
/// <param name="region">Not read by this method.</param>
public void doSerialExecution(IRegion<TKey, TVal> region)
{
  Util.Log("Inside ResumableTX doSerialExecution()");
  keyList = new List<TransactionId>(activeTxns.Keys);
  foreach (TransactionId listedId in keyList)
  {
    TxInfo current = (TxInfo)activeTxns[listedId];
    TransactionId currentId = (TransactionId)(object)current.getTxId();
    try
    {
      executeTxOps(current);
    }
    catch (Exception failure)
    {
      Util.Log("Caught {0} while executing tx ops", failure);
    }
    current.incrementNumExecutions();
    activeTxns[currentId] = (Object)current;
  }
}
| |
/// <summary>
/// Checks that every key/value pair recorded in OpMap is present in the region it was
/// recorded for: ContainsKey must be true, and ContainsValueForKey must be true when the
/// recorded value is not null. Failures are reported through FwkException.
/// </summary>
public void doEdgeClientValidation()
{
  //Validate the number of expected key/values and no. of destroyed entries.
  Util.Log("Verifying expected keys/values");
  Util.Log("Region count is {0} and local region count is {1}",m_region.Count,m_region.GetLocalView().Count);
  bool success = true;
  lock (OpMap)
  {
    foreach (KeyValuePair<IRegion<TKey, TVal>, Dictionary<TKey, TVal>> kvp in OpMap)
    {
      // Validate against the region the operations were recorded for. That region was
      // fetched before but never used, so every entry was checked against m_region.
      IRegion<TKey, TVal> myregion = kvp.Key;
      foreach (KeyValuePair<TKey, TVal> sp in kvp.Value)
      {
        TKey key = sp.Key;
        TVal val = sp.Value;
        if (!myregion.ContainsKey(key))
        {
          FwkTest<TKey, TVal>.CurrentTest.FwkException("Expected containsKey() to be true for key {0} ,but it was false", key);
          success = false;
        }
        if (val != null && !myregion.ContainsValueForKey(key))
        {
          FwkTest<TKey, TVal>.CurrentTest.FwkException("Expected containsValueForKey() to be true for key {0} ,but it was false", key);
          success = false;
        }
      }
    }
  }
  if (!success)
  {
    FwkTest<TKey, TVal>.CurrentTest.FwkException("edge client validation failed");
  }
  else
  {
    Util.Log("Done executing doEdgeClientValidation - validation successful");
  }
}
| |
/// <summary>
/// Finishes every transaction in <paramref name="activeTX"/> and removes it from the map,
/// all under CLASS_LOCK.
/// </summary>
/// <param name="activeTX">Map of suspended transactions; the values are TxInfo instances.</param>
/// <param name="isCommit">true to commit each transaction, false to roll it back.</param>
public void finishAllActiveTx(Dictionary<TransactionId,object> activeTX,Boolean isCommit)
{
  lock (CLASS_LOCK)
  {
    Util.Log("In Finish active map count is {0}",activeTX.Count);
    // Iterate over a snapshot because entries are removed while walking.
    keyList = new List<TransactionId>(activeTX.Keys);
    List<TransactionId> pending = keyList;
    foreach (TransactionId txId in pending)
    {
      TxInfo txInfo = (TxInfo)(object)activeTX[txId];
      try
      {
        // isCommit used to be ignored and every transaction was committed, although
        // DoTask() passes false when the transactions are not meant to be committed.
        if (isCommit)
        {
          bool txCompleted = commit(txInfo);
          Util.Log("Commit got {0}",txCompleted);
        }
        else
        {
          rollback(txInfo);
        }
      }
      catch (Exception e)
      {
        FwkTest<TKey, TVal>.CurrentTest.FwkException("Got this exception {0}", e);
      }
      finally
      {
        activeTX.Remove(txId);
      }
      Util.Log("After commit/update the Active Map is {0}", activeTX.Count);
    }
  }
}
| |
/// <summary>
/// Creates a task bound to one region. Note that <paramref name="maps"/> and
/// <paramref name="omaps"/> are stored in static fields, so the most recently constructed
/// task determines the maps seen by all tasks of the same closed generic type.
/// </summary>
/// <param name="region">Region the entry operations run against.</param>
/// <param name="keyCnt">Upper bound passed to Util.Rand() in DoTask().</param>
/// <param name="maps">Map of active transactions.</param>
/// <param name="omaps">Map of the last operation recorded per region.</param>
/// <param name="finish">Stored in m_finish; read by DoTask().</param>
/// <param name="isBeginTx">Stored in m_beginTx; read by DoTask().</param>
/// <param name="isCommitTx">Stored in m_commitTx; read by DoTask().</param>
public EntryTxTask(IRegion<TKey, TVal> region, int keyCnt, Dictionary<TransactionId, object> maps,Dictionary<IRegion<TKey,TVal>,Dictionary<TKey,TVal>>omaps, Boolean finish,Boolean isBeginTx,Boolean isCommitTx)
{
  // Shared (static) state.
  activeTxns = maps;
  OpMap = omaps;

  // Per-task configuration.
  m_region = region;
  m_MaxKeys = keyCnt;
  m_finish = finish;
  m_beginTx = isBeginTx;
  m_commitTx = isCommitTx;

  // Counters start from zero.
  m_cnt = 0;
  m_create = 0;
  m_update = 0;
  m_destroy = 0;
}
/// <summary>
/// Task body. Each iteration runs, in this order and depending on the mode flags:
/// concurrent transactions, serial execution (under CLASS_LOCK), finishing without commit,
/// beginning a new transaction (under txlock), finishing with commit.
/// </summary>
/// <param name="iters">Number of iterations; the loop also stops when Running turns false.</param>
/// <param name="data">Not read by this method.</param>
public override void DoTask(int iters, object data)
{
  Interlocked.Increment(ref m_cnt);
  int offset = Util.Rand(m_MaxKeys);
  int count = offset;

  FwkTest<TKey, TVal>.CurrentTest.ResetKey(SerialExc);
  bool serial = FwkTest<TKey, TVal>.CurrentTest.GetBoolValue(SerialExc);
  FwkTest<TKey, TVal>.CurrentTest.ResetKey(ConcurrentExc);
  bool concurrent = FwkTest<TKey, TVal>.CurrentTest.GetBoolValue(ConcurrentExc);
  // The value is read as before; it is not used in this method.
  FwkTest<TKey, TVal>.CurrentTest.GetUIntValue("numThreads");

  Util.Log("EntryTask::DoTask: starting {0} iterations. and isSerialExecution is {1}", iters,serial);
  while (Running && (iters-- != 0))
  {
    if (!m_finish)
    {
      if (concurrent)
      {
        doTransactions(m_region);
      }
      if (serial)
      {
        lock (CLASS_LOCK)
        {
          doSerialExecution(m_region);
        }
      }
    }
    else if (!m_commitTx)
    {
      finishAllActiveTx(activeTxns,false);
    }

    if (m_beginTx)
    {
      lock (txlock)
      {
        TxInfo started = begin(m_region);
        TransactionId startedId = (TransactionId)(object)started.getTxId();
        activeTxns[startedId] = (Object)started;
      }
    }

    if (m_finish && m_commitTx)
    {
      finishAllActiveTx(activeTxns, m_commitTx);
    }
  }
  // count is never advanced, so this adds zero to m_iters.
  Interlocked.Add(ref m_iters, count - offset);
}
| } |
| |
| public class ResumableTx<TKey,TVal> : FwkTest<TKey,TVal> |
| { |
| // Tx actions (for concurrent resumable transactions with function execution) |
| static protected int BEGIN_TX = 1001; |
| static protected int EXECUTE_TX_OPS = 1002; |
| static protected int EXECUTE_NONTX_OPS = 1003; |
| static protected int COMMIT = 1004; |
| static protected int ROLLBACK = 1005; |
| #region Private constants and statics |
| |
| static protected int SUSPEND = 2; |
| static protected int RESUME = 3; |
| |
| private static Dictionary<string, Dictionary<TKey, TVal>> BeforeTxMap = new Dictionary<string, Dictionary<TKey, TVal>>(); |
| private static Dictionary<string, Dictionary<TKey, TVal>> AfterTxMap = new Dictionary<string, Dictionary<TKey, TVal>>(); |
| protected static Dictionary<TransactionId, object> activeTxns = null; |
| protected static Dictionary<IRegion<TKey,TVal>,Dictionary<TKey, TVal>> OpMap = null; |
| //private List<TransactionId> keyList; |
| //private static int globalTxId = 0; |
| //private static int ExecutionNo = 0; |
| //private static string RRName = null; |
| |
| private object CLASS_LOCK = new object(); |
| |
| #endregion |
| |
| private const string RegionName = "regionName"; |
| private const string ValueSizes = "valueSizes"; |
| private const string OpsSecond = "opsSecond"; |
| private const string EntryCount = "entryCount"; |
| private const string WorkTime = "workTime"; |
| private const string EntryOps = "entryOps"; |
| private const string LargeSetQuery = "largeSetQuery"; |
| private const string UnsupportedPRQuery = "unsupportedPRQuery"; |
| private const string ObjectType = "objectType"; |
| protected const string NumThreads = "numThreads"; |
| protected const string TimedInterval = "timedInterval"; |
| protected const string DistinctKeys = "distinctKeys"; |
| protected const string BEGINTX = "isBeginTX"; |
| protected const string COMMITTX = "isCommitTX"; |
| |
/// <summary>
/// Cache listener that only reports the events it receives through the current test's
/// FwkInfo log.
/// </summary>
public class SilenceListener<TKey1, TVal1> : CacheListenerAdapter<TKey1,TVal1>
{
  /// <summary>Logs the creation, then the created key and its new value.</summary>
  public override void AfterCreate(EntryEvent<TKey1, TVal1> ev)
  {
    const string prefix = "SilenceListener: AfterCreate key";
    FwkTest<TKey, TVal>.CurrentTest.FwkInfo(prefix);
    FwkTest<TKey, TVal>.CurrentTest.FwkInfo(prefix + " = {0} value = {1}", ev.Key, ev.NewValue);
  }

  /// <summary>Logs the update, then the updated key and its new value.</summary>
  public override void AfterUpdate(EntryEvent<TKey1, TVal1> ev)
  {
    const string prefix = "SilenceListener: AfterUpdate";
    FwkTest<TKey, TVal>.CurrentTest.FwkInfo(prefix);
    FwkTest<TKey, TVal>.CurrentTest.FwkInfo(prefix + " key = {0} value = {1}", ev.Key, ev.NewValue);
  }

  /// <summary>Logs that an entry was destroyed.</summary>
  public override void AfterDestroy(EntryEvent<TKey1, TVal1> ev)
  {
    FwkTest<TKey, TVal>.CurrentTest.FwkInfo("SilenceListener: AfterDestroy");
  }

  /// <summary>Logs that an entry was invalidated.</summary>
  public override void AfterInvalidate(EntryEvent<TKey1, TVal1> ev)
  {
    FwkTest<TKey, TVal>.CurrentTest.FwkInfo("SilenceListener: AfterInvalidate");
  }
}
| |
| #region Private utility methods |
| |
/// <summary>
/// Resolves the region without an explicit name; see the overload taking a region name.
/// </summary>
private IRegion<TKey, TVal> GetRegion()
{
  string unspecified = null;
  return GetRegion(unspecified);
}
| |
/// <summary>
/// Resolves a region. The name is taken from <paramref name="regionName"/> or, when that
/// is null, from the "regionName" configuration value. Without any name the root region
/// is used, falling back to a randomly chosen root region of the cache.
/// </summary>
/// <param name="regionName">Region name, or null to consult the configuration.</param>
/// <returns>The resolved region; may be null when nothing could be found.</returns>
protected IRegion<TKey, TVal> GetRegion(string regionName)
{
  string name = regionName;
  if (name == null)
  {
    name = GetStringValue("regionName");
  }
  if (name != null)
  {
    return CacheHelper<TKey, TVal>.GetRegion(name);
  }

  IRegion<TKey, TVal> root = (IRegion<TKey, TVal>)GetRootRegion();
  if (root != null)
  {
    return root;
  }

  IRegion<TKey, TVal>[] candidates = CacheHelper<TKey, TVal>.DCache.RootRegions<TKey, TVal>();
  if (candidates == null || candidates.Length == 0)
  {
    return root;
  }
  return candidates[Util.Rand(candidates.Length)];
}
| #endregion |
| |
| #region Public Methods |
| |
/// <summary>Factory for a SilenceListener typed with this test's key and value types.</summary>
/// <returns>A new listener instance.</returns>
public static ICacheListener<TKey, TVal> CreateSilenceListener()
{
  SilenceListener<TKey, TVal> listener = new SilenceListener<TKey, TVal>();
  return listener;
}
| |
/// <summary>
/// Test task: calls CreatePool() and reports any exception through FwkException.
/// </summary>
public virtual void DoCreatePool()
{
  FwkInfo("In DoCreatePool()");
  try
  {
    CreatePool();
  }
  catch (Exception failure)
  {
    FwkException("DoCreatePool() Caught Exception: {0}", failure);
  }
  FwkInfo("DoCreatePool() complete.");
}
| |
| |
/// <summary>
/// Test task: creates the root region via CreateRootRegion() and logs its name. A null
/// region or any exception is reported through FwkException.
/// </summary>
public virtual void DoCreateRegion()
{
  const string tag = "ResumableTx:DoCreateRegion()";
  FwkInfo(tag);
  try
  {
    IRegion<TKey, TVal> created = CreateRootRegion();
    ResetKey("useTransactions");
    if (created == null)
    {
      FwkException(tag + " could not create region.");
    }
    FwkInfo(tag + " Created region '{0}'", created.Name);
  }
  catch (Exception failure)
  {
    FwkException(tag + " Caught Exception: {0}", failure);
  }
  FwkInfo(tag + " complete.");
}
| |
/// <summary>Test task: logs the shutdown and closes the cache through CacheHelper.</summary>
public void DoCloseCache()
{
  const string notice = "DoCloseCache() Closing cache and disconnecting from distributed system.";
  FwkInfo(notice);
  CacheHelper<TKey, TVal>.Close();
}
| |
/// <summary>
/// Test task: registers interest in all keys of the region. Durability, initial-value
/// fetching and value delivery are read from the configuration ("isDurableReg",
/// "getInitialValues", and "receiveValue" when "checkReceiveVal" is set).
/// </summary>
public void DoRegisterAllKeys()
{
  FwkInfo("In DoRegisterAllKeys()");
  try
  {
    IRegion<TKey, TVal> target = GetRegion();
    FwkInfo("DoRegisterAllKeys() region name is {0}", target.Name);

    bool durable = GetBoolValue("isDurableReg");
    ResetKey("getInitialValues");
    bool fetchInitialValues = GetBoolValue("getInitialValues");

    // Values are delivered unless the configuration explicitly overrides it.
    bool receiveValues = true;
    if (GetBoolValue("checkReceiveVal"))
    {
      ResetKey("receiveValue");
      receiveValues = GetBoolValue("receiveValue");
    }

    target.GetSubscriptionService().RegisterAllKeys(durable, fetchInitialValues, receiveValues);
  }
  catch (Exception failure)
  {
    FwkException("DoRegisterAllKeys() Caught Exception: {0}", failure);
  }
  FwkInfo("DoRegisterAllKeys() complete.");
}
| |
/// <summary>
/// Test task: for every configured "numThreads" value, runs an EntryTxTask against the
/// active-transaction map as a timed task.
/// </summary>
public void DoSerialTxWithOps()
{
  // activeTxns is a static that starts out null and is only created by other tasks;
  // dereferencing it unconditionally failed with a NullReferenceException.
  if (activeTxns == null)
  {
    activeTxns = new Dictionary<TransactionId, object>();
  }
  FwkInfo("Inside DoSerialTxWithOps ActiveMap count is {0}", activeTxns.Count);
  IRegion<TKey, TVal> region = GetRegion();
  // Read as before; the value is not used by this task.
  GetUIntValue(EntryCount);
  OpMap = new Dictionary<IRegion<TKey, TVal>, Dictionary<TKey, TVal>>();

  int timedInterval = GetTimeValue(TimedInterval) * 1000;
  int maxTime = 10 * timedInterval;
  // Loop over key set sizes
  ResetKey(DistinctKeys);
  int numKeys = GetUIntValue(DistinctKeys);

  ResetKey(NumThreads);
  int numThreads;
  while ((numThreads = GetUIntValue(NumThreads)) > 0)
  {
    EntryTxTask<TKey, TVal> entrytask = new EntryTxTask<TKey, TVal>(region, numKeys / numThreads, activeTxns, OpMap,false,false,false);
    FwkInfo("Running timed task ");
    try
    {
      RunTask(entrytask, numThreads, -1, timedInterval, maxTime, null);
    }
    catch (ClientTimeoutException)
    {
      FwkException("In DoSerialTxWithOps() Timed run timed out.");
    }
    FwkInfo("Completed timed task ");
    Thread.Sleep(3000);
  }
}
| |
/// <summary>
/// Test task: resets the active-transaction and operation maps, then for every configured
/// "numThreads" value runs an EntryTxTask as a timed task.
/// </summary>
public void DoConcTxWithOps()
{
  FwkInfo("Inside ResumableTx:DoConcTxWithOps()");
  IRegion<TKey, TVal> target = GetRegion();
  activeTxns = new Dictionary<TransactionId, object>();
  OpMap = new Dictionary<IRegion<TKey, TVal>, Dictionary<TKey, TVal>>();

  // These values are read as before; nothing below uses them.
  GetUIntValue(EntryCount);
  GetTimeValue(WorkTime);

  int intervalMillis = GetTimeValue(TimedInterval) * 1000;
  int limitMillis = 10 * intervalMillis;

  ResetKey(DistinctKeys);
  int distinctKeys = GetUIntValue(DistinctKeys);

  ResetKey(NumThreads);
  for (int threads = GetUIntValue(NumThreads); threads > 0; threads = GetUIntValue(NumThreads))
  {
    EntryTxTask<TKey, TVal> task = new EntryTxTask<TKey, TVal>(target, distinctKeys / threads, activeTxns, OpMap, false, false, false);
    FwkInfo("Running timed task ");
    try
    {
      RunTask(task, threads, -1, intervalMillis, limitMillis, null);
    }
    catch (ClientTimeoutException)
    {
      FwkException("In DoConcTxWithOps() Timed run timed out.");
    }
    Thread.Sleep(3000);
  }
  FwkInfo("Completed timed task ");
}
| |
/// <summary>
/// Test task: resets the active-transaction map, then for every configured "numThreads"
/// value runs an EntryTxTask whose begin flag comes from the "isBeginTX" value.
/// </summary>
public void DoBeginTx()
{
  IRegion<TKey, TVal> target = GetRegion();
  activeTxns = new Dictionary<TransactionId, object>();

  int intervalMillis = GetTimeValue(TimedInterval) * 1000;
  int limitMillis = 10 * intervalMillis;

  ResetKey(DistinctKeys);
  ResetKey(BEGINTX);
  bool beginRequested = GetBoolValue(BEGINTX);
  int distinctKeys = GetUIntValue(DistinctKeys);

  ResetKey(NumThreads);
  for (int threads = GetUIntValue(NumThreads); threads > 0; threads = GetUIntValue(NumThreads))
  {
    EntryTxTask<TKey, TVal> task = new EntryTxTask<TKey, TVal>(target, distinctKeys / threads, activeTxns, OpMap, false, beginRequested, false);
    FwkInfo("Running timed task ");
    try
    {
      RunTask(task, threads, -1, intervalMillis, limitMillis, null);
    }
    catch (ClientTimeoutException)
    {
      FwkException("In DoBeginTx() Timed run timed out.");
    }
    Thread.Sleep(3000);
  }
}
| |
/// <summary>
/// Test task: for every configured "numThreads" value, runs an EntryTxTask in finish mode;
/// its commit flag comes from the "isCommitTX" configuration value.
/// </summary>
public void DoFinishAllActiveTx()
{
  // activeTxns is a static that starts out null and is only created by other tasks;
  // dereferencing it unconditionally failed with a NullReferenceException.
  if (activeTxns == null)
  {
    activeTxns = new Dictionary<TransactionId, object>();
  }
  FwkInfo("Inside DoFinishActiveTx ActiveMap count is {0}",activeTxns.Count);
  IRegion<TKey, TVal> region = GetRegion();
  int timedInterval = GetTimeValue(TimedInterval) * 1000;
  int maxTime = 10 * timedInterval;
  // Loop over key set sizes
  ResetKey(DistinctKeys);
  int numKeys = GetUIntValue(DistinctKeys);

  ResetKey(COMMITTX);
  bool isCommitTx = GetBoolValue(COMMITTX);
  ResetKey(NumThreads);
  int numThreads;
  while ((numThreads = GetUIntValue(NumThreads)) > 0)
  {
    EntryTxTask<TKey, TVal> entrytask = new EntryTxTask<TKey, TVal>(region, numKeys / numThreads, activeTxns,OpMap, true,false,isCommitTx);
    FwkInfo("Running timed task ");
    try
    {
      RunTask(entrytask, numThreads, -1, timedInterval, maxTime, null);
    }
    catch (ClientTimeoutException)
    {
      FwkException("In DoFinishAllActiveTx() Timed run timed out.");
    }
    Thread.Sleep(3000);
  }
}
| |
/// <summary>
/// Test task: writes the keys 0..4 with the value "Value_" into the region, each put in
/// its own transaction.
/// </summary>
public void DoPopulateRegion()
{
  CacheTransactionManager txManager = CacheHelper<TKey, TVal>.DCache.CacheTransactionManager;
  IRegion<TKey, TVal> reg = GetRegion();
  for (int i = 0; i < 5; i++)
  {
    TKey key = (TKey)(object)(i);
    TVal value = (TVal)(object)"Value_";
    txManager.Begin();
    try
    {
      reg[key] = value;
    }
    catch
    {
      // A failed put used to leave the transaction open on this thread; close it before
      // letting the original exception continue.
      txManager.Rollback();
      throw;
    }
    txManager.Commit();
    Util.Log("The Key is={0} and value={1}", key.ToString(), value.ToString());
  }
}
| |
/// <summary>
/// Test task: reads the keys 0..4 from the region and logs each key/value pair. Any
/// exception raised while reading or logging is reported through FwkException.
/// </summary>
public void DoGet()
{
  const int keysToRead = 5;
  IRegion<TKey, TVal> source = GetRegion();
  int index = 0;
  while (index < keysToRead)
  {
    TKey key = (TKey)(object)(index);
    try
    {
      TVal found = source[key];
      Util.Log("The Key is={0} and value={1}", key.ToString(), found.ToString());
    }
    catch (Exception failure)
    {
      FwkException("SP:Caught {0}", failure);
    }
    index++;
  }
}
| |
/// <summary>
/// Test task: begins one transaction and then performs "minExecutions" + 1 rounds of
/// suspend, resume and 100 puts ("key_0".."key_99" with value "Value_"), calling
/// doValidateTxOps() after each state change. Finally the transaction is committed; a
/// CommitConflictException at that point is only logged.
/// </summary>
public void DoBasicTX()
{
  FwkInfo("Inside ResumableTx:DoBasicTX()");
  IRegion<TKey, TVal> region = GetRegion();
  // Never assigned; it only feeds the error message at the bottom.
  string opcode = null;
  int minNoEx = GetUIntValue("minExecutions");

  if (region == null)
  {
    FwkSevere("ResumableTx:DoBasicTX(): No region to perform operations on.");
  }

  TransactionId txId = null;
  try
  {
    CacheTransactionManager txManager = CacheHelper<TKey, TVal>.DCache.CacheTransactionManager;

    FwkInfo("Begin.() tx numOfEx={0} and minNoEx={1}", 0, minNoEx);
    txManager.Begin();
    doValidateTxOps(txId, BEGIN_TX);

    for (int round = 0; round <= minNoEx; round++)
    {
      try
      {
        FwkInfo("numOfEx={0} and minNoEx={1}", round, minNoEx);
        txId = txManager.Suspend();
        doValidateTxOps(txId, SUSPEND);

        txManager.Resume(txId);
        const int numKeys = 100;
        Dictionary<TKey, TVal> written = new Dictionary<TKey, TVal>();
        for (int i = 0; i < numKeys; i++)
        {
          TKey key = (TKey)(object)String.Format("key_{0}", i);
          TVal value = (TVal)(object)"Value_";
          region[key] = value;
          written[key] = value;
          // Kept inside the loop: the map is published only once a put has succeeded.
          AfterTxMap["put"] = written;
        }
        FwkInfo("keyValMap count={0} and region count is {1}",written.Count,region.Keys.Count);
        doValidateTxOps(txId, RESUME);
      }
      catch (IllegalStateException ex)
      {
        FwkException("Got {0}", ex.Message);
      }
      catch (CommitConflictException ex)
      {
        FwkException("Got {0}", ex.Message);
      }
    }

    try
    {
      txManager.Commit();
    }
    catch (CommitConflictException)
    {
      FwkInfo("Got Expected exception as there was a write conflict");
    }
    doValidateTxOps(txId, COMMIT);
  }
  catch (Exception ex)
  {
    FwkException("ResumableTx.DoEntryOpsWithTX() Caught unexpected exception during entry '{0}' operation: {1}.", opcode, ex);
  }
}
| |
/// <summary>
/// Checks that the cache transaction manager reports the state expected for a
/// transaction after a given step, then verifies the recorded keys through
/// checkContainsKey.
/// </summary>
/// <param name="txId">
/// Transaction to inspect. The begin branch does not use it.
/// </param>
/// <param name="txState">
/// Step code selecting the checks: 1001 (begin), 2 (suspend), 3 (resume) or
/// 1004 (commit). Any other value performs no validation.
/// NOTE(review): these literals presumably mirror the BEGIN_TX / SUSPEND /
/// RESUME / COMMIT constants passed by DoBasicTX -- confirm against their
/// declarations.
/// </param>
/// <remarks>
/// Violations are raised through FwkException. Any exception thrown inside the
/// checks, including those, is caught and re-reported through FwkException with
/// only its message.
/// </remarks>
public void doValidateTxOps(TransactionId txId, int txState)
{
    FwkInfo("Inside Resumabletx:DoValidateTxOps");
    FwkInfo("Transactional Id is {0} and transaction state is={1}",txId,txState);

    // The value read here is not used by this method; the calls are kept in case
    // the framework lookup has side effects -- TODO confirm and remove if not.
    ResetKey("entryCount");
    GetUIntValue("entryCount");

    CacheTransactionManager txManager = CacheHelper<TKey, TVal>.DCache.CacheTransactionManager;
    try
    {
        switch (txState)
        {
            case 1001:
                FwkInfo("...txnState is Begin ...");
                checkContainsKey(txState);
                break;

            case 2:
                FwkInfo("...txnState is Suspend...");
                // Both conditions must hold after Suspend(), so failing either one
                // is a violation (the previous '&&' only fired when both failed).
                if (!txManager.IsSuspended(txId) || !txManager.Exists(txId))
                {
                    FwkException("After Suspend(),the Transaction should have been Suspended and should still exist");
                }
                checkContainsKey(txState);
                break;

            case 3:
                FwkInfo("...txnState is Resume...");
                if (txManager.IsSuspended(txId))
                {
                    FwkException("After Resume(),the Transaction should NOT have been Suspended");
                }
                if (!txManager.Exists(txId))
                {
                    FwkException("After Resume(),the Transaction should still exist");
                }
                checkContainsKey(txState);
                break;

            case 1004:
                FwkInfo("...txnState is Commit...");
                if (txManager.IsSuspended(txId))
                {
                    FwkException("After Commit(),the Transaction should NOT have been Suspended");
                }
                if (txManager.Exists(txId))
                {
                    FwkException("After Commit(),the Transaction should Not exist");
                }
                if (txManager.TryResume(txId))
                {
                    FwkException("After Commit(),the Transaction should Not be resumed");
                }
                checkContainsKey(txState);
                FwkInfo("Commit BeforeTxMap.Count={0} ", BeforeTxMap.Count);
                break;
        }
    }
    catch (Exception ex)
    {
        FwkException("doValidateTxOps caught exception {0}",ex.Message);
    }
}
| |
/// <summary>
/// Verifies that every key recorded under the "put" entry of the transaction
/// bookkeeping maps is present in the region returned by GetRegion().
/// </summary>
/// <param name="txnState">
/// Transaction step code. BeforeTxMap["put"] is always checked; when the code
/// equals 3, AfterTxMap["put"] is checked as well.
/// </param>
/// <remarks>
/// A failed map lookup and every missing key are reported through FwkException.
/// </remarks>
public void checkContainsKey(int txnState)
{
    IRegion<TKey, TVal> region = GetRegion();

    // Start from empty maps so the counts are still readable if a lookup fails.
    Dictionary<TKey, TVal> keysBeforeTx = new Dictionary<TKey, TVal>();
    Dictionary<TKey, TVal> keysAfterTx = new Dictionary<TKey, TVal>();
    try
    {
        keysBeforeTx = BeforeTxMap["put"];
        if (txnState == 3)
        {
            keysAfterTx = AfterTxMap["put"];
        }
    }
    catch (Exception lookupError)
    {
        FwkException("Throwing this exception {0} BMap cpunt={1} AfMap Count={2}", lookupError.Message, keysBeforeTx.Count, keysAfterTx.Count);
    }

    FwkInfo("BfTxOpmap count is {0} and AfterTxMap is {1}", keysBeforeTx.Count, keysAfterTx.Count);

    foreach (TKey expectedKey in keysBeforeTx.Keys)
    {
        FwkInfo("Inside BfTxOpMap");
        if (region.ContainsKey(expectedKey))
        {
            continue;
        }
        FwkException("getKey: expected key {0} is not present in the region key set", expectedKey.ToString());
    }

    foreach (TKey expectedKey in keysAfterTx.Keys)
    {
        FwkInfo("Inside AfTxOpMap");
        if (region.ContainsKey(expectedKey))
        {
            continue;
        }
        FwkException("getKey: expected key {0} is not present in the region key set", expectedKey.ToString());
    }
}
| |
| #endregion |
| } |
| } |
| |