| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.cassandra.distributed.shared; |
| |
| import java.io.File; |
| import java.net.InetSocketAddress; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Objects; |
| import java.util.Optional; |
| import java.util.Set; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.TimeoutException; |
| import java.util.function.BiConsumer; |
| import java.util.function.Consumer; |
| import java.util.function.Predicate; |
| import java.util.regex.Matcher; |
| import java.util.regex.Pattern; |
| import java.util.stream.Collectors; |
| |
| import com.google.common.util.concurrent.Futures; |
| import org.junit.Assert; |
| |
| import org.apache.cassandra.dht.Token; |
| import org.apache.cassandra.distributed.api.ICluster; |
| import org.apache.cassandra.distributed.api.IInstance; |
| import org.apache.cassandra.distributed.api.IInstanceConfig; |
| import org.apache.cassandra.distributed.api.IInvokableInstance; |
| import org.apache.cassandra.distributed.api.IMessageFilters; |
| import org.apache.cassandra.distributed.api.NodeToolResult; |
| import org.apache.cassandra.distributed.impl.AbstractCluster; |
| import org.apache.cassandra.distributed.impl.InstanceConfig; |
| import org.apache.cassandra.service.StorageService; |
| import org.apache.cassandra.utils.FBUtilities; |
| |
| import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; |
| import static org.apache.cassandra.config.CassandraRelevantProperties.BOOTSTRAP_SCHEMA_DELAY_MS; |
| import static org.assertj.core.api.Assertions.assertThat; |
| |
| /** |
| * Utilities for working with jvm-dtest clusters. |
| * |
| * This class is marked as Isolated as it relies on lambdas, which are in a package that is marked as shared, so need to |
| * tell jvm-dtest to not share this class. |
| * |
| * This class should never be called from within the cluster, always in the App ClassLoader. |
| */ |
| @Isolated |
| public class ClusterUtils |
| { |
| /** |
| * Start the instance with the given System Properties, after the instance has started, the properties will be cleared. |
| */ |
| public static <I extends IInstance> I start(I inst, Consumer<WithProperties> fn) |
| { |
| return start(inst, (ignore, prop) -> fn.accept(prop)); |
| } |
| |
| /** |
| * Start the instance with the given System Properties, after the instance has started, the properties will be cleared. |
| */ |
| public static <I extends IInstance> I start(I inst, BiConsumer<I, WithProperties> fn) |
| { |
| try (WithProperties properties = new WithProperties()) |
| { |
| fn.accept(inst, properties); |
| inst.startup(); |
| return inst; |
| } |
| } |
| |
| /** |
| * Stop an instance in a blocking manner. |
| * |
| * The main difference between this and {@link IInstance#shutdown()} is that the wait on the future will catch |
| * the exceptions and throw as runtime. |
| */ |
| public static void stopUnchecked(IInstance i) |
| { |
| Futures.getUnchecked(i.shutdown()); |
| } |
| |
| /** |
| * Stops an instance abruptly. This is done by blocking all messages to/from so all other instances are unable |
| * to communicate, then stopping the instance gracefully. |
| * |
| * The assumption is that hard stopping inbound and outbound messages will apear to the cluster as if the instance |
| * was stopped via kill -9; this does not hold true if the instance is restarted as it knows it was properly shutdown. |
| * |
| * @param cluster to filter messages to |
| * @param inst to shut down |
| */ |
| public static <I extends IInstance> void stopAbrupt(ICluster<I> cluster, I inst) |
| { |
| // block all messages to/from the node going down to make sure a clean shutdown doesn't happen |
| IMessageFilters.Filter to = cluster.filters().allVerbs().to(inst.config().num()).drop(); |
| IMessageFilters.Filter from = cluster.filters().allVerbs().from(inst.config().num()).drop(); |
| try |
| { |
| stopUnchecked(inst); |
| } |
| finally |
| { |
| from.off(); |
| to.off(); |
| } |
| } |
| |
| /** |
| * Stop all the instances in the cluster. This function is differe than {@link ICluster#close()} as it doesn't |
| * clean up the cluster state, it only stops all the instances. |
| */ |
| public static <I extends IInstance> void stopAll(ICluster<I> cluster) |
| { |
| cluster.stream().forEach(ClusterUtils::stopUnchecked); |
| } |
| |
| /** |
| * Create a new instance and add it to the cluster, without starting it. |
| * |
| * @param cluster to add to |
| * @param dc the instance should be in |
| * @param rack the instance should be in |
| * @param <I> instance type |
| * @return the instance added |
| */ |
| public static <I extends IInstance> I addInstance(AbstractCluster<I> cluster, |
| String dc, String rack) |
| { |
| return addInstance(cluster, dc, rack, ignore -> {}); |
| } |
| |
| /** |
| * Create a new instance and add it to the cluster, without starting it. |
| * |
| * @param cluster to add to |
| * @param dc the instance should be in |
| * @param rack the instance should be in |
| * @param fn function to add to the config before starting |
| * @param <I> instance type |
| * @return the instance added |
| */ |
| public static <I extends IInstance> I addInstance(AbstractCluster<I> cluster, |
| String dc, String rack, |
| Consumer<IInstanceConfig> fn) |
| { |
| Objects.requireNonNull(dc, "dc"); |
| Objects.requireNonNull(rack, "rack"); |
| |
| InstanceConfig config = cluster.newInstanceConfig(); |
| //TODO adding new instances should be cleaner, currently requires you create the cluster with all |
| // instances known about (at least to NetworkTopology and TokenStategy) |
| // this is very hidden, so should be more explicit |
| config.networkTopology().put(config.broadcastAddress(), NetworkTopology.dcAndRack(dc, rack)); |
| |
| fn.accept(config); |
| |
| return cluster.bootstrap(config); |
| } |
| |
| /** |
| * Create and start a new instance that replaces an existing instance. |
| * |
| * The instance will be in the same datacenter and rack as the existing instance. |
| * |
| * @param cluster to add to |
| * @param toReplace instance to replace |
| * @param <I> instance type |
| * @return the instance added |
| */ |
| public static <I extends IInstance> I replaceHostAndStart(AbstractCluster<I> cluster, IInstance toReplace) |
| { |
| return replaceHostAndStart(cluster, toReplace, ignore -> {}); |
| } |
| |
| /** |
| * Create and start a new instance that replaces an existing instance. |
| * |
| * The instance will be in the same datacenter and rack as the existing instance. |
| * |
| * @param cluster to add to |
| * @param toReplace instance to replace |
| * @param fn lambda to add additional properties |
| * @param <I> instance type |
| * @return the instance added |
| */ |
| public static <I extends IInstance> I replaceHostAndStart(AbstractCluster<I> cluster, |
| IInstance toReplace, |
| Consumer<WithProperties> fn) |
| { |
| IInstanceConfig toReplaceConf = toReplace.config(); |
| I inst = addInstance(cluster, toReplaceConf.localDatacenter(), toReplaceConf.localRack(), c -> c.set("auto_bootstrap", true)); |
| |
| return start(inst, properties -> { |
| // lower this so the replacement waits less time |
| properties.setProperty("cassandra.broadcast_interval_ms", Long.toString(TimeUnit.SECONDS.toMillis(30))); |
| // default is 30s, lowering as it should be faster |
| properties.setProperty("cassandra.ring_delay_ms", Long.toString(TimeUnit.SECONDS.toMillis(10))); |
| properties.set(BOOTSTRAP_SCHEMA_DELAY_MS, TimeUnit.SECONDS.toMillis(10)); |
| |
| // state which node to replace |
| properties.setProperty("cassandra.replace_address_first_boot", toReplace.config().broadcastAddress().getAddress().getHostAddress()); |
| |
| fn.accept(properties); |
| }); |
| } |
| |
| /** |
| * Calls {@link org.apache.cassandra.locator.TokenMetadata#sortedTokens()}, returning as a list of strings. |
| */ |
| public static List<String> getTokenMetadataTokens(IInvokableInstance inst) |
| { |
| return inst.callOnInstance(() -> |
| StorageService.instance.getTokenMetadata() |
| .sortedTokens().stream() |
| .map(Object::toString) |
| .collect(Collectors.toList())); |
| } |
| |
| public static String getLocalToken(IInvokableInstance inst) |
| { |
| return inst.callOnInstance(() -> { |
| List<String> tokens = new ArrayList<>(); |
| for (Token t : StorageService.instance.getTokenMetadata().getTokens(FBUtilities.getBroadcastAddressAndPort())) |
| tokens.add(t.getTokenValue().toString()); |
| |
| assert tokens.size() == 1 : "getLocalToken assumes a single token, but multiple tokens found"; |
| return tokens.get(0); |
| }); |
| } |
| |
| public static <I extends IInstance> void runAndWaitForLogs(Runnable r, String waitString, AbstractCluster<I> cluster) throws TimeoutException |
| { |
| runAndWaitForLogs(r, waitString, cluster.stream().toArray(IInstance[]::new)); |
| } |
| |
| public static void runAndWaitForLogs(Runnable r, String waitString, IInstance...instances) throws TimeoutException |
| { |
| long [] marks = new long[instances.length]; |
| for (int i = 0; i < instances.length; i++) |
| marks[i] = instances[i].logs().mark(); |
| r.run(); |
| for (int i = 0; i < instances.length; i++) |
| instances[i].logs().watchFor(marks[i], waitString); |
| } |
| |
| |
| /** |
| * Get the ring from the perspective of the instance. |
| */ |
| public static List<RingInstanceDetails> ring(IInstance inst) |
| { |
| NodeToolResult results = inst.nodetoolResult("ring"); |
| results.asserts().success(); |
| return parseRing(results.getStdout()); |
| } |
| |
| /** |
| * Make sure the target instance is in the ring. |
| * |
| * @param instance instance to check on |
| * @param expectedInRing instance expected in the ring |
| * @return the ring (if target is present) |
| */ |
| public static List<RingInstanceDetails> assertInRing(IInstance instance, IInstance expectedInRing) |
| { |
| String targetAddress = getBroadcastAddressHostString(expectedInRing); |
| List<RingInstanceDetails> ring = ring(instance); |
| Optional<RingInstanceDetails> match = ring.stream().filter(d -> d.address.equals(targetAddress)).findFirst(); |
| assertThat(match).as("Not expected to find %s but was found", targetAddress).isPresent(); |
| return ring; |
| } |
| |
| /** |
| * Make sure the target instance's gossip state matches on the source instance |
| * |
| * @param instance instance to check on |
| * @param expectedInRing instance expected in the ring |
| * @param state expected gossip state |
| * @return the ring (if target is present and has expected state) |
| */ |
| public static List<RingInstanceDetails> assertRingState(IInstance instance, IInstance expectedInRing, String state) |
| { |
| String targetAddress = getBroadcastAddressHostString(expectedInRing); |
| List<RingInstanceDetails> ring = ring(instance); |
| List<RingInstanceDetails> match = ring.stream() |
| .filter(d -> d.address.equals(targetAddress)) |
| .collect(Collectors.toList()); |
| assertThat(match) |
| .isNotEmpty() |
| .as("State was expected to be %s but was not", state) |
| .anyMatch(r -> r.state.equals(state)); |
| return ring; |
| } |
| |
| /** |
| * Make sure the target instance is NOT in the ring. |
| * |
| * @param instance instance to check on |
| * @param expectedInRing instance not expected in the ring |
| * @return the ring (if target is not present) |
| */ |
| public static List<RingInstanceDetails> assertNotInRing(IInstance instance, IInstance expectedInRing) |
| { |
| String targetAddress = getBroadcastAddressHostString(expectedInRing); |
| List<RingInstanceDetails> ring = ring(instance); |
| Optional<RingInstanceDetails> match = ring.stream().filter(d -> d.address.equals(targetAddress)).findFirst(); |
| Assert.assertEquals("Not expected to find " + targetAddress + " but was found", Optional.empty(), match); |
| return ring; |
| } |
| |
| private static List<RingInstanceDetails> awaitRing(IInstance src, String errorMessage, Predicate<List<RingInstanceDetails>> fn) |
| { |
| List<RingInstanceDetails> ring = null; |
| for (int i = 0; i < 100; i++) |
| { |
| ring = ring(src); |
| if (fn.test(ring)) |
| { |
| return ring; |
| } |
| sleepUninterruptibly(1, TimeUnit.SECONDS); |
| } |
| throw new AssertionError(errorMessage + "\n" + ring); |
| } |
| |
| /** |
| * Wait for the target to be in the ring as seen by the source instance. |
| * |
| * @param instance instance to check on |
| * @param expectedInRing instance to wait for |
| * @return the ring |
| */ |
| public static List<RingInstanceDetails> awaitRingJoin(IInstance instance, IInstance expectedInRing) |
| { |
| return awaitRingJoin(instance, expectedInRing.broadcastAddress().getAddress().getHostAddress()); |
| } |
| |
| /** |
| * Wait for the target to be in the ring as seen by the source instance. |
| * |
| * @param instance instance to check on |
| * @param expectedInRing instance address to wait for |
| * @return the ring |
| */ |
| public static List<RingInstanceDetails> awaitRingJoin(IInstance instance, String expectedInRing) |
| { |
| return awaitRing(instance, "Node " + expectedInRing + " did not join the ring...", ring -> { |
| Optional<RingInstanceDetails> match = ring.stream().filter(d -> d.address.equals(expectedInRing)).findFirst(); |
| if (match.isPresent()) |
| { |
| RingInstanceDetails details = match.get(); |
| return details.status.equals("Up") && details.state.equals("Normal"); |
| } |
| return false; |
| }); |
| } |
| |
| /** |
| * Wait for the ring to only have instances that are Up and Normal. |
| * |
| * @param src instance to check on |
| * @return the ring |
| */ |
| public static List<RingInstanceDetails> awaitRingHealthy(IInstance src) |
| { |
| return awaitRing(src, "Timeout waiting for ring to become healthy", |
| ring -> |
| ring.stream().allMatch(ClusterUtils::isRingInstanceDetailsHealthy)); |
| } |
| |
| /** |
| * Wait for the ring to have the target instance with the provided state. |
| * |
| * @param instance instance to check on |
| * @param expectedInRing to look for |
| * @param state expected |
| * @return the ring |
| */ |
| public static List<RingInstanceDetails> awaitRingState(IInstance instance, IInstance expectedInRing, String state) |
| { |
| return awaitRing(instance, "Timeout waiting for " + expectedInRing + " to have state " + state, |
| ring -> |
| ring.stream() |
| .filter(d -> d.address.equals(getBroadcastAddressHostString(expectedInRing))) |
| .filter(d -> d.state.equals(state)) |
| .findAny().isPresent()); |
| } |
| |
| /** |
| * Make sure the ring is only the expected instances. The source instance may not be in the ring, so this function |
| * only relies on the expectedInsts param. |
| * |
| * @param instance instance to check on |
| * @param expectedInRing expected instances in the ring |
| * @return the ring (if condition is true) |
| */ |
| public static List<RingInstanceDetails> assertRingIs(IInstance instance, IInstance... expectedInRing) |
| { |
| return assertRingIs(instance, Arrays.asList(expectedInRing)); |
| } |
| |
| /** |
| * Make sure the ring is only the expected instances. The source instance may not be in the ring, so this function |
| * only relies on the expectedInsts param. |
| * |
| * @param instance instance to check on |
| * @param expectedInRing expected instances in the ring |
| * @return the ring (if condition is true) |
| */ |
| public static List<RingInstanceDetails> assertRingIs(IInstance instance, Collection<? extends IInstance> expectedInRing) |
| { |
| Set<String> expectedRingAddresses = expectedInRing.stream() |
| .map(i -> i.config().broadcastAddress().getAddress().getHostAddress()) |
| .collect(Collectors.toSet()); |
| return assertRingIs(instance, expectedRingAddresses); |
| } |
| |
| /** |
| * Make sure the ring is only the expected instances. The source instance may not be in the ring, so this function |
| * only relies on the expectedInsts param. |
| * |
| * @param instance instance to check on |
| * @param expectedRingAddresses expected instances addresses in the ring |
| * @return the ring (if condition is true) |
| */ |
| public static List<RingInstanceDetails> assertRingIs(IInstance instance, Set<String> expectedRingAddresses) |
| { |
| List<RingInstanceDetails> ring = ring(instance); |
| Set<String> ringAddresses = ring.stream().map(d -> d.address).collect(Collectors.toSet()); |
| assertThat(ringAddresses) |
| .as("Ring addreses did not match for instance %s", instance) |
| .isEqualTo(expectedRingAddresses); |
| return ring; |
| } |
| |
| private static boolean isRingInstanceDetailsHealthy(RingInstanceDetails details) |
| { |
| return details.status.equals("Up") && details.state.equals("Normal"); |
| } |
| |
| private static List<RingInstanceDetails> parseRing(String str) |
| { |
| // 127.0.0.3 rack0 Up Normal 46.21 KB 100.00% -1 |
| // /127.0.0.1:7012 Unknown ? Normal ? 100.00% -3074457345618258603 |
| Pattern pattern = Pattern.compile("^(/?[0-9.:]+)\\s+(\\w+|\\?)\\s+(\\w+|\\?)\\s+(\\w+|\\?).*?(-?\\d+)\\s*$"); |
| List<RingInstanceDetails> details = new ArrayList<>(); |
| String[] lines = str.split("\n"); |
| for (String line : lines) |
| { |
| Matcher matcher = pattern.matcher(line); |
| if (!matcher.find()) |
| { |
| continue; |
| } |
| details.add(new RingInstanceDetails(matcher.group(1), matcher.group(2), matcher.group(3), matcher.group(4), matcher.group(5))); |
| } |
| |
| return details; |
| } |
| |
| private static Map<String, Map<String, String>> awaitGossip(IInstance src, String errorMessage, Predicate<Map<String, Map<String, String>>> fn) |
| { |
| Map<String, Map<String, String>> gossip = null; |
| for (int i = 0; i < 100; i++) |
| { |
| gossip = gossipInfo(src); |
| if (fn.test(gossip)) |
| { |
| return gossip; |
| } |
| sleepUninterruptibly(1, TimeUnit.SECONDS); |
| } |
| throw new AssertionError(errorMessage + "\n" + gossip); |
| } |
| |
| /** |
| * Wait for the target instance to have the desired status. Target status is checked via string contains so works |
| * with 'NORMAL' but also can check tokens or full state. |
| * |
| * @param instance instance to check on |
| * @param expectedInGossip instance to wait for |
| * @param targetStatus for the instance |
| * @return gossip info |
| */ |
| public static Map<String, Map<String, String>> awaitGossipStatus(IInstance instance, IInstance expectedInGossip, String targetStatus) |
| { |
| return awaitGossip(instance, "Node " + expectedInGossip + " did not match state " + targetStatus, gossip -> { |
| Map<String, String> state = gossip.get(getBroadcastAddressString(expectedInGossip)); |
| if (state == null) |
| return false; |
| String status = state.get("STATUS_WITH_PORT"); |
| if (status == null) |
| status = state.get("STATUS"); |
| if (status == null) |
| return targetStatus == null; |
| return status.contains(targetStatus); |
| }); |
| } |
| |
| /** |
| * Get the gossip information from the node. Currently only address, generation, and heartbeat are returned |
| * |
| * @param inst to check on |
| * @return gossip info |
| */ |
| public static Map<String, Map<String, String>> gossipInfo(IInstance inst) |
| { |
| NodeToolResult results = inst.nodetoolResult("gossipinfo"); |
| results.asserts().success(); |
| return parseGossipInfo(results.getStdout()); |
| } |
| |
| /** |
| * Make sure the gossip info for the specific target has the expected generation and heartbeat |
| * |
| * @param instance to check on |
| * @param expectedInGossip instance to check for |
| * @param expectedGeneration expected generation |
| * @param expectedHeartbeat expected heartbeat |
| */ |
| public static void assertGossipInfo(IInstance instance, |
| InetSocketAddress expectedInGossip, int expectedGeneration, int expectedHeartbeat) |
| { |
| String targetAddress = expectedInGossip.getAddress().toString(); |
| Map<String, Map<String, String>> gossipInfo = gossipInfo(instance); |
| Map<String, String> gossipState = gossipInfo.get(targetAddress); |
| if (gossipState == null) |
| throw new NullPointerException("Unable to find gossip info for " + targetAddress + "; gossip info = " + gossipInfo); |
| Assert.assertEquals(Long.toString(expectedGeneration), gossipState.get("generation")); |
| Assert.assertEquals(Long.toString(expectedHeartbeat), gossipState.get("heartbeat")); //TODO do we really mix these two? |
| } |
| |
| private static Map<String, Map<String, String>> parseGossipInfo(String str) |
| { |
| Map<String, Map<String, String>> map = new HashMap<>(); |
| String[] lines = str.split("\n"); |
| String currentInstance = null; |
| for (String line : lines) |
| { |
| if (line.startsWith("/")) |
| { |
| // start of new instance |
| currentInstance = line; |
| continue; |
| } |
| Objects.requireNonNull(currentInstance); |
| String[] kv = line.trim().split(":", 2); |
| assert kv.length == 2 : "When splitting line '" + line + "' expected 2 parts but not true"; |
| Map<String, String> state = map.computeIfAbsent(currentInstance, ignore -> new HashMap<>()); |
| state.put(kv[0], kv[1]); |
| } |
| |
| return map; |
| } |
| |
| /** |
| * Get the tokens assigned to the instance via config. This method does not work if the instance has learned |
| * or generated its tokens. |
| * |
| * @param instance to get tokens from |
| * @return non-empty list of tokens |
| */ |
| public static List<String> getTokens(IInstance instance) |
| { |
| IInstanceConfig conf = instance.config(); |
| int numTokens = conf.getInt("num_tokens"); |
| Assert.assertEquals("Only single token is supported", 1, numTokens); |
| String token = conf.getString("initial_token"); |
| Assert.assertNotNull("initial_token was not found", token); |
| return Arrays.asList(token); |
| } |
| |
| /** |
| * Get all data directories for the given instance. |
| * |
| * @param instance to get data directories for |
| * @return data directories |
| */ |
| public static List<File> getDataDirectories(IInstance instance) |
| { |
| IInstanceConfig conf = instance.config(); |
| // this isn't safe as it assumes the implementation of InstanceConfig |
| // might need to get smarter... some day... |
| String[] ds = (String[]) conf.get("data_file_directories"); |
| List<File> files = new ArrayList<>(ds.length); |
| for (int i = 0; i < ds.length; i++) |
| files.add(new File(ds[i])); |
| return files; |
| } |
| |
| /** |
| * Get the commit log directory for the given instance. |
| * |
| * @param instance to get the commit log directory for |
| * @return commit log directory |
| */ |
| public static File getCommitLogDirectory(IInstance instance) |
| { |
| IInstanceConfig conf = instance.config(); |
| // this isn't safe as it assumes the implementation of InstanceConfig |
| // might need to get smarter... some day... |
| String d = (String) conf.get("commitlog_directory"); |
| return new File(d); |
| } |
| |
| /** |
| * Get the hints directory for the given instance. |
| * |
| * @param instance to get the hints directory for |
| * @return hints directory |
| */ |
| public static File getHintsDirectory(IInstance instance) |
| { |
| IInstanceConfig conf = instance.config(); |
| // this isn't safe as it assumes the implementation of InstanceConfig |
| // might need to get smarter... some day... |
| String d = (String) conf.get("hints_directory"); |
| return new File(d); |
| } |
| |
| /** |
| * Get the saved caches directory for the given instance. |
| * |
| * @param instance to get the saved caches directory for |
| * @return saved caches directory |
| */ |
| public static File getSavedCachesDirectory(IInstance instance) |
| { |
| IInstanceConfig conf = instance.config(); |
| // this isn't safe as it assumes the implementation of InstanceConfig |
| // might need to get smarter... some day... |
| String d = (String) conf.get("saved_caches_directory"); |
| return new File(d); |
| } |
| |
| /** |
| * Get all writable directories for the given instance. |
| * |
| * @param instance to get directories for |
| * @return all writable directories |
| */ |
| public static List<File> getDirectories(IInstance instance) |
| { |
| List<File> out = new ArrayList<>(); |
| out.addAll(getDataDirectories(instance)); |
| out.add(getCommitLogDirectory(instance)); |
| out.add(getHintsDirectory(instance)); |
| out.add(getSavedCachesDirectory(instance)); |
| return out; |
| } |
| |
| /** |
| * Gets the name of the Partitioner for the given instance. |
| * |
| * @param instance to get partitioner from |
| * @return partitioner name |
| */ |
| public static String getPartitionerName(IInstance instance) |
| { |
| return (String) instance.config().get("partitioner"); |
| } |
| |
| /** |
| * Changes the instance's address to the new address. This method should only be called while the instance is |
| * down, else has undefined behavior. |
| * |
| * @param instance to update address for |
| * @param address to set |
| */ |
| public static void updateAddress(IInstance instance, String address) |
| { |
| updateAddress(instance.config(), address); |
| } |
| |
| /** |
| * Changes the instance's address to the new address. This method should only be called while the instance is |
| * down, else has undefined behavior. |
| * |
| * @param conf to update address for |
| * @param address to set |
| */ |
| private static void updateAddress(IInstanceConfig conf, String address) |
| { |
| for (String key : Arrays.asList("broadcast_address", "listen_address", "broadcast_rpc_address", "rpc_address")) |
| conf.set(key, address); |
| |
| // InstanceConfig caches InetSocketAddress -> InetAddressAndPort |
| // this causes issues as startup now ignores config, so force reset it to pull from conf. |
| ((InstanceConfig) conf).unsetBroadcastAddressAndPort(); //TODO remove the need to null out the cache... |
| conf.networkTopology().put(conf.broadcastAddress(), NetworkTopology.dcAndRack(conf.localDatacenter(), conf.localRack())); |
| } |
| |
| /** |
| * Get the broadcast address host address only (ex. 127.0.0.1) |
| */ |
| private static String getBroadcastAddressHostString(IInstance target) |
| { |
| return target.config().broadcastAddress().getAddress().getHostAddress(); |
| } |
| |
| /** |
| * Get the broadcast address in host:port format (ex. 127.0.0.1:7190) |
| */ |
| public static String getBroadcastAddressHostWithPortString(IInstance target) |
| { |
| InetSocketAddress address = target.config().broadcastAddress(); |
| return address.getAddress().getHostAddress() + ":" + address.getPort(); |
| } |
| |
| /** |
| * Get the broadcast address InetAddess string (ex. localhost/127.0.0.1 or /127.0.0.1) |
| */ |
| private static String getBroadcastAddressString(IInstance target) |
| { |
| return target.config().broadcastAddress().getAddress().toString(); |
| } |
| |
| public static final class RingInstanceDetails |
| { |
| private final String address; |
| private final String rack; |
| private final String status; |
| private final String state; |
| private final String token; |
| |
| private RingInstanceDetails(String address, String rack, String status, String state, String token) |
| { |
| this.address = address; |
| this.rack = rack; |
| this.status = status; |
| this.state = state; |
| this.token = token; |
| } |
| |
| public String getAddress() |
| { |
| return address; |
| } |
| |
| public String getRack() |
| { |
| return rack; |
| } |
| |
| public String getStatus() |
| { |
| return status; |
| } |
| |
| public String getState() |
| { |
| return state; |
| } |
| |
| public String getToken() |
| { |
| return token; |
| } |
| |
| @Override |
| public boolean equals(Object o) |
| { |
| if (this == o) return true; |
| if (o == null || getClass() != o.getClass()) return false; |
| RingInstanceDetails that = (RingInstanceDetails) o; |
| return Objects.equals(address, that.address) && |
| Objects.equals(rack, that.rack) && |
| Objects.equals(status, that.status) && |
| Objects.equals(state, that.state) && |
| Objects.equals(token, that.token); |
| } |
| |
| @Override |
| public int hashCode() |
| { |
| return Objects.hash(address, rack, status, state, token); |
| } |
| |
| public String toString() |
| { |
| return Arrays.asList(address, rack, status, state, token).toString(); |
| } |
| } |
| } |