/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.accumulo.examples.shard;

import java.io.File;
import java.io.FileReader;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;

import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.examples.cli.ClientOpts;

import com.beust.jcommander.Parameter;

/**
 * This program indexes a set of documents given on the command line into a shard table.
 *
 * What it writes to the table is row = partition id, column family = term, column qualifier =
 * document id. For example, if the document /docs/readme.txt hashes to partition 3 and contains
 * the term "apache", the entry written is row 00000003, column family apache, column qualifier
 * /docs/readme.txt.
 */
public class Index {
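
  // Format a partition number as a zero-padded hex string to use as the row id.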
  static String genPartition(int partition) {
    return String.format("%08x", Math.abs(partition));
  }
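
  // Index a single document: hash its contents to pick a partition, then write one entry per
  // unique lower-cased term with the term as the column family and the document id as the
  // column qualifier.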
  public static void index(int numPartitions, String docId, String doc, String splitRegex,
      BatchWriter bw) throws Exception {
    String[] tokens = doc.split(splitRegex);

    String partition = genPartition(doc.hashCode() % numPartitions);

    Mutation m = new Mutation(partition);

    // Track terms already written so each term is added only once per document.
    HashSet<String> tokensSeen = new HashSet<>();

    for (String token : tokens) {
      token = token.toLowerCase();

      if (!tokensSeen.contains(token)) {
        tokensSeen.add(token);
        m.put(token, docId, new Value(new byte[0]));
      }
    }

    if (m.size() > 0)
      bw.addMutation(m);
  }
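
  // Index a file or, recursively, every file under a directory. Each regular file is read fully
  // into memory and indexed using its absolute path as the document id.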
  public static void index(int numPartitions, File src, String splitRegex, BatchWriter bw)
      throws Exception {
    if (src.isDirectory()) {
      File[] files = src.listFiles();
      if (files != null) {
        for (File child : files) {
          index(numPartitions, child, splitRegex, bw);
        }
      }
    } else {
      StringBuilder sb = new StringBuilder();
      // try-with-resources ensures the reader is closed even if a read fails
      try (FileReader fr = new FileReader(src)) {
        char[] data = new char[4096];
        int len;

        while ((len = fr.read(data)) != -1) {
          sb.append(data, 0, len);
        }
      }

      index(numPartitions, src.getAbsolutePath(), sb.toString(), splitRegex, bw);
    }
  }
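
  // Command line options: the destination table, the number of shards, and the files to index.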
  static class IndexOpts extends ClientOpts {

    @Parameter(names = {"-t", "--table"}, required = true, description = "table to use")
    private String tableName;

    @Parameter(names = "--partitions", required = true,
        description = "the number of shards to create")
    int partitions;

    @Parameter(required = true, description = "<file> { <file> ... }")
    List<String> files = new ArrayList<>();
  }
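
  // Parse the options, connect to Accumulo, and index each file or directory given on the
  // command line, splitting documents into terms on runs of non-word characters ("\\W+").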
  public static void main(String[] args) throws Exception {
    IndexOpts opts = new IndexOpts();
    opts.parseArgs(Index.class.getName(), args);

    String splitRegex = "\\W+";

    try (AccumuloClient client = Accumulo.newClient().from(opts.getClientPropsPath()).build();
        BatchWriter bw = client.createBatchWriter(opts.tableName)) {
      for (String filename : opts.files) {
        index(opts.partitions, new File(filename), splitRegex, bw);
      }
    }
  }
}