blob: ed8e3821131d71743bcd02bcb47ebe8e3a80054c [file] [log] [blame]
package org.apache.blur.mapreduce.lib;
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;
import org.apache.blur.mapreduce.lib.BlurMutate.MUTATE_TYPE;
import org.apache.commons.codec.binary.Base64;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
* This will parse a standard csv file into a {@link BlurMutate} object. Use the
* static addColumns, and setSeparator methods to configure the class.
public class CsvBlurMapper extends BaseBlurMapper<Writable, Text> {
public static final String UTF_8 = "UTF-8";
public static final String BLUR_CSV_AUTO_GENERATE_RECORD_ID_AS_HASH_OF_DATA = "";
public static final String BLUR_CSV_AUTO_GENERATE_ROW_ID_AS_HASH_OF_DATA = "";
public static final String BLUR_CSV_FAMILY_PATH_MAPPINGS_FAMILIES = "";
public static final String BLUR_CSV_FAMILY_PATH_MAPPINGS_FAMILY_PREFIX = "";
public static final String BLUR_CSV_SEPARATOR_BASE64 = "blur.csv.separator.base64";
public static final String BLUR_CSV_FAMILY_COLUMN_PREFIX = "";
public static final String BLUR_CSV_FAMILIES = "blur.csv.families";
public static final String HIVE_NULL = "\\N";
protected Map<String, List<String>> _columnNameMap;
protected String _separator = Base64.encodeBase64String(",".getBytes());
protected Splitter _splitter;
protected boolean _familyNotInFile;
protected String _familyFromPath;
protected boolean _autoGenerateRecordIdAsHashOfData;
protected MessageDigest _digest;
protected boolean _autoGenerateRowIdAsHashOfData;
* Add a mapping for a family to a path. This is to be used when an entire
* path is to be processed as a single family and the data itself does not
* contain the family.<br/>
* <br/>
* NOTE: the familyNotInFile property must be set before this method can be
* called.
* @param job
* the job to setup.
* @param family
* the family.
* @param path
* the path.
public static void addFamilyPath(Job job, String family, Path path) {
addFamilyPath(job.getConfiguration(), family, path);
* Add a mapping for a family to a path. This is to be used when an entire
* path is to be processed as a single family and the data itself does not
* contain the family.<br/>
* <br/>
* NOTE: the familyNotInFile property must be set before this method can be
* called.
* @param configuration
* the configuration to setup.
* @param family
* the family.
* @param path
* the path.
public static void addFamilyPath(Configuration configuration, String family, Path path) {
append(configuration, BLUR_CSV_FAMILY_PATH_MAPPINGS_FAMILIES, family);
append(configuration, BLUR_CSV_FAMILY_PATH_MAPPINGS_FAMILY_PREFIX + family, path.toString());
protected static void append(Configuration configuration, String name, String value) {
Collection<String> set = configuration.getStringCollection(name);
if (set == null) {
set = new TreeSet<String>();
configuration.setStrings(name, set.toArray(new String[set.size()]));
* If set to true the record id will be automatically generated as a hash of
* the data that the record contains.
* @param job
* the job to setup.
* @param autoGenerateRecordIdAsHashOfData
* boolean.
public static void setAutoGenerateRecordIdAsHashOfData(Job job, boolean autoGenerateRecordIdAsHashOfData) {
setAutoGenerateRecordIdAsHashOfData(job.getConfiguration(), autoGenerateRecordIdAsHashOfData);
* If set to true the record id will be automatically generated as a hash of
* the data that the record contains.
* @param configuration
* the configuration to setup.
* @param autoGenerateRecordIdAsHashOfData
* boolean.
public static void setAutoGenerateRecordIdAsHashOfData(Configuration configuration,
boolean autoGenerateRecordIdAsHashOfData) {
configuration.setBoolean(BLUR_CSV_AUTO_GENERATE_RECORD_ID_AS_HASH_OF_DATA, autoGenerateRecordIdAsHashOfData);
* Gets whether or not to generate a recordid for the record based on the
* data.
* @param configuration
* the configuration.
* @return boolean.
public static boolean isAutoGenerateRecordIdAsHashOfData(Configuration configuration) {
return configuration.getBoolean(BLUR_CSV_AUTO_GENERATE_RECORD_ID_AS_HASH_OF_DATA, false);
* If set to true the record id will be automatically generated as a hash of
* the data that the record contains.
* @param job
* the job to setup.
* @param autoGenerateRecordIdAsHashOfData
* boolean.
public static void setAutoGenerateRowIdAsHashOfData(Job job, boolean autoGenerateRowIdAsHashOfData) {
setAutoGenerateRowIdAsHashOfData(job.getConfiguration(), autoGenerateRowIdAsHashOfData);
* If set to true the record id will be automatically generated as a hash of
* the data that the record contains.
* @param configuration
* the configuration to setup.
* @param autoGenerateRecordIdAsHashOfData
* boolean.
public static void setAutoGenerateRowIdAsHashOfData(Configuration configuration, boolean autoGenerateRowIdAsHashOfData) {
configuration.setBoolean(BLUR_CSV_AUTO_GENERATE_ROW_ID_AS_HASH_OF_DATA, autoGenerateRowIdAsHashOfData);
* Gets whether or not to generate a recordid for the record based on the
* data.
* @param configuration
* the configuration.
* @return boolean.
public static boolean isAutoGenerateRowIdAsHashOfData(Configuration configuration) {
return configuration.getBoolean(BLUR_CSV_AUTO_GENERATE_ROW_ID_AS_HASH_OF_DATA, false);
* Sets all the family and column definitions.
* @param job
* the job to setup.
* @param strDefinition
* the string definition. <br/>
* <br/>
* Example:<br/>
* "cf1:col1,col2,col3|cf2:col1,col2,col3"<br/>
* Where "cf1" is a family name that contains columns "col1", "col2"
* and "col3" and a second family of "cf2" with columns "col1",
* "col2", and "col3".
public static void setColumns(Job job, String strDefinition) {
setColumns(job.getConfiguration(), strDefinition);
* Sets all the family and column definitions.
* @param configuration
* the configuration to setup.
* @param strDefinition
* the string definition. <br/>
* <br/>
* Example:<br/>
* "cf1:col1,col2,col3|cf2:col1,col2,col3"<br/>
* Where "cf1" is a family name that contains columns "col1", "col2"
* and "col3" and a second family of "cf2" with columns "col1",
* "col2", and "col3".
public static void setColumns(Configuration configuration, String strDefinition) {
Iterable<String> familyDefs = Splitter.on('|').split(strDefinition);
for (String familyDef : familyDefs) {
int indexOf = familyDef.indexOf(':');
if (indexOf < 0) {
String family = familyDef.substring(0, indexOf);
Iterable<String> cols = Splitter.on(',').split(familyDef.substring(indexOf + 1));
List<String> colnames = new ArrayList<String>();
for (String columnName : cols) {
if (family.trim().isEmpty() || colnames.isEmpty()) {
addColumns(configuration, family, colnames.toArray(new String[colnames.size()]));
protected static void throwMalformedDefinition(String strDefinition) {
throw new RuntimeException("Family and column definition string not valid [" + strDefinition
+ "] should look like \"family1:colname1,colname2|family2:colname1,colname2,colname3\"");
* Adds the column layout for the given family.
* @param job
* the job to apply the layout.
* @param family
* the family name.
* @param columns
* the column names.
public static void addColumns(Job job, String family, String... columns) {
addColumns(job.getConfiguration(), family, columns);
* Adds the column layout for the given family.
* @param configuration
* the configuration to apply the layout.
* @param family
* the family name.
* @param columns
* the column names.
public static void addColumns(Configuration configuration, String family, String... columns) {
Collection<String> families = new TreeSet<String>(configuration.getStringCollection(BLUR_CSV_FAMILIES));
configuration.setStrings(BLUR_CSV_FAMILIES, families.toArray(new String[] {}));
configuration.setStrings(BLUR_CSV_FAMILY_COLUMN_PREFIX + family, columns);
public static Collection<String> getFamilyNames(Configuration configuration) {
return configuration.getStringCollection(BLUR_CSV_FAMILIES);
public static Map<String, List<String>> getFamilyAndColumnNameMap(Configuration configuration) {
Map<String, List<String>> columnNameMap = new HashMap<String, List<String>>();
for (String family : getFamilyNames(configuration)) {
String[] columnsNames = configuration.getStrings(BLUR_CSV_FAMILY_COLUMN_PREFIX + family);
columnNameMap.put(family, Arrays.asList(columnsNames));
return columnNameMap;
* Sets the separator of the file, by default it is ",".
* @param job
* the job to apply the separator change.
* @param separator
* the separator.
public static void setSeparator(Job job, String separator) {
setSeparator(job.getConfiguration(), separator);
* Sets the separator of the file, by default it is ",".
* @param configuration
* the configuration to apply the separator change.
* @param separator
* the separator.
public static void setSeparator(Configuration configuration, String separator) {
try {
configuration.set(BLUR_CSV_SEPARATOR_BASE64, Base64.encodeBase64String(separator.getBytes(UTF_8)));
} catch (UnsupportedEncodingException e) {
throw new RuntimeException(e);
protected void setup(Context context) throws IOException, InterruptedException {
Configuration configuration = context.getConfiguration();
_autoGenerateRecordIdAsHashOfData = isAutoGenerateRecordIdAsHashOfData(configuration);
_autoGenerateRowIdAsHashOfData = isAutoGenerateRowIdAsHashOfData(configuration);
if (_autoGenerateRecordIdAsHashOfData || _autoGenerateRowIdAsHashOfData) {
try {
_digest = MessageDigest.getInstance("MD5");
} catch (NoSuchAlgorithmException e) {
throw new IOException(e);
_columnNameMap = getFamilyAndColumnNameMap(configuration);
_separator = new String(Base64.decodeBase64(configuration.get(BLUR_CSV_SEPARATOR_BASE64, _separator)), UTF_8);
_splitter = Splitter.on(_separator);
Path fileCurrentlyProcessing = getCurrentFile(context);
Collection<String> families = configuration.getStringCollection(BLUR_CSV_FAMILY_PATH_MAPPINGS_FAMILIES);
OUTER: for (String family : families) {
Collection<String> pathStrCollection = configuration
for (String pathStr : pathStrCollection) {
Path path = new Path(pathStr);
FileSystem fileSystem = path.getFileSystem(configuration);
path = path.makeQualified(fileSystem.getUri(), fileSystem.getWorkingDirectory());
if (isParent(path, fileCurrentlyProcessing)) {
_familyFromPath = family;
_familyNotInFile = true;
break OUTER;
protected boolean isParent(Path possibleParent, Path child) {
if (child == null) {
return false;
if (possibleParent.equals(child.getParent())) {
return true;
return isParent(possibleParent, child.getParent());
protected Path getCurrentFile(Context context) throws IOException {
InputSplit split = context.getInputSplit();
if (split != null && split instanceof FileSplit) {
FileSplit inputSplit = (FileSplit) split;
Path path = inputSplit.getPath();
FileSystem fileSystem = path.getFileSystem(context.getConfiguration());
return path.makeQualified(fileSystem.getUri(), fileSystem.getWorkingDirectory());
return null;
protected void map(Writable k, Text value, Context context) throws IOException, InterruptedException {
BlurRecord record = _mutate.getRecord();
String str = value.toString();
Iterable<String> split = _splitter.split(str);
List<String> list = toList(split);
int offset = 0;
boolean gen = false;
if (!_autoGenerateRowIdAsHashOfData) {
} else {
byte[] bs = value.getBytes();
int length = value.getLength();
_digest.update(bs, 0, length);
record.setRowId(new BigInteger(_digest.digest()).toString(Character.MAX_RADIX));
gen = true;
if (!_autoGenerateRecordIdAsHashOfData) {
} else {
if (gen) {
} else {
byte[] bs = value.getBytes();
int length = value.getLength();
_digest.update(bs, 0, length);
record.setRecordId(new BigInteger(_digest.digest()).toString(Character.MAX_RADIX));
String family;
if (_familyNotInFile) {
family = _familyFromPath;
} else {
family = list.get(offset++);
List<String> columnNames = _columnNameMap.get(family);
if (columnNames == null) {
throw new IOException("Family [" + family + "] is missing in the definition.");
if (list.size() - offset != columnNames.size()) {
String options = "";
if (!_autoGenerateRowIdAsHashOfData) {
options += "rowid,";
if (!_autoGenerateRecordIdAsHashOfData) {
options += "recordid,";
if (!_familyNotInFile) {
options += "family,";
String msg = "Record [" + str + "] does not match defined record [" + options + getColumnNames(columnNames)
+ "].";
throw new IOException(msg);
for (int i = 0; i < columnNames.size(); i++) {
String val = handleHiveNulls(list.get(i + offset));
if (val != null) {
record.addColumn(columnNames.get(i), val);
context.write(_key, _mutate);
protected String handleHiveNulls(String value) {
if (value.equals(HIVE_NULL)) {
return null;
return value;
public void setFamilyFromPath(String familyFromPath) {
this._familyFromPath = familyFromPath;
protected String getColumnNames(List<String> columnNames) {
StringBuilder builder = new StringBuilder();
for (String c : columnNames) {
if (builder.length() != 0) {
return builder.toString();
protected List<String> toList(Iterable<String> split) {
List<String> lst = new ArrayList<String>();
for (String s : split) {
return lst;