blob: 459c14ec180421048c8480bd121682d88b826616 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.crail.namenode.rpc.darpc;
import java.net.InetSocketAddress;
import org.apache.crail.conf.CrailConfiguration;
import org.apache.crail.rpc.RpcNameNodeService;
import org.apache.crail.rpc.RpcServer;
import org.apache.crail.utils.CrailUtils;
import org.slf4j.Logger;
import com.ibm.darpc.DaRPCServerEndpoint;
import com.ibm.darpc.DaRPCServerGroup;
import com.ibm.disni.RdmaServerEndpoint;
public class DaRPCNameNodeServer extends RpcServer {
private static final Logger LOG = CrailUtils.getLogger();
private RpcNameNodeService service;
private DaRPCServerGroup<DaRPCNameNodeRequest, DaRPCNameNodeResponse> namenodeServerGroup;
private RdmaServerEndpoint<DaRPCServerEndpoint<DaRPCNameNodeRequest, DaRPCNameNodeResponse>> namenodeServerEp;
public DaRPCNameNodeServer(RpcNameNodeService service){
this.service = service;
this.namenodeServerEp = null;
this.namenodeServerGroup = null;
}
public void init(CrailConfiguration conf, String[] args) throws Exception{
DaRPCConstants.updateConstants(conf);
DaRPCConstants.verify();
String _clusterAffinities[] = DaRPCConstants.NAMENODE_DARPC_AFFINITY.split(",");
long clusterAffinities[] = new long[_clusterAffinities.length];
for (int i = 0; i < clusterAffinities.length; i++){
int affinity = Integer.decode(_clusterAffinities[i]).intValue();
clusterAffinities[i] = 1L << affinity;
}
DaRPCServiceDispatcher darpcService = new DaRPCServiceDispatcher(service);
this.namenodeServerGroup = DaRPCServerGroup.createServerGroup(darpcService, clusterAffinities, -1, DaRPCConstants.NAMENODE_DARPC_MAXINLINE, DaRPCConstants.NAMENODE_DARPC_POLLING, DaRPCConstants.NAMENODE_DARPC_RECVQUEUE, DaRPCConstants.NAMENODE_DARPC_SENDQUEUE, DaRPCConstants.NAMENODE_DARPC_POLLSIZE, DaRPCConstants.NAMENODE_DARPC_CLUSTERSIZE);
LOG.info("rpc group started, recvQueue " + namenodeServerGroup.recvQueueSize());
this.namenodeServerEp = namenodeServerGroup.createServerEndpoint();
}
public void printConf(Logger logger){
DaRPCConstants.printConf(logger);
}
@Override
public void run() {
try {
InetSocketAddress addr = CrailUtils.getNameNodeAddress();
namenodeServerEp.bind(addr, DaRPCConstants.NAMENODE_DARPC_BACKLOG);
LOG.info("opened server at " + addr);
while (true) {
DaRPCServerEndpoint<DaRPCNameNodeRequest, DaRPCNameNodeResponse> clientEndpoint = namenodeServerEp.accept();
LOG.info("accepting RPC connection, qpnum " + clientEndpoint.getQp().getQp_num());
}
} catch(Exception e){
e.printStackTrace();
LOG.error(e.getMessage());
}
}
}