/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.scm.pipeline;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
import org.apache.hadoop.hdds.scm.PlacementPolicy;
import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementRandom;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline.PipelineState;
import org.apache.hadoop.io.MultipleIOException;
import org.apache.hadoop.hdds.ratis.RatisHelper;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.grpc.GrpcTlsConfig;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftGroup;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.retry.RetryPolicy;
import org.apache.ratis.rpc.SupportedRpcType;
import org.apache.ratis.util.TimeDuration;
import org.apache.ratis.util.function.CheckedBiConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
* Implements the API for creating Ratis pipelines.
*/
public class RatisPipelineProvider implements PipelineProvider {
private static final Logger LOG =
LoggerFactory.getLogger(RatisPipelineProvider.class);
private final NodeManager nodeManager;
private final PipelineStateManager stateManager;
private final Configuration conf;
// Set parallelism to 3, as Ratis currently creates 1 and 3 node pipelines.
private final int parallelismForPool = 3;
private final ForkJoinPool.ForkJoinWorkerThreadFactory factory =
(pool -> {
final ForkJoinWorkerThread worker = ForkJoinPool.
defaultForkJoinWorkerThreadFactory.newThread(pool);
worker.setName("RATISCREATEPIPELINE" + worker.getPoolIndex());
return worker;
});
private final ForkJoinPool forkJoinPool = new ForkJoinPool(
parallelismForPool, factory, null, false);
private final GrpcTlsConfig tlsConfig;
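/**
* Creates a RatisPipelineProvider.
*
* @param nodeManager - SCM node manager used to pick healthy datanodes.
* @param stateManager - pipeline state manager tracking existing pipelines.
* @param conf - configuration.
* @param tlsConfig - TLS configuration for the Ratis gRPC clients.
*/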
RatisPipelineProvider(NodeManager nodeManager,
PipelineStateManager stateManager, Configuration conf,
GrpcTlsConfig tlsConfig) {
this.nodeManager = nodeManager;
this.stateManager = stateManager;
this.conf = conf;
this.tlsConfig = tlsConfig;
}
/**
* Creates the pluggable container placement policy implementation instance.
* The implementation class is read from
* {@link ScmConfigKeys#OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY}, defaulting
* to {@link SCMContainerPlacementRandom}, and must expose a
* (NodeManager, Configuration) constructor.
*
* @param nodeManager - SCM node manager.
* @param conf - configuration.
* @return SCM container placement policy implementation instance.
*/
@SuppressWarnings("unchecked")
// TODO: should we rename PlacementPolicy to PipelinePlacementPolicy?
private static PlacementPolicy createContainerPlacementPolicy(
final NodeManager nodeManager, final Configuration conf) {
Class<? extends PlacementPolicy> implClass =
(Class<? extends PlacementPolicy>) conf.getClass(
ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY,
SCMContainerPlacementRandom.class);
try {
Constructor<? extends PlacementPolicy> ctor =
implClass.getDeclaredConstructor(NodeManager.class,
Configuration.class);
return ctor.newInstance(nodeManager, conf);
} catch (RuntimeException e) {
throw e;
} catch (InvocationTargetException e) {
throw new RuntimeException(implClass.getName()
+ " could not be constructed.", e.getCause());
} catch (Exception e) {
throw new IllegalArgumentException("Unable to load PlacementPolicy", e);
}
}
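/**
* Creates and initializes a new Ratis pipeline of the given factor, using
* healthy datanodes that are not already part of an OPEN, DORMANT or
* ALLOCATED Ratis pipeline of the same factor.
*
* @param factor - replication factor of the pipeline.
* @return the newly created pipeline.
* @throws InsufficientDatanodesException if fewer unused healthy datanodes
*         are available than the factor requires.
* @throws IOException if Raft group creation fails on the datanodes.
*/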
@Override
public Pipeline create(ReplicationFactor factor) throws IOException {
// Get set of datanodes already used for ratis pipeline
Set<DatanodeDetails> dnsUsed = new HashSet<>();
stateManager.getPipelines(ReplicationType.RATIS, factor).stream().filter(
p -> p.getPipelineState().equals(PipelineState.OPEN) ||
p.getPipelineState().equals(PipelineState.DORMANT) ||
p.getPipelineState().equals(PipelineState.ALLOCATED))
.forEach(p -> dnsUsed.addAll(p.getNodes()));
// Get list of healthy nodes
List<DatanodeDetails> dns =
nodeManager.getNodes(NodeState.HEALTHY)
.parallelStream()
.filter(dn -> !dnsUsed.contains(dn))
.limit(factor.getNumber())
.collect(Collectors.toList());
if (dns.size() < factor.getNumber()) {
String e = String
.format("Cannot create pipeline of factor %d using %d nodes.",
factor.getNumber(), dns.size());
throw new InsufficientDatanodesException(e);
}
Pipeline pipeline = Pipeline.newBuilder()
.setId(PipelineID.randomId())
.setState(PipelineState.OPEN)
.setType(ReplicationType.RATIS)
.setFactor(factor)
.setNodes(dns)
.build();
initializePipeline(pipeline);
return pipeline;
}
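/**
* Builds a pipeline of the given factor from an explicit list of datanodes.
* Unlike {@link #create(ReplicationFactor)}, this does not initialize the
* Raft group on the datanodes.
*
* @param factor - replication factor of the pipeline.
* @param nodes - datanodes that form the pipeline.
* @return the constructed pipeline.
*/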
@Override
public Pipeline create(ReplicationFactor factor,
List<DatanodeDetails> nodes) {
return Pipeline.newBuilder()
.setId(PipelineID.randomId())
.setState(PipelineState.OPEN)
.setType(ReplicationType.RATIS)
.setFactor(factor)
.setNodes(nodes)
.build();
}
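/**
* Shuts down the provider's ForkJoinPool, waiting up to 60 seconds for
* in-flight pipeline creation tasks to terminate.
*/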
@Override
public void shutdown() {
forkJoinPool.shutdownNow();
try {
forkJoinPool.awaitTermination(60, TimeUnit.SECONDS);
} catch (Exception e) {
LOG.error("Unexpected exception occurred during shutdown of " +
"RatisPipelineProvider", e);
}
}
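/**
* Initializes the pipeline by creating the corresponding Raft group on
* every datanode in the pipeline.
*
* @param pipeline - pipeline to initialize.
* @throws IOException if group creation fails on any datanode.
*/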
protected void initializePipeline(Pipeline pipeline) throws IOException {
final RaftGroup group = RatisHelper.newRaftGroup(pipeline);
LOG.debug("Creating pipeline {} with group {}", pipeline.getId(), group);
callRatisRpc(pipeline.getNodes(),
(raftClient, peer) -> {
RaftClientReply reply = raftClient.groupAdd(group, peer.getId());
if (reply == null || !reply.isSuccess()) {
String msg = "Pipeline initialization failed for pipeline: "
+ pipeline.getId() + " node: " + peer.getId();
LOG.error(msg);
throw new IOException(msg);
}
});
}
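/**
* Invokes the given RPC against every datanode in parallel, creating a
* short-lived RaftClient per datanode. Per-datanode failures are collected
* and rethrown together as a single IOException.
*
* @param datanodes - datanodes to contact.
* @param rpc - the call to perform with each (client, peer) pair.
* @throws IOException if the task is rejected or interrupted, or if any
*         per-datanode RPC fails.
*/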
private void callRatisRpc(List<DatanodeDetails> datanodes,
CheckedBiConsumer<RaftClient, RaftPeer, IOException> rpc)
throws IOException {
if (datanodes.isEmpty()) {
return;
}
final String rpcType = conf
.get(ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY,
ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT);
final RetryPolicy retryPolicy = RatisHelper.createRetryPolicy(conf);
final List<IOException> exceptions =
Collections.synchronizedList(new ArrayList<>());
final int maxOutstandingRequests =
HddsClientUtils.getMaxOutstandingRequests(conf);
final TimeDuration requestTimeout =
RatisHelper.getClientRequestTimeout(conf);
try {
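// Run the per-datanode RPCs inside the dedicated pool so the parallel
// stream executes on its workers rather than on the common ForkJoinPool.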
forkJoinPool.submit(() -> {
datanodes.parallelStream().forEach(d -> {
final RaftPeer p = RatisHelper.toRaftPeer(d);
try (RaftClient client = RatisHelper
.newRaftClient(SupportedRpcType.valueOfIgnoreCase(rpcType), p,
retryPolicy, maxOutstandingRequests, tlsConfig,
requestTimeout)) {
rpc.accept(client, p);
} catch (IOException ioe) {
String errMsg =
"Failed to invoke Ratis rpc " + rpc + " for " + d.getUuid();
LOG.error(errMsg, ioe);
exceptions.add(new IOException(errMsg, ioe));
}
});
}).get();
} catch (ExecutionException | RejectedExecutionException ex) {
LOG.error(ex.getClass().getName() + " exception occurred during " +
"createPipeline", ex);
throw new IOException(ex.getClass().getName() + " exception occurred " +
"during createPipeline", ex);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
throw new IOException("Interrupted during createPipeline", ex);
}
if (!exceptions.isEmpty()) {
throw MultipleIOException.createIOException(exceptions);
}
}
}