blob: 8dcccea2e96f13a0d25db8eb79f67c12908867a5 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.igfs;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.igfs.IgfsPath;
import org.apache.ignite.internal.IgniteFutureCancelledCheckedException;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
import org.apache.ignite.internal.util.future.GridCompoundFuture;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteUuid;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_PURGED;
/**
 * IGFS worker for removal from the trash directory.
 * <p>
 * A single background thread per IGFS instance. It wakes up every {@link #FREQUENCY} ms
 * (or immediately when {@link #signal()} is called) and purges the contents of all trash
 * directories. Stopped via {@link #cancel()}, which sets the {@code cancelled} flag and
 * interrupts the thread.
 */
public class IgfsDeleteWorker extends IgfsThread {
    /** Awake frequency in milliseconds: how long the worker sleeps between purge passes. */
    private static final long FREQUENCY = 1000;

    /** How many files/folders to delete at once (i.e. in a single metadata transaction). */
    private static final int MAX_DELETE_BATCH = 100;

    /** IGFS context. */
    private final IgfsContext igfsCtx;

    /** Metadata manager. */
    private final IgfsMetaManager meta;

    /** Data manager. */
    private final IgfsDataManager data;

    /** Logger. */
    private final IgniteLogger log;

    /** Lock guarding {@code force} and the sleep/wake condition. */
    private final Lock lock = new ReentrantLock();

    /** Condition the worker sleeps on between purge passes; signalled by {@link #signal()}. */
    private final Condition cond = lock.newCondition();

    /** Force worker to perform actual delete without waiting out the full sleep period. Guarded by {@code lock}. */
    private boolean force;

    /** Cancellation flag. Volatile: written by the cancelling thread, read by the worker loop. */
    private volatile boolean cancelled;

    /**
     * Constructor.
     *
     * @param igfsCtx IGFS context.
     */
    IgfsDeleteWorker(IgfsContext igfsCtx) {
        // Thread name encodes IGFS name and local node ID for diagnostics.
        super("igfs-delete-worker%" + igfsCtx.igfs().name() + "%" + igfsCtx.kernalContext().localNodeId() + "%");

        this.igfsCtx = igfsCtx;

        meta = igfsCtx.meta();
        data = igfsCtx.data();

        assert meta != null;
        assert data != null;

        log = igfsCtx.kernalContext().log(IgfsDeleteWorker.class);
    }

    /** {@inheritDoc} */
    @Override protected void body() throws InterruptedException {
        if (log.isDebugEnabled())
            log.debug("Delete worker started.");

        while (!cancelled) {
            lock.lock();

            try {
                // Sleep up to FREQUENCY ms unless a purge was already requested via signal().
                // A spurious wakeup is harmless: it merely triggers an extra purge pass.
                if (!cancelled && !force)
                    cond.await(FREQUENCY, TimeUnit.MILLISECONDS);

                force = false; // Reset force flag.
            }
            finally {
                lock.unlock();
            }

            if (!cancelled)
                delete();
        }
    }

    /**
     * Notify the worker that new entry to delete appeared.
     * <p>
     * Sets the {@code force} flag so the worker performs a purge pass immediately
     * instead of waiting out the remainder of its sleep period.
     */
    void signal() {
        lock.lock();

        try {
            force = true;

            cond.signalAll();
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Cancels the worker. Sets the cancellation flag first so the worker loop exits,
     * then interrupts the thread to break out of any blocking wait.
     */
    void cancel() {
        cancelled = true;

        interrupt();
    }

    /**
     * Perform cleanup of trash directories. Iterates over all trash partitions
     * (there are {@code IgfsUtils.TRASH_CONCURRENCY} of them) and purges each one.
     */
    private void delete() {
        for (int i = 0; i < IgfsUtils.TRASH_CONCURRENCY; i++)
            delete(IgfsUtils.trashId(i));
    }

    /**
     * Perform cleanup of concrete trash directory: fetches its listing and tries to
     * delete every child entry, stopping early if the worker gets cancelled.
     *
     * @param trashId Trash ID.
     */
    private void delete(IgniteUuid trashId) {
        IgfsEntryInfo info = null;

        try {
            info = meta.info(trashId);
        }
        catch (ClusterTopologyServerNotFoundException ignore) {
            // Ignore: no server nodes to serve metadata; nothing can be purged right now.
        }
        catch (IgniteCheckedException e) {
            // Best-effort: warn and skip this pass; the next pass will retry.
            U.warn(log, "Cannot obtain trash directory info (is node stopping?)");

            if (log.isDebugEnabled())
                U.error(log, "Cannot obtain trash directory info.", e);
        }

        if (info != null) {
            for (Map.Entry<String, IgfsListingEntry> entry : info.listing().entrySet()) {
                IgniteUuid fileId = entry.getValue().fileId();

                if (log.isDebugEnabled())
                    log.debug("Deleting IGFS trash entry [name=" + entry.getKey() + ", fileId=" + fileId + ']');

                try {
                    if (!cancelled) {
                        if (delete(trashId, entry.getKey(), fileId)) {
                            if (log.isDebugEnabled())
                                log.debug("Sending delete confirmation message [name=" + entry.getKey() +
                                    ", fileId=" + fileId + ']');
                        }
                    }
                    else
                        break;
                }
                catch (IgniteInterruptedCheckedException ignored) {
                    // Ignore this exception while stopping.
                }
                catch (IgniteCheckedException e) {
                    U.error(log, "Failed to delete entry from the trash directory: " + entry.getKey(), e);
                }
            }
        }
    }

    /**
     * Remove particular entry from the TRASH directory.
     * <p>
     * Directories are emptied recursively first, then removed; for a directory the loop
     * retries until either the removal succeeds or its contents cannot be fully deleted.
     * Files are locked with the special delete lock ID before their data is removed, so a
     * concurrently written (locked) file is skipped rather than deleted.
     *
     * @param trashId ID of the trash directory.
     * @param name Entry name.
     * @param id Entry ID.
     * @return {@code True} in case the entry really was deleted form the file system by this call.
     * @throws IgniteCheckedException If failed.
     */
    private boolean delete(IgniteUuid trashId, String name, IgniteUuid id) throws IgniteCheckedException {
        assert name != null;
        assert id != null;

        while (true) {
            IgfsEntryInfo info = meta.info(id);

            if (info != null) {
                if (info.isDirectory()) {
                    if (!deleteDirectoryContents(trashId, id))
                        return false;

                    if (meta.delete(trashId, name, id))
                        return true;

                    // Otherwise retry: the listing may have changed concurrently.
                }
                else {
                    assert info.isFile();

                    // Lock the file with special lock Id to prevent concurrent writing:
                    IgfsEntryInfo lockedInfo = meta.lock(id, true);

                    if (lockedInfo == null)
                        return false; // File is locked, we cannot delete it.

                    assert id.equals(lockedInfo.id());

                    // Delete file content first.
                    // In case this node crashes, other node will re-delete the file.
                    data.delete(lockedInfo).get();

                    boolean ret = meta.delete(trashId, name, id);

                    if (ret) {
                        // Fire a PURGED event carrying the file's original (pre-trash) path.
                        IgfsPath path = IgfsUtils.extractOriginalPathFromTrash(name);

                        assert path != null;

                        IgfsUtils.sendEvents(igfsCtx.kernalContext(), path, EVT_IGFS_FILE_PURGED);
                    }

                    return ret;
                }
            }
            else
                return false; // Entry was deleted concurrently.
        }
    }

    /**
     * Remove particular entry from the trash directory or subdirectory.
     * <p>
     * Walks the directory listing, recursing into subdirectories and locking/deleting
     * files in batches of at most {@link #MAX_DELETE_BATCH} entries per metadata
     * transaction. Data-block deletions for the batch are awaited (via a compound
     * future) before the corresponding metadata entries are removed, so metadata never
     * points at already-purged data. Entries whose delete lock cannot be acquired
     * (concurrently locked files) are counted as failed and skipped.
     *
     * @param parentId Parent ID.
     * @param id Entry id.
     * @return true iff all the items in the directory were deleted (directory is seen to be empty).
     * @throws IgniteCheckedException If delete failed for some reason.
     */
    private boolean deleteDirectoryContents(IgniteUuid parentId, final IgniteUuid id) throws IgniteCheckedException {
        assert parentId != null;
        assert id != null;

        while (true) {
            IgfsEntryInfo info = meta.info(id);

            if (info != null) {
                assert info.isDirectory();

                final Map<String, IgfsListingEntry> listing = info.listing();

                if (listing.isEmpty())
                    return true; // Directory is empty.

                // Batch of entries whose data is deleted and whose metadata may now be removed.
                final Map<String, IgfsListingEntry> delListing = new HashMap<>(MAX_DELETE_BATCH, 1.0f);

                // Aggregates data-deletion futures for all files in the current batch.
                final GridCompoundFuture<Object, ?> fut = new GridCompoundFuture<>();

                int failedFiles = 0;

                for (final Map.Entry<String, IgfsListingEntry> entry : listing.entrySet()) {
                    if (cancelled)
                        return false;

                    if (entry.getValue().isDirectory()) {
                        if (deleteDirectoryContents(id, entry.getValue().fileId())) // *** Recursive call.
                            delListing.put(entry.getKey(), entry.getValue());
                        else
                            failedFiles++;
                    }
                    else {
                        IgfsEntryInfo fileInfo = meta.info(entry.getValue().fileId());

                        if (fileInfo != null) {
                            assert fileInfo.isFile();

                            IgfsEntryInfo lockedInfo = meta.lock(fileInfo.id(), true);

                            if (lockedInfo == null)
                                // File is already locked:
                                failedFiles++;
                            else {
                                assert IgfsUtils.DELETE_LOCK_ID.equals(lockedInfo.lockId());

                                fut.add(data.delete(lockedInfo));

                                delListing.put(entry.getKey(), entry.getValue());
                            }
                        }
                        // NOTE(review): fileInfo == null (entry vanished concurrently) is neither
                        // batched nor counted as failed; the outer while-loop re-reads the listing.
                    }

                    if (delListing.size() == MAX_DELETE_BATCH)
                        break;
                }

                fut.markInitialized();

                // Wait for data cache to delete values before clearing meta cache.
                try {
                    fut.get();
                }
                catch (IgniteFutureCancelledCheckedException ignore) {
                    // This future can be cancelled only due to IGFS shutdown.
                    cancelled = true;

                    return false;
                }

                // Actual delete of folder content.
                Collection<IgniteUuid> delIds = meta.delete(id, delListing);

                if (listing.size() == delIds.size())
                    return true; // All entries were deleted.

                if (listing.size() == delListing.size() + failedFiles)
                    // All the files were tried, no reason to continue the loop:
                    return false;

                // Otherwise loop again with a fresh listing snapshot.
            }
            else
                return true; // Directory entry was deleted concurrently.
        }
    }
}