blob: dfbab72a5d0bd7e9439c769a2f4af9361c973926 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.localtask;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cluster.ClusterState;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.client.util.GridConcurrentHashSet;
import org.apache.ignite.internal.processors.GridProcessorAdapter;
import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointListener;
import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetastorageLifecycleListener;
import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetastorageTree;
import org.apache.ignite.internal.processors.cache.persistence.metastorage.ReadOnlyMetastorage;
import org.apache.ignite.internal.processors.cache.persistence.metastorage.ReadWriteMetastorage;
import org.apache.ignite.internal.processors.cache.persistence.metastorage.pendingtask.DurableBackgroundTask;
import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage;
import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessage;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.worker.GridWorker;
import org.apache.ignite.thread.IgniteThread;
import static org.apache.ignite.internal.util.IgniteUtils.awaitForWorkersStop;
/**
* Processor that is responsible for durable background tasks that are executed on local node
* and should be continued even after node restart.
*/
public class DurableBackgroundTasksProcessor extends GridProcessorAdapter implements MetastorageLifecycleListener,
CheckpointListener {
/** Prefix for metastorage keys for durable background tasks. */
private static final String STORE_DURABLE_BACKGROUND_TASK_PREFIX = "durable-background-task-";
/** Metastorage. */
private volatile ReadWriteMetastorage metastorage;
/** Metastorage synchronization mutex. */
private final Object metaStorageMux = new Object();
/** Set of workers that executing durable background tasks. */
private final Set<GridWorker> asyncDurableBackgroundTaskWorkers = new GridConcurrentHashSet<>();
/** Count of workers that executing durable background tasks. */
private final AtomicInteger asyncDurableBackgroundTasksWorkersCntr = new AtomicInteger(0);
/** Durable background tasks map. */
private final ConcurrentHashMap<String, DurableBackgroundTask> durableBackgroundTasks = new ConcurrentHashMap<>();
/** Set of started tasks' names. */
private final Set<String> startedTasks = new GridConcurrentHashSet<>();
/**
* Ban to start new tasks. The first time the cluster is activated, it will try again to run existing tasks.
*
* @see #onStateChangeFinish(ChangeGlobalStateFinishMessage)
*/
private volatile boolean forbidStartingNewTasks;
/**
* @param ctx Kernal context.
*/
public DurableBackgroundTasksProcessor(GridKernalContext ctx) {
super(ctx);
}
/**
* Starts the asynchronous operation of pending tasks execution. Is called on start.
*/
private void asyncDurableBackgroundTasksExecution() {
assert durableBackgroundTasks != null;
for (DurableBackgroundTask task : durableBackgroundTasks.values()) {
if (!task.isCompleted() && startedTasks.add(task.shortName()))
asyncDurableBackgroundTaskExecute(task);
}
}
/**
* Creates a worker to execute single durable background task.
*
* @param task Task.
*/
private void asyncDurableBackgroundTaskExecute(DurableBackgroundTask task) {
String workerName = "async-durable-background-task-executor-" + asyncDurableBackgroundTasksWorkersCntr.getAndIncrement();
GridWorker worker = new GridWorker(ctx.igniteInstanceName(), workerName, log) {
@Override public void cancel() {
task.onCancel();
super.cancel();
}
@Override protected void body() {
try {
if (forbidStartingNewTasks)
return;
log.info("Executing durable background task: " + task.shortName());
task.execute(ctx);
task.complete();
log.info("Execution of durable background task completed: " + task.shortName());
}
catch (Throwable e) {
log.error("Could not execute durable background task: " + task.shortName(), e);
}
finally {
startedTasks.remove(task.shortName());
asyncDurableBackgroundTaskWorkers.remove(this);
}
}
};
asyncDurableBackgroundTaskWorkers.add(worker);
Thread asyncTask = new IgniteThread(worker);
asyncTask.start();
}
/** {@inheritDoc} */
@Override public void onKernalStart(boolean active) {
asyncDurableBackgroundTasksExecution();
}
/** {@inheritDoc} */
@Override public void onKernalStop(boolean cancel) {
forbidStartingNewTasks = true;
awaitForWorkersStop(asyncDurableBackgroundTaskWorkers, true, log);
}
/** {@inheritDoc} */
@Override public void start() throws IgniteCheckedException {
ctx.internalSubscriptionProcessor().registerMetastorageListener(this);
}
/**
* @param msg Message.
*/
public void onStateChange(ChangeGlobalStateMessage msg) {
if (msg.state() == ClusterState.INACTIVE) {
forbidStartingNewTasks = true;
awaitForWorkersStop(asyncDurableBackgroundTaskWorkers, true, log);
}
}
/**
* @param msg Message.
*/
public void onStateChangeFinish(ChangeGlobalStateFinishMessage msg) {
if (msg.state() != ClusterState.INACTIVE) {
forbidStartingNewTasks = false;
asyncDurableBackgroundTasksExecution();
}
}
/** {@inheritDoc} */
@Override public void onReadyForRead(ReadOnlyMetastorage metastorage) {
synchronized (metaStorageMux) {
if (durableBackgroundTasks.isEmpty()) {
try {
metastorage.iterate(
STORE_DURABLE_BACKGROUND_TASK_PREFIX,
(key, val) -> durableBackgroundTasks.put(key, (DurableBackgroundTask)val),
true
);
}
catch (IgniteCheckedException e) {
throw new IgniteException("Failed to iterate durable background tasks storage.", e);
}
}
}
}
/** {@inheritDoc} */
@Override public void onReadyForReadWrite(ReadWriteMetastorage metastorage) {
synchronized (metaStorageMux) {
try {
for (Map.Entry<String, DurableBackgroundTask> entry : durableBackgroundTasks.entrySet()) {
if (metastorage.readRaw(entry.getKey()) == null)
metastorage.write(entry.getKey(), entry.getValue());
}
}
catch (IgniteCheckedException e) {
throw new IgniteException("Failed to read key from durable background tasks storage.", e);
}
}
((GridCacheDatabaseSharedManager)ctx.cache().context().database()).addCheckpointListener(this);
this.metastorage = metastorage;
}
/**
* Builds a metastorage key for durable background task object.
*
* @param obj Object.
* @return Metastorage key.
*/
private String durableBackgroundTaskMetastorageKey(DurableBackgroundTask obj) {
String k = STORE_DURABLE_BACKGROUND_TASK_PREFIX + obj.shortName();
if (k.length() > MetastorageTree.MAX_KEY_LEN) {
int hashLenLimit = 5;
String hash = String.valueOf(k.hashCode());
k = k.substring(0, MetastorageTree.MAX_KEY_LEN - hashLenLimit) +
(hash.length() > hashLenLimit ? hash.substring(0, hashLenLimit) : hash);
}
return k;
}
/**
* Adds durable background task object.
*
* @param obj Object.
*/
private void addDurableBackgroundTask(DurableBackgroundTask obj) {
String objName = durableBackgroundTaskMetastorageKey(obj);
synchronized (metaStorageMux) {
durableBackgroundTasks.put(objName, obj);
if (metastorage != null) {
ctx.cache().context().database().checkpointReadLock();
try {
metastorage.write(objName, obj);
}
catch (IgniteCheckedException e) {
throw new IgniteException(e);
}
finally {
ctx.cache().context().database().checkpointReadUnlock();
}
}
}
}
/**
* Removes durable background task object.
*
* @param obj Object.
*/
private void removeDurableBackgroundTask(DurableBackgroundTask obj) {
String objName = durableBackgroundTaskMetastorageKey(obj);
synchronized (metaStorageMux) {
durableBackgroundTasks.remove(objName);
if (metastorage != null) {
ctx.cache().context().database().checkpointReadLock();
try {
metastorage.remove(objName);
}
catch (IgniteCheckedException e) {
throw new IgniteException(e);
}
finally {
ctx.cache().context().database().checkpointReadUnlock();
}
}
}
}
/**
* Starts durable background task. If task is applied to persistent cache, saves it to metastorage.
*
* @param task Continuous task.
* @param ccfg Cache configuration.
*/
public void startDurableBackgroundTask(DurableBackgroundTask task, CacheConfiguration ccfg) {
if (CU.isPersistentCache(ccfg, ctx.config().getDataStorageConfiguration()))
addDurableBackgroundTask(task);
asyncDurableBackgroundTaskExecute(task);
}
/** {@inheritDoc} */
@Override public void onMarkCheckpointBegin(Context ctx) {
/* No op. */
}
/** {@inheritDoc} */
@Override public void onCheckpointBegin(Context ctx) {
/* No op. */
}
/** {@inheritDoc} */
@Override public void beforeCheckpointBegin(Context ctx) {
/* No op. */
}
/** {@inheritDoc} */
@Override public void afterCheckpointEnd(Context ctx) {
for (DurableBackgroundTask task : durableBackgroundTasks.values()) {
if (task.isCompleted())
removeDurableBackgroundTask(task);
}
}
}