| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.internal.processors.igfs; |
| |
| import org.apache.ignite.IgniteCheckedException; |
| import org.apache.ignite.IgniteLogger; |
| import org.apache.ignite.igfs.IgfsPath; |
| import org.apache.ignite.internal.IgniteFutureCancelledCheckedException; |
| import org.apache.ignite.internal.IgniteInterruptedCheckedException; |
| import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException; |
| import org.apache.ignite.internal.util.future.GridCompoundFuture; |
| import org.apache.ignite.internal.util.typedef.internal.U; |
| import org.apache.ignite.lang.IgniteUuid; |
| |
| import java.util.Collection; |
| import java.util.HashMap; |
| import java.util.Map; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.locks.Condition; |
| import java.util.concurrent.locks.Lock; |
| import java.util.concurrent.locks.ReentrantLock; |
| |
| import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_PURGED; |
| |
| /** |
| * IGFS worker for removal from the trash directory. |
| */ |
| public class IgfsDeleteWorker extends IgfsThread { |
| /** Awake frequency, */ |
| private static final long FREQUENCY = 1000; |
| |
| /** How many files/folders to delete at once (i.e in a single transaction). */ |
| private static final int MAX_DELETE_BATCH = 100; |
| |
| /** IGFS context. */ |
| private final IgfsContext igfsCtx; |
| |
| /** Metadata manager. */ |
| private final IgfsMetaManager meta; |
| |
| /** Data manager. */ |
| private final IgfsDataManager data; |
| |
| /** Logger. */ |
| private final IgniteLogger log; |
| |
| /** Lock. */ |
| private final Lock lock = new ReentrantLock(); |
| |
| /** Condition. */ |
| private final Condition cond = lock.newCondition(); |
| |
| /** Force worker to perform actual delete. */ |
| private boolean force; |
| |
| /** Cancellation flag. */ |
| private volatile boolean cancelled; |
| |
| /** |
| * Constructor. |
| * |
| * @param igfsCtx IGFS context. |
| */ |
| IgfsDeleteWorker(IgfsContext igfsCtx) { |
| super("igfs-delete-worker%" + igfsCtx.igfs().name() + "%" + igfsCtx.kernalContext().localNodeId() + "%"); |
| |
| this.igfsCtx = igfsCtx; |
| |
| meta = igfsCtx.meta(); |
| data = igfsCtx.data(); |
| |
| assert meta != null; |
| assert data != null; |
| |
| log = igfsCtx.kernalContext().log(IgfsDeleteWorker.class); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected void body() throws InterruptedException { |
| if (log.isDebugEnabled()) |
| log.debug("Delete worker started."); |
| |
| while (!cancelled) { |
| lock.lock(); |
| |
| try { |
| if (!cancelled && !force) |
| cond.await(FREQUENCY, TimeUnit.MILLISECONDS); |
| |
| force = false; // Reset force flag. |
| } |
| finally { |
| lock.unlock(); |
| } |
| |
| if (!cancelled) |
| delete(); |
| } |
| } |
| |
| /** |
| * Notify the worker that new entry to delete appeared. |
| */ |
| void signal() { |
| lock.lock(); |
| |
| try { |
| force = true; |
| |
| cond.signalAll(); |
| } |
| finally { |
| lock.unlock(); |
| } |
| } |
| |
| /** |
| * Cancels the worker. |
| */ |
| void cancel() { |
| cancelled = true; |
| |
| interrupt(); |
| } |
| |
| /** |
| * Perform cleanup of trash directories. |
| */ |
| private void delete() { |
| for (int i = 0; i < IgfsUtils.TRASH_CONCURRENCY; i++) |
| delete(IgfsUtils.trashId(i)); |
| } |
| |
| /** |
| * Perform cleanup of concrete trash directory. |
| * |
| * @param trashId Trash ID. |
| */ |
| private void delete(IgniteUuid trashId) { |
| IgfsEntryInfo info = null; |
| |
| try { |
| info = meta.info(trashId); |
| } |
| catch (ClusterTopologyServerNotFoundException ignore) { |
| // Ignore. |
| } |
| catch (IgniteCheckedException e) { |
| U.warn(log, "Cannot obtain trash directory info (is node stopping?)"); |
| |
| if (log.isDebugEnabled()) |
| U.error(log, "Cannot obtain trash directory info.", e); |
| } |
| |
| if (info != null) { |
| for (Map.Entry<String, IgfsListingEntry> entry : info.listing().entrySet()) { |
| IgniteUuid fileId = entry.getValue().fileId(); |
| |
| if (log.isDebugEnabled()) |
| log.debug("Deleting IGFS trash entry [name=" + entry.getKey() + ", fileId=" + fileId + ']'); |
| |
| try { |
| if (!cancelled) { |
| if (delete(trashId, entry.getKey(), fileId)) { |
| if (log.isDebugEnabled()) |
| log.debug("Sending delete confirmation message [name=" + entry.getKey() + |
| ", fileId=" + fileId + ']'); |
| } |
| } |
| else |
| break; |
| } |
| catch (IgniteInterruptedCheckedException ignored) { |
| // Ignore this exception while stopping. |
| } |
| catch (IgniteCheckedException e) { |
| U.error(log, "Failed to delete entry from the trash directory: " + entry.getKey(), e); |
| } |
| } |
| } |
| } |
| |
| /** |
| * Remove particular entry from the TRASH directory. |
| * |
| * @param trashId ID of the trash directory. |
| * @param name Entry name. |
| * @param id Entry ID. |
| * @return {@code True} in case the entry really was deleted form the file system by this call. |
| * @throws IgniteCheckedException If failed. |
| */ |
| private boolean delete(IgniteUuid trashId, String name, IgniteUuid id) throws IgniteCheckedException { |
| assert name != null; |
| assert id != null; |
| |
| while (true) { |
| IgfsEntryInfo info = meta.info(id); |
| |
| if (info != null) { |
| if (info.isDirectory()) { |
| if (!deleteDirectoryContents(trashId, id)) |
| return false; |
| |
| if (meta.delete(trashId, name, id)) |
| return true; |
| } |
| else { |
| assert info.isFile(); |
| |
| // Lock the file with special lock Id to prevent concurrent writing: |
| IgfsEntryInfo lockedInfo = meta.lock(id, true); |
| |
| if (lockedInfo == null) |
| return false; // File is locked, we cannot delete it. |
| |
| assert id.equals(lockedInfo.id()); |
| |
| // Delete file content first. |
| // In case this node crashes, other node will re-delete the file. |
| data.delete(lockedInfo).get(); |
| |
| boolean ret = meta.delete(trashId, name, id); |
| |
| if (ret) { |
| IgfsPath path = IgfsUtils.extractOriginalPathFromTrash(name); |
| |
| assert path != null; |
| |
| IgfsUtils.sendEvents(igfsCtx.kernalContext(), path, EVT_IGFS_FILE_PURGED); |
| } |
| |
| return ret; |
| } |
| } |
| else |
| return false; // Entry was deleted concurrently. |
| } |
| } |
| |
| /** |
| * Remove particular entry from the trash directory or subdirectory. |
| * |
| * @param parentId Parent ID. |
| * @param id Entry id. |
| * @return true iff all the items in the directory were deleted (directory is seen to be empty). |
| * @throws IgniteCheckedException If delete failed for some reason. |
| */ |
| private boolean deleteDirectoryContents(IgniteUuid parentId, final IgniteUuid id) throws IgniteCheckedException { |
| assert parentId != null; |
| assert id != null; |
| |
| while (true) { |
| IgfsEntryInfo info = meta.info(id); |
| |
| if (info != null) { |
| assert info.isDirectory(); |
| |
| final Map<String, IgfsListingEntry> listing = info.listing(); |
| |
| if (listing.isEmpty()) |
| return true; // Directory is empty. |
| |
| final Map<String, IgfsListingEntry> delListing = new HashMap<>(MAX_DELETE_BATCH, 1.0f); |
| |
| final GridCompoundFuture<Object, ?> fut = new GridCompoundFuture<>(); |
| |
| int failedFiles = 0; |
| |
| for (final Map.Entry<String, IgfsListingEntry> entry : listing.entrySet()) { |
| if (cancelled) |
| return false; |
| |
| if (entry.getValue().isDirectory()) { |
| if (deleteDirectoryContents(id, entry.getValue().fileId())) // *** Recursive call. |
| delListing.put(entry.getKey(), entry.getValue()); |
| else |
| failedFiles++; |
| } |
| else { |
| IgfsEntryInfo fileInfo = meta.info(entry.getValue().fileId()); |
| |
| if (fileInfo != null) { |
| assert fileInfo.isFile(); |
| |
| IgfsEntryInfo lockedInfo = meta.lock(fileInfo.id(), true); |
| |
| if (lockedInfo == null) |
| // File is already locked: |
| failedFiles++; |
| else { |
| assert IgfsUtils.DELETE_LOCK_ID.equals(lockedInfo.lockId()); |
| |
| fut.add(data.delete(lockedInfo)); |
| |
| delListing.put(entry.getKey(), entry.getValue()); |
| } |
| } |
| } |
| |
| if (delListing.size() == MAX_DELETE_BATCH) |
| break; |
| } |
| |
| fut.markInitialized(); |
| |
| // Wait for data cache to delete values before clearing meta cache. |
| try { |
| fut.get(); |
| } |
| catch (IgniteFutureCancelledCheckedException ignore) { |
| // This future can be cancelled only due to IGFS shutdown. |
| cancelled = true; |
| |
| return false; |
| } |
| |
| // Actual delete of folder content. |
| Collection<IgniteUuid> delIds = meta.delete(id, delListing); |
| |
| if (listing.size() == delIds.size()) |
| return true; // All entries were deleted. |
| |
| if (listing.size() == delListing.size() + failedFiles) |
| // All the files were tried, no reason to continue the loop: |
| return false; |
| } |
| else |
| return true; // Directory entry was deleted concurrently. |
| } |
| } |
| } |