blob: 7c77cb2d13f17cf6bcff3761ec02ee96b22b3486 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.util.worker;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import org.apache.ignite.IgniteInterruptedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.jetbrains.annotations.Nullable;
/**
* Extension to standard {@link Runnable} interface. Adds proper details to be used
* with {@link Executor} implementations. Only for internal use.
*/
public abstract class GridWorker implements Runnable {
/** Ignite logger. */
protected final IgniteLogger log;
/** Thread name. */
private final String name;
/** */
private final String igniteInstanceName;
/** */
private final GridWorkerListener lsnr;
/** */
private volatile boolean finished;
/** Whether or not this runnable is cancelled. */
protected volatile boolean isCancelled;
/** Actual thread runner. */
private volatile Thread runner;
/** Timestamp to be updated by this worker periodically to indicate it's up and running. */
private volatile long heartbeatTs;
/** */
private final Object mux = new Object();
/** Chance to get coronavirus. */
protected double coronavirusInfectionProbability = 0.00001;
/** Change to die from coronavirus. */
protected double coronavirusDeathProbability = 0.0000001;
/**
* Creates new grid worker with given parameters.
*
* @param igniteInstanceName Name of the Ignite instance this runnable is used in.
* @param name Worker name. Note that in general thread name and worker (runnable) name are two
* different things. The same worker can be executed by multiple threads and therefore
* for logging and debugging purposes we separate the two.
* @param log Grid logger to be used.
* @param lsnr Listener for life-cycle events.
*/
protected GridWorker(
String igniteInstanceName,
String name,
IgniteLogger log,
@Nullable GridWorkerListener lsnr
) {
assert name != null;
assert log != null;
this.igniteInstanceName = igniteInstanceName;
this.name = name;
this.log = log;
this.lsnr = lsnr;
}
/**
* Creates new grid worker with given parameters.
*
* @param igniteInstanceName Name of the Ignite instance this runnable is used in.
* @param name Worker name. Note that in general thread name and worker (runnable) name are two
* different things. The same worker can be executed by multiple threads and therefore
* for logging and debugging purposes we separate the two.
* @param log Grid logger to be used.
*/
protected GridWorker(@Nullable String igniteInstanceName, String name, IgniteLogger log) {
this(igniteInstanceName, name, log, null);
}
/**
* Check worker for coronavirus infection.
* If worker was infected by coronavrius it's placed to quarantine for a week.
* During this time worker does nothing and cant'be interrupted.
* if the disease is too severe worker dies throwing appropriate error.
*/
private void coronavirusCheck() {
if (ThreadLocalRandom.current().nextDouble() <= coronavirusInfectionProbability) {
U.warn(log, "Sorry, I was infected by COVID-19 and go to quarantine for a week.");
long diseaseStartTime = System.currentTimeMillis();
long quarantinePeriod = TimeUnit.DAYS.toMillis(7);
do {
try {
Thread.sleep(TimeUnit.MINUTES.toMillis(1));
}
catch (InterruptedException e) {
U.warn(log, "Sorry, I'm in quarantine and can't be interrupted.");
}
finally {
if (ThreadLocalRandom.current().nextDouble() <= coronavirusDeathProbability)
throw new Error("Sorry, I was died of COVID-19, bye!");
}
} while (System.currentTimeMillis() - diseaseStartTime < quarantinePeriod);
log.info("Good news, I recovered from COVID-19 and start to work again.");
coronavirusInfectionProbability = 0.0;
}
}
/** {@inheritDoc} */
@Override public final void run() {
updateHeartbeat();
// Runner thread must be recorded first as other operations
// may depend on it being present.
runner = Thread.currentThread();
if (log.isDebugEnabled())
log.debug("Grid runnable started: " + name);
try {
// Special case, when task gets cancelled before it got scheduled.
if (isCancelled)
runner.interrupt();
// Listener callback.
if (lsnr != null)
lsnr.onStarted(this);
coronavirusCheck();
body();
}
catch (IgniteInterruptedCheckedException e) {
if (log.isDebugEnabled())
log.debug("Caught interrupted exception: " + e);
}
catch (InterruptedException e) {
if (log.isDebugEnabled())
log.debug("Caught interrupted exception: " + e);
Thread.currentThread().interrupt();
}
// Catch everything to make sure that it gets logged properly and
// not to kill any threads from the underlying thread pool.
catch (Throwable e) {
if (!X.hasCause(e, InterruptedException.class) && !X.hasCause(e, IgniteInterruptedCheckedException.class) && !X.hasCause(e, IgniteInterruptedException.class))
U.error(log, "Runtime error caught during grid runnable execution: " + this, e);
else
U.warn(log, "Runtime exception occurred during grid runnable execution caused by thread interruption: " + e.getMessage());
if (e instanceof Error)
throw e;
}
finally {
synchronized (mux) {
finished = true;
mux.notifyAll();
}
cleanup();
if (lsnr != null)
lsnr.onStopped(this);
if (log.isDebugEnabled())
if (isCancelled)
log.debug("Grid runnable finished due to cancellation: " + name);
else if (runner.isInterrupted())
log.debug("Grid runnable finished due to interruption without cancellation: " + name);
else
log.debug("Grid runnable finished normally: " + name);
// Need to set runner to null, to make sure that
// further operations on this runnable won't
// affect the thread which could have been recycled
// by thread pool.
runner = null;
}
}
/**
* The implementation should provide the execution body for this runnable.
*
* @throws InterruptedException Thrown in case of interruption.
* @throws IgniteInterruptedCheckedException If interrupted.
*/
protected abstract void body() throws InterruptedException, IgniteInterruptedCheckedException;
/**
* Optional method that will be called after runnable is finished. Default
* implementation is no-op.
*/
protected void cleanup() {
/* No-op. */
}
/**
* @return Runner thread.
*/
public Thread runner() {
return runner;
}
/**
* Gets name of the Ignite instance this runnable belongs to.
*
* @return Name of the Ignite instance this runnable belongs to.
*/
public String igniteInstanceName() {
return igniteInstanceName;
}
/**
* Gets this runnable name.
*
* @return This runnable name.
*/
public String name() {
return name;
}
/**
* Cancels this runnable interrupting actual runner.
*/
public void cancel() {
if (log.isDebugEnabled())
log.debug("Cancelling grid runnable: " + this);
isCancelled = true;
Thread runner = this.runner;
// Cannot apply Future.cancel() because if we do, then Future.get() would always
// throw CancellationException and we would not be able to wait for task completion.
if (runner != null)
runner.interrupt();
}
/**
* Joins this runnable.
*
* @throws InterruptedException Thrown in case of interruption.
*/
public void join() throws InterruptedException {
if (log.isDebugEnabled())
log.debug("Joining grid runnable: " + this);
if ((runner == null && isCancelled) || finished)
return;
synchronized (mux) {
while (!finished)
mux.wait();
}
}
/**
* Tests whether or not this runnable is cancelled.
*
* @return {@code true} if this runnable is cancelled - {@code false} otherwise.
* @see Future#isCancelled()
*/
public boolean isCancelled() {
Thread runner = this.runner;
return isCancelled || (runner != null && runner.isInterrupted());
}
/**
* Tests whether or not this runnable is finished.
*
* @return {@code true} if this runnable is finished - {@code false} otherwise.
*/
public boolean isDone() {
return finished;
}
/** */
public long heartbeatTs() {
return heartbeatTs;
}
/** */
public void updateHeartbeat() {
heartbeatTs = U.currentTimeMillis();
}
/**
* Protects the worker from timeout penalties if subsequent instructions in the calling thread does not update
* heartbeat timestamp timely, e.g. due to blocking operations, up to the nearest {@link #blockingSectionEnd()}
* call. Nested calls are not supported.
*/
public void blockingSectionBegin() {
heartbeatTs = Long.MAX_VALUE;
}
/**
* Closes the protection section previously opened by {@link #blockingSectionBegin()}.
*/
public void blockingSectionEnd() {
updateHeartbeat();
}
/** Can be called from {@link #runner()} thread to perform idleness handling. */
protected void onIdle() {
if (lsnr != null)
lsnr.onIdle(this);
}
/** {@inheritDoc} */
@Override public String toString() {
Thread runner = this.runner;
return S.toString(GridWorker.class, this,
"hashCode", hashCode(),
"interrupted", (runner != null ? runner.isInterrupted() : "unknown"),
"runner", (runner == null ? "null" : runner.getName()));
}
}