blob: 860d1d261f806a814a4c869d2c0436482e3cd590 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
/**
* In the Standby Node, we can receive messages about blocks
* before they are actually available in the namespace, or while
* they have an outdated state in the namespace. In those cases,
* we queue those block-related messages in this structure.
* */
class PendingDataNodeMessages {
Map<Block, Queue<ReportedBlockInfo>> queueByBlockId =
Maps.newHashMap();
private int count = 0;
static class ReportedBlockInfo {
private final Block block;
private final DatanodeDescriptor dn;
private final ReplicaState reportedState;
ReportedBlockInfo(DatanodeDescriptor dn, Block block,
ReplicaState reportedState) {
this.dn = dn;
this.block = block;
this.reportedState = reportedState;
}
Block getBlock() {
return block;
}
DatanodeDescriptor getNode() {
return dn;
}
ReplicaState getReportedState() {
return reportedState;
}
@Override
public String toString() {
return "ReportedBlockInfo [block=" + block + ", dn=" + dn
+ ", reportedState=" + reportedState + "]";
}
}
void enqueueReportedBlock(DatanodeDescriptor dn, Block block,
ReplicaState reportedState) {
block = new Block(block);
getBlockQueue(block).add(
new ReportedBlockInfo(dn, block, reportedState));
count++;
}
/**
* @return any messages that were previously queued for the given block,
* or null if no messages were queued.
*/
Queue<ReportedBlockInfo> takeBlockQueue(Block block) {
Queue<ReportedBlockInfo> queue = queueByBlockId.remove(block);
if (queue != null) {
count -= queue.size();
}
return queue;
}
private Queue<ReportedBlockInfo> getBlockQueue(Block block) {
Queue<ReportedBlockInfo> queue = queueByBlockId.get(block);
if (queue == null) {
queue = Lists.newLinkedList();
queueByBlockId.put(block, queue);
}
return queue;
}
public int count() {
return count ;
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
for (Map.Entry<Block, Queue<ReportedBlockInfo>> entry :
queueByBlockId.entrySet()) {
sb.append("Block " + entry.getKey() + ":\n");
for (ReportedBlockInfo rbi : entry.getValue()) {
sb.append(" ").append(rbi).append("\n");
}
}
return sb.toString();
}
public Iterable<ReportedBlockInfo> takeAll() {
List<ReportedBlockInfo> rbis = Lists.newArrayListWithCapacity(
count);
for (Queue<ReportedBlockInfo> q : queueByBlockId.values()) {
rbis.addAll(q);
}
queueByBlockId.clear();
count = 0;
return rbis;
}
}