| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.cassandra.locator; |
| |
| import java.io.InputStream; |
| import java.net.InetAddress; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| |
| import org.apache.cassandra.exceptions.ConfigurationException; |
| import org.apache.cassandra.gms.ApplicationState; |
| import org.apache.cassandra.gms.EndpointState; |
| import org.apache.cassandra.gms.Gossiper; |
| import org.apache.cassandra.gms.VersionedValue; |
| import org.apache.cassandra.io.util.FileUtils; |
| import org.apache.cassandra.service.StorageService; |
| import org.apache.cassandra.utils.FBUtilities; |
| import org.apache.cassandra.utils.ResourceWatcher; |
| import org.apache.cassandra.utils.WrappedRunnable; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| import org.yaml.snakeyaml.TypeDescription; |
| import org.yaml.snakeyaml.Yaml; |
| import org.yaml.snakeyaml.constructor.Constructor; |
| |
| import com.google.common.base.Objects; |
| import com.google.common.net.InetAddresses; |
| |
| /** |
| * Network topology snitch that reads its configuration from a YAML file. |
| * <p> |
| * This snitch supports connections over preferred addresses, such as a data-center-local address, based on the |
| * reconnection trick used in {@link Ec2MultiRegionSnitch}. The configuration file, {@code cassandra-topology.yaml}, is |
| * checked periodically for updates. |
| * </p> |
| */ |
| public class YamlFileNetworkTopologySnitch |
| extends AbstractNetworkTopologySnitch |
| { |
| |
| private static final Logger logger = LoggerFactory.getLogger(YamlFileNetworkTopologySnitch.class); |
| |
| /** |
| * How often to check the topology configuration file, in milliseconds; defaults to one minute. |
| */ |
| private static final int CHECK_PERIOD_IN_MS = 60 * 1000; |
| |
| /** Default name for the topology configuration file. */ |
| private static final String DEFAULT_TOPOLOGY_CONFIG_FILENAME = "cassandra-topology.yaml"; |
| |
| /** Node data map, keyed by broadcast address. */ |
| private volatile Map<InetAddress, NodeData> nodeDataMap; |
| |
| /** Node data for this node. */ |
| private volatile NodeData localNodeData; |
| |
| /** Node data to fall back to when there is no match. */ |
| private volatile NodeData defaultNodeData; |
| |
| /** Name of the topology configuration file. */ |
| private final String topologyConfigFilename; |
| |
| /** True if the gossiper has been initialized. */ |
| private volatile boolean gossiperInitialized = false; |
| |
| /** |
| * Constructor. |
| * |
| * @throws ConfigurationException |
| * on failure |
| */ |
| public YamlFileNetworkTopologySnitch() throws ConfigurationException |
| { |
| this(DEFAULT_TOPOLOGY_CONFIG_FILENAME); |
| } |
| |
| /** |
| * Constructor. |
| * |
| * @param topologyConfigFilename |
| * name of the topology configuration file |
| * @throws ConfigurationException |
| * on failure |
| */ |
| YamlFileNetworkTopologySnitch(final String topologyConfigFilename) |
| throws ConfigurationException |
| { |
| this.topologyConfigFilename = topologyConfigFilename; |
| loadTopologyConfiguration(false); |
| |
| try |
| { |
| /* |
| * Check if the topology configuration file is a plain file. |
| */ |
| FBUtilities.resourceToFile(topologyConfigFilename); |
| |
| final Runnable runnable = new WrappedRunnable() |
| { |
| /** |
| * Loads the topology. |
| */ |
| protected void runMayThrow() throws ConfigurationException |
| { |
| loadTopologyConfiguration(true); |
| } |
| }; |
| ResourceWatcher.watch(topologyConfigFilename, runnable, |
| CHECK_PERIOD_IN_MS); |
| } |
| catch (final ConfigurationException e) |
| { |
| logger.debug( |
| "{} found, but does not look like a plain file. Will not watch it for changes", |
| topologyConfigFilename); |
| } |
| } |
| |
| /** |
| * Returns the name of the rack for the endpoint, or {@code UNKNOWN} if not known. |
| * |
| * @return the name of the data center for the endpoint, or {@code UNKNOWN} if not known |
| */ |
| @Override |
| public String getRack(final InetAddress endpoint) |
| { |
| final NodeData nodeData = nodeDataMap.get(endpoint); |
| return nodeData != null ? nodeData.rack : defaultNodeData.rack; |
| } |
| |
| /** |
| * Returns the name of the data center for the endpoint, or {@code UNKNOWN} if not known. |
| * |
| * @return the name of the data center for the endpoint, or {@code UNKNOWN} if not known |
| */ |
| @Override |
| public String getDatacenter(final InetAddress endpoint) |
| { |
| final NodeData nodeData = nodeDataMap.get(endpoint); |
| return nodeData != null ? nodeData.datacenter |
| : defaultNodeData.datacenter; |
| } |
| |
| /** |
| * Root object type for the YAML topology configuration. |
| */ |
| public static class TopologyConfig |
| { |
| public List<Datacenter> topology; |
| public String default_dc_name = "UNKNOWN"; |
| public String default_rack_name = "UNKNOWN"; |
| } |
| |
| /** |
| * Data center object type for the YAML topology configuration. |
| */ |
| public static class Datacenter |
| { |
| public String dc_name; |
| public List<Rack> racks = Collections.emptyList(); |
| } |
| |
| /** |
| * Rack object type for the YAML topology configuration. |
| */ |
| public static class Rack |
| { |
| public String rack_name; |
| public List<Node> nodes = Collections.emptyList(); |
| } |
| |
| /** |
| * Node object type for the YAML topology configuration. |
| */ |
| public static class Node |
| { |
| public String broadcast_address; |
| public String dc_local_address; |
| } |
| |
| /** |
| * Loads the topology configuration file. |
| * |
| * @throws ConfigurationException |
| * on failure |
| */ |
| private synchronized void loadTopologyConfiguration(boolean isUpdate) |
| throws ConfigurationException |
| { |
| logger.debug("Loading topology configuration from {}", |
| topologyConfigFilename); |
| |
| final TypeDescription topologyConfigTypeDescription = new TypeDescription( |
| TopologyConfig.class); |
| topologyConfigTypeDescription.putListPropertyType("topology", |
| Datacenter.class); |
| |
| final TypeDescription topologyTypeDescription = new TypeDescription( |
| Datacenter.class); |
| topologyTypeDescription.putListPropertyType("racks", Rack.class); |
| |
| final TypeDescription rackTypeDescription = new TypeDescription( |
| Rack.class); |
| rackTypeDescription.putListPropertyType("nodes", Node.class); |
| |
| final Constructor configConstructor = new Constructor( |
| TopologyConfig.class); |
| configConstructor.addTypeDescription(topologyConfigTypeDescription); |
| configConstructor.addTypeDescription(topologyTypeDescription); |
| configConstructor.addTypeDescription(rackTypeDescription); |
| |
| final InputStream configFileInputStream = getClass().getClassLoader() |
| .getResourceAsStream(topologyConfigFilename); |
| if (configFileInputStream == null) |
| { |
| throw new ConfigurationException( |
| "Could not read topology config file " |
| + topologyConfigFilename); |
| } |
| Yaml yaml; |
| TopologyConfig topologyConfig; |
| try |
| { |
| yaml = new Yaml(configConstructor); |
| topologyConfig = (TopologyConfig) yaml.load(configFileInputStream); |
| } |
| finally |
| { |
| FileUtils.closeQuietly(configFileInputStream); |
| } |
| final Map<InetAddress, NodeData> nodeDataMap = new HashMap<InetAddress, NodeData>(); |
| |
| if (topologyConfig.topology == null) |
| { |
| throw new ConfigurationException( |
| "Topology configuration file is missing the topology section"); |
| } |
| |
| for (final Datacenter datacenter : topologyConfig.topology) |
| { |
| if (datacenter.dc_name == null) |
| { |
| throw new ConfigurationException( |
| "Topology configuration file is missing a data center name for some data center"); |
| } |
| |
| for (final Rack rack : datacenter.racks) |
| { |
| if (rack.rack_name == null) |
| { |
| throw new ConfigurationException( |
| String.format( |
| "Topology configuration file is missing a rack name for some rack under data center '%s'", |
| datacenter.dc_name)); |
| } |
| |
| for (final Node node : rack.nodes) |
| { |
| if (node.broadcast_address == null) |
| { |
| throw new ConfigurationException( |
| String.format( |
| "Topology configuration file is missing a broadcast address for some node under data center '%s' rack '%s'", |
| datacenter.dc_name, rack.rack_name)); |
| } |
| |
| final InetAddress endpoint = InetAddresses |
| .forString(node.broadcast_address); |
| final InetAddress dcLocalAddress = node.dc_local_address == null ? null |
| : InetAddresses.forString(node.dc_local_address); |
| |
| final NodeData nodeData = new NodeData(); |
| nodeData.datacenter = datacenter.dc_name; |
| nodeData.rack = rack.rack_name; |
| nodeData.dcLocalAddress = dcLocalAddress; |
| |
| if (nodeDataMap.put(endpoint, nodeData) != null) |
| { |
| throw new ConfigurationException( |
| String.format( |
| "IP address '%s' appears more than once in the topology configuration file", |
| endpoint)); |
| } |
| |
| if (dcLocalAddress != null |
| && nodeDataMap.put(dcLocalAddress, nodeData) != null) |
| { |
| throw new ConfigurationException( |
| String.format( |
| "IP address '%s' appears more than once in the topology configuration file", |
| dcLocalAddress)); |
| } |
| } |
| } |
| } |
| |
| final NodeData localNodeData = nodeDataMap.get(FBUtilities |
| .getBroadcastAddress()); |
| if (localNodeData == null) |
| { |
| throw new ConfigurationException( |
| "Topology configuration missing information for the local node"); |
| } |
| |
| final NodeData defaultNodeData = new NodeData(); |
| |
| if (topologyConfig.default_dc_name == null) |
| { |
| throw new ConfigurationException( |
| "default_dc_name must be specified"); |
| } |
| if (topologyConfig.default_rack_name == null) |
| { |
| throw new ConfigurationException( |
| "default_rack_name must be specified"); |
| } |
| |
| defaultNodeData.datacenter = topologyConfig.default_dc_name; |
| defaultNodeData.rack = topologyConfig.default_rack_name; |
| |
| // YAML configuration looks good; now make the changes |
| |
| this.nodeDataMap = nodeDataMap; |
| this.localNodeData = localNodeData; |
| this.defaultNodeData = defaultNodeData; |
| maybeSetApplicationState(); |
| |
| if (logger.isDebugEnabled()) |
| { |
| logger.debug( |
| "Built topology map from config file: localNodeData={}, nodeDataMap={}", |
| localNodeData, nodeDataMap); |
| } |
| |
| if (gossiperInitialized) |
| { |
| StorageService.instance.gossipSnitchInfo(); |
| } |
| |
| if (isUpdate && StorageService.instance != null) |
| StorageService.instance.updateTopology(); |
| } |
| |
| /** |
| * be careful about just blindly updating ApplicationState.INTERNAL_IP everytime we read the yaml file, |
| * as that can cause connections to get unnecessarily reset (via IESCS.onChange()). |
| */ |
| private void maybeSetApplicationState() |
| { |
| if (localNodeData.dcLocalAddress == null) |
| return; |
| final EndpointState es = Gossiper.instance.getEndpointStateForEndpoint(FBUtilities.getBroadcastAddress()); |
| if (es == null) |
| return; |
| final VersionedValue vv = es.getApplicationState(ApplicationState.INTERNAL_IP); |
| if ((vv != null && !vv.value.equals(localNodeData.dcLocalAddress.getHostAddress())) |
| || vv == null) |
| { |
| Gossiper.instance.addLocalApplicationState(ApplicationState.INTERNAL_IP, |
| StorageService.instance.valueFactory.internalIP(localNodeData.dcLocalAddress.getHostAddress())); |
| } |
| } |
| |
| /** |
| * Topology data for a node. |
| */ |
| private class NodeData |
| { |
| /** Data center name. */ |
| public String datacenter; |
| /** Rack name. */ |
| public String rack; |
| /** Data-center-local address. */ |
| public InetAddress dcLocalAddress; |
| |
| /** |
| * Returns a simple key-value string representation of this node's data. |
| * |
| * @return a simple key-value string representation of this node's data |
| */ |
| public String toString() |
| { |
| return Objects.toStringHelper(this).add("datacenter", datacenter) |
| .add("rack", rack).add("dcLocalAddress", dcLocalAddress) |
| .toString(); |
| } |
| } |
| |
| /** |
| * Called in preparation for the initiation of the gossip loop. |
| */ |
| @Override |
| public synchronized void gossiperStarting() |
| { |
| gossiperInitialized = true; |
| StorageService.instance.gossipSnitchInfo(); |
| Gossiper.instance.register(new ReconnectableSnitchHelper(this, localNodeData.datacenter, true)); |
| } |
| |
| } |