/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.rya.pcj.fluo.test.base;
import static java.util.Objects.requireNonNull;
import static org.junit.Assert.assertEquals;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import org.I0Itec.zkclient.ZkClient;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.minicluster.MiniAccumuloCluster;
import org.apache.fluo.api.config.ObserverSpecification;
import org.apache.fluo.recipes.test.AccumuloExportITBase;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.rya.accumulo.AccumuloRdfConfiguration;
import org.apache.rya.accumulo.AccumuloRyaDAO;
import org.apache.rya.api.client.CreatePCJ.ExportStrategy;
import org.apache.rya.api.client.Install.InstallConfiguration;
import org.apache.rya.api.client.RyaClient;
import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails;
import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory;
import org.apache.rya.api.model.VisibilityBindingSet;
import org.apache.rya.indexing.accumulo.ConfigUtils;
import org.apache.rya.indexing.external.PrecomputedJoinIndexerConfig;
import org.apache.rya.indexing.pcj.fluo.app.batch.BatchObserver;
import org.apache.rya.indexing.pcj.fluo.app.export.kafka.KafkaBindingSetExporterParameters;
import org.apache.rya.indexing.pcj.fluo.app.export.kafka.KafkaSubGraphExporterParameters;
import org.apache.rya.indexing.pcj.fluo.app.export.kafka.KryoVisibilityBindingSetSerializer;
import org.apache.rya.indexing.pcj.fluo.app.observers.AggregationObserver;
import org.apache.rya.indexing.pcj.fluo.app.observers.ConstructQueryResultObserver;
import org.apache.rya.indexing.pcj.fluo.app.observers.FilterObserver;
import org.apache.rya.indexing.pcj.fluo.app.observers.JoinObserver;
import org.apache.rya.indexing.pcj.fluo.app.observers.ProjectionObserver;
import org.apache.rya.indexing.pcj.fluo.app.observers.QueryResultObserver;
import org.apache.rya.indexing.pcj.fluo.app.observers.StatementPatternObserver;
import org.apache.rya.indexing.pcj.fluo.app.observers.TripleObserver;
import org.apache.rya.indexing.pcj.fluo.app.query.MetadataCacheSupplier;
import org.apache.rya.indexing.pcj.fluo.app.query.StatementPatternIdCacheSupplier;
import org.apache.rya.rdftriplestore.RyaSailRepository;
import org.apache.rya.sail.config.RyaSailFactory;
import org.eclipse.rdf4j.model.Statement;
import org.eclipse.rdf4j.repository.sail.SailRepositoryConnection;
import org.eclipse.rdf4j.sail.Sail;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import com.google.common.collect.Sets;
import kafka.admin.AdminUtils;
import kafka.admin.RackAwareMode;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.utils.MockTime;
import kafka.utils.TestUtils;
import kafka.utils.Time;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
import kafka.zk.EmbeddedZookeeper;
/**
* The base integration test class used for Fluo applications that export
* their results to a Kafka topic.
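* <p>
* A typical subclass loads statements, registers a query, and then reads the
* exported results back off the topic. A minimal sketch, assuming the caller
* supplies the {@code sparql} string and {@code statements} collection:
* <pre>{@code
* final String pcjId = loadDataAndCreateQuery(sparql, statements);
* try (KafkaConsumer<String, VisibilityBindingSet> consumer = makeConsumer(pcjId)) {
*     final ConsumerRecords<String, VisibilityBindingSet> records = consumer.poll(5000);
*     records.forEach(record -> System.out.println(record.value()));
* }
* }</pre>
*/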
public class KafkaExportITBase extends AccumuloExportITBase {
protected static final String RYA_INSTANCE_NAME = "test_";
private static final String ZKHOST = "127.0.0.1";
private static final String BROKERHOST = "127.0.0.1";
private static final String BROKERPORT = "9092";
private ZkUtils zkUtils;
private KafkaServer kafkaServer;
private EmbeddedZookeeper zkServer;
private ZkClient zkClient;
// The Rya instance that statements are written to, which feeds the Fluo app.
private RyaSailRepository ryaSailRepo = null;
private AccumuloRyaDAO dao = null;
/**
* Adds the observers and the Kafka export topic information to the Fluo
* configuration before the mini Fluo application is initialized.
*/
@Override
protected void preFluoInitHook() throws Exception {
// Setup the observers that will be used by the Fluo PCJ Application.
final List<ObserverSpecification> observers = new ArrayList<>();
observers.add(new ObserverSpecification(TripleObserver.class.getName()));
observers.add(new ObserverSpecification(BatchObserver.class.getName()));
observers.add(new ObserverSpecification(StatementPatternObserver.class.getName()));
observers.add(new ObserverSpecification(JoinObserver.class.getName()));
observers.add(new ObserverSpecification(FilterObserver.class.getName()));
observers.add(new ObserverSpecification(AggregationObserver.class.getName()));
observers.add(new ObserverSpecification(ProjectionObserver.class.getName()));
observers.add(new ObserverSpecification(ConstructQueryResultObserver.class.getName()));
// Configure the export observer to export new PCJ results to the mini
// accumulo cluster.
final HashMap<String, String> exportParams = new HashMap<>();
final KafkaBindingSetExporterParameters kafkaParams = new KafkaBindingSetExporterParameters(exportParams);
kafkaParams.setUseKafkaBindingSetExporter(true);
kafkaParams.setKafkaBootStrapServers(BROKERHOST + ":" + BROKERPORT);
final KafkaSubGraphExporterParameters kafkaConstructParams = new KafkaSubGraphExporterParameters(exportParams);
kafkaConstructParams.setUseKafkaSubgraphExporter(true);
final ObserverSpecification exportObserverConfig = new ObserverSpecification(QueryResultObserver.class.getName(), exportParams);
observers.add(exportObserverConfig);
// Add the observers to the Fluo Configuration.
super.getFluoConfiguration().addObservers(observers);
}
/**
* Installs a Rya instance and sets up the embedded Kafka broker. The
* superclass sets up the mini Fluo application.
*/
@Before
public void setupKafka() throws Exception {
// Install an instance of Rya on the Accumulo cluster.
installRyaInstance();
// Setup Kafka.
zkServer = new EmbeddedZookeeper();
final String zkConnect = ZKHOST + ":" + zkServer.port();
zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer$.MODULE$);
zkUtils = ZkUtils.apply(zkClient, false);
// setup Broker
final Properties brokerProps = new Properties();
brokerProps.setProperty("zookeeper.connect", zkConnect);
brokerProps.setProperty("broker.id", "0");
brokerProps.setProperty("log.dirs", Files.createTempDirectory("kafka-").toAbsolutePath().toString());
brokerProps.setProperty("listeners", "PLAINTEXT://" + BROKERHOST + ":" + BROKERPORT);
final KafkaConfig config = new KafkaConfig(brokerProps);
final Time mock = new MockTime();
kafkaServer = TestUtils.createServer(config, mock);
}
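/**
* Uninstalls the Rya instance and shuts down the Sail repository and DAO.
*/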
@After
public void teardownRya() {
final MiniAccumuloCluster cluster = getMiniAccumuloCluster();
final String instanceName = cluster.getInstanceName();
final String zookeepers = cluster.getZooKeepers();
// Uninstall the instance of Rya.
final RyaClient ryaClient = AccumuloRyaClientFactory.build(
new AccumuloConnectionDetails(ACCUMULO_USER, ACCUMULO_PASSWORD.toCharArray(), instanceName, zookeepers),
super.getAccumuloConnector());
try {
ryaClient.getUninstall().uninstall(RYA_INSTANCE_NAME);
// Shutdown the repo.
if(ryaSailRepo != null) {ryaSailRepo.shutDown();}
if(dao != null ) {dao.destroy();}
} catch (final Exception e) {
System.out.println("Encountered the following Exception when shutting down Rya: " + e.getMessage());
}
}
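/**
* Clears the static metadata and statement pattern ID caches so state does
* not leak between tests.
*/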
@After
public void clearCaches() {
StatementPatternIdCacheSupplier.clear();
MetadataCacheSupplier.clear();
}
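/**
* Installs a Rya instance on the mini Accumulo cluster with the PCJ index
* enabled, then connects a Sail repository and DAO to it.
*/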
private void installRyaInstance() throws Exception {
final MiniAccumuloCluster cluster = super.getMiniAccumuloCluster();
final String instanceName = cluster.getInstanceName();
final String zookeepers = cluster.getZooKeepers();
// Install the Rya instance to the mini accumulo cluster.
final RyaClient ryaClient = AccumuloRyaClientFactory.build(
new AccumuloConnectionDetails(ACCUMULO_USER, ACCUMULO_PASSWORD.toCharArray(), instanceName, zookeepers),
super.getAccumuloConnector());
ryaClient.getInstall().install(RYA_INSTANCE_NAME,
InstallConfiguration.builder().setEnableTableHashPrefix(false).setEnableFreeTextIndex(false)
.setEnableEntityCentricIndex(false).setEnableGeoIndex(false).setEnableTemporalIndex(false).setEnablePcjIndex(true)
.setFluoPcjAppName(super.getFluoConfiguration().getApplicationName()).build());
// Connect to the Rya instance that was just installed.
final AccumuloRdfConfiguration conf = makeConfig(instanceName, zookeepers);
final Sail sail = RyaSailFactory.getInstance(conf);
dao = RyaSailFactory.getAccumuloDAOWithUpdatedConfig(conf);
ryaSailRepo = new RyaSailRepository(sail);
}
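/**
* Builds an {@link AccumuloRdfConfiguration} that connects to the mini
* Accumulo cluster and routes PCJ updates through the Fluo application.
*
* @param instanceName - The name of the Accumulo instance. (not null)
* @param zookeepers - The ZooKeeper servers used by the instance. (not null)
* @return The configuration used to connect to the Rya instance.
*/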
protected AccumuloRdfConfiguration makeConfig(final String instanceName, final String zookeepers) {
final AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration();
conf.setTablePrefix(RYA_INSTANCE_NAME);
// Accumulo connection information.
conf.setAccumuloUser(AccumuloExportITBase.ACCUMULO_USER);
conf.setAccumuloPassword(AccumuloExportITBase.ACCUMULO_PASSWORD);
conf.setAccumuloInstance(super.getAccumuloConnector().getInstance().getInstanceName());
conf.setAccumuloZookeepers(super.getAccumuloConnector().getInstance().getZooKeepers());
conf.setAuths("");
// PCJ configuration information.
conf.set(ConfigUtils.USE_PCJ, "true");
conf.set(ConfigUtils.USE_PCJ_UPDATER_INDEX, "true");
conf.set(ConfigUtils.FLUO_APP_NAME, super.getFluoConfiguration().getApplicationName());
conf.set(ConfigUtils.PCJ_STORAGE_TYPE, PrecomputedJoinIndexerConfig.PrecomputedJoinStorageType.ACCUMULO.toString());
conf.set(ConfigUtils.PCJ_UPDATER_TYPE, PrecomputedJoinIndexerConfig.PrecomputedJoinUpdaterType.FLUO.toString());
conf.setDisplayQueryPlan(true);
return conf;
}
/**
* @return A {@link RyaSailRepository} that is connected to the Rya instance
* into which statements are loaded.
*/
protected RyaSailRepository getRyaSailRepository() throws Exception {
return ryaSailRepo;
}
/**
* @return An {@link AccumuloRyaDAO} that can be used to add RyaStatements with
* distinct visibilities to the Rya instance.
*/
protected AccumuloRyaDAO getRyaDAO() {
return dao;
}
/**
* Shuts down the embedded Kafka broker, the ZooKeeper client, and the
* embedded ZooKeeper server.
*/
@After
public void teardownKafka() {
if(kafkaServer != null) {kafkaServer.shutdown();}
if(zkClient != null) {zkClient.close();}
if(zkServer != null) {zkServer.shutdown();}
}
/**
* Tests Kafka without any Rya code to make sure Kafka works in this environment.
* If this test fails, it indicates a problem with the test environment, not with
* Rya. Source: https://github.com/asmaier/mini-kafka
*/
@Test
public void embeddedKafkaTest() throws Exception {
// create topic
final String topic = "testTopic";
AdminUtils.createTopic(zkUtils, topic, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$);
// setup producer
final Properties producerProps = new Properties();
producerProps.setProperty("bootstrap.servers", BROKERHOST + ":" + BROKERPORT);
producerProps.setProperty("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
producerProps.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
final KafkaProducer<Integer, byte[]> producer = new KafkaProducer<>(producerProps);
// setup consumer
final Properties consumerProps = new Properties();
consumerProps.setProperty("bootstrap.servers", BROKERHOST + ":" + BROKERPORT);
consumerProps.setProperty("group.id", "group0");
consumerProps.setProperty("client.id", "consumer0");
consumerProps.setProperty("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
consumerProps.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
// to make sure the consumer starts from the beginning of the topic
consumerProps.put("auto.offset.reset", "earliest");
final KafkaConsumer<Integer, byte[]> consumer = new KafkaConsumer<>(consumerProps);
consumer.subscribe(Arrays.asList(topic));
// send message
final ProducerRecord<Integer, byte[]> data = new ProducerRecord<>(topic, 42, "test-message".getBytes(StandardCharsets.UTF_8));
producer.send(data);
producer.close();
// starting consumer
final ConsumerRecords<Integer, byte[]> records = consumer.poll(3000);
assertEquals(1, records.count());
final Iterator<ConsumerRecord<Integer, byte[]>> recordIterator = records.iterator();
final ConsumerRecord<Integer, byte[]> record = recordIterator.next();
assertEquals(42, (int) record.key());
assertEquals("test-message", new String(record.value(), StandardCharsets.UTF_8));
consumer.close();
}
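/**
* Creates a {@link KafkaConsumer} that reads {@link VisibilityBindingSet}s
* from the supplied topic, starting from the earliest available offset.
*
* @param topicName - The Kafka topic the consumer subscribes to. (not null)
* @return A consumer that is subscribed to the topic.
*/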
protected KafkaConsumer<String, VisibilityBindingSet> makeConsumer(final String topicName) {
// setup consumer
final Properties consumerProps = new Properties();
consumerProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKERHOST + ":" + BROKERPORT);
consumerProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group0");
consumerProps.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "consumer0");
consumerProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KryoVisibilityBindingSetSerializer.class.getName());
// to make sure the consumer starts from the beginning of the topic
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
final KafkaConsumer<String, VisibilityBindingSet> consumer = new KafkaConsumer<>(consumerProps);
consumer.subscribe(Arrays.asList(topicName));
return consumer;
}
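/**
* Registers a PCJ for the SPARQL query using the {@link ExportStrategy#KAFKA}
* export strategy, then loads the statements into Rya and waits for the Fluo
* application to finish processing them.
*
* @param sparql - The query the PCJ computes results for. (not null)
* @param statements - The statements to load into Rya. (not null)
* @return The PCJ ID, which is also the Kafka topic the results are exported to.
*/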
protected String loadDataAndCreateQuery(final String sparql, final Collection<Statement> statements) throws Exception {
requireNonNull(sparql);
requireNonNull(statements);
// Register the PCJ with Rya.
final Instance accInstance = super.getAccumuloConnector().getInstance();
final Connector accumuloConn = super.getAccumuloConnector();
final RyaClient ryaClient = AccumuloRyaClientFactory.build(new AccumuloConnectionDetails(ACCUMULO_USER,
ACCUMULO_PASSWORD.toCharArray(), accInstance.getInstanceName(), accInstance.getZooKeepers()), accumuloConn);
final String pcjId = ryaClient.getCreatePCJ().createPCJ(RYA_INSTANCE_NAME, sparql, Sets.newHashSet(ExportStrategy.KAFKA));
loadData(statements);
// The PCJ Id is the topic name the results will be written to.
return pcjId;
}
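/**
* Loads the statements into the Rya instance and blocks until the Fluo
* application has finished processing them.
*
* @param statements - The statements to load. (not null)
*/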
protected void loadData(final Collection<Statement> statements) throws Exception {
requireNonNull(statements);
final SailRepositoryConnection ryaConn = getRyaSailRepository().getConnection();
ryaConn.begin();
ryaConn.add(statements);
ryaConn.commit();
ryaConn.close();
// Wait for the Fluo application to finish computing the end result.
super.getMiniFluo().waitForObservers();
}
}