blob: c88b241eb93256d453d4a505eede41fea57ded56 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.accumulo.testing.performance.tests;
import java.util.HashMap;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.testing.performance.Environment;
import org.apache.accumulo.testing.performance.PerformanceTest;
import org.apache.accumulo.testing.performance.Report;
import org.apache.accumulo.testing.performance.SystemConfiguration;
import org.apache.hadoop.io.Text;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class SplitBalancingPT implements PerformanceTest {
private static final Logger LOG = LoggerFactory.getLogger(SplitBalancingPT.class);
private static final String TABLE_NAME = "splitBalancing";
private static final String RESERVED_PREFIX = "~";
private static final int NUM_SPLITS = 1_000;
private static final int MARGIN = 3;
private static final Text TSERVER_ASSIGNED_TABLETS_COL_FAM = new Text("loc");
@Override
public SystemConfiguration getSystemConfig() {
return new SystemConfiguration();
}
@Override
public Report runTest(final Environment env) throws Exception {
AccumuloClient client = env.getClient();
client.tableOperations().create(TABLE_NAME);
client.tableOperations().addSplits(TABLE_NAME, getSplits());
client.instanceOperations().waitForBalance();
int totalTabletServers = client.instanceOperations().getTabletServers().size();
int expectedAllocation = NUM_SPLITS / totalTabletServers;
int min = expectedAllocation - MARGIN;
int max = expectedAllocation + MARGIN;
Report.Builder reportBuilder = Report.builder().id("split_balancing").description(
"Evaluate and verify that when a high number of splits are created, that the tablets are balanced equally among tablet servers.")
.parameter("num_splits", NUM_SPLITS, "The number of splits")
.parameter("num_tservers", totalTabletServers, "The number of tablet servers")
.parameter("tserver_min", min,
"The minimum number of tablets that should be assigned to a tablet server.")
.parameter("tserver_max", max,
"The maximum number of tablets that should be assigned to a tablet server.");
boolean allServersBalanced = true;
Map<String,Integer> tablets = getTablets(client);
for (String tabletServer : tablets.keySet()) {
int count = tablets.get(tabletServer);
boolean balanced = count >= min && count <= max;
allServersBalanced = allServersBalanced & balanced;
reportBuilder.result("size_tserver_" + tabletServer, count,
"Total tablets assigned to tablet server " + tabletServer);
}
return reportBuilder.build();
}
private SortedSet<Text> getSplits() {
SortedSet<Text> splits = new TreeSet<>();
for (int i = 0; i < NUM_SPLITS; i++) {
splits.add(new Text(String.valueOf(i)));
}
return splits;
}
private Map<String,Integer> getTablets(final AccumuloClient client) {
Map<String,Integer> tablets = new HashMap<>();
try (Scanner scanner = client.createScanner(MetadataTable.NAME, Authorizations.EMPTY)) {
scanner.fetchColumnFamily(TSERVER_ASSIGNED_TABLETS_COL_FAM);
Range range = new Range(null, false, RESERVED_PREFIX, false);
scanner.setRange(range);
for (Map.Entry<Key,Value> entry : scanner) {
String host = entry.getValue().toString();
if (tablets.containsKey(host)) {
tablets.put(host, tablets.get(host) + 1);
} else {
tablets.put(host, 1);
}
}
} catch (Exception e) {
LOG.error("Error occurred during scan:", e);
}
return tablets;
}
}