blob: 1c8c78b93f497cc103b1c5390b9353631b089828 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.rya.pcj.fluo.test.base;
import static com.google.common.base.Preconditions.checkNotNull;
import java.net.UnknownHostException;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.client.ZooKeeperInstance;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.minicluster.MiniAccumuloCluster;
import org.apache.fluo.api.client.FluoAdmin;
import org.apache.fluo.api.client.FluoAdmin.AlreadyInitializedException;
import org.apache.fluo.api.client.FluoClient;
import org.apache.fluo.api.client.FluoFactory;
import org.apache.fluo.api.config.FluoConfiguration;
import org.apache.fluo.api.mini.MiniFluo;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.rya.accumulo.AccumuloRdfConfiguration;
import org.apache.rya.api.client.Install;
import org.apache.rya.api.client.Install.DuplicateInstanceNameException;
import org.apache.rya.api.client.Install.InstallConfiguration;
import org.apache.rya.api.client.RyaClientException;
import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails;
import org.apache.rya.api.client.accumulo.AccumuloInstall;
import org.apache.rya.api.instance.RyaDetailsRepository.RyaDetailsRepositoryException;
import org.apache.rya.api.persist.RyaDAOException;
import org.apache.rya.indexing.accumulo.ConfigUtils;
import org.apache.rya.indexing.external.PrecomputedJoinIndexerConfig;
import org.apache.rya.indexing.pcj.fluo.app.query.MetadataCacheSupplier;
import org.apache.rya.indexing.pcj.fluo.app.query.StatementPatternIdCacheSupplier;
import org.apache.rya.rdftriplestore.RyaSailRepository;
import org.apache.rya.rdftriplestore.inference.InferenceEngineException;
import org.apache.rya.sail.config.RyaSailFactory;
import org.apache.rya.test.accumulo.MiniAccumuloClusterInstance;
import org.apache.rya.test.accumulo.MiniAccumuloSingleton;
import org.apache.rya.test.accumulo.RyaTestInstanceRule;
import org.apache.zookeeper.ClientCnxn;
import org.eclipse.rdf4j.repository.RepositoryConnection;
import org.eclipse.rdf4j.repository.RepositoryException;
import org.eclipse.rdf4j.sail.Sail;
import org.eclipse.rdf4j.sail.SailException;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
/**
* Integration tests that ensure the Fluo application processes PCJs results
* correctly.
* <p>
* This class is being ignored because it doesn't contain any unit tests.
*/
public abstract class FluoITBase {
private static final Logger log = Logger.getLogger(FluoITBase.class);
// Mini Accumulo Cluster
private static MiniAccumuloClusterInstance clusterInstance = MiniAccumuloSingleton.getInstance();
private static MiniAccumuloCluster cluster;
private static String instanceName = null;
private static String zookeepers = null;
protected static Connector accumuloConn = null;
// Fluo data store and connections.
protected MiniFluo fluo = null;
protected FluoConfiguration fluoConfig = null;
protected FluoClient fluoClient = null;
// Rya data store and connections.
protected RyaSailRepository ryaRepo = null;
protected RepositoryConnection ryaConn = null;
@Rule
public RyaTestInstanceRule testInstance = new RyaTestInstanceRule();
@BeforeClass
public static void beforeClass() throws Exception {
Logger.getLogger(ClientCnxn.class).setLevel(Level.ERROR);
// Setup and start the Mini Accumulo.
cluster = clusterInstance.getCluster();
// Store a connector to the Mini Accumulo.
instanceName = cluster.getInstanceName();
zookeepers = cluster.getZooKeepers();
final Instance instance = new ZooKeeperInstance(instanceName, zookeepers);
accumuloConn = instance.getConnector(clusterInstance.getUsername(), new PasswordToken(clusterInstance.getPassword()));
}
@Before
public void setupMiniResources() throws Exception {
// Initialize the Mini Fluo that will be used to store created queries.
fluoConfig = createFluoConfig();
preFluoInitHook();
FluoFactory.newAdmin(fluoConfig).initialize(new FluoAdmin.InitializationOptions()
.setClearTable(true)
.setClearZookeeper(true));
postFluoInitHook();
fluo = FluoFactory.newMiniFluo(fluoConfig);
fluoClient = FluoFactory.newClient(fluo.getClientConfiguration());
// Initialize the Rya that will be used by the tests.
ryaRepo = setupRya();
ryaConn = ryaRepo.getConnection();
}
@After
public void shutdownMiniResources() {
if (ryaConn != null) {
try {
log.info("Shutting down Rya Connection.");
ryaConn.close();
log.info("Rya Connection shut down.");
} catch (final Exception e) {
log.error("Could not shut down the Rya Connection.", e);
}
}
if (ryaRepo != null) {
try {
log.info("Shutting down Rya Repo.");
ryaRepo.shutDown();
log.info("Rya Repo shut down.");
} catch (final Exception e) {
log.error("Could not shut down the Rya Repo.", e);
}
}
if (fluoClient != null) {
try {
log.info("Shutting down Fluo Client.");
fluoClient.close();
log.info("Fluo Client shut down.");
} catch (final Exception e) {
log.error("Could not shut down the Fluo Client.", e);
}
}
if (fluo != null) {
try {
log.info("Shutting down Mini Fluo.");
fluo.close();
log.info("Mini Fluo shut down.");
} catch (final Exception e) {
log.error("Could not shut down the Mini Fluo.", e);
}
}
StatementPatternIdCacheSupplier.clear();
MetadataCacheSupplier.clear();
}
protected void preFluoInitHook() throws Exception {
}
protected void postFluoInitHook() throws Exception {
}
protected MiniAccumuloCluster getMiniAccumuloCluster() {
return cluster;
}
protected MiniFluo getMiniFluo() {
return fluo;
}
public RyaSailRepository getRyaSailRepository() {
return ryaRepo;
}
public Connector getAccumuloConnector() {
return accumuloConn;
}
public String getRyaInstanceName() {
return testInstance.getRyaInstanceName();
}
protected String getUsername() {
return clusterInstance.getUsername();
}
protected String getPassword() {
return clusterInstance.getPassword();
}
protected FluoConfiguration getFluoConfiguration() {
return fluoConfig;
}
public AccumuloConnectionDetails createConnectionDetails() {
return new AccumuloConnectionDetails(
clusterInstance.getUsername(),
clusterInstance.getPassword().toCharArray(),
clusterInstance.getInstanceName(),
clusterInstance.getZookeepers());
}
private FluoConfiguration createFluoConfig() {
// Configure how the mini fluo will run.
final FluoConfiguration config = new FluoConfiguration();
config.setMiniStartAccumulo(false);
config.setAccumuloInstance(instanceName);
config.setAccumuloUser(clusterInstance.getUsername());
config.setAccumuloPassword(clusterInstance.getPassword());
config.setInstanceZookeepers(zookeepers + "/fluo");
config.setAccumuloZookeepers(zookeepers);
config.setApplicationName(getRyaInstanceName());
config.setAccumuloTable("fluo" + getRyaInstanceName());
return config;
}
/**
* Sets up a Rya instance.
*/
protected RyaSailRepository setupRya()
throws AccumuloException, AccumuloSecurityException, RepositoryException, RyaDAOException,
NumberFormatException, UnknownHostException, InferenceEngineException, AlreadyInitializedException,
RyaDetailsRepositoryException, DuplicateInstanceNameException, RyaClientException, SailException {
checkNotNull(instanceName);
checkNotNull(zookeepers);
// Setup Rya configuration values.
final AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration();
conf.setTablePrefix(getRyaInstanceName());
conf.setDisplayQueryPlan(true);
conf.setBoolean(ConfigUtils.USE_MOCK_INSTANCE, false);
conf.set(ConfigUtils.CLOUDBASE_USER, clusterInstance.getUsername());
conf.set(ConfigUtils.CLOUDBASE_PASSWORD, clusterInstance.getPassword());
conf.set(ConfigUtils.CLOUDBASE_INSTANCE, clusterInstance.getInstanceName());
conf.set(ConfigUtils.CLOUDBASE_ZOOKEEPERS, clusterInstance.getZookeepers());
conf.set(ConfigUtils.USE_PCJ, "true");
conf.set(ConfigUtils.FLUO_APP_NAME, getRyaInstanceName());
conf.set(ConfigUtils.PCJ_STORAGE_TYPE, PrecomputedJoinIndexerConfig.PrecomputedJoinStorageType.ACCUMULO.toString());
conf.set(ConfigUtils.PCJ_UPDATER_TYPE, PrecomputedJoinIndexerConfig.PrecomputedJoinUpdaterType.FLUO.toString());
conf.set(ConfigUtils.CLOUDBASE_AUTHS, "");
// Install the test instance of Rya.
final Install install = new AccumuloInstall(createConnectionDetails(), accumuloConn);
final InstallConfiguration installConfig = InstallConfiguration.builder()
.setEnableTableHashPrefix(true)
.setEnableEntityCentricIndex(true)
.setEnableFreeTextIndex(true)
.setEnableTemporalIndex(true)
.setEnablePcjIndex(true)
.setEnableGeoIndex(true)
.setFluoPcjAppName(getRyaInstanceName())
.build();
install.install(getRyaInstanceName(), installConfig);
// Connect to the instance of Rya that was just installed.
final Sail sail = RyaSailFactory.getInstance(conf);
final RyaSailRepository ryaRepo = new RyaSailRepository(sail);
return ryaRepo;
}
}