| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl; |
| |
| import java.io.BufferedInputStream; |
| import java.io.DataInputStream; |
| import java.io.File; |
| import java.io.FileInputStream; |
| import java.io.FileNotFoundException; |
| import java.io.FileOutputStream; |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.io.RandomAccessFile; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.Comparator; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Queue; |
| import java.util.Scanner; |
| import java.util.concurrent.ConcurrentLinkedQueue; |
| import java.util.concurrent.ExecutionException; |
| import java.util.concurrent.ForkJoinPool; |
| import java.util.concurrent.ForkJoinTask; |
| import java.util.concurrent.RecursiveAction; |
| import java.util.concurrent.atomic.AtomicLong; |
| |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.CommonConfigurationKeys; |
| import org.apache.hadoop.hdfs.DFSConfigKeys; |
| import org.apache.hadoop.hdfs.DFSUtilClient; |
| import org.apache.hadoop.hdfs.protocol.Block; |
| import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; |
| import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportReplica; |
| import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; |
| import org.apache.hadoop.hdfs.server.datanode.FileIoProvider; |
| import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader; |
| import org.apache.hadoop.hdfs.server.datanode.DataStorage; |
| import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil; |
| import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo; |
| import org.apache.hadoop.hdfs.server.datanode.ReplicaBuilder; |
| import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.RamDiskReplicaTracker.RamDiskReplica; |
| import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams; |
| import org.apache.hadoop.io.IOUtils; |
| import org.apache.hadoop.io.MultipleIOException; |
| import org.apache.hadoop.util.AutoCloseableLock; |
| import org.apache.hadoop.util.DataChecksum; |
| import org.apache.hadoop.util.DiskChecker; |
| import org.apache.hadoop.util.DiskChecker.DiskErrorException; |
| import org.apache.hadoop.util.ShutdownHookManager; |
| import org.apache.hadoop.util.Timer; |
| import org.apache.solr.cloud.hdfs.HdfsTestUtil; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| |
| /** |
| * A block pool slice represents a portion of a block pool stored on a volume. |
| * Taken together, all BlockPoolSlices sharing a block pool ID across a |
| * cluster represent a single block pool. |
| * |
| * This class is synchronized by {@link FsVolumeImpl}. |
| */ |
| public class BlockPoolSlice { |
| public static final Object SOLR_HACK_FOR_CLASS_VERIFICATION = new Object(); |
| |
| // Apparently the Hadoop code expectes upper-case LOG, so... |
| static final Logger LOG = LoggerFactory.getLogger(BlockPoolSlice.class); //nowarn |
| |
| private final String bpid; |
| private final FsVolumeImpl volume; // volume to which this BlockPool belongs to |
| private final File currentDir; // StorageDirectory/current/bpid/current |
| // directory where finalized replicas are stored |
| private final File finalizedDir; |
| private final File lazypersistDir; |
| private final File rbwDir; // directory store RBW replica |
| private final File tmpDir; // directory store Temporary replica |
| private final int ioFileBufferSize; |
| @VisibleForTesting |
| public static final String DU_CACHE_FILE = "dfsUsed"; |
| private final Runnable shutdownHook; |
| private volatile boolean dfsUsedSaved = false; |
| private static final int SHUTDOWN_HOOK_PRIORITY = 30; |
| private final boolean deleteDuplicateReplicas; |
| private static final String REPLICA_CACHE_FILE = "replicas"; |
| private final long replicaCacheExpiry = 5*60*1000; |
| private AtomicLong numOfBlocks = new AtomicLong(); |
| private final long cachedDfsUsedCheckTime; |
| private final Timer timer; |
| private final int maxDataLength; |
| private final FileIoProvider fileIoProvider; |
| |
| private static ForkJoinPool addReplicaThreadPool = null; |
| private static final int VOLUMES_REPLICA_ADD_THREADPOOL_SIZE = Runtime |
| .getRuntime().availableProcessors(); |
| private static final Comparator<File> FILE_COMPARATOR = |
| new Comparator<File>() { |
| @Override |
| public int compare(File f1, File f2) { |
| return f1.getName().compareTo(f2.getName()); |
| } |
| }; |
| |
/**
 * Creates a block pool slice rooted at {@code bpDir}.
 *
 * <p>The constructor performs these steps in order:
 * <ul>
 *   <li>makes sure the finalized directory exists;</li>
 *   <li>reads its settings from {@code conf};</li>
 *   <li>deletes any tmp directory left over from a previous run;</li>
 *   <li>creates the rbw and tmp directories;</li>
 *   <li>creates the shared add-replica pool if it does not exist yet;</li>
 *   <li>registers a shutdown hook that shuts that pool down.</li>
 * </ul>
 *
 * @param bpid Block pool Id
 * @param volume {@link FsVolumeImpl} to which this BlockPool belongs to
 * @param bpDir directory corresponding to the BlockPool
 * @param conf configuration
 * @param timer source of the current time
 * @throws IOException if the finalized directory cannot be created
 */
BlockPoolSlice(String bpid, FsVolumeImpl volume, File bpDir,
    Configuration conf, Timer timer) throws IOException {
  this.bpid = bpid;
  this.volume = volume;
  this.fileIoProvider = volume.getFileIoProvider();

  // Directory layout under bpDir.
  this.currentDir = new File(bpDir, DataStorage.STORAGE_DIR_CURRENT);
  this.finalizedDir = new File(currentDir, DataStorage.STORAGE_DIR_FINALIZED);
  this.lazypersistDir =
      new File(currentDir, DataStorage.STORAGE_DIR_LAZY_PERSIST);
  this.rbwDir = new File(currentDir, DataStorage.STORAGE_DIR_RBW);
  this.tmpDir = new File(bpDir, DataStorage.STORAGE_DIR_TMP);

  if (!finalizedDir.exists() && !finalizedDir.mkdirs()) {
    throw new IOException("Failed to mkdirs " + this.finalizedDir);
  }

  // Settings taken from the configuration.
  this.ioFileBufferSize = DFSUtilClient.getIoFileBufferSize(conf);
  this.deleteDuplicateReplicas = conf.getBoolean(
      DFSConfigKeys.DFS_DATANODE_DUPLICATE_REPLICA_DELETION,
      DFSConfigKeys.DFS_DATANODE_DUPLICATE_REPLICA_DELETION_DEFAULT);
  this.cachedDfsUsedCheckTime = conf.getLong(
      DFSConfigKeys.DFS_DN_CACHED_DFSUSED_CHECK_INTERVAL_MS,
      DFSConfigKeys.DFS_DN_CACHED_DFSUSED_CHECK_INTERVAL_DEFAULT_MS);
  this.maxDataLength = conf.getInt(
      CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH,
      CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH_DEFAULT);
  this.timer = timer;

  // Whatever is left in tmp from the previous run is discarded, not recovered.
  if (tmpDir.exists()) {
    fileIoProvider.fullyDelete(volume, tmpDir);
  }
  fileIoProvider.mkdirs(volume, rbwDir);
  fileIoProvider.mkdirs(volume, tmpDir);

  if (addReplicaThreadPool == null) {
    initializeAddReplicaPool(conf);
  }

  // On JVM shutdown, stop the shared add-replica pool.
  shutdownHook = new Runnable() {
    @Override
    public void run() {
      addReplicaThreadPool.shutdownNow();
    }
  };
  ShutdownHookManager.get().addShutdownHook(shutdownHook,
      SHUTDOWN_HOOK_PRIORITY);
}
| |
| private synchronized void initializeAddReplicaPool(Configuration conf) { |
| if (addReplicaThreadPool == null) { |
| FsDatasetImpl dataset = (FsDatasetImpl) volume.getDataset(); |
| int numberOfBlockPoolSlice = dataset.getVolumeCount() |
| * dataset.getBPServiceCount(); |
| int poolsize = Math.max(numberOfBlockPoolSlice, |
| VOLUMES_REPLICA_ADD_THREADPOOL_SIZE); |
| // Default pool sizes is max of (volume * number of bp_service) and |
| // number of processor. |
| int parallelism = conf.getInt( |
| DFSConfigKeys.DFS_DATANODE_VOLUMES_REPLICA_ADD_THREADPOOL_SIZE_KEY, |
| poolsize); |
| |
| // Needed for SOLR-9515 and HDFS-14251 |
| ForkJoinPool.ForkJoinWorkerThreadFactory threadFactory = new HdfsTestUtil.HDFSForkJoinThreadFactory(); |
| addReplicaThreadPool = new ForkJoinPool(parallelism, threadFactory, null, false); |
| } |
| } |
| |
| File getDirectory() { |
| return currentDir.getParentFile(); |
| } |
| |
| File getFinalizedDir() { |
| return finalizedDir; |
| } |
| |
| File getLazypersistDir() { |
| return lazypersistDir; |
| } |
| |
| File getRbwDir() { |
| return rbwDir; |
| } |
| |
| File getTmpDir() { |
| return tmpDir; |
| } |
| |
| /** Run DU on local drives. It must be synchronized from caller. */ |
| void decDfsUsed(long value) { |
| } |
| |
| long getDfsUsed() throws IOException { |
| return 0L; |
| } |
| |
| void incDfsUsed(long value) { |
| } |
| |
| /** |
| * Temporary files. They get moved to the finalized block directory when |
| * the block is finalized. |
| */ |
| File createTmpFile(Block b) throws IOException { |
| File f = new File(tmpDir, b.getBlockName()); |
| File tmpFile = DatanodeUtil.createFileWithExistsCheck( |
| volume, b, f, fileIoProvider); |
| // If any exception during creation, its expected that counter will not be |
| // incremented, So no need to decrement |
| incrNumBlocks(); |
| return tmpFile; |
| } |
| |
| /** |
| * RBW files. They get moved to the finalized block directory when |
| * the block is finalized. |
| */ |
| File createRbwFile(Block b) throws IOException { |
| File f = new File(rbwDir, b.getBlockName()); |
| File rbwFile = DatanodeUtil.createFileWithExistsCheck( |
| volume, b, f, fileIoProvider); |
| // If any exception during creation, its expected that counter will not be |
| // incremented, So no need to decrement |
| incrNumBlocks(); |
| return rbwFile; |
| } |
| |
| File addFinalizedBlock(Block b, ReplicaInfo replicaInfo) throws IOException { |
| File blockDir = DatanodeUtil.idToBlockDir(finalizedDir, b.getBlockId()); |
| fileIoProvider.mkdirsWithExistsCheck(volume, blockDir); |
| return FsDatasetImpl.moveBlockFiles(b, replicaInfo, blockDir); |
| } |
| |
| /** |
| * Move a persisted replica from lazypersist directory to a subdirectory |
| * under finalized. |
| */ |
| ReplicaInfo activateSavedReplica(ReplicaInfo replicaInfo, |
| RamDiskReplica replicaState) throws IOException { |
| File metaFile = replicaState.getSavedMetaFile(); |
| File blockFile = replicaState.getSavedBlockFile(); |
| final long blockId = replicaInfo.getBlockId(); |
| final File blockDir = DatanodeUtil.idToBlockDir(finalizedDir, blockId); |
| final File targetBlockFile = new File(blockDir, blockFile.getName()); |
| final File targetMetaFile = new File(blockDir, metaFile.getName()); |
| fileIoProvider.moveFile(volume, blockFile, targetBlockFile); |
| FsDatasetImpl.LOG.info("Moved " + blockFile + " to " + targetBlockFile); |
| fileIoProvider.moveFile(volume, metaFile, targetMetaFile); |
| FsDatasetImpl.LOG.info("Moved " + metaFile + " to " + targetMetaFile); |
| |
| ReplicaInfo newReplicaInfo = |
| new ReplicaBuilder(ReplicaState.FINALIZED) |
| .setBlockId(blockId) |
| .setLength(replicaInfo.getBytesOnDisk()) |
| .setGenerationStamp(replicaInfo.getGenerationStamp()) |
| .setFsVolume(replicaState.getLazyPersistVolume()) |
| .setDirectoryToUse(targetBlockFile.getParentFile()) |
| .build(); |
| return newReplicaInfo; |
| } |
| |
| void checkDirs() throws DiskErrorException { |
| DiskChecker.checkDir(finalizedDir); |
| DiskChecker.checkDir(tmpDir); |
| DiskChecker.checkDir(rbwDir); |
| } |
| |
| void getVolumeMap(ReplicaMap volumeMap, |
| final RamDiskReplicaTracker lazyWriteReplicaMap) |
| throws IOException { |
| // Recover lazy persist replicas, they will be added to the volumeMap |
| // when we scan the finalized directory. |
| if (lazypersistDir.exists()) { |
| int numRecovered = moveLazyPersistReplicasToFinalized(lazypersistDir); |
| FsDatasetImpl.LOG.info( |
| "Recovered " + numRecovered + " replicas from " + lazypersistDir); |
| } |
| |
| boolean success = readReplicasFromCache(volumeMap, lazyWriteReplicaMap); |
| if (!success) { |
| List<IOException> exceptions = Collections |
| .synchronizedList(new ArrayList<IOException>()); |
| Queue<RecursiveAction> subTaskQueue = |
| new ConcurrentLinkedQueue<RecursiveAction>(); |
| |
| // add finalized replicas |
| AddReplicaProcessor task = new AddReplicaProcessor(volumeMap, |
| finalizedDir, lazyWriteReplicaMap, true, exceptions, subTaskQueue); |
| ForkJoinTask<Void> finalizedTask = addReplicaThreadPool.submit(task); |
| |
| // add rbw replicas |
| task = new AddReplicaProcessor(volumeMap, rbwDir, lazyWriteReplicaMap, |
| false, exceptions, subTaskQueue); |
| ForkJoinTask<Void> rbwTask = addReplicaThreadPool.submit(task); |
| |
| try { |
| finalizedTask.get(); |
| rbwTask.get(); |
| } catch (InterruptedException | ExecutionException e) { |
| exceptions.add(new IOException( |
| "Failed to start sub tasks to add replica in replica map :" |
| + e.getMessage())); |
| } |
| |
| //wait for all the tasks to finish. |
| waitForSubTaskToFinish(subTaskQueue, exceptions); |
| } |
| } |
| |
| /** |
| * Wait till all the recursive task for add replica to volume completed. |
| * |
| * @param subTaskQueue |
| * {@link AddReplicaProcessor} tasks list. |
| * @param exceptions |
| * exceptions occurred in sub tasks. |
| * @throws IOException |
| * throw if any sub task or multiple sub tasks failed. |
| */ |
| private void waitForSubTaskToFinish(Queue<RecursiveAction> subTaskQueue, |
| List<IOException> exceptions) throws IOException { |
| while (!subTaskQueue.isEmpty()) { |
| RecursiveAction task = subTaskQueue.poll(); |
| if (task != null) { |
| task.join(); |
| } |
| } |
| if (!exceptions.isEmpty()) { |
| throw MultipleIOException.createIOException(exceptions); |
| } |
| } |
| |
| /** |
| * Recover an unlinked tmp file on datanode restart. If the original block |
| * does not exist, then the tmp file is renamed to be the |
| * original file name and the original name is returned; otherwise the tmp |
| * file is deleted and null is returned. |
| */ |
| File recoverTempUnlinkedBlock(File unlinkedTmp) throws IOException { |
| File blockFile = FsDatasetUtil.getOrigFile(unlinkedTmp); |
| if (blockFile.exists()) { |
| // If the original block file still exists, then no recovery is needed. |
| if (!fileIoProvider.delete(volume, unlinkedTmp)) { |
| throw new IOException("Unable to cleanup unlinked tmp file " + |
| unlinkedTmp); |
| } |
| return null; |
| } else { |
| fileIoProvider.rename(volume, unlinkedTmp, blockFile); |
| return blockFile; |
| } |
| } |
| |
| |
| /** |
| * Move replicas in the lazy persist directory to their corresponding locations |
| * in the finalized directory. |
| * @return number of replicas recovered. |
| */ |
| private int moveLazyPersistReplicasToFinalized(File source) |
| throws IOException { |
| File[] files = fileIoProvider.listFiles(volume, source); |
| int numRecovered = 0; |
| for (File file : files) { |
| if (file.isDirectory()) { |
| numRecovered += moveLazyPersistReplicasToFinalized(file); |
| } |
| |
| if (Block.isMetaFilename(file.getName())) { |
| File metaFile = file; |
| File blockFile = Block.metaToBlockFile(metaFile); |
| long blockId = Block.filename2id(blockFile.getName()); |
| File targetDir = DatanodeUtil.idToBlockDir(finalizedDir, blockId); |
| |
| if (blockFile.exists()) { |
| |
| try { |
| fileIoProvider.mkdirsWithExistsCheck(volume, targetDir); |
| } catch(IOException ioe) { |
| LOG.warn("Failed to mkdirs {}", targetDir); |
| continue; |
| } |
| |
| final File targetMetaFile = new File(targetDir, metaFile.getName()); |
| try { |
| fileIoProvider.rename(volume, metaFile, targetMetaFile); |
| } catch (IOException e) { |
| LOG.warn("Failed to move meta file from {} to {}", metaFile, targetMetaFile, e); |
| continue; |
| } |
| |
| final File targetBlockFile = new File(targetDir, blockFile.getName()); |
| try { |
| fileIoProvider.rename(volume, blockFile, targetBlockFile); |
| } catch (IOException e) { |
| LOG.warn("Failed to move block file from {} to {}", blockFile, targetBlockFile, e); |
| continue; |
| } |
| |
| if (targetBlockFile.exists() && targetMetaFile.exists()) { |
| ++numRecovered; |
| } else { |
| // Failure should be rare. |
| LOG.warn("Failed to move {} to {}", blockFile, targetDir); |
| } |
| } |
| } |
| } |
| |
| fileIoProvider.fullyDelete(volume, source); |
| return numRecovered; |
| } |
| |
| private void addReplicaToReplicasMap(Block block, ReplicaMap volumeMap, |
| final RamDiskReplicaTracker lazyWriteReplicaMap,boolean isFinalized) |
| throws IOException { |
| ReplicaInfo newReplica = null; |
| long blockId = block.getBlockId(); |
| long genStamp = block.getGenerationStamp(); |
| if (isFinalized) { |
| newReplica = new ReplicaBuilder(ReplicaState.FINALIZED) |
| .setBlockId(blockId) |
| .setLength(block.getNumBytes()) |
| .setGenerationStamp(genStamp) |
| .setFsVolume(volume) |
| .setDirectoryToUse(DatanodeUtil.idToBlockDir(finalizedDir, blockId)) |
| .build(); |
| } else { |
| File file = new File(rbwDir, block.getBlockName()); |
| boolean loadRwr = true; |
| File restartMeta = new File(file.getParent() + |
| File.pathSeparator + "." + file.getName() + ".restart"); |
| Scanner sc = null; |
| try { |
| sc = new Scanner(restartMeta, "UTF-8"); |
| // The restart meta file exists |
| if (sc.hasNextLong() && (sc.nextLong() > timer.now())) { |
| // It didn't expire. Load the replica as a RBW. |
| // We don't know the expected block length, so just use 0 |
| // and don't reserve any more space for writes. |
| newReplica = new ReplicaBuilder(ReplicaState.RBW) |
| .setBlockId(blockId) |
| .setLength(validateIntegrityAndSetLength(file, genStamp)) |
| .setGenerationStamp(genStamp) |
| .setFsVolume(volume) |
| .setDirectoryToUse(file.getParentFile()) |
| .setWriterThread(null) |
| .setBytesToReserve(0) |
| .build(); |
| loadRwr = false; |
| } |
| sc.close(); |
| if (!fileIoProvider.delete(volume, restartMeta)) { |
| FsDatasetImpl.LOG.warn("Failed to delete restart meta file: " + |
| restartMeta.getPath()); |
| } |
| } catch (FileNotFoundException fnfe) { |
| // nothing to do hereFile dir = |
| } finally { |
| if (sc != null) { |
| sc.close(); |
| } |
| } |
| // Restart meta doesn't exist or expired. |
| if (loadRwr) { |
| ReplicaBuilder builder = new ReplicaBuilder(ReplicaState.RWR) |
| .setBlockId(blockId) |
| .setLength(validateIntegrityAndSetLength(file, genStamp)) |
| .setGenerationStamp(genStamp) |
| .setFsVolume(volume) |
| .setDirectoryToUse(file.getParentFile()); |
| newReplica = builder.build(); |
| } |
| } |
| |
| ReplicaInfo tmpReplicaInfo = volumeMap.addAndGet(bpid, newReplica); |
| ReplicaInfo oldReplica = (tmpReplicaInfo == newReplica) ? null |
| : tmpReplicaInfo; |
| if (oldReplica != null) { |
| // We have multiple replicas of the same block so decide which one |
| // to keep. |
| newReplica = resolveDuplicateReplicas(newReplica, oldReplica, volumeMap); |
| } |
| |
| // If we are retaining a replica on transient storage make sure |
| // it is in the lazyWriteReplicaMap so it can be persisted |
| // eventually. |
| if (newReplica.getVolume().isTransientStorage()) { |
| lazyWriteReplicaMap.addReplica(bpid, blockId, |
| (FsVolumeImpl) newReplica.getVolume(), 0); |
| } else { |
| lazyWriteReplicaMap.discardReplica(bpid, blockId, false); |
| } |
| if (oldReplica == null) { |
| incrNumBlocks(); |
| } |
| } |
| |
| |
| /** |
| * Add replicas under the given directory to the volume map |
| * @param volumeMap the replicas map |
| * @param dir an input directory |
| * @param lazyWriteReplicaMap Map of replicas on transient |
| * storage. |
| * @param isFinalized true if the directory has finalized replicas; |
| * false if the directory has rbw replicas |
| * @param exceptions list of exception which need to return to parent thread. |
| * @param subTaskQueue queue of sub tasks |
| */ |
| void addToReplicasMap(ReplicaMap volumeMap, File dir, |
| final RamDiskReplicaTracker lazyWriteReplicaMap, boolean isFinalized, |
| List<IOException> exceptions, Queue<RecursiveAction> subTaskQueue) |
| throws IOException { |
| File[] files = fileIoProvider.listFiles(volume, dir); |
| Arrays.sort(files, FILE_COMPARATOR); |
| for (int i = 0; i < files.length; i++) { |
| File file = files[i]; |
| if (file.isDirectory()) { |
| // Launch new sub task. |
| AddReplicaProcessor subTask = new AddReplicaProcessor(volumeMap, file, |
| lazyWriteReplicaMap, isFinalized, exceptions, subTaskQueue); |
| subTask.fork(); |
| subTaskQueue.add(subTask); |
| } |
| |
| if (isFinalized && FsDatasetUtil.isUnlinkTmpFile(file)) { |
| file = recoverTempUnlinkedBlock(file); |
| if (file == null) { // the original block still exists, so we cover it |
| // in another iteration and can continue here |
| continue; |
| } |
| } |
| if (!Block.isBlockFilename(file)) { |
| continue; |
| } |
| |
| long genStamp = FsDatasetUtil.getGenerationStampFromFile( |
| files, file, i); |
| long blockId = Block.filename2id(file.getName()); |
| Block block = new Block(blockId, file.length(), genStamp); |
| addReplicaToReplicasMap(block, volumeMap, lazyWriteReplicaMap, |
| isFinalized); |
| } |
| } |
| |
| /** |
| * This method is invoked during DN startup when volumes are scanned to |
| * build up the volumeMap. |
| * |
| * Given two replicas, decide which one to keep. The preference is as |
| * follows: |
| * 1. Prefer the replica with the higher generation stamp. |
| * 2. If generation stamps are equal, prefer the replica with the |
| * larger on-disk length. |
| * 3. If on-disk length is the same, prefer the replica on persistent |
| * storage volume. |
| * 4. All other factors being equal, keep replica1. |
| * |
| * The other replica is removed from the volumeMap and is deleted from |
| * its storage volume. |
| * |
| * @param replica1 first replica |
| * @param replica2 second replica |
| * @param volumeMap volume map to update |
| * @return the replica that is retained. |
| */ |
| ReplicaInfo resolveDuplicateReplicas( |
| final ReplicaInfo replica1, final ReplicaInfo replica2, |
| final ReplicaMap volumeMap) { |
| |
| if (!deleteDuplicateReplicas) { |
| // Leave both block replicas in place. |
| return replica1; |
| } |
| final ReplicaInfo replicaToDelete = |
| selectReplicaToDelete(replica1, replica2); |
| final ReplicaInfo replicaToKeep = |
| (replicaToDelete != replica1) ? replica1 : replica2; |
| // Update volumeMap and delete the replica |
| volumeMap.add(bpid, replicaToKeep); |
| if (replicaToDelete != null) { |
| deleteReplica(replicaToDelete); |
| } |
| return replicaToKeep; |
| } |
| |
| @VisibleForTesting |
| static ReplicaInfo selectReplicaToDelete(final ReplicaInfo replica1, |
| final ReplicaInfo replica2) { |
| ReplicaInfo replicaToKeep; |
| ReplicaInfo replicaToDelete; |
| |
| // it's the same block so don't ever delete it, even if GS or size |
| // differs. caller should keep the one it just discovered on disk |
| if (replica1.getBlockURI().equals(replica2.getBlockURI())) { |
| return null; |
| } |
| if (replica1.getGenerationStamp() != replica2.getGenerationStamp()) { |
| replicaToKeep = replica1.getGenerationStamp() > replica2.getGenerationStamp() |
| ? replica1 : replica2; |
| } else if (replica1.getNumBytes() != replica2.getNumBytes()) { |
| replicaToKeep = replica1.getNumBytes() > replica2.getNumBytes() ? |
| replica1 : replica2; |
| } else if (replica1.getVolume().isTransientStorage() && |
| !replica2.getVolume().isTransientStorage()) { |
| replicaToKeep = replica2; |
| } else { |
| replicaToKeep = replica1; |
| } |
| |
| replicaToDelete = (replicaToKeep == replica1) ? replica2 : replica1; |
| |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("resolveDuplicateReplicas decide to keep {}. Will try to delete {}", replicaToKeep, replicaToDelete); |
| } |
| return replicaToDelete; |
| } |
| |
| private void deleteReplica(final ReplicaInfo replicaToDelete) { |
| // Delete the files on disk. Failure here is okay. |
| if (!replicaToDelete.deleteBlockData()) { |
| LOG.warn("Failed to delete block file for replica {}", replicaToDelete); |
| } |
| if (!replicaToDelete.deleteMetadata()) { |
| LOG.warn("Failed to delete meta file for replica {}", replicaToDelete); |
| } |
| } |
| |
| /** |
| * Find out the number of bytes in the block that match its crc. |
| * |
| * This algorithm assumes that data corruption caused by unexpected |
| * datanode shutdown occurs only in the last crc chunk. So it checks |
| * only the last chunk. |
| * |
| * @param blockFile the block file |
| * @param genStamp generation stamp of the block |
| * @return the number of valid bytes |
| */ |
| private long validateIntegrityAndSetLength(File blockFile, long genStamp) { |
| try { |
| final File metaFile = FsDatasetUtil.getMetaFile(blockFile, genStamp); |
| long blockFileLen = blockFile.length(); |
| long metaFileLen = metaFile.length(); |
| int crcHeaderLen = DataChecksum.getChecksumHeaderSize(); |
| if (!blockFile.exists() || blockFileLen == 0 || |
| !metaFile.exists() || metaFileLen < crcHeaderLen) { |
| return 0; |
| } |
| try (DataInputStream checksumIn = new DataInputStream( |
| new BufferedInputStream( |
| fileIoProvider.getFileInputStream(volume, metaFile), |
| ioFileBufferSize))) { |
| // read and handle the common header here. For now just a version |
| final DataChecksum checksum = BlockMetadataHeader.readDataChecksum( |
| checksumIn, metaFile); |
| int bytesPerChecksum = checksum.getBytesPerChecksum(); |
| int checksumSize = checksum.getChecksumSize(); |
| long numChunks = Math.min( |
| (blockFileLen + bytesPerChecksum - 1) / bytesPerChecksum, |
| (metaFileLen - crcHeaderLen) / checksumSize); |
| if (numChunks == 0) { |
| return 0; |
| } |
| try (InputStream blockIn = fileIoProvider.getFileInputStream( |
| volume, blockFile); |
| ReplicaInputStreams ris = new ReplicaInputStreams(blockIn, |
| checksumIn, volume.obtainReference(), fileIoProvider)) { |
| ris.skipChecksumFully((numChunks - 1) * checksumSize); |
| long lastChunkStartPos = (numChunks - 1) * bytesPerChecksum; |
| ris.skipDataFully(lastChunkStartPos); |
| int lastChunkSize = (int) Math.min( |
| bytesPerChecksum, blockFileLen - lastChunkStartPos); |
| byte[] buf = new byte[lastChunkSize + checksumSize]; |
| ris.readChecksumFully(buf, lastChunkSize, checksumSize); |
| ris.readDataFully(buf, 0, lastChunkSize); |
| checksum.update(buf, 0, lastChunkSize); |
| long validFileLength; |
| if (checksum.compare(buf, lastChunkSize)) { // last chunk matches crc |
| validFileLength = lastChunkStartPos + lastChunkSize; |
| } else { // last chunk is corrupt |
| validFileLength = lastChunkStartPos; |
| } |
| // truncate if extra bytes are present without CRC |
| if (blockFile.length() > validFileLength) { |
| try (RandomAccessFile blockRAF = |
| fileIoProvider.getRandomAccessFile( |
| volume, blockFile, "rw")) { |
| // truncate blockFile |
| blockRAF.setLength(validFileLength); |
| } |
| } |
| return validFileLength; |
| } |
| } |
| } catch (IOException e) { |
| FsDatasetImpl.LOG.warn("Getting exception while validating integrity " + |
| "and setting length for blockFile", e); |
| return 0; |
| } |
| } |
| |
| @Override |
| public String toString() { |
| return currentDir.getAbsolutePath(); |
| } |
| |
| void shutdown(BlockListAsLongs blocksListToPersist) { |
| saveReplicas(blocksListToPersist); |
| |
| // Remove the shutdown hook to avoid any memory leak |
| if (shutdownHook != null) { |
| ShutdownHookManager.get().removeShutdownHook(shutdownHook); |
| } |
| } |
| |
| private boolean readReplicasFromCache(ReplicaMap volumeMap, |
| final RamDiskReplicaTracker lazyWriteReplicaMap) { |
| ReplicaMap tmpReplicaMap = new ReplicaMap(new AutoCloseableLock()); |
| File replicaFile = new File(currentDir, REPLICA_CACHE_FILE); |
| // Check whether the file exists or not. |
| if (!replicaFile.exists()) { |
| if (LOG.isInfoEnabled()) { |
| LOG.info("Replica Cache file: {} doesn't exist", replicaFile.getPath()); |
| } |
| return false; |
| } |
| long fileLastModifiedTime = replicaFile.lastModified(); |
| if (System.currentTimeMillis() > fileLastModifiedTime + replicaCacheExpiry) { |
| if (LOG.isInfoEnabled()) { |
| LOG.info("Replica Cache file: {} has gone stale", replicaFile.getPath()); |
| } |
| // Just to make findbugs happy |
| if (!replicaFile.delete()) { |
| if (LOG.isInfoEnabled()) { |
| LOG.info("Replica Cache file: {} cannot be deleted", replicaFile.getPath()); |
| } |
| } |
| return false; |
| } |
| FileInputStream inputStream = null; |
| try { |
| inputStream = fileIoProvider.getFileInputStream(volume, replicaFile); |
| BlockListAsLongs blocksList = |
| BlockListAsLongs.readFrom(inputStream, maxDataLength); |
| if (blocksList == null) { |
| return false; |
| } |
| |
| for (BlockReportReplica replica : blocksList) { |
| switch (replica.getState()) { |
| case FINALIZED: |
| addReplicaToReplicasMap(replica, tmpReplicaMap, lazyWriteReplicaMap, true); |
| break; |
| case RUR: |
| case RBW: |
| case RWR: |
| addReplicaToReplicasMap(replica, tmpReplicaMap, lazyWriteReplicaMap, false); |
| break; |
| default: |
| break; |
| } |
| } |
| // Now it is safe to add the replica into volumeMap |
| // In case of any exception during parsing this cache file, fall back |
| // to scan all the files on disk. |
| for (Iterator<ReplicaInfo> iter = |
| tmpReplicaMap.replicas(bpid).iterator(); iter.hasNext(); ) { |
| ReplicaInfo info = iter.next(); |
| // We use a lightweight GSet to store replicaInfo, we need to remove |
| // it from one GSet before adding to another. |
| iter.remove(); |
| volumeMap.add(bpid, info); |
| } |
| if (LOG.isInfoEnabled()) { |
| LOG.info("Successfully read replica from cache file : {}", replicaFile.getPath()); |
| } |
| return true; |
| } catch (Exception e) { |
| // Any exception we need to revert back to read from disk |
| // Log the error and return false |
| if (LOG.isInfoEnabled()) { |
| LOG.info("Exception occurred while reading the replicas cache file: {}", replicaFile.getPath(), e); |
| } |
| return false; |
| } |
| finally { |
| // close the inputStream |
| IOUtils.closeStream(inputStream); |
| |
| if (!fileIoProvider.delete(volume, replicaFile)) { |
| if (LOG.isInfoEnabled()) { |
| LOG.info("Failed to delete replica cache file: {}", replicaFile.getPath()); |
| } |
| } |
| } |
| } |
| |
| private void saveReplicas(BlockListAsLongs blocksListToPersist) { |
| if (blocksListToPersist == null || |
| blocksListToPersist.getNumberOfBlocks()== 0) { |
| return; |
| } |
| final File tmpFile = new File(currentDir, REPLICA_CACHE_FILE + ".tmp"); |
| final File replicaCacheFile = new File(currentDir, REPLICA_CACHE_FILE); |
| if (!fileIoProvider.deleteWithExistsCheck(volume, tmpFile) || |
| !fileIoProvider.deleteWithExistsCheck(volume, replicaCacheFile)) { |
| return; |
| } |
| |
| FileOutputStream out = null; |
| try { |
| out = fileIoProvider.getFileOutputStream(volume, tmpFile); |
| blocksListToPersist.writeTo(out); |
| out.close(); |
| // Renaming the tmp file to replicas |
| fileIoProvider.moveFile(volume, tmpFile, replicaCacheFile); |
| } catch (Exception e) { |
| // If write failed, the volume might be bad. Since the cache file is |
| // not critical, log the error, delete both the files (tmp and cache) |
| // and continue. |
| LOG.warn("Failed to write replicas to cache ", e); |
| fileIoProvider.deleteWithExistsCheck(volume, replicaCacheFile); |
| } finally { |
| IOUtils.closeStream(out); |
| fileIoProvider.deleteWithExistsCheck(volume, tmpFile); |
| } |
| } |
| |
| void incrNumBlocks() { |
| numOfBlocks.incrementAndGet(); |
| } |
| |
| void decrNumBlocks() { |
| numOfBlocks.decrementAndGet(); |
| } |
| |
| public long getNumOfBlocks() { |
| return numOfBlocks.get(); |
| } |
| |
| /** |
| * Recursive action for add replica in map. |
| */ |
| class AddReplicaProcessor extends RecursiveAction { |
| |
| private ReplicaMap volumeMap; |
| private File dir; |
| private RamDiskReplicaTracker lazyWriteReplicaMap; |
| private boolean isFinalized; |
| private List<IOException> exceptions; |
| private Queue<RecursiveAction> subTaskQueue; |
| |
| /** |
| * @param volumeMap |
| * the replicas map |
| * @param dir |
| * an input directory |
| * @param lazyWriteReplicaMap |
| * Map of replicas on transient storage. |
| * @param isFinalized |
| * true if the directory has finalized replicas; false if the |
| * directory has rbw replicas |
| * @param exceptions |
| * List of exception which need to return to parent thread. |
| * @param subTaskQueue |
| * queue of sub tasks |
| */ |
| AddReplicaProcessor(ReplicaMap volumeMap, File dir, |
| RamDiskReplicaTracker lazyWriteReplicaMap, boolean isFinalized, |
| List<IOException> exceptions, Queue<RecursiveAction> subTaskQueue) { |
| this.volumeMap = volumeMap; |
| this.dir = dir; |
| this.lazyWriteReplicaMap = lazyWriteReplicaMap; |
| this.isFinalized = isFinalized; |
| this.exceptions = exceptions; |
| this.subTaskQueue = subTaskQueue; |
| } |
| |
| @Override |
| protected void compute() { |
| try { |
| addToReplicasMap(volumeMap, dir, lazyWriteReplicaMap, isFinalized, |
| exceptions, subTaskQueue); |
| } catch (IOException e) { |
| LOG.warn("Caught exception while adding replicas from {} in subtask. Will throw later.", volume, e); |
| exceptions.add(e); |
| } |
| } |
| } |
| |
| /** |
| * Return the size of fork pool used for adding replica in map. |
| */ |
| @VisibleForTesting |
| public static int getAddReplicaForkPoolSize() { |
| return addReplicaThreadPool.getPoolSize(); |
| } |
| } |