blob: f3985847386d58d9aadd8859712babdf2b1acb1e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.confignode.service;
import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.client.ClientManagerMetrics;
import org.apache.iotdb.commons.concurrent.ThreadModule;
import org.apache.iotdb.commons.concurrent.ThreadName;
import org.apache.iotdb.commons.concurrent.ThreadPoolMetrics;
import org.apache.iotdb.commons.conf.CommonConfig;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.exception.StartupException;
import org.apache.iotdb.commons.service.JMXService;
import org.apache.iotdb.commons.service.RegisterManager;
import org.apache.iotdb.commons.service.ServiceType;
import org.apache.iotdb.commons.service.metric.JvmGcMonitorMetrics;
import org.apache.iotdb.commons.service.metric.MetricService;
import org.apache.iotdb.commons.service.metric.cpu.CpuUsageMetrics;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.confignode.client.ConfigNodeRequestType;
import org.apache.iotdb.confignode.client.sync.SyncConfigNodeClientPool;
import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
import org.apache.iotdb.confignode.conf.ConfigNodeConstant;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.conf.SystemPropertiesUtils;
import org.apache.iotdb.confignode.manager.ConfigManager;
import org.apache.iotdb.confignode.manager.consensus.ConsensusManager;
import org.apache.iotdb.confignode.manager.pipe.agent.PipeConfigNodeAgent;
import org.apache.iotdb.confignode.manager.pipe.metric.PipeConfigNodeMetrics;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterResp;
import org.apache.iotdb.confignode.rpc.thrift.TNodeVersionInfo;
import org.apache.iotdb.confignode.service.thrift.ConfigNodeRPCService;
import org.apache.iotdb.confignode.service.thrift.ConfigNodeRPCServiceProcessor;
import org.apache.iotdb.db.service.metrics.ProcessMetrics;
import org.apache.iotdb.metrics.config.MetricConfigDescriptor;
import org.apache.iotdb.metrics.metricsets.UpTimeMetrics;
import org.apache.iotdb.metrics.metricsets.disk.DiskMetrics;
import org.apache.iotdb.metrics.metricsets.jvm.JvmMetrics;
import org.apache.iotdb.metrics.metricsets.logback.LogbackMetrics;
import org.apache.iotdb.metrics.metricsets.net.NetMetrics;
import org.apache.iotdb.metrics.metricsets.system.SystemMetrics;
import org.apache.iotdb.rpc.TSStatusCode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
 * Process-level lifecycle holder of a ConfigNode. It wires up the internal services, the {@link
 * ConfigManager}, the metric service and the RPC service in {@link #active()}, and tears them
 * down again in {@link #deactivate()} / {@link #stop()}.
 */
public class ConfigNode implements ConfigNodeMBean {
private static final Logger LOGGER = LoggerFactory.getLogger(ConfigNode.class);
private static final ConfigNodeConfig CONF = ConfigNodeDescriptor.getInstance().getConf();
private static final CommonConfig COMMON_CONFIG = CommonDescriptor.getInstance().getConfig();
// Maximum number of attempts made when sending the register request to the seed ConfigNode.
private static final int STARTUP_RETRY_NUM = 10;
// Pause between two startup retries (3 seconds, expressed in milliseconds).
private static final long STARTUP_RETRY_INTERVAL_IN_MS = TimeUnit.SECONDS.toMillis(3);
// Number of polling rounds a Non-Seed-ConfigNode spends waiting for the leader's scheduling:
// the connection timeout divided (integer division) by the retry interval.
// NOTE(review): this evaluates to 0 when the configured connection timeout is smaller than the
// retry interval, in which case the waiting loop in active() never runs - confirm intended.
private static final int SCHEDULE_WAITING_RETRY_NUM =
(int) (COMMON_CONFIG.getConnectionTimeoutInMS() / STARTUP_RETRY_INTERVAL_IN_MS);
// ConfigNodeId assigned to the Seed-ConfigNode on its initial startup.
private static final int SEED_CONFIG_NODE_ID = 0;
// Placeholder id put into the register request before the real id is returned in the response.
private static final int INIT_NON_SEED_CONFIG_NODE_ID = -1;
// Log format used to print the effective configuration once startup has progressed far enough.
private static final String CONFIGURATION = "IoTDB configuration: {}";
// JMX object name under which this instance is registered and deregistered.
private final String mbeanName =
String.format(
"%s:%s=%s",
IoTDBConstant.IOTDB_SERVICE_JMX_NAME,
IoTDBConstant.JMX_TYPE,
ServiceType.CONFIG_NODE.getJmxName());
// Holds every service registered during startup so that deactivate() can deregister them all.
private final RegisterManager registerManager = new RegisterManager();
// Created by initConfigManager(), or injected through setConfigManager(); null until then.
protected ConfigManager configManager;
/** Private constructor: the only instance is the one held by {@code ConfigNodeHolder}. */
private ConfigNode() {
// We do not init anything here, so that we can re-initialize the instance in IT.
}
/**
 * Process entry point: logs the environment variables and the default charset, then hands the
 * command-line arguments over to {@code ConfigNodeCommandLine}.
 *
 * @param args command-line arguments forwarded unchanged to {@code ConfigNodeCommandLine#doMain}
 */
public static void main(String[] args) {
  final String nodeName = ConfigNodeConstant.GLOBAL_NAME;

  LOGGER.info("{} environment variables: {}", nodeName, ConfigNodeConfig.getEnvironmentVariables());

  final String charsetName = Charset.defaultCharset().displayName();
  LOGGER.info("{} default charset is: {}", nodeName, charsetName);

  final ConfigNodeCommandLine commandLine = new ConfigNodeCommandLine();
  commandLine.doMain(args);
}
/**
 * Activates this ConfigNode.
 *
 * <p>After the common preparation (pid file handling, shutdown hook, internal services and
 * {@link ConfigManager} creation) exactly one of three startup paths is taken:
 *
 * <ul>
 *   <li>restart of a ConfigNode that has been started before,
 *   <li>initial startup as the Seed-ConfigNode,
 *   <li>initial startup as a Non-Seed-ConfigNode that registers itself to the cluster.
 * </ul>
 *
 * <p>Any {@link StartupException} or {@link IOException} raised on the way is logged and leads
 * to {@link #stop()}.
 */
public void active() {
  LOGGER.info("Activating {}...", ConfigNodeConstant.GLOBAL_NAME);

  try {
    processPid();
    // Add shutdown hook
    addShutDownHook();
    // Set up internal services
    setUpInternalServices();
    // Init ConfigManager
    initConfigManager();

    if (SystemPropertiesUtils.isRestarted()) {
      /* Restart */
      restartConfigNode();
    } else if (ConfigNodeDescriptor.getInstance().isSeedConfigNode()) {
      /* Initial startup of Seed-ConfigNode */
      startSeedConfigNode();
    } else {
      /* Initial startup of Non-Seed-ConfigNode */
      startNonSeedConfigNode();
    }
  } catch (StartupException | IOException e) {
    LOGGER.error("Meet error while starting up.", e);
    stop();
  }
}

/** Restart path: rejoin the consensus group, open the services, then refresh the node record. */
private void restartConfigNode() throws StartupException, IOException {
  LOGGER.info("{} is in restarting process...", ConfigNodeConstant.GLOBAL_NAME);

  int configNodeId = CONF.getConfigNodeId();
  configManager.initConsensusManager();
  waitForLeaderElected();
  setUpMetricService();
  // Notice: We always set up Seed-ConfigNode's RPC service lastly to ensure
  // that the external service is not provided until ConfigNode is fully available
  setUpRPCService();
  LOGGER.info(CONFIGURATION, CONF.getConfigMessage());
  LOGGER.info(
      "{} has successfully restarted and joined the cluster: {}.",
      ConfigNodeConstant.GLOBAL_NAME,
      CONF.getClusterName());

  // Update item during restart
  // This will always be executed until the consensus write succeeds
  while (true) {
    TSStatus status =
        configManager
            .getNodeManager()
            .updateConfigNodeIfNecessary(
                configNodeId,
                new TNodeVersionInfo(IoTDBConstant.VERSION, IoTDBConstant.BUILD_INFO));
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
      break;
    } else {
      startUpSleep("restart ConfigNode failed! ");
    }
  }
}

/** Seed path: build the consensus group, apply this node to it, then open the services. */
private void startSeedConfigNode() throws StartupException, IOException {
  LOGGER.info(
      "The current {} is now starting as the Seed-ConfigNode.", ConfigNodeConstant.GLOBAL_NAME);

  /* Always set ClusterId and ConfigNodeId before initConsensusManager */
  CONF.setConfigNodeId(SEED_CONFIG_NODE_ID);
  configManager.initConsensusManager();

  // Persistence system parameters after the consensusGroup is built,
  // or the consensusGroup will not be initialized successfully otherwise.
  SystemPropertiesUtils.storeSystemParameters();

  // Wait for ConfigNode-leader elected before applying itself
  waitForLeaderElected();

  // Seed-ConfigNode should apply itself when first start
  configManager
      .getNodeManager()
      .applyConfigNode(
          generateConfigNodeLocation(SEED_CONFIG_NODE_ID),
          new TNodeVersionInfo(IoTDBConstant.VERSION, IoTDBConstant.BUILD_INFO));

  setUpMetricService();
  // Notice: We always set up Seed-ConfigNode's RPC service lastly to ensure
  // that the external service is not provided until Seed-ConfigNode is fully initialized
  setUpRPCService();

  // The initial startup of Seed-ConfigNode finished
  LOGGER.info(CONFIGURATION, CONF.getConfigMessage());
  LOGGER.info(
      "{} has successfully started and joined the cluster: {}.",
      ConfigNodeConstant.GLOBAL_NAME,
      CONF.getClusterName());
}

/** Non-seed path: open RPC, register to the cluster, then wait for the leader's scheduling. */
private void startNonSeedConfigNode() throws StartupException, IOException {
  // We set up Non-Seed ConfigNode's RPC service before sending the register request
  // in order to facilitate the scheduling of capacity expansion process in ConfigNode-leader
  setUpRPCService();
  sendRegisterConfigNodeRequest();

  // The initial startup of Non-Seed-ConfigNode is not yet finished,
  // we should wait for leader's scheduling
  LOGGER.info(CONFIGURATION, CONF.getConfigMessage());
  LOGGER.info(
      "{} {} has registered successfully. Waiting for the leader's scheduling to join the cluster: {}.",
      ConfigNodeConstant.GLOBAL_NAME,
      CONF.getConfigNodeId(),
      CONF.getClusterName());
  setUpMetricService();

  // The node counts as joined once its consensus layer hosts at least one consensus group.
  boolean isJoinedCluster = false;
  for (int retry = 0; retry < SCHEDULE_WAITING_RETRY_NUM; retry++) {
    if (!configManager
        .getConsensusManager()
        .getConsensusImpl()
        .getAllConsensusGroupIds()
        .isEmpty()) {
      isJoinedCluster = true;
      break;
    }
    startUpSleep("Waiting leader's scheduling is interrupted.");
  }

  if (!isJoinedCluster) {
    LOGGER.error(
        "The current ConfigNode can't joined the cluster because leader's scheduling failed. The possible cause is that the ip:port configuration is incorrect.");
    stop();
  }
}
/**
 * Schedules the pid file for deletion on JVM exit, if the system property named by {@code
 * IoTDBConstant.IOTDB_PIDFILE} is set. Does nothing otherwise.
 */
void processPid() {
  final String pidFilePath = System.getProperty(IoTDBConstant.IOTDB_PIDFILE);
  if (pidFilePath == null) {
    return;
  }
  final File pidFileHandle = new File(pidFilePath);
  pidFileHandle.deleteOnExit();
}
/**
 * Registers the services that must be available before the {@link ConfigManager} is created:
 * the JMX service (including this instance as an MBean) and the pipe runtime agent.
 *
 * @throws StartupException if registering one of the services fails
 */
private void setUpInternalServices() throws StartupException {
  // JMX first, so that this instance can be exposed as an MBean right away
  final JMXService jmxService = new JMXService();
  registerManager.register(jmxService);
  JMXService.registerMBean(this, mbeanName);

  // Pipe runtime agent
  registerManager.register(PipeConfigNodeAgent.runtime());

  LOGGER.info("Successfully setup internal services.");
}
/**
 * Registers the metric service and binds the predefined metric sets to it.
 *
 * <p>The node id is written into the metric configuration before the metric service is
 * registered.
 *
 * @throws StartupException if registering the metric service fails
 */
private void setUpMetricService() throws StartupException {
  MetricConfigDescriptor.getInstance().getMetricConfig().setNodeId(CONF.getConfigNodeId());

  final MetricService metricService = MetricService.getInstance();
  registerManager.register(metricService);

  // Bind predefined metric sets
  metricService.addMetricSet(new UpTimeMetrics());
  metricService.addMetricSet(new JvmMetrics());
  metricService.addMetricSet(new LogbackMetrics());
  metricService.addMetricSet(new ProcessMetrics());
  metricService.addMetricSet(new DiskMetrics(IoTDBConstant.CN_ROLE));
  metricService.addMetricSet(new NetMetrics(IoTDBConstant.CN_ROLE));
  metricService.addMetricSet(JvmGcMonitorMetrics.getInstance());
  metricService.addMetricSet(ClientManagerMetrics.getInstance());
  metricService.addMetricSet(ThreadPoolMetrics.getInstance());

  initCpuMetrics();
  initSystemMetrics();

  metricService.addMetricSet(new PipeConfigNodeMetrics(configManager.getPipeManager()));
}
/**
 * Points the system metrics at the system directory and the consensus directory, then binds
 * them to the metric service.
 */
private void initSystemMetrics() {
  ArrayList<String> monitoredDirs =
      new ArrayList<>(Arrays.asList(CONF.getSystemDir(), CONF.getConsensusDir()));

  final SystemMetrics systemMetrics = SystemMetrics.getInstance();
  systemMetrics.setDiskDirs(monitoredDirs);
  MetricService.getInstance().addMetricSet(SystemMetrics.getInstance());
}
/**
 * Binds the CPU usage metrics, labelled by every {@link ThreadModule} and every {@link
 * ThreadName}, to the metric service.
 */
private void initCpuMetrics() {
  // Module labels use toString(), pool labels use name(), matching the two mappers below.
  List<String> moduleLabels = new ArrayList<>();
  for (ThreadModule module : ThreadModule.values()) {
    moduleLabels.add(module.toString());
  }

  List<String> poolLabels = new ArrayList<>();
  for (ThreadName pool : ThreadName.values()) {
    poolLabels.add(pool.name());
  }

  CpuUsageMetrics cpuUsageMetrics =
      new CpuUsageMetrics(
          moduleLabels,
          poolLabels,
          thread -> ThreadName.getModuleTheThreadBelongs(thread).toString(),
          thread -> ThreadName.getThreadPoolTheThreadBelongs(thread).name());
  MetricService.getInstance().addMetricSet(cpuUsageMetrics);
}
/**
 * Creates the {@link ConfigManager} of this ConfigNode.
 *
 * <p>If the construction fails for any reason, the error is logged and {@link #stop()} is
 * invoked. The success message is logged only when the construction actually succeeded, so it
 * can never follow the error message.
 */
private void initConfigManager() {
  try {
    this.configManager = new ConfigManager();
    LOGGER.info("Successfully initialize ConfigManager.");
  } catch (Exception e) {
    LOGGER.error("Can't start ConfigNode consensus group!", e);
    stop();
  }
}
/**
 * Register Non-seed {@link ConfigNode} when first startup.
 *
 * <p>The register request is sent to the configured seed ConfigNode, at most {@code
 * STARTUP_RETRY_NUM} times. A redirection answer switches the target to the recommended node,
 * and an internal-retry answer keeps the current target; in both cases the next attempt follows
 * after a pause. On success the returned ConfigNodeId is stored and the {@link
 * ConsensusManager} is initialized. If every attempt is used up, {@link #stop()} is invoked.
 *
 * @throws StartupException if the seed ConfigNode is not configured, if the response is missing
 *     or of an unexpected type, if the cluster rejects the registration, or if the retry pause
 *     is interrupted.
 * @throws IOException if {@link ConsensusManager} init failed.
 */
private void sendRegisterConfigNodeRequest() throws StartupException, IOException {
  TConfigNodeRegisterReq req =
      new TConfigNodeRegisterReq(
          configManager.getClusterParameters(),
          generateConfigNodeLocation(INIT_NON_SEED_CONFIG_NODE_ID));

  req.setVersionInfo(new TNodeVersionInfo(IoTDBConstant.VERSION, IoTDBConstant.BUILD_INFO));

  TEndPoint seedConfigNode = CONF.getSeedConfigNode();
  if (seedConfigNode == null) {
    LOGGER.error(
        "Please set the cn_seed_config_node parameter in iotdb-confignode.properties file.");
    throw new StartupException("The seedConfigNode setting in conf is empty");
  }

  for (int retry = 0; retry < STARTUP_RETRY_NUM; retry++) {
    TSStatus status;
    TConfigNodeRegisterResp resp = null;
    Object obj =
        SyncConfigNodeClientPool.getInstance()
            .sendSyncRequestToConfigNodeWithRetry(
                seedConfigNode, req, ConfigNodeRequestType.REGISTER_CONFIG_NODE);

    if (obj instanceof TConfigNodeRegisterResp) {
      resp = (TConfigNodeRegisterResp) obj;
      status = resp.getStatus();
    } else if (obj instanceof TSStatus) {
      status = (TSStatus) obj;
    } else {
      // A null or foreign response would otherwise surface as an NPE or ClassCastException,
      // neither of which is handled by the startup error handling in active().
      throw new StartupException(
          "Unexpected response of the register ConfigNode request: " + obj);
    }
    if (status == null) {
      throw new StartupException("The status of the register ConfigNode response is empty!");
    }

    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
      if (resp == null) {
        LOGGER.error("The result of register ConfigNode is empty!");
        throw new StartupException("The result of register ConfigNode is empty!");
      }
      /* Always set ConfigNodeId before initConsensusManager */
      CONF.setConfigNodeId(resp.getConfigNodeId());
      configManager.initConsensusManager();
      return;
    } else if (status.getCode() == TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
      seedConfigNode = status.getRedirectNode();
      LOGGER.info("ConfigNode need redirect to {}, retry {} ...", seedConfigNode, retry);
    } else if (status.getCode() == TSStatusCode.INTERNAL_REQUEST_RETRY_ERROR.getStatusCode()) {
      LOGGER.warn("The result of register self ConfigNode is {}, retry {} ...", status, retry);
    } else {
      throw new StartupException(status.getMessage());
    }
    startUpSleep("Register ConfigNode failed!");
  }

  LOGGER.error(
      "The current ConfigNode can't send register request to the ConfigNode-leader after all retries!");
  stop();
}
/**
 * Builds the location descriptor of this ConfigNode from the local configuration.
 *
 * @param configNodeId the id to put into the location
 * @return a location whose two endpoints both use the internal address, the first with the
 *     internal port and the second with the consensus port
 */
private TConfigNodeLocation generateConfigNodeLocation(int configNodeId) {
  final TEndPoint internalEndPoint =
      new TEndPoint(CONF.getInternalAddress(), CONF.getInternalPort());
  final TEndPoint consensusEndPoint =
      new TEndPoint(CONF.getInternalAddress(), CONF.getConsensusPort());
  return new TConfigNodeLocation(configNodeId, internalEndPoint, consensusEndPoint);
}
/**
 * Pauses the calling thread for one startup retry interval.
 *
 * @param errorMessage message of the exception thrown when the pause is interrupted
 * @throws StartupException if the thread is interrupted while sleeping; the interrupt status is
 *     restored before the exception is thrown
 */
private void startUpSleep(String errorMessage) throws StartupException {
  try {
    Thread.sleep(STARTUP_RETRY_INTERVAL_IN_MS);
  } catch (InterruptedException interruption) {
    // Keep the interrupt visible to the caller's thread before aborting the startup step
    Thread.currentThread().interrupt();
    throw new StartupException(errorMessage);
  }
}
/**
 * Creates the ConfigNode RPC service, backs it with a processor bound to the current {@link
 * ConfigManager}, and registers it.
 *
 * @throws StartupException if registering the RPC service fails
 */
private void setUpRPCService() throws StartupException {
  final ConfigNodeRPCService rpcService = new ConfigNodeRPCService();
  final ConfigNodeRPCServiceProcessor rpcProcessor =
      new ConfigNodeRPCServiceProcessor(configManager);

  rpcService.initSyncedServiceImpl(rpcProcessor);
  registerManager.register(rpcService);
}
/**
 * Blocks until the consensus manager reports that a leader exists, checking once per second.
 *
 * <p>An interruption does not end the wait. The interrupt is remembered and the interrupt
 * status of the thread is restored only when the method returns. Restoring it inside the loop
 * would make every following {@code sleep} fail immediately, which turns the wait into a busy
 * loop that floods the log.
 */
private void waitForLeaderElected() {
  boolean interrupted = false;
  try {
    while (!configManager.getConsensusManager().isLeaderExist()) {
      LOGGER.info("Leader has not been elected yet, wait for 1 second");
      try {
        TimeUnit.SECONDS.sleep(1);
      } catch (InterruptedException e) {
        // Defer the re-interrupt to the finally block, see the method comment
        interrupted = true;
        LOGGER.warn("Unexpected interruption during waiting for leader election.");
      }
    }
  } finally {
    if (interrupted) {
      Thread.currentThread().interrupt();
    }
  }
}
/**
 * Shuts down what {@link #active()} has set up: deregisters every registered service, removes
 * the MBean of this instance, and closes the {@link ConfigManager} if one exists.
 *
 * @throws IOException if close {@link ConfigNode} failed.
 */
public void deactivate() throws IOException {
  final String nodeName = ConfigNodeConstant.GLOBAL_NAME;
  LOGGER.info("Deactivating {}...", nodeName);

  registerManager.deregisterAll();
  JMXService.deregisterMBean(mbeanName);

  // The manager is absent when startup failed before it was created
  if (null != configManager) {
    configManager.close();
  }

  LOGGER.info("{} is deactivated.", nodeName);
}
/**
 * Deactivates this ConfigNode and terminates the JVM with exit code -1.
 *
 * <p>An {@link IOException} raised while deactivating is logged and does not prevent the exit.
 */
public void stop() {
  try {
    this.deactivate();
  } catch (IOException deactivationFailure) {
    LOGGER.error("Meet error when deactivate ConfigNode", deactivationFailure);
  }

  final int abnormalExitCode = -1;
  System.exit(abnormalExitCode);
}
/**
 * Returns the {@link ConfigManager} currently held by this ConfigNode.
 *
 * @return the manager, or {@code null} if none has been created or injected yet
 */
public ConfigManager getConfigManager() {
  return this.configManager;
}
/** Installs a {@code ConfigNodeShutdownHook} as a JVM shutdown hook. */
protected void addShutDownHook() {
  final ConfigNodeShutdownHook shutdownHook = new ConfigNodeShutdownHook();
  Runtime.getRuntime().addShutdownHook(shutdownHook);
}
/**
 * Replaces the {@link ConfigManager} held by this ConfigNode. Intended for tests only.
 *
 * @param configManager the manager to use from now on
 */
@TestOnly
public void setConfigManager(ConfigManager configManager) {
this.configManager = configManager;
}
/**
 * Holder of the single {@link ConfigNode} instance. The instance is created when this nested
 * class is initialized, and is handed out through {@code getInstance()}.
 */
private static class ConfigNodeHolder {
private static final ConfigNode INSTANCE = new ConfigNode();
private ConfigNodeHolder() {
// Empty constructor
}
}
/**
 * Returns the single {@link ConfigNode} instance of this process.
 *
 * @return the instance held by {@code ConfigNodeHolder}
 */
public static ConfigNode getInstance() {
return ConfigNodeHolder.INSTANCE;
}
}