blob: 8881640d5702230aa4f39929a9303a2561340f9a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.query.stat;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.util.GridBusyLock;
import org.apache.ignite.internal.util.GridConcurrentHashSet;
import org.apache.ignite.thread.IgniteThreadPoolExecutor;
/**
* Executor with busy run support.
* Can run any tasks while active and safelly wait untill they stopped.
*/
public class BusyExecutor {
/** Logger. */
private final IgniteLogger log;
/** Executor name. */
private final String name;
/** Active flag (used to skip commands in inactive cluster.) */
private volatile boolean active;
/** Lock protection of started gathering during deactivation. */
private volatile GridBusyLock busyLock = new GridBusyLock();
/** Executor pool. */
private final IgniteThreadPoolExecutor pool;
/** Cancellable tasks. */
private final GridConcurrentHashSet<CancellableTask> cancellableTasks = new GridConcurrentHashSet<>();
/** External stopping supplier. */
Supplier<Boolean> stopping;
/**
* Constructor.
*
* @param name Executor name.
* @param pool Underlying thread pool executor.
* @param stopping External stopping state supplier.
* @param logSupplier Log supplier.
*/
public BusyExecutor(
String name,
IgniteThreadPoolExecutor pool,
Supplier<Boolean> stopping,
Function<Class<?>, IgniteLogger> logSupplier) {
this.name = name;
this.pool = pool;
this.stopping = stopping;
this.log = logSupplier.apply(StatisticsProcessor.class);
busyLock.block();
}
/**
* Allow operations.
*/
public synchronized void activate() {
busyLock = new GridBusyLock();
active = true;
if (log.isDebugEnabled())
log.debug("Busy executor " + name + " activated.");
}
/**
* Stop all running tasks. Block new task scheduling, execute cancell runnable and wait till each task stops.
*/
public synchronized void deactivate() {
if (!active)
return;
active = false;
if (log.isDebugEnabled())
log.debug("Busy executor " + name + " deactivating.");
cancellableTasks.forEach(CancellableTask::cancel);
busyLock.block();
if (log.isDebugEnabled())
log.debug("Busy executor " + name + " deactivated.");
}
/**
* Run task on busy lock.
*
* @param r Task to run.
* @return {@code true} if task was succesfully scheduled, {@code false} - otherwise (due to inactive state).
*/
public boolean busyRun(Runnable r) {
return busyRun(r, busyLock);
}
/**
* Run task under specified busyLock.
*
* @param r Task to run.
* @param taskLock BusyLock to use.
* @return {@code true} if task was succesfully scheduled, {@code false} - otherwise (due to inactive state).
*/
private boolean busyRun(Runnable r, GridBusyLock taskLock) {
if (!taskLock.enterBusy())
return false;
try {
if (!active)
return false;
r.run();
return true;
}
catch (Throwable t) {
if (stopping.get())
log.debug("Unexpected exception on statistics processing: " + t);
else
log.warning("Unexpected exception on statistics processing: " + t.getMessage(), t);
}
finally {
taskLock.leaveBusy();
}
return false;
}
/**
* Submit task to execute in thread pool under busy lock.
* Task surrounded with try/catch and if it's complete with any exception - resulting future will return
*
* @param r Task to execute.
* @return Completable future with executed flag in result.
*/
public CompletableFuture<Boolean> submit(Runnable r) {
GridBusyLock lock = busyLock;
CompletableFuture<Boolean> res = new CompletableFuture<>();
if (r instanceof CancellableTask) {
CancellableTask ct = (CancellableTask)r;
res.thenApply(result -> {
cancellableTasks.remove(ct);
return result;
});
cancellableTasks.add(ct);
}
pool.execute(() -> res.complete(busyRun(r, lock)));
return res;
}
/**
* Execute task in thread pool under busy lock.
*
* @param r Task to execute.
*/
public void execute(Runnable r) {
submit(r);
}
/**
* Execute cancellable task in thread pool under busy lock. Track task to cancel on executor stop.
*
* @param ct Cancellable task to execute.
*/
public void execute(CancellableTask ct) {
GridBusyLock lock = busyLock;
cancellableTasks.add(ct);
pool.execute(() -> {
busyRun(ct, lock);
cancellableTasks.remove(ct);
});
}
}