Merge branch 'cassandra-3.0' into cassandra-3.11
diff --git a/build.xml b/build.xml
index 25a4733..9932604 100644
--- a/build.xml
+++ b/build.xml
@@ -120,6 +120,7 @@
     <property name="jacoco.version" value="0.7.5.201505241946"/>
 
     <property name="byteman.version" value="3.0.3"/>
+    <property name="bytebuddy.version" value="1.10.10"/>
 
     <property name="ecj.version" value="4.4.2"/>
 
@@ -423,7 +424,7 @@
           </dependency>
           <dependency groupId="junit" artifactId="junit" version="4.6" />
           <dependency groupId="org.mockito" artifactId="mockito-core" version="3.2.4" />
-          <dependency groupId="org.apache.cassandra" artifactId="dtest-api" version="0.0.2" />
+          <dependency groupId="org.apache.cassandra" artifactId="dtest-api" version="0.0.3" />
           <dependency groupId="org.apache.rat" artifactId="apache-rat" version="0.10">
              <exclusion groupId="commons-lang" artifactId="commons-lang"/>
           </dependency>
@@ -447,6 +448,8 @@
           <dependency groupId="org.jboss.byteman" artifactId="byteman-submit" version="${byteman.version}"/>
           <dependency groupId="org.jboss.byteman" artifactId="byteman-bmunit" version="${byteman.version}"/>
 
+          <dependency groupId="net.bytebuddy" artifactId="byte-buddy" version="${bytebuddy.version}" />
+          <dependency groupId="net.bytebuddy" artifactId="byte-buddy-agent" version="${bytebuddy.version}" />
 
           <dependency groupId="org.openjdk.jmh" artifactId="jmh-core" version="1.21"/>
           <dependency groupId="org.openjdk.jmh" artifactId="jmh-generator-annprocess" version="1.21"/>
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
index 0016316..3cb8dac 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
@@ -30,6 +30,7 @@
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -116,6 +117,8 @@
 
     // mutated by user-facing API
     private final MessageFilters filters;
+    private final BiConsumer<ClassLoader, Integer> instanceInitializer;
+
     private volatile Thread.UncaughtExceptionHandler previousHandler = null;
 
     protected class Wrapper extends DelegatingInvokableInstance implements IUpgradeableInstance
@@ -152,6 +155,8 @@
         private IInvokableInstance newInstance(int generation)
         {
             ClassLoader classLoader = new InstanceClassLoader(generation, config.num(), version.classpath, sharedClassLoader);
+            if (instanceInitializer != null) // optional hook: runs against the fresh class loader before Instance is constructed through it
+                instanceInitializer.accept(classLoader, config.num());
             return Instance.transferAdhoc((SerializableBiFunction<IInstanceConfig, ClassLoader, IInvokableInstance>)Instance::new, classLoader)
                                         .apply(config, classLoader);
         }
@@ -261,6 +266,7 @@
         this.instanceMap = new HashMap<>();
         this.initialVersion = builder.getVersion();
         this.filters = new MessageFilters();
+        this.instanceInitializer = builder.getInstanceInitializer();
 
         int generation = GENERATION.incrementAndGet();
         for (int i = 0; i < builder.getNodeCount(); ++i)
diff --git a/test/distributed/org/apache/cassandra/distributed/test/ByteBuddyExamples.java b/test/distributed/org/apache/cassandra/distributed/test/ByteBuddyExamples.java
new file mode 100644
index 0000000..b49572d
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/ByteBuddyExamples.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import java.io.IOException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.Test;
+
+import net.bytebuddy.ByteBuddy;
+import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
+import net.bytebuddy.implementation.MethodDelegation;
+import net.bytebuddy.implementation.bind.annotation.SuperCall;
+import org.apache.cassandra.cql3.QueryOptions;
+import org.apache.cassandra.cql3.statements.ModificationStatement;
+import org.apache.cassandra.cql3.statements.SelectStatement;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.service.QueryState;
+import org.apache.cassandra.transport.messages.ResultMessage;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class ByteBuddyExamples extends TestBaseImpl // examples of instrumenting instance classes with Byte Buddy through withInstanceInitializer
+{
+    @Test // the insert is expected to fail: BBFailHelper makes ModificationStatement's execute methods throw
+    public void writeFailureTest() throws Throwable
+    {
+        try(Cluster cluster = init(Cluster.build(1)
+                                          .withInstanceInitializer(BBFailHelper::install)
+                                          .start()))
+        {
+            cluster.schemaChange("create table " + KEYSPACE + ".tbl (id int primary key, t int)");
+            try
+            {
+                cluster.coordinator(1).execute("insert into " + KEYSPACE + ".tbl (id, t) values (1, 1)", ConsistencyLevel.ALL);
+                fail("Should fail"); // throws AssertionError, which the RuntimeException catch below does not swallow
+            }
+            catch (RuntimeException e)
+            {
+                // expected: the execute() stub installed by BBFailHelper throws RuntimeException
+            }
+        }
+    }
+    /** Replaces the methods named "execute" on ModificationStatement with a stub that always throws. */
+    public static class BBFailHelper
+    {
+        static void install(ClassLoader cl, int nodeNumber) // nodeNumber is not consulted here
+        {
+            new ByteBuddy().redefine(ModificationStatement.class) // redefine: intercepted methods lose their original body
+                           .method(named("execute")) // matches by name only, so every overload is intercepted
+                           .intercept(MethodDelegation.to(BBFailHelper.class))
+                           .make()
+                           .load(cl, ClassLoadingStrategy.Default.INJECTION); // defines the modified class in cl by injection
+        }
+        public static ResultMessage execute() // delegation target for the intercepted methods
+        {
+            throw new RuntimeException();
+        }
+    }
+    /** Instruments SelectStatement on node 1 only, then checks the counter reads 1 on node 1 and 0 on node 2. */
+    @Test
+    public void countTest() throws IOException
+    {
+        try(Cluster cluster = init(Cluster.build(2)
+                                          .withInstanceInitializer(BBCountHelper::install)
+                                          .start()))
+        {
+            cluster.schemaChange("create table " + KEYSPACE + ".tbl (id int primary key, t int)");
+            cluster.coordinator(1).execute("select * from " + KEYSPACE + ".tbl;", ConsistencyLevel.ALL);
+            cluster.coordinator(2).execute("select * from " + KEYSPACE + ".tbl;", ConsistencyLevel.ALL);
+            cluster.get(1).runOnInstance(() -> {
+                assertEquals(1, BBCountHelper.count.get());
+            });
+            cluster.get(2).runOnInstance(() -> {
+                assertEquals(0, BBCountHelper.count.get());
+            });
+
+        }
+    }
+    /** Counts calls to the three-argument SelectStatement.execute on node 1 while still running the original method. */
+    public static class BBCountHelper
+    {
+        static AtomicInteger count = new AtomicInteger(); // NOTE(review): read via runOnInstance, so presumably one copy per instance class loader - confirm
+        static void install(ClassLoader cl, int nodeNumber)
+        {
+            if (nodeNumber != 1) // only node 1 is instrumented; other nodes keep the unmodified class
+                return;
+            new ByteBuddy().rebase(SelectStatement.class) // rebase (not redefine) keeps the original body so @SuperCall can invoke it
+                           .method(named("execute").and(takesArguments(3))) // restrict to execute methods taking exactly three arguments
+                           .intercept(MethodDelegation.to(BBCountHelper.class))
+                           .make()
+                           .load(cl, ClassLoadingStrategy.Default.INJECTION);
+        }
+        /** Delegation target: bumps the counter, then runs the original execute through the @SuperCall callable. */
+        public static ResultMessage.Rows execute(QueryState state, QueryOptions options, long queryStartNanoTime, @SuperCall Callable<ResultMessage.Rows> r) throws Exception
+        {
+            count.incrementAndGet();
+            return r.call();
+        }
+    }
+
+}