/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.accumulo.testing.merkle.cli;

import static java.nio.charset.StandardCharsets.UTF_8;

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileNotFoundException;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Map.Entry;
import java.util.TreeSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.MutationsRejectedException;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.testing.cli.ClientOpts;
import org.apache.accumulo.testing.merkle.RangeSerialization;
import org.apache.accumulo.testing.merkle.skvi.DigestIterator;
import org.apache.commons.codec.binary.Hex;
import org.apache.hadoop.io.Text;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.beust.jcommander.Parameter;
import com.google.common.collect.Iterables;

/**
 * Read from a table, compute the leaves of a Merkle tree, and write them to another table. Each
 * key-value pair in the destination table is a leaf node of the Merkle tree.
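 * <p>
 * Example invocation (connection options from {@link ClientOpts} omitted; the table names are
 * illustrative):
 *
 * <pre>
 * GenerateHashes -t sourceTable -o merkleTable -hash SHA-256 -nt 8
 * </pre>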
*/
public class GenerateHashes {
private static final Logger log = LoggerFactory.getLogger(GenerateHashes.class);

public static class GenerateHashesOpts extends ClientOpts {
@Parameter(names = {"-t", "--table"}, required = true, description = "table to use")
String tableName;
@Parameter(names = {"-hash", "--hash"}, required = true, description = "type of hash to use")
private String hashName;
@Parameter(names = {"-o", "--output"}, required = true,
description = "output table name, expected to exist and be writable")
private String outputTableName;
@Parameter(names = {"-nt", "--numThreads"}, required = false,
description = "number of concurrent threads calculating digests")
private int numThreads = 4;
@Parameter(names = {"-iter", "--iterator"}, required = false,
description = "Should we push down logic with an iterator")
private boolean iteratorPushdown = false;
@Parameter(names = {"-s", "--splits"}, required = false,
description = "File of splits to use for merkle tree")
private String splitsFile = null;
String getHashName() {
return hashName;
}
String getOutputTableName() {
return outputTableName;
}
int getNumThreads() {
return numThreads;
}
boolean isIteratorPushdown() {
return iteratorPushdown;
}
String getSplitsFile() {
return splitsFile;
}
}
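
/**
 * Determine the ranges to hash: one range per tablet when no splits file is given, otherwise
 * ranges derived from the split points read from the provided file.
 */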
Collection<Range> getRanges(AccumuloClient client, String tableName, String splitsFile)
throws TableNotFoundException, AccumuloSecurityException, AccumuloException,
FileNotFoundException {
if (null == splitsFile) {
log.info("Using table split points");
Collection<Text> endRows = client.tableOperations().listSplits(tableName);
return endRowsToRanges(endRows);
} else {
log.info("Using provided split points");
ArrayList<Text> splits = new ArrayList<>();
try (java.util.Scanner file = new java.util.Scanner(new File(splitsFile), UTF_8.name())) {
while (file.hasNextLine()) {
String line = file.nextLine();
if (!line.isEmpty()) {
splits.add(new Text(line));
}
}
}
Collections.sort(splits);
return endRowsToRanges(splits);
}
}
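
/**
 * Create a client from the parsed options, compute the ranges to hash, and run the digest
 * computation over them.
 */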
public void run(GenerateHashesOpts opts) throws TableNotFoundException, AccumuloSecurityException,
AccumuloException, NoSuchAlgorithmException, FileNotFoundException {
try (AccumuloClient client = opts.createClient()) {
Collection<Range> ranges = getRanges(client, opts.tableName, opts.getSplitsFile());
run(client, opts.tableName, opts.getOutputTableName(), opts.getHashName(),
opts.getNumThreads(), opts.isIteratorPushdown(), ranges);
}
}
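
/**
 * Compute a digest for each range of the input table, either client-side or pushed down to the
 * tablet servers via {@link DigestIterator}, and write one leaf entry per range to the output
 * table.
 */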
public void run(final AccumuloClient client, final String inputTableName,
final String outputTableName, final String digestName, int numThreads,
final boolean iteratorPushdown, final Collection<Range> ranges)
throws TableNotFoundException, AccumuloException, NoSuchAlgorithmException {
if (!client.tableOperations().exists(outputTableName)) {
throw new IllegalArgumentException(outputTableName + " does not exist, please create it");
}
// Get some parallelism
ExecutorService svc = Executors.newFixedThreadPool(numThreads);
try (BatchWriter bw = client.createBatchWriter(outputTableName)) {
for (final Range range : ranges) {
final MessageDigest digest = getDigestAlgorithm(digestName);
svc.execute(() -> {
Scanner s;
try {
s = client.createScanner(inputTableName, Authorizations.EMPTY);
} catch (Exception e) {
log.error("Could not get scanner for " + inputTableName, e);
throw new RuntimeException(e);
}
s.setRange(range);
Value v;
Mutation m;
if (iteratorPushdown) {
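// Server side: DigestIterator computes the digest within the tablet servers, so only the
// final hash for the range is returned to the client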
IteratorSetting cfg = new IteratorSetting(50, DigestIterator.class);
cfg.addOption(DigestIterator.HASH_NAME_KEY, digestName);
s.addScanIterator(cfg);
// The scanner should only ever return a single key-value pair;
// otherwise this approach won't work
Entry<Key,Value> entry = Iterables.getOnlyElement(s);
v = entry.getValue();
m = RangeSerialization.toMutation(range, v);
} else {
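// Client side: serialize each key-value pair and fold it into the digest locally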
ByteArrayOutputStream baos = new ByteArrayOutputStream();
// Reuse one stream pair; the buffer is reset after each entry is folded into the digest
DataOutputStream out = new DataOutputStream(baos);
for (Entry<Key,Value> entry : s) {
try {
entry.getKey().write(out);
entry.getValue().write(out);
} catch (Exception e) {
log.error("Error writing {}", entry, e);
throw new RuntimeException(e);
}
digest.update(baos.toByteArray());
baos.reset();
}
v = new Value(digest.digest());
m = RangeSerialization.toMutation(range, v);
}
// Release the scanner now that the digest for this range is complete
s.close();
// Log some progress
log.info("{} computed digest for {} of {}", Thread.currentThread().getName(), range,
Hex.encodeHexString(v.get()));
try {
bw.addMutation(m);
} catch (MutationsRejectedException e) {
log.error("Could not write mutation", e);
throw new RuntimeException(e);
}
});
}
svc.shutdown();
// Wait indefinitely for the scans to complete
while (!svc.isTerminated()) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
log.error(
"Interrupted while waiting for the executor service to complete gracefully. Exiting now");
svc.shutdownNow();
// Restore the interrupt status before returning
Thread.currentThread().interrupt();
return;
}
}
}
}
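
/**
 * Convert sorted end rows into contiguous, non-overlapping ranges covering the whole table:
 * (-inf, s1], (s1, s2], ..., (sN, +inf).
 */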
private TreeSet<Range> endRowsToRanges(Collection<Text> endRows) {
ArrayList<Text> sortedEndRows = new ArrayList<>(endRows);
Collections.sort(sortedEndRows);
Text prevEndRow = null;
TreeSet<Range> ranges = new TreeSet<>();
for (Text endRow : sortedEndRows) {
if (null == prevEndRow) {
ranges.add(new Range(null, false, endRow, true));
} else {
ranges.add(new Range(prevEndRow, false, endRow, true));
}
prevEndRow = endRow;
}
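// Final range: from the last end row (exclusive) to +inf; covers the whole table when
// there are no splits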
ranges.add(new Range(prevEndRow, false, null, false));
return ranges;
}
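
/**
 * Look up a {@link MessageDigest} for the given algorithm name, e.g. "SHA-256".
 */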
private MessageDigest getDigestAlgorithm(String digestName) throws NoSuchAlgorithmException {
return MessageDigest.getInstance(digestName);
}

public static void main(String[] args) throws Exception {
GenerateHashesOpts opts = new GenerateHashesOpts();
opts.parseArgs(GenerateHashes.class.getName(), args);
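// DigestIterator computes one digest per tablet, so pushdown only works when the ranges
// come from the table's own split points rather than a custom splits file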
if (opts.isIteratorPushdown() && null != opts.getSplitsFile()) {
throw new IllegalArgumentException(
"Cannot use iterator pushdown with anything other than table split points");
}
GenerateHashes generate = new GenerateHashes();
generate.run(opts);
}
}