[SLING-8582] provide some test coverage for Eventing's JobHandler (#6)

* provide some test coverage for JobHandler
* add refactorings to cleanup code
diff --git a/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java b/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java
index a3203fc..1a0ae86 100644
--- a/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java
+++ b/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java
@@ -23,12 +23,12 @@
 import java.util.Date;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.function.BiFunction;
 
 import org.apache.sling.api.resource.ModifiableValueMap;
 import org.apache.sling.api.resource.PersistenceException;
 import org.apache.sling.api.resource.Resource;
 import org.apache.sling.api.resource.ResourceResolver;
-import org.apache.sling.api.resource.ValueMap;
 import org.apache.sling.event.impl.jobs.config.JobManagerConfiguration;
 import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager.QueueInfo;
 import org.apache.sling.event.impl.jobs.config.TopologyCapabilities;
@@ -78,31 +78,23 @@
      * Reschedule the job
      * Update the retry count and remove the started time.
      * @return <code>true</code> if rescheduling was successful, <code>false</code> otherwise.
-     */
+     */    
     public boolean reschedule() {
-        final ResourceResolver resolver = this.configuration.createResourceResolver();
-        try {
-            final Resource jobResource = resolver.getResource(job.getResourcePath());
-            if ( jobResource != null ) {
-                final ModifiableValueMap mvm = jobResource.adaptTo(ModifiableValueMap.class);
-                mvm.put(Job.PROPERTY_JOB_RETRY_COUNT, job.getProperty(Job.PROPERTY_JOB_RETRY_COUNT, Integer.class));
-                if ( job.getProperty(Job.PROPERTY_RESULT_MESSAGE) != null ) {
-                    mvm.put(Job.PROPERTY_RESULT_MESSAGE, job.getProperty(Job.PROPERTY_RESULT_MESSAGE));
-                }
-                mvm.remove(Job.PROPERTY_JOB_STARTED_TIME);
-                mvm.put(JobImpl.PROPERTY_JOB_QUEUED, Calendar.getInstance());
-                try {
-                    resolver.commit();
-                    return true;
-                } catch ( final PersistenceException pe ) {
-                    this.configuration.getMainLogger().debug("Unable to update reschedule properties for job " + job.getId(), pe);
-                }
+        return withJobResource((jobResource,mvm) -> {
+            mvm.put(Job.PROPERTY_JOB_RETRY_COUNT, job.getProperty(Job.PROPERTY_JOB_RETRY_COUNT, Integer.class));
+            if ( job.getProperty(Job.PROPERTY_RESULT_MESSAGE) != null ) {
+                mvm.put(Job.PROPERTY_RESULT_MESSAGE, job.getProperty(Job.PROPERTY_RESULT_MESSAGE));
             }
-        } finally {
-            resolver.close();
-        }
-
-        return false;
+            mvm.remove(Job.PROPERTY_JOB_STARTED_TIME);
+            mvm.put(JobImpl.PROPERTY_JOB_QUEUED, Calendar.getInstance());
+            try {
+                jobResource.getResourceResolver().commit();
+                return true;
+            } catch ( final PersistenceException pe ) {
+                this.configuration.getMainLogger().debug("Unable to update reschedule properties for job " + job.getId(), pe);
+            }
+            return false;
+        });
     }
 
     /**
@@ -110,59 +102,53 @@
      * @param state The state of the processing
      * @param keepJobInHistory whether to keep the job in the job history.
      * @param duration the duration of the processing.
-     */
-    public void finished(final Job.JobState state,
-                          final boolean keepJobInHistory,
-                          final Long duration) {
+     */    
+    public void finished(final Job.JobState state, final boolean keepJobInHistory, final Long duration) {
         final boolean isSuccess = (state == Job.JobState.SUCCEEDED);
-        final ResourceResolver resolver = this.configuration.createResourceResolver();
-        try {
-            final Resource jobResource = resolver.getResource(job.getResourcePath());
-            if ( jobResource != null ) {
-                try {
-                    String newPath = null;
-                    if ( keepJobInHistory ) {
-                        final ValueMap vm = ResourceHelper.getValueMap(jobResource);
-                        newPath = this.configuration.getStoragePath(job.getTopic(), job.getId(), isSuccess);
-                        final Map<String, Object> props = new HashMap<>(vm);
-                        props.put(JobImpl.PROPERTY_FINISHED_STATE, state.name());
-                        if ( isSuccess ) {
-                            // we set the finish date to start date + duration
-                            final Date finishDate = new Date();
-                            finishDate.setTime(job.getProcessingStarted().getTime().getTime() + duration);
-                            final Calendar finishCal = Calendar.getInstance();
-                            finishCal.setTime(finishDate);
-                            props.put(JobImpl.PROPERTY_FINISHED_DATE, finishCal);
-                        } else {
-                            // current time is good enough
-                            props.put(JobImpl.PROPERTY_FINISHED_DATE, Calendar.getInstance());
-                        }
-                        if ( job.getProperty(Job.PROPERTY_RESULT_MESSAGE) != null ) {
-                            props.put(Job.PROPERTY_RESULT_MESSAGE, job.getProperty(Job.PROPERTY_RESULT_MESSAGE));
-                        }
-                        ResourceHelper.getOrCreateResource(resolver, newPath, props);
+        withJobResource((jobResource,mvm) -> {
+            try {
+                ResourceResolver rr = jobResource.getResourceResolver();
+                String newPath = null;
+                if (keepJobInHistory) {
+                    newPath = this.configuration.getStoragePath(job.getTopic(), job.getId(), isSuccess);
+                    final Map<String, Object> props = new HashMap<>(mvm);
+                    props.put(JobImpl.PROPERTY_FINISHED_STATE, state.name());
+                    if (isSuccess) {
+                        // we set the finish date to start date + duration
+                        final Date finishDate = new Date();
+                        finishDate.setTime(job.getProcessingStarted().getTime().getTime() + duration);
+                        final Calendar finishCal = Calendar.getInstance();
+                        finishCal.setTime(finishDate);
+                        props.put(JobImpl.PROPERTY_FINISHED_DATE, finishCal);
+                    } else {
+                        // current time is good enough
+                        props.put(JobImpl.PROPERTY_FINISHED_DATE, Calendar.getInstance());
                     }
-                    resolver.delete(jobResource);
-                    resolver.commit();
-
-                    if ( keepJobInHistory && configuration.getMainLogger().isDebugEnabled() ) {
-                        if ( isSuccess ) {
-                            configuration.getMainLogger().debug("Kept successful job {} at {}", Utility.toString(job), newPath);
-                        } else {
-                            configuration.getMainLogger().debug("Moved cancelled job {} to {}", Utility.toString(job), newPath);
-                        }
+                    if (job.getProperty(Job.PROPERTY_RESULT_MESSAGE) != null) {
+                        props.put(Job.PROPERTY_RESULT_MESSAGE, job.getProperty(Job.PROPERTY_RESULT_MESSAGE));
                     }
-                } catch ( final PersistenceException pe ) {
-                    this.configuration.getMainLogger().warn("Unable to finish job " + job.getId(), pe);
-                } catch (final InstantiationException ie) {
-                    // something happened with the resource in the meantime
-                    this.configuration.getMainLogger().debug("Unable to instantiate job", ie);
+                    ResourceHelper.getOrCreateResource(rr, newPath, props);
                 }
+                rr.delete(jobResource);
+                rr.commit();
+
+                if (keepJobInHistory && configuration.getMainLogger().isDebugEnabled()) {
+                    if (isSuccess) {
+                        configuration.getMainLogger().debug("Kept successful job {} at {}", Utility.toString(job),
+                                newPath);
+                    } else {
+                        configuration.getMainLogger().debug("Moved cancelled job {} to {}", Utility.toString(job),
+                                newPath);
+                    }
+                }
+            } catch (final PersistenceException pe) {
+                this.configuration.getMainLogger().warn("Unable to finish job " + job.getId(), pe);
             }
-        } finally {
-            resolver.close();
-        }
+            return false; // this return value is ignored
+        });
     }
+    
+    
 
     /**
      * Reassign to a new instance.
@@ -171,80 +157,65 @@
         final QueueInfo queueInfo = this.configuration.getQueueConfigurationManager().getQueueInfo(job.getTopic());
         // Sanity check if queue configuration has changed
         final TopologyCapabilities caps = this.configuration.getTopologyCapabilities();
-        final String targetId = (caps == null ? null : caps.detectTarget(job.getTopic(), job.getProperties(), queueInfo));
+        final String targetId = (caps == null ? null
+                : caps.detectTarget(job.getTopic(), job.getProperties(), queueInfo));
 
-        final ResourceResolver resolver = this.configuration.createResourceResolver();
-        try {
-            final Resource jobResource = resolver.getResource(job.getResourcePath());
-            if ( jobResource != null ) {
-                try {
-                    final ValueMap vm = ResourceHelper.getValueMap(jobResource);
-                    final String newPath = this.configuration.getUniquePath(targetId, job.getTopic(), job.getId(), job.getProperties());
+        withJobResource((jobResource, mvm) -> {
+            final String newPath = this.configuration.getUniquePath(targetId, job.getTopic(), job.getId(),
+                    job.getProperties());
 
-                    final Map<String, Object> props = new HashMap<>(vm);
-                    props.remove(Job.PROPERTY_JOB_QUEUE_NAME);
-                    if ( targetId == null ) {
-                        props.remove(Job.PROPERTY_JOB_TARGET_INSTANCE);
-                    } else {
-                        props.put(Job.PROPERTY_JOB_TARGET_INSTANCE, targetId);
-                    }
-                    props.remove(Job.PROPERTY_JOB_STARTED_TIME);
-
-                    try {
-                        ResourceHelper.getOrCreateResource(resolver, newPath, props);
-                        resolver.delete(jobResource);
-                        resolver.commit();
-                    } catch ( final PersistenceException pe ) {
-                        this.configuration.getMainLogger().warn("Unable to reassign job " + job.getId(), pe);
-                    }
-                } catch (final InstantiationException ie) {
-                    // something happened with the resource in the meantime
-                    this.configuration.getMainLogger().debug("Unable to instantiate job", ie);
-                }
+            final Map<String, Object> props = new HashMap<>(mvm);
+            props.remove(Job.PROPERTY_JOB_QUEUE_NAME);
+            if (targetId == null) {
+                props.remove(Job.PROPERTY_JOB_TARGET_INSTANCE);
+            } else {
+                props.put(Job.PROPERTY_JOB_TARGET_INSTANCE, targetId);
             }
-        } finally {
-            resolver.close();
-        }
+            props.remove(Job.PROPERTY_JOB_STARTED_TIME);
+
+            try {
+                ResourceResolver r = jobResource.getResourceResolver();
+                ResourceHelper.getOrCreateResource(r, newPath, props);
+                r.delete(jobResource);
+                r.commit();
+            } catch (final PersistenceException pe) {
+                this.configuration.getMainLogger().warn("Unable to reassign job " + job.getId(), pe);
+            }
+            return true; // this return value is ignored
+        });
     }
 
     /**
      * Update the property of a job in the resource tree
      * @param propNames the property names to update
      * @return {@code true} if the update was successful.
-     */
+     */    
     public boolean persistJobProperties(final String... propNames) {
-        if ( propNames != null ) {
-            final ResourceResolver resolver = this.configuration.createResourceResolver();
-            try {
-                final Resource jobResource = resolver.getResource(job.getResourcePath());
-                if ( jobResource != null ) {
-                    final ModifiableValueMap mvm = jobResource.adaptTo(ModifiableValueMap.class);
-                    for(final String propName : propNames) {
-                        final Object val = job.getProperty(propName);
-                        if ( val != null ) {
-                            if ( val.getClass().isEnum() ) {
-                                mvm.put(propName, val.toString());
-                            } else {
-                                mvm.put(propName, val);
-                            }
-                        } else {
-                            mvm.remove(propName);
-                        }
+        if (propNames == null) {
+            return true;
+        }
+        return withJobResource((jobResource,mvm) -> {
+            for(final String propName : propNames) {
+                final Object val = job.getProperty(propName);
+                if ( val != null ) {
+                    if ( val.getClass().isEnum() ) {
+                        mvm.put(propName, val.toString());
+                    } else {
+                        mvm.put(propName, val);
                     }
-                    resolver.commit();
-
-                    return true;
                 } else {
-                    this.configuration.getMainLogger().debug("No job resource found at {}", job.getResourcePath());
+                    mvm.remove(propName);
                 }
-            } catch ( final PersistenceException ignore ) {
+            }
+            try {
+                jobResource.getResourceResolver().commit();
+                return true;
+            } catch (PersistenceException ignore) {
                 this.configuration.getMainLogger().debug("Unable to persist properties", ignore);
-            } finally {
-                resolver.close();
             }
             return false;
-        }
-        return true;
+        });
+        
     }
 
     public boolean isStopped() {
@@ -281,4 +252,34 @@
     public String toString() {
         return "JobHandler(" + this.job.getId() + ")";
     }
+    
+    /**
+     * Helper method to execute a function on the job resource. Performs all necessary checks and validations
+     * so that the function does not need to perform any null checks and such.
+     * 
+     * The second parameter is a ModifiableValueMap (non-null) adapted from the JobResource,
+     * so both a read-only and a read/write case can be implemented.
+     *
+     * @param func the function to execute
+     * @return the status (which is passed thru from func
+     */
+    private boolean withJobResource (BiFunction<Resource,ModifiableValueMap,Boolean> func) {
+        try (ResourceResolver resolver = this.configuration.createResourceResolver()) {
+            Resource jobResource = resolver.getResource(job.getResourcePath());
+            if (jobResource == null) {
+                this.configuration.getMainLogger().debug("No job resource found at {}", job.getResourcePath());
+                return false;  
+            }
+            // This implicitly assumes that the ResourceResolver provided by the configuration allows 
+            // r/w access to the jobResource.
+            ModifiableValueMap mvm = jobResource.adaptTo(ModifiableValueMap.class);
+            if (mvm == null){
+                this.configuration.getMainLogger().debug("Cannot adapt resource {} to ModifiableValueMap, no write permissions?", job.getResourcePath());
+                return false;  
+            }
+            return func.apply(jobResource,mvm);
+        }
+        
+    }
+    
 }
\ No newline at end of file
diff --git a/src/main/java/org/apache/sling/event/impl/jobs/config/JobManagerConfiguration.java b/src/main/java/org/apache/sling/event/impl/jobs/config/JobManagerConfiguration.java
index 71db186..1c267f6 100644
--- a/src/main/java/org/apache/sling/event/impl/jobs/config/JobManagerConfiguration.java
+++ b/src/main/java/org/apache/sling/event/impl/jobs/config/JobManagerConfiguration.java
@@ -287,6 +287,9 @@
     /**
      * Create a new resource resolver for reading and writing the resource tree.
      * The resolver needs to be closed by the client.
+     * This ResourceResolver provides read and write access to all resources relevant for the event
+     * and job handling.
+     * 
      * @return A resource resolver or {@code null} if the component is already deactivated.
      * @throws RuntimeException if the resolver can't be created.
      */
@@ -361,11 +364,11 @@
         final String topicName = topic.replace('/', '.');
         final StringBuilder sb = new StringBuilder();
         if ( targetId != null ) {
-            sb.append(this.assignedJobsPath);
+            sb.append(this.getAssginedJobsPath());
             sb.append('/');
             sb.append(targetId);
         } else {
-            sb.append(this.unassignedJobsPath);
+            sb.append(this.getUnassignedJobsPath());
         }
         sb.append('/');
         sb.append(topicName);
@@ -441,9 +444,9 @@
         final String topicName = topic.replace('/', '.');
         final StringBuilder sb = new StringBuilder();
         if ( isSuccess ) {
-            sb.append(this.storedSuccessfulJobsPath);
+            sb.append(this.getStoredSuccessfulJobsPath());
         } else {
-            sb.append(this.storedCancelledJobsPath);
+            sb.append(this.getStoredCancelledJobsPath());
         }
         sb.append('/');
         sb.append(topicName);
diff --git a/src/test/java/org/apache/sling/event/impl/jobs/JobHandlerTest.java b/src/test/java/org/apache/sling/event/impl/jobs/JobHandlerTest.java
new file mode 100644
index 0000000..bebc711
--- /dev/null
+++ b/src/test/java/org/apache/sling/event/impl/jobs/JobHandlerTest.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.jobs;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+import java.util.Calendar;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.sling.api.resource.LoginException;
+import org.apache.sling.api.resource.Resource;
+import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.api.resource.ResourceResolverFactory;
+import org.apache.sling.api.resource.ValueMap;
+import org.apache.sling.event.impl.jobs.config.JobManagerConfiguration;
+import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager;
+import org.apache.sling.event.impl.jobs.config.TopologyCapabilities;
+import org.apache.sling.event.impl.support.Environment;
+import org.apache.sling.event.jobs.Job;
+import org.apache.sling.event.jobs.consumer.JobExecutor;
+import org.apache.sling.serviceusermapping.ServiceUserMapped;
+import org.apache.sling.testing.mock.sling.junit.SlingContext;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.ArgumentMatchers;
+import org.mockito.Mockito;
+
+public class JobHandlerTest {
+    public static final String JOB_TOPIC = "job/topic";
+    public static final String JOB_ID = "12345";
+    public static final String JOB_PATH = "/var/eventing/jobs/path/to/my/job";
+
+    @Rule
+    public SlingContext context = new SlingContext();
+
+    ResourceResolver resolver; // a spy to the context.resourceResolver()
+
+    JobManagerConfiguration config;
+    JobImpl job;
+    JobHandler handler;
+
+    @Before
+    public void setup() throws Exception {
+        Environment.APPLICATION_ID = "slingId"; // oh, really hacky ...
+        
+        config = buildConfiguration();
+        job = buildJobImpl();
+        handler = new JobHandler (job, buildJobExecutor(), config);
+    }
+
+    @Test
+    public void rescheduleWithNonExistingJobPath() throws Exception {
+        
+        // resolver.commit is used internally a few times, reset this spy before we actually start testing
+        Mockito.reset(resolver); 
+        assertFalse(handler.reschedule());
+        Mockito.verify(resolver, Mockito.never()).commit();
+    }
+
+    @Test
+    public void rescheduleWithExistingJobPath() throws Exception {
+        context.build().resource(JOB_PATH, Collections.emptyMap()).commit();
+        // resolver.commit is used internally a few times, reset this spy before we actually start testing
+        Mockito.reset(resolver); 
+        assertTrue(handler.reschedule());
+        Mockito.verify(resolver, Mockito.times(1)).commit();
+    }
+
+    @Test
+    public void finishedKeepHistory () throws Exception {
+        context.build().resource(JOB_PATH, Collections.emptyMap()).commit();
+        // resolver.commit is used internally a few times, reset this spy before we actually start testing
+        Mockito.reset(resolver); 
+
+        Job.JobState state = Job.JobState.SUCCEEDED;
+        handler.finished(state,true, 1000L);
+
+        // check that the jobpath has been deleted
+        ArgumentCaptor<Resource> captor = ArgumentCaptor.forClass(Resource.class);
+        Mockito.verify (resolver,Mockito.atLeast(1)).delete(captor.capture());
+        assertEquals(JOB_PATH,captor.getValue().getPath());
+
+        // check that the history resource is present
+        String historyPath = config.getStoragePath(job.getTopic(), job.getId(), true);
+        assertNotNull(resolver.getResource(historyPath)); 
+    }
+
+    @Test
+    public void finishedDropHistory () throws Exception {
+        context.build().resource(JOB_PATH, Collections.emptyMap()).commit();
+        // resolver.commit is used internally a few times, reset this spy before we actually start testing
+        Mockito.reset(resolver); 
+        Job.JobState state = Job.JobState.SUCCEEDED;
+        handler.finished(state,false, 1000L);
+
+        // check that the jobpath has been deleted
+        ArgumentCaptor<Resource> captor = ArgumentCaptor.forClass(Resource.class);
+        Mockito.verify (resolver,Mockito.atLeast(1)).delete(captor.capture());
+        assertEquals(JOB_PATH,captor.getValue().getPath());
+
+        // check that the history resource is not present
+        String historyPath = config.getStoragePath(job.getTopic(), job.getId(), true);
+        assertNull(resolver.getResource(historyPath)); 
+    }
+
+    @Test
+    public void reassign () throws Exception {
+        context.build().resource(JOB_PATH, Collections.emptyMap()).commit();
+        // resolver.commit is used internally a few times, reset this spy before we actually start testing
+        Mockito.reset(resolver); 
+        handler.reassign();
+
+        // check that the old jobpath has been deleted
+        ArgumentCaptor<Resource> captor = ArgumentCaptor.forClass(Resource.class);
+        Mockito.verify (resolver,Mockito.atLeast(1)).delete(captor.capture());
+        assertEquals(JOB_PATH,captor.getValue().getPath());
+
+        // check that the reassigned job is present
+        String newJobPath = config.getUniquePath("targetId", job.getTopic(), job.getId(), job.getProperties());
+        assertNotNull(resolver.getResource(newJobPath)); 
+    }
+
+    @Test
+    public void persistJobPropertiesTest() {
+        
+        Map<String,Object> props = new HashMap<>();
+        props.put("prop1", "value1");
+        props.put("toBeUpdated", "value2");
+        props.put("toBeRemoved","value3");
+
+        context.build().resource(JOB_PATH, props).commit();
+        assertTrue(handler.persistJobProperties());
+        assertTrue(handler.persistJobProperties(null));
+
+        assertTrue(handler.persistJobProperties("toBeUpdated","toBeRemoved"));
+        Resource jobResource = resolver.getResource(JOB_PATH);
+        assertNotNull(jobResource);
+        ValueMap vm = jobResource.adaptTo(ValueMap.class);
+        assertNotNull(vm);
+
+        assertEquals("value1",vm.get("prop1"));
+        assertEquals("updatedValue2",vm.get("toBeUpdated"));
+        assertNull(vm.get("toBeRemoved"));
+    }
+
+    // supporting methods
+    
+    private JobManagerConfiguration buildConfiguration () throws LoginException {
+     
+        JobManagerConfiguration originalConfiguration = new JobManagerConfiguration ();
+        JobManagerConfiguration configuration = spy (originalConfiguration);
+        ResourceResolverFactory rrf = context.getService(ResourceResolverFactory.class);
+
+        // we don't care what type of resolver we use for these tests
+        resolver = spy (rrf.getAdministrativeResourceResolver(null));
+
+        when (configuration.createResourceResolver()).thenReturn(resolver);
+
+        // these are mocked because it's easier than invoking the activate method
+        when (configuration.getJobsBasePathWithSlash()).thenReturn("/var/events/");
+        when (configuration.getStoredCancelledJobsPath()).thenReturn("/var/events/cancelled");
+        when (configuration.getStoredSuccessfulJobsPath()).thenReturn("/var/events/finished");
+        when (configuration.getAssginedJobsPath()).thenReturn("/var/events/assigned");
+        when (configuration.getUnassignedJobsPath()).thenReturn("/var/events/unassigned");
+
+        // just simple mocks for all dependencies for the JobManagerConfiguration and provide
+        // stubs when required
+        ServiceUserMapped sum = mock (ServiceUserMapped.class);
+        context.registerService(ServiceUserMapped.class,sum);
+
+        QueueConfigurationManager qcm = mock (QueueConfigurationManager.class);
+        when (qcm.getQueueInfo(Mockito.anyString())).thenReturn(null);
+
+        TopologyCapabilities caps = mock (TopologyCapabilities.class);
+        when(caps.detectTarget(Mockito.matches(JOB_TOPIC), ArgumentMatchers.<Map<String,Object>>any(), 
+                ArgumentMatchers.<QueueConfigurationManager.QueueInfo>any())).thenReturn("targetId");
+        when(configuration.getTopologyCapabilities()).thenReturn(caps);
+        when(configuration.getQueueConfigurationManager()).thenReturn(qcm);
+
+        return configuration;
+    }
+
+    private JobExecutor buildJobExecutor() {
+        return mock (JobExecutor.class);
+    }
+
+    private JobImpl buildJobImpl() {
+
+        Map<String,Object> props = new HashMap<>();
+        props.put(JobImpl.PROPERTY_RESOURCE_PATH, JOB_PATH);
+        props.put(Job.PROPERTY_RESULT_MESSAGE,"result message");
+        props.put(Job.PROPERTY_JOB_STARTED_TIME, Calendar.getInstance());
+
+        props.put("toBeUpdated", "updatedValue2");
+        props.put("toBeRemoved",null);
+        JobImpl job =  new JobImpl(JOB_TOPIC,JOB_ID, props);;
+        return job;
+    }
+}