blob: 6b53a10dbd01ec9aea5410ac07e128de7a288f92 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.visor.persistence;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.internal.pagemem.store.IgnitePageStoreManager;
import org.apache.ignite.internal.processors.cache.CacheGroupDescriptor;
import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
import org.apache.ignite.internal.processors.cache.GridCacheProcessor;
import org.apache.ignite.internal.processors.cache.persistence.CheckCorruptedCacheStoresCleanAction;
import org.apache.ignite.internal.processors.cache.persistence.CleanCacheStoresMaintenanceAction;
import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
import org.apache.ignite.internal.processors.task.GridInternal;
import org.apache.ignite.internal.processors.task.GridVisorManagementTask;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.visor.VisorJob;
import org.apache.ignite.internal.visor.VisorOneNodeTask;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.maintenance.MaintenanceAction;
import org.apache.ignite.maintenance.MaintenanceRegistry;
import org.apache.ignite.maintenance.MaintenanceTask;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.CORRUPTED_DATA_FILES_MNTC_TASK_NAME;
import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.cacheDirName;
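/**
 * Visor task that reports on, cleans, or backs up persistent cache files of a node.
 * The job only performs these operations when the node's maintenance registry reports maintenance mode.
 */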
@GridInternal
@GridVisorManagementTask
public class PersistenceTask extends VisorOneNodeTask<PersistenceTaskArg, PersistenceTaskResult> {
/** */
private static final long serialVersionUID = 0L;
/** Prefix prepended to a cache directory name to form the name of its backup directory. */
private static final String BACKUP_FOLDER_PREFIX = "backup_";
/**
 * Creates the job that carries out the requested persistence operation.
 *
 * @param arg Task argument, handed over to the job unchanged.
 * @return New {@link PersistenceJob} built from {@code arg} and the {@code debug} flag.
 */
@Override protected VisorJob<PersistenceTaskArg, PersistenceTaskResult> job(PersistenceTaskArg arg) {
    PersistenceJob persistenceJob = new PersistenceJob(arg, debug);

    return persistenceJob;
}
/** Job that executes the requested persistence operation (info, clean or backup). */
private static class PersistenceJob extends VisorJob<PersistenceTaskArg, PersistenceTaskResult> {
/** */
private static final long serialVersionUID = 0L;
/**
 * Creates a job with the specified argument. Both values are passed straight to the parent
 * {@link VisorJob} constructor.
 *
 * @param arg Job argument; declared nullable.
 * @param debug Flag indicating whether debug information should be printed into node log.
 */
protected PersistenceJob(@Nullable PersistenceTaskArg arg, boolean debug) {
super(arg, debug);
}
/**
 * Executes the persistence operation selected by the argument.
 * <p>
 * When the node is not in maintenance mode, a result constructed with {@code false} is returned and no
 * operation is performed. Otherwise {@code CLEAN} and {@code BACKUP} are dispatched to their handlers and
 * every other operation produces the informational report.
 *
 * @param arg Job argument; when it (or its operation) is {@code null} the informational report is returned.
 * @return Result of the executed operation.
 * @throws IgniteException If the selected operation fails.
 */
@Override protected PersistenceTaskResult run(@Nullable PersistenceTaskArg arg) throws IgniteException {
    if (!ignite.context().maintenanceRegistry().isMaintenanceMode())
        return new PersistenceTaskResult(false);

    // The argument is declared nullable. Without an operation there is nothing to clean or back up,
    // so fall back to the read-only report instead of failing with a NullPointerException.
    if (arg == null || arg.operation() == null)
        return info();

    switch (arg.operation()) {
        case CLEAN:
            return clean(arg);

        case BACKUP:
            return backup(arg);

        default:
            return info();
    }
}
/**
 * Dispatches a backup request according to the type selected in the clean-and-backup settings:
 * all caches, the directories listed by the corrupted-data-files maintenance task, or (for any other
 * type) the directories of the explicitly named caches.
 *
 * @param arg Task argument holding the clean-and-backup settings.
 * @return Result produced by the selected backup routine.
 */
private PersistenceTaskResult backup(PersistenceTaskArg arg) {
    PersistenceCleanAndBackupSettings settings = arg.cleanAndBackupSettings();

    MaintenanceTask corruptedTask = ignite.context().maintenanceRegistry()
        .activeMaintenanceTask(CORRUPTED_DATA_FILES_MNTC_TASK_NAME);

    FilePageStoreManager pageStoreMgr = (FilePageStoreManager)ignite.context().cache().context().pageStore();

    File storeRoot = pageStoreMgr.workDir();

    List<String> dirsToBackup;

    switch (settings.cleanAndBackupType()) {
        case ALL:
            return backupAll(storeRoot);

        case CORRUPTED:
            dirsToBackup = corruptedCacheDirectories(corruptedTask);

            break;

        default:
            dirsToBackup = cacheDirectoriesFromCacheNames(settings.cacheNames());
    }

    return backupCaches(storeRoot, dirsToBackup);
}
/**
 * Backs up the directory of every cache known to the cache processor.
 * Directory names are collected in descriptor iteration order and each name is kept only once.
 *
 * @param workDir Directory that contains the cache directories and receives the backup directories.
 * @return Result of backing up the collected directories.
 */
private PersistenceTaskResult backupAll(File workDir) {
    List<String> uniqueDirs = new ArrayList<>();

    for (DynamicCacheDescriptor cacheDesc : ignite.context().cache().cacheDescriptors().values()) {
        String dirName = cacheDirName(cacheDesc.cacheConfiguration());

        // Several descriptors may map to the same directory name; keep the first occurrence only.
        if (!uniqueDirs.contains(dirName))
            uniqueDirs.add(dirName);
    }

    return backupCaches(workDir, uniqueDirs);
}
/**
 * Copies each given cache directory into a sibling directory named {@code BACKUP_FOLDER_PREFIX + dir}.
 * <p>
 * A cache whose backup directory already exists is skipped and appears in neither list of the result.
 * When a backup fails, the cause is logged and the partially written backup directory is removed, so
 * that a later attempt is not skipped because of the leftover directory.
 *
 * @param workDir Directory that contains the cache directories and receives the backup directories.
 * @param cacheDirs Names of cache directories (relative to {@code workDir}) to back up.
 * @return Result whose handled caches are the names of the created backup directories and whose failed
 *      caches are the names of the source directories that could not be backed up.
 */
private PersistenceTaskResult backupCaches(File workDir, List<String> cacheDirs) {
    PersistenceTaskResult res = new PersistenceTaskResult(true);

    List<String> backupCompletedCaches = new ArrayList<>();
    List<String> backupFailedCaches = new ArrayList<>();

    for (String dir : cacheDirs) {
        String backupDirName = BACKUP_FOLDER_PREFIX + dir;

        File backupDir = new File(workDir, backupDirName);

        // Never overwrite a backup that is already there.
        if (backupDir.exists())
            continue;

        try {
            U.ensureDirectory(backupDir, backupDirName, null);

            copyCacheFiles(workDir.toPath().resolve(dir).toFile(), backupDir);

            backupCompletedCaches.add(backupDirName);
        }
        catch (IgniteCheckedException | IOException e) {
            U.error(ignite.log(), "Failed to back up cache directory [dir=" + dir +
                ", backupDir=" + backupDir + ']', e);

            // The directory did not exist before this attempt, so everything in it is an incomplete copy.
            // Leaving it behind would make the next backup request silently skip this cache.
            U.delete(backupDir);

            backupFailedCaches.add(dir);
        }
    }

    res.handledCaches(backupCompletedCaches);
    res.failedCaches(backupFailedCaches);

    return res;
}
/**
 * Copies every entry found directly inside {@code sourceDir} into {@code backupDir}, replacing entries
 * of the same name that already exist there.
 *
 * @param sourceDir Cache directory to copy from.
 * @param backupDir Existing directory to copy into.
 * @throws IOException If {@code sourceDir} cannot be listed (it is not a directory or an I/O error
 *      occurred) or if copying an entry fails.
 */
private void copyCacheFiles(File sourceDir, File backupDir) throws IOException {
    File[] files = sourceDir.listFiles();

    // File.listFiles() signals "not a directory" and I/O errors with null rather than an exception.
    if (files == null)
        throw new IOException("Failed to list files of cache directory " +
            "(it is not a directory or an I/O error occurred): " + sourceDir);

    for (File f : files)
        Files.copy(f.toPath(), backupDir.toPath().resolve(f.getName()), StandardCopyOption.REPLACE_EXISTING);
}
/**
 * Dispatches a clean request according to the type selected in the clean-and-backup settings.
 *
 * @param arg Task argument holding the clean-and-backup settings.
 * @return Result of the selected clean routine, or a default-constructed result when the type is none
 *      of {@code ALL}, {@code CORRUPTED} or {@code CACHES}.
 */
private PersistenceTaskResult clean(PersistenceTaskArg arg) {
    PersistenceCleanAndBackupSettings settings = arg.cleanAndBackupSettings();

    GridCacheProcessor caches = ignite.context().cache();
    MaintenanceRegistry registry = ignite.context().maintenanceRegistry();

    switch (settings.cleanAndBackupType()) {
        case ALL:
            return cleanAll(caches, registry);

        case CORRUPTED:
            return cleanCorrupted(registry);

        case CACHES:
            return cleanCaches(caches, registry, settings.cacheNames());

        default:
            return new PersistenceTaskResult();
    }
}
/**
 * Cleans the persistent files of the explicitly named caches.
 * <p>
 * All names are validated before anything is cleaned. Named caches that are not persistent are skipped.
 * After cleaning, the {@link CheckCorruptedCacheStoresCleanAction} of the corrupted-data-files
 * maintenance task (if present) is executed, and the task is unregistered when the action reports
 * {@code true}.
 *
 * @param cacheProc Cache processor used to resolve cache descriptors and the page store.
 * @param mntcReg Maintenance registry.
 * @param cacheNames Names of caches to clean.
 * @return Result whose handled caches are the directory names of the cleaned caches and whose failed
 *      caches (set only when non-empty) are the names of caches that could not be cleaned.
 * @throws IllegalArgumentException If any of the named caches does not exist; nothing is cleaned then.
 */
private PersistenceTaskResult cleanCaches(
    GridCacheProcessor cacheProc,
    MaintenanceRegistry mntcReg,
    List<String> cacheNames
) {
    // Validate the whole request up front so that a typo in one name does not leave it half-applied.
    for (String name : cacheNames) {
        if (cacheProc.cacheDescriptor(name) == null)
            throw new IllegalArgumentException("Cache with name " + name +
                " not found, no caches will be cleaned.");
    }

    PersistenceTaskResult res = new PersistenceTaskResult(true);

    List<String> cleanedCaches = new ArrayList<>();
    List<String> failedToCleanCaches = new ArrayList<>();

    DataStorageConfiguration dsCfg = ignite.context().config().getDataStorageConfiguration();
    IgnitePageStoreManager pageStore = cacheProc.context().pageStore();

    for (String name : cacheNames) {
        DynamicCacheDescriptor cacheDescr = cacheProc.cacheDescriptor(name);

        if (!CU.isPersistentCache(cacheDescr.cacheConfiguration(), dsCfg))
            continue;

        try {
            pageStore.cleanupPersistentSpace(cacheDescr.cacheConfiguration());

            cleanedCaches.add(cacheDirName(cacheDescr.cacheConfiguration()));
        }
        catch (IgniteCheckedException e) {
            // Keep going with the remaining caches, but do not lose the reason of the failure.
            U.error(ignite.log(), "Failed to clean persistent files of cache: " + name, e);

            failedToCleanCaches.add(name);
        }
    }

    res.handledCaches(cleanedCaches);

    if (!failedToCleanCaches.isEmpty())
        res.failedCaches(failedToCleanCaches);

    for (MaintenanceAction<?> action : mntcReg.actionsForMaintenanceTask(CORRUPTED_DATA_FILES_MNTC_TASK_NAME)) {
        if (!action.name().equals(CheckCorruptedCacheStoresCleanAction.ACTION_NAME))
            continue;

        // Boolean.TRUE.equals(...) avoids both an unchecked cast and an NPE on a null result.
        boolean mntcTaskCompleted = Boolean.TRUE.equals(action.execute());

        res.maintenanceTaskCompleted(mntcTaskCompleted);

        if (mntcTaskCompleted)
            mntcReg.unregisterMaintenanceTask(CORRUPTED_DATA_FILES_MNTC_TASK_NAME);

        // Only the first matching action is executed.
        break;
    }

    return res;
}
/**
 * Cleans the directories of all caches and unregisters the corrupted-data-files maintenance task.
 *
 * @param cacheProc Cache processor that performs the cleanup.
 * @param mntcReg Maintenance registry.
 * @return Result marked as maintenance-task-completed, listing each affected cache directory once.
 * @throws IgniteException If cleaning the cache directories fails (converted from the checked exception).
 */
private PersistenceTaskResult cleanAll(GridCacheProcessor cacheProc, MaintenanceRegistry mntcReg) {
    PersistenceTaskResult res = new PersistenceTaskResult(true);

    // The names are collected before the cleanup. Several caches may map to the same directory, so
    // de-duplicate the names (as backupAll does) to report every directory once.
    List<String> allCacheDirs = cacheProc.cacheDescriptors()
        .values()
        .stream()
        .map(desc -> cacheDirName(desc.cacheConfiguration()))
        .distinct()
        .collect(Collectors.toList());

    try {
        cacheProc.cleanupCachesDirectories();
    }
    catch (IgniteCheckedException e) {
        throw U.convertException(e);
    }

    mntcReg.unregisterMaintenanceTask(CORRUPTED_DATA_FILES_MNTC_TASK_NAME);

    res.maintenanceTaskCompleted(true);
    res.handledCaches(allCacheDirs);

    return res;
}
/**
 * Runs the {@link CleanCacheStoresMaintenanceAction} registered for the corrupted-data-files maintenance
 * task, then unregisters that task.
 *
 * @param mntcReg Maintenance registry.
 * @return Result listing the directories taken from the task parameters and marked as
 *      maintenance-task-completed; when no such action is registered, the result carries neither.
 */
private PersistenceTaskResult cleanCorrupted(MaintenanceRegistry mntcReg) {
    PersistenceTaskResult result = new PersistenceTaskResult(true);

    MaintenanceAction<?> cleanAction = null;

    for (MaintenanceAction<?> candidate : mntcReg.actionsForMaintenanceTask(CORRUPTED_DATA_FILES_MNTC_TASK_NAME)) {
        if (candidate.name().equals(CleanCacheStoresMaintenanceAction.ACTION_NAME)) {
            cleanAction = candidate;

            break;
        }
    }

    // Nothing to execute: hand back the bare result.
    if (cleanAction == null)
        return result;

    cleanAction.execute();

    // Fetch the task before it is unregistered; its parameters are read afterwards.
    MaintenanceTask taskBeforeUnregister = mntcReg.activeMaintenanceTask(CORRUPTED_DATA_FILES_MNTC_TASK_NAME);

    mntcReg.unregisterMaintenanceTask(CORRUPTED_DATA_FILES_MNTC_TASK_NAME);

    List<String> cleanedDirs = corruptedCacheDirectories(taskBeforeUnregister);

    result.handledCaches(cleanedDirs);
    result.maintenanceTaskCompleted(true);

    return result;
}
/**
 * Builds the informational report: for every persistent cache that has a group descriptor, a pair of
 * flags {@code (globalWalEnabled, localWalEnabled)} keyed by cache name.
 * <p>
 * The first flag is the group's WAL flag. The second is {@code false} only when the first is
 * {@code true} and the cache's directory is listed in the corrupted-data-files maintenance task.
 *
 * @return Result carrying the per-cache information.
 */
private PersistenceTaskResult info() {
    PersistenceTaskResult res = new PersistenceTaskResult(true);

    GridCacheProcessor cacheProc = ignite.context().cache();
    DataStorageConfiguration dsCfg = ignite.context().config().getDataStorageConfiguration();

    MaintenanceTask corruptedTask = ignite.context().maintenanceRegistry()
        .activeMaintenanceTask(CORRUPTED_DATA_FILES_MNTC_TASK_NAME);

    List<String> corruptedCacheDirs;

    // The corrupted-files task may be absent; in that case no cache is reported as corrupted.
    if (corruptedTask != null)
        corruptedCacheDirs = corruptedCacheDirectories(corruptedTask);
    else
        corruptedCacheDirs = new ArrayList<>();

    Map<String, IgniteBiTuple<Boolean, Boolean>> cachesInfo = new HashMap<>();

    for (DynamicCacheDescriptor desc : cacheProc.cacheDescriptors().values()) {
        if (!CU.isPersistentCache(desc.cacheConfiguration(), dsCfg))
            continue;

        CacheGroupDescriptor grpDesc = desc.groupDescriptor();

        if (grpDesc != null) {
            boolean globalWalEnabled = grpDesc.walEnabled();
            boolean localWalEnabled = true;

            // The task parameters hold directory names (the backup path resolves them under the work
            // directory), so match on the cache's directory name rather than on its cache name.
            if (globalWalEnabled && corruptedCacheDirs.contains(cacheDirName(desc.cacheConfiguration())))
                localWalEnabled = false;

            cachesInfo.put(desc.cacheName(), new IgniteBiTuple<>(globalWalEnabled, localWalEnabled));
        }
    }

    res.cachesInfo(cachesInfo);

    return res;
}
/**
 * Extracts directory names from the parameters of a maintenance task by splitting them on
 * {@link File#separator}.
 *
 * @param task Maintenance task; may be {@code null}.
 * @return Names listed in the task parameters, or an empty list when the task is {@code null} or its
 *      parameters are {@code null} or empty.
 */
private List<String> corruptedCacheDirectories(MaintenanceTask task) {
    String params = task == null ? null : task.parameters();

    // Splitting an empty string would yield a single empty name, which callers would then resolve to
    // the work directory itself; treat "no parameters" as "no directories".
    if (params == null || params.isEmpty())
        return new ArrayList<>();

    String[] namesArr = params.split(Pattern.quote(File.separator));

    return Arrays.asList(namesArr);
}
/**
 * Translates cache names into the distinct directory names of those caches that are persistent.
 * <p>
 * Every name is checked first. If at least one cache does not exist, the call fails and the message
 * names the last missing cache in the list.
 *
 * @param cacheNames Cache names to translate.
 * @return Directory names of the persistent caches among {@code cacheNames}, each listed once, in
 *      order of first occurrence.
 * @throws IllegalArgumentException If any of the named caches does not exist.
 */
private List<String> cacheDirectoriesFromCacheNames(List<String> cacheNames) {
    GridCacheProcessor caches = ignite.context().cache();
    DataStorageConfiguration storageCfg = ignite.configuration().getDataStorageConfiguration();

    boolean anyMissing = false;
    String lastMissing = null;

    // Walk the complete list: the reported name is the last one that could not be resolved.
    for (String cacheName : cacheNames) {
        if (caches.cacheDescriptor(cacheName) == null) {
            anyMissing = true;
            lastMissing = cacheName;
        }
    }

    if (anyMissing)
        throw new IllegalArgumentException("Cache with name " + lastMissing +
            " not found, no caches will be backed up.");

    List<String> dirs = new ArrayList<>();

    for (String cacheName : cacheNames) {
        DynamicCacheDescriptor desc = caches.cacheDescriptor(cacheName);

        if (desc == null || !CU.isPersistentCache(desc.cacheConfiguration(), storageCfg))
            continue;

        String dirName = cacheDirName(desc.cacheConfiguration());

        if (!dirs.contains(dirName))
            dirs.add(dirName);
    }

    return dirs;
}
}
}