[SLING-8582] provide some test coverage for Eventing's JobHandler (#6)
* provide some test coverage for JobHandler
* add refactorings to cleanup code
diff --git a/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java b/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java
index a3203fc..1a0ae86 100644
--- a/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java
+++ b/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java
@@ -23,12 +23,12 @@
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
+import java.util.function.BiFunction;
import org.apache.sling.api.resource.ModifiableValueMap;
import org.apache.sling.api.resource.PersistenceException;
import org.apache.sling.api.resource.Resource;
import org.apache.sling.api.resource.ResourceResolver;
-import org.apache.sling.api.resource.ValueMap;
import org.apache.sling.event.impl.jobs.config.JobManagerConfiguration;
import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager.QueueInfo;
import org.apache.sling.event.impl.jobs.config.TopologyCapabilities;
@@ -78,31 +78,23 @@
* Reschedule the job
* Update the retry count and remove the started time.
* @return <code>true</code> if rescheduling was successful, <code>false</code> otherwise.
- */
+ */
public boolean reschedule() {
- final ResourceResolver resolver = this.configuration.createResourceResolver();
- try {
- final Resource jobResource = resolver.getResource(job.getResourcePath());
- if ( jobResource != null ) {
- final ModifiableValueMap mvm = jobResource.adaptTo(ModifiableValueMap.class);
- mvm.put(Job.PROPERTY_JOB_RETRY_COUNT, job.getProperty(Job.PROPERTY_JOB_RETRY_COUNT, Integer.class));
- if ( job.getProperty(Job.PROPERTY_RESULT_MESSAGE) != null ) {
- mvm.put(Job.PROPERTY_RESULT_MESSAGE, job.getProperty(Job.PROPERTY_RESULT_MESSAGE));
- }
- mvm.remove(Job.PROPERTY_JOB_STARTED_TIME);
- mvm.put(JobImpl.PROPERTY_JOB_QUEUED, Calendar.getInstance());
- try {
- resolver.commit();
- return true;
- } catch ( final PersistenceException pe ) {
- this.configuration.getMainLogger().debug("Unable to update reschedule properties for job " + job.getId(), pe);
- }
+ return withJobResource((jobResource,mvm) -> {
+ mvm.put(Job.PROPERTY_JOB_RETRY_COUNT, job.getProperty(Job.PROPERTY_JOB_RETRY_COUNT, Integer.class));
+ if ( job.getProperty(Job.PROPERTY_RESULT_MESSAGE) != null ) {
+ mvm.put(Job.PROPERTY_RESULT_MESSAGE, job.getProperty(Job.PROPERTY_RESULT_MESSAGE));
}
- } finally {
- resolver.close();
- }
-
- return false;
+ mvm.remove(Job.PROPERTY_JOB_STARTED_TIME);
+ mvm.put(JobImpl.PROPERTY_JOB_QUEUED, Calendar.getInstance());
+ try {
+ jobResource.getResourceResolver().commit();
+ return true;
+ } catch ( final PersistenceException pe ) {
+ this.configuration.getMainLogger().debug("Unable to update reschedule properties for job " + job.getId(), pe);
+ }
+ return false;
+ });
}
/**
@@ -110,59 +102,53 @@
* @param state The state of the processing
* @param keepJobInHistory whether to keep the job in the job history.
* @param duration the duration of the processing.
- */
- public void finished(final Job.JobState state,
- final boolean keepJobInHistory,
- final Long duration) {
+ */
+ public void finished(final Job.JobState state, final boolean keepJobInHistory, final Long duration) {
final boolean isSuccess = (state == Job.JobState.SUCCEEDED);
- final ResourceResolver resolver = this.configuration.createResourceResolver();
- try {
- final Resource jobResource = resolver.getResource(job.getResourcePath());
- if ( jobResource != null ) {
- try {
- String newPath = null;
- if ( keepJobInHistory ) {
- final ValueMap vm = ResourceHelper.getValueMap(jobResource);
- newPath = this.configuration.getStoragePath(job.getTopic(), job.getId(), isSuccess);
- final Map<String, Object> props = new HashMap<>(vm);
- props.put(JobImpl.PROPERTY_FINISHED_STATE, state.name());
- if ( isSuccess ) {
- // we set the finish date to start date + duration
- final Date finishDate = new Date();
- finishDate.setTime(job.getProcessingStarted().getTime().getTime() + duration);
- final Calendar finishCal = Calendar.getInstance();
- finishCal.setTime(finishDate);
- props.put(JobImpl.PROPERTY_FINISHED_DATE, finishCal);
- } else {
- // current time is good enough
- props.put(JobImpl.PROPERTY_FINISHED_DATE, Calendar.getInstance());
- }
- if ( job.getProperty(Job.PROPERTY_RESULT_MESSAGE) != null ) {
- props.put(Job.PROPERTY_RESULT_MESSAGE, job.getProperty(Job.PROPERTY_RESULT_MESSAGE));
- }
- ResourceHelper.getOrCreateResource(resolver, newPath, props);
+ withJobResource((jobResource,mvm) -> {
+ try {
+ ResourceResolver rr = jobResource.getResourceResolver();
+ String newPath = null;
+ if (keepJobInHistory) {
+ newPath = this.configuration.getStoragePath(job.getTopic(), job.getId(), isSuccess);
+ final Map<String, Object> props = new HashMap<>(mvm);
+ props.put(JobImpl.PROPERTY_FINISHED_STATE, state.name());
+ if (isSuccess) {
+ // we set the finish date to start date + duration
+ final Date finishDate = new Date();
+ finishDate.setTime(job.getProcessingStarted().getTime().getTime() + duration);
+ final Calendar finishCal = Calendar.getInstance();
+ finishCal.setTime(finishDate);
+ props.put(JobImpl.PROPERTY_FINISHED_DATE, finishCal);
+ } else {
+ // current time is good enough
+ props.put(JobImpl.PROPERTY_FINISHED_DATE, Calendar.getInstance());
}
- resolver.delete(jobResource);
- resolver.commit();
-
- if ( keepJobInHistory && configuration.getMainLogger().isDebugEnabled() ) {
- if ( isSuccess ) {
- configuration.getMainLogger().debug("Kept successful job {} at {}", Utility.toString(job), newPath);
- } else {
- configuration.getMainLogger().debug("Moved cancelled job {} to {}", Utility.toString(job), newPath);
- }
+ if (job.getProperty(Job.PROPERTY_RESULT_MESSAGE) != null) {
+ props.put(Job.PROPERTY_RESULT_MESSAGE, job.getProperty(Job.PROPERTY_RESULT_MESSAGE));
}
- } catch ( final PersistenceException pe ) {
- this.configuration.getMainLogger().warn("Unable to finish job " + job.getId(), pe);
- } catch (final InstantiationException ie) {
- // something happened with the resource in the meantime
- this.configuration.getMainLogger().debug("Unable to instantiate job", ie);
+ ResourceHelper.getOrCreateResource(rr, newPath, props);
}
+ rr.delete(jobResource);
+ rr.commit();
+
+ if (keepJobInHistory && configuration.getMainLogger().isDebugEnabled()) {
+ if (isSuccess) {
+ configuration.getMainLogger().debug("Kept successful job {} at {}", Utility.toString(job),
+ newPath);
+ } else {
+ configuration.getMainLogger().debug("Moved cancelled job {} to {}", Utility.toString(job),
+ newPath);
+ }
+ }
+ } catch (final PersistenceException pe) {
+ this.configuration.getMainLogger().warn("Unable to finish job " + job.getId(), pe);
}
- } finally {
- resolver.close();
- }
+ return false; // this return value is ignored
+ });
}
+
+
/**
* Reassign to a new instance.
@@ -171,80 +157,65 @@
final QueueInfo queueInfo = this.configuration.getQueueConfigurationManager().getQueueInfo(job.getTopic());
// Sanity check if queue configuration has changed
final TopologyCapabilities caps = this.configuration.getTopologyCapabilities();
- final String targetId = (caps == null ? null : caps.detectTarget(job.getTopic(), job.getProperties(), queueInfo));
+ final String targetId = (caps == null ? null
+ : caps.detectTarget(job.getTopic(), job.getProperties(), queueInfo));
- final ResourceResolver resolver = this.configuration.createResourceResolver();
- try {
- final Resource jobResource = resolver.getResource(job.getResourcePath());
- if ( jobResource != null ) {
- try {
- final ValueMap vm = ResourceHelper.getValueMap(jobResource);
- final String newPath = this.configuration.getUniquePath(targetId, job.getTopic(), job.getId(), job.getProperties());
+ withJobResource((jobResource, mvm) -> {
+ final String newPath = this.configuration.getUniquePath(targetId, job.getTopic(), job.getId(),
+ job.getProperties());
- final Map<String, Object> props = new HashMap<>(vm);
- props.remove(Job.PROPERTY_JOB_QUEUE_NAME);
- if ( targetId == null ) {
- props.remove(Job.PROPERTY_JOB_TARGET_INSTANCE);
- } else {
- props.put(Job.PROPERTY_JOB_TARGET_INSTANCE, targetId);
- }
- props.remove(Job.PROPERTY_JOB_STARTED_TIME);
-
- try {
- ResourceHelper.getOrCreateResource(resolver, newPath, props);
- resolver.delete(jobResource);
- resolver.commit();
- } catch ( final PersistenceException pe ) {
- this.configuration.getMainLogger().warn("Unable to reassign job " + job.getId(), pe);
- }
- } catch (final InstantiationException ie) {
- // something happened with the resource in the meantime
- this.configuration.getMainLogger().debug("Unable to instantiate job", ie);
- }
+ final Map<String, Object> props = new HashMap<>(mvm);
+ props.remove(Job.PROPERTY_JOB_QUEUE_NAME);
+ if (targetId == null) {
+ props.remove(Job.PROPERTY_JOB_TARGET_INSTANCE);
+ } else {
+ props.put(Job.PROPERTY_JOB_TARGET_INSTANCE, targetId);
}
- } finally {
- resolver.close();
- }
+ props.remove(Job.PROPERTY_JOB_STARTED_TIME);
+
+ try {
+ ResourceResolver r = jobResource.getResourceResolver();
+ ResourceHelper.getOrCreateResource(r, newPath, props);
+ r.delete(jobResource);
+ r.commit();
+ } catch (final PersistenceException pe) {
+ this.configuration.getMainLogger().warn("Unable to reassign job " + job.getId(), pe);
+ }
+ return true; // this return value is ignored
+ });
}
/**
* Update the property of a job in the resource tree
* @param propNames the property names to update
* @return {@code true} if the update was successful.
- */
+ */
public boolean persistJobProperties(final String... propNames) {
- if ( propNames != null ) {
- final ResourceResolver resolver = this.configuration.createResourceResolver();
- try {
- final Resource jobResource = resolver.getResource(job.getResourcePath());
- if ( jobResource != null ) {
- final ModifiableValueMap mvm = jobResource.adaptTo(ModifiableValueMap.class);
- for(final String propName : propNames) {
- final Object val = job.getProperty(propName);
- if ( val != null ) {
- if ( val.getClass().isEnum() ) {
- mvm.put(propName, val.toString());
- } else {
- mvm.put(propName, val);
- }
- } else {
- mvm.remove(propName);
- }
+ if (propNames == null) {
+ return true;
+ }
+ return withJobResource((jobResource,mvm) -> {
+ for(final String propName : propNames) {
+ final Object val = job.getProperty(propName);
+ if ( val != null ) {
+ if ( val.getClass().isEnum() ) {
+ mvm.put(propName, val.toString());
+ } else {
+ mvm.put(propName, val);
}
- resolver.commit();
-
- return true;
} else {
- this.configuration.getMainLogger().debug("No job resource found at {}", job.getResourcePath());
+ mvm.remove(propName);
}
- } catch ( final PersistenceException ignore ) {
+ }
+ try {
+ jobResource.getResourceResolver().commit();
+ return true;
+ } catch (PersistenceException ignore) {
this.configuration.getMainLogger().debug("Unable to persist properties", ignore);
- } finally {
- resolver.close();
}
return false;
- }
- return true;
+ });
+
}
public boolean isStopped() {
@@ -281,4 +252,34 @@
public String toString() {
return "JobHandler(" + this.job.getId() + ")";
}
+
+ /**
+ * Helper method to execute a function on the job resource. Performs all necessary checks and validations
+ * so that the function does not need to perform any null checks and such.
+ *
+ * The second parameter is a ModifiableValueMap (non-null) adapted from the JobResource,
+ * so both a read-only and a read/write case can be implemented.
+ *
+ * @param func the function to execute
+ * @return the status (which is passed thru from func
+ */
+ private boolean withJobResource (BiFunction<Resource,ModifiableValueMap,Boolean> func) {
+ try (ResourceResolver resolver = this.configuration.createResourceResolver()) {
+ Resource jobResource = resolver.getResource(job.getResourcePath());
+ if (jobResource == null) {
+ this.configuration.getMainLogger().debug("No job resource found at {}", job.getResourcePath());
+ return false;
+ }
+ // This implicitly assumes that the ResourceResolver provided by the configuration allows
+ // r/w access to the jobResource.
+ ModifiableValueMap mvm = jobResource.adaptTo(ModifiableValueMap.class);
+ if (mvm == null){
+ this.configuration.getMainLogger().debug("Cannot adapt resource {} to ModifiableValueMap, no write permissions?", job.getResourcePath());
+ return false;
+ }
+ return func.apply(jobResource,mvm);
+ }
+
+ }
+
}
\ No newline at end of file
diff --git a/src/main/java/org/apache/sling/event/impl/jobs/config/JobManagerConfiguration.java b/src/main/java/org/apache/sling/event/impl/jobs/config/JobManagerConfiguration.java
index 71db186..1c267f6 100644
--- a/src/main/java/org/apache/sling/event/impl/jobs/config/JobManagerConfiguration.java
+++ b/src/main/java/org/apache/sling/event/impl/jobs/config/JobManagerConfiguration.java
@@ -287,6 +287,9 @@
/**
* Create a new resource resolver for reading and writing the resource tree.
* The resolver needs to be closed by the client.
+ * This ResourceResolver provides read and write access to all resources relevant for the event
+ * and job handling.
+ *
* @return A resource resolver or {@code null} if the component is already deactivated.
* @throws RuntimeException if the resolver can't be created.
*/
@@ -361,11 +364,11 @@
final String topicName = topic.replace('/', '.');
final StringBuilder sb = new StringBuilder();
if ( targetId != null ) {
- sb.append(this.assignedJobsPath);
+ sb.append(this.getAssginedJobsPath());
sb.append('/');
sb.append(targetId);
} else {
- sb.append(this.unassignedJobsPath);
+ sb.append(this.getUnassignedJobsPath());
}
sb.append('/');
sb.append(topicName);
@@ -441,9 +444,9 @@
final String topicName = topic.replace('/', '.');
final StringBuilder sb = new StringBuilder();
if ( isSuccess ) {
- sb.append(this.storedSuccessfulJobsPath);
+ sb.append(this.getStoredSuccessfulJobsPath());
} else {
- sb.append(this.storedCancelledJobsPath);
+ sb.append(this.getStoredCancelledJobsPath());
}
sb.append('/');
sb.append(topicName);
diff --git a/src/test/java/org/apache/sling/event/impl/jobs/JobHandlerTest.java b/src/test/java/org/apache/sling/event/impl/jobs/JobHandlerTest.java
new file mode 100644
index 0000000..bebc711
--- /dev/null
+++ b/src/test/java/org/apache/sling/event/impl/jobs/JobHandlerTest.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.jobs;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+import java.util.Calendar;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.sling.api.resource.LoginException;
+import org.apache.sling.api.resource.Resource;
+import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.api.resource.ResourceResolverFactory;
+import org.apache.sling.api.resource.ValueMap;
+import org.apache.sling.event.impl.jobs.config.JobManagerConfiguration;
+import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager;
+import org.apache.sling.event.impl.jobs.config.TopologyCapabilities;
+import org.apache.sling.event.impl.support.Environment;
+import org.apache.sling.event.jobs.Job;
+import org.apache.sling.event.jobs.consumer.JobExecutor;
+import org.apache.sling.serviceusermapping.ServiceUserMapped;
+import org.apache.sling.testing.mock.sling.junit.SlingContext;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.ArgumentMatchers;
+import org.mockito.Mockito;
+
+public class JobHandlerTest {
+ public static final String JOB_TOPIC = "job/topic";
+ public static final String JOB_ID = "12345";
+ public static final String JOB_PATH = "/var/eventing/jobs/path/to/my/job";
+
+ @Rule
+ public SlingContext context = new SlingContext();
+
+ ResourceResolver resolver; // a spy to the context.resourceResolver()
+
+ JobManagerConfiguration config;
+ JobImpl job;
+ JobHandler handler;
+
+ @Before
+ public void setup() throws Exception {
+ Environment.APPLICATION_ID = "slingId"; // oh, really hacky ...
+
+ config = buildConfiguration();
+ job = buildJobImpl();
+ handler = new JobHandler (job, buildJobExecutor(), config);
+ }
+
+ @Test
+ public void rescheduleWithNonExistingJobPath() throws Exception {
+
+ // resolver.commit is used internally a few times, reset this spy before we actually start testing
+ Mockito.reset(resolver);
+ assertFalse(handler.reschedule());
+ Mockito.verify(resolver, Mockito.never()).commit();
+ }
+
+ @Test
+ public void rescheduleWithExistingJobPath() throws Exception {
+ context.build().resource(JOB_PATH, Collections.emptyMap()).commit();
+ // resolver.commit is used internally a few times, reset this spy before we actually start testing
+ Mockito.reset(resolver);
+ assertTrue(handler.reschedule());
+ Mockito.verify(resolver, Mockito.times(1)).commit();
+ }
+
+ @Test
+ public void finishedKeepHistory () throws Exception {
+ context.build().resource(JOB_PATH, Collections.emptyMap()).commit();
+ // resolver.commit is used internally a few times, reset this spy before we actually start testing
+ Mockito.reset(resolver);
+
+ Job.JobState state = Job.JobState.SUCCEEDED;
+ handler.finished(state,true, 1000L);
+
+ // check that the jobpath has been deleted
+ ArgumentCaptor<Resource> captor = ArgumentCaptor.forClass(Resource.class);
+ Mockito.verify (resolver,Mockito.atLeast(1)).delete(captor.capture());
+ assertEquals(JOB_PATH,captor.getValue().getPath());
+
+ // check that the history resource is present
+ String historyPath = config.getStoragePath(job.getTopic(), job.getId(), true);
+ assertNotNull(resolver.getResource(historyPath));
+ }
+
+ @Test
+ public void finishedDropHistory () throws Exception {
+ context.build().resource(JOB_PATH, Collections.emptyMap()).commit();
+ // resolver.commit is used internally a few times, reset this spy before we actually start testing
+ Mockito.reset(resolver);
+ Job.JobState state = Job.JobState.SUCCEEDED;
+ handler.finished(state,false, 1000L);
+
+ // check that the jobpath has been deleted
+ ArgumentCaptor<Resource> captor = ArgumentCaptor.forClass(Resource.class);
+ Mockito.verify (resolver,Mockito.atLeast(1)).delete(captor.capture());
+ assertEquals(JOB_PATH,captor.getValue().getPath());
+
+ // check that the history resource is not present
+ String historyPath = config.getStoragePath(job.getTopic(), job.getId(), true);
+ assertNull(resolver.getResource(historyPath));
+ }
+
+ @Test
+ public void reassign () throws Exception {
+ context.build().resource(JOB_PATH, Collections.emptyMap()).commit();
+ // resolver.commit is used internally a few times, reset this spy before we actually start testing
+ Mockito.reset(resolver);
+ handler.reassign();
+
+ // check that the old jobpath has been deleted
+ ArgumentCaptor<Resource> captor = ArgumentCaptor.forClass(Resource.class);
+ Mockito.verify (resolver,Mockito.atLeast(1)).delete(captor.capture());
+ assertEquals(JOB_PATH,captor.getValue().getPath());
+
+ // check that the reassigned job is present
+ String newJobPath = config.getUniquePath("targetId", job.getTopic(), job.getId(), job.getProperties());
+ assertNotNull(resolver.getResource(newJobPath));
+ }
+
+ @Test
+ public void persistJobPropertiesTest() {
+
+ Map<String,Object> props = new HashMap<>();
+ props.put("prop1", "value1");
+ props.put("toBeUpdated", "value2");
+ props.put("toBeRemoved","value3");
+
+ context.build().resource(JOB_PATH, props).commit();
+ assertTrue(handler.persistJobProperties());
+ assertTrue(handler.persistJobProperties(null));
+
+ assertTrue(handler.persistJobProperties("toBeUpdated","toBeRemoved"));
+ Resource jobResource = resolver.getResource(JOB_PATH);
+ assertNotNull(jobResource);
+ ValueMap vm = jobResource.adaptTo(ValueMap.class);
+ assertNotNull(vm);
+
+ assertEquals("value1",vm.get("prop1"));
+ assertEquals("updatedValue2",vm.get("toBeUpdated"));
+ assertNull(vm.get("toBeRemoved"));
+ }
+
+ // supporting methods
+
+ private JobManagerConfiguration buildConfiguration () throws LoginException {
+
+ JobManagerConfiguration originalConfiguration = new JobManagerConfiguration ();
+ JobManagerConfiguration configuration = spy (originalConfiguration);
+ ResourceResolverFactory rrf = context.getService(ResourceResolverFactory.class);
+
+ // we don't care what type of resolver we use for these tests
+ resolver = spy (rrf.getAdministrativeResourceResolver(null));
+
+ when (configuration.createResourceResolver()).thenReturn(resolver);
+
+ // these are mocked because it's easier than invoking the activate method
+ when (configuration.getJobsBasePathWithSlash()).thenReturn("/var/events/");
+ when (configuration.getStoredCancelledJobsPath()).thenReturn("/var/events/cancelled");
+ when (configuration.getStoredSuccessfulJobsPath()).thenReturn("/var/events/finished");
+ when (configuration.getAssginedJobsPath()).thenReturn("/var/events/assigned");
+ when (configuration.getUnassignedJobsPath()).thenReturn("/var/events/unassigned");
+
+ // just simple mocks for all dependencies for the JobManagerConfiguration and provide
+ // stubs when required
+ ServiceUserMapped sum = mock (ServiceUserMapped.class);
+ context.registerService(ServiceUserMapped.class,sum);
+
+ QueueConfigurationManager qcm = mock (QueueConfigurationManager.class);
+ when (qcm.getQueueInfo(Mockito.anyString())).thenReturn(null);
+
+ TopologyCapabilities caps = mock (TopologyCapabilities.class);
+ when(caps.detectTarget(Mockito.matches(JOB_TOPIC), ArgumentMatchers.<Map<String,Object>>any(),
+ ArgumentMatchers.<QueueConfigurationManager.QueueInfo>any())).thenReturn("targetId");
+ when(configuration.getTopologyCapabilities()).thenReturn(caps);
+ when(configuration.getQueueConfigurationManager()).thenReturn(qcm);
+
+ return configuration;
+ }
+
+ private JobExecutor buildJobExecutor() {
+ return mock (JobExecutor.class);
+ }
+
+ private JobImpl buildJobImpl() {
+
+ Map<String,Object> props = new HashMap<>();
+ props.put(JobImpl.PROPERTY_RESOURCE_PATH, JOB_PATH);
+ props.put(Job.PROPERTY_RESULT_MESSAGE,"result message");
+ props.put(Job.PROPERTY_JOB_STARTED_TIME, Calendar.getInstance());
+
+ props.put("toBeUpdated", "updatedValue2");
+ props.put("toBeRemoved",null);
+ JobImpl job = new JobImpl(JOB_TOPIC,JOB_ID, props);;
+ return job;
+ }
+}