blob: f8e0d398f3f1a0918075a80f9cb900c8a3adc166 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.sidecar.adapters.base;
import java.math.BigInteger;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.sidecar.adapters.base.NodeInfo.NodeState;
import org.apache.cassandra.sidecar.common.JmxClient;
import org.apache.cassandra.sidecar.common.data.GossipInfoResponse;
import org.apache.cassandra.sidecar.common.data.TokenRangeReplicasResponse;
import org.apache.cassandra.sidecar.common.data.TokenRangeReplicasResponse.ReplicaInfo;
import org.apache.cassandra.sidecar.common.utils.GossipInfoParser;
import org.jetbrains.annotations.NotNull;
import static java.util.stream.Collectors.toList;
import static org.apache.cassandra.sidecar.adapters.base.ClusterMembershipJmxOperations.FAILURE_DETECTOR_OBJ_NAME;
import static org.apache.cassandra.sidecar.adapters.base.EndpointSnitchJmxOperations.ENDPOINT_SNITCH_INFO_OBJ_NAME;
import static org.apache.cassandra.sidecar.adapters.base.StorageJmxOperations.STORAGE_SERVICE_OBJ_NAME;
import static org.apache.cassandra.sidecar.adapters.base.TokenRangeReplicas.generateTokenRangeReplicas;
/**
* Aggregates the replica-set by token range
*/
public class TokenRangeReplicaProvider
{
    private static final Logger LOGGER = LoggerFactory.getLogger(TokenRangeReplicaProvider.class);

    protected final JmxClient jmxClient;

    /**
     * Function that, given a keyspace name, returns the mapping of token range to the endpoints
     * owning that range. Both the range (map key) and the endpoints (map value) are lists of strings
     * as returned by the storage JMX operations.
     */
    @FunctionalInterface
    private interface KeyspaceToRangeMappingFunc extends Function<String, Map<List<String>, List<String>>>
    {
    }

    /**
     * Creates a provider that obtains all of its JMX proxies from the given client.
     *
     * @param jmxClient the client used to create JMX proxies
     */
    public TokenRangeReplicaProvider(JmxClient jmxClient)
    {
        this.jmxClient = jmxClient;
    }

    /**
     * Builds the token-range-to-replica response for a keyspace. The response carries the state of
     * every replica found in the natural and pending ranges, the write replicas (natural and pending
     * ranges merged and normalized) and the read replicas (natural ranges only, sorted).
     *
     * @param keyspace    the keyspace for which the range mappings are retrieved
     * @param partitioner the partitioner used when generating the token ranges
     * @return the response with replica states, write replicas and read replicas
     * @throws NullPointerException if {@code keyspace} is {@code null}
     */
    public TokenRangeReplicasResponse tokenRangeReplicas(String keyspace, Partitioner partitioner)
    {
        Objects.requireNonNull(keyspace, "keyspace must be non-null");

        StorageJmxOperations storage = initializeStorageOps();

        List<TokenRangeReplicas> naturalTokenRangeReplicas =
        getTokenRangeReplicas("Natural", keyspace, partitioner, storage::getRangeToEndpointWithPortMap);
        // Pending ranges include bootstrap tokens and leaving endpoints as represented in the Cassandra TokenMetadata
        List<TokenRangeReplicas> pendingTokenRangeReplicas =
        getTokenRangeReplicas("Pending", keyspace, partitioner, storage::getPendingRangeToEndpointWithPortMap);

        // Merge natural and pending range replicas to generate candidates for write-replicas
        List<TokenRangeReplicas> allTokenRangeReplicas = new ArrayList<>(naturalTokenRangeReplicas);
        allTokenRangeReplicas.addAll(pendingTokenRangeReplicas);

        Map<String, String> hostToDatacenter = buildHostToDatacenterMapping(allTokenRangeReplicas);

        // Writes go to all ranges (pending & natural), reads only to the natural ranges
        List<ReplicaInfo> writeReplicas = writeReplicasFromPendingRanges(allTokenRangeReplicas, hostToDatacenter);
        List<ReplicaInfo> readReplicas = readReplicasFromReplicaMapping(naturalTokenRangeReplicas, hostToDatacenter);

        Map<String, String> replicaToStateMap = replicaToStateMap(allTokenRangeReplicas, storage);

        return new TokenRangeReplicasResponse(replicaToStateMap, writeReplicas, readReplicas);
    }

    /**
     * Retrieves the range mappings of a keyspace through the supplied function and converts them
     * into {@link TokenRangeReplicas}.
     *
     * @param rangeType            label used only in the debug log line (for example "Natural" or "Pending")
     * @param keyspace             the keyspace passed to {@code rangeMappingSupplier}
     * @param partitioner          the partitioner used when generating the token ranges
     * @param rangeMappingSupplier function returning the range-to-endpoints mapping for the keyspace
     * @return the token range replicas generated from the retrieved mappings
     */
    private List<TokenRangeReplicas> getTokenRangeReplicas(String rangeType, String keyspace, Partitioner partitioner,
                                                           KeyspaceToRangeMappingFunc rangeMappingSupplier)
    {
        Map<List<String>, List<String>> rangeMappings = rangeMappingSupplier.apply(keyspace);
        // Parameterized so that the message is only assembled when debug logging is enabled
        LOGGER.debug("{} token range mappings for keyspace={}, rangeMappings={}",
                     rangeType, keyspace, rangeMappings);
        return transformRangeMappings(rangeMappings, partitioner);
    }

    /**
     * Converts every range-to-endpoints entry into one or more {@link TokenRangeReplicas}. The first
     * two elements of each key are parsed as the {@link BigInteger} tokens delimiting the range
     * (presumably start and end, in that order), and the endpoints are de-duplicated into a set.
     *
     * @param replicaMappings mapping of token range to the endpoints for that range
     * @param partitioner     the partitioner used when generating the token ranges
     * @return the flattened list of token range replicas generated for all entries
     */
    private List<TokenRangeReplicas> transformRangeMappings(Map<List<String>, List<String>> replicaMappings,
                                                            Partitioner partitioner)
    {
        return replicaMappings.entrySet()
                              .stream()
                              .map(entry -> {
                                  List<String> range = entry.getKey();
                                  return generateTokenRangeReplicas(new BigInteger(range.get(0)),
                                                                    new BigInteger(range.get(1)),
                                                                    partitioner,
                                                                    new HashSet<>(entry.getValue()));
                              })
                              .flatMap(Collection::stream)
                              .collect(toList());
    }

    /**
     * Maps every distinct replica found in the given ranges to its state. The state is computed by
     * {@link StateWithReplacement} from the joining, leaving and moving nodes reported by the storage
     * operations together with the parsed gossip information.
     *
     * @param replicaSet the token range replicas whose replicas are inspected
     * @param storage    the storage operations used to retrieve joining, leaving and moving nodes
     * @return mapping of replica to its state
     */
    private Map<String, String> replicaToStateMap(List<TokenRangeReplicas> replicaSet, StorageJmxOperations storage)
    {
        List<String> joiningNodes = storage.getJoiningNodesWithPort();
        List<String> leavingNodes = storage.getLeavingNodesWithPort();
        List<String> movingNodes = storage.getMovingNodesWithPort();

        GossipInfoResponse gossipInfo = GossipInfoParser.parse(getRawGossipInfo());

        StateWithReplacement state = new StateWithReplacement(joiningNodes, leavingNodes, movingNodes, gossipInfo);

        return replicaSet.stream()
                         .map(TokenRangeReplicas::replicaSet)
                         .flatMap(Collection::stream)
                         .distinct()
                         .collect(Collectors.toMap(Function.identity(), state::of));
    }

    /**
     * @return a JMX proxy for the endpoint snitch operations
     */
    protected EndpointSnitchJmxOperations initializeEndpointProxy()
    {
        return jmxClient.proxy(EndpointSnitchJmxOperations.class, ENDPOINT_SNITCH_INFO_OBJ_NAME);
    }

    /**
     * @return a JMX proxy for the storage operations
     */
    protected StorageJmxOperations initializeStorageOps()
    {
        return jmxClient.proxy(StorageJmxOperations.class, STORAGE_SERVICE_OBJ_NAME);
    }

    /**
     * @return the unparsed endpoint states (with port) obtained from the cluster membership
     * JMX operations
     */
    protected String getRawGossipInfo()
    {
        ClusterMembershipJmxOperations membership = jmxClient.proxy(ClusterMembershipJmxOperations.class,
                                                                    FAILURE_DETECTOR_OBJ_NAME);
        return membership.getAllEndpointStatesWithPort();
    }

    /**
     * Builds the write replicas from the combined natural and pending ranges.
     *
     * @param tokenRangeReplicaSet natural and pending token range replicas
     * @param hostToDatacenter     mapping of host to the datacenter it belongs to
     * @return one {@link ReplicaInfo} per normalized range
     */
    private List<ReplicaInfo> writeReplicasFromPendingRanges(List<TokenRangeReplicas> tokenRangeReplicaSet,
                                                             Map<String, String> hostToDatacenter)
    {
        // Candidate write-replica mappings are normalized by consolidating overlapping ranges
        return TokenRangeReplicas.normalize(tokenRangeReplicaSet)
                                 .stream()
                                 .map(range -> buildReplicaInfo(hostToDatacenter, range))
                                 .collect(toList());
    }

    /**
     * Builds the read replicas from the natural ranges, in the natural sort order of
     * {@link TokenRangeReplicas}.
     *
     * @param naturalTokenRangeReplicas the natural token range replicas
     * @param hostToDatacenter          mapping of host to the datacenter it belongs to
     * @return one {@link ReplicaInfo} per natural range, sorted
     */
    private List<ReplicaInfo> readReplicasFromReplicaMapping(List<TokenRangeReplicas> naturalTokenRangeReplicas,
                                                             Map<String, String> hostToDatacenter)
    {
        return naturalTokenRangeReplicas.stream()
                                        .sorted()
                                        .map(rep -> buildReplicaInfo(hostToDatacenter, rep))
                                        .collect(toList());
    }

    /**
     * Converts a single range into a {@link ReplicaInfo} whose replicas are grouped by datacenter.
     *
     * @param hostToDatacenter mapping of host to the datacenter it belongs to
     * @param rep              the range and its replicas
     * @return the replica information for the range
     */
    @NotNull
    private static ReplicaInfo buildReplicaInfo(Map<String, String> hostToDatacenter, TokenRangeReplicas rep)
    {
        Map<String, List<String>> replicasByDc = replicasByDataCenter(hostToDatacenter, rep.replicaSet());
        return new ReplicaInfo(rep.start().toString(), rep.end().toString(), replicasByDc);
    }

    /**
     * Resolves the datacenter of every distinct replica found in the given ranges by querying the
     * endpoint snitch once per replica.
     *
     * @param replicaSet the token range replicas whose replicas are resolved
     * @return mapping of host to the datacenter it belongs to
     */
    private Map<String, String> buildHostToDatacenterMapping(List<TokenRangeReplicas> replicaSet)
    {
        EndpointSnitchJmxOperations endpointSnitchInfo = initializeEndpointProxy();

        return replicaSet.stream()
                         .map(TokenRangeReplicas::replicaSet)
                         .flatMap(Collection::stream)
                         .distinct()
                         .collect(Collectors.toMap(Function.identity(),
                                                   (String host) -> getDatacenter(endpointSnitchInfo, host)));
    }

    /**
     * Looks up the datacenter of a host, converting the checked {@link UnknownHostException} into an
     * unchecked exception so that the lookup can be used from within a stream pipeline.
     *
     * @param endpointSnitchInfo the endpoint snitch operations used for the lookup
     * @param host               the host to look up
     * @return the datacenter reported for the host
     * @throws RuntimeException wrapping the {@link UnknownHostException} when the lookup fails
     */
    private String getDatacenter(EndpointSnitchJmxOperations endpointSnitchInfo, String host)
    {
        try
        {
            return endpointSnitchInfo.getDatacenter(host);
        }
        catch (UnknownHostException e)
        {
            // Keep the cause and name the host so the failure can be diagnosed from the log alone
            throw new RuntimeException("Unable to determine datacenter for host " + host, e);
        }
    }

    /**
     * Groups replicas by datacenter. Replicas that have no entry in {@code hostToDatacenter} are
     * left out of the result.
     *
     * @param hostToDatacenter mapping of host to the datacenter it belongs to
     * @param replicas         the replicas to group
     * @return mapping of datacenter to the replicas in that datacenter, in iteration order of
     * {@code replicas}
     */
    @NotNull
    private static Map<String, List<String>> replicasByDataCenter(Map<String, String> hostToDatacenter,
                                                                  Collection<String> replicas)
    {
        Map<String, List<String>> dcReplicaMapping = new HashMap<>();
        for (String replica : replicas)
        {
            if (!hostToDatacenter.containsKey(replica))
            {
                continue;
            }
            dcReplicaMapping.computeIfAbsent(hostToDatacenter.get(replica), dc -> new ArrayList<>())
                            .add(replica);
        }
        return dcReplicaMapping;
    }

    /**
     * We want to identify a joining node that replaces a dead node differently from a newly joining
     * node. To do this we analyze gossip info and set the 'Replacing' state for a node replacing a
     * dead node. {@link StateWithReplacement} is used to set the replacing state for a node.
     *
     * <p>This state is added for the token range replica provider endpoint, so that the replicas of
     * a range are sent out along with their state, including the replacing state.
     */
    static class StateWithReplacement extends RingProvider.State
    {
        /** Prefix of the gossip status that marks a node as replacing another node. */
        private static final String BOOT_REPLACE_STATUS_PREFIX = "BOOT_REPLACE,";

        private final GossipInfoResponse gossipInfo;

        StateWithReplacement(List<String> joiningNodes, List<String> leavingNodes, List<String> movingNodes,
                             GossipInfoResponse gossipInfo)
        {
            super(joiningNodes, leavingNodes, movingNodes);
            this.gossipInfo = gossipInfo;
        }

        /**
         * Returns the state of a node, accounting for the additional 'Replacing' state when the node
         * is replacing a dead node. The 'Replacing' state is only reported for a joining node whose
         * gossip status starts with {@code BOOT_REPLACE,}; in every other case the state computed by
         * the parent class is returned.
         *
         * @param endpoint node information represented usually in form of 'ip:port'
         * @return Node status
         */
        @Override
        String of(String endpoint)
        {
            if (isReplacing(endpoint))
            {
                return NodeState.REPLACING.displayName();
            }
            return super.of(endpoint);
        }

        /**
         * @param endpoint node information represented usually in form of 'ip:port'
         * @return {@code true} when the endpoint is joining and its gossip status indicates that it
         * replaces another node, {@code false} otherwise
         */
        private boolean isReplacing(String endpoint)
        {
            // Only joining nodes can be replacements; skip the gossip lookup for everything else
            if (!joiningNodes.contains(endpoint))
            {
                return false;
            }

            GossipInfoResponse.GossipInfo gossipInfoEntry = gossipInfo.get(endpoint);
            if (gossipInfoEntry == null)
            {
                return false;
            }

            LOGGER.debug("Found gossipInfoEntry={}", gossipInfoEntry);
            String hostStatus = gossipInfoEntry.status();
            return hostStatus != null && hostStatus.startsWith(BOOT_REPLACE_STATUS_PREFIX);
        }
    }
}