| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.pulsar.io.debezium; |
| |
| import static org.testng.Assert.assertEquals; |
| import static org.testng.Assert.assertFalse; |
| import static org.testng.Assert.assertTrue; |
| |
| import io.debezium.config.Configuration; |
| import io.debezium.relational.Tables; |
| import io.debezium.relational.ddl.DdlParserSql2003; |
| import io.debezium.relational.ddl.LegacyDdlParser; |
| import io.debezium.relational.history.DatabaseHistory; |
| import io.debezium.relational.history.DatabaseHistoryListener; |
| import io.debezium.text.ParsingException; |
| import io.debezium.util.Collect; |
| import java.util.Map; |
| import org.apache.pulsar.client.api.Producer; |
| import org.apache.pulsar.client.api.ProducerConsumerBase; |
| import org.apache.pulsar.client.api.Schema; |
| import org.testng.annotations.AfterMethod; |
| import org.testng.annotations.BeforeMethod; |
| import org.testng.annotations.Test; |
| |
| /** |
| * Test the implementation of {@link PulsarDatabaseHistory}. |
| */ |
| public class PulsarDatabaseHistoryTest extends ProducerConsumerBase { |
| |
| private PulsarDatabaseHistory history; |
| private Map<String, Object> position; |
| private Map<String, String> source; |
| private String topicName; |
| private String ddl; |
| |
| @BeforeMethod |
| @Override |
| protected void setup() throws Exception { |
| super.internalSetup(); |
| super.producerBaseSetup(); |
| |
| source = Collect.hashMapOf("server", "my-server"); |
| setLogPosition(0); |
| this.topicName = "persistent://my-property/my-ns/schema-changes-topic"; |
| this.history = new PulsarDatabaseHistory(); |
| } |
| |
| @AfterMethod(alwaysRun = true) |
| @Override |
| protected void cleanup() throws Exception { |
| super.internalCleanup(); |
| } |
| |
| private void testHistoryTopicContent(boolean skipUnparseableDDL) { |
| // Start up the history ... |
| Configuration config = Configuration.create() |
| .with(PulsarDatabaseHistory.SERVICE_URL, brokerUrl.toString()) |
| .with(PulsarDatabaseHistory.TOPIC, topicName) |
| .with(DatabaseHistory.NAME, "my-db-history") |
| .with(DatabaseHistory.SKIP_UNPARSEABLE_DDL_STATEMENTS, skipUnparseableDDL) |
| .build(); |
| history.configure(config, null, DatabaseHistoryListener.NOOP, true); |
| history.start(); |
| |
| // Should be able to call start more than once ... |
| history.start(); |
| |
| history.initializeStorage(); |
| |
| // Calling it another time to ensure we can work with the DB history topic already existing |
| history.initializeStorage(); |
| |
| LegacyDdlParser recoveryParser = new DdlParserSql2003(); |
| LegacyDdlParser ddlParser = new DdlParserSql2003(); |
| ddlParser.setCurrentSchema("db1"); // recover does this, so we need to as well |
| Tables tables1 = new Tables(); |
| Tables tables2 = new Tables(); |
| Tables tables3 = new Tables(); |
| |
| // Recover from the very beginning ... |
| setLogPosition(0); |
| history.recover(source, position, tables1, recoveryParser); |
| |
| // There should have been nothing to recover ... |
| assertEquals(tables1.size(), 0); |
| |
| // Now record schema changes, which writes out to kafka but doesn't actually change the Tables ... |
| setLogPosition(10); |
| ddl = "CREATE TABLE foo ( name VARCHAR(255) NOT NULL PRIMARY KEY); \n" + |
| "CREATE TABLE customers ( id INTEGER NOT NULL PRIMARY KEY, name VARCHAR(100) NOT NULL ); \n" + |
| "CREATE TABLE products ( productId INTEGER NOT NULL PRIMARY KEY, desc VARCHAR(255) NOT NULL); \n"; |
| history.record(source, position, "db1", ddl); |
| |
| // Parse the DDL statement 3x and each time update a different Tables object ... |
| ddlParser.parse(ddl, tables1); |
| assertEquals(3, tables1.size()); |
| ddlParser.parse(ddl, tables2); |
| assertEquals(3, tables2.size()); |
| ddlParser.parse(ddl, tables3); |
| assertEquals(3, tables3.size()); |
| |
| // Record a drop statement and parse it for 2 of our 3 Tables... |
| setLogPosition(39); |
| ddl = "DROP TABLE foo;"; |
| history.record(source, position, "db1", ddl); |
| ddlParser.parse(ddl, tables2); |
| assertEquals(2, tables2.size()); |
| ddlParser.parse(ddl, tables3); |
| assertEquals(2, tables3.size()); |
| |
| // Record another DDL statement and parse it for 1 of our 3 Tables... |
| setLogPosition(10003); |
| ddl = "CREATE TABLE suppliers ( supplierId INTEGER NOT NULL PRIMARY KEY, name VARCHAR(255) NOT NULL);"; |
| history.record(source, position, "db1", ddl); |
| ddlParser.parse(ddl, tables3); |
| assertEquals(3, tables3.size()); |
| |
| // Stop the history (which should stop the producer) ... |
| history.stop(); |
| history = new PulsarDatabaseHistory(); |
| history.configure(config, null, DatabaseHistoryListener.NOOP, true); |
| // no need to start |
| |
| // Recover from the very beginning to just past the first change ... |
| Tables recoveredTables = new Tables(); |
| setLogPosition(15); |
| history.recover(source, position, recoveredTables, recoveryParser); |
| assertEquals(recoveredTables, tables1); |
| |
| // Recover from the very beginning to just past the second change ... |
| recoveredTables = new Tables(); |
| setLogPosition(50); |
| history.recover(source, position, recoveredTables, recoveryParser); |
| assertEquals(recoveredTables, tables2); |
| |
| // Recover from the very beginning to just past the third change ... |
| recoveredTables = new Tables(); |
| setLogPosition(10010); |
| history.recover(source, position, recoveredTables, recoveryParser); |
| assertEquals(recoveredTables, tables3); |
| |
| // Recover from the very beginning to way past the third change ... |
| recoveredTables = new Tables(); |
| setLogPosition(100000010); |
| history.recover(source, position, recoveredTables, recoveryParser); |
| assertEquals(recoveredTables, tables3); |
| } |
| |
| protected void setLogPosition(int index) { |
| this.position = Collect.hashMapOf("filename", "my-txn-file.log", |
| "position", index); |
| } |
| |
| @Test |
| public void shouldStartWithEmptyTopicAndStoreDataAndRecoverAllState() throws Exception { |
| // Create the empty topic ... |
| testHistoryTopicContent(false); |
| } |
| |
| @Test |
| public void shouldIgnoreUnparseableMessages() throws Exception { |
| try (final Producer<String> producer = pulsarClient.newProducer(Schema.STRING) |
| .topic(topicName) |
| .create() |
| ) { |
| producer.send(""); |
| producer.send("{\"position\":{\"filename\":\"my-txn-file.log\",\"position\":39},\"databaseName\":\"db1\",\"ddl\":\"DROP TABLE foo;\"}"); |
| producer.send("{\"source\":{\"server\":\"my-server\"},\"databaseName\":\"db1\",\"ddl\":\"DROP TABLE foo;\"}"); |
| producer.send("{\"source\":{\"server\":\"my-server\"},\"position\":{\"filename\":\"my-txn-file.log\",\"position\":39},\"databaseName\":\"db1\",\"ddl\":\"DROP TABLE foo;\""); |
| producer.send("\"source\":{\"server\":\"my-server\"},\"position\":{\"filename\":\"my-txn-file.log\",\"position\":39},\"databaseName\":\"db1\",\"ddl\":\"DROP TABLE foo;\"}"); |
| producer.send("{\"source\":{\"server\":\"my-server\"},\"position\":{\"filename\":\"my-txn-file.log\",\"position\":39},\"databaseName\":\"db1\",\"ddl\":\"xxxDROP TABLE foo;\"}"); |
| } |
| |
| testHistoryTopicContent(true); |
| } |
| |
| @Test(expectedExceptions = ParsingException.class) |
| public void shouldStopOnUnparseableSQL() throws Exception { |
| try (final Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topicName).create()) { |
| producer.send("{\"source\":{\"server\":\"my-server\"},\"position\":{\"filename\":\"my-txn-file.log\",\"position\":39},\"databaseName\":\"db1\",\"ddl\":\"xxxDROP TABLE foo;\"}"); |
| } |
| |
| testHistoryTopicContent(false); |
| } |
| |
| |
| @Test |
| public void testExists() { |
| // happy path |
| testHistoryTopicContent(true); |
| assertTrue(history.exists()); |
| |
| // Set history to use dummy topic |
| Configuration config = Configuration.create() |
| .with(PulsarDatabaseHistory.SERVICE_URL, brokerUrl.toString()) |
| .with(PulsarDatabaseHistory.TOPIC, "persistent://my-property/my-ns/dummytopic") |
| .with(DatabaseHistory.NAME, "my-db-history") |
| .with(DatabaseHistory.SKIP_UNPARSEABLE_DDL_STATEMENTS, true) |
| .build(); |
| |
| history.configure(config, null, DatabaseHistoryListener.NOOP, true); |
| history.start(); |
| |
| // dummytopic should not exist yet |
| assertFalse(history.exists()); |
| } |
| } |