implement RPC to remove datanodes
diff --git a/client/src/main/java/org/apache/crail/metadata/DataNodeStatistics.java b/client/src/main/java/org/apache/crail/metadata/DataNodeStatistics.java
index fc52f14..00eebd9 100644
--- a/client/src/main/java/org/apache/crail/metadata/DataNodeStatistics.java
+++ b/client/src/main/java/org/apache/crail/metadata/DataNodeStatistics.java
@@ -22,25 +22,29 @@
import java.nio.ByteBuffer;
public class DataNodeStatistics {
- public static final int CSIZE = 12;
+ public static final int CSIZE = 14;
private long serviceId;
private int freeBlockCount;
+ private short status;
public DataNodeStatistics(){
this.serviceId = 0;
this.freeBlockCount = 0;
+ this.status = 0;
}
public int write(ByteBuffer buffer){
buffer.putLong(serviceId);
buffer.putInt(freeBlockCount);
+ buffer.putShort(status);
return CSIZE;
}
public void update(ByteBuffer buffer) throws UnknownHostException {
this.serviceId = buffer.getLong();
this.freeBlockCount = buffer.getInt();
+ this.status = buffer.getShort();
}
public int getFreeBlockCount() {
@@ -51,6 +55,14 @@
this.freeBlockCount = blockCount;
}
+ public short getStatus() {
+ return this.status;
+ }
+
+ public void setStatus(short status) {
+ this.status = status;
+ }
+
public void setStatistics(DataNodeStatistics statistics) {
this.serviceId = statistics.getServiceId();
this.freeBlockCount = statistics.getFreeBlockCount();
diff --git a/client/src/main/java/org/apache/crail/rpc/RpcConnection.java b/client/src/main/java/org/apache/crail/rpc/RpcConnection.java
index d434fdb..91ae152 100644
--- a/client/src/main/java/org/apache/crail/rpc/RpcConnection.java
+++ b/client/src/main/java/org/apache/crail/rpc/RpcConnection.java
@@ -19,6 +19,7 @@
package org.apache.crail.rpc;
import java.io.IOException;
+import java.net.InetAddress;
import org.apache.crail.CrailNodeType;
import org.apache.crail.metadata.BlockInfo;
@@ -59,6 +60,9 @@
public abstract RpcFuture<RpcPing> pingNameNode()
throws Exception;
+
+ public abstract RpcFuture<RpcRemoveDataNode> removeDataNode(
+ InetAddress ipaddr, int port) throws Exception;
public abstract void close() throws Exception;
diff --git a/client/src/main/java/org/apache/crail/rpc/RpcDispatcher.java b/client/src/main/java/org/apache/crail/rpc/RpcDispatcher.java
index f8d38f5..605193c 100644
--- a/client/src/main/java/org/apache/crail/rpc/RpcDispatcher.java
+++ b/client/src/main/java/org/apache/crail/rpc/RpcDispatcher.java
@@ -19,6 +19,7 @@
package org.apache.crail.rpc;
import java.io.IOException;
+import java.net.InetAddress;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.crail.CrailNodeType;
@@ -135,6 +136,12 @@
}
@Override
+ public RpcFuture<RpcRemoveDataNode> removeDataNode(
+ InetAddress ipaddr, int port) throws Exception {
+ return connections[0].removeDataNode(ipaddr, port);
+ }
+
+ @Override
public void close() throws Exception {
for (RpcConnection connection : connections){
connection.close();
diff --git a/client/src/main/java/org/apache/crail/rpc/RpcErrors.java b/client/src/main/java/org/apache/crail/rpc/RpcErrors.java
index 367e772..199f130 100644
--- a/client/src/main/java/org/apache/crail/rpc/RpcErrors.java
+++ b/client/src/main/java/org/apache/crail/rpc/RpcErrors.java
@@ -55,7 +55,8 @@
public static short ERR_DIR_LOCATION_AFFINITY_MISMATCH = 26;
public static short ERR_ADD_BLOCK_FAILED = 27;
public static short ERR_CREATE_FILE_BUG = 28;
-
+ public static short ERR_DATANODE_STOP = 29;
+
static {
messages[ERR_OK] = "ERROR: No error, all fine";
messages[ERR_UNKNOWN] = "ERROR: Unknown error";
diff --git a/client/src/main/java/org/apache/crail/rpc/RpcRemoveDataNode.java b/client/src/main/java/org/apache/crail/rpc/RpcRemoveDataNode.java
new file mode 100644
index 0000000..ad3db57
--- /dev/null
+++ b/client/src/main/java/org/apache/crail/rpc/RpcRemoveDataNode.java
@@ -0,0 +1,5 @@
+package org.apache.crail.rpc;
+
+public interface RpcRemoveDataNode extends RpcResponse {
+ public short getData();
+}
diff --git a/rpc-darpc/src/main/java/org/apache/crail/namenode/rpc/darpc/DaRPCNameNodeConnection.java b/rpc-darpc/src/main/java/org/apache/crail/namenode/rpc/darpc/DaRPCNameNodeConnection.java
index 26409f7..7b53855 100644
--- a/rpc-darpc/src/main/java/org/apache/crail/namenode/rpc/darpc/DaRPCNameNodeConnection.java
+++ b/rpc-darpc/src/main/java/org/apache/crail/namenode/rpc/darpc/DaRPCNameNodeConnection.java
@@ -19,6 +19,7 @@
package org.apache.crail.namenode.rpc.darpc;
import java.io.IOException;
+import java.net.InetAddress;
import org.apache.crail.CrailNodeType;
import org.apache.crail.conf.CrailConstants;
@@ -35,6 +36,7 @@
import org.apache.crail.rpc.RpcGetFile;
import org.apache.crail.rpc.RpcGetLocation;
import org.apache.crail.rpc.RpcPing;
+import org.apache.crail.rpc.RpcRemoveDataNode;
import org.apache.crail.rpc.RpcProtocol;
import org.apache.crail.rpc.RpcRenameFile;
import org.apache.crail.rpc.RpcRequestMessage;
@@ -268,6 +270,23 @@
return nameNodeFuture;
}
+
+ @Override
+ public RpcFuture<RpcRemoveDataNode> removeDataNode(InetAddress ipaddr, int port) throws Exception {
+
+ RpcRequestMessage.RemoveDataNodeReq removeDataNodeReq = new RpcRequestMessage.RemoveDataNodeReq(ipaddr, port);
+ DaRPCNameNodeRequest request = new DaRPCNameNodeRequest(removeDataNodeReq);
+ request.setCommand(RpcProtocol.CMD_REMOVE_DATANODE);
+
+ RpcResponseMessage.RemoveDataNodeRes removeDataNodeRes = new RpcResponseMessage.RemoveDataNodeRes();
+ DaRPCNameNodeResponse response = new DaRPCNameNodeResponse(removeDataNodeRes);
+
+ DaRPCFuture<DaRPCNameNodeRequest, DaRPCNameNodeResponse> future = issueRPC(request, response);
+
+ DaRPCNameNodeFuture<RpcRemoveDataNode> nameNodeFuture = new DaRPCNameNodeFuture<RpcRemoveDataNode>(future, removeDataNodeRes);
+
+ return nameNodeFuture;
+ }
@Override
public void close() throws Exception {
diff --git a/rpc-darpc/src/main/java/org/apache/crail/namenode/rpc/darpc/DaRPCNameNodeRequest.java b/rpc-darpc/src/main/java/org/apache/crail/namenode/rpc/darpc/DaRPCNameNodeRequest.java
index f87ba9d..f496edf 100644
--- a/rpc-darpc/src/main/java/org/apache/crail/namenode/rpc/darpc/DaRPCNameNodeRequest.java
+++ b/rpc-darpc/src/main/java/org/apache/crail/namenode/rpc/darpc/DaRPCNameNodeRequest.java
@@ -45,6 +45,7 @@
private RpcRequestMessage.GetDataNodeReq getDataNodeReq;
private RpcRequestMessage.DumpNameNodeReq dumpNameNodeReq;
private RpcRequestMessage.PingNameNodeReq pingNameNodeReq;
+ private RpcRequestMessage.RemoveDataNodeReq removeDataNodeReq;
public DaRPCNameNodeRequest() {
this.cmd = 0;
@@ -60,6 +61,7 @@
this.dumpNameNodeReq = new RpcRequestMessage.DumpNameNodeReq();
this.pingNameNodeReq = new RpcRequestMessage.PingNameNodeReq();
this.getDataNodeReq = new RpcRequestMessage.GetDataNodeReq();
+ this.removeDataNodeReq = new RpcRequestMessage.RemoveDataNodeReq();
}
public DaRPCNameNodeRequest(RpcRequestMessage.CreateFileReq message) {
@@ -115,6 +117,11 @@
this.type = message.getType();
this.pingNameNodeReq = message;
}
+
+ public DaRPCNameNodeRequest(RpcRequestMessage.RemoveDataNodeReq message) {
+ this.type = message.getType();
+ this.removeDataNodeReq = message;
+ }
public void setCommand(short command) {
this.cmd = command;
@@ -163,6 +170,9 @@
case RpcProtocol.REQ_PING_NAMENODE:
written += pingNameNodeReq.write(buffer);
break;
+ case RpcProtocol.REQ_REMOVE_DATANODE:
+ written += removeDataNodeReq.write(buffer);
+ break;
}
return written;
@@ -206,6 +216,9 @@
case RpcProtocol.REQ_PING_NAMENODE:
pingNameNodeReq.update(buffer);
break;
+ case RpcProtocol.REQ_REMOVE_DATANODE:
+ removeDataNodeReq.update(buffer);
+ break;
}
}
@@ -260,4 +273,8 @@
public RpcRequestMessage.PingNameNodeReq pingNameNode(){
return this.pingNameNodeReq;
}
+
+ public RpcRequestMessage.RemoveDataNodeReq removeDataNode(){
+ return this.removeDataNodeReq;
+ }
}
diff --git a/rpc-darpc/src/main/java/org/apache/crail/namenode/rpc/darpc/DaRPCNameNodeResponse.java b/rpc-darpc/src/main/java/org/apache/crail/namenode/rpc/darpc/DaRPCNameNodeResponse.java
index a4e75f4..bd1d84b 100644
--- a/rpc-darpc/src/main/java/org/apache/crail/namenode/rpc/darpc/DaRPCNameNodeResponse.java
+++ b/rpc-darpc/src/main/java/org/apache/crail/namenode/rpc/darpc/DaRPCNameNodeResponse.java
@@ -40,6 +40,7 @@
private RpcResponseMessage.GetLocationRes getLocationRes;
private RpcResponseMessage.GetDataNodeRes getDataNodeRes;
private RpcResponseMessage.PingNameNodeRes pingNameNodeRes;
+ private RpcResponseMessage.RemoveDataNodeRes removeDataNodeRes;
public DaRPCNameNodeResponse() {
this.type = 0;
@@ -54,6 +55,7 @@
this.getLocationRes = new RpcResponseMessage.GetLocationRes();
this.getDataNodeRes = new RpcResponseMessage.GetDataNodeRes();
this.pingNameNodeRes = new RpcResponseMessage.PingNameNodeRes();
+ this.removeDataNodeRes = new RpcResponseMessage.RemoveDataNodeRes();
}
public DaRPCNameNodeResponse(RpcResponseMessage.VoidRes message) {
@@ -100,6 +102,11 @@
this.type = message.getType();
this.pingNameNodeRes = message;
}
+
+ public DaRPCNameNodeResponse(RpcResponseMessage.RemoveDataNodeRes message) {
+ this.type = message.getType();
+ this.removeDataNodeRes = message;
+ }
public void setType(short type) throws Exception {
this.type = type;
@@ -149,6 +156,11 @@
throw new Exception("Response type not set");
}
break;
+ case RpcProtocol.RES_REMOVE_DATANODE:
+ if (removeDataNodeRes == null){
+ throw new Exception("Response type not set");
+ }
+ break;
}
}
@@ -188,7 +200,10 @@
break;
case RpcProtocol.RES_PING_NAMENODE:
written += pingNameNodeRes.write(buffer);
- break;
+ break;
+ case RpcProtocol.RES_REMOVE_DATANODE:
+ written += removeDataNodeRes.write(buffer);
+ break;
}
return written;
@@ -235,6 +250,10 @@
pingNameNodeRes.update(buffer);
pingNameNodeRes.setError(error);
break;
+ case RpcProtocol.RES_REMOVE_DATANODE:
+ removeDataNodeRes.update(buffer);
+ removeDataNodeRes.setError(error);
+ break;
}
}
@@ -285,4 +304,8 @@
public RpcResponseMessage.PingNameNodeRes pingNameNode(){
return this.pingNameNodeRes;
}
+
+ public RpcResponseMessage.RemoveDataNodeRes removeDataNode(){
+ return this.removeDataNodeRes;
+ }
}
diff --git a/rpc-narpc/src/main/java/org/apache/crail/namenode/rpc/tcp/TcpNameNodeRequest.java b/rpc-narpc/src/main/java/org/apache/crail/namenode/rpc/tcp/TcpNameNodeRequest.java
index d22e09a..37e5503 100644
--- a/rpc-narpc/src/main/java/org/apache/crail/namenode/rpc/tcp/TcpNameNodeRequest.java
+++ b/rpc-narpc/src/main/java/org/apache/crail/namenode/rpc/tcp/TcpNameNodeRequest.java
@@ -45,6 +45,7 @@
private RpcRequestMessage.GetDataNodeReq getDataNodeReq;
private RpcRequestMessage.DumpNameNodeReq dumpNameNodeReq;
private RpcRequestMessage.PingNameNodeReq pingNameNodeReq;
+ private RpcRequestMessage.RemoveDataNodeReq removeDataNodeReq;
public TcpNameNodeRequest() {
this.cmd = 0;
@@ -60,7 +61,8 @@
this.dumpNameNodeReq = new RpcRequestMessage.DumpNameNodeReq();
this.pingNameNodeReq = new RpcRequestMessage.PingNameNodeReq();
this.getDataNodeReq = new RpcRequestMessage.GetDataNodeReq();
- }
+ this.removeDataNodeReq = new RpcRequestMessage.RemoveDataNodeReq();
+ }
public TcpNameNodeRequest(RpcRequestMessage.CreateFileReq message) {
this.type = message.getType();
@@ -115,7 +117,12 @@
this.type = message.getType();
this.pingNameNodeReq = message;
}
-
+
+ public TcpNameNodeRequest(RpcRequestMessage.RemoveDataNodeReq message) {
+ this.type = message.getType();
+ this.removeDataNodeReq = message;
+ }
+
public void setCommand(short command) {
this.cmd = command;
}
@@ -163,6 +170,9 @@
case RpcProtocol.REQ_PING_NAMENODE:
written += pingNameNodeReq.write(buffer);
break;
+ case RpcProtocol.REQ_REMOVE_DATANODE:
+ written += removeDataNodeReq.write(buffer);
+ break;
}
return written;
@@ -206,6 +216,9 @@
case RpcProtocol.REQ_PING_NAMENODE:
pingNameNodeReq.update(buffer);
break;
+ case RpcProtocol.REQ_REMOVE_DATANODE:
+ removeDataNodeReq.update(buffer);
+ break;
}
}
@@ -260,4 +273,8 @@
public RpcRequestMessage.PingNameNodeReq pingNameNode(){
return this.pingNameNodeReq;
}
+
+ public RpcRequestMessage.RemoveDataNodeReq removeDataNode(){
+ return this.removeDataNodeReq;
+ }
}
diff --git a/rpc-narpc/src/main/java/org/apache/crail/namenode/rpc/tcp/TcpNameNodeResponse.java b/rpc-narpc/src/main/java/org/apache/crail/namenode/rpc/tcp/TcpNameNodeResponse.java
index 73912df..8a01a19 100644
--- a/rpc-narpc/src/main/java/org/apache/crail/namenode/rpc/tcp/TcpNameNodeResponse.java
+++ b/rpc-narpc/src/main/java/org/apache/crail/namenode/rpc/tcp/TcpNameNodeResponse.java
@@ -43,6 +43,7 @@
private RpcResponseMessage.GetLocationRes getLocationRes;
private RpcResponseMessage.GetDataNodeRes getDataNodeRes;
private RpcResponseMessage.PingNameNodeRes pingNameNodeRes;
+ private RpcResponseMessage.RemoveDataNodeRes removeDataNodeRes;
public TcpNameNodeResponse() {
this.type = 0;
@@ -56,6 +57,7 @@
this.getLocationRes = new RpcResponseMessage.GetLocationRes();
this.getDataNodeRes = new RpcResponseMessage.GetDataNodeRes();
this.pingNameNodeRes = new RpcResponseMessage.PingNameNodeRes();
+ this.removeDataNodeRes = new RpcResponseMessage.RemoveDataNodeRes();
}
public TcpNameNodeResponse(RpcResponseMessage.VoidRes message) {
@@ -102,6 +104,11 @@
this.type = message.getType();
this.pingNameNodeRes = message;
}
+
+ public TcpNameNodeResponse(RpcResponseMessage.RemoveDataNodeRes message) {
+ this.type = message.getType();
+ this.removeDataNodeRes = message;
+ }
public void setType(short type) throws Exception {
this.type = type;
@@ -143,7 +150,10 @@
break;
case RpcProtocol.RES_PING_NAMENODE:
written += pingNameNodeRes.write(buffer);
- break;
+ break;
+ case RpcProtocol.RES_REMOVE_DATANODE:
+ written += removeDataNodeRes.write(buffer);
+ break;
}
return written;
@@ -189,7 +199,11 @@
case RpcProtocol.RES_PING_NAMENODE:
pingNameNodeRes.update(buffer);
pingNameNodeRes.setError(error);
- break;
+ break;
+ case RpcProtocol.RES_REMOVE_DATANODE:
+ removeDataNodeRes.update(buffer);
+ removeDataNodeRes.setError(error);
+ break;
}
}
@@ -240,4 +254,8 @@
public RpcResponseMessage.PingNameNodeRes pingNameNode(){
return this.pingNameNodeRes;
}
+
+ public RpcResponseMessage.RemoveDataNodeRes removeDataNode(){
+ return this.removeDataNodeRes;
+ }
}
diff --git a/rpc-narpc/src/main/java/org/apache/crail/namenode/rpc/tcp/TcpRpcConnection.java b/rpc-narpc/src/main/java/org/apache/crail/namenode/rpc/tcp/TcpRpcConnection.java
index decdf1c..6f96e4f 100644
--- a/rpc-narpc/src/main/java/org/apache/crail/namenode/rpc/tcp/TcpRpcConnection.java
+++ b/rpc-narpc/src/main/java/org/apache/crail/namenode/rpc/tcp/TcpRpcConnection.java
@@ -31,6 +31,7 @@
import org.slf4j.Logger;
import java.io.IOException;
+import java.net.InetAddress;
public class TcpRpcConnection implements RpcConnection {
static private final Logger LOG = CrailUtils.getLogger();
@@ -184,4 +185,15 @@
return new TcpFuture<RpcPing>(future, resp);
}
+ public RpcFuture<RpcRemoveDataNode> removeDataNode(InetAddress addr, int port) throws Exception {
+ RpcRequestMessage.RemoveDataNodeReq req = new RpcRequestMessage.RemoveDataNodeReq(addr, port);
+ RpcResponseMessage.RemoveDataNodeRes resp = new RpcResponseMessage.RemoveDataNodeRes();
+
+ TcpNameNodeRequest request = new TcpNameNodeRequest(req);
+ TcpNameNodeResponse response = new TcpNameNodeResponse(resp);
+ request.setCommand(RpcProtocol.CMD_REMOVE_DATANODE);
+ NaRPCFuture<TcpNameNodeRequest, TcpNameNodeResponse> future = endpoint.issueRequest(request, response);
+ return new TcpFuture<RpcRemoveDataNode>(future, resp);
+ }
+
}
\ No newline at end of file
diff --git a/rpc/src/main/java/org/apache/crail/rpc/RpcProtocol.java b/rpc/src/main/java/org/apache/crail/rpc/RpcProtocol.java
index d5a339d..10c32d3 100644
--- a/rpc/src/main/java/org/apache/crail/rpc/RpcProtocol.java
+++ b/rpc/src/main/java/org/apache/crail/rpc/RpcProtocol.java
@@ -40,6 +40,7 @@
public static final short CMD_DUMP_NAMENODE = 10;
public static final short CMD_PING_NAMENODE = 11;
public static final short CMD_GET_DATANODE = 12;
+ public static final short CMD_REMOVE_DATANODE = 13;
//request types
public static final short REQ_CREATE_FILE = 1;
@@ -53,7 +54,8 @@
public static final short REQ_DUMP_NAMENODE = 10;
public static final short REQ_PING_NAMENODE = 11;
public static final short REQ_GET_DATANODE = 12;
-
+ public static final short REQ_REMOVE_DATANODE = 13;
+
//response types
public static final short RES_VOID = 1;
public static final short RES_CREATE_FILE = 2;
@@ -64,6 +66,7 @@
public static final short RES_GET_LOCATION = 7;
public static final short RES_PING_NAMENODE = 9;
public static final short RES_GET_DATANODE = 10;
+ public static final short RES_REMOVE_DATANODE = 11;
static {
@@ -79,6 +82,7 @@
requestTypes[CMD_DUMP_NAMENODE] = REQ_DUMP_NAMENODE;
requestTypes[CMD_PING_NAMENODE] = REQ_PING_NAMENODE;
requestTypes[CMD_GET_DATANODE] = REQ_GET_DATANODE;
+ requestTypes[CMD_REMOVE_DATANODE] = REQ_REMOVE_DATANODE;
responseTypes[0] = 0;
responseTypes[CMD_CREATE_FILE] = RES_CREATE_FILE;
@@ -92,6 +96,7 @@
responseTypes[CMD_DUMP_NAMENODE] = RES_VOID;
responseTypes[CMD_PING_NAMENODE] = RES_PING_NAMENODE;
responseTypes[CMD_GET_DATANODE] = RES_GET_DATANODE;
+ responseTypes[CMD_REMOVE_DATANODE] = RES_REMOVE_DATANODE;
}
diff --git a/rpc/src/main/java/org/apache/crail/rpc/RpcRequestMessage.java b/rpc/src/main/java/org/apache/crail/rpc/RpcRequestMessage.java
index be043a2..7d1f414 100644
--- a/rpc/src/main/java/org/apache/crail/rpc/RpcRequestMessage.java
+++ b/rpc/src/main/java/org/apache/crail/rpc/RpcRequestMessage.java
@@ -18,6 +18,8 @@
package org.apache.crail.rpc;
+import java.io.IOException;
+import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
@@ -571,7 +573,63 @@
public void update(ByteBuffer buffer) {
op = buffer.getInt();
}
- }
+ }
+
+ public static class RemoveDataNodeReq implements RpcProtocol.NameNodeRpcMessage {
+ private InetAddress ipAddr;
+ private int port;
+
+ public RemoveDataNodeReq(){
+ this.ipAddr = null;
+ this.port = 0;
+ }
+
+ public RemoveDataNodeReq(InetAddress addr, int port){
+ this.ipAddr = addr;
+ this.port = port;
+ }
+
+ public int size() {
+ // sizeof(ip-addr) + sizeof(port)
+ return 4 + Integer.BYTES;
+ }
+
+ public short getType(){
+ return RpcProtocol.REQ_REMOVE_DATANODE;
+ }
+
+ public int write(ByteBuffer buffer) throws IOException {
+ int size = size();
+
+ checkSize(buffer.remaining());
+
+ buffer.put(this.getIPAddress().getAddress());
+ buffer.putInt(this.port());
+ return size;
+ }
+
+ private void checkSize(int remaining) throws IOException {
+ if(this.size() > remaining)
+ throw new IOException("Only " + remaining + " remaining bytes stored in buffer, however " + this.size() + " bytes are required");
+ }
+
+ public void update(ByteBuffer buffer) throws IOException {
+ checkSize(buffer.remaining());
+
+ byte[] b = new byte[4];
+ buffer.get(b);
+ this.ipAddr = InetAddress.getByAddress(b);
+ this.port = buffer.getInt();
+ }
+
+ public InetAddress getIPAddress(){
+ return this.ipAddr;
+ }
+
+ public int port(){
+ return this.port;
+ }
+ }
}
diff --git a/rpc/src/main/java/org/apache/crail/rpc/RpcResponseMessage.java b/rpc/src/main/java/org/apache/crail/rpc/RpcResponseMessage.java
index d4175af..32697ea 100644
--- a/rpc/src/main/java/org/apache/crail/rpc/RpcResponseMessage.java
+++ b/rpc/src/main/java/org/apache/crail/rpc/RpcResponseMessage.java
@@ -598,6 +598,10 @@
public void setFreeBlockCount(int blockCount) {
this.statistics.setFreeBlockCount(blockCount);
}
+
+ public void setStatus(short status) {
+ this.statistics.setStatus(status);
+ }
public short getError(){
return 0;
@@ -652,4 +656,49 @@
this.error = error;
}
}
+
+ public static class RemoveDataNodeRes implements RpcProtocol.NameNodeRpcMessage, RpcRemoveDataNode {
+ public static int CSIZE = Short.BYTES;
+
+ private short data;
+ private short error;
+
+ public RemoveDataNodeRes() {
+ this.data = 0;
+ this.error = 0;
+ }
+
+ public int size() {
+ return CSIZE;
+ }
+
+ public short getType(){
+ return RpcProtocol.RES_REMOVE_DATANODE;
+ }
+
+ public int write(ByteBuffer buffer) {
+ buffer.putShort(data);
+ return CSIZE;
+ }
+
+ public void update(ByteBuffer buffer) {
+ data = buffer.getShort();
+ }
+
+ public short getData(){
+ return data;
+ }
+
+ public void setData(short data) {
+ this.data = data;
+ }
+
+ public short getError(){
+ return error;
+ }
+
+ public void setError(short error) {
+ this.error = error;
+ }
+ }
}