// blob: 090d22f06a2835ca3aa293843f9db1e56336aa11 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.rya.export.accumulo.util;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;
import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.ClientConfiguration;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.ZooKeeperInstance;
import org.apache.accumulo.core.client.admin.SecurityOperations;
import org.apache.accumulo.core.client.mock.MockInstance;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.Filter;
import org.apache.accumulo.core.iterators.user.RegExFilter;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.log4j.Logger;
import org.apache.rya.accumulo.AccumuloRdfConfiguration;
import org.apache.rya.accumulo.AccumuloRyaDAO;
import org.apache.rya.accumulo.mr.MRUtils;
import org.apache.rya.api.RdfCloudTripleStoreConstants;
import org.apache.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT;
import org.apache.rya.api.domain.RyaStatement;
import org.apache.rya.api.domain.RyaURI;
import org.apache.rya.api.persist.RyaDAOException;
import org.apache.rya.api.resolver.RdfToRyaConversions;
import org.apache.rya.api.resolver.RyaTripleContext;
import org.apache.rya.api.resolver.triple.TripleRow;
import org.apache.rya.api.resolver.triple.TripleRowResolverException;
import org.apache.rya.indexing.accumulo.ConfigUtils;
import org.eclipse.rdf4j.model.ValueFactory;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableSet;
/**
* Utility methods for an Accumulo Rya instance.
*/
public final class AccumuloRyaUtils {
// Logger used by every utility method in this class.
private static final Logger log = Logger.getLogger(AccumuloRyaUtils.class);
// Namespace applied by the single-argument createRyaUri/convertRyaUriToString overloads.
private static final String NAMESPACE = RdfCloudTripleStoreConstants.NAMESPACE;
// Factory used to build the IRI that createRyaUri converts into a RyaURI.
private static final ValueFactory VALUE_FACTORY = RdfCloudTripleStoreConstants.VALUE_FACTORY;
/**
 * The iterator settings that {@link #addCommonScannerIteratorsTo(Scanner)} attaches to a scanner.
 * At present the set holds a single entry: the negated version regex filter
 * built by {@link #getVersionRegExFilterSetting()}. No copy-time filter is
 * registered here.
 * <p>
 * NOTE(review): the set itself is immutable, but the contained
 * {@link IteratorSetting} instances are configured through mutating calls
 * and are shared by every scanner they are added to -- callers should not modify them.
 */
public static final ImmutableSet<IteratorSetting> COMMON_REG_EX_FILTER_SETTINGS = ImmutableSet.of(
getVersionRegExFilterSetting()
);
/**
 * Not instantiable: this class only exposes static utility methods.
 */
private AccumuloRyaUtils() {
    // Intentionally empty; the private modifier is what blocks instantiation.
}
/**
 * Builds a {@link RyaURI} for a local name in the default Rya namespace
 * ({@link RdfCloudTripleStoreConstants#NAMESPACE}).
 * @param localName the local name part of the URI.
 * @return the resulting {@link RyaURI}.
 */
public static RyaURI createRyaUri(final String localName) {
    final RyaURI ryaUri = createRyaUri(NAMESPACE, localName);
    return ryaUri;
}
/**
 * Builds a {@link RyaURI} from an explicit namespace and local name.
 * The pair is first turned into an IRI by the shared value factory and
 * then converted to its Rya representation.
 * @param namespace the namespace part of the URI.
 * @param localName the local name part of the URI.
 * @return the resulting {@link RyaURI}.
 */
public static RyaURI createRyaUri(final String namespace, final String localName) {
    return RdfToRyaConversions.convertURI(
            VALUE_FACTORY.createIRI(namespace, localName));
}
/**
 * Extracts the data string of a {@link RyaURI} with the default Rya
 * namespace ({@link RdfCloudTripleStoreConstants#NAMESPACE}) removed.
 * @param ryaUri the {@link RyaURI} to convert.
 * @return the URI's data with the default namespace stripped out.
 */
public static String convertRyaUriToString(final RyaURI ryaUri) {
    final String withoutNamespace = convertRyaUriToString(NAMESPACE, ryaUri);
    return withoutNamespace;
}
/**
 * Extracts the data string of a {@link RyaURI} with the given namespace removed.
 * Only the first occurrence of {@code namespace} inside the URI's data is
 * removed; the rest of the string is returned untouched.
 * @param namespace the namespace text to strip from the URI's data.
 * @param ryaUri the {@link RyaURI} to convert.
 * @return the URI's data with the first occurrence of the namespace removed.
 */
public static String convertRyaUriToString(final String namespace, final RyaURI ryaUri) {
    final String data = ryaUri.getData();
    return StringUtils.replaceOnce(data, namespace, "");
}
/**
 * Creates a {@link RyaStatement} from a {@link Key}/{@link Value} pair.
 * Every key component (row, column family, column qualifier, column
 * visibility) and the value bytes are passed to the {@link TripleRow} as
 * {@code null} when they are absent or empty. The row is always
 * deserialized using the {@link TABLE_LAYOUT#SPO} layout.
 * @param key the {@link Key}.
 * @param value the {@link Value}; may be {@code null}.
 * @param ryaTripleContext the {@link RyaTripleContext} used to deserialize the triple.
 * @return the converted {@link RyaStatement}.
 * @throws TripleRowResolverException if the triple row cannot be deserialized.
 */
public static RyaStatement createRyaStatement(final Key key, final Value value, final RyaTripleContext ryaTripleContext) throws TripleRowResolverException {
    final byte[] row = toBytesOrNull(key.getRowData());
    final byte[] columnFamily = toBytesOrNull(key.getColumnFamilyData());
    final byte[] columnQualifier = toBytesOrNull(key.getColumnQualifierData());
    final Long timestamp = key.getTimestamp();
    final byte[] columnVisibility = toBytesOrNull(key.getColumnVisibilityData());
    final byte[] valueBytes = value == null || value.get().length == 0 ? null : value.get();
    final TripleRow tripleRow = new TripleRow(row, columnFamily, columnQualifier, timestamp, columnVisibility, valueBytes);
    return ryaTripleContext.deserializeTriple(TABLE_LAYOUT.SPO, tripleRow);
}

/**
 * Returns the bytes of a key component, or {@code null} when the component is
 * absent or empty. The emptiness check uses the sequence length so the data
 * is only converted to an array once, and only when it is actually needed.
 */
private static byte[] toBytesOrNull(final org.apache.accumulo.core.data.ByteSequence data) {
    if (data == null || data.length() == 0) {
        return null;
    }
    return data.toArray();
}
/**
 * Creates a {@link RegExFilter} setting to ignore the version row in a table.
 * The row regex matches rows containing {@code urn:...#version} immediately
 * followed by a NUL (U+0000) or SOH (U+0001) character. The filter
 * is negated, so matching entries are the ones that get dropped.
 * @return the {@link RegExFilter} {@link IteratorSetting}.
 */
public static IteratorSetting getVersionRegExFilterSetting() {
    final IteratorSetting regex = new IteratorSetting(30, "version_regex", RegExFilter.class);
    // Inside a character class '|' is a literal pipe, not alternation, so it
    // must not be listed: otherwise rows containing "#version|" are filtered too.
    RegExFilter.setRegexs(regex, "(.*)urn:(.*)#version[\u0000\u0001](.*)", null, null, null, false);
    Filter.setNegate(regex, true);
    return regex;
}
/**
 * Attaches every {@link IteratorSetting} in {@link #COMMON_REG_EX_FILTER_SETTINGS}
 * to the given {@link Scanner} so that the metadata keys those filters
 * target are left out of the scan results.
 * @param scanner the {@link Scanner} to add the regex filter {@link IteratorSetting}s to.
 */
public static void addCommonScannerIteratorsTo(final Scanner scanner) {
    final Iterator<IteratorSetting> commonSettings = COMMON_REG_EX_FILTER_SETTINGS.iterator();
    while (commonSettings.hasNext()) {
        scanner.addScanIterator(commonSettings.next());
    }
}
/**
 * Creates a {@link Scanner} over the named table using the connection
 * details in the {@link Configuration}. The common iterator settings are
 * always applied, so internal metadata keys are filtered out of the results.
 * @param tableName the name of the table to scan.
 * @param config the {@link Configuration}.
 * @return the {@link Scanner} for the table.
 * @throws IOException if the scanner could not be created.
 */
public static Scanner getScanner(final String tableName, final Configuration config) throws IOException {
    final boolean addCommonIterators = true;
    return getScanner(tableName, config, addCommonIterators);
}
/**
 * Creates a {@link Scanner} over the named table using the connection
 * details in the {@link Configuration}. A {@link MockInstance} is used when
 * the configuration asks for one; otherwise a {@link ZooKeeperInstance} is
 * created from the configured instance name and ZooKeeper hosts.
 * @param tableName the name of the table to scan.
 * @param config the {@link Configuration}.
 * @param shouldAddCommonIterators {@code true} to add the common iterators to the table scanner.
 * {@code false} otherwise.
 * @return the {@link Scanner} for the table. The caller is responsible for closing it.
 * @throws IOException if connecting fails or the table does not exist; the
 * underlying Accumulo exception is attached as the cause.
 */
public static Scanner getScanner(final String tableName, final Configuration config, final boolean shouldAddCommonIterators) throws IOException {
    try {
        final String instanceName = config.get(ConfigUtils.CLOUDBASE_INSTANCE);
        final Instance instance;
        if (ConfigUtils.useMockInstance(config)) {
            instance = new MockInstance(instanceName);
        } else {
            // ZooKeeper hosts are only needed for a real instance.
            final String zooKeepers = config.get(ConfigUtils.CLOUDBASE_ZOOKEEPERS);
            instance = new ZooKeeperInstance(new ClientConfiguration().withInstance(instanceName).withZkHosts(zooKeepers));
        }
        final String username = ConfigUtils.getUsername(config);
        final String password = ConfigUtils.getPassword(config);
        final Connector connector = instance.getConnector(username, new PasswordToken(password));
        final Authorizations auths = ConfigUtils.getAuthorizations(config);
        final Scanner scanner = connector.createScanner(tableName, auths);
        if (shouldAddCommonIterators) {
            AccumuloRyaUtils.addCommonScannerIteratorsTo(scanner);
        }
        return scanner;
    } catch (AccumuloException | AccumuloSecurityException | TableNotFoundException e) {
        // Keep the table name in both the log entry and the rethrown exception
        // so the failure can be diagnosed from either one.
        final String message = "Error connecting to " + tableName;
        log.error(message, e);
        throw new IOException(message, e);
    }
}
/**
 * Logs the contents of the named table, with the common iterator settings
 * (which hide internal metadata keys) applied in addition to any supplied ones.
 * @param tableName the name of the table to print.
 * @param config the {@link AccumuloRdfConfiguration}.
 * @param settings the additional {@link IteratorSetting}s to add besides the common ones.
 * @throws IOException if the table scanner could not be created.
 */
public static void printTable(final String tableName, final AccumuloRdfConfiguration config, final IteratorSetting... settings) throws IOException {
    final boolean addCommonIterators = true;
    printTable(tableName, config, addCommonIterators, settings);
}
/**
 * Logs the contents of the named table at INFO level, one line per entry,
 * framed by header and footer lines. The scanner opened for the table is
 * closed before this method returns, including when an error occurs.
 * @param tableName the name of the table to print.
 * @param config the {@link AccumuloRdfConfiguration}.
 * @param shouldAddCommonIterators {@code true} to add the common iterators to the table scanner.
 * {@code false} otherwise.
 * @param settings the additional {@link IteratorSetting}s to add besides the common ones.
 * @throws IOException if the table scanner could not be created.
 */
public static void printTable(final String tableName, final AccumuloRdfConfiguration config, final boolean shouldAddCommonIterators, final IteratorSetting... settings) throws IOException {
    final Scanner scanner = AccumuloRyaUtils.getScanner(tableName, config, shouldAddCommonIterators);
    try {
        for (final IteratorSetting setting : settings) {
            scanner.addScanIterator(setting);
        }

        final String instance = config.get(MRUtils.AC_INSTANCE_PROP);
        log.info("==================");
        log.info("TABLE: " + tableName + " INSTANCE: " + instance);
        log.info("------------------");
        for (final Entry<Key, Value> entry : scanner) {
            final String keyString = getFormattedKeyString(entry.getKey());
            log.info(keyString + " - " + entry.getValue());
        }
        log.info("==================");
    } finally {
        // Release the scanner's underlying resources even if logging or scanning fails.
        scanner.close();
    }
}
/**
 * Formats a {@link Key} for log output as
 * {@code row family:qualifier [visibility] timestamp}, where the timestamp
 * is rendered as a {@link Date}. Each component is truncated to at most 256
 * printable bytes.
 * @param key the {@link Key} to format.
 * @return the printable representation of the key.
 */
private static String getFormattedKeyString(final Key key) {
    final int maxDataToPrint = 256;
    // Use the key's byte sequences rather than Text.getBytes(): the array
    // backing a Text may be longer than its valid data, so its length is not
    // a safe bound. toArray() yields exactly the component's bytes.
    final byte[] row = key.getRowData().toArray();
    final byte[] colFamily = key.getColumnFamilyData().toArray();
    final byte[] colQualifier = key.getColumnQualifierData().toArray();
    final byte[] colVisibility = key.getColumnVisibilityData().toArray();

    final StringBuilder sb = new StringBuilder();
    Key.appendPrintableString(row, 0, row.length, maxDataToPrint, sb);
    sb.append(" ");
    Key.appendPrintableString(colFamily, 0, colFamily.length, maxDataToPrint, sb);
    sb.append(":");
    Key.appendPrintableString(colQualifier, 0, colQualifier.length, maxDataToPrint, sb);
    sb.append(" [");
    Key.appendPrintableString(colVisibility, 0, colVisibility.length, maxDataToPrint, sb);
    sb.append("]");
    sb.append(" ");
    sb.append(new Date(key.getTimestamp()));
    return sb.toString();
}
/**
 * Logs the contents of the named table as a fixed-width text table, with
 * the common iterator settings (which hide internal metadata keys) applied
 * in addition to any supplied ones.
 * @param tableName the name of the table to print.
 * @param config the {@link Configuration}.
 * @param settings the additional {@link IteratorSetting}s to add besides the common ones.
 * @throws AccumuloException declared by the delegated overload.
 * @throws AccumuloSecurityException declared by the delegated overload.
 * @throws TableNotFoundException declared by the delegated overload.
 * @throws IOException if the table scanner could not be created.
 */
public static void printTablePretty(final String tableName, final Configuration config, final IteratorSetting... settings) throws AccumuloException, AccumuloSecurityException, TableNotFoundException, IOException {
    final boolean addCommonIterators = true;
    printTablePretty(tableName, config, addCommonIterators, settings);
}
/**
 * Logs the contents of the named table at INFO level as a fixed-width text
 * table with columns for row, column visibility, timestamp, column family,
 * column qualifier and value. The scanner opened for the table is closed
 * before this method returns, including when an error occurs.
 * @param tableName the name of the table to print.
 * @param config the {@link Configuration}.
 * @param shouldAddCommonIterators {@code true} to add the common iterators to the table scanner.
 * {@code false} otherwise.
 * @param settings the additional {@link IteratorSetting}s to add besides the common ones.
 * @throws AccumuloException part of the declared signature; scanner creation
 * failures are reported as {@link IOException}.
 * @throws AccumuloSecurityException part of the declared signature.
 * @throws TableNotFoundException part of the declared signature.
 * @throws IOException if the table scanner could not be created.
 */
public static void printTablePretty(final String tableName, final Configuration config, final boolean shouldAddCommonIterators, final IteratorSetting... settings) throws AccumuloException, AccumuloSecurityException, TableNotFoundException, IOException {
    final Scanner scanner = AccumuloRyaUtils.getScanner(tableName, config, shouldAddCommonIterators);
    try {
        for (final IteratorSetting setting : settings) {
            scanner.addScanIterator(setting);
        }

        final String format = "| %-64s | %-24s | %-28s | %-20s | %-20s | %-10s |";
        // Width of one formatted line when every cell fits inside its column.
        final int totalFormatLength = String.format(format, 1, 2, 3, 4, 5, 6).length();
        final String instance = config.get(MRUtils.AC_INSTANCE_PROP);
        log.info(StringUtils.rightPad("==================", totalFormatLength, "="));
        log.info(StringUtils.rightPad("| TABLE: " + tableName + " INSTANCE: " + instance, totalFormatLength - 1) + "|");
        log.info(StringUtils.rightPad("------------------", totalFormatLength, "-"));
        log.info(String.format(format, "--Row--", "--ColumnVisibility--", "--Timestamp--", "--ColumnFamily--", "--ColumnQualifier--", "--Value--"));
        log.info(StringUtils.rightPad("|-----------------", totalFormatLength - 1, "-") + "|");
        for (final Entry<Key, Value> entry : scanner) {
            final Key k = entry.getKey();
            final byte[] rowBytes = k.getRowData().toArray();
            final String rowString = Key.appendPrintableString(rowBytes, 0, rowBytes.length, Constants.MAX_DATA_TO_PRINT, new StringBuilder()).toString();
            log.info(String.format(format, rowString, k.getColumnVisibility(), new Date(k.getTimestamp()), k.getColumnFamily(), k.getColumnQualifier(), entry.getValue()));
        }
        log.info(StringUtils.rightPad("==================", totalFormatLength, "="));
    } finally {
        // Release the scanner's underlying resources even if logging or scanning fails.
        scanner.close();
    }
}
/**
 * Combines a user's current authorizations with the supplied ones.
 * Each supplied authorization is decoded as UTF-8 and handed to the
 * {@code String...} overload of this method.
 * @param user the name of the user to add authorizations for.
 * @param secOps the {@link SecurityOperations}.
 * @param auths the {@link Authorizations} to add.
 * @return the combined {@link Authorizations}.
 * @throws AccumuloException
 * @throws AccumuloSecurityException
 */
public static Authorizations addUserAuths(final String user, final SecurityOperations secOps, final Authorizations auths) throws AccumuloException, AccumuloSecurityException {
    final List<byte[]> rawAuths = auths.getAuthorizations();
    final String[] authStrings = new String[rawAuths.size()];
    for (int i = 0; i < authStrings.length; i++) {
        authStrings[i] = new String(rawAuths.get(i), StandardCharsets.UTF_8);
    }
    return addUserAuths(user, secOps, authStrings);
}
/**
 * Combines a user's current authorizations with the supplied ones.
 * The user's existing authorizations are read through {@code secOps} and the
 * new ones are appended (encoded as UTF-8). This method only builds and
 * returns the combined {@link Authorizations}; it does not itself change
 * the user's authorizations through {@code secOps}.
 * @param user the name of the user to add authorizations for.
 * @param secOps the {@link SecurityOperations}.
 * @param auths the list of authorizations to add.
 * @return the combined {@link Authorizations}.
 * @throws AccumuloException
 * @throws AccumuloSecurityException
 */
public static Authorizations addUserAuths(final String user, final SecurityOperations secOps, final String... auths) throws AccumuloException, AccumuloSecurityException {
    final List<byte[]> combinedAuths = new ArrayList<>(secOps.getUserAuthorizations(user).getAuthorizations());
    for (final String addedAuth : auths) {
        combinedAuths.add(addedAuth.getBytes(StandardCharsets.UTF_8));
    }
    return new Authorizations(combinedAuths);
}
/**
 * Removes the specified authorizations from the user and stores the
 * remaining ones through {@code secOps}.
 * <p>
 * The remaining authorizations are passed to {@link Authorizations} as
 * individual entries. Passing one comma-joined string would create a single
 * authorization containing commas. Removing every authorization leaves the
 * user with an empty set.
 * @param userName the name of the user to change authorizations for.
 * @param secOps the {@link SecurityOperations} to change.
 * @param authsToRemove the comma-separated string of authorizations to remove;
 * surrounding whitespace of each entry is ignored.
 * @throws AccumuloSecurityException
 * @throws AccumuloException
 */
public static void removeUserAuths(final String userName, final SecurityOperations secOps, final String authsToRemove) throws AccumuloException, AccumuloSecurityException {
    final List<String> authsToRemoveList = new ArrayList<>();
    for (final String authToRemove : convertAuthStringToList(authsToRemove)) {
        authsToRemoveList.add(authToRemove.trim());
    }

    // Read the current authorizations entry by entry instead of parsing their
    // toString() form, mirroring how addUserAuths decodes them.
    final Authorizations currentUserAuths = secOps.getUserAuthorizations(userName);
    final List<String> remainingAuths = new ArrayList<>();
    for (final byte[] authBytes : currentUserAuths.getAuthorizations()) {
        final String auth = new String(authBytes, StandardCharsets.UTF_8);
        if (!authsToRemoveList.contains(auth)) {
            remainingAuths.add(auth);
        }
    }

    final Authorizations newAuths = new Authorizations(remainingAuths.toArray(new String[0]));
    secOps.changeUserAuthorizations(userName, newAuths);
}
/**
 * Convert the comma-separated string of authorizations into a list of authorizations.
 * Whitespace around each entry is trimmed and empty entries are skipped, so
 * {@code null}, {@code ""} and {@code " , "} all produce an empty list.
 * @param authString the comma-separated string of authorizations; may be {@code null}.
 * @return a new, modifiable {@link List} of authorization strings (never {@code null}).
 */
public static List<String> convertAuthStringToList(final String authString) {
    final List<String> authList = new ArrayList<>();
    if (authString == null) {
        return authList;
    }
    for (final String token : authString.split(",")) {
        final String auth = token.trim();
        // "".split(",") yields a single empty token; an empty authorization is never valid.
        if (!auth.isEmpty()) {
            authList.add(auth);
        }
    }
    return authList;
}
/**
 * Obtains a {@link Connector} for the specified config.
 * Failures are logged rather than propagated.
 * @param accumuloRdfConfiguration the {@link AccumuloRdfConfiguration}.
 * @return the {@link Connector}, or {@code null} if it could not be created.
 */
public static Connector setupConnector(final AccumuloRdfConfiguration accumuloRdfConfiguration) {
    try {
        return ConfigUtils.getConnector(accumuloRdfConfiguration);
    } catch (AccumuloException | AccumuloSecurityException e) {
        log.error("Error creating connector", e);
        return null;
    }
}
/**
 * Creates and initializes an {@link AccumuloRyaDAO} from the specified
 * config, building the {@link Connector} from that same config first.
 * @param accumuloRdfConfiguration the {@link AccumuloRdfConfiguration}.
 * @return the {@link AccumuloRyaDAO}.
 */
public static AccumuloRyaDAO setupDao(final AccumuloRdfConfiguration accumuloRdfConfiguration) {
    return setupDao(setupConnector(accumuloRdfConfiguration), accumuloRdfConfiguration);
}
/**
 * Creates an {@link AccumuloRyaDAO}, assigns it the given connector and
 * config, and initializes it. An initialization failure is logged rather
 * than propagated, and the DAO is returned regardless.
 * @param connector the {@link Connector}.
 * @param accumuloRdfConfiguration the {@link AccumuloRdfConfiguration}.
 * @return the {@link AccumuloRyaDAO}.
 */
public static AccumuloRyaDAO setupDao(final Connector connector, final AccumuloRdfConfiguration accumuloRdfConfiguration) {
    final AccumuloRyaDAO dao = new AccumuloRyaDAO();
    dao.setConnector(connector);
    dao.setConf(accumuloRdfConfiguration);

    try {
        dao.init();
    } catch (final RyaDAOException initFailure) {
        log.error("Error initializing DAO", initFailure);
    }

    return dao;
}
}