blob: 293663e71ec1a02582ef1a9687b93058d9478e6a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.service;
import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TNodeResource;
import org.apache.iotdb.commons.concurrent.IoTDBDefaultThreadExceptionHandler;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.exception.ConfigurationException;
import org.apache.iotdb.commons.exception.StartupException;
import org.apache.iotdb.commons.service.JMXService;
import org.apache.iotdb.commons.service.RegisterManager;
import org.apache.iotdb.commons.service.StartupChecks;
import org.apache.iotdb.commons.udf.service.UDFClassLoaderManager;
import org.apache.iotdb.commons.udf.service.UDFExecutableManager;
import org.apache.iotdb.commons.udf.service.UDFRegistrationService;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterReq;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterResp;
import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.db.client.ConfigNodeClient;
import org.apache.iotdb.db.client.ConfigNodeInfo;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.conf.IoTDBStartCheck;
import org.apache.iotdb.db.conf.rest.IoTDBRestServiceDescriptor;
import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
import org.apache.iotdb.db.consensus.SchemaRegionConsensusImpl;
import org.apache.iotdb.db.engine.StorageEngineV2;
import org.apache.iotdb.db.engine.cache.CacheHitRatioMonitor;
import org.apache.iotdb.db.engine.compaction.CompactionTaskManager;
import org.apache.iotdb.db.engine.cq.ContinuousQueryService;
import org.apache.iotdb.db.engine.flush.FlushManager;
import org.apache.iotdb.db.engine.trigger.service.TriggerRegistrationService;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.schemaregion.SchemaEngine;
import org.apache.iotdb.db.metadata.template.ClusterTemplateManager;
import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeService;
import org.apache.iotdb.db.mpp.execution.schedule.DriverScheduler;
import org.apache.iotdb.db.protocol.mpprest.MPPRestService;
import org.apache.iotdb.db.service.basic.ServiceProvider;
import org.apache.iotdb.db.service.basic.StandaloneServiceProvider;
import org.apache.iotdb.db.service.metrics.MetricsService;
import org.apache.iotdb.db.service.thrift.impl.ClientRPCServiceImpl;
import org.apache.iotdb.db.sync.receiver.ReceiverService;
import org.apache.iotdb.db.sync.sender.service.SenderService;
import org.apache.iotdb.db.wal.WALManager;
import org.apache.iotdb.db.wal.utils.WALMode;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
/**
 * Entry point and lifecycle holder of an IoTDB DataNode process.
 *
 * <p>The start-up sequence driven by {@link #doAddNode(String[])} is: run the start-up checks,
 * register this node with a ConfigNode, register all local services, start the consensus layers
 * and finally open the RPC services. {@link #stop()} tears the node down again.
 *
 * <p>The class is a lazily created singleton, see {@link #getInstance()}.
 */
public class DataNode implements DataNodeMBean {
  private static final Logger logger = LoggerFactory.getLogger(DataNode.class);
  private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();

  /** JMX object name under which this DataNode is registered. */
  private final String mbeanName =
      String.format(
          "%s:%s=%s", "org.apache.iotdb.datanode.service", IoTDBConstant.JMX_TYPE, "DataNode");

  /**
   * when joining a cluster this node will retry at most "DEFAULT_JOIN_RETRY" times before returning
   * a failure to the client
   */
  private static final int DEFAULT_JOIN_RETRY = 10;

  /** Internal endpoint (address and port) of this node, filled in {@link #serverCheckAndInit()}. */
  private final TEndPoint thisNode = new TEndPoint();

  private DataNode() {
    // we do not init anything here, so that we can re-initialize the instance in IT.
  }

  private static final RegisterManager registerManager = new RegisterManager();

  public static ServiceProvider serviceProvider;

  /** @return the singleton DataNode instance */
  public static DataNode getInstance() {
    return DataNodeHolder.INSTANCE;
  }

  public static void main(String[] args) {
    new DataNodeServerCommandLine().doMain(args);
  }

  /**
   * Validates the configuration and records the internal endpoint of this node.
   *
   * <p>If the client RPC address is the wildcard address {@code 0.0.0.0}, it is replaced by the
   * internal address.
   *
   * @throws ConfigurationException declared for the configuration check
   * @throws IOException declared for the configuration check
   */
  protected void serverCheckAndInit() throws ConfigurationException, IOException {
    IoTDBStartCheck.getInstance().checkConfig();
    // TODO: check configuration for data node
    // if client ip is the default address, set it same with internal ip
    if (config.getRpcAddress().equals("0.0.0.0")) {
      config.setRpcAddress(config.getInternalAddress());
    }
    thisNode.setIp(IoTDBDescriptor.getInstance().getConfig().getInternalAddress());
    thisNode.setPort(IoTDBDescriptor.getInstance().getConfig().getInternalPort());
  }

  /**
   * Runs the complete start-up sequence of this DataNode. When any step throws a {@link
   * StartupException} the error is logged and {@link #stop()} is invoked.
   *
   * @param args command line arguments; not used by this method
   */
  protected void doAddNode(String[] args) {
    try {
      // prepare cluster IoTDB-DataNode
      prepareDataNode();
      // register current DataNode to ConfigNode
      registerInConfigNode();
      // active DataNode
      active();
      // setup rpc service
      setUpRPCService();
      logger.info("Congratulation, IoTDB DataNode is set up successfully. Now, enjoy yourself!");
    } catch (StartupException e) {
      logger.error("Fail to start server", e);
      stop();
    }
  }

  /**
   * Switches IoTDB into cluster mode.
   *
   * @return always {@code true}
   */
  public boolean initLocalEngines() {
    IoTDB.setClusterMode();
    return true;
  }

  /**
   * Prepare cluster IoTDB-DataNode: run the default start-up checks, register the JMX bean and
   * turn on the mpp and cluster flags in the configuration.
   *
   * @throws StartupException declared for the start-up checks
   */
  private void prepareDataNode() throws StartupException {
    // check iotdb server first
    StartupChecks checks = new StartupChecks().withDefaultTest();
    checks.verify();
    // Register services
    JMXService.registerMBean(getInstance(), mbeanName);
    // set the mpp mode to true
    IoTDBDescriptor.getInstance().getConfig().setMppMode(true);
    IoTDBDescriptor.getInstance().getConfig().setClusterMode(true);
  }

  /**
   * Register DataNode with ConfigNode.
   *
   * <p>At most {@link #DEFAULT_JOIN_RETRY} attempts are made, sleeping for the configured
   * join-cluster timeout between attempts. On a successful (or "already registered") response the
   * DataNode id and the global configuration returned by the ConfigNode are applied to the local
   * configuration, and the method returns.
   *
   * @throws StartupException if no attempt succeeded or the waiting thread was interrupted
   */
  private void registerInConfigNode() throws StartupException {
    int retry = DEFAULT_JOIN_RETRY;
    ConfigNodeInfo.getInstance()
        .updateConfigNodeList(IoTDBDescriptor.getInstance().getConfig().getTargetConfigNodeList());
    while (retry > 0) {
      logger.info("start registering to the cluster.");
      try (ConfigNodeClient configNodeClient = new ConfigNodeClient()) {
        TDataNodeRegisterReq req = new TDataNodeRegisterReq();
        req.setDataNodeConfiguration(generateDataNodeConfiguration());
        TDataNodeRegisterResp dataNodeRegisterResp = configNodeClient.registerDataNode(req);

        // store config node lists from resp
        // Defensive: a response without a ConfigNode list must not abort the retry loop with an
        // uncaught NullPointerException; in that case the currently known list is kept.
        if (dataNodeRegisterResp.getConfigNodeList() != null) {
          List<TEndPoint> configNodeList = new ArrayList<>();
          for (TConfigNodeLocation configNodeLocation : dataNodeRegisterResp.getConfigNodeList()) {
            configNodeList.add(configNodeLocation.getInternalEndPoint());
          }
          ConfigNodeInfo.getInstance().updateConfigNodeList(configNodeList);
        }
        ClusterTemplateManager.getInstance()
            .updateTemplateSetInfo(dataNodeRegisterResp.getTemplateInfo());

        if (dataNodeRegisterResp.getStatus().getCode()
                == TSStatusCode.SUCCESS_STATUS.getStatusCode()
            || dataNodeRegisterResp.getStatus().getCode()
                == TSStatusCode.DATANODE_ALREADY_REGISTERED.getStatusCode()) {
          logger.info(dataNodeRegisterResp.getStatus().getMessage());

          // persist the assigned id only when it differs from the locally known one
          int dataNodeID = dataNodeRegisterResp.getDataNodeId();
          if (dataNodeID != config.getDataNodeId()) {
            IoTDBStartCheck.getInstance().serializeDataNodeId(dataNodeID);
            config.setDataNodeId(dataNodeID);
          }

          IoTDBDescriptor.getInstance().loadGlobalConfig(dataNodeRegisterResp.globalConfig);

          // adopt the cluster-wide consensus protocols only if none has been recorded yet
          if (!IoTDBStartCheck.getInstance()
              .checkConsensusProtocolExists(TConsensusGroupType.DataRegion)) {
            config.setDataRegionConsensusProtocolClass(
                dataNodeRegisterResp.globalConfig.getDataRegionConsensusProtocolClass());
            IoTDBStartCheck.getInstance()
                .serializeConsensusProtocol(
                    dataNodeRegisterResp.globalConfig.getDataRegionConsensusProtocolClass(),
                    TConsensusGroupType.DataRegion);
          }
          if (!IoTDBStartCheck.getInstance()
              .checkConsensusProtocolExists(TConsensusGroupType.SchemaRegion)) {
            config.setSchemaRegionConsensusProtocolClass(
                dataNodeRegisterResp.globalConfig.getSchemaRegionConsensusProtocolClass());
            IoTDBStartCheck.getInstance()
                .serializeConsensusProtocol(
                    dataNodeRegisterResp.globalConfig.getSchemaRegionConsensusProtocolClass(),
                    TConsensusGroupType.SchemaRegion);
          }

          config.setSeriesPartitionExecutorClass(
              dataNodeRegisterResp.globalConfig.getSeriesPartitionExecutorClass());
          config.setSeriesPartitionSlotNum(
              dataNodeRegisterResp.globalConfig.getSeriesPartitionSlotNum());
          config.setReadConsistencyLevel(
              dataNodeRegisterResp.globalConfig.getReadConsistencyLevel());
          logger.info("Register to the cluster successfully");
          return;
        }
      } catch (IOException e) {
        logger.warn("Cannot register to the cluster, because: {}", e.getMessage());
      } catch (TException e) {
        // read config nodes from system.properties
        logger.warn("Cannot register to the cluster, because: {}", e.getMessage());
        ConfigNodeInfo.getInstance().loadConfigNodeList();
      }

      try {
        // wait for the configured join-cluster timeout before starting the next try
        Thread.sleep(IoTDBDescriptor.getInstance().getConfig().getJoinClusterTimeOutMs());
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        logger.warn("Unexpected interruption when waiting to register to the cluster", e);
        break;
      }

      // start the next try
      retry--;
    }
    // all tries failed
    logger.error("Cannot register to the cluster after {} retries", DEFAULT_JOIN_RETRY);
    throw new StartupException("Cannot register to the cluster.");
  }

  /**
   * Register services and set up DataNode, then start the schema-region and data-region consensus
   * layers.
   *
   * @throws StartupException if setting up the services or starting a consensus layer fails
   */
  private void active() throws StartupException {
    try {
      setUp();
    } catch (StartupException | QueryProcessException e) {
      logger.error("meet error while starting up.", e);
      throw new StartupException("Error in activating IoTDB DataNode.");
    }
    logger.info("IoTDB DataNode has started.");

    try {
      // TODO: Start consensus layer in some where else
      SchemaRegionConsensusImpl.setupAndGetInstance().start();
      DataRegionConsensusImpl.setupAndGetInstance().start();
    } catch (IOException e) {
      throw new StartupException(e);
    }
  }

  /**
   * Registers all local services of the DataNode in their required order and waits, polling once
   * per second, until the storage engine reports that all storage groups are ready.
   *
   * @throws StartupException if a service fails to register, or if the thread is interrupted
   *     while waiting for the storage groups (the node is then only partially initialized and
   *     must not be reported as started)
   * @throws QueryProcessException declared by the service provider initialization
   */
  private void setUp() throws StartupException, QueryProcessException {
    logger.info("Setting up IoTDB DataNode...");
    Runtime.getRuntime().addShutdownHook(new IoTDBShutdownHook());
    setUncaughtExceptionHandler();
    initServiceProvider();
    // init metric service
    registerManager.register(MetricsService.getInstance());
    logger.info("recover the schema...");
    initSchemaEngine();
    registerManager.register(new JMXService());
    registerManager.register(FlushManager.getInstance());
    registerManager.register(CacheHitRatioMonitor.getInstance());
    registerManager.register(CompactionTaskManager.getInstance());
    JMXService.registerMBean(getInstance(), mbeanName);

    // close wal when using ratis consensus
    if (config.isClusterMode()
        && config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.RatisConsensus)) {
      config.setWalMode(WALMode.DISABLE);
    }
    registerManager.register(WALManager.getInstance());

    // in mpp mode we need to start some other services
    registerManager.register(StorageEngineV2.getInstance());
    registerManager.register(MPPDataExchangeService.getInstance());
    registerManager.register(DriverScheduler.getInstance());
    registerUdfServices();
    registerManager.register(ReceiverService.getInstance());

    logger.info(
        "IoTDB DataNode is setting up, some storage groups may not be ready now, please wait several seconds...");
    while (!StorageEngineV2.getInstance().isAllSgReady()) {
      try {
        Thread.sleep(1000);
      } catch (InterruptedException e) {
        logger.warn("IoTDB DataNode failed to set up.", e);
        Thread.currentThread().interrupt();
        // Returning normally here would let the caller announce a started node although the
        // services below were never registered, so fail the start-up instead.
        throw new StartupException(
            "IoTDB DataNode was interrupted while waiting for storage groups to be ready.");
      }
    }

    registerManager.register(SenderService.getInstance());
    registerManager.register(UpgradeSevice.getINSTANCE());
    // in mpp mode we temporarily don't start settle service because it uses StorageEngine directly
    // in itself, but currently we need to use StorageEngineV2 instead of StorageEngine in mpp mode.
    // registerManager.register(SettleService.getINSTANCE());
    registerManager.register(TriggerRegistrationService.getInstance());
    registerManager.register(ContinuousQueryService.getInstance());

    // start reporter
    MetricsService.getInstance().startAllReporter();

    // start region migrate service
    registerManager.register(RegionMigrateService.getInstance());
  }

  /**
   * Set up RPC and protocols after DataNode is available.
   *
   * @throws StartupException if one of the RPC or protocol services fails to register
   */
  private void setUpRPCService() throws StartupException {
    // Start InternalRPCService to indicate that the current DataNode can accept cluster scheduling
    registerManager.register(DataNodeInternalRPCService.getInstance());

    // Notice: During the period between starting the internal RPC service
    // and starting the client RPC service , some requests may fail because
    // DataNode is not marked as RUNNING by ConfigNode-leader yet.

    // Start client RPCService to indicate that the current DataNode provide external services
    IoTDBDescriptor.getInstance()
        .getConfig()
        .setRpcImplClassName(ClientRPCServiceImpl.class.getName());
    if (IoTDBDescriptor.getInstance().getConfig().isEnableRpcService()) {
      registerManager.register(RPCService.getInstance());
    }
    // init service protocols
    initProtocols();
  }

  /**
   * Generate the dataNodeConfiguration sent to the ConfigNode on registration: the endpoints of
   * this node taken from the local configuration plus its CPU and memory resources.
   *
   * @return TDataNodeConfiguration
   */
  private TDataNodeConfiguration generateDataNodeConfiguration() {
    // Set DataNodeLocation
    TDataNodeLocation location = new TDataNodeLocation();
    location.setDataNodeId(config.getDataNodeId());
    location.setClientRpcEndPoint(new TEndPoint(config.getRpcAddress(), config.getRpcPort()));
    location.setInternalEndPoint(
        new TEndPoint(config.getInternalAddress(), config.getInternalPort()));
    location.setMPPDataExchangeEndPoint(
        new TEndPoint(config.getInternalAddress(), config.getMppDataExchangePort()));
    location.setDataRegionConsensusEndPoint(
        new TEndPoint(config.getInternalAddress(), config.getDataRegionConsensusPort()));
    location.setSchemaRegionConsensusEndPoint(
        new TEndPoint(config.getInternalAddress(), config.getSchemaRegionConsensusPort()));

    // Set NodeResource
    TNodeResource resource = new TNodeResource();
    resource.setCpuCoreNum(Runtime.getRuntime().availableProcessors());
    // maxMemory() is the upper bound the JVM will attempt to use; totalMemory() is only the
    // currently committed heap and may vary over time, so it is not a stable "max" value.
    resource.setMaxMemory(Runtime.getRuntime().maxMemory());

    return new TDataNodeConfiguration(location, resource);
  }

  /**
   * Registers the temporary query data file service and the UDF executable, class loader and
   * registration services, using the directories from the local configuration.
   *
   * @throws StartupException if one of the services fails to register
   */
  private void registerUdfServices() throws StartupException {
    registerManager.register(TemporaryQueryDataFileService.getInstance());
    registerManager.register(
        UDFExecutableManager.setupAndGetInstance(
            IoTDBDescriptor.getInstance().getConfig().getTemporaryLibDir(),
            IoTDBDescriptor.getInstance().getConfig().getUdfDir()));
    registerManager.register(
        UDFClassLoaderManager.setupAndGetInstance(
            IoTDBDescriptor.getInstance().getConfig().getUdfDir()));
    registerManager.register(
        UDFRegistrationService.setupAndGetInstance(
            IoTDBDescriptor.getInstance().getConfig().getSystemDir()
                + File.separator
                + "udf"
                + File.separator));
  }

  /** Initializes the schema engine and logs how long the initialization took. */
  private void initSchemaEngine() {
    long time = System.currentTimeMillis();
    SchemaEngine.getInstance().init();
    long end = System.currentTimeMillis() - time;
    logger.info("spend {}ms to recover schema.", end);
    logger.info(
        "After initializing, sequence tsFile threshold is {}, unsequence tsFile threshold is {}",
        IoTDBDescriptor.getInstance().getConfig().getSeqTsFileSize(),
        IoTDBDescriptor.getInstance().getConfig().getUnSeqTsFileSize());
  }

  /**
   * Deregisters all services and then stops the metrics service and both consensus layers.
   *
   * <p>Each component is stopped independently: a failure while stopping one of them (including a
   * consensus layer that was never set up because start-up failed early) is logged and does not
   * prevent the remaining components from being stopped.
   */
  public void stop() {
    deactivate();
    try {
      MetricsService.getInstance().stop();
    } catch (Exception e) {
      logger.error("stop data node error", e);
    }
    try {
      SchemaRegionConsensusImpl.getInstance().stop();
    } catch (Exception e) {
      logger.error("stop data node error", e);
    }
    try {
      DataRegionConsensusImpl.getInstance().stop();
    } catch (Exception e) {
      logger.error("stop data node error", e);
    }
  }

  /**
   * Installs a {@link StandaloneServiceProvider} as the static service provider.
   *
   * @throws QueryProcessException declared for the provider construction
   */
  private void initServiceProvider() throws QueryProcessException {
    serviceProvider = new StandaloneServiceProvider();
  }

  /**
   * Registers the optional protocol services (InfluxDB RPC, MQTT, REST) that are enabled in the
   * configuration.
   *
   * @throws StartupException if an enabled service fails to register
   */
  private void initProtocols() throws StartupException {
    if (IoTDBDescriptor.getInstance().getConfig().isEnableInfluxDBRpcService()) {
      registerManager.register(InfluxDBRPCService.getInstance());
      IoTDB.initInfluxDBMManager();
    }
    if (IoTDBDescriptor.getInstance().getConfig().isEnableMQTTService()) {
      registerManager.register(MQTTService.getInstance());
    }
    if (IoTDBRestServiceDescriptor.getInstance().getConfig().isEnableRestService()) {
      registerManager.register(MPPRestService.getInstance());
    }
  }

  /** Deregisters every service held by the register manager and removes the JMX bean. */
  private void deactivate() {
    logger.info("Deactivating IoTDB DataNode...");
    // stopThreadPools();
    registerManager.deregisterAll();
    JMXService.deregisterMBean(mbeanName);
    logger.info("IoTDB DataNode is deactivated.");
  }

  /** Installs the IoTDB default handler for exceptions that no thread catches. */
  private void setUncaughtExceptionHandler() {
    Thread.setDefaultUncaughtExceptionHandler(new IoTDBDefaultThreadExceptionHandler());
  }

  /** Holder class whose static field carries the singleton instance. */
  private static class DataNodeHolder {

    private static final DataNode INSTANCE = new DataNode();

    private DataNodeHolder() {}
  }
}