Pipe IT: Stabilized the IoTDBPipeClusterIT.testPipeAfterDataRegionLeaderStop (#17725)
diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeClusterIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeClusterIT.java
index ad283d4..3a0aaec 100644
--- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeClusterIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeClusterIT.java
@@ -19,11 +19,13 @@
package org.apache.iotdb.pipe.it.dual.tablemodel.manual.enhanced;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.client.exception.ClientManagerException;
import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
import org.apache.iotdb.commons.cluster.RegionRoleType;
import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
+import org.apache.iotdb.confignode.rpc.thrift.TRegionInfo;
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeInfo;
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq;
import org.apache.iotdb.confignode.rpc.thrift.TShowRegionReq;
@@ -51,6 +53,8 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -59,12 +63,15 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
+import static org.awaitility.Awaitility.await;
import static org.junit.Assert.fail;
@RunWith(IoTDBTestRunner.class)
@Category({MultiClusterIT2DualTableManualEnhanced.class})
public class IoTDBPipeClusterIT extends AbstractPipeTableModelDualManualIT {
+ /** Absolute tolerance used when comparing a scraped syncLag metric value against 0. */
+ private static final double SYNC_LAG_DELTA = 0.001;
+
@Override
@Before
public void setUp() {
@@ -299,41 +306,7 @@
TableModelUtils.insertData("test1", "test1", 100, 200, senderEnv);
- final AtomicInteger leaderPort = new AtomicInteger(-1);
- final TShowRegionResp showRegionResp =
- client.showRegion(new TShowRegionReq().setIsTableModel(true));
- showRegionResp
- .getRegionInfoList()
- .forEach(
- regionInfo -> {
- if (RegionRoleType.Leader.getRoleType().equals(regionInfo.getRoleType())) {
- leaderPort.set(regionInfo.getClientRpcPort());
- }
- });
-
- int leaderIndex = -1;
- for (int i = 0; i < 3; ++i) {
- if (senderEnv.getDataNodeWrapper(i).getPort() == leaderPort.get()) {
- leaderIndex = i;
- try {
- senderEnv.shutdownDataNode(i);
- } catch (final Throwable e) {
- e.printStackTrace();
- return;
- }
- try {
- TimeUnit.SECONDS.sleep(1);
- } catch (final InterruptedException ignored) {
- }
- try {
- senderEnv.startDataNode(i);
- ((AbstractEnv) senderEnv).checkClusterStatusWithoutUnknown();
- } catch (final Throwable e) {
- e.printStackTrace();
- return;
- }
- }
- }
+ final int leaderIndex = restartTableDataRegionLeader(client, "test1");
if (leaderIndex == -1) { // ensure the leader is stopped
fail();
}
@@ -343,6 +316,7 @@
TableModelUtils.insertData("test1", "test1", 200, 300, senderEnv);
TableModelUtils.assertData("test", "test", 0, 300, receiverEnv, handleFailure);
+ waitForTableDataRegionReplicationComplete(Arrays.asList("test", "test1"));
}
try {
@@ -398,6 +372,140 @@
}
}
+  /**
+   * Shuts down and restarts the sender DataNode that leads the lowest-id table-model DataRegion of
+   * {@code database}, then calls {@code checkClusterStatusWithoutUnknown()} on the sender env.
+   *
+   * @param client ConfigNode client used to query the DataRegion leaders
+   * @param database table-model database whose DataRegion leader is restarted
+   * @return index of the restarted DataNode in {@code senderEnv}, or -1 if no leader was found, the
+   *     leader's port matched no sender DataNode, the shutdown/restart failed, or the thread was
+   *     interrupted while waiting between shutdown and restart
+   * @throws TException if the SHOW REGIONS RPC fails
+   */
+  private int restartTableDataRegionLeader(
+      final SyncConfigNodeIServiceClient client, final String database) throws TException {
+    final List<TRegionInfo> leaderRegionInfoList =
+        showTableDataRegionLeaders(Collections.singletonList(database), client);
+    if (leaderRegionInfoList.isEmpty()) {
+      return -1;
+    }
+
+    // Pick the region with the smallest id so the restarted node is chosen deterministically.
+    // The list is non-empty here, so min() always yields an element.
+    final TRegionInfo targetRegionInfo =
+        Collections.min(
+            leaderRegionInfoList,
+            Comparator.comparingInt(
+                (TRegionInfo regionInfo) -> regionInfo.getConsensusGroupId().getId()));
+
+    final int leaderPort = targetRegionInfo.getClientRpcPort();
+    for (int i = 0; i < senderEnv.getDataNodeWrapperList().size(); ++i) {
+      if (senderEnv.getDataNodeWrapper(i).getPort() != leaderPort) {
+        continue;
+      }
+
+      try {
+        senderEnv.shutdownDataNode(i);
+      } catch (final Throwable e) {
+        e.printStackTrace();
+        return -1;
+      }
+
+      // If interrupted here, still restart the DataNode below so the sender cluster is not left
+      // with a stopped node; the interrupt status is restored after the restart attempt.
+      boolean interrupted = false;
+      try {
+        TimeUnit.SECONDS.sleep(1);
+      } catch (final InterruptedException e) {
+        interrupted = true;
+      }
+
+      try {
+        senderEnv.startDataNode(i);
+        ((AbstractEnv) senderEnv).checkClusterStatusWithoutUnknown();
+      } catch (final Throwable e) {
+        e.printStackTrace();
+        return -1;
+      } finally {
+        if (interrupted) {
+          Thread.currentThread().interrupt();
+        }
+      }
+      return interrupted ? -1 : i;
+    }
+    return -1;
+  }
+
+  /**
+   * Polls every 500 ms, for at most 2 minutes, until every table-model DataRegion leader of
+   * {@code databases} reports a syncLag of 0 on its DataNode's {@code /metrics} endpoint. Each
+   * poll opens a fresh connection to the leader ConfigNode and closes it afterwards.
+   *
+   * @param databases table-model databases whose DataRegions must be fully replicated
+   */
+  private void waitForTableDataRegionReplicationComplete(final List<String> databases) {
+    await()
+        .atMost(2, TimeUnit.MINUTES)
+        .pollInterval(500, TimeUnit.MILLISECONDS)
+        .untilAsserted(
+            () -> {
+              try (final SyncConfigNodeIServiceClient configNodeClient =
+                  (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
+                final List<TRegionInfo> leaders =
+                    showTableDataRegionLeaders(databases, configNodeClient);
+                Assert.assertFalse(
+                    "No table DataRegion leader found for databases " + databases,
+                    leaders.isEmpty());
+
+                for (final TRegionInfo leader : leaders) {
+                  final DataNodeWrapper leaderDataNode =
+                      findDataNodeWrapperByPort(leader.getClientRpcPort());
+                  final String url =
+                      "http://" + leaderDataNode.getIp() + ":" + leaderDataNode.getMetricPort()
+                          + "/metrics";
+                  final String content = senderEnv.getUrlContent(url, null);
+                  Assert.assertNotNull(
+                      "Failed to fetch metrics from leader DataNode at " + url, content);
+                  assertSyncLagIsZero(content, buildDataRegionTag(leader), url);
+                }
+              }
+            });
+  }
+
+  /**
+   * Returns the leader replicas of the table-model DataRegions that belong to {@code databases}, as
+   * reported by the ConfigNode's showRegion RPC.
+   *
+   * @param databases databases whose DataRegions are queried
+   * @param client ConfigNode client used for the RPC
+   * @return leader region infos; empty if the response carries no region list
+   * @throws TException if the RPC fails
+   * @throws AssertionError if the response status is not SUCCESS
+   */
+  private List<TRegionInfo> showTableDataRegionLeaders(
+      final List<String> databases, final SyncConfigNodeIServiceClient client) throws TException {
+    final TShowRegionResp showRegionResp =
+        client.showRegion(
+            new TShowRegionReq()
+                .setConsensusGroupType(TConsensusGroupType.DataRegion)
+                .setDatabases(databases)
+                .setIsTableModel(true));
+    // Include the returned status so a rejected request is diagnosable from the test report.
+    Assert.assertEquals(
+        "Failed to show regions: " + showRegionResp.getStatus(),
+        TSStatusCode.SUCCESS_STATUS.getStatusCode(),
+        showRegionResp.getStatus().getCode());
+
+    final List<TRegionInfo> result = new ArrayList<>();
+    // The region list may be left unset (null) on the Thrift response; treat that as "no regions"
+    // instead of throwing an NPE, which Awaitility's untilAsserted would not retry.
+    if (showRegionResp.getRegionInfoList() == null) {
+      return result;
+    }
+    for (final TRegionInfo regionInfo : showRegionResp.getRegionInfoList()) {
+      if (RegionRoleType.Leader.getRoleType().equals(regionInfo.getRoleType())) {
+        result.add(regionInfo);
+      }
+    }
+    return result;
+  }
+
+  /**
+   * Looks up the sender DataNode whose port equals {@code port} (the client RPC port reported by
+   * the ConfigNode).
+   *
+   * @param port client RPC port to match
+   * @return the first matching wrapper in {@code senderEnv}'s DataNode list
+   * @throws AssertionError if no sender DataNode uses {@code port}
+   */
+  private DataNodeWrapper findDataNodeWrapperByPort(final int port) {
+    final DataNodeWrapper match =
+        senderEnv.getDataNodeWrapperList().stream()
+            .filter(wrapper -> wrapper.getPort() == port)
+            .findFirst()
+            .orElse(null);
+    if (match == null) {
+      fail("Failed to find DataNodeWrapper for client rpc port " + port);
+    }
+    return match;
+  }
+
+  /**
+   * Builds the {@code region} label value expected on this DataRegion's {@code iot_consensus}
+   * metric lines, e.g. {@code DataRegion[3]}.
+   */
+  private String buildDataRegionTag(final TRegionInfo regionInfo) {
+    final int regionId = regionInfo.getConsensusGroupId().getId();
+    return new StringBuilder("DataRegion[").append(regionId).append(']').toString();
+  }
+
+  /**
+   * Asserts that the {@code iot_consensus} {@code syncLag} sample for {@code dataRegionTag} in a
+   * Prometheus text-format metrics dump is 0, within {@link #SYNC_LAG_DELTA}. Only the first
+   * matching sample line is checked.
+   *
+   * @param metricsContent raw body fetched from a DataNode's {@code /metrics} endpoint
+   * @param dataRegionTag {@code region} label value, e.g. {@code DataRegion[1]}
+   * @param metricsUrl URL the content came from; used only in failure messages
+   * @throws AssertionError if no matching sample exists, its value cannot be parsed, or it is not 0
+   */
+  private void assertSyncLagIsZero(
+      final String metricsContent, final String dataRegionTag, final String metricsUrl) {
+    final String regionLabel = "region=\"" + dataRegionTag + "\"";
+    for (final String line : metricsContent.split("\\R")) {
+      if (!line.startsWith("iot_consensus{")
+          || !line.contains("type=\"syncLag\"")
+          || !line.contains(regionLabel)) {
+        continue;
+      }
+      final Double syncLag = parseSampleValue(line);
+      Assert.assertNotNull("Malformed syncLag metric line: " + line, syncLag);
+      Assert.assertEquals(
+          "Expected syncLag of " + dataRegionTag + " to be 0 at " + metricsUrl + " but got " + line,
+          0.0,
+          syncLag.doubleValue(),
+          SYNC_LAG_DELTA);
+      return;
+    }
+    fail("No syncLag metric found for " + dataRegionTag + " at " + metricsUrl);
+  }
+
+  /**
+   * Parses the sample value of a Prometheus text-format line ({@code name{labels} value
+   * [timestamp]}).
+   *
+   * @return the value, or {@code null} if it is missing or not parsable by
+   *     {@link Double#parseDouble} (e.g. the exposition format's {@code +Inf})
+   */
+  private static Double parseSampleValue(final String line) {
+    // Take the first token after the label set, so that an optional trailing timestamp is not
+    // mistaken for the value.
+    final String[] tokens = line.substring(line.lastIndexOf('}') + 1).trim().split("\\s+");
+    try {
+      return Double.parseDouble(tokens[0]);
+    } catch (final NumberFormatException e) {
+      return null;
+    }
+  }
+
@Test
public void testPipeAfterRegisterNewDataNode() throws Exception {
final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);