blob: 13df56d035e16b455f2bd4f5ebee8d38dec6fade [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
* law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
* for the specific language governing permissions and limitations under the License.
*/
package org.apache.phoenix.end2end;
import static org.apache.phoenix.mapreduce.index.PhoenixScrutinyJobCounters.BAD_COVERED_COL_VAL_COUNT;
import static org.apache.phoenix.mapreduce.index.PhoenixScrutinyJobCounters.BATCHES_PROCESSED_COUNT;
import static org.apache.phoenix.mapreduce.index.PhoenixScrutinyJobCounters.INVALID_ROW_COUNT;
import static org.apache.phoenix.mapreduce.index.PhoenixScrutinyJobCounters.VALID_ROW_COUNT;
import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.Random;
import java.util.TreeSet;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import com.google.common.collect.Sets;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.mapreduce.CsvBulkImportUtil;
import org.apache.phoenix.mapreduce.index.IndexScrutinyTableOutput;
import org.apache.phoenix.mapreduce.index.IndexScrutinyTool;
import org.apache.phoenix.mapreduce.index.IndexScrutinyTool.OutputFormat;
import org.apache.phoenix.mapreduce.index.IndexScrutinyTool.SourceTable;
import org.apache.phoenix.mapreduce.index.SourceTargetColumnNames;
import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.SchemaUtil;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
/**
 * Tests for the {@link IndexScrutinyTool}.
 * <p>
 * Each test runs once per parameter set returned by {@link #data()}. Each set pairs a data table
 * DDL (unsalted or {@code SALT_BUCKETS=2}) with an index DDL (global or local).
 */
@Category(NeedsOwnMiniClusterTest.class)
@RunWith(Parameterized.class)
public class IndexScrutinyToolIT extends IndexScrutinyToolBaseIT {
    private final String dataTableDdl;
    private final String indexTableDdl;

    // Binds ID, NAME, ZIP, EMPLOY_DATE (the first four data table columns), in that order.
    private static final String UPSERT_SQL = "UPSERT INTO %s VALUES(?,?,?,?)";

    // Writes directly into the index table; the bind order matches upsertIndexRow().
    private static final String INDEX_UPSERT_SQL =
            "UPSERT INTO %s (\"0:NAME\", \":ID\", \"0:ZIP\", \"0:EMPLOY_DATE\") values (?,?,?,?)";

    // Callers append their own WHERE clause.
    private static final String DELETE_SQL = "DELETE FROM %s ";

    private String schemaName;
    private String dataTableName;
    private String dataTableFullName;
    private String indexTableName;
    private String indexTableFullName;

    // Shared per-test connection with auto-commit disabled; tests must commit explicitly.
    private Connection conn;
    private PreparedStatement dataTableUpsertStmt;
    private PreparedStatement indexTableUpsertStmt;

    // Value written into EMPLOY_DATE for every row (one second before setup ran).
    private long testTime;
    private Properties props;

    /**
     * Returns the parameterizations: {data table DDL, index DDL}. Both DDLs are format strings;
     * the index DDL takes the index name followed by the data table name.
     */
    @Parameterized.Parameters
    public static Collection<Object[]> data() {
        return Arrays.asList(new Object[][] {
                // unsalted data table, local index
                { "CREATE TABLE %s (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER, EMPLOY_DATE TIMESTAMP, EMPLOYER VARCHAR)",
                        "CREATE LOCAL INDEX %s ON %s (NAME, EMPLOY_DATE) INCLUDE (ZIP)" },
                // salted data table, global index
                { "CREATE TABLE %s (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER, EMPLOY_DATE TIMESTAMP, EMPLOYER VARCHAR) SALT_BUCKETS=2",
                        "CREATE INDEX %s ON %s (NAME, EMPLOY_DATE) INCLUDE (ZIP)" },
                // salted data table, local index
                { "CREATE TABLE %s (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER, EMPLOY_DATE TIMESTAMP, EMPLOYER VARCHAR) SALT_BUCKETS=2",
                        "CREATE LOCAL INDEX %s ON %s (NAME, EMPLOY_DATE) INCLUDE (ZIP)" } });
    }

    public IndexScrutinyToolIT(String dataTableDdl, String indexTableDdl) {
        this.dataTableDdl = dataTableDdl;
        this.indexTableDdl = indexTableDdl;
    }

    /**
     * Creates uniquely named data and index tables, opens the shared connection, and prepares
     * the data/index upsert statements.
     */
    @Before
    public void setup() throws SQLException {
        generateUniqueTableNames();
        createTestTable(getUrl(), String.format(dataTableDdl, dataTableFullName));
        createTestTable(getUrl(), String.format(indexTableDdl, indexTableName, dataTableFullName));
        props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
        conn = DriverManager.getConnection(getUrl(), props);
        String dataTableUpsert = String.format(UPSERT_SQL, dataTableFullName);
        dataTableUpsertStmt = conn.prepareStatement(dataTableUpsert);
        String indexTableUpsert = String.format(INDEX_UPSERT_SQL, indexTableFullName);
        indexTableUpsertStmt = conn.prepareStatement(indexTableUpsert);
        conn.setAutoCommit(false);
        testTime = EnvironmentEdgeManager.currentTimeMillis() - 1000;
    }

    @After
    public void teardown() throws SQLException {
        if (conn != null) {
            conn.close();
        }
    }

    /**
     * Tests a data table that is correctly indexed. Scrutiny should report all rows as valid.
     */
    @Test
    public void testValidIndex() throws Exception {
        // insert two rows
        upsertRow(dataTableUpsertStmt, 1, "name-1", 94010);
        upsertRow(dataTableUpsertStmt, 2, "name-2", 95123);
        conn.commit();
        int numDataRows = countRows(conn, dataTableFullName);
        int numIndexRows = countRows(conn, indexTableFullName);
        // scrutiny should report everything as ok
        List<Job> completedJobs = runScrutiny(schemaName, dataTableName, indexTableName);
        Job job = completedJobs.get(0);
        assertTrue(job.isSuccessful());
        Counters counters = job.getCounters();
        assertEquals(2, getCounterValue(counters, VALID_ROW_COUNT));
        assertEquals(0, getCounterValue(counters, INVALID_ROW_COUNT));
        // make sure row counts weren't modified by scrutiny
        assertEquals(numDataRows, countRows(conn, dataTableFullName));
        assertEquals(numIndexRows, countRows(conn, indexTableFullName));
    }

    /**
     * Tests scrutiny of an index that covers a VARBINARY column. First, two correctly indexed
     * rows are reported valid. Then an index row whose covered VARBINARY value differs from its
     * data row is reported invalid.
     */
    @Test
    public void testScrutinyOnArrayTypes() throws Exception {
        // local names intentionally shadow the fields: this test uses its own schema-less tables
        String dataTableName = generateUniqueName();
        String indexTableName = generateUniqueName();
        String dataTableDDL = "CREATE TABLE %s (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, VB VARBINARY)";
        String indexTableDDL = "CREATE INDEX %s ON %s (NAME) INCLUDE (VB)";
        String upsertData = "UPSERT INTO %s VALUES (?, ?, ?)";
        String upsertIndex = "UPSERT INTO %s (\"0:NAME\", \":ID\", \"0:VB\") values (?,?,?)";
        try (Connection conn =
                DriverManager.getConnection(getUrl(), PropertiesUtil.deepCopy(TEST_PROPERTIES))) {
            conn.createStatement().execute(String.format(dataTableDDL, dataTableName));
            conn.createStatement().execute(String.format(indexTableDDL, indexTableName, dataTableName));
            // insert two rows
            PreparedStatement upsertDataStmt = conn.prepareStatement(String.format(upsertData, dataTableName));
            upsertRow(upsertDataStmt, 1, "name-1", new byte[] {127, 0, 0, 1});
            upsertRow(upsertDataStmt, 2, "name-2", new byte[] {127, 1, 0, 5});
            conn.commit();

            List<Job> completedJobs = runScrutiny(null, dataTableName, indexTableName);
            Job job = completedJobs.get(0);
            assertTrue(job.isSuccessful());
            Counters counters = job.getCounters();
            assertEquals(2, getCounterValue(counters, VALID_ROW_COUNT));
            assertEquals(0, getCounterValue(counters, INVALID_ROW_COUNT));

            // Now insert a different varbinary row
            upsertRow(upsertDataStmt, 3, "name-3", new byte[] {1, 1, 1, 1});
            conn.commit();
            PreparedStatement upsertIndexStmt = conn.prepareStatement(String.format(upsertIndex, indexTableName));
            upsertIndexStmt.setString(1, "name-3");
            upsertIndexStmt.setInt(2, 3);
            upsertIndexStmt.setBytes(3, new byte[] {0, 0, 0, 1});
            upsertIndexStmt.executeUpdate();
            conn.commit();

            completedJobs = runScrutiny(null, dataTableName, indexTableName);
            job = completedJobs.get(0);
            assertTrue(job.isSuccessful());
            counters = job.getCounters();
            assertEquals(2, getCounterValue(counters, VALID_ROW_COUNT));
            assertEquals(1, getCounterValue(counters, INVALID_ROW_COUNT));
        }
    }

    /**
     * Tests running a scrutiny while updates and deletes are happening.
     * Since CURRENT_SCN is set, the scrutiny shouldn't report any issue.
     */
    @Test
    @Ignore("PHOENIX-4378 Unable to set KEEP_DELETED_CELLS to true on RS scanner")
    public void testScrutinyWhileTakingWrites() throws Exception {
        for (int id = 0; id < 1000; id++) {
            upsertRow(dataTableUpsertStmt, id, "name-" + id, id);
        }
        conn.commit();

        // CURRENT_SCN for scrutiny
        long scrutinyTS = EnvironmentEdgeManager.currentTimeMillis();

        // launch background upserts and deletes
        final Random random = new Random(0);
        Runnable backgroundUpserts = new Runnable() {
            @Override
            public void run() {
                int idToUpsert = random.nextInt(1000);
                try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
                    PreparedStatement dataPS =
                            conn.prepareStatement(String.format(UPSERT_SQL, dataTableFullName));
                    upsertRow(dataPS, idToUpsert, "modified-" + idToUpsert, idToUpsert + 1000);
                    conn.commit();
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
        };
        Runnable backgroundDeletes = new Runnable() {
            @Override
            public void run() {
                int idToDelete = random.nextInt(1000);
                try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
                    String deleteSql =
                            String.format(DELETE_SQL, indexTableFullName) + "WHERE \":ID\"=" + idToDelete;
                    conn.createStatement().executeUpdate(deleteSql);
                    conn.commit();
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
        };

        ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(2);
        try {
            scheduledThreadPool.scheduleWithFixedDelay(backgroundUpserts, 200, 200, TimeUnit.MILLISECONDS);
            scheduledThreadPool.scheduleWithFixedDelay(backgroundDeletes, 200, 200, TimeUnit.MILLISECONDS);

            // scrutiny should report everything as ok
            List<Job> completedJobs =
                    runScrutinyCurrentSCN(schemaName, dataTableName, indexTableName, scrutinyTS);
            Job job = completedJobs.get(0);
            assertTrue(job.isSuccessful());
            Counters counters = job.getCounters();
            assertEquals(1000, getCounterValue(counters, VALID_ROW_COUNT));
            assertEquals(0, getCounterValue(counters, INVALID_ROW_COUNT));
        } finally {
            // stop the background writers even when an assertion above fails
            scheduledThreadPool.shutdown();
            scheduledThreadPool.awaitTermination(10000, TimeUnit.MILLISECONDS);
        }
    }

    /**
     * Tests an index with the same number of rows as the data table, where one of the index rows
     * is incorrect. Scrutiny should report the invalid row.
     */
    @Test
    public void testEqualRowCountIndexIncorrect() throws Exception {
        // insert one valid row
        upsertRow(dataTableUpsertStmt, 1, "name-1", 94010);
        conn.commit();
        // disable the index and insert another row which is not indexed
        disableIndex();
        upsertRow(dataTableUpsertStmt, 2, "name-2", 95123);
        conn.commit();
        // insert a bad row into the index
        upsertIndexRow("badName", 2, 9999);
        conn.commit();
        // scrutiny should report the bad row
        List<Job> completedJobs = runScrutiny(schemaName, dataTableName, indexTableName);
        Job job = completedJobs.get(0);
        assertTrue(job.isSuccessful());
        Counters counters = job.getCounters();
        assertEquals(1, getCounterValue(counters, VALID_ROW_COUNT));
        assertEquals(1, getCounterValue(counters, INVALID_ROW_COUNT));
    }

    /**
     * Tests an index where the index PK is correct (indexed column values are indexed
     * correctly) but a covered column value is incorrect. Scrutiny should report the invalid row.
     */
    @Test
    public void testCoveredValueIncorrect() throws Exception {
        // insert one valid row
        upsertRow(dataTableUpsertStmt, 1, "name-1", 94010);
        conn.commit();
        // disable index and insert another data row
        disableIndex();
        upsertRow(dataTableUpsertStmt, 2, "name-2", 95123);
        conn.commit();
        // insert a bad index row for the above data row
        upsertIndexRow("name-2", 2, 9999);
        conn.commit();
        // scrutiny should report the bad row
        List<Job> completedJobs = runScrutiny(schemaName, dataTableName, indexTableName);
        Job job = completedJobs.get(0);
        assertTrue(job.isSuccessful());
        Counters counters = job.getCounters();
        assertEquals(1, getCounterValue(counters, VALID_ROW_COUNT));
        assertEquals(1, getCounterValue(counters, INVALID_ROW_COUNT));
        assertEquals(1, getCounterValue(counters, BAD_COVERED_COL_VAL_COUNT));
    }

    /**
     * Tests batching of row comparisons. Inserts 1001 rows, deletes a random subset of their
     * index rows, and runs scrutiny with a batch size of 10.
     */
    @Test
    public void testBatching() throws Exception {
        // insert 1001 data and index rows
        int numTestRows = 1001;
        long batchSize = 10L;
        for (int i = 0; i < numTestRows; i++) {
            upsertRow(dataTableUpsertStmt, i, "name-" + i, i + 1000);
        }
        conn.commit();
        disableIndex();

        // randomly delete some rows from the index (ids may repeat, so count what is left)
        Random random = new Random();
        for (int i = 0; i < 100; i++) {
            int idToDelete = random.nextInt(numTestRows);
            deleteRow(indexTableFullName, "WHERE \":ID\"=" + idToDelete);
        }
        conn.commit();
        int numRows = countRows(conn, indexTableFullName);
        int numDeleted = numTestRows - numRows;

        // run scrutiny with batch size of 10
        List<Job> completedJobs = runScrutiny(schemaName, dataTableName, indexTableName, batchSize);
        Job job = completedJobs.get(0);
        assertTrue(job.isSuccessful());
        Counters counters = job.getCounters();
        assertEquals(numTestRows - numDeleted, getCounterValue(counters, VALID_ROW_COUNT));
        assertEquals(numDeleted, getCounterValue(counters, INVALID_ROW_COUNT));
        // full batches plus one partial batch for any remainder: ceil(numTestRows / batchSize)
        long expectedBatches = (numTestRows + batchSize - 1) / batchSize;
        assertEquals(expectedBatches, getCounterValue(counters, BATCHES_PROCESSED_COUNT));
    }

    /**
     * Tests when there are more data table rows than index table rows. Scrutiny should report
     * the number of incorrect rows.
     */
    @Test
    public void testMoreDataRows() throws Exception {
        upsertRow(dataTableUpsertStmt, 1, "name-1", 95123);
        conn.commit();
        disableIndex();
        // these rows won't have a corresponding index row
        upsertRow(dataTableUpsertStmt, 2, "name-2", 95124);
        upsertRow(dataTableUpsertStmt, 3, "name-3", 95125);
        conn.commit();
        List<Job> completedJobs = runScrutiny(schemaName, dataTableName, indexTableName);
        Job job = completedJobs.get(0);
        assertTrue(job.isSuccessful());
        Counters counters = job.getCounters();
        assertEquals(1, getCounterValue(counters, VALID_ROW_COUNT));
        assertEquals(2, getCounterValue(counters, INVALID_ROW_COUNT));
    }

    /**
     * Tests when there are more index table rows than data table rows. Scrutiny should report
     * the number of incorrect rows when run with the index as the source table.
     */
    @Test
    public void testMoreIndexRows() throws Exception {
        upsertRow(dataTableUpsertStmt, 1, "name-1", 95123);
        conn.commit();
        disableIndex();
        // these index rows won't have a corresponding data row
        upsertIndexRow("name-2", 2, 95124);
        upsertIndexRow("name-3", 3, 95125);
        conn.commit();
        List<Job> completedJobs =
                runScrutiny(schemaName, dataTableName, indexTableName, 10L, SourceTable.INDEX_TABLE_SOURCE);
        Job job = completedJobs.get(0);
        assertTrue(job.isSuccessful());
        Counters counters = job.getCounters();
        assertEquals(1, getCounterValue(counters, VALID_ROW_COUNT));
        assertEquals(2, getCounterValue(counters, INVALID_ROW_COUNT));
    }

    /**
     * Tests running with both the index and data tables as the source table. An incorrectly
     * indexed row should be reported in each direction.
     */
    @Test
    public void testBothDataAndIndexAsSource() throws Exception {
        // insert one valid row
        upsertRow(dataTableUpsertStmt, 1, "name-1", 94010);
        conn.commit();
        // disable the index and insert another row which is not indexed
        disableIndex();
        upsertRow(dataTableUpsertStmt, 2, "name-2", 95123);
        conn.commit();
        // insert a bad row into the index
        upsertIndexRow("badName", 2, 9999);
        conn.commit();
        List<Job> completedJobs =
                runScrutiny(schemaName, dataTableName, indexTableName, 10L, SourceTable.BOTH);
        assertEquals(2, completedJobs.size());
        for (Job job : completedJobs) {
            assertTrue(job.isSuccessful());
            Counters counters = job.getCounters();
            assertEquals(1, getCounterValue(counters, VALID_ROW_COUNT));
            assertEquals(1, getCounterValue(counters, INVALID_ROW_COUNT));
        }
    }

    /**
     * Tests that with the output-to-file option set, the scrutiny tool writes invalid rows to
     * the job's output file.
     */
    @Test
    public void testOutputInvalidRowsToFile() throws Exception {
        insertOneValid_OneBadVal_OneMissingTarget();
        String[] argValues =
                getArgValues(schemaName, dataTableName, indexTableName, 10L,
                        SourceTable.DATA_TABLE_SOURCE, true, OutputFormat.FILE, null);
        runScrutiny(argValues);

        // check the output files
        Path outputPath = CsvBulkImportUtil.getOutputPath(new Path(outputDir), dataTableFullName);
        DistributedFileSystem fs = getUtility().getDFSCluster().getFileSystem();
        // the first "part*" file found is the one read; any others are collected for merging
        List<Path> paths = Lists.newArrayList();
        Path firstPart = null;
        for (FileStatus outputFile : fs.listStatus(outputPath)) {
            if (outputFile.getPath().getName().startsWith("part")) {
                if (firstPart == null) {
                    firstPart = outputFile.getPath();
                } else {
                    paths.add(outputFile.getPath());
                }
            }
        }
        assertNotNull("No part* output file found under " + outputPath, firstPart);
        // salted parameterizations may split the output across several part files;
        // merge them into the first one so every line can be read from a single file
        if (dataTableDdl.contains("SALT_BUCKETS")) {
            fs.concat(firstPart, paths.toArray(new Path[0]));
        }
        Path outputFilePath = firstPart;
        assertTrue(fs.exists(outputFilePath));

        FSDataInputStream fsDataInputStream = fs.open(outputFilePath);
        BufferedReader reader =
                new BufferedReader(new InputStreamReader(fsDataInputStream, StandardCharsets.UTF_8));
        // sorted so the assertions below do not depend on output order
        TreeSet<String> lines = Sets.newTreeSet();
        try {
            String line;
            while ((line = reader.readLine()) != null) {
                lines.add(line);
            }
        } finally {
            IOUtils.closeQuietly(reader);
            IOUtils.closeQuietly(fsDataInputStream);
        }

        String testTimestamp = new Timestamp(testTime).toString();
        Iterator<String> lineIterator = lines.iterator();
        assertEquals("[2, name-2, " + testTimestamp + ", 95123]\t[2, name-2, "
                + testTimestamp + ", 9999]", lineIterator.next());
        assertEquals("[3, name-3, " + testTimestamp + ", 95123]\tTarget row not found",
                lineIterator.next());
    }

    /**
     * Tests writing of results to the output table.
     */
    @Test
    public void testOutputInvalidRowsToTable() throws Exception {
        insertOneValid_OneBadVal_OneMissingTarget();
        String[] argValues =
                getArgValues(schemaName, dataTableName, indexTableName, 10L,
                        SourceTable.DATA_TABLE_SOURCE, true, OutputFormat.TABLE, null);
        List<Job> completedJobs = runScrutiny(argValues);

        // check that the output table contains the invalid rows
        long scrutinyTimeMillis =
                PhoenixConfigurationUtil.getScrutinyExecuteTimestamp(completedJobs.get(0).getConfiguration());
        String invalidRowsQuery =
                IndexScrutinyTableOutput.getSqlQueryAllInvalidRows(conn, getColNames(), scrutinyTimeMillis);
        try (ResultSet rs =
                conn.createStatement().executeQuery(invalidRowsQuery + " ORDER BY ID asc")) {
            assertTrue(rs.next());
            assertEquals(dataTableFullName, rs.getString(IndexScrutinyTableOutput.SOURCE_TABLE_COL_NAME));
            assertEquals(indexTableFullName, rs.getString(IndexScrutinyTableOutput.TARGET_TABLE_COL_NAME));
            assertTrue(rs.getBoolean("HAS_TARGET_ROW"));
            assertEquals(2, rs.getInt("ID"));
            assertEquals(2, rs.getInt(":ID"));
            assertEquals(95123, rs.getInt("ZIP"));
            assertEquals(9999, rs.getInt("0:ZIP")); // covered col zip incorrect
            assertTrue(rs.next());
            assertEquals(dataTableFullName, rs.getString(IndexScrutinyTableOutput.SOURCE_TABLE_COL_NAME));
            assertEquals(indexTableFullName, rs.getString(IndexScrutinyTableOutput.TARGET_TABLE_COL_NAME));
            assertFalse(rs.getBoolean("HAS_TARGET_ROW"));
            assertEquals(3, rs.getInt("ID"));
            assertEquals(null, rs.getObject(":ID")); // null for missing target row
            assertFalse(rs.next());
        }

        // check that the job results were written correctly to the metadata table
        assertMetadataTableValues(argValues, scrutinyTimeMillis, invalidRowsQuery);
    }

    /**
     * Tests that the config for the max number of output rows is observed.
     */
    @Test
    public void testMaxOutputRows() throws Exception {
        insertOneValid_OneBadVal_OneMissingTarget();
        // set max to 1. There are two bad rows, but only 1 should get written to output table
        String[] argValues =
                getArgValues(schemaName, dataTableName, indexTableName, 10L,
                        SourceTable.DATA_TABLE_SOURCE, true, OutputFormat.TABLE, 1L);
        List<Job> completedJobs = runScrutiny(argValues);
        long scrutinyTimeMillis =
                PhoenixConfigurationUtil.getScrutinyExecuteTimestamp(completedJobs.get(0).getConfiguration());
        String invalidRowsQuery =
                IndexScrutinyTableOutput.getSqlQueryAllInvalidRows(conn, getColNames(), scrutinyTimeMillis);
        try (ResultSet rs = conn.createStatement().executeQuery(invalidRowsQuery)) {
            assertTrue(rs.next());
            // salted parameterizations emit one extra row
            // (NOTE(review): presumably the limit is applied per task — confirm)
            if (dataTableDdl.contains("SALT_BUCKETS")) {
                assertTrue(rs.next());
            }
            assertFalse(rs.next());
        }
    }

    /** Builds the data-source column name mapping between the data table and the index table. */
    private SourceTargetColumnNames getColNames() throws SQLException {
        PTable pdataTable = PhoenixRuntime.getTable(conn, dataTableFullName);
        PTable pindexTable = PhoenixRuntime.getTable(conn, indexTableFullName);
        return new SourceTargetColumnNames.DataSourceColNames(pdataTable, pindexTable);
    }

    /**
     * Inserts one valid data/index row (id 1), one data row with a missing index row (id 3),
     * and one data row whose index row has a bad covered column value (id 2, zip 9999).
     * Leaves the index disabled.
     */
    private void insertOneValid_OneBadVal_OneMissingTarget() throws SQLException {
        // insert one valid row
        upsertRow(dataTableUpsertStmt, 1, "name-1", 94010);
        conn.commit();
        // disable the index and insert another row which is not indexed
        disableIndex();
        upsertRow(dataTableUpsertStmt, 2, "name-2", 95123);
        upsertRow(dataTableUpsertStmt, 3, "name-3", 95123);
        conn.commit();
        // insert a bad index row for one of the above data rows
        upsertIndexRow("name-2", 2, 9999);
        conn.commit();
    }

    /**
     * Asserts the contents of the scrutiny metadata row written for the given run. Also runs the
     * per-category invalid-row queries the row stores and checks they return the expected ids.
     */
    private void assertMetadataTableValues(String[] argValues, long scrutinyTimeMillis,
            String invalidRowsQuery) throws SQLException {
        // The only value that differs between parameterizations: 2 for salted tables, 1 otherwise.
        // NOTE(review): this position appears to be the batches-processed count — confirm
        // against the metadata table DDL.
        long saltDependentCount = dataTableDdl.contains("SALT_BUCKETS") ? 2L : 1L;
        try (ResultSet metadataRs =
                IndexScrutinyTableOutput.queryAllMetadata(conn, dataTableFullName,
                        indexTableFullName, scrutinyTimeMillis)) {
            assertTrue(metadataRs.next());
            List<? extends Object> expected =
                    Arrays.asList(dataTableFullName, indexTableFullName, scrutinyTimeMillis,
                            SourceTable.DATA_TABLE_SOURCE.name(), Arrays.toString(argValues), 3L, 0L,
                            1L, 2L, 1L, saltDependentCount,
                            "[\"ID\" INTEGER, \"NAME\" VARCHAR, \"EMPLOY_DATE\" TIMESTAMP, \"ZIP\" INTEGER]",
                            "[\":ID\" INTEGER, \"0:NAME\" VARCHAR, \"0:EMPLOY_DATE\" DECIMAL, \"0:ZIP\" INTEGER]",
                            invalidRowsQuery);
            assertRsValues(metadataRs, expected);

            String missingTargetQuery = metadataRs.getString("INVALID_ROWS_QUERY_MISSING_TARGET");
            try (ResultSet rs = conn.createStatement().executeQuery(missingTargetQuery)) {
                assertTrue(rs.next());
                assertEquals(3, rs.getInt("ID"));
                assertFalse(rs.next());
            }

            String badCoveredColQuery = metadataRs.getString("INVALID_ROWS_QUERY_BAD_COVERED_COL_VAL");
            try (ResultSet rs = conn.createStatement().executeQuery(badCoveredColQuery)) {
                assertTrue(rs.next());
                assertEquals(2, rs.getInt("ID"));
                assertFalse(rs.next());
            }
        }
    }

    /** Asserts the current row of {@code rs} contains the expected values in column order. */
    private void assertRsValues(ResultSet rs, List<? extends Object> expected) throws SQLException {
        for (int i = 0; i < expected.size(); i++) {
            assertEquals(expected.get(i), rs.getObject(i + 1));
        }
    }

    /** Picks fresh schema/table/index names for this test and derives the qualified names. */
    private void generateUniqueTableNames() {
        schemaName = generateUniqueName();
        dataTableName = generateUniqueName();
        dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
        indexTableName = generateUniqueName();
        indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName);
    }

    /**
     * Upserts a row directly into the index table, bypassing the data table. EMPLOY_DATE is
     * always set to {@code testTime}. The caller must commit.
     */
    private void upsertIndexRow(String name, int id, int zip) throws SQLException {
        indexTableUpsertStmt.setString(1, name); // "0:NAME"
        indexTableUpsertStmt.setInt(2, id);      // ":ID"
        indexTableUpsertStmt.setInt(3, zip);     // "0:ZIP" (covered column)
        indexTableUpsertStmt.setTimestamp(4, new Timestamp(testTime));
        indexTableUpsertStmt.executeUpdate();
    }

    /** Disables the index so that subsequent data table writes are not indexed. */
    private void disableIndex() throws SQLException {
        conn.createStatement().execute(
                String.format("ALTER INDEX %s ON %s disable", indexTableName, dataTableFullName));
        conn.commit();
    }

    /** Builds tool arguments with no tenant and no CURRENT_SCN limit ({@code Long.MAX_VALUE}). */
    private String[] getArgValues(String schemaName, String dataTable, String indexTable, Long batchSize,
            SourceTable sourceTable, boolean outputInvalidRows, OutputFormat outputFormat,
            Long maxOutputRows) {
        return getArgValues(schemaName, dataTable, indexTable, batchSize, sourceTable,
                outputInvalidRows, outputFormat, maxOutputRows, null, Long.MAX_VALUE);
    }

    /** Runs scrutiny over both source directions as of the given timestamp. */
    private List<Job> runScrutinyCurrentSCN(String schemaName, String dataTableName,
            String indexTableName, Long scrutinyTS) throws Exception {
        return runScrutiny(getArgValues(schemaName, dataTableName, indexTableName, null,
                SourceTable.BOTH, false, null, null, null, scrutinyTS));
    }

    private List<Job> runScrutiny(String schemaName, String dataTableName, String indexTableName)
            throws Exception {
        return runScrutiny(schemaName, dataTableName, indexTableName, null, null);
    }

    private List<Job> runScrutiny(String schemaName, String dataTableName, String indexTableName,
            Long batchSize) throws Exception {
        return runScrutiny(schemaName, dataTableName, indexTableName, batchSize, null);
    }

    /** Runs scrutiny without writing invalid rows anywhere; null arguments use tool defaults. */
    private List<Job> runScrutiny(String schemaName, String dataTableName, String indexTableName,
            Long batchSize, SourceTable sourceTable) throws Exception {
        final String[] cmdArgs =
                getArgValues(schemaName, dataTableName, indexTableName, batchSize, sourceTable,
                        false, null, null, null, Long.MAX_VALUE);
        return runScrutiny(cmdArgs);
    }

    /** Binds (id, name, zip, testTime) to a 4-parameter data upsert and executes it. */
    private void upsertRow(PreparedStatement stmt, int id, String name, int zip) throws SQLException {
        int index = 1;
        // insert row
        stmt.setInt(index++, id);
        stmt.setString(index++, name);
        stmt.setInt(index++, zip);
        stmt.setTimestamp(index++, new Timestamp(testTime));
        stmt.executeUpdate();
    }

    /** Binds (id, name, val) to a 3-parameter upsert and executes it. */
    private void upsertRow(PreparedStatement stmt, int id, String name, byte[] val) throws SQLException {
        int index = 1;
        // insert row
        stmt.setInt(index++, id);
        stmt.setString(index++, name);
        stmt.setBytes(index++, val);
        stmt.executeUpdate();
    }

    /**
     * Deletes rows from the given table. The caller must commit.
     *
     * @param fullTableName  fully qualified name of the table to delete from
     * @param whereCondition WHERE clause, including the {@code WHERE} keyword
     * @return the update count reported by the DELETE
     */
    private int deleteRow(String fullTableName, String whereCondition) throws SQLException {
        String deleteSql = String.format(DELETE_SQL, fullTableName) + whereCondition;
        try (PreparedStatement deleteStmt = conn.prepareStatement(deleteSql)) {
            return deleteStmt.executeUpdate();
        }
    }
}