/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.phoenix.mapreduce.index;
import java.io.IOException;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.commons.codec.binary.Hex;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.phoenix.jdbc.PhoenixResultSet;
import org.apache.phoenix.mapreduce.PhoenixJobCounters;
import org.apache.phoenix.mapreduce.index.IndexScrutinyTool.OutputFormat;
import org.apache.phoenix.mapreduce.index.IndexScrutinyTool.SourceTable;
import org.apache.phoenix.mapreduce.util.ConnectionUtil;
import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
import org.apache.phoenix.parse.HintNode.Hint;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.util.ColumnInfo;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.QueryUtil;
import org.apache.phoenix.util.SchemaUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Joiner;
/**
 * Mapper that reads rows from the source table (the data table or the index table, depending on
 * how the job was configured) and checks them against the corresponding rows in the target table
 */
public class IndexScrutinyMapper extends Mapper<NullWritable, PhoenixIndexDBWritable, Text, Text> {
private static final Logger LOGGER = LoggerFactory.getLogger(IndexScrutinyMapper.class);
private Connection connection;
private List<ColumnInfo> targetTblColumnMetadata;
private long batchSize;
// holds a batch of rows from the table the mapper is iterating over
// Each row is a pair - the row TS, and the row values
private List<Pair<Long, List<Object>>> currentBatchValues = new ArrayList<>();
private String targetTableQuery;
private int numTargetPkCols;
private boolean outputInvalidRows;
private OutputFormat outputFormat = OutputFormat.FILE;
private String qSourceTable;
private String qTargetTable;
private long executeTimestamp;
private int numSourcePkCols;
private final PhoenixIndexDBWritable indxWritable = new PhoenixIndexDBWritable();
private List<ColumnInfo> sourceTblColumnMetadata;
// used to write results to the output table
private Connection outputConn;
private PreparedStatement outputUpsertStmt;
private long outputMaxRows;
private MessageDigest md5;
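    /**
     * Opens a connection pinned to the configured scrutiny CURRENT_SCN (so that concurrent writes
     * do not skew the results), resolves the source/target table and column metadata, builds the
     * base query used to look up batches of rows in the target table, and optionally prepares the
     * upsert statement for the output table.
     */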
@Override
protected void setup(final Context context) throws IOException, InterruptedException {
super.setup(context);
final Configuration configuration = context.getConfiguration();
try {
// get a connection with correct CURRENT_SCN (so incoming writes don't throw off the
// scrutiny)
final Properties overrideProps = new Properties();
String scn = configuration.get(PhoenixConfigurationUtil.CURRENT_SCN_VALUE);
overrideProps.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, scn);
connection = ConnectionUtil.getOutputConnection(configuration, overrideProps);
connection.setAutoCommit(false);
batchSize = PhoenixConfigurationUtil.getScrutinyBatchSize(configuration);
outputInvalidRows =
PhoenixConfigurationUtil.getScrutinyOutputInvalidRows(configuration);
outputFormat = PhoenixConfigurationUtil.getScrutinyOutputFormat(configuration);
executeTimestamp = PhoenixConfigurationUtil.getScrutinyExecuteTimestamp(configuration);
// get the index table and column names
String qDataTable = PhoenixConfigurationUtil.getScrutinyDataTableName(configuration);
final PTable pdataTable = PhoenixRuntime.getTable(connection, qDataTable);
final String qIndexTable =
PhoenixConfigurationUtil.getScrutinyIndexTableName(configuration);
final PTable pindexTable = PhoenixRuntime.getTable(connection, qIndexTable);
// set the target table based on whether we're running the MR over the data or index
// table
SourceTable sourceTable =
PhoenixConfigurationUtil.getScrutinySourceTable(configuration);
SourceTargetColumnNames columnNames =
SourceTable.DATA_TABLE_SOURCE.equals(sourceTable)
? new SourceTargetColumnNames.DataSourceColNames(pdataTable,
pindexTable)
: new SourceTargetColumnNames.IndexSourceColNames(pdataTable,
pindexTable);
qSourceTable = columnNames.getQualifiedSourceTableName();
qTargetTable = columnNames.getQualifiedTargetTableName();
List<String> targetColNames = columnNames.getTargetColNames();
List<String> sourceColNames = columnNames.getSourceColNames();
List<String> targetPkColNames = columnNames.getTargetPkColNames();
String targetPksCsv =
Joiner.on(",").join(SchemaUtil.getEscapedFullColumnNames(targetPkColNames));
numSourcePkCols = columnNames.getSourcePkColNames().size();
numTargetPkCols = targetPkColNames.size();
if (outputInvalidRows && OutputFormat.TABLE.equals(outputFormat)) {
outputConn = ConnectionUtil.getOutputConnection(configuration, new Properties());
String upsertQuery = PhoenixConfigurationUtil.getUpsertStatement(configuration);
this.outputUpsertStmt = outputConn.prepareStatement(upsertQuery);
}
outputMaxRows = PhoenixConfigurationUtil.getScrutinyOutputMax(configuration);
            // Create the query against the target table
            // Our query projection should be all the index column names (or their data table
            // equivalent names)
targetTableQuery =
QueryUtil.constructSelectStatement(qTargetTable, columnNames.getCastedTargetColNames(), targetPksCsv,
Hint.NO_INDEX, false) + " IN ";
targetTblColumnMetadata =
PhoenixRuntime.generateColumnInfo(connection, qTargetTable, targetColNames);
sourceTblColumnMetadata =
PhoenixRuntime.generateColumnInfo(connection, qSourceTable, sourceColNames);
LOGGER.info("Target table base query: " + targetTableQuery);
md5 = MessageDigest.getInstance("MD5");
} catch (SQLException | NoSuchAlgorithmException e) {
tryClosingResourceSilently(this.outputUpsertStmt);
tryClosingResourceSilently(this.connection);
tryClosingResourceSilently(this.outputConn);
throw new RuntimeException(e);
}
}
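    /** Closes the given resource if it is non-null, logging rather than propagating any failure. */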
private static void tryClosingResourceSilently(AutoCloseable res) {
if (res != null) {
try {
res.close();
} catch (Exception e) {
LOGGER.error("Closing resource: " + res + " failed :", e);
}
}
}
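    /**
     * Buffers each incoming source row (its timestamp and values); once the buffer reaches the
     * configured batch size, the whole batch is validated against the target table.
     */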
@Override
protected void map(NullWritable key, PhoenixIndexDBWritable record, Context context)
throws IOException, InterruptedException {
try {
final List<Object> values = record.getValues();
context.getCounter(PhoenixJobCounters.INPUT_RECORDS).increment(1);
currentBatchValues.add(new Pair<>(record.getRowTs(), values));
if (context.getCounter(PhoenixJobCounters.INPUT_RECORDS).getValue() % batchSize != 0) {
// if we haven't hit the batch size, just report progress and move on to next record
context.progress();
return;
} else {
// otherwise, process the batch
processBatch(context);
}
context.progress(); // Make sure progress is reported to Application Master.
} catch (SQLException | IllegalArgumentException e) {
LOGGER.error(" Error while read/write of a record ", e);
context.getCounter(PhoenixJobCounters.FAILED_RECORDS).increment(1);
throw new IOException(e);
}
}
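    /**
     * Processes any partially filled batch that remains, then closes the source connection and,
     * if used, the output table resources.
     */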
@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
super.cleanup(context);
tryClosingResourceSilently(this.outputUpsertStmt);
IOException throwException = null;
if (connection != null) {
try {
processBatch(context);
connection.close();
} catch (SQLException e) {
LOGGER.error("Error while closing connection in the PhoenixIndexMapper class ", e);
throwException = new IOException(e);
}
}
tryClosingResourceSilently(this.outputConn);
if (throwException != null) {
throw throwException;
}
}
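    /**
     * Validates the buffered source rows against the target table with a single parameterized IN
     * query on the target PK, counts valid and invalid rows, and emits any source rows for which
     * no target row exists (e.g. data table rows without a corresponding index row).
     */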
private void processBatch(Context context)
throws SQLException, IOException, InterruptedException {
if (currentBatchValues.size() == 0) return;
context.getCounter(PhoenixScrutinyJobCounters.BATCHES_PROCESSED_COUNT).increment(1);
// our query selection filter should be the PK columns of the target table (index or data
// table)
String inClause =
QueryUtil.constructParameterizedInClause(numTargetPkCols,
currentBatchValues.size());
String indexQuery = targetTableQuery + inClause;
try (PreparedStatement targetStatement = connection.prepareStatement(indexQuery)) {
            // while we build the PreparedStatement, we also maintain a hash of the target table
            // PKs, which we use to join against the results of the query on the target table
Map<String, Pair<Long, List<Object>>> targetPkToSourceValues =
buildTargetStatement(targetStatement);
// fetch results from the target table and output invalid rows
queryTargetTable(context, targetStatement, targetPkToSourceValues);
// any source values we have left over are invalid (e.g. data table rows without
// corresponding index row)
context.getCounter(PhoenixScrutinyJobCounters.INVALID_ROW_COUNT)
.increment(targetPkToSourceValues.size());
if (outputInvalidRows) {
for (Pair<Long, List<Object>> sourceRowWithoutTargetRow : targetPkToSourceValues.values()) {
List<Object> valuesWithoutTarget = sourceRowWithoutTargetRow.getSecond();
if (OutputFormat.FILE.equals(outputFormat)) {
context.write(
new Text(Arrays.toString(valuesWithoutTarget.toArray())),
new Text("Target row not found"));
} else if (OutputFormat.TABLE.equals(outputFormat)) {
writeToOutputTable(context, valuesWithoutTarget, null, sourceRowWithoutTargetRow.getFirst(), -1L);
}
}
}
if (outputInvalidRows && OutputFormat.TABLE.equals(outputFormat)) {
outputUpsertStmt.executeBatch(); // write out invalid rows to output table
outputConn.commit();
}
currentBatchValues.clear();
}
}
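    /**
     * Binds the PK values of every buffered source row to the IN clause of the target query and
     * returns a map from the MD5 hash of each PK to its source row (timestamp and values), which
     * is later used to join the target query results back to their source rows.
     */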
private Map<String, Pair<Long, List<Object>>> buildTargetStatement(PreparedStatement targetStatement)
throws SQLException {
Map<String, Pair<Long, List<Object>>> targetPkToSourceValues =
new HashMap<>(currentBatchValues.size());
int rsIndex = 1;
for (Pair<Long, List<Object>> batchTsRow : currentBatchValues) {
List<Object> batchRow = batchTsRow.getSecond();
// our original query against the source table (which provided the batchRow) projected
// with the data table PK cols first, so the first numTargetPkCols form the PK
String targetPkHash = getPkHash(batchRow.subList(0, numTargetPkCols));
targetPkToSourceValues.put(targetPkHash, batchTsRow);
for (int i = 0; i < numTargetPkCols; i++) {
ColumnInfo targetPkInfo = targetTblColumnMetadata.get(i);
Object value = batchRow.get(i);
if (value == null) {
targetStatement.setNull(rsIndex++, targetPkInfo.getSqlType());
} else {
targetStatement.setObject(rsIndex++, value, targetPkInfo.getSqlType());
}
}
}
return targetPkToSourceValues;
}
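    /**
     * Executes the target table query, joins each result row back to its source row via the PK
     * hash, compares the covered column values, and removes matched entries from the map so that
     * anything left over afterwards is a source row with no target row.
     */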
private void queryTargetTable(Context context, PreparedStatement targetStatement,
Map<String, Pair<Long, List<Object>>> targetPkToSourceValues)
throws SQLException, IOException, InterruptedException {
ResultSet targetResultSet = targetStatement.executeQuery();
while (targetResultSet.next()) {
indxWritable.readFields(targetResultSet);
List<Object> targetValues = indxWritable.getValues();
// first grab the PK and try to join against the source input
// the query is such that first numTargetPkCols of the resultSet is the PK
List<Object> pkObjects = new ArrayList<>(numTargetPkCols);
for (int i = 0; i < numTargetPkCols; i++) {
Object pkPart = targetResultSet.getObject(i + 1);
pkObjects.add(pkPart);
}
Long targetTS = targetResultSet.unwrap(PhoenixResultSet.class).getCurrentRow().getValue(0).getTimestamp();
String targetPk = getPkHash(pkObjects);
// use the pk to fetch the source table column values
Pair<Long, List<Object>> sourceTsValues = targetPkToSourceValues.get(targetPk);
Long sourceTS = sourceTsValues.getFirst();
List<Object> sourceValues = sourceTsValues.getSecond();
// compare values starting after the PK (i.e. covered columns)
boolean isIndexedCorrectly =
compareValues(numTargetPkCols, targetValues, sourceValues, context);
if (isIndexedCorrectly) {
context.getCounter(PhoenixScrutinyJobCounters.VALID_ROW_COUNT).increment(1);
} else {
context.getCounter(PhoenixScrutinyJobCounters.INVALID_ROW_COUNT).increment(1);
if (outputInvalidRows) {
outputInvalidRow(context, sourceValues, targetValues, sourceTS, targetTS);
}
}
targetPkToSourceValues.remove(targetPk);
}
}
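    /** Writes an invalid row either to the job output file or to the output table, per the configured format. */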
private void outputInvalidRow(Context context, List<Object> sourceValues,
List<Object> targetValues, long sourceTS, long targetTS) throws SQLException, IOException, InterruptedException {
if (OutputFormat.FILE.equals(outputFormat)) {
context.write(new Text(Arrays.toString(sourceValues.toArray())),
new Text(Arrays.toString(targetValues.toArray())));
} else if (OutputFormat.TABLE.equals(outputFormat)) {
writeToOutputTable(context, sourceValues, targetValues, sourceTS, targetTS);
}
}
// pass in null targetValues if the target row wasn't found
private void writeToOutputTable(Context context, List<Object> sourceValues, List<Object> targetValues, long sourceTS, long targetTS)
throws SQLException {
if (context.getCounter(PhoenixScrutinyJobCounters.INVALID_ROW_COUNT).getValue() > outputMaxRows) {
return;
}
int index = 1;
outputUpsertStmt.setString(index++, qSourceTable); // SOURCE_TABLE
outputUpsertStmt.setString(index++, qTargetTable); // TARGET_TABLE
outputUpsertStmt.setLong(index++, executeTimestamp); // SCRUTINY_EXECUTE_TIME
outputUpsertStmt.setString(index++, getPkHash(sourceValues.subList(0, numSourcePkCols))); // SOURCE_ROW_PK_HASH
outputUpsertStmt.setLong(index++, sourceTS); // SOURCE_TS
outputUpsertStmt.setLong(index++, targetTS); // TARGET_TS
outputUpsertStmt.setBoolean(index++, targetValues != null); // HAS_TARGET_ROW
index = setStatementObjects(sourceValues, index, sourceTblColumnMetadata);
if (targetValues != null) {
index = setStatementObjects(targetValues, index, targetTblColumnMetadata);
} else { // for case where target row wasn't found, put nulls in prepared statement
for (int i = 0; i < sourceValues.size(); i++) {
outputUpsertStmt.setNull(index++, targetTblColumnMetadata.get(i).getSqlType());
}
}
outputUpsertStmt.addBatch();
}
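    /**
     * Binds the given values to the output upsert statement starting at the given parameter index
     * and returns the next unused parameter index.
     */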
private int setStatementObjects(List<Object> values, int index, List<ColumnInfo> colMetadata)
throws SQLException {
for (int i = 0; i < values.size(); i++) {
Object value = values.get(i);
ColumnInfo colInfo = colMetadata.get(i);
if (value != null) {
outputUpsertStmt.setObject(index++, value, colInfo.getSqlType());
} else {
outputUpsertStmt.setNull(index++, colInfo.getSqlType());
}
}
return index;
}
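    /**
     * Compares the covered (non-PK) column values of a source row and its target row, starting at
     * startIndex. A null target value is not flagged; any other mismatch increments
     * BAD_COVERED_COL_VAL_COUNT and marks the row invalid.
     */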
private boolean compareValues(int startIndex, List<Object> targetValues,
List<Object> sourceValues, Context context) throws SQLException {
if (targetValues == null || sourceValues == null) return false;
for (int i = startIndex; i < sourceValues.size(); i++) {
Object targetValue = targetValues.get(i);
Object sourceValue = sourceValues.get(i);
            if (targetValue != null) {
                // guard against a null source value, which would otherwise NPE on getClass()
                if (sourceValue != null && sourceValue.getClass().isArray()) {
if (compareArrayTypes(sourceValue, targetValue)) {
continue;
}
} else {
if (targetValue.equals(sourceValue)) {
continue;
}
}
context.getCounter(PhoenixScrutinyJobCounters.BAD_COVERED_COL_VAL_COUNT)
.increment(1);
return false;
}
}
return true;
}
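    /**
     * Performs an element-wise comparison for primitive array values; returns false for
     * unsupported component types.
     */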
private boolean compareArrayTypes(Object sourceValue, Object targetValue) {
if (sourceValue.getClass().getComponentType().equals(byte.class)) {
return Arrays.equals((byte[]) sourceValue, (byte[]) targetValue);
} else if (sourceValue.getClass().getComponentType().equals(char.class)) {
return Arrays.equals((char[]) sourceValue, (char[]) targetValue);
} else if (sourceValue.getClass().getComponentType().equals(boolean.class)) {
return Arrays.equals((boolean[]) sourceValue, (boolean[]) targetValue);
} else if (sourceValue.getClass().getComponentType().equals(double.class)) {
return Arrays.equals((double[]) sourceValue, (double[]) targetValue);
} else if (sourceValue.getClass().getComponentType().equals(int.class)) {
return Arrays.equals((int[]) sourceValue, (int[]) targetValue);
} else if (sourceValue.getClass().getComponentType().equals(short.class)) {
return Arrays.equals((short[]) sourceValue, (short[]) targetValue);
} else if (sourceValue.getClass().getComponentType().equals(long.class)) {
return Arrays.equals((long[]) sourceValue, (long[]) targetValue);
} else if (sourceValue.getClass().getComponentType().equals(float.class)) {
return Arrays.equals((float[]) sourceValue, (float[]) targetValue);
}
return false;
}
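    /**
     * Returns the hex-encoded MD5 hash of the serialized PK values, used as the join key between
     * source and target rows.
     */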
private String getPkHash(List<Object> pkObjects) {
try {
for (int i = 0; i < pkObjects.size(); i++) {
md5.update(sourceTblColumnMetadata.get(i).getPDataType().toBytes(pkObjects.get(i)));
}
return Hex.encodeHexString(md5.digest());
} finally {
md5.reset();
}
}
}