| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.internal.processors.igfs; |
| |
| import org.apache.ignite.IgniteCheckedException; |
| import org.apache.ignite.igfs.IgfsException; |
| import org.apache.ignite.igfs.IgfsMode; |
| import org.apache.ignite.igfs.IgfsPath; |
| import org.apache.ignite.igfs.IgfsPathNotFoundException; |
| import org.apache.ignite.internal.IgniteInternalFuture; |
| import org.apache.ignite.internal.util.future.GridFutureAdapter; |
| import org.apache.ignite.internal.util.typedef.internal.S; |
| import org.apache.ignite.internal.util.typedef.internal.U; |
| import org.apache.ignite.lang.IgniteUuid; |
| import org.jetbrains.annotations.Nullable; |
| |
| import java.io.DataInput; |
| import java.io.IOException; |
| import java.nio.ByteBuffer; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| |
| import static org.apache.ignite.igfs.IgfsMode.DUAL_SYNC; |
| import static org.apache.ignite.igfs.IgfsMode.PRIMARY; |
| import static org.apache.ignite.igfs.IgfsMode.PROXY; |
| |
| /** |
| * Output stream to store data into grid cache with separate blocks. |
| */ |
| class IgfsOutputStreamImpl extends IgfsOutputStreamAdapter { |
| /** Maximum number of blocks in buffer. */ |
| private static final int MAX_BLOCKS_CNT = 16; |
| |
| /** IGFS context. */ |
| private IgfsContext igfsCtx; |
| |
| /** Meta info manager. */ |
| private final IgfsMetaManager meta; |
| |
| /** Data manager. */ |
| private final IgfsDataManager data; |
| |
| /** File descriptor. */ |
| @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") |
| private IgfsEntryInfo fileInfo; |
| |
| /** Space in file to write data. */ |
| @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") |
| private long space; |
| |
| /** Intermediate remainder to keep data. */ |
| private byte[] remainder; |
| |
| /** Data length in remainder. */ |
| private int remainderDataLen; |
| |
| /** Write completion future. */ |
| private final IgniteInternalFuture<Boolean> writeCompletionFut; |
| |
| /** IGFS mode. */ |
| private final IgfsMode mode; |
| |
| /** File worker batch. */ |
| private final IgfsFileWorkerBatch batch; |
| |
| /** Ensures that onClose)_ routine is called no more than once. */ |
| private final AtomicBoolean onCloseGuard = new AtomicBoolean(); |
| |
| /** Local IGFS metrics. */ |
| private final IgfsLocalMetrics metrics; |
| |
| /** Affinity written by this output stream. */ |
| private IgfsFileAffinityRange streamRange; |
| |
| /** |
| * Constructs file output stream. |
| * |
| * @param igfsCtx IGFS context. |
| * @param path Path to stored file. |
| * @param fileInfo File info to write binary data to. |
| * @param bufSize The size of the buffer to be used. |
| * @param mode Grid IGFS mode. |
| * @param batch Optional secondary file system batch. |
| * @param metrics Local IGFS metrics. |
| */ |
| IgfsOutputStreamImpl(IgfsContext igfsCtx, IgfsPath path, IgfsEntryInfo fileInfo, int bufSize, IgfsMode mode, |
| @Nullable IgfsFileWorkerBatch batch, IgfsLocalMetrics metrics) { |
| super(path, optimizeBufferSize(bufSize, fileInfo)); |
| |
| assert fileInfo != null; |
| assert fileInfo.isFile() : "Unexpected file info: " + fileInfo; |
| assert mode != null && mode != PROXY; |
| assert mode == PRIMARY && batch == null || batch != null; |
| assert metrics != null; |
| |
| // File hasn't been locked. |
| if (fileInfo.lockId() == null) |
| throw new IgfsException("Failed to acquire file lock (concurrently modified?): " + path); |
| |
| assert !IgfsUtils.DELETE_LOCK_ID.equals(fileInfo.lockId()); |
| |
| this.igfsCtx = igfsCtx; |
| meta = igfsCtx.meta(); |
| data = igfsCtx.data(); |
| |
| this.fileInfo = fileInfo; |
| this.mode = mode; |
| this.batch = batch; |
| this.metrics = metrics; |
| |
| streamRange = initialStreamRange(fileInfo); |
| |
| writeCompletionFut = data.writeStart(fileInfo); |
| } |
| |
| /** |
| * Optimize buffer size. |
| * |
| * @param bufSize Requested buffer size. |
| * @param fileInfo File info. |
| * @return Optimized buffer size. |
| */ |
| @SuppressWarnings("IfMayBeConditional") |
| private static int optimizeBufferSize(int bufSize, IgfsEntryInfo fileInfo) { |
| assert bufSize > 0; |
| |
| if (fileInfo == null) |
| return bufSize; |
| |
| int blockSize = fileInfo.blockSize(); |
| |
| if (blockSize <= 0) |
| return bufSize; |
| |
| if (bufSize <= blockSize) |
| // Optimize minimum buffer size to be equal file's block size. |
| return blockSize; |
| |
| int maxBufSize = blockSize * MAX_BLOCKS_CNT; |
| |
| if (bufSize > maxBufSize) |
| // There is no profit or optimization from larger buffers. |
| return maxBufSize; |
| |
| if (fileInfo.length() == 0) |
| // Make buffer size multiple of block size (optimized for new files). |
| return bufSize / blockSize * blockSize; |
| |
| return bufSize; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected synchronized void storeDataBlock(ByteBuffer block) throws IgniteCheckedException, IOException { |
| int writeLen = block.remaining(); |
| |
| preStoreDataBlocks(null, writeLen); |
| |
| int blockSize = fileInfo.blockSize(); |
| |
| // If data length is not enough to fill full block, fill the remainder and return. |
| if (remainderDataLen + writeLen < blockSize) { |
| if (remainder == null) |
| remainder = new byte[blockSize]; |
| else if (remainder.length != blockSize) { |
| assert remainderDataLen == remainder.length; |
| |
| byte[] allocated = new byte[blockSize]; |
| |
| U.arrayCopy(remainder, 0, allocated, 0, remainder.length); |
| |
| remainder = allocated; |
| } |
| |
| block.get(remainder, remainderDataLen, writeLen); |
| |
| remainderDataLen += writeLen; |
| } |
| else { |
| remainder = data.storeDataBlocks(fileInfo, fileInfo.length() + space, remainder, remainderDataLen, block, |
| false, streamRange, batch); |
| |
| remainderDataLen = remainder == null ? 0 : remainder.length; |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected synchronized void storeDataBlocks(DataInput in, int len) throws IgniteCheckedException, IOException { |
| preStoreDataBlocks(in, len); |
| |
| int blockSize = fileInfo.blockSize(); |
| |
| // If data length is not enough to fill full block, fill the remainder and return. |
| if (remainderDataLen + len < blockSize) { |
| if (remainder == null) |
| remainder = new byte[blockSize]; |
| else if (remainder.length != blockSize) { |
| assert remainderDataLen == remainder.length; |
| |
| byte[] allocated = new byte[blockSize]; |
| |
| U.arrayCopy(remainder, 0, allocated, 0, remainder.length); |
| |
| remainder = allocated; |
| } |
| |
| in.readFully(remainder, remainderDataLen, len); |
| |
| remainderDataLen += len; |
| } |
| else { |
| remainder = data.storeDataBlocks(fileInfo, fileInfo.length() + space, remainder, remainderDataLen, in, len, |
| false, streamRange, batch); |
| |
| remainderDataLen = remainder == null ? 0 : remainder.length; |
| } |
| } |
| |
| /** |
| * Initializes data loader if it was not initialized yet and updates written space. |
| * |
| * @param len Data length to be written. |
| */ |
| private void preStoreDataBlocks(@Nullable DataInput in, int len) throws IgniteCheckedException, IOException { |
| // Check if any exception happened while writing data. |
| if (writeCompletionFut.isDone()) { |
| assert ((GridFutureAdapter)writeCompletionFut).isFailed(); |
| |
| if (in != null) |
| in.skipBytes(len); |
| |
| writeCompletionFut.get(); |
| } |
| |
| bytes += len; |
| space += len; |
| } |
| |
| /** |
| * Flushes this output stream and forces any buffered output bytes to be written out. |
| * |
| * @exception IOException if an I/O error occurs. |
| */ |
| @Override public synchronized void flush() throws IOException { |
| boolean exists; |
| |
| try { |
| exists = meta.exists(fileInfo.id()); |
| } |
| catch (IgniteCheckedException e) { |
| throw new IOException("File to read file metadata: " + path, e); |
| } |
| |
| if (!exists) { |
| onClose(true); |
| |
| throw new IOException("File was concurrently deleted: " + path); |
| } |
| |
| super.flush(); |
| |
| try { |
| if (remainder != null) { |
| data.storeDataBlocks(fileInfo, fileInfo.length() + space, null, 0, |
| ByteBuffer.wrap(remainder, 0, remainderDataLen), true, streamRange, batch); |
| |
| remainder = null; |
| remainderDataLen = 0; |
| } |
| |
| if (space > 0) { |
| data.awaitAllAcksReceived(fileInfo.id()); |
| |
| IgfsEntryInfo fileInfo0 = meta.reserveSpace(path, fileInfo.id(), space, streamRange); |
| |
| if (fileInfo0 == null) |
| throw new IOException("File was concurrently deleted: " + path); |
| else |
| fileInfo = fileInfo0; |
| |
| streamRange = initialStreamRange(fileInfo); |
| |
| space = 0; |
| } |
| } |
| catch (IgniteCheckedException e) { |
| throw new IOException("Failed to flush data [path=" + path + ", space=" + space + ']', e); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected void onClose() throws IOException { |
| onClose(false); |
| } |
| |
| /** |
| * Close callback. It will be called only once in synchronized section. |
| * |
| * @param deleted Whether we already know that the file was deleted. |
| * @throws IOException If failed. |
| */ |
| private void onClose(boolean deleted) throws IOException { |
| assert Thread.holdsLock(this); |
| |
| if (onCloseGuard.compareAndSet(false, true)) { |
| // Notify backing secondary file system batch to finish. |
| if (mode != PRIMARY) { |
| assert batch != null; |
| |
| batch.finish(); |
| } |
| |
| // Ensure file existence. |
| boolean exists; |
| |
| try { |
| exists = !deleted && meta.exists(fileInfo.id()); |
| } |
| catch (IgniteCheckedException e) { |
| throw new IOException("File to read file metadata: " + path, e); |
| } |
| |
| if (exists) { |
| IOException err = null; |
| |
| try { |
| data.writeClose(fileInfo); |
| |
| writeCompletionFut.get(); |
| } |
| catch (IgniteCheckedException e) { |
| err = new IOException("Failed to close stream [path=" + path + ", fileInfo=" + fileInfo + ']', e); |
| } |
| |
| metrics.addWrittenBytesTime(bytes, time); |
| |
| // Await secondary file system processing to finish. |
| if (mode == DUAL_SYNC) { |
| try { |
| batch.await(); |
| } |
| catch (IgniteCheckedException e) { |
| if (err == null) |
| err = new IOException("Failed to close secondary file system stream [path=" + path + |
| ", fileInfo=" + fileInfo + ']', e); |
| } |
| } |
| |
| long modificationTime = System.currentTimeMillis(); |
| |
| try { |
| meta.unlock(fileInfo, modificationTime); |
| } |
| catch (IgfsPathNotFoundException ignore) { |
| data.delete(fileInfo); // Safety to ensure that all data blocks are deleted. |
| |
| throw new IOException("File was concurrently deleted: " + path); |
| } |
| catch (IgniteCheckedException e) { |
| throw new IOException("File to read file metadata: " + path, e); |
| } |
| |
| if (err != null) |
| throw err; |
| } |
| else { |
| try { |
| if (mode == DUAL_SYNC) |
| batch.await(); |
| } |
| catch (IgniteCheckedException e) { |
| throw new IOException("Failed to close secondary file system stream [path=" + path + |
| ", fileInfo=" + fileInfo + ']', e); |
| } |
| finally { |
| data.delete(fileInfo); |
| } |
| } |
| } |
| } |
| |
| /** |
| * Gets initial affinity range. This range will have 0 length and will start from first |
| * non-occupied file block. |
| * |
| * @param fileInfo File info to build initial range for. |
| * @return Affinity range. |
| */ |
| private IgfsFileAffinityRange initialStreamRange(IgfsEntryInfo fileInfo) { |
| if (!igfsCtx.configuration().isFragmentizerEnabled()) |
| return null; |
| |
| if (!Boolean.parseBoolean(fileInfo.properties().get(IgfsUtils.PROP_PREFER_LOCAL_WRITES))) |
| return null; |
| |
| int blockSize = fileInfo.blockSize(); |
| |
| // Find first non-occupied block offset. |
| long off = ((fileInfo.length() + blockSize - 1) / blockSize) * blockSize; |
| |
| // Need to get last affinity key and reuse it if we are on the same node. |
| long lastBlockOff = off - fileInfo.blockSize(); |
| |
| if (lastBlockOff < 0) |
| lastBlockOff = 0; |
| |
| IgfsFileMap map = fileInfo.fileMap(); |
| |
| IgniteUuid prevAffKey = map == null ? null : map.affinityKey(lastBlockOff, false); |
| |
| IgniteUuid affKey = data.nextAffinityKey(prevAffKey); |
| |
| return affKey == null ? null : new IgfsFileAffinityRange(off, off, affKey); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public String toString() { |
| return S.toString(IgfsOutputStreamImpl.class, this); |
| } |
| } |