| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.hdfs.server.federation.router; |
| |
| import static org.apache.hadoop.test.LambdaTestUtils.intercept; |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertFalse; |
| import static org.junit.Assert.assertNotEquals; |
| import static org.junit.Assert.assertNull; |
| import static org.junit.Assert.assertTrue; |
| import static org.junit.Assert.fail; |
| |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.EnumSet; |
| import java.util.List; |
| import java.util.UUID; |
| |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.CreateFlag; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.fs.QuotaUsage; |
| import org.apache.hadoop.fs.StorageType; |
| import org.apache.hadoop.fs.permission.FsPermission; |
| import org.apache.hadoop.hdfs.DFSClient; |
| import org.apache.hadoop.hdfs.DistributedFileSystem; |
| import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; |
| import org.apache.hadoop.hdfs.client.HdfsDataOutputStream; |
| import org.apache.hadoop.hdfs.protocol.ClientProtocol; |
| import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException; |
| import org.apache.hadoop.hdfs.protocol.HdfsConstants; |
| import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException; |
| import org.apache.hadoop.hdfs.protocol.QuotaByStorageTypeExceededException; |
| import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.NamenodeContext; |
| import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext; |
| import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder; |
| import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster; |
| import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager; |
| import org.apache.hadoop.hdfs.server.federation.resolver.MountTableResolver; |
| import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation; |
| import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver; |
| import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest; |
| import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse; |
| import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest; |
| import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse; |
| import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryRequest; |
| import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryResponse; |
| import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryRequest; |
| import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryResponse; |
| import org.apache.hadoop.hdfs.server.federation.store.records.MountTable; |
| import org.apache.hadoop.security.AccessControlException; |
| import org.apache.hadoop.hdfs.server.federation.store.records.QueryResult; |
| import org.apache.hadoop.test.GenericTestUtils; |
| import org.apache.hadoop.test.LambdaTestUtils; |
| import org.apache.hadoop.util.Time; |
| import org.junit.After; |
| import org.junit.Before; |
| import org.junit.Test; |
| |
| /** |
| * Tests quota behaviors in Router-based Federation. |
| */ |
| public class TestRouterQuota { |
| private static StateStoreDFSCluster cluster; |
| private static NamenodeContext nnContext1; |
| private static NamenodeContext nnContext2; |
| private static RouterContext routerContext; |
| private static MountTableResolver resolver; |
| |
| private static final int BLOCK_SIZE = 512; |
| |
| @Before |
| public void setUp() throws Exception { |
| |
| // Build and start a federated cluster |
| cluster = new StateStoreDFSCluster(false, 2); |
| Configuration routerConf = new RouterConfigBuilder() |
| .stateStore() |
| .admin() |
| .quota() |
| .rpc() |
| .build(); |
| routerConf.set(RBFConfigKeys.DFS_ROUTER_QUOTA_CACHE_UPDATE_INTERVAL, "2s"); |
| |
| // override some hdfs settings that used in testing space quota |
| Configuration hdfsConf = new Configuration(false); |
| hdfsConf.setInt(HdfsClientConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE); |
| hdfsConf.setInt(HdfsClientConfigKeys.DFS_REPLICATION_KEY, 1); |
| |
| cluster.addRouterOverrides(routerConf); |
| cluster.addNamenodeOverrides(hdfsConf); |
| cluster.startCluster(); |
| cluster.startRouters(); |
| cluster.waitClusterUp(); |
| |
| nnContext1 = cluster.getNamenode(cluster.getNameservices().get(0), null); |
| nnContext2 = cluster.getNamenode(cluster.getNameservices().get(1), null); |
| routerContext = cluster.getRandomRouter(); |
| Router router = routerContext.getRouter(); |
| resolver = (MountTableResolver) router.getSubclusterResolver(); |
| } |
| |
| @After |
| public void tearDown() { |
| if (cluster != null) { |
| cluster.stopRouter(routerContext); |
| cluster.shutdown(); |
| cluster = null; |
| } |
| } |
| |
| @Test |
| public void testNamespaceQuotaExceed() throws Exception { |
| long nsQuota = 3; |
| final FileSystem nnFs1 = nnContext1.getFileSystem(); |
| final FileSystem nnFs2 = nnContext2.getFileSystem(); |
| |
| // Add two mount tables: |
| // /nsquota --> ns0---testdir1 |
| // /nsquota/subdir --> ns1---testdir2 |
| nnFs1.mkdirs(new Path("/testdir1")); |
| nnFs2.mkdirs(new Path("/testdir2")); |
| MountTable mountTable1 = MountTable.newInstance("/nsquota", |
| Collections.singletonMap("ns0", "/testdir1")); |
| |
| mountTable1.setQuota(new RouterQuotaUsage.Builder().quota(nsQuota).build()); |
| addMountTable(mountTable1); |
| |
| MountTable mountTable2 = MountTable.newInstance("/nsquota/subdir", |
| Collections.singletonMap("ns1", "/testdir2")); |
| mountTable2.setQuota(new RouterQuotaUsage.Builder().quota(nsQuota).build()); |
| addMountTable(mountTable2); |
| |
| final FileSystem routerFs = routerContext.getFileSystem(); |
| final List<Path> created = new ArrayList<>(); |
| GenericTestUtils.waitFor(() -> { |
| boolean isNsQuotaViolated = false; |
| try { |
| // create new directory to trigger NSQuotaExceededException |
| Path p = new Path("/nsquota/" + UUID.randomUUID()); |
| routerFs.mkdirs(p); |
| created.add(p); |
| p = new Path("/nsquota/subdir/" + UUID.randomUUID()); |
| routerFs.mkdirs(p); |
| created.add(p); |
| } catch (NSQuotaExceededException e) { |
| isNsQuotaViolated = true; |
| } catch (IOException ignored) { |
| } |
| return isNsQuotaViolated; |
| }, 5000, 60000); |
| |
| // mkdir in real FileSystem should be okay |
| nnFs1.mkdirs(new Path("/testdir1/" + UUID.randomUUID())); |
| nnFs2.mkdirs(new Path("/testdir2/" + UUID.randomUUID())); |
| |
| // rename/delete call should be still okay |
| assertFalse(created.isEmpty()); |
| for(Path src: created) { |
| final Path dst = new Path(src.toString()+"-renamed"); |
| routerFs.rename(src, dst); |
| routerFs.delete(dst, true); |
| } |
| } |
| |
| @Test |
| public void testStorageSpaceQuotaExceed() throws Exception { |
| long ssQuota = 3071; |
| final FileSystem nnFs1 = nnContext1.getFileSystem(); |
| final FileSystem nnFs2 = nnContext2.getFileSystem(); |
| |
| // Add two mount tables: |
| // /ssquota --> ns0---testdir3 |
| // /ssquota/subdir --> ns1---testdir4 |
| nnFs1.mkdirs(new Path("/testdir3")); |
| nnFs2.mkdirs(new Path("/testdir4")); |
| MountTable mountTable1 = MountTable.newInstance("/ssquota", |
| Collections.singletonMap("ns0", "/testdir3")); |
| |
| mountTable1 |
| .setQuota(new RouterQuotaUsage.Builder().spaceQuota(ssQuota).build()); |
| addMountTable(mountTable1); |
| |
| MountTable mountTable2 = MountTable.newInstance("/ssquota/subdir", |
| Collections.singletonMap("ns1", "/testdir4")); |
| mountTable2 |
| .setQuota(new RouterQuotaUsage.Builder().spaceQuota(ssQuota).build()); |
| addMountTable(mountTable2); |
| |
| DFSClient routerClient = routerContext.getClient(); |
| routerClient.create("/ssquota/file", true).close(); |
| routerClient.create("/ssquota/subdir/file", true).close(); |
| |
| GenericTestUtils.waitFor(() -> { |
| boolean isDsQuotaViolated = false; |
| try { |
| // append data to trigger NSQuotaExceededException |
| appendData("/ssquota/file", routerClient, BLOCK_SIZE); |
| appendData("/ssquota/subdir/file", routerClient, BLOCK_SIZE); |
| } catch (DSQuotaExceededException e) { |
| isDsQuotaViolated = true; |
| } catch (IOException ignored) { |
| } |
| return isDsQuotaViolated; |
| }, 5000, 60000); |
| |
| // append data to destination path in real FileSystem should be okay |
| appendData("/testdir3/file", nnContext1.getClient(), BLOCK_SIZE); |
| appendData("/testdir4/file", nnContext2.getClient(), BLOCK_SIZE); |
| } |
| |
| @Test |
| public void testStorageTypeQuotaExceed() throws Exception { |
| long ssQuota = BLOCK_SIZE * 3; |
| DFSClient routerClient = routerContext.getClient(); |
| prepareStorageTypeQuotaTestMountTable(StorageType.DISK, BLOCK_SIZE, |
| ssQuota * 2, ssQuota, BLOCK_SIZE + 1, BLOCK_SIZE + 1); |
| |
| // Verify quota exceed on Router. |
| LambdaTestUtils.intercept(DSQuotaExceededException.class, |
| "The DiskSpace quota is exceeded", "Expect quota exceed exception.", |
| () -> appendData("/type0/file", routerClient, BLOCK_SIZE)); |
| LambdaTestUtils.intercept(DSQuotaExceededException.class, |
| "The DiskSpace quota is exceeded", "Expect quota exceed exception.", |
| () -> appendData("/type0/type1/file", routerClient, BLOCK_SIZE)); |
| |
| // Verify quota exceed on NN1. |
| LambdaTestUtils.intercept(QuotaByStorageTypeExceededException.class, |
| "Quota by storage type", "Expect quota exceed exception.", |
| () -> appendData("/type0/file", nnContext1.getClient(), BLOCK_SIZE)); |
| LambdaTestUtils.intercept(QuotaByStorageTypeExceededException.class, |
| "Quota by storage type", "Expect quota exceed exception.", |
| () -> appendData("/type1/file", nnContext1.getClient(), BLOCK_SIZE)); |
| } |
| |
| /** |
| * Add a mount table entry to the mount table through the admin API. |
| * @param entry Mount table entry to add. |
| * @return If it was successfully added. |
| * @throws IOException Problems adding entries. |
| */ |
| private boolean addMountTable(final MountTable entry) throws IOException { |
| RouterClient client = routerContext.getAdminClient(); |
| MountTableManager mountTableManager = client.getMountTableManager(); |
| AddMountTableEntryRequest addRequest = |
| AddMountTableEntryRequest.newInstance(entry); |
| AddMountTableEntryResponse addResponse = |
| mountTableManager.addMountTableEntry(addRequest); |
| |
| // Reload the Router cache |
| resolver.loadCache(true); |
| |
| return addResponse.getStatus(); |
| } |
| |
| /** |
| * Update a mount table entry to the mount table through the admin API. |
| * @param entry Mount table entry to update. |
| * @return If it was successfully updated |
| * @throws IOException Problems update entries |
| */ |
| private boolean updateMountTable(final MountTable entry) throws IOException { |
| RouterClient client = routerContext.getAdminClient(); |
| MountTableManager mountTableManager = client.getMountTableManager(); |
| UpdateMountTableEntryRequest updateRequest = |
| UpdateMountTableEntryRequest.newInstance(entry); |
| UpdateMountTableEntryResponse updateResponse = |
| mountTableManager.updateMountTableEntry(updateRequest); |
| |
| // Reload the Router cache |
| resolver.loadCache(true); |
| return updateResponse.getStatus(); |
| } |
| |
| /** |
| * Append data in specified file. |
| * @param path Path of file. |
| * @param client DFS Client. |
| * @param dataLen The length of write data. |
| * @throws IOException |
| */ |
| private void appendData(String path, DFSClient client, int dataLen) |
| throws IOException { |
| EnumSet<CreateFlag> createFlag = EnumSet.of(CreateFlag.APPEND); |
| HdfsDataOutputStream stream = client.append(path, 1024, createFlag, null, |
| null); |
| byte[] data = new byte[dataLen]; |
| stream.write(data); |
| stream.close(); |
| } |
| |
| @Test |
| public void testSetQuotaToMountTableEntry() throws Exception { |
| long nsQuota = 10; |
| long ssQuota = 10240; |
| long diskQuota = 1024; |
| final FileSystem nnFs1 = nnContext1.getFileSystem(); |
| nnFs1.mkdirs(new Path("/testSetQuotaToFederationPath")); |
| nnFs1.mkdirs(new Path("/testSetQuotaToFederationPath/dir0")); |
| |
| // Add one mount table: |
| // /setquota --> ns0---testSetQuotaToFederationPath |
| MountTable mountTable = MountTable.newInstance("/setquota", |
| Collections.singletonMap("ns0", "/testSetQuotaToFederationPath")); |
| addMountTable(mountTable); |
| |
| RouterQuotaUpdateService updateService = routerContext.getRouter() |
| .getQuotaCacheUpdateService(); |
| // ensure mount table is updated to Router. |
| updateService.periodicInvoke(); |
| |
| final FileSystem routerFs = routerContext.getFileSystem(); |
| // setting quota on a mount point should fail. |
| LambdaTestUtils.intercept(AccessControlException.class, |
| "is not allowed to change quota of", |
| "Expect an AccessControlException.", |
| () -> routerFs.setQuota(new Path("/setquota"), nsQuota, ssQuota)); |
| // setting storage quota on a mount point should fail. |
| LambdaTestUtils.intercept(AccessControlException.class, |
| "is not allowed to change quota of", |
| "Expect an AccessControlException.", () -> routerFs |
| .setQuotaByStorageType(new Path("/setquota"), StorageType.DISK, |
| diskQuota)); |
| QuotaUsage quota = |
| nnFs1.getQuotaUsage(new Path("/testSetQuotaToFederationPath")); |
| // quota should still be unset. |
| assertEquals(-1, quota.getQuota()); |
| assertEquals(-1, quota.getSpaceQuota()); |
| |
| // setting quota on a non-mount point should succeed. |
| routerFs.setQuota(new Path("/setquota/dir0"), nsQuota, ssQuota); |
| // setting storage quota on a non-mount point should succeed. |
| routerFs.setQuotaByStorageType(new Path("/setquota/dir0"), StorageType.DISK, |
| diskQuota); |
| quota = nnFs1.getQuotaUsage(new Path("/testSetQuotaToFederationPath/dir0")); |
| // quota should be set. |
| assertEquals(nsQuota, quota.getQuota()); |
| assertEquals(ssQuota, quota.getSpaceQuota()); |
| assertEquals(diskQuota, quota.getTypeQuota(StorageType.DISK)); |
| } |
| |
| @Test |
| public void testSetQuota() throws Exception { |
| long nsQuota = 5; |
| long ssQuota = 100; |
| final FileSystem nnFs1 = nnContext1.getFileSystem(); |
| final FileSystem nnFs2 = nnContext2.getFileSystem(); |
| |
| // Add two mount tables: |
| // /setquota --> ns0---testdir5 |
| // /setquota/subdir --> ns1---testdir6 |
| nnFs1.mkdirs(new Path("/testdir5")); |
| nnFs2.mkdirs(new Path("/testdir6")); |
| MountTable mountTable1 = MountTable.newInstance("/setquota", |
| Collections.singletonMap("ns0", "/testdir5")); |
| mountTable1 |
| .setQuota(new RouterQuotaUsage.Builder().quota(nsQuota) |
| .spaceQuota(ssQuota).build()); |
| addMountTable(mountTable1); |
| |
| // don't set quota for subpath of mount table |
| MountTable mountTable2 = MountTable.newInstance("/setquota/subdir", |
| Collections.singletonMap("ns1", "/testdir6")); |
| addMountTable(mountTable2); |
| |
| RouterQuotaUpdateService updateService = routerContext.getRouter() |
| .getQuotaCacheUpdateService(); |
| // ensure setQuota RPC call was invoked |
| updateService.periodicInvoke(); |
| |
| ClientProtocol client1 = nnContext1.getClient().getNamenode(); |
| ClientProtocol client2 = nnContext2.getClient().getNamenode(); |
| final QuotaUsage quota1 = client1.getQuotaUsage("/testdir5"); |
| final QuotaUsage quota2 = client2.getQuotaUsage("/testdir6"); |
| |
| assertEquals(nsQuota, quota1.getQuota()); |
| assertEquals(ssQuota, quota1.getSpaceQuota()); |
| assertEquals(nsQuota, quota2.getQuota()); |
| assertEquals(ssQuota, quota2.getSpaceQuota()); |
| } |
| |
| @Test |
| public void testStorageTypeQuota() throws Exception { |
| long ssQuota = BLOCK_SIZE * 3; |
| int fileSize = BLOCK_SIZE; |
| prepareStorageTypeQuotaTestMountTable(StorageType.DISK, BLOCK_SIZE, |
| ssQuota * 2, ssQuota, fileSize, fileSize); |
| |
| // Verify /type0 quota on NN1. |
| ClientProtocol client = nnContext1.getClient().getNamenode(); |
| QuotaUsage usage = client.getQuotaUsage("/type0"); |
| assertEquals(HdfsConstants.QUOTA_RESET, usage.getQuota()); |
| assertEquals(HdfsConstants.QUOTA_RESET, usage.getSpaceQuota()); |
| verifyTypeQuotaAndConsume(new long[] {-1, -1, ssQuota * 2, -1, -1, -1}, |
| null, usage); |
| // Verify /type1 quota on NN1. |
| usage = client.getQuotaUsage("/type1"); |
| assertEquals(HdfsConstants.QUOTA_RESET, usage.getQuota()); |
| assertEquals(HdfsConstants.QUOTA_RESET, usage.getSpaceQuota()); |
| verifyTypeQuotaAndConsume(new long[] {-1, -1, ssQuota, -1, -1, -1}, null, |
| usage); |
| |
| FileSystem routerFs = routerContext.getFileSystem(); |
| QuotaUsage u0 = routerFs.getQuotaUsage(new Path("/type0")); |
| QuotaUsage u1 = routerFs.getQuotaUsage(new Path("/type0/type1")); |
| // Verify /type0/type1 storage type quota usage on Router. |
| assertEquals(HdfsConstants.QUOTA_RESET, u1.getQuota()); |
| assertEquals(2, u1.getFileAndDirectoryCount()); |
| assertEquals(HdfsConstants.QUOTA_RESET, u1.getSpaceQuota()); |
| assertEquals(fileSize * 3, u1.getSpaceConsumed()); |
| verifyTypeQuotaAndConsume(new long[] {-1, -1, ssQuota, -1, -1, -1}, |
| new long[] {0, 0, fileSize * 3, 0, 0, 0}, u1); |
| // Verify /type0 storage type quota usage on Router. |
| assertEquals(HdfsConstants.QUOTA_RESET, u0.getQuota()); |
| assertEquals(4, u0.getFileAndDirectoryCount()); |
| assertEquals(HdfsConstants.QUOTA_RESET, u0.getSpaceQuota()); |
| assertEquals(fileSize * 3 * 2, u0.getSpaceConsumed()); |
| verifyTypeQuotaAndConsume(new long[] {-1, -1, ssQuota * 2, -1, -1, -1}, |
| new long[] {0, 0, fileSize * 3 * 2, 0, 0, 0}, u0); |
| } |
| |
| @Test |
| public void testGetQuota() throws Exception { |
| long nsQuota = 10; |
| long ssQuota = 100; |
| final FileSystem nnFs1 = nnContext1.getFileSystem(); |
| final FileSystem nnFs2 = nnContext2.getFileSystem(); |
| |
| // Add two mount tables: |
| // /getquota --> ns0---/testdir7 |
| // /getquota/subdir1 --> ns0---/testdir7/subdir |
| // /getquota/subdir2 --> ns1---/testdir8 |
| // /getquota/subdir3 --> ns1---/testdir8-ext |
| nnFs1.mkdirs(new Path("/testdir7")); |
| nnFs1.mkdirs(new Path("/testdir7/subdir")); |
| nnFs2.mkdirs(new Path("/testdir8")); |
| nnFs2.mkdirs(new Path("/testdir8-ext")); |
| MountTable mountTable1 = MountTable.newInstance("/getquota", |
| Collections.singletonMap("ns0", "/testdir7")); |
| mountTable1 |
| .setQuota(new RouterQuotaUsage.Builder().quota(nsQuota) |
| .spaceQuota(ssQuota).build()); |
| addMountTable(mountTable1); |
| |
| MountTable mountTable2 = MountTable.newInstance("/getquota/subdir1", |
| Collections.singletonMap("ns0", "/testdir7/subdir")); |
| addMountTable(mountTable2); |
| |
| MountTable mountTable3 = MountTable.newInstance("/getquota/subdir2", |
| Collections.singletonMap("ns1", "/testdir8")); |
| addMountTable(mountTable3); |
| |
| MountTable mountTable4 = MountTable.newInstance("/getquota/subdir3", |
| Collections.singletonMap("ns1", "/testdir8-ext")); |
| addMountTable(mountTable4); |
| |
| // use router client to create new files |
| DFSClient routerClient = routerContext.getClient(); |
| routerClient.create("/getquota/file", true).close(); |
| routerClient.create("/getquota/subdir1/file", true).close(); |
| routerClient.create("/getquota/subdir2/file", true).close(); |
| routerClient.create("/getquota/subdir3/file", true).close(); |
| |
| ClientProtocol clientProtocol = routerContext.getClient().getNamenode(); |
| RouterQuotaUpdateService updateService = routerContext.getRouter() |
| .getQuotaCacheUpdateService(); |
| updateService.periodicInvoke(); |
| final QuotaUsage quota = clientProtocol.getQuotaUsage("/getquota"); |
| // the quota should be aggregated |
| assertEquals(8, quota.getFileAndDirectoryCount()); |
| } |
| |
| @Test |
| public void testStaleQuotaRemoving() throws Exception { |
| long nsQuota = 20; |
| long ssQuota = 200; |
| String stalePath = "/stalequota"; |
| final FileSystem nnFs1 = nnContext1.getFileSystem(); |
| |
| // Add one mount tables: |
| // /stalequota --> ns0---/testdir9 |
| nnFs1.mkdirs(new Path("/testdir9")); |
| MountTable mountTable = MountTable.newInstance(stalePath, |
| Collections.singletonMap("ns0", "/testdir9")); |
| mountTable.setQuota(new RouterQuotaUsage.Builder().quota(nsQuota) |
| .spaceQuota(ssQuota).build()); |
| addMountTable(mountTable); |
| |
| // Call periodicInvoke to ensure quota for stalePath was |
| // loaded into quota manager. |
| RouterQuotaUpdateService updateService = routerContext.getRouter() |
| .getQuotaCacheUpdateService(); |
| updateService.periodicInvoke(); |
| |
| // use quota manager to get its quota usage and do verification |
| RouterQuotaManager quotaManager = routerContext.getRouter() |
| .getQuotaManager(); |
| RouterQuotaUsage quota = quotaManager.getQuotaUsage(stalePath); |
| assertEquals(nsQuota, quota.getQuota()); |
| assertEquals(ssQuota, quota.getSpaceQuota()); |
| |
| // remove stale path entry |
| removeMountTable(stalePath); |
| updateService.periodicInvoke(); |
| // the stale entry should be removed and we will get null |
| quota = quotaManager.getQuotaUsage(stalePath); |
| assertNull(quota); |
| } |
| |
| /** |
| * Remove a mount table entry to the mount table through the admin API. |
| * @param path Mount table entry to remove. |
| * @return If it was successfully removed. |
| * @throws IOException Problems removing entries. |
| */ |
| private boolean removeMountTable(String path) throws IOException { |
| RouterClient client = routerContext.getAdminClient(); |
| MountTableManager mountTableManager = client.getMountTableManager(); |
| RemoveMountTableEntryRequest removeRequest = RemoveMountTableEntryRequest |
| .newInstance(path); |
| RemoveMountTableEntryResponse removeResponse = mountTableManager |
| .removeMountTableEntry(removeRequest); |
| |
| // Reload the Router cache |
| resolver.loadCache(true); |
| return removeResponse.getStatus(); |
| } |
| |
| /** |
| * Test {@link RouterQuotaUpdateService#periodicInvoke()} updates quota usage |
| * in RouterQuotaManager. |
| */ |
| @Test |
| public void testQuotaUpdating() throws Exception { |
| long nsQuota = 30; |
| long ssQuota = 1024; |
| String path = "/updatequota"; |
| final FileSystem nnFs1 = nnContext1.getFileSystem(); |
| |
| // Add one mount table: |
| // /updatequota --> ns0---/testdir10 |
| nnFs1.mkdirs(new Path("/testdir10")); |
| MountTable mountTable = MountTable.newInstance(path, |
| Collections.singletonMap("ns0", "/testdir10")); |
| mountTable.setQuota(new RouterQuotaUsage.Builder().quota(nsQuota) |
| .spaceQuota(ssQuota).build()); |
| addMountTable(mountTable); |
| |
| // Call periodicInvoke to ensure quota updated in quota manager. |
| RouterQuotaUpdateService updateService = routerContext.getRouter() |
| .getQuotaCacheUpdateService(); |
| updateService.periodicInvoke(); |
| |
| // verify initial quota value |
| MountTable updatedMountTable = getMountTable(path); |
| RouterQuotaUsage quota = updatedMountTable.getQuota(); |
| assertEquals(nsQuota, quota.getQuota()); |
| assertEquals(ssQuota, quota.getSpaceQuota()); |
| assertEquals(1, quota.getFileAndDirectoryCount()); |
| assertEquals(0, quota.getSpaceConsumed()); |
| |
| // mkdir and write a new file |
| final FileSystem routerFs = routerContext.getFileSystem(); |
| routerFs.mkdirs(new Path(path + "/" + UUID.randomUUID())); |
| DFSClient routerClient = routerContext.getClient(); |
| routerClient.create(path + "/file", true).close(); |
| appendData(path + "/file", routerClient, BLOCK_SIZE); |
| |
| updateService.periodicInvoke(); |
| updatedMountTable = getMountTable(path); |
| quota = updatedMountTable.getQuota(); |
| |
| // verify if quota usage has been updated in quota manager. |
| assertEquals(nsQuota, quota.getQuota()); |
| assertEquals(ssQuota, quota.getSpaceQuota()); |
| assertEquals(3, quota.getFileAndDirectoryCount()); |
| assertEquals(BLOCK_SIZE, quota.getSpaceConsumed()); |
| |
| // verify quota sync on adding new destination to mount entry. |
| updatedMountTable = getMountTable(path); |
| nnFs1.mkdirs(new Path("/newPath")); |
| updatedMountTable.setDestinations( |
| Collections.singletonList(new RemoteLocation("ns0", "/newPath", path))); |
| updateMountTable(updatedMountTable); |
| assertEquals(nsQuota, nnFs1.getQuotaUsage(new Path("/newPath")).getQuota()); |
| } |
| |
| /** |
| * Get the mount table entries of specified path through the admin API. |
| * @param path Mount table entry to get. |
| * @return If it was successfully got. |
| * @throws IOException Problems getting entries. |
| */ |
| private MountTable getMountTable(String path) throws IOException { |
| // Reload the Router cache |
| resolver.loadCache(true); |
| RouterClient client = routerContext.getAdminClient(); |
| MountTableManager mountTableManager = client.getMountTableManager(); |
| GetMountTableEntriesRequest getRequest = GetMountTableEntriesRequest |
| .newInstance(path); |
| GetMountTableEntriesResponse response = mountTableManager |
| .getMountTableEntries(getRequest); |
| List<MountTable> results = response.getEntries(); |
| |
| return !results.isEmpty() ? results.get(0) : null; |
| } |
| |
| @Test |
| public void testQuotaSynchronization() throws IOException { |
| long updateNsQuota = 3; |
| long updateSsQuota = 4; |
| FileSystem nnFs = nnContext1.getFileSystem(); |
| nnFs.mkdirs(new Path("/testsync")); |
| MountTable mountTable = MountTable.newInstance("/quotaSync", |
| Collections.singletonMap("ns0", "/testsync"), Time.now(), Time.now()); |
| mountTable.setQuota(new RouterQuotaUsage.Builder().quota(1) |
| .spaceQuota(2).build()); |
| // Add new mount table |
| addMountTable(mountTable); |
| |
| // ensure the quota is not set as updated value |
| QuotaUsage realQuota = nnContext1.getFileSystem() |
| .getQuotaUsage(new Path("/testsync")); |
| assertNotEquals(updateNsQuota, realQuota.getQuota()); |
| assertNotEquals(updateSsQuota, realQuota.getSpaceQuota()); |
| |
| // Call periodicInvoke to ensure quota updated in quota manager |
| // and state store. |
| RouterQuotaUpdateService updateService = routerContext.getRouter() |
| .getQuotaCacheUpdateService(); |
| updateService.periodicInvoke(); |
| |
| mountTable.setQuota(new RouterQuotaUsage.Builder().quota(updateNsQuota) |
| .spaceQuota(updateSsQuota).build()); |
| updateMountTable(mountTable); |
| |
| // verify if the quota is updated in real path |
| realQuota = nnContext1.getFileSystem().getQuotaUsage( |
| new Path("/testsync")); |
| assertEquals(updateNsQuota, realQuota.getQuota()); |
| assertEquals(updateSsQuota, realQuota.getSpaceQuota()); |
| |
| // Clear the quota |
| mountTable.setQuota(new RouterQuotaUsage.Builder() |
| .quota(HdfsConstants.QUOTA_RESET) |
| .spaceQuota(HdfsConstants.QUOTA_RESET).build()); |
| updateMountTable(mountTable); |
| |
| // verify if the quota is updated in real path |
| realQuota = nnContext1.getFileSystem().getQuotaUsage( |
| new Path("/testsync")); |
| assertEquals(HdfsConstants.QUOTA_RESET, realQuota.getQuota()); |
| assertEquals(HdfsConstants.QUOTA_RESET, realQuota.getSpaceQuota()); |
| |
| // Verify updating mount entry with actual destinations not present. |
| mountTable = MountTable.newInstance("/testupdate", |
| Collections.singletonMap("ns0", "/testupdate"), Time.now(), Time.now()); |
| addMountTable(mountTable); |
| mountTable.setQuota(new RouterQuotaUsage.Builder().quota(1) |
| .spaceQuota(2).build()); |
| assertTrue(updateMountTable(mountTable)); |
| } |
| |
| @Test |
| public void testQuotaRefreshAfterQuotaExceed() throws Exception { |
| long nsQuota = 3; |
| long ssQuota = 100; |
| final FileSystem nnFs1 = nnContext1.getFileSystem(); |
| final FileSystem nnFs2 = nnContext2.getFileSystem(); |
| |
| // Add two mount tables: |
| // /setquota1 --> ns0---testdir11 |
| // /setquota2 --> ns1---testdir12 |
| nnFs1.mkdirs(new Path("/testdir11")); |
| nnFs2.mkdirs(new Path("/testdir12")); |
| MountTable mountTable1 = MountTable.newInstance("/setquota1", |
| Collections.singletonMap("ns0", "/testdir11")); |
| mountTable1 |
| .setQuota(new RouterQuotaUsage.Builder().quota(nsQuota) |
| .spaceQuota(ssQuota).build()); |
| addMountTable(mountTable1); |
| |
| MountTable mountTable2 = MountTable.newInstance("/setquota2", |
| Collections.singletonMap("ns1", "/testdir12")); |
| mountTable2 |
| .setQuota(new RouterQuotaUsage.Builder().quota(nsQuota) |
| .spaceQuota(ssQuota).build()); |
| addMountTable(mountTable2); |
| |
| final FileSystem routerFs = routerContext.getFileSystem(); |
| // Create directory to make directory count equals to nsQuota |
| routerFs.mkdirs(new Path("/setquota1/" + UUID.randomUUID())); |
| routerFs.mkdirs(new Path("/setquota1/" + UUID.randomUUID())); |
| |
| // create one more directory to exceed the nsQuota |
| routerFs.mkdirs(new Path("/setquota1/" + UUID.randomUUID())); |
| |
| RouterQuotaUpdateService updateService = routerContext.getRouter() |
| .getQuotaCacheUpdateService(); |
| // Call RouterQuotaUpdateService#periodicInvoke to update quota cache |
| updateService.periodicInvoke(); |
| // Reload the Router cache |
| resolver.loadCache(true); |
| |
| RouterQuotaManager quotaManager = |
| routerContext.getRouter().getQuotaManager(); |
| ClientProtocol client1 = nnContext1.getClient().getNamenode(); |
| ClientProtocol client2 = nnContext2.getClient().getNamenode(); |
| QuotaUsage quota1 = client1.getQuotaUsage("/testdir11"); |
| QuotaUsage quota2 = client2.getQuotaUsage("/testdir12"); |
| QuotaUsage cacheQuota1 = quotaManager.getQuotaUsage("/setquota1"); |
| QuotaUsage cacheQuota2 = quotaManager.getQuotaUsage("/setquota2"); |
| |
| // Verify quota usage |
| assertEquals(4, quota1.getFileAndDirectoryCount()); |
| assertEquals(4, cacheQuota1.getFileAndDirectoryCount()); |
| assertEquals(1, quota2.getFileAndDirectoryCount()); |
| assertEquals(1, cacheQuota2.getFileAndDirectoryCount()); |
| |
| try { |
| // create new directory to trigger NSQuotaExceededException |
| routerFs.mkdirs(new Path("/testdir11/" + UUID.randomUUID())); |
| fail("Mkdir should be failed under dir /testdir11."); |
| } catch (NSQuotaExceededException ignored) { |
| } |
| |
| // Create directory under the other mount point |
| routerFs.mkdirs(new Path("/setquota2/" + UUID.randomUUID())); |
| routerFs.mkdirs(new Path("/setquota2/" + UUID.randomUUID())); |
| |
| // Call RouterQuotaUpdateService#periodicInvoke to update quota cache |
| updateService.periodicInvoke(); |
| |
| quota1 = client1.getQuotaUsage("/testdir11"); |
| cacheQuota1 = quotaManager.getQuotaUsage("/setquota1"); |
| quota2 = client2.getQuotaUsage("/testdir12"); |
| cacheQuota2 = quotaManager.getQuotaUsage("/setquota2"); |
| |
| // Verify whether quota usage cache is update by periodicInvoke(). |
| assertEquals(4, quota1.getFileAndDirectoryCount()); |
| assertEquals(4, cacheQuota1.getFileAndDirectoryCount()); |
| assertEquals(3, quota2.getFileAndDirectoryCount()); |
| assertEquals(3, cacheQuota2.getFileAndDirectoryCount()); |
| } |
| |
| /** |
| * Verify whether quota usage cache in RouterQuotaManager is updated properly. |
| * {@link RouterQuotaUpdateService#periodicInvoke()} should be able to update |
| * the cache even if the destination directory for some mount entry is not |
| * present in the filesystem. |
| */ |
| @Test |
| public void testQuotaRefreshWhenDestinationNotPresent() throws Exception { |
| long nsQuota = 5; |
| long ssQuota = 3 * BLOCK_SIZE; |
| final FileSystem nnFs = nnContext1.getFileSystem(); |
| |
| // Add three mount tables: |
| // /setdir1 --> ns0---testdir13 |
| // /setdir2 --> ns0---testdir14 |
| // Create destination directory |
| nnFs.mkdirs(new Path("/testdir13")); |
| nnFs.mkdirs(new Path("/testdir14")); |
| |
| MountTable mountTable = MountTable.newInstance("/setdir1", |
| Collections.singletonMap("ns0", "/testdir13")); |
| mountTable |
| .setQuota(new RouterQuotaUsage.Builder().quota(nsQuota) |
| .spaceQuota(ssQuota).build()); |
| addMountTable(mountTable); |
| |
| mountTable = MountTable.newInstance("/setdir2", |
| Collections.singletonMap("ns0", "/testdir14")); |
| mountTable |
| .setQuota(new RouterQuotaUsage.Builder().quota(nsQuota) |
| .spaceQuota(ssQuota).build()); |
| addMountTable(mountTable); |
| |
| final DFSClient routerClient = routerContext.getClient(); |
| // Create file |
| routerClient.create("/setdir1/file1", true).close(); |
| routerClient.create("/setdir2/file2", true).close(); |
| // append data to the file |
| appendData("/setdir1/file1", routerClient, BLOCK_SIZE); |
| appendData("/setdir2/file2", routerClient, BLOCK_SIZE); |
| |
| RouterQuotaUpdateService updateService = |
| routerContext.getRouter().getQuotaCacheUpdateService(); |
| // Update quota cache |
| updateService.periodicInvoke(); |
| // Reload the Router cache |
| resolver.loadCache(true); |
| |
| ClientProtocol client1 = nnContext1.getClient().getNamenode(); |
| RouterQuotaManager quotaManager = |
| routerContext.getRouter().getQuotaManager(); |
| QuotaUsage quota1 = client1.getQuotaUsage("/testdir13"); |
| QuotaUsage quota2 = client1.getQuotaUsage("/testdir14"); |
| QuotaUsage cacheQuota1 = quotaManager.getQuotaUsage("/setdir1"); |
| QuotaUsage cacheQuota2 = quotaManager.getQuotaUsage("/setdir2"); |
| |
| // Get quota details in mount table |
| MountTable updatedMountTable = getMountTable("/setdir1"); |
| RouterQuotaUsage mountQuota1 = updatedMountTable.getQuota(); |
| updatedMountTable = getMountTable("/setdir2"); |
| RouterQuotaUsage mountQuota2 = updatedMountTable.getQuota(); |
| |
| // Verify quota usage |
| assertEquals(2, quota1.getFileAndDirectoryCount()); |
| assertEquals(2, cacheQuota1.getFileAndDirectoryCount()); |
| assertEquals(2, mountQuota1.getFileAndDirectoryCount()); |
| assertEquals(2, quota2.getFileAndDirectoryCount()); |
| assertEquals(2, cacheQuota2.getFileAndDirectoryCount()); |
| assertEquals(2, mountQuota2.getFileAndDirectoryCount()); |
| assertEquals(BLOCK_SIZE, quota1.getSpaceConsumed()); |
| assertEquals(BLOCK_SIZE, cacheQuota1.getSpaceConsumed()); |
| assertEquals(BLOCK_SIZE, mountQuota1.getSpaceConsumed()); |
| assertEquals(BLOCK_SIZE, quota2.getSpaceConsumed()); |
| assertEquals(BLOCK_SIZE, cacheQuota2.getSpaceConsumed()); |
| assertEquals(BLOCK_SIZE, mountQuota2.getSpaceConsumed()); |
| |
| FileSystem routerFs = routerContext.getFileSystem(); |
| // Remove file in setdir1. The target directory still exists. |
| routerFs.delete(new Path("/setdir1/file1"), true); |
| |
| // Create file |
| routerClient.create("/setdir2/file3", true).close(); |
| // append data to the file |
| appendData("/setdir2/file3", routerClient, BLOCK_SIZE); |
| int updatedSpace = BLOCK_SIZE + BLOCK_SIZE; |
| |
| // Update quota cache |
| updateService.periodicInvoke(); |
| |
| quota2 = client1.getQuotaUsage("/testdir14"); |
| cacheQuota1 = quotaManager.getQuotaUsage("/setdir1"); |
| cacheQuota2 = quotaManager.getQuotaUsage("/setdir2"); |
| |
| // Get quota details in mount table |
| updatedMountTable = getMountTable("/setdir1"); |
| mountQuota1 = updatedMountTable.getQuota(); |
| updatedMountTable = getMountTable("/setdir2"); |
| mountQuota2 = updatedMountTable.getQuota(); |
| |
| // The quota usage should be reset. |
| assertEquals(1, cacheQuota1.getFileAndDirectoryCount()); |
| assertEquals(1, mountQuota1.getFileAndDirectoryCount()); |
| assertEquals(0, cacheQuota1.getSpaceConsumed()); |
| assertEquals(0, mountQuota1.getSpaceConsumed()); |
| |
| // Verify current quota usage for other mount entries |
| assertEquals(3, quota2.getFileAndDirectoryCount()); |
| assertEquals(3, cacheQuota2.getFileAndDirectoryCount()); |
| assertEquals(3, mountQuota2.getFileAndDirectoryCount()); |
| assertEquals(updatedSpace, quota2.getSpaceConsumed()); |
| assertEquals(updatedSpace, cacheQuota2.getSpaceConsumed()); |
| assertEquals(updatedSpace, mountQuota2.getSpaceConsumed()); |
| } |
| |
| @Test |
| public void testClearQuotaDefAfterRemovingMountTable() throws Exception { |
| long nsQuota = 5; |
| long ssQuota = 3 * BLOCK_SIZE; |
| final FileSystem nnFs = nnContext1.getFileSystem(); |
| |
| // Add one mount tables: |
| // /setdir --> ns0---testdir15 |
| // Create destination directory |
| nnFs.mkdirs(new Path("/testdir15")); |
| |
| MountTable mountTable = MountTable.newInstance("/setdir", |
| Collections.singletonMap("ns0", "/testdir15")); |
| mountTable.setQuota(new RouterQuotaUsage.Builder().quota(nsQuota) |
| .spaceQuota(ssQuota).build()); |
| addMountTable(mountTable); |
| |
| // Update router quota |
| RouterQuotaUpdateService updateService = |
| routerContext.getRouter().getQuotaCacheUpdateService(); |
| updateService.periodicInvoke(); |
| |
| RouterQuotaManager quotaManager = |
| routerContext.getRouter().getQuotaManager(); |
| ClientProtocol client = nnContext1.getClient().getNamenode(); |
| QuotaUsage routerQuota = quotaManager.getQuotaUsage("/setdir"); |
| QuotaUsage subClusterQuota = client.getQuotaUsage("/testdir15"); |
| |
| // Verify current quota definitions |
| assertEquals(nsQuota, routerQuota.getQuota()); |
| assertEquals(ssQuota, routerQuota.getSpaceQuota()); |
| assertEquals(nsQuota, subClusterQuota.getQuota()); |
| assertEquals(ssQuota, subClusterQuota.getSpaceQuota()); |
| |
| // Remove mount table |
| removeMountTable("/setdir"); |
| updateService.periodicInvoke(); |
| routerQuota = quotaManager.getQuotaUsage("/setdir"); |
| subClusterQuota = client.getQuotaUsage("/testdir15"); |
| |
| // Verify quota definitions are cleared after removing the mount table |
| assertNull(routerQuota); |
| assertEquals(HdfsConstants.QUOTA_RESET, subClusterQuota.getQuota()); |
| assertEquals(HdfsConstants.QUOTA_RESET, subClusterQuota.getSpaceQuota()); |
| |
| // Verify removing mount entry with actual destinations not present. |
| mountTable = MountTable.newInstance("/mount", |
| Collections.singletonMap("ns0", "/testdir16")); |
| addMountTable(mountTable); |
| assertTrue(removeMountTable("/mount")); |
| } |
| |
| @Test |
| public void testSetQuotaNotMountTable() throws Exception { |
| long nsQuota = 5; |
| long ssQuota = 100; |
| final FileSystem nnFs1 = nnContext1.getFileSystem(); |
| |
| // setQuota should run for any directory |
| MountTable mountTable1 = MountTable.newInstance("/setquotanmt", |
| Collections.singletonMap("ns0", "/testdir16")); |
| |
| addMountTable(mountTable1); |
| |
| // Add a directory not present in mount table. |
| nnFs1.mkdirs(new Path("/testdir16/testdir17")); |
| |
| routerContext.getRouter().getRpcServer().setQuota("/setquotanmt/testdir17", |
| nsQuota, ssQuota, null); |
| |
| RouterQuotaUpdateService updateService = routerContext.getRouter() |
| .getQuotaCacheUpdateService(); |
| // ensure setQuota RPC call was invoked |
| updateService.periodicInvoke(); |
| |
| ClientProtocol client1 = nnContext1.getClient().getNamenode(); |
| final QuotaUsage quota1 = client1.getQuotaUsage("/testdir16/testdir17"); |
| |
| assertEquals(nsQuota, quota1.getQuota()); |
| assertEquals(ssQuota, quota1.getSpaceQuota()); |
| } |
| |
| @Test |
| public void testNoQuotaaExceptionForUnrelatedOperations() throws Exception { |
| FileSystem nnFs = nnContext1.getFileSystem(); |
| DistributedFileSystem routerFs = |
| (DistributedFileSystem) routerContext.getFileSystem(); |
| Path path = new Path("/quota"); |
| nnFs.mkdirs(new Path("/dir")); |
| MountTable mountTable1 = MountTable.newInstance("/quota", |
| Collections.singletonMap("ns0", "/dir")); |
| mountTable1.setQuota(new RouterQuotaUsage.Builder().quota(0).build()); |
| addMountTable(mountTable1); |
| routerFs.mkdirs(new Path("/quota/1")); |
| routerContext.getRouter().getQuotaCacheUpdateService().periodicInvoke(); |
| |
| // Quota check for related operation. |
| intercept(NSQuotaExceededException.class, |
| "The NameSpace quota (directories and files) is exceeded", |
| () -> routerFs.mkdirs(new Path("/quota/2"))); |
| |
| //Quotas shouldn't be checked for unrelated operations. |
| routerFs.setStoragePolicy(path, "COLD"); |
| routerFs.setErasureCodingPolicy(path, "RS-6-3-1024k"); |
| routerFs.unsetErasureCodingPolicy(path); |
| routerFs.setPermission(path, new FsPermission((short) 01777)); |
| routerFs.setOwner(path, "user", "group"); |
| routerFs.setTimes(path, 1L, 1L); |
| routerFs.listStatus(path); |
| routerFs.getContentSummary(path); |
| } |
| |
| @Test |
| public void testGetGlobalQuota() throws Exception { |
| long nsQuota = 5; |
| long ssQuota = 3 * BLOCK_SIZE; |
| prepareGlobalQuotaTestMountTable(nsQuota, ssQuota); |
| |
| Quota qModule = routerContext.getRouter().getRpcServer().getQuotaModule(); |
| QuotaUsage qu = qModule.getGlobalQuota("/dir-1"); |
| assertEquals(nsQuota, qu.getQuota()); |
| assertEquals(ssQuota, qu.getSpaceQuota()); |
| qu = qModule.getGlobalQuota("/dir-1/dir-2"); |
| assertEquals(nsQuota, qu.getQuota()); |
| assertEquals(ssQuota * 2, qu.getSpaceQuota()); |
| qu = qModule.getGlobalQuota("/dir-1/dir-2/dir-3"); |
| assertEquals(nsQuota, qu.getQuota()); |
| assertEquals(ssQuota * 2, qu.getSpaceQuota()); |
| qu = qModule.getGlobalQuota("/dir-4"); |
| assertEquals(-1, qu.getQuota()); |
| assertEquals(-1, qu.getSpaceQuota()); |
| } |
| |
| @Test |
| public void testFixGlobalQuota() throws Exception { |
| long nsQuota = 5; |
| long ssQuota = 3 * BLOCK_SIZE; |
| final FileSystem nnFs = nnContext1.getFileSystem(); |
| prepareGlobalQuotaTestMountTable(nsQuota, ssQuota); |
| |
| QuotaUsage qu = nnFs.getQuotaUsage(new Path("/dir-1")); |
| assertEquals(nsQuota, qu.getQuota()); |
| assertEquals(ssQuota, qu.getSpaceQuota()); |
| qu = nnFs.getQuotaUsage(new Path("/dir-2")); |
| assertEquals(nsQuota, qu.getQuota()); |
| assertEquals(ssQuota * 2, qu.getSpaceQuota()); |
| qu = nnFs.getQuotaUsage(new Path("/dir-3")); |
| assertEquals(nsQuota, qu.getQuota()); |
| assertEquals(ssQuota * 2, qu.getSpaceQuota()); |
| qu = nnFs.getQuotaUsage(new Path("/dir-4")); |
| assertEquals(-1, qu.getQuota()); |
| assertEquals(-1, qu.getSpaceQuota()); |
| } |
| |
| @Test |
| public void testGetQuotaUsageOnMountPoint() throws Exception { |
| long nsQuota = 5; |
| long ssQuota = 3 * BLOCK_SIZE; |
| prepareGlobalQuotaTestMountTable(nsQuota, ssQuota); |
| |
| final FileSystem routerFs = routerContext.getFileSystem(); |
| // Verify getQuotaUsage() of the mount point /dir-1. |
| QuotaUsage quotaUsage = routerFs.getQuotaUsage(new Path("/dir-1")); |
| assertEquals(nsQuota, quotaUsage.getQuota()); |
| assertEquals(ssQuota, quotaUsage.getSpaceQuota()); |
| // Verify getQuotaUsage() of the mount point /dir-1/dir-2. |
| quotaUsage = routerFs.getQuotaUsage(new Path("/dir-1/dir-2")); |
| assertEquals(nsQuota, quotaUsage.getQuota()); |
| assertEquals(ssQuota * 2, quotaUsage.getSpaceQuota()); |
| // Verify getQuotaUsage() of the mount point /dir-1/dir-2/dir-3 |
| quotaUsage = routerFs.getQuotaUsage(new Path("/dir-1/dir-2/dir-3")); |
| assertEquals(nsQuota, quotaUsage.getQuota()); |
| assertEquals(ssQuota * 2, quotaUsage.getSpaceQuota()); |
| // Verify getQuotaUsage() of the mount point /dir-4 |
| quotaUsage = routerFs.getQuotaUsage(new Path("/dir-4")); |
| assertEquals(-1, quotaUsage.getQuota()); |
| assertEquals(-1, quotaUsage.getSpaceQuota()); |
| |
| routerFs.mkdirs(new Path("/dir-1/dir-normal")); |
| try { |
| // Verify getQuotaUsage() of the normal path /dir-1/dir-normal. |
| quotaUsage = routerFs.getQuotaUsage(new Path("/dir-1/dir-normal")); |
| assertEquals(-1, quotaUsage.getQuota()); |
| assertEquals(-1, quotaUsage.getSpaceQuota()); |
| |
| routerFs.setQuota(new Path("/dir-1/dir-normal"), 100, 200); |
| // Verify getQuotaUsage() of the normal path /dir-1/dir-normal. |
| quotaUsage = routerFs.getQuotaUsage(new Path("/dir-1/dir-normal")); |
| assertEquals(100, quotaUsage.getQuota()); |
| assertEquals(200, quotaUsage.getSpaceQuota()); |
| } finally { |
| // clear normal path. |
| routerFs.delete(new Path("/dir-1/dir-normal"), true); |
| } |
| } |
| |
| /** |
| * RouterQuotaUpdateService.periodicInvoke() should only update usage in |
| * cache. The mount table in state store shouldn't be updated. |
| */ |
| @Test |
| public void testRouterQuotaUpdateService() throws Exception { |
| Router router = routerContext.getRouter(); |
| StateStoreDriver driver = router.getStateStore().getDriver(); |
| RouterQuotaUpdateService updateService = |
| router.getQuotaCacheUpdateService(); |
| RouterQuotaManager quotaManager = router.getQuotaManager(); |
| long nsQuota = 5; |
| long ssQuota = 3 * BLOCK_SIZE; |
| final FileSystem nnFs = nnContext1.getFileSystem(); |
| nnFs.mkdirs(new Path("/dir-1")); |
| |
| // init mount table. |
| MountTable mountTable = MountTable.newInstance("/dir-1", |
| Collections.singletonMap("ns0", "/dir-1")); |
| mountTable.setQuota(new RouterQuotaUsage.Builder().quota(nsQuota) |
| .spaceQuota(ssQuota).build()); |
| addMountTable(mountTable); |
| // verify the mount table in state store is updated. |
| QueryResult<MountTable> result = driver.get(MountTable.class); |
| RouterQuotaUsage quotaOnStorage = result.getRecords().get(0).getQuota(); |
| assertEquals(nsQuota, quotaOnStorage.getQuota()); |
| assertEquals(ssQuota, quotaOnStorage.getSpaceQuota()); |
| assertEquals(0, quotaOnStorage.getFileAndDirectoryCount()); |
| assertEquals(0, quotaOnStorage.getSpaceConsumed()); |
| |
| // test RouterQuotaUpdateService.periodicInvoke(). |
| updateService.periodicInvoke(); |
| |
| // RouterQuotaUpdateService should update usage in local cache. |
| RouterQuotaUsage quotaUsage = quotaManager.getQuotaUsage("/dir-1"); |
| assertEquals(nsQuota, quotaUsage.getQuota()); |
| assertEquals(ssQuota, quotaUsage.getSpaceQuota()); |
| assertEquals(1, quotaUsage.getFileAndDirectoryCount()); |
| assertEquals(0, quotaUsage.getSpaceConsumed()); |
| // RouterQuotaUpdateService shouldn't update mount entry in state store. |
| result = driver.get(MountTable.class); |
| quotaOnStorage = result.getRecords().get(0).getQuota(); |
| assertEquals(nsQuota, quotaOnStorage.getQuota()); |
| assertEquals(ssQuota, quotaOnStorage.getSpaceQuota()); |
| assertEquals(0, quotaOnStorage.getFileAndDirectoryCount()); |
| assertEquals(0, quotaOnStorage.getSpaceConsumed()); |
| } |
| |
| /** |
| * Verify whether quota is updated properly. |
| * {@link RouterQuotaUpdateService#periodicInvoke()} should be able to update |
| * the quota even if the destination directory for some mount entry is not |
| * present in the filesystem. |
| */ |
| @Test |
| public void testQuotaUpdateWhenDestinationNotPresent() throws Exception { |
| long nsQuota = 5; |
| long ssQuota = 3 * BLOCK_SIZE; |
| String path = "/dst-not-present"; |
| final FileSystem nnFs = nnContext1.getFileSystem(); |
| |
| // Add one mount table: |
| // /dst-not-present --> ns0---/dst-not-present. |
| nnFs.mkdirs(new Path(path)); |
| MountTable mountTable = |
| MountTable.newInstance(path, Collections.singletonMap("ns0", path)); |
| mountTable.setQuota(new RouterQuotaUsage.Builder().quota(nsQuota) |
| .spaceQuota(ssQuota).build()); |
| addMountTable(mountTable); |
| |
| Router router = routerContext.getRouter(); |
| RouterQuotaManager quotaManager = router.getQuotaManager(); |
| RouterQuotaUpdateService updateService = |
| router.getQuotaCacheUpdateService(); |
| // verify quota usage is updated in RouterQuotaManager. |
| updateService.periodicInvoke(); |
| RouterQuotaUsage quotaUsage = quotaManager.getQuotaUsage(path); |
| assertEquals(nsQuota, quotaUsage.getQuota()); |
| assertEquals(ssQuota, quotaUsage.getSpaceQuota()); |
| assertEquals(1, quotaUsage.getFileAndDirectoryCount()); |
| assertEquals(0, quotaUsage.getSpaceConsumed()); |
| |
| // Update quota to [nsQuota*2, ssQuota*2]. |
| mountTable.setQuota(new RouterQuotaUsage.Builder().quota(nsQuota * 2) |
| .spaceQuota(ssQuota * 2).build()); |
| updateMountTable(mountTable); |
| // Remove /dst-not-present. |
| nnFs.delete(new Path(path), true); |
| |
| // verify quota is updated in RouterQuotaManager. |
| updateService.periodicInvoke(); |
| quotaUsage = quotaManager.getQuotaUsage(path); |
| assertEquals(nsQuota * 2, quotaUsage.getQuota()); |
| assertEquals(ssQuota * 2, quotaUsage.getSpaceQuota()); |
| assertEquals(0, quotaUsage.getFileAndDirectoryCount()); |
| assertEquals(0, quotaUsage.getSpaceConsumed()); |
| } |
| |
| /** |
| * Add three mount tables. |
| * /dir-1 --> ns0---/dir-1 [nsQuota, ssQuota] |
| * /dir-1/dir-2 --> ns0---/dir-2 [QUOTA_UNSET, ssQuota * 2] |
| * /dir-1/dir-2/dir-3 --> ns0---/dir-3 [QUOTA_UNSET, QUOTA_UNSET] |
| * /dir-4 --> ns0---/dir-4 [QUOTA_UNSET, QUOTA_UNSET] |
| * |
| * Expect three remote locations' global quota. |
| * ns0---/dir-1 --> [nsQuota, ssQuota] |
| * ns0---/dir-2 --> [nsQuota, ssQuota * 2] |
| * ns0---/dir-3 --> [nsQuota, ssQuota * 2] |
| * ns0---/dir-4 --> [-1, -1] |
| */ |
| private void prepareGlobalQuotaTestMountTable(long nsQuota, long ssQuota) |
| throws IOException { |
| final FileSystem nnFs = nnContext1.getFileSystem(); |
| |
| // Create destination directory |
| nnFs.mkdirs(new Path("/dir-1")); |
| nnFs.mkdirs(new Path("/dir-2")); |
| nnFs.mkdirs(new Path("/dir-3")); |
| nnFs.mkdirs(new Path("/dir-4")); |
| |
| MountTable mountTable = MountTable.newInstance("/dir-1", |
| Collections.singletonMap("ns0", "/dir-1")); |
| mountTable.setQuota(new RouterQuotaUsage.Builder().quota(nsQuota) |
| .spaceQuota(ssQuota).build()); |
| addMountTable(mountTable); |
| mountTable = MountTable.newInstance("/dir-1/dir-2", |
| Collections.singletonMap("ns0", "/dir-2")); |
| mountTable.setQuota(new RouterQuotaUsage.Builder().spaceQuota(ssQuota * 2) |
| .build()); |
| addMountTable(mountTable); |
| mountTable = MountTable.newInstance("/dir-1/dir-2/dir-3", |
| Collections.singletonMap("ns0", "/dir-3")); |
| addMountTable(mountTable); |
| mountTable = MountTable.newInstance("/dir-4", |
| Collections.singletonMap("ns0", "/dir-4")); |
| addMountTable(mountTable); |
| |
| // Ensure setQuota RPC was invoked and mount table was updated. |
| RouterQuotaUpdateService updateService = routerContext.getRouter() |
| .getQuotaCacheUpdateService(); |
| updateService.periodicInvoke(); |
| } |
| |
| /** |
| * Add two mount tables. |
| * /type0 --> ns0---/type0 [-1, -1, {STORAGE_TYPE:quota}] |
| * /type0/type1 --> ns0---/type1 [-1, -1, {STORAGE_TYPE:quota}] |
| * |
| * Add two files with storage policy HOT. |
| * /type0/file --> ns0---/type0/file |
| * /type0/type1/file --> ns0---/type1/file |
| */ |
| private void prepareStorageTypeQuotaTestMountTable(StorageType type, |
| long blkSize, long quota0, long quota1, int len0, int len1) |
| throws Exception { |
| final FileSystem nnFs1 = nnContext1.getFileSystem(); |
| |
| nnFs1.mkdirs(new Path("/type0")); |
| nnFs1.mkdirs(new Path("/type1")); |
| ((DistributedFileSystem) nnContext1.getFileSystem()) |
| .createFile(new Path("/type0/file")).storagePolicyName("HOT") |
| .blockSize(blkSize).build().close(); |
| ((DistributedFileSystem) nnContext1.getFileSystem()) |
| .createFile(new Path("/type1/file")).storagePolicyName("HOT") |
| .blockSize(blkSize).build().close(); |
| DFSClient client = nnContext1.getClient(); |
| appendData("/type0/file", client, len0); |
| appendData("/type1/file", client, len1); |
| |
| MountTable mountTable = MountTable |
| .newInstance("/type0", Collections.singletonMap("ns0", "/type0")); |
| mountTable.setQuota( |
| new RouterQuotaUsage.Builder().typeQuota(type, quota0).build()); |
| addMountTable(mountTable); |
| mountTable = MountTable |
| .newInstance("/type0/type1", Collections.singletonMap("ns0", "/type1")); |
| mountTable.setQuota( |
| new RouterQuotaUsage.Builder().typeQuota(type, quota1).build()); |
| addMountTable(mountTable); |
| |
| // ensure mount table is updated to Router. |
| RouterQuotaUpdateService updateService = routerContext.getRouter() |
| .getQuotaCacheUpdateService(); |
| updateService.periodicInvoke(); |
| } |
| |
| private void verifyTypeQuotaAndConsume(long[] quota, long[] consume, |
| QuotaUsage usage) { |
| for (StorageType t : StorageType.values()) { |
| if (quota != null) { |
| assertEquals(quota[t.ordinal()], usage.getTypeQuota(t)); |
| } |
| if (consume != null) { |
| assertEquals(consume[t.ordinal()], usage.getTypeConsumed(t)); |
| } |
| } |
| } |
| } |