| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.accumulo.test; |
| |
| import java.util.Arrays; |
| import java.util.Iterator; |
| import java.util.Map.Entry; |
| import java.util.Properties; |
| import java.util.Random; |
| |
| import org.apache.accumulo.core.client.Accumulo; |
| import org.apache.accumulo.core.client.AccumuloClient; |
| import org.apache.accumulo.core.client.AccumuloException; |
| import org.apache.accumulo.core.client.AccumuloSecurityException; |
| import org.apache.accumulo.core.client.Scanner; |
| import org.apache.accumulo.core.client.TableNotFoundException; |
| import org.apache.accumulo.core.conf.ClientProperty; |
| import org.apache.accumulo.core.data.Key; |
| import org.apache.accumulo.core.data.PartialKey; |
| import org.apache.accumulo.core.data.Range; |
| import org.apache.accumulo.core.data.Value; |
| import org.apache.accumulo.core.security.Authorizations; |
| import org.apache.accumulo.core.trace.TraceUtil; |
| import org.apache.hadoop.io.Text; |
| import org.apache.htrace.Sampler; |
| import org.apache.htrace.Span; |
| import org.apache.htrace.Trace; |
| import org.apache.htrace.TraceScope; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import com.beust.jcommander.Parameter; |
| |
| import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; |
| |
| public class VerifyIngest { |
| |
| private static final Logger log = LoggerFactory.getLogger(VerifyIngest.class); |
| |
| public static int getRow(Key k) { |
| return Integer.parseInt(k.getRow().toString().split("_")[1]); |
| } |
| |
| public static int getCol(Key k) { |
| return Integer.parseInt(k.getColumnQualifier().toString().split("_")[1]); |
| } |
| |
| public static class VerifyParams extends TestIngest.IngestParams { |
| public boolean useGet = false; |
| |
| public VerifyParams(Properties props) { |
| super(props); |
| } |
| |
| public VerifyParams(Properties props, String table) { |
| super(props, table); |
| } |
| |
| public VerifyParams(Properties props, String table, int rows) { |
| super(props, table, rows); |
| } |
| } |
| |
| public static class Opts extends TestIngest.Opts { |
| @Parameter(names = "-useGet", description = "fetches values one at a time, instead of scanning") |
| public boolean useGet = false; |
| |
| public VerifyParams getVerifyParams() { |
| VerifyParams params = new VerifyParams(getClientProps(), tableName); |
| populateIngestPrams(params); |
| params.useGet = useGet; |
| return params; |
| } |
| } |
| |
| public static void main(String[] args) throws Exception { |
| Opts opts = new Opts(); |
| opts.parseArgs(VerifyIngest.class.getName(), args); |
| if (opts.trace) { |
| TraceUtil.enableClientTraces(null, null, new Properties()); |
| } |
| try (TraceScope clientSpan = |
| Trace.startSpan(VerifyIngest.class.getSimpleName(), Sampler.ALWAYS)) { |
| Span span = clientSpan.getSpan(); |
| if (span != null) |
| span.addKVAnnotation("cmdLine", Arrays.asList(args).toString()); |
| |
| try (AccumuloClient client = Accumulo.newClient().from(opts.getClientProps()).build()) { |
| verifyIngest(client, opts.getVerifyParams()); |
| } |
| |
| } finally { |
| TraceUtil.disable(); |
| } |
| } |
| |
| @SuppressFBWarnings(value = "PREDICTABLE_RANDOM", |
| justification = "predictable random is okay for testing") |
| public static void verifyIngest(AccumuloClient accumuloClient, VerifyParams params) |
| throws AccumuloException, AccumuloSecurityException, TableNotFoundException { |
| byte[][] bytevals = TestIngest.generateValues(params.dataSize); |
| |
| Authorizations labelAuths = new Authorizations("L1", "L2", "G1", "GROUP2"); |
| String principal = ClientProperty.AUTH_PRINCIPAL.getValue(params.clientProps); |
| accumuloClient.securityOperations().changeUserAuthorizations(principal, labelAuths); |
| |
| int expectedRow = params.startRow; |
| int expectedCol = 0; |
| int recsRead = 0; |
| |
| long bytesRead = 0; |
| long t1 = System.currentTimeMillis(); |
| |
| byte[] randomValue = new byte[params.dataSize]; |
| Random random = new Random(); |
| |
| Key endKey = new Key(new Text("row_" + String.format("%010d", params.rows + params.startRow))); |
| |
| int errors = 0; |
| |
| while (expectedRow < (params.rows + params.startRow)) { |
| |
| if (params.useGet) { |
| Text rowKey = new Text("row_" + String.format("%010d", expectedRow + params.startRow)); |
| Text colf = new Text(params.columnFamily); |
| Text colq = new Text("col_" + String.format("%07d", expectedCol)); |
| |
| try (Scanner scanner = accumuloClient.createScanner("test_ingest", labelAuths)) { |
| scanner.setBatchSize(1); |
| Key startKey = new Key(rowKey, colf, colq); |
| Range range = new Range(startKey, startKey.followingKey(PartialKey.ROW_COLFAM_COLQUAL)); |
| scanner.setRange(range); |
| |
| byte[] val = null; // t.get(rowKey, column); |
| |
| Iterator<Entry<Key,Value>> iter = scanner.iterator(); |
| |
| if (iter.hasNext()) { |
| val = iter.next().getValue().get(); |
| } |
| |
| byte[] ev; |
| if (params.random != null) { |
| ev = TestIngest.genRandomValue(random, randomValue, params.random, expectedRow, |
| expectedCol); |
| } else { |
| ev = bytevals[expectedCol % bytevals.length]; |
| } |
| |
| if (val == null) { |
| log.error("Did not find {} {} {}", rowKey, colf, colq); |
| errors++; |
| } else { |
| recsRead++; |
| bytesRead += val.length; |
| Value value = new Value(val); |
| if (value.compareTo(ev) != 0) { |
| log.error("unexpected value ({} {} {} : saw {} expected {}", rowKey, colf, colq, |
| value, new Value(ev)); |
| errors++; |
| } |
| } |
| |
| expectedCol++; |
| if (expectedCol >= params.cols) { |
| expectedCol = 0; |
| expectedRow++; |
| } |
| } |
| } else { |
| |
| Key startKey = new Key(new Text("row_" + String.format("%010d", expectedRow))); |
| |
| try (Scanner scanner = accumuloClient.createScanner(params.tableName, labelAuths)) { |
| scanner.setRange(new Range(startKey, endKey)); |
| for (int j = 0; j < params.cols; j++) { |
| scanner.fetchColumn(new Text(params.columnFamily), |
| new Text("col_" + String.format("%07d", j))); |
| } |
| |
| int recsReadBefore = recsRead; |
| |
| for (Entry<Key,Value> entry : scanner) { |
| |
| recsRead++; |
| |
| bytesRead += entry.getKey().getLength(); |
| bytesRead += entry.getValue().getSize(); |
| |
| int rowNum = getRow(entry.getKey()); |
| int colNum = getCol(entry.getKey()); |
| |
| if (rowNum != expectedRow) { |
| log.error("rowNum != expectedRow {} != {}", rowNum, expectedRow); |
| errors++; |
| expectedRow = rowNum; |
| } |
| |
| if (colNum != expectedCol) { |
| log.error("colNum != expectedCol {} != {} rowNum : {}", colNum, expectedCol, |
| rowNum); |
| errors++; |
| } |
| |
| if (expectedRow >= (params.rows + params.startRow)) { |
| log.error( |
| "expectedRow ({}) >= (ingestArgs.rows + ingestArgs.startRow) ({}), get" |
| + " batch returned data passed end key", |
| expectedRow, (params.rows + params.startRow)); |
| errors++; |
| break; |
| } |
| |
| byte[] value; |
| if (params.random != null) { |
| value = TestIngest.genRandomValue(random, randomValue, params.random, expectedRow, |
| colNum); |
| } else { |
| value = bytevals[colNum % bytevals.length]; |
| } |
| |
| if (entry.getValue().compareTo(value) != 0) { |
| log.error("unexpected value, rowNum : {} colNum : {}", rowNum, colNum); |
| log.error(" saw = {} expected = {}", new String(entry.getValue().get()), |
| new String(value)); |
| errors++; |
| } |
| |
| if (params.timestamp >= 0 && entry.getKey().getTimestamp() != params.timestamp) { |
| log.error("unexpected timestamp {}, rowNum : {} colNum : {}", |
| entry.getKey().getTimestamp(), rowNum, colNum); |
| errors++; |
| } |
| |
| expectedCol++; |
| if (expectedCol >= params.cols) { |
| expectedCol = 0; |
| expectedRow++; |
| } |
| |
| } |
| |
| if (recsRead == recsReadBefore) { |
| log.warn("Scan returned nothing, breaking..."); |
| break; |
| } |
| } |
| } |
| } |
| |
| long t2 = System.currentTimeMillis(); |
| |
| if (errors > 0) { |
| throw new AccumuloException("saw " + errors + " errors "); |
| } |
| |
| if (expectedRow == (params.rows + params.startRow)) { |
| System.out.printf( |
| "%,12d records read | %,8d records/sec | %,12d bytes read |" |
| + " %,8d bytes/sec | %6.3f secs %n", |
| recsRead, (int) ((recsRead) / ((t2 - t1) / 1000.0)), bytesRead, |
| (int) (bytesRead / ((t2 - t1) / 1000.0)), (t2 - t1) / 1000.0); |
| } else { |
| throw new AccumuloException("Did not read expected number of rows. Saw " |
| + (expectedRow - params.startRow) + " expected " + params.rows); |
| } |
| } |
| |
| } |