/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.streaming.async;

import java.io.IOError;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.ClosedByInterruptException;
import java.util.Collection;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.util.concurrent.Future; // checkstyle: permit this import
import org.apache.cassandra.concurrent.ExecutorPlus;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.streaming.StreamDeserializingTask;
import org.apache.cassandra.streaming.StreamingChannel;
import org.apache.cassandra.streaming.StreamingDataOutputPlus;
import org.apache.cassandra.streaming.StreamSession;
import org.apache.cassandra.streaming.messages.IncomingStreamMessage;
import org.apache.cassandra.streaming.messages.KeepAliveMessage;
import org.apache.cassandra.streaming.messages.OutgoingStreamMessage;
import org.apache.cassandra.streaming.messages.StreamMessage;
import org.apache.cassandra.utils.concurrent.ImmediateFuture;
import org.apache.cassandra.utils.concurrent.Semaphore;
import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException;
import static com.google.common.base.Throwables.getRootCause;
import static java.lang.Integer.parseInt;
import static java.lang.String.format;
import static java.lang.System.getProperty;
import static java.lang.Thread.currentThread;
import static java.util.concurrent.TimeUnit.*;
import static org.apache.cassandra.concurrent.ExecutorFactory.Global.executorFactory;
import static org.apache.cassandra.config.Config.PROPERTY_PREFIX;
import static org.apache.cassandra.streaming.StreamSession.createLogTag;
import static org.apache.cassandra.streaming.messages.StreamMessage.serialize;
import static org.apache.cassandra.streaming.messages.StreamMessage.serializedSize;
import static org.apache.cassandra.utils.Clock.Global.nanoTime;
import static org.apache.cassandra.utils.FBUtilities.getAvailableProcessors;
import static org.apache.cassandra.utils.JVMStabilityInspector.inspectThrowable;
import static org.apache.cassandra.utils.concurrent.BlockingQueues.newBlockingQueue;
import static org.apache.cassandra.utils.concurrent.Semaphore.newFairSemaphore;

/**
* Responsible for sending {@link StreamMessage}s to a given peer. We manage an array of netty {@link Channel}s
* for sending {@link OutgoingStreamMessage} instances; all other {@link StreamMessage} types are sent via
* a special control channel. The reason for this is to treat those messages carefully and not let them get stuck
* behind a stream transfer.
*
* One of the challenges when sending streams is we might need to delay shipping the stream if:
*
* - we've exceeded our network I/O budget due to rate limiting (at the Cassandra level)
* - the receiver isn't keeping up, so the local TCP socket buffer doesn't drain, epoll writes can't move any bytes
* to the socket, and buffers stick around in user-land (a/k/a Cassandra) memory.
*
* When those conditions occur, it's easy enough to reschedule processing the stream once the resources pick up
* (we acquire the permits from the rate limiter, or the socket drains). However, we need to ensure that
* no other messages are submitted to the same channel while the current stream is still being processed.
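*
* A minimal usage sketch; the {@code session}, {@code factory}, {@code peer}, {@code channel} and message variables
* below are illustrative only, not taken from any particular call site:
* <pre>{@code
* StreamingMultiplexedChannel sender =
*     new StreamingMultiplexedChannel(session, factory, peer, null, messagingVersion);
* sender.sendControlMessage(new KeepAliveMessage());  // goes over the control channel
* sender.sendMessage(channel, outgoingStreamMessage); // stream data is handed off to the transfer executor
* sender.close();
* }</pre>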
*/
public class StreamingMultiplexedChannel
{
private static final Logger logger = LoggerFactory.getLogger(StreamingMultiplexedChannel.class);
private static final int DEFAULT_MAX_PARALLEL_TRANSFERS = getAvailableProcessors();
private static final int MAX_PARALLEL_TRANSFERS = parseInt(getProperty(PROPERTY_PREFIX + "streaming.session.parallelTransfers", Integer.toString(DEFAULT_MAX_PARALLEL_TRANSFERS)));
// a simple mechanism for allowing a degree of fairness across multiple sessions
private static final Semaphore fileTransferSemaphore = newFairSemaphore(DEFAULT_MAX_PARALLEL_TRANSFERS);
private final StreamingChannel.Factory factory;
private final InetAddressAndPort to;
private final StreamSession session;
private final int messagingVersion;
private volatile boolean closed;
/**
* A special {@link Channel} for sending streaming messages that do not carry stream data, basically anything that
* isn't an {@link OutgoingStreamMessage} (or an {@link IncomingStreamMessage}, but a node never sends those, it only receives them).
*/
private volatile StreamingChannel controlChannel;
// note: this really doesn't need to be an LBQ, just something that's thread-safe
private final Collection<ScheduledFuture<?>> channelKeepAlives = newBlockingQueue();
private final ExecutorPlus fileTransferExecutor;
/**
* A mapping of each {@link #fileTransferExecutor} thread to a channel that can be written to (on that thread).
*/
private final ConcurrentMap<Thread, StreamingChannel> threadToChannelMap = new ConcurrentHashMap<>();
public StreamingMultiplexedChannel(StreamSession session, StreamingChannel.Factory factory, InetAddressAndPort to, @Nullable StreamingChannel controlChannel, int messagingVersion)
{
this.session = session;
this.factory = factory;
this.to = to;
this.messagingVersion = messagingVersion;
this.controlChannel = controlChannel;
String name = session.peer.toString().replace(':', '.');
fileTransferExecutor = executorFactory()
.configurePooled("NettyStreaming-Outbound-" + name, MAX_PARALLEL_TRANSFERS)
.withKeepAlive(1L, SECONDS).build();
}
public InetAddressAndPort peer()
{
return to;
}
public InetSocketAddress connectedTo()
{
return controlChannel == null ? to : controlChannel.connectedTo();
}
/**
* Used by the initiator to set up the control message channel connecting to the follower.
*/
private void setupControlMessageChannel() throws IOException
{
if (controlChannel == null)
{
/*
* Inbound handlers are needed:
* a) for the initiator's control channel (the first outbound channel) to receive the follower's messages.
* b) for the streaming receiver (note: both initiator and follower can receive streaming files) to receive files,
* in {@link Handler#setupStreamingPipeline}
*/
controlChannel = createControlChannel();
}
}
private StreamingChannel createControlChannel() throws IOException
{
logger.debug("Creating stream session to {} as {}", to, session.isFollower() ? "follower" : "initiator");
StreamingChannel channel = factory.create(to, messagingVersion, StreamingChannel.Kind.CONTROL);
executorFactory().startThread(String.format("Stream-Deserializer-%s-%s", to.toString(), channel.id()),
new StreamDeserializingTask(session, channel, messagingVersion));
session.attachInbound(channel);
session.attachOutbound(channel);
scheduleKeepAliveTask(channel);
logger.debug("Creating control {}", channel.description());
return channel;
}
private StreamingChannel createFileChannel(InetAddressAndPort connectTo) throws IOException
{
logger.debug("Creating stream session to {} as {}", to, session.isFollower() ? "follower" : "initiator");
StreamingChannel channel = factory.create(to, connectTo, messagingVersion, StreamingChannel.Kind.FILE);
session.attachOutbound(channel);
logger.debug("Creating file {}", channel.description());
return channel;
}
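/**
* Sends a control (non stream-data) message over the lazily created control channel. If setting up the channel
* or submitting the send fails, this sender is closed, the error is reported to the {@link StreamSession},
* and an already-failed future is returned.
*/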
public Future<?> sendControlMessage(StreamMessage message)
{
try
{
setupControlMessageChannel();
return sendMessage(controlChannel, message);
}
catch (Exception e)
{
close();
session.onError(e);
return ImmediateFuture.failure(e);
}
}
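/**
* Sends the given message to the peer. {@link OutgoingStreamMessage}s are submitted to the
* {@link #fileTransferExecutor} and written on a dedicated file channel; every other message type is serialized
* directly onto the supplied channel. Failures close this sender and are reported to the session.
*/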
public Future<?> sendMessage(StreamingChannel channel, StreamMessage message)
{
if (closed)
throw new RuntimeException("stream has been closed, cannot send " + message);
if (message instanceof OutgoingStreamMessage)
{
if (session.isPreview())
throw new RuntimeException("Cannot send stream data messages for preview streaming sessions");
if (logger.isDebugEnabled())
logger.debug("{} Sending {}", createLogTag(session), message);
InetAddressAndPort connectTo = factory.supportsPreferredIp() ? SystemKeyspace.getPreferredIP(to) : to;
return fileTransferExecutor.submit(new FileStreamTask((OutgoingStreamMessage) message, connectTo));
}
try
{
Future<?> promise = channel.send(outSupplier -> {
// we anticipate that control messages are rather small, so allocating a ByteBuf for one shouldn't exhaust memory.
long messageSize = serializedSize(message, messagingVersion);
if (messageSize > 1 << 30)
{
throw new IllegalStateException(format("%s something is seriously wrong with the calculated stream control message's size: %d bytes, type is %s",
createLogTag(session, controlChannel.id()), messageSize, message.type));
}
try (StreamingDataOutputPlus out = outSupplier.apply((int) messageSize))
{
StreamMessage.serialize(message, out, messagingVersion, session);
}
});
promise.addListener(future -> onMessageComplete(future, message));
return promise;
}
catch (Exception e)
{
close();
session.onError(e);
return ImmediateFuture.failure(e);
}
}
/**
* Decides what to do after a {@link StreamMessage} is processed.
*
* Note: this is called from the netty event loop.
*
* @return null if the message was processed successfully; else, a {@link java.util.concurrent.Future} to indicate
* the status of aborting any remaining tasks in the session.
*/
Future<?> onMessageComplete(Future<?> future, StreamMessage msg)
{
Throwable cause = future.cause();
if (cause == null)
return null;
Channel channel = future instanceof ChannelFuture ? ((ChannelFuture)future).channel() : null;
logger.error("{} failed to send a stream message/data to peer {}: msg = {}",
createLogTag(session, channel), to, msg, future.cause());
// StreamSession will invoke close(), but we have to mark this sender as closed so the session doesn't try
// to send any failure messages
return session.onError(cause);
}
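/**
* Runs on the {@link #fileTransferExecutor}: acquires a permit from {@link #fileTransferSemaphore} for fairness
* across sessions, then serializes a single outgoing stream onto the channel bound to the current executor thread.
*/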
class FileStreamTask implements Runnable
{
/**
* Time interval, in minutes, to wait between logging a message indicating that we're waiting on a semaphore
* permit to become available.
*/
private static final int SEMAPHORE_UNAVAILABLE_LOG_INTERVAL = 3;
/**
* Even though we expect only an {@link OutgoingStreamMessage} at runtime, the type here is {@link StreamMessage}
* to facilitate simpler testing.
*/
private final StreamMessage msg;
private final InetAddressAndPort connectTo;
FileStreamTask(OutgoingStreamMessage ofm, InetAddressAndPort connectTo)
{
this.msg = ofm;
this.connectTo = connectTo;
}
/**
* For testing purposes
*/
FileStreamTask(StreamMessage msg)
{
this.msg = msg;
this.connectTo = null;
}
@Override
public void run()
{
if (!acquirePermit(SEMAPHORE_UNAVAILABLE_LOG_INTERVAL))
return;
StreamingChannel channel = null;
try
{
channel = getOrCreateFileChannel(connectTo);
// close the DataOutputStreamPlus as we're done with it - but don't close the channel
try (StreamingDataOutputPlus out = channel.acquireOut())
{
serialize(msg, out, messagingVersion, session);
}
}
catch (Exception e)
{
session.onError(e);
}
catch (Throwable t)
{
if (closed && getRootCause(t) instanceof ClosedByInterruptException && fileTransferExecutor.isShutdown())
{
logger.debug("{} Streaming channel was closed due to the executor pool being shutdown", createLogTag(session, channel));
}
else
{
inspectThrowable(t);
if (!session.state().isFinalState())
session.onError(t);
}
}
finally
{
fileTransferSemaphore.release(1);
}
}
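/**
* Blocks until a transfer permit is acquired or this sender is closed, logging a progress message roughly every
* {@code logInterval} minutes so operators can see why a session appears stalled.
*
* @return true if a permit was acquired; false if the sender was closed while waiting
*/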
boolean acquirePermit(int logInterval)
{
long logIntervalNanos = MINUTES.toNanos(logInterval);
long timeOfLastLogging = nanoTime();
while (true)
{
if (closed)
return false;
try
{
if (fileTransferSemaphore.tryAcquire(1, 1, SECONDS))
return true;
// log a helpful message to operators in case they are wondering why a given session might not be making progress.
long now = nanoTime();
if (now - timeOfLastLogging > logIntervalNanos)
{
timeOfLastLogging = now;
OutgoingStreamMessage ofm = (OutgoingStreamMessage)msg;
if (logger.isInfoEnabled())
logger.info("{} waiting to acquire a permit to begin streaming {}. This message logs every {} minutes",
createLogTag(session), ofm.getName(), logInterval);
}
}
catch (InterruptedException e)
{
throw new UncheckedInterruptedException(e);
}
}
}
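/**
* Returns the file channel bound to the current executor thread, creating (and caching) one on first use.
*/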
private StreamingChannel getOrCreateFileChannel(InetAddressAndPort connectTo)
{
Thread currentThread = currentThread();
try
{
StreamingChannel channel = threadToChannelMap.get(currentThread);
if (channel != null)
return channel;
channel = createFileChannel(connectTo);
threadToChannelMap.put(currentThread, channel);
return channel;
}
catch (Exception e)
{
throw new IOError(e);
}
}
/**
* For testing purposes
*/
void injectChannel(StreamingChannel channel)
{
Thread currentThread = currentThread();
if (threadToChannelMap.get(currentThread) != null)
throw new IllegalStateException("previous channel already set");
threadToChannelMap.put(currentThread, channel);
}
/**
* For testing purposes
*/
void unsetChannel()
{
threadToChannelMap.remove(currentThread());
}
}
/**
* Periodically sends the {@link KeepAliveMessage}.
* <p>
* NOTE: this task and the callback function are executed in the netty event loop.
*/
class KeepAliveTask implements Runnable
{
private final StreamingChannel channel;
/**
* A reference to the scheduled task for this instance so that it may be cancelled.
*/
ScheduledFuture<?> future;
KeepAliveTask(StreamingChannel channel)
{
this.channel = channel;
}
@Override
public void run()
{
// if the channel has been closed, cancel the scheduled task and return
if (!channel.connected() || closed)
{
if (null != future)
future.cancel(false);
return;
}
if (logger.isTraceEnabled())
logger.trace("{} Sending keep-alive to {}.", createLogTag(session, channel), session.peer);
sendControlMessage(new KeepAliveMessage()).addListener(f ->
{
if (f.isSuccess() || f.isCancelled())
return;
if (logger.isDebugEnabled())
logger.debug("{} Could not send keep-alive message (perhaps stream session is finished?).",
createLogTag(session, channel), f.cause());
});
}
}
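/**
* Schedules a {@link KeepAliveTask} on the channel's netty event loop at the configured keep-alive period.
* Only applies to {@link NettyStreamingChannel}s, and only when the configured period is positive.
*/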
private void scheduleKeepAliveTask(StreamingChannel channel)
{
if (!(channel instanceof NettyStreamingChannel))
return;
int keepAlivePeriod = DatabaseDescriptor.getStreamingKeepAlivePeriod();
if (keepAlivePeriod <= 0)
return;
if (logger.isDebugEnabled())
logger.debug("{} Scheduling keep-alive task with {}s period.", createLogTag(session, channel), keepAlivePeriod);
KeepAliveTask task = new KeepAliveTask(channel);
ScheduledFuture<?> scheduledFuture =
((NettyStreamingChannel)channel).channel
.eventLoop()
.scheduleAtFixedRate(task, keepAlivePeriod, keepAlivePeriod, TimeUnit.SECONDS);
task.future = scheduledFuture;
channelKeepAlives.add(scheduledFuture);
}
/**
* For testing purposes only.
*/
public void setClosed()
{
closed = true;
}
void setControlChannel(NettyStreamingChannel channel)
{
controlChannel = channel;
}
int semaphoreAvailablePermits()
{
return fileTransferSemaphore.permits();
}
public boolean connected()
{
return !closed && (controlChannel == null || controlChannel.connected());
}
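/**
* Marks this sender as closed, cancels all scheduled keep-alive tasks, closes the per-thread file channels
* and shuts down the transfer executor. Safe to call more than once.
*/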
public void close()
{
if (closed)
return;
closed = true;
if (logger.isDebugEnabled())
logger.debug("{} Closing stream connection channels on {}", createLogTag(session), to);
for (ScheduledFuture<?> future : channelKeepAlives)
future.cancel(false);
channelKeepAlives.clear();
threadToChannelMap.values().forEach(StreamingChannel::close);
threadToChannelMap.clear();
fileTransferExecutor.shutdownNow();
}
}