blob: 939ac759667d9549a6dd4ffd605c87d80faf6ff7 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.accumulo.examples.dirlist;
import java.util.Iterator;
import java.util.Map.Entry;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.security.ColumnVisibility;
import org.apache.accumulo.examples.cli.BatchWriterOpts;
import org.apache.accumulo.examples.cli.ClientOnRequiredTable;
import org.apache.accumulo.examples.cli.ScannerOpts;
import org.apache.hadoop.io.Text;
import com.beust.jcommander.Parameter;
/**
* Computes recursive counts over file system information and stores them back into the same
* Accumulo table.
*/
public class FileCount {
private int entriesScanned;
private int inserts;
private final ScannerOpts scanOpts;
private final BatchWriterOpts bwOpts;
private final AccumuloClient client;
private final String tableName;
private final Authorizations auths;
private final ColumnVisibility visibility;
private static class CountValue {
int dirCount = 0;
int fileCount = 0;
int recursiveDirCount = 0;
int recusiveFileCount = 0;
void set(Value val) {
String[] sa = val.toString().split(",");
dirCount = Integer.parseInt(sa[0]);
fileCount = Integer.parseInt(sa[1]);
recursiveDirCount = Integer.parseInt(sa[2]);
recusiveFileCount = Integer.parseInt(sa[3]);
}
Value toValue() {
return new Value(
(dirCount + "," + fileCount + "," + recursiveDirCount + "," + recusiveFileCount)
.getBytes());
}
void incrementFiles() {
fileCount++;
recusiveFileCount++;
}
void incrementDirs() {
dirCount++;
recursiveDirCount++;
}
public void clear() {
dirCount = 0;
fileCount = 0;
recursiveDirCount = 0;
recusiveFileCount = 0;
}
public void incrementRecursive(CountValue other) {
recursiveDirCount += other.recursiveDirCount;
recusiveFileCount += other.recusiveFileCount;
}
}
private int findMaxDepth(Scanner scanner, int min, int max) {
int mid = min + (max - min) / 2;
return findMaxDepth(scanner, min, mid, max);
}
private int findMaxDepth(Scanner scanner, int min, int mid, int max) {
// check to see if the mid-point exist
if (max < min)
return -1;
scanner.setRange(
new Range(String.format("%03d", mid), true, String.format("%03d", mid + 1), false));
if (scanner.iterator().hasNext()) {
// this depth exist, check to see if a larger depth exist
int ret = findMaxDepth(scanner, mid + 1, max);
if (ret == -1)
return mid; // this must the max
else
return ret;
} else {
// this depth does not exist, look lower
return findMaxDepth(scanner, min, mid - 1);
}
}
private int findMaxDepth(Scanner scanner) {
// do binary search to find max depth
int origBatchSize = scanner.getBatchSize();
scanner.setBatchSize(100);
int depth = findMaxDepth(scanner, 0, 64, 999);
scanner.setBatchSize(origBatchSize);
return depth;
}
// find the count column and consume a row
private Entry<Key,Value> findCount(Entry<Key,Value> entry, Iterator<Entry<Key,Value>> iterator,
CountValue cv) {
Key key = entry.getKey();
Text currentRow = key.getRow();
if (key.compareColumnQualifier(QueryUtil.COUNTS_COLQ) == 0)
cv.set(entry.getValue());
while (iterator.hasNext()) {
entry = iterator.next();
entriesScanned++;
key = entry.getKey();
if (key.compareRow(currentRow) != 0)
return entry;
if (key.compareColumnFamily(QueryUtil.DIR_COLF) == 0
&& key.compareColumnQualifier(QueryUtil.COUNTS_COLQ) == 0) {
cv.set(entry.getValue());
}
}
return null;
}
private Entry<Key,Value> consumeRow(Entry<Key,Value> entry, Iterator<Entry<Key,Value>> iterator) {
Key key = entry.getKey();
Text currentRow = key.getRow();
while (iterator.hasNext()) {
entry = iterator.next();
entriesScanned++;
key = entry.getKey();
if (key.compareRow(currentRow) != 0)
return entry;
}
return null;
}
private String extractDir(Key key) {
String row = key.getRowData().toString();
return row.substring(3, row.lastIndexOf('/'));
}
private Mutation createMutation(int depth, String dir, CountValue countVal) {
Mutation m = new Mutation(String.format("%03d%s", depth, dir));
m.put(QueryUtil.DIR_COLF, QueryUtil.COUNTS_COLQ, visibility, countVal.toValue());
return m;
}
private void calculateCounts(Scanner scanner, int depth, BatchWriter batchWriter)
throws Exception {
scanner.setRange(
new Range(String.format("%03d", depth), true, String.format("%03d", depth + 1), false));
CountValue countVal = new CountValue();
Iterator<Entry<Key,Value>> iterator = scanner.iterator();
String currentDir = null;
Entry<Key,Value> entry = null;
if (iterator.hasNext()) {
entry = iterator.next();
entriesScanned++;
}
while (entry != null) {
Key key = entry.getKey();
String dir = extractDir(key);
if (currentDir == null) {
currentDir = dir;
} else if (!currentDir.equals(dir)) {
batchWriter.addMutation(createMutation(depth - 1, currentDir, countVal));
inserts++;
currentDir = dir;
countVal.clear();
}
// process a whole row
if (key.compareColumnFamily(QueryUtil.DIR_COLF) == 0) {
CountValue tmpCount = new CountValue();
entry = findCount(entry, iterator, tmpCount);
if (tmpCount.dirCount == 0 && tmpCount.fileCount == 0) {
// in this case the higher depth will not insert anything if the
// dir has no children, so insert something here
Mutation m = new Mutation(key.getRow());
m.put(QueryUtil.DIR_COLF, QueryUtil.COUNTS_COLQ, visibility, tmpCount.toValue());
batchWriter.addMutation(m);
inserts++;
}
countVal.incrementRecursive(tmpCount);
countVal.incrementDirs();
} else {
entry = consumeRow(entry, iterator);
countVal.incrementFiles();
}
}
if (currentDir != null) {
batchWriter.addMutation(createMutation(depth - 1, currentDir, countVal));
inserts++;
}
}
public FileCount(AccumuloClient client, String tableName, Authorizations auths,
ColumnVisibility cv, ScannerOpts scanOpts, BatchWriterOpts bwOpts) {
this.client = client;
this.tableName = tableName;
this.auths = auths;
this.visibility = cv;
this.scanOpts = scanOpts;
this.bwOpts = bwOpts;
}
public void run() throws Exception {
entriesScanned = 0;
inserts = 0;
try (Scanner scanner = client.createScanner(tableName, auths);
BatchWriter bw = client.createBatchWriter(tableName, bwOpts.getBatchWriterConfig())) {
scanner.setBatchSize(scanOpts.scanBatchSize);
long t1 = System.currentTimeMillis();
int depth = findMaxDepth(scanner);
long t2 = System.currentTimeMillis();
for (int d = depth; d > 0; d--) {
calculateCounts(scanner, d, bw);
// must flush so next depth can read what prev depth wrote
bw.flush();
}
long t3 = System.currentTimeMillis();
System.out.printf("Max depth : %d%n", depth);
System.out.printf("Time to find max depth : %,d ms%n", (t2 - t1));
System.out.printf("Time to compute counts : %,d ms%n", (t3 - t2));
System.out.printf("Entries scanned : %,d %n", entriesScanned);
System.out.printf("Counts inserted : %,d %n", inserts);
}
}
public static class Opts extends ClientOnRequiredTable {
@Parameter(names = "--vis", description = "use a given visibility for the new counts",
converter = VisibilityConverter.class)
ColumnVisibility visibility = new ColumnVisibility();
}
public static void main(String[] args) throws Exception {
Opts opts = new Opts();
ScannerOpts scanOpts = new ScannerOpts();
BatchWriterOpts bwOpts = new BatchWriterOpts();
String programName = FileCount.class.getName();
opts.parseArgs(programName, args, scanOpts, bwOpts);
try (AccumuloClient client = opts.createAccumuloClient()) {
FileCount fileCount = new FileCount(client, opts.getTableName(), opts.auths, opts.visibility,
scanOpts, bwOpts);
fileCount.run();
}
}
}