blob: 5277b9de73d97b9e2b2a6d8420857ceee3395cfb [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.streaming;
import java.time.Duration;
import java.util.Collection;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import com.google.common.annotations.VisibleForTesting;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.utils.NoSpamLogger;
import org.apache.cassandra.utils.TimeUUID;
import org.apache.cassandra.utils.concurrent.AsyncFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.utils.FBUtilities;
import static org.apache.cassandra.streaming.StreamingChannel.Factory.Global.streamingFactory;
import static org.apache.cassandra.utils.Clock.Global.nanoTime;
/**
* A future on the result ({@link StreamState}) of a streaming plan.
*
* In practice, this object also groups all the {@link StreamSession} for the streaming job
* involved. One StreamSession will be created for every peer involved and said session will
* handle every streaming (outgoing and incoming) to that peer for this job.
* <p>
* The future will return a result once every session is completed (successfully or not). If
* any session ended up with an error, the future will throw a StreamException.
* <p>
* You can attach {@link StreamEventHandler} to this object to listen on {@link StreamEvent}s to
* track progress of the streaming.
*/
public final class StreamResultFuture extends AsyncFuture<StreamState>
{
private static final Logger logger = LoggerFactory.getLogger(StreamResultFuture.class);
public final TimeUUID planId;
public final StreamOperation streamOperation;
private final StreamCoordinator coordinator;
private final Collection<StreamEventHandler> eventListeners = new ConcurrentLinkedQueue<>();
private final long slowEventsLogTimeoutNanos = DatabaseDescriptor.getStreamingSlowEventsLogTimeout().toNanoseconds();
/**
* Create new StreamResult of given {@code planId} and streamOperation.
*
* Constructor is package private. You need to use {@link StreamPlan#execute()} to get the instance.
*
* @param planId Stream plan ID
* @param streamOperation Stream streamOperation
*/
public StreamResultFuture(TimeUUID planId, StreamOperation streamOperation, StreamCoordinator coordinator)
{
this.planId = planId;
this.streamOperation = streamOperation;
this.coordinator = coordinator;
// if there is no session to listen to, we immediately set result for returning
if (!coordinator.isFollower() && !coordinator.hasActiveSessions())
trySuccess(getCurrentState());
}
@VisibleForTesting
public StreamResultFuture(TimeUUID planId, StreamOperation streamOperation, TimeUUID pendingRepair, PreviewKind previewKind)
{
this(planId, streamOperation, new StreamCoordinator(streamOperation, 0, streamingFactory(), true, false, pendingRepair, previewKind));
}
public static StreamResultFuture createInitiator(TimeUUID planId, StreamOperation streamOperation, Collection<StreamEventHandler> listeners,
StreamCoordinator coordinator)
{
StreamResultFuture future = createAndRegisterInitiator(planId, streamOperation, coordinator);
if (listeners != null)
{
for (StreamEventHandler listener : listeners)
future.addEventListener(listener);
}
logger.info("[Stream #{}] Executing streaming plan for {}", planId, streamOperation.getDescription());
// Initialize and start all sessions
for (final StreamSession session : coordinator.getAllStreamSessions())
{
session.init(future);
}
coordinator.connect(future);
return future;
}
public static synchronized StreamResultFuture createFollower(int sessionIndex,
TimeUUID planId,
StreamOperation streamOperation,
InetAddressAndPort from,
StreamingChannel channel,
int messagingVersion,
TimeUUID pendingRepair,
PreviewKind previewKind)
{
StreamResultFuture future = StreamManager.instance.getReceivingStream(planId);
if (future == null)
{
logger.info("[Stream #{} ID#{}] Creating new streaming plan for {} from {} {}", planId, sessionIndex, streamOperation.getDescription(),
from, channel.description());
// The main reason we create a StreamResultFuture on the receiving side is for JMX exposure.
future = new StreamResultFuture(planId, streamOperation, pendingRepair, previewKind);
StreamManager.instance.registerFollower(future);
}
future.initInbound(from, channel, messagingVersion, sessionIndex);
logger.info("[Stream #{}, ID#{}] Received streaming plan for {} from {} {}",
planId, sessionIndex, streamOperation.getDescription(), from, channel.description());
return future;
}
private static StreamResultFuture createAndRegisterInitiator(TimeUUID planId, StreamOperation streamOperation, StreamCoordinator coordinator)
{
StreamResultFuture future = new StreamResultFuture(planId, streamOperation, coordinator);
StreamManager.instance.registerInitiator(future);
return future;
}
public StreamCoordinator getCoordinator()
{
return coordinator;
}
private void initInbound(InetAddressAndPort from, StreamingChannel channel, int messagingVersion, int sessionIndex)
{
StreamSession session = coordinator.getOrCreateInboundSession(from, channel, messagingVersion, sessionIndex);
session.init(this);
}
@SuppressWarnings("UnstableApiUsage")
public void addEventListener(StreamEventHandler listener)
{
addCallback(listener);
eventListeners.add(listener);
}
/**
* @return Current snapshot of streaming progress.
*/
public StreamState getCurrentState()
{
return new StreamState(planId, streamOperation, coordinator.getAllSessionInfo());
}
@Override
public boolean equals(Object o)
{
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
StreamResultFuture that = (StreamResultFuture) o;
return planId.equals(that.planId);
}
@Override
public int hashCode()
{
return planId.hashCode();
}
void handleSessionPrepared(StreamSession session, StreamSession.PrepareDirection prepareDirection)
{
SessionInfo sessionInfo = session.getSessionInfo();
logger.info("[Stream #{} ID#{}] Prepare completed. Receiving {} files({}), sending {} files({})",
session.planId(),
session.sessionIndex(),
sessionInfo.getTotalFilesToReceive(),
FBUtilities.prettyPrintMemory(sessionInfo.getTotalSizeToReceive()),
sessionInfo.getTotalFilesToSend(),
FBUtilities.prettyPrintMemory(sessionInfo.getTotalSizeToSend()));
StreamEvent.SessionPreparedEvent event = new StreamEvent.SessionPreparedEvent(planId, sessionInfo, prepareDirection);
coordinator.addSessionInfo(sessionInfo);
fireStreamEvent(event);
}
void handleSessionComplete(StreamSession session)
{
logger.info("[Stream #{}] Session with {} is {}", session.planId(), session.peer, session.state().name().toLowerCase());
fireStreamEvent(new StreamEvent.SessionCompleteEvent(session));
SessionInfo sessionInfo = session.getSessionInfo();
coordinator.addSessionInfo(sessionInfo);
maybeComplete();
}
public void handleProgress(ProgressInfo progress)
{
coordinator.updateProgress(progress);
fireStreamEvent(new StreamEvent.ProgressEvent(planId, progress));
}
synchronized void fireStreamEvent(StreamEvent event)
{
// delegate to listener
long startNanos = nanoTime();
for (StreamEventHandler listener : eventListeners)
{
try
{
listener.handleStreamEvent(event);
}
catch (Throwable t)
{
logger.warn("Unexpected exception in listern while calling handleStreamEvent", t);
}
}
long totalNanos = nanoTime() - startNanos;
if (totalNanos > slowEventsLogTimeoutNanos)
NoSpamLogger.log(logger, NoSpamLogger.Level.WARN, 1, TimeUnit.MINUTES, "Handling streaming events took longer than {}; took {}",
() -> new Object[] { Duration.ofNanos(slowEventsLogTimeoutNanos), Duration.ofNanos(totalNanos)});
}
private synchronized void maybeComplete()
{
if (finishedAllSessions())
{
StreamState finalState = getCurrentState();
if (finalState.hasFailedSession())
{
StringBuilder stringBuilder = new StringBuilder();
stringBuilder.append("Stream failed: ");
for (SessionInfo info : finalState.sessions())
{
if (info.isFailed())
stringBuilder.append("\nSession peer ").append(info.peer).append(' ').append(info.failureReason);
}
String message = stringBuilder.toString();
logger.warn("[Stream #{}] {}", planId, message);
tryFailure(new StreamException(finalState, message));
}
else if (finalState.hasAbortedSession())
{
logger.info("[Stream #{}] Stream aborted", planId);
trySuccess(finalState);
}
else
{
logger.info("[Stream #{}] All sessions completed", planId);
trySuccess(finalState);
}
}
}
public StreamSession getSession(InetAddressAndPort peer, int sessionIndex)
{
return coordinator.getSessionById(peer, sessionIndex);
}
/**
* We can't use {@link StreamCoordinator#hasActiveSessions()} directly because {@link this#maybeComplete()}
* relies on the snapshotted state from {@link StreamCoordinator} and not the {@link StreamSession} state
* directly (CASSANDRA-15667), otherwise inconsistent snapshotted states may lead to completion races.
*/
private boolean finishedAllSessions()
{
return coordinator.getAllSessionInfo().stream().allMatch(s -> s.state.isFinalState());
}
}