| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.internal.processors.igfs; |
| |
| import java.io.DataInput; |
| import java.io.Externalizable; |
| import java.io.IOException; |
| import java.io.ObjectInput; |
| import java.io.ObjectOutput; |
| import java.nio.ByteBuffer; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.Deque; |
| import java.util.Iterator; |
| import java.util.LinkedList; |
| import java.util.Map; |
| import java.util.UUID; |
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.Executor; |
| import java.util.concurrent.LinkedBlockingQueue; |
| import java.util.concurrent.atomic.AtomicLong; |
| import java.util.concurrent.locks.Condition; |
| import java.util.concurrent.locks.Lock; |
| import java.util.concurrent.locks.ReentrantLock; |
| import javax.cache.processor.EntryProcessor; |
| import javax.cache.processor.MutableEntry; |
| import org.apache.ignite.IgniteCheckedException; |
| import org.apache.ignite.IgniteDataStreamer; |
| import org.apache.ignite.IgniteException; |
| import org.apache.ignite.IgniteInterruptedException; |
| import org.apache.ignite.IgniteLogger; |
| import org.apache.ignite.cache.affinity.AffinityKeyMapper; |
| import org.apache.ignite.cluster.ClusterNode; |
| import org.apache.ignite.configuration.FileSystemConfiguration; |
| import org.apache.ignite.events.DiscoveryEvent; |
| import org.apache.ignite.events.Event; |
| import org.apache.ignite.igfs.IgfsBlockLocation; |
| import org.apache.ignite.igfs.IgfsException; |
| import org.apache.ignite.igfs.IgfsGroupDataBlocksKeyMapper; |
| import org.apache.ignite.igfs.IgfsOutOfSpaceException; |
| import org.apache.ignite.igfs.IgfsPath; |
| import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystemPositionedReadable; |
| import org.apache.ignite.internal.IgniteInternalFuture; |
| import org.apache.ignite.internal.IgniteInterruptedCheckedException; |
| import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; |
| import org.apache.ignite.internal.managers.communication.GridIoPolicy; |
| import org.apache.ignite.internal.managers.communication.GridMessageListener; |
| import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; |
| import org.apache.ignite.internal.processors.cache.IgniteInternalCache; |
| import org.apache.ignite.internal.processors.cache.persistence.DataRegion; |
| import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal; |
| import org.apache.ignite.internal.processors.datastreamer.DataStreamerCacheUpdaters; |
| import org.apache.ignite.internal.processors.igfs.data.IgfsDataPutProcessor; |
| import org.apache.ignite.internal.processors.task.GridInternal; |
| import org.apache.ignite.internal.util.future.GridFinishedFuture; |
| import org.apache.ignite.internal.util.future.GridFutureAdapter; |
| import org.apache.ignite.internal.util.typedef.CI1; |
| import org.apache.ignite.internal.util.typedef.CX1; |
| import org.apache.ignite.internal.util.typedef.F; |
| import org.apache.ignite.internal.util.typedef.internal.S; |
| import org.apache.ignite.internal.util.typedef.internal.U; |
| import org.apache.ignite.internal.util.worker.GridWorker; |
| import org.apache.ignite.lang.IgniteBiTuple; |
| import org.apache.ignite.lang.IgniteUuid; |
| import org.jetbrains.annotations.Nullable; |
| |
| import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; |
| import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; |
| import static org.apache.ignite.internal.GridTopic.TOPIC_IGFS; |
| import static org.apache.ignite.internal.managers.communication.GridIoPolicy.IGFS_POOL; |
| import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; |
| import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; |
| |
| /** |
 * Cache-based file data container.
| */ |
| public class IgfsDataManager extends IgfsManager { |
| /** Data internal cache. */ |
| private IgniteInternalCache<IgfsBlockKey, byte[]> dataCachePrj; |
| |
| /** Data cache. */ |
| private IgniteInternalCache<Object, Object> dataCache; |
| |
| /** */ |
| private CountDownLatch dataCacheStartLatch; |
| |
| /** Group block size. */ |
| private long grpBlockSize; |
| |
| /** Group size. */ |
| private int grpSize; |
| |
| /** Byte buffer writer. */ |
| private ByteBufferBlocksWriter byteBufWriter = new ByteBufferBlocksWriter(); |
| |
| /** Data input writer. */ |
| private DataInputBlocksWriter dataInputWriter = new DataInputBlocksWriter(); |
| |
| /** Pending writes future. */ |
| private ConcurrentMap<IgniteUuid, WriteCompletionFuture> pendingWrites = new ConcurrentHashMap<>(); |
| |
| /** Affinity key generator. */ |
| private AtomicLong affKeyGen = new AtomicLong(); |
| |
| /** Request ID counter for write messages. */ |
| private AtomicLong reqIdCtr = new AtomicLong(); |
| |
| /** IGFS communication topic. */ |
| private Object topic; |
| |
| /** Async file delete worker. */ |
| private AsyncDeleteWorker delWorker; |
| |
    /** Data cache name. */
| private String dataCacheName; |
| |
| /** On-going remote reads futures. */ |
| private final ConcurrentHashMap<IgfsBlockKey, IgniteInternalFuture<byte[]>> rmtReadFuts = |
| new ConcurrentHashMap<>(); |
| |
| /** |
| * |
| */ |
| void awaitInit() { |
| try { |
| dataCacheStartLatch.await(); |
| } |
| catch (InterruptedException e) { |
| throw new IgniteInterruptedException(e); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected void start0() throws IgniteCheckedException { |
| dataCacheStartLatch = new CountDownLatch(1); |
| |
| String igfsName = igfsCtx.configuration().getName(); |
| |
| topic = F.isEmpty(igfsName) ? TOPIC_IGFS : TOPIC_IGFS.topic(igfsName); |
| |
| igfsCtx.kernalContext().io().addMessageListener(topic, new GridMessageListener() { |
| @Override public void onMessage(UUID nodeId, Object msg, byte plc) { |
| if (msg instanceof IgfsBlocksMessage) |
| processBlocksMessage(nodeId, (IgfsBlocksMessage)msg); |
| else if (msg instanceof IgfsAckMessage) |
| processAckMessage(nodeId, (IgfsAckMessage)msg); |
| } |
| }); |
| |
| igfsCtx.kernalContext().event().addLocalEventListener(new GridLocalEventListener() { |
| @Override public void onEvent(Event evt) { |
| assert evt.type() == EVT_NODE_FAILED || evt.type() == EVT_NODE_LEFT; |
| |
| DiscoveryEvent discoEvt = (DiscoveryEvent)evt; |
| |
| if (igfsCtx.igfsNode(discoEvt.eventNode())) { |
| for (WriteCompletionFuture future : pendingWrites.values()) { |
| future.onError(discoEvt.eventNode().id(), |
| new ClusterTopologyCheckedException("Node left grid before write completed: " + evt.node().id())); |
| } |
| } |
| } |
| }, EVT_NODE_LEFT, EVT_NODE_FAILED); |
| |
| delWorker = new AsyncDeleteWorker(igfsCtx.kernalContext().igniteInstanceName(), |
| "igfs-" + igfsName + "-delete-worker", log); |
| |
| dataCacheName = igfsCtx.configuration().getDataCacheConfiguration().getName(); |
| } |
| |
| /** {@inheritDoc} */ |
| @SuppressWarnings("unchecked") |
| @Override protected void onKernalStart0() throws IgniteCheckedException { |
| dataCachePrj = igfsCtx.kernalContext().cache().getOrStartCache(dataCacheName); |
| |
| assert dataCachePrj != null; |
| |
| dataCache = (IgniteInternalCache)dataCachePrj; |
| |
| AffinityKeyMapper mapper = igfsCtx.kernalContext().cache() |
| .internalCache(dataCacheName).configuration().getAffinityMapper(); |
| |
| grpSize = mapper instanceof IgfsGroupDataBlocksKeyMapper ? |
| ((IgfsGroupDataBlocksKeyMapper)mapper).getGroupSize() : 1; |
| |
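        // Illustrative sizing (assumed example values, not defaults): a 64 KB block size
        // with a group size of 256 yields a 16 MB group block.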
| grpBlockSize = igfsCtx.configuration().getBlockSize() * (long)grpSize; |
| |
| assert grpBlockSize != 0; |
| |
| igfsCtx.kernalContext().cache().internalCache(dataCacheName).preloader() |
| .startFuture().listen(new CI1<IgniteInternalFuture<Object>>() { |
| @Override public void apply(IgniteInternalFuture<Object> f) { |
| dataCacheStartLatch.countDown(); |
| } |
| }); |
| |
| new Thread(delWorker).start(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected void onKernalStop0(boolean cancel) { |
| if (cancel) |
| delWorker.cancel(); |
| else |
| delWorker.stop(); |
| |
| try { |
            // Always wait for thread exit.
| U.join(delWorker); |
| } |
| catch (IgniteInterruptedCheckedException e) { |
| log.warning("Got interrupter while waiting for delete worker to stop (will continue stopping).", e); |
| } |
| } |
| |
| /** |
| * @return Number of bytes used to store files. |
| */ |
| public long spaceSize() { |
| return dataCachePrj.igfsDataSpaceUsed(); |
| } |
| |
| /** |
| * @return Maximum number of bytes for IGFS data cache. |
| */ |
| public long maxSpaceSize() { |
| DataRegion plc = dataCachePrj.context().dataRegion(); |
| |
| long size = plc != null ? plc.config().getMaxSize() : 0; |
| |
        return (size <= 0) ? 0 : size;
| } |
| |
| /** |
| * Generates next affinity key for local node based on current topology. If previous affinity key maps |
| * on local node, return previous affinity key to prevent unnecessary file map growth. |
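     * <p>
     * For illustration, a hypothetical block allocator might call:
     * <pre>{@code
     * IgniteUuid affKey = nextAffinityKey(null);  // Key for the first block.
     * IgniteUuid next = nextAffinityKey(affKey);  // Returns affKey while it still maps locally.
     * }</pre>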
| * |
| * @param prevAffKey Affinity key of previous block. |
| * @return Affinity key. |
| */ |
| @SuppressWarnings("ConstantConditions") |
| public IgniteUuid nextAffinityKey(@Nullable IgniteUuid prevAffKey) { |
| // Do not generate affinity key for non-affinity nodes. |
| if (!dataCache.context().affinityNode()) |
| return null; |
| |
| UUID nodeId = igfsCtx.kernalContext().localNodeId(); |
| |
| if (prevAffKey != null && dataCache.affinity().mapKeyToNode(prevAffKey).isLocal()) |
| return prevAffKey; |
| |
| while (true) { |
| IgniteUuid key = new IgniteUuid(nodeId, affKeyGen.getAndIncrement()); |
| |
| if (dataCache.affinity().mapKeyToNode(key).isLocal()) |
| return key; |
| } |
| } |
| |
| /** |
| * Maps affinity key to node. |
| * |
| * @param affinityKey Affinity key to map. |
| * @return Primary node for this key. |
| */ |
| public ClusterNode affinityNode(Object affinityKey) { |
| return dataCache.affinity().mapKeyToNode(affinityKey); |
| } |
| |
| /** |
     * Maps affinity key to primary and backup nodes.
     *
     * @param affinityKey Affinity key to map.
     * @return Primary and backup nodes for this key.
| */ |
| public Collection<ClusterNode> affinityNodes(Object affinityKey) { |
| return dataCache.affinity().mapKeyToPrimaryAndBackups(affinityKey); |
| } |
| |
| /** |
| * Creates new instance of explicit data streamer. |
| * |
| * @return New instance of data streamer. |
| */ |
| private IgniteDataStreamer<IgfsBlockKey, byte[]> dataStreamer() { |
| IgniteDataStreamer<IgfsBlockKey, byte[]> ldr = |
| igfsCtx.kernalContext().<IgfsBlockKey, byte[]>dataStream().dataStreamer(dataCachePrj.name()); |
| |
| FileSystemConfiguration cfg = igfsCtx.configuration(); |
| |
| if (cfg.getPerNodeBatchSize() > 0) |
| ldr.perNodeBufferSize(cfg.getPerNodeBatchSize()); |
| |
| if (cfg.getPerNodeParallelBatchCount() > 0) |
| ldr.perNodeParallelOperations(cfg.getPerNodeParallelBatchCount()); |
| |
| ldr.receiver(DataStreamerCacheUpdaters.<IgfsBlockKey, byte[]>batchedSorted()); |
| |
| return ldr; |
| } |
| |
| /** |
| * Get data block for specified file ID and block index. |
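     * <p>
     * A minimal usage sketch (assuming {@code info} and {@code path} describe an existing file and
     * no secondary file system is configured):
     * <pre>{@code
     * byte[] block0 = dataBlock(info, path, 0, null).get(); // First block, or null if absent.
     * }</pre>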
| * |
| * @param fileInfo File info. |
| * @param path Path reading from. |
| * @param blockIdx Block index. |
| * @param secReader Optional secondary file system reader. |
| * @return Requested data block or {@code null} if nothing found. |
| * @throws IgniteCheckedException If failed. |
| */ |
| @Nullable public IgniteInternalFuture<byte[]> dataBlock(final IgfsEntryInfo fileInfo, final IgfsPath path, |
| final long blockIdx, @Nullable final IgfsSecondaryFileSystemPositionedReadable secReader) |
| throws IgniteCheckedException { |
| assert fileInfo != null; |
| assert blockIdx >= 0; |
| |
| // Schedule block request BEFORE prefetch requests. |
| final IgfsBlockKey key = blockKey(blockIdx, fileInfo); |
| |
        if (log.isDebugEnabled() &&
            !dataCache.affinity().isPrimaryOrBackup(igfsCtx.kernalContext().discovery().localNode(), key)) {
| log.debug("Reading non-local data block [path=" + path + ", fileInfo=" + fileInfo + |
| ", blockIdx=" + blockIdx + ']'); |
| } |
| |
| IgniteInternalFuture<byte[]> fut = dataCachePrj.getAsync(key); |
| |
| if (secReader != null) { |
| Executor exec = igfsCtx.kernalContext().pools().poolForPolicy(GridIoPolicy.IGFS_POOL); |
| |
| fut = fut.chain(new CX1<IgniteInternalFuture<byte[]>, byte[]>() { |
| @Override public byte[] applyx(IgniteInternalFuture<byte[]> fut) throws IgniteCheckedException { |
| byte[] res = fut.get(); |
| |
| if (res == null) { |
| GridFutureAdapter<byte[]> rmtReadFut = new GridFutureAdapter<>(); |
| |
| IgniteInternalFuture<byte[]> oldRmtReadFut = rmtReadFuts.putIfAbsent(key, rmtReadFut); |
| |
| if (oldRmtReadFut == null) { |
| try { |
| res = secondaryDataBlock(path, blockIdx, secReader, fileInfo.blockSize()); |
| |
| rmtReadFut.onDone(res); |
| |
| putBlock(fileInfo.blockSize(), key, res); |
| } |
| catch (IgniteCheckedException e) { |
| rmtReadFut.onDone(e); |
| |
| throw e; |
| } |
| finally { |
| boolean rmv = rmtReadFuts.remove(key, rmtReadFut); |
| |
| assert rmv; |
| } |
| } |
| else { |
                            // Wait for existing future to finish and get its result.
| res = oldRmtReadFut.get(); |
| |
| igfsCtx.metrics().addReadBlocks(1, 0); |
| } |
| } |
| else |
| igfsCtx.metrics().addReadBlocks(1, 0); |
| |
| return res; |
| } |
| }, exec); |
| } |
| else |
| igfsCtx.metrics().addReadBlocks(1, 0); |
| |
| return fut; |
| } |
| |
| /** |
| * Get data block for specified block index from secondary reader. |
| * |
| * @param path Path reading from. |
| * @param blockIdx Block index. |
| * @param secReader Optional secondary file system reader. |
| * @param blockSize Block size. |
| * @return Requested data block or {@code null} if nothing found. |
| * @throws IgniteCheckedException If failed. |
| */ |
| @Nullable public byte[] secondaryDataBlock(IgfsPath path, long blockIdx, |
| IgfsSecondaryFileSystemPositionedReadable secReader, int blockSize) throws IgniteCheckedException { |
| if (log.isDebugEnabled()) |
| log.debug("Reading non-local data block in the secondary file system [path=" + |
| path + ", blockIdx=" + blockIdx + ']'); |
| |
        long pos = blockIdx * blockSize; // Calculate read position in the secondary file system.
| |
| byte[] res = new byte[blockSize]; |
| |
| int read = 0; |
| |
| try { |
| int r; |
| |
| // Delegate to the secondary file system. |
| while (read < blockSize) { |
| r = secReader.read(pos + read, res, read, blockSize - read); |
| |
| if (r < 0) |
| break; |
| |
| read += r; |
| } |
| } |
| catch (IOException e) { |
| throw new IgniteCheckedException("Failed to read data due to secondary file system " + |
| "exception: " + e.getMessage(), e); |
| } |
| |
| // If we did not read full block at the end of the file - trim it. |
| if (read != blockSize) |
| res = Arrays.copyOf(res, read); |
| |
| igfsCtx.metrics().addReadBlocks(1, 1); |
| |
| return res; |
| } |
| |
| /** |
| * Stores the given block in data cache. |
| * |
| * @param blockSize The size of the block. |
| * @param key The data cache key of the block. |
| * @param data The new value of the block. |
| * @throws IgniteCheckedException If failed. |
| */ |
| private void putBlock(int blockSize, IgfsBlockKey key, byte[] data) throws IgniteCheckedException { |
| if (data.length < blockSize) |
            // Partial (incomplete) block.
| dataCachePrj.invoke(key, new IgfsDataPutProcessor(data)); |
| else { |
            // Whole block.
| assert data.length == blockSize; |
| |
| dataCachePrj.put(key, data); |
| } |
| } |
| |
| |
| /** |
| * Registers write future in igfs data manager. |
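     * <p>
     * A sketch of the intended write protocol (illustrative usage of methods in this class):
     * <pre>{@code
     * IgniteInternalFuture<Boolean> fut = writeStart(fileId); // Register completion future.
     * // ... one or more storeDataBlocks(...) calls ...
     * writeClose(fileId);                                     // No further writes will follow.
     * fut.get();                                              // Completes when all acks arrive.
     * }</pre>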
| * |
| * @param fileId File ID. |
| * @return Future that will be completed when all ack messages are received or when write failed. |
| */ |
| public IgniteInternalFuture<Boolean> writeStart(IgniteUuid fileId) { |
| WriteCompletionFuture fut = new WriteCompletionFuture(fileId); |
| |
| WriteCompletionFuture oldFut = pendingWrites.putIfAbsent(fileId, fut); |
| |
| assert oldFut == null : "Opened write that is being concurrently written: " + fileId; |
| |
| if (log.isDebugEnabled()) |
| log.debug("Registered write completion future for file output stream [fileId=" + fileId + |
| ", fut=" + fut + ']'); |
| |
| return fut; |
| } |
| |
| /** |
| * Notifies data manager that no further writes will be performed on stream. |
| * |
| * @param fileId File ID. |
| * @throws IgniteCheckedException If failed. |
| */ |
| public void writeClose(IgniteUuid fileId) throws IgniteCheckedException { |
| WriteCompletionFuture fut = pendingWrites.get(fileId); |
| |
| if (fut != null) |
| fut.markWaitingLastAck(); |
| } |
| |
| /** |
| * Store data blocks in file.<br/> |
| * Note! If file concurrently deleted we'll get lost blocks. |
| * |
| * @param fileInfo File info. |
| * @param reservedLen Reserved length. |
| * @param remainder Remainder. |
| * @param remainderLen Remainder length. |
| * @param data Data to store. |
| * @param flush Flush flag. |
| * @param affinityRange Affinity range to update if file write can be colocated. |
| * @param batch Optional secondary file system worker batch. |
| * |
| * @return Remainder if data did not fill full block. |
| * @throws IgniteCheckedException If failed. |
| */ |
| @Nullable public byte[] storeDataBlocks( |
| IgfsEntryInfo fileInfo, |
| long reservedLen, |
| @Nullable byte[] remainder, |
| int remainderLen, |
| ByteBuffer data, |
| boolean flush, |
| IgfsFileAffinityRange affinityRange, |
| @Nullable IgfsFileWorkerBatch batch |
| ) throws IgniteCheckedException { |
| //assert validTxState(any); // Allow this method call for any transaction state. |
| |
| return byteBufWriter.storeDataBlocks(fileInfo, reservedLen, remainder, remainderLen, data, data.remaining(), |
| flush, affinityRange, batch); |
| } |
| |
| /** |
| * Store data blocks in file.<br/> |
| * Note! If file concurrently deleted we'll got lost blocks. |
| * |
| * @param fileInfo File info. |
| * @param reservedLen Reserved length. |
| * @param remainder Remainder. |
| * @param remainderLen Remainder length. |
| * @param in Data to store. |
| * @param len Data length to store. |
| * @param flush Flush flag. |
     * @param affinityRange File affinity range to update if file can be colocated.
     * @param batch Optional secondary file system worker batch.
     * @return Remainder of data that did not fit the block if {@code flush} flag is {@code false}.
     * @throws IgniteCheckedException If failed.
     * @throws IOException If store failed.
| */ |
| @Nullable public byte[] storeDataBlocks( |
| IgfsEntryInfo fileInfo, |
| long reservedLen, |
| @Nullable byte[] remainder, |
| int remainderLen, |
| DataInput in, |
| int len, |
| boolean flush, |
| IgfsFileAffinityRange affinityRange, |
| @Nullable IgfsFileWorkerBatch batch |
| ) throws IgniteCheckedException, IOException { |
| //assert validTxState(any); // Allow this method call for any transaction state. |
| |
| return dataInputWriter.storeDataBlocks(fileInfo, reservedLen, remainder, remainderLen, in, len, flush, |
| affinityRange, batch); |
| } |
| |
| /** |
| * Delete file's data from data cache. |
| * |
| * @param fileInfo File details to remove data for. |
| * @return Delete future that will be completed when file is actually erased. |
| */ |
| public IgniteInternalFuture<Object> delete(IgfsEntryInfo fileInfo) { |
| if (!fileInfo.isFile()) { |
| if (log.isDebugEnabled()) |
| log.debug("Cannot delete content of not-data file: " + fileInfo); |
| |
| return new GridFinishedFuture<>(); |
| } |
| else |
| return delWorker.deleteAsync(fileInfo); |
| } |
| |
| /** |
| * @param blockIdx Block index. |
| * @param fileInfo File info. |
| * @return Block key. |
| */ |
| public IgfsBlockKey blockKey(long blockIdx, IgfsEntryInfo fileInfo) { |
| if (fileInfo.affinityKey() != null) |
| return new IgfsBlockKey(fileInfo.id(), fileInfo.affinityKey(), fileInfo.evictExclude(), blockIdx); |
| |
| if (fileInfo.fileMap() != null) { |
| IgniteUuid affKey = fileInfo.fileMap().affinityKey(blockIdx * fileInfo.blockSize(), false); |
| |
| return new IgfsBlockKey(fileInfo.id(), affKey, fileInfo.evictExclude(), blockIdx); |
| } |
| |
| return new IgfsBlockKey(fileInfo.id(), null, fileInfo.evictExclude(), blockIdx); |
| } |
| |
| /** |
| * Tries to remove blocks affected by fragmentizer. If {@code cleanNonColocated} is {@code true}, will remove |
| * non-colocated blocks as well. |
| * |
| * @param fileInfo File info to clean up. |
| * @param range Range to clean up. |
| * @param cleanNonColocated {@code True} if all blocks should be cleaned. |
| */ |
| public void cleanBlocks(IgfsEntryInfo fileInfo, IgfsFileAffinityRange range, boolean cleanNonColocated) { |
| long startIdx = range.startOffset() / fileInfo.blockSize(); |
| |
| long endIdx = range.endOffset() / fileInfo.blockSize(); |
| |
| if (log.isDebugEnabled()) |
| log.debug("Cleaning blocks [fileInfo=" + fileInfo + ", range=" + range + |
| ", cleanNonColocated=" + cleanNonColocated + ", startIdx=" + startIdx + ", endIdx=" + endIdx + ']'); |
| |
| try { |
| try (IgniteDataStreamer<IgfsBlockKey, byte[]> ldr = dataStreamer()) { |
| for (long idx = startIdx; idx <= endIdx; idx++) { |
| ldr.removeData(new IgfsBlockKey(fileInfo.id(), range.affinityKey(), fileInfo.evictExclude(), |
| idx)); |
| |
| if (cleanNonColocated) |
| ldr.removeData(new IgfsBlockKey(fileInfo.id(), null, fileInfo.evictExclude(), idx)); |
| } |
| } |
| } |
| catch (IgniteException e) { |
| log.error("Failed to clean up file range [fileInfo=" + fileInfo + ", range=" + range + ']', e); |
| } |
| } |
| |
| /** |
| * Moves all collocated blocks in range to non-colocated keys. |
| * @param fileInfo File info to move data for. |
| * @param range Range to move. |
| */ |
| public void spreadBlocks(IgfsEntryInfo fileInfo, IgfsFileAffinityRange range) { |
| long startIdx = range.startOffset() / fileInfo.blockSize(); |
| |
| long endIdx = range.endOffset() / fileInfo.blockSize(); |
| |
| try { |
| try (IgniteDataStreamer<IgfsBlockKey, byte[]> ldr = dataStreamer()) { |
| long bytesProcessed = 0; |
| |
| for (long idx = startIdx; idx <= endIdx; idx++) { |
| IgfsBlockKey colocatedKey = new IgfsBlockKey(fileInfo.id(), range.affinityKey(), |
| fileInfo.evictExclude(), idx); |
| |
| IgfsBlockKey key = new IgfsBlockKey(fileInfo.id(), null, fileInfo.evictExclude(), idx); |
| |
                    // Most of the time this should be a local get.
| byte[] block = dataCachePrj.get(colocatedKey); |
| |
| if (block != null) { |
| // Need to check if block is partially written. |
| // If so, must update it in pessimistic transaction. |
| if (block.length != fileInfo.blockSize()) { |
| try (GridNearTxLocal tx = dataCachePrj.txStartEx(PESSIMISTIC, REPEATABLE_READ)) { |
| Map<IgfsBlockKey, byte[]> vals = dataCachePrj.getAll(F.asList(colocatedKey, key)); |
| |
| byte[] val = vals.get(colocatedKey); |
| |
| if (val != null) { |
| putBlock(fileInfo.blockSize(), key, val); |
| |
| tx.commit(); |
| } |
| else { |
| // File is being concurrently deleted. |
| if (log.isDebugEnabled()) |
| log.debug("Failed to find colocated file block for spread (will ignore) " + |
| "[fileInfo=" + fileInfo + ", range=" + range + ", startIdx=" + startIdx + |
| ", endIdx=" + endIdx + ", idx=" + idx + ']'); |
| } |
| } |
| } |
| else |
| ldr.addData(key, block); |
| |
| bytesProcessed += block.length; |
| |
| if (bytesProcessed >= igfsCtx.configuration().getFragmentizerThrottlingBlockLength()) { |
| ldr.flush(); |
| |
| bytesProcessed = 0; |
| |
| U.sleep(igfsCtx.configuration().getFragmentizerThrottlingDelay()); |
| } |
| } |
| else if (log.isDebugEnabled()) |
| log.debug("Failed to find colocated file block for spread (will ignore) " + |
| "[fileInfo=" + fileInfo + ", range=" + range + ", startIdx=" + startIdx + |
| ", endIdx=" + endIdx + ", idx=" + idx + ']'); |
| } |
| } |
| } |
| catch (IgniteCheckedException e) { |
| log.error("Failed to clean up file range [fileInfo=" + fileInfo + ", range=" + range + ']', e); |
| } |
| } |
| |
| /** |
| * Resolve affinity nodes for specified part of file. |
| * |
| * @param info File info to resolve affinity nodes for. |
| * @param start Start position in the file. |
| * @param len File part length to get affinity for. |
| * @return Affinity blocks locations. |
| * @throws IgniteCheckedException If failed. |
| */ |
| public Collection<IgfsBlockLocation> affinity(IgfsEntryInfo info, long start, long len) |
| throws IgniteCheckedException { |
| return affinity(info, start, len, 0); |
| } |
| |
| /** |
| * Resolve affinity nodes for specified part of file. |
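     * <p>
     * For example, a sketch resolving the nodes hosting the first two blocks of a file
     * (illustrative values):
     * <pre>{@code
     * Collection<IgfsBlockLocation> locs = affinity(info, 0, 2L * info.blockSize(), 0);
     *
     * for (IgfsBlockLocation loc : locs)
     *     System.out.println(loc.start() + "+" + loc.length() + " -> " + loc.nodeIds());
     * }</pre>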
| * |
| * @param info File info to resolve affinity nodes for. |
| * @param start Start position in the file. |
| * @param len File part length to get affinity for. |
| * @param maxLen Maximum block length. |
| * @return Affinity blocks locations. |
| * @throws IgniteCheckedException If failed. |
| */ |
| public Collection<IgfsBlockLocation> affinity(IgfsEntryInfo info, long start, long len, long maxLen) |
| throws IgniteCheckedException { |
| assert info.isFile() : "Failed to get affinity (not a file): " + info; |
| assert start >= 0 : "Start position should not be negative: " + start; |
| assert len >= 0 : "Part length should not be negative: " + len; |
| |
| if (log.isDebugEnabled()) |
| log.debug("Calculating affinity for file [info=" + info + ", start=" + start + ", len=" + len + ']'); |
| |
| // Skip affinity resolving, if no data requested. |
| if (len == 0) |
| return Collections.emptyList(); |
| |
| if (maxLen > 0) { |
| maxLen -= maxLen % info.blockSize(); |
| |
| // If maxLen is smaller than block size, then adjust it to the block size. |
| if (maxLen < info.blockSize()) |
| maxLen = info.blockSize(); |
| } |
| else |
| maxLen = 0; |
| |
        // When the affinity key is not null, the whole file resides on one node.
| if (info.affinityKey() != null) { |
| Collection<IgfsBlockLocation> res = new LinkedList<>(); |
| |
| splitBlocks(start, len, maxLen, dataCache.affinity().mapKeyToPrimaryAndBackups( |
| new IgfsBlockKey(info.id(), info.affinityKey(), info.evictExclude(), 0)), res); |
| |
| return res; |
| } |
| |
| // Need to merge ranges affinity with non-colocated affinity. |
| Deque<IgfsBlockLocation> res = new LinkedList<>(); |
| |
| if (info.fileMap().ranges().isEmpty()) { |
| affinity0(info, start, len, maxLen, res); |
| |
| return res; |
| } |
| |
| long pos = start; |
| long end = start + len; |
| |
| for (IgfsFileAffinityRange range : info.fileMap().ranges()) { |
| if (log.isDebugEnabled()) |
| log.debug("Checking range [range=" + range + ", pos=" + pos + ']'); |
| |
| // If current position is less than range start, add non-colocated affinity ranges. |
| if (range.less(pos)) { |
| long partEnd = Math.min(end, range.startOffset()); |
| |
| affinity0(info, pos, partEnd - pos, maxLen, res); |
| |
| pos = partEnd; |
| } |
| |
| IgfsBlockLocation last = res.peekLast(); |
| |
| if (range.belongs(pos)) { |
| long partEnd = Math.min(range.endOffset() + 1, end); |
| |
| Collection<ClusterNode> affNodes = dataCache.affinity().mapKeyToPrimaryAndBackups( |
| range.affinityKey()); |
| |
| if (log.isDebugEnabled()) |
| log.debug("Calculated affinity for range [start=" + pos + ", end=" + partEnd + |
| ", nodes=" + F.nodeIds(affNodes) + ", range=" + range + |
| ", affNodes=" + F.nodeIds(affNodes) + ']'); |
| |
| if (last != null && equal(last.nodeIds(), F.viewReadOnly(affNodes, F.node2id()))) { |
| // Merge with the previous block in result. |
| res.removeLast(); |
| |
| splitBlocks(last.start(), last.length() + partEnd - pos, maxLen, affNodes, res); |
| } |
| else |
| // Do not merge with the previous block. |
| splitBlocks(pos, partEnd - pos, maxLen, affNodes, res); |
| |
| pos = partEnd; |
| } |
| // Else skip this range. |
| |
| if (log.isDebugEnabled()) |
| log.debug("Finished range check [range=" + range + ", pos=" + pos + ", res=" + res + ']'); |
| |
| if (pos == end) |
| break; |
| } |
| |
        // Final chunk. Note that affinity0() expects a length, not an end offset.
        if (pos != end)
            affinity0(info, pos, end - pos, maxLen, res);
| |
| return res; |
| } |
| |
| /** |
| * Calculates non-colocated affinity for given file info and given region of file. |
| * |
| * @param info File info. |
| * @param start Start offset. |
| * @param len Length. |
| * @param maxLen Maximum allowed split length. |
| * @param res Result collection to add regions to. |
| */ |
| private void affinity0(IgfsEntryInfo info, long start, long len, long maxLen, Deque<IgfsBlockLocation> res) { |
| long firstGrpIdx = start / grpBlockSize; |
| long limitGrpIdx = (start + len + grpBlockSize - 1) / grpBlockSize; |
| |
| if (limitGrpIdx - firstGrpIdx > Integer.MAX_VALUE) |
| throw new IgfsException("Failed to get affinity (range is too wide)" + |
| " [info=" + info + ", start=" + start + ", len=" + len + ']'); |
| |
| if (log.isDebugEnabled()) |
| log.debug("Mapping file region [fileInfo=" + info + ", start=" + start + ", len=" + len + ']'); |
| |
| for (long grpIdx = firstGrpIdx; grpIdx < limitGrpIdx; grpIdx++) { |
| // Boundaries of the block. |
| long blockStart; |
| long blockLen; |
| |
| // The first block. |
| if (grpIdx == firstGrpIdx) { |
| blockStart = start % grpBlockSize; |
| blockLen = Math.min(grpBlockSize - blockStart, len); |
| } |
| // The last block. |
| else if (grpIdx == limitGrpIdx - 1) { |
| blockStart = 0; |
| blockLen = (start + len - 1) % grpBlockSize + 1; |
| } |
| // Other blocks. |
| else { |
| blockStart = 0; |
| blockLen = grpBlockSize; |
| } |
| |
| // Affinity for the first block in the group. |
| IgfsBlockKey key = new IgfsBlockKey(info.id(), info.affinityKey(), info.evictExclude(), |
| grpIdx * grpSize); |
| |
| Collection<ClusterNode> affNodes = dataCache.affinity().mapKeyToPrimaryAndBackups(key); |
| |
| if (log.isDebugEnabled()) |
| log.debug("Mapped key to nodes [key=" + key + ", nodes=" + F.nodeIds(affNodes) + |
| ", blockStart=" + blockStart + ", blockLen=" + blockLen + ']'); |
| |
| IgfsBlockLocation last = res.peekLast(); |
| |
| // Merge with previous affinity block location? |
| if (last != null && equal(last.nodeIds(), F.viewReadOnly(affNodes, F.node2id()))) { |
| // Remove previous incomplete value. |
| res.removeLast(); |
| |
| // Update affinity block location with merged one. |
| splitBlocks(last.start(), last.length() + blockLen, maxLen, affNodes, res); |
| } |
| else |
| splitBlocks(grpIdx * grpBlockSize + blockStart, blockLen, maxLen, affNodes, res); |
| } |
| |
| if (log.isDebugEnabled()) |
| log.debug("Calculated file affinity [info=" + info + ", start=" + start + ", len=" + len + |
| ", res=" + res + ']'); |
| } |
| |
| /** |
| * Split blocks according to maximum split length. |
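     * <p>
     * For example, {@code start=0, len=25, maxLen=10} yields locations covering
     * [0, 10), [10, 20) and [20, 25); with {@code maxLen <= 0} the range is kept whole.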
| * |
| * @param start Start position. |
| * @param len Length. |
| * @param maxLen Maximum allowed length. |
| * @param nodes Affinity nodes. |
| * @param res Where to put results. |
| */ |
| private void splitBlocks(long start, long len, long maxLen, |
| Collection<ClusterNode> nodes, Collection<IgfsBlockLocation> res) { |
| if (maxLen > 0) { |
| long end = start + len; |
| |
| long start0 = start; |
| |
| while (start0 < end) { |
| long len0 = Math.min(maxLen, end - start0); |
| |
| res.add(new IgfsBlockLocationImpl(start0, len0, nodes)); |
| |
| start0 += len0; |
| } |
| } |
| else |
| res.add(new IgfsBlockLocationImpl(start, len, nodes)); |
| } |
| |
| /** |
| * Gets group block size (block size * group size). |
| * |
| * @return Group block size. |
| */ |
| public long groupBlockSize() { |
| return grpBlockSize; |
| } |
| |
| /** |
| * Check if two collections are equal as if they are lists (with respect to order). |
| * |
| * @param one First collection. |
| * @param two Second collection. |
| * @return {@code True} if equal. |
| */ |
| private boolean equal(Collection<UUID> one, Collection<UUID> two) { |
| if (one.size() != two.size()) |
| return false; |
| |
| Iterator<UUID> it1 = one.iterator(); |
| Iterator<UUID> it2 = two.iterator(); |
| |
| int size = one.size(); |
| |
| for (int i = 0; i < size; i++) { |
| if (!it1.next().equals(it2.next())) |
| return false; |
| } |
| |
| return true; |
| } |
| |
| /** |
| * @param fileId File ID. |
| * @param node Node to process blocks on. |
| * @param blocks Blocks to put in cache. |
| * @throws IgniteCheckedException If batch processing failed. |
| */ |
| private void processBatch(IgniteUuid fileId, final ClusterNode node, |
| final Map<IgfsBlockKey, byte[]> blocks) throws IgniteCheckedException { |
| final long batchId = reqIdCtr.getAndIncrement(); |
| |
| final WriteCompletionFuture completionFut = pendingWrites.get(fileId); |
| |
| if (completionFut == null) { |
| if (log.isDebugEnabled()) |
| log.debug("Missing completion future for file write request (most likely exception occurred " + |
| "which will be thrown upon stream close) [nodeId=" + node.id() + ", fileId=" + fileId + ']'); |
| |
| return; |
| } |
| |
| // Throw exception if future is failed in the middle of writing. |
| if (completionFut.isDone()) |
| completionFut.get(); |
| |
| completionFut.onWriteRequest(node.id(), batchId); |
| |
| final UUID nodeId = node.id(); |
| |
| if (!node.isLocal()) { |
| final IgfsBlocksMessage msg = new IgfsBlocksMessage(fileId, batchId, blocks); |
| |
| try { |
| igfsCtx.send(nodeId, topic, msg, IGFS_POOL); |
| } |
| catch (IgniteCheckedException e) { |
| completionFut.onError(nodeId, e); |
| } |
| } |
| else { |
| igfsCtx.runInIgfsThreadPool(new Runnable() { |
| @Override public void run() { |
| storeBlocksAsync(blocks).listen(new CI1<IgniteInternalFuture<?>>() { |
| @Override public void apply(IgniteInternalFuture<?> fut) { |
| try { |
| fut.get(); |
| |
| completionFut.onWriteAck(nodeId, batchId); |
| } |
| catch (IgniteCheckedException e) { |
| completionFut.onError(nodeId, e); |
| } |
| } |
| }); |
| } |
| }); |
| } |
| } |
| |
| /** |
     * If a partial block write is attempted, both colocated and non-colocated keys are locked and data
     * is appended to the correct block.
| * |
| * @param fileId File ID. |
| * @param colocatedKey Block key. |
| * @param startOff Data start offset within block. |
| * @param data Data to write. |
| * @param blockSize The block size. |
| * @throws IgniteCheckedException If update failed. |
| */ |
| private void processPartialBlockWrite(IgniteUuid fileId, IgfsBlockKey colocatedKey, int startOff, |
| byte[] data, int blockSize) throws IgniteCheckedException { |
        // No affinity key present, just update the block in place and return.
| if (colocatedKey.affinityKey() == null) { |
| dataCachePrj.invoke(colocatedKey, new UpdateProcessor(startOff, data)); |
| |
| return; |
| } |
| |
| // If writing from block beginning, just put and return. |
| if (startOff == 0) { |
| putBlock(blockSize, colocatedKey, data); |
| |
| return; |
| } |
| |
| // Create non-colocated key. |
| IgfsBlockKey key = new IgfsBlockKey(colocatedKey.getFileId(), null, |
| colocatedKey.evictExclude(), colocatedKey.blockId()); |
| |
| try (GridNearTxLocal tx = dataCachePrj.txStartEx(PESSIMISTIC, REPEATABLE_READ)) { |
| // Lock keys. |
| Map<IgfsBlockKey, byte[]> vals = dataCachePrj.getAll(F.asList(colocatedKey, key)); |
| |
| boolean hasVal = false; |
| |
| UpdateProcessor transformClos = new UpdateProcessor(startOff, data); |
| |
| if (vals.get(colocatedKey) != null) { |
| dataCachePrj.invoke(colocatedKey, transformClos); |
| |
| hasVal = true; |
| } |
| |
| if (vals.get(key) != null) { |
| dataCachePrj.invoke(key, transformClos); |
| |
| hasVal = true; |
| } |
| |
| if (!hasVal) |
| throw new IgniteCheckedException("Failed to write partial block (no previous data was found in cache) " + |
| "[key=" + colocatedKey + ", relaxedKey=" + key + ", startOff=" + startOff + |
| ", dataLen=" + data.length + ']'); |
| |
| tx.commit(); |
| } |
| } |
| |
| /** |
| * @param blocks Blocks to write. |
| * @return Future that will be completed after put is done. |
| */ |
| @SuppressWarnings("unchecked") |
| private IgniteInternalFuture<?> storeBlocksAsync(Map<IgfsBlockKey, byte[]> blocks) { |
        assert !blocks.isEmpty();

        return dataCachePrj.putAllAsync(blocks);
| } |
| |
| /** |
| * @param nodeId Node ID. |
| * @param blocksMsg Write request message. |
| */ |
| private void processBlocksMessage(final UUID nodeId, final IgfsBlocksMessage blocksMsg) { |
| storeBlocksAsync(blocksMsg.blocks()).listen(new CI1<IgniteInternalFuture<?>>() { |
| @Override public void apply(IgniteInternalFuture<?> fut) { |
| IgniteCheckedException err = null; |
| |
| try { |
| fut.get(); |
| } |
| catch (IgniteCheckedException e) { |
| err = e; |
| } |
| |
| try { |
| // Send reply back to node. |
| igfsCtx.send(nodeId, topic, new IgfsAckMessage(blocksMsg.fileId(), blocksMsg.id(), err), IGFS_POOL); |
| } |
| catch (IgniteCheckedException e) { |
| U.warn(log, "Failed to send batch acknowledgement (did node leave the grid?) [nodeId=" + nodeId + |
| ", fileId=" + blocksMsg.fileId() + ", batchId=" + blocksMsg.id() + ']', e); |
| } |
| } |
| }); |
| } |
| |
| /** |
| * @param nodeId Node ID. |
| * @param ackMsg Write acknowledgement message. |
| */ |
| @SuppressWarnings("ThrowableResultOfMethodCallIgnored") |
| private void processAckMessage(UUID nodeId, IgfsAckMessage ackMsg) { |
| try { |
| ackMsg.finishUnmarshal(igfsCtx.kernalContext().config().getMarshaller(), null); |
| } |
| catch (IgniteCheckedException e) { |
| U.error(log, "Failed to unmarshal message (will ignore): " + ackMsg, e); |
| |
| return; |
| } |
| |
| IgniteUuid fileId = ackMsg.fileId(); |
| |
| WriteCompletionFuture fut = pendingWrites.get(fileId); |
| |
| if (fut != null) { |
| if (ackMsg.error() != null) |
| fut.onError(nodeId, ackMsg.error()); |
| else |
| fut.onWriteAck(nodeId, ackMsg.id()); |
| } |
| else { |
| if (log.isDebugEnabled()) |
| log.debug("Received write acknowledgement for non-existent write future (most likely future was " + |
| "failed) [nodeId=" + nodeId + ", fileId=" + fileId + ']'); |
| } |
| } |
| |
| /** |
| * Creates block key based on block ID, file info and local affinity range. |
| * |
| * @param block Block ID. |
| * @param fileInfo File info being written. |
| * @param locRange Local affinity range to update. |
| * @return Block key. |
| */ |
| private IgfsBlockKey createBlockKey( |
| long block, |
| IgfsEntryInfo fileInfo, |
| IgfsFileAffinityRange locRange |
| ) { |
| // If affinityKey is present, return block key as is. |
| if (fileInfo.affinityKey() != null) |
| return new IgfsBlockKey(fileInfo.id(), fileInfo.affinityKey(), fileInfo.evictExclude(), block); |
| |
        // If no local range is provided, no colocated writes are attempted.
| if (locRange == null) |
| return new IgfsBlockKey(fileInfo.id(), null, fileInfo.evictExclude(), block); |
| |
| long blockStart = block * fileInfo.blockSize(); |
| |
| // If block does not belong to new range, return old affinity key. |
| if (locRange.less(blockStart)) { |
| IgniteUuid affKey = fileInfo.fileMap().affinityKey(blockStart, false); |
| |
| return new IgfsBlockKey(fileInfo.id(), affKey, fileInfo.evictExclude(), block); |
| } |
| |
| if (!locRange.belongs(blockStart)) |
| locRange.expand(blockStart, fileInfo.blockSize()); |
| |
| return new IgfsBlockKey(fileInfo.id(), locRange.affinityKey(), fileInfo.evictExclude(), block); |
| } |
| |
| /** |
     * Abstract class to handle writes from different types of input data.
| */ |
| private abstract class BlocksWriter<T> { |
| /** |
| * Stores data blocks read from abstracted source. |
| * |
| * @param fileInfo File info. |
| * @param reservedLen Reserved length. |
| * @param remainder Remainder. |
| * @param remainderLen Remainder length. |
| * @param src Source to read bytes. |
| * @param srcLen Data length to read from source. |
| * @param flush Flush flag. |
| * @param affinityRange Affinity range to update if file write can be colocated. |
| * @param batch Optional secondary file system worker batch. |
         * @return Data remainder if {@code flush} flag is {@code false}.
         * @throws IgniteCheckedException If failed.
| */ |
| @SuppressWarnings("ConstantConditions") |
| @Nullable public byte[] storeDataBlocks( |
| IgfsEntryInfo fileInfo, |
| long reservedLen, |
| @Nullable byte[] remainder, |
| final int remainderLen, |
| T src, |
| int srcLen, |
| boolean flush, |
| IgfsFileAffinityRange affinityRange, |
| @Nullable IgfsFileWorkerBatch batch |
| ) throws IgniteCheckedException { |
| IgniteUuid id = fileInfo.id(); |
| int blockSize = fileInfo.blockSize(); |
| |
| int len = remainderLen + srcLen; |
| |
| if (len > reservedLen) |
| throw new IgfsException("Not enough space reserved to store data [id=" + id + |
| ", reservedLen=" + reservedLen + ", remainderLen=" + remainderLen + |
| ", data.length=" + srcLen + ']'); |
| |
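            // Worked example (illustrative values): reservedLen=100, remainderLen + srcLen=30 and
            // blockSize=16 give start=70, first=4, limit=7, i.e. blocks 4..6 are touched.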
| long start = reservedLen - len; |
| long first = start / blockSize; |
| long limit = (start + len + blockSize - 1) / blockSize; |
| int written = 0; |
| int remainderOff = 0; |
| |
| Map<IgfsBlockKey, byte[]> nodeBlocks = U.newLinkedHashMap((int)(limit - first)); |
| ClusterNode node = null; |
| int off = 0; |
| |
| for (long block = first; block < limit; block++) { |
| final long blockStartOff = block == first ? (start % blockSize) : 0; |
| final long blockEndOff = block == (limit - 1) ? (start + len - 1) % blockSize : (blockSize - 1); |
| |
| final long size = blockEndOff - blockStartOff + 1; |
| |
| assert size > 0 && size <= blockSize; |
| assert blockStartOff + size <= blockSize; |
| |
| final byte[] portion = new byte[(int)size]; |
| |
| // Data length to copy from remainder. |
| int portionOff = Math.min((int)size, remainderLen - remainderOff); |
| |
| if (remainderOff != remainderLen) { |
| U.arrayCopy(remainder, remainderOff, portion, 0, portionOff); |
| |
| remainderOff += portionOff; |
| } |
| |
| if (portionOff < size) |
| readData(src, portion, portionOff); |
| |
| // Will update range if necessary. |
| IgfsBlockKey key = createBlockKey(block, fileInfo, affinityRange); |
| |
| ClusterNode primaryNode = dataCachePrj.cache().affinity().mapKeyToNode(key); |
| |
| if (block == first) { |
| off = (int)blockStartOff; |
| node = primaryNode; |
| } |
| |
| if (size == blockSize) { |
| assert blockStartOff == 0 : "Cannot write the whole block not from start position [start=" + |
| start + ", block=" + block + ", blockStartOff=" + blockStartOff + ", blockEndOff=" + |
| blockEndOff + ", size=" + size + ", first=" + first + ", limit=" + limit + ", blockSize=" + |
| blockSize + ']'; |
| } |
| else { |
| // If partial block is being written from the beginning and not flush, return it as remainder. |
| if (blockStartOff == 0 && !flush) { |
| assert written + portion.length == len; |
| |
| if (!nodeBlocks.isEmpty()) { |
| processBatch(id, node, nodeBlocks); |
| |
| igfsCtx.metrics().addWriteBlocks(1, 0); |
| } |
| |
| return portion; |
| } |
| } |
| |
| int writtenSecondary = 0; |
| |
| if (batch != null) { |
| if (!batch.write(portion)) |
| throw new IgniteCheckedException("Cannot write more data to the secondary file system output " + |
| "stream because it was marked as closed: " + batch.path()); |
| else |
| writtenSecondary = 1; |
| } |
| |
| assert primaryNode != null; |
| |
| int writtenTotal = 0; |
| |
| if (!primaryNode.id().equals(node.id())) { |
| if (!nodeBlocks.isEmpty()) |
| processBatch(id, node, nodeBlocks); |
| |
| writtenTotal = nodeBlocks.size(); |
| |
| nodeBlocks = U.newLinkedHashMap((int)(limit - first)); |
| node = primaryNode; |
| } |
| |
| assert size == portion.length; |
| |
| if (size != blockSize) { |
| // Partial writes must be always synchronous. |
| processPartialBlockWrite(id, key, block == first ? off : 0, portion, blockSize); |
| |
| writtenTotal++; |
| } |
| else |
| nodeBlocks.put(key, portion); |
| |
| igfsCtx.metrics().addWriteBlocks(writtenTotal, writtenSecondary); |
| |
| written += portion.length; |
| } |
| |
| // Process final batch, if exists. |
| if (!nodeBlocks.isEmpty()) { |
| processBatch(id, node, nodeBlocks); |
| |
| igfsCtx.metrics().addWriteBlocks(nodeBlocks.size(), 0); |
| } |
| |
| assert written == len; |
| |
| return null; |
| } |
| |
| /** |
| * Fully reads data from specified source into the specified byte array. |
| * |
| * @param src Data source. |
| * @param dst Destination. |
| * @param dstOff Destination buffer offset. |
| * @throws IgniteCheckedException If read failed. |
| */ |
| protected abstract void readData(T src, byte[] dst, int dstOff) throws IgniteCheckedException; |
| } |
| |
| /** |
| * Byte buffer writer. |
| */ |
| private class ByteBufferBlocksWriter extends BlocksWriter<ByteBuffer> { |
| /** {@inheritDoc} */ |
| @Override protected void readData(ByteBuffer src, byte[] dst, int dstOff) { |
| src.get(dst, dstOff, dst.length - dstOff); |
| } |
| } |
| |
| /** |
| * Data input writer. |
| */ |
| private class DataInputBlocksWriter extends BlocksWriter<DataInput> { |
| /** {@inheritDoc} */ |
| @Override protected void readData(DataInput src, byte[] dst, int dstOff) |
| throws IgniteCheckedException { |
| try { |
| src.readFully(dst, dstOff, dst.length - dstOff); |
| } |
| catch (IOException e) { |
| throw new IgniteCheckedException(e); |
| } |
| } |
| } |
| |
| /** |
| * Helper closure to update data in cache. |
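     * <p>
     * For instance, applying {@code new UpdateProcessor(2, new byte[] {7, 8})} to a stored
     * value {@code {1, 2, 3}} expands it to {@code {1, 2, 7, 8}}.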
| */ |
| @GridInternal |
| private static final class UpdateProcessor implements EntryProcessor<IgfsBlockKey, byte[], Void>, |
| Externalizable { |
| /** */ |
| private static final long serialVersionUID = 0L; |
| |
| /** Start position in the block to write new data from. */ |
| private int start; |
| |
| /** Data block to write into cache. */ |
| private byte[] data; |
| |
| /** |
| * Empty constructor required for {@link Externalizable}. |
| * |
| */ |
| public UpdateProcessor() { |
| // No-op. |
| } |
| |
| /** |
| * Constructs update data block closure. |
| * |
| * @param start Start position in the block to write new data from. |
| * @param data Data block to write into cache. |
| */ |
| private UpdateProcessor(int start, byte[] data) { |
| assert start >= 0; |
| assert data != null; |
| assert start + data.length >= 0 : "Too much data [start=" + start + ", data.length=" + data.length + ']'; |
| |
| this.start = start; |
| this.data = data; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public Void process(MutableEntry<IgfsBlockKey, byte[]> entry, Object... args) { |
| byte[] e = entry.getValue(); |
| |
| final int size = data.length; |
| |
            if (e == null || e.length == 0)
                e = new byte[start + size]; // Don't allocate more than required.
            else if (e.length < start + size) {
                // Expand the stored data array if it is smaller than required.
                byte[] tmp = new byte[start + size]; // Don't allocate more than required.
| |
| U.arrayCopy(e, 0, tmp, 0, e.length); |
| |
| e = tmp; |
| } |
| |
| // Copy data into entry. |
| U.arrayCopy(data, 0, e, start, size); |
| |
| entry.setValue(e); |
| |
| return null; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void writeExternal(ObjectOutput out) throws IOException { |
| out.writeInt(start); |
| U.writeByteArray(out, data); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void readExternal(ObjectInput in) throws IOException { |
| start = in.readInt(); |
| data = U.readByteArray(in); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public String toString() { |
| return S.toString(UpdateProcessor.class, this, "start", start, "data.length", data.length); |
| } |
| } |
| |
| /** |
| * Asynchronous delete worker. |
| */ |
| private class AsyncDeleteWorker extends GridWorker { |
| /** File info for stop request. */ |
| private final IgfsEntryInfo stopInfo; |
| |
| /** Delete requests queue. */ |
| private BlockingQueue<IgniteBiTuple<GridFutureAdapter<Object>, IgfsEntryInfo>> delReqs = |
| new LinkedBlockingQueue<>(); |
| |
| /** |
| * @param igniteInstanceName Ignite instance name. |
| * @param name Worker name. |
| * @param log Log. |
| */ |
| protected AsyncDeleteWorker(@Nullable String igniteInstanceName, String name, IgniteLogger log) { |
| super(igniteInstanceName, name, log); |
| |
| stopInfo = IgfsUtils.createDirectory(IgniteUuid.randomUuid()); |
| } |
| |
| /** |
| * Gracefully stops worker by adding STOP_INFO to queue. |
| */ |
| private void stop() { |
| delReqs.offer(F.t(new GridFutureAdapter<>(), stopInfo)); |
| } |
| |
| /** |
| * @param info File info to delete. |
| * @return Future which completes when entry is actually removed. |
| */ |
| private IgniteInternalFuture<Object> deleteAsync(IgfsEntryInfo info) { |
| GridFutureAdapter<Object> fut = new GridFutureAdapter<>(); |
| |
| delReqs.offer(F.t(fut, info)); |
| |
| return fut; |
| } |
| |
| /** {@inheritDoc} */ |
| @SuppressWarnings("ConstantConditions") |
| @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException { |
| try { |
| while (!isCancelled()) { |
| IgniteBiTuple<GridFutureAdapter<Object>, IgfsEntryInfo> req = delReqs.take(); |
| |
| GridFutureAdapter<Object> fut = req.get1(); |
| IgfsEntryInfo fileInfo = req.get2(); |
| |
| // Identity check. |
| if (fileInfo == stopInfo) { |
| fut.onDone(); |
| |
| break; |
| } |
| |
| IgniteDataStreamer<IgfsBlockKey, byte[]> ldr = dataStreamer(); |
| |
| try { |
| IgfsFileMap map = fileInfo.fileMap(); |
| |
| for (long block = 0, size = fileInfo.blocksCount(); block < size; block++) { |
| IgniteUuid affKey = map == null ? null : map.affinityKey(block * fileInfo.blockSize(), true); |
| |
| ldr.removeData(new IgfsBlockKey(fileInfo.id(), affKey, fileInfo.evictExclude(), |
| block)); |
| |
| if (affKey != null) |
| ldr.removeData(new IgfsBlockKey(fileInfo.id(), null, fileInfo.evictExclude(), |
| block)); |
| } |
| } |
| catch (IgniteInterruptedException ignored) { |
| // Ignore interruption during shutdown. |
| } |
| catch (IgniteException e) { |
| log.error("Failed to remove file contents: " + fileInfo, e); |
| } |
| finally { |
| try { |
| IgniteUuid fileId = fileInfo.id(); |
| |
| for (long block = 0, size = fileInfo.blocksCount(); block < size; block++) |
| ldr.removeData(new IgfsBlockKey(fileId, fileInfo.affinityKey(), |
| fileInfo.evictExclude(), block)); |
| } |
| catch (IgniteException e) { |
| log.error("Failed to remove file contents: " + fileInfo, e); |
| } |
| finally { |
| try { |
| ldr.close(isCancelled()); |
| } |
| catch (IgniteException e) { |
| log.error("Failed to stop data streamer while shutting down " + |
| "igfs async delete thread.", e); |
| } |
| finally { |
| fut.onDone(); // Complete future. |
| } |
| } |
| } |
| } |
| } |
| finally { |
| if (log.isDebugEnabled()) |
| log.debug("Stopping asynchronous igfs file delete thread: " + name()); |
| |
| IgniteBiTuple<GridFutureAdapter<Object>, IgfsEntryInfo> req = delReqs.poll(); |
| |
| while (req != null) { |
| req.get1().onCancelled(); |
| |
| req = delReqs.poll(); |
| } |
| } |
| } |
| } |
| |
| /** |
     * Allows output stream to await all current acks.
| * |
| * @param fileId File ID. |
| * @throws IgniteInterruptedCheckedException In case of interrupt. |
| */ |
| void awaitAllAcksReceived(IgniteUuid fileId) throws IgniteInterruptedCheckedException { |
| WriteCompletionFuture fut = pendingWrites.get(fileId); |
| |
| if (fut != null) |
| fut.awaitAllAcksReceived(); |
| } |
| |
| /** |
| * Future that is completed when all participating |
| */ |
| private class WriteCompletionFuture extends GridFutureAdapter<Boolean> { |
| /** File id to remove future from map. */ |
| private final IgniteUuid fileId; |
| |
| /** Pending acks. */ |
| private final ConcurrentMap<Long, UUID> ackMap = new ConcurrentHashMap<>(); |
| |
| /** Lock for map-related conditions. */ |
| private final Lock lock = new ReentrantLock(); |
| |
| /** Condition to wait for empty map. */ |
| private final Condition allAcksRcvCond = lock.newCondition(); |
| |
| /** Flag indicating future is waiting for last ack. */ |
| private volatile boolean awaitingLast; |
| |
| /** |
| * @param fileId File id. |
| */ |
| private WriteCompletionFuture(IgniteUuid fileId) { |
| assert fileId != null; |
| |
| this.fileId = fileId; |
| } |
| |
| /** |
         * Awaits all pending data blocks to be acked.
| * |
| * @throws IgniteInterruptedCheckedException In case of interrupt. |
| */ |
| public void awaitAllAcksReceived() throws IgniteInterruptedCheckedException { |
| lock.lock(); |
| |
| try { |
| while (!ackMap.isEmpty()) |
| U.await(allAcksRcvCond); |
| } |
| finally { |
| lock.unlock(); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean onDone(@Nullable Boolean res, @Nullable Throwable err) { |
| if (!isDone()) { |
| pendingWrites.remove(fileId, this); |
| |
| if (super.onDone(res, err)) |
| return true; |
| } |
| |
| return false; |
| } |
| |
| /** |
| * Write request will be asynchronously executed on node with given ID. |
| * |
| * @param nodeId Node ID. |
| * @param batchId Assigned batch ID. |
| */ |
| private void onWriteRequest(UUID nodeId, long batchId) { |
| if (!isDone()) { |
| UUID pushedOut = ackMap.putIfAbsent(batchId, nodeId); |
| |
| assert pushedOut == null; |
| } |
| } |
| |
| /** |
         * Checks whether there are batches from the specified node for which acks are still awaited.
         *
         * @param nodeId The node ID.
         * @return {@code True} if acks are awaited from this node.
| */ |
| private boolean hasPendingAcks(UUID nodeId) { |
| assert nodeId != null; |
| |
| for (Map.Entry<Long, UUID> e : ackMap.entrySet()) |
| if (nodeId.equals(e.getValue())) |
| return true; |
| |
| return false; |
| } |
| |
| /** |
| * Error occurred on node with given ID. |
| * |
| * @param nodeId Node ID. |
| * @param e Caught exception. |
| */ |
| private void onError(UUID nodeId, IgniteCheckedException e) { |
| // If waiting for ack from this node. |
| if (hasPendingAcks(nodeId)) { |
| ackMap.clear(); |
| |
| signalNoAcks(); |
| |
| if (e.hasCause(IgfsOutOfSpaceException.class)) |
| onDone(new IgniteCheckedException("Failed to write data (not enough space on node): " + nodeId, e)); |
| else |
| onDone(new IgniteCheckedException( |
| "Failed to wait for write completion (write failed on node): " + nodeId, e)); |
| } |
| } |
| |
| /** |
| * Write ack received from node with given ID for given batch ID. |
| * |
| * @param nodeId Node ID. |
| * @param batchId Batch ID. |
| */ |
| private void onWriteAck(UUID nodeId, long batchId) { |
| if (!isDone()) { |
| boolean rmv = ackMap.remove(batchId, nodeId); |
| |
| assert rmv : "Received acknowledgement message for not registered batch [nodeId=" + |
| nodeId + ", batchId=" + batchId + ']'; |
| |
| if (ackMap.isEmpty()) { |
| signalNoAcks(); |
| |
| if (awaitingLast) |
| onDone(true); |
| } |
| } |
| } |
| |
| /** |
         * Signals that currently there are no more pending acks.
| */ |
| private void signalNoAcks() { |
| lock.lock(); |
| |
| try { |
| allAcksRcvCond.signalAll(); |
| } |
| finally { |
| lock.unlock(); |
| } |
| } |
| |
| /** |
| * Marks this future as waiting last ack. |
| */ |
| private void markWaitingLastAck() { |
| awaitingLast = true; |
| |
| if (log.isDebugEnabled()) |
| log.debug("Marked write completion future as awaiting last ack: " + fileId); |
| |
| if (ackMap.isEmpty()) |
| onDone(true); |
| } |
| } |
| } |