blob: 88f32e9ba3dae4b4f31f464e0b4101124d565455 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.maintenance;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.GridProcessorAdapter;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.maintenance.MaintenanceAction;
import org.apache.ignite.maintenance.MaintenanceRegistry;
import org.apache.ignite.maintenance.MaintenanceTask;
import org.apache.ignite.maintenance.MaintenanceWorkflowCallback;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
/** */
public class MaintenanceProcessor extends GridProcessorAdapter implements MaintenanceRegistry {
/** */
private static final String DISABLED_ERR_MSG = "Maintenance Mode is not supported for in-memory and client nodes";
/**
* Active {@link MaintenanceTask}s are the ones that were read from disk when node entered Maintenance Mode.
*/
private final Map<String, MaintenanceTask> activeTasks = new ConcurrentHashMap<>();
/**
* Requested {@link MaintenanceTask}s are collection of tasks requested by user
* or other components when node operates normally (not in Maintenance Mode).
*/
private final Map<String, MaintenanceTask> requestedTasks = new ConcurrentHashMap<>();
/** */
private final Map<String, MaintenanceWorkflowCallback> workflowCallbacks = new ConcurrentHashMap<>();
/** */
private final MaintenanceFileStore fileStorage;
/** */
private final boolean disabled;
/** */
private volatile boolean maintenanceMode;
/**
* @param ctx Kernal context.
*/
public MaintenanceProcessor(GridKernalContext ctx) {
super(ctx);
disabled = !CU.isPersistenceEnabled(ctx.config()) || ctx.clientNode();
if (disabled) {
fileStorage = new MaintenanceFileStore(true,
null,
null,
null);
return;
}
fileStorage = new MaintenanceFileStore(false,
ctx.pdsFolderResolver(),
ctx.config().getDataStorageConfiguration().getFileIOFactory(),
log);
}
/** {@inheritDoc} */
@Override public @Nullable MaintenanceTask registerMaintenanceTask(MaintenanceTask task) throws IgniteCheckedException {
if (disabled)
throw new IgniteCheckedException(DISABLED_ERR_MSG);
if (isMaintenanceMode())
throw new IgniteCheckedException("Node is already in Maintenance Mode, " +
"registering additional maintenance task is not allowed in Maintenance Mode.");
MaintenanceTask oldTask = requestedTasks.put(task.name(), task);
if (oldTask != null) {
log.info(
"Maintenance Task with name " + task.name() +
" is already registered" +
(oldTask.parameters() != null ? " with parameters " + oldTask.parameters() : ".") +
" It will be replaced with new task" +
task.parameters() != null ? " with parameters " + task.parameters() : "" + "."
);
}
try {
fileStorage.writeMaintenanceTask(task);
}
catch (IOException e) {
throw new IgniteCheckedException("Failed to register maintenance task " + task, e);
}
return oldTask;
}
/** {@inheritDoc} */
@Override public void stop(boolean cancel) throws IgniteCheckedException {
try {
fileStorage.stop();
}
catch (IOException e) {
log.warning("Failed to free maintenance file resources", e);
}
}
/** {@inheritDoc} */
@Override public void start() throws IgniteCheckedException {
if (disabled)
return;
try {
fileStorage.init();
activeTasks.putAll(fileStorage.getAllTasks());
maintenanceMode = !activeTasks.isEmpty();
}
catch (Throwable t) {
log.warning("Caught exception when starting MaintenanceProcessor," +
" maintenance mode won't be entered", t);
activeTasks.clear();
fileStorage.clear();
}
}
/** {@inheritDoc} */
@Override public void prepareAndExecuteMaintenance() {
if (isMaintenanceMode()) {
workflowCallbacks.entrySet().removeIf(cbE ->
{
if (!cbE.getValue().shouldProceedWithMaintenance()) {
unregisterMaintenanceTask(cbE.getKey());
return true;
}
return false;
}
);
}
if (!workflowCallbacks.isEmpty()) {
if (log.isInfoEnabled()) {
String mntcTasksNames = String.join(", ", workflowCallbacks.keySet());
log.info("Node requires maintenance, non-empty set of maintenance tasks is found: [" +
mntcTasksNames + ']');
}
proceedWithMaintenance();
}
else if (isMaintenanceMode()) {
if (log.isInfoEnabled()) {
log.info("All maintenance tasks are fixed, no need to enter maintenance mode. " +
"Restart the node to get it back to normal operations.");
}
}
}
/**
* Handles all {@link MaintenanceTask maintenance tasks} left
* after {@link MaintenanceRegistry#prepareAndExecuteMaintenance()} check.
*
* If a task defines an action that should be started automatically (e.g. defragmentation starts automatically,
* no additional confirmation from user is required), it is executed.
*
* Otherwise waits for user to trigger actions for maintenance tasks.
*/
private void proceedWithMaintenance() {
for (Map.Entry<String, MaintenanceWorkflowCallback> cbE : workflowCallbacks.entrySet()) {
MaintenanceAction<?> mntcAct = cbE.getValue().automaticAction();
if (mntcAct != null) {
try {
mntcAct.execute();
}
catch (Throwable t) {
log.warning("Failed to execute automatic action for maintenance task: " +
activeTasks.get(cbE.getKey()), t);
throw t;
}
}
}
}
/** {@inheritDoc} */
@Override public @Nullable MaintenanceTask activeMaintenanceTask(String maitenanceTaskName) {
return activeTasks.get(maitenanceTaskName);
}
/** {@inheritDoc} */
@Override public boolean isMaintenanceMode() {
return maintenanceMode;
}
/** {@inheritDoc} */
@Override public boolean unregisterMaintenanceTask(String maintenanceTaskName) {
if (disabled)
return false;
boolean deleted;
if (isMaintenanceMode())
deleted = activeTasks.remove(maintenanceTaskName) != null;
else
deleted = requestedTasks.remove(maintenanceTaskName) != null;
try {
fileStorage.deleteMaintenanceTask(maintenanceTaskName);
}
catch (IOException e) {
log.warning("Failed to clear maintenance task with name "
+ maintenanceTaskName
+ " from file, whole file will be deleted", e
);
fileStorage.clear();
}
return deleted;
}
/** {@inheritDoc} */
@Override public void registerWorkflowCallback(@NotNull String maintenanceTaskName, @NotNull MaintenanceWorkflowCallback cb) {
if (disabled)
throw new IgniteException(DISABLED_ERR_MSG);
List<MaintenanceAction<?>> actions = cb.allActions();
if (actions == null || actions.isEmpty())
throw new IgniteException("Maintenance workflow callback should provide at least one maintenance action");
int size = actions.size();
long distinctSize = actions.stream().map(MaintenanceAction::name).distinct().count();
if (distinctSize < size)
throw new IgniteException("All actions of a single workflow should have unique names: " +
actions.stream().map(MaintenanceAction::name).collect(Collectors.joining(", ")));
Optional<String> wrongActionName = actions
.stream()
.map(MaintenanceAction::name)
.filter(name -> !U.alphanumericUnderscore(name))
.findFirst();
if (wrongActionName.isPresent())
throw new IgniteException(
"All actions' names should contain only alphanumeric and underscore symbols: "
+ wrongActionName.get());
workflowCallbacks.put(maintenanceTaskName, cb);
}
/** {@inheritDoc} */
@Override public List<MaintenanceAction<?>> actionsForMaintenanceTask(String maintenanceTaskName) {
if (disabled)
throw new IgniteException(DISABLED_ERR_MSG);
if (!activeTasks.containsKey(maintenanceTaskName))
throw new IgniteException("Maintenance workflow callback for given task name not found, " +
"cannot retrieve maintenance actions for it: " + maintenanceTaskName);
return workflowCallbacks.get(maintenanceTaskName).allActions();
}
}