blob: 4813b535e37291f8f752f6930e1a230b4ea7d273 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.router;
import java.io.IOException;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.QuotaUsage;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.server.federation.store.MountTableStore;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryRequest;
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Service to periodically update the {@link RouterQuotaUsage}
* cached information in the {@link Router} and update corresponding
* mount table in State Store.
*/
public class RouterQuotaUpdateService extends PeriodicService {
private static final Logger LOG =
LoggerFactory.getLogger(RouterQuotaUpdateService.class);
private MountTableStore mountTableStore;
private RouterRpcServer rpcServer;
/** Router using this Service. */
private final Router router;
/** Router Quota manager. */
private RouterQuotaManager quotaManager;
public RouterQuotaUpdateService(final Router router) throws IOException {
super(RouterQuotaUpdateService.class.getName());
this.router = router;
this.rpcServer = router.getRpcServer();
this.quotaManager = router.getQuotaManager();
if (this.quotaManager == null) {
throw new IOException("Router quota manager is not initialized.");
}
}
@Override
protected void serviceInit(Configuration conf) throws Exception {
this.setIntervalMs(conf.getTimeDuration(
RBFConfigKeys.DFS_ROUTER_QUOTA_CACHE_UPATE_INTERVAL,
RBFConfigKeys.DFS_ROUTER_QUOTA_CACHE_UPATE_INTERVAL_DEFAULT,
TimeUnit.MILLISECONDS));
super.serviceInit(conf);
}
@Override
protected void periodicInvoke() {
LOG.debug("Start to update quota cache.");
try {
List<MountTable> updateMountTables = new LinkedList<>();
List<MountTable> mountTables = getQuotaSetMountTables();
for (MountTable entry : mountTables) {
String src = entry.getSourcePath();
RouterQuotaUsage oldQuota = entry.getQuota();
long nsQuota = oldQuota.getQuota();
long ssQuota = oldQuota.getSpaceQuota();
QuotaUsage currentQuotaUsage = null;
// Check whether destination path exists in filesystem. If destination
// is not present, reset the usage. For other mount entry get current
// quota usage
HdfsFileStatus ret = this.rpcServer.getFileInfo(src);
if (ret == null) {
currentQuotaUsage = new RouterQuotaUsage.Builder()
.fileAndDirectoryCount(0)
.quota(nsQuota)
.spaceConsumed(0)
.spaceQuota(ssQuota).build();
} else {
// Call RouterRpcServer#getQuotaUsage for getting current quota usage.
// If any exception occurs catch it and proceed with other entries.
try {
currentQuotaUsage = this.rpcServer.getQuotaModule()
.getQuotaUsage(src);
} catch (IOException ioe) {
LOG.error("Unable to get quota usage for " + src, ioe);
continue;
}
}
// If quota is not set in some subclusters under federation path,
// set quota for this path.
if (currentQuotaUsage.getQuota() == HdfsConstants.QUOTA_RESET) {
try {
this.rpcServer.setQuota(src, nsQuota, ssQuota, null);
} catch (IOException ioe) {
LOG.error("Unable to set quota at remote location for "
+ src, ioe);
}
}
RouterQuotaUsage newQuota = generateNewQuota(oldQuota,
currentQuotaUsage);
this.quotaManager.put(src, newQuota);
entry.setQuota(newQuota);
// only update mount tables which quota was changed
if (!oldQuota.equals(newQuota)) {
updateMountTables.add(entry);
LOG.debug(
"Update quota usage entity of path: {}, nsCount: {},"
+ " nsQuota: {}, ssCount: {}, ssQuota: {}.",
src, newQuota.getFileAndDirectoryCount(),
newQuota.getQuota(), newQuota.getSpaceConsumed(),
newQuota.getSpaceQuota());
}
}
updateMountTableEntries(updateMountTables);
} catch (IOException e) {
LOG.error("Quota cache updated error.", e);
}
}
/**
* Get mount table store management interface.
* @return MountTableStore instance.
* @throws IOException
*/
private MountTableStore getMountTableStore() throws IOException {
if (this.mountTableStore == null) {
this.mountTableStore = router.getStateStore().getRegisteredRecordStore(
MountTableStore.class);
if (this.mountTableStore == null) {
throw new IOException("Mount table state store is not available.");
}
}
return this.mountTableStore;
}
/**
* Get all the existing mount tables.
* @return List of mount tables.
* @throws IOException
*/
private List<MountTable> getMountTableEntries() throws IOException {
// scan mount tables from root path
GetMountTableEntriesRequest getRequest = GetMountTableEntriesRequest
.newInstance("/");
GetMountTableEntriesResponse getResponse = getMountTableStore()
.getMountTableEntries(getRequest);
return getResponse.getEntries();
}
/**
* Get mount tables which quota was set.
* During this time, the quota usage cache will also be updated by
* quota manager:
* 1. Stale paths (entries) will be removed.
* 2. Existing entries will be override and updated.
* @return List of mount tables which quota was set.
* @throws IOException
*/
private List<MountTable> getQuotaSetMountTables() throws IOException {
List<MountTable> mountTables = getMountTableEntries();
Set<String> stalePaths = new HashSet<>();
for (String path : this.quotaManager.getAll()) {
stalePaths.add(path);
}
List<MountTable> neededMountTables = new LinkedList<>();
for (MountTable entry : mountTables) {
// select mount tables which is quota set
if (isQuotaSet(entry)) {
neededMountTables.add(entry);
}
// update mount table entries info in quota cache
String src = entry.getSourcePath();
this.quotaManager.put(src, entry.getQuota());
stalePaths.remove(src);
}
// remove stale paths that currently cached
for (String stalePath : stalePaths) {
this.quotaManager.remove(stalePath);
}
return neededMountTables;
}
/**
* Check if the quota was set in given MountTable.
* @param mountTable Mount table entry.
*/
private boolean isQuotaSet(MountTable mountTable) {
if (mountTable != null) {
return this.quotaManager.isQuotaSet(mountTable.getQuota());
}
return false;
}
/**
* Generate a new quota based on old quota and current quota usage value.
* @param oldQuota Old quota stored in State Store.
* @param currentQuotaUsage Current quota usage value queried from
* subcluster.
* @return A new RouterQuotaUsage.
*/
private RouterQuotaUsage generateNewQuota(RouterQuotaUsage oldQuota,
QuotaUsage currentQuotaUsage) {
RouterQuotaUsage newQuota = new RouterQuotaUsage.Builder()
.fileAndDirectoryCount(currentQuotaUsage.getFileAndDirectoryCount())
.quota(oldQuota.getQuota())
.spaceConsumed(currentQuotaUsage.getSpaceConsumed())
.spaceQuota(oldQuota.getSpaceQuota()).build();
return newQuota;
}
/**
* Write out updated mount table entries into State Store.
* @param updateMountTables Mount tables to be updated.
* @throws IOException
*/
private void updateMountTableEntries(List<MountTable> updateMountTables)
throws IOException {
for (MountTable entry : updateMountTables) {
UpdateMountTableEntryRequest updateRequest = UpdateMountTableEntryRequest
.newInstance(entry);
try {
getMountTableStore().updateMountTableEntry(updateRequest);
} catch (IOException e) {
LOG.error("Quota update error for mount entry "
+ entry.getSourcePath(), e);
}
}
}
}