| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.hdfs.web; |
| |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertNotEquals; |
| import static org.junit.Assert.fail; |
| |
| import java.io.EOFException; |
| import java.io.IOException; |
| import java.io.OutputStream; |
| import java.net.HttpURLConnection; |
| import java.net.InetSocketAddress; |
| import java.net.URI; |
| import java.net.URISyntaxException; |
| import java.net.URL; |
| import java.security.PrivilegedExceptionAction; |
| import java.util.Random; |
| |
| import org.apache.commons.io.IOUtils; |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.CommonConfigurationKeys; |
| import org.apache.hadoop.fs.FSDataInputStream; |
| import org.apache.hadoop.fs.FSDataOutputStream; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.fs.permission.FsAction; |
| import org.apache.hadoop.fs.permission.FsPermission; |
| import org.apache.hadoop.hdfs.DFSConfigKeys; |
| import org.apache.hadoop.hdfs.DistributedFileSystem; |
| import org.apache.hadoop.hdfs.MiniDFSCluster; |
| import org.apache.hadoop.hdfs.TestDFSClientRetries; |
| import org.apache.hadoop.hdfs.TestFileCreation; |
| import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; |
| import org.apache.hadoop.hdfs.server.namenode.NameNode; |
| import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotTestHelper; |
| import org.apache.hadoop.hdfs.server.namenode.web.resources.NamenodeWebHdfsMethods; |
| import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; |
| import org.apache.hadoop.hdfs.web.resources.LengthParam; |
| import org.apache.hadoop.hdfs.web.resources.OffsetParam; |
| import org.apache.hadoop.hdfs.web.resources.Param; |
| import org.apache.hadoop.ipc.RetriableException; |
| import org.apache.hadoop.security.AccessControlException; |
| import org.apache.hadoop.security.UserGroupInformation; |
| import org.apache.hadoop.test.GenericTestUtils; |
| import org.apache.log4j.Level; |
| import org.junit.Assert; |
| import org.junit.Test; |
| import org.mockito.internal.util.reflection.Whitebox; |
| |
| /** Test WebHDFS */ |
| public class TestWebHDFS { |
| static final Log LOG = LogFactory.getLog(TestWebHDFS.class); |
| |
| static final Random RANDOM = new Random(); |
| |
| static final long systemStartTime = System.nanoTime(); |
| |
| /** A timer for measuring performance. */ |
| static class Ticker { |
| final String name; |
| final long startTime = System.nanoTime(); |
| private long previousTick = startTime; |
| |
| Ticker(final String name, String format, Object... args) { |
| this.name = name; |
| LOG.info(String.format("\n\n%s START: %s\n", |
| name, String.format(format, args))); |
| } |
| |
| void tick(final long nBytes, String format, Object... args) { |
| final long now = System.nanoTime(); |
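      // Throttle progress logging to at most once every 10 seconds.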
| if (now - previousTick > 10000000000L) { |
| previousTick = now; |
        final double minutes = (now - systemStartTime)/60000000000.0;
        LOG.info(String.format("\n\n%s (%.2f min) %s %s\n", name, minutes,
| String.format(format, args), toMpsString(nBytes, now))); |
| } |
| } |
| |
| void end(final long nBytes) { |
| final long now = System.nanoTime(); |
| final double seconds = (now - startTime)/1000000000.0; |
| LOG.info(String.format("\n\n%s END: duration=%.2fs %s\n", |
| name, seconds, toMpsString(nBytes, now))); |
| } |
| |
| String toMpsString(final long nBytes, final long now) { |
| final double mb = nBytes/(double)(1<<20); |
| final double mps = mb*1000000000.0/(now - startTime); |
| return String.format("[nBytes=%.2fMB, speed=%.2fMB/s]", mb, mps); |
| } |
| } |
| |
| @Test(timeout=300000) |
| public void testLargeFile() throws Exception { |
    largeFileTest(200L << 20); // 200MB file length
| } |
| |
| /** Test read and write large files. */ |
| static void largeFileTest(final long fileLength) throws Exception { |
| final Configuration conf = WebHdfsTestUtil.createConf(); |
| |
| final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) |
| .numDataNodes(3) |
| .build(); |
| try { |
| cluster.waitActive(); |
| |
      final FileSystem fs = WebHdfsTestUtil.getWebHdfsFileSystem(conf,
          WebHdfsConstants.WEBHDFS_SCHEME);
| final Path dir = new Path("/test/largeFile"); |
| Assert.assertTrue(fs.mkdirs(dir)); |
| |
| final byte[] data = new byte[1 << 20]; |
| RANDOM.nextBytes(data); |
| |
      // 'expected' holds two copies of 'data' so that checkData can verify a
      // read starting at any (offset % data.length) without wrapping.
      final byte[] expected = new byte[2 * data.length];
| System.arraycopy(data, 0, expected, 0, data.length); |
| System.arraycopy(data, 0, expected, data.length, data.length); |
| |
| final Path p = new Path(dir, "file"); |
| final Ticker t = new Ticker("WRITE", "fileLength=" + fileLength); |
| final FSDataOutputStream out = fs.create(p); |
| try { |
| long remaining = fileLength; |
| for(; remaining > 0;) { |
| t.tick(fileLength - remaining, "remaining=%d", remaining); |
| |
| final int n = (int)Math.min(remaining, data.length); |
| out.write(data, 0, n); |
| remaining -= n; |
| } |
| } finally { |
| out.close(); |
| } |
| t.end(fileLength); |
| |
| Assert.assertEquals(fileLength, fs.getFileStatus(p).getLen()); |
| |
      // Verify reads from an offset in [1MB, 2MB) and from the mirrored
      // offset near the end of the file.
      final long smallOffset = RANDOM.nextInt(1 << 20) + (1 << 20);
      final long largeOffset = fileLength - smallOffset;
| final byte[] buf = new byte[data.length]; |
| |
| verifySeek(fs, p, largeOffset, fileLength, buf, expected); |
| verifySeek(fs, p, smallOffset, fileLength, buf, expected); |
| |
| verifyPread(fs, p, largeOffset, fileLength, buf, expected); |
| } finally { |
| cluster.shutdown(); |
| } |
| } |
| |
| static void checkData(long offset, long remaining, int n, |
| byte[] actual, byte[] expected) { |
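    // Spot-check only ~1% of the chunks so the large-file test stays fast.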
| if (RANDOM.nextInt(100) == 0) { |
| int j = (int)(offset % actual.length); |
| for(int i = 0; i < n; i++) { |
| if (expected[j] != actual[i]) { |
| Assert.fail("expected[" + j + "]=" + expected[j] |
| + " != actual[" + i + "]=" + actual[i] |
| + ", offset=" + offset + ", remaining=" + remaining + ", n=" + n); |
| } |
| j++; |
| } |
| } |
| } |
| |
  /** Test seek and sequential read. */
| static void verifySeek(FileSystem fs, Path p, long offset, long length, |
| byte[] buf, byte[] expected) throws IOException { |
| long remaining = length - offset; |
| long checked = 0; |
| LOG.info("XXX SEEK: offset=" + offset + ", remaining=" + remaining); |
| |
| final Ticker t = new Ticker("SEEK", "offset=%d, remaining=%d", |
| offset, remaining); |
| final FSDataInputStream in = fs.open(p, 64 << 10); |
| in.seek(offset); |
| for(; remaining > 0; ) { |
| t.tick(checked, "offset=%d, remaining=%d", offset, remaining); |
| final int n = (int)Math.min(remaining, buf.length); |
| in.readFully(buf, 0, n); |
| checkData(offset, remaining, n, buf, expected); |
| |
| offset += n; |
| remaining -= n; |
| checked += n; |
| } |
| in.close(); |
| t.end(checked); |
| } |
| |
| static void verifyPread(FileSystem fs, Path p, long offset, long length, |
| byte[] buf, byte[] expected) throws IOException { |
| long remaining = length - offset; |
| long checked = 0; |
| LOG.info("XXX PREAD: offset=" + offset + ", remaining=" + remaining); |
| |
| final Ticker t = new Ticker("PREAD", "offset=%d, remaining=%d", |
| offset, remaining); |
| final FSDataInputStream in = fs.open(p, 64 << 10); |
| for(; remaining > 0; ) { |
| t.tick(checked, "offset=%d, remaining=%d", offset, remaining); |
| final int n = (int)Math.min(remaining, buf.length); |
| in.readFully(offset, buf, 0, n); |
| checkData(offset, remaining, n, buf, expected); |
| |
| offset += n; |
| remaining -= n; |
| checked += n; |
| } |
| in.close(); |
| t.end(checked); |
| } |
| |
| /** Test client retry with namenode restarting. */ |
| @Test(timeout=300000) |
| public void testNamenodeRestart() throws Exception { |
| GenericTestUtils.setLogLevel(NamenodeWebHdfsMethods.LOG, Level.ALL); |
| final Configuration conf = WebHdfsTestUtil.createConf(); |
| TestDFSClientRetries.namenodeRestartTest(conf, true); |
| } |
| |
| @Test(timeout=300000) |
| public void testLargeDirectory() throws Exception { |
| final Configuration conf = WebHdfsTestUtil.createConf(); |
| final int listLimit = 2; |
| // force small chunking of directory listing |
| conf.setInt(DFSConfigKeys.DFS_LIST_LIMIT, listLimit); |
| // force paths to be only owner-accessible to ensure ugi isn't changing |
| // during listStatus |
| FsPermission.setUMask(conf, new FsPermission((short)0077)); |
| |
| final MiniDFSCluster cluster = |
| new MiniDFSCluster.Builder(conf).numDataNodes(3).build(); |
| try { |
| cluster.waitActive(); |
| WebHdfsTestUtil.getWebHdfsFileSystem(conf, WebHdfsConstants.WEBHDFS_SCHEME) |
| .setPermission(new Path("/"), |
| new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL)); |
| |
      // trick the NN into believing it's not the superuser so we can
      // tell whether the correct user is used by listStatus
| UserGroupInformation.setLoginUser( |
| UserGroupInformation.createUserForTesting( |
| "not-superuser", new String[]{"not-supergroup"})); |
| |
| UserGroupInformation.createUserForTesting("me", new String[]{"my-group"}) |
| .doAs(new PrivilegedExceptionAction<Void>() { |
| @Override |
| public Void run() throws IOException, URISyntaxException { |
| FileSystem fs = WebHdfsTestUtil.getWebHdfsFileSystem(conf, |
| WebHdfsConstants.WEBHDFS_SCHEME); |
| Path d = new Path("/my-dir"); |
| Assert.assertTrue(fs.mkdirs(d)); |
| for (int i=0; i < listLimit*3; i++) { |
| Path p = new Path(d, "file-"+i); |
| Assert.assertTrue(fs.createNewFile(p)); |
| } |
| Assert.assertEquals(listLimit*3, fs.listStatus(d).length); |
| return null; |
| } |
| }); |
| } finally { |
| cluster.shutdown(); |
| } |
| } |
| |
| @Test(timeout=300000) |
| public void testNumericalUserName() throws Exception { |
| final Configuration conf = WebHdfsTestUtil.createConf(); |
    conf.set(HdfsClientConfigKeys.DFS_WEBHDFS_USER_PATTERN_KEY,
        "^[A-Za-z0-9_][A-Za-z0-9._-]*[$]?$");
| final MiniDFSCluster cluster = |
| new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); |
| try { |
| cluster.waitActive(); |
| WebHdfsTestUtil.getWebHdfsFileSystem(conf, WebHdfsConstants.WEBHDFS_SCHEME) |
| .setPermission(new Path("/"), |
| new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL)); |
| |
| UserGroupInformation.createUserForTesting("123", new String[]{"my-group"}) |
| .doAs(new PrivilegedExceptionAction<Void>() { |
| @Override |
| public Void run() throws IOException, URISyntaxException { |
| FileSystem fs = WebHdfsTestUtil.getWebHdfsFileSystem(conf, |
| WebHdfsConstants.WEBHDFS_SCHEME); |
| Path d = new Path("/my-dir"); |
| Assert.assertTrue(fs.mkdirs(d)); |
| return null; |
| } |
| }); |
| } finally { |
| cluster.shutdown(); |
| } |
| } |
| |
| /** |
| * Test for catching "no datanode" IOException, when to create a file |
| * but datanode is not running for some reason. |
| */ |
| @Test(timeout=300000) |
| public void testCreateWithNoDN() throws Exception { |
| MiniDFSCluster cluster = null; |
| final Configuration conf = WebHdfsTestUtil.createConf(); |
| try { |
      conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build();
| cluster.waitActive(); |
| FileSystem fs = WebHdfsTestUtil.getWebHdfsFileSystem(conf, |
| WebHdfsConstants.WEBHDFS_SCHEME); |
| fs.create(new Path("/testnodatanode")); |
| Assert.fail("No exception was thrown"); |
| } catch (IOException ex) { |
| GenericTestUtils.assertExceptionContains("Failed to find datanode", ex); |
| } finally { |
| if (cluster != null) { |
| cluster.shutdown(); |
| } |
| } |
| } |
| |
| /** |
| * Test snapshot creation through WebHdfs |
| */ |
| @Test |
| public void testWebHdfsCreateSnapshot() throws Exception { |
| MiniDFSCluster cluster = null; |
| final Configuration conf = WebHdfsTestUtil.createConf(); |
| try { |
| cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build(); |
| cluster.waitActive(); |
| final DistributedFileSystem dfs = cluster.getFileSystem(); |
| final FileSystem webHdfs = WebHdfsTestUtil.getWebHdfsFileSystem(conf, |
| WebHdfsConstants.WEBHDFS_SCHEME); |
| |
| final Path foo = new Path("/foo"); |
| dfs.mkdirs(foo); |
| |
| try { |
| webHdfs.createSnapshot(foo); |
        fail("Should not be able to create snapshot on a "
            + "non-snapshottable directory");
| } catch (Exception e) { |
| GenericTestUtils.assertExceptionContains( |
| "Directory is not a snapshottable directory", e); |
| } |
| |
| // allow snapshots on /foo |
| dfs.allowSnapshot(foo); |
| // create snapshots on foo using WebHdfs |
| webHdfs.createSnapshot(foo, "s1"); |
| // create snapshot without specifying name |
| final Path spath = webHdfs.createSnapshot(foo, null); |
| |
| Assert.assertTrue(webHdfs.exists(spath)); |
| final Path s1path = SnapshotTestHelper.getSnapshotRoot(foo, "s1"); |
| Assert.assertTrue(webHdfs.exists(s1path)); |
| } finally { |
| if (cluster != null) { |
| cluster.shutdown(); |
| } |
| } |
| } |
| |
| /** |
| * Test snapshot deletion through WebHdfs |
| */ |
| @Test |
| public void testWebHdfsDeleteSnapshot() throws Exception { |
| MiniDFSCluster cluster = null; |
| final Configuration conf = WebHdfsTestUtil.createConf(); |
| try { |
| cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build(); |
| cluster.waitActive(); |
| final DistributedFileSystem dfs = cluster.getFileSystem(); |
| final FileSystem webHdfs = WebHdfsTestUtil.getWebHdfsFileSystem(conf, |
| WebHdfsConstants.WEBHDFS_SCHEME); |
| |
| final Path foo = new Path("/foo"); |
| dfs.mkdirs(foo); |
| dfs.allowSnapshot(foo); |
| |
| webHdfs.createSnapshot(foo, "s1"); |
| final Path spath = webHdfs.createSnapshot(foo, null); |
| Assert.assertTrue(webHdfs.exists(spath)); |
| final Path s1path = SnapshotTestHelper.getSnapshotRoot(foo, "s1"); |
| Assert.assertTrue(webHdfs.exists(s1path)); |
| |
| // delete the two snapshots |
| webHdfs.deleteSnapshot(foo, "s1"); |
| Assert.assertFalse(webHdfs.exists(s1path)); |
| webHdfs.deleteSnapshot(foo, spath.getName()); |
| Assert.assertFalse(webHdfs.exists(spath)); |
| } finally { |
| if (cluster != null) { |
| cluster.shutdown(); |
| } |
| } |
| } |
| |
| @Test |
  public void testWebHdfsCreateNonRecursive()
      throws IOException, URISyntaxException {
| MiniDFSCluster cluster = null; |
| final Configuration conf = WebHdfsTestUtil.createConf(); |
| WebHdfsFileSystem webHdfs = null; |
| |
| try { |
| cluster = new MiniDFSCluster.Builder(conf).build(); |
| cluster.waitActive(); |
| |
      webHdfs = WebHdfsTestUtil.getWebHdfsFileSystem(conf,
          WebHdfsConstants.WEBHDFS_SCHEME);
| |
| TestFileCreation.testFileCreationNonRecursive(webHdfs); |
| |
| } finally { |
      if (webHdfs != null) {
| webHdfs.close(); |
| } |
| |
      if (cluster != null) {
| cluster.shutdown(); |
| } |
| } |
| } |
| /** |
| * Test snapshot rename through WebHdfs |
| */ |
| @Test |
| public void testWebHdfsRenameSnapshot() throws Exception { |
| MiniDFSCluster cluster = null; |
| final Configuration conf = WebHdfsTestUtil.createConf(); |
| try { |
| cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build(); |
| cluster.waitActive(); |
| final DistributedFileSystem dfs = cluster.getFileSystem(); |
| final FileSystem webHdfs = WebHdfsTestUtil.getWebHdfsFileSystem(conf, |
| WebHdfsConstants.WEBHDFS_SCHEME); |
| |
| final Path foo = new Path("/foo"); |
| dfs.mkdirs(foo); |
| dfs.allowSnapshot(foo); |
| |
| webHdfs.createSnapshot(foo, "s1"); |
| final Path s1path = SnapshotTestHelper.getSnapshotRoot(foo, "s1"); |
| Assert.assertTrue(webHdfs.exists(s1path)); |
| |
| // rename s1 to s2 |
| webHdfs.renameSnapshot(foo, "s1", "s2"); |
| Assert.assertFalse(webHdfs.exists(s1path)); |
| final Path s2path = SnapshotTestHelper.getSnapshotRoot(foo, "s2"); |
| Assert.assertTrue(webHdfs.exists(s2path)); |
| |
| webHdfs.deleteSnapshot(foo, "s2"); |
| Assert.assertFalse(webHdfs.exists(s2path)); |
| } finally { |
| if (cluster != null) { |
| cluster.shutdown(); |
| } |
| } |
| } |
| |
| /** |
| * Make sure a RetriableException is thrown when rpcServer is null in |
| * NamenodeWebHdfsMethods. |
| */ |
| @Test |
| public void testRaceWhileNNStartup() throws Exception { |
| MiniDFSCluster cluster = null; |
| final Configuration conf = WebHdfsTestUtil.createConf(); |
| try { |
| cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build(); |
| cluster.waitActive(); |
| final NameNode namenode = cluster.getNameNode(); |
| final NamenodeProtocols rpcServer = namenode.getRpcServer(); |
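      // Null out rpcServer to simulate the startup window before the
      // namenode's RPC server is running.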
| Whitebox.setInternalState(namenode, "rpcServer", null); |
| |
| final Path foo = new Path("/foo"); |
| final FileSystem webHdfs = WebHdfsTestUtil.getWebHdfsFileSystem(conf, |
| WebHdfsConstants.WEBHDFS_SCHEME); |
| try { |
| webHdfs.mkdirs(foo); |
| fail("Expected RetriableException"); |
| } catch (RetriableException e) { |
| GenericTestUtils.assertExceptionContains("Namenode is in startup mode", |
| e); |
| } |
| Whitebox.setInternalState(namenode, "rpcServer", rpcServer); |
| } finally { |
| if (cluster != null) { |
| cluster.shutdown(); |
| } |
| } |
| } |
| |
| @Test |
| public void testDTInInsecureClusterWithFallback() |
| throws IOException, URISyntaxException { |
| MiniDFSCluster cluster = null; |
| final Configuration conf = WebHdfsTestUtil.createConf(); |
| conf.setBoolean(CommonConfigurationKeys |
| .IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY, true); |
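    // With fallback to simple auth allowed, an insecure cluster is expected
    // to return a null delegation token rather than throw.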
| try { |
| cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build(); |
| final FileSystem webHdfs = WebHdfsTestUtil.getWebHdfsFileSystem(conf, |
| WebHdfsConstants.WEBHDFS_SCHEME); |
| Assert.assertNull(webHdfs.getDelegationToken(null)); |
| } finally { |
| if (cluster != null) { |
| cluster.shutdown(); |
| } |
| } |
| } |
| |
| @Test |
| public void testDTInInsecureCluster() throws Exception { |
| MiniDFSCluster cluster = null; |
| final Configuration conf = WebHdfsTestUtil.createConf(); |
| try { |
| cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build(); |
| final FileSystem webHdfs = WebHdfsTestUtil.getWebHdfsFileSystem(conf, |
| WebHdfsConstants.WEBHDFS_SCHEME); |
| webHdfs.getDelegationToken(null); |
      fail("Expected AccessControlException was not thrown.");
| } catch (AccessControlException ace) { |
| Assert.assertTrue(ace.getMessage().startsWith( |
| WebHdfsFileSystem.CANT_FALLBACK_TO_INSECURE_MSG)); |
| } finally { |
| if (cluster != null) { |
| cluster.shutdown(); |
| } |
| } |
| } |
| |
| @Test |
  public void testWebHdfsOffsetAndLength() throws Exception {
| MiniDFSCluster cluster = null; |
| final Configuration conf = WebHdfsTestUtil.createConf(); |
| final int OFFSET = 42; |
| final int LENGTH = 512; |
| final String PATH = "/foo"; |
| byte[] CONTENTS = new byte[1024]; |
| RANDOM.nextBytes(CONTENTS); |
| try { |
| cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); |
| final WebHdfsFileSystem fs = |
| WebHdfsTestUtil.getWebHdfsFileSystem(conf, WebHdfsConstants.WEBHDFS_SCHEME); |
| try (OutputStream os = fs.create(new Path(PATH))) { |
| os.write(CONTENTS); |
| } |
| InetSocketAddress addr = cluster.getNameNode().getHttpAddress(); |
      URL url = new URL("http", addr.getHostString(), addr.getPort(),
          WebHdfsFileSystem.PATH_PREFIX + PATH + "?op=OPEN"
              + Param.toSortedString("&", new OffsetParam((long) OFFSET),
                  new LengthParam((long) LENGTH)));
| HttpURLConnection conn = (HttpURLConnection) url.openConnection(); |
| conn.setInstanceFollowRedirects(true); |
| Assert.assertEquals(LENGTH, conn.getContentLength()); |
| byte[] subContents = new byte[LENGTH]; |
| byte[] realContents = new byte[LENGTH]; |
| System.arraycopy(CONTENTS, OFFSET, subContents, 0, LENGTH); |
| IOUtils.readFully(conn.getInputStream(), realContents); |
| Assert.assertArrayEquals(subContents, realContents); |
| } finally { |
| if (cluster != null) { |
| cluster.shutdown(); |
| } |
| } |
| } |
| |
| @Test |
| public void testWebHdfsPread() throws Exception { |
| final Configuration conf = WebHdfsTestUtil.createConf(); |
| MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1) |
| .build(); |
| byte[] content = new byte[1024]; |
| RANDOM.nextBytes(content); |
| final Path foo = new Path("/foo"); |
| FSDataInputStream in = null; |
| try { |
| final WebHdfsFileSystem fs = WebHdfsTestUtil.getWebHdfsFileSystem(conf, |
| WebHdfsConstants.WEBHDFS_SCHEME); |
| try (OutputStream os = fs.create(foo)) { |
| os.write(content); |
| } |
| |
| // pread |
| in = fs.open(foo, 1024); |
| byte[] buf = new byte[1024]; |
| try { |
| in.readFully(1020, buf, 0, 5); |
| Assert.fail("EOF expected"); |
| } catch (EOFException ignored) {} |
| |
| // mix pread with stateful read |
| int length = in.read(buf, 0, 512); |
| in.readFully(100, new byte[1024], 0, 100); |
| int preadLen = in.read(200, new byte[1024], 0, 200); |
| Assert.assertTrue(preadLen > 0); |
| IOUtils.readFully(in, buf, length, 1024 - length); |
| Assert.assertArrayEquals(content, buf); |
| } finally { |
| if (in != null) { |
| in.close(); |
| } |
| cluster.shutdown(); |
| } |
| } |
| |
| @Test(timeout = 30000) |
  public void testGetHomeDirectory() throws Exception {
    MiniDFSCluster cluster = null;
| try { |
| Configuration conf = new Configuration(); |
| cluster = new MiniDFSCluster.Builder(conf).build(); |
| cluster.waitActive(); |
| DistributedFileSystem hdfs = cluster.getFileSystem(); |
| |
| final URI uri = new URI(WebHdfsConstants.WEBHDFS_SCHEME + "://" |
| + cluster.getHttpUri(0).replace("http://", "")); |
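      // i.e. a webhdfs://<namenode-host>:<http-port> URI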
| final Configuration confTemp = new Configuration(); |
| |
| { |
| WebHdfsFileSystem webhdfs = (WebHdfsFileSystem) FileSystem.get(uri, |
| confTemp); |
| |
| assertEquals(hdfs.getHomeDirectory().toUri().getPath(), webhdfs |
| .getHomeDirectory().toUri().getPath()); |
| |
| webhdfs.close(); |
| } |
| |
| { |
| WebHdfsFileSystem webhdfs = createWebHDFSAsTestUser(confTemp, uri, |
| "XXX"); |
| |
| assertNotEquals(hdfs.getHomeDirectory().toUri().getPath(), webhdfs |
| .getHomeDirectory().toUri().getPath()); |
| |
| webhdfs.close(); |
| } |
| |
| } finally { |
| if (cluster != null) |
| cluster.shutdown(); |
| } |
| } |
| |
| private WebHdfsFileSystem createWebHDFSAsTestUser(final Configuration conf, |
| final URI uri, final String userName) throws Exception { |
| |
| final UserGroupInformation ugi = UserGroupInformation.createUserForTesting( |
| userName, new String[] { "supergroup" }); |
| |
    return ugi.doAs(new PrivilegedExceptionAction<WebHdfsFileSystem>() {
      @Override
      public WebHdfsFileSystem run() throws IOException {
        return (WebHdfsFileSystem) FileSystem.get(uri, conf);
      }
    });
| } |
| } |