blob: 2241ed423c43d727162d197612074254e316a51e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.distributed.test;
import org.apache.cassandra.distributed.api.IMessage;
import org.apache.cassandra.distributed.api.IMessageFilters;
import org.apache.cassandra.distributed.shared.MessageFilters;
import org.junit.Assert;
import org.junit.Test;
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;
public class MessageFiltersTest
{
@Test
public void simpleInboundFiltersTest()
{
simpleFiltersTest(true);
}
@Test
public void simpleOutboundFiltersTest()
{
simpleFiltersTest(false);
}
private interface Permit
{
boolean test(int from, int to, IMessage msg);
}
private void simpleFiltersTest(boolean inbound)
{
int VERB1 = 1;
int VERB2 = 2;
int VERB3 = 3;
int i1 = 1;
int i2 = 2;
int i3 = 3;
String MSG1 = "msg1";
String MSG2 = "msg2";
MessageFilters filters = new MessageFilters();
Permit permit = inbound ? filters::permitInbound : filters::permitOutbound;
IMessageFilters.Filter filter = filters.allVerbs().inbound(inbound).from(1).drop();
Assert.assertFalse(permit.test(i1, i2, msg(VERB1, MSG1)));
Assert.assertFalse(permit.test(i1, i2, msg(VERB2, MSG1)));
Assert.assertFalse(permit.test(i1, i2, msg(VERB3, MSG1)));
Assert.assertTrue(permit.test(i2, i1, msg(VERB1, MSG1)));
filter.off();
Assert.assertTrue(permit.test(i1, i2, msg(VERB1, MSG1)));
filters.reset();
filters.verbs(VERB1).inbound(inbound).from(1).to(2).drop();
Assert.assertFalse(permit.test(i1, i2, msg(VERB1, MSG1)));
Assert.assertTrue(permit.test(i1, i2, msg(VERB2, MSG1)));
Assert.assertTrue(permit.test(i2, i1, msg(VERB1, MSG1)));
Assert.assertTrue(permit.test(i2, i3, msg(VERB2, MSG1)));
filters.reset();
AtomicInteger counter = new AtomicInteger();
filters.verbs(VERB1).inbound(inbound).from(1).to(2).messagesMatching((from, to, msg) -> {
counter.incrementAndGet();
return Arrays.equals(msg.bytes(), MSG1.getBytes());
}).drop();
Assert.assertFalse(permit.test(i1, i2, msg(VERB1, MSG1)));
Assert.assertEquals(counter.get(), 1);
Assert.assertTrue(permit.test(i1, i2, msg(VERB1, MSG2)));
Assert.assertEquals(counter.get(), 2);
// filter chain gets interrupted because a higher level filter returns no match
Assert.assertTrue(permit.test(i2, i1, msg(VERB1, MSG1)));
Assert.assertEquals(counter.get(), 2);
Assert.assertTrue(permit.test(i2, i1, msg(VERB2, MSG1)));
Assert.assertEquals(counter.get(), 2);
filters.reset();
filters.allVerbs().inbound(inbound).from(3, 2).to(2, 1).drop();
Assert.assertFalse(permit.test(i3, i1, msg(VERB1, MSG1)));
Assert.assertFalse(permit.test(i3, i2, msg(VERB1, MSG1)));
Assert.assertFalse(permit.test(i2, i1, msg(VERB1, MSG1)));
Assert.assertTrue(permit.test(i2, i3, msg(VERB1, MSG1)));
Assert.assertTrue(permit.test(i1, i2, msg(VERB1, MSG1)));
Assert.assertTrue(permit.test(i1, i3, msg(VERB1, MSG1)));
filters.reset();
counter.set(0);
filters.allVerbs().inbound(inbound).from(1).to(2).messagesMatching((from, to, msg) -> {
counter.incrementAndGet();
return false;
}).drop();
Assert.assertTrue(permit.test(i1, i2, msg(VERB1, MSG1)));
Assert.assertTrue(permit.test(i1, i3, msg(VERB1, MSG1)));
Assert.assertTrue(permit.test(i1, i2, msg(VERB1, MSG1)));
Assert.assertEquals(2, counter.get());
}
IMessage msg(int verb, String msg)
{
return new IMessage()
{
public int verb() { return verb; }
public byte[] bytes() { return msg.getBytes(); }
public int id() { return 0; }
public int version() { return 0; }
public InetSocketAddress from() { return null; }
public int fromPort()
{
return 0;
}
};
}
}