blob: f826b7149e70fefdeb69b37e88872feab9ef5b87 [file] [log] [blame]
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.client.mapreduce.InputFormatBase;
import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT;
import org.apache.rya.rdftriplestore.utils.RdfFormatUtils;
import org.eclipse.rdf4j.model.ValueFactory;
import org.eclipse.rdf4j.model.impl.SimpleValueFactory;
import org.eclipse.rdf4j.rio.RDFFormat;
* Contains constants and static methods for interacting with a
* {@link Configuration} and handling options likely to be relevant to Rya
* MapReduce jobs. Defines constant property names associated with Accumulo and
* Rya options, and some convenience methods to get and set these properties
* with respect to a given Configuration.
public class MRUtils {
* Property name for the name of a MapReduce job.
public static final String JOB_NAME_PROP = "";
* Property name for the Accumulo username.
public static final String AC_USERNAME_PROP = "ac.username";
* Property name for the Accumulo password.
public static final String AC_PWD_PROP = "ac.pwd";
* Property name for the list of zookeepers.
public static final String AC_ZK_PROP = "ac.zk";
* Property name for the Accumulo instance name.
public static final String AC_INSTANCE_PROP = "ac.instance";
* Property name for whether to run against a mock Accumulo instance.
public static final String AC_MOCK_PROP = "ac.mock";
* Property name for TTL; allows using an age-off filter on Accumulo input.
public static final String AC_TTL_PROP = "ac.ttl";
* Property name for scan authorizations when reading data from Accumulo.
public static final String AC_AUTH_PROP = "ac.auth";
* Property name for default visibility when writing data to Accumulo.
public static final String AC_CV_PROP = "";
* Property name for whether to read Accumulo data directly from HDFS
* as opposed to through Accumulo itself.
public static final String AC_HDFS_INPUT_PROP = "ac.hdfsinput";
* Property name for the table layout to use when reading data from Rya.
public static final String TABLE_LAYOUT_PROP = "rdf.tablelayout";
* Property name for the Rya table prefix, identifying the Rya
* instance to work with.
public static final String TABLE_PREFIX_PROPERTY = "rdf.tablePrefix";
* Property name for the RDF serialization format to use, when using RDF
* files.
public static final String FORMAT_PROP = "rdf.format";
* Property name for a file input path, if using file input.
public static final String INPUT_PATH = "input";
* Property name for specifying a default named graph to use when writing
* new statements.
public static final String NAMED_GRAPH_PROP = "rdf.graph";
public static final String AC_TABLE_PROP = "ac.table";
public static final String HADOOP_IO_SORT_MB = "io.sort.mb";
public static final ValueFactory VF = SimpleValueFactory.getInstance();
* Gets the TTL from a given Configuration.
* @param conf Configuration containing MapReduce tool options.
* @return The TTL that will be applied as an age-off filter for Accumulo
* input data, or null if not set.
public static String getACTtl(Configuration conf) {
return conf.get(AC_TTL_PROP);
* Gets the username from a given Configuration.
* @param conf Configuration containing MapReduce tool options.
* @return The configured Accumulo username, or null if not set.
public static String getACUserName(Configuration conf) {
return conf.get(AC_USERNAME_PROP);
* Gets the password from a given Configuration.
* @param conf Configuration containing MapReduce tool options.
* @return The configured Accumulo password, or null if not set.
public static String getACPwd(Configuration conf) {
return conf.get(AC_PWD_PROP);
* Gets the zookeepers from a given Configuration.
* @param conf Configuration containing MapReduce tool options.
* @return The configured zookeeper list, or null if not set.
public static String getACZK(Configuration conf) {
return conf.get(AC_ZK_PROP);
* Gets the instance name from a given Configuration.
* @param conf Configuration containing MapReduce tool options.
* @return The configured Accumulo instance name, or null if not set.
public static String getACInstance(Configuration conf) {
return conf.get(AC_INSTANCE_PROP);
* Gets whether to use a mock instance from a given Configuration.
* @param conf Configuration containing MapReduce tool options.
* @param defaultValue Default choice if the mock property hasn't been
* explicitly set in the Configuration.
* @return True if a mock instance should be used, false to connect to
* a running Accumulo.
public static boolean getACMock(Configuration conf, boolean defaultValue) {
return conf.getBoolean(AC_MOCK_PROP, defaultValue);
* Gets the table prefix from a given Configuration.
* @param conf Configuration containing MapReduce tool options.
* @return The configured Rya table prefix, or null if not set.
public static String getTablePrefix(Configuration conf) {
return conf.get(TABLE_PREFIX_PROPERTY);
* Gets the table layout that determines which Rya table to scan for input.
* @param conf Configuration containing MapReduce tool options.
* @param defaultLayout The layout to use if the Configuration doesn't
* specify any layout.
* @return The configured layout to use for reading statements from Rya.
public static TABLE_LAYOUT getTableLayout(Configuration conf, TABLE_LAYOUT defaultLayout) {
return TABLE_LAYOUT.valueOf(conf.get(TABLE_LAYOUT_PROP, defaultLayout.toString()));
* Gets the RDF serialization format to use for parsing RDF files.
* @param conf Configuration containing MapReduce tool options.
* @return The configured RDFFormat, or null if not set.
public static RDFFormat getRDFFormat(Configuration conf) {
return RdfFormatUtils.getRdfFormatFromName(conf.get(FORMAT_PROP));
* Sets the username in the given Configuration.
* @param conf Configuration containing MapReduce tool options.
* @param str Accumulo username, used for input and/or output.
public static void setACUserName(Configuration conf, String str) {
conf.set(AC_USERNAME_PROP, str);
* Sets the password in the given Configuration.
* @param conf Configuration containing MapReduce tool options.
* @param str Accumulo password string, used for input and/or output.
public static void setACPwd(Configuration conf, String str) {
conf.set(AC_PWD_PROP, str);
* Sets the zookeepers in the given Configuration.
* @param conf Configuration containing MapReduce tool options.
* @param str List of zookeepers to use to connect to Accumulo.
public static void setACZK(Configuration conf, String str) {
conf.set(AC_ZK_PROP, str);
* Sets the instance in the given Configuration.
* @param conf Configuration containing MapReduce tool options.
* @param str Accumulo instance name, for input and/or output.
public static void setACInstance(Configuration conf, String str) {
conf.set(AC_INSTANCE_PROP, str);
* Sets the TTL in the given Configuration.
* @param conf Configuration containing MapReduce tool options.
* @param str TTL for Accumulo data. Rows older than this won't be scanned
* as input.
public static void setACTtl(Configuration conf, String str) {
conf.set(AC_TTL_PROP, str);
* Sets whether to connect to a mock Accumulo instance.
* @param conf Configuration containing MapReduce tool options.
* @param mock true to use a mock instance, false to attempt to connect
* to a running Accumulo instance.
public static void setACMock(Configuration conf, boolean mock) {
conf.setBoolean(AC_MOCK_PROP, mock);
* Sets the Rya table prefix in the given Configuration.
* @param conf Configuration containing MapReduce tool options.
* @param prefix Prefix of the Rya tables to use for input and/or output.
public static void setTablePrefix(Configuration conf, String prefix) {
conf.set(TABLE_PREFIX_PROPERTY, prefix);
* Sets the table layout in the given Configuration.
* @param conf Configuration containing MapReduce tool options.
* @param layout The Rya core table to scan when using Rya for input.
public static void setTableLayout(Configuration conf, TABLE_LAYOUT layout) {
conf.set(TABLE_LAYOUT_PROP, layout.toString());
* Sets the RDF serialization format in the given Configuration.
* @param conf Configuration containing MapReduce tool options.
* @param format The expected format of any RDF text data.
public static void setRDFFormat(Configuration conf, RDFFormat format) {
conf.set(FORMAT_PROP, format.getName());
* Static class for accessing properties associated with Accumulo input
* formats. Can allow input formats that don't extend
* {@link InputFormatBase} to still use the same Accumulo input
* configuration options.
public static class AccumuloProps extends InputFormatBase {
* @throws UnsupportedOperationException always. This class should only be used to access properties.
public RecordReader createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) {
throw new UnsupportedOperationException("Accumulo Props just holds properties");
public static Instance getInstance(JobContext conf) {
return InputFormatBase.getInstance(conf);
public static AuthenticationToken getPassword(JobContext conf) {
return InputFormatBase.getAuthenticationToken(conf);
public static String getUsername(JobContext conf) {
return InputFormatBase.getPrincipal(conf);
public static String getTablename(JobContext conf) {
return InputFormatBase.getInputTableName(conf);