blob: ff64123068e83c0de95117c651dae0ef6c4bb902 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.crail.storage.nvmf.client;
import org.apache.crail.CrailBuffer;
import org.apache.crail.metadata.BlockInfo;
import org.apache.crail.storage.StorageFuture;
import org.apache.crail.storage.StorageResult;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class NvmfUnalignedWriteFuture implements StorageFuture {
private final NvmfStorageEndpoint endpoint;
private StorageFuture beginFuture;
private StorageFuture middleFuture;
private StorageFuture endFuture;
private final int written;
private NvmfStagingBufferCache.BufferCacheEntry beginBuffer;
private NvmfStagingBufferCache.BufferCacheEntry endBuffer;
private final boolean isSectorAligned(long address) {
return address % endpoint.getLBADataSize() == 0;
}
private final long floorToSectorSize(long address) {
return address - offsetInSector(address);
}
private final int floorToSectorSize(int length) {
return length - offsetInSector(length);
}
private final int leftInSector(long address) {
return endpoint.getLBADataSize() - offsetInSector(address);
}
private final int offsetInSector(long address) {
return (int)(address % endpoint.getLBADataSize());
}
NvmfUnalignedWriteFuture(NvmfStorageEndpoint endpoint, CrailBuffer buffer, BlockInfo blockInfo, long remoteOffset) throws Exception {
this.endpoint = endpoint;
this.written = buffer.remaining();
/* assume blockInfo.getAddr() is sector aligned */
assert isSectorAligned(blockInfo.getAddr());
long nextRemoteOffset = remoteOffset;
/* beginning */
if (!isSectorAligned(remoteOffset)) {
int copySize = Math.min(leftInSector(remoteOffset), buffer.remaining());
nextRemoteOffset = remoteOffset + copySize;
int oldLimit = buffer.limit();
buffer.limit(buffer.position() + copySize);
long alignedRemoteOffset = floorToSectorSize(remoteOffset);
long alignedRemoteAddress = blockInfo.getAddr() + alignedRemoteOffset;
beginBuffer = endpoint.getStagingBufferCache().getExisting(alignedRemoteAddress);
if (beginBuffer == null) {
/* we had to delete the old buffer because we ran out of space. This should happen rarely. */
beginBuffer = endpoint.getStagingBufferCache().get(alignedRemoteAddress);
endpoint.read(beginBuffer.getBuffer(), blockInfo, alignedRemoteOffset).get();
} else {
/* Wait for previous end operation to finish */
beginBuffer.getFuture().get();
}
CrailBuffer stagingBuffer = beginBuffer.getBuffer();
stagingBuffer.position(offsetInSector(remoteOffset));
stagingBuffer.getByteBuffer().put(buffer.getByteBuffer());
buffer.limit(oldLimit);
stagingBuffer.position(0);
beginFuture = endpoint.write(stagingBuffer, blockInfo, alignedRemoteOffset);
beginBuffer.setFuture(beginFuture);
stagingBuffer.position(offsetInSector(remoteOffset));
}
/* middle */
if (isSectorAligned(nextRemoteOffset) && buffer.remaining() >= endpoint.getLBADataSize()) {
int oldLimit = buffer.limit();
buffer.limit(buffer.position() + floorToSectorSize(buffer.remaining()));
int toWrite = buffer.remaining();
middleFuture = endpoint.write(buffer, blockInfo, nextRemoteOffset);
nextRemoteOffset += toWrite;
buffer.position(buffer.limit());
buffer.limit(oldLimit);
}
/* end */
if (buffer.remaining() > 0) {
endBuffer = endpoint.getStagingBufferCache().get(blockInfo.getAddr() + nextRemoteOffset);
CrailBuffer stagingBuffer = endBuffer.getBuffer();
stagingBuffer.position(0);
stagingBuffer.getByteBuffer().put(buffer.getByteBuffer());
stagingBuffer.position(0);
endFuture = endpoint.write(stagingBuffer, blockInfo, nextRemoteOffset);
endBuffer.setFuture(endFuture);
}
}
@Override
public boolean isSynchronous() {
return false;
}
@Override
public boolean cancel(boolean b) {
return false;
}
@Override
public boolean isCancelled() {
return false;
}
private static boolean checkIfFutureIsDone(StorageFuture future) {
return (future != null && future.isDone()) || future == null;
}
@Override
public boolean isDone() {
if (beginFuture != null && beginFuture.isDone()) {
if (beginBuffer != null) {
beginBuffer.put();
beginBuffer = null;
}
}
if (endFuture != null && endFuture.isDone()) {
if (endBuffer != null) {
endBuffer.put();
endBuffer = null;
}
}
return beginBuffer == null && checkIfFutureIsDone(middleFuture) && endBuffer == null;
}
@Override
public StorageResult get() throws InterruptedException, ExecutionException {
try {
return get(2, TimeUnit.MINUTES);
} catch (TimeoutException e) {
throw new ExecutionException(e);
}
}
@Override
public StorageResult get(long timeout, TimeUnit timeUnit) throws InterruptedException, ExecutionException, TimeoutException {
if (!isDone()) {
long start = System.nanoTime();
long end = start + TimeUnit.NANOSECONDS.convert(timeout, timeUnit);
boolean waitTimeOut;
do {
waitTimeOut = System.nanoTime() > end;
} while (!isDone() && !waitTimeOut);
if (!isDone() && waitTimeOut) {
throw new TimeoutException("poll wait time out!");
}
}
if (beginFuture != null) {
beginFuture.get();
}
if (middleFuture != null) {
middleFuture.get();
}
if (endFuture != null) {
endFuture.get();
}
return () -> written;
}
}