JAVA-1055: Add ErrorAware load balancing policy
diff --git a/changelog/README.md b/changelog/README.md
index 04c7003..085aff5 100644
--- a/changelog/README.md
+++ b/changelog/README.md
@@ -13,6 +13,7 @@
 - [new feature] JAVA-541: Add polymorphism support to object mapper.
 - [new feature] JAVA-636: Allow @Column annotations on getters/setters as well as fields.
 - [new feature] JAVA-984: Allow non-void setters in object mapping.
+- [new feature] JAVA-1055: Add ErrorAware load balancing policy.
 
 Merged from 3.0.x branch:
 
diff --git a/driver-core/src/main/java/com/datastax/driver/core/Cluster.java b/driver-core/src/main/java/com/datastax/driver/core/Cluster.java
index c54e56b..7b7d6a6 100644
--- a/driver-core/src/main/java/com/datastax/driver/core/Cluster.java
+++ b/driver-core/src/main/java/com/datastax/driver/core/Cluster.java
@@ -471,7 +471,7 @@
      * Registers the provided tracker to be updated with hosts read
      * latencies.
      * <p/>
-     * Registering the same listener multiple times is a no-op.
+     * Registering the same tracker multiple times is a no-op.
      * <p/>
      * Beware that the registered tracker's
      * {@link LatencyTracker#update(Host, Statement, Exception, long) update}
@@ -490,7 +490,7 @@
      */
     public Cluster register(LatencyTracker tracker) {
         checkNotClosed(manager);
-        boolean added = manager.trackers.add(tracker);
+        boolean added = manager.latencyTrackers.add(tracker);
         if (added)
             tracker.onRegister(this);
         return this;
@@ -508,7 +508,7 @@
      */
     public Cluster unregister(LatencyTracker tracker) {
         checkNotClosed(manager);
-        boolean removed = manager.trackers.remove(tracker);
+        boolean removed = manager.latencyTrackers.remove(tracker);
         if (removed)
             tracker.onUnregister(this);
         return this;
@@ -1341,7 +1341,7 @@
         ConcurrentMap<MD5Digest, PreparedStatement> preparedQueries;
 
         final Set<Host.StateListener> listeners;
-        final Set<LatencyTracker> trackers = new CopyOnWriteArraySet<LatencyTracker>();
+        final Set<LatencyTracker> latencyTrackers = new CopyOnWriteArraySet<LatencyTracker>();
         final Set<SchemaChangeListener> schemaChangeListeners = new CopyOnWriteArraySet<SchemaChangeListener>();
 
         EventDebouncer<NodeListRefreshRequest> nodeListRefreshRequestDebouncer;
@@ -1484,11 +1484,10 @@
                 configuration.getPolicies().getRetryPolicy().init(Cluster.this);
                 reconnectionPolicy().init(Cluster.this);
                 configuration.getPolicies().getAddressTranslator().init(Cluster.this);
-                for (LatencyTracker tracker : trackers)
+                for (LatencyTracker tracker : latencyTrackers)
                     tracker.onRegister(Cluster.this);
                 for (Host.StateListener listener : listeners)
                     listener.onRegister(Cluster.this);
-
                 for (Host host : removedContactPointHosts) {
                     loadBalancingPolicy().onRemove(host);
                     for (Host.StateListener listener : listeners)
@@ -1592,8 +1591,8 @@
             return sessions.remove(session);
         }
 
-        void reportLatency(Host host, Statement statement, Exception exception, long latencyNanos) {
-            for (LatencyTracker tracker : trackers) {
+        void reportQuery(Host host, Statement statement, Exception exception, long latencyNanos) {
+            for (LatencyTracker tracker : latencyTrackers) {
                 tracker.update(host, statement, exception, latencyNanos);
             }
         }
@@ -1634,7 +1633,7 @@
                 configuration.getPolicies().getRetryPolicy().close();
                 reconnectionPolicy().close();
                 configuration.getPolicies().getAddressTranslator().close();
-                for (LatencyTracker tracker : trackers)
+                for (LatencyTracker tracker : latencyTrackers)
                     tracker.onUnregister(Cluster.this);
                 for (Host.StateListener listener : listeners)
                     listener.onUnregister(Cluster.this);
diff --git a/driver-core/src/main/java/com/datastax/driver/core/RequestHandler.java b/driver-core/src/main/java/com/datastax/driver/core/RequestHandler.java
index 23f475e..65d269e 100644
--- a/driver-core/src/main/java/com/datastax/driver/core/RequestHandler.java
+++ b/driver-core/src/main/java/com/datastax/driver/core/RequestHandler.java
@@ -638,8 +638,9 @@
                 exceptionToReport = e;
                 setFinalException(connection, e);
             } finally {
-                if (queriedHost != null && statement != Statement.DEFAULT)
-                    manager.cluster.manager.reportLatency(queriedHost, statement, exceptionToReport, latency);
+                if (queriedHost != null && statement != Statement.DEFAULT) {
+                    manager.cluster.manager.reportQuery(queriedHost, statement, exceptionToReport, latency);
+                }
             }
         }
 
@@ -746,7 +747,7 @@
                 setFinalException(null, new DriverInternalError("An unexpected error happened while handling exception " + exception, e));
             } finally {
                 if (queriedHost != null && statement != Statement.DEFAULT)
-                    manager.cluster.manager.reportLatency(queriedHost, statement, exception, latency);
+                    manager.cluster.manager.reportQuery(queriedHost, statement, exception, latency);
             }
         }
 
@@ -774,7 +775,7 @@
                 setFinalException(null, new DriverInternalError("An unexpected error happened while handling timeout", e));
             } finally {
                 if (queriedHost != null && statement != Statement.DEFAULT)
-                    manager.cluster.manager.reportLatency(queriedHost, statement, timeoutException, latency);
+                    manager.cluster.manager.reportQuery(queriedHost, statement, timeoutException, latency);
             }
             return true;
         }
diff --git a/driver-core/src/main/java/com/datastax/driver/core/policies/ErrorAwarePolicy.java b/driver-core/src/main/java/com/datastax/driver/core/policies/ErrorAwarePolicy.java
new file mode 100644
index 0000000..e718e49
--- /dev/null
+++ b/driver-core/src/main/java/com/datastax/driver/core/policies/ErrorAwarePolicy.java
@@ -0,0 +1,364 @@
+/*
+ *      Copyright (C) 2012-2015 DataStax Inc.
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+package com.datastax.driver.core.policies;
+
+import com.datastax.driver.core.*;
+import com.datastax.driver.core.exceptions.*;
+import com.google.common.annotations.Beta;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.AbstractIterator;
+import com.google.common.collect.ImmutableList;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+
+import static java.util.concurrent.TimeUnit.MINUTES;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+
+/**
+ * Chainable load balancing policy that filters out hosts based on their error rates.
+ * <p/>
+ * When creating a query plan, this policy gathers a list of candidate hosts from its child policy; for each candidate
+ * host, it then determines whether it should be included into or excluded from the final query plan, based on its
+ * current error rate (measured over the last minute, with a 5-second granularity).
+ * <p/>
+ * Note that the policy should not blindly count all errors in its measurements: some type of errors (e.g. CQL syntax
+ * errors) can originate from the client and occur on all hosts, therefore they should not count towards the exclusion
+ * threshold or all hosts could become excluded. You can provide your own {@link ErrorFilter} to customize that logic.
+ * <p/>
+ * The policy follows the builder pattern to be created, the {@link Builder} class can be created with
+ * {@link #builder} method.
+ * <p/>
+ * This policy is currently in BETA mode and its behavior might be changing throughout different driver versions.
+ */
+@Beta
+public class ErrorAwarePolicy implements ChainableLoadBalancingPolicy {
+
+    private static final Logger logger = LoggerFactory.getLogger(ErrorAwarePolicy.class);
+
+    private final LoadBalancingPolicy childPolicy;
+
+    private final long retryPeriodNanos;
+
+    PerHostErrorTracker errorTracker;
+
+    private ErrorAwarePolicy(Builder builder) {
+        this.childPolicy = builder.childPolicy;
+        this.retryPeriodNanos = builder.retryPeriodNanos;
+        this.errorTracker = new PerHostErrorTracker(builder.maxErrorsPerMinute, builder.errorFilter, builder.clock);
+    }
+
+    @Override
+    public LoadBalancingPolicy getChildPolicy() {
+        return childPolicy;
+    }
+
+    @Override
+    public void init(Cluster cluster, Collection<Host> hosts) {
+        childPolicy.init(cluster, hosts);
+        cluster.register(this.errorTracker);
+    }
+
+    @Override
+    public HostDistance distance(Host host) {
+        return childPolicy.distance(host);
+    }
+
+    @Override
+    public Iterator<Host> newQueryPlan(String loggedKeyspace, Statement statement) {
+        final Iterator<Host> childQueryPlan = childPolicy.newQueryPlan(loggedKeyspace, statement);
+
+        return new AbstractIterator<Host>() {
+
+            @Override
+            protected Host computeNext() {
+                while (childQueryPlan.hasNext()) {
+                    Host host = childQueryPlan.next();
+                    if (!errorTracker.isExcluded(host)) {
+                        return host;
+                    }
+                }
+                return endOfData();
+            }
+        };
+    }
+
+    @Override
+    public void onAdd(Host host) {
+        childPolicy.onAdd(host);
+    }
+
+    @Override
+    public void onUp(Host host) {
+        childPolicy.onUp(host);
+    }
+
+    @Override
+    public void onDown(Host host) {
+        childPolicy.onDown(host);
+    }
+
+    @Override
+    public void onRemove(Host host) {
+        childPolicy.onRemove(host);
+    }
+
+    /**
+     * Creates a new error aware policy builder given the child policy
+     * that the resulting policy should wrap.
+     *
+     * @param childPolicy the load balancing policy to wrap with error
+     *                    awareness.
+     * @return the created builder.
+     */
+    public static Builder builder(LoadBalancingPolicy childPolicy) {
+        return new Builder(childPolicy);
+    }
+
+    @Override
+    public void close() {
+        childPolicy.close();
+    }
+
+    /**
+     * Utility class to create a {@link ErrorAwarePolicy}.
+     */
+    public static class Builder {
+        final LoadBalancingPolicy childPolicy;
+
+        private int maxErrorsPerMinute = 1;
+        private long retryPeriodNanos = NANOSECONDS.convert(2, MINUTES);
+        private Clock clock = Clock.DEFAULT;
+
+        private ErrorFilter errorFilter = new DefaultErrorFilter();
+
+        /**
+         * Creates a {@link Builder} instance.
+         *
+         * @param childPolicy the load balancing policy to wrap with error
+         *                    awareness.
+         */
+        public Builder(LoadBalancingPolicy childPolicy) {
+            this.childPolicy = childPolicy;
+        }
+
+        /**
+         * Defines the maximum number of errors allowed per minute for each host.
+         * <p/>
+         * The policy keeps track of the number of errors on each host (filtered by
+         * {@link #withErrorsFilter(ErrorFilter)}) over a sliding 1-minute window. If a host had more than this number
+         * of errors, it will be excluded from the query plan for the duration defined by
+         * {@link #withRetryPeriod(long, TimeUnit)}.
+         * <p/>
+         * Default value for the threshold is 1.
+         *
+         * @param maxErrorsPerMinute the number.
+         * @return this {@link Builder} instance, for method chaining.
+         */
+        public Builder withMaxErrorsPerMinute(int maxErrorsPerMinute) {
+            this.maxErrorsPerMinute = maxErrorsPerMinute;
+            return this;
+        }
+
+        /**
+         * Defines the time during which a host is excluded by the policy once it has exceeded
+         * {@link #withMaxErrorsPerMinute(int)}.
+         * <p/>
+         * Default value for the retry period is 2 minutes.
+         *
+         * @param retryPeriod         the period of exclusion for a host.
+         * @param retryPeriodTimeUnit the time unit for the retry period.
+         * @return this {@link Builder} instance, for method chaining.
+         */
+        public Builder withRetryPeriod(long retryPeriod, TimeUnit retryPeriodTimeUnit) {
+            this.retryPeriodNanos = retryPeriodTimeUnit.toNanos(retryPeriod);
+            return this;
+        }
+
+        /**
+         * Provides a filter that will decide which errors are counted towards {@link #withMaxErrorsPerMinute(int)}.
+         * <p/>
+         * The default implementation will exclude from the error counting, the following exception types:
+         * <ul>
+         * <li>{@link QueryConsistencyException} and {@link UnavailableException}: the assumption is that these errors
+         * are most often caused by other replicas being unavailable, not by something wrong on the coordinator;</li>
+         * <li>{@link InvalidQueryException}, {@link AlreadyExistsException}, {@link SyntaxError}: these are likely
+         * caused by a bad query in client code, that will fail on all hosts. Excluding hosts could lead to complete
+         * loss of connectivity, rather the solution is to fix the query;</li>
+         * <li>{@link FunctionExecutionException}: similarly, this is caused by a bad function definition and likely to
+         * fail on all hosts.</li>
+         * </ul>
+         *
+         * @param errorFilter the filter class that the policy will use.
+         * @return this {@link Builder} instance, for method chaining.
+         */
+        public Builder withErrorsFilter(ErrorFilter errorFilter) {
+            this.errorFilter = errorFilter;
+            return this;
+        }
+
+        @VisibleForTesting
+        Builder withClock(Clock clock) {
+            this.clock = clock;
+            return this;
+        }
+
+        /**
+         * Creates the {@link ErrorAwarePolicy} instance.
+         *
+         * @return the newly created {@link ErrorAwarePolicy}.
+         */
+        public ErrorAwarePolicy build() {
+            return new ErrorAwarePolicy(this);
+        }
+    }
+
+    class PerHostErrorTracker implements LatencyTracker {
+
+        private final int maxErrorsPerMinute;
+        private final ErrorFilter errorFilter;
+        private final Clock clock;
+        private final ConcurrentMap<Host, RollingCount> hostsCounts = new ConcurrentHashMap<Host, RollingCount>();
+        private final ConcurrentMap<Host, Long> exclusionTimes = new ConcurrentHashMap<Host, Long>();
+
+        PerHostErrorTracker(int maxErrorsPerMinute, ErrorFilter errorFilter, Clock clock) {
+            this.maxErrorsPerMinute = maxErrorsPerMinute;
+            this.errorFilter = errorFilter;
+            this.clock = clock;
+        }
+
+        @Override
+        public void update(Host host, Statement statement, Exception exception, long newLatencyNanos) {
+            if (exception == null) {
+                return;
+            }
+            if (!errorFilter.shouldConsiderError(exception, host, statement)) {
+                return;
+            }
+            RollingCount hostCount = getOrCreateCount(host);
+            hostCount.increment();
+        }
+
+        boolean isExcluded(Host host) {
+            Long excludedTime = exclusionTimes.get(host);
+            boolean expired = excludedTime != null && clock.nanoTime() - excludedTime >= retryPeriodNanos;
+            if (excludedTime == null || expired) {
+                if (maybeExcludeNow(host, excludedTime)) {
+                    return true;
+                }
+                if (expired) {
+                    // Cleanup, but make sure we don't overwrite if another thread just set it
+                    exclusionTimes.remove(host, excludedTime);
+                }
+                return false;
+            } else { // host is already excluded
+                return true;
+            }
+        }
+
+        // Exclude if we're over the threshold
+        private boolean maybeExcludeNow(Host host, Long previousTime) {
+            RollingCount rollingCount = getOrCreateCount(host);
+            long count = rollingCount.get();
+            if (count > maxErrorsPerMinute) {
+                excludeNow(host, count, previousTime);
+                return true;
+            } else {
+                return false;
+            }
+        }
+
+        // Set the exclusion time to now, handling potential races
+        private void excludeNow(Host host, long count, Long previousTime) {
+            long now = clock.nanoTime();
+            boolean didNotRace = (previousTime == null)
+                    ? exclusionTimes.putIfAbsent(host, now) == null
+                    : exclusionTimes.replace(host, previousTime, now);
+
+            if (didNotRace && logger.isDebugEnabled()) {
+                logger.debug(String.format("Host %s encountered %d errors in the last minute, which is more " +
+                                "than the maximum allowed (%d). It will be excluded from query plans for the " +
+                                "next %d nanoseconds.",
+                        host, count, maxErrorsPerMinute, retryPeriodNanos));
+            }
+        }
+
+        private RollingCount getOrCreateCount(Host host) {
+            RollingCount hostCount = hostsCounts.get(host);
+            if (hostCount == null) {
+                RollingCount tmp = new RollingCount(clock);
+                hostCount = hostsCounts.putIfAbsent(host, tmp);
+                if (hostCount == null)
+                    hostCount = tmp;
+            }
+            return hostCount;
+        }
+
+        @Override
+        public void onRegister(Cluster cluster) {
+            // nothing to do.
+        }
+
+        @Override
+        public void onUnregister(Cluster cluster) {
+            // nothing to do.
+        }
+    }
+
+    static class DefaultErrorFilter implements ErrorFilter {
+        private static final List<Class<? extends Exception>> IGNORED_EXCEPTIONS =
+                ImmutableList.<Class<? extends Exception>>builder()
+                        .add(FunctionExecutionException.class)
+                        .add(QueryConsistencyException.class)
+                        .add(UnavailableException.class)
+                        .add(AlreadyExistsException.class)
+                        .add(InvalidQueryException.class)
+                        .add(SyntaxError.class)
+                        .build();
+
+        @Override
+        public boolean shouldConsiderError(Exception e, Host host, Statement statement) {
+            for (Class<? extends Exception> ignoredException : IGNORED_EXCEPTIONS) {
+                if (ignoredException.isInstance(e))
+                    return false;
+            }
+            return true;
+        }
+    }
+
+    /**
+     * A filter for the errors considered by {@link ErrorAwarePolicy}.
+     * <p/>
+     * Only errors that indicate something wrong with a host should lead to its exclusion from query plans.
+     */
+    public interface ErrorFilter {
+        /**
+         * Whether an error should be counted in the host's error rate.
+         *
+         * @param e         the exception.
+         * @param host      the host.
+         * @param statement the statement that caused the exception.
+         * @return {@code true} if the exception should be counted.
+         */
+        boolean shouldConsiderError(Exception e, Host host, Statement statement);
+    }
+}
diff --git a/driver-core/src/test/java/com/datastax/driver/core/policies/PerHostErrorTrackerTest.java b/driver-core/src/test/java/com/datastax/driver/core/policies/PerHostErrorTrackerTest.java
new file mode 100644
index 0000000..5bccefc
--- /dev/null
+++ b/driver-core/src/test/java/com/datastax/driver/core/policies/PerHostErrorTrackerTest.java
@@ -0,0 +1,256 @@
+/*
+ *      Copyright (C) 2012-2015 DataStax Inc.
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+package com.datastax.driver.core.policies;
+
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Host;
+import com.datastax.driver.core.HostDistance;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.ScassandraCluster;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.Statement;
+import com.datastax.driver.core.exceptions.UnauthorizedException;
+import org.scassandra.Scassandra;
+import org.testng.annotations.Test;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.fail;
+import static org.scassandra.http.client.PrimingRequest.queryBuilder;
+import static org.scassandra.http.client.Result.success;
+import static org.scassandra.http.client.Result.unauthorized;
+
+public class PerHostErrorTrackerTest {
+
+    @Test
+    public void meterTest() throws InterruptedException {
+        ScassandraCluster sCluster = ScassandraCluster.builder()
+                .withNodes(3)
+                .build();
+
+        String query = "SELECT foo FROM bar";
+        sCluster.init();
+
+        for (Scassandra scassandra : sCluster.nodes()) {
+            scassandra.primingClient().prime(
+                    queryBuilder()
+                            .withQuery(query)
+                            .withResult(success)
+                            .build()
+            );
+        }
+
+        sCluster.node(1).primingClient().prime(queryBuilder()
+                        .withQuery(query)
+                        .withResult(unauthorized)
+                        .build()
+        );
+
+        LoadBalancingPolicy lbp =
+                ErrorAwarePolicy.builder(new AlwaysSameHostsPolicy())
+                        .withInclusionThreshold(0.0000001)
+                        .withRetryPeriod(100000)
+                        .build();
+
+        Cluster cluster = Cluster.builder()
+                .addContactPoints(sCluster.address(1).getAddress(), sCluster.address(2).getAddress(), sCluster.address(3).getAddress())
+                .withPort(sCluster.getBinaryPort())
+                .withLoadBalancingPolicy(lbp)
+                .withRetryPolicy(FallthroughRetryPolicy.INSTANCE)
+                .build();
+
+        Session session = cluster.connect();
+
+        try {
+            // will send to node1, which will respond with an error.
+            session.execute(query);
+        } catch (UnauthorizedException e) {
+            assertThat(e.getAddress()).isEqualTo(sCluster.address(1));
+        }
+
+        // For some reason, it takes a certain time for the Meter to be updated when it's at 0...
+        // So the previous is accounted, but it only appears to the tracker a little bit later.
+        Thread.sleep(7000);
+
+        try {
+            // will send to node1, which will respond with an error.
+            session.execute(query);
+            fail("Should have failed after sending query to node1");
+        } catch (UnauthorizedException e) {
+            assertThat(e.getAddress()).isEqualTo(sCluster.address(1));
+        }
+
+        Thread.sleep(1000);
+
+        ResultSet rs = session.execute(query);
+        assertThat(rs.getExecutionInfo().getQueriedHost().getSocketAddress()).isEqualTo(sCluster.address(2));
+    }
+
+    @Test
+    public void meterTest2() throws InterruptedException {
+        ScassandraCluster sCluster = ScassandraCluster.builder()
+                .withNodes(3)
+                .build();
+
+        String query = "SELECT foo FROM bar";
+        sCluster.init();
+
+        for (Scassandra scassandra : sCluster.nodes()) {
+            scassandra.primingClient().prime(
+                    queryBuilder()
+                            .withQuery(query)
+                            .withResult(success)
+                            .build()
+            );
+        }
+
+        sCluster.node(1).primingClient().prime(queryBuilder()
+                        .withQuery(query)
+                        .withResult(unauthorized)
+                        .build()
+        );
+
+        LoadBalancingPolicy lbp =
+                ErrorAwarePolicy.builder(new AlwaysSameHostsPolicy())
+                        .withInclusionThreshold(0.2)
+                        .withRetryPeriod(1000)
+                        .build();
+
+        Cluster cluster = Cluster.builder()
+                .addContactPoints(sCluster.address(1).getAddress(), sCluster.address(2).getAddress(), sCluster.address(3).getAddress())
+                .withPort(sCluster.getBinaryPort())
+                .withLoadBalancingPolicy(lbp)
+                .withRetryPolicy(FallthroughRetryPolicy.INSTANCE)
+                .build();
+
+        Session session = cluster.connect();
+
+        try {
+            // will send to node1, which will respond with an error.
+            session.execute(query);
+            fail("Should have failed after sending query to node1");
+        } catch (UnauthorizedException e) {
+            assertThat(e.getAddress()).isEqualTo(sCluster.address(1));
+        }
+
+        Thread.sleep(7000);
+
+        try {
+            // will send to node1, which will respond with an error.
+            session.execute(query);
+            fail("Should have failed after sending query to node1");
+        } catch (UnauthorizedException e) {
+            assertThat(e.getAddress()).isEqualTo(sCluster.address(1));
+        }
+        // now node1 excluded
+
+        // wait until it is reconsidered
+        Thread.sleep(2000);
+
+        try {
+            // will send to node1, which will respond with an error.
+            session.execute(query);
+            fail("Should have failed after sending query to node1");
+        } catch (UnauthorizedException e) {
+            assertThat(e.getAddress()).isEqualTo(sCluster.address(1));
+        }
+
+    }
+
+
+
+    private class AlwaysSameHostsPolicy implements LoadBalancingPolicy {
+
+        Collection<Host> hosts;
+
+        @Override
+        public void init(Cluster cluster, Collection<Host> hosts) {
+            // hosts may have been shuffled so need to sort in ascending order
+            List<Host> copyHosts = new ArrayList<Host>(hosts);
+            Collections.sort(copyHosts, new Comparator<Host>() {
+                @Override
+                public int compare(Host o1, Host o2) {
+                    byte[] ba1 = o1.getAddress().getAddress();
+                    byte[] ba2 = o2.getAddress().getAddress();
+
+                    // general ordering: ipv4 before ipv6
+                    if (ba1.length < ba2.length) return -1;
+                    if (ba1.length > ba2.length) return 1;
+
+                    // we have 2 ips of the same type, so we have to compare each byte
+                    for (int i = 0; i < ba1.length; i++) {
+                        int b1 = unsignedByteToInt(ba1[i]);
+                        int b2 = unsignedByteToInt(ba2[i]);
+                        if (b1 == b2)
+                            continue;
+                        if (b1 < b2)
+                            return -1;
+                        else
+                            return 1;
+                    }
+                    return 0;
+                }
+
+                private int unsignedByteToInt(byte b) {
+                    return (int) b & 0xFF;
+                }
+            });
+            this.hosts = copyHosts;
+        }
+
+        @Override
+        public HostDistance distance(Host host) {
+            return HostDistance.LOCAL;
+        }
+
+        @Override
+        public Iterator<Host> newQueryPlan(String loggedKeyspace, Statement statement) {
+            return hosts.iterator();
+        }
+
+        @Override
+        public void onAdd(Host host) {
+
+        }
+
+        @Override
+        public void onUp(Host host) {
+
+        }
+
+        @Override
+        public void onDown(Host host) {
+
+        }
+
+        @Override
+        public void onRemove(Host host) {
+
+        }
+
+        @Override
+        public void close() {
+
+        }
+    }
+}