Cleanup Continuous Ingest (#71)
* Add SLF4J logging
* Some minor cleanup
diff --git a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousBatchWalker.java b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousBatchWalker.java
index 6c02f7f..fe6cfcf 100644
--- a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousBatchWalker.java
+++ b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousBatchWalker.java
@@ -36,8 +36,11 @@
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.testing.TestProps;
import org.apache.hadoop.io.Text;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class ContinuousBatchWalker {
+ private static final Logger log = LoggerFactory.getLogger(ContinuousBatchWalker.class);
public static void main(String[] args) throws Exception {
@@ -98,13 +101,13 @@
copy1.removeAll(batch);
copy2.removeAll(rowsSeen);
- System.out.printf("DIF %d %d %d%n", t1, copy1.size(), copy2.size());
- System.err.printf("DIF %d %d %d%n", t1, copy1.size(), copy2.size());
- System.err.println("Extra seen : " + copy1);
- System.err.println("Not seen : " + copy2);
+ log.info("DIF {} {} {}", t1, copy1.size(), copy2.size());
+ log.info("DIF {} {} {}", t1, copy1.size(), copy2.size());
+ log.info("Extra seen : {}", copy1);
+ log.info("Not seen : {}", copy2);
} else {
- System.out.printf("BRQ %d %d %d %d %d%n", t1, (t2 - t1), rowsSeen.size(), count,
- (int) (rowsSeen.size() / ((t2 - t1) / 1000.0)));
+ log.info("BRQ {} {} {} {} {}", t1, (t2 - t1), rowsSeen.size(), count,
+ (rowsSeen.size() / ((t2 - t1) / 1000.0)));
}
}
@@ -143,7 +146,7 @@
long t2 = System.currentTimeMillis();
- System.out.println("FSB " + t1 + " " + (t2 - t1) + " " + count);
+ log.info("FSB {} {} {}", t1, (t2 - t1), count);
sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
}
diff --git a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousIngest.java b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousIngest.java
index 65779d9..c5229f7 100644
--- a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousIngest.java
+++ b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousIngest.java
@@ -17,6 +17,10 @@
package org.apache.accumulo.testing.continuous;
import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.accumulo.testing.TestProps.CI_INGEST_PAUSE_DURATION_MAX;
+import static org.apache.accumulo.testing.TestProps.CI_INGEST_PAUSE_DURATION_MIN;
+import static org.apache.accumulo.testing.TestProps.CI_INGEST_PAUSE_WAIT_MAX;
+import static org.apache.accumulo.testing.TestProps.CI_INGEST_PAUSE_WAIT_MIN;
import java.util.ArrayList;
import java.util.Collections;
@@ -34,8 +38,6 @@
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.security.ColumnVisibility;
-// import org.apache.accumulo.core.trace.Trace;
-// import org.apache.accumulo.core.trace.TraceSamplers;
import org.apache.accumulo.core.util.FastFormat;
import org.apache.accumulo.testing.TestProps;
import org.slf4j.Logger;
@@ -62,24 +64,14 @@
return Boolean.parseBoolean(value);
}
- private static int getPauseWaitSec(Properties props, Random rand) {
- int waitMin = Integer.parseInt(props.getProperty(TestProps.CI_INGEST_PAUSE_WAIT_MIN));
- int waitMax = Integer.parseInt(props.getProperty(TestProps.CI_INGEST_PAUSE_WAIT_MAX));
- Preconditions.checkState(waitMax >= waitMin && waitMin > 0);
- if (waitMax == waitMin) {
- return waitMin;
+ private static int getPause(Properties props, Random rand, String minProp, String maxProp) {
+ int min = Integer.parseInt(props.getProperty(minProp));
+ int max = Integer.parseInt(props.getProperty(maxProp));
+ Preconditions.checkState(max >= min && min > 0);
+ if (max == min) {
+ return min;
}
- return (rand.nextInt(waitMax - waitMin) + waitMin);
- }
-
- private static int getPauseDurationSec(Properties props, Random rand) {
- int durationMin = Integer.parseInt(props.getProperty(TestProps.CI_INGEST_PAUSE_DURATION_MIN));
- int durationMax = Integer.parseInt(props.getProperty(TestProps.CI_INGEST_PAUSE_DURATION_MAX));
- Preconditions.checkState(durationMax >= durationMin && durationMin > 0);
- if (durationMax == durationMin) {
- return durationMin;
- }
- return (rand.nextInt(durationMax - durationMin) + durationMin);
+ return (rand.nextInt(max - min) + min);
}
private static int getFlushEntries(Properties props) {
@@ -90,11 +82,12 @@
if (pauseEnabled(props)) {
long elapsedNano = System.nanoTime() - lastPauseNs;
if (elapsedNano > (TimeUnit.SECONDS.toNanos(pauseWaitSec))) {
- long pauseDurationSec = getPauseDurationSec(props, rand);
+ long pauseDurationSec = getPause(props, rand, CI_INGEST_PAUSE_DURATION_MIN,
+ CI_INGEST_PAUSE_DURATION_MAX);
log.info("PAUSING for " + pauseDurationSec + "s");
Thread.sleep(TimeUnit.SECONDS.toMillis(pauseDurationSec));
lastPauseNs = System.nanoTime();
- pauseWaitSec = getPauseWaitSec(props, rand);
+ pauseWaitSec = getPause(props, rand, CI_INGEST_PAUSE_WAIT_MIN, CI_INGEST_PAUSE_WAIT_MAX);
log.info("INGESTING for " + pauseWaitSec + "s");
}
}
@@ -128,7 +121,6 @@
}
BatchWriter bw = client.createBatchWriter(tableName);
- // bw = Trace.wrapAll(bw, TraceSamplers.countSampler(1024));
Random r = new Random();
@@ -141,12 +133,9 @@
final int flushInterval = getFlushEntries(env.getTestProperties());
final int maxDepth = 25;
- // always want to point back to flushed data. This way the previous item
- // should
- // always exist in accumulo when verifying data. To do this make insert
- // N point
- // back to the row from insert (N - flushInterval). The array below is
- // used to keep
+ // always want to point back to flushed data. This way the previous item should
+ // always exist in accumulo when verifying data. To do this make insert N point
+ // back to the row from insert (N - flushInterval). The array below is used to keep
// track of this.
long[] prevRows = new long[flushInterval];
long[] firstRows = new long[flushInterval];
@@ -163,7 +152,7 @@
Properties testProps = env.getTestProperties();
if (pauseEnabled(testProps)) {
lastPauseNs = System.nanoTime();
- pauseWaitSec = getPauseWaitSec(testProps, r);
+ pauseWaitSec = getPause(testProps, r, CI_INGEST_PAUSE_WAIT_MIN, CI_INGEST_PAUSE_WAIT_MAX);
log.info("PAUSING enabled");
log.info("INGESTING for " + pauseWaitSec + "s");
}
@@ -192,8 +181,7 @@
if (count >= numEntries)
break out;
- // generate subsequent sets of nodes that link to previous set of
- // nodes
+ // generate subsequent sets of nodes that link to previous set of nodes
for (int depth = 1; depth < maxDepth; depth++) {
for (int index = 0; index < flushInterval; index++) {
long rowLong = genLong(rowMin, rowMax, r);
@@ -211,8 +199,7 @@
pauseCheck(testProps, r);
}
- // create one big linked list, this makes all of the first inserts
- // point to something
+ // create one big linked list, this makes all of the first inserts point to something
for (int index = 0; index < flushInterval - 1; index++) {
Mutation m = genMutation(firstRows[index], firstColFams[index], firstColQuals[index], cv,
ingestInstanceId, count, genRow(prevRows[index + 1]), checksum);
@@ -306,9 +293,6 @@
cksum.getValue();
FastFormat.toZeroPaddedString(val, index, cksum.getValue(), 8, 16, EMPTY_BYTES);
}
-
- // System.out.println("val "+new String(val));
-
return val;
}
}
diff --git a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousScanner.java b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousScanner.java
index d453ec7..27ab4c4 100644
--- a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousScanner.java
+++ b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousScanner.java
@@ -32,8 +32,11 @@
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.testing.TestProps;
import org.apache.hadoop.io.Text;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class ContinuousScanner {
+ private static final Logger log = LoggerFactory.getLogger(ContinuousScanner.class);
public static void main(String[] args) throws Exception {
@@ -72,9 +75,6 @@
long t2 = System.currentTimeMillis();
- // System.out.println("P1 " +count +" "+((1-delta) *
- // numToScan)+" "+((1+delta) * numToScan)+" "+numToScan);
-
if (count < (1 - delta) * numToScan || count > (1 + delta) * numToScan) {
if (count == 0) {
distance = distance * 10;
@@ -86,12 +86,9 @@
ratio = ratio - (ratio - 1.0) * (2.0 / 3.0);
distance = (long) (ratio * distance);
}
-
- // System.out.println("P2 "+delta
- // +" "+numToScan+" "+distance+" "+((double)numToScan/count ));
}
- System.out.printf("SCN %d %s %d %d%n", t1, new String(scanStart, UTF_8), (t2 - t1), count);
+ log.debug("SCN {} {} {} {}", t1, new String(scanStart, UTF_8), (t2 - t1), count);
if (scannerSleepMs > 0) {
sleepUninterruptibly(scannerSleepMs, TimeUnit.MILLISECONDS);
diff --git a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousVerify.java b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousVerify.java
index 880643d..14015e3 100644
--- a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousVerify.java
+++ b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousVerify.java
@@ -50,7 +50,6 @@
* referenced nodes are defined.
*/
public class ContinuousVerify extends Configured implements Tool {
-
public static final VLongWritable DEF = new VLongWritable(-1);
public static class CMapper extends Mapper<Key,Value,LongWritable,VLongWritable> {
@@ -75,7 +74,7 @@
if (corrupt < 1000) {
log.error("Bad checksum : " + key);
} else if (corrupt == 1000) {
- System.out.println("Too many bad checksums, not printing anymore!");
+ log.error("Too many bad checksums, not printing anymore!");
}
corrupt++;
return;
diff --git a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousWalk.java b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousWalk.java
index 11291b1..54f9786 100644
--- a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousWalk.java
+++ b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousWalk.java
@@ -28,12 +28,13 @@
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
-// import org.apache.accumulo.core.trace.Span;
-// import org.apache.accumulo.core.trace.Trace;
import org.apache.accumulo.testing.TestProps;
import org.apache.hadoop.io.Text;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class ContinuousWalk {
+ private static final Logger log = LoggerFactory.getLogger(ContinuousWalk.class);
static class BadChecksumException extends RuntimeException {
private static final long serialVersionUID = 1L;
@@ -66,25 +67,22 @@
values.clear();
long t1 = System.currentTimeMillis();
- // Span span = Trace.on("walk");
- try {
- scanner.setRange(new Range(new Text(row)));
- for (Entry<Key,Value> entry : scanner) {
- validate(entry.getKey(), entry.getValue());
- values.add(entry.getValue());
- }
- } finally {
- // span.stop();
+
+ scanner.setRange(new Range(new Text(row)));
+ for (Entry<Key,Value> entry : scanner) {
+ validate(entry.getKey(), entry.getValue());
+ values.add(entry.getValue());
}
+
long t2 = System.currentTimeMillis();
- System.out.printf("SRQ %d %s %d %d%n", t1, row, (t2 - t1), values.size());
+ log.debug("SRQ {} {} {} {}", t1, row, (t2 - t1), values.size());
if (values.size() > 0) {
row = getPrevRow(values.get(r.nextInt(values.size())));
} else {
- System.out.printf("MIS %d %s%n", t1, row);
- System.err.printf("MIS %d %s%n", t1, row);
+ log.debug("MIS {} {}", t1, row);
+ log.debug("MIS {} {}", t1, row);
row = null;
}
diff --git a/src/main/java/org/apache/accumulo/testing/continuous/CreateTable.java b/src/main/java/org/apache/accumulo/testing/continuous/CreateTable.java
index fc3440a..a3f1457 100644
--- a/src/main/java/org/apache/accumulo/testing/continuous/CreateTable.java
+++ b/src/main/java/org/apache/accumulo/testing/continuous/CreateTable.java
@@ -16,6 +16,8 @@
*/
package org.apache.accumulo.testing.continuous;
+import static org.apache.accumulo.testing.TestProps.CI_COMMON_ACCUMULO_NUM_TABLETS;
+
import java.util.HashMap;
import java.util.Map;
import java.util.SortedSet;
@@ -25,8 +27,11 @@
import org.apache.accumulo.core.client.admin.NewTableConfiguration;
import org.apache.accumulo.testing.TestProps;
import org.apache.hadoop.io.Text;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class CreateTable {
+ private static final Logger log = LoggerFactory.getLogger(CreateTable.class);
public static void main(String[] args) throws Exception {
@@ -35,19 +40,18 @@
AccumuloClient client = env.getAccumuloClient();
String tableName = env.getAccumuloTableName();
if (client.tableOperations().exists(tableName)) {
- System.err.println("ERROR: Accumulo table '" + tableName + "' already exists");
+ log.error("Accumulo table {} already exists", tableName);
System.exit(-1);
}
- int numTablets = Integer
- .parseInt(env.getTestProperty(TestProps.CI_COMMON_ACCUMULO_NUM_TABLETS));
+ int numTablets = Integer.parseInt(env.getTestProperty(CI_COMMON_ACCUMULO_NUM_TABLETS));
if (numTablets < 1) {
- System.err.println("ERROR: numTablets < 1");
+ log.error("numTablets < 1");
System.exit(-1);
}
if (env.getRowMin() >= env.getRowMax()) {
- System.err.println("ERROR: min >= max");
+ log.error("min >= max");
System.exit(-1);
}
@@ -70,8 +74,7 @@
client.tableOperations().create(tableName, ntc);
- System.out
- .println("Created Accumulo table '" + tableName + "' with " + numTablets + " tablets");
+ log.info("Created Accumulo table {} with {} tablets", tableName, numTablets);
}
}
@@ -79,7 +82,7 @@
String[] props = env.getTestProperty(TestProps.CI_COMMON_ACCUMULO_TABLE_PROPS).split(" ");
Map<String,String> tableProps = new HashMap<>();
for (String prop : props) {
- System.out.println("prop" + prop);
+ log.debug("prop: {}", prop);
String[] kv = prop.split("=");
tableProps.put(kv[0], kv[1]);
}
diff --git a/src/main/java/org/apache/accumulo/testing/continuous/TimeBinner.java b/src/main/java/org/apache/accumulo/testing/continuous/TimeBinner.java
index f231294..33ea7ef 100644
--- a/src/main/java/org/apache/accumulo/testing/continuous/TimeBinner.java
+++ b/src/main/java/org/apache/accumulo/testing/continuous/TimeBinner.java
@@ -29,10 +29,13 @@
import org.apache.accumulo.testing.cli.ClientOpts.TimeConverter;
import org.apache.accumulo.testing.cli.Help;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.beust.jcommander.Parameter;
public class TimeBinner {
+ private static final Logger log = LoggerFactory.getLogger(TimeBinner.class);
enum Operation {
AVG,
@@ -149,7 +152,7 @@
}
} catch (Exception e) {
- System.err.println("Failed to process line : " + line + " " + e.getMessage());
+ log.error("Failed to process line: {} {}", line, e.getMessage());
}
}
@@ -183,7 +186,7 @@
value = "" + entry.getValue().d;
}
- System.out.println(sdf.format(new Date(entry.getKey())) + " " + value);
+ log.info(sdf.format(new Date(entry.getKey())) + " " + value);
}
}
diff --git a/src/main/java/org/apache/accumulo/testing/continuous/UndefinedAnalyzer.java b/src/main/java/org/apache/accumulo/testing/continuous/UndefinedAnalyzer.java
index ebf42dc..61d28ea 100644
--- a/src/main/java/org/apache/accumulo/testing/continuous/UndefinedAnalyzer.java
+++ b/src/main/java/org/apache/accumulo/testing/continuous/UndefinedAnalyzer.java
@@ -41,6 +41,8 @@
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.testing.cli.ClientOpts;
import org.apache.hadoop.io.Text;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.beust.jcommander.Parameter;
@@ -51,6 +53,7 @@
* have old dates in them.
*/
public class UndefinedAnalyzer {
+ private static final Logger logger = LoggerFactory.getLogger(UndefinedAnalyzer.class);
static class UndefinedNode {
@@ -90,7 +93,7 @@
String uuid = tokens[2];
if (flushes.containsKey(uuid)) {
- System.err.println("WARN Duplicate uuid " + log);
+ logger.error("WARN Duplicate uuid " + log);
return;
}
@@ -101,7 +104,7 @@
}
if (tm == null) {
- System.err.println("WARN Bad ingest log " + log);
+ logger.error("WARN Bad ingest log " + log);
return;
}
@@ -203,16 +206,13 @@
: tablet.substring(pos1 + 1, pos2);
String prevEndRow = tablet.charAt(pos2) == '<' ? "" : tablet.substring(pos2 + 1);
if (tid.equals(tableId)) {
- // System.out.println(" "+server+" "+tid+" "+endRow+" "+prevEndRow);
Date date = sdf.parse(day + " " + time);
- // System.out.println(" "+date);
-
assignments.add(
new TabletAssignment(tablet, endRow, prevEndRow, server, date.getTime()));
}
} else if (!tablet.startsWith("!0")) {
- System.err.println("Cannot parse tablet " + tablet);
+ logger.error("Cannot parse tablet {}", tablet);
}
}
}
@@ -317,15 +317,14 @@
}
if (ta == null)
- System.out.println(
- undefinedNode.undef + " " + undefinedNode.ref + " " + uuid + " " + t1 + " " + t2);
+ logger.debug("{} {} {} {} {}", undefinedNode.undef, undefinedNode.ref, uuid, t1, t2);
else
- System.out.println(undefinedNode.undef + " " + undefinedNode.ref + " " + ta.tablet
- + " " + ta.server + " " + uuid + " " + t1 + " " + t2);
+ logger.debug("{} {} {} {} {}", undefinedNode.undef, undefinedNode.ref, ta.tablet,
+ ta.server, uuid, t1, t2);
}
} else {
- System.out.println(undefinedNode.undef + " " + undefinedNode.ref);
+ logger.debug("{} {}", undefinedNode.undef, undefinedNode.ref);
}
}
}