blob: f826b7149e70fefdeb69b37e88872feab9ef5b87 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.rya.accumulo.mr;
import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.client.mapreduce.InputFormatBase;
import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT;
import org.apache.rya.rdftriplestore.utils.RdfFormatUtils;
import org.eclipse.rdf4j.model.ValueFactory;
import org.eclipse.rdf4j.model.impl.SimpleValueFactory;
import org.eclipse.rdf4j.rio.RDFFormat;
/**
 * Constants and static helpers for reading and writing the options that Rya
 * MapReduce jobs keep in a Hadoop {@link Configuration}. The constants are
 * the property names for Accumulo and Rya settings; the static methods are
 * thin, typed wrappers that get or set those properties on a given
 * Configuration.
 */
public class MRUtils {

    /**
     * Property name for the name of a MapReduce job.
     */
    public static final String JOB_NAME_PROP = "mapred.job.name";

    /**
     * Property name for the Accumulo username.
     */
    public static final String AC_USERNAME_PROP = "ac.username";

    /**
     * Property name for the Accumulo password.
     */
    public static final String AC_PWD_PROP = "ac.pwd";

    /**
     * Property name for the list of zookeepers.
     */
    public static final String AC_ZK_PROP = "ac.zk";

    /**
     * Property name for the Accumulo instance name.
     */
    public static final String AC_INSTANCE_PROP = "ac.instance";

    /**
     * Property name for whether to run against a mock Accumulo instance.
     */
    public static final String AC_MOCK_PROP = "ac.mock";

    /**
     * Property name for TTL; allows using an age-off filter on Accumulo input.
     */
    public static final String AC_TTL_PROP = "ac.ttl";

    /**
     * Property name for scan authorizations when reading data from Accumulo.
     */
    public static final String AC_AUTH_PROP = "ac.auth";

    /**
     * Property name for default visibility when writing data to Accumulo.
     */
    public static final String AC_CV_PROP = "ac.cv";

    /**
     * Property name for whether to read Accumulo data directly from HDFS
     * as opposed to through Accumulo itself.
     */
    public static final String AC_HDFS_INPUT_PROP = "ac.hdfsinput";

    /**
     * Property name for the table layout to use when reading data from Rya.
     */
    public static final String TABLE_LAYOUT_PROP = "rdf.tablelayout";

    /**
     * Property name for the Rya table prefix, identifying the Rya
     * instance to work with.
     */
    public static final String TABLE_PREFIX_PROPERTY = "rdf.tablePrefix";

    /**
     * Property name for the RDF serialization format to use, when using RDF
     * files.
     */
    public static final String FORMAT_PROP = "rdf.format";

    /**
     * Property name for a file input path, if using file input.
     */
    public static final String INPUT_PATH = "input";

    /**
     * Property name for specifying a default named graph to use when writing
     * new statements.
     */
    public static final String NAMED_GRAPH_PROP = "rdf.graph";

    /**
     * Property name {@code "ac.table"}.
     * NOTE(review): presumably names an Accumulo table; it is not referenced
     * anywhere in this class, so confirm the intended meaning with callers.
     */
    public static final String AC_TABLE_PROP = "ac.table";

    /**
     * Property name {@code "io.sort.mb"}.
     * NOTE(review): presumably the Hadoop sort buffer size setting; it is not
     * referenced anywhere in this class, so confirm against callers.
     */
    public static final String HADOOP_IO_SORT_MB = "io.sort.mb";

    /**
     * Shared {@link ValueFactory}, obtained from
     * {@link SimpleValueFactory#getInstance()}.
     */
    public static final ValueFactory VF = SimpleValueFactory.getInstance();

    /**
     * Gets the TTL from a given Configuration.
     * @param conf Configuration containing MapReduce tool options.
     * @return The TTL that will be applied as an age-off filter for Accumulo
     *      input data, or null if not set.
     */
    public static String getACTtl(Configuration conf) {
        return conf.get(AC_TTL_PROP);
    }

    /**
     * Gets the username from a given Configuration.
     * @param conf Configuration containing MapReduce tool options.
     * @return The configured Accumulo username, or null if not set.
     */
    public static String getACUserName(Configuration conf) {
        return conf.get(AC_USERNAME_PROP);
    }

    /**
     * Gets the password from a given Configuration.
     * @param conf Configuration containing MapReduce tool options.
     * @return The configured Accumulo password, or null if not set.
     */
    public static String getACPwd(Configuration conf) {
        return conf.get(AC_PWD_PROP);
    }

    /**
     * Gets the zookeepers from a given Configuration.
     * @param conf Configuration containing MapReduce tool options.
     * @return The configured zookeeper list, or null if not set.
     */
    public static String getACZK(Configuration conf) {
        return conf.get(AC_ZK_PROP);
    }

    /**
     * Gets the instance name from a given Configuration.
     * @param conf Configuration containing MapReduce tool options.
     * @return The configured Accumulo instance name, or null if not set.
     */
    public static String getACInstance(Configuration conf) {
        return conf.get(AC_INSTANCE_PROP);
    }

    /**
     * Gets whether to use a mock instance from a given Configuration.
     * @param conf Configuration containing MapReduce tool options.
     * @param defaultValue Default choice if the mock property hasn't been
     *      explicitly set in the Configuration.
     * @return True if a mock instance should be used, false to connect to
     *      a running Accumulo.
     */
    public static boolean getACMock(Configuration conf, boolean defaultValue) {
        return conf.getBoolean(AC_MOCK_PROP, defaultValue);
    }

    /**
     * Gets the table prefix from a given Configuration.
     * @param conf Configuration containing MapReduce tool options.
     * @return The configured Rya table prefix, or null if not set.
     */
    public static String getTablePrefix(Configuration conf) {
        return conf.get(TABLE_PREFIX_PROPERTY);
    }

    /**
     * Gets the table layout that determines which Rya table to scan for input.
     * @param conf Configuration containing MapReduce tool options.
     * @param defaultLayout The layout to use if the Configuration doesn't
     *      specify any layout. May be null, in which case null is returned
     *      when no layout is configured.
     * @return The configured layout to use for reading statements from Rya,
     *      or {@code defaultLayout} if the property is not set.
     * @throws IllegalArgumentException if the configured value is not the
     *      name of a {@link TABLE_LAYOUT} constant.
     */
    public static TABLE_LAYOUT getTableLayout(Configuration conf, TABLE_LAYOUT defaultLayout) {
        // Look the property up first so the default is only consulted when it
        // is actually needed: a null default must not fail a lookup that the
        // configuration can satisfy, and the default never has to survive a
        // toString()/valueOf() round trip.
        String configuredLayout = conf.get(TABLE_LAYOUT_PROP);
        if (configuredLayout == null) {
            return defaultLayout;
        }
        return TABLE_LAYOUT.valueOf(configuredLayout);
    }

    /**
     * Gets the RDF serialization format to use for parsing RDF files.
     * @param conf Configuration containing MapReduce tool options.
     * @return The configured RDFFormat, or null if not set.
     */
    public static RDFFormat getRDFFormat(Configuration conf) {
        return RdfFormatUtils.getRdfFormatFromName(conf.get(FORMAT_PROP));
    }

    /**
     * Sets the username in the given Configuration.
     * @param conf Configuration containing MapReduce tool options.
     * @param str Accumulo username, used for input and/or output.
     */
    public static void setACUserName(Configuration conf, String str) {
        conf.set(AC_USERNAME_PROP, str);
    }

    /**
     * Sets the password in the given Configuration.
     * @param conf Configuration containing MapReduce tool options.
     * @param str Accumulo password string, used for input and/or output.
     */
    public static void setACPwd(Configuration conf, String str) {
        conf.set(AC_PWD_PROP, str);
    }

    /**
     * Sets the zookeepers in the given Configuration.
     * @param conf Configuration containing MapReduce tool options.
     * @param str List of zookeepers to use to connect to Accumulo.
     */
    public static void setACZK(Configuration conf, String str) {
        conf.set(AC_ZK_PROP, str);
    }

    /**
     * Sets the instance in the given Configuration.
     * @param conf Configuration containing MapReduce tool options.
     * @param str Accumulo instance name, for input and/or output.
     */
    public static void setACInstance(Configuration conf, String str) {
        conf.set(AC_INSTANCE_PROP, str);
    }

    /**
     * Sets the TTL in the given Configuration.
     * @param conf Configuration containing MapReduce tool options.
     * @param str TTL for Accumulo data. Rows older than this won't be scanned
     *      as input.
     */
    public static void setACTtl(Configuration conf, String str) {
        conf.set(AC_TTL_PROP, str);
    }

    /**
     * Sets whether to connect to a mock Accumulo instance.
     * @param conf Configuration containing MapReduce tool options.
     * @param mock true to use a mock instance, false to attempt to connect
     *      to a running Accumulo instance.
     */
    public static void setACMock(Configuration conf, boolean mock) {
        conf.setBoolean(AC_MOCK_PROP, mock);
    }

    /**
     * Sets the Rya table prefix in the given Configuration.
     * @param conf Configuration containing MapReduce tool options.
     * @param prefix Prefix of the Rya tables to use for input and/or output.
     */
    public static void setTablePrefix(Configuration conf, String prefix) {
        conf.set(TABLE_PREFIX_PROPERTY, prefix);
    }

    /**
     * Sets the table layout in the given Configuration.
     * @param conf Configuration containing MapReduce tool options.
     * @param layout The Rya core table to scan when using Rya for input.
     */
    public static void setTableLayout(Configuration conf, TABLE_LAYOUT layout) {
        conf.set(TABLE_LAYOUT_PROP, layout.toString());
    }

    /**
     * Sets the RDF serialization format in the given Configuration.
     * @param conf Configuration containing MapReduce tool options.
     * @param format The expected format of any RDF text data.
     */
    public static void setRDFFormat(Configuration conf, RDFFormat format) {
        conf.set(FORMAT_PROP, format.getName());
    }

    /**
     * Static class for accessing properties associated with Accumulo input
     * formats. Can allow input formats that don't extend
     * {@link InputFormatBase} to still use the same Accumulo input
     * configuration options.
     */
    @SuppressWarnings("rawtypes")
    public static class AccumuloProps extends InputFormatBase {

        /**
         * Not supported; this class is never used as an actual input format.
         * @throws UnsupportedOperationException always. This class should only be used to access properties.
         */
        @Override
        public RecordReader createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) {
            throw new UnsupportedOperationException("Accumulo Props just holds properties");
        }

        /**
         * Gets the Accumulo instance configured for the given job.
         * @param conf Job context holding the Accumulo input configuration.
         * @return The result of {@code InputFormatBase.getInstance(conf)}.
         */
        public static Instance getInstance(JobContext conf) {
            return InputFormatBase.getInstance(conf);
        }

        /**
         * Gets the authentication token configured for the given job.
         * @param conf Job context holding the Accumulo input configuration.
         * @return The result of
         *      {@code InputFormatBase.getAuthenticationToken(conf)}.
         */
        public static AuthenticationToken getPassword(JobContext conf) {
            return InputFormatBase.getAuthenticationToken(conf);
        }

        /**
         * Gets the principal (user) configured for the given job.
         * @param conf Job context holding the Accumulo input configuration.
         * @return The result of {@code InputFormatBase.getPrincipal(conf)}.
         */
        public static String getUsername(JobContext conf) {
            return InputFormatBase.getPrincipal(conf);
        }

        /**
         * Gets the input table name configured for the given job.
         * @param conf Job context holding the Accumulo input configuration.
         * @return The result of
         *      {@code InputFormatBase.getInputTableName(conf)}.
         */
        public static String getTablename(JobContext conf) {
            return InputFormatBase.getInputTableName(conf);
        }
    }
}