// blob: ad334ee7e795f120e1775788f268e7c4930742a1 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.servicecomb.pack.alpha.server.tcc.service;
import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import io.grpc.stub.StreamObserver;
import java.util.Date;
import java.util.List;
import java.util.Optional;
import org.apache.servicecomb.pack.alpha.server.tcc.jpa.GlobalTxEventRepository;
import org.apache.servicecomb.pack.alpha.server.tcc.jpa.ParticipatedEvent;
import org.apache.servicecomb.pack.alpha.server.tcc.jpa.ParticipatedEventRepository;
import org.apache.servicecomb.pack.alpha.server.tcc.jpa.TccTxType;
import org.apache.servicecomb.pack.alpha.server.tcc.TccApplication;
import org.apache.servicecomb.pack.alpha.server.tcc.callback.OmegaCallbacksRegistry;
import org.apache.servicecomb.pack.alpha.server.tcc.jpa.GlobalTxEvent;
import org.apache.servicecomb.pack.alpha.server.tcc.jpa.TccTxEvent;
import org.apache.servicecomb.pack.common.TransactionStatus;
import org.apache.servicecomb.pack.contract.grpc.GrpcServiceConfig;
import org.apache.servicecomb.pack.contract.grpc.GrpcTccCoordinateCommand;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.junit4.SpringRunner;
/**
 * Integration tests for {@link TccTxEventService}, run inside the Spring context of
 * {@link TccApplication} with the {@code tccTest} profile active.
 *
 * <p>Each test drives the service with started / participated / ended / coordinated events
 * and then checks the mocked Omega callback and the contents of the repositories.
 */
@RunWith(SpringRunner.class)
@SpringBootTest(classes = {TccApplication.class}, properties = {
    "spring.jpa.show-sql=true"
})
@ActiveProfiles("tccTest")
public class TccTxEventServiceTest {

  @Autowired
  private TccTxEventService tccTxEventService;

  @Autowired
  private TccTxEventRepository tccTxEventRepository;

  @Autowired
  private GlobalTxEventRepository globalTxEventRepository;

  @Autowired
  private ParticipatedEventRepository participatedEventRepository;

  // Identifiers are uniquified per test instance so that test methods do not share ids.
  private final String globalTxId = uniquify("globalTxId");
  private final String localTxId = uniquify("localTxId");
  private final String parentTxId = uniquify("parentTxId");
  private final String confirmMethod = "confirm";
  private final String cancelMethod = "cancel";
  private final String serviceName = uniquify("serviceName");
  private final String instanceId = uniquify("instanceId");

  private final GrpcServiceConfig serviceConfig = GrpcServiceConfig.newBuilder()
      .setServiceName(serviceName)
      .setInstanceId(instanceId)
      .build();

  private GlobalTxEvent tccStartEvent;
  private ParticipatedEvent participationStartedEvent;
  private ParticipatedEvent participationEndedEvent;
  private GlobalTxEvent tccEndEvent;
  private TccTxEvent coordinateEvent;

  /** Builds one full set of successful events that all belong to {@link #globalTxId}. */
  @Before
  public void setup() {
    tccStartEvent = newGlobalTxEvent(TccTxType.STARTED, globalTxId, TransactionStatus.Succeed);
    participationStartedEvent = newParticipationStartedEvent(globalTxId);
    participationEndedEvent = newParticipationEndedEvent(globalTxId, TransactionStatus.Succeed);
    tccEndEvent = newGlobalTxEvent(TccTxType.ENDED, globalTxId, TransactionStatus.Succeed);
    coordinateEvent = newTccTxEvent(TccTxType.COORDINATED, globalTxId, TransactionStatus.Succeed);
  }

  /**
   * Intentionally empty.
   *
   * <p>NOTE(review): nothing written by a test (repository rows, the callback registered in
   * {@link OmegaCallbacksRegistry}) is removed here; the tests rely on uniquified ids instead.
   */
  @After
  public void teardown() {
  }

  /** A repeated "ended" event must not trigger a second coordinate command. */
  @Test
  public void onlyCoordinateParticipatedEventOnce() {
    StreamObserver<GrpcTccCoordinateCommand> observer = registerMockObserver();

    tccTxEventService.onTccStartedEvent(tccStartEvent);
    tccTxEventService.onParticipationStartedEvent(participationStartedEvent);
    tccTxEventService.onParticipationEndedEvent(participationEndedEvent);
    tccTxEventService.onTccEndedEvent(tccEndEvent);
    tccTxEventService.onCoordinatedEvent(coordinateEvent);

    verify(observer).onNext(any());

    // If the end command is sent twice, the coordinate command should still be executed only once.
    tccTxEventService.onTccEndedEvent(tccEndEvent);

    // verify(...) defaults to times(1), so this fails if a second command was pushed.
    verify(observer).onNext(any());
  }

  /**
   * A global transaction that never receives its "ended" event is picked up by
   * {@code handleTimeoutTx}, recorded as a failed {@code END_TIMEOUT} event and coordinated.
   *
   * @throws InterruptedException if the waiting thread is interrupted
   */
  @Test
  public void handleTimeoutGlobalTraction() throws InterruptedException {
    StreamObserver<GrpcTccCoordinateCommand> observer = registerMockObserver();

    tccTxEventService.onTccStartedEvent(tccStartEvent);
    tccTxEventService.onParticipationStartedEvent(participationStartedEvent);
    tccTxEventService.onParticipationEndedEvent(participationEndedEvent);

    // Presumably needed so the started transaction is older than the deadline computed
    // below (now - 2s) -- TODO confirm against handleTimeoutTx.
    Thread.sleep(3000L);
    Date deadLine = new Date(System.currentTimeMillis() - SECONDS.toMillis(2));

    tccTxEventService.handleTimeoutTx(deadLine, 1);

    // The global tx has timed out, so the participated event will be coordinated through cancel.
    Optional<GlobalTxEvent> timeoutEvent =
        globalTxEventRepository.findByUniqueKey(globalTxId, localTxId, TccTxType.END_TIMEOUT.name());
    assertThat(timeoutEvent.isPresent(), is(true));

    // Safe: presence was asserted on the line above.
    GlobalTxEvent recorded = timeoutEvent.get();
    assertThat(recorded.getStatus(), is(TransactionStatus.Failed.name()));
    assertThat(recorded.getTxType(), is(TccTxType.END_TIMEOUT.name()));
    assertThat(recorded.getGlobalTxId(), is(globalTxId));
    assertThat(recorded.getLocalTxId(), is(localTxId));
    assertThat(recorded.getParentTxId(), is(parentTxId));
    assertThat(recorded.getServiceName(), is(serviceName));

    verify(observer).onNext(any());

    assertRecordedEventCount(globalTxId, 4);
  }

  /**
   * After a completed transaction is cleared, its rows are gone from the participated and
   * global tables while the events in {@link TccTxEventRepository} are still present.
   */
  @Test
  public void clearUpCompletedTxFromGlobalTxTable() {
    registerMockObserver();

    tccTxEventService.onTccStartedEvent(tccStartEvent);
    tccTxEventService.onParticipationStartedEvent(participationStartedEvent);
    tccTxEventService.onParticipationEndedEvent(participationEndedEvent);
    tccTxEventService.onTccEndedEvent(tccEndEvent);
    tccTxEventService.onCoordinatedEvent(coordinateEvent);

    tccTxEventService.clearCompletedGlobalTx(1);

    assertThat(participatedEventRepository.findByGlobalTxId(globalTxId).isPresent(), is(false));
    assertThat(globalTxEventRepository.findByGlobalTxId(globalTxId).isPresent(), is(false));

    assertRecordedEventCount(globalTxId, 5);
  }

  /** Same as {@link #clearUpCompletedTxFromGlobalTxTable()} but with two completed transactions. */
  @Test
  public void clearUpCompletedTxFromGlobalTxTableMoreThanOne() {
    registerMockObserver();

    // one global tx
    tccTxEventService.onTccStartedEvent(tccStartEvent);
    tccTxEventService.onParticipationStartedEvent(participationStartedEvent);
    tccTxEventService.onParticipationEndedEvent(participationEndedEvent);
    tccTxEventService.onTccEndedEvent(tccEndEvent);
    tccTxEventService.onCoordinatedEvent(coordinateEvent);

    // another global tx
    String globalTxId_2 = uniquify("globalTxId");
    tccTxEventService.onTccStartedEvent(
        newGlobalTxEvent(TccTxType.STARTED, globalTxId_2, TransactionStatus.Succeed));
    tccTxEventService.onParticipationStartedEvent(newParticipationStartedEvent(globalTxId_2));
    tccTxEventService.onParticipationEndedEvent(
        newParticipationEndedEvent(globalTxId_2, TransactionStatus.Succeed));
    tccTxEventService.onTccEndedEvent(
        newGlobalTxEvent(TccTxType.ENDED, globalTxId_2, TransactionStatus.Succeed));
    tccTxEventService.onCoordinatedEvent(
        newTccTxEvent(TccTxType.COORDINATED, globalTxId_2, TransactionStatus.Succeed));

    tccTxEventService.clearCompletedGlobalTx(2);

    // NOTE(review): cleanup is only asserted for the first transaction; consider asserting
    // the same for globalTxId_2 once it is confirmed that no other completed transactions
    // can be present in the store when this test runs.
    assertThat(participatedEventRepository.findByGlobalTxId(globalTxId).isPresent(), is(false));
    assertThat(globalTxEventRepository.findByGlobalTxId(globalTxId).isPresent(), is(false));

    assertRecordedEventCount(globalTxId, 5);
    assertRecordedEventCount(globalTxId_2, 5);
  }

  /**
   * Creates a mocked command observer and registers it for {@link #serviceConfig}.
   *
   * @return the registered mock, for later verification
   */
  @SuppressWarnings("unchecked") // mock(Class) cannot carry the generic type argument
  private StreamObserver<GrpcTccCoordinateCommand> registerMockObserver() {
    StreamObserver<GrpcTccCoordinateCommand> observer = mock(StreamObserver.class);
    OmegaCallbacksRegistry.register(serviceConfig, observer);
    return observer;
  }

  /**
   * Asserts that {@link TccTxEventRepository} returns exactly {@code expected} events for the
   * given global transaction, failing with an assertion error when no result is present.
   *
   * @param txId global transaction id to look up
   * @param expected expected number of events
   */
  private void assertRecordedEventCount(String txId, int expected) {
    Optional<List<TccTxEvent>> events = tccTxEventRepository.findByGlobalTxId(txId);
    assertThat(events.isPresent(), is(true));
    assertThat(events.get().size(), is(expected));
  }

  /** Builds a participated event with an empty status string for the given global tx. */
  private ParticipatedEvent newParticipationStartedEvent(String globalTxId) {
    return new ParticipatedEvent(serviceName, instanceId, globalTxId, localTxId,
        parentTxId, confirmMethod, cancelMethod, "");
  }

  /** Builds a participated event carrying the name of the given status. */
  private ParticipatedEvent newParticipationEndedEvent(String globalTxId, TransactionStatus transactionStatus) {
    return new ParticipatedEvent(serviceName, instanceId, globalTxId, localTxId,
        parentTxId, confirmMethod, cancelMethod, transactionStatus.name());
  }

  /** Builds a global tx event of the given type and status for the given global tx. */
  private GlobalTxEvent newGlobalTxEvent(TccTxType tccTxType, String globalTxId, TransactionStatus transactionStatus) {
    return new GlobalTxEvent(serviceName, instanceId, globalTxId,
        localTxId, parentTxId, tccTxType.name(), transactionStatus.name());
  }

  /** Builds a TCC tx event of the given type and status for the given global tx. */
  private TccTxEvent newTccTxEvent(TccTxType tccTxType, String globalTxId, TransactionStatus transactionStatus) {
    return new TccTxEvent(serviceName, instanceId, globalTxId,
        localTxId, parentTxId, tccTxType.name(), transactionStatus.name());
  }
}