/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.web;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_KEY;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CHECKSUM_TYPE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SNAPSHOT_DIFF_LISTING_LIMIT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_KEY;
import static org.apache.hadoop.hdfs.TestDistributedFileSystem.checkOpStatistics;
import static org.apache.hadoop.hdfs.TestDistributedFileSystem.checkStatistics;
import static org.apache.hadoop.hdfs.TestDistributedFileSystem.getOpStatistics;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.EOFException;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.InetSocketAddress;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.security.PrivilegedExceptionAction;
import java.util.Arrays;
import java.util.Collection;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Random;
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableList;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.fs.QuotaUsage;
import org.apache.hadoop.hdfs.DFSOpsCountStatistics;
import org.apache.hadoop.test.LambdaTestUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.BlockStoragePolicySpi;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileSystemTestHelper;
import org.apache.hadoop.fs.FsServerDefaults;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.AclEntryScope;
import org.apache.hadoop.fs.permission.AclEntryType;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.FsStatus;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.TestDFSClientRetries;
import org.apache.hadoop.hdfs.TestFileCreation;
import org.apache.hadoop.hdfs.client.CreateEncryptionZoneFlag;
import org.apache.hadoop.hdfs.client.HdfsAdmin;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicyInfo;
import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfierMode;
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
import static org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffType;
import static org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffReportEntry;
import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
import org.apache.hadoop.hdfs.protocol.SnapshotStatus;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotTestHelper;
import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier;
import org.apache.hadoop.hdfs.server.namenode.web.resources.NamenodeWebHdfsMethods;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.hdfs.server.sps.ExternalSPSContext;
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem.WebHdfsInputStream;
import org.apache.hadoop.hdfs.web.resources.LengthParam;
import org.apache.hadoop.hdfs.web.resources.NoRedirectParam;
import org.apache.hadoop.hdfs.web.resources.OffsetParam;
import org.apache.hadoop.hdfs.web.resources.Param;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.io.retry.RetryPolicy.RetryAction;
import org.apache.hadoop.io.retry.RetryPolicy.RetryAction.RetryDecision;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.RetriableException;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.test.Whitebox;
import org.slf4j.event.Level;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.type.MapType;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
/** Test WebHDFS */
public class TestWebHDFS {
static final Logger LOG = LoggerFactory.getLogger(TestWebHDFS.class);
static final Random RANDOM = new Random();
static final long systemStartTime = System.nanoTime();
private static MiniDFSCluster cluster = null;
@After
public void tearDown() {
if (null != cluster) {
cluster.shutdown();
cluster = null;
}
}
/** A timer for measuring performance. */
static class Ticker {
final String name;
final long startTime = System.nanoTime();
private long previousTick = startTime;
Ticker(final String name, String format, Object... args) {
this.name = name;
LOG.info(String.format("\n\n%s START: %s\n",
name, String.format(format, args)));
}
void tick(final long nBytes, String format, Object... args) {
final long now = System.nanoTime();
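// Log progress at most once every 10 seconds (10^10 ns) to keep output manageable.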
if (now - previousTick > 10000000000L) {
previousTick = now;
final double minutes = (now - systemStartTime)/60000000000.0;
LOG.info(String.format("\n\n%s (%.2f min) %s %s\n", name, minutes,
String.format(format, args), toMpsString(nBytes, now)));
}
}
void end(final long nBytes) {
final long now = System.nanoTime();
final double seconds = (now - startTime)/1000000000.0;
LOG.info(String.format("\n\n%s END: duration=%.2fs %s\n",
name, seconds, toMpsString(nBytes, now)));
}
String toMpsString(final long nBytes, final long now) {
final double mb = nBytes/(double)(1<<20);
final double mps = mb*1000000000.0/(now - startTime);
return String.format("[nBytes=%.2fMB, speed=%.2fMB/s]", mb, mps);
}
}
@Test(timeout=300000)
public void testLargeFile() throws Exception {
largeFileTest(200L << 20); //200MB file length
}
/** Test read and write large files. */
static void largeFileTest(final long fileLength) throws Exception {
final Configuration conf = WebHdfsTestUtil.createConf();
cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(3)
.build();
cluster.waitActive();
final FileSystem fs = WebHdfsTestUtil.getWebHdfsFileSystem(conf,
WebHdfsConstants.WEBHDFS_SCHEME);
final Path dir = new Path("/test/largeFile");
Assert.assertTrue(fs.mkdirs(dir));
final byte[] data = new byte[1 << 20];
RANDOM.nextBytes(data);
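// expected holds two back-to-back copies of data, so a check starting at any
// (offset % data.length) position can read up to data.length bytes without wrapping.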
final byte[] expected = new byte[2 * data.length];
System.arraycopy(data, 0, expected, 0, data.length);
System.arraycopy(data, 0, expected, data.length, data.length);
final Path p = new Path(dir, "file");
final Ticker t = new Ticker("WRITE", "fileLength=" + fileLength);
final FSDataOutputStream out = fs.create(p);
try {
long remaining = fileLength;
for (; remaining > 0;) {
t.tick(fileLength - remaining, "remaining=%d", remaining);
final int n = (int) Math.min(remaining, data.length);
out.write(data, 0, n);
remaining -= n;
}
} finally {
out.close();
}
t.end(fileLength);
Assert.assertEquals(fileLength, fs.getFileStatus(p).getLen());
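// smallOffset is between 1MB and 2MB from the start of the file; largeOffset
// is the mirrored position near the end. Together they exercise short and long seeks.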
final long smallOffset = RANDOM.nextInt(1 << 20) + (1 << 20);
final long largeOffset = fileLength - smallOffset;
final byte[] buf = new byte[data.length];
verifySeek(fs, p, largeOffset, fileLength, buf, expected);
verifySeek(fs, p, smallOffset, fileLength, buf, expected);
verifyPread(fs, p, largeOffset, fileLength, buf, expected);
}
static void checkData(long offset, long remaining, int n,
byte[] actual, byte[] expected) {
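// Spot-check roughly 1% of the chunks; verifying every byte of a
// multi-hundred-MB file would dominate the test runtime.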
if (RANDOM.nextInt(100) == 0) {
int j = (int)(offset % actual.length);
for(int i = 0; i < n; i++) {
if (expected[j] != actual[i]) {
Assert.fail("expected[" + j + "]=" + expected[j]
+ " != actual[" + i + "]=" + actual[i]
+ ", offset=" + offset + ", remaining=" + remaining + ", n=" + n);
}
j++;
}
}
}
/** Test seek. */
static void verifySeek(FileSystem fs, Path p, long offset, long length,
byte[] buf, byte[] expected) throws IOException {
long remaining = length - offset;
long checked = 0;
LOG.info("XXX SEEK: offset=" + offset + ", remaining=" + remaining);
final Ticker t = new Ticker("SEEK", "offset=%d, remaining=%d",
offset, remaining);
final FSDataInputStream in = fs.open(p, 64 << 10);
in.seek(offset);
for(; remaining > 0; ) {
t.tick(checked, "offset=%d, remaining=%d", offset, remaining);
final int n = (int)Math.min(remaining, buf.length);
in.readFully(buf, 0, n);
checkData(offset, remaining, n, buf, expected);
offset += n;
remaining -= n;
checked += n;
}
in.close();
t.end(checked);
}
static void verifyPread(FileSystem fs, Path p, long offset, long length,
byte[] buf, byte[] expected) throws IOException {
long remaining = length - offset;
long checked = 0;
LOG.info("XXX PREAD: offset=" + offset + ", remaining=" + remaining);
final Ticker t = new Ticker("PREAD", "offset=%d, remaining=%d",
offset, remaining);
final FSDataInputStream in = fs.open(p, 64 << 10);
for(; remaining > 0; ) {
t.tick(checked, "offset=%d, remaining=%d", offset, remaining);
final int n = (int)Math.min(remaining, buf.length);
in.readFully(offset, buf, 0, n);
checkData(offset, remaining, n, buf, expected);
offset += n;
remaining -= n;
checked += n;
}
in.close();
t.end(checked);
}
/** Test client retry with namenode restarting. */
@Test(timeout=300000)
public void testNamenodeRestart() throws Exception {
GenericTestUtils.setLogLevel(NamenodeWebHdfsMethods.LOG, Level.TRACE);
final Configuration conf = WebHdfsTestUtil.createConf();
TestDFSClientRetries.namenodeRestartTest(conf, true);
}
@Test(timeout=300000)
public void testLargeDirectory() throws Exception {
final Configuration conf = WebHdfsTestUtil.createConf();
final int listLimit = 2;
// force small chunking of directory listing
conf.setInt(DFSConfigKeys.DFS_LIST_LIMIT, listLimit);
// force paths to be only owner-accessible to ensure ugi isn't changing
// during listStatus
FsPermission.setUMask(conf, new FsPermission((short)0077));
cluster =
new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
cluster.waitActive();
WebHdfsTestUtil.getWebHdfsFileSystem(conf, WebHdfsConstants.WEBHDFS_SCHEME)
.setPermission(new Path("/"),
new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL));
// replace the login user with a non-superuser so we can tell whether
// the correct user is used by listStatus
UserGroupInformation.setLoginUser(UserGroupInformation.createUserForTesting(
"not-superuser", new String[] {"not-supergroup" }));
UserGroupInformation.createUserForTesting("me", new String[] {"my-group" })
.doAs(new PrivilegedExceptionAction<Void>() {
@Override
public Void run() throws IOException, URISyntaxException {
FileSystem fs = WebHdfsTestUtil.getWebHdfsFileSystem(conf,
WebHdfsConstants.WEBHDFS_SCHEME);
Path d = new Path("/my-dir");
Assert.assertTrue(fs.mkdirs(d));
// Iterator should have no items when dir is empty
RemoteIterator<FileStatus> it = fs.listStatusIterator(d);
assertFalse(it.hasNext());
Path p = new Path(d, "file-" + 0);
Assert.assertTrue(fs.createNewFile(p));
// Iterator should have an item when dir is not empty
it = fs.listStatusIterator(d);
assertTrue(it.hasNext());
it.next();
assertFalse(it.hasNext());
for (int i = 1; i < listLimit * 3; i++) {
p = new Path(d, "file-" + i);
Assert.assertTrue(fs.createNewFile(p));
}
// Check the FileStatus[] listing
FileStatus[] statuses = fs.listStatus(d);
Assert.assertEquals(listLimit * 3, statuses.length);
// Check the iterator-based listing
GenericTestUtils.setLogLevel(WebHdfsFileSystem.LOG, Level.TRACE);
GenericTestUtils.setLogLevel(NamenodeWebHdfsMethods.LOG,
Level.TRACE);
it = fs.listStatusIterator(d);
int count = 0;
while (it.hasNext()) {
FileStatus stat = it.next();
assertEquals("FileStatuses not equal", statuses[count], stat);
count++;
}
assertEquals("Different # of statuses!", statuses.length, count);
// Do some more basic iterator tests
it = fs.listStatusIterator(d);
// Try advancing the iterator without calling hasNext()
for (int i = 0; i < statuses.length; i++) {
FileStatus stat = it.next();
assertEquals("FileStatuses not equal", statuses[i], stat);
}
assertFalse("No more items expected", it.hasNext());
// Try doing next when out of items
try {
it.next();
fail("Iterator should error if out of elements.");
} catch (NoSuchElementException e) {
// pass
}
return null;
}
});
}
/**
* Test client receives correct DSQuotaExceededException.
*/
@Test
public void testExceedingFileSpaceQuota() throws Exception {
final Configuration conf = WebHdfsTestUtil.createConf();
long spaceQuota = 50L << 20;
long fileLength = 80L << 20;
cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(3)
.build();
cluster.waitActive();
final FileSystem fs = WebHdfsTestUtil.getWebHdfsFileSystem(conf,
WebHdfsConstants.WEBHDFS_SCHEME);
final Path dir = new Path("/test/largeFile");
assertTrue(fs.mkdirs(dir));
final byte[] data = new byte[1 << 20];
RANDOM.nextBytes(data);
cluster.getFileSystem().setQuota(dir, HdfsConstants.QUOTA_DONT_SET,
spaceQuota);
final Path p = new Path(dir, "file");
FSDataOutputStream out = fs.create(p);
try {
for (long remaining = fileLength; remaining > 0;) {
final int n = (int) Math.min(remaining, data.length);
out.write(data, 0, n);
remaining -= n;
}
fail("should have thrown exception during the write");
} catch (DSQuotaExceededException e) {
// expected
} finally {
try {
out.close();
} catch (Exception e) {
// discard exception from close
}
}
}
@Test(timeout=300000)
public void testCustomizedUserAndGroupNames() throws Exception {
final Configuration conf = WebHdfsTestUtil.createConf();
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY, true);
// Modify username pattern to allow numeric usernames
conf.set(HdfsClientConfigKeys.DFS_WEBHDFS_USER_PATTERN_KEY, "^[A-Za-z0-9_][A-Za-z0-9" +
"._-]*[$]?$");
// Modify the ACL pattern to allow numeric names and the "@" character in
// user/group names in the ACL spec
conf.set(HdfsClientConfigKeys.DFS_WEBHDFS_ACL_PERMISSION_PATTERN_KEY,
"^(default:)?(user|group|mask|other):" +
"[[0-9A-Za-z_][@A-Za-z0-9._-]]*:([rwx-]{3})?(,(default:)?" +
"(user|group|mask|other):[[0-9A-Za-z_][@A-Za-z0-9._-]]*:([rwx-]{3})?)*$");
cluster =
new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
cluster.waitActive();
WebHdfsTestUtil.getWebHdfsFileSystem(conf, WebHdfsConstants.WEBHDFS_SCHEME)
.setPermission(new Path("/"),
new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL));
// Test a numeric username
UserGroupInformation
.createUserForTesting("123", new String[] {"my-group" })
.doAs(new PrivilegedExceptionAction<Void>() {
@Override
public Void run() throws IOException, URISyntaxException {
FileSystem fs = WebHdfsTestUtil.getWebHdfsFileSystem(conf,
WebHdfsConstants.WEBHDFS_SCHEME);
Path d = new Path("/my-dir");
Assert.assertTrue(fs.mkdirs(d));
// Also test specifying a default ACL entry with a numeric username
// and another entry with a group name containing '@'
fs.modifyAclEntries(d, ImmutableList.of(new AclEntry.Builder()
.setPermission(FsAction.READ).setScope(AclEntryScope.DEFAULT)
.setType(AclEntryType.USER).setName("11010").build(),
new AclEntry.Builder().setPermission(FsAction.READ_WRITE)
.setType(AclEntryType.GROUP).setName("foo@bar").build()));
return null;
}
});
}
/**
* Test catching the "no datanode" IOException when creating a file
* while no datanode is running.
*/
@Test(timeout=300000)
public void testCreateWithNoDN() throws Exception {
final Configuration conf = WebHdfsTestUtil.createConf();
try {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build();
conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
cluster.waitActive();
FileSystem fs = WebHdfsTestUtil.getWebHdfsFileSystem(conf,
WebHdfsConstants.WEBHDFS_SCHEME);
fs.create(new Path("/testnodatanode"));
Assert.fail("No exception was thrown");
} catch (IOException ex) {
GenericTestUtils.assertExceptionContains("Failed to find datanode", ex);
}
}
@Test
public void testWebHdfsCreateWithInvalidPath() throws Exception {
final Configuration conf = WebHdfsTestUtil.createConf();
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build();
// A path name that includes duplicated slashes.
String path = "//tmp//file";
assertResponse(path);
}
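/**
* Build a webhdfs CREATE URI for the given path against the NameNode's HTTP
* address: http://<host>:<port>/webhdfs/v1<path>?op=CREATE&user.name=<user>.
*/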
private String getUri(String path) {
final String user = System.getProperty("user.name");
final StringBuilder uri = new StringBuilder(cluster.getHttpUri(0));
uri.append("/webhdfs/v1").
append(path).
append("?op=CREATE").
append("&user.name=" + user);
return uri.toString();
}
private void assertResponse(String path) throws IOException {
URL url = new URL(getUri(path));
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
conn.setRequestMethod("PUT");
// Assert response code.
assertEquals(HttpURLConnection.HTTP_BAD_REQUEST, conn.getResponseCode());
// Assert exception.
Map<?, ?> response = WebHdfsFileSystem.jsonParse(conn, true);
assertEquals("InvalidPathException",
((LinkedHashMap) response.get("RemoteException")).get("exception"));
conn.disconnect();
}
/**
* Test allowing and disallowing snapshots through WebHdfs, verifying the
* WebHdfs results against the DistributedFileSystem methods.
*/
@Test
public void testWebHdfsAllowandDisallowSnapshots() throws Exception {
final Configuration conf = WebHdfsTestUtil.createConf();
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build();
cluster.waitActive();
final DistributedFileSystem dfs = cluster.getFileSystem();
final WebHdfsFileSystem webHdfs = WebHdfsTestUtil.getWebHdfsFileSystem(conf,
WebHdfsConstants.WEBHDFS_SCHEME);
final Path bar = new Path("/bar");
dfs.mkdirs(bar);
// allow snapshots on /bar using webhdfs
webHdfs.allowSnapshot(bar);
// check if snapshot status is enabled
assertTrue(dfs.getFileStatus(bar).isSnapshotEnabled());
assertTrue(webHdfs.getFileStatus(bar).isSnapshotEnabled());
webHdfs.createSnapshot(bar, "s1");
final Path s1path = SnapshotTestHelper.getSnapshotRoot(bar, "s1");
Assert.assertTrue(webHdfs.exists(s1path));
SnapshottableDirectoryStatus[] snapshottableDirs =
dfs.getSnapshottableDirListing();
assertEquals(1, snapshottableDirs.length);
assertEquals(bar, snapshottableDirs[0].getFullPath());
dfs.deleteSnapshot(bar, "s1");
dfs.disallowSnapshot(bar);
// check if snapshot status is disabled
assertFalse(dfs.getFileStatus(bar).isSnapshotEnabled());
assertFalse(webHdfs.getFileStatus(bar).isSnapshotEnabled());
snapshottableDirs = dfs.getSnapshottableDirListing();
assertNull(snapshottableDirs);
// allow snapshots on /bar again, this time using dfs
dfs.allowSnapshot(bar);
// check if snapshot status is enabled, again
assertTrue(dfs.getFileStatus(bar).isSnapshotEnabled());
assertTrue(webHdfs.getFileStatus(bar).isSnapshotEnabled());
snapshottableDirs = dfs.getSnapshottableDirListing();
assertEquals(1, snapshottableDirs.length);
assertEquals(bar, snapshottableDirs[0].getFullPath());
webHdfs.disallowSnapshot(bar);
// check if snapshot status is disabled, again
assertFalse(dfs.getFileStatus(bar).isSnapshotEnabled());
assertFalse(webHdfs.getFileStatus(bar).isSnapshotEnabled());
snapshottableDirs = dfs.getSnapshottableDirListing();
assertNull(snapshottableDirs);
try {
webHdfs.createSnapshot(bar);
fail("Cannot create snapshot on a non-snapshottable directory");
} catch (Exception e) {
GenericTestUtils.assertExceptionContains(
"Directory is not a snapshottable directory", e);
}
}
@Test (timeout = 60000)
public void testWebHdfsErasureCodingFiles() throws Exception {
final Configuration conf = WebHdfsTestUtil.createConf();
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
cluster.waitActive();
final DistributedFileSystem dfs = cluster.getFileSystem();
dfs.enableErasureCodingPolicy(SystemErasureCodingPolicies
.getByID(SystemErasureCodingPolicies.XOR_2_1_POLICY_ID).getName());
final WebHdfsFileSystem webHdfs = WebHdfsTestUtil.getWebHdfsFileSystem(conf,
WebHdfsConstants.WEBHDFS_SCHEME);
final Path ecDir = new Path("/ec");
dfs.mkdirs(ecDir);
dfs.setErasureCodingPolicy(ecDir, SystemErasureCodingPolicies
.getByID(SystemErasureCodingPolicies.XOR_2_1_POLICY_ID).getName());
final Path ecFile = new Path(ecDir, "ec-file.log");
DFSTestUtil.createFile(dfs, ecFile, 1024 * 10, (short) 1, 0xFEED);
final Path normalDir = new Path("/dir");
dfs.mkdirs(normalDir);
final Path normalFile = new Path(normalDir, "file.log");
DFSTestUtil.createFile(dfs, normalFile, 1024 * 10, (short) 1, 0xFEED);
FileStatus expectedECDirStatus = dfs.getFileStatus(ecDir);
FileStatus actualECDirStatus = webHdfs.getFileStatus(ecDir);
Assert.assertEquals(expectedECDirStatus.isErasureCoded(),
actualECDirStatus.isErasureCoded());
ContractTestUtils.assertErasureCoded(dfs, ecDir);
assertTrue(
ecDir + " should have erasure coding set in "
+ "FileStatus#toString(): " + actualECDirStatus,
actualECDirStatus.toString().contains("isErasureCoded=true"));
FileStatus expectedECFileStatus = dfs.getFileStatus(ecFile);
FileStatus actualECFileStatus = webHdfs.getFileStatus(ecFile);
Assert.assertEquals(expectedECFileStatus.isErasureCoded(),
actualECFileStatus.isErasureCoded());
ContractTestUtils.assertErasureCoded(dfs, ecFile);
assertTrue(
ecFile + " should have erasure coding set in "
+ "FileStatus#toString(): " + actualECFileStatus,
actualECFileStatus.toString().contains("isErasureCoded=true"));
FileStatus expectedNormalDirStatus = dfs.getFileStatus(normalDir);
FileStatus actualNormalDirStatus = webHdfs.getFileStatus(normalDir);
Assert.assertEquals(expectedNormalDirStatus.isErasureCoded(),
actualNormalDirStatus.isErasureCoded());
ContractTestUtils.assertNotErasureCoded(dfs, normalDir);
assertTrue(
normalDir + " should have erasure coding unset in "
+ "FileStatus#toString(): " + actualNormalDirStatus,
actualNormalDirStatus.toString().contains("isErasureCoded=false"));
FileStatus expectedNormalFileStatus = dfs.getFileStatus(normalFile);
FileStatus actualNormalFileStatus = webHdfs.getFileStatus(normalFile);
Assert.assertEquals(expectedNormalFileStatus.isErasureCoded(),
actualNormalFileStatus.isErasureCoded());
ContractTestUtils.assertNotErasureCoded(dfs, normalFile);
assertTrue(
normalFile + " should have erasure coding unset in "
+ "FileStatus#toString(): " + actualNormalFileStatus,
actualNormalFileStatus.toString().contains("isErasureCoded=false"));
}
/**
* Test snapshot creation through WebHdfs.
*/
@Test
public void testWebHdfsCreateSnapshot() throws Exception {
final Configuration conf = WebHdfsTestUtil.createConf();
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build();
cluster.waitActive();
final DistributedFileSystem dfs = cluster.getFileSystem();
final FileSystem webHdfs = WebHdfsTestUtil.getWebHdfsFileSystem(conf,
WebHdfsConstants.WEBHDFS_SCHEME);
final Path foo = new Path("/foo");
dfs.mkdirs(foo);
try {
webHdfs.createSnapshot(foo);
fail("Cannot create snapshot on a non-snapshottable directory");
} catch (Exception e) {
GenericTestUtils.assertExceptionContains(
"Directory is not a snapshottable directory", e);
}
// allow snapshots on /foo
dfs.allowSnapshot(foo);
// create snapshots on foo using WebHdfs
webHdfs.createSnapshot(foo, "s1");
// create snapshot without specifying name
final Path spath = webHdfs.createSnapshot(foo, null);
Assert.assertTrue(webHdfs.exists(spath));
final Path s1path = SnapshotTestHelper.getSnapshotRoot(foo, "s1");
Assert.assertTrue(webHdfs.exists(s1path));
}
/**
* Test snapshot deletion through WebHdfs.
*/
@Test
public void testWebHdfsDeleteSnapshot() throws Exception {
final Configuration conf = WebHdfsTestUtil.createConf();
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build();
cluster.waitActive();
final DistributedFileSystem dfs = cluster.getFileSystem();
final FileSystem webHdfs = WebHdfsTestUtil.getWebHdfsFileSystem(conf,
WebHdfsConstants.WEBHDFS_SCHEME);
final Path foo = new Path("/foo");
dfs.mkdirs(foo);
dfs.allowSnapshot(foo);
webHdfs.createSnapshot(foo, "s1");
final Path spath = webHdfs.createSnapshot(foo, null);
Assert.assertTrue(webHdfs.exists(spath));
final Path s1path = SnapshotTestHelper.getSnapshotRoot(foo, "s1");
Assert.assertTrue(webHdfs.exists(s1path));
// delete operation with snapshot name as null
try {
webHdfs.deleteSnapshot(foo, null);
fail("Expected IllegalArgumentException");
} catch (RemoteException e) {
Assert.assertEquals("Required param snapshotname for "
+ "op: DELETESNAPSHOT is null or empty", e.getLocalizedMessage());
}
// delete the two snapshots
webHdfs.deleteSnapshot(foo, "s1");
assertFalse(webHdfs.exists(s1path));
webHdfs.deleteSnapshot(foo, spath.getName());
assertFalse(webHdfs.exists(spath));
}
/**
* Test snapshot diff through WebHdfs.
*/
@Test
public void testWebHdfsSnapshotDiff() throws Exception {
final Configuration conf = WebHdfsTestUtil.createConf();
conf.setInt(DFS_NAMENODE_SNAPSHOT_DIFF_LISTING_LIMIT, 2);
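// A diff listing limit of 2 forces the snapshot diff report below to be
// fetched from the NameNode in multiple batches.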
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
cluster.waitActive();
final DistributedFileSystem dfs = cluster.getFileSystem();
final WebHdfsFileSystem webHdfs = WebHdfsTestUtil.getWebHdfsFileSystem(conf,
WebHdfsConstants.WEBHDFS_SCHEME);
final Path foo = new Path("/foo");
dfs.mkdirs(foo);
Path file0 = new Path(foo, "file0");
DFSTestUtil.createFile(dfs, file0, 100, (short) 1, 0);
Path file1 = new Path(foo, "file1");
DFSTestUtil.createFile(dfs, file1, 100, (short) 1, 0);
Path file2 = new Path(foo, "file2");
DFSTestUtil.createFile(dfs, file2, 100, (short) 1, 0);
dfs.allowSnapshot(foo);
webHdfs.createSnapshot(foo, "s1");
final Path s1path = SnapshotTestHelper.getSnapshotRoot(foo, "s1");
Assert.assertTrue(webHdfs.exists(s1path));
Path file3 = new Path(foo, "file3");
DFSTestUtil.createFile(dfs, file3, 100, (short) 1, 0);
DFSTestUtil.appendFile(dfs, file0, 100);
dfs.delete(file1, false);
Path file4 = new Path(foo, "file4");
dfs.rename(file2, file4);
webHdfs.createSnapshot(foo, "s2");
SnapshotDiffReport diffReport =
webHdfs.getSnapshotDiffReport(foo, "s1", "s2");
Assert.assertEquals("/foo", diffReport.getSnapshotRoot());
Assert.assertEquals("s1", diffReport.getFromSnapshot());
Assert.assertEquals("s2", diffReport.getLaterSnapshotName());
DiffReportEntry entry0 =
new DiffReportEntry(DiffType.MODIFY, DFSUtil.string2Bytes(""));
DiffReportEntry entry1 =
new DiffReportEntry(DiffType.MODIFY, DFSUtil.string2Bytes("file0"));
DiffReportEntry entry2 =
new DiffReportEntry(DiffType.DELETE, DFSUtil.string2Bytes("file1"));
DiffReportEntry entry3 = new DiffReportEntry(DiffType.RENAME,
DFSUtil.string2Bytes("file2"), DFSUtil.string2Bytes("file4"));
DiffReportEntry entry4 =
new DiffReportEntry(DiffType.CREATE, DFSUtil.string2Bytes("file3"));
Assert.assertTrue(diffReport.getDiffList().contains(entry0));
Assert.assertTrue(diffReport.getDiffList().contains(entry1));
Assert.assertTrue(diffReport.getDiffList().contains(entry2));
Assert.assertTrue(diffReport.getDiffList().contains(entry3));
Assert.assertTrue(diffReport.getDiffList().contains(entry4));
Assert.assertEquals(diffReport.getDiffList().size(), 5);
// Test with fromSnapshot and toSnapshot as null.
diffReport = webHdfs.getSnapshotDiffReport(foo, null, "s2");
Assert.assertEquals(diffReport.getDiffList().size(), 0);
diffReport = webHdfs.getSnapshotDiffReport(foo, "s1", null);
Assert.assertEquals(diffReport.getDiffList().size(), 5);
}
/**
* Test snapshottable directory list through WebHdfs.
*/
@Test
public void testWebHdfsSnapshottableDirectoryList() throws Exception {
final Configuration conf = WebHdfsTestUtil.createConf();
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
cluster.waitActive();
final DistributedFileSystem dfs = cluster.getFileSystem();
final WebHdfsFileSystem webHdfs = WebHdfsTestUtil.getWebHdfsFileSystem(conf,
WebHdfsConstants.WEBHDFS_SCHEME);
final Path foo = new Path("/foo");
final Path bar = new Path("/bar");
dfs.mkdirs(foo);
dfs.mkdirs(bar);
SnapshottableDirectoryStatus[] statuses =
webHdfs.getSnapshottableDirectoryList();
Assert.assertNull(statuses);
dfs.allowSnapshot(foo);
dfs.allowSnapshot(bar);
Path file0 = new Path(foo, "file0");
DFSTestUtil.createFile(dfs, file0, 100, (short) 1, 0);
Path file1 = new Path(bar, "file1");
DFSTestUtil.createFile(dfs, file1, 100, (short) 1, 0);
statuses = webHdfs.getSnapshottableDirectoryList();
SnapshottableDirectoryStatus[] dfsStatuses =
dfs.getSnapshottableDirListing();
for (int i = 0; i < dfsStatuses.length; i++) {
Assert.assertEquals(statuses[i].getSnapshotNumber(),
dfsStatuses[i].getSnapshotNumber());
Assert.assertEquals(statuses[i].getSnapshotQuota(),
dfsStatuses[i].getSnapshotQuota());
Assert.assertTrue(Arrays.equals(statuses[i].getParentFullPath(),
dfsStatuses[i].getParentFullPath()));
Assert.assertEquals(dfsStatuses[i].getDirStatus().getChildrenNum(),
statuses[i].getDirStatus().getChildrenNum());
Assert.assertEquals(dfsStatuses[i].getDirStatus().getModificationTime(),
statuses[i].getDirStatus().getModificationTime());
Assert.assertEquals(dfsStatuses[i].getDirStatus().isDir(),
statuses[i].getDirStatus().isDir());
Assert.assertEquals(dfsStatuses[i].getDirStatus().getAccessTime(),
statuses[i].getDirStatus().getAccessTime());
Assert.assertEquals(dfsStatuses[i].getDirStatus().getPermission(),
statuses[i].getDirStatus().getPermission());
Assert.assertEquals(dfsStatuses[i].getDirStatus().getOwner(),
statuses[i].getDirStatus().getOwner());
Assert.assertEquals(dfsStatuses[i].getDirStatus().getGroup(),
statuses[i].getDirStatus().getGroup());
Assert.assertEquals(dfsStatuses[i].getDirStatus().getPath(),
statuses[i].getDirStatus().getPath());
Assert.assertEquals(dfsStatuses[i].getDirStatus().getFileId(),
statuses[i].getDirStatus().getFileId());
Assert.assertEquals(dfsStatuses[i].getDirStatus().hasAcl(),
statuses[i].getDirStatus().hasAcl());
Assert.assertEquals(dfsStatuses[i].getDirStatus().isEncrypted(),
statuses[i].getDirStatus().isEncrypted());
Assert.assertEquals(dfsStatuses[i].getDirStatus().isErasureCoded(),
statuses[i].getDirStatus().isErasureCoded());
Assert.assertEquals(dfsStatuses[i].getDirStatus().isSnapshotEnabled(),
statuses[i].getDirStatus().isSnapshotEnabled());
}
}
@Test
public void testWebHdfsSnapshotList() throws Exception {
final Configuration conf = WebHdfsTestUtil.createConf();
try {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
cluster.waitActive();
final DistributedFileSystem dfs = cluster.getFileSystem();
final WebHdfsFileSystem webHdfs = WebHdfsTestUtil
.getWebHdfsFileSystem(conf, WebHdfsConstants.WEBHDFS_SCHEME);
final Path foo = new Path("/foo");
dfs.mkdirs(foo);
dfs.allowSnapshot(foo);
webHdfs.createSnapshot(foo, "s1");
webHdfs.createSnapshot(foo, "s2");
webHdfs.deleteSnapshot(foo, "s2");
SnapshotStatus[] statuses = webHdfs.getSnapshotListing(foo);
SnapshotStatus[] dfsStatuses = dfs.getSnapshotListing(foo);
for (int i = 0; i < dfsStatuses.length; i++) {
Assert.assertEquals(statuses[i].getSnapshotID(),
dfsStatuses[i].getSnapshotID());
Assert.assertEquals(statuses[i].isDeleted(),
dfsStatuses[i].isDeleted());
Assert.assertTrue(Arrays.equals(statuses[i].getParentFullPath(),
dfsStatuses[i].getParentFullPath()));
Assert.assertEquals(dfsStatuses[i].getDirStatus().getChildrenNum(),
statuses[i].getDirStatus().getChildrenNum());
Assert.assertEquals(dfsStatuses[i].getDirStatus().getModificationTime(),
statuses[i].getDirStatus().getModificationTime());
Assert.assertEquals(dfsStatuses[i].getDirStatus().isDir(),
statuses[i].getDirStatus().isDir());
Assert.assertEquals(dfsStatuses[i].getDirStatus().getAccessTime(),
statuses[i].getDirStatus().getAccessTime());
Assert.assertEquals(dfsStatuses[i].getDirStatus().getPermission(),
statuses[i].getDirStatus().getPermission());
Assert.assertEquals(dfsStatuses[i].getDirStatus().getOwner(),
statuses[i].getDirStatus().getOwner());
Assert.assertEquals(dfsStatuses[i].getDirStatus().getGroup(),
statuses[i].getDirStatus().getGroup());
Assert.assertEquals(dfsStatuses[i].getDirStatus().getPath(),
statuses[i].getDirStatus().getPath());
Assert.assertEquals(dfsStatuses[i].getDirStatus().getFileId(),
statuses[i].getDirStatus().getFileId());
Assert.assertEquals(dfsStatuses[i].getDirStatus().hasAcl(),
statuses[i].getDirStatus().hasAcl());
Assert.assertEquals(dfsStatuses[i].getDirStatus().isEncrypted(),
statuses[i].getDirStatus().isEncrypted());
Assert.assertEquals(dfsStatuses[i].getDirStatus().isErasureCoded(),
statuses[i].getDirStatus().isErasureCoded());
Assert.assertEquals(dfsStatuses[i].getDirStatus().isSnapshotEnabled(),
statuses[i].getDirStatus().isSnapshotEnabled());
}
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
}
@Test
public void testWebHdfsCreateNonRecursive() throws IOException, URISyntaxException {
final Configuration conf = WebHdfsTestUtil.createConf();
WebHdfsFileSystem webHdfs = null;
try {
cluster = new MiniDFSCluster.Builder(conf).build();
cluster.waitActive();
webHdfs = WebHdfsTestUtil.getWebHdfsFileSystem(conf, WebHdfsConstants.WEBHDFS_SCHEME);
TestFileCreation.testFileCreationNonRecursive(webHdfs);
} finally {
if(webHdfs != null) {
webHdfs.close();
}
}
}
/**
* Test snapshot rename through WebHdfs.
*/
@Test
public void testWebHdfsRenameSnapshot() throws Exception {
final Configuration conf = WebHdfsTestUtil.createConf();
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build();
cluster.waitActive();
final DistributedFileSystem dfs = cluster.getFileSystem();
final FileSystem webHdfs = WebHdfsTestUtil.getWebHdfsFileSystem(conf,
WebHdfsConstants.WEBHDFS_SCHEME);
final Path foo = new Path("/foo");
dfs.mkdirs(foo);
dfs.allowSnapshot(foo);
webHdfs.createSnapshot(foo, "s1");
final Path s1path = SnapshotTestHelper.getSnapshotRoot(foo, "s1");
Assert.assertTrue(webHdfs.exists(s1path));
// rename s1 to s2 with oldsnapshotName as null
try {
webHdfs.renameSnapshot(foo, null, "s2");
fail("Expected IllegalArgumentException");
} catch (RemoteException e) {
Assert.assertEquals("Required param oldsnapshotname for "
+ "op: RENAMESNAPSHOT is null or empty", e.getLocalizedMessage());
}
// rename s1 to s2
webHdfs.renameSnapshot(foo, "s1", "s2");
assertFalse(webHdfs.exists(s1path));
final Path s2path = SnapshotTestHelper.getSnapshotRoot(foo, "s2");
Assert.assertTrue(webHdfs.exists(s2path));
webHdfs.deleteSnapshot(foo, "s2");
assertFalse(webHdfs.exists(s2path));
}
/**
* Make sure a RetriableException is thrown when rpcServer is null in
* NamenodeWebHdfsMethods.
*/
@Test
public void testRaceWhileNNStartup() throws Exception {
final Configuration conf = WebHdfsTestUtil.createConf();
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build();
cluster.waitActive();
final NameNode namenode = cluster.getNameNode();
final NamenodeProtocols rpcServer = namenode.getRpcServer();
Whitebox.setInternalState(namenode, "rpcServer", null);
final Path foo = new Path("/foo");
final FileSystem webHdfs = WebHdfsTestUtil.getWebHdfsFileSystem(conf,
WebHdfsConstants.WEBHDFS_SCHEME);
try {
webHdfs.mkdirs(foo);
fail("Expected RetriableException");
} catch (RetriableException e) {
GenericTestUtils.assertExceptionContains("Namenode is in startup mode",
e);
}
Whitebox.setInternalState(namenode, "rpcServer", rpcServer);
}
@Test
public void testDTInInsecureClusterWithFallback()
throws IOException, URISyntaxException {
final Configuration conf = WebHdfsTestUtil.createConf();
conf.setBoolean(CommonConfigurationKeys
.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY, true);
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build();
final FileSystem webHdfs = WebHdfsTestUtil.getWebHdfsFileSystem(conf,
WebHdfsConstants.WEBHDFS_SCHEME);
Assert.assertNull(webHdfs.getDelegationToken(null));
}
@Test
public void testDTInInsecureCluster() throws Exception {
final Configuration conf = WebHdfsTestUtil.createConf();
try {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build();
final FileSystem webHdfs = WebHdfsTestUtil.getWebHdfsFileSystem(conf,
WebHdfsConstants.WEBHDFS_SCHEME);
webHdfs.getDelegationToken(null);
fail("No exception is thrown.");
} catch (AccessControlException ace) {
Assert.assertTrue(ace.getMessage().startsWith(
WebHdfsFileSystem.CANT_FALLBACK_TO_INSECURE_MSG));
}
}
@Test
public void testWebHdfsOffsetAndLength() throws Exception{
final Configuration conf = WebHdfsTestUtil.createConf();
final int OFFSET = 42;
final int LENGTH = 512;
final String PATH = "/foo";
byte[] CONTENTS = new byte[1024];
RANDOM.nextBytes(CONTENTS);
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
final WebHdfsFileSystem fs = WebHdfsTestUtil.getWebHdfsFileSystem(conf,
WebHdfsConstants.WEBHDFS_SCHEME);
try (OutputStream os = fs.create(new Path(PATH))) {
os.write(CONTENTS);
}
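// Issue a raw REST OPEN request with offset and length query parameters;
// following the redirect to the datanode should yield exactly LENGTH bytes.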
InetSocketAddress addr = cluster.getNameNode().getHttpAddress();
URL url = new URL("http", addr.getHostString(), addr.getPort(),
WebHdfsFileSystem.PATH_PREFIX + PATH + "?op=OPEN"
+ Param.toSortedString("&", new OffsetParam((long) OFFSET),
new LengthParam((long) LENGTH)));
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
conn.setInstanceFollowRedirects(true);
Assert.assertEquals(LENGTH, conn.getContentLength());
byte[] subContents = new byte[LENGTH];
byte[] realContents = new byte[LENGTH];
System.arraycopy(CONTENTS, OFFSET, subContents, 0, LENGTH);
IOUtils.readFully(conn.getInputStream(), realContents);
Assert.assertArrayEquals(subContents, realContents);
}
@Test
public void testContentSummary() throws Exception {
final Configuration conf = WebHdfsTestUtil.createConf();
final Path path = new Path("/QuotaDir");
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build();
final WebHdfsFileSystem webHdfs = WebHdfsTestUtil.getWebHdfsFileSystem(conf,
WebHdfsConstants.WEBHDFS_SCHEME);
final DistributedFileSystem dfs = cluster.getFileSystem();
dfs.mkdirs(path);
dfs.setQuotaByStorageType(path, StorageType.DISK, 100000);
ContentSummary contentSummary = webHdfs.getContentSummary(path);
Assert
.assertTrue((contentSummary.getTypeQuota(StorageType.DISK) == 100000));
}
/**
* Test Snapshot related information in ContentSummary.
*/
@Test
public void testSnapshotInContentSummary() throws Exception {
final Configuration conf = WebHdfsTestUtil.createConf();
Path dirPath = new Path("/dir");
final Path filePath = new Path("/dir/file");
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
final WebHdfsFileSystem webHdfs = WebHdfsTestUtil.getWebHdfsFileSystem(conf,
WebHdfsConstants.WEBHDFS_SCHEME);
final DistributedFileSystem dfs = cluster.getFileSystem();
DFSTestUtil.createFile(dfs, filePath, 10, (short) 3, 0L);
dfs.allowSnapshot(dirPath);
dfs.createSnapshot(dirPath);
dfs.delete(filePath, true);
ContentSummary contentSummary = webHdfs.getContentSummary(dirPath);
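// The deleted file is retained only by the snapshot: 1 file of length 10,
// consuming 10 bytes x replication 3 = 30 bytes.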
assertEquals(1, contentSummary.getSnapshotFileCount());
assertEquals(10, contentSummary.getSnapshotLength());
assertEquals(30, contentSummary.getSnapshotSpaceConsumed());
assertEquals(dfs.getContentSummary(dirPath),
webHdfs.getContentSummary(dirPath));
}
@Test
public void testQuotaUsage() throws Exception {
final Configuration conf = WebHdfsTestUtil.createConf();
final Path path = new Path("/TestDir");
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
final WebHdfsFileSystem webHdfs = WebHdfsTestUtil.getWebHdfsFileSystem(conf,
WebHdfsConstants.WEBHDFS_SCHEME);
final DistributedFileSystem dfs = cluster.getFileSystem();
final long nsQuota = 100;
final long spaceQuota = 600 * 1024 * 1024;
final long diskQuota = 100000;
final byte[] bytes = {0x0, 0x1, 0x2, 0x3 };
dfs.mkdirs(path);
dfs.setQuota(path, nsQuota, spaceQuota);
for (int i = 0; i < 10; i++) {
dfs.createNewFile(new Path(path, "test_file_" + i));
}
FSDataOutputStream out = dfs.create(new Path(path, "test_file"));
out.write(bytes);
out.close();
dfs.setQuotaByStorageType(path, StorageType.DISK, diskQuota);
QuotaUsage quotaUsage = webHdfs.getQuotaUsage(path);
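// 11 files (10 empty + 1 with data) plus the directory itself = 12.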
assertEquals(12, quotaUsage.getFileAndDirectoryCount());
assertEquals(nsQuota, quotaUsage.getQuota());
assertEquals(bytes.length * dfs.getDefaultReplication(),
quotaUsage.getSpaceConsumed());
assertEquals(spaceQuota, quotaUsage.getSpaceQuota());
assertEquals(diskQuota, quotaUsage.getTypeQuota(StorageType.DISK));
}
@Test
public void testSetQuota() throws Exception {
final Configuration conf = WebHdfsTestUtil.createConf();
final Path path = new Path("/TestDir");
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
final WebHdfsFileSystem webHdfs = WebHdfsTestUtil.getWebHdfsFileSystem(conf,
WebHdfsConstants.WEBHDFS_SCHEME);
final DistributedFileSystem dfs = cluster.getFileSystem();
final long nsQuota = 100;
final long spaceQuota = 1024;
webHdfs.mkdirs(path);
webHdfs.setQuota(path, nsQuota, spaceQuota);
QuotaUsage quotaUsage = dfs.getQuotaUsage(path);
assertEquals(nsQuota, quotaUsage.getQuota());
assertEquals(spaceQuota, quotaUsage.getSpaceQuota());
webHdfs.setQuota(path, HdfsConstants.QUOTA_RESET,
HdfsConstants.QUOTA_RESET);
quotaUsage = dfs.getQuotaUsage(path);
assertEquals(HdfsConstants.QUOTA_RESET, quotaUsage.getQuota());
assertEquals(HdfsConstants.QUOTA_RESET, quotaUsage.getSpaceQuota());
webHdfs.setQuotaByStorageType(path, StorageType.DISK, spaceQuota);
webHdfs.setQuotaByStorageType(path, StorageType.ARCHIVE, spaceQuota);
webHdfs.setQuotaByStorageType(path, StorageType.SSD, spaceQuota);
webHdfs.setQuotaByStorageType(path, StorageType.NVDIMM, spaceQuota);
quotaUsage = dfs.getQuotaUsage(path);
assertEquals(spaceQuota, quotaUsage.getTypeQuota(StorageType.DISK));
assertEquals(spaceQuota, quotaUsage.getTypeQuota(StorageType.ARCHIVE));
assertEquals(spaceQuota, quotaUsage.getTypeQuota(StorageType.SSD));
assertEquals(spaceQuota, quotaUsage.getTypeQuota(StorageType.NVDIMM));
// Test invalid parameters
LambdaTestUtils.intercept(IllegalArgumentException.class,
() -> webHdfs.setQuota(path, -100, 100));
LambdaTestUtils.intercept(IllegalArgumentException.class,
() -> webHdfs.setQuota(path, 100, -100));
LambdaTestUtils.intercept(IllegalArgumentException.class,
() -> webHdfs.setQuotaByStorageType(path, StorageType.SSD, -100));
LambdaTestUtils.intercept(IllegalArgumentException.class,
() -> webHdfs.setQuotaByStorageType(path, null, 100));
LambdaTestUtils.intercept(IllegalArgumentException.class,
() -> webHdfs.setQuotaByStorageType(path, StorageType.SSD, -100));
LambdaTestUtils.intercept(IllegalArgumentException.class,
() -> webHdfs.setQuotaByStorageType(path, StorageType.RAM_DISK, 100));
LambdaTestUtils.intercept(IllegalArgumentException.class,
() -> webHdfs.setQuotaByStorageType(path, StorageType.NVDIMM, -100));
}
@Test
public void testWebHdfsPread() throws Exception {
final Configuration conf = WebHdfsTestUtil.createConf();
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1)
.build();
byte[] content = new byte[1024];
RANDOM.nextBytes(content);
final Path foo = new Path("/foo");
FSDataInputStream in = null;
try {
final WebHdfsFileSystem fs = WebHdfsTestUtil.getWebHdfsFileSystem(conf,
WebHdfsConstants.WEBHDFS_SCHEME);
try (OutputStream os = fs.create(foo)) {
os.write(content);
}
// pread
in = fs.open(foo, 1024);
byte[] buf = new byte[1024];
try {
in.readFully(1020, buf, 0, 5);
Assert.fail("EOF expected");
} catch (EOFException ignored) {}
// mix pread with stateful read
int length = in.read(buf, 0, 512);
in.readFully(100, new byte[1024], 0, 100);
int preadLen = in.read(200, new byte[1024], 0, 200);
Assert.assertTrue(preadLen > 0);
IOUtils.readFully(in, buf, length, 1024 - length);
Assert.assertArrayEquals(content, buf);
} finally {
if (in != null) {
in.close();
}
}
}
@Test(timeout = 30000)
public void testGetHomeDirectory() throws Exception {
Configuration conf = new Configuration();
cluster = new MiniDFSCluster.Builder(conf).build();
cluster.waitActive();
DistributedFileSystem hdfs = cluster.getFileSystem();
final URI uri = new URI(WebHdfsConstants.WEBHDFS_SCHEME + "://"
+ cluster.getHttpUri(0).replace("http://", ""));
final Configuration confTemp = new Configuration();
WebHdfsFileSystem webhdfs = (WebHdfsFileSystem) FileSystem.get(uri,
confTemp);
assertEquals(hdfs.getHomeDirectory().toUri().getPath(),
webhdfs.getHomeDirectory().toUri().getPath());
webhdfs.close();
webhdfs = createWebHDFSAsTestUser(confTemp, uri, "XXX");
assertNotEquals(hdfs.getHomeDirectory().toUri().getPath(),
webhdfs.getHomeDirectory().toUri().getPath());
webhdfs.close();
}
@Test
public void testWebHdfsGetBlockLocationsWithStorageType() throws Exception{
final Configuration conf = WebHdfsTestUtil.createConf();
final int OFFSET = 42;
final int LENGTH = 512;
final Path PATH = new Path("/foo");
byte[] CONTENTS = new byte[1024];
RANDOM.nextBytes(CONTENTS);
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
final WebHdfsFileSystem fs = WebHdfsTestUtil.getWebHdfsFileSystem(conf,
WebHdfsConstants.WEBHDFS_SCHEME);
try (OutputStream os = fs.create(PATH)) {
os.write(CONTENTS);
}
BlockLocation[] locations = fs.getFileBlockLocations(PATH, OFFSET, LENGTH);
for (BlockLocation location : locations) {
StorageType[] storageTypes = location.getStorageTypes();
Assert.assertTrue(storageTypes != null && storageTypes.length > 0
&& storageTypes[0] == StorageType.DISK);
}
}
@Test
public void testWebHdfsGetBlockLocations() throws Exception{
final Configuration conf = WebHdfsTestUtil.createConf();
final int offset = 42;
final int length = 512;
final Path path = new Path("/foo");
byte[] contents = new byte[1024];
RANDOM.nextBytes(contents);
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
final WebHdfsFileSystem fs = WebHdfsTestUtil.getWebHdfsFileSystem(conf,
WebHdfsConstants.WEBHDFS_SCHEME);
try (OutputStream os = fs.create(path)) {
os.write(contents);
}
BlockLocation[] locations = fs.getFileBlockLocations(path, offset, length);
// Query webhdfs REST API to get block locations
InetSocketAddress addr = cluster.getNameNode().getHttpAddress();
// Case 1
// URL without length or offset parameters
URL url1 = new URL("http", addr.getHostString(), addr.getPort(),
WebHdfsFileSystem.PATH_PREFIX + "/foo?op=GETFILEBLOCKLOCATIONS");
String response1 = getResponse(url1, "GET");
// Parse BlockLocation array from json output using object mapper
BlockLocation[] locationArray1 = toBlockLocationArray(response1);
// Verify the result from rest call is same as file system api
verifyEquals(locations, locationArray1);
// Case 2
// URL contains length and offset parameters
URL url2 = new URL("http", addr.getHostString(), addr.getPort(),
WebHdfsFileSystem.PATH_PREFIX + "/foo?op=GETFILEBLOCKLOCATIONS"
+ "&length=" + length + "&offset=" + offset);
String response2 = getResponse(url2, "GET");
BlockLocation[] locationArray2 = toBlockLocationArray(response2);
verifyEquals(locations, locationArray2);
// Case 3
// URL contains the length parameter but not the offset parameter
URL url3 = new URL("http", addr.getHostString(), addr.getPort(),
WebHdfsFileSystem.PATH_PREFIX + "/foo?op=GETFILEBLOCKLOCATIONS"
+ "&length=" + length);
String response3 = getResponse(url3, "GET");
BlockLocation[] locationArray3 = toBlockLocationArray(response3);
verifyEquals(locations, locationArray3);
// Case 4
// URL contains offset parameter but without length parameter
URL url4 = new URL("http", addr.getHostString(), addr.getPort(),
WebHdfsFileSystem.PATH_PREFIX + "/foo?op=GETFILEBLOCKLOCATIONS"
+ "&offset=" + offset);
String response4 = getResponse(url4, "GET");
BlockLocation[] locationArray4 = toBlockLocationArray(response4);
verifyEquals(locations, locationArray4);
// Case 5
// URL specifies an offset that exceeds the file length
URL url5 = new URL("http", addr.getHostString(), addr.getPort(),
WebHdfsFileSystem.PATH_PREFIX + "/foo?op=GETFILEBLOCKLOCATIONS"
+ "&offset=1200");
String response5 = getResponse(url5, "GET");
BlockLocation[] locationArray5 = toBlockLocationArray(response5);
// Expect an empty BlockLocation array
verifyEquals(new BlockLocation[] {}, locationArray5);
}
private BlockLocation[] toBlockLocationArray(String json)
throws IOException {
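// The response JSON has the shape {"BlockLocations": {"BlockLocation": [...]}},
// so construct a nested map type for Jackson to bind it directly.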
ObjectMapper mapper = new ObjectMapper();
MapType subType = mapper.getTypeFactory().constructMapType(
Map.class,
String.class,
BlockLocation[].class);
MapType rootType = mapper.getTypeFactory().constructMapType(
Map.class,
mapper.constructType(String.class),
mapper.constructType(subType));
Map<String, Map<String, BlockLocation[]>> jsonMap = mapper
.readValue(json, rootType);
Map<String, BlockLocation[]> locationMap = jsonMap
.get("BlockLocations");
BlockLocation[] locationArray = locationMap.get(
BlockLocation.class.getSimpleName());
return locationArray;
}
private void verifyEquals(BlockLocation[] locations1,
BlockLocation[] locations2) throws IOException {
Assert.assertEquals(locations1.length, locations2.length);
for (int i = 0; i < locations1.length; i++) {
BlockLocation location1 = locations1[i];
BlockLocation location2 = locations2[i];
Assert.assertEquals(location1.getLength(),
location2.getLength());
Assert.assertEquals(location1.getOffset(),
location2.getOffset());
Assert.assertArrayEquals(location1.getCachedHosts(),
location2.getCachedHosts());
Assert.assertArrayEquals(location1.getHosts(),
location2.getHosts());
Assert.assertArrayEquals(location1.getNames(),
location2.getNames());
Assert.assertArrayEquals(location1.getTopologyPaths(),
location2.getTopologyPaths());
Assert.assertArrayEquals(location1.getStorageTypes(),
location2.getStorageTypes());
}
}
private static String getResponse(URL url, String httpRequestType)
throws IOException {
HttpURLConnection conn = null;
try {
conn = (HttpURLConnection) url.openConnection();
conn.setRequestMethod(httpRequestType);
conn.setInstanceFollowRedirects(false);
return IOUtils.toString(conn.getInputStream(),
StandardCharsets.UTF_8);
} finally {
if(conn != null) {
conn.disconnect();
}
}
}
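/**
* Open a WebHdfsFileSystem for the given URI as the given test user via
* UserGroupInformation#doAs.
*/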
private WebHdfsFileSystem createWebHDFSAsTestUser(final Configuration conf,
final URI uri, final String userName) throws Exception {
final UserGroupInformation ugi = UserGroupInformation.createUserForTesting(
userName, new String[] {"supergroup" });
return ugi.doAs(new PrivilegedExceptionAction<WebHdfsFileSystem>() {
@Override
public WebHdfsFileSystem run() throws IOException {
WebHdfsFileSystem webhdfs = (WebHdfsFileSystem) FileSystem.get(uri,
conf);
return webhdfs;
}
});
}
@Test(timeout=90000)
public void testWebHdfsReadRetries() throws Exception {
final Configuration conf = WebHdfsTestUtil.createConf();
final Path dir = new Path("/testWebHdfsReadRetries");
conf.setBoolean(HdfsClientConfigKeys.Retry.POLICY_ENABLED_KEY, true);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_SAFEMODE_MIN_DATANODES_KEY, 1);
conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 1024*512);
conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
final short numDatanodes = 1;
cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(numDatanodes)
.build();
cluster.waitActive();
final FileSystem fs = WebHdfsTestUtil.getWebHdfsFileSystem(conf,
WebHdfsConstants.WEBHDFS_SCHEME);
// create a file
final long length = 1L << 20;
final Path file1 = new Path(dir, "testFile");
DFSTestUtil.createFile(fs, file1, length, numDatanodes, 20120406L);
// get file status and check that it was written properly.
final FileStatus s1 = fs.getFileStatus(file1);
assertEquals("Write failed for file " + file1, length, s1.getLen());
// Ensure file can be read through WebHdfsInputStream
FSDataInputStream in = fs.open(file1);
assertTrue("Input stream is not an instance of class WebHdfsInputStream",
in.getWrappedStream() instanceof WebHdfsInputStream);
int count = 0;
for (; in.read() != -1; count++)
;
assertEquals("Read failed for file " + file1, s1.getLen(), count);
assertEquals("Sghould not be able to read beyond end of file", in.read(),
-1);
in.close();
try {
in.read();
fail("Read after close should have failed");
} catch (IOException ioe) {
}
WebHdfsFileSystem wfs = (WebHdfsFileSystem) fs;
// Read should not be retried if AccessControlException is encountered.
String msg = "ReadRetries: Test Access Control Exception";
testReadRetryExceptionHelper(wfs, file1, new AccessControlException(msg),
msg, false, 1);
// Retry policy should be invoked if IOExceptions are thrown.
msg = "ReadRetries: Test SocketTimeoutException";
testReadRetryExceptionHelper(wfs, file1, new SocketTimeoutException(msg),
msg, true, 5);
msg = "ReadRetries: Test SocketException";
testReadRetryExceptionHelper(wfs, file1, new SocketException(msg), msg,
true, 5);
msg = "ReadRetries: Test EOFException";
testReadRetryExceptionHelper(wfs, file1, new EOFException(msg), msg, true,
5);
msg = "ReadRetries: Test Generic IO Exception";
testReadRetryExceptionHelper(wfs, file1, new IOException(msg), msg, true,
5);
// If InvalidToken exception occurs, WebHdfs only retries if the
// delegation token was replaced. Do that twice, then verify by checking
// the number of times it tried.
WebHdfsFileSystem spyfs = spy(wfs);
when(spyfs.replaceExpiredDelegationToken()).thenReturn(true, true, false);
msg = "ReadRetries: Test Invalid Token Exception";
testReadRetryExceptionHelper(spyfs, file1, new InvalidToken(msg), msg,
false, 3);
}
public boolean attemptedRetry;
private void testReadRetryExceptionHelper(WebHdfsFileSystem fs, Path fn,
final IOException ex, String msg, boolean shouldAttemptRetry,
int numTimesTried)
throws Exception {
// Override WebHdfsInputStream#getInputStream so that it returns
// an input stream that throws the specified exception when read
// is called.
FSDataInputStream in = fs.open(fn);
in.read(); // Connection is made only when the first read() occurs.
final WebHdfsInputStream webIn =
(WebHdfsInputStream)(in.getWrappedStream());
final InputStream spyInputStream =
spy(webIn.getReadRunner().getInputStream());
doThrow(ex).when(spyInputStream).read((byte[])any(), anyInt(), anyInt());
final WebHdfsFileSystem.ReadRunner rr = spy(webIn.getReadRunner());
doReturn(spyInputStream)
.when(rr).initializeInputStream((HttpURLConnection) any());
rr.setInputStream(spyInputStream);
webIn.setReadRunner(rr);
// Override filesystem's retry policy in order to verify that
// WebHdfsInputStream is calling shouldRetry for the appropriate
// exceptions.
final RetryAction retryAction = new RetryAction(RetryDecision.RETRY);
final RetryAction failAction = new RetryAction(RetryDecision.FAIL);
RetryPolicy rp = new RetryPolicy() {
@Override
public RetryAction shouldRetry(Exception e, int retries, int failovers,
boolean isIdempotentOrAtMostOnce) throws Exception {
attemptedRetry = true;
if (retries > 3) {
return failAction;
} else {
return retryAction;
}
}
};
fs.setRetryPolicy(rp);
// If the retry logic is exercised, attemptedRetry will be true. Some
// exceptions should exercise the retry logic and others should not.
// Either way, the value of attemptedRetry should match shouldAttemptRetry.
attemptedRetry = false;
try {
webIn.read();
fail(msg + ": Read should have thrown exception.");
} catch (Exception e) {
assertTrue(e.getMessage().contains(msg));
}
assertEquals(msg + ": Read should " + (shouldAttemptRetry ? "" : "not ")
+ "have called shouldRetry. ",
attemptedRetry, shouldAttemptRetry);
verify(rr, times(numTimesTried)).getResponse((HttpURLConnection) any());
webIn.close();
in.close();
}
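// The helper above swaps the retry policy programmatically; outside of
// tests, the same behavior is normally driven by configuration. A minimal
// sketch, assuming the standard "dfs.http.client.retry.policy.*" keys (the
// literal key strings are used here rather than the HdfsClientConfigKeys
// constants); this helper is illustrative and not invoked by the suite.
private static Configuration retryPolicyConfSketch() {
Configuration conf = WebHdfsTestUtil.createConf();
// Enable the WebHDFS HTTP client retry policy and bound it with an
// exponential-backoff spec (base sleep millis, retry count, ...).
conf.setBoolean("dfs.http.client.retry.policy.enabled", true);
conf.set("dfs.http.client.retry.policy.spec", "10000,6,60000,10");
return conf;
}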
private void checkResponseContainsLocation(URL url, String TYPE)
throws JSONException, IOException {
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
conn.setRequestMethod(TYPE);
conn.setInstanceFollowRedirects(false);
String response =
IOUtils.toString(conn.getInputStream(), StandardCharsets.UTF_8);
LOG.info("Response was : " + response);
Assert.assertEquals(
"Response wasn't " + HttpURLConnection.HTTP_OK,
HttpURLConnection.HTTP_OK, conn.getResponseCode());
JSONObject responseJson = new JSONObject(response);
Assert.assertTrue("Response didn't give us a location. " + response,
responseJson.has("Location"));
// Test that the DN allows CORS on CREATE.
if (TYPE.equals("CREATE")) {
URL dnLocation = new URL(responseJson.getString("Location"));
HttpURLConnection dnConn = (HttpURLConnection) dnLocation.openConnection();
dnConn.setRequestMethod("OPTIONS");
Assert.assertEquals("Datanode url : " + dnLocation + " didn't allow "
+ "CORS", HttpURLConnection.HTTP_OK, dnConn.getResponseCode());
}
}
/**
* Test that when "&noredirect=true" is added to the CREATE, APPEND, OPEN,
* and GETFILECHECKSUM operations, the response (which is usually a 307
* temporary redirect) is a 200 with JSON that contains the redirected
* location.
*/
@Test
public void testWebHdfsNoRedirect() throws Exception {
final Configuration conf = WebHdfsTestUtil.createConf();
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
LOG.info("Started cluster");
InetSocketAddress addr = cluster.getNameNode().getHttpAddress();
URL url = new URL("http", addr.getHostString(), addr.getPort(),
WebHdfsFileSystem.PATH_PREFIX + "/testWebHdfsNoRedirectCreate"
+ "?op=CREATE"
+ Param.toSortedString("&", new NoRedirectParam(true)));
LOG.info("Sending create request " + url);
checkResponseContainsLocation(url, "PUT");
// Write a file that we can read
final WebHdfsFileSystem fs = WebHdfsTestUtil.getWebHdfsFileSystem(conf,
WebHdfsConstants.WEBHDFS_SCHEME);
final String PATH = "/testWebHdfsNoRedirect";
byte[] CONTENTS = new byte[1024];
RANDOM.nextBytes(CONTENTS);
try (OutputStream os = fs.create(new Path(PATH))) {
os.write(CONTENTS);
}
url = new URL("http", addr.getHostString(), addr.getPort(),
WebHdfsFileSystem.PATH_PREFIX + "/testWebHdfsNoRedirect" + "?op=OPEN"
+ Param.toSortedString("&", new NoRedirectParam(true)));
LOG.info("Sending open request " + url);
checkResponseContainsLocation(url, "GET");
url = new URL("http", addr.getHostString(), addr.getPort(),
WebHdfsFileSystem.PATH_PREFIX + "/testWebHdfsNoRedirect"
+ "?op=GETFILECHECKSUM"
+ Param.toSortedString("&", new NoRedirectParam(true)));
LOG.info("Sending getfilechecksum request " + url);
checkResponseContainsLocation(url, "GET");
url = new URL("http", addr.getHostString(), addr.getPort(),
WebHdfsFileSystem.PATH_PREFIX + "/testWebHdfsNoRedirect" + "?op=APPEND"
+ Param.toSortedString("&", new NoRedirectParam(true)));
LOG.info("Sending append request " + url);
checkResponseContainsLocation(url, "POST");
}
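// A hedged sketch of what a client does with the noredirect response
// tested above: the namenode answers 200 with {"Location": "<datanode
// URL>"} instead of a 307, and the caller issues the data-bearing request
// against that URL itself. The 201 check assumes the documented WebHDFS
// CREATE behavior; this helper is illustrative and not invoked by the
// suite.
private void twoStepCreateSketch(URL nnCreateUrl, byte[] data)
throws IOException, JSONException {
// Step 1: ask the namenode where to write; no redirect is followed.
HttpURLConnection nnConn = (HttpURLConnection) nnCreateUrl.openConnection();
nnConn.setRequestMethod("PUT");
nnConn.setInstanceFollowRedirects(false);
String json =
IOUtils.toString(nnConn.getInputStream(), StandardCharsets.UTF_8);
// Step 2: send the file contents to the datanode location from the JSON.
URL dnUrl = new URL(new JSONObject(json).getString("Location"));
HttpURLConnection dnConn = (HttpURLConnection) dnUrl.openConnection();
dnConn.setRequestMethod("PUT");
dnConn.setDoOutput(true);
try (OutputStream os = dnConn.getOutputStream()) {
os.write(data);
}
Assert.assertEquals(HttpURLConnection.HTTP_CREATED,
dnConn.getResponseCode());
}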
@Test
public void testGetTrashRoot() throws Exception {
final Configuration conf = WebHdfsTestUtil.createConf();
final String currentUser =
UserGroupInformation.getCurrentUser().getShortUserName();
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build();
final WebHdfsFileSystem webFS = WebHdfsTestUtil.getWebHdfsFileSystem(conf,
WebHdfsConstants.WEBHDFS_SCHEME);
Path trashPath = webFS.getTrashRoot(new Path("/"));
Path expectedPath = new Path(FileSystem.USER_HOME_PREFIX,
new Path(currentUser, FileSystem.TRASH_PREFIX));
assertEquals(expectedPath.toUri().getPath(), trashPath.toUri().getPath());
}
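// For reference, the equivalent raw REST exchange (GETTRASHROOT is the
// documented WebHDFS op; the response shape is a hedged sketch):
//   GET /webhdfs/v1/?op=GETTRASHROOT
//   200 OK  {"Path": "/user/<current user>/.Trash"}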
@Test
public void testGetSnapshotTrashRoot() throws Exception {
final Configuration conf = WebHdfsTestUtil.createConf();
conf.setBoolean("dfs.namenode.snapshot.trashroot.enabled", true);
final String currentUser =
UserGroupInformation.getCurrentUser().getShortUserName();
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build();
final WebHdfsFileSystem webFS = WebHdfsTestUtil.getWebHdfsFileSystem(conf,
WebHdfsConstants.WEBHDFS_SCHEME);
Path ssDir1 = new Path("/ssDir1");
assertTrue(webFS.mkdirs(ssDir1));
Path trashPath = webFS.getTrashRoot(ssDir1);
Path expectedPath = new Path(FileSystem.USER_HOME_PREFIX,
new Path(currentUser, FileSystem.TRASH_PREFIX));
assertEquals(expectedPath.toUri().getPath(), trashPath.toUri().getPath());
// Enable snapshot
webFS.allowSnapshot(ssDir1);
Path trashPathAfter = webFS.getTrashRoot(ssDir1);
Path expectedPathAfter = new Path(ssDir1,
new Path(FileSystem.TRASH_PREFIX, currentUser));
assertEquals(expectedPathAfter.toUri().getPath(),
trashPathAfter.toUri().getPath());
// Cleanup
webFS.disallowSnapshot(ssDir1);
webFS.delete(ssDir1, true);
}
@Test
public void testGetEZTrashRoot() throws Exception {
final Configuration conf = WebHdfsTestUtil.createConf();
FileSystemTestHelper fsHelper = new FileSystemTestHelper();
File testRootDir = new File(fsHelper.getTestRootDir()).getAbsoluteFile();
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH,
"jceks://file" + new Path(testRootDir.toString(), "test.jks").toUri());
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build();
cluster.waitActive();
DistributedFileSystem dfs = cluster.getFileSystem();
final WebHdfsFileSystem webhdfs = WebHdfsTestUtil.getWebHdfsFileSystem(
conf, WebHdfsConstants.WEBHDFS_SCHEME);
HdfsAdmin dfsAdmin = new HdfsAdmin(cluster.getURI(), conf);
dfs.getClient().setKeyProvider(
cluster.getNameNode().getNamesystem().getProvider());
final String testkey = "test_key";
DFSTestUtil.createKey(testkey, cluster, conf);
final Path zone1 = new Path("/zone1");
dfs.mkdirs(zone1, new FsPermission((short) 0700));
dfsAdmin.createEncryptionZone(zone1, testkey,
EnumSet.of(CreateEncryptionZoneFlag.PROVISION_TRASH));
final Path insideEZ = new Path(zone1, "insideEZ");
dfs.mkdirs(insideEZ, new FsPermission((short) 0700));
assertEquals(
dfs.getTrashRoot(insideEZ).toUri().getPath(),
webhdfs.getTrashRoot(insideEZ).toUri().getPath());
final Path outsideEZ = new Path("/outsideEZ");
dfs.mkdirs(outsideEZ, new FsPermission((short) 0755));
assertEquals(
dfs.getTrashRoot(outsideEZ).toUri().getPath(),
webhdfs.getTrashRoot(outsideEZ).toUri().getPath());
final Path root = new Path("/");
assertEquals(
dfs.getTrashRoot(root).toUri().getPath(),
webhdfs.getTrashRoot(root).toUri().getPath());
assertEquals(
webhdfs.getTrashRoot(root).toUri().getPath(),
webhdfs.getTrashRoot(zone1).toUri().getPath());
assertEquals(
webhdfs.getTrashRoot(outsideEZ).toUri().getPath(),
webhdfs.getTrashRoot(zone1).toUri().getPath());
}
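// Summary of the assertions above: paths inside an encryption zone resolve
// to a per-zone trash root such as /zone1/.Trash/<user>, while paths
// outside any zone, including the root, keep the regular
// /user/<user>/.Trash root.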
@Test
public void testStoragePolicy() throws Exception {
final Configuration conf = WebHdfsTestUtil.createConf();
final Path path = new Path("/file");
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build();
final DistributedFileSystem dfs = cluster.getFileSystem();
final WebHdfsFileSystem webHdfs = WebHdfsTestUtil.getWebHdfsFileSystem(conf,
WebHdfsConstants.WEBHDFS_SCHEME);
// test getAllStoragePolicies
Assert.assertArrayEquals(dfs.getAllStoragePolicies().toArray(),
webHdfs.getAllStoragePolicies().toArray());
// test get/set/unset policies
DFSTestUtil.createFile(dfs, path, 0, (short) 1, 0L);
// get defaultPolicy
BlockStoragePolicySpi defaultdfsPolicy = dfs.getStoragePolicy(path);
// set policy through webhdfs
webHdfs.setStoragePolicy(path, HdfsConstants.COLD_STORAGE_POLICY_NAME);
// get policy from dfs
BlockStoragePolicySpi dfsPolicy = dfs.getStoragePolicy(path);
// get policy from webhdfs
BlockStoragePolicySpi webHdfsPolicy = webHdfs.getStoragePolicy(path);
Assert.assertEquals(HdfsConstants.COLD_STORAGE_POLICY_NAME,
webHdfsPolicy.getName());
Assert.assertEquals(webHdfsPolicy, dfsPolicy);
// unset policy
webHdfs.unsetStoragePolicy(path);
Assert.assertEquals(defaultdfsPolicy, webHdfs.getStoragePolicy(path));
}
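// The equivalent raw REST calls for the policy round trip above, shown as
// a hedged sketch of the documented WebHDFS ops:
//   PUT  /webhdfs/v1/file?op=SETSTORAGEPOLICY&storagepolicy=COLD
//   GET  /webhdfs/v1/file?op=GETSTORAGEPOLICY
//   POST /webhdfs/v1/file?op=UNSETSTORAGEPOLICY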
@Test
public void testSetStoragePolicyWhenPolicyDisabled() throws Exception {
Configuration conf = new HdfsConfiguration();
conf.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY, false);
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0)
.build();
try {
cluster.waitActive();
final WebHdfsFileSystem webHdfs = WebHdfsTestUtil.getWebHdfsFileSystem(
conf, WebHdfsConstants.WEBHDFS_SCHEME);
webHdfs.setStoragePolicy(new Path("/"),
HdfsConstants.COLD_STORAGE_POLICY_NAME);
fail("Should throw exception, when storage policy disabled");
} catch (IOException e) {
Assert.assertTrue(e.getMessage().contains(
"Failed to set storage policy since"));
}
}
private void checkECPolicyState(Collection<ErasureCodingPolicyInfo> policies,
String ecpolicy, String state) {
Iterator<ErasureCodingPolicyInfo> itr = policies.iterator();
boolean found = false;
while (policies.iterator().hasNext()) {
ErasureCodingPolicyInfo policy = itr.next();
if (policy.getPolicy().getName().equals(ecpolicy)) {
found = true;
if (state.equals("disable")) {
Assert.assertTrue(policy.isDisabled());
} else if (state.equals("enable")) {
Assert.assertTrue(policy.isEnabled());
}
break;
}
}
Assert.assertTrue(found);
}
// Test enabling/disabling and setting/unsetting an EC policy via WebHDFS.
@Test
public void testECPolicyCommands() throws Exception {
Configuration conf = new HdfsConfiguration();
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build();
cluster.waitActive();
final DistributedFileSystem dfs = cluster.getFileSystem();
final WebHdfsFileSystem webHdfs = WebHdfsTestUtil.getWebHdfsFileSystem(conf,
WebHdfsConstants.WEBHDFS_SCHEME);
String policy = "RS-10-4-1024k";
// Check for Enable EC policy via WEBHDFS.
dfs.disableErasureCodingPolicy(policy);
checkECPolicyState(dfs.getAllErasureCodingPolicies(), policy, "disable");
webHdfs.enableECPolicy(policy);
checkECPolicyState(dfs.getAllErasureCodingPolicies(), policy, "enable");
Path dir = new Path("/tmp");
dfs.mkdirs(dir);
// Check for Set EC policy via WEBHDFS
assertNull(dfs.getErasureCodingPolicy(dir));
webHdfs.setErasureCodingPolicy(dir, policy);
assertEquals(policy, dfs.getErasureCodingPolicy(dir).getName());
// Check for Get EC policy via WEBHDFS
assertEquals(policy, webHdfs.getErasureCodingPolicy(dir).getName());
// Check for Unset EC policy via WEBHDFS
webHdfs.unsetErasureCodingPolicy(dir);
assertNull(dfs.getErasureCodingPolicy(dir));
// Check for Disable EC policy via WEBHDFS.
webHdfs.disableECPolicy(policy);
checkECPolicyState(dfs.getAllErasureCodingPolicies(), policy, "disable");
}
// Test for Storage Policy Satisfier in DFS.
@Test
public void testWebHdfsSps() throws Exception {
final Configuration conf = new HdfsConfiguration();
conf.set(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
StoragePolicySatisfierMode.EXTERNAL.toString());
StoragePolicySatisfier sps = new StoragePolicySatisfier(conf);
try {
cluster = new MiniDFSCluster.Builder(conf)
.storageTypes(
new StorageType[][] {{StorageType.DISK, StorageType.ARCHIVE } })
.storagesPerDatanode(2).numDataNodes(1).build();
cluster.waitActive();
sps.init(new ExternalSPSContext(sps, DFSTestUtil.getNameNodeConnector(
conf, HdfsServerConstants.MOVER_ID_PATH, 1, false)));
sps.start(StoragePolicySatisfierMode.EXTERNAL);
DistributedFileSystem dfs = cluster.getFileSystem();
DFSTestUtil.createFile(dfs, new Path("/file"), 1024L, (short) 1, 0L);
DFSTestUtil.waitForReplication(dfs, new Path("/file"), (short) 1, 5000);
dfs.setStoragePolicy(new Path("/file"), "COLD");
dfs.satisfyStoragePolicy(new Path("/file"));
DFSTestUtil.waitExpectedStorageType("/file", StorageType.ARCHIVE, 1,
30000, dfs);
} finally {
sps.stopGracefully();
}
}
@Test
public void testWebHdfsAppend() throws Exception {
final Configuration conf = WebHdfsTestUtil.createConf();
final int dnNumber = 3;
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(dnNumber).build();
final WebHdfsFileSystem webFS = WebHdfsTestUtil.getWebHdfsFileSystem(conf,
WebHdfsConstants.WEBHDFS_SCHEME);
final DistributedFileSystem fs = cluster.getFileSystem();
final Path appendFile = new Path("/testAppend.txt");
final String content = "hello world";
DFSTestUtil.writeFile(fs, appendFile, content);
for (int index = 0; index < dnNumber - 1; index++) {
cluster.shutdownDataNode(index);
}
cluster.restartNameNodes();
cluster.waitActive();
try {
DFSTestUtil.appendFile(webFS, appendFile, content);
fail("Should fail to append file since "
+ "datanode number is 1 and replication is 3");
} catch (IOException ignored) {
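// Expected: with replication 3 but only one live datanode, the write
// pipeline cannot be built, so the append fails and the original
// contents must remain intact.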
String resultContent = DFSTestUtil.readFile(fs, appendFile);
assertEquals(content, resultContent);
}
}
/**
* Test that the fs server defaults responses from
* {@link DistributedFileSystem} and {@link WebHdfsFileSystem} are the same.
* @throws Exception
*/
@Test
public void testFsserverDefaults() throws Exception {
final Configuration conf = WebHdfsTestUtil.createConf();
// Here we override all the default values so that we can verify that it
// doesn't pick up the default value.
long blockSize = 256*1024*1024;
int bytesPerChecksum = 256;
int writePacketSize = 128*1024;
int replicationFactor = 0;
int bufferSize = 1024;
boolean encryptDataTransfer = true;
long trashInterval = 1;
String checksumType = "CRC32";
// Set the policy id to the special value 7 because BlockManager creates
// its default suite with policy id 7.
byte policyId = (byte) 7;
conf.setLong(DFS_BLOCK_SIZE_KEY, blockSize);
conf.setInt(DFS_BYTES_PER_CHECKSUM_KEY, bytesPerChecksum);
conf.setInt(DFS_CLIENT_WRITE_PACKET_SIZE_KEY, writePacketSize);
conf.setInt(DFS_REPLICATION_KEY, replicationFactor);
conf.setInt(IO_FILE_BUFFER_SIZE_KEY, bufferSize);
conf.setBoolean(DFS_ENCRYPT_DATA_TRANSFER_KEY, encryptDataTransfer);
conf.setLong(FS_TRASH_INTERVAL_KEY, trashInterval);
conf.set(DFS_CHECKSUM_TYPE_KEY, checksumType);
FsServerDefaults originalServerDefaults = new FsServerDefaults(blockSize,
bytesPerChecksum, writePacketSize, (short)replicationFactor,
bufferSize, encryptDataTransfer, trashInterval,
DataChecksum.Type.valueOf(checksumType), "", policyId);
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build();
final DistributedFileSystem dfs = cluster.getFileSystem();
final WebHdfsFileSystem webfs = WebHdfsTestUtil.getWebHdfsFileSystem(conf,
WebHdfsConstants.WEBHDFS_SCHEME);
FsServerDefaults dfsServerDefaults = dfs.getServerDefaults();
FsServerDefaults webfsServerDefaults = webfs.getServerDefaults();
// Verify whether server defaults value that we override is equal to
// dfsServerDefaults.
compareFsServerDefaults(originalServerDefaults, dfsServerDefaults);
// Verify whether dfs serverdefaults is equal to
// webhdfsServerDefaults.
compareFsServerDefaults(dfsServerDefaults, webfsServerDefaults);
// A repeated call should succeed, served from the client-side cache of
// server defaults.
webfs.getServerDefaults();
}
private void compareFsServerDefaults(FsServerDefaults serverDefaults1,
FsServerDefaults serverDefaults2) throws Exception {
Assert.assertEquals("Block size is different",
serverDefaults1.getBlockSize(),
serverDefaults2.getBlockSize());
Assert.assertEquals("Bytes per checksum are different",
serverDefaults1.getBytesPerChecksum(),
serverDefaults2.getBytesPerChecksum());
Assert.assertEquals("Write packet size is different",
serverDefaults1.getWritePacketSize(),
serverDefaults2.getWritePacketSize());
Assert.assertEquals("Default replication is different",
serverDefaults1.getReplication(),
serverDefaults2.getReplication());
Assert.assertEquals("File buffer size are different",
serverDefaults1.getFileBufferSize(),
serverDefaults2.getFileBufferSize());
Assert.assertEquals("Encrypt data transfer key is different",
serverDefaults1.getEncryptDataTransfer(),
serverDefaults2.getEncryptDataTransfer());
Assert.assertEquals("Trash interval is different",
serverDefaults1.getTrashInterval(),
serverDefaults2.getTrashInterval());
Assert.assertEquals("Checksum type is different",
serverDefaults1.getChecksumType(),
serverDefaults2.getChecksumType());
Assert.assertEquals("Key provider uri is different",
serverDefaults1.getKeyProviderUri(),
serverDefaults2.getKeyProviderUri());
Assert.assertEquals("Default storage policy is different",
serverDefaults1.getDefaultStoragePolicyId(),
serverDefaults2.getDefaultStoragePolicyId());
}
/**
* Tests the case where the client is upgraded to request
* {@link FsServerDefaults} but the namenode is not upgraded.
* @throws Exception
*/
@Test
public void testFsserverDefaultsBackwardsCompatible() throws Exception {
final Configuration conf = WebHdfsTestUtil.createConf();
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build();
final WebHdfsFileSystem webfs = WebHdfsTestUtil.getWebHdfsFileSystem(conf,
WebHdfsConstants.WEBHDFS_SCHEME);
FSNamesystem fsnSpy =
NameNodeAdapter.spyOnNamesystem(cluster.getNameNode());
Mockito.when(fsnSpy.getServerDefaults())
.thenThrow(new UnsupportedOperationException());
try {
webfs.getServerDefaults();
Assert.fail("should have thrown UnSupportedOperationException.");
} catch (UnsupportedOperationException uoe) {
// Expected exception.
}
}
/**
* Tests that {@link WebHdfsFileSystem.AbstractRunner} propagates original
* exception's stacktrace and cause during runWithRetry attempts.
* @throws Exception
*/
@Test
public void testExceptionPropagationInAbstractRunner() throws Exception {
final Configuration conf = WebHdfsTestUtil.createConf();
final Path dir = new Path("/testExceptionPropagationInAbstractRunner");
conf.setBoolean(HdfsClientConfigKeys.Retry.POLICY_ENABLED_KEY, true);
final short numDatanodes = 1;
cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(numDatanodes)
.build();
cluster.waitActive();
final FileSystem fs = WebHdfsTestUtil.getWebHdfsFileSystem(conf,
WebHdfsConstants.WEBHDFS_SCHEME);
// create a file
final long length = 1L << 20;
final Path file1 = new Path(dir, "testFile");
DFSTestUtil.createFile(fs, file1, length, numDatanodes, 20120406L);
// get file status and check that it was written properly.
final FileStatus s1 = fs.getFileStatus(file1);
assertEquals("Write failed for file " + file1, length, s1.getLen());
FSDataInputStream in = fs.open(file1);
in.read(); // Connection is made only when the first read() occurs.
final WebHdfsInputStream webIn =
(WebHdfsInputStream) (in.getWrappedStream());
final String msg = "Throwing dummy exception";
IOException ioe = new IOException(msg, new DummyThrowable());
WebHdfsFileSystem.ReadRunner readRunner = spy(webIn.getReadRunner());
doThrow(ioe).when(readRunner).getResponse(any(HttpURLConnection.class));
webIn.setReadRunner(readRunner);
try {
webIn.read();
fail("Read should have thrown IOException.");
} catch (IOException e) {
assertTrue(e.getMessage().contains(msg));
assertTrue(e.getCause() instanceof DummyThrowable);
}
}
/**
* Tests that the LISTSTATUS ang GETFILESTATUS WebHDFS calls return the
* ecPolicy for EC files.
*/
@Test(timeout=300000)
public void testECPolicyInFileStatus() throws Exception {
final Configuration conf = WebHdfsTestUtil.createConf();
final ErasureCodingPolicy ecPolicy = SystemErasureCodingPolicies
.getByID(SystemErasureCodingPolicies.RS_3_2_POLICY_ID);
final String ecPolicyName = ecPolicy.getName();
cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(5)
.build();
cluster.waitActive();
final DistributedFileSystem fs = cluster.getFileSystem();
// Create an EC dir and write a test file in it
final Path ecDir = new Path("/ec");
Path ecFile = new Path(ecDir, "ec_file.txt");
Path nonEcFile = new Path(ecDir, "non_ec_file.txt");
fs.mkdirs(ecDir);
// Create a non-EC file before enabling ec policy
DFSTestUtil.createFile(fs, nonEcFile, 1024, (short) 1, 0);
fs.enableErasureCodingPolicy(ecPolicyName);
fs.setErasureCodingPolicy(ecDir, ecPolicyName);
// Create an EC file
DFSTestUtil.createFile(fs, ecFile, 1024, (short) 1, 0);
// Query webhdfs REST API to list statuses of files/directories in ecDir
InetSocketAddress addr = cluster.getNameNode().getHttpAddress();
URL listStatusUrl = new URL("http", addr.getHostString(), addr.getPort(),
WebHdfsFileSystem.PATH_PREFIX + ecDir.toString() + "?op=LISTSTATUS");
HttpURLConnection conn = (HttpURLConnection) listStatusUrl.openConnection();
conn.setRequestMethod("GET");
conn.setInstanceFollowRedirects(false);
String listStatusResponse = IOUtils.toString(conn.getInputStream(),
StandardCharsets.UTF_8);
Assert.assertEquals("Response wasn't " + HttpURLConnection.HTTP_OK,
HttpURLConnection.HTTP_OK, conn.getResponseCode());
// Verify that ecPolicy is set in the ListStatus response for ec file
String ecpolicyForECfile = getECPolicyFromFileStatusJson(
getFileStatusJson(listStatusResponse, ecFile.getName()));
assertEquals("EC policy for ecFile should match the set EC policy",
ecpolicyForECfile, ecPolicyName);
// Verify that ecPolicy is not set in the ListStatus response for non-ec
// file
String ecPolicyForNonECfile = getECPolicyFromFileStatusJson(
getFileStatusJson(listStatusResponse, nonEcFile.getName()));
assertEquals("EC policy for nonEcFile should be null (not set)",
ecPolicyForNonECfile, null);
// Query webhdfs REST API to get fileStatus for ecFile
URL getFileStatusUrl = new URL("http", addr.getHostString(), addr.getPort(),
WebHdfsFileSystem.PATH_PREFIX + ecFile.toString() +
"?op=GETFILESTATUS");
conn = (HttpURLConnection) getFileStatusUrl.openConnection();
conn.setRequestMethod("GET");
conn.setInstanceFollowRedirects(false);
String getFileStatusResponse = IOUtils.toString(conn.getInputStream(),
StandardCharsets.UTF_8);
Assert.assertEquals("Response wasn't " + HttpURLConnection.HTTP_OK,
HttpURLConnection.HTTP_OK, conn.getResponseCode());
// Verify that ecPolicy is set in getFileStatus response for ecFile
JSONObject fileStatusObject = new JSONObject(getFileStatusResponse)
.getJSONObject("FileStatus");
ecpolicyForECfile = getECPolicyFromFileStatusJson(fileStatusObject);
assertEquals("EC policy for ecFile should match the set EC policy",
ecpolicyForECfile, ecPolicyName);
}
@Test
public void testStatistics() throws Exception {
final Configuration conf = new HdfsConfiguration();
conf.set(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
StoragePolicySatisfierMode.EXTERNAL.toString());
StoragePolicySatisfier sps = new StoragePolicySatisfier(conf);
try {
cluster = new MiniDFSCluster.Builder(conf).storageTypes(
new StorageType[][] {{StorageType.DISK, StorageType.ARCHIVE}})
.storagesPerDatanode(2).numDataNodes(1).build();
cluster.waitActive();
sps.init(new ExternalSPSContext(sps, DFSTestUtil
.getNameNodeConnector(conf, HdfsServerConstants.MOVER_ID_PATH, 1,
false)));
sps.start(StoragePolicySatisfierMode.EXTERNAL);
final WebHdfsFileSystem webHdfs = WebHdfsTestUtil
.getWebHdfsFileSystem(conf, WebHdfsConstants.WEBHDFS_SCHEME);
Path dir = new Path("/test");
webHdfs.mkdirs(dir);
int readOps = 0;
int writeOps = 0;
FileSystem.clearStatistics();
long opCount =
getOpStatistics(DFSOpsCountStatistics.OpType.GET_STORAGE_POLICY);
webHdfs.getStoragePolicy(dir);
checkStatistics(webHdfs, ++readOps, writeOps, 0);
checkOpStatistics(DFSOpsCountStatistics.OpType.GET_STORAGE_POLICY,
opCount + 1);
opCount =
getOpStatistics(DFSOpsCountStatistics.OpType.GET_STORAGE_POLICIES);
webHdfs.getAllStoragePolicies();
checkStatistics(webHdfs, ++readOps, writeOps, 0);
checkOpStatistics(DFSOpsCountStatistics.OpType.GET_STORAGE_POLICIES,
opCount + 1);
opCount =
getOpStatistics(DFSOpsCountStatistics.OpType.SATISFY_STORAGE_POLICY);
webHdfs.satisfyStoragePolicy(dir);
checkStatistics(webHdfs, readOps, ++writeOps, 0);
checkOpStatistics(DFSOpsCountStatistics.OpType.SATISFY_STORAGE_POLICY,
opCount + 1);
opCount = getOpStatistics(
DFSOpsCountStatistics.OpType.GET_SNAPSHOTTABLE_DIRECTORY_LIST);
webHdfs.getSnapshottableDirectoryList();
checkStatistics(webHdfs, ++readOps, writeOps, 0);
checkOpStatistics(
DFSOpsCountStatistics.OpType.GET_SNAPSHOTTABLE_DIRECTORY_LIST,
opCount + 1);
} finally {
cluster.shutdown();
}
}
@Test
public void testLinkTarget() throws Exception {
final Configuration conf = WebHdfsTestUtil.createConf();
try {
cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(3)
.build();
cluster.waitActive();
final WebHdfsFileSystem webHdfs =
WebHdfsTestUtil.getWebHdfsFileSystem(conf,
WebHdfsConstants.WEBHDFS_SCHEME);
// Symbolic link
Path root = new Path("/webHdfsTest/");
Path targetFile = new Path(root, "debug.log");
FileSystemTestHelper.createFile(webHdfs, targetFile);
Path symLink = new Path(root, "debug.link");
webHdfs.createSymlink(targetFile, symLink, false);
assertEquals(targetFile, webHdfs.getLinkTarget(symLink));
} finally {
cluster.shutdown();
}
}
@Test
public void testFileLinkStatus() throws Exception {
final Configuration conf = WebHdfsTestUtil.createConf();
try {
cluster = new MiniDFSCluster.Builder(conf).build();
cluster.waitActive();
final WebHdfsFileSystem webHdfs =
WebHdfsTestUtil.getWebHdfsFileSystem(conf,
WebHdfsConstants.WEBHDFS_SCHEME);
// Symbolic link
Path root = new Path("/webHdfsTest/");
Path file = new Path(root, "file");
FileSystemTestHelper.createFile(webHdfs, file);
Path linkToFile = new Path(root, "linkToFile");
webHdfs.createSymlink(file, linkToFile, false);
assertFalse(webHdfs.getFileLinkStatus(file).isSymlink());
assertTrue(webHdfs.getFileLinkStatus(linkToFile).isSymlink());
} finally {
cluster.shutdown();
}
}
@Test
public void testFsStatus() throws Exception {
final Configuration conf = WebHdfsTestUtil.createConf();
try {
cluster = new MiniDFSCluster.Builder(conf).build();
cluster.waitActive();
final WebHdfsFileSystem webHdfs =
WebHdfsTestUtil.getWebHdfsFileSystem(conf,
WebHdfsConstants.WEBHDFS_SCHEME);
final DistributedFileSystem dfs = cluster.getFileSystem();
final String path = "/foo";
try (OutputStream os = webHdfs.create(new Path(path))) {
os.write(new byte[1024]);
}
FsStatus webHdfsFsStatus = webHdfs.getStatus(new Path("/"));
Assert.assertNotNull(webHdfsFsStatus);
FsStatus dfsFsStatus = dfs.getStatus(new Path("/"));
Assert.assertNotNull(dfsFsStatus);
// Validate that used, remaining, and capacity match DistributedFileSystem.
Assert.assertEquals(webHdfsFsStatus.getUsed(), dfsFsStatus.getUsed());
Assert.assertEquals(webHdfsFsStatus.getRemaining(),
dfsFsStatus.getRemaining());
Assert.assertEquals(webHdfsFsStatus.getCapacity(),
dfsFsStatus.getCapacity());
} finally {
cluster.shutdown();
}
}
@Test
public void testGetErasureCodingPolicies() throws Exception {
final Configuration conf = WebHdfsTestUtil.createConf();
cluster = new MiniDFSCluster.Builder(conf).build();
try {
cluster.waitActive();
final WebHdfsFileSystem webHdfs =
WebHdfsTestUtil.getWebHdfsFileSystem(conf,
WebHdfsConstants.WEBHDFS_SCHEME);
final DistributedFileSystem dfs = cluster.getFileSystem();
Collection<ErasureCodingPolicyInfo> webHdfsEcPolicyInfos =
webHdfs.getAllErasureCodingPolicies();
Collection<ErasureCodingPolicyInfo> dfsEcPolicyInfos =
dfs.getAllErasureCodingPolicies();
// Validate that the erasure coding policy infos match DistributedFileSystem.
assertEquals(dfsEcPolicyInfos.size(), webHdfsEcPolicyInfos.size());
assertTrue(dfsEcPolicyInfos.containsAll(webHdfsEcPolicyInfos));
} finally {
cluster.shutdown();
}
}
/**
* Get FileStatus JSONObject from ListStatus response.
*/
private JSONObject getFileStatusJson(String response, String fileName)
throws JSONException {
JSONObject listStatusResponseJson = new JSONObject(response);
JSONArray fileStatusArray = listStatusResponseJson
.getJSONObject("FileStatuses")
.getJSONArray("FileStatus");
for (int i = 0; i < fileStatusArray.length(); i++) {
JSONObject fileStatusJsonObject = fileStatusArray.getJSONObject(i);
if (fileName.equals(fileStatusJsonObject.get("pathSuffix"))) {
return fileStatusJsonObject;
}
}
return null;
}
/**
* Get ECPolicy name from FileStatus JSONObject.
*/
private String getECPolicyFromFileStatusJson(JSONObject fileStatusJsonObject)
throws JSONException {
if (fileStatusJsonObject.has("ecPolicy")) {
return fileStatusJsonObject.getString("ecPolicy");
} else {
return null;
}
}
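// A sketch of the LISTSTATUS JSON shape the two helpers above walk; the
// exact field set varies, but "pathSuffix" and the optional "ecPolicy" are
// the only fields consumed here:
//   {"FileStatuses": {"FileStatus": [
//     {"pathSuffix": "ec_file.txt", "ecPolicy": "RS-3-2-1024k", ...},
//     {"pathSuffix": "non_ec_file.txt", ...}
//   ]}}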
static final class DummyThrowable extends Throwable {
}
}