package org.apache.blur.mapreduce.lib;
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.math.BigInteger;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;
import org.apache.blur.mapreduce.lib.BlurMutate.MUTATE_TYPE;
import org.apache.commons.codec.binary.Base64;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import com.google.common.base.Splitter;
/**
* This will parse a standard CSV file into {@link BlurMutate} objects. Use the
* static addColumns and setSeparator methods to configure the class.
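* <br/>
* <br/>
* A minimal driver sketch, assuming the Hadoop 2 Job API (the job name,
* column layout, and separator below are illustrative only):<br/>
*
* <pre>
* Job job = Job.getInstance(configuration, "csv-load");
* CsvBlurMapper.setColumns(job, "cf1:col1,col2,col3");
* CsvBlurMapper.setSeparator(job, ",");
* job.setMapperClass(CsvBlurMapper.class);
* </pre>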
*/
public class CsvBlurMapper extends BaseBlurMapper<Writable, Text> {
public static final String UTF_8 = "UTF-8";
public static final String BLUR_CSV_AUTO_GENERATE_RECORD_ID_AS_HASH_OF_DATA = "blur.csv.auto.generate.record.id.as.hash.of.data";
public static final String BLUR_CSV_AUTO_GENERATE_ROW_ID_AS_HASH_OF_DATA = "blur.csv.auto.generate.row.id.as.hash.of.data";
public static final String BLUR_CSV_FAMILY_PATH_MAPPINGS_FAMILIES = "blur.csv.family.path.mappings.families";
public static final String BLUR_CSV_FAMILY_PATH_MAPPINGS_FAMILY_PREFIX = "blur.csv.family.path.mappings.family.";
public static final String BLUR_CSV_SEPARATOR_BASE64 = "blur.csv.separator.base64";
public static final String BLUR_CSV_FAMILY_COLUMN_PREFIX = "blur.csv.family.";
public static final String BLUR_CSV_FAMILIES = "blur.csv.families";
public static final String HIVE_NULL = "\\N";
protected Map<String, List<String>> _columnNameMap;
protected String _separator = Base64.encodeBase64String(",".getBytes());
protected Splitter _splitter;
protected boolean _familyNotInFile;
protected String _familyFromPath;
protected boolean _autoGenerateRecordIdAsHashOfData;
protected MessageDigest _digest;
protected boolean _autoGenerateRowIdAsHashOfData;
/**
* Add a mapping for a family to a path. This is to be used when an entire
* path is to be processed as a single family and the data itself does not
* contain the family.<br/>
* <br/>
*
* NOTE: the familyNotInFile property must be set before this method can be
* called.
*
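* For example, to treat every file under one directory as family "cf1" (the
* path below is illustrative):<br/>
*
* <pre>
* CsvBlurMapper.addFamilyPath(job, "cf1", new Path("/data/cf1"));
* </pre>
*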
* @param job
* the job to setup.
* @param family
* the family.
* @param path
* the path.
*/
public static void addFamilyPath(Job job, String family, Path path) {
addFamilyPath(job.getConfiguration(), family, path);
}
/**
* Add a mapping for a family to a path. This is to be used when an entire
* path is to be processed as a single family and the data itself does not
* contain the family.<br/>
* <br/>
*
* NOTE: the familyNotInFile property must be set before this method can be
* called.
*
* @param configuration
* the configuration to setup.
* @param family
* the family.
* @param path
* the path.
*/
public static void addFamilyPath(Configuration configuration, String family, Path path) {
append(configuration, BLUR_CSV_FAMILY_PATH_MAPPINGS_FAMILIES, family);
append(configuration, BLUR_CSV_FAMILY_PATH_MAPPINGS_FAMILY_PREFIX + family, path.toString());
}
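/**
* Appends the given value to the string collection stored under the given
* configuration key. {@link Configuration#getStringCollection(String)}
* returns an empty collection rather than null when the key is unset, so the
* null check below is purely defensive.
*/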
protected static void append(Configuration configuration, String name, String value) {
Collection<String> set = configuration.getStringCollection(name);
if (set == null) {
set = new TreeSet<String>();
}
set.add(value);
configuration.setStrings(name, set.toArray(new String[set.size()]));
}
/**
* If set to true the record id will be automatically generated as a hash of
* the data that the record contains.
*
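* For example, to hash both ids from the record data (a sketch; the row id
* variant is optional):<br/>
*
* <pre>
* CsvBlurMapper.setAutoGenerateRecordIdAsHashOfData(job, true);
* CsvBlurMapper.setAutoGenerateRowIdAsHashOfData(job, true);
* </pre>
*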
* @param job
* the job to setup.
* @param autoGenerateRecordIdAsHashOfData
* boolean.
*/
public static void setAutoGenerateRecordIdAsHashOfData(Job job, boolean autoGenerateRecordIdAsHashOfData) {
setAutoGenerateRecordIdAsHashOfData(job.getConfiguration(), autoGenerateRecordIdAsHashOfData);
}
/**
* If set to true the record id will be automatically generated as a hash of
* the data that the record contains.
*
* @param configuration
* the configuration to setup.
* @param autoGenerateRecordIdAsHashOfData
* boolean.
*/
public static void setAutoGenerateRecordIdAsHashOfData(Configuration configuration,
boolean autoGenerateRecordIdAsHashOfData) {
configuration.setBoolean(BLUR_CSV_AUTO_GENERATE_RECORD_ID_AS_HASH_OF_DATA, autoGenerateRecordIdAsHashOfData);
}
/**
* Gets whether or not to generate a record id for the record based on the
* data.
*
* @param configuration
* the configuration.
* @return boolean.
*/
public static boolean isAutoGenerateRecordIdAsHashOfData(Configuration configuration) {
return configuration.getBoolean(BLUR_CSV_AUTO_GENERATE_RECORD_ID_AS_HASH_OF_DATA, false);
}
/**
* If set to true the row id will be automatically generated as a hash of
* the data that the record contains.
*
* @param job
* the job to setup.
* @param autoGenerateRowIdAsHashOfData
* boolean.
*/
public static void setAutoGenerateRowIdAsHashOfData(Job job, boolean autoGenerateRowIdAsHashOfData) {
setAutoGenerateRowIdAsHashOfData(job.getConfiguration(), autoGenerateRowIdAsHashOfData);
}
/**
* If set to true the row id will be automatically generated as a hash of
* the data that the record contains.
*
* @param configuration
* the configuration to setup.
* @param autoGenerateRowIdAsHashOfData
* boolean.
*/
public static void setAutoGenerateRowIdAsHashOfData(Configuration configuration, boolean autoGenerateRowIdAsHashOfData) {
configuration.setBoolean(BLUR_CSV_AUTO_GENERATE_ROW_ID_AS_HASH_OF_DATA, autoGenerateRowIdAsHashOfData);
}
/**
* Gets whether or not to generate a row id for the record based on the
* data.
*
* @param configuration
* the configuration.
* @return boolean.
*/
public static boolean isAutoGenerateRowIdAsHashOfData(Configuration configuration) {
return configuration.getBoolean(BLUR_CSV_AUTO_GENERATE_ROW_ID_AS_HASH_OF_DATA, false);
}
/**
* Sets all the family and column definitions.
*
* @param job
* the job to setup.
* @param strDefinition
* the string definition. <br/>
* <br/>
* Example:<br/>
* "cf1:col1,col2,col3|cf2:col1,col2,col3"<br/>
* Where "cf1" is a family name that contains columns "col1", "col2"
* and "col3" and a second family of "cf2" with columns "col1",
* "col2", and "col3".
*/
public static void setColumns(Job job, String strDefinition) {
setColumns(job.getConfiguration(), strDefinition);
}
/**
* Sets all the family and column definitions.
*
* @param configuration
* the configuration to setup.
* @param strDefinition
* the string definition. <br/>
* <br/>
* Example:<br/>
* "cf1:col1,col2,col3|cf2:col1,col2,col3"<br/>
* Where "cf1" is a family name that contains columns "col1", "col2"
* and "col3" and a second family of "cf2" with columns "col1",
* "col2", and "col3".
*/
public static void setColumns(Configuration configuration, String strDefinition) {
Iterable<String> familyDefs = Splitter.on('|').split(strDefinition);
for (String familyDef : familyDefs) {
int indexOf = familyDef.indexOf(':');
if (indexOf < 0) {
throwMalformedDefinition(strDefinition);
}
String family = familyDef.substring(0, indexOf);
Iterable<String> cols = Splitter.on(',').split(familyDef.substring(indexOf + 1));
List<String> colnames = new ArrayList<String>();
for (String columnName : cols) {
colnames.add(columnName);
}
if (family.trim().isEmpty() || colnames.isEmpty()) {
throwMalformedDefinition(strDefinition);
}
addColumns(configuration, family, colnames.toArray(new String[colnames.size()]));
}
}
protected static void throwMalformedDefinition(String strDefinition) {
throw new RuntimeException("Family and column definition string not valid [" + strDefinition
+ "] should look like \"family1:colname1,colname2|family2:colname1,colname2,colname3\"");
}
/**
* Adds the column layout for the given family.
*
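* For example, the layout from {@link #setColumns(Job, String)} can also be
* declared one family at a time (a sketch):<br/>
*
* <pre>
* CsvBlurMapper.addColumns(job, "cf1", "col1", "col2", "col3");
* CsvBlurMapper.addColumns(job, "cf2", "col1", "col2", "col3");
* </pre>
*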
* @param job
* the job to apply the layout.
* @param family
* the family name.
* @param columns
* the column names.
*/
public static void addColumns(Job job, String family, String... columns) {
addColumns(job.getConfiguration(), family, columns);
}
/**
* Adds the column layout for the given family.
*
* @param configuration
* the configuration to apply the layout.
* @param family
* the family name.
* @param columns
* the column names.
*/
public static void addColumns(Configuration configuration, String family, String... columns) {
Collection<String> families = new TreeSet<String>(configuration.getStringCollection(BLUR_CSV_FAMILIES));
families.add(family);
configuration.setStrings(BLUR_CSV_FAMILIES, families.toArray(new String[] {}));
configuration.setStrings(BLUR_CSV_FAMILY_COLUMN_PREFIX + family, columns);
}
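/**
* Gets the declared family names from the configuration.
*/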
public static Collection<String> getFamilyNames(Configuration configuration) {
return configuration.getStringCollection(BLUR_CSV_FAMILIES);
}
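/**
* Builds a map of each declared family name to its column names from the
* configuration.
*/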
public static Map<String, List<String>> getFamilyAndColumnNameMap(Configuration configuration) {
Map<String, List<String>> columnNameMap = new HashMap<String, List<String>>();
for (String family : getFamilyNames(configuration)) {
String[] columnsNames = configuration.getStrings(BLUR_CSV_FAMILY_COLUMN_PREFIX + family);
columnNameMap.put(family, Arrays.asList(columnsNames));
}
return columnNameMap;
}
/**
* Sets the separator of the file, by default it is ",".
*
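* For example, for tab separated files (a sketch):<br/>
*
* <pre>
* CsvBlurMapper.setSeparator(job, "\t");
* </pre>
*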
* @param job
* the job to apply the separator change.
* @param separator
* the separator.
*/
public static void setSeparator(Job job, String separator) {
setSeparator(job.getConfiguration(), separator);
}
/**
* Sets the separator of the file, by default it is ",".
*
* @param configuration
* the configuration to apply the separator change.
* @param separator
* the separator.
*/
public static void setSeparator(Configuration configuration, String separator) {
try {
configuration.set(BLUR_CSV_SEPARATOR_BASE64, Base64.encodeBase64String(separator.getBytes(UTF_8)));
} catch (UnsupportedEncodingException e) {
throw new RuntimeException(e);
}
}
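/**
* Reads the mapper settings from the configuration: the auto generated
* row/record id flags (an MD5 digest is created when either is enabled), the
* family and column layout, the separator, and, when the file currently
* being processed falls under a configured family path, the family to apply
* to every record from that file.
*/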
@Override
protected void setup(Context context) throws IOException, InterruptedException {
super.setup(context);
Configuration configuration = context.getConfiguration();
_autoGenerateRecordIdAsHashOfData = isAutoGenerateRecordIdAsHashOfData(configuration);
_autoGenerateRowIdAsHashOfData = isAutoGenerateRowIdAsHashOfData(configuration);
if (_autoGenerateRecordIdAsHashOfData || _autoGenerateRowIdAsHashOfData) {
try {
_digest = MessageDigest.getInstance("MD5");
} catch (NoSuchAlgorithmException e) {
throw new IOException(e);
}
}
_columnNameMap = getFamilyAndColumnNameMap(configuration);
_separator = new String(Base64.decodeBase64(configuration.get(BLUR_CSV_SEPARATOR_BASE64, _separator)), UTF_8);
_splitter = Splitter.on(_separator);
Path fileCurrentlyProcessing = getCurrentFile(context);
Collection<String> families = configuration.getStringCollection(BLUR_CSV_FAMILY_PATH_MAPPINGS_FAMILIES);
OUTER: for (String family : families) {
Collection<String> pathStrCollection = configuration
.getStringCollection(BLUR_CSV_FAMILY_PATH_MAPPINGS_FAMILY_PREFIX + family);
for (String pathStr : pathStrCollection) {
Path path = new Path(pathStr);
FileSystem fileSystem = path.getFileSystem(configuration);
path = path.makeQualified(fileSystem.getUri(), fileSystem.getWorkingDirectory());
if (isParent(path, fileCurrentlyProcessing)) {
_familyFromPath = family;
_familyNotInFile = true;
break OUTER;
}
}
}
}
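/**
* Returns true if the given path is an ancestor (at any depth) of the child
* path.
*/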
protected boolean isParent(Path possibleParent, Path child) {
if (child == null) {
return false;
}
if (possibleParent.equals(child.getParent())) {
return true;
}
return isParent(possibleParent, child.getParent());
}
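/**
* Gets the fully qualified path of the file backing the current input split,
* or null if the split is not a {@link FileSplit}.
*/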
protected Path getCurrentFile(Context context) throws IOException {
InputSplit split = context.getInputSplit();
if (split != null && split instanceof FileSplit) {
FileSplit inputSplit = (FileSplit) split;
Path path = inputSplit.getPath();
FileSystem fileSystem = path.getFileSystem(context.getConfiguration());
return path.makeQualified(fileSystem.getUri(), fileSystem.getWorkingDirectory());
}
return null;
}
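/**
* Parses a single CSV line into the reusable {@link BlurRecord}: the row id,
* record id, and family are consumed from the leading fields unless they are
* auto generated or derived from the file path, the remaining fields are
* matched positionally against the declared column names, and the record is
* emitted as a REPLACE mutation.
*/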
@Override
protected void map(Writable k, Text value, Context context) throws IOException, InterruptedException {
BlurRecord record = _mutate.getRecord();
record.clearColumns();
String str = value.toString();
Iterable<String> split = _splitter.split(str);
List<String> list = toList(split);
int offset = 0;
boolean gen = false;
if (!_autoGenerateRowIdAsHashOfData) {
record.setRowId(list.get(offset++));
} else {
_digest.reset();
byte[] bs = value.getBytes();
int length = value.getLength();
_digest.update(bs, 0, length);
record.setRowId(new BigInteger(_digest.digest()).toString(Character.MAX_RADIX));
gen = true;
}
if (!_autoGenerateRecordIdAsHashOfData) {
record.setRecordId(list.get(offset++));
} else {
if (gen) {
record.setRecordId(record.getRowId());
} else {
_digest.reset();
byte[] bs = value.getBytes();
int length = value.getLength();
_digest.update(bs, 0, length);
record.setRecordId(new BigInteger(_digest.digest()).toString(Character.MAX_RADIX));
}
}
String family;
if (_familyNotInFile) {
family = _familyFromPath;
} else {
family = list.get(offset++);
}
record.setFamily(family);
List<String> columnNames = _columnNameMap.get(family);
if (columnNames == null) {
throw new IOException("Family [" + family + "] is missing in the definition.");
}
if (list.size() - offset != columnNames.size()) {
String options = "";
if (!_autoGenerateRowIdAsHashOfData) {
options += "rowid,";
}
if (!_autoGenerateRecordIdAsHashOfData) {
options += "recordid,";
}
if (!_familyNotInFile) {
options += "family,";
}
String msg = "Record [" + str + "] does not match defined record [" + options + getColumnNames(columnNames)
+ "].";
throw new IOException(msg);
}
for (int i = 0; i < columnNames.size(); i++) {
String val = handleHiveNulls(list.get(i + offset));
if (val != null) {
record.addColumn(columnNames.get(i), val);
_columnCounter.increment(1);
}
}
_key.set(record.getRowId());
_mutate.setMutateType(MUTATE_TYPE.REPLACE);
context.write(_key, _mutate);
_recordCounter.increment(1);
context.progress();
}
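/**
* Converts the Hive null marker ("\N") to a Java null so that the column is
* skipped.
*/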
protected String handleHiveNulls(String value) {
if (value.equals(HIVE_NULL)) {
return null;
}
return value;
}
public void setFamilyFromPath(String familyFromPath) {
this._familyFromPath = familyFromPath;
}
protected String getColumnNames(List<String> columnNames) {
StringBuilder builder = new StringBuilder();
for (String c : columnNames) {
if (builder.length() != 0) {
builder.append(',');
}
builder.append(c);
}
return builder.toString();
}
protected List<String> toList(Iterable<String> split) {
List<String> lst = new ArrayList<String>();
for (String s : split) {
lst.add(s);
}
return lst;
}
}