JAVA-1055: Add ErrorAware load balancing policy
diff --git a/changelog/README.md b/changelog/README.md
index 04c7003..085aff5 100644
--- a/changelog/README.md
+++ b/changelog/README.md
@@ -13,6 +13,7 @@
- [new feature] JAVA-541: Add polymorphism support to object mapper.
- [new feature] JAVA-636: Allow @Column annotations on getters/setters as well as fields.
- [new feature] JAVA-984: Allow non-void setters in object mapping.
+- [new feature] JAVA-1055: Add ErrorAware load balancing policy.
Merged from 3.0.x branch:
diff --git a/driver-core/src/main/java/com/datastax/driver/core/Cluster.java b/driver-core/src/main/java/com/datastax/driver/core/Cluster.java
index c54e56b..7b7d6a6 100644
--- a/driver-core/src/main/java/com/datastax/driver/core/Cluster.java
+++ b/driver-core/src/main/java/com/datastax/driver/core/Cluster.java
@@ -471,7 +471,7 @@
* Registers the provided tracker to be updated with hosts read
* latencies.
* <p/>
- * Registering the same listener multiple times is a no-op.
+ * Registering the same tracker multiple times is a no-op.
* <p/>
* Beware that the registered tracker's
* {@link LatencyTracker#update(Host, Statement, Exception, long) update}
@@ -490,7 +490,7 @@
*/
public Cluster register(LatencyTracker tracker) {
checkNotClosed(manager);
- boolean added = manager.trackers.add(tracker);
+ boolean added = manager.latencyTrackers.add(tracker);
if (added)
tracker.onRegister(this);
return this;
@@ -508,7 +508,7 @@
*/
public Cluster unregister(LatencyTracker tracker) {
checkNotClosed(manager);
- boolean removed = manager.trackers.remove(tracker);
+ boolean removed = manager.latencyTrackers.remove(tracker);
if (removed)
tracker.onUnregister(this);
return this;
@@ -1341,7 +1341,7 @@
ConcurrentMap<MD5Digest, PreparedStatement> preparedQueries;
final Set<Host.StateListener> listeners;
- final Set<LatencyTracker> trackers = new CopyOnWriteArraySet<LatencyTracker>();
+ final Set<LatencyTracker> latencyTrackers = new CopyOnWriteArraySet<LatencyTracker>();
final Set<SchemaChangeListener> schemaChangeListeners = new CopyOnWriteArraySet<SchemaChangeListener>();
EventDebouncer<NodeListRefreshRequest> nodeListRefreshRequestDebouncer;
@@ -1484,11 +1484,10 @@
configuration.getPolicies().getRetryPolicy().init(Cluster.this);
reconnectionPolicy().init(Cluster.this);
configuration.getPolicies().getAddressTranslator().init(Cluster.this);
- for (LatencyTracker tracker : trackers)
+ for (LatencyTracker tracker : latencyTrackers)
tracker.onRegister(Cluster.this);
for (Host.StateListener listener : listeners)
listener.onRegister(Cluster.this);
-
for (Host host : removedContactPointHosts) {
loadBalancingPolicy().onRemove(host);
for (Host.StateListener listener : listeners)
@@ -1592,8 +1591,8 @@
return sessions.remove(session);
}
- void reportLatency(Host host, Statement statement, Exception exception, long latencyNanos) {
- for (LatencyTracker tracker : trackers) {
+ void reportQuery(Host host, Statement statement, Exception exception, long latencyNanos) {
+ for (LatencyTracker tracker : latencyTrackers) {
tracker.update(host, statement, exception, latencyNanos);
}
}
@@ -1634,7 +1633,7 @@
configuration.getPolicies().getRetryPolicy().close();
reconnectionPolicy().close();
configuration.getPolicies().getAddressTranslator().close();
- for (LatencyTracker tracker : trackers)
+ for (LatencyTracker tracker : latencyTrackers)
tracker.onUnregister(Cluster.this);
for (Host.StateListener listener : listeners)
listener.onUnregister(Cluster.this);
diff --git a/driver-core/src/main/java/com/datastax/driver/core/RequestHandler.java b/driver-core/src/main/java/com/datastax/driver/core/RequestHandler.java
index 23f475e..65d269e 100644
--- a/driver-core/src/main/java/com/datastax/driver/core/RequestHandler.java
+++ b/driver-core/src/main/java/com/datastax/driver/core/RequestHandler.java
@@ -638,8 +638,9 @@
exceptionToReport = e;
setFinalException(connection, e);
} finally {
- if (queriedHost != null && statement != Statement.DEFAULT)
- manager.cluster.manager.reportLatency(queriedHost, statement, exceptionToReport, latency);
+ if (queriedHost != null && statement != Statement.DEFAULT) {
+ manager.cluster.manager.reportQuery(queriedHost, statement, exceptionToReport, latency);
+ }
}
}
@@ -746,7 +747,7 @@
setFinalException(null, new DriverInternalError("An unexpected error happened while handling exception " + exception, e));
} finally {
if (queriedHost != null && statement != Statement.DEFAULT)
- manager.cluster.manager.reportLatency(queriedHost, statement, exception, latency);
+ manager.cluster.manager.reportQuery(queriedHost, statement, exception, latency);
}
}
@@ -774,7 +775,7 @@
setFinalException(null, new DriverInternalError("An unexpected error happened while handling timeout", e));
} finally {
if (queriedHost != null && statement != Statement.DEFAULT)
- manager.cluster.manager.reportLatency(queriedHost, statement, timeoutException, latency);
+ manager.cluster.manager.reportQuery(queriedHost, statement, timeoutException, latency);
}
return true;
}
diff --git a/driver-core/src/main/java/com/datastax/driver/core/policies/ErrorAwarePolicy.java b/driver-core/src/main/java/com/datastax/driver/core/policies/ErrorAwarePolicy.java
new file mode 100644
index 0000000..e718e49
--- /dev/null
+++ b/driver-core/src/main/java/com/datastax/driver/core/policies/ErrorAwarePolicy.java
@@ -0,0 +1,364 @@
+/*
+ * Copyright (C) 2012-2015 DataStax Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datastax.driver.core.policies;
+
+import com.datastax.driver.core.*;
+import com.datastax.driver.core.exceptions.*;
+import com.google.common.annotations.Beta;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.AbstractIterator;
+import com.google.common.collect.ImmutableList;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+
+import static java.util.concurrent.TimeUnit.MINUTES;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+
+/**
+ * Chainable load balancing policy that filters out hosts based on their error rates.
+ * <p/>
+ * When creating a query plan, this policy gathers a list of candidate hosts from its child policy; for each candidate
+ * host, it then determines whether it should be included into or excluded from the final query plan, based on its
+ * current error rate (measured over the last minute, with a 5-second granularity).
+ * <p/>
+ * Note that the policy should not blindly count all errors in its measurements: some type of errors (e.g. CQL syntax
+ * errors) can originate from the client and occur on all hosts, therefore they should not count towards the exclusion
+ * threshold or all hosts could become excluded. You can provide your own {@link ErrorFilter} to customize that logic.
+ * <p/>
+ * The policy follows the builder pattern to be created, the {@link Builder} class can be created with
+ * {@link #builder} method.
+ * <p/>
+ * This policy is currently in BETA mode and its behavior might be changing throughout different driver versions.
+ */
+@Beta
+public class ErrorAwarePolicy implements ChainableLoadBalancingPolicy {
+
+ private static final Logger logger = LoggerFactory.getLogger(ErrorAwarePolicy.class);
+
+ private final LoadBalancingPolicy childPolicy;
+
+ private final long retryPeriodNanos;
+
+ PerHostErrorTracker errorTracker;
+
+ private ErrorAwarePolicy(Builder builder) {
+ this.childPolicy = builder.childPolicy;
+ this.retryPeriodNanos = builder.retryPeriodNanos;
+ this.errorTracker = new PerHostErrorTracker(builder.maxErrorsPerMinute, builder.errorFilter, builder.clock);
+ }
+
+ @Override
+ public LoadBalancingPolicy getChildPolicy() {
+ return childPolicy;
+ }
+
+ @Override
+ public void init(Cluster cluster, Collection<Host> hosts) {
+ childPolicy.init(cluster, hosts);
+ cluster.register(this.errorTracker);
+ }
+
+ @Override
+ public HostDistance distance(Host host) {
+ return childPolicy.distance(host);
+ }
+
+ @Override
+ public Iterator<Host> newQueryPlan(String loggedKeyspace, Statement statement) {
+ final Iterator<Host> childQueryPlan = childPolicy.newQueryPlan(loggedKeyspace, statement);
+
+ return new AbstractIterator<Host>() {
+
+ @Override
+ protected Host computeNext() {
+ while (childQueryPlan.hasNext()) {
+ Host host = childQueryPlan.next();
+ if (!errorTracker.isExcluded(host)) {
+ return host;
+ }
+ }
+ return endOfData();
+ }
+ };
+ }
+
+ @Override
+ public void onAdd(Host host) {
+ childPolicy.onAdd(host);
+ }
+
+ @Override
+ public void onUp(Host host) {
+ childPolicy.onUp(host);
+ }
+
+ @Override
+ public void onDown(Host host) {
+ childPolicy.onDown(host);
+ }
+
+ @Override
+ public void onRemove(Host host) {
+ childPolicy.onRemove(host);
+ }
+
+ /**
+ * Creates a new error aware policy builder given the child policy
+ * that the resulting policy should wrap.
+ *
+ * @param childPolicy the load balancing policy to wrap with error
+ * awareness.
+ * @return the created builder.
+ */
+ public static Builder builder(LoadBalancingPolicy childPolicy) {
+ return new Builder(childPolicy);
+ }
+
+ @Override
+ public void close() {
+ childPolicy.close();
+ }
+
+ /**
+ * Utility class to create a {@link ErrorAwarePolicy}.
+ */
+ public static class Builder {
+ final LoadBalancingPolicy childPolicy;
+
+ private int maxErrorsPerMinute = 1;
+ private long retryPeriodNanos = NANOSECONDS.convert(2, MINUTES);
+ private Clock clock = Clock.DEFAULT;
+
+ private ErrorFilter errorFilter = new DefaultErrorFilter();
+
+ /**
+ * Creates a {@link Builder} instance.
+ *
+ * @param childPolicy the load balancing policy to wrap with error
+ * awareness.
+ */
+ public Builder(LoadBalancingPolicy childPolicy) {
+ this.childPolicy = childPolicy;
+ }
+
+ /**
+ * Defines the maximum number of errors allowed per minute for each host.
+ * <p/>
+ * The policy keeps track of the number of errors on each host (filtered by
+ * {@link #withErrorsFilter(ErrorFilter)}) over a sliding 1-minute window. If a host had more than this number
+ * of errors, it will be excluded from the query plan for the duration defined by
+ * {@link #withRetryPeriod(long, TimeUnit)}.
+ * <p/>
+ * Default value for the threshold is 1.
+ *
+ * @param maxErrorsPerMinute the number.
+ * @return this {@link Builder} instance, for method chaining.
+ */
+ public Builder withMaxErrorsPerMinute(int maxErrorsPerMinute) {
+ this.maxErrorsPerMinute = maxErrorsPerMinute;
+ return this;
+ }
+
+ /**
+ * Defines the time during which a host is excluded by the policy once it has exceeded
+ * {@link #withMaxErrorsPerMinute(int)}.
+ * <p/>
+ * Default value for the retry period is 2 minutes.
+ *
+ * @param retryPeriod the period of exclusion for a host.
+ * @param retryPeriodTimeUnit the time unit for the retry period.
+ * @return this {@link Builder} instance, for method chaining.
+ */
+ public Builder withRetryPeriod(long retryPeriod, TimeUnit retryPeriodTimeUnit) {
+ this.retryPeriodNanos = retryPeriodTimeUnit.toNanos(retryPeriod);
+ return this;
+ }
+
+ /**
+ * Provides a filter that will decide which errors are counted towards {@link #withMaxErrorsPerMinute(int)}.
+ * <p/>
+ * The default implementation will exclude from the error counting, the following exception types:
+ * <ul>
+ * <li>{@link QueryConsistencyException} and {@link UnavailableException}: the assumption is that these errors
+ * are most often caused by other replicas being unavailable, not by something wrong on the coordinator;</li>
+ * <li>{@link InvalidQueryException}, {@link AlreadyExistsException}, {@link SyntaxError}: these are likely
+ * caused by a bad query in client code, that will fail on all hosts. Excluding hosts could lead to complete
+ * loss of connectivity, rather the solution is to fix the query;</li>
+ * <li>{@link FunctionExecutionException}: similarly, this is caused by a bad function definition and likely to
+ * fail on all hosts.</li>
+ * </ul>
+ *
+ * @param errorFilter the filter class that the policy will use.
+ * @return this {@link Builder} instance, for method chaining.
+ */
+ public Builder withErrorsFilter(ErrorFilter errorFilter) {
+ this.errorFilter = errorFilter;
+ return this;
+ }
+
+ @VisibleForTesting
+ Builder withClock(Clock clock) {
+ this.clock = clock;
+ return this;
+ }
+
+ /**
+ * Creates the {@link ErrorAwarePolicy} instance.
+ *
+ * @return the newly created {@link ErrorAwarePolicy}.
+ */
+ public ErrorAwarePolicy build() {
+ return new ErrorAwarePolicy(this);
+ }
+ }
+
+ class PerHostErrorTracker implements LatencyTracker {
+
+ private final int maxErrorsPerMinute;
+ private final ErrorFilter errorFilter;
+ private final Clock clock;
+ private final ConcurrentMap<Host, RollingCount> hostsCounts = new ConcurrentHashMap<Host, RollingCount>();
+ private final ConcurrentMap<Host, Long> exclusionTimes = new ConcurrentHashMap<Host, Long>();
+
+ PerHostErrorTracker(int maxErrorsPerMinute, ErrorFilter errorFilter, Clock clock) {
+ this.maxErrorsPerMinute = maxErrorsPerMinute;
+ this.errorFilter = errorFilter;
+ this.clock = clock;
+ }
+
+ @Override
+ public void update(Host host, Statement statement, Exception exception, long newLatencyNanos) {
+ if (exception == null) {
+ return;
+ }
+ if (!errorFilter.shouldConsiderError(exception, host, statement)) {
+ return;
+ }
+ RollingCount hostCount = getOrCreateCount(host);
+ hostCount.increment();
+ }
+
+ boolean isExcluded(Host host) {
+ Long excludedTime = exclusionTimes.get(host);
+ boolean expired = excludedTime != null && clock.nanoTime() - excludedTime >= retryPeriodNanos;
+ if (excludedTime == null || expired) {
+ if (maybeExcludeNow(host, excludedTime)) {
+ return true;
+ }
+ if (expired) {
+ // Cleanup, but make sure we don't overwrite if another thread just set it
+ exclusionTimes.remove(host, excludedTime);
+ }
+ return false;
+ } else { // host is already excluded
+ return true;
+ }
+ }
+
+ // Exclude if we're over the threshold
+ private boolean maybeExcludeNow(Host host, Long previousTime) {
+ RollingCount rollingCount = getOrCreateCount(host);
+ long count = rollingCount.get();
+ if (count > maxErrorsPerMinute) {
+ excludeNow(host, count, previousTime);
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ // Set the exclusion time to now, handling potential races
+ private void excludeNow(Host host, long count, Long previousTime) {
+ long now = clock.nanoTime();
+ boolean didNotRace = (previousTime == null)
+ ? exclusionTimes.putIfAbsent(host, now) == null
+ : exclusionTimes.replace(host, previousTime, now);
+
+ if (didNotRace && logger.isDebugEnabled()) {
+ logger.debug(String.format("Host %s encountered %d errors in the last minute, which is more " +
+ "than the maximum allowed (%d). It will be excluded from query plans for the " +
+ "next %d nanoseconds.",
+ host, count, maxErrorsPerMinute, retryPeriodNanos));
+ }
+ }
+
+ private RollingCount getOrCreateCount(Host host) {
+ RollingCount hostCount = hostsCounts.get(host);
+ if (hostCount == null) {
+ RollingCount tmp = new RollingCount(clock);
+ hostCount = hostsCounts.putIfAbsent(host, tmp);
+ if (hostCount == null)
+ hostCount = tmp;
+ }
+ return hostCount;
+ }
+
+ @Override
+ public void onRegister(Cluster cluster) {
+ // nothing to do.
+ }
+
+ @Override
+ public void onUnregister(Cluster cluster) {
+ // nothing to do.
+ }
+ }
+
+ static class DefaultErrorFilter implements ErrorFilter {
+ private static final List<Class<? extends Exception>> IGNORED_EXCEPTIONS =
+ ImmutableList.<Class<? extends Exception>>builder()
+ .add(FunctionExecutionException.class)
+ .add(QueryConsistencyException.class)
+ .add(UnavailableException.class)
+ .add(AlreadyExistsException.class)
+ .add(InvalidQueryException.class)
+ .add(SyntaxError.class)
+ .build();
+
+ @Override
+ public boolean shouldConsiderError(Exception e, Host host, Statement statement) {
+ for (Class<? extends Exception> ignoredException : IGNORED_EXCEPTIONS) {
+ if (ignoredException.isInstance(e))
+ return false;
+ }
+ return true;
+ }
+ }
+
+ /**
+ * A filter for the errors considered by {@link ErrorAwarePolicy}.
+ * <p/>
+ * Only errors that indicate something wrong with a host should lead to its exclusion from query plans.
+ */
+ public interface ErrorFilter {
+ /**
+ * Whether an error should be counted in the host's error rate.
+ *
+ * @param e the exception.
+ * @param host the host.
+ * @param statement the statement that caused the exception.
+ * @return {@code true} if the exception should be counted.
+ */
+ boolean shouldConsiderError(Exception e, Host host, Statement statement);
+ }
+}
diff --git a/driver-core/src/test/java/com/datastax/driver/core/policies/PerHostErrorTrackerTest.java b/driver-core/src/test/java/com/datastax/driver/core/policies/PerHostErrorTrackerTest.java
new file mode 100644
index 0000000..5bccefc
--- /dev/null
+++ b/driver-core/src/test/java/com/datastax/driver/core/policies/PerHostErrorTrackerTest.java
@@ -0,0 +1,256 @@
+/*
+ * Copyright (C) 2012-2015 DataStax Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datastax.driver.core.policies;
+
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Host;
+import com.datastax.driver.core.HostDistance;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.ScassandraCluster;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.Statement;
+import com.datastax.driver.core.exceptions.UnauthorizedException;
+import org.scassandra.Scassandra;
+import org.testng.annotations.Test;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.fail;
+import static org.scassandra.http.client.PrimingRequest.queryBuilder;
+import static org.scassandra.http.client.Result.success;
+import static org.scassandra.http.client.Result.unauthorized;
+
+public class PerHostErrorTrackerTest {
+
+ @Test
+ public void meterTest() throws InterruptedException {
+ ScassandraCluster sCluster = ScassandraCluster.builder()
+ .withNodes(3)
+ .build();
+
+ String query = "SELECT foo FROM bar";
+ sCluster.init();
+
+ for (Scassandra scassandra : sCluster.nodes()) {
+ scassandra.primingClient().prime(
+ queryBuilder()
+ .withQuery(query)
+ .withResult(success)
+ .build()
+ );
+ }
+
+ sCluster.node(1).primingClient().prime(queryBuilder()
+ .withQuery(query)
+ .withResult(unauthorized)
+ .build()
+ );
+
+ LoadBalancingPolicy lbp =
+ ErrorAwarePolicy.builder(new AlwaysSameHostsPolicy())
+ .withInclusionThreshold(0.0000001)
+ .withRetryPeriod(100000)
+ .build();
+
+ Cluster cluster = Cluster.builder()
+ .addContactPoints(sCluster.address(1).getAddress(), sCluster.address(2).getAddress(), sCluster.address(3).getAddress())
+ .withPort(sCluster.getBinaryPort())
+ .withLoadBalancingPolicy(lbp)
+ .withRetryPolicy(FallthroughRetryPolicy.INSTANCE)
+ .build();
+
+ Session session = cluster.connect();
+
+ try {
+ // will send to node1, which will respond with an error.
+ session.execute(query);
+ } catch (UnauthorizedException e) {
+ assertThat(e.getAddress()).isEqualTo(sCluster.address(1));
+ }
+
+ // For some reason, it takes a certain time for the Meter to be updated when it's at 0...
+ // So the previous is accounted, but it only appears to the tracker a little bit later.
+ Thread.sleep(7000);
+
+ try {
+ // will send to node1, which will respond with an error.
+ session.execute(query);
+ fail("Should have failed after sending query to node1");
+ } catch (UnauthorizedException e) {
+ assertThat(e.getAddress()).isEqualTo(sCluster.address(1));
+ }
+
+ Thread.sleep(1000);
+
+ ResultSet rs = session.execute(query);
+ assertThat(rs.getExecutionInfo().getQueriedHost().getSocketAddress()).isEqualTo(sCluster.address(2));
+ }
+
+ @Test
+ public void meterTest2() throws InterruptedException {
+ ScassandraCluster sCluster = ScassandraCluster.builder()
+ .withNodes(3)
+ .build();
+
+ String query = "SELECT foo FROM bar";
+ sCluster.init();
+
+ for (Scassandra scassandra : sCluster.nodes()) {
+ scassandra.primingClient().prime(
+ queryBuilder()
+ .withQuery(query)
+ .withResult(success)
+ .build()
+ );
+ }
+
+ sCluster.node(1).primingClient().prime(queryBuilder()
+ .withQuery(query)
+ .withResult(unauthorized)
+ .build()
+ );
+
+ LoadBalancingPolicy lbp =
+ ErrorAwarePolicy.builder(new AlwaysSameHostsPolicy())
+ .withInclusionThreshold(0.2)
+ .withRetryPeriod(1000)
+ .build();
+
+ Cluster cluster = Cluster.builder()
+ .addContactPoints(sCluster.address(1).getAddress(), sCluster.address(2).getAddress(), sCluster.address(3).getAddress())
+ .withPort(sCluster.getBinaryPort())
+ .withLoadBalancingPolicy(lbp)
+ .withRetryPolicy(FallthroughRetryPolicy.INSTANCE)
+ .build();
+
+ Session session = cluster.connect();
+
+ try {
+ // will send to node1, which will respond with an error.
+ session.execute(query);
+ fail("Should have failed after sending query to node1");
+ } catch (UnauthorizedException e) {
+ assertThat(e.getAddress()).isEqualTo(sCluster.address(1));
+ }
+
+ Thread.sleep(7000);
+
+ try {
+ // will send to node1, which will respond with an error.
+ session.execute(query);
+ fail("Should have failed after sending query to node1");
+ } catch (UnauthorizedException e) {
+ assertThat(e.getAddress()).isEqualTo(sCluster.address(1));
+ }
+ // now node1 excluded
+
+ // wait until it is reconsidered
+ Thread.sleep(2000);
+
+ try {
+ // will send to node1, which will respond with an error.
+ session.execute(query);
+ fail("Should have failed after sending query to node1");
+ } catch (UnauthorizedException e) {
+ assertThat(e.getAddress()).isEqualTo(sCluster.address(1));
+ }
+
+ }
+
+
+
+ private class AlwaysSameHostsPolicy implements LoadBalancingPolicy {
+
+ Collection<Host> hosts;
+
+ @Override
+ public void init(Cluster cluster, Collection<Host> hosts) {
+ // hosts may have been shuffled so need to sort in ascending order
+ List<Host> copyHosts = new ArrayList<Host>(hosts);
+ Collections.sort(copyHosts, new Comparator<Host>() {
+ @Override
+ public int compare(Host o1, Host o2) {
+ byte[] ba1 = o1.getAddress().getAddress();
+ byte[] ba2 = o2.getAddress().getAddress();
+
+ // general ordering: ipv4 before ipv6
+ if (ba1.length < ba2.length) return -1;
+ if (ba1.length > ba2.length) return 1;
+
+ // we have 2 ips of the same type, so we have to compare each byte
+ for (int i = 0; i < ba1.length; i++) {
+ int b1 = unsignedByteToInt(ba1[i]);
+ int b2 = unsignedByteToInt(ba2[i]);
+ if (b1 == b2)
+ continue;
+ if (b1 < b2)
+ return -1;
+ else
+ return 1;
+ }
+ return 0;
+ }
+
+ private int unsignedByteToInt(byte b) {
+ return (int) b & 0xFF;
+ }
+ });
+ this.hosts = copyHosts;
+ }
+
+ @Override
+ public HostDistance distance(Host host) {
+ return HostDistance.LOCAL;
+ }
+
+ @Override
+ public Iterator<Host> newQueryPlan(String loggedKeyspace, Statement statement) {
+ return hosts.iterator();
+ }
+
+ @Override
+ public void onAdd(Host host) {
+
+ }
+
+ @Override
+ public void onUp(Host host) {
+
+ }
+
+ @Override
+ public void onDown(Host host) {
+
+ }
+
+ @Override
+ public void onRemove(Host host) {
+
+ }
+
+ @Override
+ public void close() {
+
+ }
+ }
+}