blob: 7687216a98a21d7b29455fdcec0369c629215de1 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.router;
import java.io.IOException;
import java.net.InetSocketAddress;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.proto.RouterProtocolProtos.RouterAdminProtocolService;
import org.apache.hadoop.hdfs.protocolPB.RouterAdminProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.RouterAdminProtocolServerSideTranslatorPB;
import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager;
import org.apache.hadoop.hdfs.server.federation.store.MountTableStore;
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryResponse;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RPC.Server;
import org.apache.hadoop.service.AbstractService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.BlockingService;
/**
 * RPC endpoint that serves the administrative calls sent to the HDFS router.
 * Its lifecycle (creation, start and stop) is driven by {@link Router}.
 *
 * <p>Each {@link MountTableManager} operation is forwarded as-is to the
 * {@link MountTableStore} obtained from the router's State Store.
 */
public class RouterAdminServer extends AbstractService
    implements MountTableManager {

  private static final Logger LOG =
      LoggerFactory.getLogger(RouterAdminServer.class);

  /** Configuration in use; replaced by the one handed to serviceInit. */
  private Configuration conf;

  /** Router that is told the admin address and provides the State Store. */
  private final Router router;

  /** Mount table store; looked up on first use and then reused. */
  private MountTableStore mountTableStore;

  /** RPC server that listens for admin requests from clients. */
  private final Server adminServer;

  /** Configured host name combined with the port the server listens on. */
  private final InetSocketAddress adminAddress;

  /**
   * Builds the admin RPC server from the configuration and reports the
   * resulting address to the router. The server is only built here; it is
   * started in {@link #serviceStart()}.
   *
   * @param conf Configuration holding the handler count and the address keys.
   * @param router Router that is notified of the admin server address.
   * @throws IOException If setting up the RPC server fails.
   */
  public RouterAdminServer(Configuration conf, Router router)
      throws IOException {
    super(RouterAdminServer.class.getName());

    this.conf = conf;
    this.router = router;

    final int numHandlers = this.conf.getInt(
        DFSConfigKeys.DFS_ROUTER_ADMIN_HANDLER_COUNT_KEY,
        DFSConfigKeys.DFS_ROUTER_ADMIN_HANDLER_COUNT_DEFAULT);

    RPC.setProtocolEngine(
        this.conf, RouterAdminProtocolPB.class, ProtobufRpcEngine.class);

    // Expose this object through the protobuf admin protocol service.
    final BlockingService adminPbService =
        RouterAdminProtocolService.newReflectiveBlockingService(
            new RouterAdminProtocolServerSideTranslatorPB(this));

    final InetSocketAddress configuredAddress = conf.getSocketAddr(
        DFSConfigKeys.DFS_ROUTER_ADMIN_BIND_HOST_KEY,
        DFSConfigKeys.DFS_ROUTER_ADMIN_ADDRESS_KEY,
        DFSConfigKeys.DFS_ROUTER_ADMIN_ADDRESS_DEFAULT,
        DFSConfigKeys.DFS_ROUTER_ADMIN_PORT_DEFAULT);
    final int configuredPort = configuredAddress.getPort();

    // An explicit bind host wins; otherwise use the configured host name.
    final String bindHostName = conf.get(
        DFSConfigKeys.DFS_ROUTER_ADMIN_BIND_HOST_KEY,
        configuredAddress.getHostName());
    LOG.info("Admin server binding to {}:{}", bindHostName, configuredPort);

    final RPC.Builder serverBuilder = new RPC.Builder(this.conf)
        .setProtocol(RouterAdminProtocolPB.class)
        .setInstance(adminPbService)
        .setBindAddress(bindHostName)
        .setPort(configuredPort)
        .setNumHandlers(numHandlers)
        .setVerbose(false);
    this.adminServer = serverBuilder.build();

    // The configured port may be ephemeral, so take the port from the
    // address the server is really listening on.
    final int listeningPort = this.adminServer.getListenerAddress().getPort();
    this.adminAddress = new InetSocketAddress(
        configuredAddress.getHostName(), listeningPort);
    router.setAdminServerAddress(this.adminAddress);
  }

  /**
   * Exposes the underlying admin RPC server so tests can reach it.
   *
   * @return The admin RPC server.
   */
  @VisibleForTesting
  Server getAdminServer() {
    return this.adminServer;
  }

  /**
   * Returns the mount table store, fetching it from the router's State Store
   * the first time it is needed.
   *
   * @return The mount table store, never null.
   * @throws IOException If no mount table store is registered.
   */
  private MountTableStore getMountTableStore() throws IOException {
    if (this.mountTableStore != null) {
      return this.mountTableStore;
    }
    this.mountTableStore = router.getStateStore().getRegisteredRecordStore(
        MountTableStore.class);
    if (this.mountTableStore == null) {
      throw new IOException("Mount table state store is not available.");
    }
    return this.mountTableStore;
  }

  /**
   * Get the RPC address of the admin service.
   *
   * @return Administration service RPC address.
   */
  public InetSocketAddress getRpcAddress() {
    return this.adminAddress;
  }

  /**
   * Stores the given configuration and continues with the parent
   * initialization.
   *
   * @param configuration Configuration to initialize the service with.
   * @throws Exception If the parent initialization fails.
   */
  @Override
  protected void serviceInit(Configuration configuration) throws Exception {
    this.conf = configuration;
    super.serviceInit(configuration);
  }

  /**
   * Starts the admin RPC server, then the parent service.
   *
   * @throws Exception If the server or the parent service fails to start.
   */
  @Override
  protected void serviceStart() throws Exception {
    this.adminServer.start();
    super.serviceStart();
  }

  /**
   * Stops the admin RPC server if there is one, then the parent service.
   *
   * @throws Exception If the parent service fails to stop.
   */
  @Override
  protected void serviceStop() throws Exception {
    final Server server = this.adminServer;
    if (server != null) {
      server.stop();
    }
    super.serviceStop();
  }

  /**
   * Forwards an add request to the mount table store.
   *
   * @param request Add request to forward.
   * @return Response produced by the mount table store.
   * @throws IOException If the store is unavailable or the store call fails.
   */
  @Override
  public AddMountTableEntryResponse addMountTableEntry(
      AddMountTableEntryRequest request) throws IOException {
    final MountTableStore store = getMountTableStore();
    return store.addMountTableEntry(request);
  }

  /**
   * Forwards an update request to the mount table store.
   *
   * @param request Update request to forward.
   * @return Response produced by the mount table store.
   * @throws IOException If the store is unavailable or the store call fails.
   */
  @Override
  public UpdateMountTableEntryResponse updateMountTableEntry(
      UpdateMountTableEntryRequest request) throws IOException {
    final MountTableStore store = getMountTableStore();
    return store.updateMountTableEntry(request);
  }

  /**
   * Forwards a remove request to the mount table store.
   *
   * @param request Remove request to forward.
   * @return Response produced by the mount table store.
   * @throws IOException If the store is unavailable or the store call fails.
   */
  @Override
  public RemoveMountTableEntryResponse removeMountTableEntry(
      RemoveMountTableEntryRequest request) throws IOException {
    final MountTableStore store = getMountTableStore();
    return store.removeMountTableEntry(request);
  }

  /**
   * Forwards a listing request to the mount table store.
   *
   * @param request Listing request to forward.
   * @return Response produced by the mount table store.
   * @throws IOException If the store is unavailable or the store call fails.
   */
  @Override
  public GetMountTableEntriesResponse getMountTableEntries(
      GetMountTableEntriesRequest request) throws IOException {
    final MountTableStore store = getMountTableStore();
    return store.getMountTableEntries(request);
  }
}