fixes file count issue w/ bulk import random walk test (#297)

Running the bulk import test failed with the following error.

```
2025-08-29T00:44:43,285 [randomwalk.bulk.BulkMinusOne] ERROR: java.lang.IllegalStateException: java.util.concurrent.ExecutionException: java.lang.IllegalArgumentException: The file hdfs://localhost:8020/tmp/bulk_ab3612b1-ef53-4cd5-8c28
-7dd75de47c87/part_0.rf attempted to import to 122 tablets. Max tablets allowed set to 100
java.lang.IllegalStateException: java.util.concurrent.ExecutionException: java.lang.IllegalArgumentException: The file hdfs://localhost:8020/tmp/bulk_ab3612b1-ef53-4cd5-8c28-7dd75de47c87/part_0.rf attempted to import to 122 tablets. Max tablets allowed set to 100
        at org.apache.accumulo.core.clientImpl.bulk.BulkImport.computeFileToTabletMappings(BulkImport.java:591) ~[accumulo-testing-shaded.jar:?]
        at org.apache.accumulo.core.clientImpl.bulk.BulkImport.computeMappingFromFiles(BulkImport.java:499) ~[accumulo-testing-shaded.jar:?]
        at org.apache.accumulo.core.clientImpl.bulk.BulkImport.load(BulkImport.java:155) ~[accumulo-testing-shaded.jar:?]
        at org.apache.accumulo.testing.randomwalk.bulk.BulkPlusOne.bulkLoadLots(BulkPlusOne.java:98) ~[accumulo-testing-shaded.jar:?]
        at org.apache.accumulo.testing.randomwalk.bulk.BulkMinusOne.runLater(BulkMinusOne.java:34) ~[accumulo-testing-shaded.jar:?]
        at org.apache.accumulo.testing.randomwalk.bulk.BulkTest.lambda$visit$0(BulkTest.java:33) ~[accumulo-testing-shaded.jar:?]
        at org.apache.accumulo.core.trace.TraceWrappedRunnable.run(TraceWrappedRunnable.java:52) [accumulo-testing-shaded.jar:?]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) [?:?]
        at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
        at org.apache.accumulo.core.trace.TraceWrappedRunnable.run(TraceWrappedRunnable.java:52) [accumulo-testing-shaded.jar:?]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) [?:?]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) [?:?]
        at org.apache.accumulo.core.trace.TraceWrappedRunnable.run(TraceWrappedRunnable.java:52) [accumulo-testing-shaded.jar:?]
        at java.lang.Thread.run(Thread.java:840) [?:?]
Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalArgumentException: The file hdfs://localhost:8020/tmp/bulk_ab3612b1-ef53-4cd5-8c28-7dd75de47c87/part_0.rf attempted to import to 122 tablets. Max tablets allowed set
to 100
        at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396) ~[?:?]
        at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073) ~[?:?]
        at org.apache.accumulo.core.clientImpl.bulk.BulkImport.computeFileToTabletMappings(BulkImport.java:585) ~[accumulo-testing-shaded.jar:?]
        ... 13 more
Caused by: java.lang.IllegalArgumentException: The file hdfs://localhost:8020/tmp/bulk_ab3612b1-ef53-4cd5-8c28-7dd75de47c87/part_0.rf attempted to import to 122 tablets. Max tablets allowed set to 100
        at org.apache.accumulo.core.clientImpl.bulk.BulkImport.checkTabletCount(BulkImport.java:627) ~[accumulo-testing-shaded.jar:?]
        at org.apache.accumulo.core.clientImpl.bulk.BulkImport.lambda$computeFileToTabletMappings$7(BulkImport.java:563) ~[accumulo-testing-shaded.jar:?]
        at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1768) ~[?:?]
        ... 5 more
```

To avoid this, the table setting for the maximum number of bulk import files
that can go to a single tablet was raised to 1000 (a condensed sketch follows
the list below). Also fixed the following problems with the test code that
made the underlying issue harder to debug.

 * The test schedules lots of background tasks, but kept running when
   they failed. Made the test exit when it notices a background task
   failed (see the sketch just before the diff). This change required
   making a map synchronized.
 * Some log messages related to the test bulk import code logged a
   marker count that correlates the bulk import w/ data in the table.
   This marker count is really important for tracking down bugs w/
   missing data, but only a subset of log messages included it. Added
   the marker count to all log messages, since log messages from
   multiple threads are interleaved in the test output.

Since the test did not immediately fail when the bulk import call
failed, I assumed all bulk imports were successful and Accumulo had lost
data. Looked down the wrong path for a bit; hoping these changes help
avoid that in the future.
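
The fail-fast behavior from the first bullet boils down to a shared flag kept
in a synchronized map: a background task sets it when it fails, and the next
walker step checks it and aborts. A minimal sketch of that pattern follows
(the real change is in BulkTest.java and Setup.java in the diff; the class and
method names here are illustrative only).

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

class FailFastSketch {
  // Synchronized map so background tasks and the walker thread can safely
  // publish and read the failure flag.
  static final Map<String,Object> state = Collections.synchronizedMap(new HashMap<>());
  static final String BACKGROUND_FAILURE_KEY = "sawBackgroundFailure";

  static void runBackground(Runnable task) {
    try {
      task.run();
    } catch (Throwable ex) {
      // Record the failure so the next visit aborts instead of walking on.
      state.put(BACKGROUND_FAILURE_KEY, Boolean.TRUE);
    }
  }

  static void visit() {
    if (Boolean.TRUE.equals(state.get(BACKGROUND_FAILURE_KEY))) {
      throw new IllegalStateException("A previous background task failed, aborting test");
    }
    // ... otherwise schedule the next background task ...
  }
}
```
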
diff --git a/src/main/java/org/apache/accumulo/testing/randomwalk/Module.java b/src/main/java/org/apache/accumulo/testing/randomwalk/Module.java
index aec30a3..bb78a5b 100644
--- a/src/main/java/org/apache/accumulo/testing/randomwalk/Module.java
+++ b/src/main/java/org/apache/accumulo/testing/randomwalk/Module.java
@@ -29,6 +29,7 @@
 import java.util.Properties;
 import java.util.Random;
 import java.util.Set;
+import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
@@ -342,8 +343,9 @@
             log.debug("  " + entry.getKey() + ": " + entry.getValue());
           }
           log.debug("State information");
-          for (String key : new TreeSet<>(state.getMap().keySet())) {
-            Object value = state.getMap().get(key);
+          var stateSnapshot = new TreeMap<>(state.getMap());
+          for (String key : stateSnapshot.keySet()) {
+            Object value = stateSnapshot.get(key);
             String logMsg = "  " + key + ": ";
             if (value == null)
               logMsg += "null";
diff --git a/src/main/java/org/apache/accumulo/testing/randomwalk/State.java b/src/main/java/org/apache/accumulo/testing/randomwalk/State.java
index c79a725..3125e86 100644
--- a/src/main/java/org/apache/accumulo/testing/randomwalk/State.java
+++ b/src/main/java/org/apache/accumulo/testing/randomwalk/State.java
@@ -19,8 +19,10 @@
 package org.apache.accumulo.testing.randomwalk;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Random;
 
 /**
@@ -28,7 +30,7 @@
  */
 public class State {
 
-  private final HashMap<String,Object> stateMap = new HashMap<>();
+  private final Map<String,Object> stateMap = Collections.synchronizedMap(new HashMap<>());
   private final List<String> tables = new ArrayList<>();
   private final List<String> namespaces = new ArrayList<>();
   private final List<String> users = new ArrayList<>();
@@ -80,10 +82,12 @@
    * @throws RuntimeException if state object is not present
    */
   public Object get(String key) {
-    if (!stateMap.containsKey(key)) {
-      throw new RuntimeException("State does not contain " + key);
+    synchronized (stateMap) {
+      if (!stateMap.containsKey(key)) {
+        throw new RuntimeException("State does not contain " + key);
+      }
+      return stateMap.get(key);
     }
-    return stateMap.get(key);
   }
 
   public List<String> getTableNames() {
@@ -135,8 +139,10 @@
    *
    * @return state map
    */
-  HashMap<String,Object> getMap() {
-    return stateMap;
+  Map<String,Object> getMap() {
+    synchronized (stateMap) {
+      return Map.copyOf(stateMap);
+    }
   }
 
   /**
diff --git a/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/BulkPlusOne.java b/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/BulkPlusOne.java
index 39d3f5f..7287da2 100644
--- a/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/BulkPlusOne.java
+++ b/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/BulkPlusOne.java
@@ -56,9 +56,12 @@
   private static final Value ONE = new Value("1".getBytes());
 
   static void bulkLoadLots(Logger log, State state, RandWalkEnv env, Value value) throws Exception {
+    String markerColumnQualifier = String.format("%07d", counter.incrementAndGet());
+    String markerLog = "marker:" + markerColumnQualifier;
+
     final FileSystem fs = (FileSystem) state.get("fs");
     final Path dir = new Path(fs.getUri() + "/tmp", "bulk_" + UUID.randomUUID());
-    log.debug("Bulk loading from {}", dir);
+    log.debug("{} bulk loading from {}", markerLog, dir);
     final int parts = env.getRandom().nextInt(10) + 1;
 
     // The set created below should always contain 0. So its very important that zero is first in
@@ -70,9 +73,8 @@
     List<String> printRows =
         startRows.stream().map(row -> String.format(FMT, row)).collect(Collectors.toList());
 
-    String markerColumnQualifier = String.format("%07d", counter.incrementAndGet());
-    log.debug("preparing bulk files with start rows " + printRows + " last row "
-        + String.format(FMT, LOTS - 1) + " marker " + markerColumnQualifier);
+    log.debug("{} preparing bulk files with start rows {} last row {} marker ", markerLog,
+        printRows, String.format(FMT, LOTS - 1));
 
     List<Integer> rows = new ArrayList<>(startRows);
     rows.add(LOTS);
@@ -80,7 +82,7 @@
     for (int i = 0; i < parts; i++) {
       String fileName = dir + "/" + String.format("part_%d.rf", i);
 
-      log.debug("Creating {}", fileName);
+      log.debug("{} creating {}", markerLog, fileName);
       try (RFileWriter writer = RFile.newWriter().to(fileName).withFileSystem(fs).build()) {
         writer.startDefaultLocalityGroup();
         int start = rows.get(i);
@@ -97,8 +99,7 @@
     env.getAccumuloClient().tableOperations().importDirectory(dir.toString())
         .to(Setup.getTableName()).tableTime(true).load();
     fs.delete(dir, true);
-    log.debug("Finished bulk import, start rows " + printRows + " last row "
-        + String.format(FMT, LOTS - 1) + " marker " + markerColumnQualifier);
+    log.debug("{} Finished bulk import", markerLog);
   }
 
   @Override
diff --git a/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/BulkTest.java b/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/BulkTest.java
index 26e8c20..f584b5d 100644
--- a/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/BulkTest.java
+++ b/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/BulkTest.java
@@ -26,12 +26,22 @@
 
 public abstract class BulkTest extends Test {
 
+  public static final String BACKGROUND_FAILURE_KEY = "sawBackgroundFailure";
+
   @Override
   public void visit(final State state, final RandWalkEnv env, Properties props) throws Exception {
+
+    if ((Boolean) state.get(BACKGROUND_FAILURE_KEY)) {
+      // fail the test early because a previous background task failed
+      throw new IllegalArgumentException(
+          "One or more previous background task failed, aborting test");
+    }
+
     Setup.run(state, () -> {
       try {
         runLater(state, env);
       } catch (Throwable ex) {
+        state.set(BACKGROUND_FAILURE_KEY, Boolean.TRUE);
         log.error(ex.toString(), ex);
       }
     });
diff --git a/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/Setup.java b/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/Setup.java
index 3c6d46a..3fd2d8d 100644
--- a/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/Setup.java
+++ b/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/Setup.java
@@ -19,6 +19,7 @@
 package org.apache.accumulo.testing.randomwalk.bulk;
 
 import java.net.InetAddress;
+import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.ThreadPoolExecutor;
 
@@ -26,6 +27,7 @@
 import org.apache.accumulo.core.client.TableExistsException;
 import org.apache.accumulo.core.client.admin.NewTableConfiguration;
 import org.apache.accumulo.core.client.admin.TableOperations;
+import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.iterators.LongCombiner;
 import org.apache.accumulo.core.iterators.user.SummingCombiner;
 import org.apache.accumulo.core.util.threads.ThreadPools;
@@ -52,14 +54,17 @@
         IteratorSetting is = new IteratorSetting(10, SummingCombiner.class);
         SummingCombiner.setEncodingType(is, LongCombiner.Type.STRING);
         SummingCombiner.setCombineAllColumns(is, true);
-        tableOps.create(getTableName(), new NewTableConfiguration().attachIterator(is));
+        var tableProps = Map.of(Property.TABLE_BULK_MAX_TABLET_FILES.getKey(), "1000");
+
+        tableOps.create(getTableName(),
+            new NewTableConfiguration().attachIterator(is).setProperties(tableProps));
       }
     } catch (TableExistsException ex) {
       // expected if there are multiple walkers
     }
     state.setRandom(env.getRandom());
     state.set("fs", FileSystem.get(env.getHadoopConfiguration()));
-    state.set("bulkImportSuccess", "true");
+    state.set(BulkTest.BACKGROUND_FAILURE_KEY, Boolean.FALSE);
     BulkPlusOne.counter.set(0L);
     ThreadPoolExecutor e = ThreadPools.getServerThreadPools().getPoolBuilder("bulkImportPool")
         .numCoreThreads(MAX_POOL_SIZE).build();
diff --git a/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/Verify.java b/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/Verify.java
index 8d61149..6db6ab5 100644
--- a/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/Verify.java
+++ b/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/Verify.java
@@ -59,15 +59,17 @@
       lastSize = size;
       threadPool.awaitTermination(10, TimeUnit.SECONDS);
     }
-    if (!"true".equals(state.get("bulkImportSuccess"))) {
-      log.info("Not verifying bulk import test due to import failures");
-      return;
+
+    boolean errorFound = false;
+
+    if ((Boolean) state.get(BulkTest.BACKGROUND_FAILURE_KEY)) {
+      log.error("One or more background task failed");
+      errorFound = true;
     }
 
     String user = env.getAccumuloClient().whoami();
     Authorizations auths = env.getAccumuloClient().securityOperations().getUserAuthorizations(user);
     RowIterator rowIter;
-    boolean errorFound = false;
     try (Scanner scanner = env.getAccumuloClient().createScanner(Setup.getTableName(), auths)) {
       scanner.fetchColumnFamily(BulkPlusOne.CHECK_COLUMN_FAMILY);
       for (Entry<Key,Value> entry : scanner) {