// blob: 22bbfe688a197ef6fe9c6cb8adc34fc6a87a7bd9 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.mapreduce.db;
import java.math.BigDecimal;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.sqoop.config.ConfigurationHelper;
import org.apache.sqoop.validation.ValidationException;
/**
 * Implement DBSplitter over text strings.
 *
 * <p>Strings are mapped to fractional {@link BigDecimal} values by treating
 * every UTF-16 code unit as one digit in base 65536; the numeric range is then
 * divided into even pieces and the split points are mapped back to strings.
 */
public class TextSplitter extends BigDecimalSplitter {

  /** Configuration key that must be set to {@code true} to enable this splitter. */
  public static final String ALLOW_TEXT_SPLITTER_PROPERTY =
      "org.apache.sqoop.splitter.allow_text_splitter";

  private static final Log LOG = LogFactory.getLog(TextSplitter.class);

  /** Value of one "place" in the base-65536 positional encoding. */
  private static final BigDecimal ONE_PLACE = new BigDecimal(65536);

  // Maximum number of characters to convert. This is to prevent rounding
  // errors or repeating fractions near the very bottom from getting out of
  // control. Note that this still gives us a huge number of possible splits.
  private static final int MAX_CHARS = 8;

  /** When true, generated string literals are written as N'...' instead of '...'. */
  private boolean useNCharStrings = false;

  /**
   * This method needs to determine the splits between two user-provided
   * strings. In the case where the user's strings are 'A' and 'Z', this is
   * not hard; we could create two splits from ['A', 'M') and ['M', 'Z'], 26
   * splits for strings beginning with each letter, etc.
   *
   * If a user has provided us with the strings "Ham" and "Haze", however, we
   * need to create splits that differ in the third letter.
   *
   * The algorithm used is as follows:
   * Since there are 2**16 UTF-16 code units, we interpret characters as
   * digits in base 65536. Given a string 's' containing characters s_0, s_1
   * .. s_n, we interpret the string as the number: 0.s_0 s_1 s_2.. s_n in
   * base 65536. Having mapped the low and high strings into fractional
   * values, we then split that numeric range evenly and map the resulting
   * values back into strings.
   *
   * Boundary strings are embedded in the generated clauses as SQL string
   * literals; embedded single quotes are doubled so that a value such as
   * O'Brien yields a well-formed predicate.
   *
   * @param conf job configuration; must have
   *     {@link #ALLOW_TEXT_SPLITTER_PROPERTY} set to true
   * @param results result set whose column 1 is read as the minimum value and
   *     column 2 as the maximum value of the split column
   * @param colName name of the split column, used verbatim in the clauses
   * @return the splits covering [min, max], plus an "IS NULL" split when the
   *     minimum value is null
   * @throws SQLException if reading the bounds fails
   * @throws ValidationException if text splitting has not been explicitly
   *     allowed, or if the lower bound sorts after the upper bound
   */
  public List<InputSplit> split(Configuration conf, ResultSet results,
      String colName) throws SQLException, ValidationException {
    if (!conf.getBoolean(ALLOW_TEXT_SPLITTER_PROPERTY, false)) {
      throw new ValidationException("Generating splits for a textual index column "
          + "allowed only in case of \"-D" + ALLOW_TEXT_SPLITTER_PROPERTY
          + "=true\" property passed as a parameter");
    }

    LOG.warn("Generating splits for a textual index column.");
    LOG.warn("If your database sorts in a case-insensitive order, "
        + "this may result in a partial import or duplicate records.");
    LOG.warn("You are strongly encouraged to choose an integral split column.");

    String minString = results.getString(1);
    String maxString = results.getString(2);

    if (null == maxString) {
      // If the max string is null, then the min string has to be null too.
      // Just return a special split for this case.
      List<InputSplit> onlyNull = new ArrayList<InputSplit>();
      onlyNull.add(nullSplit(colName));
      return onlyNull;
    }

    // If the min value is null, switch it to an empty string instead for
    // purposes of interpolation. The [null, null] split is appended below.
    boolean minIsNull = (null == minString);
    if (minIsNull) {
      minString = "";
    }

    // Use this as a hint. May need an extra task if the size doesn't
    // divide cleanly.
    int numSplits = ConfigurationHelper.getConfNumMaps(conf);

    // If there is a common prefix between minString and maxString, establish
    // it and pull it out of minString and maxString.
    int maxPrefixLen = Math.min(minString.length(), maxString.length());
    int sharedLen = 0;
    while (sharedLen < maxPrefixLen
        && minString.charAt(sharedLen) == maxString.charAt(sharedLen)) {
      sharedLen++;
    }
    String commonPrefix = minString.substring(0, sharedLen);
    minString = minString.substring(sharedLen);
    maxString = maxString.substring(sharedLen);

    List<String> splitStrings = split(numSplits, minString, maxString,
        commonPrefix);

    // Convert the list of split point strings into an actual set of
    // InputSplits. Every range is half-open except the last one, which is
    // closed so that the maximum value itself is included.
    List<InputSplit> splits = new ArrayList<InputSplit>();
    int lastIndex = splitStrings.size() - 1;
    String start = splitStrings.get(0);
    for (int i = 1; i <= lastIndex; i++) {
      String end = splitStrings.get(i);
      String upperOperator = (i == lastIndex) ? " <= " : " < ";
      splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
          colName + " >= " + toSqlLiteral(start),
          colName + upperOperator + toSqlLiteral(end)));
      start = end;
    }

    if (minIsNull) {
      // Add the special null split at the end.
      splits.add(nullSplit(colName));
    }

    return splits;
  }

  /**
   * Compute the ordered list of boundary strings between two values.
   *
   * @param numSplits desired number of splits (a hint)
   * @param minString lower bound with the common prefix removed
   * @param maxString upper bound with the common prefix removed
   * @param commonPrefix prefix that is prepended to every returned string
   * @return boundary strings; the first entry is always
   *     {@code commonPrefix + minString} and the last is always
   *     {@code commonPrefix + maxString}
   * @throws SQLException declared for compatibility with the numeric split
   *     routine this delegates to
   * @throws ValidationException if the numeric form of {@code minString} is
   *     greater than that of {@code maxString}
   */
  public List<String> split(int numSplits, String minString,
      String maxString, String commonPrefix) throws SQLException, ValidationException {
    BigDecimal minVal = stringToBigDecimal(minString);
    BigDecimal maxVal = stringToBigDecimal(maxString);

    if (minVal.compareTo(maxVal) > 0) {
      throw new ValidationException( minVal + " is greater than " + maxVal);
    }

    // The three-argument BigDecimal overload is not defined in this class;
    // presumably it is inherited from BigDecimalSplitter.
    List<BigDecimal> splitPoints = split(
        new BigDecimal(numSplits), minVal, maxVal);

    // Convert the BigDecimal splitPoints into their string representations.
    List<String> splitStrings = new ArrayList<String>();
    for (BigDecimal bd : splitPoints) {
      splitStrings.add(commonPrefix + bigDecimalToString(bd));
    }

    // Make sure that our user-specified boundaries are the first and last
    // entries in the array. The numeric round trip only keeps MAX_CHARS
    // characters, so the computed end points may differ from the originals.
    String firstBoundary = commonPrefix + minString;
    String lastBoundary = commonPrefix + maxString;
    if (splitStrings.isEmpty() || !splitStrings.get(0).equals(firstBoundary)) {
      splitStrings.add(0, firstBoundary);
    }
    if (splitStrings.size() == 1
        || !splitStrings.get(splitStrings.size() - 1).equals(lastBoundary)) {
      splitStrings.add(lastBoundary);
    }

    return splitStrings;
  }

  /**
   * Return a BigDecimal representation of string 'str' suitable for use in a
   * numerically-sorting order.
   *
   * <p>Only the first {@code MAX_CHARS} UTF-16 code units are considered. Each
   * code unit is one base-65536 digit after the radix point, so the result is
   * always in the range [0, 1).
   *
   * @param str the string to encode; must not be null
   * @return the fractional value encoding the leading characters of str
   */
  public BigDecimal stringToBigDecimal(String str) {
    // Start with 1/65536 to compute the first digit.
    BigDecimal curPlace = ONE_PLACE;
    BigDecimal result = BigDecimal.ZERO;

    int len = Math.min(str.length(), MAX_CHARS);
    for (int i = 0; i < len; i++) {
      // Use the UTF-16 code unit, not codePointAt(): a supplementary code
      // point exceeds 65535 and would not be a valid single digit in this
      // base, and its low surrogate would be counted a second time at i + 1.
      int digit = str.charAt(i);
      result = result.add(tryDivide(new BigDecimal(digit), curPlace));
      // advance to the next less significant place. e.g., 1/(65536^2) for the
      // second char.
      curPlace = curPlace.multiply(ONE_PLACE);
    }

    return result;
  }

  /**
   * Return the string encoded in a BigDecimal.
   * Repeatedly multiply the input value by 65536; the integer portion after
   * such a multiplication represents a single character in base 65536.
   * Convert that back into a char and create a string out of these until we
   * have no data left.
   *
   * <p>Conversion stops after {@code MAX_CHARS} digits or at the first digit
   * whose value is zero, whichever comes first.
   *
   * @param bd the value to decode
   * @return the decoded string, possibly empty
   */
  public String bigDecimalToString(BigDecimal bd) {
    BigDecimal cur = bd.stripTrailingZeros();
    StringBuilder sb = new StringBuilder();

    for (int numConverted = 0; numConverted < MAX_CHARS; numConverted++) {
      cur = cur.multiply(ONE_PLACE);
      int curCodePoint = cur.intValue();
      if (0 == curCodePoint) {
        break;
      }

      // Remove the integer part so that only the remaining digits are left.
      cur = cur.subtract(new BigDecimal(curCodePoint));
      sb.append(Character.toChars(curCodePoint));
    }

    return sb.toString();
  }

  /**
   * Select whether generated string literals use the N'...' form.
   *
   * @param use true to emit N-prefixed literals
   */
  public void setUseNCharStrings(boolean use) {
    useNCharStrings = use;
  }

  /**
   * @return true if generated string literals use the N'...' form
   */
  public boolean isUseNCharStrings() {
    return useNCharStrings;
  }

  /** Build the split that selects only the rows whose split column is null. */
  private static InputSplit nullSplit(String colName) {
    String clause = colName + " IS NULL";
    return new DataDrivenDBInputFormat.DataDrivenDBInputSplit(clause, clause);
  }

  /**
   * Render a value as a single-quoted SQL string literal, doubling any
   * embedded single quote and honoring the N-prefix setting.
   */
  private String toSqlLiteral(String value) {
    String opening = useNCharStrings ? "N'" : "'";
    return opening + value.replace("'", "''") + "'";
  }
}