/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.phoenix.end2end.index;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.VersionInfo;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.phoenix.coprocessor.ReplicationSinkEndpoint;
import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
import org.apache.phoenix.end2end.ServerMetadataCacheTestImpl;
import org.apache.phoenix.execute.MutationState;
import org.apache.phoenix.hbase.index.IndexRegionObserver;
import org.apache.phoenix.query.BaseTest;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.schema.PTableType;
import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.ReadOnlyProps;
import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.TestUtil;
import org.junit.AfterClass;
import org.junit.Assume;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
/**
 * Tests that Phoenix metadata attributes such as tenant id, schema name, logical table
 * name and table type are replicated to the sink cluster. This relies on two important
 * settings:
 * 1. hbase.coprocessor.regionserver.classes
 * 2. phoenix.append.metadata.to.wal
 */
@Category(NeedsOwnMiniClusterTest.class)
public class ReplicationWithWALAnnotationIT extends BaseTest {
private static final Logger LOGGER = LoggerFactory.getLogger(ReplicationWithWALAnnotationIT.class);
private static final String SCHEMA_NAME = generateUniqueName();
private static final String DATA_TABLE_NAME = generateUniqueName();
private static final String INDEX_TABLE_NAME = "IDX_" + DATA_TABLE_NAME;
private static final String TENANT_VIEW_NAME = generateUniqueName();
private static final String TENANT_VIEW_INDEX_NAME = "IDX_" + TENANT_VIEW_NAME;
private static final String DATA_TABLE_FULL_NAME = SchemaUtil.getTableName(SCHEMA_NAME, DATA_TABLE_NAME);
private static final String INDEX_TABLE_FULL_NAME = SchemaUtil.getTableName(SCHEMA_NAME, INDEX_TABLE_NAME);
private static final long REPLICATION_WAIT_TIME_MILLIS = 10000;
private static String url1;
private static String url2;
protected static Configuration conf1 = HBaseConfiguration.create();
protected static Configuration conf2;
protected static ZKWatcher zkw1;
protected static ZKWatcher zkw2;
protected static HBaseTestingUtility utility1;
protected static HBaseTestingUtility utility2;
protected static final int REPLICATION_RETRIES = 10;
protected static final byte[] tableName = Bytes.toBytes("test");
protected static final byte[] row = Bytes.toBytes("row");
private static Map<String, String> props;
@BeforeClass
public static synchronized void setUpBeforeClass() throws Exception {
setupConfigsAndStartCluster();
props = Maps.newHashMapWithExpectedSize(3);
props.put(QueryServices.INDEX_MUTATE_BATCH_SIZE_THRESHOLD_ATTRIB, Integer.toString(2));
props.put(QueryServices.DROP_METADATA_ATTRIB, Boolean.toString(true));
url1 = getLocalClusterUrl(utility1);
url2 = getLocalClusterUrl(utility2);
}
@AfterClass
public static void afterClass() throws Exception {
try {
utility1.shutdownMiniCluster();
utility2.shutdownMiniCluster();
} finally {
ServerMetadataCacheTestImpl.resetCache();
}
}
private static void setupConfigsAndStartCluster() throws Exception {
conf1.setInt("zookeeper.recovery.retry", 1);
conf1.setInt("zookeeper.recovery.retry.intervalmill", 10);
conf1.setBoolean("dfs.support.append", true);
conf1.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
conf1.setInt("replication.stats.thread.period.seconds", 5);
conf1.setBoolean("hbase.tests.use.shortcircuit.reads", false);
// The most important configs:
// 1. hbase.coprocessor.regionserver.classes
// 2. phoenix.append.metadata.to.wal
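// The second config makes IndexRegionObserver annotate WAL entries with Phoenix
// metadata (tenant id, schema name, logical table name, table type); the first installs
// ReplicationSinkEndpoint, which carries those annotations over to the replicated
// mutations on the sink so that sink-side coprocessors can read them as attributes.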
conf1.setStrings(CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY,
ReplicationSinkEndpoint.class.getName());
conf1.setBoolean(IndexRegionObserver.PHOENIX_APPEND_METADATA_TO_WAL, true);
conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1");
setUpConfigForMiniCluster(conf1);
utility1 = new HBaseTestingUtility(conf1);
utility1.startMiniZKCluster();
conf1 = utility1.getConfiguration();
zkw1 = new ZKWatcher(conf1, "cluster1", null, true);
Admin admin = ConnectionFactory.createConnection(conf1).getAdmin();
// Base conf2 on conf1 so it gets the right ZK cluster and general cluster configs
conf2 = HBaseConfiguration.create(conf1);
conf2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 6);
conf2.setBoolean("dfs.support.append", true);
conf2.setBoolean("hbase.tests.use.shortcircuit.reads", false);
utility2 = new HBaseTestingUtility(conf2);
utility2.startMiniZKCluster();
zkw2 = new ZKWatcher(conf2, "cluster2", null, true);
utility1.startMiniCluster(2);
utility2.startMiniCluster(2);
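// Register cluster2 as replication peer "1" of cluster1 so that WAL edits written on
// the source cluster are shipped to the sink cluster.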
admin.addReplicationPeer("1",
ReplicationPeerConfig.newBuilder().setClusterKey(utility2.getClusterKey()).build());
}
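/**
 * The replication sink coprocessor hooks that ReplicationSinkEndpoint depends on are
 * only present on HBase 2.4.16+ and 2.5.3+ (and later release lines), so these tests
 * are skipped on older versions.
 */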
private boolean isReplicationSinkEndpointEnabled() {
// true for 2.4.16+ or 2.5.3+ versions, false otherwise
String hbaseVersion = VersionInfo.getVersion();
String[] versionArr = hbaseVersion.split("\\.");
int majorVersion = Integer.parseInt(versionArr[0]);
int minorVersion = Integer.parseInt(versionArr[1]);
int patchVersion = Integer.parseInt(versionArr[2].split("-")[0]); // strip suffixes such as "-hadoop3"
if (majorVersion > 2) {
return true;
}
if (majorVersion < 2) {
return false;
}
if (minorVersion > 5) {
return true;
}
if (minorVersion < 4) {
return false;
}
if (minorVersion == 4) {
return patchVersion >= 16;
}
return patchVersion >= 3;
}
@Test
public void testReplicationWithWALExtendedAttributes() throws Exception {
Assume.assumeTrue("Replication sink endpoint on hbase versions 2.4.16+ or 2.5.3+",
isReplicationSinkEndpointEnabled());
// register driver for url1
driver = initAndRegisterTestDriver(url1, new ReadOnlyProps(props.entrySet().iterator()));
Connection primaryConnection = getConnection(url1);
primaryConnection.createStatement()
.execute(String.format("DROP TABLE IF EXISTS %s CASCADE", DATA_TABLE_FULL_NAME));
// create the primary and index tables
primaryConnection.createStatement().execute(
String.format("CREATE TABLE %s (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR) "
+ "REPLICATION_SCOPE = 1", DATA_TABLE_FULL_NAME));
primaryConnection.createStatement()
.execute(String.format("CREATE INDEX %s ON %s (v1)", INDEX_TABLE_NAME, DATA_TABLE_FULL_NAME));
String query = "SELECT * FROM " + DATA_TABLE_FULL_NAME;
ResultSet rs = primaryConnection.createStatement().executeQuery(query);
assertFalse(rs.next());
query = "SELECT * FROM " + INDEX_TABLE_FULL_NAME;
rs = primaryConnection.createStatement().executeQuery(query);
assertFalse(rs.next());
// de-register driver
destroyDriver(driver);
// register driver for url2
driver = initAndRegisterTestDriver(url2, new ReadOnlyProps(props.entrySet().iterator()));
Connection secondaryConnection = getConnection(url2);
secondaryConnection.createStatement().execute(String.format("DROP TABLE IF EXISTS %s CASCADE",
DATA_TABLE_FULL_NAME));
// create the primary and index tables
secondaryConnection.createStatement().execute(
String.format("CREATE TABLE %s (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR) "
+ "REPLICATION_SCOPE = 1", DATA_TABLE_FULL_NAME));
secondaryConnection.createStatement()
.execute(String.format("CREATE INDEX %s ON %s (v1)", INDEX_TABLE_NAME, DATA_TABLE_FULL_NAME));
query = "SELECT * FROM " + DATA_TABLE_FULL_NAME;
rs = secondaryConnection.createStatement().executeQuery(query);
assertFalse(rs.next());
query = "SELECT * FROM " + INDEX_TABLE_FULL_NAME;
rs = secondaryConnection.createStatement().executeQuery(query);
assertFalse(rs.next());
// add a coprocessor to verify that the required attributes are replicated to the sink side
TestUtil.addCoprocessor(secondaryConnection, DATA_TABLE_FULL_NAME,
TestCoprocessorForWALAnnotationAtSink.class);
// de-register driver
destroyDriver(driver);
// register driver for url1
driver = initAndRegisterTestDriver(url1, new ReadOnlyProps(props.entrySet().iterator()));
primaryConnection = getConnection(url1);
// load some data into the source cluster table
PreparedStatement stmt =
primaryConnection.prepareStatement("UPSERT INTO " + DATA_TABLE_FULL_NAME + " VALUES(?,?,?)");
stmt.setString(1, "a1");
stmt.setString(2, "x1");
stmt.setString(3, "11");
stmt.execute();
stmt.setString(1, "a2");
stmt.setString(2, "x2");
stmt.setString(3, "12");
stmt.execute();
primaryConnection.commit();
// make sure the index is working as expected
query = "SELECT * FROM " + INDEX_TABLE_FULL_NAME;
rs = primaryConnection.createStatement().executeQuery(query);
assertTrue(rs.next());
assertEquals("x1", rs.getString(1));
assertTrue(rs.next());
assertEquals("x2", rs.getString(1));
assertFalse(rs.next());
primaryConnection.close();
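// the rows should now flow to the sink cluster; the coprocessor added above rejects any
// replicated mutation whose WAL metadata attributes do not match the expected values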
assertReplicatedData(DATA_TABLE_FULL_NAME);
// de-register driver
destroyDriver(driver);
// register driver for url1
driver = initAndRegisterTestDriver(url1, new ReadOnlyProps(props.entrySet().iterator()));
primaryConnection = getConnection(url1);
primaryConnection.createStatement().execute(String.format("DROP TABLE IF EXISTS %s CASCADE",
DATA_TABLE_FULL_NAME));
// de-register driver
destroyDriver(driver);
// register driver for url2
driver = initAndRegisterTestDriver(url2, new ReadOnlyProps(props.entrySet().iterator()));
secondaryConnection = getConnection(url2);
secondaryConnection.createStatement().execute(String.format("DROP TABLE IF EXISTS %s CASCADE",
DATA_TABLE_FULL_NAME));
// de-register driver
destroyDriver(driver);
}
@Test
public void testReplicationWithWALExtendedAttributesWithTenants() throws Exception {
Assume.assumeTrue("Replication sink endpoint on hbase versions 2.4.16+ or 2.5.3+",
isReplicationSinkEndpointEnabled());
// register driver for url1
driver = initAndRegisterTestDriver(url1, new ReadOnlyProps(props.entrySet().iterator()));
Connection primaryConnection = getConnection(url1);
primaryConnection.createStatement()
.execute(String.format("DROP TABLE IF EXISTS %s CASCADE", DATA_TABLE_FULL_NAME));
// create the primary and index tables
primaryConnection.createStatement().execute(
String.format("CREATE TABLE %s (TENANT_ID VARCHAR(15) NOT NULL, k VARCHAR NOT NULL, v1 "
+ "VARCHAR, v2 VARCHAR CONSTRAINT pk PRIMARY KEY (TENANT_ID, k)) "
+ "REPLICATION_SCOPE = 1, MULTI_TENANT = true", DATA_TABLE_FULL_NAME));
primaryConnection.createStatement()
.execute(String.format("CREATE INDEX %s ON %s (v1)", INDEX_TABLE_NAME, DATA_TABLE_FULL_NAME));
Connection primaryTenantConnection = getTenantConnection(url1, "tenant01");
primaryTenantConnection.createStatement().execute(String.format("CREATE VIEW %s AS SELECT * FROM %s ",
TENANT_VIEW_NAME, DATA_TABLE_FULL_NAME));
primaryTenantConnection.createStatement()
.execute(String.format("CREATE INDEX %s ON %s (v1)", TENANT_VIEW_INDEX_NAME, TENANT_VIEW_NAME));
String query = "SELECT * FROM " + DATA_TABLE_FULL_NAME;
ResultSet rs = primaryConnection.createStatement().executeQuery(query);
assertFalse(rs.next());
query = "SELECT * FROM " + INDEX_TABLE_FULL_NAME;
rs = primaryConnection.createStatement().executeQuery(query);
assertFalse(rs.next());
query = "SELECT * FROM " + TENANT_VIEW_NAME;
rs = primaryTenantConnection.createStatement().executeQuery(query);
assertFalse(rs.next());
query = "SELECT * FROM " + TENANT_VIEW_INDEX_NAME;
rs = primaryTenantConnection.createStatement().executeQuery(query);
assertFalse(rs.next());
// de-register driver
destroyDriver(driver);
// register driver for url2
driver = initAndRegisterTestDriver(url2, new ReadOnlyProps(props.entrySet().iterator()));
Connection secondaryConnection = getConnection(url2);
secondaryConnection.createStatement().execute(String.format("DROP TABLE IF EXISTS %s CASCADE",
DATA_TABLE_FULL_NAME));
// create the primary and index tables
secondaryConnection.createStatement().execute(
String.format("CREATE TABLE %s (TENANT_ID VARCHAR(15) NOT NULL, k VARCHAR NOT NULL, v1 "
+ "VARCHAR, v2 VARCHAR CONSTRAINT pk PRIMARY KEY (TENANT_ID, k)) "
+ "REPLICATION_SCOPE = 1, MULTI_TENANT = true", DATA_TABLE_FULL_NAME));
secondaryConnection.createStatement()
.execute(String.format("CREATE INDEX %s ON %s (v1)", INDEX_TABLE_NAME, DATA_TABLE_FULL_NAME));
Connection secondaryTenantConnection = getTenantConnection(url2, "tenant01");
secondaryTenantConnection.createStatement().execute(String.format("CREATE VIEW %s AS SELECT * FROM %s ",
TENANT_VIEW_NAME, DATA_TABLE_FULL_NAME));
secondaryTenantConnection.createStatement()
.execute(String.format("CREATE INDEX %s ON %s (v1)", TENANT_VIEW_INDEX_NAME, TENANT_VIEW_NAME));
query = "SELECT * FROM " + DATA_TABLE_FULL_NAME;
rs = secondaryConnection.createStatement().executeQuery(query);
assertFalse(rs.next());
query = "SELECT * FROM " + INDEX_TABLE_FULL_NAME;
rs = secondaryConnection.createStatement().executeQuery(query);
assertFalse(rs.next());
query = "SELECT * FROM " + TENANT_VIEW_NAME;
rs = secondaryTenantConnection.createStatement().executeQuery(query);
assertFalse(rs.next());
query = "SELECT * FROM " + TENANT_VIEW_INDEX_NAME;
rs = secondaryTenantConnection.createStatement().executeQuery(query);
assertFalse(rs.next());
// add a coprocessor to verify that the required attributes are replicated to the sink side
TestUtil.addCoprocessor(secondaryConnection, DATA_TABLE_FULL_NAME,
TestTenantCoprocessorForWALAnnotationAtSink.class);
// de-register driver
destroyDriver(driver);
// register driver for url1
driver = initAndRegisterTestDriver(url1, new ReadOnlyProps(props.entrySet().iterator()));
primaryTenantConnection = getTenantConnection(url1, "tenant01");
// load some data into the source cluster table
PreparedStatement stmt =
primaryTenantConnection.prepareStatement("UPSERT INTO " + TENANT_VIEW_NAME + " VALUES(?,?,?)");
stmt.setString(1, "a1");
stmt.setString(2, "x1");
stmt.setString(3, "11");
stmt.execute();
stmt.setString(1, "a2");
stmt.setString(2, "x2");
stmt.setString(3, "12");
stmt.execute();
primaryTenantConnection.commit();
// make sure the index is working as expected
query = "SELECT * FROM " + TENANT_VIEW_INDEX_NAME;
rs = primaryTenantConnection.createStatement().executeQuery(query);
assertTrue(rs.next());
assertEquals("x1", rs.getString(1));
assertTrue(rs.next());
assertEquals("x2", rs.getString(1));
assertFalse(rs.next());
primaryTenantConnection.close();
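// verify the tenant view rows reached the sink; the tenant coprocessor added above
// rejects replicated mutations whose metadata attributes do not match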
assertReplicatedData(DATA_TABLE_FULL_NAME);
// de-register driver
destroyDriver(driver);
// register driver for url1
driver = initAndRegisterTestDriver(url1, new ReadOnlyProps(props.entrySet().iterator()));
primaryConnection = getConnection(url1);
primaryConnection.createStatement().execute(String.format("DROP TABLE IF EXISTS %s CASCADE",
DATA_TABLE_FULL_NAME));
// de-register driver
destroyDriver(driver);
// register driver for url2
driver = initAndRegisterTestDriver(url2, new ReadOnlyProps(props.entrySet().iterator()));
secondaryConnection = getConnection(url2);
secondaryConnection.createStatement().execute(String.format("DROP TABLE IF EXISTS %s CASCADE",
DATA_TABLE_FULL_NAME));
// de-register driver
destroyDriver(driver);
}
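/**
 * Polls the sink cluster table until the expected number of rows appears, failing after
 * REPLICATION_RETRIES attempts separated by REPLICATION_WAIT_TIME_MILLIS.
 */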
private void assertReplicatedData(String tableName) throws IOException, InterruptedException {
LOGGER.info("Looking up tables in replication target");
org.apache.hadoop.hbase.client.Connection hbaseConn =
ConnectionFactory.createConnection(utility2.getConfiguration());
Table remoteTable = hbaseConn.getTable(TableName.valueOf(tableName));
for (int i = 0; i < REPLICATION_RETRIES; i++) {
if (ensureRows(remoteTable, 2)) {
break;
}
if (i == REPLICATION_RETRIES - 1) {
fail("Timed out waiting for put replication on table " + remoteTable
.getDescriptor().getTableName());
}
LOGGER.info("Sleeping for " + REPLICATION_WAIT_TIME_MILLIS
+ " ms for edits to get replicated");
Thread.sleep(REPLICATION_WAIT_TIME_MILLIS);
}
remoteTable.close();
hbaseConn.close();
}
private boolean ensureRows(Table remoteTable, int numRows) throws IOException {
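// raw scan so that all cells, including delete markers and older versions, are visible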
Scan scan = new Scan();
scan.setRaw(true);
ResultScanner scanner = remoteTable.getScanner(scan);
int rows = 0;
for (Result r : scanner) {
LOGGER.info("got row: {}", r);
rows++;
}
scanner.close();
return rows == numRows;
}
private static Connection getConnection(String url) throws Exception {
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
return DriverManager.getConnection(url, props);
}
private static Connection getTenantConnection(String url, String tenantId) throws Exception {
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
props.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
return DriverManager.getConnection(url, props);
}
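/**
 * Sink-side region observer that fails the batch if a replicated mutation does not carry
 * the Phoenix metadata attributes expected for the global data table (no tenant id,
 * matching schema and logical table name, and table type TABLE).
 */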
public static class TestCoprocessorForWALAnnotationAtSink implements RegionCoprocessor, RegionObserver {
@Override
public Optional<RegionObserver> getRegionObserver() {
return Optional.of(this);
}
@Override
public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
String tenantId = Bytes.toString(miniBatchOp.getOperation(0)
.getAttribute(MutationState.MutationMetadataType.TENANT_ID.toString()));
String schemaName = Bytes.toString(miniBatchOp.getOperation(0)
.getAttribute(MutationState.MutationMetadataType.SCHEMA_NAME.toString()));
String logicalTableName = Bytes.toString(miniBatchOp.getOperation(0)
.getAttribute(MutationState.MutationMetadataType.LOGICAL_TABLE_NAME.toString()));
String tableType = Bytes.toString(miniBatchOp.getOperation(0)
.getAttribute(MutationState.MutationMetadataType.TABLE_TYPE.toString()));
LOGGER.info("TestCoprocessorForWALAnnotationAtSink preBatchMutate: tenantId: {}, schemaName: {}, "
+ "logicalTableName: {}, tableType: {}", tenantId, schemaName, logicalTableName, tableType);
if (tenantId != null || !SCHEMA_NAME.equals(schemaName) || !DATA_TABLE_NAME.equals(logicalTableName) ||
!PTableType.TABLE.getValue().toString().equals(tableType)) {
throw new IOException("Replication Sink mutation attributes are not matching. Abort the mutation.");
}
}
}
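/**
 * Sink-side region observer for the tenant view case: expects tenant id "tenant01", an
 * empty schema name, the tenant view as logical table name, and table type VIEW.
 */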
public static class TestTenantCoprocessorForWALAnnotationAtSink implements RegionCoprocessor, RegionObserver {
@Override
public Optional<RegionObserver> getRegionObserver() {
return Optional.of(this);
}
@Override
public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
String tenantId = Bytes.toString(miniBatchOp.getOperation(0)
.getAttribute(MutationState.MutationMetadataType.TENANT_ID.toString()));
String schemaName = Bytes.toString(miniBatchOp.getOperation(0)
.getAttribute(MutationState.MutationMetadataType.SCHEMA_NAME.toString()));
String logicalTableName = Bytes.toString(miniBatchOp.getOperation(0)
.getAttribute(MutationState.MutationMetadataType.LOGICAL_TABLE_NAME.toString()));
String tableType = Bytes.toString(miniBatchOp.getOperation(0)
.getAttribute(MutationState.MutationMetadataType.TABLE_TYPE.toString()));
LOGGER.info("TestCoprocessorForWALAnnotationAtSink preBatchMutate: tenantId: {}, schemaName: {}, "
+ "logicalTableName: {}, tableType: {}", tenantId, schemaName, logicalTableName, tableType);
if (!"tenant01".equals(tenantId) || !"".equals(schemaName) || !TENANT_VIEW_NAME.equals(logicalTableName) ||
!PTableType.VIEW.getValue().toString().equals(tableType)) {
throw new IOException("Replication Sink mutation attributes are not matching. Abort the mutation.");
}
}
}
}