Added test for the case that rhythm was inactive for a few days and
needs to "make up" beats. Because this required changing db data in the
test I discovered a locking bug by writing this test. The locking changes
I made here may be sufficient to allow for a concurrent deployment of rhythm.
Maybe. But that should be tested to be sure.
diff --git a/component-test/src/main/java/io/mifos/rhythm/AbstractRhythmTest.java b/component-test/src/main/java/io/mifos/rhythm/AbstractRhythmTest.java
index 0af10b2..f3c3f1b 100644
--- a/component-test/src/main/java/io/mifos/rhythm/AbstractRhythmTest.java
+++ b/component-test/src/main/java/io/mifos/rhythm/AbstractRhythmTest.java
@@ -64,6 +64,7 @@
/**
* @author Myrle Krantz
*/
+@SuppressWarnings("SpringAutowiredFieldsWarningInspection")
@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.DEFINED_PORT,
classes = {AbstractRhythmTest.TestConfiguration.class},
@@ -117,7 +118,7 @@
EventRecorder eventRecorder;
@MockBean
- BeatPublisherService beatPublisherServiceSpy;
+ BeatPublisherService beatPublisherServiceMock;
@Autowired
@Qualifier(LOGGER_NAME)
@@ -148,9 +149,17 @@
final LocalDateTime now = LocalDateTime.now(ZoneId.of("UTC"));
int alignmentHour = now.getHour();
final LocalDateTime expectedBeatTimestamp = getExpectedBeatTimestamp(now, alignmentHour);
+
+ Mockito.doAnswer(new Returns(true)).when(beatPublisherServiceMock).publishBeat(
+ Matchers.eq(beatIdentifier),
+ Matchers.eq(tenantDataStoreContext.getTenantName()),
+ Matchers.eq(applicationIdentifier),
+ Matchers.eq(expectedBeatTimestamp));
+
final Beat ret = createBeat(applicationIdentifier, beatIdentifier, alignmentHour, expectedBeatTimestamp);
- Mockito.verify(beatPublisherServiceSpy, Mockito.timeout(2_000).times(1)).publishBeat(beatIdentifier, tenantDataStoreContext.getTenantName(), applicationIdentifier, expectedBeatTimestamp);
+ Mockito.verify(beatPublisherServiceMock, Mockito.timeout(2_000).times(1))
+ .publishBeat(beatIdentifier, tenantDataStoreContext.getTenantName(), applicationIdentifier, expectedBeatTimestamp);
return ret;
}
@@ -182,15 +191,15 @@
beat.setIdentifier(beatIdentifier);
beat.setAlignmentHour(alignmentHour);
- Mockito.doAnswer(new AnswerWithDelay<>(2_000, new Returns(Optional.of(PermittableGroupIds.forApplication(applicationIdentifier))))).when(beatPublisherServiceSpy).requestPermissionForBeats(Matchers.eq(tenantIdentifier), Matchers.eq(applicationIdentifier));
- Mockito.doAnswer(new AnswerWithDelay<>(2_000, new Returns(true))).when(beatPublisherServiceSpy).publishBeat(Matchers.eq(beatIdentifier), Matchers.eq(tenantIdentifier), Matchers.eq(applicationIdentifier),
+ Mockito.doAnswer(new AnswerWithDelay<>(2_000, new Returns(Optional.of(PermittableGroupIds.forApplication(applicationIdentifier))))).when(beatPublisherServiceMock).requestPermissionForBeats(Matchers.eq(tenantIdentifier), Matchers.eq(applicationIdentifier));
+ Mockito.doAnswer(new AnswerWithDelay<>(2_000, new Returns(true))).when(beatPublisherServiceMock).publishBeat(Matchers.eq(beatIdentifier), Matchers.eq(tenantIdentifier), Matchers.eq(applicationIdentifier),
AdditionalMatchers.or(Matchers.eq(expectedBeatTimestamp), Matchers.eq(getNextTimeStamp(expectedBeatTimestamp))));
this.testSubject.createBeat(applicationIdentifier, beat);
Assert.assertTrue(this.eventRecorder.wait(EventConstants.POST_BEAT, new BeatEvent(applicationIdentifier, beat.getIdentifier())));
- Mockito.verify(beatPublisherServiceSpy, Mockito.timeout(2_500).times(1)).requestPermissionForBeats(tenantIdentifier, applicationIdentifier);
+ Mockito.verify(beatPublisherServiceMock, Mockito.timeout(2_500).times(1)).requestPermissionForBeats(tenantIdentifier, applicationIdentifier);
return beat;
}
diff --git a/component-test/src/main/java/io/mifos/rhythm/TestBeats.java b/component-test/src/main/java/io/mifos/rhythm/TestBeats.java
index 1f57da7..7535700 100644
--- a/component-test/src/main/java/io/mifos/rhythm/TestBeats.java
+++ b/component-test/src/main/java/io/mifos/rhythm/TestBeats.java
@@ -19,11 +19,15 @@
import io.mifos.rhythm.api.v1.domain.Beat;
import io.mifos.rhythm.api.v1.events.BeatEvent;
import io.mifos.rhythm.api.v1.events.EventConstants;
+import io.mifos.rhythm.service.internal.repository.BeatEntity;
+import io.mifos.rhythm.service.internal.repository.BeatRepository;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;
+import org.springframework.beans.factory.annotation.Autowired;
+import javax.transaction.Transactional;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.temporal.ChronoUnit;
@@ -35,6 +39,9 @@
* @author Myrle Krantz
*/
public class TestBeats extends AbstractRhythmTest {
+ @SuppressWarnings("SpringAutowiredFieldsWarningInspection")
+ @Autowired
+ BeatRepository beatRepository;
@Test
public void shouldCreateBeat() throws InterruptedException {
@@ -93,14 +100,14 @@
final LocalDateTime expectedBeatTimestamp = getExpectedBeatTimestamp(now, beat.getAlignmentHour());
- Mockito.doReturn(Optional.of("boop")).when(beatPublisherServiceSpy).requestPermissionForBeats(Matchers.eq(tenantIdentifier), Matchers.eq(applicationIdentifier));
- Mockito.when(beatPublisherServiceSpy.publishBeat(beatId, tenantIdentifier, applicationIdentifier, expectedBeatTimestamp)).thenReturn(false, false, true);
+ Mockito.doReturn(Optional.of("boop")).when(beatPublisherServiceMock).requestPermissionForBeats(Matchers.eq(tenantIdentifier), Matchers.eq(applicationIdentifier));
+ Mockito.when(beatPublisherServiceMock.publishBeat(beatId, tenantIdentifier, applicationIdentifier, expectedBeatTimestamp)).thenReturn(false, false, true);
this.testSubject.createBeat(applicationIdentifier, beat);
Assert.assertTrue(this.eventRecorder.wait(EventConstants.POST_BEAT, new BeatEvent(applicationIdentifier, beat.getIdentifier())));
- Mockito.verify(beatPublisherServiceSpy, Mockito.timeout(10_000).times(3)).publishBeat(beatId, tenantIdentifier, applicationIdentifier, expectedBeatTimestamp);
+ Mockito.verify(beatPublisherServiceMock, Mockito.timeout(10_000).times(3)).publishBeat(beatId, tenantIdentifier, applicationIdentifier, expectedBeatTimestamp);
}
@Test
@@ -122,4 +129,40 @@
beats.forEach(x -> Assert.assertTrue(allEntities.contains(x)));
}
+
+ @Test
+ public void shouldBeatForMissingDays() throws InterruptedException {
+ final String applicationIdentifier = "funnybusiness-v6";
+ final String beatIdentifier = "fiddlebeat";
+ createBeatForThisHour(applicationIdentifier, beatIdentifier);
+
+ final int daysAgo = 10;
+ final LocalDateTime nextBeat = setBack(applicationIdentifier, beatIdentifier, daysAgo);
+
+ for (int i = daysAgo; i > 0; i--) {
+ Mockito.verify(beatPublisherServiceMock, Mockito.timeout(4_000).times(1))
+ .publishBeat(beatIdentifier, tenantDataStoreContext.getTenantName(), applicationIdentifier, nextBeat.minusDays(daysAgo));
+ }
+ }
+
+ @Transactional
+ LocalDateTime setBack(
+ final String applicationIdentifier,
+ final String beatIdentifier,
+ final int daysAgo) {
+
+ final BeatEntity beatEntity = beatRepository.findByTenantIdentifierAndApplicationIdentifierAndBeatIdentifier(
+ tenantDataStoreContext.getTenantName(),
+ applicationIdentifier,
+ beatIdentifier).orElseThrow(IllegalStateException::new);
+
+ Mockito.reset(beatPublisherServiceMock);
+ final LocalDateTime nextBeat = beatEntity.getNextBeat();
+
+ beatEntity.setNextBeat(nextBeat.minusDays(daysAgo));
+
+ beatRepository.save(beatEntity);
+
+ return nextBeat;
+ }
}
\ No newline at end of file
diff --git a/service/src/main/java/io/mifos/rhythm/service/internal/command/handler/BeatCommandHandler.java b/service/src/main/java/io/mifos/rhythm/service/internal/command/handler/BeatCommandHandler.java
index e0bf47b..a59c0ee 100644
--- a/service/src/main/java/io/mifos/rhythm/service/internal/command/handler/BeatCommandHandler.java
+++ b/service/src/main/java/io/mifos/rhythm/service/internal/command/handler/BeatCommandHandler.java
@@ -18,7 +18,6 @@
import io.mifos.core.command.annotation.Aggregate;
import io.mifos.core.command.annotation.CommandHandler;
import io.mifos.core.command.annotation.CommandLogLevel;
-import io.mifos.core.lang.ServiceException;
import io.mifos.rhythm.api.v1.events.BeatEvent;
import io.mifos.rhythm.api.v1.events.EventConstants;
import io.mifos.rhythm.service.ServiceConstants;
@@ -33,8 +32,6 @@
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.transaction.annotation.Transactional;
-import java.util.Optional;
-
/**
* @author Myrle Krantz
*/
@@ -91,16 +88,10 @@
@CommandHandler(logStart = CommandLogLevel.INFO, logFinish = CommandLogLevel.NONE)
@Transactional
public void process(final DeleteBeatCommand deleteBeatCommand) {
- final Optional<BeatEntity> toDelete = this.beatRepository.findByTenantIdentifierAndApplicationIdentifierAndBeatIdentifier(
+ this.beatRepository.deleteByTenantIdentifierAndApplicationIdentifierAndBeatIdentifier(
deleteBeatCommand.getTenantIdentifier(),
deleteBeatCommand.getApplicationIdentifier(),
deleteBeatCommand.getIdentifier());
- final BeatEntity toDeleteForReal
- = toDelete.orElseThrow(() -> ServiceException.notFound(
- "Beat for the application ''" + deleteBeatCommand.getApplicationIdentifier() +
- "'' with the identifier ''" + deleteBeatCommand.getIdentifier() + "'' not found."));
-
- this.beatRepository.delete(toDeleteForReal);
eventHelper.sendEvent(EventConstants.DELETE_BEAT, deleteBeatCommand.getTenantIdentifier(),
new BeatEvent(deleteBeatCommand.getApplicationIdentifier(), deleteBeatCommand.getIdentifier()));
diff --git a/service/src/main/java/io/mifos/rhythm/service/internal/repository/ApplicationRepository.java b/service/src/main/java/io/mifos/rhythm/service/internal/repository/ApplicationRepository.java
index 022b64a..b195b93 100644
--- a/service/src/main/java/io/mifos/rhythm/service/internal/repository/ApplicationRepository.java
+++ b/service/src/main/java/io/mifos/rhythm/service/internal/repository/ApplicationRepository.java
@@ -16,8 +16,10 @@
package io.mifos.rhythm.service.internal.repository;
import org.springframework.data.jpa.repository.JpaRepository;
+import org.springframework.data.jpa.repository.Lock;
import org.springframework.stereotype.Repository;
+import javax.persistence.LockModeType;
import java.util.Optional;
/**
@@ -25,6 +27,7 @@
*/
@Repository
public interface ApplicationRepository extends JpaRepository<ApplicationEntity, Long> {
+ @Lock(LockModeType.PESSIMISTIC_WRITE)
void deleteByTenantIdentifierAndApplicationIdentifier(String tenantIdentifier, String applicationIdentifier);
Optional<ApplicationEntity> findByTenantIdentifierAndApplicationIdentifier(String tenantIdentifier, String applicationIdentifier);
}
diff --git a/service/src/main/java/io/mifos/rhythm/service/internal/repository/BeatRepository.java b/service/src/main/java/io/mifos/rhythm/service/internal/repository/BeatRepository.java
index 05f0a4e..c419de2 100644
--- a/service/src/main/java/io/mifos/rhythm/service/internal/repository/BeatRepository.java
+++ b/service/src/main/java/io/mifos/rhythm/service/internal/repository/BeatRepository.java
@@ -16,8 +16,10 @@
package io.mifos.rhythm.service.internal.repository;
import org.springframework.data.jpa.repository.JpaRepository;
+import org.springframework.data.jpa.repository.Lock;
import org.springframework.stereotype.Repository;
+import javax.persistence.LockModeType;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Optional;
@@ -28,11 +30,18 @@
*/
@Repository
public interface BeatRepository extends JpaRepository<BeatEntity, Long> {
+ @Lock(LockModeType.PESSIMISTIC_WRITE)
void deleteByTenantIdentifierAndApplicationIdentifier
- (String tenantIdentifier, String applicationIdentifier);
+ (String tenantIdentifier, String applicationIdentifier);
+ @Lock(LockModeType.PESSIMISTIC_WRITE)
+ void deleteByTenantIdentifierAndApplicationIdentifierAndBeatIdentifier
+ (String tenantIdentifier,
+ String applicationIdentifier,
+ String beatIdentifier);
List<BeatEntity> findByTenantIdentifierAndApplicationIdentifier
(String tenantIdentifier, String applicationIdentifier);
Optional<BeatEntity> findByTenantIdentifierAndApplicationIdentifierAndBeatIdentifier
(String tenantIdentifier, String applicationIdentifier, String beatIdentifier);
+ @Lock(LockModeType.PESSIMISTIC_WRITE)
Stream<BeatEntity> findByNextBeatBefore(LocalDateTime currentTime);
}
diff --git a/service/src/main/java/io/mifos/rhythm/service/internal/service/Drummer.java b/service/src/main/java/io/mifos/rhythm/service/internal/service/Drummer.java
index 88a21ca..e0851cd 100644
--- a/service/src/main/java/io/mifos/rhythm/service/internal/service/Drummer.java
+++ b/service/src/main/java/io/mifos/rhythm/service/internal/service/Drummer.java
@@ -47,10 +47,10 @@
@Autowired
public Drummer(
- final IdentityPermittableGroupService identityPermittableGroupService,
- final BeatPublisherService beatPublisherService,
- final BeatRepository beatRepository,
- @Qualifier(ServiceConstants.LOGGER_NAME) final Logger logger) {
+ final IdentityPermittableGroupService identityPermittableGroupService,
+ final BeatPublisherService beatPublisherService,
+ final BeatRepository beatRepository,
+ @Qualifier(ServiceConstants.LOGGER_NAME) final Logger logger) {
this.identityPermittableGroupService = identityPermittableGroupService;
this.beatPublisherService = beatPublisherService;
this.beatRepository = beatRepository;
diff --git a/service/src/main/java/io/mifos/rhythm/service/rest/BeatRestController.java b/service/src/main/java/io/mifos/rhythm/service/rest/BeatRestController.java
index a795a8b..63ab292 100644
--- a/service/src/main/java/io/mifos/rhythm/service/rest/BeatRestController.java
+++ b/service/src/main/java/io/mifos/rhythm/service/rest/BeatRestController.java
@@ -81,7 +81,7 @@
@PathVariable("beatidentifier") final String beatIdentifier) {
return this.beatService.findByIdentifier(tenantIdentifier, applicationIdentifier, beatIdentifier)
.map(ResponseEntity::ok)
- .orElseThrow(() -> ServiceException.notFound("Instance with identifier " + applicationIdentifier + " doesn't exist."));
+ .orElseThrow(() -> ServiceException.notFound("Instance with identifier ''" + applicationIdentifier + "'' doesn''t exist."));
}
@Permittable(value = AcceptedTokenType.SYSTEM, permittedEndpoint = "/applications/{applicationidentifier}/beats", acceptTokenIntendedForForeignApplication = true) //Allow apps to use this endpoint in their provisioning code.