| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.hdds.scm.pipeline; |
| |
| import com.google.common.base.Preconditions; |
| import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; |
| import org.apache.commons.lang3.builder.HashCodeBuilder; |
| import org.apache.hadoop.hdds.protocol.DatanodeDetails; |
| import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; |
| import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; |
| import org.apache.hadoop.hdds.scm.container.ContainerID; |
| import org.apache.hadoop.hdds.scm.pipeline.Pipeline.PipelineState; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.io.IOException; |
| import java.util.*; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.CopyOnWriteArrayList; |
| |
| /** |
| * Holds the data structures which maintain the information about pipeline and |
| * its state. |
| * Invariant: If a pipeline exists in PipelineStateMap, both pipelineMap and |
| * pipeline2container would have a non-null mapping for it. |
| */ |
| class PipelineStateMap { |
| |
| private static final Logger LOG = LoggerFactory.getLogger( |
| PipelineStateMap.class); |
| |
| private final Map<PipelineID, Pipeline> pipelineMap; |
| private final Map<PipelineID, NavigableSet<ContainerID>> pipeline2container; |
| private final Map<PipelineQuery, List<Pipeline>> query2OpenPipelines; |
| |
| PipelineStateMap() { |
| |
| // TODO: Use TreeMap for range operations? |
| pipelineMap = new ConcurrentHashMap<>(); |
| pipeline2container = new ConcurrentHashMap<>(); |
| query2OpenPipelines = new HashMap<>(); |
| initializeQueryMap(); |
| |
| } |
| |
| private void initializeQueryMap() { |
| for (ReplicationType type : ReplicationType.values()) { |
| for (ReplicationFactor factor : ReplicationFactor.values()) { |
| query2OpenPipelines |
| .put(new PipelineQuery(type, factor), new CopyOnWriteArrayList<>()); |
| } |
| } |
| } |
| |
| /** |
| * Adds provided pipeline in the data structures. |
| * |
| * @param pipeline - Pipeline to add |
| * @throws IOException if pipeline with provided pipelineID already exists |
| */ |
| void addPipeline(Pipeline pipeline) throws IOException { |
| Preconditions.checkNotNull(pipeline, "Pipeline cannot be null"); |
| Preconditions.checkArgument( |
| pipeline.getNodes().size() == pipeline.getFactor().getNumber(), |
| String.format("Nodes size=%d, replication factor=%d do not match ", |
| pipeline.getNodes().size(), pipeline.getFactor().getNumber())); |
| |
| if (pipelineMap.putIfAbsent(pipeline.getId(), pipeline) != null) { |
| LOG.warn("Duplicate pipeline ID detected. {}", pipeline.getId()); |
| throw new IOException(String |
| .format("Duplicate pipeline ID %s detected.", pipeline.getId())); |
| } |
| pipeline2container.put(pipeline.getId(), new TreeSet<>()); |
| if (pipeline.getPipelineState() == PipelineState.OPEN) { |
| query2OpenPipelines.get(new PipelineQuery(pipeline)).add(pipeline); |
| } |
| } |
| |
| /** |
| * Add container to an existing pipeline. |
| * |
| * @param pipelineID - PipelineID of the pipeline to which container is added |
| * @param containerID - ContainerID of the container to add |
| * @throws IOException if pipeline is not in open state or does not exist |
| */ |
| void addContainerToPipeline(PipelineID pipelineID, ContainerID containerID) |
| throws IOException { |
| Preconditions.checkNotNull(pipelineID, |
| "Pipeline Id cannot be null"); |
| Preconditions.checkNotNull(containerID, |
| "Container Id cannot be null"); |
| |
| Pipeline pipeline = getPipeline(pipelineID); |
| if (pipeline.isClosed()) { |
| throw new IOException(String |
| .format("Cannot add container to pipeline=%s in closed state", |
| pipelineID)); |
| } |
| pipeline2container.get(pipelineID).add(containerID); |
| } |
| |
| /** |
| * Get pipeline corresponding to specified pipelineID. |
| * |
| * @param pipelineID - PipelineID of the pipeline to be retrieved |
| * @return Pipeline |
| * @throws IOException if pipeline is not found |
| */ |
| Pipeline getPipeline(PipelineID pipelineID) throws PipelineNotFoundException { |
| Preconditions.checkNotNull(pipelineID, |
| "Pipeline Id cannot be null"); |
| |
| Pipeline pipeline = pipelineMap.get(pipelineID); |
| if (pipeline == null) { |
| throw new PipelineNotFoundException( |
| String.format("%s not found", pipelineID)); |
| } |
| return pipeline; |
| } |
| |
| /** |
| * Get list of pipelines in SCM. |
| * @return List of pipelines |
| */ |
| public List<Pipeline> getPipelines() { |
| return new ArrayList<>(pipelineMap.values()); |
| } |
| |
| /** |
| * Get pipeline corresponding to specified replication type. |
| * |
| * @param type - ReplicationType |
| * @return List of pipelines which have the specified replication type |
| */ |
| List<Pipeline> getPipelines(ReplicationType type) { |
| Preconditions.checkNotNull(type, "Replication type cannot be null"); |
| |
| List<Pipeline> pipelines = new ArrayList<>(); |
| for (Pipeline pipeline : pipelineMap.values()) { |
| if (pipeline.getType() == type) { |
| pipelines.add(pipeline); |
| } |
| } |
| |
| return pipelines; |
| } |
| |
| /** |
| * Get pipeline corresponding to specified replication type and factor. |
| * |
| * @param type - ReplicationType |
| * @param factor - ReplicationFactor |
| * @return List of pipelines with specified replication type and factor |
| */ |
| List<Pipeline> getPipelines(ReplicationType type, ReplicationFactor factor) { |
| Preconditions.checkNotNull(type, "Replication type cannot be null"); |
| Preconditions.checkNotNull(factor, "Replication factor cannot be null"); |
| |
| List<Pipeline> pipelines = new ArrayList<>(); |
| for (Pipeline pipeline : pipelineMap.values()) { |
| if (pipeline.getType() == type && pipeline.getFactor() == factor) { |
| pipelines.add(pipeline); |
| } |
| } |
| |
| return pipelines; |
| } |
| |
| /** |
| * Get list of pipeline corresponding to specified replication type and |
| * pipeline states. |
| * |
| * @param type - ReplicationType |
| * @param states - Array of required PipelineState |
| * @return List of pipelines with specified replication type and states |
| */ |
| List<Pipeline> getPipelines(ReplicationType type, PipelineState... states) { |
| Preconditions.checkNotNull(type, "Replication type cannot be null"); |
| Preconditions.checkNotNull(states, "Pipeline state cannot be null"); |
| |
| Set<PipelineState> pipelineStates = new HashSet<>(); |
| pipelineStates.addAll(Arrays.asList(states)); |
| |
| List<Pipeline> pipelines = new ArrayList<>(); |
| for (Pipeline pipeline : pipelineMap.values()) { |
| if (pipeline.getType() == type |
| && pipelineStates.contains(pipeline.getPipelineState())) { |
| pipelines.add(pipeline); |
| } |
| } |
| |
| return pipelines; |
| } |
| |
| /** |
| * Get list of pipeline corresponding to specified replication type, |
| * replication factor and pipeline state. |
| * |
| * @param type - ReplicationType |
| * @param state - Required PipelineState |
| * @return List of pipelines with specified replication type, |
| * replication factor and pipeline state |
| */ |
| List<Pipeline> getPipelines(ReplicationType type, ReplicationFactor factor, |
| PipelineState state) { |
| Preconditions.checkNotNull(type, "Replication type cannot be null"); |
| Preconditions.checkNotNull(factor, "Replication factor cannot be null"); |
| Preconditions.checkNotNull(state, "Pipeline state cannot be null"); |
| |
| if (state == PipelineState.OPEN) { |
| return new ArrayList<>( |
| query2OpenPipelines.getOrDefault( |
| new PipelineQuery(type, factor), Collections.EMPTY_LIST)); |
| } |
| |
| List<Pipeline> pipelines = new ArrayList<>(); |
| for (Pipeline pipeline : pipelineMap.values()) { |
| if (pipeline.getType() == type |
| && pipeline.getPipelineState() == state |
| && pipeline.getFactor() == factor) { |
| pipelines.add(pipeline); |
| } |
| } |
| |
| return pipelines; |
| } |
| |
| /** |
| * Get list of pipeline corresponding to specified replication type, |
| * replication factor and pipeline state. |
| * |
| * @param type - ReplicationType |
| * @param state - Required PipelineState |
| * @param excludeDns dns to exclude |
| * @param excludePipelines pipelines to exclude |
| * @return List of pipelines with specified replication type, |
| * replication factor and pipeline state |
| */ |
| List<Pipeline> getPipelines(ReplicationType type, ReplicationFactor factor, |
| PipelineState state, Collection<DatanodeDetails> excludeDns, |
| Collection<PipelineID> excludePipelines) { |
| Preconditions.checkNotNull(type, "Replication type cannot be null"); |
| Preconditions.checkNotNull(factor, "Replication factor cannot be null"); |
| Preconditions.checkNotNull(state, "Pipeline state cannot be null"); |
| Preconditions |
| .checkNotNull(excludeDns, "Datanode exclude list cannot be null"); |
| Preconditions |
| .checkNotNull(excludeDns, "Pipeline exclude list cannot be null"); |
| |
| List<Pipeline> pipelines = null; |
| if (state == PipelineState.OPEN) { |
| pipelines = new ArrayList<>(query2OpenPipelines.getOrDefault( |
| new PipelineQuery(type, factor), Collections.EMPTY_LIST)); |
| } else { |
| pipelines = new ArrayList<>(pipelineMap.values()); |
| } |
| |
| Iterator<Pipeline> iter = pipelines.iterator(); |
| while (iter.hasNext()) { |
| Pipeline pipeline = iter.next(); |
| if (pipeline.getType() != type || |
| pipeline.getPipelineState() != state || |
| pipeline.getFactor() != factor || |
| excludePipelines.contains(pipeline.getId())) { |
| iter.remove(); |
| } else { |
| for (DatanodeDetails dn : pipeline.getNodes()) { |
| if (excludeDns.contains(dn)) { |
| iter.remove(); |
| break; |
| } |
| } |
| } |
| } |
| |
| return pipelines; |
| } |
| |
| /** |
| * Get set of containerIDs corresponding to a pipeline. |
| * |
| * @param pipelineID - PipelineID |
| * @return Set of containerIDs belonging to the pipeline |
| * @throws IOException if pipeline is not found |
| */ |
| NavigableSet<ContainerID> getContainers(PipelineID pipelineID) |
| throws PipelineNotFoundException { |
| Preconditions.checkNotNull(pipelineID, |
| "Pipeline Id cannot be null"); |
| |
| NavigableSet<ContainerID> containerIDs = pipeline2container.get(pipelineID); |
| if (containerIDs == null) { |
| throw new PipelineNotFoundException( |
| String.format("%s not found", pipelineID)); |
| } |
| return new TreeSet<>(containerIDs); |
| } |
| |
| /** |
| * Get number of containers corresponding to a pipeline. |
| * |
| * @param pipelineID - PipelineID |
| * @return Number of containers belonging to the pipeline |
| * @throws IOException if pipeline is not found |
| */ |
| int getNumberOfContainers(PipelineID pipelineID) |
| throws PipelineNotFoundException { |
| Preconditions.checkNotNull(pipelineID, |
| "Pipeline Id cannot be null"); |
| |
| Set<ContainerID> containerIDs = pipeline2container.get(pipelineID); |
| if (containerIDs == null) { |
| throw new PipelineNotFoundException( |
| String.format("%s not found", pipelineID)); |
| } |
| return containerIDs.size(); |
| } |
| |
| /** |
| * Remove pipeline from the data structures. |
| * |
| * @param pipelineID - PipelineID of the pipeline to be removed |
| * @throws IOException if the pipeline is not empty or does not exist |
| */ |
| Pipeline removePipeline(PipelineID pipelineID) throws IOException { |
| Preconditions.checkNotNull(pipelineID, "Pipeline Id cannot be null"); |
| |
| Pipeline pipeline = getPipeline(pipelineID); |
| if (!pipeline.isClosed()) { |
| throw new IOException( |
| String.format("Pipeline with %s is not yet closed", pipelineID)); |
| } |
| |
| pipelineMap.remove(pipelineID); |
| pipeline2container.remove(pipelineID); |
| return pipeline; |
| } |
| |
| /** |
| * Remove container from a pipeline. |
| * |
| * @param pipelineID - PipelineID of the pipeline from which container needs |
| * to be removed |
| * @param containerID - ContainerID of the container to remove |
| * @throws IOException if pipeline does not exist |
| */ |
| void removeContainerFromPipeline(PipelineID pipelineID, |
| ContainerID containerID) throws IOException { |
| Preconditions.checkNotNull(pipelineID, |
| "Pipeline Id cannot be null"); |
| Preconditions.checkNotNull(containerID, |
| "container Id cannot be null"); |
| |
| Set<ContainerID> containerIDs = pipeline2container.get(pipelineID); |
| if (containerIDs == null) { |
| throw new PipelineNotFoundException( |
| String.format("%s not found", pipelineID)); |
| } |
| containerIDs.remove(containerID); |
| } |
| |
| /** |
| * Updates the state of pipeline. |
| * |
| * @param pipelineID - PipelineID of the pipeline whose state needs |
| * to be updated |
| * @param state - new state of the pipeline |
| * @return Pipeline with the updated state |
| * @throws IOException if pipeline does not exist |
| */ |
| Pipeline updatePipelineState(PipelineID pipelineID, PipelineState state) |
| throws PipelineNotFoundException { |
| Preconditions.checkNotNull(pipelineID, "Pipeline Id cannot be null"); |
| Preconditions.checkNotNull(state, "Pipeline LifeCycleState cannot be null"); |
| |
| final Pipeline pipeline = getPipeline(pipelineID); |
| Pipeline updatedPipeline = pipelineMap.compute(pipelineID, |
| (id, p) -> Pipeline.newBuilder(pipeline).setState(state).build()); |
| PipelineQuery query = new PipelineQuery(pipeline); |
| List<Pipeline> pipelineList = query2OpenPipelines.get(query); |
| if (updatedPipeline.getPipelineState() == PipelineState.OPEN) { |
| // for transition to OPEN state add pipeline to query2OpenPipelines |
| if (pipelineList == null) { |
| pipelineList = new CopyOnWriteArrayList<>(); |
| query2OpenPipelines.put(query, pipelineList); |
| } |
| pipelineList.add(updatedPipeline); |
| } else { |
| // for transition from OPEN to CLOSED state remove pipeline from |
| // query2OpenPipelines |
| if (pipelineList != null) { |
| pipelineList.remove(pipeline); |
| } |
| } |
| return updatedPipeline; |
| } |
| |
| private static class PipelineQuery { |
| private ReplicationType type; |
| private ReplicationFactor factor; |
| |
| PipelineQuery(ReplicationType type, ReplicationFactor factor) { |
| this.type = Preconditions.checkNotNull(type); |
| this.factor = Preconditions.checkNotNull(factor); |
| } |
| |
| PipelineQuery(Pipeline pipeline) { |
| type = pipeline.getType(); |
| factor = pipeline.getFactor(); |
| } |
| |
| @Override |
| @SuppressFBWarnings("NP_EQUALS_SHOULD_HANDLE_NULL_ARGUMENT") |
| public boolean equals(Object other) { |
| if (this == other) { |
| return true; |
| } |
| if (!this.getClass().equals(other.getClass())) { |
| return false; |
| } |
| PipelineQuery otherQuery = (PipelineQuery) other; |
| return type == otherQuery.type && factor == otherQuery.factor; |
| } |
| |
| @Override |
| public int hashCode() { |
| return new HashCodeBuilder() |
| .append(type) |
| .append(factor) |
| .toHashCode(); |
| } |
| } |
| } |