blob: d463833539de6c51c915e39cab452a2d32dc452a [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.crail.core;
import java.io.IOException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.crail.CrailBuffer;
import org.apache.crail.CrailOutputStream;
import org.apache.crail.CrailResult;
import org.apache.crail.conf.CrailConstants;
import org.apache.crail.metadata.BlockInfo;
import org.apache.crail.storage.StorageEndpoint;
import org.apache.crail.storage.StorageFuture;
import org.apache.crail.utils.CrailImmediateOperation;
import org.apache.crail.utils.CrailUtils;
import org.slf4j.Logger;
public class CoreOutputStream extends CoreStream implements CrailOutputStream {
private static final Logger LOG = CrailUtils.getLogger();
private AtomicLong inFlight;
private long writeHint;
private CrailImmediateOperation noOp;
private boolean open;
public CoreOutputStream(CoreNode file, long streamId, long writeHint) throws Exception {
super(file, streamId, file.getCapacity());
this.writeHint = Math.max(0, writeHint);
this.inFlight = new AtomicLong(0);
this.noOp = new CrailImmediateOperation(0);
this.open = true;
if (CrailConstants.DEBUG){
LOG.info("CoreOutputStream, open, path " + file.getPath() + ", fd " + file.getFd() + ", streamId " + streamId + ", isDir " + file.getType().isDirectory() + ", writeHint " + this.writeHint);
}
}
final public Future<CrailResult> write(CrailBuffer dataBuf) throws Exception {
if (!open) {
throw new IOException("Stream closed, cannot write");
}
if (dataBuf.remaining() <= 0) {
return noOp;
}
inFlight.incrementAndGet();
CoreDataOperation future = dataOperation(dataBuf);
if (position() < writeHint){
prefetchMetadata();
}
if (future.isSynchronous()){
future.get();
}
return future;
}
final public long getWriteHint() {
return this.writeHint;
}
public Future<Void> sync() throws IOException {
if (inFlight.get() != 0){
LOG.info("Cannot sync, pending operations, opcount " + inFlight.get());
throw new IOException("Cannot close, pending operations, opcount " + inFlight.get());
}
return super.sync();
}
public void close() throws Exception {
if (!open){
return;
}
if (inFlight.get() != 0){
LOG.info("Cannot close, pending operations, opcount " + inFlight.get() + ", path " + getFile().getPath());
throw new IOException("Cannot close, pending operations, opcount " + inFlight.get() + ", fd " + getFile().getFd() + ", streamId " + getStreamId() + ", capacity " + getFile().getCapacity());
}
sync().get();
updateIOStats();
node.closeOutputStream(this);
open = false;
if (CrailConstants.DEBUG){
LOG.info("CoreOutputStream, close, path " + this.getFile().getPath() + ", fd " + getFile().getFd() + ", streamId " + getStreamId() + ", capacity " + getFile().getCapacity());
}
}
// ----------------------
StorageFuture trigger(StorageEndpoint endpoint, CoreSubOperation opDesc, CrailBuffer buffer, BlockInfo block) throws Exception {
StorageFuture dataFuture = endpoint.write(buffer, block, opDesc.getBlockOffset());
return dataFuture;
}
synchronized void update(long newCapacity) {
inFlight.decrementAndGet();
setCapacity(newCapacity);
}
}