blob: a1d097e99c922d998d42372893008b9716be8a35 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.accumulo.testing.healthprobe;
import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.RowIterator;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.admin.Locations;
import org.apache.accumulo.core.client.admin.TableOperations;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.TabletId;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.Authorizations;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
public class Monitor {
private static final Logger log = LoggerFactory.getLogger(Monitor.class);
static long distance = 1l;
static List<TabletId> tablets;
static boolean cacheTablets = true;
public static void main(String[] args) throws Exception {
MonitorOpts opts = new MonitorOpts();
opts.parseArgs(Monitor.class.getName(), args);
distance = opts.distance;
Authorizations auth = new Authorizations();
if (opts.auth != "") {
auth = new Authorizations(opts.auth);
}
try (AccumuloClient client = Accumulo.newClient().from(opts.getClientProps()).build();
Scanner scanner = client.createScanner(opts.tableName, auth)) {
if (opts.isolate) {
scanner.enableIsolation();
}
int scannerSleepMs = opts.sleep_ms;
LoopControl scanning_condition = opts.continuous ? new ContinuousLoopControl()
: new IterativeLoopControl(opts.scan_iterations);
while (scanning_condition.keepScanning()) {
Random tablet_index_generator = new Random();
TabletId pickedTablet = pickTablet(client.tableOperations(), opts.tableName,
tablet_index_generator);
Range range = pickedTablet.toRange();
scanner.setRange(range);
if (opts.batch_size > 0) {
scanner.setBatchSize(opts.batch_size);
}
try {
long startTime = System.nanoTime();
long count = consume(scanner, opts.distance);
long stopTime = System.nanoTime();
MDC.put("StartTime", String.valueOf(startTime));
MDC.put("TabletId", String.valueOf(pickedTablet));
MDC.put("TableName", String.valueOf(opts.tableName));
MDC.put("TotalTime", String.valueOf((stopTime - startTime)));
MDC.put("StartRow", String.valueOf(range.getStartKey()));
MDC.put("EndRow", String.valueOf(range.getEndKey()));
MDC.put("TotalRecords", String.valueOf(count));
log.info("SCN starttime={} tabletindex={} tablename={} totaltime={} totalrecords={}",
startTime, tablet_index_generator, opts.tableName, (stopTime - startTime), count);
if (scannerSleepMs > 0) {
sleepUninterruptibly(scannerSleepMs, TimeUnit.MILLISECONDS);
}
} catch (Exception e) {
log.error(String.format(
"Exception while scanning range %s. Check the state of Accumulo for errors.", range),
e);
}
}
}
}
public static int consume(Iterable<Entry<Key,Value>> scanner, long numberOfRows) {
RowIterator rowIter = new RowIterator(scanner);
int count = 0;
while (rowIter.hasNext()) {
Iterator<Entry<Key,Value>> itr = rowIter.next();
count++;
if (count >= numberOfRows) {
break;
}
}
return count;
}
public static TabletId pickTablet(TableOperations tops, String table, Random r)
throws TableNotFoundException, AccumuloSecurityException, AccumuloException {
if (cacheTablets) {
Locations locations = tops.locate(table, Collections.singleton(new Range()));
tablets = new ArrayList<TabletId>(locations.groupByTablet().keySet());
cacheTablets = false;
}
int index = r.nextInt(tablets.size());
TabletId tabletId = tablets.get(index);
return tabletId;
}
/*
* These interfaces + implementations are used to determine how many times the scanner should look
* up a random tablet and scan it.
*/
static interface LoopControl {
public boolean keepScanning();
}
// Does a finite number of iterations
static class IterativeLoopControl implements LoopControl {
private final int max;
private int current;
public IterativeLoopControl(int max) {
this.max = max;
this.current = 0;
}
@Override
public boolean keepScanning() {
if (current < max) {
++current;
return true;
} else {
return false;
}
}
}
// Does an infinite number of iterations
static class ContinuousLoopControl implements LoopControl {
@Override
public boolean keepScanning() {
return true;
}
}
}