blob: 03ad782a3729570ba374bd53c2ca50990bbecd41 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.util;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Deque;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.LockSupport;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.internal.managers.communication.GridIoPolicy;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.util.worker.GridWorker;
import org.apache.ignite.internal.util.worker.GridWorkerListener;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.thread.IgniteThread;
import org.jetbrains.annotations.NotNull;
import static java.util.stream.IntStream.range;
/**
* Striped executor.
*/
public class StripedExecutor implements ExecutorService {
/** Stripes. */
private final Stripe[] stripes;
/** Threshold for starvation checks */
private final long threshold;
/** */
private final IgniteLogger log;
/**
* @param cnt Count.
* @param igniteInstanceName Node name.
* @param poolName Pool name.
* @param log Logger.
* @param errHnd Critical failure handler.
* @param gridWorkerLsnr Listener to link with every stripe worker.
*/
public StripedExecutor(
int cnt,
String igniteInstanceName,
String poolName,
final IgniteLogger log,
IgniteInClosure<Throwable> errHnd,
GridWorkerListener gridWorkerLsnr,
long failureDetectionTimeout
) {
this(cnt, igniteInstanceName, poolName, log, errHnd, false, gridWorkerLsnr, failureDetectionTimeout);
}
/**
* @param cnt Count.
* @param igniteInstanceName Node name.
* @param poolName Pool name.
* @param log Logger.
* @param errHnd Critical failure handler.
* @param stealTasks {@code True} to steal tasks.
* @param gridWorkerLsnr listener to link with every stripe worker.
*/
public StripedExecutor(
int cnt,
String igniteInstanceName,
String poolName,
final IgniteLogger log,
IgniteInClosure<Throwable> errHnd,
boolean stealTasks,
GridWorkerListener gridWorkerLsnr,
long failureDetectionTimeout
) {
A.ensure(cnt > 0, "cnt > 0");
boolean success = false;
stripes = new Stripe[cnt];
threshold = failureDetectionTimeout;
this.log = log;
try {
for (int i = 0; i < cnt; i++) {
stripes[i] = stealTasks
? new StripeConcurrentQueue(igniteInstanceName, poolName, i, log, stripes, errHnd, gridWorkerLsnr)
: new StripeConcurrentQueue(igniteInstanceName, poolName, i, log, errHnd, gridWorkerLsnr);
}
for (int i = 0; i < cnt; i++)
stripes[i].start();
success = true;
}
catch (Error | RuntimeException e) {
U.error(log, "Failed to initialize striped pool.", e);
throw e;
}
finally {
if (!success) {
for (Stripe stripe : stripes)
U.cancel(stripe);
for (Stripe stripe : stripes)
U.join(stripe, log);
}
}
}
/**
* Checks starvation in striped pool. Maybe too verbose
* but this is needed to faster debug possible issues.
*
* @return Flag representing presence of possible starvation in striped pool.
*/
public boolean detectStarvation() {
boolean starvationDetected = false;
for (Stripe stripe : stripes) {
boolean active = stripe.active;
long lastStartedTs = stripe.lastStartedTs;
if (active && lastStartedTs + threshold < U.currentTimeMillis()) {
starvationDetected = true;
boolean deadlockPresent = U.deadlockPresent();
GridStringBuilder sb = new GridStringBuilder();
sb.a(">>> Possible starvation in striped pool.").a(U.nl())
.a(" Thread name: ").a(stripe.thread.getName()).a(U.nl())
.a(" Queue: ").a(stripe.queueToString()).a(U.nl())
.a(" Deadlock: ").a(deadlockPresent).a(U.nl())
.a(" Completed: ").a(stripe.completedCnt).a(U.nl());
U.printStackTrace(
stripe.thread.getId(),
sb);
String msg = sb.toString();
U.warn(log, msg);
}
}
return starvationDetected;
}
/**
* @return Stripes count.
*/
public int stripesCount() {
return stripes.length;
}
/**
* @return Stripes of this executor.
*/
public Stripe[] stripes() {
return stripes;
}
/**
* Execute command.
*
* @param idx Index.
* @param cmd Command.
*/
public void execute(int idx, Runnable cmd) {
if (idx == -1)
execute(cmd);
else {
assert idx >= 0 : idx;
stripes[idx % stripes.length].execute(cmd);
}
}
/** {@inheritDoc} */
@Override public void shutdown() {
signalStop();
}
/** {@inheritDoc} */
@Override public void execute(@NotNull Runnable cmd) {
stripes[ThreadLocalRandom.current().nextInt(stripes.length)].execute(cmd);
}
/**
* {@inheritDoc}
*
* @return Empty list (always).
*/
@NotNull @Override public List<Runnable> shutdownNow() {
signalStop();
return Collections.emptyList();
}
/** {@inheritDoc} */
@Override public boolean awaitTermination(
long timeout,
@NotNull TimeUnit unit
) throws InterruptedException {
awaitStop();
return true;
}
/** {@inheritDoc} */
@Override public boolean isShutdown() {
for (Stripe stripe : stripes) {
if (stripe != null && stripe.isCancelled())
return true;
}
return false;
}
/** {@inheritDoc} */
@Override public boolean isTerminated() {
for (Stripe stripe : stripes) {
if (stripe.thread.getState() != Thread.State.TERMINATED)
return false;
}
return true;
}
/**
* Stops executor.
*/
public void stop() {
signalStop();
awaitStop();
}
/**
* Signals all stripes.
*/
private void signalStop() {
for (Stripe stripe : stripes)
U.cancel(stripe);
}
/**
* Waits for all stripes to stop.
*/
private void awaitStop() {
for (Stripe stripe : stripes)
U.join(stripe, log);
}
/**
* @return Return total queue size of all stripes.
*/
public int queueSize() {
int size = 0;
for (Stripe stripe : stripes)
size += stripe.queueSize();
return size;
}
/**
* @param idx Stripe index.
* @return Queue size of specific stripe.
*/
public int queueStripeSize(int idx) {
A.ensure(idx >= 0, "Stripe index should be non-negative: " + idx);
return stripes[idx % stripes.length].queueSize();
}
/**
* @return Completed tasks count.
*/
public long completedTasks() {
long cnt = 0;
for (Stripe stripe : stripes)
cnt += stripe.completedCnt;
return cnt;
}
/**
* @return Completed tasks per stripe count.
*/
public long[] stripesCompletedTasks() {
long[] res = new long[stripesCount()];
for (int i = 0; i < res.length; i++)
res[i] = stripes[i].completedCnt;
return res;
}
/**
* @return Number of active tasks per stripe.
*/
public boolean[] stripesActiveStatuses() {
boolean[] res = new boolean[stripesCount()];
for (int i = 0; i < res.length; i++)
res[i] = stripes[i].active;
return res;
}
/**
* @return Number of active tasks.
*/
public int activeStripesCount() {
int res = 0;
for (boolean status : stripesActiveStatuses()) {
if (status)
res++;
}
return res;
}
/**
* @return Size of queue per stripe.
*/
public int[] stripesQueueSizes() {
int[] res = new int[stripesCount()];
for (int i = 0; i < res.length; i++)
res[i] = stripes[i].queueSize();
return res;
}
/**
* Operation not supported.
*/
@NotNull @Override public <T> Future<T> submit(
@NotNull Runnable task,
T res
) {
throw new UnsupportedOperationException();
}
/**
* Operation not supported.
*/
@NotNull @Override public Future<?> submit(@NotNull Runnable task) {
throw new UnsupportedOperationException();
}
/**
* Operation not supported.
*/
@NotNull @Override public <T> Future<T> submit(@NotNull Callable<T> task) {
throw new UnsupportedOperationException();
}
/**
* Operation not supported.
*/
@NotNull @Override public <T> List<Future<T>> invokeAll(@NotNull Collection<? extends Callable<T>> tasks)
throws InterruptedException {
throw new UnsupportedOperationException();
}
/**
* Operation not supported.
*/
@NotNull @Override public <T> List<Future<T>> invokeAll(
@NotNull Collection<? extends Callable<T>> tasks,
long timeout,
@NotNull TimeUnit unit
) throws InterruptedException {
throw new UnsupportedOperationException();
}
/**
* Operation not supported.
*/
@NotNull @Override public <T> T invokeAny(@NotNull Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException {
throw new UnsupportedOperationException();
}
/**
* Operation not supported.
*/
@Override public <T> T invokeAny(
@NotNull Collection<? extends Callable<T>> tasks,
long timeout,
@NotNull TimeUnit unit
) throws InterruptedException, ExecutionException, TimeoutException {
throw new UnsupportedOperationException();
}
/**
* Method for await all task completion in some specific striped indexes or all.
* The method guarantees what all tasks which were passed before this call
* will be completed before method return control.
*
* @param stripes Striped idxs for await. Empty params means await all stripes.
*/
public void awaitComplete(int... stripes) throws InterruptedException {
CountDownLatch awaitLatch;
if (stripes.length == 0) {
awaitLatch = new CountDownLatch(stripesCount());
// We have to ensure that all asynchronous updates are done.
// StripedExecutor guarantees ordering inside stripe - it would enough to await "finishing" tasks.
range(0, stripesCount()).forEach(idx -> execute(idx, awaitLatch::countDown));
}
else {
awaitLatch = new CountDownLatch(stripes.length);
for (int idx : stripes)
execute(idx, awaitLatch::countDown);
}
while (true) {
if (awaitLatch.await(60, TimeUnit.SECONDS))
break;
else {
U.log(log, "Await stripes executor complete tasks" +
", awaitLatch=" + awaitLatch.getCount() +
", stripes=" + (stripes.length == 0 ?
Arrays.toString(range(0, stripesCount()).toArray()) : Arrays.toString(stripes)) +
", queueSize=" + Arrays.toString(stripesQueueSizes()) +
", activeStatus=" + Arrays.toString(stripesActiveStatuses()));
}
}
}
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(StripedExecutor.class, this);
}
/**
* Stripe.
*/
public abstract static class Stripe extends GridWorker {
/** */
private final String igniteInstanceName;
/** */
protected final int idx;
/** */
private final IgniteLogger log;
/** */
private volatile long completedCnt;
/** */
private volatile boolean active;
/** */
private volatile long lastStartedTs;
/** Thread executing the loop. */
protected Thread thread;
/** Critical failure handler. */
private IgniteInClosure<Throwable> errHnd;
/**
* @param igniteInstanceName Ignite instance name.
* @param poolName Pool name.
* @param idx Stripe index.
* @param log Logger.
* @param errHnd Exception handler.
* @param gridWorkerLsnr listener to link with stripe worker.
*/
public Stripe(
String igniteInstanceName,
String poolName,
int idx,
IgniteLogger log,
IgniteInClosure<Throwable> errHnd,
GridWorkerListener gridWorkerLsnr
) {
super(igniteInstanceName, poolName + "-stripe-" + idx, log, gridWorkerLsnr);
this.igniteInstanceName = igniteInstanceName;
this.idx = idx;
this.log = log;
this.errHnd = errHnd;
}
/**
* Starts the stripe.
*/
void start() {
thread = new IgniteThread(igniteInstanceName,
name(),
this,
IgniteThread.GRP_IDX_UNASSIGNED,
idx,
GridIoPolicy.UNDEFINED);
thread.start();
}
/** {@inheritDoc} */
@SuppressWarnings("NonAtomicOperationOnVolatileField")
@Override public void body() {
while (!isCancelled()) {
Runnable cmd;
try {
blockingSectionBegin();
try {
cmd = take();
}
finally {
blockingSectionEnd();
}
if (cmd != null) {
active = true;
lastStartedTs = U.currentTimeMillis();
updateHeartbeat();
try {
cmd.run();
}
finally {
active = false;
completedCnt++;
}
}
onIdle();
}
catch (InterruptedException ignored) {
Thread.currentThread().interrupt();
break;
}
catch (Throwable e) {
errHnd.apply(e);
U.error(log, "Failed to execute runnable.", e);
}
}
if (!isCancelled) {
errHnd.apply(new IllegalStateException("Thread " + Thread.currentThread().getName() +
" is terminated unexpectedly"));
}
}
/**
* Execute the command.
*
* @param cmd Command.
*/
abstract void execute(Runnable cmd);
/**
* @return Next runnable.
* @throws InterruptedException If interrupted.
*/
abstract Runnable take() throws InterruptedException;
/**
* @return Queue size.
*/
abstract int queueSize();
/**
* @return Stripe's queue to string presentation.
*/
abstract String queueToString();
/**
* @return Stripe queue.
*/
public abstract Queue<Runnable> queue();
/**
* @return Stripe index.
*/
public int index() {
return idx;
}
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(Stripe.class, this);
}
}
/**
* Stripe.
*/
private static class StripeConcurrentQueue extends Stripe {
/** */
private static final int IGNITE_TASKS_STEALING_THRESHOLD =
IgniteSystemProperties.getInteger(
IgniteSystemProperties.IGNITE_DATA_STREAMING_EXECUTOR_SERVICE_TASKS_STEALING_THRESHOLD, 4);
/** Queue. */
private final Queue<Runnable> queue;
/** */
@GridToStringExclude
private final Stripe[] others;
/** */
private volatile boolean parked;
/**
* @param igniteInstanceName Ignite instance name.
* @param poolName Pool name.
* @param idx Stripe index.
* @param log Logger.
* @param errHnd Critical failure handler.
* @param gridWorkerLsnr listener to link with stripe worker.
*/
StripeConcurrentQueue(
String igniteInstanceName,
String poolName,
int idx,
IgniteLogger log,
IgniteInClosure<Throwable> errHnd,
GridWorkerListener gridWorkerLsnr
) {
this(igniteInstanceName, poolName, idx, log, null, errHnd, gridWorkerLsnr);
}
/**
* @param igniteInstanceName Ignite instance name.
* @param poolName Pool name.
* @param idx Stripe index.
* @param log Logger.
* @param errHnd Critical failure handler.
* @param gridWorkerLsnr listener to link with stripe worker.
*/
StripeConcurrentQueue(
String igniteInstanceName,
String poolName,
int idx,
IgniteLogger log,
Stripe[] others,
IgniteInClosure<Throwable> errHnd,
GridWorkerListener gridWorkerLsnr
) {
super(
igniteInstanceName,
poolName,
idx,
log,
errHnd,
gridWorkerLsnr);
this.others = others;
this.queue = others == null ? new ConcurrentLinkedQueue<Runnable>() : new ConcurrentLinkedDeque<Runnable>();
}
/** {@inheritDoc} */
@Override Runnable take() throws InterruptedException {
Runnable r;
for (int i = 0; i < 2048; i++) {
r = queue.poll();
if (r != null)
return r;
}
parked = true;
try {
for (;;) {
r = queue.poll();
if (r != null)
return r;
if (others != null) {
int len = others.length;
int init = ThreadLocalRandom.current().nextInt(len);
int cur = init;
while (true) {
if (cur != idx) {
Deque<Runnable> queue = (Deque<Runnable>) ((StripeConcurrentQueue) others[cur]).queue;
if (queue.size() > IGNITE_TASKS_STEALING_THRESHOLD && (r = queue.pollLast()) != null)
return r;
}
if ((cur = (cur + 1) % len) == init)
break;
}
}
LockSupport.park();
if (Thread.interrupted())
throw new InterruptedException();
}
}
finally {
parked = false;
}
}
/** {@inheritDoc} */
@Override void execute(Runnable cmd) {
queue.add(cmd);
if (parked)
LockSupport.unpark(thread);
if (others != null && queueSize() > IGNITE_TASKS_STEALING_THRESHOLD) {
for (Stripe other : others) {
if (((StripeConcurrentQueue)other).parked)
LockSupport.unpark(other.thread);
}
}
}
/** {@inheritDoc} */
@Override String queueToString() {
return String.valueOf(queue);
}
/** {@inheritDoc} */
@Override public Queue<Runnable> queue() {
return queue;
}
/** {@inheritDoc} */
@Override int queueSize() {
return queue.size();
}
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(StripeConcurrentQueue.class, this, super.toString());
}
}
}