blob: 96020bb7db8de3531780423b263a407368d399ea [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.accumulo.testing.core.ingest;
import static java.nio.charset.StandardCharsets.UTF_8;
import java.io.IOException;
import java.util.Map.Entry;
import java.util.Random;
import java.util.Set;
import java.util.TreeSet;
import org.apache.accumulo.core.cli.BatchWriterOpts;
import org.apache.accumulo.core.cli.ClientOnDefaultTable;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.MutationsRejectedException;
import org.apache.accumulo.core.client.TableExistsException;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.clientImpl.TabletServerBatchWriter;
import org.apache.accumulo.core.client.security.SecurityErrorCode;
import org.apache.accumulo.core.conf.DefaultConfiguration;
import org.apache.accumulo.core.data.ConstraintViolationSummary;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.TabletId;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.file.FileOperations;
import org.apache.accumulo.core.file.FileSKVWriter;
import org.apache.accumulo.core.file.rfile.RFile;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.security.ColumnVisibility;
import org.apache.accumulo.core.trace.DistributedTrace;
import org.apache.accumulo.core.util.CachedConfiguration;
import org.apache.accumulo.core.util.FastFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.Text;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import com.beust.jcommander.Parameter;
public class TestIngest {
public static final Authorizations AUTHS = new Authorizations("L1", "L2", "G1", "GROUP2");
public static class Opts extends ClientOnDefaultTable {
@Parameter(names = "--createTable")
public boolean createTable = false;
@Parameter(names = "--splits", description = "the number of splits to use when creating the table")
public int numsplits = 1;
@Parameter(names = "--start", description = "the starting row number")
public int startRow = 0;
@Parameter(names = "--rows", description = "the number of rows to ingest")
public int rows = 100000;
@Parameter(names = "--cols", description = "the number of columns to ingest per row")
public int cols = 1;
@Parameter(names = "--random", description = "insert random rows and use the given number to seed the psuedo-random number generator")
public Integer random = null;
@Parameter(names = "--size", description = "the size of the value to ingest")
public int dataSize = 1000;
@Parameter(names = "--delete", description = "delete values instead of inserting them")
public boolean delete = false;
@Parameter(names = {"-ts", "--timestamp"}, description = "timestamp to use for all values")
public long timestamp = -1;
@Parameter(names = "--rfile", description = "generate data into a file that can be imported")
public String outputFile = null;
@Parameter(names = "--stride", description = "the difference between successive row ids")
public int stride;
@Parameter(names = {"-cf", "--columnFamily"}, description = "place columns in this column family")
public String columnFamily = "colf";
@Parameter(names = {"-cv", "--columnVisibility"}, description = "place columns in this column family", converter = VisibilityConverter.class)
public ColumnVisibility columnVisibility = new ColumnVisibility();
public Configuration conf = null;
public FileSystem fs = null;
public Opts() {
super("test_ingest");
}
}
public static void createTable(AccumuloClient client, Opts args) throws AccumuloException, AccumuloSecurityException, TableExistsException {
if (args.createTable) {
TreeSet<Text> splits = getSplitPoints(args.startRow, args.startRow + args.rows, args.numsplits);
if (!client.tableOperations().exists(args.getTableName()))
client.tableOperations().create(args.getTableName());
try {
client.tableOperations().addSplits(args.getTableName(), splits);
} catch (TableNotFoundException ex) {
// unlikely
throw new RuntimeException(ex);
}
}
}
public static TreeSet<Text> getSplitPoints(long start, long end, long numsplits) {
long splitSize = (end - start) / numsplits;
long pos = start + splitSize;
TreeSet<Text> splits = new TreeSet<>();
while (pos < end) {
splits.add(new Text(String.format("row_%010d", pos)));
pos += splitSize;
}
return splits;
}
public static byte[][] generateValues(int dataSize) {
byte[][] bytevals = new byte[10][];
byte[] letters = {'1', '2', '3', '4', '5', '6', '7', '8', '9', '0'};
for (int i = 0; i < 10; i++) {
bytevals[i] = new byte[dataSize];
for (int j = 0; j < dataSize; j++)
bytevals[i][j] = letters[i];
}
return bytevals;
}
private static byte ROW_PREFIX[] = "row_".getBytes(UTF_8);
private static byte COL_PREFIX[] = "col_".getBytes(UTF_8);
public static Text generateRow(int rowid, int startRow) {
return new Text(FastFormat.toZeroPaddedString(rowid + startRow, 10, 10, ROW_PREFIX));
}
public static byte[] genRandomValue(Random random, byte dest[], int seed, int row, int col) {
random.setSeed((row ^ seed) ^ col);
random.nextBytes(dest);
toPrintableChars(dest);
return dest;
}
public static void toPrintableChars(byte[] dest) {
// transform to printable chars
for (int i = 0; i < dest.length; i++) {
dest[i] = (byte) (((0xff & dest[i]) % 92) + ' ');
}
}
public static void main(String[] args) throws Exception {
Opts opts = new Opts();
BatchWriterOpts bwOpts = new BatchWriterOpts();
opts.parseArgs(TestIngest.class.getName(), args, bwOpts);
String name = TestIngest.class.getSimpleName();
DistributedTrace.enable(name);
try {
opts.startTracing(name);
if (opts.debug)
Logger.getLogger(TabletServerBatchWriter.class.getName()).setLevel(Level.TRACE);
// test batch update
ingest(opts.getClient(), opts, bwOpts);
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
opts.stopTracing();
DistributedTrace.disable();
}
}
public static void ingest(AccumuloClient client, FileSystem fs, Opts opts, BatchWriterOpts bwOpts) throws IOException, AccumuloException,
AccumuloSecurityException, TableNotFoundException, MutationsRejectedException, TableExistsException {
long stopTime;
byte[][] bytevals = generateValues(opts.dataSize);
byte randomValue[] = new byte[opts.dataSize];
Random random = new Random();
long bytesWritten = 0;
createTable(client, opts);
BatchWriter bw = null;
FileSKVWriter writer = null;
if (opts.outputFile != null) {
Configuration conf = CachedConfiguration.getInstance();
writer = FileOperations.getInstance().newWriterBuilder().forFile(opts.outputFile + "." + RFile.EXTENSION, fs, conf)
.withTableConfiguration(DefaultConfiguration.getInstance()).build();
writer.startDefaultLocalityGroup();
} else {
bw = client.createBatchWriter(opts.getTableName(), bwOpts.getBatchWriterConfig());
client.securityOperations().changeUserAuthorizations(opts.getPrincipal(), AUTHS);
}
Text labBA = new Text(opts.columnVisibility.getExpression());
long startTime = System.currentTimeMillis();
for (int i = 0; i < opts.rows; i++) {
int rowid;
if (opts.stride > 0) {
rowid = ((i % opts.stride) * (opts.rows / opts.stride)) + (i / opts.stride);
} else {
rowid = i;
}
Text row = generateRow(rowid, opts.startRow);
Mutation m = new Mutation(row);
for (int j = 0; j < opts.cols; j++) {
Text colf = new Text(opts.columnFamily);
Text colq = new Text(FastFormat.toZeroPaddedString(j, 7, 10, COL_PREFIX));
if (writer != null) {
Key key = new Key(row, colf, colq, labBA);
if (opts.timestamp >= 0) {
key.setTimestamp(opts.timestamp);
} else {
key.setTimestamp(startTime);
}
if (opts.delete) {
key.setDeleted(true);
} else {
key.setDeleted(false);
}
bytesWritten += key.getSize();
if (opts.delete) {
writer.append(key, new Value(new byte[0]));
} else {
byte value[];
if (opts.random != null) {
value = genRandomValue(random, randomValue, opts.random.intValue(), rowid + opts.startRow, j);
} else {
value = bytevals[j % bytevals.length];
}
Value v = new Value(value);
writer.append(key, v);
bytesWritten += v.getSize();
}
} else {
Key key = new Key(row, colf, colq, labBA);
bytesWritten += key.getSize();
if (opts.delete) {
if (opts.timestamp >= 0)
m.putDelete(colf, colq, opts.columnVisibility, opts.timestamp);
else
m.putDelete(colf, colq, opts.columnVisibility);
} else {
byte value[];
if (opts.random != null) {
value = genRandomValue(random, randomValue, opts.random.intValue(), rowid + opts.startRow, j);
} else {
value = bytevals[j % bytevals.length];
}
bytesWritten += value.length;
if (opts.timestamp >= 0) {
m.put(colf, colq, opts.columnVisibility, opts.timestamp, new Value(value, true));
} else {
m.put(colf, colq, opts.columnVisibility, new Value(value, true));
}
}
}
}
if (bw != null)
bw.addMutation(m);
}
if (writer != null) {
writer.close();
} else if (bw != null) {
try {
bw.close();
} catch (MutationsRejectedException e) {
if (e.getSecurityErrorCodes().size() > 0) {
for (Entry<TabletId,Set<SecurityErrorCode>> entry : e.getSecurityErrorCodes().entrySet()) {
System.err.println("ERROR : Not authorized to write to : " + entry.getKey() + " due to " + entry.getValue());
}
}
if (e.getConstraintViolationSummaries().size() > 0) {
for (ConstraintViolationSummary cvs : e.getConstraintViolationSummaries()) {
System.err.println("ERROR : Constraint violates : " + cvs);
}
}
throw e;
}
}
stopTime = System.currentTimeMillis();
int totalValues = opts.rows * opts.cols;
double elapsed = (stopTime - startTime) / 1000.0;
System.out.printf("%,12d records written | %,8d records/sec | %,12d bytes written | %,8d bytes/sec | %6.3f secs %n", totalValues,
(int) (totalValues / elapsed), bytesWritten, (int) (bytesWritten / elapsed), elapsed);
}
public static void ingest(AccumuloClient c, Opts opts, BatchWriterOpts batchWriterOpts) throws MutationsRejectedException, IOException, AccumuloException,
AccumuloSecurityException, TableNotFoundException, TableExistsException {
ingest(c, FileSystem.get(CachedConfiguration.getInstance()), opts, batchWriterOpts);
}
}