| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.iotdb.confignode.manager; |
| |
| import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation; |
| import org.apache.iotdb.common.rpc.thrift.TEndPoint; |
| import org.apache.iotdb.common.rpc.thrift.TSStatus; |
| import org.apache.iotdb.commons.consensus.ConsensusGroupId; |
| import org.apache.iotdb.commons.consensus.PartitionRegionId; |
| import org.apache.iotdb.commons.exception.BadNodeUrlException; |
| import org.apache.iotdb.commons.utils.TestOnly; |
| import org.apache.iotdb.confignode.conf.ConfigNodeConfig; |
| import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor; |
| import org.apache.iotdb.confignode.conf.SystemPropertiesUtils; |
| import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan; |
| import org.apache.iotdb.confignode.consensus.statemachine.PartitionRegionStateMachine; |
| import org.apache.iotdb.confignode.exception.AddPeerException; |
| import org.apache.iotdb.confignode.manager.node.NodeManager; |
| import org.apache.iotdb.consensus.ConsensusFactory; |
| import org.apache.iotdb.consensus.IConsensus; |
| import org.apache.iotdb.consensus.common.Peer; |
| import org.apache.iotdb.consensus.common.response.ConsensusReadResponse; |
| import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse; |
| import org.apache.iotdb.consensus.config.ConsensusConfig; |
| import org.apache.iotdb.consensus.config.RatisConfig; |
| import org.apache.iotdb.rpc.TSStatusCode; |
| |
| import org.apache.ratis.util.SizeInBytes; |
| import org.apache.ratis.util.TimeDuration; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.List; |
| import java.util.concurrent.TimeUnit; |
| |
| /** ConsensusManager maintains consensus class, request will redirect to consensus layer */ |
| public class ConsensusManager { |
| |
| private static final Logger LOGGER = LoggerFactory.getLogger(ConsensusManager.class); |
| private static final ConfigNodeConfig CONF = ConfigNodeDescriptor.getInstance().getConf(); |
| |
| private final IManager configManager; |
| |
| private ConsensusGroupId consensusGroupId; |
| private IConsensus consensusImpl; |
| private final int seedConfigNodeId = 0; |
| |
| public ConsensusManager(IManager configManager, PartitionRegionStateMachine stateMachine) |
| throws IOException { |
| this.configManager = configManager; |
| setConsensusLayer(stateMachine); |
| } |
| |
| public void close() throws IOException { |
| consensusImpl.stop(); |
| } |
| |
| /** ConsensusLayer local implementation */ |
| private void setConsensusLayer(PartitionRegionStateMachine stateMachine) throws IOException { |
| // There is only one ConfigNodeGroup |
| consensusGroupId = new PartitionRegionId(CONF.getPartitionRegionId()); |
| |
| // Implement local ConsensusLayer by ConfigNodeConfig |
| consensusImpl = |
| ConsensusFactory.getConsensusImpl( |
| CONF.getConfigNodeConsensusProtocolClass(), |
| ConsensusConfig.newBuilder() |
| .setThisNode(new TEndPoint(CONF.getInternalAddress(), CONF.getConsensusPort())) |
| .setRatisConfig( |
| RatisConfig.newBuilder() |
| .setLeaderLogAppender( |
| RatisConfig.LeaderLogAppender.newBuilder() |
| .setBufferByteLimit( |
| CONF |
| .getPartitionRegionRatisConsensusLogAppenderBufferSize()) |
| .build()) |
| .setSnapshot( |
| RatisConfig.Snapshot.newBuilder() |
| .setAutoTriggerThreshold( |
| CONF.getPartitionRegionRatisSnapshotTriggerThreshold()) |
| .build()) |
| .setLog( |
| RatisConfig.Log.newBuilder() |
| .setUnsafeFlushEnabled( |
| CONF.isPartitionRegionRatisLogUnsafeFlushEnable()) |
| .setSegmentCacheSizeMax( |
| SizeInBytes.valueOf( |
| CONF.getPartitionRegionRatisLogSegmentSizeMax())) |
| .build()) |
| .setGrpc( |
| RatisConfig.Grpc.newBuilder() |
| .setFlowControlWindow( |
| SizeInBytes.valueOf( |
| CONF.getPartitionRegionRatisGrpcFlowControlWindow())) |
| .build()) |
| .setRpc( |
| RatisConfig.Rpc.newBuilder() |
| .setTimeoutMin( |
| TimeDuration.valueOf( |
| CONF |
| .getPartitionRegionRatisRpcLeaderElectionTimeoutMinMs(), |
| TimeUnit.MILLISECONDS)) |
| .setTimeoutMax( |
| TimeDuration.valueOf( |
| CONF |
| .getPartitionRegionRatisRpcLeaderElectionTimeoutMaxMs(), |
| TimeUnit.MILLISECONDS)) |
| .build()) |
| .build()) |
| .setStorageDir(CONF.getConsensusDir()) |
| .build(), |
| gid -> stateMachine) |
| .orElseThrow( |
| () -> |
| new IllegalArgumentException( |
| String.format( |
| ConsensusFactory.CONSTRUCT_FAILED_MSG, |
| CONF.getConfigNodeConsensusProtocolClass()))); |
| consensusImpl.start(); |
| |
| if (SystemPropertiesUtils.isRestarted()) { |
| try { |
| // Create ConsensusGroup from confignode-system.properties file when restart |
| // TODO: Check and notify if current ConfigNode's ip or port has changed |
| createPeerForConsensusGroup(SystemPropertiesUtils.loadConfigNodeList()); |
| } catch (BadNodeUrlException e) { |
| throw new IOException(e); |
| } |
| } else if (ConfigNodeDescriptor.getInstance().isSeedConfigNode()) { |
| // Create ConsensusGroup that contains only itself |
| // if the current ConfigNode is Seed-ConfigNode |
| createPeerForConsensusGroup( |
| Collections.singletonList( |
| new TConfigNodeLocation( |
| seedConfigNodeId, |
| new TEndPoint(CONF.getInternalAddress(), CONF.getInternalPort()), |
| new TEndPoint(CONF.getInternalAddress(), CONF.getConsensusPort())))); |
| } |
| } |
| |
| /** |
| * Create peer in new node to build consensus group |
| * |
| * @param configNodeLocations All registered ConfigNodes |
| */ |
| public void createPeerForConsensusGroup(List<TConfigNodeLocation> configNodeLocations) { |
| if (configNodeLocations.size() == 0) { |
| LOGGER.warn("configNodeLocations is empty, createPeerForConsensusGroup failed."); |
| return; |
| } |
| |
| LOGGER.info("createPeerForConsensusGroup {}...", configNodeLocations); |
| |
| List<Peer> peerList = new ArrayList<>(); |
| for (TConfigNodeLocation configNodeLocation : configNodeLocations) { |
| peerList.add(new Peer(consensusGroupId, configNodeLocation.getConsensusEndPoint())); |
| } |
| consensusImpl.createPeer(consensusGroupId, peerList); |
| } |
| |
| /** |
| * Add new ConfigNode Peer into PartitionRegion |
| * |
| * @param configNodeLocation The new ConfigNode |
| * @throws AddPeerException When addPeer doesn't success |
| */ |
| public void addConfigNodePeer(TConfigNodeLocation configNodeLocation) throws AddPeerException { |
| boolean result = |
| consensusImpl |
| .addPeer( |
| consensusGroupId, |
| new Peer(consensusGroupId, configNodeLocation.getConsensusEndPoint())) |
| .isSuccess(); |
| |
| if (!result) { |
| throw new AddPeerException(configNodeLocation); |
| } |
| } |
| |
| /** |
| * Remove a ConfigNode Peer out of PartitionRegion |
| * |
| * @param tConfigNodeLocation config node location |
| * @return True if successfully removePeer. False if another ConfigNode is being removed to the |
| * PartitionRegion |
| */ |
| public boolean removeConfigNodePeer(TConfigNodeLocation tConfigNodeLocation) { |
| return consensusImpl |
| .removePeer( |
| consensusGroupId, |
| new Peer(consensusGroupId, tConfigNodeLocation.getConsensusEndPoint())) |
| .isSuccess(); |
| } |
| |
| /** Transmit PhysicalPlan to confignode.consensus.statemachine */ |
| public ConsensusWriteResponse write(ConfigPhysicalPlan plan) { |
| return consensusImpl.write(consensusGroupId, plan); |
| } |
| |
| /** Transmit PhysicalPlan to confignode.consensus.statemachine */ |
| public ConsensusReadResponse read(ConfigPhysicalPlan plan) { |
| return consensusImpl.read(consensusGroupId, plan); |
| } |
| |
| public boolean isLeader() { |
| return consensusImpl.isLeader(consensusGroupId); |
| } |
| |
| /** @return ConfigNode-leader's location if leader exists, null otherwise. */ |
| public TConfigNodeLocation getLeader() { |
| for (int retry = 0; retry < 50; retry++) { |
| Peer leaderPeer = consensusImpl.getLeader(consensusGroupId); |
| if (leaderPeer != null) { |
| List<TConfigNodeLocation> registeredConfigNodes = |
| getNodeManager().getRegisteredConfigNodes(); |
| TConfigNodeLocation leaderLocation = |
| registeredConfigNodes.stream() |
| .filter(leader -> leader.getConsensusEndPoint().equals(leaderPeer.getEndpoint())) |
| .findFirst() |
| .orElse(null); |
| if (leaderLocation != null) { |
| return leaderLocation; |
| } |
| } |
| |
| try { |
| TimeUnit.MILLISECONDS.sleep(100); |
| } catch (InterruptedException e) { |
| LOGGER.warn("ConsensusManager getLeader been interrupted, ", e); |
| } |
| } |
| return null; |
| } |
| |
| /** |
| * Confirm the current ConfigNode's leadership |
| * |
| * @return SUCCESS_STATUS if the current ConfigNode is leader, NEED_REDIRECTION otherwise |
| */ |
| public TSStatus confirmLeader() { |
| TSStatus result = new TSStatus(); |
| |
| if (isLeader()) { |
| return result.setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode()); |
| } else { |
| result.setCode(TSStatusCode.NEED_REDIRECTION.getStatusCode()); |
| result.setMessage( |
| "The current ConfigNode is not leader, please redirect to a new ConfigNode."); |
| |
| TConfigNodeLocation leaderLocation = getLeader(); |
| if (leaderLocation != null) { |
| result.setRedirectNode(leaderLocation.getInternalEndPoint()); |
| } |
| |
| return result; |
| } |
| } |
| |
| public ConsensusGroupId getConsensusGroupId() { |
| return consensusGroupId; |
| } |
| |
| public IConsensus getConsensusImpl() { |
| return consensusImpl; |
| } |
| |
| private NodeManager getNodeManager() { |
| return configManager.getNodeManager(); |
| } |
| |
| @TestOnly |
| public void singleCopyMayWaitUntilLeaderReady() { |
| long startTime = System.currentTimeMillis(); |
| long maxWaitTime = 1000 * 60; // milliseconds, which is 60s |
| try { |
| while (!consensusImpl.isLeader(consensusGroupId)) { |
| TimeUnit.MILLISECONDS.sleep(100); |
| long elapsed = System.currentTimeMillis() - startTime; |
| if (elapsed > maxWaitTime) { |
| return; |
| } |
| } |
| } catch (InterruptedException ignored) { |
| } |
| } |
| } |