blob: f97624247cfb8d2826e4fa6d6ae2bd35e6dd32a9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.igfs;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.igfs.IgfsException;
import org.apache.ignite.igfs.IgfsMode;
import org.apache.ignite.igfs.IgfsPath;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteUuid;
import org.jetbrains.annotations.Nullable;
import java.io.DataInput;
import java.io.IOException;
import java.nio.ByteBuffer;
import static org.apache.ignite.igfs.IgfsMode.DUAL_SYNC;
import static org.apache.ignite.igfs.IgfsMode.PRIMARY;
import static org.apache.ignite.igfs.IgfsMode.PROXY;
/**
* Output stream to store data into grid cache with separate blocks.
*/
class IgfsOutputStreamImpl extends IgfsAbstractOutputStream {
/** Maximum number of blocks in buffer. */
private static final int MAX_BLOCKS_CNT = 16;
/** IGFS mode. */
private final IgfsMode mode;
/** Write completion future. */
private final IgniteInternalFuture<Boolean> writeFut;
/** File descriptor. */
private IgfsEntryInfo fileInfo;
/** Affinity written by this output stream. */
private IgfsFileAffinityRange streamRange;
/** Data length in remainder. */
protected int remainderDataLen;
/** Intermediate remainder to keep data. */
private byte[] remainder;
/** Space in file to write data. */
protected long space;
/**
* Constructs file output stream.
*
* @param igfsCtx IGFS context.
* @param path Path to stored file.
* @param fileInfo File info to write binary data to.
* @param bufSize The size of the buffer to be used.
* @param mode Grid IGFS mode.
* @param batch Optional secondary file system batch.
*/
IgfsOutputStreamImpl(IgfsContext igfsCtx, IgfsPath path, IgfsEntryInfo fileInfo, int bufSize, IgfsMode mode,
@Nullable IgfsFileWorkerBatch batch) {
super(igfsCtx, path, bufSize, batch);
assert fileInfo != null && fileInfo.isFile() : "Unexpected file info: " + fileInfo;
assert mode != null && mode != PROXY && (mode == PRIMARY && batch == null || batch != null);
// File hasn't been locked.
if (fileInfo.lockId() == null)
throw new IgfsException("Failed to acquire file lock (concurrently modified?): " + path);
synchronized (mux) {
this.fileInfo = fileInfo;
this.mode = mode;
streamRange = initialStreamRange(fileInfo);
writeFut = igfsCtx.data().writeStart(fileInfo.id());
}
}
/**
* @return Length of file.
*/
private long length() {
return fileInfo.length();
}
/** {@inheritDoc} */
@Override protected int optimizeBufferSize(int bufSize) {
assert bufSize > 0;
if (fileInfo == null)
return bufSize;
int blockSize = fileInfo.blockSize();
if (blockSize <= 0)
return bufSize;
if (bufSize <= blockSize)
// Optimize minimum buffer size to be equal file's block size.
return blockSize;
int maxBufSize = blockSize * MAX_BLOCKS_CNT;
if (bufSize > maxBufSize)
// There is no profit or optimization from larger buffers.
return maxBufSize;
if (fileInfo.length() == 0)
// Make buffer size multiple of block size (optimized for new files).
return bufSize / blockSize * blockSize;
return bufSize;
}
/**
* Flushes this output stream and forces any buffered output bytes to be written out.
*
* @throws IOException if an I/O error occurs.
*/
@Override public void flush() throws IOException {
synchronized (mux) {
checkClosed(null, 0);
sendBufferIfNotEmpty();
flushRemainder();
awaitAcks();
// Update file length if needed.
if (igfsCtx.configuration().isUpdateFileLengthOnFlush() && space > 0) {
try {
IgfsEntryInfo fileInfo0 = igfsCtx.meta().reserveSpace(fileInfo.id(), space, streamRange);
if (fileInfo0 == null)
throw new IOException("File was concurrently deleted: " + path);
else
fileInfo = fileInfo0;
streamRange = initialStreamRange(fileInfo);
space = 0;
}
catch (IgniteCheckedException e) {
throw new IOException("Failed to update file length data [path=" + path +
", space=" + space + ']', e);
}
}
}
}
/** {@inheritDoc} */
@Override public final void close() throws IOException {
synchronized (mux) {
// Do nothing if stream is already closed.
if (closed)
return;
// Set closed flag immediately.
closed = true;
// Flush data.
IOException err = null;
boolean flushSuccess = false;
try {
sendBufferIfNotEmpty();
flushRemainder();
igfsCtx.data().writeClose(fileInfo.id());
writeFut.get();
flushSuccess = true;
}
catch (Exception e) {
err = new IOException("Failed to flush data during stream close [path=" + path +
", fileInfo=" + fileInfo + ']', e);
}
// Finish batch before file unlocking to support the assertion that unlocked file batch,
// if any, must be in finishing state (e.g. append see more IgfsImpl.newBatch)
if (batch != null)
batch.finish();
// Unlock the file after data is flushed.
try {
if (flushSuccess && space > 0)
igfsCtx.meta().unlock(fileInfo.id(), fileInfo.lockId(), System.currentTimeMillis(), true,
space, streamRange);
else
igfsCtx.meta().unlock(fileInfo.id(), fileInfo.lockId(), System.currentTimeMillis());
}
catch (Exception e) {
if (err == null)
err = new IOException("File to release file lock: " + path, e);
else
err.addSuppressed(e);
}
// Finally, await secondary file system flush.
if (batch != null) {
if (mode == DUAL_SYNC) {
try {
batch.await();
}
catch (IgniteCheckedException e) {
if (err == null)
err = new IOException("Failed to close secondary file system stream [path=" + path +
", fileInfo=" + fileInfo + ']', e);
else
err.addSuppressed(e);
}
}
}
// Throw error, if any.
if (err != null)
throw err;
updateMetricsOnClose();
}
}
/**
* Flush remainder.
*
* @throws IOException If failed.
*/
private void flushRemainder() throws IOException {
try {
if (remainder != null) {
remainder = igfsCtx.data().storeDataBlocks(fileInfo, length() + space, null,
0, ByteBuffer.wrap(remainder, 0, remainderDataLen), true, streamRange, batch);
remainder = null;
remainderDataLen = 0;
}
}
catch (IgniteCheckedException e) {
throw new IOException("Failed to flush data (remainder) [path=" + path + ", space=" + space + ']', e);
}
}
/** {@inheritDoc} */
@Override protected void send(Object data, int writeLen) throws IOException {
assert Thread.holdsLock(mux);
assert data instanceof ByteBuffer || data instanceof DataInput;
try {
// Increment metrics.
bytes += writeLen;
space += writeLen;
int blockSize = fileInfo.blockSize();
// If data length is not enough to fill full block, fill the remainder and return.
if (remainderDataLen + writeLen < blockSize) {
if (remainder == null)
remainder = new byte[blockSize];
else if (remainder.length != blockSize) {
assert remainderDataLen == remainder.length;
byte[] allocated = new byte[blockSize];
U.arrayCopy(remainder, 0, allocated, 0, remainder.length);
remainder = allocated;
}
if (data instanceof ByteBuffer)
((ByteBuffer)data).get(remainder, remainderDataLen, writeLen);
else
((DataInput)data).readFully(remainder, remainderDataLen, writeLen);
remainderDataLen += writeLen;
}
else {
if (data instanceof ByteBuffer) {
remainder = igfsCtx.data().storeDataBlocks(fileInfo, length() + space, remainder,
remainderDataLen, (ByteBuffer)data, false, streamRange, batch);
}
else {
remainder = igfsCtx.data().storeDataBlocks(fileInfo, length() + space, remainder,
remainderDataLen, (DataInput)data, writeLen, false, streamRange, batch);
}
remainderDataLen = remainder == null ? 0 : remainder.length;
}
}
catch (IgniteCheckedException e) {
throw new IOException("Failed to store data into file: " + path, e);
}
}
/**
* Await acknowledgments.
*
* @throws IOException If failed.
*/
private void awaitAcks() throws IOException {
try {
igfsCtx.data().awaitAllAcksReceived(fileInfo.id());
}
catch (IgniteCheckedException e) {
throw new IOException("Failed to wait for flush acknowledge: " + fileInfo.id, e);
}
}
/**
* Gets initial affinity range. This range will have 0 length and will start from first
* non-occupied file block.
*
* @param fileInfo File info to build initial range for.
* @return Affinity range.
*/
private IgfsFileAffinityRange initialStreamRange(IgfsEntryInfo fileInfo) {
if (!igfsCtx.configuration().isFragmentizerEnabled())
return null;
if (!Boolean.parseBoolean(fileInfo.properties().get(IgfsUtils.PROP_PREFER_LOCAL_WRITES)))
return null;
int blockSize = fileInfo.blockSize();
// Find first non-occupied block offset.
long off = ((fileInfo.length() + blockSize - 1) / blockSize) * blockSize;
// Need to get last affinity key and reuse it if we are on the same node.
long lastBlockOff = off - fileInfo.blockSize();
if (lastBlockOff < 0)
lastBlockOff = 0;
IgfsFileMap map = fileInfo.fileMap();
IgniteUuid prevAffKey = map == null ? null : map.affinityKey(lastBlockOff, false);
IgniteUuid affKey = igfsCtx.data().nextAffinityKey(prevAffKey);
return affKey == null ? null : new IgfsFileAffinityRange(off, off, affKey);
}
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(IgfsOutputStreamImpl.class, this);
}
}