blob: d30e62ce2f290889171755e79b999ec58636ecf2 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.crail.core;
import org.apache.crail.CrailBuffer;
import org.apache.crail.CrailResult;
import org.apache.crail.conf.CrailConstants;
import org.apache.crail.storage.StorageFuture;
import org.apache.crail.storage.StorageResult;
import org.apache.crail.utils.BufferCheckpoint;
import org.apache.crail.utils.MultiFuture;
class CoreDataOperation extends MultiFuture<StorageResult, CrailResult> implements CrailResult {
private CoreStream stream;
private CrailBuffer buffer;
private long fileOffset;
private int bufferPosition;
private int bufferLimit;
private int operationLength;
//current state
private BufferCheckpoint bufferCheckpoint;
private int inProcessLen;
private long completedLen;
private boolean isSynchronous;
public CoreDataOperation(CoreStream stream, CrailBuffer buffer) throws Exception{
this.stream = stream;
this.buffer = buffer;
this.fileOffset = stream.position();
this.bufferPosition = buffer.position();
this.bufferLimit = buffer.limit();
this.operationLength = buffer.remaining();
this.inProcessLen = 0;
this.completedLen = 0;
this.isSynchronous = false;
if (operationLength > 0){
this.bufferCheckpoint = stream.getBufferCheckpoint();
if (CrailConstants.DEBUG){
this.bufferCheckpoint.checkIn(buffer);
}
}
}
public long getLen() {
return completedLen;
}
void incProcessedLen(int opLen){
this.inProcessLen += opLen;
}
long getInProcessLen(){
return this.inProcessLen;
}
int getBufferLimit(){
return bufferLimit;
}
int remaining() {
return operationLength - inProcessLen;
}
int getCurrentBufferPosition(){
return bufferPosition + inProcessLen;
}
long getCurrentStreamPosition(){
return fileOffset + inProcessLen;
}
boolean isProcessed() {
return inProcessLen == operationLength;
}
public boolean cancel(boolean mayInterruptIfRunning) {
return false;
}
public boolean isCancelled() {
return false;
}
public synchronized void add(StorageFuture dataFuture) {
super.add(dataFuture);
if (dataFuture.isSynchronous()){
this.isSynchronous = true;
}
}
//-----------
public void completeOperation(){
super.completeOperation();
if (this.isDone()){
stream.update(fileOffset + completedLen);
if (CrailConstants.DEBUG){
bufferCheckpoint.checkOut(buffer);
}
}
}
boolean isSynchronous() {
return isSynchronous;
}
@Override
public void aggregate(StorageResult result) {
completedLen += result.getLen();
}
@Override
public CrailResult getAggregate() {
return this;
}
}