blob: 91af1ca06ac7e793280dc16490a834a7ffd41ee6 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.resolver.order;
import static org.apache.hadoop.util.Time.monotonicNow;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.federation.resolver.PathLocation;
import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
import org.apache.hadoop.hdfs.server.federation.router.Router;
import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer;
import org.apache.hadoop.hdfs.server.federation.store.MembershipStore;
import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* The order resolver that depends upon the Router service.
*
* @param <K> The key type of subcluster mapping info queried from Router.
* @param <V> The value type of subcluster mapping info queried from Router.
*/
public abstract class RouterResolver<K, V> implements OrderedResolver {
private static final Logger LOG =
LoggerFactory.getLogger(RouterResolver.class);
/** Configuration key to set the minimum time to update subcluster info. */
public static final String MIN_UPDATE_PERIOD_KEY =
RBFConfigKeys.FEDERATION_ROUTER_PREFIX + "router-resolver.update-period";
/** 10 seconds by default. */
private static final long MIN_UPDATE_PERIOD_DEFAULT = TimeUnit.SECONDS
.toMillis(10);
/** Router service. */
private final Router router;
/** Minimum update time. */
private final long minUpdateTime;
/** K -> T template mapping. */
private Map<K, V> subclusterMapping = null;
/** Last time the subcluster mapping was updated. */
private long lastUpdated;
public RouterResolver(final Configuration conf, final Router routerService) {
this.minUpdateTime = conf.getTimeDuration(MIN_UPDATE_PERIOD_KEY,
MIN_UPDATE_PERIOD_DEFAULT, TimeUnit.MILLISECONDS);
this.router = routerService;
}
@Override
public String getFirstNamespace(String path, PathLocation loc) {
updateSubclusterMapping();
return chooseFirstNamespace(path, loc);
}
/**
* The implementation for getting desired subcluster mapping info.
*
* @param membershipStore Membership store the resolver queried from.
* @return The map of desired type info.
*/
protected abstract Map<K, V> getSubclusterInfo(
MembershipStore membershipStore);
/**
* Choose the first namespace from queried subcluster mapping info.
*
* @param path Path to check.
* @param loc Federated location with multiple destinations.
* @return First namespace out of the locations.
*/
protected abstract String chooseFirstNamespace(String path, PathLocation loc);
/**
* Update <NamespaceId, Subcluster Info> mapping info periodically.
*/
private synchronized void updateSubclusterMapping() {
if (subclusterMapping == null
|| (monotonicNow() - lastUpdated) > minUpdateTime) {
// Fetch the mapping asynchronously
Thread updater = new Thread(new Runnable() {
@Override
public void run() {
final MembershipStore membershipStore = getMembershipStore();
if (membershipStore == null) {
LOG.error("Cannot access the Membership store.");
return;
}
subclusterMapping = getSubclusterInfo(membershipStore);
lastUpdated = monotonicNow();
}
});
updater.start();
// Wait until initialized
if (subclusterMapping == null) {
try {
LOG.debug("Wait to get the mapping for the first time");
updater.join();
} catch (InterruptedException e) {
LOG.error("Cannot wait for the updater to finish");
}
}
}
}
/**
* Get the Router RPC server.
*
* @return Router RPC server. Null if not possible.
*/
protected RouterRpcServer getRpcServer() {
if (this.router == null) {
return null;
}
return router.getRpcServer();
}
/**
* Get the Membership store.
*
* @return Membership store.
*/
protected MembershipStore getMembershipStore() {
StateStoreService stateStore = router.getStateStore();
if (stateStore == null) {
return null;
}
return stateStore.getRegisteredRecordStore(MembershipStore.class);
}
/**
* Get subcluster mapping info.
*
* @return The map of subcluster info.
*/
protected Map<K, V> getSubclusterMapping() {
return this.subclusterMapping;
}
}