| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.iotdb.confignode.service; |
| |
| import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation; |
| import org.apache.iotdb.common.rpc.thrift.TEndPoint; |
| import org.apache.iotdb.common.rpc.thrift.TSStatus; |
| import org.apache.iotdb.commons.conf.CommonDescriptor; |
| import org.apache.iotdb.commons.exception.StartupException; |
| import org.apache.iotdb.commons.service.JMXService; |
| import org.apache.iotdb.commons.service.RegisterManager; |
| import org.apache.iotdb.commons.udf.service.UDFClassLoaderManager; |
| import org.apache.iotdb.commons.udf.service.UDFExecutableManager; |
| import org.apache.iotdb.commons.udf.service.UDFRegistrationService; |
| import org.apache.iotdb.confignode.client.ConfigNodeRequestType; |
| import org.apache.iotdb.confignode.client.sync.confignode.SyncConfigNodeClientPool; |
| import org.apache.iotdb.confignode.conf.ConfigNodeConfig; |
| import org.apache.iotdb.confignode.conf.ConfigNodeConstant; |
| import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor; |
| import org.apache.iotdb.confignode.conf.SystemPropertiesUtils; |
| import org.apache.iotdb.confignode.manager.ConfigManager; |
| import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq; |
| import org.apache.iotdb.confignode.service.thrift.ConfigNodeRPCService; |
| import org.apache.iotdb.confignode.service.thrift.ConfigNodeRPCServiceProcessor; |
| import org.apache.iotdb.db.service.metrics.MetricService; |
| import org.apache.iotdb.rpc.TSStatusCode; |
| |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.io.IOException; |
| import java.util.concurrent.TimeUnit; |
| |
| public class ConfigNode implements ConfigNodeMBean { |
| |
| private static final Logger LOGGER = LoggerFactory.getLogger(ConfigNode.class); |
| |
| private static final ConfigNodeConfig CONF = ConfigNodeDescriptor.getInstance().getConf(); |
| |
| private static final int SCHEDULE_WAITING_RETRY_NUM = 20; |
| |
| private final String mbeanName = |
| String.format( |
| "%s:%s=%s", |
| ConfigNodeConstant.CONFIGNODE_PACKAGE, ConfigNodeConstant.JMX_TYPE, "ConfigNode"); |
| private final RegisterManager registerManager = new RegisterManager(); |
| |
| private ConfigManager configManager; |
| |
| private ConfigNode() { |
| // we do not init anything here, so that we can re-initialize the instance in IT. |
| } |
| |
| public static void main(String[] args) { |
| new ConfigNodeCommandLine().doMain(args); |
| } |
| |
| public void active() { |
| LOGGER.info("Activating {}...", ConfigNodeConstant.GLOBAL_NAME); |
| |
| try { |
| // Set up internal services |
| setUpInternalServices(); |
| // Init ConfigManager |
| initConfigManager(); |
| |
| /* Restart */ |
| if (SystemPropertiesUtils.isRestarted()) { |
| setUpRPCService(); |
| LOGGER.info( |
| "{} has successfully started and joined the cluster.", ConfigNodeConstant.GLOBAL_NAME); |
| return; |
| } |
| |
| /* Initial startup of Seed-ConfigNode */ |
| if (ConfigNodeDescriptor.getInstance().isSeedConfigNode()) { |
| SystemPropertiesUtils.storeSystemParameters(); |
| // Seed-ConfigNode should apply itself when first start |
| configManager |
| .getNodeManager() |
| .applyConfigNode( |
| new TConfigNodeLocation( |
| 0, |
| new TEndPoint(CONF.getInternalAddress(), CONF.getInternalPort()), |
| new TEndPoint(CONF.getInternalAddress(), CONF.getConsensusPort()))); |
| // We always set up Seed-ConfigNode's RPC service lastly to ensure that |
| // the external service is not provided until Seed-ConfigNode is fully initialized |
| setUpRPCService(); |
| // The initial startup of Seed-ConfigNode finished |
| LOGGER.info( |
| "{} has successfully started and joined the cluster.", ConfigNodeConstant.GLOBAL_NAME); |
| return; |
| } |
| |
| /* Initial startup of Non-Seed-ConfigNode */ |
| // We set up Non-Seed ConfigNode's RPC service before sending the register request |
| // in order to facilitate the scheduling of capacity expansion process in ConfigNode-leader |
| setUpRPCService(); |
| registerConfigNode(); |
| // The initial startup of Non-Seed-ConfigNode is not yet finished, |
| // we should wait for leader's scheduling |
| LOGGER.info( |
| "{} has registered successfully. Waiting for the leader's scheduling to join the cluster.", |
| ConfigNodeConstant.GLOBAL_NAME); |
| |
| boolean isJoinedCluster = false; |
| for (int retry = 0; retry < SCHEDULE_WAITING_RETRY_NUM; retry++) { |
| if (configManager.getConsensusManager().getConsensusImpl().getAllConsensusGroupIds().size() |
| > 0) { |
| isJoinedCluster = true; |
| break; |
| } |
| |
| try { |
| TimeUnit.MILLISECONDS.sleep(1000); |
| } catch (InterruptedException e) { |
| LOGGER.warn("Waiting leader's scheduling is interrupted."); |
| } |
| } |
| |
| if (!isJoinedCluster) { |
| LOGGER.error( |
| "The current ConfigNode can't joined the cluster because leader's scheduling failed. The possible cause is that the ip:port configuration is incorrect."); |
| stop(); |
| } |
| } catch (StartupException | IOException e) { |
| LOGGER.error("Meet error while starting up.", e); |
| try { |
| stop(); |
| } catch (IOException e2) { |
| LOGGER.error("Meet error when stop ConfigNode!", e); |
| } |
| } |
| } |
| |
| private void initConfigManager() { |
| try { |
| configManager = new ConfigManager(); |
| } catch (IOException e) { |
| LOGGER.error("Can't start ConfigNode consensus group!", e); |
| try { |
| stop(); |
| } catch (IOException e2) { |
| LOGGER.error("Meet error when stop ConfigNode!", e); |
| } |
| } |
| // Add some Metrics for configManager |
| configManager.addMetrics(); |
| LOGGER.info("Successfully initialize ConfigManager."); |
| } |
| |
| private void setUpInternalServices() throws StartupException, IOException { |
| // Setup JMXService |
| registerManager.register(new JMXService()); |
| JMXService.registerMBean(this, mbeanName); |
| |
| // Setup UDFService |
| registerManager.register( |
| UDFExecutableManager.setupAndGetInstance(CONF.getTemporaryLibDir(), CONF.getUdfLibDir())); |
| registerManager.register(UDFClassLoaderManager.setupAndGetInstance(CONF.getUdfLibDir())); |
| registerManager.register(UDFRegistrationService.setupAndGetInstance(CONF.getSystemUdfDir())); |
| |
| registerManager.register(MetricService.getInstance()); |
| LOGGER.info("Successfully setup internal services."); |
| } |
| |
| /** Register Non-seed ConfigNode when first startup */ |
| private void registerConfigNode() throws StartupException, IOException { |
| TConfigNodeRegisterReq req = |
| new TConfigNodeRegisterReq( |
| new TConfigNodeLocation( |
| -1, |
| new TEndPoint(CONF.getInternalAddress(), CONF.getInternalPort()), |
| new TEndPoint(CONF.getInternalAddress(), CONF.getConsensusPort())), |
| CONF.getDataRegionConsensusProtocolClass(), |
| CONF.getSchemaRegionConsensusProtocolClass(), |
| CONF.getSeriesPartitionSlotNum(), |
| CONF.getSeriesPartitionExecutorClass(), |
| CommonDescriptor.getInstance().getConfig().getDefaultTTL(), |
| CONF.getTimePartitionInterval(), |
| CONF.getSchemaReplicationFactor(), |
| CONF.getSchemaRegionPerDataNode(), |
| CONF.getDataReplicationFactor(), |
| CONF.getDataRegionPerProcessor(), |
| CONF.getReadConsistencyLevel(), |
| CommonDescriptor.getInstance().getConfig().getDiskSpaceWarningThreshold()); |
| |
| TEndPoint targetConfigNode = CONF.getTargetConfigNode(); |
| if (targetConfigNode == null) { |
| LOGGER.error("The targetConfigNode setting in conf is empty"); |
| throw new StartupException("The targetConfigNode setting in conf is empty"); |
| } |
| |
| for (int retry = 0; retry < 3; retry++) { |
| TSStatus status = |
| (TSStatus) |
| SyncConfigNodeClientPool.getInstance() |
| .sendSyncRequestToConfigNodeWithRetry( |
| targetConfigNode, req, ConfigNodeRequestType.REGISTER_CONFIG_NODE); |
| if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { |
| return; |
| } else if (status.getCode() == TSStatusCode.NEED_REDIRECTION.getStatusCode()) { |
| targetConfigNode = status.getRedirectNode(); |
| LOGGER.info("ConfigNode need redirect to {}.", targetConfigNode); |
| } else if (status.getCode() == TSStatusCode.ERROR_GLOBAL_CONFIG.getStatusCode()) { |
| LOGGER.error(status.getMessage()); |
| throw new StartupException("Configuration are not consistent!"); |
| } |
| |
| try { |
| TimeUnit.MILLISECONDS.sleep(1000); |
| } catch (InterruptedException e) { |
| throw new StartupException("Register ConfigNode failed!"); |
| } |
| } |
| |
| LOGGER.error( |
| "The current ConfigNode can't send register request to the Seed-ConfigNode after all retries!"); |
| stop(); |
| } |
| |
| private void setUpRPCService() throws StartupException { |
| // Setup RPCService |
| ConfigNodeRPCService configNodeRPCService = new ConfigNodeRPCService(); |
| ConfigNodeRPCServiceProcessor configNodeRPCServiceProcessor = |
| new ConfigNodeRPCServiceProcessor(configManager); |
| configNodeRPCService.initSyncedServiceImpl(configNodeRPCServiceProcessor); |
| registerManager.register(configNodeRPCService); |
| } |
| |
| public void stop() throws IOException { |
| LOGGER.info("Deactivating {}...", ConfigNodeConstant.GLOBAL_NAME); |
| registerManager.deregisterAll(); |
| JMXService.deregisterMBean(mbeanName); |
| if (configManager != null) { |
| configManager.close(); |
| } |
| LOGGER.info("{} is deactivated.", ConfigNodeConstant.GLOBAL_NAME); |
| System.exit(-1); |
| } |
| |
| private static class ConfigNodeHolder { |
| |
| private static final ConfigNode INSTANCE = new ConfigNode(); |
| |
| private ConfigNodeHolder() { |
| // Empty constructor |
| } |
| } |
| |
| public static ConfigNode getInstance() { |
| return ConfigNodeHolder.INSTANCE; |
| } |
| } |