package org.apache.rya.accumulo.mr;
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
import java.io.IOException;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.ClientConfiguration;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.ZooKeeperInstance;
import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
import org.apache.accumulo.core.client.mock.MockInstance;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.core.iterators.user.AgeOffFilter;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.rya.accumulo.AccumuloRdfConstants;
import org.apache.rya.api.RdfCloudTripleStoreConfiguration;
import org.apache.rya.api.RdfCloudTripleStoreConstants;
import org.apache.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT;
import org.apache.rya.api.RdfCloudTripleStoreUtils;
import org.apache.rya.indexing.accumulo.ConfigUtils;
import org.eclipse.rdf4j.rio.RDFFormat;
import com.google.common.base.Preconditions;
/**
 * Base class for MapReduce tools that interact with Accumulo-backed Rya. Holds
 * a {@link Configuration} to keep track of connection parameters.
 * <p>
 * Can be configured to read input from Rya, either as
 * {@link RyaStatementWritable}s or as Accumulo rows, or to read statements from
 * RDF files.
 * <p>
 * Can be configured to send output either by inserting RyaStatementWritables
 * into a Rya instance, or by writing arbitrary
 * {@link org.apache.accumulo.core.data.Mutation}s directly to Accumulo tables.
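 * <p>
 * A minimal usage sketch; the subclass name and job wiring below are
 * illustrative, not part of this API:
 * <pre>{@code
 * public class ExampleRyaTool extends AbstractAccumuloMRTool {
 *     public int run(String[] args) throws Exception {
 *         init(); // validate connection parameters from the Configuration
 *         Job job = Job.getInstance(getConf(), "example");
 *         setupRyaInput(job);
 *         setupAccumuloOutput(job, "example_output");
 *         // set mapper/reducer classes here
 *         return job.waitForCompletion(true) ? 0 : 1;
 *     }
 * }
 * }</pre>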
 */
public abstract class AbstractAccumuloMRTool implements Tool {
    static int DEFAULT_IO_SORT_MB = 256;

    protected Configuration conf;
    // Connection parameters
    protected String zk;
    protected String instance;
    protected String userName;
    protected String pwd;
    protected Authorizations authorizations;
    protected boolean mock = false;
    protected boolean hdfsInput = false;
    protected String ttl;
    protected String tablePrefix;
    protected TABLE_LAYOUT rdfTableLayout;

    /**
     * Gets the Configuration containing any relevant options.
     * @return This Tool's Configuration object.
     */
    @Override
    public Configuration getConf() {
        return conf;
    }

    /**
     * Sets this Tool's Configuration.
     */
    @Override
    public void setConf(Configuration configuration) {
        this.conf = configuration;
    }

    /**
     * Initializes configuration parameters, checking that required parameters
     * are found and ensuring that options corresponding to multiple property
     * names are set consistently. Requires at least that the username,
     * password, and instance name are all configured. Zookeeper hosts must be
     * configured if not using a mock instance. The table prefix, if not
     * provided, will be set to
     * {@link RdfCloudTripleStoreConstants#TBL_PRFX_DEF}. Should be called
     * before configuring input/output. See {@link MRUtils} for configuration
     * properties.
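     * <p>
     * A minimal sketch of the properties this method expects, using the keys
     * defined in {@link MRUtils} (all values are placeholders):
     * <pre>{@code
     * Configuration conf = new Configuration();
     * conf.set(MRUtils.AC_INSTANCE_PROP, "accumulo");
     * conf.set(MRUtils.AC_USERNAME_PROP, "root");
     * conf.set(MRUtils.AC_PWD_PROP, "secret");
     * conf.set(MRUtils.AC_ZK_PROP, "zoo1:2181,zoo2:2181");
     * }</pre>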
     */
    protected void init() {
        // Load configuration parameters
        zk = MRUtils.getACZK(conf);
        instance = MRUtils.getACInstance(conf);
        userName = MRUtils.getACUserName(conf);
        pwd = MRUtils.getACPwd(conf);
        mock = MRUtils.getACMock(conf, false);
        ttl = MRUtils.getACTtl(conf);
        tablePrefix = MRUtils.getTablePrefix(conf);
        rdfTableLayout = MRUtils.getTableLayout(conf, TABLE_LAYOUT.OSP);
        hdfsInput = conf.getBoolean(MRUtils.AC_HDFS_INPUT_PROP, false);
        // Set authorizations if specified
        String authString = conf.get(MRUtils.AC_AUTH_PROP);
        if (authString != null && !authString.isEmpty()) {
            authorizations = new Authorizations(authString.split(","));
            conf.set(ConfigUtils.CLOUDBASE_AUTHS, authString); // for consistency
        } else {
            authorizations = AccumuloRdfConstants.ALL_AUTHORIZATIONS;
        }
        // Set table prefix to the default if not set
        if (tablePrefix == null) {
            tablePrefix = RdfCloudTripleStoreConstants.TBL_PRFX_DEF;
            MRUtils.setTablePrefix(conf, tablePrefix);
        }
        // Check for required configuration parameters
        Preconditions.checkNotNull(instance, "Accumulo instance name [" + MRUtils.AC_INSTANCE_PROP + "] not set.");
        Preconditions.checkNotNull(userName, "Accumulo username [" + MRUtils.AC_USERNAME_PROP + "] not set.");
        Preconditions.checkNotNull(pwd, "Accumulo password [" + MRUtils.AC_PWD_PROP + "] not set.");
        Preconditions.checkNotNull(tablePrefix, "Table prefix [" + MRUtils.TABLE_PREFIX_PROPERTY + "] not set.");
        RdfCloudTripleStoreConstants.prefixTables(tablePrefix);
        // If connecting to a real Accumulo instance, set additional parameters and require zookeepers
        if (!mock) {
            Preconditions.checkNotNull(zk, "Zookeeper hosts not set (" + MRUtils.AC_ZK_PROP + ")");
            conf.setBoolean("mapred.map.tasks.speculative.execution", false);
            conf.setBoolean("mapred.reduce.tasks.speculative.execution", false);
            if (conf.get(MRUtils.HADOOP_IO_SORT_MB) == null) {
                conf.setInt(MRUtils.HADOOP_IO_SORT_MB, DEFAULT_IO_SORT_MB);
            }
            conf.set(ConfigUtils.CLOUDBASE_ZOOKEEPERS, zk); // for consistency
        }
        // Ensure consistency between alternative configuration properties
        conf.set(ConfigUtils.CLOUDBASE_INSTANCE, instance);
        conf.set(ConfigUtils.CLOUDBASE_USER, userName);
        conf.set(ConfigUtils.CLOUDBASE_PASSWORD, pwd);
        conf.setBoolean(ConfigUtils.USE_MOCK_INSTANCE, mock);
        conf.set(RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX, tablePrefix);
    }

    /**
     * Sets up Accumulo input for a job: the job receives
     * ({@link org.apache.accumulo.core.data.Key},
     * {@link org.apache.accumulo.core.data.Value}) pairs from the table
     * specified by the configuration (using
     * {@link MRUtils#TABLE_PREFIX_PROPERTY} and
     * {@link MRUtils#TABLE_LAYOUT_PROP}).
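     * <p>
     * A mapper consuming this input might follow this sketch (the class name
     * and output types are illustrative):
     * <pre>{@code
     * public static class RowMapper extends Mapper<Key, Value, Text, Text> {
     *     protected void map(Key key, Value value, Context context)
     *             throws IOException, InterruptedException {
     *         // each call receives one entry from the configured triples table
     *     }
     * }
     * }</pre>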
     * @param job MapReduce Job to configure
     * @throws AccumuloSecurityException if connecting to Accumulo with the
     *         given username and password fails.
     */
    protected void setupAccumuloInput(Job job) throws AccumuloSecurityException {
        // Set up Accumulo input
        if (!hdfsInput) {
            job.setInputFormatClass(AccumuloInputFormat.class);
        } else {
            job.setInputFormatClass(AccumuloHDFSFileInputFormat.class);
        }
        AccumuloInputFormat.setConnectorInfo(job, userName, new PasswordToken(pwd));
        String tableName = RdfCloudTripleStoreUtils.layoutPrefixToTable(rdfTableLayout, tablePrefix);
        AccumuloInputFormat.setInputTableName(job, tableName);
        AccumuloInputFormat.setScanAuthorizations(job, authorizations);
        if (mock) {
            AccumuloInputFormat.setMockInstance(job, instance);
        } else {
            ClientConfiguration clientConfig = ClientConfiguration.loadDefault()
                    .withInstance(instance).withZkHosts(zk);
            AccumuloInputFormat.setZooKeeperInstance(job, clientConfig);
        }
        if (ttl != null) {
            // Add an age-off filter so entries older than the configured TTL are skipped
            IteratorSetting setting = new IteratorSetting(1, "fi", AgeOffFilter.class.getName());
            AgeOffFilter.setTTL(setting, Long.valueOf(ttl));
            AccumuloInputFormat.addIterator(job, setting);
        }
    }

    /**
     * Sets up Rya input for a job: the job receives
     * ({@link org.apache.hadoop.io.LongWritable}, {@link RyaStatementWritable})
     * pairs from a Rya instance. Uses the same configuration properties to
     * connect as direct Accumulo input, but returns statement data instead of
     * row data.
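     * <p>
     * A minimal mapper sketch for this input (the class name and output types
     * are illustrative):
     * <pre>{@code
     * public static class StatementMapper
     *         extends Mapper<LongWritable, RyaStatementWritable, Text, Text> {
     *     protected void map(LongWritable key, RyaStatementWritable value, Context context)
     *             throws IOException, InterruptedException {
     *         RyaStatement statement = value.getRyaStatement();
     *         // process one statement per call
     *     }
     * }
     * }</pre>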
     * @param job Job to configure
     * @throws AccumuloSecurityException if connecting to Accumulo with the
     *         given username and password fails.
     */
    protected void setupRyaInput(Job job) throws AccumuloSecurityException {
        setupAccumuloInput(job);
        job.setInputFormatClass(RyaInputFormat.class);
    }

    /**
     * Sets up RDF file input for a job: the job receives
     * ({@link org.apache.hadoop.io.LongWritable}, {@link RyaStatementWritable})
     * pairs from RDF file(s) found at the specified path.
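     * <p>
     * For example (the path and format are placeholders):
     * <pre>{@code
     * setupFileInputs(job, "/input/data.nt", RDFFormat.NTRIPLES);
     * }</pre>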
     * @param job Job to configure
     * @param commaSeparatedPaths a comma-separated list of files or directories
     * @param defaultFormat Default RDF serialization format; can be
     *        overridden by {@link MRUtils#FORMAT_PROP}
     * @throws IOException if there's an error interacting with the
     *         {@link org.apache.hadoop.fs.FileSystem}.
     */
    protected void setupFileInputs(Job job, String commaSeparatedPaths, RDFFormat defaultFormat) throws IOException {
        RDFFormat format = MRUtils.getRDFFormat(conf);
        if (format == null) {
            format = defaultFormat;
        }
        RdfFileInputFormat.addInputPaths(job, commaSeparatedPaths);
        RdfFileInputFormat.setRDFFormat(job, format);
        job.setInputFormatClass(RdfFileInputFormat.class);
    }

    /**
     * Sets up Accumulo output for a job: allows the job to write
     * ({@link org.apache.hadoop.io.Text},
     * {@link org.apache.accumulo.core.data.Mutation}) pairs, where each
     * Mutation will be written to the table named by the Text key.
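     * <p>
     * A task using this output might emit rows as in this sketch (row,
     * column, and table names are placeholders):
     * <pre>{@code
     * Mutation mutation = new Mutation("rowId");
     * mutation.put("columnFamily", "columnQualifier", new Value("data".getBytes()));
     * context.write(new Text("example_table"), mutation);
     * }</pre>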
     * @param job Job to configure
     * @param outputTable Default table to send output to
     * @throws AccumuloSecurityException if connecting to Accumulo with the
     *         given username and password fails
     */
    protected void setupAccumuloOutput(Job job, String outputTable) throws AccumuloSecurityException {
        AccumuloOutputFormat.setConnectorInfo(job, userName, new PasswordToken(pwd));
        AccumuloOutputFormat.setCreateTables(job, true);
        AccumuloOutputFormat.setDefaultTableName(job, outputTable);
        if (mock) {
            AccumuloOutputFormat.setMockInstance(job, instance);
        } else {
            ClientConfiguration clientConfig = ClientConfiguration.loadDefault()
                    .withInstance(instance).withZkHosts(zk);
            AccumuloOutputFormat.setZooKeeperInstance(job, clientConfig);
        }
        job.setOutputFormatClass(AccumuloOutputFormat.class);
    }

    /**
     * Sets up Rya output for a job: allows the job to write
     * {@link RyaStatementWritable} data, which will in turn be inserted into
     * the configured Rya instance. To perform secondary indexing, use the
     * configuration variables in {@link ConfigUtils}.
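     * <p>
     * For example, free text indexing can be requested before calling this
     * method (assuming {@code ConfigUtils.USE_FREETEXT} names the flag read
     * by {@code ConfigUtils.getUseFreeText} and the indexer is available):
     * <pre>{@code
     * conf.setBoolean(ConfigUtils.USE_FREETEXT, true);
     * }</pre>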
     * @param job Job to configure
     * @throws AccumuloSecurityException if connecting to Accumulo with the
     *         given username and password fails
     */
    protected void setupRyaOutput(Job job) throws AccumuloSecurityException {
        job.setOutputFormatClass(RyaOutputFormat.class);
        job.setOutputValueClass(RyaStatementWritable.class);
        // Specify default visibility of output rows, if given
        RyaOutputFormat.setDefaultVisibility(job, conf.get(MRUtils.AC_CV_PROP));
        // Specify named graph, if given
        RyaOutputFormat.setDefaultContext(job, conf.get(MRUtils.NAMED_GRAPH_PROP));
        // Set the output prefix
        RyaOutputFormat.setTablePrefix(job, tablePrefix);
        // Determine which indexers to use based on the config
        RyaOutputFormat.setFreeTextEnabled(job, ConfigUtils.getUseFreeText(conf));
        RyaOutputFormat.setTemporalEnabled(job, ConfigUtils.getUseTemporal(conf));
        RyaOutputFormat.setEntityEnabled(job, ConfigUtils.getUseEntity(conf));
        // Configure the Accumulo connection
        AccumuloOutputFormat.setConnectorInfo(job, userName, new PasswordToken(pwd));
        AccumuloOutputFormat.setCreateTables(job, true);
        AccumuloOutputFormat.setDefaultTableName(job, tablePrefix + RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX);
        if (mock) {
            RyaOutputFormat.setMockInstance(job, instance);
        } else {
            ClientConfiguration clientConfig = ClientConfiguration.loadDefault()
                    .withInstance(instance).withZkHosts(zk);
            AccumuloOutputFormat.setZooKeeperInstance(job, clientConfig);
        }
    }

    /**
     * Connects to Accumulo, using the stored connection parameters.
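     * <p>
     * For example, a subclass might verify that the SPO table exists before
     * launching a job:
     * <pre>{@code
     * Connector connector = getConnector();
     * boolean exists = connector.tableOperations().exists(tablePrefix
     *         + RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX);
     * }</pre>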
     * @return A Connector to an Accumulo instance, which could be a mock
     *         instance.
     * @throws AccumuloException if connecting to Accumulo fails.
     * @throws AccumuloSecurityException if authenticating with Accumulo fails.
     */
    protected Connector getConnector() throws AccumuloSecurityException, AccumuloException {
        // The instance may be a real ZooKeeper-backed instance or a mock one
        Instance accumuloInstance;
        if (mock) {
            accumuloInstance = new MockInstance(instance);
        } else {
            accumuloInstance = new ZooKeeperInstance(instance, zk);
        }
        return accumuloInstance.getConnector(userName, new PasswordToken(pwd));
    }
}