Support for multiple namenodes
diff --git a/client/src/main/java/com/ibm/crail/conf/CrailConstants.java b/client/src/main/java/com/ibm/crail/conf/CrailConstants.java
index d97b43f..8369d19 100644
--- a/client/src/main/java/com/ibm/crail/conf/CrailConstants.java
+++ b/client/src/main/java/com/ibm/crail/conf/CrailConstants.java
@@ -22,6 +22,7 @@
package com.ibm.crail.conf;
import java.io.IOException;
+
import org.slf4j.Logger;
import com.ibm.crail.utils.CrailUtils;
@@ -30,7 +31,7 @@
private static final Logger LOG = CrailUtils.getLogger();
public static final String VERSION_KEY = "crail.version";
- public static int VERSION = 2900;
+ public static int VERSION = 2990;
public static final String DIRECTORY_DEPTH_KEY = "crail.directorydepth";
public static int DIRECTORY_DEPTH = 16;
@@ -102,6 +103,9 @@
public static final String NAMENODE_RPC_TYPE_KEY = "crail.namenode.rpctype";
public static String NAMENODE_RPC_TYPE = "com.ibm.crail.namenode.rpc.darpc.DaRPCNameNode";
+ public static final String NAMENODE_RPC_SERVICE_KEY = "crail.namenode.rpcservice";
+ public static String NAMENODE_RPC_SERVICE = "com.ibm.crail.namenode.NameNodeService";
+
//storage interface
public static final String STORAGE_TYPES_KEY = "crail.storage.types";
public static String STORAGE_TYPES = "com.ibm.crail.storage.rdma.RdmaStorageTier";
diff --git a/client/src/main/java/com/ibm/crail/core/CoreFileSystem.java b/client/src/main/java/com/ibm/crail/core/CoreFileSystem.java
index ae3a7c8..d178995 100644
--- a/client/src/main/java/com/ibm/crail/core/CoreFileSystem.java
+++ b/client/src/main/java/com/ibm/crail/core/CoreFileSystem.java
@@ -29,6 +29,7 @@
import java.util.LinkedList;
import java.util.StringTokenizer;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -53,6 +54,7 @@
import com.ibm.crail.metadata.DataNodeInfo;
import com.ibm.crail.metadata.FileInfo;
import com.ibm.crail.metadata.FileName;
+import com.ibm.crail.rpc.RpcDispatcher;
import com.ibm.crail.rpc.RpcErrors;
import com.ibm.crail.rpc.RpcClient;
import com.ibm.crail.rpc.RpcConnection;
@@ -77,8 +79,8 @@
private static AtomicInteger fsCount = new AtomicInteger(0);
//namenode operations
- private RpcClient rpcNameNode;
- private RpcConnection namenodeClientRpc;
+ private RpcClient rpcClient;
+ private RpcConnection rpcConnection;
//datanode operations
private EndpointCache datanodeEndpointCache;
@@ -121,11 +123,22 @@
//Namenode
InetSocketAddress nnAddr = CrailUtils.getNameNodeAddress();
- this.rpcNameNode = RpcClient.createInstance(CrailConstants.NAMENODE_RPC_TYPE);
- rpcNameNode.init(conf, null);
- rpcNameNode.printConf(LOG);
- this.namenodeClientRpc = rpcNameNode.connect(nnAddr);
- LOG.info("connected to namenode at " + nnAddr);
+ this.rpcClient = RpcClient.createInstance(CrailConstants.NAMENODE_RPC_TYPE);
+ rpcClient.init(conf, null);
+ rpcClient.printConf(LOG);
+ ConcurrentLinkedQueue<InetSocketAddress> namenodeList = CrailUtils.getNameNodeList();
+ ConcurrentLinkedQueue<RpcConnection> connectionList = new ConcurrentLinkedQueue<RpcConnection>();
+ while(!namenodeList.isEmpty()){
+ InetSocketAddress address = namenodeList.poll();
+ RpcConnection connection = rpcClient.connect(address);
+ connectionList.add(connection);
+ }
+ if (connectionList.size() == 1){
+ this.rpcConnection = connectionList.poll();
+ } else {
+ this.rpcConnection = new RpcDispatcher(connectionList);
+ }
+ LOG.info("connected to namenode(s) " + rpcConnection);
//Client
this.fsId = fsCount.getAndIncrement();
@@ -159,7 +172,7 @@
LOG.info("createNode: name " + path + ", type " + type + ", storageAffinity " + storageClass + ", locationAffinity " + locationClass);
}
- RpcFuture<RpcCreateFile> fileRes = namenodeClientRpc.createFile(name, type, storageClass.value(), locationClass.value());
+ RpcFuture<RpcCreateFile> fileRes = rpcConnection.createFile(name, type, storageClass.value(), locationClass.value());
return new CreateNodeFuture(this, path, type, fileRes);
}
@@ -214,7 +227,7 @@
LOG.info("lookupDirectory: path " + path);
}
- RpcFuture<RpcGetFile> fileRes = namenodeClientRpc.getFile(name, false);
+ RpcFuture<RpcGetFile> fileRes = rpcConnection.getFile(name, false);
return new LookupNodeFuture(this, path, fileRes);
}
@@ -251,7 +264,7 @@
LOG.info("rename: srcname " + src + ", dstname " + dst);
}
- RpcFuture<RpcRenameFile> renameRes = namenodeClientRpc.renameFile(srcPath, dstPath);
+ RpcFuture<RpcRenameFile> renameRes = rpcConnection.renameFile(srcPath, dstPath);
return new RenameNodeFuture(this, src, dst, renameRes);
}
@@ -319,7 +332,7 @@
LOG.info("delete: name " + path + ", recursive " + recursive);
}
- RpcFuture<RpcDeleteFile> fileRes = namenodeClientRpc.removeFile(name, recursive);
+ RpcFuture<RpcDeleteFile> fileRes = rpcConnection.removeFile(name, recursive);
return new DeleteNodeFuture(this, path, recursive, fileRes);
}
@@ -364,7 +377,7 @@
LOG.info("getDirectoryList: " + name);
}
- RpcGetFile fileRes = namenodeClientRpc.getFile(directory, false).get(CrailConstants.RPC_TIMEOUT, TimeUnit.MILLISECONDS);
+ RpcGetFile fileRes = rpcConnection.getFile(directory, false).get(CrailConstants.RPC_TIMEOUT, TimeUnit.MILLISECONDS);
if (fileRes.getError() != RpcErrors.ERR_OK) {
LOG.info("getDirectoryList: " + RpcErrors.messages[fileRes.getError()]);
throw new FileNotFoundException(RpcErrors.messages[fileRes.getError()]);
@@ -408,7 +421,7 @@
HashMap<Long, DataNodeInfo> offset2DataNode = new HashMap<Long, DataNodeInfo>();
for (long current = CrailUtils.blockStartAddress(start); current < start + len; current += CrailConstants.BLOCK_SIZE){
- RpcGetLocation getLocationRes = namenodeClientRpc.getLocation(name, current).get(CrailConstants.RPC_TIMEOUT, TimeUnit.MILLISECONDS);
+ RpcGetLocation getLocationRes = rpcConnection.getLocation(name, current).get(CrailConstants.RPC_TIMEOUT, TimeUnit.MILLISECONDS);
if (getLocationRes.getError() != RpcErrors.ERR_OK) {
LOG.info("location: " + RpcErrors.messages[getLocationRes.getError()]);
throw new IOException(RpcErrors.messages[getLocationRes.getError()]);
@@ -475,11 +488,11 @@
}
public void dumpNameNode() throws Exception {
- namenodeClientRpc.dumpNameNode().get(CrailConstants.RPC_TIMEOUT, TimeUnit.MILLISECONDS);
+ rpcConnection.dumpNameNode().get(CrailConstants.RPC_TIMEOUT, TimeUnit.MILLISECONDS);
}
public void ping() throws Exception {
- RpcPing pingRes = namenodeClientRpc.pingNameNode().get(CrailConstants.RPC_TIMEOUT, TimeUnit.MILLISECONDS);
+ RpcPing pingRes = rpcConnection.pingNameNode().get(CrailConstants.RPC_TIMEOUT, TimeUnit.MILLISECONDS);
if (pingRes.getError() != RpcErrors.ERR_OK) {
LOG.info("Ping: " + RpcErrors.messages[pingRes.getError()]);
throw new IOException(RpcErrors.messages[pingRes.getError()]);
@@ -541,13 +554,14 @@
bufferCache.close();
datanodeEndpointCache.close();
- rpcNameNode.close();
+ rpcConnection.close();
+ rpcClient.close();
this.isOpen = false;
}
public void closeFile(FileInfo fileInfo) throws Exception {
if (fileInfo.getToken() > 0){
- namenodeClientRpc.setFile(fileInfo, true).get(CrailConstants.RPC_TIMEOUT, TimeUnit.MILLISECONDS);
+ rpcConnection.setFile(fileInfo, true).get(CrailConstants.RPC_TIMEOUT, TimeUnit.MILLISECONDS);
}
}
@@ -633,7 +647,7 @@
}
RpcConnection getNamenodeClientRpc() {
- return namenodeClientRpc;
+ return rpcConnection;
}
EndpointCache getDatanodeEndpointCache() {
diff --git a/client/src/main/java/com/ibm/crail/metadata/DataNodeStatistics.java b/client/src/main/java/com/ibm/crail/metadata/DataNodeStatistics.java
index 4e9ea3b..8662cb0 100644
--- a/client/src/main/java/com/ibm/crail/metadata/DataNodeStatistics.java
+++ b/client/src/main/java/com/ibm/crail/metadata/DataNodeStatistics.java
@@ -25,24 +25,24 @@
import java.nio.ByteBuffer;
public class DataNodeStatistics {
- public static final int CSIZE = 4;
+ public static final int CSIZE = 12;
+ private long serviceId;
private int freeBlockCount;
public DataNodeStatistics(){
+ this.serviceId = 0;
this.freeBlockCount = 0;
}
- public DataNodeStatistics(int freeBlockCount){
- this.freeBlockCount = freeBlockCount;
- }
-
public int write(ByteBuffer buffer){
+ buffer.putLong(serviceId);
buffer.putInt(freeBlockCount);
return CSIZE;
}
public void update(ByteBuffer buffer) throws UnknownHostException {
+ this.serviceId = buffer.getLong();
this.freeBlockCount = buffer.getInt();
}
@@ -55,6 +55,15 @@
}
public void setStatistics(DataNodeStatistics statistics) {
+ this.serviceId = statistics.getServiceId();
this.freeBlockCount = statistics.getFreeBlockCount();
+ }
+
+ public void setServiceId(long serviceId) {
+ this.serviceId = serviceId;
}
+
+ public long getServiceId(){
+ return serviceId;
+ }
}
diff --git a/client/src/main/java/com/ibm/crail/rpc/RpcConnection.java b/client/src/main/java/com/ibm/crail/rpc/RpcConnection.java
index e4e37c6..3a4c619 100644
--- a/client/src/main/java/com/ibm/crail/rpc/RpcConnection.java
+++ b/client/src/main/java/com/ibm/crail/rpc/RpcConnection.java
@@ -63,6 +63,8 @@
public abstract RpcFuture<RpcPing> pingNameNode()
throws Exception;
+ public abstract void close() throws Exception;
+
@SuppressWarnings("unchecked")
public static RpcConnection createInstance(String name) throws Exception {
Class<?> nodeClass = Class.forName(name);
diff --git a/client/src/main/java/com/ibm/crail/rpc/RpcDispatcher.java b/client/src/main/java/com/ibm/crail/rpc/RpcDispatcher.java
new file mode 100644
index 0000000..ad92993
--- /dev/null
+++ b/client/src/main/java/com/ibm/crail/rpc/RpcDispatcher.java
@@ -0,0 +1,137 @@
+package com.ibm.crail.rpc;
+
+import java.io.IOException;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import org.slf4j.Logger;
+
+import com.ibm.crail.CrailNodeType;
+import com.ibm.crail.metadata.BlockInfo;
+import com.ibm.crail.metadata.DataNodeInfo;
+import com.ibm.crail.metadata.FileInfo;
+import com.ibm.crail.metadata.FileName;
+import com.ibm.crail.utils.CrailUtils;
+
+public class RpcDispatcher implements RpcConnection {
+ private static final Logger LOG = CrailUtils.getLogger();
+
+ private RpcConnection[] connections;
+ private int setBlockIndex;
+ private int getDataNodeIndex;
+
+ public RpcDispatcher(ConcurrentLinkedQueue<RpcConnection> connectionList) {
+ connections = new RpcConnection[connectionList.size()];
+ for (int i = 0; i < connections.length; i++){
+ connections[i] = connectionList.poll();
+ }
+ this.setBlockIndex = 0;
+ this.getDataNodeIndex = 0;
+ }
+
+ @Override
+ public RpcFuture<RpcCreateFile> createFile(FileName filename,
+ CrailNodeType type, int storageClass, int locationClass)
+ throws IOException {
+ int index = filename.getComponent(0) % connections.length;
+// LOG.info("issuing create file for filename [" + filename.toString() + "], on index " + index);
+ return connections[index].createFile(filename, type, storageClass, locationClass);
+ }
+
+ @Override
+ public RpcFuture<RpcGetFile> getFile(FileName filename, boolean writeable)
+ throws IOException {
+ int index = filename.getComponent(0) % connections.length;
+// LOG.info("issuing get file for filename [" + filename.toString() + "], on index " + index);
+ return connections[index].getFile(filename, writeable);
+ }
+
+ @Override
+ public RpcFuture<RpcVoid> setFile(FileInfo fileInfo, boolean close)
+ throws IOException {
+ long connectionsLength = (long) connections.length;
+ long _index = fileInfo.getFd() % connectionsLength;
+ int index = (int) _index;
+// LOG.info("issuing set file for fd [" + fileInfo.getFd() + "], on index " + index);
+ return connections[index].setFile(fileInfo, close);
+ }
+
+ @Override
+ public RpcFuture<RpcDeleteFile> removeFile(FileName filename,
+ boolean recursive) throws IOException {
+ int index = filename.getComponent(0) % connections.length;
+// LOG.info("issuing remove file for filename [" + filename.toString() + "], on index " + index);
+ return connections[index].removeFile(filename, recursive);
+ }
+
+ @Override
+ public RpcFuture<RpcRenameFile> renameFile(FileName srcHash,
+ FileName dstHash) throws IOException {
+ int srcIndex = srcHash.getComponent(0) % connections.length;
+ int dstIndex = srcHash.getComponent(0) % connections.length;
+// LOG.info("issuing remove file for src [" + srcHash.toString() + "," + dstHash.toString() + "], on index " + srcIndex);
+ if (srcIndex != dstIndex){
+ throw new IOException("Rename not supported across namenode domains");
+ } else {
+ return connections[srcIndex].renameFile(srcHash, dstHash);
+ }
+ }
+
+ @Override
+ public RpcFuture<RpcGetBlock> getBlock(long fd, long token, long position,
+ long capacity) throws IOException {
+ long connectionsLength = (long) connections.length;
+ long _index = fd % connectionsLength;
+ int index = (int) _index;
+// LOG.info("issuing get block for fd [" + fd + "], on index " + index);
+ return connections[index].getBlock(fd, token, position, capacity);
+ }
+
+ @Override
+ public RpcFuture<RpcGetLocation> getLocation(FileName fileName,
+ long position) throws IOException {
+ int index = fileName.getComponent(0) % connections.length;
+// LOG.info("issuing get location for filename [" + fileName.toString() + "], on index " + index);
+ return connections[index].getLocation(fileName, position);
+ }
+
+ @Override
+ public RpcFuture<RpcVoid> setBlock(BlockInfo blockInfo) throws Exception {
+// LOG.info("issuing set block on index " + setBlockIndex);
+ RpcFuture<RpcVoid> res = connections[setBlockIndex].setBlock(blockInfo);
+ setBlockIndex = (setBlockIndex + 1) % connections.length;
+ return res;
+ }
+
+ @Override
+ public RpcFuture<RpcGetDataNode> getDataNode(DataNodeInfo dnInfo)
+ throws Exception {
+// LOG.info("issuing get datanode on index " + getDataNodeIndex);
+ RpcFuture<RpcGetDataNode> res = connections[getDataNodeIndex].getDataNode(dnInfo);
+ getDataNodeIndex = (getDataNodeIndex + 1) % connections.length;
+ return res;
+ }
+
+ @Override
+ public RpcFuture<RpcVoid> dumpNameNode() throws Exception {
+ return connections[0].dumpNameNode();
+ }
+
+ @Override
+ public RpcFuture<RpcPing> pingNameNode() throws Exception {
+ return connections[0].pingNameNode();
+ }
+
+ @Override
+ public String toString() {
+ String address = "";
+ for (RpcConnection connection : connections){
+ address = address + ", " + connection.toString();
+ }
+
+ return address;
+ }
+
+ @Override
+ public void close() throws Exception {
+ connections[0].close();
+ }
+}
diff --git a/client/src/main/java/com/ibm/crail/utils/CrailUtils.java b/client/src/main/java/com/ibm/crail/utils/CrailUtils.java
index 25baafa..bf77510 100644
--- a/client/src/main/java/com/ibm/crail/utils/CrailUtils.java
+++ b/client/src/main/java/com/ibm/crail/utils/CrailUtils.java
@@ -30,6 +30,8 @@
import java.nio.ByteBuffer;
import java.util.StringTokenizer;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.LinkedBlockingQueue;
import org.slf4j.LoggerFactory;
import org.slf4j.Logger;
@@ -54,11 +56,81 @@
}
public static InetSocketAddress getNameNodeAddress() {
- URI uri = URI.create(CrailConstants.NAMENODE_ADDRESS);
- InetSocketAddress nnAddr = createSocketAddrForHost(uri.getHost(), uri.getPort());
+ StringTokenizer tupleTokenizer = new StringTokenizer(CrailConstants.NAMENODE_ADDRESS, ",");
+ LinkedBlockingQueue<URI> namenodes = new LinkedBlockingQueue<URI>();
+ while(tupleTokenizer.hasMoreTokens()){
+ String address = tupleTokenizer.nextToken();
+ URI uri = URI.create(address);
+ namenodes.add(uri);
+ }
+
+ URI master = namenodes.poll();
+ InetSocketAddress nnAddr = createSocketAddrForHost(master.getHost(), master.getPort());
return nnAddr;
}
+ public static URI getPrimaryNameNode() {
+ StringTokenizer tupleTokenizer = new StringTokenizer(CrailConstants.NAMENODE_ADDRESS, ",");
+ LinkedBlockingQueue<URI> namenodes = new LinkedBlockingQueue<URI>();
+ while(tupleTokenizer.hasMoreTokens()){
+ String address = tupleTokenizer.nextToken();
+ URI uri = URI.create(address);
+ namenodes.add(uri);
+ }
+
+ URI master = namenodes.poll();
+ return master;
+ }
+
+ public static boolean verifyNamenode(String namenode) {
+ StringTokenizer tupleTokenizer = new StringTokenizer(CrailConstants.NAMENODE_ADDRESS, ",");
+ ConcurrentHashMap<String, Object> namenodes = new ConcurrentHashMap<String, Object>();
+ while(tupleTokenizer.hasMoreTokens()){
+ String address = tupleTokenizer.nextToken();
+ URI uri = URI.create(address);
+ String node = uri.getHost() + ":" + uri.getPort();
+ namenodes.put(node, node);
+ }
+
+ URI uri = URI.create(namenode);
+ String node = uri.getHost() + ":" + uri.getPort();
+ return namenodes.containsKey(node);
+ }
+
+ public static ConcurrentLinkedQueue<InetSocketAddress> getNameNodeList() {
+ StringTokenizer tupleTokenizer = new StringTokenizer(CrailConstants.NAMENODE_ADDRESS, ",");
+ ConcurrentLinkedQueue<InetSocketAddress> namenodes = new ConcurrentLinkedQueue<InetSocketAddress>();
+ while(tupleTokenizer.hasMoreTokens()){
+ String token = tupleTokenizer.nextToken();
+ URI uri = URI.create(token);
+ InetSocketAddress address = createSocketAddrForHost(uri.getHost(), uri.getPort());
+ namenodes.add(address);
+ }
+ return namenodes;
+ }
+
+ public static long getServiceId(String namenode) {
+ StringTokenizer tupleTokenizer = new StringTokenizer(CrailConstants.NAMENODE_ADDRESS, ",");
+ ConcurrentHashMap<String, Long> namenodes = new ConcurrentHashMap<String, Long>();
+ long serviceId = 0;
+ while(tupleTokenizer.hasMoreTokens()){
+ String address = tupleTokenizer.nextToken();
+ URI uri = URI.create(address);
+ String node = uri.getHost() + ":" + uri.getPort();
+ namenodes.put(node, serviceId++);
+ }
+
+ URI uri = URI.create(namenode);
+ String node = uri.getHost() + ":" + uri.getPort();
+ long id = namenodes.get(node);
+ return id;
+ }
+
+ public static long getServiceSize() {
+ StringTokenizer tupleTokenizer = new StringTokenizer(CrailConstants.NAMENODE_ADDRESS, ",");
+ return tupleTokenizer.countTokens();
+ }
+
public static final long blockStartAddress(long offset) {
long blockCount = offset / CrailConstants.BLOCK_SIZE;
return blockCount*CrailConstants.BLOCK_SIZE;
@@ -205,4 +277,5 @@
}
return address;
}
+
}
diff --git a/namenode/src/main/java/com/ibm/crail/namenode/AbstractNode.java b/namenode/src/main/java/com/ibm/crail/namenode/AbstractNode.java
index b70f23e..8c0bb94 100644
--- a/namenode/src/main/java/com/ibm/crail/namenode/AbstractNode.java
+++ b/namenode/src/main/java/com/ibm/crail/namenode/AbstractNode.java
@@ -21,24 +21,19 @@
package com.ibm.crail.namenode;
-import java.io.IOException;
import java.net.UnknownHostException;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
-
import com.ibm.crail.CrailNodeType;
-import com.ibm.crail.CrailStorageClass;
import com.ibm.crail.conf.CrailConstants;
import com.ibm.crail.metadata.BlockInfo;
import com.ibm.crail.metadata.FileInfo;
-import com.ibm.crail.metadata.FileName;
public abstract class AbstractNode extends FileInfo implements Delayed {
- private static AtomicLong fdcount = new AtomicLong(0);
-
+// private static AtomicLong fdcount = new AtomicLong(0);
private int fileComponent;
private AtomicLong dirOffsetCounter;
private ConcurrentHashMap<Integer, AbstractNode> children;
@@ -46,22 +41,8 @@
private int storageClass;
private int locationClass;
- public static AbstractNode createRoot() throws IOException {
- return new DirectoryBlocks(new FileName("/").getFileComponent(), CrailNodeType.DIRECTORY, CrailConstants.STORAGE_ROOTCLASS, 0);
- }
-
- public static AbstractNode createNode(int fileComponent, CrailNodeType type, int storageClass, int locationClass) throws IOException {
- if (type == CrailNodeType.DIRECTORY){
- return new DirectoryBlocks(fileComponent, CrailNodeType.DIRECTORY, storageClass, locationClass);
- } else if (type == CrailNodeType.MULTIFILE){
- return new DirectoryBlocks(fileComponent, CrailNodeType.MULTIFILE, storageClass, locationClass);
- } else {
- return new FileBlocks(fileComponent, CrailNodeType.DATAFILE, storageClass, locationClass);
- }
- }
-
- public AbstractNode(int fileComponent, CrailNodeType type, int storageClass, int locationAffinity){
- super(fdcount.incrementAndGet(), type);
+ public AbstractNode(long fd, int fileComponent, CrailNodeType type, int storageClass, int locationAffinity){
+ super(fd, type);
this.fileComponent = fileComponent;
this.storageClass = storageClass;
diff --git a/namenode/src/main/java/com/ibm/crail/namenode/DirectoryBlocks.java b/namenode/src/main/java/com/ibm/crail/namenode/DirectoryBlocks.java
index 002ab48..bfcfd54 100644
--- a/namenode/src/main/java/com/ibm/crail/namenode/DirectoryBlocks.java
+++ b/namenode/src/main/java/com/ibm/crail/namenode/DirectoryBlocks.java
@@ -31,8 +31,8 @@
public class DirectoryBlocks extends AbstractNode {
private ConcurrentHashMap<Integer, BlockInfo> blocks;
- DirectoryBlocks(int fileComponent, CrailNodeType type, int storageClass, int locationClass) {
- super(fileComponent, type, storageClass, locationClass);
+ DirectoryBlocks(long fd, int fileComponent, CrailNodeType type, int storageClass, int locationClass) {
+ super(fd, fileComponent, type, storageClass, locationClass);
this.blocks = new ConcurrentHashMap<Integer, BlockInfo>();
}
diff --git a/namenode/src/main/java/com/ibm/crail/namenode/FileBlocks.java b/namenode/src/main/java/com/ibm/crail/namenode/FileBlocks.java
index 9b2b348..1884143 100644
--- a/namenode/src/main/java/com/ibm/crail/namenode/FileBlocks.java
+++ b/namenode/src/main/java/com/ibm/crail/namenode/FileBlocks.java
@@ -37,8 +37,8 @@
private final Lock readLock;
private final Lock writeLock;
- public FileBlocks(int fileComponent, CrailNodeType type, int storageClass, int locationClass) {
- super(fileComponent, type, storageClass, locationClass);
+ public FileBlocks(long fd, int fileComponent, CrailNodeType type, int storageClass, int locationClass) {
+ super(fd, fileComponent, type, storageClass, locationClass);
this.blocks = new ArrayList<BlockInfo>(CrailConstants.NAMENODE_FILEBLOCKS);
this.lock = new ReentrantReadWriteLock();
this.readLock = lock.readLock();
diff --git a/namenode/src/main/java/com/ibm/crail/namenode/FileStore.java b/namenode/src/main/java/com/ibm/crail/namenode/FileStore.java
index 7d44631..30d2687 100644
--- a/namenode/src/main/java/com/ibm/crail/namenode/FileStore.java
+++ b/namenode/src/main/java/com/ibm/crail/namenode/FileStore.java
@@ -23,18 +23,31 @@
import java.io.IOException;
+import com.ibm.crail.CrailNodeType;
import com.ibm.crail.conf.CrailConstants;
import com.ibm.crail.metadata.FileName;
import com.ibm.crail.rpc.RpcErrors;
import com.ibm.crail.rpc.RpcNameNodeState;
public class FileStore {
+ private Sequencer sequencer;
private AbstractNode root;
- public FileStore() throws IOException {
- this.root = DirectoryBlocks.createRoot();
+ public FileStore(Sequencer sequencer) throws IOException {
+ this.sequencer = sequencer;
+ this.root = createNode(new FileName("/").getFileComponent(), CrailNodeType.DIRECTORY, CrailConstants.STORAGE_ROOTCLASS, 0);
}
+ public AbstractNode createNode(int fileComponent, CrailNodeType type, int storageClass, int locationClass) throws IOException {
+ if (type == CrailNodeType.DIRECTORY){
+ return new DirectoryBlocks(sequencer.getNextId(), fileComponent, CrailNodeType.DIRECTORY, storageClass, locationClass);
+ } else if (type == CrailNodeType.MULTIFILE){
+ return new DirectoryBlocks(sequencer.getNextId(), fileComponent, CrailNodeType.MULTIFILE, storageClass, locationClass);
+ } else {
+ return new FileBlocks(sequencer.getNextId(), fileComponent, CrailNodeType.DATAFILE, storageClass, locationClass);
+ }
+ }
+
public AbstractNode retrieveFile(FileName filename, RpcNameNodeState error) throws Exception{
return retrieveFileInternal(filename, filename.getLength(), error);
}
diff --git a/namenode/src/main/java/com/ibm/crail/namenode/NameNode.java b/namenode/src/main/java/com/ibm/crail/namenode/NameNode.java
index 157fad6..f02dc00 100644
--- a/namenode/src/main/java/com/ibm/crail/namenode/NameNode.java
+++ b/namenode/src/main/java/com/ibm/crail/namenode/NameNode.java
@@ -21,38 +21,73 @@
package com.ibm.crail.namenode;
-import java.util.concurrent.DelayQueue;
-
+import java.net.URI;
+import java.util.Arrays;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
import org.slf4j.Logger;
import com.ibm.crail.conf.CrailConfiguration;
import com.ibm.crail.conf.CrailConstants;
import com.ibm.crail.rpc.RpcBinding;
+import com.ibm.crail.rpc.RpcNameNodeService;
import com.ibm.crail.utils.CrailUtils;
public class NameNode {
private static final Logger LOG = CrailUtils.getLogger();
public static void main(String args[]) throws Exception {
- CrailConfiguration conf = new CrailConfiguration();
-
LOG.info("initalizing namenode ");
+ CrailConfiguration conf = new CrailConfiguration();
CrailConstants.updateConstants(conf);
+
+ URI uri = CrailUtils.getPrimaryNameNode();
+ String address = uri.getHost();
+ int port = uri.getPort();
+
+ if (args != null) {
+ Option addressOption = Option.builder("a").desc("ip address namenode is started on").hasArg().build();
+ Option portOption = Option.builder("p").desc("port namenode is started on").hasArg().build();
+ Options options = new Options();
+ options.addOption(portOption);
+ options.addOption(addressOption);
+ CommandLineParser parser = new DefaultParser();
+
+ try {
+ CommandLine line = parser.parse(options, Arrays.copyOfRange(args, 0, args.length));
+ if (line.hasOption(addressOption.getOpt())) {
+ address = line.getOptionValue(addressOption.getOpt());
+ }
+ if (line.hasOption(portOption.getOpt())) {
+ port = Integer.parseInt(line.getOptionValue(portOption.getOpt()));
+ }
+ } catch (ParseException e) {
+ HelpFormatter formatter = new HelpFormatter();
+ formatter.printHelp("Namenode", options);
+ System.exit(-1);
+ }
+ }
+
+ String namenode = "crail://" + address + ":" + port;
+ long serviceId = CrailUtils.getServiceId(namenode);
+ long serviceSize = CrailUtils.getServiceSize();
+ if (!CrailUtils.verifyNamenode(namenode)){
+ throw new Exception("Namenode address/port [" + namenode + "] has to be listed in crail.namenode.address " + CrailConstants.NAMENODE_ADDRESS);
+ }
+
+ CrailConstants.NAMENODE_ADDRESS = namenode + "?id=" + serviceId + "&size=" + serviceSize;
CrailConstants.printConf();
CrailConstants.verify();
- DelayQueue<AbstractNode> deleteQueue = new DelayQueue<AbstractNode>();
- NameNodeService service = new NameNodeService(deleteQueue);
-
+ RpcNameNodeService service = RpcNameNodeService.createInstance(CrailConstants.NAMENODE_RPC_SERVICE);
RpcBinding rpcBinding = RpcBinding.createInstance(CrailConstants.NAMENODE_RPC_TYPE);
rpcBinding.init(conf, null);
rpcBinding.printConf(LOG);
-
- GCServer gcServer = new GCServer(service, deleteQueue);
-
- Thread gc = new Thread(gcServer);
- gc.start();
-
rpcBinding.run(service);
System.exit(0);;
}
diff --git a/namenode/src/main/java/com/ibm/crail/namenode/NameNodeService.java b/namenode/src/main/java/com/ibm/crail/namenode/NameNodeService.java
index b405de5..3aac619 100644
--- a/namenode/src/main/java/com/ibm/crail/namenode/NameNodeService.java
+++ b/namenode/src/main/java/com/ibm/crail/namenode/NameNodeService.java
@@ -22,8 +22,11 @@
package com.ibm.crail.namenode;
import java.io.IOException;
+import java.net.URI;
+import java.util.StringTokenizer;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.DelayQueue;
+import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
@@ -41,26 +44,42 @@
import com.ibm.crail.rpc.RpcResponseMessage;
import com.ibm.crail.utils.CrailUtils;
-public class NameNodeService implements RpcNameNodeService {
+public class NameNodeService implements RpcNameNodeService, Sequencer {
private static final Logger LOG = CrailUtils.getLogger();
//data structures for datanodes, blocks, files
+ private long serviceId;
+ private long serviceSize;
+ private AtomicLong sequenceId;
private BlockStore blockStore;
private DelayQueue<AbstractNode> deleteQueue;
private FileStore fileTree;
private ConcurrentHashMap<Long, AbstractNode> fileTable;
-
+ private GCServer gcServer;
- NameNodeService(DelayQueue<AbstractNode> deleteQueue) throws IOException {
+ public NameNodeService() throws IOException {
+ URI uri = URI.create(CrailConstants.NAMENODE_ADDRESS);
+ String query = uri.getRawQuery();
+ StringTokenizer tokenizer = new StringTokenizer(query, "&");
+ this.serviceId = Long.parseLong(tokenizer.nextToken().substring(3));
+ this.serviceSize = Long.parseLong(tokenizer.nextToken().substring(5));
+ this.sequenceId = new AtomicLong(serviceId);
this.blockStore = new BlockStore();
- this.deleteQueue = deleteQueue;
- this.fileTree = new FileStore();
+ this.deleteQueue = new DelayQueue<AbstractNode>();
+ this.fileTree = new FileStore(this);
this.fileTable = new ConcurrentHashMap<Long, AbstractNode>();
+ this.gcServer = new GCServer(this, deleteQueue);
AbstractNode root = fileTree.getRoot();
fileTable.put(root.getFd(), root);
+ Thread gc = new Thread(gcServer);
+ gc.start();
}
+ public long getNextId(){
+ return sequenceId.getAndAdd(serviceSize);
+ }
+
@Override
public short createFile(RpcRequestMessage.CreateFileReq request, RpcResponseMessage.CreateFileRes response, RpcNameNodeState errorState) throws Exception {
//check protocol
@@ -99,7 +118,7 @@
locationClass = parentInfo.getLocationClass();
}
- AbstractNode fileInfo = FileBlocks.createNode(fileHash.getFileComponent(), type, storageClass, locationClass);
+ AbstractNode fileInfo = fileTree.createNode(fileHash.getFileComponent(), type, storageClass, locationClass);
if (!parentInfo.addChild(fileInfo)){
return RpcErrors.ERR_FILE_EXISTS;
}
@@ -387,6 +406,7 @@
return RpcErrors.ERR_DATANODE_NOT_REGISTERED;
}
+ response.setServiceId(serviceId);
response.setFreeBlockCount(dnInfoNn.getBlockCount());
return RpcErrors.ERR_OK;
diff --git a/namenode/src/main/java/com/ibm/crail/namenode/Sequencer.java b/namenode/src/main/java/com/ibm/crail/namenode/Sequencer.java
new file mode 100644
index 0000000..96de5a8
--- /dev/null
+++ b/namenode/src/main/java/com/ibm/crail/namenode/Sequencer.java
@@ -0,0 +1,5 @@
+package com.ibm.crail.namenode;
+
+public interface Sequencer {
+ long getNextId();
+}
diff --git a/rpc-darpc/src/main/java/com/ibm/crail/namenode/rpc/darpc/DaRPCNameNode.java b/rpc-darpc/src/main/java/com/ibm/crail/namenode/rpc/darpc/DaRPCNameNode.java
index afd1bb3..5a00ff5 100644
--- a/rpc-darpc/src/main/java/com/ibm/crail/namenode/rpc/darpc/DaRPCNameNode.java
+++ b/rpc-darpc/src/main/java/com/ibm/crail/namenode/rpc/darpc/DaRPCNameNode.java
@@ -21,61 +21,27 @@
package com.ibm.crail.namenode.rpc.darpc;
-import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
import org.slf4j.Logger;
-import com.ibm.crail.conf.CrailConfiguration;
import com.ibm.crail.rpc.RpcBinding;
-import com.ibm.crail.rpc.RpcConnection;
import com.ibm.crail.rpc.RpcNameNodeService;
import com.ibm.crail.utils.CrailUtils;
-import com.ibm.darpc.DaRPCClientEndpoint;
-import com.ibm.darpc.DaRPCClientGroup;
import com.ibm.darpc.DaRPCServerEndpoint;
import com.ibm.darpc.DaRPCServerGroup;
import com.ibm.disni.rdma.RdmaServerEndpoint;
-public class DaRPCNameNode implements RpcBinding {
+public class DaRPCNameNode extends DaRPCNameNodeClient implements RpcBinding {
private static final Logger LOG = CrailUtils.getLogger();
- private DaRPCClientGroup<DaRPCNameNodeRequest, DaRPCNameNodeResponse> namenodeClientGroup;
- private DaRPCClientEndpoint<DaRPCNameNodeRequest, DaRPCNameNodeResponse> namenodeClientEp;
-
private DaRPCServerGroup<DaRPCNameNodeRequest, DaRPCNameNodeResponse> namenodeServerGroup;
private RdmaServerEndpoint<DaRPCServerEndpoint<DaRPCNameNodeRequest, DaRPCNameNodeResponse>> namenodeServerEp;
public DaRPCNameNode(){
- this.namenodeClientEp = null;
- this.namenodeClientGroup = null;
this.namenodeServerEp = null;
this.namenodeServerGroup = null;
}
- public void init(CrailConfiguration conf, String[] args) throws IOException{
- DaRPCConstants.updateConstants(conf);
- DaRPCConstants.verify();
- }
-
- public void printConf(Logger logger){
- DaRPCConstants.printConf(logger);
- }
-
- @Override
- public RpcConnection connect(InetSocketAddress address) throws Exception {
- DaRPCNameNodeProtocol namenodeProtocol = new DaRPCNameNodeProtocol();
- this.namenodeClientGroup = DaRPCClientGroup.createClientGroup(namenodeProtocol, 100, DaRPCConstants.NAMENODE_DARPC_MAXINLINE, DaRPCConstants.NAMENODE_DARPC_RECVQUEUE, DaRPCConstants.NAMENODE_DARPC_SENDQUEUE);
- LOG.info("rpc group started, recvQueue " + namenodeClientGroup.recvQueueSize());
- this.namenodeClientEp = namenodeClientGroup.createEndpoint();
- InetSocketAddress nnAddr = CrailUtils.getNameNodeAddress();
- LOG.info("connecting to namenode at " + nnAddr);
- URI uri = URI.create("rdma://" + nnAddr.getAddress().getHostAddress() + ":" + nnAddr.getPort());
- namenodeClientEp.connect(uri);
- DaRPCNameNodeClient namenodeClientRpc = new DaRPCNameNodeClient(namenodeClientEp);
- return namenodeClientRpc;
-
- }
-
@Override
public void run(RpcNameNodeService service) {
try {
@@ -109,14 +75,6 @@
@Override
public void close() {
try {
- if (namenodeClientEp != null){
- namenodeClientEp.close();
- namenodeClientEp = null;
- }
- if (namenodeClientGroup != null){
- namenodeClientGroup.close();
- namenodeClientGroup = null;
- }
if (namenodeServerEp != null){
namenodeServerEp.close();
namenodeServerEp = null;
diff --git a/rpc-darpc/src/main/java/com/ibm/crail/namenode/rpc/darpc/DaRPCNameNodeClient.java b/rpc-darpc/src/main/java/com/ibm/crail/namenode/rpc/darpc/DaRPCNameNodeClient.java
index a457c51..688dbb4 100644
--- a/rpc-darpc/src/main/java/com/ibm/crail/namenode/rpc/darpc/DaRPCNameNodeClient.java
+++ b/rpc-darpc/src/main/java/com/ibm/crail/namenode/rpc/darpc/DaRPCNameNodeClient.java
@@ -1,284 +1,59 @@
-/*
- * Crail: A Multi-tiered Distributed Direct Access File System
- *
- * Author: Patrick Stuedi <stu@zurich.ibm.com>
- *
- * Copyright (C) 2016, IBM Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
package com.ibm.crail.namenode.rpc.darpc;
import java.io.IOException;
-
+import java.net.InetSocketAddress;
+import java.net.URI;
import org.slf4j.Logger;
-
-import com.ibm.crail.CrailNodeType;
-import com.ibm.crail.conf.CrailConstants;
-import com.ibm.crail.metadata.BlockInfo;
-import com.ibm.crail.metadata.DataNodeInfo;
-import com.ibm.crail.metadata.FileInfo;
-import com.ibm.crail.metadata.FileName;
+import com.ibm.crail.conf.CrailConfiguration;
+import com.ibm.crail.rpc.RpcClient;
import com.ibm.crail.rpc.RpcConnection;
-import com.ibm.crail.rpc.RpcCreateFile;
-import com.ibm.crail.rpc.RpcDeleteFile;
-import com.ibm.crail.rpc.RpcGetBlock;
-import com.ibm.crail.rpc.RpcGetDataNode;
-import com.ibm.crail.rpc.RpcGetFile;
-import com.ibm.crail.rpc.RpcGetLocation;
-import com.ibm.crail.rpc.RpcFuture;
-import com.ibm.crail.rpc.RpcPing;
-import com.ibm.crail.rpc.RpcProtocol;
-import com.ibm.crail.rpc.RpcRenameFile;
-import com.ibm.crail.rpc.RpcRequestMessage;
-import com.ibm.crail.rpc.RpcResponseMessage;
-import com.ibm.crail.rpc.RpcVoid;
import com.ibm.crail.utils.CrailUtils;
import com.ibm.darpc.DaRPCClientEndpoint;
-import com.ibm.darpc.DaRPCFuture;
-import com.ibm.darpc.DaRPCStream;
+import com.ibm.darpc.DaRPCClientGroup;
-public class DaRPCNameNodeClient implements RpcConnection {
+public class DaRPCNameNodeClient implements RpcClient {
private static final Logger LOG = CrailUtils.getLogger();
- private DaRPCClientEndpoint<DaRPCNameNodeRequest, DaRPCNameNodeResponse> rpcEndpoint;
- private DaRPCStream<DaRPCNameNodeRequest, DaRPCNameNodeResponse> stream;
+ private DaRPCClientGroup<DaRPCNameNodeRequest, DaRPCNameNodeResponse> namenodeClientGroup;
- public DaRPCNameNodeClient(DaRPCClientEndpoint<DaRPCNameNodeRequest, DaRPCNameNodeResponse> endpoint) throws IOException {
- this.rpcEndpoint = endpoint;
- this.stream = endpoint.createStream();
- }
-
- @Override
- public RpcFuture<RpcCreateFile> createFile(FileName filename, CrailNodeType type, int storageClass, int locationClass) throws IOException {
- if (CrailConstants.DEBUG){
- LOG.debug("RPC: createFile, fileType " + type + ", storageClass " + storageClass + ", locationClass " + locationClass);
- }
-
- RpcRequestMessage.CreateFileReq createFileReq = new RpcRequestMessage.CreateFileReq(filename, type, storageClass, locationClass);
- DaRPCNameNodeRequest request = new DaRPCNameNodeRequest(createFileReq);
- request.setCommand(RpcProtocol.CMD_CREATE_FILE);
-
- RpcResponseMessage.CreateFileRes fileRes = new RpcResponseMessage.CreateFileRes();
- DaRPCNameNodeResponse response = new DaRPCNameNodeResponse(fileRes);
-
- DaRPCFuture<DaRPCNameNodeRequest, DaRPCNameNodeResponse> future = issueRPC(request, response);
-
- DaRPCNameNodeFuture<RpcCreateFile> nameNodeFuture = new DaRPCNameNodeFuture<RpcCreateFile>(future, fileRes);
-
- return nameNodeFuture;
+ public DaRPCNameNodeClient(){
+ this.namenodeClientGroup = null;
}
- @Override
- public RpcFuture<RpcGetFile> getFile(FileName filename, boolean writeable) throws IOException {
- if (CrailConstants.DEBUG){
- LOG.debug("RPC: getFile, writeable " + writeable);
- }
-
- RpcRequestMessage.GetFileReq getFileReq = new RpcRequestMessage.GetFileReq(filename, writeable);
- DaRPCNameNodeRequest request = new DaRPCNameNodeRequest(getFileReq);
- request.setCommand(RpcProtocol.CMD_GET_FILE);
+ public void init(CrailConfiguration conf, String[] args) throws IOException{
+ DaRPCConstants.updateConstants(conf);
+ DaRPCConstants.verify();
+ }
+
+ public void printConf(Logger logger){
+ DaRPCConstants.printConf(logger);
+ }
- RpcResponseMessage.GetFileRes fileRes = new RpcResponseMessage.GetFileRes();
- DaRPCNameNodeResponse response = new DaRPCNameNodeResponse(fileRes);
-
- DaRPCFuture<DaRPCNameNodeRequest, DaRPCNameNodeResponse> future = issueRPC(request, response);
-
- DaRPCNameNodeFuture<RpcGetFile> nameNodeFuture = new DaRPCNameNodeFuture<RpcGetFile>(future, fileRes);
-
- return nameNodeFuture;
- }
-
@Override
- public DaRPCNameNodeFuture<RpcVoid> setFile(FileInfo fileInfo, boolean close) throws IOException {
- if (CrailConstants.DEBUG){
- LOG.debug("RPC: setFile, id " + fileInfo.getFd() + ", close " + close);
- }
+ public RpcConnection connect(InetSocketAddress address) throws Exception {
+ DaRPCNameNodeProtocol namenodeProtocol = new DaRPCNameNodeProtocol();
+ this.namenodeClientGroup = DaRPCClientGroup.createClientGroup(namenodeProtocol, 100, DaRPCConstants.NAMENODE_DARPC_MAXINLINE, DaRPCConstants.NAMENODE_DARPC_RECVQUEUE, DaRPCConstants.NAMENODE_DARPC_SENDQUEUE);
+ LOG.info("rpc group started, recvQueue " + namenodeClientGroup.recvQueueSize());
+ DaRPCClientEndpoint<DaRPCNameNodeRequest, DaRPCNameNodeResponse> namenodeEndopoint = namenodeClientGroup.createEndpoint();
+// LOG.info("connecting to namenode at " + address);
+ URI uri = URI.create("rdma://" + address.getAddress().getHostAddress() + ":" + address.getPort());
+ namenodeEndopoint.connect(uri);
+ DaRPCNameNodeConnection connection = new DaRPCNameNodeConnection(namenodeEndopoint);
+ return connection;
- RpcRequestMessage.SetFileReq setFileReq = new RpcRequestMessage.SetFileReq(fileInfo, close);
- DaRPCNameNodeRequest request = new DaRPCNameNodeRequest(setFileReq);
- request.setCommand(RpcProtocol.CMD_SET_FILE);
-
- RpcResponseMessage.VoidRes voidRes = new RpcResponseMessage.VoidRes();
- DaRPCNameNodeResponse response = new DaRPCNameNodeResponse(voidRes);
-
- DaRPCFuture<DaRPCNameNodeRequest, DaRPCNameNodeResponse> future = issueRPC(request, response);
-
- DaRPCNameNodeFuture<RpcVoid> nameNodeFuture = new DaRPCNameNodeFuture<RpcVoid>(future, voidRes);
-
- return nameNodeFuture;
}
-
- @Override
- public DaRPCNameNodeFuture<RpcDeleteFile> removeFile(FileName filename, boolean recursive) throws IOException {
- if (CrailConstants.DEBUG){
- LOG.debug("RPC: removeFile");
- }
-
- RpcRequestMessage.RemoveFileReq removeReq = new RpcRequestMessage.RemoveFileReq(filename, recursive);
- DaRPCNameNodeRequest request = new DaRPCNameNodeRequest(removeReq);
- request.setCommand(RpcProtocol.CMD_REMOVE_FILE);
-
- RpcResponseMessage.DeleteFileRes fileRes = new RpcResponseMessage.DeleteFileRes();
- DaRPCNameNodeResponse response = new DaRPCNameNodeResponse(fileRes);
-
- DaRPCFuture<DaRPCNameNodeRequest, DaRPCNameNodeResponse> future = issueRPC(request, response);
-
- DaRPCNameNodeFuture<RpcDeleteFile> nameNodeFuture = new DaRPCNameNodeFuture<RpcDeleteFile>(future, fileRes);
-
- return nameNodeFuture;
- }
-
- @Override
- public DaRPCNameNodeFuture<RpcRenameFile> renameFile(FileName srcHash, FileName dstHash) throws IOException {
- if (CrailConstants.DEBUG){
- LOG.debug("RPC: renameFile");
- }
-
- RpcRequestMessage.RenameFileReq renameReq = new RpcRequestMessage.RenameFileReq(srcHash, dstHash);
- DaRPCNameNodeRequest request = new DaRPCNameNodeRequest(renameReq);
- request.setCommand(RpcProtocol.CMD_RENAME_FILE);
-
- RpcResponseMessage.RenameRes renameRes = new RpcResponseMessage.RenameRes();
- DaRPCNameNodeResponse response = new DaRPCNameNodeResponse(renameRes);
-
- DaRPCFuture<DaRPCNameNodeRequest, DaRPCNameNodeResponse> future = issueRPC(request, response);
-
- DaRPCNameNodeFuture<RpcRenameFile> nameNodeFuture = new DaRPCNameNodeFuture<RpcRenameFile>(future, renameRes);
-
- return nameNodeFuture;
- }
-
- @Override
- public DaRPCNameNodeFuture<RpcGetBlock> getBlock(long fd, long token, long position, long capacity) throws IOException {
- if (CrailConstants.DEBUG){
- LOG.debug("RPC: getBlock, fd " + fd + ", token " + token + ", position " + position + ", capacity " + capacity);
- }
-
- RpcRequestMessage.GetBlockReq getBlockReq = new RpcRequestMessage.GetBlockReq(fd, token, position, capacity);
- DaRPCNameNodeRequest request = new DaRPCNameNodeRequest(getBlockReq);
- request.setCommand(RpcProtocol.CMD_GET_BLOCK);
-
- RpcResponseMessage.GetBlockRes getBlockRes = new RpcResponseMessage.GetBlockRes();
- DaRPCNameNodeResponse response = new DaRPCNameNodeResponse(getBlockRes);
-
- DaRPCFuture<DaRPCNameNodeRequest, DaRPCNameNodeResponse> future = issueRPC(request, response);
-
- DaRPCNameNodeFuture<RpcGetBlock> nameNodeFuture = new DaRPCNameNodeFuture<RpcGetBlock>(future, getBlockRes);
-
- return nameNodeFuture;
- }
-
- @Override
- public DaRPCNameNodeFuture<RpcGetLocation> getLocation(FileName fileName, long position) throws IOException {
- if (CrailConstants.DEBUG){
- LOG.debug("RPC: getLocation, position " + position);
- }
-
- RpcRequestMessage.GetLocationReq getLocationReq = new RpcRequestMessage.GetLocationReq(fileName, position);
- DaRPCNameNodeRequest request = new DaRPCNameNodeRequest(getLocationReq);
- request.setCommand(RpcProtocol.CMD_GET_LOCATION);
- RpcResponseMessage.GetLocationRes getLocationRes = new RpcResponseMessage.GetLocationRes();
- DaRPCNameNodeResponse response = new DaRPCNameNodeResponse(getLocationRes);
-
- DaRPCFuture<DaRPCNameNodeRequest, DaRPCNameNodeResponse> future = issueRPC(request, response);
-
- DaRPCNameNodeFuture<RpcGetLocation> nameNodeFuture = new DaRPCNameNodeFuture<RpcGetLocation>(future, getLocationRes);
-
- return nameNodeFuture;
- }
-
@Override
- public DaRPCNameNodeFuture<RpcVoid> setBlock(BlockInfo blockInfo) throws Exception {
- if (CrailConstants.DEBUG){
- LOG.debug("RPC: setBlock, ");
- }
-
- RpcRequestMessage.SetBlockReq setBlockReq = new RpcRequestMessage.SetBlockReq(blockInfo);
- DaRPCNameNodeRequest request = new DaRPCNameNodeRequest(setBlockReq);
- request.setCommand(RpcProtocol.CMD_SET_BLOCK);
-
- RpcResponseMessage.VoidRes voidRes = new RpcResponseMessage.VoidRes();
- DaRPCNameNodeResponse response = new DaRPCNameNodeResponse(voidRes);
-
- DaRPCFuture<DaRPCNameNodeRequest, DaRPCNameNodeResponse> future = issueRPC(request, response);
-
- DaRPCNameNodeFuture<RpcVoid> nameNodeFuture = new DaRPCNameNodeFuture<RpcVoid>(future, voidRes);
-
- return nameNodeFuture;
- }
-
- @Override
- public DaRPCNameNodeFuture<RpcGetDataNode> getDataNode(DataNodeInfo dnInfo) throws Exception {
- RpcRequestMessage.GetDataNodeReq getDataNodeReq = new RpcRequestMessage.GetDataNodeReq(dnInfo);
- DaRPCNameNodeRequest request = new DaRPCNameNodeRequest(getDataNodeReq);
- request.setCommand(RpcProtocol.CMD_GET_DATANODE);
-
- RpcResponseMessage.GetDataNodeRes getDataNodeRes = new RpcResponseMessage.GetDataNodeRes();
- DaRPCNameNodeResponse response = new DaRPCNameNodeResponse(getDataNodeRes);
-
- DaRPCFuture<DaRPCNameNodeRequest, DaRPCNameNodeResponse> future = issueRPC(request, response);
-
- DaRPCNameNodeFuture<RpcGetDataNode> nameNodeFuture = new DaRPCNameNodeFuture<RpcGetDataNode>(future, getDataNodeRes);
-
- return nameNodeFuture;
- }
-
- @Override
- public DaRPCNameNodeFuture<RpcVoid> dumpNameNode() throws Exception {
-
-
- RpcRequestMessage.DumpNameNodeReq dumpNameNodeReq = new RpcRequestMessage.DumpNameNodeReq();
- DaRPCNameNodeRequest request = new DaRPCNameNodeRequest(dumpNameNodeReq);
- request.setCommand(RpcProtocol.CMD_DUMP_NAMENODE);
-
- RpcResponseMessage.VoidRes voidRes = new RpcResponseMessage.VoidRes();
- DaRPCNameNodeResponse response = new DaRPCNameNodeResponse(voidRes);
-
- DaRPCFuture<DaRPCNameNodeRequest, DaRPCNameNodeResponse> future = issueRPC(request, response);
-
- DaRPCNameNodeFuture<RpcVoid> nameNodeFuture = new DaRPCNameNodeFuture<RpcVoid>(future, voidRes);
-
- return nameNodeFuture;
- }
-
- @Override
- public DaRPCNameNodeFuture<RpcPing> pingNameNode() throws Exception {
-
- RpcRequestMessage.PingNameNodeReq pingReq = new RpcRequestMessage.PingNameNodeReq();
- DaRPCNameNodeRequest request = new DaRPCNameNodeRequest(pingReq);
- request.setCommand(RpcProtocol.CMD_PING_NAMENODE);
-
- RpcResponseMessage.PingNameNodeRes pingRes = new RpcResponseMessage.PingNameNodeRes();
- DaRPCNameNodeResponse response = new DaRPCNameNodeResponse(pingRes);
-
- DaRPCFuture<DaRPCNameNodeRequest, DaRPCNameNodeResponse> future = issueRPC(request, response);
-
- DaRPCNameNodeFuture<RpcPing> nameNodeFuture = new DaRPCNameNodeFuture<RpcPing>(future, pingRes);
-
- return nameNodeFuture;
- }
-
- private DaRPCFuture<DaRPCNameNodeRequest, DaRPCNameNodeResponse> issueRPC(DaRPCNameNodeRequest request, DaRPCNameNodeResponse response) throws IOException{
+ public void close() {
try {
- DaRPCFuture<DaRPCNameNodeRequest, DaRPCNameNodeResponse> future = stream.request(request, response, false);
- return future;
- } catch(IOException e){
- LOG.info("ERROR: RPC failed, messagesSend " + rpcEndpoint.getMessagesSent() + ", messagesReceived " + rpcEndpoint.getMessagesReceived() + ", isConnected " + rpcEndpoint.isConnected() + ", qpNum " + rpcEndpoint.getQp().getQp_num());
- throw e;
+ if (namenodeClientGroup != null){
+ namenodeClientGroup.close();
+ namenodeClientGroup = null;
+ }
+ } catch(Exception e){
+ e.printStackTrace();
+ LOG.info("Error while closing " + e.getMessage());
}
}
+
}
diff --git a/rpc-darpc/src/main/java/com/ibm/crail/namenode/rpc/darpc/DaRPCNameNodeConnection.java b/rpc-darpc/src/main/java/com/ibm/crail/namenode/rpc/darpc/DaRPCNameNodeConnection.java
new file mode 100644
index 0000000..96a2ea3
--- /dev/null
+++ b/rpc-darpc/src/main/java/com/ibm/crail/namenode/rpc/darpc/DaRPCNameNodeConnection.java
@@ -0,0 +1,301 @@
+/*
+ * Crail: A Multi-tiered Distributed Direct Access File System
+ *
+ * Author: Patrick Stuedi <stu@zurich.ibm.com>
+ *
+ * Copyright (C) 2016, IBM Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package com.ibm.crail.namenode.rpc.darpc;
+
+import java.io.IOException;
+
+import org.slf4j.Logger;
+
+import com.ibm.crail.CrailNodeType;
+import com.ibm.crail.conf.CrailConstants;
+import com.ibm.crail.metadata.BlockInfo;
+import com.ibm.crail.metadata.DataNodeInfo;
+import com.ibm.crail.metadata.FileInfo;
+import com.ibm.crail.metadata.FileName;
+import com.ibm.crail.rpc.RpcConnection;
+import com.ibm.crail.rpc.RpcCreateFile;
+import com.ibm.crail.rpc.RpcDeleteFile;
+import com.ibm.crail.rpc.RpcGetBlock;
+import com.ibm.crail.rpc.RpcGetDataNode;
+import com.ibm.crail.rpc.RpcGetFile;
+import com.ibm.crail.rpc.RpcGetLocation;
+import com.ibm.crail.rpc.RpcFuture;
+import com.ibm.crail.rpc.RpcPing;
+import com.ibm.crail.rpc.RpcProtocol;
+import com.ibm.crail.rpc.RpcRenameFile;
+import com.ibm.crail.rpc.RpcRequestMessage;
+import com.ibm.crail.rpc.RpcResponseMessage;
+import com.ibm.crail.rpc.RpcVoid;
+import com.ibm.crail.utils.CrailUtils;
+import com.ibm.darpc.DaRPCClientEndpoint;
+import com.ibm.darpc.DaRPCFuture;
+import com.ibm.darpc.DaRPCStream;
+
+public class DaRPCNameNodeConnection implements RpcConnection {
+ private static final Logger LOG = CrailUtils.getLogger();
+
+ private DaRPCClientEndpoint<DaRPCNameNodeRequest, DaRPCNameNodeResponse> rpcEndpoint;
+ private DaRPCStream<DaRPCNameNodeRequest, DaRPCNameNodeResponse> stream;
+
+ public DaRPCNameNodeConnection(DaRPCClientEndpoint<DaRPCNameNodeRequest, DaRPCNameNodeResponse> endpoint) throws IOException {
+ this.rpcEndpoint = endpoint;
+ this.stream = endpoint.createStream();
+ }
+
+ @Override
+ public RpcFuture<RpcCreateFile> createFile(FileName filename, CrailNodeType type, int storageClass, int locationClass) throws IOException {
+ if (CrailConstants.DEBUG){
+ LOG.debug("RPC: createFile, fileType " + type + ", storageClass " + storageClass + ", locationClass " + locationClass);
+ }
+
+ RpcRequestMessage.CreateFileReq createFileReq = new RpcRequestMessage.CreateFileReq(filename, type, storageClass, locationClass);
+ DaRPCNameNodeRequest request = new DaRPCNameNodeRequest(createFileReq);
+ request.setCommand(RpcProtocol.CMD_CREATE_FILE);
+
+ RpcResponseMessage.CreateFileRes fileRes = new RpcResponseMessage.CreateFileRes();
+ DaRPCNameNodeResponse response = new DaRPCNameNodeResponse(fileRes);
+
+ DaRPCFuture<DaRPCNameNodeRequest, DaRPCNameNodeResponse> future = issueRPC(request, response);
+
+ DaRPCNameNodeFuture<RpcCreateFile> nameNodeFuture = new DaRPCNameNodeFuture<RpcCreateFile>(future, fileRes);
+
+ return nameNodeFuture;
+ }
+
+ @Override
+ public RpcFuture<RpcGetFile> getFile(FileName filename, boolean writeable) throws IOException {
+ if (CrailConstants.DEBUG){
+ LOG.debug("RPC: getFile, writeable " + writeable);
+ }
+
+ RpcRequestMessage.GetFileReq getFileReq = new RpcRequestMessage.GetFileReq(filename, writeable);
+ DaRPCNameNodeRequest request = new DaRPCNameNodeRequest(getFileReq);
+ request.setCommand(RpcProtocol.CMD_GET_FILE);
+
+ RpcResponseMessage.GetFileRes fileRes = new RpcResponseMessage.GetFileRes();
+ DaRPCNameNodeResponse response = new DaRPCNameNodeResponse(fileRes);
+
+ DaRPCFuture<DaRPCNameNodeRequest, DaRPCNameNodeResponse> future = issueRPC(request, response);
+
+ DaRPCNameNodeFuture<RpcGetFile> nameNodeFuture = new DaRPCNameNodeFuture<RpcGetFile>(future, fileRes);
+
+ return nameNodeFuture;
+ }
+
+ @Override
+ public DaRPCNameNodeFuture<RpcVoid> setFile(FileInfo fileInfo, boolean close) throws IOException {
+ if (CrailConstants.DEBUG){
+ LOG.debug("RPC: setFile, id " + fileInfo.getFd() + ", close " + close);
+ }
+
+ RpcRequestMessage.SetFileReq setFileReq = new RpcRequestMessage.SetFileReq(fileInfo, close);
+ DaRPCNameNodeRequest request = new DaRPCNameNodeRequest(setFileReq);
+ request.setCommand(RpcProtocol.CMD_SET_FILE);
+
+ RpcResponseMessage.VoidRes voidRes = new RpcResponseMessage.VoidRes();
+ DaRPCNameNodeResponse response = new DaRPCNameNodeResponse(voidRes);
+
+ DaRPCFuture<DaRPCNameNodeRequest, DaRPCNameNodeResponse> future = issueRPC(request, response);
+
+ DaRPCNameNodeFuture<RpcVoid> nameNodeFuture = new DaRPCNameNodeFuture<RpcVoid>(future, voidRes);
+
+ return nameNodeFuture;
+ }
+
+ @Override
+ public DaRPCNameNodeFuture<RpcDeleteFile> removeFile(FileName filename, boolean recursive) throws IOException {
+ if (CrailConstants.DEBUG){
+ LOG.debug("RPC: removeFile");
+ }
+
+ RpcRequestMessage.RemoveFileReq removeReq = new RpcRequestMessage.RemoveFileReq(filename, recursive);
+ DaRPCNameNodeRequest request = new DaRPCNameNodeRequest(removeReq);
+ request.setCommand(RpcProtocol.CMD_REMOVE_FILE);
+
+ RpcResponseMessage.DeleteFileRes fileRes = new RpcResponseMessage.DeleteFileRes();
+ DaRPCNameNodeResponse response = new DaRPCNameNodeResponse(fileRes);
+
+ DaRPCFuture<DaRPCNameNodeRequest, DaRPCNameNodeResponse> future = issueRPC(request, response);
+
+ DaRPCNameNodeFuture<RpcDeleteFile> nameNodeFuture = new DaRPCNameNodeFuture<RpcDeleteFile>(future, fileRes);
+
+ return nameNodeFuture;
+ }
+
+ @Override
+ public DaRPCNameNodeFuture<RpcRenameFile> renameFile(FileName srcHash, FileName dstHash) throws IOException {
+ if (CrailConstants.DEBUG){
+ LOG.debug("RPC: renameFile");
+ }
+
+ RpcRequestMessage.RenameFileReq renameReq = new RpcRequestMessage.RenameFileReq(srcHash, dstHash);
+ DaRPCNameNodeRequest request = new DaRPCNameNodeRequest(renameReq);
+ request.setCommand(RpcProtocol.CMD_RENAME_FILE);
+
+ RpcResponseMessage.RenameRes renameRes = new RpcResponseMessage.RenameRes();
+ DaRPCNameNodeResponse response = new DaRPCNameNodeResponse(renameRes);
+
+ DaRPCFuture<DaRPCNameNodeRequest, DaRPCNameNodeResponse> future = issueRPC(request, response);
+
+ DaRPCNameNodeFuture<RpcRenameFile> nameNodeFuture = new DaRPCNameNodeFuture<RpcRenameFile>(future, renameRes);
+
+ return nameNodeFuture;
+ }
+
+ @Override
+ public DaRPCNameNodeFuture<RpcGetBlock> getBlock(long fd, long token, long position, long capacity) throws IOException {
+ if (CrailConstants.DEBUG){
+ LOG.debug("RPC: getBlock, fd " + fd + ", token " + token + ", position " + position + ", capacity " + capacity);
+ }
+
+ RpcRequestMessage.GetBlockReq getBlockReq = new RpcRequestMessage.GetBlockReq(fd, token, position, capacity);
+ DaRPCNameNodeRequest request = new DaRPCNameNodeRequest(getBlockReq);
+ request.setCommand(RpcProtocol.CMD_GET_BLOCK);
+
+ RpcResponseMessage.GetBlockRes getBlockRes = new RpcResponseMessage.GetBlockRes();
+ DaRPCNameNodeResponse response = new DaRPCNameNodeResponse(getBlockRes);
+
+ DaRPCFuture<DaRPCNameNodeRequest, DaRPCNameNodeResponse> future = issueRPC(request, response);
+
+ DaRPCNameNodeFuture<RpcGetBlock> nameNodeFuture = new DaRPCNameNodeFuture<RpcGetBlock>(future, getBlockRes);
+
+ return nameNodeFuture;
+ }
+
+ @Override
+ public DaRPCNameNodeFuture<RpcGetLocation> getLocation(FileName fileName, long position) throws IOException {
+ if (CrailConstants.DEBUG){
+ LOG.debug("RPC: getLocation, position " + position);
+ }
+
+ RpcRequestMessage.GetLocationReq getLocationReq = new RpcRequestMessage.GetLocationReq(fileName, position);
+ DaRPCNameNodeRequest request = new DaRPCNameNodeRequest(getLocationReq);
+ request.setCommand(RpcProtocol.CMD_GET_LOCATION);
+
+ RpcResponseMessage.GetLocationRes getLocationRes = new RpcResponseMessage.GetLocationRes();
+ DaRPCNameNodeResponse response = new DaRPCNameNodeResponse(getLocationRes);
+
+ DaRPCFuture<DaRPCNameNodeRequest, DaRPCNameNodeResponse> future = issueRPC(request, response);
+
+ DaRPCNameNodeFuture<RpcGetLocation> nameNodeFuture = new DaRPCNameNodeFuture<RpcGetLocation>(future, getLocationRes);
+
+ return nameNodeFuture;
+ }
+
+ @Override
+ public DaRPCNameNodeFuture<RpcVoid> setBlock(BlockInfo blockInfo) throws Exception {
+ if (CrailConstants.DEBUG){
+ LOG.debug("RPC: setBlock, ");
+ }
+
+ RpcRequestMessage.SetBlockReq setBlockReq = new RpcRequestMessage.SetBlockReq(blockInfo);
+ DaRPCNameNodeRequest request = new DaRPCNameNodeRequest(setBlockReq);
+ request.setCommand(RpcProtocol.CMD_SET_BLOCK);
+
+ RpcResponseMessage.VoidRes voidRes = new RpcResponseMessage.VoidRes();
+ DaRPCNameNodeResponse response = new DaRPCNameNodeResponse(voidRes);
+
+ DaRPCFuture<DaRPCNameNodeRequest, DaRPCNameNodeResponse> future = issueRPC(request, response);
+
+ DaRPCNameNodeFuture<RpcVoid> nameNodeFuture = new DaRPCNameNodeFuture<RpcVoid>(future, voidRes);
+
+ return nameNodeFuture;
+ }
+
+ @Override
+ public DaRPCNameNodeFuture<RpcGetDataNode> getDataNode(DataNodeInfo dnInfo) throws Exception {
+ RpcRequestMessage.GetDataNodeReq getDataNodeReq = new RpcRequestMessage.GetDataNodeReq(dnInfo);
+ DaRPCNameNodeRequest request = new DaRPCNameNodeRequest(getDataNodeReq);
+ request.setCommand(RpcProtocol.CMD_GET_DATANODE);
+
+ RpcResponseMessage.GetDataNodeRes getDataNodeRes = new RpcResponseMessage.GetDataNodeRes();
+ DaRPCNameNodeResponse response = new DaRPCNameNodeResponse(getDataNodeRes);
+
+ DaRPCFuture<DaRPCNameNodeRequest, DaRPCNameNodeResponse> future = issueRPC(request, response);
+
+ DaRPCNameNodeFuture<RpcGetDataNode> nameNodeFuture = new DaRPCNameNodeFuture<RpcGetDataNode>(future, getDataNodeRes);
+
+ return nameNodeFuture;
+ }
+
+ @Override
+ public DaRPCNameNodeFuture<RpcVoid> dumpNameNode() throws Exception {
+
+
+ RpcRequestMessage.DumpNameNodeReq dumpNameNodeReq = new RpcRequestMessage.DumpNameNodeReq();
+ DaRPCNameNodeRequest request = new DaRPCNameNodeRequest(dumpNameNodeReq);
+ request.setCommand(RpcProtocol.CMD_DUMP_NAMENODE);
+
+ RpcResponseMessage.VoidRes voidRes = new RpcResponseMessage.VoidRes();
+ DaRPCNameNodeResponse response = new DaRPCNameNodeResponse(voidRes);
+
+ DaRPCFuture<DaRPCNameNodeRequest, DaRPCNameNodeResponse> future = issueRPC(request, response);
+
+ DaRPCNameNodeFuture<RpcVoid> nameNodeFuture = new DaRPCNameNodeFuture<RpcVoid>(future, voidRes);
+
+ return nameNodeFuture;
+ }
+
+ @Override
+ public DaRPCNameNodeFuture<RpcPing> pingNameNode() throws Exception {
+
+ RpcRequestMessage.PingNameNodeReq pingReq = new RpcRequestMessage.PingNameNodeReq();
+ DaRPCNameNodeRequest request = new DaRPCNameNodeRequest(pingReq);
+ request.setCommand(RpcProtocol.CMD_PING_NAMENODE);
+
+ RpcResponseMessage.PingNameNodeRes pingRes = new RpcResponseMessage.PingNameNodeRes();
+ DaRPCNameNodeResponse response = new DaRPCNameNodeResponse(pingRes);
+
+ DaRPCFuture<DaRPCNameNodeRequest, DaRPCNameNodeResponse> future = issueRPC(request, response);
+
+ DaRPCNameNodeFuture<RpcPing> nameNodeFuture = new DaRPCNameNodeFuture<RpcPing>(future, pingRes);
+
+ return nameNodeFuture;
+ }
+
+ @Override
+ public void close() throws Exception {
+ if (rpcEndpoint != null){
+ rpcEndpoint.close();
+ rpcEndpoint = null;
+ }
+ }
+
+ private DaRPCFuture<DaRPCNameNodeRequest, DaRPCNameNodeResponse> issueRPC(DaRPCNameNodeRequest request, DaRPCNameNodeResponse response) throws IOException{
+ try {
+ DaRPCFuture<DaRPCNameNodeRequest, DaRPCNameNodeResponse> future = stream.request(request, response, false);
+ return future;
+ } catch(IOException e){
+ LOG.info("ERROR: RPC failed, messagesSend " + rpcEndpoint.getMessagesSent() + ", messagesReceived " + rpcEndpoint.getMessagesReceived() + ", isConnected " + rpcEndpoint.isConnected() + ", qpNum " + rpcEndpoint.getQp().getQp_num());
+ throw e;
+ }
+ }
+
+ @Override
+ public String toString() {
+ try {
+ return rpcEndpoint.getDstAddr().toString();
+ } catch(Exception e){
+ return "Unknown";
+ }
+ }
+}
diff --git a/rpc/src/main/java/com/ibm/crail/rpc/RpcNameNodeService.java b/rpc/src/main/java/com/ibm/crail/rpc/RpcNameNodeService.java
index b5ba5f9..62d339f 100644
--- a/rpc/src/main/java/com/ibm/crail/rpc/RpcNameNodeService.java
+++ b/rpc/src/main/java/com/ibm/crail/rpc/RpcNameNodeService.java
@@ -22,6 +22,7 @@
package com.ibm.crail.rpc;
public interface RpcNameNodeService {
+
public abstract short createFile(RpcRequestMessage.CreateFileReq request,
RpcResponseMessage.CreateFileRes response, RpcNameNodeState errorState)
throws Exception;
@@ -67,4 +68,16 @@
public abstract short ping(RpcRequestMessage.PingNameNodeReq request,
RpcResponseMessage.PingNameNodeRes response, RpcNameNodeState errorState)
throws Exception;
+
+ @SuppressWarnings("unchecked")
+ public static RpcNameNodeService createInstance(String name) throws Exception {
+ Class<?> serviceClass = Class.forName(name);
+ if (RpcNameNodeService.class.isAssignableFrom(serviceClass)){
+ Class<? extends RpcNameNodeService> rpcService = (Class<? extends RpcNameNodeService>) serviceClass;
+ RpcNameNodeService service = rpcService.newInstance();
+ return service;
+ } else {
+ throw new Exception("Cannot instantiate RPC service of type " + name);
+ }
+ }
}
diff --git a/rpc/src/main/java/com/ibm/crail/rpc/RpcResponseMessage.java b/rpc/src/main/java/com/ibm/crail/rpc/RpcResponseMessage.java
index b189a4f..0e5b338 100644
--- a/rpc/src/main/java/com/ibm/crail/rpc/RpcResponseMessage.java
+++ b/rpc/src/main/java/com/ibm/crail/rpc/RpcResponseMessage.java
@@ -24,7 +24,6 @@
import java.io.IOException;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
-
import com.ibm.crail.metadata.BlockInfo;
import com.ibm.crail.metadata.DataNodeStatistics;
import com.ibm.crail.metadata.FileInfo;
@@ -592,6 +591,10 @@
public short getError(){
return 0;
+ }
+
+ public void setServiceId(long serviceId) {
+ this.statistics.setServiceId(serviceId);
}
}
diff --git a/storage/src/main/java/com/ibm/crail/storage/StorageServer.java b/storage/src/main/java/com/ibm/crail/storage/StorageServer.java
index c03d23b..fef5458 100644
--- a/storage/src/main/java/com/ibm/crail/storage/StorageServer.java
+++ b/storage/src/main/java/com/ibm/crail/storage/StorageServer.java
@@ -25,6 +25,7 @@
import java.util.Arrays;
import java.util.HashMap;
import java.util.StringTokenizer;
+import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
@@ -41,6 +42,7 @@
import com.ibm.crail.metadata.DataNodeStatistics;
import com.ibm.crail.rpc.RpcClient;
import com.ibm.crail.rpc.RpcConnection;
+import com.ibm.crail.rpc.RpcDispatcher;
import com.ibm.crail.utils.CrailUtils;
public interface StorageServer {
@@ -122,29 +124,63 @@
storageTier.init(conf, extraParams);
storageTier.printConf(LOG);
- InetSocketAddress nnAddr = CrailUtils.getNameNodeAddress();
RpcClient rpcClient = RpcClient.createInstance(CrailConstants.NAMENODE_RPC_TYPE);
rpcClient.init(conf, args);
rpcClient.printConf(LOG);
- RpcConnection rpcConnection = rpcClient.connect(nnAddr);
- LOG.info("connected to namenode at " + nnAddr);
+
+ ConcurrentLinkedQueue<InetSocketAddress> namenodeList = CrailUtils.getNameNodeList();
+ ConcurrentLinkedQueue<RpcConnection> connectionList = new ConcurrentLinkedQueue<RpcConnection>();
+ while(!namenodeList.isEmpty()){
+ InetSocketAddress address = namenodeList.poll();
+ RpcConnection connection = rpcClient.connect(address);
+ connectionList.add(connection);
+ }
+ RpcConnection rpcConnection = connectionList.peek();
+ if (connectionList.size() > 1){
+ rpcConnection = new RpcDispatcher(connectionList);
+ }
+ LOG.info("connected to namenode(s) " + rpcConnection.toString());
StorageServer server = storageTier.launchServer();
StorageRpcClient storageRpc = new StorageRpcClient(storageType, CrailStorageClass.get(storageClass), server.getAddress(), rpcConnection);
+ HashMap<Long, Long> blockCount = new HashMap<Long, Long>();
+ long sumCount = 0;
while (server.isAlive()) {
StorageResource resource = server.allocateResource();
if (resource == null){
break;
} else {
storageRpc.setBlock(resource.getAddress(), resource.getLength(), resource.getKey());
- LOG.info("datanode statistics, freeBlocks " + storageRpc.getDataNode().getFreeBlockCount());
+ DataNodeStatistics stats = storageRpc.getDataNode();
+ long newCount = stats.getFreeBlockCount();
+ long serviceId = stats.getServiceId();
+
+ long oldCount = 0;
+ if (blockCount.containsKey(serviceId)){
+ oldCount = blockCount.get(serviceId);
+ }
+ long diffCount = newCount - oldCount;
+ blockCount.put(serviceId, newCount);
+ sumCount += diffCount;
+ LOG.info("datanode statistics, freeBlocks " + sumCount);
}
}
while (server.isAlive()) {
- DataNodeStatistics statistics = storageRpc.getDataNode();
- LOG.info("datanode statistics, freeBlocks " + statistics.getFreeBlockCount());
+ DataNodeStatistics stats = storageRpc.getDataNode();
+ long newCount = stats.getFreeBlockCount();
+ long serviceId = stats.getServiceId();
+
+ long oldCount = 0;
+ if (blockCount.containsKey(serviceId)){
+ oldCount = blockCount.get(serviceId);
+ }
+ long diffCount = newCount - oldCount;
+ blockCount.put(serviceId, newCount);
+ sumCount += diffCount;
+
+ LOG.info("datanode statistics, freeBlocks " + sumCount);
Thread.sleep(2000);
}
}