/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.hdds.scm.container;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Objects;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ContainerInfoProto;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.metrics.SCMContainerManagerMetrics;
import org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaPendingOps;
import org.apache.hadoop.hdds.scm.ha.SCMHAManager;
import org.apache.hadoop.hdds.scm.ha.SequenceIdGenerator;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.ozone.common.statemachine.InvalidStateTransitionException;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.hadoop.hdds.scm.ha.SequenceIdGenerator.CONTAINER_ID;
/**
 * SCM {@link ContainerManager} implementation. It maintains the mapping
 * between containers and pipelines, allocates new containers on open
 * pipelines, and drives container state transitions through a
 * {@link ContainerStateManager}.
 */
public class ContainerManagerImpl implements ContainerManager {
/*
* TODO: Introduce container level locks.
*/
private static final Logger LOG = LoggerFactory.getLogger(
ContainerManagerImpl.class);
// Limit the number of on-going ratis operations.
private final Lock lock;
/**
 * Pipeline manager used to look up open pipelines and to create new ones
 * during container allocation.
 */
private final PipelineManager pipelineManager;
/**
 * Holds the state of all containers and their replicas, persisted through
 * the container store.
 */
private final ContainerStateManager containerStateManager;
private final SCMHAManager haManager;
private final SequenceIdGenerator sequenceIdGen;
// TODO: Revisit this.
// Metrics related to operations should be moved to ProtocolServer
private final SCMContainerManagerMetrics scmContainerManagerMetrics;
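// Target number of open containers per volume; used to derive how many
// open containers each pipeline should hold.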
private final int numContainerPerVolume;
@SuppressWarnings("java:S2245") // no need for secure random
private final Random random = new Random();
// Tracks pending replication and delete operations for container replicas.
// When ContainerManager sees a replica added or removed, it clears the
// matching pending op in case it was created by a replication or delete
// command.
private final ContainerReplicaPendingOps containerReplicaPendingOps;
/**
 * Creates a ContainerManagerImpl backed by the given container store.
 *
 * @param conf configuration used to size per-pipeline container counts
 * @param scmHaManager SCM HA manager providing the Ratis server and the
 *        DB transaction buffer
 * @param sequenceIdGen generator used to assign new container IDs
 * @param pipelineManager pipeline manager used to find or create pipelines
 * @param containerStore persistent table of container metadata
 * @param containerReplicaPendingOps tracker of pending replica add and
 *        delete operations
 * @throws IOException if the container state manager cannot be initialized
 */
public ContainerManagerImpl(
final Configuration conf,
final SCMHAManager scmHaManager,
final SequenceIdGenerator sequenceIdGen,
final PipelineManager pipelineManager,
final Table<ContainerID, ContainerInfo> containerStore,
final ContainerReplicaPendingOps containerReplicaPendingOps)
throws IOException {
// Introduce builder for this class?
this.lock = new ReentrantLock();
this.pipelineManager = pipelineManager;
this.haManager = scmHaManager;
this.sequenceIdGen = sequenceIdGen;
this.containerStateManager = ContainerStateManagerImpl.newBuilder()
.setConfiguration(conf)
.setPipelineManager(pipelineManager)
.setRatisServer(scmHaManager.getRatisServer())
.setContainerStore(containerStore)
.setSCMDBTransactionBuffer(scmHaManager.getDBTransactionBuffer())
.build();
this.numContainerPerVolume = conf
.getInt(ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT,
ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT_DEFAULT);
this.scmContainerManagerMetrics = SCMContainerManagerMetrics.create();
this.containerReplicaPendingOps = containerReplicaPendingOps;
}
@Override
public void reinitialize(Table<ContainerID, ContainerInfo> containerStore)
throws IOException {
lock.lock();
try {
containerStateManager.reinitialize(containerStore);
} catch (IOException ioe) {
LOG.error("Failed to reinitialize containerManager", ioe);
throw ioe;
} finally {
lock.unlock();
}
}
@Override
public ContainerInfo getContainer(final ContainerID id)
throws ContainerNotFoundException {
return Optional.ofNullable(containerStateManager
.getContainer(id))
.orElseThrow(() -> new ContainerNotFoundException("ID " + id));
}
@Override
public List<ContainerInfo> getContainers(final ContainerID startID,
final int count) {
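// Return up to "count" containers with IDs >= startID, in ascending ID order.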
scmContainerManagerMetrics.incNumListContainersOps();
// TODO: Remove the null check, startID should not be null. Fix the unit
// test before removing the check.
final long start = startID == null ? 0 : startID.getId();
final List<ContainerID> containersIds =
new ArrayList<>(containerStateManager.getContainerIDs());
Collections.sort(containersIds);
List<ContainerInfo> containers;
lock.lock();
try {
containers = containersIds.stream()
.filter(id -> id.getId() >= start).limit(count)
.map(containerStateManager::getContainer)
.collect(Collectors.toList());
} finally {
lock.unlock();
}
return containers;
}
@Override
public List<ContainerInfo> getContainers(final LifeCycleState state) {
List<ContainerInfo> containers;
lock.lock();
try {
containers = containerStateManager.getContainerIDs(state).stream()
.map(containerStateManager::getContainer)
.filter(Objects::nonNull).collect(Collectors.toList());
} finally {
lock.unlock();
}
return containers;
}
@Override
public int getContainerStateCount(final LifeCycleState state) {
return containerStateManager.getContainerIDs(state).size();
}
@Override
public ContainerInfo allocateContainer(
final ReplicationConfig replicationConfig, final String owner)
throws IOException, TimeoutException {
// Acquire the pipeline manager read lock to avoid any pipeline updates
// while the container is being allocated. This prevents the scenario
// described in HDDS-5655.
pipelineManager.acquireReadLock();
lock.lock();
List<Pipeline> pipelines;
Pipeline pipeline;
ContainerInfo containerInfo = null;
try {
pipelines = pipelineManager
.getPipelines(replicationConfig, Pipeline.PipelineState.OPEN);
if (!pipelines.isEmpty()) {
pipeline = pipelines.get(random.nextInt(pipelines.size()));
containerInfo = createContainer(pipeline, owner);
}
} finally {
lock.unlock();
pipelineManager.releaseReadLock();
}
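// No open pipeline matched the replication config: create one, wait for it
// to become ready, then retry the allocation under the same locks.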
if (pipelines.isEmpty()) {
try {
pipeline = pipelineManager.createPipeline(replicationConfig);
pipelineManager.waitPipelineReady(pipeline.getId(), 0);
} catch (IOException e) {
scmContainerManagerMetrics.incNumFailureCreateContainers();
throw new IOException("Could not allocate container. Cannot get any" +
" matching pipeline for replicationConfig: " + replicationConfig
+ ", State:PipelineState.OPEN", e);
}
pipelineManager.acquireReadLock();
lock.lock();
try {
pipelines = pipelineManager
.getPipelines(replicationConfig, Pipeline.PipelineState.OPEN);
if (!pipelines.isEmpty()) {
pipeline = pipelines.get(random.nextInt(pipelines.size()));
containerInfo = createContainer(pipeline, owner);
} else {
throw new IOException("Could not allocate container. Cannot get any" +
" matching pipeline for replicationConfig: " + replicationConfig
+ ", State:PipelineState.OPEN");
}
} finally {
lock.unlock();
pipelineManager.releaseReadLock();
}
}
return containerInfo;
}
private ContainerInfo createContainer(Pipeline pipeline, String owner)
throws IOException, TimeoutException {
final ContainerInfo containerInfo = allocateContainer(pipeline, owner);
if (LOG.isTraceEnabled()) {
LOG.trace("New container allocated: {}", containerInfo);
}
return containerInfo;
}
private ContainerInfo allocateContainer(final Pipeline pipeline,
final String owner)
throws IOException, TimeoutException {
final long uniqueId = sequenceIdGen.getNextId(CONTAINER_ID);
Preconditions.checkState(uniqueId > 0,
"Cannot allocate container, negative container id" +
" generated. %s.", uniqueId);
final ContainerID containerID = ContainerID.valueOf(uniqueId);
final ContainerInfoProto.Builder containerInfoBuilder = ContainerInfoProto
.newBuilder()
.setState(LifeCycleState.OPEN)
.setPipelineID(pipeline.getId().getProtobuf())
.setUsedBytes(0)
.setNumberOfKeys(0)
.setStateEnterTime(Time.now())
.setOwner(owner)
.setContainerID(containerID.getId())
.setDeleteTransactionId(0)
.setReplicationType(pipeline.getType());
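// EC containers carry the full EC replication config in the proto; other
// replication types only record the legacy replication factor.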
if (pipeline.getReplicationConfig() instanceof ECReplicationConfig) {
containerInfoBuilder.setEcReplicationConfig(
((ECReplicationConfig) pipeline.getReplicationConfig()).toProto());
} else {
containerInfoBuilder.setReplicationFactor(
ReplicationConfig.getLegacyFactor(pipeline.getReplicationConfig()));
}
containerStateManager.addContainer(containerInfoBuilder.build());
scmContainerManagerMetrics.incNumSuccessfulCreateContainers();
return containerStateManager.getContainer(containerID);
}
@Override
public void updateContainerState(final ContainerID cid,
final LifeCycleEvent event)
throws IOException, InvalidStateTransitionException, TimeoutException {
HddsProtos.ContainerID protoId = cid.getProtobuf();
lock.lock();
try {
if (containerExist(cid)) {
containerStateManager.updateContainerState(protoId, event);
} else {
throwContainerNotFoundException(cid);
}
} finally {
lock.unlock();
}
}
@Override
public Set<ContainerReplica> getContainerReplicas(final ContainerID id)
throws ContainerNotFoundException {
return Optional.ofNullable(containerStateManager
.getContainerReplicas(id))
.orElseThrow(() -> new ContainerNotFoundException("ID " + id));
}
@Override
public void updateContainerReplica(final ContainerID cid,
final ContainerReplica replica)
throws ContainerNotFoundException {
if (containerExist(cid)) {
containerStateManager.updateContainerReplica(cid, replica);
// Clear any pending additions for this replica as we have now seen it.
containerReplicaPendingOps.completeAddReplica(cid,
replica.getDatanodeDetails(), replica.getReplicaIndex());
} else {
throwContainerNotFoundException(cid);
}
}
@Override
public void removeContainerReplica(final ContainerID cid,
final ContainerReplica replica)
throws ContainerNotFoundException, ContainerReplicaNotFoundException {
if (containerExist(cid)) {
containerStateManager.removeContainerReplica(cid, replica);
// Remove any pending delete replication operations for the deleted
// replica.
containerReplicaPendingOps.completeDeleteReplica(cid,
replica.getDatanodeDetails(), replica.getReplicaIndex());
} else {
throwContainerNotFoundException(cid);
}
}
@Override
public void updateDeleteTransactionId(
final Map<ContainerID, Long> deleteTransactionMap) throws IOException {
containerStateManager.updateDeleteTransactionId(deleteTransactionMap);
}
@Override
public ContainerInfo getMatchingContainer(final long size, final String owner,
final Pipeline pipeline, final Set<ContainerID> excludedContainerIDs) {
NavigableSet<ContainerID> containerIDs;
ContainerInfo containerInfo;
try {
synchronized (pipeline.getId()) {
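// If the pipeline has fewer open containers for this owner than the
// target, allocate one more, then try to pick an existing container that
// can hold the requested size; allocate a fresh container if none of the
// remaining candidates match.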
containerIDs = getContainersForOwner(pipeline, owner);
if (containerIDs.size() < getOpenContainerCountPerPipeline(pipeline)) {
allocateContainer(pipeline, owner);
containerIDs = getContainersForOwner(pipeline, owner);
}
containerIDs.removeAll(excludedContainerIDs);
containerInfo = containerStateManager.getMatchingContainer(
size, owner, pipeline.getId(), containerIDs);
if (containerInfo == null) {
containerInfo = allocateContainer(pipeline, owner);
}
return containerInfo;
}
} catch (Exception e) {
LOG.warn("Container allocation failed on pipeline={}", pipeline, e);
return null;
}
}
private int getOpenContainerCountPerPipeline(Pipeline pipeline) {
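// Spread the per-datanode container budget (containers per volume times the
// minimum number of healthy volumes) across the pipelines on that datanode,
// e.g. 3 containers/volume * 10 volumes / 6 pipelines -> ceil(30 / 6) = 5.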
int minContainerCountPerDn = numContainerPerVolume *
pipelineManager.minHealthyVolumeNum(pipeline);
int minPipelineCountPerDn = pipelineManager.minPipelineLimit(pipeline);
return (int) Math.ceil(
((double) minContainerCountPerDn / minPipelineCountPerDn));
}
/**
 * Returns the IDs of the containers in the given pipeline that belong to
 * the specified owner.
 * @param pipeline pipeline to look up containers in
 * @param owner expected container owner
 * @return the matching container IDs
 */
private NavigableSet<ContainerID> getContainersForOwner(
Pipeline pipeline, String owner) throws IOException {
NavigableSet<ContainerID> containerIDs =
pipelineManager.getContainersInPipeline(pipeline.getId());
Iterator<ContainerID> containerIDIterator = containerIDs.iterator();
while (containerIDIterator.hasNext()) {
ContainerID cid = containerIDIterator.next();
try {
if (!getContainer(cid).getOwner().equals(owner)) {
containerIDIterator.remove();
}
} catch (ContainerNotFoundException e) {
LOG.error("Could not find container info for container {}", cid, e);
containerIDIterator.remove();
}
}
return containerIDs;
}
@Override
public void notifyContainerReportProcessing(final boolean isFullReport,
final boolean success) {
if (isFullReport) {
if (success) {
scmContainerManagerMetrics.incNumContainerReportsProcessedSuccessful();
} else {
scmContainerManagerMetrics.incNumContainerReportsProcessedFailed();
}
} else {
if (success) {
scmContainerManagerMetrics.incNumICRReportsProcessedSuccessful();
} else {
scmContainerManagerMetrics.incNumICRReportsProcessedFailed();
}
}
}
@Override
public void deleteContainer(final ContainerID cid)
throws IOException, TimeoutException {
HddsProtos.ContainerID protoId = cid.getProtobuf();
lock.lock();
try {
if (containerExist(cid)) {
containerStateManager.removeContainer(protoId);
scmContainerManagerMetrics.incNumSuccessfulDeleteContainers();
} else {
scmContainerManagerMetrics.incNumFailureDeleteContainers();
throwContainerNotFoundException(cid);
}
} finally {
lock.unlock();
}
}
@Override
public boolean containerExist(final ContainerID id) {
return containerStateManager.contains(id);
}
private void throwContainerNotFoundException(final ContainerID id)
throws ContainerNotFoundException {
throw new ContainerNotFoundException("Container with id " +
id + " not found.");
}
@Override
public void close() throws IOException {
containerStateManager.close();
}
// Remove this after fixing Recon
@VisibleForTesting
public ContainerStateManager getContainerStateManager() {
return containerStateManager;
}
@VisibleForTesting
public SCMHAManager getSCMHAManager() {
return haManager;
}
public Set<ContainerID> getContainerIDs() {
return containerStateManager.getContainerIDs();
}
}