blob: 408b6f4abedfd3a5c0dd25c504ec62e3ae356a6c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.streaming;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import javax.management.ListenerNotFoundException;
import javax.management.MBeanNotificationInfo;
import javax.management.NotificationFilter;
import javax.management.NotificationListener;
import javax.management.openmbean.CompositeData;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.RateLimiter;
import org.cliffc.high_scale_lib.NonBlockingHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.DurationSpec;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.streaming.management.StreamEventJMXNotifier;
import org.apache.cassandra.streaming.management.StreamStateCompositeData;
import org.apache.cassandra.utils.TimeUUID;
/**
* StreamManager manages currently running {@link StreamResultFuture}s and provides the status of all invoked operations.
*
* All stream operations should be created through this class to track streaming status and progress.
*/
public class StreamManager implements StreamManagerMBean
{
// Logger for this class.
private static final Logger logger = LoggerFactory.getLogger(StreamManager.class);
// Publicly accessible instance, constructed by this static initializer.
public static final StreamManager instance = new StreamManager();
/**
 * Creates the rate limiter used for regular (non entire-SSTable) outbound streaming to a peer.
 * <p>
 * The returned object wraps the shared limiters held by {@link StreamRateLimiter}. When the
 * configured outbound throughput is not greater than 0, those shared limiters run at
 * {@link Double#MAX_VALUE} bytes per second. All rates are in bytes per second.
 *
 * @param peer the endpoint that will be streamed to
 * @return a {@link StreamRateLimiter} for the given peer
 */
public static StreamRateLimiter getRateLimiter(InetAddressAndPort peer)
{
    double outboundBytesPerSec = DatabaseDescriptor.getStreamThroughputOutboundBytesPerSec();
    double interDCOutboundBytesPerSec = DatabaseDescriptor.getInterDCStreamThroughputOutboundBytesPerSec();

    return new StreamRateLimiter(peer,
                                 StreamRateLimiter.LIMITER,
                                 StreamRateLimiter.INTER_DC_LIMITER,
                                 outboundBytesPerSec,
                                 interDCOutboundBytesPerSec);
}
/**
 * Creates the rate limiter used for entire-SSTable outbound streaming to a peer.
 * <p>
 * The returned object wraps the shared entire-SSTable limiters held by {@link StreamRateLimiter}.
 * When the configured entire-SSTable throughput is less than or equal to {@code 0}, those shared
 * limiters run at {@link Double#MAX_VALUE} bytes per second. All rates are in bytes per second.
 *
 * @param peer the endpoint that will be streamed to
 * @return a {@link StreamRateLimiter} carrying the entire-SSTable limits for the given peer
 */
public static StreamRateLimiter getEntireSSTableRateLimiter(InetAddressAndPort peer)
{
    double outboundBytesPerSec = DatabaseDescriptor.getEntireSSTableStreamThroughputOutboundBytesPerSec();
    double interDCOutboundBytesPerSec = DatabaseDescriptor.getEntireSSTableInterDCStreamThroughputOutboundBytesPerSec();

    return new StreamRateLimiter(peer,
                                 StreamRateLimiter.ENTIRE_SSTABLE_LIMITER,
                                 StreamRateLimiter.ENTIRE_SSTABLE_INTER_DC_LIMITER,
                                 outboundBytesPerSec,
                                 interDCOutboundBytesPerSec);
}
/**
 * Per-peer rate limiter for outbound streaming.
 * <p>
 * Every instance wraps two of the static Guava limiters declared in this class: one that is
 * always consulted, and an inter-datacenter one that is consulted in addition when the peer is
 * not in the local datacenter. All rates are expressed in bytes per second.
 */
public static class StreamRateLimiter implements StreamingDataOutputPlus.RateLimiter
{
    public static final double BYTES_PER_MEBIBYTE = 1024.0 * 1024.0;

    // Static limiters shared by all instances; their rates are refreshed through the update*() methods.
    private static final RateLimiter LIMITER = RateLimiter.create(calculateRateInBytes());
    private static final RateLimiter INTER_DC_LIMITER = RateLimiter.create(calculateInterDCRateInBytes());
    private static final RateLimiter ENTIRE_SSTABLE_LIMITER = RateLimiter.create(calculateEntireSSTableRateInBytes());
    private static final RateLimiter ENTIRE_SSTABLE_INTER_DC_LIMITER = RateLimiter.create(calculateEntireSSTableInterDCRateInBytes());

    private final RateLimiter limiter;
    private final RateLimiter interDCLimiter;
    private final boolean isLocalDC;
    private final double throughput;
    private final double interDCThroughput;

    /**
     * @param peer              endpoint whose datacenter decides whether the inter-DC limiter applies
     * @param limiter           limiter consulted for every transfer
     * @param interDCLimiter    limiter additionally consulted for peers outside the local datacenter
     * @param throughput        configured throughput in bytes per second; only read by {@link #isRateLimited()}
     * @param interDCThroughput configured inter-DC throughput in bytes per second; only read by {@link #isRateLimited()}
     */
    private StreamRateLimiter(InetAddressAndPort peer, RateLimiter limiter, RateLimiter interDCLimiter, double throughput, double interDCThroughput)
    {
        this.limiter = limiter;
        this.interDCLimiter = interDCLimiter;
        this.throughput = throughput;
        this.interDCThroughput = interDCThroughput;
        this.isLocalDC = isInLocalDatacenter(peer);
    }

    /**
     * Decides whether {@code peer} is treated as local. A peer is treated as local whenever the
     * local datacenter or the endpoint snitch is not available.
     */
    private static boolean isInLocalDatacenter(InetAddressAndPort peer)
    {
        if (DatabaseDescriptor.getLocalDataCenter() == null || DatabaseDescriptor.getEndpointSnitch() == null)
            return true;

        return DatabaseDescriptor.getLocalDataCenter().equals(
               DatabaseDescriptor.getEndpointSnitch().getDatacenter(peer));
    }

    /**
     * Acquires {@code toTransfer} permits from the main limiter and, for peers outside the local
     * datacenter, from the inter-DC limiter as well.
     */
    @Override
    public void acquire(int toTransfer)
    {
        limiter.acquire(toTransfer);
        if (isLocalDC)
            return;
        interDCLimiter.acquire(toTransfer);
    }

    /**
     * @return {@code true} when the throughput is greater than 0, or when the peer is not local
     *         and the inter-DC throughput is greater than 0
     */
    @Override
    public boolean isRateLimited()
    {
        if (throughput > 0)
            return true;
        // For non-local peers the inter-DC setting alone is enough to enable limiting.
        return !isLocalDC && interDCThroughput > 0;
    }

    /** Re-reads the outbound throughput setting and applies it to the shared limiter. */
    public static void updateThroughput()
    {
        LIMITER.setRate(calculateRateInBytes());
    }

    /** Re-reads the inter-DC outbound throughput setting and applies it to the shared inter-DC limiter. */
    public static void updateInterDCThroughput()
    {
        INTER_DC_LIMITER.setRate(calculateInterDCRateInBytes());
    }

    /** Re-reads the entire-SSTable throughput setting and applies it to the shared entire-SSTable limiter. */
    public static void updateEntireSSTableThroughput()
    {
        ENTIRE_SSTABLE_LIMITER.setRate(calculateEntireSSTableRateInBytes());
    }

    /** Re-reads the entire-SSTable inter-DC throughput setting and applies it to the matching shared limiter. */
    public static void updateEntireSSTableInterDCThroughput()
    {
        ENTIRE_SSTABLE_INTER_DC_LIMITER.setRate(calculateEntireSSTableInterDCRateInBytes());
    }

    private static double calculateRateInBytes()
    {
        return calculateEffectiveRateInBytes(DatabaseDescriptor.getStreamThroughputOutboundBytesPerSec());
    }

    private static double calculateInterDCRateInBytes()
    {
        return calculateEffectiveRateInBytes(DatabaseDescriptor.getInterDCStreamThroughputOutboundBytesPerSec());
    }

    private static double calculateEntireSSTableRateInBytes()
    {
        return calculateEffectiveRateInBytes(DatabaseDescriptor.getEntireSSTableStreamThroughputOutboundBytesPerSec());
    }

    private static double calculateEntireSSTableInterDCRateInBytes()
    {
        return calculateEffectiveRateInBytes(DatabaseDescriptor.getEntireSSTableInterDCStreamThroughputOutboundBytesPerSec());
    }

    /** @return the current rate of the shared limiter, in bytes per second */
    @VisibleForTesting
    public static double getRateLimiterRateInBytes()
    {
        return LIMITER.getRate();
    }

    /** @return the current rate of the shared inter-DC limiter, in bytes per second */
    @VisibleForTesting
    public static double getInterDCRateLimiterRateInBytes()
    {
        return INTER_DC_LIMITER.getRate();
    }

    /** @return the current rate of the shared entire-SSTable limiter, in bytes per second */
    @VisibleForTesting
    public static double getEntireSSTableRateLimiterRateInBytes()
    {
        return ENTIRE_SSTABLE_LIMITER.getRate();
    }

    /** @return the current rate of the shared entire-SSTable inter-DC limiter, in bytes per second */
    @VisibleForTesting
    public static double getEntireSSTableInterDCRateLimiterRateInBytes()
    {
        return ENTIRE_SSTABLE_INTER_DC_LIMITER.getRate();
    }

    /**
     * Maps a configured throughput onto the rate given to a limiter.
     *
     * @param throughput configured throughput in bytes per second
     * @return {@code throughput} when it is greater than 0, otherwise {@link Double#MAX_VALUE}
     */
    private static double calculateEffectiveRateInBytes(double throughput)
    {
        // a throughput that is not greater than 0 means throttling is disabled
        if (throughput > 0)
            return throughput;
        return Double.MAX_VALUE;
    }
}
// Event listener attached to every stream result passed to registerInitiator/registerFollower;
// the notification-listener methods of this class delegate to it.
private final StreamEventJMXNotifier notifier = new StreamEventJMXNotifier();
// Listeners invoked by notifySafeOnRegister when a stream is registered.
private final CopyOnWriteArrayList<StreamListener> listeners = new CopyOnWriteArrayList<>();
/*
* Currently running streams. Removed after completion/failure.
* We manage them in two different maps to distinguish initiated plans from
* receiving ones within the same JVM.
*/
private final Map<TimeUUID, StreamResultFuture> initiatorStreams = new NonBlockingHashMap<>();
private final Map<TimeUUID, StreamResultFuture> followerStreams = new NonBlockingHashMap<>();
// Streaming states keyed by plan id; the cache itself is built in the constructor.
private final Cache<TimeUUID, StreamingState> states;
/*
 * Built-in listener (added by start(), removed by stop()) that records a StreamingState for
 * every newly registered stream while streaming stats are enabled.
 */
private final StreamListener listener = new StreamListener()
{
    @Override
    public void onRegister(StreamResultFuture result)
    {
        if (!DatabaseDescriptor.getStreamingStatsEnabled())
            return;

        // synchronized (rather than relying on states.get) so that duplicates can be detected;
        // streaming shouldn't be producing duplicates as that would imply a planId collision
        synchronized (states)
        {
            if (states.getIfPresent(result.planId) != null)
            {
                logger.warn("Duplicate streaming states detected for id {}", result.planId);
                return;
            }

            StreamingState tracked = new StreamingState(result);
            states.put(tracked.id(), tracked);
            tracked.phase.start();
            result.addEventListener(tracked);
        }
    }
};
/**
 * Creates the manager and builds the cache of streaming states.
 * <p>
 * Entries expire after the duration returned by
 * {@code DatabaseDescriptor.getStreamingStateExpires()}, counted from the time they were written.
 * The cache is capped at {@code DatabaseDescriptor.getStreamingStateSize()} in bytes divided by
 * {@code StreamingState.ELEMENT_SIZE} entries.
 */
public StreamManager()
{
    DurationSpec.LongNanosecondsBound expiry = DatabaseDescriptor.getStreamingStateExpires();
    long budgetBytes = DatabaseDescriptor.getStreamingStateSize().toBytes();
    long capacity = budgetBytes / StreamingState.ELEMENT_SIZE;
    logger.info("Storing streaming state for {} or for {} elements", expiry, capacity);

    CacheBuilder<Object, Object> builder = CacheBuilder.newBuilder()
                                                       .expireAfterWrite(expiry.quantity(), expiry.unit())
                                                       .maximumSize(capacity);
    this.states = builder.build();
}
/**
* Adds this manager's built-in {@code listener} to the registered stream listeners, so that it
* is notified of subsequently registered streams.
*/
public void start()
{
addListener(listener);
}
/**
* Removes this manager's built-in {@code listener} from the registered stream listeners.
*/
public void stop()
{
removeListener(listener);
}
/**
 * Returns the streaming states currently held in the cache.
 * <p>
 * The result is the value collection of the cache's map view, not a copy.
 *
 * @return the cached {@link StreamingState}s
 */
public Collection<StreamingState> getStreamingStates()
{
    Map<TimeUUID, StreamingState> cacheView = states.asMap();
    return cacheView.values();
}
/**
* Looks up the cached streaming state for a plan.
*
* @param id the plan id to look up
* @return the result of {@code states.getIfPresent(id)}: the cached state if one is present
*/
public StreamingState getStreamingState(TimeUUID id)
{
return states.getIfPresent(id);
}
/**
 * Inserts a streaming state into the cache, refusing to overwrite an existing entry.
 *
 * @param state the state to store under its own id
 * @throws AssertionError if the cache already holds a state for the same id
 */
@VisibleForTesting
public void putStreamingState(StreamingState state)
{
    // Check and insert under the same lock so the duplicate check cannot race with the put.
    synchronized (states)
    {
        if (states.getIfPresent(state.id()) != null)
            throw new AssertionError("StreamPlan id " + state.id() + " already exists");

        states.put(state.id(), state);
    }
}
/**
 * Removes every streaming state from the cache.
 */
@VisibleForTesting
public void clearStates()
{
    // states.cleanUp() doesn't clear; it appears to only collect entries that are already
    // removable. Clearing the map view is what actually drops all state.
    Map<TimeUUID, StreamingState> cacheView = states.asMap();
    cacheView.clear();
}
/**
 * Collects the current state of every registered stream, initiator and follower alike,
 * converted to {@link CompositeData}.
 *
 * @return a new set holding one {@link CompositeData} per distinct converted state
 */
public Set<CompositeData> getCurrentStreams()
{
    Iterable<StreamResultFuture> registered = Iterables.concat(initiatorStreams.values(), followerStreams.values());
    Iterable<CompositeData> converted = Iterables.transform(registered,
                                                            future -> StreamStateCompositeData.toCompositeData(future.getCurrentState()));
    return Sets.newHashSet(converted);
}
/**
* @return the value of {@code DatabaseDescriptor.getStreamingStatsEnabled()}
*/
@Override
public boolean getStreamingStatsEnabled()
{
return DatabaseDescriptor.getStreamingStatsEnabled();
}
/**
* Forwards the flag to {@code DatabaseDescriptor.setStreamingStatsEnabled}.
*
* @param streamingStatsEnabled the new value of the flag
*/
@Override
public void setStreamingStatsEnabled(boolean streamingStatsEnabled)
{
DatabaseDescriptor.setStreamingStatsEnabled(streamingStatsEnabled);
}
/**
* @return the string form ({@code toString()}) of
* {@code DatabaseDescriptor.getStreamingSlowEventsLogTimeout()}
*/
@Override
public String getStreamingSlowEventsLogTimeout()
{
return DatabaseDescriptor.getStreamingSlowEventsLogTimeout().toString();
}
/**
* Forwards the value unchanged to {@code DatabaseDescriptor.setStreamingSlowEventsLogTimeout}.
*
* @param value the new timeout, in whatever textual format that setter accepts
*/
@Override
public void setStreamingSlowEventsLogTimeout(String value)
{
DatabaseDescriptor.setStreamingSlowEventsLogTimeout(value);
}
/**
 * Registers a stream initiated by this node: attaches the JMX notifier, tracks the future in
 * {@code initiatorStreams} until it completes, and notifies the registered stream listeners.
 *
 * @param result the future of the stream plan being started
 */
public void registerInitiator(final StreamResultFuture result)
{
    result.addEventListener(notifier);

    // Track the stream BEFORE wiring the cleanup callback. If the callback were attached first
    // and ran before the put (e.g. because the future completed in between), the entry would
    // be inserted afterwards and never removed.
    initiatorStreams.put(result.planId, result);

    // Make sure we remove the stream on completion (whether successful or not). The
    // two-argument remove only drops the mapping while it still points at this future, so a
    // different future registered later under the same plan id is not evicted by mistake.
    result.addListener(() -> initiatorStreams.remove(result.planId, result));

    notifySafeOnRegister(result);
}
/**
 * Registers a stream for which this node is the follower, unless one is already tracked under
 * the same plan id.
 * <p>
 * When the plan id is new, the future is tracked in {@code followerStreams} until it completes
 * and the registered stream listeners are notified. When a future is already tracked for the
 * plan id, that existing future is returned and {@code result} is not tracked.
 *
 * @param result the future of the incoming stream plan
 * @return {@code result} if it was newly registered, otherwise the previously registered future
 */
public StreamResultFuture registerFollower(final StreamResultFuture result)
{
    result.addEventListener(notifier);

    StreamResultFuture previous = followerStreams.putIfAbsent(result.planId, result);
    if (previous != null)
        return previous;

    // Make sure we remove the stream on completion (whether successful or not). The callback is
    // only attached once the future is actually in the map, and the two-argument remove only
    // drops the mapping while it still points at this future. A rejected duplicate therefore
    // can never evict the live entry registered under the same plan id.
    result.addListener(() -> followerStreams.remove(result.planId, result));

    notifySafeOnRegister(result);
    return result;
}
/**
 * Test hook that inserts a future directly into {@code initiatorStreams}, without attaching the
 * notifier or a cleanup callback and without notifying stream listeners.
 *
 * @param future the future to track under its plan id; asserted not to replace an existing entry
 */
@VisibleForTesting
public void putInitiatorStream(StreamResultFuture future)
{
    StreamResultFuture current = initiatorStreams.putIfAbsent(future.planId, future);
    assert current == null: "Duplicate initiator stream for " + future.planId;
}
/**
* Test hook that inserts a future directly into {@code followerStreams}, without attaching the
* notifier or a cleanup callback and without notifying stream listeners.
*
* @param future the future to track under its plan id; asserted not to replace an existing entry
*/
@VisibleForTesting
public void putFollowerStream(StreamResultFuture future)
{
StreamResultFuture current = followerStreams.putIfAbsent(future.planId, future);
assert current == null: "Duplicate follower stream for " + future.planId;
}
/**
* Adds a listener to the list iterated by {@code notifySafeOnRegister} when a stream is registered.
*
* @param listener the listener to add
*/
public void addListener(StreamListener listener)
{
listeners.add(listener);
}
/**
* Removes a listener from the list iterated by {@code notifySafeOnRegister}.
*
* @param listener the listener to remove
*/
public void removeListener(StreamListener listener)
{
listeners.remove(listener);
}
/**
 * Calls {@code onRegister(result)} on every registered stream listener. Anything thrown by a
 * listener is logged at WARN and does not prevent the remaining listeners from being called.
 *
 * @param result the stream that has just been registered
 */
private void notifySafeOnRegister(StreamResultFuture result)
{
    listeners.forEach(registered -> {
        try
        {
            registered.onRegister(result);
        }
        catch (Throwable failure)
        {
            logger.warn("Failed to notify stream listener of new Initiator/Follower", failure);
        }
    });
}
/**
* Looks up a stream in {@code followerStreams}.
*
* @param planId the plan id to look up
* @return the future stored under {@code planId}, or {@code null} if the map has no entry for it
*/
public StreamResultFuture getReceivingStream(TimeUUID planId)
{
return followerStreams.get(planId);
}
/**
* Looks up a stream in {@code initiatorStreams}.
*
* @param planId the plan id to look up
* @return the future stored under {@code planId}, or {@code null} if the map has no entry for it
*/
public StreamResultFuture getInitiatorStream(TimeUUID planId)
{
return initiatorStreams.get(planId);
}
/**
* Delegates to {@code notifier.addNotificationListener} with the same arguments.
*/
public void addNotificationListener(NotificationListener listener, NotificationFilter filter, Object handback)
{
notifier.addNotificationListener(listener, filter, handback);
}
/**
* Delegates to {@code notifier.removeNotificationListener(listener)}.
*
* @throws ListenerNotFoundException propagated from the notifier
*/
public void removeNotificationListener(NotificationListener listener) throws ListenerNotFoundException
{
notifier.removeNotificationListener(listener);
}
/**
* Delegates to {@code notifier.removeNotificationListener(listener, filter, handback)}.
*
* @throws ListenerNotFoundException propagated from the notifier
*/
public void removeNotificationListener(NotificationListener listener, NotificationFilter filter, Object handback) throws ListenerNotFoundException
{
notifier.removeNotificationListener(listener, filter, handback);
}
/**
* @return the notification info reported by {@code notifier.getNotificationInfo()}
*/
public MBeanNotificationInfo[] getNotificationInfo()
{
return notifier.getNotificationInfo();
}
/**
 * Finds a session of a registered stream plan.
 *
 * @param peer                    the peer of the session
 * @param planId                  the id of the plan the session belongs to
 * @param sessionIndex            the index of the session
 * @param searchInitiatorSessions {@code true} to search the initiator streams, {@code false} to
 *                                search the follower streams
 * @return the matching session as reported by the plan's future, or {@code null} if no stream
 *         is registered under {@code planId} in the selected map
 */
public StreamSession findSession(InetAddressAndPort peer, TimeUUID planId, int sessionIndex, boolean searchInitiatorSessions)
{
    if (searchInitiatorSessions)
        return findSession(initiatorStreams, peer, planId, sessionIndex);

    return findSession(followerStreams, peer, planId, sessionIndex);
}
/**
 * Finds a session within the given map of streams.
 *
 * @return {@code null} when {@code streams} has no entry for {@code planId}; otherwise whatever
 *         that entry's {@code getSession(peer, sessionIndex)} returns
 */
private StreamSession findSession(Map<TimeUUID, StreamResultFuture> streams, InetAddressAndPort peer, TimeUUID planId, int sessionIndex)
{
    StreamResultFuture owner = streams.get(planId);
    return owner == null ? null : owner.getSession(peer, sessionIndex);
}
/**
* Callback interface for observers that want to be told when a stream is registered with the
* enclosing manager (see {@code addListener} / {@code removeListener}).
*/
public interface StreamListener
{
/**
* Invoked from {@code notifySafeOnRegister} with the stream that was just registered.
* The default implementation does nothing.
*/
default void onRegister(StreamResultFuture result) {}
}
}