blob: 0197a6b3461316f4ba10d2e8259fac25a8078f81 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.net;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.cassandra.utils.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.SetMultimap;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.gms.EndpointState;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.gms.IEndpointStateChangeSubscriber;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.utils.FBUtilities;
import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.mapping;
import static java.util.stream.Collectors.toList;
import static org.apache.cassandra.net.Verb.PING_REQ;
import static org.apache.cassandra.net.ConnectionType.LARGE_MESSAGES;
import static org.apache.cassandra.net.ConnectionType.SMALL_MESSAGES;
import static org.apache.cassandra.utils.Clock.Global.nanoTime;
import static org.apache.cassandra.utils.concurrent.CountDownLatch.newCountDownLatch;
public class StartupClusterConnectivityChecker
{
private static final Logger logger = LoggerFactory.getLogger(StartupClusterConnectivityChecker.class);
private final boolean blockForRemoteDcs;
private final long timeoutNanos;
public static StartupClusterConnectivityChecker create(long timeoutSecs, boolean blockForRemoteDcs)
{
if (timeoutSecs > 100)
logger.warn("setting the block-for-peers timeout (in seconds) to {} might be a bit excessive, but using it nonetheless", timeoutSecs);
long timeoutNanos = TimeUnit.SECONDS.toNanos(timeoutSecs);
return new StartupClusterConnectivityChecker(timeoutNanos, blockForRemoteDcs);
}
@VisibleForTesting
StartupClusterConnectivityChecker(long timeoutNanos, boolean blockForRemoteDcs)
{
this.blockForRemoteDcs = blockForRemoteDcs;
this.timeoutNanos = timeoutNanos;
}
/**
* @param peers The currently known peers in the cluster; argument is not modified.
* @param getDatacenterSource A function for mapping peers to their datacenter.
* @return true if the requested percentage of peers are marked ALIVE in gossip and have their connections opened;
* else false.
*/
public boolean execute(Set<InetAddressAndPort> peers, Function<InetAddressAndPort, String> getDatacenterSource)
{
if (peers == null || this.timeoutNanos < 0)
return true;
// make a copy of the set, to avoid mucking with the input (in case it's a sensitive collection)
peers = new HashSet<>(peers);
InetAddressAndPort localAddress = FBUtilities.getBroadcastAddressAndPort();
String localDc = getDatacenterSource.apply(localAddress);
peers.remove(localAddress);
if (peers.isEmpty())
return true;
// make a copy of the datacenter mapping (in case gossip updates happen during this method or some such)
Map<InetAddressAndPort, String> peerToDatacenter = new HashMap<>();
SetMultimap<String, InetAddressAndPort> datacenterToPeers = HashMultimap.create();
for (InetAddressAndPort peer : peers)
{
String datacenter = getDatacenterSource.apply(peer);
peerToDatacenter.put(peer, datacenter);
datacenterToPeers.put(datacenter, peer);
}
// In the case where we do not want to block startup on remote datacenters (e.g. because clients only use
// LOCAL_X consistency levels), we remove all other datacenter hosts from the mapping and we only wait
// on the remaining local datacenter.
if (!blockForRemoteDcs)
{
datacenterToPeers.keySet().retainAll(Collections.singleton(localDc));
logger.info("Blocking coordination until only a single peer is DOWN in the local datacenter, timeout={}s",
TimeUnit.NANOSECONDS.toSeconds(timeoutNanos));
}
else
{
logger.info("Blocking coordination until only a single peer is DOWN in each datacenter, timeout={}s",
TimeUnit.NANOSECONDS.toSeconds(timeoutNanos));
}
// The threshold is 3 because for each peer we want to have 3 acks,
// one for small message connection, one for large message connnection and one for alive event from gossip.
AckMap acks = new AckMap(3, peers);
Map<String, CountDownLatch> dcToRemainingPeers = new HashMap<>(datacenterToPeers.size());
for (String datacenter: datacenterToPeers.keys())
{
dcToRemainingPeers.put(datacenter,
newCountDownLatch(Math.max(datacenterToPeers.get(datacenter).size() - 1, 0)));
}
long startNanos = nanoTime();
// set up a listener to react to new nodes becoming alive (in gossip), and account for all the nodes that are already alive
Set<InetAddressAndPort> alivePeers = Collections.newSetFromMap(new ConcurrentHashMap<>());
AliveListener listener = new AliveListener(alivePeers, dcToRemainingPeers, acks, peerToDatacenter::get);
Gossiper.instance.register(listener);
// send out a ping message to open up the non-gossip connections to all peers. Note that this sends the
// ping messages to _all_ peers, not just the ones we block for in dcToRemainingPeers.
sendPingMessages(peers, dcToRemainingPeers, acks, peerToDatacenter::get);
for (InetAddressAndPort peer : peers)
{
if (Gossiper.instance.isAlive(peer) && alivePeers.add(peer) && acks.incrementAndCheck(peer))
{
String datacenter = peerToDatacenter.get(peer);
// We have to check because we might only have the local DC in the map
if (dcToRemainingPeers.containsKey(datacenter))
dcToRemainingPeers.get(datacenter).decrement();
}
}
boolean succeeded = true;
for (CountDownLatch countDownLatch : dcToRemainingPeers.values())
{
long remainingNanos = Math.max(1, timeoutNanos - (nanoTime() - startNanos));
//noinspection UnstableApiUsage
succeeded &= countDownLatch.awaitUninterruptibly(remainingNanos, TimeUnit.NANOSECONDS);
}
Gossiper.instance.unregister(listener);
if (succeeded)
{
logger.info("Ensured sufficient healthy connections with {} after {} milliseconds",
dcToRemainingPeers.keySet(), TimeUnit.NANOSECONDS.toMillis(nanoTime() - startNanos));
}
else
{
// dc -> missing peer host addresses
Map<String, List<String>> peersDown = acks.getMissingPeers().stream()
.collect(groupingBy(peer -> {
String dc = peerToDatacenter.get(peer);
if (dc != null)
return dc;
return StringUtils.defaultString(getDatacenterSource.apply(peer), "unknown");
},
mapping(InetAddressAndPort::getHostAddressAndPort,
toList())));
logger.warn("Timed out after {} milliseconds, was waiting for remaining peers to connect: {}",
TimeUnit.NANOSECONDS.toMillis(nanoTime() - startNanos),
peersDown);
}
return succeeded;
}
/**
* Sends a "connection warmup" message to each peer in the collection, on every {@link ConnectionType}
* used for internode messaging (that is not gossip).
*/
private void sendPingMessages(Set<InetAddressAndPort> peers, Map<String, CountDownLatch> dcToRemainingPeers,
AckMap acks, Function<InetAddressAndPort, String> getDatacenter)
{
RequestCallback responseHandler = msg -> {
if (acks.incrementAndCheck(msg.from()))
{
String datacenter = getDatacenter.apply(msg.from());
// We have to check because we might only have the local DC in the map
if (dcToRemainingPeers.containsKey(datacenter))
dcToRemainingPeers.get(datacenter).decrement();
}
};
Message<PingRequest> small = Message.out(PING_REQ, PingRequest.forSmall);
Message<PingRequest> large = Message.out(PING_REQ, PingRequest.forLarge);
for (InetAddressAndPort peer : peers)
{
MessagingService.instance().sendWithCallback(small, peer, responseHandler, SMALL_MESSAGES);
MessagingService.instance().sendWithCallback(large, peer, responseHandler, LARGE_MESSAGES);
}
}
/**
* A trivial implementation of {@link IEndpointStateChangeSubscriber} that really only cares about
* {@link #onAlive(InetAddressAndPort, EndpointState)} invocations.
*/
private static final class AliveListener implements IEndpointStateChangeSubscriber
{
private final Map<String, CountDownLatch> dcToRemainingPeers;
private final Set<InetAddressAndPort> livePeers;
private final Function<InetAddressAndPort, String> getDatacenter;
private final AckMap acks;
AliveListener(Set<InetAddressAndPort> livePeers, Map<String, CountDownLatch> dcToRemainingPeers,
AckMap acks, Function<InetAddressAndPort, String> getDatacenter)
{
this.livePeers = livePeers;
this.dcToRemainingPeers = dcToRemainingPeers;
this.acks = acks;
this.getDatacenter = getDatacenter;
}
public void onAlive(InetAddressAndPort endpoint, EndpointState state)
{
if (livePeers.add(endpoint) && acks.incrementAndCheck(endpoint))
{
String datacenter = getDatacenter.apply(endpoint);
if (dcToRemainingPeers.containsKey(datacenter))
dcToRemainingPeers.get(datacenter).decrement();
}
}
}
private static final class AckMap
{
private final int threshold;
private final Map<InetAddressAndPort, AtomicInteger> acks;
AckMap(int threshold, Iterable<InetAddressAndPort> initialPeers)
{
this.threshold = threshold;
acks = new ConcurrentHashMap<>();
for (InetAddressAndPort peer : initialPeers)
initOrGetCounter(peer);
}
boolean incrementAndCheck(InetAddressAndPort address)
{
return initOrGetCounter(address).incrementAndGet() == threshold;
}
/**
* Get a list of peers that has not fully ack'd, i.e. not reaching threshold acks
*/
List<InetAddressAndPort> getMissingPeers()
{
List<InetAddressAndPort> missingPeers = new ArrayList<>();
for (Map.Entry<InetAddressAndPort, AtomicInteger> entry : acks.entrySet())
{
if (entry.getValue().get() < threshold)
missingPeers.add(entry.getKey());
}
return missingPeers;
}
// init the counter for the peer just in case
private AtomicInteger initOrGetCounter(InetAddressAndPort address)
{
return acks.computeIfAbsent(address, addr -> new AtomicInteger(0));
}
}
}