Miscellaneous improvements to continuous code (#212)
diff --git a/src/main/java/org/apache/accumulo/testing/TestEnv.java b/src/main/java/org/apache/accumulo/testing/TestEnv.java
index 18f6afc..6fd91fa 100644
--- a/src/main/java/org/apache/accumulo/testing/TestEnv.java
+++ b/src/main/java/org/apache/accumulo/testing/TestEnv.java
@@ -18,11 +18,13 @@
package org.apache.accumulo.testing;
import java.lang.management.ManagementFactory;
+import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.Random;
import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
@@ -37,6 +39,7 @@
private final Properties clientProps;
private AccumuloClient client = null;
private Configuration hadoopConfig = null;
+ private final SecureRandom random = new SecureRandom();
public TestEnv(String[] args) {
@@ -180,6 +183,10 @@
return Accumulo.newClient().from(clientProps).as(principal, token).build();
}
+ public Random getRandom() {
+ return random;
+ }
+
@Override
public void close() throws Exception {
if (client != null) {
diff --git a/src/main/java/org/apache/accumulo/testing/continuous/BulkIngest.java b/src/main/java/org/apache/accumulo/testing/continuous/BulkIngest.java
index 685efa1..290b180 100644
--- a/src/main/java/org/apache/accumulo/testing/continuous/BulkIngest.java
+++ b/src/main/java/org/apache/accumulo/testing/continuous/BulkIngest.java
@@ -85,27 +85,25 @@
// create splits file for KeyRangePartitioner
String splitsFile = bulkDir + "/splits.txt";
- try (AccumuloClient client = env.getAccumuloClient()) {
+ AccumuloClient client = env.getAccumuloClient();
- // make sure splits file is closed before continuing
- try (PrintStream out = new PrintStream(
- new BufferedOutputStream(fs.create(fs.makeQualified(new Path(splitsFile)))))) {
- Collection<Text> splits = client.tableOperations().listSplits(tableName,
- env.getBulkReducers() - 1);
- for (Text split : splits) {
- out.println(Base64.getEncoder().encodeToString(split.copyBytes()));
- }
- job.setNumReduceTasks(splits.size() + 1);
- }
-
- job.setPartitionerClass(KeyRangePartitioner.class);
- KeyRangePartitioner.setSplitFile(job, fs.makeQualified(new Path(splitsFile)).toString());
-
- job.waitForCompletion(true);
- boolean success = job.isSuccessful();
-
- return success ? 0 : 1;
+ // make sure splits file is closed before continuing
+ try (PrintStream out = new PrintStream(
+ new BufferedOutputStream(fs.create(fs.makeQualified(new Path(splitsFile)))))) {
+ Collection<Text> splits = client.tableOperations().listSplits(tableName,
+ env.getBulkReducers() - 1);
+ splits.stream().map(Text::copyBytes).map(split -> Base64.getEncoder().encodeToString(split))
+ .forEach(out::println);
+ job.setNumReduceTasks(splits.size() + 1);
}
+
+ job.setPartitionerClass(KeyRangePartitioner.class);
+ KeyRangePartitioner.setSplitFile(job, fs.makeQualified(new Path(splitsFile)).toString());
+
+ job.waitForCompletion(true);
+ boolean success = job.isSuccessful();
+
+ return success ? 0 : 1;
}
}
diff --git a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousBatchWalker.java b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousBatchWalker.java
index fe6cfcf..5d613b5 100644
--- a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousBatchWalker.java
+++ b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousBatchWalker.java
@@ -18,7 +18,7 @@
import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
-import java.util.ArrayList;
+import java.time.Duration;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
@@ -26,6 +26,7 @@
import java.util.Random;
import java.util.Set;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.BatchScanner;
@@ -39,6 +40,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.collect.Sets;
+
public class ContinuousBatchWalker {
private static final Logger log = LoggerFactory.getLogger(ContinuousBatchWalker.class);
@@ -47,26 +50,22 @@
try (ContinuousEnv env = new ContinuousEnv(args)) {
Authorizations auths = env.getRandomAuthorizations();
AccumuloClient client = env.getAccumuloClient();
- Scanner scanner = ContinuousUtil.createScanner(client, env.getAccumuloTableName(), auths);
- int scanBatchSize = Integer.parseInt(env.getTestProperty(TestProps.CI_BW_BATCH_SIZE));
- scanner.setBatchSize(scanBatchSize);
- Random r = new Random();
-
- while (true) {
- BatchScanner bs = client.createBatchScanner(env.getAccumuloTableName(), auths);
-
- Set<Text> batch = getBatch(scanner, env.getRowMin(), env.getRowMax(), scanBatchSize, r);
- List<Range> ranges = new ArrayList<>(batch.size());
-
- for (Text row : batch) {
- ranges.add(new Range(row));
+ try (Scanner scanner = ContinuousUtil.createScanner(client, env.getAccumuloTableName(),
+ auths)) {
+ int scanBatchSize = Integer.parseInt(env.getTestProperty(TestProps.CI_BW_BATCH_SIZE));
+ scanner.setBatchSize(scanBatchSize);
+ Duration bwSleep = Duration
+ .ofMillis(Integer.parseInt(env.getTestProperty(TestProps.CI_BW_SLEEP_MS)));
+ while (true) {
+ try (BatchScanner bs = client.createBatchScanner(env.getAccumuloTableName(), auths)) {
+ Set<Text> batch = getBatch(scanner, env.getRowMin(), env.getRowMax(), scanBatchSize,
+ env.getRandom());
+ List<Range> ranges = batch.stream().map(Range::new).collect(Collectors.toList());
+ runBatchScan(scanBatchSize, bs, batch, ranges);
+ }
+ sleepUninterruptibly(bwSleep);
}
-
- runBatchScan(scanBatchSize, bs, batch, ranges);
-
- int bwSleepMs = Integer.parseInt(env.getTestProperty(TestProps.CI_BW_SLEEP_MS));
- sleepUninterruptibly(bwSleepMs, TimeUnit.MILLISECONDS);
}
}
}
@@ -95,16 +94,13 @@
long t2 = System.currentTimeMillis();
if (!rowsSeen.equals(batch)) {
- HashSet<Text> copy1 = new HashSet<>(rowsSeen);
- HashSet<Text> copy2 = new HashSet<>(batch);
+ Set<Text> extraSeen = Sets.difference(rowsSeen, batch);
+ Set<Text> notSeen = Sets.difference(batch, rowsSeen);
- copy1.removeAll(batch);
- copy2.removeAll(rowsSeen);
-
- log.info("DIF {} {} {}", t1, copy1.size(), copy2.size());
- log.info("DIF {} {} {}", t1, copy1.size(), copy2.size());
- log.info("Extra seen : {}", copy1);
- log.info("Not seen : {}", copy2);
+ log.info("DIF {} {} {}", t1, extraSeen.size(), notSeen.size());
+ log.info("DIF {} {} {}", t1, extraSeen.size(), notSeen.size());
+ log.info("Extra seen : {}", extraSeen);
+ log.info("Not seen : {}", notSeen);
} else {
log.info("BRQ {} {} {} {} {}", t1, (t2 - t1), rowsSeen.size(), count,
(rowsSeen.size() / ((t2 - t1) / 1000.0)));
@@ -124,7 +120,7 @@
}
}
- private static HashSet<Text> rowsToQuery = new HashSet<>();
+ private static final HashSet<Text> rowsToQuery = new HashSet<>();
private static Set<Text> getBatch(Scanner scanner, long min, long max, int batchSize, Random r) {
diff --git a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousEnv.java b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousEnv.java
index 4896026..ed36583 100644
--- a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousEnv.java
+++ b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousEnv.java
@@ -17,10 +17,10 @@
package org.apache.accumulo.testing.continuous;
-import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.List;
-import java.util.Random;
+import java.util.stream.Collectors;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.testing.TestEnv;
@@ -43,10 +43,8 @@
if (authValue == null || authValue.trim().isEmpty()) {
authList = Collections.singletonList(Authorizations.EMPTY);
} else {
- authList = new ArrayList<>();
- for (String a : authValue.split("\\|")) {
- authList.add(new Authorizations(a.split(",")));
- }
+ authList = Arrays.stream(authValue.split("\\|")).map(a -> a.split(","))
+ .map(Authorizations::new).collect(Collectors.toList());
}
}
return authList;
@@ -56,8 +54,7 @@
* @return random authorization
*/
Authorizations getRandomAuthorizations() {
- Random r = new Random();
- return getAuthList().get(r.nextInt(getAuthList().size()));
+ return getAuthList().get(this.getRandom().nextInt(getAuthList().size()));
}
public long getRowMin() {
diff --git a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousIngest.java b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousIngest.java
index 73eb545..4edd581 100644
--- a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousIngest.java
+++ b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousIngest.java
@@ -98,8 +98,9 @@
public static void main(String[] args) throws Exception {
- try (ContinuousEnv env = new ContinuousEnv(args);
- AccumuloClient client = env.getAccumuloClient()) {
+ try (ContinuousEnv env = new ContinuousEnv(args)) {
+
+ AccumuloClient client = env.getAccumuloClient();
final long rowMin = env.getRowMin();
final long rowMax = env.getRowMax();
@@ -112,8 +113,6 @@
"Consult the README and create the table before starting ingest.");
}
- Random rand = new Random();
-
byte[] ingestInstanceId = UUID.randomUUID().toString().getBytes(UTF_8);
log.info("Ingest instance ID: {} current time: {}ms", new String(ingestInstanceId, UTF_8),
System.currentTimeMillis());
@@ -153,7 +152,7 @@
if (pauseEnabled) {
lastPauseNs = System.nanoTime();
- pauseWaitSec = getPause(rand);
+ pauseWaitSec = getPause(env.getRandom());
log.info("PAUSING enabled");
log.info("INGESTING for {}s", pauseWaitSec);
}
@@ -164,17 +163,17 @@
try (BatchWriter bw = client.createBatchWriter(tableName)) {
out: while (true) {
- ColumnVisibility cv = getVisibility(rand);
+ ColumnVisibility cv = getVisibility(env.getRandom());
// generate sets nodes that link to previous set of nodes
for (int depth = 0; depth < maxDepth; depth++) {
for (int index = 0; index < flushInterval; index++) {
- long rowLong = genLong(rowMin, rowMax, rand);
+ long rowLong = genLong(rowMin, rowMax, env.getRandom());
byte[] prevRow = depth == 0 ? null : genRow(nodeMap[depth - 1][index].row);
- int cfInt = rand.nextInt(maxColF);
- int cqInt = rand.nextInt(maxColQ);
+ int cfInt = env.getRandom().nextInt(maxColF);
+ int cqInt = env.getRandom().nextInt(maxColQ);
nodeMap[depth][index] = new MutationInfo(rowLong, cfInt, cqInt);
Mutation m = genMutation(rowLong, cfInt, cqInt, cv, ingestInstanceId, entriesWritten,
@@ -186,11 +185,11 @@
lastFlushTime = flush(bw, entriesWritten, entriesDeleted, lastFlushTime);
if (entriesWritten >= numEntries)
break out;
- pauseCheck(rand);
+ pauseCheck(env.getRandom());
}
// random chance that the entries will be deleted
- final boolean delete = rand.nextFloat() < deleteProbability;
+ final boolean delete = env.getRandom().nextFloat() < deleteProbability;
// if the previously written entries are scheduled to be deleted
if (delete) {
@@ -205,7 +204,7 @@
bw.addMutation(m);
}
lastFlushTime = flush(bw, entriesWritten, entriesDeleted, lastFlushTime);
- pauseCheck(rand);
+ pauseCheck(env.getRandom());
}
} else {
// create one big linked list, this makes all the first inserts point to something
@@ -222,7 +221,7 @@
if (entriesWritten >= numEntries)
break out;
- pauseCheck(rand);
+ pauseCheck(env.getRandom());
}
}
}
diff --git a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousInputFormat.java b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousInputFormat.java
index 4e17bb7..2b63c2c 100644
--- a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousInputFormat.java
+++ b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousInputFormat.java
@@ -26,9 +26,10 @@
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.security.SecureRandom;
-import java.util.ArrayList;
import java.util.List;
import java.util.Random;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
import java.util.zip.CRC32;
import java.util.zip.Checksum;
@@ -81,11 +82,7 @@
@Override
public List<InputSplit> getSplits(JobContext jobContext) {
int numTask = jobContext.getConfiguration().getInt(PROP_MAP_TASK, 1);
- List<InputSplit> splits = new ArrayList<>();
- for (int i = 0; i < numTask; i++) {
- splits.add(new RandomSplit());
- }
- return splits;
+ return Stream.generate(RandomSplit::new).limit(numTask).collect(Collectors.toList());
}
public static void configure(Configuration conf, String uuid, ContinuousEnv env) {
diff --git a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousMoru.java b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousMoru.java
index 99e2fdf..12e86b7 100644
--- a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousMoru.java
+++ b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousMoru.java
@@ -63,7 +63,6 @@
private short max_cf;
private short max_cq;
private Random random;
- private String ingestInstanceId;
private byte[] iiId;
private long count;
@@ -81,7 +80,7 @@
this.max_cq = (short) max_cq;
random = new Random();
- ingestInstanceId = context.getConfiguration().get(CI_ID);
+ final String ingestInstanceId = context.getConfiguration().get(CI_ID);
iiId = ingestInstanceId.getBytes(UTF_8);
count = 0;
diff --git a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousScanner.java b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousScanner.java
index 27ab4c4..808779f 100644
--- a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousScanner.java
+++ b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousScanner.java
@@ -19,17 +19,11 @@
import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
import static java.nio.charset.StandardCharsets.UTF_8;
-import java.util.Iterator;
-import java.util.Map.Entry;
-import java.util.Random;
import java.util.concurrent.TimeUnit;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.testing.TestProps;
import org.apache.hadoop.io.Text;
import org.slf4j.Logger;
@@ -42,56 +36,50 @@
try (ContinuousEnv env = new ContinuousEnv(args)) {
- Random r = new Random();
-
- long distance = 1000000000000l;
+ long distance = 1_000_000_000_000L;
AccumuloClient client = env.getAccumuloClient();
- Authorizations auths = env.getRandomAuthorizations();
- Scanner scanner = ContinuousUtil.createScanner(client, env.getAccumuloTableName(), auths);
int numToScan = Integer.parseInt(env.getTestProperty(TestProps.CI_SCANNER_ENTRIES));
int scannerSleepMs = Integer.parseInt(env.getTestProperty(TestProps.CI_SCANNER_SLEEP_MS));
double delta = Math.min(.05, .05 / (numToScan / 1000.0));
+ try (Scanner scanner = ContinuousUtil.createScanner(client, env.getAccumuloTableName(),
+ env.getRandomAuthorizations())) {
+ while (true) {
+ long startRow = ContinuousIngest.genLong(env.getRowMin(), env.getRowMax() - distance,
+ env.getRandom());
+ byte[] scanStart = ContinuousIngest.genRow(startRow);
+ byte[] scanStop = ContinuousIngest.genRow(startRow + distance);
- while (true) {
- long startRow = ContinuousIngest.genLong(env.getRowMin(), env.getRowMax() - distance, r);
- byte[] scanStart = ContinuousIngest.genRow(startRow);
- byte[] scanStop = ContinuousIngest.genRow(startRow + distance);
+ scanner.setRange(new Range(new Text(scanStart), new Text(scanStop)));
- scanner.setRange(new Range(new Text(scanStart), new Text(scanStop)));
+ long t1 = System.currentTimeMillis();
- int count = 0;
- Iterator<Entry<Key,Value>> iter = scanner.iterator();
+ long count = scanner.stream()
+ .peek(entry -> ContinuousWalk.validate(entry.getKey(), entry.getValue())).count();
- long t1 = System.currentTimeMillis();
+ long t2 = System.currentTimeMillis();
- while (iter.hasNext()) {
- Entry<Key,Value> entry = iter.next();
- ContinuousWalk.validate(entry.getKey(), entry.getValue());
- count++;
- }
-
- long t2 = System.currentTimeMillis();
-
- if (count < (1 - delta) * numToScan || count > (1 + delta) * numToScan) {
- if (count == 0) {
- distance = distance * 10;
- if (distance < 0)
- distance = 1000000000000l;
- } else {
- double ratio = (double) numToScan / count;
- // move ratio closer to 1 to make change slower
- ratio = ratio - (ratio - 1.0) * (2.0 / 3.0);
- distance = (long) (ratio * distance);
+ if (count < (1 - delta) * numToScan || count > (1 + delta) * numToScan) {
+ if (count == 0) {
+ distance = distance * 10;
+ if (distance < 0)
+ distance = 1_000_000_000_000L;
+ } else {
+ double ratio = (double) numToScan / count;
+ // move ratio closer to 1 to make change slower
+ ratio = ratio - (ratio - 1.0) * (2.0 / 3.0);
+ distance = (long) (ratio * distance);
+ }
}
- }
- log.debug("SCN {} {} {} {}", t1, new String(scanStart, UTF_8), (t2 - t1), count);
+ log.debug("SCAN - start: {}ms, start row: {}, duration: {}ms, total scanned: {}", t1,
+ new String(scanStart, UTF_8), (t2 - t1), count);
- if (scannerSleepMs > 0) {
- sleepUninterruptibly(scannerSleepMs, TimeUnit.MILLISECONDS);
+ if (scannerSleepMs > 0) {
+ sleepUninterruptibly(scannerSleepMs, TimeUnit.MILLISECONDS);
+ }
}
}
}
diff --git a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousVerify.java b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousVerify.java
index 10d224e..05ace17 100644
--- a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousVerify.java
+++ b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousVerify.java
@@ -22,8 +22,9 @@
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.Random;
+import java.util.List;
import java.util.Set;
+import java.util.stream.Collectors;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.data.Key;
@@ -119,15 +120,11 @@
}
if (defCount == 0 && refs.size() > 0) {
- StringBuilder sb = new StringBuilder();
- String comma = "";
- for (Long ref : refs) {
- sb.append(comma);
- comma = ",";
- sb.append(new String(ContinuousIngest.genRow(ref), UTF_8));
- }
+ List<String> rowList = refs.stream().map(ContinuousIngest::genRow)
+ .map(row -> new String(row, UTF_8)).collect(Collectors.toList());
+ String rows = String.join(",", rowList);
- context.write(new Text(ContinuousIngest.genRow(key.get())), new Text(sb.toString()));
+ context.write(new Text(ContinuousIngest.genRow(key.get())), new Text(rows));
context.getCounter(Counts.UNDEFINED).increment(1L);
} else if (defCount > 0 && refs.size() == 0) {
@@ -164,8 +161,8 @@
String table;
if (scanOffline) {
- Random random = new Random();
- clone = tableName + "_" + String.format("%016x", (random.nextLong() & 0x7fffffffffffffffL));
+ clone = tableName + "_"
+ + String.format("%016x", (env.getRandom().nextLong() & 0x7fffffffffffffffL));
client.tableOperations().clone(tableName, clone, true, new HashMap<>(), new HashSet<>());
ranges = client.tableOperations().splitRangeByTablets(tableName, new Range(), maxMaps);
client.tableOperations().offline(clone);
diff --git a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousWalk.java b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousWalk.java
index 54f9786..28446dd 100644
--- a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousWalk.java
+++ b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousWalk.java
@@ -51,47 +51,46 @@
AccumuloClient client = env.getAccumuloClient();
- Random r = new Random();
-
ArrayList<Value> values = new ArrayList<>();
int sleepTime = Integer.parseInt(env.getTestProperty(TestProps.CI_WALKER_SLEEP_MS));
while (true) {
- Scanner scanner = ContinuousUtil.createScanner(client, env.getAccumuloTableName(),
- env.getRandomAuthorizations());
- String row = findAStartRow(env.getRowMin(), env.getRowMax(), scanner, r);
+ try (Scanner scanner = ContinuousUtil.createScanner(client, env.getAccumuloTableName(),
+ env.getRandomAuthorizations())) {
+ String row = findAStartRow(env.getRowMin(), env.getRowMax(), scanner, env.getRandom());
- while (row != null) {
+ while (row != null) {
- values.clear();
+ values.clear();
- long t1 = System.currentTimeMillis();
+ long t1 = System.currentTimeMillis();
- scanner.setRange(new Range(new Text(row)));
- for (Entry<Key,Value> entry : scanner) {
- validate(entry.getKey(), entry.getValue());
- values.add(entry.getValue());
- }
+ scanner.setRange(new Range(new Text(row)));
+ for (Entry<Key,Value> entry : scanner) {
+ validate(entry.getKey(), entry.getValue());
+ values.add(entry.getValue());
+ }
- long t2 = System.currentTimeMillis();
+ long t2 = System.currentTimeMillis();
- log.debug("SRQ {} {} {} {}", t1, row, (t2 - t1), values.size());
+ log.debug("SRQ {} {} {} {}", t1, row, (t2 - t1), values.size());
- if (values.size() > 0) {
- row = getPrevRow(values.get(r.nextInt(values.size())));
- } else {
- log.debug("MIS {} {}", t1, row);
- log.debug("MIS {} {}", t1, row);
- row = null;
+ if (values.size() > 0) {
+ row = getPrevRow(values.get(env.getRandom().nextInt(values.size())));
+ } else {
+ log.debug("MIS {} {}", t1, row);
+ log.debug("MIS {} {}", t1, row);
+ row = null;
+ }
+
+ if (sleepTime > 0)
+ Thread.sleep(sleepTime);
}
if (sleepTime > 0)
Thread.sleep(sleepTime);
}
-
- if (sleepTime > 0)
- Thread.sleep(sleepTime);
}
}
}
diff --git a/src/main/java/org/apache/accumulo/testing/continuous/CreateTable.java b/src/main/java/org/apache/accumulo/testing/continuous/CreateTable.java
index 831d315..13a118f 100644
--- a/src/main/java/org/apache/accumulo/testing/continuous/CreateTable.java
+++ b/src/main/java/org/apache/accumulo/testing/continuous/CreateTable.java
@@ -38,8 +38,8 @@
public static void main(String[] args) throws Exception {
try (ContinuousEnv env = new ContinuousEnv(args)) {
-
AccumuloClient client = env.getAccumuloClient();
+
String tableName = env.getAccumuloTableName();
if (client.tableOperations().exists(tableName)) {
log.error("Accumulo table {} already exists", tableName);
diff --git a/src/main/java/org/apache/accumulo/testing/continuous/TimeBinner.java b/src/main/java/org/apache/accumulo/testing/continuous/TimeBinner.java
index 33ea7ef..93b46da 100644
--- a/src/main/java/org/apache/accumulo/testing/continuous/TimeBinner.java
+++ b/src/main/java/org/apache/accumulo/testing/continuous/TimeBinner.java
@@ -23,9 +23,7 @@
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.accumulo.testing.cli.ClientOpts.TimeConverter;
import org.apache.accumulo.testing.cli.Help;
@@ -156,39 +154,36 @@
}
}
- TreeMap<Long,DoubleWrapper> sorted = new TreeMap<>(aggregation1);
-
- Set<Entry<Long,DoubleWrapper>> es = sorted.entrySet();
-
- double cumulative = 0;
- for (Entry<Long,DoubleWrapper> entry : es) {
- String value;
+ AtomicReference<Double> cumulative = new AtomicReference<>((double) 0);
+ aggregation1.entrySet().stream().sorted().forEach(entry -> {
+ final String value;
+ final var currentKey = entry.getKey();
+ final var currentValue = entry.getValue();
switch (operation) {
case AMM_HACK1:
case AMM: {
- DoubleWrapper countdw = aggregation2.get(entry.getKey());
- value = "" + (entry.getValue().d / countdw.d) + " " + aggregation3.get(entry.getKey()).d
- + " " + aggregation4.get(entry.getKey()).d;
+ DoubleWrapper countdw = aggregation2.get(currentKey);
+ value = "" + (currentValue.d / countdw.d) + " " + aggregation3.get(currentKey).d + " "
+ + aggregation4.get(currentKey).d;
break;
}
case AVG: {
- DoubleWrapper countdw = aggregation2.get(entry.getKey());
- value = "" + (entry.getValue().d / countdw.d);
+ DoubleWrapper countdw = aggregation2.get(currentKey);
+ value = "" + (currentValue.d / countdw.d);
break;
}
case CUMULATIVE: {
- cumulative += entry.getValue().d;
+ cumulative.updateAndGet(v -> v + currentValue.d);
value = "" + cumulative;
break;
}
default:
- value = "" + entry.getValue().d;
+ value = "" + currentValue.d;
}
- log.info(sdf.format(new Date(entry.getKey())) + " " + value);
- }
-
+ log.info("{} {}", sdf.format(new Date(currentKey)), value);
+ });
}
private static void increment(long time, HashMap<Long,DoubleWrapper> aggregation, double amount) {
diff --git a/src/main/java/org/apache/accumulo/testing/continuous/UndefinedAnalyzer.java b/src/main/java/org/apache/accumulo/testing/continuous/UndefinedAnalyzer.java
index 00bbe56..0708e6f 100644
--- a/src/main/java/org/apache/accumulo/testing/continuous/UndefinedAnalyzer.java
+++ b/src/main/java/org/apache/accumulo/testing/continuous/UndefinedAnalyzer.java
@@ -25,13 +25,16 @@
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Optional;
import java.util.TreeMap;
+import java.util.stream.Collectors;
import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
@@ -172,67 +175,61 @@
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
- if (masterLogs != null) {
- for (File masterLog : masterLogs) {
+ if (masterLogs == null)
+ return;
- String line;
- try (BufferedReader reader = Files.newBufferedReader(masterLog.toPath())) {
- while ((line = reader.readLine()) != null) {
- String[] tokens = line.split("\\s+");
- String day = tokens[0];
- String time = tokens[1];
- String tablet = tokens[2];
- String server = tokens[3];
+ for (File masterLog : masterLogs) {
- int pos1 = -1;
- int pos2 = -1;
- int pos3 = -1;
+ String line;
+ try (BufferedReader reader = Files.newBufferedReader(masterLog.toPath())) {
+ while ((line = reader.readLine()) != null) {
+ String[] tokens = line.split("\\s+");
+ String day = tokens[0];
+ String time = tokens[1];
+ String tablet = tokens[2];
+ String server = tokens[3];
- for (int i = 0; i < tablet.length(); i++) {
- if (tablet.charAt(i) == '<' || tablet.charAt(i) == ';') {
- if (pos1 == -1) {
- pos1 = i;
- } else if (pos2 == -1) {
- pos2 = i;
- } else {
- pos3 = i;
- }
+ int pos1 = -1;
+ int pos2 = -1;
+ int pos3 = -1;
+
+ for (int i = 0; i < tablet.length(); i++) {
+ if (tablet.charAt(i) == '<' || tablet.charAt(i) == ';') {
+ if (pos1 == -1) {
+ pos1 = i;
+ } else if (pos2 == -1) {
+ pos2 = i;
+ } else {
+ pos3 = i;
}
}
+ }
- if (pos1 > 0 && pos2 > 0 && pos3 == -1) {
- String tid = tablet.substring(0, pos1);
- String endRow = tablet.charAt(pos1) == '<' ? "8000000000000000"
- : tablet.substring(pos1 + 1, pos2);
- String prevEndRow = tablet.charAt(pos2) == '<' ? "" : tablet.substring(pos2 + 1);
- if (tid.equals(tableId)) {
- Date date = sdf.parse(day + " " + time);
- assignments.add(
- new TabletAssignment(tablet, endRow, prevEndRow, server, date.getTime()));
+ if (pos1 > 0 && pos2 > 0 && pos3 == -1) {
+ String tid = tablet.substring(0, pos1);
+ String endRow = tablet.charAt(pos1) == '<' ? "8000000000000000"
+ : tablet.substring(pos1 + 1, pos2);
+ String prevEndRow = tablet.charAt(pos2) == '<' ? "" : tablet.substring(pos2 + 1);
+ if (tid.equals(tableId)) {
+ Date date = sdf.parse(day + " " + time);
+ assignments
+ .add(new TabletAssignment(tablet, endRow, prevEndRow, server, date.getTime()));
- }
- } else if (!tablet.startsWith("!0")) {
- logger.error("Cannot parse tablet {}", tablet);
}
+ } else if (!tablet.startsWith("!0")) {
+ logger.error("Cannot parse tablet {}", tablet);
}
}
}
}
}
- TabletAssignment findMostRecentAssignment(String row, long time1, long time2) {
-
- long latest = Long.MIN_VALUE;
- TabletAssignment ret = null;
-
- for (TabletAssignment assignment : assignments) {
- if (assignment.contains(row) && assignment.time <= time2 && assignment.time > latest) {
- latest = assignment.time;
- ret = assignment;
- }
- }
-
- return ret;
+ TabletAssignment findMostRecentAssignment(String row, long time2) {
+ Optional<TabletAssignment> ret1 = assignments.stream()
+ .filter(assignment -> assignment.contains(row))
+ .filter(assignment -> assignment.time <= time2)
+ .max(Comparator.comparingLong(assignment -> assignment.time));
+ return ret1.orElse(null);
}
}
@@ -267,10 +264,9 @@
try (AccumuloClient client = Accumulo.newClient().from(opts.getClientProps()).build();
BatchScanner bscanner = client.createBatchScanner(opts.tableName, opts.auths)) {
- List<Range> refs = new ArrayList<>();
- for (UndefinedNode undefinedNode : undefs)
- refs.add(new Range(new Text(undefinedNode.ref)));
+ List<Range> refs = undefs.stream().map(node -> node.ref).map(Text::new).map(Range::new)
+ .collect(Collectors.toList());
bscanner.setRanges(refs);
@@ -292,11 +288,9 @@
List<String> refVals = refInfo.get(undefinedNode.ref);
if (refVals != null) {
- for (String refVal : refVals) {
+ refVals.stream().map(refVal -> refVal.split(":")).forEach(tokens -> {
TabletAssignment ta = null;
- String[] tokens = refVal.split(":");
-
String uuid = tokens[0];
String count = tokens[1];
@@ -311,7 +305,7 @@
if (times.hasNext()) {
long time1 = times.next();
t1 = sdf.format(new Date(time1));
- ta = tabletHistory.findMostRecentAssignment(undefinedNode.undef, time1, time2);
+ ta = tabletHistory.findMostRecentAssignment(undefinedNode.undef, time2);
}
}
}
@@ -322,7 +316,7 @@
logger.debug("{} {} {} {} {} {} {}", undefinedNode.undef, undefinedNode.ref,
ta.tablet, ta.server, uuid, t1, t2);
- }
+ });
} else {
logger.debug("{} {}", undefinedNode.undef, undefinedNode.ref);
}