blob: c2086d259f9aff1e959e5c0e76bb7a8c8e54423c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.rya.indexing.external;
import static com.google.common.base.Preconditions.checkNotNull;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.MultiTableBatchWriter;
import org.apache.hadoop.conf.Configuration;
import org.apache.log4j.Logger;
import org.apache.rya.accumulo.experimental.AbstractAccumuloIndexer;
import org.apache.rya.api.RdfCloudTripleStoreConfiguration;
import org.apache.rya.api.domain.RyaStatement;
import org.apache.rya.api.domain.RyaURI;
import org.apache.rya.api.log.LogUtils;
import org.apache.rya.api.persist.RyaDAO;
import org.apache.rya.indexing.external.accumulo.AccumuloPcjStorageSupplier;
import org.apache.rya.indexing.external.fluo.PcjUpdaterSupplierFactory;
import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException;
import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage;
import org.apache.rya.indexing.pcj.update.PrecomputedJoinUpdater;
import org.apache.rya.indexing.pcj.update.PrecomputedJoinUpdater.PcjUpdateException;
import org.eclipse.rdf4j.model.IRI;
import com.google.common.base.Optional;
import com.google.common.base.Supplier;
import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
import edu.umd.cs.findbugs.annotations.NonNull;
/**
 * Updates the state of the Precomputed Join (PCJ) indices that are used by Rya.
 * <p>
 * Expected lifecycle, as implemented by this class:
 * <ol>
 *   <li>{@link #setConf(Configuration)} and {@link #setConnector(Connector)}
 *       provide the configuration and the Accumulo connector.</li>
 *   <li>{@link #init()} creates the {@link PrecomputedJoinStorage} and the
 *       supplier of {@link PrecomputedJoinUpdater}s.</li>
 *   <li>The store/delete/flush/purge methods may then be used.</li>
 *   <li>{@link #close()} or {@link #destroy()} releases the storage and the
 *       updater. Both are safe to invoke even if {@link #init()} never ran or
 *       did not complete.</li>
 * </ol>
 */
@DefaultAnnotation(NonNull.class)
public class PrecomputedJoinIndexer extends AbstractAccumuloIndexer {
    private static final Logger log = Logger.getLogger(PrecomputedJoinIndexer.class);

    /**
     * This configuration object must be set before {@link #init()} is invoked.
     * It is set by {@link #setConf(Configuration)}.
     */
    private Optional<Configuration> conf = Optional.absent();

    /**
     * The Accumulo Connector that must be used when accessing an Accumulo
     * storage. This value is provided by {@link #setConnector(Connector)}.
     */
    private Optional<Connector> accumuloConn = Optional.absent();

    /**
     * Provides access to the {@link Configuration} that was provided to this
     * class using {@link #setConf(Configuration)}. The value is looked up on
     * every call, so a configuration set after construction is still seen.
     */
    private final Supplier<Configuration> configSupplier = new Supplier<Configuration>() {
        @Override
        public Configuration get() {
            return getConf();
        }
    };

    /**
     * Provides access to the Accumulo {@link Connector} that was provided to
     * this class using {@link #setConnector(Connector)}. Calling {@code get()}
     * before a connector has been set fails because the underlying
     * {@link Optional} is absent.
     */
    private final Supplier<Connector> accumuloSupplier = new Supplier<Connector>() {
        @Override
        public Connector get() {
            return accumuloConn.get();
        }
    };

    /**
     * Creates and grants access to the {@link PrecomputedJoinStorage} that will
     * be used to interact with the PCJ results that are stored and used by Rya.
     */
    private final PrecomputedJoinStorageSupplier pcjStorageSupplier = new PrecomputedJoinStorageSupplier(
            configSupplier,
            new AccumuloPcjStorageSupplier(configSupplier, accumuloSupplier));

    /**
     * The storage obtained from {@link #pcjStorageSupplier} during
     * {@link #init()}. It is {@code null} until {@link #init()} has run.
     */
    private PrecomputedJoinStorage pcjStorage;

    /**
     * Creates and grants access to the {@link PrecomputedJoinUpdater}s that will
     * be used to update the state stored within the PCJ tables that are stored
     * in Accumulo. It is {@code null} until {@link #init()} has run.
     */
    private Supplier<PrecomputedJoinUpdater> updaterSupplier;

    /**
     * Stores the configuration used by this indexer.
     *
     * @param conf the configuration; a {@code null} value is recorded as
     *        "no configuration set".
     */
    @Override
    public void setConf(final Configuration conf) {
        this.conf = Optional.fromNullable(conf);
    }

    /**
     * @return the configuration provided through {@link #setConf(Configuration)}.
     * @throws IllegalStateException if no configuration has been set (this is
     *         the behavior of {@link Optional#get()} on an absent value).
     */
    @Override
    public Configuration getConf() {
        return conf.get();
    }

    /**
     * Set the connector that will be used by {@link AccumuloPcjStorage} if the
     * application is configured to store the PCJs within Accumulo.
     *
     * @param connector the Accumulo connector. (not null)
     */
    @Override
    public void setConnector(final Connector connector) {
        checkNotNull(connector);
        accumuloConn = Optional.of(connector);
    }

    /**
     * This is invoked when the host {@link RyaDAO#init()} method is invoked.
     * It obtains the PCJ storage and builds the updater supplier from the
     * current configuration.
     */
    @Override
    public void init() {
        pcjStorage = pcjStorageSupplier.get();
        updaterSupplier = new PcjUpdaterSupplierFactory(configSupplier).getSupplier();
        // The returned updater is intentionally discarded. NOTE(review):
        // presumably this forces the updater to be created now so that any
        // configuration problem surfaces during init() - confirm against the
        // supplier implementation.
        updaterSupplier.get();
    }

    /**
     * Adds a single statement to the PCJ indices by delegating to
     * {@link #storeStatements(Collection)}.
     *
     * @param statement the statement to add. (not null)
     * @throws IOException if the PCJ updater could not process the statement.
     */
    @Override
    public void storeStatement(final RyaStatement statement) throws IOException {
        checkNotNull(statement);
        storeStatements(Collections.singleton(statement));
    }

    /**
     * Hands the statements to the PCJ updater. {@link #init()} must have been
     * invoked first, because the updater supplier is created there.
     *
     * @param statements the statements to add. (not null)
     * @throws IOException wraps the {@link PcjUpdateException} raised by the
     *         updater.
     */
    @Override
    public void storeStatements(final Collection<RyaStatement> statements)
            throws IOException {
        checkNotNull(statements);
        try {
            updaterSupplier.get().addStatements(statements);
        } catch (final PcjUpdateException e) {
            throw new IOException(
                    "Could not update the PCJs by adding the provided statements.",
                    e);
        }
    }

    /**
     * Asks the PCJ updater to delete a single statement. {@link #init()} must
     * have been invoked first.
     *
     * @param statement the statement to remove. (not null)
     * @throws IOException wraps the {@link PcjUpdateException} raised by the
     *         updater.
     */
    @Override
    public void deleteStatement(final RyaStatement statement)
            throws IOException {
        checkNotNull(statement);
        try {
            final Collection<RyaStatement> statements = Collections.singleton(statement);
            updaterSupplier.get().deleteStatements(statements);
        } catch (final PcjUpdateException e) {
            throw new IOException(
                    "Could not update the PCJs by removing the provided statement.",
                    e);
        }
    }

    /**
     * Flushes the PCJ updater. {@link #init()} must have been invoked first.
     *
     * @throws IOException wraps the {@link PcjUpdateException} raised by the
     *         updater.
     */
    @Override
    public void flush() throws IOException {
        try {
            updaterSupplier.get().flush();
        } catch (final PcjUpdateException e) {
            throw new IOException("Could not flush the PCJ Updater.", e);
        }
    }

    /**
     * Closes the PCJ storage and the PCJ updater. Failures reported by either
     * one are logged rather than propagated, so a storage failure does not
     * prevent the updater from being closed.
     * <p>
     * Each resource is only closed if {@link #init()} got far enough to create
     * it. Without these guards, tearing down an indexer whose initialization
     * never ran (or failed partway) would throw a {@link NullPointerException}.
     */
    @Override
    public void close() {
        if (pcjStorage != null) {
            try {
                pcjStorage.close();
            } catch (final PCJStorageException e) {
                log.error("Could not close the PCJ Storage instance.", e);
            }
        }

        if (updaterSupplier != null) {
            try {
                updaterSupplier.get().close();
            } catch (final PcjUpdateException e) {
                log.error("Could not close the PCJ Updater instance.", e);
            }
        }
    }

    /**
     * This is invoked when the host {@link RyaDAO#destroy()} method is invoked.
     * It simply delegates to {@link #close()}.
     */
    @Override
    public void destroy() {
        close();
    }

    /**
     * Deletes all data from the PCJ indices that are managed by a
     * {@link PrecomputedJoinStorage}. A failure to purge one PCJ is logged and
     * the remaining PCJs are still attempted; a failure to list the PCJs is
     * logged and nothing is purged. {@link #init()} must have been invoked
     * first.
     *
     * @param configuration not used by this implementation.
     */
    @Override
    public void purge(final RdfCloudTripleStoreConfiguration configuration) {
        try {
            for (final String pcjId : pcjStorage.listPcjs()) {
                try {
                    pcjStorage.purge(pcjId);
                } catch (final PCJStorageException e) {
                    log.error(
                            "Could not purge the PCJ index with id: " + LogUtils.clean(pcjId),
                            e);
                }
            }
        } catch (final PCJStorageException e) {
            log.error(
                    "Could not purge the PCJ indicies because they could not be listed.",
                    e);
        }
    }

    /**
     * Deletes all of the PCJ indices that are managed by
     * {@link PrecomputedJoinStorage}. A failure to drop one PCJ is logged and
     * the remaining PCJs are still attempted; a failure to list the PCJs is
     * logged and nothing is dropped. {@link #init()} must have been invoked
     * first.
     */
    @Override
    public void dropAndDestroy() {
        try {
            for (final String pcjId : pcjStorage.listPcjs()) {
                try {
                    pcjStorage.dropPcj(pcjId);
                } catch (final PCJStorageException e) {
                    log.error("Could not delete the PCJ index with id: "
                            + LogUtils.clean(pcjId), e);
                }
            }
        } catch (final PCJStorageException e) {
            log.error(
                    "Could not delete the PCJ indicies because they could not be listed.",
                    e);
        }
    }

    /**
     * Intentionally does nothing.
     *
     * @param writer ignored.
     */
    @Override
    public void setMultiTableBatchWriter(final MultiTableBatchWriter writer)
            throws IOException {
        // We do not need to use the writer that also writes to the core RYA
        // tables.
    }

    /**
     * Not supported: only logs a warning and leaves the PCJ indices untouched.
     *
     * @param graphs ignored.
     */
    @Override
    public void dropGraph(final RyaURI... graphs) {
        log.warn("PCJ indices do not store Graph metadata, so graph results can not be dropped.");
    }

    /**
     * Not supported: logs a warning and returns {@code null}.
     *
     * @return always {@code null}.
     */
    @Override
    public String getTableName() {
        // This method makes assumptions about how PCJs are stored. It's only
        // used by AccumuloRyaDAO to purge data, so it should be replaced with
        // a purge() method.
        log.warn("PCJ indicies are not stored within a single table, so this method can not be implemented.");
        return null;
    }

    /**
     * @return a new, empty, mutable set on every call.
     */
    @Override
    public Set<IRI> getIndexablePredicates() {
        return new HashSet<>();
    }
}