blob: d43f6d5138e5d081afb4b20a3b3782c5633a342b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.streaming.async;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import io.netty.channel.ChannelPromise;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.net.TestChannel;
import org.apache.cassandra.streaming.PreviewKind;
import org.apache.cassandra.streaming.StreamOperation;
import org.apache.cassandra.streaming.StreamResultFuture;
import org.apache.cassandra.streaming.StreamSession;
import org.apache.cassandra.streaming.StreamingChannel;
import org.apache.cassandra.streaming.messages.CompleteMessage;
import org.apache.cassandra.utils.TimeUUID;
import static org.apache.cassandra.net.MessagingService.current_version;
import static org.apache.cassandra.net.TestChannel.REMOTE_ADDR;
import static org.apache.cassandra.utils.TimeUUID.Generator.nextTimeUUID;
/**
 * Unit tests for {@link StreamingMultiplexedChannel} and its inner {@code FileStreamTask},
 * driven through a {@link TestChannel} wrapped in a {@link NettyStreamingChannel}.
 */
public class StreamingMultiplexedChannelTest
{
    private NettyStreamingChannel streamingChannel;
    private TestChannel channel;
    private StreamSession session;
    private StreamingMultiplexedChannel sender;
    private StreamingMultiplexedChannel.FileStreamTask fileStreamTask;

    /** Initializes the {@link DatabaseDescriptor} once before any test in this class runs. */
    @BeforeClass
    public static void before()
    {
        DatabaseDescriptor.daemonInitialization();
    }

    /**
     * Builds a fresh channel, session and sender for every test: the session is initialized with a
     * follower {@link StreamResultFuture}, the streaming channel is attached as outbound, and the
     * same channel is registered as the sender's control channel.
     */
    @Before
    public void setUp()
    {
        channel = new TestChannel();
        streamingChannel = new NettyStreamingChannel(current_version, channel, StreamingChannel.Kind.CONTROL);
        TimeUUID pendingRepair = nextTimeUUID();
        session = new StreamSession(StreamOperation.BOOTSTRAP, REMOTE_ADDR, new NettyStreamingConnectionFactory(), streamingChannel, current_version, true, 0, pendingRepair, PreviewKind.ALL);
        StreamResultFuture future = StreamResultFuture.createFollower(0, nextTimeUUID(), StreamOperation.REPAIR, REMOTE_ADDR, streamingChannel, current_version, pendingRepair, session.getPreviewKind());
        session.init(future);
        session.attachOutbound(streamingChannel);
        sender = session.getChannel();
        sender.setControlChannel(streamingChannel);
    }

    /** Detaches the channel from the task created by the test, if the test created one. */
    @After
    public void tearDown()
    {
        if (fileStreamTask != null)
            fileStreamTask.unsetChannel();
    }

    /** Once the sender has been marked closed, a task must fail to acquire a permit. */
    @Test
    public void FileStreamTask_acquirePermit_closed()
    {
        fileStreamTask = sender.new FileStreamTask(null);
        sender.setClosed();
        Assert.assertFalse(fileStreamTask.acquirePermit(1));
    }

    /** Acquiring a permit on an open sender succeeds and lowers the available permit count by one. */
    @Test
    public void FileStreamTask_acquirePermit_HapppyPath()
    {
        int permits = sender.semaphoreAvailablePermits();
        fileStreamTask = sender.new FileStreamTask(null);
        Assert.assertTrue(fileStreamTask.acquirePermit(1));
        Assert.assertEquals(permits - 1, sender.semaphoreAvailablePermits());
    }

    /**
     * Running a task on a channel whose {@code TRANSFERRING_FILE_ATTR} is already set must fail the
     * session and leave the permit count unchanged.
     */
    @Test
    public void FileStreamTask_BadChannelAttr()
    {
        int permits = sender.semaphoreAvailablePermits();
        channel.attr(NettyStreamingChannel.TRANSFERRING_FILE_ATTR).set(Boolean.TRUE);
        fileStreamTask = sender.new FileStreamTask(null);
        fileStreamTask.injectChannel(streamingChannel);
        fileStreamTask.run();
        Assert.assertEquals(StreamSession.State.FAILED, session.state());
        Assert.assertTrue(channel.releaseOutbound()); // when the session fails, it will send a SessionFailed msg
        Assert.assertEquals(permits, sender.semaphoreAvailablePermits());
    }

    /**
     * Running a task carrying a {@link CompleteMessage} must not fail the session, must produce
     * outbound data on the channel, and must leave the permit count unchanged.
     */
    @Test
    public void FileStreamTask_HappyPath()
    {
        int permits = sender.semaphoreAvailablePermits();
        fileStreamTask = sender.new FileStreamTask(new CompleteMessage());
        fileStreamTask.injectChannel(streamingChannel);
        fileStreamTask.run();
        Assert.assertNotEquals(StreamSession.State.FAILED, session.state());
        Assert.assertTrue(channel.releaseOutbound());
        Assert.assertEquals(permits, sender.semaphoreAvailablePermits());
    }

    /**
     * A successfully completed promise yields no follow-up future and leaves the channel open, the
     * sender connected and the session not failed.
     */
    @Test
    public void onControlMessageComplete_HappyPath()
    {
        Assert.assertTrue(channel.isOpen());
        Assert.assertTrue(sender.connected());
        ChannelPromise promise = channel.newPromise();
        promise.setSuccess();
        Assert.assertNull(sender.onMessageComplete(promise, new CompleteMessage()));
        Assert.assertTrue(channel.isOpen());
        Assert.assertTrue(sender.connected());
        Assert.assertNotEquals(StreamSession.State.FAILED, session.state());
    }

    /**
     * A failed promise yields a follow-up future; once that future completes the channel is closed,
     * the sender is disconnected and the session is in the {@code FAILED} state.
     *
     * @throws InterruptedException if the test thread is interrupted while waiting on the future
     * @throws ExecutionException   if the follow-up future completes exceptionally
     * @throws TimeoutException     if the follow-up future does not complete within five seconds
     */
    @Test
    public void onControlMessageComplete_Exception() throws InterruptedException, ExecutionException, TimeoutException
    {
        Assert.assertTrue(channel.isOpen());
        Assert.assertTrue(sender.connected());
        ChannelPromise promise = channel.newPromise();
        promise.setFailure(new RuntimeException("this is just a testing exception"));
        Future<?> f = sender.onMessageComplete(promise, new CompleteMessage());
        // The success path returns null; fail with a clear message here rather than an NPE on get().
        Assert.assertNotNull("expected a follow-up future when the promise has failed", f);
        f.get(5, TimeUnit.SECONDS);
        Assert.assertFalse(channel.isOpen());
        Assert.assertFalse(sender.connected());
        Assert.assertEquals(StreamSession.State.FAILED, session.state());
    }
}