| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.cassandra.distributed.shared; |
| |
| import org.apache.cassandra.distributed.api.ICluster; |
| import org.apache.cassandra.distributed.api.IInstance; |
| import org.apache.cassandra.distributed.api.IInstanceConfig; |
| import org.apache.cassandra.distributed.api.TokenSupplier; |
| |
| import java.io.File; |
| import java.io.IOException; |
| import java.nio.file.Files; |
| import java.util.HashMap; |
| import java.util.Map; |
| import java.util.Objects; |
| import java.util.function.BiConsumer; |
| import java.util.function.Consumer; |
| import java.util.stream.Collectors; |
| import java.util.stream.IntStream; |
| |
| import static org.apache.cassandra.distributed.api.TokenSupplier.evenlyDistributedTokens; |
| |
| public abstract class AbstractBuilder<I extends IInstance, C extends ICluster, B extends AbstractBuilder<I, C, B>> |
| { |
| public interface Factory<I extends IInstance, C extends ICluster, B extends AbstractBuilder<I, C, B>> |
| { |
| C newCluster(B builder); |
| } |
| |
| private final Factory<I, C, B> factory; |
| private int nodeCount; |
| private int subnet; |
| private Map<Integer, NetworkTopology.DcAndRack> nodeIdTopology; |
| private TokenSupplier tokenSupplier; |
| private File root; |
| private Versions.Version version; |
| private Consumer<IInstanceConfig> configUpdater; |
| private ClassLoader sharedClassLoader = Thread.currentThread().getContextClassLoader(); |
| private int broadcastPort = 7012; |
| private BiConsumer<ClassLoader, Integer> instanceInitializer = (cl, id) -> {}; |
| |
| public AbstractBuilder(Factory<I, C, B> factory) |
| { |
| this.factory = factory; |
| } |
| |
| public int getNodeCount() { |
| return nodeCount; |
| } |
| |
| public int getSubnet() { |
| return subnet; |
| } |
| |
| public Map<Integer, NetworkTopology.DcAndRack> getNodeIdTopology() { |
| return nodeIdTopology; |
| } |
| |
| public TokenSupplier getTokenSupplier() { |
| return tokenSupplier; |
| } |
| |
| public File getRoot() { |
| return root; |
| } |
| |
| public Versions.Version getVersion() { |
| return version; |
| } |
| |
| public Consumer<IInstanceConfig> getConfigUpdater() { |
| return configUpdater; |
| } |
| |
| public ClassLoader getSharedClassLoader() { |
| return sharedClassLoader; |
| } |
| |
| public int getBroadcastPort() { |
| return broadcastPort; |
| } |
| |
| public BiConsumer<ClassLoader, Integer> getInstanceInitializer() |
| { |
| return instanceInitializer; |
| } |
| |
| public C start() throws IOException |
| { |
| C cluster = createWithoutStarting(); |
| cluster.startup(); |
| return cluster; |
| } |
| |
| public C createWithoutStarting() throws IOException |
| { |
| if (root == null) |
| root = Files.createTempDirectory("dtests").toFile(); |
| |
| if (nodeCount <= 0) |
| throw new IllegalStateException("Cluster must have at least one node"); |
| |
| root.mkdirs(); |
| |
| if (nodeIdTopology == null) |
| nodeIdTopology = IntStream.rangeClosed(1, nodeCount).boxed() |
| .collect(Collectors.toMap(nodeId -> nodeId, |
| nodeId -> NetworkTopology.dcAndRack(dcName(0), rackName(0)))); |
| |
| // TODO: make token allocation strategy configurable |
| if (tokenSupplier == null) |
| tokenSupplier = evenlyDistributedTokens(nodeCount); |
| |
| return factory.newCluster((B) this); |
| } |
| |
| public B withSharedClassLoader(ClassLoader sharedClassLoader) |
| { |
| this.sharedClassLoader = Objects.requireNonNull(sharedClassLoader, "sharedClassLoader"); |
| return (B) this; |
| } |
| |
| public B withBroadcastPort(int broadcastPort) { |
| this.broadcastPort = broadcastPort; |
| return (B) this; |
| } |
| |
| public B withTokenSupplier(TokenSupplier tokenSupplier) |
| { |
| this.tokenSupplier = tokenSupplier; |
| return (B) this; |
| } |
| |
| public B withSubnet(int subnet) |
| { |
| this.subnet = subnet; |
| return (B) this; |
| } |
| |
| public B withNodes(int nodeCount) |
| { |
| this.nodeCount = nodeCount; |
| return (B) this; |
| } |
| |
| public B withDCs(int dcCount) |
| { |
| return withRacks(dcCount, 1); |
| } |
| |
| public B withRacks(int dcCount, int racksPerDC) |
| { |
| if (nodeCount == 0) |
| throw new IllegalStateException("Node count will be calculated. Do not supply total node count in the builder"); |
| |
| int totalRacks = dcCount * racksPerDC; |
| int nodesPerRack = (nodeCount + totalRacks - 1) / totalRacks; // round up to next integer |
| return withRacks(dcCount, racksPerDC, nodesPerRack); |
| } |
| |
| public B withRacks(int dcCount, int racksPerDC, int nodesPerRack) |
| { |
| if (nodeIdTopology != null) |
| throw new IllegalStateException("Network topology already created. Call withDCs/withRacks once or before withDC/withRack calls"); |
| |
| nodeIdTopology = new HashMap<>(); |
| int nodeId = 1; |
| for (int dc = 1; dc <= dcCount; dc++) |
| { |
| for (int rack = 1; rack <= racksPerDC; rack++) |
| { |
| for (int rackNodeIdx = 0; rackNodeIdx < nodesPerRack; rackNodeIdx++) |
| nodeIdTopology.put(nodeId++, NetworkTopology.dcAndRack(dcName(dc), rackName(rack))); |
| } |
| } |
| // adjust the node count to match the allocatation |
| final int adjustedNodeCount = dcCount * racksPerDC * nodesPerRack; |
| if (adjustedNodeCount != nodeCount) |
| { |
| assert adjustedNodeCount > nodeCount : "withRacks should only ever increase the node count"; |
| System.out.println(String.format("Network topology of %s DCs with %s racks per DC and %s nodes per rack required increasing total nodes to %s", |
| dcCount, racksPerDC, nodesPerRack, adjustedNodeCount)); |
| nodeCount = adjustedNodeCount; |
| } |
| return (B) this; |
| } |
| |
| public B withDC(String dcName, int nodeCount) |
| { |
| return withRack(dcName, rackName(1), nodeCount); |
| } |
| |
| public B withRack(String dcName, String rackName, int nodesInRack) |
| { |
| if (nodeIdTopology == null) |
| { |
| if (nodeCount > 0) |
| throw new IllegalStateException("Node count must not be explicitly set, or allocated using withDCs/withRacks"); |
| |
| nodeIdTopology = new HashMap<>(); |
| } |
| for (int nodeId = nodeCount + 1; nodeId <= nodeCount + nodesInRack; nodeId++) |
| nodeIdTopology.put(nodeId, NetworkTopology.dcAndRack(dcName, rackName)); |
| |
| nodeCount += nodesInRack; |
| return (B) this; |
| } |
| |
| // Map of node ids to dc and rack - must be contiguous with an entry nodeId 1 to nodeCount |
| public B withNodeIdTopology(Map<Integer, NetworkTopology.DcAndRack> nodeIdTopology) |
| { |
| if (nodeIdTopology.isEmpty()) |
| throw new IllegalStateException("Topology is empty. It must have an entry for every nodeId."); |
| |
| IntStream.rangeClosed(1, nodeIdTopology.size()).forEach(nodeId -> { |
| if (nodeIdTopology.get(nodeId) == null) |
| throw new IllegalStateException("Topology is missing entry for nodeId " + nodeId); |
| }); |
| |
| if (nodeCount != nodeIdTopology.size()) |
| { |
| nodeCount = nodeIdTopology.size(); |
| System.out.println(String.format("Adjusting node count to %s for supplied network topology", nodeCount)); |
| } |
| |
| this.nodeIdTopology = new HashMap<>(nodeIdTopology); |
| |
| return (B) this; |
| } |
| |
| public B withRoot(File root) |
| { |
| this.root = root; |
| return (B) this; |
| } |
| |
| public B withVersion(Versions.Version version) |
| { |
| this.version = version; |
| return (B) this; |
| } |
| |
| public B withConfig(Consumer<IInstanceConfig> updater) |
| { |
| this.configUpdater = updater; |
| return (B) this; |
| } |
| |
| public B withInstanceInitializer(BiConsumer<ClassLoader, Integer> instanceInitializer) |
| { |
| this.instanceInitializer = instanceInitializer; |
| return (B) this; |
| } |
| |
| static String dcName(int index) |
| { |
| return "datacenter" + index; |
| } |
| |
| static String rackName(int index) |
| { |
| return "rack" + index; |
| } |
| } |
| |
| |