// blob: 8dadf46060fd5775ff2326e605794218ea51120f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.logging;
import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.junit.Before;
import org.junit.Test;
import org.apache.geode.distributed.internal.FunctionExecutionPooledExecutor;
import org.apache.geode.distributed.internal.FunctionExecutionPooledExecutor.FunctionExecutionRejectedExecutionHandler;
import org.apache.geode.distributed.internal.OverflowQueueWithDMStats;
import org.apache.geode.distributed.internal.PoolStatHelper;
import org.apache.geode.distributed.internal.PooledExecutorWithDMStats;
import org.apache.geode.distributed.internal.QueueStatHelper;
import org.apache.geode.distributed.internal.SerialQueuedExecutorWithDMStats;
import org.apache.geode.internal.ScheduledThreadPoolExecutorWithKeepAlive;
import org.apache.geode.internal.monitoring.ThreadsMonitoring;
import org.apache.geode.logging.internal.executors.LoggingThread;
import org.apache.geode.logging.internal.executors.LoggingThreadFactory;
import org.apache.geode.logging.internal.executors.LoggingThreadFactory.CommandWrapper;
import org.apache.geode.logging.internal.executors.LoggingThreadFactory.ThreadInitializer;
/**
 * Unit tests for the executor factory methods of {@link CoreLoggingExecutors}.
 *
 * <p>
 * Each test creates an executor through one factory method and then verifies its concrete type,
 * core and maximum pool sizes, keep-alive time, work queue type, rejected execution handler and
 * thread factory. Collaborators passed to the factory methods are Mockito mocks; no task is ever
 * submitted to the created executors.
 */
public class CoreLoggingExecutorsTest {

  private CommandWrapper commandWrapper;
  private PoolStatHelper poolStatHelper;
  private QueueStatHelper queueStatHelper;
  private Runnable runnable;
  private ThreadInitializer threadInitializer;
  private ThreadsMonitoring threadsMonitoring;

  /**
   * Creates fresh mocks for every collaborator before each test.
   */
  @Before
  public void setUp() {
    commandWrapper = mock(CommandWrapper.class);
    poolStatHelper = mock(PoolStatHelper.class);
    queueStatHelper = mock(QueueStatHelper.class);
    runnable = mock(Runnable.class);
    threadInitializer = mock(ThreadInitializer.class);
    threadsMonitoring = mock(ThreadsMonitoring.class);
  }

  /**
   * Creates, passes in, and uses OverflowQueueWithDMStats.
   *
   * <p>
   * Uses {@code ThreadPoolExecutor.AbortPolicy}.
   */
  @Test
  public void newFixedThreadPoolWithTimeout() {
    int poolSize = 5;
    int keepAliveTime = 2;
    String threadName = "thread";

    ExecutorService executorService = CoreLoggingExecutors
        .newFixedThreadPoolWithTimeout(poolSize, keepAliveTime, MINUTES, queueStatHelper,
            threadName);

    assertThat(executorService).isInstanceOf(ThreadPoolExecutor.class);

    ThreadPoolExecutor executor = (ThreadPoolExecutor) executorService;

    assertThat(executor.getCorePoolSize()).isEqualTo(poolSize);
    assertThat(executor.getKeepAliveTime(MINUTES)).isEqualTo(keepAliveTime);
    assertThat(executor.getMaximumPoolSize()).isEqualTo(poolSize);
    assertThat(executor.getQueue()).isInstanceOf(OverflowQueueWithDMStats.class);
    assertThat(executor.getRejectedExecutionHandler())
        .isInstanceOf(ThreadPoolExecutor.AbortPolicy.class);

    // The queue must report to the very QueueStatHelper instance handed to the factory method.
    OverflowQueueWithDMStats overflowQueueWithDMStats =
        (OverflowQueueWithDMStats) executor.getQueue();

    assertThat(overflowQueueWithDMStats.getQueueStatHelper()).isSameAs(queueStatHelper);

    assertCreatesLoggingThreads(executor.getThreadFactory(), threadName);
  }

  /**
   * Creates and passes in OverflowQueueWithDMStats. FunctionExecutionPooledExecutor then creates
   * and passes up SynchronousQueue, but finally uses OverflowQueueWithDMStats.
   *
   * <p>
   * {@code getBufferQueue()} returns passed-in OverflowQueueWithDMStats. {@code getQueue()} returns
   * internal SynchronousQueue.
   *
   * <p>
   * Uses {@code FunctionExecutionPooledExecutor.FunctionExecutionRejectedExecutionHandler}.
   */
  @Test
  public void newFunctionThreadPoolWithFeedStatistics() {
    int poolSize = 5;
    int workQueueSize = 2;
    String threadName = "thread";

    ExecutorService executorService = CoreLoggingExecutors
        .newFunctionThreadPoolWithFeedStatistics(poolSize, workQueueSize, queueStatHelper,
            threadName, threadInitializer, commandWrapper, poolStatHelper, threadsMonitoring);

    assertThat(executorService).isInstanceOf(FunctionExecutionPooledExecutor.class);

    FunctionExecutionPooledExecutor executor = (FunctionExecutionPooledExecutor) executorService;

    assertThat(executor.getBufferQueue()).isInstanceOf(OverflowQueueWithDMStats.class);
    assertThat(executor.getCorePoolSize()).isEqualTo(1);
    assertThat(executor.getKeepAliveTime(MINUTES)).isEqualTo(30);
    assertThat(executor.getMaximumPoolSize()).isEqualTo(poolSize);
    assertThat(executor.getQueue()).isInstanceOf(SynchronousQueue.class);
    assertThat(executor.getRejectedExecutionHandler())
        .isInstanceOf(FunctionExecutionRejectedExecutionHandler.class);

    // The buffer queue must report to the very QueueStatHelper instance handed to the factory.
    OverflowQueueWithDMStats overflowQueueWithDMStats =
        (OverflowQueueWithDMStats) executor.getBufferQueue();

    assertThat(overflowQueueWithDMStats.getQueueStatHelper()).isSameAs(queueStatHelper);

    assertCreatesLoggingThreads(executor.getThreadFactory(), threadName);
  }

  /**
   * Passes in and uses BlockingQueue.
   *
   * <p>
   * Uses {@code PooledExecutorWithDMStats.BlockHandler}.
   */
  @Test
  public void newSerialThreadPool() {
    // Mockito can only hand back the raw BlockingQueue type for a class-literal mock.
    @SuppressWarnings("unchecked")
    BlockingQueue<Runnable> workQueue = mock(BlockingQueue.class);
    String threadName = "thread";

    ExecutorService executorService = CoreLoggingExecutors
        .newSerialThreadPool(workQueue, threadName, threadInitializer, commandWrapper,
            poolStatHelper, threadsMonitoring);

    assertThat(executorService).isInstanceOf(SerialQueuedExecutorWithDMStats.class);

    SerialQueuedExecutorWithDMStats executor = (SerialQueuedExecutorWithDMStats) executorService;

    assertThat(executor.getCorePoolSize()).isEqualTo(1);
    assertThat(executor.getKeepAliveTime(MINUTES)).isEqualTo(1);
    assertThat(executor.getMaximumPoolSize()).isEqualTo(1);
    assertThat(executor.getPoolStatHelper()).isSameAs(poolStatHelper);
    assertThat(executor.getQueue()).isSameAs(workQueue);
    assertThat(executor.getRejectedExecutionHandler())
        .isInstanceOf(PooledExecutorWithDMStats.BlockHandler.class);

    assertCreatesLoggingThreads(executor.getThreadFactory(), threadName);
  }

  /**
   * Creates, passes in, and uses OverflowQueueWithDMStats.
   *
   * <p>
   * Uses {@code PooledExecutorWithDMStats.BlockHandler}.
   */
  @Test
  public void newSerialThreadPoolWithFeedStatistics() {
    int workQueueSize = 2;
    String threadName = "thread";

    ExecutorService executorService = CoreLoggingExecutors
        .newSerialThreadPoolWithFeedStatistics(workQueueSize, queueStatHelper, threadName,
            threadInitializer, commandWrapper, poolStatHelper, threadsMonitoring);

    assertThat(executorService).isInstanceOf(SerialQueuedExecutorWithDMStats.class);

    SerialQueuedExecutorWithDMStats executor = (SerialQueuedExecutorWithDMStats) executorService;

    assertThat(executor.getCorePoolSize()).isEqualTo(1);
    assertThat(executor.getKeepAliveTime(MINUTES)).isEqualTo(1);
    assertThat(executor.getMaximumPoolSize()).isEqualTo(1);
    assertThat(executor.getPoolStatHelper()).isSameAs(poolStatHelper);
    assertThat(executor.getQueue()).isInstanceOf(OverflowQueueWithDMStats.class);
    assertThat(executor.getRejectedExecutionHandler())
        .isInstanceOf(PooledExecutorWithDMStats.BlockHandler.class);

    assertCreatesLoggingThreads(executor.getThreadFactory(), threadName);
  }

  /**
   * Creates, passes up, and uses SynchronousQueue. ScheduledThreadPoolExecutorWithKeepAlive then
   * creates a ThreadPoolExecutor to delegate to.
   *
   * <p>
   * Uses {@code ScheduledThreadPoolExecutorWithKeepAlive.BlockCallerPolicy}.
   */
  @Test
  public void newScheduledThreadPool() {
    int poolSize = 10;
    long keepAliveTime = 5000;
    TimeUnit unit = SECONDS;
    String threadName = "thread";

    ExecutorService executorService = CoreLoggingExecutors
        .newScheduledThreadPool(poolSize, keepAliveTime, unit, threadName, threadsMonitoring);

    assertThat(executorService).isInstanceOf(ScheduledThreadPoolExecutorWithKeepAlive.class);

    ScheduledThreadPoolExecutorWithKeepAlive executor =
        (ScheduledThreadPoolExecutorWithKeepAlive) executorService;

    assertThat(executor.getCorePoolSize()).isEqualTo(1);
    assertThat(executor.getDelegateExecutor()).isInstanceOf(ScheduledThreadPoolExecutor.class);
    assertThat(executor.getKeepAliveTime(unit)).isEqualTo(keepAliveTime);
    assertThat(executor.getMaximumPoolSize()).isEqualTo(poolSize);
    assertThat(executor.getQueue()).isInstanceOf(SynchronousQueue.class);
    assertThat(executor.getRejectedExecutionHandler())
        .isInstanceOf(ScheduledThreadPoolExecutorWithKeepAlive.BlockCallerPolicy.class);

    // Neither periodic nor delayed tasks are expected to survive a shutdown of the delegate.
    ScheduledThreadPoolExecutor delegate = executor.getDelegateExecutor();

    assertThat(delegate.getContinueExistingPeriodicTasksAfterShutdownPolicy()).isFalse();
    assertThat(delegate.getExecuteExistingDelayedTasksAfterShutdownPolicy()).isFalse();

    assertCreatesLoggingThreads(executor.getThreadFactory(), threadName);
  }

  /**
   * Creates and passes up BlockingQueue. PooledExecutorWithDMStats then creates and passes up
   * SynchronousQueue.
   *
   * <p>
   * Uses {@code PooledExecutorWithDMStats.BufferHandler}.
   */
  @Test
  public void newThreadPool() {
    int poolSize = 10;
    // Mockito can only hand back the raw BlockingQueue type for a class-literal mock.
    @SuppressWarnings("unchecked")
    BlockingQueue<Runnable> workQueue = mock(BlockingQueue.class);
    String threadName = "thread";

    ExecutorService executorService = CoreLoggingExecutors
        .newThreadPool(poolSize, workQueue, threadName, threadInitializer, commandWrapper,
            poolStatHelper, threadsMonitoring);

    assertThat(executorService).isInstanceOf(PooledExecutorWithDMStats.class);

    PooledExecutorWithDMStats executor = (PooledExecutorWithDMStats) executorService;

    assertThat(executor.getCorePoolSize()).isEqualTo(1);
    assertThat(executor.getKeepAliveTime(MINUTES)).isEqualTo(30);
    assertThat(executor.getMaximumPoolSize()).isEqualTo(poolSize);
    assertThat(executor.getQueue()).isInstanceOf(SynchronousQueue.class);
    assertThat(executor.getRejectedExecutionHandler())
        .isInstanceOf(PooledExecutorWithDMStats.BufferHandler.class);

    assertCreatesLoggingThreads(executor.getThreadFactory(), threadName);
  }

  /**
   * Creates ArrayBlockingQueue but then uses SynchronousQueue.
   *
   * <p>
   * Uses {@code PooledExecutorWithDMStats.BufferHandler}.
   */
  @Test
  public void newThreadPoolWithFixedFeed() {
    int poolSize = 10;
    long keepAliveTime = 5000;
    TimeUnit unit = SECONDS;
    int workQueueSize = 2;
    String threadName = "thread";

    ExecutorService executorService = CoreLoggingExecutors
        .newThreadPoolWithFixedFeed(poolSize, keepAliveTime, unit, workQueueSize, threadName,
            commandWrapper, poolStatHelper, threadsMonitoring);

    assertThat(executorService).isInstanceOf(PooledExecutorWithDMStats.class);

    PooledExecutorWithDMStats executor = (PooledExecutorWithDMStats) executorService;

    assertThat(executor.getCorePoolSize()).isEqualTo(1);
    assertThat(executor.getKeepAliveTime(unit)).isEqualTo(keepAliveTime);
    assertThat(executor.getMaximumPoolSize()).isEqualTo(poolSize);
    assertThat(executor.getQueue()).isInstanceOf(SynchronousQueue.class);
    assertThat(executor.getRejectedExecutionHandler())
        .isInstanceOf(PooledExecutorWithDMStats.BufferHandler.class);

    assertCreatesLoggingThreads(executor.getThreadFactory(), threadName);
  }

  /**
   * Creates and passes in OverflowQueueWithDMStats. PooledExecutorWithDMStats then creates and
   * uses SynchronousQueue.
   *
   * <p>
   * Uses {@code PooledExecutorWithDMStats.BufferHandler}.
   */
  @Test
  public void newThreadPoolWithFeedStatistics() {
    int poolSize = 10;
    int workQueueSize = 2;
    String threadName = "thread";

    ExecutorService executorService = CoreLoggingExecutors
        .newThreadPoolWithFeedStatistics(poolSize, workQueueSize, queueStatHelper, threadName,
            threadInitializer, commandWrapper, poolStatHelper, threadsMonitoring);

    assertThat(executorService).isInstanceOf(PooledExecutorWithDMStats.class);

    PooledExecutorWithDMStats executor = (PooledExecutorWithDMStats) executorService;

    assertThat(executor.getCorePoolSize()).isEqualTo(1);
    assertThat(executor.getKeepAliveTime(MINUTES)).isEqualTo(30);
    assertThat(executor.getMaximumPoolSize()).isEqualTo(poolSize);
    assertThat(executor.getQueue()).isInstanceOf(SynchronousQueue.class);
    assertThat(executor.getRejectedExecutionHandler())
        .isInstanceOf(PooledExecutorWithDMStats.BufferHandler.class);

    assertCreatesLoggingThreads(executor.getThreadFactory(), threadName);
  }

  /**
   * Overload taking only pool size, thread name and command wrapper. Expects a
   * PooledExecutorWithDMStats with a core pool size of 1, a keep-alive time of 30 minutes and a
   * SynchronousQueue.
   *
   * <p>
   * Uses {@code ThreadPoolExecutor.CallerRunsPolicy}.
   */
  @Test
  public void newThreadPoolWithSynchronousFeed() {
    int poolSize = 10;
    String threadName = "thread";

    ExecutorService executorService = CoreLoggingExecutors.newThreadPoolWithSynchronousFeed(
        poolSize, threadName, commandWrapper);

    assertThat(executorService).isInstanceOf(PooledExecutorWithDMStats.class);

    PooledExecutorWithDMStats executor = (PooledExecutorWithDMStats) executorService;

    assertThat(executor.getCorePoolSize()).isEqualTo(1);
    assertThat(executor.getKeepAliveTime(MINUTES)).isEqualTo(30);
    assertThat(executor.getMaximumPoolSize()).isEqualTo(poolSize);
    assertThat(executor.getQueue()).isInstanceOf(SynchronousQueue.class);
    assertThat(executor.getRejectedExecutionHandler())
        .isInstanceOf(ThreadPoolExecutor.CallerRunsPolicy.class);

    assertCreatesLoggingThreads(executor.getThreadFactory(), threadName);
  }

  /**
   * Overload that additionally takes keep-alive time, pool stat helper and threads monitoring.
   * Expects a PooledExecutorWithDMStats with a core pool size of 1, the passed-in keep-alive time
   * and a SynchronousQueue.
   *
   * <p>
   * Uses {@code ThreadPoolExecutor.CallerRunsPolicy}.
   */
  @Test
  public void newThreadPoolWithSynchronousFeed_2() {
    int poolSize = 10;
    long keepAliveTime = 5000;
    TimeUnit unit = SECONDS;
    String threadName = "thread";

    ExecutorService executorService = CoreLoggingExecutors.newThreadPoolWithSynchronousFeed(
        poolSize, keepAliveTime, unit, threadName, commandWrapper, poolStatHelper,
        threadsMonitoring);

    assertThat(executorService).isInstanceOf(PooledExecutorWithDMStats.class);

    PooledExecutorWithDMStats executor = (PooledExecutorWithDMStats) executorService;

    assertThat(executor.getCorePoolSize()).isEqualTo(1);
    assertThat(executor.getKeepAliveTime(unit)).isEqualTo(keepAliveTime);
    assertThat(executor.getMaximumPoolSize()).isEqualTo(poolSize);
    assertThat(executor.getQueue()).isInstanceOf(SynchronousQueue.class);
    assertThat(executor.getRejectedExecutionHandler())
        .isInstanceOf(ThreadPoolExecutor.CallerRunsPolicy.class);

    assertCreatesLoggingThreads(executor.getThreadFactory(), threadName);
  }

  /**
   * Overload taking a caller-supplied RejectedExecutionHandler. Expects a
   * PooledExecutorWithDMStats with a core pool size of 1, the passed-in keep-alive time and a
   * SynchronousQueue.
   *
   * <p>
   * Uses the passed-in RejectedExecutionHandler instance.
   */
  @Test
  public void newThreadPoolWithSynchronousFeed_3() {
    int poolSize = 10;
    long keepAliveTime = 5000;
    TimeUnit unit = SECONDS;
    String threadName = "thread";
    RejectedExecutionHandler rejectedExecutionHandler = mock(RejectedExecutionHandler.class);

    ExecutorService executorService = CoreLoggingExecutors.newThreadPoolWithSynchronousFeed(
        poolSize, keepAliveTime, unit, threadName, rejectedExecutionHandler, poolStatHelper);

    assertThat(executorService).isInstanceOf(PooledExecutorWithDMStats.class);

    PooledExecutorWithDMStats executor = (PooledExecutorWithDMStats) executorService;

    assertThat(executor.getCorePoolSize()).isEqualTo(1);
    assertThat(executor.getKeepAliveTime(unit)).isEqualTo(keepAliveTime);
    assertThat(executor.getMaximumPoolSize()).isEqualTo(poolSize);
    assertThat(executor.getQueue()).isInstanceOf(SynchronousQueue.class);
    assertThat(executor.getRejectedExecutionHandler()).isSameAs(rejectedExecutionHandler);

    assertCreatesLoggingThreads(executor.getThreadFactory(), threadName);
  }

  /**
   * Overload taking separate core and maximum pool sizes plus thread initializer and command
   * wrapper. Expects a ThreadPoolExecutor with the passed-in pool sizes and keep-alive time and a
   * SynchronousQueue.
   *
   * <p>
   * Uses {@code ThreadPoolExecutor.AbortPolicy}.
   */
  @Test
  public void newThreadPoolWithSynchronousFeed_4() {
    int corePoolSize = 10;
    int maximumPoolSize = 20;
    long keepAliveTime = 5000;
    TimeUnit unit = SECONDS;
    String threadName = "thread";

    ExecutorService executorService = CoreLoggingExecutors.newThreadPoolWithSynchronousFeed(
        corePoolSize, maximumPoolSize, keepAliveTime, unit, threadName, threadInitializer,
        commandWrapper);

    assertThat(executorService).isInstanceOf(ThreadPoolExecutor.class);

    ThreadPoolExecutor executor = (ThreadPoolExecutor) executorService;

    assertThat(executor.getCorePoolSize()).isEqualTo(corePoolSize);
    assertThat(executor.getKeepAliveTime(unit)).isEqualTo(keepAliveTime);
    assertThat(executor.getMaximumPoolSize()).isEqualTo(maximumPoolSize);
    assertThat(executor.getQueue()).isInstanceOf(SynchronousQueue.class);
    assertThat(executor.getRejectedExecutionHandler())
        .isInstanceOf(ThreadPoolExecutor.AbortPolicy.class);

    assertCreatesLoggingThreads(executor.getThreadFactory(), threadName);
  }

  /**
   * Used for P2P Reader Threads in ConnectionTable.
   *
   * <p>
   * Uses {@code ThreadPoolExecutor.AbortPolicy}.
   */
  @Test
  public void newThreadPoolWithSynchronousFeed_5() {
    int corePoolSize = 10;
    int maximumPoolSize = 20;
    long keepAliveTime = 5000;
    TimeUnit unit = SECONDS;
    String threadName = "thread";

    ExecutorService executorService = CoreLoggingExecutors.newThreadPoolWithSynchronousFeed(
        corePoolSize, maximumPoolSize, keepAliveTime, unit, threadName);

    assertThat(executorService).isInstanceOf(ThreadPoolExecutor.class);

    ThreadPoolExecutor executor = (ThreadPoolExecutor) executorService;

    assertThat(executor.getCorePoolSize()).isEqualTo(corePoolSize);
    assertThat(executor.getKeepAliveTime(unit)).isEqualTo(keepAliveTime);
    assertThat(executor.getMaximumPoolSize()).isEqualTo(maximumPoolSize);
    assertThat(executor.getQueue()).isInstanceOf(SynchronousQueue.class);
    assertThat(executor.getRejectedExecutionHandler())
        .isInstanceOf(ThreadPoolExecutor.AbortPolicy.class);

    assertCreatesLoggingThreads(executor.getThreadFactory(), threadName);
  }

  /**
   * Expects a ThreadPoolExecutor with the passed-in pool sizes and keep-alive time and a
   * SynchronousQueue.
   *
   * <p>
   * Uses {@code CoreLoggingExecutors.QueuingRejectedExecutionHandler}.
   */
  @Test
  public void newThreadPoolWithSynchronousFeedThatHandlesRejection() {
    int corePoolSize = 10;
    int maximumPoolSize = 20;
    long keepAliveTime = 5000;
    TimeUnit unit = SECONDS;
    String threadName = "thread";

    ExecutorService executorService = CoreLoggingExecutors
        .newThreadPoolWithSynchronousFeedThatHandlesRejection(corePoolSize, maximumPoolSize,
            keepAliveTime, unit, threadName, threadInitializer, commandWrapper);

    assertThat(executorService).isInstanceOf(ThreadPoolExecutor.class);

    ThreadPoolExecutor executor = (ThreadPoolExecutor) executorService;

    assertThat(executor.getCorePoolSize()).isEqualTo(corePoolSize);
    assertThat(executor.getKeepAliveTime(unit)).isEqualTo(keepAliveTime);
    assertThat(executor.getMaximumPoolSize()).isEqualTo(maximumPoolSize);
    assertThat(executor.getQueue()).isInstanceOf(SynchronousQueue.class);
    assertThat(executor.getRejectedExecutionHandler())
        .isInstanceOf(CoreLoggingExecutors.QueuingRejectedExecutionHandler.class);

    assertCreatesLoggingThreads(executor.getThreadFactory(), threadName);
  }

  /**
   * Expects a ThreadPoolExecutor with a core pool size of 1, the passed-in maximum pool size and
   * keep-alive time, and a SynchronousQueue.
   *
   * <p>
   * Uses {@code PooledExecutorWithDMStats.BufferHandler}.
   */
  @Test
  public void newThreadPoolWithUnlimitedFeed() {
    int poolSize = 10;
    long keepAliveTime = 5000;
    TimeUnit unit = SECONDS;
    String threadName = "thread";

    ExecutorService executorService = CoreLoggingExecutors.newThreadPoolWithUnlimitedFeed(
        poolSize, keepAliveTime, unit, threadName, threadInitializer, commandWrapper,
        poolStatHelper, threadsMonitoring);

    assertThat(executorService).isInstanceOf(ThreadPoolExecutor.class);

    ThreadPoolExecutor executor = (ThreadPoolExecutor) executorService;

    assertThat(executor.getCorePoolSize()).isEqualTo(1);
    assertThat(executor.getKeepAliveTime(unit)).isEqualTo(keepAliveTime);
    assertThat(executor.getMaximumPoolSize()).isEqualTo(poolSize);
    assertThat(executor.getQueue()).isInstanceOf(SynchronousQueue.class);
    assertThat(executor.getRejectedExecutionHandler())
        .isInstanceOf(PooledExecutorWithDMStats.BufferHandler.class);

    assertCreatesLoggingThreads(executor.getThreadFactory(), threadName);
  }

  /**
   * Asserts that the given thread factory is a {@link LoggingThreadFactory} and that a thread it
   * creates for the mocked {@code runnable} is a {@link LoggingThread} whose name contains the
   * expected thread name. The created thread is never started.
   *
   * @param threadFactory the thread factory obtained from the executor under test
   * @param expectedThreadName the name fragment every created thread's name must contain
   */
  private void assertCreatesLoggingThreads(ThreadFactory threadFactory,
      String expectedThreadName) {
    assertThat(threadFactory).isInstanceOf(LoggingThreadFactory.class);

    Thread thread = threadFactory.newThread(runnable);

    assertThat(thread).isInstanceOf(LoggingThread.class);
    assertThat(thread.getName()).contains(expectedThreadName);
  }
}