blob: 7b53855877a629c322f93ca10ff66e0d253301a8 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.crail.namenode.rpc.darpc;
import java.io.IOException;
import java.net.InetAddress;
import org.apache.crail.CrailNodeType;
import org.apache.crail.conf.CrailConstants;
import org.apache.crail.metadata.BlockInfo;
import org.apache.crail.metadata.DataNodeInfo;
import org.apache.crail.metadata.FileInfo;
import org.apache.crail.metadata.FileName;
import org.apache.crail.rpc.RpcConnection;
import org.apache.crail.rpc.RpcCreateFile;
import org.apache.crail.rpc.RpcDeleteFile;
import org.apache.crail.rpc.RpcFuture;
import org.apache.crail.rpc.RpcGetBlock;
import org.apache.crail.rpc.RpcGetDataNode;
import org.apache.crail.rpc.RpcGetFile;
import org.apache.crail.rpc.RpcGetLocation;
import org.apache.crail.rpc.RpcPing;
import org.apache.crail.rpc.RpcRemoveDataNode;
import org.apache.crail.rpc.RpcProtocol;
import org.apache.crail.rpc.RpcRenameFile;
import org.apache.crail.rpc.RpcRequestMessage;
import org.apache.crail.rpc.RpcResponseMessage;
import org.apache.crail.rpc.RpcVoid;
import org.apache.crail.utils.CrailUtils;
import org.slf4j.Logger;
import com.ibm.darpc.DaRPCClientEndpoint;
import com.ibm.darpc.DaRPCFuture;
import com.ibm.darpc.DaRPCStream;
/**
 * {@link RpcConnection} implementation that sends every namenode request through a
 * DaRPC stream created from the client endpoint supplied at construction time.
 *
 * <p>Each RPC method follows the same steps: build the request message, wrap it in a
 * {@link DaRPCNameNodeRequest}, tag it with the matching {@link RpcProtocol} command,
 * allocate the response message, submit both via {@link #issueRPC}, and return a
 * {@link DaRPCNameNodeFuture} tying the DaRPC future to the response object.
 */
public class DaRPCNameNodeConnection implements RpcConnection {
    private static final Logger LOG = CrailUtils.getLogger();

    /** Endpoint backing this connection; set to {@code null} once {@link #close()} has run. */
    private DaRPCClientEndpoint<DaRPCNameNodeRequest, DaRPCNameNodeResponse> rpcEndpoint;
    /** Stream created from the endpoint in the constructor; all requests go through it. */
    private final DaRPCStream<DaRPCNameNodeRequest, DaRPCNameNodeResponse> stream;

    /**
     * Creates a connection on top of the given endpoint and opens a stream on it.
     *
     * @param endpoint the DaRPC client endpoint to use
     * @throws IOException if the stream cannot be created
     */
    public DaRPCNameNodeConnection(DaRPCClientEndpoint<DaRPCNameNodeRequest, DaRPCNameNodeResponse> endpoint) throws IOException {
        this.rpcEndpoint = endpoint;
        this.stream = endpoint.createStream();
    }

    /**
     * Issues a {@code CMD_CREATE_FILE} request.
     *
     * @return future bound to the {@code CreateFileRes} response object
     * @throws IOException if the request cannot be submitted
     */
    @Override
    public RpcFuture<RpcCreateFile> createFile(FileName filename, CrailNodeType type, int storageClass, int locationClass, boolean enumerable) throws IOException {
        if (CrailConstants.DEBUG) {
            LOG.debug("RPC: createFile, fileType " + type + ", storageClass " + storageClass + ", locationClass " + locationClass);
        }

        RpcRequestMessage.CreateFileReq createFileReq = new RpcRequestMessage.CreateFileReq(filename, type, storageClass, locationClass, enumerable);
        DaRPCNameNodeRequest request = new DaRPCNameNodeRequest(createFileReq);
        request.setCommand(RpcProtocol.CMD_CREATE_FILE);

        RpcResponseMessage.CreateFileRes fileRes = new RpcResponseMessage.CreateFileRes();
        DaRPCNameNodeResponse response = new DaRPCNameNodeResponse(fileRes);

        return new DaRPCNameNodeFuture<RpcCreateFile>(issueRPC(request, response), fileRes);
    }

    /**
     * Issues a {@code CMD_GET_FILE} request.
     *
     * @return future bound to the {@code GetFileRes} response object
     * @throws IOException if the request cannot be submitted
     */
    @Override
    public RpcFuture<RpcGetFile> getFile(FileName filename, boolean writeable) throws IOException {
        if (CrailConstants.DEBUG) {
            LOG.debug("RPC: getFile, writeable " + writeable);
        }

        RpcRequestMessage.GetFileReq getFileReq = new RpcRequestMessage.GetFileReq(filename, writeable);
        DaRPCNameNodeRequest request = new DaRPCNameNodeRequest(getFileReq);
        request.setCommand(RpcProtocol.CMD_GET_FILE);

        RpcResponseMessage.GetFileRes fileRes = new RpcResponseMessage.GetFileRes();
        DaRPCNameNodeResponse response = new DaRPCNameNodeResponse(fileRes);

        return new DaRPCNameNodeFuture<RpcGetFile>(issueRPC(request, response), fileRes);
    }

    /**
     * Issues a {@code CMD_SET_FILE} request.
     *
     * @return future bound to a {@code VoidRes} response object
     * @throws IOException if the request cannot be submitted
     */
    @Override
    public DaRPCNameNodeFuture<RpcVoid> setFile(FileInfo fileInfo, boolean close) throws IOException {
        if (CrailConstants.DEBUG) {
            LOG.debug("RPC: setFile, id " + fileInfo.getFd() + ", close " + close);
        }

        RpcRequestMessage.SetFileReq setFileReq = new RpcRequestMessage.SetFileReq(fileInfo, close);
        DaRPCNameNodeRequest request = new DaRPCNameNodeRequest(setFileReq);
        request.setCommand(RpcProtocol.CMD_SET_FILE);

        RpcResponseMessage.VoidRes voidRes = new RpcResponseMessage.VoidRes();
        DaRPCNameNodeResponse response = new DaRPCNameNodeResponse(voidRes);

        return new DaRPCNameNodeFuture<RpcVoid>(issueRPC(request, response), voidRes);
    }

    /**
     * Issues a {@code CMD_REMOVE_FILE} request.
     *
     * @return future bound to the {@code DeleteFileRes} response object
     * @throws IOException if the request cannot be submitted
     */
    @Override
    public DaRPCNameNodeFuture<RpcDeleteFile> removeFile(FileName filename, boolean recursive) throws IOException {
        if (CrailConstants.DEBUG) {
            LOG.debug("RPC: removeFile");
        }

        RpcRequestMessage.RemoveFileReq removeReq = new RpcRequestMessage.RemoveFileReq(filename, recursive);
        DaRPCNameNodeRequest request = new DaRPCNameNodeRequest(removeReq);
        request.setCommand(RpcProtocol.CMD_REMOVE_FILE);

        RpcResponseMessage.DeleteFileRes fileRes = new RpcResponseMessage.DeleteFileRes();
        DaRPCNameNodeResponse response = new DaRPCNameNodeResponse(fileRes);

        return new DaRPCNameNodeFuture<RpcDeleteFile>(issueRPC(request, response), fileRes);
    }

    /**
     * Issues a {@code CMD_RENAME_FILE} request.
     *
     * @return future bound to the {@code RenameRes} response object
     * @throws IOException if the request cannot be submitted
     */
    @Override
    public DaRPCNameNodeFuture<RpcRenameFile> renameFile(FileName srcHash, FileName dstHash) throws IOException {
        if (CrailConstants.DEBUG) {
            LOG.debug("RPC: renameFile");
        }

        RpcRequestMessage.RenameFileReq renameReq = new RpcRequestMessage.RenameFileReq(srcHash, dstHash);
        DaRPCNameNodeRequest request = new DaRPCNameNodeRequest(renameReq);
        request.setCommand(RpcProtocol.CMD_RENAME_FILE);

        RpcResponseMessage.RenameRes renameRes = new RpcResponseMessage.RenameRes();
        DaRPCNameNodeResponse response = new DaRPCNameNodeResponse(renameRes);

        return new DaRPCNameNodeFuture<RpcRenameFile>(issueRPC(request, response), renameRes);
    }

    /**
     * Issues a {@code CMD_GET_BLOCK} request.
     *
     * @return future bound to the {@code GetBlockRes} response object
     * @throws IOException if the request cannot be submitted
     */
    @Override
    public DaRPCNameNodeFuture<RpcGetBlock> getBlock(long fd, long token, long position, long capacity) throws IOException {
        if (CrailConstants.DEBUG) {
            LOG.debug("RPC: getBlock, fd " + fd + ", token " + token + ", position " + position + ", capacity " + capacity);
        }

        RpcRequestMessage.GetBlockReq getBlockReq = new RpcRequestMessage.GetBlockReq(fd, token, position, capacity);
        DaRPCNameNodeRequest request = new DaRPCNameNodeRequest(getBlockReq);
        request.setCommand(RpcProtocol.CMD_GET_BLOCK);

        RpcResponseMessage.GetBlockRes getBlockRes = new RpcResponseMessage.GetBlockRes();
        DaRPCNameNodeResponse response = new DaRPCNameNodeResponse(getBlockRes);

        return new DaRPCNameNodeFuture<RpcGetBlock>(issueRPC(request, response), getBlockRes);
    }

    /**
     * Issues a {@code CMD_GET_LOCATION} request.
     *
     * @return future bound to the {@code GetLocationRes} response object
     * @throws IOException if the request cannot be submitted
     */
    @Override
    public DaRPCNameNodeFuture<RpcGetLocation> getLocation(FileName fileName, long position) throws IOException {
        if (CrailConstants.DEBUG) {
            LOG.debug("RPC: getLocation, position " + position);
        }

        RpcRequestMessage.GetLocationReq getLocationReq = new RpcRequestMessage.GetLocationReq(fileName, position);
        DaRPCNameNodeRequest request = new DaRPCNameNodeRequest(getLocationReq);
        request.setCommand(RpcProtocol.CMD_GET_LOCATION);

        RpcResponseMessage.GetLocationRes getLocationRes = new RpcResponseMessage.GetLocationRes();
        DaRPCNameNodeResponse response = new DaRPCNameNodeResponse(getLocationRes);

        return new DaRPCNameNodeFuture<RpcGetLocation>(issueRPC(request, response), getLocationRes);
    }

    /**
     * Issues a {@code CMD_SET_BLOCK} request.
     *
     * @return future bound to a {@code VoidRes} response object
     * @throws Exception if the request cannot be submitted
     */
    @Override
    public DaRPCNameNodeFuture<RpcVoid> setBlock(BlockInfo blockInfo) throws Exception {
        if (CrailConstants.DEBUG) {
            LOG.debug("RPC: setBlock, ");
        }

        RpcRequestMessage.SetBlockReq setBlockReq = new RpcRequestMessage.SetBlockReq(blockInfo);
        DaRPCNameNodeRequest request = new DaRPCNameNodeRequest(setBlockReq);
        request.setCommand(RpcProtocol.CMD_SET_BLOCK);

        RpcResponseMessage.VoidRes voidRes = new RpcResponseMessage.VoidRes();
        DaRPCNameNodeResponse response = new DaRPCNameNodeResponse(voidRes);

        return new DaRPCNameNodeFuture<RpcVoid>(issueRPC(request, response), voidRes);
    }

    /**
     * Issues a {@code CMD_GET_DATANODE} request.
     *
     * @return future bound to the {@code GetDataNodeRes} response object
     * @throws Exception if the request cannot be submitted
     */
    @Override
    public DaRPCNameNodeFuture<RpcGetDataNode> getDataNode(DataNodeInfo dnInfo) throws Exception {
        RpcRequestMessage.GetDataNodeReq getDataNodeReq = new RpcRequestMessage.GetDataNodeReq(dnInfo);
        DaRPCNameNodeRequest request = new DaRPCNameNodeRequest(getDataNodeReq);
        request.setCommand(RpcProtocol.CMD_GET_DATANODE);

        RpcResponseMessage.GetDataNodeRes getDataNodeRes = new RpcResponseMessage.GetDataNodeRes();
        DaRPCNameNodeResponse response = new DaRPCNameNodeResponse(getDataNodeRes);

        return new DaRPCNameNodeFuture<RpcGetDataNode>(issueRPC(request, response), getDataNodeRes);
    }

    /**
     * Issues a {@code CMD_DUMP_NAMENODE} request.
     *
     * @return future bound to a {@code VoidRes} response object
     * @throws Exception if the request cannot be submitted
     */
    @Override
    public DaRPCNameNodeFuture<RpcVoid> dumpNameNode() throws Exception {
        RpcRequestMessage.DumpNameNodeReq dumpNameNodeReq = new RpcRequestMessage.DumpNameNodeReq();
        DaRPCNameNodeRequest request = new DaRPCNameNodeRequest(dumpNameNodeReq);
        request.setCommand(RpcProtocol.CMD_DUMP_NAMENODE);

        RpcResponseMessage.VoidRes voidRes = new RpcResponseMessage.VoidRes();
        DaRPCNameNodeResponse response = new DaRPCNameNodeResponse(voidRes);

        return new DaRPCNameNodeFuture<RpcVoid>(issueRPC(request, response), voidRes);
    }

    /**
     * Issues a {@code CMD_PING_NAMENODE} request.
     *
     * @return future bound to the {@code PingNameNodeRes} response object
     * @throws Exception if the request cannot be submitted
     */
    @Override
    public DaRPCNameNodeFuture<RpcPing> pingNameNode() throws Exception {
        RpcRequestMessage.PingNameNodeReq pingReq = new RpcRequestMessage.PingNameNodeReq();
        DaRPCNameNodeRequest request = new DaRPCNameNodeRequest(pingReq);
        request.setCommand(RpcProtocol.CMD_PING_NAMENODE);

        RpcResponseMessage.PingNameNodeRes pingRes = new RpcResponseMessage.PingNameNodeRes();
        DaRPCNameNodeResponse response = new DaRPCNameNodeResponse(pingRes);

        return new DaRPCNameNodeFuture<RpcPing>(issueRPC(request, response), pingRes);
    }

    /**
     * Issues a {@code CMD_REMOVE_DATANODE} request for the given address and port.
     *
     * @return future bound to the {@code RemoveDataNodeRes} response object
     * @throws Exception if the request cannot be submitted
     */
    @Override
    public RpcFuture<RpcRemoveDataNode> removeDataNode(InetAddress ipaddr, int port) throws Exception {
        RpcRequestMessage.RemoveDataNodeReq removeDataNodeReq = new RpcRequestMessage.RemoveDataNodeReq(ipaddr, port);
        DaRPCNameNodeRequest request = new DaRPCNameNodeRequest(removeDataNodeReq);
        request.setCommand(RpcProtocol.CMD_REMOVE_DATANODE);

        RpcResponseMessage.RemoveDataNodeRes removeDataNodeRes = new RpcResponseMessage.RemoveDataNodeRes();
        DaRPCNameNodeResponse response = new DaRPCNameNodeResponse(removeDataNodeRes);

        return new DaRPCNameNodeFuture<RpcRemoveDataNode>(issueRPC(request, response), removeDataNodeRes);
    }

    /**
     * Closes the underlying endpoint and drops the reference to it.
     * Calling this method again afterwards is a no-op.
     *
     * @throws Exception if closing the endpoint fails
     */
    @Override
    public void close() throws Exception {
        if (rpcEndpoint != null) {
            rpcEndpoint.close();
            rpcEndpoint = null;
        }
    }

    /**
     * Submits a request/response pair on the stream.
     *
     * <p>If the submission fails, endpoint diagnostics are logged on a best-effort
     * basis and the original {@link IOException} is rethrown unchanged.
     *
     * @param request  the request to send
     * @param response the object the response will be delivered into
     * @return the DaRPC future for this call
     * @throws IOException if the stream rejects the request
     */
    private DaRPCFuture<DaRPCNameNodeRequest, DaRPCNameNodeResponse> issueRPC(DaRPCNameNodeRequest request, DaRPCNameNodeResponse response) throws IOException {
        try {
            return stream.request(request, response, false);
        } catch (IOException e) {
            logRpcFailure();
            throw e;
        }
    }

    /**
     * Logs endpoint counters after a failed RPC. Never throws: the endpoint may already
     * have been released by {@link #close()}, or its queue pair may be unavailable, and
     * a failure while collecting diagnostics must not replace the caller's exception.
     */
    private void logRpcFailure() {
        // Snapshot the field so the null check and the uses below see the same reference.
        DaRPCClientEndpoint<DaRPCNameNodeRequest, DaRPCNameNodeResponse> endpoint = rpcEndpoint;
        if (endpoint == null) {
            LOG.info("ERROR: RPC failed, connection already closed");
            return;
        }
        try {
            String qpNum = endpoint.getQp() != null ? String.valueOf(endpoint.getQp().getQp_num()) : "n/a";
            LOG.info("ERROR: RPC failed, messagesSend " + endpoint.getMessagesSent() + ", messagesReceived " + endpoint.getMessagesReceived() + ", isConnected " + endpoint.isConnected() + ", qpNum " + qpNum);
        } catch (Exception diagnosticFailure) {
            LOG.info("ERROR: RPC failed, endpoint diagnostics unavailable (" + diagnosticFailure + ")");
        }
    }

    /**
     * Returns the destination address of the endpoint, or {@code "Unknown"} when the
     * connection has been closed or the address cannot be obtained.
     */
    @Override
    public String toString() {
        DaRPCClientEndpoint<DaRPCNameNodeRequest, DaRPCNameNodeResponse> endpoint = rpcEndpoint;
        if (endpoint == null) {
            return "Unknown";
        }
        try {
            return endpoint.getDstAddr().toString();
        } catch (Exception e) {
            // Address lookup failed or returned nothing usable; fall back to a placeholder.
            return "Unknown";
        }
    }
}