JAVA-262: Make internal executors customizable (#760)
diff --git a/changelog/README.md b/changelog/README.md
index ac06947..0133281 100644
--- a/changelog/README.md
+++ b/changelog/README.md
@@ -4,6 +4,7 @@
- [bug] JAVA-1312: QueryBuilder modifies selected columns when manually selected.
- [improvement] JAVA-1303: Add missing BoundStatement.setRoutingKey(ByteBuffer...)
+- [improvement] JAVA-262: Make internal executors customizable
### 3.0.4
diff --git a/driver-core/src/main/java/com/datastax/driver/core/Cluster.java b/driver-core/src/main/java/com/datastax/driver/core/Cluster.java
index 902a022..33e553f 100644
--- a/driver-core/src/main/java/com/datastax/driver/core/Cluster.java
+++ b/driver-core/src/main/java/com/datastax/driver/core/Cluster.java
@@ -25,7 +25,6 @@
import com.google.common.base.Throwables;
import com.google.common.collect.*;
import com.google.common.util.concurrent.*;
-import io.netty.util.concurrent.DefaultThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -71,8 +70,6 @@
@VisibleForTesting
static final int NEW_NODE_DELAY_SECONDS = SystemProperties.getInt("com.datastax.driver.NEW_NODE_DELAY_SECONDS", 1);
- private static final int NON_BLOCKING_EXECUTOR_SIZE = SystemProperties.getInt("com.datastax.driver.NON_BLOCKING_EXECUTOR_SIZE",
- Runtime.getRuntime().availableProcessors());
private static final ResourceBundle driverProperties = ResourceBundle.getBundle("com.datastax.driver.core.Driver");
@@ -80,8 +77,6 @@
// multiple Cluster instance are created in the same JVM.
private static final AtomicInteger CLUSTER_ID = new AtomicInteger(0);
- private static final int DEFAULT_THREAD_KEEP_ALIVE = 30;
-
private static final int NOTIF_LOCK_TIMEOUT_SECONDS = SystemProperties.getInt("com.datastax.driver.NOTIF_LOCK_TIMEOUT_SECONDS", 60);
final Manager manager;
@@ -1222,6 +1217,19 @@
}
/**
+ * Sets the threading options to use for the newly created Cluster.
+ * <p/>
+ * If no options are set through this method, a new instance of {@link ThreadingOptions} will be used.
+ *
+ * @param options the options.
+ * @return this builder.
+ */
+ public Builder withThreadingOptions(ThreadingOptions options) {
+ configurationBuilder.withThreadingOptions(options);
+ return this;
+ }
+
+ /**
* Set the {@link NettyOptions} to use for the newly created Cluster.
* <p/>
* If no Netty options are set through this method, {@link NettyOptions#DEFAULT_INSTANCE}
@@ -1310,20 +1318,15 @@
final ConvictionPolicy.Factory convictionPolicyFactory = new ConvictionPolicy.DefaultConvictionPolicy.Factory();
- ScheduledThreadPoolExecutor reconnectionExecutor;
- ScheduledThreadPoolExecutor scheduledTasksExecutor;
-
- // Executor used for tasks that shouldn't be executed on an IO thread. Used for short-lived, generally non-blocking tasks
ListeningExecutorService executor;
-
- // Work Queue used by executor.
- LinkedBlockingQueue<Runnable> executorQueue;
-
- // An executor for tasks that might block some time, like creating new connection, but are generally not too critical.
ListeningExecutorService blockingExecutor;
+ ScheduledExecutorService reconnectionExecutor;
+ ScheduledExecutorService scheduledTasksExecutor;
- // Work Queue used by blockingExecutor.
- LinkedBlockingQueue<Runnable> blockingExecutorQueue;
+ BlockingQueue<Runnable> executorQueue;
+ BlockingQueue<Runnable> blockingExecutorQueue;
+ BlockingQueue<Runnable> reconnectionExecutorQueue;
+ BlockingQueue<Runnable> scheduledTasksExecutorQueue;
ConnectionReaper reaper;
@@ -1362,16 +1365,31 @@
this.configuration.register(this);
- this.executorQueue = new LinkedBlockingQueue<Runnable>();
- this.executor = makeExecutor(NON_BLOCKING_EXECUTOR_SIZE, "worker", executorQueue);
- this.blockingExecutorQueue = new LinkedBlockingQueue<Runnable>();
- this.blockingExecutor = makeExecutor(2, "blocking-task-worker", blockingExecutorQueue);
- this.reconnectionExecutor = new ScheduledThreadPoolExecutor(2, threadFactory("reconnection"));
- // scheduledTasksExecutor is used to process C* notifications. So having it mono-threaded ensures notifications are
- // applied in the order received.
- this.scheduledTasksExecutor = new ScheduledThreadPoolExecutor(1, threadFactory("scheduled-task-worker"));
+ ThreadingOptions threadingOptions = this.configuration.getThreadingOptions();
- this.reaper = new ConnectionReaper(this);
+ // executor
+ ExecutorService tmpExecutor = threadingOptions.createExecutor(clusterName);
+ this.executorQueue = (tmpExecutor instanceof ThreadPoolExecutor)
+ ? ((ThreadPoolExecutor) tmpExecutor).getQueue() : null;
+ this.executor = MoreExecutors.listeningDecorator(tmpExecutor);
+
+ // blocking executor
+ ExecutorService tmpBlockingExecutor = threadingOptions.createBlockingExecutor(clusterName);
+ this.blockingExecutorQueue = (tmpBlockingExecutor instanceof ThreadPoolExecutor)
+ ? ((ThreadPoolExecutor) tmpBlockingExecutor).getQueue() : null;
+ this.blockingExecutor = MoreExecutors.listeningDecorator(tmpBlockingExecutor);
+
+ // reconnection executor
+ this.reconnectionExecutor = threadingOptions.createReconnectionExecutor(clusterName);
+ this.reconnectionExecutorQueue = (reconnectionExecutor instanceof ThreadPoolExecutor)
+ ? ((ThreadPoolExecutor) reconnectionExecutor).getQueue() : null;
+
+ // scheduled tasks executor
+ this.scheduledTasksExecutor = threadingOptions.createScheduledTasksExecutor(clusterName);
+ this.scheduledTasksExecutorQueue = (scheduledTasksExecutor instanceof ThreadPoolExecutor)
+ ? ((ThreadPoolExecutor) scheduledTasksExecutor).getQueue() : null;
+
+ this.reaper = new ConnectionReaper(threadingOptions.createReaperExecutor(clusterName));
this.metadata = new Metadata(this);
this.connectionFactory = new Connection.Factory(this, configuration);
this.controlConnection = new ControlConnection(this);
@@ -1515,29 +1533,6 @@
return connectionFactory.protocolVersion;
}
- ThreadFactory threadFactory(String name) {
- return new ThreadFactoryBuilder()
- .setNameFormat(clusterName + "-" + name + "-%d")
- // Back with Netty's thread factory in order to create FastThreadLocalThread instances. This allows
- // an optimization around ThreadLocals (we could use DefaultThreadFactory directly but it creates
- // slightly different thread names, so keep we keep a ThreadFactoryBuilder wrapper for backward
- // compatibility).
- .setThreadFactory(new DefaultThreadFactory("ignored name"))
- .build();
- }
-
- private ListeningExecutorService makeExecutor(int threads, String name, LinkedBlockingQueue<Runnable> workQueue) {
- ThreadPoolExecutor executor = new ThreadPoolExecutor(threads,
- threads,
- DEFAULT_THREAD_KEEP_ALIVE,
- TimeUnit.SECONDS,
- workQueue,
- threadFactory(name));
-
- executor.allowCoreThreadTimeOut(true);
- return MoreExecutors.listeningDecorator(executor);
- }
-
Cluster getCluster() {
return Cluster.this;
}
@@ -2835,9 +2830,9 @@
}
};
- ConnectionReaper(Cluster.Manager manager) {
- executor = Executors.newScheduledThreadPool(1, manager.threadFactory("connection-reaper"));
- executor.scheduleWithFixedDelay(reaperTask, INTERVAL_MS, INTERVAL_MS, TimeUnit.MILLISECONDS);
+ ConnectionReaper(ScheduledExecutorService executor) {
+ this.executor = executor;
+ this.executor.scheduleWithFixedDelay(reaperTask, INTERVAL_MS, INTERVAL_MS, TimeUnit.MILLISECONDS);
}
void register(Connection connection, long terminateTime) {
diff --git a/driver-core/src/main/java/com/datastax/driver/core/Configuration.java b/driver-core/src/main/java/com/datastax/driver/core/Configuration.java
index 7815b7e..25e88d7 100644
--- a/driver-core/src/main/java/com/datastax/driver/core/Configuration.java
+++ b/driver-core/src/main/java/com/datastax/driver/core/Configuration.java
@@ -51,6 +51,7 @@
private final SocketOptions socketOptions;
private final MetricsOptions metricsOptions;
private final QueryOptions queryOptions;
+ private final ThreadingOptions threadingOptions;
private final NettyOptions nettyOptions;
private final CodecRegistry codecRegistry;
@@ -60,6 +61,7 @@
SocketOptions socketOptions,
MetricsOptions metricsOptions,
QueryOptions queryOptions,
+ ThreadingOptions threadingOptions,
NettyOptions nettyOptions,
CodecRegistry codecRegistry) {
this.policies = policies;
@@ -68,6 +70,7 @@
this.socketOptions = socketOptions;
this.metricsOptions = metricsOptions;
this.queryOptions = queryOptions;
+ this.threadingOptions = threadingOptions;
this.nettyOptions = nettyOptions;
this.codecRegistry = codecRegistry;
}
@@ -85,6 +88,7 @@
toCopy.getSocketOptions(),
toCopy.getMetricsOptions(),
toCopy.getQueryOptions(),
+ toCopy.getThreadingOptions(),
toCopy.getNettyOptions(),
toCopy.getCodecRegistry()
);
@@ -154,6 +158,13 @@
}
/**
+ * @return the threading options for this configuration.
+ */
+ public ThreadingOptions getThreadingOptions() {
+ return threadingOptions;
+ }
+
+ /**
* Returns the {@link NettyOptions} instance for this configuration.
*
* @return the {@link NettyOptions} instance for this configuration.
@@ -186,6 +197,7 @@
private SocketOptions socketOptions;
private MetricsOptions metricsOptions;
private QueryOptions queryOptions;
+ private ThreadingOptions threadingOptions;
private NettyOptions nettyOptions;
private CodecRegistry codecRegistry;
@@ -261,6 +273,17 @@
}
/**
+ * Sets the threading options for this cluster.
+ *
+ * @param threadingOptions the threading options to set.
+ * @return this builder.
+ */
+ public Builder withThreadingOptions(ThreadingOptions threadingOptions) {
+ this.threadingOptions = threadingOptions;
+ return this;
+ }
+
+ /**
* Sets the Netty options for this cluster.
*
* @param nettyOptions the Netty options.
@@ -297,6 +320,7 @@
socketOptions != null ? socketOptions : new SocketOptions(),
metricsOptions != null ? metricsOptions : new MetricsOptions(),
queryOptions != null ? queryOptions : new QueryOptions(),
+ threadingOptions != null ? threadingOptions : new ThreadingOptions(),
nettyOptions != null ? nettyOptions : NettyOptions.DEFAULT_INSTANCE,
codecRegistry != null ? codecRegistry : CodecRegistry.DEFAULT_INSTANCE);
}
diff --git a/driver-core/src/main/java/com/datastax/driver/core/Connection.java b/driver-core/src/main/java/com/datastax/driver/core/Connection.java
index b86fab3..4abba7d 100644
--- a/driver-core/src/main/java/com/datastax/driver/core/Connection.java
+++ b/driver-core/src/main/java/com/datastax/driver/core/Connection.java
@@ -766,9 +766,11 @@
this.authProvider = configuration.getProtocolOptions().getAuthProvider();
this.protocolVersion = configuration.getProtocolOptions().initialProtocolVersion;
this.nettyOptions = configuration.getNettyOptions();
- this.eventLoopGroup = nettyOptions.eventLoopGroup(manager.threadFactory("nio-worker"));
+ this.eventLoopGroup = nettyOptions.eventLoopGroup(
+ manager.configuration.getThreadingOptions().createThreadFactory(manager.clusterName, "nio-worker"));
this.channelClass = nettyOptions.channelClass();
- this.timer = nettyOptions.timer(manager.threadFactory("timeouter"));
+ this.timer = nettyOptions.timer(
+ manager.configuration.getThreadingOptions().createThreadFactory(manager.clusterName, "timeouter"));
}
int getPort() {
diff --git a/driver-core/src/main/java/com/datastax/driver/core/Metrics.java b/driver-core/src/main/java/com/datastax/driver/core/Metrics.java
index 33eadc2..94f5a80 100644
--- a/driver-core/src/main/java/com/datastax/driver/core/Metrics.java
+++ b/driver-core/src/main/java/com/datastax/driver/core/Metrics.java
@@ -20,6 +20,8 @@
import java.util.HashSet;
import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
/**
* Metrics exposed by the driver.
@@ -78,36 +80,25 @@
}
});
- private final Gauge<Integer> executorQueueDepth = registry.register("executor-queue-depth", new Gauge<Integer>() {
- @Override
- public Integer getValue() {
- return manager.executorQueue.size();
- }
- });
-
- private final Gauge<Integer> blockingExecutorQueueDepth = registry.register("blocking-executor-queue-depth", new Gauge<Integer>() {
- @Override
- public Integer getValue() {
- return manager.blockingExecutorQueue.size();
- }
- });
-
- private final Gauge<Integer> reconnectionSchedulerQueueSize = registry.register("reconnection-scheduler-task-count", new Gauge<Integer>() {
- @Override
- public Integer getValue() {
- return manager.reconnectionExecutor.getQueue().size();
- }
- });
-
- private final Gauge<Integer> taskSchedulerQueueSize = registry.register("task-scheduler-task-count", new Gauge<Integer>() {
- @Override
- public Integer getValue() {
- return manager.scheduledTasksExecutor.getQueue().size();
- }
- });
+ private final Gauge<Integer> executorQueueDepth;
+ private final Gauge<Integer> blockingExecutorQueueDepth;
+ private final Gauge<Integer> reconnectionSchedulerQueueSize;
+ private final Gauge<Integer> taskSchedulerQueueSize;
Metrics(Cluster.Manager manager) {
this.manager = manager;
+ this.executorQueueDepth = registry.register(
+ "executor-queue-depth",
+ buildQueueSizeGauge(manager.executorQueue));
+ this.blockingExecutorQueueDepth = registry.register(
+ "blocking-executor-queue-depth",
+ buildQueueSizeGauge(manager.blockingExecutorQueue));
+ this.reconnectionSchedulerQueueSize = registry.register(
+ "reconnection-scheduler-task-count",
+ buildQueueSizeGauge(manager.reconnectionExecutorQueue));
+ this.taskSchedulerQueueSize = registry.register(
+ "task-scheduler-task-count",
+ buildQueueSizeGauge(manager.scheduledTasksExecutorQueue));
if (manager.configuration.getMetricsOptions().isJMXReportingEnabled()) {
this.jmxReporter = JmxReporter.forRegistry(registry).inDomain(manager.clusterName + "-metrics").build();
this.jmxReporter.start();
@@ -228,30 +219,58 @@
}
/**
- * @return The number of queued up tasks in the non-blocking executor (Cassandra Java Driver workers).
+ * Returns the number of queued up tasks in the {@link ThreadingOptions#createExecutor(String) main internal executor}.
+ * <p/>
+ * If the executor's task queue is not accessible – which happens when the executor
+ * is not an instance of {@link ThreadPoolExecutor} – then this gauge returns -1.
+ *
+ * @return The number of queued up tasks in the main internal executor,
+ * or -1, if that number is unknown.
*/
public Gauge<Integer> getExecutorQueueDepth() {
return executorQueueDepth;
}
/**
- * @return The number of queued up tasks in the blocking executor (Cassandra Java Driver blocking tasks worker).
+ * Returns the number of queued up tasks in the {@link ThreadingOptions#createBlockingExecutor(String) blocking executor}.
+ * <p/>
+ * If the executor's task queue is not accessible – which happens when the executor
+ * is not an instance of {@link ThreadPoolExecutor} – then this gauge returns -1.
+ *
+ * @return The number of queued up tasks in the blocking executor,
+ * or -1, if that number is unknown.
*/
public Gauge<Integer> getBlockingExecutorQueueDepth() {
return blockingExecutorQueueDepth;
}
/**
- * @return The size of the work queue for the reconnection scheduler (Reconnection). A queue size > 0 does not
+ * Returns the number of queued up tasks in the {@link ThreadingOptions#createReconnectionExecutor(String) reconnection executor}.
+ * <p/>
+ * A queue size > 0 does not
* necessarily indicate a backlog as some tasks may not have been scheduled to execute yet.
+ * <p/>
+ * If the executor's task queue is not accessible – which happens when the executor
+ * is not an instance of {@link ThreadPoolExecutor} – then this gauge returns -1.
+ *
+ * @return The size of the work queue for the reconnection executor,
+ * or -1, if that number is unknown.
*/
public Gauge<Integer> getReconnectionSchedulerQueueSize() {
return reconnectionSchedulerQueueSize;
}
/**
- * @return The size of the work queue for the task scheduler (Scheduled Tasks). A queue size > 0 does not
+ * Returns the number of queued up tasks in the {@link ThreadingOptions#createScheduledTasksExecutor(String) scheduled tasks executor}.
+ * <p/>
+ * A queue size > 0 does not
* necessarily indicate a backlog as some tasks may not have been scheduled to execute yet.
+ * <p/>
+ * If the executor's task queue is not accessible – which happens when the executor
+ * is not an instance of {@link ThreadPoolExecutor} – then this gauge returns -1.
+ *
+ * @return The size of the work queue for the scheduled tasks executor,
+ * or -1, if that number is unknown.
*/
public Gauge<Integer> getTaskSchedulerQueueSize() {
return taskSchedulerQueueSize;
@@ -262,6 +281,24 @@
jmxReporter.stop();
}
+ private static Gauge<Integer> buildQueueSizeGauge(final BlockingQueue<?> queue) {
+ if (queue != null) {
+ return new Gauge<Integer>() {
+ @Override
+ public Integer getValue() {
+ return queue.size();
+ }
+ };
+ } else {
+ return new Gauge<Integer>() {
+ @Override
+ public Integer getValue() {
+ return -1;
+ }
+ };
+ }
+ }
+
/**
* Metrics on errors encountered.
*/
diff --git a/driver-core/src/main/java/com/datastax/driver/core/ThreadingOptions.java b/driver-core/src/main/java/com/datastax/driver/core/ThreadingOptions.java
new file mode 100644
index 0000000..82b17fd
--- /dev/null
+++ b/driver-core/src/main/java/com/datastax/driver/core/ThreadingOptions.java
@@ -0,0 +1,143 @@
+/*
+ * Copyright (C) 2012-2015 DataStax Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datastax.driver.core;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import io.netty.util.concurrent.DefaultThreadFactory;
+
+import java.util.concurrent.*;
+
+/**
+ * A set of hooks that allow clients to customize the driver's internal executors.
+ * <p/>
+ * The methods in this class are invoked when the cluster initializes. To customize the behavior, extend the class and
+ * override the appropriate methods.
+ * <p/>
+ * This is mainly intended to allow customization and instrumentation of driver threads. Each method must return a
+ * newly-allocated executor; don't use a shared executor, as this could introduce unintended consequences like deadlocks
+ * (we're working to simplify the driver's architecture and reduce the number of executors in a future release). The
+ * default implementations use unbounded queues, which is appropriate when the driver is properly configured; the only
+ * reason you would want to use bounded queues is to limit memory consumption in case of a bug or bad configuration. In
+ * that case, make sure to use a {@link RejectedExecutionHandler} that throws, such as
+ * {@link java.util.concurrent.ThreadPoolExecutor.AbortPolicy}; a blocking handler could introduce deadlocks.
+ * <p/>
+ * Netty uses a separate pool for I/O operations, that can be configured via {@link NettyOptions}.
+ */
+public class ThreadingOptions {
+ // Kept for backward compatibility, but this should be customized via this class now
+ private static final int NON_BLOCKING_EXECUTOR_SIZE = SystemProperties.getInt(
+ "com.datastax.driver.NON_BLOCKING_EXECUTOR_SIZE", Runtime.getRuntime().availableProcessors());
+ private static final int DEFAULT_THREAD_KEEP_ALIVE_SECONDS = 30;
+
+ /**
+ * Builds a thread factory for the threads created by a given executor.
+ * <p/>
+ * This is used by the default implementations in this class, and also internally to create the Netty I/O pool.
+ *
+ * @param clusterName the name of the cluster, as specified by
+ * {@link com.datastax.driver.core.Cluster.Builder#withClusterName(String)}.
+ * @param executorName a name that identifies the executor.
+ * @return the thread factory.
+ */
+ public ThreadFactory createThreadFactory(String clusterName, String executorName) {
+ return new ThreadFactoryBuilder()
+ .setNameFormat(clusterName + "-" + executorName + "-%d")
+ // Back with Netty's thread factory in order to create FastThreadLocalThread instances. This allows
+ // an optimization around ThreadLocals (we could use DefaultThreadFactory directly but it creates
+ // slightly different thread names, so keep we keep a ThreadFactoryBuilder wrapper for backward
+ // compatibility).
+ .setThreadFactory(new DefaultThreadFactory("ignored name"))
+ .build();
+ }
+
+ /**
+ * Builds the main internal executor, used for tasks such as scheduling speculative executions, triggering
+ * registered {@link SchemaChangeListener}s, reacting to node state changes, and metadata updates.
+ * <p/>
+ * The default implementation sets the pool size to the number of available cores.
+ *
+ * @param clusterName the name of the cluster, as specified by
+ * {@link com.datastax.driver.core.Cluster.Builder#withClusterName(String)}.
+ * @return the executor.
+ */
+ public ExecutorService createExecutor(String clusterName) {
+ ThreadPoolExecutor executor = new ThreadPoolExecutor(
+ NON_BLOCKING_EXECUTOR_SIZE, NON_BLOCKING_EXECUTOR_SIZE,
+ DEFAULT_THREAD_KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
+ new LinkedBlockingQueue<Runnable>(),
+ createThreadFactory(clusterName, "worker"));
+ executor.allowCoreThreadTimeOut(true);
+ return executor;
+ }
+
+ /**
+ * Builds the executor used to block on new connections before they are added to a pool.
+ * <p/>
+ * The default implementation uses 2 threads.
+ *
+ * @param clusterName the name of the cluster, as specified by
+ * {@link com.datastax.driver.core.Cluster.Builder#withClusterName(String)}.
+ * @return the executor.
+ */
+ public ExecutorService createBlockingExecutor(String clusterName) {
+ ThreadPoolExecutor executor = new ThreadPoolExecutor(
+ 2, 2,
+ DEFAULT_THREAD_KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
+ new LinkedBlockingQueue<Runnable>(),
+ createThreadFactory(clusterName, "worker"));
+ executor.allowCoreThreadTimeOut(true);
+ return executor;
+ }
+
+ /**
+ * Builds the executor when reconnection attempts will be scheduled.
+ * <p/>
+ * The default implementation uses 2 threads.
+ *
+ * @param clusterName the name of the cluster, as specified by
+ * {@link com.datastax.driver.core.Cluster.Builder#withClusterName(String)}.
+ * @return the executor.
+ */
+ public ScheduledExecutorService createReconnectionExecutor(String clusterName) {
+ return new ScheduledThreadPoolExecutor(2, createThreadFactory(clusterName, "reconnection"));
+ }
+
+ /**
+ * Builds the executor to handle host state notifications from Cassandra.
+ * <p/>
+ * <b>This executor must have exactly one thread</b> so that notifications are processed in order.
+ *
+ * @param clusterName the name of the cluster, as specified by
+ * {@link com.datastax.driver.core.Cluster.Builder#withClusterName(String)}.
+ * @return the executor.
+ */
+ public ScheduledExecutorService createScheduledTasksExecutor(String clusterName) {
+ return new ScheduledThreadPoolExecutor(1, createThreadFactory(clusterName, "scheduled-task-worker"));
+ }
+
+ /**
+ * Builds the executor for an internal maintenance task used to clean up closed connections.
+ * <p/>
+ * A single scheduled task runs on this executor, so there is no reason to use more than one thread.
+ *
+ * @param clusterName the name of the cluster, as specified by
+ * {@link com.datastax.driver.core.Cluster.Builder#withClusterName(String)}.
+ * @return the executor.
+ */
+ public ScheduledExecutorService createReaperExecutor(String clusterName) {
+ return new ScheduledThreadPoolExecutor(1, createThreadFactory(clusterName, "connection-reaper"));
+ }
+}
diff --git a/driver-core/src/test/java/com/datastax/driver/core/ThreadingOptionsTest.java b/driver-core/src/test/java/com/datastax/driver/core/ThreadingOptionsTest.java
new file mode 100644
index 0000000..73e84fd
--- /dev/null
+++ b/driver-core/src/test/java/com/datastax/driver/core/ThreadingOptionsTest.java
@@ -0,0 +1,162 @@
+/*
+ * Copyright (C) 2012-2015 DataStax Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datastax.driver.core;
+
+import com.datastax.driver.core.policies.ConstantReconnectionPolicy;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import io.netty.util.concurrent.DefaultThreadFactory;
+import org.assertj.core.api.iterable.Extractor;
+import org.mockito.Mockito;
+import org.testng.annotations.Test;
+
+import java.util.Set;
+import java.util.concurrent.*;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.verify;
+import static org.scassandra.http.client.ClosedConnectionReport.CloseType.CLOSE;
+
+public class ThreadingOptionsTest extends ScassandraTestBase {
+
+ private String customPrefix = "custom";
+
+ private ThreadingOptions threadingOptions = new ThreadingOptions() {
+
+ @Override
+ public ThreadFactory createThreadFactory(String clusterName, String executorName) {
+ return new ThreadFactoryBuilder()
+ .setNameFormat(clusterName + "-" + customPrefix + "-" + executorName + "-%d")
+ // Back with Netty's thread factory in order to create FastThreadLocalThread instances. This allows
+ // an optimization around ThreadLocals (we could use DefaultThreadFactory directly but it creates
+ // slightly different thread names, so keep we keep a ThreadFactoryBuilder wrapper for backward
+ // compatibility).
+ .setThreadFactory(new DefaultThreadFactory("ignored name"))
+ .setDaemon(true)
+ .build();
+ }
+
+ @Override
+ public ExecutorService createExecutor(String clusterName) {
+ return new ThreadPoolExecutor(1, 1,
+ 0L, TimeUnit.MILLISECONDS,
+ new LinkedBlockingQueue<Runnable>(),
+ createThreadFactory(clusterName, "myExecutor")
+ );
+ }
+
+ @Override
+ public ExecutorService createBlockingExecutor(String clusterName) {
+ return new ThreadPoolExecutor(1, 1,
+ 0L, TimeUnit.MILLISECONDS,
+ new LinkedBlockingQueue<Runnable>(),
+ createThreadFactory(clusterName, "myBlockingExecutor")
+ );
+ }
+
+ @Override
+ public ScheduledExecutorService createReconnectionExecutor(String clusterName) {
+ return new ScheduledThreadPoolExecutor(1, createThreadFactory(clusterName, "myReconnection"));
+ }
+
+ @Override
+ public ScheduledExecutorService createScheduledTasksExecutor(String clusterName) {
+ return new ScheduledThreadPoolExecutor(1, createThreadFactory(clusterName, "myScheduled-task-worker"));
+ }
+
+ @Override
+ public ScheduledExecutorService createReaperExecutor(String clusterName) {
+ return new ScheduledThreadPoolExecutor(1, createThreadFactory(clusterName, "myConnection-reaper"));
+ }
+ };
+
+ /**
+ * Validates that when using a provided {@link ThreadingOptions} that its methods are used for creating
+ * executors and that its {@link ThreadingOptions#createThreadFactory(String, String)} is used for initializing
+ * netty resources.
+ *
+ * @test_category configuration
+ */
+ @Test(groups = "short")
+ public void should_use_provided_threading_options() {
+ ThreadingOptions spy = Mockito.spy(threadingOptions);
+ Cluster cluster = createClusterBuilder().withPoolingOptions(new PoolingOptions()
+ .setConnectionsPerHost(HostDistance.LOCAL, 1, 1))
+ .withReconnectionPolicy(new ConstantReconnectionPolicy(100))
+ .withThreadingOptions(spy).build();
+ try {
+ String clusterName = cluster.getClusterName();
+ cluster.init();
+
+ // Ensure each method was invoked appropriately:
+ // 1) 1 time for each create*Executor.
+ // 2) createThreadFactory for netty executor group and timeouter.
+ verify(spy).createExecutor(clusterName);
+ verify(spy).createBlockingExecutor(clusterName);
+ verify(spy).createReconnectionExecutor(clusterName);
+ verify(spy).createScheduledTasksExecutor(clusterName);
+ verify(spy).createReaperExecutor(clusterName);
+ verify(spy).createThreadFactory(clusterName, "nio-worker");
+ verify(spy).createThreadFactory(clusterName, "timeouter");
+
+ cluster.connect();
+
+ // Close all connections bringing the host down, this should cause some activity on
+ // executor and reconnection executor.
+ currentClient.disableListener();
+ currentClient.closeConnections(CLOSE);
+ TestUtils.waitForDown(TestUtils.IP_PREFIX + "1", cluster);
+ currentClient.enableListener();
+ TestUtils.waitForUp(TestUtils.IP_PREFIX + "1", cluster);
+
+ Set<Thread> threads = Thread.getAllStackTraces().keySet();
+ for(Thread thread : threads) {
+ // all threads should use the custom factory and thus be marked daemon
+ if(thread.getName().startsWith(clusterName + "-" + customPrefix)) {
+ // all created threads should be daemon this should indicate that our custom thread factory was
+ // used.
+ assertThat(thread.isDaemon()).isTrue();
+ }
+ }
+
+ final Pattern threadNamePattern = Pattern.compile(clusterName + "-" + customPrefix + "-(.*)-0");
+
+ // Custom executor threads should be present.
+ // NOTE: we don't validate blocking executor since it is hard to deterministically cause it to be used.
+ assertThat(threads).extracting(new Extractor<Thread, String>() {
+ @Override
+ public String extract(Thread thread) {
+ Matcher matcher = threadNamePattern.matcher(thread.getName());
+ if(matcher.matches()) {
+ return matcher.group(1);
+ } else {
+ return thread.getName();
+ }
+ }
+ }).contains(
+ "nio-worker",
+ "timeouter",
+ "myExecutor",
+ "myReconnection",
+ "myScheduled-task-worker",
+ "myConnection-reaper"
+ );
+ } finally {
+ cluster.close();
+ }
+ }
+}