| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.brooklyn.entity.brooklynnode; |
| |
| import static org.testng.Assert.assertEquals; |
| import static org.testng.Assert.assertTrue; |
| import static org.testng.Assert.fail; |
| |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.List; |
| import java.util.concurrent.Callable; |
| |
| import org.apache.brooklyn.api.entity.Entity; |
| import org.apache.brooklyn.api.entity.EntitySpec; |
| import org.apache.brooklyn.api.entity.Group; |
| import org.apache.brooklyn.api.mgmt.ha.ManagementNodeState; |
| import org.apache.brooklyn.core.effector.Effectors; |
| import org.apache.brooklyn.core.entity.Entities; |
| import org.apache.brooklyn.core.entity.EntityAsserts; |
| import org.apache.brooklyn.core.feed.AttributePollHandler; |
| import org.apache.brooklyn.core.feed.DelegatingPollHandler; |
| import org.apache.brooklyn.core.feed.Poller; |
| import org.apache.brooklyn.core.test.BrooklynAppUnitTestSupport; |
| import org.apache.brooklyn.core.test.entity.TestApplication; |
| import org.apache.brooklyn.entity.brooklynnode.BrooklynCluster.SelectMasterEffector; |
| import org.apache.brooklyn.entity.brooklynnode.CallbackEntityHttpClient.Request; |
| import org.apache.brooklyn.entity.group.DynamicCluster; |
| import org.apache.brooklyn.util.collections.MutableList; |
| import org.apache.brooklyn.util.time.Duration; |
| import org.apache.http.client.methods.HttpPost; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| import org.testng.Assert; |
| import org.testng.annotations.AfterMethod; |
| import org.testng.annotations.BeforeMethod; |
| import org.testng.annotations.Test; |
| |
| import com.google.common.base.Function; |
| import com.google.common.base.Objects; |
| import com.google.common.collect.ImmutableMap; |
| |
| public class SelectMasterEffectorTest extends BrooklynAppUnitTestSupport { |
| private static final Logger LOG = LoggerFactory.getLogger(BrooklynClusterImpl.class); |
| |
| protected BrooklynCluster cluster; |
| protected HttpCallback http; |
| protected Poller<Void> poller; |
| |
| @Override |
| @BeforeMethod(alwaysRun=true) |
| public void setUp() throws Exception { |
| super.setUp(); |
| |
| // because the effector calls wait for a state change, use a separate thread to drive that |
| poller = new Poller<Void>(app, null, false); |
| poller.scheduleAtFixedRate( |
| new Callable<Void>() { |
| @Override |
| public Void call() throws Exception { |
| masterFailoverIfNeeded(); |
| return null; |
| } |
| }, |
| new DelegatingPollHandler<Void>(Collections.<AttributePollHandler<? super Void>>emptyList()), |
| Duration.millis(20)); |
| poller.start(); |
| } |
| |
| @Override |
| protected void setUpApp() { |
| super.setUpApp(); |
| http = new HttpCallback(); |
| cluster = app.createAndManageChild(EntitySpec.create(BrooklynCluster.class) |
| .location(TestApplication.LOCALHOST_PROVISIONER_SPEC) |
| .configure(BrooklynCluster.MEMBER_SPEC, EntitySpec.create(BrooklynNode.class) |
| .impl(MockBrooklynNode.class) |
| .configure(MockBrooklynNode.HTTP_CLIENT_CALLBACK, http))); |
| } |
| |
| @Override |
| @AfterMethod(alwaysRun=true) |
| public void tearDown() throws Exception { |
| poller.stop(); |
| super.tearDown(); |
| } |
| |
| @Test |
| public void testInvalidNewMasterIdFails() { |
| try { |
| selectMaster(cluster, "1234"); |
| fail("Non-existend entity ID provided."); |
| } catch (Exception e) { |
| assertTrue(e.toString().contains("1234 is not an ID of brooklyn node in this cluster")); |
| } |
| } |
| |
| @Test(groups="Integration") // because slow, due to sensor feeds |
| public void testSelectMasterAfterChange() { |
| List<Entity> nodes = makeTwoNodes(); |
| EntityAsserts.assertAttributeEqualsEventually(cluster, BrooklynCluster.MASTER_NODE, (BrooklynNode) nodes.get(0)); |
| |
| selectMaster(cluster, nodes.get(1).getId()); |
| checkMaster(cluster, nodes.get(1)); |
| } |
| |
| @Test |
| public void testFindMaster() { |
| List<Entity> nodes = makeTwoNodes(); |
| Assert.assertEquals(((BrooklynClusterImpl)Entities.deproxy(cluster)).findMasterChild(), nodes.get(0)); |
| } |
| |
| @Test(groups="Integration") // because slow, due to sensor feeds |
| public void testSelectMasterFailsAtChangeState() { |
| http.setFailAtStateChange(true); |
| |
| List<Entity> nodes = makeTwoNodes(); |
| |
| EntityAsserts.assertAttributeEqualsEventually(cluster, BrooklynCluster.MASTER_NODE, (BrooklynNode)nodes.get(0)); |
| |
| try { |
| selectMaster(cluster, nodes.get(1).getId()); |
| fail("selectMaster should have failed"); |
| } catch (Exception e) { |
| // expected |
| } |
| checkMaster(cluster, nodes.get(0)); |
| } |
| |
| private List<Entity> makeTwoNodes() { |
| List<Entity> nodes = MutableList.copyOf(cluster.resizeByDelta(2)); |
| setManagementState(nodes.get(0), ManagementNodeState.MASTER); |
| setManagementState(nodes.get(1), ManagementNodeState.HOT_STANDBY); |
| return nodes; |
| } |
| |
| private void checkMaster(Group cluster, Entity node) { |
| assertEquals(node.getAttribute(BrooklynNode.MANAGEMENT_NODE_STATE), ManagementNodeState.MASTER); |
| assertEquals(cluster.getAttribute(BrooklynCluster.MASTER_NODE), node); |
| for (Entity member : cluster.getMembers()) { |
| if (member != node) { |
| assertEquals(member.getAttribute(BrooklynNode.MANAGEMENT_NODE_STATE), ManagementNodeState.HOT_STANDBY); |
| } |
| assertEquals((int)member.getAttribute(MockBrooklynNode.HA_PRIORITY), 0); |
| } |
| } |
| |
| private static class HttpCallback implements Function<CallbackEntityHttpClient.Request, String> { |
| private enum State { |
| INITIAL, |
| PROMOTED |
| } |
| private State state = State.INITIAL; |
| private boolean failAtStateChange; |
| |
| @Override |
| public String apply(Request input) { |
| if ("/v1/server/ha/state".equals(input.getPath())) { |
| if (failAtStateChange) { |
| throw new RuntimeException("Testing failure at changing node state"); |
| } |
| |
| checkRequest(input, HttpPost.METHOD_NAME, "/v1/server/ha/state", "mode", "HOT_STANDBY"); |
| Entity entity = input.getEntity(); |
| EntityAsserts.assertAttributeEquals(entity, BrooklynNode.MANAGEMENT_NODE_STATE, ManagementNodeState.MASTER); |
| EntityAsserts.assertAttributeEquals(entity, MockBrooklynNode.HA_PRIORITY, 0); |
| |
| setManagementState(entity, ManagementNodeState.HOT_STANDBY); |
| |
| return "MASTER"; |
| } else { |
| switch(state) { |
| case INITIAL: |
| checkRequest(input, HttpPost.METHOD_NAME, "/v1/server/ha/priority", "priority", "1"); |
| state = State.PROMOTED; |
| setPriority(input.getEntity(), Integer.parseInt(input.getParams().get("priority"))); |
| return "0"; |
| case PROMOTED: |
| checkRequest(input, HttpPost.METHOD_NAME, "/v1/server/ha/priority", "priority", "0"); |
| state = State.INITIAL; |
| setPriority(input.getEntity(), Integer.parseInt(input.getParams().get("priority"))); |
| return "1"; |
| default: throw new IllegalStateException("Illegal call at state " + state + ". Request = " + input.getMethod() + " " + input.getPath()); |
| } |
| } |
| } |
| |
| public void checkRequest(Request input, String methodName, String path, String key, String value) { |
| if (!input.getMethod().equals(methodName) || !input.getPath().equals(path)) { |
| throw new IllegalStateException("Request doesn't match expected state. Expected = " + input.getMethod() + " " + input.getPath() + ". " + |
| "Actual = " + methodName + " " + path); |
| } |
| |
| String inputValue = input.getParams().get(key); |
| if(!Objects.equal(value, inputValue)) { |
| throw new IllegalStateException("Request doesn't match expected parameter " + methodName + " " + path + ". Parameter " + key + |
| " expected = " + value + ", actual = " + inputValue); |
| } |
| } |
| |
| public void setFailAtStateChange(boolean failAtStateChange) { |
| this.failAtStateChange = failAtStateChange; |
| } |
| |
| } |
| |
| private void masterFailoverIfNeeded() { |
| if (!Entities.isManaged(cluster)) return; |
| if (cluster.getAttribute(BrooklynCluster.MASTER_NODE) == null) { |
| Collection<Entity> members = cluster.getMembers(); |
| if (members.size() > 0) { |
| for (Entity member : members) { |
| if (member.getAttribute(MockBrooklynNode.HA_PRIORITY) == 1) { |
| masterFailover(member); |
| return; |
| } |
| } |
| masterFailover(members.iterator().next()); |
| } |
| } |
| } |
| |
| private void masterFailover(Entity member) { |
| LOG.debug("Master failover to " + member); |
| setManagementState(member, ManagementNodeState.MASTER); |
| EntityAsserts.assertAttributeEqualsEventually(cluster, BrooklynCluster.MASTER_NODE, (BrooklynNode)member); |
| return; |
| } |
| |
| public static void setManagementState(Entity entity, ManagementNodeState state) { |
| entity.sensors().set(BrooklynNode.MANAGEMENT_NODE_STATE, state); |
| } |
| |
| public static void setPriority(Entity entity, int priority) { |
| entity.sensors().set(MockBrooklynNode.HA_PRIORITY, priority); |
| } |
| |
| private void selectMaster(DynamicCluster cluster, String id) { |
| app.getExecutionContext().submit(Effectors.invocation(cluster, BrooklynCluster.SELECT_MASTER, ImmutableMap.of(SelectMasterEffector.NEW_MASTER_ID.getName(), id))).asTask().getUnchecked(); |
| } |
| |
| } |