blob: 8be27e8aa7327727696403150350be37f2502d30 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package phrasecount.query;
import java.util.Iterator;
import java.util.Map.Entry;
import com.google.common.collect.Iterators;
import org.apache.accumulo.core.client.ClientConfiguration;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.RowIterator;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.ZooKeeperInstance;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.fluo.api.config.FluoConfiguration;
import org.apache.hadoop.io.Text;
import phrasecount.pojos.Counts;
import phrasecount.pojos.PhraseAndCounts;
/**
* All of the code for working with the Accumulo table that Fluo exports phrase counts to.
*/
public class PhraseCountTable implements Iterable<PhraseAndCounts> {

  /** Column family under which all per-phrase statistics are stored. */
  static final String STAT_CF = "stat";
  /** Column qualifier used to store the phrase count across all documents. */
  static final String TOTAL_PC_CQ = "totalCount";
  /** Column qualifier used to store the number of documents containing a phrase. */
  static final String DOC_PC_CQ = "docCount";

  private final Connector conn;
  private final String table;

  /**
   * Creates a table accessor that opens its own Accumulo connection.
   *
   * <p>The Accumulo instance name, ZooKeeper hosts, user and password are all taken from
   * {@code fluoConfig}.
   *
   * @param fluoConfig Fluo configuration holding the Accumulo connection settings
   * @param table name of the Accumulo table to read
   * @throws Exception if the connection to Accumulo cannot be established
   */
  public PhraseCountTable(FluoConfiguration fluoConfig, String table) throws Exception {
    ZooKeeperInstance zki = new ZooKeeperInstance(
        new ClientConfiguration().withZkHosts(fluoConfig.getAccumuloZookeepers())
            .withInstance(fluoConfig.getAccumuloInstance()));
    this.conn = zki.getConnector(fluoConfig.getAccumuloUser(),
        new PasswordToken(fluoConfig.getAccumuloPassword()));
    this.table = table;
  }

  /**
   * Creates a table accessor that uses an existing Accumulo connection.
   *
   * @param conn connector used to create scanners
   * @param table name of the Accumulo table to read
   */
  public PhraseCountTable(Connector conn, String table) {
    this.conn = conn;
    this.table = table;
  }

  /**
   * Builds the Accumulo mutation that records the counts for one phrase.
   *
   * <p>A count of zero is written as a delete of the corresponding column instead of a stored
   * {@code "0"} value.
   *
   * @param phrase the phrase; used as the row id of the mutation
   * @param seq sequence number used as the Accumulo timestamp of every column written, so that
   *        older updates fall behind newer ones
   * @param pc the counts to record
   * @return a mutation for row {@code phrase} touching the total-count and doc-count columns
   */
  public static Mutation createMutation(String phrase, long seq, Counts pc) {
    Mutation mutation = new Mutation(phrase);
    putOrDelete(mutation, TOTAL_PC_CQ, seq, pc.totalPhraseCount);
    putOrDelete(mutation, DOC_PC_CQ, seq, pc.docPhraseCount);
    return mutation;
  }

  /**
   * Adds either a put of {@code count} or, when {@code count} is zero, a delete of column
   * {@code STAT_CF:cq} at timestamp {@code seq}.
   */
  private static void putOrDelete(Mutation mutation, String cq, long seq, long count) {
    if (count == 0) {
      mutation.putDelete(STAT_CF, cq, seq);
    } else {
      mutation.put(STAT_CF, cq, seq, String.valueOf(count));
    }
  }

  /** Restricts a scanner to the two statistic columns this class writes. */
  private static void fetchStatColumns(Scanner scanner) {
    scanner.fetchColumn(new Text(STAT_CF), new Text(TOTAL_PC_CQ));
    scanner.fetchColumn(new Text(STAT_CF), new Text(DOC_PC_CQ));
  }

  /**
   * Reads the stored counts for a single phrase.
   *
   * <p>A column that is absent, for example because it was deleted when its count was zero, is
   * reported as zero.
   *
   * @param phrase the phrase (row id) to look up
   * @return the document count and total count for {@code phrase}; both are zero when the row
   *         has no statistic columns
   * @throws Exception if the scanner cannot be created, e.g. the table does not exist
   */
  public Counts getPhraseCounts(String phrase) throws Exception {
    Scanner scanner = conn.createScanner(table, Authorizations.EMPTY);
    scanner.setRange(new Range(phrase));
    // Only read the stat columns, the same columns iterator() reads.
    fetchStatColumns(scanner);

    // NOTE(review): counts are parsed as int; if Counts stores longs, values above
    // Integer.MAX_VALUE would fail to parse -- confirm against the Counts definition.
    int sum = 0;
    int docCount = 0;
    for (Entry<Key, Value> entry : scanner) {
      String cq = entry.getKey().getColumnQualifierData().toString();
      if (cq.equals(TOTAL_PC_CQ)) {
        sum = Integer.parseInt(entry.getValue().toString());
      } else if (cq.equals(DOC_PC_CQ)) {
        docCount = Integer.parseInt(entry.getValue().toString());
      }
    }
    return new Counts(docCount, sum);
  }

  /**
   * Iterates over every row of the table.
   *
   * <p>Each row's statistic columns are grouped by {@link RowIterator} and converted by
   * {@code RowTransform}.
   *
   * @return an iterator of phrases and their counts
   * @throws RuntimeException wrapping any checked exception raised while creating the scanner,
   *         because {@link Iterable#iterator()} cannot declare checked exceptions
   */
  @Override
  public Iterator<PhraseAndCounts> iterator() {
    try {
      Scanner scanner = conn.createScanner(table, Authorizations.EMPTY);
      fetchStatColumns(scanner);
      RowTransform transform = new RowTransform();
      return Iterators.transform(new RowIterator(scanner), transform::apply);
    } catch (RuntimeException e) {
      throw e;
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }
}