/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
| |
package org.apache.hadoop.ozone.container.common.transport.server.ratis;

import com.google.common.annotations.VisibleForTesting;
import io.opentracing.Scope;
import org.apache.hadoop.conf.StorageUnit;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ClosePipelineInfo;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineAction;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReport;
import org.apache.hadoop.hdds.ratis.ContainerCommandRequestMessage;
import org.apache.hadoop.hdds.ratis.RatisHelper;
import org.apache.hadoop.hdds.scm.HddsServerUtil;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.security.x509.SecurityConfig;
import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
import org.apache.hadoop.hdds.tracing.TracingUtil;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.ozone.container.common.transport.server.XceiverServer;
import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
import org.apache.ratis.RaftConfigKeys;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.grpc.GrpcConfigKeys;
import org.apache.ratis.grpc.GrpcFactory;
import org.apache.ratis.grpc.GrpcTlsConfig;
import org.apache.ratis.netty.NettyConfigKeys;
import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.proto.RaftProtos.ReplicationLevel;
import org.apache.ratis.proto.RaftProtos.RoleInfoProto;
import org.apache.ratis.protocol.*;
import org.apache.ratis.rpc.RpcType;
import org.apache.ratis.rpc.SupportedRpcType;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.impl.RaftServerProxy;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.util.SizeInBytes;
import org.apache.ratis.util.TimeDuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
| |
| /** |
| * Creates a ratis server endpoint that acts as the communication layer for |
| * Ozone containers. |
| */ |
| public final class XceiverServerRatis extends XceiverServer { |
| private static final Logger LOG = LoggerFactory |
| .getLogger(XceiverServerRatis.class); |
| private static final AtomicLong CALL_ID_COUNTER = new AtomicLong(); |
| |
| private static long nextCallId() { |
| return CALL_ID_COUNTER.getAndIncrement() & Long.MAX_VALUE; |
| } |
| |
| private int port; |
| private final RaftServer server; |
| private ThreadPoolExecutor chunkExecutor; |
| private final ContainerDispatcher dispatcher; |
| private final ContainerController containerController; |
| private ClientId clientId = ClientId.randomId(); |
| private final StateContext context; |
| private final ReplicationLevel replicationLevel; |
| private long nodeFailureTimeoutMs; |
| private final long cacheEntryExpiryInteval; |
| private boolean isStarted = false; |
| private DatanodeDetails datanodeDetails; |
| private final OzoneConfiguration conf; |
| // TODO: Remove the gids set when Ratis supports an api to query active |
| // pipelines |
| private final Set<RaftGroupId> raftGids = new HashSet<>(); |
| |
| @SuppressWarnings("parameternumber") |
| private XceiverServerRatis(DatanodeDetails dd, int port, |
| ContainerDispatcher dispatcher, ContainerController containerController, |
| StateContext context, GrpcTlsConfig tlsConfig, CertificateClient caClient, |
| OzoneConfiguration conf) |
| throws IOException { |
| super(conf, caClient); |
| this.conf = conf; |
| Objects.requireNonNull(dd, "id == null"); |
| datanodeDetails = dd; |
| this.port = port; |
| RaftProperties serverProperties = newRaftProperties(); |
| final int numWriteChunkThreads = conf.getInt( |
| OzoneConfigKeys.DFS_CONTAINER_RATIS_NUM_WRITE_CHUNK_THREADS_KEY, |
| OzoneConfigKeys.DFS_CONTAINER_RATIS_NUM_WRITE_CHUNK_THREADS_DEFAULT); |
| chunkExecutor = |
| new ThreadPoolExecutor(numWriteChunkThreads, numWriteChunkThreads, |
| 100, TimeUnit.SECONDS, |
| new ArrayBlockingQueue<>(1024), |
| new ThreadPoolExecutor.CallerRunsPolicy()); |
| this.context = context; |
| this.replicationLevel = |
| conf.getEnum(OzoneConfigKeys.DFS_CONTAINER_RATIS_REPLICATION_LEVEL_KEY, |
| OzoneConfigKeys.DFS_CONTAINER_RATIS_REPLICATION_LEVEL_DEFAULT); |
| cacheEntryExpiryInteval = conf.getTimeDuration(OzoneConfigKeys. |
| DFS_CONTAINER_RATIS_STATEMACHINEDATA_CACHE_EXPIRY_INTERVAL, |
| OzoneConfigKeys. |
| DFS_CONTAINER_RATIS_STATEMACHINEDATA_CACHE_EXPIRY_INTERVAL_DEFAULT, |
| TimeUnit.MILLISECONDS); |
| this.dispatcher = dispatcher; |
| this.containerController = containerController; |
| |
| RaftServer.Builder builder = |
| RaftServer.newBuilder().setServerId(RatisHelper.toRaftPeerId(dd)) |
| .setProperties(serverProperties) |
| .setStateMachineRegistry(this::getStateMachine); |
| if (tlsConfig != null) { |
| builder.setParameters(GrpcFactory.newRaftParameters(tlsConfig)); |
| } |
| this.server = builder.build(); |
| } |
| |
| private ContainerStateMachine getStateMachine(RaftGroupId gid) { |
| return new ContainerStateMachine(gid, dispatcher, containerController, |
| chunkExecutor, this, cacheEntryExpiryInteval, |
| getSecurityConfig().isBlockTokenEnabled(), getBlockTokenVerifier(), |
| conf); |
| } |
| |
| private RaftProperties newRaftProperties() { |
| final RaftProperties properties = new RaftProperties(); |
| |
| // Set rpc type |
| final RpcType rpc = setRpcType(properties); |
| |
| // set raft segment size |
| setRaftSegmentSize(properties); |
| |
| // set raft segment pre-allocated size |
| final int raftSegmentPreallocatedSize = |
| setRaftSegmentPreallocatedSize(properties); |
| |
| // Set max write buffer size, which is the scm chunk size |
| final int maxChunkSize = setMaxWriteBuffer(properties); |
| TimeUnit timeUnit; |
| long duration; |
| |
| // set the configs enable and set the stateMachineData sync timeout |
| RaftServerConfigKeys.Log.StateMachineData.setSync(properties, true); |
| timeUnit = OzoneConfigKeys. |
| DFS_CONTAINER_RATIS_STATEMACHINEDATA_SYNC_TIMEOUT_DEFAULT.getUnit(); |
| duration = conf.getTimeDuration( |
| OzoneConfigKeys.DFS_CONTAINER_RATIS_STATEMACHINEDATA_SYNC_TIMEOUT, |
| OzoneConfigKeys. |
| DFS_CONTAINER_RATIS_STATEMACHINEDATA_SYNC_TIMEOUT_DEFAULT |
| .getDuration(), timeUnit); |
| final TimeDuration dataSyncTimeout = |
| TimeDuration.valueOf(duration, timeUnit); |
| RaftServerConfigKeys.Log.StateMachineData |
| .setSyncTimeout(properties, dataSyncTimeout); |
| |
| // Set the server Request timeout |
| setServerRequestTimeout(properties); |
| |
| // set timeout for a retry cache entry |
| setTimeoutForRetryCache(properties); |
| |
| // Set the ratis leader election timeout |
| setRatisLeaderElectionTimeout(properties); |
| |
| // Set the maximum cache segments |
| RaftServerConfigKeys.Log.setMaxCachedSegmentNum(properties, 2); |
| |
| // set the node failure timeout |
| setNodeFailureTimeout(properties); |
| |
| // Set the ratis storage directory |
| String storageDir = HddsServerUtil.getOzoneDatanodeRatisDirectory(conf); |
| RaftServerConfigKeys.setStorageDirs(properties, |
| Collections.singletonList(new File(storageDir))); |
| |
| // For grpc set the maximum message size |
| GrpcConfigKeys.setMessageSizeMax(properties, |
| SizeInBytes.valueOf(maxChunkSize + raftSegmentPreallocatedSize)); |
| |
| // Set the ratis port number |
| if (rpc == SupportedRpcType.GRPC) { |
| GrpcConfigKeys.Server.setPort(properties, port); |
| } else if (rpc == SupportedRpcType.NETTY) { |
| NettyConfigKeys.Server.setPort(properties, port); |
| } |
| |
| long snapshotThreshold = |
| conf.getLong(OzoneConfigKeys.DFS_RATIS_SNAPSHOT_THRESHOLD_KEY, |
| OzoneConfigKeys.DFS_RATIS_SNAPSHOT_THRESHOLD_DEFAULT); |
| RaftServerConfigKeys.Snapshot. |
| setAutoTriggerEnabled(properties, true); |
| RaftServerConfigKeys.Snapshot. |
| setAutoTriggerThreshold(properties, snapshotThreshold); |
| int maxPendingRequets = conf.getInt( |
| OzoneConfigKeys.DFS_CONTAINER_RATIS_LEADER_NUM_PENDING_REQUESTS, |
| OzoneConfigKeys.DFS_CONTAINER_RATIS_LEADER_NUM_PENDING_REQUESTS_DEFAULT |
| ); |
| RaftServerConfigKeys.Write.setElementLimit(properties, maxPendingRequets); |
| int logQueueNumElements = |
| conf.getInt(OzoneConfigKeys.DFS_CONTAINER_RATIS_LOG_QUEUE_NUM_ELEMENTS, |
| OzoneConfigKeys.DFS_CONTAINER_RATIS_LOG_QUEUE_NUM_ELEMENTS_DEFAULT); |
| final int logQueueByteLimit = (int) conf.getStorageSize( |
| OzoneConfigKeys.DFS_CONTAINER_RATIS_LOG_QUEUE_BYTE_LIMIT, |
| OzoneConfigKeys.DFS_CONTAINER_RATIS_LOG_QUEUE_BYTE_LIMIT_DEFAULT, |
| StorageUnit.BYTES); |
| RaftServerConfigKeys.Log.setQueueElementLimit( |
| properties, logQueueNumElements); |
| RaftServerConfigKeys.Log.setQueueByteLimit(properties, logQueueByteLimit); |
| |
| int numSyncRetries = conf.getInt( |
| OzoneConfigKeys.DFS_CONTAINER_RATIS_STATEMACHINEDATA_SYNC_RETRIES, |
| OzoneConfigKeys. |
| DFS_CONTAINER_RATIS_STATEMACHINEDATA_SYNC_RETRIES_DEFAULT); |
| RaftServerConfigKeys.Log.StateMachineData.setSyncTimeoutRetry(properties, |
| numSyncRetries); |
| |
| // Enable the StateMachineCaching |
| RaftServerConfigKeys.Log.StateMachineData.setCachingEnabled( |
| properties, true); |
| |
| RaftServerConfigKeys.Log.Appender.setInstallSnapshotEnabled(properties, |
| false); |
| |
| int purgeGap = conf.getInt( |
| OzoneConfigKeys.DFS_CONTAINER_RATIS_LOG_PURGE_GAP, |
| OzoneConfigKeys.DFS_CONTAINER_RATIS_LOG_PURGE_GAP_DEFAULT); |
| RaftServerConfigKeys.Log.setPurgeGap(properties, purgeGap); |
| |
| //Set the number of Snapshots Retained. |
| RatisServerConfiguration ratisServerConfiguration = |
| conf.getObject(RatisServerConfiguration.class); |
| int numSnapshotsRetained = |
| ratisServerConfiguration.getNumSnapshotsRetained(); |
| RaftServerConfigKeys.Snapshot.setRetentionFileNum(properties, |
| numSnapshotsRetained); |
| return properties; |
| } |
| |
| private void setNodeFailureTimeout(RaftProperties properties) { |
| TimeUnit timeUnit; |
| long duration; |
| timeUnit = OzoneConfigKeys.DFS_RATIS_SERVER_FAILURE_DURATION_DEFAULT |
| .getUnit(); |
| duration = conf.getTimeDuration( |
| OzoneConfigKeys.DFS_RATIS_SERVER_FAILURE_DURATION_KEY, |
| OzoneConfigKeys.DFS_RATIS_SERVER_FAILURE_DURATION_DEFAULT |
| .getDuration(), timeUnit); |
| final TimeDuration nodeFailureTimeout = |
| TimeDuration.valueOf(duration, timeUnit); |
| RaftServerConfigKeys.Notification.setNoLeaderTimeout(properties, |
| nodeFailureTimeout); |
| RaftServerConfigKeys.Rpc.setSlownessTimeout(properties, |
| nodeFailureTimeout); |
| nodeFailureTimeoutMs = nodeFailureTimeout.toLong(TimeUnit.MILLISECONDS); |
| } |
| |
| private void setRatisLeaderElectionTimeout(RaftProperties properties) { |
| long duration; |
| TimeUnit leaderElectionMinTimeoutUnit = |
| OzoneConfigKeys. |
| DFS_RATIS_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_DEFAULT |
| .getUnit(); |
| duration = conf.getTimeDuration( |
| OzoneConfigKeys.DFS_RATIS_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_KEY, |
| OzoneConfigKeys. |
| DFS_RATIS_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_DEFAULT |
| .getDuration(), leaderElectionMinTimeoutUnit); |
| final TimeDuration leaderElectionMinTimeout = |
| TimeDuration.valueOf(duration, leaderElectionMinTimeoutUnit); |
| RaftServerConfigKeys.Rpc |
| .setTimeoutMin(properties, leaderElectionMinTimeout); |
| long leaderElectionMaxTimeout = |
| leaderElectionMinTimeout.toLong(TimeUnit.MILLISECONDS) + 200; |
| RaftServerConfigKeys.Rpc.setTimeoutMax(properties, |
| TimeDuration.valueOf(leaderElectionMaxTimeout, TimeUnit.MILLISECONDS)); |
| } |
| |
| private void setTimeoutForRetryCache(RaftProperties properties) { |
| TimeUnit timeUnit; |
| long duration; |
| timeUnit = |
| OzoneConfigKeys.DFS_RATIS_SERVER_RETRY_CACHE_TIMEOUT_DURATION_DEFAULT |
| .getUnit(); |
| duration = conf.getTimeDuration( |
| OzoneConfigKeys.DFS_RATIS_SERVER_RETRY_CACHE_TIMEOUT_DURATION_KEY, |
| OzoneConfigKeys.DFS_RATIS_SERVER_RETRY_CACHE_TIMEOUT_DURATION_DEFAULT |
| .getDuration(), timeUnit); |
| final TimeDuration retryCacheTimeout = |
| TimeDuration.valueOf(duration, timeUnit); |
| RaftServerConfigKeys.RetryCache |
| .setExpiryTime(properties, retryCacheTimeout); |
| } |
| |
| private void setServerRequestTimeout(RaftProperties properties) { |
| TimeUnit timeUnit; |
| long duration; |
| timeUnit = OzoneConfigKeys.DFS_RATIS_SERVER_REQUEST_TIMEOUT_DURATION_DEFAULT |
| .getUnit(); |
| duration = conf.getTimeDuration( |
| OzoneConfigKeys.DFS_RATIS_SERVER_REQUEST_TIMEOUT_DURATION_KEY, |
| OzoneConfigKeys.DFS_RATIS_SERVER_REQUEST_TIMEOUT_DURATION_DEFAULT |
| .getDuration(), timeUnit); |
| final TimeDuration serverRequestTimeout = |
| TimeDuration.valueOf(duration, timeUnit); |
| RaftServerConfigKeys.Rpc |
| .setRequestTimeout(properties, serverRequestTimeout); |
| } |
| |
| private int setMaxWriteBuffer(RaftProperties properties) { |
| final int maxChunkSize = OzoneConsts.OZONE_SCM_CHUNK_MAX_SIZE; |
| RaftServerConfigKeys.Log.setWriteBufferSize(properties, |
| SizeInBytes.valueOf(maxChunkSize)); |
| return maxChunkSize; |
| } |
| |
| private int setRaftSegmentPreallocatedSize(RaftProperties properties) { |
| final int raftSegmentPreallocatedSize = (int) conf.getStorageSize( |
| OzoneConfigKeys.DFS_CONTAINER_RATIS_SEGMENT_PREALLOCATED_SIZE_KEY, |
| OzoneConfigKeys.DFS_CONTAINER_RATIS_SEGMENT_PREALLOCATED_SIZE_DEFAULT, |
| StorageUnit.BYTES); |
| int logAppenderQueueNumElements = conf.getInt( |
| OzoneConfigKeys.DFS_CONTAINER_RATIS_LOG_APPENDER_QUEUE_NUM_ELEMENTS, |
| OzoneConfigKeys |
| .DFS_CONTAINER_RATIS_LOG_APPENDER_QUEUE_NUM_ELEMENTS_DEFAULT); |
| final int logAppenderQueueByteLimit = (int) conf.getStorageSize( |
| OzoneConfigKeys.DFS_CONTAINER_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT, |
| OzoneConfigKeys |
| .DFS_CONTAINER_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT_DEFAULT, |
| StorageUnit.BYTES); |
| RaftServerConfigKeys.Log.Appender |
| .setBufferElementLimit(properties, logAppenderQueueNumElements); |
| RaftServerConfigKeys.Log.Appender.setBufferByteLimit(properties, |
| SizeInBytes.valueOf(logAppenderQueueByteLimit)); |
| RaftServerConfigKeys.Log.setPreallocatedSize(properties, |
| SizeInBytes.valueOf(raftSegmentPreallocatedSize)); |
| return raftSegmentPreallocatedSize; |
| } |
| |
| private void setRaftSegmentSize(RaftProperties properties) { |
| final int raftSegmentSize = (int)conf.getStorageSize( |
| OzoneConfigKeys.DFS_CONTAINER_RATIS_SEGMENT_SIZE_KEY, |
| OzoneConfigKeys.DFS_CONTAINER_RATIS_SEGMENT_SIZE_DEFAULT, |
| StorageUnit.BYTES); |
| RaftServerConfigKeys.Log.setSegmentSizeMax(properties, |
| SizeInBytes.valueOf(raftSegmentSize)); |
| } |
| |
| private RpcType setRpcType(RaftProperties properties) { |
| final String rpcType = conf.get( |
| OzoneConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY, |
| OzoneConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT); |
| final RpcType rpc = SupportedRpcType.valueOfIgnoreCase(rpcType); |
| RaftConfigKeys.Rpc.setType(properties, rpc); |
| return rpc; |
| } |
| |
| public static XceiverServerRatis newXceiverServerRatis( |
| DatanodeDetails datanodeDetails, OzoneConfiguration ozoneConf, |
| ContainerDispatcher dispatcher, ContainerController containerController, |
| CertificateClient caClient, StateContext context) throws IOException { |
| int localPort = ozoneConf.getInt( |
| OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_PORT, |
| OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_PORT_DEFAULT); |
| |
| // Get an available port on current node and |
| // use that as the container port |
| if (ozoneConf.getBoolean(OzoneConfigKeys |
| .DFS_CONTAINER_RATIS_IPC_RANDOM_PORT, |
| OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_RANDOM_PORT_DEFAULT)) { |
| localPort = 0; |
| } |
| GrpcTlsConfig tlsConfig = RatisHelper.createTlsServerConfigForDN( |
| new SecurityConfig(ozoneConf), caClient); |
| |
| return new XceiverServerRatis(datanodeDetails, localPort, dispatcher, |
| containerController, context, tlsConfig, caClient, ozoneConf); |
| } |
| |
| @Override |
| public void start() throws IOException { |
| if (!isStarted) { |
| LOG.info("Starting {} {} at port {}", getClass().getSimpleName(), |
| server.getId(), getIPCPort()); |
| chunkExecutor.prestartAllCoreThreads(); |
| server.start(); |
| |
| int realPort = |
| ((RaftServerProxy) server).getServerRpc().getInetSocketAddress() |
| .getPort(); |
| |
| if (port == 0) { |
| LOG.info("{} {} is started using port {}", getClass().getSimpleName(), |
| server.getId(), realPort); |
| port = realPort; |
| } |
| |
| //register the real port to the datanode details. |
| datanodeDetails.setPort(DatanodeDetails |
| .newPort(DatanodeDetails.Port.Name.RATIS, |
| realPort)); |
| |
| isStarted = true; |
| } |
| } |
| |
| @Override |
| public void stop() { |
| if (isStarted) { |
| try { |
| // shutdown server before the executors as while shutting down, |
| // some of the tasks would be executed using the executors. |
| server.close(); |
| chunkExecutor.shutdown(); |
| isStarted = false; |
| } catch (IOException e) { |
| throw new RuntimeException(e); |
| } |
| } |
| } |
| |
| @Override |
| public int getIPCPort() { |
| return port; |
| } |
| |
| /** |
| * Returns the Replication type supported by this end-point. |
| * |
| * @return enum -- {Stand_Alone, Ratis, Chained} |
| */ |
| @Override |
| public HddsProtos.ReplicationType getServerType() { |
| return HddsProtos.ReplicationType.RATIS; |
| } |
| |
| @VisibleForTesting |
| public RaftServer getServer() { |
| return server; |
| } |
| |
| private void processReply(RaftClientReply reply) throws IOException { |
| // NotLeader exception is thrown only when the raft server to which the |
| // request is submitted is not the leader. The request will be rejected |
| // and will eventually be executed once the request comes via the leader |
| // node. |
| NotLeaderException notLeaderException = reply.getNotLeaderException(); |
| if (notLeaderException != null) { |
| throw notLeaderException; |
| } |
| StateMachineException stateMachineException = |
| reply.getStateMachineException(); |
| if (stateMachineException != null) { |
| throw stateMachineException; |
| } |
| } |
| |
| @Override |
| public void submitRequest(ContainerCommandRequestProto request, |
| HddsProtos.PipelineID pipelineID) throws IOException { |
| super.submitRequest(request, pipelineID); |
| RaftClientReply reply; |
| try (Scope scope = TracingUtil |
| .importAndCreateScope( |
| "XceiverServerRatis." + request.getCmdType().name(), |
| request.getTraceID())) { |
| |
| RaftClientRequest raftClientRequest = |
| createRaftClientRequest(request, pipelineID, |
| RaftClientRequest.writeRequestType()); |
| try { |
| reply = server.submitClientRequestAsync(raftClientRequest).get(); |
| } catch (Exception e) { |
| throw new IOException(e.getMessage(), e); |
| } |
| processReply(reply); |
| } |
| } |
| |
| private RaftClientRequest createRaftClientRequest( |
| ContainerCommandRequestProto request, HddsProtos.PipelineID pipelineID, |
| RaftClientRequest.Type type) { |
| return new RaftClientRequest(clientId, server.getId(), |
| RaftGroupId.valueOf(PipelineID.getFromProtobuf(pipelineID).getId()), |
| nextCallId(), ContainerCommandRequestMessage.toMessage(request, null), |
| type, null); |
| } |
| |
| private GroupInfoRequest createGroupInfoRequest( |
| HddsProtos.PipelineID pipelineID) { |
| return new GroupInfoRequest(clientId, server.getId(), |
| RaftGroupId.valueOf(PipelineID.getFromProtobuf(pipelineID).getId()), |
| nextCallId()); |
| } |
| |
| private void handlePipelineFailure(RaftGroupId groupId, |
| RoleInfoProto roleInfoProto) { |
| String msg; |
| UUID datanode = RatisHelper.toDatanodeId(roleInfoProto.getSelf()); |
| RaftPeerId id = RaftPeerId.valueOf(roleInfoProto.getSelf().getId()); |
| switch (roleInfoProto.getRole()) { |
| case CANDIDATE: |
| msg = datanode + " is in candidate state for " + |
| roleInfoProto.getCandidateInfo().getLastLeaderElapsedTimeMs() + "ms"; |
| break; |
| case LEADER: |
| StringBuilder sb = new StringBuilder(); |
| sb.append(datanode).append(" has not seen follower/s"); |
| for (RaftProtos.ServerRpcProto follower : roleInfoProto.getLeaderInfo() |
| .getFollowerInfoList()) { |
| if (follower.getLastRpcElapsedTimeMs() > nodeFailureTimeoutMs) { |
| sb.append(" ").append(RatisHelper.toDatanodeId(follower.getId())) |
| .append(" for ").append(follower.getLastRpcElapsedTimeMs()) |
| .append("ms"); |
| } |
| } |
| msg = sb.toString(); |
| break; |
| default: |
| LOG.error("unknown state:" + roleInfoProto.getRole()); |
| throw new IllegalStateException("node" + id + " is in illegal role " |
| + roleInfoProto.getRole()); |
| } |
| |
| triggerPipelineClose(groupId, msg, |
| ClosePipelineInfo.Reason.PIPELINE_FAILED, false); |
| } |
| |
| private void triggerPipelineClose(RaftGroupId groupId, String detail, |
| ClosePipelineInfo.Reason reasonCode, boolean triggerHB) { |
| PipelineID pipelineID = PipelineID.valueOf(groupId.getUuid()); |
| ClosePipelineInfo.Builder closePipelineInfo = |
| ClosePipelineInfo.newBuilder() |
| .setPipelineID(pipelineID.getProtobuf()) |
| .setReason(reasonCode) |
| .setDetailedReason(detail); |
| |
| PipelineAction action = PipelineAction.newBuilder() |
| .setClosePipeline(closePipelineInfo) |
| .setAction(PipelineAction.Action.CLOSE) |
| .build(); |
| context.addPipelineActionIfAbsent(action); |
| // wait for the next HB timeout or right away? |
| if (triggerHB) { |
| context.getParent().triggerHeartbeat(); |
| } |
| LOG.error( |
| "pipeline Action " + action.getAction() + " on pipeline " + pipelineID |
| + ".Reason : " + action.getClosePipeline().getDetailedReason()); |
| } |
| |
| @Override |
| public boolean isExist(HddsProtos.PipelineID pipelineId) { |
| return raftGids.contains( |
| RaftGroupId.valueOf(PipelineID.getFromProtobuf(pipelineId).getId())); |
| } |
| |
| @Override |
| public List<PipelineReport> getPipelineReport() { |
| try { |
| Iterable<RaftGroupId> gids = server.getGroupIds(); |
| List<PipelineReport> reports = new ArrayList<>(); |
| for (RaftGroupId groupId : gids) { |
| reports.add(PipelineReport.newBuilder() |
| .setPipelineID(PipelineID.valueOf(groupId.getUuid()).getProtobuf()) |
| .build()); |
| } |
| return reports; |
| } catch (Exception e) { |
| return null; |
| } |
| } |
| |
| @VisibleForTesting |
| public List<PipelineID> getPipelineIds() { |
| Iterable<RaftGroupId> gids = server.getGroupIds(); |
| List<PipelineID> pipelineIDs = new ArrayList<>(); |
| for (RaftGroupId groupId : gids) { |
| pipelineIDs.add(PipelineID.valueOf(groupId.getUuid())); |
| LOG.info("pipeline id {}", PipelineID.valueOf(groupId.getUuid())); |
| } |
| return pipelineIDs; |
| } |
| |
| void handleNodeSlowness(RaftGroupId groupId, RoleInfoProto roleInfoProto) { |
| handlePipelineFailure(groupId, roleInfoProto); |
| } |
| |
| void handleNoLeader(RaftGroupId groupId, RoleInfoProto roleInfoProto) { |
| handlePipelineFailure(groupId, roleInfoProto); |
| } |
| |
| void handleApplyTransactionFailure(RaftGroupId groupId, |
| RaftProtos.RaftPeerRole role) { |
| UUID dnId = RatisHelper.toDatanodeId(getServer().getId()); |
| String msg = |
| "Ratis Transaction failure in datanode " + dnId + " with role " + role |
| + " .Triggering pipeline close action."; |
| triggerPipelineClose(groupId, msg, |
| ClosePipelineInfo.Reason.STATEMACHINE_TRANSACTION_FAILED, true); |
| } |
| /** |
| * The fact that the snapshot contents cannot be used to actually catch up |
| * the follower, it is the reason to initiate close pipeline and |
| * not install the snapshot. The follower will basically never be able to |
| * catch up. |
| * |
| * @param groupId raft group information |
| * @param roleInfoProto information about the current node role and |
| * rpc delay information. |
| * @param firstTermIndexInLog After the snapshot installation is complete, |
| * return the last included term index in the snapshot. |
| */ |
| void handleInstallSnapshotFromLeader(RaftGroupId groupId, |
| RoleInfoProto roleInfoProto, |
| TermIndex firstTermIndexInLog) { |
| LOG.warn("Install snapshot notification received from Leader with " + |
| "termIndex: {}, terminating pipeline: {}", |
| firstTermIndexInLog, groupId); |
| handlePipelineFailure(groupId, roleInfoProto); |
| } |
| |
| /** |
| * Notify the Datanode Ratis endpoint of Ratis log failure. |
| * Expected to be invoked from the Container StateMachine |
| * @param groupId the Ratis group/pipeline for which log has failed |
| * @param t exception encountered at the time of the failure |
| * |
| */ |
| @VisibleForTesting |
| public void handleNodeLogFailure(RaftGroupId groupId, Throwable t) { |
| String msg = (t == null) ? "Unspecified failure reported in Ratis log" |
| : t.getMessage(); |
| |
| triggerPipelineClose(groupId, msg, |
| ClosePipelineInfo.Reason.PIPELINE_LOG_FAILED, true); |
| } |
| |
| public long getMinReplicatedIndex(PipelineID pipelineID) throws IOException { |
| Long minIndex; |
| GroupInfoReply reply = getServer() |
| .getGroupInfo(createGroupInfoRequest(pipelineID.getProtobuf())); |
| minIndex = RatisHelper.getMinReplicatedIndex(reply.getCommitInfos()); |
| return minIndex == null ? -1 : minIndex.longValue(); |
| } |
| |
| void notifyGroupRemove(RaftGroupId gid) { |
| raftGids.remove(gid); |
| } |
| |
| void notifyGroupAdd(RaftGroupId gid) { |
| raftGids.add(gid); |
| } |
| } |