/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation;

import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.createMockRegistrationForNamenode;
import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.getStateStoreConfiguration;
import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.synchronizeRecords;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState;
import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.MembershipNamenodeResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.MountTableResolver;
import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState;
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
import org.apache.hadoop.util.StringUtils;

/**
* Test utility to mimic a federated HDFS cluster with a router and a state
* store.
*/
public class StateStoreDFSCluster extends MiniRouterDFSCluster {
private static final Class<?> DEFAULT_FILE_RESOLVER =
MountTableResolver.class;
private static final Class<?> DEFAULT_NAMENODE_RESOLVER =
MembershipNamenodeResolver.class;

public StateStoreDFSCluster(boolean ha, int numNameservices, int numNamenodes,
long heartbeatInterval, long cacheFlushInterval)
throws IOException, InterruptedException {
this(ha, numNameservices, numNamenodes, heartbeatInterval,
cacheFlushInterval, DEFAULT_FILE_RESOLVER);
}

public StateStoreDFSCluster(boolean ha, int numNameservices, int numNamenodes,
long heartbeatInterval, long cacheFlushInterval, Class<?> fileResolver)
throws IOException, InterruptedException {
super(ha, numNameservices, numNamenodes, heartbeatInterval,
cacheFlushInterval);
// Attach state store and resolvers to router
Configuration stateStoreConfig = getStateStoreConfiguration();
// Use state store backed resolvers
stateStoreConfig.setClass(
RBFConfigKeys.FEDERATION_NAMENODE_RESOLVER_CLIENT_CLASS,
DEFAULT_NAMENODE_RESOLVER, ActiveNamenodeResolver.class);
stateStoreConfig.setClass(
RBFConfigKeys.FEDERATION_FILE_RESOLVER_CLIENT_CLASS,
fileResolver, FileSubclusterResolver.class);
this.addRouterOverrides(stateStoreConfig);
}

public StateStoreDFSCluster(boolean ha, int numNameservices,
Class<?> fileResolver) throws IOException, InterruptedException {
this(ha, numNameservices, 2,
DEFAULT_HEARTBEAT_INTERVAL_MS, DEFAULT_CACHE_INTERVAL_MS, fileResolver);
}

public StateStoreDFSCluster(boolean ha, int numNameservices)
throws IOException, InterruptedException {
this(ha, numNameservices, 2,
DEFAULT_HEARTBEAT_INTERVAL_MS, DEFAULT_CACHE_INTERVAL_MS);
}

public StateStoreDFSCluster(boolean ha, int numNameservices,
int numNamenodes) throws IOException, InterruptedException {
this(ha, numNameservices, numNamenodes,
DEFAULT_HEARTBEAT_INTERVAL_MS, DEFAULT_CACHE_INTERVAL_MS);
}
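
// Hypothetical usage sketch (not part of this class): a test would typically
// build a router configuration, attach it as an override, and then start the
// mini cluster and its routers. RouterConfigBuilder, startCluster(),
// startRouters() and waitClusterUp() are assumed to be available from
// MiniRouterDFSCluster and may differ between Hadoop versions.
//
//   StateStoreDFSCluster cluster = new StateStoreDFSCluster(false, 2);
//   Configuration routerConf = new RouterConfigBuilder()
//       .stateStore()
//       .rpc()
//       .build();
//   cluster.addRouterOverrides(routerConf);
//   cluster.startCluster();
//   cluster.startRouters();
//   cluster.waitClusterUp();
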
/////////////////////////////////////////////////////////////////////////////
// State Store Test Fixtures
/////////////////////////////////////////////////////////////////////////////
/**
* Adds a mock registration test fixture for each NN in the cluster:
* nameservice -> NS
* namenode -> NN
* rpcAddress -> 0.0.0.0:0
* webAddress -> 0.0.0.0:0
* state -> STANDBY
* safeMode -> false
* blockPool -> test.
*
* @param stateStore State Store.
* @throws IOException If the registrations cannot be synchronized.
*/
public void createTestRegistration(StateStoreService stateStore)
throws IOException {
List<MembershipState> entries = new ArrayList<MembershipState>();
for (NamenodeContext nn : this.getNamenodes()) {
MembershipState entry = createMockRegistrationForNamenode(
nn.getNameserviceId(), nn.getNamenodeId(),
FederationNamenodeServiceState.STANDBY);
entries.add(entry);
}
synchronizeRecords(
stateStore, entries, MembershipState.class);
}

public void createTestMountTable(StateStoreService stateStore)
throws IOException {
List<MountTable> mounts = generateMockMountTable();
synchronizeRecords(stateStore, mounts, MountTable.class);
stateStore.refreshCaches();
}
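
// Hypothetical illustration (not part of this class): a test could seed the
// State Store of a running Router with these fixtures. The getRandomRouter(),
// getRouter() and getStateStore() accessors are assumed from
// MiniRouterDFSCluster/Router and may differ between Hadoop versions.
//
//   StateStoreService stateStore =
//       cluster.getRandomRouter().getRouter().getStateStore();
//   cluster.createTestRegistration(stateStore);
//   cluster.createTestMountTable(stateStore);
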
public List<MountTable> generateMockMountTable() throws IOException {
// Create one mount table entry per nameservice
List<MountTable> entries = new ArrayList<>();
for (String ns : this.getNameservices()) {
Map<String, String> destMap = new HashMap<>();
destMap.put(ns, getNamenodePathForNS(ns));
// Direct path
String fedPath = getFederatedPathForNS(ns);
MountTable entry = MountTable.newInstance(fedPath, destMap);
entries.add(entry);
}
// Root path goes to the first nameservice
Map<String, String> destMap = new HashMap<>();
String ns0 = this.getNameservices().get(0);
destMap.put(ns0, "/");
MountTable entry = MountTable.newInstance("/", destMap);
entries.add(entry);
return entries;
}
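
// Illustrative result (a sketch, assuming two nameservices "ns0" and "ns1" and
// the path helpers inherited from MiniRouterDFSCluster): each nameservice gets
// its own mount point, plus a root entry into the first nameservice, e.g.
//
//   getFederatedPathForNS("ns0") -> ns0 : getNamenodePathForNS("ns0")
//   getFederatedPathForNS("ns1") -> ns1 : getNamenodePathForNS("ns1")
//   /                            -> ns0 : /
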
/**
* Get the client configuration which targets all the Routers. It uses the HA
* setup to fail over between them.
*
* @return Configuration for a client that can fail over between the routers.
*/
public Configuration getRouterClientConf() {
List<RouterContext> routers = getRouters();
Configuration clientConf = DFSTestUtil.newHAConfiguration("fed");
int i = 0;
List<String> names = new ArrayList<>(routers.size());
for (RouterContext routerContext : routers) {
String name = "r" + i++;
clientConf.set(
DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY + ".fed." + name,
"localhost:" + routerContext.getRpcPort());
names.add(name);
}
clientConf.set(DFSUtil.addKeySuffixes(
HdfsClientConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX, "fed"),
StringUtils.join(",", names));
return clientConf;
}
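
// Hedged sketch of what the returned configuration looks like for two routers
// (the key names follow DFSConfigKeys/HdfsClientConfigKeys; the actual ports
// come from the running RouterContexts):
//
//   dfs.namenode.rpc-address.fed.r0 = localhost:<router0 rpc port>
//   dfs.namenode.rpc-address.fed.r1 = localhost:<router1 rpc port>
//   dfs.ha.namenodes.fed            = r0,r1
//
// A test could then point a client at the logical URI, e.g.
// FileSystem.get(new URI("hdfs://fed"), getRouterClientConf()). The URI-based
// FileSystem.get overload is standard Hadoop API, but treat this usage as an
// assumption rather than the documented pattern for this class.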
}