Merge branch 'cassandra-3.0' into cassandra-3.11
diff --git a/build.xml b/build.xml
index 25a4733..9932604 100644
--- a/build.xml
+++ b/build.xml
@@ -120,6 +120,7 @@
<property name="jacoco.version" value="0.7.5.201505241946"/>
<property name="byteman.version" value="3.0.3"/>
+ <property name="bytebuddy.version" value="1.10.10"/>
<property name="ecj.version" value="4.4.2"/>
@@ -423,7 +424,7 @@
</dependency>
<dependency groupId="junit" artifactId="junit" version="4.6" />
<dependency groupId="org.mockito" artifactId="mockito-core" version="3.2.4" />
- <dependency groupId="org.apache.cassandra" artifactId="dtest-api" version="0.0.2" />
+ <dependency groupId="org.apache.cassandra" artifactId="dtest-api" version="0.0.3" />
<dependency groupId="org.apache.rat" artifactId="apache-rat" version="0.10">
<exclusion groupId="commons-lang" artifactId="commons-lang"/>
</dependency>
@@ -447,6 +448,8 @@
<dependency groupId="org.jboss.byteman" artifactId="byteman-submit" version="${byteman.version}"/>
<dependency groupId="org.jboss.byteman" artifactId="byteman-bmunit" version="${byteman.version}"/>
+ <dependency groupId="net.bytebuddy" artifactId="byte-buddy" version="${bytebuddy.version}" />
+ <dependency groupId="net.bytebuddy" artifactId="byte-buddy-agent" version="${bytebuddy.version}" />
<dependency groupId="org.openjdk.jmh" artifactId="jmh-core" version="1.21"/>
<dependency groupId="org.openjdk.jmh" artifactId="jmh-generator-annprocess" version="1.21"/>
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
index 0016316..3cb8dac 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
@@ -30,6 +30,7 @@
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -116,6 +117,8 @@
// mutated by user-facing API
private final MessageFilters filters;
+ private final BiConsumer<ClassLoader, Integer> instanceInitializer;
+
private volatile Thread.UncaughtExceptionHandler previousHandler = null;
protected class Wrapper extends DelegatingInvokableInstance implements IUpgradeableInstance
@@ -152,6 +155,8 @@
private IInvokableInstance newInstance(int generation)
{
ClassLoader classLoader = new InstanceClassLoader(generation, config.num(), version.classpath, sharedClassLoader);
+ if (instanceInitializer != null) // the hook is optional: skipped when the field is null
+ instanceInitializer.accept(classLoader, config.num()); // runs against the fresh class loader, before the Instance below is created with it
return Instance.transferAdhoc((SerializableBiFunction<IInstanceConfig, ClassLoader, IInvokableInstance>)Instance::new, classLoader)
.apply(config, classLoader);
}
@@ -261,6 +266,7 @@
this.instanceMap = new HashMap<>();
this.initialVersion = builder.getVersion();
this.filters = new MessageFilters();
+ this.instanceInitializer = builder.getInstanceInitializer();
int generation = GENERATION.incrementAndGet();
for (int i = 0; i < builder.getNodeCount(); ++i)
diff --git a/test/distributed/org/apache/cassandra/distributed/test/ByteBuddyExamples.java b/test/distributed/org/apache/cassandra/distributed/test/ByteBuddyExamples.java
new file mode 100644
index 0000000..b49572d
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/ByteBuddyExamples.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import java.io.IOException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.Test;
+
+import net.bytebuddy.ByteBuddy;
+import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
+import net.bytebuddy.implementation.MethodDelegation;
+import net.bytebuddy.implementation.bind.annotation.SuperCall;
+import org.apache.cassandra.cql3.QueryOptions;
+import org.apache.cassandra.cql3.statements.ModificationStatement;
+import org.apache.cassandra.cql3.statements.SelectStatement;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.service.QueryState;
+import org.apache.cassandra.transport.messages.ResultMessage;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class ByteBuddyExamples extends TestBaseImpl
+{ // Example tests that rewrite Cassandra statement classes with ByteBuddy through the cluster builder's withInstanceInitializer hook.
+ @Test
+ public void writeFailureTest() throws Throwable
+ { // Installs BBFailHelper on a one-node cluster and expects the insert to end in a RuntimeException.
+ try(Cluster cluster = init(Cluster.build(1)
+ .withInstanceInitializer(BBFailHelper::install)
+ .start()))
+ {
+ cluster.schemaChange("create table " + KEYSPACE + ".tbl (id int primary key, t int)");
+ try
+ {
+ cluster.coordinator(1).execute("insert into " + KEYSPACE + ".tbl (id, t) values (1, 1)", ConsistencyLevel.ALL);
+ fail("Should fail"); // throws AssertionError, which the RuntimeException catch below does not swallow
+ }
+ catch (RuntimeException e)
+ {
+ // expected: the instrumented ModificationStatement execute throws (see BBFailHelper.execute)
+ }
+ }
+ }
+ // ByteBuddy helper: redefines ModificationStatement so every method named "execute" delegates to execute() below.
+ public static class BBFailHelper
+ {
+ static void install(ClassLoader cl, int nodeNumber)
+ { // cl: class loader the redefined class is injected into; nodeNumber is unused here, so no node is excluded.
+ new ByteBuddy().redefine(ModificationStatement.class)
+ .method(named("execute"))
+ .intercept(MethodDelegation.to(BBFailHelper.class))
+ .make()
+ .load(cl, ClassLoadingStrategy.Default.INJECTION);
+ }
+ public static ResultMessage execute()
+ { // Delegation target: takes no arguments and always throws.
+ throw new RuntimeException();
+ }
+ }
+ // Instruments node 1 only, runs one SELECT through each coordinator, then checks the counter seen by each instance.
+ @Test
+ public void countTest() throws IOException
+ {
+ try(Cluster cluster = init(Cluster.build(2)
+ .withInstanceInitializer(BBCountHelper::install)
+ .start()))
+ {
+ cluster.schemaChange("create table " + KEYSPACE + ".tbl (id int primary key, t int)");
+ cluster.coordinator(1).execute("select * from " + KEYSPACE + ".tbl;", ConsistencyLevel.ALL);
+ cluster.coordinator(2).execute("select * from " + KEYSPACE + ".tbl;", ConsistencyLevel.ALL);
+ cluster.get(1).runOnInstance(() -> {
+ assertEquals(1, BBCountHelper.count.get()); // node 1 is instrumented; presumably only its own coordinator SELECT is counted - TODO confirm
+ });
+ cluster.get(2).runOnInstance(() -> {
+ assertEquals(0, BBCountHelper.count.get()); // install() returned early for node 2; presumably each instance sees its own copy of count - TODO confirm
+ });
+
+ }
+ }
+ // ByteBuddy helper: counts calls to the three-argument SelectStatement execute, then runs the original method.
+ public static class BBCountHelper
+ {
+ static AtomicInteger count = new AtomicInteger(); // incremented by execute(...) below; read by countTest inside runOnInstance
+ static void install(ClassLoader cl, int nodeNumber)
+ { // cl: class loader the rebased class is injected into; nodeNumber: only node 1 gets instrumented.
+ if (nodeNumber != 1)
+ return;
+ new ByteBuddy().rebase(SelectStatement.class) // rebase rather than redefine; presumably so the original body stays callable through @SuperCall - TODO confirm
+ .method(named("execute").and(takesArguments(3)))
+ .intercept(MethodDelegation.to(BBCountHelper.class))
+ .make()
+ .load(cl, ClassLoadingStrategy.Default.INJECTION);
+ }
+ // Delegation target: the first three parameters mirror the intercepted execute; r is the @SuperCall handle this method calls to get its result.
+ public static ResultMessage.Rows execute(QueryState state, QueryOptions options, long queryStartNanoTime, @SuperCall Callable<ResultMessage.Rows> r) throws Exception
+ {
+ count.incrementAndGet();
+ return r.call();
+ }
+ }
+
+}