blob: 6bde0628238046c9f8739aa013ce0b63444b3afd [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.component.soroushbot.utils;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* a simple thread pool that send each job to a thread based on the jobs topic,
*/
public class MultiQueueWithTopicThreadPool {
private static Logger log = LoggerFactory.getLogger(MultiQueueWithTopicThreadPool.class);
private boolean shutdown; //default is false
private int poolSize;
private List<PoolWorker> workers = new ArrayList<>();
public MultiQueueWithTopicThreadPool(int poolSize, int capacity, String namePrefix) {
if (log.isDebugEnabled()) {
log.debug("creating MultiQueueWithTopicThreadPool with size " + poolSize + " and capacity of each queue is set to " + capacity);
}
this.poolSize = poolSize;
//create a pool of thread and start them
for (int i = 0; i < poolSize; i++) {
PoolWorker e = new PoolWorker(capacity);
workers.add(e);
e.start();
e.setName(namePrefix + " #" + i);
}
}
/**
* add the runnable into corresponding queue and it when it reach to the head of queue
* the queue is decided based on {@code topic}. if topic is instance of Integer,
* it uses (topic%poolSize) to determine corresponding queue otherwise it uses
* (topic.hashCode()%poolsize) do determine corresponding queue.
*
* @param topic tasks are organized between threads based on this parameter
* @param runnable the task that should be executed
* @throws IllegalStateException if the {@code runnable} cannot be added at this
* time due to queue capacity restrictions
*/
public void execute(Object topic, Runnable runnable) throws IllegalStateException {
if (shutdown) {
throw new RejectedExecutionException("pool has been shutdown");
}
int selectedQueue;
if (topic instanceof Integer) {
selectedQueue = ((Integer) topic) % poolSize;
} else {
selectedQueue = topic.hashCode() % poolSize;
}
PoolWorker poolWorker = workers.get(selectedQueue);
synchronized (poolWorker) {
poolWorker.enqueue(runnable);
}
}
public void shutdown() {
shutdown = true;
}
}
/**
* Each PoolWorker is a thread that when it is idle, it pick the head from its and
* execute it.
*/
class PoolWorker extends Thread {
private static Logger log = LoggerFactory.getLogger(PoolWorker.class);
final LinkedBlockingQueue<Runnable> queue;
public PoolWorker(int capacity) {
// if capacity <=0 then the queue capacity should be {@link Integer#MAX_VALUE}
if (capacity > 0) {
queue = new LinkedBlockingQueue<>(capacity);
} else {
queue = new LinkedBlockingQueue<>();
}
}
/**
* ad new runnable to queue and notify corresponding thread to execute newly added
* runnable if the thread is idle.
*
* @param r a runnable to execute by this threadPool
*/
public void enqueue(Runnable r) {
synchronized (queue) {
queue.add(r);
queue.notify();
}
}
@Override
public void run() {
while (true) {
Runnable task;
synchronized (queue) {
// while queue is empty wait for queue to become populated
while (queue.isEmpty()) {
try {
queue.wait();
} catch (InterruptedException e) {
log.error("An interrupt occurred while queue is waiting: " + e.getMessage());
//interrupt current thread to prevent the interrupt being swallowed.
Thread.currentThread().interrupt();
}
}
//poll next task as we know it is exists in the queue
task = queue.poll();
//double check!
if (task == null) {
continue;
}
}
try {
task.run();
} catch (RuntimeException e) {
//catch RuntimeException that may thrown in the task
log.error("Thread pool is interrupted due to an issue: " + e.getMessage());
}
}
}
}