blob: 16eb44054014b63234d5f71a9ae6cfff61148eb5 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.net;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Predicate;
import org.slf4j.LoggerFactory;
import net.openhft.chronicle.core.util.ThrowingConsumer;
import org.apache.cassandra.db.filter.TombstoneOverwhelmingException;
import org.apache.cassandra.exceptions.RequestFailureReason;
import org.apache.cassandra.index.IndexNotAvailableException;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.utils.NoSpamLogger;
/**
* A message sink that all inbound messages go through.
*
* Default sink used by {@link MessagingService} is {@link IVerbHandler#doVerb(Message)}, but it can be overridden
* to filter out certain messages, record the fact of attempted delivery, or delay arrival.
*
* This facility is most useful for test code.
*
* {@link #accept(Message)} is invoked on a thread belonging to the {@link org.apache.cassandra.concurrent.Stage}
* assigned to the {@link Verb} of the message.
*/
public class InboundSink implements InboundMessageHandlers.MessageConsumer
{
private static final NoSpamLogger noSpamLogger =
NoSpamLogger.getLogger(LoggerFactory.getLogger(InboundSink.class), 1L, TimeUnit.SECONDS);
private static class Filtered implements ThrowingConsumer<Message<?>, IOException>
{
final Predicate<Message<?>> condition;
final ThrowingConsumer<Message<?>, IOException> next;
private Filtered(Predicate<Message<?>> condition, ThrowingConsumer<Message<?>, IOException> next)
{
this.condition = condition;
this.next = next;
}
public void accept(Message<?> message) throws IOException
{
if (condition.test(message))
next.accept(message);
}
}
@SuppressWarnings("FieldMayBeFinal")
private volatile ThrowingConsumer<Message<?>, IOException> sink;
private static final AtomicReferenceFieldUpdater<InboundSink, ThrowingConsumer> sinkUpdater
= AtomicReferenceFieldUpdater.newUpdater(InboundSink.class, ThrowingConsumer.class, "sink");
private final MessagingService messaging;
InboundSink(MessagingService messaging)
{
this.messaging = messaging;
this.sink = message -> message.header.verb.handler().doVerb((Message<Object>) message);
}
public void fail(Message.Header header, Throwable failure)
{
if (header.callBackOnFailure())
{
InetAddressAndPort to = header.respondTo() != null ? header.respondTo() : header.from;
Message<RequestFailureReason> response = Message.failureResponse(header.id,
header.expiresAtNanos,
RequestFailureReason.forException(failure));
messaging.send(response, to);
}
}
public void accept(Message<?> message)
{
try
{
sink.accept(message);
}
catch (Throwable t)
{
fail(message.header, t);
if (t instanceof TombstoneOverwhelmingException || t instanceof IndexNotAvailableException)
noSpamLogger.error(t.getMessage());
else if (t instanceof RuntimeException)
throw (RuntimeException) t;
else
throw new RuntimeException(t);
}
}
public void add(Predicate<Message<?>> allow)
{
sinkUpdater.updateAndGet(this, sink -> new Filtered(allow, sink));
}
public void remove(Predicate<Message<?>> allow)
{
sinkUpdater.updateAndGet(this, sink -> without(sink, allow));
}
public void clear()
{
sinkUpdater.updateAndGet(this, InboundSink::clear);
}
@Deprecated // TODO: this is not the correct way to do things
public boolean allow(Message<?> message)
{
return allows(sink, message);
}
private static ThrowingConsumer<Message<?>, IOException> clear(ThrowingConsumer<Message<?>, IOException> sink)
{
while (sink instanceof Filtered)
sink = ((Filtered) sink).next;
return sink;
}
private static ThrowingConsumer<Message<?>, IOException> without(ThrowingConsumer<Message<?>, IOException> sink, Predicate<Message<?>> condition)
{
if (!(sink instanceof Filtered))
return sink;
Filtered filtered = (Filtered) sink;
ThrowingConsumer<Message<?>, IOException> next = without(filtered.next, condition);
return condition.equals(filtered.condition) ? next
: next == filtered.next
? sink
: new Filtered(filtered.condition, next);
}
private static boolean allows(ThrowingConsumer<Message<?>, IOException> sink, Message<?> message)
{
while (sink instanceof Filtered)
{
Filtered filtered = (Filtered) sink;
if (!filtered.condition.test(message))
return false;
sink = filtered.next;
}
return true;
}
}