blob: e19440730b45d5331b80a5f4e6d04d0f3c372279 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.rya.indexing.pcj.fluo.demo;
import static com.google.common.base.Preconditions.checkNotNull;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.client.ZooKeeperInstance;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.minicluster.MiniAccumuloCluster;
import org.apache.fluo.api.client.FluoAdmin.AlreadyInitializedException;
import org.apache.fluo.api.client.FluoClient;
import org.apache.fluo.api.client.FluoFactory;
import org.apache.fluo.api.config.FluoConfiguration;
import org.apache.fluo.api.config.ObserverSpecification;
import org.apache.fluo.api.mini.MiniFluo;
import org.apache.log4j.ConsoleAppender;
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.log4j.PatternLayout;
import org.apache.rya.accumulo.AccumuloRdfConfiguration;
import org.apache.rya.accumulo.AccumuloRyaDAO;
import org.apache.rya.accumulo.instance.AccumuloRyaInstanceDetailsRepository;
import org.apache.rya.api.RdfCloudTripleStoreConfiguration;
import org.apache.rya.api.instance.RyaDetails;
import org.apache.rya.api.instance.RyaDetails.EntityCentricIndexDetails;
import org.apache.rya.api.instance.RyaDetails.FreeTextIndexDetails;
import org.apache.rya.api.instance.RyaDetails.JoinSelectivityDetails;
import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails;
import org.apache.rya.api.instance.RyaDetails.ProspectorDetails;
import org.apache.rya.api.instance.RyaDetails.TemporalIndexDetails;
import org.apache.rya.api.instance.RyaDetailsRepository;
import org.apache.rya.api.instance.RyaDetailsRepository.RyaDetailsRepositoryException;
import org.apache.rya.indexing.pcj.fluo.app.export.rya.RyaExportParameters;
import org.apache.rya.indexing.pcj.fluo.app.observers.FilterObserver;
import org.apache.rya.indexing.pcj.fluo.app.observers.JoinObserver;
import org.apache.rya.indexing.pcj.fluo.app.observers.QueryResultObserver;
import org.apache.rya.indexing.pcj.fluo.app.observers.StatementPatternObserver;
import org.apache.rya.indexing.pcj.fluo.app.observers.TripleObserver;
import org.apache.rya.indexing.pcj.fluo.demo.Demo.DemoExecutionException;
import org.apache.rya.rdftriplestore.RdfCloudTripleStore;
import org.apache.rya.rdftriplestore.RyaSailRepository;
import org.eclipse.rdf4j.repository.RepositoryConnection;
import org.eclipse.rdf4j.repository.RepositoryException;
import com.google.common.base.Optional;
import com.google.common.io.Files;
/**
 * Runs {@link Demo}s that require Rya and Fluo.
 * <p>
 * The driver starts a Mini Accumulo cluster, initializes a Rya repository on top of it, starts a
 * Mini Fluo instance whose export observer is configured with that Accumulo's connection details,
 * executes {@link FluoAndHistoricPcjsDemo}, and finally releases every resource it created.
 */
public class DemoDriver {
    private static final Logger log = Logger.getLogger(DemoDriver.class);

    /** Table prefix used by the demo's Rya instance; the same value doubles as the Rya instance name. */
    private static final String RYA_TABLE_PREFIX = "demo_";

    /** User name used for every connection made to the Mini Accumulo. */
    private static final String ACCUMULO_USER = "root";

    /** Password the Mini Accumulo is created with and that every connection authenticates with. */
    private static final String ACCUMULO_PASSWORD = "password";

    /** Configuration key this driver sets to {@code true} on the Rya configuration. */
    public static final String USE_MOCK_INSTANCE = ".useMockInstance";

    /** Configuration key holding the Accumulo instance name. */
    public static final String CLOUDBASE_INSTANCE = "sc.cloudbase.instancename";

    /** Configuration key holding the Accumulo user name. */
    public static final String CLOUDBASE_USER = "sc.cloudbase.username";

    /** Configuration key holding the Accumulo password. */
    public static final String CLOUDBASE_PASSWORD = "sc.cloudbase.password";

    // Rya data store and connections.
    private static MiniAccumuloCluster accumulo = null;
    private static Connector accumuloConn = null;
    private static RyaSailRepository ryaRepo = null;
    private static RepositoryConnection ryaConn = null;

    // Fluo data store and connections.
    private static MiniFluo fluo = null;
    private static FluoClient fluoClient = null;

    /**
     * Entry point: initializes the mini resources, runs the demo and tears the resources down.
     *
     * @param args - Command line arguments. (ignored)
     */
    public static void main(final String[] args) {
        setupLogging();

        // Setup the resources required to run the demo.
        try {
            log.info("Initializing resources used by the demo...");
            setupResources();
        } catch (final DemoInitializationException e) {
            log.error("Could not initialize the demo's resources. Exiting.", e);
            // Initialization may have failed part way through, so release whatever did start
            // before terminating the JVM.
            shutdownResources();
            System.exit(-1);
        }
        log.info("");

        // Run the demo.
        try {
            new FluoAndHistoricPcjsDemo().execute(accumulo, accumuloConn, RYA_TABLE_PREFIX, ryaRepo, ryaConn, fluo, fluoClient);
        } catch (final DemoExecutionException e) {
            log.error("An exception was thrown durring demo execution. The demo can not continue.", e);
        } finally {
            // Tear down the demo environment, even if the demo failed with an unexpected exception.
            log.info("Shutting down the demo...");
            shutdownResources();
            log.info("Demo exiting.");
        }
    }

    /**
     * Silences every logger except this class's logger and applies a compact console layout.
     * <p>
     * If the root logger has no {@link ConsoleAppender} named "stdout", a new console appender
     * using the same layout is attached instead so the demo's output is still visible.
     */
    private static void setupLogging() {
        // Turn off all the loggers and customize how they write to the console.
        final Logger rootLogger = LogManager.getRootLogger();
        rootLogger.setLevel(Level.OFF);

        final PatternLayout layout = new PatternLayout("%-5p - %m%n");
        final Object stdout = rootLogger.getAppender("stdout");
        if (stdout instanceof ConsoleAppender) {
            ((ConsoleAppender) stdout).setLayout(layout);
        } else {
            // No usable "stdout" appender is configured; casting blindly would fail here.
            rootLogger.addAppender(new ConsoleAppender(layout));
        }

        // Turn the loggers used by the demo back on.
        log.setLevel(Level.INFO);
    }

    /**
     * Indicates a problem while initializing the demo's resources prevented it from starting.
     */
    private static final class DemoInitializationException extends Exception {
        private static final long serialVersionUID = 1L;

        /**
         * @param message - Describes what could not be initialized.
         * @param cause - The exception that caused the initialization failure.
         */
        public DemoInitializationException(final String message, final Exception cause) {
            super(message, cause);
        }
    }

    /**
     * Starts the Mini Accumulo, the Rya repository and the Mini Fluo, storing each of them
     * (and their connections) in this class's static fields.
     *
     * @throws DemoInitializationException Any of the resources could not be started. Resources
     *   that were started before the failure remain assigned so they can still be shut down.
     */
    private static void setupResources() throws DemoInitializationException {
        try {
            // Initialize the Mini Accumulo that will be used to store Triples and get a connection to it.
            log.debug("Starting up the Mini Accumulo Cluster used by Rya.");
            accumulo = startMiniAccumulo();

            // Setup the Rya library to use the Mini Accumulo.
            log.debug("Starting up the Rya Repository.");
            ryaRepo = setupRya(accumulo);
            ryaConn = ryaRepo.getConnection();

            // Initialize the Mini Fluo that will be used to store created queries.
            log.debug("Starting up the Mini Fluo instance.");
            fluo = startMiniFluo();
            fluoClient = FluoFactory.newClient( fluo.getClientConfiguration() );
        } catch (final Exception e) {
            throw new DemoInitializationException("Could not run the demo because of a problem while initializing the mini resources.", e);
        }
    }

    /**
     * Releases every resource that was successfully started, in the reverse order of creation.
     * <p>
     * Fluo is closed first because its export observer is configured with the Rya Accumulo's
     * connection details; the Mini Accumulo is stopped last. A failure to release one resource
     * is logged and does not prevent the remaining resources from being released.
     */
    private static void shutdownResources() {
        if (fluoClient != null) {
            try {
                log.debug("Shutting down Fluo Client.");
                fluoClient.close();
            } catch (final Exception e) {
                log.error("Could not shut down the Fluo Client.", e);
            }
        }

        if (fluo != null) {
            try {
                log.debug("Shutting down Mini Fluo.");
                fluo.close();
            } catch (final Exception e) {
                log.error("Could not shut down the Mini Fluo.", e);
            }
        }

        if (ryaConn != null) {
            try {
                log.debug("Shutting down Rya Connection.");
                ryaConn.close();
            } catch (final Exception e) {
                log.error("Could not shut down the Rya Connection.", e);
            }
        }

        if (ryaRepo != null) {
            try {
                log.debug("Shutting down Rya Repo.");
                ryaRepo.shutDown();
            } catch (final Exception e) {
                log.error("Could not shut down the Rya Repo.", e);
            }
        }

        if (accumulo != null) {
            try {
                log.debug("Shutting down the Mini Accumulo being used as a Rya store.");
                accumulo.stop();
            } catch (final Exception e) {
                log.error("Could not shut down the Mini Accumulo.", e);
            }
        }
    }

    /**
     * Setup a Mini Accumulo cluster that uses a temporary directory to store its data.
     * <p>
     * As a side effect, a connector to the started cluster is stored in {@code accumuloConn}.
     *
     * @return A Mini Accumulo cluster.
     */
    private static MiniAccumuloCluster startMiniAccumulo() throws IOException, InterruptedException, AccumuloException, AccumuloSecurityException {
        final File miniDataDir = Files.createTempDir();

        // Setup and start the Mini Accumulo.
        final MiniAccumuloCluster cluster = new MiniAccumuloCluster(miniDataDir, ACCUMULO_PASSWORD);
        cluster.start();

        // Store a connector to the Mini Accumulo.
        final Instance instance = new ZooKeeperInstance(cluster.getInstanceName(), cluster.getZooKeepers());
        accumuloConn = instance.getConnector(ACCUMULO_USER, new PasswordToken(ACCUMULO_PASSWORD));

        return cluster;
    }

    /**
     * Format a Mini Accumulo to be a Rya repository.
     * <p>
     * Uses the connector stored in {@code accumuloConn}, so {@link #startMiniAccumulo()} must
     * have been invoked first.
     *
     * @param accumulo - The Mini Accumulo cluster Rya will sit on top of. (not null)
     * @return The Rya repository sitting on top of the Mini Accumulo.
     */
    private static RyaSailRepository setupRya(final MiniAccumuloCluster accumulo) throws AccumuloException, AccumuloSecurityException, RepositoryException, AlreadyInitializedException, RyaDetailsRepositoryException {
        checkNotNull(accumulo);

        // Setup the Rya Repository that will be used to create Repository Connections.
        final RdfCloudTripleStore ryaStore = new RdfCloudTripleStore();
        final AccumuloRyaDAO crdfdao = new AccumuloRyaDAO();
        crdfdao.setConnector(accumuloConn);

        // Setup Rya configuration values.
        final String ryaInstanceName = RYA_TABLE_PREFIX;
        final AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration();
        conf.setTablePrefix(RYA_TABLE_PREFIX);
        conf.setDisplayQueryPlan(true);
        conf.setBoolean(USE_MOCK_INSTANCE, true);
        conf.set(RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX, RYA_TABLE_PREFIX);
        conf.set(CLOUDBASE_USER, ACCUMULO_USER);
        conf.set(CLOUDBASE_PASSWORD, ACCUMULO_PASSWORD);
        conf.set(CLOUDBASE_INSTANCE, accumulo.getInstanceName());
        crdfdao.setConf(conf);
        ryaStore.setRyaDAO(crdfdao);

        final RyaSailRepository ryaRepo = new RyaSailRepository(ryaStore);
        ryaRepo.initialize();

        // Create Rya Details for the instance name.
        final RyaDetailsRepository detailsRepo = new AccumuloRyaInstanceDetailsRepository(accumuloConn, ryaInstanceName);
        final RyaDetails details = RyaDetails.builder()
                .setRyaInstanceName(ryaInstanceName)
                .setRyaVersion("0.0.0.0")
                .setFreeTextDetails( new FreeTextIndexDetails(true) )
                .setEntityCentricIndexDetails( new EntityCentricIndexDetails(true) )
                //RYA-215 .setGeoIndexDetails( new GeoIndexDetails(true) )
                .setTemporalIndexDetails( new TemporalIndexDetails(true) )
                .setPCJIndexDetails(
                        PCJIndexDetails.builder()
                            .setEnabled(true) )
                .setJoinSelectivityDetails( new JoinSelectivityDetails( Optional.absent() ) )
                .setProspectorDetails( new ProspectorDetails( Optional.absent() ))
                .build();
        detailsRepo.initialize(details);

        return ryaRepo;
    }

    /**
     * Setup a Mini Fluo cluster that uses a temporary directory to store its data.
     * <p>
     * The export observer is configured from the static {@code accumulo} field, so the Mini
     * Accumulo must already have been started.
     *
     * @return A Mini Fluo cluster.
     */
    private static MiniFluo startMiniFluo() {
        checkNotNull(accumulo, "The Mini Accumulo must be started before the Mini Fluo.");
        final File miniDataDir = Files.createTempDir();

        // Setup the observers that will be used by the Fluo PCJ Application.
        final List<ObserverSpecification> observers = new ArrayList<>();
        observers.add(new ObserverSpecification(TripleObserver.class.getName()));
        observers.add(new ObserverSpecification(StatementPatternObserver.class.getName()));
        observers.add(new ObserverSpecification(JoinObserver.class.getName()));
        observers.add(new ObserverSpecification(FilterObserver.class.getName()));

        // Configure the export observer with the connection details of the demo's Rya instance.
        // RyaExportParameters writes its values into the backing exportParams map.
        final HashMap<String, String> exportParams = new HashMap<>();
        final RyaExportParameters ryaParams = new RyaExportParameters(exportParams);
        ryaParams.setUseRyaBindingSetExporter(true);
        ryaParams.setAccumuloInstanceName(accumulo.getInstanceName());
        ryaParams.setZookeeperServers(accumulo.getZooKeepers());
        ryaParams.setExporterUsername(ACCUMULO_USER);
        ryaParams.setExporterPassword(ACCUMULO_PASSWORD);
        ryaParams.setRyaInstanceName(RYA_TABLE_PREFIX);

        final ObserverSpecification exportObserverConfig = new ObserverSpecification(QueryResultObserver.class.getName(), exportParams);
        observers.add(exportObserverConfig);

        // Configure how the mini fluo will run.
        final FluoConfiguration config = new FluoConfiguration();
        config.setApplicationName("IntegrationTests");
        config.setMiniDataDir(miniDataDir.getAbsolutePath());
        config.addObservers(observers);

        return FluoFactory.newMiniFluo(config);
    }
}