/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.RandomAccessFile;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi.ScanInfo;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.util.DataChecksum;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This class is used for all replicas which are on local storage media
* and hence are backed by files.
*/
abstract public class LocalReplica extends ReplicaInfo {
/**
* Base directory containing numerically-identified sub directories and
* possibly blocks.
*/
private File baseDir;
/**
* Whether or not this replica's parent directory includes subdirs, in which
* case we can generate them based on the replica's block ID.
*/
private boolean hasSubdirs;
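// Many replicas share the same base directory; interning the File objects
// below keeps a single instance per directory instead of one per replica.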
private static final Map<String, File> internedBaseDirs = new HashMap<>();
static final Logger LOG = LoggerFactory.getLogger(LocalReplica.class);
/**
* Constructor
* @param block a block
* @param vol volume where replica is located
* @param dir directory path where block and meta files are located
*/
LocalReplica(Block block, FsVolumeSpi vol, File dir) {
this(block.getBlockId(), block.getNumBytes(),
block.getGenerationStamp(), vol, dir);
}
/**
* Constructor
* @param blockId block id
* @param len replica length
* @param genStamp replica generation stamp
* @param vol volume where replica is located
* @param dir directory path where block and meta files are located
*/
LocalReplica(long blockId, long len, long genStamp,
FsVolumeSpi vol, File dir) {
super(vol, blockId, len, genStamp);
setDirInternal(dir);
}
/**
* Copy constructor.
* @param from the source replica
*/
LocalReplica(LocalReplica from) {
this(from, from.getVolume(), from.getDir());
}
/**
* Get the full path of this replica's data file.
* @return the full path of this replica's data file
*/
@VisibleForTesting
public File getBlockFile() {
return new File(getDir(), getBlockName());
}
/**
* Get the full path of this replica's meta file.
* @return the full path of this replica's meta file
*/
@VisibleForTesting
public File getMetaFile() {
return new File(getDir(),
DatanodeUtil.getMetaName(getBlockName(), getGenerationStamp()));
}
/**
* Return the parent directory path where this replica is located.
* @return the parent directory path where this replica is located
*/
protected File getDir() {
return hasSubdirs ? DatanodeUtil.idToBlockDir(baseDir,
getBlockId()) : baseDir;
}
/**
* Set the parent directory where this replica is located.
* @param dir the parent directory where the replica is located
*/
private void setDirInternal(File dir) {
if (dir == null) {
baseDir = null;
return;
}
ReplicaDirInfo dirInfo = parseBaseDir(dir, getBlockId());
this.hasSubdirs = dirInfo.hasSubdirs;
synchronized (internedBaseDirs) {
if (!internedBaseDirs.containsKey(dirInfo.baseDirPath)) {
// Create a new String path of this file and make a brand new File object
// to guarantee we drop the reference to the underlying char[] storage.
File baseDir = new File(dirInfo.baseDirPath);
internedBaseDirs.put(dirInfo.baseDirPath, baseDir);
}
this.baseDir = internedBaseDirs.get(dirInfo.baseDirPath);
}
}
@VisibleForTesting
public static class ReplicaDirInfo {
public String baseDirPath;
public boolean hasSubdirs;
public ReplicaDirInfo(String baseDirPath, boolean hasSubdirs) {
this.baseDirPath = baseDirPath;
this.hasSubdirs = hasSubdirs;
}
}
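/**
* Split a replica's directory into the base directory plus a flag that
* records whether the hashed "subdir" layout is in use. For example
* (hypothetical paths), dir = /data/current/finalized/subdir0/subdir1,
* where subdir0/subdir1 is the layout idToBlockDir derives for blockId,
* yields baseDirPath = /data/current/finalized and hasSubdirs = true;
* any other dir is returned unchanged with hasSubdirs = false.
* @param dir directory where the block and meta files live
* @param blockId block ID used to validate the subdirectory layout
* @return the parsed {@link ReplicaDirInfo}
*/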
@VisibleForTesting
public static ReplicaDirInfo parseBaseDir(File dir, long blockId) {
File currentDir = dir;
boolean hasSubdirs = false;
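// Walk upwards past any "subdir" path components to find the candidate
// base directory.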
while (currentDir.getName().startsWith(DataStorage.BLOCK_SUBDIR_PREFIX)) {
hasSubdirs = true;
currentDir = currentDir.getParentFile();
}
if (hasSubdirs) {
// Use currentDir as the base directory only if dir matches the
// subdirectory that idToBlockDir derives from the block ID.
File idToBlockDir = DatanodeUtil.idToBlockDir(currentDir, blockId);
if (idToBlockDir.equals(dir)) {
return new ReplicaDirInfo(currentDir.getAbsolutePath(), true);
}
}
return new ReplicaDirInfo(dir.getAbsolutePath(), false);
}
/**
* Copy the specified file into a temporary file, then rename the
* temporary file back to the original name. Because the renamed copy is
* a new inode, this path no longer shares storage with any other hard
* links to the original file. The temporary file is created in the same
* directory and will be recovered (especially on Windows) on datanode
* restart.
*/
private void breakHardlinks(File file, Block b) throws IOException {
final FileIoProvider fileIoProvider = getFileIoProvider();
final File tmpFile = DatanodeUtil.createFileWithExistsCheck(
getVolume(), b, DatanodeUtil.getUnlinkTmpFile(file), fileIoProvider);
try {
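// Copy the contents into the temporary file, verify the copied length,
// and then swap the copy in place of the original via replaceFile.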
try (FileInputStream in = fileIoProvider.getFileInputStream(
getVolume(), file)) {
try (FileOutputStream out = fileIoProvider.getFileOutputStream(
getVolume(), tmpFile)) {
IOUtils.copyBytes(in, out, 16 * 1024);
}
}
if (file.length() != tmpFile.length()) {
throw new IOException("Copy of file " + file + " size " + file.length()
+ " into file " + tmpFile + " resulted in a size of "
+ tmpFile.length());
}
fileIoProvider.replaceFile(getVolume(), tmpFile, file);
} catch (IOException e) {
if (!fileIoProvider.delete(getVolume(), tmpFile)) {
DataNode.LOG.info("detachFile failed to delete temporary file " +
tmpFile);
}
throw e;
}
}
/**
* This function "breaks hardlinks" to the current replica file.
*
* When doing a DataNode upgrade, we create a bunch of hardlinks to each block
* file. This cleverly ensures that both the old and the new storage
* directories can contain the same block file, without using additional space
* for the data.
*
* However, when we want to append to the replica file, we need to "break" the
* hardlink to ensure that the old snapshot continues to contain the old data
* length. If we failed to do that, we could roll back to the previous/
* directory during a downgrade, and find that the block contents were longer
* than they were at the time of upgrade.
*
* @return true; data is actually copied only when a file has more than
* one hard link.
* @throws IOException
*/
public boolean breakHardLinksIfNeeded() throws IOException {
final File file = getBlockFile();
final FileIoProvider fileIoProvider = getFileIoProvider();
if (file == null || getVolume() == null) {
throw new IOException("detachBlock:Block not found. " + this);
}
File meta = getMetaFile();
int linkCount = fileIoProvider.getHardLinkCount(getVolume(), file);
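// A link count above one means this block file still shares storage
// with a hard-linked copy, typically one made during a previous upgrade.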
if (linkCount > 1) {
DataNode.LOG.info("Breaking hardlink for " + linkCount + "x-linked " +
"block " + this);
breakHardlinks(file, this);
}
if (fileIoProvider.getHardLinkCount(getVolume(), meta) > 1) {
breakHardlinks(meta, this);
}
return true;
}
@Override
public URI getBlockURI() {
return getBlockFile().toURI();
}
@Override
public InputStream getDataInputStream(long seekOffset) throws IOException {
return getDataInputStream(getBlockFile(), seekOffset);
}
@Override
public OutputStream getDataOutputStream(boolean append) throws IOException {
return getFileIoProvider().getFileOutputStream(
getVolume(), getBlockFile(), append);
}
@Override
public boolean blockDataExists() {
return getFileIoProvider().exists(getVolume(), getBlockFile());
}
@Override
public boolean deleteBlockData() {
return getFileIoProvider().fullyDelete(getVolume(), getBlockFile());
}
@Override
public long getBlockDataLength() {
return getBlockFile().length();
}
@Override
public URI getMetadataURI() {
return getMetaFile().toURI();
}
@Override
public LengthInputStream getMetadataInputStream(long offset)
throws IOException {
final File meta = getMetaFile();
return new LengthInputStream(
getFileIoProvider().openAndSeek(getVolume(), meta, offset),
meta.length());
}
@Override
public OutputStream getMetadataOutputStream(boolean append)
throws IOException {
return new FileOutputStream(getMetaFile(), append);
}
@Override
public boolean metadataExists() {
return getFileIoProvider().exists(getVolume(), getMetaFile());
}
@Override
public boolean deleteMetadata() {
return getFileIoProvider().fullyDelete(getVolume(), getMetaFile());
}
@Override
public long getMetadataLength() {
return getMetaFile().length();
}
@Override
public boolean renameMeta(URI destURI) throws IOException {
return renameFile(getMetaFile(), new File(destURI));
}
@Override
public boolean renameData(URI destURI) throws IOException {
return renameFile(getBlockFile(), new File(destURI));
}
private boolean renameFile(File srcfile, File destfile) throws IOException {
try {
getFileIoProvider().rename(getVolume(), srcfile, destfile);
return true;
} catch (IOException e) {
throw new IOException("Failed to move block file for " + this
+ " from " + srcfile + " to " + destfile.getAbsolutePath(), e);
}
}
@Override
public void updateWithReplica(StorageLocation replicaLocation) {
// for local replicas, the replica location is assumed to be a file.
File diskFile = null;
try {
diskFile = new File(replicaLocation.getUri());
} catch (IllegalArgumentException e) {
diskFile = null;
}
if (diskFile == null) {
setDirInternal(null);
} else {
setDirInternal(diskFile.getParentFile());
}
}
@Override
public boolean getPinning(LocalFileSystem localFS) throws IOException {
return getPinning(localFS, new Path(getBlockFile().getAbsolutePath()));
}
@Override
public void setPinning(LocalFileSystem localFS) throws IOException {
File f = getBlockFile();
Path p = new Path(f.getAbsolutePath());
setPinning(localFS, p);
}
@Override
public void bumpReplicaGS(long newGS) throws IOException {
long oldGS = getGenerationStamp();
final File oldmeta = getMetaFile();
setGenerationStamp(newGS);
final File newmeta = getMetaFile();
// rename meta file to new GS
LOG.debug("Renaming {} to {}", oldmeta, newmeta);
try {
// renameMeta() cannot be used here: the generation stamp has already
// been bumped, so getMetaFile() now resolves to the new name.
getFileIoProvider().rename(getVolume(), oldmeta, newmeta);
} catch (IOException e) {
setGenerationStamp(oldGS); // restore old GS
throw new IOException("Block " + this + " reopen failed. " +
" Unable to move meta file " + oldmeta +
" to " + newmeta, e);
}
}
@Override
public void truncateBlock(long newLength) throws IOException {
truncateBlock(getVolume(), getBlockFile(), getMetaFile(),
getNumBytes(), newLength, getFileIoProvider());
}
@Override
public int compareWith(ScanInfo info) {
return info.getBlockFile().compareTo(getBlockFile());
}
@Override
public void copyMetadata(URI destination) throws IOException {
//for local replicas, we assume the destination URI is file
getFileIoProvider().nativeCopyFileUnbuffered(
getVolume(), getMetaFile(), new File(destination), true);
}
@Override
public void copyBlockdata(URI destination) throws IOException {
//for local replicas, we assume the destination URI is file
getFileIoProvider().nativeCopyFileUnbuffered(
getVolume(), getBlockFile(), new File(destination), true);
}
/**
* Get input stream for a local file and optionally seek to the offset.
* @param f path to the file
* @param seekOffset offset to seek
* @return input stream for read
* @throws IOException
*/
private FileInputStream getDataInputStream(File f, long seekOffset)
throws IOException {
FileInputStream fis;
final FileIoProvider fileIoProvider = getFileIoProvider();
if (NativeIO.isAvailable()) {
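// With native IO available, open with share-delete semantics so the
// block file can be deleted while this stream is still open
// (significant on Windows).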
fis = fileIoProvider.getShareDeleteFileInputStream(
getVolume(), f, seekOffset);
} else {
try {
fis = fileIoProvider.openAndSeek(getVolume(), f, seekOffset);
} catch (FileNotFoundException fnfe) {
throw new IOException("Expected block file at " + f +
" does not exist.");
}
}
return fis;
}
/**
* Get pin status of a file by checking the sticky bit.
* @param localFS local file system
* @param path path to be checked
* @return true if the file is pinned with sticky bit
* @throws IOException
*/
public boolean getPinning(LocalFileSystem localFS, Path path) throws
IOException {
return localFS.getFileStatus(path).getPermission().getStickyBit();
}
/**
* Set sticky bit on path to pin file.
* @param localFS local file system
* @param path path to be pinned with sticky bit
* @throws IOException
*/
public void setPinning(LocalFileSystem localFS, Path path) throws
IOException {
FsPermission oldPermission = localFS.getFileStatus(path).getPermission();
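// Preserve the existing user/group/other actions and set only the
// sticky bit.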
FsPermission permission = new FsPermission(oldPermission.getUserAction(),
oldPermission.getGroupAction(), oldPermission.getOtherAction(), true);
localFS.setPermission(path, permission);
}
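/**
* Truncate the block file to the given length and shrink the meta file
* accordingly, recomputing the checksum of the last (possibly partial)
* chunk.
* @param volume volume containing the block
* @param blockFile block data file
* @param metaFile block metadata (checksum) file
* @param oldlen current block length
* @param newlen target length; must not exceed oldlen
* @param fileIoProvider provider used for the file operations
* @throws IOException if newlen exceeds oldlen or a file operation fails
*/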
public static void truncateBlock(
FsVolumeSpi volume, File blockFile, File metaFile,
long oldlen, long newlen, FileIoProvider fileIoProvider)
throws IOException {
LOG.info("truncateBlock: blockFile=" + blockFile
+ ", metaFile=" + metaFile
+ ", oldlen=" + oldlen
+ ", newlen=" + newlen);
if (newlen == oldlen) {
return;
}
if (newlen > oldlen) {
throw new IOException("Cannot truncate block to from oldlen (=" + oldlen
+ ") to newlen (=" + newlen + ")");
}
// fis is closed by BlockMetadataHeader.readHeader.
final FileInputStream fis = fileIoProvider.getFileInputStream(
volume, metaFile);
DataChecksum dcs = BlockMetadataHeader.readHeader(fis).getChecksum();
int checksumsize = dcs.getChecksumSize();
int bpc = dcs.getBytesPerChecksum();
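// Recompute the meta file geometry: n is the number of checksum chunks
// covering the new length, so the meta file keeps its header plus n
// checksums; the last chunk may be partial and its checksum is
// recomputed below.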
long n = (newlen - 1)/bpc + 1;
long newmetalen = BlockMetadataHeader.getHeaderSize() + n*checksumsize;
long lastchunkoffset = (n - 1)*bpc;
int lastchunksize = (int)(newlen - lastchunkoffset);
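// b is reused: first it holds the last data chunk read back from the
// truncated block file, then its recomputed checksum.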
byte[] b = new byte[Math.max(lastchunksize, checksumsize)];
try (RandomAccessFile blockRAF = fileIoProvider.getRandomAccessFile(
volume, blockFile, "rw")) {
//truncate blockFile
blockRAF.setLength(newlen);
//read last chunk
blockRAF.seek(lastchunkoffset);
blockRAF.readFully(b, 0, lastchunksize);
}
//compute checksum
dcs.update(b, 0, lastchunksize);
dcs.writeValue(b, 0, false);
//update metaFile
try (RandomAccessFile metaRAF = fileIoProvider.getRandomAccessFile(
volume, metaFile, "rw")) {
metaRAF.setLength(newmetalen);
metaRAF.seek(newmetalen - checksumsize);
metaRAF.write(b, 0, checksumsize);
}
}
/**
* Sync the parent directory changes to durable device.
* @throws IOException
*/
public void fsyncDirectory() throws IOException {
File dir = getDir();
try {
getFileIoProvider().dirSync(getVolume(), dir);
} catch (IOException e) {
throw new IOException("Failed to sync " + dir, e);
}
}
}