/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.scm;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hdds.HddsUtils;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.security.x509.SecurityConfig;
import io.opentracing.Scope;
import io.opentracing.util.GlobalTracer;
import org.apache.hadoop.util.Time;
import org.apache.ratis.grpc.GrpcTlsConfig;
import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.protocol.RaftRetryFailureException;
import org.apache.ratis.retry.RetryPolicy;
import org.apache.ratis.thirdparty.com.google.protobuf
.InvalidProtocolBufferException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.ContainerCommandRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.ContainerCommandResponseProto;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.tracing.TracingUtil;
import org.apache.ratis.RatisHelper;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.rpc.RpcType;
import org.apache.ratis.rpc.SupportedRpcType;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.ratis.util.TimeDuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.OptionalLong;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
/**
 * An implementation of {@link XceiverClientSpi} using Ratis.
 * The underlying RPC mechanism can be chosen via the constructor.
 */
public final class XceiverClientRatis extends XceiverClientSpi {
static final Logger LOG = LoggerFactory.getLogger(XceiverClientRatis.class);
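  /**
   * Factory method that builds a Ratis-backed client for the given
   * pipeline, reading the RPC type, request timeout, retry policy and TLS
   * settings from the supplied configuration.
   *
   * <p>A minimal usage sketch, assuming the caller already holds a
   * {@code pipeline} and an Ozone {@code ozoneConf}:
   * <pre>{@code
   * XceiverClientRatis client =
   *     XceiverClientRatis.newXceiverClientRatis(pipeline, ozoneConf);
   * client.connect();
   * // ... send container commands ...
   * client.close();
   * }</pre>
   */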
public static XceiverClientRatis newXceiverClientRatis(
      Pipeline pipeline, Configuration ozoneConf) {
final String rpcType = ozoneConf
.get(ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY,
ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT);
final TimeDuration clientRequestTimeout =
RatisHelper.getClientRequestTimeout(ozoneConf);
final int maxOutstandingRequests =
HddsClientUtils.getMaxOutstandingRequests(ozoneConf);
final RetryPolicy retryPolicy = RatisHelper.createRetryPolicy(ozoneConf);
    final GrpcTlsConfig tlsConfig = RatisHelper.createTlsClientConfig(
        new SecurityConfig(ozoneConf));
return new XceiverClientRatis(pipeline,
SupportedRpcType.valueOfIgnoreCase(rpcType), maxOutstandingRequests,
retryPolicy, tlsConfig, clientRequestTimeout);
}
private final Pipeline pipeline;
private final RpcType rpcType;
private final AtomicReference<RaftClient> client = new AtomicReference<>();
private final int maxOutstandingRequests;
private final RetryPolicy retryPolicy;
private final GrpcTlsConfig tlsConfig;
private final TimeDuration clientRequestTimeout;
// Map to track commit index at every server
private final ConcurrentHashMap<UUID, Long> commitInfoMap;
// create a separate RaftClient for watchForCommit API
private RaftClient watchClient;
private XceiverClientMetrics metrics;
/**
* Constructs a client.
*/
private XceiverClientRatis(Pipeline pipeline, RpcType rpcType,
int maxOutStandingChunks, RetryPolicy retryPolicy,
GrpcTlsConfig tlsConfig, TimeDuration timeout) {
super();
this.pipeline = pipeline;
this.rpcType = rpcType;
this.maxOutstandingRequests = maxOutStandingChunks;
this.retryPolicy = retryPolicy;
commitInfoMap = new ConcurrentHashMap<>();
watchClient = null;
this.tlsConfig = tlsConfig;
this.clientRequestTimeout = timeout;
metrics = XceiverClientManager.getXceiverClientMetrics();
}
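  /**
   * Refreshes the per-server commit index map from the commit info carried
   * on a Ratis reply. The first reply seeds an entry for every server;
   * later replies only update servers already present in the map, so nodes
   * dropped after a failed watch stay excluded.
   */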
private void updateCommitInfosMap(
Collection<RaftProtos.CommitInfoProto> commitInfoProtos) {
    // If the commitInfo map is empty, record the commit index reported by
    // each of the servers.
    if (commitInfoMap.isEmpty()) {
      commitInfoProtos.forEach(proto -> commitInfoMap
          .put(RatisHelper.toDatanodeId(proto.getServer()),
              proto.getCommitIndex()));
    } else {
      // If the commit is happening two-way, only update the commit index
      // for servers that have been updating it successfully. This matters
      // because getReplicatedMinCommitIndex() must always return the
      // minimum commit index across the nodes that have been replicating
      // data successfully.
      commitInfoProtos.forEach(proto -> commitInfoMap
          .computeIfPresent(RatisHelper.toDatanodeId(proto.getServer()),
              (address, index) -> proto.getCommitIndex()));
    }
}
  /**
   * Returns RATIS as the pipeline type.
   *
   * @return HddsProtos.ReplicationType.RATIS
   */
@Override
public HddsProtos.ReplicationType getPipelineType() {
return HddsProtos.ReplicationType.RATIS;
}
@Override
public Pipeline getPipeline() {
return pipeline;
}
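  /**
   * Creates the underlying RaftClient for this pipeline. Calling this more
   * than once fails with an IllegalStateException.
   */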
@Override
public void connect() throws Exception {
LOG.debug("Connecting to pipeline:{} datanode:{}", getPipeline().getId(),
RatisHelper.toRaftPeerId(pipeline.getFirstNode()));
    // TODO: XceiverClientRatis should pass the configured value of
    // maxOutstandingRequests so as to bound the number of async requests
    // handled by the RaftClient.
if (!client.compareAndSet(null,
RatisHelper.newRaftClient(rpcType, getPipeline(), retryPolicy,
maxOutstandingRequests, tlsConfig, clientRequestTimeout))) {
throw new IllegalStateException("Client is already connected.");
}
}
@Override
public void connect(String encodedToken) throws Exception {
throw new UnsupportedOperationException("Block tokens are not " +
"implemented for Ratis clients.");
}
@Override
public void close() {
final RaftClient c = client.getAndSet(null);
if (c != null) {
closeRaftClient(c);
}
if (watchClient != null) {
closeRaftClient(watchClient);
}
}
private void closeRaftClient(RaftClient raftClient) {
try {
raftClient.close();
} catch (IOException e) {
throw new IllegalStateException(e);
}
}
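  // Returns the RaftClient created by connect(), failing fast with a
  // NullPointerException if the client is not connected.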
private RaftClient getClient() {
return Objects.requireNonNull(client.get(), "client is null");
}
@VisibleForTesting
public ConcurrentHashMap<UUID, Long> getCommitInfoMap() {
return commitInfoMap;
}
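  /**
   * Builds the final payload inside a tracing span, exporting the current
   * span into the request's traceID, and dispatches it through the Raft
   * client: read-only requests go via sendReadOnlyAsync, everything else
   * via sendAsync.
   */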
private CompletableFuture<RaftClientReply> sendRequestAsync(
ContainerCommandRequestProto request) {
try (Scope scope = GlobalTracer.get()
.buildSpan("XceiverClientRatis." + request.getCmdType().name())
.startActive(true)) {
ContainerCommandRequestProto finalPayload =
ContainerCommandRequestProto.newBuilder(request)
.setTraceID(TracingUtil.exportCurrentSpan())
.build();
boolean isReadOnlyRequest = HddsUtils.isReadOnly(finalPayload);
ByteString byteString = finalPayload.toByteString();
LOG.debug("sendCommandAsync {} {}", isReadOnlyRequest, finalPayload);
return isReadOnlyRequest ?
getClient().sendReadOnlyAsync(() -> byteString) :
getClient().sendAsync(() -> byteString);
}
}
// gets the minimum log index replicated to all servers
@Override
public long getReplicatedMinCommitIndex() {
    OptionalLong minIndex =
        commitInfoMap.values().parallelStream().mapToLong(v -> v).min();
    return minIndex.orElse(0);
}
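  // Records the datanode with the given UUID on the reply so callers can
  // see which servers were involved in (or failed) the request.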
private void addDatanodetoReply(UUID address, XceiverClientReply reply) {
DatanodeDetails.Builder builder = DatanodeDetails.newBuilder();
builder.setUuid(address.toString());
reply.addDatanode(builder.build());
}
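  /**
   * Waits until the given log index is replicated to all servers in the
   * pipeline. If the ALL_COMMITTED watch times out, this falls back to a
   * MAJORITY_COMMITTED watch, records the lagging servers on the returned
   * reply, and drops them from the commit info map.
   */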
@Override
public XceiverClientReply watchForCommit(long index, long timeout)
throws InterruptedException, ExecutionException, TimeoutException,
IOException {
long commitIndex = getReplicatedMinCommitIndex();
XceiverClientReply clientReply = new XceiverClientReply(null);
if (commitIndex >= index) {
// return the min commit index till which the log has been replicated to
// all servers
clientReply.setLogIndex(commitIndex);
return clientReply;
}
LOG.debug("commit index : {} watch timeout : {}", index, timeout);
// create a new RaftClient instance for watch request
if (watchClient == null) {
watchClient =
RatisHelper.newRaftClient(rpcType, getPipeline(), retryPolicy,
maxOutstandingRequests, tlsConfig, clientRequestTimeout);
}
CompletableFuture<RaftClientReply> replyFuture = watchClient
.sendWatchAsync(index, RaftProtos.ReplicationLevel.ALL_COMMITTED);
    try {
      replyFuture.get(timeout, TimeUnit.MILLISECONDS);
    } catch (TimeoutException toe) {
      LOG.warn("3-way commit failed for index {}", index, toe);
closeRaftClient(watchClient);
      // Create a fresh RaftClient so that the next watch request does not
      // get blocked behind the timed-out one.
      // TODO: remove this workaround once watch requests bypassing the
      // sliding window in the Raft client gets fixed.
watchClient =
RatisHelper.newRaftClient(rpcType, getPipeline(), retryPolicy,
maxOutstandingRequests, tlsConfig, clientRequestTimeout);
      RaftClientReply reply = watchClient
          .sendWatchAsync(index, RaftProtos.ReplicationLevel.MAJORITY_COMMITTED)
          .get(timeout, TimeUnit.MILLISECONDS);
List<RaftProtos.CommitInfoProto> commitInfoProtoList =
reply.getCommitInfos().stream()
.filter(i -> i.getCommitIndex() < index)
.collect(Collectors.toList());
commitInfoProtoList.parallelStream().forEach(proto -> {
UUID address = RatisHelper.toDatanodeId(proto.getServer());
addDatanodetoReply(address, clientReply);
        // Since the 3-way commit has failed, from now on the map only
        // keeps entries for those datanodes that have been replicating
        // successfully.
        commitInfoMap.remove(address);
        LOG.info("Could not commit index {} on all the nodes; server {} has"
            + " failed. Committed by majority.", index, address);
});
}
clientReply.setLogIndex(index);
return clientReply;
}
  /**
   * Sends a given command to the server and gets a waitable future back.
   *
   * @param request Request
   * @return Response to the command
   */
@Override
public XceiverClientReply sendCommandAsync(
ContainerCommandRequestProto request) {
XceiverClientReply asyncReply = new XceiverClientReply(null);
long requestTime = Time.monotonicNowNanos();
CompletableFuture<RaftClientReply> raftClientReply =
sendRequestAsync(request);
metrics.incrPendingContainerOpsMetrics(request.getCmdType());
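    // When the Raft reply completes, update the pending-op and latency
    // metrics; then surface any retry failure as a CompletionException,
    // parse the embedded container response, and record the replier and
    // log index on the reply.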
CompletableFuture<ContainerCommandResponseProto> containerCommandResponse =
raftClientReply.whenComplete((reply, e) -> {
LOG.debug("received reply {} for request: cmdType={} containerID={}"
+ " pipelineID={} traceID={} exception: {}", reply,
request.getCmdType(), request.getContainerID(),
request.getPipelineID(), request.getTraceID(), e);
metrics.decrPendingContainerOpsMetrics(request.getCmdType());
metrics.addContainerOpsLatency(request.getCmdType(),
Time.monotonicNowNanos() - requestTime);
}).thenApply(reply -> {
try {
            // Handle a RaftRetryFailureException attached to the reply.
            RaftRetryFailureException raftRetryFailureException =
                reply.getRetryFailureException();
            if (raftRetryFailureException != null) {
              // On a retry failure the Raft client was not able to connect
              // to the leader, so the pipeline cannot be used through this
              // client. This RaftClient instance will be closed and
              // refreshed; if the client still cannot connect to the
              // leader, the getClient() call will fail.
              // There is no need to record the failed server ID here: the
              // Ozone client will directly exclude this pipeline in its
              // next allocateBlock call to SCM, since it is the Raft
              // client that cannot reach the leader, even though the
              // pipeline itself may still be functional.
throw new CompletionException(raftRetryFailureException);
}
ContainerCommandResponseProto response =
ContainerCommandResponseProto
.parseFrom(reply.getMessage().getContent());
UUID serverId = RatisHelper.toDatanodeId(reply.getReplierId());
if (response.getResult() == ContainerProtos.Result.SUCCESS) {
updateCommitInfosMap(reply.getCommitInfos());
}
asyncReply.setLogIndex(reply.getLogIndex());
addDatanodetoReply(serverId, asyncReply);
return response;
} catch (InvalidProtocolBufferException e) {
throw new CompletionException(e);
}
});
asyncReply.setResponse(containerCommandResponse);
return asyncReply;
}
}