/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.rya.accumulo;

import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.rya.accumulo.AccumuloRdfConstants.ALL_AUTHORIZATIONS;
import static org.apache.rya.api.RdfCloudTripleStoreConstants.EMPTY_TEXT;
import static org.apache.rya.api.RdfCloudTripleStoreConstants.INFO_NAMESPACE_TXT;
import static org.apache.rya.api.RdfCloudTripleStoreConstants.MAX_MEMORY;
import static org.apache.rya.api.RdfCloudTripleStoreConstants.MAX_TIME;
import static org.apache.rya.api.RdfCloudTripleStoreConstants.NUM_THREADS;
import static org.apache.rya.api.RdfCloudTripleStoreConstants.RTS_SUBJECT_RYA;
import static org.apache.rya.api.RdfCloudTripleStoreConstants.RTS_VERSION_PREDICATE_RYA;
import static org.apache.rya.api.RdfCloudTripleStoreConstants.VERSION_RYA;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.BatchDeleter;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.BatchWriterConfig;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.MultiTableBatchWriter;
import org.apache.accumulo.core.client.MutationsRejectedException;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.admin.TableOperations;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.Text;
import org.apache.rya.accumulo.experimental.AccumuloIndexer;
import org.apache.rya.accumulo.query.AccumuloRyaQueryEngine;
import org.apache.rya.api.RdfCloudTripleStoreConfiguration;
import org.apache.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT;
import org.apache.rya.api.domain.RyaStatement;
import org.apache.rya.api.domain.RyaIRI;
import org.apache.rya.api.layout.TableLayoutStrategy;
import org.apache.rya.api.persist.RyaDAO;
import org.apache.rya.api.persist.RyaDAOException;
import org.apache.rya.api.persist.RyaNamespaceManager;
import org.apache.rya.api.resolver.RyaTripleContext;
import org.eclipse.rdf4j.common.iteration.CloseableIteration;
import org.eclipse.rdf4j.model.Namespace;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
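
/**
 * {@link RyaDAO} implementation that persists and queries Rya statements in
 * Apache Accumulo. Statements are written to the three core index tables
 * (SPO, PO, OSP) and a namespace table, and optionally to any configured
 * secondary indexers.
 *
 * <p>A minimal usage sketch. The {@code connector} and the example IRIs below
 * are placeholders, not provided by this class:
 *
 * <pre>{@code
 * AccumuloRyaDAO dao = new AccumuloRyaDAO();
 * dao.setConnector(connector);                 // an existing Accumulo Connector
 * dao.setConf(new AccumuloRdfConfiguration()); // or a pre-populated configuration
 * dao.init();
 * dao.add(new RyaStatement(
 *         new RyaIRI("http://example.com/subject"),
 *         new RyaIRI("http://example.com/predicate"),
 *         new RyaIRI("http://example.com/object")));
 * dao.flush();
 * dao.destroy();
 * }</pre>
 */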
public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaNamespaceManager<AccumuloRdfConfiguration> {
private static final Log logger = LogFactory.getLog(AccumuloRyaDAO.class);
private final AtomicBoolean isInitialized = new AtomicBoolean();
private final AtomicBoolean flushEachUpdate = new AtomicBoolean(true);
private Connector connector;
private BatchWriterConfig batchWriterConfig;
private MultiTableBatchWriter mt_bw;
// Do not flush these individually
private BatchWriter bw_spo;
private BatchWriter bw_po;
private BatchWriter bw_osp;
private BatchWriter bw_ns;
private List<AccumuloIndexer> secondaryIndexers;
private AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration();
private RyaTableMutationsFactory ryaTableMutationsFactory;
private TableLayoutStrategy tableLayoutStrategy;
private AccumuloRyaQueryEngine queryEngine;
private RyaTripleContext ryaContext;
@Override
public boolean isInitialized() throws RyaDAOException {
return isInitialized.get();
}
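
    /**
     * Initializes this DAO: builds a {@link BatchWriterConfig} if none was
     * supplied, creates the core SPO, PO, OSP, and namespace tables if they do
     * not already exist, wires up the per-table batch writers, initializes any
     * configured secondary indexers, and records the Rya version row if it is
     * missing.
     */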
@Override
public void init() throws RyaDAOException {
if (isInitialized.get()) {
return;
}
try {
checkNotNull(conf);
checkNotNull(connector);
if(batchWriterConfig == null){
batchWriterConfig = new BatchWriterConfig();
batchWriterConfig.setMaxMemory(MAX_MEMORY);
batchWriterConfig.setTimeout(MAX_TIME, TimeUnit.MILLISECONDS);
batchWriterConfig.setMaxWriteThreads(NUM_THREADS);
}
tableLayoutStrategy = conf.getTableLayoutStrategy();
ryaContext = RyaTripleContext.getInstance(conf);
ryaTableMutationsFactory = new RyaTableMutationsFactory(ryaContext);
secondaryIndexers = conf.getAdditionalIndexers();
flushEachUpdate.set(conf.flushEachUpdate());
final TableOperations tableOperations = connector.tableOperations();
AccumuloRdfUtils.createTableIfNotExist(tableOperations, tableLayoutStrategy.getSpo());
AccumuloRdfUtils.createTableIfNotExist(tableOperations, tableLayoutStrategy.getPo());
AccumuloRdfUtils.createTableIfNotExist(tableOperations, tableLayoutStrategy.getOsp());
AccumuloRdfUtils.createTableIfNotExist(tableOperations, tableLayoutStrategy.getNs());
for (final AccumuloIndexer index : secondaryIndexers) {
index.setConf(conf);
}
mt_bw = connector.createMultiTableBatchWriter(batchWriterConfig);
            // Get the batch writers for the core Rya tables.
bw_spo = mt_bw.getBatchWriter(tableLayoutStrategy.getSpo());
bw_po = mt_bw.getBatchWriter(tableLayoutStrategy.getPo());
bw_osp = mt_bw.getBatchWriter(tableLayoutStrategy.getOsp());
bw_ns = mt_bw.getBatchWriter(tableLayoutStrategy.getNs());
for (final AccumuloIndexer index : secondaryIndexers) {
index.setConnector(connector);
index.setMultiTableBatchWriter(mt_bw);
index.init();
}
queryEngine = new AccumuloRyaQueryEngine(connector, conf);
checkVersion();
isInitialized.set(true);
} catch (final Exception e) {
throw new RyaDAOException(e);
}
}
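
    /**
     * Returns the Rya version stored in the core tables, or {@code null} if no
     * version statement has been written yet.
     */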
@Override
public String getVersion() throws RyaDAOException {
        String version = null;
        final CloseableIteration<RyaStatement, RyaDAOException> versIter = queryEngine.query(new RyaStatement(RTS_SUBJECT_RYA, RTS_VERSION_PREDICATE_RYA, null), conf);
        try {
            if (versIter.hasNext()) {
                version = versIter.next().getObject().getData();
            }
        } finally {
            // Close the iteration even if reading the version fails.
            versIter.close();
        }
        return version;
}
@Override
public void add(final RyaStatement statement) throws RyaDAOException {
commit(Iterators.singletonIterator(statement));
}
@Override
public void add(final Iterator<RyaStatement> iter) throws RyaDAOException {
commit(iter);
}
@Override
public void delete(final RyaStatement stmt, final AccumuloRdfConfiguration aconf) throws RyaDAOException {
this.delete(Iterators.singletonIterator(stmt), aconf);
}
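
    /**
     * Treats each incoming statement as a query pattern: all matching
     * statements are looked up and removed from the three core tables, and the
     * pattern is passed to each secondary indexer for deletion.
     */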
@Override
public void delete(final Iterator<RyaStatement> statements, final AccumuloRdfConfiguration conf) throws RyaDAOException {
try {
while (statements.hasNext()) {
final RyaStatement stmt = statements.next();
                // Query for matching statements first, then delete each match from the core tables.
                final CloseableIteration<RyaStatement, RyaDAOException> query = this.queryEngine.query(stmt, conf);
                try {
                    while (query.hasNext()) {
                        deleteSingleRyaStatement(query.next());
                    }
                } finally {
                    query.close();
                }
for (final AccumuloIndexer index : secondaryIndexers) {
index.deleteStatement(stmt);
}
}
if (flushEachUpdate.get()) {
mt_bw.flush();
}
} catch (final Exception e) {
throw new RyaDAOException(e);
}
}
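
    /**
     * Removes every statement stored under the given named graphs by batch
     * deleting the matching column families from the SPO, PO, and OSP tables.
     * Secondary indexers are not updated; see the TODO below.
     */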
@Override
public void dropGraph(final AccumuloRdfConfiguration conf, final RyaIRI... graphs) throws RyaDAOException {
BatchDeleter bd_spo = null;
BatchDeleter bd_po = null;
BatchDeleter bd_osp = null;
try {
bd_spo = createBatchDeleter(tableLayoutStrategy.getSpo(), conf.getAuthorizations());
bd_po = createBatchDeleter(tableLayoutStrategy.getPo(), conf.getAuthorizations());
bd_osp = createBatchDeleter(tableLayoutStrategy.getOsp(), conf.getAuthorizations());
bd_spo.setRanges(Collections.singleton(new Range()));
bd_po.setRanges(Collections.singleton(new Range()));
bd_osp.setRanges(Collections.singleton(new Range()));
for (final RyaIRI graph : graphs){
bd_spo.fetchColumnFamily(new Text(graph.getData()));
bd_po.fetchColumnFamily(new Text(graph.getData()));
bd_osp.fetchColumnFamily(new Text(graph.getData()));
}
bd_spo.delete();
bd_po.delete();
bd_osp.delete();
            // TODO: secondary indexers do not support dropGraph yet;
            // calling it would throw an UnsupportedOperationException.
// for (AccumuloIndex index : secondaryIndexers) {
// index.dropGraph(graphs);
// }
} catch (final Exception e) {
throw new RyaDAOException(e);
} finally {
if (bd_spo != null) {
bd_spo.close();
}
if (bd_po != null) {
bd_po.close();
}
if (bd_osp != null) {
bd_osp.close();
}
}
}
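
    /**
     * Serializes delete mutations for a single statement and queues them on
     * the SPO, PO, and OSP batch writers.
     */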
protected void deleteSingleRyaStatement(final RyaStatement stmt) throws IOException, MutationsRejectedException {
final Map<TABLE_LAYOUT, Collection<Mutation>> map = ryaTableMutationsFactory.serializeDelete(stmt);
bw_spo.addMutations(map.get(TABLE_LAYOUT.SPO));
bw_po.addMutations(map.get(TABLE_LAYOUT.PO));
bw_osp.addMutations(map.get(TABLE_LAYOUT.OSP));
}
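
    /**
     * Serializes each statement into mutations for the three core tables,
     * queues those mutations, hands the statement to every secondary indexer,
     * and flushes the multi-table batch writer if {@code flushEachUpdate} is
     * enabled.
     */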
protected void commit(final Iterator<RyaStatement> commitStatements) throws RyaDAOException {
try {
//TODO: Should have a lock here in case we are adding and committing at the same time
while (commitStatements.hasNext()) {
final RyaStatement stmt = commitStatements.next();
final Map<TABLE_LAYOUT, Collection<Mutation>> mutationMap = ryaTableMutationsFactory.serialize(stmt);
final Collection<Mutation> spo = mutationMap.get(TABLE_LAYOUT.SPO);
final Collection<Mutation> po = mutationMap.get(TABLE_LAYOUT.PO);
final Collection<Mutation> osp = mutationMap.get(TABLE_LAYOUT.OSP);
bw_spo.addMutations(spo);
bw_po.addMutations(po);
bw_osp.addMutations(osp);
for (final AccumuloIndexer index : secondaryIndexers) {
index.storeStatement(stmt);
}
}
if (flushEachUpdate.get()) {
mt_bw.flush();
}
} catch (final Exception e) {
throw new RyaDAOException(e);
}
}
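
    /**
     * Marks the DAO as uninitialized, flushes and closes the multi-table batch
     * writer, and destroys each secondary indexer.
     */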
@Override
public void destroy() throws RyaDAOException {
if (!isInitialized.get()) {
return;
}
//TODO: write lock
try {
isInitialized.set(false);
mt_bw.flush();
mt_bw.close();
} catch (final Exception e) {
throw new RyaDAOException(e);
}
for(final AccumuloIndexer indexer : this.secondaryIndexers) {
try {
indexer.destroy();
} catch(final Exception e) {
logger.warn("Failed to destroy indexer", e);
}
}
}
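
    /**
     * Stores a prefix-to-namespace mapping in the namespace table.
     */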
@Override
public void addNamespace(final String pfx, final String namespace) throws RyaDAOException {
try {
final Mutation m = new Mutation(new Text(pfx));
m.put(INFO_NAMESPACE_TXT, EMPTY_TEXT, new Value(namespace.getBytes(StandardCharsets.UTF_8)));
bw_ns.addMutation(m);
if (flushEachUpdate.get()) {
mt_bw.flush();
}
} catch (final Exception e) {
throw new RyaDAOException(e);
}
}
@Override
public String getNamespace(final String pfx) throws RyaDAOException {
try {
final Scanner scanner = connector.createScanner(tableLayoutStrategy.getNs(),
ALL_AUTHORIZATIONS);
scanner.fetchColumn(INFO_NAMESPACE_TXT, EMPTY_TEXT);
scanner.setRange(new Range(new Text(pfx)));
final Iterator<Map.Entry<Key, Value>> iterator = scanner
.iterator();
if (iterator.hasNext()) {
return new String(iterator.next().getValue().get(), StandardCharsets.UTF_8);
}
} catch (final Exception e) {
throw new RyaDAOException(e);
}
return null;
}
@Override
public void removeNamespace(final String pfx) throws RyaDAOException {
try {
final Mutation del = new Mutation(new Text(pfx));
del.putDelete(INFO_NAMESPACE_TXT, EMPTY_TEXT);
bw_ns.addMutation(del);
if (flushEachUpdate.get()) {
mt_bw.flush();
}
} catch (final Exception e) {
throw new RyaDAOException(e);
}
}
@SuppressWarnings({ "unchecked", "rawtypes" })
@Override
public CloseableIteration<Namespace, RyaDAOException> iterateNamespace() throws RyaDAOException {
try {
final Scanner scanner = connector.createScanner(tableLayoutStrategy.getNs(),
ALL_AUTHORIZATIONS);
scanner.fetchColumnFamily(INFO_NAMESPACE_TXT);
final Iterator<Map.Entry<Key, Value>> result = scanner.iterator();
return new AccumuloNamespaceTableIterator(result);
} catch (final Exception e) {
throw new RyaDAOException(e);
}
}
@Override
public RyaNamespaceManager<AccumuloRdfConfiguration> getNamespaceManager() {
return this;
}
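
    /**
     * Deletes all entries from each core and indexer table (the tables
     * themselves are left in place), requests a major compaction of each, and
     * then asks every secondary indexer to purge its own data.
     */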
@Override
public void purge(final RdfCloudTripleStoreConfiguration configuration) {
for (final String tableName : getTables()) {
try {
purge(tableName, configuration.getAuths());
compact(tableName);
            } catch (final TableNotFoundException e) {
                logger.error(e.getMessage(), e);
            } catch (final MutationsRejectedException e) {
                logger.error(e.getMessage(), e);
            }
}
for(final AccumuloIndexer indexer : this.secondaryIndexers) {
try {
indexer.purge(configuration);
} catch(final Exception e) {
logger.error("Failed to purge indexer", e);
}
}
}
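
    /**
     * Drops every core and indexer table, destroys this DAO, and then drops
     * and destroys each secondary indexer.
     */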
@Override
public void dropAndDestroy() throws RyaDAOException {
for (final String tableName : getTables()) {
try {
if (tableName != null) {
drop(tableName);
}
} catch (final AccumuloSecurityException e) {
logger.error(e.getMessage());
throw new RyaDAOException(e);
} catch (final AccumuloException e) {
logger.error(e.getMessage());
throw new RyaDAOException(e);
} catch (final TableNotFoundException e) {
                logger.warn(e.getMessage(), e);
}
}
destroy();
for(final AccumuloIndexer indexer : this.secondaryIndexers) {
try {
indexer.dropAndDestroy();
} catch(final Exception e) {
logger.error("Failed to drop and destroy indexer", e);
}
}
}
public synchronized Connector getConnector() {
return connector;
}
public synchronized void setConnector(final Connector connector) {
this.connector = connector;
}
public BatchWriterConfig getBatchWriterConfig(){
return batchWriterConfig;
}
public void setBatchWriterConfig(final BatchWriterConfig batchWriterConfig) {
this.batchWriterConfig = batchWriterConfig;
}
protected MultiTableBatchWriter getMultiTableBatchWriter(){
return mt_bw;
}
@Override
public synchronized AccumuloRdfConfiguration getConf() {
return conf;
}
@Override
public synchronized void setConf(final AccumuloRdfConfiguration conf) {
this.conf = conf;
}
public RyaTableMutationsFactory getRyaTableMutationsFactory() {
return ryaTableMutationsFactory;
}
public void setRyaTableMutationsFactory(final RyaTableMutationsFactory ryaTableMutationsFactory) {
this.ryaTableMutationsFactory = ryaTableMutationsFactory;
}
@Override
public AccumuloRyaQueryEngine getQueryEngine() {
return queryEngine;
}
public void setQueryEngine(final AccumuloRyaQueryEngine queryEngine) {
this.queryEngine = queryEngine;
}
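
    /**
     * Flushes any mutations queued on the multi-table batch writer and on the
     * secondary indexers.
     */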
@Override
public void flush() throws RyaDAOException {
try {
mt_bw.flush();
flushIndexers();
} catch (final MutationsRejectedException e) {
throw new RyaDAOException(e);
}
}
private void flushIndexers() throws RyaDAOException {
for (final AccumuloIndexer indexer : secondaryIndexers) {
try {
indexer.flush();
} catch (final IOException e) {
logger.error("Error flushing data in indexer: " + indexer.getClass().getSimpleName(), e);
}
}
}
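
    /**
     * Returns the names of the core Rya tables (SPO, PO, OSP, namespace, and
     * eval) plus the table used by each secondary indexer.
     */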
protected String[] getTables() {
// core tables
final List<String> tableNames = Lists.newArrayList(
tableLayoutStrategy.getSpo(),
tableLayoutStrategy.getPo(),
tableLayoutStrategy.getOsp(),
tableLayoutStrategy.getNs(),
tableLayoutStrategy.getEval());
// Additional Tables
for (final AccumuloIndexer index : secondaryIndexers) {
tableNames.add(index.getTableName());
}
return tableNames.toArray(new String[]{});
}
private void purge(final String tableName, final String[] auths) throws TableNotFoundException, MutationsRejectedException {
if (tableExists(tableName)) {
logger.info("Purging accumulo table: " + tableName);
final BatchDeleter batchDeleter = createBatchDeleter(tableName, new Authorizations(auths));
try {
batchDeleter.setRanges(Collections.singleton(new Range()));
batchDeleter.delete();
} finally {
batchDeleter.close();
}
}
}
private void compact(final String tableName) {
logger.info("Requesting major compaction for table " + tableName);
try {
connector.tableOperations().compact(tableName, null, null, true, false);
} catch (final Exception e) {
            logger.error(e.getMessage(), e);
}
}
private boolean tableExists(final String tableName) {
return getConnector().tableOperations().exists(tableName);
}
private BatchDeleter createBatchDeleter(final String tableName, final Authorizations authorizations) throws TableNotFoundException {
return connector.createBatchDeleter(tableName, authorizations, NUM_THREADS, MAX_MEMORY, MAX_TIME, NUM_THREADS);
}
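
    /**
     * Writes the Rya version statement to the core tables if no version row
     * exists yet. A real version compatibility check is still a TODO.
     */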
private void checkVersion() throws RyaDAOException, IOException, MutationsRejectedException {
final String version = getVersion();
if (version == null) {
            // Add the version statement to the core Rya tables, but not to the secondary indexes.
final Map<TABLE_LAYOUT, Collection<Mutation>> mutationMap = ryaTableMutationsFactory.serialize(getVersionRyaStatement());
final Collection<Mutation> spo = mutationMap.get(TABLE_LAYOUT.SPO);
final Collection<Mutation> po = mutationMap.get(TABLE_LAYOUT.PO);
final Collection<Mutation> osp = mutationMap.get(TABLE_LAYOUT.OSP);
bw_spo.addMutations(spo);
bw_po.addMutations(po);
bw_osp.addMutations(osp);
}
//TODO: Do a version check here
}
protected RyaStatement getVersionRyaStatement() {
return new RyaStatement(RTS_SUBJECT_RYA, RTS_VERSION_PREDICATE_RYA, VERSION_RYA);
}
private void drop(final String tableName) throws TableNotFoundException, AccumuloException, AccumuloSecurityException {
logger.info("Dropping cloudbase table: " + tableName);
connector.tableOperations().delete(tableName);
}
}