blob: fc52ab6de19d172dec2c8b8e1e93c0154f4150fb [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.distributed.test.streaming;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.concurrent.Callable;
import java.util.stream.Collectors;
import org.junit.Test;
import net.bytebuddy.ByteBuddy;
import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
import net.bytebuddy.implementation.MethodDelegation;
import net.bytebuddy.implementation.bind.annotation.SuperCall;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
import org.apache.cassandra.db.streaming.CassandraIncomingFile;
import org.apache.cassandra.distributed.Cluster;
import org.apache.cassandra.distributed.api.Feature;
import org.apache.cassandra.distributed.api.IInvokableInstance;
import org.apache.cassandra.distributed.api.TokenSupplier;
import org.apache.cassandra.distributed.shared.ClusterUtils;
import org.apache.cassandra.distributed.test.TestBaseImpl;
import org.apache.cassandra.io.sstable.format.RangeAwareSSTableWriter;
import org.apache.cassandra.io.sstable.format.big.BigTableZeroCopyWriter;
import org.apache.cassandra.io.util.SequentialWriter;
import org.assertj.core.api.Assertions;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
/**
 * Verifies that a stream that fails mid-transfer (simulated here by throwing
 * {@link java.nio.channels.ClosedChannelException} from intercepted write/append paths)
 * does not trigger the disk failure policy: no node should shut down or see kill
 * attempts even with {@code disk_failure_policy=die}.
 *
 * Both streaming modes are covered: entire-sstable (zero-copy) and partial streaming.
 * The failure is injected via ByteBuddy interception installed by {@link BBHelper}.
 */
public class StreamCloseInMiddleTest extends TestBaseImpl
{
    /** Exercise the failure with entire-sstable (zero-copy) streaming enabled. */
    @Test
    public void zeroCopy() throws IOException
    {
        streamClose(true);
    }

    /** Exercise the failure with partial (non-zero-copy) streaming. */
    @Test
    public void notZeroCopy() throws IOException
    {
        streamClose(false);
    }

    /**
     * Shared scenario: start a 2-node cluster with fault injection installed, trigger
     * repair-driven streaming (expected to fail from the injected channel close), then
     * bootstrap a third node whose streaming also fails — asserting after each phase
     * that no node was shut down by the disk failure policy.
     *
     * @param zeroCopyStreaming whether {@code stream_entire_sstables} is enabled
     */
    private void streamClose(boolean zeroCopyStreaming) throws IOException
    {
        // Token supplier is sized for 3 nodes so a third instance can be added later.
        try (Cluster cluster = Cluster.build(2)
                                      .withTokenSupplier(TokenSupplier.evenlyDistributedTokens(3))
                                      .withInstanceInitializer(BBHelper::install)
                                      .withConfig(c -> c.with(Feature.values())
                                                        .set("stream_entire_sstables", zeroCopyStreaming)
                                                        // when die, this will try to halt JVM, which is easier to validate in the test
                                                        // other levels require checking state of the subsystems
                                                        .set("disk_failure_policy", "die"))
                                      .start())
        {
            init(cluster);
            cluster.schemaChange(withKeyspace("CREATE TABLE %s.tbl (pk int PRIMARY KEY)"));

            triggerStreaming(cluster, zeroCopyStreaming);
            // make sure disk failure policy is not triggered
            assertNoNodeShutdown(cluster);

            // now bootstrap a new node; streaming will fail
            IInvokableInstance node3 = ClusterUtils.addInstance(cluster, cluster.get(1).config(), c -> c.set("auto_bootstrap", true));
            node3.startup();
            // Each expected failure message should appear exactly once in node3's logs.
            for (String line : Arrays.asList("Error while waiting on bootstrap to complete. Bootstrap will have to be restarted", // bootstrap failed
                                             "Some data streaming failed. Use nodetool to check bootstrap state and resume")) // didn't join ring because bootstrap failed
                Assertions.assertThat(node3.logs().grep(line).getResult())
                          .hasSize(1);
            assertNoNodeShutdown(cluster);
        }
    }

    /**
     * Asserts that no instance in the cluster was shut down or saw JVM kill attempts.
     * All instances are checked before failing, with individual assertion errors
     * collected as suppressed exceptions so a single report covers every node.
     */
    private void assertNoNodeShutdown(Cluster cluster)
    {
        AssertionError t = null;
        for (IInvokableInstance i : cluster.stream().collect(Collectors.toList()))
        {
            try
            {
                Assertions.assertThat(i.isShutdown()).describedAs("%s was shutdown; this is not expected", i).isFalse();
                Assertions.assertThat(i.killAttempts()).describedAs("%s saw kill attempts; this is not expected", i).isEqualTo(0);
            }
            catch (AssertionError t2)
            {
                // keep checking remaining nodes; chain extra failures onto the first
                if (t == null)
                    t = t2;
                else
                    t.addSuppressed(t2);
            }
        }
        if (t != null)
            throw t;
    }

    /**
     * Causes a data mismatch between node1 and node2, then runs a full repair on node2,
     * which must stream and is expected to fail due to the injected channel close.
     *
     * @param expectedEntireSSTable which streaming-type metric is expected to be non-zero
     */
    private static void triggerStreaming(Cluster cluster, boolean expectedEntireSSTable)
    {
        IInvokableInstance node1 = cluster.get(1);
        IInvokableInstance node2 = cluster.get(2);

        // repair will do streaming IFF there is a mismatch; so cause one
        for (int i = 0; i < 10; i++)
            node1.executeInternal(withKeyspace("INSERT INTO %s.tbl (pk) VALUES (?)"), i); // timestamp won't match, causing a mismatch

        // trigger streaming; expected to fail as streaming socket closed in the middle (currently this is an unrecoverable event)
        node2.nodetoolResult("repair", "-full", KEYSPACE, "tbl").asserts().failure();

        assertStreamingType(node2, expectedEntireSSTable);
    }

    /**
     * Asserts that only the expected streaming-in metric (entire vs partial sstable)
     * is non-zero on the given node, confirming the configured streaming mode was used.
     */
    private static void assertStreamingType(IInvokableInstance node, boolean expectedEntireSSTable)
    {
        // NOTE(review): metric key includes a hard-coded peer address/port for the dtest
        // network (127.0.0.1.7012 = node1) — confirm against the dtest address scheme.
        String key = "org.apache.cassandra.metrics.Streaming.%s./127.0.0.1.7012";
        long entire = node.metrics().getCounter(String.format(key, "EntireSSTablesStreamedIn"));
        long partial = node.metrics().getCounter(String.format(key, "PartialSSTablesStreamedIn"));
        if (expectedEntireSSTable)
        {
            Assertions.assertThat(partial).isEqualTo(0);
            Assertions.assertThat(entire).isGreaterThan(0);
        }
        else
        {
            Assertions.assertThat(partial).isGreaterThan(0);
            Assertions.assertThat(entire).isEqualTo(0);
        }
    }

    /**
     * ByteBuddy fault-injection helpers. {@link #install} rebases two streaming write
     * paths so their single-argument methods delegate here; each interceptor throws
     * {@link java.nio.channels.ClosedChannelException} only when invoked from the
     * streaming call site it targets, and otherwise falls through to the real method.
     *
     * Interceptor method names/signatures must match the rebased methods exactly —
     * ByteBuddy binds them by name via {@code MethodDelegation.to(BBHelper.class)}.
     */
    public static class BBHelper
    {
        // Intercepts SequentialWriter.writeDirectlyToChannel; fails only on the
        // zero-copy path (called from BigTableZeroCopyWriter.write).
        @SuppressWarnings("unused")
        public static int writeDirectlyToChannel(ByteBuffer buf, @SuperCall Callable<Integer> zuper) throws Exception
        {
            if (isCaller(BigTableZeroCopyWriter.class.getName(), "write"))
                throw new java.nio.channels.ClosedChannelException();
            // different context; pass through
            return zuper.call();
        }

        // Intercepts RangeAwareSSTableWriter.append; fails only on the partial-stream
        // path (called from CassandraIncomingFile.read).
        @SuppressWarnings("unused")
        public static boolean append(UnfilteredRowIterator partition, @SuperCall Callable<Boolean> zuper) throws Exception
        {
            if (isCaller(CassandraIncomingFile.class.getName(), "read")) // handles compressed and non-compressed
                throw new java.nio.channels.ClosedChannelException();
            // different context; pass through
            return zuper.call();
        }

        /**
         * Walks the current thread's stack to detect whether the given class/method
         * appears anywhere in the call chain.
         */
        private static boolean isCaller(String klass, String method)
        {
            //TODO is there a cleaner way to check this?
            StackTraceElement[] stack = Thread.currentThread().getStackTrace();
            for (int i = 0; i < stack.length; i++)
            {
                StackTraceElement e = stack[i];
                if (klass.equals(e.getClassName()) && method.equals(e.getMethodName()))
                    return true;
            }
            return false;
        }

        /**
         * Instance initializer: rebases the two streaming write methods (matched by
         * name and single-argument arity) in the instance's class loader so they
         * delegate to the interceptors above.
         */
        public static void install(ClassLoader classLoader, Integer num)
        {
            new ByteBuddy().rebase(SequentialWriter.class)
                           .method(named("writeDirectlyToChannel").and(takesArguments(1)))
                           .intercept(MethodDelegation.to(BBHelper.class))
                           .make()
                           .load(classLoader, ClassLoadingStrategy.Default.INJECTION);

            new ByteBuddy().rebase(RangeAwareSSTableWriter.class)
                           .method(named("append").and(takesArguments(1)))
                           .intercept(MethodDelegation.to(BBHelper.class))
                           .make()
                           .load(classLoader, ClassLoadingStrategy.Default.INJECTION);
        }
    }
}