| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.ozone.container.common.transport.server.ratis; |
| |
| import java.io.File; |
| import java.io.IOException; |
| import java.net.InetSocketAddress; |
| import java.security.cert.X509Certificate; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Objects; |
| import java.util.Set; |
| import java.util.UUID; |
| import java.util.EnumMap; |
| import java.util.concurrent.BlockingQueue; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.LinkedBlockingDeque; |
| import java.util.concurrent.ThreadFactory; |
| import java.util.concurrent.ThreadPoolExecutor; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicLong; |
| |
| import org.apache.hadoop.hdds.conf.ConfigurationSource; |
| import org.apache.hadoop.hdds.conf.DatanodeRatisServerConfig; |
| import org.apache.hadoop.hdds.conf.StorageUnit; |
| import org.apache.hadoop.hdds.HddsConfigKeys; |
| import org.apache.hadoop.hdds.protocol.DatanodeDetails; |
| import org.apache.hadoop.hdds.protocol.DatanodeDetails.Port; |
| import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto; |
| import org.apache.hadoop.hdds.protocol.proto.HddsProtos; |
| import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.MetadataStorageReportProto; |
| import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ClosePipelineInfo; |
| import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineAction; |
| import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReport; |
| import org.apache.hadoop.hdds.ratis.ContainerCommandRequestMessage; |
| import org.apache.hadoop.hdds.ratis.RatisHelper; |
| import org.apache.hadoop.hdds.scm.pipeline.PipelineID; |
| import org.apache.hadoop.hdds.security.x509.SecurityConfig; |
| import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient; |
| import org.apache.hadoop.hdds.tracing.TracingUtil; |
| import org.apache.hadoop.hdds.utils.HAUtils; |
| import org.apache.hadoop.hdds.utils.HddsServerUtil; |
| import org.apache.hadoop.ozone.OzoneConfigKeys; |
| import org.apache.hadoop.ozone.OzoneConsts; |
| import org.apache.hadoop.ozone.container.common.impl.ContainerData; |
| import org.apache.hadoop.ozone.container.common.impl.StorageLocationReport; |
| import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher; |
| import org.apache.hadoop.ozone.container.common.statemachine.StateContext; |
| import org.apache.hadoop.ozone.container.common.transport.server.XceiverServerSpi; |
| import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet; |
| import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController; |
| import org.apache.hadoop.fs.StorageType; |
| import org.apache.hadoop.hdfs.server.datanode.StorageLocation; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| import com.google.common.collect.ImmutableList; |
| import com.google.common.util.concurrent.ThreadFactoryBuilder; |
| import io.opentracing.Scope; |
| import io.opentracing.Span; |
| import io.opentracing.util.GlobalTracer; |
| import org.apache.ratis.RaftConfigKeys; |
| import org.apache.ratis.conf.Parameters; |
| import org.apache.ratis.conf.RaftProperties; |
| import org.apache.ratis.grpc.GrpcConfigKeys; |
| import org.apache.ratis.grpc.GrpcTlsConfig; |
| import org.apache.ratis.netty.NettyConfigKeys; |
| import org.apache.ratis.proto.RaftProtos; |
| import org.apache.ratis.proto.RaftProtos.RoleInfoProto; |
| import org.apache.ratis.protocol.exceptions.NotLeaderException; |
| import org.apache.ratis.protocol.exceptions.StateMachineException; |
| import org.apache.ratis.protocol.ClientId; |
| import org.apache.ratis.protocol.GroupInfoReply; |
| import org.apache.ratis.protocol.GroupInfoRequest; |
| import org.apache.ratis.protocol.GroupManagementRequest; |
| import org.apache.ratis.protocol.RaftClientReply; |
| import org.apache.ratis.protocol.RaftClientRequest; |
| import org.apache.ratis.protocol.RaftGroup; |
| import org.apache.ratis.protocol.RaftGroupId; |
| import org.apache.ratis.protocol.RaftGroupMemberId; |
| import org.apache.ratis.protocol.RaftPeerId; |
| import org.apache.ratis.rpc.RpcType; |
| import org.apache.ratis.rpc.SupportedRpcType; |
| import org.apache.ratis.server.RaftServer; |
| import org.apache.ratis.server.RaftServerConfigKeys; |
| import org.apache.ratis.server.RaftServerRpc; |
| import org.apache.ratis.server.protocol.TermIndex; |
| import org.apache.ratis.util.SizeInBytes; |
| import org.apache.ratis.util.TimeDuration; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import static org.apache.hadoop.hdds.DatanodeVersions.SEPARATE_RATIS_PORTS_AVAILABLE; |
| |
| /** |
| * Creates a ratis server endpoint that acts as the communication layer for |
| * Ozone containers. |
| */ |
| public final class XceiverServerRatis implements XceiverServerSpi { |
| private static final Logger LOG = LoggerFactory |
| .getLogger(XceiverServerRatis.class); |
| private static final AtomicLong CALL_ID_COUNTER = new AtomicLong(); |
| private static final List<Integer> DEFAULT_PRIORITY_LIST = |
| new ArrayList<>( |
| Collections.nCopies(HddsProtos.ReplicationFactor.THREE_VALUE, 0)); |
| |
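| /** |
| * Returns the next call id for Raft client requests; the mask keeps the |
| * value non-negative even after the counter overflows. |
| */ |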
| private static long nextCallId() { |
| return CALL_ID_COUNTER.getAndIncrement() & Long.MAX_VALUE; |
| } |
| |
| private int serverPort; |
| private int adminPort; |
| private int clientPort; |
| private final RaftServer server; |
| private final List<ThreadPoolExecutor> chunkExecutors; |
| private final ContainerDispatcher dispatcher; |
| private final ContainerController containerController; |
| private ClientId clientId = ClientId.randomId(); |
| private final StateContext context; |
| private long nodeFailureTimeoutMs; |
| private boolean isStarted = false; |
| private DatanodeDetails datanodeDetails; |
| private final ConfigurationSource conf; |
| // TODO: Remove the gids set once Ratis supports an API to query active |
| // pipelines |
| private final Set<RaftGroupId> raftGids = new HashSet<>(); |
| private final RaftPeerId raftPeerId; |
| // pipelines for which I am the leader |
| private Map<RaftGroupId, Boolean> groupLeaderMap = new ConcurrentHashMap<>(); |
| // Timeout used while calling submitRequest directly. |
| private long requestTimeout; |
| private boolean shouldDeleteRatisLogDirectory; |
| |
| /** |
| * Maintains a list of active volumes per StorageType. |
| */ |
| private EnumMap<StorageType, List<String>> ratisVolumeMap; |
| |
| private XceiverServerRatis(DatanodeDetails dd, |
| ContainerDispatcher dispatcher, ContainerController containerController, |
| StateContext context, ConfigurationSource conf, Parameters parameters) |
| throws IOException { |
| this.conf = conf; |
| Objects.requireNonNull(dd, "DatanodeDetails == null"); |
| datanodeDetails = dd; |
| assignPorts(); |
| RaftProperties serverProperties = newRaftProperties(); |
| this.context = context; |
| this.dispatcher = dispatcher; |
| this.containerController = containerController; |
| this.raftPeerId = RatisHelper.toRaftPeerId(dd); |
| chunkExecutors = createChunkExecutors(conf); |
| nodeFailureTimeoutMs = |
| conf.getObject(DatanodeRatisServerConfig.class) |
| .getFollowerSlownessTimeout(); |
| shouldDeleteRatisLogDirectory = |
| conf.getObject(DatanodeRatisServerConfig.class) |
| .shouldDeleteRatisLogDirectory(); |
| |
| RaftServer.Builder builder = |
| RaftServer.newBuilder().setServerId(raftPeerId) |
| .setProperties(serverProperties) |
| .setStateMachineRegistry(this::getStateMachine) |
| .setParameters(parameters); |
| this.server = builder.build(); |
| this.requestTimeout = conf.getTimeDuration( |
| HddsConfigKeys.HDDS_DATANODE_RATIS_SERVER_REQUEST_TIMEOUT, |
| HddsConfigKeys.HDDS_DATANODE_RATIS_SERVER_REQUEST_TIMEOUT_DEFAULT, |
| TimeUnit.MILLISECONDS); |
| initializeRatisVolumeMap(); |
| } |
| |
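| /** |
| * Resolves the Ratis client, admin and server ports. Datanodes recent |
| * enough to support separate Ratis ports get dedicated admin and server |
| * ports; older datanodes reuse the client port for all three. |
| */ |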
| private void assignPorts() { |
| clientPort = determinePort( |
| OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_PORT, |
| OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_PORT_DEFAULT); |
| |
| if (datanodeDetails.getInitialVersion() >= SEPARATE_RATIS_PORTS_AVAILABLE) { |
| adminPort = determinePort( |
| OzoneConfigKeys.DFS_CONTAINER_RATIS_ADMIN_PORT, |
| OzoneConfigKeys.DFS_CONTAINER_RATIS_ADMIN_PORT_DEFAULT); |
| serverPort = determinePort( |
| OzoneConfigKeys.DFS_CONTAINER_RATIS_SERVER_PORT, |
| OzoneConfigKeys.DFS_CONTAINER_RATIS_SERVER_PORT_DEFAULT); |
| } else { |
| adminPort = clientPort; |
| serverPort = clientPort; |
| } |
| } |
| |
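| /** |
| * Returns 0 (let the OS pick an ephemeral port) if random ports are |
| * enabled, otherwise the configured value for the given key. |
| */ |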
| private int determinePort(String key, int defaultValue) { |
| boolean randomPort = conf.getBoolean( |
| OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_RANDOM_PORT, |
| OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_RANDOM_PORT_DEFAULT); |
| return randomPort ? 0 : conf.getInt(key, defaultValue); |
| } |
| |
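| /** |
| * State machine factory registered with the Ratis server: creates one |
| * ContainerStateMachine per Raft group, i.e. per pipeline. |
| */ |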
| private ContainerStateMachine getStateMachine(RaftGroupId gid) { |
| return new ContainerStateMachine(gid, dispatcher, containerController, |
| chunkExecutors, this, conf); |
| } |
| |
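| /** |
| * Builds the RaftProperties for the embedded Ratis server from the Ozone |
| * configuration: RPC type, log segment sizes, sync and election timeouts, |
| * storage directories, snapshot and pending-request limits. |
| */ |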
| private RaftProperties newRaftProperties() { |
| final RaftProperties properties = new RaftProperties(); |
| |
| // Set rpc type |
| final RpcType rpc = setRpcType(properties); |
| |
| // set raft segment size |
| setRaftSegmentAndWriteBufferSize(properties); |
| |
| // set raft segment pre-allocated size |
| final int raftSegmentPreallocatedSize = |
| setRaftSegmentPreallocatedSize(properties); |
| |
| TimeUnit timeUnit; |
| long duration; |
| |
| // set the configs enable and set the stateMachineData sync timeout |
| RaftServerConfigKeys.Log.StateMachineData.setSync(properties, true); |
| timeUnit = OzoneConfigKeys. |
| DFS_CONTAINER_RATIS_STATEMACHINEDATA_SYNC_TIMEOUT_DEFAULT.getUnit(); |
| duration = conf.getTimeDuration( |
| OzoneConfigKeys.DFS_CONTAINER_RATIS_STATEMACHINEDATA_SYNC_TIMEOUT, |
| OzoneConfigKeys. |
| DFS_CONTAINER_RATIS_STATEMACHINEDATA_SYNC_TIMEOUT_DEFAULT |
| .getDuration(), timeUnit); |
| final TimeDuration dataSyncTimeout = |
| TimeDuration.valueOf(duration, timeUnit); |
| RaftServerConfigKeys.Log.StateMachineData |
| .setSyncTimeout(properties, dataSyncTimeout); |
| // Typically a pipeline close is initiated after the Ratis node failure |
| // timeout expires because a follower does not respond. By that time all |
| // writeStateMachine calls should have been stopped and IOs should fail. |
| // Even if the leader cannot complete its write calls within the timeout, |
| // it should simply fail the operation and trigger a pipeline close. |
| // Failing the writeStateMachine call with a limited number of retries |
| // ensures that the leader also initiates a pipeline close if it cannot |
| // complete a write within the configured timeout. |
| |
| // NOTE: the default value for the retry count in Ratis is -1, |
| // which means retry indefinitely. |
| RaftServerConfigKeys.Log.StateMachineData |
| .setSyncTimeoutRetry(properties, (int) nodeFailureTimeoutMs / |
| dataSyncTimeout.toIntExact(TimeUnit.MILLISECONDS)); |
| |
| // set timeout for a retry cache entry |
| setTimeoutForRetryCache(properties); |
| |
| // Set the ratis leader election timeout |
| setRatisLeaderElectionTimeout(properties); |
| |
| // Set the maximum cache segments |
| RaftServerConfigKeys.Log.setSegmentCacheNumMax(properties, 2); |
| |
| // Disable the pre vote feature in Ratis |
| RaftServerConfigKeys.LeaderElection.setPreVote(properties, false); |
| |
| // Set the ratis storage directory |
| Collection<String> storageDirPaths = |
| HddsServerUtil.getOzoneDatanodeRatisDirectory(conf); |
| List<File> storageDirs = new ArrayList<>(storageDirPaths.size()); |
| storageDirPaths.forEach(d -> storageDirs.add(new File(d))); |
| |
| RaftServerConfigKeys.setStorageDir(properties, storageDirs); |
| |
| // For grpc set the maximum message size |
| GrpcConfigKeys.setMessageSizeMax(properties, |
| SizeInBytes.valueOf(OzoneConsts.OZONE_SCM_CHUNK_MAX_SIZE |
| + raftSegmentPreallocatedSize)); |
| |
| // Set the ratis port number |
| if (rpc == SupportedRpcType.GRPC) { |
| GrpcConfigKeys.Admin.setPort(properties, adminPort); |
| GrpcConfigKeys.Client.setPort(properties, clientPort); |
| GrpcConfigKeys.Server.setPort(properties, serverPort); |
| } else if (rpc == SupportedRpcType.NETTY) { |
| NettyConfigKeys.Server.setPort(properties, serverPort); |
| } |
| |
| long snapshotThreshold = |
| conf.getLong(OzoneConfigKeys.DFS_RATIS_SNAPSHOT_THRESHOLD_KEY, |
| OzoneConfigKeys.DFS_RATIS_SNAPSHOT_THRESHOLD_DEFAULT); |
| RaftServerConfigKeys.Snapshot. |
| setAutoTriggerEnabled(properties, true); |
| RaftServerConfigKeys.Snapshot. |
| setAutoTriggerThreshold(properties, snapshotThreshold); |
| |
| // Set the limit on num/ bytes of pending requests a Ratis leader can hold |
| setPendingRequestsLimits(properties); |
| |
| int logQueueNumElements = |
| conf.getInt(OzoneConfigKeys.DFS_CONTAINER_RATIS_LOG_QUEUE_NUM_ELEMENTS, |
| OzoneConfigKeys.DFS_CONTAINER_RATIS_LOG_QUEUE_NUM_ELEMENTS_DEFAULT); |
| final int logQueueByteLimit = (int) conf.getStorageSize( |
| OzoneConfigKeys.DFS_CONTAINER_RATIS_LOG_QUEUE_BYTE_LIMIT, |
| OzoneConfigKeys.DFS_CONTAINER_RATIS_LOG_QUEUE_BYTE_LIMIT_DEFAULT, |
| StorageUnit.BYTES); |
| RaftServerConfigKeys.Log.setQueueElementLimit( |
| properties, logQueueNumElements); |
| RaftServerConfigKeys.Log.setQueueByteLimit(properties, logQueueByteLimit); |
| |
| int numSyncRetries = conf.getInt( |
| OzoneConfigKeys.DFS_CONTAINER_RATIS_STATEMACHINEDATA_SYNC_RETRIES, |
| OzoneConfigKeys. |
| DFS_CONTAINER_RATIS_STATEMACHINEDATA_SYNC_RETRIES_DEFAULT); |
| RaftServerConfigKeys.Log.StateMachineData.setSyncTimeoutRetry(properties, |
| numSyncRetries); |
| |
| // Enable the StateMachineCaching |
| RaftServerConfigKeys.Log.StateMachineData.setCachingEnabled( |
| properties, true); |
| |
| RaftServerConfigKeys.Log.Appender.setInstallSnapshotEnabled(properties, |
| false); |
| |
| int purgeGap = conf.getInt( |
| OzoneConfigKeys.DFS_CONTAINER_RATIS_LOG_PURGE_GAP, |
| OzoneConfigKeys.DFS_CONTAINER_RATIS_LOG_PURGE_GAP_DEFAULT); |
| RaftServerConfigKeys.Log.setPurgeGap(properties, purgeGap); |
| |
| //Set the number of Snapshots Retained. |
| RatisServerConfiguration ratisServerConfiguration = |
| conf.getObject(RatisServerConfiguration.class); |
| int numSnapshotsRetained = |
| ratisServerConfiguration.getNumSnapshotsRetained(); |
| RaftServerConfigKeys.Snapshot.setRetentionFileNum(properties, |
| numSnapshotsRetained); |
| |
| // Set properties starting with prefix raft.server |
| RatisHelper.createRaftServerProperties(conf, properties); |
| |
| return properties; |
| } |
| |
| private void setRatisLeaderElectionTimeout(RaftProperties properties) { |
| long duration; |
| TimeUnit leaderElectionMinTimeoutUnit = |
| OzoneConfigKeys. |
| DFS_RATIS_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_DEFAULT |
| .getUnit(); |
| duration = conf.getTimeDuration( |
| OzoneConfigKeys.DFS_RATIS_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_KEY, |
| OzoneConfigKeys. |
| DFS_RATIS_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_DEFAULT |
| .getDuration(), leaderElectionMinTimeoutUnit); |
| final TimeDuration leaderElectionMinTimeout = |
| TimeDuration.valueOf(duration, leaderElectionMinTimeoutUnit); |
| RaftServerConfigKeys.Rpc |
| .setTimeoutMin(properties, leaderElectionMinTimeout); |
| long leaderElectionMaxTimeout = |
| leaderElectionMinTimeout.toLong(TimeUnit.MILLISECONDS) + 200; |
| RaftServerConfigKeys.Rpc.setTimeoutMax(properties, |
| TimeDuration.valueOf(leaderElectionMaxTimeout, TimeUnit.MILLISECONDS)); |
| } |
| |
| private void setTimeoutForRetryCache(RaftProperties properties) { |
| TimeUnit timeUnit; |
| long duration; |
| timeUnit = |
| OzoneConfigKeys.DFS_RATIS_SERVER_RETRY_CACHE_TIMEOUT_DURATION_DEFAULT |
| .getUnit(); |
| duration = conf.getTimeDuration( |
| OzoneConfigKeys.DFS_RATIS_SERVER_RETRY_CACHE_TIMEOUT_DURATION_KEY, |
| OzoneConfigKeys.DFS_RATIS_SERVER_RETRY_CACHE_TIMEOUT_DURATION_DEFAULT |
| .getDuration(), timeUnit); |
| final TimeDuration retryCacheTimeout = |
| TimeDuration.valueOf(duration, timeUnit); |
| RaftServerConfigKeys.RetryCache |
| .setExpiryTime(properties, retryCacheTimeout); |
| } |
| |
| private int setRaftSegmentPreallocatedSize(RaftProperties properties) { |
| final int raftSegmentPreallocatedSize = (int) conf.getStorageSize( |
| OzoneConfigKeys.DFS_CONTAINER_RATIS_SEGMENT_PREALLOCATED_SIZE_KEY, |
| OzoneConfigKeys.DFS_CONTAINER_RATIS_SEGMENT_PREALLOCATED_SIZE_DEFAULT, |
| StorageUnit.BYTES); |
| int logAppenderQueueNumElements = conf.getInt( |
| OzoneConfigKeys.DFS_CONTAINER_RATIS_LOG_APPENDER_QUEUE_NUM_ELEMENTS, |
| OzoneConfigKeys |
| .DFS_CONTAINER_RATIS_LOG_APPENDER_QUEUE_NUM_ELEMENTS_DEFAULT); |
| final int logAppenderQueueByteLimit = (int) conf.getStorageSize( |
| OzoneConfigKeys.DFS_CONTAINER_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT, |
| OzoneConfigKeys |
| .DFS_CONTAINER_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT_DEFAULT, |
| StorageUnit.BYTES); |
| RaftServerConfigKeys.Log.Appender |
| .setBufferElementLimit(properties, logAppenderQueueNumElements); |
| RaftServerConfigKeys.Log.Appender.setBufferByteLimit(properties, |
| SizeInBytes.valueOf(logAppenderQueueByteLimit)); |
| RaftServerConfigKeys.Log.setPreallocatedSize(properties, |
| SizeInBytes.valueOf(raftSegmentPreallocatedSize)); |
| return raftSegmentPreallocatedSize; |
| } |
| |
| private void setRaftSegmentAndWriteBufferSize(RaftProperties properties) { |
| final int raftSegmentSize = (int) conf.getStorageSize( |
| OzoneConfigKeys.DFS_CONTAINER_RATIS_SEGMENT_SIZE_KEY, |
| OzoneConfigKeys.DFS_CONTAINER_RATIS_SEGMENT_SIZE_DEFAULT, |
| StorageUnit.BYTES); |
| RaftServerConfigKeys.Log.setSegmentSizeMax(properties, |
| SizeInBytes.valueOf(raftSegmentSize)); |
| RaftServerConfigKeys.Log.setWriteBufferSize(properties, |
| SizeInBytes.valueOf(raftSegmentSize)); |
| } |
| |
| private RpcType setRpcType(RaftProperties properties) { |
| final String rpcType = conf.get( |
| OzoneConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY, |
| OzoneConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT); |
| final RpcType rpc = SupportedRpcType.valueOfIgnoreCase(rpcType); |
| RaftConfigKeys.Rpc.setType(properties, rpc); |
| return rpc; |
| } |
| |
| private void setPendingRequestsLimits(RaftProperties properties) { |
| |
| final int pendingRequestsByteLimit = (int) conf.getStorageSize( |
| OzoneConfigKeys.DFS_CONTAINER_RATIS_LEADER_PENDING_BYTES_LIMIT, |
| OzoneConfigKeys.DFS_CONTAINER_RATIS_LEADER_PENDING_BYTES_LIMIT_DEFAULT, |
| StorageUnit.BYTES); |
| RaftServerConfigKeys.Write.setByteLimit(properties, |
| SizeInBytes.valueOf(pendingRequestsByteLimit)); |
| } |
| |
| public static XceiverServerRatis newXceiverServerRatis( |
| DatanodeDetails datanodeDetails, ConfigurationSource ozoneConf, |
| ContainerDispatcher dispatcher, ContainerController containerController, |
| CertificateClient caClient, StateContext context) throws IOException { |
| Parameters parameters = createTlsParameters( |
| new SecurityConfig(ozoneConf), caClient); |
| |
| return new XceiverServerRatis(datanodeDetails, dispatcher, |
| containerController, context, ozoneConf, parameters); |
| } |
| |
| // For a gRPC server running the DN container service with gRPC TLS. |
| // In summary: |
| // server-to-client authentication is done via TLS; |
| // client-to-server authentication is done via block token (or container token). |
| // The DN Ratis server acts as both TLS client and server, so TLS |
| // configuration must be passed for both roles. |
| private static Parameters createTlsParameters(SecurityConfig conf, |
| CertificateClient caClient) throws IOException { |
| Parameters parameters = new Parameters(); |
| |
| if (conf.isSecurityEnabled() && conf.isGrpcTlsEnabled()) { |
| List<X509Certificate> caList = HAUtils.buildCAX509List(caClient, |
| conf.getConfiguration()); |
| GrpcTlsConfig serverConfig = new GrpcTlsConfig( |
| caClient.getPrivateKey(), caClient.getCertificate(), |
| caList, true); |
| GrpcConfigKeys.Server.setTlsConf(parameters, serverConfig); |
| GrpcConfigKeys.Admin.setTlsConf(parameters, serverConfig); |
| |
| GrpcTlsConfig clientConfig = new GrpcTlsConfig( |
| caClient.getPrivateKey(), caClient.getCertificate(), |
| caList, false); |
| GrpcConfigKeys.Client.setTlsConf(parameters, clientConfig); |
| } |
| |
| return parameters; |
| } |
| |
| @Override |
| public void start() throws IOException { |
| if (!isStarted) { |
| LOG.info("Starting {} {}", getClass().getSimpleName(), server.getId()); |
| for (ThreadPoolExecutor executor : chunkExecutors) { |
| executor.prestartAllCoreThreads(); |
| } |
| server.start(); |
| |
| RaftServerRpc serverRpc = server.getServerRpc(); |
| clientPort = getRealPort(serverRpc.getClientServerAddress(), |
| Port.Name.RATIS); |
| adminPort = getRealPort(serverRpc.getAdminServerAddress(), |
| Port.Name.RATIS_ADMIN); |
| serverPort = getRealPort(serverRpc.getInetSocketAddress(), |
| Port.Name.RATIS_SERVER); |
| |
| isStarted = true; |
| } |
| } |
| |
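| /** |
| * Records the port actually bound by the Ratis RPC server back into |
| * DatanodeDetails; relevant when random ports are enabled and the |
| * configured port was 0. |
| */ |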
| private int getRealPort(InetSocketAddress address, Port.Name name) { |
| int realPort = address.getPort(); |
| datanodeDetails.setPort(DatanodeDetails.newPort(name, realPort)); |
| LOG.info("{} {} is started using port {} for {}", |
| getClass().getSimpleName(), server.getId(), realPort, name); |
| return realPort; |
| } |
| |
| @Override |
| public void stop() { |
| if (isStarted) { |
| try { |
| // shutdown server before the executors as while shutting down, |
| // some of the tasks would be executed using the executors. |
| server.close(); |
| for (ExecutorService executor : chunkExecutors) { |
| executor.shutdown(); |
| } |
| isStarted = false; |
| } catch (IOException e) { |
| throw new RuntimeException(e); |
| } |
| } |
| } |
| |
| @Override |
| public int getIPCPort() { |
| return clientPort; |
| } |
| |
| /** |
| * Returns the Replication type supported by this end-point. |
| * |
| * @return enum -- {Stand_Alone, Ratis, Chained} |
| */ |
| @Override |
| public HddsProtos.ReplicationType getServerType() { |
| return HddsProtos.ReplicationType.RATIS; |
| } |
| |
| @VisibleForTesting |
| public RaftServer getServer() { |
| return server; |
| } |
| |
| public RaftServer.Division getServerDivision() throws IOException { |
| return getServerDivision(server.getGroupIds().iterator().next()); |
| } |
| |
| public RaftServer.Division getServerDivision(RaftGroupId id) |
| throws IOException { |
| return server.getDivision(id); |
| } |
| |
| private void processReply(RaftClientReply reply) throws IOException { |
| // NotLeader exception is thrown only when the raft server to which the |
| // request is submitted is not the leader. The request will be rejected |
| // and will eventually be executed once the request comes via the leader |
| // node. |
| NotLeaderException notLeaderException = reply.getNotLeaderException(); |
| if (notLeaderException != null) { |
| throw notLeaderException; |
| } |
| StateMachineException stateMachineException = |
| reply.getStateMachineException(); |
| if (stateMachineException != null) { |
| throw stateMachineException; |
| } |
| } |
| |
| @Override |
| public void submitRequest(ContainerCommandRequestProto request, |
| HddsProtos.PipelineID pipelineID) throws IOException { |
| RaftClientReply reply; |
| Span span = TracingUtil |
| .importAndCreateSpan( |
| "XceiverServerRatis." + request.getCmdType().name(), |
| request.getTraceID()); |
| try (Scope scope = GlobalTracer.get().activateSpan(span)) { |
| |
| RaftClientRequest raftClientRequest = |
| createRaftClientRequest(request, pipelineID, |
| RaftClientRequest.writeRequestType()); |
| try { |
| reply = server.submitClientRequestAsync(raftClientRequest) |
| .get(requestTimeout, TimeUnit.MILLISECONDS); |
| } catch (Exception e) { |
| throw new IOException(e.getMessage(), e); |
| } |
| processReply(reply); |
| } finally { |
| span.finish(); |
| } |
| } |
| |
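| /** |
| * Parses the configured Ratis storage locations and groups them by |
| * StorageType, for use in the metadata storage report. |
| */ |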
| private void initializeRatisVolumeMap() throws IOException { |
| ratisVolumeMap = new EnumMap<>(StorageType.class); |
| Collection<String> rawLocations = HddsServerUtil. |
| getOzoneDatanodeRatisDirectory(conf); |
| |
| for (String locationString : rawLocations) { |
| try { |
| StorageLocation location = StorageLocation.parse(locationString); |
| StorageType type = location.getStorageType(); |
| ratisVolumeMap.computeIfAbsent(type, k -> new ArrayList<>(1)) |
| .add(location.getUri().getPath()); |
| |
| } catch (IOException e) { |
| LOG.error("Failed to parse the storage location: " + |
| locationString, e); |
| } |
| } |
| } |
| |
| @Override |
| public List<MetadataStorageReportProto> getStorageReport() |
| throws IOException { |
| List<MetadataStorageReportProto> reportProto = new ArrayList<>(); |
| for (StorageType storageType : ratisVolumeMap.keySet()) { |
| for (String path : ratisVolumeMap.get(storageType)) { |
| MetadataStorageReportProto.Builder builder = MetadataStorageReportProto. |
| newBuilder(); |
| builder.setStorageLocation(path); |
| builder.setStorageType(StorageLocationReport. |
| getStorageTypeProto(storageType)); |
| reportProto.add(builder.build()); |
| } |
| } |
| return reportProto; |
| } |
| |
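| /** |
| * Wraps a container command into a RaftClientRequest addressed to the |
| * Raft group backing the given pipeline. |
| */ |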
| private RaftClientRequest createRaftClientRequest( |
| ContainerCommandRequestProto request, HddsProtos.PipelineID pipelineID, |
| RaftClientRequest.Type type) { |
| return RaftClientRequest.newBuilder() |
| .setClientId(clientId) |
| .setServerId(server.getId()) |
| .setGroupId( |
| RaftGroupId.valueOf( |
| PipelineID.getFromProtobuf(pipelineID).getId())) |
| .setCallId(nextCallId()) |
| .setMessage(ContainerCommandRequestMessage.toMessage(request, null)) |
| .setType(type) |
| .build(); |
| } |
| |
| private GroupInfoRequest createGroupInfoRequest( |
| HddsProtos.PipelineID pipelineID) { |
| return new GroupInfoRequest(clientId, server.getId(), |
| RaftGroupId.valueOf(PipelineID.getFromProtobuf(pipelineID).getId()), |
| nextCallId()); |
| } |
| |
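| /** |
| * Builds a role-specific description of why the pipeline is unhealthy and |
| * triggers a pipeline close action for the group. |
| */ |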
| private void handlePipelineFailure(RaftGroupId groupId, |
| RoleInfoProto roleInfoProto) { |
| String msg; |
| UUID datanode = RatisHelper.toDatanodeId(roleInfoProto.getSelf()); |
| RaftPeerId id = RaftPeerId.valueOf(roleInfoProto.getSelf().getId()); |
| switch (roleInfoProto.getRole()) { |
| case CANDIDATE: |
| msg = datanode + " is in candidate state for " + |
| roleInfoProto.getCandidateInfo().getLastLeaderElapsedTimeMs() + "ms"; |
| break; |
| case FOLLOWER: |
| msg = datanode + " closes pipeline when installSnapshot from leader " + |
| "because the leader snapshot doesn't contain any data to replay; " + |
| "all the log entries prior to the snapshot might have been purged. " + |
| "So the follower should not try to install the snapshot from the " + |
| "leader but can close the pipeline here. It's in follower state for " + |
| roleInfoProto.getRoleElapsedTimeMs() + "ms"; |
| break; |
| case LEADER: |
| StringBuilder sb = new StringBuilder(); |
| sb.append(datanode).append(" has not seen follower/s"); |
| for (RaftProtos.ServerRpcProto follower : roleInfoProto.getLeaderInfo() |
| .getFollowerInfoList()) { |
| if (follower.getLastRpcElapsedTimeMs() > nodeFailureTimeoutMs) { |
| sb.append(" ").append(RatisHelper.toDatanodeId(follower.getId())) |
| .append(" for ").append(follower.getLastRpcElapsedTimeMs()) |
| .append("ms"); |
| } |
| } |
| msg = sb.toString(); |
| break; |
| default: |
| LOG.error("unknown role: {}", roleInfoProto.getRole()); |
| throw new IllegalStateException("node " + id + " is in illegal role " |
| + roleInfoProto.getRole()); |
| } |
| |
| triggerPipelineClose(groupId, msg, |
| ClosePipelineInfo.Reason.PIPELINE_FAILED, false); |
| } |
| |
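| /** |
| * Queues a CLOSE PipelineAction for SCM with the given reason and, if |
| * requested, triggers an immediate heartbeat so the action is reported |
| * without waiting for the next heartbeat interval. |
| */ |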
| private void triggerPipelineClose(RaftGroupId groupId, String detail, |
| ClosePipelineInfo.Reason reasonCode, boolean triggerHB) { |
| PipelineID pipelineID = PipelineID.valueOf(groupId.getUuid()); |
| ClosePipelineInfo.Builder closePipelineInfo = |
| ClosePipelineInfo.newBuilder() |
| .setPipelineID(pipelineID.getProtobuf()) |
| .setReason(reasonCode) |
| .setDetailedReason(detail); |
| |
| PipelineAction action = PipelineAction.newBuilder() |
| .setClosePipeline(closePipelineInfo) |
| .setAction(PipelineAction.Action.CLOSE) |
| .build(); |
| context.addPipelineActionIfAbsent(action); |
| // Trigger the heartbeat right away if requested; otherwise the action is |
| // sent with the next scheduled heartbeat. |
| if (triggerHB) { |
| context.getParent().triggerHeartbeat(); |
| } |
| LOG.error("Pipeline Action {} on pipeline {}. Reason: {}", |
| action.getAction(), pipelineID, |
| action.getClosePipeline().getDetailedReason()); |
| } |
| |
| @Override |
| public boolean isExist(HddsProtos.PipelineID pipelineId) { |
| return raftGids.contains( |
| RaftGroupId.valueOf(PipelineID.getFromProtobuf(pipelineId).getId())); |
| } |
| |
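| /** |
| * Sums the bytes written to all containers whose origin pipeline matches |
| * the given pipeline id. |
| */ |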
| private long calculatePipelineBytesWritten(HddsProtos.PipelineID pipelineID) { |
| long bytesWritten = 0; |
| Iterator<org.apache.hadoop.ozone.container.common.interfaces.Container<?>> |
| containerIt = containerController.getContainers(); |
| while (containerIt.hasNext()) { |
| ContainerData containerData = containerIt.next().getContainerData(); |
| if (containerData.getOriginPipelineId() |
| .compareTo(pipelineID.getId()) == 0) { |
| bytesWritten += containerData.getWriteBytes(); |
| } |
| } |
| return bytesWritten; |
| } |
| |
| @Override |
| public List<PipelineReport> getPipelineReport() { |
| try { |
| Iterable<RaftGroupId> gids = server.getGroupIds(); |
| List<PipelineReport> reports = new ArrayList<>(); |
| for (RaftGroupId groupId : gids) { |
| HddsProtos.PipelineID pipelineID = PipelineID |
| .valueOf(groupId.getUuid()).getProtobuf(); |
| reports.add(PipelineReport.newBuilder() |
| .setPipelineID(pipelineID) |
| .setIsLeader(groupLeaderMap.getOrDefault(groupId, Boolean.FALSE)) |
| .setBytesWritten(calculatePipelineBytesWritten(pipelineID)) |
| .build()); |
| } |
| return reports; |
| } catch (Exception e) { |
| return null; |
| } |
| } |
| |
| @VisibleForTesting |
| public List<PipelineID> getPipelineIds() { |
| Iterable<RaftGroupId> gids = server.getGroupIds(); |
| List<PipelineID> pipelineIDs = new ArrayList<>(); |
| for (RaftGroupId groupId : gids) { |
| pipelineIDs.add(PipelineID.valueOf(groupId.getUuid())); |
| LOG.info("pipeline id {}", PipelineID.valueOf(groupId.getUuid())); |
| } |
| return pipelineIDs; |
| } |
| |
| @Override |
| public void addGroup(HddsProtos.PipelineID pipelineId, |
| List<DatanodeDetails> peers) throws IOException { |
| if (peers.size() == getDefaultPriorityList().size()) { |
| addGroup(pipelineId, peers, getDefaultPriorityList()); |
| } else { |
| addGroup(pipelineId, peers, |
| new ArrayList<>(Collections.nCopies(peers.size(), 0))); |
| } |
| } |
| |
| @Override |
| public void addGroup(HddsProtos.PipelineID pipelineId, |
| List<DatanodeDetails> peers, |
| List<Integer> priorityList) throws IOException { |
| final PipelineID pipelineID = PipelineID.getFromProtobuf(pipelineId); |
| final RaftGroupId groupId = RaftGroupId.valueOf(pipelineID.getId()); |
| final RaftGroup group = |
| RatisHelper.newRaftGroup(groupId, peers, priorityList); |
| GroupManagementRequest request = GroupManagementRequest.newAdd( |
| clientId, server.getId(), nextCallId(), group); |
| |
| RaftClientReply reply; |
| LOG.debug("Received addGroup request for pipeline {}", pipelineID); |
| |
| try { |
| reply = server.groupManagement(request); |
| } catch (Exception e) { |
| throw new IOException(e.getMessage(), e); |
| } |
| processReply(reply); |
| LOG.info("Created group {}", pipelineID); |
| } |
| |
| @Override |
| public void removeGroup(HddsProtos.PipelineID pipelineId) |
| throws IOException { |
| // If shouldDeleteRatisLogDirectory is set to false, the raft log |
| // directory will be renamed and kept aside for debugging. |
| // If it is set to true, the raft log directory will be removed. |
| GroupManagementRequest request = GroupManagementRequest.newRemove( |
| clientId, server.getId(), nextCallId(), |
| RaftGroupId.valueOf(PipelineID.getFromProtobuf(pipelineId).getId()), |
| shouldDeleteRatisLogDirectory, !shouldDeleteRatisLogDirectory); |
| |
| RaftClientReply reply; |
| try { |
| reply = server.groupManagement(request); |
| } catch (Exception e) { |
| throw new IOException(e.getMessage(), e); |
| } |
| processReply(reply); |
| } |
| |
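| // A follower slowness or missing-leader notification is treated as a |
| // pipeline failure. |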
| void handleNodeSlowness(RaftGroupId groupId, RoleInfoProto roleInfoProto) { |
| handlePipelineFailure(groupId, roleInfoProto); |
| } |
| |
| void handleNoLeader(RaftGroupId groupId, RoleInfoProto roleInfoProto) { |
| handlePipelineFailure(groupId, roleInfoProto); |
| } |
| |
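| /** |
| * Handles an applyTransaction failure in the container state machine by |
| * triggering an immediate pipeline close. |
| */ |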
| void handleApplyTransactionFailure(RaftGroupId groupId, |
| RaftProtos.RaftPeerRole role) { |
| UUID dnId = RatisHelper.toDatanodeId(getServer().getId()); |
| String msg = |
| "Ratis transaction failure in datanode " + dnId + " with role " + role |
| + ". Triggering pipeline close action."; |
| triggerPipelineClose(groupId, msg, |
| ClosePipelineInfo.Reason.STATEMACHINE_TRANSACTION_FAILED, true); |
| } |
| |
| /** |
| * The snapshot contents cannot be used to actually catch up the follower, |
| * which is why a pipeline close is initiated instead of installing the |
| * snapshot: the follower would essentially never be able to catch up. |
| * |
| * @param groupId raft group information |
| * @param roleInfoProto information about the current node role and |
| * rpc delay information |
| * @param firstTermIndexInLog term index of the first log entry available |
| * on the leader; entries before it are only covered by the snapshot |
| void handleInstallSnapshotFromLeader(RaftGroupId groupId, |
| RoleInfoProto roleInfoProto, |
| TermIndex firstTermIndexInLog) { |
| LOG.warn("Install snapshot notification received from Leader with " + |
| "termIndex: {}, terminating pipeline: {}", |
| firstTermIndexInLog, groupId); |
| handlePipelineFailure(groupId, roleInfoProto); |
| } |
| |
| /** |
| * Notify the Datanode Ratis endpoint of a Ratis log failure. |
| * Expected to be invoked from the ContainerStateMachine. |
| * |
| * @param groupId the Ratis group/pipeline for which the log has failed |
| * @param t exception encountered at the time of the failure |
| */ |
| @VisibleForTesting |
| public void handleNodeLogFailure(RaftGroupId groupId, Throwable t) { |
| String msg = (t == null) ? "Unspecified failure reported in Ratis log" |
| : t.getMessage(); |
| |
| triggerPipelineClose(groupId, msg, |
| ClosePipelineInfo.Reason.PIPELINE_LOG_FAILED, true); |
| } |
| |
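| /** |
| * Returns the smallest commit index replicated to all peers of the given |
| * pipeline, or -1 if it cannot be determined. |
| */ |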
| public long getMinReplicatedIndex(PipelineID pipelineID) throws IOException { |
| Long minIndex; |
| GroupInfoReply reply = getServer() |
| .getGroupInfo(createGroupInfoRequest(pipelineID.getProtobuf())); |
| minIndex = RatisHelper.getMinReplicatedIndex(reply.getCommitInfos()); |
| return minIndex == null ? -1 : minIndex.longValue(); |
| } |
| |
| void notifyGroupRemove(RaftGroupId gid) { |
| raftGids.remove(gid); |
| // Remove any entry for this group from the group leader map |
| groupLeaderMap.remove(gid); |
| } |
| |
| void notifyGroupAdd(RaftGroupId gid) { |
| raftGids.add(gid); |
| sendPipelineReport(); |
| } |
| |
| void handleLeaderChangedNotification(RaftGroupMemberId groupMemberId, |
| RaftPeerId raftPeerId1) { |
| LOG.info("Leader change notification received for group: {} with new " + |
| "leaderId: {}", groupMemberId.getGroupId(), raftPeerId1); |
| // Save the reported leader to be sent with the report to SCM |
| boolean leaderForGroup = this.raftPeerId.equals(raftPeerId1); |
| groupLeaderMap.put(groupMemberId.getGroupId(), leaderForGroup); |
| if (context != null && leaderForGroup) { |
| // Publish new report from leader |
| sendPipelineReport(); |
| } |
| } |
| |
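| // Queues a full pipeline report and triggers an immediate heartbeat to SCM. |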
| private void sendPipelineReport() { |
| if (context != null) { |
| // TODO: Send IncrementalPipelineReport instead of full PipelineReport |
| context.addReport(context.getParent().getContainer().getPipelineReport()); |
| context.getParent().triggerHeartbeat(); |
| } |
| } |
| |
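| /** |
| * Creates one single-threaded executor per chunk-writer slot |
| * (threads-per-volume x number of volumes) so chunk writes can be |
| * dispatched per disk without blocking each other. |
| */ |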
| private static List<ThreadPoolExecutor> createChunkExecutors( |
| ConfigurationSource conf) { |
| // TODO create single pool with N threads if using non-incremental chunks |
| final int threadCountPerDisk = conf.getInt( |
| OzoneConfigKeys |
| .DFS_CONTAINER_RATIS_NUM_WRITE_CHUNK_THREADS_PER_VOLUME_KEY, |
| OzoneConfigKeys |
| .DFS_CONTAINER_RATIS_NUM_WRITE_CHUNK_THREADS_PER_VOLUME_DEFAULT); |
| |
| final int numberOfDisks = |
| MutableVolumeSet.getDatanodeStorageDirs(conf).size(); |
| |
| ThreadPoolExecutor[] executors = |
| new ThreadPoolExecutor[threadCountPerDisk * numberOfDisks]; |
| for (int i = 0; i < executors.length; i++) { |
| ThreadFactory threadFactory = new ThreadFactoryBuilder() |
| .setDaemon(true) |
| .setNameFormat("ChunkWriter-" + i + "-%d") |
| .build(); |
| BlockingQueue<Runnable> workQueue = new LinkedBlockingDeque<>(); |
| executors[i] = new ThreadPoolExecutor(1, 1, |
| 0, TimeUnit.SECONDS, workQueue, threadFactory); |
| } |
| return ImmutableList.copyOf(executors); |
| } |
| |
| /** |
| * @return the default priority list (all zeroes) used for a three-node |
| * Ratis pipeline |
| */ |
| public static List<Integer> getDefaultPriorityList() { |
| return DEFAULT_PRIORITY_LIST; |
| } |
| } |