blob: 1ba65aad10dddeba73290f40a9d0b4ebdb59a2d1 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.confignode.persistence.node;
import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.snapshot.SnapshotProcessor;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.conf.SystemPropertiesUtils;
import org.apache.iotdb.confignode.consensus.request.read.datanode.GetDataNodeConfigurationPlan;
import org.apache.iotdb.confignode.consensus.request.write.confignode.ApplyConfigNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.confignode.RemoveConfigNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.confignode.UpdateVersionInfoPlan;
import org.apache.iotdb.confignode.consensus.request.write.datanode.RegisterDataNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.datanode.RemoveDataNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.datanode.UpdateDataNodePlan;
import org.apache.iotdb.confignode.consensus.response.datanode.DataNodeConfigurationResp;
import org.apache.iotdb.confignode.rpc.thrift.TNodeVersionInfo;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TIOStreamTransport;
import org.apache.tsfile.utils.ReadWriteIOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import static org.apache.iotdb.confignode.conf.ConfigNodeConstant.REMOVE_DATANODE_PROCESS;
/**
* The {@link NodeInfo} stores cluster node information.
*
* <p>The cluster node information includes:
*
* <p>1. DataNode information
*
* <p>2. ConfigNode information
*/
public class NodeInfo implements SnapshotProcessor {
// Logger shared by every NodeInfo instance.
private static final Logger LOGGER = LoggerFactory.getLogger(NodeInfo.class);
// The larger of the configured schema and data replication factors, computed once when the
// class is initialized.
private static final int MINIMUM_DATANODE =
Math.max(
ConfigNodeDescriptor.getInstance().getConf().getSchemaReplicationFactor(),
ConfigNodeDescriptor.getInstance().getConf().getDataReplicationFactor());
// Registered ConfigNodes, keyed by ConfigNode id, and the read/write lock used around them.
private final ReentrantReadWriteLock configNodeInfoReadWriteLock;
private final Map<Integer, TConfigNodeLocation> registeredConfigNodes;
// Registered DataNodes: lock used around registeredDataNodes (declared a few lines below).
private final ReentrantReadWriteLock dataNodeInfoReadWriteLock;
// Lock used around nodeVersionInfo.
private final ReentrantReadWriteLock versionInfoReadWriteLock;
// Largest node id applied or generated so far. Starts at -1, so the first id returned by
// generateNextNodeId() is 0. Shared by ConfigNode and DataNode ids.
private final AtomicInteger nextNodeId = new AtomicInteger(-1);
// Registered DataNodes, keyed by DataNode id.
private final Map<Integer, TDataNodeConfiguration> registeredDataNodes;
// Version/build info per node id (entries are removed for both DataNodes and ConfigNodes).
private final Map<Integer, TNodeVersionInfo> nodeVersionInfo;
// Name of the snapshot file created inside the snapshot directory.
private static final String SNAPSHOT_FILENAME = "node_info.bin";
/** Creates an empty {@link NodeInfo}: fresh locks and no registered nodes or version info. */
public NodeInfo() {
  // Locks first ...
  configNodeInfoReadWriteLock = new ReentrantReadWriteLock();
  dataNodeInfoReadWriteLock = new ReentrantReadWriteLock();
  versionInfoReadWriteLock = new ReentrantReadWriteLock();

  // ... then the (concurrent) maps they are used with.
  registeredConfigNodes = new ConcurrentHashMap<>();
  registeredDataNodes = new ConcurrentHashMap<>();
  nodeVersionInfo = new ConcurrentHashMap<>();
}
/**
 * Persist DataNode info.
 *
 * <p>Stores the DataNode's configuration under its DataNode id and raises {@code nextNodeId} to
 * that id if it is larger than the current value. The returned status carries an informational
 * message while the number of registered DataNodes has not yet exceeded the minimum required
 * DataNode count.
 *
 * @param registerDataNodePlan RegisterDataNodePlan
 * @return {@link TSStatusCode#SUCCESS_STATUS}
 */
public TSStatus registerDataNode(RegisterDataNodePlan registerDataNodePlan) {
  final TDataNodeConfiguration info = registerDataNodePlan.getDataNodeConfiguration();
  final int dataNodeId = info.getLocation().getDataNodeId();
  final TSStatus result = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());

  dataNodeInfoReadWriteLock.writeLock().lock();
  try {
    // To ensure that the nextNodeId is updated correctly when
    // the ConfigNode-followers concurrently processes RegisterDataNodePlan,
    // we need to add a synchronization lock here
    synchronized (nextNodeId) {
      if (nextNodeId.get() < dataNodeId) {
        nextNodeId.set(dataNodeId);
      }
    }
    registeredDataNodes.put(dataNodeId, info);

    // The hint must be based on how many DataNodes are actually registered. nextNodeId is the
    // highest id among ConfigNodes AND DataNodes, so it over-counts as soon as any ConfigNode
    // id has been applied.
    final int registeredDataNodeCount = registeredDataNodes.size();
    if (registeredDataNodeCount < MINIMUM_DATANODE) {
      result.setMessage(
          String.format(
              "To enable IoTDB-Cluster's data service, please register %d more IoTDB-DataNode",
              MINIMUM_DATANODE - registeredDataNodeCount));
    } else if (registeredDataNodeCount == MINIMUM_DATANODE) {
      result.setMessage("IoTDB-Cluster could provide data service, now enjoy yourself!");
    }
  } finally {
    dataNodeInfoReadWriteLock.writeLock().unlock();
  }
  return result;
}
/**
 * Removes the DataNodes listed in the plan, together with their version info.
 *
 * <p>The registered DataNode count is logged before and after the removal.
 *
 * @param req RemoveDataNodePlan
 * @return {@link TSStatus} with {@link TSStatusCode#SUCCESS_STATUS}
 */
public TSStatus removeDataNode(RemoveDataNodePlan req) {
  LOGGER.info(
      "{}, There are {} data node in cluster before executed RemoveDataNodePlan",
      REMOVE_DATANODE_PROCESS,
      registeredDataNodes.size());

  // Both maps are modified, so both write locks are held for the whole removal.
  dataNodeInfoReadWriteLock.writeLock().lock();
  versionInfoReadWriteLock.writeLock().lock();
  try {
    req.getDataNodeLocations()
        .forEach(
            location -> {
              final int removedId = location.getDataNodeId();
              registeredDataNodes.remove(removedId);
              nodeVersionInfo.remove(removedId);
              LOGGER.info("Removed the datanode {} from cluster", location);
            });
  } finally {
    // Release in the reverse order of acquisition.
    versionInfoReadWriteLock.writeLock().unlock();
    dataNodeInfoReadWriteLock.writeLock().unlock();
  }

  LOGGER.info(
      "{}, There are {} data node in cluster after executed RemoveDataNodePlan",
      REMOVE_DATANODE_PROCESS,
      registeredDataNodes.size());
  final TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
  return status;
}
/**
 * Update the specified DataNode's configuration.
 *
 * <p>The stored entry is only replaced when the DataNode id is already registered; an unknown
 * id leaves the map untouched. The returned status is success in both cases.
 *
 * @param updateDataNodePlan UpdateDataNodePlan
 * @return {@link TSStatusCode#SUCCESS_STATUS}
 */
public TSStatus updateDataNode(UpdateDataNodePlan updateDataNodePlan) {
  dataNodeInfoReadWriteLock.writeLock().lock();
  try {
    final TDataNodeConfiguration updated = updateDataNodePlan.getDataNodeConfiguration();
    final int targetId = updated.getLocation().getDataNodeId();
    registeredDataNodes.replace(targetId, updated);
  } finally {
    dataNodeInfoReadWriteLock.writeLock().unlock();
  }
  final TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
  return status;
}
/**
 * Get DataNodeConfiguration.
 *
 * @param getDataNodeConfigurationPlan GetDataNodeConfigurationPlan
 * @return A response with success status whose map holds every registered DataNode when the
 *     requested dataNodeId is -1, otherwise only the requested DataNode (or an empty map if it
 *     is not registered)
 */
public DataNodeConfigurationResp getDataNodeConfiguration(
    GetDataNodeConfigurationPlan getDataNodeConfigurationPlan) {
  final int dataNodeId = getDataNodeConfigurationPlan.getDataNodeId();
  final DataNodeConfigurationResp result = new DataNodeConfigurationResp();
  result.setStatus(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()));

  dataNodeInfoReadWriteLock.readLock().lock();
  try {
    final Map<Integer, TDataNodeConfiguration> configurationMap;
    if (dataNodeId == -1) {
      // -1 selects all DataNodes; hand out a copy rather than the live map.
      configurationMap = new HashMap<>(registeredDataNodes);
    } else {
      final TDataNodeConfiguration requested = registeredDataNodes.get(dataNodeId);
      if (requested == null) {
        configurationMap = new HashMap<>(0);
      } else {
        configurationMap = Collections.singletonMap(dataNodeId, requested);
      }
    }
    result.setDataNodeConfigurationMap(configurationMap);
  } finally {
    dataNodeInfoReadWriteLock.readLock().unlock();
  }
  return result;
}
/** Return the number of registered Nodes, i.e. registered DataNodes plus ConfigNodes. */
public int getRegisteredNodeCount() {
  // The two counts are taken one after the other, each under its own read lock.
  final int dataNodeCount;
  dataNodeInfoReadWriteLock.readLock().lock();
  try {
    dataNodeCount = registeredDataNodes.size();
  } finally {
    dataNodeInfoReadWriteLock.readLock().unlock();
  }

  final int configNodeCount;
  configNodeInfoReadWriteLock.readLock().lock();
  try {
    configNodeCount = registeredConfigNodes.size();
  } finally {
    configNodeInfoReadWriteLock.readLock().unlock();
  }

  return dataNodeCount + configNodeCount;
}
/** Return the number of registered DataNodes, read under the DataNode read lock. */
public int getRegisteredDataNodeCount() {
  dataNodeInfoReadWriteLock.readLock().lock();
  try {
    return registeredDataNodes.size();
  } finally {
    dataNodeInfoReadWriteLock.readLock().unlock();
  }
}
/**
 * Return the cpu core count recorded for the specified DataNode.
 *
 * <p>An unknown DataNode, or one whose configuration carries no resource info, is logged at
 * warn level and treated as having zero cores.
 *
 * @param dataNodeId the id of the DataNode to look up
 * @return the recorded cpu core count, or 0 if it is unavailable
 */
public int getDataNodeCpuCoreCount(int dataNodeId) {
  final TDataNodeConfiguration configuration = registeredDataNodes.get(dataNodeId);
  // Check the two possible null links explicitly instead of catching the resulting
  // NullPointerException through a catch-all handler.
  if (configuration == null || configuration.getResource() == null) {
    LOGGER.warn("Get DataNode {} cpu core fail, will be treated as zero.", dataNodeId);
    return 0;
  }
  return configuration.getResource().getCpuCoreNum();
}
/** Return the sum of the cpu core counts of all registered DataNodes. */
public int getDataNodeTotalCpuCoreCount() {
  dataNodeInfoReadWriteLock.readLock().lock();
  try {
    return registeredDataNodes.values().stream()
        .mapToInt(configuration -> configuration.getResource().getCpuCoreNum())
        .sum();
  } finally {
    dataNodeInfoReadWriteLock.readLock().unlock();
  }
}
/**
 * @return All registered DataNodes, as a new list (the configurations themselves are not
 *     copied).
 */
public List<TDataNodeConfiguration> getRegisteredDataNodes() {
  dataNodeInfoReadWriteLock.readLock().lock();
  try {
    return new ArrayList<>(registeredDataNodes.values());
  } finally {
    dataNodeInfoReadWriteLock.readLock().unlock();
  }
}
/**
 * @return A deep copy of the specified registered DataNode, or a copy of an empty {@link
 *     TDataNodeConfiguration} if the id is not registered.
 */
public TDataNodeConfiguration getRegisteredDataNode(int dataNodeId) {
  dataNodeInfoReadWriteLock.readLock().lock();
  try {
    final TDataNodeConfiguration source =
        registeredDataNodes.getOrDefault(dataNodeId, new TDataNodeConfiguration());
    return source.deepCopy();
  } finally {
    dataNodeInfoReadWriteLock.readLock().unlock();
  }
}
/**
 * @return Deep copies of the specified registered DataNodes, in the order of the given ids; ids
 *     that are not registered are skipped.
 */
public List<TDataNodeConfiguration> getRegisteredDataNodes(List<Integer> dataNodeIds) {
  final List<TDataNodeConfiguration> copies = new ArrayList<>();
  dataNodeInfoReadWriteLock.readLock().lock();
  try {
    for (Integer dataNodeId : dataNodeIds) {
      final TDataNodeConfiguration registered = registeredDataNodes.get(dataNodeId);
      if (registered != null) {
        copies.add(registered.deepCopy());
      }
    }
  } finally {
    dataNodeInfoReadWriteLock.readLock().unlock();
  }
  return copies;
}
/**
 * Update ConfigNodeList both in memory and confignode-system{@literal .}properties file.
 *
 * <p>The new ConfigNode is put into the in-memory map first, then the whole list is stored
 * through {@link SystemPropertiesUtils}.
 *
 * @param applyConfigNodePlan ApplyConfigNodePlan
 * @return {@link TSStatusCode#SUCCESS_STATUS} on success, {@link
 *     TSStatusCode#ADD_CONFIGNODE_ERROR} if storing the ConfigNode list failed.
 */
public TSStatus applyConfigNode(ApplyConfigNodePlan applyConfigNodePlan) {
  final TSStatus status = new TSStatus();
  configNodeInfoReadWriteLock.writeLock().lock();
  try {
    final TConfigNodeLocation appliedLocation = applyConfigNodePlan.getConfigNodeLocation();
    final int appliedId = appliedLocation.getConfigNodeId();

    // To ensure that the nextNodeId is updated correctly when
    // the ConfigNode-followers concurrently processes ApplyConfigNodePlan,
    // We need to add a synchronization lock here
    synchronized (nextNodeId) {
      if (nextNodeId.get() < appliedId) {
        nextNodeId.set(appliedId);
      }
    }

    registeredConfigNodes.put(appliedId, appliedLocation);
    SystemPropertiesUtils.storeConfigNodeList(new ArrayList<>(registeredConfigNodes.values()));
    LOGGER.info(
        "Successfully apply ConfigNode: {}. Current ConfigNodeGroup: {}",
        appliedLocation,
        registeredConfigNodes);
    status.setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode());
  } catch (IOException e) {
    LOGGER.error("Update online ConfigNode failed.", e);
    status.setCode(TSStatusCode.ADD_CONFIGNODE_ERROR.getStatusCode());
    status.setMessage(
        "Apply new ConfigNode failed because current ConfigNode can't store ConfigNode information.");
  } finally {
    configNodeInfoReadWriteLock.writeLock().unlock();
  }
  return status;
}
/**
 * Update ConfigNodeList both in memory and confignode-system{@literal .}properties file.
 *
 * <p>The ConfigNode and its version info are removed from memory first, then the remaining
 * list is stored through {@link SystemPropertiesUtils}.
 *
 * @param removeConfigNodePlan RemoveConfigNodePlan
 * @return {@link TSStatusCode#SUCCESS_STATUS} on success, {@link
 *     TSStatusCode#REMOVE_CONFIGNODE_ERROR} if storing the ConfigNode list failed.
 */
public TSStatus removeConfigNode(RemoveConfigNodePlan removeConfigNodePlan) {
  final TSStatus status = new TSStatus();
  configNodeInfoReadWriteLock.writeLock().lock();
  versionInfoReadWriteLock.writeLock().lock();
  try {
    final TConfigNodeLocation removedLocation = removeConfigNodePlan.getConfigNodeLocation();
    final int removedId = removedLocation.getConfigNodeId();

    registeredConfigNodes.remove(removedId);
    nodeVersionInfo.remove(removedId);
    SystemPropertiesUtils.storeConfigNodeList(new ArrayList<>(registeredConfigNodes.values()));
    LOGGER.info(
        "Successfully remove ConfigNode: {}. Current ConfigNodeGroup: {}",
        removedLocation,
        registeredConfigNodes);
    status.setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode());
  } catch (IOException e) {
    LOGGER.error("Remove online ConfigNode failed.", e);
    status.setCode(TSStatusCode.REMOVE_CONFIGNODE_ERROR.getStatusCode());
    status.setMessage(
        "Remove ConfigNode failed because current ConfigNode can't store ConfigNode information.");
  } finally {
    // Release in the reverse order of acquisition.
    versionInfoReadWriteLock.writeLock().unlock();
    configNodeInfoReadWriteLock.writeLock().unlock();
  }
  return status;
}
/**
 * Update the specified Node's versionInfo.
 *
 * <p>Any version info previously stored for the same node id is overwritten.
 *
 * @param updateVersionInfoPlan UpdateVersionInfoPlan
 * @return {@link TSStatusCode#SUCCESS_STATUS}
 */
public TSStatus updateVersionInfo(UpdateVersionInfoPlan updateVersionInfoPlan) {
  versionInfoReadWriteLock.writeLock().lock();
  try {
    nodeVersionInfo.put(
        updateVersionInfoPlan.getNodeId(), updateVersionInfoPlan.getVersionInfo());
  } finally {
    versionInfoReadWriteLock.writeLock().unlock();
  }

  // Logged after the lock has been released.
  LOGGER.info("Successfully update Node {} 's version.", updateVersionInfoPlan.getNodeId());
  final TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
  return status;
}
/**
 * @return All registered ConfigNodes, as a new list (the locations themselves are not copied).
 */
public List<TConfigNodeLocation> getRegisteredConfigNodes() {
  configNodeInfoReadWriteLock.readLock().lock();
  try {
    return new ArrayList<>(registeredConfigNodes.values());
  } finally {
    configNodeInfoReadWriteLock.readLock().unlock();
  }
}
/**
 * @return Deep copies of the specified registered ConfigNodes, in the order of the given ids;
 *     ids that are not registered are skipped.
 */
public List<TConfigNodeLocation> getRegisteredConfigNodes(List<Integer> configNodeIds) {
  final List<TConfigNodeLocation> copies = new ArrayList<>();
  configNodeInfoReadWriteLock.readLock().lock();
  try {
    for (Integer configNodeId : configNodeIds) {
      final TConfigNodeLocation registered = registeredConfigNodes.get(configNodeId);
      if (registered != null) {
        copies.add(registered.deepCopy());
      }
    }
  } finally {
    configNodeInfoReadWriteLock.readLock().unlock();
  }
  return copies;
}
/** @return a new map holding the version info of all nodes, keyed by node id */
public Map<Integer, TNodeVersionInfo> getNodeVersionInfo() {
  versionInfoReadWriteLock.readLock().lock();
  try {
    // Copy so callers never see the live map.
    return new HashMap<>(nodeVersionInfo);
  } finally {
    versionInfoReadWriteLock.readLock().unlock();
  }
}
/**
 * @param nodeId the id of the node to look up
 * @return the version info stored for the node, or a placeholder whose version and build info
 *     are both "Unknown" when nothing is stored for it
 */
public TNodeVersionInfo getVersionInfo(int nodeId) {
  versionInfoReadWriteLock.readLock().lock();
  try {
    final TNodeVersionInfo stored = nodeVersionInfo.get(nodeId);
    if (stored != null) {
      return stored;
    }
    return new TNodeVersionInfo("Unknown", "Unknown");
  } finally {
    versionInfoReadWriteLock.readLock().unlock();
  }
}
/**
 * Atomically increments the node id counter.
 *
 * @return the counter value after the increment
 */
public int generateNextNodeId() {
  final int generatedId = nextNodeId.incrementAndGet();
  return generatedId;
}
/**
 * Writes the current node information to the snapshot file {@code node_info.bin} inside the
 * given directory.
 *
 * <p>The data (nextNodeId, registered ConfigNodes, registered DataNodes, version info, in that
 * order) is first written to a temporary file next to the target, which is then renamed to the
 * final name. All three read locks are held while writing.
 *
 * @param snapshotDir the directory in which the snapshot file is created
 * @return {@code false} if the snapshot file already exists or the rename fails, {@code true}
 *     otherwise
 * @throws IOException if writing the temporary file fails
 * @throws TException if serializing a thrift structure fails
 */
@Override
public boolean processTakeSnapshot(File snapshotDir) throws IOException, TException {
File snapshotFile = new File(snapshotDir, SNAPSHOT_FILENAME);
// Never overwrite an existing snapshot file.
if (snapshotFile.exists() && snapshotFile.isFile()) {
LOGGER.error(
"Failed to take snapshot, because snapshot file [{}] is already exist.",
snapshotFile.getAbsolutePath());
return false;
}
// Write to a uniquely named temporary file first; it is renamed once fully written.
File tmpFile = new File(snapshotFile.getAbsolutePath() + "-" + UUID.randomUUID());
configNodeInfoReadWriteLock.readLock().lock();
dataNodeInfoReadWriteLock.readLock().lock();
versionInfoReadWriteLock.readLock().lock();
try (FileOutputStream fileOutputStream = new FileOutputStream(tmpFile);
TIOStreamTransport tioStreamTransport = new TIOStreamTransport(fileOutputStream)) {
// Thrift structures go through the protocol, plain ints/strings directly to the stream;
// both write to the same underlying file.
TProtocol protocol = new TBinaryProtocol(tioStreamTransport);
ReadWriteIOUtils.write(nextNodeId.get(), fileOutputStream);
serializeRegisteredConfigNode(fileOutputStream, protocol);
serializeRegisteredDataNode(fileOutputStream, protocol);
serializeVersionInfo(fileOutputStream);
tioStreamTransport.flush();
// Force the written bytes to the storage device before renaming.
fileOutputStream.getFD().sync();
// The tmpFile can be renamed only after the stream is closed
// NOTE(review): try-with-resources closes the transport again afterwards; presumably a
// second close is harmless - confirm.
tioStreamTransport.close();
return tmpFile.renameTo(snapshotFile);
} finally {
// Release the locks in the reverse order of acquisition.
versionInfoReadWriteLock.readLock().unlock();
dataNodeInfoReadWriteLock.readLock().unlock();
configNodeInfoReadWriteLock.readLock().unlock();
// Best-effort cleanup: if the temporary file is still there (e.g. the write or the rename
// failed), try to delete it up to 5 times.
for (int retry = 0; retry < 5; retry++) {
if (!tmpFile.exists() || tmpFile.delete()) {
break;
} else {
LOGGER.warn(
"Can't delete temporary snapshot file: {}, retrying...", tmpFile.getAbsolutePath());
}
}
}
}
/**
 * Writes the registered ConfigNodes: the entry count, then for every entry its id (to the
 * stream) followed by its location (through the thrift protocol).
 */
private void serializeRegisteredConfigNode(OutputStream outputStream, TProtocol protocol)
    throws IOException, TException {
  final int entryCount = registeredConfigNodes.size();
  ReadWriteIOUtils.write(entryCount, outputStream);
  for (Entry<Integer, TConfigNodeLocation> configNode : registeredConfigNodes.entrySet()) {
    final Integer configNodeId = configNode.getKey();
    final TConfigNodeLocation location = configNode.getValue();
    ReadWriteIOUtils.write(configNodeId, outputStream);
    location.write(protocol);
  }
}
/**
 * Writes the registered DataNodes: the entry count, then for every entry its id (to the stream)
 * followed by its configuration (through the thrift protocol).
 */
private void serializeRegisteredDataNode(OutputStream outputStream, TProtocol protocol)
    throws IOException, TException {
  final int entryCount = registeredDataNodes.size();
  ReadWriteIOUtils.write(entryCount, outputStream);
  for (Entry<Integer, TDataNodeConfiguration> dataNode : registeredDataNodes.entrySet()) {
    final Integer dataNodeId = dataNode.getKey();
    final TDataNodeConfiguration configuration = dataNode.getValue();
    ReadWriteIOUtils.write(dataNodeId, outputStream);
    configuration.write(protocol);
  }
}
/**
 * Writes the version info map: the entry count, then for every entry the node id, the version
 * string and the build info string.
 */
private void serializeVersionInfo(OutputStream outputStream) throws IOException {
  final int entryCount = nodeVersionInfo.size();
  ReadWriteIOUtils.write(entryCount, outputStream);
  for (Entry<Integer, TNodeVersionInfo> node : nodeVersionInfo.entrySet()) {
    final Integer nodeId = node.getKey();
    final TNodeVersionInfo versionInfo = node.getValue();
    ReadWriteIOUtils.write(nodeId, outputStream);
    ReadWriteIOUtils.write(versionInfo.getVersion(), outputStream);
    ReadWriteIOUtils.write(versionInfo.getBuildInfo(), outputStream);
  }
}
/**
 * Replaces the current node information with the content of the snapshot file {@code
 * node_info.bin} inside the given directory.
 *
 * <p>If the file does not exist, an error is logged and nothing is changed. Otherwise the
 * current state is cleared and nextNodeId, the registered ConfigNodes, the registered DataNodes
 * and the version info are read back in that order, with all three write locks held.
 *
 * @param snapshotDir the directory that contains the snapshot file
 * @throws IOException if reading the snapshot file fails
 * @throws TException if deserializing a thrift structure fails
 */
@Override
public void processLoadSnapshot(File snapshotDir) throws IOException, TException {
  final File snapshotFile = new File(snapshotDir, SNAPSHOT_FILENAME);
  final boolean loadable = snapshotFile.exists() && snapshotFile.isFile();
  if (!loadable) {
    LOGGER.error(
        "Failed to load snapshot,snapshot file [{}] is not exist.",
        snapshotFile.getAbsolutePath());
    return;
  }

  configNodeInfoReadWriteLock.writeLock().lock();
  dataNodeInfoReadWriteLock.writeLock().lock();
  versionInfoReadWriteLock.writeLock().lock();
  try (FileInputStream snapshotInput = new FileInputStream(snapshotFile);
      TIOStreamTransport transport = new TIOStreamTransport(snapshotInput)) {
    final TProtocol protocol = new TBinaryProtocol(transport);

    // Drop the in-memory state before reading the snapshot content back.
    clear();

    nextNodeId.set(ReadWriteIOUtils.readInt(snapshotInput));
    deserializeRegisteredConfigNode(snapshotInput, protocol);
    deserializeRegisteredDataNode(snapshotInput, protocol);
    deserializeBuildInfo(snapshotInput);
  } finally {
    // Release in the reverse order of acquisition.
    versionInfoReadWriteLock.writeLock().unlock();
    dataNodeInfoReadWriteLock.writeLock().unlock();
    configNodeInfoReadWriteLock.writeLock().unlock();
  }
}
/**
 * Reads the registered ConfigNodes: an entry count, then for every entry its id (from the
 * stream) followed by its location (through the thrift protocol).
 */
private void deserializeRegisteredConfigNode(InputStream inputStream, TProtocol protocol)
    throws IOException, TException {
  final int entryCount = ReadWriteIOUtils.readInt(inputStream);
  for (int read = 0; read < entryCount; read++) {
    final int configNodeId = ReadWriteIOUtils.readInt(inputStream);
    final TConfigNodeLocation location = new TConfigNodeLocation();
    location.read(protocol);
    registeredConfigNodes.put(configNodeId, location);
  }
}
/**
 * Reads the registered DataNodes: an entry count, then for every entry its id (from the stream)
 * followed by its configuration (through the thrift protocol).
 */
private void deserializeRegisteredDataNode(InputStream inputStream, TProtocol protocol)
    throws IOException, TException {
  final int entryCount = ReadWriteIOUtils.readInt(inputStream);
  for (int read = 0; read < entryCount; read++) {
    final int dataNodeId = ReadWriteIOUtils.readInt(inputStream);
    final TDataNodeConfiguration configuration = new TDataNodeConfiguration();
    configuration.read(protocol);
    registeredDataNodes.put(dataNodeId, configuration);
  }
}
/**
 * Reads the version info map, if present: an entry count, then for every entry the node id, the
 * version string and the build info string.
 */
private void deserializeBuildInfo(InputStream inputStream) throws IOException {
  // old version may not have build info,
  // thus we need to check inputStream before deserialize.
  if (inputStream.available() == 0) {
    return;
  }

  final int entryCount = ReadWriteIOUtils.readInt(inputStream);
  for (int read = 0; read < entryCount; read++) {
    final int nodeId = ReadWriteIOUtils.readInt(inputStream);
    final String version = ReadWriteIOUtils.readString(inputStream);
    final String buildInfo = ReadWriteIOUtils.readString(inputStream);
    nodeVersionInfo.put(nodeId, new TNodeVersionInfo(version, buildInfo));
  }
}
/**
 * @return the minimum DataNode count, i.e. the larger of the configured schema and data
 *     replication factors
 */
public static int getMinimumDataNode() {
  final int minimumDataNode = MINIMUM_DATANODE;
  return minimumDataNode;
}
/**
 * Resets this {@link NodeInfo} to its initial state: empties all node and version maps and
 * sets the node id counter back to -1. No lock is taken by this method itself.
 */
public void clear() {
  registeredConfigNodes.clear();
  registeredDataNodes.clear();
  nodeVersionInfo.clear();
  nextNodeId.set(-1);
}
/**
 * Two {@link NodeInfo} instances are equal when they hold the same node id counter value, the
 * same registered ConfigNodes and DataNodes, and the same version info.
 */
@Override
public boolean equals(Object o) {
  if (o == this) {
    return true;
  }
  if (o == null) {
    return false;
  }
  if (o.getClass() != getClass()) {
    return false;
  }

  final NodeInfo other = (NodeInfo) o;
  if (nextNodeId.get() != other.nextNodeId.get()) {
    return false;
  }
  if (!registeredConfigNodes.equals(other.registeredConfigNodes)) {
    return false;
  }
  if (!registeredDataNodes.equals(other.registeredDataNodes)) {
    return false;
  }
  return nodeVersionInfo.equals(other.nodeVersionInfo);
}
/**
 * Hash code consistent with {@link #equals(Object)}: built from the registered ConfigNodes, the
 * current value of the node id counter, the registered DataNodes and the version info.
 */
@Override
public int hashCode() {
  // Hash the counter's VALUE. AtomicInteger does not override hashCode(), so hashing the object
  // itself would give equal NodeInfo instances different hash codes.
  return Objects.hash(
      registeredConfigNodes, nextNodeId.get(), registeredDataNodes, nodeVersionInfo);
}
}