blob: e07cf9bb2a0463221af408631a0f25834e573ffd [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
import static org.apache.hadoop.hdfs.server.common.Util.now;
import java.io.PrintWriter;
import java.sql.Time;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.util.Daemon;
/***************************************************
* PendingReplicationBlocks does the bookkeeping of all
* blocks that are getting replicated.
*
* It does the following:
* 1) record blocks that are getting replicated at this instant.
* 2) a coarse grain timer to track age of replication request
* 3) a thread that periodically identifies replication-requests
* that never made it.
*
***************************************************/
class PendingReplicationBlocks {
private static final Log LOG = BlockManager.LOG;
private Map<Block, PendingBlockInfo> pendingReplications;
private ArrayList<Block> timedOutItems;
Daemon timerThread = null;
private volatile boolean fsRunning = true;
//
// It might take anywhere between 5 to 10 minutes before
// a request is timed out.
//
private long timeout = 5 * 60 * 1000;
private long defaultRecheckInterval = 5 * 60 * 1000;
PendingReplicationBlocks(long timeoutPeriod) {
if ( timeoutPeriod > 0 ) {
this.timeout = timeoutPeriod;
}
pendingReplications = new HashMap<Block, PendingBlockInfo>();
timedOutItems = new ArrayList<Block>();
}
void start() {
timerThread = new Daemon(new PendingReplicationMonitor());
timerThread.start();
}
/**
* Add a block to the list of pending Replications
*/
void add(Block block, int numReplicas) {
synchronized (pendingReplications) {
PendingBlockInfo found = pendingReplications.get(block);
if (found == null) {
pendingReplications.put(block, new PendingBlockInfo(numReplicas));
} else {
found.incrementReplicas(numReplicas);
found.setTimeStamp();
}
}
}
/**
* One replication request for this block has finished.
* Decrement the number of pending replication requests
* for this block.
*/
void remove(Block block) {
synchronized (pendingReplications) {
PendingBlockInfo found = pendingReplications.get(block);
if (found != null) {
if(LOG.isDebugEnabled()) {
LOG.debug("Removing pending replication for " + block);
}
found.decrementReplicas();
if (found.getNumReplicas() <= 0) {
pendingReplications.remove(block);
}
}
}
}
/**
* The total number of blocks that are undergoing replication
*/
int size() {
return pendingReplications.size();
}
/**
* How many copies of this block is pending replication?
*/
int getNumReplicas(Block block) {
synchronized (pendingReplications) {
PendingBlockInfo found = pendingReplications.get(block);
if (found != null) {
return found.getNumReplicas();
}
}
return 0;
}
/**
* Returns a list of blocks that have timed out their
* replication requests. Returns null if no blocks have
* timed out.
*/
Block[] getTimedOutBlocks() {
synchronized (timedOutItems) {
if (timedOutItems.size() <= 0) {
return null;
}
Block[] blockList = timedOutItems.toArray(
new Block[timedOutItems.size()]);
timedOutItems.clear();
return blockList;
}
}
/**
* An object that contains information about a block that
* is being replicated. It records the timestamp when the
* system started replicating the most recent copy of this
* block. It also records the number of replication
* requests that are in progress.
*/
static class PendingBlockInfo {
private long timeStamp;
private int numReplicasInProgress;
PendingBlockInfo(int numReplicas) {
this.timeStamp = now();
this.numReplicasInProgress = numReplicas;
}
long getTimeStamp() {
return timeStamp;
}
void setTimeStamp() {
timeStamp = now();
}
void incrementReplicas(int increment) {
numReplicasInProgress += increment;
}
void decrementReplicas() {
numReplicasInProgress--;
assert(numReplicasInProgress >= 0);
}
int getNumReplicas() {
return numReplicasInProgress;
}
}
/*
* A periodic thread that scans for blocks that never finished
* their replication request.
*/
class PendingReplicationMonitor implements Runnable {
public void run() {
while (fsRunning) {
long period = Math.min(defaultRecheckInterval, timeout);
try {
pendingReplicationCheck();
Thread.sleep(period);
} catch (InterruptedException ie) {
if(LOG.isDebugEnabled()) {
LOG.debug("PendingReplicationMonitor thread is interrupted.", ie);
}
}
}
}
/**
* Iterate through all items and detect timed-out items
*/
void pendingReplicationCheck() {
synchronized (pendingReplications) {
Iterator<Map.Entry<Block, PendingBlockInfo>> iter =
pendingReplications.entrySet().iterator();
long now = now();
if(LOG.isDebugEnabled()) {
LOG.debug("PendingReplicationMonitor checking Q");
}
while (iter.hasNext()) {
Map.Entry<Block, PendingBlockInfo> entry = iter.next();
PendingBlockInfo pendingBlock = entry.getValue();
if (now > pendingBlock.getTimeStamp() + timeout) {
Block block = entry.getKey();
synchronized (timedOutItems) {
timedOutItems.add(block);
}
LOG.warn("PendingReplicationMonitor timed out " + block);
iter.remove();
}
}
}
}
}
/*
* Shuts down the pending replication monitor thread.
* Waits for the thread to exit.
*/
void stop() {
fsRunning = false;
if(timerThread == null) return;
timerThread.interrupt();
try {
timerThread.join(3000);
} catch (InterruptedException ie) {
}
}
/**
* Iterate through all items and print them.
*/
void metaSave(PrintWriter out) {
synchronized (pendingReplications) {
out.println("Metasave: Blocks being replicated: " +
pendingReplications.size());
Iterator<Map.Entry<Block, PendingBlockInfo>> iter =
pendingReplications.entrySet().iterator();
while (iter.hasNext()) {
Map.Entry<Block, PendingBlockInfo> entry = iter.next();
PendingBlockInfo pendingBlock = entry.getValue();
Block block = entry.getKey();
out.println(block +
" StartTime: " + new Time(pendingBlock.timeStamp) +
" NumReplicaInProgress: " +
pendingBlock.numReplicasInProgress);
}
}
}
}