blob: 9fb998f691e562f6018d53d180e7c7123d72d155 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.localtask;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cluster.ClusterState;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.GridProcessorAdapter;
import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointListener;
import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage;
import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetastorageLifecycleListener;
import org.apache.ignite.internal.processors.cache.persistence.metastorage.ReadOnlyMetastorage;
import org.apache.ignite.internal.processors.cache.persistence.metastorage.ReadWriteMetastorage;
import org.apache.ignite.internal.processors.cache.persistence.metastorage.pendingtask.DurableBackgroundTask;
import org.apache.ignite.internal.processors.cache.persistence.metastorage.pendingtask.DurableBackgroundTaskResult;
import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage;
import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessage;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.lang.IgniteThrowableConsumer;
import org.apache.ignite.internal.util.typedef.internal.CU;
import static org.apache.ignite.internal.processors.localtask.DurableBackgroundTaskState.State.COMPLETED;
import static org.apache.ignite.internal.processors.localtask.DurableBackgroundTaskState.State.INIT;
import static org.apache.ignite.internal.processors.localtask.DurableBackgroundTaskState.State.PREPARE;
import static org.apache.ignite.internal.processors.localtask.DurableBackgroundTaskState.State.STARTED;
/**
* Processor that is responsible for durable background tasks that are executed on local node.
*/
public class DurableBackgroundTasksProcessor extends GridProcessorAdapter implements MetastorageLifecycleListener,
CheckpointListener {
/** Prefix for metastorage keys for durable background tasks. */
private static final String TASK_PREFIX = "durable-background-task-";
/** MetaStorage synchronization mutex. */
private final Object metaStorageMux = new Object();
/** Current tasks. Mapping: {@link DurableBackgroundTask#name task name} -> task state. */
private final ConcurrentMap<String, DurableBackgroundTaskState> tasks = new ConcurrentHashMap<>();
/** Lock for canceling tasks. */
private final ReadWriteLock cancelLock = new ReentrantReadWriteLock(true);
/**
* Tasks to be removed from the MetaStorage after the end of a checkpoint.
* Mapping: {@link DurableBackgroundTask#name task name} -> task.
*/
private final ConcurrentMap<String, DurableBackgroundTask> toRmv = new ConcurrentHashMap<>();
/** Prohibiting the execution of tasks. */
private volatile boolean prohibitionExecTasks = true;
/**
* Constructor.
*
* @param ctx Kernal context.
*/
public DurableBackgroundTasksProcessor(GridKernalContext ctx) {
super(ctx);
}
/** {@inheritDoc} */
@Override public void start() throws IgniteCheckedException {
ctx.internalSubscriptionProcessor().registerMetastorageListener(this);
}
/** {@inheritDoc} */
@Override public void onKernalStop(boolean cancel) {
cancelTasks();
}
/** {@inheritDoc} */
@Override public void onReadyForRead(ReadOnlyMetastorage metastorage) {
metaStorageOperation(metaStorage -> {
assert metaStorage != null;
metaStorage.iterate(
TASK_PREFIX,
(k, v) -> {
DurableBackgroundTask t = (DurableBackgroundTask)v;
tasks.put(t.name(), new DurableBackgroundTaskState(t, null, true));
},
true
);
});
}
/** {@inheritDoc} */
@Override public void onReadyForReadWrite(ReadWriteMetastorage metastorage) {
((GridCacheDatabaseSharedManager)ctx.cache().context().database()).addCheckpointListener(this);
}
/** {@inheritDoc} */
@Override public void beforeCheckpointBegin(Context ctx) {
/* No op. */
}
/** {@inheritDoc} */
@Override public void onMarkCheckpointBegin(Context ctx) {
for (Iterator<DurableBackgroundTaskState> it = tasks.values().iterator(); it.hasNext(); ) {
DurableBackgroundTaskState taskState = it.next();
if (taskState.state() == COMPLETED) {
assert taskState.saved();
DurableBackgroundTask t = taskState.task();
toRmv.put(t.name(), t);
it.remove();
}
}
}
/** {@inheritDoc} */
@Override public void onCheckpointBegin(Context ctx) {
/* No op. */
}
/** {@inheritDoc} */
@Override public void afterCheckpointEnd(Context ctx) {
for (Iterator<DurableBackgroundTask> it = toRmv.values().iterator(); it.hasNext(); ) {
DurableBackgroundTask t = it.next();
metaStorageOperation(metaStorage -> {
if (metaStorage != null && toRmv.containsKey(t.name()))
metaStorage.remove(metaStorageKey(t));
});
it.remove();
}
}
/**
* Callback at the start of a global state change.
*
* @param msg Message for change cluster global state.
*/
public void onStateChangeStarted(ChangeGlobalStateMessage msg) {
if (msg.state() == ClusterState.INACTIVE)
cancelTasks();
}
/**
* Callback on finish of a global state change.
*
* @param msg Finish message for change cluster global state.
*/
public void onStateChangeFinish(ChangeGlobalStateFinishMessage msg) {
if (msg.state() != ClusterState.INACTIVE) {
prohibitionExecTasks = false;
for (DurableBackgroundTaskState taskState : tasks.values()) {
if (!prohibitionExecTasks)
executeAsync0(taskState.task());
}
}
}
/**
* Asynchronous execution of a durable background task.
*
* A new task will be added for execution either if there is no task with
* the same {@link DurableBackgroundTask#name name} or it (previous) will be completed.
*
* If the task is required to be completed after restarting the node,
* then it must be saved to the MetaStorage.
*
* If the task is saved to the Metastorage, then it will be deleted from it
* only after its completion and at the end of the checkpoint. Otherwise, it
* will be removed as soon as it is completed.
*
* @param task Durable background task.
* @param save Save task to MetaStorage.
* @return Futures that will complete when the task is completed.
*/
public IgniteInternalFuture<Void> executeAsync(DurableBackgroundTask task, boolean save) {
DurableBackgroundTaskState taskState = tasks.compute(task.name(), (taskName, prev) -> {
if (prev != null && prev.state() != COMPLETED)
throw new IllegalArgumentException("Task is already present and has not been completed: " + taskName);
if (save)
toRmv.remove(taskName);
return new DurableBackgroundTaskState(task, new GridFutureAdapter<>(), save);
});
if (save) {
metaStorageOperation(metaStorage -> {
if (metaStorage != null)
metaStorage.write(metaStorageKey(task), task);
});
}
if (!prohibitionExecTasks)
executeAsync0(task);
return taskState.outFuture();
}
/**
* Overloading the {@link #executeAsync(DurableBackgroundTask, boolean)}.
* If task is applied to persistent cache, saves it to MetaStorage.
*
* @param t Durable background task.
* @param cacheCfg Cache configuration.
* @return Futures that will complete when the task is completed.
*/
public IgniteInternalFuture<Void> executeAsync(DurableBackgroundTask t, CacheConfiguration cacheCfg) {
return executeAsync(t, CU.isPersistentCache(cacheCfg, ctx.config().getDataStorageConfiguration()));
}
/**
* Asynchronous execution of a durable background task.
*
* @param t Durable background task.
*/
private void executeAsync0(DurableBackgroundTask t) {
cancelLock.readLock().lock();
try {
DurableBackgroundTaskState taskState = tasks.get(t.name());
if (taskState != null && taskState.state(INIT, PREPARE)) {
if (log.isInfoEnabled())
log.info("Executing durable background task: " + t.name());
t.executeAsync(ctx).listen(f -> {
DurableBackgroundTaskResult res = null;
try {
res = f.get();
}
catch (Throwable e) {
log.error("Task completed with an error: " + t.name(), e);
}
assert res != null;
if (res.error() != null)
log.error("Could not execute durable background task: " + t.name(), res.error());
if (res.completed()) {
if (res.error() == null && log.isInfoEnabled())
log.info("Execution of durable background task completed: " + t.name());
if (taskState.saved())
taskState.state(COMPLETED);
else
tasks.remove(t.name());
GridFutureAdapter<Void> outFut = taskState.outFuture();
if (outFut != null)
outFut.onDone(res.error());
}
else {
assert res.restart();
if (log.isInfoEnabled())
log.info("Execution of durable background task will be restarted: " + t.name());
taskState.state(INIT);
}
});
taskState.state(PREPARE, STARTED);
}
}
finally {
cancelLock.readLock().unlock();
}
}
/**
* Canceling tasks that are currently being executed.
* Prohibiting the execution of tasks.
*/
private void cancelTasks() {
prohibitionExecTasks = true;
cancelLock.writeLock().lock();
try {
for (DurableBackgroundTaskState taskState : tasks.values()) {
if (taskState.state() == STARTED)
taskState.task().cancel();
}
}
finally {
cancelLock.writeLock().unlock();
}
}
/**
* Performing an operation on a {@link MetaStorage}.
* Guarded by {@link #metaStorageMux}.
*
* @param consumer MetaStorage operation, argument can be {@code null}.
* @throws IgniteException If an exception is thrown from the {@code consumer}.
*/
private void metaStorageOperation(IgniteThrowableConsumer<MetaStorage> consumer) throws IgniteException {
synchronized (metaStorageMux) {
IgniteCacheDatabaseSharedManager dbMgr = ctx.cache().context().database();
dbMgr.checkpointReadLock();
try {
MetaStorage metaStorage = dbMgr.metaStorage();
consumer.accept(metaStorage);
}
catch (IgniteCheckedException e) {
throw new IgniteException(e);
}
finally {
dbMgr.checkpointReadUnlock();
}
}
}
/**
* Getting the task key for the MetaStorage.
*
* @param t Durable background task.
* @return MetaStorage {@code t} key.
*/
static String metaStorageKey(DurableBackgroundTask t) {
return TASK_PREFIX + t.name();
}
}