blob: a857599e541fbf0b916033da08aa53cc74e3442a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.cluster.server.member;
import org.apache.iotdb.cluster.client.DataClientProvider;
import org.apache.iotdb.cluster.client.async.AsyncDataClient;
import org.apache.iotdb.cluster.common.TestAsyncClient;
import org.apache.iotdb.cluster.common.TestAsyncDataClient;
import org.apache.iotdb.cluster.common.TestAsyncMetaClient;
import org.apache.iotdb.cluster.common.TestPartitionedLogManager;
import org.apache.iotdb.cluster.common.TestSnapshot;
import org.apache.iotdb.cluster.common.TestUtils;
import org.apache.iotdb.cluster.config.ClusterDescriptor;
import org.apache.iotdb.cluster.coordinator.Coordinator;
import org.apache.iotdb.cluster.exception.CheckConsistencyException;
import org.apache.iotdb.cluster.exception.ConfigInconsistentException;
import org.apache.iotdb.cluster.exception.EmptyIntervalException;
import org.apache.iotdb.cluster.exception.LogExecutionException;
import org.apache.iotdb.cluster.exception.PartitionTableUnavailableException;
import org.apache.iotdb.cluster.exception.StartUpCheckFailureException;
import org.apache.iotdb.cluster.log.Log;
import org.apache.iotdb.cluster.log.logtypes.AddNodeLog;
import org.apache.iotdb.cluster.log.logtypes.CloseFileLog;
import org.apache.iotdb.cluster.log.logtypes.RemoveNodeLog;
import org.apache.iotdb.cluster.log.snapshot.MetaSimpleSnapshot;
import org.apache.iotdb.cluster.metadata.CMManager;
import org.apache.iotdb.cluster.partition.NodeRemovalResult;
import org.apache.iotdb.cluster.partition.PartitionGroup;
import org.apache.iotdb.cluster.partition.PartitionTable;
import org.apache.iotdb.cluster.partition.slot.SlotPartitionTable;
import org.apache.iotdb.cluster.query.ClusterPlanRouter;
import org.apache.iotdb.cluster.query.LocalQueryExecutor;
import org.apache.iotdb.cluster.query.RemoteQueryContext;
import org.apache.iotdb.cluster.query.reader.ClusterReaderFactory;
import org.apache.iotdb.cluster.rpc.thrift.AddNodeResponse;
import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
import org.apache.iotdb.cluster.rpc.thrift.CheckStatusResponse;
import org.apache.iotdb.cluster.rpc.thrift.ElectionRequest;
import org.apache.iotdb.cluster.rpc.thrift.ExecutNonQueryReq;
import org.apache.iotdb.cluster.rpc.thrift.HeartBeatRequest;
import org.apache.iotdb.cluster.rpc.thrift.HeartBeatResponse;
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.rpc.thrift.PullSchemaRequest;
import org.apache.iotdb.cluster.rpc.thrift.PullSchemaResp;
import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
import org.apache.iotdb.cluster.rpc.thrift.RaftService.AsyncClient;
import org.apache.iotdb.cluster.rpc.thrift.SendSnapshotRequest;
import org.apache.iotdb.cluster.rpc.thrift.StartUpStatus;
import org.apache.iotdb.cluster.rpc.thrift.TNodeStatus;
import org.apache.iotdb.cluster.server.DataClusterServer;
import org.apache.iotdb.cluster.server.NodeCharacter;
import org.apache.iotdb.cluster.server.RaftServer;
import org.apache.iotdb.cluster.server.Response;
import org.apache.iotdb.cluster.server.handlers.caller.GenericHandler;
import org.apache.iotdb.cluster.server.heartbeat.DataHeartbeatServer;
import org.apache.iotdb.cluster.server.monitor.NodeStatusManager;
import org.apache.iotdb.cluster.server.service.MetaAsyncService;
import org.apache.iotdb.cluster.utils.ClusterUtils;
import org.apache.iotdb.cluster.utils.Constants;
import org.apache.iotdb.cluster.utils.StatusUtils;
import org.apache.iotdb.db.auth.AuthException;
import org.apache.iotdb.db.auth.authorizer.IAuthorizer;
import org.apache.iotdb.db.auth.authorizer.LocalFileAuthorizer;
import org.apache.iotdb.db.auth.entity.Role;
import org.apache.iotdb.db.auth.entity.User;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode;
import org.apache.iotdb.db.qp.executor.PlanExecutor;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.QueryResourceManager;
import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
import org.apache.iotdb.db.query.reader.series.ManagedSeriesReader;
import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.db.utils.TimeValuePairUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.TSStatus;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.BatchData;
import org.apache.iotdb.tsfile.read.filter.TimeFilter;
import org.apache.iotdb.tsfile.read.filter.ValueFilter;
import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
import org.apache.iotdb.tsfile.write.schema.TimeseriesSchema;
import org.apache.thrift.async.AsyncMethodCallback;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TCompactProtocol.Factory;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import static org.apache.iotdb.cluster.server.NodeCharacter.ELECTOR;
import static org.apache.iotdb.cluster.server.NodeCharacter.FOLLOWER;
import static org.apache.iotdb.cluster.server.NodeCharacter.LEADER;
import static org.awaitility.Awaitility.await;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class MetaGroupMemberTest extends BaseMember {
private DataClusterServer dataClusterServer;
protected boolean mockDataClusterServer;
private Node exiledNode;
private int prevReplicaNum;
private List<String> prevSeedNodes;
@Override
@After
public void tearDown() throws Exception {
dataClusterServer.stop();
super.tearDown();
ClusterDescriptor.getInstance().getConfig().setReplicationNum(prevReplicaNum);
ClusterDescriptor.getInstance().getConfig().setSeedNodeUrls(prevSeedNodes);
}
@Override
@Before
public void setUp() throws Exception {
prevSeedNodes = ClusterDescriptor.getInstance().getConfig().getSeedNodeUrls();
ClusterDescriptor.getInstance().getConfig().setSeedNodeUrls(Collections.emptyList());
prevReplicaNum = ClusterDescriptor.getInstance().getConfig().getReplicationNum();
ClusterDescriptor.getInstance().getConfig().setReplicationNum(2);
RaftServer.setConnectionTimeoutInMS(1000);
RaftServer.setWriteOperationTimeoutMS(1000);
RaftServer.setReadOperationTimeoutMS(1000);
super.setUp();
partitionTable = new SlotPartitionTable(allNodes, TestUtils.getNode(0));
testMetaMember.setPartitionTable(partitionTable);
dummyResponse.set(Response.RESPONSE_AGREE);
testMetaMember.setAllNodes(allNodes);
dataClusterServer =
new DataClusterServer(
TestUtils.getNode(0),
new DataGroupMember.Factory(null, testMetaMember) {
@Override
public DataGroupMember create(PartitionGroup partitionGroup, Node thisNode) {
return getDataGroupMember(partitionGroup, thisNode);
}
},
testMetaMember);
buildDataGroups(dataClusterServer);
testMetaMember.getThisNode().setNodeIdentifier(0);
testMetaMember.setRouter(new ClusterPlanRouter(testMetaMember.getPartitionTable()));
mockDataClusterServer = false;
NodeStatusManager.getINSTANCE().setMetaGroupMember(testMetaMember);
exiledNode = null;
}
private DataGroupMember getDataGroupMember(PartitionGroup group, Node node) {
DataGroupMember dataGroupMember =
new DataGroupMember(null, group, node, testMetaMember) {
@Override
public boolean syncLeader(CheckConsistency checkConsistency) {
return true;
}
@Override
public void pullSlots(NodeRemovalResult removalResult) {}
@Override
public TSStatus executeNonQueryPlan(PhysicalPlan plan) {
try {
planExecutor.processNonQuery(plan);
return StatusUtils.OK;
} catch (QueryProcessException
| StorageGroupNotSetException
| StorageEngineException e) {
return StatusUtils.getStatus(StatusUtils.EXECUTE_STATEMENT_ERROR, e.getMessage());
}
}
@Override
public TSStatus forwardPlan(PhysicalPlan plan, Node node, RaftNode header) {
return executeNonQueryPlan(plan);
}
@Override
protected AppendLogResult sendLogToFollowers(Log log) {
return AppendLogResult.OK;
}
@Override
public AsyncClient getAsyncClient(Node node) {
return getClient(node);
}
@Override
public AsyncClient getAsyncHeartbeatClient(Node node) {
return getClient(node);
}
AsyncClient getClient(Node node) {
return new TestAsyncClient(node.nodeIdentifier) {
@Override
public void startElection(
ElectionRequest request, AsyncMethodCallback<Long> resultHandler) {
new Thread(
() -> {
long resp = dummyResponse.get();
// MIN_VALUE means let the request time out
if (resp != Long.MIN_VALUE) {
resultHandler.onComplete(resp);
}
})
.start();
}
@Override
public void sendHeartbeat(
HeartBeatRequest request, AsyncMethodCallback<HeartBeatResponse> resultHandler) {
new Thread(
() -> {
HeartBeatResponse response = new HeartBeatResponse();
response.setFollower(thisNode);
response.setTerm(Response.RESPONSE_AGREE);
resultHandler.onComplete(response);
})
.start();
}
};
}
};
dataGroupMember.setLogManager(
new TestPartitionedLogManager(
null, partitionTable, group.getHeader().getNode(), TestSnapshot.Factory.INSTANCE));
dataGroupMember.setLeader(node);
dataGroupMember.setCharacter(NodeCharacter.LEADER);
dataGroupMember.setLocalQueryExecutor(
new LocalQueryExecutor(dataGroupMember) {
@Override
public PullSchemaResp queryTimeSeriesSchema(PullSchemaRequest request) {
return mockedPullTimeSeriesSchema(request);
}
@Override
public PullSchemaResp queryMeasurementSchema(PullSchemaRequest request) {
return mockedPullTimeSeriesSchema(request);
}
});
return dataGroupMember;
}
private PullSchemaResp mockedPullTimeSeriesSchema(PullSchemaRequest request) {
List<IMeasurementSchema> schemas = new ArrayList<>();
List<String> prefixPaths = request.getPrefixPaths();
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream);
try {
for (String prefixPath : prefixPaths) {
if (!prefixPath.equals(TestUtils.getTestSeries(10, 0))) {
IoTDB.metaManager.collectSeries(new PartialPath(prefixPath), schemas);
dataOutputStream.writeInt(schemas.size());
for (IMeasurementSchema schema : schemas) {
schema.partialSerializeTo(dataOutputStream);
}
} else {
dataOutputStream.writeInt(1);
TestUtils.getTestMeasurementSchema(0).partialSerializeTo(dataOutputStream);
}
}
} catch (IOException | IllegalPathException e) {
// ignore
}
PullSchemaResp resp = new PullSchemaResp();
resp.setSchemaBytes(byteArrayOutputStream.toByteArray());
return resp;
}
@Override
protected MetaGroupMember getMetaGroupMember(Node node) throws QueryProcessException {
MetaGroupMember metaGroupMember =
new MetaGroupMember(new Factory(), node, new Coordinator()) {
@Override
public void applyAddNode(AddNodeLog addNodeLog) {
allNodes.add(addNodeLog.getNewNode());
}
@Override
public void applyRemoveNode(RemoveNodeLog removeNodeLog) {
super.applyRemoveNode(removeNodeLog);
exiledNode = removeNodeLog.getRemovedNode();
}
@Override
public DataClusterServer getDataClusterServer() {
return mockDataClusterServer
? MetaGroupMemberTest.this.dataClusterServer
: super.getDataClusterServer();
}
@Override
public DataHeartbeatServer getDataHeartbeatServer() {
return new DataHeartbeatServer(thisNode, dataClusterServer) {
@Override
public void start() {}
};
}
@Override
public DataGroupMember getLocalDataMember(RaftNode header, Object request) {
return getDataGroupMember(header);
}
@Override
public DataGroupMember getLocalDataMember(RaftNode header) {
return getDataGroupMember(header);
}
@Override
public void updateHardState(long currentTerm, Node leader) {}
@Override
protected void addSeedNodes() {
List<String> seedUrls = config.getSeedNodeUrls();
// initialize allNodes
for (String seedUrl : seedUrls) {
Node node = ClusterUtils.parseNode(seedUrl);
if (node != null
&& (!node.getInternalIp().equals(thisNode.internalIp)
|| node.getMetaPort() != thisNode.getMetaPort())
&& !allNodes.contains(node)) {
// do not add the local node since it is added in `setThisNode()`
allNodes.add(node);
}
}
}
@Override
public AsyncClient getAsyncHeartbeatClient(Node node) {
return getClient(node);
}
@Override
public AsyncClient getSendLogAsyncClient(Node node) {
return getAsyncClient(node);
}
@Override
public AsyncClient getAsyncClient(Node node) {
return getClient(node);
}
@Override
public AsyncClient getAsyncClient(Node node, boolean activatedOnly) {
return getClient(node);
}
AsyncClient getClient(Node node) {
try {
return new TestAsyncMetaClient(null, null, node, null) {
@Override
public void startElection(
ElectionRequest request, AsyncMethodCallback<Long> resultHandler) {
new Thread(
() -> {
long resp = dummyResponse.get();
// MIN_VALUE means let the request time out
if (resp != Long.MIN_VALUE) {
resultHandler.onComplete(resp);
}
})
.start();
}
@Override
public void handshake(Node sender, AsyncMethodCallback<Void> resultHandler) {
new Thread(() -> resultHandler.onComplete(null)).start();
}
@Override
public void sendHeartbeat(
HeartBeatRequest request,
AsyncMethodCallback<HeartBeatResponse> resultHandler) {
new Thread(
() -> {
HeartBeatResponse response = new HeartBeatResponse();
response.setFollower(thisNode);
response.setTerm(Response.RESPONSE_AGREE);
resultHandler.onComplete(response);
})
.start();
}
@Override
public void appendEntry(
AppendEntryRequest request, AsyncMethodCallback<Long> resultHandler) {
new Thread(
() -> {
long resp = dummyResponse.get();
// MIN_VALUE means let the request time out
if (resp != Long.MIN_VALUE) {
resultHandler.onComplete(dummyResponse.get());
}
})
.start();
}
@Override
public void addNode(
Node node,
StartUpStatus startUpStatus,
AsyncMethodCallback<AddNodeResponse> resultHandler) {
new Thread(
() -> {
if (node.getNodeIdentifier() == 10) {
resultHandler.onComplete(
new AddNodeResponse((int) Response.RESPONSE_IDENTIFIER_CONFLICT));
} else {
partitionTable.addNode(node);
AddNodeResponse resp = new AddNodeResponse((int) dummyResponse.get());
resp.setPartitionTableBytes(partitionTable.serialize());
resultHandler.onComplete(resp);
}
})
.start();
}
@Override
public void executeNonQueryPlan(
ExecutNonQueryReq request, AsyncMethodCallback<TSStatus> resultHandler) {
new Thread(
() -> {
try {
PhysicalPlan plan = PhysicalPlan.Factory.create(request.planBytes);
planExecutor.processNonQuery(plan);
resultHandler.onComplete(StatusUtils.OK);
} catch (IOException
| QueryProcessException
| StorageGroupNotSetException
| StorageEngineException
| IllegalPathException e) {
resultHandler.onError(e);
}
})
.start();
}
@Override
public void queryNodeStatus(AsyncMethodCallback<TNodeStatus> resultHandler) {
new Thread(() -> resultHandler.onComplete(new TNodeStatus())).start();
}
@Override
public void exile(
ByteBuffer removeNodeLog, AsyncMethodCallback<Void> resultHandler) {
System.out.printf("%s was exiled%n", node);
exiledNode = node;
}
@Override
public void removeNode(Node node, AsyncMethodCallback<Long> resultHandler) {
new Thread(
() -> {
testMetaMember.applyRemoveNode(
new RemoveNodeLog(partitionTable.serialize(), node));
resultHandler.onComplete(Response.RESPONSE_AGREE);
})
.start();
}
@Override
public void checkStatus(
StartUpStatus startUpStatus,
AsyncMethodCallback<CheckStatusResponse> resultHandler) {
new Thread(
() -> {
CheckStatusResponse response = new CheckStatusResponse();
response.setHashSaltEquals(true);
response.setPartitionalIntervalEquals(true);
response.setReplicationNumEquals(true);
response.setSeedNodeEquals(true);
resultHandler.onComplete(response);
})
.start();
}
@Override
public void collectMigrationStatus(AsyncMethodCallback<ByteBuffer> resultHandler) {
new Thread(
() -> {
resultHandler.onComplete(
ClusterUtils.serializeMigrationStatus(Collections.emptyMap()));
})
.start();
}
};
} catch (IOException e) {
return null;
}
}
};
metaGroupMember.getCoordinator().setMetaGroupMember(metaGroupMember);
metaGroupMember.setLeader(node);
metaGroupMember.setAllNodes(allNodes);
metaGroupMember.setCharacter(NodeCharacter.LEADER);
metaGroupMember.setAppendLogThreadPool(testThreadPool);
metaGroupMember.setClientProvider(
new DataClientProvider(new TBinaryProtocol.Factory()) {
@Override
public AsyncDataClient getAsyncDataClient(Node node, int timeout) throws IOException {
return new TestAsyncDataClient(node, dataGroupMemberMap);
}
});
return metaGroupMember;
}
private void buildDataGroups(DataClusterServer dataClusterServer) {
List<PartitionGroup> partitionGroups = partitionTable.getLocalGroups();
dataClusterServer.setPartitionTable(partitionTable);
for (PartitionGroup partitionGroup : partitionGroups) {
RaftNode header = partitionGroup.getHeader();
DataGroupMember dataGroupMember = getDataGroupMember(partitionGroup, TestUtils.getNode(0));
dataGroupMember.start();
dataClusterServer.addDataGroupMember(dataGroupMember, header);
}
}
@Test
public void testClosePartition()
throws QueryProcessException, StorageEngineException, StorageGroupNotSetException,
IllegalPathException {
System.out.println("Start testClosePartition()");
// the operation is accepted
dummyResponse.set(Response.RESPONSE_AGREE);
InsertRowPlan insertPlan = new InsertRowPlan();
insertPlan.setPrefixPath(new PartialPath(TestUtils.getTestSg(0)));
insertPlan.setNeedInferType(true);
insertPlan.setMeasurements(new String[] {TestUtils.getTestMeasurement(0)});
insertPlan.setDataTypes(new TSDataType[insertPlan.getMeasurements().length]);
for (int i = 0; i < 10; i++) {
insertPlan.setTime(i);
insertPlan.setValues(new Object[] {String.valueOf(i)});
insertPlan.setMeasurementMNodes(
new IMeasurementMNode[] {TestUtils.getTestMeasurementMNode(0)});
PlanExecutor planExecutor = new PlanExecutor();
planExecutor.processNonQuery(insertPlan);
}
ExecutorService testThreadPool = Executors.newFixedThreadPool(4);
assertTrue(testMetaMember.closePartition(TestUtils.getTestSg(0), 0, true));
StorageGroupProcessor processor =
StorageEngine.getInstance().getProcessor(new PartialPath(TestUtils.getTestSg(0)));
assertTrue(processor.getWorkSequenceTsFileProcessors().isEmpty());
int prevTimeout = RaftServer.getConnectionTimeoutInMS();
RaftServer.setConnectionTimeoutInMS(100);
try {
System.out.println("Create the first file");
for (int i = 20; i < 30; i++) {
insertPlan.setTime(i);
insertPlan.setValues(new Object[] {String.valueOf(i)});
PlanExecutor planExecutor = new PlanExecutor();
planExecutor.processNonQuery(insertPlan);
}
// the net work is down
dummyResponse.set(Long.MIN_VALUE);
System.out.println("Close the first file");
assertFalse(testMetaMember.closePartition(TestUtils.getTestSg(0), 0, true));
assertFalse(processor.getWorkSequenceTsFileProcessors().isEmpty());
// network resume in 100ms
dummyResponse.set(Response.RESPONSE_AGREE);
assertTrue(testMetaMember.closePartition(TestUtils.getTestSg(0), 0, true));
assertTrue(processor.getWorkSequenceTsFileProcessors().isEmpty());
System.out.println("Create the second file");
for (int i = 30; i < 40; i++) {
insertPlan.setTime(i);
insertPlan.setValues(new Object[] {String.valueOf(i)});
PlanExecutor planExecutor = new PlanExecutor();
planExecutor.processNonQuery(insertPlan);
}
// indicating the leader is stale
System.out.println("Close the second file");
dummyResponse.set(100);
assertFalse(testMetaMember.closePartition(TestUtils.getTestSg(0), 0, true));
assertFalse(processor.getWorkSequenceTsFileProcessors().isEmpty());
} finally {
RaftServer.setConnectionTimeoutInMS(prevTimeout);
}
testThreadPool.shutdownNow();
}
@Test
public void testAddNode() {
System.out.println("Start testAddNode()");
Node newNode = TestUtils.getNode(11);
testMetaMember.getPartitionTable().addNode(newNode);
testMetaMember.onElectionWins();
testMetaMember.applyAddNode(
new AddNodeLog(testMetaMember.getPartitionTable().serialize(), newNode));
assertTrue(partitionTable.getAllNodes().contains(newNode));
}
@Test
public void testBuildCluster() {
System.out.println("Start testBuildCluster()");
testMetaMember.start();
try {
testMetaMember.buildCluster();
long startTime = System.currentTimeMillis();
long timeConsumption = 0;
while (timeConsumption < 5000 && testMetaMember.getCharacter() != LEADER) {
timeConsumption = System.currentTimeMillis() - startTime;
}
if (timeConsumption >= 5000) {
fail("The member takes too long to be the leader");
}
assertEquals(LEADER, testMetaMember.getCharacter());
} catch (ConfigInconsistentException | StartUpCheckFailureException e) {
// do nothing
} finally {
testMetaMember.stop();
}
}
@Test
public void testJoinCluster() throws QueryProcessException {
System.out.println("Start testJoinCluster()");
MetaGroupMember newMember = getMetaGroupMember(TestUtils.getNode(10));
newMember.setCoordinator(new Coordinator());
newMember.start();
try {
newMember.joinCluster();
newMember.setCharacter(ELECTOR);
while (!LEADER.equals(newMember.getCharacter())) {
// wait until character changes
}
} catch (Exception e) {
fail("The expected exception is not thrown" + e);
} finally {
newMember.stop();
}
}
@Test
public void testJoinClusterFailed() throws QueryProcessException {
System.out.println("Start testJoinClusterFailed()");
long prevInterval = RaftServer.getHeartBeatIntervalMs();
RaftServer.setHeartBeatIntervalMs(10);
ClusterDescriptor.getInstance().getConfig().setJoinClusterTimeOutMs(100);
dummyResponse.set(Response.RESPONSE_NO_CONNECTION);
MetaGroupMember newMember = getMetaGroupMember(TestUtils.getNode(10));
try {
newMember.joinCluster();
fail("The unexpected exception is thrown");
} catch (Exception e) {
assertTrue(e instanceof StartUpCheckFailureException);
} finally {
newMember.closeLogManager();
RaftServer.setHeartBeatIntervalMs(prevInterval);
}
}
@Test
public void testSendSnapshot() throws IllegalPathException {
System.out.println("Start testSendSnapshot()");
SendSnapshotRequest request = new SendSnapshotRequest();
// 1. prepare storage group and its tll
Map<PartialPath, Long> storageGroupTTL = new HashMap<>();
long baseTTL = 3600;
for (int i = 0; i <= 10; i++) {
storageGroupTTL.put(new PartialPath(TestUtils.getTestSg(i)), baseTTL + i * 100);
if (i >= 5) {
storageGroupTTL.put(new PartialPath(TestUtils.getTestSg(i)), Long.MAX_VALUE);
}
}
HashMap<String, User> userMap = new HashMap<>();
HashMap<String, Role> roleMap = new HashMap<>();
try {
IAuthorizer authorizer = LocalFileAuthorizer.getInstance();
// 2. prepare the role info
authorizer.createRole("role_1");
authorizer.createRole("role_2");
authorizer.createRole("role_3");
authorizer.createRole("role_4");
authorizer.grantPrivilegeToRole("role_1", TestUtils.getTestSg(3), 1);
authorizer.grantPrivilegeToRole("role_2", TestUtils.getTestSg(4), 1);
roleMap.put("role_1", authorizer.getRole("role_1"));
roleMap.put("role_2", authorizer.getRole("role_2"));
roleMap.put("role_3", authorizer.getRole("role_3"));
roleMap.put("role_4", authorizer.getRole("role_4"));
// 3. prepare the user info
authorizer.createUser("user_1", "password_1");
authorizer.createUser("user_2", "password_2");
authorizer.createUser("user_3", "password_3");
authorizer.createUser("user_4", "password_4");
authorizer.grantPrivilegeToUser("user_1", TestUtils.getTestSg(1), 1);
authorizer.setUserUseWaterMark("user_2", true);
authorizer.grantRoleToUser("role_1", "user_1");
userMap.put("user_1", authorizer.getUser("user_1"));
userMap.put("user_2", authorizer.getUser("user_2"));
userMap.put("user_3", authorizer.getUser("user_3"));
userMap.put("user_4", authorizer.getUser("user_4"));
} catch (AuthException e) {
Assert.fail(e.getMessage());
}
// 4. prepare the partition table
SlotPartitionTable partitionTable = (SlotPartitionTable) TestUtils.getPartitionTable(3);
partitionTable.setLastMetaLogIndex(0);
ByteBuffer beforePartitionTableBuffer = partitionTable.serialize();
// 5. serialize
MetaSimpleSnapshot snapshot =
new MetaSimpleSnapshot(storageGroupTTL, userMap, roleMap, beforePartitionTableBuffer);
request.setSnapshotBytes(snapshot.serialize());
AtomicReference<Void> reference = new AtomicReference<>();
new MetaAsyncService(testMetaMember)
.sendSnapshot(request, new GenericHandler(TestUtils.getNode(0), reference));
// 6. check whether the snapshot applied or not
Map<PartialPath, Long> localStorageGroupTTL = IoTDB.metaManager.getStorageGroupsTTL();
assertNotNull(localStorageGroupTTL);
assertEquals(storageGroupTTL, localStorageGroupTTL);
try {
IAuthorizer authorizer = LocalFileAuthorizer.getInstance();
assertTrue(authorizer.checkUserPrivileges("user_1", TestUtils.getTestSg(1), 1));
assertTrue(authorizer.checkUserPrivileges("user_1", TestUtils.getTestSg(3), 1));
assertFalse(authorizer.checkUserPrivileges("user_3", TestUtils.getTestSg(1), 1));
assertTrue(authorizer.isUserUseWaterMark("user_2"));
assertFalse(authorizer.isUserUseWaterMark("user_4"));
Map<String, Role> localRoleMap = authorizer.getAllRoles();
assertEquals(roleMap, localRoleMap);
PartitionTable localPartitionTable = this.testMetaMember.getPartitionTable();
assertEquals(localPartitionTable, partitionTable);
} catch (AuthException e) {
Assert.fail(e.getMessage());
}
}
@Test
public void testProcessNonQuery() throws IllegalPathException {
System.out.println("Start testProcessNonQuery()");
mockDataClusterServer = true;
// as a leader
testMetaMember.setCharacter(LEADER);
testMetaMember.setAppendLogThreadPool(testThreadPool);
for (int i = 10; i < 20; i++) {
// process a non partitioned plan
SetStorageGroupPlan setStorageGroupPlan =
new SetStorageGroupPlan(new PartialPath(TestUtils.getTestSg(i)));
TSStatus status = coordinator.executeNonQueryPlan(setStorageGroupPlan);
assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.code);
assertTrue(IoTDB.metaManager.isPathExist(new PartialPath(TestUtils.getTestSg(i))));
// process a partitioned plan
TimeseriesSchema schema = TestUtils.getTestTimeSeriesSchema(i, 0);
CreateTimeSeriesPlan createTimeSeriesPlan =
new CreateTimeSeriesPlan(
new PartialPath(schema.getFullPath()),
schema.getType(),
schema.getEncodingType(),
schema.getCompressor(),
schema.getProps(),
Collections.emptyMap(),
Collections.emptyMap(),
null);
status = coordinator.executeNonQueryPlan(createTimeSeriesPlan);
if (status.getCode() == TSStatusCode.NEED_REDIRECTION.getStatusCode()) {
status.setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode());
}
assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.code);
assertTrue(IoTDB.metaManager.isPathExist(new PartialPath(TestUtils.getTestSeries(i, 0))));
}
testThreadPool.shutdownNow();
}
@Test
public void testProcessNonQueryAsFollower() throws IllegalPathException, QueryProcessException {
System.out.println("Start testProcessNonQuery()");
mockDataClusterServer = true;
MetaGroupMember testMetaMember2 = getMetaGroupMember(TestUtils.getNode(2));
testMetaMember2.setCharacter(LEADER);
// as a follower
testMetaMember.setCharacter(FOLLOWER);
testMetaMember.setLeader(testMetaMember2.thisNode);
testMetaMember.setAppendLogThreadPool(testThreadPool);
for (int i = 10; i < 20; i++) {
// process a non partitioned plan
SetStorageGroupPlan setStorageGroupPlan =
new SetStorageGroupPlan(new PartialPath(TestUtils.getTestSg(i)));
TSStatus status = coordinator.executeNonQueryPlan(setStorageGroupPlan);
assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.code);
assertTrue(IoTDB.metaManager.isPathExist(new PartialPath(TestUtils.getTestSg(i))));
// process a partitioned plan
TimeseriesSchema schema = TestUtils.getTestTimeSeriesSchema(i, 0);
CreateTimeSeriesPlan createTimeSeriesPlan =
new CreateTimeSeriesPlan(
new PartialPath(schema.getFullPath()),
schema.getType(),
schema.getEncodingType(),
schema.getCompressor(),
schema.getProps(),
Collections.emptyMap(),
Collections.emptyMap(),
null);
status = coordinator.executeNonQueryPlan(createTimeSeriesPlan);
if (status.getCode() == TSStatusCode.NEED_REDIRECTION.getStatusCode()) {
status.setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode());
}
assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.code);
assertTrue(IoTDB.metaManager.isPathExist(new PartialPath(TestUtils.getTestSeries(i, 0))));
}
testThreadPool.shutdownNow();
}
@Test
public void testGetReaderByTimestamp()
throws QueryProcessException, StorageEngineException, IOException,
StorageGroupNotSetException, IllegalPathException {
System.out.println("Start testGetReaderByTimestamp()");
RaftServer.setReadOperationTimeoutMS(10000);
mockDataClusterServer = true;
InsertRowPlan insertPlan = new InsertRowPlan();
insertPlan.setNeedInferType(true);
insertPlan.setMeasurements(new String[] {TestUtils.getTestMeasurement(0)});
insertPlan.setDataTypes(new TSDataType[insertPlan.getMeasurements().length]);
for (int i = 0; i < 10; i++) {
insertPlan.setPrefixPath(new PartialPath(TestUtils.getTestSg(i)));
IMeasurementSchema schema = TestUtils.getTestMeasurementSchema(0);
try {
IoTDB.metaManager.createTimeseries(
new PartialPath(schema.getMeasurementId()),
schema.getType(),
schema.getEncodingType(),
schema.getCompressor(),
schema.getProps());
} catch (MetadataException e) {
// ignore
}
for (int j = 0; j < 10; j++) {
insertPlan.setTime(j);
insertPlan.setValues(new Object[] {String.valueOf(j)});
insertPlan.setMeasurementMNodes(
new IMeasurementMNode[] {TestUtils.getTestMeasurementMNode(0)});
planExecutor.processNonQuery(insertPlan);
}
}
QueryContext context =
new RemoteQueryContext(QueryResourceManager.getInstance().assignQueryId(true));
try {
ClusterReaderFactory readerFactory = new ClusterReaderFactory(testMetaMember);
long[] times = new long[10];
for (int i = 0; i < 10; i++) {
times[i] = i;
}
Set<String> deviceMeasurements = new HashSet<>();
deviceMeasurements.add(TestUtils.getTestMeasurement(0));
for (int i = 0; i < 10; i++) {
IReaderByTimestamp readerByTimestamp =
readerFactory.getReaderByTimestamp(
new PartialPath(TestUtils.getTestSeries(i, 0)),
deviceMeasurements,
TSDataType.DOUBLE,
context,
true,
null);
Object[] values = readerByTimestamp.getValuesInTimestamps(times, 10);
for (int j = 0; j < 10; j++) {
assertEquals(j * 1.0, (double) values[j], 0.00001);
}
}
} finally {
QueryResourceManager.getInstance().endQuery(context.getQueryId());
}
}
@Test
public void testGetReader()
throws QueryProcessException, StorageEngineException, IOException,
StorageGroupNotSetException, IllegalPathException, EmptyIntervalException {
System.out.println("Start testGetReader()");
mockDataClusterServer = true;
InsertRowPlan insertPlan = new InsertRowPlan();
insertPlan.setNeedInferType(true);
insertPlan.setMeasurements(new String[] {TestUtils.getTestMeasurement(0)});
insertPlan.setDataTypes(new TSDataType[insertPlan.getMeasurements().length]);
RaftServer.setReadOperationTimeoutMS(1000);
for (int i = 0; i < 10; i++) {
insertPlan.setPrefixPath(new PartialPath(TestUtils.getTestSg(i)));
IMeasurementSchema schema = TestUtils.getTestMeasurementSchema(0);
try {
IoTDB.metaManager.createTimeseries(
new PartialPath(schema.getMeasurementId()),
schema.getType(),
schema.getEncodingType(),
schema.getCompressor(),
schema.getProps());
} catch (MetadataException e) {
// ignore
}
for (int j = 0; j < 10; j++) {
insertPlan.setTime(j);
insertPlan.setValues(new Object[] {String.valueOf(j)});
insertPlan.setMeasurementMNodes(
new IMeasurementMNode[] {TestUtils.getTestMeasurementMNode(0)});
planExecutor.processNonQuery(insertPlan);
}
}
QueryContext context =
new RemoteQueryContext(QueryResourceManager.getInstance().assignQueryId(true));
try {
ClusterReaderFactory readerFactory = new ClusterReaderFactory(testMetaMember);
Set<String> deviceMeasurements = new HashSet<>();
deviceMeasurements.add(TestUtils.getTestMeasurement(0));
for (int i = 0; i < 10; i++) {
ManagedSeriesReader reader =
readerFactory.getSeriesReader(
new PartialPath(TestUtils.getTestSeries(i, 0)),
deviceMeasurements,
TSDataType.DOUBLE,
TimeFilter.gtEq(5),
ValueFilter.ltEq(8.0),
context,
true);
assertTrue(reader.hasNextBatch());
BatchData batchData = reader.nextBatch();
for (int j = 5; j < 9; j++) {
assertTrue(batchData.hasCurrent());
assertEquals(j, batchData.currentTime());
assertEquals(j * 1.0, batchData.getDouble(), 0.00001);
batchData.next();
}
assertFalse(batchData.hasCurrent());
assertFalse(reader.hasNextBatch());
}
} finally {
QueryResourceManager.getInstance().endQuery(context.getQueryId());
}
}
@Test
public void testGetMatchedPaths() throws MetadataException {
System.out.println("Start testGetMatchedPaths()");
List<PartialPath> matchedPaths =
((CMManager) IoTDB.metaManager)
.getMatchedPaths(new PartialPath(TestUtils.getTestSg(0) + ".*"));
assertEquals(20, matchedPaths.size());
for (int j = 0; j < 10; j++) {
assertTrue(matchedPaths.contains(new PartialPath(TestUtils.getTestSeries(0, j))));
}
matchedPaths =
((CMManager) IoTDB.metaManager)
.getMatchedPaths(new PartialPath(TestUtils.getTestSg(10) + ".*"));
assertTrue(matchedPaths.isEmpty());
}
@Test
public void testProcessValidHeartbeatReq() throws QueryProcessException {
System.out.println("Start testProcessValidHeartbeatReq()");
MetaGroupMember testMetaMember = getMetaGroupMember(TestUtils.getNode(10));
partitionTable = new SlotPartitionTable(allNodes, TestUtils.getNode(0));
testMetaMember.setCoordinator(new Coordinator());
try {
HeartBeatRequest request = new HeartBeatRequest();
request.setRequireIdentifier(true);
HeartBeatResponse response = new HeartBeatResponse();
testMetaMember.processValidHeartbeatReq(request, response);
assertEquals(10, response.getFollowerIdentifier());
request.setRegenerateIdentifier(true);
testMetaMember.processValidHeartbeatReq(request, response);
assertTrue(response.getFollowerIdentifier() != 10);
assertTrue(response.isRequirePartitionTable());
request.setPartitionTableBytes(partitionTable.serialize());
testMetaMember.processValidHeartbeatReq(request, response);
assertEquals(partitionTable, testMetaMember.getPartitionTable());
} finally {
testMetaMember.stop();
}
}
@Test
public void testProcessValidHeartbeatResp() throws QueryProcessException {
System.out.println("Start testProcessValidHeartbeatResp()");
MetaGroupMember metaGroupMember = getMetaGroupMember(TestUtils.getNode(9));
metaGroupMember.start();
metaGroupMember.onElectionWins();
try {
for (int i = 0; i < 10; i++) {
HeartBeatResponse response = new HeartBeatResponse();
response.setFollowerIdentifier(i);
response.setRequirePartitionTable(true);
response.setFollower(TestUtils.getNode(i));
metaGroupMember.processValidHeartbeatResp(response, TestUtils.getNode(i));
metaGroupMember.removeBlindNode(TestUtils.getNode(i));
}
assertNotNull(metaGroupMember.getPartitionTable());
} finally {
metaGroupMember.stop();
}
}
@Test
public void testAppendEntry() {
System.out.println("Start testAppendEntry()");
System.out.println("Term before append: " + testMetaMember.getTerm().get());
testMetaMember.setPartitionTable(null);
CloseFileLog log = new CloseFileLog(TestUtils.getTestSg(0), 0, true);
log.setCurrLogIndex(0);
log.setCurrLogTerm(0);
AppendEntryRequest request = new AppendEntryRequest();
request.setEntry(log.serialize());
request.setTerm(0);
request.setLeaderCommit(0);
request.setPrevLogIndex(-1);
request.setPrevLogTerm(-1);
request.setLeader(new Node("127.0.0.1", 30000, 0, 40000, Constants.RPC_PORT, "127.0.0.1"));
AtomicReference<Long> result = new AtomicReference<>();
GenericHandler<Long> handler = new GenericHandler<>(TestUtils.getNode(0), result);
new MetaAsyncService(testMetaMember).appendEntry(request, handler);
assertEquals(Response.RESPONSE_PARTITION_TABLE_UNAVAILABLE, (long) result.get());
System.out.println("Term after first append: " + testMetaMember.getTerm().get());
testMetaMember.setPartitionTable(partitionTable);
new MetaAsyncService(testMetaMember).appendEntry(request, handler);
System.out.println("Term after second append: " + testMetaMember.getTerm().get());
assertEquals(Response.RESPONSE_AGREE, (long) result.get());
}
@Test
public void testRemoteAddNode() {
System.out.println("Start testRemoteAddNode()");
int prevTimeout = RaftServer.getConnectionTimeoutInMS();
try {
// cannot add node when partition table is not built
testMetaMember.setPartitionTable(null);
AtomicReference<AddNodeResponse> result = new AtomicReference<>();
GenericHandler<AddNodeResponse> handler = new GenericHandler<>(TestUtils.getNode(0), result);
new MetaAsyncService(testMetaMember)
.addNode(TestUtils.getNode(10), TestUtils.getStartUpStatus(), handler);
AddNodeResponse response = result.get();
assertEquals(Response.RESPONSE_PARTITION_TABLE_UNAVAILABLE, response.getRespNum());
// cannot add itself
result.set(null);
testMetaMember.setPartitionTable(partitionTable);
new MetaAsyncService(testMetaMember)
.addNode(TestUtils.getNode(0), TestUtils.getStartUpStatus(), handler);
assertNull(result.get());
// process the request as a leader
testMetaMember.setCharacter(LEADER);
testMetaMember.onElectionWins();
result.set(null);
testMetaMember.setPartitionTable(partitionTable);
new MetaAsyncService(testMetaMember)
.addNode(TestUtils.getNode(10), TestUtils.getStartUpStatus(), handler);
response = result.get();
assertEquals(Response.RESPONSE_AGREE, response.getRespNum());
assertEquals(partitionTable.serialize(), response.partitionTableBytes);
// adding an existing node is ok
testMetaMember.setCharacter(LEADER);
result.set(null);
testMetaMember.setPartitionTable(partitionTable);
new MetaAsyncService(testMetaMember)
.addNode(TestUtils.getNode(10), TestUtils.getStartUpStatus(), handler);
response = result.get();
assertEquals(Response.RESPONSE_AGREE, response.getRespNum());
assertEquals(partitionTable.serialize(), response.partitionTableBytes);
// process the request as a follower
testMetaMember.setCharacter(FOLLOWER);
testMetaMember.setLeader(TestUtils.getNode(1));
result.set(null);
testMetaMember.setPartitionTable(partitionTable);
new MetaAsyncService(testMetaMember)
.addNode(TestUtils.getNode(11), TestUtils.getStartUpStatus(), handler);
while (result.get() == null) {}
response = result.get();
assertEquals(Response.RESPONSE_AGREE, response.getRespNum());
assertEquals(partitionTable.serialize(), response.partitionTableBytes);
// cannot add a node with conflict id
testMetaMember.setCharacter(LEADER);
result.set(null);
testMetaMember.setPartitionTable(partitionTable);
Node node = TestUtils.getNode(12).setNodeIdentifier(10);
new MetaAsyncService(testMetaMember).addNode(node, TestUtils.getStartUpStatus(), handler);
response = result.get();
assertEquals(Response.RESPONSE_IDENTIFIER_CONFLICT, response.getRespNum());
// cannot add a node due to configuration conflict, partition interval
testMetaMember.setCharacter(LEADER);
result.set(null);
testMetaMember.setPartitionTable(partitionTable);
node = TestUtils.getNode(13);
StartUpStatus startUpStatus = TestUtils.getStartUpStatus();
startUpStatus.setPartitionInterval(-1);
new MetaAsyncService(testMetaMember).addNode(node, startUpStatus, handler);
response = result.get();
assertEquals(Response.RESPONSE_NEW_NODE_PARAMETER_CONFLICT, response.getRespNum());
assertFalse(response.getCheckStatusResponse().isPartitionalIntervalEquals());
assertTrue(response.getCheckStatusResponse().isHashSaltEquals());
assertTrue(response.getCheckStatusResponse().isReplicationNumEquals());
// cannot add a node due to configuration conflict, hash salt
testMetaMember.setCharacter(LEADER);
result.set(null);
testMetaMember.setPartitionTable(partitionTable);
node = TestUtils.getNode(12);
startUpStatus = TestUtils.getStartUpStatus();
startUpStatus.setHashSalt(0);
new MetaAsyncService(testMetaMember).addNode(node, startUpStatus, handler);
response = result.get();
assertEquals(Response.RESPONSE_NEW_NODE_PARAMETER_CONFLICT, response.getRespNum());
assertTrue(response.getCheckStatusResponse().isPartitionalIntervalEquals());
assertFalse(response.getCheckStatusResponse().isHashSaltEquals());
assertTrue(response.getCheckStatusResponse().isReplicationNumEquals());
// cannot add a node due to configuration conflict, replication number
testMetaMember.setCharacter(LEADER);
result.set(null);
testMetaMember.setPartitionTable(partitionTable);
node = TestUtils.getNode(12);
startUpStatus = TestUtils.getStartUpStatus();
startUpStatus.setReplicationNumber(0);
new MetaAsyncService(testMetaMember).addNode(node, startUpStatus, handler);
response = result.get();
assertEquals(Response.RESPONSE_NEW_NODE_PARAMETER_CONFLICT, response.getRespNum());
assertTrue(response.getCheckStatusResponse().isPartitionalIntervalEquals());
assertTrue(response.getCheckStatusResponse().isHashSaltEquals());
assertFalse(response.getCheckStatusResponse().isReplicationNumEquals());
assertTrue(response.getCheckStatusResponse().isClusterNameEquals());
// cannot add a node due to network failure
dummyResponse.set(Response.RESPONSE_NO_CONNECTION);
testMetaMember.setCharacter(LEADER);
result.set(null);
testMetaMember.setPartitionTable(partitionTable);
new Thread(
() -> {
await().atLeast(200, TimeUnit.MILLISECONDS);
dummyResponse.set(Response.RESPONSE_AGREE);
})
.start();
new MetaAsyncService(testMetaMember)
.addNode(TestUtils.getNode(12), TestUtils.getStartUpStatus(), handler);
response = result.get();
assertEquals(Response.RESPONSE_AGREE, response.getRespNum());
// cannot add a node due to leadership lost
dummyResponse.set(100);
testMetaMember.setCharacter(LEADER);
result.set(null);
testMetaMember.setPartitionTable(partitionTable);
new MetaAsyncService(testMetaMember)
.addNode(TestUtils.getNode(13), TestUtils.getStartUpStatus(), handler);
response = result.get();
assertNull(response);
} finally {
testMetaMember.stop();
RaftServer.setConnectionTimeoutInMS(prevTimeout);
}
}
@Test
public void testLoadIdentifier() throws IOException, QueryProcessException {
System.out.println("Start testLoadIdentifier()");
try (RandomAccessFile raf =
new RandomAccessFile(MetaGroupMember.NODE_IDENTIFIER_FILE_NAME, "rw")) {
raf.writeBytes("100");
}
MetaGroupMember metaGroupMember = getMetaGroupMember(new Node());
assertEquals(100, metaGroupMember.getThisNode().getNodeIdentifier());
metaGroupMember.closeLogManager();
}
@Test
public void testRemoveNodeWithoutPartitionTable() throws LogExecutionException {
System.out.println("Start testRemoveNodeWithoutPartitionTable()");
testMetaMember.setPartitionTable(null);
try {
testMetaMember.removeNode(TestUtils.getNode(0));
fail("Expect PartitionTableUnavailableException");
} catch (PartitionTableUnavailableException
| InterruptedException
| CheckConsistencyException e) {
// ignore
}
}
@Test
public void testRemoveThisNode() {
System.out.println("Start testRemoveThisNode()");
AtomicReference<Long> resultRef = new AtomicReference<>();
testMetaMember.setLeader(testMetaMember.getThisNode());
testMetaMember.setCharacter(LEADER);
doRemoveNode(resultRef, testMetaMember.getThisNode());
assertEquals(Response.RESPONSE_AGREE, (long) resultRef.get());
assertFalse(testMetaMember.getAllNodes().contains(testMetaMember.getThisNode()));
}
@Test
public void testRemoveLeader() {
System.out.println("Start testRemoveLeader()");
AtomicReference<Long> resultRef = new AtomicReference<>();
testMetaMember.setLeader(TestUtils.getNode(40));
testMetaMember.setCharacter(FOLLOWER);
doRemoveNode(resultRef, TestUtils.getNode(40));
assertEquals(Response.RESPONSE_AGREE, (long) resultRef.get());
assertFalse(testMetaMember.getAllNodes().contains(TestUtils.getNode(40)));
assertEquals(ELECTOR, testMetaMember.getCharacter());
}
@Test
public void testRemoveNonLeader() {
System.out.println("Start testRemoveNonLeader()");
AtomicReference<Long> resultRef = new AtomicReference<>();
testMetaMember.setLeader(TestUtils.getNode(40));
testMetaMember.setCharacter(FOLLOWER);
doRemoveNode(resultRef, TestUtils.getNode(20));
assertEquals(Response.RESPONSE_AGREE, (long) resultRef.get());
assertFalse(testMetaMember.getAllNodes().contains(TestUtils.getNode(20)));
assertEquals(0, testMetaMember.getLastHeartbeatReceivedTime());
}
@Test
public void testRemoveNodeAsLeader() {
System.out.println("Start testRemoveNodeAsLeader()");
AtomicReference<Long> resultRef = new AtomicReference<>();
testMetaMember.setLeader(testMetaMember.getThisNode());
testMetaMember.setCharacter(LEADER);
doRemoveNode(resultRef, TestUtils.getNode(20));
assertEquals(Response.RESPONSE_AGREE, (long) resultRef.get());
assertFalse(testMetaMember.getAllNodes().contains(TestUtils.getNode(20)));
System.out.println("Checking exiled node in testRemoveNodeAsLeader()");
assertEquals(TestUtils.getNode(20), exiledNode);
}
@Test
public void testRemoveNonExistNode() {
System.out.println("Start testRemoveNonExistNode()");
AtomicBoolean passed = new AtomicBoolean(false);
testMetaMember.setCharacter(LEADER);
testMetaMember.setLeader(testMetaMember.getThisNode());
new MetaAsyncService(testMetaMember)
.removeNode(
TestUtils.getNode(120),
new AsyncMethodCallback<Long>() {
@Override
public void onComplete(Long aLong) {
passed.set(aLong.equals(Response.RESPONSE_REJECT));
}
@Override
public void onError(Exception e) {
e.printStackTrace();
}
});
assertTrue(passed.get());
}
@Test
public void testRemoveTooManyNodes() {
System.out.println("Start testRemoveTooManyNodes()");
for (int i = 0; i < 8; i++) {
AtomicReference<Long> resultRef = new AtomicReference<>();
testMetaMember.setCharacter(LEADER);
testMetaMember.setLeader(testMetaMember.getThisNode());
doRemoveNode(resultRef, TestUtils.getNode(90 - i * 10));
assertEquals(Response.RESPONSE_AGREE, (long) resultRef.get());
assertFalse(testMetaMember.getAllNodes().contains(TestUtils.getNode(90 - i * 10)));
}
AtomicReference<Long> resultRef = new AtomicReference<>();
testMetaMember.setCharacter(LEADER);
doRemoveNode(resultRef, TestUtils.getNode(10));
assertEquals(Response.RESPONSE_CLUSTER_TOO_SMALL, (long) resultRef.get());
assertTrue(testMetaMember.getAllNodes().contains(TestUtils.getNode(10)));
}
@Test
public void testRouteIntervalsDisablePartition()
throws IllegalPathException, StorageEngineException {
boolean isEablePartition = StorageEngine.isEnablePartition();
StorageEngine.setEnablePartition(false);
testMetaMember.setCharacter(LEADER);
testMetaMember.setLeader(testMetaMember.getThisNode());
TimeValuePairUtils.Intervals intervals = new TimeValuePairUtils.Intervals();
intervals.addInterval(Long.MIN_VALUE, Long.MAX_VALUE);
List<PartitionGroup> partitionGroups =
testMetaMember.routeIntervals(intervals, new PartialPath(TestUtils.getTestSg(0)));
assertEquals(1, partitionGroups.size());
StorageEngine.setEnablePartition(isEablePartition);
}
@Test
public void testRouteIntervalsEnablePartition()
throws IllegalPathException, StorageEngineException {
boolean isEablePartition = StorageEngine.isEnablePartition();
StorageEngine.setEnablePartition(true);
testMetaMember.setCharacter(LEADER);
testMetaMember.setLeader(testMetaMember.getThisNode());
TimeValuePairUtils.Intervals intervals = new TimeValuePairUtils.Intervals();
intervals.addInterval(Long.MIN_VALUE, Long.MAX_VALUE);
List<PartitionGroup> partitionGroups =
testMetaMember.routeIntervals(intervals, new PartialPath(TestUtils.getTestSg(0)));
assertTrue(partitionGroups.size() > 1);
StorageEngine.setEnablePartition(isEablePartition);
}
private void doRemoveNode(AtomicReference<Long> resultRef, Node nodeToRemove) {
mockDataClusterServer = true;
new MetaAsyncService(testMetaMember)
.removeNode(
nodeToRemove,
new AsyncMethodCallback<Long>() {
@Override
public void onComplete(Long o) {
resultRef.set(o);
}
@Override
public void onError(Exception e) {
e.printStackTrace();
}
});
while (resultRef.get() == null) {}
}
public MetaGroupMember getTestMetaGroupMember() {
return testMetaMember;
}
}