blob: 1da96284408fe1b05cfd95318969581b6b65c15b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import java.io.File;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.minicluster.MiniAccumuloCluster;
import org.apache.accumulo.minicluster.MiniAccumuloConfig;
import org.apache.commons.lang3.StringUtils;
import org.apache.fluo.api.client.FluoAdmin;
import org.apache.fluo.api.client.FluoAdmin.AlreadyInitializedException;
import org.apache.fluo.api.client.FluoAdmin.TableExistsException;
import org.apache.fluo.api.client.FluoFactory;
import org.apache.fluo.api.config.FluoConfiguration;
import org.apache.fluo.api.config.ObserverSpecification;
import org.apache.fluo.api.mini.MiniFluo;
import org.apache.log4j.ConsoleAppender;
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.log4j.PatternLayout;
import org.apache.rya.api.client.Install.InstallConfiguration;
import org.apache.rya.api.client.RyaClient;
import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails;
import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory;
import org.apache.rya.indexing.accumulo.AccumuloIndexingConfiguration;
import org.apache.rya.indexing.pcj.fluo.app.batch.BatchObserver;
import org.apache.rya.indexing.pcj.fluo.app.export.rya.RyaExportParameters;
import org.apache.rya.indexing.pcj.fluo.app.observers.FilterObserver;
import org.apache.rya.indexing.pcj.fluo.app.observers.JoinObserver;
import org.apache.rya.indexing.pcj.fluo.app.observers.QueryResultObserver;
import org.apache.rya.indexing.pcj.fluo.app.observers.StatementPatternObserver;
import org.apache.rya.indexing.pcj.fluo.app.observers.TripleObserver;
import org.apache.rya.sail.config.RyaSailFactory;
import org.apache.zookeeper.ClientCnxn;
import org.eclipse.rdf4j.common.iteration.CloseableIteration;
import org.eclipse.rdf4j.model.Statement;
import org.eclipse.rdf4j.model.ValueFactory;
import org.eclipse.rdf4j.query.BindingSet;
import org.eclipse.rdf4j.query.QueryEvaluationException;
import org.eclipse.rdf4j.query.parser.ParsedQuery;
import org.eclipse.rdf4j.query.parser.sparql.SPARQLParser;
import org.eclipse.rdf4j.queryrender.sparql.SPARQLQueryRenderer;
import org.eclipse.rdf4j.sail.Sail;
import org.eclipse.rdf4j.sail.SailConnection;
import com.google.common.collect.Lists;
import com.google.common.io.Files;
/**
* Demonstrates how a {@link RyaClient} may be used to interact with an instance
* of Accumulo to install and manage a Rya instance.
*/
public class RyaClientExample {
private static final Logger log = Logger.getLogger(RyaClientExample.class);
public static void main(final String[] args) throws Exception {
final String accumuloUsername = "root";
final String accumuloPassword = "password";
MiniAccumuloCluster cluster = null;
MiniFluo fluo = null;
Sail ryaSail = null;
try {
// Setup a Mini Accumulo Cluster to host the Rya instance.
log.info("Setting up the Mini Accumulo Cluster used by this example.");
final File miniDataDir = Files.createTempDir();
final MiniAccumuloConfig cfg = new MiniAccumuloConfig(miniDataDir, accumuloPassword);
cluster = new MiniAccumuloCluster(cfg);
cluster.start();
// Setup a Mini Fluo application that will be used to incrementally update the PCJ indicies.
log.info("Setting up the Mini Fluo application used by this example.");
final String fluoAppName = "demoInstance_pcjUpdater";
fluo = makeMiniFluo(accumuloUsername, accumuloPassword, cluster.getInstanceName(), cluster.getZooKeepers(), fluoAppName);
// Give the root user the 'U' authorizations.
final Connector connector = cluster.getConnector(accumuloUsername, accumuloPassword);
connector.securityOperations().changeUserAuthorizations(accumuloUsername, new Authorizations("U"));
// Setup a Rya Client that is able to interact with the mini cluster.
final AccumuloConnectionDetails connectionDetails =
new AccumuloConnectionDetails(accumuloUsername, accumuloPassword.toCharArray(), cluster.getInstanceName(), cluster.getZooKeepers());
final RyaClient ryaClient = AccumuloRyaClientFactory.build(connectionDetails, connector);
// Install an instance of Rya that has all of the secondary indexers turned on.
final String ryaInstanceName = "demoInstance_";
final InstallConfiguration installConfig = InstallConfiguration.builder()
.setEnableTableHashPrefix(true)
.setEnableEntityCentricIndex(true)
.setEnableGeoIndex(true)
.setEnableFreeTextIndex(true)
.setEnableTemporalIndex(true)
.setEnablePcjIndex(true)
.setFluoPcjAppName(fluoAppName)
.build();
ryaClient.getInstall().install(ryaInstanceName, installConfig);
// Add a PCJ index.
final String sparql =
"SELECT ?patron ?employee " +
"WHERE { " +
"?patron <http://talksTo> ?employee. " +
"?employee <http://worksAt> <http://CoffeeShop>. " +
"}";
// Load some statements into the Rya instance.
final AccumuloIndexingConfiguration conf = AccumuloIndexingConfiguration.builder()
.setAuths("U")
.setAccumuloUser(accumuloUsername)
.setAccumuloPassword(accumuloPassword)
.setAccumuloInstance(cluster.getInstanceName())
.setAccumuloZooKeepers(cluster.getZooKeepers())
.setRyaPrefix(ryaInstanceName)
.setPcjUpdaterFluoAppName(fluoAppName)
.build();
ryaSail = RyaSailFactory.getInstance(conf);
final ValueFactory vf = ryaSail.getValueFactory();
final List<Statement> statements = Lists.newArrayList(
vf.createStatement(vf.createIRI("http://Eve"), vf.createIRI("http://talksTo"), vf.createIRI("http://Charlie")),
vf.createStatement(vf.createIRI("http://David"), vf.createIRI("http://talksTo"), vf.createIRI("http://Alice")),
vf.createStatement(vf.createIRI("http://Alice"), vf.createIRI("http://worksAt"), vf.createIRI("http://CoffeeShop")),
vf.createStatement(vf.createIRI("http://Bob"), vf.createIRI("http://worksAt"), vf.createIRI("http://CoffeeShop")),
vf.createStatement(vf.createIRI("http://George"), vf.createIRI("http://talksTo"), vf.createIRI("http://Frank")),
vf.createStatement(vf.createIRI("http://Frank"), vf.createIRI("http://worksAt"), vf.createIRI("http://CoffeeShop")),
vf.createStatement(vf.createIRI("http://Eve"), vf.createIRI("http://talksTo"), vf.createIRI("http://Bob")),
vf.createStatement(vf.createIRI("http://Charlie"), vf.createIRI("http://worksAt"), vf.createIRI("http://CoffeeShop")));
SailConnection ryaConn = ryaSail.getConnection();
log.info("");
log.info("Loading the following statements:");
ryaConn.begin();
for(final Statement statement : statements) {
log.info(" " + statement.toString());
ryaConn.addStatement(statement.getSubject(), statement.getPredicate(), statement.getObject());
}
log.info("");
ryaConn.close();
fluo.waitForObservers();
// Execute the SPARQL query and print the results.
log.info("Executing the following query: ");
prettyLogSparql(sparql);
log.info("");
final ParsedQuery parsedQuery = new SPARQLParser().parseQuery(sparql, null);
ryaConn = ryaSail.getConnection();
final CloseableIteration<? extends BindingSet, QueryEvaluationException> result = ryaConn.evaluate(parsedQuery.getTupleExpr(), null, null, false);
log.info("Results:");
while(result.hasNext()) {
log.info(" " + result.next());
}
log.info("");
} finally {
if(ryaSail != null) {
log.info("Shutting down the Rya Sail instance.");
ryaSail.shutDown();
}
if(fluo != null) {
try {
log.info("Shutting down the Mini Fluo instance.");
fluo.close();
} catch (final Exception e) {
log.error("Could not shut down the Mini Fluo instance.", e);
}
}
if(cluster != null) {
log.info("Sutting down the Mini Accumulo Cluster.");
cluster.stop();
}
}
}
private static void setupLogging() {
// Turn off all the loggers and customize how they write to the console.
final Logger rootLogger = LogManager.getRootLogger();
rootLogger.setLevel(Level.OFF);
final ConsoleAppender ca = (ConsoleAppender) rootLogger.getAppender("stdout");
ca.setLayout(new PatternLayout("%-5p - %m%n"));
// Turn the logger used by the demo back on.
log.setLevel(Level.INFO);
Logger.getLogger(ClientCnxn.class).setLevel(Level.ERROR);
}
private static void prettyLogSparql(final String sparql) {
try {
// Pretty print.
final String[] lines = prettyFormatSparql(sparql);
for(final String line : lines) {
log.info(line);
}
} catch (final Exception e) {
// Pretty print failed, so ugly print instead.
log.info(sparql);
}
}
private static String[] prettyFormatSparql(final String sparql) throws Exception {
final SPARQLParser parser = new SPARQLParser();
final SPARQLQueryRenderer renderer = new SPARQLQueryRenderer();
final ParsedQuery pq = parser.parseQuery(sparql, null);
final String prettySparql = renderer.render(pq);
return StringUtils.split(prettySparql, '\n');
}
private static MiniFluo makeMiniFluo(final String username, final String password, final String instanceName, final String zookeepers, final String fluoAppName) throws AlreadyInitializedException, TableExistsException {
// Setup the observers that will be used by the Fluo PCJ Application.
final List<ObserverSpecification> observers = new ArrayList<>();
observers.add(new ObserverSpecification(TripleObserver.class.getName()));
observers.add(new ObserverSpecification(BatchObserver.class.getName()));
observers.add(new ObserverSpecification(StatementPatternObserver.class.getName()));
observers.add(new ObserverSpecification(JoinObserver.class.getName()));
observers.add(new ObserverSpecification(FilterObserver.class.getName()));
// Provide export parameters child test classes may provide to the
// export observer.
final HashMap<String, String> params = new HashMap<>();
final RyaExportParameters ryaParams = new RyaExportParameters(params);
ryaParams.setUseRyaBindingSetExporter(true);
ryaParams.setAccumuloInstanceName(instanceName);
ryaParams.setZookeeperServers(zookeepers);
ryaParams.setExporterUsername(username);
ryaParams.setExporterPassword(password);
ryaParams.setRyaInstanceName(fluoAppName);
final ObserverSpecification exportObserverConfig = new ObserverSpecification(
QueryResultObserver.class.getName(), params);
observers.add(exportObserverConfig);
// Configure how the mini fluo will run.
final FluoConfiguration config = new FluoConfiguration();
config.setMiniStartAccumulo(false);
config.setAccumuloInstance(instanceName);
config.setAccumuloUser(username);
config.setAccumuloPassword(password);
config.setInstanceZookeepers(zookeepers + "/fluo");
config.setAccumuloZookeepers(zookeepers);
config.setApplicationName(fluoAppName);
config.setAccumuloTable("fluo" + fluoAppName);
config.addObservers(observers);
FluoFactory.newAdmin(config).initialize(
new FluoAdmin.InitializationOptions().setClearTable(true).setClearZookeeper(true) );
return FluoFactory.newMiniFluo(config);
}
}