| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.accumulo.test; |
| |
| import static java.nio.charset.StandardCharsets.UTF_8; |
| |
| import java.security.SecureRandom; |
| import java.util.Iterator; |
| import java.util.Map.Entry; |
| import java.util.Random; |
| import java.util.TreeSet; |
| |
| import org.apache.accumulo.core.client.Accumulo; |
| import org.apache.accumulo.core.client.AccumuloClient; |
| import org.apache.accumulo.core.client.BatchWriter; |
| import org.apache.accumulo.core.client.Scanner; |
| import org.apache.accumulo.core.client.admin.NewTableConfiguration; |
| import org.apache.accumulo.core.data.Key; |
| import org.apache.accumulo.core.data.Mutation; |
| import org.apache.accumulo.core.data.Range; |
| import org.apache.accumulo.core.data.Value; |
| import org.apache.accumulo.core.util.TextUtil; |
| import org.apache.accumulo.server.cli.ServerUtilOpts; |
| import org.apache.hadoop.io.Text; |
| |
| import com.beust.jcommander.Parameter; |
| |
| public class TestBinaryRows { |
| private static final long byteOnes; |
| |
| static { |
| // safely build Byte.SIZE number of 1s as a long; not that I think Byte.SIZE will ever be |
| // anything but 8, but just for fun |
| long b = 1; |
| for (int i = 0; i < Byte.SIZE; ++i) |
| b |= (1L << i); |
| byteOnes = b; |
| } |
| |
| static byte[] encodeLong(long l) { |
| byte[] ba = new byte[Long.SIZE / Byte.SIZE]; |
| |
| // parse long into a sequence of bytes |
| for (int i = 0; i < ba.length; ++i) |
| ba[i] = (byte) (byteOnes & (l >>> (Byte.SIZE * (ba.length - i - 1)))); |
| |
| return ba; |
| } |
| |
| static long decodeLong(byte[] ba) { |
| // validate byte array |
| if (ba.length > Long.SIZE / Byte.SIZE) |
| throw new IllegalArgumentException( |
| "Byte array of size " + ba.length + " is too big to hold a long"); |
| |
| // build the long from the bytes |
| long l = 0; |
| for (int i = 0; i < ba.length; ++i) |
| l |= (byteOnes & ba[i]) << (Byte.SIZE * (ba.length - i - 1)); |
| |
| return l; |
| } |
| |
| public static class Opts extends ServerUtilOpts { |
| @Parameter(names = "--mode", |
| description = "either 'ingest', 'delete', 'randomLookups', 'split'," |
| + " 'verify', 'verifyDeleted'", |
| required = true) |
| public String mode; |
| @Parameter(names = "--start", description = "the lowest numbered row") |
| public long start = 0; |
| @Parameter(names = "--count", description = "number of rows to ingest", required = true) |
| public long num = 0; |
| @Parameter(names = {"-t", "--table"}, required = true, description = "table to use") |
| public String tableName; |
| } |
| |
| public static void runTest(AccumuloClient accumuloClient, Opts opts) throws Exception { |
| |
| final Text CF = new Text("cf"), CQ = new Text("cq"); |
| final byte[] CF_BYTES = "cf".getBytes(UTF_8), CQ_BYTES = "cq".getBytes(UTF_8); |
| if (opts.mode.equals("ingest") || opts.mode.equals("delete")) { |
| try (BatchWriter bw = accumuloClient.createBatchWriter(opts.tableName)) { |
| boolean delete = opts.mode.equals("delete"); |
| |
| for (long i = 0; i < opts.num; i++) { |
| byte[] row = encodeLong(i + opts.start); |
| String value = "" + (i + opts.start); |
| |
| Mutation m = new Mutation(new Text(row)); |
| if (delete) { |
| m.putDelete(CF, CQ); |
| } else { |
| m.put(CF, CQ, new Value(value)); |
| } |
| bw.addMutation(m); |
| } |
| } |
| } else if (opts.mode.equals("verifyDeleted")) { |
| try (Scanner s = accumuloClient.createScanner(opts.tableName, opts.auths)) { |
| Key startKey = |
| new Key(encodeLong(opts.start), CF_BYTES, CQ_BYTES, new byte[0], Long.MAX_VALUE); |
| Key stopKey = |
| new Key(encodeLong(opts.start + opts.num - 1), CF_BYTES, CQ_BYTES, new byte[0], 0); |
| s.setBatchSize(50000); |
| s.setRange(new Range(startKey, stopKey)); |
| |
| for (Entry<Key,Value> entry : s) { |
| throw new Exception("ERROR : saw entries in range that should be deleted ( first value : " |
| + entry.getValue() + ")"); |
| } |
| } |
| } else if (opts.mode.equals("verify")) { |
| long t1 = System.currentTimeMillis(); |
| |
| try (Scanner s = accumuloClient.createScanner(opts.tableName, opts.auths)) { |
| Key startKey = |
| new Key(encodeLong(opts.start), CF_BYTES, CQ_BYTES, new byte[0], Long.MAX_VALUE); |
| Key stopKey = |
| new Key(encodeLong(opts.start + opts.num - 1), CF_BYTES, CQ_BYTES, new byte[0], 0); |
| s.setRange(new Range(startKey, stopKey)); |
| |
| long i = opts.start; |
| |
| for (Entry<Key,Value> e : s) { |
| Key k = e.getKey(); |
| Value v = e.getValue(); |
| |
| checkKeyValue(i, k, v); |
| |
| i++; |
| } |
| |
| if (i != opts.start + opts.num) { |
| throw new Exception("ERROR : did not see expected number of rows, saw " + (i - opts.start) |
| + " expected " + opts.num); |
| } |
| |
| long t2 = System.currentTimeMillis(); |
| |
| System.out.printf("time : %9.2f secs%n", ((t2 - t1) / 1000.0)); |
| System.out.printf("rate : %9.2f entries/sec%n", opts.num / ((t2 - t1) / 1000.0)); |
| } |
| } else if (opts.mode.equals("randomLookups")) { |
| int numLookups = 1000; |
| |
| Random r = new SecureRandom(); |
| |
| long t1 = System.currentTimeMillis(); |
| |
| for (int i = 0; i < numLookups; i++) { |
| long row = ((r.nextLong() & 0x7fffffffffffffffL) % opts.num) + opts.start; |
| |
| try (Scanner s = accumuloClient.createScanner(opts.tableName, opts.auths)) { |
| Key startKey = new Key(encodeLong(row), CF_BYTES, CQ_BYTES, new byte[0], Long.MAX_VALUE); |
| Key stopKey = new Key(encodeLong(row), CF_BYTES, CQ_BYTES, new byte[0], 0); |
| s.setRange(new Range(startKey, stopKey)); |
| |
| Iterator<Entry<Key,Value>> si = s.iterator(); |
| |
| if (si.hasNext()) { |
| Entry<Key,Value> e = si.next(); |
| Key k = e.getKey(); |
| Value v = e.getValue(); |
| |
| checkKeyValue(row, k, v); |
| |
| if (si.hasNext()) { |
| throw new Exception("ERROR : lookup on " + row + " returned more than one result "); |
| } |
| |
| } else { |
| throw new Exception("ERROR : lookup on " + row + " failed "); |
| } |
| } |
| } |
| |
| long t2 = System.currentTimeMillis(); |
| |
| System.out.printf("time : %9.2f secs%n", ((t2 - t1) / 1000.0)); |
| System.out.printf("lookups : %9d keys%n", numLookups); |
| System.out.printf("rate : %9.2f lookups/sec%n", numLookups / ((t2 - t1) / 1000.0)); |
| |
| } else if (opts.mode.equals("split")) { |
| TreeSet<Text> splits = new TreeSet<>(); |
| int shift = (int) opts.start; |
| int count = (int) opts.num; |
| |
| for (long i = 0; i < count; i++) { |
| long splitPoint = i << shift; |
| |
| splits.add(new Text(encodeLong(splitPoint))); |
| System.out.printf("added split point 0x%016x %,12d%n", splitPoint, splitPoint); |
| } |
| |
| NewTableConfiguration ntc = new NewTableConfiguration().withSplits(splits); |
| accumuloClient.tableOperations().create(opts.tableName, ntc); |
| |
| } else { |
| throw new Exception("ERROR : " + opts.mode + " is not a valid operation."); |
| } |
| } |
| |
| private static void checkKeyValue(long expected, Key k, Value v) throws Exception { |
| if (expected != decodeLong(TextUtil.getBytes(k.getRow()))) { |
| throw new Exception( |
| "ERROR : expected row " + expected + " saw " + decodeLong(TextUtil.getBytes(k.getRow()))); |
| } |
| |
| if (!v.toString().equals("" + expected)) { |
| throw new Exception("ERROR : expected value " + expected + " saw " + v); |
| } |
| } |
| |
| public static void main(String[] args) { |
| Opts opts = new Opts(); |
| opts.parseArgs(TestBinaryRows.class.getName(), args); |
| |
| try (AccumuloClient client = Accumulo.newClient().from(opts.getClientProps()).build()) { |
| runTest(client, opts); |
| } catch (Exception e) { |
| throw new RuntimeException(e); |
| } |
| } |
| |
| } |