/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.scm.pipeline;

import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.conf.StorageUnit;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.PlacementPolicy;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.ContainerReplica;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.node.NodeStatus;
import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
* Class to create pipelines for EC containers.
*/
public class ECPipelineProvider extends PipelineProvider<ECReplicationConfig> {
private static final Logger LOG =
LoggerFactory.getLogger(ECPipelineProvider.class);
// TODO - EC Placement Policy. Standard Network Aware topology will not work
// for EC as it stands. We may want an "as many racks as possible"
// policy. HDDS-5326.
private final ConfigurationSource conf;
private final PlacementPolicy placementPolicy;
private final long containerSizeBytes;
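
  /**
   * Create a provider that builds EC pipelines using the supplied node
   * manager, pipeline state manager and placement policy.
   */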
public ECPipelineProvider(NodeManager nodeManager,
PipelineStateManager stateManager,
ConfigurationSource conf,
PlacementPolicy placementPolicy) {
super(nodeManager, stateManager);
this.conf = conf;
this.placementPolicy = placementPolicy;
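    // Cache the configured container size; it is passed to the placement
    // policy as the per-node data size requirement when choosing datanodes.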
this.containerSizeBytes = (long) this.conf
.getStorageSize(ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE,
ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES);
}
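
  /**
   * Create a new EC pipeline with no excluded or favored nodes.
   */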
@Override
public synchronized Pipeline create(ECReplicationConfig replicationConfig)
throws IOException {
return create(replicationConfig, Collections.emptyList(),
Collections.emptyList());
}
@Override
protected Pipeline create(ECReplicationConfig replicationConfig,
List<DatanodeDetails> excludedNodes, List<DatanodeDetails> favoredNodes)
throws IOException {
List<DatanodeDetails> dns = placementPolicy
.chooseDatanodes(excludedNodes, favoredNodes,
replicationConfig.getRequiredNodes(), 0, this.containerSizeBytes);
return create(replicationConfig, dns);
}
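
  /**
   * Create a new EC pipeline on the given datanodes.
   */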
@Override
protected Pipeline create(ECReplicationConfig replicationConfig,
List<DatanodeDetails> nodes) {
Map<DatanodeDetails, Integer> dnIndexes = new HashMap<>();
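    // EC replica indexes are 1-based; assign them to the nodes in list order.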
int ecIndex = 1;
for (DatanodeDetails dn : nodes) {
dnIndexes.put(dn, ecIndex);
ecIndex++;
}
return createPipelineInternal(replicationConfig, nodes, dnIndexes);
}
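
  /**
   * Build a pipeline for reading an existing EC container from the given
   * replicas, using each replica's stored replica index.
   */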
@Override
public Pipeline createForRead(
ECReplicationConfig replicationConfig,
Set<ContainerReplica> replicas) {
Map<DatanodeDetails, Integer> map = new HashMap<>();
List<DatanodeDetails> dns = new ArrayList<>(replicas.size());
Map<DatanodeDetails, NodeStatus> nodeStatusMap = new HashMap<>();
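    // Only include replicas whose datanode is known to SCM and not dead.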
for (ContainerReplica r : replicas) {
DatanodeDetails dn = r.getDatanodeDetails();
try {
NodeStatus nodeStatus = getNodeManager().getNodeStatus(dn);
if (!nodeStatus.isDead()) {
map.put(dn, r.getReplicaIndex());
dns.add(dn);
nodeStatusMap.put(dn, nodeStatus);
}
} catch (NodeNotFoundException e) {
        LOG.error("Datanode {} not found when creating EC read pipeline", dn, e);
}
}
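    // Sort the datanodes by NodeStatus (its natural ordering) so the read
    // pipeline lists nodes in a well-defined, status-based order.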
dns.sort(Comparator.comparing(nodeStatusMap::get));
return createPipelineInternal(replicationConfig, dns, map);
}
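
  /**
   * Assemble a Pipeline in the ALLOCATED state from the given datanodes and
   * their EC replica indexes.
   */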
private Pipeline createPipelineInternal(ECReplicationConfig repConfig,
List<DatanodeDetails> dns, Map<DatanodeDetails, Integer> indexes) {
return Pipeline.newBuilder()
.setId(PipelineID.randomId())
.setState(Pipeline.PipelineState.ALLOCATED)
.setReplicationConfig(repConfig)
.setNodes(dns)
.setReplicaIndexes(indexes)
.build();
  }

  @Override
  protected void close(Pipeline pipeline) throws IOException {
    // Nothing to do when an EC pipeline is closed.
  }

  @Override
  protected void shutdown() {
    // Nothing to clean up when the provider is shut down.
  }
}