JAVA-262: Make internal executors customizable (#760)

diff --git a/changelog/README.md b/changelog/README.md
index ac06947..0133281 100644
--- a/changelog/README.md
+++ b/changelog/README.md
@@ -4,6 +4,7 @@
 
 - [bug] JAVA-1312: QueryBuilder modifies selected columns when manually selected.
 - [improvement] JAVA-1303: Add missing BoundStatement.setRoutingKey(ByteBuffer...)
+- [improvement] JAVA-262: Make internal executors customizable
 
 
 ### 3.0.4
diff --git a/driver-core/src/main/java/com/datastax/driver/core/Cluster.java b/driver-core/src/main/java/com/datastax/driver/core/Cluster.java
index 902a022..33e553f 100644
--- a/driver-core/src/main/java/com/datastax/driver/core/Cluster.java
+++ b/driver-core/src/main/java/com/datastax/driver/core/Cluster.java
@@ -25,7 +25,6 @@
 import com.google.common.base.Throwables;
 import com.google.common.collect.*;
 import com.google.common.util.concurrent.*;
-import io.netty.util.concurrent.DefaultThreadFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -71,8 +70,6 @@
 
     @VisibleForTesting
     static final int NEW_NODE_DELAY_SECONDS = SystemProperties.getInt("com.datastax.driver.NEW_NODE_DELAY_SECONDS", 1);
-    private static final int NON_BLOCKING_EXECUTOR_SIZE = SystemProperties.getInt("com.datastax.driver.NON_BLOCKING_EXECUTOR_SIZE",
-            Runtime.getRuntime().availableProcessors());
 
     private static final ResourceBundle driverProperties = ResourceBundle.getBundle("com.datastax.driver.core.Driver");
 
@@ -80,8 +77,6 @@
     // multiple Cluster instance are created in the same JVM.
     private static final AtomicInteger CLUSTER_ID = new AtomicInteger(0);
 
-    private static final int DEFAULT_THREAD_KEEP_ALIVE = 30;
-
     private static final int NOTIF_LOCK_TIMEOUT_SECONDS = SystemProperties.getInt("com.datastax.driver.NOTIF_LOCK_TIMEOUT_SECONDS", 60);
 
     final Manager manager;
@@ -1222,6 +1217,19 @@
         }
 
         /**
+         * Sets the threading options to use for the newly created Cluster.
+         * <p/>
+         * If no options are set through this method, a new instance of {@link ThreadingOptions} will be used.
+         *
+         * @param options the options.
+         * @return this builder.
+         */
+        public Builder withThreadingOptions(ThreadingOptions options) {
+            configurationBuilder.withThreadingOptions(options);
+            return this;
+        }
+
+        /**
          * Set the {@link NettyOptions} to use for the newly created Cluster.
          * <p/>
          * If no Netty options are set through this method, {@link NettyOptions#DEFAULT_INSTANCE}
@@ -1310,20 +1318,15 @@
 
         final ConvictionPolicy.Factory convictionPolicyFactory = new ConvictionPolicy.DefaultConvictionPolicy.Factory();
 
-        ScheduledThreadPoolExecutor reconnectionExecutor;
-        ScheduledThreadPoolExecutor scheduledTasksExecutor;
-
-        // Executor used for tasks that shouldn't be executed on an IO thread. Used for short-lived, generally non-blocking tasks
         ListeningExecutorService executor;
-
-        // Work Queue used by executor.
-        LinkedBlockingQueue<Runnable> executorQueue;
-
-        // An executor for tasks that might block some time, like creating new connection, but are generally not too critical.
         ListeningExecutorService blockingExecutor;
+        ScheduledExecutorService reconnectionExecutor;
+        ScheduledExecutorService scheduledTasksExecutor;
 
-        // Work Queue used by blockingExecutor.
-        LinkedBlockingQueue<Runnable> blockingExecutorQueue;
+        BlockingQueue<Runnable> executorQueue;
+        BlockingQueue<Runnable> blockingExecutorQueue;
+        BlockingQueue<Runnable> reconnectionExecutorQueue;
+        BlockingQueue<Runnable> scheduledTasksExecutorQueue;
 
         ConnectionReaper reaper;
 
@@ -1362,16 +1365,31 @@
 
             this.configuration.register(this);
 
-            this.executorQueue = new LinkedBlockingQueue<Runnable>();
-            this.executor = makeExecutor(NON_BLOCKING_EXECUTOR_SIZE, "worker", executorQueue);
-            this.blockingExecutorQueue = new LinkedBlockingQueue<Runnable>();
-            this.blockingExecutor = makeExecutor(2, "blocking-task-worker", blockingExecutorQueue);
-            this.reconnectionExecutor = new ScheduledThreadPoolExecutor(2, threadFactory("reconnection"));
-            // scheduledTasksExecutor is used to process C* notifications. So having it mono-threaded ensures notifications are
-            // applied in the order received.
-            this.scheduledTasksExecutor = new ScheduledThreadPoolExecutor(1, threadFactory("scheduled-task-worker"));
+            ThreadingOptions threadingOptions = this.configuration.getThreadingOptions();
 
-            this.reaper = new ConnectionReaper(this);
+            // executor
+            ExecutorService tmpExecutor = threadingOptions.createExecutor(clusterName);
+            this.executorQueue = (tmpExecutor instanceof ThreadPoolExecutor)
+                    ? ((ThreadPoolExecutor) tmpExecutor).getQueue() : null;
+            this.executor = MoreExecutors.listeningDecorator(tmpExecutor);
+
+            // blocking executor
+            ExecutorService tmpBlockingExecutor = threadingOptions.createBlockingExecutor(clusterName);
+            this.blockingExecutorQueue = (tmpBlockingExecutor instanceof ThreadPoolExecutor)
+                    ? ((ThreadPoolExecutor) tmpBlockingExecutor).getQueue() : null;
+            this.blockingExecutor = MoreExecutors.listeningDecorator(tmpBlockingExecutor);
+
+            // reconnection executor
+            this.reconnectionExecutor = threadingOptions.createReconnectionExecutor(clusterName);
+            this.reconnectionExecutorQueue = (reconnectionExecutor instanceof ThreadPoolExecutor)
+                    ? ((ThreadPoolExecutor) reconnectionExecutor).getQueue() : null;
+
+            // scheduled tasks executor
+            this.scheduledTasksExecutor = threadingOptions.createScheduledTasksExecutor(clusterName);
+            this.scheduledTasksExecutorQueue = (scheduledTasksExecutor instanceof ThreadPoolExecutor)
+                    ? ((ThreadPoolExecutor) scheduledTasksExecutor).getQueue() : null;
+
+            this.reaper = new ConnectionReaper(threadingOptions.createReaperExecutor(clusterName));
             this.metadata = new Metadata(this);
             this.connectionFactory = new Connection.Factory(this, configuration);
             this.controlConnection = new ControlConnection(this);
@@ -1515,29 +1533,6 @@
             return connectionFactory.protocolVersion;
         }
 
-        ThreadFactory threadFactory(String name) {
-            return new ThreadFactoryBuilder()
-                    .setNameFormat(clusterName + "-" + name + "-%d")
-                    // Back with Netty's thread factory in order to create FastThreadLocalThread instances. This allows
-                    // an optimization around ThreadLocals (we could use DefaultThreadFactory directly but it creates
-                    // slightly different thread names, so keep we keep a ThreadFactoryBuilder wrapper for backward
-                    // compatibility).
-                    .setThreadFactory(new DefaultThreadFactory("ignored name"))
-                    .build();
-        }
-
-        private ListeningExecutorService makeExecutor(int threads, String name, LinkedBlockingQueue<Runnable> workQueue) {
-            ThreadPoolExecutor executor = new ThreadPoolExecutor(threads,
-                    threads,
-                    DEFAULT_THREAD_KEEP_ALIVE,
-                    TimeUnit.SECONDS,
-                    workQueue,
-                    threadFactory(name));
-
-            executor.allowCoreThreadTimeOut(true);
-            return MoreExecutors.listeningDecorator(executor);
-        }
-
         Cluster getCluster() {
             return Cluster.this;
         }
@@ -2835,9 +2830,9 @@
             }
         };
 
-        ConnectionReaper(Cluster.Manager manager) {
-            executor = Executors.newScheduledThreadPool(1, manager.threadFactory("connection-reaper"));
-            executor.scheduleWithFixedDelay(reaperTask, INTERVAL_MS, INTERVAL_MS, TimeUnit.MILLISECONDS);
+        ConnectionReaper(ScheduledExecutorService executor) {
+            this.executor = executor;
+            this.executor.scheduleWithFixedDelay(reaperTask, INTERVAL_MS, INTERVAL_MS, TimeUnit.MILLISECONDS);
         }
 
         void register(Connection connection, long terminateTime) {
diff --git a/driver-core/src/main/java/com/datastax/driver/core/Configuration.java b/driver-core/src/main/java/com/datastax/driver/core/Configuration.java
index 7815b7e..25e88d7 100644
--- a/driver-core/src/main/java/com/datastax/driver/core/Configuration.java
+++ b/driver-core/src/main/java/com/datastax/driver/core/Configuration.java
@@ -51,6 +51,7 @@
     private final SocketOptions socketOptions;
     private final MetricsOptions metricsOptions;
     private final QueryOptions queryOptions;
+    private final ThreadingOptions threadingOptions;
     private final NettyOptions nettyOptions;
     private final CodecRegistry codecRegistry;
 
@@ -60,6 +61,7 @@
                           SocketOptions socketOptions,
                           MetricsOptions metricsOptions,
                           QueryOptions queryOptions,
+                          ThreadingOptions threadingOptions,
                           NettyOptions nettyOptions,
                           CodecRegistry codecRegistry) {
         this.policies = policies;
@@ -68,6 +70,7 @@
         this.socketOptions = socketOptions;
         this.metricsOptions = metricsOptions;
         this.queryOptions = queryOptions;
+        this.threadingOptions = threadingOptions;
         this.nettyOptions = nettyOptions;
         this.codecRegistry = codecRegistry;
     }
@@ -85,6 +88,7 @@
                 toCopy.getSocketOptions(),
                 toCopy.getMetricsOptions(),
                 toCopy.getQueryOptions(),
+                toCopy.getThreadingOptions(),
                 toCopy.getNettyOptions(),
                 toCopy.getCodecRegistry()
         );
@@ -154,6 +158,13 @@
     }
 
     /**
+     * @return the threading options for this configuration.
+     */
+    public ThreadingOptions getThreadingOptions() {
+        return threadingOptions;
+    }
+
+    /**
      * Returns the {@link NettyOptions} instance for this configuration.
      *
      * @return the {@link NettyOptions} instance for this configuration.
@@ -186,6 +197,7 @@
         private SocketOptions socketOptions;
         private MetricsOptions metricsOptions;
         private QueryOptions queryOptions;
+        private ThreadingOptions threadingOptions;
         private NettyOptions nettyOptions;
         private CodecRegistry codecRegistry;
 
@@ -261,6 +273,17 @@
         }
 
         /**
+         * Sets the threading options for this cluster.
+         *
+         * @param threadingOptions the threading options to set.
+         * @return this builder.
+         */
+        public Builder withThreadingOptions(ThreadingOptions threadingOptions) {
+            this.threadingOptions = threadingOptions;
+            return this;
+        }
+
+        /**
          * Sets the Netty options for this cluster.
          *
          * @param nettyOptions the Netty options.
@@ -297,6 +320,7 @@
                     socketOptions != null ? socketOptions : new SocketOptions(),
                     metricsOptions != null ? metricsOptions : new MetricsOptions(),
                     queryOptions != null ? queryOptions : new QueryOptions(),
+                    threadingOptions != null ? threadingOptions : new ThreadingOptions(),
                     nettyOptions != null ? nettyOptions : NettyOptions.DEFAULT_INSTANCE,
                     codecRegistry != null ? codecRegistry : CodecRegistry.DEFAULT_INSTANCE);
         }
diff --git a/driver-core/src/main/java/com/datastax/driver/core/Connection.java b/driver-core/src/main/java/com/datastax/driver/core/Connection.java
index b86fab3..4abba7d 100644
--- a/driver-core/src/main/java/com/datastax/driver/core/Connection.java
+++ b/driver-core/src/main/java/com/datastax/driver/core/Connection.java
@@ -766,9 +766,11 @@
             this.authProvider = configuration.getProtocolOptions().getAuthProvider();
             this.protocolVersion = configuration.getProtocolOptions().initialProtocolVersion;
             this.nettyOptions = configuration.getNettyOptions();
-            this.eventLoopGroup = nettyOptions.eventLoopGroup(manager.threadFactory("nio-worker"));
+            this.eventLoopGroup = nettyOptions.eventLoopGroup(
+                    manager.configuration.getThreadingOptions().createThreadFactory(manager.clusterName, "nio-worker"));
             this.channelClass = nettyOptions.channelClass();
-            this.timer = nettyOptions.timer(manager.threadFactory("timeouter"));
+            this.timer = nettyOptions.timer(
+                    manager.configuration.getThreadingOptions().createThreadFactory(manager.clusterName, "timeouter"));
         }
 
         int getPort() {
diff --git a/driver-core/src/main/java/com/datastax/driver/core/Metrics.java b/driver-core/src/main/java/com/datastax/driver/core/Metrics.java
index 33eadc2..94f5a80 100644
--- a/driver-core/src/main/java/com/datastax/driver/core/Metrics.java
+++ b/driver-core/src/main/java/com/datastax/driver/core/Metrics.java
@@ -20,6 +20,8 @@
 
 import java.util.HashSet;
 import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
 
 /**
  * Metrics exposed by the driver.
@@ -78,36 +80,25 @@
         }
     });
 
-    private final Gauge<Integer> executorQueueDepth = registry.register("executor-queue-depth", new Gauge<Integer>() {
-        @Override
-        public Integer getValue() {
-            return manager.executorQueue.size();
-        }
-    });
-
-    private final Gauge<Integer> blockingExecutorQueueDepth = registry.register("blocking-executor-queue-depth", new Gauge<Integer>() {
-        @Override
-        public Integer getValue() {
-            return manager.blockingExecutorQueue.size();
-        }
-    });
-
-    private final Gauge<Integer> reconnectionSchedulerQueueSize = registry.register("reconnection-scheduler-task-count", new Gauge<Integer>() {
-        @Override
-        public Integer getValue() {
-            return manager.reconnectionExecutor.getQueue().size();
-        }
-    });
-
-    private final Gauge<Integer> taskSchedulerQueueSize = registry.register("task-scheduler-task-count", new Gauge<Integer>() {
-        @Override
-        public Integer getValue() {
-            return manager.scheduledTasksExecutor.getQueue().size();
-        }
-    });
+    private final Gauge<Integer> executorQueueDepth;
+    private final Gauge<Integer> blockingExecutorQueueDepth;
+    private final Gauge<Integer> reconnectionSchedulerQueueSize;
+    private final Gauge<Integer> taskSchedulerQueueSize;
 
     Metrics(Cluster.Manager manager) {
         this.manager = manager;
+        this.executorQueueDepth = registry.register(
+                "executor-queue-depth",
+                buildQueueSizeGauge(manager.executorQueue));
+        this.blockingExecutorQueueDepth = registry.register(
+                "blocking-executor-queue-depth",
+                buildQueueSizeGauge(manager.blockingExecutorQueue));
+        this.reconnectionSchedulerQueueSize = registry.register(
+                "reconnection-scheduler-task-count",
+                buildQueueSizeGauge(manager.reconnectionExecutorQueue));
+        this.taskSchedulerQueueSize = registry.register(
+                "task-scheduler-task-count",
+                buildQueueSizeGauge(manager.scheduledTasksExecutorQueue));
         if (manager.configuration.getMetricsOptions().isJMXReportingEnabled()) {
             this.jmxReporter = JmxReporter.forRegistry(registry).inDomain(manager.clusterName + "-metrics").build();
             this.jmxReporter.start();
@@ -228,30 +219,58 @@
     }
 
     /**
-     * @return The number of queued up tasks in the non-blocking executor (Cassandra Java Driver workers).
+     * Returns the number of queued up tasks in the {@link ThreadingOptions#createExecutor(String) main internal executor}.
+     * <p/>
+     * If the executor's task queue is not accessible – which happens when the executor
+     * is not an instance of {@link ThreadPoolExecutor} – then this gauge returns -1.
+     *
+     * @return The number of queued up tasks in the main internal executor,
+     * or -1, if that number is unknown.
      */
     public Gauge<Integer> getExecutorQueueDepth() {
         return executorQueueDepth;
     }
 
     /**
-     * @return The number of queued up tasks in the blocking executor (Cassandra Java Driver blocking tasks worker).
+     * Returns the number of queued up tasks in the {@link ThreadingOptions#createBlockingExecutor(String) blocking executor}.
+     * <p/>
+     * If the executor's task queue is not accessible – which happens when the executor
+     * is not an instance of {@link ThreadPoolExecutor} – then this gauge returns -1.
+     *
+     * @return The number of queued up tasks in the blocking executor,
+     * or -1, if that number is unknown.
      */
     public Gauge<Integer> getBlockingExecutorQueueDepth() {
         return blockingExecutorQueueDepth;
     }
 
     /**
-     * @return The size of the work queue for the reconnection scheduler (Reconnection).  A queue size > 0 does not
+     * Returns the number of queued up tasks in the {@link ThreadingOptions#createReconnectionExecutor(String) reconnection executor}.
+     * <p/>
+     * A queue size > 0 does not
      * necessarily indicate a backlog as some tasks may not have been scheduled to execute yet.
+     * <p/>
+     * If the executor's task queue is not accessible – which happens when the executor
+     * is not an instance of {@link ThreadPoolExecutor} – then this gauge returns -1.
+     *
+     * @return The size of the work queue for the reconnection executor,
+     * or -1, if that number is unknown.
      */
     public Gauge<Integer> getReconnectionSchedulerQueueSize() {
         return reconnectionSchedulerQueueSize;
     }
 
     /**
-     * @return The size of the work queue for the task scheduler (Scheduled Tasks).  A queue size > 0 does not
+     * Returns the number of queued up tasks in the {@link ThreadingOptions#createScheduledTasksExecutor(String) scheduled tasks executor}.
+     * <p/>
+     * A queue size > 0 does not
      * necessarily indicate a backlog as some tasks may not have been scheduled to execute yet.
+     * <p/>
+     * If the executor's task queue is not accessible – which happens when the executor
+     * is not an instance of {@link ThreadPoolExecutor} – then this gauge returns -1.
+     *
+     * @return The size of the work queue for the scheduled tasks executor,
+     * or -1, if that number is unknown.
      */
     public Gauge<Integer> getTaskSchedulerQueueSize() {
         return taskSchedulerQueueSize;
@@ -262,6 +281,24 @@
             jmxReporter.stop();
     }
 
+    private static Gauge<Integer> buildQueueSizeGauge(final BlockingQueue<?> queue) {
+        if (queue != null) {
+            return new Gauge<Integer>() {
+                @Override
+                public Integer getValue() {
+                    return queue.size();
+                }
+            };
+        } else {
+            return new Gauge<Integer>() {
+                @Override
+                public Integer getValue() {
+                    return -1;
+                }
+            };
+        }
+    }
+
     /**
      * Metrics on errors encountered.
      */
diff --git a/driver-core/src/main/java/com/datastax/driver/core/ThreadingOptions.java b/driver-core/src/main/java/com/datastax/driver/core/ThreadingOptions.java
new file mode 100644
index 0000000..82b17fd
--- /dev/null
+++ b/driver-core/src/main/java/com/datastax/driver/core/ThreadingOptions.java
@@ -0,0 +1,143 @@
+/*
+ *      Copyright (C) 2012-2015 DataStax Inc.
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+package com.datastax.driver.core;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import io.netty.util.concurrent.DefaultThreadFactory;
+
+import java.util.concurrent.*;
+
+/**
+ * A set of hooks that allow clients to customize the driver's internal executors.
+ * <p/>
+ * The methods in this class are invoked when the cluster initializes. To customize the behavior, extend the class and
+ * override the appropriate methods.
+ * <p/>
+ * This is mainly intended to allow customization and instrumentation of driver threads. Each method must return a
+ * newly-allocated executor; don't use a shared executor, as this could introduce unintended consequences like deadlocks
+ * (we're working to simplify the driver's architecture and reduce the number of executors in a future release). The
+ * default implementations use unbounded queues, which is appropriate when the driver is properly configured; the only
+ * reason you would want to use bounded queues is to limit memory consumption in case of a bug or bad configuration. In
+ * that case, make sure to use a {@link RejectedExecutionHandler} that throws, such as
+ * {@link java.util.concurrent.ThreadPoolExecutor.AbortPolicy}; a blocking handler could introduce deadlocks.
+ * <p/>
+ * Netty uses a separate pool for I/O operations, that can be configured via {@link NettyOptions}.
+ */
+public class ThreadingOptions {
+    // Kept for backward compatibility, but this should be customized via this class now
+    private static final int NON_BLOCKING_EXECUTOR_SIZE = SystemProperties.getInt(
+            "com.datastax.driver.NON_BLOCKING_EXECUTOR_SIZE", Runtime.getRuntime().availableProcessors());
+    private static final int DEFAULT_THREAD_KEEP_ALIVE_SECONDS = 30;
+
+    /**
+     * Builds a thread factory for the threads created by a given executor.
+     * <p/>
+     * This is used by the default implementations in this class, and also internally to create the Netty I/O pool.
+     *
+     * @param clusterName  the name of the cluster, as specified by
+     *                     {@link com.datastax.driver.core.Cluster.Builder#withClusterName(String)}.
+     * @param executorName a name that identifies the executor.
+     * @return the thread factory.
+     */
+    public ThreadFactory createThreadFactory(String clusterName, String executorName) {
+        return new ThreadFactoryBuilder()
+                .setNameFormat(clusterName + "-" + executorName + "-%d")
+                // Back with Netty's thread factory in order to create FastThreadLocalThread instances. This allows
+                // an optimization around ThreadLocals (we could use DefaultThreadFactory directly but it creates
+                // slightly different thread names, so keep we keep a ThreadFactoryBuilder wrapper for backward
+                // compatibility).
+                .setThreadFactory(new DefaultThreadFactory("ignored name"))
+                .build();
+    }
+
+    /**
+     * Builds the main internal executor, used for tasks such as scheduling speculative executions, triggering
+     * registered {@link SchemaChangeListener}s, reacting to node state changes, and metadata updates.
+     * <p/>
+     * The default implementation sets the pool size to the number of available cores.
+     *
+     * @param clusterName the name of the cluster, as specified by
+     *                    {@link com.datastax.driver.core.Cluster.Builder#withClusterName(String)}.
+     * @return the executor.
+     */
+    public ExecutorService createExecutor(String clusterName) {
+        ThreadPoolExecutor executor = new ThreadPoolExecutor(
+                NON_BLOCKING_EXECUTOR_SIZE, NON_BLOCKING_EXECUTOR_SIZE,
+                DEFAULT_THREAD_KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
+                new LinkedBlockingQueue<Runnable>(),
+                createThreadFactory(clusterName, "worker"));
+        executor.allowCoreThreadTimeOut(true);
+        return executor;
+    }
+
+    /**
+     * Builds the executor used to block on new connections before they are added to a pool.
+     * <p/>
+     * The default implementation uses 2 threads.
+     *
+     * @param clusterName the name of the cluster, as specified by
+     *                    {@link com.datastax.driver.core.Cluster.Builder#withClusterName(String)}.
+     * @return the executor.
+     */
+    public ExecutorService createBlockingExecutor(String clusterName) {
+        ThreadPoolExecutor executor = new ThreadPoolExecutor(
+                2, 2,
+                DEFAULT_THREAD_KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
+                new LinkedBlockingQueue<Runnable>(),
+                createThreadFactory(clusterName, "worker"));
+        executor.allowCoreThreadTimeOut(true);
+        return executor;
+    }
+
+    /**
+     * Builds the executor when reconnection attempts will be scheduled.
+     * <p/>
+     * The default implementation uses 2 threads.
+     *
+     * @param clusterName the name of the cluster, as specified by
+     *                    {@link com.datastax.driver.core.Cluster.Builder#withClusterName(String)}.
+     * @return the executor.
+     */
+    public ScheduledExecutorService createReconnectionExecutor(String clusterName) {
+        return new ScheduledThreadPoolExecutor(2, createThreadFactory(clusterName, "reconnection"));
+    }
+
+    /**
+     * Builds the executor to handle host state notifications from Cassandra.
+     * <p/>
+     * <b>This executor must have exactly one thread</b> so that notifications are processed in order.
+     *
+     * @param clusterName the name of the cluster, as specified by
+     *                    {@link com.datastax.driver.core.Cluster.Builder#withClusterName(String)}.
+     * @return the executor.
+     */
+    public ScheduledExecutorService createScheduledTasksExecutor(String clusterName) {
+        return new ScheduledThreadPoolExecutor(1, createThreadFactory(clusterName, "scheduled-task-worker"));
+    }
+
+    /**
+     * Builds the executor for an internal maintenance task used to clean up closed connections.
+     * <p/>
+     * A single scheduled task runs on this executor, so there is no reason to use more than one thread.
+     *
+     * @param clusterName the name of the cluster, as specified by
+     *                    {@link com.datastax.driver.core.Cluster.Builder#withClusterName(String)}.
+     * @return the executor.
+     */
+    public ScheduledExecutorService createReaperExecutor(String clusterName) {
+        return new ScheduledThreadPoolExecutor(1, createThreadFactory(clusterName, "connection-reaper"));
+    }
+}
diff --git a/driver-core/src/test/java/com/datastax/driver/core/ThreadingOptionsTest.java b/driver-core/src/test/java/com/datastax/driver/core/ThreadingOptionsTest.java
new file mode 100644
index 0000000..73e84fd
--- /dev/null
+++ b/driver-core/src/test/java/com/datastax/driver/core/ThreadingOptionsTest.java
@@ -0,0 +1,162 @@
+/*
+ *      Copyright (C) 2012-2015 DataStax Inc.
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+package com.datastax.driver.core;
+
+import com.datastax.driver.core.policies.ConstantReconnectionPolicy;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import io.netty.util.concurrent.DefaultThreadFactory;
+import org.assertj.core.api.iterable.Extractor;
+import org.mockito.Mockito;
+import org.testng.annotations.Test;
+
+import java.util.Set;
+import java.util.concurrent.*;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.verify;
+import static org.scassandra.http.client.ClosedConnectionReport.CloseType.CLOSE;
+
+public class ThreadingOptionsTest extends ScassandraTestBase {
+
+    private String customPrefix = "custom";
+
+    private ThreadingOptions threadingOptions = new ThreadingOptions() {
+
+        @Override
+        public ThreadFactory createThreadFactory(String clusterName, String executorName) {
+            return new ThreadFactoryBuilder()
+                    .setNameFormat(clusterName + "-" + customPrefix + "-" + executorName + "-%d")
+                    // Back with Netty's thread factory in order to create FastThreadLocalThread instances. This allows
+                    // an optimization around ThreadLocals (we could use DefaultThreadFactory directly but it creates
+                    // slightly different thread names, so keep we keep a ThreadFactoryBuilder wrapper for backward
+                    // compatibility).
+                    .setThreadFactory(new DefaultThreadFactory("ignored name"))
+                    .setDaemon(true)
+                    .build();
+        }
+
+        @Override
+        public ExecutorService createExecutor(String clusterName) {
+            return new ThreadPoolExecutor(1, 1,
+                    0L, TimeUnit.MILLISECONDS,
+                    new LinkedBlockingQueue<Runnable>(),
+                    createThreadFactory(clusterName, "myExecutor")
+            );
+        }
+
+        @Override
+        public ExecutorService createBlockingExecutor(String clusterName) {
+            return new ThreadPoolExecutor(1, 1,
+                    0L, TimeUnit.MILLISECONDS,
+                    new LinkedBlockingQueue<Runnable>(),
+                    createThreadFactory(clusterName, "myBlockingExecutor")
+            );
+        }
+
+        @Override
+        public ScheduledExecutorService createReconnectionExecutor(String clusterName) {
+            return new ScheduledThreadPoolExecutor(1, createThreadFactory(clusterName, "myReconnection"));
+        }
+
+        @Override
+        public ScheduledExecutorService createScheduledTasksExecutor(String clusterName) {
+            return new ScheduledThreadPoolExecutor(1, createThreadFactory(clusterName, "myScheduled-task-worker"));
+        }
+
+        @Override
+        public ScheduledExecutorService createReaperExecutor(String clusterName) {
+            return new ScheduledThreadPoolExecutor(1, createThreadFactory(clusterName, "myConnection-reaper"));
+        }
+    };
+
+    /**
+     * Validates that when using a provided {@link ThreadingOptions} that its methods are used for creating
+     * executors and that its {@link ThreadingOptions#createThreadFactory(String, String)} is used for initializing
+     * netty resources.
+     *
+     * @test_category configuration
+     */
+    @Test(groups = "short")
+    public void should_use_provided_threading_options() {
+        ThreadingOptions spy = Mockito.spy(threadingOptions);
+        Cluster cluster = createClusterBuilder().withPoolingOptions(new PoolingOptions()
+                .setConnectionsPerHost(HostDistance.LOCAL, 1, 1))
+                .withReconnectionPolicy(new ConstantReconnectionPolicy(100))
+                .withThreadingOptions(spy).build();
+        try {
+            String clusterName = cluster.getClusterName();
+            cluster.init();
+
+            // Ensure each method was invoked appropriately:
+            // 1) 1 time for each create*Executor.
+            // 2) createThreadFactory for netty executor group and timeouter.
+            verify(spy).createExecutor(clusterName);
+            verify(spy).createBlockingExecutor(clusterName);
+            verify(spy).createReconnectionExecutor(clusterName);
+            verify(spy).createScheduledTasksExecutor(clusterName);
+            verify(spy).createReaperExecutor(clusterName);
+            verify(spy).createThreadFactory(clusterName, "nio-worker");
+            verify(spy).createThreadFactory(clusterName, "timeouter");
+
+            cluster.connect();
+
+            // Close all connections bringing the host down, this should cause some activity on
+            // executor and reconnection executor.
+            currentClient.disableListener();
+            currentClient.closeConnections(CLOSE);
+            TestUtils.waitForDown(TestUtils.IP_PREFIX + "1", cluster);
+            currentClient.enableListener();
+            TestUtils.waitForUp(TestUtils.IP_PREFIX + "1", cluster);
+
+            Set<Thread> threads = Thread.getAllStackTraces().keySet();
+            for(Thread thread : threads) {
+                // all threads should use the custom factory and thus be marked daemon
+                if(thread.getName().startsWith(clusterName + "-" + customPrefix)) {
+                    // all created threads should be daemon this should indicate that our custom thread factory was
+                    // used.
+                    assertThat(thread.isDaemon()).isTrue();
+                }
+            }
+
+            final Pattern threadNamePattern = Pattern.compile(clusterName + "-" + customPrefix + "-(.*)-0");
+
+            // Custom executor threads should be present.
+            // NOTE: we don't validate blocking executor since it is hard to deterministically cause it to be used.
+            assertThat(threads).extracting(new Extractor<Thread, String>() {
+                @Override
+                public String extract(Thread thread) {
+                    Matcher matcher = threadNamePattern.matcher(thread.getName());
+                    if(matcher.matches()) {
+                        return matcher.group(1);
+                    } else {
+                        return thread.getName();
+                    }
+                }
+            }).contains(
+                    "nio-worker",
+                    "timeouter",
+                    "myExecutor",
+                    "myReconnection",
+                    "myScheduled-task-worker",
+                    "myConnection-reaper"
+            );
+        } finally {
+            cluster.close();
+        }
+    }
+}