blob: 6f6adfb54c1dd31103eb69ceeb099160bec77601 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.distributed.shared;
import org.apache.cassandra.distributed.api.ICluster;
import org.apache.cassandra.distributed.api.IInstance;
import org.apache.cassandra.distributed.api.IInstanceConfig;
import org.apache.cassandra.distributed.api.TokenSupplier;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static org.apache.cassandra.distributed.api.TokenSupplier.evenlyDistributedTokens;
/**
 * Fluent builder that collects cluster settings (node count, subnet, dc/rack topology, tokens,
 * root directory, version, config customisation), generates one {@link IInstanceConfig} per node
 * and hands them to a {@link Factory} to create the cluster.
 *
 * Node ids are 1-based: node {@code n} is given the address {@code 127.0.<subnet>.<n>} and
 * node 1 is used as the seed.
 *
 * @param <I> instance type of the cluster being built
 * @param <C> cluster type produced by the factory
 */
public abstract class Builder<I extends IInstance, C extends ICluster>
{
    /** Port handed to {@code NetworkTopology.build} for every node address. */
    private static final int BROADCAST_PORT = 7012;

    /**
     * Creates the concrete cluster once the builder has generated all instance configs.
     */
    public interface Factory<I extends IInstance, C extends ICluster>
    {
        /**
         * @param root              directory the cluster is rooted at
         * @param version           version set via {@link Builder#withVersion}; {@code null} if never set
         * @param configs           one config per node, ordered by node id starting at 1
         * @param sharedClassLoader context class loader of the thread that invoked the builder
         * @return the new, not yet started, cluster
         */
        C newCluster(File root, Versions.Version version, List<IInstanceConfig> configs, ClassLoader sharedClassLoader);
    }

    private final Factory<I, C> factory;
    private int nodeCount;
    private int subnet;
    private Map<Integer, NetworkTopology.DcAndRack> nodeIdTopology;
    private TokenSupplier tokenSupplier;
    private File root;
    private Versions.Version version;
    private Consumer<IInstanceConfig> configUpdater;

    /**
     * @param factory used by {@link #createWithoutStarting()} to instantiate the cluster
     */
    public Builder(Factory<I, C> factory)
    {
        this.factory = factory;
    }

    /**
     * Creates the cluster via {@link #createWithoutStarting()} and then calls {@code startup()} on it.
     *
     * @return the cluster after {@code startup()} has returned
     * @throws IOException if the temporary root directory cannot be created
     */
    public C start() throws IOException
    {
        C cluster = createWithoutStarting();
        cluster.startup();
        return cluster;
    }

    /**
     * Fills in any unset defaults, generates a config for nodes {@code 1..nodeCount} and asks the
     * factory for a cluster, without starting it.
     *
     * Defaults applied (and remembered on this builder): a temporary directory prefixed
     * {@code "dtests"} as root, a topology that places every node in {@code dcName(0)}/{@code rackName(0)},
     * and {@code evenlyDistributedTokens(nodeCount)} as the token supplier.
     *
     * @return the newly created cluster
     * @throws IOException           if the temporary root directory cannot be created
     * @throws IllegalStateException if no positive node count has been configured
     */
    public C createWithoutStarting() throws IOException
    {
        if (root == null)
            root = Files.createTempDirectory("dtests").toFile();

        if (nodeCount <= 0)
            throw new IllegalStateException("Cluster must have at least one node");

        if (nodeIdTopology == null)
        {
            // No topology requested: every node goes into the same default dc and rack.
            nodeIdTopology = IntStream.rangeClosed(1, nodeCount).boxed()
                                      .collect(Collectors.toMap(nodeId -> nodeId,
                                                                nodeId -> NetworkTopology.dcAndRack(dcName(0), rackName(0))));
        }

        root.mkdirs();

        ClassLoader sharedClassLoader = Thread.currentThread().getContextClassLoader();

        // TODO: make token allocation strategy configurable
        if (tokenSupplier == null)
            tokenSupplier = evenlyDistributedTokens(nodeCount);

        List<IInstanceConfig> configs = new ArrayList<>(nodeCount);
        for (int nodeNum = 1; nodeNum <= nodeCount; nodeNum++)
            configs.add(createInstanceConfig(nodeNum));

        return factory.newCluster(root, version, configs, sharedClassLoader);
    }

    /**
     * Generates a config for one additional node, numbered {@code cluster.size() + 1}, using this
     * builder's current settings.
     *
     * @param cluster existing cluster whose size determines the new node number
     * @return config for the next node
     */
    public IInstanceConfig newInstanceConfig(C cluster)
    {
        return createInstanceConfig(cluster.size() + 1);
    }

    /**
     * Builds the config for a single node: derives its address and the seed address from the
     * subnet, obtains its token from the token supplier, delegates to {@link #generateConfig} and
     * finally applies the updater registered through {@link #withConfig}, if any.
     *
     * @param nodeNum 1-based node number; also used as the last octet of the node address
     * @return the generated (and possibly customised) config
     */
    protected IInstanceConfig createInstanceConfig(int nodeNum)
    {
        String ipPrefix = "127.0." + subnet + ".";
        String seedIp = ipPrefix + "1";
        String ipAddress = ipPrefix + nodeNum;
        long token = tokenSupplier.token(nodeNum);

        NetworkTopology topology = NetworkTopology.build(ipPrefix, BROADCAST_PORT, nodeIdTopology);

        IInstanceConfig config = generateConfig(nodeNum, ipAddress, topology, root, String.valueOf(token), seedIp);
        if (configUpdater != null)
            configUpdater.accept(config);

        return config;
    }

    /**
     * Produces the implementation-specific config for one node.
     *
     * @param nodeNum         1-based node number
     * @param ipAddress       address assigned to the node
     * @param networkTopology topology built from the builder's node id to dc/rack mapping
     * @param root            cluster root directory
     * @param token           the node's token rendered as a string
     * @param seedIp          address of node 1
     * @return the config for the node
     */
    protected abstract IInstanceConfig generateConfig(int nodeNum, String ipAddress, NetworkTopology networkTopology, File root, String token, String seedIp);

    /**
     * @param tokenSupplier supplies the token for each node number; replaces the default of evenly distributed tokens
     * @return this builder
     */
    public Builder<I, C> withTokenSupplier(TokenSupplier tokenSupplier)
    {
        this.tokenSupplier = tokenSupplier;
        return this;
    }

    /**
     * @param subnet third octet of the node addresses ({@code 127.0.<subnet>.<node>})
     * @return this builder
     */
    public Builder<I, C> withSubnet(int subnet)
    {
        this.subnet = subnet;
        return this;
    }

    /**
     * @param nodeCount total number of nodes in the cluster
     * @return this builder
     */
    public Builder<I, C> withNodes(int nodeCount)
    {
        this.nodeCount = nodeCount;
        return this;
    }

    /**
     * Spreads the already configured node count over {@code dcCount} datacenters with one rack each.
     * Equivalent to {@code withRacks(dcCount, 1)}.
     *
     * @param dcCount number of datacenters; must be positive
     * @return this builder
     */
    public Builder<I, C> withDCs(int dcCount)
    {
        return withRacks(dcCount, 1);
    }

    /**
     * Spreads the already configured node count over {@code dcCount * racksPerDC} racks, rounding
     * the nodes per rack up so that every rack holds the same number of nodes.
     *
     * @param dcCount    number of datacenters; must be positive
     * @param racksPerDC number of racks in each datacenter; must be positive
     * @return this builder
     * @throws IllegalStateException    if the node count has not been set yet, or a topology already exists
     * @throws IllegalArgumentException if {@code dcCount} or {@code racksPerDC} is not positive
     */
    public Builder<I, C> withRacks(int dcCount, int racksPerDC)
    {
        // The nodes-per-rack value is derived from the node count, so it has to be known here.
        if (nodeCount <= 0)
            throw new IllegalStateException("Node count must be set (e.g. with withNodes) before calling withDCs/withRacks without an explicit nodes-per-rack value");

        // Reject non-positive counts up front instead of failing with a bare division by zero below.
        if (dcCount <= 0 || racksPerDC <= 0)
            throw new IllegalArgumentException("dcCount and racksPerDC must be positive but were " + dcCount + " and " + racksPerDC);

        int totalRacks = dcCount * racksPerDC;
        int nodesPerRack = (nodeCount + totalRacks - 1) / totalRacks; // round up to next integer
        return withRacks(dcCount, racksPerDC, nodesPerRack);
    }

    /**
     * Creates a topology of {@code dcCount} datacenters, each with {@code racksPerDC} racks of
     * {@code nodesPerRack} nodes, assigning node ids sequentially from 1. The node count is set to
     * {@code dcCount * racksPerDC * nodesPerRack}; a message is printed when that changes it.
     *
     * @param dcCount      number of datacenters
     * @param racksPerDC   number of racks in each datacenter
     * @param nodesPerRack number of nodes in each rack
     * @return this builder
     * @throws IllegalStateException if a topology has already been created
     */
    public Builder<I, C> withRacks(int dcCount, int racksPerDC, int nodesPerRack)
    {
        if (nodeIdTopology != null)
            throw new IllegalStateException("Network topology already created. Call withDCs/withRacks once or before withDC/withRack calls");

        nodeIdTopology = new HashMap<>();
        int nodeId = 1;
        for (int dc = 1; dc <= dcCount; dc++)
        {
            for (int rack = 1; rack <= racksPerDC; rack++)
            {
                for (int rackNodeIdx = 0; rackNodeIdx < nodesPerRack; rackNodeIdx++)
                    nodeIdTopology.put(nodeId++, NetworkTopology.dcAndRack(dcName(dc), rackName(rack)));
            }
        }

        // adjust the node count to match the allocation
        final int adjustedNodeCount = dcCount * racksPerDC * nodesPerRack;
        if (adjustedNodeCount != nodeCount)
        {
            assert adjustedNodeCount > nodeCount : "withRacks should only ever increase the node count";
            System.out.println(String.format("Network topology of %s DCs with %s racks per DC and %s nodes per rack required increasing total nodes to %s",
                                             dcCount, racksPerDC, nodesPerRack, adjustedNodeCount));
            nodeCount = adjustedNodeCount;
        }
        return this;
    }

    /**
     * Appends {@code nodeCount} nodes in datacenter {@code dcName}, all placed in {@code rackName(1)}.
     *
     * @param dcName    datacenter name
     * @param nodeCount number of nodes to add to that datacenter
     * @return this builder
     * @throws IllegalStateException if a node count was set before any topology existed
     */
    public Builder<I, C> withDC(String dcName, int nodeCount)
    {
        return withRack(dcName, rackName(1), nodeCount);
    }

    /**
     * Appends {@code nodesInRack} nodes in the given datacenter and rack. The new nodes take the
     * ids following the current node count, and the node count grows accordingly.
     *
     * @param dcName      datacenter name
     * @param rackName    rack name
     * @param nodesInRack number of nodes to add to that rack
     * @return this builder
     * @throws IllegalStateException if a node count was set before any topology existed
     */
    public Builder<I, C> withRack(String dcName, String rackName, int nodesInRack)
    {
        if (nodeIdTopology == null)
        {
            if (nodeCount > 0)
                throw new IllegalStateException("Node count must not be explicitly set, or allocated using withDCs/withRacks");

            nodeIdTopology = new HashMap<>();
        }

        for (int nodeId = nodeCount + 1; nodeId <= nodeCount + nodesInRack; nodeId++)
            nodeIdTopology.put(nodeId, NetworkTopology.dcAndRack(dcName, rackName));

        nodeCount += nodesInRack;
        return this;
    }

    /**
     * Replaces the topology with a copy of the supplied map and sets the node count to its size
     * (printing a message when that changes the node count).
     *
     * @param nodeIdTopology map of node ids to dc and rack - must be contiguous with an entry for
     *                       every nodeId from 1 to the size of the map
     * @return this builder
     * @throws IllegalStateException if the map is empty or is missing an entry in that range
     */
    public Builder<I, C> withNodeIdTopology(Map<Integer, NetworkTopology.DcAndRack> nodeIdTopology)
    {
        if (nodeIdTopology.isEmpty())
            throw new IllegalStateException("Topology is empty. It must have an entry for every nodeId.");

        for (int nodeId = 1; nodeId <= nodeIdTopology.size(); nodeId++)
        {
            if (nodeIdTopology.get(nodeId) == null)
                throw new IllegalStateException("Topology is missing entry for nodeId " + nodeId);
        }

        if (nodeCount != nodeIdTopology.size())
        {
            nodeCount = nodeIdTopology.size();
            System.out.println(String.format("Adjusting node count to %s for supplied network topology", nodeCount));
        }

        // Defensive copy so later changes to the caller's map do not affect the builder.
        this.nodeIdTopology = new HashMap<>(nodeIdTopology);
        return this;
    }

    /**
     * @param root directory to root the cluster at; when never set a temporary directory is created
     * @return this builder
     */
    public Builder<I, C> withRoot(File root)
    {
        this.root = root;
        return this;
    }

    /**
     * @param version version passed through to the factory
     * @return this builder
     */
    public Builder<I, C> withVersion(Versions.Version version)
    {
        this.version = version;
        return this;
    }

    /**
     * @param updater callback applied to every generated instance config; replaces any previously set updater
     * @return this builder
     */
    public Builder<I, C> withConfig(Consumer<IInstanceConfig> updater)
    {
        this.configUpdater = updater;
        return this;
    }

    /** Returns the datacenter name for an index: {@code "datacenter" + index}. */
    static String dcName(int index)
    {
        return "datacenter" + index;
    }

    /** Returns the rack name for an index: {@code "rack" + index}. */
    static String rackName(int index)
    {
        return "rack" + index;
    }
}