Merge pull request #5 from myrle-krantz/develop
Progress on making sure events are sent the right number of times.
diff --git a/README.md b/README.md
index 186e8ec..61d60e3 100644
--- a/README.md
+++ b/README.md
@@ -2,7 +2,7 @@
[![Join the chat at https://gitter.im/mifos-initiative/mifos.io](https://badges.gitter.im/Join%20Chat.svg)](https://gitter.im/mifos-initiative/mifos.io?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge)
-This project provides a heart beat for other microservices which need to start jobs to be executed once and only once at a defined time.
+This project provides a heartbeat for other microservices which need to start jobs to be executed once and only once at a defined time.
## Abstract
Mifos I/O is an application framework for digital financial services, a system to support nationwide and cross-national financial transactions and help to level and speed the creation of an inclusive, interconnected digital economy for every nation in the world.
diff --git a/api/src/main/java/io/mifos/rhythm/api/v1/client/RhythmManager.java b/api/src/main/java/io/mifos/rhythm/api/v1/client/RhythmManager.java
index eb9b25b..c3ac951 100644
--- a/api/src/main/java/io/mifos/rhythm/api/v1/client/RhythmManager.java
+++ b/api/src/main/java/io/mifos/rhythm/api/v1/client/RhythmManager.java
@@ -16,7 +16,6 @@
package io.mifos.rhythm.api.v1.client;
import io.mifos.core.api.util.CustomFeignClientsConfiguration;
-import io.mifos.rhythm.api.v1.domain.Application;
import io.mifos.rhythm.api.v1.domain.Beat;
import org.springframework.cloud.netflix.feign.FeignClient;
import org.springframework.http.MediaType;
@@ -34,29 +33,6 @@
public interface RhythmManager {
@RequestMapping(
- value = "/applications",
- method = RequestMethod.GET,
- produces = MediaType.ALL_VALUE,
- consumes = MediaType.APPLICATION_JSON_VALUE
- )
- List<Application> getAllApplications();
-
- @RequestMapping(
- value = "/applications/{applicationname}",
- method = RequestMethod.GET,
- produces = MediaType.ALL_VALUE,
- consumes = MediaType.APPLICATION_JSON_VALUE)
- Application getApplication(@PathVariable("applicationname") final String applicationName);
-
- @RequestMapping(
- value = "/applications",
- method = RequestMethod.POST,
- produces = MediaType.APPLICATION_JSON_VALUE,
- consumes = MediaType.APPLICATION_JSON_VALUE
- )
- void createApplication(final Application application);
-
- @RequestMapping(
value = "/applications/{applicationname}",
method = RequestMethod.DELETE,
produces = MediaType.ALL_VALUE,
diff --git a/api/src/main/java/io/mifos/rhythm/api/v1/domain/Application.java b/api/src/main/java/io/mifos/rhythm/api/v1/domain/Application.java
deleted file mode 100644
index 85e0c8c..0000000
--- a/api/src/main/java/io/mifos/rhythm/api/v1/domain/Application.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Copyright 2017 The Mifos Initiative.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.mifos.rhythm.api.v1.domain;
-
-import io.mifos.core.lang.validation.constraints.ValidApplicationName;
-
-/**
- * @author Myrle Krantz
- */
-@SuppressWarnings({"WeakerAccess", "unused"})
-public class Application {
- @ValidApplicationName
- private String applicationName;
-
- public Application() {
- super();
- }
-
- public Application(final String applicationName) {
- this.applicationName = applicationName;
- }
-
- public String getApplicationName() {
- return this.applicationName;
- }
-
- public void setApplicationName(final String applicationName) {
- this.applicationName = applicationName;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
-
- Application application = (Application) o;
-
- return applicationName != null ? applicationName.equals(application.applicationName) : application.applicationName == null;
-
- }
-
- @Override
- public int hashCode() {
- return applicationName != null ? applicationName.hashCode() : 0;
- }
-
- @Override
- public String toString() {
- return "Application{" +
- "applicationName='" + applicationName + '\'' +
- '}';
- }
-}
diff --git a/api/src/main/java/io/mifos/rhythm/api/v1/domain/Beat.java b/api/src/main/java/io/mifos/rhythm/api/v1/domain/Beat.java
index da66396..115ce32 100644
--- a/api/src/main/java/io/mifos/rhythm/api/v1/domain/Beat.java
+++ b/api/src/main/java/io/mifos/rhythm/api/v1/domain/Beat.java
@@ -32,6 +32,11 @@
public Beat() {
}
+ public Beat(String identifier, Integer alignmentHour) {
+ this.identifier = identifier;
+ this.alignmentHour = alignmentHour;
+ }
+
public Beat(String identifier) {
this.identifier = identifier;
}
diff --git a/api/src/main/java/io/mifos/rhythm/api/v1/events/BeatEvent.java b/api/src/main/java/io/mifos/rhythm/api/v1/events/BeatEvent.java
index 184bdd8..807c02b 100644
--- a/api/src/main/java/io/mifos/rhythm/api/v1/events/BeatEvent.java
+++ b/api/src/main/java/io/mifos/rhythm/api/v1/events/BeatEvent.java
@@ -1,3 +1,18 @@
+/*
+ * Copyright 2017 The Mifos Initiative.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package io.mifos.rhythm.api.v1.events;
import java.util.Objects;
diff --git a/api/src/main/java/io/mifos/rhythm/api/v1/events/EventConstants.java b/api/src/main/java/io/mifos/rhythm/api/v1/events/EventConstants.java
index 92b9ac9..878999c 100644
--- a/api/src/main/java/io/mifos/rhythm/api/v1/events/EventConstants.java
+++ b/api/src/main/java/io/mifos/rhythm/api/v1/events/EventConstants.java
@@ -24,12 +24,10 @@
String DESTINATION = "rhythm-v1";
String SELECTOR_NAME = "action";
String INITIALIZE = "initialize";
- String POST_APPLICATION = "post-application";
String POST_BEAT = "post-beat";
String DELETE_APPLICATION = "delete-application";
String DELETE_BEAT = "delete-beat";
String SELECTOR_INITIALIZE = SELECTOR_NAME + " = '" + INITIALIZE + "'";
- String SELECTOR_POST_APPLICATION = SELECTOR_NAME + " = '" + POST_APPLICATION + "'";
String SELECTOR_POST_BEAT = SELECTOR_NAME + " = '" + POST_BEAT + "'";
String SELECTOR_DELETE_APPLICATION = SELECTOR_NAME + " = '" + DELETE_APPLICATION + "'";
String SELECTOR_DELETE_BEAT = SELECTOR_NAME + " = '" + DELETE_BEAT + "'";
diff --git a/api/src/test/java/io/mifos/rhythm/api/v1/domain/ApplicationTest.java b/api/src/test/java/io/mifos/rhythm/api/v1/domain/ApplicationTest.java
deleted file mode 100644
index 42a3efa..0000000
--- a/api/src/test/java/io/mifos/rhythm/api/v1/domain/ApplicationTest.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Copyright 2017 The Mifos Initiative.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.mifos.rhythm.api.v1.domain;
-
-import io.mifos.core.test.domain.ValidationTest;
-import io.mifos.core.test.domain.ValidationTestCase;
-import org.junit.runners.Parameterized;
-
-import java.util.ArrayList;
-import java.util.Collection;
-
-/**
- * @author Myrle Krantz
- */
-public class ApplicationTest extends ValidationTest<Application> {
-
- public ApplicationTest(ValidationTestCase<Application> testCase) {
- super(testCase);
- }
-
- @Override
- protected Application createValidTestSubject() {
- return new Application("plop-v35");
- }
-
- @Parameterized.Parameters
- public static Collection testCases() {
- final Collection<ValidationTestCase> ret = new ArrayList<>();
- ret.add(new ValidationTestCase<Application>("basicCase")
- .adjustment(x -> {})
- .valid(true));
- ret.add(new ValidationTestCase<Application>("nullApplicationName")
- .adjustment(x -> x.setApplicationName(null))
- .valid(false));
- ret.add(new ValidationTestCase<Application>("tooShortApplicationName")
- .adjustment(x -> x.setApplicationName("z"))
- .valid(false));
- ret.add(new ValidationTestCase<Application>("invalidApplicationName")
- .adjustment(x -> x.setApplicationName("x-x-v-y"))
- .valid(false));
- return ret;
- }
-}
\ No newline at end of file
diff --git a/api/src/test/java/io/mifos/rhythm/api/v1/domain/BeatTest.java b/api/src/test/java/io/mifos/rhythm/api/v1/domain/BeatTest.java
index 6cd26df..b78f3cd 100644
--- a/api/src/test/java/io/mifos/rhythm/api/v1/domain/BeatTest.java
+++ b/api/src/test/java/io/mifos/rhythm/api/v1/domain/BeatTest.java
@@ -33,7 +33,7 @@
@Override
protected Beat createValidTestSubject() {
- return new Beat();
+ return new Beat("boop", 0);
}
@Parameterized.Parameters
diff --git a/build.gradle b/build.gradle
index b30bd55..30d52dc 100644
--- a/build.gradle
+++ b/build.gradle
@@ -4,13 +4,19 @@
dependsOn gradle.includedBuild('api').task(':publishToMavenLocal')
}
+task publishSpiToMavenLocal {
+ dependsOn gradle.includedBuild('spi').task(':publishToMavenLocal')
+}
+
task publishServiceToMavenLocal {
mustRunAfter publishApiToMavenLocal
+ mustRunAfter publishSpiToMavenLocal
dependsOn gradle.includedBuild('service').task(':publishToMavenLocal')
}
task publishComponentTestToMavenLocal {
mustRunAfter publishApiToMavenLocal
+ mustRunAfter publishSpiToMavenLocal
mustRunAfter publishServiceToMavenLocal
dependsOn gradle.includedBuild('component-test').task(':publishToMavenLocal')
}
@@ -18,6 +24,7 @@
task publishToMavenLocal {
group 'all'
dependsOn publishApiToMavenLocal
+ dependsOn publishSpiToMavenLocal
dependsOn publishServiceToMavenLocal
dependsOn publishComponentTestToMavenLocal
}
@@ -31,6 +38,7 @@
task licenseFormat {
group 'all'
dependsOn gradle.includedBuild('api').task(':licenseFormat')
+ dependsOn gradle.includedBuild('spi').task(':licenseFormat')
dependsOn gradle.includedBuild('service').task(':licenseFormat')
dependsOn gradle.includedBuild('component-test').task(':licenseFormat')
}
diff --git a/component-test/src/main/java/io/mifos/rhythm/AbstractRhythmTest.java b/component-test/src/main/java/io/mifos/rhythm/AbstractRhythmTest.java
index e2c1f2c..17862e5 100644
--- a/component-test/src/main/java/io/mifos/rhythm/AbstractRhythmTest.java
+++ b/component-test/src/main/java/io/mifos/rhythm/AbstractRhythmTest.java
@@ -24,19 +24,23 @@
import io.mifos.core.test.listener.EnableEventRecording;
import io.mifos.core.test.listener.EventRecorder;
import io.mifos.rhythm.api.v1.client.RhythmManager;
-import io.mifos.rhythm.api.v1.domain.Application;
import io.mifos.rhythm.api.v1.domain.Beat;
import io.mifos.rhythm.api.v1.events.BeatEvent;
import io.mifos.rhythm.api.v1.events.EventConstants;
import io.mifos.rhythm.service.RhythmConfiguration;
+import io.mifos.rhythm.service.internal.service.BeatPublisherService;
import org.junit.*;
import org.junit.rules.RuleChain;
import org.junit.rules.TestRule;
import org.junit.runner.RunWith;
+import org.mockito.AdditionalMatchers;
+import org.mockito.Matchers;
+import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.test.mock.mockito.SpyBean;
import org.springframework.cloud.netflix.feign.EnableFeignClients;
import org.springframework.cloud.netflix.ribbon.RibbonClient;
import org.springframework.context.annotation.Bean;
@@ -45,12 +49,20 @@
import org.springframework.context.annotation.Import;
import org.springframework.test.context.junit4.SpringRunner;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.temporal.ChronoUnit;
+
+import static org.mockito.Matchers.*;
+
/**
* @author Myrle Krantz
*/
@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.DEFINED_PORT,
- classes = {AbstractRhythmTest.TestConfiguration.class})
+ classes = {AbstractRhythmTest.TestConfiguration.class},
+ properties = {"rhythm.user=homer", "rhythm.beatCheckRate=500"}
+)
public class AbstractRhythmTest {
private static final String APP_NAME = "rhythm-v1";
@@ -76,7 +88,7 @@
private final static TestEnvironment testEnvironment = new TestEnvironment(APP_NAME);
private final static CassandraInitializer cassandraInitializer = new CassandraInitializer();
private final static MariaDBInitializer mariaDBInitializer = new MariaDBInitializer();
- private final static TenantDataStoreContextTestRule tenantDataStoreContext = TenantDataStoreContextTestRule.forRandomTenantName(cassandraInitializer, mariaDBInitializer);
+ final static TenantDataStoreContextTestRule tenantDataStoreContext = TenantDataStoreContextTestRule.forRandomTenantName(cassandraInitializer, mariaDBInitializer);
@ClassRule
public static TestRule orderClassRules = RuleChain
@@ -97,6 +109,9 @@
@Autowired
EventRecorder eventRecorder;
+ @SpyBean
+ BeatPublisherService beatPublisherServiceSpy;
+
@Before
public void prepTest() {
userContext = tenantApplicationSecurityEnvironment.createAutoUserContext(TEST_USER);
@@ -116,21 +131,32 @@
}
}
- Application createApplication(final String name) throws InterruptedException {
- final Application application = new Application(name);
- this.testSubject.createApplication(application);
+ Beat createBeat(final String applicationName, final String beatIdentifier) throws InterruptedException {
+ final LocalDateTime now = LocalDateTime.now(ZoneId.of("UTC"));
- Assert.assertTrue(this.eventRecorder.wait(EventConstants.POST_APPLICATION, application.getApplicationName()));
- return application;
- }
-
- Beat createBeat(final Application application, final String beatIdentifier) throws InterruptedException {
final Beat beat = new Beat();
beat.setIdentifier(beatIdentifier);
- beat.setAlignmentHour(0);
- this.testSubject.createBeat(application.getApplicationName(), beat);
+ beat.setAlignmentHour(now.getHour());
- Assert.assertTrue(this.eventRecorder.wait(EventConstants.POST_BEAT, new BeatEvent(application.getApplicationName(), beat.getIdentifier())));
+ final LocalDateTime expectedBeatTimestamp = getExpectedBeatTimestamp(now, beat.getAlignmentHour());
+ Mockito.doReturn(true).when(beatPublisherServiceSpy).publishBeat(Matchers.eq(beatIdentifier), Matchers.eq(tenantDataStoreContext.getTenantName()), Matchers.eq(applicationName),
+ AdditionalMatchers.or(Matchers.eq(expectedBeatTimestamp), Matchers.eq(getNextTimeStamp(expectedBeatTimestamp))));
+
+ this.testSubject.createBeat(applicationName, beat);
+
+ Assert.assertTrue(this.eventRecorder.wait(EventConstants.POST_BEAT, new BeatEvent(applicationName, beat.getIdentifier())));
+
+ Mockito.verify(beatPublisherServiceSpy, Mockito.timeout(2_000).times(1)).publishBeat(beatIdentifier, tenantDataStoreContext.getTenantName(), applicationName, expectedBeatTimestamp);
+
return beat;
}
+
+ LocalDateTime getExpectedBeatTimestamp(final LocalDateTime fromTime, final Integer alignmentHour) {
+ final LocalDateTime midnight = fromTime.truncatedTo(ChronoUnit.DAYS);
+ return midnight.plusHours(alignmentHour);
+ }
+
+ private LocalDateTime getNextTimeStamp(final LocalDateTime fromTime) {
+ return fromTime.plusDays(1);
+ }
}
diff --git a/component-test/src/main/java/io/mifos/rhythm/TestApplications.java b/component-test/src/main/java/io/mifos/rhythm/TestApplications.java
deleted file mode 100644
index b65412b..0000000
--- a/component-test/src/main/java/io/mifos/rhythm/TestApplications.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Copyright 2017 The Mifos Initiative.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.mifos.rhythm;
-
-import io.mifos.core.api.util.NotFoundException;
-import io.mifos.rhythm.api.v1.domain.Application;
-import io.mifos.rhythm.api.v1.events.EventConstants;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.List;
-
-/**
- * @author Myrle Krantz
- */
-public class TestApplications extends AbstractRhythmTest {
- @Test
- public void shouldCreateApplication() throws InterruptedException {
- final Application application = createApplication("funnybusiness-v1");
-
- final Application createdApplication = this.testSubject.getApplication(application.getApplicationName());
- Assert.assertEquals(application, createdApplication);
-
- final List<Application> allEntities = this.testSubject.getAllApplications();
- Assert.assertTrue(allEntities.contains(application));
- }
-
- @Test
- public void shouldListApplications() {
- final List<Application> allEntities = this.testSubject.getAllApplications();
- Assert.assertNotNull(allEntities);
- }
-
- @Test
- public void shouldDeleteApplication() throws InterruptedException {
- final Application application = createApplication("funnybusiness-v2");
-
- this.testSubject.deleteApplication(application.getApplicationName());
- Assert.assertTrue(this.eventRecorder.wait(EventConstants.DELETE_APPLICATION, application.getApplicationName()));
-
- final List<Application> allEntities = this.testSubject.getAllApplications();
- Assert.assertFalse(allEntities.contains(application));
-
- try {
- this.testSubject.getApplication(application.getApplicationName());
- Assert.fail("NotFoundException should be thrown.");
- }
- catch (final NotFoundException ignored) { }
- }
-}
diff --git a/component-test/src/main/java/io/mifos/rhythm/TestBeats.java b/component-test/src/main/java/io/mifos/rhythm/TestBeats.java
index 62e2a67..352005b 100644
--- a/component-test/src/main/java/io/mifos/rhythm/TestBeats.java
+++ b/component-test/src/main/java/io/mifos/rhythm/TestBeats.java
@@ -16,48 +16,86 @@
package io.mifos.rhythm;
import io.mifos.core.api.util.NotFoundException;
-import io.mifos.rhythm.api.v1.domain.Application;
import io.mifos.rhythm.api.v1.domain.Beat;
import io.mifos.rhythm.api.v1.events.BeatEvent;
import io.mifos.rhythm.api.v1.events.EventConstants;
import org.junit.Assert;
import org.junit.Test;
+import org.mockito.Mockito;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
import java.util.List;
+import static org.mockito.Matchers.*;
+
/**
* @author Myrle Krantz
*/
public class TestBeats extends AbstractRhythmTest {
+
@Test
public void shouldCreateBeat() throws InterruptedException {
- final Application application = createApplication("funnybusiness-v1");
+ final String appName = "funnybusiness-v1";
+ final Beat beat = createBeat(appName, "bebopthedowop");
- final Beat beat = createBeat(application, "bebopthedowop");
-
- final Beat createdBeat = this.testSubject.getBeat(application.getApplicationName(), beat.getIdentifier());
+ final Beat createdBeat = this.testSubject.getBeat(appName, beat.getIdentifier());
Assert.assertEquals(beat, createdBeat);
- final List<Beat> allEntities = this.testSubject.getAllBeatsForApplication(application.getApplicationName());
+ final List<Beat> allEntities = this.testSubject.getAllBeatsForApplication(appName);
Assert.assertTrue(allEntities.contains(beat));
}
@Test
public void shouldDeleteBeat() throws InterruptedException {
- final Application application = createApplication("funnybusiness-v2");
+ final String appName = "funnybusiness-v2";
- final Beat beat = createBeat(application, "bebopthedowop");
+ final Beat beat = createBeat(appName, "bebopthedowop");
- testSubject.deleteBeat(application.getApplicationName(), beat.getIdentifier());
- Assert.assertTrue(this.eventRecorder.wait(EventConstants.DELETE_BEAT, new BeatEvent(application.getApplicationName(), beat.getIdentifier())));
+ testSubject.deleteBeat(appName, beat.getIdentifier());
+ Assert.assertTrue(this.eventRecorder.wait(EventConstants.DELETE_BEAT, new BeatEvent(appName, beat.getIdentifier())));
- final List<Beat> allEntities = this.testSubject.getAllBeatsForApplication(application.getApplicationName());
+ final List<Beat> allEntities = this.testSubject.getAllBeatsForApplication(appName);
Assert.assertFalse(allEntities.contains(beat));
try {
- this.testSubject.getBeat(application.getApplicationName(), beat.getIdentifier());
+ this.testSubject.getBeat(appName, beat.getIdentifier());
Assert.fail("NotFoundException should be thrown.");
}
catch (final NotFoundException ignored) { }
}
-}
+
+ @Test
+ public void shouldDeleteApplication() throws InterruptedException {
+ final String appName = "funnybusiness-v3";
+ createBeat(appName, "bebopthedowop");
+
+ this.testSubject.deleteApplication(appName);
+ Assert.assertTrue(this.eventRecorder.wait(EventConstants.DELETE_APPLICATION, appName));
+
+ final List<Beat> allEntities = this.testSubject.getAllBeatsForApplication(appName);
+ Assert.assertTrue(allEntities.isEmpty());
+ }
+
+  /**
+   * Checks that publishing a beat is attempted again after a failed attempt.
+   *
+   * The spied publisher is stubbed to report failure twice and success on the third call
+   * for the beat timestamp expected for the current UTC hour; the test then waits (up to
+   * ten seconds) for exactly three publish attempts with those arguments.
+   *
+   * @throws InterruptedException if waiting on the event recorder is interrupted
+   */
+  @Test
+  public void shouldRetryBeatPublishIfFirstAttemptFails() throws InterruptedException {
+    final String appName = "funnybusiness-v4";
+    final String beatId = "bebopthedowop";
+
+    final LocalDateTime now = LocalDateTime.now(ZoneId.of("UTC"));
+
+    final Beat beat = new Beat();
+    beat.setIdentifier(beatId);
+    // Align on the current hour so that the first beat timestamp is derived from "now".
+    beat.setAlignmentHour(now.getHour());
+
+    final LocalDateTime expectedBeatTimestamp = getExpectedBeatTimestamp(now, beat.getAlignmentHour());
+
+    // Stub via doReturn(..).when(spy) rather than when(spy.publishBeat(..)): the latter
+    // invokes the real publishBeat once while the stubbing is being recorded.
+    // Chained doReturn calls yield the consecutive results false, false, true.
+    Mockito.doReturn(false).doReturn(false).doReturn(true)
+        .when(beatPublisherServiceSpy)
+        .publishBeat(beatId, tenantDataStoreContext.getTenantName(), appName, expectedBeatTimestamp);
+
+    this.testSubject.createBeat(appName, beat);
+
+    Assert.assertTrue(this.eventRecorder.wait(EventConstants.POST_BEAT, new BeatEvent(appName, beat.getIdentifier())));
+
+    Mockito.verify(beatPublisherServiceSpy, Mockito.timeout(10_000).times(3)).publishBeat(beatId, tenantDataStoreContext.getTenantName(), appName, expectedBeatTimestamp);
+  }
+}
\ No newline at end of file
diff --git a/component-test/src/main/java/io/mifos/rhythm/listener/ApplicationEventListener.java b/component-test/src/main/java/io/mifos/rhythm/listener/ApplicationEventListener.java
index f247387..651354c 100644
--- a/component-test/src/main/java/io/mifos/rhythm/listener/ApplicationEventListener.java
+++ b/component-test/src/main/java/io/mifos/rhythm/listener/ApplicationEventListener.java
@@ -41,16 +41,6 @@
@JmsListener(
subscription = EventConstants.DESTINATION,
destination = EventConstants.DESTINATION,
- selector = EventConstants.SELECTOR_POST_APPLICATION
- )
- public void onCreateApplication(@Header(TenantHeaderFilter.TENANT_HEADER) final String tenant,
- final String payload) {
- this.eventRecorder.event(tenant, EventConstants.POST_APPLICATION, payload, String.class);
- }
-
- @JmsListener(
- subscription = EventConstants.DESTINATION,
- destination = EventConstants.DESTINATION,
selector = EventConstants.SELECTOR_DELETE_APPLICATION
)
public void onDeleteApplication(@Header(TenantHeaderFilter.TENANT_HEADER) final String tenant,
diff --git a/service/build.gradle b/service/build.gradle
index dc790c6..26015c3 100644
--- a/service/build.gradle
+++ b/service/build.gradle
@@ -31,7 +31,9 @@
[group: 'org.springframework.cloud', name: 'spring-cloud-starter-eureka'],
[group: 'org.springframework.boot', name: 'spring-boot-starter-jetty'],
[group: 'io.mifos.rhythm', name: 'api', version: project.version],
+ [group: 'io.mifos.rhythm', name: 'spi', version: project.version],
[group: 'io.mifos.anubis', name: 'library', version: versions.frameworkanubis],
+ [group: 'io.mifos.permitted-feign-client', name: 'library', version: versions.frameworkpermittedfeignclient],
[group: 'com.google.code.gson', name: 'gson'],
[group: 'io.mifos.core', name: 'lang', version: versions.frameworklang],
[group: 'io.mifos.core', name: 'async', version: versions.frameworkasync],
@@ -42,7 +44,7 @@
)
}
-// publishToMavenLocal.dependsOn bootRepackage
+publishToMavenLocal.dependsOn bootRepackage
publishing {
publications {
@@ -52,7 +54,6 @@
artifactId project.name
version project.version
}
- /*
bootService(MavenPublication) {
// "boot" jar
artifact ("$buildDir/libs/$project.name-$version-boot.jar")
@@ -60,6 +61,5 @@
artifactId ("$project.name-boot")
version project.version
}
- */
}
}
diff --git a/service/src/main/java/io/mifos/rhythm/service/RhythmConfiguration.java b/service/src/main/java/io/mifos/rhythm/service/RhythmConfiguration.java
index 11d27fa..1cf17a9 100644
--- a/service/src/main/java/io/mifos/rhythm/service/RhythmConfiguration.java
+++ b/service/src/main/java/io/mifos/rhythm/service/RhythmConfiguration.java
@@ -16,12 +16,14 @@
package io.mifos.rhythm.service;
import io.mifos.anubis.config.EnableAnubis;
+import io.mifos.core.api.config.EnableApiFactory;
import io.mifos.core.async.config.EnableAsync;
import io.mifos.core.cassandra.config.EnableCassandra;
import io.mifos.core.command.config.EnableCommandProcessing;
import io.mifos.core.lang.config.EnableServiceException;
import io.mifos.core.lang.config.EnableTenantContext;
import io.mifos.core.mariadb.config.EnableMariaDB;
+import io.mifos.permittedfeignclient.config.EnablePermissionRequestingFeignClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
@@ -30,6 +32,7 @@
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.jpa.repository.config.EnableJpaRepositories;
+import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.web.servlet.config.annotation.PathMatchConfigurer;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurerAdapter;
@@ -40,17 +43,22 @@
@Configuration
@EnableAutoConfiguration
@EnableDiscoveryClient
+@EnableApiFactory
@EnableAsync
-@EnableTenantContext
@EnableCassandra
-@EnableMariaDB
+@EnableMariaDB(forTenantContext = false)
@EnableCommandProcessing
@EnableAnubis
@EnableServiceException
+@EnableScheduling
+@EnableTenantContext
+@EnablePermissionRequestingFeignClient
@ComponentScan({
"io.mifos.rhythm.service.rest",
+ "io.mifos.rhythm.service.config",
"io.mifos.rhythm.service.internal.service",
"io.mifos.rhythm.service.internal.repository",
+ "io.mifos.rhythm.service.internal.scheduler",
"io.mifos.rhythm.service.internal.command.handler"
})
@EnableJpaRepositories({
diff --git a/service/src/main/java/io/mifos/rhythm/service/ServiceConstants.java b/service/src/main/java/io/mifos/rhythm/service/ServiceConstants.java
index cacc037..78eb856 100644
--- a/service/src/main/java/io/mifos/rhythm/service/ServiceConstants.java
+++ b/service/src/main/java/io/mifos/rhythm/service/ServiceConstants.java
@@ -19,5 +19,5 @@
* @author Myrle Krantz
*/
public interface ServiceConstants {
- String LOGGER_NAME = "rest-logger";
+ String LOGGER_NAME = "rhythm-logger";
}
diff --git a/service/src/main/java/io/mifos/rhythm/service/config/RhythmProperties.java b/service/src/main/java/io/mifos/rhythm/service/config/RhythmProperties.java
new file mode 100644
index 0000000..be0236a
--- /dev/null
+++ b/service/src/main/java/io/mifos/rhythm/service/config/RhythmProperties.java
@@ -0,0 +1,50 @@
+/*
+ * Copyright 2017 The Mifos Initiative.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.mifos.rhythm.service.config;
+
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.stereotype.Component;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Configuration properties bound from keys under the {@code rhythm} prefix.
+ *
+ * <ul>
+ *   <li>{@code rhythm.user} - no default. NOTE(review): presumably the user the service
+ *       acts as when publishing beats; confirm against the consumer of this value.</li>
+ *   <li>{@code rhythm.beatCheckRate} - defaults to ten minutes expressed in milliseconds.
+ *       NOTE(review): presumably the interval between checks for due beats; confirm
+ *       against the scheduler that reads it.</li>
+ * </ul>
+ *
+ * @author Myrle Krantz
+ */
+@Component
+@ConfigurationProperties(prefix="rhythm")
+public class RhythmProperties {
+  private String user;
+  private Long beatCheckRate = TimeUnit.MINUTES.toMillis(10);
+
+  /** Creates an instance holding the default values; properties are applied through the setters. */
+  public RhythmProperties() {
+    super();
+  }
+
+  /** @return the configured user, or {@code null} if none was set */
+  public String getUser() {
+    return this.user;
+  }
+
+  /** @param user the value to store for {@code rhythm.user} */
+  public void setUser(String user) {
+    this.user = user;
+  }
+
+  /** @return the configured beat check rate; ten minutes in milliseconds unless overridden */
+  public Long getBeatCheckRate() {
+    return this.beatCheckRate;
+  }
+
+  /** @param beatCheckRate the value to store for {@code rhythm.beatCheckRate} */
+  public void setBeatCheckRate(Long beatCheckRate) {
+    this.beatCheckRate = beatCheckRate;
+  }
+}
\ No newline at end of file
diff --git a/service/src/main/java/io/mifos/rhythm/service/internal/command/CreateApplicationCommand.java b/service/src/main/java/io/mifos/rhythm/service/internal/command/CreateApplicationCommand.java
deleted file mode 100644
index 195aaa6..0000000
--- a/service/src/main/java/io/mifos/rhythm/service/internal/command/CreateApplicationCommand.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Copyright 2017 The Mifos Initiative.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.mifos.rhythm.service.internal.command;
-
-import io.mifos.rhythm.api.v1.domain.Application;
-
-/**
- * @author Myrle Krantz
- */
-public class CreateApplicationCommand {
-
- private final Application instance;
-
- public CreateApplicationCommand(final Application instance) {
- super();
- this.instance = instance;
- }
-
- public Application getInstance() {
- return this.instance;
- }
-}
diff --git a/service/src/main/java/io/mifos/rhythm/service/internal/command/CreateBeatCommand.java b/service/src/main/java/io/mifos/rhythm/service/internal/command/CreateBeatCommand.java
index beda7d3..64f9356 100644
--- a/service/src/main/java/io/mifos/rhythm/service/internal/command/CreateBeatCommand.java
+++ b/service/src/main/java/io/mifos/rhythm/service/internal/command/CreateBeatCommand.java
@@ -21,16 +21,23 @@
* @author Myrle Krantz
*/
public class CreateBeatCommand {
+ private final String tenantIdentifier;
+
private final String applicationName;
private final Beat instance;
- public CreateBeatCommand(final String applicationName, final Beat instance) {
+ public CreateBeatCommand(final String tenantIdentifier, final String applicationName, final Beat instance) {
super();
+ this.tenantIdentifier = tenantIdentifier;
this.applicationName = applicationName;
this.instance = instance;
}
+ public String getTenantIdentifier() {
+ return tenantIdentifier;
+ }
+
public String getApplicationName() {
return applicationName;
}
diff --git a/service/src/main/java/io/mifos/rhythm/service/internal/command/DeleteApplicationCommand.java b/service/src/main/java/io/mifos/rhythm/service/internal/command/DeleteApplicationCommand.java
index d931069..11d517b 100644
--- a/service/src/main/java/io/mifos/rhythm/service/internal/command/DeleteApplicationCommand.java
+++ b/service/src/main/java/io/mifos/rhythm/service/internal/command/DeleteApplicationCommand.java
@@ -20,13 +20,19 @@
*/
public class DeleteApplicationCommand {
+ private final String tenantIdentifier;
private final String applicationName;
- public DeleteApplicationCommand(final String applicationName) {
+ public DeleteApplicationCommand(String tenantIdentifier, final String applicationName) {
super();
+ this.tenantIdentifier = tenantIdentifier;
this.applicationName = applicationName;
}
+ public String getTenantIdentifier() {
+ return tenantIdentifier;
+ }
+
public String getApplicationName() {
return this.applicationName;
}
diff --git a/service/src/main/java/io/mifos/rhythm/service/internal/command/DeleteBeatCommand.java b/service/src/main/java/io/mifos/rhythm/service/internal/command/DeleteBeatCommand.java
index 8a2d317..8dc92a6 100644
--- a/service/src/main/java/io/mifos/rhythm/service/internal/command/DeleteBeatCommand.java
+++ b/service/src/main/java/io/mifos/rhythm/service/internal/command/DeleteBeatCommand.java
@@ -20,16 +20,23 @@
*/
public class DeleteBeatCommand {
+ private final String tenantIdentifier;
+
private final String applicationName;
private final String identifier;
- public DeleteBeatCommand(final String applicationName, final String identifier) {
+ public DeleteBeatCommand(final String tenantIdentifier, final String applicationName, final String identifier) {
super();
+ this.tenantIdentifier = tenantIdentifier;
this.applicationName = applicationName;
this.identifier = identifier;
}
+ public String getTenantIdentifier() {
+ return tenantIdentifier;
+ }
+
public String getApplicationName() {
return this.applicationName;
}
@@ -37,4 +44,5 @@
public String getIdentifier() {
return identifier;
}
+
}
diff --git a/service/src/main/java/io/mifos/rhythm/service/internal/command/handler/ApplicationCommandHandler.java b/service/src/main/java/io/mifos/rhythm/service/internal/command/handler/ApplicationCommandHandler.java
index 32a4ee6..7b896fa 100644
--- a/service/src/main/java/io/mifos/rhythm/service/internal/command/handler/ApplicationCommandHandler.java
+++ b/service/src/main/java/io/mifos/rhythm/service/internal/command/handler/ApplicationCommandHandler.java
@@ -17,57 +17,32 @@
import io.mifos.core.command.annotation.Aggregate;
import io.mifos.core.command.annotation.CommandHandler;
-import io.mifos.core.command.annotation.EventEmitter;
-import io.mifos.core.lang.ServiceException;
import io.mifos.rhythm.api.v1.events.EventConstants;
-import io.mifos.rhythm.service.internal.command.CreateApplicationCommand;
import io.mifos.rhythm.service.internal.command.DeleteApplicationCommand;
-import io.mifos.rhythm.service.internal.mapper.ApplicationMapper;
-import io.mifos.rhythm.service.internal.repository.ApplicationEntity;
-import io.mifos.rhythm.service.internal.repository.ApplicationRepository;
+import io.mifos.rhythm.service.internal.repository.BeatRepository;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.transaction.annotation.Transactional;
-import java.util.Optional;
-
/**
* @author Myrle Krantz
*/
@SuppressWarnings("unused")
@Aggregate
public class ApplicationCommandHandler {
-
- private final ApplicationRepository applicationRepository;
+ private final BeatRepository beatRepository;
+ private final EventHelper eventHelper;
@Autowired
- public ApplicationCommandHandler(final ApplicationRepository applicationRepository) {
+ public ApplicationCommandHandler(final BeatRepository beatRepository, final EventHelper eventHelper) {
super();
- this.applicationRepository = applicationRepository;
+ this.beatRepository = beatRepository;
+ this.eventHelper = eventHelper;
}
@CommandHandler
@Transactional
- @EventEmitter(selectorName = EventConstants.SELECTOR_NAME, selectorValue = EventConstants.POST_APPLICATION)
- public String process(final CreateApplicationCommand createApplicationCommand) {
-
- final ApplicationEntity entity = ApplicationMapper.map(createApplicationCommand.getInstance());
- this.applicationRepository.save(entity);
-
- return createApplicationCommand.getInstance().getApplicationName();
- }
-
- @CommandHandler
- @Transactional
- @EventEmitter(selectorName = EventConstants.SELECTOR_NAME, selectorValue = EventConstants.DELETE_APPLICATION)
- public String process(final DeleteApplicationCommand deleteApplicationCommand) {
-
- final Optional<ApplicationEntity> toDelete
- = this.applicationRepository.findByApplicationName(deleteApplicationCommand.getApplicationName());
- final ApplicationEntity toDeleteForReal
- = toDelete.orElseThrow(() -> ServiceException.notFound("Application with the name " + deleteApplicationCommand.getApplicationName() + " not found."));
-
- this.applicationRepository.delete(toDeleteForReal);
-
- return deleteApplicationCommand.getApplicationName();
+ public void process(final DeleteApplicationCommand deleteApplicationCommand) {
+ this.beatRepository.deleteByTenantIdentifierAndApplicationName(deleteApplicationCommand.getTenantIdentifier(), deleteApplicationCommand.getApplicationName());
+ eventHelper.sendEvent(EventConstants.DELETE_APPLICATION, deleteApplicationCommand.getTenantIdentifier(), deleteApplicationCommand.getApplicationName());
}
}
diff --git a/service/src/main/java/io/mifos/rhythm/service/internal/command/handler/BeatCommandHandler.java b/service/src/main/java/io/mifos/rhythm/service/internal/command/handler/BeatCommandHandler.java
index 4d9f690..f287f86 100644
--- a/service/src/main/java/io/mifos/rhythm/service/internal/command/handler/BeatCommandHandler.java
+++ b/service/src/main/java/io/mifos/rhythm/service/internal/command/handler/BeatCommandHandler.java
@@ -17,10 +17,9 @@
import io.mifos.core.command.annotation.Aggregate;
import io.mifos.core.command.annotation.CommandHandler;
-import io.mifos.core.command.annotation.EventEmitter;
import io.mifos.core.lang.ServiceException;
-import io.mifos.rhythm.api.v1.events.EventConstants;
import io.mifos.rhythm.api.v1.events.BeatEvent;
+import io.mifos.rhythm.api.v1.events.EventConstants;
import io.mifos.rhythm.service.internal.command.CreateBeatCommand;
import io.mifos.rhythm.service.internal.command.DeleteBeatCommand;
import io.mifos.rhythm.service.internal.mapper.BeatMapper;
@@ -39,38 +38,44 @@
public class BeatCommandHandler {
private final BeatRepository beatRepository;
+ private final EventHelper eventHelper;
@Autowired
- public BeatCommandHandler(final BeatRepository beatRepository) {
+ public BeatCommandHandler(final BeatRepository beatRepository, final EventHelper eventHelper) {
super();
this.beatRepository = beatRepository;
+ this.eventHelper = eventHelper;
}
@CommandHandler
@Transactional
- @EventEmitter(selectorName = EventConstants.SELECTOR_NAME, selectorValue = EventConstants.POST_BEAT)
- public BeatEvent process(final CreateBeatCommand createBeatCommand) {
+ public void process(final CreateBeatCommand createBeatCommand) {
- final BeatEntity entity = BeatMapper.map(createBeatCommand.getApplicationName(), createBeatCommand.getInstance());
+ final BeatEntity entity = BeatMapper.map(
+ createBeatCommand.getTenantIdentifier(),
+ createBeatCommand.getApplicationName(),
+ createBeatCommand.getInstance());
this.beatRepository.save(entity);
- return new BeatEvent(createBeatCommand.getApplicationName(), createBeatCommand.getInstance().getIdentifier());
+ eventHelper.sendEvent(EventConstants.POST_BEAT, createBeatCommand.getTenantIdentifier(),
+ new BeatEvent(createBeatCommand.getApplicationName(), createBeatCommand.getInstance().getIdentifier()));
}
@CommandHandler
@Transactional
- @EventEmitter(selectorName = EventConstants.SELECTOR_NAME, selectorValue = EventConstants.DELETE_BEAT)
- public BeatEvent process(final DeleteBeatCommand deleteBeatCommand) {
-
- final Optional<BeatEntity> toDelete
- = this.beatRepository.findByApplicationNameAndIdentifier(deleteBeatCommand.getApplicationName(), deleteBeatCommand.getIdentifier());
+ public void process(final DeleteBeatCommand deleteBeatCommand) {
+ final Optional<BeatEntity> toDelete = this.beatRepository.findByTenantIdentifierAndApplicationNameAndBeatIdentifier(
+ deleteBeatCommand.getTenantIdentifier(),
+ deleteBeatCommand.getApplicationName(),
+ deleteBeatCommand.getIdentifier());
final BeatEntity toDeleteForReal
= toDelete.orElseThrow(() -> ServiceException.notFound(
"Beat with for the application " + deleteBeatCommand.getApplicationName() +
- ", and the identifier " + deleteBeatCommand.getApplicationName() + " not found."));
+ ", and the identifier " + deleteBeatCommand.getIdentifier() + " not found."));
this.beatRepository.delete(toDeleteForReal);
- return new BeatEvent(deleteBeatCommand.getApplicationName(), deleteBeatCommand.getIdentifier());
+ eventHelper.sendEvent(EventConstants.DELETE_BEAT, deleteBeatCommand.getTenantIdentifier(),
+ new BeatEvent(deleteBeatCommand.getApplicationName(), deleteBeatCommand.getIdentifier()));
}
}
diff --git a/service/src/main/java/io/mifos/rhythm/service/internal/command/handler/EventHelper.java b/service/src/main/java/io/mifos/rhythm/service/internal/command/handler/EventHelper.java
new file mode 100644
index 0000000..fb8e5a8
--- /dev/null
+++ b/service/src/main/java/io/mifos/rhythm/service/internal/command/handler/EventHelper.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2017 The Mifos Initiative.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.mifos.rhythm.service.internal.command.handler;
+
+import com.google.gson.Gson;
+import io.mifos.core.command.util.CommandConstants;
+import io.mifos.core.lang.config.TenantHeaderFilter;
+import io.mifos.rhythm.api.v1.events.EventConstants;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.jms.core.JmsTemplate;
+import org.springframework.stereotype.Component;
+
+/**
+ * @author Myrle Krantz
+ */
+@SuppressWarnings("WeakerAccess")
+@Component
+public class EventHelper {
+ private final Gson gson;
+ private final JmsTemplate jmsTemplate;
+
+ public EventHelper(final @Qualifier(CommandConstants.SERIALIZER) Gson gson, final JmsTemplate jmsTemplate) {
+ this.gson = gson;
+ this.jmsTemplate = jmsTemplate;
+ }
+
+ void sendEvent(final String eventName, final String tenantIdentifier, final Object payload) {
+ this.jmsTemplate.convertAndSend(
+ this.gson.toJson(payload),
+ message -> {
+ message.setStringProperty(
+ TenantHeaderFilter.TENANT_HEADER,
+ tenantIdentifier);
+ message.setStringProperty(
+ EventConstants.SELECTOR_NAME,
+ eventName
+ );
+ return message;
+ }
+ );
+ }
+}
diff --git a/service/src/main/java/io/mifos/rhythm/service/internal/mapper/ApplicationMapper.java b/service/src/main/java/io/mifos/rhythm/service/internal/mapper/ApplicationMapper.java
deleted file mode 100644
index 2862ead..0000000
--- a/service/src/main/java/io/mifos/rhythm/service/internal/mapper/ApplicationMapper.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Copyright 2017 The Mifos Initiative.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.mifos.rhythm.service.internal.mapper;
-
-import io.mifos.rhythm.api.v1.domain.Application;
-import io.mifos.rhythm.service.internal.repository.ApplicationEntity;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.stream.Collectors;
-
-/**
- * @author Myrle Krantz
- */
-public interface ApplicationMapper {
- static Application map(final ApplicationEntity entity) {
- final Application ret = new Application();
- ret.setApplicationName(entity.getApplicationName());
- return ret;
- }
-
- static ApplicationEntity map(final Application instance) {
- final ApplicationEntity ret = new ApplicationEntity();
- ret.setApplicationName(instance.getApplicationName());
- return ret;
- }
-
- static List<Application> map(final List<ApplicationEntity> entities) {
- final List<Application> ret = new ArrayList<>(entities.size());
- ret.addAll(entities.stream().map(ApplicationMapper::map).collect(Collectors.toList()));
- return ret;
- }
-}
\ No newline at end of file
diff --git a/service/src/main/java/io/mifos/rhythm/service/internal/mapper/BeatMapper.java b/service/src/main/java/io/mifos/rhythm/service/internal/mapper/BeatMapper.java
index a505988..4df988e 100644
--- a/service/src/main/java/io/mifos/rhythm/service/internal/mapper/BeatMapper.java
+++ b/service/src/main/java/io/mifos/rhythm/service/internal/mapper/BeatMapper.java
@@ -18,6 +18,9 @@
import io.mifos.rhythm.api.v1.domain.Beat;
import io.mifos.rhythm.service.internal.repository.BeatEntity;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
@@ -28,7 +31,7 @@
public interface BeatMapper {
static Beat map(final BeatEntity entity) {
final Beat ret = new Beat();
- ret.setIdentifier(entity.getIdentifier());
+ ret.setIdentifier(entity.getBeatIdentifier());
ret.setAlignmentHour(entity.getAlignmentHour());
return ret;
}
@@ -39,11 +42,14 @@
return ret;
}
- static BeatEntity map(final String applicationName, final Beat instance) {
+ static BeatEntity map(final String tenantIdentifier, final String applicationName, final Beat instance) {
final BeatEntity ret = new BeatEntity();
- ret.setIdentifier(instance.getIdentifier());
+ ret.setBeatIdentifier(instance.getIdentifier());
+ ret.setTenantIdentifier(tenantIdentifier);
ret.setApplicationName(applicationName);
ret.setAlignmentHour(instance.getAlignmentHour());
+ //First beat is today at the alignment hour (UTC). If that time has already passed, the beat is already due and is picked up at the next check.
+ ret.setNextBeat(LocalDateTime.now(ZoneId.of("UTC")).truncatedTo(ChronoUnit.DAYS).plusHours(instance.getAlignmentHour()));
return ret;
}
}
diff --git a/service/src/main/java/io/mifos/rhythm/service/internal/repository/ApplicationEntity.java b/service/src/main/java/io/mifos/rhythm/service/internal/repository/ApplicationEntity.java
deleted file mode 100644
index 9cee984..0000000
--- a/service/src/main/java/io/mifos/rhythm/service/internal/repository/ApplicationEntity.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Copyright 2017 The Mifos Initiative.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.mifos.rhythm.service.internal.repository;
-
-import javax.persistence.*;
-
-/**
- * @author Myrle Krantz
- */
-@SuppressWarnings({"unused", "WeakerAccess"})
-@Entity
-@Table(name = "khepri_applications")
-public class ApplicationEntity {
-
- @Id
- @GeneratedValue(strategy = GenerationType.IDENTITY)
- @Column(name = "id")
- private Long id;
-
- @Column(name = "application_name")
- private String applicationName;
-
- public ApplicationEntity() {
- super();
- }
-
- public Long getId() {
- return id;
- }
-
- public void setId(Long id) {
- this.id = id;
- }
-
- public String getApplicationName() {
- return applicationName;
- }
-
- public void setApplicationName(String applicationName) {
- this.applicationName = applicationName;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || !(o instanceof ApplicationEntity)) return false;
-
- ApplicationEntity that = (ApplicationEntity) o;
-
- return getApplicationName().equals(that.getApplicationName());
-
- }
-
- @Override
- public int hashCode() {
- return getApplicationName().hashCode();
- }
-}
diff --git a/service/src/main/java/io/mifos/rhythm/service/internal/repository/ApplicationRepository.java b/service/src/main/java/io/mifos/rhythm/service/internal/repository/ApplicationRepository.java
deleted file mode 100644
index 5df98d3..0000000
--- a/service/src/main/java/io/mifos/rhythm/service/internal/repository/ApplicationRepository.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Copyright 2017 The Mifos Initiative.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.mifos.rhythm.service.internal.repository;
-
-import org.springframework.data.jpa.repository.JpaRepository;
-import org.springframework.stereotype.Repository;
-
-import java.util.Optional;
-
-/**
- * @author Myrle Krantz
- */
-@SuppressWarnings("unused")
-@Repository
-public interface ApplicationRepository extends JpaRepository<ApplicationEntity, Long> {
- Optional<ApplicationEntity> findByApplicationName(String applicationName);
-}
diff --git a/service/src/main/java/io/mifos/rhythm/service/internal/repository/BeatEntity.java b/service/src/main/java/io/mifos/rhythm/service/internal/repository/BeatEntity.java
index 999c54e..3b7f0e5 100644
--- a/service/src/main/java/io/mifos/rhythm/service/internal/repository/BeatEntity.java
+++ b/service/src/main/java/io/mifos/rhythm/service/internal/repository/BeatEntity.java
@@ -15,7 +15,10 @@
*/
package io.mifos.rhythm.service.internal.repository;
+import io.mifos.core.mariadb.util.LocalDateTimeConverter;
+
import javax.persistence.*;
+import java.time.LocalDateTime;
import java.util.Objects;
/**
@@ -30,15 +33,22 @@
@Column(name = "id")
private Long id;
- @Column(name = "identifier")
- private String identifier;
+ @Column(name = "beat_identifier", nullable = false)
+ private String beatIdentifier;
- @Column(name = "application_name")
+ @Column(name = "tenant_identifier", nullable = false)
+ private String tenantIdentifier;
+
+ @Column(name = "application_name", nullable = false)
private String applicationName;
- @Column(name = "alignment_hour")
+ @Column(name = "alignment_hour", nullable = false)
private Integer alignmentHour;
+ @Column(name = "next_beat")
+ @Convert(converter = LocalDateTimeConverter.class)
+ private LocalDateTime nextBeat;
+
public BeatEntity() {
super();
}
@@ -51,12 +61,20 @@
this.id = id;
}
- public String getIdentifier() {
- return identifier;
+ public String getBeatIdentifier() {
+ return beatIdentifier;
}
- public void setIdentifier(String identifier) {
- this.identifier = identifier;
+ public void setBeatIdentifier(String beatIdentifier) {
+ this.beatIdentifier = beatIdentifier;
+ }
+
+ public String getTenantIdentifier() {
+ return tenantIdentifier;
+ }
+
+ public void setTenantIdentifier(String tenantIdentifier) {
+ this.tenantIdentifier = tenantIdentifier;
}
public String getApplicationName() {
@@ -75,17 +93,38 @@
this.alignmentHour = alignmentHour;
}
+ public LocalDateTime getNextBeat() {
+ return nextBeat;
+ }
+
+ public void setNextBeat(LocalDateTime nextBeat) {
+ this.nextBeat = nextBeat;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) return true;
- if (o == null || !(o instanceof BeatEntity)) return false;
+ if (o == null || getClass() != o.getClass()) return false;
BeatEntity that = (BeatEntity) o;
- return Objects.equals(getIdentifier(), that.getIdentifier()) &&
- Objects.equals(getApplicationName(), that.getApplicationName());
+ return Objects.equals(beatIdentifier, that.beatIdentifier) &&
+ Objects.equals(tenantIdentifier, that.tenantIdentifier) &&
+ Objects.equals(applicationName, that.applicationName);
}
@Override
public int hashCode() {
- return Objects.hash(getIdentifier(), getApplicationName());
+ return Objects.hash(beatIdentifier, tenantIdentifier, applicationName);
+ }
+
+ @Override
+ public String toString() {
+ return "BeatEntity{" +
+ "id=" + id +
+ ", beatIdentifier='" + beatIdentifier + '\'' +
+ ", tenantIdentifier='" + tenantIdentifier + '\'' +
+ ", applicationName='" + applicationName + '\'' +
+ ", alignmentHour=" + alignmentHour +
+ ", nextBeat=" + nextBeat +
+ '}';
}
}
diff --git a/service/src/main/java/io/mifos/rhythm/service/internal/repository/BeatRepository.java b/service/src/main/java/io/mifos/rhythm/service/internal/repository/BeatRepository.java
index 1ccc815..df5d117 100644
--- a/service/src/main/java/io/mifos/rhythm/service/internal/repository/BeatRepository.java
+++ b/service/src/main/java/io/mifos/rhythm/service/internal/repository/BeatRepository.java
@@ -18,14 +18,18 @@
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.stereotype.Repository;
+import java.time.LocalDateTime;
import java.util.List;
import java.util.Optional;
+import java.util.stream.Stream;
/**
* @author Myrle Krantz
*/
@Repository
public interface BeatRepository extends JpaRepository<BeatEntity, Long> {
- List<BeatEntity> findByApplicationName(String applicationName);
- Optional<BeatEntity> findByApplicationNameAndIdentifier(String applicationName, String identifier);
+ void deleteByTenantIdentifierAndApplicationName(String tenantIdentifier, String applicationName);
+ List<BeatEntity> findByTenantIdentifierAndApplicationName(String tenantIdentifier, String applicationName);
+ Optional<BeatEntity> findByTenantIdentifierAndApplicationNameAndBeatIdentifier(String tenantIdentifier, String applicationName, String identifier);
+ Stream<BeatEntity> findByNextBeatBefore(LocalDateTime currentTime);
}
diff --git a/service/src/main/java/io/mifos/rhythm/service/internal/scheduler/Drummer.java b/service/src/main/java/io/mifos/rhythm/service/internal/scheduler/Drummer.java
new file mode 100644
index 0000000..4f84037
--- /dev/null
+++ b/service/src/main/java/io/mifos/rhythm/service/internal/scheduler/Drummer.java
@@ -0,0 +1,77 @@
+/*
+ * Copyright 2017 The Mifos Initiative.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.mifos.rhythm.service.internal.scheduler;
+
+import io.mifos.rhythm.service.ServiceConstants;
+import io.mifos.rhythm.service.internal.repository.BeatEntity;
+import io.mifos.rhythm.service.internal.repository.BeatRepository;
+import io.mifos.rhythm.service.internal.service.BeatPublisherService;
+import org.slf4j.Logger;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.dao.InvalidDataAccessResourceUsageException;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Component;
+import org.springframework.transaction.annotation.Transactional;
+
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.Optional;
+import java.util.stream.Stream;
+
+/**
+ * @author Myrle Krantz
+ */
+@SuppressWarnings({"unused", "WeakerAccess"})
+@Component
+public class Drummer {
+ private final BeatPublisherService beatPublisherService;
+ private final BeatRepository beatRepository;
+ private final Logger logger;
+
+ @Autowired
+ public Drummer(
+ final BeatPublisherService beatPublisherService,
+ final BeatRepository beatRepository,
+ @Qualifier(ServiceConstants.LOGGER_NAME) final Logger logger) {
+ this.beatPublisherService = beatPublisherService;
+ this.beatRepository = beatRepository;
+ this.logger = logger;
+ }
+
+ @Scheduled(initialDelayString = "${rhythm.beatCheckRate}", fixedRateString = "${rhythm.beatCheckRate}")
+ @Transactional
+ public void checkForBeatsNeeded() {
+ try {
+ final LocalDateTime now = LocalDateTime.now(ZoneId.of("UTC"));
+ //Get every beat whose next beat time is already before now; there is no lower bound on how far back.
+ final Stream<BeatEntity> beats = beatRepository.findByNextBeatBefore(now);
+ beats.forEach((beat) -> {
+ logger.info("Checking if beat {} needs publishing.", beat);
+ final Optional<LocalDateTime> nextBeat = beatPublisherService.checkBeatForPublish(now, beat.getBeatIdentifier(), beat.getTenantIdentifier(), beat.getApplicationName(), beat.getAlignmentHour(), beat.getNextBeat());
+ nextBeat.ifPresent(x -> {
+ beat.setNextBeat(x);
+ beatRepository.save(beat);
+ });
+ });
+
+ }
+ catch (final InvalidDataAccessResourceUsageException e) {
+ logger.info("InvalidDataAccessResourceUsageException in check for scheduled beats, probably " +
+ "because initialize hasn't been called yet. {}", e);
+ }
+ }
+}
\ No newline at end of file
diff --git a/service/src/main/java/io/mifos/rhythm/service/internal/service/ApplicationService.java b/service/src/main/java/io/mifos/rhythm/service/internal/service/ApplicationService.java
deleted file mode 100644
index 8162a4a..0000000
--- a/service/src/main/java/io/mifos/rhythm/service/internal/service/ApplicationService.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Copyright 2017 The Mifos Initiative.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.mifos.rhythm.service.internal.service;
-
-import io.mifos.rhythm.api.v1.domain.Application;
-import io.mifos.rhythm.service.internal.mapper.ApplicationMapper;
-import io.mifos.rhythm.service.internal.repository.ApplicationRepository;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Service;
-
-import java.util.List;
-import java.util.Optional;
-
-/**
- * @author Myrle Krantz
- */
-@Service
-public class ApplicationService {
-
- private final ApplicationRepository applicationRepository;
-
- @Autowired
- public ApplicationService(final ApplicationRepository applicationRepository) {
- super();
- this.applicationRepository = applicationRepository;
- }
-
- public List<Application> findAllEntities() {
- return ApplicationMapper.map(this.applicationRepository.findAll());
- }
-
- public Optional<Application> findByIdentifier(final String identifier) {
- return this.applicationRepository.findByApplicationName(identifier).map(ApplicationMapper::map);
- }
-}
diff --git a/service/src/main/java/io/mifos/rhythm/service/internal/service/BeatPublisherService.java b/service/src/main/java/io/mifos/rhythm/service/internal/service/BeatPublisherService.java
new file mode 100644
index 0000000..3d57c9a
--- /dev/null
+++ b/service/src/main/java/io/mifos/rhythm/service/internal/service/BeatPublisherService.java
@@ -0,0 +1,155 @@
+/*
+ * Copyright 2017 The Mifos Initiative.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.mifos.rhythm.service.internal.service;
+
+import io.mifos.core.api.context.AutoUserContext;
+import io.mifos.core.api.util.ApiFactory;
+import io.mifos.core.lang.AutoTenantContext;
+import io.mifos.core.lang.DateConverter;
+import io.mifos.permittedfeignclient.service.ApplicationAccessTokenService;
+import io.mifos.rhythm.service.config.RhythmProperties;
+import io.mifos.rhythm.spi.v1.client.BeatListener;
+import io.mifos.rhythm.spi.v1.domain.BeatPublish;
+import org.slf4j.Logger;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.cloud.client.ServiceInstance;
+import org.springframework.cloud.client.discovery.DiscoveryClient;
+import org.springframework.stereotype.Service;
+
+import javax.annotation.Nonnull;
+import java.time.LocalDateTime;
+import java.time.temporal.ChronoUnit;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Predicate;
+import java.util.stream.Stream;
+
+import static io.mifos.rhythm.service.ServiceConstants.LOGGER_NAME;
+
+/**
+ * @author Myrle Krantz
+ */
+@Service
+public class BeatPublisherService {
+  private final DiscoveryClient discoveryClient;
+  private final ApplicationAccessTokenService applicationAccessTokenService;
+  private final ApiFactory apiFactory;
+  private final RhythmProperties properties;
+  private final Logger logger;
+
+  @Autowired
+  public BeatPublisherService(
+      @SuppressWarnings("SpringJavaAutowiringInspection") final DiscoveryClient discoveryClient,
+      @SuppressWarnings("SpringJavaAutowiringInspection") final ApplicationAccessTokenService applicationAccessTokenService,
+      final ApiFactory apiFactory,
+      final RhythmProperties properties,
+      @Qualifier(LOGGER_NAME) final Logger logger) {
+    this.discoveryClient = discoveryClient;
+    this.applicationAccessTokenService = applicationAccessTokenService;
+    this.apiFactory = apiFactory;
+    this.properties = properties;
+    this.logger = logger;
+  }
+
+  /**
+   * Authenticate with identity and publish the beat to the application. This function performs all the internal
+   * interprocess communication in rhythm, and therefore must be mocked in unit and component tests.
+   *
+   * @param beatIdentifier The identifier of the beat as provided when the beat was created.
+   * @param tenantIdentifier The tenant identifier as provided via the tenant header when the beat was created.
+   * @param applicationName The name of the application the beat should be sent to.
+   * @param timestamp The publication time for the beat. If rhythm has been down for a while this could be in the past.
+   *
+   * @return true if the beat was published. false if the beat was not published, or we just don't know.
+   */
+  @SuppressWarnings("WeakerAccess") //Access is public for spying in component test.
+  public boolean publishBeat(
+      final String beatIdentifier,
+      final String tenantIdentifier,
+      final String applicationName,
+      final LocalDateTime timestamp) {
+    final BeatPublish beatPublish = new BeatPublish(beatIdentifier, DateConverter.toIsoString(timestamp));
+    logger.info("Attempting publish {} with timestamp {} under user {}.", beatPublish, timestamp, properties.getUser());
+
+    final List<ServiceInstance> applicationsByName = discoveryClient.getInstances(applicationName);
+    if (applicationsByName.isEmpty())
+      return false;
+
+    final ServiceInstance beatListenerService = applicationsByName.get(0);
+    final BeatListener beatListener = apiFactory.create(BeatListener.class, beatListenerService.getUri().toString());
+
+    final String accessToken = applicationAccessTokenService.getAccessToken(
+        properties.getUser(), getEndointSetIdentifier(applicationName));
+    try (final AutoUserContext ignored2 = new AutoUserContext(properties.getUser(), accessToken);
+         final AutoTenantContext ignored = new AutoTenantContext(tenantIdentifier)) {
+      beatListener.publishBeat(beatPublish);
+      return true;
+    }
+    catch (final Throwable e) {
+      logger.warn("Publishing {} to application {} failed.", beatPublish, applicationName, e);
+      return false; //Any failure is reported as "not published"; the cause is kept in the log.
+    }
+  }
+
+  //Endpoint-set identifier handed to the access token service: "-" becomes "__", suffixed with "__khepri".
+  private static String getEndointSetIdentifier(final String applicationName) {
+    return applicationName.replace("-", "__") + "__khepri";
+  }
+
+  //Runs checkBeatForPublishHelper with publishBeat as the publish step; the helper describes the result.
+  public Optional<LocalDateTime> checkBeatForPublish(
+      final LocalDateTime now,
+      final String beatIdentifier,
+      final String tenantIdentifier,
+      final String applicationName,
+      final Integer alignmentHour,
+      final LocalDateTime nextBeat) {
+    return checkBeatForPublishHelper(now, alignmentHour, nextBeat, x -> publishBeat(beatIdentifier, tenantIdentifier, applicationName, x));
+  }
+
+  //Helper is separated from original function so that it can be unit-tested separately from publishBeat.
+  //Returns empty if nextBeat is still in the future. Otherwise returns the time of the first beat whose
+  //publish failed, or, if every publish succeeded, the next alignment time after now.
+  static Optional<LocalDateTime> checkBeatForPublishHelper(
+      final LocalDateTime now,
+      final Integer alignmentHour,
+      final LocalDateTime nextBeat,
+      final Predicate<LocalDateTime> publishSucceeded) {
+    final long numberOfBeatPublishesNeeded = getNumberOfBeatPublishesNeeded(now, nextBeat);
+    if (numberOfBeatPublishesNeeded == 0)
+      return Optional.empty();
+
+    final Optional<LocalDateTime> firstFailedBeat = Stream.iterate(nextBeat, x -> incrementToAlignment(x, alignmentHour))
+        .limit(numberOfBeatPublishesNeeded)
+        .filter(x -> !publishSucceeded.test(x))
+        .findFirst();
+
+    return Optional.of(firstFailedBeat.orElseGet(() -> incrementToAlignment(now, alignmentHour)));
+  }
+
+  //Zero while nextBeat is after now; otherwise the whole days from nextBeat until now, but at least one.
+  static long getNumberOfBeatPublishesNeeded(final LocalDateTime now, final @Nonnull LocalDateTime nextBeat) {
+    if (nextBeat.isAfter(now))
+      return 0;
+    return Math.max(1, nextBeat.until(now, ChronoUnit.DAYS));
+  }
+
+  //The day after toIncrement, truncated to midnight and then moved forward by alignmentHour hours.
+  static LocalDateTime incrementToAlignment(final LocalDateTime toIncrement, final Integer alignmentHour) {
+    return toIncrement.plusDays(1).truncatedTo(ChronoUnit.DAYS).plusHours(alignmentHour);
+  }
+}
\ No newline at end of file
diff --git a/service/src/main/java/io/mifos/rhythm/service/internal/service/BeatService.java b/service/src/main/java/io/mifos/rhythm/service/internal/service/BeatService.java
index 7a46a45..dfffc12 100644
--- a/service/src/main/java/io/mifos/rhythm/service/internal/service/BeatService.java
+++ b/service/src/main/java/io/mifos/rhythm/service/internal/service/BeatService.java
@@ -38,11 +38,11 @@
this.beatRepository = beatRepository;
}
- public List<Beat> findAllEntities(final String applicationName) {
- return BeatMapper.map(this.beatRepository.findByApplicationName(applicationName));
+ public List<Beat> findAllEntities(final String tenantIdentifier, final String applicationName) {
+ return BeatMapper.map(this.beatRepository.findByTenantIdentifierAndApplicationName(tenantIdentifier, applicationName));
}
- public Optional<Beat> findByIdentifier(final String applicationName, final String identifier) {
- return this.beatRepository.findByApplicationNameAndIdentifier(applicationName, identifier).map(BeatMapper::map);
+ public Optional<Beat> findByIdentifier(final String tenantIdentifier, final String applicationName, final String identifier) {
+ return this.beatRepository.findByTenantIdentifierAndApplicationNameAndBeatIdentifier(tenantIdentifier, applicationName, identifier).map(BeatMapper::map);
}
}
diff --git a/service/src/main/java/io/mifos/rhythm/service/rest/ApplicationRestController.java b/service/src/main/java/io/mifos/rhythm/service/rest/ApplicationRestController.java
index 9f6ef8d..15bbcd0 100644
--- a/service/src/main/java/io/mifos/rhythm/service/rest/ApplicationRestController.java
+++ b/service/src/main/java/io/mifos/rhythm/service/rest/ApplicationRestController.java
@@ -18,18 +18,13 @@
import io.mifos.anubis.annotation.AcceptedTokenType;
import io.mifos.anubis.annotation.Permittable;
import io.mifos.core.command.gateway.CommandGateway;
-import io.mifos.core.lang.ServiceException;
-import io.mifos.rhythm.api.v1.domain.Application;
-import io.mifos.rhythm.service.internal.command.CreateApplicationCommand;
import io.mifos.rhythm.service.internal.command.DeleteApplicationCommand;
-import io.mifos.rhythm.service.internal.service.ApplicationService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
-import javax.validation.Valid;
-import java.util.List;
+import static io.mifos.core.lang.config.TenantHeaderFilter.TENANT_HEADER;
/**
* @author Myrle Krantz
@@ -40,54 +35,11 @@
public class ApplicationRestController {
private final CommandGateway commandGateway;
- private final ApplicationService applicationService;
@Autowired
- public ApplicationRestController(final CommandGateway commandGateway,
- final ApplicationService applicationService) {
+ public ApplicationRestController(final CommandGateway commandGateway) {
super();
this.commandGateway = commandGateway;
- this.applicationService = applicationService;
- }
-
- @Permittable(value = AcceptedTokenType.SYSTEM)
- @RequestMapping(
- method = RequestMethod.GET,
- consumes = MediaType.ALL_VALUE,
- produces = MediaType.APPLICATION_JSON_VALUE
- )
- public
- @ResponseBody
- List<Application> getAllApplications() {
- return this.applicationService.findAllEntities();
- }
-
- @Permittable(value = AcceptedTokenType.SYSTEM)
- @RequestMapping(
- value = "/{applicationname}",
- method = RequestMethod.GET,
- consumes = MediaType.ALL_VALUE,
- produces = MediaType.APPLICATION_JSON_VALUE
- )
- public
- @ResponseBody
- ResponseEntity<Application> getApplication(@PathVariable("applicationname") final String applicationName) {
- return this.applicationService.findByIdentifier(applicationName)
- .map(ResponseEntity::ok)
- .orElseThrow(() -> ServiceException.notFound("Instance with identifier " + applicationName + " doesn't exist."));
- }
-
- @Permittable(value = AcceptedTokenType.SYSTEM)
- @RequestMapping(
- method = RequestMethod.POST,
- consumes = MediaType.APPLICATION_JSON_VALUE,
- produces = MediaType.APPLICATION_JSON_VALUE
- )
- public
- @ResponseBody
- ResponseEntity<Void> createApplication(@RequestBody @Valid final Application instance) throws InterruptedException {
- this.commandGateway.process(new CreateApplicationCommand(instance));
- return ResponseEntity.accepted().build();
}
@Permittable(value = AcceptedTokenType.SYSTEM)
@@ -99,8 +51,10 @@
)
public
@ResponseBody
- ResponseEntity<Void> deleteApplication(@PathVariable("applicationname") final String applicationName) throws InterruptedException {
- this.commandGateway.process(new DeleteApplicationCommand(applicationName));
+ ResponseEntity<Void> deleteApplication(
+ @RequestHeader(TENANT_HEADER) final String tenantIdentifier,
+ @PathVariable("applicationname") final String applicationName) throws InterruptedException {
+ this.commandGateway.process(new DeleteApplicationCommand(tenantIdentifier, applicationName));
return ResponseEntity.accepted().build();
}
}
\ No newline at end of file
diff --git a/service/src/main/java/io/mifos/rhythm/service/rest/BeatRestController.java b/service/src/main/java/io/mifos/rhythm/service/rest/BeatRestController.java
index 4c553d7..06ef2fe 100644
--- a/service/src/main/java/io/mifos/rhythm/service/rest/BeatRestController.java
+++ b/service/src/main/java/io/mifos/rhythm/service/rest/BeatRestController.java
@@ -31,6 +31,8 @@
import javax.validation.Valid;
import java.util.List;
+import static io.mifos.core.lang.config.TenantHeaderFilter.TENANT_HEADER;
+
/**
* @author Myrle Krantz
*/
@@ -58,8 +60,10 @@
)
public
@ResponseBody
- List<Beat> getAllBeatsForApplication(@PathVariable("applicationname") final String applicationName) {
- return this.beatService.findAllEntities(applicationName);
+ List<Beat> getAllBeatsForApplication(
+ @RequestHeader(TENANT_HEADER) final String tenantIdentifier,
+ @PathVariable("applicationname") final String applicationName) {
+ return this.beatService.findAllEntities(tenantIdentifier, applicationName);
}
@Permittable(value = AcceptedTokenType.SYSTEM)
@@ -71,8 +75,11 @@
)
public
@ResponseBody
- ResponseEntity<Beat> getBeat(@PathVariable("applicationname") final String applicationName, @PathVariable("beatidentifier") final String beatIdentifier) {
- return this.beatService.findByIdentifier(applicationName, beatIdentifier)
+ ResponseEntity<Beat> getBeat(
+ @RequestHeader(TENANT_HEADER) final String tenantIdentifier,
+ @PathVariable("applicationname") final String applicationName,
+ @PathVariable("beatidentifier") final String beatIdentifier) {
+ return this.beatService.findByIdentifier(tenantIdentifier, applicationName, beatIdentifier)
.map(ResponseEntity::ok)
.orElseThrow(() -> ServiceException.notFound("Instance with identifier " + applicationName + " doesn't exist."));
}
@@ -85,8 +92,11 @@
)
public
@ResponseBody
- ResponseEntity<Void> createBeat(@PathVariable("applicationname") final String applicationName, @RequestBody @Valid final Beat instance) throws InterruptedException {
- this.commandGateway.process(new CreateBeatCommand(applicationName, instance));
+ ResponseEntity<Void> createBeat(
+ @RequestHeader(TENANT_HEADER) final String tenantIdentifier,
+ @PathVariable("applicationname") final String applicationName,
+ @RequestBody @Valid final Beat instance) throws InterruptedException {
+ this.commandGateway.process(new CreateBeatCommand(tenantIdentifier, applicationName, instance));
return ResponseEntity.accepted().build();
}
@@ -99,8 +109,11 @@
)
public
@ResponseBody
- ResponseEntity<Void> deleteBeat(@PathVariable("applicationname") final String applicationName, @PathVariable("beatidentifier") final String beatIdentifier) throws InterruptedException {
- this.commandGateway.process(new DeleteBeatCommand(applicationName, beatIdentifier));
+ ResponseEntity<Void> deleteBeat(
+ @RequestHeader(TENANT_HEADER) final String tenantIdentifier,
+ @PathVariable("applicationname") final String applicationName,
+ @PathVariable("beatidentifier") final String beatIdentifier) throws InterruptedException {
+ this.commandGateway.process(new DeleteBeatCommand(tenantIdentifier, applicationName, beatIdentifier));
return ResponseEntity.accepted().build();
}
}
\ No newline at end of file
diff --git a/service/src/main/resources/application.yml b/service/src/main/resources/application.yml
index f6be82e..9f1110c 100644
--- a/service/src/main/resources/application.yml
+++ b/service/src/main/resources/application.yml
@@ -64,3 +64,6 @@
flyway:
enabled: false
+
+rhythm:
+ user: scheduler
diff --git a/service/src/main/resources/db/migrations/mariadb/V1__initial_setup.sql b/service/src/main/resources/db/migrations/mariadb/V1__initial_setup.sql
index e2e2968..bd253d9 100644
--- a/service/src/main/resources/db/migrations/mariadb/V1__initial_setup.sql
+++ b/service/src/main/resources/db/migrations/mariadb/V1__initial_setup.sql
@@ -17,18 +17,13 @@
# noinspection SqlNoDataSourceInspectionForFile
-CREATE TABLE khepri_applications (
- id BIGINT NOT NULL AUTO_INCREMENT,
- application_name VARCHAR(64) NOT NULL,
- CONSTRAINT khepri_applications_uq UNIQUE (application_name),
- CONSTRAINT khepri_applications_pk PRIMARY KEY (id)
-);
-
CREATE TABLE khepri_beats (
id BIGINT NOT NULL AUTO_INCREMENT,
- identifier VARCHAR(32) NOT NULL,
- application_name VARCHAR(32) NOT NULL,
+ tenant_identifier VARCHAR(64) NOT NULL,
+ application_name VARCHAR(64) NOT NULL,
+ beat_identifier VARCHAR(32) NOT NULL,
alignment_hour INT NOT NULL,
- CONSTRAINT khepri_beats_uq UNIQUE (identifier, application_name),
+ next_beat TIMESTAMP(3) NOT NULL,
+ CONSTRAINT khepri_beats_uq UNIQUE (tenant_identifier, application_name, beat_identifier),
CONSTRAINT khepri_beats_pk PRIMARY KEY (id)
);
diff --git a/service/src/test/java/io/mifos/rhythm/service/internal/service/BeatPublisherServiceTest.java b/service/src/test/java/io/mifos/rhythm/service/internal/service/BeatPublisherServiceTest.java
new file mode 100644
index 0000000..3d82b75
--- /dev/null
+++ b/service/src/test/java/io/mifos/rhythm/service/internal/service/BeatPublisherServiceTest.java
@@ -0,0 +1,91 @@
+/*
+ * Copyright 2017 The Mifos Initiative.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.mifos.rhythm.service.internal.service;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.temporal.ChronoUnit;
+import java.util.Optional;
+import java.util.function.Predicate;
+
+/**
+ * @author Myrle Krantz
+ */
+public class BeatPublisherServiceTest {
+
+  @Test
+  public void incrementToAlignment() {
+    final LocalDateTime now = LocalDateTime.now(ZoneId.of("UTC"));
+    final LocalDateTime tomorrow = BeatPublisherService.incrementToAlignment(now, 3);
+    // The result must fall on the day after now, at the requested alignment hour.
+    Assert.assertEquals(now.truncatedTo(ChronoUnit.DAYS), tomorrow.minusDays(1).truncatedTo(ChronoUnit.DAYS));
+    Assert.assertEquals(3, tomorrow.getHour());
+  }
+
+  @Test
+  public void getNumberOfBeatPublishesNeeded() {
+    final LocalDateTime now = LocalDateTime.now(ZoneId.of("UTC"));
+    final long eventsNeeded3 = BeatPublisherService.getNumberOfBeatPublishesNeeded(now, now.minus(3, ChronoUnit.DAYS));
+    Assert.assertEquals(3, eventsNeeded3);
+    // A next beat that still lies in the future needs no publish at all.
+    final long eventsNeededFuture = BeatPublisherService.getNumberOfBeatPublishesNeeded(now, now.plus(1, ChronoUnit.DAYS));
+    Assert.assertEquals(0, eventsNeededFuture);
+    // A next beat that became due less than a day ago needs exactly one publish.
+    final long eventsNeededNow = BeatPublisherService.getNumberOfBeatPublishesNeeded(now, now.minus(2, ChronoUnit.MINUTES));
+    Assert.assertEquals(1, eventsNeededNow);
+  }
+
+  @Test
+  public void checkBeatForPublishAllBeatsSucceed() {
+    final LocalDateTime now = LocalDateTime.now(ZoneId.of("UTC"));
+    final Optional<LocalDateTime> ret = BeatPublisherService.checkBeatForPublishHelper(now, 0, now.minus(3, ChronoUnit.DAYS), x -> true);
+    Assert.assertEquals(Optional.of(BeatPublisherService.incrementToAlignment(now, 0)), ret);
+  }
+
+  @Test
+  public void checkBeatForPublishFirstFails() {
+    final LocalDateTime now = LocalDateTime.now(ZoneId.of("UTC"));
+    final LocalDateTime nextBeat = now.minus(3, ChronoUnit.DAYS);
+    @SuppressWarnings("unchecked") final Predicate<LocalDateTime> produceBeatsMock = Mockito.mock(Predicate.class);
+    Mockito.when(produceBeatsMock.test(nextBeat)).thenReturn(false);
+    final Optional<LocalDateTime> ret = BeatPublisherService.checkBeatForPublishHelper(now, 0, nextBeat, produceBeatsMock);
+    Assert.assertEquals(Optional.of(nextBeat), ret);
+  }
+
+  @Test
+  public void checkBeatForPublishSecondFails() {
+    final LocalDateTime now = LocalDateTime.now(ZoneId.of("UTC"));
+    final LocalDateTime nextBeat = now.minus(3, ChronoUnit.DAYS);
+    final LocalDateTime secondBeat = BeatPublisherService.incrementToAlignment(nextBeat, 0);
+    @SuppressWarnings("unchecked") final Predicate<LocalDateTime> produceBeatsMock = Mockito.mock(Predicate.class);
+    Mockito.when(produceBeatsMock.test(nextBeat)).thenReturn(true);
+    Mockito.when(produceBeatsMock.test(secondBeat)).thenReturn(false);
+    final Optional<LocalDateTime> ret = BeatPublisherService.checkBeatForPublishHelper(now, 0, nextBeat, produceBeatsMock);
+    Assert.assertEquals(Optional.of(secondBeat), ret);
+  }
+
+  @Test
+  public void checkBeatForPublishNoneNeeded() {
+    final LocalDateTime now = LocalDateTime.now(ZoneId.of("UTC"));
+    final Optional<LocalDateTime> ret = BeatPublisherService.checkBeatForPublishHelper(now, 0, now.plus(1, ChronoUnit.DAYS),
+        x -> { Assert.fail("Pubish shouldn't be called"); return true; });
+    Assert.assertEquals(Optional.empty(), ret);
+  }
+}
\ No newline at end of file
diff --git a/settings.gradle b/settings.gradle
index 42da305..241f211 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -1,6 +1,7 @@
rootProject.name = 'rhythm'
includeBuild 'api'
+includeBuild 'spi'
includeBuild 'service'
includeBuild 'component-test'
diff --git a/shared.gradle b/shared.gradle
index e4df2e7..4403324 100644
--- a/shared.gradle
+++ b/shared.gradle
@@ -10,6 +10,7 @@
frameworkcommand : '0.1.0-BUILD-SNAPSHOT',
frameworktest: '0.1.0-BUILD-SNAPSHOT',
frameworkanubis: '0.1.0-BUILD-SNAPSHOT',
+ frameworkpermittedfeignclient: '0.1.0-BUILD-SNAPSHOT',
validator : '5.3.0.Final'
]
diff --git a/spi/build.gradle b/spi/build.gradle
new file mode 100644
index 0000000..17c6648
--- /dev/null
+++ b/spi/build.gradle
@@ -0,0 +1,39 @@
+buildscript {
+ repositories {
+ jcenter()
+ }
+
+ dependencies {
+ classpath 'io.spring.gradle:dependency-management-plugin:0.6.0.RELEASE'
+ }
+}
+
+plugins {
+ id "com.github.hierynomus.license" version "0.13.1"
+}
+
+apply from: '../shared.gradle'
+
+dependencies {
+ compile(
+ [group: 'org.springframework.cloud', name: 'spring-cloud-starter-feign'],
+ [group: 'io.mifos.core', name: 'api', version: versions.frameworkapi],
+ [group: 'org.hibernate', name: 'hibernate-validator', version: versions.validator],
+ [group: 'org.hibernate', name: 'hibernate-validator-annotation-processor', version: versions.validator]
+ )
+
+ testCompile(
+ [group: 'io.mifos.core', name: 'test', version: versions.frameworktest],
+ )
+}
+
+publishing {
+ publications {
+ api(MavenPublication) {
+ from components.java
+ groupId project.group
+ artifactId project.name
+ version project.version
+ }
+ }
+}
diff --git a/spi/settings.gradle b/spi/settings.gradle
new file mode 100644
index 0000000..371646b
--- /dev/null
+++ b/spi/settings.gradle
@@ -0,0 +1 @@
+rootProject.name = 'spi'
diff --git a/spi/src/main/java/io/mifos/rhythm/spi/v1/client/BeatListener.java b/spi/src/main/java/io/mifos/rhythm/spi/v1/client/BeatListener.java
new file mode 100644
index 0000000..c94a5d1
--- /dev/null
+++ b/spi/src/main/java/io/mifos/rhythm/spi/v1/client/BeatListener.java
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2017 The Mifos Initiative.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.mifos.rhythm.spi.v1.client;
+
+import io.mifos.rhythm.spi.v1.domain.BeatPublish;
+import org.springframework.cloud.netflix.feign.FeignClient;
+import org.springframework.http.MediaType;
+import org.springframework.web.bind.annotation.PathVariable;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestMethod;
+
+/**
+ * @author Myrle Krantz
+ */
+@SuppressWarnings("unused")
+@FeignClient(path="/beatlistener/v1")
+public interface BeatListener {
+  // POSTs the given beat publication as JSON to the listener's "/publishedbeats" resource.
+  @RequestMapping(
+      value = "/publishedbeats",
+      method = RequestMethod.POST,
+      produces = MediaType.APPLICATION_JSON_VALUE,
+      consumes = MediaType.APPLICATION_JSON_VALUE)
+  void publishBeat(final BeatPublish beatPublish);
+}
diff --git a/spi/src/main/java/io/mifos/rhythm/spi/v1/domain/BeatPublish.java b/spi/src/main/java/io/mifos/rhythm/spi/v1/domain/BeatPublish.java
new file mode 100644
index 0000000..dbd37c0
--- /dev/null
+++ b/spi/src/main/java/io/mifos/rhythm/spi/v1/domain/BeatPublish.java
@@ -0,0 +1,76 @@
+/*
+ * Copyright 2017 The Mifos Initiative.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.mifos.rhythm.spi.v1.domain;
+
+import io.mifos.core.lang.validation.constraints.ValidIdentifier;
+
+import java.util.Objects;
+
+/**
+ * @author Myrle Krantz
+ */
+public class BeatPublish {
+  // Identifier of the beat this publication belongs to.
+  @ValidIdentifier
+  private String identifier;
+
+  // Time the beat is published for, carried as a string.
+  private String forTime;
+
+  public BeatPublish() {
+  }
+
+  public BeatPublish(final String identifier, final String forTime) {
+    this.identifier = identifier;
+    this.forTime = forTime;
+  }
+
+  public String getIdentifier() {
+    return this.identifier;
+  }
+
+  public void setIdentifier(final String identifier) {
+    this.identifier = identifier;
+  }
+
+  public String getForTime() {
+    return this.forTime;
+  }
+
+  public void setForTime(final String forTime) {
+    this.forTime = forTime;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+    final BeatPublish that = (BeatPublish) o;
+    return Objects.equals(identifier, that.identifier) && Objects.equals(forTime, that.forTime);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(identifier, forTime);
+  }
+
+  // Renders both fields, e.g. BeatPublish{identifier='x', forTime='y'}.
+  @Override
+  public String toString() {
+    return "BeatPublish{" + "identifier='" + identifier + '\''
+        + ", forTime='" + forTime + '\'' + '}';
+  }
+}