blob: 85f6e15cde3e7a04648204be3ea2b22f3ee9eab7 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.rya.periodic.notification.application;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.client.ZooKeeperInstance;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.fluo.api.client.FluoClient;
import org.apache.fluo.api.client.Snapshot;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.rya.indexing.pcj.fluo.app.util.FluoClientFactory;
import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPeriodicQueryResultStorage;
import org.apache.rya.periodic.notification.api.BindingSetRecord;
import org.apache.rya.periodic.notification.api.NodeBin;
import org.apache.rya.periodic.notification.api.NotificationCoordinatorExecutor;
import org.apache.rya.periodic.notification.coordinator.PeriodicNotificationCoordinatorExecutor;
import org.apache.rya.periodic.notification.exporter.KafkaExporterExecutor;
import org.apache.rya.periodic.notification.notification.TimestampedNotification;
import org.apache.rya.periodic.notification.processor.NotificationProcessorExecutor;
import org.apache.rya.periodic.notification.pruner.PeriodicQueryPrunerExecutor;
import org.apache.rya.periodic.notification.recovery.PeriodicNotificationProvider;
import org.apache.rya.periodic.notification.registration.kafka.KafkaNotificationProvider;
import org.apache.rya.periodic.notification.serialization.BindingSetSerDe;
import org.apache.rya.periodic.notification.serialization.CommandNotificationSerializer;
import org.eclipse.rdf4j.query.BindingSet;
/**
* Factory for creating a {@link PeriodicNotificationApplication}.
*/
public class PeriodicNotificationApplicationFactory {
/**
* Create a PeriodicNotificationApplication.
* @param conf - Configuration object that specifies the parameters needed to create the application
* @return PeriodicNotificationApplication to periodically poll Rya Fluo for new results
* @throws PeriodicApplicationException
*/
public static PeriodicNotificationApplication getPeriodicApplication(final PeriodicNotificationApplicationConfiguration conf) throws PeriodicApplicationException {
final Properties kafkaConsumerProps = getKafkaConsumerProperties(conf);
final Properties kafkaProducerProps = getKafkaProducerProperties(conf);
final BlockingQueue<TimestampedNotification> notifications = new LinkedBlockingQueue<>();
final BlockingQueue<NodeBin> bins = new LinkedBlockingQueue<>();
final BlockingQueue<BindingSetRecord> bindingSets = new LinkedBlockingQueue<>();
FluoClient fluo = null;
try {
final PeriodicQueryResultStorage storage = getPeriodicQueryResultStorage(conf);
fluo = FluoClientFactory.getFluoClient(conf.getFluoAppName(), Optional.of(conf.getFluoTableName()), conf);
final NotificationCoordinatorExecutor coordinator = getCoordinator(conf.getCoordinatorThreads(), notifications);
addRegisteredNotices(coordinator, fluo.newSnapshot());
final KafkaExporterExecutor exporter = getExporter(conf.getExporterThreads(), kafkaProducerProps, bindingSets);
final PeriodicQueryPrunerExecutor pruner = getPruner(storage, fluo, conf.getPrunerThreads(), bins);
final NotificationProcessorExecutor processor = getProcessor(storage, notifications, bins, bindingSets, conf.getProcessorThreads());
final KafkaNotificationProvider provider = getProvider(conf.getProducerThreads(), conf.getNotificationTopic(), coordinator, kafkaConsumerProps);
return PeriodicNotificationApplication.builder().setCoordinator(coordinator).setProvider(provider).setExporter(exporter)
.setProcessor(processor).setPruner(pruner).build();
} catch (AccumuloException | AccumuloSecurityException e) {
throw new PeriodicApplicationException(e.getMessage());
}
}
private static void addRegisteredNotices(final NotificationCoordinatorExecutor coord, final Snapshot sx) {
coord.start();
final PeriodicNotificationProvider provider = new PeriodicNotificationProvider();
provider.processRegisteredNotifications(coord, sx);
}
private static NotificationCoordinatorExecutor getCoordinator(final int numThreads, final BlockingQueue<TimestampedNotification> notifications) {
return new PeriodicNotificationCoordinatorExecutor(numThreads, notifications);
}
private static KafkaExporterExecutor getExporter(final int numThreads, final Properties props, final BlockingQueue<BindingSetRecord> bindingSets) {
final KafkaProducer<String, BindingSet> producer = new KafkaProducer<>(props, new StringSerializer(), new BindingSetSerDe());
return new KafkaExporterExecutor(producer, numThreads, bindingSets);
}
private static PeriodicQueryPrunerExecutor getPruner(final PeriodicQueryResultStorage storage, final FluoClient fluo, final int numThreads,
final BlockingQueue<NodeBin> bins) {
return new PeriodicQueryPrunerExecutor(storage, fluo, numThreads, bins);
}
private static NotificationProcessorExecutor getProcessor(final PeriodicQueryResultStorage periodicStorage,
final BlockingQueue<TimestampedNotification> notifications, final BlockingQueue<NodeBin> bins, final BlockingQueue<BindingSetRecord> bindingSets,
final int numThreads) {
return new NotificationProcessorExecutor(periodicStorage, notifications, bins, bindingSets, numThreads);
}
private static KafkaNotificationProvider getProvider(final int numThreads, final String topic, final NotificationCoordinatorExecutor coord,
final Properties props) {
return new KafkaNotificationProvider(topic, new StringDeserializer(), new CommandNotificationSerializer(), props, coord,
numThreads);
}
private static PeriodicQueryResultStorage getPeriodicQueryResultStorage(final PeriodicNotificationApplicationConfiguration conf)
throws AccumuloException, AccumuloSecurityException {
final Instance instance = new ZooKeeperInstance(conf.getAccumuloInstance(), conf.getAccumuloZookeepers());
final Connector conn = instance.getConnector(conf.getAccumuloUser(), new PasswordToken(conf.getAccumuloPassword()));
final String ryaInstance = conf.getTablePrefix();
return new AccumuloPeriodicQueryResultStorage(conn, ryaInstance);
}
private static Properties getKafkaConsumerProperties(final PeriodicNotificationApplicationConfiguration conf) {
final Properties kafkaProps = new Properties();
kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, conf.getBootStrapServers());
kafkaProps.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, conf.getNotificationClientId());
kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, conf.getNotificationGroupId());
kafkaProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
kafkaProps.setProperty(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "30000"); // reduce this value to 30 seconds for the scenario where we subscribe before the topic exists.
return kafkaProps;
}
private static Properties getKafkaProducerProperties(final PeriodicNotificationApplicationConfiguration conf) {
final Properties kafkaProps = new Properties();
kafkaProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, conf.getBootStrapServers());
return kafkaProps;
}
}