blob: 9fbbb5e6280d944c7b1afccccab608222e34afd2 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.apache.hugegraph.core;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.LockSupport;
import org.apache.hugegraph.masterelection.ClusterRole;
import org.apache.hugegraph.masterelection.ClusterRoleStore;
import org.apache.hugegraph.masterelection.Config;
import org.apache.hugegraph.masterelection.RoleElectionStateMachine;
import org.apache.hugegraph.masterelection.StandardRoleElectionStateMachine;
import org.apache.hugegraph.masterelection.StateMachineCallback;
import org.apache.hugegraph.masterelection.StateMachineContext;
import org.apache.hugegraph.testutil.Assert;
import org.apache.hugegraph.testutil.Utils;
import org.junit.Test;
public class RoleElectionStateMachineTest {
public static class LogEntry {
Integer epoch;
String node;
Role role;
enum Role {
master,
worker,
candidate,
abdication,
unknown
}
public LogEntry(Integer epoch, String node, Role role) {
this.epoch = epoch;
this.node = node;
this.role = role;
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (!(obj instanceof LogEntry)) {
return false;
}
LogEntry logEntry = (LogEntry) obj;
return Objects.equals(epoch, logEntry.epoch) &&
Objects.equals(node, logEntry.node) && role == logEntry.role;
}
@Override
public int hashCode() {
return Objects.hash(epoch, node, role);
}
@Override
public String toString() {
return "LogEntry{" +
"epoch=" + epoch +
", node='" + node + '\'' +
", role=" + role +
'}';
}
}
private static class TestConfig implements Config {
String node;
public TestConfig(String node) {
this.node = node;
}
@Override
public String node() {
return this.node;
}
@Override
public String url() {
return "http://127.0.0.1:8080";
}
@Override
public int exceedsFailCount() {
return 2;
}
@Override
public long randomTimeoutMillisecond() {
return 400;
}
@Override
public long heartBeatIntervalSecond() {
return 1;
}
@Override
public int masterDeadTimes() {
return 5;
}
@Override
public long baseTimeoutMillisecond() {
return 100;
}
}
@Test
public void testStateMachine() throws InterruptedException {
final CountDownLatch stop = new CountDownLatch(4);
final int MAX_COUNT = 200;
final List<LogEntry> logRecords = Collections.synchronizedList(new ArrayList<>(MAX_COUNT));
final List<String> masterNodes = Collections.synchronizedList(new ArrayList<>(MAX_COUNT));
final StateMachineCallback callback = new StateMachineCallback() {
@Override
public void onAsRoleMaster(StateMachineContext context) {
Integer epochId = context.epoch();
String node = context.node();
logRecords.add(new LogEntry(epochId, node, LogEntry.Role.master));
if (logRecords.size() > MAX_COUNT) {
context.stateMachine().shutdown();
}
Utils.println("master node: " + node);
masterNodes.add(node);
}
@Override
public void onAsRoleWorker(StateMachineContext context) {
Integer epochId = context.epoch();
String node = context.node();
logRecords.add(new LogEntry(epochId, node, LogEntry.Role.worker));
if (logRecords.size() > MAX_COUNT) {
context.stateMachine().shutdown();
}
}
@Override
public void onAsRoleCandidate(StateMachineContext context) {
Integer epochId = context.epoch();
String node = context.node();
logRecords.add(new LogEntry(epochId, node, LogEntry.Role.candidate));
if (logRecords.size() > MAX_COUNT) {
context.stateMachine().shutdown();
}
}
@Override
public void unknown(StateMachineContext context) {
Integer epochId = context.epoch();
String node = context.node();
logRecords.add(new LogEntry(epochId, node, LogEntry.Role.unknown));
if (logRecords.size() > MAX_COUNT) {
context.stateMachine().shutdown();
}
}
@Override
public void onAsRoleAbdication(StateMachineContext context) {
Integer epochId = context.epoch();
String node = context.node();
logRecords.add(new LogEntry(epochId, node, LogEntry.Role.abdication));
if (logRecords.size() > MAX_COUNT) {
context.stateMachine().shutdown();
}
}
@Override
public void error(StateMachineContext context, Throwable e) {
Utils.println("state machine error: node " +
context.node() +
" message " + e.getMessage());
}
};
final List<ClusterRole> clusterRoleLogs = Collections.synchronizedList(new ArrayList<>(100));
final ClusterRoleStore clusterRoleStore = new ClusterRoleStore() {
volatile int epoch = 0;
final Map<Integer, ClusterRole> data = new ConcurrentHashMap<>();
ClusterRole copy(ClusterRole clusterRole) {
if (clusterRole == null) {
return null;
}
return new ClusterRole(clusterRole.node(), clusterRole.url(),
clusterRole.epoch(), clusterRole.clock());
}
@Override
public boolean updateIfNodePresent(ClusterRole clusterRole) {
if (clusterRole.epoch() < this.epoch) {
return false;
}
ClusterRole copy = this.copy(clusterRole);
ClusterRole newClusterRole = data.compute(copy.epoch(), (key, value) -> {
if (copy.epoch() > this.epoch) {
this.epoch = copy.epoch();
Assert.assertNull(value);
clusterRoleLogs.add(copy);
Utils.println("The node " + copy + " become new master:");
return copy;
}
Assert.assertEquals(value.epoch(), copy.epoch());
if (Objects.equals(value.node(), copy.node()) &&
value.clock() <= copy.clock()) {
Utils.println("The master node " + copy + " keep heartbeat");
clusterRoleLogs.add(copy);
if (value.clock() == copy.clock()) {
Assert.fail("Clock must increase when same epoch and node id");
}
return copy;
}
return value;
});
return Objects.equals(newClusterRole, copy);
}
@Override
public Optional<ClusterRole> query() {
return Optional.ofNullable(this.copy(this.data.get(this.epoch)));
}
};
RoleElectionStateMachine[] machines = new RoleElectionStateMachine[4];
Thread node1 = new Thread(() -> {
Config config = new TestConfig("1");
RoleElectionStateMachine stateMachine =
new StandardRoleElectionStateMachine(config, clusterRoleStore);
machines[1] = stateMachine;
stateMachine.apply(callback);
stop.countDown();
});
Thread node2 = new Thread(() -> {
Config config = new TestConfig("2");
RoleElectionStateMachine stateMachine =
new StandardRoleElectionStateMachine(config, clusterRoleStore);
machines[2] = stateMachine;
stateMachine.apply(callback);
stop.countDown();
});
Thread node3 = new Thread(() -> {
Config config = new TestConfig("3");
RoleElectionStateMachine stateMachine =
new StandardRoleElectionStateMachine(config, clusterRoleStore);
machines[3] = stateMachine;
stateMachine.apply(callback);
stop.countDown();
});
node1.start();
node2.start();
node3.start();
Thread randomShutdown = new Thread(() -> {
Set<String> dropNodes = new HashSet<>();
while (dropNodes.size() < 3) {
LockSupport.parkNanos(5_000_000_000L);
int size = masterNodes.size();
if (size < 1) {
continue;
}
String node = masterNodes.get(size - 1);
if (dropNodes.contains(node)) {
continue;
}
machines[Integer.parseInt(node)].shutdown();
dropNodes.add(node);
Utils.println("----shutdown machine " + node);
}
stop.countDown();
});
randomShutdown.start();
stop.await();
Assert.assertGt(0, logRecords.size());
Map<Integer, String> masters = new HashMap<>();
for (LogEntry entry : logRecords) {
if (entry.role == LogEntry.Role.master) {
String lastNode = masters.putIfAbsent(entry.epoch, entry.node);
if (lastNode != null) {
Assert.assertEquals(lastNode, entry.node);
}
}
}
Assert.assertGt(0, masters.size());
}
}