| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.hdfs.server.namenode; |
| |
| import java.util.*; |
| |
| import org.apache.hadoop.hdfs.protocol.Block; |
| |
| /* Class for keeping track of under replication blocks |
| * Blocks have replication priority, with priority 0 indicating the highest |
| * Blocks have only one replicas has the highest |
| */ |
| class UnderReplicatedBlocks implements Iterable<Block> { |
| static final int LEVEL = 4; |
| private List<TreeSet<Block>> priorityQueues = new ArrayList<TreeSet<Block>>(); |
| |
| /* constructor */ |
| UnderReplicatedBlocks() { |
| for(int i=0; i<LEVEL; i++) { |
| priorityQueues.add(new TreeSet<Block>()); |
| } |
| } |
| |
| /** |
| * Empty the queues. |
| */ |
| void clear() { |
| for(int i=0; i<LEVEL; i++) { |
| priorityQueues.get(i).clear(); |
| } |
| } |
| |
| /* Return the total number of under replication blocks */ |
| synchronized int size() { |
| int size = 0; |
| for (int i=0; i<LEVEL; i++) { |
| size += priorityQueues.get(i).size(); |
| } |
| return size; |
| } |
| |
| /* Check if a block is in the neededReplication queue */ |
| synchronized boolean contains(Block block) { |
| for(TreeSet<Block> set:priorityQueues) { |
| if(set.contains(block)) { return true; } |
| } |
| return false; |
| } |
| |
| /* Return the priority of a block |
| * @param block a under replication block |
| * @param curReplicas current number of replicas of the block |
| * @param expectedReplicas expected number of replicas of the block |
| */ |
| private int getPriority(Block block, |
| int curReplicas, |
| int decommissionedReplicas, |
| int expectedReplicas) { |
| if (curReplicas<0) { |
| return LEVEL; |
| } else if (curReplicas>=expectedReplicas) { |
| return 3; // Block doesn't have enough racks |
| } else if(curReplicas==0) { |
| // If there are zero non-decommissioned replica but there are |
| // some decommissioned replicas, then assign them highest priority |
| if (decommissionedReplicas > 0) { |
| return 0; |
| } |
| return 2; // keep these blocks in needed replication. |
| } else if(curReplicas==1) { |
| return 0; // highest priority |
| } else if(curReplicas*3<expectedReplicas) { |
| return 1; |
| } else { |
| return 2; |
| } |
| } |
| |
| /* add a block to a under replication queue according to its priority |
| * @param block a under replication block |
| * @param curReplicas current number of replicas of the block |
| * @param expectedReplicas expected number of replicas of the block |
| */ |
| synchronized boolean add( |
| Block block, |
| int curReplicas, |
| int decomissionedReplicas, |
| int expectedReplicas) { |
| if(curReplicas<0) { |
| return false; |
| } |
| int priLevel = getPriority(block, curReplicas, decomissionedReplicas, |
| expectedReplicas); |
| if(priLevel != LEVEL && priorityQueues.get(priLevel).add(block)) { |
| NameNode.stateChangeLog.debug( |
| "BLOCK* NameSystem.UnderReplicationBlock.add:" |
| + block |
| + " has only "+curReplicas |
| + " replicas and need " + expectedReplicas |
| + " replicas so is added to neededReplications" |
| + " at priority level " + priLevel); |
| return true; |
| } |
| return false; |
| } |
| |
| /* remove a block from a under replication queue */ |
| synchronized boolean remove(Block block, |
| int oldReplicas, |
| int decommissionedReplicas, |
| int oldExpectedReplicas) { |
| int priLevel = getPriority(block, oldReplicas, |
| decommissionedReplicas, |
| oldExpectedReplicas); |
| return remove(block, priLevel); |
| } |
| |
| /* remove a block from a under replication queue given a priority*/ |
| boolean remove(Block block, int priLevel) { |
| if(priLevel >= 0 && priLevel < LEVEL |
| && priorityQueues.get(priLevel).remove(block)) { |
| NameNode.stateChangeLog.debug( |
| "BLOCK* NameSystem.UnderReplicationBlock.remove: " |
| + "Removing block " + block |
| + " from priority queue "+ priLevel); |
| return true; |
| } else { |
| for(int i=0; i<LEVEL; i++) { |
| if(i!=priLevel && priorityQueues.get(i).remove(block)) { |
| NameNode.stateChangeLog.debug( |
| "BLOCK* NameSystem.UnderReplicationBlock.remove: " |
| + "Removing block " + block |
| + " from priority queue "+ i); |
| return true; |
| } |
| } |
| } |
| return false; |
| } |
| |
| /* update the priority level of a block */ |
| synchronized void update(Block block, int curReplicas, |
| int decommissionedReplicas, |
| int curExpectedReplicas, |
| int curReplicasDelta, int expectedReplicasDelta) { |
| int oldReplicas = curReplicas-curReplicasDelta; |
| int oldExpectedReplicas = curExpectedReplicas-expectedReplicasDelta; |
| int curPri = getPriority(block, curReplicas, decommissionedReplicas, curExpectedReplicas); |
| int oldPri = getPriority(block, oldReplicas, decommissionedReplicas, oldExpectedReplicas); |
| NameNode.stateChangeLog.debug("UnderReplicationBlocks.update " + |
| block + |
| " curReplicas " + curReplicas + |
| " curExpectedReplicas " + curExpectedReplicas + |
| " oldReplicas " + oldReplicas + |
| " oldExpectedReplicas " + oldExpectedReplicas + |
| " curPri " + curPri + |
| " oldPri " + oldPri); |
| if(oldPri != LEVEL && oldPri != curPri) { |
| remove(block, oldPri); |
| } |
| if(curPri != LEVEL && priorityQueues.get(curPri).add(block)) { |
| NameNode.stateChangeLog.debug( |
| "BLOCK* NameSystem.UnderReplicationBlock.update:" |
| + block |
| + " has only "+curReplicas |
| + " replicas and need " + curExpectedReplicas |
| + " replicas so is added to neededReplications" |
| + " at priority level " + curPri); |
| } |
| } |
| |
| /* return an iterator of all the under replication blocks */ |
| public synchronized BlockIterator iterator() { |
| return new BlockIterator(); |
| } |
| |
| class BlockIterator implements Iterator<Block> { |
| private int level; |
| private List<Iterator<Block>> iterators = new ArrayList<Iterator<Block>>(); |
| BlockIterator() |
| { |
| level=0; |
| for(int i=0; i<LEVEL; i++) { |
| iterators.add(priorityQueues.get(i).iterator()); |
| } |
| } |
| |
| private void update() { |
| while(level< LEVEL-1 && !iterators.get(level).hasNext()) { |
| level++; |
| } |
| } |
| |
| public Block next() { |
| update(); |
| return iterators.get(level).next(); |
| } |
| |
| public boolean hasNext() { |
| update(); |
| return iterators.get(level).hasNext(); |
| } |
| |
| public void remove() { |
| iterators.get(level).remove(); |
| } |
| |
| public int getPriority() { |
| return level; |
| }; |
| } |
| } |