blob: ed32faec7926a3a7e71de697f10bd8805cd0e9c8 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import com.google.common.base.Preconditions;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
import org.apache.hadoop.hdfs.server.datanode.FsDatasetTestUtils;
import org.apache.hadoop.hdfs.server.datanode.Replica;
import org.apache.hadoop.hdfs.server.datanode.ReplicaBeingWritten;
import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipeline;
import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
import org.apache.hadoop.hdfs.server.datanode.ReplicaUnderRecovery;
import org.apache.hadoop.hdfs.server.datanode.ReplicaWaitingToBeRecovered;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi.FsVolumeReferences;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.util.Random;
/**
* Test-related utilities to access blocks in {@link FsDatasetImpl}.
*/
@InterfaceStability.Unstable
@InterfaceAudience.Private
public class FsDatasetImplTestUtils implements FsDatasetTestUtils {
private static final Log LOG =
LogFactory.getLog(FsDatasetImplTestUtils.class);
private final FsDatasetImpl dataset;
/**
* A reference to the replica that is used to corrupt block / meta later.
*/
private static class FsDatasetImplMaterializedReplica
implements MaterializedReplica {
/** Block file of the replica. */
private final File blockFile;
private final File metaFile;
/** Check the existence of the file. */
private static void checkFile(File file) throws FileNotFoundException {
if (file == null || !file.exists()) {
throw new FileNotFoundException(
"The block file or metadata file " + file + " does not exist.");
}
}
/** Corrupt a block / crc file by truncating it to a newSize */
private static void truncate(File file, long newSize)
throws IOException {
Preconditions.checkArgument(newSize >= 0);
checkFile(file);
try (RandomAccessFile raf = new RandomAccessFile(file, "rw")) {
raf.setLength(newSize);
}
}
/** Corrupt a block / crc file by deleting it. */
private static void delete(File file) throws IOException {
checkFile(file);
Files.delete(file.toPath());
}
FsDatasetImplMaterializedReplica(File blockFile, File metaFile) {
this.blockFile = blockFile;
this.metaFile = metaFile;
}
@Override
public void corruptData() throws IOException {
checkFile(blockFile);
LOG.info("Corrupting block file: " + blockFile);
final int BUF_SIZE = 32;
byte[] buf = new byte[BUF_SIZE];
try (RandomAccessFile raf = new RandomAccessFile(blockFile, "rw")) {
int nread = raf.read(buf);
for (int i = 0; i < nread; i++) {
buf[i]++;
}
raf.seek(0);
raf.write(buf);
}
}
@Override
public void corruptData(byte[] newContent) throws IOException {
checkFile(blockFile);
LOG.info("Corrupting block file with new content: " + blockFile);
try (RandomAccessFile raf = new RandomAccessFile(blockFile, "rw")) {
raf.write(newContent);
}
}
@Override
public void truncateData(long newSize) throws IOException {
LOG.info("Truncating block file: " + blockFile);
truncate(blockFile, newSize);
}
@Override
public void deleteData() throws IOException {
LOG.info("Deleting block file: " + blockFile);
delete(blockFile);
}
@Override
public void corruptMeta() throws IOException {
checkFile(metaFile);
LOG.info("Corrupting meta file: " + metaFile);
Random random = new Random();
try (RandomAccessFile raf = new RandomAccessFile(metaFile, "rw")) {
FileChannel channel = raf.getChannel();
int offset = random.nextInt((int)channel.size() / 2);
raf.seek(offset);
raf.write("BADBAD".getBytes());
}
}
@Override
public void deleteMeta() throws IOException {
LOG.info("Deleting metadata file: " + metaFile);
delete(metaFile);
}
@Override
public void truncateMeta(long newSize) throws IOException {
LOG.info("Truncating metadata file: " + metaFile);
truncate(metaFile, newSize);
}
@Override
public String toString() {
return String.format("MaterializedReplica: file=%s", blockFile);
}
}
public FsDatasetImplTestUtils(DataNode datanode) {
Preconditions.checkArgument(
datanode.getFSDataset() instanceof FsDatasetImpl);
dataset = (FsDatasetImpl) datanode.getFSDataset();
}
/**
* Return a materialized replica from the FsDatasetImpl.
*/
@Override
public MaterializedReplica getMaterializedReplica(ExtendedBlock block)
throws ReplicaNotFoundException {
File blockFile;
try {
blockFile = dataset.getBlockFile(
block.getBlockPoolId(), block.getBlockId());
} catch (IOException e) {
LOG.error("Block file for " + block + " does not existed:", e);
throw new ReplicaNotFoundException(block);
}
File metaFile = FsDatasetUtil.getMetaFile(
blockFile, block.getGenerationStamp());
return new FsDatasetImplMaterializedReplica(blockFile, metaFile);
}
@Override
public Replica createFinalizedReplica(ExtendedBlock block)
throws IOException {
try (FsVolumeReferences volumes = dataset.getFsVolumeReferences()) {
return createFinalizedReplica(volumes.get(0), block);
}
}
@Override
public Replica createFinalizedReplica(FsVolumeSpi volume, ExtendedBlock block)
throws IOException {
FsVolumeImpl vol = (FsVolumeImpl) volume;
ReplicaInfo info = new FinalizedReplica(block.getLocalBlock(), vol,
vol.getCurrentDir().getParentFile());
dataset.volumeMap.add(block.getBlockPoolId(), info);
info.getBlockFile().createNewFile();
info.getMetaFile().createNewFile();
return info;
}
@Override
public Replica createReplicaInPipeline(ExtendedBlock block)
throws IOException {
try (FsVolumeReferences volumes = dataset.getFsVolumeReferences()) {
return createReplicaInPipeline(volumes.get(0), block);
}
}
@Override
public Replica createReplicaInPipeline(
FsVolumeSpi volume, ExtendedBlock block) throws IOException {
FsVolumeImpl vol = (FsVolumeImpl) volume;
ReplicaInPipeline rip = new ReplicaInPipeline(
block.getBlockId(), block.getGenerationStamp(), volume,
vol.createTmpFile(
block.getBlockPoolId(), block.getLocalBlock()).getParentFile(),
0);
dataset.volumeMap.add(block.getBlockPoolId(), rip);
return rip;
}
@Override
public Replica createRBW(ExtendedBlock eb) throws IOException {
try (FsVolumeReferences volumes = dataset.getFsVolumeReferences()) {
return createRBW(volumes.get(0), eb);
}
}
@Override
public Replica createRBW(FsVolumeSpi volume, ExtendedBlock eb)
throws IOException {
FsVolumeImpl vol = (FsVolumeImpl) volume;
final String bpid = eb.getBlockPoolId();
final Block block = eb.getLocalBlock();
ReplicaBeingWritten rbw = new ReplicaBeingWritten(
eb.getLocalBlock(), volume,
vol.createRbwFile(bpid, block).getParentFile(), null);
rbw.getBlockFile().createNewFile();
rbw.getMetaFile().createNewFile();
dataset.volumeMap.add(bpid, rbw);
return rbw;
}
@Override
public Replica createReplicaWaitingToBeRecovered(ExtendedBlock eb)
throws IOException {
try (FsVolumeReferences volumes = dataset.getFsVolumeReferences()) {
return createReplicaInPipeline(volumes.get(0), eb);
}
}
@Override
public Replica createReplicaWaitingToBeRecovered(
FsVolumeSpi volume, ExtendedBlock eb) throws IOException {
FsVolumeImpl vol = (FsVolumeImpl) volume;
final String bpid = eb.getBlockPoolId();
final Block block = eb.getLocalBlock();
ReplicaWaitingToBeRecovered rwbr =
new ReplicaWaitingToBeRecovered(eb.getLocalBlock(), volume,
vol.createRbwFile(bpid, block).getParentFile());
dataset.volumeMap.add(bpid, rwbr);
return rwbr;
}
@Override
public Replica createReplicaUnderRecovery(
ExtendedBlock block, long recoveryId) throws IOException {
try (FsVolumeReferences volumes = dataset.getFsVolumeReferences()) {
FsVolumeImpl volume = (FsVolumeImpl) volumes.get(0);
ReplicaUnderRecovery rur = new ReplicaUnderRecovery(new FinalizedReplica(
block.getLocalBlock(), volume, volume.getCurrentDir().getParentFile()),
recoveryId
);
dataset.volumeMap.add(block.getBlockPoolId(), rur);
return rur;
}
}
@Override
public void checkStoredReplica(Replica replica) throws IOException {
Preconditions.checkArgument(replica instanceof ReplicaInfo);
ReplicaInfo r = (ReplicaInfo) replica;
FsDatasetImpl.checkReplicaFiles(r);
}
}