/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.cluster.utils;
import org.apache.iotdb.cluster.config.ClusterConfig;
import org.apache.iotdb.cluster.config.ClusterConstant;
import org.apache.iotdb.cluster.config.ClusterDescriptor;
import org.apache.iotdb.cluster.exception.CheckConsistencyException;
import org.apache.iotdb.cluster.exception.ConfigInconsistentException;
import org.apache.iotdb.cluster.partition.PartitionGroup;
import org.apache.iotdb.cluster.partition.slot.SlotPartitionTable;
import org.apache.iotdb.cluster.rpc.thrift.CheckStatusResponse;
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.rpc.thrift.StartUpStatus;
import org.apache.iotdb.cluster.server.member.MetaGroupMember;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.db.utils.CommonUtils;
import org.apache.iotdb.rpc.RpcTransportFactory;
import org.apache.thrift.TProcessor;
import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TThreadPoolServer;
import org.apache.thrift.transport.TServerTransport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
public class ClusterUtils {
private static final Logger logger = LoggerFactory.getLogger(ClusterUtils.class);
public static final int WAIT_START_UP_CHECK_TIME_SEC = 5;
public static final long START_UP_TIME_THRESHOLD_MS = 5 * 60 * 1000L;
public static final long START_UP_CHECK_TIME_INTERVAL_MS = 3 * 1000L;
/**
 * The data group member's heartbeat port offset relative to {@link
 * ClusterConfig#getInternalDataPort()}, i.e. dataHeartbeatPort = getInternalDataPort() +
 * DATA_HEARTBEAT_PORT_OFFSET.
 */
public static final int DATA_HEARTBEAT_PORT_OFFSET = 1;
/**
 * The meta group member's heartbeat port offset relative to {@link
 * ClusterConfig#getInternalMetaPort()}, i.e. metaHeartbeatPort = getInternalMetaPort() +
 * META_HEARTBEAT_PORT_OFFSET.
 */
public static final int META_HEARTBEAT_PORT_OFFSET = 1;
public static final String UNKNOWN_CLIENT_IP = "UNKNOWN_IP";
private ClusterUtils() {
// util class
}
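/**
 * Compare a remote node's StartUpStatus with the local one field by field.
 *
 * @param remoteStartUpStatus the startup status reported by the remote node
 * @param localStartUpStatus the startup status of this node
 * @return a CheckStatusResponse whose boolean fields indicate, item by item, whether the two
 *     configurations agree; any conflict is also logged as an error
 */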
public static CheckStatusResponse checkStatus(
StartUpStatus remoteStartUpStatus, StartUpStatus localStartUpStatus) {
boolean partitionIntervalEquals = true;
boolean hashSaltEquals = true;
boolean replicationNumEquals = true;
boolean seedNodeListEquals = true;
boolean clusterNameEqual = true;
boolean multiRaftFactorEqual = true;
if (localStartUpStatus.getPartitionInterval() != remoteStartUpStatus.getPartitionInterval()) {
partitionIntervalEquals = false;
logger.error(
"Remote partition interval conflicts with local. local: {}, remote: {}",
localStartUpStatus.getPartitionInterval(),
remoteStartUpStatus.getPartitionInterval());
}
if (localStartUpStatus.getMultiRaftFactor() != remoteStartUpStatus.getMultiRaftFactor()) {
multiRaftFactorEqual = false;
logger.error(
"Remote multi-raft factor conflicts with local. local: {}, remote: {}",
localStartUpStatus.getMultiRaftFactor(),
remoteStartUpStatus.getMultiRaftFactor());
}
if (localStartUpStatus.getHashSalt() != remoteStartUpStatus.getHashSalt()) {
hashSaltEquals = false;
logger.error(
"Remote hash salt conflicts with local. local: {}, remote: {}",
localStartUpStatus.getHashSalt(),
remoteStartUpStatus.getHashSalt());
}
if (localStartUpStatus.getReplicationNumber() != remoteStartUpStatus.getReplicationNumber()) {
replicationNumEquals = false;
logger.error(
"Remote replication number conflicts with local. local: {}, remote: {}",
localStartUpStatus.getReplicationNumber(),
remoteStartUpStatus.getReplicationNumber());
}
if (!Objects.equals(
localStartUpStatus.getClusterName(), remoteStartUpStatus.getClusterName())) {
clusterNameEqual = false;
logger.error(
"Remote cluster name conflicts with local. local: {}, remote: {}",
localStartUpStatus.getClusterName(),
remoteStartUpStatus.getClusterName());
}
if (!ClusterUtils.checkSeedNodes(
false, localStartUpStatus.getSeedNodeList(), remoteStartUpStatus.getSeedNodeList())) {
seedNodeListEquals = false;
if (logger.isErrorEnabled()) {
logger.error(
"Remote seed node list conflicts with local. local: {}, remote: {}",
localStartUpStatus.getSeedNodeList(),
remoteStartUpStatus.getSeedNodeList());
}
}
return new CheckStatusResponse(
partitionIntervalEquals,
hashSaltEquals,
replicationNumEquals,
seedNodeListEquals,
clusterNameEqual,
multiRaftFactorEqual);
}
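/**
 * Check whether the remote seed node list is compatible with the local one: if the cluster is
 * already established, the local list only needs to contain the remote one; otherwise the two
 * lists must contain exactly the same nodes.
 */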
public static boolean checkSeedNodes(
boolean isClusterEstablished, List<Node> localSeedNodes, List<Node> remoteSeedNodes) {
return isClusterEstablished
? seedNodesContains(localSeedNodes, remoteSeedNodes)
: seedNodesEquals(localSeedNodes, remoteSeedNodes);
}
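/**
 * Return true if the two lists contain the same seed nodes, ignoring order. Nodes are compared
 * only by internal IP and meta port.
 */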
private static boolean seedNodesEquals(List<Node> thisNodeList, List<Node> thatNodeList) {
Node[] thisNodeArray = thisNodeList.toArray(new Node[0]);
Node[] thatNodeArray = thatNodeList.toArray(new Node[0]);
Arrays.sort(thisNodeArray, ClusterUtils::compareSeedNode);
Arrays.sort(thatNodeArray, ClusterUtils::compareSeedNode);
if (thisNodeArray.length != thatNodeArray.length) {
return false;
} else {
for (int i = 0; i < thisNodeArray.length; i++) {
if (compareSeedNode(thisNodeArray[i], thatNodeArray[i]) != 0) {
return false;
}
}
return true;
}
}
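/** Order seed nodes by internal IP first and by meta port second; other fields are ignored. */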
private static int compareSeedNode(Node thisSeedNode, Node thatSeedNode) {
int ipCompare = thisSeedNode.getInternalIp().compareTo(thatSeedNode.getInternalIp());
if (ipCompare != 0) {
return ipCompare;
} else {
return thisSeedNode.getMetaPort() - thatSeedNode.getMetaPort();
}
}
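/**
 * Return true if every node in subSeedNodeList also appears in seedNodeList, comparing nodes by
 * internal IP and meta port. Note that both lists are sorted in place.
 */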
private static boolean seedNodesContains(List<Node> seedNodeList, List<Node> subSeedNodeList) {
// Because identifier is not compared here, List.contains() is not suitable
if (subSeedNodeList == null) {
return false;
}
seedNodeList.sort(ClusterUtils::compareSeedNode);
subSeedNodeList.sort(ClusterUtils::compareSeedNode);
int i = 0;
int j = 0;
while (i < seedNodeList.size() && j < subSeedNodeList.size()) {
int compareResult = compareSeedNode(seedNodeList.get(i), subSeedNodeList.get(j));
if (compareResult > 0) {
if (logger.isErrorEnabled()) {
logger.error("Node {} not found in cluster", subSeedNodeList.get(j));
}
return false;
} else if (compareResult < 0) {
i++;
} else {
j++;
}
}
return j == subSeedNodeList.size();
}
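/**
 * Examine the CheckStatusResponse returned by a seed node: log every configuration item that
 * conflicts with the local one, and count the seed node as consistent only if all checked items
 * agree, otherwise as inconsistent.
 */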
public static void examineCheckStatusResponse(
CheckStatusResponse response,
AtomicInteger consistentNum,
AtomicInteger inconsistentNum,
Node seedNode) {
boolean partitionIntervalEquals = response.partitionalIntervalEquals;
boolean hashSaltEquals = response.hashSaltEquals;
boolean replicationNumEquals = response.replicationNumEquals;
boolean seedNodeListEquals = response.seedNodeEquals;
boolean clusterNameEqual = response.clusterNameEquals;
if (!partitionIntervalEquals) {
logger.error("Local partition interval conflicts with seed node[{}].", seedNode);
}
if (!hashSaltEquals) {
logger.error("Local hash salt conflicts with seed node[{}]", seedNode);
}
if (!replicationNumEquals) {
logger.error("Local replication number conflicts with seed node[{}]", seedNode);
}
if (!seedNodeListEquals) {
logger.error("Local seed node list conflicts with seed node[{}]", seedNode);
}
if (!clusterNameEqual) {
logger.error("Local cluster name conflicts with seed node[{}]", seedNode);
}
if (partitionIntervalEquals
&& hashSaltEquals
&& replicationNumEquals
&& seedNodeListEquals
&& clusterNameEqual) {
consistentNum.incrementAndGet();
} else {
inconsistentNum.incrementAndGet();
}
}
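/**
 * Analyse the accumulated startup check results.
 *
 * @return true if all seed nodes reported consistent configurations (the cluster can be
 *     established), false if the status of some seed nodes is still unknown and the check should
 *     be retried
 * @throws ConfigInconsistentException if at least one seed node reported an inconsistent
 *     configuration
 */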
public static boolean analyseStartUpCheckResult(
int consistentNum, int inconsistentNum, int totalSeedNum) throws ConfigInconsistentException {
if (consistentNum == totalSeedNum) {
// break the loop and establish the cluster
return true;
} else if (inconsistentNum > 0) {
// a configuration inconsistency was found, stop building the cluster
throw new ConfigInconsistentException();
} else {
// the status of some nodes could not be obtained, possibly because those nodes did not start
// successfully or this node cannot connect to them; try again in the next round
return false;
}
}
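/**
 * Build a Thrift TThreadPoolServer over the given transport. Worker threads are created on
 * demand, with the minimum set to the number of CPU cores and the maximum to the configured
 * maximum number of concurrent clients (at least the number of cores), and are named with the
 * given prefix.
 *
 * <p>A hedged usage sketch (the port, thread prefix and processor are illustrative, and exception
 * handling is omitted):
 *
 * <pre>{@code
 * TServerTransport transport = new TServerSocket(9003);
 * TServer server =
 *     ClusterUtils.createTThreadPoolServer(
 *         transport, "ClusterClient-", processor, new TBinaryProtocol.Factory());
 * // serve() blocks, so run the server in its own thread
 * new Thread(server::serve, "ClusterRPCServer").start();
 * }</pre>
 */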
public static TServer createTThreadPoolServer(
TServerTransport socket,
String clientThreadPrefix,
TProcessor processor,
TProtocolFactory protocolFactory) {
ClusterConfig config = ClusterDescriptor.getInstance().getConfig();
int maxConcurrentClientNum =
Math.max(CommonUtils.getCpuCores(), config.getMaxConcurrentClientNum());
TThreadPoolServer.Args poolArgs =
new TThreadPoolServer.Args(socket)
.maxWorkerThreads(maxConcurrentClientNum)
.minWorkerThreads(CommonUtils.getCpuCores());
poolArgs.executorService(
new ThreadPoolExecutor(
poolArgs.minWorkerThreads,
poolArgs.maxWorkerThreads,
poolArgs.stopTimeoutVal,
poolArgs.stopTimeoutUnit,
new SynchronousQueue<>(),
new ThreadFactory() {
private AtomicLong threadIndex = new AtomicLong(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, clientThreadPrefix + threadIndex.incrementAndGet());
}
}));
poolArgs.processor(processor);
poolArgs.protocolFactory(protocolFactory);
// async service requires FramedTransport
poolArgs.transportFactory(RpcTransportFactory.INSTANCE);
// the server is only built here, not started; the caller should run it in a separate thread
// so that the main thread is not blocked
return new TThreadPoolServer(poolArgs);
}
/**
 * Convert the string representation of a Node back into a Node object.
 *
 * @param str a string generated by Node.toString(), i.e. a comma-separated list of the
 *     "internalIp:", "metaPort:", "nodeIdentifier:", "dataPort:", "clientPort:" and "clientIp:"
 *     fields, terminated by ')'
 * @return the parsed Node object
 */
public static Node stringToNode(String str) {
int ipFirstPos = str.indexOf("internalIp:") + "internalIp:".length();
int ipLastPos = str.indexOf(',', ipFirstPos);
int metaPortFirstPos = str.indexOf("metaPort:", ipLastPos) + "metaPort:".length();
int metaPortLastPos = str.indexOf(',', metaPortFirstPos);
int idFirstPos = str.indexOf("nodeIdentifier:", metaPortLastPos) + "nodeIdentifier:".length();
int idLastPos = str.indexOf(',', idFirstPos);
int dataPortFirstPos = str.indexOf("dataPort:", idLastPos) + "dataPort:".length();
int dataPortLastPos = str.indexOf(',', dataPortFirstPos);
int clientPortFirstPos = str.indexOf("clientPort:", dataPortLastPos) + "clientPort:".length();
int clientPortLastPos = str.indexOf(',', clientPortFirstPos);
int clientIpFirstPos = str.indexOf("clientIp:", clientPortLastPos) + "clientIp:".length();
int clientIpLastPos = str.indexOf(')', clientIpFirstPos);
String ip = str.substring(ipFirstPos, ipLastPos);
int metaPort = Integer.parseInt(str.substring(metaPortFirstPos, metaPortLastPos));
int id = Integer.parseInt(str.substring(idFirstPos, idLastPos));
int dataPort = Integer.parseInt(str.substring(dataPortFirstPos, dataPortLastPos));
int clientPort = Integer.parseInt(str.substring(clientPortFirstPos, clientPortLastPos));
String clientIp = str.substring(clientIpFirstPos, clientIpLastPos);
return new Node(ip, metaPort, id, dataPort, clientPort, clientIp);
}
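/**
 * Parse a seed node url of the form {@code internalIp:metaPort} into a Node with only the
 * internal IP and meta port set (the client IP is set to {@link #UNKNOWN_CLIENT_IP}).
 *
 * @return the parsed node, or null if the url is malformed
 */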
public static Node parseNode(String nodeUrl) {
Node result = new Node();
String[] split = nodeUrl.split(":");
if (split.length != 2) {
logger.warn("Bad seed url: {}", nodeUrl);
return null;
}
String ip = split[0];
try {
int metaPort = Integer.parseInt(split[1]);
result.setInternalIp(ip).setMetaPort(metaPort).setClientIp(UNKNOWN_CLIENT_IP);
} catch (NumberFormatException e) {
logger.warn("Bad seed url: {}", nodeUrl);
// the meta port is not a number, treat this the same as a malformed url
return null;
}
return result;
}
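/**
 * Route the given prefix path (at timestamp 0) to its data partition group. If the storage group
 * is not found locally, synchronize with the leader once and retry.
 */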
public static PartitionGroup partitionByPathTimeWithSync(
PartialPath prefixPath, MetaGroupMember metaGroupMember) throws MetadataException {
PartitionGroup partitionGroup;
try {
partitionGroup = metaGroupMember.getPartitionTable().partitionByPathTime(prefixPath, 0);
} catch (StorageGroupNotSetException e) {
// the storage group is not found locally but may exist on the leader; retry after
// synchronizing with the leader
try {
metaGroupMember.syncLeaderWithConsistencyCheck(true);
} catch (CheckConsistencyException checkConsistencyException) {
throw new MetadataException(checkConsistencyException.getMessage());
}
partitionGroup = metaGroupMember.getPartitionTable().partitionByPathTime(prefixPath, 0);
}
return partitionGroup;
}
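/**
 * Calculate the slot of the given prefix path from its storage group (with time partition 0 and
 * {@link ClusterConstant#SLOT_NUM} total slots). If the storage group is not found locally,
 * synchronize with the leader once and retry.
 */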
public static int getSlotByPathTimeWithSync(
PartialPath prefixPath, MetaGroupMember metaGroupMember) throws MetadataException {
int slot;
try {
PartialPath storageGroup = IoTDB.metaManager.getStorageGroupPath(prefixPath);
slot =
SlotPartitionTable.getSlotStrategy()
.calculateSlotByPartitionNum(storageGroup.getFullPath(), 0, ClusterConstant.SLOT_NUM);
} catch (StorageGroupNotSetException e) {
// the storage group is not found locally but may exist on the leader; retry after
// synchronizing with the leader
try {
metaGroupMember.syncLeaderWithConsistencyCheck(true);
} catch (CheckConsistencyException checkConsistencyException) {
throw new MetadataException(checkConsistencyException.getMessage());
}
PartialPath storageGroup = IoTDB.metaManager.getStorageGroupPath(prefixPath);
slot =
SlotPartitionTable.getSlotStrategy()
.calculateSlotByPartitionNum(storageGroup.getFullPath(), 0, ClusterConstant.SLOT_NUM);
}
return slot;
}
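/**
 * Serialize the migration status map as the number of entries followed by each serialized
 * PartitionGroup and its int value. The result can be read back by {@link
 * #deserializeMigrationStatus(ByteBuffer)}.
 */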
public static ByteBuffer serializeMigrationStatus(Map<PartitionGroup, Integer> migrationStatus) {
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
try (DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream)) {
dataOutputStream.writeInt(migrationStatus.size());
for (Entry<PartitionGroup, Integer> entry : migrationStatus.entrySet()) {
entry.getKey().serialize(dataOutputStream);
dataOutputStream.writeInt(entry.getValue());
}
} catch (IOException e) {
// not expected: writing to an in-memory ByteArrayOutputStream does not throw IOException
}
return ByteBuffer.wrap(byteArrayOutputStream.toByteArray());
}
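/** Deserialize a migration status map written by {@link #serializeMigrationStatus(Map)}. */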
public static Map<PartitionGroup, Integer> deserializeMigrationStatus(ByteBuffer buffer) {
Map<PartitionGroup, Integer> migrationStatus = new HashMap<>();
int size = buffer.getInt();
while (size-- > 0) {
PartitionGroup partitionGroup = new PartitionGroup();
partitionGroup.deserialize(buffer);
migrationStatus.put(partitionGroup, buffer.getInt());
}
return migrationStatus;
}
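/**
 * Compare two nodes using ClusterNode's equality semantics instead of the thrift-generated
 * Node.equals().
 */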
public static boolean nodeEqual(Node node1, Node node2) {
ClusterNode clusterNode1 = new ClusterNode(node1);
ClusterNode clusterNode2 = new ClusterNode(node2);
return clusterNode1.equals(clusterNode2);
}
}