blob: b473597a5b1def61c1d2190a2b51ad82d5c52f33 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.distributed.test.streaming;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import com.google.common.util.concurrent.UncheckedTimeoutException;
import org.junit.Test;
import net.bytebuddy.ByteBuddy;
import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
import net.bytebuddy.implementation.MethodDelegation;
import net.bytebuddy.implementation.bind.annotation.SuperCall;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
import org.apache.cassandra.db.streaming.CassandraIncomingFile;
import org.apache.cassandra.distributed.Cluster;
import org.apache.cassandra.distributed.api.Feature;
import org.apache.cassandra.io.sstable.RangeAwareSSTableWriter;
import org.apache.cassandra.io.sstable.SSTableZeroCopyWriter;
import org.apache.cassandra.io.util.SequentialWriter;
import org.apache.cassandra.utils.Clock;
import org.apache.cassandra.utils.Shared;
import org.awaitility.Awaitility;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
public class StreamFailureLogsFailureDueToSessionTimeoutTest extends AbstractStreamFailureLogs
{
@Test
public void failureDueToSessionTimeout() throws IOException
{
try (Cluster cluster = Cluster.build(2)
.withInstanceInitializer(BBStreamTimeoutHelper::install)
.withConfig(c -> c.with(Feature.values())
// when die, this will try to halt JVM, which is easier to validate in the test
// other levels require checking state of the subsystems
.set("stream_transfer_task_timeout", "1ms"))
.start())
{
init(cluster);
cluster.schemaChange(withKeyspace("CREATE TABLE %s.tbl (pk int PRIMARY KEY)"));
ForkJoinPool.commonPool().execute(() -> triggerStreaming(cluster));
try
{
Awaitility.await("Did not see stream running or timed out")
.atMost(3, TimeUnit.MINUTES)
.until(() -> State.STREAM_IS_RUNNING.await(false) || searchForLog(cluster.get(1), false, "Session timed out"));
}
finally
{
State.UNBLOCK_STREAM.signal();
}
Awaitility.await("Unable to find 'Session timed out'")
.atMost(1, TimeUnit.MINUTES)
.until(() -> searchForLog(cluster.get(1), false, "Session timed out"));
}
}
@Shared
public static class State
{
public static final TestCondition STREAM_IS_RUNNING = new TestCondition();
public static final TestCondition UNBLOCK_STREAM = new TestCondition();
}
@Shared
public static class TestCondition
{
private volatile boolean signaled = false;
public void await()
{
await(true);
}
public boolean await(boolean throwOnTimeout)
{
long deadlineNanos = Clock.Global.nanoTime() + TimeUnit.MINUTES.toNanos(1);
while (!signaled)
{
long remainingMillis = TimeUnit.NANOSECONDS.toMillis(deadlineNanos - Clock.Global.nanoTime());
if (remainingMillis <= 0)
{
if (throwOnTimeout) throw new UncheckedTimeoutException("Condition not met within 1 minute");
return false;
}
// await may block signal from triggering notify, so make sure not to block for more than 500ms
remainingMillis = Math.min(remainingMillis, 500);
synchronized (this)
{
try
{
this.wait(remainingMillis);
}
catch (InterruptedException e)
{
throw new AssertionError(e);
}
}
}
return true;
}
public void signal()
{
signaled = true;
synchronized (this)
{
this.notify();
}
}
}
public static class BBStreamTimeoutHelper
{
@SuppressWarnings("unused")
public static int writeDirectlyToChannel(ByteBuffer buf, @SuperCall Callable<Integer> zuper) throws Exception
{
if (isCaller(SSTableZeroCopyWriter.class.getName(), "write"))
{
State.STREAM_IS_RUNNING.signal();
State.UNBLOCK_STREAM.await();
}
// different context; pass through
return zuper.call();
}
@SuppressWarnings("unused")
public static boolean append(UnfilteredRowIterator partition, @SuperCall Callable<Boolean> zuper) throws Exception
{
if (isCaller(CassandraIncomingFile.class.getName(), "read")) // handles compressed and non-compressed
{
State.STREAM_IS_RUNNING.signal();
State.UNBLOCK_STREAM.await();
}
// different context; pass through
return zuper.call();
}
public static void install(ClassLoader classLoader, Integer num)
{
if (num != FAILING_NODE)
return;
new ByteBuddy().rebase(SequentialWriter.class)
.method(named("writeDirectlyToChannel").and(takesArguments(1)))
.intercept(MethodDelegation.to(BBStreamTimeoutHelper.class))
.make()
.load(classLoader, ClassLoadingStrategy.Default.INJECTION);
new ByteBuddy().rebase(RangeAwareSSTableWriter.class)
.method(named("append").and(takesArguments(1)))
.intercept(MethodDelegation.to(BBStreamTimeoutHelper.class))
.make()
.load(classLoader, ClassLoadingStrategy.Default.INJECTION);
}
}
}