Update flow balancer: make follower load balancing configurable (new ClusterConfig options), fix log-size accounting in LogDispatcher batching, and add burst mode to ExprBench
diff --git a/cluster/collect-log-dc.sh b/cluster/collect-log-dc.sh new file mode 100644 index 0000000..b4339e0 --- /dev/null +++ b/cluster/collect-log-dc.sh
@@ -0,0 +1,13 @@ +src_path=/home/jt/iotdb_expr_vg/logs/* + +ips=(dc16 dc17 dc18) +#ips=(dc11 dc12 dc13 dc14 dc11 dc12) +target_path=/d/CodeRepo/iotdb/cluster/target/logs + +mkdir $target_path +rm $target_path/* +for ip in ${ips[*]} + do + mkdir $target_path/$ip + scp -r jt@$ip:$src_path $target_path/$ip + done \ No newline at end of file
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java index 8c897e5..d26f0e4 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
@@ -225,6 +225,10 @@ private int flowMonitorMaxWindowSize = 1000; private long flowMonitorWindowInterval = 1000; + private boolean useFollowerLoadBalance = false; + private int followerLoadBalanceWindowsToUse = 3; + private double followerLoadBalanceOverestimateFactor = 1.1; + private int logDispatcherBatchSize = 10; /** * create a clusterConfig class. The internalIP will be set according to the server's hostname. If @@ -713,4 +717,37 @@ public long getFlowMonitorWindowInterval() { return flowMonitorWindowInterval; } + + public boolean isUseFollowerLoadBalance() { + return useFollowerLoadBalance; + } + + public void setUseFollowerLoadBalance(boolean useFollowerLoadBalance) { + this.useFollowerLoadBalance = useFollowerLoadBalance; + } + + public int getFollowerLoadBalanceWindowsToUse() { + return followerLoadBalanceWindowsToUse; + } + + public void setFollowerLoadBalanceWindowsToUse(int followerLoadBalanceWindowsToUse) { + this.followerLoadBalanceWindowsToUse = followerLoadBalanceWindowsToUse; + } + + public double getFollowerLoadBalanceOverestimateFactor() { + return followerLoadBalanceOverestimateFactor; + } + + public void setFollowerLoadBalanceOverestimateFactor( + double followerLoadBalanceOverestimateFactor) { + this.followerLoadBalanceOverestimateFactor = followerLoadBalanceOverestimateFactor; + } + + public int getLogDispatcherBatchSize() { + return logDispatcherBatchSize; + } + + public void setLogDispatcherBatchSize(int logDispatcherBatchSize) { + this.logDispatcherBatchSize = logDispatcherBatchSize; + } }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java index 42a5b80..6b1c757 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
@@ -397,6 +397,28 @@ properties.getProperty( "enable_instrumenting", String.valueOf(config.isEnableInstrumenting())))); + config.setUseFollowerLoadBalance( + Boolean.parseBoolean( + properties.getProperty( + "use_follower_load_balance", String.valueOf(config.isUseFollowerLoadBalance())))); + + config.setFollowerLoadBalanceWindowsToUse( + Integer.parseInt( + properties.getProperty( + "follower_load_balance_windows_to_use", + String.valueOf(config.getFollowerLoadBalanceWindowsToUse())))); + + config.setFollowerLoadBalanceOverestimateFactor( + Double.parseDouble( + properties.getProperty( + "follower_load_balance_overestimate_factor", + String.valueOf(config.getFollowerLoadBalanceOverestimateFactor())))); + + config.setLogDispatcherBatchSize( + Integer.parseInt( + properties.getProperty( + "log_dispatcher_batch_size", String.valueOf(config.getLogDispatcherBatchSize())))); + String consistencyLevel = properties.getProperty("consistency_level"); if (consistencyLevel != null) { config.setConsistencyLevel(ConsistencyLevel.getConsistencyLevel(consistencyLevel));
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprBench.java b/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprBench.java index 8042a03..b14ece5 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprBench.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprBench.java
@@ -37,10 +37,14 @@ import java.io.IOException; import java.nio.ByteBuffer; +import java.text.DateFormat; +import java.text.SimpleDateFormat; import java.util.ArrayList; +import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -52,11 +56,15 @@ private AtomicLong requestCounter = new AtomicLong(); private AtomicLong latencySum = new AtomicLong(); + private AtomicLong burstRequestCounter = new AtomicLong(); + private AtomicLong burstLatencySum = new AtomicLong(); private long maxLatency = 0; private int threadNum = 64; private int workloadSize = 64 * 1024; private int printInterval = 1000; private ClientManager clientPool; + private long maxRunningSecond; + private long burstInterval; private int maxRequestNum; private ExecutorService pool = Executors.newCachedThreadPool(); private List<Node> nodeList = new ArrayList<>(); @@ -65,12 +73,16 @@ private List<EndPoint> endPoints = new ArrayList<>(); private Map<EndPoint, RateLimiter> rateLimiterMap = new ConcurrentHashMap<>(); private Map<EndPoint, Statistic> latencyMap = new ConcurrentHashMap<>(); + private long startTime; + private volatile boolean duringBurst = false; + private DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); public ExprBench(Node target) { clientPool = new ClientManager(false, Type.MetaGroupClient); } private static class EndPoint { + private Node node; private int raftId; @@ -86,6 +98,7 @@ } private static class Statistic { + private AtomicLong sum = new AtomicLong(); private AtomicLong cnt = new AtomicLong(); @@ -100,91 +113,142 @@ } } + private void benchmarkTask(int taskId) { + int endPointIdx = taskId % endPoints.size(); + Client client = null; + + ExecutNonQueryReq request = new ExecutNonQueryReq(); + DummyPlan plan = new DummyPlan(); + plan.setWorkload(new byte[workloadSize]); 
+ plan.setNeedForward(true); + + ByteBuffer byteBuffer = ByteBuffer.allocate(workloadSize + 4096); + Map<EndPoint, Node> endPointLeaderMap = new HashMap<>(); + + Node target = null; + long currRequsetNum = -1; + while (true) { + EndPoint endPoint = endPoints.get(endPointIdx); + RateLimiter rateLimiter = rateLimiterMap.get(endPoint); + if (rateLimiter != null) { + rateLimiter.acquire(1); + } + + target = endPointLeaderMap.getOrDefault(endPoint, endPoint.node); + int raftId = endPoint.raftId; + plan.setGroupIdentifier(ClusterUtils.nodeToString(endPoint.node) + "#" + raftId); + + try { + client = clientPool.borrowSyncClient(target, ClientCategory.META); + } catch (IOException e) { + e.printStackTrace(); + } + + byteBuffer.clear(); + plan.serialize(byteBuffer); + byteBuffer.flip(); + request.planBytes = byteBuffer; + request.setPlanBytesIsSet(true); + + long reqLatency = System.nanoTime(); + try { + TSStatus status = client.executeNonQueryPlan(request); + clientPool.returnSyncClient(client, target, ClientCategory.META); + if (status.isSetRedirectNode()) { + Node leader = new Node().setInternalIp(status.redirectNode.ip).setMetaPort(8880); + endPointLeaderMap.put(endPoint, leader); + logger.debug("Leader of {} is changed to {}", endPoint, leader); + } + + currRequsetNum = requestCounter.incrementAndGet(); + if (currRequsetNum > threadNum * 10L) { + reqLatency = System.nanoTime() - reqLatency; + maxLatency = Math.max(maxLatency, reqLatency); + latencySum.addAndGet(reqLatency); + latencyMap.get(endPoint).add(reqLatency); + if (duringBurst) { + burstRequestCounter.incrementAndGet(); + burstLatencySum.addAndGet(reqLatency); + } + } + } catch (TException e) { + e.printStackTrace(); + } + + long elapsedTime = System.currentTimeMillis() - startTime; + if (currRequsetNum % printInterval == 0) { + System.out.println( + String.format( + "%s %d %d %f(%f) %f %f", + dateFormat.format(new Date(System.currentTimeMillis())), + elapsedTime, + currRequsetNum, + (currRequsetNum + 0.0) / 
elapsedTime, + currRequsetNum * workloadSize / (1024.0 * 1024.0) / elapsedTime, + maxLatency / 1000.0, + (latencySum.get() + 0.0) / currRequsetNum)); + System.out.println(latencyMap); + } + + if (currRequsetNum >= maxRequestNum || elapsedTime / 1000 >= maxRunningSecond) { + break; + } + } + } + + private void insertBurst() { + long burstStart = maxRunningSecond / 2 - burstInterval / 2; + long burstEnd = maxRunningSecond / 2 + burstInterval / 2; + + long elapsedTime = (System.currentTimeMillis() - startTime) / 1000; + while (elapsedTime < burstStart) { + try { + Thread.sleep(1000); + elapsedTime = (System.currentTimeMillis() - startTime) / 1000; + } catch (InterruptedException e) { + logger.warn("Unexpected interruption"); + } + } + duringBurst = true; + System.out.printf("Burst starts"); + for (Entry<EndPoint, RateLimiter> endPointRateLimiterEntry : rateLimiterMap.entrySet()) { + RateLimiter rateLimiter = endPointRateLimiterEntry.getValue(); + rateLimiter.setRate(rateLimiter.getRate() * 2); + } + + while (elapsedTime < burstEnd) { + try { + Thread.sleep(1000); + elapsedTime = (System.currentTimeMillis() - startTime) / 1000; + } catch (InterruptedException e) { + logger.warn("Unexpected interruption"); + } + } + duringBurst = false; + System.out.printf("Burst ends"); + for (Entry<EndPoint, RateLimiter> endPointRateLimiterEntry : rateLimiterMap.entrySet()) { + RateLimiter rateLimiter = endPointRateLimiterEntry.getValue(); + rateLimiter.setRate(rateLimiter.getRate() / 2); + } + } + public void benchmark() { - long startTime = System.currentTimeMillis(); + startTime = System.currentTimeMillis(); for (int i = 0; i < threadNum; i++) { - int finalI = i; - pool.submit( - () -> { - int endPointIdx = finalI % endPoints.size(); - Client client = null; - - ExecutNonQueryReq request = new ExecutNonQueryReq(); - DummyPlan plan = new DummyPlan(); - plan.setWorkload(new byte[workloadSize]); - plan.setNeedForward(true); - - ByteBuffer byteBuffer = ByteBuffer.allocate(workloadSize + 
4096); - Map<EndPoint, Node> endPointLeaderMap = new HashMap<>(); - - Node target = null; - long currRequsetNum = -1; - while (true) { - - EndPoint endPoint = endPoints.get(endPointIdx); - RateLimiter rateLimiter = rateLimiterMap.get(endPoint); - if (rateLimiter != null) { - rateLimiter.acquire(1); - } - - target = endPointLeaderMap.getOrDefault(endPoint, endPoint.node); - int raftId = endPoint.raftId; - plan.setGroupIdentifier(ClusterUtils.nodeToString(endPoint.node) + "#" + raftId); - - try { - client = clientPool.borrowSyncClient(target, ClientCategory.META); - } catch (IOException e) { - e.printStackTrace(); - } - - byteBuffer.clear(); - plan.serialize(byteBuffer); - byteBuffer.flip(); - request.planBytes = byteBuffer; - request.setPlanBytesIsSet(true); - - long reqLatency = System.nanoTime(); - try { - TSStatus status = client.executeNonQueryPlan(request); - clientPool.returnSyncClient(client, target, ClientCategory.META); - if (status.isSetRedirectNode()) { - Node leader = new Node().setInternalIp(status.redirectNode.ip).setMetaPort(8880); - endPointLeaderMap.put(endPoint, leader); - logger.debug("Leader of {} is changed to {}", endPoint, leader); - } - - currRequsetNum = requestCounter.incrementAndGet(); - if (currRequsetNum > threadNum * 10) { - reqLatency = System.nanoTime() - reqLatency; - maxLatency = Math.max(maxLatency, reqLatency); - latencySum.addAndGet(reqLatency); - latencyMap.get(endPoint).add(reqLatency); - } - } catch (TException e) { - e.printStackTrace(); - } - - if (currRequsetNum % printInterval == 0) { - long elapsedTime = System.currentTimeMillis() - startTime; - System.out.println( - String.format( - "%d %d %f(%f) %f %f", - elapsedTime, - currRequsetNum, - (currRequsetNum + 0.0) / elapsedTime, - currRequsetNum * workloadSize / (1024.0 * 1024.0) / elapsedTime, - maxLatency / 1000.0, - (latencySum.get() + 0.0) / currRequsetNum)); - System.out.println(latencyMap); - } - - if (currRequsetNum >= maxRequestNum) { - break; - } - } - }); + int 
taskId = i; + pool.submit(() -> benchmarkTask(taskId)); } pool.shutdown(); + if (burstInterval > 0) { + insertBurst(); + } + while (!pool.isTerminated()) { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + logger.warn("Unexpected interruption"); + } + } } public void setMaxRequestNum(int maxRequestNum) { @@ -195,11 +259,13 @@ ClusterDescriptor.getInstance().getConfig().setMaxClientPerNodePerMember(50000); Node target = new Node(); ExprBench bench = new ExprBench(target); - bench.maxRequestNum = Integer.parseInt(args[0]); - bench.threadNum = Integer.parseInt(args[1]); - bench.workloadSize = Integer.parseInt(args[2]) * 1024; - bench.printInterval = Integer.parseInt(args[3]); - String[] nodesSplit = args[4].split(","); + bench.maxRunningSecond = Integer.parseInt(args[0]); + bench.burstInterval = Integer.parseInt(args[1]); + bench.maxRequestNum = Integer.parseInt(args[2]); + bench.threadNum = Integer.parseInt(args[3]); + bench.workloadSize = Integer.parseInt(args[4]) * 1024; + bench.printInterval = Integer.parseInt(args[5]); + String[] nodesSplit = args[6].split(","); for (String s : nodesSplit) { String[] nodeSplit = s.split(":"); Node node = new Node(); @@ -207,13 +273,13 @@ node.setMetaPort(Integer.parseInt(nodeSplit[1])); bench.nodeList.add(node); } - String[] raftFactorSplit = args[5].split(","); + String[] raftFactorSplit = args[7].split(","); bench.raftFactors = new int[raftFactorSplit.length]; for (int i = 0; i < raftFactorSplit.length; i++) { bench.raftFactors[i] = Integer.parseInt(raftFactorSplit[i]); } - if (args.length >= 7) { - String[] ratesSplit = args[6].split(","); + if (args.length >= 9) { + String[] ratesSplit = args[8].split(","); bench.rateLimits = new int[ratesSplit.length]; for (int i = 0; i < ratesSplit.length; i++) { bench.rateLimits[i] = Integer.parseInt(ratesSplit[i]); @@ -236,5 +302,14 @@ bench.benchmark(); System.out.println(bench.latencyMap); + if (bench.burstInterval > 0) { + long burstRequest = 
bench.burstRequestCounter.get(); + long burstLatencySum = bench.burstLatencySum.get(); + double burstAvgLatency = burstLatencySum * 1.0 / burstRequest; + double burstThroughput = burstRequest * 1.0 / bench.burstInterval; + System.out.printf( + "Statistics during burst: num request %d, throughput %f, latency %f", + burstRequest, burstThroughput, burstAvgLatency); + } } }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/expr/flowcontrol/FlowBalancer.java b/cluster/src/main/java/org/apache/iotdb/cluster/expr/flowcontrol/FlowBalancer.java index 9cb507e..3afaa2b 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/expr/flowcontrol/FlowBalancer.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/expr/flowcontrol/FlowBalancer.java
@@ -19,6 +19,7 @@ package org.apache.iotdb.cluster.expr.flowcontrol; +import org.apache.iotdb.cluster.config.ClusterDescriptor; import org.apache.iotdb.cluster.log.LogDispatcher; import org.apache.iotdb.cluster.log.LogDispatcher.SendLogRequest; import org.apache.iotdb.cluster.rpc.thrift.Node; @@ -42,7 +43,10 @@ private static final Logger logger = LoggerFactory.getLogger(FlowBalancer.class); private double maxFlow = 900_000_000; private double minFlow = 10_000_000; - private int windowsToUse = 3; + private int windowsToUse = + ClusterDescriptor.getInstance().getConfig().getFollowerLoadBalanceWindowsToUse(); + private double overestimateFactor = + ClusterDescriptor.getInstance().getConfig().getFollowerLoadBalanceOverestimateFactor(); private int flowBalanceIntervalMS = 1000; private FlowMonitorManager flowMonitorManager = FlowMonitorManager.INSTANCE; private LogDispatcher logDispatcher; @@ -81,7 +85,7 @@ int followerNum = nodeNum - 1; double thisNodeFlow = flowMonitorManager.averageFlow(member.getThisNode(), windowsToUse); - double assumedFlow = thisNodeFlow * 1.1; + double assumedFlow = thisNodeFlow * overestimateFactor; logger.info("Flow of this node: {}", thisNodeFlow); Map<Node, BlockingQueue<SendLogRequest>> nodesLogQueuesMap = logDispatcher.getNodesLogQueuesMap(); @@ -108,7 +112,7 @@ int i = 0; for (; i < quorumFollowerNum; i++) { Node node = followers.get(i); - nodesRate.put(node, flowToQuorum); + nodesRate.put(node, maxFlow); remainingFlow -= flowToQuorum; } double flowToRemaining = remainingFlow / (followerNum - quorumFollowerNum);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java index 3086444..dd7dfca 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
@@ -106,8 +106,6 @@ } void createQueueAndBindingThreads() { - double baseRate = 300_000_000.0; - int i = 1; for (Node node : member.getAllNodes()) { if (!ClusterUtils.isNodeEquals(node, member.getThisNode())) { BlockingQueue<SendLogRequest> logBlockingQueue; @@ -117,13 +115,11 @@ nodesLogQueuesMap.put(node, logBlockingQueue); FlowMonitorManager.INSTANCE.register(node); nodesRateLimiter.put(node, RateLimiter.create(Double.MAX_VALUE)); - nodesRate.put(node, baseRate * i); - i += 100; } } updateRateLimiter(); - for (i = 0; i < bindingThreadNum; i++) { + for (int i = 0; i < bindingThreadNum; i++) { for (Entry<Node, BlockingQueue<SendLogRequest>> pair : nodesLogQueuesMap.entrySet()) { executorServices .computeIfAbsent( @@ -420,20 +416,24 @@ currBatch.get(0).getVotingLog().getLog().getCurrLogIndex(), currBatch.get(currBatch.size() - 1).getVotingLog().getLog().getCurrLogIndex()); while (logIndex < currBatch.size()) { - long logSize = IoTDBDescriptor.getInstance().getConfig().getThriftMaxFrameSize(); + long logSize = 0; + long logSizeLimit = IoTDBDescriptor.getInstance().getConfig().getThriftMaxFrameSize(); List<ByteBuffer> logList = new ArrayList<>(); int prevIndex = logIndex; for (; logIndex < currBatch.size(); logIndex++) { long curSize = currBatch.get(logIndex).getAppendEntryRequest().entry.array().length; - if (logSize - curSize <= IoTDBConstant.LEFT_SIZE_IN_REQUEST) { + if (logSizeLimit - curSize - logSize <= IoTDBConstant.LEFT_SIZE_IN_REQUEST) { break; } - logSize -= curSize; + logSize += curSize; logList.add(currBatch.get(logIndex).getAppendEntryRequest().entry); } AppendEntriesRequest appendEntriesRequest = prepareRequest(logList, currBatch, prevIndex); + FlowMonitorManager.INSTANCE.report(receiver, logSize); + nodesRateLimiter.get(receiver).acquire((int) logSize); + if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) { appendEntriesAsync(logList, appendEntriesRequest, currBatch.subList(prevIndex, logIndex)); } else {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java index 6bd6f6a..6fa543f 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -331,8 +331,10 @@ startBackGroundThreads(); setSkipElection(false); FlowMonitorManager.INSTANCE.register(thisNode); - flowBalancer = new FlowBalancer(logDispatcher, this); - flowBalancer.start(); + if (config.isUseFollowerLoadBalance()) { + flowBalancer = new FlowBalancer(logDispatcher, this); + flowBalancer.start(); + } logger.info("{} started", name); } @@ -456,7 +458,9 @@ heartBeatService = null; appendLogThreadPool = null; - flowBalancer.stop(); + if (flowBalancer != null) { + flowBalancer.stop(); + } logger.info("Member {} stopped", name); }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java index 2d2b860..f127a28 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java
@@ -498,8 +498,10 @@ private static void printTo(Statistic currNode, StringBuilder out) { if (currNode != Statistic.ROOT && currNode.valid) { - indent(out, currNode.level); - out.append(currNode).append("\n"); + if (currNode.counter.get() != 0) { + indent(out, currNode.level); + out.append(currNode).append("\n"); + } } for (Statistic child : currNode.children) { printTo(child, out);