blob: d758af63dc61fba2996c9e6534f0a745298c78df [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.consensus.iot;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.commons.exception.StartupException;
import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.consensus.common.ConsensusGroup;
import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.config.ConsensusConfig;
import org.apache.iotdb.consensus.exception.ConsensusException;
import org.apache.iotdb.consensus.iot.util.TestEntry;
import org.apache.iotdb.consensus.iot.util.TestStateMachine;
import org.apache.ratis.util.FileUtils;
import org.apache.tsfile.utils.PublicBAOS;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
import java.net.ServerSocket;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
public class ReplicateTest {
private static final long CHECK_POINT_GAP = 500;
private final Logger logger = LoggerFactory.getLogger(ReplicateTest.class);
private final ConsensusGroupId gid = new DataRegionId(1);
private static final long timeout = TimeUnit.SECONDS.toMillis(300);
private static final String CONFIGURATION_FILE_NAME = "configuration.dat";
private static final String CONFIGURATION_TMP_FILE_NAME = "configuration.dat.tmp";
private int basePort = 9000;
private final List<Peer> peers =
Arrays.asList(
new Peer(gid, 1, new TEndPoint("127.0.0.1", basePort - 2)),
new Peer(gid, 2, new TEndPoint("127.0.0.1", basePort - 1)),
new Peer(gid, 3, new TEndPoint("127.0.0.1", basePort)));
private final List<File> peersStorage =
Arrays.asList(
new File("target" + java.io.File.separator + "1"),
new File("target" + java.io.File.separator + "2"),
new File("target" + java.io.File.separator + "3"));
private final ConsensusGroup group = new ConsensusGroup(gid, peers);
private final List<IoTConsensus> servers = new ArrayList<>();
private final List<TestStateMachine> stateMachines = new ArrayList<>();
@Before
public void setUp() throws Exception {
for (File file : peersStorage) {
file.mkdirs();
stateMachines.add(new TestStateMachine());
}
initServer();
}
@After
public void tearDown() throws Exception {
stopServer();
for (File file : peersStorage) {
FileUtils.deleteFully(file);
}
}
public void changeConfiguration(int i) {
try (PublicBAOS publicBAOS = new PublicBAOS();
DataOutputStream outputStream = new DataOutputStream(publicBAOS)) {
outputStream.writeInt(this.peers.size());
for (Peer peer : this.peers) {
peer.serialize(outputStream);
}
File storageDir = new File(IoTConsensus.buildPeerDir(peersStorage.get(i), gid));
Path tmpConfigurationPath =
Paths.get(new File(storageDir, CONFIGURATION_TMP_FILE_NAME).getAbsolutePath());
Path configurationPath =
Paths.get(new File(storageDir, CONFIGURATION_FILE_NAME).getAbsolutePath());
Files.write(tmpConfigurationPath, publicBAOS.getBuf());
if (Files.exists(configurationPath)) {
Files.delete(configurationPath);
}
Files.move(tmpConfigurationPath, configurationPath);
} catch (IOException e) {
logger.error("Unexpected error occurs when persisting configuration", e);
}
}
private void initServer() throws IOException {
for (int i = 0; i < peers.size(); i++) {
findPortAvailable(i);
}
for (int i = 0; i < peers.size(); i++) {
int finalI = i;
changeConfiguration(i);
servers.add(
(IoTConsensus)
ConsensusFactory.getConsensusImpl(
ConsensusFactory.IOT_CONSENSUS,
ConsensusConfig.newBuilder()
.setThisNodeId(peers.get(i).getNodeId())
.setThisNode(peers.get(i).getEndpoint())
.setStorageDir(peersStorage.get(i).getAbsolutePath())
.setConsensusGroupType(TConsensusGroupType.DataRegion)
.build(),
groupId -> stateMachines.get(finalI))
.orElseThrow(
() ->
new IllegalArgumentException(
String.format(
ConsensusFactory.CONSTRUCT_FAILED_MSG,
ConsensusFactory.IOT_CONSENSUS))));
servers.get(i).start();
}
}
private void stopServer() {
servers.parallelStream().forEach(IoTConsensus::stop);
servers.clear();
}
/**
* The three nodes use the requests in the queue to replicate the requests to the other two nodes.
*/
@Test
public void replicateUsingQueueTest()
throws IOException, InterruptedException, ConsensusException {
logger.info("Start ReplicateUsingQueueTest");
servers.get(0).createLocalPeer(group.getGroupId(), group.getPeers());
servers.get(1).createLocalPeer(group.getGroupId(), group.getPeers());
servers.get(2).createLocalPeer(group.getGroupId(), group.getPeers());
Assert.assertEquals(0, servers.get(0).getImpl(gid).getSearchIndex());
Assert.assertEquals(0, servers.get(1).getImpl(gid).getSearchIndex());
Assert.assertEquals(0, servers.get(2).getImpl(gid).getSearchIndex());
for (int i = 0; i < CHECK_POINT_GAP; i++) {
servers.get(0).write(gid, new TestEntry(i, peers.get(0)));
servers.get(1).write(gid, new TestEntry(i, peers.get(1)));
servers.get(2).write(gid, new TestEntry(i, peers.get(2)));
Assert.assertEquals(i + 1, servers.get(0).getImpl(gid).getSearchIndex());
Assert.assertEquals(i + 1, servers.get(1).getImpl(gid).getSearchIndex());
Assert.assertEquals(i + 1, servers.get(2).getImpl(gid).getSearchIndex());
}
for (int i = 0; i < 3; i++) {
long start = System.currentTimeMillis();
while (servers.get(i).getImpl(gid).getMinSyncIndex() < CHECK_POINT_GAP) {
long current = System.currentTimeMillis();
if ((current - start) > 60 * 1000) {
Assert.fail("Unable to replicate entries");
}
Thread.sleep(100);
}
}
Assert.assertEquals(CHECK_POINT_GAP, servers.get(0).getImpl(gid).getMinSyncIndex());
Assert.assertEquals(CHECK_POINT_GAP, servers.get(1).getImpl(gid).getMinSyncIndex());
Assert.assertEquals(CHECK_POINT_GAP, servers.get(2).getImpl(gid).getMinSyncIndex());
Assert.assertEquals(CHECK_POINT_GAP * 3, stateMachines.get(0).getRequestSet().size());
Assert.assertEquals(CHECK_POINT_GAP * 3, stateMachines.get(1).getRequestSet().size());
Assert.assertEquals(CHECK_POINT_GAP * 3, stateMachines.get(2).getRequestSet().size());
Assert.assertEquals(stateMachines.get(0).getData(), stateMachines.get(1).getData());
Assert.assertEquals(stateMachines.get(2).getData(), stateMachines.get(1).getData());
try {
stopServer();
initServer();
Assert.assertEquals(peers, servers.get(0).getImpl(gid).getConfiguration());
Assert.assertEquals(peers, servers.get(1).getImpl(gid).getConfiguration());
Assert.assertEquals(peers, servers.get(2).getImpl(gid).getConfiguration());
Assert.assertEquals(CHECK_POINT_GAP, servers.get(0).getImpl(gid).getSearchIndex());
Assert.assertEquals(CHECK_POINT_GAP, servers.get(1).getImpl(gid).getSearchIndex());
Assert.assertEquals(CHECK_POINT_GAP, servers.get(2).getImpl(gid).getSearchIndex());
for (int i = 0; i < 3; i++) {
long start = System.currentTimeMillis();
while (servers.get(i).getImpl(gid).getMinSyncIndex() < CHECK_POINT_GAP) {
long current = System.currentTimeMillis();
if ((current - start) > 60 * 1000) {
Assert.fail("Unable to recover entries");
}
Thread.sleep(100);
}
}
Assert.assertEquals(CHECK_POINT_GAP, servers.get(0).getImpl(gid).getMinSyncIndex());
Assert.assertEquals(CHECK_POINT_GAP, servers.get(1).getImpl(gid).getMinSyncIndex());
Assert.assertEquals(CHECK_POINT_GAP, servers.get(2).getImpl(gid).getMinSyncIndex());
} catch (IOException e) {
if (e.getCause() instanceof StartupException) {
// just succeed when can not bind socket
logger.info("Can not start IoTConsensus because", e);
} else {
logger.error("Failed because", e);
Assert.fail("Failed because " + e.getMessage());
}
}
}
/**
* First, suspend one node to test that the request replication between the two alive nodes is ok,
* then restart all nodes to lose state in the queue, and test using WAL replication to make all
* nodes finally consistent.
*/
@Test
public void replicateUsingWALTest() throws IOException, InterruptedException, ConsensusException {
logger.info("Start ReplicateUsingWALTest");
servers.get(0).createLocalPeer(group.getGroupId(), group.getPeers());
servers.get(1).createLocalPeer(group.getGroupId(), group.getPeers());
Assert.assertEquals(0, servers.get(0).getImpl(gid).getSearchIndex());
Assert.assertEquals(0, servers.get(1).getImpl(gid).getSearchIndex());
for (int i = 0; i < CHECK_POINT_GAP; i++) {
servers.get(0).write(gid, new TestEntry(i, peers.get(0)));
servers.get(1).write(gid, new TestEntry(i, peers.get(1)));
Assert.assertEquals(i + 1, servers.get(0).getImpl(gid).getSearchIndex());
Assert.assertEquals(i + 1, servers.get(1).getImpl(gid).getSearchIndex());
}
Assert.assertEquals(0, servers.get(0).getImpl(gid).getMinSyncIndex());
Assert.assertEquals(0, servers.get(1).getImpl(gid).getMinSyncIndex());
try {
stopServer();
initServer();
servers.get(2).createLocalPeer(group.getGroupId(), group.getPeers());
Assert.assertEquals(peers, servers.get(0).getImpl(gid).getConfiguration());
Assert.assertEquals(peers, servers.get(1).getImpl(gid).getConfiguration());
Assert.assertEquals(peers, servers.get(2).getImpl(gid).getConfiguration());
Assert.assertEquals(CHECK_POINT_GAP, servers.get(0).getImpl(gid).getSearchIndex());
Assert.assertEquals(CHECK_POINT_GAP, servers.get(1).getImpl(gid).getSearchIndex());
Assert.assertEquals(0, servers.get(2).getImpl(gid).getSearchIndex());
for (int i = 0; i < 2; i++) {
long start = System.currentTimeMillis();
// should be [CHECK_POINT_GAP, CHECK_POINT_GAP * 2 - 1] after
// replicating all entries
while (servers.get(i).getImpl(gid).getMinSyncIndex() < CHECK_POINT_GAP) {
long current = System.currentTimeMillis();
if ((current - start) > 60 * 1000) {
logger.error("{}", servers.get(i).getImpl(gid).getMinSyncIndex());
Assert.fail("Unable to replicate entries");
}
Thread.sleep(100);
}
}
Assert.assertEquals(CHECK_POINT_GAP * 2, stateMachines.get(0).getRequestSet().size());
Assert.assertEquals(CHECK_POINT_GAP * 2, stateMachines.get(1).getRequestSet().size());
Assert.assertEquals(CHECK_POINT_GAP * 2, stateMachines.get(2).getRequestSet().size());
Assert.assertEquals(stateMachines.get(0).getData(), stateMachines.get(1).getData());
Assert.assertEquals(stateMachines.get(2).getData(), stateMachines.get(1).getData());
} catch (IOException e) {
if (e.getCause() instanceof StartupException) {
// just succeed when can not bind socket
logger.info("Can not start IoTConsensus because", e);
} else {
logger.error("Failed because", e);
Assert.fail("Failed because " + e.getMessage());
}
}
}
@Test
public void parsingAndConstructIDTest() throws Exception {
logger.info("Start ParsingAndConstructIDTest");
servers.get(0).createLocalPeer(group.getGroupId(), group.getPeers());
for (int i = 0; i < CHECK_POINT_GAP; i++) {
servers.get(0).write(gid, new TestEntry(i, peers.get(0)));
}
List<ConsensusGroupId> ids = servers.get(0).getAllConsensusGroupIdsWithoutStarting();
Assert.assertEquals(1, ids.size());
Assert.assertEquals(gid, ids.get(0));
String regionDir = servers.get(0).getRegionDirFromConsensusGroupId(gid);
try {
File regionDirFile = new File(regionDir);
Assert.assertTrue(regionDirFile.exists());
} catch (Exception e) {
Assert.fail();
}
}
private void findPortAvailable(int i) {
long start = System.currentTimeMillis();
while (System.currentTimeMillis() - start < timeout) {
try (ServerSocket ignored = new ServerSocket(this.peers.get(i).getEndpoint().port)) {
// success
return;
} catch (IOException e) {
// Port is already in use, wait and retry
this.peers.set(i, new Peer(gid, i + 1, new TEndPoint("127.0.0.1", this.basePort)));
logger.info("try port {} for node {}.", this.basePort++, i + 1);
try {
Thread.sleep(50); // Wait for 1 second before retrying
} catch (InterruptedException ex) {
// Handle the interruption if needed
}
}
}
Assert.fail(String.format("can not find port for node %d after 300s", i + 1));
}
}