blob: 8f4bcf17a2f2bfcd04fa994716ce2a9bcc417a26 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.phoenix.end2end;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.ToNumberPolicy;
import org.apache.phoenix.hbase.index.IndexRegionObserver;
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.PIndexState;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.TableProperty;
import org.apache.phoenix.util.CDCUtil;
import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.ManualEnvironmentEdge;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.TestUtil;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Calendar;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.TreeMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.phoenix.query.QueryConstants.CDC_CHANGE_IMAGE;
import static org.apache.phoenix.query.QueryConstants.CDC_DELETE_EVENT_TYPE;
import static org.apache.phoenix.query.QueryConstants.CDC_EVENT_TYPE;
import static org.apache.phoenix.query.QueryConstants.CDC_POST_IMAGE;
import static org.apache.phoenix.query.QueryConstants.CDC_PRE_IMAGE;
import static org.apache.phoenix.query.QueryConstants.CDC_UPSERT_EVENT_TYPE;
import static org.apache.phoenix.util.MetaDataUtil.getViewIndexPhysicalName;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class CDCBaseIT extends ParallelStatsDisabledIT {
/** Scope set containing only {@code CHANGE}. */
static final HashSet<PTable.CDCChangeScope> CHANGE_IMG =
new HashSet<>(Arrays.asList(PTable.CDCChangeScope.CHANGE));
/** Scope set containing both {@code PRE} and {@code POST}. */
static final HashSet<PTable.CDCChangeScope> PRE_POST_IMG = new HashSet<>(
Arrays.asList(PTable.CDCChangeScope.PRE, PTable.CDCChangeScope.POST));
// Manual clock handed to EnvironmentEdgeManager and advanced by addChange()/generateChanges*().
// NOTE(review): never assigned in this class; presumably initialized by subclasses -- confirm.
protected ManualEnvironmentEdge injectEdge;
// Shared Gson instance, built with the LONG_OR_DOUBLE object-to-number policy. Used to parse
// the CDC JSON column in verifyChanges() and to render ChangeRow.toString().
protected Gson gson = new GsonBuilder()
.setObjectToNumberStrategy(ToNumberPolicy.LONG_OR_DOUBLE)
.create();
// Set to each change timestamp by addChange(); not read anywhere else in this class.
protected Calendar cal = Calendar.getInstance();
/**
 * Executes the given DDL with no optional table properties requested.
 *
 * @param conn connection used to run the DDL
 * @param table_sql DDL text; any generated property list is appended to it
 */
protected void createTable(Connection conn, String table_sql)
        throws Exception {
    final boolean multitenant = false;
    final boolean immutable = false;
    createTable(conn, table_sql, null, multitenant, null, immutable, null);
}

/**
 * Executes the given DDL, requesting only a column qualifier encoding scheme.
 *
 * @param encodingScheme qualifier encoding scheme, or {@code null} for none
 */
protected void createTable(Connection conn, String table_sql,
        PTable.QualifierEncodingScheme encodingScheme)
        throws Exception {
    final boolean multitenant = false;
    final boolean immutable = false;
    createTable(conn, table_sql, encodingScheme, multitenant, null, immutable, null);
}

/**
 * Executes the given DDL, requesting an encoding scheme and the multi-tenant flag.
 *
 * @param encodingScheme qualifier encoding scheme, or {@code null} for none
 * @param multitenant whether the multi-tenant property should be requested
 */
protected void createTable(Connection conn, String table_sql,
        PTable.QualifierEncodingScheme encodingScheme, boolean multitenant)
        throws Exception {
    final boolean immutable = false;
    createTable(conn, table_sql, encodingScheme, multitenant, null, immutable, null);
}

/**
 * Executes the given DDL with every property except the index type, which is passed as
 * {@code null}.
 *
 * @param nSaltBuckets salt bucket count, or {@code null} for none
 * @param immutable whether the immutable-rows property should be requested
 * @param immutableStorageScheme immutable storage scheme, or {@code null} for none
 */
protected void createTable(Connection conn, String table_sql,
        PTable.QualifierEncodingScheme encodingScheme, boolean multitenant,
        Integer nSaltBuckets, boolean immutable,
        PTable.ImmutableStorageScheme immutableStorageScheme)
        throws Exception {
    final PTable.IndexType noIndexType = null;
    createTable(conn, table_sql, encodingScheme, multitenant, nSaltBuckets, noIndexType,
            immutable, immutableStorageScheme);
}
/**
 * Packs the individual table options into a property map keyed by
 * {@link TableProperty#getPropertyName()} and delegates to
 * {@link #createTable(Connection, String, Map)}.
 *
 * @param conn connection used to run the DDL
 * @param table_sql DDL text; the generated property list is appended to it
 * @param encodingScheme qualifier encoding scheme, or {@code null} for none
 * @param multitenant value stored for the multi-tenant property
 * @param nSaltBuckets salt bucket count, or {@code null} for none
 * @param indexType index type, or {@code null} for none
 * @param immutable value stored for the immutable-rows property
 * @param immutableStorageScheme immutable storage scheme, or {@code null} for none
 */
protected void createTable(Connection conn, String table_sql,
        PTable.QualifierEncodingScheme encodingScheme, boolean multitenant,
        Integer nSaltBuckets, PTable.IndexType indexType, boolean immutable,
        PTable.ImmutableStorageScheme immutableStorageScheme)
        throws Exception {
    // A plain map instead of double-brace initialization: no anonymous subclass holding a
    // reference to the enclosing test instance.
    Map<String, Object> tableProps = new HashMap<>();
    // Byte.valueOf replaces the deprecated Byte constructor.
    tableProps.put(TableProperty.COLUMN_ENCODED_BYTES.getPropertyName(),
            encodingScheme != null
                    ? Byte.valueOf(encodingScheme.getSerializedMetadataValue()) : null);
    tableProps.put(TableProperty.MULTI_TENANT.getPropertyName(), multitenant);
    tableProps.put(TableProperty.SALT_BUCKETS.getPropertyName(), nSaltBuckets);
    tableProps.put(TableProperty.INDEX_TYPE.getPropertyName(), indexType);
    tableProps.put(TableProperty.IMMUTABLE_ROWS.getPropertyName(), immutable);
    tableProps.put(TableProperty.IMMUTABLE_STORAGE_SCHEME.getPropertyName(),
            immutableStorageScheme != null ? immutableStorageScheme.name() : null);
    createTable(conn, table_sql, tableProps);
}
/**
 * Appends a comma-separated property list, derived from {@code tableProps}, to the given DDL
 * and executes the result.
 *
 * @param conn connection used to run the DDL
 * @param table_sql DDL text to which {@code " " + properties} is appended
 * @param tableProps property values looked up through {@link TableProperty}; entries may be
 *                   absent or {@code null}
 */
protected void createTable(Connection conn, String table_sql,
        Map<String,Object> tableProps) throws Exception {
    List<String> props = new ArrayList<>();
    Byte encodingScheme = (Byte) TableProperty.COLUMN_ENCODED_BYTES.getValue(tableProps);
    if (encodingScheme != null && encodingScheme !=
            QueryServicesOptions.DEFAULT_COLUMN_ENCODED_BYTES) {
        props.add(TableProperty.COLUMN_ENCODED_BYTES.getPropertyName() + "=" + encodingScheme);
    }
    Boolean multitenant = (Boolean) TableProperty.MULTI_TENANT.getValue(tableProps);
    if (Boolean.TRUE.equals(multitenant)) {
        props.add(TableProperty.MULTI_TENANT.getPropertyName() + "=" + multitenant);
    }
    Integer nSaltBuckets = (Integer) TableProperty.SALT_BUCKETS.getValue(tableProps);
    if (nSaltBuckets != null) {
        props.add(TableProperty.SALT_BUCKETS.getPropertyName() + "=" + nSaltBuckets);
    }
    PTable.IndexType indexType = (PTable.IndexType) TableProperty.INDEX_TYPE.getValue(
            tableProps);
    // Only LOCAL is spelled out explicitly (as "l").
    if (indexType == PTable.IndexType.LOCAL) {
        props.add(TableProperty.INDEX_TYPE.getPropertyName() + "=l");
    }
    // NOTE(review): this second INDEX_TYPE entry is keyed on the salt-bucket count rather than
    // on indexType, so it can emit "=null" or duplicate the entry above. Kept as-is because the
    // intent is not clear from this file -- confirm and fix the guard.
    if (nSaltBuckets != null) {
        props.add(TableProperty.INDEX_TYPE.getPropertyName() + "=" + indexType);
    }
    // Null-safe: callers of this overload may omit IMMUTABLE_ROWS from the map.
    Boolean immutableTable = (Boolean) TableProperty.IMMUTABLE_ROWS.getValue(tableProps);
    if (Boolean.TRUE.equals(immutableTable)) {
        props.add(TableProperty.IMMUTABLE_ROWS.getPropertyName() + "=true");
    }
    PTable.ImmutableStorageScheme immutableStorageScheme =
            (PTable.ImmutableStorageScheme) TableProperty
                    .IMMUTABLE_STORAGE_SCHEME.getValue(tableProps);
    if (immutableStorageScheme != null) {
        props.add(TableProperty.IMMUTABLE_STORAGE_SCHEME.getPropertyName() + "="
                + immutableStorageScheme.name());
    }
    String ddl = table_sql + " " + String.join(", ", props);
    // Close the statement instead of leaking it on the caller's connection.
    try (Statement stmt = conn.createStatement()) {
        stmt.execute(ddl);
    }
}
/**
 * Creates the CDC object with no encoding scheme, salt buckets or index type requested.
 */
protected void createCDCAndWait(Connection conn, String tableName, String cdcName,
        String cdc_sql) throws Exception {
    final PTable.QualifierEncodingScheme noEncodingScheme = null;
    final Integer noSaltBuckets = null;
    final PTable.IndexType noIndexType = null;
    createCDCAndWait(conn, tableName, cdcName, cdc_sql, noEncodingScheme, noSaltBuckets,
            noIndexType);
}

/**
 * Creates the CDC object requesting only the given index type.
 *
 * @param indexType index type, or {@code null} for none
 */
protected void createCDCAndWait(Connection conn, String tableName, String cdcName,
        String cdc_sql, PTable.IndexType indexType) throws Exception {
    final PTable.QualifierEncodingScheme noEncodingScheme = null;
    final Integer noSaltBuckets = null;
    createCDCAndWait(conn, tableName, cdcName, cdc_sql, noEncodingScheme, noSaltBuckets,
            indexType);
}

/**
 * Creates the CDC object requesting an encoding scheme and salt bucket count, with no index
 * type.
 *
 * @param encodingScheme qualifier encoding scheme, or {@code null} for none
 * @param nSaltBuckets salt bucket count, or {@code null} for none
 */
protected void createCDCAndWait(Connection conn, String tableName, String cdcName,
        String cdc_sql, PTable.QualifierEncodingScheme encodingScheme,
        Integer nSaltBuckets) throws Exception {
    final PTable.IndexType noIndexType = null;
    createCDCAndWait(conn, tableName, cdcName, cdc_sql, encodingScheme, nSaltBuckets,
            noIndexType);
}
/**
 * Executes the CDC DDL, runs the index tool against the CDC index and then waits for that
 * index to be reported as {@link PIndexState#ACTIVE}.
 *
 * @param conn connection used for the DDL and for polling the index state
 * @param tableName full (schema-qualified) name of the table the CDC is created on
 * @param cdcName name of the CDC object; the index name is derived from it
 * @param cdc_sql CDC DDL text
 * @param encodingScheme qualifier encoding scheme, or {@code null} for none
 * @param nSaltBuckets salt bucket count, or {@code null} for none
 * @param indexType index type, or {@code null} for none
 */
protected void createCDCAndWait(Connection conn, String tableName, String cdcName,
        String cdc_sql, PTable.QualifierEncodingScheme encodingScheme,
        Integer nSaltBuckets, PTable.IndexType indexType) throws Exception {
    // For CDC, multitenancy gets derived automatically via the parent table.
    createTable(conn, cdc_sql, encodingScheme, false, nSaltBuckets, indexType, false, null);

    String schemaName = SchemaUtil.getSchemaNameFromFullName(tableName);
    String shortTableName = SchemaUtil.getTableNameFromFullName(tableName);
    String cdcIndexName = CDCUtil.getCDCIndexName(cdcName);

    String quotedIndexName = "\"" + cdcIndexName + "\"";
    IndexToolIT.runIndexTool(false, schemaName, shortTableName, quotedIndexName);

    String indexFullName = SchemaUtil.getTableName(schemaName, cdcIndexName);
    TestUtil.waitForIndexState(conn, indexFullName, PIndexState.ACTIVE);
}
/**
 * Checks the catalog header rows of a CDC object and of its index: the {@code cdc_include}
 * value stored for {@code cdcName} and the {@code index_type} stored for the derived CDC
 * index name.
 *
 * @param conn connection used to query {@code system.catalog}
 * @param cdcName CDC object name as stored in the {@code table_name} column
 * @param expInclude expected {@code cdc_include} value
 * @param idxType expected {@code index_type} value
 */
protected void assertCDCState(Connection conn, String cdcName, String expInclude,
        int idxType) throws SQLException {
    String cdcQuery = "SELECT cdc_include FROM " +
            "system.catalog WHERE table_name = '" + cdcName +
            "' AND column_name IS NULL and column_family IS NULL";
    // Statement and ResultSet are both closed here; the statements used to be leaked.
    try (Statement stmt = conn.createStatement();
            ResultSet rs = stmt.executeQuery(cdcQuery)) {
        assertTrue(rs.next());
        assertEquals(expInclude, rs.getString(1));
    }
    String indexQuery = "SELECT index_type FROM " +
            "system.catalog WHERE table_name = '" + CDCUtil.getCDCIndexName(cdcName) +
            "' AND column_name IS NULL and column_family IS NULL";
    try (Statement stmt = conn.createStatement();
            ResultSet rs = stmt.executeQuery(indexQuery)) {
        assertTrue(rs.next());
        assertEquals(idxType, rs.getInt(1));
    }
}
/**
 * Loads the {@link PTable} of a CDC object over a fresh connection and checks its include
 * scopes, that it carries no index state or index type, its parent name and its physical
 * name.
 *
 * @param cdcName CDC object name (without schema)
 * @param expIncludeScopes expected include scopes
 * @param tableName full name of the table the CDC was created on; also the expected parent
 * @param datatableName full name of the data table; when it differs from {@code tableName}
 *                      the physical name is expected to be the view index physical name of
 *                      this table
 */
protected void assertPTable(String cdcName, Set<PTable.CDCChangeScope> expIncludeScopes,
        String tableName, String datatableName)
        throws SQLException {
    String schemaName = SchemaUtil.getSchemaNameFromFullName(tableName);
    String cdcFullName = SchemaUtil.getTableName(schemaName, cdcName);
    String indexFullName = SchemaUtil.getTableName(schemaName,
            CDCUtil.getCDCIndexName(cdcName));
    // The connection is private to this check, so close it when done.
    try (Connection conn = DriverManager.getConnection(getUrl(), new Properties())) {
        PTable cdcTable = PhoenixRuntime.getTable(conn, cdcFullName);
        assertEquals(expIncludeScopes, cdcTable.getCDCIncludeScopes());
        assertEquals(expIncludeScopes, TableProperty.INCLUDE.getPTableValue(cdcTable));
        assertNull(cdcTable.getIndexState()); // Index state should be null for CDC.
        assertNull(cdcTable.getIndexType()); // This is not an index.
        assertEquals(tableName, cdcTable.getParentName().getString());
        // Compare names by value; identity comparison only works when the caller happens to
        // pass the very same String instance for both arguments.
        String expPhysicalName = tableName.equals(datatableName)
                ? indexFullName : getViewIndexPhysicalName(datatableName);
        assertEquals(expPhysicalName, cdcTable.getPhysicalName().getString());
    }
}
/**
 * Looks up the table by name and checks its salt bucket count.
 *
 * @param conn connection used to resolve the table
 * @param tableName name of the table to resolve
 * @param nbuckets expected bucket count; {@code null} or 0 means no bucket count is expected
 */
protected void assertSaltBuckets(Connection conn, String tableName, Integer nbuckets)
        throws SQLException {
    assertSaltBuckets(PhoenixRuntime.getTable(conn, tableName), nbuckets);
}

/**
 * Checks the salt bucket count of the given table.
 *
 * @param table table to inspect
 * @param nbuckets expected bucket count; {@code null} or 0 means the table's bucket number
 *                 must be {@code null}
 */
protected void assertSaltBuckets(PTable table, Integer nbuckets) {
    boolean bucketsExpected = nbuckets != null && nbuckets != 0;
    if (bucketsExpected) {
        assertEquals(nbuckets, table.getBucketNum());
    } else {
        assertNull(table.getBucketNum());
    }
}
/**
 * Asserts that {@code select * from <cdcName>} returns no rows.
 *
 * @param conn connection used to run the query
 * @param cdcName name placed verbatim after {@code from}
 */
protected void assertNoResults(Connection conn, String cdcName) throws SQLException {
    String query = "select * from " + cdcName;
    try (Statement stmt = conn.createStatement();
            ResultSet rows = stmt.executeQuery(query)) {
        assertFalse(rows.next());
    }
}
/**
 * Opens a connection without a tenant id.
 */
protected Connection newConnection() throws SQLException {
    final String noTenant = null;
    return newConnection(noTenant);
}

/**
 * Opens a connection to {@code getUrl()}, setting the tenant id connection property when
 * one is given.
 *
 * @param tenantId tenant id, or {@code null} for a connection without one
 */
protected Connection newConnection(String tenantId) throws SQLException {
    Properties connProps = new Properties();
    // Uncomment these only while debugging.
    //props.put(QueryServices.TASK_HANDLING_INTERVAL_MS_ATTRIB, Long.toString(Long.MAX_VALUE));
    //props.put("hbase.client.scanner.timeout.period", "6000000");
    //props.put("phoenix.query.timeoutMs", "6000000");
    //props.put("zookeeper.session.timeout", "6000000");
    //props.put("hbase.rpc.timeout", "6000000");
    boolean tenantSpecific = tenantId != null;
    if (tenantSpecific) {
        connProps.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
    }
    return DriverManager.getConnection(getUrl(), connProps);
}
/**
 * Optionally applies one change to the table and returns the CDC event expected for it.
 *
 * <p>When {@code conn} is non-null, a DELETE (for the delete event type) or an UPSERT (for
 * any other event type) is built from the key and value maps and executed after the manual
 * clock has been moved to {@code changeTS}. The statement is not committed here.
 *
 * <p>The returned map always holds the event type and the pre-image. For upsert events it
 * also holds the change image (a copy of {@code values}) and the post-image (pre-image
 * overlaid with the change image).
 *
 * @param conn connection to execute on, or {@code null} to only build the expected event
 * @param preImage expected row state before the change; must be non-null for upserts
 * @param changeTS timestamp applied to the manual clock before executing
 * @param changeType one of the CDC event type constants
 * @param tableName table the statement targets
 * @param pks primary key column names mapped to values, rendered verbatim into the SQL
 * @param values non-key column names mapped to values; may be {@code null} for deletes
 * @return the expected CDC event
 */
private Map<String, Object> addChange(Connection conn, Map<String, Object> preImage,
        long changeTS, String changeType, String tableName,
        Map<String, Object> pks, Map<String, Object> values)
        throws SQLException {
    if (conn != null) {
        String sql;
        // Event types are compared by value, not by reference.
        if (CDC_DELETE_EVENT_TYPE.equals(changeType)) {
            // Predicates must be AND-ed; a comma-separated list is only accidentally valid
            // for single-column keys.
            String predicates = pks.entrySet().stream()
                    .map(e -> e.getKey() + "=" + e.getValue())
                    .collect(Collectors.joining(" AND "));
            sql = "DELETE FROM " + tableName + " WHERE " + predicates;
        } else {
            String columnList = Stream.concat(pks.keySet().stream(),
                    values.keySet().stream()).collect(Collectors.joining(", "));
            String valueList =
                    Stream.concat(pks.values().stream(), values.values().stream())
                            .map(String::valueOf).collect(Collectors.joining(", "));
            sql = "UPSERT INTO " + tableName + " (" + columnList + ") VALUES (" + valueList
                    + ")";
        }
        cal.setTimeInMillis(changeTS);
        injectEdge.setValue(changeTS);
        try (Statement stmt = conn.createStatement()) {
            stmt.execute(sql);
        }
    }
    Map<String, Object> cdcChange = new HashMap<>();
    cdcChange.put(CDC_EVENT_TYPE, changeType);
    cdcChange.put(CDC_PRE_IMAGE, preImage);
    if (CDC_UPSERT_EVENT_TYPE.equals(changeType)) {
        Map<String, Object> changeImage = new HashMap<>(values);
        cdcChange.put(CDC_CHANGE_IMAGE, changeImage);
        Map<String, Object> postImage = new HashMap<>(preImage);
        postImage.putAll(changeImage);
        cdcChange.put(CDC_POST_IMAGE, postImage);
    }
    return cdcChange;
}
// FIXME: Add the following with consecutive upserts on the same PK (no delete in between):
// - with different values
// - with a null
// - missing columns
/**
 * Applies a fixed sequence of twelve upserts and deletes on keys K=1..3 for each tenant and
 * returns the CDC rows expected for them, in the order they were applied.
 *
 * <p>The manual clock is injected and set to {@code startTS}; c1 and c2 share that
 * timestamp and every later change advances it by 100. The timestamp keeps advancing
 * across tenants. After the first four changes of the first tenant, column {@code v3} is
 * dropped from {@code datatableNameForDDL} (once only, and only if that name is non-null).
 *
 * @param startTS timestamp of the first change
 * @param tenantids tenant ids to generate changes for; elements may be {@code null}
 * @param tableName table the changes are applied to
 * @param datatableNameForDDL table to drop column {@code v3} from, or {@code null} to skip
 * @param committer strategy used to open the first connection per tenant and to commit
 * @return expected change rows
 */
protected List<ChangeRow> generateChanges(long startTS, String[] tenantids, String tableName,
        String datatableNameForDDL, CommitAdapter committer)
        throws Exception {
    List<ChangeRow> changes = new ArrayList<>();
    // Key maps are retained by ChangeRow (and rendered through Gson in its toString()), so
    // they are plain typed maps rather than anonymous double-brace subclasses.
    Map<String, Object> pk1 = new HashMap<>();
    pk1.put("K", 1);
    Map<String, Object> pk2 = new HashMap<>();
    pk2.put("K", 2);
    Map<String, Object> pk3 = new HashMap<>();
    pk3.put("K", 3);
    Map<String, Object> c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12;
    boolean dropV3Done = false;
    try {
        EnvironmentEdgeManager.injectEdge(injectEdge);
        injectEdge.setValue(startTS);
        committer.init();
        for (String tid: tenantids) {
            try (Connection conn = committer.getConnection(tid)) {
                c1 = addChange(conn, new HashMap<>(), startTS,
                        CDC_UPSERT_EVENT_TYPE, tableName, pk1, new TreeMap<String, Object>() {{
                            put("V1", 100L);
                            put("V2", 1000L);
                            put("B.VB", 10000L);
                        }});
                changes.add(new ChangeRow(tid, startTS, pk1, c1));
                c2 = addChange(conn, new HashMap<>(), startTS,
                        CDC_UPSERT_EVENT_TYPE, tableName, pk2, new TreeMap<String, Object>() {{
                            put("V1", 200L);
                            put("V2", 2000L);
                        }});
                changes.add(new ChangeRow(tid, startTS, pk2, c2));
                committer.commit(conn);
                c3 = addChange(conn, new HashMap<>(), startTS += 100,
                        CDC_UPSERT_EVENT_TYPE,
                        tableName, pk3, new TreeMap<String, Object>() {{
                            put("V1", 300L);
                            put("V2", null);
                            put("B.VB", null);
                        }});
                changes.add(new ChangeRow(tid, startTS, pk3, c3));
                committer.commit(conn);
                c4 = addChange(conn, (Map) c1.get(CDC_POST_IMAGE),
                        startTS += 100, CDC_UPSERT_EVENT_TYPE, tableName, pk1,
                        new TreeMap<String, Object>() {{
                            put("V1", 101L);
                        }});
                changes.add(new ChangeRow(tid, startTS, pk1, c4));
                committer.commit(conn);
            }
            if (datatableNameForDDL != null && !dropV3Done) {
                try (Connection conn = newConnection();
                        Statement stmt = conn.createStatement()) {
                    stmt.execute("ALTER TABLE " + datatableNameForDDL +
                            " DROP COLUMN v3");
                }
                dropV3Done = true;
            }
            try (Connection conn = newConnection(tid)) {
                c5 = addChange(conn, (Map) c4.get(CDC_POST_IMAGE), startTS += 100,
                        CDC_DELETE_EVENT_TYPE, tableName, pk1, null);
                changes.add(new ChangeRow(tid, startTS, pk1, c5));
                committer.commit(conn);
                c6 = addChange(conn, new HashMap<>(),
                        startTS += 100, CDC_UPSERT_EVENT_TYPE, tableName, pk1,
                        new TreeMap<String, Object>() {{
                            put("V1", 102L);
                            put("V2", 1002L);
                        }});
                changes.add(new ChangeRow(tid, startTS, pk1, c6));
                committer.commit(conn);
                c7 = addChange(conn, (Map) c6.get(CDC_POST_IMAGE), startTS += 100,
                        CDC_DELETE_EVENT_TYPE, tableName, pk1, null);
                changes.add(new ChangeRow(tid, startTS, pk1, c7));
                committer.commit(conn);
                c8 = addChange(conn, (Map) c2.get(CDC_POST_IMAGE),
                        startTS += 100, CDC_UPSERT_EVENT_TYPE, tableName, pk2,
                        new TreeMap<String, Object>() {{
                            put("V1", 201L);
                            put("V2", null);
                            put("B.VB", 20001L);
                        }});
                changes.add(new ChangeRow(tid, startTS, pk2, c8));
                committer.commit(conn);
                c9 = addChange(conn, new HashMap<>(),
                        startTS += 100, CDC_UPSERT_EVENT_TYPE, tableName, pk1,
                        new TreeMap<String, Object>() {{
                            put("V1", 103L);
                            put("V2", 1003L);
                        }});
                changes.add(new ChangeRow(tid, startTS, pk1, c9));
                committer.commit(conn);
                c10 = addChange(conn, (Map) c9.get(CDC_POST_IMAGE), startTS += 100,
                        CDC_DELETE_EVENT_TYPE, tableName, pk1, null);
                changes.add(new ChangeRow(tid, startTS, pk1, c10));
                committer.commit(conn);
                c11 = addChange(conn, new HashMap<>(),
                        startTS += 100, CDC_UPSERT_EVENT_TYPE, tableName, pk1,
                        new TreeMap<String, Object>() {{
                            put("V1", 104L);
                            put("V2", 1004L);
                        }});
                changes.add(new ChangeRow(tid, startTS, pk1, c11));
                committer.commit(conn);
                c12 = addChange(conn, (Map) c11.get(CDC_POST_IMAGE), startTS += 100,
                        CDC_DELETE_EVENT_TYPE, tableName, pk1, null);
                changes.add(new ChangeRow(tid, startTS, pk1, c12));
                committer.commit(conn);
            }
        }
    } finally {
        // Always undo what init() set up, even when a statement or commit throws, so the
        // state does not carry over into later tests.
        committer.reset();
    }
    return changes;
}
/**
 * Copies one image entry from {@code change} into {@code expChange}, but only when the
 * given scope was requested and the entry exists in {@code change}.
 *
 * @param changeScopes scopes that were requested
 * @param changeScope scope that governs {@code scopeKeyName}
 * @param change source event
 * @param expChange expected event being assembled
 * @param scopeKeyName key of the image entry to copy
 */
private void _copyScopeIfRelevant(Set<PTable.CDCChangeScope> changeScopes,
        PTable.CDCChangeScope changeScope,
        Map<String, Object> change, Map<String, Object> expChange,
        String scopeKeyName) {
    if (!changeScopes.contains(changeScope)) {
        return;
    }
    if (!change.containsKey(scopeKeyName)) {
        return;
    }
    Object image = change.get(scopeKeyName);
    expChange.put(scopeKeyName, image);
}
/**
 * Walks the result set in step with the expected changes of one tenant and asserts that
 * each row matches. Rows of other tenants in {@code changes} are skipped.
 *
 * <p>For every expected change the next row must exist, column 1 (read as a date) must
 * equal the change timestamp, each primary key column (read by name) must equal the
 * expected key value, and column 3 (parsed as JSON) must equal the expected event reduced
 * to the requested scopes. The result set must be exhausted afterwards.
 *
 * @param tenantId tenant whose changes are verified; may be {@code null}
 * @param rs result set positioned before the first row
 * @param changes expected changes for all tenants, in order
 * @param changeScopes scopes that are expected to appear in the JSON
 * @param mutableTable {@code false} to strip null values from the expected images first
 */
protected void verifyChanges(String tenantId, ResultSet rs, List<ChangeRow> changes,
        Set<PTable.CDCChangeScope> changeScopes,
        boolean mutableTable) throws Exception {
    int changenr = 0;
    for (ChangeRow changeRow : changes) {
        // Null-safe comparison by value; identity only matched when the very same String
        // instance was used on both sides.
        String rowTenantId = changeRow.getTenantID();
        boolean sameTenant = tenantId == null
                ? rowTenantId == null : tenantId.equals(rowTenantId);
        if (!sameTenant) {
            continue;
        }
        Map<String, Object> expChange = new HashMap<>();
        Map<String, Object> change = changeRow.change;
        expChange.put(CDC_EVENT_TYPE, change.get(CDC_EVENT_TYPE));
        _copyScopeIfRelevant(changeScopes, PTable.CDCChangeScope.PRE, change, expChange,
                CDC_PRE_IMAGE);
        _copyScopeIfRelevant(changeScopes, PTable.CDCChangeScope.POST, change, expChange,
                CDC_POST_IMAGE);
        _copyScopeIfRelevant(changeScopes, PTable.CDCChangeScope.CHANGE, change, expChange,
                CDC_CHANGE_IMAGE);
        String changeDesc = "Change " + (changenr + 1) + ": " + changeRow;
        assertTrue(changeDesc, rs.next());
        Map<?, ?> cdcObj = gson.fromJson(rs.getString(3), HashMap.class);
        // This is needed because for immutable tables, CDC can't distinguish a null value
        // from that of a missing cell. The images are the same map instances referenced by
        // expChange, so purging them here also updates the expectation.
        if (!mutableTable) {
            _purgeNulls(changeRow.getPreImage());
            _purgeNulls(changeRow.getChangeImage());
            _purgeNulls(changeRow.getPostImage());
        }
        assertEquals(changeDesc, changeRow.getChangeTimestamp(),
                rs.getDate(1).getTime());
        for (Map.Entry<String, Object> pk : changeRow.getPrimaryKeys().entrySet()) {
            assertEquals(changeDesc, pk.getValue(), rs.getObject(pk.getKey()));
        }
        assertEquals(changeDesc, expChange, cdcObj);
        ++changenr;
    }
    assertFalse(rs.next());
}
/**
 * Widens an {@link Integer} to a {@link Long}; any other value (including {@code null}) is
 * returned unchanged.
 *
 * @param val value to convert
 * @return a {@code Long} with the same numeric value, or {@code val} itself
 */
private Object _ifIntConvertToLong(Object val) {
    if (val instanceof Integer) {
        // Long.valueOf replaces the deprecated Long constructor.
        return Long.valueOf(((Integer) val).longValue());
    }
    return val;
}
/**
 * Removes, in place, every entry whose value is {@code null}. A {@code null} map is
 * ignored.
 *
 * @param image map to clean up, or {@code null}
 */
private void _purgeNulls(Map image) {
    if (image != null) {
        // The values view writes through to the map, so removing a value drops its entry.
        image.values().removeIf(value -> value == null);
    }
}
/**
 * Applies a fixed sequence of ten upserts and deletes on keys K=1..3 for each tenant and
 * returns the CDC rows expected for them, in the order they were applied.
 *
 * <p>The manual clock is injected and set to {@code startTS}; every change after the first
 * advances the timestamp by 100, and it keeps advancing across tenants. Each change is
 * committed through {@code committer} before its expected row is recorded.
 *
 * @param startTS timestamp of the first change
 * @param tenantids tenant ids to generate changes for; elements may be {@code null}
 * @param tableName table the changes are applied to
 * @param committer strategy used to commit each change
 * @return expected change rows
 */
protected List<ChangeRow> generateChangesImmutableTable(long startTS, String[] tenantids,
        String tableName, CommitAdapter committer)
        throws Exception {
    List<ChangeRow> changes = new ArrayList<>();
    // Key maps are retained by ChangeRow (and rendered through Gson in its toString()), so
    // they are plain typed maps rather than anonymous double-brace subclasses.
    Map<String, Object> pk1 = new HashMap<>();
    pk1.put("K", 1);
    Map<String, Object> pk2 = new HashMap<>();
    pk2.put("K", 2);
    Map<String, Object> pk3 = new HashMap<>();
    pk3.put("K", 3);
    Map<String, Object> c1, c2, c3, c4, c5, c6, c7, c8, c9, c10;
    try {
        EnvironmentEdgeManager.injectEdge(injectEdge);
        injectEdge.setValue(startTS);
        committer.init();
        for (String tid: tenantids) {
            try (Connection conn = newConnection(tid)) {
                c1 = addChange(conn, new HashMap<>(), startTS,
                        CDC_UPSERT_EVENT_TYPE, tableName, pk1, new TreeMap<String, Object>() {{
                            put("V1", 100L);
                            put("V2", 1000L);
                        }});
                committer.commit(conn);
                changes.add(new ChangeRow(tid, startTS, pk1, c1));
                c2 = addChange(conn, new HashMap<>(), startTS += 100,
                        CDC_UPSERT_EVENT_TYPE, tableName, pk2, new TreeMap<String, Object>() {{
                            put("V1", 200L);
                        }});
                committer.commit(conn);
                changes.add(new ChangeRow(tid, startTS, pk2, c2));
                c3 = addChange(conn, new HashMap<>(), startTS += 100,
                        CDC_UPSERT_EVENT_TYPE,
                        tableName, pk3, new TreeMap<String, Object>() {{
                            put("V1", 300L);
                            put("V2", null);
                        }});
                committer.commit(conn);
                changes.add(new ChangeRow(tid, startTS, pk3, c3));
                c4 = addChange(conn, (Map) c1.get(CDC_POST_IMAGE), startTS += 100,
                        CDC_DELETE_EVENT_TYPE, tableName, pk1, null);
                committer.commit(conn);
                changes.add(new ChangeRow(tid, startTS, pk1, c4));
                c5 = addChange(conn, new HashMap<>(),
                        startTS += 100, CDC_UPSERT_EVENT_TYPE, tableName, pk1,
                        new TreeMap<String, Object>() {{
                            put("V1", 102L);
                            put("V2", 1002L);
                        }});
                committer.commit(conn);
                changes.add(new ChangeRow(tid, startTS, pk1, c5));
                c6 = addChange(conn, (Map) c5.get(CDC_POST_IMAGE), startTS += 100,
                        CDC_DELETE_EVENT_TYPE, tableName, pk1, null);
                committer.commit(conn);
                changes.add(new ChangeRow(tid, startTS, pk1, c6));
                c7 = addChange(conn, new HashMap<>(),
                        startTS += 100, CDC_UPSERT_EVENT_TYPE, tableName, pk1,
                        new TreeMap<String, Object>() {{
                            put("V1", 103L);
                            put("V2", 1003L);
                        }});
                committer.commit(conn);
                changes.add(new ChangeRow(tid, startTS, pk1, c7));
                c8 = addChange(conn, (Map) c7.get(CDC_POST_IMAGE), startTS += 100,
                        CDC_DELETE_EVENT_TYPE, tableName, pk1, null);
                committer.commit(conn);
                changes.add(new ChangeRow(tid, startTS, pk1, c8));
                c9 = addChange(conn, new HashMap<>(),
                        startTS += 100, CDC_UPSERT_EVENT_TYPE, tableName, pk1,
                        new TreeMap<String, Object>() {{
                            put("V1", 104L);
                            put("V2", 1004L);
                        }});
                committer.commit(conn);
                changes.add(new ChangeRow(tid, startTS, pk1, c9));
                c10 = addChange(conn, (Map) c9.get(CDC_POST_IMAGE), startTS += 100,
                        CDC_DELETE_EVENT_TYPE, tableName, pk1, null);
                committer.commit(conn);
                changes.add(new ChangeRow(tid, startTS, pk1, c10));
            }
        }
    } finally {
        // Always undo what init() set up, even when a statement or commit throws, so the
        // state does not carry over into later tests.
        committer.reset();
    }
    return changes;
}
/**
 * One expected CDC row: the tenant it belongs to, its timestamp, its primary key values
 * and the expected event map. This is an inner (non-static) class because
 * {@link #toString()} renders through the enclosing test's {@code gson} instance.
 */
protected class ChangeRow {
    private final String tenantid;
    private final long changeTS;
    private final Map<String, Object> pks;
    private final Map<String, Object> change;

    ChangeRow(String tenantid, long changeTS, Map<String, Object> pks,
            Map<String, Object> change) {
        this.tenantid = tenantid;
        this.changeTS = changeTS;
        this.pks = pks;
        this.change = change;
    }

    /** Returns the tenant id given at construction; may be {@code null}. */
    public String getTenantID() {
        return tenantid;
    }

    /** Returns the timestamp given at construction. */
    public long getChangeTimestamp() {
        return changeTS;
    }

    /** Returns the primary key map given at construction. */
    public Map<String, Object> getPrimaryKeys() {
        return pks;
    }

    /** Returns the pre-image entry of the event, or {@code null} if there is none. */
    public Map<String, Object> getPreImage() {
        return imageFor(CDC_PRE_IMAGE);
    }

    /** Returns the change-image entry of the event, or {@code null} if there is none. */
    public Map<String, Object> getChangeImage() {
        return imageFor(CDC_CHANGE_IMAGE);
    }

    /** Returns the post-image entry of the event, or {@code null} if there is none. */
    public Map<String, Object> getPostImage() {
        return imageFor(CDC_POST_IMAGE);
    }

    /** Renders this row as JSON via the enclosing test's Gson instance. */
    @Override
    public String toString() {
        return gson.toJson(this);
    }

    // Looks up one image entry of the event map; images are stored as maps by addChange().
    private Map<String, Object> imageFor(String imageKey) {
        return (Map<String, Object>) change.get(imageKey);
    }
}
/**
 * Strategy used by the change generators: how connections are opened, how a batch is
 * committed, and what is set up before and undone after a run.
 */
protected abstract class CommitAdapter {
    /** Set-up hook; by default injects the manual clock into the edge manager. */
    void init() {
        EnvironmentEdgeManager.injectEdge(injectEdge);
    }

    /** Opens the connection for a tenant; by default delegates to {@code newConnection}. */
    public Connection getConnection(String tid) throws SQLException {
        return newConnection(tid);
    }

    /** Commits the pending changes on the given connection. */
    abstract void commit(Connection conn) throws SQLException;

    /** Tear-down hook; by default resets the edge manager. */
    public void reset() {
        EnvironmentEdgeManager.reset();
    }
}
/** Adapter that simply commits and lets any {@link SQLException} propagate. */
protected final CommitAdapter COMMIT_SUCCESS = new CommitAdapter() {
    @Override
    public void commit(Connection connection) throws SQLException {
        connection.commit();
    }
};
/**
 * Adapter for runs in which every commit must fail: it switches on the data-table-update
 * failure flag of {@link IndexRegionObserver} for the duration of the run and fails the
 * test if a commit goes through.
 */
protected final CommitAdapter COMMIT_FAILURE_EXPECTED = new CommitAdapter() {
    @Override
    void init() {
        IndexRegionObserver.setFailDataTableUpdatesForTesting(true);
    }

    @Override
    public void commit(Connection conn) throws SQLException {
        boolean committed;
        try {
            conn.commit();
            committed = true;
        } catch (SQLException expected) {
            // this is expected
            committed = false;
        }
        if (committed) {
            // It is config issue commit didn't fail.
            fail("Commit expected to fail");
        }
    }

    @Override
    public void reset() {
        IndexRegionObserver.setFailDataTableUpdatesForTesting(false);
    }
};
}