blob: 77d21e3d4cec03eee6d313e65e7bc575bb7deae4 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.router;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.UUID;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.QuotaUsage;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.NamenodeContext;
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext;
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster;
import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager;
import org.apache.hadoop.hdfs.server.federation.resolver.MountTableResolver;
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryRequest;
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Time;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import com.google.common.base.Supplier;
/**
* Tests quota behaviors in Router-based Federation.
*/
public class TestRouterQuota {
private static StateStoreDFSCluster cluster;
private static NamenodeContext nnContext1;
private static NamenodeContext nnContext2;
private static RouterContext routerContext;
private static MountTableResolver resolver;
private static final int BLOCK_SIZE = 512;
@Before
public void setUp() throws Exception {
// Build and start a federated cluster
cluster = new StateStoreDFSCluster(false, 2);
Configuration routerConf = new RouterConfigBuilder()
.stateStore()
.admin()
.quota()
.rpc()
.build();
routerConf.set(RBFConfigKeys.DFS_ROUTER_QUOTA_CACHE_UPATE_INTERVAL, "2s");
// override some hdfs settings that used in testing space quota
Configuration hdfsConf = new Configuration(false);
hdfsConf.setInt(HdfsClientConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
hdfsConf.setInt(HdfsClientConfigKeys.DFS_REPLICATION_KEY, 1);
cluster.addRouterOverrides(routerConf);
cluster.addNamenodeOverrides(hdfsConf);
cluster.startCluster();
cluster.startRouters();
cluster.waitClusterUp();
nnContext1 = cluster.getNamenode(cluster.getNameservices().get(0), null);
nnContext2 = cluster.getNamenode(cluster.getNameservices().get(1), null);
routerContext = cluster.getRandomRouter();
Router router = routerContext.getRouter();
resolver = (MountTableResolver) router.getSubclusterResolver();
}
@After
public void tearDown() {
if (cluster != null) {
cluster.stopRouter(routerContext);
cluster.shutdown();
cluster = null;
}
}
@Test
public void testNamespaceQuotaExceed() throws Exception {
long nsQuota = 3;
final FileSystem nnFs1 = nnContext1.getFileSystem();
final FileSystem nnFs2 = nnContext2.getFileSystem();
// Add two mount tables:
// /nsquota --> ns0---testdir1
// /nsquota/subdir --> ns1---testdir2
nnFs1.mkdirs(new Path("/testdir1"));
nnFs2.mkdirs(new Path("/testdir2"));
MountTable mountTable1 = MountTable.newInstance("/nsquota",
Collections.singletonMap("ns0", "/testdir1"));
mountTable1.setQuota(new RouterQuotaUsage.Builder().quota(nsQuota).build());
addMountTable(mountTable1);
MountTable mountTable2 = MountTable.newInstance("/nsquota/subdir",
Collections.singletonMap("ns1", "/testdir2"));
mountTable2.setQuota(new RouterQuotaUsage.Builder().quota(nsQuota).build());
addMountTable(mountTable2);
final FileSystem routerFs = routerContext.getFileSystem();
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
boolean isNsQuotaViolated = false;
try {
// create new directory to trigger NSQuotaExceededException
routerFs.mkdirs(new Path("/nsquota/" + UUID.randomUUID()));
routerFs.mkdirs(new Path("/nsquota/subdir/" + UUID.randomUUID()));
} catch (NSQuotaExceededException e) {
isNsQuotaViolated = true;
} catch (IOException ignored) {
}
return isNsQuotaViolated;
}
}, 5000, 60000);
// mkdir in real FileSystem should be okay
nnFs1.mkdirs(new Path("/testdir1/" + UUID.randomUUID()));
nnFs2.mkdirs(new Path("/testdir2/" + UUID.randomUUID()));
// delete/rename call should be still okay
routerFs.delete(new Path("/nsquota"), true);
routerFs.rename(new Path("/nsquota/subdir"), new Path("/nsquota/subdir"));
}
@Test
public void testStorageSpaceQuotaaExceed() throws Exception {
long ssQuota = 3071;
final FileSystem nnFs1 = nnContext1.getFileSystem();
final FileSystem nnFs2 = nnContext2.getFileSystem();
// Add two mount tables:
// /ssquota --> ns0---testdir3
// /ssquota/subdir --> ns1---testdir4
nnFs1.mkdirs(new Path("/testdir3"));
nnFs2.mkdirs(new Path("/testdir4"));
MountTable mountTable1 = MountTable.newInstance("/ssquota",
Collections.singletonMap("ns0", "/testdir3"));
mountTable1
.setQuota(new RouterQuotaUsage.Builder().spaceQuota(ssQuota).build());
addMountTable(mountTable1);
MountTable mountTable2 = MountTable.newInstance("/ssquota/subdir",
Collections.singletonMap("ns1", "/testdir4"));
mountTable2
.setQuota(new RouterQuotaUsage.Builder().spaceQuota(ssQuota).build());
addMountTable(mountTable2);
final DFSClient routerClient = routerContext.getClient();
routerClient.create("/ssquota/file", true).close();
routerClient.create("/ssquota/subdir/file", true).close();
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
boolean isDsQuotaViolated = false;
try {
// append data to trigger NSQuotaExceededException
appendData("/ssquota/file", routerClient, BLOCK_SIZE);
appendData("/ssquota/subdir/file", routerClient, BLOCK_SIZE);
} catch (DSQuotaExceededException e) {
isDsQuotaViolated = true;
} catch (IOException ignored) {
}
return isDsQuotaViolated;
}
}, 5000, 60000);
// append data to destination path in real FileSystem should be okay
appendData("/testdir3/file", nnContext1.getClient(), BLOCK_SIZE);
appendData("/testdir4/file", nnContext2.getClient(), BLOCK_SIZE);
}
/**
* Add a mount table entry to the mount table through the admin API.
* @param entry Mount table entry to add.
* @return If it was successfully added.
* @throws IOException Problems adding entries.
*/
private boolean addMountTable(final MountTable entry) throws IOException {
RouterClient client = routerContext.getAdminClient();
MountTableManager mountTableManager = client.getMountTableManager();
AddMountTableEntryRequest addRequest =
AddMountTableEntryRequest.newInstance(entry);
AddMountTableEntryResponse addResponse =
mountTableManager.addMountTableEntry(addRequest);
// Reload the Router cache
resolver.loadCache(true);
return addResponse.getStatus();
}
/**
* Append data in specified file.
* @param path Path of file.
* @param client DFS Client.
* @param dataLen The length of write data.
* @throws IOException
*/
private void appendData(String path, DFSClient client, int dataLen)
throws IOException {
EnumSet<CreateFlag> createFlag = EnumSet.of(CreateFlag.APPEND);
HdfsDataOutputStream stream = client.append(path, 1024, createFlag, null,
null);
byte[] data = new byte[dataLen];
stream.write(data);
stream.close();
}
@Test
public void testSetQuota() throws Exception {
long nsQuota = 5;
long ssQuota = 100;
final FileSystem nnFs1 = nnContext1.getFileSystem();
final FileSystem nnFs2 = nnContext2.getFileSystem();
// Add two mount tables:
// /setquota --> ns0---testdir5
// /setquota/subdir --> ns1---testdir6
nnFs1.mkdirs(new Path("/testdir5"));
nnFs2.mkdirs(new Path("/testdir6"));
MountTable mountTable1 = MountTable.newInstance("/setquota",
Collections.singletonMap("ns0", "/testdir5"));
mountTable1
.setQuota(new RouterQuotaUsage.Builder().quota(nsQuota)
.spaceQuota(ssQuota).build());
addMountTable(mountTable1);
// don't set quota for subpath of mount table
MountTable mountTable2 = MountTable.newInstance("/setquota/subdir",
Collections.singletonMap("ns1", "/testdir6"));
addMountTable(mountTable2);
RouterQuotaUpdateService updateService = routerContext.getRouter()
.getQuotaCacheUpdateService();
// ensure setQuota RPC call was invoked
updateService.periodicInvoke();
ClientProtocol client1 = nnContext1.getClient().getNamenode();
ClientProtocol client2 = nnContext2.getClient().getNamenode();
final QuotaUsage quota1 = client1.getQuotaUsage("/testdir5");
final QuotaUsage quota2 = client2.getQuotaUsage("/testdir6");
assertEquals(nsQuota, quota1.getQuota());
assertEquals(ssQuota, quota1.getSpaceQuota());
assertEquals(nsQuota, quota2.getQuota());
assertEquals(ssQuota, quota2.getSpaceQuota());
}
@Test
public void testGetQuota() throws Exception {
long nsQuota = 10;
long ssQuota = 100;
final FileSystem nnFs1 = nnContext1.getFileSystem();
final FileSystem nnFs2 = nnContext2.getFileSystem();
// Add two mount tables:
// /getquota --> ns0---/testdir7
// /getquota/subdir1 --> ns0---/testdir7/subdir
// /getquota/subdir2 --> ns1---/testdir8
nnFs1.mkdirs(new Path("/testdir7"));
nnFs1.mkdirs(new Path("/testdir7/subdir"));
nnFs2.mkdirs(new Path("/testdir8"));
MountTable mountTable1 = MountTable.newInstance("/getquota",
Collections.singletonMap("ns0", "/testdir7"));
mountTable1
.setQuota(new RouterQuotaUsage.Builder().quota(nsQuota)
.spaceQuota(ssQuota).build());
addMountTable(mountTable1);
MountTable mountTable2 = MountTable.newInstance("/getquota/subdir1",
Collections.singletonMap("ns0", "/testdir7/subdir"));
addMountTable(mountTable2);
MountTable mountTable3 = MountTable.newInstance("/getquota/subdir2",
Collections.singletonMap("ns1", "/testdir8"));
addMountTable(mountTable3);
// use router client to create new files
DFSClient routerClient = routerContext.getClient();
routerClient.create("/getquota/file", true).close();
routerClient.create("/getquota/subdir1/file", true).close();
routerClient.create("/getquota/subdir2/file", true).close();
ClientProtocol clientProtocol = routerContext.getClient().getNamenode();
RouterQuotaUpdateService updateService = routerContext.getRouter()
.getQuotaCacheUpdateService();
updateService.periodicInvoke();
final QuotaUsage quota = clientProtocol.getQuotaUsage("/getquota");
// the quota should be aggregated
assertEquals(6, quota.getFileAndDirectoryCount());
}
@Test
public void testStaleQuotaRemoving() throws Exception {
long nsQuota = 20;
long ssQuota = 200;
String stalePath = "/stalequota";
final FileSystem nnFs1 = nnContext1.getFileSystem();
// Add one mount tables:
// /stalequota --> ns0---/testdir9
nnFs1.mkdirs(new Path("/testdir9"));
MountTable mountTable = MountTable.newInstance(stalePath,
Collections.singletonMap("ns0", "/testdir9"));
mountTable.setQuota(new RouterQuotaUsage.Builder().quota(nsQuota)
.spaceQuota(ssQuota).build());
addMountTable(mountTable);
// Call periodicInvoke to ensure quota for stalePath was
// loaded into quota manager.
RouterQuotaUpdateService updateService = routerContext.getRouter()
.getQuotaCacheUpdateService();
updateService.periodicInvoke();
// use quota manager to get its quota usage and do verification
RouterQuotaManager quotaManager = routerContext.getRouter()
.getQuotaManager();
RouterQuotaUsage quota = quotaManager.getQuotaUsage(stalePath);
assertEquals(nsQuota, quota.getQuota());
assertEquals(ssQuota, quota.getSpaceQuota());
// remove stale path entry
removeMountTable(stalePath);
updateService.periodicInvoke();
// the stale entry should be removed and we will get null
quota = quotaManager.getQuotaUsage(stalePath);
assertNull(quota);
}
/**
* Remove a mount table entry to the mount table through the admin API.
* @param entry Mount table entry to remove.
* @return If it was successfully removed.
* @throws IOException Problems removing entries.
*/
private boolean removeMountTable(String path) throws IOException {
RouterClient client = routerContext.getAdminClient();
MountTableManager mountTableManager = client.getMountTableManager();
RemoveMountTableEntryRequest removeRequest = RemoveMountTableEntryRequest
.newInstance(path);
RemoveMountTableEntryResponse removeResponse = mountTableManager
.removeMountTableEntry(removeRequest);
// Reload the Router cache
resolver.loadCache(true);
return removeResponse.getStatus();
}
@Test
public void testQuotaUpdating() throws Exception {
long nsQuota = 30;
long ssQuota = 1024;
String path = "/updatequota";
final FileSystem nnFs1 = nnContext1.getFileSystem();
// Add one mount table:
// /updatequota --> ns0---/testdir10
nnFs1.mkdirs(new Path("/testdir10"));
MountTable mountTable = MountTable.newInstance(path,
Collections.singletonMap("ns0", "/testdir10"));
mountTable.setQuota(new RouterQuotaUsage.Builder().quota(nsQuota)
.spaceQuota(ssQuota).build());
addMountTable(mountTable);
// Call periodicInvoke to ensure quota updated in quota manager
// and state store.
RouterQuotaUpdateService updateService = routerContext.getRouter()
.getQuotaCacheUpdateService();
updateService.periodicInvoke();
// verify initial quota value
MountTable updatedMountTable = getMountTable(path);
RouterQuotaUsage quota = updatedMountTable.getQuota();
assertEquals(nsQuota, quota.getQuota());
assertEquals(ssQuota, quota.getSpaceQuota());
assertEquals(1, quota.getFileAndDirectoryCount());
assertEquals(0, quota.getSpaceConsumed());
// mkdir and write a new file
final FileSystem routerFs = routerContext.getFileSystem();
routerFs.mkdirs(new Path(path + "/" + UUID.randomUUID()));
DFSClient routerClient = routerContext.getClient();
routerClient.create(path + "/file", true).close();
appendData(path + "/file", routerClient, BLOCK_SIZE);
updateService.periodicInvoke();
updatedMountTable = getMountTable(path);
quota = updatedMountTable.getQuota();
// verify if quota has been updated in state store
assertEquals(nsQuota, quota.getQuota());
assertEquals(ssQuota, quota.getSpaceQuota());
assertEquals(3, quota.getFileAndDirectoryCount());
assertEquals(BLOCK_SIZE, quota.getSpaceConsumed());
}
/**
* Get the mount table entries of specified path through the admin API.
* @param path Mount table entry to get.
* @return If it was successfully got.
* @throws IOException Problems getting entries.
*/
private MountTable getMountTable(String path) throws IOException {
// Reload the Router cache
resolver.loadCache(true);
RouterClient client = routerContext.getAdminClient();
MountTableManager mountTableManager = client.getMountTableManager();
GetMountTableEntriesRequest getRequest = GetMountTableEntriesRequest
.newInstance(path);
GetMountTableEntriesResponse response = mountTableManager
.getMountTableEntries(getRequest);
List<MountTable> results = response.getEntries();
return !results.isEmpty() ? results.get(0) : null;
}
@Test
public void testQuotaSynchronization() throws IOException {
long updateNsQuota = 3;
long updateSsQuota = 4;
FileSystem nnFs = nnContext1.getFileSystem();
nnFs.mkdirs(new Path("/testsync"));
MountTable mountTable = MountTable.newInstance("/quotaSync",
Collections.singletonMap("ns0", "/testsync"), Time.now(), Time.now());
mountTable.setQuota(new RouterQuotaUsage.Builder().quota(1)
.spaceQuota(2).build());
// Add new mount table
addMountTable(mountTable);
// ensure the quota is not set as updated value
QuotaUsage realQuota = nnContext1.getFileSystem()
.getQuotaUsage(new Path("/testsync"));
assertNotEquals(updateNsQuota, realQuota.getQuota());
assertNotEquals(updateSsQuota, realQuota.getSpaceQuota());
// Call periodicInvoke to ensure quota updated in quota manager
// and state store.
RouterQuotaUpdateService updateService = routerContext.getRouter()
.getQuotaCacheUpdateService();
updateService.periodicInvoke();
mountTable.setQuota(new RouterQuotaUsage.Builder().quota(updateNsQuota)
.spaceQuota(updateSsQuota).build());
UpdateMountTableEntryRequest updateRequest = UpdateMountTableEntryRequest
.newInstance(mountTable);
RouterClient client = routerContext.getAdminClient();
MountTableManager mountTableManager = client.getMountTableManager();
mountTableManager.updateMountTableEntry(updateRequest);
// verify if the quota is updated in real path
realQuota = nnContext1.getFileSystem().getQuotaUsage(
new Path("/testsync"));
assertEquals(updateNsQuota, realQuota.getQuota());
assertEquals(updateSsQuota, realQuota.getSpaceQuota());
// Clear the quota
mountTable.setQuota(new RouterQuotaUsage.Builder()
.quota(HdfsConstants.QUOTA_RESET)
.spaceQuota(HdfsConstants.QUOTA_RESET).build());
updateRequest = UpdateMountTableEntryRequest
.newInstance(mountTable);
client = routerContext.getAdminClient();
mountTableManager = client.getMountTableManager();
mountTableManager.updateMountTableEntry(updateRequest);
// verify if the quota is updated in real path
realQuota = nnContext1.getFileSystem().getQuotaUsage(
new Path("/testsync"));
assertEquals(HdfsConstants.QUOTA_RESET, realQuota.getQuota());
assertEquals(HdfsConstants.QUOTA_RESET, realQuota.getSpaceQuota());
}
@Test
public void testQuotaRefreshAfterQuotaExceed() throws Exception {
long nsQuota = 3;
long ssQuota = 100;
final FileSystem nnFs1 = nnContext1.getFileSystem();
final FileSystem nnFs2 = nnContext2.getFileSystem();
// Add two mount tables:
// /setquota1 --> ns0---testdir11
// /setquota2 --> ns1---testdir12
nnFs1.mkdirs(new Path("/testdir11"));
nnFs2.mkdirs(new Path("/testdir12"));
MountTable mountTable1 = MountTable.newInstance("/setquota1",
Collections.singletonMap("ns0", "/testdir11"));
mountTable1
.setQuota(new RouterQuotaUsage.Builder().quota(nsQuota)
.spaceQuota(ssQuota).build());
addMountTable(mountTable1);
MountTable mountTable2 = MountTable.newInstance("/setquota2",
Collections.singletonMap("ns1", "/testdir12"));
mountTable2
.setQuota(new RouterQuotaUsage.Builder().quota(nsQuota)
.spaceQuota(ssQuota).build());
addMountTable(mountTable2);
final FileSystem routerFs = routerContext.getFileSystem();
// Create directory to make directory count equals to nsQuota
routerFs.mkdirs(new Path("/setquota1/" + UUID.randomUUID()));
routerFs.mkdirs(new Path("/setquota1/" + UUID.randomUUID()));
// create one more directory to exceed the nsQuota
routerFs.mkdirs(new Path("/setquota1/" + UUID.randomUUID()));
RouterQuotaUpdateService updateService = routerContext.getRouter()
.getQuotaCacheUpdateService();
// Call RouterQuotaUpdateService#periodicInvoke to update quota cache
updateService.periodicInvoke();
// Reload the Router cache
resolver.loadCache(true);
RouterQuotaManager quotaManager =
routerContext.getRouter().getQuotaManager();
ClientProtocol client1 = nnContext1.getClient().getNamenode();
ClientProtocol client2 = nnContext2.getClient().getNamenode();
QuotaUsage quota1 = client1.getQuotaUsage("/testdir11");
QuotaUsage quota2 = client2.getQuotaUsage("/testdir12");
QuotaUsage cacheQuota1 = quotaManager.getQuotaUsage("/setquota1");
QuotaUsage cacheQuota2 = quotaManager.getQuotaUsage("/setquota2");
// Verify quota usage
assertEquals(4, quota1.getFileAndDirectoryCount());
assertEquals(4, cacheQuota1.getFileAndDirectoryCount());
assertEquals(1, quota2.getFileAndDirectoryCount());
assertEquals(1, cacheQuota2.getFileAndDirectoryCount());
try {
// create new directory to trigger NSQuotaExceededException
routerFs.mkdirs(new Path("/testdir11/" + UUID.randomUUID()));
fail("Mkdir should be failed under dir /testdir11.");
} catch (NSQuotaExceededException ignored) {
}
// Create directory under the other mount point
routerFs.mkdirs(new Path("/setquota2/" + UUID.randomUUID()));
routerFs.mkdirs(new Path("/setquota2/" + UUID.randomUUID()));
// Call RouterQuotaUpdateService#periodicInvoke to update quota cache
updateService.periodicInvoke();
quota1 = client1.getQuotaUsage("/testdir11");
cacheQuota1 = quotaManager.getQuotaUsage("/setquota1");
quota2 = client2.getQuotaUsage("/testdir12");
cacheQuota2 = quotaManager.getQuotaUsage("/setquota2");
// Verify whether quota usage cache is update by periodicInvoke().
assertEquals(4, quota1.getFileAndDirectoryCount());
assertEquals(4, cacheQuota1.getFileAndDirectoryCount());
assertEquals(3, quota2.getFileAndDirectoryCount());
assertEquals(3, cacheQuota2.getFileAndDirectoryCount());
}
/**
* Verify whether mount table and quota usage cache is updated properly.
* {@link RouterQuotaUpdateService#periodicInvoke()} should be able to update
* the cache and the mount table even if the destination directory for some
* mount entry is not present in the filesystem.
*/
@Test
public void testQuotaRefreshWhenDestinationNotPresent() throws Exception {
long nsQuota = 5;
long ssQuota = 3*BLOCK_SIZE;
final FileSystem nnFs = nnContext1.getFileSystem();
// Add three mount tables:
// /setdir1 --> ns0---testdir13
// /setdir2 --> ns0---testdir14
// Create destination directory
nnFs.mkdirs(new Path("/testdir13"));
nnFs.mkdirs(new Path("/testdir14"));
MountTable mountTable = MountTable.newInstance("/setdir1",
Collections.singletonMap("ns0", "/testdir13"));
mountTable
.setQuota(new RouterQuotaUsage.Builder().quota(nsQuota)
.spaceQuota(ssQuota).build());
addMountTable(mountTable);
mountTable = MountTable.newInstance("/setdir2",
Collections.singletonMap("ns0", "/testdir14"));
mountTable
.setQuota(new RouterQuotaUsage.Builder().quota(nsQuota)
.spaceQuota(ssQuota).build());
addMountTable(mountTable);
final DFSClient routerClient = routerContext.getClient();
// Create file
routerClient.create("/setdir1/file1", true).close();
routerClient.create("/setdir2/file2", true).close();
// append data to the file
appendData("/setdir1/file1", routerClient, BLOCK_SIZE);
appendData("/setdir2/file2", routerClient, BLOCK_SIZE);
RouterQuotaUpdateService updateService =
routerContext.getRouter().getQuotaCacheUpdateService();
// Update quota cache
updateService.periodicInvoke();
// Reload the Router cache
resolver.loadCache(true);
ClientProtocol client1 = nnContext1.getClient().getNamenode();
RouterQuotaManager quotaManager =
routerContext.getRouter().getQuotaManager();
QuotaUsage quota1 = client1.getQuotaUsage("/testdir13");
QuotaUsage quota2 = client1.getQuotaUsage("/testdir14");
QuotaUsage cacheQuota1 = quotaManager.getQuotaUsage("/setdir1");
QuotaUsage cacheQuota2 = quotaManager.getQuotaUsage("/setdir2");
// Get quota details in mount table
MountTable updatedMountTable = getMountTable("/setdir1");
RouterQuotaUsage mountQuota1 = updatedMountTable.getQuota();
updatedMountTable = getMountTable("/setdir2");
RouterQuotaUsage mountQuota2 = updatedMountTable.getQuota();
// Verify quota usage
assertEquals(2, quota1.getFileAndDirectoryCount());
assertEquals(2, cacheQuota1.getFileAndDirectoryCount());
assertEquals(2, mountQuota1.getFileAndDirectoryCount());
assertEquals(2, quota2.getFileAndDirectoryCount());
assertEquals(2, cacheQuota2.getFileAndDirectoryCount());
assertEquals(2, mountQuota2.getFileAndDirectoryCount());
assertEquals(BLOCK_SIZE, quota1.getSpaceConsumed());
assertEquals(BLOCK_SIZE, cacheQuota1.getSpaceConsumed());
assertEquals(BLOCK_SIZE, mountQuota1.getSpaceConsumed());
assertEquals(BLOCK_SIZE, quota2.getSpaceConsumed());
assertEquals(BLOCK_SIZE, cacheQuota2.getSpaceConsumed());
assertEquals(BLOCK_SIZE, mountQuota2.getSpaceConsumed());
FileSystem routerFs = routerContext.getFileSystem();
// Remove destination directory for the mount entry
routerFs.delete(new Path("/setdir1"), true);
// Create file
routerClient.create("/setdir2/file3", true).close();
// append data to the file
appendData("/setdir2/file3", routerClient, BLOCK_SIZE);
int updatedSpace = BLOCK_SIZE + BLOCK_SIZE;
// Update quota cache
updateService.periodicInvoke();
quota2 = client1.getQuotaUsage("/testdir14");
cacheQuota1 = quotaManager.getQuotaUsage("/setdir1");
cacheQuota2 = quotaManager.getQuotaUsage("/setdir2");
// Get quota details in mount table
updatedMountTable = getMountTable("/setdir1");
mountQuota1 = updatedMountTable.getQuota();
updatedMountTable = getMountTable("/setdir2");
mountQuota2 = updatedMountTable.getQuota();
// If destination is not present the quota usage should be reset to 0
assertEquals(0, cacheQuota1.getFileAndDirectoryCount());
assertEquals(0, mountQuota1.getFileAndDirectoryCount());
assertEquals(0, cacheQuota1.getSpaceConsumed());
assertEquals(0, mountQuota1.getSpaceConsumed());
// Verify current quota usage for other mount entries
assertEquals(3, quota2.getFileAndDirectoryCount());
assertEquals(3, cacheQuota2.getFileAndDirectoryCount());
assertEquals(3, mountQuota2.getFileAndDirectoryCount());
assertEquals(updatedSpace, quota2.getSpaceConsumed());
assertEquals(updatedSpace, cacheQuota2.getSpaceConsumed());
assertEquals(updatedSpace, mountQuota2.getSpaceConsumed());
}
}