| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.hdds.scm; |
| |
| import java.io.IOException; |
| import java.security.cert.X509Certificate; |
| import java.util.Collection; |
| import java.util.List; |
| import java.util.Objects; |
| import java.util.OptionalLong; |
| import java.util.UUID; |
| import java.util.concurrent.CompletableFuture; |
| import java.util.concurrent.CompletionException; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.ConcurrentMap; |
| import java.util.concurrent.ExecutionException; |
| import java.util.concurrent.TimeoutException; |
| import java.util.concurrent.atomic.AtomicReference; |
| import java.util.stream.Collectors; |
| |
| import org.apache.hadoop.hdds.HddsUtils; |
| import org.apache.hadoop.hdds.conf.ConfigurationSource; |
| import org.apache.hadoop.hdds.protocol.DatanodeDetails; |
| import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; |
| import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto; |
| import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto; |
| import org.apache.hadoop.hdds.protocol.proto.HddsProtos; |
| import org.apache.hadoop.hdds.ratis.ContainerCommandRequestMessage; |
| import org.apache.hadoop.hdds.ratis.RatisHelper; |
| import org.apache.hadoop.hdds.scm.client.HddsClientUtils; |
| import org.apache.hadoop.hdds.scm.pipeline.Pipeline; |
| import org.apache.hadoop.hdds.security.x509.SecurityConfig; |
| import org.apache.hadoop.hdds.tracing.TracingUtil; |
| import org.apache.hadoop.util.Time; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| import com.google.common.base.Preconditions; |
| import com.google.common.base.Supplier; |
| import org.apache.ratis.client.RaftClient; |
| import org.apache.ratis.grpc.GrpcTlsConfig; |
| import org.apache.ratis.proto.RaftProtos; |
| import org.apache.ratis.protocol.GroupMismatchException; |
| import org.apache.ratis.protocol.RaftClientReply; |
| import org.apache.ratis.protocol.RaftException; |
| import org.apache.ratis.retry.RetryPolicy; |
| import org.apache.ratis.rpc.RpcType; |
| import org.apache.ratis.rpc.SupportedRpcType; |
| import org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
/**
 * An implementation of {@link XceiverClientSpi} using Ratis.
 * The underlying RPC mechanism is read from the configuration by the
 * static factory methods.
 */
| public final class XceiverClientRatis extends XceiverClientSpi { |
| public static final Logger LOG = |
| LoggerFactory.getLogger(XceiverClientRatis.class); |
| |
| public static XceiverClientRatis newXceiverClientRatis( |
| org.apache.hadoop.hdds.scm.pipeline.Pipeline pipeline, |
| ConfigurationSource ozoneConf) { |
| return newXceiverClientRatis(pipeline, ozoneConf, null); |
| } |
| |
| public static XceiverClientRatis newXceiverClientRatis( |
| org.apache.hadoop.hdds.scm.pipeline.Pipeline pipeline, |
| ConfigurationSource ozoneConf, X509Certificate caCert) { |
| final String rpcType = ozoneConf |
| .get(ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY, |
| ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT); |
| final RetryPolicy retryPolicy = RatisHelper.createRetryPolicy(ozoneConf); |
| final GrpcTlsConfig tlsConfig = RatisHelper.createTlsClientConfig(new |
| SecurityConfig(ozoneConf), caCert); |
| return new XceiverClientRatis(pipeline, |
| SupportedRpcType.valueOfIgnoreCase(rpcType), |
| retryPolicy, tlsConfig, ozoneConf); |
| } |
| |
| private final Pipeline pipeline; |
| private final RpcType rpcType; |
| private final AtomicReference<RaftClient> client = new AtomicReference<>(); |
| private final RetryPolicy retryPolicy; |
| private final GrpcTlsConfig tlsConfig; |
| private final ConfigurationSource ozoneConfiguration; |
| |
| // Map to track commit index at every server |
| private final ConcurrentHashMap<UUID, Long> commitInfoMap; |
| |
| private XceiverClientMetrics metrics; |
| |
/**
 * Builds a client bound to {@code pipeline}. No Ratis client is created
 * here; that happens in {@link #connect()}.
 *
 * @param pipeline pipeline the client is bound to
 * @param rpcType RPC mechanism used when the Ratis client is created
 * @param retryPolicy retry policy handed to the Ratis client
 * @param tlsConfig TLS configuration handed to the Ratis client
 * @param configuration configuration handed to the Ratis client
 */
private XceiverClientRatis(Pipeline pipeline, RpcType rpcType,
    RetryPolicy retryPolicy, GrpcTlsConfig tlsConfig,
    ConfigurationSource configuration) {
  this.pipeline = pipeline;
  this.rpcType = rpcType;
  this.retryPolicy = retryPolicy;
  this.tlsConfig = tlsConfig;
  this.ozoneConfiguration = configuration;
  this.commitInfoMap = new ConcurrentHashMap<>();
  this.metrics = XceiverClientManager.getXceiverClientMetrics();
}
| |
| private void updateCommitInfosMap( |
| Collection<RaftProtos.CommitInfoProto> commitInfoProtos) { |
| // if the commitInfo map is empty, just update the commit indexes for each |
| // of the servers |
| if (commitInfoMap.isEmpty()) { |
| commitInfoProtos.forEach(proto -> commitInfoMap |
| .put(RatisHelper.toDatanodeId(proto.getServer()), |
| proto.getCommitIndex())); |
| // In case the commit is happening 2 way, just update the commitIndex |
| // for the servers which have been successfully updating the commit |
| // indexes. This is important because getReplicatedMinCommitIndex() |
| // should always return the min commit index out of the nodes which have |
| // been replicating data successfully. |
| } else { |
| commitInfoProtos.forEach(proto -> commitInfoMap |
| .computeIfPresent(RatisHelper.toDatanodeId(proto.getServer()), |
| (address, index) -> { |
| index = proto.getCommitIndex(); |
| return index; |
| })); |
| } |
| } |
| |
| /** |
| * Returns Ratis as pipeline Type. |
| * |
| * @return - Ratis |
| */ |
| @Override |
| public HddsProtos.ReplicationType getPipelineType() { |
| return HddsProtos.ReplicationType.RATIS; |
| } |
| |
| @Override |
| public Pipeline getPipeline() { |
| return pipeline; |
| } |
| |
| @Override |
| public void connect() throws Exception { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Connecting to pipeline:{} datanode:{}", getPipeline().getId(), |
| RatisHelper.toRaftPeerId(pipeline.getFirstNode())); |
| } |
| |
| if (!client.compareAndSet(null, |
| RatisHelper.newRaftClient(rpcType, getPipeline(), retryPolicy, |
| tlsConfig, ozoneConfiguration))) { |
| throw new IllegalStateException("Client is already connected."); |
| } |
| } |
| |
| @Override |
| public void connect(String encodedToken) throws Exception { |
| throw new UnsupportedOperationException("Block tokens are not " + |
| "implemented for Ratis clients."); |
| } |
| |
| @Override |
| public void close() { |
| final RaftClient c = client.getAndSet(null); |
| if (c != null) { |
| closeRaftClient(c); |
| } |
| } |
| |
| private void closeRaftClient(RaftClient raftClient) { |
| try { |
| raftClient.close(); |
| } catch (IOException e) { |
| throw new IllegalStateException(e); |
| } |
| } |
| |
| private RaftClient getClient() { |
| return Objects.requireNonNull(client.get(), "client is null"); |
| } |
| |
| |
| @VisibleForTesting |
| public ConcurrentMap<UUID, Long> getCommitInfoMap() { |
| return commitInfoMap; |
| } |
| |
| private CompletableFuture<RaftClientReply> sendRequestAsync( |
| ContainerCommandRequestProto request) { |
| return TracingUtil.executeInNewSpan( |
| "XceiverClientRatis." + request.getCmdType().name(), |
| (Supplier<CompletableFuture<RaftClientReply>>) () -> { |
| final ContainerCommandRequestMessage message |
| = ContainerCommandRequestMessage.toMessage( |
| request, TracingUtil.exportCurrentSpan()); |
| if (HddsUtils.isReadOnly(request)) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("sendCommandAsync ReadOnly {}", message); |
| } |
| return getClient().sendReadOnlyAsync(message); |
| } else { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("sendCommandAsync {}", message); |
| } |
| return getClient().sendAsync(message); |
| } |
| |
| } |
| |
| ); |
| } |
| |
| // gets the minimum log index replicated to all servers |
| @Override |
| public long getReplicatedMinCommitIndex() { |
| OptionalLong minIndex = |
| commitInfoMap.values().parallelStream().mapToLong(v -> v).min(); |
| return minIndex.isPresent() ? minIndex.getAsLong() : 0; |
| } |
| |
| private void addDatanodetoReply(UUID address, XceiverClientReply reply) { |
| DatanodeDetails.Builder builder = DatanodeDetails.newBuilder(); |
| builder.setUuid(address.toString()); |
| reply.addDatanode(builder.build()); |
| } |
| |
| @Override |
| public XceiverClientReply watchForCommit(long index) |
| throws InterruptedException, ExecutionException, TimeoutException, |
| IOException { |
| long commitIndex = getReplicatedMinCommitIndex(); |
| XceiverClientReply clientReply = new XceiverClientReply(null); |
| if (commitIndex >= index) { |
| // return the min commit index till which the log has been replicated to |
| // all servers |
| clientReply.setLogIndex(commitIndex); |
| return clientReply; |
| } |
| RaftClientReply reply; |
| try { |
| CompletableFuture<RaftClientReply> replyFuture = getClient() |
| .sendWatchAsync(index, RaftProtos.ReplicationLevel.ALL_COMMITTED); |
| replyFuture.get(); |
| } catch (Exception e) { |
| Throwable t = HddsClientUtils.checkForException(e); |
| LOG.warn("3 way commit failed on pipeline {}", pipeline, e); |
| if (t instanceof GroupMismatchException) { |
| throw e; |
| } |
| reply = getClient() |
| .sendWatchAsync(index, RaftProtos.ReplicationLevel.MAJORITY_COMMITTED) |
| .get(); |
| List<RaftProtos.CommitInfoProto> commitInfoProtoList = |
| reply.getCommitInfos().stream() |
| .filter(i -> i.getCommitIndex() < index) |
| .collect(Collectors.toList()); |
| commitInfoProtoList.parallelStream().forEach(proto -> { |
| UUID address = RatisHelper.toDatanodeId(proto.getServer()); |
| addDatanodetoReply(address, clientReply); |
| // since 3 way commit has failed, the updated map from now on will |
| // only store entries for those datanodes which have had successful |
| // replication. |
| commitInfoMap.remove(address); |
| LOG.info( |
| "Could not commit index {} on pipeline {} to all the nodes. " + |
| "Server {} has failed. Committed by majority.", |
| index, pipeline, address); |
| }); |
| } |
| clientReply.setLogIndex(index); |
| return clientReply; |
| } |
| |
| /** |
| * Sends a given command to server gets a waitable future back. |
| * |
| * @param request Request |
| * @return Response to the command |
| */ |
| @Override |
| public XceiverClientReply sendCommandAsync( |
| ContainerCommandRequestProto request) { |
| XceiverClientReply asyncReply = new XceiverClientReply(null); |
| long requestTime = Time.monotonicNowNanos(); |
| CompletableFuture<RaftClientReply> raftClientReply = |
| sendRequestAsync(request); |
| metrics.incrPendingContainerOpsMetrics(request.getCmdType()); |
| CompletableFuture<ContainerCommandResponseProto> containerCommandResponse = |
| raftClientReply.whenComplete((reply, e) -> { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("received reply {} for request: cmdType={} containerID={}" |
| + " pipelineID={} traceID={} exception: {}", reply, |
| request.getCmdType(), request.getContainerID(), |
| request.getPipelineID(), request.getTraceID(), e); |
| } |
| metrics.decrPendingContainerOpsMetrics(request.getCmdType()); |
| metrics.addContainerOpsLatency(request.getCmdType(), |
| Time.monotonicNowNanos() - requestTime); |
| }).thenApply(reply -> { |
| try { |
| if (!reply.isSuccess()) { |
| // in case of raft retry failure, the raft client is |
| // not able to connect to the leader hence the pipeline |
| // can not be used but this instance of RaftClient will close |
| // and refreshed again. In case the client cannot connect to |
| // leader, getClient call will fail. |
| |
| // No need to set the failed Server ID here. Ozone client |
| // will directly exclude this pipeline in next allocate block |
| // to SCM as in this case, it is the raft client which is not |
| // able to connect to leader in the pipeline, though the |
| // pipeline can still be functional. |
| RaftException exception = reply.getException(); |
| Preconditions.checkNotNull(exception, "Raft reply failure but " + |
| "no exception propagated."); |
| throw new CompletionException(exception); |
| } |
| ContainerCommandResponseProto response = |
| ContainerCommandResponseProto |
| .parseFrom(reply.getMessage().getContent()); |
| UUID serverId = RatisHelper.toDatanodeId(reply.getReplierId()); |
| if (response.getResult() == ContainerProtos.Result.SUCCESS) { |
| updateCommitInfosMap(reply.getCommitInfos()); |
| } |
| asyncReply.setLogIndex(reply.getLogIndex()); |
| addDatanodetoReply(serverId, asyncReply); |
| return response; |
| } catch (InvalidProtocolBufferException e) { |
| throw new CompletionException(e); |
| } |
| }); |
| asyncReply.setResponse(containerCommandResponse); |
| return asyncReply; |
| } |
| |
| } |