blob: 02576a22806852e23c2a0caa0f4b3c1a67bbaa50 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.sidecar.concurrent;
import java.util.concurrent.TimeUnit;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.WorkerExecutor;
import org.apache.cassandra.sidecar.config.ServiceConfiguration;
import org.apache.cassandra.sidecar.config.WorkerPoolConfiguration;
/**
* Manages dedicated worker pools to schedule and execute _blocking_ tasks to avoid blocking netty eventloop.
* It is a facade to handle one-off and periodic blocking execution.
*/
@Singleton
public class ExecutorPools
{
// a pool for tasks that are not expected to block for too long, therefore will free up more quickly
private final TaskExecutorPool taskExecutors;
// a pool for tasks that are expected to block for a long time, and the nature of these tasks is that
// they are not expected to complete immediately and can be queued if all the threads are busy
private final TaskExecutorPool internalTaskExecutors;
@Inject
public ExecutorPools(Vertx vertx, ServiceConfiguration configuration)
{
this.taskExecutors = new TaskExecutorPool(vertx, configuration.serverWorkerPoolConfiguration());
this.internalTaskExecutors = new TaskExecutorPool(vertx, configuration.serverInternalWorkerPoolConfiguration());
}
/**
* @return the executor pool to run blocking code for client facing http requests.
* The blocking code should not block the thread for too long.
*/
public TaskExecutorPool service()
{
return taskExecutors;
}
/**
* @return the executor pool to run blocking code for internal.
* Unlike the pool {@link #service()}, the blocking code can block the thread for a longer duration.
* Tasks submitted to this pool can be queued if all threads are busy.
*/
public TaskExecutorPool internal()
{
return internalTaskExecutors;
}
public Future<Void> close()
{
return taskExecutors.closeInternal().onComplete(v -> internalTaskExecutors.closeInternal());
}
/**
* A pool of executors backed by {@link WorkerExecutor} and {@link Vertx}
*/
public static class TaskExecutorPool implements WorkerExecutor
{
private final Vertx vertx;
private final WorkerExecutor workerExecutor;
private TaskExecutorPool(Vertx vertx, WorkerPoolConfiguration config)
{
this.vertx = vertx;
this.workerExecutor = vertx.createSharedWorkerExecutor(config.workerPoolName(),
config.workerPoolSize(),
config.workerMaxExecutionTimeMillis(),
TimeUnit.MILLISECONDS);
}
/**
* Like {@link #setPeriodic(long, Handler, boolean)} with the handler guaranteed to be executed in a
* worker thread and ordered = false.
*
* @param delay the delay in milliseconds, after which the timer will fire
* @param handler the handler that will be called with the timer ID when the timer fires
* @return the unique identifier for the timer
*/
public long setPeriodic(long delay, Handler<Long> handler)
{
return setPeriodic(delay, handler, false);
}
/**
* Like {@link Vertx#setPeriodic(long, Handler)} with the handler guaranteed to be executed in a worker thread.
*
* @param delay the delay in milliseconds, after which the timer will fire
* @param handler the handler that will be called with the timer ID when the timer fires
* @param ordered if true and if executeBlocking is called several times on the same context, the executions
* for that context will be executed serially, not in parallel. The periodic scheduled
* executions could be delayed if the prior execution on the same context is taking longer.
* If false then they will be no ordering guarantees
* @return the unique identifier for the timer
*/
public long setPeriodic(long delay, Handler<Long> handler, boolean ordered)
{
return vertx.setPeriodic(delay,
id -> workerExecutor.executeBlocking(promise -> {
handler.handle(id);
promise.complete();
}, ordered));
}
/**
* Like {@link #setTimer(long, Handler)} with the handler guaranteed to be executed in a
* worker thread and ordered = false.
*
* @param delay the delay in milliseconds, after which the timer will fire
* @param handler the handler that will be called with the timer ID when the timer fires
* @return the unique identifier for the timer
*/
public long setTimer(long delay, Handler<Long> handler)
{
return vertx.setTimer(delay, id -> workerExecutor.executeBlocking(promise -> handler.handle(id), false));
}
/**
* Like {@link Vertx#setTimer(long, Handler)} with the handler guaranteed to be executed in a worker thread.
*
* @param delay the delay in milliseconds, after which the timer will fire
* @param handler the handler that will be called with the timer ID when the timer fires
* @param ordered if true and if executeBlocking is called several times on the same context, the executions
* for that context will be executed serially, not in parallel. The periodic scheduled
* executions could be delayed if the prior execution on the same context is taking longer.
* If false then they will be no ordering guarantees
* @return the unique identifier for the timer
*/
public long setTimer(long delay, Handler<Long> handler, boolean ordered)
{
return vertx.setTimer(delay, id -> workerExecutor.executeBlocking(promise -> handler.handle(id), ordered));
}
/**
* Delegate to {@link Vertx#cancelTimer(long)}
*
* @param id The id of the timer to cancel
* @return {@code true} if the timer was successfully cancelled, {@code false} otherwise
*/
public boolean cancelTimer(long id)
{
return vertx.cancelTimer(id);
}
@Override
public <T> void executeBlocking(Handler<Promise<T>> blockingCodeHandler,
boolean ordered,
Handler<AsyncResult<T>> asyncResultHandler)
{
workerExecutor.executeBlocking(blockingCodeHandler, ordered, asyncResultHandler);
}
@Override
public <T> Future<T> executeBlocking(Handler<Promise<T>> blockingCodeHandler,
boolean ordered)
{
return workerExecutor.executeBlocking(blockingCodeHandler, ordered);
}
@Override
public <T> void executeBlocking(Handler<Promise<T>> blockingCodeHandler,
Handler<AsyncResult<T>> asyncResultHandler)
{
workerExecutor.executeBlocking(blockingCodeHandler, asyncResultHandler);
}
@Override
public <T> Future<T> executeBlocking(Handler<Promise<T>> blockingCodeHandler)
{
return workerExecutor.executeBlocking(blockingCodeHandler);
}
@Override
public void close(Handler<AsyncResult<Void>> handler)
{
throw new UnsupportedOperationException("Closing TaskExecutorPool is not supported!");
}
@Override
public Future<Void> close()
{
throw new UnsupportedOperationException("Closing TaskExecutorPool is not supported!");
}
private Future<Void> closeInternal()
{
return workerExecutor == null
? Future.succeededFuture()
: workerExecutor.close();
}
}
}