| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with this |
| * work for additional information regarding copyright ownership. The ASF |
| * licenses this file to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * <p> |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * <p> |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| * License for the specific language governing permissions and limitations under |
| * the License. |
| */ |
| |
| package org.apache.hadoop.hdds.scm.container; |
| |
| import java.io.IOException; |
| import java.lang.reflect.Proxy; |
| import java.util.EnumMap; |
| import java.util.HashSet; |
| import java.util.Map; |
| import java.util.NavigableSet; |
| import java.util.Set; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.locks.ReadWriteLock; |
| import java.util.concurrent.locks.ReentrantReadWriteLock; |
| |
| import com.google.common.base.Preconditions; |
| |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.conf.StorageUnit; |
| import org.apache.hadoop.hdds.protocol.proto.HddsProtos; |
| import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ContainerInfoProto; |
| import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent; |
| import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState; |
| import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol.RequestType; |
| import org.apache.hadoop.hdds.scm.ScmConfigKeys; |
| import org.apache.hadoop.hdds.scm.container.states.ContainerState; |
| import org.apache.hadoop.hdds.scm.container.states.ContainerStateMap; |
| import org.apache.hadoop.hdds.scm.ha.CheckedConsumer; |
| import org.apache.hadoop.hdds.scm.ha.ExecutionUtil; |
| import org.apache.hadoop.hdds.scm.ha.SCMHAInvocationHandler; |
| import org.apache.hadoop.hdds.scm.ha.SCMRatisServer; |
| import org.apache.hadoop.hdds.scm.metadata.DBTransactionBuffer; |
| import org.apache.hadoop.hdds.scm.pipeline.PipelineID; |
| import org.apache.hadoop.hdds.scm.pipeline.PipelineManager; |
| import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException; |
| import org.apache.hadoop.hdds.utils.db.Table; |
| import org.apache.hadoop.hdds.utils.db.Table.KeyValue; |
| import org.apache.hadoop.hdds.utils.db.TableIterator; |
| import org.apache.hadoop.ozone.common.statemachine.InvalidStateTransitionException; |
| import org.apache.hadoop.ozone.common.statemachine.StateMachine; |
| import org.apache.hadoop.ozone.lock.LockManager; |
| import org.apache.hadoop.hdds.conf.ConfigurationSource; |
| import org.apache.hadoop.hdds.conf.OzoneConfiguration; |
| |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent.FINALIZE; |
| import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent.QUASI_CLOSE; |
| import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent.CLOSE; |
| import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent.FORCE_CLOSE; |
| import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent.DELETE; |
| import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent.CLEANUP; |
| |
| import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState.OPEN; |
| import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState.CLOSING; |
| import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState.QUASI_CLOSED; |
| import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState.CLOSED; |
| import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState.DELETING; |
| import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState.DELETED; |
| |
| import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_MANAGER_FAIR_LOCK; |
| import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_MANAGER_FAIR_LOCK_DEFAULT; |
| |
| /** |
| * Default implementation of ContainerStateManager. This implementation |
| * holds the Container States in-memory which is backed by a persistent store. |
| * The persistent store is always kept in sync with the in-memory state changes. |
| * |
| * This class is NOT thread safe. All the calls are idempotent. |
| */ |
public final class ContainerStateManagerImpl
    implements ContainerStateManager {

  // Configuration source derived from the supplied Configuration; used for
  // constructing the per-container lock manager.
  private ConfigurationSource confSrc;

  // Per-container lock: guards reads and updates of an individual
  // container's state and replica set.
  private final LockManager<ContainerID> lockManager;

  /**
   * Logger instance of ContainerStateManagerImpl.
   */
  private static final Logger LOG = LoggerFactory.getLogger(
      ContainerStateManagerImpl.class);

  /**
   * Configured container size.
   */
  private final long containerSize;

  /**
   * In-memory representation of Container States.
   */
  private ContainerStateMap containers;

  /**
   * Persistent store for Container States.
   */
  private Table<ContainerID, ContainerInfo> containerStore;

  // Buffer through which mutations of containerStore are staged, so they
  // are applied as part of the enclosing DB transaction.
  private final DBTransactionBuffer transactionBuffer;

  /**
   * PipelineManager instance.
   */
  private final PipelineManager pipelineManager;

  /**
   * Container lifecycle state machine.
   */
  private final StateMachine<LifeCycleState, LifeCycleEvent> stateMachine;

  /**
   * We use the containers in round-robin fashion for operations like block
   * allocation. This map is used for remembering the last used container.
   */
  private ConcurrentHashMap<ContainerState, ContainerID> lastUsedMap;

  // Side effects executed after a successful lifecycle transition, keyed by
  // the event that triggered the transition (see updateContainerState).
  private final Map<LifeCycleEvent, CheckedConsumer<ContainerInfo, IOException>>
      containerStateChangeActions;

  // Protect containers and containerStore against the potential
  // contentions between RaftServer and ContainerManager.
  private final ReadWriteLock lock = new ReentrantReadWriteLock(true);
| |
| /** |
| * constructs ContainerStateManagerImpl instance and loads the containers |
| * form the persistent storage. |
| * |
| * @param conf the Configuration |
| * @param pipelineManager the {@link PipelineManager} instance |
| * @param containerStore the persistent storage |
| * @throws IOException in case of error while loading the containers |
| */ |
| private ContainerStateManagerImpl(final Configuration conf, |
| final PipelineManager pipelineManager, |
| final Table<ContainerID, ContainerInfo> containerStore, |
| final DBTransactionBuffer buffer) |
| throws IOException { |
| this.confSrc = OzoneConfiguration.of(conf); |
| this.pipelineManager = pipelineManager; |
| this.containerStore = containerStore; |
| this.stateMachine = newStateMachine(); |
| this.containerSize = getConfiguredContainerSize(conf); |
| this.containers = new ContainerStateMap(); |
| this.lastUsedMap = new ConcurrentHashMap<>(); |
| this.containerStateChangeActions = getContainerStateChangeActions(); |
| this.transactionBuffer = buffer; |
| boolean fair = conf.getBoolean(OZONE_MANAGER_FAIR_LOCK, |
| OZONE_MANAGER_FAIR_LOCK_DEFAULT); |
| this.lockManager = |
| new LockManager<>(confSrc, fair); |
| initialize(); |
| } |
| |
| /** |
| * Creates and initializes a new Container Lifecycle StateMachine. |
| * |
| * @return the Container Lifecycle StateMachine |
| */ |
| private StateMachine<LifeCycleState, LifeCycleEvent> newStateMachine() { |
| |
| final Set<LifeCycleState> finalStates = new HashSet<>(); |
| |
| // These are the steady states of a container. |
| finalStates.add(CLOSED); |
| finalStates.add(DELETED); |
| |
| final StateMachine<LifeCycleState, LifeCycleEvent> containerLifecycleSM = |
| new StateMachine<>(OPEN, finalStates); |
| |
| containerLifecycleSM.addTransition(OPEN, CLOSING, FINALIZE); |
| containerLifecycleSM.addTransition(CLOSING, QUASI_CLOSED, QUASI_CLOSE); |
| containerLifecycleSM.addTransition(CLOSING, CLOSED, CLOSE); |
| containerLifecycleSM.addTransition(QUASI_CLOSED, CLOSED, FORCE_CLOSE); |
| containerLifecycleSM.addTransition(CLOSED, DELETING, DELETE); |
| containerLifecycleSM.addTransition(DELETING, DELETED, CLEANUP); |
| |
| /* The following set of transitions are to make state machine |
| * transition idempotent. |
| */ |
| makeStateTransitionIdempotent(containerLifecycleSM, FINALIZE, |
| CLOSING, QUASI_CLOSED, CLOSED, DELETING, DELETED); |
| makeStateTransitionIdempotent(containerLifecycleSM, QUASI_CLOSE, |
| QUASI_CLOSED, CLOSED, DELETING, DELETED); |
| makeStateTransitionIdempotent(containerLifecycleSM, CLOSE, |
| CLOSED, DELETING, DELETED); |
| makeStateTransitionIdempotent(containerLifecycleSM, FORCE_CLOSE, |
| CLOSED, DELETING, DELETED); |
| makeStateTransitionIdempotent(containerLifecycleSM, DELETE, |
| DELETING, DELETED); |
| makeStateTransitionIdempotent(containerLifecycleSM, CLEANUP, DELETED); |
| |
| return containerLifecycleSM; |
| } |
| |
| private void makeStateTransitionIdempotent( |
| final StateMachine<LifeCycleState, LifeCycleEvent> sm, |
| final LifeCycleEvent event, final LifeCycleState... states) { |
| for (LifeCycleState state : states) { |
| sm.addTransition(state, state, event); |
| } |
| } |
| |
| /** |
| * Returns the configured container size. |
| * |
| * @return the max size of container |
| */ |
| private long getConfiguredContainerSize(final Configuration conf) { |
| return (long) conf.getStorageSize( |
| ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE, |
| ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, |
| StorageUnit.BYTES); |
| } |
| |
| /** |
| * Loads the containers from container store into memory. |
| * |
| * @throws IOException in case of error while loading the containers |
| */ |
| private void initialize() throws IOException { |
| try (TableIterator<ContainerID, |
| ? extends KeyValue<ContainerID, ContainerInfo>> iterator = |
| containerStore.iterator()) { |
| |
| while (iterator.hasNext()) { |
| final ContainerInfo container = iterator.next().getValue(); |
| Preconditions.checkNotNull(container); |
| containers.addContainer(container); |
| if (container.getState() == LifeCycleState.OPEN) { |
| try { |
| pipelineManager.addContainerToPipelineSCMStart( |
| container.getPipelineID(), container.containerID()); |
| } catch (PipelineNotFoundException ex) { |
| LOG.warn("Found container {} which is in OPEN state with " + |
| "pipeline {} that does not exist. Marking container for " + |
| "closing.", container, container.getPipelineID()); |
| try { |
| updateContainerState(container.containerID().getProtobuf(), |
| LifeCycleEvent.FINALIZE); |
| } catch (InvalidStateTransitionException e) { |
| // This cannot happen. |
| LOG.warn("Unable to finalize Container {}.", container); |
| } |
| } |
| } |
| } |
| } |
| } |
| |
| private Map<LifeCycleEvent, CheckedConsumer<ContainerInfo, IOException>> |
| getContainerStateChangeActions() { |
| final Map<LifeCycleEvent, CheckedConsumer<ContainerInfo, IOException>> |
| actions = new EnumMap<>(LifeCycleEvent.class); |
| actions.put(FINALIZE, info -> pipelineManager |
| .removeContainerFromPipeline(info.getPipelineID(), info.containerID())); |
| return actions; |
| } |
| |
| @Override |
| public Set<ContainerID> getContainerIDs() { |
| lock.readLock().lock(); |
| try { |
| return containers.getAllContainerIDs(); |
| } finally { |
| lock.readLock().unlock(); |
| } |
| } |
| |
| @Override |
| public Set<ContainerID> getContainerIDs(final LifeCycleState state) { |
| lock.readLock().lock(); |
| try { |
| return containers.getContainerIDsByState(state); |
| } finally { |
| lock.readLock().unlock(); |
| } |
| } |
| |
| @Override |
| public ContainerInfo getContainer(final ContainerID id) { |
| lockManager.readLock(id); |
| try { |
| return containers.getContainerInfo(id); |
| } finally { |
| lockManager.readUnlock(id); |
| } |
| } |
| |
| @Override |
| public void addContainer(final ContainerInfoProto containerInfo) |
| throws IOException { |
| |
| // Change the exception thrown to PipelineNotFound and |
| // ClosedPipelineException once ClosedPipelineException is introduced |
| // in PipelineManager. |
| |
| Preconditions.checkNotNull(containerInfo); |
| final ContainerInfo container = ContainerInfo.fromProtobuf(containerInfo); |
| final ContainerID containerID = container.containerID(); |
| final PipelineID pipelineID = container.getPipelineID(); |
| |
| lock.writeLock().lock(); |
| try { |
| lockManager.writeLock(containerID); |
| try { |
| if (!containers.contains(containerID)) { |
| ExecutionUtil.create(() -> { |
| transactionBuffer.addToBuffer(containerStore, |
| containerID, container); |
| containers.addContainer(container); |
| if (pipelineManager.containsPipeline(pipelineID)) { |
| pipelineManager.addContainerToPipeline(pipelineID, containerID); |
| } else if (containerInfo.getState(). |
| equals(LifeCycleState.OPEN)) { |
| // Pipeline should exist, but not |
| throw new PipelineNotFoundException(); |
| } |
| //recon may receive report of closed container, |
| // no corresponding Pipeline can be synced for scm. |
| // just only add the container. |
| }).onException(() -> { |
| containers.removeContainer(containerID); |
| transactionBuffer.removeFromBuffer(containerStore, containerID); |
| }).execute(); |
| } |
| } finally { |
| lockManager.writeUnlock(containerID); |
| } |
| } finally { |
| lock.writeLock().unlock(); |
| } |
| } |
| |
| @Override |
| public boolean contains(ContainerID id) { |
| lockManager.readLock(id); |
| try { |
| return containers.contains(id); |
| } finally { |
| lockManager.readUnlock(id); |
| } |
| } |
| |
  /**
   * Applies the given lifecycle event to the container, persists the new
   * state and runs any post-transition action registered for the event.
   * A no-op if the container is unknown.
   *
   * @param containerID protobuf id of the container
   * @param event the lifecycle event to apply
   * @throws IOException if persisting the new state fails
   * @throws InvalidStateTransitionException if the event is not valid for
   *         the container's current state
   */
  @Override
  public void updateContainerState(final HddsProtos.ContainerID containerID,
      final LifeCycleEvent event)
      throws IOException, InvalidStateTransitionException {
    // TODO: Remove the protobuf conversion after fixing ContainerStateMap.
    final ContainerID id = ContainerID.getFromProtobuf(containerID);

    lockManager.writeLock(id);
    try {
      if (containers.contains(id)) {
        final ContainerInfo oldInfo = containers.getContainerInfo(id);
        final LifeCycleState oldState = oldInfo.getState();
        final LifeCycleState newState = stateMachine.getNextState(
            oldInfo.getState(), event);
        // Only transitions to a state with a larger proto number are
        // applied; the idempotent self-transitions resolve to the same
        // state and are deliberately skipped here, so the side-effect
        // actions do not run twice for replayed events.
        if (newState.getNumber() > oldState.getNumber()) {
          ExecutionUtil.create(() -> {
            containers.updateState(id, oldState, newState);
            transactionBuffer.addToBuffer(containerStore, id,
                containers.getContainerInfo(id));
          }).onException(() -> {
            // Roll back: restore the old info in the buffer and revert the
            // in-memory state.
            transactionBuffer.addToBuffer(containerStore, id, oldInfo);
            containers.updateState(id, newState, oldState);
          }).execute();
          containerStateChangeActions.getOrDefault(event, info -> {
          }).execute(oldInfo);
        }
      }
    } finally {
      lockManager.writeUnlock(id);
    }
  }
| |
| |
| @Override |
| public Set<ContainerReplica> getContainerReplicas(final ContainerID id) { |
| lockManager.readLock(id); |
| try { |
| return containers.getContainerReplicas(id); |
| } finally { |
| lockManager.readUnlock(id); |
| } |
| } |
| |
| @Override |
| public void updateContainerReplica(final ContainerID id, |
| final ContainerReplica replica) { |
| lockManager.writeLock(id); |
| try { |
| containers.updateContainerReplica(id, replica); |
| } finally { |
| lockManager.writeUnlock(id); |
| } |
| } |
| |
| @Override |
| public void removeContainerReplica(final ContainerID id, |
| final ContainerReplica replica) { |
| lockManager.writeLock(id); |
| try { |
| containers.removeContainerReplica(id, |
| replica); |
| } finally { |
| lockManager.writeUnlock(id); |
| } |
| } |
| |
| @Override |
| public void updateDeleteTransactionId( |
| final Map<ContainerID, Long> deleteTransactionMap) throws IOException { |
| |
| // TODO: Refactor this. Error handling is not done. |
| for (Map.Entry<ContainerID, Long> transaction : |
| deleteTransactionMap.entrySet()) { |
| ContainerID containerID = transaction.getKey(); |
| try { |
| lockManager.writeLock(containerID); |
| final ContainerInfo info = containers.getContainerInfo( |
| transaction.getKey()); |
| if (info == null) { |
| LOG.warn("Cannot find container {}, transaction id is {}", |
| transaction.getKey(), transaction.getValue()); |
| continue; |
| } |
| info.updateDeleteTransactionId(transaction.getValue()); |
| transactionBuffer.addToBuffer(containerStore, info.containerID(), info); |
| } finally { |
| lockManager.writeUnlock(containerID); |
| } |
| } |
| } |
| |
  /**
   * Picks a container from the candidate set that can accommodate
   * {@code size} more bytes, rotating through candidates in round-robin
   * order per (owner, pipeline) key via {@link #lastUsedMap}.
   *
   * @param size number of bytes the caller intends to write
   * @param owner owner used to key the round-robin position
   * @param pipelineID pipeline used to key the round-robin position
   * @param containerIDs sorted candidate container ids
   * @return a container with enough space, or {@code null} if none found
   */
  public ContainerInfo getMatchingContainer(final long size, String owner,
      PipelineID pipelineID, NavigableSet<ContainerID> containerIDs) {
    if (containerIDs.isEmpty()) {
      return null;
    }

    // Get the last used container and find container above the last used
    // container ID.
    final ContainerState key = new ContainerState(owner, pipelineID);
    final ContainerID lastID =
        lastUsedMap.getOrDefault(key, containerIDs.first());


    // There is a small issue here. The first time, we will skip the first
    // container. But in most cases it will not matter.
    NavigableSet<ContainerID> resultSet = containerIDs.tailSet(lastID, false);
    if (resultSet.isEmpty()) {
      resultSet = containerIDs;
    }
    ContainerInfo selectedContainer = findContainerWithSpace(size, resultSet);
    if (selectedContainer == null) {

      // If we did not find any space in the tailSet, we need to look for
      // space in the headset, we need to pass true to deal with the
      // situation that we have a lone container that has space. That is we
      // ignored the last used container under the assumption we can find
      // other containers with space, but if have a single container that is
      // not true. Hence we need to include the last used container as the
      // last element in the sorted set.

      resultSet = containerIDs.headSet(lastID, true);
      selectedContainer = findContainerWithSpace(size, resultSet);
    }

    // TODO: cleanup entries in lastUsedMap
    if (selectedContainer != null) {
      lastUsedMap.put(key, selectedContainer.containerID());
    }
    return selectedContainer;
  }
| |
| private ContainerInfo findContainerWithSpace(final long size, |
| final NavigableSet<ContainerID> |
| searchSet) { |
| // Get the container with space to meet our request. |
| for (ContainerID id : searchSet) { |
| try { |
| lockManager.readLock(id); |
| final ContainerInfo containerInfo = containers.getContainerInfo(id); |
| if (containerInfo.getUsedBytes() + size <= this.containerSize) { |
| containerInfo.updateLastUsedTime(); |
| return containerInfo; |
| } |
| } finally { |
| lockManager.readUnlock(id); |
| } |
| } |
| return null; |
| } |
| |
| |
  /**
   * Removes the container from the in-memory map and stages its deletion
   * in the persistent store; on failure the stored entry is restored.
   *
   * @param id protobuf id of the container to remove
   * @throws IOException if the removal could not be staged
   */
  public void removeContainer(final HddsProtos.ContainerID id)
      throws IOException {
    final ContainerID cid = ContainerID.getFromProtobuf(id);
    lock.writeLock().lock();
    try {
      lockManager.writeLock(cid);
      try {
        // NOTE(review): if cid is unknown, containerInfo may be null and
        // the rollback below would write a null value to the store —
        // confirm callers only pass existing containers.
        final ContainerInfo containerInfo = containers.getContainerInfo(cid);
        ExecutionUtil.create(() -> {
          transactionBuffer.removeFromBuffer(containerStore, cid);
          containers.removeContainer(cid);
        }).onException(() -> containerStore.put(cid, containerInfo)).execute();
      } finally {
        lockManager.writeUnlock(cid);
      }
    } finally {
      lock.writeLock().unlock();
    }
  }
| |
  /**
   * Replaces the backing store and rebuilds the in-memory container state
   * from it, under the global write lock.
   *
   * @param store the new persistent container store
   * @throws IOException if closing the old store or loading from the new
   *         one fails
   */
  @Override
  public void reinitialize(
      Table<ContainerID, ContainerInfo> store) throws IOException {
    lock.writeLock().lock();
    try {
      // Close the old store before swapping in the new one.
      close();
      this.containerStore = store;
      this.containers = new ContainerStateMap();
      this.lastUsedMap = new ConcurrentHashMap<>();
      initialize();
    } finally {
      lock.writeLock().unlock();
    }
  }
| |
  /**
   * Closes the backing container store. Any exception thrown by the store
   * is wrapped in an IOException.
   */
  @Override
  public void close() throws IOException {
    try {
      containerStore.close();
    } catch (Exception e) {
      throw new IOException(e);
    }
  }
| |
  /**
   * Returns a new {@link Builder} for constructing a ContainerStateManager
   * instance.
   */
  public static Builder newBuilder() {
    return new Builder();
  }
| |
| /** |
| * Builder for ContainerStateManager. |
| */ |
| public static class Builder { |
| private Configuration conf; |
| private PipelineManager pipelineMgr; |
| private SCMRatisServer scmRatisServer; |
| private Table<ContainerID, ContainerInfo> table; |
| private DBTransactionBuffer transactionBuffer; |
| |
| public Builder setSCMDBTransactionBuffer(DBTransactionBuffer buffer) { |
| this.transactionBuffer = buffer; |
| return this; |
| } |
| public Builder setConfiguration(final Configuration config) { |
| conf = config; |
| return this; |
| } |
| |
| public Builder setPipelineManager(final PipelineManager pipelineManager) { |
| pipelineMgr = pipelineManager; |
| return this; |
| } |
| |
| public Builder setRatisServer(final SCMRatisServer ratisServer) { |
| scmRatisServer = ratisServer; |
| return this; |
| } |
| |
| public Builder setContainerStore( |
| final Table<ContainerID, ContainerInfo> containerStore) { |
| table = containerStore; |
| return this; |
| } |
| |
| public ContainerStateManager build() throws IOException { |
| Preconditions.checkNotNull(conf); |
| Preconditions.checkNotNull(pipelineMgr); |
| Preconditions.checkNotNull(table); |
| |
| final ContainerStateManager csm = new ContainerStateManagerImpl( |
| conf, pipelineMgr, table, transactionBuffer); |
| |
| final SCMHAInvocationHandler invocationHandler = |
| new SCMHAInvocationHandler(RequestType.CONTAINER, csm, |
| scmRatisServer); |
| |
| return (ContainerStateManager) Proxy.newProxyInstance( |
| SCMHAInvocationHandler.class.getClassLoader(), |
| new Class<?>[]{ContainerStateManager.class}, invocationHandler); |
| } |
| |
| } |
| } |