blob: 42eadfe98d87c7b5ebbdf0f7e236c0b19921ca6d [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.transaction;
import com.google.common.collect.Sets;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.pulsar.broker.TransactionMetadataStoreService;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.TransactionMetaStoreHandler;
import org.apache.pulsar.client.impl.transaction.TransactionCoordinatorClientImpl;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore;
import org.awaitility.Awaitility;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.Collections;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
import static org.testng.FileAssert.fail;
public class TransactionClientConnectTest extends TransactionTestBase {
private static final String RECONNECT_TOPIC = "persistent://public/txn/txn-client-reconnect-test";
private static final int NUM_PARTITIONS = 1;
@BeforeMethod(alwaysRun = true)
public void setup() throws Exception {
setBrokerCount(1);
super.internalSetup();
String[] brokerServiceUrlArr = getPulsarServiceList().get(0).getBrokerServiceUrl().split(":");
String webServicePort = brokerServiceUrlArr[brokerServiceUrlArr.length -1];
admin.clusters().createCluster(CLUSTER_NAME, ClusterData.builder().serviceUrl("http://localhost:" + webServicePort).build());
admin.tenants().createTenant("public",
new TenantInfoImpl(Sets.newHashSet(), Sets.newHashSet(CLUSTER_NAME)));
admin.namespaces().createNamespace("public/txn", 10);
admin.tenants().createTenant(NamespaceName.SYSTEM_NAMESPACE.getTenant(),
new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet(CLUSTER_NAME)));
admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
admin.topics().createNonPartitionedTopic(RECONNECT_TOPIC);
admin.topics().createSubscription(RECONNECT_TOPIC, "test", MessageId.latest);
admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(), NUM_PARTITIONS);
pulsarClient = PulsarClient.builder()
.serviceUrl(getPulsarServiceList().get(0).getBrokerServiceUrl())
.statsInterval(0, TimeUnit.SECONDS)
.enableTransaction(true)
.build();
// wait tc init success to ready state
waitForCoordinatorToBeAvailable(NUM_PARTITIONS);
}
@AfterMethod(alwaysRun = true)
protected void cleanup() {
super.internalCleanup();
}
@Test
public void testTransactionNewReconnect() throws Exception {
start();
// when throw CoordinatorNotFoundException client will reconnect tc
try {
pulsarClient.newTransaction()
.withTransactionTimeout(200, TimeUnit.MILLISECONDS).build().get();
fail();
} catch (ExecutionException e) {
assertTrue(e.getCause() instanceof TransactionCoordinatorClientException.CoordinatorNotFoundException);
}
reconnect();
fence(getPulsarServiceList().get(0).getTransactionMetadataStoreService());
// tc fence will remove this tc and reopen
try {
pulsarClient.newTransaction()
.withTransactionTimeout(200, TimeUnit.MILLISECONDS).build().get();
fail();
} catch (ExecutionException e) {
assertEquals(e.getCause().getMessage(),
"org.apache.bookkeeper.mledger.ManagedLedgerException$ManagedLedgerFencedException: " +
"java.lang.Exception: Attempted to use a fenced managed ledger");
}
reconnect();
}
@Test
public void testTransactionAddSubscriptionToTxnAsyncReconnect() throws Exception {
TransactionCoordinatorClientImpl transactionCoordinatorClient = ((PulsarClientImpl) pulsarClient).getTcClient();
start();
try {
transactionCoordinatorClient.addSubscriptionToTxnAsync(new TxnID(0, 0), "test", "test").get();
fail();
} catch (ExecutionException e) {
assertTrue(e.getCause() instanceof TransactionCoordinatorClientException.CoordinatorNotFoundException);
}
reconnect();
fence(getPulsarServiceList().get(0).getTransactionMetadataStoreService());
try {
transactionCoordinatorClient.addSubscriptionToTxnAsync(new TxnID(0, 0), "test", "test").get();
fail();
} catch (ExecutionException e) {
if (e.getCause() instanceof TransactionCoordinatorClientException.TransactionNotFoundException) {
assertEquals(e.getCause().getMessage(), "The transaction with this txdID `(0,0)`not found ");
} else {
assertEquals(e.getCause().getMessage(), "java.lang.Exception: Attempted to use a fenced managed ledger");
}
}
reconnect();
}
@Test
public void testTransactionAbortToTxnAsyncReconnect() throws Exception {
TransactionCoordinatorClientImpl transactionCoordinatorClient = ((PulsarClientImpl) pulsarClient).getTcClient();
start();
try {
transactionCoordinatorClient.abortAsync(new TxnID(0, 0)).get();
fail();
} catch (ExecutionException e) {
assertTrue(e.getCause() instanceof TransactionCoordinatorClientException.CoordinatorNotFoundException);
}
reconnect();
fence(getPulsarServiceList().get(0).getTransactionMetadataStoreService());
try {
transactionCoordinatorClient.abortAsync(new TxnID(0, 0)).get();
fail();
} catch (ExecutionException e) {
if (e.getCause() instanceof TransactionCoordinatorClientException.TransactionNotFoundException) {
assertEquals(e.getCause().getMessage(), "The transaction with this txdID `(0,0)`not found ");
} else {
assertEquals(e.getCause().getMessage(), "java.lang.Exception: Attempted to use a fenced managed ledger");
}
}
reconnect();
}
@Test
public void testTransactionCommitToTxnAsyncReconnect() throws Exception {
TransactionCoordinatorClientImpl transactionCoordinatorClient = ((PulsarClientImpl) pulsarClient).getTcClient();
start();
try {
transactionCoordinatorClient.commitAsync(new TxnID(0, 0)).get();
fail();
} catch (ExecutionException e) {
assertTrue(e.getCause() instanceof TransactionCoordinatorClientException.CoordinatorNotFoundException);
}
reconnect();
fence(getPulsarServiceList().get(0).getTransactionMetadataStoreService());
try {
transactionCoordinatorClient.commitAsync(new TxnID(0, 0)).get();
fail();
} catch (ExecutionException e) {
if (e.getCause() instanceof TransactionCoordinatorClientException.TransactionNotFoundException) {
assertEquals(e.getCause().getMessage(), "The transaction with this txdID `(0,0)`not found ");
} else {
assertEquals(e.getCause().getMessage(), "java.lang.Exception: Attempted to use a fenced managed ledger");
}
}
reconnect();
}
@Test
public void testTransactionAddPublishPartitionToTxnReconnect() throws Exception {
TransactionCoordinatorClientImpl transactionCoordinatorClient = ((PulsarClientImpl) pulsarClient).getTcClient();
start();
try {
transactionCoordinatorClient.addPublishPartitionToTxnAsync(new TxnID(0, 0),
Collections.singletonList("test")).get();
fail();
} catch (ExecutionException e) {
assertTrue(e.getCause() instanceof TransactionCoordinatorClientException.CoordinatorNotFoundException);
}
reconnect();
fence(getPulsarServiceList().get(0).getTransactionMetadataStoreService());
try {
transactionCoordinatorClient.addPublishPartitionToTxnAsync(new TxnID(0, 0),
Collections.singletonList("test")).get();
fail();
} catch (ExecutionException e) {
if (e.getCause() instanceof TransactionCoordinatorClientException.TransactionNotFoundException) {
assertEquals(e.getCause().getMessage(), "The transaction with this txdID `(0,0)`not found ");
} else {
assertEquals(e.getCause().getMessage(), "java.lang.Exception: Attempted to use a fenced managed ledger");
}
}
reconnect();
}
@Test
public void testPulsarClientCloseThenCloseTcClient() throws Exception {
TransactionCoordinatorClientImpl transactionCoordinatorClient = ((PulsarClientImpl) pulsarClient).getTcClient();
Field field = TransactionCoordinatorClientImpl.class.getDeclaredField("handlers");
field.setAccessible(true);
TransactionMetaStoreHandler[] handlers =
(TransactionMetaStoreHandler[]) field.get(transactionCoordinatorClient);
for (TransactionMetaStoreHandler handler : handlers) {
handler.newTransactionAsync(10, TimeUnit.SECONDS).get();
}
pulsarClient.close();
for (TransactionMetaStoreHandler handler : handlers) {
Method method = TransactionMetaStoreHandler.class.getMethod("getConnectHandleState");
method.setAccessible(true);
assertEquals(method.invoke(handler).toString(), "Closed");
try {
handler.newTransactionAsync(10, TimeUnit.SECONDS).get();
} catch (ExecutionException | InterruptedException e) {
assertTrue(e.getCause()
instanceof TransactionCoordinatorClientException.MetaStoreHandlerNotReadyException);
}
}
}
public void start() throws Exception {
// wait transaction coordinator init success
Awaitility.await().until(() -> {
try {
pulsarClient.newTransaction()
.withTransactionTimeout(200, TimeUnit.MILLISECONDS).build().get();
} catch (Exception e) {
return false;
}
return true;
});
pulsarClient.newTransaction()
.withTransactionTimeout(200, TimeUnit.MILLISECONDS).build().get();
TransactionMetadataStoreService transactionMetadataStoreService =
getPulsarServiceList().get(0).getTransactionMetadataStoreService();
// remove transaction metadata store
transactionMetadataStoreService.removeTransactionMetadataStore(TransactionCoordinatorID.get(0)).get();
}
public void fence(TransactionMetadataStoreService transactionMetadataStoreService) throws Exception {
Field field = ManagedLedgerImpl.class.getDeclaredField("state");
field.setAccessible(true);
field.set(((MLTransactionMetadataStore) transactionMetadataStoreService.getStores()
.get(TransactionCoordinatorID.get(0))).getManagedLedger(), ManagedLedgerImpl.State.Fenced);
}
public void reconnect() {
//reconnect
Awaitility.await().until(() -> {
try {
pulsarClient.newTransaction()
.withTransactionTimeout(200, TimeUnit.MILLISECONDS).build().get();
} catch (Exception e) {
return false;
}
return true;
});
}
}