blob: 920c9905cdf43d21e1c43702f298a410143edb05 [file]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.testing.continuous;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.NamespaceExistsException;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TableExistsException;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.conf.ConfigurationTypeHelper;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.testing.TestProps;
import org.apache.accumulo.testing.continuous.ContinuousIngest.RandomGeneratorFactory;
import org.apache.hadoop.io.Text;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
public class ManySplits {
private static final Logger log = LoggerFactory.getLogger(ManySplits.class);
private static final String NAMESPACE = "manysplits";
public static void main(String[] args) throws Exception {
try (ContinuousEnv env = new ContinuousEnv(args)) {
AccumuloClient client = env.getAccumuloClient();
Properties testProps = env.getTestProperties();
final int tableCount =
Integer.parseInt(testProps.getProperty(TestProps.CI_SPLIT_TABLE_COUNT));
final long rowMin = Long.parseLong(testProps.getProperty(TestProps.CI_SPLIT_INGEST_ROW_MIN));
final long rowMax = Long.parseLong(testProps.getProperty(TestProps.CI_SPLIT_INGEST_ROW_MAX));
final int maxColF = Integer.parseInt(testProps.getProperty(TestProps.CI_SPLIT_INGEST_MAX_CF));
final int maxColQ = Integer.parseInt(testProps.getProperty(TestProps.CI_SPLIT_INGEST_MAX_CQ));
final int initialTabletCount =
Integer.parseInt(testProps.getProperty(TestProps.CI_SPLIT_INITIAL_TABLETS));
final int initialData =
Integer.parseInt(testProps.getProperty(TestProps.CI_SPLIT_WRITE_SIZE));
String initialSplitThresholdStr = testProps.getProperty(TestProps.CI_SPLIT_THRESHOLD);
final long initialSplitThreshold =
ConfigurationTypeHelper.getFixedMemoryAsBytes(initialSplitThresholdStr);
final int splitThresholdReductionFactor =
Integer.parseInt(testProps.getProperty(TestProps.CI_SPLIT_THRESHOLD_REDUCTION_FACTOR));
final int testRounds =
Integer.parseInt(testProps.getProperty(TestProps.CI_SPLIT_TEST_ROUNDS));
// disable deletes for ingest
testProps.setProperty(TestProps.CI_INGEST_DELETE_PROBABILITY, "0.0");
final Random random = env.getRandom();
Preconditions.checkArgument(tableCount > 0, "Test cannot run without any tables");
final List<String> tableNames = IntStream.range(1, tableCount + 1)
.mapToObj(i -> NAMESPACE + ".table" + i).collect(Collectors.toList());
try {
client.namespaceOperations().create(NAMESPACE);
} catch (NamespaceExistsException e) {
log.warn("The namespace '{}' already exists. Continuing with existing namespace.",
NAMESPACE);
}
final String firstTable = tableNames.get(0);
Map<String,String> tableProps =
Map.of(Property.TABLE_SPLIT_THRESHOLD.getKey(), initialSplitThresholdStr);
log.info("Properties being used to create tables for this test: {}", tableProps);
log.info("Creating initial table: {}", firstTable);
CreateTable.createTable(client, firstTable, initialTabletCount, rowMin, rowMax, tableProps,
Map.of());
log.info("Ingesting {} entries into first table, {}.", initialData, firstTable);
var splitSupplier = ContinuousIngest.createSplitSupplier(client, firstTable);
var randomFactory = RandomGeneratorFactory.create(env, client, splitSupplier, random);
var batchWriterFactory =
ContinuousIngest.BatchWriterFactory.create(client, env, splitSupplier);
ContinuousIngest.doIngest(client, randomFactory, batchWriterFactory, firstTable, testProps,
maxColF, maxColQ, initialData, false, random);
client.tableOperations().flush(firstTable);
// clone tables instead of ingesting into each. it's a lot quicker
log.info("Creating {} more tables by cloning the first", tableCount - 1);
tableNames.stream().parallel().skip(1).forEach(tableName -> {
try {
client.tableOperations().clone(firstTable, tableName, true, null, null);
} catch (TableExistsException e) {
log.warn(
"table {} already exists. Continuing with existing table. Previous data will affect splits",
tableName);
} catch (Exception e) {
throw new RuntimeException(e);
}
});
StringBuilder testResults = new StringBuilder();
testResults.append("Test results:\n");
testResults.append("Total test rounds: ").append(testRounds).append("\n");
testResults.append("Table count: ").append(tableCount).append("\n");
SECONDS.sleep(5);
// main loop
// reduce the split threshold then wait for the expected file size per tablet to be reached
long previousSplitThreshold = initialSplitThreshold;
for (int i = 0; i < testRounds; i++) {
// apply the reduction factor to the previous threshold
final long splitThreshold = previousSplitThreshold / splitThresholdReductionFactor;
final String splitThresholdStr = bytesToMemoryString(splitThreshold);
final int totalSplitCountBefore = getTotalSplitCount(client, tableNames);
log.info("Changing split threshold on all tables from {} to {}",
bytesToMemoryString(previousSplitThreshold), splitThresholdStr);
long beforeThresholdUpdate = System.nanoTime();
// update the split threshold on all tables
tableNames.stream().parallel().forEach(tableName -> {
try {
client.tableOperations().setProperty(tableName, Property.TABLE_SPLIT_THRESHOLD.getKey(),
splitThresholdStr);
} catch (Exception e) {
throw new RuntimeException(e);
}
});
log.info("Waiting for each tablet to have a sum file size <= {}", splitThresholdStr);
// wait for all tablets to reach the expected sum file size
tableNames.stream().parallel().forEach(tableName -> {
long elapsedMillis = 0;
long sleepMillis = SECONDS.toMillis(1);
try {
// wait for each tablet to reach the expected sum file size
while (true) {
Collection<Long> tabletFileSizes = getTabletFileSizes(client, tableName).values();
// filter out the tablets that are already the expected size
Set<Long> offendingTabletSizes =
tabletFileSizes.stream().filter(tabletFileSize -> tabletFileSize > splitThreshold)
.collect(Collectors.toSet());
// if all tablets are good, move on
if (offendingTabletSizes.isEmpty()) {
break;
}
elapsedMillis += sleepMillis;
// log every 3 seconds
if (elapsedMillis % SECONDS.toMillis(3) == 0) {
double averageFileSize =
offendingTabletSizes.stream().mapToLong(l -> l).average().orElse(0);
long diff = (long) (averageFileSize - splitThreshold);
log.info(
"{} tablets have file sizes not yet <= {} on table {}. Diff of avg offending file(s): {}",
offendingTabletSizes.size(), splitThresholdStr, tableName,
bytesToMemoryString(diff));
}
MILLISECONDS.sleep(sleepMillis);
}
} catch (Exception e) {
throw new RuntimeException(e);
}
});
long timeTakenNanos = System.nanoTime() - beforeThresholdUpdate;
long seconds = NANOSECONDS.toSeconds(timeTakenNanos);
long millis = NANOSECONDS.toMillis(timeTakenNanos);
final int splitCountAfter = getTotalSplitCount(client, tableNames);
final int splitCountThisRound = splitCountAfter - totalSplitCountBefore;
log.info(
"Time taken for all tables to reach expected total file size ({}): {} seconds ({}ms)",
splitThresholdStr, seconds, millis);
testResults.append("Test round ").append(i).append(":\n");
testResults.append("TABLE_SPLIT_THRESHOLD ")
.append(bytesToMemoryString(previousSplitThreshold)).append(" -> ")
.append(splitThresholdStr).append("\n");
testResults.append("Splits count: ").append(totalSplitCountBefore).append(" -> ")
.append(splitCountAfter).append("\n");
String splitsPerSecond = String.format("%.2f", (double) splitCountThisRound / seconds);
testResults.append("Splits per second: ").append(splitsPerSecond).append("\n");
previousSplitThreshold = splitThreshold;
}
log.info("Test completed successfully.");
log.info(testResults.toString());
log.info("Deleting tables");
tableNames.stream().parallel().forEach(tableName -> {
try {
client.tableOperations().delete(tableName);
} catch (Exception e) {
throw new RuntimeException(e);
}
});
log.info("Deleting namespace");
client.namespaceOperations().delete(NAMESPACE);
}
}
/**
* @return a map of tablets to the sum of their file size
*/
private static Map<Text,Long> getTabletFileSizes(AccumuloClient client, String tableName)
throws TableNotFoundException, AccumuloException, AccumuloSecurityException {
TableId tableId = TableId.of(client.tableOperations().tableIdMap().get(tableName));
try (Scanner scanner = client.createScanner("accumulo.metadata")) {
scanner.fetchColumnFamily("file");
scanner.setRange(getMetaRangeForTable(tableId.canonical()));
Map<Text,Long> result = new HashMap<>();
for (var entry : scanner) {
String encodedDFV = new String(entry.getValue().get(), UTF_8);
String[] ba = encodedDFV.split(",", 2);
long tabletFileSize = Long.parseLong(ba[0]);
result.merge(entry.getKey().getRow(), tabletFileSize, Long::sum);
}
return result;
}
}
public static String bytesToMemoryString(long bytes) {
if (bytes < 1024) {
return bytes + "B"; // Bytes
} else if (bytes < 1024 * 1024) {
return (bytes / 1024) + "K"; // Kilobytes
} else if (bytes < 1024 * 1024 * 1024) {
return (bytes / (1024 * 1024)) + "M"; // Megabytes
} else {
return (bytes / (1024 * 1024 * 1024)) + "G"; // Gigabytes
}
}
private static Range getMetaRangeForTable(String tableId) {
return new Range(tableId + ";", false, tableId + "<", true);
}
/**
* @return the total number of splits across all given tables
*/
private static int getTotalSplitCount(AccumuloClient client, List<String> tableNames) {
return tableNames.stream().parallel().mapToInt(tableName -> {
try {
return client.tableOperations().listSplits(tableName).size();
} catch (Exception e) {
throw new RuntimeException(e);
}
}).sum();
}
}