Merge branch '2.1'
diff --git a/bin/cingest b/bin/cingest
index 4ef312c..26b20d4 100755
--- a/bin/cingest
+++ b/bin/cingest
@@ -39,6 +39,7 @@
manysplits Repeatedly lowers the split threshold on a table to create
many splits in order to test split performance
bulk Create RFiles in a Map Reduce job and calls importDirectory if successful
+ corrupt Corrupts the first entry after the minimum row. Use -o test.ci.ingest.row.min to change the minimum.
EOF
}
@@ -81,6 +82,9 @@
fi
ci_main="${ci_package}.BulkIngest"
;;
+ corrupt)
+ ci_main="${ci_package}.CorruptEntry"
+ ;;
*)
echo "Unknown application: $1"
print_usage
diff --git a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousWalk.java b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousWalk.java
index 94a53bb..0dc13f5 100644
--- a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousWalk.java
+++ b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousWalk.java
@@ -46,6 +46,9 @@
super(msg);
}
+ public BadChecksumException(String msg, Exception nfe) {
+ super(msg, nfe);
+ }
}
public static void main(String[] args) throws Exception {
@@ -154,7 +157,7 @@
return null;
}
- private static int getChecksumOffset(byte[] val) {
+ static int getChecksumOffset(byte[] val) {
if (val[val.length - 1] != ':') {
if (val[val.length - 9] != ':')
throw new IllegalArgumentException(new String(val, UTF_8));
@@ -169,7 +172,12 @@
if (ckOff < 0)
return;
- long storedCksum = Long.parseLong(new String(value.get(), ckOff, 8, UTF_8), 16);
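+ // a value whose checksum was corrupted (e.g. by CorruptEntry) may contain non-hex chars,
+ // so report a parse failure as a bad checksum instead of an unchecked NumberFormatException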
+ long storedCksum;
+ try {
+ storedCksum = Long.parseLong(new String(value.get(), ckOff, 8, UTF_8), 16);
+ } catch (NumberFormatException nfe) {
+ throw new BadChecksumException("Checksum invalid " + key + " " + value, nfe);
+ }
CRC32 cksum = new CRC32();
diff --git a/src/main/java/org/apache/accumulo/testing/continuous/CorruptEntry.java b/src/main/java/org/apache/accumulo/testing/continuous/CorruptEntry.java
new file mode 100644
index 0000000..bd71fa1
--- /dev/null
+++ b/src/main/java/org/apache/accumulo/testing/continuous/CorruptEntry.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.testing.continuous;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.hadoop.io.Text;
+
+public class CorruptEntry {
+ public static void main(String[] args) throws Exception {
+ try (ContinuousEnv env = new ContinuousEnv(args);
+ AccumuloClient client = env.getAccumuloClient();
+ var scanner = client.createScanner(env.getAccumuloTableName());
+ var writer = client.createBatchWriter(env.getAccumuloTableName())) {
+ final long rowMin = env.getRowMin();
+
+ var startRow = ContinuousIngest.genRow(rowMin);
+ scanner.setRange(new Range(new Text(startRow), null));
+ var iter = scanner.iterator();
+ if (iter.hasNext()) {
+ var entry = iter.next();
+ byte[] val = entry.getValue().get();
+ int offset = ContinuousWalk.getChecksumOffset(val);
+ if (offset >= 0) {
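+ // the stored checksum is 8 hex chars; changing every char guarantees the stored value no
+ // longer matches, and may introduce a non-hex char (e.g. '9' becomes ':'), which shows up
+ // as a parse failure during validation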
+ for (int i = 0; i < 8; i++) {
+ if (val[i + offset] == 'f' || val[i + offset] == 'F') {
+ // in the case of an f hex char, set the hex char to 0
+ val[i + offset] = '0';
+
+ } else {
+ // increment the hex char in the checksum
+ val[i + offset]++;
+ }
+ }
+ Key key = entry.getKey();
+ Mutation m = new Mutation(key.getRow());
+ m.at().family(key.getColumnFamily()).qualifier(key.getColumnQualifier())
+ .visibility(key.getColumnVisibility()).put(val);
+ writer.addMutation(m);
+ writer.flush();
+ System.out.println("Corrupted checksum value on key " + key);
+ }
+ } else {
+ throw new IllegalArgumentException("No entry found after " + new String(startRow, UTF_8));
+ }
+ }
+ }
+}
diff --git a/src/main/java/org/apache/accumulo/testing/continuous/FlakyBulkBatchWriter.java b/src/main/java/org/apache/accumulo/testing/continuous/FlakyBulkBatchWriter.java
index 281b183..9259fa0 100644
--- a/src/main/java/org/apache/accumulo/testing/continuous/FlakyBulkBatchWriter.java
+++ b/src/main/java/org/apache/accumulo/testing/continuous/FlakyBulkBatchWriter.java
@@ -187,7 +187,7 @@
long t2 = System.nanoTime();
log.debug("Bulk imported dir {} destinations:{} mutations:{} memUsed:{} time:{}ms", tmpDir,
- loadPlan.getDestinations().size(), mutations.size(), memUsed,
+ loadPlan.getDestinations().size(), keysValues.size(), memUsed,
TimeUnit.NANOSECONDS.toMillis(t2 - t1));
fileSystem.delete(tmpDir, true);
diff --git a/src/main/java/org/apache/accumulo/testing/continuous/ValidatingIterator.java b/src/main/java/org/apache/accumulo/testing/continuous/ValidatingIterator.java
new file mode 100644
index 0000000..500e7a5
--- /dev/null
+++ b/src/main/java/org/apache/accumulo/testing/continuous/ValidatingIterator.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.testing.continuous;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.iterators.WrappingIterator;
+
+/**
+ * Validate the checksum on each entry read in the continuous ingest table
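+ * using ContinuousWalk.validate. When configured as a compaction iterator, an entry with a
+ * corrupted checksum causes compactions of the tablet containing it to fail.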
+ */
+public class ValidatingIterator extends WrappingIterator {
+
+ @Override
+ public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive)
+ throws IOException {
+ super.seek(range, columnFamilies, inclusive);
+ if (super.hasTop()) {
+ ContinuousWalk.validate(super.getTopKey(), super.getTopValue());
+ }
+ }
+
+ @Override
+ public void next() throws IOException {
+ super.next();
+ if (super.hasTop()) {
+ ContinuousWalk.validate(super.getTopKey(), super.getTopValue());
+ }
+ }
+}
diff --git a/test/compaction-failures/README.md b/test/compaction-failures/README.md
new file mode 100644
index 0000000..1b48b29
--- /dev/null
+++ b/test/compaction-failures/README.md
@@ -0,0 +1,53 @@
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ https://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+
+Some scripts for testing different compaction failure scenarios.
+
+```bash
+# start compactors, half of which will always fail any compaction because they are missing an iterator class
+./setup-compactors.sh
+# starting ingest into table ci1
+./start-ingest.sh ci1 NORMAL
+# starting ingest into table ci2 with a non-existent compaction iterator configured, all compactions should fail on this table
+./start-ingest.sh ci2 BAD_ITER
+# starting ingest into table ci3 with a compaction service that has no compactors running, no compactions should ever run for this table
+./start-ingest.sh ci3 BAD_SERVICE
+# starting ingest into table ci4, corrupting data in a single tablet such that that tablet can never compact
+./start-ingest.sh ci4 BAD_TABLET
+```
+
+While the tests are running, the following can be used to monitor the number of files per tablet for a table.
+
+```
+$ accumulo jshell
+Preparing JShell for Apache Accumulo
+
+Use 'client' to interact with Accumulo
+
+| Welcome to JShell -- Version 17.0.15
+| For an introduction type: /help intro
+
+jshell> /open count-file-per-tablet.jshell
+
+jshell> CFPT.printStats(client, "ci1", 3000)
+ 0 secs min:20 avg:30.37 max:35
+ 3 secs min:20 avg:30.28 max:35
+```
diff --git a/test/compaction-failures/count-file-per-tablet.jshell b/test/compaction-failures/count-file-per-tablet.jshell
new file mode 100644
index 0000000..5f3cf40
--- /dev/null
+++ b/test/compaction-failures/count-file-per-tablet.jshell
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Streams;
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.RowIterator;
+import org.apache.accumulo.core.data.Range;
+
+// counts files per tablet for a table over time
+public class CFPT {
+
+ public static void printPerTabletFileStats(AccumuloClient client, String table, long startTime) throws Exception {
+ var tableId = client.tableOperations().tableIdMap().get(table);
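+ // tablet metadata rows for the table fall between "<tableId>:" and "<tableId><";
+ // each row is one tablet and each "file" column in it is one of that tablet's files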
+ try(var scanner = client.createScanner("accumulo.metadata")) {
+ scanner.setRange(new Range(tableId + ":", tableId + "<"));
+ scanner.fetchColumnFamily("file");
+ RowIterator rowIterator = new RowIterator(scanner);
+ var stats = Streams.stream(rowIterator).mapToInt(Iterators::size).summaryStatistics();
+ long diff = (System.currentTimeMillis() - startTime) / 1000;
+ System.out.printf("%3d secs min:%d avg:%.2f max:%d\n", diff, stats.getMin(), stats.getAverage(), stats.getMax());
+ }
+ }
+
+ public static void printStats(AccumuloClient client, String table, long sleep) throws Exception {
+ long startTime = System.currentTimeMillis();
+ printPerTabletFileStats(client, table, startTime);
+
+ while(true) {
+ Thread.sleep(sleep);
+ printPerTabletFileStats(client, table, startTime);
+ }
+ }
+}
diff --git a/test/compaction-failures/setup-compactors.sh b/test/compaction-failures/setup-compactors.sh
new file mode 100755
index 0000000..8c28ae3
--- /dev/null
+++ b/test/compaction-failures/setup-compactors.sh
@@ -0,0 +1,60 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+
+# configure compaction services
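+# cs1 routes compactions to the small and large queues served by the compactors started below;
+# cs2 routes to the emptysmall and emptylarge queues that no compactor serves, so nothing
+# dispatched to cs2 ever runs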
+accumulo shell -u root -p secret <<EOF
+config -s tserver.compaction.major.service.cs1.planner=org.apache.accumulo.core.spi.compaction.DefaultCompactionPlanner
+config -s tserver.compaction.major.service.cs1.planner.opts.executors=[{"name":"small","type":"external","maxSize":"128M","queue":"small"},{"name":"large","type":"external","queue":"large"}]
+config -s tserver.compaction.major.service.cs2.planner=org.apache.accumulo.core.spi.compaction.DefaultCompactionPlanner
+config -s tserver.compaction.major.service.cs2.planner.opts.executors=[{"name":"small","type":"external","maxSize":"128M","queue":"emptysmall"},{"name":"large","type":"external","queue":"emptylarge"}]
+EOF
+
+mkdir -p logs
+
+pkill -f "Main compactor"
+pkill -f "Main compaction-coord"
+
+ACCUMULO_SERVICE_INSTANCE=coord accumulo compaction-coordinator &> logs/coord.out &
+
+# get the absolute path of the non-shaded accumulo-testing jar
+TEST_JAR=$(readlink -f $(ls ../../target/accumulo-testing-[0-9].*jar))
+
+# start 4 compactors with the test iterator on their classpath
+for i in $(seq 1 4); do
+ CLASSPATH=$TEST_JAR ACCUMULO_SERVICE_INSTANCE=compactor_small_$i accumulo compactor -q small &> logs/compactor-small-$i.out &
+done
+
+# start four compactors that do not have the test jar on their classpath. Since
+# every table configures an iterator w/ the test jar, every compaction on these
+# compactors should fail
+for i in $(seq 5 8); do
+ ACCUMULO_SERVICE_INSTANCE=compactor_small_$i accumulo compactor -q small &> logs/compactor-small-$i.out &
+done
+
+# start 4 compactors for the large group w/ the test iterator on the classpath
+for i in $(seq 1 4); do
+ CLASSPATH=$TEST_JAR ACCUMULO_SERVICE_INSTANCE=compactor_large_$i accumulo compactor -q large &> logs/compactor-large-$i.out &
+done
+
+# start 4 compactors for the large group that are missing the iterator used by the test table
+for i in $(seq 5 8); do
+ ACCUMULO_SERVICE_INSTANCE=compactor_large_$i accumulo compactor -q large &> logs/compactor-large-$i.out &
+done
\ No newline at end of file
diff --git a/test/compaction-failures/start-ingest.sh b/test/compaction-failures/start-ingest.sh
new file mode 100755
index 0000000..9bebfb6
--- /dev/null
+++ b/test/compaction-failures/start-ingest.sh
@@ -0,0 +1,73 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+
+if [ "$#" != "2" ]; then
+ echo "Usage $0 <tablename> BAD_ITER|BAD_TABLET|BAD_SERVICE|NORMAL"
+ exit 1
+fi
+
+#accumulo-testing directory
+ATD=../..
+
+table=$1
+test_type=$2
+
+$ATD/bin/cingest createtable -o test.ci.common.accumulo.table=$table
+
+# set up a compaction-time iterator and point the table at the compaction service w/ external compactors
+accumulo shell -u root -p secret <<EOF
+config -t $table -s table.iterator.majc.validate=100,org.apache.accumulo.testing.continuous.ValidatingIterator
+config -t $table -s table.compaction.dispatcher.opts.service=cs1
+EOF
+
+case $test_type in
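+# BAD_ITER replaces the validation iterator with a class that does not exist, so every
+# compaction on this table should fail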
+BAD_ITER) accumulo shell -u root -p secret <<EOF
+config -t $table -s table.iterator.majc.validate=100,org.apache.accumulo.testing.continuous.MissingIter
+EOF
+ ;;
+BAD_TABLET)
+ # write a little bit of data
+ $ATD/bin/cingest ingest -o test.ci.common.accumulo.table=$table -o test.ci.ingest.client.entries=100000
+ # corrupt a checksum in an entry; this should cause the ValidatingIterator to fail when compacting that tablet
+ $ATD/bin/cingest corrupt -o test.ci.common.accumulo.table=$table -o test.ci.ingest.row.min=1000000000000000
+ accumulo shell -u root -p secret -e "flush -t $table"
+ ;;
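+# BAD_SERVICE points the table at compaction service cs2, which has no compactors, so
+# compactions are queued but never run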
+BAD_SERVICE) accumulo shell -u root -p secret <<EOF
+config -t $table -s table.compaction.dispatcher.opts.service=cs2
+EOF
+ ;;
+NORMAL) ;;
+*)
+ echo "Usage $0 <tablename> BAD_ITER|BAD_TABLET|BAD_SERVICE|NORMAL"
+ exit 1
+esac
+
+mkdir -p logs
+
+if [ "$test_type" == "NORMAL" ]; then
+ # start unlimited ingest into the table
+ $ATD/bin/cingest ingest -o test.ci.ingest.bulk.workdir=/ci_bulk -o test.ci.common.accumulo.table=$table &> logs/bulk-$table.log &
+else
+ # limit the amount of data written since tablets cannot compact;
+ # this would not be needed if the table.file.pause property existed in 2.1
+ $ATD/bin/cingest ingest -o test.ci.ingest.bulk.workdir=/ci_bulk -o test.ci.common.accumulo.table=$table -o test.ci.ingest.client.entries=10000000 &> logs/bulk-$table.log &
+fi
+