| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.servicecomb.pack.alpha.server.tcc; |
| |
| import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify; |
| import static java.util.concurrent.TimeUnit.SECONDS; |
| import static org.awaitility.Awaitility.await; |
| import static org.hamcrest.Matchers.instanceOf; |
| import static org.hamcrest.core.Is.is; |
| import static org.junit.Assert.assertThat; |
| |
| import io.grpc.ManagedChannel; |
| |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Queue; |
| import java.util.UUID; |
| import java.util.concurrent.ConcurrentLinkedQueue; |
| |
| import org.apache.servicecomb.pack.alpha.server.tcc.jpa.TccTxType; |
| import org.apache.servicecomb.pack.alpha.server.tcc.callback.GrpcOmegaTccCallback; |
| import org.apache.servicecomb.pack.alpha.server.tcc.callback.OmegaCallbacksRegistry; |
| import org.apache.servicecomb.pack.alpha.server.tcc.jpa.TccTxEvent; |
| import org.apache.servicecomb.pack.alpha.server.tcc.jpa.EventConverter; |
| import org.apache.servicecomb.pack.alpha.server.tcc.service.TccTxEventRepository; |
| import org.apache.servicecomb.pack.common.TransactionStatus; |
| import org.apache.servicecomb.pack.contract.grpc.*; |
| import org.apache.servicecomb.pack.contract.grpc.TccEventServiceGrpc.TccEventServiceBlockingStub; |
| import org.apache.servicecomb.pack.contract.grpc.TccEventServiceGrpc.TccEventServiceStub; |
| import org.hamcrest.core.Is; |
| import org.junit.After; |
| import org.junit.AfterClass; |
| import org.junit.Test; |
| import org.springframework.beans.factory.annotation.Autowired; |
| |
| public abstract class AlphaTccServerTestBase { |
| |
| protected static ManagedChannel clientChannel; |
| |
| private final TccEventServiceStub asyncStub = TccEventServiceGrpc.newStub(clientChannel); |
| |
| private final TccEventServiceBlockingStub blockingStub = TccEventServiceGrpc.newBlockingStub(clientChannel); |
| |
| private final Queue<GrpcTccCoordinateCommand> receivedCommands = new ConcurrentLinkedQueue<>(); |
| |
| private final TccCoordinateCommandStreamObserver commandStreamObserver = |
| new TccCoordinateCommandStreamObserver(this::onReceivedCoordinateCommand, receivedCommands); |
| |
| private final String globalTxId = UUID.randomUUID().toString(); |
| private final String localTxId = UUID.randomUUID().toString(); |
| private final String parentTxId = UUID.randomUUID().toString(); |
| private final String confirmMethod = "confirm"; |
| private final String cancelMethod = "cancel"; |
| |
| private final String serviceName = uniquify("serviceName"); |
| private final String instanceId = uniquify("instanceId"); |
| |
| private final GrpcServiceConfig serviceConfig = GrpcServiceConfig.newBuilder() |
| .setServiceName(serviceName) |
| .setInstanceId(instanceId) |
| .build(); |
| |
| @Autowired |
| private TccTxEventRepository tccTxEventRepository; |
| |
| |
| @AfterClass |
| public static void tearDown() { |
| clientChannel.shutdown(); |
| clientChannel = null; |
| } |
| |
| @After |
| public void after() { |
| blockingStub.onDisconnected(serviceConfig); |
| tccTxEventRepository.deleteAll(); |
| } |
| |
| @Test |
| public void assertOnConnect() { |
| asyncStub.onConnected(serviceConfig, commandStreamObserver); |
| awaitUntilConnected(); |
| assertThat( |
| OmegaCallbacksRegistry.retrieve(serviceName, instanceId), is(instanceOf(GrpcOmegaTccCallback.class)) |
| ); |
| } |
| |
| @Test |
| public void assertOnDisConnect() { |
| asyncStub.onConnected(serviceConfig, commandStreamObserver); |
| awaitUntilConnected(); |
| assertThat( |
| OmegaCallbacksRegistry.retrieve(serviceName, instanceId), is(instanceOf(GrpcOmegaTccCallback.class)) |
| ); |
| blockingStub.onDisconnected(serviceConfig); |
| await().atMost(2, SECONDS).until(()->commandStreamObserver.isCompleted()); |
| } |
| |
| private void awaitUntilConnected() { |
| await().atMost(2, SECONDS).until(() -> null != (OmegaCallbacksRegistry.getRegistry().get(serviceName))); |
| } |
| |
| @Test |
| public void assertOnTransactionStart() { |
| asyncStub.onConnected(serviceConfig, commandStreamObserver); |
| awaitUntilConnected(); |
| blockingStub.onTccTransactionStarted(newTxStart()); |
| blockingStub.onTccTransactionStarted(newTxStart()); |
| blockingStub.onTccTransactionEnded(newTxEnd("Succeed")); |
| List<TccTxEvent> events = tccTxEventRepository.findByGlobalTxId(globalTxId).get(); |
| assertThat(events.size(), is(2)); |
| |
| Iterator<TccTxEvent> iterator = events.iterator(); |
| TccTxEvent event = iterator.next(); |
| assertThat(event.getGlobalTxId(), is(globalTxId)); |
| assertThat(event.getLocalTxId(), is(localTxId)); |
| assertThat(event.getInstanceId(), is(instanceId)); |
| assertThat(event.getServiceName(), is(serviceName)); |
| assertThat(event.getTxType(), Is.is(TccTxType.STARTED.name())); |
| assertThat(event.getStatus(), is(TransactionStatus.Succeed.name())); |
| |
| event = iterator.next(); |
| assertThat(event.getGlobalTxId(), is(globalTxId)); |
| assertThat(event.getLocalTxId(), is(localTxId)); |
| assertThat(event.getInstanceId(), is(instanceId)); |
| assertThat(event.getServiceName(), is(serviceName)); |
| assertThat(event.getTxType(), is(TccTxType.ENDED.name())); |
| assertThat(event.getStatus(), is(TransactionStatus.Succeed.name())); |
| } |
| |
| @Test |
| public void assertOnParticipated() { |
| asyncStub.onConnected(serviceConfig, commandStreamObserver); |
| awaitUntilConnected(); |
| blockingStub.onParticipationStarted(newParticipationStartedEvent()); |
| blockingStub.onParticipationEnded(newParticipationEndedEvent("Succeed")); |
| List<TccTxEvent> events = tccTxEventRepository.findByGlobalTxId(globalTxId).get(); |
| |
| assertThat(events.size(), is(2)); |
| |
| TccTxEvent event = events.get(0); |
| assertThat(event.getGlobalTxId(), is(globalTxId)); |
| assertThat(event.getLocalTxId(), is(localTxId)); |
| assertThat(event.getInstanceId(), is(instanceId)); |
| assertThat(event.getServiceName(), is(serviceName)); |
| assertThat(event.getTxType(), is(TccTxType.P_TX_STATED.name())); |
| assertThat(EventConverter.getMethodName(event.getMethodInfo(), true), is(confirmMethod)); |
| assertThat(EventConverter.getMethodName(event.getMethodInfo(), false), is(cancelMethod)); |
| |
| event = events.get(1); |
| assertThat(event.getGlobalTxId(), is(globalTxId)); |
| assertThat(event.getLocalTxId(), is(localTxId)); |
| assertThat(event.getInstanceId(), is(instanceId)); |
| assertThat(event.getServiceName(), is(serviceName)); |
| assertThat(event.getTxType(), is(TccTxType.P_TX_ENDED.name())); |
| assertThat(EventConverter.getMethodName(event.getMethodInfo(), true), is(confirmMethod)); |
| assertThat(EventConverter.getMethodName(event.getMethodInfo(), false), is(cancelMethod)); |
| assertThat(event.getStatus(), is("Succeed")); |
| } |
| |
| @Test |
| public void assertOnTccTransactionSucceedEnded() { |
| asyncStub.onConnected(serviceConfig, commandStreamObserver); |
| awaitUntilConnected(); |
| blockingStub.onTccTransactionStarted(newTxStart()); |
| blockingStub.onParticipationStarted(newParticipationStartedEvent()); |
| blockingStub.onParticipationEnded(newParticipationEndedEvent("Succeed")); |
| blockingStub.onTccTransactionEnded(newTxEnd("Succeed")); |
| |
| await().atMost(2, SECONDS).until(() -> !receivedCommands.isEmpty()); |
| assertThat(receivedCommands.size(), is(1)); |
| GrpcTccCoordinateCommand command = receivedCommands.poll(); |
| assertThat(command.getMethod(), is("confirm")); |
| assertThat(command.getGlobalTxId(), is(globalTxId)); |
| assertThat(command.getServiceName(), is(serviceName)); |
| |
| GrpcAck result = blockingStub.onTccCoordinated(newCoordinatedEvent("Succeed", "Confirm")); |
| assertThat(result.getAborted(), is(false)); |
| } |
| |
| @Test |
| public void assertOnTccTransactionFailedEnded() { |
| asyncStub.onConnected(serviceConfig, commandStreamObserver); |
| awaitUntilConnected(); |
| blockingStub.onTccTransactionStarted(newTxStart()); |
| blockingStub.onParticipationStarted(newParticipationStartedEvent()); |
| blockingStub.onParticipationEnded(newParticipationEndedEvent("Succeed")); |
| blockingStub.onTccTransactionEnded(newTxEnd("Failed")); |
| |
| await().atMost(2, SECONDS).until(() -> !receivedCommands.isEmpty()); |
| assertThat(receivedCommands.size(), is(1)); |
| GrpcTccCoordinateCommand command = receivedCommands.poll(); |
| assertThat(command.getMethod(), is("cancel")); |
| assertThat(command.getGlobalTxId(), is(globalTxId)); |
| assertThat(command.getServiceName(), is(serviceName)); |
| assertThat(commandStreamObserver.isCompleted(), is(false)); |
| } |
| |
| @Test |
| public void assertOnCallbackNotExist() { |
| asyncStub.onConnected(serviceConfig, commandStreamObserver); |
| awaitUntilConnected(); |
| |
| OmegaCallbacksRegistry.getRegistry().remove(serviceName); |
| blockingStub.onTccTransactionStarted(newTxStart()); |
| blockingStub.onParticipationStarted(newParticipationStartedEvent()); |
| blockingStub.onParticipationEnded(newParticipationEndedEvent("Succeed")); |
| GrpcAck result = blockingStub.onTccTransactionEnded(newTxEnd("Succeed")); |
| assertThat(result.getAborted(), is(true)); |
| } |
| |
| @Test |
| public void assertOnCallbacksExecuteError() { |
| asyncStub.onConnected(serviceConfig, commandStreamObserver); |
| awaitUntilConnected(); |
| |
| OmegaCallbacksRegistry.getRegistry().get(serviceName).put(instanceId, new GrpcOmegaTccCallback(null)); |
| blockingStub.onTccTransactionStarted(newTxStart()); |
| blockingStub.onParticipationStarted(newParticipationStartedEvent()); |
| blockingStub.onParticipationEnded(newParticipationEndedEvent("Succeed")); |
| GrpcAck result = blockingStub.onTccTransactionEnded(newTxEnd("Succeed")); |
| |
| assertThat(result.getAborted(), is(true)); |
| assertThat(OmegaCallbacksRegistry.getRegistry().get(serviceName).size(), is(0)); |
| } |
| |
| @Test |
| public void assertOnSwitchOtherCallbackInstance() { |
| asyncStub.onConnected(serviceConfig, commandStreamObserver); |
| GrpcServiceConfig config = GrpcServiceConfig.newBuilder() |
| .setServiceName(serviceName) |
| .setInstanceId(uniquify("instanceId")) |
| .build(); |
| asyncStub.onConnected(config, commandStreamObserver); |
| |
| await().atMost(2, SECONDS).until(() -> (OmegaCallbacksRegistry.getRegistry().get(serviceName) != null)); |
| await().atMost(2, SECONDS).until(() -> (OmegaCallbacksRegistry.getRegistry().get(serviceName).size() == 2)); |
| |
| OmegaCallbacksRegistry.getRegistry().get(serviceName).remove(instanceId); |
| blockingStub.onTccTransactionStarted(newTxStart()); |
| blockingStub.onParticipationStarted(newParticipationStartedEvent()); |
| blockingStub.onParticipationEnded(newParticipationEndedEvent("Succeed")); |
| GrpcAck result = blockingStub.onTccTransactionEnded(newTxEnd("Succeed")); |
| |
| await().atMost(4, SECONDS).until(() -> !receivedCommands.isEmpty()); |
| assertThat(receivedCommands.size(), is(1)); |
| GrpcTccCoordinateCommand command = receivedCommands.poll(); |
| assertThat(command.getMethod(), is("confirm")); |
| assertThat(command.getGlobalTxId(), is(globalTxId)); |
| assertThat(command.getServiceName(), is(serviceName)); |
| |
| assertThat(result.getAborted(), is(false)); |
| } |
| |
| private GrpcParticipationStartedEvent newParticipationStartedEvent() { |
| return GrpcParticipationStartedEvent.newBuilder() |
| .setGlobalTxId(globalTxId) |
| .setLocalTxId(localTxId) |
| .setServiceName(serviceName) |
| .setInstanceId(instanceId) |
| .setConfirmMethod(confirmMethod) |
| .setCancelMethod(cancelMethod) |
| .build(); |
| } |
| |
| private GrpcParticipationEndedEvent newParticipationEndedEvent(String status) { |
| return GrpcParticipationEndedEvent.newBuilder() |
| .setGlobalTxId(globalTxId) |
| .setLocalTxId(localTxId) |
| .setServiceName(serviceName) |
| .setInstanceId(instanceId) |
| .setConfirmMethod(confirmMethod) |
| .setCancelMethod(cancelMethod) |
| .setStatus(status) |
| .build(); |
| } |
| |
| private GrpcTccTransactionStartedEvent newTxStart() { |
| return GrpcTccTransactionStartedEvent.newBuilder() |
| .setGlobalTxId(globalTxId) |
| .setLocalTxId(localTxId) |
| .setServiceName(serviceName) |
| .setInstanceId(instanceId) |
| .setLocalTxId(localTxId) |
| .build(); |
| } |
| |
| private GrpcTccTransactionEndedEvent newTxEnd(String status) { |
| return GrpcTccTransactionEndedEvent.newBuilder() |
| .setGlobalTxId(globalTxId) |
| .setLocalTxId(localTxId) |
| .setServiceName(serviceName) |
| .setInstanceId(instanceId) |
| .setLocalTxId(localTxId) |
| .setStatus(status) |
| .build(); |
| } |
| |
| private GrpcTccCoordinatedEvent newCoordinatedEvent(String status, String method) { |
| return GrpcTccCoordinatedEvent.newBuilder() |
| .setGlobalTxId(globalTxId) |
| .setLocalTxId(localTxId) |
| .setServiceName(serviceName) |
| .setInstanceId(instanceId) |
| .setMethodName(method) |
| .setStatus(status) |
| .build(); |
| } |
| |
| private GrpcAck onReceivedCoordinateCommand(GrpcTccCoordinateCommand command) { |
| return GrpcAck.newBuilder().setAborted(false).build(); |
| } |
| } |