blob: 24ad6efdd546f4b015b000a1d965e383f86916c1 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.util;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSHedgedReadMetrics;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Test {@link FSUtils}.
*/
@Category({MiscTests.class, MediumTests.class})
public class TestFSUtils {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestFSUtils.class);
private static final Logger LOG = LoggerFactory.getLogger(TestFSUtils.class);
private HBaseTestingUtility htu;
private FileSystem fs;
private Configuration conf;
@Before
public void setUp() throws IOException {
htu = new HBaseTestingUtility();
fs = htu.getTestFileSystem();
conf = htu.getConfiguration();
}
@Test
public void testIsHDFS() throws Exception {
assertFalse(CommonFSUtils.isHDFS(conf));
MiniDFSCluster cluster = null;
try {
cluster = htu.startMiniDFSCluster(1);
assertTrue(CommonFSUtils.isHDFS(conf));
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
}
private void WriteDataToHDFS(FileSystem fs, Path file, int dataSize)
throws Exception {
FSDataOutputStream out = fs.create(file);
byte [] data = new byte[dataSize];
out.write(data, 0, dataSize);
out.close();
}
@Test public void testcomputeHDFSBlocksDistribution() throws Exception {
final int DEFAULT_BLOCK_SIZE = 1024;
conf.setLong("dfs.blocksize", DEFAULT_BLOCK_SIZE);
MiniDFSCluster cluster = null;
Path testFile = null;
try {
// set up a cluster with 3 nodes
String hosts[] = new String[] { "host1", "host2", "host3" };
cluster = htu.startMiniDFSCluster(hosts);
cluster.waitActive();
FileSystem fs = cluster.getFileSystem();
// create a file with two blocks
testFile = new Path("/test1.txt");
WriteDataToHDFS(fs, testFile, 2*DEFAULT_BLOCK_SIZE);
// given the default replication factor is 3, the same as the number of
// datanodes; the locality index for each host should be 100%,
// or getWeight for each host should be the same as getUniqueBlocksWeights
final long maxTime = System.currentTimeMillis() + 2000;
boolean ok;
do {
ok = true;
FileStatus status = fs.getFileStatus(testFile);
HDFSBlocksDistribution blocksDistribution =
FSUtils.computeHDFSBlocksDistribution(fs, status, 0, status.getLen());
long uniqueBlocksTotalWeight =
blocksDistribution.getUniqueBlocksTotalWeight();
for (String host : hosts) {
long weight = blocksDistribution.getWeight(host);
ok = (ok && uniqueBlocksTotalWeight == weight);
}
} while (!ok && System.currentTimeMillis() < maxTime);
assertTrue(ok);
} finally {
htu.shutdownMiniDFSCluster();
}
try {
// set up a cluster with 4 nodes
String hosts[] = new String[] { "host1", "host2", "host3", "host4" };
cluster = htu.startMiniDFSCluster(hosts);
cluster.waitActive();
FileSystem fs = cluster.getFileSystem();
// create a file with three blocks
testFile = new Path("/test2.txt");
WriteDataToHDFS(fs, testFile, 3*DEFAULT_BLOCK_SIZE);
// given the default replication factor is 3, we will have total of 9
// replica of blocks; thus the host with the highest weight should have
// weight == 3 * DEFAULT_BLOCK_SIZE
final long maxTime = System.currentTimeMillis() + 2000;
long weight;
long uniqueBlocksTotalWeight;
do {
FileStatus status = fs.getFileStatus(testFile);
HDFSBlocksDistribution blocksDistribution =
FSUtils.computeHDFSBlocksDistribution(fs, status, 0, status.getLen());
uniqueBlocksTotalWeight = blocksDistribution.getUniqueBlocksTotalWeight();
String tophost = blocksDistribution.getTopHosts().get(0);
weight = blocksDistribution.getWeight(tophost);
// NameNode is informed asynchronously, so we may have a delay. See HBASE-6175
} while (uniqueBlocksTotalWeight != weight && System.currentTimeMillis() < maxTime);
assertTrue(uniqueBlocksTotalWeight == weight);
} finally {
htu.shutdownMiniDFSCluster();
}
try {
// set up a cluster with 4 nodes
String hosts[] = new String[] { "host1", "host2", "host3", "host4" };
cluster = htu.startMiniDFSCluster(hosts);
cluster.waitActive();
FileSystem fs = cluster.getFileSystem();
// create a file with one block
testFile = new Path("/test3.txt");
WriteDataToHDFS(fs, testFile, DEFAULT_BLOCK_SIZE);
// given the default replication factor is 3, we will have total of 3
// replica of blocks; thus there is one host without weight
final long maxTime = System.currentTimeMillis() + 2000;
HDFSBlocksDistribution blocksDistribution;
do {
FileStatus status = fs.getFileStatus(testFile);
blocksDistribution = FSUtils.computeHDFSBlocksDistribution(fs, status, 0, status.getLen());
// NameNode is informed asynchronously, so we may have a delay. See HBASE-6175
}
while (blocksDistribution.getTopHosts().size() != 3 && System.currentTimeMillis() < maxTime);
assertEquals("Wrong number of hosts distributing blocks.", 3,
blocksDistribution.getTopHosts().size());
} finally {
htu.shutdownMiniDFSCluster();
}
}
private void writeVersionFile(Path versionFile, String version) throws IOException {
if (CommonFSUtils.isExists(fs, versionFile)) {
assertTrue(CommonFSUtils.delete(fs, versionFile, true));
}
try (FSDataOutputStream s = fs.create(versionFile)) {
s.writeUTF(version);
}
assertTrue(fs.exists(versionFile));
}
@Test
public void testVersion() throws DeserializationException, IOException {
final Path rootdir = htu.getDataTestDir();
final FileSystem fs = rootdir.getFileSystem(conf);
assertNull(FSUtils.getVersion(fs, rootdir));
// No meta dir so no complaint from checkVersion.
// Presumes it a new install. Will create version file.
FSUtils.checkVersion(fs, rootdir, true);
// Now remove the version file and create a metadir so checkVersion fails.
Path versionFile = new Path(rootdir, HConstants.VERSION_FILE_NAME);
assertTrue(CommonFSUtils.isExists(fs, versionFile));
assertTrue(CommonFSUtils.delete(fs, versionFile, true));
Path metaRegionDir =
FSUtils.getRegionDirFromRootDir(rootdir, RegionInfoBuilder.FIRST_META_REGIONINFO);
FsPermission defaultPerms = CommonFSUtils.getFilePermissions(fs, this.conf,
HConstants.DATA_FILE_UMASK_KEY);
CommonFSUtils.create(fs, metaRegionDir, defaultPerms, false);
boolean thrown = false;
try {
FSUtils.checkVersion(fs, rootdir, true);
} catch (FileSystemVersionException e) {
thrown = true;
}
assertTrue("Expected FileSystemVersionException", thrown);
// Write out a good version file. See if we can read it in and convert.
String version = HConstants.FILE_SYSTEM_VERSION;
writeVersionFile(versionFile, version);
FileStatus [] status = fs.listStatus(versionFile);
assertNotNull(status);
assertTrue(status.length > 0);
String newVersion = FSUtils.getVersion(fs, rootdir);
assertEquals(version.length(), newVersion.length());
assertEquals(version, newVersion);
// File will have been converted. Exercise the pb format
assertEquals(version, FSUtils.getVersion(fs, rootdir));
FSUtils.checkVersion(fs, rootdir, true);
// Write an old version file.
String oldVersion = "1";
writeVersionFile(versionFile, oldVersion);
newVersion = FSUtils.getVersion(fs, rootdir);
assertNotEquals(version, newVersion);
thrown = false;
try {
FSUtils.checkVersion(fs, rootdir, true);
} catch (FileSystemVersionException e) {
thrown = true;
}
assertTrue("Expected FileSystemVersionException", thrown);
}
@Test
public void testPermMask() throws Exception {
final Path rootdir = htu.getDataTestDir();
final FileSystem fs = rootdir.getFileSystem(conf);
// default fs permission
FsPermission defaultFsPerm = CommonFSUtils.getFilePermissions(fs, conf,
HConstants.DATA_FILE_UMASK_KEY);
// 'hbase.data.umask.enable' is false. We will get default fs permission.
assertEquals(FsPermission.getFileDefault(), defaultFsPerm);
conf.setBoolean(HConstants.ENABLE_DATA_FILE_UMASK, true);
// first check that we don't crash if we don't have perms set
FsPermission defaultStartPerm = CommonFSUtils.getFilePermissions(fs, conf,
HConstants.DATA_FILE_UMASK_KEY);
// default 'hbase.data.umask'is 000, and this umask will be used when
// 'hbase.data.umask.enable' is true.
// Therefore we will not get the real fs default in this case.
// Instead we will get the starting point FULL_RWX_PERMISSIONS
assertEquals(new FsPermission(CommonFSUtils.FULL_RWX_PERMISSIONS), defaultStartPerm);
conf.setStrings(HConstants.DATA_FILE_UMASK_KEY, "077");
// now check that we get the right perms
FsPermission filePerm = CommonFSUtils.getFilePermissions(fs, conf,
HConstants.DATA_FILE_UMASK_KEY);
assertEquals(new FsPermission("700"), filePerm);
// then that the correct file is created
Path p = new Path("target" + File.separator + htu.getRandomUUID().toString());
try {
FSDataOutputStream out = FSUtils.create(conf, fs, p, filePerm, null);
out.close();
FileStatus stat = fs.getFileStatus(p);
assertEquals(new FsPermission("700"), stat.getPermission());
// and then cleanup
} finally {
fs.delete(p, true);
}
}
@Test
public void testDeleteAndExists() throws Exception {
final Path rootdir = htu.getDataTestDir();
final FileSystem fs = rootdir.getFileSystem(conf);
conf.setBoolean(HConstants.ENABLE_DATA_FILE_UMASK, true);
FsPermission perms = CommonFSUtils.getFilePermissions(fs, conf, HConstants.DATA_FILE_UMASK_KEY);
// then that the correct file is created
String file = htu.getRandomUUID().toString();
Path p = new Path(htu.getDataTestDir(), "temptarget" + File.separator + file);
Path p1 = new Path(htu.getDataTestDir(), "temppath" + File.separator + file);
try {
FSDataOutputStream out = FSUtils.create(conf, fs, p, perms, null);
out.close();
assertTrue("The created file should be present", CommonFSUtils.isExists(fs, p));
// delete the file with recursion as false. Only the file will be deleted.
CommonFSUtils.delete(fs, p, false);
// Create another file
FSDataOutputStream out1 = FSUtils.create(conf, fs, p1, perms, null);
out1.close();
// delete the file with recursion as false. Still the file only will be deleted
CommonFSUtils.delete(fs, p1, true);
assertFalse("The created file should be present", CommonFSUtils.isExists(fs, p1));
// and then cleanup
} finally {
CommonFSUtils.delete(fs, p, true);
CommonFSUtils.delete(fs, p1, true);
}
}
@Test
public void testFilteredStatusDoesNotThrowOnNotFound() throws Exception {
MiniDFSCluster cluster = htu.startMiniDFSCluster(1);
try {
assertNull(FSUtils.listStatusWithStatusFilter(cluster.getFileSystem(), new Path("definitely/doesn't/exist"), null));
} finally {
cluster.shutdown();
}
}
@Test
public void testRenameAndSetModifyTime() throws Exception {
MiniDFSCluster cluster = htu.startMiniDFSCluster(1);
assertTrue(CommonFSUtils.isHDFS(conf));
FileSystem fs = FileSystem.get(conf);
Path testDir = htu.getDataTestDirOnTestFS("testArchiveFile");
String file = htu.getRandomUUID().toString();
Path p = new Path(testDir, file);
FSDataOutputStream out = fs.create(p);
out.close();
assertTrue("The created file should be present", CommonFSUtils.isExists(fs, p));
long expect = System.currentTimeMillis() + 1000;
assertNotEquals(expect, fs.getFileStatus(p).getModificationTime());
ManualEnvironmentEdge mockEnv = new ManualEnvironmentEdge();
mockEnv.setValue(expect);
EnvironmentEdgeManager.injectEdge(mockEnv);
try {
String dstFile = htu.getRandomUUID().toString();
Path dst = new Path(testDir , dstFile);
assertTrue(CommonFSUtils.renameAndSetModifyTime(fs, p, dst));
assertFalse("The moved file should not be present", CommonFSUtils.isExists(fs, p));
assertTrue("The dst file should be present", CommonFSUtils.isExists(fs, dst));
assertEquals(expect, fs.getFileStatus(dst).getModificationTime());
cluster.shutdown();
} finally {
EnvironmentEdgeManager.reset();
}
}
@Test
public void testSetStoragePolicyDefault() throws Exception {
verifyNoHDFSApiInvocationForDefaultPolicy();
verifyFileInDirWithStoragePolicy(HConstants.DEFAULT_WAL_STORAGE_POLICY);
}
/**
* Note: currently the default policy is set to defer to HDFS and this case is to verify the
* logic, will need to remove the check if the default policy is changed
*/
private void verifyNoHDFSApiInvocationForDefaultPolicy() {
FileSystem testFs = new AlwaysFailSetStoragePolicyFileSystem();
// There should be no exception thrown when setting to default storage policy, which indicates
// the HDFS API hasn't been called
try {
CommonFSUtils.setStoragePolicy(testFs, new Path("non-exist"),
HConstants.DEFAULT_WAL_STORAGE_POLICY, true);
} catch (IOException e) {
Assert.fail("Should have bypassed the FS API when setting default storage policy");
}
// There should be exception thrown when given non-default storage policy, which indicates the
// HDFS API has been called
try {
CommonFSUtils.setStoragePolicy(testFs, new Path("non-exist"), "HOT", true);
Assert.fail("Should have invoked the FS API but haven't");
} catch (IOException e) {
// expected given an invalid path
}
}
class AlwaysFailSetStoragePolicyFileSystem extends DistributedFileSystem {
@Override
public void setStoragePolicy(final Path src, final String policyName)
throws IOException {
throw new IOException("The setStoragePolicy method is invoked");
}
}
/* might log a warning, but still work. (always warning on Hadoop < 2.6.0) */
@Test
public void testSetStoragePolicyValidButMaybeNotPresent() throws Exception {
verifyFileInDirWithStoragePolicy("ALL_SSD");
}
final String INVALID_STORAGE_POLICY = "1772";
/* should log a warning, but still work. (different warning on Hadoop < 2.6.0) */
@Test
public void testSetStoragePolicyInvalid() throws Exception {
verifyFileInDirWithStoragePolicy(INVALID_STORAGE_POLICY);
}
// Here instead of TestCommonFSUtils because we need a minicluster
private void verifyFileInDirWithStoragePolicy(final String policy) throws Exception {
conf.set(HConstants.WAL_STORAGE_POLICY, policy);
MiniDFSCluster cluster = htu.startMiniDFSCluster(1);
try {
assertTrue(CommonFSUtils.isHDFS(conf));
FileSystem fs = FileSystem.get(conf);
Path testDir = htu.getDataTestDirOnTestFS("testArchiveFile");
fs.mkdirs(testDir);
String storagePolicy =
conf.get(HConstants.WAL_STORAGE_POLICY, HConstants.DEFAULT_WAL_STORAGE_POLICY);
CommonFSUtils.setStoragePolicy(fs, testDir, storagePolicy);
String file =htu.getRandomUUID().toString();
Path p = new Path(testDir, file);
WriteDataToHDFS(fs, p, 4096);
HFileSystem hfs = new HFileSystem(fs);
String policySet = hfs.getStoragePolicyName(p);
LOG.debug("The storage policy of path " + p + " is " + policySet);
if (policy.equals(HConstants.DEFER_TO_HDFS_STORAGE_POLICY)
|| policy.equals(INVALID_STORAGE_POLICY)) {
String hdfsDefaultPolicy = hfs.getStoragePolicyName(hfs.getHomeDirectory());
LOG.debug("The default hdfs storage policy (indicated by home path: "
+ hfs.getHomeDirectory() + ") is " + hdfsDefaultPolicy);
Assert.assertEquals(hdfsDefaultPolicy, policySet);
} else {
Assert.assertEquals(policy, policySet);
}
// will assert existance before deleting.
cleanupFile(fs, testDir);
} finally {
cluster.shutdown();
}
}
/**
* Ugly test that ensures we can get at the hedged read counters in dfsclient.
* Does a bit of preading with hedged reads enabled using code taken from hdfs TestPread.
* @throws Exception
*/
@Test public void testDFSHedgedReadMetrics() throws Exception {
// Enable hedged reads and set it so the threshold is really low.
// Most of this test is taken from HDFS, from TestPread.
conf.setInt(DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THREADPOOL_SIZE, 5);
conf.setLong(DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THRESHOLD_MILLIS, 0);
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 4096);
conf.setLong(DFSConfigKeys.DFS_CLIENT_READ_PREFETCH_SIZE_KEY, 4096);
// Set short retry timeouts so this test runs faster
conf.setInt(DFSConfigKeys.DFS_CLIENT_RETRY_WINDOW_BASE, 0);
conf.setBoolean("dfs.datanode.transferTo.allowed", false);
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
// Get the metrics. Should be empty.
DFSHedgedReadMetrics metrics = FSUtils.getDFSHedgedReadMetrics(conf);
assertEquals(0, metrics.getHedgedReadOps());
FileSystem fileSys = cluster.getFileSystem();
try {
Path p = new Path("preadtest.dat");
// We need > 1 blocks to test out the hedged reads.
DFSTestUtil.createFile(fileSys, p, 12 * blockSize, 12 * blockSize,
blockSize, (short) 3, seed);
pReadFile(fileSys, p);
cleanupFile(fileSys, p);
assertTrue(metrics.getHedgedReadOps() > 0);
} finally {
fileSys.close();
cluster.shutdown();
}
}
@Test
public void testCopyFilesParallel() throws Exception {
MiniDFSCluster cluster = htu.startMiniDFSCluster(1);
cluster.waitActive();
FileSystem fs = cluster.getFileSystem();
Path src = new Path("/src");
fs.mkdirs(src);
for (int i = 0; i < 50; i++) {
WriteDataToHDFS(fs, new Path(src, String.valueOf(i)), 1024);
}
Path sub = new Path(src, "sub");
fs.mkdirs(sub);
for (int i = 0; i < 50; i++) {
WriteDataToHDFS(fs, new Path(sub, String.valueOf(i)), 1024);
}
Path dst = new Path("/dst");
List<Path> allFiles = FSUtils.copyFilesParallel(fs, src, fs, dst, conf, 4);
assertEquals(102, allFiles.size());
FileStatus[] list = fs.listStatus(dst);
assertEquals(51, list.length);
FileStatus[] sublist = fs.listStatus(new Path(dst, "sub"));
assertEquals(50, sublist.length);
}
// Below is taken from TestPread over in HDFS.
static final int blockSize = 4096;
static final long seed = 0xDEADBEEFL;
private void pReadFile(FileSystem fileSys, Path name) throws IOException {
FSDataInputStream stm = fileSys.open(name);
byte[] expected = new byte[12 * blockSize];
Random rand = new Random(seed);
rand.nextBytes(expected);
// do a sanity check. Read first 4K bytes
byte[] actual = new byte[4096];
stm.readFully(actual);
checkAndEraseData(actual, 0, expected, "Read Sanity Test");
// now do a pread for the first 8K bytes
actual = new byte[8192];
doPread(stm, 0L, actual, 0, 8192);
checkAndEraseData(actual, 0, expected, "Pread Test 1");
// Now check to see if the normal read returns 4K-8K byte range
actual = new byte[4096];
stm.readFully(actual);
checkAndEraseData(actual, 4096, expected, "Pread Test 2");
// Now see if we can cross a single block boundary successfully
// read 4K bytes from blockSize - 2K offset
stm.readFully(blockSize - 2048, actual, 0, 4096);
checkAndEraseData(actual, (blockSize - 2048), expected, "Pread Test 3");
// now see if we can cross two block boundaries successfully
// read blockSize + 4K bytes from blockSize - 2K offset
actual = new byte[blockSize + 4096];
stm.readFully(blockSize - 2048, actual);
checkAndEraseData(actual, (blockSize - 2048), expected, "Pread Test 4");
// now see if we can cross two block boundaries that are not cached
// read blockSize + 4K bytes from 10*blockSize - 2K offset
actual = new byte[blockSize + 4096];
stm.readFully(10 * blockSize - 2048, actual);
checkAndEraseData(actual, (10 * blockSize - 2048), expected, "Pread Test 5");
// now check that even after all these preads, we can still read
// bytes 8K-12K
actual = new byte[4096];
stm.readFully(actual);
checkAndEraseData(actual, 8192, expected, "Pread Test 6");
// done
stm.close();
// check block location caching
stm = fileSys.open(name);
stm.readFully(1, actual, 0, 4096);
stm.readFully(4*blockSize, actual, 0, 4096);
stm.readFully(7*blockSize, actual, 0, 4096);
actual = new byte[3*4096];
stm.readFully(0*blockSize, actual, 0, 3*4096);
checkAndEraseData(actual, 0, expected, "Pread Test 7");
actual = new byte[8*4096];
stm.readFully(3*blockSize, actual, 0, 8*4096);
checkAndEraseData(actual, 3*blockSize, expected, "Pread Test 8");
// read the tail
stm.readFully(11*blockSize+blockSize/2, actual, 0, blockSize/2);
IOException res = null;
try { // read beyond the end of the file
stm.readFully(11*blockSize+blockSize/2, actual, 0, blockSize);
} catch (IOException e) {
// should throw an exception
res = e;
}
assertTrue("Error reading beyond file boundary.", res != null);
stm.close();
}
private void checkAndEraseData(byte[] actual, int from, byte[] expected, String message) {
for (int idx = 0; idx < actual.length; idx++) {
assertEquals(message+" byte "+(from+idx)+" differs. expected "+
expected[from+idx]+" actual "+actual[idx],
actual[idx], expected[from+idx]);
actual[idx] = 0;
}
}
private void doPread(FSDataInputStream stm, long position, byte[] buffer,
int offset, int length) throws IOException {
int nread = 0;
// long totalRead = 0;
// DFSInputStream dfstm = null;
/* Disable. This counts do not add up. Some issue in original hdfs tests?
if (stm.getWrappedStream() instanceof DFSInputStream) {
dfstm = (DFSInputStream) (stm.getWrappedStream());
totalRead = dfstm.getReadStatistics().getTotalBytesRead();
} */
while (nread < length) {
int nbytes =
stm.read(position + nread, buffer, offset + nread, length - nread);
assertTrue("Error in pread", nbytes > 0);
nread += nbytes;
}
/* Disable. This counts do not add up. Some issue in original hdfs tests?
if (dfstm != null) {
if (isHedgedRead) {
assertTrue("Expected read statistic to be incremented",
length <= dfstm.getReadStatistics().getTotalBytesRead() - totalRead);
} else {
assertEquals("Expected read statistic to be incremented", length, dfstm
.getReadStatistics().getTotalBytesRead() - totalRead);
}
}*/
}
private void cleanupFile(FileSystem fileSys, Path name) throws IOException {
assertTrue(fileSys.exists(name));
assertTrue(fileSys.delete(name, true));
assertTrue(!fileSys.exists(name));
}
static {
try {
Class.forName("org.apache.hadoop.fs.StreamCapabilities");
LOG.debug("Test thought StreamCapabilities class was present.");
} catch (ClassNotFoundException exception) {
LOG.debug("Test didn't think StreamCapabilities class was present.");
}
}
// Here instead of TestCommonFSUtils because we need a minicluster
@Test
public void checkStreamCapabilitiesOnHdfsDataOutputStream() throws Exception {
MiniDFSCluster cluster = htu.startMiniDFSCluster(1);
try (FileSystem filesystem = cluster.getFileSystem()) {
FSDataOutputStream stream = filesystem.create(new Path("/tmp/foobar"));
assertTrue(stream.hasCapability(StreamCapabilities.HSYNC));
assertTrue(stream.hasCapability(StreamCapabilities.HFLUSH));
assertFalse(stream.hasCapability("a capability that hopefully HDFS doesn't add."));
} finally {
cluster.shutdown();
}
}
private void testIsSameHdfs(int nnport) throws IOException {
Configuration conf = HBaseConfiguration.create();
Path srcPath = new Path("hdfs://localhost:" + nnport + "/");
Path desPath = new Path("hdfs://127.0.0.1/");
FileSystem srcFs = srcPath.getFileSystem(conf);
FileSystem desFs = desPath.getFileSystem(conf);
assertTrue(FSUtils.isSameHdfs(conf, srcFs, desFs));
desPath = new Path("hdfs://127.0.0.1:8070/");
desFs = desPath.getFileSystem(conf);
assertTrue(!FSUtils.isSameHdfs(conf, srcFs, desFs));
desPath = new Path("hdfs://127.0.1.1:" + nnport + "/");
desFs = desPath.getFileSystem(conf);
assertTrue(!FSUtils.isSameHdfs(conf, srcFs, desFs));
conf.set("fs.defaultFS", "hdfs://haosong-hadoop");
conf.set("dfs.nameservices", "haosong-hadoop");
conf.set("dfs.ha.namenodes.haosong-hadoop", "nn1,nn2");
conf.set("dfs.client.failover.proxy.provider.haosong-hadoop",
"org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
conf.set("dfs.namenode.rpc-address.haosong-hadoop.nn1", "127.0.0.1:" + nnport);
conf.set("dfs.namenode.rpc-address.haosong-hadoop.nn2", "127.10.2.1:8000");
desPath = new Path("/");
desFs = desPath.getFileSystem(conf);
assertTrue(FSUtils.isSameHdfs(conf, srcFs, desFs));
conf.set("dfs.namenode.rpc-address.haosong-hadoop.nn1", "127.10.2.1:" + nnport);
conf.set("dfs.namenode.rpc-address.haosong-hadoop.nn2", "127.0.0.1:8000");
desPath = new Path("/");
desFs = desPath.getFileSystem(conf);
assertTrue(!FSUtils.isSameHdfs(conf, srcFs, desFs));
}
@Test
public void testIsSameHdfs() throws IOException {
String hadoopVersion = org.apache.hadoop.util.VersionInfo.getVersion();
LOG.info("hadoop version is: " + hadoopVersion);
boolean isHadoop3_0_0 = hadoopVersion.startsWith("3.0.0");
if (isHadoop3_0_0) {
// Hadoop 3.0.0 alpha1+ ~ 3.0.0 GA changed default nn port to 9820.
// See HDFS-9427
testIsSameHdfs(9820);
} else {
// pre hadoop 3.0.0 defaults to port 8020
// Hadoop 3.0.1 changed it back to port 8020. See HDFS-12990
testIsSameHdfs(8020);
}
}
}