| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.internal.processors.cache; |
| |
| import java.io.Serializable; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.LinkedHashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.UUID; |
| import java.util.concurrent.Callable; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.ConcurrentMap; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.atomic.AtomicLong; |
| import org.apache.ignite.Ignite; |
| import org.apache.ignite.IgniteAtomicSequence; |
| import org.apache.ignite.IgniteCompute; |
| import org.apache.ignite.IgniteException; |
| import org.apache.ignite.cache.CacheMode; |
| import org.apache.ignite.cache.affinity.AffinityKey; |
| import org.apache.ignite.cache.affinity.AffinityKeyMapped; |
| import org.apache.ignite.cache.eviction.lru.LruEvictionPolicy; |
| import org.apache.ignite.cache.query.annotations.QueryGroupIndex; |
| import org.apache.ignite.cache.query.annotations.QuerySqlField; |
| import org.apache.ignite.cluster.ClusterNode; |
| import org.apache.ignite.compute.ComputeJob; |
| import org.apache.ignite.compute.ComputeJobAdapter; |
| import org.apache.ignite.compute.ComputeJobResult; |
| import org.apache.ignite.compute.ComputeTaskFuture; |
| import org.apache.ignite.compute.ComputeTaskSplitAdapter; |
| import org.apache.ignite.configuration.AtomicConfiguration; |
| import org.apache.ignite.configuration.CacheConfiguration; |
| import org.apache.ignite.configuration.IgniteConfiguration; |
| import org.apache.ignite.internal.IgniteKernal; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry; |
| import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter; |
| import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheEntry; |
| import org.apache.ignite.internal.util.GridAtomicLong; |
| import org.apache.ignite.internal.util.typedef.F; |
| import org.apache.ignite.internal.util.typedef.G; |
| import org.apache.ignite.internal.util.typedef.P1; |
| import org.apache.ignite.internal.util.typedef.T2; |
| import org.apache.ignite.internal.util.typedef.T5; |
| import org.apache.ignite.internal.util.typedef.X; |
| import org.apache.ignite.lang.IgnitePredicate; |
| import org.apache.ignite.lang.IgniteUuid; |
| import org.apache.ignite.resources.IgniteInstanceResource; |
| import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; |
| import org.apache.ignite.transactions.Transaction; |
| import org.jetbrains.annotations.Nullable; |
| import org.junit.Test; |
| |
| import static org.apache.ignite.cache.CacheMode.PARTITIONED; |
| import static org.apache.ignite.cache.CacheRebalanceMode.NONE; |
| import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; |
| import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; |
| import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; |
| |
| /** |
| * |
| */ |
| public class GridCacheConcurrentTxMultiNodeLoadTest extends GridCommonAbstractTest { |
| /** Timers. */ |
| private static final ConcurrentMap<Thread, ConcurrentMap<String, T5<Long, Long, Long, IgniteUuid, Object>>> timers = |
| new ConcurrentHashMap<>(); |
| |
| /** */ |
| private static final long PRINT_FREQ = 10000; |
| |
| /** */ |
| private static final GridAtomicLong lastPrint = new GridAtomicLong(); |
| |
| /** */ |
| private static final IgnitePredicate<ClusterNode> serverNode = new P1<ClusterNode>() { |
| @Override public boolean apply(ClusterNode n) { |
| String igniteInstanceName = G.ignite(n.id()).name(); |
| |
| return igniteInstanceName != null && igniteInstanceName.contains("server"); |
| } |
| }; |
| |
| /** */ |
| private static final IgnitePredicate<ClusterNode> clientNode = new P1<ClusterNode>() { |
| @Override public boolean apply(ClusterNode n) { |
| String igniteInstanceName = G.ignite(n.id()).name(); |
| |
| return igniteInstanceName != null && igniteInstanceName.contains("client"); |
| } |
| }; |
| |
| /** */ |
| private CacheMode mode = PARTITIONED; |
| |
| /** */ |
| private boolean cacheOn; |
| |
| /** {@inheritDoc} */ |
| @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { |
| IgniteConfiguration c = super.getConfiguration(igniteInstanceName); |
| |
| c.getTransactionConfiguration().setDefaultTxConcurrency(PESSIMISTIC); |
| c.getTransactionConfiguration().setDefaultTxIsolation(REPEATABLE_READ); |
| |
| AtomicConfiguration atomicCfg = new AtomicConfiguration(); |
| |
| atomicCfg.setAtomicSequenceReserveSize(100000); |
| atomicCfg.setCacheMode(mode); |
| |
| c.setAtomicConfiguration(atomicCfg); |
| |
| if (cacheOn) { |
| CacheConfiguration cc = defaultCacheConfiguration(); |
| |
| cc.setCacheMode(mode); |
| |
| LruEvictionPolicy plc = new LruEvictionPolicy(); |
| plc.setMaxSize(1000); |
| |
| cc.setEvictionPolicy(plc); |
| cc.setOnheapCacheEnabled(true); |
| cc.setWriteSynchronizationMode(FULL_SYNC); |
| cc.setRebalanceMode(NONE); |
| |
| c.setCacheConfiguration(cc); |
| } |
| else |
| c.setCacheConfiguration(); |
| |
| c.setPeerClassLoadingEnabled(false); |
| |
| return c; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected long getTestTimeout() { |
| return Long.MAX_VALUE; |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testEvictions() throws Exception { |
| try { |
| cacheOn = true; |
| |
| Ignite srvr1 = startGrid("server1"); |
| |
| srvr1.atomicSequence("ID", 0, true); |
| |
| startGrid("server2"); |
| |
| cacheOn = false; |
| |
| // Client processes count. |
| int clientCnt = 8; |
| |
| for (int i = 1; i <= clientCnt; i++) |
| startGrid("client" + i); |
| |
| Collection<ClusterNode> srvrNodes = srvr1.cluster().forPredicate(serverNode).nodes(); |
| Collection<ClusterNode> clientNodes = srvr1.cluster().forPredicate(clientNode).nodes(); |
| |
| assert srvrNodes.size() == 2; |
| |
| // Threads count per each client process. |
| int threadCnt = 2; |
| |
| int srvrMaxNoTerminals = threadCnt / srvrNodes.size(); |
| |
| if (srvrMaxNoTerminals * srvrNodes.size() != threadCnt) { |
| threadCnt = srvrMaxNoTerminals * srvrNodes.size(); |
| |
| info("Using " + threadCnt + " threads instead to ensure equal distribution of terminals"); |
| } |
| |
| Collection<Callable<Object>> clients = new ArrayList<>(threadCnt * clientCnt); |
| |
| info("No of servers: " + srvrNodes.size()); |
| info("No of clients: " + clientNodes.size()); |
| info("Thread count: " + threadCnt); |
| info("Max number of terminals / server: " + srvrMaxNoTerminals); |
| |
| // Distribute terminals evenly across all servers |
| for (ClusterNode node : srvrNodes) { |
| UUID srvrId = node.id(); |
| |
| info(">>> Node ID: " + srvrId); |
| |
| int terminalsPerSrvr = 0; |
| |
| int tid = 0; // Terminal ID. |
| |
| while (true) { |
| String terminalId = String.valueOf(++tid); |
| |
| // Server partition cache |
| UUID mappedId = srvr1.affinity(DEFAULT_CACHE_NAME).mapKeyToNode(terminalId).id(); |
| |
| if (!srvrId.equals(mappedId)) |
| continue; |
| |
| info("Affinity mapping [key=" + terminalId + ", nodeId=" + mappedId + ']'); |
| |
| for (int i = 1; i <= clientCnt; i++) |
| clients.add(new Client(G.ignite("client" + i), terminalId, srvrId)); |
| |
| info("Terminal ID: " + terminalId); |
| |
| terminalsPerSrvr++; |
| |
| if (terminalsPerSrvr == srvrMaxNoTerminals) |
| break; |
| } |
| } |
| |
| displayReqCount(); |
| |
| ExecutorService pool = Executors.newFixedThreadPool(clients.size()); |
| |
| pool.invokeAll(clients); |
| |
| Thread.sleep(Long.MAX_VALUE); |
| } |
| finally { |
| stopAllGrids(); |
| } |
| } |
| |
| /** |
| * |
| */ |
| private void displayReqCount() { |
| new Thread(new Runnable() { |
| @SuppressWarnings({"BusyWait"}) |
| @Override public void run() { |
| int interval = 10; |
| |
| while (true) { |
| long cnt0 = Client.txCnt.get(); |
| long lt0 = Client.latency.get(); |
| |
| try { |
| Thread.sleep(interval * 1000); |
| } |
| catch (InterruptedException e) { |
| e.printStackTrace(); |
| } |
| |
| long cnt1 = Client.txCnt.get(); |
| long lt1 = Client.latency.get(); |
| |
| info(">>>"); |
| info(">>> Transaction/s: " + (cnt1 - cnt0) / interval); |
| info(">>> Avg Latency: " + ((cnt1 - cnt0) > 0 ? (lt1 - lt0) / (cnt1 - cnt0) + "ms" : "invalid")); |
| info(">>> Max Submit Time: " + Client.submitTime.getAndSet(0)); |
| |
| try { |
| PerfJob.printTimers(); |
| } |
| catch (Exception e) { |
| e.printStackTrace(); |
| } |
| } |
| } |
| }).start(); |
| } |
| |
| /** |
| * |
| */ |
| private static class Client implements Callable<Object> { |
| /** */ |
| private static AtomicLong txCnt = new AtomicLong(); |
| |
| /** */ |
| private static AtomicLong latency = new AtomicLong(); |
| |
| /** */ |
| private static GridAtomicLong submitTime = new GridAtomicLong(); |
| |
| /** */ |
| private Ignite g; |
| |
| /** */ |
| private String terminalId; |
| |
| /** */ |
| private UUID nodeId; |
| |
| /** |
| * @param g Grid. |
| * @param terminalId Terminal ID. |
| * @param nodeId Node ID. |
| */ |
| private Client(Ignite g, String terminalId, UUID nodeId) { |
| this.g = g; |
| this.terminalId = terminalId; |
| this.nodeId = nodeId; |
| } |
| |
| /** {@inheritDoc} */ |
| @SuppressWarnings({"InfiniteLoopStatement"}) |
| @Override public Object call() throws Exception { |
| while (true) { |
| try { |
| long t0 = System.currentTimeMillis(); |
| |
| long submitTime1 = t0; |
| |
| IgniteCompute comp = g.compute(g.cluster().forPredicate(serverNode)); |
| |
| ComputeTaskFuture<Void> f1 = comp.executeAsync(RequestTask.class, new Message(terminalId, nodeId)); |
| |
| submitTime.setIfGreater(System.currentTimeMillis() - submitTime1); |
| |
| f1.get(); |
| |
| submitTime1 = System.currentTimeMillis(); |
| |
| ComputeTaskFuture<Void> f2 = comp.executeAsync(ResponseTask.class, new Message(terminalId, nodeId)); |
| |
| submitTime.setIfGreater(System.currentTimeMillis() - submitTime1); |
| |
| f2.get(); |
| |
| long t1 = System.currentTimeMillis(); |
| |
| txCnt.incrementAndGet(); |
| |
| latency.addAndGet(t1 - t0); |
| } |
| catch (IgniteException e) { |
| e.printStackTrace(); |
| } |
| } |
| } |
| } |
| |
| /** |
| * |
| */ |
| private static class Message implements Serializable { |
| /** */ |
| private String terminalId; |
| |
| /** */ |
| private UUID nodeId; |
| |
| /** |
| * @param terminalId Terminal ID. |
| * @param nodeId Node ID. |
| */ |
| Message(String terminalId, UUID nodeId) { |
| this.terminalId = terminalId; |
| this.nodeId = nodeId; |
| } |
| |
| /** |
| * @return Terminal ID. |
| */ |
| String getTerminalId() { |
| return terminalId; |
| } |
| |
| /** |
| * @param terminalId Terminal ID. |
| */ |
| void setTerminalId(String terminalId) { |
| this.terminalId = terminalId; |
| } |
| |
| /** |
| * @return Node ID. |
| */ |
| UUID getNodeId() { |
| return nodeId; |
| } |
| |
| /** |
| * @param nodeId Node ID. |
| */ |
| void setNodeId(UUID nodeId) { |
| this.nodeId = nodeId; |
| } |
| } |
| |
| /** |
| * |
| */ |
| private static class PerfJob extends ComputeJobAdapter { |
| /** */ |
| private static final long MAX = 5000; |
| |
| /** */ |
| @AffinityKeyMapped |
| private String affKey; |
| |
| /** */ |
| @IgniteInstanceResource |
| private Ignite ignite; |
| |
| /** |
| * @param msg Message. |
| */ |
| PerfJob(Message msg) { |
| super(msg); |
| |
| affKey = msg.getTerminalId(); |
| } |
| |
| /** |
| * @return Message. |
| */ |
| private Message message() { |
| return argument(0); |
| } |
| |
| /** |
| * @return Terminal ID. |
| */ |
| public String terminalId() { |
| return message().getTerminalId(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public Object execute() { |
| ConcurrentMap<String, T2<AtomicLong, AtomicLong>> nodeLoc = ignite.cluster().nodeLocalMap(); |
| |
| T2<AtomicLong, AtomicLong> cntrs = nodeLoc.get("cntrs"); |
| |
| if (cntrs == null) { |
| T2<AtomicLong, AtomicLong> other = nodeLoc.putIfAbsent("cntrs", |
| cntrs = new T2<>(new AtomicLong(), new AtomicLong(System.currentTimeMillis()))); |
| |
| if (other != null) |
| cntrs = other; |
| } |
| |
| long cnt = cntrs.get1().incrementAndGet(); |
| |
| doWork(); |
| |
| GridNearCacheAdapter near = (GridNearCacheAdapter)((IgniteKernal) ignite).internalCache(DEFAULT_CACHE_NAME); |
| GridDhtCacheAdapter dht = near.dht(); |
| |
| long start = cntrs.get2().get(); |
| |
| long now = System.currentTimeMillis(); |
| |
| long dur = now - start; |
| |
| if (dur > 20000 && cntrs.get2().compareAndSet(start, System.currentTimeMillis())) { |
| cntrs.get1().set(0); |
| |
| X.println("Stats [tx/sec=" + (cnt / (dur / 1000)) + ", nearSize=" + near.size() + |
| ", dhtSize=" + dht.size() + ']'); |
| } |
| |
| return null; |
| } |
| |
| /** |
| * @param name Timer name. |
| * @param xid XID. |
| * @param key Key. |
| * @param termId Terminal ID. |
| */ |
| private void startTimer(String name, @Nullable IgniteUuid xid, @Nullable String key, String termId) { |
| ConcurrentMap<String, T5<Long, Long, Long, IgniteUuid, Object>> m = timers.get(Thread.currentThread()); |
| |
| if (m == null) { |
| ConcurrentMap<String, T5<Long, Long, Long, IgniteUuid, Object>> old = |
| timers.putIfAbsent(Thread.currentThread(), |
| m = new ConcurrentHashMap<>()); |
| |
| if (old != null) |
| m = old; |
| } |
| |
| T5<Long, Long, Long, IgniteUuid, Object> t = m.get(name); |
| |
| if (t == null) { |
| T5<Long, Long, Long, IgniteUuid, Object> old = m.putIfAbsent(name, |
| t = new T5<>()); |
| |
| if (old != null) |
| t = old; |
| } |
| |
| t.set1(System.currentTimeMillis()); |
| t.set2(0L); |
| t.set4(xid); |
| t.set5(key == null ? null : new AffinityKey<String>(key, termId) {}); |
| } |
| |
| /** |
| * @param name Timer name. |
| */ |
| private void stopTimer(String name) { |
| ConcurrentMap<String, T5<Long, Long, Long, IgniteUuid, Object>> m = timers.get(Thread.currentThread()); |
| |
| T5<Long, Long, Long, IgniteUuid, Object> t = m.get(name); |
| |
| assert t != null; |
| |
| long now = System.currentTimeMillis(); |
| |
| t.set2(now); |
| t.set3(Math.max(t.get3() == null ? 0 : t.get3(), now - t.get1())); |
| t.set4(null); |
| t.set5(null); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| private static void printTimers() throws Exception { |
| //String termId = terminalId(); |
| |
| long now = System.currentTimeMillis(); |
| |
| if (lastPrint.get() + PRINT_FREQ < now && lastPrint.setIfGreater(now)) { |
| Map<String, Long> maxes = new HashMap<>(); |
| |
| Set<AffinityKey<String>> keys = null; |
| |
| for (Map.Entry<Thread, ConcurrentMap<String, T5<Long, Long, Long, IgniteUuid, Object>>> e1 : timers.entrySet()) { |
| for (Map.Entry<String, T5<Long, Long, Long, IgniteUuid, Object>> e2 : e1.getValue().entrySet()) { |
| T5<Long, Long, Long, IgniteUuid, Object> t = e2.getValue(); |
| |
| long start = t.get1(); |
| long end = t.get2(); |
| |
| IgniteUuid xid = t.get4(); |
| |
| long duration = end == 0 ? now - start : end - start; |
| |
| long max = t.get3() == null ? duration : t.get3(); |
| |
| if (duration < 0) |
| duration = now - start; |
| |
| if (duration > MAX) { |
| X.println("Maxed out timer [name=" + e2.getKey() + ", key=" + t.get5() + |
| ", duration=" + duration + ", ongoing=" + (end == 0) + |
| ", thread=" + e1.getKey().getName() + ", xid=" + xid + ']'); |
| |
| AffinityKey<String> key = (AffinityKey<String>)t.get5(); |
| |
| if (key != null) { |
| if (keys == null) |
| keys = new LinkedHashSet<>(); |
| |
| keys.add(key); |
| } |
| } |
| |
| Long cmax = maxes.get(e2.getKey()); |
| |
| if (cmax == null || max > cmax) |
| maxes.put(e2.getKey(), max); |
| |
| t.set3(null); |
| } |
| } |
| |
| if (!F.isEmpty(keys)) { |
| for (Ignite g : G.allGrids()) { |
| if (g.name().contains("server")) { |
| GridNearCacheAdapter<AffinityKey<String>, Object> near = |
| (GridNearCacheAdapter<AffinityKey<String>, Object>)((IgniteKernal)g). |
| <AffinityKey<String>, Object>internalCache(DEFAULT_CACHE_NAME); |
| GridDhtCacheAdapter<AffinityKey<String>, Object> dht = near.dht(); |
| |
| for (AffinityKey<String> k : keys) { |
| GridNearCacheEntry nearEntry = (GridNearCacheEntry)near.peekEx(k); |
| GridDhtCacheEntry dhtEntry = (GridDhtCacheEntry)dht.peekEx(k); |
| |
| X.println("Near entry [igniteInstanceName=" + g.name() + ", key=" + k + ", entry=" + |
| nearEntry); |
| X.println("DHT entry [igniteInstanceName=" + g.name() + ", key=" + k + ", entry=" + |
| dhtEntry); |
| } |
| } |
| } |
| } |
| |
| for (Map.Entry<String, Long> e : maxes.entrySet()) |
| X.println("Timer [name=" + e.getKey() + ", maxTime=" + e.getValue() + ']'); |
| |
| X.println(">>>>"); |
| } |
| } |
| |
| /** |
| * |
| */ |
| private void doWork() { |
| Session ses = new Session(terminalId()); |
| |
| try { |
| try (Transaction tx = ignite.transactions().txStart()) { |
| Request req = new Request(getId()); |
| |
| req.setMessageId(getId()); |
| |
| String key = req.getCacheKey(); |
| |
| startTimer("putRequest", tx.xid(), key, terminalId()); |
| |
| put(req, key, terminalId()); |
| |
| stopTimer("putRequest"); |
| // |
| // for (int i = 0; i < 5; i++) { |
| // Response rsp = new Response(getId()); |
| // |
| // startTimer("putResponse-" + i, tx.xid()); |
| // |
| // put(rsp, rsp.getCacheKey(), terminalId()); |
| // |
| // stopTimer("putResponse-" + i); |
| // } |
| |
| key = ses.getCacheKey(); |
| |
| startTimer("putSession", tx.xid(), key, terminalId()); |
| |
| put(ses, key, terminalId()); |
| |
| stopTimer("putSession"); |
| |
| startTimer("commit", tx.xid(), null, terminalId()); |
| |
| tx.commit(); |
| |
| stopTimer("commit"); |
| } |
| } |
| catch (Exception e) { |
| e.printStackTrace(); |
| } |
| } |
| |
| /** |
| * @return New ID. |
| */ |
| private long getId() { |
| IgniteAtomicSequence seq = ignite.atomicSequence("ID", 0, true); |
| |
| return seq.incrementAndGet(); |
| } |
| |
| /** |
| * @param o Object to put. |
| * @param cacheKey Cache key. |
| * @param terminalId Terminal ID. |
| */ |
| private void put(Object o, String cacheKey, String terminalId) { |
| // CacheProjection<AffinityKey<String>, Object> cache = ((IgniteKernal)ignite).cache(DEFAULT_CACHE_NAME); |
| // |
| // AffinityKey<String> affinityKey = new AffinityKey<>(cacheKey, terminalId); |
| // |
| // Entry<AffinityKey<String>, Object> entry = cache.entry(affinityKey); |
| // |
| // entry.setx(o); |
| } |
| |
| /** |
| * @param cacheKey Cache key. |
| * @param terminalId Terminal ID. |
| * @return Cached object. |
| */ |
| @SuppressWarnings({"RedundantCast"}) |
| private <T> Object get(String cacheKey, String terminalId) { |
| Object key = new AffinityKey<>(cacheKey, terminalId); |
| |
| return (T) ignite.cache(DEFAULT_CACHE_NAME).get(key); |
| } |
| } |
| |
| /** |
| * |
| */ |
| @QueryGroupIndex(name = "msg_tx") |
| private static class Request implements Serializable { |
| /** */ |
| @QuerySqlField(index = true) |
| private Long id; |
| |
| /** */ |
| @QuerySqlField(name = "messageId") |
| @QuerySqlField.Group(name = "msg_tx", order = 3) |
| private long msgId; |
| |
| /** */ |
| @QuerySqlField(name = "transactionId") |
| @QuerySqlField.Group(name = "msg_tx", order = 1) |
| private long txId; |
| |
| /** |
| * @param id Request ID. |
| */ |
| Request(long id) { |
| this.id = id; |
| } |
| |
| /** |
| * @param msgId Message ID. |
| */ |
| public void setMessageId(long msgId) { |
| this.msgId = msgId; |
| } |
| |
| /** |
| * @return Cache key. |
| */ |
| public String getCacheKey() { |
| return "RESPONSE:" + id.toString(); |
| } |
| } |
| |
| /** |
| * |
| */ |
| private static class Response implements Serializable { |
| /** */ |
| @QuerySqlField |
| private Long id; |
| |
| /** */ |
| @QuerySqlField(name = "messageId") |
| private long msgId; |
| |
| /** */ |
| @QuerySqlField(name = "transactionId") |
| private long txId; |
| |
| /** |
| * @param id Response ID. |
| */ |
| Response(long id) { |
| this.id = id; |
| } |
| |
| /** |
| * @return Cache key. |
| */ |
| public String getCacheKey() { |
| return "REQUEST:" + id.toString(); |
| } |
| } |
| |
| /** |
| * |
| */ |
| private static class Session implements Serializable { |
| /** */ |
| @QuerySqlField(index = true) |
| private String terminalId; |
| |
| /** |
| * @param terminalId Terminal ID. |
| */ |
| Session(String terminalId) { |
| this.terminalId = terminalId; |
| } |
| |
| /** |
| * @return Cache key. |
| */ |
| public String getCacheKey() { |
| return "SESSION:" + terminalId; |
| } |
| } |
| |
| /** |
| * |
| */ |
| private static class ResponseTask extends ComputeTaskSplitAdapter<Message, Void> { |
| /** {@inheritDoc} */ |
| @Override protected Collection<? extends ComputeJob> split(int arg0, Message msg) { |
| return Collections.singletonList(new PerfJob(msg)); |
| } |
| |
| /** {@inheritDoc} */ |
| @Nullable @Override public Void reduce(List<ComputeJobResult> results) { |
| return null; |
| } |
| } |
| |
| /** |
| * |
| */ |
| private static class RequestTask extends ComputeTaskSplitAdapter<Message, Void> { |
| /** {@inheritDoc} */ |
| @Override protected Collection<? extends ComputeJob> split(int arg0, Message msg) { |
| return Collections.singletonList(new PerfJob(msg)); |
| } |
| |
| /** {@inheritDoc} */ |
| @Nullable @Override public Void reduce(List<ComputeJobResult> results) { |
| return null; |
| } |
| } |
| } |