blob: fabf5120a55840e2ac3e226ac8628fa30df502a5 [file] [log] [blame]
package org.apache.rya.indexing.pcj.fluo.integration;
import static java.util.Objects.requireNonNull;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.xml.datatype.DatatypeConfigurationException;
import javax.xml.datatype.DatatypeFactory;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.fluo.api.client.FluoClient;
import org.apache.fluo.api.client.FluoFactory;
import org.apache.fluo.api.client.Transaction;
import org.apache.fluo.api.client.scanner.CellScanner;
import org.apache.fluo.api.config.FluoConfiguration;
import org.apache.fluo.api.data.RowColumnValue;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.rya.api.client.CreatePCJ.ExportStrategy;
import org.apache.rya.indexing.pcj.fluo.api.CreateFluoPcj;
import org.apache.rya.indexing.pcj.fluo.app.util.FluoQueryUtils;
import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
import org.apache.rya.pcj.fluo.test.base.KafkaExportITBase;
import org.junit.BeforeClass;
import org.junit.Test;
import org.openrdf.model.Literal;
import org.openrdf.model.Statement;
import org.openrdf.model.ValueFactory;
import org.openrdf.model.impl.ValueFactoryImpl;
import org.openrdf.repository.sail.SailRepositoryConnection;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
public class FluoLatencyIT extends KafkaExportITBase {
private static ValueFactory vf;
private static DatatypeFactory dtf;
@BeforeClass
public static void init() throws DatatypeConfigurationException {
vf = new ValueFactoryImpl();
dtf = DatatypeFactory.newInstance();
}
@Test
public void resultsExported() throws Exception {
final String sparql = "prefix time: <http://www.w3.org/2006/time#> " + "select ?type (count(?obs) as ?total) where { "
+ " ?obs <uri:hasTime> ?time. " + " ?obs <uri:hasObsType> ?type " + "} " + "group by ?type";
// final String sparql = "prefix time: <http://www.w3.org/2006/time#> " + "select ?type ?obs where { "
// + " ?obs <uri:hasTime> ?time. " + " ?obs <uri:hasObsType> ?type " + "}";
try (FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) {
// Tell the Fluo app to maintain the PCJ.
String pcjId = FluoQueryUtils.createNewPcjId();
FluoConfiguration conf = super.getFluoConfiguration();
new CreateFluoPcj().createPcj(pcjId, sparql, Sets.newHashSet(ExportStrategy.KAFKA), fluoClient);
SailRepositoryConnection conn = super.getRyaSailRepository().getConnection();
long start = System.currentTimeMillis();
int numReturned = 0;
int numObs = 10;
int numTypes = 5;
int numExpected = 0;
int increment = numObs*numTypes;
while (System.currentTimeMillis() - start < 60000) {
List<Statement> statements = generate(10, 5, "car_", numExpected, ZonedDateTime.now());
conn.add(statements);
numExpected += increment;
System.out.println("Num Accumulo Entries: " + getNumAccEntries(conf.getAccumuloTable()) + " Num Fluo Entries: "
+ getNumFluoEntries(fluoClient));
numReturned += readAllResults(pcjId).size();
System.out
.println("Expected: " + numExpected + " NumReturned: " + numReturned + " Difference: " + (numExpected - numReturned));
// FluoITHelper.printFluoTable(conf);
Thread.sleep(30000);
}
}
}
/**
* Generates (numObservationsPerType x numTypes) statements of the form:
*
* <pre>
* urn:obs_n uri:hasTime zonedTime
* urn:obs_n uri:hasObsType typePrefix_m
* </pre>
*
* Where the n in urn:obs_n is the ith value in 0 to (numObservationsPerType x numTypes) with an offset specified by
* observationOffset, and where m in typePrefix_m is the jth value in 0 to numTypes.
*
* @param numObservationsPerType - The quantity of observations per type to generate.
* @param numTypes - The number of types to generate observations for.
* @param typePrefix - The prefix to be used for the type literal in the statement.
* @param observationOffset - The offset to be used for determining the value of n in the above statements.
* @param zonedTime - The time to be used for all observations generated.
* @return A new list of all generated Statements.
*/
public List<Statement> generate(final long numObservationsPerType, final int numTypes, final String typePrefix,
final long observationOffset, final ZonedDateTime zonedTime) {
final String time = zonedTime.format(DateTimeFormatter.ISO_INSTANT);
final Literal litTime = vf.createLiteral(dtf.newXMLGregorianCalendar(time));
final List<Statement> statements = Lists.newArrayList();
for (long i = 0; i < numObservationsPerType; i++) {
for (int j = 0; j < numTypes; j++) {
final long observationId = observationOffset + i * numTypes + j;
// final String obsId = "urn:obs_" + Long.toHexString(observationId) + "_" + observationId;
// final String obsId = "urn:obs_" + observationId;
final String obsId = "urn:obs_" + String.format("%020d", observationId);
final String type = typePrefix + j;
// logger.info(obsId + " " + type + " " + litTime);
statements.add(vf.createStatement(vf.createURI(obsId), vf.createURI("uri:hasTime"), litTime));
statements.add(vf.createStatement(vf.createURI(obsId), vf.createURI("uri:hasObsType"), vf.createLiteral(type)));
}
}
return statements;
}
private Set<VisibilityBindingSet> readAllResults(final String pcjId) throws Exception {
requireNonNull(pcjId);
// Read all of the results from the Kafka topic.
final Set<VisibilityBindingSet> results = new HashSet<>();
try (final KafkaConsumer<String, VisibilityBindingSet> consumer = makeConsumer(pcjId)) {
final ConsumerRecords<String, VisibilityBindingSet> records = consumer.poll(5000);
final Iterator<ConsumerRecord<String, VisibilityBindingSet>> recordIterator = records.iterator();
while (recordIterator.hasNext()) {
results.add(recordIterator.next().value());
}
}
return results;
}
private int getNumAccEntries(String tableName) throws TableNotFoundException {
Scanner scanner = super.getAccumuloConnector().createScanner(tableName, new Authorizations());
int count = 0;
for (Map.Entry<Key, Value> entry : scanner) {
count++;
}
return count;
}
private int getNumFluoEntries(FluoClient client) {
Transaction tx = client.newTransaction();
CellScanner scanner = tx.scanner().build();
int count = 0;
for (RowColumnValue rcv : scanner) {
count++;
}
return count;
}
}