blob: cb25277e679e03933d88b9f5b08e9629fc27eabf [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.crail.storage.rdma.client;
import java.io.IOException;
import java.util.LinkedList;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import org.apache.crail.CrailBuffer;
import org.apache.crail.conf.CrailConstants;
import org.apache.crail.metadata.BlockInfo;
import org.apache.crail.storage.StorageEndpoint;
import org.apache.crail.storage.StorageFuture;
import org.apache.crail.storage.rdma.MrCache;
import org.apache.crail.storage.rdma.RdmaConstants;
import org.apache.crail.storage.rdma.MrCache.DeviceMrCache;
import org.apache.crail.utils.AtomicIntegerModulo;
import com.ibm.disni.verbs.*;
import com.ibm.disni.verbs.SVCPostSend.SendWRMod;
import com.ibm.disni.verbs.SVCPostSend.SgeMod;
import com.ibm.disni.*;
/**
 * Client-side storage endpoint that issues one-sided RDMA reads and writes
 * against a remote memory region described by a {@link BlockInfo}.
 *
 * <p>Work requests are pre-built in {@link #init()} and kept in two pools
 * ({@code readOps}, {@code writeOps}); each call to {@link #read} or
 * {@link #write} borrows one, patches addresses/keys/ids into it, posts it and
 * hands it back to the pool. The number of outstanding work requests is bounded
 * by a semaphore sized to {@code RdmaConstants.STORAGE_RDMA_QUEUESIZE}; permits
 * are returned in {@link #dispatchCqEvent(IbvWC)} when the completion arrives.
 */
public class RdmaStorageActiveEndpoint extends RdmaActiveEndpoint implements StorageEndpoint {
    /** Completion status value treated as success by {@link #dispatchCqEvent(IbvWC)}. */
    private static final int WC_STATUS_SUCCESS = 0;
    /**
     * Completion status value that is silently ignored.
     * NOTE(review): 5 corresponds to IBV_WC_WR_FLUSH_ERR in the ibverbs
     * {@code ibv_wc_status} enum - confirm against the DiSNI bindings.
     */
    private static final int WC_STATUS_WR_FLUSH_ERR = 5;
    /** A write posts two work requests (RDMA write + RDMA read), so it needs two send-queue slots. */
    private static final int WRITE_PERMITS = 2;
    /** A read posts a single work request. */
    private static final int READ_PERMITS = 1;

    private final LinkedBlockingQueue<SVCPostSend> writeOps;
    private final LinkedBlockingQueue<SVCPostSend> readOps;
    private final AtomicIntegerModulo opcount;
    private final Semaphore sendQueueAvailable;
    private final ConcurrentHashMap<Long, RdmaActiveFuture> futureMap;
    private final MrCache mrCache;
    // Resolved lazily on first read/write because it is looked up via getPd().
    private DeviceMrCache deviceCache;

    /**
     * Creates the endpoint and its bookkeeping structures. The pools of
     * pre-built work requests are filled later, in {@link #init()}.
     *
     * @param group      endpoint group; also supplies the shared {@link MrCache}
     * @param id         connection id passed through to the superclass
     * @param serverSide passed through to the superclass
     * @throws IOException if the superclass constructor fails
     */
    public RdmaStorageActiveEndpoint(RdmaStorageActiveGroup group, RdmaCmId id, boolean serverSide) throws IOException {
        super(group, id, serverSide);
        this.writeOps = new LinkedBlockingQueue<SVCPostSend>();
        this.readOps = new LinkedBlockingQueue<SVCPostSend>();
        this.opcount = new AtomicIntegerModulo();
        this.futureMap = new ConcurrentHashMap<Long, RdmaActiveFuture>();
        this.sendQueueAvailable = new Semaphore(RdmaConstants.STORAGE_RDMA_QUEUESIZE);
        this.mrCache = group.getMrCache();
        this.deviceCache = null;
    }

    /**
     * Runs the superclass initialization and then pre-builds
     * {@code STORAGE_RDMA_QUEUESIZE} write operations and the same number of
     * read operations.
     *
     * @throws IOException if the superclass init or building an operation fails
     */
    @Override
    protected synchronized void init() throws IOException {
        super.init();
        for (int i = 0; i < RdmaConstants.STORAGE_RDMA_QUEUESIZE; i++) {
            SVCPostSend write = initWriteOp();
            writeOps.add(write);
            SVCPostSend read = initReadOp();
            readOps.add(read);
        }
    }

    /**
     * Builds a reusable two-element post-send: an RDMA write followed by a
     * signaled RDMA read of length 1. Only the read is flagged as signaled
     * here, and {@link #write} tracks completion through the read's work
     * request id.
     */
    private SVCPostSend initWriteOp() throws IOException {
        LinkedList<IbvSendWR> wrList_send = new LinkedList<IbvSendWR>();

        IbvSendWR writeWR = new IbvSendWR();
        writeWR.setWr_id(opcount.getAndIncrement());
        writeWR.setOpcode(IbvSendWR.IBV_WR_RDMA_WRITE);
        LinkedList<IbvSge> sgeListWrite = new LinkedList<IbvSge>();
        sgeListWrite.add(new IbvSge());
        writeWR.setSg_list(sgeListWrite);
        wrList_send.add(writeWR);

        IbvSendWR readWR = new IbvSendWR();
        readWR.setWr_id(opcount.getAndIncrement());
        readWR.setOpcode(IbvSendWR.IBV_WR_RDMA_READ);
        readWR.setSend_flags(IbvSendWR.IBV_SEND_SIGNALED);
        LinkedList<IbvSge> sgeListRead = new LinkedList<IbvSge>();
        IbvSge sgeSendRead = new IbvSge();
        sgeSendRead.setLength(1);
        sgeListRead.add(sgeSendRead);
        readWR.setSg_list(sgeListRead);
        wrList_send.add(readWR);

        return this.postSend(wrList_send);
    }

    /** Builds a reusable post-send holding a single signaled RDMA read. */
    private SVCPostSend initReadOp() throws IOException {
        LinkedList<IbvSendWR> wrList_send = new LinkedList<IbvSendWR>();
        LinkedList<IbvSge> sgeList = new LinkedList<IbvSge>();
        sgeList.add(new IbvSge());

        IbvSendWR sendWR = new IbvSendWR();
        sendWR.setSg_list(sgeList);
        wrList_send.add(sendWR);
        sendWR.setWr_id(opcount.getAndIncrement());
        sendWR.setOpcode(IbvSendWR.IBV_WR_RDMA_READ);
        sendWR.setSend_flags(IbvSendWR.IBV_SEND_SIGNALED);

        return this.postSend(wrList_send);
    }

    /**
     * Returns the local memory registration covering the buffer's region,
     * registering the region and caching the result when the cache has no entry.
     */
    private IbvMr lookupLocalMr(CrailBuffer buffer) throws IOException {
        DeviceMrCache cache = deviceCache;
        if (cache == null) {
            cache = mrCache.getDeviceCache(this.getPd());
            deviceCache = cache;
        }
        IbvMr localMr = cache.get(buffer.getRegion());
        if (localMr == null) {
            localMr = this.registerMemory(buffer.getRegion().getByteBuffer()).execute().free().getMr();
            cache.put(localMr);
        }
        return localMr;
    }

    /**
     * Undoes the bookkeeping of an operation whose permits were acquired but
     * which was not posted successfully.
     *
     * @param future  the future created for the operation, or {@code null} if none was created yet
     * @param permits number of send-queue permits the operation holds
     */
    private void rollback(RdmaActiveFuture future, int permits) {
        // dispatchCqEvent releases the permits itself when it removes a future, so only
        // release here if this call is the one that takes the future out of the map
        // (or if no future was ever registered).
        if (future == null || futureMap.remove(future.getWrid()) != null) {
            sendQueueAvailable.release(permits);
        }
    }

    /**
     * Writes {@code buffer.remaining()} bytes, starting at the buffer's current
     * position, to {@code remoteMr.getAddr() + remoteOffset}. The posted
     * operation is an RDMA write followed by a 1-byte RDMA read from the same
     * remote address; the returned future is keyed by the read's work request id.
     *
     * @param buffer       local source buffer; must have between 1 and {@code CrailConstants.BLOCK_SIZE} bytes remaining
     * @param remoteMr     remote block; address and key must be non-zero
     * @param remoteOffset non-negative offset into the remote block
     * @return a future registered for the completion of this operation
     * @throws IOException          if an argument is invalid or posting fails
     * @throws InterruptedException if interrupted while waiting for a pooled operation or send-queue permits
     */
    @Override
    public StorageFuture write(CrailBuffer buffer, BlockInfo remoteMr, long remoteOffset) throws IOException, InterruptedException {
        if (buffer.remaining() > CrailConstants.BLOCK_SIZE) {
            throw new IOException("write size too large " + buffer.remaining());
        }
        if (buffer.remaining() <= 0) {
            throw new IOException("write size too small, len " + buffer.remaining());
        }
        if (remoteOffset < 0) {
            throw new IOException("remote offset too small " + remoteOffset);
        }
        if (remoteMr.getAddr() == 0) {
            throw new IOException("remote addr is 0 " + remoteMr.getAddr());
        }
        if (remoteMr.getLkey() == 0) {
            throw new IOException("remote key is 0 " + remoteMr.getLkey());
        }

        IbvMr localMr = lookupLocalMr(buffer);
        long bufferAddress = buffer.address();

        SVCPostSend writeOp = writeOps.take();
        boolean permitsHeld = false;
        boolean posted = false;
        RdmaActiveFuture future = null;
        try {
            SendWRMod sendWriteWR = writeOp.getWrMod(0);
            sendWriteWR.setWr_id(opcount.getAndIncrement());
            sendWriteWR.getRdmaMod().setRemote_addr(remoteMr.getAddr() + remoteOffset);
            sendWriteWR.getRdmaMod().setRkey(remoteMr.getLkey());
            SgeMod sgeSendWrite = sendWriteWR.getSgeMod(0);
            sgeSendWrite.setAddr(bufferAddress + buffer.position());
            sgeSendWrite.setLength(buffer.remaining());
            sgeSendWrite.setLkey(localMr.getLkey());

            SendWRMod sendReadWR = writeOp.getWrMod(1);
            sendReadWR.setWr_id(opcount.getAndIncrement());
            sendReadWR.getRdmaMod().setRemote_addr(remoteMr.getAddr() + remoteOffset);
            sendReadWR.getRdmaMod().setRkey(remoteMr.getLkey());
            SgeMod sgeSendRead = sendReadWR.getSgeMod(0);
            sgeSendRead.setAddr(bufferAddress + buffer.position());
            sgeSendRead.setLkey(localMr.getLkey());

            // Take both permits in one call: acquiring them one at a time lets concurrent
            // writers each hold a single permit and wait forever for the second.
            sendQueueAvailable.acquire(WRITE_PERMITS);
            permitsHeld = true;

            if (sendWriteWR.getRdmaMod().getRkey() == 0) {
                throw new IOException("stag is zero, can't be");
            }
            if (sendReadWR.getRdmaMod().getRkey() == 0) {
                throw new IOException("stag is zero, can't be");
            }

            future = new RdmaActiveFuture(sendReadWR.getWr_id(), sgeSendWrite.getLength(), true);
            futureMap.put(future.getWrid(), future);
            writeOp.execute();
            posted = true;
            return future;
        } finally {
            if (permitsHeld && !posted) {
                rollback(future, WRITE_PERMITS);
            }
            // The pooled operation goes back on success and on failure alike.
            writeOps.add(writeOp);
        }
    }

    /**
     * Reads {@code buffer.remaining()} bytes from
     * {@code remoteMr.getAddr() + remoteOffset} into the buffer, starting at
     * the buffer's current position, using a single RDMA read.
     *
     * @param buffer       local destination buffer; must have between 1 and {@code CrailConstants.BLOCK_SIZE} bytes remaining
     * @param remoteMr     remote block; address and key must be non-zero
     * @param remoteOffset non-negative offset into the remote block
     * @return a future registered for the completion of this operation
     * @throws IOException          if an argument is invalid or posting fails
     * @throws InterruptedException if interrupted while waiting for a pooled operation or a send-queue permit
     */
    @Override
    public StorageFuture read(CrailBuffer buffer, BlockInfo remoteMr, long remoteOffset) throws IOException, InterruptedException {
        if (buffer.remaining() > CrailConstants.BLOCK_SIZE) {
            throw new IOException("read size too large");
        }
        if (buffer.remaining() <= 0) {
            throw new IOException("read size too small, len " + buffer.remaining());
        }
        if (remoteOffset < 0) {
            throw new IOException("remote offset too small " + remoteOffset);
        }
        if (remoteMr.getAddr() == 0) {
            throw new IOException("remote addr is 0 " + remoteMr.getAddr());
        }
        if (remoteMr.getLkey() == 0) {
            throw new IOException("remote key is 0 " + remoteMr.getLkey());
        }

        IbvMr localMr = lookupLocalMr(buffer);
        long bufferAddress = buffer.address();

        SVCPostSend readOp = readOps.take();
        boolean permitsHeld = false;
        boolean posted = false;
        RdmaActiveFuture future = null;
        try {
            SendWRMod sendWR = readOp.getWrMod(0);
            sendWR.setWr_id(opcount.getAndIncrement());
            SgeMod sgeSend = sendWR.getSgeMod(0);
            sgeSend.setAddr(bufferAddress + buffer.position());
            sgeSend.setLength(buffer.remaining());
            sgeSend.setLkey(localMr.getLkey());
            sendWR.getRdmaMod().setRemote_addr(remoteMr.getAddr() + remoteOffset);
            sendWR.getRdmaMod().setRkey(remoteMr.getLkey());

            sendQueueAvailable.acquire(READ_PERMITS);
            permitsHeld = true;

            if (sendWR.getRdmaMod().getRkey() == 0) {
                throw new IOException("stag is zero, can't be");
            }

            future = new RdmaActiveFuture(sendWR.getWr_id(), sgeSend.getLength(), false);
            futureMap.put(future.getWrid(), future);
            readOp.execute();
            posted = true;
            return future;
        } finally {
            if (permitsHeld && !posted) {
                rollback(future, READ_PERMITS);
            }
            // The pooled operation goes back on success and on failure alike.
            readOps.add(readOp);
        }
    }

    /**
     * Handles one work completion. On success the matching future is removed
     * from the map and signaled, and the send-queue permits it held are
     * released (two for a write, one for a read).
     *
     * @param wc the work completion to process
     * @throws IOException if a successful completion has no registered future, or
     *                     if the status is neither success nor the ignored flush status
     */
    @Override
    public void dispatchCqEvent(IbvWC wc) throws IOException {
        int status = wc.getStatus();
        if (status == WC_STATUS_SUCCESS) {
            RdmaActiveFuture future = futureMap.remove(wc.getWr_id());
            if (future == null) {
                throw new IOException("cannot find future object for wrid " + wc.getWr_id() + ", status " + wc.getStatus() + ", opcount " + opcount + ", wc.qpnum " + wc.getQp_num() + ", this.qp.num " + this.qp.getQp_num() + ", connstate " + this.getConnState() + ", futureMap.size " + futureMap.size());
            }
            future.signal();
            sendQueueAvailable.release(future.isWrite() ? WRITE_PERMITS : READ_PERMITS);
        } else if (status == WC_STATUS_WR_FLUSH_ERR) {
            // Deliberately ignored: the future stays registered and no permits are released.
            // NOTE(review): presumably flushed requests only show up during teardown - confirm.
        } else {
            throw new IOException("error in wc, status " + wc.getStatus());
        }
    }

    /** Closes the endpoint by delegating to the superclass. */
    @Override
    public void close() throws IOException, InterruptedException {
        super.close();
    }

    /** Returns the number of send-queue permits currently available. */
    public int getFreeSlots() {
        return this.sendQueueAvailable.availablePermits();
    }

    /** Returns the string form of the destination address reported by the superclass. */
    public String getAddress() throws IOException {
        return super.getDstAddr().toString();
    }

    /** Returns the connection id held by the superclass. */
    public RdmaCmId getContext() {
        return super.getIdPriv();
    }

    /** Always {@code false} for this endpoint. */
    @Override
    public boolean isLocal() {
        return false;
    }
}