blob: 28aa6b0bccf11e771d712dcde3f52f3bf932be29 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.distributed.test.streaming;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import net.bytebuddy.ByteBuddy;
import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
import net.bytebuddy.implementation.MethodDelegation;
import net.bytebuddy.implementation.bind.annotation.SuperCall;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
import org.apache.cassandra.db.streaming.CassandraIncomingFile;
import org.apache.cassandra.distributed.Cluster;
import org.apache.cassandra.distributed.api.Feature;
import org.apache.cassandra.distributed.api.IInvokableInstance;
import org.apache.cassandra.distributed.api.LogResult;
import org.apache.cassandra.distributed.api.SimpleQueryResult;
import org.apache.cassandra.distributed.test.TestBaseImpl;
import org.apache.cassandra.io.sstable.RangeAwareSSTableWriter;
import org.apache.cassandra.io.sstable.SSTableZeroCopyWriter;
import org.apache.cassandra.io.util.SequentialWriter;
import org.assertj.core.api.Assertions;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
public class AbstractStreamFailureLogs extends TestBaseImpl
{
protected static final Logger logger = LoggerFactory.getLogger(AbstractStreamFailureLogs.class);
protected static final int FAILING_NODE = 2;
protected void streamTest(boolean zeroCopyStreaming, String reason, Integer failedNode) throws IOException
{
try (Cluster cluster = Cluster.build(2)
.withInstanceInitializer(BBStreamHelper::install)
.withConfig(c -> c.with(Feature.values())
.set("stream_entire_sstables", zeroCopyStreaming)
// when die, this will try to halt JVM, which is easier to validate in the test
// other levels require checking state of the subsystems
.set("disk_failure_policy", "die"))
.start())
{
init(cluster);
cluster.schemaChange(withKeyspace("CREATE TABLE %s.tbl (pk int PRIMARY KEY)"));
triggerStreaming(cluster);
// make sure disk failure policy is not triggered
IInvokableInstance failingNode = cluster.get(failedNode);
searchForLog(failingNode, reason);
}
}
protected void triggerStreaming(Cluster cluster)
{
IInvokableInstance node1 = cluster.get(1);
IInvokableInstance node2 = cluster.get(2);
// repair will do streaming IFF there is a mismatch; so cause one
for (int i = 0; i < 10; i++)
node1.executeInternal(withKeyspace("INSERT INTO %s.tbl (pk) VALUES (?)"), i); // timestamp won't match, causing a mismatch
// trigger streaming; expected to fail as streaming socket closed in the middle (currently this is an unrecoverable event)
//Blocks until the stream is complete
node2.nodetoolResult("repair", "-full", KEYSPACE, "tbl").asserts().failure();
}
protected void searchForLog(IInvokableInstance failingNode, String reason)
{
searchForLog(failingNode, true, reason);
}
protected boolean searchForLog(IInvokableInstance failingNode, boolean failIfNoMatch, String reason)
{
LogResult<List<String>> result = failingNode.logs().grepForErrors(-1, Pattern.compile("Stream failed:"));
// grepForErrors will include all ERROR logs even if they don't match the pattern; for this reason need to filter after the fact
List<String> matches = result.getResult();
matches = matches.stream().filter(s -> s.startsWith("WARN")).collect(Collectors.toList());
logger.info("Stream failed logs found: {}", String.join("\n", matches));
if (matches.isEmpty() && !failIfNoMatch)
return false;
Assertions.assertThat(matches)
.describedAs("node%d expected to find %s but could not", failingNode.config().num(), reason)
.hasSize(1);
String logLine = matches.get(0);
Assertions.assertThat(logLine).contains(reason);
Matcher match = Pattern.compile(".*\\[Stream #(.*)\\]").matcher(logLine);
if (!match.find()) throw new AssertionError("Unable to parse: " + logLine);
UUID planId = UUID.fromString(match.group(1));
SimpleQueryResult qr = failingNode.executeInternalWithResult("SELECT * FROM system_views.streaming WHERE id=?", planId);
Assertions.assertThat(qr.hasNext()).isTrue();
Assertions.assertThat(qr.next().getString("failure_cause")).contains(reason);
return true;
}
public static class BBStreamHelper
{
@SuppressWarnings("unused")
public static int writeDirectlyToChannel(ByteBuffer buf, @SuperCall Callable<Integer> zuper) throws Exception
{
if (isCaller(SSTableZeroCopyWriter.class.getName(), "write"))
throw new RuntimeException("TEST");
// different context; pass through
return zuper.call();
}
@SuppressWarnings("unused")
public static boolean append(UnfilteredRowIterator partition, @SuperCall Callable<Boolean> zuper) throws Exception
{
if (isCaller(CassandraIncomingFile.class.getName(), "read")) // handles compressed and non-compressed
throw new java.nio.channels.ClosedChannelException();
// different context; pass through
return zuper.call();
}
public static void install(ClassLoader classLoader, Integer num)
{
if (num != FAILING_NODE)
return;
new ByteBuddy().rebase(SequentialWriter.class)
.method(named("writeDirectlyToChannel").and(takesArguments(1)))
.intercept(MethodDelegation.to(BBStreamHelper.class))
.make()
.load(classLoader, ClassLoadingStrategy.Default.INJECTION);
new ByteBuddy().rebase(RangeAwareSSTableWriter.class)
.method(named("append").and(takesArguments(1)))
.intercept(MethodDelegation.to(BBStreamHelper.class))
.make()
.load(classLoader, ClassLoadingStrategy.Default.INJECTION);
}
}
protected static boolean isCaller(String klass, String method)
{
StackTraceElement[] stack = Thread.currentThread().getStackTrace();
for (int i = 0; i < stack.length; i++)
{
StackTraceElement e = stack[i];
if (klass.equals(e.getClassName()) && method.equals(e.getMethodName()))
return true;
}
return false;
}
}