blob: f5f670a8377c6b837884dd584ca07b8e61a97ee4 [file] [log] [blame]
package phrasecount.query;
import java.util.Iterator;
import java.util.Map.Entry;
import com.google.common.collect.Iterators;
import org.apache.accumulo.core.client.ClientConfiguration;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.RowIterator;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.ZooKeeperInstance;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.fluo.api.config.FluoConfiguration;
import org.apache.hadoop.io.Text;
import phrasecount.pojos.Counts;
import phrasecount.pojos.PhraseAndCounts;
/**
 * Contains all of the code for dealing with the Accumulo table that Fluo is exporting to.
 */
public class PhraseCountTable implements Iterable<PhraseAndCounts> {

  // name of the column family under which both phrase statistics are stored
  static final String STAT_CF = "stat";

  // name of column qualifier used to store phrase count across all documents
  static final String TOTAL_PC_CQ = "totalCount";

  // name of column qualifier used to store number of documents containing a phrase
  static final String DOC_PC_CQ = "docCount";

  private final Connector conn;
  private final String table;

  /**
   * Builds the mutation that brings a phrase's row in line with the given counts.
   *
   * <p>A count of zero is written as a delete of the corresponding column; any other count is
   * written as its decimal string.
   *
   * @param phrase the phrase, used as the row of the mutation
   * @param seq sequence number, used as the Accumulo timestamp of every column update
   * @param pc the counts to store for the phrase
   * @return a mutation updating (or deleting) the total-count and doc-count columns
   */
  public static Mutation createMutation(String phrase, long seq, Counts pc) {
    Mutation mutation = new Mutation(phrase);

    // use the sequence number for the Accumulo timestamp, this will cause older updates to fall
    // behind newer ones
    if (pc.totalPhraseCount == 0) {
      mutation.putDelete(STAT_CF, TOTAL_PC_CQ, seq);
    } else {
      mutation.put(STAT_CF, TOTAL_PC_CQ, seq, String.valueOf(pc.totalPhraseCount));
    }

    if (pc.docPhraseCount == 0) {
      mutation.putDelete(STAT_CF, DOC_PC_CQ, seq);
    } else {
      mutation.put(STAT_CF, DOC_PC_CQ, seq, String.valueOf(pc.docPhraseCount));
    }

    return mutation;
  }

  /**
   * Creates a table accessor that connects to Accumulo using the instance, zookeepers, user and
   * password found in the Fluo configuration.
   *
   * @param fluoConfig configuration supplying the Accumulo connection settings
   * @param table name of the Accumulo table to read
   * @throws Exception if the connection to Accumulo cannot be established
   */
  public PhraseCountTable(FluoConfiguration fluoConfig, String table) throws Exception {
    ZooKeeperInstance zki = new ZooKeeperInstance(
        new ClientConfiguration().withZkHosts(fluoConfig.getAccumuloZookeepers())
            .withInstance(fluoConfig.getAccumuloInstance()));
    this.conn = zki.getConnector(fluoConfig.getAccumuloUser(),
        new PasswordToken(fluoConfig.getAccumuloPassword()));
    this.table = table;
  }

  /**
   * Creates a table accessor on top of an existing Accumulo connection.
   *
   * @param conn connector used to create scanners
   * @param table name of the Accumulo table to read
   */
  public PhraseCountTable(Connector conn, String table) {
    this.conn = conn;
    this.table = table;
  }

  /**
   * Looks up the counts stored for a single phrase.
   *
   * @param phrase the phrase (row) to look up
   * @return the stored counts; a count whose column is absent is reported as 0
   * @throws Exception if the scanner cannot be created (for example the table does not exist)
   * @throws NumberFormatException if a stored value is not a decimal integer in {@code int} range
   */
  public Counts getPhraseCounts(String phrase) throws Exception {
    Scanner scanner = conn.createScanner(table, Authorizations.EMPTY);
    try {
      scanner.setRange(new Range(phrase));
      // only the two statistic columns are of interest, same as in iterator()
      scanner.fetchColumn(new Text(STAT_CF), new Text(TOTAL_PC_CQ));
      scanner.fetchColumn(new Text(STAT_CF), new Text(DOC_PC_CQ));

      // NOTE(review): values are parsed as int; if Counts holds long values, counts above
      // Integer.MAX_VALUE written by createMutation would fail to parse here - confirm
      int sum = 0;
      int docCount = 0;

      for (Entry<Key, Value> entry : scanner) {
        String cq = entry.getKey().getColumnQualifierData().toString();
        if (TOTAL_PC_CQ.equals(cq)) {
          sum = Integer.parseInt(entry.getValue().toString());
        } else if (DOC_PC_CQ.equals(cq)) {
          docCount = Integer.parseInt(entry.getValue().toString());
        }
      }

      return new Counts(docCount, sum);
    } finally {
      // release the scanner even when iteration or parsing fails
      scanner.close();
    }
  }

  /**
   * Returns an iterator over the rows of the table, restricted to the total-count and doc-count
   * columns, with each row converted by {@code RowTransform}.
   *
   * @return an iterator producing one {@link PhraseAndCounts} per row
   * @throws RuntimeException if the scanner cannot be created; checked exceptions are wrapped
   */
  @Override
  public Iterator<PhraseAndCounts> iterator() {
    try {
      // the scanner is not closed here because the returned iterator is built on top of it
      Scanner scanner = conn.createScanner(table, Authorizations.EMPTY);
      scanner.fetchColumn(new Text(STAT_CF), new Text(TOTAL_PC_CQ));
      scanner.fetchColumn(new Text(STAT_CF), new Text(DOC_PC_CQ));

      return Iterators.transform(new RowIterator(scanner), new RowTransform());
    } catch (RuntimeException e) {
      throw e;
    } catch (Exception e) {
      // Iterable.iterator() cannot declare checked exceptions, so wrap and keep the cause
      throw new RuntimeException(e);
    }
  }
}