/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.igfs;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.CountDownLatch;
import javax.cache.processor.EntryProcessor;
import javax.cache.processor.EntryProcessorResult;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteCompute;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteInterruptedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterGroup;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.cluster.ClusterTopologyException;
import org.apache.ignite.configuration.FileSystemConfiguration;
import org.apache.ignite.events.EventType;
import org.apache.ignite.events.IgfsEvent;
import org.apache.ignite.igfs.IgfsConcurrentModificationException;
import org.apache.ignite.igfs.IgfsDirectoryNotEmptyException;
import org.apache.ignite.igfs.IgfsException;
import org.apache.ignite.igfs.IgfsFile;
import org.apache.ignite.igfs.IgfsParentNotDirectoryException;
import org.apache.ignite.igfs.IgfsPath;
import org.apache.ignite.igfs.IgfsPathAlreadyExistsException;
import org.apache.ignite.igfs.IgfsPathIsDirectoryException;
import org.apache.ignite.igfs.IgfsPathIsNotDirectoryException;
import org.apache.ignite.igfs.IgfsPathNotFoundException;
import org.apache.ignite.igfs.IgfsUserContext;
import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystem;
import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystemPositionedReadable;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager;
import org.apache.ignite.internal.processors.cache.GridCacheInternal;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
import org.apache.ignite.internal.processors.igfs.client.IgfsClientAbstractCallable;
import org.apache.ignite.internal.processors.igfs.client.meta.IgfsClientMetaIdsForPathCallable;
import org.apache.ignite.internal.processors.igfs.client.meta.IgfsClientMetaInfoForPathCallable;
import org.apache.ignite.internal.processors.igfs.client.meta.IgfsClientMetaUnlockCallable;
import org.apache.ignite.internal.processors.igfs.meta.IgfsMetaDirectoryCreateProcessor;
import org.apache.ignite.internal.processors.igfs.meta.IgfsMetaDirectoryListingAddProcessor;
import org.apache.ignite.internal.processors.igfs.meta.IgfsMetaDirectoryListingRemoveProcessor;
import org.apache.ignite.internal.processors.igfs.meta.IgfsMetaDirectoryListingRenameProcessor;
import org.apache.ignite.internal.processors.igfs.meta.IgfsMetaDirectoryListingReplaceProcessor;
import org.apache.ignite.internal.processors.igfs.meta.IgfsMetaFileCreateProcessor;
import org.apache.ignite.internal.processors.igfs.meta.IgfsMetaFileLockProcessor;
import org.apache.ignite.internal.processors.igfs.meta.IgfsMetaFileReserveSpaceProcessor;
import org.apache.ignite.internal.processors.igfs.meta.IgfsMetaFileUnlockProcessor;
import org.apache.ignite.internal.processors.igfs.meta.IgfsMetaUpdatePropertiesProcessor;
import org.apache.ignite.internal.processors.igfs.meta.IgfsMetaUpdateTimesProcessor;
import org.apache.ignite.internal.util.GridLeanMap;
import org.apache.ignite.internal.util.GridSpinBusyLock;
import org.apache.ignite.internal.util.lang.GridClosureException;
import org.apache.ignite.internal.util.lang.IgniteOutClosureX;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.T1;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.transactions.TransactionConcurrency;
import org.apache.ignite.transactions.TransactionIsolation;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.events.EventType.EVT_IGFS_DIR_RENAMED;
import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_RENAMED;
/**
 * Cache-based structure (metadata) manager.
*/
public class IgfsMetaManager extends IgfsManager {
/** Comparator for ID sorting. */
private static final Comparator<IgniteUuid> PATH_ID_SORTING_COMPARATOR = new Comparator<IgniteUuid>() {
@Override public int compare(IgniteUuid u1, IgniteUuid u2) {
if (u1 == u2)
return 0;
if (u1 == null)
return -1;
return u1.compareTo(u2);
}
};
/** IGFS configuration. */
private FileSystemConfiguration cfg;
/** Metadata cache. */
private IgniteInternalCache<Object, Object> metaCache;
/** */
private CountDownLatch metaCacheStartLatch;
/** File ID to file info projection. */
private IgniteInternalCache<IgniteUuid, IgfsEntryInfo> id2InfoPrj;
/** Predefined key for sampling mode value. */
private GridCacheInternal sampling;
/** Logger. */
private IgniteLogger log;
/** Delete worker. */
private volatile IgfsDeleteWorker delWorker;
/** Events manager. */
private GridEventStorageManager evts;
/** Local node. */
private ClusterNode locNode;
/** Busy lock. */
private final GridSpinBusyLock busyLock = new GridSpinBusyLock();
/** Relaxed flag. */
private final boolean relaxed;
/** Client flag. */
private final boolean client;
/** Compute facade for client tasks. */
private IgniteCompute cliCompute;
/** Metadata cache name. */
private String metaCacheName;
/**
* Constructor.
*
* @param relaxed Relaxed mode flag.
* @param client Client flag.
*/
public IgfsMetaManager(boolean relaxed, boolean client) {
this.relaxed = relaxed;
this.client = client;
}
/**
* Await initialization.
*/
void awaitInit() {
try {
metaCacheStartLatch.await();
}
catch (InterruptedException e) {
throw new IgniteInterruptedException(e);
}
}
/** {@inheritDoc} */
@Override protected void start0() throws IgniteCheckedException {
metaCacheStartLatch = new CountDownLatch(1);
cfg = igfsCtx.configuration();
evts = igfsCtx.kernalContext().event();
sampling = new IgfsSamplingKey(cfg.getName());
log = igfsCtx.kernalContext().log(IgfsMetaManager.class);
metaCacheName = cfg.getMetaCacheConfiguration().getName();
}
/** {@inheritDoc} */
@SuppressWarnings("RedundantCast")
@Override protected void onKernalStart0() throws IgniteCheckedException {
metaCache = igfsCtx.kernalContext().cache().getOrStartCache(metaCacheName);
assert metaCache != null;
igfsCtx.kernalContext().cache().internalCache(metaCacheName).preloader().startFuture()
.listen(new CI1<IgniteInternalFuture<Object>>() {
@Override public void apply(IgniteInternalFuture<Object> f) {
metaCacheStartLatch.countDown();
}
});
id2InfoPrj = (IgniteInternalCache<IgniteUuid, IgfsEntryInfo>)metaCache.<IgniteUuid, IgfsEntryInfo>cache();
locNode = igfsCtx.kernalContext().discovery().localNode();
// Start background delete worker.
if (!client) {
delWorker = new IgfsDeleteWorker(igfsCtx);
delWorker.start();
}
}
/** {@inheritDoc} */
@Override protected void onKernalStop0(boolean cancel) {
IgfsDeleteWorker delWorker0 = delWorker;
if (delWorker0 != null) {
delWorker0.cancel();
try {
U.join(delWorker0);
}
catch (IgniteInterruptedCheckedException ignored) {
// No-op.
}
}
busyLock.block();
}
/**
* @return Client flag.
*/
boolean isClient() {
return client;
}
/**
* Run client task.
*
* @param task Task.
* @return Result.
*/
<T> T runClientTask(IgfsClientAbstractCallable<T> task) {
try {
return (cfg.isColocateMetadata()) ?
clientCompute().affinityCall(metaCacheName, IgfsUtils.ROOT_ID, task) :
clientCompute().call(task);
}
catch (Exception e) {
if (X.hasCause(e, ClusterTopologyException.class))
throw new IgfsException("Failed to execute operation because there are no IGFS metadata nodes.", e);
IgfsException igfsEx = X.cause(e, IgfsException.class);
if (igfsEx != null)
throw igfsEx;
throw e;
}
}
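    /*
     * Usage sketch (illustrative only): a client-side metadata operation is wrapped into one of the
     * IgfsClientMeta*Callable classes and routed through runClientTask(). With colocated metadata the
     * callable is sent via affinityCall() against ROOT_ID, so it executes on the node that owns the
     * metadata root; otherwise a plain call() on the metadata-node cluster group is used.
     *
     *   IgfsClientMetaIdsForPathCallable task = new IgfsClientMetaIdsForPathCallable(
     *       cfg.getName(), IgfsUserContext.currentUser(), new IgfsPath("/a/b/c"));
     *
     *   List<IgniteUuid> ids = runClientTask(task); // executed on an IGFS metadata node
     */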
/**
* Get compute facade for client tasks.
*
* @return Compute facade.
*/
private IgniteCompute clientCompute() {
assert client;
IgniteCompute cliCompute0 = cliCompute;
if (cliCompute0 == null) {
IgniteEx ignite = igfsCtx.kernalContext().grid();
ClusterGroup cluster = ignite.cluster().forIgfsMetadataDataNodes(cfg.getName(), metaCacheName);
cliCompute0 = ignite.compute(cluster);
cliCompute = cliCompute0;
}
assert cliCompute0 != null;
return cliCompute0;
}
/**
* Gets file ID for specified path.
*
* @param path Path.
* @return File ID for specified path or {@code null} if such file doesn't exist.
* @throws IgniteCheckedException If failed.
*/
@Nullable public IgniteUuid fileId(IgfsPath path) throws IgniteCheckedException {
if (busyLock.enterBusy()) {
try {
validTxState(false);
return fileId(path, false);
}
finally {
busyLock.leaveBusy();
}
}
else
throw new IllegalStateException("Failed to get file ID because Grid is stopping: " + path);
}
/**
* Gets file ID for specified path possibly skipping existing transaction.
*
* @param path Path.
* @param skipTx Whether to skip existing transaction.
* @return File ID for specified path or {@code null} if such file doesn't exist.
* @throws IgniteCheckedException If failed.
*/
@Nullable private IgniteUuid fileId(IgfsPath path, boolean skipTx) throws IgniteCheckedException {
List<IgniteUuid> ids = fileIds(path, skipTx);
assert ids != null && !ids.isEmpty() : "Invalid file IDs [path=" + path + ", ids=" + ids + ']';
return ids.get(ids.size() - 1);
}
/**
* Gets file ID by its name from parent directory listing.
*
* @param parentId Parent directory ID to get child ID for.
* @param fileName File name in parent listing to get file ID for.
* @return File ID.
* @throws IgniteCheckedException If failed.
*/
@Nullable public IgniteUuid fileId(IgniteUuid parentId, String fileName) throws IgniteCheckedException {
if (busyLock.enterBusy()) {
try {
return fileId(parentId, fileName, false);
}
finally {
busyLock.leaveBusy();
}
}
else
throw new IllegalStateException("Failed to get file ID because Grid is stopping [parentId=" + parentId +
", fileName=" + fileName + ']');
}
/**
* Gets file ID by its name from parent directory listing possibly skipping existing transaction.
*
* @param parentId Parent directory ID to get child ID for.
* @param fileName File name in parent listing to get file ID for.
* @param skipTx Whether to skip existing transaction.
* @return File ID.
* @throws IgniteCheckedException If failed.
*/
@Nullable private IgniteUuid fileId(IgniteUuid parentId, String fileName, boolean skipTx)
throws IgniteCheckedException {
IgfsListingEntry entry = directoryListing(parentId, skipTx).get(fileName);
if (entry == null) {
if (log.isDebugEnabled())
log.debug("Missing file ID [parentId=" + parentId + ", fileName=" + fileName + ']');
return null;
}
return entry.fileId();
}
/**
* Gets all file IDs for components of specified path. Result cannot be empty - there is at least root element.
* But each element (except the first) can be {@code null} if such files don't exist.
*
* @param path Path.
* @return Collection of file IDs for components of specified path.
* @throws IgniteCheckedException If failed.
*/
public List<IgniteUuid> fileIds(IgfsPath path) throws IgniteCheckedException {
if (busyLock.enterBusy()) {
try {
validTxState(false);
return fileIds(path, false);
}
finally {
busyLock.leaveBusy();
}
}
else
throw new IllegalStateException("Failed to get file IDS because Grid is stopping: " + path);
}
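    /*
     * Result structure sketch (illustrative only) for the contract described above. For the path
     * "/a/b/c" the returned list always has four elements, one for the root plus one per component;
     * trailing elements are null for components that do not exist:
     *
     *   List<IgniteUuid> ids = fileIds(new IgfsPath("/a/b/c"));
     *
     *   // ids.get(0) == IgfsUtils.ROOT_ID   (always present)
     *   // ids.get(1) -> ID of "/a"          or null if "/a" does not exist
     *   // ids.get(2) -> ID of "/a/b"        or null
     *   // ids.get(3) -> ID of "/a/b/c"      or null
     */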
/**
* Gets all file IDs for components of specified path. Result cannot be empty - there is at least root element.
* But each element (except the first) can be {@code null} if such files don't exist.
*
* @param path Path.
* @return Collection of file IDs for components of specified path.
* @throws IgniteCheckedException If failed.
*/
public IgfsPathIds pathIds(IgfsPath path) throws IgniteCheckedException {
// Prepare parts.
String[] components = path.componentsArray();
String[] parts = new String[components.length + 1];
System.arraycopy(components, 0, parts, 1, components.length);
// Get IDs.
if (client) {
List<IgniteUuid> ids = runClientTask(new IgfsClientMetaIdsForPathCallable(cfg.getName(),
IgfsUserContext.currentUser(), path));
return new IgfsPathIds(path, parts, ids.toArray(new IgniteUuid[ids.size()]));
}
else {
if (busyLock.enterBusy()) {
try {
validTxState(false);
IgniteUuid[] ids = new IgniteUuid[parts.length];
ids[0] = IgfsUtils.ROOT_ID;
for (int i = 1; i < ids.length; i++) {
IgniteUuid id = fileId(ids[i - 1], parts[i], false);
if (id != null)
ids[i] = id;
else
break;
}
// Return.
return new IgfsPathIds(path, parts, ids);
}
finally {
busyLock.leaveBusy();
}
}
else
throw new IllegalStateException("Failed to get file IDS because Grid is stopping: " + path);
}
}
/**
* Gets all file IDs for components of specified path possibly skipping existing transaction. Result cannot
* be empty - there is at least root element. But each element (except the first) can be {@code null} if such
* files don't exist.
*
* @param path Path.
* @param skipTx Whether to skip existing transaction.
* @return Collection of file IDs for components of specified path.
* @throws IgniteCheckedException If failed.
*/
private List<IgniteUuid> fileIds(IgfsPath path, boolean skipTx) throws IgniteCheckedException {
assert path != null;
// Path components.
Collection<String> components = path.components();
// Collection of file IDs for components of specified path.
List<IgniteUuid> ids = new ArrayList<>(components.size() + 1);
ids.add(IgfsUtils.ROOT_ID); // Always add root ID.
IgniteUuid fileId = IgfsUtils.ROOT_ID;
for (String s : components) {
assert !s.isEmpty();
if (fileId != null)
fileId = fileId(fileId, s, skipTx);
ids.add(fileId);
}
return ids;
}
/**
* Ensure that entry with the given ID exists in meta cache.
*
* @param fileId File id.
* @return {@code True} in case such entry exists.
* @throws IgniteCheckedException If failed.
*/
public boolean exists(IgniteUuid fileId) throws IgniteCheckedException {
if (busyLock.enterBusy()) {
try {
assert fileId != null;
// containsKey() doesn't work here since meta cache can be PARTITIONED (we do not restrict it!).
return info(fileId) != null;
}
finally {
busyLock.leaveBusy();
}
}
else
throw new IllegalStateException("Failed to check file system entry existence because Grid is stopping: " +
fileId);
}
/**
* Gets file info by its ID.
* NB: this method is used both in Tx and out of Tx.
*
* @param fileId File ID to get details for.
* @return File info.
* @throws IgniteCheckedException If failed.
*/
@Nullable public IgfsEntryInfo info(@Nullable IgniteUuid fileId) throws IgniteCheckedException {
if (busyLock.enterBusy()) {
try {
if (fileId == null)
return null;
IgfsEntryInfo info = getInfo(fileId);
// Force root ID to always exist in cache.
if (info == null && IgfsUtils.ROOT_ID.equals(fileId))
info = createSystemDirectoryIfAbsent(fileId);
return info;
}
finally {
busyLock.leaveBusy();
}
}
else
throw new IllegalStateException("Failed to get file info because Grid is stopping: " + fileId);
}
/**
* Gets files details by their IDs.
*
* @param fileIds file IDs to get details for.
* @return Files details.
* @throws IgniteCheckedException If failed.
*/
public Map<IgniteUuid, IgfsEntryInfo> infos(Collection<IgniteUuid> fileIds) throws IgniteCheckedException {
if (busyLock.enterBusy()) {
try {
validTxState(false);
assert fileIds != null;
if (F.isEmpty(fileIds))
return Collections.emptyMap();
Map<IgniteUuid, IgfsEntryInfo> map = getInfos(fileIds);
// Force root ID to always exist in cache.
if (fileIds.contains(IgfsUtils.ROOT_ID) && !map.containsKey(IgfsUtils.ROOT_ID)) {
map = new GridLeanMap<>(map);
map.put(IgfsUtils.ROOT_ID, createSystemDirectoryIfAbsent(IgfsUtils.ROOT_ID));
}
return map;
}
finally {
busyLock.leaveBusy();
}
}
else
throw new IllegalStateException("Failed to get file infos because Grid is stopping: " + fileIds);
}
/**
* Lock the file explicitly outside of transaction.
*
* @param fileId File ID to lock.
* @param del If file is being locked for delete.
* @return Locked file info or {@code null} if file cannot be locked or doesn't exist.
* @throws IgniteCheckedException If failed.
*/
public @Nullable IgfsEntryInfo lock(IgniteUuid fileId, boolean del) throws IgniteCheckedException {
if (busyLock.enterBusy()) {
try {
validTxState(false);
assert fileId != null;
try (GridNearTxLocal tx = startTx()) {
// Lock file ID for this transaction.
IgfsEntryInfo oldInfo = info(fileId);
if (oldInfo == null)
return null;
if (oldInfo.lockId() != null)
return null; // The file is already locked, we cannot lock it.
IgfsEntryInfo newInfo = invokeLock(fileId, del);
tx.commit();
return newInfo;
}
catch (GridClosureException e) {
throw U.cast(e);
}
}
finally {
busyLock.leaveBusy();
}
}
else
throw new IllegalStateException("Failed to obtain lock because Grid is stopping: " + fileId);
}
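    /*
     * Usage sketch (illustrative only): lock() returns null both when the entry is missing and when it
     * is already locked, so callers distinguish "cannot lock" from success by the return value rather
     * than by an exception:
     *
     *   IgfsEntryInfo locked = lock(fileId, false);
     *
     *   if (locked == null) {
     *       // Either the file no longer exists or another stream / delete already holds the lock.
     *   }
     *   else
     *       assert locked.lockId() != null;
     */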
/**
* Create file lock ID.
*
* @param del If lock ID is required for file deletion.
* @return Lock ID.
*/
private IgniteUuid createFileLockId(boolean del) {
if (del)
return IgfsUtils.DELETE_LOCK_ID;
return IgniteUuid.fromUuid(locNode.id());
}
/**
* Remove explicit lock on file held by the current stream.
*
* @param fileId File ID.
* @param lockId Lock ID.
* @param modificationTime Modification time to write to file info.
* @throws IgniteCheckedException If failed.
*/
public void unlock(final IgniteUuid fileId, final IgniteUuid lockId, final long modificationTime)
throws IgniteCheckedException {
unlock(fileId, lockId, modificationTime, false, 0, null);
}
/**
* Remove explicit lock on file held by the current stream.
*
* @param fileId File ID.
* @param lockId Lock ID.
* @param modificationTime Modification time to write to file info.
* @param updateSpace Whether to update space.
* @param space Space.
* @param affRange Affinity range.
* @throws IgniteCheckedException If failed.
*/
public void unlock(final IgniteUuid fileId, final IgniteUuid lockId, final long modificationTime,
final boolean updateSpace, final long space, @Nullable final IgfsFileAffinityRange affRange)
throws IgniteCheckedException {
if (client) {
runClientTask(new IgfsClientMetaUnlockCallable(cfg.getName(), IgfsUserContext.currentUser(), fileId,
lockId, modificationTime, updateSpace, space, affRange));
return;
}
validTxState(false);
if (busyLock.enterBusy()) {
try {
if (lockId == null)
return;
// Temporary clear interrupted state for unlocking.
final boolean interrupted = Thread.interrupted();
try {
IgfsUtils.doInTransactionWithRetries(id2InfoPrj, new IgniteOutClosureX<Void>() {
@Override public Void applyx() throws IgniteCheckedException {
validTxState(true);
// Lock file ID for this transaction.
IgfsEntryInfo oldInfo = info(fileId);
if (oldInfo == null)
throw fsException(new IgfsPathNotFoundException("Failed to unlock file (file not " +
"found): " + fileId));
if (!F.eq(lockId, oldInfo.lockId()))
throw new IgniteCheckedException("Failed to unlock file (inconsistent file lock ID) " +
"[fileId=" + fileId + ", lockId=" + lockId + ", actualLockId=" +
oldInfo.lockId() + ']');
id2InfoPrj.invoke(fileId,
new IgfsMetaFileUnlockProcessor(modificationTime, updateSpace, space, affRange));
return null;
}
});
}
finally {
validTxState(false);
if (interrupted)
Thread.currentThread().interrupt();
}
}
finally {
busyLock.leaveBusy();
}
}
else
throw new IllegalStateException("Failed to unlock file system entry because Grid is stopping: " + fileId);
}
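    /*
     * Usage sketch (illustrative only) of the lock/unlock lifecycle: the lock ID obtained from lock()
     * must be passed back to unlock(); a mismatching lock ID fails with "inconsistent file lock ID".
     *
     *   IgfsEntryInfo locked = lock(fileId, false);
     *
     *   if (locked != null) {
     *       try {
     *           // ... write file data ...
     *       }
     *       finally {
     *           unlock(fileId, locked.lockId(), System.currentTimeMillis());
     *       }
     *   }
     */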
/**
* Lock file IDs participating in the transaction.<br/>
*
* @param fileIds file IDs to lock.
* @return Locked file details. Resulting map doesn't contain details for non-existent files.
* @throws IgniteCheckedException If failed.
*/
private Map<IgniteUuid, IgfsEntryInfo> lockIds(IgniteUuid... fileIds) throws IgniteCheckedException {
validTxState(true);
assert fileIds != null && fileIds.length > 0;
Arrays.sort(fileIds);
return lockIds(Arrays.asList(fileIds));
}
/**
* Answers if the collection is sorted.
*
* @param col The collection to check.
* @param <T> The type of the collection elements.
* @return {@code True} if the collection is sorted.
*/
private static <T extends Comparable<T>> boolean isSorted(Collection<T> col) {
T prev = null;
for (T t: col) {
if (t == null)
throw new NullPointerException("Collection should not contain nulls");
if (prev != null && prev.compareTo(t) > 0)
return false; // disordered.
prev = t;
}
return true;
}
/**
* Lock file IDs.
*
* @param fileIds File IDs (sorted).
* @return Map with lock info.
* @throws IgniteCheckedException If failed.
*/
private Map<IgniteUuid, IgfsEntryInfo> lockIds(Collection<IgniteUuid> fileIds) throws IgniteCheckedException {
assert isSorted(fileIds);
validTxState(true);
if (log.isDebugEnabled())
log.debug("Locking file ids: " + fileIds);
// Lock files and get their infos.
Map<IgniteUuid, IgfsEntryInfo> map = getInfos(fileIds);
if (log.isDebugEnabled())
log.debug("Locked file ids: " + fileIds);
for (IgniteUuid fileId : fileIds) {
if (IgfsUtils.isRootOrTrashId(fileId)) {
if (!map.containsKey(fileId))
map.put(fileId, createSystemDirectoryIfAbsent(fileId));
}
}
// Return details map for locked IDs.
return map;
}
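    /*
     * Internal usage sketch (illustrative only): to avoid deadlocks between concurrent metadata
     * transactions, callers inside this class collect all participating IDs into a set sorted with
     * PATH_ID_SORTING_COMPARATOR and lock them in a single batch:
     *
     *   Set<IgniteUuid> ids = new TreeSet<>(PATH_ID_SORTING_COMPARATOR);
     *
     *   pathIds.addExistingIds(ids, relaxed);
     *   ids.add(IgfsUtils.randomTrashId());
     *
     *   try (GridNearTxLocal tx = startTx()) {
     *       Map<IgniteUuid, IgfsEntryInfo> lockInfos = lockIds(ids);
     *
     *       // ... mutate metadata via id2InfoPrj ...
     *
     *       tx.commit();
     *   }
     */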
/**
* Create system entry if it is absent.
*
* @param id System entry ID.
* @return Value of created or existing system entry.
* @throws IgniteCheckedException On error.
*/
private IgfsEntryInfo createSystemDirectoryIfAbsent(IgniteUuid id)
throws IgniteCheckedException {
assert IgfsUtils.isRootOrTrashId(id);
IgfsEntryInfo info = IgfsUtils.createDirectory(id);
IgfsEntryInfo oldInfo = id2InfoPrj.getAndPutIfAbsent(id, info);
if (oldInfo != null)
info = oldInfo;
return info;
}
/**
* List child files for specified file ID.
*
* @param fileId File to list child files for.
* @return Directory listing for the specified file.
* @throws IgniteCheckedException If failed.
*/
public Map<String, IgfsListingEntry> directoryListing(IgniteUuid fileId) throws IgniteCheckedException {
if (busyLock.enterBusy()) {
try {
return directoryListing(fileId, false);
}
finally {
busyLock.leaveBusy();
}
}
else
throw new IllegalStateException("Failed to get directory listing because Grid is stopping: " + fileId);
}
/**
* Gets first available file info for fragmentizer.
*
* @param exclude File IDs to exclude from result.
* @return First qualified file info.
* @throws IgniteCheckedException If failed to get file for fragmentizer.
*/
public IgfsEntryInfo fileForFragmentizer(Collection<IgniteUuid> exclude) throws IgniteCheckedException {
if (busyLock.enterBusy()) {
try {
return fileForFragmentizer0(IgfsUtils.ROOT_ID, exclude);
}
finally {
busyLock.leaveBusy();
}
}
else
throw new IllegalStateException("Failed to get file for framentizer because Grid is stopping.");
}
/**
* Gets first available file info for fragmentizer.
*
* @param parentId Parent ID to scan.
* @param exclude File IDs to exclude from result.
* @return First qualified file info.
* @throws IgniteCheckedException If failed to get file for fragmentizer.
*/
private IgfsEntryInfo fileForFragmentizer0(IgniteUuid parentId, Collection<IgniteUuid> exclude)
throws IgniteCheckedException {
IgfsEntryInfo info = info(parentId);
// Check if file was concurrently deleted.
if (info == null)
return null;
assert info.isDirectory();
Map<String, IgfsListingEntry> listing = info.listing();
for (IgfsListingEntry entry : listing.values()) {
if (entry.isFile()) {
IgfsEntryInfo fileInfo = info(entry.fileId());
if (fileInfo != null) {
if (!exclude.contains(fileInfo.id()) &&
fileInfo.fileMap() != null &&
!fileInfo.fileMap().ranges().isEmpty())
return fileInfo;
}
}
else {
IgfsEntryInfo fileInfo = fileForFragmentizer0(entry.fileId(), exclude);
if (fileInfo != null)
return fileInfo;
}
}
return null;
}
/**
* List child files for specified file ID possibly skipping existing transaction.
*
* @param fileId File to list child files for.
* @param skipTx Whether to skip existing transaction.
* @return Directory listing for the specified file.
* @throws IgniteCheckedException If failed.
*/
private Map<String, IgfsListingEntry> directoryListing(IgniteUuid fileId, boolean skipTx)
throws IgniteCheckedException {
assert fileId != null;
IgfsEntryInfo info = skipTx ? id2InfoPrj.getAllOutTx(Collections.singleton(fileId)).get(fileId) :
getInfo(fileId);
return info == null ? Collections.<String, IgfsListingEntry>emptyMap() : info.listing();
}
/**
* Add file into file system structure. Do not create new transaction expecting that the one already exists.
*
* @param parentId Parent file ID.
* @param fileName File name in the parent's listing.
* @param newFileInfo File info to store in the parent's listing.
* @return File id already stored in meta cache or {@code null} if passed file info was stored.
* @throws IgniteCheckedException If failed.
*/
private IgniteUuid putIfAbsentNonTx(IgniteUuid parentId, String fileName, IgfsEntryInfo newFileInfo)
throws IgniteCheckedException {
if (log.isDebugEnabled())
log.debug("Locking parent id [parentId=" + parentId + ", fileName=" + fileName + ", newFileInfo=" +
newFileInfo + ']');
validTxState(true);
// Lock only parent file ID.
IgfsEntryInfo parentInfo = info(parentId);
if (parentInfo == null)
throw fsException(new IgfsPathNotFoundException("Failed to lock parent directory (not found): " +
parentId));
if (!parentInfo.isDirectory())
throw fsException(new IgfsPathIsNotDirectoryException("Parent file is not a directory: " + parentInfo));
IgfsListingEntry childEntry = parentInfo.listing().get(fileName);
if (childEntry != null)
return childEntry.fileId();
createNewEntry(newFileInfo, parentId, fileName);
return null;
}
/**
* Move routine.
*
* @param srcPath Source path.
* @param dstPath Destination path.
* @throws IgniteCheckedException In case of exception.
*/
public void move(IgfsPath srcPath, IgfsPath dstPath) throws IgniteCheckedException {
if (busyLock.enterBusy()) {
try {
validTxState(false);
// Prepare path IDs.
IgfsPathIds srcPathIds = pathIds(srcPath);
IgfsPathIds dstPathIds = pathIds(dstPath);
// Source path must exist.
if (!srcPathIds.allExists())
throw new IgfsPathNotFoundException("Failed to perform move because source path is not " +
"found: " + srcPath);
// At this point we need to determine the name of the resulting entry. It will be either the
// destination leaf or the source leaf depending on existence.
String dstName;
if (dstPathIds.lastExists())
// Full destination path exists -> use source name.
dstName = srcPathIds.lastPart();
else {
if (dstPathIds.lastParentExists()) {
// Destination path doesn't exist -> use destination name.
dstName = dstPathIds.lastPart();
dstPathIds = dstPathIds.parent();
}
else
// Destination parent is not found either -> exception.
throw new IgfsPathNotFoundException("Failed to perform move because destination path is not " +
"found: " + dstPath.parent());
}
// Lock participating IDs.
final Set<IgniteUuid> lockIds = new TreeSet<>(PATH_ID_SORTING_COMPARATOR);
srcPathIds.addExistingIds(lockIds, relaxed);
dstPathIds.addExistingIds(lockIds, relaxed);
try (GridNearTxLocal tx = startTx()) {
// Obtain the locks.
final Map<IgniteUuid, IgfsEntryInfo> lockInfos = lockIds(lockIds);
// Verify integrity of source and destination paths.
if (!srcPathIds.verifyIntegrity(lockInfos, relaxed))
throw new IgfsPathNotFoundException("Failed to perform move because source directory " +
"structure changed concurrently [src=" + srcPath + ", dst=" + dstPath + ']');
if (!dstPathIds.verifyIntegrity(lockInfos, relaxed))
throw new IgfsPathNotFoundException("Failed to perform move because destination directory " +
"structure changed concurrently [src=" + srcPath + ", dst=" + dstPath + ']');
// Additional check: is destination a directory?
IgfsEntryInfo dstParentInfo = lockInfos.get(dstPathIds.lastId());
if (dstParentInfo.isFile())
throw new IgfsPathAlreadyExistsException("Failed to perform move because destination points " +
"to existing file [src=" + srcPath + ", dst=" + dstPath + ']');
// Additional check: does destination already have a child with the same name?
if (dstParentInfo.hasChild(dstName))
throw new IgfsPathAlreadyExistsException("Failed to perform move because destination already " +
"contains entry with the same name existing file [src=" + srcPath +
", dst=" + dstPath + ']');
// Actual move: remove from source parent and add to destination target.
IgfsEntryInfo srcParentInfo = lockInfos.get(srcPathIds.lastParentId());
IgfsEntryInfo srcInfo = lockInfos.get(srcPathIds.lastId());
String srcName = srcPathIds.lastPart();
IgfsListingEntry srcEntry = srcParentInfo.listing().get(srcName);
transferEntry(srcEntry, srcParentInfo.id(), srcName, dstParentInfo.id(), dstName);
tx.commit();
// Fire events.
IgfsPath newPath = new IgfsPath(dstPathIds.path(), dstName);
IgfsUtils.sendEvents(igfsCtx.kernalContext(), srcPath, newPath,
srcInfo.isFile() ? EVT_IGFS_FILE_RENAMED : EVT_IGFS_DIR_RENAMED);
}
}
finally {
busyLock.leaveBusy();
}
}
else
throw new IllegalStateException("Failed to perform move because Grid is stopping [srcPath=" +
srcPath + ", dstPath=" + dstPath + ']');
}
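    /*
     * Behavior sketch (illustrative only) of the naming rule implemented above: if the full destination
     * path already exists, the source entry keeps its own name and is moved under the destination
     * directory; otherwise the last destination component becomes the new name.
     *
     *   // "/dst" exists and is a directory:
     *   move(new IgfsPath("/src/file"), new IgfsPath("/dst"));          // result: "/dst/file"
     *
     *   // "/dst" exists, "/dst/renamed" does not:
     *   move(new IgfsPath("/src/file"), new IgfsPath("/dst/renamed"));  // result: "/dst/renamed"
     */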
/**
* Move or rename file in existing transaction.
*
* @param fileId File ID to move or rename.
* @param srcFileName Original file name in the parent's listing.
* @param srcParentId Parent directory ID.
* @param destFileName New file name in the parent's listing after moving.
* @param destParentId New parent directory ID.
* @throws IgniteCheckedException If failed.
*/
private void moveNonTx(IgniteUuid fileId, String srcFileName, IgniteUuid srcParentId, String destFileName,
IgniteUuid destParentId) throws IgniteCheckedException {
validTxState(true);
assert fileId != null;
assert srcFileName != null;
assert srcParentId != null;
assert destFileName != null;
assert destParentId != null;
if (srcParentId.equals(destParentId) && srcFileName.equals(destFileName)) {
if (log.isDebugEnabled())
log.debug("File is moved to itself [fileId=" + fileId +
", fileName=" + srcFileName + ", parentId=" + srcParentId + ']');
return; // File is moved to itself.
}
// Lock file ID and parent IDs for this transaction.
Map<IgniteUuid, IgfsEntryInfo> infoMap = lockIds(srcParentId, fileId, destParentId);
IgfsEntryInfo srcInfo = infoMap.get(srcParentId);
if (srcInfo == null)
throw fsException(new IgfsPathNotFoundException("Failed to lock source directory (not found?)" +
" [srcParentId=" + srcParentId + ']'));
if (!srcInfo.isDirectory())
throw fsException(new IgfsPathIsNotDirectoryException("Source is not a directory: " + srcInfo));
IgfsEntryInfo destInfo = infoMap.get(destParentId);
if (destInfo == null)
throw fsException(new IgfsPathNotFoundException("Failed to lock destination directory (not found?)" +
" [destParentId=" + destParentId + ']'));
if (!destInfo.isDirectory())
throw fsException(new IgfsPathIsNotDirectoryException("Destination is not a directory: " + destInfo));
IgfsEntryInfo fileInfo = infoMap.get(fileId);
if (fileInfo == null)
throw fsException(new IgfsPathNotFoundException("Failed to lock target file (not found?) [fileId=" +
fileId + ']'));
IgfsListingEntry srcEntry = srcInfo.listing().get(srcFileName);
// If source file does not exist or was re-created.
if (srcEntry == null || !srcEntry.fileId().equals(fileId))
throw fsException(new IgfsPathNotFoundException("Failed to remove file name from the source directory" +
" (file not found) [fileId=" + fileId + ", srcFileName=" + srcFileName +
", srcParentId=" + srcParentId + ", srcEntry=" + srcEntry + ']'));
// If stored file already exists.
if (destInfo.hasChild(destFileName))
throw fsException(new IgfsPathAlreadyExistsException("Failed to add file name into the destination " +
"directory (file already exists) [fileId=" + fileId + ", destFileName=" + destFileName +
", destParentId=" + destParentId + ']'));
transferEntry(srcEntry, srcParentId, srcFileName, destParentId, destFileName);
}
/**
* Deletes (moves to TRASH) all elements under the root folder.
*
* @return The ID of the artificially created folder containing all former root
* elements that was moved to the TRASH folder, or {@code null} if the root was empty.
* @throws IgniteCheckedException On error.
*/
@SuppressWarnings("RedundantCast")
IgniteUuid format() throws IgniteCheckedException {
if (busyLock.enterBusy()) {
try {
validTxState(false);
IgniteUuid trashId = IgfsUtils.randomTrashId();
try (GridNearTxLocal tx = startTx()) {
// NB: We may lock root because its id is less than any other id:
final IgfsEntryInfo rootInfo = lockIds(IgfsUtils.ROOT_ID, trashId).get(IgfsUtils.ROOT_ID);
assert rootInfo != null;
Map<String, IgfsListingEntry> rootListingMap = rootInfo.listing();
assert rootListingMap != null;
if (rootListingMap.isEmpty())
return null; // Root is empty, nothing to do.
// Construct new info and move locked entries from root to it.
Map<String, IgfsListingEntry> transferListing = new HashMap<>(rootListingMap);
IgfsEntryInfo newInfo = IgfsUtils.createDirectory(
IgniteUuid.randomUuid(),
transferListing,
(Map<String, String>) null
);
createNewEntry(newInfo, trashId, newInfo.id().toString());
// Remove listing entries from root.
// Note that root directory properties and other attributes are preserved:
id2InfoPrj.put(IgfsUtils.ROOT_ID, rootInfo.listing(null));
tx.commit();
signalDeleteWorker();
return newInfo.id();
}
}
finally {
busyLock.leaveBusy();
}
}
else
throw new IllegalStateException("Failed to perform format because Grid is stopping.");
}
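    /*
     * Behavior sketch (illustrative only): format() does not purge data in place. It atomically
     * re-parents the whole root listing under a new surrogate directory inside TRASH and lets the
     * background delete worker clean it up asynchronously:
     *
     *   IgniteUuid trashEntryId = format();
     *
     *   // null     -> root was already empty, nothing was moved;
     *   // non-null -> former root content now lives under TRASH/<trashEntryId>
     *   //             and will be removed by IgfsDeleteWorker.
     */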
/**
* Whether operation must be re-tried because we have suspicious links which may break secondary file system
* consistency.
*
* @param pathIds Path IDs.
* @param lockInfos Lock infos.
* @return Whether to re-try.
*/
private static boolean isRetryForSecondary(IgfsPathIds pathIds, Map<IgniteUuid, IgfsEntryInfo> lockInfos) {
// We need to ensure that the last locked info is not linked with expected child.
// Otherwise there was some concurrent file system update and we have to re-try.
// That is, the following situation leads to a re-try:
// 1) We queried path /A/B/C
// 2) Returned IDs are ROOT_ID, A_ID, B_ID, null
// 3) But B's info contains C as a child. It means that C was created concurrently, so we re-try.
if (!pathIds.allExists()) {
// Find the last locked index
IgfsEntryInfo lastLockedInfo = null;
int lastLockedIdx = -1;
while (lastLockedIdx < pathIds.lastExistingIndex()) {
IgfsEntryInfo nextInfo = lockInfos.get(pathIds.id(lastLockedIdx + 1));
if (nextInfo != null) {
lastLockedInfo = nextInfo;
lastLockedIdx++;
}
else
break;
}
assert lastLockedIdx < pathIds.count();
if (lastLockedInfo != null) {
String part = pathIds.part(lastLockedIdx + 1);
if (lastLockedInfo.listing().containsKey(part))
return true;
}
}
return false;
}
/**
* Move path to the trash directory.
*
* @param path Path.
* @param recursive Recursive flag.
* @param secondaryFs Secondary file system (optional).
* @return ID of an entry located directly under the trash directory.
* @throws IgniteCheckedException If failed.
*/
IgfsDeleteResult softDelete(final IgfsPath path, final boolean recursive,
@Nullable IgfsSecondaryFileSystem secondaryFs) throws IgniteCheckedException {
while (true) {
if (busyLock.enterBusy()) {
try {
validTxState(false);
IgfsPathIds pathIds = pathIds(path);
if (!pathIds.allExists() && secondaryFs == null)
return new IgfsDeleteResult(false, null);
IgniteUuid victimId = pathIds.lastId();
String victimName = pathIds.lastPart();
if (IgfsUtils.isRootId(victimId))
throw new IgfsException("Cannot remove root directory");
// Prepare IDs to lock.
SortedSet<IgniteUuid> allIds = new TreeSet<>(PATH_ID_SORTING_COMPARATOR);
pathIds.addExistingIds(allIds, relaxed);
IgniteUuid trashId = IgfsUtils.randomTrashId();
allIds.add(trashId);
try (GridNearTxLocal tx = startTx()) {
// Lock participants.
Map<IgniteUuid, IgfsEntryInfo> lockInfos = lockIds(allIds);
if (secondaryFs != null && isRetryForSecondary(pathIds, lockInfos))
continue;
// Ensure that all participants are still in place.
if (!pathIds.allExists() || !pathIds.verifyIntegrity(lockInfos, relaxed)) {
// For DUAL mode we will still try to update the underlying FS. Note we do that inside the TX.
if (secondaryFs != null) {
boolean res = secondaryFs.delete(path, recursive);
return new IgfsDeleteResult(res, null);
}
else
return new IgfsDeleteResult(false, null);
}
IgfsEntryInfo victimInfo = lockInfos.get(victimId);
// Cannot delete non-empty directory if recursive flag is not set.
if (!recursive && victimInfo.hasChildren())
throw new IgfsDirectoryNotEmptyException("Failed to remove directory (directory is not " +
"empty and recursive flag is not set).");
// Prepare trash data.
IgfsEntryInfo trashInfo = lockInfos.get(trashId);
final String trashName = IgfsUtils.composeNameForTrash(path, victimId);
assert !trashInfo.hasChild(trashName) : "Failed to add file name into the " +
"destination directory (file already exists) [destName=" + trashName + ']';
IgniteUuid parentId = pathIds.lastParentId();
IgfsEntryInfo parentInfo = lockInfos.get(parentId);
// Propagate call to the secondary file system.
if (secondaryFs != null && !secondaryFs.delete(path, recursive))
return new IgfsDeleteResult(false, null);
transferEntry(parentInfo.listing().get(victimName), parentId, victimName, trashId, trashName);
tx.commit();
signalDeleteWorker();
return new IgfsDeleteResult(true, victimInfo);
}
}
finally {
busyLock.leaveBusy();
}
}
else
throw new IllegalStateException("Failed to perform soft delete because Grid is " +
"stopping [path=" + path + ']');
}
}
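    /*
     * Behavior sketch (illustrative only) of a soft delete: the victim entry is re-parented under a
     * randomly chosen trash directory with a name derived from its path and ID, and the actual cleanup
     * is delegated to the delete worker signalled after the transaction commits:
     *
     *   IgfsDeleteResult res = softDelete(new IgfsPath("/a/b"), true, null); // no secondary FS
     *
     *   // The flag inside the result tells whether anything was removed; the "/a/b" subtree itself
     *   // is purged from TRASH asynchronously by IgfsDeleteWorker.
     */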
/**
* Remove listing entries of the given parent.
* This operation actually deletes directories from TRASH and is used solely by IgfsDeleteWorker.
*
* @param parentId Parent ID.
* @param listing Listing entries.
* @return Collection of really deleted entries.
* @throws IgniteCheckedException If failed.
*/
Collection<IgniteUuid> delete(IgniteUuid parentId, Map<String, IgfsListingEntry> listing)
throws IgniteCheckedException {
if (busyLock.enterBusy()) {
try {
assert parentId != null;
assert listing != null;
validTxState(false);
try (GridNearTxLocal tx = startTx()) {
Collection<IgniteUuid> res = new HashSet<>();
// Obtain all necessary locks in one hop.
IgniteUuid[] allIds = new IgniteUuid[listing.size() + 1];
allIds[0] = parentId;
int i = 1;
for (IgfsListingEntry childEntry : listing.values())
allIds[i++] = childEntry.fileId();
Map<IgniteUuid, IgfsEntryInfo> locks = lockIds(allIds);
IgfsEntryInfo parentInfo = locks.get(parentId);
// Ensure parent is still in place.
if (parentInfo != null) {
Map<String, IgfsListingEntry> parentListing = parentInfo.listing();
Map<String, IgfsListingEntry> newListing = new HashMap<>(parentListing.size(), 1.0f);
newListing.putAll(parentListing);
// Remove child entries if possible.
for (Map.Entry<String, IgfsListingEntry> entry : listing.entrySet()) {
String childName = entry.getKey();
IgniteUuid childId = entry.getValue().fileId();
IgfsEntryInfo entryInfo = locks.get(childId);
if (entryInfo != null) {
// File must be locked for deletion:
assert entryInfo.isDirectory() || IgfsUtils.DELETE_LOCK_ID.equals(entryInfo.lockId());
// Delete only files or empty folders.
if (!entryInfo.hasChildren()) {
id2InfoPrj.remove(childId);
newListing.remove(childName);
res.add(childId);
}
}
else {
// Entry was deleted concurrently.
newListing.remove(childName);
res.add(childId);
}
}
// Update parent listing.
id2InfoPrj.put(parentId, parentInfo.listing(newListing));
}
tx.commit();
return res;
}
}
finally {
busyLock.leaveBusy();
}
}
else
throw new IllegalStateException("Failed to perform delete because Grid is stopping [parentId=" +
parentId + ", listing=" + listing + ']');
}
/**
* Remove entry from the metadata listing.
* Used solely by IgfsDeleteWorker.
*
* @param parentId Parent ID.
* @param name Name.
* @param id ID.
* @return {@code True} in case the entry really was removed from the cache by this call.
* @throws IgniteCheckedException If failed.
*/
boolean delete(IgniteUuid parentId, String name, IgniteUuid id) throws IgniteCheckedException {
if (busyLock.enterBusy()) {
try {
validTxState(false);
try (GridNearTxLocal tx = startTx()) {
Map<IgniteUuid, IgfsEntryInfo> infos = lockIds(parentId, id);
IgfsEntryInfo victim = infos.get(id);
if (victim == null)
return false;
assert victim.isDirectory() || IgfsUtils.DELETE_LOCK_ID.equals(victim.lockId()) :
" isDir: " + victim.isDirectory() + ", lockId: " + victim.lockId();
// Proceed only in case both parent and child exist.
if (infos.containsKey(parentId) && infos.containsKey(id)) {
IgfsEntryInfo parentInfo = infos.get(parentId);
assert parentInfo != null;
IgfsListingEntry childEntry = parentInfo.listing().get(name);
if (childEntry != null)
id2InfoPrj.invoke(parentId, new IgfsMetaDirectoryListingRemoveProcessor(name, id));
id2InfoPrj.remove(id);
tx.commit();
return true;
}
return false;
}
}
finally {
busyLock.leaveBusy();
}
}
else
throw new IllegalStateException("Failed to perform delete because Grid is stopping [parentId=" +
parentId + ", name=" + name + ", id=" + id + ']');
}
/**
* Update file info (file properties) in cache in existing transaction.
*
* @param fileId File ID to update information for.
* @param props Properties to set for the file.
* @return Updated file info or {@code null} if such file ID not found.
* @throws IgniteCheckedException If operation failed.
*/
@Nullable private IgfsEntryInfo updatePropertiesNonTx(final IgniteUuid fileId, Map<String, String> props)
throws IgniteCheckedException {
assert fileId != null;
assert !F.isEmpty(props) : "Expects non-empty file properties";
validTxState(true);
if (log.isDebugEnabled())
log.debug("Update file properties [fileId=" + fileId + ", props=" + props + ']');
try {
final IgfsEntryInfo oldInfo = info(fileId);
if (oldInfo == null)
return null;
return invokeAndGet(fileId, new IgfsMetaUpdatePropertiesProcessor(props));
}
catch (GridClosureException e) {
throw U.cast(e);
}
}
/**
* Update file info (file properties) in cache.
*
* @param fileId File ID to update information for.
* @param props Properties to set for the file.
* @return Updated file info or {@code null} if such file ID not found.
* @throws IgniteCheckedException If operation failed.
*/
@Nullable public IgfsEntryInfo updateProperties(IgniteUuid fileId, Map<String, String> props)
throws IgniteCheckedException {
if (busyLock.enterBusy()) {
try {
validTxState(false);
try (GridNearTxLocal tx = startTx()) {
IgfsEntryInfo info = updatePropertiesNonTx(fileId, props);
tx.commit();
return info;
}
}
finally {
busyLock.leaveBusy();
}
}
else
throw new IllegalStateException("Failed to update properties because Grid is stopping [fileId=" + fileId +
", props=" + props + ']');
}
/**
* Reserve space for file.
*
* @param fileId File ID.
* @param space Space.
* @param affRange Affinity range.
* @return New file info.
*/
public IgfsEntryInfo reserveSpace(IgniteUuid fileId, long space, IgfsFileAffinityRange affRange)
throws IgniteCheckedException {
validTxState(false);
if (busyLock.enterBusy()) {
try {
if (log.isDebugEnabled())
log.debug("Reserve file space: " + fileId);
try (GridNearTxLocal tx = startTx()) {
// Lock file ID for this transaction.
IgfsEntryInfo oldInfo = info(fileId);
if (oldInfo == null)
throw fsException("File has been deleted concurrently: " + fileId);
IgfsEntryInfo newInfo =
invokeAndGet(fileId, new IgfsMetaFileReserveSpaceProcessor(space, affRange));
tx.commit();
return newInfo;
}
catch (GridClosureException e) {
throw U.cast(e);
}
}
finally {
busyLock.leaveBusy();
}
}
else
throw new IllegalStateException("Failed to reserve file space because Grid is stopping:" + fileId);
}
/**
* Update file info in cache.
*
* @param fileId File ID to update information for.
* @param proc Entry processor to invoke.
* @return Updated file info or {@code null} if such file ID not found.
* @throws IgniteCheckedException If failed.
*/
@Nullable public IgfsEntryInfo updateInfo(IgniteUuid fileId,
EntryProcessor<IgniteUuid, IgfsEntryInfo, IgfsEntryInfo> proc) throws IgniteCheckedException {
validTxState(false);
assert fileId != null;
assert proc != null;
if (busyLock.enterBusy()) {
try {
if (log.isDebugEnabled())
log.debug("Update file info [fileId=" + fileId + ", proc=" + proc + ']');
try (GridNearTxLocal tx = startTx()) {
// Lock file ID for this transaction.
IgfsEntryInfo oldInfo = info(fileId);
if (oldInfo == null)
return null; // File not found.
IgfsEntryInfo newInfo = invokeAndGet(fileId, proc);
if (newInfo == null)
throw fsException("Failed to update file info with null value" +
" [oldInfo=" + oldInfo + ", newInfo=null, proc=" + proc + ']');
if (!oldInfo.id().equals(newInfo.id()))
throw fsException("Failed to update file info (file IDs differ)" +
" [oldInfo=" + oldInfo + ", newInfo=" + newInfo + ", proc=" + proc + ']');
if (oldInfo.isDirectory() != newInfo.isDirectory())
throw fsException("Failed to update file info (file types differ)" +
" [oldInfo=" + oldInfo + ", newInfo=" + newInfo + ", proc=" + proc + ']');
tx.commit();
return newInfo;
}
catch (GridClosureException e) {
throw U.cast(e);
}
}
finally {
busyLock.leaveBusy();
}
}
else
throw new IllegalStateException("Failed to update file system entry info because Grid is stopping: " +
fileId);
}
/**
* Mkdirs implementation.
*
* @param path The path to create.
* @param props The properties to use for created directories.
* @return True if a directory was created during the operation.
* @throws IgniteCheckedException If a non-directory file exists on the requested path, or in case of other errors.
*/
boolean mkdirs(final IgfsPath path, final Map<String, String> props) throws IgniteCheckedException {
validTxState(false);
while (true) {
if (busyLock.enterBusy()) {
try {
// Prepare path IDs.
IgfsPathIds pathIds = pathIds(path);
// Prepare lock IDs. Essentially, they consist of two parts: existing IDs and potential new IDs.
Set<IgniteUuid> lockIds = new TreeSet<>(PATH_ID_SORTING_COMPARATOR);
pathIds.addExistingIds(lockIds, relaxed);
pathIds.addSurrogateIds(lockIds);
// Start TX.
try (GridNearTxLocal tx = startTx()) {
final Map<IgniteUuid, IgfsEntryInfo> lockInfos = lockIds(lockIds);
if (!pathIds.verifyIntegrity(lockInfos, relaxed))
// Directory structure changed concurrently. So we simply re-try.
continue;
// Check if the whole structure is already in place.
if (pathIds.allExists()) {
if (lockInfos.get(pathIds.lastExistingId()).isDirectory())
return false;
else
throw new IgfsParentNotDirectoryException("Failed to create directory (parent " +
"element is not a directory)");
}
IgfsPathsCreateResult res = createDirectory(pathIds, lockInfos, props);
if (res == null)
continue;
// Commit TX.
tx.commit();
generateCreateEvents(res.createdPaths(), false);
// We are done.
return true;
}
}
finally {
busyLock.leaveBusy();
}
}
else
throw new IllegalStateException("Failed to mkdir because Grid is stopping. [path=" + path + ']');
}
}
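    /*
     * Usage sketch (illustrative only): mkdirs() is idempotent for directories and only reports whether
     * this call created anything; a plain file on the path is an error:
     *
     *   boolean created = mkdirs(new IgfsPath("/a/b/c"), Collections.<String, String>emptyMap());
     *
     *   // true  -> at least one directory along "/a/b/c" was created by this call;
     *   // false -> the whole chain already existed as directories;
     *   // IgfsParentNotDirectoryException -> an existing component of the path is not a directory.
     */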
/**
* Set sampling flag.
*
* @param val Sampling flag state or {@code null} to clear sampling state and mark it as "not set".
* @return {@code True} if sampling mode was actually changed by this call.
* @throws IgniteCheckedException If failed.
*/
public boolean sampling(Boolean val) throws IgniteCheckedException {
if (busyLock.enterBusy()) {
try {
validTxState(false);
try (GridNearTxLocal tx = startTx()) {
Object prev = val != null ? metaCache.getAndPut(sampling, val) : metaCache.getAndRemove(sampling);
tx.commit();
return !F.eq(prev, val);
}
}
finally {
busyLock.leaveBusy();
}
}
else
throw new IllegalStateException("Failed to set sampling flag because Grid is stopping.");
}
/**
* Get sampling flag state.
*
* @return {@code True} in case sampling is enabled, {@code false} otherwise or {@code null} in case sampling
* is not set.
* @throws IgniteCheckedException If failed.
*/
public Boolean sampling() throws IgniteCheckedException {
if (busyLock.enterBusy()) {
try {
validTxState(false);
Object val = metaCache.get(sampling);
return (val == null || !(val instanceof Boolean)) ? null : (Boolean)val;
}
finally {
busyLock.leaveBusy();
}
}
else
throw new IllegalStateException("Failed to get sampling flag because Grid is stopping.");
}
/**
* Put new entry to meta cache immediately linking it to parent.
*
* @param info Info to put.
* @param parentId Parent ID.
* @param name Name in parent.
* @throws IgniteCheckedException If failed.
*/
private void createNewEntry(IgfsEntryInfo info, IgniteUuid parentId, String name) throws IgniteCheckedException {
validTxState(true);
if (!id2InfoPrj.putIfAbsent(info.id(), info))
throw fsException("Failed to create new metadata entry due to ID conflict: " + info.id());
if (parentId != null)
id2InfoPrj.invoke(parentId, new IgfsMetaDirectoryListingAddProcessor(name, new IgfsListingEntry(info)));
}
/**
* Transfer entry from one directory to another.
*
* @param entry Entry to be transferred.
* @param srcId Source ID.
* @param srcName Source name.
* @param destId Destination ID.
* @param destName Destination name.
* @throws IgniteCheckedException If failed.
*/
private void transferEntry(IgfsListingEntry entry, IgniteUuid srcId, String srcName,
IgniteUuid destId, String destName) throws IgniteCheckedException {
validTxState(true);
if (F.eq(srcId, destId))
id2InfoPrj.invoke(srcId, new IgfsMetaDirectoryListingRenameProcessor(srcName, destName));
else {
Map<IgniteUuid, EntryProcessor<IgniteUuid, IgfsEntryInfo, Void>> procMap = new HashMap<>();
procMap.put(srcId, new IgfsMetaDirectoryListingRemoveProcessor(srcName, entry.fileId()));
procMap.put(destId, new IgfsMetaDirectoryListingAddProcessor(destName, entry));
id2InfoPrj.invokeAll(procMap);
}
}
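    /*
     * Internal behavior sketch (illustrative only): a rename within one directory is a single
     * listing-rename invoke, while a move between directories is an atomic invokeAll() over both parent
     * listings:
     *
     *   // Same parent -> rename in place:
     *   transferEntry(entry, dirId, "old.txt", dirId, "new.txt");
     *
     *   // Different parents -> remove from the source listing and add to the destination listing:
     *   transferEntry(entry, srcDirId, "file.txt", dstDirId, "file.txt");
     */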
/**
* Invoke lock processor.
*
* @param id File ID.
* @param del Whether lock is taken for delete.
* @return Resulting file info.
* @throws IgniteCheckedException If failed.
*/
private IgfsEntryInfo invokeLock(IgniteUuid id, boolean del) throws IgniteCheckedException {
return invokeAndGet(id, new IgfsMetaFileLockProcessor(createFileLockId(del)));
}
/**
* Invoke some processor and return new value.
*
* @param id ID.
* @param proc Processor.
* @return New file info.
* @throws IgniteCheckedException If failed.
*/
private IgfsEntryInfo invokeAndGet(IgniteUuid id, EntryProcessor<IgniteUuid, IgfsEntryInfo, IgfsEntryInfo> proc)
throws IgniteCheckedException {
validTxState(true);
EntryProcessorResult<IgfsEntryInfo> res = id2InfoPrj.invoke(id, proc);
assert res != null;
return res.get();
}
/**
* Get info.
*
* @param id ID.
* @return Info.
* @throws IgniteCheckedException If failed.
*/
@Nullable private IgfsEntryInfo getInfo(IgniteUuid id) throws IgniteCheckedException {
return id2InfoPrj.get(id);
}
/**
* Get several infos.
*
* @param ids IDs.
* @return Infos map.
* @throws IgniteCheckedException If failed.
*/
private Map<IgniteUuid, IgfsEntryInfo> getInfos(Collection<IgniteUuid> ids) throws IgniteCheckedException {
return id2InfoPrj.getAll(ids);
}
/**
* A delegate method that performs file creation in the synchronization task.
*
* @param fs File system.
* @param path Path.
* @param simpleCreate "Simple create" flag.
* @param props Properties.
* @param overwrite Overwrite flag.
* @param bufSize Buffer size.
* @param replication Replication factor.
* @param blockSize Block size.
* @param affKey Affinity key.
* @param infos Map from paths to corresponding infos.
* @param pendingEvts A non-null collection in which the events are to be accumulated.
* @param t1 A single-object tuple to hold the created output stream.
* @return Output stream descriptor.
* @throws Exception On error.
*/
IgfsCreateResult onSuccessCreate(IgfsSecondaryFileSystem fs, IgfsPath path,
boolean simpleCreate, @Nullable final Map<String, String> props, boolean overwrite,
int bufSize, short replication, long blockSize, IgniteUuid affKey, Map<IgfsPath, IgfsEntryInfo> infos,
final Deque<IgfsEvent> pendingEvts, final T1<OutputStream> t1) throws Exception {
validTxState(true);
assert !infos.isEmpty();
// Determine the first existing parent.
IgfsPath parentPath = null;
for (IgfsPath curPath : infos.keySet()) {
if (parentPath == null || curPath.isSubDirectoryOf(parentPath))
parentPath = curPath;
}
assert parentPath != null;
IgfsEntryInfo parentInfo = infos.get(parentPath);
// Delegate to the secondary file system.
OutputStream out = simpleCreate ? fs.create(path, overwrite) :
fs.create(path, bufSize, overwrite, replication, blockSize, props);
t1.set(out);
IgfsPath parent0 = path.parent();
assert parent0 != null : "path.parent() is null (are we creating ROOT?): " + path;
// If some of the parent directories were missing, synchronize again.
if (!parentPath.equals(parent0)) {
parentInfo = synchronize(fs, parentPath, parentInfo, parent0, true, null);
// Fire notification about missing directories creation.
if (evts.isRecordable(EventType.EVT_IGFS_DIR_CREATED)) {
IgfsPath evtPath = parent0;
while (!parentPath.equals(evtPath)) {
pendingEvts.addFirst(new IgfsEvent(evtPath, locNode,
EventType.EVT_IGFS_DIR_CREATED));
evtPath = evtPath.parent();
assert evtPath != null; // If this fails, then ROOT does not exist.
}
}
}
// Get created file info.
IgfsFile status = fs.info(path);
if (status == null)
throw fsException("Failed to open output stream to the file created in " +
"the secondary file system because it no longer exists: " + path);
else if (status.isDirectory())
throw fsException("Failed to open output stream to the file created in " +
"the secondary file system because the path points to a directory: " + path);
IgfsEntryInfo newInfo = IgfsUtils.createFile(
IgniteUuid.randomUuid(),
igfsCtx.configuration().getBlockSize(),
status.length(),
affKey,
createFileLockId(false),
igfsCtx.igfs().evictExclude(path, false),
status.properties(),
status.accessTime(),
status.modificationTime()
);
// Add new file info to the listing optionally removing the previous one.
assert parentInfo != null;
IgniteUuid oldId = putIfAbsentNonTx(parentInfo.id(), path.name(), newInfo);
if (oldId != null) {
IgfsEntryInfo oldInfo = info(oldId);
assert oldInfo != null; // Otherwise cache is in inconsistent state.
// The contract is that we cannot overwrite a file locked for writing:
if (oldInfo.lockId() != null)
throw fsException("Failed to overwrite file (file is opened for writing) [path=" +
path + ", fileId=" + oldId + ", lockId=" + oldInfo.lockId() + ']');
id2InfoPrj.remove(oldId); // Remove the old one.
id2InfoPrj.invoke(parentInfo.id(), new IgfsMetaDirectoryListingRemoveProcessor(
path.name(), parentInfo.listing().get(path.name()).fileId()));
createNewEntry(newInfo, parentInfo.id(), path.name()); // Put new one.
igfsCtx.data().delete(oldInfo);
}
// Record CREATE event if needed.
if (oldId == null && evts.isRecordable(EventType.EVT_IGFS_FILE_CREATED))
pendingEvts.add(new IgfsEvent(path, locNode, EventType.EVT_IGFS_FILE_CREATED));
return new IgfsCreateResult(newInfo, out);
}
/**
* Append to a file in DUAL mode.
*
* @param fs File system.
* @param path Path.
* @param bufSize Buffer size.
* @param create Create flag.
* @return Output stream descriptor.
* @throws IgniteCheckedException If output stream open for append has failed.
*/
public IgfsCreateResult appendDual(final IgfsSecondaryFileSystem fs, final IgfsPath path,
final int bufSize, final boolean create) throws IgniteCheckedException {
if (busyLock.enterBusy()) {
try {
assert fs != null;
assert path != null;
// Events to fire (can be done outside of a transaction).
final Deque<IgfsEvent> pendingEvts = new LinkedList<>();
SynchronizationTask<IgfsCreateResult> task =
new SynchronizationTask<IgfsCreateResult>() {
/** Container for the secondary file system output stream. */
private final T1<OutputStream> outT1 = new T1<>(null);
@Override public IgfsCreateResult onSuccess(Map<IgfsPath,
IgfsEntryInfo> infos) throws Exception {
validTxState(true);
final IgfsEntryInfo info = infos.get(path);
final IgfsEntryInfo lockedInfo;
if (info == null)
return onSuccessCreate(fs, path, true/*simpleCreate*/, null,
false/*overwrite*/, bufSize, (short)0, 0, null, infos, pendingEvts, outT1);
else {
if (info.isDirectory())
throw fsException("Failed to open output stream to the file in the " +
"secondary file system because the path points to a directory: " + path);
outT1.set(fs.append(path, bufSize, false, null));
// Synchronize file ending.
long len = info.length();
int blockSize = info.blockSize();
int remainder = (int) (len % blockSize);
if (remainder > 0) {
int blockIdx = (int) (len / blockSize);
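// Fetch the last incomplete block through the secondary reader and wait until it is available locally.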
try (IgfsSecondaryFileSystemPositionedReadable reader = fs.open(path, bufSize)) {
IgniteInternalFuture<byte[]> fut =
igfsCtx.data().dataBlock(info, path, blockIdx, reader);
assert fut != null;
fut.get();
}
}
if (info.lockId() != null) {
throw fsException("Failed to open file (file is opened for writing) [path=" +
path + ", fileId=" + info.id() + ", lockId=" + info.lockId() + ']');
}
// Set lock and return.
lockedInfo = invokeLock(info.id(), false);
}
if (evts.isRecordable(EventType.EVT_IGFS_FILE_OPENED_WRITE))
pendingEvts.add(new IgfsEvent(path, locNode, EventType.EVT_IGFS_FILE_OPENED_WRITE));
return new IgfsCreateResult(lockedInfo, outT1.get());
}
@Override public IgfsCreateResult onFailure(@Nullable Exception err)
throws IgniteCheckedException {
U.closeQuiet(outT1.get());
U.error(log, "File append in DUAL mode failed [path=" + path + ", bufferSize=" + bufSize +
']', err);
throw new IgniteCheckedException("Failed to append to the file due to secondary file " +
"system exception: " + path, err);
}
};
try {
return synchronizeAndExecute(task, fs, !create/*strict*/, path);
}
finally {
for (IgfsEvent evt : pendingEvts)
evts.record(evt);
}
}
finally {
busyLock.leaveBusy();
}
}
else
throw new IllegalStateException("Failed to append to file in DUAL mode because Grid is stopping: " + path);
}
/**
* Get info for the given path.
*
* @param path Path.
* @return Info.
* @throws IgniteCheckedException If failed.
*/
@Nullable public IgfsEntryInfo infoForPath(IgfsPath path) throws IgniteCheckedException {
return client ? runClientTask(new IgfsClientMetaInfoForPathCallable(cfg.getName(),
IgfsUserContext.currentUser(), path)) : info(fileId(path));
}
/**
* Get IDs for the given path.
*
* @param path Path.
* @return IDs.
* @throws IgniteCheckedException If failed.
*/
public List<IgniteUuid> idsForPath(IgfsPath path) throws IgniteCheckedException {
return client ? runClientTask(new IgfsClientMetaIdsForPathCallable(cfg.getName(),
IgfsUserContext.currentUser(), path)) : fileIds(path);
}
/**
* Open file in DUAL mode.
*
* @param fs Secondary file system.
* @param path Path to open.
* @param bufSize Buffer size.
* @return Input stream descriptor.
* @throws IgniteCheckedException If input stream open has failed.
*/
public IgfsSecondaryInputStreamDescriptor openDual(final IgfsSecondaryFileSystem fs, final IgfsPath path,
final int bufSize) throws IgniteCheckedException {
if (busyLock.enterBusy()) {
try {
assert fs != null;
assert path != null;
// First, try getting file info without any transactions and synchronization.
IgfsEntryInfo info = infoForPath(path);
if (info != null) {
if (!info.isFile())
throw fsException(new IgfsPathIsDirectoryException("Failed to open file (not a file): " +
path));
return new IgfsSecondaryInputStreamDescriptor(info, lazySecondaryReader(fs, path, bufSize));
}
// If failed, try synchronize.
SynchronizationTask<IgfsSecondaryInputStreamDescriptor> task =
new SynchronizationTask<IgfsSecondaryInputStreamDescriptor>() {
@Override public IgfsSecondaryInputStreamDescriptor onSuccess(
Map<IgfsPath, IgfsEntryInfo> infos) throws Exception {
IgfsEntryInfo info = infos.get(path);
if (info == null)
throw fsException(new IgfsPathNotFoundException("File not found: " + path));
if (!info.isFile())
throw fsException(new IgfsPathIsDirectoryException("Failed to open file " +
"(not a file): " + path));
return new IgfsSecondaryInputStreamDescriptor(infos.get(path),
lazySecondaryReader(fs, path, bufSize));
}
@Override public IgfsSecondaryInputStreamDescriptor onFailure(@Nullable Exception err)
throws IgniteCheckedException {
U.error(log, "File open in DUAL mode failed [path=" + path + ", bufferSize=" + bufSize +
']', err);
throw new IgniteCheckedException("Failed to open the path due to secondary file system " +
"exception: " + path, err);
}
};
return synchronizeAndExecute(task, fs, false, path);
}
finally {
busyLock.leaveBusy();
}
}
else
throw new IllegalStateException("Failed to open file in DUAL mode because Grid is stopping: " + path);
}
/**
* Create lazy secondary file system reader.
*
* @param fs File system.
* @param path Path.
* @param bufSize Buffer size.
* @return Lazy reader.
*/
private static IgfsLazySecondaryFileSystemPositionedReadable lazySecondaryReader(IgfsSecondaryFileSystem fs,
IgfsPath path, int bufSize) {
return new IgfsLazySecondaryFileSystemPositionedReadable(fs, path, bufSize);
}
/**
* Synchronizes with secondary file system.
*
* @param fs File system.
* @param path Path.
* @return File info or {@code null} if file not found.
* @throws IgniteCheckedException If sync task failed.
*/
@Nullable public IgfsEntryInfo synchronizeFileDual(final IgfsSecondaryFileSystem fs, final IgfsPath path)
throws IgniteCheckedException {
assert fs != null;
assert path != null;
if (busyLock.enterBusy()) {
try {
// First, try getting file info without any transactions and synchronization.
IgfsEntryInfo info = infoForPath(path);
if (info != null)
return info;
// If failed, try synchronize.
SynchronizationTask<IgfsEntryInfo> task =
new SynchronizationTask<IgfsEntryInfo>() {
@Override public IgfsEntryInfo onSuccess(Map<IgfsPath, IgfsEntryInfo> infos)
throws Exception {
return infos.get(path);
}
@Override public IgfsEntryInfo onFailure(@Nullable Exception err) throws IgniteCheckedException {
throw new IgniteCheckedException("Failed to synchronize path due to secondary file " +
"system exception: " + path, err);
}
};
return synchronizeAndExecute(task, fs, false, path);
}
finally {
busyLock.leaveBusy();
}
}
else
throw new IllegalStateException("Failed to synchronize file because Grid is stopping: " + path);
}
/**
* Create directory in DUAL mode.
*
* @param fs Secondary file system.
* @param path Path to create.
* @param props Properties to be applied.
* @return {@code True} in case directory creation was successful.
* @throws IgniteCheckedException If directory creation failed.
*/
public boolean mkdirsDual(final IgfsSecondaryFileSystem fs, final IgfsPath path, final Map<String, String> props)
throws IgniteCheckedException {
if (busyLock.enterBusy()) {
try {
assert fs != null;
assert path != null;
if (path.parent() == null)
return true; // No additional handling for root directory is needed.
// Events to fire (can be done outside of a transaction).
final Deque<IgfsEvent> pendingEvts = new LinkedList<>();
SynchronizationTask<Boolean> task = new SynchronizationTask<Boolean>() {
@Override public Boolean onSuccess(Map<IgfsPath, IgfsEntryInfo> infos) throws Exception {
fs.mkdirs(path, props);
assert !infos.isEmpty();
// Now perform synchronization again starting with the last created parent.
IgfsPath parentPath = null;
for (IgfsPath curPath : infos.keySet()) {
if (parentPath == null || curPath.isSubDirectoryOf(parentPath))
parentPath = curPath;
}
assert parentPath != null;
IgfsEntryInfo parentPathInfo = infos.get(parentPath);
synchronize(fs, parentPath, parentPathInfo, path, true, null);
if (evts.isRecordable(EventType.EVT_IGFS_DIR_CREATED)) {
IgfsPath evtPath = path;
while (!parentPath.equals(evtPath)) {
pendingEvts.addFirst(new IgfsEvent(evtPath, locNode, EventType.EVT_IGFS_DIR_CREATED));
evtPath = evtPath.parent();
assert evtPath != null; // If this fails, then ROOT does not exist.
}
}
return true;
}
@Override public Boolean onFailure(@Nullable Exception err) throws IgniteCheckedException {
U.error(log, "Directory creation in DUAL mode failed [path=" + path + ", properties=" + props +
']', err);
throw new IgniteCheckedException("Failed to create the path due to secondary file system " +
"exception: " + path, err);
}
};
try {
return synchronizeAndExecute(task, fs, false, path.parent());
}
finally {
for (IgfsEvent evt : pendingEvts)
evts.record(evt);
}
}
finally {
busyLock.leaveBusy();
}
}
else
throw new IllegalStateException("Failed to create directory in DUAL mode because Grid is stopping: " +
path);
}
/**
* Rename path in DUAL mode.
*
* @param fs Secondary file system.
* @param src Source path.
* @param dest Destination path.
* @return Operation result.
* @throws IgniteCheckedException If failed.
*/
public boolean renameDual(final IgfsSecondaryFileSystem fs, final IgfsPath src, final IgfsPath dest) throws
IgniteCheckedException {
if (busyLock.enterBusy()) {
try {
assert fs != null;
assert src != null;
assert dest != null;
if (src.parent() == null)
return false; // Root directory cannot be renamed.
// Events to fire (can be done outside of a transaction).
final Collection<IgfsEvent> pendingEvts = new LinkedList<>();
SynchronizationTask<Boolean> task = new SynchronizationTask<Boolean>() {
@Override public Boolean onSuccess(Map<IgfsPath, IgfsEntryInfo> infos) throws Exception {
IgfsEntryInfo srcInfo = infos.get(src);
IgfsEntryInfo srcParentInfo = infos.get(src.parent());
IgfsEntryInfo destInfo = infos.get(dest);
IgfsEntryInfo destParentInfo = dest.parent() != null ? infos.get(dest.parent()) : null;
// Source path and destination (or destination parent) must exist.
if (srcInfo == null)
throw fsException(new IgfsPathNotFoundException("Failed to rename " +
"(source path not found): " + src));
if (destInfo == null && destParentInfo == null)
throw fsException(new IgfsPathNotFoundException("Failed to rename " +
"(destination path not found): " + dest));
// Delegate to the secondary file system.
fs.rename(src, dest);
// Rename was successful, perform compensation in the local file system.
if (destInfo == null)
moveNonTx(srcInfo.id(), src.name(), srcParentInfo.id(), dest.name(), destParentInfo.id());
else {
// Move.
if (destInfo.isFile())
throw fsException("Failed to rename the path in the local file system " +
"because destination path already exists and it is a file: " + dest);
else
moveNonTx(srcInfo.id(), src.name(), srcParentInfo.id(), src.name(), destInfo.id());
}
// Record event if needed.
if (srcInfo.isFile()) {
if (evts.isRecordable(EventType.EVT_IGFS_FILE_RENAMED))
pendingEvts.add(new IgfsEvent(
src,
destInfo == null ? dest : new IgfsPath(dest, src.name()),
locNode,
EventType.EVT_IGFS_FILE_RENAMED));
}
else if (evts.isRecordable(EventType.EVT_IGFS_DIR_RENAMED))
pendingEvts.add(new IgfsEvent(src, dest, locNode, EventType.EVT_IGFS_DIR_RENAMED));
return true;
}
@Override public Boolean onFailure(@Nullable Exception err) throws IgniteCheckedException {
U.error(log, "Path rename in DUAL mode failed [source=" + src + ", destination=" + dest + ']',
err);
throw new IgniteCheckedException("Failed to rename the path due to secondary file system " +
"exception: " + src, err);
}
};
try {
return synchronizeAndExecute(task, fs, false, src, dest);
}
finally {
for (IgfsEvent evt : pendingEvts)
evts.record(evt);
}
}
finally {
busyLock.leaveBusy();
}
}
else
throw new IllegalStateException("Failed to rename in DUAL mode because Grid is stopping [src=" + src +
", dest=" + dest + ']');
}
/**
* Update path in DUAL mode.
*
* @param fs Secondary file system.
* @param path Path to update.
* @param props Properties to be applied.
* @return Updated file info.
* @throws IgniteCheckedException If update failed.
*/
public IgfsEntryInfo updateDual(final IgfsSecondaryFileSystem fs, final IgfsPath path,
final Map<String, String> props) throws IgniteCheckedException {
assert fs != null;
assert path != null;
assert props != null && !props.isEmpty();
if (busyLock.enterBusy()) {
try {
SynchronizationTask<IgfsEntryInfo> task = new SynchronizationTask<IgfsEntryInfo>() {
@Override public IgfsEntryInfo onSuccess(Map<IgfsPath, IgfsEntryInfo> infos) throws Exception {
if (infos.get(path) == null)
return null;
fs.update(path, props);
return updatePropertiesNonTx(infos.get(path).id(), props);
}
@Override public IgfsEntryInfo onFailure(@Nullable Exception err) throws IgniteCheckedException {
U.error(log, "Path update in DUAL mode failed [path=" + path + ", properties=" + props + ']',
err);
throw new IgniteCheckedException("Failed to update the path due to secondary file system " +
"exception: " + path, err);
}
};
return synchronizeAndExecute(task, fs, false, path);
}
finally {
busyLock.leaveBusy();
}
}
else
throw new IllegalStateException("Failed to update in DUAL mode because Grid is stopping: " + path);
}
/**
* Synchronize directory structure with the secondary file system.
*
* @param fs Secondary file system.
* @param startPath Start path.
* @param startPathInfo Start path info.
* @param endPath End path.
* @param strict Whether all paths must exist in the secondary file system.
* @param created Optional map where data about all created values is put.
* @return File info of the end path.
* @throws IgniteCheckedException If failed.
*/
private IgfsEntryInfo synchronize(IgfsSecondaryFileSystem fs,
IgfsPath startPath,
IgfsEntryInfo startPathInfo,
IgfsPath endPath,
boolean strict,
@Nullable Map<IgfsPath, IgfsEntryInfo> created)
throws IgniteCheckedException
{
assert fs != null;
assert startPath != null && startPathInfo != null && endPath != null;
validTxState(true);
IgfsEntryInfo parentInfo = startPathInfo;
List<String> components = endPath.components();
IgfsPath curPath = startPath;
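// Walk the remaining path components below the start path and re-create the missing hierarchy locally.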
for (int i = startPath.components().size(); i < components.size(); i++) {
curPath = new IgfsPath(curPath, components.get(i));
if (created != null && created.containsKey(curPath))
// Re-use already created info.
parentInfo = created.get(curPath);
else {
// Get file status from the secondary file system.
IgfsFile status;
try {
status = fs.info(curPath);
}
catch (IgniteException e) {
throw new IgniteCheckedException("Failed to get path information: " + e, e);
}
if (status != null) {
if (!status.isDirectory() && !curPath.equals(endPath))
throw new IgniteCheckedException("Failed to create the path locally because the secondary file " +
"system directory structure was modified concurrently and the path is not a directory as " +
"expected: " + curPath);
}
else {
if (strict) {
throw new IgniteCheckedException("Failed to create path locally due to secondary file system " +
"exception: " + curPath);
}
else if (created != null)
created.put(curPath.parent(), parentInfo);
return null;
}
// Recreate the path locally.
IgfsEntryInfo curInfo = status.isDirectory() ?
IgfsUtils.createDirectory(
IgniteUuid.randomUuid(),
null,
status.properties(),
status.accessTime(),
status.modificationTime()
) :
IgfsUtils.createFile(
IgniteUuid.randomUuid(),
igfsCtx.configuration().getBlockSize(),
status.length(),
null,
null,
igfsCtx.igfs().evictExclude(curPath, false),
status.properties(),
status.accessTime(),
status.modificationTime()
);
assert parentInfo != null;
IgniteUuid oldId = putIfAbsentNonTx(parentInfo.id(), components.get(i), curInfo);
if (oldId != null)
curInfo = info(oldId);
if (created != null)
created.put(curPath, curInfo);
parentInfo = curInfo;
}
}
return parentInfo;
}
/**
* Synchronize file system structure and then execute the provided task. All these actions are performed within
* the transaction.
*
* @param task Task to execute.
* @param fs File system.
* @param strict Whether paths must be re-created strictly.
* @param paths Paths to synchronize.
* @return Result of task execution.
* @throws IgniteCheckedException If failed.
*/
private <T> T synchronizeAndExecute(SynchronizationTask<T> task,
IgfsSecondaryFileSystem fs,
boolean strict,
IgfsPath... paths)
throws IgniteCheckedException
{
return synchronizeAndExecute(task, fs, strict, null, paths);
}
/**
* Synchronize file system structure and then execute the provided task. All these actions are performed within
* the transaction.
*
* @param task Task to execute.
* @param fs File system.
* @param strict Whether paths must be re-created strictly.
* @param extraLockIds Additional IDs to lock (optional).
* @param paths Paths to synchronize.
* @return Result of task execution.
* @throws IgniteCheckedException If failed.
*/
@SuppressWarnings({"Contract", "ConstantConditions"})
private <T> T synchronizeAndExecute(SynchronizationTask<T> task, IgfsSecondaryFileSystem fs, boolean strict,
@Nullable Collection<IgniteUuid> extraLockIds, IgfsPath... paths) throws IgniteCheckedException {
assert task != null;
assert fs != null;
assert paths != null && paths.length > 0;
// Sort paths so that we know in which order to synchronize them.
if (paths.length > 1)
Arrays.sort(paths);
boolean finished = false;
T res = null;
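// Path IDs are resolved outside the transaction, so the whole attempt is retried whenever the structure changes before locks are taken.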
while (!finished) {
// Obtain existing IDs outside the transaction.
List<List<IgniteUuid>> pathIds = new ArrayList<>(paths.length);
for (IgfsPath path : paths)
pathIds.add(idsForPath(path));
// Start pessimistic.
try (GridNearTxLocal tx = startTx()) {
// Lock the very first existing parents and possibly the leaf as well.
Map<IgfsPath, IgfsPath> pathToParent = new HashMap<>();
Map<IgfsPath, IgniteUuid> pathToId = new HashMap<>();
for (int i = 0; i < paths.length; i++) {
IgfsPath path = paths[i];
// Determine the very first existing parent
List<IgniteUuid> ids = pathIds.get(i);
if (ids.size() > 1) {
// The path is not root.
IgfsPath parentPath = path.parent();
IgniteUuid parentId = ids.get(ids.size() - 2);
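// Walk up the path until the closest ancestor with an existing (non-null) ID is found.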
for (int j = ids.size() - 3; j >= 0; j--) {
if (parentId != null)
break;
else {
parentPath = parentPath.parent();
parentId = ids.get(j);
}
}
assert parentPath != null && parentId != null;
pathToParent.put(path, parentPath);
pathToId.put(parentPath, parentId);
}
IgniteUuid pathId = ids.get(ids.size() - 1);
if (pathId != null)
pathToId.put(path, pathId);
}
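// Lock the collected path and parent IDs, together with any extra IDs, in a single batch.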
IgniteUuid[] lockArr = new IgniteUuid[extraLockIds == null ? pathToId.size() : pathToId.size() +
extraLockIds.size()];
int idx = 0;
for (IgniteUuid id : pathToId.values())
lockArr[idx++] = id;
if (extraLockIds != null) {
for (IgniteUuid id : extraLockIds)
lockArr[idx++] = id;
}
Map<IgniteUuid, IgfsEntryInfo> idToInfo = lockIds(lockArr);
if (extraLockIds != null) {
for (IgniteUuid id : extraLockIds)
idToInfo.remove(id);
}
// Ensure that locked IDs still point to expected paths.
IgfsPath changed = null;
for (Map.Entry<IgfsPath, IgniteUuid> entry : pathToId.entrySet()) {
if (!idToInfo.containsKey(entry.getValue()) ||
!F.eq(entry.getValue(), fileId(entry.getKey(), true))) {
changed = entry.getKey();
break;
}
}
if (changed != null) {
finished = true;
throw fsException(new IgfsConcurrentModificationException("File system entry has been " +
"modified concurrently: " + changed));
}
else {
boolean newParents = false;
// Check whether any new parents appeared before we have obtained the locks.
for (int i = 0; i < paths.length; i++) {
List<IgniteUuid> newIds = fileIds(paths[i], true);
if (!pathIds.get(i).equals(newIds)) {
newParents = true;
break;
}
}
if (newParents)
continue; // Release all locks and try again.
else {
// Perform synchronization.
Map<IgfsPath, IgfsEntryInfo> infos = new HashMap<>();
TreeMap<IgfsPath, IgfsEntryInfo> created = new TreeMap<>();
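// 'infos' maps each requested path (and its parent) to its entry info; 'created' collects entries created during synchronization.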
for (IgfsPath path : paths) {
IgfsPath parentPath = path.parent();
if (pathToId.containsKey(path)) {
infos.put(path, info(pathToId.get(path)));
if (parentPath != null)
infos.put(parentPath, info(pathToId.get(parentPath)));
}
else {
IgfsPath firstParentPath = pathToParent.get(path);
assert firstParentPath != null;
assert pathToId.get(firstParentPath) != null;
IgfsEntryInfo info = synchronize(fs,
firstParentPath,
idToInfo.get(pathToId.get(firstParentPath)),
path,
strict,
created);
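// In strict mode synchronize() must return an info; in non-strict mode a null result is allowed.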
assert strict && info != null || !strict;
if (info != null)
infos.put(path, info);
if (parentPath != null) {
if (parentPath.equals(firstParentPath))
infos.put(firstParentPath, idToInfo.get(pathToId.get(firstParentPath)));
else {
assert strict && created.get(parentPath) != null || !strict;
if (created.get(parentPath) != null)
infos.put(parentPath, created.get(parentPath));
else {
// Put the last created path.
infos.put(created.lastKey(), created.get(created.lastKey()));
}
}
}
}
}
// Finally, execute the task.
finished = true;
try {
res = task.onSuccess(infos);
}
catch (Exception e) {
res = task.onFailure(e);
}
}
}
tx.commit();
}
catch (IgniteCheckedException e) {
if (!finished) {
finished = true;
res = task.onFailure(e);
}
else
throw e;
}
}
return res;
}
/**
* Check transaction is (not) started.
*
* @param inTx Expected transaction state.
*/
private void validTxState(boolean inTx) {
assert (inTx && id2InfoPrj.tx() != null) || (!inTx && id2InfoPrj.tx() == null) :
"Invalid TX state [expected=" + inTx + ", actual=" + (id2InfoPrj.tx() != null) + ']';
}
/**
* Start transaction on meta cache.
*
* @return Transaction.
*/
private GridNearTxLocal startTx() {
return metaCache.txStartEx(TransactionConcurrency.PESSIMISTIC, TransactionIsolation.REPEATABLE_READ);
}
/**
* Update times.
*
* @param path Path.
* @param accessTime Access time.
* @param modificationTime Modification time.
* @param secondaryFs Secondary file system.
* @throws IgniteCheckedException If failed.
*/
public void updateTimes(IgfsPath path, long modificationTime, long accessTime,
IgfsSecondaryFileSystem secondaryFs) throws IgniteCheckedException {
while (true) {
if (busyLock.enterBusy()) {
try {
validTxState(false);
// Prepare path IDs.
IgfsPathIds pathIds = pathIds(path);
// Prepare lock IDs.
Set<IgniteUuid> lockIds = new TreeSet<>(PATH_ID_SORTING_COMPARATOR);
pathIds.addExistingIds(lockIds, relaxed);
// Start TX.
try (GridNearTxLocal tx = startTx()) {
Map<IgniteUuid, IgfsEntryInfo> lockInfos = lockIds(lockIds);
if (secondaryFs != null && isRetryForSecondary(pathIds, lockInfos))
continue;
if (!pathIds.verifyIntegrity(lockInfos, relaxed))
// Directory structure changed concurrently. So we re-try.
continue;
if (pathIds.allExists()) {
// All files are in place. Update both primary and secondary file systems.
if (secondaryFs != null)
secondaryFs.setTimes(path, modificationTime, accessTime);
IgniteUuid targetId = pathIds.lastExistingId();
IgfsEntryInfo targetInfo = lockInfos.get(targetId);
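// A timestamp of -1 means "keep the current value" for the corresponding field.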
id2InfoPrj.invoke(targetId, new IgfsMetaUpdateTimesProcessor(
accessTime == -1 ? targetInfo.accessTime() : accessTime,
modificationTime == -1 ? targetInfo.modificationTime() : modificationTime)
);
tx.commit();
return;
}
else {
// Propagate the call to the secondary FS, as this part of the structure might not have been cached yet.
if (secondaryFs != null) {
secondaryFs.setTimes(path, modificationTime, accessTime);
return;
}
else
throw new IgfsPathNotFoundException("Failed to update times (path not found): " + path);
}
}
}
catch (IgniteException | IgniteCheckedException e) {
throw e;
}
catch (Exception e) {
throw new IgniteCheckedException("setTimes failed due to unexpected exception: " + path, e);
}
finally {
busyLock.leaveBusy();
}
}
else
throw new IllegalStateException("Failed to update times because Grid is stopping: " + path);
}
}
/**
* @param msg Error message.
* @return Checked exception.
*/
private static IgniteCheckedException fsException(String msg) {
return new IgniteCheckedException(new IgfsException(msg));
}
/**
* @param err Unchecked exception.
* @return Checked exception.
*/
private static IgniteCheckedException fsException(IgfsException err) {
return new IgniteCheckedException(err);
}
/**
* Append routine.
*
* @param path Path.
* @param dirProps Directory properties.
* @param create Create flag.
* @param blockSize Block size.
* @param affKey Affinity key.
* @param evictExclude Evict exclude flag.
* @param fileProps File properties.
* @return Resulting info.
* @throws IgniteCheckedException If failed.
*/
IgfsEntryInfo append(
final IgfsPath path,
Map<String, String> dirProps,
final boolean create,
final int blockSize,
final @Nullable IgniteUuid affKey,
final boolean evictExclude,
@Nullable Map<String, String> fileProps) throws IgniteCheckedException {
validTxState(false);
while (true) {
if (busyLock.enterBusy()) {
try {
// Prepare path IDs.
IgfsPathIds pathIds = pathIds(path);
// Fail-fast: create flag is not specified and some paths are missing.
if (!pathIds.allExists() && !create)
throw new IgfsPathNotFoundException("Failed to append because file is not found: " + path);
// Prepare lock IDs.
Set<IgniteUuid> lockIds = new TreeSet<>(PATH_ID_SORTING_COMPARATOR);
pathIds.addExistingIds(lockIds, relaxed);
pathIds.addSurrogateIds(lockIds);
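// Surrogate IDs are pre-generated IDs for path components that do not exist yet, locked up front so they can be created inside the transaction.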
// Start TX.
try (GridNearTxLocal tx = startTx()) {
Map<IgniteUuid, IgfsEntryInfo> lockInfos = lockIds(lockIds);
if (!pathIds.verifyIntegrity(lockInfos, relaxed))
// Directory structure changed concurrently. So we simply re-try.
continue;
if (pathIds.allExists()) {
// All participants are found. Simply open the stream.
IgfsEntryInfo info = lockInfos.get(pathIds.lastId());
// Check: is it a file?
if (!info.isFile())
throw new IgfsPathIsDirectoryException("Failed to open file for write: " + path);
// Check if file already opened for write.
if (info.lockId() != null)
throw new IgfsException("File is already opened for write: " + path);
// At this point we can open the stream safely.
info = invokeLock(info.id(), false);
tx.commit();
IgfsUtils.sendEvents(igfsCtx.kernalContext(), path, EventType.EVT_IGFS_FILE_OPENED_WRITE);
return info;
}
else {
// Create file and parent folders.
IgfsPathsCreateResult res = createFile(pathIds, lockInfos, dirProps, fileProps, blockSize,
affKey, evictExclude, null, null);
if (res == null)
continue;
// Commit.
tx.commit();
// Generate events.
generateCreateEvents(res.createdPaths(), true);
return res.info();
}
}
}
finally {
busyLock.leaveBusy();
}
}
else
throw new IllegalStateException("Failed to append to file because Grid is stopping: " + path);
}
}
/**
* Create a file.
*
* @param path Path.
* @param dirProps Directory properties.
* @param overwrite Overwrite flag.
* @param blockSize Block size.
* @param affKey Affinity key.
* @param evictExclude Evict exclude flag.
* @param fileProps File properties.
* @param secondaryCtx Secondary file system create context.
* @return Operation result.
* @throws IgniteCheckedException If failed.
*/
IgfsCreateResult create(
final IgfsPath path,
Map<String, String> dirProps,
final boolean overwrite,
final int blockSize,
final @Nullable IgniteUuid affKey,
final boolean evictExclude,
@Nullable Map<String, String> fileProps,
@Nullable IgfsSecondaryFileSystemCreateContext secondaryCtx) throws IgniteCheckedException {
validTxState(false);
while (true) {
if (busyLock.enterBusy()) {
OutputStream secondaryOut = null;
try {
// Prepare path IDs.
IgfsPathIds pathIds = pathIds(path);
// Prepare lock IDs.
Set<IgniteUuid> lockIds = new TreeSet<>(PATH_ID_SORTING_COMPARATOR);
pathIds.addExistingIds(lockIds, relaxed);
pathIds.addSurrogateIds(lockIds);
// In overwrite mode we also lock ID of potential replacement as well as trash ID.
IgniteUuid overwriteId = IgniteUuid.randomUuid();
IgniteUuid trashId = IgfsUtils.randomTrashId();
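// 'overwriteId' will become the ID of the replacement file; 'trashId' points to the trash directory that receives the old entry.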
if (overwrite) {
lockIds.add(overwriteId);
// Trash ID is only added if we suspect conflict.
if (pathIds.allExists())
lockIds.add(trashId);
}
// Start TX.
try (GridNearTxLocal tx = startTx()) {
Map<IgniteUuid, IgfsEntryInfo> lockInfos = lockIds(lockIds);
if (secondaryCtx != null && isRetryForSecondary(pathIds, lockInfos))
continue;
if (!pathIds.verifyIntegrity(lockInfos, relaxed))
// Directory structure changed concurrently. So we simply re-try.
continue;
if (pathIds.allExists()) {
// All participants found.
IgfsEntryInfo oldInfo = lockInfos.get(pathIds.lastId());
// Check: is it a file?
if (!oldInfo.isFile())
throw new IgfsPathIsDirectoryException("Failed to create a file: " + path);
// Check: can we overwrite it?
if (!overwrite)
throw new IgfsPathAlreadyExistsException("Failed to create a file: " + path);
// Check if file already opened for write.
if (oldInfo.lockId() != null)
throw new IgfsException("File is already opened for write: " + path);
// At this point file can be re-created safely.
// Add existing to trash listing.
IgniteUuid oldId = pathIds.lastId();
id2InfoPrj.invoke(trashId, new IgfsMetaDirectoryListingAddProcessor(
IgfsUtils.composeNameForTrash(path, oldId), new IgfsListingEntry(oldInfo)));
// Replace ID in parent directory.
String name = pathIds.lastPart();
IgniteUuid parentId = pathIds.lastParentId();
id2InfoPrj.invoke(parentId, new IgfsMetaDirectoryListingReplaceProcessor(name, overwriteId));
// Create the file.
IgniteUuid newLockId = createFileLockId(false);
long newAccessTime;
long newModificationTime;
Map<String, String> newProps;
long newLen;
int newBlockSize;
if (secondaryCtx != null) {
secondaryOut = secondaryCtx.create();
newAccessTime = 0L;
newModificationTime = 0L;
newProps = null;
}
else {
newAccessTime = System.currentTimeMillis();
newModificationTime = newAccessTime;
newProps = fileProps;
}
newLen = 0L;
newBlockSize = blockSize;
IgfsEntryInfo newInfo = invokeAndGet(overwriteId,
new IgfsMetaFileCreateProcessor(newAccessTime, newModificationTime, newProps,
newBlockSize, affKey, newLockId, evictExclude, newLen));
// Prepare result and commit.
tx.commit();
IgfsUtils.sendEvents(igfsCtx.kernalContext(), path, EventType.EVT_IGFS_FILE_OPENED_WRITE);
return new IgfsCreateResult(newInfo, secondaryOut);
}
else {
// Create file and parent folders.
T1<OutputStream> secondaryOutHolder = null;
if (secondaryCtx != null)
secondaryOutHolder = new T1<>();
IgfsPathsCreateResult res;
try {
res = createFile(pathIds, lockInfos, dirProps, fileProps, blockSize,
affKey, evictExclude, secondaryCtx, secondaryOutHolder);
}
finally {
if (secondaryOutHolder != null)
secondaryOut = secondaryOutHolder.get();
}
if (res == null)
continue;
// Commit.
tx.commit();
// Generate events.
generateCreateEvents(res.createdPaths(), true);
return new IgfsCreateResult(res.info(), secondaryOut);
}
}
}
catch (IgniteException | IgniteCheckedException e) {
U.closeQuiet(secondaryOut);
throw e;
}
catch (Exception e) {
U.closeQuiet(secondaryOut);
throw new IgniteCheckedException("Create failed due to unexpected exception: " + path, e);
}
finally {
busyLock.leaveBusy();
}
}
else
throw new IllegalStateException("Failed to create file because Grid is stopping [path=" + path + ']');
}
}
/**
* Create directory and all its parents.
*
* @param pathIds Path IDs.
* @param lockInfos Lock infos.
* @param dirProps Directory properties.
* @return Result or {@code null} if the first parent already contained a child with the same name.
* @throws IgniteCheckedException If failed.
*/
@Nullable IgfsPathsCreateResult createDirectory(IgfsPathIds pathIds, Map<IgniteUuid, IgfsEntryInfo> lockInfos,
Map<String, String> dirProps) throws IgniteCheckedException {
// Check if entry we are going to write to is directory.
if (lockInfos.get(pathIds.lastExistingId()).isFile())
throw new IgfsParentNotDirectoryException("Failed to create directory (parent " +
"element is not a directory)");
return createFileOrDirectory(true, pathIds, lockInfos, dirProps, null, 0, null, false, null, null);
}
/**
* Create file and all its parents.
*
* @param pathIds Paths IDs.
* @param lockInfos Lock infos.
* @param dirProps Directory properties.
* @param fileProps File properties.
* @param blockSize Block size.
* @param affKey Affinity key (optional)
* @param evictExclude Evict exclude flag.
* @param secondaryCtx Secondary file system create context.
* @param secondaryOutHolder Holder for the secondary output stream.
* @return Result or {@code null} if the first parent already contained a child with the same name.
* @throws IgniteCheckedException If failed.
*/
@Nullable private IgfsPathsCreateResult createFile(IgfsPathIds pathIds, Map<IgniteUuid, IgfsEntryInfo> lockInfos,
Map<String, String> dirProps, Map<String, String> fileProps, int blockSize, @Nullable IgniteUuid affKey,
boolean evictExclude, @Nullable IgfsSecondaryFileSystemCreateContext secondaryCtx,
@Nullable T1<OutputStream> secondaryOutHolder)
throws IgniteCheckedException {
// Check if entry we are going to write to is directory.
if (lockInfos.get(pathIds.lastExistingId()).isFile())
throw new IgfsParentNotDirectoryException("Failed to open file for write " +
"(parent element is not a directory): " + pathIds.path());
return createFileOrDirectory(false, pathIds, lockInfos, dirProps, fileProps, blockSize, affKey, evictExclude,
secondaryCtx, secondaryOutHolder);
}
/**
* Create file or directory.
*
* @param dir Directory flag.
* @param pathIds Path IDs.
* @param lockInfos Lock infos.
* @param dirProps Directory properties.
* @param fileProps File properties.
* @param blockSize Block size.
* @param affKey Affinity key.
* @param evictExclude Evict exclude flag.
* @param secondaryCtx Secondary file system create context.
* @param secondaryOutHolder Secondary output stream holder.
* @return Result.
* @throws IgniteCheckedException If failed.
*/
@SuppressWarnings("unchecked")
private IgfsPathsCreateResult createFileOrDirectory(boolean dir, IgfsPathIds pathIds,
Map<IgniteUuid, IgfsEntryInfo> lockInfos, Map<String, String> dirProps, Map<String, String> fileProps,
int blockSize, @Nullable IgniteUuid affKey, boolean evictExclude,
@Nullable IgfsSecondaryFileSystemCreateContext secondaryCtx, @Nullable T1<OutputStream> secondaryOutHolder)
throws IgniteCheckedException {
// This is our starting point.
int lastExistingIdx = pathIds.lastExistingIndex();
IgfsEntryInfo lastExistingInfo = lockInfos.get(pathIds.lastExistingId());
// If current info already contains entry with the same name as it's child, then something
// has changed concurrently. We must re-try because we cannot get info of this unexpected
// element due to possible deadlocks.
int curIdx = lastExistingIdx + 1;
String curPart = pathIds.part(curIdx);
IgniteUuid curId = pathIds.surrogateId(curIdx);
if (lastExistingInfo.hasChild(curPart))
return null;
// Create entry in the secondary file system if needed.
if (secondaryCtx != null) {
assert secondaryOutHolder != null;
secondaryOutHolder.set(secondaryCtx.create());
}
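// Collect entry processors keyed by entry ID; they are applied in a single invokeAll() call below.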
Map<IgniteUuid, EntryProcessor> procMap = new HashMap<>();
// First step: add new entry to the last existing element.
procMap.put(lastExistingInfo.id(), new IgfsMetaDirectoryListingAddProcessor(curPart,
new IgfsListingEntry(curId, dir || !pathIds.isLastIndex(curIdx))));
// Events support.
IgfsPath lastCreatedPath = pathIds.lastExistingPath();
List<IgfsPath> createdPaths = new ArrayList<>(pathIds.count() - curIdx);
// Second step: create middle directories.
long curTime = System.currentTimeMillis();
while (curIdx < pathIds.count() - 1) {
lastCreatedPath = new IgfsPath(lastCreatedPath, curPart);
int nextIdx = curIdx + 1;
String nextPart = pathIds.part(nextIdx);
IgniteUuid nextId = pathIds.surrogateId(nextIdx);
long accessTime;
long modificationTime;
Map<String, String> props;
if (secondaryCtx != null) {
accessTime = 0L;
modificationTime = 0L;
props = null;
}
else {
accessTime = curTime;
modificationTime = curTime;
props = dirProps;
}
procMap.put(curId, new IgfsMetaDirectoryCreateProcessor(accessTime, modificationTime, props,
nextPart, new IgfsListingEntry(nextId, dir || !pathIds.isLastIndex(nextIdx))));
// Remember the created path for event generation.
createdPaths.add(lastCreatedPath);
// Advance things further.
curIdx++;
curPart = nextPart;
curId = nextId;
}
// Third step: create leaf.
if (dir) {
long accessTime;
long modificationTime;
Map<String, String> props;
if (secondaryCtx != null) {
accessTime = 0L;
modificationTime = 0L;
props = null;
}
else {
accessTime = curTime;
modificationTime = curTime;
props = dirProps;
}
procMap.put(curId, new IgfsMetaDirectoryCreateProcessor(accessTime, modificationTime, props));
}
else {
long newAccessTime;
long newModificationTime;
Map<String, String> newProps;
long newLen;
int newBlockSize;
if (secondaryCtx != null) {
newAccessTime = 0L;
newModificationTime = 0L;
newProps = null;
}
else {
newAccessTime = curTime;
newModificationTime = curTime;
newProps = fileProps;
}
newLen = 0L;
newBlockSize = blockSize;
procMap.put(curId, new IgfsMetaFileCreateProcessor(newAccessTime, newModificationTime, newProps,
newBlockSize, affKey, createFileLockId(false), evictExclude, newLen));
}
createdPaths.add(pathIds.path());
// Execute cache operations.
Map<Object, EntryProcessorResult> invokeRes = ((IgniteInternalCache)id2InfoPrj).invokeAll(procMap);
IgfsEntryInfo info = (IgfsEntryInfo)invokeRes.get(curId).get();
return new IgfsPathsCreateResult(createdPaths, info);
}
/**
* Generate events for created file or directory.
*
* @param createdPaths Created paths.
* @param file Whether file was created.
*/
private void generateCreateEvents(List<IgfsPath> createdPaths, boolean file) {
if (evts.isRecordable(EventType.EVT_IGFS_DIR_CREATED)) {
for (int i = 0; i < createdPaths.size() - 1; i++)
IgfsUtils.sendEvents(igfsCtx.kernalContext(), createdPaths.get(i),
EventType.EVT_IGFS_DIR_CREATED);
}
IgfsPath leafPath = createdPaths.get(createdPaths.size() - 1);
if (file) {
IgfsUtils.sendEvents(igfsCtx.kernalContext(), leafPath, EventType.EVT_IGFS_FILE_CREATED);
IgfsUtils.sendEvents(igfsCtx.kernalContext(), leafPath, EventType.EVT_IGFS_FILE_OPENED_WRITE);
}
else
IgfsUtils.sendEvents(igfsCtx.kernalContext(), leafPath, EventType.EVT_IGFS_DIR_CREATED);
}
/**
* Signal delete worker thread.
*/
private void signalDeleteWorker() {
IgfsDeleteWorker delWorker0 = delWorker;
if (delWorker0 != null)
delWorker0.signal();
}
/**
* Synchronization task interface.
*/
private interface SynchronizationTask<T> {
/**
* Callback handler in case synchronization was successful.
*
* @param infos Map from paths to corresponding infos.
* @return Task result.
* @throws Exception If failed.
*/
public T onSuccess(Map<IgfsPath, IgfsEntryInfo> infos) throws Exception;
/**
* Callback handler in case synchronization failed.
*
* @param err Optional exception.
* @return Task result.
* @throws IgniteCheckedException If the failure should be propagated to the caller as a checked exception.
*/
public T onFailure(@Nullable Exception err) throws IgniteCheckedException;
}
}