blob: e69acdbc6d02aaf9bbb29edb4b61c4dc59928174 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.crail.storage.rdma.client;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.FileChannel.MapMode;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.crail.CrailBuffer;
import org.apache.crail.conf.CrailConstants;
import org.apache.crail.memory.OffHeapBuffer;
import org.apache.crail.metadata.BlockInfo;
import org.apache.crail.storage.StorageEndpoint;
import org.apache.crail.storage.StorageFuture;
import org.apache.crail.storage.StorageUtils;
import org.apache.crail.storage.rdma.RdmaConstants;
import org.apache.crail.utils.CrailUtils;
import org.slf4j.Logger;
import com.ibm.disni.verbs.*;
import sun.misc.Unsafe;
public class RdmaStorageLocalEndpoint implements StorageEndpoint {
private static final Logger LOG = CrailUtils.getLogger();
private ConcurrentHashMap<Long, CrailBuffer> bufferMap;
private Unsafe unsafe;
private InetSocketAddress address;
public RdmaStorageLocalEndpoint(InetSocketAddress datanodeAddr) throws Exception {
LOG.info("new local endpoint for address " + datanodeAddr);
String dataPath = StorageUtils.getDatanodeDirectory(RdmaConstants.STORAGE_RDMA_DATA_PATH, datanodeAddr);
File dataDir = new File(dataPath);
if (!dataDir.exists()){
throw new Exception("Local RDMA data path missing");
}
this.address = datanodeAddr;
this.bufferMap = new ConcurrentHashMap<Long, CrailBuffer>();
this.unsafe = getUnsafe();
for (File dataFile : dataDir.listFiles()) {
long lba = Long.parseLong(dataFile.getName());
OffHeapBuffer offHeapBuffer = OffHeapBuffer.wrap(mmap(dataFile));
bufferMap.put(lba, offHeapBuffer);
}
}
@Override
public StorageFuture write(CrailBuffer buffer, BlockInfo remoteMr, long remoteOffset) throws IOException,
InterruptedException {
if (buffer.remaining() > CrailConstants.BLOCK_SIZE){
throw new IOException("write size too large " + buffer.remaining());
}
if (buffer.remaining() <= 0){
throw new IOException("write size too small, len " + buffer.remaining());
}
if (remoteOffset < 0){
throw new IOException("remote offset too small " + remoteOffset);
}
long alignedLba = getAlignedLba(remoteMr.getLba());
long lbaOffset = getLbaOffset(remoteMr.getLba());
CrailBuffer mappedBuffer = bufferMap.get(alignedLba);
if (mappedBuffer == null){
throw new IOException("No mapped buffer for this key " + remoteMr.getLkey() + ", address " + address);
}
if (lbaOffset + remoteOffset + buffer.remaining() > RdmaConstants.STORAGE_RDMA_ALLOCATION_SIZE){
long tmpAddr = lbaOffset + remoteOffset + buffer.remaining();
throw new IOException("remote fileOffset + remoteOffset + len too large " + tmpAddr);
}
long srcAddr = buffer.address() + buffer.position();
long dstAddr = mappedBuffer.address() + lbaOffset + remoteOffset;
RdmaLocalFuture future = new RdmaLocalFuture(unsafe, srcAddr, dstAddr, buffer.remaining());
return future;
}
@Override
public StorageFuture read(CrailBuffer buffer, BlockInfo remoteMr, long remoteOffset) throws IOException,
InterruptedException {
if (buffer.remaining() > CrailConstants.BLOCK_SIZE){
throw new IOException("read size too large");
}
if (buffer.remaining() <= 0){
throw new IOException("read size too small, len " + buffer.remaining());
}
if (remoteOffset < 0){
throw new IOException("remote offset too small " + remoteOffset);
}
if (buffer.position() < 0){
throw new IOException("local offset too small " + buffer.position());
}
long alignedLba = getAlignedLba(remoteMr.getLba());
long lbaOffset = getLbaOffset(remoteMr.getLba());
CrailBuffer mappedBuffer = bufferMap.get(alignedLba);
if (mappedBuffer == null){
throw new IOException("No mapped buffer for this key");
}
if (lbaOffset + remoteOffset + buffer.remaining() > RdmaConstants.STORAGE_RDMA_ALLOCATION_SIZE){
long tmpAddr = lbaOffset + remoteOffset + buffer.remaining();
throw new IOException("remote fileOffset + remoteOffset + len too large " + tmpAddr);
}
long srcAddr = mappedBuffer.address() + lbaOffset + remoteOffset;
long dstAddr = buffer.address() + buffer.position();
RdmaLocalFuture future = new RdmaLocalFuture(unsafe, srcAddr, dstAddr, buffer.remaining());
return future;
}
private static long getAlignedLba(long remoteLba){
return remoteLba / RdmaConstants.STORAGE_RDMA_ALLOCATION_SIZE;
}
private static long getLbaOffset(long remoteLba){
return remoteLba % RdmaConstants.STORAGE_RDMA_ALLOCATION_SIZE;
}
@Override
public void close() throws IOException, InterruptedException {
}
public void connect(SocketAddress inetAddress, int i)
throws InterruptedException, IOException {
}
public int getEndpointId() {
return 0;
}
public int getFreeSlots() {
return 0;
}
public boolean isConnected() {
return true;
}
public IbvQP getQp() {
return null;
}
public String getAddress() throws IOException {
return null;
}
public RdmaCmId getContext() {
return null;
}
private MappedByteBuffer mmap(File file) throws IOException{
RandomAccessFile randomFile = new RandomAccessFile(file.getAbsolutePath(), "rw");
FileChannel channel = randomFile.getChannel();
MappedByteBuffer mappedBuffer = channel.map(MapMode.READ_WRITE, 0, RdmaConstants.STORAGE_RDMA_ALLOCATION_SIZE);
randomFile.close();
return mappedBuffer;
}
private Unsafe getUnsafe() throws Exception {
Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe");
theUnsafe.setAccessible(true);
Unsafe unsafe = (Unsafe) theUnsafe.get(null);
return unsafe;
}
@Override
public boolean isLocal() {
return true;
}
}