blob: 2c9c69bb0dc5b30d86165cde47c5f2d5301ac6be [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package brooklyn.entity.rebind;
import static org.testng.Assert.assertEquals;
import java.net.URL;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import brooklyn.entity.Feed;
import brooklyn.entity.proxying.EntitySpec;
import brooklyn.event.AttributeSensor;
import brooklyn.management.Task;
import brooklyn.management.ha.HighAvailabilityMode;
import brooklyn.management.internal.LocalManagementContext;
import brooklyn.test.EntityTestUtils;
import brooklyn.test.entity.TestApplication;
import brooklyn.test.entity.TestEntity;
import brooklyn.util.http.BetterMockWebServer;
import brooklyn.util.repeat.Repeater;
import brooklyn.util.task.BasicExecutionManager;
import brooklyn.util.time.Duration;
import com.google.common.collect.Iterables;
import com.google.mockwebserver.MockResponse;
/**
 * Checks how an HTTP feed behaves across a high-availability mode change and a rebind.
 * <p>
 * The original management context is put into {@link HighAvailabilityMode#MASTER} before the
 * app is created. The test then switches it to {@link HighAvailabilityMode#DISABLED}, waits for
 * the execution manager to report no remaining tasks, rebinds into a new management context and
 * asserts that the rebound entity's feed repopulates its sensors.
 * <p>
 * A local {@link BetterMockWebServer} serves the canned HTTP responses the feed polls.
 */
public class RebindFeedWithHaTest extends RebindTestFixtureWithApp {

    private static final Logger log = LoggerFactory.getLogger(RebindFeedWithHaTest.class);

    static final AttributeSensor<String> SENSOR_STRING = RebindFeedTest.SENSOR_STRING;
    static final AttributeSensor<Integer> SENSOR_INT = RebindFeedTest.SENSOR_INT;

    /** JSON body returned by every canned response, and the value expected in {@link #SENSOR_STRING}. */
    private static final String RESPONSE_BODY = "{\"foo\":\"myfoo\"}";

    /** Number of canned responses enqueued on the mock server for each test method. */
    private static final int NUM_ENQUEUED_RESPONSES = 100;

    private BetterMockWebServer server;
    private URL baseUrl;

    /**
     * Runs the superclass set-up, then starts a localhost mock web server pre-loaded with
     * {@link #NUM_ENQUEUED_RESPONSES} identical 200 responses and records its root URL.
     */
    @BeforeMethod(alwaysRun=true)
    @Override
    public void setUp() throws Exception {
        super.setUp();
        server = BetterMockWebServer.newInstanceLocalhost();
        for (int i = 0; i < NUM_ENQUEUED_RESPONSES; i++) {
            server.enqueue(new MockResponse()
                    .setResponseCode(200)
                    .addHeader("content-type: application/json")
                    .setBody(RESPONSE_BODY));
        }
        server.play();
        baseUrl = server.getUrl("/");
    }

    /**
     * Runs the superclass tear-down and shuts down the mock web server.
     * <p>
     * The server shutdown is in a {@code finally} block so that a failure in the superclass
     * tear-down cannot leave the server running for subsequent test methods.
     */
    @AfterMethod(alwaysRun=true)
    @Override
    public void tearDown() throws Exception {
        try {
            super.tearDown();
        } finally {
            if (server != null) {
                server.shutdown();
            }
        }
    }

    /**
     * Switches the original management context to {@link HighAvailabilityMode#MASTER} before
     * delegating to the superclass to create the app.
     */
    @Override
    protected TestApplication createApp() {
        origManagementContext.getHighAvailabilityManager().changeMode(HighAvailabilityMode.MASTER);
        return super.createApp();
    }

    /**
     * Creates an entity with an HTTP feed, disables HA on the original management context and
     * waits until no tasks remain there, then rebinds and asserts the feed on the rebound
     * entity sets the sensors again.
     */
    @Test
    public void testHttpFeedCleansUpAfterHaDisabledAndRunsAtFailover() throws Exception {
        TestEntity origEntity = origApp.createAndManageChild(
                EntitySpec.create(TestEntity.class)
                        .impl(RebindFeedTest.MyEntityWithHttpFeedImpl.class)
                        .configure(RebindFeedTest.MyEntityWithHttpFeedImpl.BASE_URL, baseUrl));

        // The feed should populate both sensors from the mock server's response.
        EntityTestUtils.assertAttributeEqualsEventually(origEntity, SENSOR_INT, (Integer)200);
        EntityTestUtils.assertAttributeEqualsEventually(origEntity, SENSOR_STRING, RESPONSE_BODY);
        assertEquals(origEntity.feeds().getFeeds().size(), 1);

        origManagementContext.getRebindManager().forcePersistNow();

        List<Task<?>> tasksBefore = ((BasicExecutionManager)origManagementContext.getExecutionManager()).getAllTasks();
        log.info("tasks before disabling HA, {}: {}", tasksBefore.size(), tasksBefore);
        Assert.assertFalse(tasksBefore.isEmpty());

        origManagementContext.getHighAvailabilityManager().changeMode(HighAvailabilityMode.DISABLED);
        origApp = null;

        // Poll (20ms, backing off to 1s, for up to 30s) until the original context has no tasks;
        // each attempt first triggers a garbage-collection iteration.
        Repeater.create()
                .every(Duration.millis(20))
                .backoffTo(Duration.ONE_SECOND)
                .limitTimeTo(Duration.THIRTY_SECONDS)
                .until(new Callable<Boolean>() {
                    @Override
                    public Boolean call() throws Exception {
                        origManagementContext.getGarbageCollector().gcIteration();
                        List<Task<?>> tasksAfter = ((BasicExecutionManager)origManagementContext.getExecutionManager()).getAllTasks();
                        log.info("tasks after disabling HA, {}: {}", tasksAfter.size(), tasksAfter);
                        return tasksAfter.isEmpty();
                    }
                })
                .runRequiringTrue();

        newManagementContext = createNewManagementContext();
        newApp = (TestApplication) RebindTestUtils.rebind((LocalManagementContext)newManagementContext, classLoader);

        TestEntity newEntity = (TestEntity) Iterables.getOnlyElement(newApp.getChildren());
        Collection<Feed> newFeeds = newEntity.feeds().getFeeds();
        assertEquals(newFeeds.size(), 1);

        // Expect the feed to still be polling: clear the sensors and wait for them to be set again.
        newEntity.setAttribute(SENSOR_INT, null);
        newEntity.setAttribute(SENSOR_STRING, null);
        EntityTestUtils.assertAttributeEqualsEventually(newEntity, SENSOR_INT, (Integer)200);
        EntityTestUtils.assertAttributeEqualsEventually(newEntity, SENSOR_STRING, RESPONSE_BODY);
    }

    /**
     * Runs {@link #testHttpFeedCleansUpAfterHaDisabledAndRunsAtFailover()} 50 times, in the
     * "Integration" group.
     */
    @Test(groups="Integration", invocationCount=50)
    public void testHttpFeedCleansUpAfterHaDisabledAndRunsAtFailoverManyTimes() throws Exception {
        testHttpFeedCleansUpAfterHaDisabledAndRunsAtFailover();
    }
}