blob: b99ca3b66d44dd8d0eb0a367d63a7d8c7eeaf1ce [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.hadoop.hdfs.server.datanode;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi
.FsVolumeReferences;
import org.apache.hadoop.util.AutoCloseableLock;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus
.DiskBalancerWorkEntry;
import org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus.Result;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.diskbalancer.DiskBalancerConstants;
import org.apache.hadoop.hdfs.server.diskbalancer.DiskBalancerException;
import org.apache.hadoop.hdfs.server.diskbalancer.planner.NodePlan;
import org.apache.hadoop.hdfs.server.diskbalancer.planner.Step;
import org.apache.hadoop.hdfs.web.JsonUtil;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.LinkedList;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
/**
* Worker class for Disk Balancer.
* <p>
* Here is the high level logic executed by this class. Users can submit disk
* balancing plans using submitPlan calls. After a set of sanity checks the plan
* is admitted and put into workMap.
* <p>
* The executePlan launches a thread that picks up work from workMap and hands
* it over to the BlockMover#copyBlocks function.
* <p>
* Constraints :
* <p>
* Only one plan can be executing in a datanode at any given time. This is
* ensured by checking the future handle of the worker thread in submitPlan.
*/
@InterfaceAudience.Private
public class DiskBalancer {
@VisibleForTesting
public static final Logger LOG = LoggerFactory.getLogger(DiskBalancer
.class);
private final FsDatasetSpi<?> dataset;
private final String dataNodeUUID;
private final BlockMover blockMover;
private final ReentrantLock lock;
private final ConcurrentHashMap<VolumePair, DiskBalancerWorkItem> workMap;
private boolean isDiskBalancerEnabled = false;
private ExecutorService scheduler;
private Future future;
private String planID;
private String planFile;
private DiskBalancerWorkStatus.Result currentResult;
private long bandwidth;
private long planValidityInterval;
private final Configuration config;
/**
* Constructs a Disk Balancer object. This object takes care of reading a
* NodePlan and executing it against a set of volumes.
*
* @param dataNodeUUID - Data node UUID
* @param conf - Hdfs Config
* @param blockMover - Object that supports moving blocks.
*/
public DiskBalancer(String dataNodeUUID,
Configuration conf, BlockMover blockMover) {
this.config = conf;
this.currentResult = Result.NO_PLAN;
this.blockMover = blockMover;
this.dataset = this.blockMover.getDataset();
this.dataNodeUUID = dataNodeUUID;
scheduler = Executors.newSingleThreadExecutor();
lock = new ReentrantLock();
workMap = new ConcurrentHashMap<>();
this.planID = ""; // to keep protobuf happy.
this.planFile = ""; // to keep protobuf happy.
this.isDiskBalancerEnabled = conf.getBoolean(
DFSConfigKeys.DFS_DISK_BALANCER_ENABLED,
DFSConfigKeys.DFS_DISK_BALANCER_ENABLED_DEFAULT);
this.bandwidth = conf.getInt(
DFSConfigKeys.DFS_DISK_BALANCER_MAX_DISK_THROUGHPUT,
DFSConfigKeys.DFS_DISK_BALANCER_MAX_DISK_THROUGHPUT_DEFAULT);
this.planValidityInterval = conf.getTimeDuration(
DFSConfigKeys.DFS_DISK_BALANCER_PLAN_VALID_INTERVAL,
DFSConfigKeys.DFS_DISK_BALANCER_PLAN_VALID_INTERVAL_DEFAULT,
TimeUnit.MILLISECONDS);
}
/**
* Shutdown disk balancer services.
*/
public void shutdown() {
lock.lock();
boolean needShutdown = false;
try {
this.isDiskBalancerEnabled = false;
this.currentResult = Result.NO_PLAN;
if ((this.future != null) && (!this.future.isDone())) {
this.currentResult = Result.PLAN_CANCELLED;
this.blockMover.setExitFlag();
scheduler.shutdown();
needShutdown = true;
}
} finally {
lock.unlock();
}
// no need to hold lock while shutting down executor.
if (needShutdown) {
shutdownExecutor();
}
}
/**
* Shutdown the executor.
*/
private void shutdownExecutor() {
final int secondsTowait = 10;
try {
if (!scheduler.awaitTermination(secondsTowait, TimeUnit.SECONDS)) {
scheduler.shutdownNow();
if (!scheduler.awaitTermination(secondsTowait, TimeUnit.SECONDS)) {
LOG.error("Disk Balancer : Scheduler did not terminate.");
}
}
} catch (InterruptedException ex) {
scheduler.shutdownNow();
Thread.currentThread().interrupt();
}
}
/**
* Takes a client submitted plan and converts into a set of work items that
* can be executed by the blockMover.
*
* @param planId - A SHA-1 of the plan string
* @param planVersion - version of the plan string - for future use.
* @param planFileName - Plan file name
* @param planData - Plan data in json format
* @param force - Skip some validations and execute the plan file.
* @throws DiskBalancerException
*/
public void submitPlan(String planId, long planVersion, String planFileName,
String planData, boolean force)
throws DiskBalancerException {
lock.lock();
try {
checkDiskBalancerEnabled();
if ((this.future != null) && (!this.future.isDone())) {
LOG.error("Disk Balancer - Executing another plan, submitPlan failed.");
throw new DiskBalancerException("Executing another plan",
DiskBalancerException.Result.PLAN_ALREADY_IN_PROGRESS);
}
NodePlan nodePlan = verifyPlan(planId, planVersion, planData, force);
createWorkPlan(nodePlan);
this.planID = planId;
this.planFile = planFileName;
this.currentResult = Result.PLAN_UNDER_PROGRESS;
executePlan();
} finally {
lock.unlock();
}
}
/**
* Get FsVolume by volume UUID.
* @param fsDataset
* @param volUuid
* @return FsVolumeSpi
*/
private static FsVolumeSpi getFsVolume(final FsDatasetSpi<?> fsDataset,
final String volUuid) {
FsVolumeSpi fsVolume = null;
try (FsVolumeReferences volumeReferences =
fsDataset.getFsVolumeReferences()) {
for (int i = 0; i < volumeReferences.size(); i++) {
if (volumeReferences.get(i).getStorageID().equals(volUuid)) {
fsVolume = volumeReferences.get(i);
break;
}
}
} catch (IOException e) {
LOG.warn("Disk Balancer - Error when closing volume references: ", e);
}
return fsVolume;
}
/**
* Returns the current work status of a previously submitted Plan.
*
* @return DiskBalancerWorkStatus.
* @throws DiskBalancerException
*/
public DiskBalancerWorkStatus queryWorkStatus() throws DiskBalancerException {
lock.lock();
try {
checkDiskBalancerEnabled();
// if we had a plan in progress, check if it is finished.
if (this.currentResult == Result.PLAN_UNDER_PROGRESS &&
this.future != null &&
this.future.isDone()) {
this.currentResult = Result.PLAN_DONE;
}
DiskBalancerWorkStatus status =
new DiskBalancerWorkStatus(this.currentResult, this.planID,
this.planFile);
for (Map.Entry<VolumePair, DiskBalancerWorkItem> entry :
workMap.entrySet()) {
DiskBalancerWorkEntry workEntry = new DiskBalancerWorkEntry(
entry.getKey().getSourceVolBasePath(),
entry.getKey().getDestVolBasePath(),
entry.getValue());
status.addWorkEntry(workEntry);
}
return status;
} finally {
lock.unlock();
}
}
/**
* Cancels a running plan.
*
* @param planID - Hash of the plan to cancel.
* @throws DiskBalancerException
*/
public void cancelPlan(String planID) throws DiskBalancerException {
lock.lock();
boolean needShutdown = false;
try {
checkDiskBalancerEnabled();
if (this.planID == null ||
!this.planID.equals(planID) ||
this.planID.isEmpty()) {
LOG.error("Disk Balancer - No such plan. Cancel plan failed. PlanID: " +
planID);
throw new DiskBalancerException("No such plan.",
DiskBalancerException.Result.NO_SUCH_PLAN);
}
if (!this.future.isDone()) {
this.currentResult = Result.PLAN_CANCELLED;
this.blockMover.setExitFlag();
scheduler.shutdown();
needShutdown = true;
}
} finally {
lock.unlock();
}
// no need to hold lock while shutting down executor.
if (needShutdown) {
shutdownExecutor();
}
}
/**
* Returns a volume ID to Volume base path map.
*
* @return Json string of the volume map.
* @throws DiskBalancerException
*/
public String getVolumeNames() throws DiskBalancerException {
lock.lock();
try {
checkDiskBalancerEnabled();
return JsonUtil.toJsonString(getStorageIDToVolumeBasePathMap());
} catch (DiskBalancerException ex) {
throw ex;
} catch (IOException e) {
throw new DiskBalancerException("Internal error, Unable to " +
"create JSON string.", e,
DiskBalancerException.Result.INTERNAL_ERROR);
} finally {
lock.unlock();
}
}
/**
* Returns the current bandwidth.
*
* @return string representation of bandwidth.
* @throws DiskBalancerException
*/
public long getBandwidth() throws DiskBalancerException {
lock.lock();
try {
checkDiskBalancerEnabled();
return this.bandwidth;
} finally {
lock.unlock();
}
}
/**
* Throws if Disk balancer is disabled.
*
* @throws DiskBalancerException
*/
private void checkDiskBalancerEnabled()
throws DiskBalancerException {
if (!isDiskBalancerEnabled) {
throw new DiskBalancerException("Disk Balancer is not enabled.",
DiskBalancerException.Result.DISK_BALANCER_NOT_ENABLED);
}
}
/**
* Verifies that user provided plan is valid.
*
* @param planID - SHA-1 of the plan.
* @param planVersion - Version of the plan, for future use.
* @param plan - Plan String in Json.
* @param force - Skip verifying when the plan was generated.
* @return a NodePlan Object.
* @throws DiskBalancerException
*/
private NodePlan verifyPlan(String planID, long planVersion, String plan,
boolean force) throws DiskBalancerException {
Preconditions.checkState(lock.isHeldByCurrentThread());
verifyPlanVersion(planVersion);
NodePlan nodePlan = verifyPlanHash(planID, plan);
if (!force) {
verifyTimeStamp(nodePlan);
}
verifyNodeUUID(nodePlan);
return nodePlan;
}
/**
* Verifies the plan version is something that we support.
*
* @param planVersion - Long version.
* @throws DiskBalancerException
*/
private void verifyPlanVersion(long planVersion)
throws DiskBalancerException {
if ((planVersion < DiskBalancerConstants.DISKBALANCER_MIN_VERSION) ||
(planVersion > DiskBalancerConstants.DISKBALANCER_MAX_VERSION)) {
LOG.error("Disk Balancer - Invalid plan version.");
throw new DiskBalancerException("Invalid plan version.",
DiskBalancerException.Result.INVALID_PLAN_VERSION);
}
}
/**
* Verifies that plan matches the SHA-1 provided by the client.
*
* @param planID - SHA-1 Hex Bytes
* @param plan - Plan String
* @throws DiskBalancerException
*/
private NodePlan verifyPlanHash(String planID, String plan)
throws DiskBalancerException {
final long sha1Length = 40;
if (plan == null || plan.length() == 0) {
LOG.error("Disk Balancer - Invalid plan.");
throw new DiskBalancerException("Invalid plan.",
DiskBalancerException.Result.INVALID_PLAN);
}
if ((planID == null) ||
(planID.length() != sha1Length) ||
!DigestUtils.shaHex(plan.getBytes(Charset.forName("UTF-8")))
.equalsIgnoreCase(planID)) {
LOG.error("Disk Balancer - Invalid plan hash.");
throw new DiskBalancerException("Invalid or mis-matched hash.",
DiskBalancerException.Result.INVALID_PLAN_HASH);
}
try {
return NodePlan.parseJson(plan);
} catch (IOException ex) {
throw new DiskBalancerException("Parsing plan failed.", ex,
DiskBalancerException.Result.MALFORMED_PLAN);
}
}
/**
* Verifies that this plan is not older than 24 hours.
*
* @param plan - Node Plan
*/
private void verifyTimeStamp(NodePlan plan) throws DiskBalancerException {
long now = Time.now();
long planTime = plan.getTimeStamp();
if ((planTime + planValidityInterval) < now) {
String planValidity = config.get(
DFSConfigKeys.DFS_DISK_BALANCER_PLAN_VALID_INTERVAL,
DFSConfigKeys.DFS_DISK_BALANCER_PLAN_VALID_INTERVAL_DEFAULT);
if (planValidity.matches("[0-9]$")) {
planValidity += "ms";
}
String errorString = "Plan was generated more than " + planValidity
+ " ago";
LOG.error("Disk Balancer - " + errorString);
throw new DiskBalancerException(errorString,
DiskBalancerException.Result.OLD_PLAN_SUBMITTED);
}
}
/**
* Verify Node UUID.
*
* @param plan - Node Plan
*/
private void verifyNodeUUID(NodePlan plan) throws DiskBalancerException {
if ((plan.getNodeUUID() == null) ||
!plan.getNodeUUID().equals(this.dataNodeUUID)) {
LOG.error("Disk Balancer - Plan was generated for another node.");
throw new DiskBalancerException(
"Plan was generated for another node.",
DiskBalancerException.Result.DATANODE_ID_MISMATCH);
}
}
/**
* Convert a node plan to DiskBalancerWorkItem that Datanode can execute.
*
* @param plan - Node Plan
*/
private void createWorkPlan(NodePlan plan) throws DiskBalancerException {
Preconditions.checkState(lock.isHeldByCurrentThread());
// Cleanup any residual work in the map.
workMap.clear();
Map<String, String> storageIDToVolBasePathMap =
getStorageIDToVolumeBasePathMap();
for (Step step : plan.getVolumeSetPlans()) {
String sourceVolUuid = step.getSourceVolume().getUuid();
String destVolUuid = step.getDestinationVolume().getUuid();
String sourceVolBasePath = storageIDToVolBasePathMap.get(sourceVolUuid);
if (sourceVolBasePath == null) {
final String errMsg = "Disk Balancer - Unable to find volume: "
+ step.getSourceVolume().getPath() + ". SubmitPlan failed.";
LOG.error(errMsg);
throw new DiskBalancerException(errMsg,
DiskBalancerException.Result.INVALID_VOLUME);
}
String destVolBasePath = storageIDToVolBasePathMap.get(destVolUuid);
if (destVolBasePath == null) {
final String errMsg = "Disk Balancer - Unable to find volume: "
+ step.getDestinationVolume().getPath() + ". SubmitPlan failed.";
LOG.error(errMsg);
throw new DiskBalancerException(errMsg,
DiskBalancerException.Result.INVALID_VOLUME);
}
VolumePair volumePair = new VolumePair(sourceVolUuid,
sourceVolBasePath, destVolUuid, destVolBasePath);
createWorkPlan(volumePair, step);
}
}
/**
* Returns volume UUID to volume base path map.
*
* @return Map
* @throws DiskBalancerException
*/
private Map<String, String> getStorageIDToVolumeBasePathMap()
throws DiskBalancerException {
Map<String, String> storageIDToVolBasePathMap = new HashMap<>();
FsDatasetSpi.FsVolumeReferences references;
try {
try(AutoCloseableLock lock = this.dataset.acquireDatasetReadLock()) {
references = this.dataset.getFsVolumeReferences();
for (int ndx = 0; ndx < references.size(); ndx++) {
FsVolumeSpi vol = references.get(ndx);
storageIDToVolBasePathMap.put(vol.getStorageID(),
vol.getBaseURI().getPath());
}
references.close();
}
} catch (IOException ex) {
LOG.error("Disk Balancer - Internal Error.", ex);
throw new DiskBalancerException("Internal error", ex,
DiskBalancerException.Result.INTERNAL_ERROR);
}
return storageIDToVolBasePathMap;
}
/**
* Starts Executing the plan, exits when the plan is done executing.
*/
private void executePlan() {
Preconditions.checkState(lock.isHeldByCurrentThread());
this.blockMover.setRunnable();
if (this.scheduler.isShutdown()) {
this.scheduler = Executors.newSingleThreadExecutor();
}
this.future = scheduler.submit(new Runnable() {
@Override
public void run() {
Thread.currentThread().setName("DiskBalancerThread");
LOG.info("Executing Disk balancer plan. Plan File: {}, Plan ID: {}",
planFile, planID);
for (Map.Entry<VolumePair, DiskBalancerWorkItem> entry :
workMap.entrySet()) {
blockMover.setRunnable();
blockMover.copyBlocks(entry.getKey(), entry.getValue());
}
}
});
}
/**
* Insert work items to work map.
* @param volumePair - VolumePair
* @param step - Move Step
*/
private void createWorkPlan(final VolumePair volumePair, Step step)
throws DiskBalancerException {
if (volumePair.getSourceVolUuid().equals(volumePair.getDestVolUuid())) {
final String errMsg = "Disk Balancer - Source and destination volumes " +
"are same: " + volumePair.getSourceVolUuid();
LOG.warn(errMsg);
throw new DiskBalancerException(errMsg,
DiskBalancerException.Result.INVALID_MOVE);
}
long bytesToMove = step.getBytesToMove();
// In case we have a plan with more than
// one line of same VolumePair
// we compress that into one work order.
if (workMap.containsKey(volumePair)) {
bytesToMove += workMap.get(volumePair).getBytesToCopy();
}
DiskBalancerWorkItem work = new DiskBalancerWorkItem(bytesToMove, 0);
// all these values can be zero, if so we will use
// values from configuration.
work.setBandwidth(step.getBandwidth());
work.setTolerancePercent(step.getTolerancePercent());
work.setMaxDiskErrors(step.getMaxDiskErrors());
workMap.put(volumePair, work);
}
/**
* BlockMover supports moving blocks across Volumes.
*/
public interface BlockMover {
/**
* Copies blocks from a set of volumes.
*
* @param pair - Source and Destination Volumes.
* @param item - Number of bytes to move from volumes.
*/
void copyBlocks(VolumePair pair, DiskBalancerWorkItem item);
/**
* Begin the actual copy operations. This is useful in testing.
*/
void setRunnable();
/**
* Tells copyBlocks to exit from the copy routine.
*/
void setExitFlag();
/**
* Returns a pointer to the current dataset we are operating against.
*
* @return FsDatasetSpi
*/
FsDatasetSpi getDataset();
/**
* Returns time when this plan started executing.
*
* @return Start time in milliseconds.
*/
long getStartTime();
/**
* Number of seconds elapsed.
*
* @return time in seconds
*/
long getElapsedSeconds();
}
/**
* Holds source and dest volumes UUIDs and their BasePaths
* that disk balancer will be operating against.
*/
public static class VolumePair {
private final String sourceVolUuid;
private final String destVolUuid;
private final String sourceVolBasePath;
private final String destVolBasePath;
/**
* Constructs a volume pair.
* @param sourceVolUuid - Source Volume
* @param sourceVolBasePath - Source Volume Base Path
* @param destVolUuid - Destination Volume
* @param destVolBasePath - Destination Volume Base Path
*/
public VolumePair(final String sourceVolUuid,
final String sourceVolBasePath, final String destVolUuid,
final String destVolBasePath) {
this.sourceVolUuid = sourceVolUuid;
this.sourceVolBasePath = sourceVolBasePath;
this.destVolUuid = destVolUuid;
this.destVolBasePath = destVolBasePath;
}
/**
* Gets source volume UUID.
*
* @return UUID String
*/
public String getSourceVolUuid() {
return sourceVolUuid;
}
/**
* Gets source volume base path.
* @return String
*/
public String getSourceVolBasePath() {
return sourceVolBasePath;
}
/**
* Gets destination volume UUID.
*
* @return UUID String
*/
public String getDestVolUuid() {
return destVolUuid;
}
/**
* Gets desitnation volume base path.
*
* @return String
*/
public String getDestVolBasePath() {
return destVolBasePath;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
VolumePair that = (VolumePair) o;
return sourceVolUuid.equals(that.sourceVolUuid)
&& sourceVolBasePath.equals(that.sourceVolBasePath)
&& destVolUuid.equals(that.destVolUuid)
&& destVolBasePath.equals(that.destVolBasePath);
}
@Override
public int hashCode() {
final int primeNum = 31;
final List<String> volumeStrList = Arrays.asList(sourceVolUuid,
sourceVolBasePath, destVolUuid, destVolBasePath);
int result = 1;
for (String str : volumeStrList) {
result = (result * primeNum) + str.hashCode();
}
return result;
}
}
/**
* Actual DataMover class for DiskBalancer.
* <p>
*/
public static class DiskBalancerMover implements BlockMover {
private final FsDatasetSpi dataset;
private long diskBandwidth;
private long blockTolerance;
private long maxDiskErrors;
private int poolIndex;
private AtomicBoolean shouldRun;
private long startTime;
private long secondsElapsed;
/**
* Constructs diskBalancerMover.
*
* @param dataset Dataset
* @param conf Configuration
*/
public DiskBalancerMover(FsDatasetSpi dataset, Configuration conf) {
this.dataset = dataset;
shouldRun = new AtomicBoolean(false);
this.diskBandwidth = conf.getLong(
DFSConfigKeys.DFS_DISK_BALANCER_MAX_DISK_THROUGHPUT,
DFSConfigKeys.DFS_DISK_BALANCER_MAX_DISK_THROUGHPUT_DEFAULT);
this.blockTolerance = conf.getLong(
DFSConfigKeys.DFS_DISK_BALANCER_BLOCK_TOLERANCE,
DFSConfigKeys.DFS_DISK_BALANCER_BLOCK_TOLERANCE_DEFAULT);
this.maxDiskErrors = conf.getLong(
DFSConfigKeys.DFS_DISK_BALANCER_MAX_DISK_ERRORS,
DFSConfigKeys.DFS_DISK_BALANCER_MAX_DISK_ERRORS_DEFAULT);
// Since these are user provided values make sure it is sane
// or ignore faulty values.
if (this.diskBandwidth <= 0) {
LOG.debug("Found 0 or less as max disk throughput, ignoring config " +
"value. value : " + diskBandwidth);
diskBandwidth =
DFSConfigKeys.DFS_DISK_BALANCER_MAX_DISK_THROUGHPUT_DEFAULT;
}
if (this.blockTolerance <= 0) {
LOG.debug("Found 0 or less for block tolerance value, ignoring config" +
"value. value : " + blockTolerance);
blockTolerance =
DFSConfigKeys.DFS_DISK_BALANCER_BLOCK_TOLERANCE_DEFAULT;
}
if (this.maxDiskErrors < 0) {
LOG.debug("Found less than 0 for maxDiskErrors value, ignoring " +
"config value. value : " + maxDiskErrors);
maxDiskErrors =
DFSConfigKeys.DFS_DISK_BALANCER_MAX_DISK_ERRORS_DEFAULT;
}
}
/**
* Sets Diskmover copyblocks into runnable state.
*/
@Override
public void setRunnable() {
this.shouldRun.set(true);
}
/**
* Signals copy block to exit.
*/
@Override
public void setExitFlag() {
this.shouldRun.set(false);
}
/**
* Returns the shouldRun boolean flag.
*/
public boolean shouldRun() {
return this.shouldRun.get();
}
/**
* Checks if a given block is less than needed size to meet our goal.
*
* @param blockSize - block len
* @param item - Work item
* @return true if this block meets our criteria, false otherwise.
*/
private boolean isLessThanNeeded(long blockSize,
DiskBalancerWorkItem item) {
long bytesToCopy = item.getBytesToCopy() - item.getBytesCopied();
bytesToCopy = bytesToCopy +
((bytesToCopy * getBlockTolerancePercentage(item)) / 100);
return (blockSize <= bytesToCopy) ? true : false;
}
/**
* Returns the default block tolerance if the plan does not have value of
* tolerance specified.
*
* @param item - DiskBalancerWorkItem
* @return long
*/
private long getBlockTolerancePercentage(DiskBalancerWorkItem item) {
return item.getTolerancePercent() <= 0 ? this.blockTolerance :
item.getTolerancePercent();
}
/**
* Inflates bytesCopied and returns true or false. This allows us to stop
* copying if we have reached close enough.
*
* @param item DiskBalancerWorkItem
* @return -- false if we need to copy more, true if we are done
*/
private boolean isCloseEnough(DiskBalancerWorkItem item) {
long temp = item.getBytesCopied() +
((item.getBytesCopied() * getBlockTolerancePercentage(item)) / 100);
return (item.getBytesToCopy() >= temp) ? false : true;
}
/**
* Returns disk bandwidth associated with this plan, if none is specified
* returns the global default.
*
* @param item DiskBalancerWorkItem.
* @return MB/s - long
*/
private long getDiskBandwidth(DiskBalancerWorkItem item) {
return item.getBandwidth() <= 0 ? this.diskBandwidth : item
.getBandwidth();
}
/**
* Computes sleep delay needed based on the block that just got copied. we
* copy using a burst mode, that is we let the copy proceed in full
* throttle. Once a copy is done, we compute how many bytes have been
* transferred and try to average it over the user specified bandwidth. In
* other words, This code implements a poor man's token bucket algorithm for
* traffic shaping.
*
* @param bytesCopied - byteCopied.
* @param timeUsed in milliseconds
* @param item DiskBalancerWorkItem
* @return sleep delay in Milliseconds.
*/
@VisibleForTesting
public long computeDelay(long bytesCopied, long timeUsed,
DiskBalancerWorkItem item) {
// we had an overflow, ignore this reading and continue.
if (timeUsed == 0) {
return 0;
}
final int megaByte = 1024 * 1024;
if (bytesCopied < megaByte) {
return 0;
}
long bytesInMB = bytesCopied / megaByte;
// converting disk bandwidth in MB/millisec
float bandwidth = getDiskBandwidth(item) / 1000f;
float delay = ((long) (bytesInMB / bandwidth) - timeUsed);
return (delay <= 0) ? 0 : (long) delay;
}
/**
* Returns maximum errors to tolerate for the specific plan or the default.
*
* @param item - DiskBalancerWorkItem
* @return maximum error counts to tolerate.
*/
private long getMaxError(DiskBalancerWorkItem item) {
return item.getMaxDiskErrors() <= 0 ? this.maxDiskErrors :
item.getMaxDiskErrors();
}
/**
* Gets the next block that we can copy, returns null if we cannot find a
* block that fits our parameters or if have run out of blocks.
*
* @param iter Block Iter
* @param item - Work item
* @return Extended block or null if no copyable block is found.
*/
private ExtendedBlock getBlockToCopy(FsVolumeSpi.BlockIterator iter,
DiskBalancerWorkItem item) {
while (!iter.atEnd() && item.getErrorCount() <= getMaxError(item)) {
try {
ExtendedBlock block = iter.nextBlock();
if(null == block){
LOG.info("NextBlock call returned null. No valid block to copy. {}",
item.toJson());
return null;
}
// A valid block is a finalized block, we iterate until we get
// finalized blocks
if (!this.dataset.isValidBlock(block)) {
continue;
}
// We don't look for the best, we just do first fit
if (isLessThanNeeded(block.getNumBytes(), item)) {
return block;
}
} catch (IOException e) {
item.incErrorCount();
}
}
if (item.getErrorCount() > getMaxError(item)) {
item.setErrMsg("Error count exceeded.");
LOG.info("Maximum error count exceeded. Error count: {} Max error:{} ",
item.getErrorCount(), item.getMaxDiskErrors());
}
return null;
}
/**
* Opens all Block pools on a given volume.
*
* @param source Source
* @param poolIters List of PoolIters to maintain.
*/
private void openPoolIters(FsVolumeSpi source, List<FsVolumeSpi
.BlockIterator> poolIters) {
Preconditions.checkNotNull(source);
Preconditions.checkNotNull(poolIters);
for (String blockPoolID : source.getBlockPoolList()) {
poolIters.add(source.newBlockIterator(blockPoolID,
"DiskBalancerSource"));
}
}
/**
* Returns the next block that we copy from all the block pools. This
* function looks across all block pools to find the next block to copy.
*
* @param poolIters - List of BlockIterators
* @return ExtendedBlock.
*/
ExtendedBlock getNextBlock(List<FsVolumeSpi.BlockIterator> poolIters,
DiskBalancerWorkItem item) {
Preconditions.checkNotNull(poolIters);
int currentCount = 0;
ExtendedBlock block = null;
while (block == null && currentCount < poolIters.size()) {
currentCount++;
int index = poolIndex++ % poolIters.size();
FsVolumeSpi.BlockIterator currentPoolIter = poolIters.get(index);
block = getBlockToCopy(currentPoolIter, item);
}
if (block == null) {
try {
item.setErrMsg("No source blocks found to move.");
LOG.error("No movable source blocks found. {}", item.toJson());
} catch (IOException e) {
LOG.error("Unable to get json from Item.");
}
}
return block;
}
/**
* Close all Pool Iters.
*
* @param poolIters List of BlockIters
*/
private void closePoolIters(List<FsVolumeSpi.BlockIterator> poolIters) {
Preconditions.checkNotNull(poolIters);
for (FsVolumeSpi.BlockIterator iter : poolIters) {
try {
iter.close();
} catch (IOException ex) {
LOG.error("Error closing a block pool iter. ex: {}", ex);
}
}
}
/**
* Copies blocks from a set of volumes.
*
* @param pair - Source and Destination Volumes.
* @param item - Number of bytes to move from volumes.
*/
@Override
public void copyBlocks(VolumePair pair, DiskBalancerWorkItem item) {
String sourceVolUuid = pair.getSourceVolUuid();
String destVolUuuid = pair.getDestVolUuid();
// When any of the DiskBalancerWorkItem volumes are not
// available, return after setting error in item.
FsVolumeSpi source = getFsVolume(this.dataset, sourceVolUuid);
if (source == null) {
final String errMsg = "Disk Balancer - Unable to find source volume: "
+ pair.getDestVolBasePath();
LOG.error(errMsg);
item.setErrMsg(errMsg);
return;
}
FsVolumeSpi dest = getFsVolume(this.dataset, destVolUuuid);
if (dest == null) {
final String errMsg = "Disk Balancer - Unable to find dest volume: "
+ pair.getDestVolBasePath();
LOG.error(errMsg);
item.setErrMsg(errMsg);
return;
}
if (source.isTransientStorage() || dest.isTransientStorage()) {
final String errMsg = "Disk Balancer - Unable to support " +
"transient storage type.";
LOG.error(errMsg);
item.setErrMsg(errMsg);
return;
}
List<FsVolumeSpi.BlockIterator> poolIters = new LinkedList<>();
startTime = Time.now();
item.setStartTime(startTime);
secondsElapsed = 0;
try {
openPoolIters(source, poolIters);
if (poolIters.size() == 0) {
LOG.error("No block pools found on volume. volume : {}. Exiting.",
source.getBaseURI());
return;
}
while (shouldRun()) {
try {
// Check for the max error count constraint.
if (item.getErrorCount() > getMaxError(item)) {
LOG.error("Exceeded the max error count. source {}, dest: {} " +
"error count: {}", source.getBaseURI(),
dest.getBaseURI(), item.getErrorCount());
break;
}
// Check for the block tolerance constraint.
if (isCloseEnough(item)) {
LOG.info("Copy from {} to {} done. copied {} bytes and {} " +
"blocks.",
source.getBaseURI(), dest.getBaseURI(),
item.getBytesCopied(), item.getBlocksCopied());
this.setExitFlag();
continue;
}
ExtendedBlock block = getNextBlock(poolIters, item);
// we are not able to find any blocks to copy.
if (block == null) {
LOG.error("No source blocks, exiting the copy. Source: {}, " +
"Dest:{}", source.getBaseURI(), dest.getBaseURI());
this.setExitFlag();
continue;
}
// check if someone told us exit, treat this as an interruption
// point
// for the thread, since both getNextBlock and moveBlocAcrossVolume
// can take some time.
if (!shouldRun()) {
continue;
}
long timeUsed;
// There is a race condition here, but we will get an IOException
// if dest has no space, which we handle anyway.
if (dest.getAvailable() > item.getBytesToCopy()) {
long begin = System.nanoTime();
this.dataset.moveBlockAcrossVolumes(block, dest);
long now = System.nanoTime();
timeUsed = (now - begin) > 0 ? now - begin : 0;
} else {
// Technically it is possible for us to find a smaller block and
// make another copy, but opting for the safer choice of just
// exiting here.
LOG.error("Destination volume: {} does not have enough space to" +
" accommodate a block. Block Size: {} Exiting from" +
" copyBlocks.", dest.getBaseURI(), block.getNumBytes());
break;
}
LOG.debug("Moved block with size {} from {} to {}",
block.getNumBytes(), source.getBaseURI(),
dest.getBaseURI());
// Check for the max throughput constraint.
// We sleep here to keep the promise that we will not
// copy more than Max MB/sec. we sleep enough time
// to make sure that our promise is good on average.
// Because we sleep, if a shutdown or cancel call comes in
// we exit via Thread Interrupted exception.
Thread.sleep(computeDelay(block.getNumBytes(), TimeUnit.NANOSECONDS
.toMillis(timeUsed), item));
// We delay updating the info to avoid confusing the user.
// This way we report the copy only if it is under the
// throughput threshold.
item.incCopiedSoFar(block.getNumBytes());
item.incBlocksCopied();
secondsElapsed = TimeUnit.MILLISECONDS.toSeconds(Time.now() -
startTime);
item.setSecondsElapsed(secondsElapsed);
} catch (IOException ex) {
LOG.error("Exception while trying to copy blocks. error: {}", ex);
item.incErrorCount();
} catch (InterruptedException e) {
LOG.error("Copy Block Thread interrupted, exiting the copy.");
Thread.currentThread().interrupt();
item.incErrorCount();
this.setExitFlag();
} catch (RuntimeException ex) {
// Exiting if any run time exceptions.
LOG.error("Got an unexpected Runtime Exception {}", ex);
item.incErrorCount();
this.setExitFlag();
}
}
} finally {
// Close all Iters.
closePoolIters(poolIters);
}
}
/**
* Returns a pointer to the current dataset we are operating against.
*
* @return FsDatasetSpi
*/
@Override
public FsDatasetSpi getDataset() {
return dataset;
}
/**
* Returns time when this plan started executing.
*
* @return Start time in milliseconds.
*/
@Override
public long getStartTime() {
return startTime;
}
/**
* Number of seconds elapsed.
*
* @return time in seconds
*/
@Override
public long getElapsedSeconds() {
return secondsElapsed;
}
}
}