| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.hdds.scm; |
| |
| import com.google.common.base.Preconditions; |
| import com.google.protobuf.InvalidProtocolBufferException; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.hdds.scm.client.HddsClientUtils; |
| import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; |
| import org.apache.hadoop.hdds.protocol.DatanodeDetails; |
| import org.apache.hadoop.hdds.protocol.proto.ContainerProtos |
| .ContainerCommandRequestProto; |
| import org.apache.hadoop.hdds.protocol.proto.ContainerProtos |
| .ContainerCommandResponseProto; |
| import org.apache.hadoop.hdds.protocol.proto.HddsProtos; |
| import org.apache.ratis.RatisHelper; |
| import org.apache.ratis.client.RaftClient; |
| import org.apache.ratis.protocol.RaftClientReply; |
| import org.apache.ratis.protocol.RaftGroup; |
| import org.apache.ratis.protocol.RaftPeer; |
| import org.apache.ratis.rpc.RpcType; |
| import org.apache.ratis.rpc.SupportedRpcType; |
| import org.apache.ratis.shaded.com.google.protobuf.ByteString; |
| import org.apache.ratis.shaded.com.google.protobuf.ShadedProtoUtil; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.io.IOException; |
| import java.util.List; |
| import java.util.Objects; |
| import java.util.concurrent.CompletableFuture; |
| import java.util.concurrent.CompletionException; |
| import java.util.concurrent.ExecutionException; |
| import java.util.concurrent.atomic.AtomicReference; |
| |
| /** |
| * An abstract implementation of {@link XceiverClientSpi} using Ratis. |
| * The underlying RPC mechanism can be chosen via the constructor. |
| */ |
| public final class XceiverClientRatis extends XceiverClientSpi { |
| static final Logger LOG = LoggerFactory.getLogger(XceiverClientRatis.class); |
| |
| public static XceiverClientRatis newXceiverClientRatis( |
| Pipeline pipeline, Configuration ozoneConf) { |
| final String rpcType = ozoneConf.get( |
| ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY, |
| ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT); |
| final int maxOutstandingRequests = |
| HddsClientUtils.getMaxOutstandingRequests(ozoneConf); |
| return new XceiverClientRatis(pipeline, |
| SupportedRpcType.valueOfIgnoreCase(rpcType), maxOutstandingRequests); |
| } |
| |
| private final Pipeline pipeline; |
| private final RpcType rpcType; |
| private final AtomicReference<RaftClient> client = new AtomicReference<>(); |
| private final int maxOutstandingRequests; |
| |
| /** |
| * Constructs a client. |
| */ |
| private XceiverClientRatis(Pipeline pipeline, RpcType rpcType, |
| int maxOutStandingChunks) { |
| super(); |
| this.pipeline = pipeline; |
| this.rpcType = rpcType; |
| this.maxOutstandingRequests = maxOutStandingChunks; |
| } |
| |
| /** |
| * {@inheritDoc} |
| */ |
| public void createPipeline(String clusterId, List<DatanodeDetails> datanodes) |
| throws IOException { |
| RaftGroup group = RatisHelper.newRaftGroup(datanodes); |
| LOG.debug("initializing pipeline:{} with nodes:{}", clusterId, |
| group.getPeers()); |
| reinitialize(datanodes, group); |
| } |
| |
| /** |
| * Returns Ratis as pipeline Type. |
| * |
| * @return - Ratis |
| */ |
| @Override |
| public HddsProtos.ReplicationType getPipelineType() { |
| return HddsProtos.ReplicationType.RATIS; |
| } |
| |
| private void reinitialize(List<DatanodeDetails> datanodes, RaftGroup group) |
| throws IOException { |
| if (datanodes.isEmpty()) { |
| return; |
| } |
| |
| IOException exception = null; |
| for (DatanodeDetails d : datanodes) { |
| try { |
| reinitialize(d, group); |
| } catch (IOException ioe) { |
| if (exception == null) { |
| exception = new IOException( |
| "Failed to reinitialize some of the RaftPeer(s)", ioe); |
| } else { |
| exception.addSuppressed(ioe); |
| } |
| } |
| } |
| if (exception != null) { |
| throw exception; |
| } |
| } |
| |
| /** |
| * Adds a new peers to the Ratis Ring. |
| * |
| * @param datanode - new datanode |
| * @param group - Raft group |
| * @throws IOException - on Failure. |
| */ |
| private void reinitialize(DatanodeDetails datanode, RaftGroup group) |
| throws IOException { |
| final RaftPeer p = RatisHelper.toRaftPeer(datanode); |
| try (RaftClient client = RatisHelper.newRaftClient(rpcType, p)) { |
| client.reinitialize(group, p.getId()); |
| } catch (IOException ioe) { |
| LOG.error("Failed to reinitialize RaftPeer:{} datanode: {} ", |
| p, datanode, ioe); |
| throw new IOException("Failed to reinitialize RaftPeer " + p |
| + "(datanode=" + datanode + ")", ioe); |
| } |
| } |
| |
| @Override |
| public Pipeline getPipeline() { |
| return pipeline; |
| } |
| |
| @Override |
| public void connect() throws Exception { |
| LOG.debug("Connecting to pipeline:{} leader:{}", |
| getPipeline().getPipelineName(), |
| RatisHelper.toRaftPeerId(pipeline.getLeader())); |
| // TODO : XceiverClient ratis should pass the config value of |
| // maxOutstandingRequests so as to set the upper bound on max no of async |
| // requests to be handled by raft client |
| if (!client.compareAndSet(null, |
| RatisHelper.newRaftClient(rpcType, getPipeline()))) { |
| throw new IllegalStateException("Client is already connected."); |
| } |
| } |
| |
| @Override |
| public void close() { |
| final RaftClient c = client.getAndSet(null); |
| if (c != null) { |
| try { |
| c.close(); |
| } catch (IOException e) { |
| throw new IllegalStateException(e); |
| } |
| } |
| } |
| |
| private RaftClient getClient() { |
| return Objects.requireNonNull(client.get(), "client is null"); |
| } |
| |
| private boolean isReadOnly(ContainerCommandRequestProto proto) { |
| switch (proto.getCmdType()) { |
| case ReadContainer: |
| case ReadChunk: |
| case ListKey: |
| case GetKey: |
| case GetSmallFile: |
| case ListContainer: |
| case ListChunk: |
| return true; |
| case CloseContainer: |
| case WriteChunk: |
| case UpdateContainer: |
| case CompactChunk: |
| case CreateContainer: |
| case DeleteChunk: |
| case DeleteContainer: |
| case DeleteKey: |
| case PutKey: |
| case PutSmallFile: |
| default: |
| return false; |
| } |
| } |
| |
| private RaftClientReply sendRequest(ContainerCommandRequestProto request) |
| throws IOException { |
| boolean isReadOnlyRequest = isReadOnly(request); |
| ByteString byteString = |
| ShadedProtoUtil.asShadedByteString(request.toByteArray()); |
| LOG.debug("sendCommand {} {}", isReadOnlyRequest, request); |
| final RaftClientReply reply = isReadOnlyRequest ? |
| getClient().sendReadOnly(() -> byteString) : |
| getClient().send(() -> byteString); |
| LOG.debug("reply {} {}", isReadOnlyRequest, reply); |
| return reply; |
| } |
| |
| private CompletableFuture<RaftClientReply> sendRequestAsync( |
| ContainerCommandRequestProto request) throws IOException { |
| boolean isReadOnlyRequest = isReadOnly(request); |
| ByteString byteString = |
| ShadedProtoUtil.asShadedByteString(request.toByteArray()); |
| LOG.debug("sendCommandAsync {} {}", isReadOnlyRequest, request); |
| return isReadOnlyRequest ? getClient().sendReadOnlyAsync(() -> byteString) : |
| getClient().sendAsync(() -> byteString); |
| } |
| |
| @Override |
| public ContainerCommandResponseProto sendCommand( |
| ContainerCommandRequestProto request) throws IOException { |
| final RaftClientReply reply = sendRequest(request); |
| Preconditions.checkState(reply.isSuccess()); |
| return ContainerCommandResponseProto.parseFrom( |
| ShadedProtoUtil.asByteString(reply.getMessage().getContent())); |
| } |
| |
| /** |
| * Sends a given command to server gets a waitable future back. |
| * |
| * @param request Request |
| * @return Response to the command |
| * @throws IOException |
| */ |
| @Override |
| public CompletableFuture<ContainerCommandResponseProto> sendCommandAsync( |
| ContainerCommandRequestProto request) |
| throws IOException, ExecutionException, InterruptedException { |
| return sendRequestAsync(request).whenComplete((reply, e) -> |
| LOG.debug("received reply {} for request: {} exception: {}", request, |
| reply, e)) |
| .thenApply(reply -> { |
| try { |
| return ContainerCommandResponseProto.parseFrom( |
| ShadedProtoUtil.asByteString(reply.getMessage().getContent())); |
| } catch (InvalidProtocolBufferException e) { |
| throw new CompletionException(e); |
| } |
| }); |
| } |
| } |