blob: 05b55e801052faf5e2260498f0d8bc898fcfbe6d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.transport;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Consumer;
import io.netty.channel.Channel;
import io.netty.channel.EventLoop;
import io.netty.util.AttributeKey;
import org.apache.cassandra.concurrent.LocalAwareExecutorService;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.net.FrameEncoder;
import org.apache.cassandra.service.ClientWarn;
import org.apache.cassandra.service.QueryState;
import org.apache.cassandra.transport.Flusher.FlushItem;
import org.apache.cassandra.transport.messages.ErrorMessage;
import org.apache.cassandra.transport.messages.EventMessage;
import org.apache.cassandra.utils.JVMStabilityInspector;
import static org.apache.cassandra.concurrent.SharedExecutorPool.SHARED;
public class Dispatcher
{
private static final LocalAwareExecutorService requestExecutor = SHARED.newExecutor(DatabaseDescriptor.getNativeTransportMaxThreads(),
DatabaseDescriptor::setNativeTransportMaxThreads,
"transport",
"Native-Transport-Requests");
private static final ConcurrentMap<EventLoop, Flusher> flusherLookup = new ConcurrentHashMap<>();
private final boolean useLegacyFlusher;
/**
* Takes a Channel, Request and the Response produced by processRequest and outputs a FlushItem
* appropriate for the pipeline, which is specific to the protocol version. V5 and above will
* produce FlushItem.Framed instances whereas earlier versions require FlushItem.Unframed.
* The instances of these FlushItem subclasses are specialized to release resources in the
* right way for the specific pipeline that produced them.
*/
// TODO parameterize with FlushItem subclass
interface FlushItemConverter
{
FlushItem<?> toFlushItem(Channel channel, Message.Request request, Message.Response response);
}
public Dispatcher(boolean useLegacyFlusher)
{
this.useLegacyFlusher = useLegacyFlusher;
}
public void dispatch(Channel channel, Message.Request request, FlushItemConverter forFlusher)
{
requestExecutor.submit(() -> processRequest(channel, request, forFlusher));
}
/**
* Note: this method may be executed on the netty event loop, during initial protocol negotiation
*/
static Message.Response processRequest(ServerConnection connection, Message.Request request)
{
long queryStartNanoTime = System.nanoTime();
if (connection.getVersion().isGreaterOrEqualTo(ProtocolVersion.V4))
ClientWarn.instance.captureWarnings();
QueryState qstate = connection.validateNewMessage(request.type, connection.getVersion());
Message.logger.trace("Received: {}, v={}", request, connection.getVersion());
connection.requests.inc();
Message.Response response = request.execute(qstate, queryStartNanoTime);
response.setStreamId(request.getStreamId());
response.setWarnings(ClientWarn.instance.getWarnings());
response.attach(connection);
connection.applyStateTransition(request.type, response.type);
return response;
}
/**
* Note: this method is not expected to execute on the netty event loop.
*/
void processRequest(Channel channel, Message.Request request, FlushItemConverter forFlusher)
{
final Message.Response response;
final ServerConnection connection;
FlushItem<?> toFlush;
try
{
assert request.connection() instanceof ServerConnection;
connection = (ServerConnection) request.connection();
response = processRequest(connection, request);
toFlush = forFlusher.toFlushItem(channel, request, response);
Message.logger.trace("Responding: {}, v={}", response, connection.getVersion());
}
catch (Throwable t)
{
JVMStabilityInspector.inspectThrowable(t);
ExceptionHandlers.UnexpectedChannelExceptionHandler handler = new ExceptionHandlers.UnexpectedChannelExceptionHandler(channel, true);
ErrorMessage error = ErrorMessage.fromException(t, handler);
error.setStreamId(request.getStreamId());
toFlush = forFlusher.toFlushItem(channel, request, error);
}
finally
{
ClientWarn.instance.resetWarnings();
}
flush(toFlush);
}
private void flush(FlushItem<?> item)
{
EventLoop loop = item.channel.eventLoop();
Flusher flusher = flusherLookup.get(loop);
if (flusher == null)
{
Flusher created = useLegacyFlusher ? Flusher.legacy(loop) : Flusher.immediate(loop);
Flusher alt = flusherLookup.putIfAbsent(loop, flusher = created);
if (alt != null)
flusher = alt;
}
flusher.enqueue(item);
flusher.start();
}
public static void shutdown()
{
if (requestExecutor != null)
{
requestExecutor.shutdown();
}
}
/**
* Dispatcher for EventMessages. In {@link Server.ConnectionTracker#send(Event)}, the strategy
* for delivering events to registered clients is dependent on protocol version and the configuration
* of the pipeline. For v5 and newer connections, the event message is encoded into an Envelope,
* wrapped in a FlushItem and then delivered via the pipeline's flusher, in a similar way to
* a Response returned from {@link #processRequest(Channel, Message.Request, FlushItemConverter)}.
* It's worth noting that events are not generally fired as a direct response to a client request,
* so this flush item has a null request attribute. The dispatcher itself is created when the
* pipeline is first configured during protocol negotiation and is attached to the channel for
* later retrieval.
*
* Pre-v5 connections simply write the EventMessage directly to the pipeline.
*/
static final AttributeKey<Consumer<EventMessage>> EVENT_DISPATCHER = AttributeKey.valueOf("EVTDISP");
Consumer<EventMessage> eventDispatcher(final Channel channel,
final ProtocolVersion version,
final FrameEncoder.PayloadAllocator allocator)
{
return eventMessage -> flush(new FlushItem.Framed(channel,
eventMessage.encode(version),
null,
allocator,
f -> f.response.release()));
}
}