/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.streaming;
import java.io.EOFException;
import java.net.SocketTimeoutException;
import java.nio.channels.ClosedChannelException;
import java.nio.file.FileStore;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import io.netty.channel.Channel;
import io.netty.util.concurrent.Future; //checkstyle: permit this import
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.concurrent.ScheduledExecutors;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Directories;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.db.compaction.CompactionStrategyManager;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.util.File;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.locator.RangesAtEndpoint;
import org.apache.cassandra.locator.Replica;
import org.apache.cassandra.metrics.StreamingMetrics;
import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.streaming.async.StreamingMultiplexedChannel;
import org.apache.cassandra.streaming.messages.*;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.JVMStabilityInspector;
import org.apache.cassandra.utils.NoSpamLogger;
import org.apache.cassandra.utils.TimeUUID;
import org.apache.cassandra.utils.concurrent.FutureCombiner;
import static com.google.common.collect.Iterables.all;
import static org.apache.cassandra.config.CassandraRelevantProperties.CASSANDRA_STREAMING_DEBUG_STACKTRACE_LIMIT;
import static org.apache.cassandra.utils.Clock.Global.nanoTime;
import static org.apache.cassandra.locator.InetAddressAndPort.hostAddressAndPort;
import static org.apache.cassandra.utils.FBUtilities.getBroadcastAddressAndPort;
/**
* Handles the streaming of one or more streams to and from a specific remote node.
*<p>
* Both this node and the remote one will create a similar, symmetrical {@link StreamSession}. A streaming
* session has the following life-cycle:
*<pre>
* 1. Session Initialization
*
* (a) A node (the initiator in the following) creates a new {@link StreamSession},
* initializes it ({@link #init(StreamResultFuture)}), and then starts it ({@link #start()}).
* Starting a session causes a {@link StreamInitMessage} to be sent.
* (b) Upon reception of that {@link StreamInitMessage}, the follower creates its own {@link StreamSession},
* initializing it if it does not yet exist.
* (c) After the initiator sends the {@link StreamInitMessage}, it invokes
* {@link StreamSession#onInitializationComplete()} to start the streaming prepare phase.
*
* 2. Streaming preparation phase
*
* (a) A {@link PrepareSynMessage} is sent that includes a) what files/sections this node will stream to the follower
* (stored locally in a {@link StreamTransferTask}, one for each table) and b) what the follower needs to
* stream back (stored locally in a {@link StreamReceiveTask}, one for each table).
* (b) Upon reception of the {@link PrepareSynMessage}, the follower records which files/sections it will receive
* and sends back a {@link PrepareSynAckMessage}, which contains a summary of the files/sections that will be sent to
* the initiator.
* (c) When the initiator receives the {@link PrepareSynAckMessage}, it records which files/sections it will
* receive, and then goes to its Streaming phase (see next section). If the initiator is to receive files,
* it sends a {@link PrepareAckMessage} to the follower to indicate that it can start streaming to the initiator.
* (d) (Optional) If the follower receives a {@link PrepareAckMessage}, it enters its Streaming phase.
*
* 3. Streaming phase
*
* (a) The streaming phase is started at each node by calling {@link StreamSession#startStreamingFiles(boolean)}.
* This will send, sequentially on each outbound streaming connection (see {@link StreamingMultiplexedChannel}),
* an {@link OutgoingStreamMessage} for each stream in each of the {@link StreamTransferTask}.
* Each {@link OutgoingStreamMessage} consists of a {@link StreamMessageHeader} that contains metadata about
* the stream, followed by the stream content itself. Once all the files for a {@link StreamTransferTask} are sent,
* the task is marked complete {@link StreamTransferTask#complete(int)}.
* (b) On the receiving side, the incoming data is written to disk, and once the stream is fully received,
* it will be marked as complete ({@link StreamReceiveTask#received(IncomingStream)}). When all streams
* for the {@link StreamReceiveTask} have been received, the data is added to the CFS (and 2ndary indexes/MV are built),
* and the task is marked complete ({@link #taskCompleted(StreamReceiveTask)}).
* (c) If during the streaming of a particular stream an error occurs on the receiving end of a stream
* (it may be either the initiator or the follower), the node will send a {@link SessionFailedMessage}
* to the sender and close the stream session.
* (d) When all transfer and receive tasks for a session are complete, the session moves to the Completion phase
* ({@link #maybeCompleted()}).
*
* 4. Completion phase
*
* (a) When the initiator finishes streaming, it enters the {@link StreamSession.State#WAIT_COMPLETE} state, and waits
* for the follower to send a {@link CompleteMessage} once it finishes streaming too. Once the {@link CompleteMessage}
* is received, the initiator sets its own state to {@link StreamSession.State#COMPLETE} and closes all channels attached
* to this session.
*
* </pre>
*
* In brief, the message passing looks like this (I for initiator, F for follower):
* <pre>
* (session init)
* I: StreamInitMessage
* (session prepare)
* I: PrepareSynMessage
* F: PrepareSynAckMessage
* I: PrepareAckMessage
* (stream - this can happen in both directions)
* I: OutgoingStreamMessage
* F: ReceivedMessage
* (completion)
* F: CompleteMessage
*</pre>
*
* All messages which derive from {@link StreamMessage} are sent by the standard internode messaging
* (via {@link org.apache.cassandra.net.MessagingService}), while the actual files themselves are sent by a special
* "streaming" connection type. See {@link StreamingMultiplexedChannel} for details.
*/
public class StreamSession
{
private static final Logger logger = LoggerFactory.getLogger(StreamSession.class);
private static final int DEBUG_STACKTRACE_LIMIT = CASSANDRA_STREAMING_DEBUG_STACKTRACE_LIMIT.getInt();
public enum PrepareDirection { SEND, ACK }
// test hook to record received messages and state transitions
public static volatile MessageStateSink sink = MessageStateSink.NONE;
private final StreamOperation streamOperation;
/**
* Streaming endpoint.
*
* Each {@code StreamSession} is identified by this InetAddressAndPort, which is the broadcast address of the peer node.
*/
public final InetAddressAndPort peer;
private final int index;
// should not be null when session is started
private StreamResultFuture streamResult;
// stream requests to send to the peer
protected final Set<StreamRequest> requests = Sets.newConcurrentHashSet();
// streaming tasks are created and managed per ColumnFamily ID
@VisibleForTesting
protected final ConcurrentHashMap<TableId, StreamTransferTask> transfers = new ConcurrentHashMap<>();
// data receivers, filled after receiving prepare message
private final Map<TableId, StreamReceiveTask> receivers = new ConcurrentHashMap<>();
private final StreamingMetrics metrics;
final Map<String, Set<Range<Token>>> transferredRangesPerKeyspace = new HashMap<>();
private final boolean isFollower;
private final StreamingMultiplexedChannel channel;
// contains both inbound and outbound channels
private final ConcurrentMap<Object, StreamingChannel> inbound = new ConcurrentHashMap<>();
private final ConcurrentMap<Object, StreamingChannel> outbound = new ConcurrentHashMap<>();
// "maybeCompleted()" should be executed at most once. Because it can be executed asynchronously by IO
// threads(serialization/deserialization) and stream messaging processing thread, causing connection closed before
// receiving peer's CompleteMessage.
private boolean maybeCompleted = false;
private Future<?> closeFuture;
private final TimeUUID pendingRepair;
private final PreviewKind previewKind;
public String failureReason;
/**
* State Transition:
*
* <pre>
*  +-----------------+-----> FAILED | ABORTED <--------------+
*  |                 |             ^                         |
*  |                 |             |      initiator          |
* INITIALIZED --> PREPARING --> STREAMING ------------> WAIT_COMPLETE ----> COMPLETED
*  |                 |             |                         ^                  ^
*  |                 |             |   follower              |                  |
*  |                 |             +-------------------------)------------------+
*  |                 |                                       |                  |
*  |                 |   if preview                          |                  |
*  |                 +---------------------------------------+                  |
*  |                    nothing to request or to transfer                       |
*  +----------------------------------------------------------------------------+
*                       nothing to request or to transfer
*
* </pre>
*/
public enum State
{
INITIALIZED(false),
PREPARING(false),
STREAMING(false),
WAIT_COMPLETE(false),
COMPLETE(true),
FAILED(true),
ABORTED(true);
private final boolean finalState;
State(boolean finalState)
{
this.finalState = finalState;
}
/**
* @return true if current state is final, either COMPLETE, FAILED, or ABORTED.
*/
public boolean isFinalState()
{
return finalState;
}
}
private volatile State state = State.INITIALIZED;
/**
* Create new streaming session with the peer.
*/
public StreamSession(StreamOperation streamOperation, InetAddressAndPort peer, StreamingChannel.Factory factory, @Nullable StreamingChannel controlChannel, int messagingVersion,
boolean isFollower, int index, TimeUUID pendingRepair, PreviewKind previewKind)
{
this.streamOperation = streamOperation;
this.peer = peer;
this.isFollower = isFollower;
this.index = index;
this.channel = new StreamingMultiplexedChannel(this, factory, peer, controlChannel, messagingVersion);
this.metrics = StreamingMetrics.get(peer);
this.pendingRepair = pendingRepair;
this.previewKind = previewKind;
}
public boolean isFollower()
{
return isFollower;
}
public TimeUUID planId()
{
return streamResult == null ? null : streamResult.planId;
}
public int sessionIndex()
{
return index;
}
public StreamOperation streamOperation()
{
if (streamResult == null)
{
logger.warn("StreamResultFuture not initialized {} {}", channel.connectedTo(), isFollower ? "follower" : "initiator");
return null;
}
else
{
return streamResult.streamOperation;
}
}
public StreamOperation getStreamOperation()
{
return streamOperation;
}
public TimeUUID getPendingRepair()
{
return pendingRepair;
}
public boolean isPreview()
{
return previewKind.isPreview();
}
public PreviewKind getPreviewKind()
{
return previewKind;
}
public StreamReceiver getAggregator(TableId tableId)
{
assert receivers.containsKey(tableId) : "Missing tableId " + tableId;
return receivers.get(tableId).getReceiver();
}
/**
* Bind this session to report to a specific {@link StreamResultFuture} and
* perform pre-streaming initialization.
*
* @param streamResult result to report to
*/
public void init(StreamResultFuture streamResult)
{
this.streamResult = streamResult;
StreamHook.instance.reportStreamFuture(this, streamResult);
}
/**
* Attach a channel to this session upon receiving the first inbound message.
*
* @param channel The channel to attach.
* @return False if the channel was already attached, true otherwise.
*/
public synchronized boolean attachInbound(StreamingChannel channel)
{
failIfFinished();
boolean attached = inbound.putIfAbsent(channel.id(), channel) == null;
if (attached)
channel.onClose(() -> {
if (null != inbound.remove(channel.id()) && inbound.isEmpty())
this.channel.close();
});
return attached;
}
/**
* Attach a channel to this session upon sending the first outbound message.
*
* @param channel The channel to attach.
* @return False if the channel was already attached, true otherwise.
*/
public synchronized boolean attachOutbound(StreamingChannel channel)
{
failIfFinished();
boolean attached = outbound.putIfAbsent(channel.id(), channel) == null;
if (attached)
channel.onClose(() -> outbound.remove(channel.id()));
return attached;
}
/**
* Invoked by the node that begins the stream session (it may be sending files, receiving files, or both).
*/
public void start()
{
if (requests.isEmpty() && transfers.isEmpty())
{
logger.info("[Stream #{}] Session does not have any tasks.", planId());
closeSession(State.COMPLETE);
return;
}
try
{
logger.info("[Stream #{}] Starting streaming to {}{}", planId(),
hostAddressAndPort(channel.peer()),
channel.connectedTo().equals(channel.peer()) ? "" : " through " + hostAddressAndPort(channel.connectedTo()));
StreamInitMessage message = new StreamInitMessage(getBroadcastAddressAndPort(),
sessionIndex(),
planId(),
streamOperation(),
getPendingRepair(),
getPreviewKind());
channel.sendControlMessage(message).sync();
onInitializationComplete();
}
catch (Exception e)
{
JVMStabilityInspector.inspectThrowable(e);
onError(e);
}
}
/**
* Request a data fetch task for this session.
*
* Here, we have to encode both _local_ range transientness (encoded in the Replica itself, in RangesAtEndpoint)
* and _remote_ (source) range transientness, which is encoded by splitting ranges into full and transient.
*
* @param keyspace Requesting keyspace
* @param fullRanges Ranges to retrieve data that will return full data from the source
* @param transientRanges Ranges to retrieve data that will return transient data from the source
* @param columnFamilies ColumnFamily names. Can be empty if requesting all CF under the keyspace.
*/
public void addStreamRequest(String keyspace, RangesAtEndpoint fullRanges, RangesAtEndpoint transientRanges, Collection<String> columnFamilies)
{
//The replicas should either be dummy addresses (for repair) or, for bootstrap/move/rebuild, this node itself
assert all(fullRanges, Replica::isSelf) || RangesAtEndpoint.isDummyList(fullRanges) : fullRanges.toString();
assert all(transientRanges, Replica::isSelf) || RangesAtEndpoint.isDummyList(transientRanges) : transientRanges.toString();
requests.add(new StreamRequest(keyspace, fullRanges, transientRanges, columnFamilies));
}
/**
* Set up transfer for specific keyspace/ranges/CFs
*
* @param keyspace Transfer keyspace
* @param replicas Transfer ranges
* @param columnFamilies Transfer ColumnFamilies
* @param flushTables whether to flush the tables before streaming
*/
synchronized void addTransferRanges(String keyspace, RangesAtEndpoint replicas, Collection<String> columnFamilies, boolean flushTables)
{
failIfFinished();
Collection<ColumnFamilyStore> stores = getColumnFamilyStores(keyspace, columnFamilies);
if (flushTables)
flushSSTables(stores);
//Was it safe to remove the normalize here? Sorting seems not to matter; merging? Maybe we should have kept it?
//Do we need to unwrap here also, or is that just making it worse?
//Each replica carries its range and whether it is transient
RangesAtEndpoint unwrappedRanges = replicas.unwrap();
List<OutgoingStream> streams = getOutgoingStreamsForRanges(unwrappedRanges, stores, pendingRepair, previewKind);
addTransferStreams(streams);
transferredRangesPerKeyspace.computeIfAbsent(keyspace, k -> new HashSet<>()).addAll(replicas.ranges());
}
private void failIfFinished()
{
if (state().isFinalState())
throw new RuntimeException(String.format("Stream %s is finished with state %s", planId(), state().name()));
}
private Collection<ColumnFamilyStore> getColumnFamilyStores(String keyspace, Collection<String> columnFamilies)
{
Collection<ColumnFamilyStore> stores = new HashSet<>();
// if column families are not specified, we add all CFs under the keyspace
if (columnFamilies.isEmpty())
{
stores.addAll(Keyspace.open(keyspace).getColumnFamilyStores());
}
else
{
for (String cf : columnFamilies)
stores.add(Keyspace.open(keyspace).getColumnFamilyStore(cf));
}
return stores;
}
@VisibleForTesting
public List<OutgoingStream> getOutgoingStreamsForRanges(RangesAtEndpoint replicas, Collection<ColumnFamilyStore> stores, TimeUUID pendingRepair, PreviewKind previewKind)
{
List<OutgoingStream> streams = new ArrayList<>();
try
{
for (ColumnFamilyStore cfs: stores)
{
streams.addAll(cfs.getStreamManager().createOutgoingStreams(this, replicas, pendingRepair, previewKind));
}
}
catch (Throwable t)
{
streams.forEach(OutgoingStream::finish);
throw t;
}
return streams;
}
synchronized void addTransferStreams(Collection<OutgoingStream> streams)
{
failIfFinished();
for (OutgoingStream stream : streams)
{
// computeIfAbsent guarantees that at most one task is created per table
StreamTransferTask task = transfers.computeIfAbsent(stream.getTableId(),
tableId -> new StreamTransferTask(this, tableId));
task.addTransferStream(stream);
}
}
private Future<?> closeSession(State finalState)
{
return closeSession(finalState, null);
}
private synchronized Future<?> closeSession(State finalState, String failureReason)
{
// the session is already closed
if (closeFuture != null)
return closeFuture;
state(finalState);
// this field is surfaced via SessionInfo
this.failureReason = failureReason;
List<Future<?>> futures = new ArrayList<>();
// ensure aborting the tasks does not happen on the network IO thread (read: netty event loop)
// as we don't want any blocking disk IO to stop the network thread
if (finalState == State.FAILED || finalState == State.ABORTED)
futures.add(ScheduledExecutors.nonPeriodicTasks.submit(this::abortTasks));
// Channels should only be closed by the initiator; but if this session closed
// due to failure, channels should always be closed regardless, even if this is not the initiator.
if (!isFollower || state != State.COMPLETE)
{
logger.debug("[Stream #{}] Will close attached inbound {} and outbound {} channels", planId(), inbound, outbound);
inbound.values().forEach(channel -> futures.add(channel.close()));
outbound.values().forEach(channel -> futures.add(channel.close()));
}
sink.onClose(peer);
streamResult.handleSessionComplete(this);
closeFuture = FutureCombiner.allOf(futures);
return closeFuture;
}
private void abortTasks()
{
try
{
receivers.values().forEach(StreamReceiveTask::abort);
transfers.values().forEach(StreamTransferTask::abort);
}
catch (Exception e)
{
logger.warn("[Stream #{}] failed to abort some streaming tasks", planId(), e);
}
}
/**
* Set current state to {@code newState}.
*
* @param newState new state to set
*/
public void state(State newState)
{
if (logger.isDebugEnabled())
logger.debug("[Stream #{}] Changing session state from {} to {}", planId(), state, newState);
sink.recordState(peer, newState);
state = newState;
}
/**
* @return current state
*/
public State state()
{
return state;
}
public StreamingMultiplexedChannel getChannel()
{
return channel;
}
/**
* Return whether this session completed successfully.
*
* @return true if session completed successfully.
*/
public boolean isSuccess()
{
return state == State.COMPLETE;
}
public synchronized void messageReceived(StreamMessage message)
{
if (message.type != StreamMessage.Type.KEEP_ALIVE)
failIfFinished();
sink.recordMessage(peer, message.type);
switch (message.type)
{
case STREAM_INIT:
// at follower, nop
break;
case PREPARE_SYN:
// at follower
PrepareSynMessage msg = (PrepareSynMessage) message;
prepare(msg.requests, msg.summaries);
break;
case PREPARE_SYNACK:
// at initiator
prepareSynAck((PrepareSynAckMessage) message);
break;
case PREPARE_ACK:
// at follower
prepareAck((PrepareAckMessage) message);
break;
case STREAM:
receive((IncomingStreamMessage) message);
break;
case RECEIVED:
ReceivedMessage received = (ReceivedMessage) message;
received(received.tableId, received.sequenceNumber);
break;
case COMPLETE:
// at initiator
complete();
break;
case KEEP_ALIVE:
// NOP - we only send/receive the KEEP_ALIVE to force the TCP connection to remain open
break;
case SESSION_FAILED:
sessionFailed();
break;
default:
throw new AssertionError("unhandled StreamMessage type: " + message.getClass().getName());
}
}
/**
* Call back when connection initialization is complete to start the prepare phase.
*/
public void onInitializationComplete()
{
// send prepare message
state(State.PREPARING);
PrepareSynMessage prepare = new PrepareSynMessage();
prepare.requests.addAll(requests);
for (StreamTransferTask task : transfers.values())
{
prepare.summaries.add(task.getSummary());
}
channel.sendControlMessage(prepare).syncUninterruptibly();
}
/**
* Signal an error to this stream session: if it is an EOF exception, try to determine whether the socket
* was closed after completion or because the peer went down; otherwise, send a {@link SessionFailedMessage}
* and close the session as {@link State#FAILED}.
*/
public Future<?> onError(Throwable e)
{
boolean isEofException = e instanceof EOFException || e instanceof ClosedChannelException;
if (isEofException)
{
State state = this.state;
if (state.finalState)
{
logger.debug("[Stream #{}] Socket closed after session completed with state {}", planId(), state);
return null;
}
else
{
logger.error("[Stream #{}] Socket closed before session completion, peer {} is probably down.",
planId(),
peer.getHostAddressAndPort(),
e);
return closeSession(State.FAILED, "Failed because there was an " + e.getClass().getCanonicalName() + " with state=" + state.name());
}
}
logError(e);
if (channel.connected())
{
state(State.FAILED); // make sure subsequent error handling sees the session in a final state
channel.sendControlMessage(new SessionFailedMessage()).awaitUninterruptibly();
}
StringBuilder failureReason = new StringBuilder("Failed because of an unknown exception\n");
boundStackTrace(e, DEBUG_STACKTRACE_LIMIT, failureReason);
return closeSession(State.FAILED, failureReason.toString());
}
private void logError(Throwable e)
{
if (e instanceof SocketTimeoutException)
{
logger.error("[Stream #{}] Timeout from peer {}{}. Is peer down? " +
"If not, and earlier failure detection is required enable (or lower) streaming_keep_alive_period.",
planId(),
hostAddressAndPort(channel.peer()),
channel.peer().equals(channel.connectedTo()) ? "" : " through " + hostAddressAndPort(channel.connectedTo()),
e);
}
else
{
logger.error("[Stream #{}] Streaming error occurred on session with peer {}{}", planId(),
hostAddressAndPort(channel.peer()),
channel.peer().equals(channel.connectedTo()) ? "" : " through " + hostAddressAndPort(channel.connectedTo()),
e);
}
}
/**
* Prepare this session for sending/receiving files.
*/
public void prepare(Collection<StreamRequest> requests, Collection<StreamSummary> summaries)
{
// prepare tasks
state(State.PREPARING);
ScheduledExecutors.nonPeriodicTasks.execute(() -> {
try
{
prepareAsync(requests, summaries);
}
catch (Exception e)
{
onError(e);
}
});
}
public void countStreamedIn(boolean isEntireSSTable)
{
metrics.countStreamedIn(isEntireSSTable);
}
/**
* Finish preparing the session. This method is blocking (memtables are flushed in {@link #addTransferRanges}),
* so the logic should not execute on the main IO thread (read: netty event loop).
*/
private void prepareAsync(Collection<StreamRequest> requests, Collection<StreamSummary> summaries)
{
if (StreamOperation.REPAIR == streamOperation())
checkAvailableDiskSpaceAndCompactions(summaries);
for (StreamRequest request : requests)
addTransferRanges(request.keyspace, RangesAtEndpoint.concat(request.full, request.transientReplicas), request.columnFamilies, true); // always flush on stream request
for (StreamSummary summary : summaries)
prepareReceiving(summary);
PrepareSynAckMessage prepareSynAck = new PrepareSynAckMessage();
if (!peer.equals(FBUtilities.getBroadcastAddressAndPort()))
for (StreamTransferTask task : transfers.values())
prepareSynAck.summaries.add(task.getSummary());
streamResult.handleSessionPrepared(this, PrepareDirection.SEND);
// After sending the message the initiator can close the channel, which will cause a ClosedChannelException
// in the buffer logic; this then gets sent to onError, which checks whether the state is final and, if not,
// fails the session. To avoid a race condition between sending and setting the state, make sure to update
// the state before sending the message (without closing the channel);
// see CASSANDRA-17116
if (isPreview())
state(State.COMPLETE);
channel.sendControlMessage(prepareSynAck).syncUninterruptibly();
if (isPreview())
completePreview();
else
maybeCompleted();
}
private void prepareSynAck(PrepareSynAckMessage msg)
{
if (StreamOperation.REPAIR == streamOperation())
checkAvailableDiskSpaceAndCompactions(msg.summaries);
if (!msg.summaries.isEmpty())
{
for (StreamSummary summary : msg.summaries)
prepareReceiving(summary);
// only send the (final) ACK if we are expecting the peer to send this node (the initiator) some files
if (!isPreview())
channel.sendControlMessage(new PrepareAckMessage()).syncUninterruptibly();
}
if (isPreview())
completePreview();
else
startStreamingFiles(PrepareDirection.ACK);
}
private void prepareAck(PrepareAckMessage msg)
{
if (isPreview())
throw new RuntimeException(String.format("[Stream #%s] Cannot receive PrepareAckMessage for preview session", planId()));
startStreamingFiles(PrepareDirection.ACK);
}
/**
* If an error occurs while checking disk space, we allow the operation to continue.
* If we do _not_ have enough available space, this method throws a RuntimeException.
* TODO: Consider revising this to return a boolean and allowing callers upstream to handle it.
*/
private void checkAvailableDiskSpaceAndCompactions(Collection<StreamSummary> summaries)
{
if (DatabaseDescriptor.getSkipStreamDiskSpaceCheck())
return;
boolean hasAvailableSpace = true;
try
{
hasAvailableSpace = checkAvailableDiskSpaceAndCompactions(summaries, planId(), peer.getHostAddress(true), pendingRepair != null);
}
catch (Exception e)
{
logger.error("[Stream #{}] Could not check available disk space and compactions for {}, summaries = {}", planId(), this, summaries, e);
}
if (!hasAvailableSpace)
throw new RuntimeException(String.format("Not enough disk space for stream %s), summaries=%s", this, summaries));
}
/**
* Makes sure that we expect to have enough disk space available for the new streams, taking into consideration
* the ongoing compactions and streams.
*/
@VisibleForTesting
public static boolean checkAvailableDiskSpaceAndCompactions(Collection<StreamSummary> summaries,
@Nullable TimeUUID planId,
@Nullable String remoteAddress,
boolean isForIncremental)
{
Map<TableId, Long> perTableIdIncomingBytes = new HashMap<>();
Map<TableId, Integer> perTableIdIncomingFiles = new HashMap<>();
long newStreamTotal = 0;
for (StreamSummary summary : summaries)
{
perTableIdIncomingFiles.merge(summary.tableId, summary.files, Integer::sum);
perTableIdIncomingBytes.merge(summary.tableId, summary.totalSize, Long::sum);
newStreamTotal += summary.totalSize;
}
if (perTableIdIncomingBytes.isEmpty() || newStreamTotal == 0)
return true;
return checkDiskSpace(perTableIdIncomingBytes, planId, Directories::getFileStore) &&
checkPendingCompactions(perTableIdIncomingBytes, perTableIdIncomingFiles, planId, remoteAddress, isForIncremental, newStreamTotal);
}
@VisibleForTesting
static boolean checkDiskSpace(Map<TableId, Long> perTableIdIncomingBytes,
TimeUUID planId,
Function<File, FileStore> fileStoreMapper)
{
Map<FileStore, Long> newStreamBytesToWritePerFileStore = new HashMap<>();
Set<FileStore> allFileStores = new HashSet<>();
// Sum up the incoming bytes per file store - we assume that the stream is evenly distributed over the writable
// file stores for the table.
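// (e.g. 300 incoming bytes for a table with three writable file stores charges 100 bytes to each store)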
for (Map.Entry<TableId, Long> entry : perTableIdIncomingBytes.entrySet())
{
ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(entry.getKey());
if (cfs == null || entry.getValue() == 0)
continue;
Set<FileStore> allWriteableFileStores = cfs.getDirectories().allFileStores(fileStoreMapper);
if (allWriteableFileStores.isEmpty())
{
logger.error("[Stream #{}] Could not get any writeable FileStores for {}.{}", planId, cfs.keyspace.getName(), cfs.getTableName());
continue;
}
allFileStores.addAll(allWriteableFileStores);
long totalBytesInPerFileStore = entry.getValue() / allWriteableFileStores.size();
for (FileStore fs : allWriteableFileStores)
newStreamBytesToWritePerFileStore.merge(fs, totalBytesInPerFileStore, Long::sum);
}
Map<FileStore, Long> totalCompactionWriteRemaining = Directories.perFileStore(CompactionManager.instance.active.estimatedRemainingWriteBytes(),
fileStoreMapper);
long totalStreamRemaining = StreamManager.instance.getTotalRemainingOngoingBytes();
long totalBytesStreamRemainingPerFileStore = totalStreamRemaining / Math.max(1, allFileStores.size());
Map<FileStore, Long> allWriteData = new HashMap<>();
for (Map.Entry<FileStore, Long> fsBytes : newStreamBytesToWritePerFileStore.entrySet())
allWriteData.put(fsBytes.getKey(), fsBytes.getValue() +
totalBytesStreamRemainingPerFileStore +
totalCompactionWriteRemaining.getOrDefault(fsBytes.getKey(), 0L));
if (!Directories.hasDiskSpaceForCompactionsAndStreams(allWriteData))
{
logger.error("[Stream #{}] Not enough disk space to stream {} to {} (stream ongoing remaining={}, compaction ongoing remaining={}, all ongoing writes={})",
planId,
newStreamBytesToWritePerFileStore,
perTableIdIncomingBytes.keySet().stream()
.map(ColumnFamilyStore::getIfExists).filter(Objects::nonNull)
.map(cfs -> cfs.keyspace.getName() + '.' + cfs.name)
.collect(Collectors.joining(",")),
totalStreamRemaining,
totalCompactionWriteRemaining,
allWriteData);
return false;
}
return true;
}
@VisibleForTesting
static boolean checkPendingCompactions(Map<TableId, Long> perTableIdIncomingBytes,
Map<TableId, Integer> perTableIdIncomingFiles,
TimeUUID planId, String remoteAddress,
boolean isForIncremental,
long newStreamTotal)
{
int pendingCompactionsBeforeStreaming = 0;
int pendingCompactionsAfterStreaming = 0;
List<String> tables = new ArrayList<>(perTableIdIncomingFiles.size());
for (Keyspace ks : Keyspace.all())
{
Map<ColumnFamilyStore, TableId> cfStreamed = perTableIdIncomingBytes.keySet().stream()
.filter(ks::hasColumnFamilyStore)
.collect(Collectors.toMap(ks::getColumnFamilyStore, Function.identity()));
for (ColumnFamilyStore cfs : ks.getColumnFamilyStores())
{
CompactionStrategyManager csm = cfs.getCompactionStrategyManager();
int tasksOther = csm.getEstimatedRemainingTasks();
int tasksStreamed = tasksOther;
if (cfStreamed.containsKey(cfs))
{
TableId tableId = cfStreamed.get(cfs);
tasksStreamed = csm.getEstimatedRemainingTasks(perTableIdIncomingFiles.get(tableId),
perTableIdIncomingBytes.get(tableId),
isForIncremental);
tables.add(String.format("%s.%s", cfs.keyspace.getName(), cfs.name));
}
pendingCompactionsBeforeStreaming += tasksOther;
pendingCompactionsAfterStreaming += tasksStreamed;
}
}
Collections.sort(tables);
int pendingThreshold = ActiveRepairService.instance.getRepairPendingCompactionRejectThreshold();
if (pendingCompactionsAfterStreaming > pendingThreshold)
{
logger.error("[Stream #{}] Rejecting incoming files based on pending compactions calculation " +
"pendingCompactionsBeforeStreaming={} pendingCompactionsAfterStreaming={} pendingThreshold={} remoteAddress={}",
planId, pendingCompactionsBeforeStreaming, pendingCompactionsAfterStreaming, pendingThreshold, remoteAddress);
return false;
}
long newStreamFiles = perTableIdIncomingFiles.values().stream().mapToInt(i -> i).sum();
logger.info("[Stream #{}] Accepting incoming files newStreamTotalSSTables={} newStreamTotalBytes={} " +
"pendingCompactionsBeforeStreaming={} pendingCompactionsAfterStreaming={} pendingThreshold={} remoteAddress={} " +
"streamedTables=\"{}\"",
planId, newStreamFiles, newStreamTotal,
pendingCompactionsBeforeStreaming, pendingCompactionsAfterStreaming, pendingThreshold, remoteAddress,
String.join(",", tables));
return true;
}
/**
* Call back after sending StreamMessageHeader.
*
* @param message sent stream message
*/
public void streamSent(OutgoingStreamMessage message)
{
long headerSize = message.stream.getEstimatedSize();
StreamingMetrics.totalOutgoingBytes.inc(headerSize);
metrics.outgoingBytes.inc(headerSize);
if (StreamOperation.REPAIR == getStreamOperation())
{
StreamingMetrics.totalOutgoingRepairBytes.inc(headerSize);
StreamingMetrics.totalOutgoingRepairSSTables.inc(message.stream.getNumFiles());
}
// schedule timeout for receiving ACK
StreamTransferTask task = transfers.get(message.header.tableId);
if (task != null)
{
task.scheduleTimeout(message.header.sequenceNumber, DatabaseDescriptor.getStreamTransferTaskTimeout().toMilliseconds(), TimeUnit.MILLISECONDS);
}
}
/**
* Call back after receiving a stream.
*
* @param message received stream
*/
public void receive(IncomingStreamMessage message)
{
if (isPreview())
{
throw new RuntimeException(String.format("[Stream #%s] Cannot receive files for preview session", planId()));
}
long headerSize = message.stream.getSize();
StreamingMetrics.totalIncomingBytes.inc(headerSize);
metrics.incomingBytes.inc(headerSize);
// send back file received message
channel.sendControlMessage(new ReceivedMessage(message.header.tableId, message.header.sequenceNumber)).syncUninterruptibly();
StreamHook.instance.reportIncomingStream(message.header.tableId, message.stream, this, message.header.sequenceNumber);
long receivedStartNanos = nanoTime();
try
{
receivers.get(message.header.tableId).received(message.stream);
}
finally
{
long latencyNanos = nanoTime() - receivedStartNanos;
metrics.incomingProcessTime.update(latencyNanos, TimeUnit.NANOSECONDS);
long latencyMs = TimeUnit.NANOSECONDS.toMillis(latencyNanos);
int timeout = DatabaseDescriptor.getInternodeStreamingTcpUserTimeoutInMS();
if (timeout > 0 && latencyMs > timeout)
NoSpamLogger.log(logger, NoSpamLogger.Level.WARN,
1, TimeUnit.MINUTES,
"The time taken ({} ms) for processing the incoming stream message ({})" +
" exceeded internode streaming TCP user timeout ({} ms).\n" +
"The streaming connection might be closed due to tcp user timeout.\n" +
"Try to increase the internode_streaming_tcp_user_timeout" +
" or set it to 0 to use system defaults.",
latencyMs, message, timeout);
}
}
public void progress(String filename, ProgressInfo.Direction direction, long bytes, long delta, long total)
{
if (delta < 0)
NoSpamLogger.log(logger, NoSpamLogger.Level.WARN, 1, TimeUnit.MINUTES,
"[id={}, key={{}, {}, {})] Stream event reported a negative delta ({})",
planId(), peer, filename, direction, delta);
ProgressInfo progress = new ProgressInfo(peer, index, filename, direction, bytes, delta, total);
streamResult.handleProgress(progress);
}
public void received(TableId tableId, int sequenceNumber)
{
transfers.get(tableId).complete(sequenceNumber);
}
/**
* Check if session is completed on receiving {@code StreamMessage.Type.COMPLETE} message.
*/
public synchronized void complete()
{
logger.debug("[Stream #{}] handling Complete message, state = {}", planId(), state);
if (!isFollower) // initiator
{
initiatorCompleteOrWait();
}
else // follower
{
// pre-4.0 nodes should not be connected via streaming, see {@link MessagingService#accept_streaming}
throw new IllegalStateException(String.format("[Stream #%s] Complete message can be only received by the initiator!", planId()));
}
}
/**
* Synchronize both {@link #complete()} and {@link #maybeCompleted()} to avoid racing
*/
private synchronized boolean maybeCompleted()
{
if (!(receivers.isEmpty() && transfers.isEmpty()))
return false;
// if already executed once, skip it
if (maybeCompleted)
return true;
maybeCompleted = true;
if (!isFollower) // initiator
{
initiatorCompleteOrWait();
}
else // follower
{
// After sending the message the initiator can close the channel, which will cause a ClosedChannelException
// in the buffer logic; this then gets sent to onError, which checks whether the state is final and, if not,
// fails the session. To avoid a race condition between sending and setting the state, make sure to update
// the state before sending the message (without closing the channel);
// see CASSANDRA-17116
state(State.COMPLETE);
channel.sendControlMessage(new CompleteMessage()).syncUninterruptibly();
closeSession(State.COMPLETE);
}
return true;
}
private void initiatorCompleteOrWait()
{
// This is called both when coordination completes and when the COMPLETE message is seen; it is possible
// that the COMPLETE message is seen first!
if (state == State.WAIT_COMPLETE)
closeSession(State.COMPLETE);
else
state(State.WAIT_COMPLETE);
}
/**
* Call back on receiving {@code StreamMessage.Type.SESSION_FAILED} message.
*/
public synchronized void sessionFailed()
{
logger.error("[Stream #{}] Remote peer {} failed stream session.", planId(), peer.toString());
StringBuilder stringBuilder = new StringBuilder();
stringBuilder.append("Remote peer ").append(peer).append(" failed stream session");
closeSession(State.FAILED, stringBuilder.toString());
}
/**
* Call back when the session times out, i.e. when no message has been received from the peer for too long.
*/
public synchronized void sessionTimeout()
{
logger.error("[Stream #{}] timeout with {}.", planId(), peer.toString());
closeSession(State.FAILED, "Session timed out");
}
/**
* @return Current snapshot of this session info.
*/
public SessionInfo getSessionInfo()
{
List<StreamSummary> receivingSummaries = Lists.newArrayList();
for (StreamTask receiver : receivers.values())
receivingSummaries.add(receiver.getSummary());
List<StreamSummary> transferSummaries = Lists.newArrayList();
for (StreamTask transfer : transfers.values())
transferSummaries.add(transfer.getSummary());
return new SessionInfo(channel.peer(), index, channel.connectedTo(), receivingSummaries, transferSummaries, state, failureReason);
}
public synchronized void taskCompleted(StreamReceiveTask completedTask)
{
receivers.remove(completedTask.tableId);
maybeCompleted();
}
public synchronized void taskCompleted(StreamTransferTask completedTask)
{
transfers.remove(completedTask.tableId);
maybeCompleted();
}
private void completePreview()
{
try
{
state(State.WAIT_COMPLETE);
closeSession(State.COMPLETE);
}
finally
{
// aborting the tasks here needs to be the last thing we do so that we accurately report
// expected streaming, but don't leak any resources held by the task
for (StreamTask task : Iterables.concat(receivers.values(), transfers.values()))
task.abort();
}
}
/**
* Flushes the memtables of the given column family stores and waits for the flushes to complete.
*/
private void flushSSTables(Iterable<ColumnFamilyStore> stores)
{
List<Future<?>> flushes = new ArrayList<>();
for (ColumnFamilyStore cfs : stores)
flushes.add(cfs.forceFlush(ColumnFamilyStore.FlushReason.STREAMING));
FBUtilities.waitOnFutures(flushes);
}
@VisibleForTesting
public synchronized void prepareReceiving(StreamSummary summary)
{
failIfFinished();
if (summary.files > 0)
receivers.put(summary.tableId, new StreamReceiveTask(this, summary.tableId, summary.files, summary.totalSize));
}
private void startStreamingFiles(@Nullable PrepareDirection prepareDirection)
{
if (prepareDirection != null)
streamResult.handleSessionPrepared(this, prepareDirection);
state(State.STREAMING);
for (StreamTransferTask task : transfers.values())
{
Collection<OutgoingStreamMessage> messages = task.getFileMessages();
if (!messages.isEmpty())
{
for (OutgoingStreamMessage ofm : messages)
{
// pass the session planId/index to the OFM (which is only set at init(), after the transfers have already been created)
ofm.header.addSessionInfo(this);
// do not sync here as this does disk access
channel.sendControlMessage(ofm);
}
}
else
{
taskCompleted(task); // there are no files to send
}
}
maybeCompleted();
}
@VisibleForTesting
public int getNumRequests()
{
return requests.size();
}
@VisibleForTesting
public int getNumTransfers()
{
return transferredRangesPerKeyspace.size();
}
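/**
* Test hook for recording stream messages and session state transitions; installed via {@link #sink}.
* A minimal sketch of a sink a test might install (the {@code states} list is illustrative only):
* <pre>{@code
* List<StreamSession.State> states = new CopyOnWriteArrayList<>();
* StreamSession.sink = new MessageStateSink()
* {
*     public void recordState(InetAddressAndPort from, State state) { states.add(state); }
*     public void recordMessage(InetAddressAndPort from, StreamMessage.Type message) { }
*     public void onClose(InetAddressAndPort from) { }
* };
* }</pre>
*/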
@VisibleForTesting
public interface MessageStateSink
{
MessageStateSink NONE = new MessageStateSink() {
@Override
public void recordState(InetAddressAndPort from, State state)
{
}
@Override
public void recordMessage(InetAddressAndPort from, StreamMessage.Type message)
{
}
@Override
public void onClose(InetAddressAndPort from)
{
}
};
/**
* @param from peer that is connected in the stream session
* @param state new state to change to
*/
public void recordState(InetAddressAndPort from, StreamSession.State state);
/**
* @param from peer that sends the given message
* @param message stream message sent by peer
*/
public void recordMessage(InetAddressAndPort from, StreamMessage.Type message);
/**
*
* @param from peer that is being disconnected
*/
public void onClose(InetAddressAndPort from);
}
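/**
* Builds a log prefix of the form {@code [Stream #<planId> channel: <channelId>]}, omitting the
* plan id and/or channel id when the corresponding argument is null.
*/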
public static String createLogTag(StreamSession session)
{
return createLogTag(session, (Object) null);
}
public static String createLogTag(StreamSession session, StreamingChannel channel)
{
return createLogTag(session, channel == null ? null : channel.id());
}
public static String createLogTag(StreamSession session, Channel channel)
{
return createLogTag(session, channel == null ? null : channel.id());
}
public static String createLogTag(StreamSession session, Object channelId)
{
StringBuilder sb = new StringBuilder(64);
sb.append("[Stream");
if (session != null)
sb.append(" #").append(session.planId());
if (channelId != null)
sb.append(" channel: ").append(channelId);
sb.append(']');
return sb.toString();
}
public synchronized void abort()
{
if (state.isFinalState())
{
logger.debug("[Stream #{}] Stream session with peer {} is already in a final state on abort.", planId(), peer);
return;
}
logger.info("[Stream #{}] Aborting stream session with peer {}...", planId(), peer);
if (channel.connected())
channel.sendControlMessage(new SessionFailedMessage());
try
{
closeSession(State.ABORTED);
}
catch (Exception e)
{
logger.error("[Stream #{}] Error aborting stream session with peer {}", planId(), peer);
}
}
@Override
public String toString()
{
return "StreamSession{" +
"streamOperation=" + streamOperation +
", peer=" + peer +
", channel=" + channel +
", requests=" + requests +
", transfers=" + transfers +
", isFollower=" + isFollower +
", pendingRepair=" + pendingRepair +
", previewKind=" + previewKind +
", state=" + state +
'}';
}
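/**
* Appends a bounded rendering of {@code e} and its cause chain to {@code out}. At most {@code limit}
* stack frames are written across the chain, except that the deepest cause is always allowed up to
* {@code limit} frames of its own, since the leaf usually shows where the failure originated.
* Circular cause references are marked rather than followed.
*/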
public static StringBuilder boundStackTrace(Throwable e, int limit, StringBuilder out)
{
Set<Throwable> visited = Collections.newSetFromMap(new IdentityHashMap<>());
return boundStackTrace(e, limit, limit, visited, out);
}
public static StringBuilder boundStackTrace(Throwable e, int limit, int counter, Set<Throwable> visited, StringBuilder out)
{
if (e == null)
return out;
if (!visited.add(e))
return out.append("[CIRCULAR REFERENCE: ").append(e.getClass().getName()).append(": ").append(e.getMessage()).append("]").append('\n');
StackTraceElement[] stackTrace = e.getStackTrace();
out.append(e.getClass().getName() + ": " + e.getMessage()).append('\n');
// When dealing with the leaf, ignore how many stack frames were already written and allow the max;
// the leaf tends to show where the issue started, so it is the most useful part for debugging
if (e.getCause() == null)
counter = limit;
for (int i = 0, size = Math.min(stackTrace.length, limit); i < size && counter > 0; i++)
{
out.append('\t').append(stackTrace[i]).append('\n');
counter--;
}
boundStackTrace(e.getCause(), limit, counter, visited, out);
return out;
}
}