Merge branch '2.1'
diff --git a/conf/accumulo-testing.properties b/conf/accumulo-testing.properties
index 7e4bc91..8faab5d 100644
--- a/conf/accumulo-testing.properties
+++ b/conf/accumulo-testing.properties
@@ -44,18 +44,9 @@
# Format: a,b|a,b,c|c
test.ci.common.auths=
# Accumulo tserver properties to set when creating a table
-test.ci.common.accumulo.server.props=\
-tserver.compaction.major.service.cs1.planner=org.apache.accumulo.core.spi.compaction.DefaultCompactionPlanner \
-tserver.compaction.major.service.cs1.planner.opts.executors=\
-[{"name":"small","type":"internal","maxSize":"16M","numThreads":8},\
-{"name":"medium","type":"internal","maxSize":"128M","numThreads":4},\
-{"name":"large","type":"internal","numThreads":2}]
-
+test.ci.common.accumulo.server.props=
# Accumulo table properties to set when creating table
-test.ci.common.accumulo.table.props=\
-table.compaction.dispatcher=org.apache.accumulo.core.spi.compaction.SimpleCompactionDispatcher \
-table.compaction.dispatcher.opts.service=cs1
-
+test.ci.common.accumulo.table.props=
# Ingest
# ------
# Number of entries each ingest client should write
@@ -93,6 +84,11 @@
# The probability (between 0.0 and 1.0) that a set of entries will be deleted during continuous ingest
# To disable deletes, set probability to 0.0
test.ci.ingest.delete.probability=0.1
+# If set to a path in HDFS, bulk import is used instead of a batch writer to ingest data
+test.ci.ingest.bulk.workdir=
+# When using bulk import to ingest data this determines how much memory can be used to buffer mutations before creating
+# rfiles and importing them.
+test.ci.ingest.bulk.memory.limit=512000000
# Enables Zipfian distribution for value size. If set to true, the value will have random bytes inserted into it with a size generated based on a Zipfian distribution.
test.ci.ingest.zipfian.enabled=true
# Minimum size to insert into the value when Zipfian distribution is enabled
@@ -101,11 +97,6 @@
test.ci.ingest.zipfian.max.size=10000
# Exponent of the Zipfian distribution
test.ci.ingest.zipfian.exponent=1.5
-# If set to a path in hdfs will use bulk import instead of batch writer to ingest data
-test.ci.ingest.bulk.workdir=
-# When using bulk import to ingest data this determines how much memory can be used to buffer mutations before creating
-# rfiles and importing them.
-test.ci.ingest.bulk.memory.limit=512000000
# Batch walker
# ------------
diff --git a/contrib/terraform-testing-infrastructure/modules/config-files/templates/cluster.yaml.tftpl b/contrib/terraform-testing-infrastructure/modules/config-files/templates/cluster.yaml.tftpl
index 4919b3f..547610f 100644
--- a/contrib/terraform-testing-infrastructure/modules/config-files/templates/cluster.yaml.tftpl
+++ b/contrib/terraform-testing-infrastructure/modules/config-files/templates/cluster.yaml.tftpl
@@ -27,27 +27,29 @@
- ${manager_ip}
tserver:
+ default:
%{ for ip in worker_ips ~}
- - ${ip}
+ - ${ip}
%{ endfor ~}
sserver:
- - default:
+ default:
%{ for ip in worker_ips ~}
- - ${ip}
+ - ${ip}
%{ endfor ~}
-compaction:
- coordinator:
- - ${manager_ip}
- compactor:
- - q1:
+compactor:
+ accumulo_meta:
%{ for ip in worker_ips ~}
- - ${ip}
+ - ${ip}
%{ endfor ~}
- - q2:
+ user_small:
%{ for ip in worker_ips ~}
- - ${ip}
+ - ${ip}
+%{ endfor ~}
+ user_large:
+%{ for ip in worker_ips ~}
+ - ${ip}
%{ endfor ~}
#
diff --git a/pom.xml b/pom.xml
index b187960..ad6d973 100644
--- a/pom.xml
+++ b/pom.xml
@@ -28,23 +28,23 @@
</parent>
<groupId>org.apache.accumulo</groupId>
<artifactId>accumulo-testing</artifactId>
- <version>2.1.4-SNAPSHOT</version>
+ <version>4.0.0-SNAPSHOT</version>
<name>Apache Accumulo Testing</name>
<description>Testing tools for Apache Accumulo</description>
<properties>
- <accumulo.version>2.1.4</accumulo.version>
+ <accumulo.version>4.0.0-SNAPSHOT</accumulo.version>
<!-- prevent introduction of new compiler warnings -->
<maven.compiler.failOnWarning>true</maven.compiler.failOnWarning>
- <maven.compiler.release>11</maven.compiler.release>
- <maven.compiler.source>11</maven.compiler.source>
- <maven.compiler.target>11</maven.compiler.target>
+ <maven.compiler.release>17</maven.compiler.release>
+ <maven.compiler.source>17</maven.compiler.source>
+ <maven.compiler.target>17</maven.compiler.target>
<maven.javadoc.failOnWarnings>true</maven.javadoc.failOnWarnings>
<maven.site.deploy.skip>true</maven.site.deploy.skip>
<maven.site.skip>true</maven.site.skip>
<!-- versions-maven-plugin ignore patterns for snapshots, alpha, beta, milestones, and release candidates -->
<maven.version.ignore>.+-SNAPSHOT,(?i).*(alpha|beta)[0-9.-]*,(?i).*[.-](m|rc)[0-9]+</maven.version.ignore>
- <minimalJavaBuildVersion>11</minimalJavaBuildVersion>
- <minimalMavenBuildVersion>3.6.0</minimalMavenBuildVersion>
+ <minimalJavaBuildVersion>17</minimalJavaBuildVersion>
+ <minimalMavenBuildVersion>3.9</minimalMavenBuildVersion>
<!-- timestamp for reproducible outputs, updated on release by the release plugin -->
<project.build.outputTimestamp>2021-12-15T00:00:00Z</project.build.outputTimestamp>
</properties>
diff --git a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousIngest.java b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousIngest.java
index 6cedf44..62cad5d 100644
--- a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousIngest.java
+++ b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousIngest.java
@@ -86,7 +86,6 @@
if (maxTablets == Integer.MAX_VALUE) {
return new MinMaxRandomGeneratorFactory(rowMin, rowMax, random);
} else {
- var tableName = env.getAccumuloTableName();
return new MaxTabletsRandomGeneratorFactory(rowMin, rowMax, maxTablets, splitSupplier,
random);
}
diff --git a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousInputFormat.java b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousInputFormat.java
index a6fa97d..394d28b 100644
--- a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousInputFormat.java
+++ b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousInputFormat.java
@@ -143,8 +143,7 @@
byte[] fam = genCol(random.nextInt(maxFam));
byte[] qual = genCol(random.nextInt(maxQual));
- @SuppressWarnings("deprecation")
- byte[] cv = visibilities.get(random.nextInt(visibilities.size())).flatten();
+ byte[] cv = visibilities.get(random.nextInt(visibilities.size())).getExpression();
if (cksum != null) {
cksum.update(row);
diff --git a/src/main/java/org/apache/accumulo/testing/performance/tests/SplitBalancingPT.java b/src/main/java/org/apache/accumulo/testing/performance/tests/SplitBalancingPT.java
index 48d79b1..1ab20a6 100644
--- a/src/main/java/org/apache/accumulo/testing/performance/tests/SplitBalancingPT.java
+++ b/src/main/java/org/apache/accumulo/testing/performance/tests/SplitBalancingPT.java
@@ -25,6 +25,7 @@
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.admin.servers.ServerId;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
@@ -60,7 +61,8 @@
client.tableOperations().addSplits(TABLE_NAME, getSplits());
client.instanceOperations().waitForBalance();
- int totalTabletServers = client.instanceOperations().getTabletServers().size();
+ int totalTabletServers =
+ client.instanceOperations().getServers(ServerId.Type.TABLET_SERVER).size();
int expectedAllocation = NUM_SPLITS / totalTabletServers;
int min = expectedAllocation - MARGIN;
int max = expectedAllocation + MARGIN;
diff --git a/src/main/java/org/apache/accumulo/testing/randomwalk/Module.java b/src/main/java/org/apache/accumulo/testing/randomwalk/Module.java
index aec30a3..bb78a5b 100644
--- a/src/main/java/org/apache/accumulo/testing/randomwalk/Module.java
+++ b/src/main/java/org/apache/accumulo/testing/randomwalk/Module.java
@@ -29,6 +29,7 @@
import java.util.Properties;
import java.util.Random;
import java.util.Set;
+import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
@@ -342,8 +343,9 @@
log.debug(" " + entry.getKey() + ": " + entry.getValue());
}
log.debug("State information");
- for (String key : new TreeSet<>(state.getMap().keySet())) {
- Object value = state.getMap().get(key);
+ var stateSnapshot = new TreeMap<>(state.getMap());
+ for (String key : stateSnapshot.keySet()) {
+ Object value = stateSnapshot.get(key);
String logMsg = " " + key + ": ";
if (value == null)
logMsg += "null";
diff --git a/src/main/java/org/apache/accumulo/testing/randomwalk/State.java b/src/main/java/org/apache/accumulo/testing/randomwalk/State.java
index c79a725..3125e86 100644
--- a/src/main/java/org/apache/accumulo/testing/randomwalk/State.java
+++ b/src/main/java/org/apache/accumulo/testing/randomwalk/State.java
@@ -19,8 +19,10 @@
package org.apache.accumulo.testing.randomwalk;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Random;
/**
@@ -28,7 +30,7 @@
*/
public class State {
- private final HashMap<String,Object> stateMap = new HashMap<>();
+ private final Map<String,Object> stateMap = Collections.synchronizedMap(new HashMap<>());
private final List<String> tables = new ArrayList<>();
private final List<String> namespaces = new ArrayList<>();
private final List<String> users = new ArrayList<>();
@@ -80,10 +82,12 @@
* @throws RuntimeException if state object is not present
*/
public Object get(String key) {
- if (!stateMap.containsKey(key)) {
- throw new RuntimeException("State does not contain " + key);
+ synchronized (stateMap) {
+ if (!stateMap.containsKey(key)) {
+ throw new RuntimeException("State does not contain " + key);
+ }
+ return stateMap.get(key);
}
- return stateMap.get(key);
}
public List<String> getTableNames() {
@@ -135,8 +139,10 @@
*
* @return state map
*/
- HashMap<String,Object> getMap() {
- return stateMap;
+ Map<String,Object> getMap() {
+ synchronized (stateMap) {
+ return Collections.unmodifiableMap(new HashMap<>(stateMap)); // tolerates null values
+ }
}
/**
diff --git a/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/BulkPlusOne.java b/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/BulkPlusOne.java
index 39d3f5f..7287da2 100644
--- a/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/BulkPlusOne.java
+++ b/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/BulkPlusOne.java
@@ -56,9 +56,12 @@
private static final Value ONE = new Value("1".getBytes());
static void bulkLoadLots(Logger log, State state, RandWalkEnv env, Value value) throws Exception {
+ String markerColumnQualifier = String.format("%07d", counter.incrementAndGet());
+ String markerLog = "marker:" + markerColumnQualifier;
+
final FileSystem fs = (FileSystem) state.get("fs");
final Path dir = new Path(fs.getUri() + "/tmp", "bulk_" + UUID.randomUUID());
- log.debug("Bulk loading from {}", dir);
+ log.debug("{} bulk loading from {}", markerLog, dir);
final int parts = env.getRandom().nextInt(10) + 1;
// The set created below should always contain 0. So its very important that zero is first in
@@ -70,9 +73,8 @@
List<String> printRows =
startRows.stream().map(row -> String.format(FMT, row)).collect(Collectors.toList());
- String markerColumnQualifier = String.format("%07d", counter.incrementAndGet());
- log.debug("preparing bulk files with start rows " + printRows + " last row "
- + String.format(FMT, LOTS - 1) + " marker " + markerColumnQualifier);
+ log.debug("{} preparing bulk files with start rows {} last row {}", markerLog,
+ printRows, String.format(FMT, LOTS - 1));
List<Integer> rows = new ArrayList<>(startRows);
rows.add(LOTS);
@@ -80,7 +82,7 @@
for (int i = 0; i < parts; i++) {
String fileName = dir + "/" + String.format("part_%d.rf", i);
- log.debug("Creating {}", fileName);
+ log.debug("{} creating {}", markerLog, fileName);
try (RFileWriter writer = RFile.newWriter().to(fileName).withFileSystem(fs).build()) {
writer.startDefaultLocalityGroup();
int start = rows.get(i);
@@ -97,8 +99,7 @@
env.getAccumuloClient().tableOperations().importDirectory(dir.toString())
.to(Setup.getTableName()).tableTime(true).load();
fs.delete(dir, true);
- log.debug("Finished bulk import, start rows " + printRows + " last row "
- + String.format(FMT, LOTS - 1) + " marker " + markerColumnQualifier);
+ log.debug("{} Finished bulk import", markerLog);
}
@Override
diff --git a/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/BulkTest.java b/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/BulkTest.java
index 26e8c20..f584b5d 100644
--- a/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/BulkTest.java
+++ b/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/BulkTest.java
@@ -26,12 +26,22 @@
public abstract class BulkTest extends Test {
+ public static final String BACKGROUND_FAILURE_KEY = "sawBackgroundFailure";
+
@Override
public void visit(final State state, final RandWalkEnv env, Properties props) throws Exception {
+
+ if ((Boolean) state.get(BACKGROUND_FAILURE_KEY)) {
+ // fail the test early because a previous background task failed
+ throw new IllegalArgumentException(
+ "One or more previous background task failed, aborting test");
+ }
+
Setup.run(state, () -> {
try {
runLater(state, env);
} catch (Throwable ex) {
+ state.set(BACKGROUND_FAILURE_KEY, Boolean.TRUE);
log.error(ex.toString(), ex);
}
});
diff --git a/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/SelectiveQueueing.java b/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/SelectiveQueueing.java
index 8ef6567..186cfbb 100644
--- a/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/SelectiveQueueing.java
+++ b/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/SelectiveQueueing.java
@@ -21,6 +21,7 @@
import java.util.concurrent.ThreadPoolExecutor;
import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.admin.servers.ServerId;
import org.apache.accumulo.testing.randomwalk.RandWalkEnv;
import org.apache.accumulo.testing.randomwalk.State;
import org.slf4j.Logger;
@@ -37,7 +38,7 @@
final ThreadPoolExecutor pool = (ThreadPoolExecutor) state.get("pool");
long queuedThreads = pool.getTaskCount() - pool.getActiveCount() - pool.getCompletedTaskCount();
final AccumuloClient client = env.getAccumuloClient();
- int numTservers = client.instanceOperations().getTabletServers().size();
+ int numTservers = client.instanceOperations().getServers(ServerId.Type.TABLET_SERVER).size();
if (!shouldQueue(queuedThreads, numTservers)) {
log.info("Not queueing because of " + queuedThreads + " outstanding tasks");
diff --git a/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/Setup.java b/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/Setup.java
index 3c6d46a..3fd2d8d 100644
--- a/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/Setup.java
+++ b/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/Setup.java
@@ -19,6 +19,7 @@
package org.apache.accumulo.testing.randomwalk.bulk;
import java.net.InetAddress;
+import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ThreadPoolExecutor;
@@ -26,6 +27,7 @@
import org.apache.accumulo.core.client.TableExistsException;
import org.apache.accumulo.core.client.admin.NewTableConfiguration;
import org.apache.accumulo.core.client.admin.TableOperations;
+import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.iterators.LongCombiner;
import org.apache.accumulo.core.iterators.user.SummingCombiner;
import org.apache.accumulo.core.util.threads.ThreadPools;
@@ -52,14 +54,17 @@
IteratorSetting is = new IteratorSetting(10, SummingCombiner.class);
SummingCombiner.setEncodingType(is, LongCombiner.Type.STRING);
SummingCombiner.setCombineAllColumns(is, true);
- tableOps.create(getTableName(), new NewTableConfiguration().attachIterator(is));
+ var tableProps = Map.of(Property.TABLE_BULK_MAX_TABLET_FILES.getKey(), "1000");
+
+ tableOps.create(getTableName(),
+ new NewTableConfiguration().attachIterator(is).setProperties(tableProps));
}
} catch (TableExistsException ex) {
// expected if there are multiple walkers
}
state.setRandom(env.getRandom());
state.set("fs", FileSystem.get(env.getHadoopConfiguration()));
- state.set("bulkImportSuccess", "true");
+ state.set(BulkTest.BACKGROUND_FAILURE_KEY, Boolean.FALSE);
BulkPlusOne.counter.set(0L);
ThreadPoolExecutor e = ThreadPools.getServerThreadPools().getPoolBuilder("bulkImportPool")
.numCoreThreads(MAX_POOL_SIZE).build();
diff --git a/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/Verify.java b/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/Verify.java
index 8d61149..6db6ab5 100644
--- a/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/Verify.java
+++ b/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/Verify.java
@@ -59,15 +59,17 @@
lastSize = size;
threadPool.awaitTermination(10, TimeUnit.SECONDS);
}
- if (!"true".equals(state.get("bulkImportSuccess"))) {
- log.info("Not verifying bulk import test due to import failures");
- return;
+
+ boolean errorFound = false;
+
+ if ((Boolean) state.get(BulkTest.BACKGROUND_FAILURE_KEY)) {
+ log.error("One or more background task failed");
+ errorFound = true;
}
String user = env.getAccumuloClient().whoami();
Authorizations auths = env.getAccumuloClient().securityOperations().getUserAuthorizations(user);
RowIterator rowIter;
- boolean errorFound = false;
try (Scanner scanner = env.getAccumuloClient().createScanner(Setup.getTableName(), auths)) {
scanner.fetchColumnFamily(BulkPlusOne.CHECK_COLUMN_FAMILY);
for (Entry<Key,Value> entry : scanner) {
diff --git a/src/main/java/org/apache/accumulo/testing/randomwalk/concurrent/Config.java b/src/main/java/org/apache/accumulo/testing/randomwalk/concurrent/Config.java
index 9c86e01..c697851 100644
--- a/src/main/java/org/apache/accumulo/testing/randomwalk/concurrent/Config.java
+++ b/src/main/java/org/apache/accumulo/testing/randomwalk/concurrent/Config.java
@@ -57,30 +57,12 @@
final Property TSERV_READ_AHEAD_MAXCONCURRENT_deprecated =
Property.TSERV_SCAN_EXECUTORS_DEFAULT_THREADS;
- @SuppressWarnings("deprecation")
- final Property TSERV_COMPACTION_SERVICE_DEFAULT_MAX_OPEN_deprecated =
- Property.TSERV_COMPACTION_SERVICE_DEFAULT_MAX_OPEN;
-
- @SuppressWarnings("deprecation")
- final Property TSERV_COMPACTION_SERVICE_DEFAULT_EXECUTORS_deprecated =
- Property.TSERV_COMPACTION_SERVICE_DEFAULT_EXECUTORS;
-
- @SuppressWarnings("deprecation")
- final Property TSERV_TABLET_SPLIT_FINDMIDPOINT_MAXOPEN_deprecated =
- Property.TSERV_TABLET_SPLIT_FINDMIDPOINT_MAXOPEN;
-
- @SuppressWarnings("deprecation")
- final Property TABLE_MINC_COMPACT_IDLETIME_deprecated = Property.TABLE_MINC_COMPACT_IDLETIME;
-
// @formatter:off
final Setting[] settings = {
s(Property.TSERV_BLOOM_LOAD_MAXCONCURRENT, 1, 10),
s(Property.TSERV_DATACACHE_SIZE, 0, 1000000000L),
s(Property.TSERV_INDEXCACHE_SIZE, 0, 1000000000L),
s(Property.TSERV_CLIENT_TIMEOUT, 100, 10000),
- s(TSERV_COMPACTION_SERVICE_DEFAULT_EXECUTORS_deprecated, 1, 10),
- s(Property.TSERV_MAJC_DELAY, 100, 10000),
- s(TSERV_COMPACTION_SERVICE_DEFAULT_MAX_OPEN_deprecated, 3, 100),
s(Property.TSERV_MINC_MAXCONCURRENT, 1, 10),
s(Property.TSERV_DEFAULT_BLOCKSIZE, 100000, 10000000L),
s(Property.TSERV_MAX_IDLE, 10000, 500 * 1000),
@@ -94,10 +76,8 @@
s(Property.TSERV_MINTHREADS, 1, 100),
s(Property.TSERV_SESSION_MAXIDLE, 100, 5 * 60 * 1000),
s(Property.TSERV_WAL_SORT_BUFFER_SIZE, 1024 * 1024, 1024 * 1024 * 1024L),
- s(TSERV_TABLET_SPLIT_FINDMIDPOINT_MAXOPEN_deprecated, 5, 100),
s(Property.TSERV_WAL_BLOCKSIZE, 1024 * 1024,1024 * 1024 * 1024 * 10L),
s(Property.MANAGER_BULK_TIMEOUT, 10, 600),
- s(Property.MANAGER_FATE_THREADPOOL_SIZE, 1, 100),
s(Property.MANAGER_RECOVERY_DELAY, 0, 100),
s(Property.MANAGER_LEASE_RECOVERY_WAITING_PERIOD, 0, 10),
s(Property.MANAGER_THREADCHECK, 100, 10000),
@@ -106,7 +86,6 @@
final Setting[] tableSettings = {
s(Property.TABLE_MAJC_RATIO, 1, 10),
s(Property.TABLE_SPLIT_THRESHOLD, 10 * 1024, 10L * 1024 * 1024 * 1024),
- s(TABLE_MINC_COMPACT_IDLETIME_deprecated, 100, 100 * 60 * 60 * 1000L),
s(Property.TABLE_SCAN_MAXMEM, 10 * 1024, 10 * 1024 * 1024),
s(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE, 10 * 1024, 10 * 1024 * 1024L),
s(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE_INDEX, 10 * 1024, 10 * 1024 * 1024L),