SLING-8915 Lazily replace previously cached ResourceBundles when reloaded on preload enabled
diff --git a/src/main/java/org/apache/sling/i18n/impl/JcrResourceBundleProvider.java b/src/main/java/org/apache/sling/i18n/impl/JcrResourceBundleProvider.java
index 0b97f10..c51cf9f 100644
--- a/src/main/java/org/apache/sling/i18n/impl/JcrResourceBundleProvider.java
+++ b/src/main/java/org/apache/sling/i18n/impl/JcrResourceBundleProvider.java
@@ -193,10 +193,6 @@
 
     @Override
     public ResourceBundle getResourceBundle(final String baseName, Locale locale) {
-        if (locale == null) {
-            locale = defaultLocale;
-        }
-
         return getResourceBundleInternal(null, baseName, locale);
     }
 
@@ -361,18 +357,12 @@
     }
 
     void reloadBundle(final Key key) {
-        // remove bundle from cache
-        resourceBundleCache.remove(key);
         log.info("Reloading resource bundle for {}", key);
-        // unregister bundle
-        ServiceRegistration<ResourceBundle> serviceRegistration = null;
-        synchronized (this) {
-            serviceRegistration = bundleServiceRegistrations.remove(key);
-        }
-        if (serviceRegistration != null) {
-            serviceRegistration.unregister();
-        } else {
-            log.warn("Could not find resource bundle service for {}", key);
+        if (!this.preloadBundles) {
+            // remove bundle from cache
+            resourceBundleCache.remove(key);
+            // unregister bundle
+            unregisterResourceBundle(key);
         }
 
         Collection<JcrResourceBundle> dependentBundles = new ArrayList<>();
@@ -393,7 +383,7 @@
 
         if (preloadBundles) {
             // reload the bundle from the repository (will also fill cache and register as a service)
-            getResourceBundle(key.baseName, key.locale);
+            getResourceBundleInternal(null, key.baseName, key.locale, true);
         }
     }
 
@@ -435,9 +425,17 @@
      *             created and the <code>ResourceResolver</code> is not
      *             available to access the resources.
      */
-    private ResourceBundle getResourceBundleInternal(ResourceResolver optionalResolver, final String baseName, final Locale locale) {
+    private ResourceBundle getResourceBundleInternal(ResourceResolver optionalResolver, String baseName, Locale locale) {
+        return getResourceBundleInternal(optionalResolver, baseName, locale, false);
+    }
+
+    private ResourceBundle getResourceBundleInternal(ResourceResolver optionalResolver, final String baseName, Locale locale, final boolean overwriteCache) {
+        if (locale == null) {
+            locale = defaultLocale;
+        }
+
         final Key key = new Key(baseName, locale);
-        JcrResourceBundle resourceBundle = resourceBundleCache.get(key);
+        JcrResourceBundle resourceBundle = !overwriteCache ? resourceBundleCache.get(key) : null;
         if (resourceBundle != null) {
             log.debug("getResourceBundleInternal({}): got cache hit on first try", key);
         } else {
@@ -447,7 +445,7 @@
             final Semaphore loadingGuard = loadingGuards.get(key);
             try {
                 loadingGuard.acquire();
-                resourceBundle = resourceBundleCache.get(key);
+                resourceBundle = !overwriteCache ? resourceBundleCache.get(key) : null;
                 if (resourceBundle != null) {
                     log.debug("getResourceBundleInternal({}): got cache hit on second try", key);
                 } else {
@@ -460,8 +458,13 @@
                         }
 
                         resourceBundle = createResourceBundle(optionalResolver, key.baseName, key.locale);
-                        resourceBundleCache.put(key, resourceBundle);
+                        // put the newly created ResourceBundle to the cache. If it replaces an existing entry unregister the existing
+                        // service registration first before re-registering the new ResourceBundle.
+                        if (resourceBundleCache.put(key, resourceBundle) != null) {
+                            unregisterResourceBundle(key);
+                        }
                         registerResourceBundle(key, resourceBundle);
+
                     } catch ( final LoginException le) {
                         throw (MissingResourceException)new MissingResourceException("Unable to create service resource resolver",
                                 baseName,
@@ -482,6 +485,18 @@
         return resourceBundle;
     }
 
+    private void unregisterResourceBundle(Key key) {
+        ServiceRegistration<ResourceBundle> serviceRegistration = null;
+        synchronized (this) {
+            serviceRegistration = bundleServiceRegistrations.remove(key);
+        }
+        if (serviceRegistration != null) {
+            serviceRegistration.unregister();
+        } else {
+            log.warn("Could not find resource bundle service for {}", key);
+        }
+    }
+
     private void registerResourceBundle(Key key, JcrResourceBundle resourceBundle) {
         Dictionary<String, Object> serviceProps = new Hashtable<>();
         if (key.baseName != null) {
diff --git a/src/test/java/org/apache/sling/i18n/impl/ConcurrentJcrResourceBundleLoadingTest.java b/src/test/java/org/apache/sling/i18n/impl/ConcurrentJcrResourceBundleLoadingTest.java
index 841ae39..2127ee6 100644
--- a/src/test/java/org/apache/sling/i18n/impl/ConcurrentJcrResourceBundleLoadingTest.java
+++ b/src/test/java/org/apache/sling/i18n/impl/ConcurrentJcrResourceBundleLoadingTest.java
@@ -19,42 +19,64 @@
 package org.apache.sling.i18n.impl;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.times;
+import static org.powermock.api.mockito.PowerMockito.doAnswer;
 import static org.powermock.api.mockito.PowerMockito.doReturn;
+import static org.powermock.api.mockito.PowerMockito.mock;
 import static org.powermock.api.mockito.PowerMockito.spy;
 import static org.powermock.api.mockito.PowerMockito.verifyPrivate;
 
 import java.lang.annotation.Annotation;
+import java.util.Arrays;
 import java.util.Locale;
+import java.util.ResourceBundle;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.sling.api.resource.ResourceResolver;
 import org.apache.sling.i18n.impl.JcrResourceBundleProvider.Key;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 import org.mockito.Mock;
 import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 import org.osgi.framework.BundleContext;
 import org.powermock.api.mockito.PowerMockito;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
+import org.powermock.modules.junit4.PowerMockRunnerDelegate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Test case to verify that each bundle is only loaded once, even
  * if concurrent requests for the same bundle are made.
  */
 @RunWith(PowerMockRunner.class)
+@PowerMockRunnerDelegate(Parameterized.class)
 @PrepareForTest(JcrResourceBundleProvider.class)
 public class ConcurrentJcrResourceBundleLoadingTest {
 
+    @Parameterized.Parameters(name = "preload_bundles={0}")
+    public static Iterable<? extends Object> PRELOAD_BUNDLES() {
+        return Arrays.asList(Boolean.TRUE, Boolean.FALSE);
+    }
+
+    private static final Logger LOG = LoggerFactory.getLogger(ConcurrentJcrResourceBundleLoadingTest.class);
+
     @Mock JcrResourceBundle english;
     @Mock JcrResourceBundle german;
 
+    @Parameterized.Parameter public Boolean preload = Boolean.FALSE;
+
     private JcrResourceBundleProvider provider;
 
     @Before
@@ -69,7 +91,7 @@
 
             @Override
             public boolean preload_bundles() {
-                return false;
+                return preload;
             }
 
             @Override
@@ -155,4 +177,35 @@
         verifyPrivate(provider, times(2)).invoke("createResourceBundle", any(ResourceResolver.class), eq(null), eq(Locale.ENGLISH));
         verifyPrivate(provider, times(2)).invoke("createResourceBundle", any(ResourceResolver.class), eq(null), eq(Locale.GERMAN));
     }
+
+    /**
+     * Test that when a ResourceBundle is reloaded the already cached ResourceBundle is returned (preload=true) as long as a long running
+     * call to createResourceBundle() takes. For preload=false that will be blocking in getResourceBundle() instead.
+     *
+     * @throws Exception
+     */
+    @Test
+    public void newBundleReplacesOldBundleAfterReload() throws Exception {
+        final ResourceBundle oldBundle = provider.getResourceBundle(Locale.ENGLISH);
+        final ResourceBundle newBundle = mock(JcrResourceBundle.class);
+        final AtomicBoolean newBundleReady = new AtomicBoolean(false);
+
+        doAnswer(new Answer<ResourceBundle>() {
+            @Override public ResourceBundle answer(InvocationOnMock invocationOnMock) throws Throwable {
+                Thread.sleep(1000);
+                newBundleReady.set(true);
+                return newBundle;
+            }
+        }).when(provider, "createResourceBundle", any(ResourceResolver.class), eq(null), eq(Locale.ENGLISH));
+
+        Executors.newScheduledThreadPool(1).scheduleAtFixedRate(new Runnable() {
+            @Override
+            public void run() {
+                ResourceBundle expected = newBundleReady.get() ? newBundle : oldBundle;
+                assertSame(expected, provider.getResourceBundle(Locale.ENGLISH));
+            }
+        }, 0, 200, TimeUnit.MILLISECONDS);
+
+        provider.reloadBundle(new Key(null, Locale.ENGLISH));
+    }
 }
\ No newline at end of file