blob: 89a1e2cf16971338511730b85847564c65878ad2 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.crail.namenode.rpc.darpc;
import java.util.concurrent.atomic.AtomicLong;
import java.io.IOException;
import org.apache.crail.rpc.RpcErrors;
import org.apache.crail.rpc.RpcNameNodeService;
import org.apache.crail.rpc.RpcNameNodeState;
import org.apache.crail.rpc.RpcProtocol;
import org.apache.crail.rpc.RpcRequestMessage;
import org.apache.crail.rpc.RpcResponseMessage;
import org.apache.crail.utils.CrailUtils;
import org.slf4j.Logger;
import com.ibm.darpc.DaRPCServerEndpoint;
import com.ibm.darpc.DaRPCServerEvent;
import com.ibm.darpc.DaRPCService;
public class DaRPCServiceDispatcher extends DaRPCNameNodeProtocol implements DaRPCService<DaRPCNameNodeRequest, DaRPCNameNodeResponse> {
private static final Logger LOG = CrailUtils.getLogger();
private RpcNameNodeService service;
private AtomicLong totalOps;
private AtomicLong createOps;
private AtomicLong lookupOps;
private AtomicLong setOps;
private AtomicLong removeOps;
private AtomicLong renameOps;
private AtomicLong getOps;
private AtomicLong locationOps;
private AtomicLong errorOps;
public DaRPCServiceDispatcher(RpcNameNodeService service){
this.service = service;
this.totalOps = new AtomicLong(0);
this.createOps = new AtomicLong(0);
this.lookupOps = new AtomicLong(0);
this.setOps = new AtomicLong(0);
this.removeOps = new AtomicLong(0);
this.renameOps = new AtomicLong(0);
this.getOps = new AtomicLong(0);
this.locationOps = new AtomicLong(0);
this.errorOps = new AtomicLong(0);
}
public void processServerEvent(DaRPCServerEvent<DaRPCNameNodeRequest, DaRPCNameNodeResponse> event) {
DaRPCNameNodeRequest request = event.getReceiveMessage();
DaRPCNameNodeResponse response = event.getSendMessage();
short error = RpcErrors.ERR_OK;
try {
response.setType(RpcProtocol.responseTypes[request.getCmd()]);
response.setError((short) 0);
switch(request.getCmd()) {
case RpcProtocol.CMD_CREATE_FILE:
this.totalOps.incrementAndGet();
this.createOps.incrementAndGet();
error = service.createFile(request.createFile(), response.createFile(), response);
break;
case RpcProtocol.CMD_GET_FILE:
this.totalOps.incrementAndGet();
this.lookupOps.incrementAndGet();
error = service.getFile(request.getFile(), response.getFile(), response);
break;
case RpcProtocol.CMD_SET_FILE:
this.totalOps.incrementAndGet();
this.setOps.incrementAndGet();
error = service.setFile(request.setFile(), response.getVoid(), response);
break;
case RpcProtocol.CMD_REMOVE_FILE:
this.totalOps.incrementAndGet();
this.removeOps.incrementAndGet();
error = service.removeFile(request.removeFile(), response.delFile(), response);
break;
case RpcProtocol.CMD_RENAME_FILE:
this.totalOps.incrementAndGet();
this.renameOps.incrementAndGet();
error = service.renameFile(request.renameFile(), response.getRename(), response);
break;
case RpcProtocol.CMD_GET_BLOCK:
this.totalOps.incrementAndGet();
this.getOps.incrementAndGet();
error = service.getBlock(request.getBlock(), response.getBlock(), response);
break;
case RpcProtocol.CMD_GET_LOCATION:
this.totalOps.incrementAndGet();
this.locationOps.incrementAndGet();
error = service.getLocation(request.getLocation(), response.getLocation(), response);
break;
case RpcProtocol.CMD_SET_BLOCK:
error = service.setBlock(request.setBlock(), response.getVoid(), response);
break;
case RpcProtocol.CMD_GET_DATANODE:
error = service.getDataNode(request.getDataNode(), response.getDataNode(), response);
break;
case RpcProtocol.CMD_DUMP_NAMENODE:
error = service.dump(request.dumpNameNode(), response.getVoid(), response);
break;
case RpcProtocol.CMD_PING_NAMENODE:
error = this.stats(request.pingNameNode(), response.pingNameNode(), response);
error = service.ping(request.pingNameNode(), response.pingNameNode(), response);
break;
default:
error = RpcErrors.ERR_INVALID_RPC_CMD;
LOG.info("Rpc command not valid, opcode " + request.getCmd());
}
} catch(Exception e){
error = RpcErrors.ERR_UNKNOWN;
this.errorOps.incrementAndGet();
LOG.info(RpcErrors.messages[RpcErrors.ERR_UNKNOWN] + e.getMessage());
e.printStackTrace();
}
try {
response.setError(error);
event.triggerResponse();
} catch(Exception e){
LOG.info("ERROR: RPC failed, messagesSend ");
e.printStackTrace();
}
}
public short stats(RpcRequestMessage.PingNameNodeReq request, RpcResponseMessage.PingNameNodeRes response, RpcNameNodeState errorState) throws Exception {
if (!RpcProtocol.verifyProtocol(RpcProtocol.CMD_PING_NAMENODE, request, response)){
return RpcErrors.ERR_PROTOCOL_MISMATCH;
}
LOG.info("totalOps " + totalOps.get());
LOG.info("errorOps " + errorOps.get());
LOG.info("createOps " + createOps.get());
LOG.info("lookupOps " + lookupOps.get());
LOG.info("setOps " + setOps.get());
LOG.info("removeOps " + removeOps.get());
LOG.info("renameOps " + renameOps.get());
LOG.info("getOps " + getOps.get());
LOG.info("locationOps " + locationOps.get());
return RpcErrors.ERR_OK;
}
@Override
public void open(DaRPCServerEndpoint<DaRPCNameNodeRequest, DaRPCNameNodeResponse> endpoint) {
try {
LOG.info("RPC connection, qpnum " + endpoint.getQp().getQp_num());
} catch(IOException e) {
LOG.info("RPC connection, cannot get qpnum, because QP is not open.\n");
}
}
@Override
public void close(DaRPCServerEndpoint<DaRPCNameNodeRequest, DaRPCNameNodeResponse> endpoint) {
try {
LOG.info("disconnecting RPC connection, qpnum " + endpoint.getQp().getQp_num());
endpoint.close();
} catch(Exception e){
}
}
}