blob: 71d6c0da899f74463d1a3e3089f1e93b31c1d4b5 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.crail.core;
import java.io.IOException;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.crail.CrailBuffer;
import org.apache.crail.conf.CrailConstants;
import org.apache.crail.metadata.BlockInfo;
import org.apache.crail.metadata.FileInfo;
import org.apache.crail.rpc.RpcConnection;
import org.apache.crail.rpc.RpcErrors;
import org.apache.crail.rpc.RpcFuture;
import org.apache.crail.rpc.RpcGetBlock;
import org.apache.crail.storage.StorageEndpoint;
import org.apache.crail.storage.StorageFuture;
import org.apache.crail.utils.BufferCheckpoint;
import org.apache.crail.utils.CrailUtils;
import org.apache.crail.utils.EndpointCache;
import org.apache.crail.utils.BlockCache.FileBlockCache;
import org.apache.crail.utils.NextBlockCache.FileNextBlockCache;
import org.slf4j.Logger;
/**
 * Common base for streams operating on the data of a {@link CoreNode}.
 *
 * <p>A buffer-level operation is split into sub-operations that never cross a
 * block boundary ({@link CrailConstants#BLOCK_SIZE}). For every sub-operation the
 * block metadata is taken from the block cache, from a previously prefetched
 * RPC future, or from a fresh namenode {@code getBlock} RPC; the actual storage
 * transfer is delegated to the subclass through {@link #trigger}.
 *
 * <p>NOTE(review): the class keeps per-call scratch state ({@code blockMap},
 * {@code pendingBlocks}) and a mutable position without any synchronization
 * visible here, so it looks intended for use by one thread at a time - confirm.
 */
public abstract class CoreStream {
    private static final Logger LOG = CrailUtils.getLogger();

    protected CoreDataStore fs;
    protected CoreNode node;

    private final EndpointCache endpointCache;
    private final RpcConnection namenodeClientRpc;
    private final FileBlockCache blockCache;
    private final FileNextBlockCache nextBlockCache;
    private final BufferCheckpoint bufferCheckpoint;
    private final FileInfo fileInfo;
    /** Current file offset; advanced by {@link #dataOperation} and set by {@link #seek}. */
    private long position;
    /** Capacity value last sent to (or read for) the namenode. */
    private long syncedCapacity;
    private final long streamId;
    private final CoreIOStatistics ioStats;
    /** Maps the ticket of an outstanding getBlock RPC to the sub-operation waiting for it. */
    private final HashMap<Integer, CoreSubOperation> blockMap;
    /** getBlock RPCs of the current data operation whose results still have to be consumed. */
    private final LinkedList<RpcFuture<RpcGetBlock>> pendingBlocks;

    /**
     * Starts the storage transfer for a single sub-operation. The buffer's
     * position and limit have already been narrowed to the sub-operation's range
     * when this is called from this class.
     *
     * @param endpoint storage endpoint resolved for the block's datanode
     * @param opDesc   the sub-operation (file offset, buffer position, length)
     * @param buffer   the caller's data buffer
     * @param block    metadata of the block the sub-operation targets
     * @return a future tracking the storage transfer
     * @throws Exception if the transfer cannot be started
     */
    abstract StorageFuture trigger(StorageEndpoint endpoint, CoreSubOperation opDesc, CrailBuffer buffer, BlockInfo block) throws Exception;

    /**
     * Subclass hook; it is not invoked from within this class.
     * NOTE(review): presumably called when an operation changes the file
     * capacity - confirm against the subclasses and callers.
     *
     * @param newCapacity the capacity value passed by the caller
     */
    abstract void update(long newCapacity);

    /**
     * Creates a stream on the given node, starting at {@code fileOffset}.
     *
     * @param node       node whose file system, file info and caches are used
     * @param streamId   identifier reported by {@link #getStreamId()}
     * @param fileOffset initial stream position
     * @throws Exception declared for subclasses/callers; nothing in this body throws a checked exception explicitly
     */
    CoreStream(CoreNode node, long streamId, long fileOffset) throws Exception {
        this.node = node;
        this.fs = node.getFileSystem();
        this.fileInfo = node.getFileInfo();
        this.endpointCache = fs.getDatanodeEndpointCache();
        this.namenodeClientRpc = fs.getNamenodeClientRpc();
        this.blockCache = fs.getBlockCache(fileInfo.getFd());
        this.nextBlockCache = fs.getNextBlockCache(fileInfo.getFd());
        this.bufferCheckpoint = fs.getBufferCheckpoint();
        this.position = fileOffset;
        this.syncedCapacity = fileInfo.getCapacity();
        this.streamId = streamId;
        this.ioStats = new CoreIOStatistics("core");
        this.blockMap = new HashMap<Integer, CoreSubOperation>();
        this.pendingBlocks = new LinkedList<RpcFuture<RpcGetBlock>>();
    }

    /**
     * Splits the transfer of {@code dataBuf} into per-block sub-operations and
     * starts all of them.
     *
     * <p>Sub-operations whose block is in the block cache are triggered
     * immediately. For the others a getBlock RPC future is used (a prefetched one
     * if available, otherwise a new RPC); after all fragments have been computed
     * the method waits for those futures, triggers the corresponding transfers and
     * stores the returned block metadata in the block cache. The stream position
     * is advanced by the length of every fragment.
     *
     * @param dataBuf buffer to transfer; its position and limit are rewritten
     *                while sub-operations are prepared and set from the
     *                multi-operation before returning
     * @return the multi-operation aggregating all started storage futures
     * @throws IOException if a getBlock RPC does not complete within
     *                     {@link CrailConstants#RPC_TIMEOUT} ms, returns an error,
     *                     or the processed length does not match the operation
     * @throws Exception   anything propagated from the RPC layer or {@link #trigger}
     */
    final CoreDataOperation dataOperation(CrailBuffer dataBuf) throws Exception {
        blockMap.clear();
        pendingBlocks.clear();
        CoreDataOperation multiOperation = new CoreDataOperation(this, dataBuf);

        // compute off, len for the fragments, start transfer or start RPC if block info is missing
        while (multiOperation.remaining() > 0) {
            long blockRemaining = blockRemaining();
            int opLen = CrailUtils.minFileBuf(blockRemaining, multiOperation.remaining());
            CoreSubOperation subOperation = new CoreSubOperation(fileInfo.getFd(), position, multiOperation.getCurrentBufferPosition(), opLen);
            ioStats.incTotalOps((long) opLen);

            if (blockCache.containsKey(subOperation.key())) {
                // block metadata already known: start the transfer right away
                BlockInfo block = blockCache.get(subOperation.key());
                StorageFuture subFuture = this.prepareAndTrigger(subOperation, dataBuf, block);
                multiOperation.add(subFuture);
                this.ioStats.incCachedOps();
            } else if (nextBlockCache.containsKey(subOperation.key())) {
                // a getBlock RPC for this key was issued earlier by prefetchMetadata()
                RpcFuture<RpcGetBlock> rpcFuture = nextBlockCache.get(subOperation.key());
                blockMap.put(rpcFuture.getTicket(), subOperation);
                pendingBlocks.add(rpcFuture);
            } else {
                // no metadata at all: ask the namenode, passing the current capacity
                this.syncedCapacity = fileInfo.getCapacity();
                RpcFuture<RpcGetBlock> rpcFuture = namenodeClientRpc.getBlock(fileInfo.getFd(), fileInfo.getToken(), position, syncedCapacity);
                blockMap.put(rpcFuture.getTicket(), subOperation);
                pendingBlocks.add(rpcFuture);
            }

            position += opLen;
            multiOperation.incProcessedLen(opLen);
        }

        // wait for RPC results and start the transfers for those blocks as well
        RpcFuture<RpcGetBlock> rpcFuture;
        while ((rpcFuture = pendingBlocks.poll()) != null) {
            recordRpcStats(rpcFuture);

            RpcGetBlock getBlockRes = rpcFuture.get(CrailConstants.RPC_TIMEOUT, TimeUnit.MILLISECONDS);
            // NOTE(review): the isDone() re-check suggests get() may return without a
            // result once the timeout elapses; a null result is treated the same way
            // instead of being dereferenced - confirm against the RpcFuture implementations.
            if (!rpcFuture.isDone() || getBlockRes == null) {
                throw new IOException("rpc timeout ");
            }
            if (getBlockRes.getError() != RpcErrors.ERR_OK) {
                LOG.info("inputStream: " + RpcErrors.messages[getBlockRes.getError()]);
                throw new IOException(RpcErrors.messages[getBlockRes.getError()]);
            }

            BlockInfo block = getBlockRes.getBlockInfo();
            CoreSubOperation subOperation = blockMap.get(rpcFuture.getTicket());
            if (subOperation == null) {
                // fail before touching the buffer rather than with an NPE further down
                throw new IOException("Internal error, no sub operation registered for rpc ticket " + rpcFuture.getTicket());
            }
            StorageFuture subFuture = prepareAndTrigger(subOperation, dataBuf, block);
            multiOperation.add(subFuture);
            blockCache.put(subOperation.key(), block);
        }

        if (!multiOperation.isProcessed()) {
            throw new IOException("Internal error, processed data != operation length");
        }

        dataBuf.limit(multiOperation.getBufferLimit());
        dataBuf.position(multiOperation.getCurrentBufferPosition());
        return multiOperation;
    }

    /**
     * Issues a getBlock RPC for the block at the current position unless its
     * metadata is already in the block cache or an RPC for it is already stored
     * in the next-block cache. The new future is flagged as prefetched and stored
     * in the next-block cache without waiting for its result.
     *
     * @throws Exception anything propagated from the RPC layer
     */
    final void prefetchMetadata() throws Exception {
        long key = CoreSubOperation.createKey(fileInfo.getFd(), position);
        if (blockCache.containsKey(key) || nextBlockCache.containsKey(key)) {
            return;
        }

        this.syncedCapacity = fileInfo.getCapacity();
        RpcFuture<RpcGetBlock> nextBlock = namenodeClientRpc.getBlock(fileInfo.getFd(), fileInfo.getToken(), position, syncedCapacity);
        nextBlock.setPrefetched(true);
        nextBlockCache.put(key, nextBlock);
        this.ioStats.incPrefetchedOps();
    }

    /**
     * Moves the stream position to {@code pos}.
     *
     * @param pos target offset; accepted only if clamping it to
     *            {@code [0, capacity]} leaves it unchanged
     * @throws IOException if {@code pos} lies outside that range
     */
    void seek(long pos) throws IOException {
        long clamped = Math.min(fileInfo.getCapacity(), Math.max(0, pos));
        if (clamped != pos) {
            throw new IOException("seek position out of range, pos " + pos + ", fileCapacity " + fileInfo.getCapacity());
        }
        this.position = clamped;
    }

    /**
     * Pushes the file info to the namenode if the file token is positive and the
     * capacity has grown beyond the last synced value; otherwise returns a no-op
     * future.
     *
     * @return a future for the {@code setFile} RPC, or a {@code NoOperation}
     * @throws IOException if the RPC cannot be issued
     */
    Future<Void> sync() throws IOException {
        boolean needsSync = fileInfo.getToken() > 0 && syncedCapacity < fileInfo.getCapacity();
        if (!needsSync) {
            return new NoOperation();
        }
        syncedCapacity = fileInfo.getCapacity();
        return new SyncNodeFuture(namenodeClientRpc.setFile(fileInfo, false));
    }

    /** Copies the file's current capacity into the I/O statistics. */
    void updateIOStats() {
        ioStats.setCapacity(fileInfo.getCapacity());
    }

    /** @return the stream id passed to the constructor */
    long getStreamId() {
        return streamId;
    }

    /** @return the current stream position (file offset) */
    public long position() {
        return position;
    }

    /** @return the I/O statistics collected by this stream */
    CoreIOStatistics getCoreStatistics() {
        return ioStats;
    }

    /** @return the node this stream was created on */
    public CoreNode getFile() {
        return node;
    }

    /** @return the buffer checkpoint obtained from the file system at construction */
    BufferCheckpoint getBufferCheckpoint() {
        return bufferCheckpoint;
    }

    //-----------------

    /**
     * Sets the capacity recorded in the file info.
     *
     * @param currentCapacity the new capacity value
     */
    void setCapacity(long currentCapacity) {
        fileInfo.setCapacity(currentCapacity);
    }

    /** @return number of bytes from the current position up to the end of its block */
    private long blockRemaining() {
        long blockOffset = position % CrailConstants.BLOCK_SIZE;
        return CrailConstants.BLOCK_SIZE - blockOffset;
    }

    /**
     * Updates the blocking/non-blocking RPC counters for a future that is about to
     * be waited on; must be called before {@code get()} so that {@code isDone()}
     * reflects whether the wait will block.
     */
    private void recordRpcStats(RpcFuture<RpcGetBlock> rpcFuture) {
        if (!rpcFuture.isDone()) {
            this.ioStats.incBlockingOps();
            if (rpcFuture.isPrefetched()) {
                this.ioStats.incPrefetchedBlockingOps();
            }
        } else {
            this.ioStats.incNonblockingOps();
            if (rpcFuture.isPrefetched()) {
                this.ioStats.incPrefetchedNonblockingOps();
            }
        }
    }

    /**
     * Resolves the storage endpoint for {@code block}, narrows {@code dataBuf} to
     * the range of {@code opDesc} and hands the sub-operation to {@link #trigger}.
     *
     * @throws IOException rethrown after being logged
     * @throws Exception   any other failure from endpoint lookup or {@link #trigger}
     */
    private StorageFuture prepareAndTrigger(CoreSubOperation opDesc, CrailBuffer dataBuf, BlockInfo block) throws Exception {
        try {
            StorageEndpoint endpoint = endpointCache.getDataEndpoint(block.getDnInfo());
            dataBuf.clear();
            dataBuf.position(opDesc.getBufferPosition());
            dataBuf.limit(dataBuf.position() + opDesc.getLen());
            StorageFuture subFuture = trigger(endpoint, opDesc, dataBuf, block);
            incStats(endpoint.isLocal());
            return subFuture;
        } catch (IOException e) {
            // log through the logger (with stack trace) instead of printStackTrace()
            LOG.error("ERROR: failed data operation", e);
            throw e;
        }
    }

    /**
     * Counts a started sub-operation as local or remote (and additionally as a
     * directory operation if the file is a directory) when statistics are enabled.
     */
    private void incStats(boolean isLocal) {
        if (!CrailConstants.STATISTICS) {
            return;
        }
        boolean isDirectory = fileInfo.getType().isDirectory();
        if (isLocal) {
            ioStats.incLocalOps();
            if (isDirectory) {
                ioStats.incLocalDirOps();
            }
        } else {
            ioStats.incRemoteOps();
            if (isDirectory) {
                ioStats.incRemoteDirOps();
            }
        }
    }
}