blob: a113a0cd140f6792c9d26e2d7c57e3b82c037c39 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license
* agreements. See the NOTICE file distributed with this work for additional
* information regarding
* copyright ownership. The ASF licenses this file to you under the Apache
* License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the
* License. You may obtain a
* copy of the License at
*
* <p>http://www.apache.org/licenses/LICENSE-2.0
*
* <p>Unless required by applicable law or agreed to in writing, software
* distributed under the
* License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.scm.pipeline;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.scm.PipelineChoosePolicy;
import org.apache.hadoop.hdds.scm.PipelineRequestInformation;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ContainerManager;
import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.List;
import java.util.stream.Collectors;
/**
* Class to obtain a writable container for Ratis and Standalone pipelines.
*/
public class WritableRatisContainerProvider
implements WritableContainerProvider<ReplicationConfig> {
private static final Logger LOG = LoggerFactory
.getLogger(WritableRatisContainerProvider.class);
private final ConfigurationSource conf;
private final PipelineManager pipelineManager;
private final PipelineChoosePolicy pipelineChoosePolicy;
private final ContainerManager containerManager;
public WritableRatisContainerProvider(ConfigurationSource conf,
PipelineManager pipelineManager,
ContainerManager containerManager,
PipelineChoosePolicy pipelineChoosePolicy) {
this.conf = conf;
this.pipelineManager = pipelineManager;
this.containerManager = containerManager;
this.pipelineChoosePolicy = pipelineChoosePolicy;
}
@Override
public ContainerInfo getContainer(final long size,
ReplicationConfig repConfig, String owner, ExcludeList excludeList)
throws IOException {
/*
Here is the high level logic.
1. We try to find pipelines in open state.
2. If there are no pipelines in OPEN state, then we try to create one.
3. We allocate a block from the available containers in the selected
pipeline.
TODO : #CLUTIL Support random picking of two containers from the list.
So we can use different kind of policies.
*/
ContainerInfo containerInfo = null;
//TODO we need to continue the refactor to use repConfig everywhere
//in downstream managers.
while (true) {
List<Pipeline> availablePipelines;
Pipeline pipeline;
// Acquire pipeline manager lock, to avoid any updates to pipeline
// while allocate container happens. This is to avoid scenario like
// mentioned in HDDS-5655.
pipelineManager.acquireReadLock();
try {
availablePipelines =
findPipelinesByState(repConfig,
excludeList,
Pipeline.PipelineState.OPEN);
if (availablePipelines.size() != 0) {
containerInfo = selectContainer(availablePipelines, size, owner,
excludeList);
}
if (containerInfo != null) {
return containerInfo;
}
} finally {
pipelineManager.releaseReadLock();
}
if (availablePipelines.size() == 0) {
try {
// TODO: #CLUTIL Remove creation logic when all replication types
// and factors are handled by pipeline creator
pipeline = pipelineManager.createPipeline(repConfig);
// wait until pipeline is ready
pipelineManager.waitPipelineReady(pipeline.getId(), 0);
} catch (SCMException se) {
LOG.warn("Pipeline creation failed for repConfig {} " +
"Datanodes may be used up. Try to see if any pipeline is in " +
"ALLOCATED state, and then will wait for it to be OPEN",
repConfig, se);
List<Pipeline> allocatedPipelines = findPipelinesByState(repConfig,
excludeList,
Pipeline.PipelineState.ALLOCATED);
if (!allocatedPipelines.isEmpty()) {
List<PipelineID> allocatedPipelineIDs =
allocatedPipelines.stream()
.map(p -> p.getId())
.collect(Collectors.toList());
try {
pipelineManager
.waitOnePipelineReady(allocatedPipelineIDs, 0);
} catch (IOException e) {
LOG.warn("Waiting for one of pipelines {} to be OPEN failed. ",
allocatedPipelineIDs, e);
}
}
} catch (IOException e) {
LOG.warn("Pipeline creation failed for repConfig: {}. "
+ "Retrying get pipelines call once.", repConfig, e);
}
pipelineManager.acquireReadLock();
try {
// If Exception occurred or successful creation of pipeline do one
// final try to fetch pipelines.
availablePipelines = findPipelinesByState(repConfig,
excludeList,
Pipeline.PipelineState.OPEN);
if (availablePipelines.size() == 0) {
LOG.info("Could not find available pipeline of repConfig: {} "
+ "even after retrying", repConfig);
break;
}
containerInfo = selectContainer(availablePipelines, size, owner,
excludeList);
if (containerInfo != null) {
return containerInfo;
}
} finally {
pipelineManager.releaseReadLock();
}
}
}
// we have tried all strategies we know and but somehow we are not able
// to get a container for this block. Log that info and return a null.
LOG.error(
"Unable to allocate a block for the size: {}, repConfig: {}",
size, repConfig);
return null;
}
private List<Pipeline> findPipelinesByState(
final ReplicationConfig repConfig,
final ExcludeList excludeList,
final Pipeline.PipelineState pipelineState) {
List<Pipeline> pipelines = pipelineManager.getPipelines(repConfig,
pipelineState, excludeList.getDatanodes(),
excludeList.getPipelineIds());
if (pipelines.size() == 0 && !excludeList.isEmpty()) {
// if no pipelines can be found, try finding pipeline without
// exclusion
pipelines = pipelineManager.getPipelines(repConfig, pipelineState);
}
return pipelines;
}
private ContainerInfo selectContainer(List<Pipeline> availablePipelines,
long size, String owner, ExcludeList excludeList) {
Pipeline pipeline;
ContainerInfo containerInfo;
PipelineRequestInformation pri =
PipelineRequestInformation.Builder.getBuilder().setSize(size)
.build();
pipeline = pipelineChoosePolicy.choosePipeline(
availablePipelines, pri);
// look for OPEN containers that match the criteria.
containerInfo = containerManager.getMatchingContainer(size, owner,
pipeline, excludeList.getContainerIds());
return containerInfo;
}
}